OSDN Git Service

Separate messages for standby replies and hot standby feedback.
author Simon Riggs <simon@2ndQuadrant.com>
Fri, 18 Feb 2011 11:31:49 +0000 (11:31 +0000)
committer Simon Riggs <simon@2ndQuadrant.com>
Fri, 18 Feb 2011 11:31:49 +0000 (11:31 +0000)
Allow messages to be sent at different times, and greatly reduce
the frequency of hot standby feedback. Refactor to allow additional
message types.

src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/include/replication/walprotocol.h

index ee09468..c7f5bd5 100644 (file)
@@ -95,6 +95,7 @@ static struct
 }      LogstreamResult;
 
 static StandbyReplyMessage     reply_message;
+static StandbyHSFeedbackMessage        feedback_message;
 
 /*
  * About SIGTERM handling:
@@ -123,6 +124,7 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
 static void XLogWalRcvSendReply(void);
+static void XLogWalRcvSendHSFeedback(void);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -317,6 +319,7 @@ WalReceiverMain(void)
 
                        /* Let the master know that we received some data. */
                        XLogWalRcvSendReply();
+                       XLogWalRcvSendHSFeedback();
 
                        /*
                         * If we've written some records, flush them to disk and let the
@@ -331,6 +334,7 @@ WalReceiverMain(void)
                         * the master anyway, to report any progress in applying WAL.
                         */
                        XLogWalRcvSendReply();
+                       XLogWalRcvSendHSFeedback();
                }
        }
 }
@@ -619,40 +623,82 @@ XLogWalRcvSendReply(void)
        reply_message.apply = GetXLogReplayRecPtr();
        reply_message.sendTime = now;
 
-       /*
-        * Get the OldestXmin and its associated epoch
-        */
-       if (hot_standby_feedback && HotStandbyActive())
-       {
-               TransactionId   nextXid;
-               uint32                  nextEpoch;
-
-               reply_message.xmin = GetOldestXmin(true, false);
-
-               /*
-                * Get epoch and adjust if nextXid and oldestXmin are different
-                * sides of the epoch boundary.
-                */
-               GetNextXidAndEpoch(&nextXid, &nextEpoch);
-               if (nextXid < reply_message.xmin)
-                       nextEpoch--;
-               reply_message.epoch = nextEpoch;
-       }
-       else
-       {
-               reply_message.xmin = InvalidTransactionId;
-               reply_message.epoch = 0;
-       }
-
-       elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
+       elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
                                 reply_message.write.xlogid, reply_message.write.xrecoff,
                                 reply_message.flush.xlogid, reply_message.flush.xrecoff,
-                                reply_message.apply.xlogid, reply_message.apply.xrecoff,
-                                reply_message.xmin,
-                                reply_message.epoch);
+                                reply_message.apply.xlogid, reply_message.apply.xrecoff);
 
        /* Prepend with the message type and send it. */
        buf[0] = 'r';
        memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
        walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
 }
+
+/*
+ * Send hot standby feedback message to primary, plus the current time,
+ * in case they don't have a watch.
+ */
+static void
+XLogWalRcvSendHSFeedback(void)
+{
+       char                    buf[sizeof(StandbyHSFeedbackMessage) + 1];
+       TimestampTz             now;
+       TransactionId   nextXid;
+       uint32                  nextEpoch;
+       TransactionId   xmin;
+
+       /*
+        * If the user has not enabled hot_standby_feedback, be sure to exit
+        * before doing anything at all.
+        */
+       if (!hot_standby_feedback)
+               return;
+
+       /* Get current timestamp. */
+       now = GetCurrentTimestamp();
+
+       /*
+        * Send feedback at most once per wal_receiver_status_interval.
+        */
+       if (!TimestampDifferenceExceeds(feedback_message.sendTime, now,
+                       wal_receiver_status_interval * 1000))
+               return;
+
+       /*
+        * If Hot Standby is not yet active there is nothing to send.
+        * Check this after the interval has expired to reduce number of
+        * calls.
+        */
+       if (!HotStandbyActive())
+               return;
+
+       /*
+        * Make the expensive call to get the oldest xmin once we are
+        * certain everything else has been checked.
+        */
+       xmin = GetOldestXmin(true, false);
+
+       /*
+        * Get epoch and adjust if nextXid and oldestXmin are different
+        * sides of the epoch boundary.
+        */
+       GetNextXidAndEpoch(&nextXid, &nextEpoch);
+       if (nextXid < xmin)
+               nextEpoch--;
+
+       /*
+        * All checks have passed, so fill in and send the feedback message.
+        */
+       feedback_message.sendTime = now;
+       feedback_message.xmin = xmin;
+       feedback_message.epoch = nextEpoch;
+
+       elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
+                                feedback_message.xmin,
+                                feedback_message.epoch);
+
+       /* Prepend with the message type and send it. */
+       buf[0] = 'h';
+       memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
+       walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
+}
index a6a7a14..e04d59e 100644 (file)
@@ -116,7 +116,9 @@ static void WalSndKill(int code, Datum arg);
 static bool XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd * cmd);
+static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
+static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
 
 
@@ -456,42 +458,45 @@ ProcessRepliesIfAny(void)
        unsigned char firstchar;
        int                     r;
 
-       r = pq_getbyte_if_available(&firstchar);
-       if (r < 0)
-       {
-               /* unexpected error or EOF */
-               ereport(COMMERROR,
-                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                errmsg("unexpected EOF on standby connection")));
-               proc_exit(0);
-       }
-       if (r == 0)
+       for (;;)
        {
-               /* no data available without blocking */
-               return;
-       }
+               r = pq_getbyte_if_available(&firstchar);
+               if (r < 0)
+               {
+                       /* unexpected error or EOF */
+                       ereport(COMMERROR,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("unexpected EOF on standby connection")));
+                       proc_exit(0);
+               }
+               if (r == 0)
+               {
+                       /* no data available without blocking */
+                       return;
+               }
 
-       /* Handle the very limited subset of commands expected in this phase */
-       switch (firstchar)
-       {
-                       /*
-                        * 'd' means a standby reply wrapped in a CopyData packet.
-                        */
-               case 'd':
-                       ProcessStandbyReplyMessage();
-                       break;
+               /* Handle the very limited subset of commands expected in this phase */
+               switch (firstchar)
+               {
+                               /*
+                                * 'd' means a standby message (reply or hot standby feedback) wrapped in a CopyData packet.
+                                */
+                       case 'd':
+                               ProcessStandbyMessage();
+                               break;
 
-                       /*
-                        * 'X' means that the standby is closing down the socket.
-                        */
-               case 'X':
-                       proc_exit(0);
+                               /*
+                                * 'X' means that the standby is closing down the socket.
+                                */
+                       case 'X':
+                               proc_exit(0);
 
-               default:
-                       ereport(FATAL,
-                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg("invalid standby closing message type %d",
-                                                       firstchar)));
+                       default:
+                               ereport(FATAL,
+                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                errmsg("invalid standby closing message type %d",
+                                                               firstchar)));
+               }
        }
 }
 
@@ -499,11 +504,9 @@ ProcessRepliesIfAny(void)
  * Process a status update message received from standby.
  */
 static void
-ProcessStandbyReplyMessage(void)
+ProcessStandbyMessage(void)
 {
-       StandbyReplyMessage     reply;
        char msgtype;
-       TransactionId newxmin = InvalidTransactionId;
 
        resetStringInfo(&reply_message);
 
@@ -523,22 +526,39 @@ ProcessStandbyReplyMessage(void)
         * one type.
         */
        msgtype = pq_getmsgbyte(&reply_message);
-       if (msgtype != 'r')
+
+       switch (msgtype)
        {
-               ereport(COMMERROR,
-                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                errmsg("unexpected message type %c", msgtype)));
-               proc_exit(0);
+               case 'r':
+                       ProcessStandbyReplyMessage();
+                       break;
+
+               case 'h':
+                       ProcessStandbyHSFeedbackMessage();
+                       break;
+
+               default:
+                       ereport(COMMERROR,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("unexpected message type %c", msgtype)));
+                       proc_exit(0);
        }
+}
+
+/*
+ * Regular reply from standby advising of WAL positions on standby server.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+       StandbyReplyMessage     reply;
 
        pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
 
-       elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
+       elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
                 reply.write.xlogid, reply.write.xrecoff,
                 reply.flush.xlogid, reply.flush.xrecoff,
-                reply.apply.xlogid, reply.apply.xrecoff,
-                reply.xmin,
-                reply.epoch);
+                reply.apply.xlogid, reply.apply.xrecoff);
 
        /*
         * Update shared state for this WalSender process
@@ -554,6 +574,22 @@ ProcessStandbyReplyMessage(void)
                walsnd->apply = reply.apply;
                SpinLockRelease(&walsnd->mutex);
        }
+}
+
+/*
+ * Hot Standby feedback
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
+       StandbyHSFeedbackMessage        reply;
+       TransactionId newxmin = InvalidTransactionId;
+
+       pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+
+       elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+                reply.xmin,
+                reply.epoch);
 
        /*
         * Update the WalSender's proc xmin to allow it to be visible
index da94b6b..9baca94 100644 (file)
@@ -56,6 +56,18 @@ typedef struct
        XLogRecPtr      flush;
        XLogRecPtr      apply;
 
+       /* Sender's system clock at the time of transmission */
+       TimestampTz sendTime;
+} StandbyReplyMessage;
+
+/*
+ * Hot Standby feedback from standby (message type 'h').  This is wrapped within
+ * a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef struct
+{
        /*
         * The current xmin and epoch from the standby, for Hot Standby feedback.
         * This may be invalid if the standby-side does not support feedback,
@@ -64,10 +76,9 @@ typedef struct
        TransactionId   xmin;
        uint32                  epoch;
 
-
        /* Sender's system clock at the time of transmission */
        TimestampTz sendTime;
-} StandbyReplyMessage;
+} StandbyHSFeedbackMessage;
 
 /*
  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.