我的编程空间,编程开发者的网络收藏夹
学习永远不晚

PostgreSQL 源码解读(91)- 查询语句#76(ExecHashJoin函数#2)

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

PostgreSQL 源码解读(91)- 查询语句#76(ExecHashJoin函数#2)

本节是ExecHashJoin函数介绍的第二部分,主要介绍了ExecHashJoin中依赖的其他函数的实现逻辑,包括ExecHashTableCreate、ExecChooseHashTableSize等。

一、数据结构

Plan
所有计划节点通过将Plan结构作为第一个字段从Plan结构“派生”。这确保了在将节点转换为计划节点时,一切都能正常工作。(在执行器中以通用方式传递时,节点指针经常被转换为Plan *)


typedef struct Plan
{
    NodeTag     type;//节点类型

    
    Cost        startup_cost;   
    Cost        total_cost;     

    
    double      plan_rows;      
    int         plan_width;     

    
    bool        parallel_aware; 
    bool        parallel_safe;  

    
    int         plan_node_id;   
    List       *targetlist;     
    List       *qual;           
    struct Plan *lefttree;      
    struct Plan *righttree;
    List       *initPlan;       

    
    Bitmapset  *extParam;
    Bitmapset  *allParam;
} Plan;

JoinState
Hash/NestLoop/Merge Join的基类


typedef struct JoinState
{
    PlanState   ps;//基类PlanState
    JoinType    jointype;//连接类型
    //在找到一个匹配inner tuple的时候,如需要跳转到下一个outer tuple,则该值为T
    bool        single_match;   
    //连接条件表达式(除了ps.qual)
    ExprState  *joinqual;       
} JoinState;

HashJoinState
Hash Join运行期状态结构体


typedef struct HashJoinTupleData *HashJoinTuple;
typedef struct HashJoinTableData *HashJoinTable;

typedef struct HashJoinState
{
    JoinState   js;             
    ExprState  *hashclauses;//hash连接条件
    List       *hj_OuterHashKeys;   
    List       *hj_InnerHashKeys;   
    List       *hj_HashOperators;   
    HashJoinTable hj_HashTable;//Hash表
    uint32      hj_CurHashValue;//当前的Hash值
    int         hj_CurBucketNo;//当前的bucket编号
    int         hj_CurSkewBucketNo;//行倾斜bucket编号
    HashJoinTuple hj_CurTuple;//当前元组
    TupleTableSlot *hj_OuterTupleSlot;//outer relation slot
    TupleTableSlot *hj_HashTupleSlot;//Hash tuple slot
    TupleTableSlot *hj_NullOuterTupleSlot;//用于外连接的outer虚拟slot
    TupleTableSlot *hj_NullInnerTupleSlot;//用于外连接的inner虚拟slot
    TupleTableSlot *hj_FirstOuterTupleSlot;//
    int         hj_JoinState;//JoinState状态
    bool        hj_MatchedOuter;//是否匹配
    bool        hj_OuterNotEmpty;//outer relation是否为空
} HashJoinState;

HashJoinTable
Hash表数据结构

typedef struct HashJoinTableData
{
    int         nbuckets;       
    int         log2_nbuckets;  

    int         nbuckets_original;  
    int         nbuckets_optimal;   
    int         log2_nbuckets_optimal;  

    
    //bucket [i]是内存中第i个桶中的元组链表的head item
    union
    {
        
        //未共享数组是按批处理存储的,所有元组均如此
        struct HashJoinTupleData **unshared;
        
        //共享数组是每个查询的DSA区域,所有元组均如此
        dsa_pointer_atomic *shared;
    }           buckets;

    bool        keepNulls;      

    bool        skewEnabled;    
    HashSkewBucket **skewBucket;    
    int         skewBucketLen;  
    int         nSkewBuckets;   
    int        *skewBucketNums; 

    int         nbatch;         
    int         curbatch;       

    int         nbatch_original;    
    int         nbatch_outstart;    

    bool        growEnabled;    

    double      totalTuples;    
    double      partialTuples;  
    double      skewTuples;     

    
    BufFile   **innerBatchFile; 
    BufFile   **outerBatchFile; 

    
    FmgrInfo   *outer_hashfunctions;    
    FmgrInfo   *inner_hashfunctions;    
    bool       *hashStrict;     

    Size        spaceUsed;      
    Size        spaceAllowed;   
    Size        spacePeak;      
    Size        spaceUsedSkew;  
    Size        spaceAllowedSkew;   

    MemoryContext hashCxt;      
    MemoryContext batchCxt;     

    
    //用于密集分配元组(到链接块中)
    HashMemoryChunk chunks;     

    
    //并行hash使用的共享和私有状态
    HashMemoryChunk current_chunk;  
    dsa_area   *area;           
    ParallelHashJoinState *parallel_state;//并行执行状态
    ParallelHashJoinBatchAccessor *batches;//并行访问器
    dsa_pointer current_chunk_shared;//当前chunk的开始指针
} HashJoinTableData;

typedef struct HashJoinTableData *HashJoinTable;

HashJoinTupleData
Hash连接元组数据







typedef struct HashJoinTupleData
{
    
    //link同一个桶中的下一个元组
    union
    {
        struct HashJoinTupleData *unshared;
        dsa_pointer shared;
    }           next;
    uint32      hashvalue;      
    
}           HashJoinTupleData;

#define HJTUPLE_OVERHEAD  MAXALIGN(sizeof(HashJoinTupleData))
#define HJTUPLE_MINTUPLE(hjtup)  \
    ((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))

二、源码解读

ExecHashTableCreate
ExecHashTableCreate函数初始化hashjoin需要使用的hashtable.




#define innerPlan(node)         (((Plan *)(node))->righttree)
#define outerPlan(node)         (((Plan *)(node))->lefttree)


HashJoinTable
ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
{
    Hash       *node;
    HashJoinTable hashtable;
    Plan       *outerNode;
    size_t      space_allowed;
    int         nbuckets;
    int         nbatch;
    double      rows;
    int         num_skew_mcvs;
    int         log2_nbuckets;
    int         nkeys;
    int         i;
    ListCell   *ho;
    MemoryContext oldcxt;

    
    node = (Hash *) state->ps.plan;//获取Hash节点
    outerNode = outerPlan(node);//获取outer relation Plan节点

    
    rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;//获取总行数

    ExecChooseHashTableSize(rows, outerNode->plan_width,
                            OidIsValid(node->skewTable),
                            state->parallel_state != NULL,
                            state->parallel_state != NULL ?
                            state->parallel_state->nparticipants - 1 : 0,
                            &space_allowed,
                            &nbuckets, &nbatch, &num_skew_mcvs);//计算Hash Table的大小尺寸

    
    //nbuckets(hash桶数)必须是2的n次方
    log2_nbuckets = my_log2(nbuckets);
    Assert(nbuckets == (1 << log2_nbuckets));

    
    hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));//分配内存
    hashtable->nbuckets = nbuckets;//桶数
    hashtable->nbuckets_original = nbuckets;
    hashtable->nbuckets_optimal = nbuckets;
    hashtable->log2_nbuckets = log2_nbuckets;
    hashtable->log2_nbuckets_optimal = log2_nbuckets;
    hashtable->buckets.unshared = NULL;
    hashtable->keepNulls = keepNulls;
    hashtable->skewEnabled = false;
    hashtable->skewBucket = NULL;
    hashtable->skewBucketLen = 0;
    hashtable->nSkewBuckets = 0;
    hashtable->skewBucketNums = NULL;
    hashtable->nbatch = nbatch;
    hashtable->curbatch = 0;
    hashtable->nbatch_original = nbatch;
    hashtable->nbatch_outstart = nbatch;
    hashtable->growEnabled = true;
    hashtable->totalTuples = 0;
    hashtable->partialTuples = 0;
    hashtable->skewTuples = 0;
    hashtable->innerBatchFile = NULL;
    hashtable->outerBatchFile = NULL;
    hashtable->spaceUsed = 0;
    hashtable->spacePeak = 0;
    hashtable->spaceAllowed = space_allowed;
    hashtable->spaceUsedSkew = 0;
    hashtable->spaceAllowedSkew =
        hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
    hashtable->chunks = NULL;
    hashtable->current_chunk = NULL;
    hashtable->parallel_state = state->parallel_state;
    hashtable->area = state->ps.state->es_query_dsa;
    hashtable->batches = NULL;

#ifdef HJDEBUG
    printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
           hashtable, nbatch, nbuckets);
#endif

    
    hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
                                               "HashTableContext",
                                               ALLOCSET_DEFAULT_SIZES);

    hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
                                                "HashBatchContext",
                                                ALLOCSET_DEFAULT_SIZES);

    
    //分配内存,切换至hashCxt
    oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);

    
    nkeys = list_length(hashOperators);//键值数
    hashtable->outer_hashfunctions =
        (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));//outer relation所使用的hash函数
    hashtable->inner_hashfunctions =
        (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));//inner relation所使用的hash函数
    hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));//是否严格的操作符
    i = 0;
    foreach(ho, hashOperators)//遍历hash操作符
    {
        Oid         hashop = lfirst_oid(ho);//hash操作符
        Oid         left_hashfn;//左函数
        Oid         right_hashfn;//右函数
        //获取与给定操作符兼容的标准哈希函数的OID,并根据需要对其LHS和/或RHS数据类型进行操作。
        if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))//获取hash函数
            elog(ERROR, "could not find hash function for hash operator %u",
                 hashop);
        fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
        fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
        hashtable->hashStrict[i] = op_strict(hashop);
        i++;
    }

    if (nbatch > 1 && hashtable->parallel_state == NULL)//批次>1而且并行状态为NULL
    {
        
        hashtable->innerBatchFile = (BufFile **)
            palloc0(nbatch * sizeof(BufFile *));//用于缓存该批次的inner relation的tuple
        hashtable->outerBatchFile = (BufFile **)
            palloc0(nbatch * sizeof(BufFile *));//用于缓存该批次的outerr relation的tuple
        
        
        //这些文件需要时才会打开……
        //…但是要确保为它们建立了临时表空间
        PrepareTempTablespaces();
    }

    MemoryContextSwitchTo(oldcxt);//切换回原内存上下文

    if (hashtable->parallel_state)//并行处理
    {
        ParallelHashJoinState *pstate = hashtable->parallel_state;
        Barrier    *build_barrier;

        
        build_barrier = &pstate->build_barrier;
        BarrierAttach(build_barrier);

        
        if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
            BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECTING))
        {
            pstate->nbatch = nbatch;
            pstate->space_allowed = space_allowed;
            pstate->growth = PHJ_GROWTH_OK;

            
            ExecParallelHashJoinSetUpBatches(hashtable, nbatch);

            
            pstate->nbuckets = nbuckets;
            ExecParallelHashTableAlloc(hashtable, 0);
        }

        
    }
    else//非并行处理
    {
        
        MemoryContextSwitchTo(hashtable->batchCxt);//切换上下文

        hashtable->buckets.unshared = (HashJoinTuple *)
            palloc0(nbuckets * sizeof(HashJoinTuple));//分配内存空间

        
        if (nbatch > 1)
            ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);

        MemoryContextSwitchTo(oldcxt);//切换上下文
    }

    return hashtable;//返回Hash表
}

 
 void
 fmgr_info(Oid functionId, FmgrInfo *finfo)
 {
     fmgr_info_cxt_security(functionId, finfo, CurrentMemoryContext, false);
 }
 

ExecChooseHashTableSize
ExecChooseHashTableSize函数根据给定要散列的关系的估计大小(行数和平均行宽),计算适当的散列表大小。





#define NTUP_PER_BUCKET         1

void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                        bool try_combined_work_mem,
                        int parallel_workers,
                        size_t *space_allowed,
                        int *numbuckets,
                        int *numbatches,
                        int *num_skew_mcvs)
{
    int         tupsize;//元组大小
    double      inner_rel_bytes;//inner relation大小
    long        bucket_bytes;//桶大小
    long        hash_table_bytes;//hash table大小
    long        skew_table_bytes;//倾斜表大小
    long        max_pointers;//最大的指针数
    long        mppow2;//
    int         nbatch = 1;//批次
    int         nbuckets;//桶数
    double      dbuckets;//

    
    //如relation大小没有信息,则设定为默认值1000.0
    if (ntuples <= 0.0)
        ntuples = 1000.0;

    
    tupsize = HJTUPLE_OVERHEAD +
        MAXALIGN(SizeofMinimalTupleHeader) +
        MAXALIGN(tupwidth);//估算元组大小
    inner_rel_bytes = ntuples * tupsize;//inner relation大小

    
    hash_table_bytes = work_mem * 1024L;

    
    if (try_combined_work_mem)//尝试融合work_mem
        hash_table_bytes += hash_table_bytes * parallel_workers;

    *space_allowed = hash_table_bytes;

    
    if (useskew)
    {
        //倾斜优化
        skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;

        
        *num_skew_mcvs = skew_table_bytes / (tupsize +
                                             (8 * sizeof(HashSkewBucket *)) +
                                             sizeof(int) +
                                             SKEW_BUCKET_OVERHEAD);
        if (*num_skew_mcvs > 0)
            hash_table_bytes -= skew_table_bytes;
    }
    else
        *num_skew_mcvs = 0;//不使用倾斜优化,默认为0

    
    max_pointers = *space_allowed / sizeof(HashJoinTuple);//最大指针数
    max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));//控制上限
    
    //如果max_pointer不是2的幂,则必须四舍五入到符合规则的某个值(如110.1 --> 128)
    mppow2 = 1L << my_log2(max_pointers);
    if (max_pointers != mppow2)
        max_pointers = mppow2 / 2;

    
    
    //还要确保在nbatch和nbucket中避免整数溢出
    //(鉴于MaxAllocSize的当前值,此步骤是多余的)
    max_pointers = Min(max_pointers, INT_MAX / 2);//设定上限

    dbuckets = ceil(ntuples / NTUP_PER_BUCKET);//取整
    dbuckets = Min(dbuckets, max_pointers);//设定上限
    nbuckets = (int) dbuckets;//桶数
    
    //但是,不要让nbucket非常小……
    nbuckets = Max(nbuckets, 1024);//设定下限(1024)
    
    //2的幂
    nbuckets = 1 << my_log2(nbuckets);

    
    bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
    if (inner_rel_bytes + bucket_bytes > hash_table_bytes)//inner relation大小 + 桶数大于可用空间
    {
        
        //需要多批次
        long        lbuckets;
        double      dbatch;
        int         minbatch;
        long        bucket_size;

        
        if (try_combined_work_mem)
        {
            ExecChooseHashTableSize(ntuples, tupwidth, useskew,
                                    false, parallel_workers,
                                    space_allowed,
                                    numbuckets,
                                    numbatches,
                                    num_skew_mcvs);
            return;
        }

        
        bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));//桶大小
        lbuckets = 1L << my_log2(hash_table_bytes / bucket_size);
        lbuckets = Min(lbuckets, max_pointers);
        nbuckets = (int) lbuckets;
        nbuckets = 1 << my_log2(nbuckets);
        bucket_bytes = nbuckets * sizeof(HashJoinTuple);

        
        Assert(bucket_bytes <= hash_table_bytes / 2);

        
        //计算批次数
        dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
        dbatch = Min(dbatch, max_pointers);
        minbatch = (int) dbatch;
        nbatch = 2;
        while (nbatch < minbatch)
            nbatch <<= 1;
    }

    Assert(nbuckets > 0);
    Assert(nbatch > 0);

    *numbuckets = nbuckets;
    *numbatches = nbatch;
}

三、跟踪分析

测试脚本如下

testdb=# set enable_nestloop=false;
SET
testdb=# set enable_mergejoin=false;
SET
testdb=# explain verbose select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je 
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je 
testdb(#                         from t_grxx gr inner join t_jfxx jf 
testdb(#                                        on gr.dwbh = dw.dwbh 
testdb(#                                           and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
                                          QUERY PLAN                                           
-----------------------------------------------------------------------------------------------
 Sort  (cost=14828.83..15078.46 rows=99850 width=47)
   Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
   Sort Key: dw.dwbh
   ->  Hash Join  (cost=3176.00..6537.55 rows=99850 width=47)
         Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
         Hash Cond: ((gr.grbh)::text = (jf.grbh)::text)
         ->  Hash Join  (cost=289.00..2277.61 rows=99850 width=32)
               Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm
               Inner Unique: true
               Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
               ->  Seq Scan on public.t_grxx gr  (cost=0.00..1726.00 rows=100000 width=16)
                     Output: gr.dwbh, gr.grbh, gr.xm, gr.xb, gr.nl
               ->  Hash  (cost=164.00..164.00 rows=10000 width=20)
                     Output: dw.dwmc, dw.dwbh, dw.dwdz
                     ->  Seq Scan on public.t_dwxx dw  (cost=0.00..164.00 rows=10000 width=20)
                           Output: dw.dwmc, dw.dwbh, dw.dwdz
         ->  Hash  (cost=1637.00..1637.00 rows=100000 width=20)
               Output: jf.ny, jf.je, jf.grbh
               ->  Seq Scan on public.t_jfxx jf  (cost=0.00..1637.00 rows=100000 width=20)
                     Output: jf.ny, jf.je, jf.grbh
(20 rows)

启动gdb,设置断点,进入ExecHashTableCreate

(gdb) b ExecHashTableCreate
Breakpoint 1 at 0x6fc75d: file nodeHash.c, line 449.
(gdb) c
Continuing.

Breakpoint 1, ExecHashTableCreate (state=0x1e3cbc8, hashOperators=0x1e59890, keepNulls=false) at nodeHash.c:449
449     node = (Hash *) state->ps.plan;

获取相关信息

449     node = (Hash *) state->ps.plan;
(gdb) n
450     outerNode = outerPlan(node);
(gdb) 
457     rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
(gdb) 
462                             state->parallel_state != NULL ?
(gdb) 
459     ExecChooseHashTableSize(rows, outerNode->plan_width,
(gdb) 

获取Hash节点;
outer节点为顺序扫描SeqScan节点
inner(构造hash表的relation)行数为10000

(gdb) p *node
$1 = {plan = {type = T_Hash, startup_cost = 164, total_cost = 164, plan_rows = 10000, plan_width = 20, 
    parallel_aware = false, parallel_safe = true, plan_node_id = 4, targetlist = 0x1e4bf90, qual = 0x0, 
    lefttree = 0x1e493e8, righttree = 0x0, initPlan = 0x0, extParam = 0x0, allParam = 0x0}, skewTable = 16977, 
  skewColumn = 1, skewInherit = false, rows_total = 0}
(gdb) p *outerNode
$2 = {type = T_SeqScan, startup_cost = 0, total_cost = 164, plan_rows = 10000, plan_width = 20, parallel_aware = false, 
  parallel_safe = true, plan_node_id = 5, targetlist = 0x1e492b0, qual = 0x0, lefttree = 0x0, righttree = 0x0, 
  initPlan = 0x0, extParam = 0x0, allParam = 0x0}
(gdb) p rows
$3 = 10000
(gdb) 

进入ExecChooseHashTableSize函数

(gdb) step
ExecChooseHashTableSize (ntuples=10000, tupwidth=20, useskew=true, try_combined_work_mem=false, parallel_workers=0, 
    space_allowed=0x7ffdcf148540, numbuckets=0x7ffdcf14853c, numbatches=0x7ffdcf148538, num_skew_mcvs=0x7ffdcf148534)
    at nodeHash.c:677
677     int         nbatch = 1;

ExecChooseHashTableSize->计算元组大小(56B)/inner relation大小(约560K)/hash表空间(16M)

(gdb) n
682     if (ntuples <= 0.0)
(gdb) 
690     tupsize = HJTUPLE_OVERHEAD +
(gdb) 
693     inner_rel_bytes = ntuples * tupsize;
(gdb) 
698     hash_table_bytes = work_mem * 1024L;
(gdb) 
705     if (try_combined_work_mem)
(gdb) p tupsize
$4 = 56
(gdb) p inner_rel_bytes
$5 = 560000
(gdb) p hash_table_bytes
$6 = 16777216

ExecChooseHashTableSize->使用数据倾斜优化(所需空间从Hash Table中获取)

(gdb) n
708     *space_allowed = hash_table_bytes;
(gdb) 
724     if (useskew)
(gdb) 
726         skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
(gdb) p useskew
$8 = true
(gdb) p hash_table_bytes
$9 = 16441672
(gdb) p skew_table_bytes
$10 = 335544
(gdb) p num_skew_mcvs
$11 = (int *) 0x7ffdcf148534
(gdb) p *num_skew_mcvs
$12 = 2396
(gdb) 

ExecChooseHashTableSize->获取最大指针数目(2097152)

(gdb) n
756     max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
(gdb) 
758     mppow2 = 1L << my_log2(max_pointers);
(gdb) n
759     if (max_pointers != mppow2)
(gdb) p max_pointers
$13 = 2097152
(gdb) p mppow2
$15 = 2097152

ExecChooseHashTableSize->计算Hash桶数

(gdb) n
764     max_pointers = Min(max_pointers, INT_MAX / 2);
(gdb) 
766     dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
(gdb) 
767     dbuckets = Min(dbuckets, max_pointers);
(gdb) 
768     nbuckets = (int) dbuckets;
(gdb) 
770     nbuckets = Max(nbuckets, 1024);
(gdb) 
772     nbuckets = 1 << my_log2(nbuckets);
(gdb) 
778     bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
(gdb) n
779     if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
(gdb) 
834     Assert(nbuckets > 0);
(gdb) p dbuckets
$16 = 10000
(gdb) p nbuckets
$17 = 16384
(gdb) p bucket_bytes
$18 = 131072

ExecChooseHashTableSize->只需要一个批次,赋值,返回

835     Assert(nbatch > 0);
(gdb) 
837     *numbuckets = nbuckets;
(gdb) 
838     *numbatches = nbatch;
(gdb) 
839 }
(gdb) 
(gdb) 
ExecHashTableCreate (state=0x1e3cbc8, hashOperators=0x1e59890, keepNulls=false) at nodeHash.c:468
468     log2_nbuckets = my_log2(nbuckets);

初始化Hash表

468     log2_nbuckets = my_log2(nbuckets);
(gdb) p nbuckets
$19 = 16384
(gdb) n
469     Assert(nbuckets == (1 << log2_nbuckets));
(gdb) 
478     hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
(gdb) 
479     hashtable->nbuckets = nbuckets;
...

分配内存上下文

...
(gdb) 
522     hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
(gdb) 
526     hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
(gdb) 
532     oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
(gdb) 

切换上下文,并初始化hash函数

(gdb) 
532     oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
(gdb) n
538     nkeys = list_length(hashOperators);
(gdb) 
540         (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
(gdb) p nkeys
$20 = 1
(gdb) n
539     hashtable->outer_hashfunctions =
(gdb) 
542         (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
(gdb) 
541     hashtable->inner_hashfunctions =
(gdb) 
543     hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
(gdb) 
544     i = 0;

初始化Hash操作符

(gdb) n
545     foreach(ho, hashOperators)
(gdb) 
547         Oid         hashop = lfirst_oid(ho);
(gdb) 
551         if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
(gdb) 
554         fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
(gdb) 
555         fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
(gdb) 
556         hashtable->hashStrict[i] = op_strict(hashop);
(gdb) 
557         i++;
(gdb) 
545     foreach(ho, hashOperators)
(gdb) p *hashtable->hashStrict
$21 = true
(gdb) n
560     if (nbatch > 1 && hashtable->parallel_state == NULL)

分配hash桶内存空间

gdb) n
575     MemoryContextSwitchTo(oldcxt);
(gdb) 
577     if (hashtable->parallel_state)
(gdb) 
631         MemoryContextSwitchTo(hashtable->batchCxt);
(gdb) 
634             palloc0(nbuckets * sizeof(HashJoinTuple));
(gdb) 
633         hashtable->buckets.unshared = (HashJoinTuple *)
(gdb) p nbuckets
$23 = 16384

构造完成,返回hash表

(gdb) n
641         if (nbatch > 1)
(gdb) 
644         MemoryContextSwitchTo(oldcxt);
(gdb) 
647     return hashtable;
(gdb) 
648 }
(gdb) 
ExecHashJoinImpl (pstate=0x1e3c048, parallel=false) at nodeHashjoin.c:282
282                 node->hj_HashTable = hashtable;
(gdb) 

DONE!

四、参考资料

Hash Joins: Past, Present and Future/PGCon 2017
A Look at How Postgres Executes a Tiny Join - Part 1
A Look at How Postgres Executes a Tiny Join - Part 2
Assignment 2 Symmetric Hash Join

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

PostgreSQL 源码解读(91)- 查询语句#76(ExecHashJoin函数#2)

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

PostgreSQL 源码解读(71)- 查询语句#56(make_one_rel函数#21-...

本节大体介绍了动态规划算法实现(standard_join_search)中的join_search_one_level->make_join_rel->populate_joinrel_with_paths->add_paths_to_j
2022-11-30

编程热搜

目录