我的编程空间,编程开发者的网络收藏夹
学习永远不晚

PostgreSQL 源码解读(115)- 后台进程#3(checkpointer进程#2)

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

PostgreSQL 源码解读(115)- 后台进程#3(checkpointer进程#2)

本节简单介绍了PostgreSQL的后台进程:checkpointer,主要分析CreateCheckPoint函数的实现逻辑。

一、数据结构

CheckPoint
CheckPoint XLOG record结构体.




// Body of a checkpoint XLOG record. CreateCheckPoint() fills one in, registers
// it as the record data, and also stores a copy in the control file
// (ControlFile->checkPointCopy).
typedef struct CheckPoint
{
    // next RecPtr available when creation of the checkpoint began (i.e. the REDO start point)
    XLogRecPtr  redo;           
    // current timeline
    TimeLineID  ThisTimeLineID; 
    // previous timeline, if this record begins a new timeline; otherwise equal to the current timeline
    TimeLineID  PrevTimeLineID; 
    // whether full-page writes are enabled
    bool        fullPageWrites; 
    // high-order bits of nextXid
    uint32      nextXidEpoch;   
    // next free XID
    TransactionId nextXid;      
    // next free OID
    Oid         nextOid;        
    // next free MultiXactId
    MultiXactId nextMulti;      
    // next free MultiXact offset
    MultiXactOffset nextMultiOffset;    
    // cluster-wide minimum datfrozenxid
    TransactionId oldestXid;    
    // database that has the minimum datfrozenxid
    Oid         oldestXidDB;    
    // cluster-wide minimum datminmxid
    MultiXactId oldestMulti;    
    // database that has the minimum datminmxid
    Oid         oldestMultiDB;  
    // timestamp of the checkpoint
    pg_time_t   time;           
    // oldest Xid with a valid commit timestamp
    TransactionId oldestCommitTsXid;    
    // newest Xid with a valid commit timestamp
    TransactionId newestCommitTsXid;    
    // oldest active XID; CreateCheckPoint() computes it only for non-shutdown
    // checkpoints when XLogStandbyInfoActive(), otherwise it is set to
    // InvalidTransactionId
    TransactionId oldestActiveXid;
} CheckPoint;

// Record type codes used with RM_XLOG_ID. CreateCheckPoint() passes
// XLOG_CHECKPOINT_SHUTDOWN or XLOG_CHECKPOINT_ONLINE as the info argument of
// XLogInsert(). NOTE(review): the remaining codes are presumably the other
// record types of the same resource manager — confirm against the header.
#define XLOG_CHECKPOINT_SHUTDOWN        0x00
#define XLOG_CHECKPOINT_ONLINE          0x10
#define XLOG_NOOP                       0x20
#define XLOG_NEXTOID                    0x30
#define XLOG_SWITCH                     0x40
#define XLOG_BACKUP_END                 0x50
#define XLOG_PARAMETER_CHANGE           0x60
#define XLOG_RESTORE_POINT              0x70
#define XLOG_FPW_CHANGE                 0x80
#define XLOG_END_OF_RECOVERY            0x90
#define XLOG_FPI_FOR_HINT               0xA0
#define XLOG_FPI                        0xB0

CheckpointerShmem
checkpointer进程与各后端进程(backend)之间通讯的共享内存结构.




// One entry of the request array kept in CheckpointerShmemStruct.requests[].
typedef struct
{
    RelFileNode rnode;// tablespace/database/relation information
    ForkNumber  forknum;// fork number
    // NOTE(review): named "segno" but declared as BlockNumber; presumably a
    // segment number within the fork — confirm against the code that fills it
    BlockNumber segno;          
    
} CheckpointerRequest;
// Shared-memory structure used for communication between the checkpointer
// process and other processes.
typedef struct
{
    // PID of the checkpointer process (0 if the process has not been started)
    pid_t       checkpointer_pid;   
    // spinlock protecting all the ckpt_* fields
    slock_t     ckpt_lck;       
    // advanced when a checkpoint starts
    int         ckpt_started;   
    // advanced when a checkpoint completes
    int         ckpt_done;      
    // advanced when a checkpoint fails
    int         ckpt_failed;    
    // checkpoint flags, as defined in xlog.h
    int         ckpt_flags;     
    // counts buffer writes performed by backends
    uint32      num_backend_writes; 
    // counts fsync calls performed by backends
    uint32      num_backend_fsync;  
    // current number of requests
    int         num_requests;   
    // maximum number of requests
    int         max_requests;   
    // request array (flexible array member, sized at allocation time)
    CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
} CheckpointerShmemStruct;
// file-local pointer to the CheckpointerShmemStruct instance
static CheckpointerShmemStruct *CheckpointerShmem;

VirtualTransactionId
顶层事务通过VirtualTransactionId来标识.




// Identifies a transaction by the pair (backend ID, local transaction ID).
// CreateCheckPoint() receives an array of these from
// GetVirtualXIDsDelayingChkpt().
typedef struct
{
    // ID of the backend; NOTE(review): presumably the backend running the transaction — confirm
    BackendId   backendId;      
    // local transaction ID; NOTE(review): presumably unique only within that backend — confirm
    LocalTransactionId localTransactionId;  
} VirtualTransactionId;

二、源码解读

CreateCheckPoint函数,执行checkpoint,不管是在shutdown过程还是在运行中.




// Perform a checkpoint, either during shutdown or while the system is running.
//
// flags: bitmask of CHECKPOINT_* options. The bits tested here are
//   CHECKPOINT_IS_SHUTDOWN, CHECKPOINT_END_OF_RECOVERY and CHECKPOINT_FORCE;
//   the whole mask is also handed to CheckPointGuts() and to the
//   logging/trace calls.
//
// Outline: fix the REDO pointer while holding all WAL insertion locks, collect
// the counters stored in the checkpoint record, flush shared state to disk via
// CheckPointGuts(), insert and flush the checkpoint record, update pg_control,
// then remove WAL segments that are no longer needed.
void
CreateCheckPoint(int flags)
{
    bool        shutdown;// treat this checkpoint as a shutdown checkpoint?
    CheckPoint  checkPoint;// the checkpoint record being assembled
    XLogRecPtr  recptr;// value returned by XLogInsert() for the checkpoint record
    XLogSegNo   _logSegNo;// WAL segment number (not an LSN), used when removing old segments
    XLogCtlInsert *Insert = &XLogCtl->Insert;// shared WAL insertion state
    uint32      freespace;// free space left in the page at the insert position
    XLogRecPtr  PriorRedoPtr;// REDO pointer of the previous checkpoint
    XLogRecPtr  curInsert;// current insert position
    XLogRecPtr  last_important_lsn;// position of the last important record
    VirtualTransactionId *vxids;// virtual transactions delaying the checkpoint
    int         nvxids;// number of entries in vxids
    
    // an end-of-recovery checkpoint is handled like a shutdown checkpoint
    if (flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY))
        shutdown = true;
    else
        shutdown = false;
    
    // sanity check: during recovery only an end-of-recovery checkpoint is allowed
    if (RecoveryInProgress() && (flags & CHECKPOINT_END_OF_RECOVERY) == 0)
        elog(ERROR, "can't create a checkpoint during recovery");
    
    // set up the XLogInsert working area before entering the critical section
    InitXLogInsert();
    
    // CheckpointLock ensures that only one checkpoint runs at a time
    LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
    
    // reset the per-checkpoint statistics and record the start time
    MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
    CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
    
    START_CRIT_SECTION();
    if (shutdown)
    {
        // shutdown checkpoint: record DB_SHUTDOWNING in the control file (pg_control)
        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
        ControlFile->state = DB_SHUTDOWNING;
        ControlFile->time = (pg_time_t) time(NULL);
        UpdateControlFile();
        LWLockRelease(ControlFileLock);
    }
    
    // let smgr prepare for the checkpoint
    smgrpreckpt();
    
    // start filling in the checkpoint XLOG record
    MemSet(&checkPoint, 0, sizeof(checkPoint));
    checkPoint.time = (pg_time_t) time(NULL);// timestamp
    
    // oldestActiveXid is only computed for non-shutdown checkpoints when XLogStandbyInfoActive()
    if (!shutdown && XLogStandbyInfoActive())
        checkPoint.oldestActiveXid = GetOldestActiveTransactionId();
    else
        checkPoint.oldestActiveXid = InvalidTransactionId;
    
    // fetch the position of the last important XLOG record before taking the insertion locks
    last_important_lsn = GetLastImportantRecPtr();
    
    // block other WAL insertions while the insert state is examined to determine the REDO pointer
    WALInsertLockAcquireExclusive();
    curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
    
    // unless this is a shutdown, end-of-recovery or forced checkpoint, skip it
    // when the last important record is the previous checkpoint record
    if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
                  CHECKPOINT_FORCE)) == 0)
    {
        if (last_important_lsn == ControlFile->checkPoint)
        {
            // release everything acquired so far before returning
            WALInsertLockRelease();
            LWLockRelease(CheckpointLock);
            END_CRIT_SECTION();
            ereport(DEBUG1,
                    (errmsg("checkpoint skipped because system is idle")));
            return;
        }
    }
    
    // end-of-recovery checkpoint: LocalSetXLogInsertAllowed() is called before any WAL
    // is inserted; LocalXLogInsertAllowed is reset again after the record is flushed
    if (flags & CHECKPOINT_END_OF_RECOVERY)
        LocalSetXLogInsertAllowed();
    checkPoint.ThisTimeLineID = ThisTimeLineID;
    if (flags & CHECKPOINT_END_OF_RECOVERY)
        checkPoint.PrevTimeLineID = XLogCtl->PrevTimeLineID;
    else
        checkPoint.PrevTimeLineID = ThisTimeLineID;
    checkPoint.fullPageWrites = Insert->fullPageWrites;
    
    freespace = INSERT_FREESPACE(curInsert);// free space at the insert position
    if (freespace == 0)
    {
        // no free space left: the REDO pointer has to skip the next page header
        if (XLogSegmentOffset(curInsert, wal_segment_size) == 0)
            curInsert += SizeOfXLogLongPHD;// start of a new WAL segment file: skip the long header
        else
            curInsert += SizeOfXLogShortPHD;// same WAL segment file: skip the regular (short) header
    }
    checkPoint.redo = curInsert;
    
    // update the shared RedoRecPtr for future XLogInsert calls; this must be
    // done while all insertion locks are held
    RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
    
    WALInsertLockRelease();
    
    // also update the info_lck-protected copy of RedoRecPtr
    SpinLockAcquire(&XLogCtl->info_lck);
    XLogCtl->RedoRecPtr = checkPoint.redo;
    SpinLockRelease(&XLogCtl->info_lck);
    
    if (log_checkpoints)
        LogCheckpointStart(flags, false);
    TRACE_POSTGRESQL_CHECKPOINT_START(flags);
    
    // collect the remaining fields of the checkpoint record, each under its own lock
    LWLockAcquire(XidGenLock, LW_SHARED);
    checkPoint.nextXid = ShmemVariableCache->nextXid;
    checkPoint.oldestXid = ShmemVariableCache->oldestXid;
    checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
    LWLockRelease(XidGenLock);
    LWLockAcquire(CommitTsLock, LW_SHARED);
    checkPoint.oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
    checkPoint.newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
    LWLockRelease(CommitTsLock);
    
    // bump the XID epoch if nextXid has wrapped around since the previous checkpoint
    checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
    if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
        checkPoint.nextXidEpoch++;
    LWLockAcquire(OidGenLock, LW_SHARED);
    checkPoint.nextOid = ShmemVariableCache->nextOid;
    // NOTE(review): for a non-shutdown checkpoint oidCount is added; presumably
    // these are OIDs already reserved ahead of nextOid — confirm
    if (!shutdown)
        checkPoint.nextOid += ShmemVariableCache->oidCount;
    LWLockRelease(OidGenLock);
    MultiXactGetCheckptMulti(shutdown,
                             &checkPoint.nextMulti,
                             &checkPoint.nextMultiOffset,
                             &checkPoint.oldestMulti,
                             &checkPoint.oldestMultiDB);
    
    END_CRIT_SECTION();
    
    // fetch the virtual transactions currently delaying the checkpoint and
    // poll until none of them is still doing so
    vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
    if (nvxids > 0)
    {
        do
        {
            // wait 10 ms between polls
            pg_usleep(10000L);  
        } while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
    }
    pfree(vxids);
    // flush the data held in shared memory to disk and fsync it
    CheckPointGuts(checkPoint.redo, flags);
    
    // same condition as used for oldestActiveXid above
    if (!shutdown && XLogStandbyInfoActive())
        LogStandbySnapshot();
    START_CRIT_SECTION();// enter the critical section
    
    XLogBeginInsert();// begin the insertion
    XLogRegisterData((char *) (&checkPoint), sizeof(checkPoint));// register the record data
    recptr = XLogInsert(RM_XLOG_ID,
                        shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
                        XLOG_CHECKPOINT_ONLINE);// perform the insertion
    XLogFlush(recptr);// flush to disk
    
    if (shutdown)
    {
        // shutdown or end-of-recovery checkpoint: reset the local insert-allowed flag.
        // NOTE(review): presumably -1 means "consult shared state again" and
        // 0 means "no further WAL inserts allowed" — confirm
        if (flags & CHECKPOINT_END_OF_RECOVERY)
            LocalXLogInsertAllowed = -1;    
        else
            LocalXLogInsertAllowed = 0; 
    }
    
    // for a shutdown checkpoint the REDO pointer must equal ProcLastRecPtr;
    // a mismatch is reported as concurrent WAL activity and is fatal
    if (shutdown && checkPoint.redo != ProcLastRecPtr)
        ereport(PANIC,
                (errmsg("concurrent write-ahead log activity while database system is shutting down")));
    
    // remember the previous checkpoint's REDO pointer for UpdateCheckPointDistanceEstimate()
    PriorRedoPtr = ControlFile->checkPointCopy.redo;
    
    // update the control file (pg_control)
    LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
    if (shutdown)
        ControlFile->state = DB_SHUTDOWNED;
    ControlFile->checkPoint = ProcLastRecPtr;
    ControlFile->checkPointCopy = checkPoint;
    ControlFile->time = (pg_time_t) time(NULL);
    
    // crash recovery should normally replay to the end of WAL, so clear minRecoveryPoint
    ControlFile->minRecoveryPoint = InvalidXLogRecPtr;
    ControlFile->minRecoveryPointTLI = 0;
    
    // copy the current unloggedLSN while holding ulsn_lck
    SpinLockAcquire(&XLogCtl->ulsn_lck);
    ControlFile->unloggedLSN = XLogCtl->unloggedLSN;
    SpinLockRelease(&XLogCtl->ulsn_lck);
    UpdateControlFile();
    LWLockRelease(ControlFileLock);
    
    // update the shared-memory copy of the checkpoint XID/epoch
    SpinLockAcquire(&XLogCtl->info_lck);
    XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
    XLogCtl->ckptXid = checkPoint.nextXid;
    SpinLockRelease(&XLogCtl->info_lck);
    
    END_CRIT_SECTION();
    
    // let smgr do its post-checkpoint work (e.g. deleting old files)
    smgrpostckpt();
    
    // feed the distance between the two REDO pointers into the estimate
    if (PriorRedoPtr != InvalidXLogRecPtr)
        UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
    
    // remove WAL files that are no longer needed, so the WAL disk does not fill up:
    // take the segment containing RedoRecPtr, let KeepLogSeg() adjust it,
    // step back one segment and pass that to RemoveOldXlogFiles()
    XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
    KeepLogSeg(recptr, &_logSegNo);
    _logSegNo--;
    RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
    
    // WAL files are only preallocated when not shutting down
    if (!shutdown)
        PreallocXlogFiles(recptr);
    
    // pg_subtrans is only truncated when recovery is not in progress
    if (!RecoveryInProgress())
        TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
    
    // the real work is done; what remains is logging and updating statistics
    LogCheckpointEnd(false);
    TRACE_POSTGRESQL_CHECKPOINT_DONE(CheckpointStats.ckpt_bufs_written,
                                     NBuffers,
                                     CheckpointStats.ckpt_segs_added,
                                     CheckpointStats.ckpt_segs_removed,
                                     CheckpointStats.ckpt_segs_recycled);
    // release the lock taken at the top
    LWLockRelease(CheckpointLock);
}

// Flush the data held in shared memory to disk by calling each subsystem's
// checkpoint routine in turn. Called from CreateCheckPoint().
//
// checkPointRedo: REDO pointer of the checkpoint; only passed on to CheckPointTwoPhase().
// flags: checkpoint flags; only passed on to CheckPointBuffers().
//
// The call order is kept exactly as given; do not reorder without checking
// the dependencies between the subsystems.
static void
CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
{
    CheckPointCLOG();
    CheckPointCommitTs();
    CheckPointSUBTRANS();
    CheckPointMultiXact();
    CheckPointPredicate();
    CheckPointRelationMap();
    CheckPointReplicationSlots();
    CheckPointSnapBuild();
    CheckPointLogicalRewriteHeap();
    CheckPointBuffers(flags);   
    CheckPointReplicationOrigin();
    
    // two-phase state is handled last and is the only step given the REDO pointer
    CheckPointTwoPhase(checkPointRedo);
}

三、跟踪分析

更新数据,执行checkpoint.



testdb=# update t_wal_ckpt set c2 = 'C2_'||substr(c2,4,40);
UPDATE 1
testdb=# checkpoint;

启动gdb,设置信号控制,设置断点,进入CreateCheckPoint



(gdb) handle SIGINT print nostop pass
SIGINT is used by the debugger.
Are you sure you want to change it? (y or n) y
Signal        Stop  Print Pass to program Description
SIGINT        No  Yes Yes   Interrupt
(gdb) 
(gdb) b CreateCheckPoint
Breakpoint 1 at 0x55b4fb: file xlog.c, line 8668.
(gdb) c
Continuing.
Program received signal SIGINT, Interrupt.
Breakpoint 1, CreateCheckPoint (flags=44) at xlog.c:8668
8668        XLogCtlInsert *Insert = &XLogCtl->Insert;
(gdb)

获取XLOG插入控制器



8668        XLogCtlInsert *Insert = &XLogCtl->Insert;
(gdb) n
8680        if (flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY))
(gdb) p XLogCtl
$1 = (XLogCtlData *) 0x7fadf8f6fa80
(gdb) p *XLogCtl
$2 = {Insert = {insertpos_lck = 0 '\000', CurrBytePos = 5505269968, PrevBytePos = 5505269928, 
    pad = '\000' <repeats 127 times>, RedoRecPtr = 5521450856, forcePageWrites = false, fullPageWrites = true, 
    exclusiveBackupState = EXCLUSIVE_BACKUP_NONE, nonExclusiveBackups = 0, lastBackupStart = 0, 
    WALInsertLocks = 0x7fadf8f74100}, LogwrtRqst = {Write = 5521451392, Flush = 5521451392}, RedoRecPtr = 5521450856, 
  ckptXidEpoch = 0, ckptXid = 2307, asyncXactLSN = 5521363848, replicationSlotMinLSN = 0, lastRemovedSegNo = 0, 
  unloggedLSN = 1, ulsn_lck = 0 '\000', lastSegSwitchTime = 1546915130, lastSegSwitchLSN = 5521363360, LogwrtResult = {
    Write = 5521451392, Flush = 5521451392}, InitializedUpTo = 5538226176, pages = 0x7fadf8f76000 "\230\320\006", 
  xlblocks = 0x7fadf8f70088, XLogCacheBlck = 2047, ThisTimeLineID = 1, PrevTimeLineID = 1, 
  archiveCleanupCommand = '\000' <repeats 1023 times>, SharedRecoveryInProgress = false, SharedHotStandbyActive = false, 
  WalWriterSleeping = true, recoveryWakeupLatch = {is_set = 0, is_shared = true, owner_pid = 0}, lastCheckPointRecPtr = 0, 
  lastCheckPointEndPtr = 0, lastCheckPoint = {redo = 0, ThisTimeLineID = 0, PrevTimeLineID = 0, fullPageWrites = false, 
    nextXidEpoch = 0, nextXid = 0, nextOid = 0, nextMulti = 0, nextMultiOffset = 0, oldestXid = 0, oldestXidDB = 0, 
    oldestMulti = 0, oldestMultiDB = 0, time = 0, oldestCommitTsXid = 0, newestCommitTsXid = 0, oldestActiveXid = 0}, 
  lastReplayedEndRecPtr = 0, lastReplayedTLI = 0, replayEndRecPtr = 0, replayEndTLI = 0, recoveryLastXTime = 0, 
  currentChunkStartTime = 0, recoveryPause = false, lastFpwDisableRecPtr = 0, info_lck = 0 '\000'}
(gdb) p *Insert
$4 = {insertpos_lck = 0 '\000', CurrBytePos = 5505269968, PrevBytePos = 5505269928, pad = '\000' <repeats 127 times>, 
  RedoRecPtr = 5521450856, forcePageWrites = false, fullPageWrites = true, exclusiveBackupState = EXCLUSIVE_BACKUP_NONE, 
  nonExclusiveBackups = 0, lastBackupStart = 0, WALInsertLocks = 0x7fadf8f74100}
(gdb)

RedoRecPtr = 5521450856,这是REDO point,与pg_control文件中的值一致



[xdb@localhost ~]$ echo "obase=16;ibase=10;5521450856"|bc
1491AA768
[xdb@localhost ~]$ pg_controldata|grep REDO
Latest checkpoint's REDO location:    1/491AA768
Latest checkpoint's REDO WAL file:    000000010000000100000049
[xdb@localhost ~]$

在进入critical section前,调用InitXLogInsert初始化XLOG插入所需的工作空间.
请求CheckpointLock确保在同一时刻只能存在一个checkpoint.



(gdb) n
8683            shutdown = false;
(gdb) 
8686        if (RecoveryInProgress() && (flags & CHECKPOINT_END_OF_RECOVERY) == 0)
(gdb) 
8697        InitXLogInsert();
(gdb) 
8705        LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
(gdb) 
8714        MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
(gdb) 
8715        CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
(gdb)

进入critical section,让smgr(存储管理器)为checkpoint作准备.



8720        START_CRIT_SECTION();
(gdb) 
(gdb) 
8722        if (shutdown)
(gdb) 
8736        smgrpreckpt();
(gdb) 
8739        MemSet(&checkPoint, 0, sizeof(checkPoint));
(gdb)

开始填充Checkpoint XLOG Record



(gdb) 
8740        checkPoint.time = (pg_time_t) time(NULL);
(gdb) p checkPoint
$5 = {redo = 0, ThisTimeLineID = 0, PrevTimeLineID = 0, fullPageWrites = false, nextXidEpoch = 0, nextXid = 0, nextOid = 0, 
  nextMulti = 0, nextMultiOffset = 0, oldestXid = 0, oldestXidDB = 0, oldestMulti = 0, oldestMultiDB = 0, time = 0, 
  oldestCommitTsXid = 0, newestCommitTsXid = 0, oldestActiveXid = 0}
(gdb) n
8747        if (!shutdown && XLogStandbyInfoActive())
(gdb) 
8750            checkPoint.oldestActiveXid = InvalidTransactionId;

在请求插入locks前,获取最后一个重要的XLOG Record的位置.



(gdb) 
8756        last_important_lsn = GetLastImportantRecPtr();
(gdb) 
8762        WALInsertLockAcquireExclusive();
(gdb) 
(gdb) p last_important_lsn
$6 = 5521451352 --> 0x1491AA958

在检查插入状态以确定checkpoint的REDO pointer时,必须阻塞并发的插入操作.



(gdb) n
8763        curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
(gdb) 
8770        if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
(gdb) p curInsert
$7 = 5521451392 --> 0x1491AA980
(gdb)

继续填充Checkpoint XLOG Record



(gdb) n
8790        if (flags & CHECKPOINT_END_OF_RECOVERY)
(gdb) 
8793        checkPoint.ThisTimeLineID = ThisTimeLineID;
(gdb) 
8794        if (flags & CHECKPOINT_END_OF_RECOVERY)
(gdb) 
8797            checkPoint.PrevTimeLineID = ThisTimeLineID;
(gdb) p ThisTimeLineID
$8 = 1
(gdb) n
8799        checkPoint.fullPageWrites = Insert->fullPageWrites;
(gdb) 
8809        freespace = INSERT_FREESPACE(curInsert);
(gdb) 
8810        if (freespace == 0)
(gdb) p freespace
$9 = 5760
(gdb) n
8817        checkPoint.redo = curInsert;
(gdb) 
8830        RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
(gdb) 
(gdb) p checkPoint
$10 = {redo = 5521451392, ThisTimeLineID = 1, PrevTimeLineID = 1, fullPageWrites = true, nextXidEpoch = 0, nextXid = 0, 
  nextOid = 0, nextMulti = 0, nextMultiOffset = 0, oldestXid = 0, oldestXidDB = 0, oldestMulti = 0, oldestMultiDB = 0, 
  time = 1546933255, oldestCommitTsXid = 0, newestCommitTsXid = 0, oldestActiveXid = 0}
(gdb)

更新共享的RedoRecPtr以备将来的XLogInsert调用,该操作必须在持有所有插入锁时完成。



(gdb) n
8836        WALInsertLockRelease();
(gdb) 
8839        SpinLockAcquire(&XLogCtl->info_lck);
(gdb) 
8840        XLogCtl->RedoRecPtr = checkPoint.redo;
(gdb) 
8841        SpinLockRelease(&XLogCtl->info_lck);
(gdb) 
8847        if (log_checkpoints)
(gdb) 
(gdb) p XLogCtl->RedoRecPtr
$11 = 5521451392

获取其他组装checkpoint记录的信息.



(gdb) n
8850        TRACE_POSTGRESQL_CHECKPOINT_START(flags);
(gdb) 
8860        LWLockAcquire(XidGenLock, LW_SHARED);
(gdb) 
8861        checkPoint.nextXid = ShmemVariableCache->nextXid;
(gdb) 
8862        checkPoint.oldestXid = ShmemVariableCache->oldestXid;
(gdb) 
8863        checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
(gdb) 
8864        LWLockRelease(XidGenLock);
(gdb) 
8866        LWLockAcquire(CommitTsLock, LW_SHARED);
(gdb) 
8867        checkPoint.oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
(gdb) 
8868        checkPoint.newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
(gdb) 
8869        LWLockRelease(CommitTsLock);
(gdb) 
8872        checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
(gdb) n
8873        if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
(gdb) 
8876        LWLockAcquire(OidGenLock, LW_SHARED);
(gdb) 
8877        checkPoint.nextOid = ShmemVariableCache->nextOid;
(gdb) p checkPoint
$13 = {redo = 5521451392, ThisTimeLineID = 1, PrevTimeLineID = 1, fullPageWrites = true, nextXidEpoch = 0, nextXid = 2308, 
  nextOid = 0, nextMulti = 0, nextMultiOffset = 0, oldestXid = 561, oldestXidDB = 16400, oldestMulti = 0, 
  oldestMultiDB = 0, time = 1546933255, oldestCommitTsXid = 0, newestCommitTsXid = 0, oldestActiveXid = 0}
(gdb) n
8878        if (!shutdown)
(gdb) 
8879            checkPoint.nextOid += ShmemVariableCache->oidCount;
(gdb) 
8880        LWLockRelease(OidGenLock);
(gdb) p *ShmemVariableCache
$14 = {nextOid = 42575, oidCount = 8189, nextXid = 2308, oldestXid = 561, xidVacLimit = 200000561, 
  xidWarnLimit = 2136484208, xidStopLimit = 2146484208, xidWrapLimit = 2147484208, oldestXidDB = 16400, 
  oldestCommitTsXid = 0, newestCommitTsXid = 0, latestCompletedXid = 2307, oldestClogXid = 561}
(gdb) n
8882        MultiXactGetCheckptMulti(shutdown,
(gdb)

再次查看checkpoint结构体



(gdb) p checkPoint
$15 = {redo = 5521451392, ThisTimeLineID = 1, PrevTimeLineID = 1, fullPageWrites = true, nextXidEpoch = 0, nextXid = 2308, 
  nextOid = 50764, nextMulti = 1, nextMultiOffset = 0, oldestXid = 561, oldestXidDB = 16400, oldestMulti = 1, 
  oldestMultiDB = 16402, time = 1546933255, oldestCommitTsXid = 0, newestCommitTsXid = 0, oldestActiveXid = 0}
(gdb)

结束CRIT_SECTION



(gdb) 
8896        END_CRIT_SECTION();

获取正在延迟checkpoint的虚拟事务ID(本例中nvxids为0,因此vxids指向的内容是无意义的数据)



(gdb) n
8927        vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
(gdb) 
8928        if (nvxids > 0)
(gdb) p vxids
$16 = (VirtualTransactionId *) 0x2f4eb20
(gdb) p *vxids
$17 = {backendId = 2139062143, localTransactionId = 2139062143}
(gdb) p nvxids
$18 = 0
(gdb) 
(gdb) n
8935        pfree(vxids);
(gdb)

把共享内存中的数据刷到磁盘上,并执行fsync



(gdb) 
8937        CheckPointGuts(checkPoint.redo, flags);
(gdb) p flags
$19 = 44
(gdb) n
8947        if (!shutdown && XLogStandbyInfoActive())
(gdb)

进入critical section.



(gdb) n
8950        START_CRIT_SECTION();
(gdb)

现在可以插入checkpoint record到XLOG中了.



(gdb) 
8955        XLogBeginInsert();
(gdb) n
8956        XLogRegisterData((char *) (&checkPoint), sizeof(checkPoint));
(gdb) 
8957        recptr = XLogInsert(RM_XLOG_ID,
(gdb) 
8961        XLogFlush(recptr);
(gdb) 
8970        if (shutdown)
(gdb)

更新控制文件(pg_control),首先为UpdateCheckPointDistanceEstimate()记录上一个checkpoint的REDO ptr



(gdb) 
8982        if (shutdown && checkPoint.redo != ProcLastRecPtr)
(gdb) 
8990        PriorRedoPtr = ControlFile->checkPointCopy.redo;
(gdb) 
8995        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
(gdb) p ControlFile->checkPointCopy.redo
$20 = 5521450856
(gdb) n
8996        if (shutdown)
(gdb) 
8998        ControlFile->checkPoint = ProcLastRecPtr;
(gdb) 
8999        ControlFile->checkPointCopy = checkPoint;
(gdb) 
9000        ControlFile->time = (pg_time_t) time(NULL);
(gdb) 
9002        ControlFile->minRecoveryPoint = InvalidXLogRecPtr;
(gdb) 
9003        ControlFile->minRecoveryPointTLI = 0;
(gdb) 
9010        SpinLockAcquire(&XLogCtl->ulsn_lck);
(gdb) 
9011        ControlFile->unloggedLSN = XLogCtl->unloggedLSN;
(gdb) 
9012        SpinLockRelease(&XLogCtl->ulsn_lck);
(gdb) 
9014        UpdateControlFile();
(gdb) 
9015        LWLockRelease(ControlFileLock);
(gdb) 
9018        SpinLockAcquire(&XLogCtl->info_lck);
(gdb) p *ControlFile
$21 = {system_identifier = 6624362124887945794, pg_control_version = 1100, catalog_version_no = 201809051, 
  state = DB_IN_PRODUCTION, time = 1546934255, checkPoint = 5521451392, checkPointCopy = {redo = 5521451392, 
    ThisTimeLineID = 1, PrevTimeLineID = 1, fullPageWrites = true, nextXidEpoch = 0, nextXid = 2308, nextOid = 50764, 
    nextMulti = 1, nextMultiOffset = 0, oldestXid = 561, oldestXidDB = 16400, oldestMulti = 1, oldestMultiDB = 16402, 
    time = 1546933255, oldestCommitTsXid = 0, newestCommitTsXid = 0, oldestActiveXid = 0}, unloggedLSN = 1, 
  minRecoveryPoint = 0, minRecoveryPointTLI = 0, backupStartPoint = 0, backupEndPoint = 0, backupEndRequired = false, 
  wal_level = 0, wal_log_hints = false, MaxConnections = 100, max_worker_processes = 8, max_prepared_xacts = 0, 
  max_locks_per_xact = 64, track_commit_timestamp = false, maxAlign = 8, floatFormat = 1234567, blcksz = 8192, 
  relseg_size = 131072, xlog_blcksz = 8192, xlog_seg_size = 16777216, nameDataLen = 64, indexMaxKeys = 32, 
  toast_max_chunk_size = 1996, loblksize = 2048, float4ByVal = true, float8ByVal = true, data_checksum_version = 0, 
  mock_authentication_nonce = "\220\277\067Vg\003\205\232U{\177 h\216\271D\266\063[\\=6\365S\tA\353\361ߧw\301", 
  crc = 930305687}
(gdb)

更新checkpoint XID/epoch的共享内存拷贝,退出critical section,并让smgr执行checkpoint收尾工作(比如删除旧文件等).



(gdb) n
9019        XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
(gdb) 
9020        XLogCtl->ckptXid = checkPoint.nextXid;
(gdb) 
9021        SpinLockRelease(&XLogCtl->info_lck);
(gdb) 
9027        END_CRIT_SECTION();
(gdb) 
9032        smgrpostckpt();
(gdb)

删除旧的日志文件,这些文件自最后一个检查点后已不再需要,以防止保存xlog的磁盘撑满。



(gdb) n
9038        if (PriorRedoPtr != InvalidXLogRecPtr)
(gdb) p PriorRedoPtr
$23 = 5521450856
(gdb) n
9039            UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
(gdb) 
9045        XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
(gdb) 
9046        KeepLogSeg(recptr, &_logSegNo);
(gdb) p RedoRecPtr
$24 = 5521451392
(gdb) p _logSegNo
$25 = 329
(gdb) p wal_segment_size
$26 = 16777216
(gdb) n
9047        _logSegNo--;
(gdb) 
9048        RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
(gdb) 
9054        if (!shutdown)
(gdb) p recptr
$27 = 5521451504
(gdb)

执行其他相关收尾工作



(gdb) n
9055            PreallocXlogFiles(recptr);
(gdb) 
9064        if (!RecoveryInProgress())
(gdb) 
9065            TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
(gdb) 
9068        LogCheckpointEnd(false);
(gdb) 
9070        TRACE_POSTGRESQL_CHECKPOINT_DONE(CheckpointStats.ckpt_bufs_written,
(gdb) 
9076        LWLockRelease(CheckpointLock);
(gdb) 
9077    }
(gdb)

完成调用



(gdb) 
CheckpointerMain () at checkpointer.c:488
488                 ckpt_performed = true;
(gdb)

DONE!

四、参考资料

checkpointer.c

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

PostgreSQL 源码解读(115)- 后台进程#3(checkpointer进程#2)

下载Word文档到电脑,方便收藏和打印~

下载Word文档

编程热搜

目录