granicus.if.org Git - postgresql/commitdiff
Add timestamp of last received message from standby to pg_stat_replication
author Michael Paquier <michael@paquier.xyz>
Sun, 9 Dec 2018 07:35:06 +0000 (16:35 +0900)
committer Michael Paquier <michael@paquier.xyz>
Sun, 9 Dec 2018 07:35:06 +0000 (16:35 +0900)
The timestamp generated by the standby at message transmission has been
included in the protocol since its introduction for both the status
update message and hot standby feedback message, but it has never
appeared in pg_stat_replication.  Seeing this timestamp does not matter
much on a cluster with a lot of activity, but on a mostly-idle cluster
it lets monitoring react faster than the configured timeouts.

Author: MyungKyu LIM
Reviewed-by: Michael Paquier, Masahiko Sawada
Discussion: https://postgr.es/m/1657809367.407321.1533027417725.JavaMail.jboss@ep2ml404

doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/replication/walsender.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/replication/walsender_private.h
src/test/regress/expected/rules.out

index 40e4298cf4e8e9cfd8d53d5a2290dcc2da4140ab..96bcc3a63be38bf5553c30ccbffe4047144275bd 100644 (file)
@@ -1920,6 +1920,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        </itemizedlist>
      </entry>
     </row>
+    <row>
+     <entry><structfield>reply_time</structfield></entry>
+     <entry><type>timestamp with time zone</type></entry>
+     <entry>Send time of last reply message received from standby server</entry>
+    </row>
    </tbody>
    </tgroup>
   </table>
index 715995dd883c694222f01f3989cf13fffba0870a..8630542bb34091a57d2f8122d818d2d435945104 100644 (file)
@@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS
             W.flush_lag,
             W.replay_lag,
             W.sync_priority,
-            W.sync_state
+            W.sync_state,
+            W.reply_time
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
index 46edb525e88d1b0994a754e53e39b4a833a69f03..d1a8113cb66380be21a6fa36fc38a7589d4e36d1 100644 (file)
@@ -1763,6 +1763,7 @@ ProcessStandbyReplyMessage(void)
                                applyLag;
        bool            clearLagTimes;
        TimestampTz now;
+       TimestampTz replyTime;
 
        static bool fullyAppliedLastTime = false;
 
@@ -1770,14 +1771,25 @@ ProcessStandbyReplyMessage(void)
        writePtr = pq_getmsgint64(&reply_message);
        flushPtr = pq_getmsgint64(&reply_message);
        applyPtr = pq_getmsgint64(&reply_message);
-       (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
+       replyTime = pq_getmsgint64(&reply_message);
        replyRequested = pq_getmsgbyte(&reply_message);
 
-       elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
-                (uint32) (writePtr >> 32), (uint32) writePtr,
-                (uint32) (flushPtr >> 32), (uint32) flushPtr,
-                (uint32) (applyPtr >> 32), (uint32) applyPtr,
-                replyRequested ? " (reply requested)" : "");
+       if (log_min_messages <= DEBUG2)
+       {
+               char       *replyTimeStr;
+
+               /* Copy because timestamptz_to_str returns a static buffer */
+               replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
+
+               elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
+                        (uint32) (writePtr >> 32), (uint32) writePtr,
+                        (uint32) (flushPtr >> 32), (uint32) flushPtr,
+                        (uint32) (applyPtr >> 32), (uint32) applyPtr,
+                        replyRequested ? " (reply requested)" : "",
+                        replyTimeStr);
+
+               pfree(replyTimeStr);
+       }
 
        /* See if we can compute the round-trip lag for these positions. */
        now = GetCurrentTimestamp();
@@ -1824,6 +1836,7 @@ ProcessStandbyReplyMessage(void)
                        walsnd->flushLag = flushLag;
                if (applyLag != -1 || clearLagTimes)
                        walsnd->applyLag = applyLag;
+               walsnd->replyTime = replyTime;
                SpinLockRelease(&walsnd->mutex);
        }
 
@@ -1927,23 +1940,47 @@ ProcessStandbyHSFeedbackMessage(void)
        uint32          feedbackEpoch;
        TransactionId feedbackCatalogXmin;
        uint32          feedbackCatalogEpoch;
+       TimestampTz replyTime;
 
        /*
         * Decipher the reply message. The caller already consumed the msgtype
         * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
         * of this message.
         */
-       (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
+       replyTime = pq_getmsgint64(&reply_message);
        feedbackXmin = pq_getmsgint(&reply_message, 4);
        feedbackEpoch = pq_getmsgint(&reply_message, 4);
        feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
        feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-       elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
-                feedbackXmin,
-                feedbackEpoch,
-                feedbackCatalogXmin,
-                feedbackCatalogEpoch);
+       if (log_min_messages <= DEBUG2)
+       {
+               char       *replyTimeStr;
+
+               /* Copy because timestamptz_to_str returns a static buffer */
+               replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
+
+               elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
+                        feedbackXmin,
+                        feedbackEpoch,
+                        feedbackCatalogXmin,
+                        feedbackCatalogEpoch,
+                        replyTimeStr);
+
+               pfree(replyTimeStr);
+       }
+
+       /*
+        * Update shared state for this WalSender process based on reply data from
+        * standby.
+        */
+       {
+               WalSnd     *walsnd = MyWalSnd;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->replyTime = replyTime;
+               SpinLockRelease(&walsnd->mutex);
+       }
 
        /*
         * Unset WalSender's xmins if the feedback message values are invalid.
@@ -2265,6 +2302,7 @@ InitWalSenderSlot(void)
                        walsnd->applyLag = -1;
                        walsnd->state = WALSNDSTATE_STARTUP;
                        walsnd->latch = &MyProc->procLatch;
+                       walsnd->replyTime = 0;
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        MyWalSnd = (WalSnd *) walsnd;
@@ -3179,7 +3217,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS   11
+#define PG_STAT_GET_WAL_SENDERS_COLS   12
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        TupleDesc       tupdesc;
        Tuplestorestate *tupstore;
@@ -3233,6 +3271,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                int                     priority;
                int                     pid;
                WalSndState state;
+               TimestampTz replyTime;
                Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
                bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
@@ -3252,6 +3291,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                flushLag = walsnd->flushLag;
                applyLag = walsnd->applyLag;
                priority = walsnd->sync_standby_priority;
+               replyTime = walsnd->replyTime;
                SpinLockRelease(&walsnd->mutex);
 
                memset(nulls, 0, sizeof(nulls));
@@ -3328,6 +3368,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                                        CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
                        else
                                values[10] = CStringGetTextDatum("potential");
+
+                       if (replyTime == 0)
+                               nulls[11] = true;
+                       else
+                               values[11] = TimestampTzGetDatum(replyTime);
                }
 
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
index be72bddd17cf2a065aa5281bd620c08f1ab53f6c..e16ec9dd7786ab203ebcfd77aa1f498a59252715 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201811201
+#define CATALOG_VERSION_NO     201812091
 
 #endif
index 034a41eb556dc716ccb218131b361592addaf872..f79fcfe029f1545599759c0f22e89a952bc86ea1 100644 (file)
   proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}',
+  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
   prosrc => 'pg_stat_get_wal_senders' },
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
index 4b904779361f5a28f0f6939d632ddbf82db8cc16..53314b1fae5c65dffa436fc3e763769939a47396 100644 (file)
@@ -75,6 +75,11 @@ typedef struct WalSnd
         * SyncRepLock.
         */
        int                     sync_standby_priority;
+
+       /*
+        * Send time (as reported by the standby) of the last reply message
+        * received from the standby.
+        */
+       TimestampTz replyTime;
 } WalSnd;
 
 extern WalSnd *MyWalSnd;
index 735dd37acff359800d76c46a98723857ef3d2a22..b68b8d273f32105e179dbe839dda99039e329de7 100644 (file)
@@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid,
     w.flush_lag,
     w.replay_lag,
     w.sync_priority,
-    w.sync_state
+    w.sync_state,
+    w.reply_time
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
     s.ssl,