PostgreSQL的后台进程checkpointer分析
本篇内容介绍了“PostgreSQL的后台进程checkpointer分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
一、数据结构
CheckPoint
CheckPoint XLOG record结构体.
// Body of a CheckPoint XLOG record.
typedef struct CheckPoint
{
// Next RecPtr available when creation of this checkpoint began (i.e. the REDO start point)
XLogRecPtr redo;
// Current timeline
TimeLineID ThisTimeLineID;
// Previous timeline, if this record begins a new timeline; otherwise equal to the current timeline
TimeLineID PrevTimeLineID;
// Whether full-page writes are in effect
bool fullPageWrites;
// High-order bits of nextXid
uint32 nextXidEpoch;
// Next free XID
TransactionId nextXid;
// Next free OID
Oid nextOid;
// Next free MultiXactId
MultiXactId nextMulti;
// Next free MultiXact offset
MultiXactOffset nextMultiOffset;
// Cluster-wide minimum datfrozenxid
TransactionId oldestXid;
// Database that holds the minimum datfrozenxid
Oid oldestXidDB;
// Cluster-wide minimum datminmxid
MultiXactId oldestMulti;
// Database that holds the minimum datminmxid
Oid oldestMultiDB;
// Timestamp of the checkpoint
pg_time_t time;
// Oldest XID that has a valid commit timestamp
TransactionId oldestCommitTsXid;
// Newest XID that has a valid commit timestamp
TransactionId newestCommitTsXid;
// NOTE(review): presumably the oldest XID still running when the checkpoint was taken -- confirm against pg_control.h
TransactionId oldestActiveXid;
} CheckPoint;
// XLOG_* codes. Every value uses only the high nibble of a byte (0x00 .. 0xB0,
// step 0x10); the low nibble is always zero.
// NOTE(review): presumably these are the XLOG record "info" type codes,
// with the first two distinguishing shutdown from online checkpoints -- confirm against pg_control.h
#define XLOG_CHECKPOINT_SHUTDOWN 0x00
#define XLOG_CHECKPOINT_ONLINE 0x10
#define XLOG_NOOP 0x20
#define XLOG_NEXTOID 0x30
#define XLOG_SWITCH 0x40
#define XLOG_BACKUP_END 0x50
#define XLOG_PARAMETER_CHANGE 0x60
#define XLOG_RESTORE_POINT 0x70
#define XLOG_FPW_CHANGE 0x80
#define XLOG_END_OF_RECOVERY 0x90
#define XLOG_FPI_FOR_HINT 0xA0
#define XLOG_FPI 0xB0
CheckpointerShmem
checkpointer进程与其他进程(主要是向其发送请求的后端进程backend)之间通讯的共享内存结构.
// One entry of the requests[] array kept in CheckpointerShmemStruct.
typedef struct
{
RelFileNode rnode;// tablespace / database / relation identifiers
ForkNumber forknum;// fork number
BlockNumber segno;// NOTE(review): presumably a segment number despite the BlockNumber type -- confirm
} CheckpointerRequest;
// Shared-memory state of the checkpointer: its PID, the spinlock-protected
// ckpt_* progress counters, backend write/fsync counters and the request array.
typedef struct
{
// PID of the checkpointer process (0 if it has not been started)
pid_t checkpointer_pid;
// Spinlock protecting all ckpt_* fields
slock_t ckpt_lck;
// Incremented when a checkpoint starts
int ckpt_started;
// Advanced when a checkpoint finishes
int ckpt_done;
// Incremented when a checkpoint fails
int ckpt_failed;
// Checkpoint flags, as defined in xlog.h
int ckpt_flags;
// Counts buffer writes performed by backends
uint32 num_backend_writes;
// Counts fsync calls performed by backends
uint32 num_backend_fsync;
// Current number of requests (presumably the count of used entries in requests[] -- confirm)
int num_requests;
// Maximum number of requests (presumably the allocated length of requests[] -- confirm)
int max_requests;
// Request array (flexible array member; must stay the last field)
CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
} CheckpointerShmemStruct;
// File-local pointer to the CheckpointerShmemStruct instance
static CheckpointerShmemStruct *CheckpointerShmem;
二、源码解读
CheckpointerMain函数是checkpointer进程的入口.
该函数首先为各个信号注册处理函数(handler,如熟悉Java OO开发,可类比为注册回调/监听器),然后创建进程私有的内存上下文,并通过sigsetjmp设置错误恢复点,接着进入无限循环(for (;;)),在"合适"的时候(收到checkpoint请求,或距上次checkpoint已超过CheckPointTimeout)执行checkpoint.
// Entry point of the checkpointer process.
// Installs the signal handlers, creates a private memory context, sets up an
// error-recovery point with sigsetjmp, then loops forever: each iteration
// performs a checkpoint (or restartpoint) if one was requested or if
// CheckPointTimeout seconds have passed since the last one, and then sleeps on
// the process latch. The loop contains no break; the exit visible here is
// proc_exit(0) in the shutdown branch.
void
CheckpointerMain(void)
{
sigjmp_buf local_sigjmp_buf;
MemoryContext checkpointer_context;
// Record this process's PID in shared memory
CheckpointerShmem->checkpointer_pid = MyProcPid;
// Install the signal handlers
// SIGHUP: set the flag that makes the loop re-read the configuration file
pqsignal(SIGHUP, ChkptSigHupHandler);
// SIGINT: request a checkpoint
pqsignal(SIGINT, ReqCheckpointHandler);
// SIGTERM is ignored
pqsignal(SIGTERM, SIG_IGN);
// SIGQUIT: hard crash / immediate exit
pqsignal(SIGQUIT, chkpt_quickdie);
// SIGALRM and SIGPIPE are ignored
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, chkpt_sigusr1_handler);
// SIGUSR2: request shutdown
pqsignal(SIGUSR2, ReqShutdownHandler);
// SIGCHLD goes back to its default disposition
pqsignal(SIGCHLD, SIG_DFL);
// Allow SIGQUIT at all times by removing it from the blocked-signal set
sigdelset(&BlockSig, SIGQUIT);
// Start both timers from the current time
last_checkpoint_time = last_xlog_switch_time = (pg_time_t) time(NULL);
// Create the "Checkpointer" memory context under TopMemoryContext and make it current
checkpointer_context = AllocSetContextCreate(TopMemoryContext,
"Checkpointer",
ALLOCSET_DEFAULT_SIZES);
MemoryContextSwitchTo(checkpointer_context);
// Error-recovery path: sigsetjmp returns non-zero only when control jumps
// back to local_sigjmp_buf; the initial call returns 0 and skips this block.
if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{
// PG_TRY is not used here, so the error context stack must be reset by hand
error_context_stack = NULL;
// Interrupts must be held off during cleanup
HOLD_INTERRUPTS();
// Report the error to the log
EmitErrorReport();
// Release whatever the failed operation may still hold (locks, waits,
// buffer I/O, auxiliary-process resources, smgr/file/hash-table state)
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
ReleaseAuxProcessResources(false);
AtEOXact_Buffers(false);
AtEOXact_SMgr();
AtEOXact_Files(false);
AtEOXact_HashTables(false);
// Tell waiting backends that the checkpoint failed
if (ckpt_active)
{
SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
CheckpointerShmem->ckpt_failed++;
// ckpt_done catches up with ckpt_started so the attempt counts as finished
CheckpointerShmem->ckpt_done = CheckpointerShmem->ckpt_started;
SpinLockRelease(&CheckpointerShmem->ckpt_lck);
ckpt_active = false;
}
// Return to the checkpointer context and clear the error state
MemoryContextSwitchTo(checkpointer_context);
FlushErrorState();
// Drop anything that leaked into the checkpointer context
MemoryContextResetAndDeleteChildren(checkpointer_context);
// Interrupts can be allowed again
RESUME_INTERRUPTS();
// Pause after an error (1000000; presumably microseconds, i.e. 1 s -- confirm pg_usleep units)
pg_usleep(1000000L);
// Close everything held open through smgr after an error
smgrcloseall();
}
// From here on ereport(ERROR) can be handled: it jumps to local_sigjmp_buf
PG_exception_stack = &local_sigjmp_buf;
// Install the UnBlockSig signal mask
PG_SETMASK(&UnBlockSig);
UpdateSharedMemoryConfig();
// Publish this process's latch as the checkpointer latch
ProcGlobal->checkpointerLatch = &MyProc->procLatch;
// Main loop
for (;;)
{
bool do_checkpoint = false;// whether to run a checkpoint in this iteration
int flags = 0;// checkpoint flags collected for this iteration
pg_time_t now;// current time
int elapsed_secs;// seconds elapsed since the reference time
int cur_timeout;// sleep timeout, in seconds
// Clear the latch before checking for work
ResetLatch(MyLatch);
AbsorbFsyncRequests();
if (got_SIGHUP)// SIGHUP was received: re-read the configuration
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
UpdateSharedMemoryConfig();
}
if (checkpoint_requested)
{
// A checkpoint request was received
checkpoint_requested = false;// reset the flag
do_checkpoint = true;// a checkpoint must be run
BgWriterStats.m_requested_checkpoints++;// count it as a requested checkpoint
}
if (shutdown_requested)
{
// A shutdown request was received
ExitOnAnyError = true;
// Shut down XLOG
ShutdownXLOG(0, 0);
// The checkpointer exits normally here
proc_exit(0);
}
now = (pg_time_t) time(NULL);// current time
elapsed_secs = now - last_checkpoint_time;// seconds since the last checkpoint
if (elapsed_secs >= CheckPointTimeout)
{
// Timeout reached
if (!do_checkpoint)
BgWriterStats.m_timed_checkpoints++;// counted as timed only when no request was pending
do_checkpoint = true;// a checkpoint must be run
flags |= CHECKPOINT_CAUSE_TIME;// record that time was the cause
}
if (do_checkpoint)
{
bool ckpt_performed = false;// set once the checkpoint/restartpoint was actually done
bool do_restartpoint;
// While recovery is in progress a restartpoint is done instead of a checkpoint
do_restartpoint = RecoveryInProgress();
// Atomically take over the requested flags and bump the started counter
SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
flags |= CheckpointerShmem->ckpt_flags;
CheckpointerShmem->ckpt_flags = 0;
CheckpointerShmem->ckpt_started++;
SpinLockRelease(&CheckpointerShmem->ckpt_lck);
// CHECKPOINT_END_OF_RECOVERY forces a regular checkpoint rather than a restartpoint
if (flags & CHECKPOINT_END_OF_RECOVERY)
do_restartpoint = false;
// Log a message when an XLOG-triggered checkpoint comes less than
// CheckPointWarning seconds after the previous one
if (!do_restartpoint &&
(flags & CHECKPOINT_CAUSE_XLOG) &&
elapsed_secs < CheckPointWarning)
ereport(LOG,
(errmsg_plural("checkpoints are occurring too frequently (%d second apart)",
"checkpoints are occurring too frequently (%d seconds apart)",
elapsed_secs,
elapsed_secs),
errhint("Consider increasing the configuration parameter \"max_wal_size\".")));
// Initialize the checkpointer's private state for this checkpoint
ckpt_active = true;
if (do_restartpoint)
// Restartpoint
ckpt_start_recptr = GetXLogReplayRecPtr(NULL);// start from the current WAL replay position
else
// Checkpoint
ckpt_start_recptr = GetInsertRecPtr();// start from the current WAL insert position
ckpt_start_time = now;// start time
ckpt_cached_elapsed = 0;// reset the cached elapsed value
if (!do_restartpoint)
{
// Run the checkpoint
CreateCheckPoint(flags);// create the checkpoint
ckpt_performed = true;// done
}
else
// Restartpoint during recovery; the return value says whether it was performed
ckpt_performed = CreateRestartPoint(flags);
// Close everything held open through smgr after the checkpoint
smgrcloseall();
// Mark the checkpoint as finished in shared memory
SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
CheckpointerShmem->ckpt_done = CheckpointerShmem->ckpt_started;
SpinLockRelease(&CheckpointerShmem->ckpt_lck);
if (ckpt_performed)
{
// The checkpoint completed: the timeout restarts from `now`
last_checkpoint_time = now;
}
else
{
// Not performed: backdate the timestamp so that the timeout
// expires again 15 seconds after `now`
last_checkpoint_time = now - CheckPointTimeout + 15;
}
ckpt_active = false;
}
// Check archive_timeout and switch the xlog file if needed
CheckArchiveTimeout();
// Report statistics
pgstat_send_bgwriter();
// Recompute the time left until the next timed checkpoint
now = (pg_time_t) time(NULL);
elapsed_secs = now - last_checkpoint_time;
if (elapsed_secs >= CheckPointTimeout)
continue;// already due: go around again without sleeping
cur_timeout = CheckPointTimeout - elapsed_secs;
if (XLogArchiveTimeout > 0 && !RecoveryInProgress())
{
elapsed_secs = now - last_xlog_switch_time;
if (elapsed_secs >= XLogArchiveTimeout)
continue;// an xlog switch is already due: go around again without sleeping
cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);// sleep for the shorter of the two
}
// Sleep until the latch is set or the timeout expires; cur_timeout is in
// seconds and is multiplied by 1000 (presumably WaitLatch takes milliseconds -- confirm)
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
cur_timeout * 1000L ,
WAIT_EVENT_CHECKPOINTER_MAIN);// sleep
}
}
// Register `handler` for signal number `signum` in pg_signal_array.
// Returns the handler that was registered before, or SIG_ERR when signum
// lies outside the range [0, PG_SIGNAL_COUNT).
pqsigfunc
pqsignal(int signum, pqsigfunc handler)
{
	// Reject signal numbers that do not index pg_signal_array
	if (signum < 0 || signum >= PG_SIGNAL_COUNT)
		return SIG_ERR;

	// Remember the old handler before overwriting the slot
	pqsigfunc previous = pg_signal_array[signum];

	pg_signal_array[signum] = handler;
	return previous;
}
// Return XLogCtl->LogwrtRqst.Write, read while holding the info_lck spinlock
// so that the value is fetched consistently.
XLogRecPtr
GetInsertRecPtr(void)
{
	XLogRecPtr	write_request;

	SpinLockAcquire(&XLogCtl->info_lck);
	write_request = XLogCtl->LogwrtRqst.Write;	// position used as the insert point
	SpinLockRelease(&XLogCtl->info_lck);

	return write_request;
}
三、跟踪分析
创建数据表,插入数据,执行checkpoint
testdb=# drop table t_wal_ckpt;
DROP TABLE
testdb=# create table t_wal_ckpt(c1 int not null,c2 varchar(40),c3 varchar(40));
CREATE TABLE
testdb=# insert into t_wal_ckpt(c1,c2,c3) values(1,'C2-1','C3-1');
INSERT 0 1
testdb=#
testdb=# checkpoint; --> 第一次checkpoint
更新数据,执行checkpoint.
testdb=# update t_wal_ckpt set c2 = 'C2#'||substr(c2,4,40);
UPDATE 1
testdb=# checkpoint;
启动gdb,设置信号控制
(gdb) handle SIGINT print nostop pass
SIGINT is used by the debugger.
Are you sure you want to change it? (y or n) y
Signal Stop Print Pass to program Description
SIGINT No Yes Yes Interrupt
(gdb)
(gdb) b checkpointer.c:441
Breakpoint 1 at 0x815197: file checkpointer.c, line 441.
(gdb) c
Continuing.
Program received signal SIGINT, Interrupt.
Breakpoint 1, CheckpointerMain () at checkpointer.c:441
441 flags |= CheckpointerShmem->ckpt_flags;
(gdb)
查看共享内存信息CheckpointerShmem
(gdb) p *CheckpointerShmem
$1 = {checkpointer_pid = 1650, ckpt_lck = 1 '\001', ckpt_started = 2, ckpt_done = 2, ckpt_failed = 0, ckpt_flags = 44,
num_backend_writes = 0, num_backend_fsync = 0, num_requests = 0, max_requests = 65536, requests = 0x7f2cdda07b28}
(gdb)
设置相关信息CheckpointerShmem
441 flags |= CheckpointerShmem->ckpt_flags;
(gdb) n
442 CheckpointerShmem->ckpt_flags = 0;
(gdb)
443 CheckpointerShmem->ckpt_started++;
(gdb)
444 SpinLockRelease(&CheckpointerShmem->ckpt_lck);
(gdb)
450 if (flags & CHECKPOINT_END_OF_RECOVERY)
(gdb)
460 if (!do_restartpoint &&
(gdb)
461 (flags & CHECKPOINT_CAUSE_XLOG) &&
(gdb)
460 if (!do_restartpoint &&
初始化checkpointer进程在checkpoint过程中需使用的私有变量.
其中ckpt_start_recptr为插入点,即Redo point;5521180544转换为16进制为0x149168780,写成LSN形式即1/49168780.
(gdb)
474 ckpt_active = true;
(gdb)
475 if (do_restartpoint)
(gdb)
478 ckpt_start_recptr = GetInsertRecPtr();
(gdb) p XLogCtl->LogwrtRqst
$1 = {Write = 5521180544, Flush = 5521180544}
(gdb) n
479 ckpt_start_time = now;
(gdb) p ckpt_start_recptr
$2 = 5521180544
(gdb) n
480 ckpt_cached_elapsed = 0;
(gdb)
485 if (!do_restartpoint)
(gdb)
执行checkpoint.OK!
(gdb)
487 CreateCheckPoint(flags);
(gdb)
488 ckpt_performed = true;
(gdb)
关闭资源,并设置共享内存中的信息
497 smgrcloseall();
(gdb)
502 SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
(gdb)
503 CheckpointerShmem->ckpt_done = CheckpointerShmem->ckpt_started;
(gdb)
504 SpinLockRelease(&CheckpointerShmem->ckpt_lck);
(gdb)
506 if (ckpt_performed)
(gdb) p CheckpointerShmem
$3 = (CheckpointerShmemStruct *) 0x7fcecc063b00
(gdb) p *CheckpointerShmem
$4 = {checkpointer_pid = 1697, ckpt_lck = 0 '\000', ckpt_started = 1, ckpt_done = 1, ckpt_failed = 0, ckpt_flags = 0,
num_backend_writes = 0, num_backend_fsync = 0, num_requests = 0, max_requests = 65536, requests = 0x7fcecc063b28}
(gdb)
请求已清空(ckpt_flags = 0,num_requests = 0),requests[0]中各字段均为初始值
(gdb) p CheckpointerShmem->requests[0]
$5 = {rnode = {spcNode = 0, dbNode = 0, relNode = 0}, forknum = MAIN_FORKNUM, segno = 0}
在需要的时候,检查archive_timeout并切换xlog文件.
休眠,直至接收到信号或者需要启动新的checkpoint或xlog文件切换.
(gdb) n
513 last_checkpoint_time = now;
(gdb)
526 ckpt_active = false;
(gdb)
530 CheckArchiveTimeout();
(gdb)
539 pgstat_send_bgwriter();
(gdb)
545 now = (pg_time_t) time(NULL);
(gdb)
546 elapsed_secs = now - last_checkpoint_time;
(gdb)
547 if (elapsed_secs >= CheckPointTimeout)
(gdb) p elapsed_secs
$7 = 1044
(gdb) p CheckPointTimeout
$8 = 900
(gdb) n
548 continue;
已超时,执行新的checkpoint
(gdb)
569 }
(gdb)
352 bool do_checkpoint = false;
(gdb)
353 int flags = 0;
(gdb) n
360 ResetLatch(MyLatch);
(gdb)
365 AbsorbFsyncRequests();
(gdb)
367 if (got_SIGHUP)
(gdb)
385 if (checkpoint_requested)
(gdb)
391 if (shutdown_requested)
(gdb)
410 now = (pg_time_t) time(NULL);
(gdb)
411 elapsed_secs = now - last_checkpoint_time;
(gdb)
412 if (elapsed_secs >= CheckPointTimeout)
(gdb) p elapsed_secs
$9 = 1131
(gdb) n
414 if (!do_checkpoint)
(gdb)
415 BgWriterStats.m_timed_checkpoints++;
(gdb)
416 do_checkpoint = true;
(gdb)
417 flags |= CHECKPOINT_CAUSE_TIME;
(gdb)
423 if (do_checkpoint)
(gdb)
425 bool ckpt_performed = false;
(gdb)
433 do_restartpoint = RecoveryInProgress();
(gdb)
440 SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
(gdb)
Breakpoint 1, CheckpointerMain () at checkpointer.c:441
441 flags |= CheckpointerShmem->ckpt_flags;
(gdb)
442 CheckpointerShmem->ckpt_flags = 0;
(gdb)
443 CheckpointerShmem->ckpt_started++;
(gdb) c
Continuing.
“PostgreSQL的后台进程checkpointer分析”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341