OSDN Git Service

Fix typo.
[pg-rex/syncrep.git] / src / backend / replication / walsender.c
index a10dc3d..bfcb49f 100644 (file)
@@ -2,14 +2,14 @@
  *
  * walsender.c
  *
- * The WAL sender process (walsender) is new as of Postgres 8.5. It takes
- * charge of XLOG streaming sender in the primary server. At first, it is
- * started by the postmaster when the walreceiver in the standby server
- * connects to the primary server and requests XLOG streaming replication,
- * i.e., unlike any auxiliary process, it is not an always-running process.
+ * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
+ * care of sending XLOG from the primary server to a single recipient.
+ * (Note that there can be more than one walsender process concurrently.)
+ * It is started by the postmaster when the walreceiver of a standby server
+ * connects to the primary server and requests XLOG streaming replication.
  * It attempts to keep reading XLOG records from the disk and sending them
  * to the standby server, as long as the connection is alive (i.e., like
- * any backend, there is an one to one relationship between a connection
+ * any backend, there is a one-to-one relationship between a connection
  * and a walsender process).
  *
  * Normal termination is by SIGTERM, which instructs the walsender to
  * This instruct walsender to send any outstanding WAL, including the
  * shutdown checkpoint record, and then exit.
  *
- * Note that there can be more than one walsender process concurrently.
  *
  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
  *
- *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.3 2010/01/21 08:19:57 heikki Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.32 2010/09/15 06:51:19 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include <signal.h>
 #include <unistd.h>
 
 #include "access/xlog_internal.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "replication/walprotocol.h"
 #include "replication/walsender.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
-#include "storage/lock.h"
 #include "storage/pmsignal.h"
+#include "storage/proc.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 
+
 /* Array of WalSnds in shared memory */
 WalSndCtlData *WalSndCtl = NULL;
 
 /* My slot in the shared memory array */
 static WalSnd *MyWalSnd = NULL;
 
+/* Array of WalSndWaiter in shared memory */
+static WalSndWaiter  *WalSndWaiters;
+
 /* Global state */
-bool   am_walsender    = false;        /* Am I a walsender process ? */
+bool           am_walsender = false;           /* Am I a walsender process ? */
 
 /* User-settable parameters for walsender */
-int    MaxWalSenders = 0;              /* the maximum number of concurrent walsenders */
-int    WalSndDelay     = 200;          /* max sleep time between some actions */
+int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
+int                    WalSndDelay = 200;      /* max sleep time between some actions */
+int                    replication_timeout = 0;        /* maximum time to wait for the Ack from the standby */
 
-#define NAPTIME_PER_CYCLE 100  /* max sleep time between cycles (100ms) */
+/*
+ * Buffer for WAL sending
+ *
+ * WalSndOutBuffer is a work area in which the output message is constructed.
+ * It's used in just so we can avoid re-palloc'ing the buffer on each cycle.
+ * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ */
+static char       *WalSndOutBuffer;
+static int             WalSndOutHead;          /* head of pending output */
+static int             WalSndOutTail;          /* tail of pending output */
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -80,33 +94,49 @@ static uint32 sendOff = 0;
 
 /*
  * How far have we sent WAL already? This is also advertised in
- * MyWalSnd->sentPtr.
+ * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
  */
 static XLogRecPtr sentPtr = {0, 0};
 
+/*
+ * How far have we completed replication already? This is also
+ * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous
+ * replication case.
+ */
+static XLogRecPtr ackdPtr = {0, 0};
+
+/* Replication mode requested by connected standby */
+static int     rplMode = REPLICATION_MODE_ASYNC;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t shutdown_requested = false;
 static volatile sig_atomic_t ready_to_stop = false;
 
+/* Flag set by signal handler of backends for replication */
+static volatile sig_atomic_t replication_done = false;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndShutdownHandler(SIGNAL_ARGS);
 static void WalSndQuickDieHandler(SIGNAL_ARGS);
+static void WalSndXLogSendHandler(SIGNAL_ARGS);
+static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
 static int     WalSndLoop(void);
-static void    InitWalSnd(void);
-static void    WalSndHandshake(void);
-static void    WalSndKill(int code, Datum arg);
+static void InitWalSnd(void);
+static void WalSndHandshake(void);
+static void WalSndKill(int code, Datum arg);
 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(StringInfo outMsg);
-static void CheckClosedConnection(void);
+static bool XLogSend(bool *caughtup, bool *pending);
+static void ProcessStreamMsgs(StringInfo inMsg);
+
+static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
+                                                                Latch *latch);
+static void WakeupWalSndWaiters(XLogRecPtr record);
+static XLogRecPtr GetOldestAckdPtr(void);
 
-/*
- * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
- */
-#define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
 
 /* Main entry point for walsender process */
 int
@@ -114,10 +144,10 @@ WalSenderMain(void)
 {
        MemoryContext walsnd_context;
 
-       if (!superuser())
+       if (RecoveryInProgress())
                ereport(FATAL,
-                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                errmsg("must be superuser to start walsender")));
+                               (errcode(ERRCODE_CANNOT_CONNECT_NOW),
+                                errmsg("recovery is still in progress, can't accept WAL streaming connections")));
 
        /* Create a per-walsender data structure in shared memory */
        InitWalSnd();
@@ -147,26 +177,46 @@ WalSenderMain(void)
        /* Handle handshake messages before streaming */
        WalSndHandshake();
 
+       /* Initialize shared memory status */
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->sentPtr = sentPtr;
+               SpinLockRelease(&walsnd->mutex);
+       }
+
        /* Main loop of walsender */
        return WalSndLoop();
 }
 
+/*
+ * Execute commands from walreceiver, until we enter streaming mode.
+ */
 static void
 WalSndHandshake(void)
 {
        StringInfoData input_message;
-       bool replication_started = false;
+       bool            replication_started = false;
 
        initStringInfo(&input_message);
 
        while (!replication_started)
        {
-               int firstchar;
+               int                     firstchar;
 
                /* Wait for a command to arrive */
                firstchar = pq_getbyte();
 
                /*
+                * Emergency bailout if postmaster has died.  This is to avoid the
+                * necessity for manual cleanup of all postmaster children.
+                */
+               if (!PostmasterIsAlive(true))
+                       exit(1);
+
+               /*
                 * Check for any other interesting events that happened while we
                 * slept.
                 */
@@ -176,124 +226,155 @@ WalSndHandshake(void)
                        ProcessConfigFile(PGC_SIGHUP);
                }
 
-               if (firstchar == EOF)
-               {
-                       /* standby disconnected */
-                       ereport(COMMERROR,
-                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg("unexpected EOF on standby connection")));
-               }
-               else
+               if (firstchar != EOF)
                {
                        /*
                         * Read the message contents. This is expected to be done without
                         * blocking because we've been able to get message type code.
                         */
                        if (pq_getmessage(&input_message, 0))
-                               firstchar = EOF;                /* suitable message already logged */
+                               firstchar = EOF;        /* suitable message already logged */
                }
 
-
                /* Handle the very limited subset of commands expected in this phase */
-
                switch (firstchar)
                {
-                       case 'Q':       /* Query message */
-                       {
-                               const char *query_string;
-                               XLogRecPtr      recptr;
-
-                               query_string = pq_getmsgstring(&input_message);
-                               pq_getmsgend(&input_message);
+                       char            rplModeStr[6];
 
-                               if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
+                       case 'Q':                       /* Query message */
                                {
-                                       StringInfoData  buf;
-                                       char    sysid[32];
-                                       char    tli[11];
-
-                                       /*
-                                        * Reply with a result set with one row, two columns.
-                                        * First col is system ID, and second if timeline ID
-                                        */
-
-                                       snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
-                                                        GetSystemIdentifier());
-                                       snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
-
-                                       /* Send a RowDescription message */
-                                       pq_beginmessage(&buf, 'T');
-                                       pq_sendint(&buf, 2, 2); /* 2 fields */
-
-                                       /* first field */
-                                       pq_sendstring(&buf, "systemid"); /* col name */
-                                       pq_sendint(&buf, 0, 4);                 /* table oid */
-                                       pq_sendint(&buf, 0, 2);                 /* attnum */
-                                       pq_sendint(&buf, TEXTOID, 4);   /* type oid */
-                                       pq_sendint(&buf, -1, 2);                /* typlen */
-                                       pq_sendint(&buf, 0, 4);                 /* typmod */
-                                       pq_sendint(&buf, 0, 2);                 /* format code */
-
-                                       /* second field */
-                                       pq_sendstring(&buf, "timeline"); /* col name */
-                                       pq_sendint(&buf, 0, 4);                 /* table oid */
-                                       pq_sendint(&buf, 0, 2);                 /* attnum */
-                                       pq_sendint(&buf, INT4OID, 4);   /* type oid */
-                                       pq_sendint(&buf, 4, 2);                 /* typlen */
-                                       pq_sendint(&buf, 0, 4);                 /* typmod */
-                                       pq_sendint(&buf, 0, 2);                 /* format code */
-                                       pq_endmessage(&buf);
-
-                                       /* Send a DataRow message */
-                                       pq_beginmessage(&buf, 'D');
-                                       pq_sendint(&buf, 2, 2);                 /* # of columns */
-                                       pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
-                                       pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
-                                       pq_sendint(&buf, strlen(tli), 4); /* col2 len */
-                                       pq_sendbytes(&buf, (char *) tli, strlen(tli));
-                                       pq_endmessage(&buf);
-
-                                       /* Send CommandComplete and ReadyForQuery messages */
-                                       EndCommand("SELECT", DestRemote);
-                                       ReadyForQuery(DestRemote);
+                                       const char *query_string;
+                                       XLogRecPtr      recptr;
+
+                                       query_string = pq_getmsgstring(&input_message);
+                                       pq_getmsgend(&input_message);
+
+                                       if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
+                                       {
+                                               StringInfoData buf;
+                                               char            sysid[32];
+                                               char            tli[11];
+
+                                               /*
+                                                * Reply with a result set with one row, two columns.
+                                                * First col is system ID, and second is timeline ID
+                                                */
+
+                                               snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
+                                                                GetSystemIdentifier());
+                                               snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
+
+                                               /* Send a RowDescription message */
+                                               pq_beginmessage(&buf, 'T');
+                                               pq_sendint(&buf, 2, 2); /* 2 fields */
+
+                                               /* first field */
+                                               pq_sendstring(&buf, "systemid");                /* col name */
+                                               pq_sendint(&buf, 0, 4); /* table oid */
+                                               pq_sendint(&buf, 0, 2); /* attnum */
+                                               pq_sendint(&buf, TEXTOID, 4);   /* type oid */
+                                               pq_sendint(&buf, -1, 2);                /* typlen */
+                                               pq_sendint(&buf, 0, 4); /* typmod */
+                                               pq_sendint(&buf, 0, 2); /* format code */
+
+                                               /* second field */
+                                               pq_sendstring(&buf, "timeline");                /* col name */
+                                               pq_sendint(&buf, 0, 4); /* table oid */
+                                               pq_sendint(&buf, 0, 2); /* attnum */
+                                               pq_sendint(&buf, INT4OID, 4);   /* type oid */
+                                               pq_sendint(&buf, 4, 2); /* typlen */
+                                               pq_sendint(&buf, 0, 4); /* typmod */
+                                               pq_sendint(&buf, 0, 2); /* format code */
+                                               pq_endmessage(&buf);
+
+                                               /* Send a DataRow message */
+                                               pq_beginmessage(&buf, 'D');
+                                               pq_sendint(&buf, 2, 2); /* # of columns */
+                                               pq_sendint(&buf, strlen(sysid), 4);             /* col1 len */
+                                               pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
+                                               pq_sendint(&buf, strlen(tli), 4);               /* col2 len */
+                                               pq_sendbytes(&buf, (char *) tli, strlen(tli));
+                                               pq_endmessage(&buf);
+
+                                               /* Send CommandComplete and ReadyForQuery messages */
+                                               EndCommand("SELECT", DestRemote);
+                                               ReadyForQuery(DestRemote);
+                                               /* ReadyForQuery did pq_flush for us */
+                                       }
+                                       else if (sscanf(query_string, "START_REPLICATION %X/%X MODE %5s",
+                                                                       &recptr.xlogid, &recptr.xrecoff, rplModeStr) == 3)
+                                       {
+                                               StringInfoData buf;
+
+                                               /*
+                                                * Check that we're logging enough information in the
+                                                * WAL for log-shipping.
+                                                *
+                                                * NOTE: This only checks the current value of
+                                                * wal_level. Even if the current setting is not
+                                                * 'minimal', there can be old WAL in the pg_xlog
+                                                * directory that was created with 'minimal'. So this
+                                                * is not bulletproof, the purpose is just to give a
+                                                * user-friendly error message that hints how to
+                                                * configure the system correctly.
+                                                */
+                                               if (wal_level == WAL_LEVEL_MINIMAL)
+                                                       ereport(FATAL,
+                                                                       (errcode(ERRCODE_CANNOT_CONNECT_NOW),
+                                                                        errmsg("standby connections not allowed because wal_level=minimal")));
+
+                                               /* Verify that the specified replication mode is valid */
+                                               {
+                                                       const struct config_enum_entry *entry;
+
+                                                       for (entry = replication_mode_options; entry && entry->name; entry++)
+                                                       {
+                                                               if (strcmp(rplModeStr, entry->name) == 0)
+                                                               {
+                                                                       rplMode = entry->val;
+                                                                       break;
+                                                               }
+                                                       }
+                                                       if (entry == NULL || entry->name == NULL)
+                                                               ereport(FATAL,
+                                                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                                errmsg("invalid replication mode: %s", rplModeStr)));
+                                               }
+                                               MyWalSnd->rplMode = rplMode;
+
+                                               /* Send a CopyXLogResponse message, and start streaming */
+                                               pq_beginmessage(&buf, 'W');
+                                               pq_endmessage(&buf);
+                                               pq_flush();
+
+                                               /*
+                                                * Initialize positions to the received one, then the
+                                                * xlog records begin to be shipped from that position
+                                                */
+                                               sentPtr = ackdPtr = recptr;
+
+                                               /* break out of the loop */
+                                               replication_started = true;
+                                       }
+                                       else
+                                       {
+                                               ereport(FATAL,
+                                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                errmsg("invalid standby query string: %s", query_string)));
+                                       }
+                                       break;
                                }
-                               else if (sscanf(query_string, "START_REPLICATION %X/%X",
-                                                               &recptr.xlogid, &recptr.xrecoff) == 2)
-                               {
-                                       StringInfoData  buf;
 
-                                       /* Send a CopyOutResponse message, and start streaming */
-                                       pq_beginmessage(&buf, 'H');
-                                       pq_sendbyte(&buf, 0);
-                                       pq_sendint(&buf, 0, 2);
-                                       pq_endmessage(&buf);
-
-                                       /*
-                                        * Initialize position to the received one, then
-                                        * the xlog records begin to be shipped from that position
-                                        */
-                                       sentPtr = recptr;
-
-                                       /* break out of the loop */
-                                       replication_started = true;
-                               }
-                               else
-                               {
-                                       ereport(FATAL,
-                                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                                        errmsg("invalid standby query string: %s", query_string)));
-                               }
-                               break;
-                       }
-
-                       /* 'X' means that the standby is closing the connection */
                        case 'X':
+                               /* standby is closing the connection */
                                proc_exit(0);
 
                        case EOF:
-                               ereport(ERROR,
+                               /* standby disconnected unexpectedly */
+                               ereport(COMMERROR,
                                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                                 errmsg("unexpected EOF on standby connection")));
+                               proc_exit(0);
 
                        default:
                                ereport(FATAL,
@@ -304,78 +385,143 @@ WalSndHandshake(void)
 }
 
 /*
- * Check if the remote end has closed the connection.
+ * Process messages received from the standby.
+ *
+ * ereports on error.
  */
 static void
-CheckClosedConnection(void)
+ProcessStreamMsgs(StringInfo inMsg)
 {
-       unsigned char firstchar;
-       int r;
+       bool    acked = false;
 
-       r = pq_getbyte_if_available(&firstchar);
-       if (r < 0)
+       /* Loop to process successive complete messages available */
+       for (;;)
        {
-               /* no data available */
-               if (errno == EAGAIN || errno == EWOULDBLOCK)
-                       return;
+               unsigned char firstchar;
+               int                     r;
 
-               /*
-                * Ok if interrupted, though it shouldn't really happen with
-                * a non-blocking operation.
-                */
-               if (errno == EINTR)
-                       return;
+               r = pq_getbyte_if_available(&firstchar);
+               if (r < 0)
+               {
+                       /* unexpected error or EOF */
+                       ereport(COMMERROR,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("unexpected EOF on standby connection")));
+                       proc_exit(0);
+               }
+               if (r == 0)
+               {
+                       /* no data available without blocking */
+                       break;
+               }
 
-               ereport(COMMERROR,
-                               (errcode_for_socket_access(),
-                                errmsg("could not receive data from client: %m")));
-       }
-       if (r == 0)
-       {
-               /* standby disconnected unexpectedly */
-               ereport(ERROR,
-                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                errmsg("unexpected EOF on standby connection")));
+               /* Handle the very limited subset of commands expected in this phase */
+               switch (firstchar)
+               {
+                       case 'd':       /* CopyData message */
+                       {
+                               unsigned char   rpltype;
+
+                               /*
+                                * Read the message contents. This is expected to be done without
+                                * blocking because we've been able to get message type code.
+                                */
+                               if (pq_getmessage(inMsg, 0))
+                                       proc_exit(0);           /* suitable message already logged */
+
+                               /* Read the replication message type from CopyData message */
+                               rpltype = pq_getmsgbyte(inMsg);
+                               switch (rpltype)
+                               {
+                                       case 'l':
+                                       {
+                                               WalAckMessageData  *msgdata;
+
+                                               msgdata = (WalAckMessageData *) pq_getmsgbytes(inMsg, sizeof(WalAckMessageData));
+
+                                               /*
+                                                * Update local status.
+                                                *
+                                                * The ackd ptr received from standby should not
+                                                * go backwards.
+                                                */
+                                               if (XLByteLE(ackdPtr, msgdata->ackEnd))
+                                                       ackdPtr = msgdata->ackEnd;
+                                               else
+                                                       ereport(FATAL,
+                                                                       (errmsg("replication completion location went back from "
+                                                                                       "%X/%X to %X/%X",
+                                                                                       ackdPtr.xlogid, ackdPtr.xrecoff,
+                                                                                       msgdata->ackEnd.xlogid, msgdata->ackEnd.xrecoff)));
+
+                                               acked = true;   /* also need to update shared position */
+                                               break;
+                                       }
+                                       default:
+                                               ereport(FATAL,
+                                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                errmsg("invalid replication message type %d",
+                                                                               rpltype)));
+                               }
+                               break;
+                       }
+
+                       /*
+                        * 'X' means that the standby is closing down the socket.
+                        */
+                       case 'X':
+                               proc_exit(0);
+
+                       default:
+                               ereport(FATAL,
+                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                errmsg("invalid standby message type %d",
+                                                               firstchar)));
+               }
        }
 
-       /* Handle the very limited subset of commands expected in this phase */
-       switch (firstchar)
+       if (acked)
        {
-               /*
-                * 'X' means that the standby is closing down the socket. EOF means
-                * unexpected loss of standby connection. Either way, perform normal
-                * shutdown.
-                */
-               case 'X':
-                       proc_exit(0);
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
 
-               default:
-                       ereport(FATAL,
-                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg("invalid standby closing message type %d",
-                                                       firstchar)));
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->ackdPtr = ackdPtr;
+               SpinLockRelease(&walsnd->mutex);
        }
+
+       /* Wake up the backends that this walsender had been blocking */
+       WakeupWalSndWaiters(ackdPtr);
 }
 
 /* Main loop of walsender process */
 static int
 WalSndLoop(void)
 {
-       StringInfoData output_message;
+       StringInfoData  input_message;
+       bool            caughtup = false;
+       bool            pending = false;
+
+       initStringInfo(&input_message);
 
-       initStringInfo(&output_message);
+       /*
+        * Allocate buffer that will be used for each output message.  We do this
+        * just once to reduce palloc overhead.  The buffer must be made large
+        * enough for maximum-sized messages.
+        */
+       WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+       WalSndOutHead = WalSndOutTail = 0;
 
-       /* Loop forever */
+       /* Loop forever, unless we get an error */
        for (;;)
        {
-               int remain;     /* remaining time (ms) */
-
                /*
                 * Emergency bailout if postmaster has died.  This is to avoid the
                 * necessity for manual cleanup of all postmaster children.
                 */
                if (!PostmasterIsAlive(true))
                        exit(1);
+
                /* Process any requests or signals received recently */
                if (got_SIGHUP)
                {
@@ -389,8 +535,10 @@ WalSndLoop(void)
                 */
                if (ready_to_stop)
                {
-                       XLogSend(&output_message);
-                       shutdown_requested = true;
+                       if (!XLogSend(&caughtup, &pending))
+                               break;
+                       if (caughtup && !pending)
+                               shutdown_requested = true;
                }
 
                /* Normal exit from the walsender is here */
@@ -404,55 +552,103 @@ WalSndLoop(void)
                }
 
                /*
-                * Nap for the configured time or until a message arrives.
-                *
-                * On some platforms, signals won't interrupt the sleep.  To ensure we
-                * respond reasonably promptly when someone signals us, break down the
-                * sleep into NAPTIME_PER_CYCLE (ms) increments, and check for
-                * interrupts after each nap.
+                * If we had sent all accumulated WAL in last round or could not
+                * flush pending WAL in output buffer because the socket was not
+                * writable, nap for the configured time before retrying.
                 */
-               remain = WalSndDelay;
-               while (remain > 0)
+               if (caughtup || pending)
                {
-                       if (got_SIGHUP || shutdown_requested || ready_to_stop)
-                               break;
-
                        /*
-                        * Check to see whether a message from the standby or an interrupt
-                        * from other processes has arrived.
+                        * Even if we wrote all the WAL that was available when we started
+                        * sending, more might have arrived while we were sending this
+                        * batch. We had the latch set while sending, so we have not
+                        * received any signals from that time. Let's arm the latch
+                        * again, and after that check that we're still up-to-date.
                         */
-                       pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
-                       CheckClosedConnection();
+                       ResetLatch(&MyWalSnd->latch);
 
-                       remain -= NAPTIME_PER_CYCLE;
-               }
+                       if (!XLogSend(&caughtup, &pending))
+                               break;
+                       if ((caughtup || pending) && !got_SIGHUP && !ready_to_stop &&
+                               !shutdown_requested)
+                       {
+                               bool            check_timeout;
+                               long            sleeptime;
+                               int                     res;
+
+                               /*
+                                * XXX: We don't really need the periodic wakeups anymore,
+                                * WaitLatchOrSocket should reliably wake up as soon as
+                                * something interesting happens.
+                                */
+
+                               /*
+                                * Check for replication timeout if it's enabled and we need
+                                * to wait until the socket has become writable to flush
+                                * pending WAL in output buffer or until the Ack message
+                                * from the standby has become available.
+                                */
+                               if (replication_timeout > 0 &&
+                                       (pending ||
+                                        (rplMode != REPLICATION_MODE_ASYNC &&
+                                         XLByteLT(ackdPtr, sentPtr))))
+                               {
+                                       sleeptime = replication_timeout;
+                                       check_timeout = true;
+                               }
+                               else
+                               {
+                                       sleeptime = WalSndDelay;
+                                       check_timeout = false;
+                               }
 
-               /* Attempt to send the log once every loop */
-               if (!XLogSend(&output_message))
-                       goto eof;
-       }
+                               /* Sleep */
+                               res = WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+                                                                               true, (WalSndOutTail > 0),
+                                                                               sleeptime * 1000L);
 
-       /* can't get here because the above loop never exits */
-       return 1;
+                               if (res == 0 && check_timeout)
+                               {
+                                       /*
+                                        * Since typically expiration of replication timeout means
+                                        * communication problem, we don't send the error message
+                                        * to the standby.
+                                        */
+                                       ereport(COMMERROR,
+                                                       (errmsg("terminating walsender process due to replication timeout")));
+                                       break;
+                               }
+                       }
+
+                       /* Process messages received from the standby */
+                       ProcessStreamMsgs(&input_message);
+               }
+               else
+               {
+                       /* Attempt to send the log once every loop */
+                       if (!XLogSend(&caughtup, &pending))
+                               break;
+               }
+       }
 
-eof:
        /*
-        * Reset whereToSendOutput to prevent ereport from attempting
-        * to send any more messages to the standby.
+        * Get here on send failure.  Clean up and exit.
+        *
+        * Reset whereToSendOutput to prevent ereport from attempting to send any
+        * more messages to the standby.
         */
        if (whereToSendOutput == DestRemote)
                whereToSendOutput = DestNone;
 
        proc_exit(0);
-       return 1;               /* keep the compiler quiet */
+       return 1;                                       /* keep the compiler quiet */
 }
 
 /* Initialize a per-walsender data structure for this walsender process */
 static void
 InitWalSnd(void)
 {
-       /* use volatile pointer to prevent code rearrangement */
-       int             i;
+       int                     i;
 
        /*
         * WalSndCtl should be set up already (we inherit this by fork() or
@@ -465,8 +661,9 @@ InitWalSnd(void)
         * Find a free walsender slot and reserve it. If this fails, we must be
         * out of WalSnd structures.
         */
-       for (i = 0; i < MaxWalSenders; i++)
+       for (i = 0; i < max_wal_senders; i++)
        {
+               /* use volatile pointer to prevent code rearrangement */
                volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
 
                SpinLockAcquire(&walsnd->mutex);
@@ -478,18 +675,26 @@ InitWalSnd(void)
                }
                else
                {
-                       /* found */
-                       MyWalSnd = (WalSnd *) walsnd;
+                       /*
+                        * Found a free slot. Reserve it for us.
+                        */
                        walsnd->pid = MyProcPid;
-                       MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
+                       MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+                       MemSet(&walsnd->ackdPtr, 0, sizeof(XLogRecPtr));
                        SpinLockRelease(&walsnd->mutex);
+                       /* don't need the lock anymore */
+                       OwnLatch((Latch *) &walsnd->latch);
+                       MyWalSnd = (WalSnd *) walsnd;
+
                        break;
                }
        }
        if (MyWalSnd == NULL)
                ereport(FATAL,
                                (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
-                                errmsg("sorry, too many standbys already")));
+                                errmsg("number of requested standby connections "
+                                               "exceeds max_wal_senders (currently %d)",
+                                               max_wal_senders)));
 
        /* Arrange to clean up at walsender exit */
        on_shmem_exit(WalSndKill, 0);
@@ -501,11 +706,16 @@ WalSndKill(int code, Datum arg)
 {
        Assert(MyWalSnd != NULL);
 
+       /* Wake up the backends that this walsender had been blocking */
+       MyWalSnd->rplMode = REPLICATION_MODE_ASYNC;
+       WakeupWalSndWaiters(GetOldestAckdPtr());
+
        /*
         * Mark WalSnd struct no longer in use. Assume that no lock is required
         * for this.
         */
        MyWalSnd->pid = 0;
+       DisownLatch(&MyWalSnd->latch);
 
        /* WalSnd struct isn't mine anymore */
        MyWalSnd = NULL;
@@ -513,17 +723,25 @@ WalSndKill(int code, Datum arg)
 
 /*
  * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
  */
-void
+static void
 XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 {
-       char path[MAXPGPATH];
-       uint32 startoff;
+       XLogRecPtr      startRecPtr = recptr;
+       char            path[MAXPGPATH];
+       uint32          lastRemovedLog;
+       uint32          lastRemovedSeg;
+       uint32          log;
+       uint32          seg;
 
        while (nbytes > 0)
        {
-               int segbytes;
-               int readbytes;
+               uint32          startoff;
+               int                     segbytes;
+               int                     readbytes;
 
                startoff = recptr.xrecoff % XLogSegSize;
 
@@ -538,10 +756,28 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 
                        sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
                        if (sendFile < 0)
-                               ereport(FATAL, /* XXX: Why FATAL? */
-                                               (errcode_for_file_access(),
-                                                errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
-                                                               path, sendId, sendSeg)));
+                       {
+                               /*
+                                * If the file is not found, assume it's because the standby
+                                * asked for a too old WAL segment that has already been
+                                * removed or recycled.
+                                */
+                               if (errno == ENOENT)
+                               {
+                                       char            filename[MAXFNAMELEN];
+
+                                       XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
+                                       ereport(ERROR,
+                                                       (errcode_for_file_access(),
+                                                        errmsg("requested WAL segment %s has already been removed",
+                                                                       filename)));
+                               }
+                               else
+                                       ereport(ERROR,
+                                                       (errcode_for_file_access(),
+                                                        errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
+                                                                       path, sendId, sendSeg)));
+                       }
                        sendOff = 0;
                }
 
@@ -549,7 +785,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                if (sendOff != startoff)
                {
                        if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-                               ereport(FATAL,
+                               ereport(ERROR,
                                                (errcode_for_file_access(),
                                                 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
                                                                sendId, sendSeg, startoff)));
@@ -564,11 +800,11 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 
                readbytes = read(sendFile, buf, segbytes);
                if (readbytes <= 0)
-                       ereport(FATAL,
+                       ereport(ERROR,
                                        (errcode_for_file_access(),
-                                        errmsg("could not read from log file %u, segment %u, offset %u, "
-                                                       "length %lu: %m",
-                                                       sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+                       errmsg("could not read from log file %u, segment %u, offset %u, "
+                                  "length %lu: %m",
+                                  sendId, sendSeg, sendOff, (unsigned long) segbytes)));
 
                /* Update state for read */
                XLByteAdvance(recptr, readbytes);
@@ -577,117 +813,215 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                nbytes -= readbytes;
                buf += readbytes;
        }
+
+       /*
+        * After reading into the buffer, check that what we read was valid. We do
+        * this after reading, because even though the segment was present when we
+        * opened it, it might get recycled or removed while we read it. The
+        * read() succeeds in that case, but the data we tried to read might
+        * already have been overwritten with new WAL records.
+        */
+       XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
+       XLByteToSeg(startRecPtr, log, seg);
+       if (log < lastRemovedLog ||
+               (log == lastRemovedLog && seg <= lastRemovedSeg))
+       {
+               char            filename[MAXFNAMELEN];
+
+               XLogFileName(filename, ThisTimeLineID, log, seg);
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("requested WAL segment %s has already been removed",
+                                               filename)));
+       }
 }
 
 /*
- * Read all WAL that's been written (and flushed) since last cycle, and send
- * it to client.
+ * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
+ * but not yet sent to the client, and send it.
+ *
+ * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
+ * *caughtup is set to false.
+ *
+ * If there is pending WAL in output buffer, *pending is set to true,
+ * otherwise *pending is set to false.
  *
  * Returns true if OK, false if trouble.
  */
 static bool
-XLogSend(StringInfo outMsg)
+XLogSend(bool *caughtup, bool *pending)
 {
        XLogRecPtr      SendRqstPtr;
-       char    activitymsg[50];
-       /* use volatile pointer to prevent code rearrangement */
-       volatile WalSnd *walsnd = MyWalSnd;
+       XLogRecPtr      startptr;
+       static XLogRecPtr       endptr;
+       Size            nbytes;
+       uint32          n32;
+       int                     res;
+       WalDataMessageHeader msghdr;
+
+       /* Attempt to flush pending WAL in output buffer */
+       if (*pending)
+       {
+               if (WalSndOutHead != WalSndOutTail)
+               {
+                       res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
+                                                                                 WalSndOutTail - WalSndOutHead);
+                       if (res == EOF)
+                               return false;
+                       WalSndOutHead += res;
+                       if (WalSndOutHead != WalSndOutTail)
+                               return true;
+               }
+
+               res = pq_flush_if_writable();
+               if (res == EOF)
+                       return false;
+               if (res == 0)
+                       return true;
+
+               goto updt;
+       }
 
        /*
-        * Invalid position means that we have not yet received the initial
-        * CopyData message from the slave that indicates where to start the
-        * streaming.
+        * Attempt to send all data that's already been written out and fsync'd to
+        * disk.  We cannot go further than what's been written out given the
+        * current implementation of XLogRead().  And in any case it's unsafe to
+        * send WAL that is not securely down to disk on the master: if the master
+        * subsequently crashes and restarts, slaves must not have applied any WAL
+        * that gets lost on the master.
         */
-       if (sentPtr.xlogid == 0 &&
-               sentPtr.xrecoff == 0)
-               return true;
-
-       /* Attempt to send all records flushed to the disk already */
-       SendRqstPtr = GetWriteRecPtr();
+       SendRqstPtr = GetFlushRecPtr();
 
        /* Quick exit if nothing to do */
-       if (!XLByteLT(sentPtr, SendRqstPtr))
+       if (XLByteLE(SendRqstPtr, sentPtr))
+       {
+               *caughtup = true;
                return true;
+       }
 
        /*
-        * We gather multiple records together by issuing just one read() of
-        * a suitable size, and send them as one CopyData message. Repeat
-        * until we've sent everything we can.
+        * Figure out how much to send in one message. If there's no more than
+        * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
+        * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
+        *
+        * The rounding is not only for performance reasons. Walreceiver relies on
+        * the fact that we never split a WAL record across two messages. Since a
+        * long WAL record is split at page boundary into continuation records,
+        * page boundary is always a safe cut-off point. We also assume that
+        * SendRqstPtr never points to the middle of a WAL record.
         */
-       while (XLByteLT(sentPtr, SendRqstPtr))
+       startptr = sentPtr;
+       if (startptr.xrecoff >= XLogFileSize)
        {
-               XLogRecPtr startptr;
-               XLogRecPtr endptr;
-               Size    nbytes;
-
                /*
-                * Figure out how much to send in one message. If there's less than
-                * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
-                * MAX_SEND_SIZE bytes, but round to page boundary.
-                *
-                * The rounding is not only for performance reasons. Walreceiver
-                * relies on the fact that we never split a WAL record across two
-                * messages. Since a long WAL record is split at page boundary into
-                * continuation records, page boundary is alwayssafe cut-off point.
-                * We also assume that SendRqstPtr never points in the middle of a
-                * WAL record.
+                * crossing a logid boundary, skip the non-existent last log segment
+                * in previous logical log file.
                 */
-               startptr = sentPtr;
-               endptr = startptr;
-               XLByteAdvance(endptr, MAX_SEND_SIZE);
+               startptr.xlogid += 1;
+               startptr.xrecoff = 0;
+       }
+
+       endptr = startptr;
+       XLByteAdvance(endptr, MAX_SEND_SIZE);
+       if (endptr.xlogid != startptr.xlogid)
+       {
+               /* Don't cross a logfile boundary within one message */
+               Assert(endptr.xlogid == startptr.xlogid + 1);
+               endptr.xlogid = startptr.xlogid;
+               endptr.xrecoff = XLogFileSize;
+       }
+
+       /* if we went beyond SendRqstPtr, back off */
+       if (XLByteLE(SendRqstPtr, endptr))
+       {
+               endptr = SendRqstPtr;
+               *caughtup = true;
+       }
+       else
+       {
                /* round down to page boundary. */
                endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
-               /* if we went beyond SendRqstPtr, back off */
-               if (XLByteLT(SendRqstPtr, endptr))
-                       endptr = SendRqstPtr;
+               *caughtup = false;
+       }
 
-               /*
-                * OK to read and send the slice.
-                *
-                * We don't need to convert the xlogid/xrecoff from host byte order
-                * to network byte order because the both server can be expected to
-                * have the same byte order. If they have different byte order, we
-                * don't reach here.
-                */
-               pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
+       nbytes = endptr.xrecoff - startptr.xrecoff;
+       Assert(nbytes <= MAX_SEND_SIZE);
 
-               if (endptr.xlogid != startptr.xlogid)
-               {
-                       Assert(endptr.xlogid == startptr.xlogid + 1);
-                       nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
-               }
-               else
-                       nbytes = endptr.xrecoff - startptr.xrecoff;
+       /*
+        * OK to read and send the slice.
+        */
+       WalSndOutBuffer[0] = 'd';
+       WalSndOutBuffer[5] = 'w';
+       WalSndOutHead = 0;
+       WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
 
-               sentPtr = endptr;
+       n32 = htonl((uint32) WalSndOutTail - 1);
+       memcpy(WalSndOutBuffer + 1, &n32, 4);
 
-               /*
-                * Read the log directly into the output buffer to prevent
-                * extra memcpy calls.
-                */
-               enlargeStringInfo(outMsg, nbytes);
+       /*
+        * Read the log directly into the output buffer to avoid extra memcpy
+        * calls.
+        */
+       XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
 
-               XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
-               outMsg->len += nbytes;
-               outMsg->data[outMsg->len] = '\0';
+       /*
+        * We fill the message header last so that the send timestamp is taken as
+        * late as possible.
+        */
+       msghdr.dataStart = startptr;
+       msghdr.walEnd = SendRqstPtr;
+       msghdr.sendTime = GetCurrentTimestamp();
 
-               pq_putmessage('d', outMsg->data, outMsg->len);
-               resetStringInfo(outMsg);
+       memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
+
+       res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
+       if (res == EOF)
+               return false;
+
+       WalSndOutHead = res;
+       if (WalSndOutHead != WalSndOutTail)
+       {
+               *caughtup = false;
+               *pending = true;
+               return true;
+       }
+
+       /* Flush pending output to the client */
+       res = pq_flush_if_writable();
+       if (res == EOF)
+               return false;
+       if (res == 0)
+       {
+               *caughtup = false;
+               *pending = true;
+               return true;
        }
 
+updt:
+       WalSndOutHead = WalSndOutTail = 0;
+       *pending = false;
+
+       sentPtr = endptr;
+
        /* Update shared memory status */
-       SpinLockAcquire(&walsnd->mutex);
-       walsnd->sentPtr = sentPtr;
-       SpinLockRelease(&walsnd->mutex);
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
 
-       /* Flush pending output */
-       if (pq_flush())
-               return false;
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->sentPtr = sentPtr;
+               SpinLockRelease(&walsnd->mutex);
+       }
 
        /* Report progress of XLOG streaming in PS display */
-       snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
-                        sentPtr.xlogid, sentPtr.xrecoff);
-       set_ps_display(activitymsg, false);
+       if (update_process_title)
+       {
+               char            activitymsg[50];
+
+               snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+                                sentPtr.xlogid, sentPtr.xrecoff);
+               set_ps_display(activitymsg, false);
+       }
 
        return true;
 }
@@ -697,6 +1031,8 @@ static void
 WalSndSigHupHandler(SIGNAL_ARGS)
 {
        got_SIGHUP = true;
+       if (MyWalSnd)
+               SetLatch(&MyWalSnd->latch);
 }
 
 /* SIGTERM: set flag to shut down */
@@ -704,6 +1040,8 @@ static void
 WalSndShutdownHandler(SIGNAL_ARGS)
 {
        shutdown_requested = true;
+       if (MyWalSnd)
+               SetLatch(&MyWalSnd->latch);
 }
 
 /*
@@ -732,17 +1070,26 @@ WalSndQuickDieHandler(SIGNAL_ARGS)
         * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
         * backend.  This is necessary precisely because we don't clean up our
         * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
-        * should ensure the postmaster sees this as a crash, too, but no harm
-        * in being doubly sure.)
+        * should ensure the postmaster sees this as a crash, too, but no harm in
+        * being doubly sure.)
         */
        exit(2);
 }
 
+/* SIGUSR1: set flag to send WAL records */
+static void
+WalSndXLogSendHandler(SIGNAL_ARGS)
+{
+       latch_sigusr1_handler();
+}
+
 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
 static void
 WalSndLastCycleHandler(SIGNAL_ARGS)
 {
        ready_to_stop = true;
+       if (MyWalSnd)
+               SetLatch(&MyWalSnd->latch);
 }
 
 /* Set up signal handlers */
@@ -750,14 +1097,16 @@ void
 WalSndSignals(void)
 {
        /* Set up signal handlers */
-       pqsignal(SIGHUP, WalSndSigHupHandler);  /* set flag to read config file */
+       pqsignal(SIGHUP, WalSndSigHupHandler);          /* set flag to read config
+                                                                                                * file */
        pqsignal(SIGINT, SIG_IGN);      /* not used */
        pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
        pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
        pqsignal(SIGALRM, SIG_IGN);
        pqsignal(SIGPIPE, SIG_IGN);
-       pqsignal(SIGUSR1, SIG_IGN);     /* not used */
-       pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and shutdown */
+       pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
+       pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
+                                                                                                * shutdown */
 
        /* Reset some signals that are accepted by postmaster but not here */
        pqsignal(SIGCHLD, SIG_DFL);
@@ -771,10 +1120,17 @@ WalSndSignals(void)
 Size
 WalSndShmemSize(void)
 {
-       Size size = 0;
+       Size            size = 0;
 
        size = offsetof(WalSndCtlData, walsnds);
-       size = add_size(size, mul_size(MaxWalSenders, sizeof(WalSnd)));
+       size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
+
+       /*
+        * If replication is enabled, we have a data structure called
+        * WalSndWaiters, created in shared memory.
+        */
+       if (max_wal_senders > 0)
+               size = add_size(size, mul_size(MaxBackends, sizeof(WalSndWaiter)));
 
        return size;
 }
@@ -783,53 +1139,245 @@ WalSndShmemSize(void)
 void
 WalSndShmemInit(void)
 {
-       bool    found;
-       int             i;
+       bool            found;
+       int                     i;
+       Size            size = add_size(offsetof(WalSndCtlData, walsnds),
+                                                               mul_size(max_wal_senders, sizeof(WalSnd)));
 
        WalSndCtl = (WalSndCtlData *)
-               ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
+               ShmemInitStruct("Wal Sender Ctl", size, &found);
 
-       if (WalSndCtl == NULL)
-               ereport(FATAL,
-                               (errcode(ERRCODE_OUT_OF_MEMORY),
-                                errmsg("not enough shared memory for walsender")));
-       if (found)
-               return;                                 /* already initialized */
+       if (!found)
+       {
+               /* First time through, so initialize */
+               MemSet(WalSndCtl, 0, size);
 
-       /* Initialize the data structures */
-       MemSet(WalSndCtl, 0, WalSndShmemSize());
+               for (i = 0; i < max_wal_senders; i++)
+               {
+                       WalSnd     *walsnd = &WalSndCtl->walsnds[i];
 
-       for (i = 0; i < MaxWalSenders; i++)
+                       SpinLockInit(&walsnd->mutex);
+                       InitSharedLatch(&walsnd->latch);
+               }
+       }
+
+       /* Create or attach to the WalSndWaiters array too, if needed */
+       if (max_wal_senders > 0)
        {
-               WalSnd  *walsnd = &WalSndCtl->walsnds[i];
-               SpinLockInit(&walsnd->mutex);
+               WalSndWaiters = (WalSndWaiter *)
+                       ShmemInitStruct("WalSndWaiters",
+                                                       mul_size(MaxBackends, sizeof(WalSndWaiter)),
+                                                       &found);
+               WalSndCtl->maxWaiters = MaxBackends;
        }
 }
 
+/* Wake up all walsenders */
+void
+WalSndWakeup(void)
+{
+       int             i;
+
+       for (i = 0; i < max_wal_senders; i++)
+               SetLatch(&WalSndCtl->walsnds[i].latch);
+}
+
 /*
- * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
- * if none.
+ * Ensure that replication has been completed up to the given position.
  */
-XLogRecPtr
-GetOldestWALSendPointer(void)
+void
+WaitXLogSend(XLogRecPtr record)
 {
-       XLogRecPtr oldest = {0, 0};
+       int             i;
+       bool    mustwait = false;
+
+       Assert(max_wal_senders > 0);
+
+       for (i = 0; i < max_wal_senders; i++)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+               XLogRecPtr              recptr;
+
+               /* Don't need to wait for asynchronous walsender */
+               if (walsnd->pid == 0 ||
+                       walsnd->rplMode <= REPLICATION_MODE_ASYNC)
+                       continue;
+
+               SpinLockAcquire(&walsnd->mutex);
+               recptr = walsnd->ackdPtr;
+               SpinLockRelease(&walsnd->mutex);
+
+               if (recptr.xlogid == 0 && recptr.xrecoff == 0)
+                       continue;
+
+               /* Quick exit if already known replicated */
+               if (XLByteLE(record, recptr))
+                       return;
+
+               mustwait = true;
+       }
+
+       /*
+        * Don't need to wait for replication if there is no synchronous
+        * standby
+        */
+       if (!mustwait)
+               return;
+
+       /*
+        * Register myself into the wait list and sleep until replication
+        * has been completed up to the given position and the walsender
+        * signals me.
+        *
+        * If replication has been completed up to the latest position
+        * before the registration, walsender might be unable to send the
+        * signal immediately. We must wake up the walsender after the
+        * registration.
+        */
+       ResetLatch(&MyProc->latch);
+       RegisterWalSndWaiter(MyBackendId, record, &MyProc->latch);
+       WalSndWakeup();
+
+       for (;;)
+       {
+               WaitLatch(&MyProc->latch, 1000000L);
+
+               /* If done already, we finish waiting */
+               if (replication_done)
+               {
+                       replication_done = false;
+                       return;
+               }
+       }
+}
+
+/*
+ * Register the given backend into the wait list.
+ */
+static void
+RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record, Latch *latch)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSndCtlData  *walsndctl = WalSndCtl;
+       int             i;
+       int             count = 0;
+
+       LWLockAcquire(WalSndWaiterLock, LW_EXCLUSIVE);
+
+       /* Out of slots. This should not happen. */
+       if (walsndctl->numWaiters + 1 > walsndctl->maxWaiters)
+               elog(PANIC, "out of replication waiters slots");
+
+       /*
+        * The given position is expected to be relatively new in the
+        * wait list. Since the entries in the list are sorted in an
+        * increasing order of XLogRecPtr, we can shorten the time it
+        * takes to find an insert slot by scanning the list backwards.
+        */
+       for (i = walsndctl->numWaiters; i > 0; i--)
+       {
+               if (XLByteLE(WalSndWaiters[i - 1].record, record))
+                       break;
+               count++;
+       }
+
+       /* Shuffle the list if needed */
+       if (count > 0)
+               memmove(&WalSndWaiters[i + 1], &WalSndWaiters[i],
+                               count * sizeof(WalSndWaiter));
+
+       WalSndWaiters[i].backendId = backendId;
+       WalSndWaiters[i].record = record;
+       WalSndWaiters[i].latch = latch;
+       walsndctl->numWaiters++;
+
+       LWLockRelease(WalSndWaiterLock);
+}
+
+/*
+ * Wake up the backends waiting until replication has been completed
+ * up to the position older than or equal to the given one.
+ *
+ * Wake up all waiters if InvalidXLogRecPtr is given.
+   */
+static void
+WakeupWalSndWaiters(XLogRecPtr record)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSndCtlData  *walsndctl = WalSndCtl;
+       int             i;
+       int             count = 0;
+       bool    all_wakeup = (record.xlogid == 0 && record.xrecoff == 0);
+
+       LWLockAcquire(WalSndWaiterLock, LW_EXCLUSIVE);
+
+       for (i = 0; i < walsndctl->numWaiters; i++)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSndWaiter  *waiter = &WalSndWaiters[i];
+
+               if (all_wakeup || XLByteLE(waiter->record, record))
+               {
+                       SetProcLatch(waiter->latch, PROCSIG_REPLICATION_INTERRUPT,
+                                                waiter->backendId);
+                       count++;
+               }
+               else
+               {
+                       /*
+                        * If the backend waiting for the Ack position newer than
+                        * the given one is found, we don't need to search the wait
+                        * list any more. This is because the waiters in the list
+                        * are guaranteed to be sorted in an increasing order of
+                        * XLogRecPtr.
+                        */
+                       break;
+               }
+       }
+
+       /* If there are still some waiters, left-justify them in the list */
+       walsndctl->numWaiters -= count;
+       if (walsndctl->numWaiters > 0 && count > 0)
+               memmove(&WalSndWaiters[0], &WalSndWaiters[i],
+                               walsndctl->numWaiters * sizeof(WalSndWaiter));
+
+       LWLockRelease(WalSndWaiterLock);
+}
+
+/*
+ * Returns the oldest Ack position in synchronous walsenders. Or
+ * InvalidXLogRecPtr if none.
+ */
+static XLogRecPtr
+GetOldestAckdPtr(void)
+{
+       XLogRecPtr      oldest = {0, 0};
        int             i;
        bool    found = false;
 
-       for (i = 0; i < MaxWalSenders; i++)
+       for (i = 0; i < max_wal_senders; i++)
        {
                /* use volatile pointer to prevent code rearrangement */
-               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-               XLogRecPtr recptr;
+               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+               XLogRecPtr              recptr;
 
-               if (walsnd->pid == 0)
+               /*
+                * Ignore the Ack position that asynchronous walsender has
+                * since it has never received any Ack.
+                */
+               if (walsnd->pid == 0 ||
+                       walsnd->rplMode <= REPLICATION_MODE_ASYNC)
                        continue;
 
                SpinLockAcquire(&walsnd->mutex);
-               recptr = walsnd->sentPtr;
+               recptr = walsnd->ackdPtr;
                SpinLockRelease(&walsnd->mutex);
 
+               /*
+                * Ignore the Ack position that the walsender which has not
+                * received any Ack yet has.
+                */
                if (recptr.xlogid == 0 && recptr.xrecoff == 0)
                        continue;
 
@@ -839,3 +1387,12 @@ GetOldestWALSendPointer(void)
        }
        return oldest;
 }
+
+/*
+ * This is called when PROCSIG_REPLICATION_INTERRUPT is received.
+ */
+void
+HandleReplicationInterrupt(void)
+{
+       replication_done = true;
+}