OSDN Git Service

Change walsender so that it sends an XLogCatchupComplete message
authorMasaoFujii <masao.fujii@gmail.com>
Tue, 7 Dec 2010 13:59:40 +0000 (22:59 +0900)
committerMasaoFujii <masao.fujii@gmail.com>
Tue, 7 Dec 2010 13:59:40 +0000 (22:59 +0900)
to the standby when it has caught up with the primary.

src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/backend/replication/walsender.c
src/include/catalog/pg_proc.h
src/include/replication/walreceiver.h
src/include/replication/walsender.h

index 82188d5..6ff6e27 100644 (file)
@@ -267,6 +267,14 @@ WalReceiverMain(void)
        walrcv_connect(conninfo, startpoint);
        DisableWalRcvImmediateExit();
 
+       /*
+        * Once replication has started successfully, we regard the standby
+        * as out-of-date until it has caught up with the primary.
+        */
+       SpinLockAcquire(&walrcv->mutex);
+       walrcv->reachSync = false;
+       SpinLockRelease(&walrcv->mutex);
+
        /* Loop until end-of-streaming or error */
        for (;;)
        {
@@ -444,6 +452,16 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
                                XLogWalRcvWrite(buf, len, msghdr.dataStart);
                                break;
                        }
+               case 'c':                               /* catchup complete */
+                       {
+                               /* use volatile pointer to prevent code rearrangement */
+                               volatile WalRcvData *walrcv = WalRcv;
+
+                               SpinLockAcquire(&walrcv->mutex);
+                               walrcv->reachSync = true;
+                               SpinLockRelease(&walrcv->mutex);
+                               break;
+                       }
                default:
                        ereport(ERROR,
                                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
index 2ccaedb..c674121 100644 (file)
@@ -229,3 +229,20 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
 
        return recptr;
 }
+
+/*
+ * SQL-callable: returns whether the standby has caught up with the primary.
+ */
+Datum
+pg_is_in_sync(PG_FUNCTION_ARGS)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalRcvData *walrcv = WalRcv;
+       bool    ret;
+
+       SpinLockAcquire(&walrcv->mutex);
+       ret = walrcv->reachSync;
+       SpinLockRelease(&walrcv->mutex);
+       /* fmgr functions must hand back a Datum, not a bare C bool */
+       PG_RETURN_BOOL(ret);
+}
index 34d5d9f..fa96810 100644 (file)
@@ -600,8 +600,8 @@ WalSndLoop(void)
 
                                /*
                                 * switchptr indicates how far we must complete replication
-                                * before advertising that the standby has already been sync
-                                * with the primary.
+                                * before advertising that the standby is already in sync
+                                * with the primary.
                                 */
                                switchptr = GetFlushRecPtr();
                        }
@@ -663,12 +663,12 @@ WalSndLoop(void)
                        /*
                         * If the standby has caught up with the primary, we change
                         * the state to WALSND_SYNC and inform the standby that it's
-                        * sync with the primary. This state ensures that all the
+                        * in sync with the primary. This state ensures that all the
                         * transactions completed from a client's point of view have
                         * been replicated to the standby.
                         */
                        if (MyWalSnd->walSndState == WALSND_PRESYNC &&
-                               XLByteLE(switchptr, ackdPtr))
+                               XLByteLE(switchptr, ackdPtr) && !pending)
                        {
                                /* use volatile pointer to prevent code rearrangement */
                                volatile WalSnd *walsnd = MyWalSnd;
@@ -676,6 +676,15 @@ WalSndLoop(void)
                                SpinLockAcquire(&walsnd->mutex);
                                walsnd->walSndState = WALSND_SYNC;
                                SpinLockRelease(&walsnd->mutex);
+
+                               /*
+                                * We can send an XLogCatchupComplete message without blocking
+                                * since it's guaranteed that there is no pending data in the
+                                * output buffer.
+                                */
+                               pq_putmessage('d', "c", 1);
+                               if (pq_flush())
+                                       break;
                        }
                }
                else
index 31a9650..d389122 100644 (file)
@@ -3313,6 +3313,8 @@ DESCR("xlog filename, given an xlog location");
 
 DATA(insert OID = 3810 (  pg_is_in_recovery            PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_is_in_recovery _null_ _null_ _null_ ));
 DESCR("true if server is in recovery");
+DATA(insert OID = 3811 (  pg_is_in_sync                PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_is_in_sync _null_ _null_ _null_ ));
+DESCR("true if server is in sync with primary");
 
 DATA(insert OID = 3820 ( pg_last_xlog_receive_location PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_last_xlog_receive_location _null_ _null_ _null_ ));
 DESCR("current xlog flush location");
index e8cae3c..5fa7f80 100644 (file)
@@ -50,6 +50,11 @@ typedef struct
        pg_time_t       startTime;
 
        /*
+        * Has the standby already caught up with the primary?
+        */
+       bool            reachSync;
+
+       /*
         * receivedUpto-1 is the last byte position that has already been
         * received.  When startup process starts the walreceiver, it sets
         * receivedUpto to the point where it wants the streaming to begin. After
@@ -100,5 +105,6 @@ extern void ShutdownWalRcv(void);
 extern bool WalRcvInProgress(void);
 extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern Datum pg_is_in_sync(PG_FUNCTION_ARGS);
 
 #endif   /* _WALRECEIVER_H */
index a8996b0..6186cf8 100644 (file)
@@ -25,7 +25,7 @@ typedef enum
        WALSND_ASYNC,           /* performing asynchronous replication */
        WALSND_CATCHUP,         /* bulk-sending WAL for standby to catch up */
        WALSND_PRESYNC,         /* sent all the WAL required to get into sync */
-       WALSND_SYNC                     /* sync with standby */
+       WALSND_SYNC                     /* in sync with standby */
 } WalSndState;
 
 /*