PostgreSQL怎么调用mergeruns函数
这篇文章主要介绍“PostgreSQL怎么调用mergeruns函数”。在日常操作中,相信很多人在PostgreSQL怎么调用mergeruns函数这个问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答“PostgreSQL怎么调用mergeruns函数”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
TupleTableSlot
执行器在“tuple table”中存储元组,该表是由相互独立的TupleTableSlot组成的链表(List)。
/*
 * TupleTableSlot (older layout): one slot of the executor's tuple table.
 *
 * Each FIELDNO_* macro gives the zero-based position, inside this struct, of
 * the member declared right after it, so members must not be reordered
 * without updating the macros.
 *
 * NOTE(review): this document shows a second, differently laid out
 * definition of TupleTableSlot right after this one; presumably the two come
 * from different PostgreSQL versions -- confirm which one applies.
 */
typedef struct TupleTableSlot
{
    /* Node tag */
    NodeTag type;
    /* true if the slot is empty */
    bool tts_isempty;
    /* should tts_tuple be pfree'd? */
    bool tts_shouldFree;
    /* should tts_mintuple be pfree'd? */
    bool tts_shouldFreeMin;
#define FIELDNO_TUPLETABLESLOT_SLOW 4
    /* saved state for slot_deform_tuple */
    bool tts_slow;
#define FIELDNO_TUPLETABLESLOT_TUPLE 5
    /* physical tuple, or NULL if the tuple is virtual */
    HeapTuple tts_tuple;
#define FIELDNO_TUPLETABLESLOT_TUPLEDESCRIPTOR 6
    /* tuple descriptor of the slot */
    TupleDesc tts_tupleDescriptor;
    /* memory context the slot lives in */
    MemoryContext tts_mcxt;
    /* buffer holding the tuple, or InvalidBuffer if none */
    Buffer tts_buffer;
#define FIELDNO_TUPLETABLESLOT_NVALID 9
    /* number of valid values in tts_values */
    int tts_nvalid;
#define FIELDNO_TUPLETABLESLOT_VALUES 10
    /* current per-attribute values */
    Datum *tts_values;
#define FIELDNO_TUPLETABLESLOT_ISNULL 11
    /* current per-attribute isnull flags */
    bool *tts_isnull;
    /* minimal tuple, or NULL if none */
    MinimalTuple tts_mintuple;
    /* workspace used in the minimal-tuple case */
    HeapTupleData tts_minhdr;
#define FIELDNO_TUPLETABLESLOT_OFF 14
    /* saved state for slot_deform_tuple */
    uint32 tts_off;
    /* the descriptor is fixed and cannot be changed */
    bool tts_fixedTupleDescriptor;
} TupleTableSlot;
/*
 * TupleTableSlot (layout built around a TupleTableSlotOps callback table).
 *
 * Each FIELDNO_* macro gives the zero-based position, inside this struct, of
 * the member declared right after it, so members must not be reordered
 * without updating the macros.
 *
 * NOTE(review): this document also shows an earlier, larger definition of
 * TupleTableSlot just before this one; presumably the two come from
 * different PostgreSQL versions -- confirm which one applies.
 */
typedef struct TupleTableSlot
{
    /* Node tag */
    NodeTag type;
#define FIELDNO_TUPLETABLESLOT_FLAGS 1
    /* 16-bit flag word; the individual bits are not shown here */
    uint16 tts_flags;
#define FIELDNO_TUPLETABLESLOT_NVALID 2
    /* NOTE(review): presumably the number of valid entries in tts_values -- confirm */
    AttrNumber tts_nvalid;
    /* const pointer to the (const) callback table for this slot */
    const TupleTableSlotOps *const tts_ops;
#define FIELDNO_TUPLETABLESLOT_TUPLEDESCRIPTOR 4
    /* tuple descriptor of the slot */
    TupleDesc tts_tupleDescriptor;
#define FIELDNO_TUPLETABLESLOT_VALUES 5
    /* per-attribute values */
    Datum *tts_values;
#define FIELDNO_TUPLETABLESLOT_ISNULL 6
    /* per-attribute isnull flags */
    bool *tts_isnull;
    /* memory context associated with the slot */
    MemoryContext tts_mcxt;
} TupleTableSlot;
/*
 * TupleTableSlotOps: the table of callbacks ("routines") that implements one
 * kind of TupleTableSlot.
 *
 * Only the first three members carried comments in the original listing.
 * The notes on the remaining callbacks are inferred from their names and
 * signatures and should be confirmed against the PostgreSQL headers.
 */
struct TupleTableSlotOps
{
    /* Minimum size of the slot. */
    size_t base_slot_size;

    /* Initialization method. */
    void (*init)(TupleTableSlot *slot);

    /* Destructor method. */
    void (*release)(TupleTableSlot *slot);

    /* NOTE(review): presumably empties the slot's contents -- confirm. */
    void (*clear)(TupleTableSlot *slot);

    /* NOTE(review): presumably makes the first natts attributes available -- confirm. */
    void (*getsomeattrs)(TupleTableSlot *slot, int natts);

    /* Returns a Datum for attribute attnum; isnull is an output parameter. */
    Datum (*getsysattr)(TupleTableSlot *slot, int attnum, bool *isnull);

    /* NOTE(review): presumably makes the slot own its data -- confirm. */
    void (*materialize)(TupleTableSlot *slot);

    /* Copies the contents of srcslot into dstslot (direction inferred from the names). */
    void (*copyslot)(TupleTableSlot *dstslot, TupleTableSlot *srcslot);

    /* Accessors returning the slot's tuple in heap or minimal form. */
    HeapTuple (*get_heap_tuple)(TupleTableSlot *slot);
    MinimalTuple (*get_minimal_tuple)(TupleTableSlot *slot);

    /* Variants returning a copy of the tuple in heap or minimal form. */
    HeapTuple (*copy_heap_tuple)(TupleTableSlot *slot);
    MinimalTuple (*copy_minimal_tuple)(TupleTableSlot *slot);
};
/*
 * struct tupleDesc describes a tuple's attributes; TupleDesc is a pointer to
 * it.  The attribute descriptors are stored in the trailing attrs[] array.
 */
typedef struct tupleDesc
{
    /* NOTE(review): presumably the number of entries in attrs[] -- confirm */
    int natts;
    /* type OID and type modifier recorded for the tuple type */
    Oid tdtypeid;
    int32 tdtypmod;
    /* NOTE(review): presumably a reference count -- confirm its conventions */
    int tdrefcount;
    /* pointer to a TupleConstr; whether it may be NULL is not shown here */
    TupleConstr *constr;
    /* attrs[N] is the descriptor of the (N+1)-th attribute */
    FormData_pg_attribute attrs[FLEXIBLE_ARRAY_MEMBER];
} *TupleDesc;
SortState
排序运行期状态信息
/*
 * SortState: run-time state information for a Sort node.
 */
typedef struct SortState
{
    /* "base class": the embedded ScanState comes first */
    ScanState ss;
    /* is random access to the sort output needed? */
    bool randomAccess;
    /* is the result set bounded? */
    bool bounded;
    /* if bounded, how many tuples are needed */
    int64 bound;
    /* has the sort been completed yet? */
    bool sort_Done;
    /* was a bound used for the sort? */
    bool bounded_Done;
    /* the bound value that was used */
    int64 bound_Done;
    /* private state of tuplesort.c */
    void *tuplesortstate;
    /* are we a worker? */
    bool am_worker;
    /* one entry per worker */
    SharedSortInfo *shared_info;
} SortState;
/*
 * SharedSortInfo: sort instrumentation stored as a worker count followed by
 * a trailing array of TuplesortInstrumentation entries.
 */
typedef struct SharedSortInfo
{
    /* number of workers */
    int num_workers;
    /* sort instrumentation entries; NOTE(review): presumably num_workers of them -- confirm */
    TuplesortInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER];
} SharedSortInfo;
TuplesortInstrumentation
报告排序统计的数据结构.
/*
 * TuplesortMethod: the sort method reported in the sort statistics.
 * Enumerators take consecutive values starting at 0.
 */
typedef enum
{
    /* the sort is still in progress */
    SORT_TYPE_STILL_IN_PROGRESS = 0,
    /* top-N heapsort */
    SORT_TYPE_TOP_N_HEAPSORT,
    /* quicksort */
    SORT_TYPE_QUICKSORT,
    /* external sort */
    SORT_TYPE_EXTERNAL_SORT,
    /* merge following an external sort */
    SORT_TYPE_EXTERNAL_MERGE
} TuplesortMethod;
/*
 * TuplesortSpaceType: the kind of space a sort used.
 */
typedef enum
{
    /* disk space was needed */
    SORT_SPACE_TYPE_DISK,
    /* memory was used */
    SORT_SPACE_TYPE_MEMORY
} TuplesortSpaceType;
/*
 * TuplesortInstrumentation: data structure for reporting sort statistics.
 */
typedef struct TuplesortInstrumentation
{
    /* sort algorithm that was used */
    TuplesortMethod sortMethod;
    /* type of space the sort used */
    TuplesortSpaceType spaceType;
    /* space consumption, in kB */
    long spaceUsed;
} TuplesortInstrumentation;
二、源码解读
mergeruns:归并所有已完成的初始顺串(initial runs)。
/*
 * mergeruns -- merge all the completed initial runs.
 *
 * Must be entered with state->status == TSS_BUILDRUNS and no tuples left in
 * the memtuples array.  On return state->status is one of:
 *
 *   TSS_FINALMERGE   - every input tape held exactly one run, so
 *                      beginmerge() was called and the function returned
 *                      without running that last merge pass itself (only
 *                      when randomAccess is false and this is not a worker);
 *   TSS_SORTEDONTAPE - all runs were merged and state->result_tape, which
 *                      has been frozen, identifies the output tape.
 *
 * The step names D2/D5/D6 in the comments below are kept from the original
 * comments; NOTE(review): they presumably refer to Knuth's polyphase merge
 * algorithm -- confirm.
 */
static void
mergeruns(Tuplesortstate *state)
{
    int tapenum,
        svTape,
        svRuns,
        svDummy;
    int numTapes;
    int numInputTapes;
    Assert(state->status == TSS_BUILDRUNS);
    Assert(state->memtupcount == 0);
    /*
     * If an abbreviated-key converter is installed on the first sort key,
     * switch abbreviation off: the full comparator becomes the active
     * comparator and the abbreviation hooks are cleared.
     */
    if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
    {
        state->sortKeys->abbrev_converter = NULL;
        state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
        /* Not strictly necessary, but be tidy */
        state->sortKeys->abbrev_abort = NULL;
        state->sortKeys->abbrev_full_comparator = NULL;
    }
    /* Reset tuple memory by deleting the tuple context */
    MemoryContextDelete(state->tuplecontext);
    state->tuplecontext = NULL;
    /*
     * The large memtuples array is no longer needed: un-account its space
     * with FREEMEM, free it, and clear the pointer.  A smaller array is
     * allocated further down.
     */
    FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    pfree(state->memtuples);
    state->memtuples = NULL;
    /*
     * Work out how many tapes take part.  At level 1 there is one input
     * tape per run (currentRun) plus one more tape, and the buffer overhead
     * accounted for the remaining (maxTapes - numTapes) tapes is handed
     * back.  Otherwise all tapeRange input tapes and all maxTapes are used.
     */
    if (state->Level == 1)
    {
        numInputTapes = state->currentRun;
        numTapes = numInputTapes + 1;
        FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
    }
    else
    {
        numInputTapes = state->tapeRange;
        numTapes = state->maxTapes;
    }
    /*
     * Initialize the slab allocator: numInputTapes + 1 slots when
     * state->tuples is set, zero slots otherwise.
     */
    if (state->tuples)
        init_slab_allocator(state, numInputTapes + 1);
    else
        init_slab_allocator(state, 0);
    /* Allocate a new memtuples array with one element per input tape */
    state->memtupsize = numInputTapes;
    state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
    USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    /* Optional trace output of the memory available for read buffers */
#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
             state->worker, state->availMem / 1024, numInputTapes);
#endif
    /*
     * Divide the remaining availMem evenly among the input tapes as read
     * buffer space (never less than 0) and account for all of it.
     */
    state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
    USEMEM(state, state->read_buffer_size * numInputTapes);
    /* End of step D2: rewind all output tapes to prepare for merging */
    for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
        LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
    for (;;)
    {
        /*
         * Main merge loop.  When random access is not required and this is
         * not a worker, check whether every input tape holds exactly one run
         * (tp_runs + tp_dummy == 1).  If so, only one merge pass remains:
         * set it up with beginmerge() and return in TSS_FINALMERGE state.
         */
        if (!state->randomAccess && !WORKER(state))
        {
            bool allOneRun = true;
            Assert(state->tp_runs[state->tapeRange] == 0);
            for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
            {
                if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
                {
                    allOneRun = false;
                    break;
                }
            }
            if (allOneRun)
            {
                /* Tell logtape.c that nothing more will be written */
                LogicalTapeSetForgetFreeSpace(state->tapeset);
                /* Prepare for the final merge pass */
                beginmerge(state);
                state->status = TSS_FINALMERGE;
                return;
            }
        }
        /* Step D5: merge runs onto tape[T] until tape[P] is empty */
        while (state->tp_runs[state->tapeRange - 1] ||
               state->tp_dummy[state->tapeRange - 1])
        {
            bool allDummy = true;
            for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
            {
                if (state->tp_dummy[tapenum] == 0)
                {
                    allDummy = false;
                    break;
                }
            }
            /*
             * If every input tape still has a dummy run, "merge" them by
             * pure bookkeeping: add one dummy run to slot tapeRange and take
             * one from each input tape.  Otherwise merge one real run.
             */
            if (allDummy)
            {
                state->tp_dummy[state->tapeRange]++;
                for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
                    state->tp_dummy[tapenum]--;
            }
            else
                mergeonerun(state);
        }
        /* Step D6: decrease the level; the loop ends when it reaches 0 */
        if (--state->Level == 0)
            break;
        /* Rewind tape T (slot tapeRange) for reading, to use it as new input */
        LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
                                 state->read_buffer_size);
        /* Rewind the used-up input tape P and prepare it for the write pass */
        LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
        state->tp_runs[state->tapeRange - 1] = 0;
        /*
         * Rotate the tape assignment by one position: the entry in slot
         * tapeRange moves to slot 0 and every other entry moves up one slot
         * (applied to tp_tapenum, tp_dummy and tp_runs alike).
         */
        svTape = state->tp_tapenum[state->tapeRange];
        svDummy = state->tp_dummy[state->tapeRange];
        svRuns = state->tp_runs[state->tapeRange];
        for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
        {
            state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
            state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
            state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
        }
        state->tp_tapenum[0] = svTape;
        state->tp_dummy[0] = svDummy;
        state->tp_runs[0] = svRuns;
    }
    /*
     * Done.  The loop was left before the tape assignment was rotated, so
     * the result is on the tape in slot tapeRange.  Freeze it: directly for
     * a non-worker, through worker_freeze_result_tape() for a worker.
     */
    state->result_tape = state->tp_tapenum[state->tapeRange];
    if (!WORKER(state))
        LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
    else
        worker_freeze_result_tape(state);
    state->status = TSS_SORTEDONTAPE;
    /* Release the read buffers of all the other tapes by rewinding them */
    for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
    {
        if (tapenum != state->result_tape)
            LogicalTapeRewindForWrite(state->tapeset, tapenum);
    }
}
三、跟踪分析
测试脚本
-- Test query used for the trace below: sorts table t_sort on two keys, c1 then c2.
select * from t_sort order by c1,c2;
跟踪分析
(gdb) b mergeruns
Breakpoint 1 at 0xa73508: file tuplesort.c, line 2570.
(gdb)
Note: breakpoint 1 also set at pc 0xa73508.
Breakpoint 2 at 0xa73508: file tuplesort.c, line 2570.
输入参数
(gdb) c
Continuing.
Breakpoint 1, mergeruns (state=0x2b808a8) at tuplesort.c:2570
2570 Assert(state->status == TSS_BUILDRUNS);
(gdb) p *state
$1 = {status = TSS_BUILDRUNS, nKeys = 2, randomAccess = false, bounded = false, boundUsed = false, bound = 0,
tuples = true, availMem = 3164456, allowedMem = 4194304, maxTapes = 16, tapeRange = 15, sortcontext = 0x2b80790,
tuplecontext = 0x2b827a0, tapeset = 0x2b81480, comparetup = 0xa7525b <comparetup_heap>,
copytup = 0xa76247 <copytup_heap>, writetup = 0xa76de1 <writetup_heap>, readtup = 0xa76ec6 <readtup_heap>,
memtuples = 0x7f0cfeb14050, memtupcount = 0, memtupsize = 37448, growmemtuples = false, slabAllocatorUsed = false,
slabMemoryBegin = 0x0, slabMemoryEnd = 0x0, slabFreeHead = 0x0, read_buffer_size = 0, lastReturnedTuple = 0x0,
currentRun = 3, mergeactive = 0x2b81350, Level = 1, destTape = 2, tp_fib = 0x2b80d58, tp_runs = 0x2b81378,
tp_dummy = 0x2b813d0, tp_tapenum = 0x2b81428, activeTapes = 0, result_tape = -1, current = 0, eof_reached = false,
markpos_block = 0, markpos_offset = 0, markpos_eof = false, worker = -1, shared = 0x0, nParticipants = -1,
tupDesc = 0x2b67ae0, sortKeys = 0x2b80cc0, onlyKey = 0x0, abbrevNext = 10, indexInfo = 0x0, estate = 0x0, heapRel = 0x0,
indexRel = 0x0, enforceUnique = false, high_mask = 0, low_mask = 0, max_buckets = 0, datumType = 0, datumTypeLen = 0,
ru_start = {tv = {tv_sec = 0, tv_usec = 0}, ru = {ru_utime = {tv_sec = 0, tv_usec = 0}, ru_stime = {tv_sec = 0,
tv_usec = 0}, {ru_maxrss = 0, __ru_maxrss_word = 0}, {ru_ixrss = 0, __ru_ixrss_word = 0}, {ru_idrss = 0,
__ru_idrss_word = 0}, {ru_isrss = 0, __ru_isrss_word = 0}, {ru_minflt = 0, __ru_minflt_word = 0}, {ru_majflt = 0,
__ru_majflt_word = 0}, {ru_nswap = 0, __ru_nswap_word = 0}, {ru_inblock = 0, __ru_inblock_word = 0}, {
ru_oublock = 0, __ru_oublock_word = 0}, {ru_msgsnd = 0, __ru_msgsnd_word = 0}, {ru_msgrcv = 0,
__ru_msgrcv_word = 0}, {ru_nsignals = 0, __ru_nsignals_word = 0}, {ru_nvcsw = 0, __ru_nvcsw_word = 0}, {
ru_nivcsw = 0, __ru_nivcsw_word = 0}}}}
(gdb)
排序键等信息
(gdb) n
2571 Assert(state->memtupcount == 0);
(gdb)
2573 if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
(gdb) p *state->sortKeys
$2 = {ssup_cxt = 0x2b80790, ssup_collation = 0, ssup_reverse = false, ssup_nulls_first = false, ssup_attno = 2,
ssup_extra = 0x0, comparator = 0x4fd4af <btint4fastcmp>, abbreviate = true, abbrev_converter = 0x0, abbrev_abort = 0x0,
abbrev_full_comparator = 0x0}
(gdb) p *state->sortKeys->abbrev_converter
Cannot access memory at address 0x0
重置元组内存,不再需要大块的memtuples数组.
(gdb) n
2593 MemoryContextDelete(state->tuplecontext);
(gdb)
2594 state->tuplecontext = NULL;
(gdb)
(gdb) n
2600 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
(gdb)
2601 pfree(state->memtuples);
(gdb)
2602 state->memtuples = NULL;
(gdb)
2613 if (state->Level == 1)
(gdb)
计算Tapes数
(gdb) n
2615 numInputTapes = state->currentRun;
(gdb) p state->currentRun
$3 = 3
(gdb) p state->Level
$4 = 1
(gdb) p state->tapeRange
$5 = 15
(gdb) p state->maxTapes
$6 = 16
(gdb) n
2616 numTapes = numInputTapes + 1;
(gdb)
2617 FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
(gdb)
2634 if (state->tuples)
(gdb) p numInputTapes
$7 = 3
(gdb) p numTapes
$8 = 4
(gdb)
初始化slab分配器 / 为堆分配新的“memtuples”数组 / 倒回所有输出tapes准备归并
(gdb) n
2635 init_slab_allocator(state, numInputTapes + 1);
(gdb) n
2643 state->memtupsize = numInputTapes;
(gdb)
2644 state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
(gdb)
2645 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
(gdb) p state->memtupsize
$9 = 3
(gdb) n
2662 if (trace_sort)
(gdb)
2667 state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
(gdb)
2668 USEMEM(state, state->read_buffer_size * numInputTapes);
(gdb) p state->read_buffer_size
$10 = 1385762
(gdb) n
2671 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
(gdb)
2672 LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
(gdb) p state->tapeRange
$11 = 15
(gdb) p state->status
$12 = TSS_BUILDRUNS
(gdb)
进入循环
2671 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
(gdb)
2682 if (!state->randomAccess && !WORKER(state))
(gdb)
2684 bool allOneRun = true;
(gdb) p state->randomAccess
$15 = false
(gdb) p WORKER(state)
$16 = 0
(gdb)
循环检查每个输入tape上的run数(tp_runs与tp_dummy之和)是否恰好为1,只要有一个不满足就把allOneRun置为false;本例中allOneRun保持为true
2687 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
(gdb)
2695 if (allOneRun)
(gdb) p allOneRun
$19 = true
(gdb)
开始归并,并设置状态,返回
(gdb) n
2698 LogicalTapeSetForgetFreeSpace(state->tapeset);
(gdb)
2700 beginmerge(state);
(gdb)
2701 state->status = TSS_FINALMERGE;
(gdb)
2702 return;
(gdb)
2779 }
(gdb)
tuplesort_performsort (state=0x2b808a8) at tuplesort.c:1866
1866 state->eof_reached = false;
(gdb)
完成排序
(gdb) n
1867 state->markpos_block = 0L;
(gdb)
1868 state->markpos_offset = 0;
(gdb)
1869 state->markpos_eof = false;
(gdb)
1870 break;
(gdb)
1878 if (trace_sort)
(gdb)
1890 MemoryContextSwitchTo(oldcontext);
(gdb)
1891 }
(gdb)
ExecSort (pstate=0x2b67640) at nodeSort.c:123
123 estate->es_direction = dir;
(gdb) c
Continuing.
到此,关于“PostgreSQL怎么调用mergeruns函数”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341