1 /*-------------------------------------------------------------------------
4 * PostgreSQL commit timestamp manager
6 * This module is a pg_clog-like system that stores the commit timestamp
7 * for each transaction.
9 * XLOG interactions: this module generates an XLOG record whenever a new
10 * CommitTs page is initialized to zeroes. Also, one XLOG record is
11 * generated for setting of values when the caller requests it; this allows
12 * us to support values coming from places other than transaction commit.
13 * Other writes of CommitTS come from recording of transaction commit in
14 * xact.c, which generates its own XLOG records for these events and will
15 * re-perform the status update on redo; so we need make no additional XLOG
18 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
19 * Portions Copyright (c) 1994, Regents of the University of California
21 * src/backend/access/transam/commit_ts.c
23 *-------------------------------------------------------------------------
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
33 #include "miscadmin.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
40 * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
41 * everywhere else in Postgres.
43 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44 * CommitTs page numbering also wraps around at
45 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
47 * explicit notice of that fact in this module, except when comparing segment
48 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
52 * We need 8+2 bytes per xact. Note that enlarging this struct might mean
53 * the largest possible file name is more than 5 chars long; see
56 typedef struct CommitTimestampEntry
60 } CommitTimestampEntry;
62 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
65 #define COMMIT_TS_XACTS_PER_PAGE \
66 (BLCKSZ / SizeOfCommitTimestampEntry)
68 #define TransactionIdToCTsPage(xid) \
69 ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
70 #define TransactionIdToCTsEntry(xid) \
71 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
74 * Link to shared-memory data structures for CommitTs control
76 static SlruCtlData CommitTsCtlData;
78 #define CommitTsCtl (&CommitTsCtlData)
81 * We keep a cache of the last value set in shared memory.
83 * This is also a good place to keep the activation status. We keep this
84 * separate from the GUC so that the standby can activate the module if the
85 * primary has it active independently of the value of the GUC.
87 * This is protected by CommitTsLock. In some places, we use commitTsActive
88 * without acquiring the lock; where this happens, a comment explains the
91 typedef struct CommitTimestampShared
93 TransactionId xidLastCommit;
94 CommitTimestampEntry dataLastCommit;
96 } CommitTimestampShared;
98 CommitTimestampShared *commitTsShared;
102 bool track_commit_timestamp;
104 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
105 TransactionId *subxids, TimestampTz ts,
106 RepOriginId nodeid, int pageno);
107 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
108 RepOriginId nodeid, int slotno);
109 static void error_commit_ts_disabled(void);
110 static int ZeroCommitTsPage(int pageno, bool writeXlog);
111 static bool CommitTsPagePrecedes(int page1, int page2);
112 static void ActivateCommitTs(void);
113 static void DeactivateCommitTs(void);
114 static void WriteZeroPageXlogRec(int pageno);
115 static void WriteTruncateXlogRec(int pageno);
116 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
117 TransactionId *subxids, TimestampTz timestamp,
121 * TransactionTreeSetCommitTsData
123 * Record the final commit timestamp of transaction entries in the commit log
124 * for a transaction and its subtransaction tree, as efficiently as possible.
126 * xid is the top level transaction id.
128 * subxids is an array of xids of length nsubxids, representing subtransactions
129 * in the tree of xid. In various cases nsubxids may be zero.
130 * The reason why tracking just the parent xid commit timestamp is not enough
131 * is that the subtrans SLRU does not stay valid across crashes (it's not
132 * permanent) so we need to keep the information about them here. If the
133 * subtrans implementation changes in the future, we might want to revisit the
134 * decision of storing timestamp info for each subxid.
136 * The write_xlog parameter tells us whether to include an XLog record of this
137 * or not. Normally, this is called from transaction commit routines (both
138 * normal and prepared) and the information will be stored in the transaction
139 * commit XLog record, and so they should pass "false" for this. The XLog redo
140 * code should use "false" here as well (NOTE(review): the COMMIT_TS_SETTS
 * branch of commit_ts_redo passes true -- confirm). Other callers probably want to pass
141 * true, so that the given values persist in case of crashes.
144 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
145 TransactionId *subxids, TimestampTz timestamp,
146 RepOriginId nodeid, bool write_xlog)
149 TransactionId headxid;
150 TransactionId newestXact;
153 * No-op if the module is not active.
155 * An unlocked read here is fine, because in a standby (the only place
156 * where the flag can change in flight) this routine is only called by the
157 * recovery process, which is also the only process which can change the
160 if (!commitTsShared->commitTsActive)
164 * Comply with the WAL-before-data rule: if caller specified it wants this
165 * value to be recorded in WAL, do so before touching the data.
168 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
171 * Figure out the latest Xid in this batch: either the last subxid if
172 * there's any, otherwise the parent xid.
175 newestXact = subxids[nsubxids - 1];
180 * We split the xids to set the timestamp to in groups belonging to the
181 * same SLRU page; the first element in each such set is its head. The
182 * first group has the main XID as the head; subsequent sets use the first
183 * subxid not on the previous page as head. This way, we only have to
184 * lock/modify each SLRU page once.
186 for (i = 0, headxid = xid;;)
188 int pageno = TransactionIdToCTsPage(headxid);
191 for (j = i; j < nsubxids; j++)
193 if (TransactionIdToCTsPage(subxids[j]) != pageno)
196 /* subxids[i..j-1] are on the same page as the head */
198 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
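201 /* if we wrote out all subxids, we're done. */
/* NOTE(review): if the scan stopped at the last subxid (j == nsubxids - 1), subxids[j] is still unwritten when the test below exits; "j >= nsubxids" may be intended -- confirm */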
202 if (j + 1 >= nsubxids)
206 * Set the new head and skip over it, as well as over the subxids we
209 headxid = subxids[j];
213 /* update the cached value in shared memory */
214 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
215 commitTsShared->xidLastCommit = xid;
216 commitTsShared->dataLastCommit.time = timestamp;
217 commitTsShared->dataLastCommit.nodeid = nodeid;
219 /* and move forwards our endpoint, if needed */
220 if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
221 ShmemVariableCache->newestCommitTsXid = newestXact;
222 LWLockRelease(CommitTsLock);
226 * Record the commit timestamp of transaction entries in the commit log for all
227 * entries on a single page. Atomic only on this page.
230 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
231 TransactionId *subxids, TimestampTz ts,
232 RepOriginId nodeid, int pageno)
237 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
239 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
241 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
242 for (i = 0; i < nsubxids; i++)
243 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245 CommitTsCtl->shared->page_dirty[slotno] = true;
247 LWLockRelease(CommitTsControlLock);
251 * Sets the commit timestamp of a single transaction.
253 * Must be called with CommitTsControlLock held
256 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
257 RepOriginId nodeid, int slotno)
259 int entryno = TransactionIdToCTsEntry(xid);
260 CommitTimestampEntry entry;
262 Assert(TransactionIdIsNormal(xid));
265 entry.nodeid = nodeid;
267 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
268 SizeOfCommitTimestampEntry * entryno,
269 &entry, SizeOfCommitTimestampEntry);
273 * Interrogate the commit timestamp of a transaction.
275 * The return value indicates whether a commit timestamp record was found for
276 * the given xid. The timestamp value is returned in *ts (which may not be
277 * null), and the origin node for the Xid is returned in *nodeid, if it's not
281 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
284 int pageno = TransactionIdToCTsPage(xid);
285 int entryno = TransactionIdToCTsEntry(xid);
287 CommitTimestampEntry entry;
288 TransactionId oldestCommitTsXid;
289 TransactionId newestCommitTsXid;
291 /* error if the given Xid doesn't normally commit */
292 if (!TransactionIdIsNormal(xid))
294 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
297 LWLockAcquire(CommitTsLock, LW_SHARED);
299 /* Error if module not enabled */
300 if (!commitTsShared->commitTsActive)
301 error_commit_ts_disabled();
304 * If we're asked for the cached value, return that. Otherwise, fall
305 * through to read from SLRU.
307 if (commitTsShared->xidLastCommit == xid)
309 *ts = commitTsShared->dataLastCommit.time;
311 *nodeid = commitTsShared->dataLastCommit.nodeid;
313 LWLockRelease(CommitTsLock);
317 oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
318 newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
319 /* neither is invalid, or both are */
320 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
321 LWLockRelease(CommitTsLock);
324 * Return empty if the requested value is outside our valid range.
326 if (!TransactionIdIsValid(oldestCommitTsXid) ||
327 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
328 TransactionIdPrecedes(newestCommitTsXid, xid))
332 *nodeid = InvalidRepOriginId;
336 /* lock is acquired by SimpleLruReadPage_ReadOnly */
337 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
339 CommitTsCtl->shared->page_buffer[slotno] +
340 SizeOfCommitTimestampEntry * entryno,
341 SizeOfCommitTimestampEntry);
345 *nodeid = entry.nodeid;
347 LWLockRelease(CommitTsControlLock);
352 * Return the Xid of the latest committed transaction. (As far as this module
353 * is concerned, anyway; it's up to the caller to ensure the value is useful
356 * ts and nodeid are filled with the corresponding data; they can be passed
357 * as NULL if not wanted.
360 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
364 LWLockAcquire(CommitTsLock, LW_SHARED);
366 /* Error if module not enabled */
367 if (!commitTsShared->commitTsActive)
368 error_commit_ts_disabled();
370 xid = commitTsShared->xidLastCommit;
372 *ts = commitTsShared->dataLastCommit.time;
374 *nodeid = commitTsShared->dataLastCommit.nodeid;
375 LWLockRelease(CommitTsLock);
381 error_commit_ts_disabled(void)
384 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385 errmsg("could not get commit timestamp data"),
386 RecoveryInProgress() ?
387 errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
388 "track_commit_timestamp") :
389 errhint("Make sure the configuration parameter \"%s\" is set.",
390 "track_commit_timestamp")));
394 * SQL-callable wrapper to obtain commit time of a transaction
397 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
399 TransactionId xid = PG_GETARG_UINT32(0);
403 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
408 PG_RETURN_TIMESTAMPTZ(ts);
413 pg_last_committed_xact(PG_FUNCTION_ARGS)
422 /* and construct a tuple with our data */
423 xid = GetLatestCommitTsData(&ts, NULL);
426 * Construct a tuple descriptor for the result row. This must match this
427 * function's pg_proc entry!
429 tupdesc = CreateTemplateTupleDesc(2, false);
430 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
432 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
433 TIMESTAMPTZOID, -1, 0);
434 tupdesc = BlessTupleDesc(tupdesc);
436 if (!TransactionIdIsNormal(xid))
438 memset(nulls, true, sizeof(nulls));
442 values[0] = TransactionIdGetDatum(xid);
445 values[1] = TimestampTzGetDatum(ts);
449 htup = heap_form_tuple(tupdesc, values, nulls);
451 PG_RETURN_DATUM(HeapTupleGetDatum(htup));
456 * Number of shared CommitTS buffers.
458 * We use logic very similar to that used for the number of CLOG buffers; see comments
459 * in CLOGShmemBuffers.
462 CommitTsShmemBuffers(void)
464 return Min(16, Max(4, NBuffers / 1024));
468 * Shared memory sizing for CommitTs
471 CommitTsShmemSize(void)
473 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
474 sizeof(CommitTimestampShared);
478 * Initialize CommitTs at system startup (postmaster start or standalone
482 CommitTsShmemInit(void)
486 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
487 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
488 CommitTsControlLock, "pg_commit_ts",
489 LWTRANCHE_COMMITTS_BUFFERS);
491 commitTsShared = ShmemInitStruct("CommitTs shared",
492 sizeof(CommitTimestampShared),
495 if (!IsUnderPostmaster)
499 commitTsShared->xidLastCommit = InvalidTransactionId;
500 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
501 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
502 commitTsShared->commitTsActive = false;
509 * This function must be called ONCE on system install.
511 * (The CommitTs directory is assumed to have been created by initdb, and
512 * CommitTsShmemInit must have been called already.)
515 BootStrapCommitTs(void)
518 * Nothing to do here at present, unlike most other SLRU modules; segments
519 * are created when the server is started with this module enabled. See
525 * Initialize (or reinitialize) a page of CommitTs to zeroes.
526 * If writeXlog is TRUE, also emit an XLOG record saying we did this.
528 * The page is not actually written, just set up in shared memory.
529 * The slot number of the new page is returned.
531 * Control lock must be held at entry, and will be held at exit.
534 ZeroCommitTsPage(int pageno, bool writeXlog)
538 slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
541 WriteZeroPageXlogRec(pageno);
547 * This must be called ONCE during postmaster or standalone-backend startup,
548 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
551 StartupCommitTs(void)
557 * This must be called ONCE during postmaster or standalone-backend startup,
558 * after recovery has finished.
561 CompleteCommitTsInitialization(void)
564 * If the feature is not enabled, turn it off for good. This also removes
567 * Conversely, we activate the module if the feature is enabled. This is
568 * not necessary in a master system because we already did it earlier, but
569 * if we're in a standby server that got promoted which had the feature
570 * enabled and was following a master that had the feature disabled, this
571 * is where we turn it on locally.
573 if (!track_commit_timestamp)
574 DeactivateCommitTs();
580 * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
581 * XLog record in a standby.
584 CommitTsParameterChange(bool newvalue, bool oldvalue)
587 * If the commit_ts module is disabled in this server and we get word from
588 * the master server that it is enabled there, activate it so that we can
589 * replay future WAL records involving it; also mark it as active on
590 * pg_control. If the old value was already set, we already did this, so
593 * If the module is disabled in the master, disable it here too, unless
594 * the module is enabled locally.
596 * Note this only runs in the recovery process, so an unlocked read is
601 if (!commitTsShared->commitTsActive)
604 else if (commitTsShared->commitTsActive)
605 DeactivateCommitTs();
609 * Activate this module whenever necessary.
610 * This must happen during postmaster or standalone-backend startup,
611 * or during WAL replay anytime the track_commit_timestamp setting is
612 * changed in the master.
614 * The reason why this SLRU needs separate activation/deactivation functions is
615 * that it can be enabled/disabled during start and the activation/deactivation
616 * on master is propagated to slave via replay. Other SLRUs don't have this
617 * property and they can be just initialized during normal startup.
619 * This is in charge of creating the currently active segment, if it's not
620 * already there. The reason for this is that the server might have been
621 * running with this module disabled for a while and thus might have skipped
622 * the normal creation point.
625 ActivateCommitTs(void)
630 /* If we've done this already, there's nothing to do */
631 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
632 if (commitTsShared->commitTsActive)
634 LWLockRelease(CommitTsLock);
637 LWLockRelease(CommitTsLock);
639 xid = ShmemVariableCache->nextXid;
640 pageno = TransactionIdToCTsPage(xid);
643 * Re-Initialize our idea of the latest page number.
645 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
646 CommitTsCtl->shared->latest_page_number = pageno;
647 LWLockRelease(CommitTsControlLock);
650 * If CommitTs is enabled, but it wasn't in the previous server run, we
651 * need to set the oldest and newest values to the next Xid; that way, we
652 * will not try to read data that might not have been set.
654 * XXX does this have a problem if a server is started with commitTs
655 * enabled, then started with commitTs disabled, then restarted with it
656 * enabled again? It doesn't look like it does, because there should be a
657 * checkpoint that sets the value to InvalidTransactionId at end of
658 * recovery; and so any chance of injecting new transactions without
659 * CommitTs values would occur after the oldestCommitTsXid has been set to
660 * Invalid temporarily.
662 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
663 if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
665 ShmemVariableCache->oldestCommitTsXid =
666 ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
668 LWLockRelease(CommitTsLock);
670 /* Create the current segment file, if necessary */
671 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
675 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
676 slotno = ZeroCommitTsPage(pageno, false);
677 SimpleLruWritePage(CommitTsCtl, slotno);
678 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
679 LWLockRelease(CommitTsControlLock);
682 /* Change the activation status in shared memory. */
683 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
684 commitTsShared->commitTsActive = true;
685 LWLockRelease(CommitTsLock);
689 * Deactivate this module.
691 * This must be called when the track_commit_timestamp parameter is turned off.
692 * This happens during postmaster or standalone-backend startup, or during WAL
695 * Resets CommitTs into invalid state to make sure we don't hand back
696 * possibly-invalid data; also removes segments of old data.
699 DeactivateCommitTs(void)
702 * Clean up the status in shared memory.
704 * We reset everything in the commitTsShared record to prevent users from
705 * getting confusing data about last committed transaction on the standby
706 * when the module was activated repeatedly on the primary.
708 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
710 commitTsShared->commitTsActive = false;
711 commitTsShared->xidLastCommit = InvalidTransactionId;
712 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
713 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
715 ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
716 ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
718 LWLockRelease(CommitTsLock);
721 * Remove *all* files. This is necessary so that there are no leftover
722 * files; in the case where this feature is later enabled after running
723 * with it disabled for some time there may be a gap in the file sequence.
724 * (We can probably tolerate out-of-sequence files, as they are going to
725 * be overwritten anyway when we wrap around, but it seems better to be
728 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
729 (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
730 LWLockRelease(CommitTsControlLock);
734 * This must be called ONCE during postmaster or standalone-backend shutdown
737 ShutdownCommitTs(void)
739 /* Flush dirty CommitTs pages to disk */
740 SimpleLruFlush(CommitTsCtl, false);
744 * Perform a checkpoint --- either during shutdown, or on-the-fly
747 CheckPointCommitTs(void)
749 /* Flush dirty CommitTs pages to disk */
750 SimpleLruFlush(CommitTsCtl, true);
754 * Make sure that CommitTs has room for a newly-allocated XID.
756 * NB: this is called while holding XidGenLock. We want it to be very fast
757 * most of the time; even when it's not so fast, no actual I/O need happen
758 * unless we're forced to write out a dirty CommitTs or xlog page to make room
761 * NB: the current implementation relies on track_commit_timestamp being
765 ExtendCommitTs(TransactionId newestXact)
770 * Nothing to do if module not enabled. Note we do an unlocked read of
771 * the flag here, which is okay because this routine is only called from
772 * GetNewTransactionId, which is never called in a standby.
775 if (!commitTsShared->commitTsActive)
779 * No work except at first XID of a page. But beware: just after
780 * wraparound, the first XID of page zero is FirstNormalTransactionId.
782 if (TransactionIdToCTsEntry(newestXact) != 0 &&
783 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
786 pageno = TransactionIdToCTsPage(newestXact);
788 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
790 /* Zero the page and make an XLOG entry about it */
791 ZeroCommitTsPage(pageno, !InRecovery);
793 LWLockRelease(CommitTsControlLock);
797 * Remove all CommitTs segments before the one holding the passed
800 * Note that we don't need to flush XLOG here.
803 TruncateCommitTs(TransactionId oldestXact)
808 * The cutoff point is the start of the segment containing oldestXact. We
809 * pass the *page* containing oldestXact to SimpleLruTruncate.
811 cutoffPage = TransactionIdToCTsPage(oldestXact);
813 /* Check to see if there are any files that could be removed */
814 if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
816 return; /* nothing to remove */
818 /* Write XLOG record */
819 WriteTruncateXlogRec(cutoffPage);
821 /* Now we can remove the old CommitTs segment(s) */
822 SimpleLruTruncate(CommitTsCtl, cutoffPage);
826 * Set the limit values between which commit TS can be consulted.
829 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
832 * Be careful not to overwrite values that are either further into the
833 * "future" or signal a disabled committs.
835 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
836 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
838 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
839 ShmemVariableCache->oldestCommitTsXid = oldestXact;
840 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
841 ShmemVariableCache->newestCommitTsXid = newestXact;
845 Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
847 LWLockRelease(CommitTsLock);
851 * Move forwards the oldest commitTS value that can be consulted
854 AdvanceOldestCommitTsXid(TransactionId oldestXact)
856 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
857 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
858 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
859 ShmemVariableCache->oldestCommitTsXid = oldestXact;
860 LWLockRelease(CommitTsLock);
865 * Decide which of two CommitTs page numbers is "older" for truncation purposes.
867 * We need to use comparison of TransactionIds here in order to do the right
868 * thing with wraparound XID arithmetic. However, if we are asked about
869 * page number zero, we don't want to hand InvalidTransactionId to
870 * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
871 * offset both xids by FirstNormalTransactionId to avoid that.
874 CommitTsPagePrecedes(int page1, int page2)
879 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
880 xid1 += FirstNormalTransactionId;
881 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
882 xid2 += FirstNormalTransactionId;
884 return TransactionIdPrecedes(xid1, xid2);
889 * Write a ZEROPAGE xlog record
892 WriteZeroPageXlogRec(int pageno)
895 XLogRegisterData((char *) (&pageno), sizeof(int));
896 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
900 * Write a TRUNCATE xlog record
903 WriteTruncateXlogRec(int pageno)
906 XLogRegisterData((char *) (&pageno), sizeof(int));
907 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
911 * Write a SETTS xlog record
914 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
915 TransactionId *subxids, TimestampTz timestamp,
918 xl_commit_ts_set record;
920 record.timestamp = timestamp;
921 record.nodeid = nodeid;
922 record.mainxid = mainxid;
925 XLogRegisterData((char *) &record,
926 offsetof(xl_commit_ts_set, mainxid) +
927 sizeof(TransactionId));
928 XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
929 XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
933 * CommitTS resource manager's routines
936 commit_ts_redo(XLogReaderState *record)
938 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
940 /* Backup blocks are not used in commit_ts records */
941 Assert(!XLogRecHasAnyBlockRefs(record));
943 if (info == COMMIT_TS_ZEROPAGE)
948 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
950 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
952 slotno = ZeroCommitTsPage(pageno, false);
953 SimpleLruWritePage(CommitTsCtl, slotno);
954 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
956 LWLockRelease(CommitTsControlLock);
958 else if (info == COMMIT_TS_TRUNCATE)
962 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
965 * During XLOG replay, latest_page_number isn't set up yet; insert a
966 * suitable value to bypass the sanity test in SimpleLruTruncate.
968 CommitTsCtl->shared->latest_page_number = pageno;
970 SimpleLruTruncate(CommitTsCtl, pageno);
972 else if (info == COMMIT_TS_SETTS)
974 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
976 TransactionId *subxids;
978 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
979 sizeof(TransactionId));
982 subxids = palloc(sizeof(TransactionId) * nsubxids);
984 XLogRecGetData(record) + SizeOfCommitTsSet,
985 sizeof(TransactionId) * nsubxids);
990 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
991 setts->timestamp, setts->nodeid, true);
996 elog(PANIC, "commit_ts_redo: unknown op code %u", info);