int wakeEvents;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
-
/*
* Fast path to avoid acquiring the spinlock in case we already know we
* have enough WAL available. This is particularly interesting if we're
{
XLogRecord *record;
char *errm;
+ XLogRecPtr flushPtr;
/*
* Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
if (errm != NULL)
elog(ERROR, "%s", errm);
+ /*
+ * We'll use the current flush point to determine whether we've caught up.
+ */
+ flushPtr = GetFlushRecPtr();
+
if (record != NULL)
{
- /* XXX: Note that logical decoding cannot be used while in recovery */
- XLogRecPtr flushPtr = GetFlushRecPtr();
-
/*
* Note the lack of any call to LagTrackerWrite() which is handled by
* WalSndUpdateProgress which is called by output plugin through
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
sentPtr = logical_decoding_ctx->reader->EndRecPtr;
-
- /*
- * If we have sent a record that is at or beyond the flushed point, we
- * have caught up.
- */
- if (sentPtr >= flushPtr)
- WalSndCaughtUp = true;
}
- else
- {
- /*
- * If the record we just wanted read is at or beyond the flushed
- * point, then we're caught up.
- */
- if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
- {
- WalSndCaughtUp = true;
- /*
- * Have WalSndLoop() terminate the connection in an orderly
- * manner, after writing out all the pending data.
- */
- if (got_STOPPING)
- got_SIGUSR2 = true;
- }
- }
+ /* Set flag if we're caught up. */
+ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+ WalSndCaughtUp = true;
+
+ /*
+ * If we're caught up and have been requested to stop, have WalSndLoop()
+ * terminate the connection in an orderly manner, after writing out all
+ * the pending data.
+ */
+ if (WalSndCaughtUp && got_STOPPING)
+ got_SIGUSR2 = true;
/* Update shared memory status */
{