我的编程空间,编程开发者的网络收藏夹
学习永远不晚

PostgreSQL 源码解读(191)- 查询#107(聚合函数#12 - agg_retrieve_direct)

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

PostgreSQL 源码解读(191)- 查询#107(聚合函数#12 - agg_retrieve_direct)

本节继续介绍聚合函数的实现,主要介绍了不使用Hash算法的情况下聚合函数的实现,在这种情况下会先排序后执行聚合,在ExecAgg节点执行前,已完成排序的操作.下面介绍在已完成排序的情况下聚合的实现,主要实现函数是ExecAgg->agg_retrieve_direct.

下面是不使用HashAggregate情况下GroupAggregate的计划树:



",,,,,"select bh,avg(c1),min(c1),max(c2) from t_agg_simple group by bh;",,,"psql"
2019-05-16 12:04:45.621 CST,"xdb","testdb",1545,"[local]",5cdce11a.609,5,"SELECT",2019-05-16 12:03:38 CST,3/4,0,LOG,00000,"plan:","   {PLANNEDSTMT 
   :commandType 1 
   :queryId 0 
   :hasReturning false 
   :hasModifyingCTE false 
   :canSetTag true 
   :transientPlan false 
   :dependsOnRole false 
   :parallelModeNeeded false 
   :jitFlags 0 
   :planTree 
      {AGG 
      :startup_cost 52.67 
      :total_cost 64.42 
      :plan_rows 200 
      :plan_width 98 
      :parallel_aware false 
      :parallel_safe true 
      :plan_node_id 0 
      :targetlist (...
      )
      :qual <> 
      :lefttree 
         {SORT 
         :startup_cost 52.67 
         :total_cost 54.52 
         :plan_rows 740 
         :plan_width 66 
         :parallel_aware false 
         :parallel_safe true 
         :plan_node_id 1 
         :targetlist (...
         )
         :qual <> 
         :lefttree 
            {SEQSCAN 
            :startup_cost 0.00 
            :total_cost 17.40 
            :plan_rows 740 
            :plan_width 66 
            :parallel_aware false 
            :parallel_safe true 
            :plan_node_id 2 
            :targetlist (...
            )
            :qual <> 
            :lefttree <> 
            :righttree <> 
            :initPlan <> 
            :extParam (b)
            :allParam (b)
            :scanrelid 1
            }
         :righttree <> 
         :initPlan <> 
         :extParam (b)
         :allParam (b)
         :numCols 1 
         :sortColIdx 1 
         :sortOperators 664 
         :collations 100 
         :nullsFirst false
         }
      :righttree <> 
      :initPlan <> 
      :extParam (b)
      :allParam (b)
      :aggstrategy 1 
      :aggsplit 0 
      :numCols 1 
      :grpColIdx 1 
      :grpOperators 98 
      :numGroups 200 
      :aggParams (b)
      :groupingSets <> 
      :chain <>
      }
   :rtable (...
   )
   :resultRelations <> 
   :nonleafResultRelations <> 
   :rootResultRelations <> 
   :subplans <> 
   :rewindPlanIDs (b)
   :rowMarks <> 
   :relationOids (o 270375)
   :invalItems <> 
   :paramExecTypes <> 
   :utilityStmt <> 
   :stmt_location 0 
   :stmt_len 63
   }

可以看到,在ExecAgg前会先执行ExecSort.

一、数据结构

AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体





//在nodeAgg.c中私有的结构体
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;
typedef struct AggState
{
    //第一个字段是NodeTag(继承自ScanState)
    ScanState    ss;                
    //targetlist和quals中所有的Aggref
    List       *aggs;            
    //链表的大小(可以为0)
    int            numaggs;        
    //pertrans条目大小
    int            numtrans;        
    //Agg策略模式
    AggStrategy aggstrategy;    
    //agg-splitting模式,参见nodes.h
    AggSplit    aggsplit;        
    //指向当前步骤数据的指针
    AggStatePerPhase phase;        
    //步骤数(包括0)
    int            numphases;        
    //当前步骤
    int            current_phase;    
    //per-Aggref信息
    AggStatePerAgg peragg;        
    //per-Trans状态信息
    AggStatePerTrans pertrans;    
    //长生命周期数据的ExprContexts(hashtable)
    ExprContext *hashcontext;    
    ////长生命周期数据的ExprContexts(每一个GS使用)
    ExprContext **aggcontexts;    
    //输入表达式的ExprContext
    ExprContext *tmpcontext;    
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
    //当前活跃的aggcontext
    ExprContext *curaggcontext; 
    //当前活跃的aggregate(如存在)
    AggStatePerAgg curperagg;    
#define FIELDNO_AGGSTATE_CURPERTRANS 16
    //当前活跃的trans state
    AggStatePerTrans curpertrans;    
    //输入结束?
    bool        input_done;        
    //Agg扫描结束?
    bool        agg_done;        
    //最后一个grouping set
    int            projected_set;    
#define FIELDNO_AGGSTATE_CURRENT_SET 20
    //将要解析的当前grouping set
    int            current_set;    
    //当前投影操作的分组列
    Bitmapset  *grouped_cols;    
    //倒序的分组列链表
    List       *all_grouped_cols;    
    
    //-------- 下面的列用于grouping set步骤数据
    //所有步骤中最大的sets大小
    int            maxsets;        
    //所有步骤的数组
    AggStatePerPhase phases;    
    //对于phases > 1,已排序的输入信息
    Tuplesortstate *sort_in;    
    //对于下一个步骤,输入已拷贝
    Tuplesortstate *sort_out;    
    //排序结果的slot
    TupleTableSlot *sort_slot;    
    
    //------- 下面的列用于AGG_PLAIN和AGG_SORTED模式:
    //per-group指针的grouping set编号数组
    AggStatePerGroup *pergroups;    
    //当前组的第一个元组拷贝
    HeapTuple    grp_firstTuple; 
    
    //--------- 下面的列用于AGG_HASHED和AGG_MIXED模式:
    //是否已填充hash表?
    bool        table_filled;    
    //hash桶数?
    int            num_hashes;
    //相应的哈希表数据数组
    AggStatePerHash perhash;    
    //per-group指针的grouping set编号数组
    AggStatePerGroup *hash_pergroup;    
    
    //---------- agg输入表达式解析支持
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
    //首先是->pergroups,然后是hash_pergroup
    AggStatePerGroup *all_pergroups;    
    //投影实现机制
    ProjectionInfo *combinedproj;    
} AggState;

//nodeag .c支持的基本选项
#define AGGSPLITOP_COMBINE        0x01    
#define AGGSPLITOP_SKIPFINAL    0x02    
#define AGGSPLITOP_SERIALIZE    0x04    
#define AGGSPLITOP_DESERIALIZE    0x08    

//支持的操作模式
typedef enum AggSplit
{
    
    //基本 : 非split聚合
    AGGSPLIT_SIMPLE = 0,
    
    //部分聚合的初始步骤,序列化
    AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
    
    //部分聚合的最终步骤,反序列化
    AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;

//测试AggSplit选择了哪些基本选项
#define DO_AGGSPLIT_COMBINE(as)        (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as)    (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as)    (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)

二、源码解读

agg_retrieve_direct
agg_retrieve_direct计算聚合的最终结果,适用于不使用Hash算法的情况.




static TupleTableSlot *
agg_retrieve_direct(AggState *aggstate)
{
    Agg           *node = aggstate->phase->aggnode;//aggstate Node
    ExprContext *econtext;//表达式解析上下文
    ExprContext *tmpcontext;//临时上下文
    AggStatePerAgg peragg;//聚合
    AggStatePerGroup *pergroups;//分组信息
    TupleTableSlot *outerslot;//outer元组slot
    TupleTableSlot *firstSlot;//第1个slot
    TupleTableSlot *result;//结果元组
    bool        hasGroupingSets = aggstate->phase->numsets > 0;//是否有grouping set
    int            numGroupingSets = Max(aggstate->phase->numsets, 1);
    int            currentSet;
    int            nextSetSize;
    int            numReset;
    int            i;
    
    econtext = aggstate->ss.ps.ps_ExprContext;
    tmpcontext = aggstate->tmpcontext;
    peragg = aggstate->peragg;
    pergroups = aggstate->pergroups;
    firstSlot = aggstate->ss.ss_ScanTupleSlot;
    
    while (!aggstate->agg_done)
    {
        //----------- 循环处理
        
        ReScanExprContext(econtext);
        
        if (aggstate->projected_set >= 0 &&
            aggstate->projected_set < numGroupingSets)
            numReset = aggstate->projected_set + 1;
        else
            numReset = numGroupingSets;
        
        for (i = 0; i < numReset; i++)
        {
            ReScanExprContext(aggstate->aggcontexts[i]);
        }
        
        if (aggstate->input_done == true &&
            aggstate->projected_set >= (numGroupingSets - 1))
        {
            if (aggstate->current_phase < aggstate->numphases - 1)
            {
                //仍在处理中
                initialize_phase(aggstate, aggstate->current_phase + 1);
                aggstate->input_done = false;
                aggstate->projected_set = -1;
                numGroupingSets = Max(aggstate->phase->numsets, 1);
                node = aggstate->phase->aggnode;
                numReset = numGroupingSets;
            }
            else if (aggstate->aggstrategy == AGG_MIXED)
            {
                //照理,不会进入这个分支(AGG_MIXED不是Hash才有吗?)
                
                initialize_phase(aggstate, 0);
                aggstate->table_filled = true;
                ResetTupleHashIterator(aggstate->perhash[0].hashtable,
                                       &aggstate->perhash[0].hashiter);
                select_current_set(aggstate, 0, true);
                return agg_retrieve_hash_table(aggstate);
            }
            else
            {
                //已完成处理
                aggstate->agg_done = true;
                break;
            }
        }
        
        if (aggstate->projected_set >= 0 &&
            aggstate->projected_set < (numGroupingSets - 1))
            nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
        else
            nextSetSize = 0;
        
        tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
        if (aggstate->input_done ||
            (node->aggstrategy != AGG_PLAIN &&
             aggstate->projected_set != -1 &&
             aggstate->projected_set < (numGroupingSets - 1) &&
             nextSetSize > 0 &&
             !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
                               tmpcontext)))
        {
            aggstate->projected_set += 1;
            Assert(aggstate->projected_set < numGroupingSets);
            Assert(nextSetSize > 0 || aggstate->input_done);
        }
        else
        {
            
            aggstate->projected_set = 0;
            
            if (aggstate->grp_firstTuple == NULL)
            {
                //提取一行
                outerslot = fetch_input_tuple(aggstate);
                if (!TupIsNull(outerslot))
                {
                    //成功提取一行
                    
                    aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
                }
                else
                {
                    
                    //不再产生新行
                    if (hasGroupingSets)
                    {
                        //----------- 存在grouping set
                        
                        aggstate->input_done = true;
                        while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
                        {
                            aggstate->projected_set += 1;
                            if (aggstate->projected_set >= numGroupingSets)
                            {
                                
                                break;
                            }
                        }
                        if (aggstate->projected_set >= numGroupingSets)
                            continue;
                    }
                    else
                    {
                        aggstate->agg_done = true;
                        
                        if (node->aggstrategy != AGG_PLAIN)
                            return NULL;
                    }
                }
            }
            
            initialize_aggregates(aggstate, pergroups, numReset);
            if (aggstate->grp_firstTuple != NULL)
            {
                
                ExecStoreTuple(aggstate->grp_firstTuple,
                               firstSlot,
                               InvalidBuffer,
                               true);
                aggstate->grp_firstTuple = NULL;    
                
                //为第一次advance_aggregates调用设置参数
                tmpcontext->ecxt_outertuple = firstSlot;
                
                for (;;)
                {
                    
                    if (aggstate->aggstrategy == AGG_MIXED &&
                        aggstate->current_phase == 1)
                    {
                        lookup_hash_entries(aggstate);
                    }
                    
                    //推动聚合(或者组合函数)
                    advance_aggregates(aggstate);
                    
                    //在每一个元组后重置per-input-tuple上下文
                    ResetExprContext(tmpcontext);
                    outerslot = fetch_input_tuple(aggstate);
                    if (TupIsNull(outerslot))
                    {
                        
                        //已无更多可用的outer slot
                        if (hasGroupingSets)
                        {
                            aggstate->input_done = true;
                            break;
                        }
                        else
                        {
                            aggstate->agg_done = true;
                            break;
                        }
                    }
                    
                    //为下一次advance_aggregates调用作准备
                    tmpcontext->ecxt_outertuple = outerslot;
                    
                    if (node->aggstrategy != AGG_PLAIN)
                    {
                        tmpcontext->ecxt_innertuple = firstSlot;
                        if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
                                      tmpcontext))
                        {
                            aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
                            break;
                        }
                    }
                }
            }
            
            econtext->ecxt_outertuple = firstSlot;
        }
        Assert(aggstate->projected_set >= 0);
        currentSet = aggstate->projected_set;
        //投影处理
        prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
        select_current_set(aggstate, currentSet, false);
        finalize_aggregates(aggstate,
                            peragg,
                            pergroups[currentSet]);
        
        result = project_aggregates(aggstate);
        if (result)
            return result;
    }
    
    //DONE!
    return NULL;
}

三、跟踪分析

测试脚本



-- 禁用并行
set max_parallel_workers_per_gather=0;
-- 禁用hashagg
set enable_hashagg = off;
select bh,avg(c1),min(c1),max(c2) from t_agg_simple group by bh;

跟踪分析



(gdb) b agg_retrieve_direct
Breakpoint 1 at 0x6ee511: file nodeAgg.c, line 1572.
(gdb) c
Continuing.
Breakpoint 1, agg_retrieve_direct (aggstate=0x268f640) at nodeAgg.c:1572
1572        Agg           *node = aggstate->phase->aggnode;

输入参数



(gdb) p *aggstate
$1 = {ss = {ps = {type = T_AggState, plan = 0x25af578, state = 0x268f428, ExecProcNode = 0x6ee438 <ExecAgg>, 
      ExecProcNodeReal = 0x6ee438 <ExecAgg>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0, 
      qual = 0x0, lefttree = 0x268faf0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, 
      ps_ResultTupleSlot = 0x2690d50, ps_ExprContext = 0x268fa30, ps_ProjInfo = 0x2690e90, scandesc = 0x26907a0}, 
    ss_currentRelation = 0x0, ss_currentScanDesc = 0x0, ss_ScanTupleSlot = 0x2690a78}, aggs = 0x25d4290, numaggs = 3, 
  numtrans = 3, aggstrategy = AGG_SORTED, aggsplit = AGGSPLIT_SIMPLE, phase = 0x2691290, numphases = 2, current_phase = 1, 
  peragg = 0x2690f28, pertrans = 0x26b24d0, hashcontext = 0x0, aggcontexts = 0x268f858, tmpcontext = 0x268f878, 
  curaggcontext = 0x268f970, curperagg = 0x0, curpertrans = 0x0, input_done = false, agg_done = false, projected_set = -1, 
  current_set = 0, grouped_cols = 0x0, all_grouped_cols = 0x0, maxsets = 1, phases = 0x2691258, sort_in = 0x0, 
  sort_out = 0x0, sort_slot = 0x0, pergroups = 0x25d4da0, grp_firstTuple = 0x0, table_filled = false, num_hashes = 0, 
  perhash = 0x0, hash_pergroup = 0x0, all_pergroups = 0x25d4da0, combinedproj = 0x0}

需要2个阶段,分别是AGG_PLAIN/AGG_SORTED



(gdb) p aggstate->phases[0]
$2 = {aggstrategy = AGG_PLAIN, numsets = 0, gset_lengths = 0x0, grouped_cols = 0x0, eqfunctions = 0x0, aggnode = 0x0, 
  sortnode = 0x0, evaltrans = 0x0}
(gdb) p aggstate->phases[1]
$3 = {aggstrategy = AGG_SORTED, numsets = 0, gset_lengths = 0x0, grouped_cols = 0x0, eqfunctions = 0x25d4388, 
  aggnode = 0x25af578, sortnode = 0x0, evaltrans = 0x25d5488}

不存在grouping set.
变量numGroupingSets设置为1



(gdb) n
1580        bool        hasGroupingSets = aggstate->phase->numsets > 0;
(gdb) p aggstate->phase->numsets
$5 = 0
(gdb) n
1581        int            numGroupingSets = Max(aggstate->phase->numsets, 1);
(gdb) n
1594        econtext = aggstate->ss.ps.ps_ExprContext;
(gdb) p numGroupingSets
$6 = 1

设置内存上下文



(gdb) n
1595        tmpcontext = aggstate->tmpcontext;
(gdb) 
1597        peragg = aggstate->peragg;
(gdb) p *econtext
$7 = {type = T_ExprContext, ecxt_scantuple = 0x0, ecxt_innertuple = 0x0, ecxt_outertuple = 0x0, 
  ecxt_per_query_memory = 0x268f310, ecxt_per_tuple_memory = 0x26a6370, ecxt_param_exec_vals = 0x0, 
  ecxt_param_list_info = 0x0, ecxt_aggvalues = 0x25d4d48, ecxt_aggnulls = 0x25d4d80, caseValue_datum = 0, 
  caseValue_isNull = true, domainValue_datum = 0, domainValue_isNull = true, ecxt_estate = 0x268f428, ecxt_callbacks = 0x0}
(gdb) p *tmpcontext
$8 = {type = T_ExprContext, ecxt_scantuple = 0x0, ecxt_innertuple = 0x0, ecxt_outertuple = 0x0, 
  ecxt_per_query_memory = 0x268f310, ecxt_per_tuple_memory = 0x2691320, ecxt_param_exec_vals = 0x0, 
  ecxt_param_list_info = 0x0, ecxt_aggvalues = 0x0, ecxt_aggnulls = 0x0, caseValue_datum = 0, caseValue_isNull = true, 
  domainValue_datum = 0, domainValue_isNull = true, ecxt_estate = 0x268f428, ecxt_callbacks = 0x0}
(gdb)

获取聚合信息,一共有3个



(gdb) n
1598        pergroups = aggstate->pergroups;
(gdb)
1599        firstSlot = aggstate->ss.ss_ScanTupleSlot;
(gdb) p *peragg
$9 = {aggref = 0x26a02d0, transno = 0, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, 
    fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, 
  numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}
(gdb) p peragg[0]
$10 = {aggref = 0x26a02d0, transno = 0, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, 
    fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, 
  numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}
(gdb) p peragg[1]
$11 = {aggref = 0x26a0048, transno = 1, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, 
    fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, 
  numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}
(gdb) p peragg[2]
$12 = {aggref = 0x269fdc0, transno = 2, finalfn_oid = 1964, finalfn = {fn_addr = 0x978251 <int8_avg>, fn_oid = 1964, 
    fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x268f310, 
    fn_expr = 0x25d5190}, numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = -1, resulttypeByVal = false, 
  shareable = true}

分组只有一个



(gdb) p pergroups[0]
$14 = (AggStatePerGroup) 0x25d4dc0
(gdb) p *pergroups[0]
$15 = {transValue = 0, transValueIsNull = false, noTransValue = false}

进入循环



(gdb) n
1610        while (!aggstate->agg_done)
(gdb) 
1624            ReScanExprContext(econtext);
(gdb)

重置内存上下文



(gdb) 
1629            if (aggstate->projected_set >= 0 &&
(gdb) 
1633                numReset = numGroupingSets;
(gdb) 
1642            for (i = 0; i < numReset; i++)
(gdb) 
1644                ReScanExprContext(aggstate->aggcontexts[i]);
(gdb) 
1642            for (i = 0; i < numReset; i++)
(gdb)

检查输入是否已完成处理/本组已完成投影(实际不满足条件)



(gdb) 
1651            if (aggstate->input_done == true &&
(gdb) 
1688            if (aggstate->projected_set >= 0 &&
(gdb) p aggstate->input_done
$16 = false
(gdb) p aggstate->projected_set
$17 = -1

设置待处理的元组,为NULL



(gdb) 
1711            tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
(gdb) 
1712            if (aggstate->input_done ||
(gdb) 
(gdb) p *tmpcontext->ecxt_innertuple
Cannot access memory at address 0x0

如果子分组已存在,则执行投影(实际不满足条件).



(gdb) n
1713                (node->aggstrategy != AGG_PLAIN &&
(gdb) p node->aggstrategy
$18 = AGG_SORTED
(gdb) n
1712            if (aggstate->input_done ||
(gdb) 
1714                 aggstate->projected_set != -1 &&
(gdb) 
1713                (node->aggstrategy != AGG_PLAIN &&
(gdb) 
1732                aggstate->projected_set = 0;
(gdb) p aggstate->input_done
$19 = false
(gdb) p aggstate->projected_set
$20 = -1
(gdb) p node->aggstrategy
$21 = AGG_SORTED
(gdb)

从outer plan中提取一行,并拷贝为首行



(gdb) n
1738                if (aggstate->grp_firstTuple == NULL)
(gdb) p aggstate->grp_firstTuple
$22 = (HeapTuple) 0x0
(gdb) n
1740                    outerslot = fetch_input_tuple(aggstate);
(gdb) 
1741                    if (!TupIsNull(outerslot))
(gdb) p *outerslot
$23 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false, 
  tts_tuple = 0x26909f8, tts_tupleDescriptor = 0x26907a0, tts_mcxt = 0x268f310, tts_buffer = 0, tts_nvalid = 0, 
  tts_values = 0x2690a18, tts_isnull = 0x2690a30, tts_mintuple = 0x26b8ad8, tts_minhdr = {t_len = 40, t_self = {ip_blkid = {
        bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x26b8ad0}, tts_off = 0, 
  tts_fixedTupleDescriptor = true}
(gdb) 
(gdb) n
1747                        aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
(gdb)

为新输入的元组组初始化工作状态.



(gdb) 
1797                initialize_aggregates(aggstate, pergroups, numReset);

把元组拷贝到内存上下文中,并执行聚合运算(advance_aggregates)



(gdb) n
1799                if (aggstate->grp_firstTuple != NULL)
(gdb) 
1806                    ExecStoreTuple(aggstate->grp_firstTuple,
(gdb) 
1810                    aggstate->grp_firstTuple = NULL;    
(gdb) 
1813                    tmpcontext->ecxt_outertuple = firstSlot;
(gdb) 
1825                        if (aggstate->aggstrategy == AGG_MIXED &&
(gdb) 
1832                        advance_aggregates(aggstate);
(gdb) 
1835                        ResetExprContext(tmpcontext);
(gdb)

继续提取行,拷贝到内存上下文中



(gdb) n
1837                        outerslot = fetch_input_tuple(aggstate);
(gdb) 
1838                        if (TupIsNull(outerslot))
(gdb) 
1853                        tmpcontext->ecxt_outertuple = outerslot;
(gdb) 
1859                        if (node->aggstrategy != AGG_PLAIN)
(gdb) 
1861                            tmpcontext->ecxt_innertuple = firstSlot;
(gdb) 
1862                            if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
(gdb) 
1869                    }

执行聚合运算,并继续提取下一行



825                        if (aggstate->aggstrategy == AGG_MIXED &&
(gdb) 
1832                        advance_aggregates(aggstate);
(gdb) 
1835                        ResetExprContext(tmpcontext);
(gdb) 
1837                        outerslot = fetch_input_tuple(aggstate);
(gdb) 
1838                        if (TupIsNull(outerslot))
(gdb) 
1853                        tmpcontext->ecxt_outertuple = outerslot;
(gdb) 
1859                        if (node->aggstrategy != AGG_PLAIN)
(gdb) 
1861                            tmpcontext->ecxt_innertuple = firstSlot;
(gdb)

如果是分组,检查是否已跨越分组边界,如已越界在跳出循环.



1862                            if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
(gdb) 
1865                                aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
(gdb) 
1866                                break;

已获得一行结果行,返回结果



(gdb) 
1880                econtext->ecxt_outertuple = firstSlot;
(gdb) n
1883            Assert(aggstate->projected_set >= 0);
(gdb) 
1885            currentSet = aggstate->projected_set;
(gdb) 
1887            prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
(gdb) p aggstate->projected_set
$24 = 0
(gdb) n
1889            select_current_set(aggstate, currentSet, false);
(gdb) 
1893                                pergroups[currentSet]);
(gdb) 
1891            finalize_aggregates(aggstate,
(gdb) 
1899            result = project_aggregates(aggstate);
(gdb) 
1900            if (result)
(gdb) 
1901                return result;
(gdb) p *result
$25 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false, 
  tts_tuple = 0x0, tts_tupleDescriptor = 0x2690b38, tts_mcxt = 0x268f310, tts_buffer = 0, tts_nvalid = 4, 
  tts_values = 0x2690db0, tts_isnull = 0x2690dd0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {
        bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}
(gdb)

DONE!

四、参考资料

PostgreSQL 源码解读(178)- 查询#95(聚合函数)#1相关数据结构
PostgreSQL 源码解读(186)- 查询#102(聚合函数#7-advance_aggregates)

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

PostgreSQL 源码解读(191)- 查询#107(聚合函数#12 - agg_retrieve_direct)

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

PostgreSQL 源码解读(71)- 查询语句#56(make_one_rel函数#21-...

本节大体介绍了动态规划算法实现(standard_join_search)中的join_search_one_level->make_join_rel->populate_joinrel_with_paths->add_paths_to_j
2022-11-30

编程热搜

目录