OSDN Git Service

Make walsender receive not only Terminate message but also XLogRecPtr one.
authorMasaoFujii <masao.fujii@gmail.com>
Thu, 7 Oct 2010 11:32:13 +0000 (20:32 +0900)
committerMasaoFujii <masao.fujii@gmail.com>
Thu, 7 Oct 2010 11:32:13 +0000 (20:32 +0900)
Conflicts:

src/backend/replication/walsender.c

src/backend/replication/walsender.c
src/include/replication/walsender.h

index 23de867..8c8e564 100644 (file)
@@ -82,6 +82,13 @@ static uint32 sendOff = 0;
  */
 static XLogRecPtr sentPtr = {0, 0};
 
+/*
+ * How far have we completed replication already? This is also
+ * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous
+ * replication case.
+ */
+static XLogRecPtr ackdPtr = {0, 0};
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t shutdown_requested = false;
@@ -101,7 +108,7 @@ static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
 static bool XLogSend(char *msgbuf, bool *caughtup);
-static void CheckClosedConnection(void);
+static void ProcessStreamMsgs(StringInfo inMsg);
 
 
 /* Main entry point for walsender process */
@@ -293,10 +300,10 @@ WalSndHandshake(void)
                                                pq_flush();
 
                                                /*
-                                                * Initialize position to the received one, then the
+                                                * Initialize positions to the received one, then the
                                                 * xlog records begin to be shipped from that position
                                                 */
-                                               sentPtr = recptr;
+                                               sentPtr = ackdPtr = recptr;
 
                                                /* break out of the loop */
                                                replication_started = true;
@@ -330,53 +337,122 @@ WalSndHandshake(void)
 }
 
 /*
- * Check if the remote end has closed the connection.
+ * Process messages received from the standby.
+ *
+ * ereports on error.
  */
 static void
-CheckClosedConnection(void)
+ProcessStreamMsgs(StringInfo inMsg)
 {
-       unsigned char firstchar;
-       int                     r;
+       bool    acked = false;
 
-       r = pq_getbyte_if_available(&firstchar);
-       if (r < 0)
-       {
-               /* unexpected error or EOF */
-               ereport(COMMERROR,
-                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                errmsg("unexpected EOF on standby connection")));
-               proc_exit(0);
-       }
-       if (r == 0)
+       /* Loop to process successive complete messages available */
+       for (;;)
        {
-               /* no data available without blocking */
-               return;
-       }
+               unsigned char firstchar;
+               int                     r;
+
+               r = pq_getbyte_if_available(&firstchar);
+               if (r < 0)
+               {
+                       /* unexpected error or EOF */
+                       ereport(COMMERROR,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("unexpected EOF on standby connection")));
+                       proc_exit(0);
+               }
+               if (r == 0)
+               {
+                       /* no data available without blocking */
+                       break;
+               }
+
+               /* Handle the very limited subset of commands expected in this phase */
+               switch (firstchar)
+               {
+                       case 'd':       /* CopyData message */
+                       {
+                               unsigned char   rpltype;
+
+                               /*
+                                * Read the message contents. This is expected to be done without
+                                * blocking because we've been able to get message type code.
+                                */
+                               if (pq_getmessage(inMsg, 0))
+                                       proc_exit(0);           /* suitable message already logged */
+
+                               /* Read the replication message type from CopyData message */
+                               rpltype = pq_getmsgbyte(inMsg);
+                               switch (rpltype)
+                               {
+                                       case 'l':
+                                       {
+                                               WalAckMessageData  *msgdata;
+
+                                               msgdata = (WalAckMessageData *) pq_getmsgbytes(inMsg, sizeof(WalAckMessageData));
+
+                                               /*
+                                                * Update local status.
+                                                *
+                                                * The ackd ptr received from standby should not
+                                                * go backwards.
+                                                */
+                                               if (XLByteLE(ackdPtr, msgdata->ackEnd))
+                                                       ackdPtr = msgdata->ackEnd;
+                                               else
+                                                       ereport(FATAL,
+                                                                       (errmsg("replication completion location went back from "
+                                                                                       "%X/%X to %X/%X",
+                                                                                       ackdPtr.xlogid, ackdPtr.xrecoff,
+                                                                                       msgdata->ackEnd.xlogid, msgdata->ackEnd.xrecoff)));
+
+                                               acked = true;   /* also need to update shared position */
+                                               break;
+                                       }
+                                       default:
+                                               ereport(FATAL,
+                                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                errmsg("invalid replication message type %d",
+                                                                               rpltype)));
+                               }
+                               break;
+                       }
 
-       /* Handle the very limited subset of commands expected in this phase */
-       switch (firstchar)
-       {
                        /*
                         * 'X' means that the standby is closing down the socket.
                         */
-               case 'X':
-                       proc_exit(0);
+                       case 'X':
+                               proc_exit(0);
 
-               default:
-                       ereport(FATAL,
-                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg("invalid standby closing message type %d",
-                                                       firstchar)));
+                       default:
+                               ereport(FATAL,
+                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                errmsg("invalid standby closing message type %d",
+                                                               firstchar)));
+               }
        }
+
+       if (acked)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->ackdPtr = ackdPtr;
+               SpinLockRelease(&walsnd->mutex);
+       }
 }
 
 /* Main loop of walsender process */
 static int
 WalSndLoop(void)
 {
+       StringInfoData  input_message;
        char       *output_message;
        bool            caughtup = false;
 
+       initStringInfo(&input_message);
+
        /*
         * Allocate buffer that will be used for each output message.  We do this
         * just once to reduce palloc overhead.  The buffer must be made large
@@ -513,6 +589,7 @@ InitWalSnd(void)
                         */
                        walsnd->pid = MyProcPid;
                        MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+                       MemSet(&walsnd->ackdPtr, 0, sizeof(XLogRecPtr));
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        OwnLatch((Latch *) &walsnd->latch);
index 87e0120..9bb0d0f 100644 (file)
@@ -23,6 +23,7 @@ typedef struct WalSnd
 {
        pid_t           pid;                    /* this walsender's process id, or 0 */
        XLogRecPtr      sentPtr;                /* WAL has been sent up to this point */
+       XLogRecPtr      ackdPtr;                /* WAL has been replicated up to this point */
 
        slock_t         mutex;                  /* locks shared variables shown above */