]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/commit_ts.c
Reduce pinning and buffer content locking for btree scans.
[postgresql] / src / backend / access / transam / commit_ts.c
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  *              PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_clog-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
38
39 /*
40  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
41  * everywhere else in Postgres.
42  *
43  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44  * CommitTs page numbering also wraps around at
45  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
47  * explicit notice of that fact in this module, except when comparing segment
48  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49  */
50
51 /*
52  * We need 8+4 bytes per xact.  Note that enlarging this struct might mean
53  * the largest possible file name is more than 5 chars long; see
54  * SlruScanDirectory.
 *
 * One such entry is stored per transaction id inside a CommitTs SLRU page;
 * TransactionIdSetCommitTs writes it and TransactionIdGetCommitTsData reads
 * it back.
55  */
56 typedef struct CommitTimestampEntry
57 {
58         TimestampTz             time;           /* timestamp recorded for the xact */
59         CommitTsNodeId  nodeid;         /* node id recorded alongside it */
60 } CommitTimestampEntry;
61
/*
 * Number of bytes of a CommitTimestampEntry that are actually copied to and
 * from a page: everything up to and including nodeid, which leaves out any
 * padding the compiler may add after the last field.
 */
62 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
63                                                                         sizeof(CommitTsNodeId))
64
/* How many entries of that size fit into one BLCKSZ-sized page. */
65 #define COMMIT_TS_XACTS_PER_PAGE \
66         (BLCKSZ / SizeOfCommitTimestampEntry)
67
/*
 * Map a transaction id to the page that holds its entry, and to the index
 * of that entry within the page.
 */
68 #define TransactionIdToCTsPage(xid)     \
69         ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
70 #define TransactionIdToCTsEntry(xid)    \
71         ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72
73 /*
74  * Link to shared-memory data structures for CommitTs control
 *
 * CommitTsCtl is the handle passed to every SimpleLru* call in this file.
75  */
76 static SlruCtlData CommitTsCtlData;
77
78 #define CommitTsCtl (&CommitTsCtlData)
79
80 /*
81  * We keep a cache of the last value set in shared memory.  This is protected
82  * by CommitTsLock.
 *
 * The cache is refreshed at the end of TransactionTreeSetCommitTsData and
 * consulted by TransactionIdGetCommitTsData and GetLatestCommitTsData.
83  */
84 typedef struct CommitTimestampShared
85 {
86         TransactionId   xidLastCommit;  /* xid most recently passed to
                                         * TransactionTreeSetCommitTsData */
87         CommitTimestampEntry dataLastCommit;    /* timestamp/nodeid stored for it */
88 } CommitTimestampShared;
89
/* Pointer to the shared cache; set up by CommitTsShmemInit. */
90 CommitTimestampShared   *commitTsShared;
91
92
93 /* GUC variable */
94 bool    track_commit_timestamp;
95
/*
 * Backend-local default node id, accessed through
 * CommitTsSetDefaultNodeId / CommitTsGetDefaultNodeId.
 */
96 static CommitTsNodeId default_node_id = InvalidCommitTsNodeId;
97
/* Forward declarations of the file-local helpers defined later in this file. */
98 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
99                                          TransactionId *subxids, TimestampTz ts,
100                                          CommitTsNodeId nodeid, int pageno);
101 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
102                                                   CommitTsNodeId nodeid, int slotno);
103 static int      ZeroCommitTsPage(int pageno, bool writeXlog);
104 static bool CommitTsPagePrecedes(int page1, int page2);
/* WAL record emitters for the zero-page, truncate and set-timestamp records */
105 static void WriteZeroPageXlogRec(int pageno);
106 static void WriteTruncateXlogRec(int pageno);
107 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
108                                                  TransactionId *subxids, TimestampTz timestamp,
109                                                  CommitTsNodeId nodeid);
110
111
112 /*
113  * CommitTsSetDefaultNodeId
114  *
115  * Set default nodeid for current backend.
116  */
117 void
118 CommitTsSetDefaultNodeId(CommitTsNodeId nodeid)
119 {
120         default_node_id = nodeid;
121 }
122
123 /*
124  * CommitTsGetDefaultNodeId
125  *
126  * Set default nodeid for current backend.
127  */
128 CommitTsNodeId
129 CommitTsGetDefaultNodeId(void)
130 {
131         return default_node_id;
132 }
133
134 /*
135  * TransactionTreeSetCommitTsData
136  *
137  * Record the final commit timestamp of transaction entries in the commit log
138  * for a transaction and its subtransaction tree, as efficiently as possible.
139  *
140  * xid is the top level transaction id.
141  *
142  * subxids is an array of xids of length nsubxids, representing subtransactions
143  * in the tree of xid. In various cases nsubxids may be zero.
144  * The reason why tracking just the parent xid commit timestamp is not enough
145  * is that the subtrans SLRU does not stay valid across crashes (it's not
146  * permanent) so we need to keep the information about them here. If the
147  * subtrans implementation changes in the future, we might want to revisit the
148  * decision of storing timestamp info for each subxid.
149  *
150  * The do_xlog parameter tells us whether to include a XLog record of this
151  * or not.  Normal path through RecordTransactionCommit() will be related
152  * to a transaction commit XLog record, and so should pass "false" here.
153  * Other callers probably want to pass true, so that the given values persist
154  * in case of crashes.
155  */
156 void
157 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
158                                                            TransactionId *subxids, TimestampTz timestamp,
159                                                            CommitTsNodeId nodeid, bool do_xlog)
160 {
161         int                     i;
162         TransactionId headxid;
163         TransactionId newestXact;
164
165         if (!track_commit_timestamp)
166                 return;
167
168         /*
169          * Comply with the WAL-before-data rule: if caller specified it wants
170          * this value to be recorded in WAL, do so before touching the data.
171          */
172         if (do_xlog)
173                 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
174
175         /*
176          * Figure out the latest Xid in this batch: either the last subxid if
177          * there's any, otherwise the parent xid.
178          */
179         if (nsubxids > 0)
180                 newestXact = subxids[nsubxids - 1];
181         else
182                 newestXact = xid;
183
184         /*
185          * We split the xids to set the timestamp to in groups belonging to the
186          * same SLRU page; the first element in each such set is its head.  The
187          * first group has the main XID as the head; subsequent sets use the
188          * first subxid not on the previous page as head.  This way, we only have
189          * to lock/modify each SLRU page once.
190          */
191         for (i = 0, headxid = xid;;)
192         {
193                 int                     pageno = TransactionIdToCTsPage(headxid);
194                 int                     j;
195
196                 for (j = i; j < nsubxids; j++)
197                 {
198                         if (TransactionIdToCTsPage(subxids[j]) != pageno)
199                                 break;
200                 }
201                 /* subxids[i..j] are on the same page as the head */
202
203                 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
204                                                          pageno);
205
206                 /* if we wrote out all subxids, we're done. */
207                 if (j + 1 >= nsubxids)
208                         break;
209
210                 /*
211                  * Set the new head and skip over it, as well as over the subxids
212                  * we just wrote.
213                  */
214                 headxid = subxids[j];
215                 i += j - i + 1;
216         }
217
218         /* update the cached value in shared memory */
219         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
220         commitTsShared->xidLastCommit = xid;
221         commitTsShared->dataLastCommit.time = timestamp;
222         commitTsShared->dataLastCommit.nodeid = nodeid;
223
224         /* and move forwards our endpoint, if needed */
225         if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact))
226                 ShmemVariableCache->newestCommitTs = newestXact;
227         LWLockRelease(CommitTsLock);
228 }
229
230 /*
231  * Record the commit timestamp of transaction entries in the commit log for all
232  * entries on a single page.  Atomic only on this page.
233  */
234 static void
235 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
236                                          TransactionId *subxids, TimestampTz ts,
237                                          CommitTsNodeId nodeid, int pageno)
238 {
239         int                     slotno;
240         int                     i;
241
242         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
243
244         slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
245
246         TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
247         for (i = 0; i < nsubxids; i++)
248                 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
249
250         CommitTsCtl->shared->page_dirty[slotno] = true;
251
252         LWLockRelease(CommitTsControlLock);
253 }
254
255 /*
256  * Sets the commit timestamp of a single transaction.
257  *
258  * Must be called with CommitTsControlLock held
259  */
260 static void
261 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
262                                                  CommitTsNodeId nodeid, int slotno)
263 {
264         int                     entryno = TransactionIdToCTsEntry(xid);
265         CommitTimestampEntry entry;
266
267         Assert(TransactionIdIsNormal(xid));
268
269         entry.time = ts;
270         entry.nodeid = nodeid;
271
272         memcpy(CommitTsCtl->shared->page_buffer[slotno] +
273                    SizeOfCommitTimestampEntry * entryno,
274                    &entry, SizeOfCommitTimestampEntry);
275 }
276
277 /*
278  * Interrogate the commit timestamp of a transaction.
279  *
280  * Return value indicates whether commit timestamp record was found for
281  * given xid.
282  */
283 bool
284 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
285                                                          CommitTsNodeId *nodeid)
286 {
287         int                     pageno = TransactionIdToCTsPage(xid);
288         int                     entryno = TransactionIdToCTsEntry(xid);
289         int                     slotno;
290         CommitTimestampEntry entry;
291         TransactionId oldestCommitTs;
292         TransactionId newestCommitTs;
293
294         /* Error if module not enabled */
295         if (!track_commit_timestamp)
296                 ereport(ERROR,
297                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
298                                  errmsg("could not get commit timestamp data"),
299                                  errhint("Make sure the configuration parameter \"%s\" is set.",
300                                                  "track_commit_timestamp")));
301
302         /* error if the given Xid doesn't normally commit */
303         if (!TransactionIdIsNormal(xid))
304                 ereport(ERROR,
305                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
306                                  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
307
308         /*
309          * Return empty if the requested value is outside our valid range.
310          */
311         LWLockAcquire(CommitTsLock, LW_SHARED);
312         oldestCommitTs = ShmemVariableCache->oldestCommitTs;
313         newestCommitTs = ShmemVariableCache->newestCommitTs;
314         /* neither is invalid, or both are */
315         Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
316         LWLockRelease(CommitTsLock);
317
318         if (!TransactionIdIsValid(oldestCommitTs) ||
319                 TransactionIdPrecedes(xid, oldestCommitTs) ||
320                 TransactionIdPrecedes(newestCommitTs, xid))
321         {
322                 if (ts)
323                         *ts = 0;
324                 if (nodeid)
325                         *nodeid = InvalidCommitTsNodeId;
326                 return false;
327         }
328
329         /*
330          * Use an unlocked atomic read on our cached value in shared memory; if
331          * it's a hit, acquire a lock and read the data, after verifying that it's
332          * still what we initially read.  Otherwise, fall through to read from
333          * SLRU.
334          */
335         if (commitTsShared->xidLastCommit == xid)
336         {
337                 LWLockAcquire(CommitTsLock, LW_SHARED);
338                 if (commitTsShared->xidLastCommit == xid)
339                 {
340                         if (ts)
341                                 *ts = commitTsShared->dataLastCommit.time;
342                         if (nodeid)
343                                 *nodeid = commitTsShared->dataLastCommit.nodeid;
344
345                         LWLockRelease(CommitTsLock);
346                         return *ts != 0;
347                 }
348                 LWLockRelease(CommitTsLock);
349         }
350
351         /* lock is acquired by SimpleLruReadPage_ReadOnly */
352         slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
353         memcpy(&entry,
354                    CommitTsCtl->shared->page_buffer[slotno] +
355                    SizeOfCommitTimestampEntry * entryno,
356                    SizeOfCommitTimestampEntry);
357
358         if (ts)
359                 *ts = entry.time;
360         if (nodeid)
361                 *nodeid = entry.nodeid;
362
363         LWLockRelease(CommitTsControlLock);
364         return *ts != 0;
365 }
366
367 /*
368  * Return the Xid of the latest committed transaction.  (As far as this module
369  * is concerned, anyway; it's up to the caller to ensure the value is useful
370  * for its purposes.)
371  *
372  * ts and extra are filled with the corresponding data; they can be passed
373  * as NULL if not wanted.
374  */
375 TransactionId
376 GetLatestCommitTsData(TimestampTz *ts, CommitTsNodeId *nodeid)
377 {
378         TransactionId   xid;
379
380         /* Error if module not enabled */
381         if (!track_commit_timestamp)
382                 ereport(ERROR,
383                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
384                                  errmsg("could not get commit timestamp data"),
385                                  errhint("Make sure the configuration parameter \"%s\" is set.",
386                                                  "track_commit_timestamp")));
387
388         LWLockAcquire(CommitTsLock, LW_SHARED);
389         xid = commitTsShared->xidLastCommit;
390         if (ts)
391                 *ts = commitTsShared->dataLastCommit.time;
392         if (nodeid)
393                 *nodeid = commitTsShared->dataLastCommit.nodeid;
394         LWLockRelease(CommitTsLock);
395
396         return xid;
397 }
398
399 /*
400  * SQL-callable wrapper to obtain commit time of a transaction
401  */
402 Datum
403 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
404 {
405         TransactionId   xid = PG_GETARG_UINT32(0);
406         TimestampTz             ts;
407         bool                    found;
408
409         found = TransactionIdGetCommitTsData(xid, &ts, NULL);
410
411         if (!found)
412                 PG_RETURN_NULL();
413
414         PG_RETURN_TIMESTAMPTZ(ts);
415 }
416
417
418 Datum
419 pg_last_committed_xact(PG_FUNCTION_ARGS)
420 {
421         TransactionId   xid;
422         TimestampTz             ts;
423         Datum       values[2];
424         bool        nulls[2];
425         TupleDesc   tupdesc;
426         HeapTuple       htup;
427
428         /* and construct a tuple with our data */
429         xid = GetLatestCommitTsData(&ts, NULL);
430
431         /*
432          * Construct a tuple descriptor for the result row.  This must match this
433          * function's pg_proc entry!
434          */
435         tupdesc = CreateTemplateTupleDesc(2, false);
436         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
437                                            XIDOID, -1, 0);
438         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
439                                            TIMESTAMPTZOID, -1, 0);
440         tupdesc = BlessTupleDesc(tupdesc);
441
442         if (!TransactionIdIsNormal(xid))
443         {
444                 memset(nulls, true, sizeof(nulls));
445         }
446         else
447         {
448                 values[0] = TransactionIdGetDatum(xid);
449                 nulls[0] = false;
450
451                 values[1] = TimestampTzGetDatum(ts);
452                 nulls[1] = false;
453         }
454
455         htup = heap_form_tuple(tupdesc, values, nulls);
456
457         PG_RETURN_DATUM(HeapTupleGetDatum(htup));
458 }
459
460
461 /*
462  * Number of shared CommitTS buffers.
463  *
464  * We use a very similar logic as for the number of CLOG buffers; see comments
465  * in CLOGShmemBuffers.
466  */
467 Size
468 CommitTsShmemBuffers(void)
469 {
470         return Min(16, Max(4, NBuffers / 1024));
471 }
472
473 /*
474  * Shared memory sizing for CommitTs
475  */
476 Size
477 CommitTsShmemSize(void)
478 {
479         return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
480                 sizeof(CommitTimestampShared);
481 }
482
483 /*
484  * Initialize CommitTs at system startup (postmaster start or standalone
485  * backend)
486  */
487 void
488 CommitTsShmemInit(void)
489 {
490         bool    found;
491
492         CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
493         SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
494                                   CommitTsControlLock, "pg_commit_ts");
495
496         commitTsShared = ShmemInitStruct("CommitTs shared",
497                                                                          sizeof(CommitTimestampShared),
498                                                                          &found);
499
500         if (!IsUnderPostmaster)
501         {
502                 Assert(!found);
503
504                 commitTsShared->xidLastCommit = InvalidTransactionId;
505                 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
506                 commitTsShared->dataLastCommit.nodeid = InvalidCommitTsNodeId;
507         }
508         else
509                 Assert(found);
510 }
511
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 *
 * It is deliberately empty; see the comment in the body.
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Unlike most other SLRU modules there is nothing to do here at present:
	 * segments are created when the server is started with this module
	 * enabled.  See StartupCommitTs.
	 */
}
527
528 /*
529  * Initialize (or reinitialize) a page of CommitTs to zeroes.
530  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
531  *
532  * The page is not actually written, just set up in shared memory.
533  * The slot number of the new page is returned.
534  *
535  * Control lock must be held at entry, and will be held at exit.
536  */
537 static int
538 ZeroCommitTsPage(int pageno, bool writeXlog)
539 {
540         int                     slotno;
541
542         slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
543
544         if (writeXlog)
545                 WriteZeroPageXlogRec(pageno);
546
547         return slotno;
548 }
549
550 /*
551  * This must be called ONCE during postmaster or standalone-backend startup,
552  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
553  */
554 void
555 StartupCommitTs(void)
556 {
557         TransactionId xid = ShmemVariableCache->nextXid;
558         int                     pageno = TransactionIdToCTsPage(xid);
559
560         if (track_commit_timestamp)
561         {
562                 ActivateCommitTs();
563                 return;
564         }
565
566         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
567
568         /*
569          * Initialize our idea of the latest page number.
570          */
571         CommitTsCtl->shared->latest_page_number = pageno;
572
573         LWLockRelease(CommitTsControlLock);
574 }
575
576 /*
577  * This must be called ONCE during postmaster or standalone-backend startup,
578  * when commit timestamp is enabled, after recovery has finished.
579  */
580 void
581 CompleteCommitTsInitialization(void)
582 {
583         if (!track_commit_timestamp)
584                 DeactivateCommitTs(true);
585 }
586
587 /*
588  * Activate this module whenever necessary.
589  *              This must happen during postmaster or standalong-backend startup,
590  *              or during WAL replay anytime the track_commit_timestamp setting is
591  *              changed in the master.
592  *
593  * The reason why this SLRU needs separate activation/deactivation functions is
594  * that it can be enabled/disabled during start and the activation/deactivation
595  * on master is propagated to slave via replay. Other SLRUs don't have this
596  * property and they can be just initialized during normal startup.
597  *
598  * This is in charge of creating the currently active segment, if it's not
599  * already there.  The reason for this is that the server might have been
600  * running with this module disabled for a while and thus might have skipped
601  * the normal creation point.
602  */
603 void
604 ActivateCommitTs(void)
605 {
606         TransactionId xid = ShmemVariableCache->nextXid;
607         int                     pageno = TransactionIdToCTsPage(xid);
608
609         /*
610          * Re-Initialize our idea of the latest page number.
611          */
612         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
613         CommitTsCtl->shared->latest_page_number = pageno;
614         LWLockRelease(CommitTsControlLock);
615
616         /*
617          * If CommitTs is enabled, but it wasn't in the previous server run, we
618          * need to set the oldest and newest values to the next Xid; that way, we
619          * will not try to read data that might not have been set.
620          *
621          * XXX does this have a problem if a server is started with commitTs
622          * enabled, then started with commitTs disabled, then restarted with it
623          * enabled again?  It doesn't look like it does, because there should be a
624          * checkpoint that sets the value to InvalidTransactionId at end of
625          * recovery; and so any chance of injecting new transactions without
626          * CommitTs values would occur after the oldestCommitTs has been set to
627          * Invalid temporarily.
628          */
629         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
630         if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
631         {
632                 ShmemVariableCache->oldestCommitTs =
633                         ShmemVariableCache->newestCommitTs = ReadNewTransactionId();
634         }
635         LWLockRelease(CommitTsLock);
636
637         /* Finally, create the current segment file, if necessary */
638         if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
639         {
640                 int             slotno;
641
642                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
643                 slotno = ZeroCommitTsPage(pageno, false);
644                 SimpleLruWritePage(CommitTsCtl, slotno);
645                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
646                 LWLockRelease(CommitTsControlLock);
647         }
648 }
649
650 /*
651  * Deactivate this module.
652  *
653  * This must be called when the track_commit_timestamp parameter is turned off.
654  * This happens during postmaster or standalone-backend startup, or during WAL
655  * replay.
656  *
657  * Resets CommitTs into invalid state to make sure we don't hand back
658  * possibly-invalid data; also removes segments of old data.
659  */
660 void
661 DeactivateCommitTs(bool do_wal)
662 {
663         TransactionId xid = ShmemVariableCache->nextXid;
664         int                     pageno = TransactionIdToCTsPage(xid);
665
666         /*
667          * Re-Initialize our idea of the latest page number.
668          */
669         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
670         CommitTsCtl->shared->latest_page_number = pageno;
671         LWLockRelease(CommitTsControlLock);
672
673         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
674         ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
675         ShmemVariableCache->newestCommitTs = InvalidTransactionId;
676         LWLockRelease(CommitTsLock);
677
678         TruncateCommitTs(ReadNewTransactionId(), do_wal);
679 }
680
681 /*
682  * This must be called ONCE during postmaster or standalone-backend shutdown
683  */
684 void
685 ShutdownCommitTs(void)
686 {
687         /* Flush dirty CommitTs pages to disk */
688         SimpleLruFlush(CommitTsCtl, false);
689 }
690
691 /*
692  * Perform a checkpoint --- either during shutdown, or on-the-fly
693  */
694 void
695 CheckPointCommitTs(void)
696 {
697         /* Flush dirty CommitTs pages to disk */
698         SimpleLruFlush(CommitTsCtl, true);
699 }
700
701 /*
702  * Make sure that CommitTs has room for a newly-allocated XID.
703  *
704  * NB: this is called while holding XidGenLock.  We want it to be very fast
705  * most of the time; even when it's not so fast, no actual I/O need happen
706  * unless we're forced to write out a dirty CommitTs or xlog page to make room
707  * in shared memory.
708  *
709  * NB: the current implementation relies on track_commit_timestamp being
710  * PGC_POSTMASTER.
711  */
712 void
713 ExtendCommitTs(TransactionId newestXact)
714 {
715         int                     pageno;
716
717         /* nothing to do if module not enabled */
718         if (!track_commit_timestamp)
719                 return;
720
721         /*
722          * No work except at first XID of a page.  But beware: just after
723          * wraparound, the first XID of page zero is FirstNormalTransactionId.
724          */
725         if (TransactionIdToCTsEntry(newestXact) != 0 &&
726                 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
727                 return;
728
729         pageno = TransactionIdToCTsPage(newestXact);
730
731         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
732
733         /* Zero the page and make an XLOG entry about it */
734         ZeroCommitTsPage(pageno, !InRecovery);
735
736         LWLockRelease(CommitTsControlLock);
737 }
738
739 /*
740  * Remove all CommitTs segments before the one holding the passed
741  * transaction ID.
742  *
743  * Note that we don't need to flush XLOG here.
744  */
745 void
746 TruncateCommitTs(TransactionId oldestXact, bool do_wal)
747 {
748         int                     cutoffPage;
749
750         /*
751          * The cutoff point is the start of the segment containing oldestXact. We
752          * pass the *page* containing oldestXact to SimpleLruTruncate.
753          */
754         cutoffPage = TransactionIdToCTsPage(oldestXact);
755
756         /* Check to see if there's any files that could be removed */
757         if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
758                                                    &cutoffPage))
759                 return;                                 /* nothing to remove */
760
761         /* Write XLOG record */
762         if (do_wal)
763                 WriteTruncateXlogRec(cutoffPage);
764
765         /* Now we can remove the old CommitTs segment(s) */
766         SimpleLruTruncate(CommitTsCtl, cutoffPage);
767 }
768
769 /*
770  * Set the limit values between which commit TS can be consulted.
771  */
772 void
773 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
774 {
775         /*
776          * Be careful not to overwrite values that are either further into the
777          * "future" or signal a disabled committs.
778          */
779         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
780         if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId)
781         {
782                 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
783                         ShmemVariableCache->oldestCommitTs = oldestXact;
784                 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs))
785                         ShmemVariableCache->newestCommitTs = newestXact;
786         }
787         else
788         {
789                 Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId);
790         }
791         LWLockRelease(CommitTsLock);
792 }
793
794 /*
795  * Move forwards the oldest commitTS value that can be consulted
796  */
797 void
798 AdvanceOldestCommitTs(TransactionId oldestXact)
799 {
800         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
801         if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId &&
802                 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
803                 ShmemVariableCache->oldestCommitTs = oldestXact;
804         LWLockRelease(CommitTsLock);
805 }
806
807
808 /*
809  * Decide which of two CLOG page numbers is "older" for truncation purposes.
810  *
811  * We need to use comparison of TransactionIds here in order to do the right
812  * thing with wraparound XID arithmetic.  However, if we are asked about
813  * page number zero, we don't want to hand InvalidTransactionId to
814  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
815  * offset both xids by FirstNormalTransactionId to avoid that.
816  */
817 static bool
818 CommitTsPagePrecedes(int page1, int page2)
819 {
820         TransactionId xid1;
821         TransactionId xid2;
822
823         xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
824         xid1 += FirstNormalTransactionId;
825         xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
826         xid2 += FirstNormalTransactionId;
827
828         return TransactionIdPrecedes(xid1, xid2);
829 }
830
831
832 /*
833  * Write a ZEROPAGE xlog record
834  */
835 static void
836 WriteZeroPageXlogRec(int pageno)
837 {
838         XLogBeginInsert();
839         XLogRegisterData((char *) (&pageno), sizeof(int));
840         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
841 }
842
843 /*
844  * Write a TRUNCATE xlog record
845  */
846 static void
847 WriteTruncateXlogRec(int pageno)
848 {
849         XLogBeginInsert();
850         XLogRegisterData((char *) (&pageno), sizeof(int));
851         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
852 }
853
854 /*
855  * Write a SETTS xlog record
856  */
857 static void
858 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
859                                                  TransactionId *subxids, TimestampTz timestamp,
860                                                  CommitTsNodeId nodeid)
861 {
862         xl_commit_ts_set        record;
863
864         record.timestamp = timestamp;
865         record.nodeid = nodeid;
866         record.mainxid = mainxid;
867
868         XLogBeginInsert();
869         XLogRegisterData((char *) &record,
870                                          offsetof(xl_commit_ts_set, mainxid) +
871                                          sizeof(TransactionId));
872         XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
873         XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
874 }
875
876 /*
877  * CommitTS resource manager's routines
878  */
879 void
880 commit_ts_redo(XLogReaderState *record)
881 {
882         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
883
884         /* Backup blocks are not used in commit_ts records */
885         Assert(!XLogRecHasAnyBlockRefs(record));
886
887         if (info == COMMIT_TS_ZEROPAGE)
888         {
889                 int                     pageno;
890                 int                     slotno;
891
892                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
893
894                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
895
896                 slotno = ZeroCommitTsPage(pageno, false);
897                 SimpleLruWritePage(CommitTsCtl, slotno);
898                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
899
900                 LWLockRelease(CommitTsControlLock);
901         }
902         else if (info == COMMIT_TS_TRUNCATE)
903         {
904                 int                     pageno;
905
906                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
907
908                 /*
909                  * During XLOG replay, latest_page_number isn't set up yet; insert a
910                  * suitable value to bypass the sanity test in SimpleLruTruncate.
911                  */
912                 CommitTsCtl->shared->latest_page_number = pageno;
913
914                 SimpleLruTruncate(CommitTsCtl, pageno);
915         }
916         else if (info == COMMIT_TS_SETTS)
917         {
918                 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
919                 int                     nsubxids;
920                 TransactionId *subxids;
921
922                 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
923                                         sizeof(TransactionId));
924                 if (nsubxids > 0)
925                 {
926                         subxids = palloc(sizeof(TransactionId) * nsubxids);
927                         memcpy(subxids,
928                                    XLogRecGetData(record) + SizeOfCommitTsSet,
929                                    sizeof(TransactionId) * nsubxids);
930                 }
931                 else
932                         subxids = NULL;
933
934                 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
935                                                                            setts->timestamp, setts->nodeid, false);
936                 if (subxids)
937                         pfree(subxids);
938         }
939         else
940                 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
941 }