to the standby when it has caught up with the primary.
walrcv_connect(conninfo, startpoint);
DisableWalRcvImmediateExit();
+ /*
+ * Once we succeeded in starting replication, we regard the standby
+ * as out-of-date until it has caught up with the primary.
+ */
+ SpinLockAcquire(&walrcv->mutex);
+ walrcv->reachSync = false;
+ SpinLockRelease(&walrcv->mutex);
+
/* Loop until end-of-streaming or error */
for (;;)
{
XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
+ case 'c': /* catchup complete */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ SpinLockAcquire(&walrcv->mutex);
+ walrcv->reachSync = true;
+ SpinLockRelease(&walrcv->mutex);
+ break;
+ }
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
return recptr;
}
+
+/*
+ * Returns whether the standby has already caught up with the primary.
+ */
+Datum
+pg_is_in_sync(PG_FUNCTION_ARGS)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ bool ret;
+
+ SpinLockAcquire(&walrcv->mutex);
+ ret = walrcv->reachSync;
+ SpinLockRelease(&walrcv->mutex);
+
+ return ret;
+}
/*
* switchptr indicates how far we must complete replication
- * before advertising that the standby has already been sync
- * with the primary.
+ * before advertising that the standby has already been in
+ * sync with the primary.
*/
switchptr = GetFlushRecPtr();
}
/*
* If the standby has caught up with the primary, we change
* the state to WALSND_SYNC and inform the standby that it's
- * sync with the primary. This state ensures that all the
+ * in sync with the primary. This state ensures that all the
* transactions completed from a client's point of view have
* been replicated to the standby.
*/
if (MyWalSnd->walSndState == WALSND_PRESYNC &&
- XLByteLE(switchptr, ackdPtr))
+ XLByteLE(switchptr, ackdPtr) && !pending)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->walSndState = WALSND_SYNC;
SpinLockRelease(&walsnd->mutex);
+
+ /*
+ * We can send a XLogCatchupComplete message without blocking
+ * since it's guaranteed that there is no pending data in the
+ * output buffer.
+ */
+ pq_putmessage('d', "c", 1);
+ if (pq_flush())
+ break;
}
}
else
DATA(insert OID = 3810 ( pg_is_in_recovery PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_is_in_recovery _null_ _null_ _null_ ));
DESCR("true if server is in recovery");
+DATA(insert OID = 3811 ( pg_is_in_sync PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_is_in_sync _null_ _null_ _null_ ));
+DESCR("true if server is in sync with primary");
DATA(insert OID = 3820 ( pg_last_xlog_receive_location PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_last_xlog_receive_location _null_ _null_ _null_ ));
DESCR("current xlog flush location");
pg_time_t startTime;
/*
+ * Has the standby already caught up with the primary?
+ */
+ bool reachSync;
+
+ /*
* receivedUpto-1 is the last byte position that has already been
* received. When startup process starts the walreceiver, it sets
* receivedUpto to the point where it wants the streaming to begin. After
extern bool WalRcvInProgress(void);
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern Datum pg_is_in_sync(PG_FUNCTION_ARGS);
#endif /* _WALRECEIVER_H */
WALSND_ASYNC, /* performing asynchronous replication */
WALSND_CATCHUP, /* bulk-sending WAL for standby to catch up */
WALSND_PRESYNC, /* sent all the WAL required to get into sync */
- WALSND_SYNC /* sync with standby */
+ WALSND_SYNC /* in sync with standby */
} WalSndState;
/*