1 /*-------------------------------------------------------------------------
4 * PostgreSQL commit timestamp manager
6 * This module is a pg_clog-like system that stores the commit timestamp
7 * for each transaction.
9 * XLOG interactions: this module generates an XLOG record whenever a new
10 * CommitTs page is initialized to zeroes. Also, one XLOG record is
11 * generated for setting of values when the caller requests it; this allows
12 * us to support values coming from places other than transaction commit.
13 * Other writes of CommitTS come from recording of transaction commit in
14 * xact.c, which generates its own XLOG records for these events and will
15 * re-perform the status update on redo; so we need make no additional XLOG
18 * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
19 * Portions Copyright (c) 1994, Regents of the University of California
21 * src/backend/access/transam/commit_ts.c
23 *-------------------------------------------------------------------------
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
33 #include "miscadmin.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
40 * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
41 * everywhere else in Postgres.
43 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44 * CommitTs page numbering also wraps around at
45 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
47 * explicit notice of that fact in this module, except when comparing segment
48 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
52 * We need 8+2 bytes per xact. Note that enlarging this struct might mean
53 * the largest possible file name is more than 5 chars long; see
56 typedef struct CommitTimestampEntry
60 } CommitTimestampEntry;
62 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
65 #define COMMIT_TS_XACTS_PER_PAGE \
66 (BLCKSZ / SizeOfCommitTimestampEntry)
68 #define TransactionIdToCTsPage(xid) \
69 ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
70 #define TransactionIdToCTsEntry(xid) \
71 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
74 * Link to shared-memory data structures for CommitTs control
76 static SlruCtlData CommitTsCtlData;
78 #define CommitTsCtl (&CommitTsCtlData)
81 * We keep a cache of the last value set in shared memory. This is protected
84 typedef struct CommitTimestampShared
86 TransactionId xidLastCommit;
87 CommitTimestampEntry dataLastCommit;
88 } CommitTimestampShared;
90 CommitTimestampShared *commitTsShared;
94 bool track_commit_timestamp;
96 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
97 TransactionId *subxids, TimestampTz ts,
98 RepOriginId nodeid, int pageno);
99 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
100 RepOriginId nodeid, int slotno);
101 static int ZeroCommitTsPage(int pageno, bool writeXlog);
102 static bool CommitTsPagePrecedes(int page1, int page2);
103 static void WriteZeroPageXlogRec(int pageno);
104 static void WriteTruncateXlogRec(int pageno);
105 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
106 TransactionId *subxids, TimestampTz timestamp,
110 * TransactionTreeSetCommitTsData
112 * Record the final commit timestamp of transaction entries in the commit log
113 * for a transaction and its subtransaction tree, as efficiently as possible.
115 * xid is the top level transaction id.
117 * subxids is an array of xids of length nsubxids, representing subtransactions
118 * in the tree of xid. In various cases nsubxids may be zero.
119 * The reason why tracking just the parent xid commit timestamp is not enough
120 * is that the subtrans SLRU does not stay valid across crashes (it's not
121 * permanent) so we need to keep the information about them here. If the
122 * subtrans implementation changes in the future, we might want to revisit the
123 * decision of storing timestamp info for each subxid.
125 * The replaying_xlog parameter indicates whether the module should execute
126 * its write even if the feature is nominally disabled, because we're replaying
127 * a record generated from a master where the feature is enabled.
129 * The write_xlog parameter tells us whether to include an XLog record of this
130 * or not. Normally, this is called from transaction commit routines (both
131 * normal and prepared) and the information will be stored in the transaction
132 * commit XLog record, and so they should pass "false" for this. The XLog redo
133 * code should use "false" here as well. Other callers probably want to pass
134 * true, so that the given values persist in case of crashes.
137 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
138 TransactionId *subxids, TimestampTz timestamp,
140 bool replaying_xlog, bool write_xlog)
143 TransactionId headxid;
144 TransactionId newestXact;
146 /* We'd better not try to write xlog during replay */
147 Assert(!(write_xlog && replaying_xlog));
149 /* No-op if feature not enabled, unless replaying WAL */
150 if (!track_commit_timestamp && !replaying_xlog)
154 * Comply with the WAL-before-data rule: if caller specified it wants this
155 * value to be recorded in WAL, do so before touching the data.
158 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
161 * Figure out the latest Xid in this batch: either the last subxid if
162 * there's any, otherwise the parent xid.
165 newestXact = subxids[nsubxids - 1];
170 * We split the xids to set the timestamp to in groups belonging to the
171 * same SLRU page; the first element in each such set is its head. The
172 * first group has the main XID as the head; subsequent sets use the first
173 * subxid not on the previous page as head. This way, we only have to
174 * lock/modify each SLRU page once.
176 for (i = 0, headxid = xid;;)
178 int pageno = TransactionIdToCTsPage(headxid);
181 for (j = i; j < nsubxids; j++)
183 if (TransactionIdToCTsPage(subxids[j]) != pageno)
186 /* subxids[i..j] are on the same page as the head */
188 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
191 /* if we wrote out all subxids, we're done. */
192 if (j + 1 >= nsubxids)
196 * Set the new head and skip over it, as well as over the subxids we
199 headxid = subxids[j];
203 /* update the cached value in shared memory */
204 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
205 commitTsShared->xidLastCommit = xid;
206 commitTsShared->dataLastCommit.time = timestamp;
207 commitTsShared->dataLastCommit.nodeid = nodeid;
209 /* and move forwards our endpoint, if needed */
210 if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact))
211 ShmemVariableCache->newestCommitTs = newestXact;
212 LWLockRelease(CommitTsLock);
216 * Record the commit timestamp of transaction entries in the commit log for all
217 * entries on a single page. Atomic only on this page.
220 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
221 TransactionId *subxids, TimestampTz ts,
222 RepOriginId nodeid, int pageno)
227 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
229 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
231 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
232 for (i = 0; i < nsubxids; i++)
233 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
235 CommitTsCtl->shared->page_dirty[slotno] = true;
237 LWLockRelease(CommitTsControlLock);
241 * Sets the commit timestamp of a single transaction.
243 * Must be called with CommitTsControlLock held
246 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
247 RepOriginId nodeid, int slotno)
249 int entryno = TransactionIdToCTsEntry(xid);
250 CommitTimestampEntry entry;
252 Assert(TransactionIdIsNormal(xid));
255 entry.nodeid = nodeid;
257 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
258 SizeOfCommitTimestampEntry * entryno,
259 &entry, SizeOfCommitTimestampEntry);
263 * Interrogate the commit timestamp of a transaction.
265 * The return value indicates whether a commit timestamp record was found for
266 * the given xid. The timestamp value is returned in *ts (which may not be
267 * null), and the origin node for the Xid is returned in *nodeid, if it's not
271 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
274 int pageno = TransactionIdToCTsPage(xid);
275 int entryno = TransactionIdToCTsEntry(xid);
277 CommitTimestampEntry entry;
278 TransactionId oldestCommitTs;
279 TransactionId newestCommitTs;
281 /* Error if module not enabled */
282 if (!track_commit_timestamp)
284 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
285 errmsg("could not get commit timestamp data"),
286 errhint("Make sure the configuration parameter \"%s\" is set.",
287 "track_commit_timestamp")));
289 /* error if the given Xid doesn't normally commit */
290 if (!TransactionIdIsNormal(xid))
292 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
293 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296 * Return empty if the requested value is outside our valid range.
298 LWLockAcquire(CommitTsLock, LW_SHARED);
299 oldestCommitTs = ShmemVariableCache->oldestCommitTs;
300 newestCommitTs = ShmemVariableCache->newestCommitTs;
301 /* neither is invalid, or both are */
302 Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
303 LWLockRelease(CommitTsLock);
305 if (!TransactionIdIsValid(oldestCommitTs) ||
306 TransactionIdPrecedes(xid, oldestCommitTs) ||
307 TransactionIdPrecedes(newestCommitTs, xid))
311 *nodeid = InvalidRepOriginId;
316 * Use an unlocked atomic read on our cached value in shared memory; if
317 * it's a hit, acquire a lock and read the data, after verifying that it's
318 * still what we initially read. Otherwise, fall through to read from
321 if (commitTsShared->xidLastCommit == xid)
323 LWLockAcquire(CommitTsLock, LW_SHARED);
324 if (commitTsShared->xidLastCommit == xid)
326 *ts = commitTsShared->dataLastCommit.time;
328 *nodeid = commitTsShared->dataLastCommit.nodeid;
330 LWLockRelease(CommitTsLock);
333 LWLockRelease(CommitTsLock);
336 /* lock is acquired by SimpleLruReadPage_ReadOnly */
337 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
339 CommitTsCtl->shared->page_buffer[slotno] +
340 SizeOfCommitTimestampEntry * entryno,
341 SizeOfCommitTimestampEntry);
345 *nodeid = entry.nodeid;
347 LWLockRelease(CommitTsControlLock);
352 * Return the Xid of the latest committed transaction. (As far as this module
353 * is concerned, anyway; it's up to the caller to ensure the value is useful
356 * ts and extra are filled with the corresponding data; they can be passed
357 * as NULL if not wanted.
360 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
364 /* Error if module not enabled */
365 if (!track_commit_timestamp)
367 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
368 errmsg("could not get commit timestamp data"),
369 errhint("Make sure the configuration parameter \"%s\" is set.",
370 "track_commit_timestamp")));
372 LWLockAcquire(CommitTsLock, LW_SHARED);
373 xid = commitTsShared->xidLastCommit;
375 *ts = commitTsShared->dataLastCommit.time;
377 *nodeid = commitTsShared->dataLastCommit.nodeid;
378 LWLockRelease(CommitTsLock);
384 * SQL-callable wrapper to obtain commit time of a transaction
387 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
389 TransactionId xid = PG_GETARG_UINT32(0);
393 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
398 PG_RETURN_TIMESTAMPTZ(ts);
403 pg_last_committed_xact(PG_FUNCTION_ARGS)
412 /* and construct a tuple with our data */
413 xid = GetLatestCommitTsData(&ts, NULL);
416 * Construct a tuple descriptor for the result row. This must match this
417 * function's pg_proc entry!
419 tupdesc = CreateTemplateTupleDesc(2, false);
420 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
422 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
423 TIMESTAMPTZOID, -1, 0);
424 tupdesc = BlessTupleDesc(tupdesc);
426 if (!TransactionIdIsNormal(xid))
428 memset(nulls, true, sizeof(nulls));
432 values[0] = TransactionIdGetDatum(xid);
435 values[1] = TimestampTzGetDatum(ts);
439 htup = heap_form_tuple(tupdesc, values, nulls);
441 PG_RETURN_DATUM(HeapTupleGetDatum(htup));
446 * Number of shared CommitTS buffers.
448 * We use a very similar logic as for the number of CLOG buffers; see comments
449 * in CLOGShmemBuffers.
452 CommitTsShmemBuffers(void)
454 return Min(16, Max(4, NBuffers / 1024));
458 * Shared memory sizing for CommitTs
461 CommitTsShmemSize(void)
463 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
464 sizeof(CommitTimestampShared);
468 * Initialize CommitTs at system startup (postmaster start or standalone
472 CommitTsShmemInit(void)
476 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
477 SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
478 CommitTsControlLock, "pg_commit_ts");
480 commitTsShared = ShmemInitStruct("CommitTs shared",
481 sizeof(CommitTimestampShared),
484 if (!IsUnderPostmaster)
488 commitTsShared->xidLastCommit = InvalidTransactionId;
489 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
490 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
497 * This function must be called ONCE on system install.
499 * (The CommitTs directory is assumed to have been created by initdb, and
500 * CommitTsShmemInit must have been called already.)
503 BootStrapCommitTs(void)
506 * Nothing to do here at present, unlike most other SLRU modules; segments
507 * are created when the server is started with this module enabled. See
513 * Initialize (or reinitialize) a page of CommitTs to zeroes.
514 * If writeXlog is TRUE, also emit an XLOG record saying we did this.
516 * The page is not actually written, just set up in shared memory.
517 * The slot number of the new page is returned.
519 * Control lock must be held at entry, and will be held at exit.
522 ZeroCommitTsPage(int pageno, bool writeXlog)
526 slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
529 WriteZeroPageXlogRec(pageno);
535 * This must be called ONCE during postmaster or standalone-backend startup,
536 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
539 StartupCommitTs(void)
541 TransactionId xid = ShmemVariableCache->nextXid;
542 int pageno = TransactionIdToCTsPage(xid);
544 if (track_commit_timestamp)
550 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
553 * Initialize our idea of the latest page number.
555 CommitTsCtl->shared->latest_page_number = pageno;
557 LWLockRelease(CommitTsControlLock);
561 * This must be called ONCE during postmaster or standalone-backend startup,
562 * when commit timestamp is enabled, after recovery has finished.
565 CompleteCommitTsInitialization(void)
567 if (!track_commit_timestamp)
568 DeactivateCommitTs(true);
572 * Activate this module whenever necessary.
573 * This must happen during postmaster or standalong-backend startup,
574 * or during WAL replay anytime the track_commit_timestamp setting is
575 * changed in the master.
577 * The reason why this SLRU needs separate activation/deactivation functions is
578 * that it can be enabled/disabled during start and the activation/deactivation
579 * on master is propagated to slave via replay. Other SLRUs don't have this
580 * property and they can be just initialized during normal startup.
582 * This is in charge of creating the currently active segment, if it's not
583 * already there. The reason for this is that the server might have been
584 * running with this module disabled for a while and thus might have skipped
585 * the normal creation point.
588 ActivateCommitTs(void)
590 TransactionId xid = ShmemVariableCache->nextXid;
591 int pageno = TransactionIdToCTsPage(xid);
594 * Re-Initialize our idea of the latest page number.
596 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
597 CommitTsCtl->shared->latest_page_number = pageno;
598 LWLockRelease(CommitTsControlLock);
601 * If CommitTs is enabled, but it wasn't in the previous server run, we
602 * need to set the oldest and newest values to the next Xid; that way, we
603 * will not try to read data that might not have been set.
605 * XXX does this have a problem if a server is started with commitTs
606 * enabled, then started with commitTs disabled, then restarted with it
607 * enabled again? It doesn't look like it does, because there should be a
608 * checkpoint that sets the value to InvalidTransactionId at end of
609 * recovery; and so any chance of injecting new transactions without
610 * CommitTs values would occur after the oldestCommitTs has been set to
611 * Invalid temporarily.
613 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
614 if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
616 ShmemVariableCache->oldestCommitTs =
617 ShmemVariableCache->newestCommitTs = ReadNewTransactionId();
619 LWLockRelease(CommitTsLock);
621 /* Finally, create the current segment file, if necessary */
622 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
626 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
627 slotno = ZeroCommitTsPage(pageno, false);
628 SimpleLruWritePage(CommitTsCtl, slotno);
629 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
630 LWLockRelease(CommitTsControlLock);
635 * Deactivate this module.
637 * This must be called when the track_commit_timestamp parameter is turned off.
638 * This happens during postmaster or standalone-backend startup, or during WAL
641 * Resets CommitTs into invalid state to make sure we don't hand back
642 * possibly-invalid data; also removes segments of old data.
645 DeactivateCommitTs(bool do_wal)
647 TransactionId xid = ShmemVariableCache->nextXid;
648 int pageno = TransactionIdToCTsPage(xid);
651 * Re-Initialize our idea of the latest page number.
653 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
654 CommitTsCtl->shared->latest_page_number = pageno;
655 LWLockRelease(CommitTsControlLock);
657 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
658 ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
659 ShmemVariableCache->newestCommitTs = InvalidTransactionId;
660 LWLockRelease(CommitTsLock);
662 TruncateCommitTs(ReadNewTransactionId(), do_wal);
666 * This must be called ONCE during postmaster or standalone-backend shutdown
669 ShutdownCommitTs(void)
671 /* Flush dirty CommitTs pages to disk */
672 SimpleLruFlush(CommitTsCtl, false);
676 * Perform a checkpoint --- either during shutdown, or on-the-fly
679 CheckPointCommitTs(void)
681 /* Flush dirty CommitTs pages to disk */
682 SimpleLruFlush(CommitTsCtl, true);
686 * Make sure that CommitTs has room for a newly-allocated XID.
688 * NB: this is called while holding XidGenLock. We want it to be very fast
689 * most of the time; even when it's not so fast, no actual I/O need happen
690 * unless we're forced to write out a dirty CommitTs or xlog page to make room
693 * NB: the current implementation relies on track_commit_timestamp being
697 ExtendCommitTs(TransactionId newestXact)
701 /* nothing to do if module not enabled */
702 if (!track_commit_timestamp)
706 * No work except at first XID of a page. But beware: just after
707 * wraparound, the first XID of page zero is FirstNormalTransactionId.
709 if (TransactionIdToCTsEntry(newestXact) != 0 &&
710 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
713 pageno = TransactionIdToCTsPage(newestXact);
715 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
717 /* Zero the page and make an XLOG entry about it */
718 ZeroCommitTsPage(pageno, !InRecovery);
720 LWLockRelease(CommitTsControlLock);
724 * Remove all CommitTs segments before the one holding the passed
727 * Note that we don't need to flush XLOG here.
730 TruncateCommitTs(TransactionId oldestXact, bool do_wal)
735 * The cutoff point is the start of the segment containing oldestXact. We
736 * pass the *page* containing oldestXact to SimpleLruTruncate.
738 cutoffPage = TransactionIdToCTsPage(oldestXact);
740 /* Check to see if there's any files that could be removed */
741 if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
743 return; /* nothing to remove */
745 /* Write XLOG record */
747 WriteTruncateXlogRec(cutoffPage);
749 /* Now we can remove the old CommitTs segment(s) */
750 SimpleLruTruncate(CommitTsCtl, cutoffPage);
754 * Set the limit values between which commit TS can be consulted.
757 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
760 * Be careful not to overwrite values that are either further into the
761 * "future" or signal a disabled committs.
763 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
764 if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId)
766 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
767 ShmemVariableCache->oldestCommitTs = oldestXact;
768 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs))
769 ShmemVariableCache->newestCommitTs = newestXact;
773 Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId);
775 LWLockRelease(CommitTsLock);
779 * Move forwards the oldest commitTS value that can be consulted
782 AdvanceOldestCommitTs(TransactionId oldestXact)
784 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
785 if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId &&
786 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
787 ShmemVariableCache->oldestCommitTs = oldestXact;
788 LWLockRelease(CommitTsLock);
793 * Decide which of two CLOG page numbers is "older" for truncation purposes.
795 * We need to use comparison of TransactionIds here in order to do the right
796 * thing with wraparound XID arithmetic. However, if we are asked about
797 * page number zero, we don't want to hand InvalidTransactionId to
798 * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
799 * offset both xids by FirstNormalTransactionId to avoid that.
802 CommitTsPagePrecedes(int page1, int page2)
807 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
808 xid1 += FirstNormalTransactionId;
809 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
810 xid2 += FirstNormalTransactionId;
812 return TransactionIdPrecedes(xid1, xid2);
817 * Write a ZEROPAGE xlog record
820 WriteZeroPageXlogRec(int pageno)
823 XLogRegisterData((char *) (&pageno), sizeof(int));
824 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
828 * Write a TRUNCATE xlog record
831 WriteTruncateXlogRec(int pageno)
834 XLogRegisterData((char *) (&pageno), sizeof(int));
835 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
839 * Write a SETTS xlog record
842 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
843 TransactionId *subxids, TimestampTz timestamp,
846 xl_commit_ts_set record;
848 record.timestamp = timestamp;
849 record.nodeid = nodeid;
850 record.mainxid = mainxid;
853 XLogRegisterData((char *) &record,
854 offsetof(xl_commit_ts_set, mainxid) +
855 sizeof(TransactionId));
856 XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
857 XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
861 * CommitTS resource manager's routines
864 commit_ts_redo(XLogReaderState *record)
866 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
868 /* Backup blocks are not used in commit_ts records */
869 Assert(!XLogRecHasAnyBlockRefs(record));
871 if (info == COMMIT_TS_ZEROPAGE)
876 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
878 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
880 slotno = ZeroCommitTsPage(pageno, false);
881 SimpleLruWritePage(CommitTsCtl, slotno);
882 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
884 LWLockRelease(CommitTsControlLock);
886 else if (info == COMMIT_TS_TRUNCATE)
890 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
893 * During XLOG replay, latest_page_number isn't set up yet; insert a
894 * suitable value to bypass the sanity test in SimpleLruTruncate.
896 CommitTsCtl->shared->latest_page_number = pageno;
898 SimpleLruTruncate(CommitTsCtl, pageno);
900 else if (info == COMMIT_TS_SETTS)
902 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
904 TransactionId *subxids;
906 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
907 sizeof(TransactionId));
910 subxids = palloc(sizeof(TransactionId) * nsubxids);
912 XLogRecGetData(record) + SizeOfCommitTsSet,
913 sizeof(TransactionId) * nsubxids);
918 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
919 setts->timestamp, setts->nodeid, false,
925 elog(PANIC, "commit_ts_redo: unknown op code %u", info);