PostgreSQL 源码解读(189)- 查询#105(聚合函数#10 - agg_retrieve_hash_table)
本节继续介绍聚合函数的实现,主要介绍了agg_retrieve_hash_table函数中与投影相关的实现逻辑,包括函数prepare_projection_slot/finalize_aggregates/project_aggregates.
一、数据结构
AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体
//Pointer typedefs for structures that are private to nodeAgg.c.
//Only the pointer types are declared here; the pointed-to *Data structs are not visible in this excerpt.
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;
//Run-time state of an aggregate node.
//The FIELDNO_AGGSTATE_* constants interleaved below equal the zero-based position of the
//field that follows each of them, so they must be updated whenever fields are added,
//removed or reordered.
typedef struct AggState
{
//its first field is a NodeTag (reached through the embedded ScanState)
ScanState ss;
//all Aggref nodes found in the targetlist and quals
List *aggs;
//length of that list (may be zero)
int numaggs;
//number of pertrans entries
int numtrans;
//aggregation strategy
AggStrategy aggstrategy;
//agg-splitting mode, see nodes.h
AggSplit aggsplit;
//pointer to the data of the current phase
AggStatePerPhase phase;
//number of phases (including phase 0)
int numphases;
//current phase number
int current_phase;
//per-Aggref information
AggStatePerAgg peragg;
//per-Trans state information
AggStatePerTrans pertrans;
//ExprContext for long-lived data (hash table)
ExprContext *hashcontext;
//ExprContexts for long-lived data (one per grouping set)
ExprContext **aggcontexts;
//ExprContext for input expressions
ExprContext *tmpcontext;
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
//currently active aggcontext
ExprContext *curaggcontext;
//currently active aggregate, if any
AggStatePerAgg curperagg;
#define FIELDNO_AGGSTATE_CURPERTRANS 16
//currently active trans state
AggStatePerTrans curpertrans;
//has the input been exhausted?
bool input_done;
//has the Agg scan finished?
bool agg_done;
//the last grouping set (presumably the last one projected, going by the field name - confirm)
int projected_set;
#define FIELDNO_AGGSTATE_CURRENT_SET 20
//the grouping set currently being evaluated
int current_set;
//grouped columns for the current projection
Bitmapset *grouped_cols;
//list of all grouped columns, kept in reverse (descending) order
List *all_grouped_cols;
//-------- the fields below hold grouping-set phase data
//largest number of sets in any phase
int maxsets;
//array of all phases
AggStatePerPhase phases;
//sorted input for phases > 1
Tuplesortstate *sort_in;
//input is copied here for the next phase
Tuplesortstate *sort_out;
//slot for sort results
TupleTableSlot *sort_slot;
//------- the fields below are used in AGG_PLAIN and AGG_SORTED modes:
//array of per-group pointers, indexed by grouping set
AggStatePerGroup *pergroups;
//copy of the first tuple of the current group
HeapTuple grp_firstTuple;
//--------- the fields below are used in AGG_HASHED and AGG_MIXED modes:
//has the hash table been filled yet?
bool table_filled;
//number of hashes. NOTE(review): the original note said "number of hash buckets?", but
//this looks like the number of entries in perhash (i.e. hash tables) - confirm in nodeAgg.c
int num_hashes;
//array of per-hash-table data
AggStatePerHash perhash;
//array of per-group pointers, indexed by grouping set
AggStatePerGroup *hash_pergroup;
//---------- support for evaluating aggregate input expressions
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
//first ->pergroups, then hash_pergroup
AggStatePerGroup *all_pergroups;
//projection machinery
ProjectionInfo *combinedproj;
} AggState;
//Primitive options supported by nodeAgg.c.
//Each option is a distinct bit, so options can be OR-ed together into an AggSplit value.
#define AGGSPLITOP_COMBINE 0x01
#define AGGSPLITOP_SKIPFINAL 0x02
#define AGGSPLITOP_SERIALIZE 0x04
#define AGGSPLITOP_DESERIALIZE 0x08
//Supported operating modes; each one is a combination of the AGGSPLITOP_* bits.
typedef enum AggSplit
{
//basic case: non-split aggregation (no option bits set)
AGGSPLIT_SIMPLE = 0,
//initial phase of partial aggregation, with serialization (SKIPFINAL | SERIALIZE)
AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
//final phase of partial aggregation, with deserialization (COMBINE | DESERIALIZE)
AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;
//Test which primitive options an AggSplit value selects.
//Each macro evaluates to 1 when the corresponding AGGSPLITOP_* bit is set in (as), else 0.
#define DO_AGGSPLIT_COMBINE(as) (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as) (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as) (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)
二、源码解读
prepare_projection_slot
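prepare_projection_slot函数根据给定的代表元组slot和当前grouping set,为后续的finalize和project操作做准备.
具体而言:把当前grouping set的分组列记录到aggstate->grouped_cols;若slot为空则存入全NULL元组,否则把all_grouped_cols中不属于当前grouping set的列在slot的tts_isnull数组中置为true.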
// prepare_projection_slot
//
// Get the representative-tuple slot ready for finalizing and projecting the
// grouping set numbered currentSet of the current phase.
//
// aggstate   - aggregate node state; its grouped_cols field is updated
// slot       - slot holding the representative tuple; modified in place
// currentSet - index into aggstate->phase->grouped_cols
//
// Does nothing when the current phase has no grouped_cols array.
static void
prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
{
    Bitmapset *set_cols;
    ListCell *cell;

    if (!aggstate->phase->grouped_cols)
        return;

    // Remember which columns are grouped in the set being projected.
    set_cols = aggstate->phase->grouped_cols[currentSet];
    aggstate->grouped_cols = set_cols;

    // An empty slot is turned into an all-NULL tuple and needs no further work.
    if (slot->tts_isempty)
    {
        ExecStoreAllNullTuple(slot);
        return;
    }

    if (!aggstate->all_grouped_cols)
        return;

    // all_grouped_cols is kept in reverse order, so its first element is the
    // one handed to slot_getsomeattrs().
    slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));

    // Every grouped column that is not part of the current set is marked NULL.
    foreach(cell, aggstate->all_grouped_cols)
    {
        int col = lfirst_int(cell);

        if (bms_is_member(col, set_cols))
            continue;

        slot->tts_isnull[col - 1] = true;
    }
}
finalize_aggregates
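finalize_aggregates函数计算某一组所有聚合的最终值:先对带有DISTINCT/ORDER BY的聚合调用process_ordered_aggregate_single/multi进行处理,再逐个聚合调用finalize_aggregate得到结果(如aggsplit设置了SKIPFINAL,则改为调用finalize_partialaggregate).实现函数finalize_aggregate下节再行介绍.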
// finalize_aggregates
//
// Compute the final value of every aggregate for one group.
//
// aggstate - aggregate node state
// peraggs  - array of per-aggregate entries, indexed by aggregate number
// pergroup - array of per-group transition states, indexed by transition number
//
// Results are written into the ecxt_aggvalues / ecxt_aggnulls arrays of the
// node's ps_ExprContext, one entry per aggregate.
static void
finalize_aggregates(AggState *aggstate,
                    AggStatePerAgg peraggs,
                    AggStatePerGroup pergroup)
{
    ExprContext *outctx = aggstate->ss.ps.ps_ExprContext;
    Datum *resultvals = outctx->ecxt_aggvalues;
    bool *resultnulls = outctx->ecxt_aggnulls;
    int t;
    int a;

    // Pass 1: transition states that have sort columns (DISTINCT / ORDER BY)
    // are run through the ordered-aggregate processing first.
    for (t = 0; t < aggstate->numtrans; t++)
    {
        AggStatePerTrans trans = &aggstate->pertrans[t];

        if (trans->numSortCols <= 0)
            continue;

        // Hashed strategies are not expected to carry sorted aggregates.
        Assert(aggstate->aggstrategy != AGG_HASHED &&
               aggstate->aggstrategy != AGG_MIXED);

        if (trans->numInputs == 1)
            process_ordered_aggregate_single(aggstate, trans, &pergroup[t]);
        else
            process_ordered_aggregate_multi(aggstate, trans, &pergroup[t]);
    }

    // Pass 2: produce the output value of each aggregate from the transition
    // state it refers to.
    for (a = 0; a < aggstate->numaggs; a++)
    {
        AggStatePerAgg agg = &peraggs[a];
        AggStatePerGroup groupstate = &pergroup[agg->transno];

        if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
        {
            // The split mode asks to skip the final function.
            finalize_partialaggregate(aggstate, agg, groupstate,
                                      &resultvals[a], &resultnulls[a]);
        }
        else
        {
            finalize_aggregate(aggstate, agg, groupstate,
                               &resultvals[a], &resultnulls[a]);
        }
    }
}
project_aggregates
project_aggregates函数投影某一组的结果(该组结果已通过finalize_aggregates函数计算得到).
// project_aggregates
//
// Project the result of one group whose aggregate values are already in the
// node's expression context.
//
// Returns the slot produced by ExecProject() when the node's qual passes.
// Otherwise bumps the filtered-rows counter by one and returns NULL.
static TupleTableSlot *
project_aggregates(AggState *aggstate)
{
    ExprContext *outctx = aggstate->ss.ps.ps_ExprContext;

    // Group rejected by the qual: count it and produce nothing.
    if (!ExecQual(aggstate->ss.ps.qual, outctx))
    {
        InstrCountFiltered1(aggstate, 1);
        return NULL;
    }

    return ExecProject(aggstate->ss.ps.ps_ProjInfo);
}
// InstrCountFiltered1(node, delta)
//
// Add delta to the nfiltered1 counter of node's instrumentation, if the node
// has instrumentation attached; otherwise do nothing.
//
// node  - pointer to a plan-state node; it is cast to PlanState *
// delta - amount to add
//
// node is evaluated exactly once (it is captured in a block-local pointer), so
// an argument with side effects is safe. delta is evaluated only when
// instrumentation is present.
#define InstrCountFiltered1(node, delta) \
    do { \
        PlanState *icf1_planstate_ = (PlanState *) (node); \
        if (icf1_planstate_->instrument) \
            icf1_planstate_->instrument->nfiltered1 += (delta); \
    } while (0)
ExecProject
ExecProject函数基于投影信息投影元组并把元组存储在传递给ExecBuildProjectInfo()的slot参数中.
#ifndef FRONTEND
// ExecProject
//
// Evaluate the projection described by projInfo and return the result slot
// held in its expression state (pi_state.resultslot).
//
// The returned slot is marked non-empty, with all attributes of its tuple
// descriptor flagged as valid.
static inline TupleTableSlot *
ExecProject(ProjectionInfo *projInfo)
{
    ExprContext *evalctx = projInfo->pi_exprContext;
    ExprState *projstate = &projInfo->pi_state;
    TupleTableSlot *result = projstate->resultslot;
    bool unused_isnull;

    // Start from a cleared result slot.
    ExecClearTuple(result);

    // Run the projection expression; its scalar return value is not needed.
    (void) ExecEvalExprSwitchContext(projstate, evalctx, &unused_isnull);

    // Flag the slot as holding a tuple with every attribute valid.
    result->tts_isempty = false;
    result->tts_nvalid = result->tts_tupleDescriptor->natts;

    return result;
}
#endif
三、跟踪分析
N/A
四、参考资料
PostgreSQL 源码解读(178)- 查询#95(聚合函数)#1相关数据结构
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
PostgreSQL 源码解读(189)- 查询#105(聚合函数#10 - agg_retrieve_hash_table)
下载Word文档到电脑,方便收藏和打印~