OSDN Git Service

Fix typo.
[pg-rex/syncrep.git] / src / backend / replication / walsender.c
index 1fa09cf..bfcb49f 100644 (file)
@@ -129,7 +129,7 @@ static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(bool *caughtup);
+static bool XLogSend(bool *caughtup, bool *pending);
 static void ProcessStreamMsgs(StringInfo inMsg);
 
 static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
@@ -475,7 +475,7 @@ ProcessStreamMsgs(StringInfo inMsg)
                        default:
                                ereport(FATAL,
                                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                                errmsg("invalid standby closing message type %d",
+                                                errmsg("invalid standby message type %d",
                                                                firstchar)));
                }
        }
@@ -500,6 +500,7 @@ WalSndLoop(void)
 {
        StringInfoData  input_message;
        bool            caughtup = false;
+       bool            pending = false;
 
        initStringInfo(&input_message);
 
@@ -534,9 +535,9 @@ WalSndLoop(void)
                 */
                if (ready_to_stop)
                {
-                       if (!XLogSend(&caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
-                       if (caughtup)
+                       if (caughtup && !pending)
                                shutdown_requested = true;
                }
 
@@ -551,10 +552,11 @@ WalSndLoop(void)
                }
 
                /*
-                * If we had sent all accumulated WAL in last round, nap for the
-                * configured time before retrying.
+                * If we had sent all accumulated WAL in last round or could not
+                * flush pending WAL in output buffer because the socket was not
+                * writable, nap for the configured time before retrying.
                 */
-               if (caughtup || WalSndOutTail > 0)
+               if (caughtup || pending)
                {
                        /*
                         * Even if we wrote all the WAL that was available when we started
@@ -565,21 +567,57 @@ WalSndLoop(void)
                         */
                        ResetLatch(&MyWalSnd->latch);
 
-                       if (!XLogSend(&caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
-                       if ((caughtup || WalSndOutTail > 0) && !got_SIGHUP &&
-                               !ready_to_stop && !shutdown_requested)
+                       if ((caughtup || pending) && !got_SIGHUP && !ready_to_stop &&
+                               !shutdown_requested)
                        {
+                               bool            check_timeout;
+                               long            sleeptime;
+                               int                     res;
+
                                /*
                                 * XXX: We don't really need the periodic wakeups anymore,
                                 * WaitLatchOrSocket should reliably wake up as soon as
                                 * something interesting happens.
                                 */
 
+                               /*
+                                * Check for replication timeout if it's enabled and we need
+                                * to wait until the socket has become writable to flush
+                                * pending WAL in output buffer or until the Ack message
+                                * from the standby has become available.
+                                */
+                               if (replication_timeout > 0 &&
+                                       (pending ||
+                                        (rplMode != REPLICATION_MODE_ASYNC &&
+                                         XLByteLT(ackdPtr, sentPtr))))
+                               {
+                                       sleeptime = replication_timeout;
+                                       check_timeout = true;
+                               }
+                               else
+                               {
+                                       sleeptime = WalSndDelay;
+                                       check_timeout = false;
+                               }
+
                                /* Sleep */
-                               WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
-                                                                 true, (WalSndOutTail > 0),
-                                                                 WalSndDelay * 1000L);
+                               res = WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+                                                                               true, (WalSndOutTail > 0),
+                                                                               sleeptime * 1000L);
+
+                               if (res == 0 && check_timeout)
+                               {
+                                       /*
+                                        * Since typically expiration of replication timeout means
+                                        * communication problem, we don't send the error message
+                                        * to the standby.
+                                        */
+                                       ereport(COMMERROR,
+                                                       (errmsg("terminating walsender process due to replication timeout")));
+                                       break;
+                               }
                        }
 
                        /* Process messages received from the standby */
@@ -588,7 +626,7 @@ WalSndLoop(void)
                else
                {
                        /* Attempt to send the log once every loop */
-                       if (!XLogSend(&caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
                }
        }
@@ -805,10 +843,13 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
  * *caughtup is set to false.
  *
+ * If there is pending WAL in output buffer, *pending is set to true,
+ * otherwise *pending is set to false.
+ *
  * Returns true if OK, false if trouble.
  */
 static bool
-XLogSend(bool *caughtup)
+XLogSend(bool *caughtup, bool *pending)
 {
        XLogRecPtr      SendRqstPtr;
        XLogRecPtr      startptr;
@@ -818,8 +859,8 @@ XLogSend(bool *caughtup)
        int                     res;
        WalDataMessageHeader msghdr;
 
-       /* Attempt to flush the unsent data in WalSndOutBuffer and PqSendBuffer */
-       if (WalSndOutTail > 0)
+       /* Attempt to flush pending WAL in output buffer */
+       if (*pending)
        {
                if (WalSndOutHead != WalSndOutTail)
                {
@@ -838,7 +879,6 @@ XLogSend(bool *caughtup)
                if (res == 0)
                        return true;
 
-               WalSndOutHead = WalSndOutTail = 0;
                goto updt;
        }
 
@@ -942,6 +982,7 @@ XLogSend(bool *caughtup)
        if (WalSndOutHead != WalSndOutTail)
        {
                *caughtup = false;
+               *pending = true;
                return true;
        }
 
@@ -952,11 +993,14 @@ XLogSend(bool *caughtup)
        if (res == 0)
        {
                *caughtup = false;
+               *pending = true;
                return true;
        }
-       WalSndOutHead = WalSndOutTail = 0;
 
 updt:
+       WalSndOutHead = WalSndOutTail = 0;
+       *pending = false;
+
        sentPtr = endptr;
 
        /* Update shared memory status */