} LogstreamResult;
static StandbyReplyMessage reply_message;
+static StandbyHSFeedbackMessage feedback_message;
/*
* About SIGTERM handling:
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvSendReply(void);
+static void XLogWalRcvSendHSFeedback(void);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
/* Let the master know that we received some data. */
XLogWalRcvSendReply();
+ XLogWalRcvSendHSFeedback();
/*
* If we've written some records, flush them to disk and let the
* the master anyway, to report any progress in applying WAL.
*/
XLogWalRcvSendReply();
+ XLogWalRcvSendHSFeedback();
}
}
}
reply_message.apply = GetXLogReplayRecPtr();
reply_message.sendTime = now;
- /*
- * Get the OldestXmin and its associated epoch
- */
- if (hot_standby_feedback && HotStandbyActive())
- {
- TransactionId nextXid;
- uint32 nextEpoch;
-
- reply_message.xmin = GetOldestXmin(true, false);
-
- /*
- * Get epoch and adjust if nextXid and oldestXmin are different
- * sides of the epoch boundary.
- */
- GetNextXidAndEpoch(&nextXid, &nextEpoch);
- if (nextXid < reply_message.xmin)
- nextEpoch--;
- reply_message.epoch = nextEpoch;
- }
- else
- {
- reply_message.xmin = InvalidTransactionId;
- reply_message.epoch = 0;
- }
-
- elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
+ elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
reply_message.write.xlogid, reply_message.write.xrecoff,
reply_message.flush.xlogid, reply_message.flush.xrecoff,
- reply_message.apply.xlogid, reply_message.apply.xrecoff,
- reply_message.xmin,
- reply_message.epoch);
+ reply_message.apply.xlogid, reply_message.apply.xrecoff);
/* Prepend with the message type and send it. */
buf[0] = 'r';
memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
}
+
+/*
+ * Send hot standby feedback message to primary, plus the current time,
+ * in case they don't have a watch.
+ */
+static void
+XLogWalRcvSendHSFeedback(void)
+{
+ char buf[sizeof(StandbyHSFeedbackMessage) + 1];
+ TimestampTz now;
+ TransactionId nextXid;
+ uint32 nextEpoch;
+ TransactionId xmin;
+
+ /*
+ * If the user doesn't want status to be reported to the master, be sure
+ * to exit before doing anything at all.
+ */
+ if (!hot_standby_feedback)
+ return;
+
+ /* Get current timestamp. */
+ now = GetCurrentTimestamp();
+
+ /*
+ * Send feedback at most once per wal_receiver_status_interval.
+ */
+ if (!TimestampDifferenceExceeds(feedback_message.sendTime, now,
+ wal_receiver_status_interval * 1000))
+ return;
+
+ /*
+ * If Hot Standby is not yet active there is nothing to send.
+ * Check this after the interval has expired to reduce number of
+ * calls.
+ */
+ if (!HotStandbyActive())
+ return;
+
+ /*
+ * Make the expensive call to get the oldest xmin once we are
+ * certain everything else has been checked.
+ */
+ xmin = GetOldestXmin(true, false);
+
+ /*
+ * Get epoch and adjust if nextXid and oldestXmin are different
+ * sides of the epoch boundary.
+ */
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
+ if (nextXid < xmin)
+ nextEpoch--;
+
+ /*
+ * Always send feedback message.
+ */
+ feedback_message.sendTime = now;
+ feedback_message.xmin = xmin;
+ feedback_message.epoch = nextEpoch;
+
+ elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
+ feedback_message.xmin,
+ feedback_message.epoch);
+
+ /* Prepend with the message type and send it. */
+ buf[0] = 'h';
+ memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
+ walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
+}
static bool XLogSend(char *msgbuf, bool *caughtup);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
+static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
+static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
unsigned char firstchar;
int r;
- r = pq_getbyte_if_available(&firstchar);
- if (r < 0)
- {
- /* unexpected error or EOF */
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected EOF on standby connection")));
- proc_exit(0);
- }
- if (r == 0)
+ for (;;)
{
- /* no data available without blocking */
- return;
- }
+ r = pq_getbyte_if_available(&firstchar);
+ if (r < 0)
+ {
+ /* unexpected error or EOF */
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ proc_exit(0);
+ }
+ if (r == 0)
+ {
+ /* no data available without blocking */
+ return;
+ }
- /* Handle the very limited subset of commands expected in this phase */
- switch (firstchar)
- {
- /*
- * 'd' means a standby reply wrapped in a CopyData packet.
- */
- case 'd':
- ProcessStandbyReplyMessage();
- break;
+ /* Handle the very limited subset of commands expected in this phase */
+ switch (firstchar)
+ {
+ /*
+ * 'd' means a standby reply wrapped in a CopyData packet.
+ */
+ case 'd':
+ ProcessStandbyMessage();
+ break;
- /*
- * 'X' means that the standby is closing down the socket.
- */
- case 'X':
- proc_exit(0);
+ /*
+ * 'X' means that the standby is closing down the socket.
+ */
+ case 'X':
+ proc_exit(0);
- default:
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby closing message type %d",
- firstchar)));
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid standby closing message type %d",
+ firstchar)));
+ }
}
}
* Process a status update message received from standby.
*/
static void
-ProcessStandbyReplyMessage(void)
+ProcessStandbyMessage(void)
{
- StandbyReplyMessage reply;
char msgtype;
- TransactionId newxmin = InvalidTransactionId;
resetStringInfo(&reply_message);
* one type.
*/
msgtype = pq_getmsgbyte(&reply_message);
- if (msgtype != 'r')
+
+ switch (msgtype)
{
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected message type %c", msgtype)));
- proc_exit(0);
+ case 'r':
+ ProcessStandbyReplyMessage();
+ break;
+
+ case 'h':
+ ProcessStandbyHSFeedbackMessage();
+ break;
+
+ default:
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected message type %c", msgtype)));
+ proc_exit(0);
}
+}
+
+/*
+ * Regular reply from standby advising of WAL positions on standby server.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+ StandbyReplyMessage reply;
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
+ elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
reply.write.xlogid, reply.write.xrecoff,
reply.flush.xlogid, reply.flush.xrecoff,
- reply.apply.xlogid, reply.apply.xrecoff,
- reply.xmin,
- reply.epoch);
+ reply.apply.xlogid, reply.apply.xrecoff);
/*
* Update shared state for this WalSender process
walsnd->apply = reply.apply;
SpinLockRelease(&walsnd->mutex);
}
+}
+
+/*
+ * Hot Standby feedback
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
+ StandbyHSFeedbackMessage reply;
+ TransactionId newxmin = InvalidTransactionId;
+
+ pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+
+ elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+ reply.xmin,
+ reply.epoch);
/*
* Update the WalSender's proc xmin to allow it to be visible
XLogRecPtr flush;
XLogRecPtr apply;
+ /* Sender's system clock at the time of transmission */
+ TimestampTz sendTime;
+} StandbyReplyMessage;
+
+/*
+ * Hot Standby feedback from standby (message type 'h'). This is wrapped within
+ * a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef struct
+{
/*
* The current xmin and epoch from the standby, for Hot Standby feedback.
* This may be invalid if the standby-side does not support feedback,
TransactionId xmin;
uint32 epoch;
-
/* Sender's system clock at the time of transmission */
TimestampTz sendTime;
-} StandbyReplyMessage;
+} StandbyHSFeedbackMessage;
/*
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.