]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/commit_ts.c
Code review for transaction commit timestamps
[postgresql] / src / backend / access / transam / commit_ts.c
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  *              PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_clog-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
38
39 /*
40  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
41  * everywhere else in Postgres.
42  *
43  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44  * CommitTs page numbering also wraps around at
45  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
47  * explicit notice of that fact in this module, except when comparing segment
48  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49  */
50
51 /*
52  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
53  * the largest possible file name is more than 5 chars long; see
54  * SlruScanDirectory.
55  */
56 typedef struct CommitTimestampEntry
57 {
58         TimestampTz time;
59         RepOriginId nodeid;
60 } CommitTimestampEntry;
61
/*
 * Number of bytes actually stored per entry: everything up to and including
 * the nodeid field, without any padding that may follow it in the struct.
 */
#define SizeOfCommitTimestampEntry \
	(offsetof(CommitTimestampEntry, nodeid) + sizeof(RepOriginId))

/* How many entries fit in one CommitTs page */
#define COMMIT_TS_XACTS_PER_PAGE \
	(BLCKSZ / SizeOfCommitTimestampEntry)

/* Page number holding the entry of a given xid ... */
#define TransactionIdToCTsPage(xid) \
	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
/* ... and the index of that entry within its page */
#define TransactionIdToCTsEntry(xid) \
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72
73 /*
74  * Link to shared-memory data structures for CommitTs control
75  */
76 static SlruCtlData CommitTsCtlData;
77
78 #define CommitTsCtl (&CommitTsCtlData)
79
80 /*
81  * We keep a cache of the last value set in shared memory.  This is protected
82  * by CommitTsLock.
83  */
84 typedef struct CommitTimestampShared
85 {
86         TransactionId xidLastCommit;
87         CommitTimestampEntry dataLastCommit;
88 } CommitTimestampShared;
89
90 CommitTimestampShared *commitTsShared;
91
92
/* GUC variable: when false, this module neither records nor returns data */
bool		track_commit_timestamp;
95
96 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
97                                          TransactionId *subxids, TimestampTz ts,
98                                          RepOriginId nodeid, int pageno);
99 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
100                                                  RepOriginId nodeid, int slotno);
101 static int      ZeroCommitTsPage(int pageno, bool writeXlog);
102 static bool CommitTsPagePrecedes(int page1, int page2);
103 static void WriteZeroPageXlogRec(int pageno);
104 static void WriteTruncateXlogRec(int pageno);
105 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
106                                                  TransactionId *subxids, TimestampTz timestamp,
107                                                  RepOriginId nodeid);
108
109 /*
110  * TransactionTreeSetCommitTsData
111  *
112  * Record the final commit timestamp of transaction entries in the commit log
113  * for a transaction and its subtransaction tree, as efficiently as possible.
114  *
115  * xid is the top level transaction id.
116  *
117  * subxids is an array of xids of length nsubxids, representing subtransactions
118  * in the tree of xid. In various cases nsubxids may be zero.
119  * The reason why tracking just the parent xid commit timestamp is not enough
120  * is that the subtrans SLRU does not stay valid across crashes (it's not
121  * permanent) so we need to keep the information about them here. If the
122  * subtrans implementation changes in the future, we might want to revisit the
123  * decision of storing timestamp info for each subxid.
124  *
125  * The replaying_xlog parameter indicates whether the module should execute
126  * its write even if the feature is nominally disabled, because we're replaying
127  * a record generated from a master where the feature is enabled.
128  *
129  * The write_xlog parameter tells us whether to include an XLog record of this
130  * or not.  Normally, this is called from transaction commit routines (both
131  * normal and prepared) and the information will be stored in the transaction
132  * commit XLog record, and so they should pass "false" for this.  The XLog redo
133  * code should use "false" here as well.  Other callers probably want to pass
134  * true, so that the given values persist in case of crashes.
135  */
136 void
137 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
138                                                            TransactionId *subxids, TimestampTz timestamp,
139                                                            RepOriginId nodeid,
140                                                            bool replaying_xlog, bool write_xlog)
141 {
142         int                     i;
143         TransactionId headxid;
144         TransactionId newestXact;
145
146         /* We'd better not try to write xlog during replay */
147         Assert(!(write_xlog && replaying_xlog));
148
149         /* No-op if feature not enabled, unless replaying WAL */
150         if (!track_commit_timestamp && !replaying_xlog)
151                 return;
152
153         /*
154          * Comply with the WAL-before-data rule: if caller specified it wants this
155          * value to be recorded in WAL, do so before touching the data.
156          */
157         if (write_xlog)
158                 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
159
160         /*
161          * Figure out the latest Xid in this batch: either the last subxid if
162          * there's any, otherwise the parent xid.
163          */
164         if (nsubxids > 0)
165                 newestXact = subxids[nsubxids - 1];
166         else
167                 newestXact = xid;
168
169         /*
170          * We split the xids to set the timestamp to in groups belonging to the
171          * same SLRU page; the first element in each such set is its head.  The
172          * first group has the main XID as the head; subsequent sets use the first
173          * subxid not on the previous page as head.  This way, we only have to
174          * lock/modify each SLRU page once.
175          */
176         for (i = 0, headxid = xid;;)
177         {
178                 int                     pageno = TransactionIdToCTsPage(headxid);
179                 int                     j;
180
181                 for (j = i; j < nsubxids; j++)
182                 {
183                         if (TransactionIdToCTsPage(subxids[j]) != pageno)
184                                 break;
185                 }
186                 /* subxids[i..j] are on the same page as the head */
187
188                 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
189                                                          pageno);
190
191                 /* if we wrote out all subxids, we're done. */
192                 if (j + 1 >= nsubxids)
193                         break;
194
195                 /*
196                  * Set the new head and skip over it, as well as over the subxids we
197                  * just wrote.
198                  */
199                 headxid = subxids[j];
200                 i += j - i + 1;
201         }
202
203         /* update the cached value in shared memory */
204         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
205         commitTsShared->xidLastCommit = xid;
206         commitTsShared->dataLastCommit.time = timestamp;
207         commitTsShared->dataLastCommit.nodeid = nodeid;
208
209         /* and move forwards our endpoint, if needed */
210         if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact))
211                 ShmemVariableCache->newestCommitTs = newestXact;
212         LWLockRelease(CommitTsLock);
213 }
214
215 /*
216  * Record the commit timestamp of transaction entries in the commit log for all
217  * entries on a single page.  Atomic only on this page.
218  */
219 static void
220 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
221                                          TransactionId *subxids, TimestampTz ts,
222                                          RepOriginId nodeid, int pageno)
223 {
224         int                     slotno;
225         int                     i;
226
227         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
228
229         slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
230
231         TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
232         for (i = 0; i < nsubxids; i++)
233                 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
234
235         CommitTsCtl->shared->page_dirty[slotno] = true;
236
237         LWLockRelease(CommitTsControlLock);
238 }
239
240 /*
241  * Sets the commit timestamp of a single transaction.
242  *
243  * Must be called with CommitTsControlLock held
244  */
245 static void
246 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
247                                                  RepOriginId nodeid, int slotno)
248 {
249         int                     entryno = TransactionIdToCTsEntry(xid);
250         CommitTimestampEntry entry;
251
252         Assert(TransactionIdIsNormal(xid));
253
254         entry.time = ts;
255         entry.nodeid = nodeid;
256
257         memcpy(CommitTsCtl->shared->page_buffer[slotno] +
258                    SizeOfCommitTimestampEntry * entryno,
259                    &entry, SizeOfCommitTimestampEntry);
260 }
261
262 /*
263  * Interrogate the commit timestamp of a transaction.
264  *
265  * The return value indicates whether a commit timestamp record was found for
266  * the given xid.  The timestamp value is returned in *ts (which may not be
267  * null), and the origin node for the Xid is returned in *nodeid, if it's not
268  * null.
269  */
270 bool
271 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
272                                                          RepOriginId *nodeid)
273 {
274         int                     pageno = TransactionIdToCTsPage(xid);
275         int                     entryno = TransactionIdToCTsEntry(xid);
276         int                     slotno;
277         CommitTimestampEntry entry;
278         TransactionId oldestCommitTs;
279         TransactionId newestCommitTs;
280
281         /* Error if module not enabled */
282         if (!track_commit_timestamp)
283                 ereport(ERROR,
284                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
285                                  errmsg("could not get commit timestamp data"),
286                           errhint("Make sure the configuration parameter \"%s\" is set.",
287                                           "track_commit_timestamp")));
288
289         /* error if the given Xid doesn't normally commit */
290         if (!TransactionIdIsNormal(xid))
291                 ereport(ERROR,
292                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
293                 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
294
295         /*
296          * Return empty if the requested value is outside our valid range.
297          */
298         LWLockAcquire(CommitTsLock, LW_SHARED);
299         oldestCommitTs = ShmemVariableCache->oldestCommitTs;
300         newestCommitTs = ShmemVariableCache->newestCommitTs;
301         /* neither is invalid, or both are */
302         Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs));
303         LWLockRelease(CommitTsLock);
304
305         if (!TransactionIdIsValid(oldestCommitTs) ||
306                 TransactionIdPrecedes(xid, oldestCommitTs) ||
307                 TransactionIdPrecedes(newestCommitTs, xid))
308         {
309                 *ts = 0;
310                 if (nodeid)
311                         *nodeid = InvalidRepOriginId;
312                 return false;
313         }
314
315         /*
316          * Use an unlocked atomic read on our cached value in shared memory; if
317          * it's a hit, acquire a lock and read the data, after verifying that it's
318          * still what we initially read.  Otherwise, fall through to read from
319          * SLRU.
320          */
321         if (commitTsShared->xidLastCommit == xid)
322         {
323                 LWLockAcquire(CommitTsLock, LW_SHARED);
324                 if (commitTsShared->xidLastCommit == xid)
325                 {
326                         *ts = commitTsShared->dataLastCommit.time;
327                         if (nodeid)
328                                 *nodeid = commitTsShared->dataLastCommit.nodeid;
329
330                         LWLockRelease(CommitTsLock);
331                         return *ts != 0;
332                 }
333                 LWLockRelease(CommitTsLock);
334         }
335
336         /* lock is acquired by SimpleLruReadPage_ReadOnly */
337         slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
338         memcpy(&entry,
339                    CommitTsCtl->shared->page_buffer[slotno] +
340                    SizeOfCommitTimestampEntry * entryno,
341                    SizeOfCommitTimestampEntry);
342
343         *ts = entry.time;
344         if (nodeid)
345                 *nodeid = entry.nodeid;
346
347         LWLockRelease(CommitTsControlLock);
348         return *ts != 0;
349 }
350
351 /*
352  * Return the Xid of the latest committed transaction.  (As far as this module
353  * is concerned, anyway; it's up to the caller to ensure the value is useful
354  * for its purposes.)
355  *
356  * ts and extra are filled with the corresponding data; they can be passed
357  * as NULL if not wanted.
358  */
359 TransactionId
360 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
361 {
362         TransactionId xid;
363
364         /* Error if module not enabled */
365         if (!track_commit_timestamp)
366                 ereport(ERROR,
367                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
368                                  errmsg("could not get commit timestamp data"),
369                           errhint("Make sure the configuration parameter \"%s\" is set.",
370                                           "track_commit_timestamp")));
371
372         LWLockAcquire(CommitTsLock, LW_SHARED);
373         xid = commitTsShared->xidLastCommit;
374         if (ts)
375                 *ts = commitTsShared->dataLastCommit.time;
376         if (nodeid)
377                 *nodeid = commitTsShared->dataLastCommit.nodeid;
378         LWLockRelease(CommitTsLock);
379
380         return xid;
381 }
382
383 /*
384  * SQL-callable wrapper to obtain commit time of a transaction
385  */
386 Datum
387 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
388 {
389         TransactionId xid = PG_GETARG_UINT32(0);
390         TimestampTz ts;
391         bool            found;
392
393         found = TransactionIdGetCommitTsData(xid, &ts, NULL);
394
395         if (!found)
396                 PG_RETURN_NULL();
397
398         PG_RETURN_TIMESTAMPTZ(ts);
399 }
400
401
402 Datum
403 pg_last_committed_xact(PG_FUNCTION_ARGS)
404 {
405         TransactionId xid;
406         TimestampTz ts;
407         Datum           values[2];
408         bool            nulls[2];
409         TupleDesc       tupdesc;
410         HeapTuple       htup;
411
412         /* and construct a tuple with our data */
413         xid = GetLatestCommitTsData(&ts, NULL);
414
415         /*
416          * Construct a tuple descriptor for the result row.  This must match this
417          * function's pg_proc entry!
418          */
419         tupdesc = CreateTemplateTupleDesc(2, false);
420         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
421                                            XIDOID, -1, 0);
422         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
423                                            TIMESTAMPTZOID, -1, 0);
424         tupdesc = BlessTupleDesc(tupdesc);
425
426         if (!TransactionIdIsNormal(xid))
427         {
428                 memset(nulls, true, sizeof(nulls));
429         }
430         else
431         {
432                 values[0] = TransactionIdGetDatum(xid);
433                 nulls[0] = false;
434
435                 values[1] = TimestampTzGetDatum(ts);
436                 nulls[1] = false;
437         }
438
439         htup = heap_form_tuple(tupdesc, values, nulls);
440
441         PG_RETURN_DATUM(HeapTupleGetDatum(htup));
442 }
443
444
445 /*
446  * Number of shared CommitTS buffers.
447  *
448  * We use a very similar logic as for the number of CLOG buffers; see comments
449  * in CLOGShmemBuffers.
450  */
451 Size
452 CommitTsShmemBuffers(void)
453 {
454         return Min(16, Max(4, NBuffers / 1024));
455 }
456
457 /*
458  * Shared memory sizing for CommitTs
459  */
460 Size
461 CommitTsShmemSize(void)
462 {
463         return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
464                 sizeof(CommitTimestampShared);
465 }
466
467 /*
468  * Initialize CommitTs at system startup (postmaster start or standalone
469  * backend)
470  */
471 void
472 CommitTsShmemInit(void)
473 {
474         bool            found;
475
476         CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
477         SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
478                                   CommitTsControlLock, "pg_commit_ts");
479
480         commitTsShared = ShmemInitStruct("CommitTs shared",
481                                                                          sizeof(CommitTimestampShared),
482                                                                          &found);
483
484         if (!IsUnderPostmaster)
485         {
486                 Assert(!found);
487
488                 commitTsShared->xidLastCommit = InvalidTransactionId;
489                 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
490                 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
491         }
492         else
493                 Assert(found);
494 }
495
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 *
 * It is intentionally empty at present.
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Unlike most other SLRU modules, there is nothing to create here:
	 * segments are created when the server is started with this module
	 * enabled. See StartupCommitTs.
	 */
}
511
512 /*
513  * Initialize (or reinitialize) a page of CommitTs to zeroes.
514  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
515  *
516  * The page is not actually written, just set up in shared memory.
517  * The slot number of the new page is returned.
518  *
519  * Control lock must be held at entry, and will be held at exit.
520  */
521 static int
522 ZeroCommitTsPage(int pageno, bool writeXlog)
523 {
524         int                     slotno;
525
526         slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
527
528         if (writeXlog)
529                 WriteZeroPageXlogRec(pageno);
530
531         return slotno;
532 }
533
534 /*
535  * This must be called ONCE during postmaster or standalone-backend startup,
536  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
537  */
538 void
539 StartupCommitTs(void)
540 {
541         TransactionId xid = ShmemVariableCache->nextXid;
542         int                     pageno = TransactionIdToCTsPage(xid);
543
544         if (track_commit_timestamp)
545         {
546                 ActivateCommitTs();
547                 return;
548         }
549
550         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
551
552         /*
553          * Initialize our idea of the latest page number.
554          */
555         CommitTsCtl->shared->latest_page_number = pageno;
556
557         LWLockRelease(CommitTsControlLock);
558 }
559
560 /*
561  * This must be called ONCE during postmaster or standalone-backend startup,
562  * when commit timestamp is enabled, after recovery has finished.
563  */
564 void
565 CompleteCommitTsInitialization(void)
566 {
567         if (!track_commit_timestamp)
568                 DeactivateCommitTs(true);
569 }
570
571 /*
572  * Activate this module whenever necessary.
573  *              This must happen during postmaster or standalong-backend startup,
574  *              or during WAL replay anytime the track_commit_timestamp setting is
575  *              changed in the master.
576  *
577  * The reason why this SLRU needs separate activation/deactivation functions is
578  * that it can be enabled/disabled during start and the activation/deactivation
579  * on master is propagated to slave via replay. Other SLRUs don't have this
580  * property and they can be just initialized during normal startup.
581  *
582  * This is in charge of creating the currently active segment, if it's not
583  * already there.  The reason for this is that the server might have been
584  * running with this module disabled for a while and thus might have skipped
585  * the normal creation point.
586  */
587 void
588 ActivateCommitTs(void)
589 {
590         TransactionId xid = ShmemVariableCache->nextXid;
591         int                     pageno = TransactionIdToCTsPage(xid);
592
593         /*
594          * Re-Initialize our idea of the latest page number.
595          */
596         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
597         CommitTsCtl->shared->latest_page_number = pageno;
598         LWLockRelease(CommitTsControlLock);
599
600         /*
601          * If CommitTs is enabled, but it wasn't in the previous server run, we
602          * need to set the oldest and newest values to the next Xid; that way, we
603          * will not try to read data that might not have been set.
604          *
605          * XXX does this have a problem if a server is started with commitTs
606          * enabled, then started with commitTs disabled, then restarted with it
607          * enabled again?  It doesn't look like it does, because there should be a
608          * checkpoint that sets the value to InvalidTransactionId at end of
609          * recovery; and so any chance of injecting new transactions without
610          * CommitTs values would occur after the oldestCommitTs has been set to
611          * Invalid temporarily.
612          */
613         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
614         if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
615         {
616                 ShmemVariableCache->oldestCommitTs =
617                         ShmemVariableCache->newestCommitTs = ReadNewTransactionId();
618         }
619         LWLockRelease(CommitTsLock);
620
621         /* Finally, create the current segment file, if necessary */
622         if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
623         {
624                 int                     slotno;
625
626                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
627                 slotno = ZeroCommitTsPage(pageno, false);
628                 SimpleLruWritePage(CommitTsCtl, slotno);
629                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
630                 LWLockRelease(CommitTsControlLock);
631         }
632 }
633
634 /*
635  * Deactivate this module.
636  *
637  * This must be called when the track_commit_timestamp parameter is turned off.
638  * This happens during postmaster or standalone-backend startup, or during WAL
639  * replay.
640  *
641  * Resets CommitTs into invalid state to make sure we don't hand back
642  * possibly-invalid data; also removes segments of old data.
643  */
644 void
645 DeactivateCommitTs(bool do_wal)
646 {
647         TransactionId xid = ShmemVariableCache->nextXid;
648         int                     pageno = TransactionIdToCTsPage(xid);
649
650         /*
651          * Re-Initialize our idea of the latest page number.
652          */
653         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
654         CommitTsCtl->shared->latest_page_number = pageno;
655         LWLockRelease(CommitTsControlLock);
656
657         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
658         ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
659         ShmemVariableCache->newestCommitTs = InvalidTransactionId;
660         LWLockRelease(CommitTsLock);
661
662         TruncateCommitTs(ReadNewTransactionId(), do_wal);
663 }
664
665 /*
666  * This must be called ONCE during postmaster or standalone-backend shutdown
667  */
668 void
669 ShutdownCommitTs(void)
670 {
671         /* Flush dirty CommitTs pages to disk */
672         SimpleLruFlush(CommitTsCtl, false);
673 }
674
675 /*
676  * Perform a checkpoint --- either during shutdown, or on-the-fly
677  */
678 void
679 CheckPointCommitTs(void)
680 {
681         /* Flush dirty CommitTs pages to disk */
682         SimpleLruFlush(CommitTsCtl, true);
683 }
684
685 /*
686  * Make sure that CommitTs has room for a newly-allocated XID.
687  *
688  * NB: this is called while holding XidGenLock.  We want it to be very fast
689  * most of the time; even when it's not so fast, no actual I/O need happen
690  * unless we're forced to write out a dirty CommitTs or xlog page to make room
691  * in shared memory.
692  *
693  * NB: the current implementation relies on track_commit_timestamp being
694  * PGC_POSTMASTER.
695  */
696 void
697 ExtendCommitTs(TransactionId newestXact)
698 {
699         int                     pageno;
700
701         /* nothing to do if module not enabled */
702         if (!track_commit_timestamp)
703                 return;
704
705         /*
706          * No work except at first XID of a page.  But beware: just after
707          * wraparound, the first XID of page zero is FirstNormalTransactionId.
708          */
709         if (TransactionIdToCTsEntry(newestXact) != 0 &&
710                 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
711                 return;
712
713         pageno = TransactionIdToCTsPage(newestXact);
714
715         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
716
717         /* Zero the page and make an XLOG entry about it */
718         ZeroCommitTsPage(pageno, !InRecovery);
719
720         LWLockRelease(CommitTsControlLock);
721 }
722
723 /*
724  * Remove all CommitTs segments before the one holding the passed
725  * transaction ID.
726  *
727  * Note that we don't need to flush XLOG here.
728  */
729 void
730 TruncateCommitTs(TransactionId oldestXact, bool do_wal)
731 {
732         int                     cutoffPage;
733
734         /*
735          * The cutoff point is the start of the segment containing oldestXact. We
736          * pass the *page* containing oldestXact to SimpleLruTruncate.
737          */
738         cutoffPage = TransactionIdToCTsPage(oldestXact);
739
740         /* Check to see if there's any files that could be removed */
741         if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
742                                                    &cutoffPage))
743                 return;                                 /* nothing to remove */
744
745         /* Write XLOG record */
746         if (do_wal)
747                 WriteTruncateXlogRec(cutoffPage);
748
749         /* Now we can remove the old CommitTs segment(s) */
750         SimpleLruTruncate(CommitTsCtl, cutoffPage);
751 }
752
753 /*
754  * Set the limit values between which commit TS can be consulted.
755  */
756 void
757 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
758 {
759         /*
760          * Be careful not to overwrite values that are either further into the
761          * "future" or signal a disabled committs.
762          */
763         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
764         if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId)
765         {
766                 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
767                         ShmemVariableCache->oldestCommitTs = oldestXact;
768                 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs))
769                         ShmemVariableCache->newestCommitTs = newestXact;
770         }
771         else
772         {
773                 Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId);
774         }
775         LWLockRelease(CommitTsLock);
776 }
777
778 /*
779  * Move forwards the oldest commitTS value that can be consulted
780  */
781 void
782 AdvanceOldestCommitTs(TransactionId oldestXact)
783 {
784         LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
785         if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId &&
786                 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
787                 ShmemVariableCache->oldestCommitTs = oldestXact;
788         LWLockRelease(CommitTsLock);
789 }
790
791
792 /*
793  * Decide which of two CLOG page numbers is "older" for truncation purposes.
794  *
795  * We need to use comparison of TransactionIds here in order to do the right
796  * thing with wraparound XID arithmetic.  However, if we are asked about
797  * page number zero, we don't want to hand InvalidTransactionId to
798  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
799  * offset both xids by FirstNormalTransactionId to avoid that.
800  */
801 static bool
802 CommitTsPagePrecedes(int page1, int page2)
803 {
804         TransactionId xid1;
805         TransactionId xid2;
806
807         xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
808         xid1 += FirstNormalTransactionId;
809         xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
810         xid2 += FirstNormalTransactionId;
811
812         return TransactionIdPrecedes(xid1, xid2);
813 }
814
815
816 /*
817  * Write a ZEROPAGE xlog record
818  */
819 static void
820 WriteZeroPageXlogRec(int pageno)
821 {
822         XLogBeginInsert();
823         XLogRegisterData((char *) (&pageno), sizeof(int));
824         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
825 }
826
827 /*
828  * Write a TRUNCATE xlog record
829  */
830 static void
831 WriteTruncateXlogRec(int pageno)
832 {
833         XLogBeginInsert();
834         XLogRegisterData((char *) (&pageno), sizeof(int));
835         (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
836 }
837
838 /*
839  * Write a SETTS xlog record
840  */
841 static void
842 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
843                                                  TransactionId *subxids, TimestampTz timestamp,
844                                                  RepOriginId nodeid)
845 {
846         xl_commit_ts_set record;
847
848         record.timestamp = timestamp;
849         record.nodeid = nodeid;
850         record.mainxid = mainxid;
851
852         XLogBeginInsert();
853         XLogRegisterData((char *) &record,
854                                          offsetof(xl_commit_ts_set, mainxid) +
855                                          sizeof(TransactionId));
856         XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
857         XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
858 }
859
860 /*
861  * CommitTS resource manager's routines
862  */
863 void
864 commit_ts_redo(XLogReaderState *record)
865 {
866         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
867
868         /* Backup blocks are not used in commit_ts records */
869         Assert(!XLogRecHasAnyBlockRefs(record));
870
871         if (info == COMMIT_TS_ZEROPAGE)
872         {
873                 int                     pageno;
874                 int                     slotno;
875
876                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
877
878                 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
879
880                 slotno = ZeroCommitTsPage(pageno, false);
881                 SimpleLruWritePage(CommitTsCtl, slotno);
882                 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
883
884                 LWLockRelease(CommitTsControlLock);
885         }
886         else if (info == COMMIT_TS_TRUNCATE)
887         {
888                 int                     pageno;
889
890                 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
891
892                 /*
893                  * During XLOG replay, latest_page_number isn't set up yet; insert a
894                  * suitable value to bypass the sanity test in SimpleLruTruncate.
895                  */
896                 CommitTsCtl->shared->latest_page_number = pageno;
897
898                 SimpleLruTruncate(CommitTsCtl, pageno);
899         }
900         else if (info == COMMIT_TS_SETTS)
901         {
902                 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
903                 int                     nsubxids;
904                 TransactionId *subxids;
905
906                 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
907                                         sizeof(TransactionId));
908                 if (nsubxids > 0)
909                 {
910                         subxids = palloc(sizeof(TransactionId) * nsubxids);
911                         memcpy(subxids,
912                                    XLogRecGetData(record) + SizeOfCommitTsSet,
913                                    sizeof(TransactionId) * nsubxids);
914                 }
915                 else
916                         subxids = NULL;
917
918                 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
919                                                                            setts->timestamp, setts->nodeid, false,
920                                                                            true);
921                 if (subxids)
922                         pfree(subxids);
923         }
924         else
925                 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
926 }