]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/commit_ts.c
Introduce replication progress tracking infrastructure.
[postgresql] / src / backend / access / transam / commit_ts.c
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  *              PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_clog-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
38
39 /*
40  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
41  * everywhere else in Postgres.
42  *
43  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44  * CommitTs page numbering also wraps around at
45  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
47  * explicit notice of that fact in this module, except when comparing segment
48  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49  */
50
51 /*
52  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
53  * the largest possible file name is more than 5 chars long; see
54  * SlruScanDirectory.
55  */
56 typedef struct CommitTimestampEntry
57 {
58         TimestampTz             time;
59         RepOriginId             nodeid;
60 } CommitTimestampEntry;
61
62 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
63                                                                         sizeof(RepOriginId))
64
/* How many entries fit into one BLCKSZ-sized CommitTs page. */
#define COMMIT_TS_XACTS_PER_PAGE (BLCKSZ / SizeOfCommitTimestampEntry)

/*
 * Map a transaction id to the page holding its entry, and to the index of
 * that entry within the page.
 */
#define TransactionIdToCTsPage(xid) \
    ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
#define TransactionIdToCTsEntry(xid) \
    ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72
73 /*
74  * Link to shared-memory data structures for CommitTs control
75  */
76 static SlruCtlData CommitTsCtlData;
77
78 #define CommitTsCtl (&CommitTsCtlData)
79
80 /*
81  * We keep a cache of the last value set in shared memory.  This is protected
82  * by CommitTsLock.
83  */
84 typedef struct CommitTimestampShared
85 {
86         TransactionId   xidLastCommit;
87         CommitTimestampEntry dataLastCommit;
88 } CommitTimestampShared;
89
90 CommitTimestampShared   *commitTsShared;
91
92
/* GUC variable: when false, this module neither records nor returns data */
bool        track_commit_timestamp;
95
96 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
97                                          TransactionId *subxids, TimestampTz ts,
98                                          RepOriginId nodeid, int pageno);
99 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
100                                                   RepOriginId nodeid, int slotno);
101 static int      ZeroCommitTsPage(int pageno, bool writeXlog);
102 static bool CommitTsPagePrecedes(int page1, int page2);
103 static void WriteZeroPageXlogRec(int pageno);
104 static void WriteTruncateXlogRec(int pageno);
105 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
106                                                  TransactionId *subxids, TimestampTz timestamp,
107                                                  RepOriginId nodeid);
108
109 /*
110  * TransactionTreeSetCommitTsData
111  *
112  * Record the final commit timestamp of transaction entries in the commit log
113  * for a transaction and its subtransaction tree, as efficiently as possible.
114  *
115  * xid is the top level transaction id.
116  *
117  * subxids is an array of xids of length nsubxids, representing subtransactions
118  * in the tree of xid. In various cases nsubxids may be zero.
119  * The reason why tracking just the parent xid commit timestamp is not enough
120  * is that the subtrans SLRU does not stay valid across crashes (it's not
121  * permanent) so we need to keep the information about them here. If the
122  * subtrans implementation changes in the future, we might want to revisit the
123  * decision of storing timestamp info for each subxid.
124  *
125  * The do_xlog parameter tells us whether to include a XLog record of this
126  * or not.  Normal path through RecordTransactionCommit() will be related
127  * to a transaction commit XLog record, and so should pass "false" here.
128  * Other callers probably want to pass true, so that the given values persist
129  * in case of crashes.
130  */
131 void
132 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
133                                                            TransactionId *subxids, TimestampTz timestamp,
134                                                            RepOriginId nodeid, bool do_xlog)
135 {
136         int                     i;
137         TransactionId headxid;
138         TransactionId newestXact;
139
140         if (!track_commit_timestamp)
141                 return;
142
143         /*
144          * Comply with the WAL-before-data rule: if caller specified it wants
145          * this value to be recorded in WAL, do so before touching the data.
146          */
147         if (do_xlog)
148                 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
149
150         /*
151          * Figure out the latest Xid in this batch: either the last subxid if
152          * there's any, otherwise the parent xid.
153          */
154         if (nsubxids > 0)
155                 newestXact = subxids[nsubxids - 1];
156         else
157                 newestXact = xid;
158
159         /*
160          * We split the xids to set the timestamp to in groups belonging to the
161          * same SLRU page; the first element in each such set is its head.  The
162          * first group has the main XID as the head; subsequent sets use the
163          * first subxid not on the previous page as head.  This way, we only have
164          * to lock/modify each SLRU page once.
165          */
166         for (i = 0, headxid = xid;;)
167         {
168                 int                     pageno = TransactionIdToCTsPage(headxid);
169                 int                     j;
170
171                 for (j = i; j < nsubxids; j++)
172                 {
173                         if (TransactionIdToCTsPage(subxids[j]) != pageno)
174                                 break;
175                 }
176                 /* subxids[i..j] are on the same page as the head */
177
178                 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
179                                                          pageno);
180
181                 /* if we wrote out all subxids, we're done. */
182                 if (j + 1 >= nsubxids)
183                         break;
184
185                 /*
186                  * Set the new head and skip over it, as well as over the subxids
187                  * we just wrote.
188                  */
189                 headxid = subxids[j];
190                 i += j - i + 1;
191         }
192
193         /* update the cached value in shared memory */
194         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
195         commitTsShared->xidLastCommit = xid;
196         commitTsShared->dataLastCommit.time = timestamp;
197         commitTsShared->dataLastCommit.nodeid = nodeid;
198
199         /* and move forwards our endpoint, if needed */
200         if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact))
201                 ShmemVariableCache->newestCommitTs = newestXact;
202         LWLockRelease(CommitTsLock);
203 }
204
205 /*
206  * Record the commit timestamp of transaction entries in the commit log for all
207  * entries on a single page.  Atomic only on this page.
208  */
209 static void
210 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
211                                          TransactionId *subxids, TimestampTz ts,
212                                          RepOriginId nodeid, int pageno)
213 {
214         int                     slotno;
215         int                     i;
216
217         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
218
219         slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
220
221         TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
222         for (i = 0; i < nsubxids; i++)
223                 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
224
225         CommitTsCtl->shared->page_dirty[slotno] = true;
226
227         LWLockRelease(CommitTsControlLock);
228 }
229
230 /*
231  * Sets the commit timestamp of a single transaction.
232  *
233  * Must be called with CommitTsControlLock held
234  */
235 static void
236 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
237                                                  RepOriginId nodeid, int slotno)
238 {
239         int                     entryno = TransactionIdToCTsEntry(xid);
240         CommitTimestampEntry entry;
241
242         Assert(TransactionIdIsNormal(xid));
243
244         entry.time = ts;
245         entry.nodeid = nodeid;
246
247         memcpy(CommitTsCtl->shared->page_buffer[slotno] +
248                    SizeOfCommitTimestampEntry * entryno,
249                    &entry, SizeOfCommitTimestampEntry);
250 }
251
252 /*
253  * Interrogate the commit timestamp of a transaction.
254  *
255  * Return value indicates whether commit timestamp record was found for
256  * given xid.
257  */
258 bool
259 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
260                                                          RepOriginId *nodeid)
261 {
262         int                     pageno = TransactionIdToCTsPage(xid);
263         int                     entryno = TransactionIdToCTsEntry(xid);
264         int                     slotno;
265         CommitTimestampEntry entry;
266         TransactionId oldestCommitTs;
267         TransactionId newestCommitTs;
268
269         /* Error if module not enabled */
270         if (!track_commit_timestamp)
271                 ereport(ERROR,
272                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
273                                  errmsg("could not get commit timestamp data"),
274                                  errhint("Make sure the configuration parameter \"%s\" is set.",
275                                                  "track_commit_timestamp")));
276
277         /* error if the given Xid doesn't normally commit */
278         if (!TransactionIdIsNormal(xid))
279                 ereport(ERROR,
280                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
281                                  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
282
283         /*
284          * Return empty if the requested value is outside our valid range.
285          */
286         LWLockAcquire(CommitTsLock, LW_SHARED);
287         oldestCommitTs = ShmemVariableCache->oldestCommitTs;
288         newestCommitTs = ShmemVariableCache->newestCommitTs;
289         /* neither is invalid, or both are */
290         Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
291         LWLockRelease(CommitTsLock);
292
293         if (!TransactionIdIsValid(oldestCommitTs) ||
294                 TransactionIdPrecedes(xid, oldestCommitTs) ||
295                 TransactionIdPrecedes(newestCommitTs, xid))
296         {
297                 if (ts)
298                         *ts = 0;
299                 if (nodeid)
300                         *nodeid = InvalidRepOriginId;
301                 return false;
302         }
303
304         /*
305          * Use an unlocked atomic read on our cached value in shared memory; if
306          * it's a hit, acquire a lock and read the data, after verifying that it's
307          * still what we initially read.  Otherwise, fall through to read from
308          * SLRU.
309          */
310         if (commitTsShared->xidLastCommit == xid)
311         {
312                 LWLockAcquire(CommitTsLock, LW_SHARED);
313                 if (commitTsShared->xidLastCommit == xid)
314                 {
315                         if (ts)
316                                 *ts = commitTsShared->dataLastCommit.time;
317                         if (nodeid)
318                                 *nodeid = commitTsShared->dataLastCommit.nodeid;
319
320                         LWLockRelease(CommitTsLock);
321                         return *ts != 0;
322                 }
323                 LWLockRelease(CommitTsLock);
324         }
325
326         /* lock is acquired by SimpleLruReadPage_ReadOnly */
327         slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
328         memcpy(&entry,
329                    CommitTsCtl->shared->page_buffer[slotno] +
330                    SizeOfCommitTimestampEntry * entryno,
331                    SizeOfCommitTimestampEntry);
332
333         if (ts)
334                 *ts = entry.time;
335         if (nodeid)
336                 *nodeid = entry.nodeid;
337
338         LWLockRelease(CommitTsControlLock);
339         return *ts != 0;
340 }
341
342 /*
343  * Return the Xid of the latest committed transaction.  (As far as this module
344  * is concerned, anyway; it's up to the caller to ensure the value is useful
345  * for its purposes.)
346  *
347  * ts and extra are filled with the corresponding data; they can be passed
348  * as NULL if not wanted.
349  */
350 TransactionId
351 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
352 {
353         TransactionId   xid;
354
355         /* Error if module not enabled */
356         if (!track_commit_timestamp)
357                 ereport(ERROR,
358                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
359                                  errmsg("could not get commit timestamp data"),
360                                  errhint("Make sure the configuration parameter \"%s\" is set.",
361                                                  "track_commit_timestamp")));
362
363         LWLockAcquire(CommitTsLock, LW_SHARED);
364         xid = commitTsShared->xidLastCommit;
365         if (ts)
366                 *ts = commitTsShared->dataLastCommit.time;
367         if (nodeid)
368                 *nodeid = commitTsShared->dataLastCommit.nodeid;
369         LWLockRelease(CommitTsLock);
370
371         return xid;
372 }
373
374 /*
375  * SQL-callable wrapper to obtain commit time of a transaction
376  */
377 Datum
378 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
379 {
380         TransactionId   xid = PG_GETARG_UINT32(0);
381         TimestampTz             ts;
382         bool                    found;
383
384         found = TransactionIdGetCommitTsData(xid, &ts, NULL);
385
386         if (!found)
387                 PG_RETURN_NULL();
388
389         PG_RETURN_TIMESTAMPTZ(ts);
390 }
391
392
393 Datum
394 pg_last_committed_xact(PG_FUNCTION_ARGS)
395 {
396         TransactionId   xid;
397         TimestampTz             ts;
398         Datum       values[2];
399         bool        nulls[2];
400         TupleDesc   tupdesc;
401         HeapTuple       htup;
402
403         /* and construct a tuple with our data */
404         xid = GetLatestCommitTsData(&ts, NULL);
405
406         /*
407          * Construct a tuple descriptor for the result row.  This must match this
408          * function's pg_proc entry!
409          */
410         tupdesc = CreateTemplateTupleDesc(2, false);
411         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
412                                            XIDOID, -1, 0);
413         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
414                                            TIMESTAMPTZOID, -1, 0);
415         tupdesc = BlessTupleDesc(tupdesc);
416
417         if (!TransactionIdIsNormal(xid))
418         {
419                 memset(nulls, true, sizeof(nulls));
420         }
421         else
422         {
423                 values[0] = TransactionIdGetDatum(xid);
424                 nulls[0] = false;
425
426                 values[1] = TimestampTzGetDatum(ts);
427                 nulls[1] = false;
428         }
429
430         htup = heap_form_tuple(tupdesc, values, nulls);
431
432         PG_RETURN_DATUM(HeapTupleGetDatum(htup));
433 }
434
435
436 /*
437  * Number of shared CommitTS buffers.
438  *
439  * We use a very similar logic as for the number of CLOG buffers; see comments
440  * in CLOGShmemBuffers.
441  */
442 Size
443 CommitTsShmemBuffers(void)
444 {
445         return Min(16, Max(4, NBuffers / 1024));
446 }
447
448 /*
449  * Shared memory sizing for CommitTs
450  */
451 Size
452 CommitTsShmemSize(void)
453 {
454         return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
455                 sizeof(CommitTimestampShared);
456 }
457
458 /*
459  * Initialize CommitTs at system startup (postmaster start or standalone
460  * backend)
461  */
462 void
463 CommitTsShmemInit(void)
464 {
465         bool    found;
466
467         CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
468         SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
469                                   CommitTsControlLock, "pg_commit_ts");
470
471         commitTsShared = ShmemInitStruct("CommitTs shared",
472                                                                          sizeof(CommitTimestampShared),
473                                                                          &found);
474
475         if (!IsUnderPostmaster)
476         {
477                 Assert(!found);
478
479                 commitTsShared->xidLastCommit = InvalidTransactionId;
480                 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
481                 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
482         }
483         else
484                 Assert(found);
485 }
486
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 *
 * It is currently a no-op.
 */
void
BootStrapCommitTs(void)
{
    /*
     * Unlike most other SLRU modules there is nothing to do here at present:
     * segments are created when the server is started with this module
     * enabled.  See StartupCommitTs.
     */
}
502
503 /*
504  * Initialize (or reinitialize) a page of CommitTs to zeroes.
505  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
506  *
507  * The page is not actually written, just set up in shared memory.
508  * The slot number of the new page is returned.
509  *
510  * Control lock must be held at entry, and will be held at exit.
511  */
512 static int
513 ZeroCommitTsPage(int pageno, bool writeXlog)
514 {
515         int                     slotno;
516
517         slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
518
519         if (writeXlog)
520                 WriteZeroPageXlogRec(pageno);
521
522         return slotno;
523 }
524
525 /*
526  * This must be called ONCE during postmaster or standalone-backend startup,
527  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
528  */
529 void
530 StartupCommitTs(void)
531 {
532         TransactionId xid = ShmemVariableCache->nextXid;
533         int                     pageno = TransactionIdToCTsPage(xid);
534
535         if (track_commit_timestamp)
536         {
537                 ActivateCommitTs();
538                 return;
539         }
540
541         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
542
543         /*
544          * Initialize our idea of the latest page number.
545          */
546         CommitTsCtl->shared->latest_page_number = pageno;
547
548         LWLockRelease(CommitTsControlLock);
549 }
550
551 /*
552  * This must be called ONCE during postmaster or standalone-backend startup,
553  * when commit timestamp is enabled, after recovery has finished.
554  */
555 void
556 CompleteCommitTsInitialization(void)
557 {
558         if (!track_commit_timestamp)
559                 DeactivateCommitTs(true);
560 }
561
562 /*
563  * Activate this module whenever necessary.
564  *              This must happen during postmaster or standalong-backend startup,
565  *              or during WAL replay anytime the track_commit_timestamp setting is
566  *              changed in the master.
567  *
568  * The reason why this SLRU needs separate activation/deactivation functions is
569  * that it can be enabled/disabled during start and the activation/deactivation
570  * on master is propagated to slave via replay. Other SLRUs don't have this
571  * property and they can be just initialized during normal startup.
572  *
573  * This is in charge of creating the currently active segment, if it's not
574  * already there.  The reason for this is that the server might have been
575  * running with this module disabled for a while and thus might have skipped
576  * the normal creation point.
577  */
578 void
579 ActivateCommitTs(void)
580 {
581         TransactionId xid = ShmemVariableCache->nextXid;
582         int                     pageno = TransactionIdToCTsPage(xid);
583
584         /*
585          * Re-Initialize our idea of the latest page number.
586          */
587         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
588         CommitTsCtl->shared->latest_page_number = pageno;
589         LWLockRelease(CommitTsControlLock);
590
591         /*
592          * If CommitTs is enabled, but it wasn't in the previous server run, we
593          * need to set the oldest and newest values to the next Xid; that way, we
594          * will not try to read data that might not have been set.
595          *
596          * XXX does this have a problem if a server is started with commitTs
597          * enabled, then started with commitTs disabled, then restarted with it
598          * enabled again?  It doesn't look like it does, because there should be a
599          * checkpoint that sets the value to InvalidTransactionId at end of
600          * recovery; and so any chance of injecting new transactions without
601          * CommitTs values would occur after the oldestCommitTs has been set to
602          * Invalid temporarily.
603          */
604         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
605         if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
606         {
607                 ShmemVariableCache->oldestCommitTs =
608                         ShmemVariableCache->newestCommitTs = ReadNewTransactionId();
609         }
610         LWLockRelease(CommitTsLock);
611
612         /* Finally, create the current segment file, if necessary */
613         if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
614         {
615                 int             slotno;
616
617                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
618                 slotno = ZeroCommitTsPage(pageno, false);
619                 SimpleLruWritePage(CommitTsCtl, slotno);
620                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
621                 LWLockRelease(CommitTsControlLock);
622         }
623 }
624
625 /*
626  * Deactivate this module.
627  *
628  * This must be called when the track_commit_timestamp parameter is turned off.
629  * This happens during postmaster or standalone-backend startup, or during WAL
630  * replay.
631  *
632  * Resets CommitTs into invalid state to make sure we don't hand back
633  * possibly-invalid data; also removes segments of old data.
634  */
635 void
636 DeactivateCommitTs(bool do_wal)
637 {
638         TransactionId xid = ShmemVariableCache->nextXid;
639         int                     pageno = TransactionIdToCTsPage(xid);
640
641         /*
642          * Re-Initialize our idea of the latest page number.
643          */
644         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
645         CommitTsCtl->shared->latest_page_number = pageno;
646         LWLockRelease(CommitTsControlLock);
647
648         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
649         ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
650         ShmemVariableCache->newestCommitTs = InvalidTransactionId;
651         LWLockRelease(CommitTsLock);
652
653         TruncateCommitTs(ReadNewTransactionId(), do_wal);
654 }
655
656 /*
657  * This must be called ONCE during postmaster or standalone-backend shutdown
658  */
659 void
660 ShutdownCommitTs(void)
661 {
662         /* Flush dirty CommitTs pages to disk */
663         SimpleLruFlush(CommitTsCtl, false);
664 }
665
666 /*
667  * Perform a checkpoint --- either during shutdown, or on-the-fly
668  */
669 void
670 CheckPointCommitTs(void)
671 {
672         /* Flush dirty CommitTs pages to disk */
673         SimpleLruFlush(CommitTsCtl, true);
674 }
675
676 /*
677  * Make sure that CommitTs has room for a newly-allocated XID.
678  *
679  * NB: this is called while holding XidGenLock.  We want it to be very fast
680  * most of the time; even when it's not so fast, no actual I/O need happen
681  * unless we're forced to write out a dirty CommitTs or xlog page to make room
682  * in shared memory.
683  *
684  * NB: the current implementation relies on track_commit_timestamp being
685  * PGC_POSTMASTER.
686  */
687 void
688 ExtendCommitTs(TransactionId newestXact)
689 {
690         int                     pageno;
691
692         /* nothing to do if module not enabled */
693         if (!track_commit_timestamp)
694                 return;
695
696         /*
697          * No work except at first XID of a page.  But beware: just after
698          * wraparound, the first XID of page zero is FirstNormalTransactionId.
699          */
700         if (TransactionIdToCTsEntry(newestXact) != 0 &&
701                 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
702                 return;
703
704         pageno = TransactionIdToCTsPage(newestXact);
705
706         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
707
708         /* Zero the page and make an XLOG entry about it */
709         ZeroCommitTsPage(pageno, !InRecovery);
710
711         LWLockRelease(CommitTsControlLock);
712 }
713
714 /*
715  * Remove all CommitTs segments before the one holding the passed
716  * transaction ID.
717  *
718  * Note that we don't need to flush XLOG here.
719  */
720 void
721 TruncateCommitTs(TransactionId oldestXact, bool do_wal)
722 {
723         int                     cutoffPage;
724
725         /*
726          * The cutoff point is the start of the segment containing oldestXact. We
727          * pass the *page* containing oldestXact to SimpleLruTruncate.
728          */
729         cutoffPage = TransactionIdToCTsPage(oldestXact);
730
731         /* Check to see if there's any files that could be removed */
732         if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
733                                                    &cutoffPage))
734                 return;                                 /* nothing to remove */
735
736         /* Write XLOG record */
737         if (do_wal)
738                 WriteTruncateXlogRec(cutoffPage);
739
740         /* Now we can remove the old CommitTs segment(s) */
741         SimpleLruTruncate(CommitTsCtl, cutoffPage);
742 }
743
744 /*
745  * Set the limit values between which commit TS can be consulted.
746  */
747 void
748 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
749 {
750         /*
751          * Be careful not to overwrite values that are either further into the
752          * "future" or signal a disabled committs.
753          */
754         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
755         if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId)
756         {
757                 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
758                         ShmemVariableCache->oldestCommitTs = oldestXact;
759                 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs))
760                         ShmemVariableCache->newestCommitTs = newestXact;
761         }
762         else
763         {
764                 Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId);
765         }
766         LWLockRelease(CommitTsLock);
767 }
768
769 /*
770  * Move forwards the oldest commitTS value that can be consulted
771  */
772 void
773 AdvanceOldestCommitTs(TransactionId oldestXact)
774 {
775         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
776         if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId &&
777                 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
778                 ShmemVariableCache->oldestCommitTs = oldestXact;
779         LWLockRelease(CommitTsLock);
780 }
781
782
783 /*
784  * Decide which of two CLOG page numbers is "older" for truncation purposes.
785  *
786  * We need to use comparison of TransactionIds here in order to do the right
787  * thing with wraparound XID arithmetic.  However, if we are asked about
788  * page number zero, we don't want to hand InvalidTransactionId to
789  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
790  * offset both xids by FirstNormalTransactionId to avoid that.
791  */
792 static bool
793 CommitTsPagePrecedes(int page1, int page2)
794 {
795         TransactionId xid1;
796         TransactionId xid2;
797
798         xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
799         xid1 += FirstNormalTransactionId;
800         xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
801         xid2 += FirstNormalTransactionId;
802
803         return TransactionIdPrecedes(xid1, xid2);
804 }
805
806
807 /*
808  * Write a ZEROPAGE xlog record
809  */
810 static void
811 WriteZeroPageXlogRec(int pageno)
812 {
813         XLogBeginInsert();
814         XLogRegisterData((char *) (&pageno), sizeof(int));
815         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
816 }
817
818 /*
819  * Write a TRUNCATE xlog record
820  */
821 static void
822 WriteTruncateXlogRec(int pageno)
823 {
824         XLogBeginInsert();
825         XLogRegisterData((char *) (&pageno), sizeof(int));
826         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
827 }
828
829 /*
830  * Write a SETTS xlog record
831  */
832 static void
833 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
834                                                  TransactionId *subxids, TimestampTz timestamp,
835                                                  RepOriginId nodeid)
836 {
837         xl_commit_ts_set        record;
838
839         record.timestamp = timestamp;
840         record.nodeid = nodeid;
841         record.mainxid = mainxid;
842
843         XLogBeginInsert();
844         XLogRegisterData((char *) &record,
845                                          offsetof(xl_commit_ts_set, mainxid) +
846                                          sizeof(TransactionId));
847         XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
848         XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
849 }
850
851 /*
852  * CommitTS resource manager's routines
853  */
854 void
855 commit_ts_redo(XLogReaderState *record)
856 {
857         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
858
859         /* Backup blocks are not used in commit_ts records */
860         Assert(!XLogRecHasAnyBlockRefs(record));
861
862         if (info == COMMIT_TS_ZEROPAGE)
863         {
864                 int                     pageno;
865                 int                     slotno;
866
867                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
868
869                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
870
871                 slotno = ZeroCommitTsPage(pageno, false);
872                 SimpleLruWritePage(CommitTsCtl, slotno);
873                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
874
875                 LWLockRelease(CommitTsControlLock);
876         }
877         else if (info == COMMIT_TS_TRUNCATE)
878         {
879                 int                     pageno;
880
881                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
882
883                 /*
884                  * During XLOG replay, latest_page_number isn't set up yet; insert a
885                  * suitable value to bypass the sanity test in SimpleLruTruncate.
886                  */
887                 CommitTsCtl->shared->latest_page_number = pageno;
888
889                 SimpleLruTruncate(CommitTsCtl, pageno);
890         }
891         else if (info == COMMIT_TS_SETTS)
892         {
893                 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
894                 int                     nsubxids;
895                 TransactionId *subxids;
896
897                 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
898                                         sizeof(TransactionId));
899                 if (nsubxids > 0)
900                 {
901                         subxids = palloc(sizeof(TransactionId) * nsubxids);
902                         memcpy(subxids,
903                                    XLogRecGetData(record) + SizeOfCommitTsSet,
904                                    sizeof(TransactionId) * nsubxids);
905                 }
906                 else
907                         subxids = NULL;
908
909                 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
910                                                                            setts->timestamp, setts->nodeid, false);
911                 if (subxids)
912                         pfree(subxids);
913         }
914         else
915                 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
916 }