OSDN Git Service

When the primary cannot work any longer, for example, because of
authorMasaoFujii <masao.fujii@gmail.com>
Tue, 14 Dec 2010 13:56:11 +0000 (22:56 +0900)
committerMasaoFujii <masao.fujii@gmail.com>
Wed, 15 Dec 2010 02:22:48 +0000 (11:22 +0900)
shutdown, walsender tries to signal the waiters to exit with fatal
error before returning a success to the client.

src/backend/replication/walsender.c
src/backend/storage/ipc/procsignal.c
src/include/replication/walsender.h
src/include/storage/procsignal.h

index f333902..aba5da9 100644 (file)
@@ -113,7 +113,8 @@ static volatile sig_atomic_t shutdown_requested = false;
 static volatile sig_atomic_t ready_to_stop = false;
 
 /* Flag set by signal handler of backends for replication */
-static volatile sig_atomic_t replication_done = false;
+static volatile sig_atomic_t replication_complete = false;
+static volatile sig_atomic_t replication_abort = false;
 
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
@@ -135,7 +136,7 @@ static void ExecuteStandbyFencingCommand(void);
 
 static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
                                                                 Latch *latch);
-static void WakeupWalSndWaiters(XLogRecPtr record);
+static void WakeupWalSndWaiters(XLogRecPtr record, bool abort);
 static XLogRecPtr GetOldestAckdPtr(void);
 
 
@@ -506,7 +507,7 @@ ProcessStreamMsgs(StringInfo inMsg)
 
        /* Wake up the backends that this walsender had been blocking */
        if (MyWalSnd->walSndState >= WALSND_CATCHUP)
-               WakeupWalSndWaiters(ackdPtr);
+               WakeupWalSndWaiters(ackdPtr, false);
 }
 
 /* Main loop of walsender process */
@@ -776,7 +777,8 @@ WalSndKill(int code, Datum arg)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = MyWalSnd;
-       bool                    needwakeup;
+       XLogRecPtr              oldestPtr;
+       WalSndState             saveState;
 
        Assert(MyWalSnd != NULL);
 
@@ -790,13 +792,18 @@ WalSndKill(int code, Datum arg)
        if (!standalone_disabled && PostmasterIsAlive(true))
                ExecuteStandbyFencingCommand();
 
-       /* Wake up the backends that this walsender had been blocking */
-       needwakeup = (walsnd->walSndState >= WALSND_CATCHUP);
+       /*
+        * If this walsender had been the last synchronous one, we wake up
+        * all the backends waiting for replication.
+        */
+       saveState = walsnd->walSndState;
        SpinLockAcquire(&walsnd->mutex);
        walsnd->walSndState = WALSND_INIT;
        SpinLockRelease(&walsnd->mutex);
-       if (needwakeup)
-               WakeupWalSndWaiters(GetOldestAckdPtr());
+       oldestPtr = GetOldestAckdPtr();
+       if (saveState >= WALSND_CATCHUP &&
+               oldestPtr.xlogid == 0 && oldestPtr.xrecoff == 0)
+               WakeupWalSndWaiters(oldestPtr, standalone_disabled);
 
        /*
         * Mark WalSnd struct no longer in use. Assume that no lock is required
@@ -1437,11 +1444,24 @@ WaitXLogSend(XLogRecPtr record)
                WaitLatch(&MyProc->latch, 1000000L);
 
                /* If done already, we finish waiting */
-               if (replication_done)
+               if (replication_complete)
                {
-                       replication_done = false;
+                       replication_complete = false;
                        return;
                }
+
+               /*
+                * If the primary cannot work any longer, we don't return
+                * a success to the client
+                */
+               if (replication_abort)
+               {
+                       replication_abort = false;
+                       ereport(FATAL,
+                                       (errcode(ERRCODE_ADMIN_SHUTDOWN),
+                                        errmsg("terminating connection due to administrator command "
+                                                       "and replication cancellation")));
+               }
        }
 }
 
@@ -1492,10 +1512,13 @@ RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record, Latch *latch)
  * Wake up the backends waiting until replication has been completed
  * up to the position older than or equal to the given one.
  *
+ * If abort is true, we signal the waiters to exit with FATAL error
+ * before returning a success to the client.
+ *
  * Wake up all waiters if InvalidXLogRecPtr is given.
    */
 static void
-WakeupWalSndWaiters(XLogRecPtr record)
+WakeupWalSndWaiters(XLogRecPtr record, bool abort)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSndCtlData  *walsndctl = WalSndCtl;
@@ -1512,8 +1535,8 @@ WakeupWalSndWaiters(XLogRecPtr record)
 
                if (all_wakeup || XLByteLE(waiter->record, record))
                {
-                       SetProcLatch(waiter->latch, PROCSIG_REPLICATION_INTERRUPT,
-                                                waiter->backendId);
+                       SetProcLatch(waiter->latch, abort ? PROCSIG_REPLICATION_ABORT :
+                                                PROCSIG_REPLICATION_COMPLETE, waiter->backendId);
                        count++;
                }
                else
@@ -1576,10 +1599,19 @@ GetOldestAckdPtr(void)
 }
 
 /*
- * This is called when PROCSIG_REPLICATION_INTERRUPT is received.
+ * This is called when PROCSIG_REPLICATION_COMPLETE is received.
+ */
+void
+HandleReplicationComplete(void)
+{
+       replication_complete = true;
+}
+
+/*
+ * This is called when PROCSIG_REPLICATION_ABORT is received.
  */
 void
-HandleReplicationInterrupt(void)
+HandleReplicationAbort(void)
 {
-       replication_done = true;
+       replication_abort = true;
 }
index 4f416ef..c30a28f 100644 (file)
@@ -290,8 +290,11 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
                RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
-       if (CheckProcSignal(PROCSIG_REPLICATION_INTERRUPT))
-               HandleReplicationInterrupt();
+       if (CheckProcSignal(PROCSIG_REPLICATION_COMPLETE))
+               HandleReplicationComplete();
+
+       if (CheckProcSignal(PROCSIG_REPLICATION_ABORT))
+               HandleReplicationAbort();
 
        latch_sigusr1_handler();
 
index 6186cf8..caee109 100644 (file)
@@ -88,6 +88,7 @@ extern void WalSndShmemInit(void);
 extern void WalSndWakeup(void);
 extern void WaitXLogSend(XLogRecPtr record);
 
-extern void HandleReplicationInterrupt(void);
+extern void HandleReplicationComplete(void);
+extern void HandleReplicationAbort(void);
 
 #endif   /* _WALSENDER_H */
index 9f0b642..05158c5 100644 (file)
@@ -40,7 +40,9 @@ typedef enum
        PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
        PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
-       PROCSIG_REPLICATION_INTERRUPT,  /* replication interrupt */
+       /* Replication interrupts */
+       PROCSIG_REPLICATION_COMPLETE,
+       PROCSIG_REPLICATION_ABORT,
 
        NUM_PROCSIGNALS                         /* Must be last! */
 } ProcSignalReason;