OSDN Git Service

In walsender, don't sleep if there's outstanding WAL waiting to be sent,
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 26 May 2010 22:21:33 +0000 (22:21 +0000)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 26 May 2010 22:21:33 +0000 (22:21 +0000)
otherwise we effectively rate-limit the streaming as pointed out by
Simon Riggs. Also, send the WAL in smaller chunks, to respond to signals
more promptly.

src/backend/replication/walsender.c

index 2227220..0d976f5 100644 (file)
@@ -30,7 +30,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.20 2010/05/09 18:11:55 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.21 2010/05/26 22:21:33 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -100,13 +100,19 @@ static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(StringInfo outMsg);
+static bool XLogSend(StringInfo outMsg, bool *caughtup);
 static void CheckClosedConnection(void);
 
 /*
  * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
+ *
+ * We don't have a good idea of what a good value would be; there's some
+ * overhead per message in both walsender and walreceiver, but on the other
+ * hand sending large batches makes walsender less responsive to signals
+ * because signals are checked only between messages. 128kB seems like
+ * a reasonable guess for now.
  */
-#define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
+#define MAX_SEND_SIZE (128 * 1024)
 
 /* Main entry point for walsender process */
 int
@@ -360,6 +366,7 @@ static int
 WalSndLoop(void)
 {
        StringInfoData output_message;
+       bool            caughtup = false;
 
        initStringInfo(&output_message);
 
@@ -387,7 +394,7 @@ WalSndLoop(void)
                 */
                if (ready_to_stop)
                {
-                       XLogSend(&output_message);
+                       XLogSend(&output_message, &caughtup);
                        shutdown_requested = true;
                }
 
@@ -402,31 +409,32 @@ WalSndLoop(void)
                }
 
                /*
-                * Nap for the configured time or until a message arrives.
+                * If we had sent all accumulated WAL in last round, nap for the
+                * configured time before retrying.
                 *
                 * On some platforms, signals won't interrupt the sleep.  To ensure we
                 * respond reasonably promptly when someone signals us, break down the
                 * sleep into NAPTIME_PER_CYCLE increments, and check for
                 * interrupts after each nap.
                 */
-               remain = WalSndDelay * 1000L;
-               while (remain > 0)
+               if (caughtup)
                {
-                       if (got_SIGHUP || shutdown_requested || ready_to_stop)
-                               break;
+                       remain = WalSndDelay * 1000L;
+                       while (remain > 0)
+                       {
+                               /* Check for interrupts */
+                               if (got_SIGHUP || shutdown_requested || ready_to_stop)
+                                       break;
 
-                       /*
-                        * Check to see whether a message from the standby or an interrupt
-                        * from other processes has arrived.
-                        */
-                       pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
-                       CheckClosedConnection();
+                               /* Sleep and check that the connection is still alive */
+                               pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
+                               CheckClosedConnection();
 
-                       remain -= NAPTIME_PER_CYCLE;
+                               remain -= NAPTIME_PER_CYCLE;
+                       }
                }
-
                /* Attempt to send the log once every loop */
-               if (!XLogSend(&output_message))
+               if (!XLogSend(&output_message, &caughtup))
                        goto eof;
        }
 
@@ -623,15 +631,20 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 }
 
 /*
- * Read all WAL that's been written (and flushed) since last cycle, and send
- * it to client.
+ * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
+ * but not yet sent to the client, and send it. If there is no unsent WAL,
+ * *caughtup is set to true and nothing is sent, otherwise *caughtup is set
+ * to false.
  *
  * Returns true if OK, false if trouble.
  */
 static bool
-XLogSend(StringInfo outMsg)
+XLogSend(StringInfo outMsg, bool *caughtup)
 {
        XLogRecPtr      SendRqstPtr;
+       XLogRecPtr      startptr;
+       XLogRecPtr      endptr;
+       Size            nbytes;
        char            activitymsg[50];
 
        /* use volatile pointer to prevent code rearrangement */
@@ -642,84 +655,82 @@ XLogSend(StringInfo outMsg)
 
        /* Quick exit if nothing to do */
        if (!XLByteLT(sentPtr, SendRqstPtr))
+       {
+               *caughtup = true;
                return true;
+       }
+       /*
+        * Otherwise let the caller know that we're not fully caught up. Unless
+        * there's a huge backlog, we'll be caught up to the current WriteRecPtr
+        * after we've sent everything below, but more WAL could accumulate while
+        * we're busy sending.
+        */
+       *caughtup = false;
 
        /*
-        * We gather multiple records together by issuing just one XLogRead() of a
-        * suitable size, and send them as one CopyData message. Repeat until
-        * we've sent everything we can.
+        * Figure out how much to send in one message. If there's less than
+        * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
+        * MAX_SEND_SIZE bytes, but round to page boundary.
+        *
+        * The rounding is not only for performance reasons. Walreceiver
+        * relies on the fact that we never split a WAL record across two
+        * messages. Since a long WAL record is split at page boundary into
+        * continuation records, page boundary is always a safe cut-off point.
+        * We also assume that SendRqstPtr never points in the middle of a WAL
+        * record.
         */
-       while (XLByteLT(sentPtr, SendRqstPtr))
+       startptr = sentPtr;
+       if (startptr.xrecoff >= XLogFileSize)
        {
-               XLogRecPtr      startptr;
-               XLogRecPtr      endptr;
-               Size            nbytes;
-
                /*
-                * Figure out how much to send in one message. If there's less than
-                * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
-                * MAX_SEND_SIZE bytes, but round to page boundary.
-                *
-                * The rounding is not only for performance reasons. Walreceiver
-                * relies on the fact that we never split a WAL record across two
-                * messages. Since a long WAL record is split at page boundary into
-                * continuation records, page boundary is always a safe cut-off point.
-                * We also assume that SendRqstPtr never points in the middle of a WAL
-                * record.
+                * crossing a logid boundary, skip the non-existent last log
+                * segment in previous logical log file.
                 */
-               startptr = sentPtr;
-               if (startptr.xrecoff >= XLogFileSize)
-               {
-                       /*
-                        * crossing a logid boundary, skip the non-existent last log
-                        * segment in previous logical log file.
-                        */
-                       startptr.xlogid += 1;
-                       startptr.xrecoff = 0;
-               }
+               startptr.xlogid += 1;
+               startptr.xrecoff = 0;
+       }
 
-               endptr = startptr;
-               XLByteAdvance(endptr, MAX_SEND_SIZE);
-               /* round down to page boundary. */
-               endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
-               /* if we went beyond SendRqstPtr, back off */
-               if (XLByteLT(SendRqstPtr, endptr))
-                       endptr = SendRqstPtr;
+       endptr = startptr;
+       XLByteAdvance(endptr, MAX_SEND_SIZE);
+       /* round down to page boundary. */
+       endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+       /* if we went beyond SendRqstPtr, back off */
+       if (XLByteLT(SendRqstPtr, endptr))
+               endptr = SendRqstPtr;
 
-               /*
-                * OK to read and send the slice.
-                *
-                * We don't need to convert the xlogid/xrecoff from host byte order to
-                * network byte order because the both server can be expected to have
-                * the same byte order. If they have different byte order, we don't
-                * reach here.
-                */
-               pq_sendbyte(outMsg, 'w');
-               pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
+       /*
+        * OK to read and send the slice.
+        *
+        * We don't need to convert the xlogid/xrecoff from host byte order to
+        * network byte order because the both server can be expected to have
+        * the same byte order. If they have different byte order, we don't
+        * reach here.
+        */
+       pq_sendbyte(outMsg, 'w');
+       pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
 
-               if (endptr.xlogid != startptr.xlogid)
-               {
-                       Assert(endptr.xlogid == startptr.xlogid + 1);
-                       nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
-               }
-               else
-                       nbytes = endptr.xrecoff - startptr.xrecoff;
+       if (endptr.xlogid != startptr.xlogid)
+       {
+               Assert(endptr.xlogid == startptr.xlogid + 1);
+               nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
+       }
+       else
+               nbytes = endptr.xrecoff - startptr.xrecoff;
 
-               sentPtr = endptr;
+       sentPtr = endptr;
 
-               /*
-                * Read the log directly into the output buffer to prevent extra
-                * memcpy calls.
-                */
-               enlargeStringInfo(outMsg, nbytes);
+       /*
+        * Read the log directly into the output buffer to prevent extra
+        * memcpy calls.
+        */
+       enlargeStringInfo(outMsg, nbytes);
 
-               XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
-               outMsg->len += nbytes;
-               outMsg->data[outMsg->len] = '\0';
+       XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
+       outMsg->len += nbytes;
+       outMsg->data[outMsg->len] = '\0';
 
-               pq_putmessage('d', outMsg->data, outMsg->len);
-               resetStringInfo(outMsg);
-       }
+       pq_putmessage('d', outMsg->data, outMsg->len);
+       resetStringInfo(outMsg);
 
        /* Update shared memory status */
        SpinLockAcquire(&walsnd->mutex);