* commit_ts.c
* PostgreSQL commit timestamp manager
*
- * This module is a pg_clog-like system that stores the commit timestamp
+ * This module is a pg_xact-like system that stores the commit timestamp
* for each transaction.
*
* XLOG interactions: this module generates an XLOG record whenever a new
* re-perform the status update on redo; so we need make no additional XLOG
* entry here.
*
- * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/backend/access/transam/commit_ts.c
#include "funcapi.h"
#include "miscadmin.h"
#include "pg_trace.h"
+#include "storage/shmem.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
#define CommitTsCtl (&CommitTsCtlData)
/*
- * We keep a cache of the last value set in shared memory. This is protected
- * by CommitTsLock.
+ * We keep a cache of the last value set in shared memory.
+ *
+ * This is also good place to keep the activation status. We keep this
+ * separate from the GUC so that the standby can activate the module if the
+ * primary has it active independently of the value of the GUC.
+ *
+ * This is protected by CommitTsLock. In some places, we use commitTsActive
+ * without acquiring the lock; where this happens, a comment explains the
+ * rationale for it.
*/
typedef struct CommitTimestampShared
{
TransactionId xidLastCommit;
CommitTimestampEntry dataLastCommit;
+ bool commitTsActive;
} CommitTimestampShared;
CommitTimestampShared *commitTsShared;
/* GUC variable */
bool track_commit_timestamp;
-/*
- * When this is set, commit_ts is force-enabled during recovery. This is so
- * that a standby can replay WAL records coming from a master with the setting
- * enabled. (Note that this doesn't enable SQL access to the data; it's
- * effectively write-only until the GUC itself is enabled.)
- */
-static bool enable_during_recovery;
-
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
RepOriginId nodeid, int pageno);
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
RepOriginId nodeid, int slotno);
+static void error_commit_ts_disabled(void);
static int ZeroCommitTsPage(int pageno, bool writeXlog);
static bool CommitTsPagePrecedes(int page1, int page2);
static void ActivateCommitTs(void);
-static void DeactivateCommitTs(bool do_wal);
+static void DeactivateCommitTs(void);
static void WriteZeroPageXlogRec(int pageno);
-static void WriteTruncateXlogRec(int pageno);
+static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
RepOriginId nodeid);
TransactionId newestXact;
/*
- * No-op if the module is not enabled, but allow writes in a standby
- * during recovery.
+ * No-op if the module is not active.
+ *
+ * An unlocked read here is fine, because in a standby (the only place
+ * where the flag can change in flight) this routine is only called by the
+ * recovery process, which is also the only process which can change the
+ * flag.
*/
- if (!track_commit_timestamp && !enable_during_recovery)
+ if (!commitTsShared->commitTsActive)
return;
/*
commitTsShared->dataLastCommit.nodeid = nodeid;
/* and move forwards our endpoint, if needed */
- if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact))
- ShmemVariableCache->newestCommitTs = newestXact;
+ if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
+ ShmemVariableCache->newestCommitTsXid = newestXact;
LWLockRelease(CommitTsLock);
}
int entryno = TransactionIdToCTsEntry(xid);
int slotno;
CommitTimestampEntry entry;
- TransactionId oldestCommitTs;
- TransactionId newestCommitTs;
+ TransactionId oldestCommitTsXid;
+ TransactionId newestCommitTsXid;
- /* Error if module not enabled */
- if (!track_commit_timestamp)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("could not get commit timestamp data"),
- errhint("Make sure the configuration parameter \"%s\" is set.",
- "track_commit_timestamp")));
-
- /* error if the given Xid doesn't normally commit */
- if (!TransactionIdIsNormal(xid))
+ if (!TransactionIdIsValid(xid))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
-
- /*
- * Return empty if the requested value is outside our valid range.
- */
- LWLockAcquire(CommitTsLock, LW_SHARED);
- oldestCommitTs = ShmemVariableCache->oldestCommitTs;
- newestCommitTs = ShmemVariableCache->newestCommitTs;
- /* neither is invalid, or both are */
- Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
- LWLockRelease(CommitTsLock);
-
- if (!TransactionIdIsValid(oldestCommitTs) ||
- TransactionIdPrecedes(xid, oldestCommitTs) ||
- TransactionIdPrecedes(newestCommitTs, xid))
+ else if (!TransactionIdIsNormal(xid))
{
+ /* frozen and bootstrap xids are always committed far in the past */
*ts = 0;
if (nodeid)
- *nodeid = InvalidRepOriginId;
+ *nodeid = 0;
return false;
}
+ LWLockAcquire(CommitTsLock, LW_SHARED);
+
+ /* Error if module not enabled */
+ if (!commitTsShared->commitTsActive)
+ error_commit_ts_disabled();
+
/*
- * Use an unlocked atomic read on our cached value in shared memory; if
- * it's a hit, acquire a lock and read the data, after verifying that it's
- * still what we initially read. Otherwise, fall through to read from
- * SLRU.
+ * If we're asked for the cached value, return that. Otherwise, fall
+ * through to read from SLRU.
*/
if (commitTsShared->xidLastCommit == xid)
{
- LWLockAcquire(CommitTsLock, LW_SHARED);
- if (commitTsShared->xidLastCommit == xid)
- {
- *ts = commitTsShared->dataLastCommit.time;
- if (nodeid)
- *nodeid = commitTsShared->dataLastCommit.nodeid;
+ *ts = commitTsShared->dataLastCommit.time;
+ if (nodeid)
+ *nodeid = commitTsShared->dataLastCommit.nodeid;
- LWLockRelease(CommitTsLock);
- return *ts != 0;
- }
LWLockRelease(CommitTsLock);
+ return *ts != 0;
+ }
+
+ oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
+ newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
+ /* neither is invalid, or both are */
+ Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
+ LWLockRelease(CommitTsLock);
+
+ /*
+ * Return empty if the requested value is outside our valid range.
+ */
+ if (!TransactionIdIsValid(oldestCommitTsXid) ||
+ TransactionIdPrecedes(xid, oldestCommitTsXid) ||
+ TransactionIdPrecedes(newestCommitTsXid, xid))
+ {
+ *ts = 0;
+ if (nodeid)
+ *nodeid = InvalidRepOriginId;
+ return false;
}
/* lock is acquired by SimpleLruReadPage_ReadOnly */
{
TransactionId xid;
+ LWLockAcquire(CommitTsLock, LW_SHARED);
+
/* Error if module not enabled */
- if (!track_commit_timestamp)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("could not get commit timestamp data"),
- errhint("Make sure the configuration parameter \"%s\" is set.",
- "track_commit_timestamp")));
+ if (!commitTsShared->commitTsActive)
+ error_commit_ts_disabled();
- LWLockAcquire(CommitTsLock, LW_SHARED);
xid = commitTsShared->xidLastCommit;
if (ts)
*ts = commitTsShared->dataLastCommit.time;
return xid;
}
+static void
+error_commit_ts_disabled(void)
+{
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not get commit timestamp data"),
+ RecoveryInProgress() ?
+ errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
+ "track_commit_timestamp") :
+ errhint("Make sure the configuration parameter \"%s\" is set.",
+ "track_commit_timestamp")));
+}
+
/*
* SQL-callable wrapper to obtain commit time of a transaction
*/
bool found;
CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
- SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
- CommitTsControlLock, "pg_commit_ts");
+ SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
+ CommitTsControlLock, "pg_commit_ts",
+ LWTRANCHE_COMMITTS_BUFFERS);
commitTsShared = ShmemInitStruct("CommitTs shared",
sizeof(CommitTimestampShared),
commitTsShared->xidLastCommit = InvalidTransactionId;
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+ commitTsShared->commitTsActive = false;
}
else
Assert(found);
/*
* Nothing to do here at present, unlike most other SLRU modules; segments
* are created when the server is started with this module enabled. See
- * StartupCommitTs.
+ * ActivateCommitTs.
*/
}
/*
* This must be called ONCE during postmaster or standalone-backend startup,
* after StartupXLOG has initialized ShmemVariableCache->nextXid.
- *
- * Caller may choose to enable the feature even when it is turned off in the
- * configuration.
*/
void
-StartupCommitTs(bool force_enable)
+StartupCommitTs(void)
{
- /*
- * If the module is not enabled, there's nothing to do here. The module
- * could still be activated from elsewhere.
- */
- if (track_commit_timestamp || force_enable)
- ActivateCommitTs();
+ ActivateCommitTs();
}
/*
/*
* If the feature is not enabled, turn it off for good. This also removes
* any leftover data.
+ *
+ * Conversely, we activate the module if the feature is enabled. This is
+ * not necessary in a master system because we already did it earlier, but
+ * if we're in a standby server that got promoted which had the feature
+ * enabled and was following a master that had the feature disabled, this
+ * is where we turn it on locally.
*/
if (!track_commit_timestamp)
- DeactivateCommitTs(true);
+ DeactivateCommitTs();
+ else
+ ActivateCommitTs();
}
/*
* pg_control. If the old value was already set, we already did this, so
* don't do anything.
*
- * If the module is disabled in the master, disable it here too.
+ * If the module is disabled in the master, disable it here too, unless
+ * the module is enabled locally.
+ *
+ * Note this only runs in the recovery process, so an unlocked read is
+ * fine.
*/
if (newvalue)
{
- if (!track_commit_timestamp && !oldvalue)
+ if (!commitTsShared->commitTsActive)
ActivateCommitTs();
}
- else if (oldvalue)
- DeactivateCommitTs(false);
+ else if (commitTsShared->commitTsActive)
+ DeactivateCommitTs();
}
/*
* Activate this module whenever necessary.
- * This must happen during postmaster or standalong-backend startup,
+ * This must happen during postmaster or standalone-backend startup,
* or during WAL replay anytime the track_commit_timestamp setting is
* changed in the master.
*
static void
ActivateCommitTs(void)
{
- TransactionId xid = ShmemVariableCache->nextXid;
- int pageno = TransactionIdToCTsPage(xid);
+ TransactionId xid;
+ int pageno;
+
+ /* If we've done this already, there's nothing to do */
+ LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+ if (commitTsShared->commitTsActive)
+ {
+ LWLockRelease(CommitTsLock);
+ return;
+ }
+ LWLockRelease(CommitTsLock);
+
+ xid = ShmemVariableCache->nextXid;
+ pageno = TransactionIdToCTsPage(xid);
/*
* Re-Initialize our idea of the latest page number.
* enabled again? It doesn't look like it does, because there should be a
* checkpoint that sets the value to InvalidTransactionId at end of
* recovery; and so any chance of injecting new transactions without
- * CommitTs values would occur after the oldestCommitTs has been set to
+ * CommitTs values would occur after the oldestCommitTsXid has been set to
* Invalid temporarily.
*/
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
- if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
+ if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
{
- ShmemVariableCache->oldestCommitTs =
- ShmemVariableCache->newestCommitTs = ReadNewTransactionId();
+ ShmemVariableCache->oldestCommitTsXid =
+ ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
}
LWLockRelease(CommitTsLock);
- /* Finally, create the current segment file, if necessary */
+ /* Create the current segment file, if necessary */
if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
{
int slotno;
LWLockRelease(CommitTsControlLock);
}
- /* We can now replay xlog records from this module */
- enable_during_recovery = true;
+ /* Change the activation status in shared memory. */
+ LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+ commitTsShared->commitTsActive = true;
+ LWLockRelease(CommitTsLock);
}
/*
* possibly-invalid data; also removes segments of old data.
*/
static void
-DeactivateCommitTs(bool do_wal)
+DeactivateCommitTs(void)
{
- TransactionId xid = ShmemVariableCache->nextXid;
- int pageno = TransactionIdToCTsPage(xid);
-
/*
- * Re-Initialize our idea of the latest page number.
+ * Cleanup the status in the shared memory.
+ *
+ * We reset everything in the commitTsShared record to prevent user from
+ * getting confusing data about last committed transaction on the standby
+ * when the module was activated repeatedly on the primary.
*/
- LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
- CommitTsCtl->shared->latest_page_number = pageno;
- LWLockRelease(CommitTsControlLock);
-
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
- ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
- ShmemVariableCache->newestCommitTs = InvalidTransactionId;
+
+ commitTsShared->commitTsActive = false;
+ commitTsShared->xidLastCommit = InvalidTransactionId;
+ TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
+ commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+
+ ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
+ ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
+
LWLockRelease(CommitTsLock);
/*
* be overwritten anyway when we wrap around, but it seems better to be
* tidy.)
*/
+ LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
(void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
-
- /* No longer enabled on recovery */
- enable_during_recovery = false;
+ LWLockRelease(CommitTsControlLock);
}
/*
{
/* Flush dirty CommitTs pages to disk */
SimpleLruFlush(CommitTsCtl, false);
+
+ /*
+ * fsync pg_commit_ts to ensure that any files flushed previously are
+ * durably on disk.
+ */
+ fsync_fname("pg_commit_ts", true);
}
/*
{
/* Flush dirty CommitTs pages to disk */
SimpleLruFlush(CommitTsCtl, true);
+
+ /*
+ * fsync pg_commit_ts to ensure that any files flushed previously are
+ * durably on disk.
+ */
+ fsync_fname("pg_commit_ts", true);
}
/*
{
int pageno;
- /* nothing to do if module not enabled */
- if (!track_commit_timestamp && !enable_during_recovery)
+ /*
+ * Nothing to do if module not enabled. Note we do an unlocked read of
+ * the flag here, which is okay because this routine is only called from
+ * GetNewTransactionId, which is never called in a standby.
+ */
+ Assert(!InRecovery);
+ if (!commitTsShared->commitTsActive)
return;
/*
* Note that we don't need to flush XLOG here.
*/
void
-TruncateCommitTs(TransactionId oldestXact, bool do_wal)
+TruncateCommitTs(TransactionId oldestXact)
{
int cutoffPage;
return; /* nothing to remove */
/* Write XLOG record */
- if (do_wal)
- WriteTruncateXlogRec(cutoffPage);
+ WriteTruncateXlogRec(cutoffPage, oldestXact);
/* Now we can remove the old CommitTs segment(s) */
SimpleLruTruncate(CommitTsCtl, cutoffPage);
* "future" or signal a disabled committs.
*/
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
- if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId)
+ if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
{
- if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
- ShmemVariableCache->oldestCommitTs = oldestXact;
- if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs))
- ShmemVariableCache->newestCommitTs = newestXact;
+ if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
+ ShmemVariableCache->oldestCommitTsXid = oldestXact;
+ if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
+ ShmemVariableCache->newestCommitTsXid = newestXact;
}
else
{
- Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId);
+ Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
+ ShmemVariableCache->oldestCommitTsXid = oldestXact;
+ ShmemVariableCache->newestCommitTsXid = newestXact;
}
LWLockRelease(CommitTsLock);
}
* Move forwards the oldest commitTS value that can be consulted
*/
void
-AdvanceOldestCommitTs(TransactionId oldestXact)
+AdvanceOldestCommitTsXid(TransactionId oldestXact)
{
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
- if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId &&
- TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
- ShmemVariableCache->oldestCommitTs = oldestXact;
+ if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
+ TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
+ ShmemVariableCache->oldestCommitTsXid = oldestXact;
LWLockRelease(CommitTsLock);
}
* Write a TRUNCATE xlog record
*/
static void
-WriteTruncateXlogRec(int pageno)
+WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
{
+ xl_commit_ts_truncate xlrec;
+
+ xlrec.pageno = pageno;
+ xlrec.oldestXid = oldestXid;
+
XLogBeginInsert();
- XLogRegisterData((char *) (&pageno), sizeof(int));
+ XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
}
}
else if (info == COMMIT_TS_TRUNCATE)
{
- int pageno;
+ xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
- memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+ AdvanceOldestCommitTsXid(trunc->oldestXid);
/*
* During XLOG replay, latest_page_number isn't set up yet; insert a
* suitable value to bypass the sanity test in SimpleLruTruncate.
*/
- CommitTsCtl->shared->latest_page_number = pageno;
+ CommitTsCtl->shared->latest_page_number = trunc->pageno;
- SimpleLruTruncate(CommitTsCtl, pageno);
+ SimpleLruTruncate(CommitTsCtl, trunc->pageno);
}
else if (info == COMMIT_TS_SETTS)
{