]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/commit_ts.c
79ca04a6eafd285d44e2db6e6d57fc6a54990ed0
[postgresql] / src / backend / access / transam / commit_ts.c
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  *              PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_clog-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
38
39 /*
40  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
41  * everywhere else in Postgres.
42  *
43  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44  * CommitTs page numbering also wraps around at
45  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
47  * explicit notice of that fact in this module, except when comparing segment
48  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49  */
50
51 /*
52  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
53  * the largest possible file name is more than 5 chars long; see
54  * SlruScanDirectory.
55  */
56 typedef struct CommitTimestampEntry
57 {
58         TimestampTz time;
59         RepOriginId nodeid;
60 } CommitTimestampEntry;
61
62 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
63                                                                         sizeof(RepOriginId))
64
65 #define COMMIT_TS_XACTS_PER_PAGE \
66         (BLCKSZ / SizeOfCommitTimestampEntry)
67
68 #define TransactionIdToCTsPage(xid) \
69         ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
70 #define TransactionIdToCTsEntry(xid)    \
71         ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72
73 /*
74  * Link to shared-memory data structures for CommitTs control
75  */
76 static SlruCtlData CommitTsCtlData;
77
78 #define CommitTsCtl (&CommitTsCtlData)
79
80 /*
81  * We keep a cache of the last value set in shared memory.  This is protected
82  * by CommitTsLock.
83  */
84 typedef struct CommitTimestampShared
85 {
86         TransactionId xidLastCommit;
87         CommitTimestampEntry dataLastCommit;
88 } CommitTimestampShared;
89
90 CommitTimestampShared *commitTsShared;
91
92
93 /* GUC variable */
94 bool            track_commit_timestamp;
95
96 /*
97  * When this is set, commit_ts is force-enabled during recovery.  This is so
98  * that a standby can replay WAL records coming from a master with the setting
99  * enabled.  (Note that this doesn't enable SQL access to the data; it's
100  * effectively write-only until the GUC itself is enabled.)
101  */
102 static bool             enable_during_recovery;
103
104 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
105                                          TransactionId *subxids, TimestampTz ts,
106                                          RepOriginId nodeid, int pageno);
107 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
108                                                  RepOriginId nodeid, int slotno);
109 static int      ZeroCommitTsPage(int pageno, bool writeXlog);
110 static bool CommitTsPagePrecedes(int page1, int page2);
111 static void ActivateCommitTs(void);
112 static void DeactivateCommitTs(bool do_wal);
113 static void WriteZeroPageXlogRec(int pageno);
114 static void WriteTruncateXlogRec(int pageno);
115 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
116                                                  TransactionId *subxids, TimestampTz timestamp,
117                                                  RepOriginId nodeid);
118
119 /*
120  * TransactionTreeSetCommitTsData
121  *
122  * Record the final commit timestamp of transaction entries in the commit log
123  * for a transaction and its subtransaction tree, as efficiently as possible.
124  *
125  * xid is the top level transaction id.
126  *
127  * subxids is an array of xids of length nsubxids, representing subtransactions
128  * in the tree of xid. In various cases nsubxids may be zero.
129  * The reason why tracking just the parent xid commit timestamp is not enough
130  * is that the subtrans SLRU does not stay valid across crashes (it's not
131  * permanent) so we need to keep the information about them here. If the
132  * subtrans implementation changes in the future, we might want to revisit the
133  * decision of storing timestamp info for each subxid.
134  *
135  * The write_xlog parameter tells us whether to include an XLog record of this
136  * or not.  Normally, this is called from transaction commit routines (both
137  * normal and prepared) and the information will be stored in the transaction
138  * commit XLog record, and so they should pass "false" for this.  The XLog redo
139  * code should use "false" here as well.  Other callers probably want to pass
140  * true, so that the given values persist in case of crashes.
141  */
142 void
143 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
144                                                            TransactionId *subxids, TimestampTz timestamp,
145                                                            RepOriginId nodeid, bool write_xlog)
146 {
147         int                     i;
148         TransactionId headxid;
149         TransactionId newestXact;
150
151         /*
152          * No-op if the module is not enabled, but allow writes in a standby
153          * during recovery.
154          */
155         if (!track_commit_timestamp && !enable_during_recovery)
156                 return;
157
158         /*
159          * Comply with the WAL-before-data rule: if caller specified it wants this
160          * value to be recorded in WAL, do so before touching the data.
161          */
162         if (write_xlog)
163                 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
164
165         /*
166          * Figure out the latest Xid in this batch: either the last subxid if
167          * there's any, otherwise the parent xid.
168          */
169         if (nsubxids > 0)
170                 newestXact = subxids[nsubxids - 1];
171         else
172                 newestXact = xid;
173
174         /*
175          * We split the xids to set the timestamp to in groups belonging to the
176          * same SLRU page; the first element in each such set is its head.  The
177          * first group has the main XID as the head; subsequent sets use the first
178          * subxid not on the previous page as head.  This way, we only have to
179          * lock/modify each SLRU page once.
180          */
181         for (i = 0, headxid = xid;;)
182         {
183                 int                     pageno = TransactionIdToCTsPage(headxid);
184                 int                     j;
185
186                 for (j = i; j < nsubxids; j++)
187                 {
188                         if (TransactionIdToCTsPage(subxids[j]) != pageno)
189                                 break;
190                 }
191                 /* subxids[i..j] are on the same page as the head */
192
193                 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
194                                                          pageno);
195
196                 /* if we wrote out all subxids, we're done. */
197                 if (j + 1 >= nsubxids)
198                         break;
199
200                 /*
201                  * Set the new head and skip over it, as well as over the subxids we
202                  * just wrote.
203                  */
204                 headxid = subxids[j];
205                 i += j - i + 1;
206         }
207
208         /* update the cached value in shared memory */
209         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
210         commitTsShared->xidLastCommit = xid;
211         commitTsShared->dataLastCommit.time = timestamp;
212         commitTsShared->dataLastCommit.nodeid = nodeid;
213
214         /* and move forwards our endpoint, if needed */
215         if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact))
216                 ShmemVariableCache->newestCommitTs = newestXact;
217         LWLockRelease(CommitTsLock);
218 }
219
220 /*
221  * Record the commit timestamp of transaction entries in the commit log for all
222  * entries on a single page.  Atomic only on this page.
223  */
224 static void
225 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
226                                          TransactionId *subxids, TimestampTz ts,
227                                          RepOriginId nodeid, int pageno)
228 {
229         int                     slotno;
230         int                     i;
231
232         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
233
234         slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
235
236         TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
237         for (i = 0; i < nsubxids; i++)
238                 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
239
240         CommitTsCtl->shared->page_dirty[slotno] = true;
241
242         LWLockRelease(CommitTsControlLock);
243 }
244
245 /*
246  * Sets the commit timestamp of a single transaction.
247  *
248  * Must be called with CommitTsControlLock held
249  */
250 static void
251 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
252                                                  RepOriginId nodeid, int slotno)
253 {
254         int                     entryno = TransactionIdToCTsEntry(xid);
255         CommitTimestampEntry entry;
256
257         Assert(TransactionIdIsNormal(xid));
258
259         entry.time = ts;
260         entry.nodeid = nodeid;
261
262         memcpy(CommitTsCtl->shared->page_buffer[slotno] +
263                    SizeOfCommitTimestampEntry * entryno,
264                    &entry, SizeOfCommitTimestampEntry);
265 }
266
267 /*
268  * Interrogate the commit timestamp of a transaction.
269  *
270  * The return value indicates whether a commit timestamp record was found for
271  * the given xid.  The timestamp value is returned in *ts (which may not be
272  * null), and the origin node for the Xid is returned in *nodeid, if it's not
273  * null.
274  */
275 bool
276 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
277                                                          RepOriginId *nodeid)
278 {
279         int                     pageno = TransactionIdToCTsPage(xid);
280         int                     entryno = TransactionIdToCTsEntry(xid);
281         int                     slotno;
282         CommitTimestampEntry entry;
283         TransactionId oldestCommitTs;
284         TransactionId newestCommitTs;
285
286         /* Error if module not enabled */
287         if (!track_commit_timestamp)
288                 ereport(ERROR,
289                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
290                                  errmsg("could not get commit timestamp data"),
291                           errhint("Make sure the configuration parameter \"%s\" is set.",
292                                           "track_commit_timestamp")));
293
294         /* error if the given Xid doesn't normally commit */
295         if (!TransactionIdIsNormal(xid))
296                 ereport(ERROR,
297                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
298                 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
299
300         /*
301          * Return empty if the requested value is outside our valid range.
302          */
303         LWLockAcquire(CommitTsLock, LW_SHARED);
304         oldestCommitTs = ShmemVariableCache->oldestCommitTs;
305         newestCommitTs = ShmemVariableCache->newestCommitTs;
306         /* neither is invalid, or both are */
307         Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
308         LWLockRelease(CommitTsLock);
309
310         if (!TransactionIdIsValid(oldestCommitTs) ||
311                 TransactionIdPrecedes(xid, oldestCommitTs) ||
312                 TransactionIdPrecedes(newestCommitTs, xid))
313         {
314                 *ts = 0;
315                 if (nodeid)
316                         *nodeid = InvalidRepOriginId;
317                 return false;
318         }
319
320         /*
321          * Use an unlocked atomic read on our cached value in shared memory; if
322          * it's a hit, acquire a lock and read the data, after verifying that it's
323          * still what we initially read.  Otherwise, fall through to read from
324          * SLRU.
325          */
326         if (commitTsShared->xidLastCommit == xid)
327         {
328                 LWLockAcquire(CommitTsLock, LW_SHARED);
329                 if (commitTsShared->xidLastCommit == xid)
330                 {
331                         *ts = commitTsShared->dataLastCommit.time;
332                         if (nodeid)
333                                 *nodeid = commitTsShared->dataLastCommit.nodeid;
334
335                         LWLockRelease(CommitTsLock);
336                         return *ts != 0;
337                 }
338                 LWLockRelease(CommitTsLock);
339         }
340
341         /* lock is acquired by SimpleLruReadPage_ReadOnly */
342         slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
343         memcpy(&entry,
344                    CommitTsCtl->shared->page_buffer[slotno] +
345                    SizeOfCommitTimestampEntry * entryno,
346                    SizeOfCommitTimestampEntry);
347
348         *ts = entry.time;
349         if (nodeid)
350                 *nodeid = entry.nodeid;
351
352         LWLockRelease(CommitTsControlLock);
353         return *ts != 0;
354 }
355
356 /*
357  * Return the Xid of the latest committed transaction.  (As far as this module
358  * is concerned, anyway; it's up to the caller to ensure the value is useful
359  * for its purposes.)
360  *
361  * ts and extra are filled with the corresponding data; they can be passed
362  * as NULL if not wanted.
363  */
364 TransactionId
365 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
366 {
367         TransactionId xid;
368
369         /* Error if module not enabled */
370         if (!track_commit_timestamp)
371                 ereport(ERROR,
372                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
373                                  errmsg("could not get commit timestamp data"),
374                           errhint("Make sure the configuration parameter \"%s\" is set.",
375                                           "track_commit_timestamp")));
376
377         LWLockAcquire(CommitTsLock, LW_SHARED);
378         xid = commitTsShared->xidLastCommit;
379         if (ts)
380                 *ts = commitTsShared->dataLastCommit.time;
381         if (nodeid)
382                 *nodeid = commitTsShared->dataLastCommit.nodeid;
383         LWLockRelease(CommitTsLock);
384
385         return xid;
386 }
387
388 /*
389  * SQL-callable wrapper to obtain commit time of a transaction
390  */
391 Datum
392 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
393 {
394         TransactionId xid = PG_GETARG_UINT32(0);
395         TimestampTz ts;
396         bool            found;
397
398         found = TransactionIdGetCommitTsData(xid, &ts, NULL);
399
400         if (!found)
401                 PG_RETURN_NULL();
402
403         PG_RETURN_TIMESTAMPTZ(ts);
404 }
405
406
407 Datum
408 pg_last_committed_xact(PG_FUNCTION_ARGS)
409 {
410         TransactionId xid;
411         TimestampTz ts;
412         Datum           values[2];
413         bool            nulls[2];
414         TupleDesc       tupdesc;
415         HeapTuple       htup;
416
417         /* and construct a tuple with our data */
418         xid = GetLatestCommitTsData(&ts, NULL);
419
420         /*
421          * Construct a tuple descriptor for the result row.  This must match this
422          * function's pg_proc entry!
423          */
424         tupdesc = CreateTemplateTupleDesc(2, false);
425         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
426                                            XIDOID, -1, 0);
427         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
428                                            TIMESTAMPTZOID, -1, 0);
429         tupdesc = BlessTupleDesc(tupdesc);
430
431         if (!TransactionIdIsNormal(xid))
432         {
433                 memset(nulls, true, sizeof(nulls));
434         }
435         else
436         {
437                 values[0] = TransactionIdGetDatum(xid);
438                 nulls[0] = false;
439
440                 values[1] = TimestampTzGetDatum(ts);
441                 nulls[1] = false;
442         }
443
444         htup = heap_form_tuple(tupdesc, values, nulls);
445
446         PG_RETURN_DATUM(HeapTupleGetDatum(htup));
447 }
448
449
450 /*
451  * Number of shared CommitTS buffers.
452  *
453  * We use a very similar logic as for the number of CLOG buffers; see comments
454  * in CLOGShmemBuffers.
455  */
456 Size
457 CommitTsShmemBuffers(void)
458 {
459         return Min(16, Max(4, NBuffers / 1024));
460 }
461
462 /*
463  * Shared memory sizing for CommitTs
464  */
465 Size
466 CommitTsShmemSize(void)
467 {
468         return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
469                 sizeof(CommitTimestampShared);
470 }
471
472 /*
473  * Initialize CommitTs at system startup (postmaster start or standalone
474  * backend)
475  */
476 void
477 CommitTsShmemInit(void)
478 {
479         bool            found;
480
481         CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
482         SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
483                                   CommitTsControlLock, "pg_commit_ts");
484
485         commitTsShared = ShmemInitStruct("CommitTs shared",
486                                                                          sizeof(CommitTimestampShared),
487                                                                          &found);
488
489         if (!IsUnderPostmaster)
490         {
491                 Assert(!found);
492
493                 commitTsShared->xidLastCommit = InvalidTransactionId;
494                 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
495                 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
496         }
497         else
498                 Assert(found);
499 }
500
/*
 * To be called ONCE on system install.
 *
 * (Assumes that initdb has created the CommitTs directory and that
 * CommitTsShmemInit has already run.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Intentionally empty.  Unlike most other SLRU modules, segments for
	 * this one are created when the server is started with the module
	 * enabled; see StartupCommitTs.
	 */
}
516
517 /*
518  * Initialize (or reinitialize) a page of CommitTs to zeroes.
519  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
520  *
521  * The page is not actually written, just set up in shared memory.
522  * The slot number of the new page is returned.
523  *
524  * Control lock must be held at entry, and will be held at exit.
525  */
526 static int
527 ZeroCommitTsPage(int pageno, bool writeXlog)
528 {
529         int                     slotno;
530
531         slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
532
533         if (writeXlog)
534                 WriteZeroPageXlogRec(pageno);
535
536         return slotno;
537 }
538
539 /*
540  * This must be called ONCE during postmaster or standalone-backend startup,
541  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
542  *
543  * Caller may choose to enable the feature even when it is turned off in the
544  * configuration.
545  */
546 void
547 StartupCommitTs(bool force_enable)
548 {
549         /*
550          * If the module is not enabled, there's nothing to do here.  The module
551          * could still be activated from elsewhere.
552          */
553         if (track_commit_timestamp || force_enable)
554                 ActivateCommitTs();
555 }
556
557 /*
558  * This must be called ONCE during postmaster or standalone-backend startup,
559  * after recovery has finished.
560  */
561 void
562 CompleteCommitTsInitialization(void)
563 {
564         /*
565          * If the feature is not enabled, turn it off for good.  This also removes
566          * any leftover data.
567          */
568         if (!track_commit_timestamp)
569                 DeactivateCommitTs(true);
570 }
571
572 /*
573  * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
574  * XLog record in a standby.
575  */
576 void
577 CommitTsParameterChange(bool newvalue, bool oldvalue)
578 {
579         /*
580          * If the commit_ts module is disabled in this server and we get word from
581          * the master server that it is enabled there, activate it so that we can
582          * replay future WAL records involving it; also mark it as active on
583          * pg_control.  If the old value was already set, we already did this, so
584          * don't do anything.
585          *
586          * If the module is disabled in the master, disable it here too.
587          */
588         if (newvalue)
589         {
590                 if (!track_commit_timestamp && !oldvalue)
591                         ActivateCommitTs();
592         }
593         else if (oldvalue)
594                 DeactivateCommitTs(false);
595 }
596
597 /*
598  * Activate this module whenever necessary.
599  *              This must happen during postmaster or standalong-backend startup,
600  *              or during WAL replay anytime the track_commit_timestamp setting is
601  *              changed in the master.
602  *
603  * The reason why this SLRU needs separate activation/deactivation functions is
604  * that it can be enabled/disabled during start and the activation/deactivation
605  * on master is propagated to slave via replay. Other SLRUs don't have this
606  * property and they can be just initialized during normal startup.
607  *
608  * This is in charge of creating the currently active segment, if it's not
609  * already there.  The reason for this is that the server might have been
610  * running with this module disabled for a while and thus might have skipped
611  * the normal creation point.
612  */
613 static void
614 ActivateCommitTs(void)
615 {
616         TransactionId xid = ShmemVariableCache->nextXid;
617         int                     pageno = TransactionIdToCTsPage(xid);
618
619         /*
620          * Re-Initialize our idea of the latest page number.
621          */
622         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
623         CommitTsCtl->shared->latest_page_number = pageno;
624         LWLockRelease(CommitTsControlLock);
625
626         /*
627          * If CommitTs is enabled, but it wasn't in the previous server run, we
628          * need to set the oldest and newest values to the next Xid; that way, we
629          * will not try to read data that might not have been set.
630          *
631          * XXX does this have a problem if a server is started with commitTs
632          * enabled, then started with commitTs disabled, then restarted with it
633          * enabled again?  It doesn't look like it does, because there should be a
634          * checkpoint that sets the value to InvalidTransactionId at end of
635          * recovery; and so any chance of injecting new transactions without
636          * CommitTs values would occur after the oldestCommitTs has been set to
637          * Invalid temporarily.
638          */
639         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
640         if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
641         {
642                 ShmemVariableCache->oldestCommitTs =
643                         ShmemVariableCache->newestCommitTs = ReadNewTransactionId();
644         }
645         LWLockRelease(CommitTsLock);
646
647         /* Finally, create the current segment file, if necessary */
648         if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
649         {
650                 int                     slotno;
651
652                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
653                 slotno = ZeroCommitTsPage(pageno, false);
654                 SimpleLruWritePage(CommitTsCtl, slotno);
655                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
656                 LWLockRelease(CommitTsControlLock);
657         }
658
659         /* We can now replay xlog records from this module */
660         enable_during_recovery = true;
661 }
662
663 /*
664  * Deactivate this module.
665  *
666  * This must be called when the track_commit_timestamp parameter is turned off.
667  * This happens during postmaster or standalone-backend startup, or during WAL
668  * replay.
669  *
670  * Resets CommitTs into invalid state to make sure we don't hand back
671  * possibly-invalid data; also removes segments of old data.
672  */
673 static void
674 DeactivateCommitTs(bool do_wal)
675 {
676         TransactionId xid = ShmemVariableCache->nextXid;
677         int                     pageno = TransactionIdToCTsPage(xid);
678
679         /*
680          * Re-Initialize our idea of the latest page number.
681          */
682         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
683         CommitTsCtl->shared->latest_page_number = pageno;
684         LWLockRelease(CommitTsControlLock);
685
686         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
687         ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
688         ShmemVariableCache->newestCommitTs = InvalidTransactionId;
689         LWLockRelease(CommitTsLock);
690
691         /*
692          * Remove *all* files.  This is necessary so that there are no leftover
693          * files; in the case where this feature is later enabled after running
694          * with it disabled for some time there may be a gap in the file sequence.
695          * (We can probably tolerate out-of-sequence files, as they are going to
696          * be overwritten anyway when we wrap around, but it seems better to be
697          * tidy.)
698          */
699         (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
700
701         /* No longer enabled on recovery */
702         enable_during_recovery = false;
703 }
704
705 /*
706  * This must be called ONCE during postmaster or standalone-backend shutdown
707  */
708 void
709 ShutdownCommitTs(void)
710 {
711         /* Flush dirty CommitTs pages to disk */
712         SimpleLruFlush(CommitTsCtl, false);
713 }
714
715 /*
716  * Perform a checkpoint --- either during shutdown, or on-the-fly
717  */
718 void
719 CheckPointCommitTs(void)
720 {
721         /* Flush dirty CommitTs pages to disk */
722         SimpleLruFlush(CommitTsCtl, true);
723 }
724
725 /*
726  * Make sure that CommitTs has room for a newly-allocated XID.
727  *
728  * NB: this is called while holding XidGenLock.  We want it to be very fast
729  * most of the time; even when it's not so fast, no actual I/O need happen
730  * unless we're forced to write out a dirty CommitTs or xlog page to make room
731  * in shared memory.
732  *
733  * NB: the current implementation relies on track_commit_timestamp being
734  * PGC_POSTMASTER.
735  */
736 void
737 ExtendCommitTs(TransactionId newestXact)
738 {
739         int                     pageno;
740
741         /* nothing to do if module not enabled */
742         if (!track_commit_timestamp && !enable_during_recovery)
743                 return;
744
745         /*
746          * No work except at first XID of a page.  But beware: just after
747          * wraparound, the first XID of page zero is FirstNormalTransactionId.
748          */
749         if (TransactionIdToCTsEntry(newestXact) != 0 &&
750                 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
751                 return;
752
753         pageno = TransactionIdToCTsPage(newestXact);
754
755         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
756
757         /* Zero the page and make an XLOG entry about it */
758         ZeroCommitTsPage(pageno, !InRecovery);
759
760         LWLockRelease(CommitTsControlLock);
761 }
762
763 /*
764  * Remove all CommitTs segments before the one holding the passed
765  * transaction ID.
766  *
767  * Note that we don't need to flush XLOG here.
768  */
769 void
770 TruncateCommitTs(TransactionId oldestXact, bool do_wal)
771 {
772         int                     cutoffPage;
773
774         /*
775          * The cutoff point is the start of the segment containing oldestXact. We
776          * pass the *page* containing oldestXact to SimpleLruTruncate.
777          */
778         cutoffPage = TransactionIdToCTsPage(oldestXact);
779
780         /* Check to see if there's any files that could be removed */
781         if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
782                                                    &cutoffPage))
783                 return;                                 /* nothing to remove */
784
785         /* Write XLOG record */
786         if (do_wal)
787                 WriteTruncateXlogRec(cutoffPage);
788
789         /* Now we can remove the old CommitTs segment(s) */
790         SimpleLruTruncate(CommitTsCtl, cutoffPage);
791 }
792
793 /*
794  * Set the limit values between which commit TS can be consulted.
795  */
796 void
797 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
798 {
799         /*
800          * Be careful not to overwrite values that are either further into the
801          * "future" or signal a disabled committs.
802          */
803         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
804         if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId)
805         {
806                 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
807                         ShmemVariableCache->oldestCommitTs = oldestXact;
808                 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs))
809                         ShmemVariableCache->newestCommitTs = newestXact;
810         }
811         else
812         {
813                 Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId);
814         }
815         LWLockRelease(CommitTsLock);
816 }
817
818 /*
819  * Move forwards the oldest commitTS value that can be consulted
820  */
821 void
822 AdvanceOldestCommitTs(TransactionId oldestXact)
823 {
824         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
825         if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId &&
826                 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
827                 ShmemVariableCache->oldestCommitTs = oldestXact;
828         LWLockRelease(CommitTsLock);
829 }
830
831
832 /*
833  * Decide which of two CLOG page numbers is "older" for truncation purposes.
834  *
835  * We need to use comparison of TransactionIds here in order to do the right
836  * thing with wraparound XID arithmetic.  However, if we are asked about
837  * page number zero, we don't want to hand InvalidTransactionId to
838  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
839  * offset both xids by FirstNormalTransactionId to avoid that.
840  */
841 static bool
842 CommitTsPagePrecedes(int page1, int page2)
843 {
844         TransactionId xid1;
845         TransactionId xid2;
846
847         xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
848         xid1 += FirstNormalTransactionId;
849         xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
850         xid2 += FirstNormalTransactionId;
851
852         return TransactionIdPrecedes(xid1, xid2);
853 }
854
855
856 /*
857  * Write a ZEROPAGE xlog record
858  */
859 static void
860 WriteZeroPageXlogRec(int pageno)
861 {
862         XLogBeginInsert();
863         XLogRegisterData((char *) (&pageno), sizeof(int));
864         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
865 }
866
867 /*
868  * Write a TRUNCATE xlog record
869  */
870 static void
871 WriteTruncateXlogRec(int pageno)
872 {
873         XLogBeginInsert();
874         XLogRegisterData((char *) (&pageno), sizeof(int));
875         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
876 }
877
878 /*
879  * Write a SETTS xlog record
880  */
881 static void
882 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
883                                                  TransactionId *subxids, TimestampTz timestamp,
884                                                  RepOriginId nodeid)
885 {
886         xl_commit_ts_set record;
887
888         record.timestamp = timestamp;
889         record.nodeid = nodeid;
890         record.mainxid = mainxid;
891
892         XLogBeginInsert();
893         XLogRegisterData((char *) &record,
894                                          offsetof(xl_commit_ts_set, mainxid) +
895                                          sizeof(TransactionId));
896         XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
897         XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
898 }
899
900 /*
901  * CommitTS resource manager's routines
902  */
903 void
904 commit_ts_redo(XLogReaderState *record)
905 {
906         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
907
908         /* Backup blocks are not used in commit_ts records */
909         Assert(!XLogRecHasAnyBlockRefs(record));
910
911         if (info == COMMIT_TS_ZEROPAGE)
912         {
913                 int                     pageno;
914                 int                     slotno;
915
916                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
917
918                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
919
920                 slotno = ZeroCommitTsPage(pageno, false);
921                 SimpleLruWritePage(CommitTsCtl, slotno);
922                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
923
924                 LWLockRelease(CommitTsControlLock);
925         }
926         else if (info == COMMIT_TS_TRUNCATE)
927         {
928                 int                     pageno;
929
930                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
931
932                 /*
933                  * During XLOG replay, latest_page_number isn't set up yet; insert a
934                  * suitable value to bypass the sanity test in SimpleLruTruncate.
935                  */
936                 CommitTsCtl->shared->latest_page_number = pageno;
937
938                 SimpleLruTruncate(CommitTsCtl, pageno);
939         }
940         else if (info == COMMIT_TS_SETTS)
941         {
942                 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
943                 int                     nsubxids;
944                 TransactionId *subxids;
945
946                 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
947                                         sizeof(TransactionId));
948                 if (nsubxids > 0)
949                 {
950                         subxids = palloc(sizeof(TransactionId) * nsubxids);
951                         memcpy(subxids,
952                                    XLogRecGetData(record) + SizeOfCommitTsSet,
953                                    sizeof(TransactionId) * nsubxids);
954                 }
955                 else
956                         subxids = NULL;
957
958                 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
959                                                                            setts->timestamp, setts->nodeid, true);
960                 if (subxids)
961                         pfree(subxids);
962         }
963         else
964                 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
965 }