OSDN Git Service

Merge branch 'pgrex90-base' into pgrex90
[pg-rex/syncrep.git] / src / backend / replication / walsender.c
index 66f08e1..a41365f 100644 (file)
@@ -48,6 +48,7 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
+#include "storage/proc.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -60,12 +61,28 @@ WalSndCtlData *WalSndCtl = NULL;
 /* My slot in the shared memory array */
 static WalSnd *MyWalSnd = NULL;
 
+/* Array of WalSndWaiter in shared memory */
+static WalSndWaiter  *WalSndWaiters;
+
 /* Global state */
 bool           am_walsender = false;           /* Am I a walsender process ? */
 
 /* User-settable parameters for walsender */
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
 int                    WalSndDelay = 200;      /* max sleep time between some actions */
+int                    replication_timeout = 0;        /* maximum time to wait for the Ack from the standby */
+char      *standby_fencing_command = NULL;     /* command to shoot the standby in the head */
+
+/*
+ * Buffer for WAL sending
+ *
+ * WalSndOutBuffer is a work area in which the output message is constructed.
+ * It's used in just so we can avoid re-palloc'ing the buffer on each cycle.
+ * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ */
+static char       *WalSndOutBuffer;
+static int             WalSndOutHead;          /* head of pending output */
+static int             WalSndOutTail;          /* tail of pending output */
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -82,11 +99,21 @@ static uint32 sendOff = 0;
  */
 static XLogRecPtr sentPtr = {0, 0};
 
+/*
+ * How far have we completed replication already? This is also
+ * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous
+ * replication case.
+ */
+static XLogRecPtr ackdPtr = {0, 0};
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t shutdown_requested = false;
 static volatile sig_atomic_t ready_to_stop = false;
 
+/* Flag set by signal handler of backends for replication */
+static volatile sig_atomic_t replication_done = false;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndShutdownHandler(SIGNAL_ARGS);
@@ -100,8 +127,14 @@ static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(char *msgbuf, bool *caughtup);
-static void CheckClosedConnection(void);
+static bool XLogSend(bool *caughtup, bool *pending);
+static void ProcessStreamMsgs(StringInfo inMsg);
+static void ExecuteStandbyFencingCommand(void);
+
+static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
+                                                                Latch *latch);
+static void WakeupWalSndWaiters(XLogRecPtr record);
+static XLogRecPtr GetOldestAckdPtr(void);
 
 
 /* Main entry point for walsender process */
@@ -205,6 +238,8 @@ WalSndHandshake(void)
                /* Handle the very limited subset of commands expected in this phase */
                switch (firstchar)
                {
+                       char            modestr[6];
+
                        case 'Q':                       /* Query message */
                                {
                                        const char *query_string;
@@ -265,10 +300,11 @@ WalSndHandshake(void)
                                                ReadyForQuery(DestRemote);
                                                /* ReadyForQuery did pq_flush for us */
                                        }
-                                       else if (sscanf(query_string, "START_REPLICATION %X/%X",
-                                                                       &recptr.xlogid, &recptr.xrecoff) == 2)
+                                       else if (sscanf(query_string, "START_REPLICATION %X/%X MODE %5s",
+                                                                       &recptr.xlogid, &recptr.xrecoff, modestr) == 3)
                                        {
                                                StringInfoData buf;
+                                               int                     mode;
 
                                                /*
                                                 * Check that we're logging enough information in the
@@ -287,6 +323,35 @@ WalSndHandshake(void)
                                                                        (errcode(ERRCODE_CANNOT_CONNECT_NOW),
                                                                         errmsg("standby connections not allowed because wal_level=minimal")));
 
+                                               /* Verify that the specified replication mode is valid */
+                                               {
+                                                       const struct config_enum_entry *entry;
+
+                                                       for (entry = replication_mode_options; entry && entry->name; entry++)
+                                                       {
+                                                               if (strcmp(modestr, entry->name) == 0)
+                                                               {
+                                                                       mode = entry->val;
+                                                                       break;
+                                                               }
+                                                       }
+                                                       if (entry == NULL || entry->name == NULL)
+                                                               ereport(FATAL,
+                                                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                                errmsg("invalid replication mode: %s", modestr)));
+                                               }
+
+                                               /* Change the state according to replication mode specified by standby */
+                                               {
+                                                       /* use volatile pointer to prevent code rearrangement */
+                                                       volatile WalSnd *walsnd = MyWalSnd;
+
+                                                       SpinLockAcquire(&walsnd->mutex);
+                                                       walsnd->walSndState = (mode == REPLICATION_MODE_ASYNC) ?
+                                                               WALSND_ASYNC : WALSND_CATCHUP;
+                                                       SpinLockRelease(&walsnd->mutex);
+                                               }
+
                                                /* Send a CopyBothResponse message, and start streaming */
                                                pq_beginmessage(&buf, 'W');
                                                pq_sendbyte(&buf, 0);
@@ -295,10 +360,10 @@ WalSndHandshake(void)
                                                pq_flush();
 
                                                /*
-                                                * Initialize position to the received one, then the
+                                                * Initialize positions to the received one, then the
                                                 * xlog records begin to be shipped from that position
                                                 */
-                                               sentPtr = recptr;
+                                               sentPtr = ackdPtr = recptr;
 
                                                /* break out of the loop */
                                                replication_started = true;
@@ -332,59 +397,133 @@ WalSndHandshake(void)
 }
 
 /*
- * Check if the remote end has closed the connection.
+ * Process messages received from the standby.
+ *
+ * ereports on error.
  */
 static void
-CheckClosedConnection(void)
+ProcessStreamMsgs(StringInfo inMsg)
 {
-       unsigned char firstchar;
-       int                     r;
+       bool    acked = false;
 
-       r = pq_getbyte_if_available(&firstchar);
-       if (r < 0)
-       {
-               /* unexpected error or EOF */
-               ereport(COMMERROR,
-                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                errmsg("unexpected EOF on standby connection")));
-               proc_exit(0);
-       }
-       if (r == 0)
+       /* Loop to process successive complete messages available */
+       for (;;)
        {
-               /* no data available without blocking */
-               return;
-       }
+               unsigned char firstchar;
+               int                     r;
+
+               r = pq_getbyte_if_available(&firstchar);
+               if (r < 0)
+               {
+                       /* unexpected error or EOF */
+                       ereport(COMMERROR,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("unexpected EOF on standby connection")));
+                       proc_exit(0);
+               }
+               if (r == 0)
+               {
+                       /* no data available without blocking */
+                       break;
+               }
+
+               /* Handle the very limited subset of commands expected in this phase */
+               switch (firstchar)
+               {
+                       case 'd':       /* CopyData message */
+                       {
+                               unsigned char   rpltype;
+
+                               /*
+                                * Read the message contents. This is expected to be done without
+                                * blocking because we've been able to get message type code.
+                                */
+                               if (pq_getmessage(inMsg, 0))
+                                       proc_exit(0);           /* suitable message already logged */
+
+                               /* Read the replication message type from CopyData message */
+                               rpltype = pq_getmsgbyte(inMsg);
+                               switch (rpltype)
+                               {
+                                       case 'l':
+                                       {
+                                               WalAckMessageData  *msgdata;
+
+                                               msgdata = (WalAckMessageData *) pq_getmsgbytes(inMsg, sizeof(WalAckMessageData));
+
+                                               /*
+                                                * Update local status.
+                                                *
+                                                * The ackd ptr received from standby should not
+                                                * go backwards.
+                                                */
+                                               if (XLByteLE(ackdPtr, msgdata->ackEnd))
+                                                       ackdPtr = msgdata->ackEnd;
+                                               else
+                                                       ereport(FATAL,
+                                                                       (errmsg("replication completion location went back from "
+                                                                                       "%X/%X to %X/%X",
+                                                                                       ackdPtr.xlogid, ackdPtr.xrecoff,
+                                                                                       msgdata->ackEnd.xlogid, msgdata->ackEnd.xrecoff)));
+
+                                               acked = true;   /* also need to update shared position */
+                                               break;
+                                       }
+                                       default:
+                                               ereport(FATAL,
+                                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                errmsg("invalid replication message type %d",
+                                                                               rpltype)));
+                               }
+                               break;
+                       }
 
-       /* Handle the very limited subset of commands expected in this phase */
-       switch (firstchar)
-       {
                        /*
                         * 'X' means that the standby is closing down the socket.
                         */
-               case 'X':
-                       proc_exit(0);
+                       case 'X':
+                               proc_exit(0);
 
-               default:
-                       ereport(FATAL,
-                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg("invalid standby closing message type %d",
-                                                       firstchar)));
+                       default:
+                               ereport(FATAL,
+                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                errmsg("invalid standby message type %d",
+                                                               firstchar)));
+               }
        }
+
+       if (acked)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->ackdPtr = ackdPtr;
+               SpinLockRelease(&walsnd->mutex);
+       }
+
+       /* Wake up the backends that this walsender had been blocking */
+       WakeupWalSndWaiters(ackdPtr);
 }
 
 /* Main loop of walsender process */
 static int
 WalSndLoop(void)
 {
-       char       *output_message;
+       StringInfoData  input_message;
        bool            caughtup = false;
+       bool            pending = false;
+       XLogRecPtr      switchptr = {0, 0};
+
+       initStringInfo(&input_message);
 
        /*
         * Allocate buffer that will be used for each output message.  We do this
         * just once to reduce palloc overhead.  The buffer must be made large
         * enough for maximum-sized messages.
         */
-       output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+       WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+       WalSndOutHead = WalSndOutTail = 0;
 
        /* Loop forever, unless we get an error */
        for (;;)
@@ -409,9 +548,9 @@ WalSndLoop(void)
                 */
                if (ready_to_stop)
                {
-                       if (!XLogSend(output_message, &caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
-                       if (caughtup)
+                       if (caughtup && !pending)
                                shutdown_requested = true;
                }
 
@@ -426,10 +565,11 @@ WalSndLoop(void)
                }
 
                /*
-                * If we had sent all accumulated WAL in last round, nap for the
-                * configured time before retrying.
+                * If we had sent all accumulated WAL in last round or could not
+                * flush pending WAL in output buffer because the socket was not
+                * writable, nap for the configured time before retrying.
                 */
-               if (caughtup)
+               if (caughtup || pending)
                {
                        /*
                         * Even if we wrote all the WAL that was available when we started
@@ -440,28 +580,119 @@ WalSndLoop(void)
                         */
                        ResetLatch(&MyWalSnd->latch);
 
-                       if (!XLogSend(output_message, &caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
-                       if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
+
+                       /*
+                        * If the standby has almost caught up with the primary, we change
+                        * the state to WALSND_PRESYNC and start making transactions wait
+                        * until their WAL has been replicated.
+                        *
+                        * No lock is required to get WalSnd->walSndState here since it can
+                        * be updated only by walsender.
+                        */
+                       if (MyWalSnd->walSndState == WALSND_CATCHUP && caughtup)
                        {
+                               /* use volatile pointer to prevent code rearrangement */
+                               volatile WalSnd *walsnd = MyWalSnd;
+
+                               SpinLockAcquire(&walsnd->mutex);
+                               walsnd->walSndState = WALSND_PRESYNC;
+                               SpinLockRelease(&walsnd->mutex);
+
+                               /*
+                                * switchptr indicates how far we must complete replication
+                                * before advertising that the standby has already been in
+                                * sync with the primary.
+                                */
+                               switchptr = GetFlushRecPtr();
+                       }
+
+                       if ((caughtup || pending) && !got_SIGHUP && !ready_to_stop &&
+                               !shutdown_requested)
+                       {
+                               bool            check_timeout;
+                               long            sleeptime;
+                               int                     res;
+
                                /*
                                 * XXX: We don't really need the periodic wakeups anymore,
                                 * WaitLatchOrSocket should reliably wake up as soon as
                                 * something interesting happens.
                                 */
 
+                               /*
+                                * Check for replication timeout if it's enabled and we need
+                                * to wait until the socket has become writable to flush
+                                * pending WAL in output buffer or until the Ack message
+                                * from the standby has become available.
+                                */
+                               if (replication_timeout > 0 &&
+                                       (pending ||
+                                        (MyWalSnd->walSndState >= WALSND_CATCHUP &&
+                                         XLByteLT(ackdPtr, sentPtr))))
+                               {
+                                       sleeptime = replication_timeout;
+                                       check_timeout = true;
+                               }
+                               else
+                               {
+                                       sleeptime = WalSndDelay;
+                                       check_timeout = false;
+                               }
+
                                /* Sleep */
-                               WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
-                                                                 WalSndDelay * 1000L);
+                               res = WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+                                                                               true, (WalSndOutTail > 0),
+                                                                               sleeptime * 1000L);
+
+                               if (res == 0 && check_timeout)
+                               {
+                                       /*
+                                        * Since typically expiration of replication timeout means
+                                        * communication problem, we don't send the error message
+                                        * to the standby.
+                                        */
+                                       ereport(COMMERROR,
+                                                       (errmsg("terminating walsender process due to replication timeout")));
+                                       break;
+                               }
                        }
 
-                       /* Check if the connection was closed */
-                       CheckClosedConnection();
+                       /* Process messages received from the standby */
+                       ProcessStreamMsgs(&input_message);
+
+                       /*
+                        * If the standby has caught up with the primary, we change
+                        * the state to WALSND_SYNC and inform the standby that it's
+                        * in sync with the primary. This state ensures that all the
+                        * transactions completed from a client's point of view have
+                        * been replicated to the standby.
+                        */
+                       if (MyWalSnd->walSndState == WALSND_PRESYNC &&
+                               XLByteLE(switchptr, ackdPtr) && !pending)
+                       {
+                               /* use volatile pointer to prevent code rearrangement */
+                               volatile WalSnd *walsnd = MyWalSnd;
+
+                               SpinLockAcquire(&walsnd->mutex);
+                               walsnd->walSndState = WALSND_SYNC;
+                               SpinLockRelease(&walsnd->mutex);
+
+                               /*
+                                * We can send a XLogCatchupComplete message without blocking
+                                * since it's guaranteed that there is no pending data in the
+                                * output buffer.
+                                */
+                               pq_putmessage('d', "c", 1);
+                               if (pq_flush())
+                                       break;
+                       }
                }
                else
                {
                        /* Attempt to send the log once every loop */
-                       if (!XLogSend(output_message, &caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
                }
        }
@@ -515,6 +746,8 @@ InitWalSnd(void)
                         */
                        walsnd->pid = MyProcPid;
                        MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+                       MemSet(&walsnd->ackdPtr, 0, sizeof(XLogRecPtr));
+                       walsnd->walSndState = WALSND_INIT;
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        OwnLatch((Latch *) &walsnd->latch);
@@ -538,9 +771,28 @@ InitWalSnd(void)
 static void
 WalSndKill(int code, Datum arg)
 {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSnd *walsnd = MyWalSnd;
+
        Assert(MyWalSnd != NULL);
 
        /*
+        * If replication was terminated for a reason other than the master
+        * server shutdown or emergency bailout (i.e., unexpected death of
+        * postmaster), we can expect this server can work standalone,
+        * so we call standby_fencing_command to shoot the standby server
+        * in the head if it's specified.
+        */
+       if (!ready_to_stop && PostmasterIsAlive(true))
+               ExecuteStandbyFencingCommand();
+
+       /* Wake up the backends that this walsender had been blocking */
+       SpinLockAcquire(&walsnd->mutex);
+       walsnd->walSndState = WALSND_INIT;
+       SpinLockRelease(&walsnd->mutex);
+       WakeupWalSndWaiters(GetOldestAckdPtr());
+
+       /*
         * Mark WalSnd struct no longer in use. Assume that no lock is required
         * for this.
         */
@@ -670,24 +922,48 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
  * but not yet sent to the client, and send it.
  *
- * msgbuf is a work area in which the output message is constructed.  It's
- * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
- * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
- *
  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
  * *caughtup is set to false.
  *
+ * If there is pending WAL in output buffer, *pending is set to true,
+ * otherwise *pending is set to false.
+ *
  * Returns true if OK, false if trouble.
  */
 static bool
-XLogSend(char *msgbuf, bool *caughtup)
+XLogSend(bool *caughtup, bool *pending)
 {
        XLogRecPtr      SendRqstPtr;
        XLogRecPtr      startptr;
-       XLogRecPtr      endptr;
+       static XLogRecPtr       endptr;
        Size            nbytes;
+       uint32          n32;
+       int                     res;
        WalDataMessageHeader msghdr;
 
+       /* Attempt to flush pending WAL in output buffer */
+       if (*pending)
+       {
+               if (WalSndOutHead != WalSndOutTail)
+               {
+                       res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
+                                                                                 WalSndOutTail - WalSndOutHead);
+                       if (res == EOF)
+                               return false;
+                       WalSndOutHead += res;
+                       if (WalSndOutHead != WalSndOutTail)
+                               return true;
+               }
+
+               res = pq_flush_if_writable();
+               if (res == EOF)
+                       return false;
+               if (res == 0)
+                       return true;
+
+               goto updt;
+       }
+
        /*
         * Attempt to send all data that's already been written out and fsync'd to
         * disk.  We cannot go further than what's been written out given the
@@ -756,13 +1032,19 @@ XLogSend(char *msgbuf, bool *caughtup)
        /*
         * OK to read and send the slice.
         */
-       msgbuf[0] = 'w';
+       WalSndOutBuffer[0] = 'd';
+       WalSndOutBuffer[5] = 'w';
+       WalSndOutHead = 0;
+       WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
+
+       n32 = htonl((uint32) WalSndOutTail - 1);
+       memcpy(WalSndOutBuffer + 1, &n32, 4);
 
        /*
         * Read the log directly into the output buffer to avoid extra memcpy
         * calls.
         */
-       XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
+       XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
 
        /*
         * We fill the message header last so that the send timestamp is taken as
@@ -772,13 +1054,34 @@ XLogSend(char *msgbuf, bool *caughtup)
        msghdr.walEnd = SendRqstPtr;
        msghdr.sendTime = GetCurrentTimestamp();
 
-       memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
+       memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
+
+       res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
+       if (res == EOF)
+               return false;
 
-       pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
+       WalSndOutHead = res;
+       if (WalSndOutHead != WalSndOutTail)
+       {
+               *caughtup = false;
+               *pending = true;
+               return true;
+       }
 
        /* Flush pending output to the client */
-       if (pq_flush())
+       res = pq_flush_if_writable();
+       if (res == EOF)
                return false;
+       if (res == 0)
+       {
+               *caughtup = false;
+               *pending = true;
+               return true;
+       }
+
+updt:
+       WalSndOutHead = WalSndOutTail = 0;
+       *pending = false;
 
        sentPtr = endptr;
 
@@ -805,6 +1108,91 @@ XLogSend(char *msgbuf, bool *caughtup)
        return true;
 }
 
+/*
+ * Attempt to execute standby_fencing_command at the end of replication.
+ */
+static void
+ExecuteStandbyFencingCommand(void)
+{
+       char            standbyFencingCmd[MAXPGPATH];
+       char       *dp;
+       char       *endp;
+       const char *sp;
+       int                     rc;
+
+       /* Do nothing if no command supplied */
+       if (standby_fencing_command[0] == '\0')
+               return;
+
+       /*
+        * construct the command to be executed
+        */
+       dp = standbyFencingCmd;
+       endp = standbyFencingCmd + MAXPGPATH - 1;
+       *endp = '\0';
+
+       for (sp = standby_fencing_command; *sp; sp++)
+       {
+               if (*sp == '%')
+               {
+                       switch (sp[1])
+                       {
+                               case 'a':
+                               {
+                                       /* %a: application_name */
+                                       const char *appname = application_name;
+
+                                       if (appname == NULL || *appname == '\0')
+                                               appname = _("[unknown]");
+
+                                       sp++;
+                                       strlcpy(dp, appname, endp - dp);
+                                       dp += strlen(dp);
+                                       break;
+                               }
+                               case '%':
+                                       /* convert %% to a single % */
+                                       sp++;
+                                       if (dp < endp)
+                                               *dp++ = *sp;
+                                       break;
+                               default:
+                                       /* otherwise treat the % as not special */
+                                       if (dp < endp)
+                                               *dp++ = *sp;
+                                       break;
+                       }
+               }
+               else
+               {
+                       if (dp < endp)
+                               *dp++ = *sp;
+               }
+       }
+       *dp = '\0';
+
+       ereport(DEBUG3,
+                       (errmsg_internal("executing standby fencing command \"%s\"",
+                                                        standbyFencingCmd)));
+
+       /*
+        * execute the constructed command
+        */
+       rc = system(standbyFencingCmd);
+       if (rc != 0)
+       {
+               /*
+                * No matter what code is returned, walsender can't stop exiting.
+                * We don't need to care about the return code of the command here.
+                */
+               ereport(WARNING,
+                               (errmsg("standby fencing command failed with return code %d",
+                                               rc),
+                                errdetail("The failed standby fencing command was: %s",
+                                                  standbyFencingCmd)));
+       }
+}
+
 /* SIGHUP: set flag to re-read config file at next convenient time */
 static void
 WalSndSigHupHandler(SIGNAL_ARGS)
@@ -904,6 +1292,13 @@ WalSndShmemSize(void)
        size = offsetof(WalSndCtlData, walsnds);
        size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
 
+       /*
+        * If replication is enabled, we have a data structure called
+        * WalSndWaiters, created in shared memory.
+        */
+       if (max_wal_senders > 0)
+               size = add_size(size, mul_size(MaxBackends, sizeof(WalSndWaiter)));
+
        return size;
 }
 
@@ -913,14 +1308,16 @@ WalSndShmemInit(void)
 {
        bool            found;
        int                     i;
+       Size            size = add_size(offsetof(WalSndCtlData, walsnds),
+                                                               mul_size(max_wal_senders, sizeof(WalSnd)));
 
        WalSndCtl = (WalSndCtlData *)
-               ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
+               ShmemInitStruct("Wal Sender Ctl", size, &found);
 
        if (!found)
        {
                /* First time through, so initialize */
-               MemSet(WalSndCtl, 0, WalSndShmemSize());
+               MemSet(WalSndCtl, 0, size);
 
                for (i = 0; i < max_wal_senders; i++)
                {
@@ -930,6 +1327,16 @@ WalSndShmemInit(void)
                        InitSharedLatch(&walsnd->latch);
                }
        }
+
+       /* Create or attach to the WalSndWaiters array too, if needed */
+       if (max_wal_senders > 0)
+       {
+               WalSndWaiters = (WalSndWaiter *)
+                       ShmemInitStruct("WalSndWaiters",
+                                                       mul_size(MaxBackends, sizeof(WalSndWaiter)),
+                                                       &found);
+               WalSndCtl->maxWaiters = MaxBackends;
+       }
 }
 
 /* Wake up all walsenders */
@@ -943,36 +1350,204 @@ WalSndWakeup(void)
 }
 
 /*
- * This isn't currently used for anything. Monitoring tools might be
- * interested in the future, and we'll need something like this in the
- * future for synchronous replication.
+ * Ensure that replication has been completed up to the given position.
  */
-#ifdef NOT_USED
+void
+WaitXLogSend(XLogRecPtr record)
+{
+       int             i;
+       bool    mustwait = false;
+
+       Assert(max_wal_senders > 0);
+
+       for (i = 0; i < max_wal_senders; i++)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+               XLogRecPtr              recptr;
+               WalSndState             state;
+
+               if (walsnd->pid == 0)
+                       continue;
+
+               SpinLockAcquire(&walsnd->mutex);
+               state = walsnd->walSndState;
+               recptr = walsnd->ackdPtr;
+               SpinLockRelease(&walsnd->mutex);
+
+               if (state <= WALSND_ASYNC ||
+                       (recptr.xlogid == 0 && recptr.xrecoff == 0))
+                       continue;
+
+               /* Quick exit if already known replicated */
+               if (XLByteLE(record, recptr))
+                       return;
+
+               /*
+                * If walsender is bulk-sending WAL for standby to catch up,
+                * we don't need to wait for Ack from standby.
+                */
+               if (state <= WALSND_CATCHUP)
+                       continue;
+
+               mustwait = true;
+       }
+
+       /*
+        * Don't need to wait for replication if there is no synchronous
+        * standby
+        */
+       if (!mustwait)
+               return;
+
+       /*
+        * Register myself into the wait list and sleep until replication
+        * has been completed up to the given position and the walsender
+        * signals me.
+        *
+        * If replication has been completed up to the latest position
+        * before the registration, walsender might be unable to send the
+        * signal immediately. We must wake up the walsender after the
+        * registration.
+        */
+       ResetLatch(&MyProc->latch);
+       RegisterWalSndWaiter(MyBackendId, record, &MyProc->latch);
+       WalSndWakeup();
+
+       for (;;)
+       {
+               WaitLatch(&MyProc->latch, 1000000L);
+
+               /* If done already, we finish waiting */
+               if (replication_done)
+               {
+                       replication_done = false;
+                       return;
+               }
+       }
+}
+
+/*
+ * Register the given backend into the wait list.
+ */
+static void
+RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record, Latch *latch)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSndCtlData  *walsndctl = WalSndCtl;
+       int             i;
+       int             count = 0;
+
+       LWLockAcquire(WalSndWaiterLock, LW_EXCLUSIVE);
+
+       /* Out of slots. This should not happen. */
+       if (walsndctl->numWaiters + 1 > walsndctl->maxWaiters)
+               elog(PANIC, "out of replication waiters slots");
+
+       /*
+        * The given position is expected to be relatively new in the
+        * wait list. Since the entries in the list are sorted in an
+        * increasing order of XLogRecPtr, we can shorten the time it
+        * takes to find an insert slot by scanning the list backwards.
+        */
+       for (i = walsndctl->numWaiters; i > 0; i--)
+       {
+               if (XLByteLE(WalSndWaiters[i - 1].record, record))
+                       break;
+               count++;
+       }
+
+       /* Shuffle the list if needed */
+       if (count > 0)
+               memmove(&WalSndWaiters[i + 1], &WalSndWaiters[i],
+                               count * sizeof(WalSndWaiter));
+
+       WalSndWaiters[i].backendId = backendId;
+       WalSndWaiters[i].record = record;
+       WalSndWaiters[i].latch = latch;
+       walsndctl->numWaiters++;
+
+       LWLockRelease(WalSndWaiterLock);
+}
+
+/*
+ * Wake up the backends waiting until replication has been completed
+ * up to the position older than or equal to the given one.
+ *
+ * Wake up all waiters if InvalidXLogRecPtr is given.
+   */
+static void
+WakeupWalSndWaiters(XLogRecPtr record)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSndCtlData  *walsndctl = WalSndCtl;
+       int             i;
+       int             count = 0;
+       bool    all_wakeup = (record.xlogid == 0 && record.xrecoff == 0);
+
+       LWLockAcquire(WalSndWaiterLock, LW_EXCLUSIVE);
+
+       for (i = 0; i < walsndctl->numWaiters; i++)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSndWaiter  *waiter = &WalSndWaiters[i];
+
+               if (all_wakeup || XLByteLE(waiter->record, record))
+               {
+                       SetProcLatch(waiter->latch, PROCSIG_REPLICATION_INTERRUPT,
+                                                waiter->backendId);
+                       count++;
+               }
+               else
+               {
+                       /*
+                        * If the backend waiting for the Ack position newer than
+                        * the given one is found, we don't need to search the wait
+                        * list any more. This is because the waiters in the list
+                        * are guaranteed to be sorted in an increasing order of
+                        * XLogRecPtr.
+                        */
+                       break;
+               }
+       }
+
+       /* If there are still some waiters, left-justify them in the list */
+       walsndctl->numWaiters -= count;
+       if (walsndctl->numWaiters > 0 && count > 0)
+               memmove(&WalSndWaiters[0], &WalSndWaiters[i],
+                               walsndctl->numWaiters * sizeof(WalSndWaiter));
+
+       LWLockRelease(WalSndWaiterLock);
+}
+
 /*
- * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
- * if none.
+ * Returns the oldest Ack position in synchronous walsenders. Or
+ * InvalidXLogRecPtr if none.
  */
-XLogRecPtr
-GetOldestWALSendPointer(void)
+static XLogRecPtr
+GetOldestAckdPtr(void)
 {
        XLogRecPtr      oldest = {0, 0};
-       int                     i;
-       bool            found = false;
+       int             i;
+       bool    found = false;
 
        for (i = 0; i < max_wal_senders; i++)
        {
                /* use volatile pointer to prevent code rearrangement */
                volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-               XLogRecPtr      recptr;
+               XLogRecPtr              recptr;
+               WalSndState             state;
 
                if (walsnd->pid == 0)
                        continue;
 
                SpinLockAcquire(&walsnd->mutex);
-               recptr = walsnd->sentPtr;
+               state = walsnd->walSndState;
+               recptr = walsnd->ackdPtr;
                SpinLockRelease(&walsnd->mutex);
 
-               if (recptr.xlogid == 0 && recptr.xrecoff == 0)
+               if (state <= WALSND_ASYNC ||
+                       (recptr.xlogid == 0 && recptr.xrecoff == 0))
                        continue;
 
                if (!found || XLByteLT(recptr, oldest))
@@ -982,4 +1557,11 @@ GetOldestWALSendPointer(void)
        return oldest;
 }
 
-#endif
+/*
+ * This is called when PROCSIG_REPLICATION_INTERRUPT is received.
+ */
+void
+HandleReplicationInterrupt(void)
+{
+       replication_done = true;
+}