LoopJump's Blog

The MemSQL Query Optimizer

2022-10-29

"The MemSQL Query Optimizer: A modern optimizer for real-time analytics in a distributed database", VLDB 2016

MemSQL is a memory-optimized database with a shared-nothing architecture. It was later renamed SingleStoreDB (S2DB); a detailed look at its architecture is here: http://loopjump.com/singlestore-cloud-native-htap/

The MemSQL optimizer works in three stages: the Rewriter, the Enumerator, and the Planner.

Because MemSQL is a shared-nothing distributed system, the optimizer's output is naturally a distributed query execution plan (DQEP).

Let's first walk through an optimization example, TPC-H Q17.

The original query is:

SELECT sum(l_extendedprice) / 7.0 AS avg_yearly
FROM lineitem, part
WHERE p_partkey = l_partkey
  AND p_brand = 'BRAND#43'
  AND p_container = 'LGPACK'
  AND l_quantity <
      (SELECT 0.2 * avg(l_quantity)
       FROM lineitem
       WHERE l_partkey = p_partkey);

The Rewriter converts the scalar subquery here into a join. The original subquery is expensive to evaluate: for every row of part, its result would have to be computed in a distributed fashion. Once it is rewritten as a join, there is much more freedom in choosing join plans and DQEPs.

SELECT Sum(l_extendedprice) / 7.0 AS avg_yearly
FROM lineitem,
     (SELECT 0.2 * Avg(l_quantity) AS s_avg, l_partkey AS s_partkey
      FROM lineitem, part
      WHERE p_brand = 'BRAND#43'
        AND p_container = 'LGPACK'
        AND p_partkey = l_partkey
      GROUP BY l_partkey) sub
WHERE s_partkey = l_partkey
  AND l_quantity < s_avg;

The Enumerator then looks for a low-cost plan. Because lineitem is very large, it is best to broadcast the two smaller inputs, part and sub, so the Enumerator produces the following plan:

Project [s2 / 7.0 AS avg_yearly]
 Aggregate [SUM(s1) AS s2]
  Gather partitions:all
   Aggregate [SUM(lineitem_1.l_extendedprice) AS s1]
    Filter [lineitem_1.l_quantity < s_avg]
     NestedLoopJoin
     |---IndexRangeScan lineitem AS lineitem_1,
     |       KEY(l_partkey) scan: [l_partkey = p_partkey]
     Broadcast
      HashGroupBy [AVG(l_quantity) * 0.2 AS s_avg]
       groups:[l_partkey]
        NestedLoopJoin
        |---IndexRangeScan lineitem,
        |       KEY(l_partkey) scan: [l_partkey = p_partkey]
        Broadcast
         Filter [p_brand = 'BRAND#43' AND
                 p_container = 'LGPACK']
          TableScan part, PRIMARY KEY(p_partkey)

The Planner turns the plan above into a DQEP. MemSQL introduces the concepts of RemoteTable and ResultTable to express DQEPs.

CREATE RESULT TABLE r0 AS
SELECT p_partkey
FROM part
WHERE p_brand = 'BRAND#43'
  AND p_container = 'LGPACK';

CREATE RESULT TABLE r1 AS
SELECT 0.2 * Avg(l_quantity) AS s_avg,
       l_partkey AS s_partkey
FROM REMOTE(r0), lineitem
WHERE p_partkey = l_partkey
GROUP BY l_partkey;

SELECT SUM(l_extendedprice) / 7.0 AS avg_yearly
FROM REMOTE(r1), lineitem
WHERE l_partkey = s_partkey
  AND l_quantity < s_avg;

Rewriter

The Rewriter applies both heuristic and cost-based rewrites, transforming a query into an equivalent query that performs better.

Some typical heuristic rewrite rules:

  • Column Elimination: remove columns that are never actually used
  • Group-By Pushdown: move the Group-By below the join so it is evaluated before the join
  • Sub-Query Merging: merge subqueries into joins, which turns local optimization into global optimization and can open up a much larger search space

There are many rewrite rules, so their application order matters, and sometimes the rules have to be applied in an interleaved fashion; a generic sketch of such a rule-application loop follows.
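As a purely generic illustration (not MemSQL's actual Rewriter), a rule-driven rewriter can be sketched as a loop that keeps applying rules until a full pass changes nothing; the Query placeholder and the applies/apply interface below are hypothetical:

from typing import Callable, List

class Query:
    """Placeholder for a parsed query tree (hypothetical)."""

class RewriteRule:
    def __init__(self, name: str,
                 applies: Callable[[Query], bool],   # does the rule match this query?
                 apply: Callable[[Query], Query]):   # produce the rewritten query
        self.name, self.applies, self.apply = name, applies, apply

def rewrite(query: Query, rules: List[RewriteRule], max_passes: int = 10) -> Query:
    """Apply the rules repeatedly (interleaved) until a pass changes nothing."""
    for _ in range(max_passes):
        changed = False
        for rule in rules:  # rule order matters: an earlier rule may enable a later one
            if rule.applies(query):
                query = rule.apply(query)
                changed = True
        if not changed:
            break
    return query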

When rewriting, the Rewriter also has to take the distributed data layout into account. Consider, for example, the Group-By Pushdown decision between the two queries below over tables T1 and T2: Q1 joins first and then groups, while Q2 pushes the Group-By below the join.

Q1:
SELECT SUM(T1.b) AS s
FROM T1, T2
WHERE T1.a = T2.a
GROUP BY T1.a, T1.b;

Q2 (Group-By Pushdown applied):
SELECT V.s
FROM T2,
     (SELECT a, b, SUM(b) AS s
      FROM T1
      GROUP BY a, b) V
WHERE V.a = T2.a;

Suppose T1 has R1 = 200,000 rows and T2 has R2 = 50,000 rows. After the Group-By on T1, a fraction S_G = 1/4 of the rows remains, i.e. S_G * R1 = 50,000 rows; after the join, a fraction S_J = 1/10 remains, i.e. S_J * R1 = 20,000 rows; after both the Group-By and the join, R1 * S_G * S_J = 5,000 rows remain.

Suppose a point lookup on the T2.a index costs C_J = 1 per row, and the Group-By is implemented with a hash table at a cost of C_G = 1 per row. Then:

$$
Cost_{Q_1} = R_1 C_J + R_1 S_J C_G = 200000 \times 1 + 200000 \times \tfrac{1}{10} \times 1 = 220000 \\
Cost_{Q_2} = R_1 C_G + R_1 S_G C_J = 200000 \times 1 + 200000 \times \tfrac{1}{4} \times 1 = 250000
$$

Comparing only these two costs, we would pick Q1.

But what happens once data distribution is taken into account? T2 is partitioned on T2.a, while T1 is not partitioned on T1.a, so the join has to move data: either T1 is reshuffled on T1.a, or T2 is broadcast. If T2 is fairly large, reshuffling T1 may be the only reasonable choice, and then the cost comparison between Q1 and Q2 can change.

Q1 (join first, then Group-By):
Gather partitions:all
 Project [r0.s]
  HashGroupBy [sum(r0.b) AS s] groups:[r0.a, r0.b]
   NestedLoopJoin
   |---IndexSeek T2, UNIQUE KEY (a) scan:[a = r0.a]
    Repartition AS r0 shard_key:[a]
     TableScan T1

Q2 (Group-By pushed below the join):
Gather partitions:all
 Project [r0.s]
  NestedLoopJoin
  |---IndexSeek T2, UNIQUE KEY (a) scan:[a = r0.a]
   Repartition AS r0 shard_key:[a]
    HashGroupBy [sum(T1.b) AS s] groups:[T1.a, T1.b]
     TableScan T1

Suppose the per-row cost of a reshuffle is C_R = 3. Then:

$$
Cost_{Q_1} = R_1 C_R + R_1 C_J + R_1 S_J C_G = 600000 + 200000 + 20000 = 820000 \\
Cost_{Q_2} = R_1 C_G + R_1 S_G C_R + R_1 S_G C_J = 200000 + 150000 + 50000 = 400000
$$
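To make the reversal concrete, here is a small Python sketch (variable names are mine) that plugs the numbers above into both the single-node and the distributed cost formulas:

# Numbers from the example above.
R1 = 200_000              # rows in T1
S_G, S_J = 1 / 4, 1 / 10  # fraction of rows surviving the Group-By / the join
C_J, C_G, C_R = 1, 1, 3   # per-row cost of index lookup, hash Group-By, reshuffle

# Single-node costs: Q1 joins first, Q2 groups first.
local_q1 = R1 * C_J + R1 * S_J * C_G            # 220_000
local_q2 = R1 * C_G + R1 * S_G * C_J            # 250_000

# Distributed costs: Q1 has to reshuffle all of T1, Q2 reshuffles only the groups.
dist_q1 = R1 * C_R + R1 * C_J + R1 * S_J * C_G          # 820_000
dist_q2 = R1 * C_G + R1 * S_G * C_R + R1 * S_G * C_J    # 400_000

print("single-node winner:", "Q1" if local_q1 < local_q2 else "Q2")  # Q1
print("distributed winner:", "Q1" if dist_q1 < dist_q2 else "Q2")    # Q2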

BUSHY JOINS

Searching over all join orders for the optimal one is NP-hard, so many systems essentially consider only left-deep or right-deep join trees. But for queries over star and snowflake schemas, bushy joins can be a very large win.

Note that even if the Enumerator only considers left-deep join trees, bushy joins can still emerge naturally: the derived table on the right side of a join can itself be expanded, and each subselect can be planned as a left-deep join internally, which together produces a bushy join.

The algorithm is easiest to explain with a concrete example.

TPC-DS Q25:

SELECT ...
FROM store_sales ss,
     store_returns sr,
     catalog_sales cs,
     date_dim d1,
     date_dim d2,
     date_dim d3,
     store s,
     item i
WHERE d1.d_moy = 4
  AND d1.d_year = 2000
  AND d1.d_date_sk = ss_sold_date_sk
  AND i_item_sk = ss_item_sk
  AND s_store_sk = ss_store_sk
  AND ss_customer_sk = sr_customer_sk
  AND ss_item_sk = sr_item_sk
  AND ss_ticket_number = sr_ticket_number
  AND sr_returned_date_sk = d2.d_date_sk
  AND d2.d_moy BETWEEN 4 AND 10
  AND d2.d_year = 2000
  AND sr_customer_sk = cs_bill_customer_sk
  AND sr_item_sk = cs_item_sk
  AND cs_sold_date_sk = d3.d_date_sk
  AND d3.d_moy BETWEEN 4 AND 10
  AND d3.d_year = 2000
GROUP BY ...
ORDER BY ...
  1. Build a graph from the join relationships: nodes are tables and edges are joins (see the figure below).

    [Figure: TPC-DS Q25 join graph] http://loopjump.com/wp-content/uploads/2022/10/image-20220928171737894-300x163.png

  2. Define a satellite table as a table that has at least one filter predicate and joins exactly one other table, such as d1, d2, and d3 in the graph.

  3. Define a seed table as a table that joins at least two other tables, at least one of which is a satellite table, such as ss, sr, and cs in the graph (a small code sketch of this classification follows the list).

  4. For each seed table:

  • Compute the cost C1 of the current plan. Rewrite the seed table together with the satellite tables joined to it as a derived table. Apply the Predicate Pushdown rule and then the Column Elimination rule, so that outer predicates are pushed into the derived table and columns not needed outside are eliminated inside it. Compute the new cost C2. If C1 < C2, discard this rewrite; otherwise keep it and move on to the next seed table.
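A minimal Python sketch of steps 1-3; the graph encoding and thresholds follow the example above, but the code itself is an illustration rather than MemSQL's implementation:

from collections import defaultdict

# Join graph for TPC-DS Q25: an edge means the two tables are joined;
# `filtered` marks tables that carry at least one local filter predicate.
edges = [("ss", "d1"), ("ss", "i"), ("ss", "s"), ("ss", "sr"),
         ("sr", "d2"), ("sr", "cs"), ("cs", "d3")]
filtered = {"d1", "d2", "d3"}

neighbors = defaultdict(set)
for a, b in edges:
    neighbors[a].add(b)
    neighbors[b].add(a)

# Satellite table: has a filter predicate and joins exactly one other table.
satellites = {t for t in neighbors if t in filtered and len(neighbors[t]) == 1}

# Seed table: joins at least two tables, at least one of which is a satellite.
seeds = {t for t in neighbors if len(neighbors[t]) >= 2 and neighbors[t] & satellites}

print(sorted(satellites))  # ['d1', 'd2', 'd3']
print(sorted(seeds))       # ['cs', 'sr', 'ss']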

For example, applying this transformation to the query above yields:

SELECT ...
FROM store_sales,
     store_returns,
     date_dim d1,
     date_dim d2,
     store,
     item,
     (SELECT *
      FROM catalog_sales,
           date_dim d3
      WHERE cs_sold_date_sk = d3.d_date_sk
        AND d3.d_moy BETWEEN 4 AND 10
        AND d3.d_year = 2000) sub
WHERE d1.d_moy = 4 AND d1.d_year = 2000
  AND d1.d_date_sk = ss_sold_date_sk
  AND i_item_sk = ss_item_sk
  AND s_store_sk = ss_store_sk
  AND ss_customer_sk = sr_customer_sk
  AND ss_item_sk = sr_item_sk
  AND ss_ticket_number = sr_ticket_number
  AND sr_returned_date_sk = d2.d_date_sk
  AND d2.d_moy BETWEEN 4 AND 10
  AND d2.d_year = 2000
  AND sr_customer_sk = cs_bill_customer_sk
  AND sr_item_sk = cs_item_sk

[Figure: join graph after the rewrite] http://loopjump.com/wp-content/uploads/2022/10/image-20220929103832599-300x174.png

Enumerator

The Enumerator runs downstream of the Rewriter: the Rewriter performs mainly logical transformations, while the Enumerator performs physical ones. The Enumerator is cost-aware, and the best single-node plan, simply parallelized, is not necessarily the best distributed plan.

The Enumerator only optimizes joins within a single select block; moving joins across select blocks is handled by the Rewriter described above.

The Enumerator processes select blocks bottom-up: it starts by optimizing the innermost subselects, then uses annotation information (similar to optimizer hints) to aggressively optimize the enclosing subselects; once the outermost select block has been processed, the physical execution plan of the whole query is fixed.

Although the Enumerator works bottom-up, a top-down approach would look similar. To limit combinatorial explosion, other systems use System-R-style dynamic programming based on interesting orders; the Enumerator instead does dynamic programming based on interesting properties of the sharding distribution (such as sharding columns). Sharding keys worth considering include the columns of equality join predicates and the grouping columns.
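The idea can be illustrated with a toy dynamic program in Python; every table name, statistic, and cost constant below is invented for illustration, and this is not MemSQL's actual model. The memo is keyed not just by the set of joined tables but also by the sharding column of the intermediate result, so a plan that is locally more expensive can survive if it is sharded on an "interesting" column that avoids data movement later:

from itertools import combinations

# Toy inputs; every value here is an illustrative assumption.
ROWS  = {"T1": 200_000, "T2": 50_000, "T3": 1_000}  # base-table cardinalities
SHARD = {"T1": "b", "T2": "a", "T3": "a"}           # shard key of each table
JOIN_COL = "a"                                       # assume every join is an equi-join on column a
SEL = 0.001                                          # flat join selectivity
C_SEEK, C_SHUFFLE, C_BCAST = 1.0, 3.0, 5.0           # per-row cost constants

def enumerate_left_deep(tables):
    """DP over table subsets, keyed by (tables joined, shard column of the result)."""
    memo = {}  # (frozenset of tables, shard column) -> (cost, rows, plan text)
    for t in tables:
        memo[(frozenset([t]), SHARD[t])] = (0.0, float(ROWS[t]), t)
    for size in range(2, len(tables) + 1):
        for subset in map(frozenset, combinations(tables, size)):
            for right in subset:                     # left-deep: add one base table at a time
                left = subset - {right}
                for (lset, lshard), (lcost, lrows, ltree) in list(memo.items()):
                    if lset != left:
                        continue
                    # Co-locate both sides on JOIN_COL: either they already are,
                    # or reshuffle the left side, or broadcast the right table
                    # (a simplification: reshuffling the right side is not considered).
                    if lshard == JOIN_COL and SHARD[right] == JOIN_COL:
                        move_cost, out_shard = 0.0, JOIN_COL
                    else:
                        move_cost, out_shard = min(
                            (lrows * C_SHUFFLE, JOIN_COL),    # reshuffle left on the join column
                            (ROWS[right] * C_BCAST, lshard))  # broadcast the right table
                    cost = lcost + move_cost + lrows * C_SEEK
                    rows = lrows * ROWS[right] * SEL
                    key = (subset, out_shard)
                    if key not in memo or cost < memo[key][0]:
                        memo[key] = (cost, rows, f"({ltree} JOIN {right})")
    full = frozenset(tables)
    return min((v for k, v in memo.items() if k[0] == full), key=lambda v: v[0])

print(enumerate_left_deep(["T1", "T2", "T3"]))

Keeping one entry per (table set, sharding column) rather than per table set alone is what lets a plan that is locally more expensive but conveniently sharded survive and win later.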

Distributed cost

The cost of a distributed execution plan additionally includes the cost of data movement.

Data movement generally falls into two categories; a small code sketch of both cost formulas follows this list:

  • broadcast: data is broadcast from one leaf node to all other leaf nodes, at cost R*D, where R is the number of rows moved and D is the average cost of moving one row
  • partition (reshuffle): each row is sent from a leaf node to one specific leaf node, at cost 1/N*(R*D + R*H), where N is the total number of nodes and H is the average cost of hashing a row
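A minimal sketch of the two data-movement cost formulas above (the function names and example numbers are mine):

def broadcast_cost(R: float, D: float) -> float:
    """Broadcast R rows from one leaf node to all other leaf nodes: R * D."""
    return R * D

def reshuffle_cost(R: float, D: float, H: float, N: int) -> float:
    """Hash-partition R rows across N nodes: (R * D + R * H) / N."""
    return (R * D + R * H) / N

# Example: with many nodes, reshuffling is far cheaper per partition than broadcasting.
R, D, H, N = 100_000, 3.0, 1.0, 16
print(broadcast_cost(R, D))        # 300000.0
print(reshuffle_cost(R, D, H, N))  # 25000.0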

Fast Enumerator

For details, see the companion paper "Query Optimization Time: The New Bottleneck in Real-time Analytics" (2015).

Planner

The Planner generates the physical execution plan as a set of DQEP steps. A DQEP step is a SQL-like step that can be shipped to other nodes as text; leaf steps can run concurrently, and each step runs in parallel across partitions.

Nodes in MemSQL exchange SQL statements with each other, which transparently enables node-level optimization.

Remote Table

MemSQL adds a Remote Table capability that allows any leaf node to act like an aggregator and run queries against all partitions.

SELECT facts.id, facts.value
FROM REMOTE(facts) as facts
WHERE facts.value > 4;

The REMOTE keyword means that the facts table is spread across multiple nodes and the query is executed against every node's partitions instead of only the local ones.

Result Table

A Result Table is an intermediate result of a query on a local partition; if other partitions need to read that result multiple times, it only has to be computed once.

For example, for the query above, the Planner generates the following query on every partition:

CREATE RESULT TABLE facts_filtered
AS SELECT facts.id, facts.value
FROM facts
WHERE facts.value > 4;

The remote-table query can then read this precomputed result directly.

Using Remote Tables and Result Tables in a DQEP

A DQEP consists of data-movement steps and compute steps stitched together through Result Tables. A Result Table does not have to be materialized; it can serve purely as an abstraction that streams its rows to consumers.

SELECT * FROM x JOIN y
WHERE x.a = y.a
AND x.b < 2
AND y.c > 5;

Here x is sharded on x.a, while y is not sharded on a. Depending on how much data survives the filters, the best plan may be to broadcast x or to reshuffle y.

Assume that very little of x survives its filter, so x is broadcast. In that case the following DQEP may be generated:

1. CREATE RESULT TABLE r1 AS SELECT * FROM x WHERE x.b < 2 (on every partition);
2. CREATE RESULT TABLE r2 AS SELECT * FROM REMOTE(r1) (on every node); // every node computes r2, which effectively broadcasts r1
3. SELECT * FROM r2 JOIN y WHERE y.c > 5 AND r2.a = y.a (on every partition);

The Remote Table and Result Table abstractions are very flexible. For example:

1. CREATE RESULT TABLE r1 AS SELECT * FROM x WHERE x.b < 2 (on every partition)
2. CREATE RESULT TABLE r2 AS SELECT * FROM REMOTE(r1) (on a single node)
3. CREATE RESULT TABLE r3 AS SELECT * FROM REMOTE(r2) (on every node)
4. SELECT * FROM r3 JOIN y WHERE y.c > 5 AND r3.a = y.a (on every partition)

A Result Table can also be created with an explicit partition key, which implements a reshuffle.

1. CREATE RESULT TABLE r1 PARTITION BY (y.a) AS SELECT * FROM y WHERE y.c > 5 (on every partition)
2. SELECT * FROM x JOIN REMOTE(r1(p)) WHERE x.b < 2 AND x.a = y.a (on every partition)
Tags: SQL
