]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/commit_ts.c
Make all built-in lwlock tranche IDs fixed.
[postgresql] / src / backend / access / transam / commit_ts.c
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  *              PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_clog-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
38
39 /*
40  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
41  * everywhere else in Postgres.
42  *
43  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44  * CommitTs page numbering also wraps around at
45  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
47  * explicit notice of that fact in this module, except when comparing segment
48  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49  */
50
51 /*
52  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
53  * the largest possible file name is more than 5 chars long; see
54  * SlruScanDirectory.
55  */
56 typedef struct CommitTimestampEntry
57 {
58         TimestampTz time;
59         RepOriginId nodeid;
60 } CommitTimestampEntry;
61
/*
 * Number of bytes copied in and out of a page for one entry: everything up
 * to and including nodeid, leaving out any padding that follows it in the
 * struct.
 */
#define SizeOfCommitTimestampEntry \
	(offsetof(CommitTimestampEntry, nodeid) + sizeof(RepOriginId))

/* How many entries fit in a single CommitTs page */
#define COMMIT_TS_XACTS_PER_PAGE	(BLCKSZ / SizeOfCommitTimestampEntry)

/* Page number holding the entry for a given XID */
#define TransactionIdToCTsPage(xid) \
	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

/* Index of a given XID's entry within its page */
#define TransactionIdToCTsEntry(xid) \
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72
73 /*
74  * Link to shared-memory data structures for CommitTs control
75  */
76 static SlruCtlData CommitTsCtlData;
77
78 #define CommitTsCtl (&CommitTsCtlData)
79
80 /*
81  * We keep a cache of the last value set in shared memory.
82  *
83  * This is also good place to keep the activation status.  We keep this
84  * separate from the GUC so that the standby can activate the module if the
85  * primary has it active independently of the value of the GUC.
86  *
87  * This is protected by CommitTsLock.  In some places, we use commitTsActive
88  * without acquiring the lock; where this happens, a comment explains the
89  * rationale for it.
90  */
91 typedef struct CommitTimestampShared
92 {
93         TransactionId xidLastCommit;
94         CommitTimestampEntry dataLastCommit;
95         bool    commitTsActive;
96 } CommitTimestampShared;
97
98 CommitTimestampShared *commitTsShared;
99
100
101 /* GUC variable */
102 bool            track_commit_timestamp;
103
104 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
105                                          TransactionId *subxids, TimestampTz ts,
106                                          RepOriginId nodeid, int pageno);
107 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
108                                                  RepOriginId nodeid, int slotno);
109 static void error_commit_ts_disabled(void);
110 static int      ZeroCommitTsPage(int pageno, bool writeXlog);
111 static bool CommitTsPagePrecedes(int page1, int page2);
112 static void ActivateCommitTs(void);
113 static void DeactivateCommitTs(void);
114 static void WriteZeroPageXlogRec(int pageno);
115 static void WriteTruncateXlogRec(int pageno);
116 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
117                                                  TransactionId *subxids, TimestampTz timestamp,
118                                                  RepOriginId nodeid);
119
120 /*
121  * TransactionTreeSetCommitTsData
122  *
123  * Record the final commit timestamp of transaction entries in the commit log
124  * for a transaction and its subtransaction tree, as efficiently as possible.
125  *
126  * xid is the top level transaction id.
127  *
128  * subxids is an array of xids of length nsubxids, representing subtransactions
129  * in the tree of xid. In various cases nsubxids may be zero.
130  * The reason why tracking just the parent xid commit timestamp is not enough
131  * is that the subtrans SLRU does not stay valid across crashes (it's not
132  * permanent) so we need to keep the information about them here. If the
133  * subtrans implementation changes in the future, we might want to revisit the
134  * decision of storing timestamp info for each subxid.
135  *
136  * The write_xlog parameter tells us whether to include an XLog record of this
137  * or not.  Normally, this is called from transaction commit routines (both
138  * normal and prepared) and the information will be stored in the transaction
139  * commit XLog record, and so they should pass "false" for this.  The XLog redo
140  * code should use "false" here as well.  Other callers probably want to pass
141  * true, so that the given values persist in case of crashes.
142  */
143 void
144 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
145                                                            TransactionId *subxids, TimestampTz timestamp,
146                                                            RepOriginId nodeid, bool write_xlog)
147 {
148         int                     i;
149         TransactionId headxid;
150         TransactionId newestXact;
151
152         /*
153          * No-op if the module is not active.
154          *
155          * An unlocked read here is fine, because in a standby (the only place
156          * where the flag can change in flight) this routine is only called by
157          * the recovery process, which is also the only process which can change
158          * the flag.
159          */
160         if (!commitTsShared->commitTsActive)
161                 return;
162
163         /*
164          * Comply with the WAL-before-data rule: if caller specified it wants this
165          * value to be recorded in WAL, do so before touching the data.
166          */
167         if (write_xlog)
168                 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
169
170         /*
171          * Figure out the latest Xid in this batch: either the last subxid if
172          * there's any, otherwise the parent xid.
173          */
174         if (nsubxids > 0)
175                 newestXact = subxids[nsubxids - 1];
176         else
177                 newestXact = xid;
178
179         /*
180          * We split the xids to set the timestamp to in groups belonging to the
181          * same SLRU page; the first element in each such set is its head.  The
182          * first group has the main XID as the head; subsequent sets use the first
183          * subxid not on the previous page as head.  This way, we only have to
184          * lock/modify each SLRU page once.
185          */
186         for (i = 0, headxid = xid;;)
187         {
188                 int                     pageno = TransactionIdToCTsPage(headxid);
189                 int                     j;
190
191                 for (j = i; j < nsubxids; j++)
192                 {
193                         if (TransactionIdToCTsPage(subxids[j]) != pageno)
194                                 break;
195                 }
196                 /* subxids[i..j] are on the same page as the head */
197
198                 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
199                                                          pageno);
200
201                 /* if we wrote out all subxids, we're done. */
202                 if (j + 1 >= nsubxids)
203                         break;
204
205                 /*
206                  * Set the new head and skip over it, as well as over the subxids we
207                  * just wrote.
208                  */
209                 headxid = subxids[j];
210                 i += j - i + 1;
211         }
212
213         /* update the cached value in shared memory */
214         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
215         commitTsShared->xidLastCommit = xid;
216         commitTsShared->dataLastCommit.time = timestamp;
217         commitTsShared->dataLastCommit.nodeid = nodeid;
218
219         /* and move forwards our endpoint, if needed */
220         if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
221                 ShmemVariableCache->newestCommitTsXid = newestXact;
222         LWLockRelease(CommitTsLock);
223 }
224
225 /*
226  * Record the commit timestamp of transaction entries in the commit log for all
227  * entries on a single page.  Atomic only on this page.
228  */
229 static void
230 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
231                                          TransactionId *subxids, TimestampTz ts,
232                                          RepOriginId nodeid, int pageno)
233 {
234         int                     slotno;
235         int                     i;
236
237         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
238
239         slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
240
241         TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
242         for (i = 0; i < nsubxids; i++)
243                 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
244
245         CommitTsCtl->shared->page_dirty[slotno] = true;
246
247         LWLockRelease(CommitTsControlLock);
248 }
249
250 /*
251  * Sets the commit timestamp of a single transaction.
252  *
253  * Must be called with CommitTsControlLock held
254  */
255 static void
256 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
257                                                  RepOriginId nodeid, int slotno)
258 {
259         int                     entryno = TransactionIdToCTsEntry(xid);
260         CommitTimestampEntry entry;
261
262         Assert(TransactionIdIsNormal(xid));
263
264         entry.time = ts;
265         entry.nodeid = nodeid;
266
267         memcpy(CommitTsCtl->shared->page_buffer[slotno] +
268                    SizeOfCommitTimestampEntry * entryno,
269                    &entry, SizeOfCommitTimestampEntry);
270 }
271
272 /*
273  * Interrogate the commit timestamp of a transaction.
274  *
275  * The return value indicates whether a commit timestamp record was found for
276  * the given xid.  The timestamp value is returned in *ts (which may not be
277  * null), and the origin node for the Xid is returned in *nodeid, if it's not
278  * null.
279  */
280 bool
281 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
282                                                          RepOriginId *nodeid)
283 {
284         int                     pageno = TransactionIdToCTsPage(xid);
285         int                     entryno = TransactionIdToCTsEntry(xid);
286         int                     slotno;
287         CommitTimestampEntry entry;
288         TransactionId oldestCommitTsXid;
289         TransactionId newestCommitTsXid;
290
291         /* error if the given Xid doesn't normally commit */
292         if (!TransactionIdIsNormal(xid))
293                 ereport(ERROR,
294                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295                 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296
297         LWLockAcquire(CommitTsLock, LW_SHARED);
298
299         /* Error if module not enabled */
300         if (!commitTsShared->commitTsActive)
301                 error_commit_ts_disabled();
302
303         /*
304          * If we're asked for the cached value, return that.  Otherwise, fall
305          * through to read from SLRU.
306          */
307         if (commitTsShared->xidLastCommit == xid)
308         {
309                 *ts = commitTsShared->dataLastCommit.time;
310                 if (nodeid)
311                         *nodeid = commitTsShared->dataLastCommit.nodeid;
312
313                 LWLockRelease(CommitTsLock);
314                 return *ts != 0;
315         }
316
317         oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
318         newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
319         /* neither is invalid, or both are */
320         Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
321         LWLockRelease(CommitTsLock);
322
323         /*
324          * Return empty if the requested value is outside our valid range.
325          */
326         if (!TransactionIdIsValid(oldestCommitTsXid) ||
327                 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
328                 TransactionIdPrecedes(newestCommitTsXid, xid))
329         {
330                 *ts = 0;
331                 if (nodeid)
332                         *nodeid = InvalidRepOriginId;
333                 return false;
334         }
335
336         /* lock is acquired by SimpleLruReadPage_ReadOnly */
337         slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
338         memcpy(&entry,
339                    CommitTsCtl->shared->page_buffer[slotno] +
340                    SizeOfCommitTimestampEntry * entryno,
341                    SizeOfCommitTimestampEntry);
342
343         *ts = entry.time;
344         if (nodeid)
345                 *nodeid = entry.nodeid;
346
347         LWLockRelease(CommitTsControlLock);
348         return *ts != 0;
349 }
350
351 /*
352  * Return the Xid of the latest committed transaction.  (As far as this module
353  * is concerned, anyway; it's up to the caller to ensure the value is useful
354  * for its purposes.)
355  *
356  * ts and extra are filled with the corresponding data; they can be passed
357  * as NULL if not wanted.
358  */
359 TransactionId
360 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
361 {
362         TransactionId xid;
363
364         LWLockAcquire(CommitTsLock, LW_SHARED);
365
366         /* Error if module not enabled */
367         if (!commitTsShared->commitTsActive)
368                 error_commit_ts_disabled();
369
370         xid = commitTsShared->xidLastCommit;
371         if (ts)
372                 *ts = commitTsShared->dataLastCommit.time;
373         if (nodeid)
374                 *nodeid = commitTsShared->dataLastCommit.nodeid;
375         LWLockRelease(CommitTsLock);
376
377         return xid;
378 }
379
380 static void
381 error_commit_ts_disabled(void)
382 {
383         ereport(ERROR,
384                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385                          errmsg("could not get commit timestamp data"),
386                          RecoveryInProgress() ?
387                          errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
388                                          "track_commit_timestamp") :
389                          errhint("Make sure the configuration parameter \"%s\" is set.",
390                                          "track_commit_timestamp")));
391 }
392
393 /*
394  * SQL-callable wrapper to obtain commit time of a transaction
395  */
396 Datum
397 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
398 {
399         TransactionId xid = PG_GETARG_UINT32(0);
400         TimestampTz ts;
401         bool            found;
402
403         found = TransactionIdGetCommitTsData(xid, &ts, NULL);
404
405         if (!found)
406                 PG_RETURN_NULL();
407
408         PG_RETURN_TIMESTAMPTZ(ts);
409 }
410
411
412 Datum
413 pg_last_committed_xact(PG_FUNCTION_ARGS)
414 {
415         TransactionId xid;
416         TimestampTz ts;
417         Datum           values[2];
418         bool            nulls[2];
419         TupleDesc       tupdesc;
420         HeapTuple       htup;
421
422         /* and construct a tuple with our data */
423         xid = GetLatestCommitTsData(&ts, NULL);
424
425         /*
426          * Construct a tuple descriptor for the result row.  This must match this
427          * function's pg_proc entry!
428          */
429         tupdesc = CreateTemplateTupleDesc(2, false);
430         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
431                                            XIDOID, -1, 0);
432         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
433                                            TIMESTAMPTZOID, -1, 0);
434         tupdesc = BlessTupleDesc(tupdesc);
435
436         if (!TransactionIdIsNormal(xid))
437         {
438                 memset(nulls, true, sizeof(nulls));
439         }
440         else
441         {
442                 values[0] = TransactionIdGetDatum(xid);
443                 nulls[0] = false;
444
445                 values[1] = TimestampTzGetDatum(ts);
446                 nulls[1] = false;
447         }
448
449         htup = heap_form_tuple(tupdesc, values, nulls);
450
451         PG_RETURN_DATUM(HeapTupleGetDatum(htup));
452 }
453
454
455 /*
456  * Number of shared CommitTS buffers.
457  *
458  * We use a very similar logic as for the number of CLOG buffers; see comments
459  * in CLOGShmemBuffers.
460  */
461 Size
462 CommitTsShmemBuffers(void)
463 {
464         return Min(16, Max(4, NBuffers / 1024));
465 }
466
467 /*
468  * Shared memory sizing for CommitTs
469  */
470 Size
471 CommitTsShmemSize(void)
472 {
473         return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
474                 sizeof(CommitTimestampShared);
475 }
476
477 /*
478  * Initialize CommitTs at system startup (postmaster start or standalone
479  * backend)
480  */
481 void
482 CommitTsShmemInit(void)
483 {
484         bool            found;
485
486         CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
487         SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
488                                   CommitTsControlLock, "pg_commit_ts",
489                                   LWTRANCHE_COMMITTS_BUFFERS);
490
491         commitTsShared = ShmemInitStruct("CommitTs shared",
492                                                                          sizeof(CommitTimestampShared),
493                                                                          &found);
494
495         if (!IsUnderPostmaster)
496         {
497                 Assert(!found);
498
499                 commitTsShared->xidLastCommit = InvalidTransactionId;
500                 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
501                 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
502                 commitTsShared->commitTsActive = false;
503         }
504         else
505                 Assert(found);
506 }
507
/*
 * To be called ONCE, on system install.
 *
 * (It is assumed that initdb has created the CommitTs directory and that
 * CommitTsShmemInit has been called already.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Unlike most other SLRU modules, there is currently nothing to do
	 * here: segments get created when the server is started with this
	 * module enabled.  See ActivateCommitTs.
	 */
}
523
524 /*
525  * Initialize (or reinitialize) a page of CommitTs to zeroes.
526  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
527  *
528  * The page is not actually written, just set up in shared memory.
529  * The slot number of the new page is returned.
530  *
531  * Control lock must be held at entry, and will be held at exit.
532  */
533 static int
534 ZeroCommitTsPage(int pageno, bool writeXlog)
535 {
536         int                     slotno;
537
538         slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
539
540         if (writeXlog)
541                 WriteZeroPageXlogRec(pageno);
542
543         return slotno;
544 }
545
/*
 * To be called ONCE during postmaster or standalone-backend startup, once
 * StartupXLOG has initialized ShmemVariableCache->nextXid.
 *
 * All the work is delegated to ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
555
556 /*
557  * This must be called ONCE during postmaster or standalone-backend startup,
558  * after recovery has finished.
559  */
560 void
561 CompleteCommitTsInitialization(void)
562 {
563         /*
564          * If the feature is not enabled, turn it off for good.  This also removes
565          * any leftover data.
566          *
567          * Conversely, we activate the module if the feature is enabled.  This is
568          * not necessary in a master system because we already did it earlier, but
569          * if we're in a standby server that got promoted which had the feature
570          * enabled and was following a master that had the feature disabled, this
571          * is where we turn it on locally.
572          */
573         if (!track_commit_timestamp)
574                 DeactivateCommitTs();
575         else
576                 ActivateCommitTs();
577 }
578
579 /*
580  * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
581  * XLog record in a standby.
582  */
583 void
584 CommitTsParameterChange(bool newvalue, bool oldvalue)
585 {
586         /*
587          * If the commit_ts module is disabled in this server and we get word from
588          * the master server that it is enabled there, activate it so that we can
589          * replay future WAL records involving it; also mark it as active on
590          * pg_control.  If the old value was already set, we already did this, so
591          * don't do anything.
592          *
593          * If the module is disabled in the master, disable it here too, unless
594          * the module is enabled locally.
595          *
596          * Note this only runs in the recovery process, so an unlocked read is
597          * fine.
598          */
599         if (newvalue)
600         {
601                 if (!commitTsShared->commitTsActive)
602                         ActivateCommitTs();
603         }
604         else if (commitTsShared->commitTsActive)
605                 DeactivateCommitTs();
606 }
607
608 /*
609  * Activate this module whenever necessary.
610  *              This must happen during postmaster or standalong-backend startup,
611  *              or during WAL replay anytime the track_commit_timestamp setting is
612  *              changed in the master.
613  *
614  * The reason why this SLRU needs separate activation/deactivation functions is
615  * that it can be enabled/disabled during start and the activation/deactivation
616  * on master is propagated to slave via replay. Other SLRUs don't have this
617  * property and they can be just initialized during normal startup.
618  *
619  * This is in charge of creating the currently active segment, if it's not
620  * already there.  The reason for this is that the server might have been
621  * running with this module disabled for a while and thus might have skipped
622  * the normal creation point.
623  */
624 static void
625 ActivateCommitTs(void)
626 {
627         TransactionId xid;
628         int                     pageno;
629
630         /* If we've done this already, there's nothing to do */
631         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
632         if (commitTsShared->commitTsActive)
633         {
634                 LWLockRelease(CommitTsLock);
635                 return;
636         }
637         LWLockRelease(CommitTsLock);
638
639         xid = ShmemVariableCache->nextXid;
640         pageno = TransactionIdToCTsPage(xid);
641
642         /*
643          * Re-Initialize our idea of the latest page number.
644          */
645         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
646         CommitTsCtl->shared->latest_page_number = pageno;
647         LWLockRelease(CommitTsControlLock);
648
649         /*
650          * If CommitTs is enabled, but it wasn't in the previous server run, we
651          * need to set the oldest and newest values to the next Xid; that way, we
652          * will not try to read data that might not have been set.
653          *
654          * XXX does this have a problem if a server is started with commitTs
655          * enabled, then started with commitTs disabled, then restarted with it
656          * enabled again?  It doesn't look like it does, because there should be a
657          * checkpoint that sets the value to InvalidTransactionId at end of
658          * recovery; and so any chance of injecting new transactions without
659          * CommitTs values would occur after the oldestCommitTsXid has been set to
660          * Invalid temporarily.
661          */
662         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
663         if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
664         {
665                 ShmemVariableCache->oldestCommitTsXid =
666                         ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
667         }
668         LWLockRelease(CommitTsLock);
669
670         /* Create the current segment file, if necessary */
671         if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
672         {
673                 int                     slotno;
674
675                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
676                 slotno = ZeroCommitTsPage(pageno, false);
677                 SimpleLruWritePage(CommitTsCtl, slotno);
678                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
679                 LWLockRelease(CommitTsControlLock);
680         }
681
682         /* Change the activation status in shared memory. */
683         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
684         commitTsShared->commitTsActive = true;
685         LWLockRelease(CommitTsLock);
686 }
687
688 /*
689  * Deactivate this module.
690  *
691  * This must be called when the track_commit_timestamp parameter is turned off.
692  * This happens during postmaster or standalone-backend startup, or during WAL
693  * replay.
694  *
695  * Resets CommitTs into invalid state to make sure we don't hand back
696  * possibly-invalid data; also removes segments of old data.
697  */
698 static void
699 DeactivateCommitTs(void)
700 {
701         /*
702          * Cleanup the status in the shared memory.
703          *
704          * We reset everything in the commitTsShared record to prevent user from
705          * getting confusing data about last committed transaction on the standby
706          * when the module was activated repeatedly on the primary.
707          */
708         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
709
710         commitTsShared->commitTsActive = false;
711         commitTsShared->xidLastCommit = InvalidTransactionId;
712         TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
713         commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
714
715         ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
716         ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
717
718         LWLockRelease(CommitTsLock);
719
720         /*
721          * Remove *all* files.  This is necessary so that there are no leftover
722          * files; in the case where this feature is later enabled after running
723          * with it disabled for some time there may be a gap in the file sequence.
724          * (We can probably tolerate out-of-sequence files, as they are going to
725          * be overwritten anyway when we wrap around, but it seems better to be
726          * tidy.)
727          */
728         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
729         (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
730         LWLockRelease(CommitTsControlLock);
731 }
732
733 /*
734  * This must be called ONCE during postmaster or standalone-backend shutdown
735  */
736 void
737 ShutdownCommitTs(void)
738 {
739         /* Flush dirty CommitTs pages to disk */
740         SimpleLruFlush(CommitTsCtl, false);
741 }
742
743 /*
744  * Perform a checkpoint --- either during shutdown, or on-the-fly
745  */
746 void
747 CheckPointCommitTs(void)
748 {
749         /* Flush dirty CommitTs pages to disk */
750         SimpleLruFlush(CommitTsCtl, true);
751 }
752
753 /*
754  * Make sure that CommitTs has room for a newly-allocated XID.
755  *
756  * NB: this is called while holding XidGenLock.  We want it to be very fast
757  * most of the time; even when it's not so fast, no actual I/O need happen
758  * unless we're forced to write out a dirty CommitTs or xlog page to make room
759  * in shared memory.
760  *
761  * NB: the current implementation relies on track_commit_timestamp being
762  * PGC_POSTMASTER.
763  */
764 void
765 ExtendCommitTs(TransactionId newestXact)
766 {
767         int                     pageno;
768
769         /*
770          * Nothing to do if module not enabled.  Note we do an unlocked read of the
771          * flag here, which is okay because this routine is only called from
772          * GetNewTransactionId, which is never called in a standby.
773          */
774         Assert(!InRecovery);
775         if (!commitTsShared->commitTsActive)
776                 return;
777
778         /*
779          * No work except at first XID of a page.  But beware: just after
780          * wraparound, the first XID of page zero is FirstNormalTransactionId.
781          */
782         if (TransactionIdToCTsEntry(newestXact) != 0 &&
783                 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
784                 return;
785
786         pageno = TransactionIdToCTsPage(newestXact);
787
788         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
789
790         /* Zero the page and make an XLOG entry about it */
791         ZeroCommitTsPage(pageno, !InRecovery);
792
793         LWLockRelease(CommitTsControlLock);
794 }
795
796 /*
797  * Remove all CommitTs segments before the one holding the passed
798  * transaction ID.
799  *
800  * Note that we don't need to flush XLOG here.
801  */
802 void
803 TruncateCommitTs(TransactionId oldestXact)
804 {
805         int                     cutoffPage;
806
807         /*
808          * The cutoff point is the start of the segment containing oldestXact. We
809          * pass the *page* containing oldestXact to SimpleLruTruncate.
810          */
811         cutoffPage = TransactionIdToCTsPage(oldestXact);
812
813         /* Check to see if there's any files that could be removed */
814         if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
815                                                    &cutoffPage))
816                 return;                                 /* nothing to remove */
817
818         /* Write XLOG record */
819         WriteTruncateXlogRec(cutoffPage);
820
821         /* Now we can remove the old CommitTs segment(s) */
822         SimpleLruTruncate(CommitTsCtl, cutoffPage);
823 }
824
825 /*
826  * Set the limit values between which commit TS can be consulted.
827  */
828 void
829 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
830 {
831         /*
832          * Be careful not to overwrite values that are either further into the
833          * "future" or signal a disabled committs.
834          */
835         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
836         if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
837         {
838                 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
839                         ShmemVariableCache->oldestCommitTsXid = oldestXact;
840                 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
841                         ShmemVariableCache->newestCommitTsXid = newestXact;
842         }
843         else
844         {
845                 Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
846         }
847         LWLockRelease(CommitTsLock);
848 }
849
850 /*
851  * Move forwards the oldest commitTS value that can be consulted
852  */
853 void
854 AdvanceOldestCommitTsXid(TransactionId oldestXact)
855 {
856         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
857         if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
858                 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
859                 ShmemVariableCache->oldestCommitTsXid = oldestXact;
860         LWLockRelease(CommitTsLock);
861 }
862
863
864 /*
865  * Decide which of two CLOG page numbers is "older" for truncation purposes.
866  *
867  * We need to use comparison of TransactionIds here in order to do the right
868  * thing with wraparound XID arithmetic.  However, if we are asked about
869  * page number zero, we don't want to hand InvalidTransactionId to
870  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
871  * offset both xids by FirstNormalTransactionId to avoid that.
872  */
873 static bool
874 CommitTsPagePrecedes(int page1, int page2)
875 {
876         TransactionId xid1;
877         TransactionId xid2;
878
879         xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
880         xid1 += FirstNormalTransactionId;
881         xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
882         xid2 += FirstNormalTransactionId;
883
884         return TransactionIdPrecedes(xid1, xid2);
885 }
886
887
888 /*
889  * Write a ZEROPAGE xlog record
890  */
891 static void
892 WriteZeroPageXlogRec(int pageno)
893 {
894         XLogBeginInsert();
895         XLogRegisterData((char *) (&pageno), sizeof(int));
896         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
897 }
898
899 /*
900  * Write a TRUNCATE xlog record
901  */
902 static void
903 WriteTruncateXlogRec(int pageno)
904 {
905         XLogBeginInsert();
906         XLogRegisterData((char *) (&pageno), sizeof(int));
907         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
908 }
909
910 /*
911  * Write a SETTS xlog record
912  */
913 static void
914 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
915                                                  TransactionId *subxids, TimestampTz timestamp,
916                                                  RepOriginId nodeid)
917 {
918         xl_commit_ts_set record;
919
920         record.timestamp = timestamp;
921         record.nodeid = nodeid;
922         record.mainxid = mainxid;
923
924         XLogBeginInsert();
925         XLogRegisterData((char *) &record,
926                                          offsetof(xl_commit_ts_set, mainxid) +
927                                          sizeof(TransactionId));
928         XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
929         XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
930 }
931
932 /*
933  * CommitTS resource manager's routines
934  */
935 void
936 commit_ts_redo(XLogReaderState *record)
937 {
938         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
939
940         /* Backup blocks are not used in commit_ts records */
941         Assert(!XLogRecHasAnyBlockRefs(record));
942
943         if (info == COMMIT_TS_ZEROPAGE)
944         {
945                 int                     pageno;
946                 int                     slotno;
947
948                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
949
950                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
951
952                 slotno = ZeroCommitTsPage(pageno, false);
953                 SimpleLruWritePage(CommitTsCtl, slotno);
954                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
955
956                 LWLockRelease(CommitTsControlLock);
957         }
958         else if (info == COMMIT_TS_TRUNCATE)
959         {
960                 int                     pageno;
961
962                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
963
964                 /*
965                  * During XLOG replay, latest_page_number isn't set up yet; insert a
966                  * suitable value to bypass the sanity test in SimpleLruTruncate.
967                  */
968                 CommitTsCtl->shared->latest_page_number = pageno;
969
970                 SimpleLruTruncate(CommitTsCtl, pageno);
971         }
972         else if (info == COMMIT_TS_SETTS)
973         {
974                 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
975                 int                     nsubxids;
976                 TransactionId *subxids;
977
978                 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
979                                         sizeof(TransactionId));
980                 if (nsubxids > 0)
981                 {
982                         subxids = palloc(sizeof(TransactionId) * nsubxids);
983                         memcpy(subxids,
984                                    XLogRecGetData(record) + SizeOfCommitTsSet,
985                                    sizeof(TransactionId) * nsubxids);
986                 }
987                 else
988                         subxids = NULL;
989
990                 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
991                                                                            setts->timestamp, setts->nodeid, true);
992                 if (subxids)
993                         pfree(subxids);
994         }
995         else
996                 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
997 }