/*-------------------------------------------------------------------------
 *
 * commit_ts.c
 *		PostgreSQL commit timestamp manager
 *
 * This module is a pg_clog-like system that stores the commit timestamp
 * for each transaction.
 *
 * XLOG interactions: this module generates an XLOG record whenever a new
 * CommitTs page is initialized to zeroes.  Also, one XLOG record is
 * generated for setting of values when the caller requests it; this allows
 * us to support values coming from places other than transaction commit.
 * Other writes of CommitTS come from recording of transaction commit in
 * xact.c, which generates its own XLOG records for these events and will
 * re-perform the status update on redo; so we need make no additional XLOG
 * entry here.
 *
 * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/access/transam/commit_ts.c
 *
 *-------------------------------------------------------------------------
 */
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
33 #include "miscadmin.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
/*
 * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
 * everywhere else in Postgres.
 *
 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
 * CommitTs page numbering also wraps around at
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
 * explicit notice of that fact in this module, except when comparing segment
 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
 */
52 * We need 8+4 bytes per xact. Note that enlarging this struct might mean
53 * the largest possible file name is more than 5 chars long; see
56 typedef struct CommitTimestampEntry
59 CommitTsNodeId nodeid;
60 } CommitTimestampEntry;
62 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
63 sizeof(CommitTsNodeId))
65 #define COMMIT_TS_XACTS_PER_PAGE \
66 (BLCKSZ / SizeOfCommitTimestampEntry)
68 #define TransactionIdToCTsPage(xid) \
69 ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
70 #define TransactionIdToCTsEntry(xid) \
71 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
74 * Link to shared-memory data structures for CommitTs control
76 static SlruCtlData CommitTsCtlData;
78 #define CommitTsCtl (&CommitTsCtlData)
81 * We keep a cache of the last value set in shared memory. This is protected
84 typedef struct CommitTimestampShared
86 TransactionId xidLastCommit;
87 CommitTimestampEntry dataLastCommit;
88 } CommitTimestampShared;
90 CommitTimestampShared *commitTsShared;
94 bool track_commit_timestamp;
96 static CommitTsNodeId default_node_id = InvalidCommitTsNodeId;
98 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
99 TransactionId *subxids, TimestampTz ts,
100 CommitTsNodeId nodeid, int pageno);
101 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
102 CommitTsNodeId nodeid, int slotno);
103 static int ZeroCommitTsPage(int pageno, bool writeXlog);
104 static bool CommitTsPagePrecedes(int page1, int page2);
105 static void WriteZeroPageXlogRec(int pageno);
106 static void WriteTruncateXlogRec(int pageno);
107 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
108 TransactionId *subxids, TimestampTz timestamp,
109 CommitTsNodeId nodeid);
113 * CommitTsSetDefaultNodeId
115 * Set default nodeid for current backend.
118 CommitTsSetDefaultNodeId(CommitTsNodeId nodeid)
120 default_node_id = nodeid;
124 * CommitTsGetDefaultNodeId
126 * Set default nodeid for current backend.
129 CommitTsGetDefaultNodeId(void)
131 return default_node_id;
135 * TransactionTreeSetCommitTsData
137 * Record the final commit timestamp of transaction entries in the commit log
138 * for a transaction and its subtransaction tree, as efficiently as possible.
140 * xid is the top level transaction id.
142 * subxids is an array of xids of length nsubxids, representing subtransactions
143 * in the tree of xid. In various cases nsubxids may be zero.
144 * The reason why tracking just the parent xid commit timestamp is not enough
145 * is that the subtrans SLRU does not stay valid across crashes (it's not
146 * permanent) so we need to keep the information about them here. If the
147 * subtrans implementation changes in the future, we might want to revisit the
148 * decision of storing timestamp info for each subxid.
150 * The do_xlog parameter tells us whether to include a XLog record of this
151 * or not. Normal path through RecordTransactionCommit() will be related
152 * to a transaction commit XLog record, and so should pass "false" here.
153 * Other callers probably want to pass true, so that the given values persist
154 * in case of crashes.
157 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
158 TransactionId *subxids, TimestampTz timestamp,
159 CommitTsNodeId nodeid, bool do_xlog)
162 TransactionId headxid;
163 TransactionId newestXact;
165 if (!track_commit_timestamp)
169 * Comply with the WAL-before-data rule: if caller specified it wants
170 * this value to be recorded in WAL, do so before touching the data.
173 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
176 * Figure out the latest Xid in this batch: either the last subxid if
177 * there's any, otherwise the parent xid.
180 newestXact = subxids[nsubxids - 1];
185 * We split the xids to set the timestamp to in groups belonging to the
186 * same SLRU page; the first element in each such set is its head. The
187 * first group has the main XID as the head; subsequent sets use the
188 * first subxid not on the previous page as head. This way, we only have
189 * to lock/modify each SLRU page once.
191 for (i = 0, headxid = xid;;)
193 int pageno = TransactionIdToCTsPage(headxid);
196 for (j = i; j < nsubxids; j++)
198 if (TransactionIdToCTsPage(subxids[j]) != pageno)
201 /* subxids[i..j] are on the same page as the head */
203 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
206 /* if we wrote out all subxids, we're done. */
207 if (j + 1 >= nsubxids)
211 * Set the new head and skip over it, as well as over the subxids
214 headxid = subxids[j];
218 /* update the cached value in shared memory */
219 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
220 commitTsShared->xidLastCommit = xid;
221 commitTsShared->dataLastCommit.time = timestamp;
222 commitTsShared->dataLastCommit.nodeid = nodeid;
224 /* and move forwards our endpoint, if needed */
225 if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact))
226 ShmemVariableCache->newestCommitTs = newestXact;
227 LWLockRelease(CommitTsLock);
231 * Record the commit timestamp of transaction entries in the commit log for all
232 * entries on a single page. Atomic only on this page.
235 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
236 TransactionId *subxids, TimestampTz ts,
237 CommitTsNodeId nodeid, int pageno)
242 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
244 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
246 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
247 for (i = 0; i < nsubxids; i++)
248 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
250 CommitTsCtl->shared->page_dirty[slotno] = true;
252 LWLockRelease(CommitTsControlLock);
256 * Sets the commit timestamp of a single transaction.
258 * Must be called with CommitTsControlLock held
261 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
262 CommitTsNodeId nodeid, int slotno)
264 int entryno = TransactionIdToCTsEntry(xid);
265 CommitTimestampEntry entry;
267 Assert(TransactionIdIsNormal(xid));
270 entry.nodeid = nodeid;
272 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
273 SizeOfCommitTimestampEntry * entryno,
274 &entry, SizeOfCommitTimestampEntry);
278 * Interrogate the commit timestamp of a transaction.
280 * Return value indicates whether commit timestamp record was found for
284 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
285 CommitTsNodeId *nodeid)
287 int pageno = TransactionIdToCTsPage(xid);
288 int entryno = TransactionIdToCTsEntry(xid);
290 CommitTimestampEntry entry;
291 TransactionId oldestCommitTs;
292 TransactionId newestCommitTs;
294 /* Error if module not enabled */
295 if (!track_commit_timestamp)
297 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
298 errmsg("could not get commit timestamp data"),
299 errhint("Make sure the configuration parameter \"%s\" is set.",
300 "track_commit_timestamp")));
302 /* error if the given Xid doesn't normally commit */
303 if (!TransactionIdIsNormal(xid))
305 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
306 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
309 * Return empty if the requested value is outside our valid range.
311 LWLockAcquire(CommitTsLock, LW_SHARED);
312 oldestCommitTs = ShmemVariableCache->oldestCommitTs;
313 newestCommitTs = ShmemVariableCache->newestCommitTs;
314 /* neither is invalid, or both are */
315 Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
316 LWLockRelease(CommitTsLock);
318 if (!TransactionIdIsValid(oldestCommitTs) ||
319 TransactionIdPrecedes(xid, oldestCommitTs) ||
320 TransactionIdPrecedes(newestCommitTs, xid))
325 *nodeid = InvalidCommitTsNodeId;
330 * Use an unlocked atomic read on our cached value in shared memory; if
331 * it's a hit, acquire a lock and read the data, after verifying that it's
332 * still what we initially read. Otherwise, fall through to read from
335 if (commitTsShared->xidLastCommit == xid)
337 LWLockAcquire(CommitTsLock, LW_SHARED);
338 if (commitTsShared->xidLastCommit == xid)
341 *ts = commitTsShared->dataLastCommit.time;
343 *nodeid = commitTsShared->dataLastCommit.nodeid;
345 LWLockRelease(CommitTsLock);
348 LWLockRelease(CommitTsLock);
351 /* lock is acquired by SimpleLruReadPage_ReadOnly */
352 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
354 CommitTsCtl->shared->page_buffer[slotno] +
355 SizeOfCommitTimestampEntry * entryno,
356 SizeOfCommitTimestampEntry);
361 *nodeid = entry.nodeid;
363 LWLockRelease(CommitTsControlLock);
368 * Return the Xid of the latest committed transaction. (As far as this module
369 * is concerned, anyway; it's up to the caller to ensure the value is useful
372 * ts and extra are filled with the corresponding data; they can be passed
373 * as NULL if not wanted.
376 GetLatestCommitTsData(TimestampTz *ts, CommitTsNodeId *nodeid)
380 /* Error if module not enabled */
381 if (!track_commit_timestamp)
383 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
384 errmsg("could not get commit timestamp data"),
385 errhint("Make sure the configuration parameter \"%s\" is set.",
386 "track_commit_timestamp")));
388 LWLockAcquire(CommitTsLock, LW_SHARED);
389 xid = commitTsShared->xidLastCommit;
391 *ts = commitTsShared->dataLastCommit.time;
393 *nodeid = commitTsShared->dataLastCommit.nodeid;
394 LWLockRelease(CommitTsLock);
400 * SQL-callable wrapper to obtain commit time of a transaction
403 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
405 TransactionId xid = PG_GETARG_UINT32(0);
409 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
414 PG_RETURN_TIMESTAMPTZ(ts);
419 pg_last_committed_xact(PG_FUNCTION_ARGS)
428 /* and construct a tuple with our data */
429 xid = GetLatestCommitTsData(&ts, NULL);
432 * Construct a tuple descriptor for the result row. This must match this
433 * function's pg_proc entry!
435 tupdesc = CreateTemplateTupleDesc(2, false);
436 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
438 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
439 TIMESTAMPTZOID, -1, 0);
440 tupdesc = BlessTupleDesc(tupdesc);
442 if (!TransactionIdIsNormal(xid))
444 memset(nulls, true, sizeof(nulls));
448 values[0] = TransactionIdGetDatum(xid);
451 values[1] = TimestampTzGetDatum(ts);
455 htup = heap_form_tuple(tupdesc, values, nulls);
457 PG_RETURN_DATUM(HeapTupleGetDatum(htup));
462 * Number of shared CommitTS buffers.
464 * We use a very similar logic as for the number of CLOG buffers; see comments
465 * in CLOGShmemBuffers.
468 CommitTsShmemBuffers(void)
470 return Min(16, Max(4, NBuffers / 1024));
474 * Shared memory sizing for CommitTs
477 CommitTsShmemSize(void)
479 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
480 sizeof(CommitTimestampShared);
484 * Initialize CommitTs at system startup (postmaster start or standalone
488 CommitTsShmemInit(void)
492 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
493 SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
494 CommitTsControlLock, "pg_commit_ts");
496 commitTsShared = ShmemInitStruct("CommitTs shared",
497 sizeof(CommitTimestampShared),
500 if (!IsUnderPostmaster)
504 commitTsShared->xidLastCommit = InvalidTransactionId;
505 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
506 commitTsShared->dataLastCommit.nodeid = InvalidCommitTsNodeId;
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Intentionally empty.  Unlike most other SLRU modules there is nothing
	 * to create here: segments are created when the server is started with
	 * this module enabled.  See StartupCommitTs.
	 */
}
529 * Initialize (or reinitialize) a page of CommitTs to zeroes.
530 * If writeXlog is TRUE, also emit an XLOG record saying we did this.
532 * The page is not actually written, just set up in shared memory.
533 * The slot number of the new page is returned.
535 * Control lock must be held at entry, and will be held at exit.
538 ZeroCommitTsPage(int pageno, bool writeXlog)
542 slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
545 WriteZeroPageXlogRec(pageno);
551 * This must be called ONCE during postmaster or standalone-backend startup,
552 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
555 StartupCommitTs(void)
557 TransactionId xid = ShmemVariableCache->nextXid;
558 int pageno = TransactionIdToCTsPage(xid);
560 if (track_commit_timestamp)
566 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
569 * Initialize our idea of the latest page number.
571 CommitTsCtl->shared->latest_page_number = pageno;
573 LWLockRelease(CommitTsControlLock);
577 * This must be called ONCE during postmaster or standalone-backend startup,
578 * when commit timestamp is enabled, after recovery has finished.
581 CompleteCommitTsInitialization(void)
583 if (!track_commit_timestamp)
584 DeactivateCommitTs(true);
588 * Activate this module whenever necessary.
589 * This must happen during postmaster or standalong-backend startup,
590 * or during WAL replay anytime the track_commit_timestamp setting is
591 * changed in the master.
593 * The reason why this SLRU needs separate activation/deactivation functions is
594 * that it can be enabled/disabled during start and the activation/deactivation
595 * on master is propagated to slave via replay. Other SLRUs don't have this
596 * property and they can be just initialized during normal startup.
598 * This is in charge of creating the currently active segment, if it's not
599 * already there. The reason for this is that the server might have been
600 * running with this module disabled for a while and thus might have skipped
601 * the normal creation point.
604 ActivateCommitTs(void)
606 TransactionId xid = ShmemVariableCache->nextXid;
607 int pageno = TransactionIdToCTsPage(xid);
610 * Re-Initialize our idea of the latest page number.
612 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
613 CommitTsCtl->shared->latest_page_number = pageno;
614 LWLockRelease(CommitTsControlLock);
617 * If CommitTs is enabled, but it wasn't in the previous server run, we
618 * need to set the oldest and newest values to the next Xid; that way, we
619 * will not try to read data that might not have been set.
621 * XXX does this have a problem if a server is started with commitTs
622 * enabled, then started with commitTs disabled, then restarted with it
623 * enabled again? It doesn't look like it does, because there should be a
624 * checkpoint that sets the value to InvalidTransactionId at end of
625 * recovery; and so any chance of injecting new transactions without
626 * CommitTs values would occur after the oldestCommitTs has been set to
627 * Invalid temporarily.
629 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
630 if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
632 ShmemVariableCache->oldestCommitTs =
633 ShmemVariableCache->newestCommitTs = ReadNewTransactionId();
635 LWLockRelease(CommitTsLock);
637 /* Finally, create the current segment file, if necessary */
638 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
642 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
643 slotno = ZeroCommitTsPage(pageno, false);
644 SimpleLruWritePage(CommitTsCtl, slotno);
645 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
646 LWLockRelease(CommitTsControlLock);
651 * Deactivate this module.
653 * This must be called when the track_commit_timestamp parameter is turned off.
654 * This happens during postmaster or standalone-backend startup, or during WAL
657 * Resets CommitTs into invalid state to make sure we don't hand back
658 * possibly-invalid data; also removes segments of old data.
661 DeactivateCommitTs(bool do_wal)
663 TransactionId xid = ShmemVariableCache->nextXid;
664 int pageno = TransactionIdToCTsPage(xid);
667 * Re-Initialize our idea of the latest page number.
669 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
670 CommitTsCtl->shared->latest_page_number = pageno;
671 LWLockRelease(CommitTsControlLock);
673 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
674 ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
675 ShmemVariableCache->newestCommitTs = InvalidTransactionId;
676 LWLockRelease(CommitTsLock);
678 TruncateCommitTs(ReadNewTransactionId(), do_wal);
682 * This must be called ONCE during postmaster or standalone-backend shutdown
685 ShutdownCommitTs(void)
687 /* Flush dirty CommitTs pages to disk */
688 SimpleLruFlush(CommitTsCtl, false);
692 * Perform a checkpoint --- either during shutdown, or on-the-fly
695 CheckPointCommitTs(void)
697 /* Flush dirty CommitTs pages to disk */
698 SimpleLruFlush(CommitTsCtl, true);
702 * Make sure that CommitTs has room for a newly-allocated XID.
704 * NB: this is called while holding XidGenLock. We want it to be very fast
705 * most of the time; even when it's not so fast, no actual I/O need happen
706 * unless we're forced to write out a dirty CommitTs or xlog page to make room
709 * NB: the current implementation relies on track_commit_timestamp being
713 ExtendCommitTs(TransactionId newestXact)
717 /* nothing to do if module not enabled */
718 if (!track_commit_timestamp)
722 * No work except at first XID of a page. But beware: just after
723 * wraparound, the first XID of page zero is FirstNormalTransactionId.
725 if (TransactionIdToCTsEntry(newestXact) != 0 &&
726 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
729 pageno = TransactionIdToCTsPage(newestXact);
731 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
733 /* Zero the page and make an XLOG entry about it */
734 ZeroCommitTsPage(pageno, !InRecovery);
736 LWLockRelease(CommitTsControlLock);
740 * Remove all CommitTs segments before the one holding the passed
743 * Note that we don't need to flush XLOG here.
746 TruncateCommitTs(TransactionId oldestXact, bool do_wal)
751 * The cutoff point is the start of the segment containing oldestXact. We
752 * pass the *page* containing oldestXact to SimpleLruTruncate.
754 cutoffPage = TransactionIdToCTsPage(oldestXact);
756 /* Check to see if there's any files that could be removed */
757 if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
759 return; /* nothing to remove */
761 /* Write XLOG record */
763 WriteTruncateXlogRec(cutoffPage);
765 /* Now we can remove the old CommitTs segment(s) */
766 SimpleLruTruncate(CommitTsCtl, cutoffPage);
770 * Set the limit values between which commit TS can be consulted.
773 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
776 * Be careful not to overwrite values that are either further into the
777 * "future" or signal a disabled committs.
779 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
780 if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId)
782 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
783 ShmemVariableCache->oldestCommitTs = oldestXact;
784 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs))
785 ShmemVariableCache->newestCommitTs = newestXact;
789 Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId);
791 LWLockRelease(CommitTsLock);
795 * Move forwards the oldest commitTS value that can be consulted
798 AdvanceOldestCommitTs(TransactionId oldestXact)
800 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
801 if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId &&
802 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
803 ShmemVariableCache->oldestCommitTs = oldestXact;
804 LWLockRelease(CommitTsLock);
809 * Decide which of two CLOG page numbers is "older" for truncation purposes.
811 * We need to use comparison of TransactionIds here in order to do the right
812 * thing with wraparound XID arithmetic. However, if we are asked about
813 * page number zero, we don't want to hand InvalidTransactionId to
814 * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
815 * offset both xids by FirstNormalTransactionId to avoid that.
818 CommitTsPagePrecedes(int page1, int page2)
823 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
824 xid1 += FirstNormalTransactionId;
825 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
826 xid2 += FirstNormalTransactionId;
828 return TransactionIdPrecedes(xid1, xid2);
833 * Write a ZEROPAGE xlog record
836 WriteZeroPageXlogRec(int pageno)
839 XLogRegisterData((char *) (&pageno), sizeof(int));
840 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
844 * Write a TRUNCATE xlog record
847 WriteTruncateXlogRec(int pageno)
850 XLogRegisterData((char *) (&pageno), sizeof(int));
851 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
855 * Write a SETTS xlog record
858 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
859 TransactionId *subxids, TimestampTz timestamp,
860 CommitTsNodeId nodeid)
862 xl_commit_ts_set record;
864 record.timestamp = timestamp;
865 record.nodeid = nodeid;
866 record.mainxid = mainxid;
869 XLogRegisterData((char *) &record,
870 offsetof(xl_commit_ts_set, mainxid) +
871 sizeof(TransactionId));
872 XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
873 XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
877 * CommitTS resource manager's routines
880 commit_ts_redo(XLogReaderState *record)
882 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
884 /* Backup blocks are not used in commit_ts records */
885 Assert(!XLogRecHasAnyBlockRefs(record));
887 if (info == COMMIT_TS_ZEROPAGE)
892 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
894 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
896 slotno = ZeroCommitTsPage(pageno, false);
897 SimpleLruWritePage(CommitTsCtl, slotno);
898 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
900 LWLockRelease(CommitTsControlLock);
902 else if (info == COMMIT_TS_TRUNCATE)
906 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
909 * During XLOG replay, latest_page_number isn't set up yet; insert a
910 * suitable value to bypass the sanity test in SimpleLruTruncate.
912 CommitTsCtl->shared->latest_page_number = pageno;
914 SimpleLruTruncate(CommitTsCtl, pageno);
916 else if (info == COMMIT_TS_SETTS)
918 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
920 TransactionId *subxids;
922 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
923 sizeof(TransactionId));
926 subxids = palloc(sizeof(TransactionId) * nsubxids);
928 XLogRecGetData(record) + SizeOfCommitTsSet,
929 sizeof(TransactionId) * nsubxids);
934 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
935 setts->timestamp, setts->nodeid, false);
940 elog(PANIC, "commit_ts_redo: unknown op code %u", info);