PostgreSQL 源码解读(155)- 后台进程#7(walsender#3)
本节继续介绍PostgreSQL的后台进程walsender,重点介绍的是调用栈中的函数WalSndLoop->WaitLatchOrSocket->WaitEventSetWait->WaitEventSetWaitBlock.
调用栈如下:
(gdb) bt
#0 0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0,
nevents=1) at latch.c:1048
#2 0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1,
wait_event_info=83886092) at latch.c:1000
#3 0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999,
wait_event_info=83886092) at latch.c:385
#4 0x000000000085405b in WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2229
#5 0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6 0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
at walsender.c:1539
#7 0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
at postgres.c:4178
#8 0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9 0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228
一、数据结构
WaitEvent
WaitEvent等待事件结构体
/*
 * WaitEvent: one entry of a WaitEventSet.  The same struct is also what the
 * wait functions fill in (via their occurred_events argument) to report an
 * event that fired.
 */
typedef struct WaitEvent
{
// Position of this event in its WaitEventSet; appears to be the index into
// set->events (the Win32 code uses pos + 1 to index set->handles).
int pos;
// WL_* flags.  For an entry stored in a WaitEventSet this is the requested
// event mask; for an entry returned to a caller it is the set of events
// that actually fired.
uint32 events;
// Socket watched by this event, or PGINVALID_SOCKET for latch and
// postmaster-death entries.
pgsocket fd;
// Opaque pointer supplied when the event was registered (presumably the
// user_data argument of AddWaitEventToSet); copied through to occurred events.
void *user_data;
#ifdef WIN32
// Win32 only: set after a readable-socket event fires, so the next
// WaitEventSetWaitBlock call re-arms the event via WaitEventAdjustWin32.
bool reset;
#endif
} WaitEvent;
WaitEventSet
WaitEventSet等待事件集
/*
 * WaitEventSet: a group of events (latch, postmaster death, sockets) that
 * are waited on together.
 * NOTE(review): the WaitEventSet typedef is presumably what latch.h exposes,
 * with this struct body private to latch.c -- verify against the tree.
 */
struct WaitEventSet
{
// Number of events currently registered.
int nevents;
// Maximum number of events this set can hold.
int nevents_space;
// Registered events; the first nevents entries are in use.
WaitEvent *events;
// Latch being waited on if a WL_LATCH_SET event was registered; may be
// NULL (WaitEventSetWait tests it before dereferencing).
Latch *latch;
// Index in events[] of the WL_LATCH_SET entry.
int latch_pos;
// If true, detecting postmaster death calls proc_exit(1) instead of
// reporting WL_POSTMASTER_DEATH to the caller.
bool exit_on_postmaster_death;
#if defined(WAIT_USE_EPOLL)
// epoll instance used for waiting.
int epoll_fd;
// Buffer that epoll_wait() fills with ready events; allocated once and
// reused for every wait.
struct epoll_event *epoll_ret_events;
#elif defined(WAIT_USE_POLL)
// pollfd array handed to every poll() call; it parallels events[] (one
// entry per registered event, same order) and is prepared once.
struct pollfd *pollfds;
#elif defined(WAIT_USE_WIN32)
// Handles passed to WaitForMultipleObjects: handles[0] is tied to signal
// delivery (a hit there dispatches queued signals) and event i uses
// handles[i + 1].
HANDLE *handles;
#endif
};
二、源码解读
WalSndLoop
通过Copy处理WAL流数据的walsender进程主循环.
其主要逻辑如下:
1.获取时间戳,设置相关标记
2.进入循环
2.1重置MyLatch,检查中断
2.2处理最近接收的请求或信号
2.3检查客户端输入
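2.4如果已从客户端接收到CopyDone(streamingDoneReceiving)、本端也已完成发送(streamingDoneSending),并且输出缓冲区为空,则退出循环
2.5如输出缓冲区中没有挂起的数据,则调用send_data发送更多数据(并由其设置WalSndCaughtUp变量);否则将WalSndCaughtUp置为false.随后尝试将挂起的输出刷新到客户端,失败则调用WalSndShutdown
2.6如WalSndCaughtUp为true并且没有挂起待发送的数据
2.6.1如当前处于CATCHUP状态,则切换为STREAMING状态;如收到SIGUSR2信号,则调用WalSndDone
2.7检查复制是否超时以及是否需要发送Keepalive
2.8如已追上(WalSndCaughtUp为true)且尚未完成发送(streamingDoneSending为false),或者存在挂起的待发送数据,
则计算休眠时间并调用WaitLatchOrSocket等待;存在挂起数据时还会额外等待socket变为可写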
/*
 * WalSndLoop - main loop of a walsender process streaming data via Copy.
 *
 * send_data: callback invoked to produce more output whenever the output
 * buffer is empty (in the traced session it is XLogSendPhysical).
 *
 * The loop itself exits only when streaming is finished in both directions
 * (streamingDoneReceiving and streamingDoneSending) and no output is pending.
 * WalSndShutdown (on flush failure) and WalSndDone (on SIGUSR2) are called
 * from inside the loop; NOTE(review): presumably neither returns -- verify.
 */
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
// Start the reply-timeout clock from now.
last_reply_timestamp = GetCurrentTimestamp();
waiting_for_ping_response = false;
for (;;)
{
// Clear any already-pending wakeups first; a latch set while we are busy
// below still makes the wait at the bottom return immediately.
ResetLatch(MyLatch);
// Check for interrupts
CHECK_FOR_INTERRUPTS();
// Handle a pending configuration reload request
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
SyncRepInitConfig();
}
// Check for input from the client
ProcessRepliesIfAny();
// Finished in both directions and nothing left to flush: stop streaming
if (streamingDoneReceiving && streamingDoneSending &&
!pq_is_send_pending())
// Leave the loop
break;
// Only produce more data once the output buffer has drained; while output
// is still pending, assume we are not caught up.
if (!pq_is_send_pending())
// Send more data
send_data();
else
WalSndCaughtUp = false;
// Try to flush pending output to the client; shut down on failure
if (pq_flush_if_writable() != 0)
WalSndShutdown();
// If nothing remains to be sent right now...
if (WalSndCaughtUp && !pq_is_send_pending())
{
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
{
ereport(DEBUG1,
(errmsg("\"%s\" has now caught up with upstream server",
application_name)));
// Caught up: switch from CATCHUP to STREAMING
WalSndSetState(WALSNDSTATE_STREAMING);
}
// SIGUSR2 received: finish up via WalSndDone
if (got_SIGUSR2)
WalSndDone(send_data);
}
// Check for replication timeout
WalSndCheckTimeOut();
// Send a keepalive if it is time to
WalSndKeepaliveIfNecessary();
// Sleep only when caught up and still streaming, or when output is pending
// (in which case also wake up as soon as the socket becomes writable).
if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
{
long sleeptime;
int wakeEvents;
wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
WL_SOCKET_READABLE;
sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
// Sleep until something happens or we time out
(void) WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime,
WAIT_EVENT_WAL_SENDER_MAIN);
}
}
return;
}
WaitLatchOrSocket
该函数与WaitLatch类似,但额外有一个socket参数,用于等待WL_SOCKET_*类事件.
每次调用都会创建一个临时的WaitEventSet,按wakeEvents添加等待事件,调用WaitEventSetWait等待事件发生或者超时,最后释放该WaitEventSet.
/*
 * WaitLatchOrSocket - like WaitLatch, but can also wait on a socket.
 *
 * Builds a throw-away WaitEventSet (capacity 3) in CurrentMemoryContext,
 * registers the conditions selected by wakeEvents, performs one
 * WaitEventSetWait() and frees the set again before returning.
 *
 * latch           - latch to watch when WL_LATCH_SET is requested
 * wakeEvents      - WL_* bitmask of conditions to wait for
 * sock            - socket watched for the WL_SOCKET_MASK bits of wakeEvents
 * timeout         - wait limit when WL_TIMEOUT is requested (must then be
 *                   >= 0); replaced by -1 (no limit) otherwise
 * wait_event_info - wait-event code forwarded to WaitEventSetWait
 *
 * Returns WL_TIMEOUT if nothing fired, otherwise the fired event's flags
 * restricted to WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_MASK.
 * The set is created and destroyed on every call.
 */
int
WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
				  long timeout, uint32 wait_event_info)
{
	WaitEventSet *wes = CreateWaitEventSet(CurrentMemoryContext, 3);
	WaitEvent	fired;
	int			nfired;
	int			result;

	/* Without WL_TIMEOUT the caller's timeout is ignored: wait forever. */
	if (!(wakeEvents & WL_TIMEOUT))
		timeout = -1;
	else
		Assert(timeout >= 0);

	if (wakeEvents & WL_LATCH_SET)
		AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, latch, NULL);

	/* A postmaster child has to handle postmaster death one way or another. */
	Assert(!IsUnderPostmaster ||
		   (wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)) != 0);

	if (IsUnderPostmaster)
	{
		if (wakeEvents & WL_POSTMASTER_DEATH)
			AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
							  NULL, NULL);
		if (wakeEvents & WL_EXIT_ON_PM_DEATH)
			AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
							  NULL, NULL);
	}

	/* All requested socket conditions share a single entry. */
	if (wakeEvents & WL_SOCKET_MASK)
		AddWaitEventToSet(wes, wakeEvents & WL_SOCKET_MASK, sock, NULL, NULL);

	nfired = WaitEventSetWait(wes, timeout, &fired, 1, wait_event_info);
	if (nfired == 0)
		result = WL_TIMEOUT;
	else
		result = fired.events & (WL_LATCH_SET |
								 WL_POSTMASTER_DEATH |
								 WL_SOCKET_MASK);

	FreeWaitEventSet(wes);

	return result;
}
WaitEventSetWait
等待已加入等待事件集合(WaitEventSet)中的事件发生,或者直至超时.
在循环中先检查latch是否已被设置(已设置则立即返回),否则调用WaitEventSetWaitBlock阻塞等待,直至有事件发生或超时;如未得到任何事件(例如被信号中断),则重新计算剩余的超时时间后继续等待.
/*
 * WaitEventSetWait - wait until at least one registered event fires or the
 * timeout expires.
 *
 * timeout is in milliseconds; a negative value means no limit, and a
 * non-negative value must fit in an int.  Up to nevents (> 0) fired events
 * are written to occurred_events.  If the set's latch is already set, that
 * is reported right away without blocking.  The whole wait is bracketed by
 * pgstat_report_wait_start/end.
 *
 * Returns the number of events stored in occurred_events; 0 means the
 * timeout elapsed.
 */
int
WaitEventSetWait(WaitEventSet *set, long timeout,
				 WaitEvent *occurred_events, int nevents,
				 uint32 wait_event_info)
{
	long		remaining = -1;		/* ms left to wait; -1 = no limit */
	int			nfired = 0;
	instr_time	started;
	instr_time	elapsed;

	Assert(nevents > 0);

	/* Record the start time so the remaining budget can be recomputed. */
	if (timeout >= 0)
	{
		INSTR_TIME_SET_CURRENT(started);
		Assert(timeout >= 0 && timeout <= INT_MAX);
		remaining = timeout;
	}

	pgstat_report_wait_start(wait_event_info);

#ifndef WIN32
	/* Mark that we are inside a wait; cleared again before returning. */
	waiting = true;
#else
	/* Service already-queued signals before waiting. */
	pgwin32_dispatch_queued_signals();
#endif

	for (;;)
	{
		int			rc;

		/* An already-set latch is reported immediately, no syscall needed. */
		if (set->latch != NULL && set->latch->is_set)
		{
			WaitEvent  *latch_ev = &set->events[set->latch_pos];

			occurred_events->fd = PGINVALID_SOCKET;
			occurred_events->pos = set->latch_pos;
			occurred_events->user_data = latch_ev->user_data;
			occurred_events->events = WL_LATCH_SET;
			nfired = 1;
			break;
		}

		/* -1: timed out; 0: nothing reportable, retry; >0: events stored */
		rc = WaitEventSetWaitBlock(set, remaining, occurred_events, nevents);
		if (rc == -1)
			break;

		nfired = rc;
		if (nfired != 0)
			break;

		/* Nothing reportable (e.g. EINTR): shrink the budget and retry. */
		if (timeout >= 0)
		{
			INSTR_TIME_SET_CURRENT(elapsed);
			INSTR_TIME_SUBTRACT(elapsed, started);
			remaining = timeout - (long) INSTR_TIME_GET_MILLISEC(elapsed);
			if (remaining <= 0)
				break;
		}
	}

#ifndef WIN32
	waiting = false;
#endif

	pgstat_report_wait_end();

	return nfired;
}
WaitEventSetWaitBlock
Linux平台下使用epoll_wait(2)等待(下文同时给出了poll和Win32平台的实现).
调用epoll_wait,如有事件发生,则遍历返回的epoll事件,将其转换为WaitEvent写入occurred_events.
#if defined(WAIT_USE_EPOLL)
/*
 * WaitEventSetWaitBlock (epoll version) - block once in epoll_wait(2) and
 * translate the ready epoll events into WaitEvents.
 *
 * cur_timeout is passed straight to epoll_wait (ms; -1 = no limit), and at
 * most nevents entries are written to occurred_events.
 *
 * Returns -1 on timeout; 0 if epoll_wait was interrupted (EINTR) or no
 * ready event was reportable (the caller retries); otherwise the number of
 * events stored.  Any other epoll_wait failure raises ERROR.
 */
static inline int
WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
WaitEvent *occurred_events, int nevents)
{
int returned_events = 0;
int rc;
WaitEvent *cur_event;
struct epoll_event *cur_epoll_event;
// Sleep
rc = epoll_wait(set->epoll_fd, set->epoll_ret_events,
nevents, cur_timeout);
// Check the return code
if (rc < 0)
{
// EINTR is fine (caller retries); anything else is an error.  waiting is
// cleared first, presumably because ereport(ERROR) does not return here.
if (errno != EINTR)
{
waiting = false;
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("epoll_wait() failed: %m")));
}
return 0;
}
else if (rc == 0)
{
// Timeout exceeded
return -1;
}
for (cur_epoll_event = set->epoll_ret_events;
cur_epoll_event < (set->epoll_ret_events + rc) &&
returned_events < nevents;
cur_epoll_event++)
{
// Each epoll event's data pointer refers to its associated WaitEvent
cur_event = (WaitEvent *) cur_epoll_event->data.ptr;
occurred_events->pos = cur_event->pos;
occurred_events->user_data = cur_event->user_data;
occurred_events->events = 0;
if (cur_event->events == WL_LATCH_SET &&
cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
{
// Latch wakeup: there is data in the self-pipe, drain it, then report
// WL_LATCH_SET only if the latch really is set.
drainSelfPipe();
if (set->latch->is_set)
{
occurred_events->fd = PGINVALID_SOCKET;
occurred_events->events = WL_LATCH_SET;
occurred_events++;
returned_events++;
}
}
else if (cur_event->events == WL_POSTMASTER_DEATH &&
cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
{
// Possible postmaster death: re-check before acting so a spurious wakeup
// is not reported, and exit at once if the set asks for that.
if (!PostmasterIsAliveInternal())
{
if (set->exit_on_postmaster_death)
proc_exit(1);
occurred_events->fd = PGINVALID_SOCKET;
occurred_events->events = WL_POSTMASTER_DEATH;
occurred_events++;
returned_events++;
}
}
else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
{
// Socket readable and/or writable
Assert(cur_event->fd != PGINVALID_SOCKET);
if ((cur_event->events & WL_SOCKET_READABLE) &&
(cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)))
{
// Data available on the socket, or EOF/error
occurred_events->events |= WL_SOCKET_READABLE;
}
if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
(cur_epoll_event->events & (EPOLLOUT | EPOLLERR | EPOLLHUP)))
{
// Writable, or EOF/error
occurred_events->events |= WL_SOCKET_WRITEABLE;
}
if (occurred_events->events != 0)
{
occurred_events->fd = cur_event->fd;
occurred_events++;
returned_events++;
}
}
}
return returned_events;
}
#elif defined(WAIT_USE_POLL)
/*
 * WaitEventSetWaitBlock (poll version) - block once in poll(2) and translate
 * the ready pollfds into WaitEvents.
 *
 * cur_timeout is passed to poll() in milliseconds (-1 = no limit).  The
 * pollfds array parallels set->events, so both are walked in lockstep; at
 * most nevents entries are written to occurred_events.
 *
 * Returns -1 on timeout; 0 if poll() was interrupted (EINTR) or no ready fd
 * was reportable (the caller retries); otherwise the number of events
 * stored.  Any other poll() failure raises ERROR.
 */
static inline int
WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
					  WaitEvent *occurred_events, int nevents)
{
	int			returned_events = 0;
	int			rc;
	WaitEvent  *cur_event;
	struct pollfd *cur_pollfd;

	/* Sleep */
	rc = poll(set->pollfds, set->nevents, cur_timeout);

	if (rc < 0)
	{
		/* EINTR is fine (caller retries); anything else is an error */
		if (errno != EINTR)
		{
			/* presumably cleared here because ereport(ERROR) does not return */
			waiting = false;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("poll() failed: %m")));
		}
		return 0;
	}
	else if (rc == 0)
	{
		/* timeout exceeded */
		return -1;
	}

	for (cur_event = set->events, cur_pollfd = set->pollfds;
		 cur_event < (set->events + set->nevents) &&
		 returned_events < nevents;
		 cur_event++, cur_pollfd++)
	{
		/* no activity on this fd */
		if (cur_pollfd->revents == 0)
			continue;

		occurred_events->pos = cur_event->pos;
		occurred_events->user_data = cur_event->user_data;
		occurred_events->events = 0;

		if (cur_event->events == WL_LATCH_SET &&
			(cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
		{
			/* self-pipe has data: drain it, report only if the latch is set */
			drainSelfPipe();

			if (set->latch->is_set)
			{
				occurred_events->fd = PGINVALID_SOCKET;
				occurred_events->events = WL_LATCH_SET;
				occurred_events++;
				returned_events++;
			}
		}
		else if (cur_event->events == WL_POSTMASTER_DEATH &&
				 (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
		{
			/* re-check so a spurious wakeup is not reported as death */
			if (!PostmasterIsAliveInternal())
			{
				if (set->exit_on_postmaster_death)
					proc_exit(1);
				occurred_events->fd = PGINVALID_SOCKET;
				occurred_events->events = WL_POSTMASTER_DEATH;
				occurred_events++;
				returned_events++;
			}
		}
		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
		{
			int			errflags = POLLHUP | POLLERR | POLLNVAL;

			/*
			 * A socket event must carry a real descriptor (same check as the
			 * epoll implementation).  A ">= PGINVALID_SOCKET" test would hold
			 * for every fd, including PGINVALID_SOCKET itself.
			 */
			Assert(cur_event->fd != PGINVALID_SOCKET);

			/* data available, or EOF/error */
			if ((cur_event->events & WL_SOCKET_READABLE) &&
				(cur_pollfd->revents & (POLLIN | errflags)))
			{
				occurred_events->events |= WL_SOCKET_READABLE;
			}

			/* writable, or EOF/error */
			if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
				(cur_pollfd->revents & (POLLOUT | errflags)))
			{
				occurred_events->events |= WL_SOCKET_WRITEABLE;
			}

			if (occurred_events->events != 0)
			{
				occurred_events->fd = cur_event->fd;
				occurred_events++;
				returned_events++;
			}
		}
	}

	return returned_events;
}
#elif defined(WAIT_USE_WIN32)
/*
 * WaitEventSetWaitBlock (Win32 version) - wait once with
 * WaitForMultipleObjects() and translate the signalled handle into a
 * WaitEvent.
 *
 * Before sleeping, socket events flagged with ->reset are re-armed, and
 * write-waits are probed with a zero-length WSASend(): if that succeeds (or
 * fails with anything but WSAEWOULDBLOCK) the socket is reported as
 * WL_SOCKET_WRITEABLE immediately, without waiting.
 *
 * handles[0] is tied to signal delivery: when it fires, queued signals are
 * dispatched and 0 is returned so the caller retries.  Event i corresponds
 * to handles[i + 1].
 *
 * Returns -1 on timeout, 0 if nothing reportable happened, otherwise the
 * number of events stored (at most one per call).  Failures of
 * WaitForMultipleObjects, ResetEvent or WSAEnumNetworkEvents raise ERROR.
 */
static inline int
WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
					  WaitEvent *occurred_events, int nevents)
{
	int			returned_events = 0;
	DWORD		rc;
	WaitEvent  *cur_event;

	for (cur_event = set->events;
		 cur_event < (set->events + set->nevents);
		 cur_event++)
	{
		/* Re-arm socket events that reported readability last time */
		if (cur_event->reset)
		{
			WaitEventAdjustWin32(set, cur_event);
			cur_event->reset = false;
		}

		/*
		 * Probe writability with a zero-length send.  NOTE(review): this is
		 * presumably needed because Windows only posts FD_WRITE after a send
		 * has failed with WSAEWOULDBLOCK.  Success, or any other error, is
		 * reported as write-ready so the caller proceeds or sees the error.
		 */
		if (cur_event->events & WL_SOCKET_WRITEABLE)
		{
			char		c;
			WSABUF		buf;
			DWORD		sent;
			int			r;

			buf.buf = &c;
			buf.len = 0;

			r = WSASend(cur_event->fd, &buf, 1, &sent, 0, NULL, NULL);
			if (r == 0 || WSAGetLastError() != WSAEWOULDBLOCK)
			{
				occurred_events->pos = cur_event->pos;
				occurred_events->user_data = cur_event->user_data;
				occurred_events->events = WL_SOCKET_WRITEABLE;
				occurred_events->fd = cur_event->fd;
				return 1;
			}
		}
	}

	/* Sleep on nevents + 1 handles: handles[0] is the signal handle */
	rc = WaitForMultipleObjects(set->nevents + 1, set->handles, FALSE,
								cur_timeout);

	if (rc == WAIT_FAILED)
		elog(ERROR, "WaitForMultipleObjects() failed: error code %lu",
			 GetLastError());
	else if (rc == WAIT_TIMEOUT)
	{
		/* timeout exceeded */
		return -1;
	}

	if (rc == WAIT_OBJECT_0)
	{
		/* Signal handle: service queued signals, then let the caller retry */
		pgwin32_dispatch_queued_signals();
		return 0;
	}

	/* Handle index is offset by one because handles[0] is the signal handle */
	cur_event = (WaitEvent *) &set->events[rc - WAIT_OBJECT_0 - 1];

	occurred_events->pos = cur_event->pos;
	occurred_events->user_data = cur_event->user_data;
	occurred_events->events = 0;

	if (cur_event->events == WL_LATCH_SET)
	{
		if (!ResetEvent(set->latch->event))
			elog(ERROR, "ResetEvent failed: error code %lu", GetLastError());

		/* report only if the latch really is set */
		if (set->latch->is_set)
		{
			occurred_events->fd = PGINVALID_SOCKET;
			occurred_events->events = WL_LATCH_SET;
			occurred_events++;
			returned_events++;
		}
	}
	else if (cur_event->events == WL_POSTMASTER_DEATH)
	{
		/* re-check so a spurious wakeup is not reported as death */
		if (!PostmasterIsAliveInternal())
		{
			if (set->exit_on_postmaster_death)
				proc_exit(1);
			occurred_events->fd = PGINVALID_SOCKET;
			occurred_events->events = WL_POSTMASTER_DEATH;
			occurred_events++;
			returned_events++;
		}
	}
	else if (cur_event->events & WL_SOCKET_MASK)
	{
		WSANETWORKEVENTS resEvents;
		HANDLE		handle = set->handles[cur_event->pos + 1];

		/*
		 * A socket event must carry a real descriptor (same check as the
		 * epoll implementation); a bare non-zero test would accept
		 * PGINVALID_SOCKET.
		 */
		Assert(cur_event->fd != PGINVALID_SOCKET);

		occurred_events->fd = cur_event->fd;

		ZeroMemory(&resEvents, sizeof(resEvents));
		if (WSAEnumNetworkEvents(cur_event->fd, handle, &resEvents) != 0)
			elog(ERROR, "failed to enumerate network events: error code %u",
				 WSAGetLastError());

		if ((cur_event->events & WL_SOCKET_READABLE) &&
			(resEvents.lNetworkEvents & FD_READ))
		{
			occurred_events->events |= WL_SOCKET_READABLE;
			/* re-arm this event before the next wait */
			cur_event->reset = true;
		}
		if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
			(resEvents.lNetworkEvents & FD_WRITE))
		{
			occurred_events->events |= WL_SOCKET_WRITEABLE;
		}
		if ((cur_event->events & WL_SOCKET_CONNECTED) &&
			(resEvents.lNetworkEvents & FD_CONNECT))
		{
			occurred_events->events |= WL_SOCKET_CONNECTED;
		}
		if (resEvents.lNetworkEvents & FD_CLOSE)
		{
			/* EOF/error: report every socket condition the caller asked for */
			occurred_events->events |= (cur_event->events & WL_SOCKET_MASK);
		}

		if (occurred_events->events != 0)
		{
			occurred_events++;
			returned_events++;
		}
	}

	return returned_events;
}
#endif
三、跟踪分析
在主节点上用gdb跟踪postmaster(设置follow-fork-mode child),在WalSndLoop上设置断点后启动standby节点,进入断点
[xdb@localhost ~]$ ps -ef|grep postgres
xdb 1376 1 1 14:16 pts/0 00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1376
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b WalSndLoop
Breakpoint 1 at 0x853e63: file walsender.c, line 2111.
(gdb) c
Continuing.
[New process 1450]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f17cfa9a8c0 (LWP 1450)]
Breakpoint 1, WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2111
2111 last_reply_timestamp = GetCurrentTimestamp();
(gdb)
获取时间戳,设置相关标记
(gdb) n
2112 waiting_for_ping_response = false;
(gdb) p last_reply_timestamp
$1 = 606818445090174
(gdb)
重置MyLatch
(gdb) n
2124 if (!PostmasterIsAlive())
(gdb)
2128 ResetLatch(MyLatch);
(gdb) p MyLatch
$2 = (struct Latch *) 0x7f17c46994d4
(gdb) p *MyLatch
$3 = {is_set = 1, is_shared = true, owner_pid = 1465}
(gdb) n
2130 CHECK_FOR_INTERRUPTS();
(gdb) p *MyLatch
$4 = {is_set = 0, is_shared = true, owner_pid = 1465}
(gdb)
处理最近接收到的信号
(gdb) n
2133 if (ConfigReloadPending)
(gdb)
2141 ProcessRepliesIfAny();
(gdb)
[Inferior 2 (process 1465) exited normally]
(gdb)
进程退出,新产生了进程1466
xdb 1466 1376 0 16:41 ? 00:00:00 postgres: walsender replicator 192.168.26.26(40516) streaming 0/5D032830
跟踪1466进程
(gdb) attach 1466
Attaching to program: /appdb/xdb/pg11.2/bin/postgres, process 1466
Reading symbols from /lib64/libpthread.so.0...(no debugging symbols found)...done.
[Thread debugging using libthread_db enabled]
...
执行SQL
testdb=# drop table t1;
DROP TABLE
接收到信号SIGUSR1,查看调用栈如下
Program received signal SIGUSR1, User defined signal 1.
0x00007f17cde2d903 in __epoll_wait_nocancel () from /lib64/libc.so.6
(gdb) bt
#0 0x00007f17cde2d903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000088e668 in WaitEventSetWaitBlock (set=0x296e7c8, cur_timeout=29999, occurred_events=0x7fffed781d00,
nevents=1) at latch.c:1048
DONE!
四、参考资料
PG Source Code
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
PostgreSQL 源码解读(155)- 后台进程#7(walsender#3)
下载Word文档到电脑,方便收藏和打印~