
PostgreSQL Source Code Walkthrough (154) - Background Processes #6 (walsender #2)


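This section continues with the PostgreSQL background process walsender. It focuses on two functions in the call stack: exec_replication_command and StartReplication.
The call stack is as follows: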



(gdb) bt
#0  0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0, 
    nevents=1) at latch.c:1048
#2  0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1, 
    wait_event_info=83886092) at latch.c:1000
#3  0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999, 
    wait_event_info=83886092) at latch.c:385
#4  0x000000000085405b in WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2229
#5  0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6  0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
    at walsender.c:1539
#7  0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
    at postgres.c:4178
#8  0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9  0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228

I. Data Structures

StringInfo
The StringInfoData structure holds the state of an extensible string buffer.




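typedef struct StringInfoData
{
    char       *data;       // buffer holding the string; data[len] is always '\0'
    int         len;        // current string length, excluding the trailing '\0'
    int         maxlen;     // allocated size of data; always maxlen > len
    int         cursor;     // set to 0 by initStringInfo, otherwise free for callers
} StringInfoData;
typedef StringInfoData *StringInfo;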
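To show how a buffer with these four fields behaves, here is a minimal standalone sketch. It is not PostgreSQL's stringinfo.c: it uses malloc/realloc instead of palloc, and the names SketchStringInfo, sketch_init, sketch_enlarge, sketch_append_binary and sketch_free are hypothetical. It keeps the same invariants as the structure above (a '\0' at data[len], maxlen > len), starts with 1024 bytes (the maxlen value that appears in the gdb session later in this article), and doubles the capacity when more room is needed.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical standalone analogue of StringInfoData (malloc instead of palloc). */
typedef struct SketchStringInfo
{
    char *data;   /* buffer; data[len] is always '\0' */
    int   len;    /* current length, excluding the trailing '\0' */
    int   maxlen; /* allocated size of data; invariant: maxlen > len */
    int   cursor; /* free for callers, e.g. as a read position */
} SketchStringInfo;

/* Allocate an empty buffer with an initial capacity of 1024 bytes. */
void sketch_init(SketchStringInfo *str)
{
    str->maxlen = 1024;
    str->data = malloc(str->maxlen);
    assert(str->data != NULL);
    str->data[0] = '\0';
    str->len = 0;
    str->cursor = 0;
}

/* Ensure room for 'needed' more bytes plus the trailing '\0',
 * doubling maxlen until it is large enough. */
void sketch_enlarge(SketchStringInfo *str, int needed)
{
    int newlen;

    needed += str->len + 1;          /* total space required */
    if (needed <= str->maxlen)
        return;                      /* already big enough */
    newlen = 2 * str->maxlen;
    while (needed > newlen)
        newlen = 2 * newlen;
    str->data = realloc(str->data, newlen);
    assert(str->data != NULL);
    str->maxlen = newlen;
}

/* Append 'datalen' arbitrary bytes and keep the buffer NUL-terminated. */
void sketch_append_binary(SketchStringInfo *str, const char *bytes, int datalen)
{
    sketch_enlarge(str, datalen);
    memcpy(str->data + str->len, bytes, datalen);
    str->len += datalen;
    str->data[str->len] = '\0';
}

/* Release the buffer. */
void sketch_free(SketchStringInfo *str)
{
    free(str->data);
    str->data = NULL;
    str->len = str->maxlen = 0;
}
```

For example, appending 2000 bytes to a buffer that already holds 3 bytes needs 2004 bytes in total, so maxlen grows from 1024 to 2048 in a single doubling step.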

II. Source Code Walkthrough

exec_replication_command
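exec_replication_command executes a replication command. It returns true if cmd_string was recognized as a WalSender command, and false otherwise; in that case PostgresMain runs the string as an ordinary SQL query.
Its main steps are:
1. Perform initialization and sanity checks
2. Switch to a per-command memory context
3. Initialize the replication scanner and parse the command
4. Perform transaction-related checks
5. Initialize the input/output message buffers
6. Execute the command according to its type
6.1 For T_StartReplicationCmd, call StartReplication (StartLogicalReplication for logical replication)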
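The true/false return value is what lets one walsender connection accept both replication commands and plain SQL: in PostgresMain, a false return sends the string down the ordinary simple-query path (exec_simple_query). The sketch below models only that contract. The enum, the keyword-prefix classifier and all function names are hypothetical simplifications; the real function obtains the node type from the replication grammar (replication_yyparse), not from string comparisons.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical command kinds, loosely mirroring the node tags in the switch. */
typedef enum SketchCmdKind
{
    SKETCH_IDENTIFY_SYSTEM,
    SKETCH_START_REPLICATION,
    SKETCH_TIMELINE_HISTORY,
    SKETCH_SQL                 /* anything else is treated as plain SQL */
} SketchCmdKind;

static bool starts_with(const char *s, const char *prefix)
{
    return strncmp(s, prefix, strlen(prefix)) == 0;
}

/* Toy classifier that only looks at the leading keyword. */
SketchCmdKind sketch_classify(const char *cmd)
{
    assert(cmd != NULL);
    if (starts_with(cmd, "IDENTIFY_SYSTEM"))
        return SKETCH_IDENTIFY_SYSTEM;
    if (starts_with(cmd, "START_REPLICATION"))
        return SKETCH_START_REPLICATION;
    if (starts_with(cmd, "TIMELINE_HISTORY"))
        return SKETCH_TIMELINE_HISTORY;
    return SKETCH_SQL;
}

/* Returns true if cmd was handled as a replication command, false if the
 * caller should run it as ordinary SQL instead. */
bool sketch_exec_replication_command(const char *cmd)
{
    switch (sketch_classify(cmd))
    {
        case SKETCH_IDENTIFY_SYSTEM:
        case SKETCH_START_REPLICATION:
        case SKETCH_TIMELINE_HISTORY:
            /* ... execute the command and send CommandComplete ... */
            return true;
        case SKETCH_SQL:
        default:
            return false;
    }
}

/* Caller side, modeled on the walsender branch of PostgresMain.
 * Returns 1 if the command took the SQL path, 0 otherwise. */
int sketch_handle_query(const char *cmd)
{
    if (!sketch_exec_replication_command(cmd))
        return 1;   /* PostgresMain would call exec_simple_query(cmd) here */
    return 0;
}
```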




bool
exec_replication_command(const char *cmd_string)
{
    int         parse_rc;
    Node       *cmd_node;
    MemoryContext cmd_context;
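    MemoryContext old_context;
    // If shutdown is approaching, switch to the stopping state so that the
    // following commands are handled correctly
    if (got_STOPPING)
        WalSndSetState(WALSNDSTATE_STOPPING);
    // In stopping mode, reject all new commands: some of them could generate
    // WAL while the shutdown checkpoint is being written
    if (MyWalSnd->state == WALSNDSTATE_STOPPING)
        ereport(ERROR,
                (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
    // A snapshot exported by CREATE_REPLICATION_SLOT ... LOGICAL only lives
    // until the next command arrives; clean it up
    SnapBuildClearExportedSnapshot();
    // Check for interrupts
    CHECK_FOR_INTERRUPTS();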
    // Create a per-command memory context and switch to it
    cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                        "Replication command context",
                                        ALLOCSET_DEFAULT_SIZES);
    old_context = MemoryContextSwitchTo(cmd_context);
    // Initialize the replication scanner and parse the command
    replication_scanner_init(cmd_string);
    parse_rc = replication_yyparse();
    if (parse_rc != 0)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 (errmsg_internal("replication command parser returned %d",
                                  parse_rc))));
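    cmd_node = replication_parse_result;
    // Log replication commands at LOG if log_replication_commands is on,
    // otherwise at DEBUG1; plain SQL commands are not logged here
    if (cmd_node->type != T_SQLCmd)
        ereport(log_replication_commands ? LOG : DEBUG1,
                (errmsg("received replication command: %s", cmd_string)));
    // A snapshot exported outside a transaction block must be cleared now
    if (!IsTransactionBlock())
        SnapBuildClearExportedSnapshot();
    // In an aborted transaction, allow nothing but plain SQL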
    if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
        ereport(ERROR,
                (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
                 errmsg("current transaction is aborted, "
                        "commands ignored until end of transaction block")));
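    CHECK_FOR_INTERRUPTS();
    // Allocate the buffers used for outgoing and incoming messages once per
    // command, to reduce palloc overhead
    initStringInfo(&output_message);
    initStringInfo(&reply_message);
    initStringInfo(&tmpbuf);
    // Report to pgstat that this process is running
    pgstat_report_activity(STATE_RUNNING, NULL);
    // Execute the command according to its type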
    switch (cmd_node->type)
    {
        case T_IdentifySystemCmd:
            // IDENTIFY_SYSTEM
            IdentifySystem();
            break;
        case T_BaseBackupCmd:
            //BASE_BACKUP
            PreventInTransactionBlock(true, "BASE_BACKUP");
            SendBaseBackup((BaseBackupCmd *) cmd_node);
            break;
        case T_CreateReplicationSlotCmd:
            // CREATE_REPLICATION_SLOT
            CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
            break;
        case T_DropReplicationSlotCmd:
            // DROP_REPLICATION_SLOT
            DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
            break;
        case T_StartReplicationCmd:
            //START_REPLICATION
            {
                StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
                PreventInTransactionBlock(true, "START_REPLICATION");
                if (cmd->kind == REPLICATION_KIND_PHYSICAL)
                    StartReplication(cmd);
                else
                    StartLogicalReplication(cmd);
                break;
            }
        case T_TimeLineHistoryCmd:
            // TIMELINE_HISTORY: send a timeline history file
            PreventInTransactionBlock(true, "TIMELINE_HISTORY");
            SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
            break;
        case T_VariableShowStmt:
            // SHOW: report the value of a run-time parameter
            {
                DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
                VariableShowStmt *n = (VariableShowStmt *) cmd_node;
                GetPGVariable(n->name, dest);
            }
            break;
        case T_SQLCmd:
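            // Plain SQL: only allowed in a walsender connected to a database
            // (logical replication)
            if (MyDatabaseId == InvalidOid)
                ereport(ERROR,
                        (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
            // Report to pgstat that this process is now idle
            pgstat_report_activity(STATE_IDLE, NULL);
            // Tell the caller this was not a WalSender command, so that it
            // executes the string as ordinary SQL
            return false;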
        default:
            // Unknown command node
            elog(ERROR, "unrecognized replication command node tag: %u",
                 cmd_node->type);
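    }
    // Done: switch back to the old memory context and free the command context
    MemoryContextSwitchTo(old_context);
    MemoryContextDelete(cmd_context);
    // Send CommandComplete
    EndCommand("SELECT", DestRemote);
    // Report to pgstat that this process is now idle
    pgstat_report_activity(STATE_IDLE, NULL);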
    return true;
}

StartReplication
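StartReplication handles the START_REPLICATION command for physical replication.
Its main steps are:
1. Perform initialization and sanity checks
2. Select the timeline to stream
3. Enter COPY mode
3.1 Set the walsender state
3.2 Send a CopyBothResponse message to start streaming
3.3 Initialize related variables, such as the shared-memory state
3.4 Enter the main loop (WalSndLoop)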




static void
StartReplication(StartReplicationCmd *cmd)
{
    StringInfoData buf;
    XLogRecPtr  FlushPtr;
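    // ThisTimeLineID is still 0 if IDENTIFY_SYSTEM has not been run
    if (ThisTimeLineID == 0)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
    // If a slot was named, acquire it; it must be a physical slot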
    if (cmd->slotname)
    {
        ReplicationSlotAcquire(cmd->slotname, true);
        // SlotIsLogical(slot) is defined as (slot->data.database != InvalidOid)
        if (SlotIsLogical(MyReplicationSlot))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     (errmsg("cannot use a logical replication slot for physical replication"))));
    }
    // Determine how far WAL has been flushed; a cascading walsender runs
    // on a standby
    if (am_cascading_walsender)
    {
        // This also updates ThisTimeLineID
        FlushPtr = GetStandbyFlushRecPtr();
    }
    else
        FlushPtr = GetFlushRecPtr();
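    // Select the timeline: the one requested by the client if given,
    // otherwise the current timeline
    if (cmd->timeline != 0)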
    {
        XLogRecPtr  switchpoint;
        sendTimeLine = cmd->timeline;
        if (sendTimeLine == ThisTimeLineID)
        {
            sendTimeLineIsHistoric = false;
            sendTimeLineValidUpto = InvalidXLogRecPtr;
        }
        else
        {
            List       *timeLineHistory;
            sendTimeLineIsHistoric = true;
            // Find where the requested timeline ends in this server's history
            timeLineHistory = readTimeLineHistory(ThisTimeLineID);
            switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
                                         &sendTimeLineNextTLI);
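            list_free_deep(timeLineHistory);
            // The start point must not lie after the point where this
            // server's history forked off the requested timeline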
            if (!XLogRecPtrIsInvalid(switchpoint) &&
                switchpoint < cmd->startpoint)
            {
                ereport(ERROR,
                        (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
                                (uint32) (cmd->startpoint >> 32),
                                (uint32) (cmd->startpoint),
                                cmd->timeline),
                         errdetail("This server's history forked from timeline %u at %X/%X.",
                                   cmd->timeline,
                                   (uint32) (switchpoint >> 32),
                                   (uint32) (switchpoint))));
            }
            sendTimeLineValidUpto = switchpoint;
        }
    }
    else
    {
        sendTimeLine = ThisTimeLineID;
        sendTimeLineValidUpto = InvalidXLogRecPtr;
        sendTimeLineIsHistoric = false;
    }
    streamingDoneSending = streamingDoneReceiving = false;
    // If there is nothing to stream, don't even enter COPY mode
    if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
    {
        // Set the state to CATCHUP: initially the standby is behind the primary
        WalSndSetState(WALSNDSTATE_CATCHUP);
        
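        // Send a CopyBothResponse message and start streaming
        pq_beginmessage(&buf, 'W');  // 'W' = CopyBothResponse
        pq_sendbyte(&buf, 0);        // overall COPY format: 0 = text
        pq_sendint16(&buf, 0);       // number of columns: 0
        pq_endmessage(&buf);
        pq_flush();
        // Refuse to stream from a point beyond this server's flushed WAL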
        if (FlushPtr < cmd->startpoint)
        {
            ereport(ERROR,
                    (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
                            (uint32) (cmd->startpoint >> 32),
                            (uint32) (cmd->startpoint),
                            (uint32) (FlushPtr >> 32),
                            (uint32) (FlushPtr))));
        }
        // Start streaming from the requested point
        sentPtr = cmd->startpoint;
        // Publish sentPtr in shared memory as well
        SpinLockAcquire(&MyWalSnd->mutex);
        MyWalSnd->sentPtr = sentPtr;
        SpinLockRelease(&MyWalSnd->mutex);
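        // Determine this walsender's synchronous replication priority
        SyncRepInitConfig();
        // Mark replication as active and run the walsender main loop
        replication_active = true;
        WalSndLoop(XLogSendPhysical);
        // Streaming has ended: mark replication as inactive
        replication_active = false;
        // Exit if shutdown was requested
        if (got_STOPPING)
            proc_exit(0);
        // Go back to the STARTUP state
        WalSndSetState(WALSNDSTATE_STARTUP);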
        Assert(streamingDoneSending && streamingDoneReceiving);
    }
    if (cmd->slotname)
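        ReplicationSlotRelease();
    // COPY is finished. When streaming a historic timeline, send a one-row
    // result set with the next timeline and its start position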
    if (sendTimeLineIsHistoric)
    {
        char        startpos_str[8 + 1 + 8 + 1];
        DestReceiver *dest;
        TupOutputState *tstate;
        TupleDesc   tupdesc;
        Datum       values[2];
        bool        nulls[2];
        snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
                 (uint32) (sendTimeLineValidUpto >> 32),
                 (uint32) sendTimeLineValidUpto);
        dest = CreateDestReceiver(DestRemoteSimple);
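        MemSet(nulls, false, sizeof(nulls));
        // Two columns: next_tli (int8, since TimeLineID is an unsigned
        // 32-bit value) and next_tli_startpos (text)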
        tupdesc = CreateTemplateTupleDesc(2);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
                                  INT8OID, -1, 0);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
                                  TEXTOID, -1, 0);
        // Begin tuple output and send the single row
        tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
        values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
        values[1] = CStringGetTextDatum(startpos_str);
        
        do_tup_output(tstate, values, nulls);
        end_tup_output(tstate);
    }
    // Send CommandComplete
    pq_puttextmessage('C', "START_STREAMING");
}
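The timeline-selection branch and the "is there anything to stream?" test in StartReplication can be isolated into a small pure function. This is a simplified model under stated assumptions: the timeline history is passed in as a hypothetical array of (tli, end) pairs in place of readTimeLineHistory()/tliSwitchPoint(), errors are reported through a return code instead of ereport, XLogRecPtr is modeled as uint64_t with 0 as the invalid pointer, and all Sketch* names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t SketchLSN;                  /* stands in for XLogRecPtr */
#define SKETCH_INVALID_LSN ((SketchLSN) 0)   /* like InvalidXLogRecPtr */

/* Hypothetical history entry: timeline 'tli' ended at 'end'
 * (SKETCH_INVALID_LSN for the current, still-open timeline). */
typedef struct SketchTLHistoryEntry
{
    uint32_t  tli;
    SketchLSN end;
} SketchTLHistoryEntry;

typedef struct SketchTLChoice
{
    uint32_t  send_tli;     /* sendTimeLine */
    bool      is_historic;  /* sendTimeLineIsHistoric */
    SketchLSN valid_upto;   /* sendTimeLineValidUpto */
    bool      enter_copy;   /* true if there is anything to stream */
} SketchTLChoice;

/* Returns 0 on success, -1 if the requested timeline is unknown or the
 * start point lies after the point where we forked off that timeline. */
int sketch_choose_timeline(uint32_t current_tli, uint32_t requested_tli,
                           SketchLSN startpoint,
                           const SketchTLHistoryEntry *hist, size_t nhist,
                           SketchTLChoice *out)
{
    assert(out != NULL);
    if (requested_tli == 0 || requested_tli == current_tli)
    {
        /* Stream the current timeline; it has no end yet. */
        out->send_tli = current_tli;
        out->is_historic = false;
        out->valid_upto = SKETCH_INVALID_LSN;
    }
    else
    {
        SketchLSN switchpoint = SKETCH_INVALID_LSN;
        bool      found = false;

        for (size_t i = 0; i < nhist; i++)
            if (hist[i].tli == requested_tli)
            {
                switchpoint = hist[i].end;
                found = true;
            }
        if (!found)
            return -1;   /* timeline not in this server's history */
        if (switchpoint != SKETCH_INVALID_LSN && switchpoint < startpoint)
            return -1;   /* start point is past the fork */
        out->send_tli = requested_tli;
        out->is_historic = true;
        out->valid_upto = switchpoint;
    }
    /* Same condition as StartReplication uses before entering COPY mode. */
    out->enter_copy = !out->is_historic || startpoint < out->valid_upto;
    return 0;
}
```

With the values from the trace (TIMELINE 16 on a server whose current timeline is 16), the function takes the non-historic branch, leaves valid_upto at 0 and enters COPY mode, which agrees with sendTimeLineValidUpto = 0 in the gdb session. A start point exactly at a historic timeline's switch point yields enter_copy == false, so only the one-row next-timeline result and CommandComplete would be sent.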

III. Tracing and Analysis

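On the primary, attach gdb to the postmaster, set follow-fork-mode to child so that gdb follows the backend forked for the new connection, and set a breakpoint on exec_replication_command. Then start the standby node; the breakpoint is hit.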



[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1339     1  2 14:45 pts/0    00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1339
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b exec_replication_command
Breakpoint 1 at 0x852fd2: file walsender.c, line 1438.
(gdb) c
Continuing.
[New process 1356]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)

The first command received is IDENTIFY_SYSTEM; the second one, START_REPLICATION, is the command we want to trace.



(gdb) c
Continuing.
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)
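The start point 0/5D000000 arrives as text: the part before the slash is the high 32 bits and the part after it the low 32 bits of the 64-bit XLogRecPtr, both in hexadecimal. PostgreSQL's replication scanner performs a similar conversion; the sketch below (hypothetical function name, sscanf-based) shows why this value appears later in the trace as cmd->startpoint = 1560281088.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Parse "HI/LO" (both hexadecimal) into a 64-bit WAL position.
 * Returns false if the text is not exactly in that form. */
bool sketch_parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi, lo;
    int consumed = 0;

    assert(text != NULL && lsn != NULL);
    /* %n records how many characters were consumed, so trailing junk
     * can be rejected; it does not count toward sscanf's return value. */
    if (sscanf(text, "%X/%X%n", &hi, &lo, &consumed) != 2 || text[consumed] != '\0')
        return false;
    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}
```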

1. Perform initialization and sanity checks



(gdb) n
1446        if (MyWalSnd->state == WALSNDSTATE_STOPPING)
(gdb) 
1454        SnapBuildClearExportedSnapshot();
(gdb) p *MyWalSnd
$1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0, 
  writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}
(gdb) n
1456        CHECK_FOR_INTERRUPTS();
(gdb)

2. Switch memory context



(gdb) 
1458        cmd_context = AllocSetContextCreate(CurrentMemoryContext,
(gdb) 
1461        old_context = MemoryContextSwitchTo(cmd_context);
(gdb)

3. Initialize the replication scanner



(gdb) 
1463        replication_scanner_init(cmd_string);
(gdb) n
1464        parse_rc = replication_yyparse();
(gdb) 
1465        if (parse_rc != 0)
(gdb) p parse_rc
$3 = 0
(gdb) 
(gdb) n
1471        cmd_node = replication_parse_result;
(gdb)
(gdb) 
1479        if (cmd_node->type != T_SQLCmd)
(gdb) n
1480            ereport(log_replication_commands ? LOG : DEBUG1,
(gdb) p cmd_node
$4 = (Node *) 0x1df4710
(gdb) p *cmd_node
$5 = {type = T_StartReplicationCmd}
(gdb)

4. Perform transaction-related checks



(gdb) n
1487        if (!IsTransactionBlock())
(gdb) 
1488            SnapBuildClearExportedSnapshot();
(gdb) 
1494        if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
(gdb) 
1500        CHECK_FOR_INTERRUPTS();
(gdb)

5. Initialize the input/output message buffers



(gdb) 
1506        initStringInfo(&output_message);
(gdb) 
1507        initStringInfo(&reply_message);
(gdb) 
1508        initStringInfo(&tmpbuf);
(gdb) 
1511        pgstat_report_activity(STATE_RUNNING, NULL);

6. Execute the command according to its type
6.1 The command type is T_StartReplicationCmd, so StartReplication is called



(gdb) n
1513        switch (cmd_node->type)
(gdb) 
1534                    StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
(gdb) 
1536                    PreventInTransactionBlock(true, "START_REPLICATION");
(gdb) 
1538                    if (cmd->kind == REPLICATION_KIND_PHYSICAL)
(gdb) 
1539                        StartReplication(cmd);

Step into StartReplication



1539                        StartReplication(cmd);
(gdb) step
StartReplication (cmd=0x1df4710) at walsender.c:532
532     if (ThisTimeLineID == 0)
(gdb)

1. Perform initialization and sanity checks



(gdb) n
546     if (cmd->slotname)
(gdb) 
560     if (am_cascading_walsender)
(gdb)

2. Select the timeline



(gdb) n
568     if (cmd->timeline != 0)
(gdb) 
572         sendTimeLine = cmd->timeline;
(gdb) 
573         if (sendTimeLine == ThisTimeLineID)
(gdb) 
575             sendTimeLineIsHistoric = false;
(gdb) p FlushPtr
$9 = 1560397696
(gdb) n
576             sendTimeLineValidUpto = InvalidXLogRecPtr;
(gdb) 
634     streamingDoneSending = streamingDoneReceiving = false;
(gdb) p sendTimeLine
$10 = 16
(gdb) p ThisTimeLineID
$11 = 16
(gdb) p *cmd
$12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16, 
  startpoint = 1560281088, options = 0x0}
(gdb)
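gdb prints XLogRecPtr values as plain decimal integers. Formatting them as %X/%X, the same format StartReplication uses for startpos_str (sized 8 + 1 + 8 + 1: two 8-digit hex halves, the slash and the terminating NUL), makes the trace easier to read: cmd->startpoint = 1560281088 is 0/5D000000, the start point requested by the standby, and FlushPtr = 1560397696 is 0/5D01C780, i.e. 116608 bytes ahead of it, so the "ahead of the WAL flush position" check passes. The function and macro names below are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Size of the "%X/%X" text: two 8-digit hex halves, '/', and '\0'. */
#define SKETCH_LSN_STRLEN (8 + 1 + 8 + 1)

/* Format a 64-bit WAL position as HI/LO in hexadecimal, like the
 * startpos_str buffer in StartReplication. */
void sketch_format_lsn(uint64_t lsn, char out[SKETCH_LSN_STRLEN])
{
    int n = snprintf(out, SKETCH_LSN_STRLEN, "%X/%X",
                     (unsigned int) (lsn >> 32),
                     (unsigned int) (lsn & 0xFFFFFFFFu));

    /* The buffer is always large enough, so nothing is truncated. */
    assert(n > 0 && n < SKETCH_LSN_STRLEN && (size_t) n == strlen(out));
}
```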

3. Enter COPY mode



(gdb) n
637     if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
(gdb)

3.1 Set the state



648         WalSndSetState(WALSNDSTATE_CATCHUP);
(gdb) p sendTimeLineValidUpto
$13 = 0
(gdb) p cmd->startpoint
$14 = 1560281088
(gdb)

3.2 Send the CopyBothResponse message and start streaming



(gdb) n
651         pq_beginmessage(&buf, 'W');
(gdb) 
652         pq_sendbyte(&buf, 0);
(gdb) 
653         pq_sendint16(&buf, 0);
(gdb) 
654         pq_endmessage(&buf);
(gdb) p buf
$15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}
(gdb) p buf->data
$16 = 0x1df53b0 ""
(gdb) x/hb buf->data
0x1df53b0:  0
(gdb) x/32hb buf->data
0x1df53b0:  0   0   0   127 127 127 127 127
0x1df53b8:  127 127 127 127 127 127 127 127
0x1df53c0:  127 127 127 127 127 127 127 127
0x1df53c8:  127 127 127 127 127 127 127 127
(gdb)
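The dump matches the CopyBothResponse layout of the frontend/backend protocol. After pq_sendbyte and pq_sendint16 the buffer holds three zero bytes (len = 3): an Int8 overall format of 0 (textual) followed by an Int16 column count of 0. The bytes with value 127 after them lie beyond len and are not part of the message. cursor = 87 is the ASCII code of 'W': pq_beginmessage stores the message type in the cursor field, and pq_endmessage later sends it in front of an Int32 length word. The standalone sketch below (hypothetical function name, not the pqformat API) builds the complete message as it would appear on the wire.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Write a CopyBothResponse with 'ncols' textual columns into out.
 * Layout: Byte1('W'), Int32 length (counts itself, not the type byte),
 * Int8 overall format, Int16 column count, Int16 format per column.
 * Multi-byte integers are big-endian. Returns the bytes written. */
size_t sketch_copy_both_response(uint8_t *out, size_t outlen, uint16_t ncols)
{
    uint32_t len = 4 + 1 + 2 + 2u * ncols;    /* length word counts itself */
    size_t   total = 1 + len;
    size_t   p = 0;

    assert(out != NULL && outlen >= total);
    out[p++] = 'W';                           /* message type */
    out[p++] = (uint8_t) (len >> 24);
    out[p++] = (uint8_t) (len >> 16);
    out[p++] = (uint8_t) (len >> 8);
    out[p++] = (uint8_t) len;
    out[p++] = 0;                             /* overall format: 0 = text */
    out[p++] = (uint8_t) (ncols >> 8);
    out[p++] = (uint8_t) ncols;
    for (uint16_t i = 0; i < ncols; i++)
    {
        out[p++] = 0;                         /* column format: 0 = text */
        out[p++] = 0;
    }
    assert(p == total);
    return total;
}
```

For the walsender's zero-column case this yields 8 bytes: 'W', the length 7 as 00 00 00 07, and then the same three zero bytes seen in buf.data.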

3.3 Initialize related variables, such as the shared-memory state



(gdb) n
655         pq_flush();
(gdb) 
661         if (FlushPtr < cmd->startpoint)
(gdb) p FlushPtr
$17 = 1560397696
(gdb) p cmd->startpoint
$18 = 1560281088
(gdb) n
672         sentPtr = cmd->startpoint;
(gdb) 
675         SpinLockAcquire(&MyWalSnd->mutex);
(gdb) 
676         MyWalSnd->sentPtr = sentPtr;
(gdb) 
677         SpinLockRelease(&MyWalSnd->mutex);
(gdb) 
679         SyncRepInitConfig();
(gdb) 
682         replication_active = true;

3.4 Enter the main loop (WalSndLoop)



(gdb) 
684         WalSndLoop(XLogSendPhysical);
(gdb)

DONE!

IV. References

PG Source Code
