]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/snapbuild.c
logical decoding: Tell reorderbuffer about all xids.
[postgresql] / src / backend / replication / logical / snapbuild.c
1 /*-------------------------------------------------------------------------
2  *
3  * snapbuild.c
4  *
5  *        Infrastructure for building historic catalog snapshots based on contents
6  *        of the WAL, for the purpose of decoding heapam.c style values in the
7  *        WAL.
8  *
9  * NOTES:
10  *
11  * We build snapshots which can *only* be used to read catalog contents and we
12  * do so by reading and interpreting the WAL stream. The aim is to build a
13  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14  * at the time the XLogRecord was generated.
15  *
16  * To build the snapshots we reuse the infrastructure built for Hot
17  * Standby. The in-memory snapshots we build look different than HS' because
18  * we have different needs. To successfully decode data from the WAL we only
19  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20  * tables since the data we decode is wholly contained in the WAL
21  * records. Also, our snapshots need to be different in comparison to normal
22  * MVCC ones because in contrast to those we cannot fully rely on the clog and
23  * pg_subtrans for information about committed transactions because they might
24  * commit in the future from the POV of the WAL entry we're currently
25  * decoding. This definition has the advantage that we only need to prevent
26  * removal of catalog rows, while normal tables' rows can still be
27  * removed. This is achieved by using the replication slot mechanism.
28  *
29  * As the percentage of transactions modifying the catalog normally is fairly
30  * small in comparison to ones only manipulating user data, we keep track of
31  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32  * track of all running transactions like it's done in a normal snapshot. Note
33  * that we're generally only looking at transactions that have acquired an
34  * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35  * that we consider committed, everything else is considered aborted/in
36  * progress. That also allows us not to care about subtransactions before they
37  * have committed, which means this module, in contrast to HS, doesn't have to
38  * care about suboverflowed subtransactions and similar.
39  *
40  * One complexity of doing this is that to e.g. handle mixed DDL/DML
41  * transactions we need Snapshots that see intermediate versions of the
42  * catalog in a transaction. During normal operation this is achieved by using
43  * CommandIds/cmin/cmax. The problem with that however is that for space
44  * efficiency reasons only one value of that is stored
45  * (c.f. combocid.c). Since ComboCids are only available in memory we log
46  * additional information which allows us to get the original (cmin, cmax)
47  * pair during visibility checks. Check the reorderbuffer.c's comment above
48  * ResolveCminCmaxDuringDecoding() for details.
49  *
50  * To facilitate all this we need our own visibility routine, as the normal
51  * ones are optimized for different use cases.
52  *
53  * To replace the normal catalog snapshots with decoding ones use the
54  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55  *
56  *
57  *
58  * The snapbuild machinery starts up in several stages, as illustrated
59  * by the following graph:
60  *                 +-------------------------+
61  *        +----|SNAPBUILD_START                  |-------------+
62  *        |    +-------------------------+                         |
63  *        |                                     |                                                  |
64  *        |                                     |                                                  |
65  *        |             running_xacts with running xacts           |
66  *        |                                     |                                                  |
67  *        |                                     |                                                  |
68  *        |                                     v                                                  |
69  *        |    +-------------------------+                         v
70  *        |    |SNAPBUILD_FULL_SNAPSHOT  |------------>|
71  *        |    +-------------------------+                         |
72  * running_xacts                |                                          saved snapshot
73  * with zero xacts              |                                 at running_xacts's lsn
74  *        |                                     |                                                  |
75  *        |             all running toplevel TXNs finished         |
76  *        |                                     |                                                  |
77  *        |                                     v                                                  |
78  *        |    +-------------------------+                         |
79  *        +--->|SNAPBUILD_CONSISTENT     |<------------+
80  *                 +-------------------------+
81  *
82  * Initially the machinery is in the START stage. When an xl_running_xacts
83  * record is read that is sufficiently new (above the safe xmin horizon),
84  * there's a state transition. If there were no running xacts when the
85  * running_xacts record was generated, we'll directly go into CONSISTENT
86  * state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full
87  * snapshot means that all transactions that start henceforth can be decoded
88  * in their entirety, but transactions that started previously can't. In
89  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
90  * running transactions have committed or aborted.
91  *
92  * Only transactions that commit after CONSISTENT state has been reached will
93  * be replayed, even though they might have started while still in
94  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
95  * changes have been exported, but all the following ones will be. That point
96  * is a convenient point to initialize replication from, which is why we
97  * export a snapshot at that point, which *can* be used to read normal data.
98  *
99  * Copyright (c) 2012-2016, PostgreSQL Global Development Group
100  *
101  * IDENTIFICATION
102  *        src/backend/replication/logical/snapbuild.c
103  *
104  *-------------------------------------------------------------------------
105  */
106
107 #include "postgres.h"
108
109 #include <sys/stat.h>
110 #include <sys/types.h>
111 #include <unistd.h>
112
113 #include "miscadmin.h"
114
115 #include "access/heapam_xlog.h"
116 #include "access/transam.h"
117 #include "access/xact.h"
118
119 #include "replication/logical.h"
120 #include "replication/reorderbuffer.h"
121 #include "replication/snapbuild.h"
122
123 #include "utils/builtins.h"
124 #include "utils/memutils.h"
125 #include "utils/snapshot.h"
126 #include "utils/snapmgr.h"
127 #include "utils/tqual.h"
128
129 #include "storage/block.h"              /* debugging output */
130 #include "storage/fd.h"
131 #include "storage/lmgr.h"
132 #include "storage/proc.h"
133 #include "storage/procarray.h"
134 #include "storage/standby.h"
135
136 /*
137  * This struct contains the current state of the snapshot building
138  * machinery. Besides a forward declaration in the header, it is not exposed
139  * to the public, so we can easily change its contents.
140  */
141 struct SnapBuild
142 {
143         /* how far are we along building our first full snapshot */
144         SnapBuildState state;
145
146         /* private memory context used to allocate memory for this module. */
147         MemoryContext context;
148
149         /* all transactions < than this have committed/aborted */
150         TransactionId xmin;
151
152         /* all transactions >= than this are uncommitted */
153         TransactionId xmax;
154
155         /*
156          * Don't replay commits from an LSN < this LSN. This can be set externally
157          * but it will also be advanced (never retreat) from within snapbuild.c.
158          */
159         XLogRecPtr      start_decoding_at;
160
161         /*
162          * Don't start decoding WAL until the "xl_running_xacts" information
163          * indicates there are no running xids with an xid smaller than this.
164          */
165         TransactionId initial_xmin_horizon;
166
167         /*
168          * Snapshot that's valid to see the catalog state seen at this moment.
169          */
170         Snapshot        snapshot;
171
172         /*
173          * LSN of the last location we are sure a snapshot has been serialized to.
174          */
175         XLogRecPtr      last_serialized_snapshot;
176
177         /*
178          * The reorderbuffer we need to update with usable snapshots et al.
179          */
180         ReorderBuffer *reorder;
181
182         /*
183          * Information about initially running transactions
184          *
185          * When we start building a snapshot there already may be transactions in
186          * progress.  Those are stored in running.xip.  We don't have enough
187          * information about those to decode their contents, so until they are
188          * finished (xcnt=0) we cannot switch to a CONSISTENT state.
189          */
190         struct
191         {
192                 /*
193                  * As long as running.xcnt all XIDs < running.xmin and > running.xmax
194                  * have to be checked whether they still are running.
195                  */
196                 TransactionId xmin;
197                 TransactionId xmax;
198
199                 size_t          xcnt;           /* number of used xip entries */
200                 size_t          xcnt_space; /* allocated size of xip */
201                 TransactionId *xip;             /* running xacts array, xidComparator-sorted */
202         }                       running;
203
204         /*
205          * Array of transactions which could have catalog changes that committed
206          * between xmin and xmax.
207          */
208         struct
209         {
210                 /* number of committed transactions */
211                 size_t          xcnt;
212
213                 /* available space for committed transactions */
214                 size_t          xcnt_space;
215
216                 /*
217                  * Until we reach a CONSISTENT state, we record commits of all
218                  * transactions, not just the catalog changing ones. Record when that
219                  * changes so we know we cannot export a snapshot safely anymore.
220                  */
221                 bool            includes_all_transactions;
222
223                 /*
224                  * Array of committed transactions that have modified the catalog.
225                  *
226                  * As this array is frequently modified we do *not* keep it in
227                  * xidComparator order. Instead we sort the array when building &
228                  * distributing a snapshot.
229                  *
230                  * TODO: It's unclear whether that reasoning has much merit. Every
231                  * time we add something here after becoming consistent will also
232                  * require distributing a snapshot. Storing them sorted would
233                  * potentially also make it easier to purge (but more complicated wrt
234                  * wraparound?). Should be improved if sorting while building the
235                  * snapshot shows up in profiles.
236                  */
237                 TransactionId *xip;
238         }                       committed;
239 };
240
241 /*
242  * Starting a transaction -- which we need to do while exporting a snapshot --
243  * removes knowledge about the previously used resowner, so we save it here.
244  */
245 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
246 static bool ExportInProgress = false;
247
248 /* transaction state manipulation functions */
249 static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
250
251 /* ->running manipulation */
252 static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
253
254 /* ->committed manipulation */
255 static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
256
257 /* snapshot building/manipulation/distribution functions */
258 static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid);
259
260 static void SnapBuildFreeSnapshot(Snapshot snap);
261
262 static void SnapBuildSnapIncRefcount(Snapshot snap);
263
264 static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
265
266 /* xlog reading helper functions for SnapBuildProcessRecord */
267 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
268
269 /* serialization functions */
270 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
271 static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
272
273
274 /*
275  * Allocate a new snapshot builder.
276  *
277  * xmin_horizon is the xid >=which we can be sure no catalog rows have been
278  * removed, start_lsn is the LSN >= we want to replay commits.
279  */
280 SnapBuild *
281 AllocateSnapshotBuilder(ReorderBuffer *reorder,
282                                                 TransactionId xmin_horizon,
283                                                 XLogRecPtr start_lsn)
284 {
285         MemoryContext context;
286         MemoryContext oldcontext;
287         SnapBuild  *builder;
288
289         /* allocate memory in own context, to have better accountability */
290         context = AllocSetContextCreate(CurrentMemoryContext,
291                                                                         "snapshot builder context",
292                                                                         ALLOCSET_DEFAULT_MINSIZE,
293                                                                         ALLOCSET_DEFAULT_INITSIZE,
294                                                                         ALLOCSET_DEFAULT_MAXSIZE);
295         oldcontext = MemoryContextSwitchTo(context);
296
297         builder = palloc0(sizeof(SnapBuild));
298
299         builder->state = SNAPBUILD_START;
300         builder->context = context;
301         builder->reorder = reorder;
302         /* Other struct members initialized by zeroing via palloc0 above */
303
304         builder->committed.xcnt = 0;
305         builder->committed.xcnt_space = 128;            /* arbitrary number */
306         builder->committed.xip =
307                 palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
308         builder->committed.includes_all_transactions = true;
309
310         builder->initial_xmin_horizon = xmin_horizon;
311         builder->start_decoding_at = start_lsn;
312
313         MemoryContextSwitchTo(oldcontext);
314
315         return builder;
316 }
317
318 /*
319  * Free a snapshot builder.
320  */
321 void
322 FreeSnapshotBuilder(SnapBuild *builder)
323 {
324         MemoryContext context = builder->context;
325
326         /* free snapshot explicitly, that contains some error checking */
327         if (builder->snapshot != NULL)
328         {
329                 SnapBuildSnapDecRefcount(builder->snapshot);
330                 builder->snapshot = NULL;
331         }
332
333         /* other resources are deallocated via memory context reset */
334         MemoryContextDelete(context);
335 }
336
337 /*
338  * Free an unreferenced snapshot that has previously been built by us.
339  */
340 static void
341 SnapBuildFreeSnapshot(Snapshot snap)
342 {
343         /* make sure we don't get passed an external snapshot */
344         Assert(snap->satisfies == HeapTupleSatisfiesHistoricMVCC);
345
346         /* make sure nobody modified our snapshot */
347         Assert(snap->curcid == FirstCommandId);
348         Assert(!snap->suboverflowed);
349         Assert(!snap->takenDuringRecovery);
350         Assert(snap->regd_count == 0);
351
352         /* slightly more likely, so it's checked even without c-asserts */
353         if (snap->copied)
354                 elog(ERROR, "cannot free a copied snapshot");
355
356         if (snap->active_count)
357                 elog(ERROR, "cannot free an active snapshot");
358
359         pfree(snap);
360 }
361
362 /*
363  * In which state of snapshot building are we?
364  */
365 SnapBuildState
366 SnapBuildCurrentState(SnapBuild *builder)
367 {
368         return builder->state;
369 }
370
371 /*
372  * Should the contents of transaction ending at 'ptr' be decoded?
373  */
374 bool
375 SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
376 {
377         return ptr < builder->start_decoding_at;
378 }
379
380 /*
381  * Increase refcount of a snapshot.
382  *
383  * This is used when handing out a snapshot to some external resource or when
384  * adding a Snapshot as builder->snapshot.
385  */
386 static void
387 SnapBuildSnapIncRefcount(Snapshot snap)
388 {
389         snap->active_count++;
390 }
391
392 /*
393  * Decrease refcount of a snapshot and free if the refcount reaches zero.
394  *
395  * Externally visible, so that external resources that have been handed an
396  * IncRef'ed Snapshot can adjust its refcount easily.
397  */
398 void
399 SnapBuildSnapDecRefcount(Snapshot snap)
400 {
401         /* make sure we don't get passed an external snapshot */
402         Assert(snap->satisfies == HeapTupleSatisfiesHistoricMVCC);
403
404         /* make sure nobody modified our snapshot */
405         Assert(snap->curcid == FirstCommandId);
406         Assert(!snap->suboverflowed);
407         Assert(!snap->takenDuringRecovery);
408
409         Assert(snap->regd_count == 0);
410
411         Assert(snap->active_count > 0);
412
413         /* slightly more likely, so it's checked even without casserts */
414         if (snap->copied)
415                 elog(ERROR, "cannot free a copied snapshot");
416
417         snap->active_count--;
418         if (snap->active_count == 0)
419                 SnapBuildFreeSnapshot(snap);
420 }
421
422 /*
423  * Build a new snapshot, based on currently committed catalog-modifying
424  * transactions.
425  *
426  * In-progress transactions with catalog access are *not* allowed to modify
427  * these snapshots; they have to copy them and fill in appropriate ->curcid
428  * and ->subxip/subxcnt values.
429  */
430 static Snapshot
431 SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
432 {
433         Snapshot        snapshot;
434         Size            ssize;
435
436         Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
437
438         ssize = sizeof(SnapshotData)
439                 + sizeof(TransactionId) * builder->committed.xcnt
440                 + sizeof(TransactionId) * 1 /* toplevel xid */ ;
441
442         snapshot = MemoryContextAllocZero(builder->context, ssize);
443
444         snapshot->satisfies = HeapTupleSatisfiesHistoricMVCC;
445
446         /*
447          * We misuse the original meaning of SnapshotData's xip and subxip fields
448          * to make the more fitting for our needs.
449          *
450          * In the 'xip' array we store transactions that have to be treated as
451          * committed. Since we will only ever look at tuples from transactions
452          * that have modified the catalog it's more efficient to store those few
453          * that exist between xmin and xmax (frequently there are none).
454          *
455          * Snapshots that are used in transactions that have modified the catalog
456          * also use the 'subxip' array to store their toplevel xid and all the
457          * subtransaction xids so we can recognize when we need to treat rows as
458          * visible that are not in xip but still need to be visible. Subxip only
459          * gets filled when the transaction is copied into the context of a
460          * catalog modifying transaction since we otherwise share a snapshot
461          * between transactions. As long as a txn hasn't modified the catalog it
462          * doesn't need to treat any uncommitted rows as visible, so there is no
463          * need for those xids.
464          *
465          * Both arrays are qsort'ed so that we can use bsearch() on them.
466          */
467         Assert(TransactionIdIsNormal(builder->xmin));
468         Assert(TransactionIdIsNormal(builder->xmax));
469
470         snapshot->xmin = builder->xmin;
471         snapshot->xmax = builder->xmax;
472
473         /* store all transactions to be treated as committed by this snapshot */
474         snapshot->xip =
475                 (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
476         snapshot->xcnt = builder->committed.xcnt;
477         memcpy(snapshot->xip,
478                    builder->committed.xip,
479                    builder->committed.xcnt * sizeof(TransactionId));
480
481         /* sort so we can bsearch() */
482         qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
483
484         /*
485          * Initially, subxip is empty, i.e. it's a snapshot to be used by
486          * transactions that don't modify the catalog. Will be filled by
487          * ReorderBufferCopySnap() if necessary.
488          */
489         snapshot->subxcnt = 0;
490         snapshot->subxip = NULL;
491
492         snapshot->suboverflowed = false;
493         snapshot->takenDuringRecovery = false;
494         snapshot->copied = false;
495         snapshot->curcid = FirstCommandId;
496         snapshot->active_count = 0;
497         snapshot->regd_count = 0;
498
499         return snapshot;
500 }
501
502 /*
503  * Export a snapshot so it can be set in another session with SET TRANSACTION
504  * SNAPSHOT.
505  *
506  * For that we need to start a transaction in the current backend as the
507  * importing side checks whether the source transaction is still open to make
508  * sure the xmin horizon hasn't advanced since then.
509  *
510  * After that we convert a locally built snapshot into the normal variant
511  * understood by HeapTupleSatisfiesMVCC et al.
512  */
513 const char *
514 SnapBuildExportSnapshot(SnapBuild *builder)
515 {
516         Snapshot        snap;
517         char       *snapname;
518         TransactionId xid;
519         TransactionId *newxip;
520         int                     newxcnt = 0;
521
522         if (builder->state != SNAPBUILD_CONSISTENT)
523                 elog(ERROR, "cannot export a snapshot before reaching a consistent state");
524
525         if (!builder->committed.includes_all_transactions)
526                 elog(ERROR, "cannot export a snapshot, not all transactions are monitored anymore");
527
528         /* so we don't overwrite the existing value */
529         if (TransactionIdIsValid(MyPgXact->xmin))
530                 elog(ERROR, "cannot export a snapshot when MyPgXact->xmin already is valid");
531
532         if (IsTransactionOrTransactionBlock())
533                 elog(ERROR, "cannot export a snapshot from within a transaction");
534
535         if (SavedResourceOwnerDuringExport)
536                 elog(ERROR, "can only export one snapshot at a time");
537
538         SavedResourceOwnerDuringExport = CurrentResourceOwner;
539         ExportInProgress = true;
540
541         StartTransactionCommand();
542
543         Assert(!FirstSnapshotSet);
544
545         /* There doesn't seem to a nice API to set these */
546         XactIsoLevel = XACT_REPEATABLE_READ;
547         XactReadOnly = true;
548
549         snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
550
551         /*
552          * We know that snap->xmin is alive, enforced by the logical xmin
553          * mechanism. Due to that we can do this without locks, we're only
554          * changing our own value.
555          */
556         MyPgXact->xmin = snap->xmin;
557
558         /* allocate in transaction context */
559         newxip = (TransactionId *)
560                 palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
561
562         /*
563          * snapbuild.c builds transactions in an "inverted" manner, which means it
564          * stores committed transactions in ->xip, not ones in progress. Build a
565          * classical snapshot by marking all non-committed transactions as
566          * in-progress. This can be expensive.
567          */
568         for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
569         {
570                 void       *test;
571
572                 /*
573                  * Check whether transaction committed using the decoding snapshot
574                  * meaning of ->xip.
575                  */
576                 test = bsearch(&xid, snap->xip, snap->xcnt,
577                                            sizeof(TransactionId), xidComparator);
578
579                 if (test == NULL)
580                 {
581                         if (newxcnt >= GetMaxSnapshotXidCount())
582                                 elog(ERROR, "snapshot too large");
583
584                         newxip[newxcnt++] = xid;
585                 }
586
587                 TransactionIdAdvance(xid);
588         }
589
590         snap->xcnt = newxcnt;
591         snap->xip = newxip;
592
593         /*
594          * now that we've built a plain snapshot, use the normal mechanisms for
595          * exporting it
596          */
597         snapname = ExportSnapshot(snap);
598
599         ereport(LOG,
600                         (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
601                 "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
602                                                    snap->xcnt,
603                                                    snapname, snap->xcnt)));
604         return snapname;
605 }
606
607 /*
608  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
609  * any. Aborts the previously started transaction and resets the resource
610  * owner back to its original value.
611  */
612 void
613 SnapBuildClearExportedSnapshot(void)
614 {
615         /* nothing exported, that is the usual case */
616         if (!ExportInProgress)
617                 return;
618
619         if (!IsTransactionState())
620                 elog(ERROR, "clearing exported snapshot in wrong transaction state");
621
622         /* make sure nothing  could have ever happened */
623         AbortCurrentTransaction();
624
625         CurrentResourceOwner = SavedResourceOwnerDuringExport;
626         SavedResourceOwnerDuringExport = NULL;
627         ExportInProgress = false;
628 }
629
630 /*
631  * Handle the effects of a single heap change, appropriate to the current state
632  * of the snapshot builder and returns whether changes made at (xid, lsn) can
633  * be decoded.
634  */
635 bool
636 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
637 {
638         /*
639          * We can't handle data in transactions if we haven't built a snapshot
640          * yet, so don't store them.
641          */
642         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
643                 return false;
644
645         /*
646          * No point in keeping track of changes in transactions that we don't have
647          * enough information about to decode. This means that they started before
648          * we got into the SNAPBUILD_FULL_SNAPSHOT state.
649          */
650         if (builder->state < SNAPBUILD_CONSISTENT &&
651                 SnapBuildTxnIsRunning(builder, xid))
652                 return false;
653
654         /*
655          * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
656          * be needed to decode the change we're currently processing.
657          */
658         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
659         {
660                 /* only build a new snapshot if we don't have a prebuilt one */
661                 if (builder->snapshot == NULL)
662                 {
663                         builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
664                         /* inrease refcount for the snapshot builder */
665                         SnapBuildSnapIncRefcount(builder->snapshot);
666                 }
667
668                 /*
669                  * Increase refcount for the transaction we're handing the snapshot
670                  * out to.
671                  */
672                 SnapBuildSnapIncRefcount(builder->snapshot);
673                 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
674                                                                          builder->snapshot);
675         }
676
677         return true;
678 }
679
680 /*
681  * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
682  * This implies that a transaction has done some form of write to system
683  * catalogs.
684  */
685 void
686 SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
687                                            XLogRecPtr lsn, xl_heap_new_cid *xlrec)
688 {
689         CommandId       cid;
690
691         /*
692          * we only log new_cid's if a catalog tuple was modified, so mark the
693          * transaction as containing catalog modifications
694          */
695         ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
696
697         ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
698                                                                  xlrec->target_node, xlrec->target_tid,
699                                                                  xlrec->cmin, xlrec->cmax,
700                                                                  xlrec->combocid);
701
702         /* figure out new command id */
703         if (xlrec->cmin != InvalidCommandId &&
704                 xlrec->cmax != InvalidCommandId)
705                 cid = Max(xlrec->cmin, xlrec->cmax);
706         else if (xlrec->cmax != InvalidCommandId)
707                 cid = xlrec->cmax;
708         else if (xlrec->cmin != InvalidCommandId)
709                 cid = xlrec->cmin;
710         else
711         {
712                 cid = InvalidCommandId; /* silence compiler */
713                 elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
714         }
715
716         ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
717 }
718
719 /*
720  * Check whether `xid` is currently 'running'.
721  *
722  * Running transactions in our parlance are transactions which we didn't
723  * observe from the start so we can't properly decode their contents. They
724  * only exist after we freshly started from an < CONSISTENT snapshot.
725  */
726 static bool
727 SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
728 {
729         Assert(builder->state < SNAPBUILD_CONSISTENT);
730         Assert(TransactionIdIsNormal(builder->running.xmin));
731         Assert(TransactionIdIsNormal(builder->running.xmax));
732
733         if (builder->running.xcnt &&
734                 NormalTransactionIdFollows(xid, builder->running.xmin) &&
735                 NormalTransactionIdPrecedes(xid, builder->running.xmax))
736         {
737                 TransactionId *search =
738                 bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
739                                 sizeof(TransactionId), xidComparator);
740
741                 if (search != NULL)
742                 {
743                         Assert(*search == xid);
744                         return true;
745                 }
746         }
747
748         return false;
749 }
750
751 /*
752  * Add a new Snapshot to all transactions we're decoding that currently are
753  * in-progress so they can see new catalog contents made by the transaction
754  * that just committed. This is necessary because those in-progress
755  * transactions will use the new catalog's contents from here on (at the very
756  * least everything they do needs to be compatible with newer catalog
757  * contents).
758  */
759 static void
760 SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
761 {
762         dlist_iter      txn_i;
763         ReorderBufferTXN *txn;
764
765         /*
766          * Iterate through all toplevel transactions. This can include
767          * subtransactions which we just don't yet know to be that, but that's
768          * fine, they will just get an unnecessary snapshot queued.
769          */
770         dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
771         {
772                 txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
773
774                 Assert(TransactionIdIsValid(txn->xid));
775
776                 /*
777                  * If we don't have a base snapshot yet, there are no changes in this
778                  * transaction which in turn implies we don't yet need a snapshot at
779                  * all. We'll add a snapshot when the first change gets queued.
780                  *
781                  * NB: This works correctly even for subtransactions because
782                  * ReorderBufferCommitChild() takes care to pass the parent the base
783                  * snapshot, and while iterating the changequeue we'll get the change
784                  * from the subtxn.
785                  */
786                 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
787                         continue;
788
789                 elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
790                          txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
791
792                 /*
793                  * increase the snapshot's refcount for the transaction we are handing
794                  * it out to
795                  */
796                 SnapBuildSnapIncRefcount(builder->snapshot);
797                 ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
798                                                                  builder->snapshot);
799         }
800 }
801
802 /*
803  * Keep track of a new catalog changing transaction that has committed.
804  */
805 static void
806 SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
807 {
808         Assert(TransactionIdIsValid(xid));
809
810         if (builder->committed.xcnt == builder->committed.xcnt_space)
811         {
812                 builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
813
814                 elog(DEBUG1, "increasing space for committed transactions to %u",
815                          (uint32) builder->committed.xcnt_space);
816
817                 builder->committed.xip = repalloc(builder->committed.xip,
818                                           builder->committed.xcnt_space * sizeof(TransactionId));
819         }
820
821         /*
822          * TODO: It might make sense to keep the array sorted here instead of
823          * doing it every time we build a new snapshot. On the other hand this
824          * gets called repeatedly when a transaction with subtransactions commits.
825          */
826         builder->committed.xip[builder->committed.xcnt++] = xid;
827 }
828
829 /*
830  * Remove knowledge about transactions we treat as committed that are smaller
831  * than ->xmin. Those won't ever get checked via the ->committed array but via
832  * the clog machinery, so we don't need to waste memory on them.
833  */
834 static void
835 SnapBuildPurgeCommittedTxn(SnapBuild *builder)
836 {
837         int                     off;
838         TransactionId *workspace;
839         int                     surviving_xids = 0;
840
841         /* not ready yet */
842         if (!TransactionIdIsNormal(builder->xmin))
843                 return;
844
845         /* TODO: Neater algorithm than just copying and iterating? */
846         workspace =
847                 MemoryContextAlloc(builder->context,
848                                                    builder->committed.xcnt * sizeof(TransactionId));
849
850         /* copy xids that still are interesting to workspace */
851         for (off = 0; off < builder->committed.xcnt; off++)
852         {
853                 if (NormalTransactionIdPrecedes(builder->committed.xip[off],
854                                                                                 builder->xmin))
855                         ;                                       /* remove */
856                 else
857                         workspace[surviving_xids++] = builder->committed.xip[off];
858         }
859
860         /* copy workspace back to persistent state */
861         memcpy(builder->committed.xip, workspace,
862                    surviving_xids * sizeof(TransactionId));
863
864         elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
865                  (uint32) builder->committed.xcnt, (uint32) surviving_xids,
866                  builder->xmin, builder->xmax);
867         builder->committed.xcnt = surviving_xids;
868
869         pfree(workspace);
870 }
871
872 /*
873  * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
874  * keeping track of the amount of running transactions.
875  */
876 static void
877 SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
878 {
879         if (builder->state == SNAPBUILD_CONSISTENT)
880                 return;
881
882         /*
883          * NB: This handles subtransactions correctly even if we started from
884          * suboverflowed xl_running_xacts because we only keep track of toplevel
885          * transactions. Since the latter are always are allocated before their
886          * subxids and since they end at the same time it's sufficient to deal
887          * with them here.
888          */
889         if (SnapBuildTxnIsRunning(builder, xid))
890         {
891                 Assert(builder->running.xcnt > 0);
892
893                 if (!--builder->running.xcnt)
894                 {
895                         /*
896                          * None of the originally running transaction is running anymore,
897                          * so our incrementaly built snapshot now is consistent.
898                          */
899                         ereport(LOG,
900                                   (errmsg("logical decoding found consistent point at %X/%X",
901                                                   (uint32) (lsn >> 32), (uint32) lsn),
902                                    errdetail("Transaction ID %u finished; no more running transactions.",
903                                                          xid)));
904                         builder->state = SNAPBUILD_CONSISTENT;
905                 }
906         }
907 }
908
909 /*
910  * Abort a transaction, throw away all state we kept.
911  */
912 void
913 SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
914                                   TransactionId xid,
915                                   int nsubxacts, TransactionId *subxacts)
916 {
917         int                     i;
918
919         for (i = 0; i < nsubxacts; i++)
920         {
921                 TransactionId subxid = subxacts[i];
922
923                 SnapBuildEndTxn(builder, lsn, subxid);
924         }
925
926         SnapBuildEndTxn(builder, lsn, xid);
927 }
928
929 /*
930  * Handle everything that needs to be done when a transaction commits
931  */
932 void
933 SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
934                                    int nsubxacts, TransactionId *subxacts)
935 {
936         int                     nxact;
937
938         bool            forced_timetravel = false;
939         bool            sub_needs_timetravel = false;
940         bool            top_needs_timetravel = false;
941
942         TransactionId xmax = xid;
943
944         /*
945          * If we couldn't observe every change of a transaction because it was
946          * already running at the point we started to observe we have to assume it
947          * made catalog changes.
948          *
949          * This has the positive benefit that we afterwards have enough
950          * information to build an exportable snapshot that's usable by pg_dump et
951          * al.
952          */
953         if (builder->state < SNAPBUILD_CONSISTENT)
954         {
955                 /* ensure that only commits after this are getting replayed */
956                 if (builder->start_decoding_at <= lsn)
957                         builder->start_decoding_at = lsn + 1;
958
959                 /*
960                  * We could avoid treating !SnapBuildTxnIsRunning transactions as
961                  * timetravel ones, but we want to be able to export a snapshot when
962                  * we reached consistency.
963                  */
964                 forced_timetravel = true;
965                 elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running to early", xid);
966         }
967
968         for (nxact = 0; nxact < nsubxacts; nxact++)
969         {
970                 TransactionId subxid = subxacts[nxact];
971
972                 /*
973                  * make sure txn is not tracked in running txn's anymore, switch state
974                  */
975                 SnapBuildEndTxn(builder, lsn, subxid);
976
977                 /*
978                  * If we're forcing timetravel we also need visibility information
979                  * about subtransaction, so keep track of subtransaction's state.
980                  */
981                 if (forced_timetravel)
982                 {
983                         SnapBuildAddCommittedTxn(builder, subxid);
984                         if (NormalTransactionIdFollows(subxid, xmax))
985                                 xmax = subxid;
986                 }
987
988                 /*
989                  * Add subtransaction to base snapshot if it DDL, we don't distinguish
990                  * to toplevel transactions there.
991                  */
992                 else if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
993                 {
994                         sub_needs_timetravel = true;
995
996                         elog(DEBUG1, "found subtransaction %u:%u with catalog changes.",
997                                  xid, subxid);
998
999                         SnapBuildAddCommittedTxn(builder, subxid);
1000
1001                         if (NormalTransactionIdFollows(subxid, xmax))
1002                                 xmax = subxid;
1003                 }
1004         }
1005
1006         /*
1007          * Make sure toplevel txn is not tracked in running txn's anymore, switch
1008          * state to consistent if possible.
1009          */
1010         SnapBuildEndTxn(builder, lsn, xid);
1011
1012         if (forced_timetravel)
1013         {
1014                 elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
1015
1016                 SnapBuildAddCommittedTxn(builder, xid);
1017         }
1018         /* add toplevel transaction to base snapshot */
1019         else if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1020         {
1021                 elog(DEBUG2, "found top level transaction %u, with catalog changes!",
1022                          xid);
1023
1024                 top_needs_timetravel = true;
1025                 SnapBuildAddCommittedTxn(builder, xid);
1026         }
1027         else if (sub_needs_timetravel)
1028         {
1029                 /* mark toplevel txn as timetravel as well */
1030                 SnapBuildAddCommittedTxn(builder, xid);
1031         }
1032
1033         /* if there's any reason to build a historic snapshot, do so now */
1034         if (forced_timetravel || top_needs_timetravel || sub_needs_timetravel)
1035         {
1036                 /*
1037                  * Adjust xmax of the snapshot builder, we only do that for committed,
1038                  * catalog modifying, transactions, everything else isn't interesting
1039                  * for us since we'll never look at the respective rows.
1040                  */
1041                 if (!TransactionIdIsValid(builder->xmax) ||
1042                         TransactionIdFollowsOrEquals(xmax, builder->xmax))
1043                 {
1044                         builder->xmax = xmax;
1045                         TransactionIdAdvance(builder->xmax);
1046                 }
1047
1048                 /*
1049                  * If we haven't built a complete snapshot yet there's no need to hand
1050                  * it out, it wouldn't (and couldn't) be used anyway.
1051                  */
1052                 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1053                         return;
1054
1055                 /*
1056                  * Decrease the snapshot builder's refcount of the old snapshot, note
1057                  * that it still will be used if it has been handed out to the
1058                  * reorderbuffer earlier.
1059                  */
1060                 if (builder->snapshot)
1061                         SnapBuildSnapDecRefcount(builder->snapshot);
1062
1063                 builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
1064
1065                 /* we might need to execute invalidations, add snapshot */
1066                 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1067                 {
1068                         SnapBuildSnapIncRefcount(builder->snapshot);
1069                         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1070                                                                                  builder->snapshot);
1071                 }
1072
1073                 /* refcount of the snapshot builder for the new snapshot */
1074                 SnapBuildSnapIncRefcount(builder->snapshot);
1075
1076                 /* add a new Snapshot to all currently running transactions */
1077                 SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1078         }
1079         else
1080         {
1081                 /* record that we cannot export a general snapshot anymore */
1082                 builder->committed.includes_all_transactions = false;
1083         }
1084 }
1085
1086
1087 /* -----------------------------------
1088  * Snapshot building functions dealing with xlog records
1089  * -----------------------------------
1090  */
1091
1092 /*
1093  * Process a running xacts record, and use its information to first build a
1094  * historic snapshot and later to release resources that aren't needed
1095  * anymore.
1096  */
1097 void
1098 SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1099 {
1100         ReorderBufferTXN *txn;
1101
1102         /*
1103          * If we're not consistent yet, inspect the record to see whether it
1104          * allows to get closer to being consistent. If we are consistent, dump
1105          * our snapshot so others or we, after a restart, can use it.
1106          */
1107         if (builder->state < SNAPBUILD_CONSISTENT)
1108         {
1109                 /* returns false if there's no point in performing cleanup just yet */
1110                 if (!SnapBuildFindSnapshot(builder, lsn, running))
1111                         return;
1112         }
1113         else
1114                 SnapBuildSerialize(builder, lsn);
1115
1116         /*
1117          * Update range of interesting xids based on the running xacts
1118          * information. We don't increase ->xmax using it, because once we are in
1119          * a consistent state we can do that ourselves and much more efficiently
1120          * so, because we only need to do it for catalog transactions since we
1121          * only ever look at those.
1122          *
1123          * NB: Because of that xmax can be lower than xmin, because we only
1124          * increase xmax when a catalog modifying transaction commits. While odd
1125          * looking, it's correct and actually more efficient this way since we hit
1126          * fast paths in tqual.c.
1127          */
1128         builder->xmin = running->oldestRunningXid;
1129
1130         /* Remove transactions we don't need to keep track off anymore */
1131         SnapBuildPurgeCommittedTxn(builder);
1132
1133         elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
1134                  builder->xmin, builder->xmax,
1135                  running->oldestRunningXid);
1136
1137         /*
1138          * Inrease shared memory limits, so vacuum can work on tuples we prevented
1139          * from being pruned till now.
1140          */
1141         LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
1142
1143         /*
1144          * Also tell the slot where we can restart decoding from. We don't want to
1145          * do that after every commit because changing that implies an fsync of
1146          * the logical slot's state file, so we only do it every time we see a
1147          * running xacts record.
1148          *
1149          * Do so by looking for the oldest in progress transaction (determined by
1150          * the first LSN of any of its relevant records). Every transaction
1151          * remembers the last location we stored the snapshot to disk before its
1152          * beginning. That point is where we can restart from.
1153          */
1154
1155         /*
1156          * Can't know about a serialized snapshot's location if we're not
1157          * consistent.
1158          */
1159         if (builder->state < SNAPBUILD_CONSISTENT)
1160                 return;
1161
1162         txn = ReorderBufferGetOldestTXN(builder->reorder);
1163
1164         /*
1165          * oldest ongoing txn might have started when we didn't yet serialize
1166          * anything because we hadn't reached a consistent state yet.
1167          */
1168         if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1169                 LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1170
1171         /*
1172          * No in-progress transaction, can reuse the last serialized snapshot if
1173          * we have one.
1174          */
1175         else if (txn == NULL &&
1176                 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
1177                          builder->last_serialized_snapshot != InvalidXLogRecPtr)
1178                 LogicalIncreaseRestartDecodingForSlot(lsn,
1179                                                                                   builder->last_serialized_snapshot);
1180 }
1181
1182
/*
 * Build the start of a snapshot that's capable of decoding the catalog.
 *
 * Helper function for SnapBuildProcessRunningXacts() while we're not yet
 * consistent. Depending on the xl_running_xacts record 'running' found at
 * 'lsn', this does one of the following:
 *  - ignores the record because its oldestRunningXid is below the initial
 *    xmin horizon;
 *  - jumps straight to SNAPBUILD_CONSISTENT;
 *  - restores a snapshot serialized to disk for 'lsn';
 *  - on the first usable record, starts tracking the listed transactions
 *    and moves to SNAPBUILD_FULL_SNAPSHOT.
 *
 * Returns true if there is a point in performing internal maintenance/cleanup
 * using the xl_running_xacts record, false if the caller should skip it.
 */
static bool
SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
	/* ---
	 * Build catalog decoding snapshot incrementally using information about
	 * the currently running transactions. There are several ways to do that:
	 *
	 * a) There were no running transactions when the xl_running_xacts record
	 *	  was inserted, jump to CONSISTENT immediately. We might find such a
	 *	  state while waiting for b) or c).
	 *
	 * b) Wait for all toplevel transactions that were running to end. We
	 *	  simply track the number of in-progress toplevel transactions and
	 *	  lower it whenever one commits or aborts. When that number
	 *	  (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
	 *	  to CONSISTENT.
	 *	  NB: We need to search running.xip when seeing a transaction's end to
	 *	  make sure it's a toplevel transaction and it's been one of the
	 *	  initially running ones.
	 *	  Interestingly, in contrast to HS, this allows us not to care about
	 *	  subtransactions - and by extension suboverflowed xl_running_xacts -
	 *	  at all.
	 *
	 * c) This (in a previous run) or another decoding slot serialized a
	 *	  snapshot to disk that we can use.
	 * ---
	 */

	/*
	 * The xl_running_xacts record is older than what we can use; we might
	 * not have all necessary catalog rows anymore. Don't build a snapshot
	 * from it, but still let the caller perform cleanup (return true).
	 */
	if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
		NormalTransactionIdPrecedes(running->oldestRunningXid,
									builder->initial_xmin_horizon))
	{
		ereport(DEBUG1,
				(errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
								 (uint32) (lsn >> 32), (uint32) lsn),
		errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
				 builder->initial_xmin_horizon, running->oldestRunningXid)));
		return true;
	}

	/*
	 * a) No transactions were running, we can jump to consistent.
	 *
	 * NB: We might have already started to incrementally assemble a snapshot,
	 * so we need to be careful to deal with that.
	 */
	if (running->xcnt == 0)
	{
		if (builder->start_decoding_at == InvalidXLogRecPtr ||
			builder->start_decoding_at <= lsn)
			/* can decode everything after this */
			builder->start_decoding_at = lsn + 1;

		/* As no transactions were running xmin/xmax can be trivially set. */
		builder->xmin = running->nextXid;		/* < are finished */
		builder->xmax = running->nextXid;		/* >= are running */

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		/* no transactions running now */
		builder->running.xcnt = 0;
		builder->running.xmin = InvalidTransactionId;
		builder->running.xmax = InvalidTransactionId;

		builder->state = SNAPBUILD_CONSISTENT;

		ereport(LOG,
				(errmsg("logical decoding found consistent point at %X/%X",
						(uint32) (lsn >> 32), (uint32) lsn),
				 errdetail("There are no running transactions.")));

		return false;
	}
	/* c) valid on disk state */
	else if (SnapBuildRestore(builder, lsn))
	{
		/* there won't be any state to cleanup */
		return false;
	}

	/*
	 * b) First encounter of a usable xl_running_xacts record. If we had
	 * found one earlier, we would either track running transactions (i.e.
	 * builder->running.xcnt != 0) or be consistent (and this function
	 * wouldn't get called).
	 */
	else if (!builder->running.xcnt)
	{
		int			off;

		/*
		 * We only care about toplevel xids as those are the ones we
		 * definitely see in the wal stream. As snapbuild.c tracks committed
		 * instead of running transactions we don't need to know anything
		 * about uncommitted subtransactions.
		 */

		/*
		 * Start with an xmin/xmax that's correct for the future, when all the
		 * currently running transactions have finished. We'll update both
		 * while waiting for the pending transactions to finish.
		 */
		builder->xmin = running->nextXid;		/* < are finished */
		builder->xmax = running->nextXid;		/* >= are running */

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		/*
		 * Copy the running xids. xcnt_space remembers the array's full size,
		 * while xcnt is decremented as the transactions finish.
		 */
		builder->running.xcnt = running->xcnt;
		builder->running.xcnt_space = running->xcnt;
		builder->running.xip =
			MemoryContextAlloc(builder->context,
							   builder->running.xcnt * sizeof(TransactionId));
		memcpy(builder->running.xip, running->xids,
			   builder->running.xcnt * sizeof(TransactionId));

		/* sort so we can do a binary search */
		qsort(builder->running.xip, builder->running.xcnt,
			  sizeof(TransactionId), xidComparator);

		builder->running.xmin = builder->running.xip[0];
		builder->running.xmax = builder->running.xip[running->xcnt - 1];

		/*
		 * Makes comparisons cheaper later: the bounds become exclusive, so
		 * strict comparisons can be used against them.
		 */
		TransactionIdRetreat(builder->running.xmin);
		TransactionIdAdvance(builder->running.xmax);

		builder->state = SNAPBUILD_FULL_SNAPSHOT;

		ereport(LOG,
			(errmsg("logical decoding found initial starting point at %X/%X",
					(uint32) (lsn >> 32), (uint32) lsn),
			 errdetail_plural("%u transaction needs to finish.",
							  "%u transactions need to finish.",
							  builder->running.xcnt,
							  (uint32) builder->running.xcnt)));

		/*
		 * Iterate through all xids, wait for them to finish.
		 *
		 * This isn't required for the correctness of decoding, but it allows
		 * isolationtester to notice that we're currently waiting for
		 * something.
		 */
		for (off = 0; off < builder->running.xcnt; off++)
		{
			TransactionId xid = builder->running.xip[off];

			/*
			 * Upper layers should prevent that we ever need to wait on
			 * ourselves. Check anyway, since failing to do so would either
			 * result in an endless wait or an Assert() failure.
			 */
			if (TransactionIdIsCurrentTransactionId(xid))
				elog(ERROR, "waiting for ourselves");

			XactLockTableWait(xid, NULL, NULL, XLTW_None);
		}

		/* nothing could have built up so far, so don't perform cleanup */
		return false;
	}

	/*
	 * We already started to track running xacts and need to wait for all
	 * in-progress ones to finish. We fall through to the normal processing of
	 * records so incremental cleanup can be performed.
	 */
	return true;
}
1369
1370
1371 /* -----------------------------------
1372  * Snapshot serialization support
1373  * -----------------------------------
1374  */
1375
/*
 * On-disk representation of a serialized snapshot builder.
 *
 * We store the current state of struct SnapBuild on disk in the following
 * manner:
 *
 * struct SnapBuildOnDisk;
 * TransactionId * running.xcnt_space;
 * TransactionId * committed.xcnt; (*not xcnt_space*)
 *
 * That is, the fixed-size struct below is followed by running.xcnt_space
 * TransactionIds and then by committed.xcnt TransactionIds.
 *
 * NOTE(review): this layout is a file format; reordering or resizing fields
 * (including those of the embedded SnapBuild) changes what is read back.
 */
typedef struct SnapBuildOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* presumably SNAPBUILD_MAGIC; confirm in
								 * the serialize/restore code */
	pg_crc32c	checksum;

	/* data covered by checksum */

	/* version, in case we want to support pg_upgrade */
	uint32		version;
	/* how large is the on disk data, excluding the constant sized part */
	uint32		length;

	/* version dependent part */
	SnapBuild	builder;

	/* variable amount of TransactionIds follows */
} SnapBuildOnDisk;

/*
 * Number of bytes preceding the embedded SnapBuild, i.e. the version
 * independent header (magic, checksum, version, length).
 */
#define SnapBuildOnDiskConstantSize \
	offsetof(SnapBuildOnDisk, builder)
/* Number of leading bytes not covered by the checksum (magic, checksum). */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(SnapBuildOnDisk, version)

/* identifies a serialized snapshot file */
#define SNAPBUILD_MAGIC 0x51A1E001
/* format version; presumably must change with any layout change -- confirm */
#define SNAPBUILD_VERSION 2
1412
1413 /*
1414  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1415  *
1416  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1417  * a record that's a potential location for a serialized snapshot.
1418  */
1419 void
1420 SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1421 {
1422         if (builder->state < SNAPBUILD_CONSISTENT)
1423                 SnapBuildRestore(builder, lsn);
1424         else
1425                 SnapBuildSerialize(builder, lsn);
1426 }
1427
1428 /*
1429  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1430  * been done by another decoding process.
1431  */
1432 static void
1433 SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1434 {
1435         Size            needed_length;
1436         SnapBuildOnDisk *ondisk;
1437         char       *ondisk_c;
1438         int                     fd;
1439         char            tmppath[MAXPGPATH];
1440         char            path[MAXPGPATH];
1441         int                     ret;
1442         struct stat stat_buf;
1443         Size            sz;
1444
1445         Assert(lsn != InvalidXLogRecPtr);
1446         Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
1447                    builder->last_serialized_snapshot <= lsn);
1448
1449         /*
1450          * no point in serializing if we cannot continue to work immediately after
1451          * restoring the snapshot
1452          */
1453         if (builder->state < SNAPBUILD_CONSISTENT)
1454                 return;
1455
1456         /*
1457          * We identify snapshots by the LSN they are valid for. We don't need to
1458          * include timelines in the name as each LSN maps to exactly one timeline
1459          * unless the user used pg_resetxlog or similar. If a user did so, there's
1460          * no hope continuing to decode anyway.
1461          */
1462         sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1463                         (uint32) (lsn >> 32), (uint32) lsn);
1464
1465         /*
1466          * first check whether some other backend already has written the snapshot
1467          * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1468          * as a valid state. Everything else is an unexpected error.
1469          */
1470         ret = stat(path, &stat_buf);
1471
1472         if (ret != 0 && errno != ENOENT)
1473                 ereport(ERROR,
1474                                 (errmsg("could not stat file \"%s\": %m", path)));
1475
1476         else if (ret == 0)
1477         {
1478                 /*
1479                  * somebody else has already serialized to this point, don't overwrite
1480                  * but remember location, so we don't need to read old data again.
1481                  *
1482                  * To be sure it has been synced to disk after the rename() from the
1483                  * tempfile filename to the real filename, we just repeat the fsync.
1484                  * That ought to be cheap because in most scenarios it should already
1485                  * be safely on disk.
1486                  */
1487                 fsync_fname(path, false);
1488                 fsync_fname("pg_logical/snapshots", true);
1489
1490                 builder->last_serialized_snapshot = lsn;
1491                 goto out;
1492         }
1493
1494         /*
1495          * there is an obvious race condition here between the time we stat(2) the
1496          * file and us writing the file. But we rename the file into place
1497          * atomically and all files created need to contain the same data anyway,
1498          * so this is perfectly fine, although a bit of a resource waste. Locking
1499          * seems like pointless complication.
1500          */
1501         elog(DEBUG1, "serializing snapshot to %s", path);
1502
1503         /* to make sure only we will write to this tempfile, include pid */
1504         sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
1505                         (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
1506
1507         /*
1508          * Unlink temporary file if it already exists, needs to have been before a
1509          * crash/error since we won't enter this function twice from within a
1510          * single decoding slot/backend and the temporary file contains the pid of
1511          * the current process.
1512          */
1513         if (unlink(tmppath) != 0 && errno != ENOENT)
1514                 ereport(ERROR,
1515                                 (errcode_for_file_access(),
1516                                  errmsg("could not remove file \"%s\": %m", path)));
1517
1518         needed_length = sizeof(SnapBuildOnDisk) +
1519                 sizeof(TransactionId) * builder->running.xcnt_space +
1520                 sizeof(TransactionId) * builder->committed.xcnt;
1521
1522         ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
1523         ondisk = (SnapBuildOnDisk *) ondisk_c;
1524         ondisk->magic = SNAPBUILD_MAGIC;
1525         ondisk->version = SNAPBUILD_VERSION;
1526         ondisk->length = needed_length;
1527         INIT_CRC32C(ondisk->checksum);
1528         COMP_CRC32C(ondisk->checksum,
1529                                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1530                         SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1531         ondisk_c += sizeof(SnapBuildOnDisk);
1532
1533         memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1534         /* NULL-ify memory-only data */
1535         ondisk->builder.context = NULL;
1536         ondisk->builder.snapshot = NULL;
1537         ondisk->builder.reorder = NULL;
1538         ondisk->builder.running.xip = NULL;
1539         ondisk->builder.committed.xip = NULL;
1540
1541         COMP_CRC32C(ondisk->checksum,
1542                                 &ondisk->builder,
1543                                 sizeof(SnapBuild));
1544
1545         /* copy running xacts */
1546         sz = sizeof(TransactionId) * builder->running.xcnt_space;
1547         memcpy(ondisk_c, builder->running.xip, sz);
1548         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1549         ondisk_c += sz;
1550
1551         /* copy committed xacts */
1552         sz = sizeof(TransactionId) * builder->committed.xcnt;
1553         memcpy(ondisk_c, builder->committed.xip, sz);
1554         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1555         ondisk_c += sz;
1556
1557         FIN_CRC32C(ondisk->checksum);
1558
1559         /* we have valid data now, open tempfile and write it there */
1560         fd = OpenTransientFile(tmppath,
1561                                                    O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1562                                                    S_IRUSR | S_IWUSR);
1563         if (fd < 0)
1564                 ereport(ERROR,
1565                                 (errmsg("could not open file \"%s\": %m", path)));
1566
1567         if ((write(fd, ondisk, needed_length)) != needed_length)
1568         {
1569                 CloseTransientFile(fd);
1570                 ereport(ERROR,
1571                                 (errcode_for_file_access(),
1572                                  errmsg("could not write to file \"%s\": %m", tmppath)));
1573         }
1574
1575         /*
1576          * fsync the file before renaming so that even if we crash after this we
1577          * have either a fully valid file or nothing.
1578          *
1579          * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1580          * some noticeable overhead since it's performed synchronously during
1581          * decoding?
1582          */
1583         if (pg_fsync(fd) != 0)
1584         {
1585                 CloseTransientFile(fd);
1586                 ereport(ERROR,
1587                                 (errcode_for_file_access(),
1588                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1589         }
1590         CloseTransientFile(fd);
1591
1592         fsync_fname("pg_logical/snapshots", true);
1593
1594         /*
1595          * We may overwrite the work from some other backend, but that's ok, our
1596          * snapshot is valid as well, we'll just have done some superfluous work.
1597          */
1598         if (rename(tmppath, path) != 0)
1599         {
1600                 ereport(ERROR,
1601                                 (errcode_for_file_access(),
1602                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
1603                                                 tmppath, path)));
1604         }
1605
1606         /* make sure we persist */
1607         fsync_fname(path, false);
1608         fsync_fname("pg_logical/snapshots", true);
1609
1610         /*
1611          * Now there's no way we can loose the dumped state anymore, remember this
1612          * as a serialization point.
1613          */
1614         builder->last_serialized_snapshot = lsn;
1615
1616 out:
1617         ReorderBufferSetRestartPoint(builder->reorder,
1618                                                                  builder->last_serialized_snapshot);
1619 }
1620
1621 /*
1622  * Restore a snapshot into 'builder' if previously one has been stored at the
1623  * location indicated by 'lsn'. Returns true if successful, false otherwise.
1624  */
1625 static bool
1626 SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1627 {
1628         SnapBuildOnDisk ondisk;
1629         int                     fd;
1630         char            path[MAXPGPATH];
1631         Size            sz;
1632         int                     readBytes;
1633         pg_crc32c       checksum;
1634
1635         /* no point in loading a snapshot if we're already there */
1636         if (builder->state == SNAPBUILD_CONSISTENT)
1637                 return false;
1638
1639         sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1640                         (uint32) (lsn >> 32), (uint32) lsn);
1641
1642         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1643
1644         if (fd < 0 && errno == ENOENT)
1645                 return false;
1646         else if (fd < 0)
1647                 ereport(ERROR,
1648                                 (errcode_for_file_access(),
1649                                  errmsg("could not open file \"%s\": %m", path)));
1650
1651         /* ----
1652          * Make sure the snapshot had been stored safely to disk, that's normally
1653          * cheap.
1654          * Note that we do not need PANIC here, nobody will be able to use the
1655          * slot without fsyncing, and saving it won't succeed without an fsync()
1656          * either...
1657          * ----
1658          */
1659         fsync_fname(path, false);
1660         fsync_fname("pg_logical/snapshots", true);
1661
1662
1663         /* read statically sized portion of snapshot */
1664         readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1665         if (readBytes != SnapBuildOnDiskConstantSize)
1666         {
1667                 CloseTransientFile(fd);
1668                 ereport(ERROR,
1669                                 (errcode_for_file_access(),
1670                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1671                                                 path, readBytes, (int) SnapBuildOnDiskConstantSize)));
1672         }
1673
1674         if (ondisk.magic != SNAPBUILD_MAGIC)
1675                 ereport(ERROR,
1676                                 (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1677                                                 path, ondisk.magic, SNAPBUILD_MAGIC)));
1678
1679         if (ondisk.version != SNAPBUILD_VERSION)
1680                 ereport(ERROR,
1681                                 (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1682                                                 path, ondisk.version, SNAPBUILD_VERSION)));
1683
1684         INIT_CRC32C(checksum);
1685         COMP_CRC32C(checksum,
1686                                 ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1687                         SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1688
1689         /* read SnapBuild */
1690         readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1691         if (readBytes != sizeof(SnapBuild))
1692         {
1693                 CloseTransientFile(fd);
1694                 ereport(ERROR,
1695                                 (errcode_for_file_access(),
1696                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1697                                                 path, readBytes, (int) sizeof(SnapBuild))));
1698         }
1699         COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1700
1701         /* restore running xacts information */
1702         sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
1703         ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
1704         readBytes = read(fd, ondisk.builder.running.xip, sz);
1705         if (readBytes != sz)
1706         {
1707                 CloseTransientFile(fd);
1708                 ereport(ERROR,
1709                                 (errcode_for_file_access(),
1710                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1711                                                 path, readBytes, (int) sz)));
1712         }
1713         COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
1714
1715         /* restore committed xacts information */
1716         sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1717         ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1718         readBytes = read(fd, ondisk.builder.committed.xip, sz);
1719         if (readBytes != sz)
1720         {
1721                 CloseTransientFile(fd);
1722                 ereport(ERROR,
1723                                 (errcode_for_file_access(),
1724                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1725                                                 path, readBytes, (int) sz)));
1726         }
1727         COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1728
1729         CloseTransientFile(fd);
1730
1731         FIN_CRC32C(checksum);
1732
1733         /* verify checksum of what we've read */
1734         if (!EQ_CRC32C(checksum, ondisk.checksum))
1735                 ereport(ERROR,
1736                                 (errcode_for_file_access(),
1737                                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1738                                                 path, checksum, ondisk.checksum)));
1739
1740         /*
1741          * ok, we now have a sensible snapshot here, figure out if it has more
1742          * information than we have.
1743          */
1744
1745         /*
1746          * We are only interested in consistent snapshots for now, comparing
1747          * whether one incomplete snapshot is more "advanced" seems to be
1748          * unnecessarily complex.
1749          */
1750         if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1751                 goto snapshot_not_interesting;
1752
1753         /*
1754          * Don't use a snapshot that requires an xmin that we cannot guarantee to
1755          * be available.
1756          */
1757         if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1758                 goto snapshot_not_interesting;
1759
1760
1761         /* ok, we think the snapshot is sensible, copy over everything important */
1762         builder->xmin = ondisk.builder.xmin;
1763         builder->xmax = ondisk.builder.xmax;
1764         builder->state = ondisk.builder.state;
1765
1766         builder->committed.xcnt = ondisk.builder.committed.xcnt;
1767         /* We only allocated/stored xcnt, not xcnt_space xids ! */
1768         /* don't overwrite preallocated xip, if we don't have anything here */
1769         if (builder->committed.xcnt > 0)
1770         {
1771                 pfree(builder->committed.xip);
1772                 builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1773                 builder->committed.xip = ondisk.builder.committed.xip;
1774         }
1775         ondisk.builder.committed.xip = NULL;
1776
1777         builder->running.xcnt = ondisk.builder.running.xcnt;
1778         if (builder->running.xip)
1779                 pfree(builder->running.xip);
1780         builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
1781         builder->running.xip = ondisk.builder.running.xip;
1782
1783         /* our snapshot is not interesting anymore, build a new one */
1784         if (builder->snapshot != NULL)
1785         {
1786                 SnapBuildSnapDecRefcount(builder->snapshot);
1787         }
1788         builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidTransactionId);
1789         SnapBuildSnapIncRefcount(builder->snapshot);
1790
1791         ReorderBufferSetRestartPoint(builder->reorder, lsn);
1792
1793         Assert(builder->state == SNAPBUILD_CONSISTENT);
1794
1795         ereport(LOG,
1796                         (errmsg("logical decoding found consistent point at %X/%X",
1797                                         (uint32) (lsn >> 32), (uint32) lsn),
1798                          errdetail("Logical decoding will begin using saved snapshot.")));
1799         return true;
1800
1801 snapshot_not_interesting:
1802         if (ondisk.builder.running.xip != NULL)
1803                 pfree(ondisk.builder.running.xip);
1804         if (ondisk.builder.committed.xip != NULL)
1805                 pfree(ondisk.builder.committed.xip);
1806         return false;
1807 }
1808
1809 /*
1810  * Remove all serialized snapshots that are not required anymore because no
1811  * slot can need them. This doesn't actually have to run during a checkpoint,
1812  * but it's a convenient point to schedule this.
1813  *
1814  * NB: We run this during checkpoints even if logical decoding is disabled so
1815  * we cleanup old slots at some point after it got disabled.
1816  */
1817 void
1818 CheckPointSnapBuild(void)
1819 {
1820         XLogRecPtr      cutoff;
1821         XLogRecPtr      redo;
1822         DIR                *snap_dir;
1823         struct dirent *snap_de;
1824         char            path[MAXPGPATH];
1825
1826         /*
1827          * We start of with a minimum of the last redo pointer. No new replication
1828          * slot will start before that, so that's a safe upper bound for removal.
1829          */
1830         redo = GetRedoRecPtr();
1831
1832         /* now check for the restart ptrs from existing slots */
1833         cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1834
1835         /* don't start earlier than the restart lsn */
1836         if (redo < cutoff)
1837                 cutoff = redo;
1838
1839         snap_dir = AllocateDir("pg_logical/snapshots");
1840         while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1841         {
1842                 uint32          hi;
1843                 uint32          lo;
1844                 XLogRecPtr      lsn;
1845                 struct stat statbuf;
1846
1847                 if (strcmp(snap_de->d_name, ".") == 0 ||
1848                         strcmp(snap_de->d_name, "..") == 0)
1849                         continue;
1850
1851                 snprintf(path, MAXPGPATH, "pg_logical/snapshots/%s", snap_de->d_name);
1852
1853                 if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1854                 {
1855                         elog(DEBUG1, "only regular files expected: %s", path);
1856                         continue;
1857                 }
1858
1859                 /*
1860                  * temporary filenames from SnapBuildSerialize() include the LSN and
1861                  * everything but are postfixed by .$pid.tmp. We can just remove them
1862                  * the same as other files because there can be none that are
1863                  * currently being written that are older than cutoff.
1864                  *
1865                  * We just log a message if a file doesn't fit the pattern, it's
1866                  * probably some editors lock/state file or similar...
1867                  */
1868                 if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
1869                 {
1870                         ereport(LOG,
1871                                         (errmsg("could not parse file name \"%s\"", path)));
1872                         continue;
1873                 }
1874
1875                 lsn = ((uint64) hi) << 32 | lo;
1876
1877                 /* check whether we still need it */
1878                 if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1879                 {
1880                         elog(DEBUG1, "removing snapbuild snapshot %s", path);
1881
1882                         /*
1883                          * It's not particularly harmful, though strange, if we can't
1884                          * remove the file here. Don't prevent the checkpoint from
1885                          * completing, that'd be cure worse than the disease.
1886                          */
1887                         if (unlink(path) < 0)
1888                         {
1889                                 ereport(LOG,
1890                                                 (errcode_for_file_access(),
1891                                                  errmsg("could not remove file \"%s\": %m",
1892                                                                 path)));
1893                                 continue;
1894                         }
1895                 }
1896         }
1897         FreeDir(snap_dir);
1898 }