* walsender.c
*
* The WAL sender process (walsender) is new as of Postgres 9.0. It takes
- * charge of XLOG streaming sender in the primary server. At first, it is
- * started by the postmaster when the walreceiver in the standby server
- * connects to the primary server and requests XLOG streaming replication,
- * i.e., unlike any auxiliary process, it is not an always-running process.
+ * care of sending XLOG from the primary server to a single recipient.
+ * (Note that there can be more than one walsender process concurrently.)
+ * It is started by the postmaster when the walreceiver of a standby server
+ * connects to the primary server and requests XLOG streaming replication.
* It attempts to keep reading XLOG records from the disk and sending them
* to the standby server, as long as the connection is alive (i.e., like
- * any backend, there is an one to one relationship between a connection
+ * any backend, there is a one-to-one relationship between a connection
* and a walsender process).
*
* Normal termination is by SIGTERM, which instructs the walsender to
* This instruct walsender to send any outstanding WAL, including the
* shutdown checkpoint record, and then exit.
*
- * Note that there can be more than one walsender process concurrently.
*
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
*
- *
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.19 2010/04/28 16:54:15 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.32 2010/09/15 06:51:19 heikki Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <signal.h>
#include <unistd.h>
#include "access/xlog_internal.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
+#include "replication/walprotocol.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/guc.h"
#include "utils/memutils.h"
/* My slot in the shared memory array */
static WalSnd *MyWalSnd = NULL;
+/* Array of WalSndWaiter in shared memory */
+static WalSndWaiter *WalSndWaiters;
+
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
+int replication_timeout = 0; /* maximum time to wait for the Ack from the standby */
-#define NAPTIME_PER_CYCLE 100000L /* max sleep time between cycles (100ms) */
+/*
+ * Buffer for WAL sending
+ *
+ * WalSndOutBuffer is a work area in which the output message is constructed.
+ * It's used in just so we can avoid re-palloc'ing the buffer on each cycle.
+ * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ */
+static char *WalSndOutBuffer;
+static int WalSndOutHead; /* head of pending output */
+static int WalSndOutTail; /* tail of pending output */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
/*
* How far have we sent WAL already? This is also advertised in
- * MyWalSnd->sentPtr.
+ * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = {0, 0};
+/*
+ * How far have we completed replication already? This is also
+ * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous
+ * replication case.
+ */
+static XLogRecPtr ackdPtr = {0, 0};
+
+/* Replication mode requested by connected standby */
+static int rplMode = REPLICATION_MODE_ASYNC;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t shutdown_requested = false;
static volatile sig_atomic_t ready_to_stop = false;
+/* Flag set by signal handler of backends for replication */
+static volatile sig_atomic_t replication_done = false;
+
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndShutdownHandler(SIGNAL_ARGS);
static void WalSndQuickDieHandler(SIGNAL_ARGS);
+static void WalSndXLogSendHandler(SIGNAL_ARGS);
+static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
static int WalSndLoop(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(StringInfo outMsg);
-static void CheckClosedConnection(void);
+static bool XLogSend(bool *caughtup, bool *pending);
+static void ProcessStreamMsgs(StringInfo inMsg);
+
+static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
+ Latch *latch);
+static void WakeupWalSndWaiters(XLogRecPtr record);
+static XLogRecPtr GetOldestAckdPtr(void);
-/*
- * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
- */
-#define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
/* Main entry point for walsender process */
int
/* Handle handshake messages before streaming */
WalSndHandshake();
+ /* Initialize shared memory status */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
/* Main loop of walsender */
return WalSndLoop();
}
+/*
+ * Execute commands from walreceiver, until we enter streaming mode.
+ */
static void
WalSndHandshake(void)
{
firstchar = pq_getbyte();
/*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive(true))
+ exit(1);
+
+ /*
* Check for any other interesting events that happened while we
* slept.
*/
/* Handle the very limited subset of commands expected in this phase */
switch (firstchar)
{
+ char rplModeStr[6];
+
case 'Q': /* Query message */
{
const char *query_string;
/*
* Reply with a result set with one row, two columns.
- * First col is system ID, and second if timeline ID
+ * First col is system ID, and second is timeline ID
*/
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
/* Send CommandComplete and ReadyForQuery messages */
EndCommand("SELECT", DestRemote);
ReadyForQuery(DestRemote);
+ /* ReadyForQuery did pq_flush for us */
}
- else if (sscanf(query_string, "START_REPLICATION %X/%X",
- &recptr.xlogid, &recptr.xrecoff) == 2)
+ else if (sscanf(query_string, "START_REPLICATION %X/%X MODE %5s",
+ &recptr.xlogid, &recptr.xrecoff, rplModeStr) == 3)
{
StringInfoData buf;
* NOTE: This only checks the current value of
* wal_level. Even if the current setting is not
* 'minimal', there can be old WAL in the pg_xlog
- * directory that was created with 'minimal'.
- * So this is not bulletproof, the purpose is
- * just to give a user-friendly error message that
- * hints how to configure the system correctly.
+ * directory that was created with 'minimal'. So this
+ * is not bulletproof, the purpose is just to give a
+ * user-friendly error message that hints how to
+ * configure the system correctly.
*/
if (wal_level == WAL_LEVEL_MINIMAL)
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("standby connections not allowed because wal_level=\"minimal\"")));
-
- /* Send a CopyOutResponse message, and start streaming */
- pq_beginmessage(&buf, 'H');
- pq_sendbyte(&buf, 0);
- pq_sendint(&buf, 0, 2);
+ errmsg("standby connections not allowed because wal_level=minimal")));
+
+ /* Verify that the specified replication mode is valid */
+ {
+ const struct config_enum_entry *entry;
+
+ for (entry = replication_mode_options; entry && entry->name; entry++)
+ {
+ if (strcmp(rplModeStr, entry->name) == 0)
+ {
+ rplMode = entry->val;
+ break;
+ }
+ }
+ if (entry == NULL || entry->name == NULL)
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid replication mode: %s", rplModeStr)));
+ }
+ MyWalSnd->rplMode = rplMode;
+
+ /* Send a CopyXLogResponse message, and start streaming */
+ pq_beginmessage(&buf, 'W');
pq_endmessage(&buf);
pq_flush();
/*
- * Initialize position to the received one, then the
+ * Initialize positions to the received one, then the
* xlog records begin to be shipped from that position
*/
- sentPtr = recptr;
+ sentPtr = ackdPtr = recptr;
/* break out of the loop */
replication_started = true;
}
/*
- * Check if the remote end has closed the connection.
+ * Process messages received from the standby.
+ *
+ * ereports on error.
*/
static void
-CheckClosedConnection(void)
+ProcessStreamMsgs(StringInfo inMsg)
{
- unsigned char firstchar;
- int r;
+ bool acked = false;
- r = pq_getbyte_if_available(&firstchar);
- if (r < 0)
- {
- /* unexpected error or EOF */
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected EOF on standby connection")));
- proc_exit(0);
- }
- if (r == 0)
+ /* Loop to process successive complete messages available */
+ for (;;)
{
- /* no data available without blocking */
- return;
- }
+ unsigned char firstchar;
+ int r;
+
+ r = pq_getbyte_if_available(&firstchar);
+ if (r < 0)
+ {
+ /* unexpected error or EOF */
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ proc_exit(0);
+ }
+ if (r == 0)
+ {
+ /* no data available without blocking */
+ break;
+ }
+
+ /* Handle the very limited subset of commands expected in this phase */
+ switch (firstchar)
+ {
+ case 'd': /* CopyData message */
+ {
+ unsigned char rpltype;
+
+ /*
+ * Read the message contents. This is expected to be done without
+ * blocking because we've been able to get message type code.
+ */
+ if (pq_getmessage(inMsg, 0))
+ proc_exit(0); /* suitable message already logged */
+
+ /* Read the replication message type from CopyData message */
+ rpltype = pq_getmsgbyte(inMsg);
+ switch (rpltype)
+ {
+ case 'l':
+ {
+ WalAckMessageData *msgdata;
+
+ msgdata = (WalAckMessageData *) pq_getmsgbytes(inMsg, sizeof(WalAckMessageData));
+
+ /*
+ * Update local status.
+ *
+ * The ackd ptr received from standby should not
+ * go backwards.
+ */
+ if (XLByteLE(ackdPtr, msgdata->ackEnd))
+ ackdPtr = msgdata->ackEnd;
+ else
+ ereport(FATAL,
+ (errmsg("replication completion location went back from "
+ "%X/%X to %X/%X",
+ ackdPtr.xlogid, ackdPtr.xrecoff,
+ msgdata->ackEnd.xlogid, msgdata->ackEnd.xrecoff)));
+
+ acked = true; /* also need to update shared position */
+ break;
+ }
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid replication message type %d",
+ rpltype)));
+ }
+ break;
+ }
- /* Handle the very limited subset of commands expected in this phase */
- switch (firstchar)
- {
/*
* 'X' means that the standby is closing down the socket.
*/
- case 'X':
- proc_exit(0);
+ case 'X':
+ proc_exit(0);
- default:
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby closing message type %d",
- firstchar)));
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid standby message type %d",
+ firstchar)));
+ }
+ }
+
+ if (acked)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->ackdPtr = ackdPtr;
+ SpinLockRelease(&walsnd->mutex);
}
+
+ /* Wake up the backends that this walsender had been blocking */
+ WakeupWalSndWaiters(ackdPtr);
}
/* Main loop of walsender process */
static int
WalSndLoop(void)
{
- StringInfoData output_message;
+ StringInfoData input_message;
+ bool caughtup = false;
+ bool pending = false;
+
+ initStringInfo(&input_message);
- initStringInfo(&output_message);
+ /*
+ * Allocate buffer that will be used for each output message. We do this
+ * just once to reduce palloc overhead. The buffer must be made large
+ * enough for maximum-sized messages.
+ */
+ WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+ WalSndOutHead = WalSndOutTail = 0;
- /* Loop forever */
+ /* Loop forever, unless we get an error */
for (;;)
{
- long remain; /* remaining time (us) */
-
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
if (!PostmasterIsAlive(true))
exit(1);
+
/* Process any requests or signals received recently */
if (got_SIGHUP)
{
*/
if (ready_to_stop)
{
- XLogSend(&output_message);
- shutdown_requested = true;
+ if (!XLogSend(&caughtup, &pending))
+ break;
+ if (caughtup && !pending)
+ shutdown_requested = true;
}
/* Normal exit from the walsender is here */
}
/*
- * Nap for the configured time or until a message arrives.
- *
- * On some platforms, signals won't interrupt the sleep. To ensure we
- * respond reasonably promptly when someone signals us, break down the
- * sleep into NAPTIME_PER_CYCLE increments, and check for
- * interrupts after each nap.
+ * If we had sent all accumulated WAL in last round or could not
+ * flush pending WAL in output buffer because the socket was not
+ * writable, nap for the configured time before retrying.
*/
- remain = WalSndDelay * 1000L;
- while (remain > 0)
+ if (caughtup || pending)
{
- if (got_SIGHUP || shutdown_requested || ready_to_stop)
- break;
-
/*
- * Check to see whether a message from the standby or an interrupt
- * from other processes has arrived.
+ * Even if we wrote all the WAL that was available when we started
+ * sending, more might have arrived while we were sending this
+ * batch. We had the latch set while sending, so we have not
+ * received any signals from that time. Let's arm the latch
+ * again, and after that check that we're still up-to-date.
*/
- pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
- CheckClosedConnection();
+ ResetLatch(&MyWalSnd->latch);
- remain -= NAPTIME_PER_CYCLE;
- }
+ if (!XLogSend(&caughtup, &pending))
+ break;
+ if ((caughtup || pending) && !got_SIGHUP && !ready_to_stop &&
+ !shutdown_requested)
+ {
+ bool check_timeout;
+ long sleeptime;
+ int res;
- /* Attempt to send the log once every loop */
- if (!XLogSend(&output_message))
- goto eof;
- }
+ /*
+ * XXX: We don't really need the periodic wakeups anymore,
+ * WaitLatchOrSocket should reliably wake up as soon as
+ * something interesting happens.
+ */
+
+ /*
+ * Check for replication timeout if it's enabled and we need
+ * to wait until the socket has become writable to flush
+ * pending WAL in output buffer or until the Ack message
+ * from the standby has become available.
+ */
+ if (replication_timeout > 0 &&
+ (pending ||
+ (rplMode != REPLICATION_MODE_ASYNC &&
+ XLByteLT(ackdPtr, sentPtr))))
+ {
+ sleeptime = replication_timeout;
+ check_timeout = true;
+ }
+ else
+ {
+ sleeptime = WalSndDelay;
+ check_timeout = false;
+ }
+
+ /* Sleep */
+ res = WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+ true, (WalSndOutTail > 0),
+ sleeptime * 1000L);
- /* can't get here because the above loop never exits */
- return 1;
+ if (res == 0 && check_timeout)
+ {
+ /*
+ * Since typically expiration of replication timeout means
+ * communication problem, we don't send the error message
+ * to the standby.
+ */
+ ereport(COMMERROR,
+ (errmsg("terminating walsender process due to replication timeout")));
+ break;
+ }
+ }
-eof:
+ /* Process messages received from the standby */
+ ProcessStreamMsgs(&input_message);
+ }
+ else
+ {
+ /* Attempt to send the log once every loop */
+ if (!XLogSend(&caughtup, &pending))
+ break;
+ }
+ }
/*
+ * Get here on send failure. Clean up and exit.
+ *
* Reset whereToSendOutput to prevent ereport from attempting to send any
* more messages to the standby.
*/
static void
InitWalSnd(void)
{
- /* use volatile pointer to prevent code rearrangement */
int i;
/*
*/
for (i = 0; i < max_wal_senders; i++)
{
+ /* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockAcquire(&walsnd->mutex);
}
else
{
- /* found */
- MyWalSnd = (WalSnd *) walsnd;
+ /*
+ * Found a free slot. Reserve it for us.
+ */
walsnd->pid = MyProcPid;
- MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
+ MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+ MemSet(&walsnd->ackdPtr, 0, sizeof(XLogRecPtr));
SpinLockRelease(&walsnd->mutex);
+ /* don't need the lock anymore */
+ OwnLatch((Latch *) &walsnd->latch);
+ MyWalSnd = (WalSnd *) walsnd;
+
break;
}
}
ereport(FATAL,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("number of requested standby connections "
- "exceeds max_wal_senders (currently %d)",
- max_wal_senders)));
+ "exceeds max_wal_senders (currently %d)",
+ max_wal_senders)));
/* Arrange to clean up at walsender exit */
on_shmem_exit(WalSndKill, 0);
{
Assert(MyWalSnd != NULL);
+ /* Wake up the backends that this walsender had been blocking */
+ MyWalSnd->rplMode = REPLICATION_MODE_ASYNC;
+ WakeupWalSndWaiters(GetOldestAckdPtr());
+
/*
* Mark WalSnd struct no longer in use. Assume that no lock is required
* for this.
*/
MyWalSnd->pid = 0;
+ DisownLatch(&MyWalSnd->latch);
/* WalSnd struct isn't mine anymore */
MyWalSnd = NULL;
/*
* Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
*/
-void
+static void
XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
{
XLogRecPtr startRecPtr = recptr;
if (sendFile < 0)
{
/*
- * If the file is not found, assume it's because the
- * standby asked for a too old WAL segment that has already
- * been removed or recycled.
+ * If the file is not found, assume it's because the standby
+ * asked for a too old WAL segment that has already been
+ * removed or recycled.
*/
if (errno == ENOENT)
{
- char filename[MAXFNAMELEN];
+ char filename[MAXFNAMELEN];
+
XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
ereport(ERROR,
(errcode_for_file_access(),
}
/*
- * After reading into the buffer, check that what we read was valid.
- * We do this after reading, because even though the segment was present
- * when we opened it, it might get recycled or removed while we read it.
- * The read() succeeds in that case, but the data we tried to read might
+ * After reading into the buffer, check that what we read was valid. We do
+ * this after reading, because even though the segment was present when we
+ * opened it, it might get recycled or removed while we read it. The
+ * read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records.
*/
XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
if (log < lastRemovedLog ||
(log == lastRemovedLog && seg <= lastRemovedSeg))
{
- char filename[MAXFNAMELEN];
+ char filename[MAXFNAMELEN];
+
XLogFileName(filename, ThisTimeLineID, log, seg);
ereport(ERROR,
(errcode_for_file_access(),
}
/*
- * Read all WAL that's been written (and flushed) since last cycle, and send
- * it to client.
+ * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
+ * but not yet sent to the client, and send it.
+ *
+ * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
+ * *caughtup is set to false.
+ *
+ * If there is pending WAL in output buffer, *pending is set to true,
+ * otherwise *pending is set to false.
*
* Returns true if OK, false if trouble.
*/
static bool
-XLogSend(StringInfo outMsg)
+XLogSend(bool *caughtup, bool *pending)
{
XLogRecPtr SendRqstPtr;
- char activitymsg[50];
+ XLogRecPtr startptr;
+ static XLogRecPtr endptr;
+ Size nbytes;
+ uint32 n32;
+ int res;
+ WalDataMessageHeader msghdr;
+
+ /* Attempt to flush pending WAL in output buffer */
+ if (*pending)
+ {
+ if (WalSndOutHead != WalSndOutTail)
+ {
+ res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
+ WalSndOutTail - WalSndOutHead);
+ if (res == EOF)
+ return false;
+ WalSndOutHead += res;
+ if (WalSndOutHead != WalSndOutTail)
+ return true;
+ }
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = MyWalSnd;
+ res = pq_flush_if_writable();
+ if (res == EOF)
+ return false;
+ if (res == 0)
+ return true;
- /* Attempt to send all records flushed to the disk already */
- SendRqstPtr = GetWriteRecPtr();
+ goto updt;
+ }
+
+ /*
+ * Attempt to send all data that's already been written out and fsync'd to
+ * disk. We cannot go further than what's been written out given the
+ * current implementation of XLogRead(). And in any case it's unsafe to
+ * send WAL that is not securely down to disk on the master: if the master
+ * subsequently crashes and restarts, slaves must not have applied any WAL
+ * that gets lost on the master.
+ */
+ SendRqstPtr = GetFlushRecPtr();
/* Quick exit if nothing to do */
- if (!XLByteLT(sentPtr, SendRqstPtr))
+ if (XLByteLE(SendRqstPtr, sentPtr))
+ {
+ *caughtup = true;
return true;
+ }
/*
- * We gather multiple records together by issuing just one XLogRead() of a
- * suitable size, and send them as one CopyData message. Repeat until
- * we've sent everything we can.
+ * Figure out how much to send in one message. If there's no more than
+ * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
+ * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
+ *
+ * The rounding is not only for performance reasons. Walreceiver relies on
+ * the fact that we never split a WAL record across two messages. Since a
+ * long WAL record is split at page boundary into continuation records,
+ * page boundary is always a safe cut-off point. We also assume that
+ * SendRqstPtr never points to the middle of a WAL record.
*/
- while (XLByteLT(sentPtr, SendRqstPtr))
+ startptr = sentPtr;
+ if (startptr.xrecoff >= XLogFileSize)
{
- XLogRecPtr startptr;
- XLogRecPtr endptr;
- Size nbytes;
-
/*
- * Figure out how much to send in one message. If there's less than
- * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
- * MAX_SEND_SIZE bytes, but round to page boundary.
- *
- * The rounding is not only for performance reasons. Walreceiver
- * relies on the fact that we never split a WAL record across two
- * messages. Since a long WAL record is split at page boundary into
- * continuation records, page boundary is always a safe cut-off point.
- * We also assume that SendRqstPtr never points in the middle of a WAL
- * record.
+ * crossing a logid boundary, skip the non-existent last log segment
+ * in previous logical log file.
*/
- startptr = sentPtr;
- if (startptr.xrecoff >= XLogFileSize)
- {
- /*
- * crossing a logid boundary, skip the non-existent last log
- * segment in previous logical log file.
- */
- startptr.xlogid += 1;
- startptr.xrecoff = 0;
- }
+ startptr.xlogid += 1;
+ startptr.xrecoff = 0;
+ }
+
+ endptr = startptr;
+ XLByteAdvance(endptr, MAX_SEND_SIZE);
+ if (endptr.xlogid != startptr.xlogid)
+ {
+ /* Don't cross a logfile boundary within one message */
+ Assert(endptr.xlogid == startptr.xlogid + 1);
+ endptr.xlogid = startptr.xlogid;
+ endptr.xrecoff = XLogFileSize;
+ }
- endptr = startptr;
- XLByteAdvance(endptr, MAX_SEND_SIZE);
+ /* if we went beyond SendRqstPtr, back off */
+ if (XLByteLE(SendRqstPtr, endptr))
+ {
+ endptr = SendRqstPtr;
+ *caughtup = true;
+ }
+ else
+ {
/* round down to page boundary. */
endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
- /* if we went beyond SendRqstPtr, back off */
- if (XLByteLT(SendRqstPtr, endptr))
- endptr = SendRqstPtr;
+ *caughtup = false;
+ }
- /*
- * OK to read and send the slice.
- *
- * We don't need to convert the xlogid/xrecoff from host byte order to
- * network byte order because the both server can be expected to have
- * the same byte order. If they have different byte order, we don't
- * reach here.
- */
- pq_sendbyte(outMsg, 'w');
- pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
+ nbytes = endptr.xrecoff - startptr.xrecoff;
+ Assert(nbytes <= MAX_SEND_SIZE);
- if (endptr.xlogid != startptr.xlogid)
- {
- Assert(endptr.xlogid == startptr.xlogid + 1);
- nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
- }
- else
- nbytes = endptr.xrecoff - startptr.xrecoff;
+ /*
+ * OK to read and send the slice.
+ */
+ WalSndOutBuffer[0] = 'd';
+ WalSndOutBuffer[5] = 'w';
+ WalSndOutHead = 0;
+ WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
- sentPtr = endptr;
+ n32 = htonl((uint32) WalSndOutTail - 1);
+ memcpy(WalSndOutBuffer + 1, &n32, 4);
- /*
- * Read the log directly into the output buffer to prevent extra
- * memcpy calls.
- */
- enlargeStringInfo(outMsg, nbytes);
+ /*
+ * Read the log directly into the output buffer to avoid extra memcpy
+ * calls.
+ */
+ XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
+
+ /*
+ * We fill the message header last so that the send timestamp is taken as
+ * late as possible.
+ */
+ msghdr.dataStart = startptr;
+ msghdr.walEnd = SendRqstPtr;
+ msghdr.sendTime = GetCurrentTimestamp();
+
+ memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
+
+ res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
+ if (res == EOF)
+ return false;
- XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
- outMsg->len += nbytes;
- outMsg->data[outMsg->len] = '\0';
+ WalSndOutHead = res;
+ if (WalSndOutHead != WalSndOutTail)
+ {
+ *caughtup = false;
+ *pending = true;
+ return true;
+ }
- pq_putmessage('d', outMsg->data, outMsg->len);
- resetStringInfo(outMsg);
+ /* Flush pending output to the client */
+ res = pq_flush_if_writable();
+ if (res == EOF)
+ return false;
+ if (res == 0)
+ {
+ *caughtup = false;
+ *pending = true;
+ return true;
}
+updt:
+ WalSndOutHead = WalSndOutTail = 0;
+ *pending = false;
+
+ sentPtr = endptr;
+
/* Update shared memory status */
- SpinLockAcquire(&walsnd->mutex);
- walsnd->sentPtr = sentPtr;
- SpinLockRelease(&walsnd->mutex);
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
- /* Flush pending output */
- if (pq_flush())
- return false;
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
/* Report progress of XLOG streaming in PS display */
- snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
- set_ps_display(activitymsg, false);
+ if (update_process_title)
+ {
+ char activitymsg[50];
+
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ sentPtr.xlogid, sentPtr.xrecoff);
+ set_ps_display(activitymsg, false);
+ }
return true;
}
WalSndSigHupHandler(SIGNAL_ARGS)
{
got_SIGHUP = true;
+ if (MyWalSnd)
+ SetLatch(&MyWalSnd->latch);
}
/* SIGTERM: set flag to shut down */
WalSndShutdownHandler(SIGNAL_ARGS)
{
shutdown_requested = true;
+ if (MyWalSnd)
+ SetLatch(&MyWalSnd->latch);
}
/*
exit(2);
}
+/* SIGUSR1: set flag to send WAL records */
+static void
+WalSndXLogSendHandler(SIGNAL_ARGS)
+{
+ latch_sigusr1_handler();
+}
+
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
static void
WalSndLastCycleHandler(SIGNAL_ARGS)
{
ready_to_stop = true;
+ if (MyWalSnd)
+ SetLatch(&MyWalSnd->latch);
}
/* Set up signal handlers */
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, SIG_IGN); /* not used */
+ pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
* shutdown */
size = offsetof(WalSndCtlData, walsnds);
size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
+ /*
+ * If replication is enabled, we have a data structure called
+ * WalSndWaiters, created in shared memory.
+ */
+ if (max_wal_senders > 0)
+ size = add_size(size, mul_size(MaxBackends, sizeof(WalSndWaiter)));
+
return size;
}
{
bool found;
int i;
+ Size size = add_size(offsetof(WalSndCtlData, walsnds),
+ mul_size(max_wal_senders, sizeof(WalSnd)));
WalSndCtl = (WalSndCtlData *)
- ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
+ ShmemInitStruct("Wal Sender Ctl", size, &found);
if (!found)
{
/* First time through, so initialize */
- MemSet(WalSndCtl, 0, WalSndShmemSize());
+ MemSet(WalSndCtl, 0, size);
for (i = 0; i < max_wal_senders; i++)
{
WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockInit(&walsnd->mutex);
+ InitSharedLatch(&walsnd->latch);
+ }
+ }
+
+ /* Create or attach to the WalSndWaiters array too, if needed */
+ if (max_wal_senders > 0)
+ {
+ WalSndWaiters = (WalSndWaiter *)
+ ShmemInitStruct("WalSndWaiters",
+ mul_size(MaxBackends, sizeof(WalSndWaiter)),
+ &found);
+ WalSndCtl->maxWaiters = MaxBackends;
+ }
+}
+
+/* Wake up all walsenders */
+void
+WalSndWakeup(void)
+{
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ SetLatch(&WalSndCtl->walsnds[i].latch);
+}
+
+/*
+ * Ensure that replication has been completed up to the given position.
+ */
+void
+WaitXLogSend(XLogRecPtr record)
+{
+ int i;
+ bool mustwait = false;
+
+ Assert(max_wal_senders > 0);
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ XLogRecPtr recptr;
+
+ /* Don't need to wait for asynchronous walsender */
+ if (walsnd->pid == 0 ||
+ walsnd->rplMode <= REPLICATION_MODE_ASYNC)
+ continue;
+
+ SpinLockAcquire(&walsnd->mutex);
+ recptr = walsnd->ackdPtr;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (recptr.xlogid == 0 && recptr.xrecoff == 0)
+ continue;
+
+ /* Quick exit if already known replicated */
+ if (XLByteLE(record, recptr))
+ return;
+
+ mustwait = true;
+ }
+
+ /*
+ * Don't need to wait for replication if there is no synchronous
+ * standby
+ */
+ if (!mustwait)
+ return;
+
+ /*
+ * Register myself into the wait list and sleep until replication
+ * has been completed up to the given position and the walsender
+ * signals me.
+ *
+ * If replication has been completed up to the latest position
+ * before the registration, walsender might be unable to send the
+ * signal immediately. We must wake up the walsender after the
+ * registration.
+ */
+ ResetLatch(&MyProc->latch);
+ RegisterWalSndWaiter(MyBackendId, record, &MyProc->latch);
+ WalSndWakeup();
+
+ for (;;)
+ {
+ WaitLatch(&MyProc->latch, 1000000L);
+
+ /* If done already, we finish waiting */
+ if (replication_done)
+ {
+ replication_done = false;
+ return;
}
}
}
/*
- * This isn't currently used for anything. Monitoring tools might be
- * interested in the future, and we'll need something like this in the
- * future for synchronous replication.
+ * Register the given backend into the wait list.
*/
-#ifdef NOT_USED
+static void
+RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record, Latch *latch)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ int i;
+ int count = 0;
+
+ LWLockAcquire(WalSndWaiterLock, LW_EXCLUSIVE);
+
+ /* Out of slots. This should not happen. */
+ if (walsndctl->numWaiters + 1 > walsndctl->maxWaiters)
+ elog(PANIC, "out of replication waiters slots");
+
+ /*
+ * The given position is expected to be relatively new in the
+ * wait list. Since the entries in the list are sorted in an
+ * increasing order of XLogRecPtr, we can shorten the time it
+ * takes to find an insert slot by scanning the list backwards.
+ */
+ for (i = walsndctl->numWaiters; i > 0; i--)
+ {
+ if (XLByteLE(WalSndWaiters[i - 1].record, record))
+ break;
+ count++;
+ }
+
+ /* Shuffle the list if needed */
+ if (count > 0)
+ memmove(&WalSndWaiters[i + 1], &WalSndWaiters[i],
+ count * sizeof(WalSndWaiter));
+
+ WalSndWaiters[i].backendId = backendId;
+ WalSndWaiters[i].record = record;
+ WalSndWaiters[i].latch = latch;
+ walsndctl->numWaiters++;
+
+ LWLockRelease(WalSndWaiterLock);
+}
+
/*
- * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
- * if none.
+ * Wake up the backends waiting until replication has been completed
+ * up to the position older than or equal to the given one.
+ *
+ * Wake up all waiters if InvalidXLogRecPtr is given.
+ */
+static void
+WakeupWalSndWaiters(XLogRecPtr record)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ int i;
+ int count = 0;
+ bool all_wakeup = (record.xlogid == 0 && record.xrecoff == 0);
+
+ LWLockAcquire(WalSndWaiterLock, LW_EXCLUSIVE);
+
+ for (i = 0; i < walsndctl->numWaiters; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSndWaiter *waiter = &WalSndWaiters[i];
+
+ if (all_wakeup || XLByteLE(waiter->record, record))
+ {
+ SetProcLatch(waiter->latch, PROCSIG_REPLICATION_INTERRUPT,
+ waiter->backendId);
+ count++;
+ }
+ else
+ {
+ /*
+ * If the backend waiting for the Ack position newer than
+ * the given one is found, we don't need to search the wait
+ * list any more. This is because the waiters in the list
+ * are guaranteed to be sorted in an increasing order of
+ * XLogRecPtr.
+ */
+ break;
+ }
+ }
+
+ /* If there are still some waiters, left-justify them in the list */
+ walsndctl->numWaiters -= count;
+ if (walsndctl->numWaiters > 0 && count > 0)
+ memmove(&WalSndWaiters[0], &WalSndWaiters[i],
+ walsndctl->numWaiters * sizeof(WalSndWaiter));
+
+ LWLockRelease(WalSndWaiterLock);
+}
+
+/*
+ * Returns the oldest Ack position in synchronous walsenders. Or
+ * InvalidXLogRecPtr if none.
*/
-XLogRecPtr
-GetOldestWALSendPointer(void)
+static XLogRecPtr
+GetOldestAckdPtr(void)
{
XLogRecPtr oldest = {0, 0};
- int i;
- bool found = false;
+ int i;
+ bool found = false;
for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
- XLogRecPtr recptr;
+ XLogRecPtr recptr;
- if (walsnd->pid == 0)
+ /*
+ * Ignore the Ack position that asynchronous walsender has
+ * since it has never received any Ack.
+ */
+ if (walsnd->pid == 0 ||
+ walsnd->rplMode <= REPLICATION_MODE_ASYNC)
continue;
SpinLockAcquire(&walsnd->mutex);
- recptr = walsnd->sentPtr;
+ recptr = walsnd->ackdPtr;
SpinLockRelease(&walsnd->mutex);
+ /*
+ * Ignore the Ack position that the walsender which has not
+ * received any Ack yet has.
+ */
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
continue;
}
return oldest;
}
-#endif
+
+/*
+ * This is called when PROCSIG_REPLICATION_INTERRUPT is received.
+ */
+void
+HandleReplicationInterrupt(void)
+{
+ replication_done = true;
+}