PostgreSQL 源码解读(199)- 查询#114(排序#7 - inittapes&dumptuples)
短信预约 -IT技能 免费直播动态提醒
本节继续介绍排序的实现,主要内容是tuplesort_puttupleslot->puttuple_common调用的inittapes和dumptuples函数.
在内存不能满足排序需求时,使用了Polyphase Merging排序
一、数据结构
Tuplesortstate
Tuplesort操作的私有状态.
typedef enum
{
//装载元组,在内存限制之内
TSS_INITIAL,
//装载元组到有界堆中
TSS_BOUNDED,
//装载元组,写入到tape中
TSS_BUILDRUNS,
//完全在内存中完成排序
TSS_SORTEDINMEM,
//完成排序,最后在tape上执行排序
TSS_SORTEDONTAPE,
//不落地执行最后的归并
TSS_FINALMERGE
} TupSortStatus;
#define MINORDER 6
#define MAXORDER 500
#define TAPE_BUFFER_OVERHEAD BLCKSZ
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
struct Tuplesortstate
{
//状态 : 枚举值详见上面的信息
TupSortStatus status;
//sort key中的列数
int nKeys;
//调用者需要随机访问?
bool randomAccess;
//调用者是否指定了最大返回的元组的数目?
bool bounded;
//使用有界堆,则返回T
bool boundUsed;
//如为有界堆,这里存储最大的元组个数
int bound;
//SortTuple.tuple是否可以设置?
bool tuples;
//剩余可用内存大小(单位:字节)
int64 availMem;
//允许的内存总大小(单位:字节)
int64 allowedMem;
//tapes个数
int maxTapes;
//tapes个数 - 1
int tapeRange;
//主要用于排序数据的内存上下文
MemoryContext sortcontext;
//用于元组数据的sortcontext的子上下文
MemoryContext tuplecontext;
//临时文件中tapes的logtape.c对象
LogicalTapeSet *tapeset;
SortTupleComparator comparetup;
void (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
void (*writetup) (Tuplesortstate *state, int tapenum,
SortTuple *stup);
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
//SortTuple结构体数组
SortTuple *memtuples;
//当前存在的元组数
int memtupcount;
//memtuples数组的已分配的大小
int memtupsize;
//memtuples的增长仍在进行中?
bool growmemtuples;
bool slabAllocatorUsed;
//slab内存空间的起始位置
char *slabMemoryBegin;
//slab内存空间的结束位置
char *slabMemoryEnd;
//链表头
SlabSlot *slabFreeHead;
//在归并期间用于读取输入tapes的缓存大小
size_t read_buffer_size;
void *lastReturnedTuple;
int currentRun;
//活跃的输入run源?
bool *mergeactive;
//Knuth's l
int Level;
//当前输出tape(Knuth's j)
int destTape;
//目标斐波拉契run计数(A[])
int *tp_fib;
//每一个tape上真正runs的编号
int *tp_runs;
//每一个tape(D[])上虚拟runs的编号
int *tp_dummy;
//实际的tape编号(TAPE[])
int *tp_tapenum;
//归并轮中的活动输入tapes编号
int activeTapes;
//已完成输出的实际tape编号
int result_tape;
//数组编号(仅用于SORTEDINMEM)
int current;
//是否到达EOF(用于游标)
bool eof_reached;
//markpos_xxx保持已标记的位置,用于标记和存储
//tape block编号(只用于SORTEDONTAPE)
long markpos_block;
//存储的"current",或者tape块中的偏移
int markpos_offset;
//存储的eof_reached
bool markpos_eof;
int worker;
Sharedsort *shared;
int nParticipants;
TupleDesc tupDesc;
//长度nKeys数组
SortSupport sortKeys;
SortSupport onlyKey;
int64 abbrevNext;
//将用于依赖的索引信息
IndexInfo *indexInfo;
//解析索引表达式的运行期状态
EState *estate;
//数据表
Relation heapRel;
//正在创建的index
Relation indexRel;
//这些仅在index_btree下使用
//如发现重复元组,则提示
bool enforceUnique;
//index_hash情况
uint32 high_mask;
uint32 low_mask;
uint32 max_buckets;
Oid datumType;
//需要typelen用于知道如何拷贝Datums.
int datumTypeLen;
#ifdef TRACE_SORT
PGRUsage ru_start;
#endif
};
二、源码解读
inittapes
初始化tape sorting(Polyphase Merging).
static void
inittapes(Tuplesortstate *state, bool mergeruns)
{
int maxTapes,//最大tapes
j;
Assert(!LEADER(state));
if (mergeruns)
{
//计算tapes数 : 归并顺序 + 1
maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
}
else
{
//Worker进程有时可能产生单个run,不需要归并直接输出.
Assert(WORKER(state));
maxTapes = MINORDER + 1;
}
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "worker %d switching to external sort with %d tapes: %s",
state->worker, maxTapes, pg_rusage_show(&state->ru_start));
#endif
//创建tape集合并分配per-tape数据数组
inittapestate(state, maxTapes);
state->tapeset =
LogicalTapeSetCreate(maxTapes, NULL,
state->shared ? &state->shared->fileset : NULL,
state->worker);
state->currentRun = 0;
for (j = 0; j < maxTapes; j++)
{
state->tp_fib[j] = 1;
state->tp_runs[j] = 0;
state->tp_dummy[j] = 1;
state->tp_tapenum[j] = j;
}
state->tp_fib[state->tapeRange] = 0;
state->tp_dummy[state->tapeRange] = 0;
state->Level = 1;
state->destTape = 0;
//变更状态为TSS_BUILDRUNS
state->status = TSS_BUILDRUNS;
}
dumptuples
清除memtuples中的元组并写入初始run到tape中
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
int memtupwrite;
int i;
if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
!alltuples)
return;
Assert(state->status == TSS_BUILDRUNS);
if (state->currentRun == INT_MAX)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("cannot have more than %d runs for an external sort",
INT_MAX)));
state->currentRun++;
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "worker %d starting quicksort of run %d: %s",
state->worker, state->currentRun,
pg_rusage_show(&state->ru_start));
#endif
tuplesort_sort_memtuples(state);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "worker %d finished quicksort of run %d: %s",
state->worker, state->currentRun,
pg_rusage_show(&state->ru_start));
#endif
//写入到tape中
memtupwrite = state->memtupcount;
for (i = 0; i < memtupwrite; i++)
{
WRITETUP(state, state->tp_tapenum[state->destTape],
&state->memtuples[i]);
state->memtupcount--;
}
MemoryContextReset(state->tuplecontext);
markrunend(state, state->tp_tapenum[state->destTape]);
state->tp_runs[state->destTape]++;
state->tp_dummy[state->destTape]--;
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "worker %d finished writing run %d to tape %d: %s",
state->worker, state->currentRun, state->destTape,
pg_rusage_show(&state->ru_start));
#endif
//未完成所有元组的处理,分配新的tape
if (!alltuples)
selectnewtape(state);
}
三、跟踪分析
N/A
四、参考资料
Merge sort
Polyphase merge sort
Sorting Algorithms: Internal and External
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
PostgreSQL 源码解读(199)- 查询#114(排序#7 - inittapes&dumptuples)
下载Word文档到电脑,方便收藏和打印~
下载Word文档