我的编程空间,编程开发者的网络收藏夹
学习永远不晚

PostgreSQL 源码解读(189)- 查询#105(聚合函数#10 - agg_retrieve_hash_table)

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

PostgreSQL 源码解读(189)- 查询#105(聚合函数#10 - agg_retrieve_hash_table)

本节继续介绍聚合函数的实现,主要介绍了agg_retrieve_hash_table函数中与投影相关的实现逻辑,包括函数prepare_projection_slot/finalize_aggregates/project_aggregates.

一、数据结构

AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体





//在nodeAgg.c中私有的结构体
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;
typedef struct AggState
{
    //第一个字段是NodeTag(继承自ScanState)
    ScanState    ss;                
    //targetlist和quals中所有的Aggref
    List       *aggs;            
    //链表的大小(可以为0)
    int            numaggs;        
    //pertrans条目大小
    int            numtrans;        
    //Agg策略模式
    AggStrategy aggstrategy;    
    //agg-splitting模式,参见nodes.h
    AggSplit    aggsplit;        
    //指向当前步骤数据的指针
    AggStatePerPhase phase;        
    //步骤数(包括0)
    int            numphases;        
    //当前步骤
    int            current_phase;    
    //per-Aggref信息
    AggStatePerAgg peragg;        
    //per-Trans状态信息
    AggStatePerTrans pertrans;    
    //长生命周期数据的ExprContexts(hashtable)
    ExprContext *hashcontext;    
    ////长生命周期数据的ExprContexts(每一个GS使用)
    ExprContext **aggcontexts;    
    //输入表达式的ExprContext
    ExprContext *tmpcontext;    
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
    //当前活跃的aggcontext
    ExprContext *curaggcontext; 
    //当前活跃的aggregate(如存在)
    AggStatePerAgg curperagg;    
#define FIELDNO_AGGSTATE_CURPERTRANS 16
    //当前活跃的trans state
    AggStatePerTrans curpertrans;    
    //输入结束?
    bool        input_done;        
    //Agg扫描结束?
    bool        agg_done;        
    //最后一个grouping set
    int            projected_set;    
#define FIELDNO_AGGSTATE_CURRENT_SET 20
    //将要解析的当前grouping set
    int            current_set;    
    //当前投影操作的分组列
    Bitmapset  *grouped_cols;    
    //倒序的分组列链表
    List       *all_grouped_cols;    
    
    //-------- 下面的列用于grouping set步骤数据
    //所有步骤中最大的sets大小
    int            maxsets;        
    //所有步骤的数组
    AggStatePerPhase phases;    
    //对于phases > 1,已排序的输入信息
    Tuplesortstate *sort_in;    
    //对于下一个步骤,输入已拷贝
    Tuplesortstate *sort_out;    
    //排序结果的slot
    TupleTableSlot *sort_slot;    
    
    //------- 下面的列用于AGG_PLAIN和AGG_SORTED模式:
    //per-group指针的grouping set编号数组
    AggStatePerGroup *pergroups;    
    //当前组的第一个元组拷贝
    HeapTuple    grp_firstTuple; 
    
    //--------- 下面的列用于AGG_HASHED和AGG_MIXED模式:
    //是否已填充hash表?
    bool        table_filled;    
    //hash桶数?
    int            num_hashes;
    //相应的哈希表数据数组
    AggStatePerHash perhash;    
    //per-group指针的grouping set编号数组
    AggStatePerGroup *hash_pergroup;    
    
    //---------- agg输入表达式解析支持
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
    //首先是->pergroups,然后是hash_pergroup
    AggStatePerGroup *all_pergroups;    
    //投影实现机制
    ProjectionInfo *combinedproj;    
} AggState;

//nodeag .c支持的基本选项
#define AGGSPLITOP_COMBINE        0x01    
#define AGGSPLITOP_SKIPFINAL    0x02    
#define AGGSPLITOP_SERIALIZE    0x04    
#define AGGSPLITOP_DESERIALIZE    0x08    

//支持的操作模式
typedef enum AggSplit
{
    
    //基本 : 非split聚合
    AGGSPLIT_SIMPLE = 0,
    
    //部分聚合的初始步骤,序列化
    AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
    
    //部分聚合的最终步骤,反序列化
    AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;

//测试AggSplit选择了哪些基本选项
#define DO_AGGSPLIT_COMBINE(as)        (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as)    (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as)    (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)

二、源码解读

prepare_projection_slot
prepare_projection_slot函数基于指定的典型元组slot和grouping set准备finalize和project.
比如初始化isnull数组等.




static void
prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
{
    if (aggstate->phase->grouped_cols)
    {
        Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];
        aggstate->grouped_cols = grouped_cols;
        if (slot->tts_isempty)
        {
            
            ExecStoreAllNullTuple(slot);
        }
        else if (aggstate->all_grouped_cols)
        {
            ListCell   *lc;
            
            //all_grouped_cols以倒序的方式组织
            slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
            foreach(lc, aggstate->all_grouped_cols)
            {
                int            attnum = lfirst_int(lc);
                if (!bms_is_member(attnum, grouped_cols))
                    slot->tts_isnull[attnum - 1] = true;
            }
        }
    }
}

finalize_aggregates
finalize_aggregates函数计算某一组所有聚合的最终值,实现函数是finalize_aggregate,该实现函数下节再行介绍.




static void
finalize_aggregates(AggState *aggstate,
                    AggStatePerAgg peraggs,
                    AggStatePerGroup pergroup)
{
    ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    Datum       *aggvalues = econtext->ecxt_aggvalues;
    bool       *aggnulls = econtext->ecxt_aggnulls;
    int            aggno;
    int            transno;
    
    //遍历转换函数
    for (transno = 0; transno < aggstate->numtrans; transno++)
    {
        //转换函数
        AggStatePerTrans pertrans = &aggstate->pertrans[transno];
        //pergroup
        AggStatePerGroup pergroupstate;
        pergroupstate = &pergroup[transno];
        if (pertrans->numSortCols > 0)
        {
            //--- 存在DISTINCT/ORDER BY
            //验证,Hash不需要排序
            Assert(aggstate->aggstrategy != AGG_HASHED &&
                   aggstate->aggstrategy != AGG_MIXED);
            if (pertrans->numInputs == 1)
                //单独
                process_ordered_aggregate_single(aggstate,
                                                 pertrans,
                                                 pergroupstate);
            else
                //多个
                process_ordered_aggregate_multi(aggstate,
                                                pertrans,
                                                pergroupstate);
        }
    }
    
    //遍历聚合
    for (aggno = 0; aggno < aggstate->numaggs; aggno++)
    {
        //获取peragg
        AggStatePerAgg peragg = &peraggs[aggno];
        int            transno = peragg->transno;
        AggStatePerGroup pergroupstate;
        //pergroup
        pergroupstate = &pergroup[transno];
        if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
            //并行处理结果
            finalize_partialaggregate(aggstate, peragg, pergroupstate,
                                      &aggvalues[aggno], &aggnulls[aggno]);
        else
            //调用finalize_aggregate获取结果
            finalize_aggregate(aggstate, peragg, pergroupstate,
                               &aggvalues[aggno], &aggnulls[aggno]);
    }
}

project_aggregates
project_aggregates函数投影某一组的结果(该组结果已通过finalize_aggregates函数计算得到).




static TupleTableSlot *
project_aggregates(AggState *aggstate)
{
    ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    
    if (ExecQual(aggstate->ss.ps.qual, econtext))
    {
        
        return ExecProject(aggstate->ss.ps.ps_ProjInfo);
    }
    else
        InstrCountFiltered1(aggstate, 1);
    return NULL;
}
#define InstrCountFiltered1(node, delta) \
    do { \
        if (((PlanState *)(node))->instrument) \
            ((PlanState *)(node))->instrument->nfiltered1 += (delta); \
    } while(0)

ExecProject
ExecProject函数基于投影信息投影元组并把元组存储在传递给ExecBuildProjectInfo()的slot参数中.




#ifndef FRONTEND
static inline TupleTableSlot *
ExecProject(ProjectionInfo *projInfo)
{
    ExprContext *econtext = projInfo->pi_exprContext;
    ExprState  *state = &projInfo->pi_state;
    TupleTableSlot *slot = state->resultslot;
    bool        isnull;
    
    ExecClearTuple(slot);
    
    //执行表达式解析,丢弃scalar结果.
    (void) ExecEvalExprSwitchContext(state, econtext, &isnull);
    
    slot->tts_isempty = false;
    slot->tts_nvalid = slot->tts_tupleDescriptor->natts;
    return slot;
}
#endif

三、跟踪分析

N/A

四、参考资料

PostgreSQL 源码解读(178)- 查询#95(聚合函数)#1相关数据结构

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

PostgreSQL 源码解读(189)- 查询#105(聚合函数#10 - agg_retrieve_hash_table)

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

PostgreSQL 源码解读(71)- 查询语句#56(make_one_rel函数#21-...

本节大体介绍了动态规划算法实现(standard_join_search)中的join_search_one_level->make_join_rel->populate_joinrel_with_paths->add_paths_to_j
2022-11-30

编程热搜

目录