applyLag;
bool clearLagTimes;
TimestampTz now;
+ TimestampTz replyTime;
static bool fullyAppliedLastTime = false;
writePtr = pq_getmsgint64(&reply_message);
flushPtr = pq_getmsgint64(&reply_message);
applyPtr = pq_getmsgint64(&reply_message);
- (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ replyTime = pq_getmsgint64(&reply_message);
replyRequested = pq_getmsgbyte(&reply_message);
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
- (uint32) (writePtr >> 32), (uint32) writePtr,
- (uint32) (flushPtr >> 32), (uint32) flushPtr,
- (uint32) (applyPtr >> 32), (uint32) applyPtr,
- replyRequested ? " (reply requested)" : "");
+ if (log_min_messages <= DEBUG2)
+ {
+ char *replyTimeStr;
+
+ /* Copy because timestamptz_to_str returns a static buffer */
+ replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
+
+ elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
+ (uint32) (writePtr >> 32), (uint32) writePtr,
+ (uint32) (flushPtr >> 32), (uint32) flushPtr,
+ (uint32) (applyPtr >> 32), (uint32) applyPtr,
+ replyRequested ? " (reply requested)" : "",
+ replyTimeStr);
+
+ pfree(replyTimeStr);
+ }
/* See if we can compute the round-trip lag for these positions. */
now = GetCurrentTimestamp();
walsnd->flushLag = flushLag;
if (applyLag != -1 || clearLagTimes)
walsnd->applyLag = applyLag;
+ walsnd->replyTime = replyTime;
SpinLockRelease(&walsnd->mutex);
}
uint32 feedbackEpoch;
TransactionId feedbackCatalogXmin;
uint32 feedbackCatalogEpoch;
+ TimestampTz replyTime;
/*
* Decipher the reply message. The caller already consumed the msgtype
* byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
* of this message.
*/
- (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ replyTime = pq_getmsgint64(&reply_message);
feedbackXmin = pq_getmsgint(&reply_message, 4);
feedbackEpoch = pq_getmsgint(&reply_message, 4);
feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
- elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
- feedbackXmin,
- feedbackEpoch,
- feedbackCatalogXmin,
- feedbackCatalogEpoch);
+ if (log_min_messages <= DEBUG2)
+ {
+ char *replyTimeStr;
+
+ /* Copy because timestamptz_to_str returns a static buffer */
+ replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
+
+ elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
+ feedbackXmin,
+ feedbackEpoch,
+ feedbackCatalogXmin,
+ feedbackCatalogEpoch,
+ replyTimeStr);
+
+ pfree(replyTimeStr);
+ }
+
+ /*
+ * Update shared state for this WalSender process based on reply data from
+ * standby.
+ */
+ {
+ WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->replyTime = replyTime;
+ SpinLockRelease(&walsnd->mutex);
+ }
/*
* Unset WalSender's xmins if the feedback message values are invalid.
walsnd->applyLag = -1;
walsnd->state = WALSNDSTATE_STARTUP;
walsnd->latch = &MyProc->procLatch;
+ walsnd->replyTime = 0;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 11
+#define PG_STAT_GET_WAL_SENDERS_COLS 12
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
int priority;
int pid;
WalSndState state;
+ TimestampTz replyTime;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
flushLag = walsnd->flushLag;
applyLag = walsnd->applyLag;
priority = walsnd->sync_standby_priority;
+ replyTime = walsnd->replyTime;
SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls));
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else
values[10] = CStringGetTextDatum("potential");
+
+ if (replyTime == 0)
+ nulls[11] = true;
+ else
+ values[11] = TimestampTzGetDatum(replyTime);
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}',
+ proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
prosrc => 'pg_stat_get_wal_senders' },
{ oid => '3317', descr => 'statistics: information about WAL receiver',
proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
w.flush_lag,
w.replay_lag,
w.sync_priority,
- w.sync_state
+ w.sync_state,
+ w.reply_time
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
- JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
+ JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_ssl| SELECT s.pid,
s.ssl,