OSDN Git Service

Merge branch 'pgrex90-base' into pgrex90
authorMasaoFujii <masao.fujii@gmail.com>
Mon, 13 Dec 2010 09:12:21 +0000 (18:12 +0900)
committerMasaoFujii <masao.fujii@gmail.com>
Mon, 13 Dec 2010 09:12:21 +0000 (18:12 +0900)
Conflicts:
doc/src/sgml/protocol.sgml
src/backend/replication/walsender.c
src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/fe-protocol2.c
src/interfaces/libpq/fe-protocol3.c
src/interfaces/libpq/libpq-int.h

1  2 
doc/src/sgml/protocol.sgml
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walsender.c
src/interfaces/libpq/fe-exec.c

     </para>
  
     <para>
-     The CopyInResponse and CopyOutResponse messages include fields that
-     inform the frontend of the number of columns per row and the format
-     codes being used for each column.  (As of the present implementation,
-     all columns in a given <command>COPY</> operation will use the same
-     format, but the message design does not assume this.)
+     There is another Copy-related mode called Copy-both, which allows
+     high-speed bulk data transfer to <emphasis>and</> from the server.
+     Copy-both mode is initiated when a backend in walsender mode
+     executes a <command>START_REPLICATION</command> statement.  The
+     backend sends a CopyBothResponse message to the frontend.  Both
+     the backend and the frontend may then send CopyData messages
+     until the connection is terminated.  See see <xref
+     linkend="protocol-replication">.
     </para>
+    <para>
+     The CopyInResponse, CopyOutResponse and CopyBothResponse messages
+     include fields that inform the frontend of the number of columns
+     per row and the format codes being used for each column.  (As of
+     the present implementation, all columns in a given <command>COPY</>
+     operation will use the same format, but the message design does not
+     assume this.)
+    </para>
    </sect2>
  
    <sect2 id="protocol-async">
@@@ -1344,7 -1357,7 +1357,7 @@@ The commands accepted in walsender mod
        WAL position <replaceable>XXX</>/<replaceable>XXX</>.
        The server can reply with an error, e.g. if the requested section of WAL
        has already been recycled. On success, server responds with a
-       CopyXLogResponse message, and then starts to stream WAL to the frontend.
+       CopyBothResponse message, and then starts to stream WAL to the frontend.
        WAL will continue to be streamed until the connection is broken;
        no further commands will be accepted.
       </para>
        <variablelist>
        <varlistentry>
        <term>
 +          XLogRecPtr (F)
 +      </term>
 +      <listitem>
 +      <para>
 +      <variablelist>
 +      <varlistentry>
 +      <term>
 +          Byte1('l')
 +      </term>
 +      <listitem>
 +      <para>
 +          Identifies the message as an acknowledgment of replication.
 +      </para>
 +      </listitem>
 +      </varlistentry>
 +      <varlistentry>
 +      <term>
 +          Byte8
 +      </term>
 +      <listitem>
 +      <para>
 +          The end of the WAL data replicated to the standby, given in
 +          XLogRecPtr format.
 +      </para>
 +      </listitem>
 +      </varlistentry>
 +      </variablelist>
 +      </para>
 +      </listitem>
 +      </varlistentry>
 +      </variablelist>
 +
 +      <variablelist>
 +      <varlistentry>
 +      <term>
            XLogData (B)
        </term>
        <listitem>
@@@ -2731,7 -2709,7 +2744,7 @@@ CopyOutResponse (B
  
  <varlistentry>
  <term>
- CopyXLogResponse (B)
+ CopyBothResponse (B)
  </term>
  <listitem>
  <para>
  </term>
  <listitem>
  <para>
-                 Identifies the message as a Start Copy XLog response.
+                 Identifies the message as a Start Copy Both response.
                  This message is used only for Streaming Replication.
  </para>
  </listitem>
  </para>
  </listitem>
  </varlistentry>
+ <varlistentry>
+ <term>
+         Int8
+ </term>
+ <listitem>
+ <para>
+                 0 indicates the overall <command>COPY</command> format
+                 is textual (rows separated by newlines, columns
+                 separated by separator characters, etc). 1 indicates
+                 the overall copy format is binary (similar to DataRow
+                 format). See <xref linkend="sql-copy"> for more information.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int16
+ </term>
+ <listitem>
+ <para>
+                 The number of columns in the data to be copied
+                 (denoted <replaceable>N</> below).
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int16[<replaceable>N</>]
+ </term>
+ <listitem>
+ <para>
+                 The format codes to be used for each column.
+                 Each must presently be zero (text) or one (binary).
+                 All must be zero if the overall copy format is textual.
+ </para>
+ </listitem>
+ </varlistentry>
  </variablelist>
  
  </para>
@@@ -156,11 -156,10 +156,11 @@@ libpqrcv_connect(char *conninfo, XLogRe
        ThisTimeLineID = primary_tli;
  
        /* Start streaming from the point requested by startup process */
 -      snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
 -                       startpoint.xlogid, startpoint.xrecoff);
 +      snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X MODE %s",
 +                       startpoint.xlogid, startpoint.xrecoff,
 +                       GetConfigOption("replication_mode", false));
        res = libpqrcv_PQexec(cmd);
-       if (PQresultStatus(res) != PGRES_COPY_OUT)
+       if (PQresultStatus(res) != PGRES_COPY_BOTH)
        {
                PQclear(res);
                ereport(ERROR,
@@@ -306,6 -305,7 +306,7 @@@ libpqrcv_PQexec(const char *query
  
                if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
                        PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+                       PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
                        PQstatus(streamConn) == CONNECTION_BAD)
                        break;
        }
@@@ -48,7 -48,6 +48,7 @@@
  #include "storage/fd.h"
  #include "storage/ipc.h"
  #include "storage/pmsignal.h"
 +#include "storage/proc.h"
  #include "tcop/tcopprot.h"
  #include "utils/guc.h"
  #include "utils/memutils.h"
@@@ -61,28 -60,12 +61,28 @@@ WalSndCtlData *WalSndCtl = NULL
  /* My slot in the shared memory array */
  static WalSnd *MyWalSnd = NULL;
  
 +/* Array of WalSndWaiter in shared memory */
 +static WalSndWaiter  *WalSndWaiters;
 +
  /* Global state */
  bool          am_walsender = false;           /* Am I a walsender process ? */
  
  /* User-settable parameters for walsender */
  int                   max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
  int                   WalSndDelay = 200;      /* max sleep time between some actions */
 +int                   replication_timeout = 0;        /* maximum time to wait for the Ack from the standby */
 +char     *standby_fencing_command = NULL;     /* command to shoot the standby in the head */
 +
 +/*
 + * Buffer for WAL sending
 + *
 + * WalSndOutBuffer is a work area in which the output message is constructed.
 + * It's used in just so we can avoid re-palloc'ing the buffer on each cycle.
 + * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
 + */
 +static char      *WalSndOutBuffer;
 +static int            WalSndOutHead;          /* head of pending output */
 +static int            WalSndOutTail;          /* tail of pending output */
  
  /*
   * These variables are used similarly to openLogFile/Id/Seg/Off,
@@@ -99,21 -82,11 +99,21 @@@ static uint32 sendOff = 0
   */
  static XLogRecPtr sentPtr = {0, 0};
  
 +/*
 + * How far have we completed replication already? This is also
 + * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous
 + * replication case.
 + */
 +static XLogRecPtr ackdPtr = {0, 0};
 +
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
  static volatile sig_atomic_t shutdown_requested = false;
  static volatile sig_atomic_t ready_to_stop = false;
  
 +/* Flag set by signal handler of backends for replication */
 +static volatile sig_atomic_t replication_done = false;
 +
  /* Signal handlers */
  static void WalSndSigHupHandler(SIGNAL_ARGS);
  static void WalSndShutdownHandler(SIGNAL_ARGS);
@@@ -127,14 -100,8 +127,14 @@@ static void InitWalSnd(void)
  static void WalSndHandshake(void);
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
 -static bool XLogSend(char *msgbuf, bool *caughtup);
 -static void CheckClosedConnection(void);
 +static bool XLogSend(bool *caughtup, bool *pending);
 +static void ProcessStreamMsgs(StringInfo inMsg);
 +static void ExecuteStandbyFencingCommand(void);
 +
 +static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
 +                                                               Latch *latch);
 +static void WakeupWalSndWaiters(XLogRecPtr record);
 +static XLogRecPtr GetOldestAckdPtr(void);
  
  
  /* Main entry point for walsender process */
@@@ -238,8 -205,6 +238,8 @@@ WalSndHandshake(void
                /* Handle the very limited subset of commands expected in this phase */
                switch (firstchar)
                {
 +                      char            modestr[6];
 +
                        case 'Q':                       /* Query message */
                                {
                                        const char *query_string;
                                                ReadyForQuery(DestRemote);
                                                /* ReadyForQuery did pq_flush for us */
                                        }
 -                                      else if (sscanf(query_string, "START_REPLICATION %X/%X",
 -                                                                      &recptr.xlogid, &recptr.xrecoff) == 2)
 +                                      else if (sscanf(query_string, "START_REPLICATION %X/%X MODE %5s",
 +                                                                      &recptr.xlogid, &recptr.xrecoff, modestr) == 3)
                                        {
                                                StringInfoData buf;
 +                                              int                     mode;
  
                                                /*
                                                 * Check that we're logging enough information in the
                                                                        (errcode(ERRCODE_CANNOT_CONNECT_NOW),
                                                                         errmsg("standby connections not allowed because wal_level=minimal")));
  
-                                               /* Send a CopyXLogResponse message, and start streaming */
 +                                              /* Verify that the specified replication mode is valid */
 +                                              {
 +                                                      const struct config_enum_entry *entry;
 +
 +                                                      for (entry = replication_mode_options; entry && entry->name; entry++)
 +                                                      {
 +                                                              if (strcmp(modestr, entry->name) == 0)
 +                                                              {
 +                                                                      mode = entry->val;
 +                                                                      break;
 +                                                              }
 +                                                      }
 +                                                      if (entry == NULL || entry->name == NULL)
 +                                                              ereport(FATAL,
 +                                                                              (errcode(ERRCODE_PROTOCOL_VIOLATION),
 +                                                                               errmsg("invalid replication mode: %s", modestr)));
 +                                              }
 +
 +                                              /* Change the state according to replication mode specified by standby */
 +                                              {
 +                                                      /* use volatile pointer to prevent code rearrangement */
 +                                                      volatile WalSnd *walsnd = MyWalSnd;
 +
 +                                                      SpinLockAcquire(&walsnd->mutex);
 +                                                      walsnd->walSndState = (mode == REPLICATION_MODE_ASYNC) ?
 +                                                              WALSND_ASYNC : WALSND_CATCHUP;
 +                                                      SpinLockRelease(&walsnd->mutex);
 +                                              }
 +
+                                               /* Send a CopyBothResponse message, and start streaming */
                                                pq_beginmessage(&buf, 'W');
+                                               pq_sendbyte(&buf, 0);
+                                               pq_sendint(&buf, 0, 2);
                                                pq_endmessage(&buf);
                                                pq_flush();
  
                                                /*
 -                                               * Initialize position to the received one, then the
 +                                               * Initialize positions to the received one, then the
                                                 * xlog records begin to be shipped from that position
                                                 */
 -                                              sentPtr = recptr;
 +                                              sentPtr = ackdPtr = recptr;
  
                                                /* break out of the loop */
                                                replication_started = true;
  }
  
  /*
 - * Check if the remote end has closed the connection.
 + * Process messages received from the standby.
 + *
 + * ereports on error.
   */
  static void
 -CheckClosedConnection(void)
 +ProcessStreamMsgs(StringInfo inMsg)
  {
 -      unsigned char firstchar;
 -      int                     r;
 +      bool    acked = false;
  
 -      r = pq_getbyte_if_available(&firstchar);
 -      if (r < 0)
 -      {
 -              /* unexpected error or EOF */
 -              ereport(COMMERROR,
 -                              (errcode(ERRCODE_PROTOCOL_VIOLATION),
 -                               errmsg("unexpected EOF on standby connection")));
 -              proc_exit(0);
 -      }
 -      if (r == 0)
 +      /* Loop to process successive complete messages available */
 +      for (;;)
        {
 -              /* no data available without blocking */
 -              return;
 -      }
 +              unsigned char firstchar;
 +              int                     r;
 +
 +              r = pq_getbyte_if_available(&firstchar);
 +              if (r < 0)
 +              {
 +                      /* unexpected error or EOF */
 +                      ereport(COMMERROR,
 +                                      (errcode(ERRCODE_PROTOCOL_VIOLATION),
 +                                       errmsg("unexpected EOF on standby connection")));
 +                      proc_exit(0);
 +              }
 +              if (r == 0)
 +              {
 +                      /* no data available without blocking */
 +                      break;
 +              }
 +
 +              /* Handle the very limited subset of commands expected in this phase */
 +              switch (firstchar)
 +              {
 +                      case 'd':       /* CopyData message */
 +                      {
 +                              unsigned char   rpltype;
 +
 +                              /*
 +                               * Read the message contents. This is expected to be done without
 +                               * blocking because we've been able to get message type code.
 +                               */
 +                              if (pq_getmessage(inMsg, 0))
 +                                      proc_exit(0);           /* suitable message already logged */
 +
 +                              /* Read the replication message type from CopyData message */
 +                              rpltype = pq_getmsgbyte(inMsg);
 +                              switch (rpltype)
 +                              {
 +                                      case 'l':
 +                                      {
 +                                              WalAckMessageData  *msgdata;
 +
 +                                              msgdata = (WalAckMessageData *) pq_getmsgbytes(inMsg, sizeof(WalAckMessageData));
 +
 +                                              /*
 +                                               * Update local status.
 +                                               *
 +                                               * The ackd ptr received from standby should not
 +                                               * go backwards.
 +                                               */
 +                                              if (XLByteLE(ackdPtr, msgdata->ackEnd))
 +                                                      ackdPtr = msgdata->ackEnd;
 +                                              else
 +                                                      ereport(FATAL,
 +                                                                      (errmsg("replication completion location went back from "
 +                                                                                      "%X/%X to %X/%X",
 +                                                                                      ackdPtr.xlogid, ackdPtr.xrecoff,
 +                                                                                      msgdata->ackEnd.xlogid, msgdata->ackEnd.xrecoff)));
 +
 +                                              acked = true;   /* also need to update shared position */
 +                                              break;
 +                                      }
 +                                      default:
 +                                              ereport(FATAL,
 +                                                              (errcode(ERRCODE_PROTOCOL_VIOLATION),
 +                                                               errmsg("invalid replication message type %d",
 +                                                                              rpltype)));
 +                              }
 +                              break;
 +                      }
  
 -      /* Handle the very limited subset of commands expected in this phase */
 -      switch (firstchar)
 -      {
                        /*
                         * 'X' means that the standby is closing down the socket.
                         */
 -              case 'X':
 -                      proc_exit(0);
 +                      case 'X':
 +                              proc_exit(0);
  
 -              default:
 -                      ereport(FATAL,
 -                                      (errcode(ERRCODE_PROTOCOL_VIOLATION),
 -                                       errmsg("invalid standby closing message type %d",
 -                                                      firstchar)));
 +                      default:
 +                              ereport(FATAL,
 +                                              (errcode(ERRCODE_PROTOCOL_VIOLATION),
 +                                               errmsg("invalid standby message type %d",
 +                                                              firstchar)));
 +              }
        }
 +
 +      if (acked)
 +      {
 +              /* use volatile pointer to prevent code rearrangement */
 +              volatile WalSnd *walsnd = MyWalSnd;
 +
 +              SpinLockAcquire(&walsnd->mutex);
 +              walsnd->ackdPtr = ackdPtr;
 +              SpinLockRelease(&walsnd->mutex);
 +      }
 +
 +      /* Wake up the backends that this walsender had been blocking */
 +      WakeupWalSndWaiters(ackdPtr);
  }
  
  /* Main loop of walsender process */
  static int
  WalSndLoop(void)
  {
 -      char       *output_message;
 +      StringInfoData  input_message;
        bool            caughtup = false;
 +      bool            pending = false;
 +      XLogRecPtr      switchptr = {0, 0};
 +
 +      initStringInfo(&input_message);
  
        /*
         * Allocate buffer that will be used for each output message.  We do this
         * just once to reduce palloc overhead.  The buffer must be made large
         * enough for maximum-sized messages.
         */
 -      output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
 +      WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
 +      WalSndOutHead = WalSndOutTail = 0;
  
        /* Loop forever, unless we get an error */
        for (;;)
                 */
                if (ready_to_stop)
                {
 -                      if (!XLogSend(output_message, &caughtup))
 +                      if (!XLogSend(&caughtup, &pending))
                                break;
 -                      if (caughtup)
 +                      if (caughtup && !pending)
                                shutdown_requested = true;
                }
  
                }
  
                /*
 -               * If we had sent all accumulated WAL in last round, nap for the
 -               * configured time before retrying.
 +               * If we had sent all accumulated WAL in last round or could not
 +               * flush pending WAL in output buffer because the socket was not
 +               * writable, nap for the configured time before retrying.
                 */
 -              if (caughtup)
 +              if (caughtup || pending)
                {
                        /*
                         * Even if we wrote all the WAL that was available when we started
                         */
                        ResetLatch(&MyWalSnd->latch);
  
 -                      if (!XLogSend(output_message, &caughtup))
 +                      if (!XLogSend(&caughtup, &pending))
                                break;
 -                      if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
 +
 +                      /*
 +                       * If the standby has almost caught up with the primary, we change
 +                       * the state to WALSND_PRESYNC and start making transactions wait
 +                       * until their WAL has been replicated.
 +                       *
 +                       * No lock is required to get WalSnd->walSndState here since it can
 +                       * be updated only by walsender.
 +                       */
 +                      if (MyWalSnd->walSndState == WALSND_CATCHUP && caughtup)
                        {
 +                              /* use volatile pointer to prevent code rearrangement */
 +                              volatile WalSnd *walsnd = MyWalSnd;
 +
 +                              SpinLockAcquire(&walsnd->mutex);
 +                              walsnd->walSndState = WALSND_PRESYNC;
 +                              SpinLockRelease(&walsnd->mutex);
 +
 +                              /*
 +                               * switchptr indicates how far we must complete replication
 +                               * before advertising that the standby has already been in
 +                               * sync with the primary.
 +                               */
 +                              switchptr = GetFlushRecPtr();
 +                      }
 +
 +                      if ((caughtup || pending) && !got_SIGHUP && !ready_to_stop &&
 +                              !shutdown_requested)
 +                      {
 +                              bool            check_timeout;
 +                              long            sleeptime;
 +                              int                     res;
 +
                                /*
                                 * XXX: We don't really need the periodic wakeups anymore,
                                 * WaitLatchOrSocket should reliably wake up as soon as
                                 * something interesting happens.
                                 */
  
 +                              /*
 +                               * Check for replication timeout if it's enabled and we need
 +                               * to wait until the socket has become writable to flush
 +                               * pending WAL in output buffer or until the Ack message
 +                               * from the standby has become available.
 +                               */
 +                              if (replication_timeout > 0 &&
 +                                      (pending ||
 +                                       (MyWalSnd->walSndState >= WALSND_CATCHUP &&
 +                                        XLByteLT(ackdPtr, sentPtr))))
 +                              {
 +                                      sleeptime = replication_timeout;
 +                                      check_timeout = true;
 +                              }
 +                              else
 +                              {
 +                                      sleeptime = WalSndDelay;
 +                                      check_timeout = false;
 +                              }
 +
                                /* Sleep */
 -                              WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
 -                                                                WalSndDelay * 1000L);
 +                              res = WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
 +                                                                              true, (WalSndOutTail > 0),
 +                                                                              sleeptime * 1000L);
 +
 +                              if (res == 0 && check_timeout)
 +                              {
 +                                      /*
 +                                       * Since typically expiration of replication timeout means
 +                                       * communication problem, we don't send the error message
 +                                       * to the standby.
 +                                       */
 +                                      ereport(COMMERROR,
 +                                                      (errmsg("terminating walsender process due to replication timeout")));
 +                                      break;
 +                              }
                        }
  
 -                      /* Check if the connection was closed */
 -                      CheckClosedConnection();
 +                      /* Process messages received from the standby */
 +                      ProcessStreamMsgs(&input_message);
 +
 +                      /*
 +                       * If the standby has caught up with the primary, we change
 +                       * the state to WALSND_SYNC and inform the standby that it's
 +                       * in sync with the primary. This state ensures that all the
 +                       * transactions completed from a client's point of view have
 +                       * been replicated to the standby.
 +                       */
 +                      if (MyWalSnd->walSndState == WALSND_PRESYNC &&
 +                              XLByteLE(switchptr, ackdPtr) && !pending)
 +                      {
 +                              /* use volatile pointer to prevent code rearrangement */
 +                              volatile WalSnd *walsnd = MyWalSnd;
 +
 +                              SpinLockAcquire(&walsnd->mutex);
 +                              walsnd->walSndState = WALSND_SYNC;
 +                              SpinLockRelease(&walsnd->mutex);
 +
 +                              /*
 +                               * We can send a XLogCatchupComplete message without blocking
 +                               * since it's guaranteed that there is no pending data in the
 +                               * output buffer.
 +                               */
 +                              pq_putmessage('d', "c", 1);
 +                              if (pq_flush())
 +                                      break;
 +                      }
                }
                else
                {
                        /* Attempt to send the log once every loop */
 -                      if (!XLogSend(output_message, &caughtup))
 +                      if (!XLogSend(&caughtup, &pending))
                                break;
                }
        }
@@@ -744,8 -515,6 +746,8 @@@ InitWalSnd(void
                         */
                        walsnd->pid = MyProcPid;
                        MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
 +                      MemSet(&walsnd->ackdPtr, 0, sizeof(XLogRecPtr));
 +                      walsnd->walSndState = WALSND_INIT;
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        OwnLatch((Latch *) &walsnd->latch);
  static void
  WalSndKill(int code, Datum arg)
  {
 +      /* use volatile pointer to prevent code rearrangement */
 +      volatile WalSnd *walsnd = MyWalSnd;
 +
        Assert(MyWalSnd != NULL);
  
        /*
 +       * If replication was terminated for a reason other than the master
 +       * server shutdown or emergency bailout (i.e., unexpected death of
 +       * postmaster), we can expect this server can work standalone,
 +       * so we call standby_fencing_command to shoot the standby server
 +       * in the head if it's specified.
 +       */
 +      if (!ready_to_stop && PostmasterIsAlive(true))
 +              ExecuteStandbyFencingCommand();
 +
 +      /* Wake up the backends that this walsender had been blocking */
 +      SpinLockAcquire(&walsnd->mutex);
 +      walsnd->walSndState = WALSND_INIT;
 +      SpinLockRelease(&walsnd->mutex);
 +      WakeupWalSndWaiters(GetOldestAckdPtr());
 +
 +      /*
         * Mark WalSnd struct no longer in use. Assume that no lock is required
         * for this.
         */
@@@ -920,48 -670,24 +922,48 @@@ XLogRead(char *buf, XLogRecPtr recptr, 
   * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
   * but not yet sent to the client, and send it.
   *
 - * msgbuf is a work area in which the output message is constructed.  It's
 - * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
 - * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
 - *
   * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
   * *caughtup is set to false.
   *
 + * If there is pending WAL in output buffer, *pending is set to true,
 + * otherwise *pending is set to false.
 + *
   * Returns true if OK, false if trouble.
   */
  static bool
 -XLogSend(char *msgbuf, bool *caughtup)
 +XLogSend(bool *caughtup, bool *pending)
  {
        XLogRecPtr      SendRqstPtr;
        XLogRecPtr      startptr;
 -      XLogRecPtr      endptr;
 +      static XLogRecPtr       endptr;
        Size            nbytes;
 +      uint32          n32;
 +      int                     res;
        WalDataMessageHeader msghdr;
  
 +      /* Attempt to flush pending WAL in output buffer */
 +      if (*pending)
 +      {
 +              if (WalSndOutHead != WalSndOutTail)
 +              {
 +                      res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
 +                                                                                WalSndOutTail - WalSndOutHead);
 +                      if (res == EOF)
 +                              return false;
 +                      WalSndOutHead += res;
 +                      if (WalSndOutHead != WalSndOutTail)
 +                              return true;
 +              }
 +
 +              res = pq_flush_if_writable();
 +              if (res == EOF)
 +                      return false;
 +              if (res == 0)
 +                      return true;
 +
 +              goto updt;
 +      }
 +
        /*
         * Attempt to send all data that's already been written out and fsync'd to
         * disk.  We cannot go further than what's been written out given the
        /*
         * OK to read and send the slice.
         */
 -      msgbuf[0] = 'w';
 +      WalSndOutBuffer[0] = 'd';
 +      WalSndOutBuffer[5] = 'w';
 +      WalSndOutHead = 0;
 +      WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
 +
 +      n32 = htonl((uint32) WalSndOutTail - 1);
 +      memcpy(WalSndOutBuffer + 1, &n32, 4);
  
        /*
         * Read the log directly into the output buffer to avoid extra memcpy
         * calls.
         */
 -      XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
 +      XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
  
        /*
         * We fill the message header last so that the send timestamp is taken as
        msghdr.walEnd = SendRqstPtr;
        msghdr.sendTime = GetCurrentTimestamp();
  
 -      memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
 +      memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
 +
 +      res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
 +      if (res == EOF)
 +              return false;
  
 -      pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
 +      WalSndOutHead = res;
 +      if (WalSndOutHead != WalSndOutTail)
 +      {
 +              *caughtup = false;
 +              *pending = true;
 +              return true;
 +      }
  
        /* Flush pending output to the client */
 -      if (pq_flush())
 +      res = pq_flush_if_writable();
 +      if (res == EOF)
                return false;
 +      if (res == 0)
 +      {
 +              *caughtup = false;
 +              *pending = true;
 +              return true;
 +      }
 +
 +updt:
 +      WalSndOutHead = WalSndOutTail = 0;
 +      *pending = false;
  
        sentPtr = endptr;
  
        return true;
  }
  
 +/*
 + * Attempt to execute standby_fencing_command at the end of replication.
 + */
 +static void
 +ExecuteStandbyFencingCommand(void)
 +{
 +      char            standbyFencingCmd[MAXPGPATH];
 +      char       *dp;
 +      char       *endp;
 +      const char *sp;
 +      int                     rc;
 +
 +      /* Do nothing if no command supplied */
 +      if (standby_fencing_command[0] == '\0')
 +              return;
 +
 +      /*
 +       * construct the command to be executed
 +       */
 +      dp = standbyFencingCmd;
 +      endp = standbyFencingCmd + MAXPGPATH - 1;
 +      *endp = '\0';
 +
 +      for (sp = standby_fencing_command; *sp; sp++)
 +      {
 +              if (*sp == '%')
 +              {
 +                      switch (sp[1])
 +                      {
 +                              case 'a':
 +                              {
 +                                      /* %a: application_name */
 +                                      const char *appname = application_name;
 +
 +                                      if (appname == NULL || *appname == '\0')
 +                                              appname = _("[unknown]");
 +
 +                                      sp++;
 +                                      strlcpy(dp, appname, endp - dp);
 +                                      dp += strlen(dp);
 +                                      break;
 +                              }
 +                              case '%':
 +                                      /* convert %% to a single % */
 +                                      sp++;
 +                                      if (dp < endp)
 +                                              *dp++ = *sp;
 +                                      break;
 +                              default:
 +                                      /* otherwise treat the % as not special */
 +                                      if (dp < endp)
 +                                              *dp++ = *sp;
 +                                      break;
 +                      }
 +              }
 +              else
 +              {
 +                      if (dp < endp)
 +                              *dp++ = *sp;
 +              }
 +      }
 +      *dp = '\0';
 +
 +      ereport(DEBUG3,
 +                      (errmsg_internal("executing standby fencing command \"%s\"",
 +                                                       standbyFencingCmd)));
 +
 +      /*
 +       * execute the constructed command
 +       */
 +      rc = system(standbyFencingCmd);
 +      if (rc != 0)
 +      {
 +              /*
 +               * No matter what code is returned, walsender can't stop exiting.
 +               * We don't need to care about the return code of the command here.
 +               */
 +              ereport(WARNING,
 +                              (errmsg("standby fencing command failed with return code %d",
 +                                              rc),
 +                               errdetail("The failed standby fencing command was: %s",
 +                                                 standbyFencingCmd)));
 +      }
 +}
 +
  /* SIGHUP: set flag to re-read config file at next convenient time */
  static void
  WalSndSigHupHandler(SIGNAL_ARGS)
@@@ -1290,13 -904,6 +1292,13 @@@ WalSndShmemSize(void
        size = offsetof(WalSndCtlData, walsnds);
        size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
  
 +      /*
 +       * If replication is enabled, we have a data structure called
 +       * WalSndWaiters, created in shared memory.
 +       */
 +      if (max_wal_senders > 0)
 +              size = add_size(size, mul_size(MaxBackends, sizeof(WalSndWaiter)));
 +
        return size;
  }
  
@@@ -1306,16 -913,14 +1308,16 @@@ WalSndShmemInit(void
  {
        bool            found;
        int                     i;
 +      Size            size = add_size(offsetof(WalSndCtlData, walsnds),
 +                                                              mul_size(max_wal_senders, sizeof(WalSnd)));
  
        WalSndCtl = (WalSndCtlData *)
 -              ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
 +              ShmemInitStruct("Wal Sender Ctl", size, &found);
  
        if (!found)
        {
                /* First time through, so initialize */
 -              MemSet(WalSndCtl, 0, WalSndShmemSize());
 +              MemSet(WalSndCtl, 0, size);
  
                for (i = 0; i < max_wal_senders; i++)
                {
                        InitSharedLatch(&walsnd->latch);
                }
        }
 +
 +      /* Create or attach to the WalSndWaiters array too, if needed */
 +      if (max_wal_senders > 0)
 +      {
 +              WalSndWaiters = (WalSndWaiter *)
 +                      ShmemInitStruct("WalSndWaiters",
 +                                                      mul_size(MaxBackends, sizeof(WalSndWaiter)),
 +                                                      &found);
 +              WalSndCtl->maxWaiters = MaxBackends;
 +      }
  }
  
  /* Wake up all walsenders */
@@@ -1348,204 -943,36 +1350,204 @@@ WalSndWakeup(void
  }
  
  /*
 - * This isn't currently used for anything. Monitoring tools might be
 - * interested in the future, and we'll need something like this in the
 - * future for synchronous replication.
 + * Ensure that replication has been completed up to the given position.
   */
 -#ifdef NOT_USED
 +void
 +WaitXLogSend(XLogRecPtr record)
 +{
 +      int             i;
 +      bool    mustwait = false;
 +
 +      Assert(max_wal_senders > 0);
 +
 +      for (i = 0; i < max_wal_senders; i++)
 +      {
 +              /* use volatile pointer to prevent code rearrangement */
 +              volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
 +              XLogRecPtr              recptr;
 +              WalSndState             state;
 +
 +              if (walsnd->pid == 0)
 +                      continue;
 +
 +              SpinLockAcquire(&walsnd->mutex);
 +              state = walsnd->walSndState;
 +              recptr = walsnd->ackdPtr;
 +              SpinLockRelease(&walsnd->mutex);
 +
 +              if (state <= WALSND_ASYNC ||
 +                      (recptr.xlogid == 0 && recptr.xrecoff == 0))
 +                      continue;
 +
 +              /* Quick exit if already known replicated */
 +              if (XLByteLE(record, recptr))
 +                      return;
 +
 +              /*
 +               * If walsender is bulk-sending WAL for standby to catch up,
 +               * we don't need to wait for Ack from standby.
 +               */
 +              if (state <= WALSND_CATCHUP)
 +                      continue;
 +
 +              mustwait = true;
 +      }
 +
 +      /*
 +       * Don't need to wait for replication if there is no synchronous
 +       * standby
 +       */
 +      if (!mustwait)
 +              return;
 +
 +      /*
 +       * Register myself into the wait list and sleep until replication
 +       * has been completed up to the given position and the walsender
 +       * signals me.
 +       *
 +       * If replication has been completed up to the latest position
 +       * before the registration, walsender might be unable to send the
 +       * signal immediately. We must wake up the walsender after the
 +       * registration.
 +       */
 +      ResetLatch(&MyProc->latch);
 +      RegisterWalSndWaiter(MyBackendId, record, &MyProc->latch);
 +      WalSndWakeup();
 +
 +      for (;;)
 +      {
 +              WaitLatch(&MyProc->latch, 1000000L);
 +
 +              /* If done already, we finish waiting */
 +              if (replication_done)
 +              {
 +                      replication_done = false;
 +                      return;
 +              }
 +      }
 +}
 +
 +/*
 + * Register the given backend into the wait list.
 + */
 +static void
 +RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record, Latch *latch)
 +{
 +      /* use volatile pointer to prevent code rearrangement */
 +      volatile WalSndCtlData  *walsndctl = WalSndCtl;
 +      int             i;
 +      int             count = 0;
 +
 +      LWLockAcquire(WalSndWaiterLock, LW_EXCLUSIVE);
 +
 +      /* Out of slots. This should not happen. */
 +      if (walsndctl->numWaiters + 1 > walsndctl->maxWaiters)
 +              elog(PANIC, "out of replication waiters slots");
 +
 +      /*
 +       * The given position is expected to be relatively new in the
 +       * wait list. Since the entries in the list are sorted in an
 +       * increasing order of XLogRecPtr, we can shorten the time it
 +       * takes to find an insert slot by scanning the list backwards.
 +       */
 +      for (i = walsndctl->numWaiters; i > 0; i--)
 +      {
 +              if (XLByteLE(WalSndWaiters[i - 1].record, record))
 +                      break;
 +              count++;
 +      }
 +
 +      /* Shuffle the list if needed */
 +      if (count > 0)
 +              memmove(&WalSndWaiters[i + 1], &WalSndWaiters[i],
 +                              count * sizeof(WalSndWaiter));
 +
 +      WalSndWaiters[i].backendId = backendId;
 +      WalSndWaiters[i].record = record;
 +      WalSndWaiters[i].latch = latch;
 +      walsndctl->numWaiters++;
 +
 +      LWLockRelease(WalSndWaiterLock);
 +}
 +
 +/*
 + * Wake up the backends waiting until replication has been completed
 + * up to the position older than or equal to the given one.
 + *
 + * Wake up all waiters if InvalidXLogRecPtr is given.
 +   */
 +static void
 +WakeupWalSndWaiters(XLogRecPtr record)
 +{
 +      /* use volatile pointer to prevent code rearrangement */
 +      volatile WalSndCtlData  *walsndctl = WalSndCtl;
 +      int             i;
 +      int             count = 0;
 +      bool    all_wakeup = (record.xlogid == 0 && record.xrecoff == 0);
 +
 +      LWLockAcquire(WalSndWaiterLock, LW_EXCLUSIVE);
 +
 +      for (i = 0; i < walsndctl->numWaiters; i++)
 +      {
 +              /* use volatile pointer to prevent code rearrangement */
 +              volatile WalSndWaiter  *waiter = &WalSndWaiters[i];
 +
 +              if (all_wakeup || XLByteLE(waiter->record, record))
 +              {
 +                      SetProcLatch(waiter->latch, PROCSIG_REPLICATION_INTERRUPT,
 +                                               waiter->backendId);
 +                      count++;
 +              }
 +              else
 +              {
 +                      /*
 +                       * If the backend waiting for the Ack position newer than
 +                       * the given one is found, we don't need to search the wait
 +                       * list any more. This is because the waiters in the list
 +                       * are guaranteed to be sorted in an increasing order of
 +                       * XLogRecPtr.
 +                       */
 +                      break;
 +              }
 +      }
 +
 +      /* If there are still some waiters, left-justify them in the list */
 +      walsndctl->numWaiters -= count;
 +      if (walsndctl->numWaiters > 0 && count > 0)
 +              memmove(&WalSndWaiters[0], &WalSndWaiters[i],
 +                              walsndctl->numWaiters * sizeof(WalSndWaiter));
 +
 +      LWLockRelease(WalSndWaiterLock);
 +}
 +
  /*
 - * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
 - * if none.
 + * Returns the oldest Ack position in synchronous walsenders. Or
 + * InvalidXLogRecPtr if none.
   */
 -XLogRecPtr
 -GetOldestWALSendPointer(void)
 +static XLogRecPtr
 +GetOldestAckdPtr(void)
  {
        XLogRecPtr      oldest = {0, 0};
 -      int                     i;
 -      bool            found = false;
 +      int             i;
 +      bool    found = false;
  
        for (i = 0; i < max_wal_senders; i++)
        {
                /* use volatile pointer to prevent code rearrangement */
                volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
 -              XLogRecPtr      recptr;
 +              XLogRecPtr              recptr;
 +              WalSndState             state;
  
                if (walsnd->pid == 0)
                        continue;
  
                SpinLockAcquire(&walsnd->mutex);
 -              recptr = walsnd->sentPtr;
 +              state = walsnd->walSndState;
 +              recptr = walsnd->ackdPtr;
                SpinLockRelease(&walsnd->mutex);
  
 -              if (recptr.xlogid == 0 && recptr.xrecoff == 0)
 +              if (state <= WALSND_ASYNC ||
 +                      (recptr.xlogid == 0 && recptr.xrecoff == 0))
                        continue;
  
                if (!found || XLByteLT(recptr, oldest))
        return oldest;
  }
  
 -#endif
 +/*
 + * This is called when PROCSIG_REPLICATION_INTERRUPT is received.
 + */
 +void
 +HandleReplicationInterrupt(void)
 +{
 +      replication_done = true;
 +}
@@@ -35,6 -35,7 +35,7 @@@ char     *const pgresStatus[] = 
        "PGRES_TUPLES_OK",
        "PGRES_COPY_OUT",
        "PGRES_COPY_IN",
+       "PGRES_COPY_BOTH",
        "PGRES_BAD_RESPONSE",
        "PGRES_NONFATAL_ERROR",
        "PGRES_FATAL_ERROR"
@@@ -174,6 -175,7 +175,7 @@@ PQmakeEmptyPGresult(PGconn *conn, ExecS
                        case PGRES_TUPLES_OK:
                        case PGRES_COPY_OUT:
                        case PGRES_COPY_IN:
+                       case PGRES_COPY_BOTH:
                                /* non-error cases */
                                break;
                        default:
@@@ -1586,12 -1588,17 +1588,18 @@@ PQgetResult(PGconn *conn
                                res = PQmakeEmptyPGresult(conn, PGRES_COPY_IN);
                        break;
                case PGASYNC_COPY_OUT:
 +              case PGASYNC_COPY_XLOG:
                        if (conn->result && conn->result->resultStatus == PGRES_COPY_OUT)
                                res = pqPrepareAsyncResult(conn);
                        else
                                res = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
                        break;
+               case PGASYNC_COPY_BOTH:
+                       if (conn->result && conn->result->resultStatus == PGRES_COPY_BOTH)
+                               res = pqPrepareAsyncResult(conn);
+                       else
+                               res = PQmakeEmptyPGresult(conn, PGRES_COPY_BOTH);
+                       break;
                default:
                        printfPQExpBuffer(&conn->errorMessage,
                                                          libpq_gettext("unexpected asyncStatus: %d\n"),
@@@ -1776,6 -1783,13 +1784,13 @@@ PQexecStart(PGconn *conn
                                return false;
                        }
                }
+               else if (resultStatus == PGRES_COPY_BOTH)
+               {
+                       /* We don't allow PQexec during COPY BOTH */
+                       printfPQExpBuffer(&conn->errorMessage,
+                        libpq_gettext("PQexec not allowed during COPY BOTH\n"));
+                       return false;                   
+               }
                /* check for loss of connection, too */
                if (conn->status == CONNECTION_BAD)
                        return false;
@@@ -1799,7 -1813,7 +1814,7 @@@ PQexecFinish(PGconn *conn
         * than one --- but merge error messages if we get more than one error
         * result.
         *
-        * We have to stop if we see copy in/out, however. We will resume parsing
+        * We have to stop if we see copy in/out/both, however. We will resume parsing
         * after application performs the data transfer.
         *
         * Also stop if the connection is lost (else we'll loop infinitely).
                lastResult = result;
                if (result->resultStatus == PGRES_COPY_IN ||
                        result->resultStatus == PGRES_COPY_OUT ||
+                       result->resultStatus == PGRES_COPY_BOTH ||
                        conn->status == CONNECTION_BAD)
                        break;
        }
@@@ -2001,7 -2016,7 +2017,7 @@@ PQnotifies(PGconn *conn
  }
  
  /*
-  * PQputCopyData - send some data to the backend during COPY IN or COPY XLOG
+  * PQputCopyData - send some data to the backend during COPY IN or COPY BOTH
   *
   * Returns 1 if successful, 0 if data could not be sent (only possible
   * in nonblock mode), or -1 if an error occurs.
@@@ -2012,7 -2027,7 +2028,7 @@@ PQputCopyData(PGconn *conn, const char 
        if (!conn)
                return -1;
        if (conn->asyncStatus != PGASYNC_COPY_IN &&
-               conn->asyncStatus != PGASYNC_COPY_XLOG)
+               conn->asyncStatus != PGASYNC_COPY_BOTH)
        {
                printfPQExpBuffer(&conn->errorMessage,
                                                  libpq_gettext("no COPY in progress\n"));
@@@ -2150,7 -2165,7 +2166,7 @@@ PQputCopyEnd(PGconn *conn, const char *
  
  /*
   * PQgetCopyData - read a row of data from the backend during COPY OUT
-  * or COPY XLOG
+  * or COPY BOTH
   *
   * If successful, sets *buffer to point to a malloc'd row of data, and
   * returns row length (always > 0) as result.
@@@ -2165,7 -2180,7 +2181,7 @@@ PQgetCopyData(PGconn *conn, char **buff
        if (!conn)
                return -2;
        if (conn->asyncStatus != PGASYNC_COPY_OUT &&
-               conn->asyncStatus != PGASYNC_COPY_XLOG)
+               conn->asyncStatus != PGASYNC_COPY_BOTH)
        {
                printfPQExpBuffer(&conn->errorMessage,
                                                  libpq_gettext("no COPY in progress\n"));