#define CommitTsCtl (&CommitTsCtlData)
/*
- * We keep a cache of the last value set in shared memory. This is protected
- * by CommitTsLock.
+ * We keep a cache of the last value set in shared memory.
+ *
+ * This is also good place to keep the activation status. We keep this
+ * separate from the GUC so that the standby can activate the module if the
+ * primary has it active independently of the value of the GUC.
+ *
+ * This is protected by CommitTsLock. In some places, we use commitTsActive
+ * without acquiring the lock; where this happens, a comment explains the
+ * rationale for it.
*/
typedef struct CommitTimestampShared
{
TransactionId xidLastCommit;
CommitTimestampEntry dataLastCommit;
+ bool commitTsActive;
} CommitTimestampShared;
CommitTimestampShared *commitTsShared;
/* GUC variable */
bool track_commit_timestamp;
-/*
- * When this is set, commit_ts is force-enabled during recovery. This is so
- * that a standby can replay WAL records coming from a master with the setting
- * enabled. (Note that this doesn't enable SQL access to the data; it's
- * effectively write-only until the GUC itself is enabled.)
- */
-static bool enable_during_recovery;
-
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
RepOriginId nodeid, int pageno);
static int ZeroCommitTsPage(int pageno, bool writeXlog);
static bool CommitTsPagePrecedes(int page1, int page2);
static void ActivateCommitTs(void);
-static void DeactivateCommitTs(bool do_wal);
+static void DeactivateCommitTs(void);
static void WriteZeroPageXlogRec(int pageno);
static void WriteTruncateXlogRec(int pageno);
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
TransactionId newestXact;
/*
- * No-op if the module is not enabled, but allow writes in a standby
- * during recovery.
+ * No-op if the module is not active.
+ *
+ * An unlocked read here is fine, because in a standby (the only place
+ * where the flag can change in flight) this routine is only called by
+ * the recovery process, which is also the only process which can change
+ * the flag.
*/
- if (!track_commit_timestamp && !enable_during_recovery)
+ if (!commitTsShared->commitTsActive)
return;
/*
TransactionId oldestCommitTs;
TransactionId newestCommitTs;
+ /* error if the given Xid doesn't normally commit */
+ if (!TransactionIdIsNormal(xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
+
+ LWLockAcquire(CommitTsLock, LW_SHARED);
+
/* Error if module not enabled */
- if (!track_commit_timestamp)
+ if (!commitTsShared->commitTsActive)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not get commit timestamp data"),
errhint("Make sure the configuration parameter \"%s\" is set.",
"track_commit_timestamp")));
- /* error if the given Xid doesn't normally commit */
- if (!TransactionIdIsNormal(xid))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
-
/*
- * Return empty if the requested value is outside our valid range.
+ * If we're asked for the cached value, return that. Otherwise, fall
+ * through to read from SLRU.
*/
- LWLockAcquire(CommitTsLock, LW_SHARED);
+ if (commitTsShared->xidLastCommit == xid)
+ {
+ *ts = commitTsShared->dataLastCommit.time;
+ if (nodeid)
+ *nodeid = commitTsShared->dataLastCommit.nodeid;
+
+ LWLockRelease(CommitTsLock);
+ return *ts != 0;
+ }
+
oldestCommitTs = ShmemVariableCache->oldestCommitTs;
newestCommitTs = ShmemVariableCache->newestCommitTs;
/* neither is invalid, or both are */
Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
LWLockRelease(CommitTsLock);
+ /*
+ * Return empty if the requested value is outside our valid range.
+ */
if (!TransactionIdIsValid(oldestCommitTs) ||
TransactionIdPrecedes(xid, oldestCommitTs) ||
TransactionIdPrecedes(newestCommitTs, xid))
return false;
}
- /*
- * Use an unlocked atomic read on our cached value in shared memory; if
- * it's a hit, acquire a lock and read the data, after verifying that it's
- * still what we initially read. Otherwise, fall through to read from
- * SLRU.
- */
- if (commitTsShared->xidLastCommit == xid)
- {
- LWLockAcquire(CommitTsLock, LW_SHARED);
- if (commitTsShared->xidLastCommit == xid)
- {
- *ts = commitTsShared->dataLastCommit.time;
- if (nodeid)
- *nodeid = commitTsShared->dataLastCommit.nodeid;
-
- LWLockRelease(CommitTsLock);
- return *ts != 0;
- }
- LWLockRelease(CommitTsLock);
- }
-
/* lock is acquired by SimpleLruReadPage_ReadOnly */
slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
memcpy(&entry,
{
TransactionId xid;
+ LWLockAcquire(CommitTsLock, LW_SHARED);
+
/* Error if module not enabled */
- if (!track_commit_timestamp)
+ if (!commitTsShared->commitTsActive)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not get commit timestamp data"),
errhint("Make sure the configuration parameter \"%s\" is set.",
"track_commit_timestamp")));
- LWLockAcquire(CommitTsLock, LW_SHARED);
xid = commitTsShared->xidLastCommit;
if (ts)
*ts = commitTsShared->dataLastCommit.time;
commitTsShared->xidLastCommit = InvalidTransactionId;
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+ commitTsShared->commitTsActive = false;
}
else
Assert(found);
* any leftover data.
*/
if (!track_commit_timestamp)
- DeactivateCommitTs(true);
+ DeactivateCommitTs();
}
/*
*/
if (newvalue)
{
- if (!track_commit_timestamp && !oldvalue)
+ if (!commitTsShared->commitTsActive)
ActivateCommitTs();
}
- else if (!track_commit_timestamp && oldvalue)
- DeactivateCommitTs(false);
+ else if (commitTsShared->commitTsActive)
+ DeactivateCommitTs();
}
/*
}
LWLockRelease(CommitTsLock);
- /* Finally, create the current segment file, if necessary */
+ /* Create the current segment file, if necessary */
if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
{
int slotno;
LWLockRelease(CommitTsControlLock);
}
- /* We can now replay xlog records from this module */
- enable_during_recovery = true;
+ /* Change the activation status in shared memory. */
+ LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+ commitTsShared->commitTsActive = true;
+ LWLockRelease(CommitTsLock);
}
/*
* possibly-invalid data; also removes segments of old data.
*/
static void
-DeactivateCommitTs(bool do_wal)
+DeactivateCommitTs(void)
{
- TransactionId xid = ShmemVariableCache->nextXid;
- int pageno = TransactionIdToCTsPage(xid);
-
/*
- * Re-Initialize our idea of the latest page number.
+ * Cleanup the status in the shared memory.
+ *
+ * We reset everything in the commitTsShared record to prevent user from
+ * getting confusing data about last committed transaction on the standby
+ * when the module was activated repeatedly on the primary.
*/
- LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
- CommitTsCtl->shared->latest_page_number = pageno;
- LWLockRelease(CommitTsControlLock);
-
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+
+ commitTsShared->commitTsActive = false;
+ commitTsShared->xidLastCommit = InvalidTransactionId;
+ TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
+ commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+
ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
ShmemVariableCache->newestCommitTs = InvalidTransactionId;
+
LWLockRelease(CommitTsLock);
/*
* be overwritten anyway when we wrap around, but it seems better to be
* tidy.)
*/
+ LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
(void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
-
- /* No longer enabled on recovery */
- enable_during_recovery = false;
+ LWLockRelease(CommitTsControlLock);
}
/*
{
int pageno;
- /* nothing to do if module not enabled */
- if (!track_commit_timestamp && !enable_during_recovery)
+ /*
+ * Nothing to do if module not enabled. Note we do an unlocked read of the
+ * flag here, which is okay because this routine is only called from
+ * GetNewTransactionId, which is never called in a standby.
+ */
+ Assert(!InRecovery);
+ if (!commitTsShared->commitTsActive)
return;
/*
* Note that we don't need to flush XLOG here.
*/
void
-TruncateCommitTs(TransactionId oldestXact, bool do_wal)
+TruncateCommitTs(TransactionId oldestXact)
{
int cutoffPage;
return; /* nothing to remove */
/* Write XLOG record */
- if (do_wal)
- WriteTruncateXlogRec(cutoffPage);
+ WriteTruncateXlogRec(cutoffPage);
/* Now we can remove the old CommitTs segment(s) */
SimpleLruTruncate(CommitTsCtl, cutoffPage);