OSDN Git Service

Fix typo.
[pg-rex/syncrep.git] / src / backend / replication / walsender.c
index c235a02..bfcb49f 100644 (file)
@@ -73,6 +73,17 @@ int                  WalSndDelay = 200;      /* max sleep time between some actions */
 int                    replication_timeout = 0;        /* maximum time to wait for the Ack from the standby */
 
 /*
+ * Buffer for WAL sending
+ *
+ * WalSndOutBuffer is a work area in which the output message is constructed.
+ * It's allocated just once so we can avoid re-palloc'ing it on each cycle.
+ * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ */
+static char       *WalSndOutBuffer;
+static int             WalSndOutHead;          /* head of pending output */
+static int             WalSndOutTail;          /* tail of pending output */
+
+/*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
  * but for walsender to read the XLOG.
  */
@@ -118,7 +129,7 @@ static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(char *msgbuf, bool *caughtup);
+static bool XLogSend(bool *caughtup, bool *pending);
 static void ProcessStreamMsgs(StringInfo inMsg);
 
 static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
@@ -464,7 +475,7 @@ ProcessStreamMsgs(StringInfo inMsg)
                        default:
                                ereport(FATAL,
                                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                                errmsg("invalid standby closing message type %d",
+                                                errmsg("invalid standby message type %d",
                                                                firstchar)));
                }
        }
@@ -488,8 +499,8 @@ static int
 WalSndLoop(void)
 {
        StringInfoData  input_message;
-       char       *output_message;
        bool            caughtup = false;
+       bool            pending = false;
 
        initStringInfo(&input_message);
 
@@ -498,7 +509,8 @@ WalSndLoop(void)
         * just once to reduce palloc overhead.  The buffer must be made large
         * enough for maximum-sized messages.
         */
-       output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+       WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+       WalSndOutHead = WalSndOutTail = 0;
 
        /* Loop forever, unless we get an error */
        for (;;)
@@ -523,9 +535,9 @@ WalSndLoop(void)
                 */
                if (ready_to_stop)
                {
-                       if (!XLogSend(output_message, &caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
-                       if (caughtup)
+                       if (caughtup && !pending)
                                shutdown_requested = true;
                }
 
@@ -540,10 +552,11 @@ WalSndLoop(void)
                }
 
                /*
-                * If we had sent all accumulated WAL in last round, nap for the
-                * configured time before retrying.
+                * If we had sent all accumulated WAL in the last round, or could not
+                * flush pending WAL in the output buffer because the socket was not
+                * writable, nap for the configured time before retrying.
                 */
-               if (caughtup)
+               if (caughtup || pending)
                {
                        /*
                         * Even if we wrote all the WAL that was available when we started
@@ -554,19 +567,57 @@ WalSndLoop(void)
                         */
                        ResetLatch(&MyWalSnd->latch);
 
-                       if (!XLogSend(output_message, &caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
-                       if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
+                       if ((caughtup || pending) && !got_SIGHUP && !ready_to_stop &&
+                               !shutdown_requested)
                        {
+                               bool            check_timeout;
+                               long            sleeptime;
+                               int                     res;
+
                                /*
                                 * XXX: We don't really need the periodic wakeups anymore,
                                 * WaitLatchOrSocket should reliably wake up as soon as
                                 * something interesting happens.
                                 */
 
+                               /*
+                                * Check for replication timeout if it's enabled and we need
+                                * to wait until the socket has become writable to flush
+                                * pending WAL in output buffer or until the Ack message
+                                * from the standby has become available.
+                                */
+                               if (replication_timeout > 0 &&
+                                       (pending ||
+                                        (rplMode != REPLICATION_MODE_ASYNC &&
+                                         XLByteLT(ackdPtr, sentPtr))))
+                               {
+                                       sleeptime = replication_timeout;
+                                       check_timeout = true;
+                               }
+                               else
+                               {
+                                       sleeptime = WalSndDelay;
+                                       check_timeout = false;
+                               }
+
                                /* Sleep */
-                               WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
-                                                                 WalSndDelay * 1000L);
+                               res = WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+                                                                               true, (WalSndOutTail > 0),
+                                                                               sleeptime * 1000L);
+
+                               if (res == 0 && check_timeout)
+                               {
+                                       /*
+                                        * Since expiration of the replication timeout typically
+                                        * means a communication problem, we don't send the error
+                                        * message to the standby.
+                                        */
+                                       ereport(COMMERROR,
+                                                       (errmsg("terminating walsender process due to replication timeout")));
+                                       break;
+                               }
                        }
 
                        /* Process messages received from the standby */
@@ -575,7 +626,7 @@ WalSndLoop(void)
                else
                {
                        /* Attempt to send the log once every loop */
-                       if (!XLogSend(output_message, &caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
                }
        }
@@ -789,24 +840,48 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
  * but not yet sent to the client, and send it.
  *
- * msgbuf is a work area in which the output message is constructed.  It's
- * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
- * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
- *
  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
  * *caughtup is set to false.
  *
+ * If there is pending WAL in the output buffer, *pending is set to true,
+ * otherwise *pending is set to false.
+ *
  * Returns true if OK, false if trouble.
  */
 static bool
-XLogSend(char *msgbuf, bool *caughtup)
+XLogSend(bool *caughtup, bool *pending)
 {
        XLogRecPtr      SendRqstPtr;
        XLogRecPtr      startptr;
-       XLogRecPtr      endptr;
+       static XLogRecPtr       endptr;
        Size            nbytes;
+       uint32          n32;
+       int                     res;
        WalDataMessageHeader msghdr;
 
+       /* Attempt to flush pending WAL in output buffer */
+       if (*pending)
+       {
+               if (WalSndOutHead != WalSndOutTail)
+               {
+                       res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
+                                                                                 WalSndOutTail - WalSndOutHead);
+                       if (res == EOF)
+                               return false;
+                       WalSndOutHead += res;
+                       if (WalSndOutHead != WalSndOutTail)
+                               return true;
+               }
+
+               res = pq_flush_if_writable();
+               if (res == EOF)
+                       return false;
+               if (res == 0)
+                       return true;
+
+               goto updt;
+       }
+
        /*
         * Attempt to send all data that's already been written out and fsync'd to
         * disk.  We cannot go further than what's been written out given the
@@ -875,13 +950,19 @@ XLogSend(char *msgbuf, bool *caughtup)
        /*
         * OK to read and send the slice.
         */
-       msgbuf[0] = 'w';
+       WalSndOutBuffer[0] = 'd';
+       WalSndOutBuffer[5] = 'w';
+       WalSndOutHead = 0;
+       WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
+
+       n32 = htonl((uint32) WalSndOutTail - 1);
+       memcpy(WalSndOutBuffer + 1, &n32, 4);
 
        /*
         * Read the log directly into the output buffer to avoid extra memcpy
         * calls.
         */
-       XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
+       XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
 
        /*
         * We fill the message header last so that the send timestamp is taken as
@@ -891,13 +972,34 @@ XLogSend(char *msgbuf, bool *caughtup)
        msghdr.walEnd = SendRqstPtr;
        msghdr.sendTime = GetCurrentTimestamp();
 
-       memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
+       memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
 
-       pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
+       res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
+       if (res == EOF)
+               return false;
+
+       WalSndOutHead = res;
+       if (WalSndOutHead != WalSndOutTail)
+       {
+               *caughtup = false;
+               *pending = true;
+               return true;
+       }
 
        /* Flush pending output to the client */
-       if (pq_flush())
+       res = pq_flush_if_writable();
+       if (res == EOF)
                return false;
+       if (res == 0)
+       {
+               *caughtup = false;
+               *pending = true;
+               return true;
+       }
+
+updt:
+       WalSndOutHead = WalSndOutTail = 0;
+       *pending = false;
 
        sentPtr = endptr;