PostgreSQL Source Code Reading (90) - Query Statement #75 (ExecHashJoin Function #1)
This installment looks at one of ExecProcNode's "real" execution functions, ExecHashJoin, which implements the Hash Join algorithm.
I. Data Structures
Plan
All plan nodes "derive" from the Plan structure by having it as their first field. This ensures that everything works when a node is cast to a Plan (node pointers are frequently cast to Plan * when passed around generically in the executor); a toy illustration follows the struct below.
typedef struct Plan
{
    NodeTag     type;           // node type
    Cost        startup_cost;   // cost expended before fetching any tuples
    Cost        total_cost;     // total cost (assuming all tuples are fetched)
    double      plan_rows;      // estimated number of rows emitted by the node
    int         plan_width;     // average row width in bytes
    bool        parallel_aware; // engage parallel-aware logic?
    bool        parallel_safe;  // OK to use as part of a parallel plan?
    int         plan_node_id;   // unique across the entire final plan tree
    List       *targetlist;     // target list to be computed at this node
    List       *qual;           // implicitly-ANDed qual conditions
    struct Plan *lefttree;      // input plan tree(s)
    struct Plan *righttree;
    List       *initPlan;       // Init SubPlan nodes (uncorrelated expression subselects)
    Bitmapset  *extParam;       // external params affecting this node or its children
    Bitmapset  *allParam;       // all params affecting this node or its children
} Plan;
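The "derivation" works because C guarantees that a pointer to a struct can be viewed as a pointer to its first member. A toy illustration of this first-field trick (toy types for illustration only, not PostgreSQL's):

#include <stdio.h>

/* Toy first-field "inheritance": any SortPlanLike * can safely be cast to
 * PlanLike *, which is how executor code passes node pointers around
 * generically.  Toy types only, not PostgreSQL's. */
typedef struct PlanLike
{
    int         type;           /* stands in for NodeTag */
    double      plan_rows;      /* estimated row count */
} PlanLike;

typedef struct SortPlanLike
{
    PlanLike    plan;           /* first field: the "base class" */
    int         sort_key;       /* derived-node-specific field */
} SortPlanLike;

int
main(void)
{
    SortPlanLike sort = {{42, 100.0}, 7};
    PlanLike   *p = (PlanLike *) &sort;     /* generic "Plan *" view */

    printf("type=%d rows=%.0f\n", p->type, p->plan_rows);
    return 0;
}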
JoinState
Base "class" for Hash/NestLoop/Merge Join
typedef struct JoinState
{
    PlanState   ps;             // "base class": PlanState
    JoinType    jointype;       // join type
    // True if we should skip to the next outer tuple as soon as a single
    // matching inner tuple has been found
    bool        single_match;
    // join condition expressions (in addition to ps.qual)
    ExprState  *joinqual;
} JoinState;
HashJoinState
Runtime state structure for Hash Join
typedef struct HashJoinTupleData *HashJoinTuple;
typedef struct HashJoinTableData *HashJoinTable;
typedef struct HashJoinState
{
    JoinState   js;                         // base class: JoinState
    ExprState  *hashclauses;                // hash join conditions
    List       *hj_OuterHashKeys;           // outer-side hash key expressions
    List       *hj_InnerHashKeys;           // inner-side hash key expressions
    List       *hj_HashOperators;           // hash operator OIDs
    HashJoinTable hj_HashTable;             // the hash table
    uint32      hj_CurHashValue;            // current hash value
    int         hj_CurBucketNo;             // current bucket number
    int         hj_CurSkewBucketNo;         // current skew-optimization bucket number
    HashJoinTuple hj_CurTuple;              // current tuple
    TupleTableSlot *hj_OuterTupleSlot;      // outer relation slot
    TupleTableSlot *hj_HashTupleSlot;       // hash tuple slot
    TupleTableSlot *hj_NullOuterTupleSlot;  // all-NULL outer slot, used for outer joins
    TupleTableSlot *hj_NullInnerTupleSlot;  // all-NULL inner slot, used for outer joins
    TupleTableSlot *hj_FirstOuterTupleSlot; // first outer tuple, possibly prefetched during hash table build
    int         hj_JoinState;               // current state of the state machine
    bool        hj_MatchedOuter;            // has the current outer tuple found a match?
    bool        hj_OuterNotEmpty;           // is the outer relation known to be non-empty?
} HashJoinState;
II. Source Code Analysis
ExecHashJoin implements the Hash Join algorithm; the real work is done by ExecHashJoinImpl.
ExecHashJoinImpl splits Hash Join into a number of phases/states (a finite state machine), tracked in the HashJoinState->hj_JoinState field: HJ_BUILD_HASHTABLE / HJ_NEED_NEW_OUTER / HJ_SCAN_BUCKET / HJ_FILL_OUTER_TUPLE / HJ_FILL_INNER_TUPLES / HJ_NEED_NEW_BATCH.
HJ_BUILD_HASHTABLE: build the hash table;
HJ_NEED_NEW_OUTER: fetch the next outer-relation tuple, compute the hash value of its join key, and locate the bucket (and batch) it falls into;
HJ_SCAN_BUCKET: scan the bucket and return matching tuples;
HJ_FILL_OUTER_TUPLE: the current outer tuple has run out of matches, so check whether to emit a null-extended outer-join tuple;
HJ_FILL_INNER_TUPLES: a batch has been completed, but this is a right/full outer join, so emit null-extended tuples for the unmatched inner rows;
HJ_NEED_NEW_BATCH: advance to the next batch.
Note: when work_mem cannot hold the entire hash table, the join runs in multiple batches. While one batch is being processed, any outer tuple whose hash value assigns it to a later batch is saved to that batch's file (hashtable->outerBatchFile[batchno]), which avoids scanning the outer relation more than once. The bucket/batch assignment is sketched below.
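For reference, the bucket and batch numbers are derived from the hash value by masking and shifting (nbuckets and nbatch are kept as powers of 2). A minimal sketch of the core of ExecHashGetBucketAndBatch, paraphrased and simplified from nodeHash.c:

/*
 * Sketch of ExecHashGetBucketAndBatch's core logic (paraphrased, simplified).
 * The low log2_nbuckets bits of the hash value pick the bucket; the next
 * bits pick the batch.  With a single batch, everything is batch 0.
 */
static inline void
get_bucket_and_batch(uint32 hashvalue, int nbuckets, int log2_nbuckets,
                     int nbatch, int *bucketno, int *batchno)
{
    *bucketno = hashvalue & (nbuckets - 1);     /* low bits -> bucket */
    if (nbatch > 1)
        *batchno = (hashvalue >> log2_nbuckets) & (nbatch - 1);  /* next bits -> batch */
    else
        *batchno = 0;
}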
static TupleTableSlot *
ExecHashJoin(PlanState *pstate)
{
    return ExecHashJoinImpl(pstate, false);
}
#define HJ_BUILD_HASHTABLE 1
#define HJ_NEED_NEW_OUTER 2
#define HJ_SCAN_BUCKET 3
#define HJ_FILL_OUTER_TUPLE 4
#define HJ_FILL_INNER_TUPLES 5
#define HJ_NEED_NEW_BATCH 6
#define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
                                                 HashJoinState *hjstate,
                                                 uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                                         HashJoinState *hjstate,
                                                         uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                                                 BufFile *file,
                                                 uint32 *hashvalue,
                                                 TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    HashJoinState *node = castNode(HashJoinState, pstate);
    PlanState  *outerNode;
    HashState  *hashNode;
    ExprState  *joinqual;
    ExprState  *otherqual;
    ExprContext *econtext;
    HashJoinTable hashtable;
    TupleTableSlot *outerTupleSlot;
    uint32      hashvalue;
    int         batchno;
    ParallelHashJoinState *parallel_state;

    joinqual = node->js.joinqual;
    otherqual = node->js.ps.qual;
    hashNode = (HashState *) innerPlanState(node);
    outerNode = outerPlanState(node);
    hashtable = node->hj_HashTable;
    econtext = node->js.ps.ps_ExprContext;
    parallel_state = hashNode->parallel_state;

    ResetExprContext(econtext);

    for (;;)
    {
        CHECK_FOR_INTERRUPTS();

        switch (node->hj_JoinState)
        {
            case HJ_BUILD_HASHTABLE:    // --> HJ_BUILD_HASHTABLE phase
                Assert(hashtable == NULL);
                if (HJ_FILL_INNER(node))
                {
                    // right/full join: building the hash table cannot be avoided
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (parallel)
                {
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (HJ_FILL_OUTER(node) ||
                         (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
                          !node->hj_OuterNotEmpty))
                {
                    node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
                    if (TupIsNull(node->hj_FirstOuterTupleSlot))
                    {
                        node->hj_OuterNotEmpty = false;
                        return NULL;
                    }
                    else
                        node->hj_OuterNotEmpty = true;
                }
                else
                    node->hj_FirstOuterTupleSlot = NULL;

                hashtable = ExecHashTableCreate(hashNode,
                                                node->hj_HashOperators,
                                                HJ_FILL_INNER(node));
                node->hj_HashTable = hashtable;
                hashNode->hashtable = hashtable;
                (void) MultiExecProcNode((PlanState *) hashNode);
                if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
                    return NULL;
                hashtable->nbatch_outstart = hashtable->nbatch;

                node->hj_OuterNotEmpty = false; // reset OuterNotEmpty to false

                if (parallel)
                {
                    // parallel execution
                    Barrier    *build_barrier;

                    build_barrier = &parallel_state->build_barrier;
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
                           BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
                    {
                        if (hashtable->nbatch > 1)
                            ExecParallelHashJoinPartitionOuter(node);
                        BarrierArriveAndWait(build_barrier,
                                             WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
                    }
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
                    // each background worker must choose batches for itself
                    hashtable->curbatch = -1;
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;   // next iteration
                }
                else
                    // non-parallel execution: switch hj_JoinState
                    node->hj_JoinState = HJ_NEED_NEW_OUTER;

                /* FALL THRU */

            case HJ_NEED_NEW_OUTER:     // --> HJ_NEED_NEW_OUTER phase
                if (parallel)
                    outerTupleSlot =
                        ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                          &hashvalue);  // parallel execution
                else
                    outerTupleSlot =
                        ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);     // serial execution
                if (TupIsNull(outerTupleSlot))
                {
                    // outerTupleSlot is NULL:
                    // end of this batch, or maybe the whole join (or inner fill is needed)
                    if (HJ_FILL_INNER(node))    // hj_NullOuterTupleSlot != NULL
                    {
                        // right/full join: fill unmatched inner rows with NULLs
                        ExecPrepHashTableForUnmatched(node);
                        node->hj_JoinState = HJ_FILL_INNER_TUPLES;
                    }
                    else
                        node->hj_JoinState = HJ_NEED_NEW_BATCH; // need the next batch
                    continue;
                }
                // set up variables
                econtext->ecxt_outertuple = outerTupleSlot;
                node->hj_MatchedOuter = false;
                node->hj_CurHashValue = hashvalue;
                // find the hash bucket and batch for this hash value
                ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                          &node->hj_CurBucketNo, &batchno);
                // skew optimization (for values with very many rows)
                node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
                                                                 hashvalue);
                node->hj_CurTuple = NULL;
                if (batchno != hashtable->curbatch &&
                    node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                {
                    Assert(parallel_state == NULL);
                    Assert(batchno > hashtable->curbatch);
                    // the tuple belongs to a later batch: save it to that batch's file
                    ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
                                          hashvalue,
                                          &hashtable->outerBatchFile[batchno]);
                    // loop around, staying in the HJ_NEED_NEW_OUTER state
                    continue;
                }
                // done with this phase: switch to the HJ_SCAN_BUCKET state
                node->hj_JoinState = HJ_SCAN_BUCKET;

                /* FALL THRU */

            case HJ_SCAN_BUCKET:        // --> HJ_SCAN_BUCKET phase
                if (parallel)
                {
                    // parallel execution
                    if (!ExecParallelScanHashBucket(node, econtext))
                    {
                        // no match: check for a possible outer-join fill,
                        // switching the state to HJ_FILL_OUTER_TUPLE
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }
                else
                {
                    // non-parallel execution
                    if (!ExecScanHashBucket(node, econtext))
                    {
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;   // as above
                        continue;
                    }
                }
                if (joinqual == NULL || ExecQual(joinqual, econtext))
                {
                    node->hj_MatchedOuter = true;
                    HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
                    // anti join: must not return matched tuples
                    if (node->js.jointype == JOIN_ANTI)
                    {
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;
                        continue;
                    }
                    if (node->js.single_match)
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;
                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);    // perform the projection
                    else
                        InstrCountFiltered2(node, 1);   // other quals not satisfied
                }
                else
                    InstrCountFiltered1(node, 1);   // join quals not satisfied
                break;

            case HJ_FILL_OUTER_TUPLE:   // --> HJ_FILL_OUTER_TUPLE phase
                node->hj_JoinState = HJ_NEED_NEW_OUTER; // switch the state to HJ_NEED_NEW_OUTER
                if (!node->hj_MatchedOuter &&
                    HJ_FILL_OUTER(node))
                {
                    econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);    // projection
                    else
                        InstrCountFiltered2(node, 1);
                }
                break;

            case HJ_FILL_INNER_TUPLES:  // --> HJ_FILL_INNER_TUPLES phase
                if (!ExecScanHashTableForUnmatched(node, econtext))
                {
                    // no more unmatched tuples: switch to HJ_NEED_NEW_BATCH (start the next batch)
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }
                econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
                if (otherqual == NULL || ExecQual(otherqual, econtext))
                    return ExecProject(node->js.ps.ps_ProjInfo);
                else
                    InstrCountFiltered2(node, 1);
                break;

            case HJ_NEED_NEW_BATCH:     // --> HJ_NEED_NEW_BATCH phase
                if (parallel)
                {
                    // parallel execution
                    if (!ExecParallelHashJoinNewBatch(node))
                        return NULL;
                }
                else
                {
                    // non-parallel execution
                    if (!ExecHashJoinNewBatch(node))
                        return NULL;
                }
                node->hj_JoinState = HJ_NEED_NEW_OUTER; // switch state
                break;

            default:    // invalid JoinState
                elog(ERROR, "unrecognized hashjoin state: %d",
                     (int) node->hj_JoinState);
        }
    }
}
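A note on the HJ_FILL_OUTER / HJ_FILL_INNER macros used above: they test whether the corresponding all-NULL slot exists, because ExecInitHashJoin only allocates those slots for join types that need null-extension on that side. A condensed paraphrase of that initialization logic (see ExecInitHashJoin in nodeHashjoin.c; not a verbatim quote):

switch (node->join.jointype)
{
    case JOIN_INNER:
    case JOIN_SEMI:
        break;                          /* no null-extension needed */
    case JOIN_LEFT:
    case JOIN_ANTI:
        /* left/anti join: may emit outer tuples with a NULL inner side */
        hjstate->hj_NullInnerTupleSlot =
            ExecInitNullTupleSlot(estate,
                                  ExecGetResultType(innerPlanState(hjstate)));
        break;
    case JOIN_RIGHT:
        /* right join: may emit inner tuples with a NULL outer side */
        hjstate->hj_NullOuterTupleSlot =
            ExecInitNullTupleSlot(estate,
                                  ExecGetResultType(outerPlanState(hjstate)));
        break;
    case JOIN_FULL:
        /* full join: both sides may need null-extension */
        hjstate->hj_NullOuterTupleSlot =
            ExecInitNullTupleSlot(estate,
                                  ExecGetResultType(outerPlanState(hjstate)));
        hjstate->hj_NullInnerTupleSlot =
            ExecInitNullTupleSlot(estate,
                                  ExecGetResultType(innerPlanState(hjstate)));
        break;
    default:
        elog(ERROR, "unrecognized join type: %d", (int) node->join.jointype);
}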
III. Tracing and Analysis
The test query and its plan are as follows:
testdb=# explain verbose select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je
testdb(# from t_grxx gr inner join t_jfxx jf
testdb(# on gr.dwbh = dw.dwbh
testdb(# and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
QUERY PLAN
-----------------------------------------------------------------------------------------------
Sort (cost=14828.83..15078.46 rows=99850 width=47)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
Sort Key: dw.dwbh
-> Hash Join (cost=3176.00..6537.55 rows=99850 width=47)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
Hash Cond: ((gr.grbh)::text = (jf.grbh)::text)
-> Hash Join (cost=289.00..2277.61 rows=99850 width=32)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm
Inner Unique: true
Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
-> Seq Scan on public.t_grxx gr (cost=0.00..1726.00 rows=100000 width=16)
Output: gr.dwbh, gr.grbh, gr.xm, gr.xb, gr.nl
-> Hash (cost=164.00..164.00 rows=10000 width=20)
Output: dw.dwmc, dw.dwbh, dw.dwdz
-> Seq Scan on public.t_dwxx dw (cost=0.00..164.00 rows=10000 width=20)
Output: dw.dwmc, dw.dwbh, dw.dwdz
-> Hash (cost=1637.00..1637.00 rows=100000 width=20)
Output: jf.ny, jf.je, jf.grbh
-> Seq Scan on public.t_jfxx jf (cost=0.00..1637.00 rows=100000 width=20)
Output: jf.ny, jf.je, jf.grbh
(20 rows)
Start gdb, set a breakpoint, and enter ExecHashJoin:
(gdb) b ExecHashJoin
Breakpoint 1 at 0x70292e: file nodeHashjoin.c, line 565.
(gdb) c
Continuing.
Breakpoint 1, ExecHashJoin (pstate=0x2ee1a88) at nodeHashjoin.c:565
565 return ExecHashJoinImpl(pstate, false);
Continue execution; the next hit is the second Hash Join node, i.e. the join of t_grxx and t_dwxx:
(gdb) n
Breakpoint 1, ExecHashJoin (pstate=0x2ee1d98) at nodeHashjoin.c:565
565 return ExecHashJoinImpl(pstate, false);
Inspect the input argument: ExecProcNode = ExecProcNodeReal = ExecHashJoin.
(gdb) p *pstate
$8 = {type = T_HashJoinState, plan = 0x2faaff8, state = 0x2ee1758, ExecProcNode = 0x70291d <ExecHashJoin>,
ExecProcNodeReal = 0x70291d <ExecHashJoin>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x2ee2070, righttree = 0x2ee2918, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2f20d98, ps_ExprContext = 0x2ee1fb0, ps_ProjInfo = 0x2ee3550, scandesc = 0x0}
(gdb)
pstate's lefttree corresponds to the SeqScan and its righttree to the Hash: the left subtree (outer relation) is the relation produced by sequentially scanning t_grxx, and the right subtree (inner relation) is the relation produced by sequentially scanning t_dwxx (on which the hash table is built).
(gdb) p *pstate->lefttree
$6 = {type = T_SeqScanState, plan = 0x2fa8ff0, state = 0x2ee1758, ExecProcNode = 0x6e4bde <ExecProcNodeFirst>,
ExecProcNodeReal = 0x71578d <ExecSeqScan>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2ee27d8, ps_ExprContext = 0x2ee2188, ps_ProjInfo = 0x0, scandesc = 0x7f0710d02bd0}
(gdb) p *pstate->righttree
$9 = {type = T_HashState, plan = 0x2faaf60, state = 0x2ee1758, ExecProcNode = 0x6e4bde <ExecProcNodeFirst>,
ExecProcNodeReal = 0x6fc015 <ExecHash>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x2ee2af0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2ee3278, ps_ExprContext = 0x2ee2a30, ps_ProjInfo = 0x0, scandesc = 0x0}
Step into ExecHashJoinImpl:
(gdb) step
ExecHashJoinImpl (pstate=0x2ee1d98, parallel=false) at nodeHashjoin.c:167
167 HashJoinState *node = castNode(HashJoinState, pstate);
After the assignments, inspect the HashJoinState and other variables:
(gdb) n
182 joinqual = node->js.joinqual;
(gdb) n
183 otherqual = node->js.ps.qual;
(gdb)
184 hashNode = (HashState *) innerPlanState(node);
(gdb)
185 outerNode = outerPlanState(node);
(gdb)
186 hashtable = node->hj_HashTable;
(gdb)
187 econtext = node->js.ps.ps_ExprContext;
(gdb)
188 parallel_state = hashNode->parallel_state;
(gdb)
194 ResetExprContext(econtext);
(gdb) p *node
$10 = {js = {ps = {type = T_HashJoinState, plan = 0x2faaff8, state = 0x2ee1758, ExecProcNode = 0x70291d <ExecHashJoin>,
ExecProcNodeReal = 0x70291d <ExecHashJoin>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x2ee2070, righttree = 0x2ee2918, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2f20d98, ps_ExprContext = 0x2ee1fb0, ps_ProjInfo = 0x2ee3550, scandesc = 0x0},
jointype = JOIN_INNER, single_match = true, joinqual = 0x0}, hashclauses = 0x2f21430, hj_OuterHashKeys = 0x2f22230,
hj_InnerHashKeys = 0x2f22740, hj_HashOperators = 0x2f227a0, hj_HashTable = 0x0, hj_CurHashValue = 0, hj_CurBucketNo = 0,
hj_CurSkewBucketNo = -1, hj_CurTuple = 0x0, hj_OuterTupleSlot = 0x2f212f0, hj_HashTupleSlot = 0x2ee3278,
hj_NullOuterTupleSlot = 0x0, hj_NullInnerTupleSlot = 0x0, hj_FirstOuterTupleSlot = 0x0, hj_JoinState = 1,
hj_MatchedOuter = false, hj_OuterNotEmpty = false}
(gdb) p *otherqual
Cannot access memory at address 0x0
(gdb) p *hashNode
$11 = {ps = {type = T_HashState, plan = 0x2faaf60, state = 0x2ee1758, ExecProcNode = 0x6e4bde <ExecProcNodeFirst>,
ExecProcNodeReal = 0x6fc015 <ExecHash>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x2ee2af0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2ee3278, ps_ExprContext = 0x2ee2a30, ps_ProjInfo = 0x0, scandesc = 0x0}, hashtable = 0x0,
hashkeys = 0x2f22740, shared_info = 0x0, hinstrument = 0x0, parallel_state = 0x0}
(gdb) p *hashtable
Cannot access memory at address 0x0
(gdb) p parallel_state
$12 = (ParallelHashJoinState *) 0x0
(gdb)
Enter the HJ_BUILD_HASHTABLE logic to build the hash table.
(gdb) p node->hj_JoinState
$13 = 1
HJ_BUILD_HASHTABLE -> perform the relevant checks; this example is an inner join, so the FILL_OUTER/FILL_INNER cases do not apply:
(gdb) n
216 Assert(hashtable == NULL);
(gdb)
241 if (HJ_FILL_INNER(node))
(gdb)
246 else if (parallel)
(gdb)
258 else if (HJ_FILL_OUTER(node) ||
(gdb)
259 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
(gdb)
HJ_BUILD_HASHTABLE -> the outer node's startup cost is lower than the total cost of building the hash table (0.00 for the Seq Scan on t_grxx vs. 164.00 for the Hash node in the plan above) and the outer relation is not yet known to be non-empty (node->hj_OuterNotEmpty is initialized to false), so fetch the first tuple of the outer relation: if it is NULL the join can return NULL immediately; otherwise set node->hj_OuterNotEmpty to true.
258 else if (HJ_FILL_OUTER(node) ||
(gdb)
260 !node->hj_OuterNotEmpty))
(gdb)
259 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
(gdb)
262 node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
(gdb)
263 if (TupIsNull(node->hj_FirstOuterTupleSlot))
(gdb)
269 node->hj_OuterNotEmpty = true;
HJ_BUILD_HASHTABLE -> create the hash table:
(gdb) n
263 if (TupIsNull(node->hj_FirstOuterTupleSlot))
(gdb)
281 HJ_FILL_INNER(node));
(gdb)
279 hashtable = ExecHashTableCreate(hashNode,
(gdb)
HJ_BUILD_HASHTABLE -> the in-memory state of the hash table (HashJoinTable structure):
the bucket count is 16384 (16K), whose base-2 logarithm is 14 (the value of log2_nbuckets / log2_nbuckets_optimal);
skewEnabled is false, so the skew optimization is not in use.
(gdb) p *hashtable
$14 = {nbuckets = 16384, log2_nbuckets = 14, nbuckets_original = 16384, nbuckets_optimal = 16384,
log2_nbuckets_optimal = 14, buckets = {unshared = 0x2fb1260, shared = 0x2fb1260}, keepNulls = false, skewEnabled = false,
skewBucket = 0x0, skewBucketLen = 0, nSkewBuckets = 0, skewBucketNums = 0x0, nbatch = 1, curbatch = 0,
nbatch_original = 1, nbatch_outstart = 1, growEnabled = true, totalTuples = 0, partialTuples = 0, skewTuples = 0,
innerBatchFile = 0x0, outerBatchFile = 0x0, outer_hashfunctions = 0x3053b68, inner_hashfunctions = 0x3053bc0,
hashStrict = 0x3053c18, spaceUsed = 0, spaceAllowed = 16777216, spacePeak = 0, spaceUsedSkew = 0,
spaceAllowedSkew = 335544, hashCxt = 0x3053a50, batchCxt = 0x2f8b170, chunks = 0x0, current_chunk = 0x0, area = 0x0,
parallel_state = 0x0, batches = 0x0, current_chunk_shared = 9187201950435737471}
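Two of these fields can be sanity-checked by arithmetic: spaceAllowed = 16777216 (16MB, corresponding to work_mem in this session), and spaceAllowedSkew = 335544, i.e. 2% of spaceAllowed, matching SKEW_WORK_MEM_PERCENT (2) in hashjoin.h. A trivial standalone check, assuming that constant:

#include <stdio.h>

/* spaceAllowedSkew = spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
 * SKEW_WORK_MEM_PERCENT is 2 in src/include/executor/hashjoin.h. */
int
main(void)
{
    long        spaceAllowed = 16777216;        /* 16MB, from the gdb dump */
    long        spaceAllowedSkew = spaceAllowed * 2 / 100;

    printf("%ld\n", spaceAllowedSkew);          /* prints 335544, as in the dump */
    return 0;
}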
HJ_BUILD_HASHTABLE -> the hash functions used:
(gdb) p *hashtable->inner_hashfunctions
$15 = {fn_addr = 0x4c8a0a <hashtext>, fn_oid = 400, fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002',
fn_extra = 0x0, fn_mcxt = 0x3053a50, fn_expr = 0x0}
(gdb) p *hashtable->outer_hashfunctions
$16 = {fn_addr = 0x4c8a0a <hashtext>, fn_oid = 400, fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002',
fn_extra = 0x0, fn_mcxt = 0x3053a50, fn_expr = 0x0}
HJ_BUILD_HASHTABLE -> store the hash table and run the Hash node; the total number of tuples hashed is 10000 (all of t_dwxx):
(gdb) n
289 hashNode->hashtable = hashtable;
(gdb)
290 (void) MultiExecProcNode((PlanState *) hashNode);
(gdb)
297 if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
(gdb) p hashtable->totalTuples
$18 = 10000
HJ_BUILD_HASHTABLE -> nbatch is 1, so only a single batch is needed:
(gdb) n
304 hashtable->nbatch_outstart = hashtable->nbatch;
(gdb) p hashtable->nbatch
$19 = 1
HJ_BUILD_HASHTABLE -> reset OuterNotEmpty to false:
(gdb) n
311 node->hj_OuterNotEmpty = false;
(gdb)
313 if (parallel)
HJ_BUILD_HASHTABLE -> non-parallel execution, so switch the state to HJ_NEED_NEW_OUTER:
(gdb)
313 if (parallel)
(gdb) n
340 node->hj_JoinState = HJ_NEED_NEW_OUTER;
HJ_NEED_NEW_OUTER -> fetch the next tuple of the outer relation (via ExecHashJoinOuterGetTuple):
349 if (parallel)
(gdb) n
354 outerTupleSlot =
(gdb)
357 if (TupIsNull(outerTupleSlot))
(gdb) p *outerTupleSlot
$20 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = true,
tts_tuple = 0x2f88300, tts_tupleDescriptor = 0x7f0710d02bd0, tts_mcxt = 0x2ee1640, tts_buffer = 507, tts_nvalid = 1,
tts_values = 0x2ee22a8, tts_isnull = 0x2ee22d0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {
bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 2, tts_fixedTupleDescriptor = true}
HJ_NEED_NEW_OUTER -> set the related variables:
(gdb) n
371 econtext->ecxt_outertuple = outerTupleSlot;
(gdb)
372 node->hj_MatchedOuter = false;
(gdb)
378 node->hj_CurHashValue = hashvalue;
(gdb)
379 ExecHashGetBucketAndBatch(hashtable, hashvalue,
(gdb) p hashvalue
$21 = 2324234220
(gdb) n
381 node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
(gdb)
383 node->hj_CurTuple = NULL;
(gdb) p *node
$22 = {js = {ps = {type = T_HashJoinState, plan = 0x2faaff8, state = 0x2ee1758, ExecProcNode = 0x70291d <ExecHashJoin>,
ExecProcNodeReal = 0x70291d <ExecHashJoin>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x2ee2070, righttree = 0x2ee2918, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2f20d98, ps_ExprContext = 0x2ee1fb0, ps_ProjInfo = 0x2ee3550, scandesc = 0x0},
jointype = JOIN_INNER, single_match = true, joinqual = 0x0}, hashclauses = 0x2f21430, hj_OuterHashKeys = 0x2f22230,
hj_InnerHashKeys = 0x2f22740, hj_HashOperators = 0x2f227a0, hj_HashTable = 0x2f88ee8, hj_CurHashValue = 2324234220,
hj_CurBucketNo = 16364, hj_CurSkewBucketNo = -1, hj_CurTuple = 0x0, hj_OuterTupleSlot = 0x2f212f0,
hj_HashTupleSlot = 0x2ee3278, hj_NullOuterTupleSlot = 0x0, hj_NullInnerTupleSlot = 0x0, hj_FirstOuterTupleSlot = 0x0,
hj_JoinState = 2, hj_MatchedOuter = false, hj_OuterNotEmpty = true}
(gdb) p *econtext
$25 = {type = T_ExprContext, ecxt_scantuple = 0x0, ecxt_innertuple = 0x0, ecxt_outertuple = 0x2ee2248,
ecxt_per_query_memory = 0x2ee1640, ecxt_per_tuple_memory = 0x2f710c0, ecxt_param_exec_vals = 0x0,
ecxt_param_list_info = 0x0, ecxt_aggvalues = 0x0, ecxt_aggnulls = 0x0, caseValue_datum = 0, caseValue_isNull = true,
domainValue_datum = 0, domainValue_isNull = true, ecxt_estate = 0x2ee1758, ecxt_callbacks = 0x0}
(gdb) p *node->hj_HashTupleSlot
$26 = {type = T_TupleTableSlot, tts_isempty = true, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false,
tts_tuple = 0x0, tts_tupleDescriptor = 0x2ee3060, tts_mcxt = 0x2ee1640, tts_buffer = 0, tts_nvalid = 0,
tts_values = 0x2ee32d8, tts_isnull = 0x2ee32f0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {
bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}
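The hj_CurBucketNo = 16364 shown above follows directly from the masking arithmetic sketched in section II: with nbuckets = 16384 and nbatch = 1, hash value 2324234220 maps to bucket 2324234220 & 16383 = 16364 and batch 0. A standalone cross-check:

#include <stdio.h>
#include <stdint.h>

/* Reproduce the bucket/batch assignment observed in gdb:
 * hashvalue = 2324234220, nbuckets = 16384 (log2 = 14), nbatch = 1. */
int
main(void)
{
    uint32_t    hashvalue = 2324234220u;
    int         nbuckets = 16384;
    int         log2_nbuckets = 14;
    int         nbatch = 1;
    int         bucketno = (int) (hashvalue & (nbuckets - 1));
    int         batchno = (nbatch > 1) ?
                    (int) ((hashvalue >> log2_nbuckets) & (nbatch - 1)) : 0;

    printf("bucketno=%d batchno=%d\n", bucketno, batchno);  /* bucketno=16364 batchno=0 */
    return 0;
}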
HJ_NEED_NEW_OUTER -> switch the state to HJ_SCAN_BUCKET and start scanning the hash table:
(gdb) n
407 node->hj_JoinState = HJ_SCAN_BUCKET;
(gdb)
HJ_SCAN_BUCKET -> no match found; switch the state to HJ_FILL_OUTER_TUPLE:
(gdb)
416 if (parallel)
(gdb) n
427 if (!ExecScanHashBucket(node, econtext))
(gdb)
430 node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
(gdb)
431 continue;
(gdb)
HJ_FILL_OUTER_TUPLE -> switch the state to HJ_NEED_NEW_OUTER.
Whether or not a tuple was emitted here, the next state is NEED_NEW_OUTER:
209 switch (node->hj_JoinState)
(gdb)
483 node->hj_JoinState = HJ_NEED_NEW_OUTER;
HJ_FILL_OUTER_TUPLE -> this is not an outer join, so there is nothing to fill; go back to the HJ_NEED_NEW_OUTER logic:
(gdb) n
485 if (!node->hj_MatchedOuter &&
(gdb)
486 HJ_FILL_OUTER(node))
(gdb)
485 if (!node->hj_MatchedOuter &&
(gdb)
549 }
(gdb)
HJ_SCAN_BUCKET -> set a breakpoint at the point where the bucket scan succeeds:
(gdb) b nodeHashjoin.c:441
Breakpoint 3 at 0x7025c3: file nodeHashjoin.c, line 441.
(gdb) c
Continuing.
Breakpoint 3, ExecHashJoinImpl (pstate=0x2ee1d98, parallel=false) at nodeHashjoin.c:447
447 if (joinqual == NULL || ExecQual(joinqual, econtext))
HJ_SCAN_BUCKET -> a matching tuple was found; set the relevant flags:
(gdb) n
449 node->hj_MatchedOuter = true;
(gdb)
450 HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
(gdb)
453 if (node->js.jointype == JOIN_ANTI)
(gdb) n
464 if (node->js.single_match)
(gdb)
465 node->hj_JoinState = HJ_NEED_NEW_OUTER;
(gdb)
HJ_SCAN_BUCKET -> perform the projection and return:
467 if (otherqual == NULL || ExecQual(otherqual, econtext))
(gdb)
468 return ExecProject(node->js.ps.ps_ProjInfo);
(gdb)
In short, Hash Join first builds a hash table over the inner relation, then fetches outer tuples one at a time; each matching pair is projected and returned. Apart from the one-time hash table build, the remaining steps keep cycling through the states above until the number of tuples required by the Portal has been produced.
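To make the algorithm concrete, here is a self-contained toy hash join in plain C (integer keys, single batch, inner join only; all names are hypothetical and unrelated to the PostgreSQL implementation):

#include <stdio.h>
#include <stdlib.h>

/* Toy in-memory hash join: build a hash table over the inner input, then
 * probe it with each outer tuple -- a sketch of the idea only, not of
 * PostgreSQL's actual code. */
#define NBUCKETS 8                      /* power of 2, so we can mask */

typedef struct HashEntry
{
    int         key;
    int         payload;
    struct HashEntry *next;             /* bucket chain */
} HashEntry;

static HashEntry *buckets[NBUCKETS];

static unsigned
hash_int(int key)
{
    return (unsigned) key * 2654435761u;    /* Knuth multiplicative hash */
}

/* HJ_BUILD_HASHTABLE in miniature: insert every inner tuple */
static void
build(const int keys[], const int payloads[], int n)
{
    for (int i = 0; i < n; i++)
    {
        HashEntry  *e = malloc(sizeof(HashEntry));
        unsigned    b = hash_int(keys[i]) & (NBUCKETS - 1);

        e->key = keys[i];
        e->payload = payloads[i];
        e->next = buckets[b];
        buckets[b] = e;
    }
}

/* HJ_NEED_NEW_OUTER + HJ_SCAN_BUCKET in miniature: probe with outer tuples */
static void
probe(const int keys[], int n)
{
    for (int i = 0; i < n; i++)
    {
        unsigned    b = hash_int(keys[i]) & (NBUCKETS - 1);

        for (HashEntry *e = buckets[b]; e != NULL; e = e->next)
        {
            if (e->key == keys[i])      /* recheck: distinct keys can share a bucket */
                printf("match: key=%d payload=%d\n", keys[i], e->payload);
        }
    }
}

int
main(void)
{
    int         inner_keys[] = {1, 2, 3};
    int         inner_payloads[] = {10, 20, 30};
    int         outer_keys[] = {2, 3, 4};

    build(inner_keys, inner_payloads, 3);
    probe(outer_keys, 3);               /* prints matches for keys 2 and 3 */
    return 0;
}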
IV. References
Hash Joins: Past, Present and Future/PGCon 2017
A Look at How Postgres Executes a Tiny Join - Part 1
A Look at How Postgres Executes a Tiny Join - Part 2
Assignment 2 Symmetric Hash Join