OSDN Git Service

Make walreceiver include its replication mode in START_REPLICATION
authorMasaoFujii <masao.fujii@gmail.com>
Mon, 22 Nov 2010 11:56:50 +0000 (20:56 +0900)
committerMasaoFujii <masao.fujii@gmail.com>
Mon, 22 Nov 2010 11:56:50 +0000 (20:56 +0900)
query which is sent in a handshake processing.

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walsender.c
src/backend/utils/misc/guc.c
src/include/access/xlog.h
src/include/replication/walsender.h

index 786bc43..e4cee68 100644 (file)
@@ -156,8 +156,9 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
        ThisTimeLineID = primary_tli;
 
        /* Start streaming from the point requested by startup process */
-       snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
-                        startpoint.xlogid, startpoint.xrecoff);
+       snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X MODE %s",
+                        startpoint.xlogid, startpoint.xrecoff,
+                        GetConfigOption("replication_mode", false));
        res = libpqrcv_PQexec(cmd);
        if (PQresultStatus(res) != PGRES_COPY_OUT)
        {
index 2328733..b5cd3bd 100644 (file)
@@ -89,6 +89,9 @@ static XLogRecPtr sentPtr = {0, 0};
  */
 static XLogRecPtr ackdPtr = {0, 0};
 
+/* Replication mode requested by connected standby */
+static int     rplMode = REPLICATION_MODE_ASYNC;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t shutdown_requested = false;
@@ -212,6 +215,8 @@ WalSndHandshake(void)
                /* Handle the very limited subset of commands expected in this phase */
                switch (firstchar)
                {
+                       char            rplModeStr[6];
+
                        case 'Q':                       /* Query message */
                                {
                                        const char *query_string;
@@ -272,8 +277,8 @@ WalSndHandshake(void)
                                                ReadyForQuery(DestRemote);
                                                /* ReadyForQuery did pq_flush for us */
                                        }
-                                       else if (sscanf(query_string, "START_REPLICATION %X/%X",
-                                                                       &recptr.xlogid, &recptr.xrecoff) == 2)
+                                       else if (sscanf(query_string, "START_REPLICATION %X/%X MODE %5s",
+                                                                       &recptr.xlogid, &recptr.xrecoff, rplModeStr) == 3)
                                        {
                                                StringInfoData buf;
 
@@ -294,6 +299,25 @@ WalSndHandshake(void)
                                                                        (errcode(ERRCODE_CANNOT_CONNECT_NOW),
                                                                         errmsg("standby connections not allowed because wal_level=minimal")));
 
+                                               /* Verify that the specified replication mode is valid */
+                                               {
+                                                       const struct config_enum_entry *entry;
+
+                                                       for (entry = replication_mode_options; entry && entry->name; entry++)
+                                                       {
+                                                               if (strcmp(rplModeStr, entry->name) == 0)
+                                                               {
+                                                                       rplMode = entry->val;
+                                                                       break;
+                                                               }
+                                                       }
+                                                       if (entry == NULL || entry->name == NULL)
+                                                               ereport(FATAL,
+                                                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                                errmsg("invalid replication mode: %s", rplModeStr)));
+                                               }
+                                               MyWalSnd->rplMode = rplMode;
+
                                                /* Send a CopyXLogResponse message, and start streaming */
                                                pq_beginmessage(&buf, 'W');
                                                pq_endmessage(&buf);
index d13a364..8bef823 100644 (file)
@@ -339,7 +339,6 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
  * Options for enum values stored in other modules
  */
 extern const struct config_enum_entry wal_level_options[];
-extern const struct config_enum_entry replication_mode_options[];
 extern const struct config_enum_entry sync_method_options[];
 
 /*
index a4a1f33..3d2f538 100644 (file)
@@ -15,6 +15,7 @@
 #include "access/xlogdefs.h"
 #include "lib/stringinfo.h"
 #include "storage/buf.h"
+#include "utils/guc.h"
 #include "utils/pg_crc.h"
 #include "utils/timestamp.h"
 
@@ -244,6 +245,7 @@ typedef enum ReplicationMode
        REPLICATION_MODE_APPLY
 } ReplicationMode;
 extern int     replication_mode;
+extern const struct config_enum_entry replication_mode_options[];
 
 #ifdef WAL_DEBUG
 extern bool XLOG_DEBUG;
index 9bb0d0f..a3dec47 100644 (file)
@@ -25,6 +25,8 @@ typedef struct WalSnd
        XLogRecPtr      sentPtr;                /* WAL has been sent up to this point */
        XLogRecPtr      ackdPtr;                /* WAL has been replicated up to this point */
 
+       int                     rplMode;                /* replication mode */
+
        slock_t         mutex;                  /* locks shared variables shown above */
 
        /*