OSDN Git Service

Merge branch 'pgrex90-base' into pgrex90
authorMasaoFujii <masao.fujii@gmail.com>
Mon, 13 Dec 2010 09:12:21 +0000 (18:12 +0900)
committerMasaoFujii <masao.fujii@gmail.com>
Mon, 13 Dec 2010 09:12:21 +0000 (18:12 +0900)
Conflicts:
doc/src/sgml/protocol.sgml
src/backend/replication/walsender.c
src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/fe-protocol2.c
src/interfaces/libpq/fe-protocol3.c
src/interfaces/libpq/libpq-int.h

35 files changed:
doc/src/sgml/protocol.sgml
doc/src/sgml/ref/pg_ctl-ref.sgml
src/backend/access/transam/twophase.c
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/libpq/pqcomm.c
src/backend/port/unix_latch.c
src/backend/port/win32/socket.c
src/backend/port/win32_latch.c
src/backend/postmaster/postmaster.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/backend/replication/walsender.c
src/backend/storage/ipc/Makefile
src/backend/storage/ipc/pmevent.c [new file with mode: 0644]
src/backend/storage/ipc/procsignal.c
src/backend/storage/ipc/standby.c
src/backend/storage/lmgr/proc.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/bin/initdb/initdb.c
src/bin/pg_ctl/pg_ctl.c
src/include/access/xlog.h
src/include/catalog/pg_proc.h
src/include/libpq/libpq.h
src/include/replication/walprotocol.h
src/include/replication/walreceiver.h
src/include/replication/walsender.h
src/include/storage/latch.h
src/include/storage/lwlock.h
src/include/storage/pmevent.h [new file with mode: 0644]
src/include/storage/proc.h
src/include/storage/procsignal.h
src/interfaces/libpq/fe-exec.c

index edbe19f..048761f 100644 (file)
@@ -1373,6 +1373,41 @@ The commands accepted in walsender mode are:
       <variablelist>
       <varlistentry>
       <term>
+          XLogRecPtr (F)
+      </term>
+      <listitem>
+      <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+          Byte1('l')
+      </term>
+      <listitem>
+      <para>
+          Identifies the message as an acknowledgment of replication.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte8
+      </term>
+      <listitem>
+      <para>
+          The end of the WAL data replicated to the standby, given in
+          XLogRecPtr format.
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+
+      <variablelist>
+      <varlistentry>
+      <term>
           XLogData (B)
       </term>
       <listitem>
index 196f08f..2889f31 100644 (file)
@@ -77,6 +77,13 @@ PostgreSQL documentation
 
   <cmdsynopsis>
    <command>pg_ctl</command>
+   <arg choice="plain">promote</arg>
+   <arg>-s</arg>
+   <arg>-D <replaceable>datadir</replaceable></arg>
+  </cmdsynopsis>
+
+  <cmdsynopsis>
+   <command>pg_ctl</command>
    <arg choice="plain">reload</arg>
    <arg>-s</arg>
    <arg>-D <replaceable>datadir</replaceable></arg>
@@ -178,6 +185,11 @@ PostgreSQL documentation
   </para>
 
   <para>
+   In <option>promote</option> mode, the standby server that is
+   running in the specified data directory is promoted to the primary.
+  </para>
+
+  <para>
    <option>reload</option> mode simply sends the
    <command>postgres</command> process a <systemitem>SIGHUP</>
    signal, causing it to reread its configuration files
index 6a2140c..46cc18b 100644 (file)
@@ -1070,6 +1070,18 @@ EndPrepare(GlobalTransaction gxact)
 
        END_CRIT_SECTION();
 
+       /*
+        * Wait for WAL to be replicated up to the PREPARE record
+        * if replication is enabled. This operation has to be performed
+        * after the PREPARE record is generated and before other
+        * transactions know that this one has already been prepared.
+        *
+        * XXX: Since the caller prevents cancel/die interrupts, we cannot
+        * process them while waiting. Should we remove this restriction?
+        */
+       if (max_wal_senders > 0)
+               WaitXLogSend(gxact->prepare_lsn);
+
        records.tail = records.head = NULL;
 }
 
@@ -2027,6 +2039,15 @@ RecordTransactionCommitPrepared(TransactionId xid,
        MyProc->inCommit = false;
 
        END_CRIT_SECTION();
+
+       /*
+        * Wait for WAL to be replicated up to the COMMIT PREPARED record
+        * if replication is enabled. This operation has to be performed
+        * after the COMMIT PREPARED record is generated and before other
+        * transactions know that this one has already been committed.
+        */
+       if (max_wal_senders > 0)
+               WaitXLogSend(recptr);
 }
 
 /*
@@ -2106,4 +2127,13 @@ RecordTransactionAbortPrepared(TransactionId xid,
        TransactionIdAbortTree(xid, nchildren, children);
 
        END_CRIT_SECTION();
+
+       /*
+        * Wait for WAL to be replicated up to the ABORT PREPARED record
+        * if replication is enabled. This operation has to be performed
+        * after the ABORT PREPARED record is generated and before other
+        * transactions know that this one has already been aborted.
+        */
+       if (max_wal_senders > 0)
+               WaitXLogSend(recptr);
 }
index 057abe3..1e087ba 100644 (file)
@@ -1119,6 +1119,18 @@ RecordTransactionCommit(void)
        /* Compute latestXid while we have the child XIDs handy */
        latestXid = TransactionIdLatest(xid, nchildren, children);
 
+       /*
+        * Wait for WAL to be replicated up to the COMMIT record if replication
+        * is enabled. This operation has to be performed after the COMMIT record
+        * is generated and before other transactions know that this one has
+        * already been committed.
+        *
+        * XXX: Since the caller prevents cancel/die interrupts, we cannot
+        * process them while waiting. Should we remove this restriction?
+        */
+       if (max_wal_senders > 0)
+               WaitXLogSend(XactLastRecEnd);
+
        /* Reset XactLastRecEnd until the next transaction writes something */
        XactLastRecEnd.xrecoff = 0;
 
index e027d64..239abf9 100644 (file)
@@ -77,6 +77,7 @@ bool          fullPageWrites = true;
 bool           log_checkpoints = false;
 int                    sync_method = DEFAULT_SYNC_METHOD;
 int                    wal_level = WAL_LEVEL_MINIMAL;
+int                    replication_mode = REPLICATION_MODE_ASYNC;
 
 #ifdef WAL_DEBUG
 bool           XLOG_DEBUG = false;
@@ -105,6 +106,14 @@ const struct config_enum_entry wal_level_options[] = {
        {NULL, 0, false}
 };
 
+const struct config_enum_entry replication_mode_options[] = {  /* name -> REPLICATION_MODE_* value */
+       {"async", REPLICATION_MODE_ASYNC, false},       /* initial value of replication_mode */
+       {"recv", REPLICATION_MODE_RECV, false},         /* walreceiver acks WAL as soon as it is received */
+       {"fsync", REPLICATION_MODE_FSYNC, false},       /* walreceiver acks WAL after flushing it */
+       {"apply", REPLICATION_MODE_APPLY, false},       /* walreceiver acks the last replayed location */
+       {NULL, 0, false}        /* closing entry, same form as in wal_level_options */
+};
+
 const struct config_enum_entry sync_method_options[] = {
        {"fsync", SYNC_METHOD_FSYNC, false},
 #ifdef HAVE_FSYNC_WRITETHROUGH
@@ -545,6 +554,7 @@ typedef struct xl_parameter_change
  */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t shutdown_requested = false;
+static volatile sig_atomic_t standby_triggered = false;
 
 /*
  * Flag set when executing a restore command, to tell SIGTERM signal handler
@@ -6911,6 +6921,23 @@ GetFlushRecPtr(void)
 }
 
 /*
+ * GetReplayRecPtr -- Returns the last replay position.
+ */
+XLogRecPtr
+GetReplayRecPtr(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogRecPtr      recptr;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       recptr = xlogctl->recoveryLastRecPtr;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       return recptr;
+}
+
+/*
  * Get the time of the last xlog segment switch
  */
 pg_time_t
@@ -8872,15 +8899,10 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
 Datum
 pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
 {
-       /* use volatile pointer to prevent code rearrangement */
-       volatile XLogCtlData *xlogctl = XLogCtl;
        XLogRecPtr      recptr;
        char            location[MAXFNAMELEN];
 
-       SpinLockAcquire(&xlogctl->info_lck);
-       recptr = xlogctl->recoveryLastRecPtr;
-       SpinLockRelease(&xlogctl->info_lck);
-
+       recptr = GetReplayRecPtr();
        if (recptr.xlogid == 0 && recptr.xrecoff == 0)
                PG_RETURN_NULL();
 
@@ -9190,6 +9212,14 @@ StartupProcSigUsr1Handler(SIGNAL_ARGS)
        latch_sigusr1_handler();
 }
 
+/*
+ * SIGUSR2: set flag to finish recovery
+ *
+ * Sets standby_triggered and calls WakeupRecovery() so that the startup
+ * process notices the request.  errno is saved on entry and restored on
+ * exit: WakeupRecovery() is not visible here and may make system calls,
+ * and the code this handler interrupted must not see a changed errno.
+ */
+static void
+StartupProcTriggerHandler(SIGNAL_ARGS)
+{
+       int                     save_errno = errno;
+
+       standby_triggered = true;
+       WakeupRecovery();
+
+       errno = save_errno;
+}
+
 /* SIGHUP: set flag to re-read config file at next convenient time */
 static void
 StartupProcSigHupHandler(SIGNAL_ARGS)
@@ -9267,7 +9297,7 @@ StartupProcessMain(void)
                pqsignal(SIGALRM, SIG_IGN);
        pqsignal(SIGPIPE, SIG_IGN);
        pqsignal(SIGUSR1, StartupProcSigUsr1Handler);
-       pqsignal(SIGUSR2, SIG_IGN);
+       pqsignal(SIGUSR2, StartupProcTriggerHandler);
 
        /*
         * Reset some signals that are accepted by postmaster but not here
@@ -9713,7 +9743,8 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
 }
 
 /*
- * Check to see if the trigger file exists. If it does, request postmaster
+ * Check to see whether the trigger file exists or standby promotion
+ * has been requested. If either is the case, request postmaster
  * to shut down walreceiver, wait for it to exit, remove the trigger
  * file, and return true.
  */
@@ -9722,6 +9753,12 @@ CheckForStandbyTrigger(void)
 {
        struct stat stat_buf;
 
+       if (standby_triggered)
+       {
+               ShutdownWalRcv();
+               return true;
+       }
+
        if (TriggerFile == NULL)
                return false;
 
@@ -9731,12 +9768,22 @@ CheckForStandbyTrigger(void)
                                (errmsg("trigger file found: %s", TriggerFile)));
                ShutdownWalRcv();
                unlink(TriggerFile);
+               standby_triggered = true;
                return true;
        }
        return false;
 }
 
 /*
+ * Is the standby triggered?
+ *
+ * Returns the current value of the standby_triggered flag.  The flag is
+ * set by the SIGUSR2 handler (StartupProcTriggerHandler) and by
+ * CheckForStandbyTrigger() when it finds the trigger file.
+ */
+bool
+StandbyIsTriggered(void)
+{
+       return standby_triggered;
+}
+
+/*
  * Wake up startup process to replay newly arrived WAL, or to notice that
  * failover has been requested.
  */
index 8535066..b2454b4 100644 (file)
@@ -56,6 +56,8 @@
  *             pq_putbytes             - send bytes to connection (not flushed until pq_flush)
  *             pq_flush                - flush pending output
  *             pq_getbyte_if_available - get a byte if available without blocking
+ *             pq_putbytes_if_writable - send bytes to connection if writable without blocking
+ *             pq_flush_if_writable    - flush pending output if writable without blocking
  *
  * message-level I/O (and old-style-COPY-OUT cruft):
  *             pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
@@ -112,6 +114,7 @@ static char sock_path[MAXPGPATH];
 
 static char PqSendBuffer[PQ_BUFFER_SIZE];
 static int     PqSendPointer;          /* Next index to store a byte in PqSendBuffer */
+static int     PqSendStart;            /* Next index to send a byte in PqSendBuffer */
 
 static char PqRecvBuffer[PQ_BUFFER_SIZE];
 static int     PqRecvPointer;          /* Next index to read a byte from PqRecvBuffer */
@@ -1153,6 +1156,56 @@ internal_putbytes(const char *s, size_t len)
 }
 
 /* --------------------------------
+ *             pq_putbytes_if_writable - send bytes to connection (not flushed
+ *                     until pq_flush), if writable
+ *
+ * Copies up to len bytes from s into PqSendBuffer.  Whenever the buffer
+ * is full, pq_flush_if_writable() is tried; if that reports that it
+ * cannot write without blocking, copying stops early.
+ *
+ * Returns the number of bytes accepted into the send buffer without
+ * blocking (possibly fewer than len, and 0 on a reentrant call), or EOF
+ * if trouble.
+ * --------------------------------
+ */
+int
+pq_putbytes_if_writable(const char *s, size_t len)
+{
+       size_t          amount;
+       size_t          nwritten = 0;
+
+       /* Should not be called by old-style COPY OUT */
+       Assert(!DoingCopyOut);
+       /* No-op if reentrant call */
+       if (PqCommBusy)
+               return 0;
+       PqCommBusy = true;
+
+       while (len > 0)
+       {
+               /* If buffer is full, then flush it out */
+               if (PqSendPointer >= PQ_BUFFER_SIZE)
+               {
+                       int             r;
+
+                       r = pq_flush_if_writable();
+                       if (r == 0)
+                               break;  /* would block: report what was buffered so far */
+                       if (r == EOF)
+                       {
+                               PqCommBusy = false;     /* clear the busy flag before the early return */
+                               return r;
+                       }
+               }
+               amount = PQ_BUFFER_SIZE - PqSendPointer;        /* free space left in the buffer */
+               if (amount > len)
+                       amount = len;
+               memcpy(PqSendBuffer + PqSendPointer, s, amount);
+               PqSendPointer += amount;
+               s += amount;
+               len -= amount;
+               nwritten += amount;
+       }
+
+       PqCommBusy = false;
+       return (int) nwritten;
+}
+
+/* --------------------------------
  *             pq_flush                - flush pending output
  *
  *             returns 0 if OK, EOF if trouble
@@ -1224,6 +1277,120 @@ internal_flush(void)
        return 0;
 }
 
+/* --------------------------------
+ *             pq_flush_if_writable - flush pending output if writable
+ *
+ * Sends the bytes of PqSendBuffer between PqSendStart and PqSendPointer
+ * through secure_write().  The socket is put into non-blocking mode
+ * around each write and returned to blocking mode afterwards, also when
+ * an error is thrown.  PqSendStart advances by the amount sent, so a
+ * later call resumes where this one stopped; both indexes are reset to 0
+ * once everything has been sent, or when a send error is reported.
+ *
+ * Returns 1 if OK, 0 if pending output cannot be written without blocking,
+ * or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_flush_if_writable(void)
+{
+       static int      last_reported_send_errno = 0;
+
+       char       *bufptr = PqSendBuffer + PqSendStart;
+       char       *bufend = PqSendBuffer + PqSendPointer;
+
+       while (bufptr < bufend)
+       {
+               int                     r;
+
+               /* Temporarily put the socket into non-blocking mode */
+#ifdef WIN32
+               pgwin32_noblock = 1;
+#else
+               if (!pg_set_noblock(MyProcPort->sock))
+                       ereport(ERROR,
+                                       (errmsg("could not set socket to non-blocking mode: %m")));
+#endif
+               MyProcPort->noblock = true;
+               PG_TRY();
+               {
+                       r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+
+                       if (r < 0)
+                       {
+                               /*
+                                * Ok if no data writable without blocking or interrupted (though
+                                * EINTR really shouldn't happen with a non-blocking socket).
+                                * Report other errors.
+                                */
+                               if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+                                       r = 0;
+                               else
+                               {
+                                       if (errno != last_reported_send_errno)
+                                       {
+                                               /*
+                                                * Careful: an ereport() that tries to write to the
+                                                * client would cause recursion to here, leading to
+                                                * stack overflow and core dump!  This message must
+                                                * go *only* to the postmaster log.
+                                                *
+                                                * If a client disconnects while we're in the midst
+                                                * of output, we might write quite a bit of data before
+                                                * we get to a safe query abort point.  So, suppress
+                                                * duplicate log messages.
+                                                */
+                                               last_reported_send_errno = errno;
+                                               ereport(COMMERROR,
+                                                               (errcode_for_socket_access(),
+                                                                errmsg("could not send data to client: %m")));
+                                       }
+
+                                       /*
+                                        * We drop the buffered data anyway so that processing can
+                                        * continue, even though we'll probably quit soon.
+                                        */
+                                       PqSendStart = PqSendPointer = 0;
+                                       r = EOF;
+                               }
+                       }
+                       else if (r == 0)
+                       {
+                               /* EOF detected */
+                               r = EOF;
+                       }
+               }
+               PG_CATCH();
+               {
+                       /*
+                        * The rest of the backend code assumes the socket is in blocking
+                        * mode, so treat failure as FATAL.
+                        */
+#ifdef WIN32
+                       pgwin32_noblock = 0;
+#else
+                       if (!pg_set_block(MyProcPort->sock))
+                               ereport(FATAL,
+                                               (errmsg("could not set socket to blocking mode: %m")));
+#endif
+                       MyProcPort->noblock = false;
+                       PG_RE_THROW();
+               }
+               PG_END_TRY();
+#ifdef WIN32
+               pgwin32_noblock = 0;
+#else
+               if (!pg_set_block(MyProcPort->sock))
+                       ereport(FATAL,
+                                       (errmsg("could not set socket to blocking mode: %m")));
+#endif
+               MyProcPort->noblock = false;
+
+               if (r == 0 || r == EOF)
+                       return r;       /* would block, or failed: stop with blocking mode restored */
+
+               last_reported_send_errno = 0;   /* reset after any successful send */
+               bufptr += r;
+               PqSendStart += r;
+       }
+
+       PqSendStart = PqSendPointer = 0;
+       return 1;
+}
+
 
 /* --------------------------------
  * Message-level I/O routines begin here.
index d5d6143..8576af7 100644 (file)
@@ -193,19 +193,22 @@ DisownLatch(volatile Latch *latch)
 bool
 WaitLatch(volatile Latch *latch, long timeout)
 {
-       return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+       return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
 }
 
 /*
  * Like WaitLatch, but will also return when there's data available in
- * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
- * was set, or 2 if the scoket became readable.
+ * 'sock' for reading or writing. Returns 0 if timeout was reached,
+ * 1 if the latch was set, 2 if the socket became readable, or 3 if
+ * the socket became writable.
  */
 int
-WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
+                                 bool forWrite, long timeout)
 {
        struct timeval tv, *tvp = NULL;
        fd_set          input_mask;
+       fd_set          output_mask;
        int                     rc;
        int                     result = 0;
 
@@ -241,14 +244,22 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
                FD_ZERO(&input_mask);
                FD_SET(selfpipe_readfd, &input_mask);
                hifd = selfpipe_readfd;
-               if (sock != PGINVALID_SOCKET)
+               if (sock != PGINVALID_SOCKET && forRead)
                {
                        FD_SET(sock, &input_mask);
                        if (sock > hifd)
                                hifd = sock;
                }
 
-               rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
+               FD_ZERO(&output_mask);
+               if (sock != PGINVALID_SOCKET && forWrite)
+               {
+                       FD_SET(sock, &output_mask);
+                       if (sock > hifd)
+                               hifd = sock;
+               }
+
+               rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
                if (rc < 0)
                {
                        if (errno == EINTR)
@@ -263,11 +274,18 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
                        result = 0;
                        break;
                }
-               if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
+               if (sock != PGINVALID_SOCKET && forRead &&
+                       FD_ISSET(sock, &input_mask))
                {
                        result = 2;
                        break;          /* data available in socket */
                }
+               if (sock != PGINVALID_SOCKET && forWrite &&
+                       FD_ISSET(sock, &output_mask))
+               {
+                       result = 3;
+                       break;          /* data writable in socket */
+               }
        }
        waiting = false;
 
@@ -312,6 +330,16 @@ SetLatch(volatile Latch *latch)
 }
 
 /*
+ * Signal the given reason, in addition to SetLatch.
+ *
+ * Calls SetProcSignalReason() for the latch's owner_pid with the given
+ * reason and backendId, and then sets the latch.
+ */
+void
+SetProcLatch(volatile Latch *latch, ProcSignalReason reason, BackendId backendId)
+{
+       SetProcSignalReason(latch->owner_pid, reason, backendId);
+       SetLatch(latch);
+}
+
+/*
  * Clear the latch. Calling WaitLatch after this will sleep, unless
  * the latch is set again before the WaitLatch call.
  */
index 6c7d327..db92c47 100644 (file)
@@ -14,7 +14,8 @@
 #include "postgres.h"
 
 /*
- * Indicate if pgwin32_recv() should operate in non-blocking mode.
+ * Indicate if pgwin32_recv() and pgwin32_send() should operate
+ * in non-blocking mode.
  *
  * Since the socket emulation layer always sets the actual socket to
  * non-blocking mode in order to be able to deliver signals, we must
@@ -399,6 +400,16 @@ pgwin32_send(SOCKET s, char *buf, int len, int flags)
                        return -1;
                }
 
+               if (pgwin32_noblock)
+               {
+                       /*
+                        * No data sent, and we are in "emulated non-blocking mode", so
+                        * return indicating that we'd block if we were to continue.
+                        */
+                       errno = EWOULDBLOCK;
+                       return -1;
+               }
+
                /* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
 
                if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
index a36d1b6..b5fc93d 100644 (file)
@@ -85,11 +85,12 @@ DisownLatch(volatile Latch *latch)
 bool
 WaitLatch(volatile Latch *latch, long timeout)
 {
-       return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+       return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
 }
 
 int
-WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
+                                 bool forWrite, long timeout)
 {
        DWORD           rc;
        HANDLE          events[3];
@@ -98,15 +99,25 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
        int                     numevents;
        int                     result = 0;
 
+       if (latch->owner_pid != MyProcPid)
+               elog(ERROR, "cannot wait on a latch owned by another process");
+
        latchevent = latch->event;
 
        events[0] = latchevent;
        events[1] = pgwin32_signal_event;
        numevents = 2;
-       if (sock != PGINVALID_SOCKET)
+       if (sock != PGINVALID_SOCKET && (forRead || forWrite))
        {
+               int             flags = 0;
+
+               if (forRead)
+                       flags |= FD_READ;
+               if (forWrite)
+                       flags |= FD_WRITE;
+
                sockevent = WSACreateEvent();
-               WSAEventSelect(sock, sockevent, FD_READ);
+               WSAEventSelect(sock, sockevent, flags);
                events[numevents++] = sockevent;
        }
 
@@ -139,8 +150,19 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
                        pgwin32_dispatch_queued_signals();
                else if (rc == WAIT_OBJECT_0 + 2)
                {
+                       WSANETWORKEVENTS resEvents;
+
                        Assert(sock != PGINVALID_SOCKET);
-                       result = 2;
+
+                       ZeroMemory(&resEvents, sizeof(resEvents));
+                       if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
+                               ereport(FATAL,
+                                               (errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
+
+                       if (forRead && resEvents.lNetworkEvents & FD_READ)
+                               result = 2;
+                       if (forWrite && resEvents.lNetworkEvents & FD_WRITE)
+                               result = 3;
                        break;
                }
                else if (rc != WAIT_OBJECT_0)
@@ -148,7 +170,7 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
        }
 
        /* Clean up the handle we created for the socket */
-               if (sock != PGINVALID_SOCKET)
+       if (sock != PGINVALID_SOCKET && (forRead || forWrite))
        {
                WSAEventSelect(sock, sockevent, 0);
                WSACloseEvent(sockevent);
@@ -191,8 +213,21 @@ SetLatch(volatile Latch *latch)
        }
 }
 
+/*
+ * Signal the given reason, in addition to SetLatch.
+ *
+ * Calls SetProcSignalReason() for the latch's owner_pid with the given
+ * reason and backendId, and then sets the latch.
+ */
+void
+SetProcLatch(volatile Latch *latch, ProcSignalReason reason, BackendId backendId)
+{
+       SetProcSignalReason(latch->owner_pid, reason, backendId);
+       SetLatch(latch);
+}
+
 void
 ResetLatch(volatile Latch *latch)
 {
+       /* Only the owner should reset the latch; compare owner_pid with our own pid */
+       Assert(latch->owner_pid == MyProcPid);
+
        latch->is_set = false;  /* clear the latch */
 }
index c3963e9..a0f1e1f 100644 (file)
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pg_shmem.h"
+#include "storage/pmevent.h"
 #include "storage/pmsignal.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
@@ -4251,6 +4252,15 @@ sigusr1_handler(SIGNAL_ARGS)
                WalReceiverPID = StartWalReceiver();
        }
 
+       if (CheckPostmasterEvent(PMEVENT_PROMOTE_STANDBY) &&
+               StartupPID != 0 &&
+               (pmState == PM_STARTUP || pmState == PM_RECOVERY ||
+                pmState == PM_HOT_STANDBY || pmState == PM_WAIT_READONLY))
+       {
+               /* Tell startup process to finish recovery */
+               signal_child(StartupPID, SIGUSR2);
+       }
+
        PG_SETMASK(&UnBlockSig);
 
        errno = save_errno;
index 6b477fd..0d8c278 100644 (file)
@@ -156,8 +156,9 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
        ThisTimeLineID = primary_tli;
 
        /* Start streaming from the point requested by startup process */
-       snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
-                        startpoint.xlogid, startpoint.xrecoff);
+       snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X MODE %s",
+                        startpoint.xlogid, startpoint.xrecoff,
+                        GetConfigOption("replication_mode", false));
        res = libpqrcv_PQexec(cmd);
        if (PQresultStatus(res) != PGRES_COPY_BOTH)
        {
index f61db04..6ff6e27 100644 (file)
@@ -114,6 +114,7 @@ static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(void);
+static void XLogWalRcvSendRecPtr(XLogRecPtr recptr);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -160,6 +161,7 @@ WalReceiverMain(void)
 {
        char            conninfo[MAXCONNINFO];
        XLogRecPtr      startpoint;
+       XLogRecPtr      ackedpoint = {0, 0};
 
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
@@ -265,6 +267,14 @@ WalReceiverMain(void)
        walrcv_connect(conninfo, startpoint);
        DisableWalRcvImmediateExit();
 
+       /*
+        * Once we succeeded in starting replication, we regard the standby
+        * as out-of-date until it has caught up with the primary.
+        */
+       SpinLockAcquire(&walrcv->mutex);
+       walrcv->reachSync = false;
+       SpinLockRelease(&walrcv->mutex);
+
        /* Loop until end-of-streaming or error */
        for (;;)
        {
@@ -312,6 +322,25 @@ WalReceiverMain(void)
                         */
                        XLogWalRcvFlush();
                }
+
+               /*
+                * If replication_mode is "apply", send the last WAL replay location
+                * to the primary, to acknowledge that replication has been completed
+                * up to that. This occurs only when WAL records were replayed since
+                * the last acknowledgement.
+                */
+               if (replication_mode == REPLICATION_MODE_APPLY &&
+                       XLByteLT(ackedpoint, LogstreamResult.Flush))
+               {
+                       XLogRecPtr      recptr;
+
+                       recptr = GetReplayRecPtr();
+                       if (XLByteLT(ackedpoint, recptr))
+                       {
+                               XLogWalRcvSendRecPtr(recptr);
+                               ackedpoint = recptr;
+                       }
+               }
        }
 }
 
@@ -407,9 +436,32 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
                                buf += sizeof(WalDataMessageHeader);
                                len -= sizeof(WalDataMessageHeader);
 
+                               /*
+                                * If replication_mode is "recv", send the last WAL receive
+                                * location to the primary, to acknowledge that replication
+                                * has been completed up to that.
+                                */
+                               if (replication_mode == REPLICATION_MODE_RECV)
+                               {
+                                       XLogRecPtr      endptr = msghdr.dataStart;
+
+                                       XLByteAdvance(endptr, len);
+                                       XLogWalRcvSendRecPtr(endptr);
+                               }
+
                                XLogWalRcvWrite(buf, len, msghdr.dataStart);
                                break;
                        }
+               case 'c':                               /* catchup complete */
+                       {
+                               /* use volatile pointer to prevent code rearrangement */
+                               volatile WalRcvData *walrcv = WalRcv;
+
+                               SpinLockAcquire(&walrcv->mutex);
+                               walrcv->reachSync = true;
+                               SpinLockRelease(&walrcv->mutex);
+                               break;
+                       }
                default:
                        ereport(ERROR,
                                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -524,6 +576,14 @@ XLogWalRcvFlush(void)
 
                LogstreamResult.Flush = LogstreamResult.Write;
 
+               /*
+                * If replication_mode is "fsync", send the last WAL flush
+                * location to the primary, to acknowledge that replication
+                * has been completed up to that.
+                */
+               if (replication_mode == REPLICATION_MODE_FSYNC)
+                       XLogWalRcvSendRecPtr(LogstreamResult.Flush);
+
                /* Update shared-memory status */
                SpinLockAcquire(&walrcv->mutex);
                walrcv->latestChunkStart = walrcv->receivedUpto;
@@ -545,3 +605,24 @@ XLogWalRcvFlush(void)
                }
        }
 }
+
+/*
+ * Report a WAL location to the primary server.
+ *
+ * The outgoing message is the one-byte type code 'l' followed by the raw
+ * bytes of a WalAckMessageData whose ackEnd is recptr; it is handed to
+ * walrcv_send().
+ */
+static void
+XLogWalRcvSendRecPtr(XLogRecPtr recptr)
+{
+       static char *ackbuf = NULL;     /* allocated once, reused on every call */
+       const size_t    acklen = 1 + sizeof(WalAckMessageData);
+       WalAckMessageData ack;
+
+       /*
+        * First time through only: set up the output buffer.  Keeping it
+        * around avoids a palloc per message.
+        */
+       if (ackbuf == NULL)
+               ackbuf = palloc(acklen);
+
+       ack.ackEnd = recptr;
+
+       /* Type code first, then the payload copied in behind it */
+       ackbuf[0] = 'l';
+       memcpy(ackbuf + 1, &ack, sizeof(WalAckMessageData));
+
+       walrcv_send(ackbuf, acklen);
+}
index 2ccaedb..c674121 100644 (file)
@@ -229,3 +229,20 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
 
        return recptr;
 }
+
+/*
+ * Returns whether the standby has already caught up with the primary.
+ *
+ * fmgr-callable function: reads the reachSync flag of the shared WalRcv
+ * struct while holding its spinlock, and returns it as a boolean Datum.
+ */
+Datum
+pg_is_in_sync(PG_FUNCTION_ARGS)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalRcvData *walrcv = WalRcv;
+       bool    ret;
+
+       SpinLockAcquire(&walrcv->mutex);
+       ret = walrcv->reachSync;
+       SpinLockRelease(&walrcv->mutex);
+
+       /* The declared return type is Datum, so convert explicitly */
+       PG_RETURN_BOOL(ret);
+}
index 66f08e1..a41365f 100644 (file)
@@ -48,6 +48,7 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
+#include "storage/proc.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -60,12 +61,28 @@ WalSndCtlData *WalSndCtl = NULL;
 /* My slot in the shared memory array */
 static WalSnd *MyWalSnd = NULL;
 
+/* Array of WalSndWaiter in shared memory */
+static WalSndWaiter  *WalSndWaiters;
+
 /* Global state */
 bool           am_walsender = false;           /* Am I a walsender process ? */
 
 /* User-settable parameters for walsender */
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
 int                    WalSndDelay = 200;      /* max sleep time between some actions */
+int                    replication_timeout = 0;        /* maximum time to wait for the Ack from the standby */
+char      *standby_fencing_command = NULL;     /* command to shoot the standby in the head */
+
+/*
+ * Buffer for WAL sending
+ *
+ * WalSndOutBuffer is a work area in which the output message is constructed.
+ * It's kept around just so we can avoid re-palloc'ing the buffer on each cycle.
+ * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ */
+static char       *WalSndOutBuffer;
+static int             WalSndOutHead;          /* head of pending output */
+static int             WalSndOutTail;          /* tail of pending output */
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -82,11 +99,21 @@ static uint32 sendOff = 0;
  */
 static XLogRecPtr sentPtr = {0, 0};
 
+/*
+ * How far have we completed replication already? This is also
+ * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous
+ * replication case.
+ */
+static XLogRecPtr ackdPtr = {0, 0};
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t shutdown_requested = false;
 static volatile sig_atomic_t ready_to_stop = false;
 
+/* Flag set by signal handler of backends for replication */
+static volatile sig_atomic_t replication_done = false;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndShutdownHandler(SIGNAL_ARGS);
@@ -100,8 +127,14 @@ static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(char *msgbuf, bool *caughtup);
-static void CheckClosedConnection(void);
+static bool XLogSend(bool *caughtup, bool *pending);
+static void ProcessStreamMsgs(StringInfo inMsg);
+static void ExecuteStandbyFencingCommand(void);
+
+static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
+                                                                Latch *latch);
+static void WakeupWalSndWaiters(XLogRecPtr record);
+static XLogRecPtr GetOldestAckdPtr(void);
 
 
 /* Main entry point for walsender process */
@@ -205,6 +238,8 @@ WalSndHandshake(void)
                /* Handle the very limited subset of commands expected in this phase */
                switch (firstchar)
                {
+                       char            modestr[6];
+
                        case 'Q':                       /* Query message */
                                {
                                        const char *query_string;
@@ -265,10 +300,11 @@ WalSndHandshake(void)
                                                ReadyForQuery(DestRemote);
                                                /* ReadyForQuery did pq_flush for us */
                                        }
-                                       else if (sscanf(query_string, "START_REPLICATION %X/%X",
-                                                                       &recptr.xlogid, &recptr.xrecoff) == 2)
+                                       else if (sscanf(query_string, "START_REPLICATION %X/%X MODE %5s",
+                                                                       &recptr.xlogid, &recptr.xrecoff, modestr) == 3)
                                        {
                                                StringInfoData buf;
+                                               int                     mode;
 
                                                /*
                                                 * Check that we're logging enough information in the
@@ -287,6 +323,35 @@ WalSndHandshake(void)
                                                                        (errcode(ERRCODE_CANNOT_CONNECT_NOW),
                                                                         errmsg("standby connections not allowed because wal_level=minimal")));
 
+                                               /* Verify that the specified replication mode is valid */
+                                               {
+                                                       const struct config_enum_entry *entry;
+
+                                                       for (entry = replication_mode_options; entry && entry->name; entry++)
+                                                       {
+                                                               if (strcmp(modestr, entry->name) == 0)
+                                                               {
+                                                                       mode = entry->val;
+                                                                       break;
+                                                               }
+                                                       }
+                                                       if (entry == NULL || entry->name == NULL)
+                                                               ereport(FATAL,
+                                                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                                errmsg("invalid replication mode: %s", modestr)));
+                                               }
+
+                                               /* Change the state according to replication mode specified by standby */
+                                               {
+                                                       /* use volatile pointer to prevent code rearrangement */
+                                                       volatile WalSnd *walsnd = MyWalSnd;
+
+                                                       SpinLockAcquire(&walsnd->mutex);
+                                                       walsnd->walSndState = (mode == REPLICATION_MODE_ASYNC) ?
+                                                               WALSND_ASYNC : WALSND_CATCHUP;
+                                                       SpinLockRelease(&walsnd->mutex);
+                                               }
+
                                                /* Send a CopyBothResponse message, and start streaming */
                                                pq_beginmessage(&buf, 'W');
                                                pq_sendbyte(&buf, 0);
@@ -295,10 +360,10 @@ WalSndHandshake(void)
                                                pq_flush();
 
                                                /*
-                                                * Initialize position to the received one, then the
+                                                * Initialize positions to the received one, then the
                                                 * xlog records begin to be shipped from that position
                                                 */
-                                               sentPtr = recptr;
+                                               sentPtr = ackdPtr = recptr;
 
                                                /* break out of the loop */
                                                replication_started = true;
@@ -332,59 +397,133 @@ WalSndHandshake(void)
 }
 
 /*
- * Check if the remote end has closed the connection.
+ * Process messages received from the standby.
+ *
+ * ereports on error.
  */
 static void
-CheckClosedConnection(void)
+ProcessStreamMsgs(StringInfo inMsg)
 {
-       unsigned char firstchar;
-       int                     r;
+       bool    acked = false;
 
-       r = pq_getbyte_if_available(&firstchar);
-       if (r < 0)
-       {
-               /* unexpected error or EOF */
-               ereport(COMMERROR,
-                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                errmsg("unexpected EOF on standby connection")));
-               proc_exit(0);
-       }
-       if (r == 0)
+       /* Loop to process successive complete messages available */
+       for (;;)
        {
-               /* no data available without blocking */
-               return;
-       }
+               unsigned char firstchar;
+               int                     r;
+
+               r = pq_getbyte_if_available(&firstchar);
+               if (r < 0)
+               {
+                       /* unexpected error or EOF */
+                       ereport(COMMERROR,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("unexpected EOF on standby connection")));
+                       proc_exit(0);
+               }
+               if (r == 0)
+               {
+                       /* no data available without blocking */
+                       break;
+               }
+
+               /* Handle the very limited subset of commands expected in this phase */
+               switch (firstchar)
+               {
+                       case 'd':       /* CopyData message */
+                       {
+                               unsigned char   rpltype;
+
+                               /*
+                                * Read the message contents. This is expected to be done without
+                                * blocking because we've been able to get message type code.
+                                */
+                               if (pq_getmessage(inMsg, 0))
+                                       proc_exit(0);           /* suitable message already logged */
+
+                               /* Read the replication message type from CopyData message */
+                               rpltype = pq_getmsgbyte(inMsg);
+                               switch (rpltype)
+                               {
+                                       case 'l':
+                                       {
+                                               WalAckMessageData  *msgdata;
+
+                                               msgdata = (WalAckMessageData *) pq_getmsgbytes(inMsg, sizeof(WalAckMessageData));
+
+                                               /*
+                                                * Update local status.
+                                                *
+                                                * The ackd ptr received from standby should not
+                                                * go backwards.
+                                                */
+                                               if (XLByteLE(ackdPtr, msgdata->ackEnd))
+                                                       ackdPtr = msgdata->ackEnd;
+                                               else
+                                                       ereport(FATAL,
+                                                                       (errmsg("replication completion location went back from "
+                                                                                       "%X/%X to %X/%X",
+                                                                                       ackdPtr.xlogid, ackdPtr.xrecoff,
+                                                                                       msgdata->ackEnd.xlogid, msgdata->ackEnd.xrecoff)));
+
+                                               acked = true;   /* also need to update shared position */
+                                               break;
+                                       }
+                                       default:
+                                               ereport(FATAL,
+                                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                errmsg("invalid replication message type %d",
+                                                                               rpltype)));
+                               }
+                               break;
+                       }
 
-       /* Handle the very limited subset of commands expected in this phase */
-       switch (firstchar)
-       {
                        /*
                         * 'X' means that the standby is closing down the socket.
                         */
-               case 'X':
-                       proc_exit(0);
+                       case 'X':
+                               proc_exit(0);
 
-               default:
-                       ereport(FATAL,
-                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg("invalid standby closing message type %d",
-                                                       firstchar)));
+                       default:
+                               ereport(FATAL,
+                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                errmsg("invalid standby message type %d",
+                                                               firstchar)));
+               }
        }
+
+       if (acked)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->ackdPtr = ackdPtr;
+               SpinLockRelease(&walsnd->mutex);
+       }
+
+       /* Wake up the backends that this walsender had been blocking */
+       WakeupWalSndWaiters(ackdPtr);
 }
 
 /* Main loop of walsender process */
 static int
 WalSndLoop(void)
 {
-       char       *output_message;
+       StringInfoData  input_message;
        bool            caughtup = false;
+       bool            pending = false;
+       XLogRecPtr      switchptr = {0, 0};
+
+       initStringInfo(&input_message);
 
        /*
         * Allocate buffer that will be used for each output message.  We do this
         * just once to reduce palloc overhead.  The buffer must be made large
         * enough for maximum-sized messages.
         */
-       output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+       WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+       WalSndOutHead = WalSndOutTail = 0;
 
        /* Loop forever, unless we get an error */
        for (;;)
@@ -409,9 +548,9 @@ WalSndLoop(void)
                 */
                if (ready_to_stop)
                {
-                       if (!XLogSend(output_message, &caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
-                       if (caughtup)
+                       if (caughtup && !pending)
                                shutdown_requested = true;
                }
 
@@ -426,10 +565,11 @@ WalSndLoop(void)
                }
 
                /*
-                * If we had sent all accumulated WAL in last round, nap for the
-                * configured time before retrying.
+                * If we had sent all accumulated WAL in last round or could not
+                * flush pending WAL in output buffer because the socket was not
+                * writable, nap for the configured time before retrying.
                 */
-               if (caughtup)
+               if (caughtup || pending)
                {
                        /*
                         * Even if we wrote all the WAL that was available when we started
@@ -440,28 +580,119 @@ WalSndLoop(void)
                         */
                        ResetLatch(&MyWalSnd->latch);
 
-                       if (!XLogSend(output_message, &caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
-                       if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
+
+                       /*
+                        * If the standby has almost caught up with the primary, we change
+                        * the state to WALSND_PRESYNC and start making transactions wait
+                        * until their WAL has been replicated.
+                        *
+                        * No lock is required to get WalSnd->walSndState here since it can
+                        * be updated only by walsender.
+                        */
+                       if (MyWalSnd->walSndState == WALSND_CATCHUP && caughtup)
                        {
+                               /* use volatile pointer to prevent code rearrangement */
+                               volatile WalSnd *walsnd = MyWalSnd;
+
+                               SpinLockAcquire(&walsnd->mutex);
+                               walsnd->walSndState = WALSND_PRESYNC;
+                               SpinLockRelease(&walsnd->mutex);
+
+                               /*
+                                * switchptr indicates how far we must complete replication
+                                * before advertising that the standby has already been in
+                                * sync with the primary.
+                                */
+                               switchptr = GetFlushRecPtr();
+                       }
+
+                       if ((caughtup || pending) && !got_SIGHUP && !ready_to_stop &&
+                               !shutdown_requested)
+                       {
+                               bool            check_timeout;
+                               long            sleeptime;
+                               int                     res;
+
                                /*
                                 * XXX: We don't really need the periodic wakeups anymore,
                                 * WaitLatchOrSocket should reliably wake up as soon as
                                 * something interesting happens.
                                 */
 
+                               /*
+                                * Check for replication timeout if it's enabled and we need
+                                * to wait until the socket has become writable to flush
+                                * pending WAL in output buffer or until the Ack message
+                                * from the standby has become available.
+                                */
+                               if (replication_timeout > 0 &&
+                                       (pending ||
+                                        (MyWalSnd->walSndState >= WALSND_CATCHUP &&
+                                         XLByteLT(ackdPtr, sentPtr))))
+                               {
+                                       sleeptime = replication_timeout;
+                                       check_timeout = true;
+                               }
+                               else
+                               {
+                                       sleeptime = WalSndDelay;
+                                       check_timeout = false;
+                               }
+
                                /* Sleep */
-                               WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
-                                                                 WalSndDelay * 1000L);
+                               res = WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+                                                                               true, (WalSndOutTail > 0),
+                                                                               sleeptime * 1000L);
+
+                               if (res == 0 && check_timeout)
+                               {
+                                       /*
+                                        * Since typically expiration of replication timeout means
+                                        * communication problem, we don't send the error message
+                                        * to the standby.
+                                        */
+                                       ereport(COMMERROR,
+                                                       (errmsg("terminating walsender process due to replication timeout")));
+                                       break;
+                               }
                        }
 
-                       /* Check if the connection was closed */
-                       CheckClosedConnection();
+                       /* Process messages received from the standby */
+                       ProcessStreamMsgs(&input_message);
+
+                       /*
+                        * If the standby has caught up with the primary, we change
+                        * the state to WALSND_SYNC and inform the standby that it's
+                        * in sync with the primary. This state ensures that all the
+                        * transactions completed from a client's point of view have
+                        * been replicated to the standby.
+                        */
+                       if (MyWalSnd->walSndState == WALSND_PRESYNC &&
+                               XLByteLE(switchptr, ackdPtr) && !pending)
+                       {
+                               /* use volatile pointer to prevent code rearrangement */
+                               volatile WalSnd *walsnd = MyWalSnd;
+
+                               SpinLockAcquire(&walsnd->mutex);
+                               walsnd->walSndState = WALSND_SYNC;
+                               SpinLockRelease(&walsnd->mutex);
+
+                               /*
+                                * We can send a XLogCatchupComplete message without blocking
+                                * since it's guaranteed that there is no pending data in the
+                                * output buffer.
+                                */
+                               pq_putmessage('d', "c", 1);
+                               if (pq_flush())
+                                       break;
+                       }
                }
                else
                {
                        /* Attempt to send the log once every loop */
-                       if (!XLogSend(output_message, &caughtup))
+                       if (!XLogSend(&caughtup, &pending))
                                break;
                }
        }
@@ -515,6 +746,8 @@ InitWalSnd(void)
                         */
                        walsnd->pid = MyProcPid;
                        MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+                       MemSet(&walsnd->ackdPtr, 0, sizeof(XLogRecPtr));
+                       walsnd->walSndState = WALSND_INIT;
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        OwnLatch((Latch *) &walsnd->latch);
@@ -538,9 +771,28 @@ InitWalSnd(void)
 static void
 WalSndKill(int code, Datum arg)
 {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSnd *walsnd = MyWalSnd;
+
        Assert(MyWalSnd != NULL);
 
        /*
+        * If replication was terminated for a reason other than the master
+        * server shutdown or emergency bailout (i.e., unexpected death of
+        * postmaster), we can expect this server can work standalone,
+        * so we call standby_fencing_command to shoot the standby server
+        * in the head if it's specified.
+        */
+       if (!ready_to_stop && PostmasterIsAlive(true))
+               ExecuteStandbyFencingCommand();
+
+       /* Wake up the backends that this walsender had been blocking */
+       SpinLockAcquire(&walsnd->mutex);
+       walsnd->walSndState = WALSND_INIT;
+       SpinLockRelease(&walsnd->mutex);
+       WakeupWalSndWaiters(GetOldestAckdPtr());
+
+       /*
         * Mark WalSnd struct no longer in use. Assume that no lock is required
         * for this.
         */
@@ -670,24 +922,48 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
  * but not yet sent to the client, and send it.
  *
- * msgbuf is a work area in which the output message is constructed.  It's
- * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
- * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
- *
  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
  * *caughtup is set to false.
  *
+ * If there is pending WAL in output buffer, *pending is set to true,
+ * otherwise *pending is set to false.
+ *
  * Returns true if OK, false if trouble.
  */
 static bool
-XLogSend(char *msgbuf, bool *caughtup)
+XLogSend(bool *caughtup, bool *pending)
 {
        XLogRecPtr      SendRqstPtr;
        XLogRecPtr      startptr;
-       XLogRecPtr      endptr;
+       static XLogRecPtr       endptr;
        Size            nbytes;
+       uint32          n32;
+       int                     res;
        WalDataMessageHeader msghdr;
 
+       /* Attempt to flush pending WAL in output buffer */
+       if (*pending)
+       {
+               if (WalSndOutHead != WalSndOutTail)
+               {
+                       res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
+                                                                                 WalSndOutTail - WalSndOutHead);
+                       if (res == EOF)
+                               return false;
+                       WalSndOutHead += res;
+                       if (WalSndOutHead != WalSndOutTail)
+                               return true;
+               }
+
+               res = pq_flush_if_writable();
+               if (res == EOF)
+                       return false;
+               if (res == 0)
+                       return true;
+
+               goto updt;
+       }
+
        /*
         * Attempt to send all data that's already been written out and fsync'd to
         * disk.  We cannot go further than what's been written out given the
@@ -756,13 +1032,19 @@ XLogSend(char *msgbuf, bool *caughtup)
        /*
         * OK to read and send the slice.
         */
-       msgbuf[0] = 'w';
+       WalSndOutBuffer[0] = 'd';
+       WalSndOutBuffer[5] = 'w';
+       WalSndOutHead = 0;
+       WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
+
+       n32 = htonl((uint32) WalSndOutTail - 1);
+       memcpy(WalSndOutBuffer + 1, &n32, 4);
 
        /*
         * Read the log directly into the output buffer to avoid extra memcpy
         * calls.
         */
-       XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
+       XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
 
        /*
         * We fill the message header last so that the send timestamp is taken as
@@ -772,13 +1054,34 @@ XLogSend(char *msgbuf, bool *caughtup)
        msghdr.walEnd = SendRqstPtr;
        msghdr.sendTime = GetCurrentTimestamp();
 
-       memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
+       memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
+
+       res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
+       if (res == EOF)
+               return false;
 
-       pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
+       WalSndOutHead = res;
+       if (WalSndOutHead != WalSndOutTail)
+       {
+               *caughtup = false;
+               *pending = true;
+               return true;
+       }
 
        /* Flush pending output to the client */
-       if (pq_flush())
+       res = pq_flush_if_writable();
+       if (res == EOF)
                return false;
+       if (res == 0)
+       {
+               *caughtup = false;
+               *pending = true;
+               return true;
+       }
+
+updt:
+       WalSndOutHead = WalSndOutTail = 0;
+       *pending = false;
 
        sentPtr = endptr;
 
@@ -805,6 +1108,91 @@ XLogSend(char *msgbuf, bool *caughtup)
        return true;
 }
 
+/*
+ * Attempt to execute standby_fencing_command at the end of replication.
+ *
+ * The command string is expanded into a local buffer of MAXPGPATH bytes:
+ * "%a" is replaced by application_name (or "[unknown]" if that is unset
+ * or empty), "%%" becomes a single "%", and a "%" followed by anything
+ * else is copied through literally.  Output that does not fit in the
+ * buffer is silently truncated.  The result is run with system(); a
+ * nonzero return code is reported as a WARNING and otherwise ignored.
+ *
+ * Does nothing if no command is configured.
+ */
+static void
+ExecuteStandbyFencingCommand(void)
+{
+       char            standbyFencingCmd[MAXPGPATH];
+       char       *dp;
+       char       *endp;
+       const char *sp;
+       int                     rc;
+
+       /*
+        * Do nothing if no command supplied.  The variable's static initializer
+        * is NULL, so guard against that as well as against an empty string.
+        */
+       if (standby_fencing_command == NULL ||
+               standby_fencing_command[0] == '\0')
+               return;
+
+       /*
+        * construct the command to be executed
+        */
+       dp = standbyFencingCmd;
+       endp = standbyFencingCmd + MAXPGPATH - 1;
+       *endp = '\0';
+
+       for (sp = standby_fencing_command; *sp; sp++)
+       {
+               if (*sp == '%')
+               {
+                       switch (sp[1])
+                       {
+                               case 'a':
+                               {
+                                       /*
+                                        * %a: application_name
+                                        *
+                                        * NOTE(review): application_name is presumably
+                                        * supplied by the connecting standby, and it is
+                                        * pasted unescaped into a string passed to
+                                        * system() -- confirm that shell metacharacters
+                                        * in it cannot be abused.
+                                        */
+                                       const char *appname = application_name;
+
+                                       if (appname == NULL || *appname == '\0')
+                                               appname = _("[unknown]");
+
+                                       sp++;
+                                       strlcpy(dp, appname, endp - dp);
+                                       dp += strlen(dp);
+                                       break;
+                               }
+                               case '%':
+                                       /* convert %% to a single % */
+                                       sp++;
+                                       if (dp < endp)
+                                               *dp++ = *sp;
+                                       break;
+                               default:
+                                       /* otherwise treat the % as not special */
+                                       if (dp < endp)
+                                               *dp++ = *sp;
+                                       break;
+                       }
+               }
+               else
+               {
+                       if (dp < endp)
+                               *dp++ = *sp;
+               }
+       }
+       *dp = '\0';
+
+       ereport(DEBUG3,
+                       (errmsg_internal("executing standby fencing command \"%s\"",
+                                                        standbyFencingCmd)));
+
+       /*
+        * execute the constructed command
+        */
+       rc = system(standbyFencingCmd);
+       if (rc != 0)
+       {
+               /*
+                * No matter what code is returned, walsender can't stop exiting.
+                * We don't need to care about the return code of the command here.
+                */
+               ereport(WARNING,
+                               (errmsg("standby fencing command failed with return code %d",
+                                               rc),
+                                errdetail("The failed standby fencing command was: %s",
+                                                  standbyFencingCmd)));
+       }
+}
+
 /* SIGHUP: set flag to re-read config file at next convenient time */
 static void
 WalSndSigHupHandler(SIGNAL_ARGS)
@@ -904,6 +1292,13 @@ WalSndShmemSize(void)
        size = offsetof(WalSndCtlData, walsnds);
        size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
 
+       /*
+        * If replication is enabled, we have a data structure called
+        * WalSndWaiters, created in shared memory.
+        */
+       if (max_wal_senders > 0)
+               size = add_size(size, mul_size(MaxBackends, sizeof(WalSndWaiter)));
+
        return size;
 }
 
@@ -913,14 +1308,16 @@ WalSndShmemInit(void)
 {
        bool            found;
        int                     i;
+       Size            size = add_size(offsetof(WalSndCtlData, walsnds),
+                                                               mul_size(max_wal_senders, sizeof(WalSnd)));
 
        WalSndCtl = (WalSndCtlData *)
-               ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
+               ShmemInitStruct("Wal Sender Ctl", size, &found);
 
        if (!found)
        {
                /* First time through, so initialize */
-               MemSet(WalSndCtl, 0, WalSndShmemSize());
+               MemSet(WalSndCtl, 0, size);
 
                for (i = 0; i < max_wal_senders; i++)
                {
@@ -930,6 +1327,16 @@ WalSndShmemInit(void)
                        InitSharedLatch(&walsnd->latch);
                }
        }
+
+       /* Create or attach to the WalSndWaiters array too, if needed */
+       if (max_wal_senders > 0)
+       {
+               WalSndWaiters = (WalSndWaiter *)
+                       ShmemInitStruct("WalSndWaiters",
+                                                       mul_size(MaxBackends, sizeof(WalSndWaiter)),
+                                                       &found);
+               WalSndCtl->maxWaiters = MaxBackends;
+       }
 }
 
 /* Wake up all walsenders */
@@ -943,36 +1350,204 @@ WalSndWakeup(void)
 }
 
 /*
- * This isn't currently used for anything. Monitoring tools might be
- * interested in the future, and we'll need something like this in the
- * future for synchronous replication.
+ * Ensure that replication has been completed up to the given position.
  */
-#ifdef NOT_USED
+void
+WaitXLogSend(XLogRecPtr record)
+{
+       int             i;
+       bool    mustwait = false;
+
+       Assert(max_wal_senders > 0);
+
+       /*
+        * Pass 1: look at every walsender slot.  We return immediately if any
+        * qualifying walsender has already acknowledged up to "record", and
+        * remember in "mustwait" whether at least one walsender exists that we
+        * would have to wait for.
+        */
+       for (i = 0; i < max_wal_senders; i++)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+               XLogRecPtr              recptr;
+               WalSndState             state;
+
+               /* pid is 0 when the slot has no walsender process */
+               if (walsnd->pid == 0)
+                       continue;
+
+               /* take a consistent snapshot of state and Ack position */
+               SpinLockAcquire(&walsnd->mutex);
+               state = walsnd->walSndState;
+               recptr = walsnd->ackdPtr;
+               SpinLockRelease(&walsnd->mutex);
+
+               /*
+                * Skip walsenders in WALSND_INIT or WALSND_ASYNC state, and those
+                * whose ackdPtr is still zero (presumably no Ack received yet).
+                */
+               if (state <= WALSND_ASYNC ||
+                       (recptr.xlogid == 0 && recptr.xrecoff == 0))
+                       continue;
+
+               /* Quick exit if already known replicated */
+               if (XLByteLE(record, recptr))
+                       return;
+
+               /*
+                * If walsender is bulk-sending WAL for standby to catch up,
+                * we don't need to wait for Ack from standby.
+                */
+               if (state <= WALSND_CATCHUP)
+                       continue;
+
+               /* state is WALSND_PRESYNC or WALSND_SYNC here */
+               mustwait = true;
+       }
+
+       /*
+        * Don't need to wait for replication if there is no synchronous
+        * standby
+        */
+       if (!mustwait)
+               return;
+
+       /*
+        * Register myself into the wait list and sleep until replication
+        * has been completed up to the given position and the walsender
+        * signals me.
+        *
+        * If replication has been completed up to the latest position
+        * before the registration, walsender might be unable to send the
+        * signal immediately. We must wake up the walsender after the
+        * registration.
+        */
+       ResetLatch(&MyProc->latch);
+       RegisterWalSndWaiter(MyBackendId, record, &MyProc->latch);
+       WalSndWakeup();
+
+       /*
+        * Pass 2: sleep until HandleReplicationInterrupt() has set
+        * replication_done.
+        *
+        * NOTE(review): the latch is not reset inside this loop, the loop has
+        * no exit other than replication_done, and replication_timeout is not
+        * consulted here -- confirm that this is intended.
+        */
+       for (;;)
+       {
+               /* timeout is presumably in microseconds (1 second) -- TODO confirm */
+               WaitLatch(&MyProc->latch, 1000000L);
+
+               /* If done already, we finish waiting */
+               if (replication_done)
+               {
+                       /* clear the flag so that the next wait starts afresh */
+                       replication_done = false;
+                       return;
+               }
+       }
+}
+
+/*
+ * Insert a backend into the shared wait list.
+ *
+ * The list is kept sorted by ascending XLogRecPtr.  The new entry is
+ * placed after every existing entry whose position is less than or
+ * equal to "record".  Running out of slots raises a PANIC.
+ */
+static void
+RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record, Latch *latch)
+{
+       /* volatile pointer keeps the compiler from rearranging accesses */
+       volatile WalSndCtlData  *ctl = WalSndCtl;
+       int             slot;
+       int             ntail;
+
+       LWLockAcquire(WalSndWaiterLock, LW_EXCLUSIVE);
+
+       /* The array must have room for one more entry. */
+       if (ctl->numWaiters + 1 > ctl->maxWaiters)
+               elog(PANIC, "out of replication waiters slots");
+
+       /*
+        * A new position is expected to be among the latest ones, so walk
+        * back from the tail until the predecessor no longer sorts after it.
+        */
+       slot = ctl->numWaiters;
+       while (slot > 0 && !XLByteLE(WalSndWaiters[slot - 1].record, record))
+               slot--;
+
+       /* Open a gap by moving the entries that sort after us up by one. */
+       ntail = ctl->numWaiters - slot;
+       if (ntail > 0)
+               memmove(&WalSndWaiters[slot + 1], &WalSndWaiters[slot],
+                               ntail * sizeof(WalSndWaiter));
+
+       WalSndWaiters[slot].latch = latch;
+       WalSndWaiters[slot].record = record;
+       WalSndWaiters[slot].backendId = backendId;
+       ctl->numWaiters++;
+
+       LWLockRelease(WalSndWaiterLock);
+}
+
+/*
+ * Wake every backend whose awaited position is older than or equal to
+ * "record", and drop those backends from the wait list.
+ *
+ * Passing InvalidXLogRecPtr wakes up (and removes) all waiters.
+ */
+static void
+WakeupWalSndWaiters(XLogRecPtr record)
+{
+       /* volatile pointer keeps the compiler from rearranging accesses */
+       volatile WalSndCtlData  *ctl = WalSndCtl;
+       bool    wake_everyone = (record.xlogid == 0 && record.xrecoff == 0);
+       int             nwoken = 0;
+
+       LWLockAcquire(WalSndWaiterLock, LW_EXCLUSIVE);
+
+       /*
+        * The list is sorted by ascending XLogRecPtr, so the waiters to wake
+        * form a prefix of it: stop at the first one that wants a position
+        * newer than the given one.
+        */
+       while (nwoken < ctl->numWaiters)
+       {
+               /* volatile pointer keeps the compiler from rearranging accesses */
+               volatile WalSndWaiter  *cur = &WalSndWaiters[nwoken];
+
+               if (!wake_everyone && !XLByteLE(cur->record, record))
+                       break;
+
+               SetProcLatch(cur->latch, PROCSIG_REPLICATION_INTERRUPT,
+                                        cur->backendId);
+               nwoken++;
+       }
+
+       /* Forget the woken prefix and slide any survivors to the front. */
+       ctl->numWaiters -= nwoken;
+       if (nwoken > 0 && ctl->numWaiters > 0)
+               memmove(&WalSndWaiters[0], &WalSndWaiters[nwoken],
+                               ctl->numWaiters * sizeof(WalSndWaiter));
+
+       LWLockRelease(WalSndWaiterLock);
+}
+
 /*
- * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
- * if none.
+ * Returns the oldest Ack position in synchronous walsenders. Or
+ * InvalidXLogRecPtr if none.
  */
-XLogRecPtr
-GetOldestWALSendPointer(void)
+static XLogRecPtr
+GetOldestAckdPtr(void)
 {
        XLogRecPtr      oldest = {0, 0};
-       int                     i;
-       bool            found = false;
+       int             i;
+       bool    found = false;
 
        for (i = 0; i < max_wal_senders; i++)
        {
                /* use volatile pointer to prevent code rearrangement */
                volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-               XLogRecPtr      recptr;
+               XLogRecPtr              recptr;
+               WalSndState             state;
 
                if (walsnd->pid == 0)
                        continue;
 
                SpinLockAcquire(&walsnd->mutex);
-               recptr = walsnd->sentPtr;
+               state = walsnd->walSndState;
+               recptr = walsnd->ackdPtr;
                SpinLockRelease(&walsnd->mutex);
 
-               if (recptr.xlogid == 0 && recptr.xrecoff == 0)
+               if (state <= WALSND_ASYNC ||
+                       (recptr.xlogid == 0 && recptr.xrecoff == 0))
                        continue;
 
                if (!found || XLByteLT(recptr, oldest))
@@ -982,4 +1557,11 @@ GetOldestWALSendPointer(void)
        return oldest;
 }
 
-#endif
+/*
+ * This is called when PROCSIG_REPLICATION_INTERRUPT is received.
+ *
+ * The caller is procsignal_sigusr1_handler(), so this only records the
+ * event in the replication_done flag.  WaitXLogSend() tests and clears
+ * that flag after waking up from its latch wait.
+ */
+void
+HandleReplicationInterrupt(void)
+{
+       replication_done = true;
+}
index 1d897c5..f947339 100644 (file)
@@ -15,7 +15,7 @@ override CFLAGS+= -fno-inline
 endif
 endif
 
-OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \
+OBJS = ipc.o ipci.o pmevent.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \
        sinval.o sinvaladt.o standby.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/ipc/pmevent.c b/src/backend/storage/ipc/pmevent.c
new file mode 100644 (file)
index 0000000..dadbfb3
--- /dev/null
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ *
+ * pmevent.c
+ *       routines for signaling the postmaster from users
+ *
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       $PostgreSQL: pgsql/src/backend/storage/ipc/pmevent.c, Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "storage/pmevent.h"
+
+
+/*
+ * CheckPostmasterEvent - report whether the event file for the given
+ * event exists, removing it if so.  Meant to be called by the postmaster
+ * once it has received SIGUSR1.
+ */
+bool
+CheckPostmasterEvent(const char *eventfile)
+{
+       char            path[MAXPGPATH];
+       struct stat st;
+
+       PMEventFilePath(path, PMEVENTDIR, eventfile);
+
+       /* No event file means this event was not signaled. */
+       if (stat(path, &st) != 0)
+               return false;
+
+       /* Consume the event; the result of unlink() is not checked. */
+       unlink(path);
+       return true;
+}
index 0b8842a..4f416ef 100644 (file)
@@ -20,6 +20,7 @@
 #include "bootstrap/bootstrap.h"
 #include "commands/async.h"
 #include "miscadmin.h"
+#include "replication/walsender.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
 #include "storage/procsignal.h"
@@ -172,6 +173,20 @@ CleanupProcSignalState(int status, Datum arg)
 int
 SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
 {
+       if (SetProcSignalReason(pid, reason, backendId))
+               return kill(pid, SIGUSR1);              /* Send signal */
+
+       errno = ESRCH;
+       return -1;
+}
+
+/*
+ * SetProcSignalReason
+ *             Set the reason flag
+ */
+bool
+SetProcSignalReason(pid_t pid, ProcSignalReason reason, BackendId backendId)
+{
        volatile ProcSignalSlot *slot;
 
        if (backendId != InvalidBackendId)
@@ -190,8 +205,7 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
                {
                        /* Atomically set the proper flag */
                        slot->pss_signalFlags[reason] = true;
-                       /* Send signal */
-                       return kill(pid, SIGUSR1);
+                       return true;
                }
        }
        else
@@ -214,14 +228,11 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
 
                                /* Atomically set the proper flag */
                                slot->pss_signalFlags[reason] = true;
-                               /* Send signal */
-                               return kill(pid, SIGUSR1);
+                               return true;
                        }
                }
        }
-
-       errno = ESRCH;
-       return -1;
+       return false;
 }
 
 /*
@@ -279,6 +290,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
                RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
+       if (CheckProcSignal(PROCSIG_REPLICATION_INTERRUPT))
+               HandleReplicationInterrupt();
+
        latch_sigusr1_handler();
 
        errno = save_errno;
index 2f9fe8a..3836e1d 100644 (file)
@@ -227,7 +227,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
                        }
 
                        /* Is it time to kill it? */
-                       if (WaitExceedsMaxStandbyDelay())
+                       if (StandbyIsTriggered() || WaitExceedsMaxStandbyDelay())
                        {
                                pid_t           pid;
 
index c9993a7..004e793 100644 (file)
@@ -196,6 +196,7 @@ InitProcGlobal(void)
                PGSemaphoreCreate(&(procs[i].sem));
                procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
                ProcGlobal->freeProcs = &procs[i];
+               InitSharedLatch(&procs[i].latch);
        }
 
        /*
@@ -214,6 +215,7 @@ InitProcGlobal(void)
                PGSemaphoreCreate(&(procs[i].sem));
                procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
                ProcGlobal->autovacFreeProcs = &procs[i];
+               InitSharedLatch(&procs[i].latch);
        }
 
        /*
@@ -325,6 +327,7 @@ InitProcess(void)
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
                SHMQueueInit(&(MyProc->myProcLocks[i]));
        MyProc->recoveryConflictPending = false;
+       OwnLatch(&MyProc->latch);
 
        /*
         * We might be reusing a semaphore that belonged to a failed process. So
@@ -688,6 +691,7 @@ ProcKill(int code, Datum arg)
        }
 
        /* PGPROC struct isn't mine anymore */
+       DisownLatch(&MyProc->latch);
        MyProc = NULL;
 
        /* Update shared estimate of spins_per_delay */
index 362daae..9fd92c6 100644 (file)
@@ -163,6 +163,7 @@ static bool assign_transaction_read_only(bool newval, bool doit, GucSource sourc
 static const char *assign_canonical_path(const char *newval, bool doit, GucSource source);
 static const char *assign_timezone_abbreviations(const char *newval, bool doit, GucSource source);
 static const char *show_archive_command(void);
+static const char *show_standby_fencing_command(void);
 static bool assign_tcp_keepalives_idle(int newval, bool doit, GucSource source);
 static bool assign_tcp_keepalives_interval(int newval, bool doit, GucSource source);
 static bool assign_tcp_keepalives_count(int newval, bool doit, GucSource source);
@@ -1766,6 +1767,16 @@ static struct config_int ConfigureNamesInt[] =
        },
 
        {
+               {"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
+                       gettext_noop("Sets the maximum time to wait for the Ack from the standby."),
+                       NULL,
+                       GUC_UNIT_MS
+               },
+               &replication_timeout,
+               0, 0, INT_MAX, NULL, NULL
+       },
+
+       {
                {"commit_delay", PGC_USERSET, WAL_SETTINGS,
                        gettext_noop("Sets the delay in microseconds between transaction commit and "
                                                 "flushing WAL to disk."),
@@ -2235,6 +2246,15 @@ static struct config_string ConfigureNamesString[] =
        },
 
        {
+               {"standby_fencing_command", PGC_SIGHUP, WAL_REPLICATION,
+                       gettext_noop("Sets the shell command that will be called to shoot the standby in the head."),
+                       NULL
+               },
+               &standby_fencing_command,
+               "", NULL, show_standby_fencing_command
+       },
+
+       {
                {"client_encoding", PGC_USERSET, CLIENT_CONN_LOCALE,
                        gettext_noop("Sets the client's character set encoding."),
                        NULL,
@@ -2827,6 +2847,15 @@ static struct config_enum ConfigureNamesEnum[] =
        },
 
        {
+               {"replication_mode", PGC_POSTMASTER, WAL_STANDBY_SERVERS,
+                       gettext_noop("Set the synchronization mode of replication."),
+                       NULL
+               },
+               &replication_mode,
+               REPLICATION_MODE_ASYNC, replication_mode_options, NULL
+       },
+
+       {
                {"wal_sync_method", PGC_SIGHUP, WAL_SETTINGS,
                        gettext_noop("Selects the method used for forcing WAL updates to disk."),
                        NULL
@@ -7912,6 +7941,15 @@ show_archive_command(void)
                return "(disabled)";
 }
 
+static const char *
+show_standby_fencing_command(void)
+{
+       /* Show the configured command only when max_wal_senders > 0. */
+       return (max_wal_senders > 0) ? standby_fencing_command : "(disabled)";
+}
+
 static bool
 assign_tcp_keepalives_idle(int newval, bool doit, GucSource source)
 {
index 0db24b0..eebd049 100644 (file)
 #wal_sender_delay = 200ms      # walsender cycle time, 1-10000 milliseconds
 #wal_keep_segments = 0         # in logfile segments, 16MB each; 0 disables
 #vacuum_defer_cleanup_age = 0  # number of xacts by which cleanup is delayed
+#replication_timeout = 0 # in milliseconds, 0 is disabled
+#standby_fencing_command = ''  # command to use to shoot standby
 
 # - Standby Servers -
 
 #max_standby_streaming_delay = 30s     # max delay before canceling queries
                                        # when reading streaming WAL;
                                        # -1 allows indefinite delay
+#replication_mode = async                      # async, recv, fsync, or apply
+                                       # (change requires restart)
 
 
 #------------------------------------------------------------------------------
index 0aee70d..8942d7b 100644 (file)
@@ -2458,6 +2458,7 @@ main(int argc, char *argv[])
                "pg_xlog",
                "pg_xlog/archive_status",
                "pg_clog",
+               "pg_event",
                "pg_notify",
                "pg_subtrans",
                "pg_twophase",
index dad7e8b..6fa858a 100644 (file)
@@ -34,6 +34,7 @@
 #include "libpq/pqsignal.h"
 #include "getopt_long.h"
 #include "miscadmin.h"
+#include "storage/pmevent.h"
 
 #if defined(__CYGWIN__)
 #include <sys/cygwin.h>
@@ -61,6 +62,7 @@ typedef enum
        START_COMMAND,
        STOP_COMMAND,
        RESTART_COMMAND,
+       PROMOTE_COMMAND,
        RELOAD_COMMAND,
        STATUS_COMMAND,
        KILL_COMMAND,
@@ -105,6 +107,7 @@ static void do_init(void);
 static void do_start(void);
 static void do_stop(void);
 static void do_restart(void);
+static void do_promote(void);
 static void do_reload(void);
 static void do_status(void);
 static void do_kill(pgpid_t pid);
@@ -143,6 +146,7 @@ static char pid_file[MAXPGPATH];
 static char conf_file[MAXPGPATH];
 static char backup_file[MAXPGPATH];
 static char recovery_file[MAXPGPATH];
+static char event_dir[MAXPGPATH];
 
 #if defined(HAVE_GETRLIMIT) && defined(RLIMIT_CORE)
 static void unlimit_core_size(void);
@@ -650,6 +654,53 @@ find_other_exec_or_die(const char *argv0, const char *target, const char *versio
 }
 
+/*
+ * send_postmaster_event
+ *
+ * Ask the running postmaster to act on an event: create an empty event
+ * file for "eventtype" under event_dir, then send SIGUSR1 to the
+ * postmaster.  "eventname" is only used in messages (e.g. "promote").
+ *
+ * Exits with status 1 if there is no PID file, if a single-user server is
+ * running, if the event file cannot be created, or if the signal cannot
+ * be sent.  In the last case the event file is removed again so that no
+ * stale request is left behind.
+ */
 static void
+send_postmaster_event(const char *eventtype, const char *eventname)
+{
+       FILE       *eventfile;
+       char            eventpath[MAXPGPATH];
+       pgpid_t         pid;
+
+       pid = get_pgpid();
+       if (pid == 0)                           /* no pid file */
+       {
+               write_stderr(_("%s: PID file \"%s\" does not exist\n"), progname, pid_file);
+               write_stderr(_("Is server running?\n"));
+               exit(1);
+       }
+       else if (pid < 0)                       /* standalone backend, not postmaster */
+       {
+               pid = -pid;
+               write_stderr(_("%s: cannot %s server; "
+                                          "single-user server is running (PID: %ld)\n"),
+                                        progname, eventname, pid);
+               write_stderr(_("Please terminate the single-user server and try again.\n"));
+               exit(1);
+       }
+
+       /* Create the (empty) event file; its existence is the request. */
+       PMEventFilePath(eventpath, event_dir, eventtype);
+       if ((eventfile = fopen(eventpath, "w")) == NULL)
+       {
+               write_stderr(_("%s: could not create event file \"%s\": %s\n"),
+                                        progname, eventpath, strerror(errno));
+               exit(1);
+       }
+       if (fclose(eventfile))
+       {
+               write_stderr(_("%s: could not write event file \"%s\": %s\n"),
+                                        progname, eventpath, strerror(errno));
+               exit(1);
+       }
+
+       /* Tell the postmaster to go and look for the event file. */
+       sig = SIGUSR1;
+       if (kill((pid_t) pid, sig) != 0)
+       {
+               write_stderr(_("%s: could not send %s signal (PID: %ld): %s\n"),
+                                        progname, eventname, pid, strerror(errno));
+
+               /*
+                * The postmaster was never notified, so take the request back.
+                * Otherwise the file would stay around and could be acted upon
+                * the next time the postmaster checks for events.
+                */
+               if (unlink(eventpath) != 0)
+                       write_stderr(_("%s: could not remove event file \"%s\": %s\n"),
+                                                progname, eventpath, strerror(errno));
+               exit(1);
+       }
+}
+
+static void
 do_init(void)
 {
        char            cmd[MAXPGPATH];
@@ -940,6 +991,24 @@ do_restart(void)
 
 
+/*
+ * do_promote
+ *
+ * Request promotion of a standby server.  Refuses (exit status 1) when
+ * recovery.conf is absent; otherwise hands the request to the postmaster
+ * through send_postmaster_event().
+ */
 static void
+do_promote(void)
+{
+       struct stat statbuf;
+
+       /* If recovery.conf doesn't exist, the server is not in standby mode */
+       if (stat(recovery_file, &statbuf) != 0)
+       {
+               write_stderr(_("%s: cannot promote server; "
+                                          "server is not in standby mode\n"),
+                                        progname);
+               exit(1);
+       }
+
+       /*
+        * send_postmaster_event() only creates the event file and signals the
+        * postmaster; it does not wait for the promotion to happen.  So report
+        * that promotion is under way rather than claiming it has completed.
+        */
+       send_postmaster_event(PMEVENT_PROMOTE_STANDBY, "promote");
+       print_msg(_("server promoting\n"));
+}
+
+
+static void
 do_reload(void)
 {
        pgpid_t         pid;
@@ -1590,7 +1659,7 @@ do_advice(void)
 static void
 do_help(void)
 {
-       printf(_("%s is a utility to start, stop, restart, reload configuration files,\n"
+       printf(_("%s is a utility to start, stop, restart, promote, reload configuration files,\n"
                         "report the status of a PostgreSQL server, or signal a PostgreSQL process.\n\n"), progname);
        printf(_("Usage:\n"));
        printf(_("  %s init[db]               [-D DATADIR] [-s] [-o \"OPTIONS\"]\n"), progname);
@@ -1598,6 +1667,7 @@ do_help(void)
        printf(_("  %s stop    [-W] [-t SECS] [-D DATADIR] [-s] [-m SHUTDOWN-MODE]\n"), progname);
        printf(_("  %s restart [-w] [-t SECS] [-D DATADIR] [-s] [-m SHUTDOWN-MODE]\n"
                         "                 [-o \"OPTIONS\"]\n"), progname);
+       printf(_("  %s promote [-D DATADIR] [-s]\n"), progname);
        printf(_("  %s reload  [-D DATADIR] [-s]\n"), progname);
        printf(_("  %s status  [-D DATADIR]\n"), progname);
        printf(_("  %s kill    SIGNALNAME PID\n"), progname);
@@ -1892,6 +1962,8 @@ main(int argc, char **argv)
                                ctl_command = STOP_COMMAND;
                        else if (strcmp(argv[optind], "restart") == 0)
                                ctl_command = RESTART_COMMAND;
+                       else if (strcmp(argv[optind], "promote") == 0)
+                               ctl_command = PROMOTE_COMMAND;
                        else if (strcmp(argv[optind], "reload") == 0)
                                ctl_command = RELOAD_COMMAND;
                        else if (strcmp(argv[optind], "status") == 0)
@@ -1980,6 +2052,7 @@ main(int argc, char **argv)
                snprintf(conf_file, MAXPGPATH, "%s/postgresql.conf", pg_data);
                snprintf(backup_file, MAXPGPATH, "%s/backup_label", pg_data);
                snprintf(recovery_file, MAXPGPATH, "%s/recovery.conf", pg_data);
+               snprintf(event_dir, MAXPGPATH, "%s/" PMEVENTDIR, pg_data);
        }
 
        switch (ctl_command)
@@ -1999,6 +2072,9 @@ main(int argc, char **argv)
                case RESTART_COMMAND:
                        do_restart();
                        break;
+               case PROMOTE_COMMAND:
+                       do_promote();
+                       break;
                case RELOAD_COMMAND:
                        do_reload();
                        break;
index ea156d3..6dba7dc 100644 (file)
@@ -15,6 +15,7 @@
 #include "access/xlogdefs.h"
 #include "lib/stringinfo.h"
 #include "storage/buf.h"
+#include "utils/guc.h"
 #include "utils/pg_crc.h"
 #include "utils/timestamp.h"
 
@@ -220,6 +221,32 @@ extern int wal_level;
 /* Do we need to WAL-log information required only for Hot Standby? */
 #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY)
 
+/*
+ * Replication mode. This is used to identify how long transaction
+ * commit should wait for replication.
+ *
+ * REPLICATION_MODE_ASYNC doesn't make transaction commit wait for
+ * replication, i.e., asynchronous replication.
+ *
+ * REPLICATION_MODE_RECV makes transaction commit wait for XLOG
+ * records to be received on the standby.
+ *
+ * REPLICATION_MODE_FSYNC makes transaction commit wait for XLOG
+ * records to be received and fsync'd on the standby.
+ *
+ * REPLICATION_MODE_APPLY makes transaction commit wait for XLOG
+ * records to be received, fsync'd and applied on the standby.
+ */
+typedef enum ReplicationMode
+{
+       REPLICATION_MODE_ASYNC = 0,
+       REPLICATION_MODE_RECV,
+       REPLICATION_MODE_FSYNC,
+       REPLICATION_MODE_APPLY
+} ReplicationMode;
+extern int     replication_mode;
+extern const struct config_enum_entry replication_mode_options[];
+
 #ifdef WAL_DEBUG
 extern bool XLOG_DEBUG;
 #endif
@@ -298,11 +325,13 @@ extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
+extern XLogRecPtr GetReplayRecPtr(void);
 extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
 extern TimeLineID GetRecoveryTargetTLI(void);
 
 extern void HandleStartupProcInterrupts(void);
 extern void StartupProcessMain(void);
+extern bool StandbyIsTriggered(void);
 extern void WakeupRecovery(void);
 
 #endif   /* XLOG_H */
index 31a9650..d389122 100644 (file)
@@ -3313,6 +3313,8 @@ DESCR("xlog filename, given an xlog location");
 
 DATA(insert OID = 3810 (  pg_is_in_recovery            PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_is_in_recovery _null_ _null_ _null_ ));
 DESCR("true if server is in recovery");
+DATA(insert OID = 3811 (  pg_is_in_sync                PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_is_in_sync _null_ _null_ _null_ ));
+DESCR("true if server is in sync with primary");
 
 DATA(insert OID = 3820 ( pg_last_xlog_receive_location PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_last_xlog_receive_location _null_ _null_ _null_ ));
 DESCR("current xlog flush location");
index 978d9a9..fb3277f 100644 (file)
@@ -59,7 +59,9 @@ extern int    pq_getbyte(void);
 extern int     pq_peekbyte(void);
 extern int     pq_getbyte_if_available(unsigned char *c);
 extern int     pq_putbytes(const char *s, size_t len);
+extern int     pq_putbytes_if_writable(const char *s, size_t len);
 extern int     pq_flush(void);
+extern int     pq_flush_if_writable(void);
 extern int     pq_putmessage(char msgtype, const char *s, size_t len);
 extern void pq_startcopyout(void);
 extern void pq_endcopyout(bool errorAbort);
index edba868..f176745 100644 (file)
@@ -50,4 +50,14 @@ typedef struct
  */
 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
 
+/*
+ * Body for a WAL acknowledgment message (message type 'l'). This is wrapped
+ * within a CopyData message at the FE/BE protocol level.
+ */
+typedef struct
+{
+       /* End of WAL replicated to the standby */
+       XLogRecPtr      ackEnd;
+} WalAckMessageData;
+
 #endif   /* _WALPROTOCOL_H */
index e8cae3c..5fa7f80 100644 (file)
@@ -50,6 +50,11 @@ typedef struct
        pg_time_t       startTime;
 
        /*
+        * Has the standby already caught up with the primary?
+        */
+       bool            reachSync;
+
+       /*
         * receivedUpto-1 is the last byte position that has already been
         * received.  When startup process starts the walreceiver, it sets
         * receivedUpto to the point where it wants the streaming to begin. After
@@ -100,5 +105,6 @@ extern void ShutdownWalRcv(void);
 extern bool WalRcvInProgress(void);
 extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern Datum pg_is_in_sync(PG_FUNCTION_ARGS);
 
 #endif   /* _WALRECEIVER_H */
index 87e0120..6186cf8 100644 (file)
 #include "storage/spin.h"
 
 /*
+ * Values for WalSnd->walSndState.
+ *
+ * The declaration order is significant: WaitXLogSend() compares states
+ * with relational operators ("state <= WALSND_ASYNC",
+ * "state <= WALSND_CATCHUP"), so any new value must be placed with that
+ * ordering in mind.
+ */
+typedef enum
+{
+       WALSND_INIT,            /* launched and initialized */
+       WALSND_ASYNC,           /* performing asynchronous replication */
+       WALSND_CATCHUP,         /* bulk-sending WAL for standby to catch up */
+       WALSND_PRESYNC,         /* sent all the WAL required to get into sync */
+       WALSND_SYNC                     /* in sync with standby */
+} WalSndState;
+
+/*
  * Each walsender has a WalSnd struct in shared memory.
  */
 typedef struct WalSnd
 {
        pid_t           pid;                    /* this walsender's process id, or 0 */
        XLogRecPtr      sentPtr;                /* WAL has been sent up to this point */
+       XLogRecPtr      ackdPtr;                /* WAL has been replicated up to this point */
+
+       WalSndState     walSndState;    /* current state */
 
        slock_t         mutex;                  /* locks shared variables shown above */
 
@@ -36,22 +51,43 @@ typedef struct WalSnd
 /* There is one WalSndCtl struct for the whole database cluster */
 typedef struct
 {
+       /* Protected by WalSndWaiterLock */
+       int                     numWaiters;     /* current # of WalSndWaiters */
+       int                     maxWaiters;     /* allocated size of WalSndWaiters */
+
        WalSnd          walsnds[1];             /* VARIABLE LENGTH ARRAY */
 } WalSndCtlData;
 
 extern WalSndCtlData *WalSndCtl;
 
+/*
+ * Each waiter has a WalSndWaiter struct in shared memory.
+ *
+ * The structs live in the WalSndWaiters array, which RegisterWalSndWaiter()
+ * keeps sorted in increasing order of "record".  The array is accessed
+ * while holding WalSndWaiterLock.
+ */
+typedef struct WalSndWaiter
+{
+       BackendId       backendId;      /* this waiter's backend ID */
+       XLogRecPtr      record;         /* this waiter waits for replication to be
+                                                        * acked up to this point */
+       Latch      *latch;              /* pointer to the latch used to wake up this
+                                                        * waiter */
+} WalSndWaiter;
+
 /* global state */
 extern bool am_walsender;
 
 /* user-settable parameters */
 extern int     WalSndDelay;
 extern int     max_wal_senders;
+extern int     replication_timeout;
+extern char    *standby_fencing_command;
 
 extern int     WalSenderMain(void);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
 extern void WalSndWakeup(void);
+extern void WaitXLogSend(XLogRecPtr record);
+
+extern void HandleReplicationInterrupt(void);
 
 #endif   /* _WALSENDER_H */
index 0c0b01c..9a96fdb 100644 (file)
@@ -16,6 +16,8 @@
 
 #include <signal.h>
 
+#include "storage/procsignal.h"
+
 /*
  * Latch structure should be treated as opaque and only accessed through
  * the public functions. It is defined here to allow embedding Latches as
@@ -40,8 +42,10 @@ extern void OwnLatch(volatile Latch *latch);
 extern void DisownLatch(volatile Latch *latch);
 extern bool WaitLatch(volatile Latch *latch, long timeout);
 extern int     WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
-                                 long timeout);
+                                 bool forRead, bool forWrite, long timeout);
 extern void SetLatch(volatile Latch *latch);
+extern void SetProcLatch(volatile Latch *latch,
+                                 ProcSignalReason reason, BackendId backendId);
 extern void ResetLatch(volatile Latch *latch);
 #define TestLatch(latch) (((volatile Latch *) latch)->is_set)
 
index 0322007..cfa14f3 100644 (file)
@@ -70,6 +70,7 @@ typedef enum LWLockId
        RelationMappingLock,
        AsyncCtlLock,
        AsyncQueueLock,
+       WalSndWaiterLock,
        /* Individual lock IDs end here */
        FirstBufMappingLock,
        FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff --git a/src/include/storage/pmevent.h b/src/include/storage/pmevent.h
new file mode 100644 (file)
index 0000000..e3ac7f2
--- /dev/null
@@ -0,0 +1,38 @@
+/*-------------------------------------------------------------------------
+ *
+ * pmevent.h
+ *       routines for signaling the postmaster from users
+ *
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL: pgsql/src/include/storage/pmevent.h, Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PMEVENT_H
+#define PMEVENT_H
+
+/*
+ * Postmaster event directory (relative to $PGDATA)
+ */
+#define PMEVENTDIR             "pg_event"
+
+/*
+ * Reasons for signaling the postmaster.  We can cope with simultaneous
+ * signals for different reasons.  If the same reason is signaled multiple
+ * times in quick succession, however, the postmaster is likely to observe
+ * only one notification of it.  This is okay for the present uses.
+ */
+#define PMEVENT_PROMOTE_STANDBY                "promote_standby"
+
+#define PMEventFilePath(path, eventdir, eventfile)     \
+       snprintf(path, MAXPGPATH, "%s/%s", eventdir, eventfile)
+
+/*
+ * prototypes for functions in pmevent.c
+ */
+extern bool CheckPostmasterEvent(const char *eventfile);
+
+#endif   /* PMEVENT_H */
index f77c4d3..80041c5 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef _PROC_H_
 #define _PROC_H_
 
+#include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
 #include "utils/timestamp.h"
@@ -116,6 +117,12 @@ struct PGPROC
                                                                 * lock object by this backend */
 
        /*
+        * Latch used by walsenders to wake up this backend when replication
+        * has been done.
+        */
+       Latch           latch;
+
+       /*
         * All PROCLOCK objects for locks held or awaited by this backend are
         * linked into one of these lists, according to the partition number of
         * their lock.
index aad9898..9f0b642 100644 (file)
@@ -40,6 +40,8 @@ typedef enum
        PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
        PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
+       PROCSIG_REPLICATION_INTERRUPT,  /* replication interrupt */
+
        NUM_PROCSIGNALS                         /* Must be last! */
 } ProcSignalReason;
 
@@ -52,6 +54,8 @@ extern void ProcSignalShmemInit(void);
 extern void ProcSignalInit(int pss_idx);
 extern int SendProcSignal(pid_t pid, ProcSignalReason reason,
                           BackendId backendId);
+extern bool SetProcSignalReason(pid_t pid, ProcSignalReason reason,
+                          BackendId backendId);
 
 extern void procsignal_sigusr1_handler(SIGNAL_ARGS);
 
index 5526738..344bd40 100644 (file)
@@ -1588,6 +1588,7 @@ PQgetResult(PGconn *conn)
                                res = PQmakeEmptyPGresult(conn, PGRES_COPY_IN);
                        break;
                case PGASYNC_COPY_OUT:
+               case PGASYNC_COPY_XLOG:
                        if (conn->result && conn->result->resultStatus == PGRES_COPY_OUT)
                                res = pqPrepareAsyncResult(conn);
                        else