1 /*-------------------------------------------------------------------------
4 * PostgreSQL commit timestamp manager
6 * This module is a pg_xact-like system that stores the commit timestamp
7 * for each transaction.
9 * XLOG interactions: this module generates an XLOG record whenever a new
10 * CommitTs page is initialized to zeroes. Also, one XLOG record is
11 * generated for setting of values when the caller requests it; this allows
12 * us to support values coming from places other than transaction commit.
13 * Other writes of CommitTS come from recording of transaction commit in
14 * xact.c, which generates its own XLOG records for these events and will
15 * re-perform the status update on redo; so we need make no additional XLOG
 * entry here.
18 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
19 * Portions Copyright (c) 1994, Regents of the University of California
21 * src/backend/access/transam/commit_ts.c
23 *-------------------------------------------------------------------------
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
33 #include "miscadmin.h"
35 #include "storage/shmem.h"
36 #include "utils/builtins.h"
37 #include "utils/snapmgr.h"
38 #include "utils/timestamp.h"
41 * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
42 * everywhere else in Postgres.
44 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45 * CommitTs page numbering also wraps around at
46 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
48 * explicit notice of that fact in this module, except when comparing segment
49 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
53 * We need 8+2 bytes per xact. Note that enlarging this struct might mean
54 * the largest possible file name is more than 5 chars long; see
 * SlruScanDirectory.
57 typedef struct CommitTimestampEntry
/* NOTE(review): the member lines are not visible here; code in this file reads and writes members named "time" and "nodeid" -- confirm the exact declarations. */
61 } CommitTimestampEntry;
/* Bytes stored per entry: the offset of the nodeid member plus a term that continues on a line not visible here. */
63 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
66 #define COMMIT_TS_XACTS_PER_PAGE \
67 (BLCKSZ / SizeOfCommitTimestampEntry)
/* Map a transaction id to its CommitTs page number (quotient) and to its entry index within that page (remainder). */
69 #define TransactionIdToCTsPage(xid) \
70 ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
71 #define TransactionIdToCTsEntry(xid) \
72 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
75 * Link to shared-memory data structures for CommitTs control
/* File-local SLRU control structure; CommitTsCtl is shorthand for its address. */
77 static SlruCtlData CommitTsCtlData;
79 #define CommitTsCtl (&CommitTsCtlData)
82 * We keep a cache of the last value set in shared memory.
84 * This is also a good place to keep the activation status. We keep this
85 * separate from the GUC so that the standby can activate the module if the
86 * primary has it active independently of the value of the GUC.
88 * This is protected by CommitTsLock. In some places, we use commitTsActive
89 * without acquiring the lock; where this happens, a comment explains the
 * rationale for it.
92 typedef struct CommitTimestampShared
/* Cache of the most recently recorded commit: its xid and the entry stored for it. */
94 TransactionId xidLastCommit;
95 CommitTimestampEntry dataLastCommit;
/* NOTE(review): a commitTsActive member is used throughout this file but its declaration is not visible here -- confirm. */
97 } CommitTimestampShared;
/* Pointer to the shared state; assigned from ShmemInitStruct in CommitTsShmemInit. */
99 CommitTimestampShared *commitTsShared;
/* Setting consulted by CompleteCommitTsInitialization to decide whether to deactivate the module. */
103 bool track_commit_timestamp;
/* Forward declarations of the file-local helpers defined further down. */
105 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
106 TransactionId *subxids, TimestampTz ts,
107 RepOriginId nodeid, int pageno);
108 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
109 RepOriginId nodeid, int slotno);
110 static void error_commit_ts_disabled(void);
111 static int ZeroCommitTsPage(int pageno, bool writeXlog);
112 static bool CommitTsPagePrecedes(int page1, int page2);
113 static void ActivateCommitTs(void);
114 static void DeactivateCommitTs(void);
115 static void WriteZeroPageXlogRec(int pageno);
116 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
/* NOTE(review): the final parameter line of this prototype is not visible here; the definition is called with a trailing nodeid argument. */
117 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
118 TransactionId *subxids, TimestampTz timestamp,
122 * TransactionTreeSetCommitTsData
124 * Record the final commit timestamp of transaction entries in the commit log
125 * for a transaction and its subtransaction tree, as efficiently as possible.
127 * xid is the top level transaction id.
129 * subxids is an array of xids of length nsubxids, representing subtransactions
130 * in the tree of xid. In various cases nsubxids may be zero.
131 * The reason why tracking just the parent xid commit timestamp is not enough
132 * is that the subtrans SLRU does not stay valid across crashes (it's not
133 * permanent) so we need to keep the information about them here. If the
134 * subtrans implementation changes in the future, we might want to revisit the
135 * decision of storing timestamp info for each subxid.
137 * The write_xlog parameter tells us whether to include an XLog record of this
138 * or not. Normally, this is called from transaction commit routines (both
139 * normal and prepared) and the information will be stored in the transaction
140 * commit XLog record, and so they should pass "false" for this. The XLog redo
141 * code should use "false" here as well. Other callers probably want to pass
142 * true, so that the given values persist in case of crashes.
145 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
146 TransactionId *subxids, TimestampTz timestamp,
147 RepOriginId nodeid, bool write_xlog)
150 TransactionId headxid;
151 TransactionId newestXact;
154 * No-op if the module is not active.
156 * An unlocked read here is fine, because in a standby (the only place
157 * where the flag can change in flight) this routine is only called by the
158 * recovery process, which is also the only process which can change the
161 if (!commitTsShared->commitTsActive)
165 * Comply with the WAL-before-data rule: if caller specified it wants this
166 * value to be recorded in WAL, do so before touching the data.
169 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
172 * Figure out the latest Xid in this batch: either the last subxid if
173 * there's any, otherwise the parent xid.
176 newestXact = subxids[nsubxids - 1];
181 * We split the xids to set the timestamp to in groups belonging to the
182 * same SLRU page; the first element in each such set is its head. The
183 * first group has the main XID as the head; subsequent sets use the first
184 * subxid not on the previous page as head. This way, we only have to
185 * lock/modify each SLRU page once.
187 for (i = 0, headxid = xid;;)
189 int pageno = TransactionIdToCTsPage(headxid);
192 for (j = i; j < nsubxids; j++)
194 if (TransactionIdToCTsPage(subxids[j]) != pageno)
197 /* subxids[i..j] are on the same page as the head */
199 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
202 /* if we wrote out all subxids, we're done. */
203 if (j + 1 >= nsubxids)
207 * Set the new head and skip over it, as well as over the subxids we
210 headxid = subxids[j];
214 /* update the cached value in shared memory */
215 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216 commitTsShared->xidLastCommit = xid;
217 commitTsShared->dataLastCommit.time = timestamp;
218 commitTsShared->dataLastCommit.nodeid = nodeid;
220 /* and move forwards our endpoint, if needed */
221 if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
222 ShmemVariableCache->newestCommitTsXid = newestXact;
223 LWLockRelease(CommitTsLock);
227 * Record the commit timestamp of transaction entries in the commit log for all
228 * entries on a single page. Atomic only on this page.
231 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
232 TransactionId *subxids, TimestampTz ts,
233 RepOriginId nodeid, int pageno)
/* CommitTsControlLock is held exclusively from the page read until all entries are written. */
238 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
/* slotno indexes page_buffer and page_dirty for the page just read. */
240 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
/* Store the same ts/nodeid for the head xid and then for each of the nsubxids subxids. */
242 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243 for (i = 0; i < nsubxids; i++)
244 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
/* Flag the slot as modified. */
246 CommitTsCtl->shared->page_dirty[slotno] = true;
248 LWLockRelease(CommitTsControlLock);
252 * Sets the commit timestamp of a single transaction.
254 * Must be called with CommitTsControlLock held
257 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
258 RepOriginId nodeid, int slotno)
/* entryno is the xid's index within its page (xid modulo entries per page). */
260 int entryno = TransactionIdToCTsEntry(xid);
261 CommitTimestampEntry entry;
263 Assert(TransactionIdIsNormal(xid));
/* NOTE(review): only the nodeid assignment is visible; the store of ts into entry is presumably on a line not shown -- confirm. */
266 entry.nodeid = nodeid;
/* Copy SizeOfCommitTimestampEntry bytes of entry into the slot's page buffer at this entry's byte offset. */
268 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
269 SizeOfCommitTimestampEntry * entryno,
270 &entry, SizeOfCommitTimestampEntry);
274 * Interrogate the commit timestamp of a transaction.
276 * The return value indicates whether a commit timestamp record was found for
277 * the given xid. The timestamp value is returned in *ts (which may not be
278 * null), and the origin node for the Xid is returned in *nodeid, if it's not
 * null.
282 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
/* Locate the xid's page and its entry index within that page. */
285 int pageno = TransactionIdToCTsPage(xid);
286 int entryno = TransactionIdToCTsEntry(xid);
288 CommitTimestampEntry entry;
289 TransactionId oldestCommitTsXid;
290 TransactionId newestCommitTsXid;
/* An invalid xid is reported with ERRCODE_INVALID_PARAMETER_VALUE; other non-normal xids take the second branch. */
292 if (!TransactionIdIsValid(xid))
294 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296 else if (!TransactionIdIsNormal(xid))
298 /* frozen and bootstrap xids are always committed far in the past */
/* The activation flag, the cached last commit and the valid xid range are all read under a shared CommitTsLock. */
305 LWLockAcquire(CommitTsLock, LW_SHARED);
307 /* Error if module not enabled */
308 if (!commitTsShared->commitTsActive)
309 error_commit_ts_disabled();
312 * If we're asked for the cached value, return that. Otherwise, fall
313 * through to read from SLRU.
315 if (commitTsShared->xidLastCommit == xid)
317 *ts = commitTsShared->dataLastCommit.time;
319 *nodeid = commitTsShared->dataLastCommit.nodeid;
321 LWLockRelease(CommitTsLock);
/* Copy the limits into locals so the range check below can run after the lock is released. */
325 oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
326 newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
327 /* neither is invalid, or both are */
328 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
329 LWLockRelease(CommitTsLock);
332 * Return empty if the requested value is outside our valid range.
334 if (!TransactionIdIsValid(oldestCommitTsXid) ||
335 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336 TransactionIdPrecedes(newestCommitTsXid, xid))
340 *nodeid = InvalidRepOriginId;
344 /* lock is acquired by SimpleLruReadPage_ReadOnly */
345 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
347 CommitTsCtl->shared->page_buffer[slotno] +
348 SizeOfCommitTimestampEntry * entryno,
349 SizeOfCommitTimestampEntry);
353 *nodeid = entry.nodeid;
/* Release the SLRU control lock taken by the read-only page fetch above. */
355 LWLockRelease(CommitTsControlLock);
360 * Return the Xid of the latest committed transaction. (As far as this module
361 * is concerned, anyway; it's up to the caller to ensure the value is useful
 * for its purposes.)
364 * ts and nodeid are filled with the corresponding data; they can be passed
365 * as NULL if not wanted.
368 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
/* All reads of the cached last-commit data happen under a shared CommitTsLock. */
372 LWLockAcquire(CommitTsLock, LW_SHARED);
374 /* Error if module not enabled */
375 if (!commitTsShared->commitTsActive)
376 error_commit_ts_disabled();
/* Copy out the cached xid, timestamp and origin of the last recorded commit. */
378 xid = commitTsShared->xidLastCommit;
380 *ts = commitTsShared->dataLastCommit.time;
382 *nodeid = commitTsShared->dataLastCommit.nodeid;
383 LWLockRelease(CommitTsLock);
389 error_commit_ts_disabled(void)
/* Same errcode and message either way; only the hint text differs depending on RecoveryInProgress(). */
392 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393 errmsg("could not get commit timestamp data"),
394 RecoveryInProgress() ?
395 errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
396 "track_commit_timestamp") :
397 errhint("Make sure the configuration parameter \"%s\" is set.",
398 "track_commit_timestamp")));
402 * SQL-callable wrapper to obtain commit time of a transaction
405 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
/* Argument 0 is the transaction id to look up. */
407 TransactionId xid = PG_GETARG_UINT32(0);
/* The origin node is not needed here, so NULL is passed for nodeid. */
411 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
/* NOTE(review): the handling of found == false is not visible here -- confirm against the full file. */
416 PG_RETURN_TIMESTAMPTZ(ts);
421 pg_last_committed_xact(PG_FUNCTION_ARGS)
430 /* and construct a tuple with our data */
/* The origin node is not requested (NULL); only the xid and its timestamp are used. */
431 xid = GetLatestCommitTsData(&ts, NULL);
434 * Construct a tuple descriptor for the result row. This must match this
435 * function's pg_proc entry!
/* Two-column row descriptor; the attributes are named "xid" and "timestamp". */
437 tupdesc = CreateTemplateTupleDesc(2, false);
438 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
440 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
441 TIMESTAMPTZOID, -1, 0);
442 tupdesc = BlessTupleDesc(tupdesc);
/* When the returned xid is not a normal transaction id, every column is marked null. */
444 if (!TransactionIdIsNormal(xid))
446 memset(nulls, true, sizeof(nulls));
450 values[0] = TransactionIdGetDatum(xid);
453 values[1] = TimestampTzGetDatum(ts);
457 htup = heap_form_tuple(tupdesc, values, nulls);
459 PG_RETURN_DATUM(HeapTupleGetDatum(htup));
464 * Number of shared CommitTS buffers.
466 * We use logic very similar to that for the number of CLOG buffers; see comments
467 * in CLOGShmemBuffers.
470 CommitTsShmemBuffers(void)
/* NBuffers / 1024, clamped to the range [4, 16]. */
472 return Min(16, Max(4, NBuffers / 1024));
476 * Shared memory sizing for CommitTs
479 CommitTsShmemSize(void)
/* SLRU space for CommitTsShmemBuffers() buffers plus the CommitTimestampShared struct. */
481 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
482 sizeof(CommitTimestampShared);
486 * Initialize CommitTs at system startup (postmaster start or standalone
 * backend)
490 CommitTsShmemInit(void)
/* Install the page-ordering callback, then initialize the SLRU that stores its files under "pg_commit_ts". */
494 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
495 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
496 CommitTsControlLock, "pg_commit_ts",
497 LWTRANCHE_COMMITTS_BUFFERS);
/* Obtain the shared struct holding the activation flag and the cached last commit. */
499 commitTsShared = ShmemInitStruct("CommitTs shared",
500 sizeof(CommitTimestampShared),
503 if (!IsUnderPostmaster)
/* Start with no cached commit and the module inactive. */
507 commitTsShared->xidLastCommit = InvalidTransactionId;
508 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
509 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
510 commitTsShared->commitTsActive = false;
517 * This function must be called ONCE on system install.
519 * (The CommitTs directory is assumed to have been created by initdb, and
520 * CommitTsShmemInit must have been called already.)
523 BootStrapCommitTs(void)
/* No statements are visible in this function; the comment below says there is nothing to do at present. */
526 * Nothing to do here at present, unlike most other SLRU modules; segments
527 * are created when the server is started with this module enabled. See
533 * Initialize (or reinitialize) a page of CommitTs to zeroes.
534 * If writeXlog is TRUE, also emit an XLOG record saying we did this.
536 * The page is not actually written, just set up in shared memory.
537 * The slot number of the new page is returned.
539 * Control lock must be held at entry, and will be held at exit.
542 ZeroCommitTsPage(int pageno, bool writeXlog)
/* Zero the page in an SLRU slot; callers in this file use the function's result as that slot number. */
546 slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
/* NOTE(review): the writeXlog test guarding this call is presumably on a line not shown -- confirm. */
549 WriteZeroPageXlogRec(pageno);
555 * This must be called ONCE during postmaster or standalone-backend startup,
556 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
559 StartupCommitTs(void)
/* NOTE(review): the body of this function is not visible here -- confirm against the full file. */
565 * This must be called ONCE during postmaster or standalone-backend startup,
566 * after recovery has finished.
569 CompleteCommitTsInitialization(void)
572 * If the feature is not enabled, turn it off for good. This also removes
575 * Conversely, we activate the module if the feature is enabled. This is
576 * not necessary in a master system because we already did it earlier, but
577 * if we're in a standby server that got promoted which had the feature
578 * enabled and was following a master that had the feature disabled, this
579 * is where we turn it on locally.
/* NOTE(review): only the deactivation branch is visible; the activation branch described above is presumably on lines not shown -- confirm. */
581 if (!track_commit_timestamp)
582 DeactivateCommitTs();
588 * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
589 * XLog record in a standby.
592 CommitTsParameterChange(bool newvalue, bool oldvalue)
595 * If the commit_ts module is disabled in this server and we get word from
596 * the master server that it is enabled there, activate it so that we can
597 * replay future WAL records involving it; also mark it as active on
598 * pg_control. If the old value was already set, we already did this, so
601 * If the module is disabled in the master, disable it here too, unless
602 * the module is enabled locally.
604 * Note this only runs in the recovery process, so an unlocked read is
/* NOTE(review): the tests on newvalue/oldvalue that select between the two branches below are not visible here -- confirm against the full file. */
609 if (!commitTsShared->commitTsActive)
612 else if (commitTsShared->commitTsActive)
613 DeactivateCommitTs();
617 * Activate this module whenever necessary.
618 * This must happen during postmaster or standalone-backend startup,
619 * or during WAL replay anytime the track_commit_timestamp setting is
620 * changed in the master.
622 * The reason why this SLRU needs separate activation/deactivation functions is
623 * that it can be enabled/disabled during start and the activation/deactivation
624 * on master is propagated to slave via replay. Other SLRUs don't have this
625 * property and they can be just initialized during normal startup.
627 * This is in charge of creating the currently active segment, if it's not
628 * already there. The reason for this is that the server might have been
629 * running with this module disabled for a while and thus might have skipped
630 * the normal creation point.
633 ActivateCommitTs(void)
638 /* If we've done this already, there's nothing to do */
639 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
640 if (commitTsShared->commitTsActive)
642 LWLockRelease(CommitTsLock);
645 LWLockRelease(CommitTsLock);
/* pageno is the CommitTs page that holds the next transaction id. */
647 xid = ShmemVariableCache->nextXid;
648 pageno = TransactionIdToCTsPage(xid);
651 * Re-Initialize our idea of the latest page number.
653 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
654 CommitTsCtl->shared->latest_page_number = pageno;
655 LWLockRelease(CommitTsControlLock);
658 * If CommitTs is enabled, but it wasn't in the previous server run, we
659 * need to set the oldest and newest values to the next Xid; that way, we
660 * will not try to read data that might not have been set.
662 * XXX does this have a problem if a server is started with commitTs
663 * enabled, then started with commitTs disabled, then restarted with it
664 * enabled again? It doesn't look like it does, because there should be a
665 * checkpoint that sets the value to InvalidTransactionId at end of
666 * recovery; and so any chance of injecting new transactions without
667 * CommitTs values would occur after the oldestCommitTsXid has been set to
668 * Invalid temporarily.
670 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
671 if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
673 ShmemVariableCache->oldestCommitTsXid =
674 ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
676 LWLockRelease(CommitTsLock);
678 /* Create the current segment file, if necessary */
679 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
683 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
/* Zero the page without WAL-logging it (writeXlog is false), write it out at once, and check it is no longer dirty. */
684 slotno = ZeroCommitTsPage(pageno, false);
685 SimpleLruWritePage(CommitTsCtl, slotno);
686 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
687 LWLockRelease(CommitTsControlLock);
690 /* Change the activation status in shared memory. */
691 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
692 commitTsShared->commitTsActive = true;
693 LWLockRelease(CommitTsLock);
697 * Deactivate this module.
699 * This must be called when the track_commit_timestamp parameter is turned off.
700 * This happens during postmaster or standalone-backend startup, or during WAL
 * replay.
703 * Resets CommitTs into invalid state to make sure we don't hand back
704 * possibly-invalid data; also removes segments of old data.
707 DeactivateCommitTs(void)
/* Two steps: reset the shared state under CommitTsLock, then scan the SLRU directory under CommitTsControlLock. */
710 * Cleanup the status in the shared memory.
712 * We reset everything in the commitTsShared record to prevent user from
713 * getting confusing data about last committed transaction on the standby
714 * when the module was activated repeatedly on the primary.
716 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
718 commitTsShared->commitTsActive = false;
719 commitTsShared->xidLastCommit = InvalidTransactionId;
720 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
721 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
/* Also invalidate the range of xids for which commit timestamps can be consulted. */
723 ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
724 ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
726 LWLockRelease(CommitTsLock);
729 * Remove *all* files. This is necessary so that there are no leftover
730 * files; in the case where this feature is later enabled after running
731 * with it disabled for some time there may be a gap in the file sequence.
732 * (We can probably tolerate out-of-sequence files, as they are going to
733 * be overwritten anyway when we wrap around, but it seems better to be
736 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
737 (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
738 LWLockRelease(CommitTsControlLock);
742 * This must be called ONCE during postmaster or standalone-backend shutdown
745 ShutdownCommitTs(void)
747 /* Flush dirty CommitTs pages to disk */
/* NOTE(review): passes false as SimpleLruFlush's second argument, whereas CheckPointCommitTs passes true; the flag's meaning is defined by the SLRU code -- confirm. */
748 SimpleLruFlush(CommitTsCtl, false);
751 * fsync pg_commit_ts to ensure that any files flushed previously are
/* NOTE(review): the second argument (true) presumably marks "pg_commit_ts" as a directory -- confirm against fsync_fname. */
754 fsync_fname("pg_commit_ts", true);
758 * Perform a checkpoint --- either during shutdown, or on-the-fly
761 CheckPointCommitTs(void)
763 /* Flush dirty CommitTs pages to disk */
/* NOTE(review): passes true as SimpleLruFlush's second argument, whereas ShutdownCommitTs passes false; the flag's meaning is defined by the SLRU code -- confirm. */
764 SimpleLruFlush(CommitTsCtl, true);
767 * fsync pg_commit_ts to ensure that any files flushed previously are
/* NOTE(review): the second argument (true) presumably marks "pg_commit_ts" as a directory -- confirm against fsync_fname. */
770 fsync_fname("pg_commit_ts", true);
774 * Make sure that CommitTs has room for a newly-allocated XID.
776 * NB: this is called while holding XidGenLock. We want it to be very fast
777 * most of the time; even when it's not so fast, no actual I/O need happen
778 * unless we're forced to write out a dirty CommitTs or xlog page to make room
781 * NB: the current implementation relies on track_commit_timestamp being
785 ExtendCommitTs(TransactionId newestXact)
790 * Nothing to do if module not enabled. Note we do an unlocked read of
791 * the flag here, which is okay because this routine is only called from
792 * GetNewTransactionId, which is never called in a standby.
795 if (!commitTsShared->commitTsActive)
799 * No work except at first XID of a page. But beware: just after
800 * wraparound, the first XID of page zero is FirstNormalTransactionId.
/* The test below is true when newestXact is neither entry 0 of its page nor FirstNormalTransactionId. */
802 if (TransactionIdToCTsEntry(newestXact) != 0 &&
803 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
806 pageno = TransactionIdToCTsPage(newestXact);
808 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
810 /* Zero the page and make an XLOG entry about it */
/* The writeXlog argument is !InRecovery, so no record is requested while InRecovery is set. */
811 ZeroCommitTsPage(pageno, !InRecovery);
813 LWLockRelease(CommitTsControlLock);
817 * Remove all CommitTs segments before the one holding the passed
 * transaction ID.
820 * Note that we don't need to flush XLOG here.
823 TruncateCommitTs(TransactionId oldestXact)
828 * The cutoff point is the start of the segment containing oldestXact. We
829 * pass the *page* containing oldestXact to SimpleLruTruncate.
831 cutoffPage = TransactionIdToCTsPage(oldestXact);
833 /* Check to see if there's any files that could be removed */
834 if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
836 return; /* nothing to remove */
/* The truncation record is written before any segment is removed. */
838 /* Write XLOG record */
839 WriteTruncateXlogRec(cutoffPage, oldestXact);
841 /* Now we can remove the old CommitTs segment(s) */
842 SimpleLruTruncate(CommitTsCtl, cutoffPage);
846 * Set the limit values between which commit TS can be consulted.
849 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
852 * Be careful not to overwrite values that are either further into the
853 * "future" or signal a disabled committs.
855 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
/* Limits already set: replace the stored oldest only if it precedes oldestXact, and the stored newest only if newestXact precedes it. */
856 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
858 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
859 ShmemVariableCache->oldestCommitTsXid = oldestXact;
860 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
861 ShmemVariableCache->newestCommitTsXid = newestXact;
/* Limits not set yet: the newest limit is asserted to be invalid too, then both are installed as given. */
865 Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
866 ShmemVariableCache->oldestCommitTsXid = oldestXact;
867 ShmemVariableCache->newestCommitTsXid = newestXact;
869 LWLockRelease(CommitTsLock);
873 * Move forwards the oldest commitTS value that can be consulted
876 AdvanceOldestCommitTsXid(TransactionId oldestXact)
878 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
/* The stored value only changes when it is valid and precedes oldestXact. */
879 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
880 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
881 ShmemVariableCache->oldestCommitTsXid = oldestXact;
882 LWLockRelease(CommitTsLock);
887 * Decide which of two CommitTs page numbers is "older" for truncation purposes.
889 * We need to use comparison of TransactionIds here in order to do the right
890 * thing with wraparound XID arithmetic. However, if we are asked about
891 * page number zero, we don't want to hand InvalidTransactionId to
892 * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
893 * offset both xids by FirstNormalTransactionId to avoid that.
896 CommitTsPagePrecedes(int page1, int page2)
/* Turn each page number into the xid of that page's first entry, shifted up by FirstNormalTransactionId. */
901 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
902 xid1 += FirstNormalTransactionId;
903 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
904 xid2 += FirstNormalTransactionId;
/* The result is TransactionIdPrecedes applied to the two shifted xids. */
906 return TransactionIdPrecedes(xid1, xid2);
911 * Write a ZEROPAGE xlog record
914 WriteZeroPageXlogRec(int pageno)
/* The only data registered for the record is the page number. */
917 XLogRegisterData((char *) (&pageno), sizeof(int));
918 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
922 * Write a TRUNCATE xlog record
925 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
927 xl_commit_ts_truncate xlrec;
/* The record carries both the cutoff page and the oldest xid. */
929 xlrec.pageno = pageno;
930 xlrec.oldestXid = oldestXid;
933 XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
934 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
938 * Write a SETTS xlog record
941 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
942 TransactionId *subxids, TimestampTz timestamp,
945 xl_commit_ts_set record;
947 record.timestamp = timestamp;
948 record.nodeid = nodeid;
949 record.mainxid = mainxid;
/* First chunk: the struct up to and including mainxid. Second chunk: the nsubxids subtransaction ids. */
952 XLogRegisterData((char *) &record,
953 offsetof(xl_commit_ts_set, mainxid) +
954 sizeof(TransactionId));
955 XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
956 XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
960 * CommitTS resource manager's routines
963 commit_ts_redo(XLogReaderState *record)
965 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
967 /* Backup blocks are not used in commit_ts records */
968 Assert(!XLogRecHasAnyBlockRefs(record));
970 if (info == COMMIT_TS_ZEROPAGE)
975 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
977 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
979 slotno = ZeroCommitTsPage(pageno, false);
980 SimpleLruWritePage(CommitTsCtl, slotno);
981 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
983 LWLockRelease(CommitTsControlLock);
985 else if (info == COMMIT_TS_TRUNCATE)
987 xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
989 AdvanceOldestCommitTsXid(trunc->oldestXid);
992 * During XLOG replay, latest_page_number isn't set up yet; insert a
993 * suitable value to bypass the sanity test in SimpleLruTruncate.
995 CommitTsCtl->shared->latest_page_number = trunc->pageno;
997 SimpleLruTruncate(CommitTsCtl, trunc->pageno);
999 else if (info == COMMIT_TS_SETTS)
1001 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
1003 TransactionId *subxids;
1005 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1006 sizeof(TransactionId));
1009 subxids = palloc(sizeof(TransactionId) * nsubxids);
1011 XLogRecGetData(record) + SizeOfCommitTsSet,
1012 sizeof(TransactionId) * nsubxids);
1017 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1018 setts->timestamp, setts->nodeid, true);
1023 elog(PANIC, "commit_ts_redo: unknown op code %u", info);