ProcessStandbyHSFeedbackMessage(void)
{
StandbyHSFeedbackMessage reply;
- TransactionId newxmin = InvalidTransactionId;
+ TransactionId nextXid;
+ uint32 nextEpoch;
- pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+ /* Decipher the reply message */
+ pq_copymsgbytes(&reply_message, (char *) &reply,
+ sizeof(StandbyHSFeedbackMessage));
elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
reply.xmin,
reply.epoch);
+ /* Ignore invalid xmin (can't actually happen with current walreceiver) */
+ if (!TransactionIdIsNormal(reply.xmin))
+ return;
+
/*
- * Update the WalSender's proc xmin to allow it to be visible to
- * snapshots. This will hold back the removal of dead rows and thereby
- * prevent the generation of cleanup conflicts on the standby server.
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around. Ignore if not.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
*/
- if (TransactionIdIsValid(reply.xmin))
- {
- TransactionId nextXid;
- uint32 nextEpoch;
- bool epochOK = false;
-
- GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
- /*
- * Epoch of oldestXmin should be same as standby or if the counter has
- * wrapped, then one less than reply.
- */
- if (reply.xmin <= nextXid)
- {
- if (reply.epoch == nextEpoch)
- epochOK = true;
- }
- else
- {
- if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
- epochOK = true;
- }
-
- /*
- * Feedback from standby must not go backwards, nor should it go
- * forwards further than our most recent xid.
- */
- if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
- {
- if (!TransactionIdIsValid(MyProc->xmin))
- {
- TransactionId oldestXmin = GetOldestXmin(true, true);
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
- if (TransactionIdPrecedes(oldestXmin, reply.xmin))
- newxmin = reply.xmin;
- else
- newxmin = oldestXmin;
- }
- else
- {
- if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
- newxmin = reply.xmin;
- else
- newxmin = MyProc->xmin; /* stay the same */
- }
- }
+ if (reply.xmin <= nextXid)
+ {
+ if (reply.epoch != nextEpoch)
+ return;
}
+ else
+ {
+ if (reply.epoch + 1 != nextEpoch)
+ return;
+ }
+
+ if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+ return; /* epoch OK, but it's wrapped around */
/*
- * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
+ * Set the WalSender's xmin equal to the standby's requested xmin, so that
+ * the xmin will be taken into account by GetOldestXmin. This will hold
+ * back the removal of dead rows and thereby prevent the generation of
+ * cleanup conflicts on the standby server.
+ *
+ * There is a small window for a race condition here: although we just
+ * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+ * advanced between our fetching it and applying the xmin below, perhaps
+ * far enough to make reply.xmin wrap around. In that case the xmin we
+ * set here would be "in the future" and have no effect. No point in
+ * worrying about this since it's too late to save the desired data
+ * anyway. Assuming that the standby sends us an increasing sequence of
+ * xmins, this could only happen during the first reply cycle, else our
+ * own xmin would prevent nextXid from advancing so far.
+ *
+ * We don't bother taking the ProcArrayLock here. Setting the xmin field
+ * is assumed atomic, and there's no real need to prevent a concurrent
+ * GetOldestXmin. (If we're moving our xmin forward, this is obviously
+ * safe, and if we're moving it backwards, well, the data is at risk
+ * already since a VACUUM could have just finished calling GetOldestXmin.)
*/
- if (MyProc->xmin != newxmin)
- {
- LWLockAcquire(ProcArrayLock, LW_SHARED);
- MyProc->xmin = newxmin;
- LWLockRelease(ProcArrayLock);
- }
+ MyProc->xmin = reply.xmin;
}
/* Main loop of walsender process */
* This is also used to determine where to truncate pg_subtrans. allDbs
* must be TRUE for that case, and ignoreVacuum FALSE.
*
- * Note: it's possible for the calculated value to move backwards on repeated
- * calls. The calculated value is conservative, so that anything older is
- * definitely not considered as running by anyone anymore, but the exact
- * value calculated depends on a number of things. For example, if allDbs is
- * TRUE and there are no transactions running in the current database,
- * GetOldestXmin() returns latestCompletedXid. If a transaction begins after
- * that, its xmin will include in-progress transactions in other databases
- * that started earlier, so another call will return an lower value. The
- * return value is also adjusted with vacuum_defer_cleanup_age, so increasing
- * that setting on the fly is an easy way to have GetOldestXmin() move
- * backwards.
- *
* Note: we include all currently running xids in the set of considered xids.
* This ensures that if a just-started xact has not yet set its snapshot,
* when it does set the snapshot it cannot set xmin less than what we compute.
* See notes in src/backend/access/transam/README.
+ *
+ * Note: despite the above, it's possible for the calculated value to move
+ * backwards on repeated calls. The calculated value is conservative, so that
+ * anything older is definitely not considered as running by anyone anymore,
+ * but the exact value calculated depends on a number of things. For example,
+ * if allDbs is FALSE and there are no transactions running in the current
+ * database, GetOldestXmin() returns latestCompletedXid. If a transaction
+ * begins after that, its xmin will include in-progress transactions in other
+ * databases that started earlier, so another call will return a lower value.
+ * Nonetheless it is safe to vacuum a table in the current database with the
+ * first result. There are also replication-related effects: a walsender
+ * process can set its xmin based on transactions that are no longer running
+ * in the master but are still being replayed on the standby, thus possibly
+ * making the GetOldestXmin reading go backwards. In this case there is a
+ * possibility that we lose data that the standby would like to have, but
+ * there is little we can do about that --- data is only protected if the
+ * walsender runs continuously while queries are executed on the standby.
+ * (The Hot Standby code deals with such cases by failing standby queries
+ * that needed to access already-removed data, so there's no integrity bug.)
+ * The return value is also adjusted with vacuum_defer_cleanup_age, so
+ * increasing that setting on the fly is another easy way to make
+ * GetOldestXmin() move backwards, with no consequences for data integrity.
*/
TransactionId
GetOldestXmin(bool allDbs, bool ignoreVacuum)
if (allDbs ||
proc->databaseId == MyDatabaseId ||
- proc->databaseId == 0) /* include WalSender */
+ proc->databaseId == 0) /* always include WalSender */
{
/* Fetch xid just once - see GetNewTransactionId */
TransactionId xid = proc->xid;
LWLockRelease(ProcArrayLock);
/*
- * Compute the cutoff XID, being careful not to generate a "permanent"
- * XID. We need do this only on the primary, never on standby.
+ * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age,
+ * being careful not to generate a "permanent" XID.
*
* vacuum_defer_cleanup_age provides some additional "slop" for the
* benefit of hot standby queries on slave servers. This is quick and
* dirty, and perhaps not all that useful unless the master has a
- * predictable transaction rate, but it's what we've got. Note that
- * we are assuming vacuum_defer_cleanup_age isn't large enough to
- * cause wraparound --- so guc.c should limit it to no more than the
- * xidStopLimit threshold in varsup.c.
+ * predictable transaction rate, but it offers some protection when
+ * there's no walsender connection. Note that we are assuming
+ * vacuum_defer_cleanup_age isn't large enough to cause wraparound ---
+ * so guc.c should limit it to no more than the xidStopLimit threshold
+ * in varsup.c. Also note that we intentionally don't apply
+ * vacuum_defer_cleanup_age on standby servers.
*/
result -= vacuum_defer_cleanup_age;
if (!TransactionIdIsNormal(result))