From d1d094e4cf7d80cb2b4f68b5e8ce11aa9ebcbf3c Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Thu, 20 Oct 2011 19:43:31 -0400 Subject: [PATCH] Simplify and improve ProcessStandbyHSFeedbackMessage logic. There's no need to clamp the standby's xmin to be greater than GetOldestXmin's result; if there were any such need this logic would be hopelessly inadequate anyway, because it fails to account for within-database versus cluster-wide values of GetOldestXmin. So get rid of that, and just rely on sanity-checking that the xmin is not wrapped around relative to the nextXid counter. Also, don't reset the walsender's xmin if the current feedback xmin is indeed out of range; that just creates more problems than we already had. Lastly, don't bother to take the ProcArrayLock; there's no need to do that to set xmin. Also improve the comments about this in GetOldestXmin itself. --- src/backend/replication/walsender.c | 105 +++++++++++++--------------- src/backend/storage/ipc/procarray.c | 50 ++++++++----- 2 files changed, 79 insertions(+), 76 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 606b9e8571..3497269850 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -611,76 +611,67 @@ static void ProcessStandbyHSFeedbackMessage(void) { StandbyHSFeedbackMessage reply; - TransactionId newxmin = InvalidTransactionId; + TransactionId nextXid; + uint32 nextEpoch; - pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage)); + /* Decipher the reply message */ + pq_copymsgbytes(&reply_message, (char *) &reply, + sizeof(StandbyHSFeedbackMessage)); elog(DEBUG2, "hot standby feedback xmin %u epoch %u", reply.xmin, reply.epoch); + /* Ignore invalid xmin (can't actually happen with current walreceiver) */ + if (!TransactionIdIsNormal(reply.xmin)) + return; + /* - * Update the WalSender's proc xmin to allow it to be visible to - * snapshots. This will hold back the removal of dead rows and thereby - * prevent the generation of cleanup conflicts on the standby server. + * Check that the provided xmin/epoch are sane, that is, not in the future + * and not so far back as to be already wrapped around. Ignore if not. + * + * Epoch of nextXid should be same as standby, or if the counter has + * wrapped, then one greater than standby. */ - if (TransactionIdIsValid(reply.xmin)) - { - TransactionId nextXid; - uint32 nextEpoch; - bool epochOK = false; - - GetNextXidAndEpoch(&nextXid, &nextEpoch); - - /* - * Epoch of oldestXmin should be same as standby or if the counter has - * wrapped, then one less than reply. - */ - if (reply.xmin <= nextXid) - { - if (reply.epoch == nextEpoch) - epochOK = true; - } - else - { - if (nextEpoch > 0 && reply.epoch == nextEpoch - 1) - epochOK = true; - } - - /* - * Feedback from standby must not go backwards, nor should it go - * forwards further than our most recent xid. - */ - if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid)) - { - if (!TransactionIdIsValid(MyProc->xmin)) - { - TransactionId oldestXmin = GetOldestXmin(true, true); + GetNextXidAndEpoch(&nextXid, &nextEpoch); - if (TransactionIdPrecedes(oldestXmin, reply.xmin)) - newxmin = reply.xmin; - else - newxmin = oldestXmin; - } - else - { - if (TransactionIdPrecedes(MyProc->xmin, reply.xmin)) - newxmin = reply.xmin; - else - newxmin = MyProc->xmin; /* stay the same */ - } - } + if (reply.xmin <= nextXid) + { + if (reply.epoch != nextEpoch) + return; } + else + { + if (reply.epoch + 1 != nextEpoch) + return; + } + + if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid)) + return; /* epoch OK, but it's wrapped around */ /* - * Grab the ProcArrayLock to set xmin, or invalidate for bad reply + * Set the WalSender's xmin equal to the standby's requested xmin, so that + * the xmin will be taken into account by GetOldestXmin. This will hold + * back the removal of dead rows and thereby prevent the generation of + * cleanup conflicts on the standby server. + * + * There is a small window for a race condition here: although we just + * checked that reply.xmin precedes nextXid, the nextXid could have gotten + * advanced between our fetching it and applying the xmin below, perhaps + * far enough to make reply.xmin wrap around. In that case the xmin we + * set here would be "in the future" and have no effect. No point in + * worrying about this since it's too late to save the desired data + * anyway. Assuming that the standby sends us an increasing sequence of + * xmins, this could only happen during the first reply cycle, else our + * own xmin would prevent nextXid from advancing so far. + * + * We don't bother taking the ProcArrayLock here. Setting the xmin field + * is assumed atomic, and there's no real need to prevent a concurrent + * GetOldestXmin. (If we're moving our xmin forward, this is obviously + * safe, and if we're moving it backwards, well, the data is at risk + * already since a VACUUM could have just finished calling GetOldestXmin.) */ - if (MyProc->xmin != newxmin) - { - LWLockAcquire(ProcArrayLock, LW_SHARED); - MyProc->xmin = newxmin; - LWLockRelease(ProcArrayLock); - } + MyProc->xmin = reply.xmin; } /* Main loop of walsender process */ diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index e7593fa2eb..a0e36bfeb8 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -998,22 +998,32 @@ TransactionIdIsActive(TransactionId xid) * This is also used to determine where to truncate pg_subtrans. allDbs * must be TRUE for that case, and ignoreVacuum FALSE. * - * Note: it's possible for the calculated value to move backwards on repeated - * calls. The calculated value is conservative, so that anything older is - * definitely not considered as running by anyone anymore, but the exact - * value calculated depends on a number of things. For example, if allDbs is - * TRUE and there are no transactions running in the current database, - * GetOldestXmin() returns latestCompletedXid. If a transaction begins after - * that, its xmin will include in-progress transactions in other databases - * that started earlier, so another call will return an lower value. The - * return value is also adjusted with vacuum_defer_cleanup_age, so increasing - * that setting on the fly is an easy way to have GetOldestXmin() move - * backwards. - * * Note: we include all currently running xids in the set of considered xids. * This ensures that if a just-started xact has not yet set its snapshot, * when it does set the snapshot it cannot set xmin less than what we compute. * See notes in src/backend/access/transam/README. + * + * Note: despite the above, it's possible for the calculated value to move + * backwards on repeated calls. The calculated value is conservative, so that + * anything older is definitely not considered as running by anyone anymore, + * but the exact value calculated depends on a number of things. For example, + * if allDbs is FALSE and there are no transactions running in the current + * database, GetOldestXmin() returns latestCompletedXid. If a transaction + * begins after that, its xmin will include in-progress transactions in other + * databases that started earlier, so another call will return a lower value. + * Nonetheless it is safe to vacuum a table in the current database with the + * first result. There are also replication-related effects: a walsender + * process can set its xmin based on transactions that are no longer running + * in the master but are still being replayed on the standby, thus possibly + * making the GetOldestXmin reading go backwards. In this case there is a + * possibility that we lose data that the standby would like to have, but + * there is little we can do about that --- data is only protected if the + * walsender runs continuously while queries are executed on the standby. + * (The Hot Standby code deals with such cases by failing standby queries + * that needed to access already-removed data, so there's no integrity bug.) + * The return value is also adjusted with vacuum_defer_cleanup_age, so + * increasing that setting on the fly is another easy way to make + * GetOldestXmin() move backwards, with no consequences for data integrity. */ TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum) @@ -1046,7 +1056,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) if (allDbs || proc->databaseId == MyDatabaseId || - proc->databaseId == 0) /* include WalSender */ + proc->databaseId == 0) /* always include WalSender */ { /* Fetch xid just once - see GetNewTransactionId */ TransactionId xid = proc->xid; @@ -1092,16 +1102,18 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) LWLockRelease(ProcArrayLock); /* - * Compute the cutoff XID, being careful not to generate a "permanent" - * XID. We need do this only on the primary, never on standby. + * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age, + * being careful not to generate a "permanent" XID. * * vacuum_defer_cleanup_age provides some additional "slop" for the * benefit of hot standby queries on slave servers. This is quick and * dirty, and perhaps not all that useful unless the master has a - * predictable transaction rate, but it's what we've got. Note that - * we are assuming vacuum_defer_cleanup_age isn't large enough to - * cause wraparound --- so guc.c should limit it to no more than the - * xidStopLimit threshold in varsup.c. + * predictable transaction rate, but it offers some protection when + * there's no walsender connection. Note that we are assuming + * vacuum_defer_cleanup_age isn't large enough to cause wraparound --- + * so guc.c should limit it to no more than the xidStopLimit threshold + * in varsup.c. Also note that we intentionally don't apply + * vacuum_defer_cleanup_age on standby servers. */ result -= vacuum_defer_cleanup_age; if (!TransactionIdIsNormal(result)) -- 2.40.0