OSDN Git Service

Track walsender state in shared memory and expose in pg_stat_replication
authorMagnus Hagander <magnus@hagander.net>
Tue, 11 Jan 2011 20:25:28 +0000 (21:25 +0100)
committerMagnus Hagander <magnus@hagander.net>
Tue, 11 Jan 2011 20:25:28 +0000 (21:25 +0100)
doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/replication/basebackup.c
src/backend/replication/walsender.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.h
src/include/replication/walsender.h

index e2d27da..241131c 100644 (file)
@@ -298,8 +298,8 @@ postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re
       <entry><structname>pg_stat_replication</><indexterm><primary>pg_stat_replication</primary></indexterm></entry>
       <entry>One row per WAL sender process, showing process <acronym>ID</>,
       user OID, user name, application name, client's address and port number,
-      time at which the server process began execution, and transaction log
-      location.
+      time at which the server process began execution, current WAL sender
+      state and transaction log location.
      </entry>
      </row>
 
index aa89240..718e996 100644 (file)
@@ -501,6 +501,7 @@ CREATE VIEW pg_stat_replication AS
             S.client_addr,
             S.client_port,
             S.backend_start,
+            W.state,
             W.sent_location
     FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
             pg_stat_get_wal_senders() AS W
index c09700f..144b17c 100644 (file)
@@ -24,6 +24,7 @@
 #include "libpq/pqformat.h"
 #include "nodes/pg_list.h"
 #include "replication/basebackup.h"
+#include "replication/walsender.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "utils/builtins.h"
@@ -115,6 +116,8 @@ SendBaseBackup(const char *options)
                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
        old_context = MemoryContextSwitchTo(backup_context);
 
+       WalSndSetState(WALSNDSTATE_BACKUP);
+
        if (backup_label == NULL)
                ereport(FATAL,
                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
index 559e734..a0f20ab 100644 (file)
@@ -179,6 +179,7 @@ WalSndHandshake(void)
        {
                int                     firstchar;
 
+               WalSndSetState(WALSNDSTATE_STARTUP);
                set_ps_display("idle", false);
 
                /* Wait for a command to arrive */
@@ -482,6 +483,9 @@ WalSndLoop(void)
                        if (!XLogSend(output_message, &caughtup))
                                break;
                }
+
+               /* Update our state to indicate if we're behind or not */
+               WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
        }
 
        /*
@@ -533,6 +537,7 @@ InitWalSnd(void)
                         */
                        walsnd->pid = MyProcPid;
                        MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+                       walsnd->state = WALSNDSTATE_STARTUP;
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        OwnLatch((Latch *) &walsnd->latch);
@@ -960,6 +965,45 @@ WalSndWakeup(void)
                SetLatch(&WalSndCtl->walsnds[i].latch);
 }
 
+/* Set state for current walsender (only called in walsender) */
+void
+WalSndSetState(WalSndState state)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSnd *walsnd = MyWalSnd;
+
+       Assert(am_walsender);
+
+       if (walsnd->state == state)
+               return;
+
+       SpinLockAcquire(&walsnd->mutex);
+       walsnd->state = state;
+       SpinLockRelease(&walsnd->mutex);
+}
+
+/*
+ * Return a string constant representing the state. This is used
+ * in system views, and should *not* be translated.
+ */
+static const char *
+WalSndGetStateString(WalSndState state)
+{
+       switch (state)
+       {
+               case WALSNDSTATE_STARTUP:
+                       return "STARTUP";
+               case WALSNDSTATE_BACKUP:
+                       return "BACKUP";
+               case WALSNDSTATE_CATCHUP:
+                       return "CATCHUP";
+               case WALSNDSTATE_STREAMING:
+                       return "STREAMING";
+       }
+       return "UNKNOWN";
+}
+
+
 /*
  * Returns activity of walsenders, including pids and xlog locations sent to
  * standby servers.
@@ -967,7 +1011,7 @@ WalSndWakeup(void)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS   2
+#define PG_STAT_GET_WAL_SENDERS_COLS   3
        ReturnSetInfo      *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        TupleDesc                       tupdesc;
        Tuplestorestate    *tupstore;
@@ -1021,7 +1065,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
                memset(nulls, 0, sizeof(nulls));
                values[0] = Int32GetDatum(walsnd->pid);
-               values[1] = CStringGetTextDatum(sent_location);
+               values[1] = CStringGetTextDatum(WalSndGetStateString(walsnd->state));
+               values[2] = CStringGetTextDatum(sent_location);
 
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
        }
index 7a03b1c..df3c95b 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201101081
+#define CATALOG_VERSION_NO     201101111
 
 #endif
index 2eadd2c..f8b5d4d 100644 (file)
@@ -3075,7 +3075,7 @@ DATA(insert OID = 1936 (  pg_stat_get_backend_idset               PGNSP PGUID 12 1 100 0 f f
 DESCR("statistics: currently active backend IDs");
 DATA(insert OID = 2022 (  pg_stat_get_activity                 PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active backends");
-DATA(insert OID = 3099 (  pg_stat_get_wal_senders      PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25}" "{o,o}" "{procpid,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 (  pg_stat_get_wal_senders      PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25}" "{o,o,o}" "{procpid,state,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
 DATA(insert OID = 2026 (  pg_backend_pid                               PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
 DESCR("statistics: current backend PID");
index d6767b9..0b4a143 100644 (file)
 #include "storage/latch.h"
 #include "storage/spin.h"
 
+
+typedef enum WalSndState
+{
+       WALSNDSTATE_STARTUP = 0,
+       WALSNDSTATE_BACKUP,
+       WALSNDSTATE_CATCHUP,
+       WALSNDSTATE_STREAMING
+}      WalSndState;
+
 /*
  * Each walsender has a WalSnd struct in shared memory.
  */
 typedef struct WalSnd
 {
        pid_t           pid;                    /* this walsender's process id, or 0 */
+       WalSndState state;                      /* this walsender's state */
        XLogRecPtr      sentPtr;                /* WAL has been sent up to this point */
 
        slock_t         mutex;                  /* locks shared variables shown above */
@@ -53,6 +63,7 @@ extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
 extern void WalSndWakeup(void);
+extern void WalSndSetState(WalSndState state);
 
 extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);