]> granicus.if.org Git - postgresql/commitdiff
Cleanup commit timestamp module activaction, again
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 27 Oct 2015 18:06:50 +0000 (15:06 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 27 Oct 2015 18:06:50 +0000 (15:06 -0300)
Further tweak commit_ts.c so that on a standby the state is completely
consistent with what that in the master, rather than behaving
differently in the cases that the settings differ.  Now in standby and
master the module should always be active or inactive in lockstep.

Author: Petr Jelínek, with some further tweaks by Álvaro Herrera.

Backpatch to 9.5, where commit timestamps were introduced.

Discussion: http://www.postgresql.org/message-id/5622BF9D.2010409@2ndquadrant.com

src/backend/access/transam/commit_ts.c
src/backend/commands/vacuum.c
src/include/access/commit_ts.h

index 24b8291083546a1a9ca8d51892ba2e2d38e6867d..b21a31345f76d44fadf1f3e69f9c4906fd30ae48 100644 (file)
@@ -78,13 +78,21 @@ static SlruCtlData CommitTsCtlData;
 #define CommitTsCtl (&CommitTsCtlData)
 
 /*
- * We keep a cache of the last value set in shared memory.  This is protected
- * by CommitTsLock.
+ * We keep a cache of the last value set in shared memory.
+ *
+ * This is also good place to keep the activation status.  We keep this
+ * separate from the GUC so that the standby can activate the module if the
+ * primary has it active independently of the value of the GUC.
+ *
+ * This is protected by CommitTsLock.  In some places, we use commitTsActive
+ * without acquiring the lock; where this happens, a comment explains the
+ * rationale for it.
  */
 typedef struct CommitTimestampShared
 {
        TransactionId xidLastCommit;
        CommitTimestampEntry dataLastCommit;
+       bool    commitTsActive;
 } CommitTimestampShared;
 
 CommitTimestampShared *commitTsShared;
@@ -93,14 +101,6 @@ CommitTimestampShared *commitTsShared;
 /* GUC variable */
 bool           track_commit_timestamp;
 
-/*
- * When this is set, commit_ts is force-enabled during recovery.  This is so
- * that a standby can replay WAL records coming from a master with the setting
- * enabled.  (Note that this doesn't enable SQL access to the data; it's
- * effectively write-only until the GUC itself is enabled.)
- */
-static bool            enable_during_recovery;
-
 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
                                         TransactionId *subxids, TimestampTz ts,
                                         RepOriginId nodeid, int pageno);
@@ -109,7 +109,7 @@ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
 static int     ZeroCommitTsPage(int pageno, bool writeXlog);
 static bool CommitTsPagePrecedes(int page1, int page2);
 static void ActivateCommitTs(void);
-static void DeactivateCommitTs(bool do_wal);
+static void DeactivateCommitTs(void);
 static void WriteZeroPageXlogRec(int pageno);
 static void WriteTruncateXlogRec(int pageno);
 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
@@ -149,10 +149,14 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
        TransactionId newestXact;
 
        /*
-        * No-op if the module is not enabled, but allow writes in a standby
-        * during recovery.
+        * No-op if the module is not active.
+        *
+        * An unlocked read here is fine, because in a standby (the only place
+        * where the flag can change in flight) this routine is only called by
+        * the recovery process, which is also the only process which can change
+        * the flag.
         */
-       if (!track_commit_timestamp && !enable_during_recovery)
+       if (!commitTsShared->commitTsActive)
                return;
 
        /*
@@ -283,30 +287,45 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
        TransactionId oldestCommitTs;
        TransactionId newestCommitTs;
 
+       /* error if the given Xid doesn't normally commit */
+       if (!TransactionIdIsNormal(xid))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+               errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
+
+       LWLockAcquire(CommitTsLock, LW_SHARED);
+
        /* Error if module not enabled */
-       if (!track_commit_timestamp)
+       if (!commitTsShared->commitTsActive)
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("could not get commit timestamp data"),
                          errhint("Make sure the configuration parameter \"%s\" is set.",
                                          "track_commit_timestamp")));
 
-       /* error if the given Xid doesn't normally commit */
-       if (!TransactionIdIsNormal(xid))
-               ereport(ERROR,
-                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-               errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
-
        /*
-        * Return empty if the requested value is outside our valid range.
+        * If we're asked for the cached value, return that.  Otherwise, fall
+        * through to read from SLRU.
         */
-       LWLockAcquire(CommitTsLock, LW_SHARED);
+       if (commitTsShared->xidLastCommit == xid)
+       {
+               *ts = commitTsShared->dataLastCommit.time;
+               if (nodeid)
+                       *nodeid = commitTsShared->dataLastCommit.nodeid;
+
+               LWLockRelease(CommitTsLock);
+               return *ts != 0;
+       }
+
        oldestCommitTs = ShmemVariableCache->oldestCommitTs;
        newestCommitTs = ShmemVariableCache->newestCommitTs;
        /* neither is invalid, or both are */
        Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
        LWLockRelease(CommitTsLock);
 
+       /*
+        * Return empty if the requested value is outside our valid range.
+        */
        if (!TransactionIdIsValid(oldestCommitTs) ||
                TransactionIdPrecedes(xid, oldestCommitTs) ||
                TransactionIdPrecedes(newestCommitTs, xid))
@@ -317,27 +336,6 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
                return false;
        }
 
-       /*
-        * Use an unlocked atomic read on our cached value in shared memory; if
-        * it's a hit, acquire a lock and read the data, after verifying that it's
-        * still what we initially read.  Otherwise, fall through to read from
-        * SLRU.
-        */
-       if (commitTsShared->xidLastCommit == xid)
-       {
-               LWLockAcquire(CommitTsLock, LW_SHARED);
-               if (commitTsShared->xidLastCommit == xid)
-               {
-                       *ts = commitTsShared->dataLastCommit.time;
-                       if (nodeid)
-                               *nodeid = commitTsShared->dataLastCommit.nodeid;
-
-                       LWLockRelease(CommitTsLock);
-                       return *ts != 0;
-               }
-               LWLockRelease(CommitTsLock);
-       }
-
        /* lock is acquired by SimpleLruReadPage_ReadOnly */
        slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
        memcpy(&entry,
@@ -366,15 +364,16 @@ GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
 {
        TransactionId xid;
 
+       LWLockAcquire(CommitTsLock, LW_SHARED);
+
        /* Error if module not enabled */
-       if (!track_commit_timestamp)
+       if (!commitTsShared->commitTsActive)
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("could not get commit timestamp data"),
                          errhint("Make sure the configuration parameter \"%s\" is set.",
                                          "track_commit_timestamp")));
 
-       LWLockAcquire(CommitTsLock, LW_SHARED);
        xid = commitTsShared->xidLastCommit;
        if (ts)
                *ts = commitTsShared->dataLastCommit.time;
@@ -493,6 +492,7 @@ CommitTsShmemInit(void)
                commitTsShared->xidLastCommit = InvalidTransactionId;
                TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
                commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+               commitTsShared->commitTsActive = false;
        }
        else
                Assert(found);
@@ -566,7 +566,7 @@ CompleteCommitTsInitialization(void)
         * any leftover data.
         */
        if (!track_commit_timestamp)
-               DeactivateCommitTs(true);
+               DeactivateCommitTs();
 }
 
 /*
@@ -588,11 +588,11 @@ CommitTsParameterChange(bool newvalue, bool oldvalue)
         */
        if (newvalue)
        {
-               if (!track_commit_timestamp && !oldvalue)
+               if (!commitTsShared->commitTsActive)
                        ActivateCommitTs();
        }
-       else if (!track_commit_timestamp && oldvalue)
-               DeactivateCommitTs(false);
+       else if (commitTsShared->commitTsActive)
+               DeactivateCommitTs();
 }
 
 /*
@@ -645,7 +645,7 @@ ActivateCommitTs(void)
        }
        LWLockRelease(CommitTsLock);
 
-       /* Finally, create the current segment file, if necessary */
+       /* Create the current segment file, if necessary */
        if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
        {
                int                     slotno;
@@ -657,8 +657,10 @@ ActivateCommitTs(void)
                LWLockRelease(CommitTsControlLock);
        }
 
-       /* We can now replay xlog records from this module */
-       enable_during_recovery = true;
+       /* Change the activation status in shared memory. */
+       LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+       commitTsShared->commitTsActive = true;
+       LWLockRelease(CommitTsLock);
 }
 
 /*
@@ -672,21 +674,25 @@ ActivateCommitTs(void)
  * possibly-invalid data; also removes segments of old data.
  */
 static void
-DeactivateCommitTs(bool do_wal)
+DeactivateCommitTs(void)
 {
-       TransactionId xid = ShmemVariableCache->nextXid;
-       int                     pageno = TransactionIdToCTsPage(xid);
-
        /*
-        * Re-Initialize our idea of the latest page number.
+        * Cleanup the status in the shared memory.
+        *
+        * We reset everything in the commitTsShared record to prevent user from
+        * getting confusing data about last committed transaction on the standby
+        * when the module was activated repeatedly on the primary.
         */
-       LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
-       CommitTsCtl->shared->latest_page_number = pageno;
-       LWLockRelease(CommitTsControlLock);
-
        LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+
+       commitTsShared->commitTsActive = false;
+       commitTsShared->xidLastCommit = InvalidTransactionId;
+       TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
+       commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+
        ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
        ShmemVariableCache->newestCommitTs = InvalidTransactionId;
+
        LWLockRelease(CommitTsLock);
 
        /*
@@ -697,10 +703,9 @@ DeactivateCommitTs(bool do_wal)
         * be overwritten anyway when we wrap around, but it seems better to be
         * tidy.)
         */
+       LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
        (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
-
-       /* No longer enabled on recovery */
-       enable_during_recovery = false;
+       LWLockRelease(CommitTsControlLock);
 }
 
 /*
@@ -739,8 +744,13 @@ ExtendCommitTs(TransactionId newestXact)
 {
        int                     pageno;
 
-       /* nothing to do if module not enabled */
-       if (!track_commit_timestamp && !enable_during_recovery)
+       /*
+        * Nothing to do if module not enabled.  Note we do an unlocked read of the
+        * flag here, which is okay because this routine is only called from
+        * GetNewTransactionId, which is never called in a standby.
+        */
+       Assert(!InRecovery);
+       if (!commitTsShared->commitTsActive)
                return;
 
        /*
@@ -768,7 +778,7 @@ ExtendCommitTs(TransactionId newestXact)
  * Note that we don't need to flush XLOG here.
  */
 void
-TruncateCommitTs(TransactionId oldestXact, bool do_wal)
+TruncateCommitTs(TransactionId oldestXact)
 {
        int                     cutoffPage;
 
@@ -784,8 +794,7 @@ TruncateCommitTs(TransactionId oldestXact, bool do_wal)
                return;                                 /* nothing to remove */
 
        /* Write XLOG record */
-       if (do_wal)
-               WriteTruncateXlogRec(cutoffPage);
+       WriteTruncateXlogRec(cutoffPage);
 
        /* Now we can remove the old CommitTs segment(s) */
        SimpleLruTruncate(CommitTsCtl, cutoffPage);
index 6d55148edd4a72824f042cbea839bb22a55ac6dc..7c4ef58129e650bd62d20deba0e8df8201f9bdd1 100644 (file)
@@ -1140,7 +1140,7 @@ vac_truncate_clog(TransactionId frozenXID,
         * Truncate CLOG, multixact and CommitTs to the oldest computed value.
         */
        TruncateCLOG(frozenXID);
-       TruncateCommitTs(frozenXID, true);
+       TruncateCommitTs(frozenXID);
        TruncateMultiXact(minMulti, minmulti_datoid);
 
        /*
index 1b95b5837ea31dcc6433d371a1fb39b384b374c2..3844bb30ff252f0708c4ad63a18d297bfbcfe47c 100644 (file)
@@ -40,7 +40,7 @@ extern void CompleteCommitTsInitialization(void);
 extern void ShutdownCommitTs(void);
 extern void CheckPointCommitTs(void);
 extern void ExtendCommitTs(TransactionId newestXact);
-extern void TruncateCommitTs(TransactionId oldestXact, bool do_wal);
+extern void TruncateCommitTs(TransactionId oldestXact);
 extern void SetCommitTsLimit(TransactionId oldestXact,
                                 TransactionId newestXact);
 extern void AdvanceOldestCommitTs(TransactionId oldestXact);