PostgreSQL 源码解读(71)- 查询语句#56(make_one_rel函数#21-...
本节大体介绍了动态规划算法实现(standard_join_search)中的join_search_one_level->make_join_rel->populate_joinrel_with_paths->add_paths_to_joinrel->hash_inner_and_outer中的initial_cost_hashjoin和final_cost_nestloop函数,这些函数用于计算hash join的Cost。
一、数据结构
Cost相关
注意:实际使用的参数值通过系统配置文件定义,而不是这里的常量定义!
typedef double Cost;
#define DEFAULT_SEQ_PAGE_COST 1.0 //顺序扫描page的成本
#define DEFAULT_RANDOM_PAGE_COST 4.0 //随机扫描page的成本
#define DEFAULT_CPU_TUPLE_COST 0.01 //处理一个元组的CPU成本
#define DEFAULT_CPU_INDEX_TUPLE_COST 0.005 //处理一个索引元组的CPU成本
#define DEFAULT_CPU_OPERATOR_COST 0.0025 //执行一次操作或函数的CPU成本
#define DEFAULT_PARALLEL_TUPLE_COST 0.1 //并行执行,从一个worker传输一个元组到另一个worker的成本
#define DEFAULT_PARALLEL_SETUP_COST 1000.0 //构建并行执行环境的成本
#define DEFAULT_EFFECTIVE_CACHE_SIZE 524288
double seq_page_cost = DEFAULT_SEQ_PAGE_COST;
double random_page_cost = DEFAULT_RANDOM_PAGE_COST;
double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST;
double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST;
double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST;
double parallel_tuple_cost = DEFAULT_PARALLEL_TUPLE_COST;
double parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST;
int effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE;
Cost disable_cost = 1.0e10;//1后面10个0,通过设置一个巨大的成本,让优化器自动放弃此路径
int max_parallel_workers_per_gather = 2;//每次gather使用的worker数
二、源码解读
hash join的算法实现伪代码如下:
Step 1
FOR small_table_row IN (SELECT * FROM small_table)
LOOP
slot := HASH(small_table_row.join_key);
INSERT_HASH_TABLE(slot,small_table_row);
END LOOP;
Step 2
FOR large_table_row IN (SELECT * FROM large_table) LOOP
slot := HASH(large_table_row.join_key);
small_table_row = LOOKUP_HASH_TABLE(slot,large_table_row.join_key);
IF small_table_row FOUND THEN
output small_table_row + large_table_row;
END IF;
END LOOP;
initial_cost_hashjoin和final_cost_nestloop函数初步预估hash join访问路径的成本和计算最终hash join的Cost。
initial_cost_hashjoin
//----------------------------------------------------------------------------- initial_cost_hashjoin
void
initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
JoinType jointype,
List *hashclauses,
Path *outer_path, Path *inner_path,
JoinPathExtraData *extra,
bool parallel_hash)
{
Cost startup_cost = 0;
Cost run_cost = 0;
double outer_path_rows = outer_path->rows;
double inner_path_rows = inner_path->rows;
double inner_path_rows_total = inner_path_rows;
int num_hashclauses = list_length(hashclauses);
int numbuckets;
int numbatches;
int num_skew_mcvs;
size_t space_allowed;
startup_cost += outer_path->startup_cost;
run_cost += outer_path->total_cost - outer_path->startup_cost;
startup_cost += inner_path->total_cost;
startup_cost += (cpu_operator_cost * num_hashclauses + cpu_tuple_cost)
* inner_path_rows;
run_cost += cpu_operator_cost * num_hashclauses * outer_path_rows;
if (parallel_hash)
inner_path_rows_total *= get_parallel_divisor(inner_path);
ExecChooseHashTableSize(inner_path_rows_total,
inner_path->pathtarget->width,
true,
parallel_hash,
outer_path->parallel_workers,
&space_allowed,
&numbuckets,
&numbatches,
&num_skew_mcvs);
if (numbatches > 1)
{
double outerpages = page_size(outer_path_rows,
outer_path->pathtarget->width);
double innerpages = page_size(inner_path_rows,
inner_path->pathtarget->width);
startup_cost += seq_page_cost * innerpages;
run_cost += seq_page_cost * (innerpages + 2 * outerpages);
}
workspace->startup_cost = startup_cost;
workspace->total_cost = startup_cost + run_cost;
workspace->run_cost = run_cost;
workspace->numbuckets = numbuckets;
workspace->numbatches = numbatches;
workspace->inner_rows_total = inner_path_rows_total;
}
//----------------------------------------------------------- ExecChooseHashTableSize
#define NTUP_PER_BUCKET 1
void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
bool try_combined_work_mem,
int parallel_workers,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
int *num_skew_mcvs)
{
int tupsize;
double inner_rel_bytes;
long bucket_bytes;
long hash_table_bytes;
long skew_table_bytes;
long max_pointers;
long mppow2;
int nbatch = 1;
int nbuckets;
double dbuckets;
//如元组数目没有统计,则默认值为1000
if (ntuples <= 0.0)
ntuples = 1000.0;
tupsize = HJTUPLE_OVERHEAD +
MAXALIGN(SizeofMinimalTupleHeader) +
MAXALIGN(tupwidth);
inner_rel_bytes = ntuples * tupsize;//元组数 * 元组大小
hash_table_bytes = work_mem * 1024L;
if (try_combined_work_mem)
hash_table_bytes += hash_table_bytes * parallel_workers;
*space_allowed = hash_table_bytes;
if (useskew)//使用倾斜优化(数据列值非均匀分布)
{
skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
*num_skew_mcvs = skew_table_bytes / (tupsize +
(8 * sizeof(HashSkewBucket *)) +
sizeof(int) +
SKEW_BUCKET_OVERHEAD);
if (*num_skew_mcvs > 0)
hash_table_bytes -= skew_table_bytes;
}
else
*num_skew_mcvs = 0;
max_pointers = *space_allowed / sizeof(HashJoinTuple);
max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
//设置为2的倍数
mppow2 = 1L << my_log2(max_pointers);
if (max_pointers != mppow2)
max_pointers = mppow2 / 2;
//确保没有整数溢出
max_pointers = Min(max_pointers, INT_MAX / 2);
dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
dbuckets = Min(dbuckets, max_pointers);
nbuckets = (int) dbuckets;
//nbuckets最大为1024
nbuckets = Max(nbuckets, 1024);
//左移一位,即结果*2
nbuckets = 1 << my_log2(nbuckets);
bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
{
//内表大小 + 桶大小 > hash表大小,需要拆分为多个批次
long lbuckets;
double dbatch;
int minbatch;
long bucket_size;
if (try_combined_work_mem)
{
ExecChooseHashTableSize(ntuples, tupwidth, useskew,
false, parallel_workers,
space_allowed,
numbuckets,
numbatches,
num_skew_mcvs);
return;
}
bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
lbuckets = 1L << my_log2(hash_table_bytes / bucket_size);
lbuckets = Min(lbuckets, max_pointers);
nbuckets = (int) lbuckets;
nbuckets = 1 << my_log2(nbuckets);
bucket_bytes = nbuckets * sizeof(HashJoinTuple);
Assert(bucket_bytes <= hash_table_bytes / 2);
//计算批次
dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
dbatch = Min(dbatch, max_pointers);
minbatch = (int) dbatch;
nbatch = 2;
while (nbatch < minbatch)
nbatch <<= 1;
}
Assert(nbuckets > 0);
Assert(nbatch > 0);
*numbuckets = nbuckets;
*numbatches = nbatch;
}
final_cost_hashjoin
//----------------------------------------------------------------------------- final_cost_hashjoin
void
final_cost_hashjoin(PlannerInfo *root, HashPath *path,
JoinCostWorkspace *workspace,
JoinPathExtraData *extra)
{
Path *outer_path = path->jpath.outerjoinpath;
Path *inner_path = path->jpath.innerjoinpath;
double outer_path_rows = outer_path->rows;
double inner_path_rows = inner_path->rows;
double inner_path_rows_total = workspace->inner_rows_total;
List *hashclauses = path->path_hashclauses;
Cost startup_cost = workspace->startup_cost;
Cost run_cost = workspace->run_cost;
int numbuckets = workspace->numbuckets;
int numbatches = workspace->numbatches;
Cost cpu_per_tuple;
QualCost hash_qual_cost;
QualCost qp_qual_cost;
double hashjointuples;
double virtualbuckets;
Selectivity innerbucketsize;
Selectivity innermcvfreq;
ListCell *hcl;
if (path->jpath.path.param_info)
path->jpath.path.rows = path->jpath.path.param_info->ppi_rows;
else
path->jpath.path.rows = path->jpath.path.parent->rows;
if (path->jpath.path.parallel_workers > 0)
{
double parallel_divisor = get_parallel_divisor(&path->jpath.path);
path->jpath.path.rows =
clamp_row_est(path->jpath.path.rows / parallel_divisor);
}
if (!enable_hashjoin)
startup_cost += disable_cost;//禁用hash join
path->num_batches = numbatches;//处理批数
//存储元组总数.
path->inner_rows_total = inner_path_rows_total;
//计算虚拟的桶数(buckets)
virtualbuckets = (double) numbuckets * (double) numbatches;
if (IsA(inner_path, UniquePath))
{
//内表唯一
innerbucketsize = 1.0 / virtualbuckets;
innermcvfreq = 0.0;
}
else//内表不唯一
{
innerbucketsize = 1.0;
innermcvfreq = 1.0;
foreach(hcl, hashclauses)//循环遍历hash条件
{
RestrictInfo *restrictinfo = lfirst_node(RestrictInfo, hcl);
Selectivity thisbucketsize;
Selectivity thismcvfreq;
if (bms_is_subset(restrictinfo->right_relids,
inner_path->parent->relids))
{
thisbucketsize = restrictinfo->right_bucketsize;
if (thisbucketsize < 0)
{
estimate_hash_bucket_stats(root,
get_rightop(restrictinfo->clause),
virtualbuckets,
&restrictinfo->right_mcvfreq,
&restrictinfo->right_bucketsize);
thisbucketsize = restrictinfo->right_bucketsize;
}
thismcvfreq = restrictinfo->right_mcvfreq;
}
else
{
Assert(bms_is_subset(restrictinfo->left_relids,
inner_path->parent->relids));
thisbucketsize = restrictinfo->left_bucketsize;
if (thisbucketsize < 0)
{
estimate_hash_bucket_stats(root,
get_leftop(restrictinfo->clause),
virtualbuckets,
&restrictinfo->left_mcvfreq,
&restrictinfo->left_bucketsize);
thisbucketsize = restrictinfo->left_bucketsize;
}
thismcvfreq = restrictinfo->left_mcvfreq;
}
if (innerbucketsize > thisbucketsize)
innerbucketsize = thisbucketsize;
if (innermcvfreq > thismcvfreq)
innermcvfreq = thismcvfreq;
}
}
if (relation_byte_size(clamp_row_est(inner_path_rows * innermcvfreq),
inner_path->pathtarget->width) >
(work_mem * 1024L))
startup_cost += disable_cost;//MCV的值如果大于work_mem,则禁用hash连接
cost_qual_eval(&hash_qual_cost, hashclauses, root);
cost_qual_eval(&qp_qual_cost, path->jpath.joinrestrictinfo, root);
qp_qual_cost.startup -= hash_qual_cost.startup;
qp_qual_cost.per_tuple -= hash_qual_cost.per_tuple;
if (path->jpath.jointype == JOIN_SEMI ||
path->jpath.jointype == JOIN_ANTI ||
extra->inner_unique)
{
double outer_matched_rows;
Selectivity inner_scan_frac;
outer_matched_rows = rint(outer_path_rows * extra->semifactors.outer_match_frac);
inner_scan_frac = 2.0 / (extra->semifactors.match_count + 1.0);
startup_cost += hash_qual_cost.startup;
run_cost += hash_qual_cost.per_tuple * outer_matched_rows *
clamp_row_est(inner_path_rows * innerbucketsize * inner_scan_frac) * 0.5;
run_cost += hash_qual_cost.per_tuple *
(outer_path_rows - outer_matched_rows) *
clamp_row_est(inner_path_rows / virtualbuckets) * 0.05;
//参与基础连接的元组数
if (path->jpath.jointype == JOIN_ANTI)
hashjointuples = outer_path_rows - outer_matched_rows;
else
hashjointuples = outer_matched_rows;
}
else
{
startup_cost += hash_qual_cost.startup;
run_cost += hash_qual_cost.per_tuple * outer_path_rows *
clamp_row_est(inner_path_rows * innerbucketsize) * 0.5;
hashjointuples = approx_tuple_count(root, &path->jpath, hashclauses);
}
startup_cost += qp_qual_cost.startup;
cpu_per_tuple = cpu_tuple_cost + qp_qual_cost.per_tuple;
run_cost += cpu_per_tuple * hashjointuples;
startup_cost += path->jpath.path.pathtarget->cost.startup;
run_cost += path->jpath.path.pathtarget->cost.per_tuple * path->jpath.path.rows;
path->jpath.path.startup_cost = startup_cost;
path->jpath.path.total_cost = startup_cost + run_cost;
}
三、跟踪分析
测试脚本如下
select a.*,b.grbh,b.je
from t_dwxx a,
lateral (select t1.dwbh,t1.grbh,t2.je
from t_grxx t1
inner join t_jfxx t2 on t1.dwbh = a.dwbh and t1.grbh = t2.grbh) b
order by b.dwbh;
启动gdb,设置断点
(gdb) b try_hashjoin_path
Breakpoint 1 at 0x7af2a4: file joinpath.c, line 737.
进入函数try_hashjoin_path,连接类型为JOIN_INNER
(gdb) c
Continuing.
Breakpoint 1, try_hashjoin_path (root=0x1b73980, joinrel=0x1b91d30, outer_path=0x1b866c0, inner_path=0x1b8e780,
hashclauses=0x1b93200, jointype=JOIN_INNER, extra=0x7ffc09955e80) at joinpath.c:737
737 required_outer = calc_non_nestloop_required_outer(outer_path,
进入initial_cost_hashjoin函数
(gdb) n
739 if (required_outer &&
(gdb)
751 initial_cost_hashjoin(root, &workspace, jointype, hashclauses,
(gdb) step
initial_cost_hashjoin (root=0x1b73980, workspace=0x7ffc09955d00, jointype=JOIN_INNER, hashclauses=0x1b93200,
outer_path=0x1b866c0, inner_path=0x1b8e780, extra=0x7ffc09955e80, parallel_hash=false) at costsize.c:3160
3160 Cost startup_cost = 0;
初始赋值,hash连接条件只有1个,即t_dwxx.dwbh = t_grxx.dwbh(连接信息参照上一节)
3160 Cost startup_cost = 0;
(gdb) n
3161 Cost run_cost = 0;
(gdb)
3162 double outer_path_rows = outer_path->rows;
(gdb)
3163 double inner_path_rows = inner_path->rows;
(gdb)
3164 double inner_path_rows_total = inner_path_rows;
(gdb)
3165 int num_hashclauses = list_length(hashclauses);
(gdb)
3172 startup_cost += outer_path->startup_cost;
(gdb) p num_hashclauses
$1 = 1
进入ExecChooseHashTableSize函数,该函数根据给定需要散列的关系的估计大小(行数和平均行宽度),计算散列表的合适大小。
...
(gdb) step
ExecChooseHashTableSize (ntuples=100000, tupwidth=9, useskew=true, try_combined_work_mem=false, parallel_workers=0,
space_allowed=0x7ffc09955c38, numbuckets=0x7ffc09955c4c, numbatches=0x7ffc09955c48, num_skew_mcvs=0x7ffc09955c44)
at nodeHash.c:677
获取元组大小->48Bytes,内部大小4800000Bytes(4.58M)和hash表大小->4194304(4M)
(gdb) n
690 tupsize = HJTUPLE_OVERHEAD +
(gdb)
693 inner_rel_bytes = ntuples * tupsize;
(gdb)
698 hash_table_bytes = work_mem * 1024L;
(gdb)
705 if (try_combined_work_mem)
(gdb) p tupsize
$2 = 48
(gdb) p inner_rel_bytes
$3 = 4800000
(gdb) p hash_table_bytes
$4 = 4194304
使用行倾斜技术
(gdb) n
708 *space_allowed = hash_table_bytes;
(gdb)
724 if (useskew)
(gdb)
726 skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
(gdb) p useskew
$5 = true
行倾斜桶数为635,调整后的hash表大小4110418,最大的指针数524288
...
(gdb) p *num_skew_mcvs
$7 = 635
(gdb) p hash_table_bytes
$8 = 4110418
...
(gdb) n
756 max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
(gdb)
758 mppow2 = 1L << my_log2(max_pointers);
(gdb)
759 if (max_pointers != mppow2)
(gdb)
764 max_pointers = Min(max_pointers, INT_MAX / 2);
(gdb)
766 dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
(gdb) p max_pointers
$10 = 524288
计算桶数=131072
...
(gdb) n
767 dbuckets = Min(dbuckets, max_pointers);
(gdb)
768 nbuckets = (int) dbuckets;
(gdb)
770 nbuckets = Max(nbuckets, 1024);
(gdb)
772 nbuckets = 1 << my_log2(nbuckets);
(gdb)
778 bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
(gdb)
779 if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
(gdb) p nbuckets
$11 = 131072
如果没有足够的空间,将需要多个批处理。
779 if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
(gdb) n
791 if (try_combined_work_mem)
计算桶数->131072和批次->2等信息
...
(gdb)
837 *numbuckets = nbuckets;
(gdb)
838 *numbatches = nbatch;
(gdb) p nbuckets
$12 = 131072
(gdb) p nbatch
$13 = 2
回到initial_cost_hashjoin
(gdb)
initial_cost_hashjoin (root=0x1b73980, workspace=0x7ffc09955d00, jointype=JOIN_INNER, hashclauses=0x1b93200,
outer_path=0x1b866c0, inner_path=0x1b8e780, extra=0x7ffc09955e80, parallel_hash=false) at costsize.c:3226
3226 if (numbatches > 1)
initial_cost_hashjoin函数执行完毕,查看计算结果
(gdb)
3246 workspace->inner_rows_total = inner_path_rows_total;
(gdb)
3247 }
(gdb) p workspace
$14 = (JoinCostWorkspace *) 0x7ffc09955d00
(gdb) p *workspace
$15 = {startup_cost = 3465, total_cost = 4261, run_cost = 796, inner_run_cost = 0,
inner_rescan_run_cost = 6.9525149533036983e-310, outer_rows = 3.7882102964330281e-317,
inner_rows = 1.4285501039407471e-316, outer_skip_rows = 1.4285501039407471e-316,
inner_skip_rows = 6.9525149532894692e-310, numbuckets = 131072, numbatches = 2, inner_rows_total = 100000}
(gdb)
设置断点,进入final_cost_hashjoin
(gdb) b final_cost_hashjoin
Breakpoint 2 at 0x7a0b49: file costsize.c, line 3265.
(gdb) c
Continuing.
Breakpoint 2, final_cost_hashjoin (root=0x1b73980, path=0x1b93238, workspace=0x7ffc09955d00, extra=0x7ffc09955e80)
at costsize.c:3265
3265 Path *outer_path = path->jpath.outerjoinpath;
赋值,并计算虚拟桶数
3265 Path *outer_path = path->jpath.outerjoinpath;
(gdb) n
3266 Path *inner_path = path->jpath.innerjoinpath;
...
(gdb)
3314 virtualbuckets = (double) numbuckets * (double) numbatches;
(gdb)
3326 if (IsA(inner_path, UniquePath))
(gdb) p virtualbuckets
$16 = 262144
(gdb)
开始遍历hash连接条件,确定桶大小分数和MCV频率之间的内在联系,更新连接条件中的相关信息。
3335 foreach(hcl, hashclauses)
(gdb)
3337 RestrictInfo *restrictinfo = lfirst_node(RestrictInfo, hcl);
内表在右端RTE=3,即t_grxx),更新相关信息
3349 if (bms_is_subset(restrictinfo->right_relids,
(gdb)
3353 thisbucketsize = restrictinfo->right_bucketsize;
(gdb)
3354 if (thisbucketsize < 0)
(gdb)
3357 estimate_hash_bucket_stats(root,
(gdb)
3358 get_rightop(restrictinfo->clause),
(gdb)
3357 estimate_hash_bucket_stats(root,
(gdb)
3362 thisbucketsize = restrictinfo->right_bucketsize;
(gdb) p *restrictinfo
$17 = {type = T_RestrictInfo, clause = 0x1b87260, is_pushed_down = true, outerjoin_delayed = false, can_join = true,
pseudoconstant = false, leakproof = false, security_level = 0, clause_relids = 0x1b87380, required_relids = 0x1b86c68,
outer_relids = 0x0, nullable_relids = 0x0, left_relids = 0x1b87340, right_relids = 0x1b87360, orclause = 0x0,
parent_ec = 0x1b85b88, eval_cost = {startup = 0, per_tuple = 0.0025000000000000001}, norm_selec = 0.0001,
outer_selec = -1, mergeopfamilies = 0x1b873c8, left_ec = 0x1b85b88, right_ec = 0x1b85b88, left_em = 0x1b85d58,
right_em = 0x1b85c80, scansel_cache = 0x1b92e50, outer_is_left = true, hashjoinoperator = 98, left_bucketsize = -1,
right_bucketsize = 0.00010013016921998598, left_mcvfreq = -1, right_mcvfreq = 0}
计算过程与先前介绍的类似,计算表达式的成本等等,具体可自行debug.
最终的计算结果
3505 }
(gdb) p *path
$19 = {jpath = {path = {type = T_HashPath, pathtype = T_HashJoin, parent = 0x1b91d30, pathtarget = 0x1b91f68,
param_info = 0x0, parallel_aware = false, parallel_safe = true, parallel_workers = 0, rows = 100000,
startup_cost = 3465, total_cost = 5386, pathkeys = 0x0}, jointype = JOIN_INNER, inner_unique = false,
outerjoinpath = 0x1b866c0, innerjoinpath = 0x1b8e780, joinrestrictinfo = 0x1b92208}, path_hashclauses = 0x1b93200,
num_batches = 2, inner_rows_total = 100000}
DONE!
四、参考资料
allpaths.c
cost.h
costsize.c
PG Document:Query Planning
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
PostgreSQL 源码解读(71)- 查询语句#56(make_one_rel函数#21-...
下载Word文档到电脑,方便收藏和打印~