]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/commit_ts.c
Phase 2 of pgindent updates.
[postgresql] / src / backend / access / transam / commit_ts.c
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  *              PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "storage/shmem.h"
36 #include "utils/builtins.h"
37 #include "utils/snapmgr.h"
38 #include "utils/timestamp.h"
39
40 /*
41  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
42  * everywhere else in Postgres.
43  *
44  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45  * CommitTs page numbering also wraps around at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
48  * explicit notice of that fact in this module, except when comparing segment
49  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
50  */
51
52 /*
53  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
54  * the largest possible file name is more than 5 chars long; see
55  * SlruScanDirectory.
56  */
typedef struct CommitTimestampEntry
{
	TimestampTz time;			/* commit timestamp stored for the xact */
	RepOriginId nodeid;			/* origin node id stored alongside it */
} CommitTimestampEntry;
62
/*
 * Number of bytes one entry occupies on a page: everything up to and
 * including nodeid, leaving out whatever padding the compiler may append to
 * the struct.
 */
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
									sizeof(RepOriginId))

/* How many entries fit in one BLCKSZ-sized page. */
#define COMMIT_TS_XACTS_PER_PAGE \
	(BLCKSZ / SizeOfCommitTimestampEntry)

/* Page number holding xid's entry, and the entry's index within that page. */
#define TransactionIdToCTsPage(xid) \
	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
#define TransactionIdToCTsEntry(xid)	\
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73
74 /*
75  * Link to shared-memory data structures for CommitTs control
76  */
static SlruCtlData CommitTsCtlData;

/* Pointer form of the above, as passed to the SimpleLru* routines below. */
#define CommitTsCtl (&CommitTsCtlData)
80
/*
 * We keep a cache of the last value set in shared memory.
 *
 * This is also a good place to keep the activation status.  We keep this
 * separate from the GUC so that the standby can activate the module if the
 * primary has it active independently of the value of the GUC.
 *
 * This is protected by CommitTsLock.  In some places, we use commitTsActive
 * without acquiring the lock; where this happens, a comment explains the
 * rationale for it.
 */
typedef struct CommitTimestampShared
{
	TransactionId xidLastCommit;	/* top-level xid of the last value set */
	CommitTimestampEntry dataLastCommit;	/* timestamp/origin set for it */
	bool		commitTsActive; /* is the module currently activated? */
} CommitTimestampShared;

/* Assigned from ShmemInitStruct() in CommitTsShmemInit. */
CommitTimestampShared *commitTsShared;


/* GUC variable */
bool		track_commit_timestamp;
104
/* Prototypes for routines private to this file. */
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
					 TransactionId *subxids, TimestampTz ts,
					 RepOriginId nodeid, int pageno);
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
						 RepOriginId nodeid, int slotno);
static void error_commit_ts_disabled(void);
static int	ZeroCommitTsPage(int pageno, bool writeXlog);
static bool CommitTsPagePrecedes(int page1, int page2);
static void ActivateCommitTs(void);
static void DeactivateCommitTs(void);
static void WriteZeroPageXlogRec(int pageno);
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
						 TransactionId *subxids, TimestampTz timestamp,
						 RepOriginId nodeid);
120
121 /*
122  * TransactionTreeSetCommitTsData
123  *
124  * Record the final commit timestamp of transaction entries in the commit log
125  * for a transaction and its subtransaction tree, as efficiently as possible.
126  *
127  * xid is the top level transaction id.
128  *
129  * subxids is an array of xids of length nsubxids, representing subtransactions
130  * in the tree of xid. In various cases nsubxids may be zero.
131  * The reason why tracking just the parent xid commit timestamp is not enough
132  * is that the subtrans SLRU does not stay valid across crashes (it's not
133  * permanent) so we need to keep the information about them here. If the
134  * subtrans implementation changes in the future, we might want to revisit the
135  * decision of storing timestamp info for each subxid.
136  *
137  * The write_xlog parameter tells us whether to include an XLog record of this
138  * or not.  Normally, this is called from transaction commit routines (both
139  * normal and prepared) and the information will be stored in the transaction
140  * commit XLog record, and so they should pass "false" for this.  The XLog redo
141  * code should use "false" here as well.  Other callers probably want to pass
142  * true, so that the given values persist in case of crashes.
143  */
144 void
145 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
146                                                            TransactionId *subxids, TimestampTz timestamp,
147                                                            RepOriginId nodeid, bool write_xlog)
148 {
149         int                     i;
150         TransactionId headxid;
151         TransactionId newestXact;
152
153         /*
154          * No-op if the module is not active.
155          *
156          * An unlocked read here is fine, because in a standby (the only place
157          * where the flag can change in flight) this routine is only called by the
158          * recovery process, which is also the only process which can change the
159          * flag.
160          */
161         if (!commitTsShared->commitTsActive)
162                 return;
163
164         /*
165          * Comply with the WAL-before-data rule: if caller specified it wants this
166          * value to be recorded in WAL, do so before touching the data.
167          */
168         if (write_xlog)
169                 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
170
171         /*
172          * Figure out the latest Xid in this batch: either the last subxid if
173          * there's any, otherwise the parent xid.
174          */
175         if (nsubxids > 0)
176                 newestXact = subxids[nsubxids - 1];
177         else
178                 newestXact = xid;
179
180         /*
181          * We split the xids to set the timestamp to in groups belonging to the
182          * same SLRU page; the first element in each such set is its head.  The
183          * first group has the main XID as the head; subsequent sets use the first
184          * subxid not on the previous page as head.  This way, we only have to
185          * lock/modify each SLRU page once.
186          */
187         for (i = 0, headxid = xid;;)
188         {
189                 int                     pageno = TransactionIdToCTsPage(headxid);
190                 int                     j;
191
192                 for (j = i; j < nsubxids; j++)
193                 {
194                         if (TransactionIdToCTsPage(subxids[j]) != pageno)
195                                 break;
196                 }
197                 /* subxids[i..j] are on the same page as the head */
198
199                 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200                                                          pageno);
201
202                 /* if we wrote out all subxids, we're done. */
203                 if (j + 1 >= nsubxids)
204                         break;
205
206                 /*
207                  * Set the new head and skip over it, as well as over the subxids we
208                  * just wrote.
209                  */
210                 headxid = subxids[j];
211                 i += j - i + 1;
212         }
213
214         /* update the cached value in shared memory */
215         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216         commitTsShared->xidLastCommit = xid;
217         commitTsShared->dataLastCommit.time = timestamp;
218         commitTsShared->dataLastCommit.nodeid = nodeid;
219
220         /* and move forwards our endpoint, if needed */
221         if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
222                 ShmemVariableCache->newestCommitTsXid = newestXact;
223         LWLockRelease(CommitTsLock);
224 }
225
226 /*
227  * Record the commit timestamp of transaction entries in the commit log for all
228  * entries on a single page.  Atomic only on this page.
229  */
230 static void
231 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
232                                          TransactionId *subxids, TimestampTz ts,
233                                          RepOriginId nodeid, int pageno)
234 {
235         int                     slotno;
236         int                     i;
237
238         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
239
240         slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
241
242         TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243         for (i = 0; i < nsubxids; i++)
244                 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245
246         CommitTsCtl->shared->page_dirty[slotno] = true;
247
248         LWLockRelease(CommitTsControlLock);
249 }
250
251 /*
252  * Sets the commit timestamp of a single transaction.
253  *
254  * Must be called with CommitTsControlLock held
255  */
256 static void
257 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
258                                                  RepOriginId nodeid, int slotno)
259 {
260         int                     entryno = TransactionIdToCTsEntry(xid);
261         CommitTimestampEntry entry;
262
263         Assert(TransactionIdIsNormal(xid));
264
265         entry.time = ts;
266         entry.nodeid = nodeid;
267
268         memcpy(CommitTsCtl->shared->page_buffer[slotno] +
269                    SizeOfCommitTimestampEntry * entryno,
270                    &entry, SizeOfCommitTimestampEntry);
271 }
272
273 /*
274  * Interrogate the commit timestamp of a transaction.
275  *
276  * The return value indicates whether a commit timestamp record was found for
277  * the given xid.  The timestamp value is returned in *ts (which may not be
278  * null), and the origin node for the Xid is returned in *nodeid, if it's not
279  * null.
280  */
281 bool
282 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
283                                                          RepOriginId *nodeid)
284 {
285         int                     pageno = TransactionIdToCTsPage(xid);
286         int                     entryno = TransactionIdToCTsEntry(xid);
287         int                     slotno;
288         CommitTimestampEntry entry;
289         TransactionId oldestCommitTsXid;
290         TransactionId newestCommitTsXid;
291
292         if (!TransactionIdIsValid(xid))
293                 ereport(ERROR,
294                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295                 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296         else if (!TransactionIdIsNormal(xid))
297         {
298                 /* frozen and bootstrap xids are always committed far in the past */
299                 *ts = 0;
300                 if (nodeid)
301                         *nodeid = 0;
302                 return false;
303         }
304
305         LWLockAcquire(CommitTsLock, LW_SHARED);
306
307         /* Error if module not enabled */
308         if (!commitTsShared->commitTsActive)
309                 error_commit_ts_disabled();
310
311         /*
312          * If we're asked for the cached value, return that.  Otherwise, fall
313          * through to read from SLRU.
314          */
315         if (commitTsShared->xidLastCommit == xid)
316         {
317                 *ts = commitTsShared->dataLastCommit.time;
318                 if (nodeid)
319                         *nodeid = commitTsShared->dataLastCommit.nodeid;
320
321                 LWLockRelease(CommitTsLock);
322                 return *ts != 0;
323         }
324
325         oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
326         newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
327         /* neither is invalid, or both are */
328         Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
329         LWLockRelease(CommitTsLock);
330
331         /*
332          * Return empty if the requested value is outside our valid range.
333          */
334         if (!TransactionIdIsValid(oldestCommitTsXid) ||
335                 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336                 TransactionIdPrecedes(newestCommitTsXid, xid))
337         {
338                 *ts = 0;
339                 if (nodeid)
340                         *nodeid = InvalidRepOriginId;
341                 return false;
342         }
343
344         /* lock is acquired by SimpleLruReadPage_ReadOnly */
345         slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
346         memcpy(&entry,
347                    CommitTsCtl->shared->page_buffer[slotno] +
348                    SizeOfCommitTimestampEntry * entryno,
349                    SizeOfCommitTimestampEntry);
350
351         *ts = entry.time;
352         if (nodeid)
353                 *nodeid = entry.nodeid;
354
355         LWLockRelease(CommitTsControlLock);
356         return *ts != 0;
357 }
358
359 /*
360  * Return the Xid of the latest committed transaction.  (As far as this module
361  * is concerned, anyway; it's up to the caller to ensure the value is useful
362  * for its purposes.)
363  *
364  * ts and extra are filled with the corresponding data; they can be passed
365  * as NULL if not wanted.
366  */
367 TransactionId
368 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
369 {
370         TransactionId xid;
371
372         LWLockAcquire(CommitTsLock, LW_SHARED);
373
374         /* Error if module not enabled */
375         if (!commitTsShared->commitTsActive)
376                 error_commit_ts_disabled();
377
378         xid = commitTsShared->xidLastCommit;
379         if (ts)
380                 *ts = commitTsShared->dataLastCommit.time;
381         if (nodeid)
382                 *nodeid = commitTsShared->dataLastCommit.nodeid;
383         LWLockRelease(CommitTsLock);
384
385         return xid;
386 }
387
388 static void
389 error_commit_ts_disabled(void)
390 {
391         ereport(ERROR,
392                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393                          errmsg("could not get commit timestamp data"),
394                          RecoveryInProgress() ?
395                          errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
396                                          "track_commit_timestamp") :
397                          errhint("Make sure the configuration parameter \"%s\" is set.",
398                                          "track_commit_timestamp")));
399 }
400
401 /*
402  * SQL-callable wrapper to obtain commit time of a transaction
403  */
404 Datum
405 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
406 {
407         TransactionId xid = PG_GETARG_UINT32(0);
408         TimestampTz ts;
409         bool            found;
410
411         found = TransactionIdGetCommitTsData(xid, &ts, NULL);
412
413         if (!found)
414                 PG_RETURN_NULL();
415
416         PG_RETURN_TIMESTAMPTZ(ts);
417 }
418
419
420 Datum
421 pg_last_committed_xact(PG_FUNCTION_ARGS)
422 {
423         TransactionId xid;
424         TimestampTz ts;
425         Datum           values[2];
426         bool            nulls[2];
427         TupleDesc       tupdesc;
428         HeapTuple       htup;
429
430         /* and construct a tuple with our data */
431         xid = GetLatestCommitTsData(&ts, NULL);
432
433         /*
434          * Construct a tuple descriptor for the result row.  This must match this
435          * function's pg_proc entry!
436          */
437         tupdesc = CreateTemplateTupleDesc(2, false);
438         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
439                                            XIDOID, -1, 0);
440         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
441                                            TIMESTAMPTZOID, -1, 0);
442         tupdesc = BlessTupleDesc(tupdesc);
443
444         if (!TransactionIdIsNormal(xid))
445         {
446                 memset(nulls, true, sizeof(nulls));
447         }
448         else
449         {
450                 values[0] = TransactionIdGetDatum(xid);
451                 nulls[0] = false;
452
453                 values[1] = TimestampTzGetDatum(ts);
454                 nulls[1] = false;
455         }
456
457         htup = heap_form_tuple(tupdesc, values, nulls);
458
459         PG_RETURN_DATUM(HeapTupleGetDatum(htup));
460 }
461
462
463 /*
464  * Number of shared CommitTS buffers.
465  *
466  * We use a very similar logic as for the number of CLOG buffers; see comments
467  * in CLOGShmemBuffers.
468  */
469 Size
470 CommitTsShmemBuffers(void)
471 {
472         return Min(16, Max(4, NBuffers / 1024));
473 }
474
475 /*
476  * Shared memory sizing for CommitTs
477  */
478 Size
479 CommitTsShmemSize(void)
480 {
481         return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
482                 sizeof(CommitTimestampShared);
483 }
484
485 /*
486  * Initialize CommitTs at system startup (postmaster start or standalone
487  * backend)
488  */
489 void
490 CommitTsShmemInit(void)
491 {
492         bool            found;
493
494         CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
495         SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
496                                   CommitTsControlLock, "pg_commit_ts",
497                                   LWTRANCHE_COMMITTS_BUFFERS);
498
499         commitTsShared = ShmemInitStruct("CommitTs shared",
500                                                                          sizeof(CommitTimestampShared),
501                                                                          &found);
502
503         if (!IsUnderPostmaster)
504         {
505                 Assert(!found);
506
507                 commitTsShared->xidLastCommit = InvalidTransactionId;
508                 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
509                 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
510                 commitTsShared->commitTsActive = false;
511         }
512         else
513                 Assert(found);
514 }
515
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 *
 * The body is intentionally empty; see the comment inside.
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Nothing to do here at present, unlike most other SLRU modules; segments
	 * are created when the server is started with this module enabled. See
	 * ActivateCommitTs.
	 */
}
531
532 /*
533  * Initialize (or reinitialize) a page of CommitTs to zeroes.
534  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
535  *
536  * The page is not actually written, just set up in shared memory.
537  * The slot number of the new page is returned.
538  *
539  * Control lock must be held at entry, and will be held at exit.
540  */
541 static int
542 ZeroCommitTsPage(int pageno, bool writeXlog)
543 {
544         int                     slotno;
545
546         slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
547
548         if (writeXlog)
549                 WriteZeroPageXlogRec(pageno);
550
551         return slotno;
552 }
553
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
 *
 * All the work is delegated to ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
563
564 /*
565  * This must be called ONCE during postmaster or standalone-backend startup,
566  * after recovery has finished.
567  */
568 void
569 CompleteCommitTsInitialization(void)
570 {
571         /*
572          * If the feature is not enabled, turn it off for good.  This also removes
573          * any leftover data.
574          *
575          * Conversely, we activate the module if the feature is enabled.  This is
576          * not necessary in a master system because we already did it earlier, but
577          * if we're in a standby server that got promoted which had the feature
578          * enabled and was following a master that had the feature disabled, this
579          * is where we turn it on locally.
580          */
581         if (!track_commit_timestamp)
582                 DeactivateCommitTs();
583         else
584                 ActivateCommitTs();
585 }
586
587 /*
588  * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
589  * XLog record in a standby.
590  */
591 void
592 CommitTsParameterChange(bool newvalue, bool oldvalue)
593 {
594         /*
595          * If the commit_ts module is disabled in this server and we get word from
596          * the master server that it is enabled there, activate it so that we can
597          * replay future WAL records involving it; also mark it as active on
598          * pg_control.  If the old value was already set, we already did this, so
599          * don't do anything.
600          *
601          * If the module is disabled in the master, disable it here too, unless
602          * the module is enabled locally.
603          *
604          * Note this only runs in the recovery process, so an unlocked read is
605          * fine.
606          */
607         if (newvalue)
608         {
609                 if (!commitTsShared->commitTsActive)
610                         ActivateCommitTs();
611         }
612         else if (commitTsShared->commitTsActive)
613                 DeactivateCommitTs();
614 }
615
616 /*
617  * Activate this module whenever necessary.
618  *              This must happen during postmaster or standalone-backend startup,
619  *              or during WAL replay anytime the track_commit_timestamp setting is
620  *              changed in the master.
621  *
622  * The reason why this SLRU needs separate activation/deactivation functions is
623  * that it can be enabled/disabled during start and the activation/deactivation
624  * on master is propagated to slave via replay. Other SLRUs don't have this
625  * property and they can be just initialized during normal startup.
626  *
627  * This is in charge of creating the currently active segment, if it's not
628  * already there.  The reason for this is that the server might have been
629  * running with this module disabled for a while and thus might have skipped
630  * the normal creation point.
631  */
632 static void
633 ActivateCommitTs(void)
634 {
635         TransactionId xid;
636         int                     pageno;
637
638         /* If we've done this already, there's nothing to do */
639         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
640         if (commitTsShared->commitTsActive)
641         {
642                 LWLockRelease(CommitTsLock);
643                 return;
644         }
645         LWLockRelease(CommitTsLock);
646
647         xid = ShmemVariableCache->nextXid;
648         pageno = TransactionIdToCTsPage(xid);
649
650         /*
651          * Re-Initialize our idea of the latest page number.
652          */
653         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
654         CommitTsCtl->shared->latest_page_number = pageno;
655         LWLockRelease(CommitTsControlLock);
656
657         /*
658          * If CommitTs is enabled, but it wasn't in the previous server run, we
659          * need to set the oldest and newest values to the next Xid; that way, we
660          * will not try to read data that might not have been set.
661          *
662          * XXX does this have a problem if a server is started with commitTs
663          * enabled, then started with commitTs disabled, then restarted with it
664          * enabled again?  It doesn't look like it does, because there should be a
665          * checkpoint that sets the value to InvalidTransactionId at end of
666          * recovery; and so any chance of injecting new transactions without
667          * CommitTs values would occur after the oldestCommitTsXid has been set to
668          * Invalid temporarily.
669          */
670         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
671         if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
672         {
673                 ShmemVariableCache->oldestCommitTsXid =
674                         ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
675         }
676         LWLockRelease(CommitTsLock);
677
678         /* Create the current segment file, if necessary */
679         if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
680         {
681                 int                     slotno;
682
683                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
684                 slotno = ZeroCommitTsPage(pageno, false);
685                 SimpleLruWritePage(CommitTsCtl, slotno);
686                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
687                 LWLockRelease(CommitTsControlLock);
688         }
689
690         /* Change the activation status in shared memory. */
691         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
692         commitTsShared->commitTsActive = true;
693         LWLockRelease(CommitTsLock);
694 }
695
696 /*
697  * Deactivate this module.
698  *
699  * This must be called when the track_commit_timestamp parameter is turned off.
700  * This happens during postmaster or standalone-backend startup, or during WAL
701  * replay.
702  *
703  * Resets CommitTs into invalid state to make sure we don't hand back
704  * possibly-invalid data; also removes segments of old data.
705  */
706 static void
707 DeactivateCommitTs(void)
708 {
709         /*
710          * Cleanup the status in the shared memory.
711          *
712          * We reset everything in the commitTsShared record to prevent user from
713          * getting confusing data about last committed transaction on the standby
714          * when the module was activated repeatedly on the primary.
715          */
716         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
717
718         commitTsShared->commitTsActive = false;
719         commitTsShared->xidLastCommit = InvalidTransactionId;
720         TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
721         commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
722
723         ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
724         ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
725
726         LWLockRelease(CommitTsLock);
727
728         /*
729          * Remove *all* files.  This is necessary so that there are no leftover
730          * files; in the case where this feature is later enabled after running
731          * with it disabled for some time there may be a gap in the file sequence.
732          * (We can probably tolerate out-of-sequence files, as they are going to
733          * be overwritten anyway when we wrap around, but it seems better to be
734          * tidy.)
735          */
736         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
737         (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
738         LWLockRelease(CommitTsControlLock);
739 }
740
/*
 * This must be called ONCE during postmaster or standalone-backend shutdown
 *
 * Flushes the CommitTs SLRU and then fsyncs its directory.
 */
void
ShutdownCommitTs(void)
{
	/*
	 * Flush dirty CommitTs pages to disk.  NOTE(review): the second argument
	 * is false here but true in CheckPointCommitTs; its meaning is defined by
	 * SimpleLruFlush, which is not part of this file.
	 */
	SimpleLruFlush(CommitTsCtl, false);

	/*
	 * fsync pg_commit_ts to ensure that any files flushed previously are
	 * durably on disk.
	 */
	fsync_fname("pg_commit_ts", true);
}
756
/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
 *
 * Flushes the CommitTs SLRU and then fsyncs its directory.
 */
void
CheckPointCommitTs(void)
{
	/*
	 * Flush dirty CommitTs pages to disk.  NOTE(review): the second argument
	 * is true here but false in ShutdownCommitTs; its meaning is defined by
	 * SimpleLruFlush, which is not part of this file.
	 */
	SimpleLruFlush(CommitTsCtl, true);

	/*
	 * fsync pg_commit_ts to ensure that any files flushed previously are
	 * durably on disk.
	 */
	fsync_fname("pg_commit_ts", true);
}
772
773 /*
774  * Make sure that CommitTs has room for a newly-allocated XID.
775  *
776  * NB: this is called while holding XidGenLock.  We want it to be very fast
777  * most of the time; even when it's not so fast, no actual I/O need happen
778  * unless we're forced to write out a dirty CommitTs or xlog page to make room
779  * in shared memory.
780  *
781  * NB: the current implementation relies on track_commit_timestamp being
782  * PGC_POSTMASTER.
783  */
784 void
785 ExtendCommitTs(TransactionId newestXact)
786 {
787         int                     pageno;
788
789         /*
790          * Nothing to do if module not enabled.  Note we do an unlocked read of
791          * the flag here, which is okay because this routine is only called from
792          * GetNewTransactionId, which is never called in a standby.
793          */
794         Assert(!InRecovery);
795         if (!commitTsShared->commitTsActive)
796                 return;
797
798         /*
799          * No work except at first XID of a page.  But beware: just after
800          * wraparound, the first XID of page zero is FirstNormalTransactionId.
801          */
802         if (TransactionIdToCTsEntry(newestXact) != 0 &&
803                 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
804                 return;
805
806         pageno = TransactionIdToCTsPage(newestXact);
807
808         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
809
810         /* Zero the page and make an XLOG entry about it */
811         ZeroCommitTsPage(pageno, !InRecovery);
812
813         LWLockRelease(CommitTsControlLock);
814 }
815
816 /*
817  * Remove all CommitTs segments before the one holding the passed
818  * transaction ID.
819  *
820  * Note that we don't need to flush XLOG here.
821  */
822 void
823 TruncateCommitTs(TransactionId oldestXact)
824 {
825         int                     cutoffPage;
826
827         /*
828          * The cutoff point is the start of the segment containing oldestXact. We
829          * pass the *page* containing oldestXact to SimpleLruTruncate.
830          */
831         cutoffPage = TransactionIdToCTsPage(oldestXact);
832
833         /* Check to see if there's any files that could be removed */
834         if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
835                                                    &cutoffPage))
836                 return;                                 /* nothing to remove */
837
838         /* Write XLOG record */
839         WriteTruncateXlogRec(cutoffPage, oldestXact);
840
841         /* Now we can remove the old CommitTs segment(s) */
842         SimpleLruTruncate(CommitTsCtl, cutoffPage);
843 }
844
845 /*
846  * Set the limit values between which commit TS can be consulted.
847  */
848 void
849 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
850 {
851         /*
852          * Be careful not to overwrite values that are either further into the
853          * "future" or signal a disabled committs.
854          */
855         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
856         if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
857         {
858                 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
859                         ShmemVariableCache->oldestCommitTsXid = oldestXact;
860                 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
861                         ShmemVariableCache->newestCommitTsXid = newestXact;
862         }
863         else
864         {
865                 Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
866                 ShmemVariableCache->oldestCommitTsXid = oldestXact;
867                 ShmemVariableCache->newestCommitTsXid = newestXact;
868         }
869         LWLockRelease(CommitTsLock);
870 }
871
872 /*
873  * Move forwards the oldest commitTS value that can be consulted
874  */
875 void
876 AdvanceOldestCommitTsXid(TransactionId oldestXact)
877 {
878         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
879         if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
880         TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
881                 ShmemVariableCache->oldestCommitTsXid = oldestXact;
882         LWLockRelease(CommitTsLock);
883 }
884
885
886 /*
887  * Decide which of two CLOG page numbers is "older" for truncation purposes.
888  *
889  * We need to use comparison of TransactionIds here in order to do the right
890  * thing with wraparound XID arithmetic.  However, if we are asked about
891  * page number zero, we don't want to hand InvalidTransactionId to
892  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
893  * offset both xids by FirstNormalTransactionId to avoid that.
894  */
895 static bool
896 CommitTsPagePrecedes(int page1, int page2)
897 {
898         TransactionId xid1;
899         TransactionId xid2;
900
901         xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
902         xid1 += FirstNormalTransactionId;
903         xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
904         xid2 += FirstNormalTransactionId;
905
906         return TransactionIdPrecedes(xid1, xid2);
907 }
908
909
910 /*
911  * Write a ZEROPAGE xlog record
912  */
913 static void
914 WriteZeroPageXlogRec(int pageno)
915 {
916         XLogBeginInsert();
917         XLogRegisterData((char *) (&pageno), sizeof(int));
918         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
919 }
920
921 /*
922  * Write a TRUNCATE xlog record
923  */
924 static void
925 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
926 {
927         xl_commit_ts_truncate xlrec;
928
929         xlrec.pageno = pageno;
930         xlrec.oldestXid = oldestXid;
931
932         XLogBeginInsert();
933         XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
934         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
935 }
936
937 /*
938  * Write a SETTS xlog record
939  */
940 static void
941 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
942                                                  TransactionId *subxids, TimestampTz timestamp,
943                                                  RepOriginId nodeid)
944 {
945         xl_commit_ts_set record;
946
947         record.timestamp = timestamp;
948         record.nodeid = nodeid;
949         record.mainxid = mainxid;
950
951         XLogBeginInsert();
952         XLogRegisterData((char *) &record,
953                                          offsetof(xl_commit_ts_set, mainxid) +
954                                          sizeof(TransactionId));
955         XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
956         XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
957 }
958
959 /*
960  * CommitTS resource manager's routines
961  */
962 void
963 commit_ts_redo(XLogReaderState *record)
964 {
965         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
966
967         /* Backup blocks are not used in commit_ts records */
968         Assert(!XLogRecHasAnyBlockRefs(record));
969
970         if (info == COMMIT_TS_ZEROPAGE)
971         {
972                 int                     pageno;
973                 int                     slotno;
974
975                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
976
977                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
978
979                 slotno = ZeroCommitTsPage(pageno, false);
980                 SimpleLruWritePage(CommitTsCtl, slotno);
981                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
982
983                 LWLockRelease(CommitTsControlLock);
984         }
985         else if (info == COMMIT_TS_TRUNCATE)
986         {
987                 xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
988
989                 AdvanceOldestCommitTsXid(trunc->oldestXid);
990
991                 /*
992                  * During XLOG replay, latest_page_number isn't set up yet; insert a
993                  * suitable value to bypass the sanity test in SimpleLruTruncate.
994                  */
995                 CommitTsCtl->shared->latest_page_number = trunc->pageno;
996
997                 SimpleLruTruncate(CommitTsCtl, trunc->pageno);
998         }
999         else if (info == COMMIT_TS_SETTS)
1000         {
1001                 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
1002                 int                     nsubxids;
1003                 TransactionId *subxids;
1004
1005                 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1006                                         sizeof(TransactionId));
1007                 if (nsubxids > 0)
1008                 {
1009                         subxids = palloc(sizeof(TransactionId) * nsubxids);
1010                         memcpy(subxids,
1011                                    XLogRecGetData(record) + SizeOfCommitTsSet,
1012                                    sizeof(TransactionId) * nsubxids);
1013                 }
1014                 else
1015                         subxids = NULL;
1016
1017                 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1018                                                                            setts->timestamp, setts->nodeid, true);
1019                 if (subxids)
1020                         pfree(subxids);
1021         }
1022         else
1023                 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1024 }