PostgreSQL 源码解读(82)- 查询语句#67(PortalXXX系列函数)
本节介绍了PortalXXX函数,这些函数在exec_simple_query中被调用,包括CreatePortal、PortalDefineQuery、PortalSetResultFormat、PortalRun和PortalDrop函数。
一、数据结构
Portal
包括执行策略PortalStrategy枚举定义/状态PortalStatus枚举定义/PortalData结构体.Portal是指向PortalData结构体的指针,详见代码注释.
// Execution strategy of a portal. PortalRun switches on this value to decide
// how the portal's statement(s) are executed.
typedef enum PortalStrategy
{
// PortalRun fetches rows directly with PortalRunSelect; FillPortalStore is
// never called for this strategy.
PORTAL_ONE_SELECT,
// For this and the next two strategies PortalRun first calls FillPortalStore
// when portal->holdStore is not set yet, then fetches with PortalRunSelect.
PORTAL_ONE_RETURNING,
PORTAL_ONE_MOD_WITH,
PORTAL_UTIL_SELECT,
// PortalRun hands the portal to PortalRunMulti and then marks it done.
// This is also the strategy CreatePortal assigns to a new portal.
PORTAL_MULTI_QUERY
} PortalStrategy;
// Life-cycle state of a portal, stored in PortalData.status.
typedef enum PortalStatus
{
// Assigned by CreatePortal; PortalDefineQuery asserts this state on entry.
PORTAL_NEW,
// Assigned by PortalDefineQuery once the query information is stored.
PORTAL_DEFINED,
// Assigned by PortalRun after a select-style strategy has fetched its rows.
PORTAL_READY,
// PortalDrop refuses to drop a portal in this state.
// NOTE(review): presumably set by MarkPortalActive, which is not shown here.
PORTAL_ACTIVE,
// NOTE(review): presumably set by MarkPortalDone, which is not shown here.
PORTAL_DONE,
// PortalDrop releases the portal's resources with isCommit = false in this
// state. NOTE(review): presumably set by MarkPortalFailed (not shown here).
PORTAL_FAILED
} PortalStatus;
// Portal is a pointer to a PortalData structure.
typedef struct PortalData *Portal;
// Per-portal state: identity, owned resources, the query to run, execution
// status and cursor position.
typedef struct PortalData
{
// Portal name. CreatePortal never assigns it directly but reads it right
// after PortalHashTableInsert, so presumably that call fills it in.
const char *name;
// Stored by PortalDefineQuery from its argument of the same name.
const char *prepStmtName;
// Memory context created by CreatePortal under TopPortalContext; deleted by
// PortalDrop.
MemoryContext portalContext;
// Resource owner created by CreatePortal; released and deleted by PortalDrop.
ResourceOwner resowner;
// Cleanup callback invoked (once) by PortalDrop; CreatePortal installs
// PortalCleanup as the default.
void (*cleanup) (Portal portal);
// Sub-transaction id current at creation time (GetCurrentSubTransactionId).
SubTransactionId createSubid;
// Initialized by CreatePortal to the same value as createSubid.
SubTransactionId activeSubid;
// The query or queries the portal will execute (stored by PortalDefineQuery).
const char *sourceText;
const char *commandTag;
List *stmts;
CachedPlan *cplan;
// Not assigned by any function shown in this excerpt.
ParamListInfo portalParams;
QueryEnvironment *queryEnv;
// Execution strategy; PortalRun switches on it.
PortalStrategy strategy;
// CreatePortal initializes this to CURSOR_OPT_NO_SCROLL.
int cursorOptions;
// Set by PortalRun from its run_once argument.
bool run_once;
// Current life-cycle state (see PortalStatus).
PortalStatus status;
// PortalDrop raises an error instead of dropping a pinned portal.
bool portalPinned;
// Not touched by any function shown in this excerpt.
bool autoHeld;
// If not NULL, the executor is active.
QueryDesc *queryDesc;
// If the portal returns tuples, this is their descriptor.
TupleDesc tupDesc;
// Format codes for the result columns; allocated by PortalSetResultFormat
// with one entry per attribute of tupDesc.
int16 *formats;
// Tuple store and its memory context/snapshot; PortalDrop ends the store,
// deletes holdContext and unregisters holdSnapshot when they are set.
Tuplestorestate *holdStore;
MemoryContext holdContext;
Snapshot holdSnapshot;
// Positioned at the start?
bool atStart;
// Positioned at the end?
bool atEnd;
// Actual row number.
uint64 portalPos;
// Presentation data, mainly used by the pg_cursors system view.
TimestampTz creation_time;
bool visible;
} PortalData;
// A portal handle counts as valid when PointerIsValid accepts the pointer;
// this macro is only an alias for that check.
#define PortalIsValid(p) PointerIsValid(p)
二、源码解读
CreatePortal
CreatePortal函数创建给定名称的Portal结构.
//------------------------------------------------------ CreatePortal
// Create a new portal registered under the given name and return it.
//
// name: portal name; must be a valid pointer (asserted).
// allowDup: when a portal with this name already exists, false makes the
//           function raise ERRCODE_DUPLICATE_CURSOR; true lets the existing
//           portal be dropped and replaced.
// dupSilent: when false, a WARNING is emitted before the existing portal is
//            dropped.
//
// The returned portal is in state PORTAL_NEW with strategy PORTAL_MULTI_QUERY.
Portal
CreatePortal(const char *name, bool allowDup, bool dupSilent)
{
Portal portal;
AssertArg(PointerIsValid(name));
// Look up any existing portal with this name.
portal = GetPortalByName(name);
if (PortalIsValid(portal))
{
// A portal with this name already exists.
// Duplicate names are not allowed: raise an error.
if (!allowDup)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_CURSOR),
errmsg("cursor \"%s\" already exists", name)));
// Unless asked to be silent, warn that the old portal is being closed.
if (!dupSilent)
ereport(WARNING,
(errcode(ERRCODE_DUPLICATE_CURSOR),
errmsg("closing existing cursor \"%s\"",
name)));
PortalDrop(portal, false);
}
// Allocate the new, zero-filled portal structure in TopPortalContext.
portal = (Portal) MemoryContextAllocZero(TopPortalContext, sizeof *portal);
// Create the portal's own memory context as a child of TopPortalContext,
// using the small allocation-set size parameters.
portal->portalContext = AllocSetContextCreate(TopPortalContext,
"PortalContext",
ALLOCSET_SMALL_SIZES);
// Create the portal's resource owner under CurTransactionResourceOwner.
portal->resowner = ResourceOwnerCreate(CurTransactionResourceOwner,
"Portal");
// Initialize the fields whose starting value is not zero.
// Initial state.
portal->status = PORTAL_NEW;
// Default cleanup callback.
portal->cleanup = PortalCleanup;
// Sub-transaction in which the portal is being created.
portal->createSubid = GetCurrentSubTransactionId();
// Starts out equal to createSubid.
portal->activeSubid = portal->createSubid;
// Default strategy until one is chosen elsewhere.
portal->strategy = PORTAL_MULTI_QUERY;
// Non-scrollable cursor by default.
portal->cursorOptions = CURSOR_OPT_NO_SCROLL;
// Both position flags start out true.
portal->atStart = true;
portal->atEnd = true;
// Visible in pg_cursors.
portal->visible = true;
// Creation time is the start timestamp of the current statement.
portal->creation_time = GetCurrentStatementStartTimestamp();
// Register the portal in the hash table under its name.
PortalHashTableInsert(portal, name);
// Use the portal's name as the identifier of its memory context.
MemoryContextSetIdentifier(portal->portalContext, portal->name);
// Hand the new portal back to the caller.
return portal;
}
PortalDefineQuery
PortalDefineQuery是设置portal查询信息的一个简单过程:只是把传入的参数保存到portal结构体中,并把状态置为PORTAL_DEFINED.
//------------------------------------------------------ PortalDefineQuery
// Store the query information in a freshly created portal.
//
// portal: must be valid and still in state PORTAL_NEW (asserted).
// prepStmtName: stored as-is in the portal.
// sourceText: query text; must not be NULL (asserted).
// commandTag: command tag; may be NULL only when stmts is NIL (asserted).
// stmts: list of statements the portal will run.
// cplan: cached plan, stored as-is in the portal.
//
// Only the pointers are stored; nothing is copied here. On return the portal
// is in state PORTAL_DEFINED.
void
PortalDefineQuery(Portal portal,
const char *prepStmtName,
const char *sourceText,
const char *commandTag,
List *stmts,
CachedPlan *cplan)
{
AssertArg(PortalIsValid(portal));
AssertState(portal->status == PORTAL_NEW);
AssertArg(sourceText != NULL);
AssertArg(commandTag != NULL || stmts == NIL);
// Plain parameter passing: copy each argument into the portal structure.
portal->prepStmtName = prepStmtName;
portal->sourceText = sourceText;
portal->commandTag = commandTag;
portal->stmts = stmts;
portal->cplan = cplan;
// The query is now defined.
portal->status = PORTAL_DEFINED;
}
PortalSetResultFormat
PortalSetResultFormat函数为portal输出的各列设置格式码.
//------------------------------------------------------ PortalSetResultFormat
// Choose the output format code for every result column of the portal.
//
// portal: the portal whose formats array is (re)built.
// nFormats: number of entries supplied in formats.
//   > 1  one code per column; must equal the column count, otherwise
//        ERRCODE_PROTOCOL_VIOLATION is raised;
//   == 1 the single code is applied to every column;
//   <= 0 every column gets format code 0.
// formats: the supplied format codes; only read when nFormats > 0.
//
// The array is allocated in portal->portalContext. Nothing happens when the
// portal has no tuple descriptor.
void
PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
{
int natts;
int i;
// A portal that returns no tuples needs no format codes.
if (portal->tupDesc == NULL)
return;
natts = portal->tupDesc->natts;
// One int16 slot per result column, owned by the portal's memory context.
portal->formats = (int16 *)
MemoryContextAlloc(portal->portalContext,
natts * sizeof(int16));
if (nFormats > 1)
{
// A format was specified for each column; the counts must match.
if (nFormats != natts)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("bind message has %d result formats but query has %d columns",
nFormats, natts)));
memcpy(portal->formats, formats, natts * sizeof(int16));
}
else if (nFormats > 0)
{
// A single format was specified; use it for all columns.
int16 format1 = formats[0];
for (i = 0; i < natts; i++)
portal->formats[i] = format1;
}
else
{
// No format given: use the default code (0) for all columns.
for (i = 0; i < natts; i++)
portal->formats[i] = 0;
}
}
PortalRun
PortalRun执行portal单个查询或多个查询
//------------------------------------------------------ PortalRun
// Execute the portal's query or queries.
//
// portal: the portal to run; MarkPortalActive is called on it before running.
// count: passed through to PortalRunSelect for the select-style strategies.
// isTopLevel: passed through to FillPortalStore and PortalRunMulti.
// run_once: stored in portal->run_once; the Assert forbids clearing a flag
//           that an earlier call already set.
// dest, altdest: receivers handed to PortalRunSelect (dest only) and to
//                PortalRunMulti (both).
// completionTag: optional output buffer; reset to "" on entry and, for the
//                select-style strategies, filled with the command tag
//                ("SELECT <rowcount>" when the tag is "SELECT").
//
// Returns portal->atEnd for the select-style strategies and true for
// PORTAL_MULTI_QUERY. If an error is raised while running, the portal is
// marked failed, the saved globals are restored and the error is re-thrown.
bool
PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
DestReceiver *dest, DestReceiver *altdest,
char *completionTag)
{
bool result;
uint64 nprocessed;
ResourceOwner saveTopTransactionResourceOwner;
MemoryContext saveTopTransactionContext;
Portal saveActivePortal;
ResourceOwner saveResourceOwner;
MemoryContext savePortalContext;
MemoryContext saveMemoryContext;
AssertArg(PortalIsValid(portal));
TRACE_POSTGRESQL_QUERY_EXECUTE_START();
// Initialize completionTag to the empty string.
if (completionTag)
completionTag[0] = '\0';
if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
{
elog(DEBUG3, "PortalRun");
ResetUsage();
}
MarkPortalActive(portal);
// Record the run_once flag; if it was set before, it must not be cleared.
Assert(!portal->run_once || run_once);
portal->run_once = run_once;
// Save the global state that is overridden while the portal runs.
saveTopTransactionResourceOwner = TopTransactionResourceOwner;
saveTopTransactionContext = TopTransactionContext;
saveActivePortal = ActivePortal;
saveResourceOwner = CurrentResourceOwner;
savePortalContext = PortalContext;
saveMemoryContext = CurrentMemoryContext;
PG_TRY();
{
// Make this portal, its resource owner and its memory context current.
ActivePortal = portal;
if (portal->resowner)
CurrentResourceOwner = portal->resowner;
PortalContext = portal->portalContext;
MemoryContextSwitchTo(PortalContext);
// Dispatch on the portal's execution strategy.
switch (portal->strategy)
{
case PORTAL_ONE_SELECT:
case PORTAL_ONE_RETURNING:
case PORTAL_ONE_MOD_WITH:
case PORTAL_UTIL_SELECT:
// Every strategy except PORTAL_ONE_SELECT first fills the portal's hold
// store, unless that has already been done.
if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
FillPortalStore(portal, isTopLevel);
// Fetch the requested rows (forward direction).
nprocessed = PortalRunSelect(portal, true, count, dest);
// Report the command tag to the caller; a "SELECT" tag also carries the
// number of rows processed.
if (completionTag && portal->commandTag)
{
if (strcmp(portal->commandTag, "SELECT") == 0)
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"SELECT " UINT64_FORMAT, nprocessed);
else
strcpy(completionTag, portal->commandTag);
}
// Mark the portal PORTAL_READY (no longer active).
portal->status = PORTAL_READY;
// The result tells whether the portal is now positioned at its end.
result = portal->atEnd;
break;
case PORTAL_MULTI_QUERY:
PortalRunMulti(portal, isTopLevel, false,
dest, altdest, completionTag);
// Prevent the portal's commands from being executed again.
MarkPortalDone(portal);
// A multi-query run always reports completion.
result = true;
break;
// Unknown strategy.
default:
elog(ERROR, "unrecognized portal strategy: %d",
(int) portal->strategy);
result = false;
break;
}
}
PG_CATCH();
{
// Uncaught error while executing the portal: mark it failed.
MarkPortalFailed(portal);
// Restore the global variables and re-throw the error.
// NOTE(review): when the saved context/owner was the top-transaction one,
// the current global is used rather than the saved pointer - presumably
// because those globals can be replaced while the portal runs; confirm.
if (saveMemoryContext == saveTopTransactionContext)
MemoryContextSwitchTo(TopTransactionContext);
else
MemoryContextSwitchTo(saveMemoryContext);
ActivePortal = saveActivePortal;
if (saveResourceOwner == saveTopTransactionResourceOwner)
CurrentResourceOwner = TopTransactionResourceOwner;
else
CurrentResourceOwner = saveResourceOwner;
PortalContext = savePortalContext;
PG_RE_THROW();
}
PG_END_TRY();
// Normal exit: restore the saved global state, same rules as in the error
// path above.
if (saveMemoryContext == saveTopTransactionContext)
MemoryContextSwitchTo(TopTransactionContext);
else
MemoryContextSwitchTo(saveMemoryContext);
ActivePortal = saveActivePortal;
if (saveResourceOwner == saveTopTransactionResourceOwner)
CurrentResourceOwner = TopTransactionResourceOwner;
else
CurrentResourceOwner = saveResourceOwner;
PortalContext = savePortalContext;
if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
ShowUsage("EXECUTOR STATISTICS");
TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
return result;
}
PortalDrop
PortalDrop函数销毁portal结构体
//------------------------------------------------------ PortalDrop
// Destroy a portal and release everything it owns.
//
// portal: the portal to destroy; must be valid (asserted). Dropping a pinned
//         or an active portal raises ERRCODE_INVALID_CURSOR_STATE.
// isTopCommit: when true, the portal's resource owner is released here only
//              if the portal is in state PORTAL_FAILED; otherwise the owner
//              pointer is just cleared.
//              NOTE(review): presumably the caller releases those resources
//              itself in that case - confirm against the callers.
//
// After this call the portal structure has been freed and must not be used.
void
PortalDrop(Portal portal, bool isTopCommit)
{
AssertArg(PortalIsValid(portal));
// Refuse to drop a pinned portal.
if (portal->portalPinned)
ereport(ERROR,
(errcode(ERRCODE_INVALID_CURSOR_STATE),
errmsg("cannot drop pinned portal \"%s\"", portal->name)));
// Refuse to drop a portal that is currently executing.
if (portal->status == PORTAL_ACTIVE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_CURSOR_STATE),
errmsg("cannot drop active portal \"%s\"", portal->name)));
// Run the cleanup callback, then clear it so it cannot run a second time.
if (PointerIsValid(portal->cleanup))
{
portal->cleanup(portal);
portal->cleanup = NULL;
}
// Remove the portal from the hash table.
PortalHashTableDelete(portal);
// Drop the reference to the cached plan, if any.
PortalReleaseCachedPlan(portal);
// Unregister the hold snapshot from the portal's resource owner (when the
// owner still exists) and forget it.
if (portal->holdSnapshot)
{
if (portal->resowner)
UnregisterSnapshotFromOwner(portal->holdSnapshot,
portal->resowner);
portal->holdSnapshot = NULL;
}
// Release the resource owner in three phases (before locks, locks, after
// locks) and delete it. A failed portal is released with isCommit = false.
if (portal->resowner &&
(!isTopCommit || portal->status == PORTAL_FAILED))
{
bool isCommit = (portal->status != PORTAL_FAILED);
ResourceOwnerRelease(portal->resowner,
RESOURCE_RELEASE_BEFORE_LOCKS,
isCommit, false);
ResourceOwnerRelease(portal->resowner,
RESOURCE_RELEASE_LOCKS,
isCommit, false);
ResourceOwnerRelease(portal->resowner,
RESOURCE_RELEASE_AFTER_LOCKS,
isCommit, false);
ResourceOwnerDelete(portal->resowner);
}
// The owner pointer is cleared whether or not it was released above.
portal->resowner = NULL;
// End the tuple store while its own memory context is current.
if (portal->holdStore)
{
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(portal->holdContext);
tuplestore_end(portal->holdStore);
MemoryContextSwitchTo(oldcontext);
portal->holdStore = NULL;
}
// Delete the tuple store's memory context, if there is one.
if (portal->holdContext)
MemoryContextDelete(portal->holdContext);
// Release the portal's own memory context.
MemoryContextDelete(portal->portalContext);
// Free the portal structure itself (allocated in TopPortalContext).
pfree(portal);
}
三、跟踪分析
测试脚本如下
testdb=# explain select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je
testdb(# from t_grxx gr inner join t_jfxx jf
testdb(# on gr.dwbh = dw.dwbh
testdb(# and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
QUERY PLAN
------------------------------------------------------------------------------------------
Sort (cost=20070.93..20320.93 rows=100000 width=47)
Sort Key: dw.dwbh
-> Hash Join (cost=3754.00..8689.61 rows=100000 width=47)
Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
-> Hash Join (cost=3465.00..8138.00 rows=100000 width=31)
Hash Cond: ((jf.grbh)::text = (gr.grbh)::text)
-> Seq Scan on t_jfxx jf (cost=0.00..1637.00 rows=100000 width=20)
-> Hash (cost=1726.00..1726.00 rows=100000 width=16)
-> Seq Scan on t_grxx gr (cost=0.00..1726.00 rows=100000 width=16)
-> Hash (cost=164.00..164.00 rows=10000 width=20)
-> Seq Scan on t_dwxx dw (cost=0.00..164.00 rows=10000 width=20)
(11 rows)
启动gdb,设置断点,进入exec_simple_query
(gdb) b exec_simple_query
Breakpoint 1 at 0x8c59af: file postgres.c, line 893.
(gdb) c
Continuing.
Breakpoint 1, exec_simple_query (
query_string=0x2a9eeb8 "select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je \nfrom t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je \n", ' ' <repeats 24 times>, "from t_grxx gr inner join t_jfxx jf \n", ' ' <repeats 34 times>...) at postgres.c:893
893 CommandDest dest = whereToSendOutput;
(gdb)
进入CreatePortal
1058 CHECK_FOR_INTERRUPTS();
(gdb)
1064 portal = CreatePortal("", true, true);
(gdb) step
CreatePortal (name=0xc5b7d8 "", allowDup=true, dupSilent=true) at portalmem.c:179
179 AssertArg(PointerIsValid(name));
CreatePortal-->设置portal的相关信息
216 portal->atEnd = true;
(gdb)
217 portal->visible = true;
(gdb)
218 portal->creation_time = GetCurrentStatementStartTimestamp();
(gdb)
221 PortalHashTableInsert(portal, name);
(gdb)
224 MemoryContextSetIdentifier(portal->portalContext, portal->name);
(gdb)
226 return portal;
CreatePortal-->查看portal结构体
(gdb) p *portal
$1 = {name = 0x2b07e90 "", prepStmtName = 0x0, portalContext = 0x2b8b7a0, resowner = 0x2acfe80,
cleanup = 0x6711b6 <PortalCleanup>, createSubid = 1, activeSubid = 1, sourceText = 0x0, commandTag = 0x0, stmts = 0x0,
cplan = 0x0, portalParams = 0x0, queryEnv = 0x0, strategy = PORTAL_MULTI_QUERY, cursorOptions = 4, run_once = false,
status = PORTAL_NEW, portalPinned = false, autoHeld = false, queryDesc = 0x0, tupDesc = 0x0, formats = 0x0,
holdStore = 0x0, holdContext = 0x0, holdSnapshot = 0x0, atStart = true, atEnd = true, portalPos = 0,
creation_time = 595049454962775, visible = true}
回到exec_simple_query
(gdb)
exec_simple_query (
query_string=0x2a9eeb8 "select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je \nfrom t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je \n", ' ' <repeats 24 times>, "from t_grxx gr inner join t_jfxx jf \n", ' ' <repeats 34 times>...) at postgres.c:1066
1066 portal->visible = false;
进入PortalDefineQuery
(gdb)
1073 PortalDefineQuery(portal,
(gdb) step
PortalDefineQuery (portal=0x2b04468, prepStmtName=0x0,
sourceText=0x2a9eeb8 "select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je \nfrom t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je \n", ' ' <repeats 24 times>, "from t_grxx gr inner join t_jfxx jf \n", ' ' <repeats 34 times>...,
commandTag=0xc5eed5 "SELECT", stmts=0x2b86800, cplan=0x0) at portalmem.c:288
288 AssertArg(PortalIsValid(portal));
PortalDefineQuery-->设置相关参数
294 portal->prepStmtName = prepStmtName;
(gdb)
295 portal->sourceText = sourceText;
(gdb)
296 portal->commandTag = commandTag;
(gdb)
297 portal->stmts = stmts;
(gdb)
298 portal->cplan = cplan;
(gdb)
299 portal->status = PORTAL_DEFINED;
(gdb)
300 }
PortalDefineQuery-->查看portal结构体
(gdb) p *portal
$2 = {name = 0x2b07e90 "", prepStmtName = 0x0, portalContext = 0x2b8b7a0, resowner = 0x2acfe80,
cleanup = 0x6711b6 <PortalCleanup>, createSubid = 1, activeSubid = 1,
sourceText = 0x2a9eeb8 "select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je \nfrom t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je \n", ' ' <repeats 24 times>, "from t_grxx gr inner join t_jfxx jf \n", ' ' <repeats 34 times>...,
commandTag = 0xc5eed5 "SELECT", stmts = 0x2b86800, cplan = 0x0, portalParams = 0x0, queryEnv = 0x0,
strategy = PORTAL_MULTI_QUERY, cursorOptions = 4, run_once = false, status = PORTAL_DEFINED, portalPinned = false,
autoHeld = false, queryDesc = 0x0, tupDesc = 0x0, formats = 0x0, holdStore = 0x0, holdContext = 0x0, holdSnapshot = 0x0,
atStart = true, atEnd = true, portalPos = 0, creation_time = 595049454962775, visible = false}
回到exec_simple_query,进入PortalSetResultFormat
(gdb)
1105 PortalSetResultFormat(portal, 1, &format);
(gdb) step
PortalSetResultFormat (portal=0x2b04468, nFormats=1, formats=0x7ffff7153cbe) at pquery.c:633
633 if (portal->tupDesc == NULL)
PortalSetResultFormat-->需返回元组,nFormats为1
...
(gdb) p *portal->tupDesc
$4 = {natts = 7, tdtypeid = 2249, tdtypmod = -1, tdhasoid = false, tdrefcount = -1, constr = 0x0, attrs = 0x2b989c8}
(gdb)
(gdb) p nFormats
$5 = 1
PortalSetResultFormat-->格式码为0
(gdb) p *portal->formats
$7 = 0
回到exec_simple_query,进入PortalRun
(gdb)
1122 (void) PortalRun(portal,
(gdb) step
PortalRun (portal=0x2b04468, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x2b86838, altdest=0x2b86838,
completionTag=0x7ffff7153c70 ":\001") at pquery.c:702
702 AssertArg(PortalIsValid(portal));
PortalRun-->初始化completionTag为空串
707 if (completionTag)
(gdb)
708 completionTag[0] = '\0';
(gdb) p *completionTag
$12 = 0 '\000'
PortalRun-->设置状态为active等
(gdb) p portal->status
$15 = PORTAL_ACTIVE
(gdb) p portal->run_once
$16 = true
PortalRun-->保护"现场"
(gdb) n
741 saveTopTransactionContext = TopTransactionContext;
(gdb)
742 saveActivePortal = ActivePortal;
(gdb)
743 saveResourceOwner = CurrentResourceOwner;
(gdb)
744 savePortalContext = PortalContext;
(gdb)
745 saveMemoryContext = CurrentMemoryContext;
PortalRun-->开始执行
(gdb)
746 PG_TRY();
PortalRun-->根据场景调用相应的函数,在这里是PortalRunSelect
...
(gdb)
755 switch (portal->strategy)
(gdb)
767 if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
(gdb) n
773 nprocessed = PortalRunSelect(portal, true, count, dest);
PortalRun-->处理行数的计数
(gdb) p nprocessed
$17 = 99991
设置命令完成标记
(gdb) n
782 if (strcmp(portal->commandTag, "SELECT") == 0)
(gdb)
783 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
设置portal状态为PORTAL_READY,结果为T(注意:gdb显示第790行时该行尚未执行,所以下面打印出的status仍为PORTAL_ACTIVE,执行n之后才变为PORTAL_READY)
(gdb)
790 portal->status = PORTAL_READY;
(gdb) p portal->status
$18 = PORTAL_ACTIVE
(gdb) n
795 result = portal->atEnd;
(gdb)
796 break;
(gdb) p result
$19 = true
恢复"现场",返回结果
...
846 PortalContext = savePortalContext;
(gdb)
848 if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
(gdb)
851 TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
(gdb)
853 return result;
(gdb)
854 }
回到exec_simple_query,进入PortalDrop
(gdb) n
exec_simple_query (
query_string=0x2a9eeb8 "select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je \nfrom t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je \n", ' ' <repeats 24 times>, "from t_grxx gr inner join t_jfxx jf \n", ' ' <repeats 34 times>...) at postgres.c:1130
1130 receiver->rDestroy(receiver);
(gdb)
1132 PortalDrop(portal, false);
PortalDrop-->释放资源
...
(gdb)
589 MemoryContextDelete(portal->portalContext);
(gdb)
592 pfree(portal);
(gdb)
593 }
DONE!
四、参考资料
postgres.c
PG Document:Query Planning
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
PostgreSQL 源码解读(82)- 查询语句#67(PortalXXX系列函数)
下载Word文档到电脑,方便收藏和打印~