]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/commit_ts.c
Phase 2 of pgindent updates.
[postgresql] / src / backend / access / transam / commit_ts.c
index 79ca04a6eafd285d44e2db6e6d57fc6a54990ed0..7646c23c4e7fa6e24a75cddb8263b95bf46bdd20 100644 (file)
@@ -3,7 +3,7 @@
  * commit_ts.c
  *             PostgreSQL commit timestamp manager
  *
- * This module is a pg_clog-like system that stores the commit timestamp
+ * This module is a pg_xact-like system that stores the commit timestamp
  * for each transaction.
  *
  * XLOG interactions: this module generates an XLOG record whenever a new
@@ -15,7 +15,7 @@
  * re-perform the status update on redo; so we need make no additional XLOG
  * entry here.
  *
- * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * src/backend/access/transam/commit_ts.c
@@ -32,6 +32,7 @@
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "storage/shmem.h"
 #include "utils/builtins.h"
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
@@ -78,13 +79,21 @@ static SlruCtlData CommitTsCtlData;
 #define CommitTsCtl (&CommitTsCtlData)
 
 /*
- * We keep a cache of the last value set in shared memory.  This is protected
- * by CommitTsLock.
+ * We keep a cache of the last value set in shared memory.
+ *
+ * This is also good place to keep the activation status.  We keep this
+ * separate from the GUC so that the standby can activate the module if the
+ * primary has it active independently of the value of the GUC.
+ *
+ * This is protected by CommitTsLock.  In some places, we use commitTsActive
+ * without acquiring the lock; where this happens, a comment explains the
+ * rationale for it.
  */
 typedef struct CommitTimestampShared
 {
        TransactionId xidLastCommit;
        CommitTimestampEntry dataLastCommit;
+       bool            commitTsActive;
 } CommitTimestampShared;
 
 CommitTimestampShared *commitTsShared;
@@ -93,25 +102,18 @@ CommitTimestampShared *commitTsShared;
 /* GUC variable */
 bool           track_commit_timestamp;
 
-/*
- * When this is set, commit_ts is force-enabled during recovery.  This is so
- * that a standby can replay WAL records coming from a master with the setting
- * enabled.  (Note that this doesn't enable SQL access to the data; it's
- * effectively write-only until the GUC itself is enabled.)
- */
-static bool            enable_during_recovery;
-
 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
                                         TransactionId *subxids, TimestampTz ts,
                                         RepOriginId nodeid, int pageno);
 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
                                                 RepOriginId nodeid, int slotno);
+static void error_commit_ts_disabled(void);
 static int     ZeroCommitTsPage(int pageno, bool writeXlog);
 static bool CommitTsPagePrecedes(int page1, int page2);
 static void ActivateCommitTs(void);
-static void DeactivateCommitTs(bool do_wal);
+static void DeactivateCommitTs(void);
 static void WriteZeroPageXlogRec(int pageno);
-static void WriteTruncateXlogRec(int pageno);
+static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
                                                 TransactionId *subxids, TimestampTz timestamp,
                                                 RepOriginId nodeid);
@@ -149,10 +151,14 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
        TransactionId newestXact;
 
        /*
-        * No-op if the module is not enabled, but allow writes in a standby
-        * during recovery.
+        * No-op if the module is not active.
+        *
+        * An unlocked read here is fine, because in a standby (the only place
+        * where the flag can change in flight) this routine is only called by the
+        * recovery process, which is also the only process which can change the
+        * flag.
         */
-       if (!track_commit_timestamp && !enable_during_recovery)
+       if (!commitTsShared->commitTsActive)
                return;
 
        /*
@@ -212,8 +218,8 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
        commitTsShared->dataLastCommit.nodeid = nodeid;
 
        /* and move forwards our endpoint, if needed */
-       if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact))
-               ShmemVariableCache->newestCommitTs = newestXact;
+       if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
+               ShmemVariableCache->newestCommitTsXid = newestXact;
        LWLockRelease(CommitTsLock);
 }
 
@@ -280,62 +286,59 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
        int                     entryno = TransactionIdToCTsEntry(xid);
        int                     slotno;
        CommitTimestampEntry entry;
-       TransactionId oldestCommitTs;
-       TransactionId newestCommitTs;
+       TransactionId oldestCommitTsXid;
+       TransactionId newestCommitTsXid;
 
-       /* Error if module not enabled */
-       if (!track_commit_timestamp)
-               ereport(ERROR,
-                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                errmsg("could not get commit timestamp data"),
-                         errhint("Make sure the configuration parameter \"%s\" is set.",
-                                         "track_commit_timestamp")));
-
-       /* error if the given Xid doesn't normally commit */
-       if (!TransactionIdIsNormal(xid))
+       if (!TransactionIdIsValid(xid))
                ereport(ERROR,
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
-
-       /*
-        * Return empty if the requested value is outside our valid range.
-        */
-       LWLockAcquire(CommitTsLock, LW_SHARED);
-       oldestCommitTs = ShmemVariableCache->oldestCommitTs;
-       newestCommitTs = ShmemVariableCache->newestCommitTs;
-       /* neither is invalid, or both are */
-       Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
-       LWLockRelease(CommitTsLock);
-
-       if (!TransactionIdIsValid(oldestCommitTs) ||
-               TransactionIdPrecedes(xid, oldestCommitTs) ||
-               TransactionIdPrecedes(newestCommitTs, xid))
+       else if (!TransactionIdIsNormal(xid))
        {
+               /* frozen and bootstrap xids are always committed far in the past */
                *ts = 0;
                if (nodeid)
-                       *nodeid = InvalidRepOriginId;
+                       *nodeid = 0;
                return false;
        }
 
+       LWLockAcquire(CommitTsLock, LW_SHARED);
+
+       /* Error if module not enabled */
+       if (!commitTsShared->commitTsActive)
+               error_commit_ts_disabled();
+
        /*
-        * Use an unlocked atomic read on our cached value in shared memory; if
-        * it's a hit, acquire a lock and read the data, after verifying that it's
-        * still what we initially read.  Otherwise, fall through to read from
-        * SLRU.
+        * If we're asked for the cached value, return that.  Otherwise, fall
+        * through to read from SLRU.
         */
        if (commitTsShared->xidLastCommit == xid)
        {
-               LWLockAcquire(CommitTsLock, LW_SHARED);
-               if (commitTsShared->xidLastCommit == xid)
-               {
-                       *ts = commitTsShared->dataLastCommit.time;
-                       if (nodeid)
-                               *nodeid = commitTsShared->dataLastCommit.nodeid;
+               *ts = commitTsShared->dataLastCommit.time;
+               if (nodeid)
+                       *nodeid = commitTsShared->dataLastCommit.nodeid;
 
-                       LWLockRelease(CommitTsLock);
-                       return *ts != 0;
-               }
                LWLockRelease(CommitTsLock);
+               return *ts != 0;
+       }
+
+       oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
+       newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
+       /* neither is invalid, or both are */
+       Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
+       LWLockRelease(CommitTsLock);
+
+       /*
+        * Return empty if the requested value is outside our valid range.
+        */
+       if (!TransactionIdIsValid(oldestCommitTsXid) ||
+               TransactionIdPrecedes(xid, oldestCommitTsXid) ||
+               TransactionIdPrecedes(newestCommitTsXid, xid))
+       {
+               *ts = 0;
+               if (nodeid)
+                       *nodeid = InvalidRepOriginId;
+               return false;
        }
 
        /* lock is acquired by SimpleLruReadPage_ReadOnly */
@@ -366,15 +369,12 @@ GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
 {
        TransactionId xid;
 
+       LWLockAcquire(CommitTsLock, LW_SHARED);
+
        /* Error if module not enabled */
-       if (!track_commit_timestamp)
-               ereport(ERROR,
-                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                errmsg("could not get commit timestamp data"),
-                         errhint("Make sure the configuration parameter \"%s\" is set.",
-                                         "track_commit_timestamp")));
+       if (!commitTsShared->commitTsActive)
+               error_commit_ts_disabled();
 
-       LWLockAcquire(CommitTsLock, LW_SHARED);
        xid = commitTsShared->xidLastCommit;
        if (ts)
                *ts = commitTsShared->dataLastCommit.time;
@@ -385,6 +385,19 @@ GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
        return xid;
 }
 
+static void
+error_commit_ts_disabled(void)
+{
+       ereport(ERROR,
+                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                        errmsg("could not get commit timestamp data"),
+                        RecoveryInProgress() ?
+                        errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
+                                        "track_commit_timestamp") :
+                        errhint("Make sure the configuration parameter \"%s\" is set.",
+                                        "track_commit_timestamp")));
+}
+
 /*
  * SQL-callable wrapper to obtain commit time of a transaction
  */
@@ -479,8 +492,9 @@ CommitTsShmemInit(void)
        bool            found;
 
        CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
-       SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
-                                 CommitTsControlLock, "pg_commit_ts");
+       SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
+                                 CommitTsControlLock, "pg_commit_ts",
+                                 LWTRANCHE_COMMITTS_BUFFERS);
 
        commitTsShared = ShmemInitStruct("CommitTs shared",
                                                                         sizeof(CommitTimestampShared),
@@ -493,6 +507,7 @@ CommitTsShmemInit(void)
                commitTsShared->xidLastCommit = InvalidTransactionId;
                TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
                commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+               commitTsShared->commitTsActive = false;
        }
        else
                Assert(found);
@@ -510,7 +525,7 @@ BootStrapCommitTs(void)
        /*
         * Nothing to do here at present, unlike most other SLRU modules; segments
         * are created when the server is started with this module enabled. See
-        * StartupCommitTs.
+        * ActivateCommitTs.
         */
 }
 
@@ -539,19 +554,11 @@ ZeroCommitTsPage(int pageno, bool writeXlog)
 /*
  * This must be called ONCE during postmaster or standalone-backend startup,
  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
- *
- * Caller may choose to enable the feature even when it is turned off in the
- * configuration.
  */
 void
-StartupCommitTs(bool force_enable)
+StartupCommitTs(void)
 {
-       /*
-        * If the module is not enabled, there's nothing to do here.  The module
-        * could still be activated from elsewhere.
-        */
-       if (track_commit_timestamp || force_enable)
-               ActivateCommitTs();
+       ActivateCommitTs();
 }
 
 /*
@@ -564,9 +571,17 @@ CompleteCommitTsInitialization(void)
        /*
         * If the feature is not enabled, turn it off for good.  This also removes
         * any leftover data.
+        *
+        * Conversely, we activate the module if the feature is enabled.  This is
+        * not necessary in a master system because we already did it earlier, but
+        * if we're in a standby server that got promoted which had the feature
+        * enabled and was following a master that had the feature disabled, this
+        * is where we turn it on locally.
         */
        if (!track_commit_timestamp)
-               DeactivateCommitTs(true);
+               DeactivateCommitTs();
+       else
+               ActivateCommitTs();
 }
 
 /*
@@ -583,20 +598,24 @@ CommitTsParameterChange(bool newvalue, bool oldvalue)
         * pg_control.  If the old value was already set, we already did this, so
         * don't do anything.
         *
-        * If the module is disabled in the master, disable it here too.
+        * If the module is disabled in the master, disable it here too, unless
+        * the module is enabled locally.
+        *
+        * Note this only runs in the recovery process, so an unlocked read is
+        * fine.
         */
        if (newvalue)
        {
-               if (!track_commit_timestamp && !oldvalue)
+               if (!commitTsShared->commitTsActive)
                        ActivateCommitTs();
        }
-       else if (oldvalue)
-               DeactivateCommitTs(false);
+       else if (commitTsShared->commitTsActive)
+               DeactivateCommitTs();
 }
 
 /*
  * Activate this module whenever necessary.
- *             This must happen during postmaster or standalong-backend startup,
+ *             This must happen during postmaster or standalone-backend startup,
  *             or during WAL replay anytime the track_commit_timestamp setting is
  *             changed in the master.
  *
@@ -613,8 +632,20 @@ CommitTsParameterChange(bool newvalue, bool oldvalue)
 static void
 ActivateCommitTs(void)
 {
-       TransactionId xid = ShmemVariableCache->nextXid;
-       int                     pageno = TransactionIdToCTsPage(xid);
+       TransactionId xid;
+       int                     pageno;
+
+       /* If we've done this already, there's nothing to do */
+       LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+       if (commitTsShared->commitTsActive)
+       {
+               LWLockRelease(CommitTsLock);
+               return;
+       }
+       LWLockRelease(CommitTsLock);
+
+       xid = ShmemVariableCache->nextXid;
+       pageno = TransactionIdToCTsPage(xid);
 
        /*
         * Re-Initialize our idea of the latest page number.
@@ -633,18 +664,18 @@ ActivateCommitTs(void)
         * enabled again?  It doesn't look like it does, because there should be a
         * checkpoint that sets the value to InvalidTransactionId at end of
         * recovery; and so any chance of injecting new transactions without
-        * CommitTs values would occur after the oldestCommitTs has been set to
+        * CommitTs values would occur after the oldestCommitTsXid has been set to
         * Invalid temporarily.
         */
        LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
-       if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
+       if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
        {
-               ShmemVariableCache->oldestCommitTs =
-                       ShmemVariableCache->newestCommitTs = ReadNewTransactionId();
+               ShmemVariableCache->oldestCommitTsXid =
+                       ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
        }
        LWLockRelease(CommitTsLock);
 
-       /* Finally, create the current segment file, if necessary */
+       /* Create the current segment file, if necessary */
        if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
        {
                int                     slotno;
@@ -656,8 +687,10 @@ ActivateCommitTs(void)
                LWLockRelease(CommitTsControlLock);
        }
 
-       /* We can now replay xlog records from this module */
-       enable_during_recovery = true;
+       /* Change the activation status in shared memory. */
+       LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+       commitTsShared->commitTsActive = true;
+       LWLockRelease(CommitTsLock);
 }
 
 /*
@@ -671,21 +704,25 @@ ActivateCommitTs(void)
  * possibly-invalid data; also removes segments of old data.
  */
 static void
-DeactivateCommitTs(bool do_wal)
+DeactivateCommitTs(void)
 {
-       TransactionId xid = ShmemVariableCache->nextXid;
-       int                     pageno = TransactionIdToCTsPage(xid);
-
        /*
-        * Re-Initialize our idea of the latest page number.
+        * Cleanup the status in the shared memory.
+        *
+        * We reset everything in the commitTsShared record to prevent user from
+        * getting confusing data about last committed transaction on the standby
+        * when the module was activated repeatedly on the primary.
         */
-       LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
-       CommitTsCtl->shared->latest_page_number = pageno;
-       LWLockRelease(CommitTsControlLock);
-
        LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
-       ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
-       ShmemVariableCache->newestCommitTs = InvalidTransactionId;
+
+       commitTsShared->commitTsActive = false;
+       commitTsShared->xidLastCommit = InvalidTransactionId;
+       TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
+       commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+
+       ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
+       ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
+
        LWLockRelease(CommitTsLock);
 
        /*
@@ -696,10 +733,9 @@ DeactivateCommitTs(bool do_wal)
         * be overwritten anyway when we wrap around, but it seems better to be
         * tidy.)
         */
+       LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
        (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
-
-       /* No longer enabled on recovery */
-       enable_during_recovery = false;
+       LWLockRelease(CommitTsControlLock);
 }
 
 /*
@@ -710,6 +746,12 @@ ShutdownCommitTs(void)
 {
        /* Flush dirty CommitTs pages to disk */
        SimpleLruFlush(CommitTsCtl, false);
+
+       /*
+        * fsync pg_commit_ts to ensure that any files flushed previously are
+        * durably on disk.
+        */
+       fsync_fname("pg_commit_ts", true);
 }
 
 /*
@@ -720,6 +762,12 @@ CheckPointCommitTs(void)
 {
        /* Flush dirty CommitTs pages to disk */
        SimpleLruFlush(CommitTsCtl, true);
+
+       /*
+        * fsync pg_commit_ts to ensure that any files flushed previously are
+        * durably on disk.
+        */
+       fsync_fname("pg_commit_ts", true);
 }
 
 /*
@@ -738,8 +786,13 @@ ExtendCommitTs(TransactionId newestXact)
 {
        int                     pageno;
 
-       /* nothing to do if module not enabled */
-       if (!track_commit_timestamp && !enable_during_recovery)
+       /*
+        * Nothing to do if module not enabled.  Note we do an unlocked read of
+        * the flag here, which is okay because this routine is only called from
+        * GetNewTransactionId, which is never called in a standby.
+        */
+       Assert(!InRecovery);
+       if (!commitTsShared->commitTsActive)
                return;
 
        /*
@@ -767,7 +820,7 @@ ExtendCommitTs(TransactionId newestXact)
  * Note that we don't need to flush XLOG here.
  */
 void
-TruncateCommitTs(TransactionId oldestXact, bool do_wal)
+TruncateCommitTs(TransactionId oldestXact)
 {
        int                     cutoffPage;
 
@@ -783,8 +836,7 @@ TruncateCommitTs(TransactionId oldestXact, bool do_wal)
                return;                                 /* nothing to remove */
 
        /* Write XLOG record */
-       if (do_wal)
-               WriteTruncateXlogRec(cutoffPage);
+       WriteTruncateXlogRec(cutoffPage, oldestXact);
 
        /* Now we can remove the old CommitTs segment(s) */
        SimpleLruTruncate(CommitTsCtl, cutoffPage);
@@ -801,16 +853,18 @@ SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
         * "future" or signal a disabled committs.
         */
        LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
-       if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId)
+       if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
        {
-               if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
-                       ShmemVariableCache->oldestCommitTs = oldestXact;
-               if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs))
-                       ShmemVariableCache->newestCommitTs = newestXact;
+               if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
+                       ShmemVariableCache->oldestCommitTsXid = oldestXact;
+               if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
+                       ShmemVariableCache->newestCommitTsXid = newestXact;
        }
        else
        {
-               Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId);
+               Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
+               ShmemVariableCache->oldestCommitTsXid = oldestXact;
+               ShmemVariableCache->newestCommitTsXid = newestXact;
        }
        LWLockRelease(CommitTsLock);
 }
@@ -819,12 +873,12 @@ SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
  * Move forwards the oldest commitTS value that can be consulted
  */
 void
-AdvanceOldestCommitTs(TransactionId oldestXact)
+AdvanceOldestCommitTsXid(TransactionId oldestXact)
 {
        LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
-       if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId &&
-               TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
-               ShmemVariableCache->oldestCommitTs = oldestXact;
+       if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
+       TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
+               ShmemVariableCache->oldestCommitTsXid = oldestXact;
        LWLockRelease(CommitTsLock);
 }
 
@@ -868,10 +922,15 @@ WriteZeroPageXlogRec(int pageno)
  * Write a TRUNCATE xlog record
  */
 static void
-WriteTruncateXlogRec(int pageno)
+WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
 {
+       xl_commit_ts_truncate xlrec;
+
+       xlrec.pageno = pageno;
+       xlrec.oldestXid = oldestXid;
+
        XLogBeginInsert();
-       XLogRegisterData((char *) (&pageno), sizeof(int));
+       XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
        (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
 }
 
@@ -925,17 +984,17 @@ commit_ts_redo(XLogReaderState *record)
        }
        else if (info == COMMIT_TS_TRUNCATE)
        {
-               int                     pageno;
+               xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
 
-               memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+               AdvanceOldestCommitTsXid(trunc->oldestXid);
 
                /*
                 * During XLOG replay, latest_page_number isn't set up yet; insert a
                 * suitable value to bypass the sanity test in SimpleLruTruncate.
                 */
-               CommitTsCtl->shared->latest_page_number = pageno;
+               CommitTsCtl->shared->latest_page_number = trunc->pageno;
 
-               SimpleLruTruncate(CommitTsCtl, pageno);
+               SimpleLruTruncate(CommitTsCtl, trunc->pageno);
        }
        else if (info == COMMIT_TS_SETTS)
        {