*/
static XLogRecPtr sentPtr = {0, 0};
+/*
+ * How far have we completed replication already? This is also
+ * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous
+ * replication case.
+ */
+static XLogRecPtr ackdPtr = {0, 0};
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t shutdown_requested = false;
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(char *msgbuf, bool *caughtup);
-static void CheckClosedConnection(void);
+static void ProcessStreamMsgs(StringInfo inMsg);
/* Main entry point for walsender process */
pq_flush();
/*
- * Initialize position to the received one, then the
+ * Initialize positions to the received one, then the
* xlog records begin to be shipped from that position
*/
- sentPtr = recptr;
+ sentPtr = ackdPtr = recptr;
/* break out of the loop */
replication_started = true;
}
/*
- * Check if the remote end has closed the connection.
+ * Process messages received from the standby.
+ *
+ * ereports on error.
*/
static void
-CheckClosedConnection(void)
+ProcessStreamMsgs(StringInfo inMsg)
{
- unsigned char firstchar;
- int r;
+ bool acked = false;
- r = pq_getbyte_if_available(&firstchar);
- if (r < 0)
- {
- /* unexpected error or EOF */
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected EOF on standby connection")));
- proc_exit(0);
- }
- if (r == 0)
+ /* Loop to process successive complete messages available */
+ for (;;)
{
- /* no data available without blocking */
- return;
- }
+ unsigned char firstchar;
+ int r;
+
+ r = pq_getbyte_if_available(&firstchar);
+ if (r < 0)
+ {
+ /* unexpected error or EOF */
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ proc_exit(0);
+ }
+ if (r == 0)
+ {
+ /* no data available without blocking */
+ break;
+ }
+
+ /* Handle the very limited subset of commands expected in this phase */
+ switch (firstchar)
+ {
+ case 'd': /* CopyData message */
+ {
+ unsigned char rpltype;
+
+ /*
+ * Read the message contents. This is expected to be done without
+ * blocking because we've been able to get message type code.
+ */
+ if (pq_getmessage(inMsg, 0))
+ proc_exit(0); /* suitable message already logged */
+
+ /* Read the replication message type from CopyData message */
+ rpltype = pq_getmsgbyte(inMsg);
+ switch (rpltype)
+ {
+ case 'l':
+ {
+ WalAckMessageData *msgdata;
+
+ msgdata = (WalAckMessageData *) pq_getmsgbytes(inMsg, sizeof(WalAckMessageData));
+
+ /*
+ * Update local status.
+ *
+ * The ackd ptr received from standby should not
+ * go backwards.
+ */
+ if (XLByteLE(ackdPtr, msgdata->ackEnd))
+ ackdPtr = msgdata->ackEnd;
+ else
+ ereport(FATAL,
+ (errmsg("replication completion location went back from "
+ "%X/%X to %X/%X",
+ ackdPtr.xlogid, ackdPtr.xrecoff,
+ msgdata->ackEnd.xlogid, msgdata->ackEnd.xrecoff)));
+
+ acked = true; /* also need to update shared position */
+ break;
+ }
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid replication message type %d",
+ rpltype)));
+ }
+ break;
+ }
- /* Handle the very limited subset of commands expected in this phase */
- switch (firstchar)
- {
/*
* 'X' means that the standby is closing down the socket.
*/
- case 'X':
- proc_exit(0);
+ case 'X':
+ proc_exit(0);
- default:
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby closing message type %d",
- firstchar)));
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid standby closing message type %d",
+ firstchar)));
+ }
}
+
+ if (acked)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->ackdPtr = ackdPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
}
/* Main loop of walsender process */
static int
WalSndLoop(void)
{
+ StringInfoData input_message;
char *output_message;
bool caughtup = false;
+ initStringInfo(&input_message);
+
/*
* Allocate buffer that will be used for each output message. We do this
* just once to reduce palloc overhead. The buffer must be made large
*/
walsnd->pid = MyProcPid;
MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+ MemSet(&walsnd->ackdPtr, 0, sizeof(XLogRecPtr));
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
OwnLatch((Latch *) &walsnd->latch);