]> granicus.if.org Git - postgresql/commitdiff
Track walsender state in shared memory and expose in pg_stat_replication
authorMagnus Hagander <magnus@hagander.net>
Tue, 11 Jan 2011 20:25:28 +0000 (21:25 +0100)
committerMagnus Hagander <magnus@hagander.net>
Tue, 11 Jan 2011 20:25:28 +0000 (21:25 +0100)
doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/replication/basebackup.c
src/backend/replication/walsender.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.h
src/include/replication/walsender.h

index e2d27da38c7a4d89025d72d9b28d753391812694..241131ce6144f3bd3d2c54d067cb41ac216276bb 100644 (file)
@@ -298,8 +298,8 @@ postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re
       <entry><structname>pg_stat_replication</><indexterm><primary>pg_stat_replication</primary></indexterm></entry>
       <entry>One row per WAL sender process, showing process <acronym>ID</>,
       user OID, user name, application name, client's address and port number,
-      time at which the server process began execution, and transaction log
-      location.
+      time at which the server process began execution, current WAL sender
+      state and transaction log location.
      </entry>
      </row>
 
index aa89240e85f09bc840319110b8b1554207abbee7..718e996e6b8d7dd26e2b0616f5a89090001bd244 100644 (file)
@@ -501,6 +501,7 @@ CREATE VIEW pg_stat_replication AS
             S.client_addr,
             S.client_port,
             S.backend_start,
+            W.state,
             W.sent_location
     FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
             pg_stat_get_wal_senders() AS W
index c09700f79804542529ab0c5aaa1bac6994f33431..144b17c66b69e7f371310dcb7816283095b3e4ca 100644 (file)
@@ -24,6 +24,7 @@
 #include "libpq/pqformat.h"
 #include "nodes/pg_list.h"
 #include "replication/basebackup.h"
+#include "replication/walsender.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "utils/builtins.h"
@@ -115,6 +116,8 @@ SendBaseBackup(const char *options)
                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
        old_context = MemoryContextSwitchTo(backup_context);
 
+       WalSndSetState(WALSNDSTATE_BACKUP);
+
        if (backup_label == NULL)
                ereport(FATAL,
                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
index 559e7349fc59c60de457b69124c57e22d1cd2364..a0f20ab41f02ffc9463d7506ac3ec5f8d699cc4d 100644 (file)
@@ -179,6 +179,7 @@ WalSndHandshake(void)
        {
                int                     firstchar;
 
+               WalSndSetState(WALSNDSTATE_STARTUP);
                set_ps_display("idle", false);
 
                /* Wait for a command to arrive */
@@ -482,6 +483,9 @@ WalSndLoop(void)
                        if (!XLogSend(output_message, &caughtup))
                                break;
                }
+
+               /* Update our state to indicate if we're behind or not */
+               WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
        }
 
        /*
@@ -533,6 +537,7 @@ InitWalSnd(void)
                         */
                        walsnd->pid = MyProcPid;
                        MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+                       walsnd->state = WALSNDSTATE_STARTUP;
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        OwnLatch((Latch *) &walsnd->latch);
@@ -960,6 +965,45 @@ WalSndWakeup(void)
                SetLatch(&WalSndCtl->walsnds[i].latch);
 }
 
+/* Set state for current walsender (only called in walsender) */
+void
+WalSndSetState(WalSndState state)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSnd *walsnd = MyWalSnd;
+
+       Assert(am_walsender);
+
+       if (walsnd->state == state)
+               return;
+
+       SpinLockAcquire(&walsnd->mutex);
+       walsnd->state = state;
+       SpinLockRelease(&walsnd->mutex);
+}
+
+/*
+ * Return a string constant representing the state. This is used
+ * in system views, and should *not* be translated.
+ */
+static const char *
+WalSndGetStateString(WalSndState state)
+{
+       switch (state)
+       {
+               case WALSNDSTATE_STARTUP:
+                       return "STARTUP";
+               case WALSNDSTATE_BACKUP:
+                       return "BACKUP";
+               case WALSNDSTATE_CATCHUP:
+                       return "CATCHUP";
+               case WALSNDSTATE_STREAMING:
+                       return "STREAMING";
+       }
+       return "UNKNOWN";
+}
+
+
 /*
  * Returns activity of walsenders, including pids and xlog locations sent to
  * standby servers.
@@ -967,7 +1011,7 @@ WalSndWakeup(void)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS   2
+#define PG_STAT_GET_WAL_SENDERS_COLS   3
        ReturnSetInfo      *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        TupleDesc                       tupdesc;
        Tuplestorestate    *tupstore;
@@ -1021,7 +1065,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
                memset(nulls, 0, sizeof(nulls));
                values[0] = Int32GetDatum(walsnd->pid);
-               values[1] = CStringGetTextDatum(sent_location);
+               values[1] = CStringGetTextDatum(WalSndGetStateString(walsnd->state));
+               values[2] = CStringGetTextDatum(sent_location);
 
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
        }
index 7a03b1c1173f398572e2f24a5e8a1a22a26daf59..df3c95b5f916ae8e6af453de520fb0c3479570db 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201101081
+#define CATALOG_VERSION_NO     201101111
 
 #endif
index 2eadd2ced8d851218f5968c359116f16684264a3..f8b5d4da3da64f5b6221e4256e3468524742f0e7 100644 (file)
@@ -3075,7 +3075,7 @@ DATA(insert OID = 1936 (  pg_stat_get_backend_idset               PGNSP PGUID 12 1 100 0 f f
 DESCR("statistics: currently active backend IDs");
 DATA(insert OID = 2022 (  pg_stat_get_activity                 PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active backends");
-DATA(insert OID = 3099 (  pg_stat_get_wal_senders      PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25}" "{o,o}" "{procpid,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 (  pg_stat_get_wal_senders      PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25}" "{o,o,o}" "{procpid,state,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
 DATA(insert OID = 2026 (  pg_backend_pid                               PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
 DESCR("statistics: current backend PID");
index d6767b9dcf3aa4ef1a682c980de147dae10f7791..0b4a143f827e78a38bc855bbff7c10e41b714735 100644 (file)
 #include "storage/latch.h"
 #include "storage/spin.h"
 
+
+typedef enum WalSndState
+{
+       WALSNDSTATE_STARTUP = 0,
+       WALSNDSTATE_BACKUP,
+       WALSNDSTATE_CATCHUP,
+       WALSNDSTATE_STREAMING
+}      WalSndState;
+
 /*
  * Each walsender has a WalSnd struct in shared memory.
  */
 typedef struct WalSnd
 {
        pid_t           pid;                    /* this walsender's process id, or 0 */
+       WalSndState state;                      /* this walsender's state */
        XLogRecPtr      sentPtr;                /* WAL has been sent up to this point */
 
        slock_t         mutex;                  /* locks shared variables shown above */
@@ -53,6 +63,7 @@ extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
 extern void WalSndWakeup(void);
+extern void WalSndSetState(WalSndState state);
 
 extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);