OSDN Git Service

Use a latch to make startup process wake up and replay immediately when
[pg-rex/syncrep.git] / src / backend / replication / walreceiver.c
1 /*-------------------------------------------------------------------------
2  *
3  * walreceiver.c
4  *
5  * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6  * is the process in the standby server that takes charge of receiving
7  * XLOG records from a primary server during streaming replication.
8  *
9  * When the startup process determines that it's time to start streaming,
10  * it instructs postmaster to start walreceiver. Walreceiver first connects
11  * to the primary server (it will be served by a walsender process
12  * in the primary server), and then keeps receiving XLOG records and
13  * writing them to the disk as long as the connection is alive. As XLOG
 * records are received and flushed to disk, it updates the
 * WalRcv->receivedUpto variable in shared memory, to inform the startup
 * process of how far it can proceed with XLOG replay, and signals the
 * startup process that new WAL has arrived.
17  *
18  * Normal termination is by SIGTERM, which instructs the walreceiver to
19  * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
20  * process, the walreceiver will simply abort and exit on SIGQUIT. A close
21  * of the connection and a FATAL error are treated not as a crash but as
22  * normal operation.
23  *
24  * This file contains the server-facing parts of walreceiver. The libpq-
25  * specific parts are in the libpqwalreceiver module. It's loaded
26  * dynamically to avoid linking the server with libpq.
27  *
28  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
29  *
30  *
31  * IDENTIFICATION
32  *        $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.17 2010/09/15 10:35:05 heikki Exp $
33  *
34  *-------------------------------------------------------------------------
35  */
36 #include "postgres.h"
37
38 #include <signal.h>
39 #include <unistd.h>
40
41 #include "access/xlog_internal.h"
42 #include "libpq/pqsignal.h"
43 #include "miscadmin.h"
44 #include "replication/walprotocol.h"
45 #include "replication/walreceiver.h"
46 #include "storage/ipc.h"
47 #include "storage/pmsignal.h"
48 #include "utils/builtins.h"
49 #include "utils/guc.h"
50 #include "utils/memutils.h"
51 #include "utils/ps_status.h"
52 #include "utils/resowner.h"
53
/* Global variable to indicate if this process is a walreceiver process */
bool            am_walreceiver;

/*
 * Entry points into the libpqwalreceiver module.  They stay NULL until
 * WalReceiverMain() loads that module with load_file(), which installs its
 * implementations here; WalReceiverMain() raises an error if any of them is
 * still NULL afterwards.  WalRcvDie() checks walrcv_disconnect for NULL
 * because it can run before the module has been loaded.
 */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;

/*
 * Timeout, in milliseconds, passed to walrcv_receive() in each main-loop
 * iteration; bounds how long we wait before re-checking for signals and
 * postmaster death.
 */
#define NAPTIME_PER_CYCLE 100   /* max sleep time between cycles (100ms) */
63
/*
 * These variables are used similarly to openLogFile/Id/Seg/Off,
 * but for walreceiver to write the XLOG.
 *
 * recvFile:        descriptor of the WAL segment file currently open for
 *                  writing, or -1 when none is open.
 * recvId/recvSeg:  log id and segment number of that file.
 * recvOff:         byte offset within the segment at which the next write()
 *                  will land; used to skip a redundant lseek().
 */
static int      recvFile = -1;
static uint32 recvId = 0;
static uint32 recvSeg = 0;
static uint32 recvOff = 0;

/*
 * Flags set by interrupt handlers of walreceiver for later service in the
 * main loop.  got_SIGHUP requests a config-file reload; got_SIGTERM requests
 * shutdown and is acted on by ProcessWalRcvInterrupts().
 */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

/*
 * LogstreamResult indicates the byte positions that we have already
 * written/fsynced.  Write is advanced by XLogWalRcvWrite(); Flush catches up
 * to Write in XLogWalRcvFlush(), which then publishes it to shared memory.
 */
static struct
{
        XLogRecPtr      Write;                  /* last byte + 1 written out in the standby */
        XLogRecPtr      Flush;                  /* last byte + 1 flushed in the standby */
}       LogstreamResult;
89
/*
 * About SIGTERM handling:
 *
 * We can't just exit(1) within SIGTERM signal handler, because the signal
 * might arrive in the middle of some critical operation, like while we're
 * holding a spinlock. We also can't just set a flag in signal handler and
 * check it in the main loop, because we perform some blocking operations
 * like libpqrcv_PQexec(), which can take a long time to finish.
 *
 * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
 * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
 * sets got_SIGTERM flag, which is checked in the main loop when convenient.
 *
 * This is very much like what regular backends do with ImmediateInterruptOK,
 * ProcessInterrupts() etc.
 *
 * The flag is toggled only through EnableWalRcvImmediateExit() and
 * DisableWalRcvImmediateExit(); WalReceiverMain() brackets the
 * walrcv_connect() call with them.
 */
static volatile bool WalRcvImmediateInterruptOK = false;

/* Prototypes for private functions */
static void ProcessWalRcvInterrupts(void);
static void EnableWalRcvImmediateExit(void);
static void DisableWalRcvImmediateExit(void);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);

/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
121
122
123 static void
124 ProcessWalRcvInterrupts(void)
125 {
126         /*
127          * Although walreceiver interrupt handling doesn't use the same scheme as
128          * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
129          * any incoming signals on Win32.
130          */
131         CHECK_FOR_INTERRUPTS();
132
133         if (got_SIGTERM)
134         {
135                 WalRcvImmediateInterruptOK = false;
136                 ereport(FATAL,
137                                 (errcode(ERRCODE_ADMIN_SHUTDOWN),
138                                  errmsg("terminating walreceiver process due to administrator command")));
139         }
140 }
141
142 static void
143 EnableWalRcvImmediateExit(void)
144 {
145         WalRcvImmediateInterruptOK = true;
146         ProcessWalRcvInterrupts();
147 }
148
149 static void
150 DisableWalRcvImmediateExit(void)
151 {
152         WalRcvImmediateInterruptOK = false;
153         ProcessWalRcvInterrupts();
154 }
155
/*
 * Main entry point for walreceiver process.
 *
 * Marks the walreceiver as RUNNING in shared memory (or exits at once if a
 * stop was already requested), installs signal handlers, loads the
 * libpqwalreceiver module, connects to the primary using the conninfo and
 * start position found in shared memory, and then loops receiving, writing
 * and flushing WAL.  The loop has no normal exit: the process ends through
 * an error, a SIGTERM, the end of recovery, or postmaster death.
 */
void
WalReceiverMain(void)
{
        char            conninfo[MAXCONNINFO];
        XLogRecPtr      startpoint;

        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;

        am_walreceiver = true;

        /*
         * WalRcv should be set up already (if we are a backend, we inherit this
         * by fork() or EXEC_BACKEND mechanism from the postmaster).
         */
        Assert(walrcv != NULL);

        /*
         * Mark walreceiver as running in shared memory.
         *
         * Do this as early as possible, so that if we fail later on, we'll set
         * state to STOPPED. If we die before this, the startup process will keep
         * waiting for us to start up, until it times out.
         */
        SpinLockAcquire(&walrcv->mutex);
        Assert(walrcv->pid == 0);
        switch (walrcv->walRcvState)
        {
                case WALRCV_STOPPING:
                        /* If we've already been requested to stop, don't start up. */
                        walrcv->walRcvState = WALRCV_STOPPED;
                        /* fall through */

                case WALRCV_STOPPED:
                        /*
                         * Release the spinlock before exiting.  WalRcvDie() has not
                         * been registered yet at this point, so it does not run on
                         * this exit path.
                         */
                        SpinLockRelease(&walrcv->mutex);
                        proc_exit(1);
                        break;

                case WALRCV_STARTING:
                        /* The usual case */
                        break;

                case WALRCV_RUNNING:
                        /* Shouldn't happen */
                        elog(PANIC, "walreceiver still running according to shared memory state");
        }
        /* Advertise our PID so that the startup process can kill us */
        walrcv->pid = MyProcPid;
        walrcv->walRcvState = WALRCV_RUNNING;

        /* Fetch information required to start streaming */
        strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
        startpoint = walrcv->receivedUpto;
        SpinLockRelease(&walrcv->mutex);

        /*
         * Arrange to clean up at walreceiver exit.  From here on, every exit
         * path runs WalRcvDie(), which marks us STOPPED in shared memory.
         */
        on_shmem_exit(WalRcvDie, 0);

        /*
         * If possible, make this process a group leader, so that the postmaster
         * can signal any child processes too.  (walreceiver probably never has
         * any child processes, but for consistency we make all postmaster child
         * processes do this.)
         */
#ifdef HAVE_SETSID
        if (setsid() < 0)
                elog(FATAL, "setsid() failed: %m");
#endif

        /* Properly accept or ignore signals the postmaster might send us */
        pqsignal(SIGHUP, WalRcvSigHupHandler);          /* set flag to read config file */
        pqsignal(SIGINT, SIG_IGN);
        pqsignal(SIGTERM, WalRcvShutdownHandler);       /* request shutdown */
        pqsignal(SIGQUIT, WalRcvQuickDieHandler);       /* hard crash time */
        pqsignal(SIGALRM, SIG_IGN);
        pqsignal(SIGPIPE, SIG_IGN);
        pqsignal(SIGUSR1, SIG_IGN);
        pqsignal(SIGUSR2, SIG_IGN);

        /* Reset some signals that are accepted by postmaster but not here */
        pqsignal(SIGCHLD, SIG_DFL);
        pqsignal(SIGTTIN, SIG_DFL);
        pqsignal(SIGTTOU, SIG_DFL);
        pqsignal(SIGCONT, SIG_DFL);
        pqsignal(SIGWINCH, SIG_DFL);

        /* We allow SIGQUIT (quickdie) at all times */
        sigdelset(&BlockSig, SIGQUIT);

        /* Load the libpq-specific functions; the module fills in the walrcv_* hooks */
        load_file("libpqwalreceiver", false);
        if (walrcv_connect == NULL || walrcv_receive == NULL ||
                walrcv_disconnect == NULL)
                elog(ERROR, "libpqwalreceiver didn't initialize correctly");

        /*
         * Create a resource owner to keep track of our resources (not clear that
         * we need this, but may as well have one).
         */
        CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");

        /* Unblock signals (they were blocked when the postmaster forked us) */
        PG_SETMASK(&UnBlockSig);

        /*
         * Establish the connection to the primary for XLOG streaming.  The
         * connection attempt may block, so SIGTERM is allowed to terminate us
         * immediately for its duration.
         */
        EnableWalRcvImmediateExit();
        walrcv_connect(conninfo, startpoint);
        DisableWalRcvImmediateExit();

        /* Loop until end-of-streaming or error */
        for (;;)
        {
                unsigned char type;
                char       *buf;
                int                     len;

                /*
                 * Emergency bailout if postmaster has died.  This is to avoid the
                 * necessity for manual cleanup of all postmaster children.
                 */
                if (!PostmasterIsAlive(true))
                        exit(1);

                /*
                 * Exit walreceiver if we're not in recovery. This should not happen,
                 * but cross-check the status here.
                 */
                if (!RecoveryInProgress())
                        ereport(FATAL,
                                        (errmsg("cannot continue WAL streaming, recovery has already ended")));

                /* Process any requests or signals received recently */
                ProcessWalRcvInterrupts();

                if (got_SIGHUP)
                {
                        got_SIGHUP = false;
                        ProcessConfigFile(PGC_SIGHUP);
                }

                /* Wait a while (up to NAPTIME_PER_CYCLE ms) for data to arrive */
                if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
                {
                        /* Accept the received data, and process it */
                        XLogWalRcvProcessMsg(type, buf, len);

                        /* Receive any more data we can without sleeping */
                        while (walrcv_receive(0, &type, &buf, &len))
                                XLogWalRcvProcessMsg(type, buf, len);

                        /*
                         * If we've written some records, flush them to disk and let the
                         * startup process know about them.  Flushing once per batch
                         * rather than once per message limits the number of fsyncs.
                         */
                        XLogWalRcvFlush();
                }
        }
}
316
317 /*
318  * Mark us as STOPPED in shared memory at exit.
319  */
320 static void
321 WalRcvDie(int code, Datum arg)
322 {
323         /* use volatile pointer to prevent code rearrangement */
324         volatile WalRcvData *walrcv = WalRcv;
325
326         SpinLockAcquire(&walrcv->mutex);
327         Assert(walrcv->walRcvState == WALRCV_RUNNING ||
328                    walrcv->walRcvState == WALRCV_STOPPING);
329         walrcv->walRcvState = WALRCV_STOPPED;
330         walrcv->pid = 0;
331         SpinLockRelease(&walrcv->mutex);
332
333         /* Terminate the connection gracefully. */
334         if (walrcv_disconnect != NULL)
335                 walrcv_disconnect();
336 }
337
338 /* SIGHUP: set flag to re-read config file at next convenient time */
339 static void
340 WalRcvSigHupHandler(SIGNAL_ARGS)
341 {
342         got_SIGHUP = true;
343 }
344
345 /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
346 static void
347 WalRcvShutdownHandler(SIGNAL_ARGS)
348 {
349         got_SIGTERM = true;
350
351         /* Don't joggle the elbow of proc_exit */
352         if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
353                 ProcessWalRcvInterrupts();
354 }
355
356 /*
357  * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
358  *
359  * Some backend has bought the farm, so we need to stop what we're doing and
360  * exit.
361  */
362 static void
363 WalRcvQuickDieHandler(SIGNAL_ARGS)
364 {
365         PG_SETMASK(&BlockSig);
366
367         /*
368          * We DO NOT want to run proc_exit() callbacks -- we're here because
369          * shared memory may be corrupted, so we don't want to try to clean up our
370          * transaction.  Just nail the windows shut and get out of town.  Now that
371          * there's an atexit callback to prevent third-party code from breaking
372          * things by calling exit() directly, we have to reset the callbacks
373          * explicitly to make this work as intended.
374          */
375         on_exit_reset();
376
377         /*
378          * Note we do exit(2) not exit(0).      This is to force the postmaster into a
379          * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
380          * backend.  This is necessary precisely because we don't clean up our
381          * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
382          * should ensure the postmaster sees this as a crash, too, but no harm in
383          * being doubly sure.)
384          */
385         exit(2);
386 }
387
388 /*
389  * Accept the message from XLOG stream, and process it.
390  */
391 static void
392 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
393 {
394         switch (type)
395         {
396                 case 'w':                               /* WAL records */
397                         {
398                                 WalDataMessageHeader msghdr;
399
400                                 if (len < sizeof(WalDataMessageHeader))
401                                         ereport(ERROR,
402                                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
403                                                          errmsg_internal("invalid WAL message received from primary")));
404                                 /* memcpy is required here for alignment reasons */
405                                 memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
406                                 buf += sizeof(WalDataMessageHeader);
407                                 len -= sizeof(WalDataMessageHeader);
408
409                                 XLogWalRcvWrite(buf, len, msghdr.dataStart);
410                                 break;
411                         }
412                 default:
413                         ereport(ERROR,
414                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
415                                          errmsg_internal("invalid replication message type %d",
416                                                                          type)));
417         }
418 }
419
420 /*
421  * Write XLOG data to disk.
422  */
423 static void
424 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
425 {
426         int                     startoff;
427         int                     byteswritten;
428
429         while (nbytes > 0)
430         {
431                 int                     segbytes;
432
433                 if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg))
434                 {
435                         bool            use_existent;
436
437                         /*
438                          * fsync() and close current file before we switch to next one. We
439                          * would otherwise have to reopen this file to fsync it later
440                          */
441                         if (recvFile >= 0)
442                         {
443                                 XLogWalRcvFlush();
444
445                                 /*
446                                  * XLOG segment files will be re-read by recovery in startup
447                                  * process soon, so we don't advise the OS to release cache
448                                  * pages associated with the file like XLogFileClose() does.
449                                  */
450                                 if (close(recvFile) != 0)
451                                         ereport(PANIC,
452                                                         (errcode_for_file_access(),
453                                                 errmsg("could not close log file %u, segment %u: %m",
454                                                            recvId, recvSeg)));
455                         }
456                         recvFile = -1;
457
458                         /* Create/use new log file */
459                         XLByteToSeg(recptr, recvId, recvSeg);
460                         use_existent = true;
461                         recvFile = XLogFileInit(recvId, recvSeg, &use_existent, true);
462                         recvOff = 0;
463                 }
464
465                 /* Calculate the start offset of the received logs */
466                 startoff = recptr.xrecoff % XLogSegSize;
467
468                 if (startoff + nbytes > XLogSegSize)
469                         segbytes = XLogSegSize - startoff;
470                 else
471                         segbytes = nbytes;
472
473                 /* Need to seek in the file? */
474                 if (recvOff != startoff)
475                 {
476                         if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
477                                 ereport(PANIC,
478                                                 (errcode_for_file_access(),
479                                                  errmsg("could not seek in log file %u, "
480                                                                 "segment %u to offset %u: %m",
481                                                                 recvId, recvSeg, startoff)));
482                         recvOff = startoff;
483                 }
484
485                 /* OK to write the logs */
486                 errno = 0;
487
488                 byteswritten = write(recvFile, buf, segbytes);
489                 if (byteswritten <= 0)
490                 {
491                         /* if write didn't set errno, assume no disk space */
492                         if (errno == 0)
493                                 errno = ENOSPC;
494                         ereport(PANIC,
495                                         (errcode_for_file_access(),
496                                          errmsg("could not write to log file %u, segment %u "
497                                                         "at offset %u, length %lu: %m",
498                                                         recvId, recvSeg,
499                                                         recvOff, (unsigned long) segbytes)));
500                 }
501
502                 /* Update state for write */
503                 XLByteAdvance(recptr, byteswritten);
504
505                 recvOff += byteswritten;
506                 nbytes -= byteswritten;
507                 buf += byteswritten;
508
509                 LogstreamResult.Write = recptr;
510         }
511 }
512
513 /* Flush the log to disk */
514 static void
515 XLogWalRcvFlush(void)
516 {
517         if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write))
518         {
519                 /* use volatile pointer to prevent code rearrangement */
520                 volatile WalRcvData *walrcv = WalRcv;
521
522                 issue_xlog_fsync(recvFile, recvId, recvSeg);
523
524                 LogstreamResult.Flush = LogstreamResult.Write;
525
526                 /* Update shared-memory status */
527                 SpinLockAcquire(&walrcv->mutex);
528                 walrcv->latestChunkStart = walrcv->receivedUpto;
529                 walrcv->receivedUpto = LogstreamResult.Flush;
530                 SpinLockRelease(&walrcv->mutex);
531
532                 /* Signal the startup process that new WAL has arrived */
533                 WakeupRecovery();
534
535                 /* Report XLOG streaming progress in PS display */
536                 if (update_process_title)
537                 {
538                         char            activitymsg[50];
539
540                         snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
541                                          LogstreamResult.Write.xlogid,
542                                          LogstreamResult.Write.xrecoff);
543                         set_ps_display(activitymsg, false);
544                 }
545         }
546 }