static volatile sig_atomic_t ready_to_stop = false;
/* Flag set by signal handler of backends for replication */
-static volatile sig_atomic_t replication_done = false;
+static volatile sig_atomic_t replication_complete = false;
+static volatile sig_atomic_t replication_abort = false;
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
Latch *latch);
-static void WakeupWalSndWaiters(XLogRecPtr record);
+static void WakeupWalSndWaiters(XLogRecPtr record, bool abort);
static XLogRecPtr GetOldestAckdPtr(void);
/* Wake up the backends that this walsender had been blocking */
if (MyWalSnd->walSndState >= WALSND_CATCHUP)
- WakeupWalSndWaiters(ackdPtr);
+ WakeupWalSndWaiters(ackdPtr, false);
}
/* Main loop of walsender process */
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = MyWalSnd;
- bool needwakeup;
+ XLogRecPtr oldestPtr;
+ WalSndState saveState;
Assert(MyWalSnd != NULL);
if (!standalone_disabled && PostmasterIsAlive(true))
ExecuteStandbyFencingCommand();
- /* Wake up the backends that this walsender had been blocking */
- needwakeup = (walsnd->walSndState >= WALSND_CATCHUP);
+ /*
+ * If this walsender had been the last synchronous one, we wake up
+ * all the backends waiting for replication.
+ */
+ saveState = walsnd->walSndState;
SpinLockAcquire(&walsnd->mutex);
walsnd->walSndState = WALSND_INIT;
SpinLockRelease(&walsnd->mutex);
- if (needwakeup)
- WakeupWalSndWaiters(GetOldestAckdPtr());
+ oldestPtr = GetOldestAckdPtr();
+ if (saveState >= WALSND_CATCHUP &&
+ oldestPtr.xlogid == 0 && oldestPtr.xrecoff == 0)
+ WakeupWalSndWaiters(oldestPtr, standalone_disabled);
/*
* Mark WalSnd struct no longer in use. Assume that no lock is required
WaitLatch(&MyProc->latch, 1000000L);
/* If done already, we finish waiting */
- if (replication_done)
+ if (replication_complete)
{
- replication_done = false;
+ replication_complete = false;
return;
}
+
+ /*
+ * If the primary cannot work any longer, we don't return
+ * a success to the client
+ */
+ if (replication_abort)
+ {
+ replication_abort = false;
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to administrator command "
+ "and replication cancellation")));
+ }
}
}
* Wake up the backends waiting until replication has been completed
* up to the position older than or equal to the given one.
*
+ * If abort is true, we signal the waiters to exit with FATAL error
+ * before returning a success to the client.
+ *
* Wake up all waiters if InvalidXLogRecPtr is given.
*/
static void
-WakeupWalSndWaiters(XLogRecPtr record)
+WakeupWalSndWaiters(XLogRecPtr record, bool abort)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSndCtlData *walsndctl = WalSndCtl;
if (all_wakeup || XLByteLE(waiter->record, record))
{
- SetProcLatch(waiter->latch, PROCSIG_REPLICATION_INTERRUPT,
- waiter->backendId);
+ SetProcLatch(waiter->latch, abort ? PROCSIG_REPLICATION_ABORT :
+ PROCSIG_REPLICATION_COMPLETE, waiter->backendId);
count++;
}
else
}
/*
- * This is called when PROCSIG_REPLICATION_INTERRUPT is received.
+ * This is called when PROCSIG_REPLICATION_COMPLETE is received.
+ */
+void
+HandleReplicationComplete(void)
+{
+ replication_complete = true;
+}
+
+/*
+ * This is called when PROCSIG_REPLICATION_ABORT is received.
*/
void
-HandleReplicationInterrupt(void)
+HandleReplicationAbort(void)
{
- replication_done = true;
+ replication_abort = true;
}