OSDN Git Service

Hot Standby feedback for avoidance of cleanup conflicts on standby.
authorSimon Riggs <simon@2ndQuadrant.com>
Wed, 16 Feb 2011 19:29:37 +0000 (19:29 +0000)
committerSimon Riggs <simon@2ndQuadrant.com>
Wed, 16 Feb 2011 19:29:37 +0000 (19:29 +0000)
Standby optionally sends back information about oldestXmin of queries
which is then checked and applied to the WALSender's proc->xmin.
GetOldestXmin() is modified slightly to agree with GetSnapshotData(),
so that all backends on primary include WALSender within their snapshots.
Note this does nothing to change the snapshot xmin on either master or
standby. Feedback piggybacks on the standby reply message.
vacuum_defer_cleanup_age is no longer used on standby, though parameter
still exists on primary, since some use cases still exist.

Simon Riggs, review comments from Fujii Masao, Heikki Linnakangas, Robert Haas

doc/src/sgml/config.sgml
doc/src/sgml/high-availability.sgml
src/backend/access/transam/xlog.c
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/backend/storage/ipc/procarray.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/access/xlog.h
src/include/replication/walprotocol.h
src/include/replication/walreceiver.h

index 9505caf..cee09c7 100644 (file)
@@ -2006,6 +2006,10 @@ SET ENABLE_SEQSCAN TO OFF;
         This parameter can only be set in the <filename>postgresql.conf</>
         file or on the server command line.
        </para>
+       <para>
+        You should also consider setting <varname>hot_standby_feedback</>
+        as an alternative to using this parameter.
+       </para>
       </listitem>
      </varlistentry>
      </variablelist>
@@ -2121,6 +2125,22 @@ SET ENABLE_SEQSCAN TO OFF;
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-hot-standby-feedback" xreflabel="hot_standby">
+      <term><varname>hot_standby_feedback</varname> (<type>boolean</type>)</term>
+      <indexterm>
+       <primary><varname>hot_standby_feedback</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Specifies whether or not a hot standby will send feedback to the primary
+        about queries currently executing on the standby. This parameter can
+        be used to eliminate query cancels caused by cleanup records, though
+        it can cause database bloat on the primary for some workloads.
+        The default value is <literal>off</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
    </sect1>
index 368c688..37ba43b 100644 (file)
@@ -1484,23 +1484,6 @@ if (!triggered)
    </para>
 
    <para>
-    The most common reason for conflict between standby queries and WAL replay
-    is <quote>early cleanup</>.  Normally, <productname>PostgreSQL</> allows
-    cleanup of old row versions when there are no transactions that need to
-    see them to ensure correct visibility of data according to MVCC rules.
-    However, this rule can only be applied for transactions executing on the
-    master.  So it is possible that cleanup on the master will remove row
-    versions that are still visible to a transaction on the standby.
-   </para>
-
-   <para>
-    Experienced users should note that both row version cleanup and row version
-    freezing will potentially conflict with standby queries. Running a manual
-    <command>VACUUM FREEZE</> is likely to cause conflicts even on tables with
-    no updated or deleted rows.
-   </para>
-
-   <para>
     Once the delay specified by <varname>max_standby_archive_delay</> or
     <varname>max_standby_streaming_delay</> has been exceeded, conflicting
     queries will be cancelled.  This usually results just in a cancellation
@@ -1527,6 +1510,23 @@ if (!triggered)
    </para>
 
    <para>
+    The most common reason for conflict between standby queries and WAL replay
+    is <quote>early cleanup</>.  Normally, <productname>PostgreSQL</> allows
+    cleanup of old row versions when there are no transactions that need to
+    see them to ensure correct visibility of data according to MVCC rules.
+    However, this rule can only be applied for transactions executing on the
+    master.  So it is possible that cleanup on the master will remove row
+    versions that are still visible to a transaction on the standby.
+   </para>
+
+   <para>
+    Experienced users should note that both row version cleanup and row version
+    freezing will potentially conflict with standby queries. Running a manual
+    <command>VACUUM FREEZE</> is likely to cause conflicts even on tables with
+    no updated or deleted rows.
+   </para>
+
+   <para>
     Users should be clear that tables that are regularly and heavily updated
     on the primary server will quickly cause cancellation of longer running
     queries on the standby. In such cases the setting of a finite value for
@@ -1537,12 +1537,10 @@ if (!triggered)
 
    <para>
     Remedial possibilities exist if the number of standby-query cancellations
-    is found to be unacceptable.  The first option is to connect to the
-    primary server and keep a query active for as long as needed to
-    run queries on the standby. This prevents <command>VACUUM</> from removing
-    recently-dead rows and so cleanup conflicts do not occur.
-    This could be done using <xref linkend="dblink"> and
-    <function>pg_sleep()</>, or via other mechanisms. If you do this, you
+    is found to be unacceptable.  The first option is to set the parameter
+    <varname>hot_standby_feedback</>, which prevents <command>VACUUM</> from
+    removing recently-dead rows and so cleanup conflicts do not occur.
+    If you do this, you
     should note that this will delay cleanup of dead rows on the primary,
     which may result in undesirable table bloat. However, the cleanup
     situation will be no worse than if the standby queries were running
index 6fdaaff..3ba1f29 100644 (file)
@@ -158,6 +158,11 @@ static XLogRecPtr LastRec;
  * known, need to check the shared state".
  */
 static bool LocalRecoveryInProgress = true;
+/*
+ * Local copy of SharedHotStandbyActive variable. False actually means "not
+ * known, need to check the shared state".
+ */
+static bool LocalHotStandbyActive = false;
 
 /*
  * Local state for XLogInsertAllowed():
@@ -406,6 +411,12 @@ typedef struct XLogCtlData
        bool            SharedRecoveryInProgress;
 
        /*
+        * SharedHotStandbyActive indicates if we're still in crash or archive
+        * recovery.  Protected by info_lck.
+        */
+       bool            SharedHotStandbyActive;
+
+       /*
         * recoveryWakeupLatch is used to wake up the startup process to
         * continue WAL replay, if it is waiting for WAL to arrive or failover
         * trigger file to appear.
@@ -4917,6 +4928,7 @@ XLOGShmemInit(void)
         */
        XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
        XLogCtl->SharedRecoveryInProgress = true;
+       XLogCtl->SharedHotStandbyActive = false;
        XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
        SpinLockInit(&XLogCtl->info_lck);
        InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
@@ -6790,8 +6802,6 @@ StartupXLOG(void)
 static void
 CheckRecoveryConsistency(void)
 {
-       static bool backendsAllowed = false;
-
        /*
         * Have we passed our safe starting point?
         */
@@ -6811,11 +6821,19 @@ CheckRecoveryConsistency(void)
         * enabling connections.
         */
        if (standbyState == STANDBY_SNAPSHOT_READY &&
-               !backendsAllowed &&
+               !LocalHotStandbyActive &&
                reachedMinRecoveryPoint &&
                IsUnderPostmaster)
        {
-               backendsAllowed = true;
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               xlogctl->SharedHotStandbyActive = true;
+               SpinLockRelease(&xlogctl->info_lck);
+
+               LocalHotStandbyActive = true;
+
                SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY);
        }
 }
@@ -6863,6 +6881,38 @@ RecoveryInProgress(void)
 }
 
 /*
+ * Is HotStandby active yet? This is only important in special backends
+ * since normal backends won't ever be able to connect until this returns
+ * true. Postmaster knows this by way of signal, not via shared memory.
+ *
+ * Unlike testing standbyState, this works in any process that's connected to
+ * shared memory.
+ */
+bool
+HotStandbyActive(void)
+{
+       /*
+        * We check shared state each time only until Hot Standby is active. We
+        * can't de-activate Hot Standby, so there's no need to keep checking after
+        * the shared variable has once been seen true.
+        */
+       if (LocalHotStandbyActive)
+               return true;
+       else
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               /* spinlock is essential on machines with weak memory ordering! */
+               SpinLockAcquire(&xlogctl->info_lck);
+               LocalHotStandbyActive = xlogctl->SharedHotStandbyActive;
+               SpinLockRelease(&xlogctl->info_lck);
+
+               return LocalHotStandbyActive;
+       }
+}
+
+/*
  * Is this process allowed to insert new WAL records?
  *
  * Ordinarily this is essentially equivalent to !RecoveryInProgress().
index b1e5247..ee09468 100644 (file)
@@ -38,6 +38,7 @@
 #include <signal.h>
 #include <unistd.h>
 
+#include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
@@ -45,6 +46,7 @@
 #include "replication/walreceiver.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
+#include "storage/procarray.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -56,6 +58,7 @@ bool          am_walreceiver;
 
 /* GUC variable */
 int                    wal_receiver_status_interval;
+bool           hot_standby_feedback;
 
 /* libpqreceiver hooks to these when loaded */
 walrcv_connect_type walrcv_connect = NULL;
@@ -610,16 +613,43 @@ XLogWalRcvSendReply(void)
                        wal_receiver_status_interval * 1000))
                return;
 
-       /* Construct a new message. */
+       /* Construct a new message */
        reply_message.write = LogstreamResult.Write;
        reply_message.flush = LogstreamResult.Flush;
        reply_message.apply = GetXLogReplayRecPtr();
        reply_message.sendTime = now;
 
-       elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
+       /*
+        * Get the OldestXmin and its associated epoch
+        */
+       if (hot_standby_feedback && HotStandbyActive())
+       {
+               TransactionId   nextXid;
+               uint32                  nextEpoch;
+
+               reply_message.xmin = GetOldestXmin(true, false);
+
+               /*
+                * Get epoch and adjust if nextXid and oldestXmin are different
+                * sides of the epoch boundary.
+                */
+               GetNextXidAndEpoch(&nextXid, &nextEpoch);
+               if (nextXid < reply_message.xmin)
+                       nextEpoch--;
+               reply_message.epoch = nextEpoch;
+       }
+       else
+       {
+               reply_message.xmin = InvalidTransactionId;
+               reply_message.epoch = 0;
+       }
+
+       elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
                                 reply_message.write.xlogid, reply_message.write.xrecoff,
                                 reply_message.flush.xlogid, reply_message.flush.xrecoff,
-                                reply_message.apply.xlogid, reply_message.apply.xrecoff);
+                                reply_message.apply.xlogid, reply_message.apply.xrecoff,
+                                reply_message.xmin,
+                                reply_message.epoch);
 
        /* Prepend with the message type and send it. */
        buf[0] = 'r';
index fe99616..0fdf722 100644 (file)
@@ -53,6 +53,7 @@
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
 #include "storage/proc.h"
+#include "storage/procarray.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
@@ -502,6 +503,7 @@ ProcessStandbyReplyMessage(void)
 {
        StandbyReplyMessage     reply;
        char msgtype;
+       TransactionId newxmin = InvalidTransactionId;
 
        resetStringInfo(&reply_message);
 
@@ -531,10 +533,12 @@ ProcessStandbyReplyMessage(void)
 
        pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
 
-       elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ",
+       elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
                 reply.write.xlogid, reply.write.xrecoff,
                 reply.flush.xlogid, reply.flush.xrecoff,
-                reply.apply.xlogid, reply.apply.xrecoff);
+                reply.apply.xlogid, reply.apply.xrecoff,
+                reply.xmin,
+                reply.epoch);
 
        /*
         * Update shared state for this WalSender process
@@ -550,6 +554,69 @@ ProcessStandbyReplyMessage(void)
                walsnd->apply = reply.apply;
                SpinLockRelease(&walsnd->mutex);
        }
+
+       /*
+        * Update the WalSender's proc xmin to allow it to be visible
+        * to snapshots. This will hold back the removal of dead rows
+        * and thereby prevent the generation of cleanup conflicts
+        * on the standby server.
+        */
+       if (TransactionIdIsValid(reply.xmin))
+       {
+               TransactionId   nextXid;
+               uint32                  nextEpoch;
+               bool                    epochOK;
+
+               GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+               /*
+                * Epoch of oldestXmin should be same as standby or
+                * if the counter has wrapped, then one less than reply.
+                */
+               if (reply.xmin <= nextXid)
+               {
+                       if (reply.epoch == nextEpoch)
+                               epochOK = true;
+               }
+               else
+               {
+                       if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
+                               epochOK = true;
+               }
+
+               /*
+                * Feedback from standby must not go backwards, nor should it go
+                * forwards further than our most recent xid.
+                */
+               if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+               {
+                       if (!TransactionIdIsValid(MyProc->xmin))
+                       {
+                               TransactionId oldestXmin = GetOldestXmin(true, true);
+                               if (TransactionIdPrecedes(oldestXmin, reply.xmin))
+                                       newxmin = reply.xmin;
+                               else
+                                       newxmin = oldestXmin;
+                       }
+                       else
+                       {
+                               if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
+                                       newxmin = reply.xmin;
+                               else
+                                       newxmin = MyProc->xmin; /* stay the same */
+                       }
+               }
+       }
+
+       /*
+        * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
+        */
+       if (MyProc->xmin != newxmin)
+       {
+               LWLockAcquire(ProcArrayLock, LW_SHARED);
+               MyProc->xmin = newxmin;
+               LWLockRelease(ProcArrayLock);
+       }
 }
 
 /* Main loop of walsender process */
index 8b36df4..2473881 100644 (file)
@@ -1034,7 +1034,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
                if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM))
                        continue;
 
-               if (allDbs || proc->databaseId == MyDatabaseId)
+               if (allDbs ||
+                       proc->databaseId == MyDatabaseId ||
+                       proc->databaseId == 0)  /* include WalSender */
                {
                        /* Fetch xid just once - see GetNewTransactionId */
                        TransactionId xid = proc->xid;
@@ -1066,28 +1068,35 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
                 */
                TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
 
+               LWLockRelease(ProcArrayLock);
+
                if (TransactionIdIsNormal(kaxmin) &&
                        TransactionIdPrecedes(kaxmin, result))
                                result = kaxmin;
        }
+       else
+       {
+               /*
+                * No other information needed, so release the lock immediately.
+                */
+               LWLockRelease(ProcArrayLock);
 
-       LWLockRelease(ProcArrayLock);
-
-       /*
-        * Compute the cutoff XID, being careful not to generate a "permanent"
-        * XID.
-        *
-        * vacuum_defer_cleanup_age provides some additional "slop" for the
-        * benefit of hot standby queries on slave servers.  This is quick and
-        * dirty, and perhaps not all that useful unless the master has a
-        * predictable transaction rate, but it's what we've got.  Note that we
-        * are assuming vacuum_defer_cleanup_age isn't large enough to cause
-        * wraparound --- so guc.c should limit it to no more than the
-        * xidStopLimit threshold in varsup.c.
-        */
-       result -= vacuum_defer_cleanup_age;
-       if (!TransactionIdIsNormal(result))
-               result = FirstNormalTransactionId;
+               /*
+                * Compute the cutoff XID, being careful not to generate a "permanent"
+                * XID. We need do this only on the primary, never on standby.
+                *
+                * vacuum_defer_cleanup_age provides some additional "slop" for the
+                * benefit of hot standby queries on slave servers.  This is quick and
+                * dirty, and perhaps not all that useful unless the master has a
+                * predictable transaction rate, but it's what we've got.  Note that we
+                * are assuming vacuum_defer_cleanup_age isn't large enough to cause
+                * wraparound --- so guc.c should limit it to no more than the
+                * xidStopLimit threshold in varsup.c.
+                */
+               result -= vacuum_defer_cleanup_age;
+               if (!TransactionIdIsNormal(result))
+                       result = FirstNormalTransactionId;
+       }
 
        return result;
 }
index 5688557..55cbf75 100644 (file)
@@ -1279,6 +1279,15 @@ static struct config_bool ConfigureNamesBool[] =
        },
 
        {
+               {"hot_standby_feedback", PGC_SIGHUP, WAL_STANDBY_SERVERS,
+                       gettext_noop("Allows feedback from a hot standby primary that will avoid query conflicts."),
+                       NULL
+               },
+               &hot_standby_feedback,
+               false, NULL, NULL
+       },
+
+       {
                {"allow_system_table_mods", PGC_POSTMASTER, DEVELOPER_OPTIONS,
                        gettext_noop("Allows modifications of the structure of system tables."),
                        NULL,
index 3b00a03..6726733 100644 (file)
 
 #hot_standby = off                     # "on" allows queries during recovery
                                        # (change requires restart)
+#hot_standby_feedback = off    # info from standby to prevent query conflicts
 #max_standby_archive_delay = 30s       # max delay before canceling queries
                                        # when reading WAL from archive;
                                        # -1 allows indefinite delay
index 7cd07a2..7e9bad6 100644 (file)
@@ -289,6 +289,7 @@ extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);
 extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
 
 extern bool RecoveryInProgress(void);
+extern bool HotStandbyActive(void);
 extern bool XLogInsertAllowed(void);
 extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
 extern XLogRecPtr GetXLogReplayRecPtr(void);
index 32c4962..da94b6b 100644 (file)
@@ -56,6 +56,15 @@ typedef struct
        XLogRecPtr      flush;
        XLogRecPtr      apply;
 
+       /*
+        * The current xmin and epoch from the standby, for Hot Standby feedback.
+        * This may be invalid if the standby-side does not support feedback,
+        * or Hot Standby is not yet available.
+        */
+       TransactionId   xmin;
+       uint32                  epoch;
+
+
        /* Sender's system clock at the time of transmission */
        TimestampTz sendTime;
 } StandbyReplyMessage;
index aa5bfb7..9137b86 100644 (file)
@@ -18,6 +18,7 @@
 
 extern bool am_walreceiver;
 extern int wal_receiver_status_interval;
+extern bool hot_standby_feedback;
 
 /*
  * MAXCONNINFO: maximum size of a connection string.