1 /*-------------------------------------------------------------------------
4 * PostgreSQL commit timestamp manager
6 * This module is a pg_clog-like system that stores the commit timestamp
7 * for each transaction.
9 * XLOG interactions: this module generates an XLOG record whenever a new
10 * CommitTs page is initialized to zeroes. Also, one XLOG record is
11 * generated for setting of values when the caller requests it; this allows
12 * us to support values coming from places other than transaction commit.
13 * Other writes of CommitTS come from recording of transaction commit in
14 * xact.c, which generates its own XLOG records for these events and will
15 * re-perform the status update on redo; so we need make no additional XLOG
18 * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
19 * Portions Copyright (c) 1994, Regents of the University of California
21 * src/backend/access/transam/commit_ts.c
23 *-------------------------------------------------------------------------
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
33 #include "miscadmin.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
40 * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
41 * everywhere else in Postgres.
43 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44 * CommitTs page numbering also wraps around at
45 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
47 * explicit notice of that fact in this module, except when comparing segment
48 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
52 * We need 8+2 bytes per xact. Note that enlarging this struct might mean
53 * the largest possible file name is more than 5 chars long; see
56 typedef struct CommitTimestampEntry
60 } CommitTimestampEntry;
62 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
65 #define COMMIT_TS_XACTS_PER_PAGE \
66 (BLCKSZ / SizeOfCommitTimestampEntry)
68 #define TransactionIdToCTsPage(xid) \
69 ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
70 #define TransactionIdToCTsEntry(xid) \
71 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
74 * Link to shared-memory data structures for CommitTs control
76 static SlruCtlData CommitTsCtlData;
78 #define CommitTsCtl (&CommitTsCtlData)
81 * We keep a cache of the last value set in shared memory. This is protected
84 typedef struct CommitTimestampShared
86 TransactionId xidLastCommit;
87 CommitTimestampEntry dataLastCommit;
88 } CommitTimestampShared;
90 CommitTimestampShared *commitTsShared;
94 bool track_commit_timestamp;
97 * When this is set, commit_ts is force-enabled during recovery. This is so
98 * that a standby can replay WAL records coming from a master with the setting
99 * enabled. (Note that this doesn't enable SQL access to the data; it's
100 * effectively write-only until the GUC itself is enabled.)
102 static bool enable_during_recovery;
104 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
105 TransactionId *subxids, TimestampTz ts,
106 RepOriginId nodeid, int pageno);
107 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
108 RepOriginId nodeid, int slotno);
109 static int ZeroCommitTsPage(int pageno, bool writeXlog);
110 static bool CommitTsPagePrecedes(int page1, int page2);
111 static void ActivateCommitTs(void);
112 static void DeactivateCommitTs(bool do_wal);
113 static void WriteZeroPageXlogRec(int pageno);
114 static void WriteTruncateXlogRec(int pageno);
115 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
116 TransactionId *subxids, TimestampTz timestamp,
120 * TransactionTreeSetCommitTsData
122 * Record the final commit timestamp of transaction entries in the commit log
123 * for a transaction and its subtransaction tree, as efficiently as possible.
125 * xid is the top level transaction id.
127 * subxids is an array of xids of length nsubxids, representing subtransactions
128 * in the tree of xid. In various cases nsubxids may be zero.
129 * The reason why tracking just the parent xid commit timestamp is not enough
130 * is that the subtrans SLRU does not stay valid across crashes (it's not
131 * permanent) so we need to keep the information about them here. If the
132 * subtrans implementation changes in the future, we might want to revisit the
133 * decision of storing timestamp info for each subxid.
135 * The write_xlog parameter tells us whether to include an XLog record of this
136 * or not. Normally, this is called from transaction commit routines (both
137 * normal and prepared) and the information will be stored in the transaction
138 * commit XLog record, and so they should pass "false" for this. The XLog redo
139 * code should use "false" here as well. Other callers probably want to pass
140 * true, so that the given values persist in case of crashes.
143 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
144 TransactionId *subxids, TimestampTz timestamp,
145 RepOriginId nodeid, bool write_xlog)
148 TransactionId headxid;
149 TransactionId newestXact;
152 * No-op if the module is not enabled, but allow writes in a standby
155 if (!track_commit_timestamp && !enable_during_recovery)
159 * Comply with the WAL-before-data rule: if caller specified it wants this
160 * value to be recorded in WAL, do so before touching the data.
163 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
166 * Figure out the latest Xid in this batch: either the last subxid if
167 * there's any, otherwise the parent xid.
170 newestXact = subxids[nsubxids - 1];
175 * We split the xids to set the timestamp to in groups belonging to the
176 * same SLRU page; the first element in each such set is its head. The
177 * first group has the main XID as the head; subsequent sets use the first
178 * subxid not on the previous page as head. This way, we only have to
179 * lock/modify each SLRU page once.
181 for (i = 0, headxid = xid;;)
183 int pageno = TransactionIdToCTsPage(headxid);
186 for (j = i; j < nsubxids; j++)
188 if (TransactionIdToCTsPage(subxids[j]) != pageno)
191 /* subxids[i..j] are on the same page as the head */
193 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
196 /* if we wrote out all subxids, we're done. */
197 if (j + 1 >= nsubxids)
201 * Set the new head and skip over it, as well as over the subxids we
204 headxid = subxids[j];
208 /* update the cached value in shared memory */
209 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
210 commitTsShared->xidLastCommit = xid;
211 commitTsShared->dataLastCommit.time = timestamp;
212 commitTsShared->dataLastCommit.nodeid = nodeid;
214 /* and move forwards our endpoint, if needed */
215 if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact))
216 ShmemVariableCache->newestCommitTs = newestXact;
217 LWLockRelease(CommitTsLock);
221 * Record the commit timestamp of transaction entries in the commit log for all
222 * entries on a single page. Atomic only on this page.
225 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
226 TransactionId *subxids, TimestampTz ts,
227 RepOriginId nodeid, int pageno)
232 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
234 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
236 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
237 for (i = 0; i < nsubxids; i++)
238 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
240 CommitTsCtl->shared->page_dirty[slotno] = true;
242 LWLockRelease(CommitTsControlLock);
246 * Sets the commit timestamp of a single transaction.
248 * Must be called with CommitTsControlLock held
251 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
252 RepOriginId nodeid, int slotno)
254 int entryno = TransactionIdToCTsEntry(xid);
255 CommitTimestampEntry entry;
257 Assert(TransactionIdIsNormal(xid));
260 entry.nodeid = nodeid;
262 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
263 SizeOfCommitTimestampEntry * entryno,
264 &entry, SizeOfCommitTimestampEntry);
268 * Interrogate the commit timestamp of a transaction.
270 * The return value indicates whether a commit timestamp record was found for
271 * the given xid. The timestamp value is returned in *ts (which may not be
272 * null), and the origin node for the Xid is returned in *nodeid, if it's not
276 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
279 int pageno = TransactionIdToCTsPage(xid);
280 int entryno = TransactionIdToCTsEntry(xid);
282 CommitTimestampEntry entry;
283 TransactionId oldestCommitTs;
284 TransactionId newestCommitTs;
286 /* Error if module not enabled */
287 if (!track_commit_timestamp)
289 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
290 errmsg("could not get commit timestamp data"),
291 errhint("Make sure the configuration parameter \"%s\" is set.",
292 "track_commit_timestamp")));
294 /* error if the given Xid doesn't normally commit */
295 if (!TransactionIdIsNormal(xid))
297 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
298 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
301 * Return empty if the requested value is outside our valid range.
303 LWLockAcquire(CommitTsLock, LW_SHARED);
304 oldestCommitTs = ShmemVariableCache->oldestCommitTs;
305 newestCommitTs = ShmemVariableCache->newestCommitTs;
306 /* neither is invalid, or both are */
307 Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
308 LWLockRelease(CommitTsLock);
310 if (!TransactionIdIsValid(oldestCommitTs) ||
311 TransactionIdPrecedes(xid, oldestCommitTs) ||
312 TransactionIdPrecedes(newestCommitTs, xid))
316 *nodeid = InvalidRepOriginId;
321 * Use an unlocked atomic read on our cached value in shared memory; if
322 * it's a hit, acquire a lock and read the data, after verifying that it's
323 * still what we initially read. Otherwise, fall through to read from
326 if (commitTsShared->xidLastCommit == xid)
328 LWLockAcquire(CommitTsLock, LW_SHARED);
329 if (commitTsShared->xidLastCommit == xid)
331 *ts = commitTsShared->dataLastCommit.time;
333 *nodeid = commitTsShared->dataLastCommit.nodeid;
335 LWLockRelease(CommitTsLock);
338 LWLockRelease(CommitTsLock);
341 /* lock is acquired by SimpleLruReadPage_ReadOnly */
342 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
344 CommitTsCtl->shared->page_buffer[slotno] +
345 SizeOfCommitTimestampEntry * entryno,
346 SizeOfCommitTimestampEntry);
350 *nodeid = entry.nodeid;
352 LWLockRelease(CommitTsControlLock);
357 * Return the Xid of the latest committed transaction. (As far as this module
358 * is concerned, anyway; it's up to the caller to ensure the value is useful
361 * ts and extra are filled with the corresponding data; they can be passed
362 * as NULL if not wanted.
365 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
369 /* Error if module not enabled */
370 if (!track_commit_timestamp)
372 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
373 errmsg("could not get commit timestamp data"),
374 errhint("Make sure the configuration parameter \"%s\" is set.",
375 "track_commit_timestamp")));
377 LWLockAcquire(CommitTsLock, LW_SHARED);
378 xid = commitTsShared->xidLastCommit;
380 *ts = commitTsShared->dataLastCommit.time;
382 *nodeid = commitTsShared->dataLastCommit.nodeid;
383 LWLockRelease(CommitTsLock);
389 * SQL-callable wrapper to obtain commit time of a transaction
392 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
394 TransactionId xid = PG_GETARG_UINT32(0);
398 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
403 PG_RETURN_TIMESTAMPTZ(ts);
408 pg_last_committed_xact(PG_FUNCTION_ARGS)
417 /* and construct a tuple with our data */
418 xid = GetLatestCommitTsData(&ts, NULL);
421 * Construct a tuple descriptor for the result row. This must match this
422 * function's pg_proc entry!
424 tupdesc = CreateTemplateTupleDesc(2, false);
425 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
427 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
428 TIMESTAMPTZOID, -1, 0);
429 tupdesc = BlessTupleDesc(tupdesc);
431 if (!TransactionIdIsNormal(xid))
433 memset(nulls, true, sizeof(nulls));
437 values[0] = TransactionIdGetDatum(xid);
440 values[1] = TimestampTzGetDatum(ts);
444 htup = heap_form_tuple(tupdesc, values, nulls);
446 PG_RETURN_DATUM(HeapTupleGetDatum(htup));
451 * Number of shared CommitTS buffers.
453 * We use a very similar logic as for the number of CLOG buffers; see comments
454 * in CLOGShmemBuffers.
457 CommitTsShmemBuffers(void)
459 return Min(16, Max(4, NBuffers / 1024));
463 * Shared memory sizing for CommitTs
466 CommitTsShmemSize(void)
468 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
469 sizeof(CommitTimestampShared);
473 * Initialize CommitTs at system startup (postmaster start or standalone
477 CommitTsShmemInit(void)
481 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
482 SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
483 CommitTsControlLock, "pg_commit_ts");
485 commitTsShared = ShmemInitStruct("CommitTs shared",
486 sizeof(CommitTimestampShared),
489 if (!IsUnderPostmaster)
493 commitTsShared->xidLastCommit = InvalidTransactionId;
494 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
495 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
/*
 * To be called exactly ONCE, on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Intentionally empty.  Unlike most other SLRU modules there is nothing
	 * to set up at this point: segments are created when the server is
	 * started with this module enabled.
	 */
}
518 * Initialize (or reinitialize) a page of CommitTs to zeroes.
519 * If writeXlog is TRUE, also emit an XLOG record saying we did this.
521 * The page is not actually written, just set up in shared memory.
522 * The slot number of the new page is returned.
524 * Control lock must be held at entry, and will be held at exit.
527 ZeroCommitTsPage(int pageno, bool writeXlog)
531 slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
534 WriteZeroPageXlogRec(pageno);
540 * This must be called ONCE during postmaster or standalone-backend startup,
541 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
543 * Caller may choose to enable the feature even when it is turned off in the
547 StartupCommitTs(bool force_enable)
550 * If the module is not enabled, there's nothing to do here. The module
551 * could still be activated from elsewhere.
553 if (track_commit_timestamp || force_enable)
558 * This must be called ONCE during postmaster or standalone-backend startup,
559 * after recovery has finished.
562 CompleteCommitTsInitialization(void)
565 * If the feature is not enabled, turn it off for good. This also removes
568 if (!track_commit_timestamp)
569 DeactivateCommitTs(true);
573 * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
574 * XLog record in a standby.
577 CommitTsParameterChange(bool newvalue, bool oldvalue)
580 * If the commit_ts module is disabled in this server and we get word from
581 * the master server that it is enabled there, activate it so that we can
582 * replay future WAL records involving it; also mark it as active on
583 * pg_control. If the old value was already set, we already did this, so
586 * If the module is disabled in the master, disable it here too.
590 if (!track_commit_timestamp && !oldvalue)
594 DeactivateCommitTs(false);
598 * Activate this module whenever necessary.
599 * This must happen during postmaster or standalong-backend startup,
600 * or during WAL replay anytime the track_commit_timestamp setting is
601 * changed in the master.
603 * The reason why this SLRU needs separate activation/deactivation functions is
604 * that it can be enabled/disabled during start and the activation/deactivation
605 * on master is propagated to slave via replay. Other SLRUs don't have this
606 * property and they can be just initialized during normal startup.
608 * This is in charge of creating the currently active segment, if it's not
609 * already there. The reason for this is that the server might have been
610 * running with this module disabled for a while and thus might have skipped
611 * the normal creation point.
614 ActivateCommitTs(void)
616 TransactionId xid = ShmemVariableCache->nextXid;
617 int pageno = TransactionIdToCTsPage(xid);
620 * Re-Initialize our idea of the latest page number.
622 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
623 CommitTsCtl->shared->latest_page_number = pageno;
624 LWLockRelease(CommitTsControlLock);
627 * If CommitTs is enabled, but it wasn't in the previous server run, we
628 * need to set the oldest and newest values to the next Xid; that way, we
629 * will not try to read data that might not have been set.
631 * XXX does this have a problem if a server is started with commitTs
632 * enabled, then started with commitTs disabled, then restarted with it
633 * enabled again? It doesn't look like it does, because there should be a
634 * checkpoint that sets the value to InvalidTransactionId at end of
635 * recovery; and so any chance of injecting new transactions without
636 * CommitTs values would occur after the oldestCommitTs has been set to
637 * Invalid temporarily.
639 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
640 if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
642 ShmemVariableCache->oldestCommitTs =
643 ShmemVariableCache->newestCommitTs = ReadNewTransactionId();
645 LWLockRelease(CommitTsLock);
647 /* Finally, create the current segment file, if necessary */
648 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
652 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
653 slotno = ZeroCommitTsPage(pageno, false);
654 SimpleLruWritePage(CommitTsCtl, slotno);
655 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
656 LWLockRelease(CommitTsControlLock);
659 /* We can now replay xlog records from this module */
660 enable_during_recovery = true;
664 * Deactivate this module.
666 * This must be called when the track_commit_timestamp parameter is turned off.
667 * This happens during postmaster or standalone-backend startup, or during WAL
670 * Resets CommitTs into invalid state to make sure we don't hand back
671 * possibly-invalid data; also removes segments of old data.
674 DeactivateCommitTs(bool do_wal)
676 TransactionId xid = ShmemVariableCache->nextXid;
677 int pageno = TransactionIdToCTsPage(xid);
680 * Re-Initialize our idea of the latest page number.
682 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
683 CommitTsCtl->shared->latest_page_number = pageno;
684 LWLockRelease(CommitTsControlLock);
686 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
687 ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
688 ShmemVariableCache->newestCommitTs = InvalidTransactionId;
689 LWLockRelease(CommitTsLock);
692 * Remove *all* files. This is necessary so that there are no leftover
693 * files; in the case where this feature is later enabled after running
694 * with it disabled for some time there may be a gap in the file sequence.
695 * (We can probably tolerate out-of-sequence files, as they are going to
696 * be overwritten anyway when we wrap around, but it seems better to be
699 (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
701 /* No longer enabled on recovery */
702 enable_during_recovery = false;
706 * This must be called ONCE during postmaster or standalone-backend shutdown
709 ShutdownCommitTs(void)
711 /* Flush dirty CommitTs pages to disk */
712 SimpleLruFlush(CommitTsCtl, false);
716 * Perform a checkpoint --- either during shutdown, or on-the-fly
719 CheckPointCommitTs(void)
721 /* Flush dirty CommitTs pages to disk */
722 SimpleLruFlush(CommitTsCtl, true);
726 * Make sure that CommitTs has room for a newly-allocated XID.
728 * NB: this is called while holding XidGenLock. We want it to be very fast
729 * most of the time; even when it's not so fast, no actual I/O need happen
730 * unless we're forced to write out a dirty CommitTs or xlog page to make room
733 * NB: the current implementation relies on track_commit_timestamp being
737 ExtendCommitTs(TransactionId newestXact)
741 /* nothing to do if module not enabled */
742 if (!track_commit_timestamp && !enable_during_recovery)
746 * No work except at first XID of a page. But beware: just after
747 * wraparound, the first XID of page zero is FirstNormalTransactionId.
749 if (TransactionIdToCTsEntry(newestXact) != 0 &&
750 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
753 pageno = TransactionIdToCTsPage(newestXact);
755 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
757 /* Zero the page and make an XLOG entry about it */
758 ZeroCommitTsPage(pageno, !InRecovery);
760 LWLockRelease(CommitTsControlLock);
764 * Remove all CommitTs segments before the one holding the passed
767 * Note that we don't need to flush XLOG here.
770 TruncateCommitTs(TransactionId oldestXact, bool do_wal)
775 * The cutoff point is the start of the segment containing oldestXact. We
776 * pass the *page* containing oldestXact to SimpleLruTruncate.
778 cutoffPage = TransactionIdToCTsPage(oldestXact);
780 /* Check to see if there's any files that could be removed */
781 if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
783 return; /* nothing to remove */
785 /* Write XLOG record */
787 WriteTruncateXlogRec(cutoffPage);
789 /* Now we can remove the old CommitTs segment(s) */
790 SimpleLruTruncate(CommitTsCtl, cutoffPage);
794 * Set the limit values between which commit TS can be consulted.
797 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
800 * Be careful not to overwrite values that are either further into the
801 * "future" or signal a disabled committs.
803 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
804 if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId)
806 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
807 ShmemVariableCache->oldestCommitTs = oldestXact;
808 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs))
809 ShmemVariableCache->newestCommitTs = newestXact;
813 Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId);
815 LWLockRelease(CommitTsLock);
819 * Move forwards the oldest commitTS value that can be consulted
822 AdvanceOldestCommitTs(TransactionId oldestXact)
824 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
825 if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId &&
826 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
827 ShmemVariableCache->oldestCommitTs = oldestXact;
828 LWLockRelease(CommitTsLock);
833 * Decide which of two CLOG page numbers is "older" for truncation purposes.
835 * We need to use comparison of TransactionIds here in order to do the right
836 * thing with wraparound XID arithmetic. However, if we are asked about
837 * page number zero, we don't want to hand InvalidTransactionId to
838 * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
839 * offset both xids by FirstNormalTransactionId to avoid that.
842 CommitTsPagePrecedes(int page1, int page2)
847 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
848 xid1 += FirstNormalTransactionId;
849 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
850 xid2 += FirstNormalTransactionId;
852 return TransactionIdPrecedes(xid1, xid2);
857 * Write a ZEROPAGE xlog record
860 WriteZeroPageXlogRec(int pageno)
863 XLogRegisterData((char *) (&pageno), sizeof(int));
864 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
868 * Write a TRUNCATE xlog record
871 WriteTruncateXlogRec(int pageno)
874 XLogRegisterData((char *) (&pageno), sizeof(int));
875 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
879 * Write a SETTS xlog record
882 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
883 TransactionId *subxids, TimestampTz timestamp,
886 xl_commit_ts_set record;
888 record.timestamp = timestamp;
889 record.nodeid = nodeid;
890 record.mainxid = mainxid;
893 XLogRegisterData((char *) &record,
894 offsetof(xl_commit_ts_set, mainxid) +
895 sizeof(TransactionId));
896 XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
897 XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
901 * CommitTS resource manager's routines
904 commit_ts_redo(XLogReaderState *record)
906 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
908 /* Backup blocks are not used in commit_ts records */
909 Assert(!XLogRecHasAnyBlockRefs(record));
911 if (info == COMMIT_TS_ZEROPAGE)
916 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
918 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
920 slotno = ZeroCommitTsPage(pageno, false);
921 SimpleLruWritePage(CommitTsCtl, slotno);
922 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
924 LWLockRelease(CommitTsControlLock);
926 else if (info == COMMIT_TS_TRUNCATE)
930 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
933 * During XLOG replay, latest_page_number isn't set up yet; insert a
934 * suitable value to bypass the sanity test in SimpleLruTruncate.
936 CommitTsCtl->shared->latest_page_number = pageno;
938 SimpleLruTruncate(CommitTsCtl, pageno);
940 else if (info == COMMIT_TS_SETTS)
942 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
944 TransactionId *subxids;
946 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
947 sizeof(TransactionId));
950 subxids = palloc(sizeof(TransactionId) * nsubxids);
952 XLogRecGetData(record) + SizeOfCommitTsSet,
953 sizeof(TransactionId) * nsubxids);
958 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
959 setts->timestamp, setts->nodeid, true);
964 elog(PANIC, "commit_ts_redo: unknown op code %u", info);