1 /*-------------------------------------------------------------------------
5 * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 * is the process in the standby server that takes charge of receiving
7 * XLOG records from a primary server during streaming replication.
9 * When the startup process determines that it's time to start streaming,
10 * it instructs postmaster to start walreceiver. Walreceiver first connects
11 * to the primary server (it will be served by a walsender process
12 * in the primary server), and then keeps receiving XLOG records and
13 * writing them to the disk as long as the connection is alive. As XLOG
14 * records are received and flushed to disk, it updates the
15 * WalRcv->receivedUpto variable in shared memory, to inform the startup
16 * process of how far it can proceed with XLOG replay.
18 * Normal termination is by SIGTERM, which instructs the walreceiver to
19 * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
20 * process, the walreceiver will simply abort and exit on SIGQUIT. A close
21 * of the connection and a FATAL error are treated not as a crash but as
24 * This file contains the server-facing parts of walreceiver. The libpq-
25 * specific parts are in the libpqwalreceiver module. It's loaded
26 * dynamically to avoid linking the server with libpq.
28 * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
32 * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.17 2010/09/15 10:35:05 heikki Exp $
34 *-------------------------------------------------------------------------
41 #include "access/xlog_internal.h"
42 #include "libpq/pqsignal.h"
43 #include "miscadmin.h"
44 #include "replication/walprotocol.h"
45 #include "replication/walreceiver.h"
46 #include "storage/ipc.h"
47 #include "storage/pmsignal.h"
48 #include "utils/builtins.h"
49 #include "utils/guc.h"
50 #include "utils/memutils.h"
51 #include "utils/ps_status.h"
52 #include "utils/resowner.h"
54 /* Global variable to indicate if this process is a walreceiver process */
57 /* libpqreceiver hooks to these when loaded */
58 walrcv_connect_type walrcv_connect = NULL;
59 walrcv_receive_type walrcv_receive = NULL;
60 walrcv_disconnect_type walrcv_disconnect = NULL;
62 #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
65 * These variables are used similarly to openLogFile/Id/Seg/Off,
66 * but for walreceiver to write the XLOG.
68 static int recvFile = -1;
69 static uint32 recvId = 0;
70 static uint32 recvSeg = 0;
71 static uint32 recvOff = 0;
74 * Flags set by interrupt handlers of walreceiver for later service in the
77 static volatile sig_atomic_t got_SIGHUP = false;
78 static volatile sig_atomic_t got_SIGTERM = false;
81 * LogstreamResult indicates the byte positions that we have already
86 XLogRecPtr Write; /* last byte + 1 written out in the standby */
87 XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
91 * About SIGTERM handling:
93 * We can't just exit(1) within SIGTERM signal handler, because the signal
94 * might arrive in the middle of some critical operation, like while we're
95 * holding a spinlock. We also can't just set a flag in signal handler and
96 * check it in the main loop, because we perform some blocking operations
97 * like libpqrcv_PQexec(), which can take a long time to finish.
99 * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
100 * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
101 * sets got_SIGTERM flag, which is checked in the main loop when convenient.
103 * This is very much like what regular backends do with ImmediateInterruptOK,
104 * ProcessInterrupts() etc.
106 static volatile bool WalRcvImmediateInterruptOK = false;
108 /* Prototypes for private functions */
109 static void ProcessWalRcvInterrupts(void);
110 static void EnableWalRcvImmediateExit(void);
111 static void DisableWalRcvImmediateExit(void);
112 static void WalRcvDie(int code, Datum arg);
113 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
114 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
115 static void XLogWalRcvFlush(void);
117 /* Signal handlers */
118 static void WalRcvSigHupHandler(SIGNAL_ARGS);
119 static void WalRcvShutdownHandler(SIGNAL_ARGS);
120 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
124 ProcessWalRcvInterrupts(void)
127 * Although walreceiver interrupt handling doesn't use the same scheme as
128 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
129 * any incoming signals on Win32.
131 CHECK_FOR_INTERRUPTS();
135 WalRcvImmediateInterruptOK = false;
137 (errcode(ERRCODE_ADMIN_SHUTDOWN),
138 errmsg("terminating walreceiver process due to administrator command")));
143 EnableWalRcvImmediateExit(void)
145 WalRcvImmediateInterruptOK = true;
146 ProcessWalRcvInterrupts();
150 DisableWalRcvImmediateExit(void)
152 WalRcvImmediateInterruptOK = false;
153 ProcessWalRcvInterrupts();
156 /* Main entry point for walreceiver process */
158 WalReceiverMain(void)
160 char conninfo[MAXCONNINFO];
161 XLogRecPtr startpoint;
163 /* use volatile pointer to prevent code rearrangement */
164 volatile WalRcvData *walrcv = WalRcv;
166 am_walreceiver = true;
169 * WalRcv should be set up already (if we are a backend, we inherit this
170 * by fork() or EXEC_BACKEND mechanism from the postmaster).
172 Assert(walrcv != NULL);
175 * Mark walreceiver as running in shared memory.
177 * Do this as early as possible, so that if we fail later on, we'll set
178 * state to STOPPED. If we die before this, the startup process will keep
179 * waiting for us to start up, until it times out.
181 SpinLockAcquire(&walrcv->mutex);
182 Assert(walrcv->pid == 0);
183 switch (walrcv->walRcvState)
185 case WALRCV_STOPPING:
186 /* If we've already been requested to stop, don't start up. */
187 walrcv->walRcvState = WALRCV_STOPPED;
191 SpinLockRelease(&walrcv->mutex);
195 case WALRCV_STARTING:
200 /* Shouldn't happen */
201 elog(PANIC, "walreceiver still running according to shared memory state");
203 /* Advertise our PID so that the startup process can kill us */
204 walrcv->pid = MyProcPid;
205 walrcv->walRcvState = WALRCV_RUNNING;
207 /* Fetch information required to start streaming */
208 strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
209 startpoint = walrcv->receivedUpto;
210 SpinLockRelease(&walrcv->mutex);
212 /* Arrange to clean up at walreceiver exit */
213 on_shmem_exit(WalRcvDie, 0);
216 * If possible, make this process a group leader, so that the postmaster
217 * can signal any child processes too. (walreceiver probably never has
218 * any child processes, but for consistency we make all postmaster child
219 * processes do this.)
223 elog(FATAL, "setsid() failed: %m");
226 /* Properly accept or ignore signals the postmaster might send us */
227 pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config
229 pqsignal(SIGINT, SIG_IGN);
230 pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
231 pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
232 pqsignal(SIGALRM, SIG_IGN);
233 pqsignal(SIGPIPE, SIG_IGN);
234 pqsignal(SIGUSR1, SIG_IGN);
235 pqsignal(SIGUSR2, SIG_IGN);
237 /* Reset some signals that are accepted by postmaster but not here */
238 pqsignal(SIGCHLD, SIG_DFL);
239 pqsignal(SIGTTIN, SIG_DFL);
240 pqsignal(SIGTTOU, SIG_DFL);
241 pqsignal(SIGCONT, SIG_DFL);
242 pqsignal(SIGWINCH, SIG_DFL);
244 /* We allow SIGQUIT (quickdie) at all times */
245 sigdelset(&BlockSig, SIGQUIT);
247 /* Load the libpq-specific functions */
248 load_file("libpqwalreceiver", false);
249 if (walrcv_connect == NULL || walrcv_receive == NULL ||
250 walrcv_disconnect == NULL)
251 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
254 * Create a resource owner to keep track of our resources (not clear that
255 * we need this, but may as well have one).
257 CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
259 /* Unblock signals (they were blocked when the postmaster forked us) */
260 PG_SETMASK(&UnBlockSig);
262 /* Establish the connection to the primary for XLOG streaming */
263 EnableWalRcvImmediateExit();
264 walrcv_connect(conninfo, startpoint);
265 DisableWalRcvImmediateExit();
267 /* Loop until end-of-streaming or error */
275 * Emergency bailout if postmaster has died. This is to avoid the
276 * necessity for manual cleanup of all postmaster children.
278 if (!PostmasterIsAlive(true))
282 * Exit walreceiver if we're not in recovery. This should not happen,
283 * but cross-check the status here.
285 if (!RecoveryInProgress())
287 (errmsg("cannot continue WAL streaming, recovery has already ended")));
289 /* Process any requests or signals received recently */
290 ProcessWalRcvInterrupts();
295 ProcessConfigFile(PGC_SIGHUP);
298 /* Wait a while for data to arrive */
299 if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
301 /* Accept the received data, and process it */
302 XLogWalRcvProcessMsg(type, buf, len);
304 /* Receive any more data we can without sleeping */
305 while (walrcv_receive(0, &type, &buf, &len))
306 XLogWalRcvProcessMsg(type, buf, len);
309 * If we've written some records, flush them to disk and let the
310 * startup process know about them.
318 * Mark us as STOPPED in shared memory at exit.
321 WalRcvDie(int code, Datum arg)
323 /* use volatile pointer to prevent code rearrangement */
324 volatile WalRcvData *walrcv = WalRcv;
326 SpinLockAcquire(&walrcv->mutex);
327 Assert(walrcv->walRcvState == WALRCV_RUNNING ||
328 walrcv->walRcvState == WALRCV_STOPPING);
329 walrcv->walRcvState = WALRCV_STOPPED;
331 SpinLockRelease(&walrcv->mutex);
333 /* Terminate the connection gracefully. */
334 if (walrcv_disconnect != NULL)
338 /* SIGHUP: set flag to re-read config file at next convenient time */
340 WalRcvSigHupHandler(SIGNAL_ARGS)
345 /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
347 WalRcvShutdownHandler(SIGNAL_ARGS)
351 /* Don't joggle the elbow of proc_exit */
352 if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
353 ProcessWalRcvInterrupts();
357 * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
359 * Some backend has bought the farm, so we need to stop what we're doing and
363 WalRcvQuickDieHandler(SIGNAL_ARGS)
365 PG_SETMASK(&BlockSig);
368 * We DO NOT want to run proc_exit() callbacks -- we're here because
369 * shared memory may be corrupted, so we don't want to try to clean up our
370 * transaction. Just nail the windows shut and get out of town. Now that
371 * there's an atexit callback to prevent third-party code from breaking
372 * things by calling exit() directly, we have to reset the callbacks
373 * explicitly to make this work as intended.
378 * Note we do exit(2) not exit(0). This is to force the postmaster into a
379 * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
380 * backend. This is necessary precisely because we don't clean up our
381 * shared memory state. (The "dead man switch" mechanism in pmsignal.c
382 * should ensure the postmaster sees this as a crash, too, but no harm in
383 * being doubly sure.)
389 * Accept the message from XLOG stream, and process it.
392 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
396 case 'w': /* WAL records */
398 WalDataMessageHeader msghdr;
400 if (len < sizeof(WalDataMessageHeader))
402 (errcode(ERRCODE_PROTOCOL_VIOLATION),
403 errmsg_internal("invalid WAL message received from primary")));
404 /* memcpy is required here for alignment reasons */
405 memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
406 buf += sizeof(WalDataMessageHeader);
407 len -= sizeof(WalDataMessageHeader);
409 XLogWalRcvWrite(buf, len, msghdr.dataStart);
414 (errcode(ERRCODE_PROTOCOL_VIOLATION),
415 errmsg_internal("invalid replication message type %d",
421 * Write XLOG data to disk.
424 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
433 if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg))
438 * fsync() and close current file before we switch to next one. We
439 * would otherwise have to reopen this file to fsync it later
446 * XLOG segment files will be re-read by recovery in startup
447 * process soon, so we don't advise the OS to release cache
448 * pages associated with the file like XLogFileClose() does.
450 if (close(recvFile) != 0)
452 (errcode_for_file_access(),
453 errmsg("could not close log file %u, segment %u: %m",
458 /* Create/use new log file */
459 XLByteToSeg(recptr, recvId, recvSeg);
461 recvFile = XLogFileInit(recvId, recvSeg, &use_existent, true);
465 /* Calculate the start offset of the received logs */
466 startoff = recptr.xrecoff % XLogSegSize;
468 if (startoff + nbytes > XLogSegSize)
469 segbytes = XLogSegSize - startoff;
473 /* Need to seek in the file? */
474 if (recvOff != startoff)
476 if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
478 (errcode_for_file_access(),
479 errmsg("could not seek in log file %u, "
480 "segment %u to offset %u: %m",
481 recvId, recvSeg, startoff)));
485 /* OK to write the logs */
488 byteswritten = write(recvFile, buf, segbytes);
489 if (byteswritten <= 0)
491 /* if write didn't set errno, assume no disk space */
495 (errcode_for_file_access(),
496 errmsg("could not write to log file %u, segment %u "
497 "at offset %u, length %lu: %m",
499 recvOff, (unsigned long) segbytes)));
502 /* Update state for write */
503 XLByteAdvance(recptr, byteswritten);
505 recvOff += byteswritten;
506 nbytes -= byteswritten;
509 LogstreamResult.Write = recptr;
513 /* Flush the log to disk */
515 XLogWalRcvFlush(void)
517 if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write))
519 /* use volatile pointer to prevent code rearrangement */
520 volatile WalRcvData *walrcv = WalRcv;
522 issue_xlog_fsync(recvFile, recvId, recvSeg);
524 LogstreamResult.Flush = LogstreamResult.Write;
526 /* Update shared-memory status */
527 SpinLockAcquire(&walrcv->mutex);
528 walrcv->latestChunkStart = walrcv->receivedUpto;
529 walrcv->receivedUpto = LogstreamResult.Flush;
530 SpinLockRelease(&walrcv->mutex);
532 /* Signal the startup process that new WAL has arrived */
535 /* Report XLOG streaming progress in PS display */
536 if (update_process_title)
538 char activitymsg[50];
540 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
541 LogstreamResult.Write.xlogid,
542 LogstreamResult.Write.xrecoff);
543 set_ps_display(activitymsg, false);