]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/snapbuild.c
Add macros to make AllocSetContextCreate() calls simpler and safer.
[postgresql] / src / backend / replication / logical / snapbuild.c
1 /*-------------------------------------------------------------------------
2  *
3  * snapbuild.c
4  *
5  *        Infrastructure for building historic catalog snapshots based on contents
6  *        of the WAL, for the purpose of decoding heapam.c style values in the
7  *        WAL.
8  *
9  * NOTES:
10  *
11  * We build snapshots which can *only* be used to read catalog contents and we
12  * do so by reading and interpreting the WAL stream. The aim is to build a
13  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14  * at the time the XLogRecord was generated.
15  *
16  * To build the snapshots we reuse the infrastructure built for Hot
17  * Standby. The in-memory snapshots we build look different than HS' because
18  * we have different needs. To successfully decode data from the WAL we only
19  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20  * tables since the data we decode is wholly contained in the WAL
21  * records. Also, our snapshots need to be different in comparison to normal
22  * MVCC ones because in contrast to those we cannot fully rely on the clog and
23  * pg_subtrans for information about committed transactions because they might
24  * commit in the future from the POV of the WAL entry we're currently
25  * decoding. This definition has the advantage that we only need to prevent
26  * removal of catalog rows, while normal table's rows can still be
27  * removed. This is achieved by using the replication slot mechanism.
28  *
29  * As the percentage of transactions modifying the catalog normally is fairly
 * small in comparison to ones only manipulating user data, we keep track of
31  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32  * track of all running transactions like it's done in a normal snapshot. Note
33  * that we're generally only looking at transactions that have acquired an
34  * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35  * that we consider committed, everything else is considered aborted/in
36  * progress. That also allows us not to care about subtransactions before they
 * have committed, which means this module, in contrast to HS, doesn't have to
38  * care about suboverflowed subtransactions and similar.
39  *
40  * One complexity of doing this is that to e.g. handle mixed DDL/DML
41  * transactions we need Snapshots that see intermediate versions of the
42  * catalog in a transaction. During normal operation this is achieved by using
43  * CommandIds/cmin/cmax. The problem with that however is that for space
44  * efficiency reasons only one value of that is stored
45  * (c.f. combocid.c). Since ComboCids are only available in memory we log
46  * additional information which allows us to get the original (cmin, cmax)
47  * pair during visibility checks. Check the reorderbuffer.c's comment above
48  * ResolveCminCmaxDuringDecoding() for details.
49  *
50  * To facilitate all this we need our own visibility routine, as the normal
 * ones are optimized for different use cases.
52  *
53  * To replace the normal catalog snapshots with decoding ones use the
54  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55  *
56  *
57  *
 * The snapbuild machinery starts up in several stages, as illustrated
59  * by the following graph:
 *           +-------------------------+
 *      +----|SNAPBUILD_START          |-------------+
 *      |    +-------------------------+             |
 *      |                 |                          |
 *      |                 |                          |
 *      | running_xacts with running xacts           |
 *      |                 |                          |
 *      |                 |                          |
 *      |                 v                          |
 *      |    +-------------------------+             v
 *      |    |SNAPBUILD_FULL_SNAPSHOT  |------------>|
 *      |    +-------------------------+             |
 * running_xacts          |                   saved snapshot
 * with zero xacts        |               at running_xacts's lsn
 *      |                 |                          |
 *      | all running toplevel TXNs finished         |
 *      |                 |                          |
 *      |                 v                          |
 *      |    +-------------------------+             |
 *      +--->|SNAPBUILD_CONSISTENT     |<------------+
 *           +-------------------------+
81  *
82  * Initially the machinery is in the START stage. When an xl_running_xacts
83  * record is read that is sufficiently new (above the safe xmin horizon),
84  * there's a state transition. If there were no running xacts when the
 * running_xacts record was generated, we'll directly go into CONSISTENT
86  * state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full
87  * snapshot means that all transactions that start henceforth can be decoded
88  * in their entirety, but transactions that started previously can't. In
89  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
90  * running transactions have committed or aborted.
91  *
92  * Only transactions that commit after CONSISTENT state has been reached will
93  * be replayed, even though they might have started while still in
94  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
 * changes have been exported, but all the following ones will be. That point
96  * is a convenient point to initialize replication from, which is why we
97  * export a snapshot at that point, which *can* be used to read normal data.
98  *
99  * Copyright (c) 2012-2016, PostgreSQL Global Development Group
100  *
101  * IDENTIFICATION
 *	  src/backend/replication/logical/snapbuild.c
103  *
104  *-------------------------------------------------------------------------
105  */
106
107 #include "postgres.h"
108
109 #include <sys/stat.h>
110 #include <sys/types.h>
111 #include <unistd.h>
112
113 #include "miscadmin.h"
114
115 #include "access/heapam_xlog.h"
116 #include "access/transam.h"
117 #include "access/xact.h"
118
119 #include "replication/logical.h"
120 #include "replication/reorderbuffer.h"
121 #include "replication/snapbuild.h"
122
123 #include "utils/builtins.h"
124 #include "utils/memutils.h"
125 #include "utils/snapshot.h"
126 #include "utils/snapmgr.h"
127 #include "utils/tqual.h"
128
129 #include "storage/block.h"              /* debugging output */
130 #include "storage/fd.h"
131 #include "storage/lmgr.h"
132 #include "storage/proc.h"
133 #include "storage/procarray.h"
134 #include "storage/standby.h"
135
136 /*
137  * This struct contains the current state of the snapshot building
138  * machinery. Besides a forward declaration in the header, it is not exposed
139  * to the public, so we can easily change its contents.
140  */
141 struct SnapBuild
142 {
143         /* how far are we along building our first full snapshot */
144         SnapBuildState state;
145
146         /* private memory context used to allocate memory for this module. */
147         MemoryContext context;
148
149         /* all transactions < than this have committed/aborted */
150         TransactionId xmin;
151
152         /* all transactions >= than this are uncommitted */
153         TransactionId xmax;
154
155         /*
156          * Don't replay commits from an LSN < this LSN. This can be set externally
157          * but it will also be advanced (never retreat) from within snapbuild.c.
158          */
159         XLogRecPtr      start_decoding_at;
160
161         /*
162          * Don't start decoding WAL until the "xl_running_xacts" information
163          * indicates there are no running xids with an xid smaller than this.
164          */
165         TransactionId initial_xmin_horizon;
166
167         /*
168          * Snapshot that's valid to see the catalog state seen at this moment.
169          */
170         Snapshot        snapshot;
171
172         /*
173          * LSN of the last location we are sure a snapshot has been serialized to.
174          */
175         XLogRecPtr      last_serialized_snapshot;
176
177         /*
178          * The reorderbuffer we need to update with usable snapshots et al.
179          */
180         ReorderBuffer *reorder;
181
182         /*
183          * Information about initially running transactions
184          *
185          * When we start building a snapshot there already may be transactions in
186          * progress.  Those are stored in running.xip.  We don't have enough
187          * information about those to decode their contents, so until they are
188          * finished (xcnt=0) we cannot switch to a CONSISTENT state.
189          */
190         struct
191         {
192                 /*
193                  * As long as running.xcnt all XIDs < running.xmin and > running.xmax
194                  * have to be checked whether they still are running.
195                  */
196                 TransactionId xmin;
197                 TransactionId xmax;
198
199                 size_t          xcnt;           /* number of used xip entries */
200                 size_t          xcnt_space; /* allocated size of xip */
201                 TransactionId *xip;             /* running xacts array, xidComparator-sorted */
202         }                       running;
203
204         /*
205          * Array of transactions which could have catalog changes that committed
206          * between xmin and xmax.
207          */
208         struct
209         {
210                 /* number of committed transactions */
211                 size_t          xcnt;
212
213                 /* available space for committed transactions */
214                 size_t          xcnt_space;
215
216                 /*
217                  * Until we reach a CONSISTENT state, we record commits of all
218                  * transactions, not just the catalog changing ones. Record when that
219                  * changes so we know we cannot export a snapshot safely anymore.
220                  */
221                 bool            includes_all_transactions;
222
223                 /*
224                  * Array of committed transactions that have modified the catalog.
225                  *
226                  * As this array is frequently modified we do *not* keep it in
227                  * xidComparator order. Instead we sort the array when building &
228                  * distributing a snapshot.
229                  *
230                  * TODO: It's unclear whether that reasoning has much merit. Every
231                  * time we add something here after becoming consistent will also
232                  * require distributing a snapshot. Storing them sorted would
233                  * potentially also make it easier to purge (but more complicated wrt
234                  * wraparound?). Should be improved if sorting while building the
235                  * snapshot shows up in profiles.
236                  */
237                 TransactionId *xip;
238         }                       committed;
239 };
240
241 /*
242  * Starting a transaction -- which we need to do while exporting a snapshot --
243  * removes knowledge about the previously used resowner, so we save it here.
244  */
245 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
246 static bool ExportInProgress = false;
247
248 /* transaction state manipulation functions */
249 static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
250
251 /* ->running manipulation */
252 static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
253
254 /* ->committed manipulation */
255 static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
256
257 /* snapshot building/manipulation/distribution functions */
258 static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid);
259
260 static void SnapBuildFreeSnapshot(Snapshot snap);
261
262 static void SnapBuildSnapIncRefcount(Snapshot snap);
263
264 static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
265
266 /* xlog reading helper functions for SnapBuildProcessRecord */
267 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
268
269 /* serialization functions */
270 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
271 static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
272
273
274 /*
275  * Allocate a new snapshot builder.
276  *
277  * xmin_horizon is the xid >=which we can be sure no catalog rows have been
278  * removed, start_lsn is the LSN >= we want to replay commits.
279  */
280 SnapBuild *
281 AllocateSnapshotBuilder(ReorderBuffer *reorder,
282                                                 TransactionId xmin_horizon,
283                                                 XLogRecPtr start_lsn)
284 {
285         MemoryContext context;
286         MemoryContext oldcontext;
287         SnapBuild  *builder;
288
289         /* allocate memory in own context, to have better accountability */
290         context = AllocSetContextCreate(CurrentMemoryContext,
291                                                                         "snapshot builder context",
292                                                                         ALLOCSET_DEFAULT_SIZES);
293         oldcontext = MemoryContextSwitchTo(context);
294
295         builder = palloc0(sizeof(SnapBuild));
296
297         builder->state = SNAPBUILD_START;
298         builder->context = context;
299         builder->reorder = reorder;
300         /* Other struct members initialized by zeroing via palloc0 above */
301
302         builder->committed.xcnt = 0;
303         builder->committed.xcnt_space = 128;            /* arbitrary number */
304         builder->committed.xip =
305                 palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
306         builder->committed.includes_all_transactions = true;
307
308         builder->initial_xmin_horizon = xmin_horizon;
309         builder->start_decoding_at = start_lsn;
310
311         MemoryContextSwitchTo(oldcontext);
312
313         return builder;
314 }
315
316 /*
317  * Free a snapshot builder.
318  */
319 void
320 FreeSnapshotBuilder(SnapBuild *builder)
321 {
322         MemoryContext context = builder->context;
323
324         /* free snapshot explicitly, that contains some error checking */
325         if (builder->snapshot != NULL)
326         {
327                 SnapBuildSnapDecRefcount(builder->snapshot);
328                 builder->snapshot = NULL;
329         }
330
331         /* other resources are deallocated via memory context reset */
332         MemoryContextDelete(context);
333 }
334
335 /*
336  * Free an unreferenced snapshot that has previously been built by us.
337  */
338 static void
339 SnapBuildFreeSnapshot(Snapshot snap)
340 {
341         /* make sure we don't get passed an external snapshot */
342         Assert(snap->satisfies == HeapTupleSatisfiesHistoricMVCC);
343
344         /* make sure nobody modified our snapshot */
345         Assert(snap->curcid == FirstCommandId);
346         Assert(!snap->suboverflowed);
347         Assert(!snap->takenDuringRecovery);
348         Assert(snap->regd_count == 0);
349
350         /* slightly more likely, so it's checked even without c-asserts */
351         if (snap->copied)
352                 elog(ERROR, "cannot free a copied snapshot");
353
354         if (snap->active_count)
355                 elog(ERROR, "cannot free an active snapshot");
356
357         pfree(snap);
358 }
359
360 /*
361  * In which state of snapshot building are we?
362  */
363 SnapBuildState
364 SnapBuildCurrentState(SnapBuild *builder)
365 {
366         return builder->state;
367 }
368
369 /*
370  * Should the contents of transaction ending at 'ptr' be decoded?
371  */
372 bool
373 SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
374 {
375         return ptr < builder->start_decoding_at;
376 }
377
378 /*
379  * Increase refcount of a snapshot.
380  *
381  * This is used when handing out a snapshot to some external resource or when
382  * adding a Snapshot as builder->snapshot.
383  */
384 static void
385 SnapBuildSnapIncRefcount(Snapshot snap)
386 {
387         snap->active_count++;
388 }
389
390 /*
391  * Decrease refcount of a snapshot and free if the refcount reaches zero.
392  *
393  * Externally visible, so that external resources that have been handed an
394  * IncRef'ed Snapshot can adjust its refcount easily.
395  */
396 void
397 SnapBuildSnapDecRefcount(Snapshot snap)
398 {
399         /* make sure we don't get passed an external snapshot */
400         Assert(snap->satisfies == HeapTupleSatisfiesHistoricMVCC);
401
402         /* make sure nobody modified our snapshot */
403         Assert(snap->curcid == FirstCommandId);
404         Assert(!snap->suboverflowed);
405         Assert(!snap->takenDuringRecovery);
406
407         Assert(snap->regd_count == 0);
408
409         Assert(snap->active_count > 0);
410
411         /* slightly more likely, so it's checked even without casserts */
412         if (snap->copied)
413                 elog(ERROR, "cannot free a copied snapshot");
414
415         snap->active_count--;
416         if (snap->active_count == 0)
417                 SnapBuildFreeSnapshot(snap);
418 }
419
420 /*
421  * Build a new snapshot, based on currently committed catalog-modifying
422  * transactions.
423  *
424  * In-progress transactions with catalog access are *not* allowed to modify
425  * these snapshots; they have to copy them and fill in appropriate ->curcid
426  * and ->subxip/subxcnt values.
427  */
428 static Snapshot
429 SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
430 {
431         Snapshot        snapshot;
432         Size            ssize;
433
434         Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
435
436         ssize = sizeof(SnapshotData)
437                 + sizeof(TransactionId) * builder->committed.xcnt
438                 + sizeof(TransactionId) * 1 /* toplevel xid */ ;
439
440         snapshot = MemoryContextAllocZero(builder->context, ssize);
441
442         snapshot->satisfies = HeapTupleSatisfiesHistoricMVCC;
443
444         /*
445          * We misuse the original meaning of SnapshotData's xip and subxip fields
446          * to make the more fitting for our needs.
447          *
448          * In the 'xip' array we store transactions that have to be treated as
449          * committed. Since we will only ever look at tuples from transactions
450          * that have modified the catalog it's more efficient to store those few
451          * that exist between xmin and xmax (frequently there are none).
452          *
453          * Snapshots that are used in transactions that have modified the catalog
454          * also use the 'subxip' array to store their toplevel xid and all the
455          * subtransaction xids so we can recognize when we need to treat rows as
456          * visible that are not in xip but still need to be visible. Subxip only
457          * gets filled when the transaction is copied into the context of a
458          * catalog modifying transaction since we otherwise share a snapshot
459          * between transactions. As long as a txn hasn't modified the catalog it
460          * doesn't need to treat any uncommitted rows as visible, so there is no
461          * need for those xids.
462          *
463          * Both arrays are qsort'ed so that we can use bsearch() on them.
464          */
465         Assert(TransactionIdIsNormal(builder->xmin));
466         Assert(TransactionIdIsNormal(builder->xmax));
467
468         snapshot->xmin = builder->xmin;
469         snapshot->xmax = builder->xmax;
470
471         /* store all transactions to be treated as committed by this snapshot */
472         snapshot->xip =
473                 (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
474         snapshot->xcnt = builder->committed.xcnt;
475         memcpy(snapshot->xip,
476                    builder->committed.xip,
477                    builder->committed.xcnt * sizeof(TransactionId));
478
479         /* sort so we can bsearch() */
480         qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
481
482         /*
483          * Initially, subxip is empty, i.e. it's a snapshot to be used by
484          * transactions that don't modify the catalog. Will be filled by
485          * ReorderBufferCopySnap() if necessary.
486          */
487         snapshot->subxcnt = 0;
488         snapshot->subxip = NULL;
489
490         snapshot->suboverflowed = false;
491         snapshot->takenDuringRecovery = false;
492         snapshot->copied = false;
493         snapshot->curcid = FirstCommandId;
494         snapshot->active_count = 0;
495         snapshot->regd_count = 0;
496
497         return snapshot;
498 }
499
500 /*
501  * Export a snapshot so it can be set in another session with SET TRANSACTION
502  * SNAPSHOT.
503  *
504  * For that we need to start a transaction in the current backend as the
505  * importing side checks whether the source transaction is still open to make
506  * sure the xmin horizon hasn't advanced since then.
507  *
508  * After that we convert a locally built snapshot into the normal variant
509  * understood by HeapTupleSatisfiesMVCC et al.
510  */
511 const char *
512 SnapBuildExportSnapshot(SnapBuild *builder)
513 {
514         Snapshot        snap;
515         char       *snapname;
516         TransactionId xid;
517         TransactionId *newxip;
518         int                     newxcnt = 0;
519
520         if (builder->state != SNAPBUILD_CONSISTENT)
521                 elog(ERROR, "cannot export a snapshot before reaching a consistent state");
522
523         if (!builder->committed.includes_all_transactions)
524                 elog(ERROR, "cannot export a snapshot, not all transactions are monitored anymore");
525
526         /* so we don't overwrite the existing value */
527         if (TransactionIdIsValid(MyPgXact->xmin))
528                 elog(ERROR, "cannot export a snapshot when MyPgXact->xmin already is valid");
529
530         if (IsTransactionOrTransactionBlock())
531                 elog(ERROR, "cannot export a snapshot from within a transaction");
532
533         if (SavedResourceOwnerDuringExport)
534                 elog(ERROR, "can only export one snapshot at a time");
535
536         SavedResourceOwnerDuringExport = CurrentResourceOwner;
537         ExportInProgress = true;
538
539         StartTransactionCommand();
540
541         Assert(!FirstSnapshotSet);
542
543         /* There doesn't seem to a nice API to set these */
544         XactIsoLevel = XACT_REPEATABLE_READ;
545         XactReadOnly = true;
546
547         snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
548
549         /*
550          * We know that snap->xmin is alive, enforced by the logical xmin
551          * mechanism. Due to that we can do this without locks, we're only
552          * changing our own value.
553          */
554         MyPgXact->xmin = snap->xmin;
555
556         /* allocate in transaction context */
557         newxip = (TransactionId *)
558                 palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
559
560         /*
561          * snapbuild.c builds transactions in an "inverted" manner, which means it
562          * stores committed transactions in ->xip, not ones in progress. Build a
563          * classical snapshot by marking all non-committed transactions as
564          * in-progress. This can be expensive.
565          */
566         for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
567         {
568                 void       *test;
569
570                 /*
571                  * Check whether transaction committed using the decoding snapshot
572                  * meaning of ->xip.
573                  */
574                 test = bsearch(&xid, snap->xip, snap->xcnt,
575                                            sizeof(TransactionId), xidComparator);
576
577                 if (test == NULL)
578                 {
579                         if (newxcnt >= GetMaxSnapshotXidCount())
580                                 elog(ERROR, "snapshot too large");
581
582                         newxip[newxcnt++] = xid;
583                 }
584
585                 TransactionIdAdvance(xid);
586         }
587
588         snap->xcnt = newxcnt;
589         snap->xip = newxip;
590
591         /*
592          * now that we've built a plain snapshot, use the normal mechanisms for
593          * exporting it
594          */
595         snapname = ExportSnapshot(snap);
596
597         ereport(LOG,
598                         (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
599                 "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
600                                                    snap->xcnt,
601                                                    snapname, snap->xcnt)));
602         return snapname;
603 }
604
605 /*
606  * Ensure there is a snapshot and if not build one for current transaction.
607  */
608 Snapshot
609 SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
610 {
611         Assert(builder->state == SNAPBUILD_CONSISTENT);
612
613         /* only build a new snapshot if we don't have a prebuilt one */
614         if (builder->snapshot == NULL)
615         {
616                 builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
617                 /* inrease refcount for the snapshot builder */
618                 SnapBuildSnapIncRefcount(builder->snapshot);
619         }
620
621         return builder->snapshot;
622 }
623
624 /*
625  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
626  * any. Aborts the previously started transaction and resets the resource
627  * owner back to its original value.
628  */
629 void
630 SnapBuildClearExportedSnapshot(void)
631 {
632         /* nothing exported, that is the usual case */
633         if (!ExportInProgress)
634                 return;
635
636         if (!IsTransactionState())
637                 elog(ERROR, "clearing exported snapshot in wrong transaction state");
638
639         /* make sure nothing  could have ever happened */
640         AbortCurrentTransaction();
641
642         CurrentResourceOwner = SavedResourceOwnerDuringExport;
643         SavedResourceOwnerDuringExport = NULL;
644         ExportInProgress = false;
645 }
646
647 /*
648  * Handle the effects of a single heap change, appropriate to the current state
649  * of the snapshot builder and returns whether changes made at (xid, lsn) can
650  * be decoded.
651  */
652 bool
653 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
654 {
655         /*
656          * We can't handle data in transactions if we haven't built a snapshot
657          * yet, so don't store them.
658          */
659         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
660                 return false;
661
662         /*
663          * No point in keeping track of changes in transactions that we don't have
664          * enough information about to decode. This means that they started before
665          * we got into the SNAPBUILD_FULL_SNAPSHOT state.
666          */
667         if (builder->state < SNAPBUILD_CONSISTENT &&
668                 SnapBuildTxnIsRunning(builder, xid))
669                 return false;
670
671         /*
672          * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
673          * be needed to decode the change we're currently processing.
674          */
675         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
676         {
677                 /* only build a new snapshot if we don't have a prebuilt one */
678                 if (builder->snapshot == NULL)
679                 {
680                         builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
681                         /* inrease refcount for the snapshot builder */
682                         SnapBuildSnapIncRefcount(builder->snapshot);
683                 }
684
685                 /*
686                  * Increase refcount for the transaction we're handing the snapshot
687                  * out to.
688                  */
689                 SnapBuildSnapIncRefcount(builder->snapshot);
690                 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
691                                                                          builder->snapshot);
692         }
693
694         return true;
695 }
696
697 /*
698  * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
699  * This implies that a transaction has done some form of write to system
700  * catalogs.
701  */
702 void
703 SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
704                                            XLogRecPtr lsn, xl_heap_new_cid *xlrec)
705 {
706         CommandId       cid;
707
708         /*
709          * we only log new_cid's if a catalog tuple was modified, so mark the
710          * transaction as containing catalog modifications
711          */
712         ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
713
714         ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
715                                                                  xlrec->target_node, xlrec->target_tid,
716                                                                  xlrec->cmin, xlrec->cmax,
717                                                                  xlrec->combocid);
718
719         /* figure out new command id */
720         if (xlrec->cmin != InvalidCommandId &&
721                 xlrec->cmax != InvalidCommandId)
722                 cid = Max(xlrec->cmin, xlrec->cmax);
723         else if (xlrec->cmax != InvalidCommandId)
724                 cid = xlrec->cmax;
725         else if (xlrec->cmin != InvalidCommandId)
726                 cid = xlrec->cmin;
727         else
728         {
729                 cid = InvalidCommandId; /* silence compiler */
730                 elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
731         }
732
733         ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
734 }
735
736 /*
737  * Check whether `xid` is currently 'running'.
738  *
739  * Running transactions in our parlance are transactions which we didn't
740  * observe from the start so we can't properly decode their contents. They
741  * only exist after we freshly started from an < CONSISTENT snapshot.
742  */
743 static bool
744 SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
745 {
746         Assert(builder->state < SNAPBUILD_CONSISTENT);
747         Assert(TransactionIdIsNormal(builder->running.xmin));
748         Assert(TransactionIdIsNormal(builder->running.xmax));
749
750         if (builder->running.xcnt &&
751                 NormalTransactionIdFollows(xid, builder->running.xmin) &&
752                 NormalTransactionIdPrecedes(xid, builder->running.xmax))
753         {
754                 TransactionId *search =
755                 bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
756                                 sizeof(TransactionId), xidComparator);
757
758                 if (search != NULL)
759                 {
760                         Assert(*search == xid);
761                         return true;
762                 }
763         }
764
765         return false;
766 }
767
768 /*
769  * Add a new Snapshot to all transactions we're decoding that currently are
770  * in-progress so they can see new catalog contents made by the transaction
771  * that just committed. This is necessary because those in-progress
772  * transactions will use the new catalog's contents from here on (at the very
773  * least everything they do needs to be compatible with newer catalog
774  * contents).
775  */
776 static void
777 SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
778 {
779         dlist_iter      txn_i;
780         ReorderBufferTXN *txn;
781
782         /*
783          * Iterate through all toplevel transactions. This can include
784          * subtransactions which we just don't yet know to be that, but that's
785          * fine, they will just get an unnecessary snapshot queued.
786          */
787         dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
788         {
789                 txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
790
791                 Assert(TransactionIdIsValid(txn->xid));
792
793                 /*
794                  * If we don't have a base snapshot yet, there are no changes in this
795                  * transaction which in turn implies we don't yet need a snapshot at
796                  * all. We'll add a snapshot when the first change gets queued.
797                  *
798                  * NB: This works correctly even for subtransactions because
799                  * ReorderBufferCommitChild() takes care to pass the parent the base
800                  * snapshot, and while iterating the changequeue we'll get the change
801                  * from the subtxn.
802                  */
803                 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
804                         continue;
805
806                 elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
807                          txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
808
809                 /*
810                  * increase the snapshot's refcount for the transaction we are handing
811                  * it out to
812                  */
813                 SnapBuildSnapIncRefcount(builder->snapshot);
814                 ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
815                                                                  builder->snapshot);
816         }
817 }
818
819 /*
820  * Keep track of a new catalog changing transaction that has committed.
821  */
822 static void
823 SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
824 {
825         Assert(TransactionIdIsValid(xid));
826
827         if (builder->committed.xcnt == builder->committed.xcnt_space)
828         {
829                 builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
830
831                 elog(DEBUG1, "increasing space for committed transactions to %u",
832                          (uint32) builder->committed.xcnt_space);
833
834                 builder->committed.xip = repalloc(builder->committed.xip,
835                                           builder->committed.xcnt_space * sizeof(TransactionId));
836         }
837
838         /*
839          * TODO: It might make sense to keep the array sorted here instead of
840          * doing it every time we build a new snapshot. On the other hand this
841          * gets called repeatedly when a transaction with subtransactions commits.
842          */
843         builder->committed.xip[builder->committed.xcnt++] = xid;
844 }
845
846 /*
847  * Remove knowledge about transactions we treat as committed that are smaller
848  * than ->xmin. Those won't ever get checked via the ->committed array but via
849  * the clog machinery, so we don't need to waste memory on them.
850  */
851 static void
852 SnapBuildPurgeCommittedTxn(SnapBuild *builder)
853 {
854         int                     off;
855         TransactionId *workspace;
856         int                     surviving_xids = 0;
857
858         /* not ready yet */
859         if (!TransactionIdIsNormal(builder->xmin))
860                 return;
861
862         /* TODO: Neater algorithm than just copying and iterating? */
863         workspace =
864                 MemoryContextAlloc(builder->context,
865                                                    builder->committed.xcnt * sizeof(TransactionId));
866
867         /* copy xids that still are interesting to workspace */
868         for (off = 0; off < builder->committed.xcnt; off++)
869         {
870                 if (NormalTransactionIdPrecedes(builder->committed.xip[off],
871                                                                                 builder->xmin))
872                         ;                                       /* remove */
873                 else
874                         workspace[surviving_xids++] = builder->committed.xip[off];
875         }
876
877         /* copy workspace back to persistent state */
878         memcpy(builder->committed.xip, workspace,
879                    surviving_xids * sizeof(TransactionId));
880
881         elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
882                  (uint32) builder->committed.xcnt, (uint32) surviving_xids,
883                  builder->xmin, builder->xmax);
884         builder->committed.xcnt = surviving_xids;
885
886         pfree(workspace);
887 }
888
889 /*
890  * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
891  * keeping track of the amount of running transactions.
892  */
893 static void
894 SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
895 {
896         if (builder->state == SNAPBUILD_CONSISTENT)
897                 return;
898
899         /*
900          * NB: This handles subtransactions correctly even if we started from
901          * suboverflowed xl_running_xacts because we only keep track of toplevel
902          * transactions. Since the latter are always allocated before their
903          * subxids and since they end at the same time it's sufficient to deal
904          * with them here.
905          */
906         if (SnapBuildTxnIsRunning(builder, xid))
907         {
908                 Assert(builder->running.xcnt > 0);
909
910                 if (!--builder->running.xcnt)
911                 {
912                         /*
913                          * None of the originally running transaction is running anymore,
914                          * so our incrementaly built snapshot now is consistent.
915                          */
916                         ereport(LOG,
917                                   (errmsg("logical decoding found consistent point at %X/%X",
918                                                   (uint32) (lsn >> 32), (uint32) lsn),
919                                    errdetail("Transaction ID %u finished; no more running transactions.",
920                                                          xid)));
921                         builder->state = SNAPBUILD_CONSISTENT;
922                 }
923         }
924 }
925
926 /*
927  * Abort a transaction, throw away all state we kept.
928  */
929 void
930 SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
931                                   TransactionId xid,
932                                   int nsubxacts, TransactionId *subxacts)
933 {
934         int                     i;
935
936         for (i = 0; i < nsubxacts; i++)
937         {
938                 TransactionId subxid = subxacts[i];
939
940                 SnapBuildEndTxn(builder, lsn, subxid);
941         }
942
943         SnapBuildEndTxn(builder, lsn, xid);
944 }
945
/*
 * Handle everything that needs to be done when a transaction commits.
 *
 * xid is the committing toplevel transaction and subxacts[0 .. nsubxacts-1]
 * are its subtransactions. Every (sub)transaction that has, or is assumed to
 * have, modified the catalog is added to builder->committed, and
 * builder->xmax is advanced past the newest of them. Once at least
 * SNAPBUILD_FULL_SNAPSHOT has been reached, a new historic snapshot is built
 * and handed to all in-progress transactions. If nothing needed timetravel,
 * committed.includes_all_transactions is cleared instead.
 */
void
SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
				   int nsubxacts, TransactionId *subxacts)
{
	int			nxact;

	bool		forced_timetravel = false;
	bool		sub_needs_timetravel = false;
	bool		top_needs_timetravel = false;

	/* newest xid among the (sub)transactions added to the committed set */
	TransactionId xmax = xid;

	/*
	 * If we couldn't observe every change of a transaction because it was
	 * already running at the point we started to observe, we have to assume
	 * it made catalog changes.
	 *
	 * This has the positive benefit that we afterwards have enough
	 * information to build an exportable snapshot that's usable by pg_dump et
	 * al.
	 */
	if (builder->state < SNAPBUILD_CONSISTENT)
	{
		/* ensure that only commits after this are getting replayed */
		if (builder->start_decoding_at <= lsn)
			builder->start_decoding_at = lsn + 1;

		/*
		 * We could avoid treating !SnapBuildTxnIsRunning transactions as
		 * timetravel ones, but we want to be able to export a snapshot when
		 * we reached consistency.
		 */
		forced_timetravel = true;
		elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running too early", xid);
	}

	for (nxact = 0; nxact < nsubxacts; nxact++)
	{
		TransactionId subxid = subxacts[nxact];

		/*
		 * Make sure the subtransaction is not tracked as running anymore;
		 * this may switch the builder's state to consistent.
		 */
		SnapBuildEndTxn(builder, lsn, subxid);

		/*
		 * If we're forcing timetravel we also need visibility information
		 * about the subtransaction, so keep track of its state.
		 */
		if (forced_timetravel)
		{
			SnapBuildAddCommittedTxn(builder, subxid);
			if (NormalTransactionIdFollows(subxid, xmax))
				xmax = subxid;
		}

		/*
		 * Otherwise track the subtransaction as committed only if it changed
		 * the catalog (e.g. executed DDL). The committed array does not
		 * distinguish subtransactions from toplevel transactions.
		 */
		else if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
		{
			sub_needs_timetravel = true;

			elog(DEBUG1, "found subtransaction %u:%u with catalog changes.",
				 xid, subxid);

			SnapBuildAddCommittedTxn(builder, subxid);

			if (NormalTransactionIdFollows(subxid, xmax))
				xmax = subxid;
		}
	}

	/*
	 * Make sure toplevel txn is not tracked in running txn's anymore, switch
	 * state to consistent if possible.
	 */
	SnapBuildEndTxn(builder, lsn, xid);

	if (forced_timetravel)
	{
		elog(DEBUG2, "forced transaction %u to do timetravel.", xid);

		SnapBuildAddCommittedTxn(builder, xid);
	}
	/* add toplevel transaction to base snapshot */
	else if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
	{
		elog(DEBUG2, "found top level transaction %u, with catalog changes!",
			 xid);

		top_needs_timetravel = true;
		SnapBuildAddCommittedTxn(builder, xid);
	}
	else if (sub_needs_timetravel)
	{
		/* mark toplevel txn as timetravel as well */
		SnapBuildAddCommittedTxn(builder, xid);
	}

	/* if there's any reason to build a historic snapshot, do so now */
	if (forced_timetravel || top_needs_timetravel || sub_needs_timetravel)
	{
		/*
		 * Adjust xmax of the snapshot builder. We only do that for committed,
		 * catalog modifying transactions; everything else isn't interesting
		 * for us since we'll never look at the respective rows. xmax ends up
		 * one past the newest xid recorded above.
		 */
		if (!TransactionIdIsValid(builder->xmax) ||
			TransactionIdFollowsOrEquals(xmax, builder->xmax))
		{
			builder->xmax = xmax;
			TransactionIdAdvance(builder->xmax);
		}

		/*
		 * If we haven't built a complete snapshot yet there's no need to hand
		 * it out, it wouldn't (and couldn't) be used anyway.
		 */
		if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
			return;

		/*
		 * Decrease the snapshot builder's refcount of the old snapshot. Note
		 * that it will still be used if it has been handed out to the
		 * reorderbuffer earlier.
		 */
		if (builder->snapshot)
			SnapBuildSnapDecRefcount(builder->snapshot);

		builder->snapshot = SnapBuildBuildSnapshot(builder, xid);

		/*
		 * If the committing transaction has no base snapshot yet, give it the
		 * new one (presumably so that its invalidations can be executed).
		 * The reorderbuffer holds its own reference.
		 */
		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
		{
			SnapBuildSnapIncRefcount(builder->snapshot);
			ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
										 builder->snapshot);
		}

		/* refcount of the snapshot builder for the new snapshot */
		SnapBuildSnapIncRefcount(builder->snapshot);

		/* add a new Snapshot to all currently running transactions */
		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
	}
	else
	{
		/* record that we cannot export a general snapshot anymore */
		builder->committed.includes_all_transactions = false;
	}
}
1102
1103
1104 /* -----------------------------------
1105  * Snapshot building functions dealing with xlog records
1106  * -----------------------------------
1107  */
1108
/*
 * Process a running xacts record, and use its information to first build a
 * historic snapshot and later to release resources that aren't needed
 * anymore.
 *
 * Before consistency is reached the record is fed to SnapBuildFindSnapshot().
 * Afterwards the current state is serialized at `lsn`, builder->xmin is
 * advanced to the record's oldestRunningXid, committed xids below it are
 * purged, and the replication slot's xmin and restart position are moved
 * forward where possible.
 */
void
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
	ReorderBufferTXN *txn;

	/*
	 * If we're not consistent yet, inspect the record to see whether it
	 * allows us to get closer to being consistent. If we are consistent, dump
	 * our snapshot so others or we, after a restart, can use it.
	 */
	if (builder->state < SNAPBUILD_CONSISTENT)
	{
		/* returns false if there's no point in performing cleanup just yet */
		if (!SnapBuildFindSnapshot(builder, lsn, running))
			return;
	}
	else
		SnapBuildSerialize(builder, lsn);

	/*
	 * Update range of interesting xids based on the running xacts
	 * information. We don't increase ->xmax using it, because once we are in
	 * a consistent state we can do that ourselves and much more efficiently
	 * so, because we only need to do it for catalog transactions since we
	 * only ever look at those.
	 *
	 * NB: Because of that xmax can be lower than xmin, because we only
	 * increase xmax when a catalog modifying transaction commits. While odd
	 * looking, it's correct and actually more efficient this way since we hit
	 * fast paths in tqual.c.
	 */
	builder->xmin = running->oldestRunningXid;

	/* Remove transactions we don't need to keep track of anymore */
	SnapBuildPurgeCommittedTxn(builder);

	elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
		 builder->xmin, builder->xmax,
		 running->oldestRunningXid);

	/*
	 * Increase shared memory limits, so vacuum can work on tuples we
	 * prevented from being pruned till now.
	 */
	LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);

	/*
	 * Also tell the slot where we can restart decoding from. We don't want to
	 * do that after every commit because changing that implies an fsync of
	 * the logical slot's state file, so we only do it every time we see a
	 * running xacts record.
	 *
	 * Do so by looking for the oldest in progress transaction (determined by
	 * the first LSN of any of its relevant records). Every transaction
	 * remembers the last location we stored the snapshot to disk before its
	 * beginning. That point is where we can restart from.
	 */

	/*
	 * Can't know about a serialized snapshot's location if we're not
	 * consistent.
	 */
	if (builder->state < SNAPBUILD_CONSISTENT)
		return;

	txn = ReorderBufferGetOldestTXN(builder->reorder);

	/*
	 * The oldest ongoing txn might have started when we didn't yet serialize
	 * anything because we hadn't reached a consistent state yet; in that case
	 * its restart_decoding_lsn is invalid and the slot is not moved.
	 */
	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);

	/*
	 * No in-progress transaction, can reuse the last serialized snapshot if
	 * we have one.
	 */
	else if (txn == NULL &&
			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
		LogicalIncreaseRestartDecodingForSlot(lsn,
											  builder->last_serialized_snapshot);
}
1198
1199
/*
 * Build the start of a snapshot that's capable of decoding the catalog.
 *
 * Helper function for SnapBuildProcessRunningXacts() while we're not yet
 * consistent.
 *
 * Returns true if there is a point in performing internal maintenance/cleanup
 * using the xl_running_xacts record, i.e. the caller should go on to advance
 * xmin and purge state. Returns false when the record was used to become
 * consistent, to restore a serialized snapshot, or to start tracking the
 * running transactions, in which case no cleanup is needed yet.
 */
static bool
SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
	/* ---
	 * Build catalog decoding snapshot incrementally using information about
	 * the currently running transactions. There are several ways to do that:
	 *
	 * a) There were no running transactions when the xl_running_xacts record
	 *	  was inserted, jump to CONSISTENT immediately. We might find such a
	 *	  state even while we are already waiting as described in b) or c).
	 *
	 * b) Wait for all toplevel transactions that were running to end. We
	 *	  simply track the number of in-progress toplevel transactions and
	 *	  lower it whenever one commits or aborts. When that number
	 *	  (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
	 *	  to CONSISTENT.
	 *	  NB: We need to search running.xip when seeing a transaction's end to
	 *	  make sure it's a toplevel transaction and it's been one of the
	 *	  initially running ones.
	 *	  Interestingly, in contrast to HS, this allows us not to care about
	 *	  subtransactions - and by extension suboverflowed xl_running_xacts -
	 *	  at all.
	 *
	 * c) This (in a previous run) or another decoding slot serialized a
	 *	  snapshot to disk that we can use.
	 * ---
	 */

	/*
	 * The xl_running_xacts record is older than what we can use; we might not
	 * have all necessary catalog rows anymore. Skip it, but still let the
	 * caller perform its cleanup.
	 */
	if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
		NormalTransactionIdPrecedes(running->oldestRunningXid,
									builder->initial_xmin_horizon))
	{
		ereport(DEBUG1,
				(errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
								 (uint32) (lsn >> 32), (uint32) lsn),
		errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
				 builder->initial_xmin_horizon, running->oldestRunningXid)));
		return true;
	}

	/*
	 * a) No transactions were running, we can jump to consistent.
	 *
	 * NB: We might have already started to incrementally assemble a snapshot,
	 * so we need to be careful to deal with that.
	 */
	if (running->xcnt == 0)
	{
		if (builder->start_decoding_at == InvalidXLogRecPtr ||
			builder->start_decoding_at <= lsn)
			/* can decode everything after this */
			builder->start_decoding_at = lsn + 1;

		/* As no transactions were running xmin/xmax can be trivially set. */
		builder->xmin = running->nextXid;		/* < are finished */
		builder->xmax = running->nextXid;		/* >= are running */

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		/* no transactions running now */
		builder->running.xcnt = 0;
		builder->running.xmin = InvalidTransactionId;
		builder->running.xmax = InvalidTransactionId;

		builder->state = SNAPBUILD_CONSISTENT;

		ereport(LOG,
				(errmsg("logical decoding found consistent point at %X/%X",
						(uint32) (lsn >> 32), (uint32) lsn),
				 errdetail("There are no running transactions.")));

		return false;
	}
	/* c) valid on disk state */
	else if (SnapBuildRestore(builder, lsn))
	{
		/* there won't be any state to cleanup */
		return false;
	}

	/*
	 * b) first encounter of a usable xl_running_xacts record. If we had
	 * found one earlier we would either track running transactions (i.e.
	 * builder->running.xcnt != 0) or be consistent (this function wouldn't
	 * get called).
	 */
	else if (!builder->running.xcnt)
	{
		int			off;

		/*
		 * We only care about toplevel xids as those are the ones we
		 * definitely see in the wal stream. As snapbuild.c tracks committed
		 * instead of running transactions we don't need to know anything
		 * about uncommitted subtransactions.
		 */

		/*
		 * Start with an xmin/xmax that's correct for future, when all the
		 * currently running transactions have finished. We'll update both
		 * while waiting for the pending transactions to finish.
		 */
		builder->xmin = running->nextXid;		/* < are finished */
		builder->xmax = running->nextXid;		/* >= are running */

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		/*
		 * running.xcnt is the countdown lowered by SnapBuildEndTxn;
		 * running.xcnt_space keeps the size of the copied xip array, which
		 * SnapBuildTxnIsRunning searches.
		 */
		builder->running.xcnt = running->xcnt;
		builder->running.xcnt_space = running->xcnt;
		builder->running.xip =
			MemoryContextAlloc(builder->context,
							   builder->running.xcnt * sizeof(TransactionId));
		memcpy(builder->running.xip, running->xids,
			   builder->running.xcnt * sizeof(TransactionId));

		/* sort so we can do a binary search */
		qsort(builder->running.xip, builder->running.xcnt,
			  sizeof(TransactionId), xidComparator);

		builder->running.xmin = builder->running.xip[0];
		builder->running.xmax = builder->running.xip[running->xcnt - 1];

		/*
		 * Makes comparisons cheaper later: xmin/xmax become exclusive bounds,
		 * so SnapBuildTxnIsRunning can use strict Follows/Precedes checks.
		 */
		TransactionIdRetreat(builder->running.xmin);
		TransactionIdAdvance(builder->running.xmax);

		builder->state = SNAPBUILD_FULL_SNAPSHOT;

		ereport(LOG,
			(errmsg("logical decoding found initial starting point at %X/%X",
					(uint32) (lsn >> 32), (uint32) lsn),
			 errdetail_plural("%u transaction needs to finish.",
							  "%u transactions need to finish.",
							  builder->running.xcnt,
							  (uint32) builder->running.xcnt)));

		/*
		 * Iterate through all xids, wait for them to finish.
		 *
		 * This isn't required for the correctness of decoding, but to allow
		 * isolationtester to notice that we're currently waiting for
		 * something.
		 */
		for (off = 0; off < builder->running.xcnt; off++)
		{
			TransactionId xid = builder->running.xip[off];

			/*
			 * Upper layers should prevent that we ever need to wait on
			 * ourselves. Check anyway, since failing to do so would either
			 * result in an endless wait or an Assert() failure.
			 */
			if (TransactionIdIsCurrentTransactionId(xid))
				elog(ERROR, "waiting for ourselves");

			XactLockTableWait(xid, NULL, NULL, XLTW_None);
		}

		/* nothing could have built up so far, so don't perform cleanup */
		return false;
	}

	/*
	 * We already started to track running xacts and need to wait for all
	 * in-progress ones to finish. We fall through to the normal processing of
	 * records so incremental cleanup can be performed.
	 */
	return true;
}
1386
1387
1388 /* -----------------------------------
1389  * Snapshot serialization support
1390  * -----------------------------------
1391  */
1392
/*
 * We store the current state of struct SnapBuild on disk in the following
 * manner:
 *
 * struct SnapBuildOnDisk;
 * TransactionId * running.xcnt_space;
 * TransactionId * committed.xcnt; (*not xcnt_space*)
 *
 * That is, the fixed-size struct below is followed by running.xcnt_space
 * TransactionIds and then by committed.xcnt TransactionIds (only the used
 * part of the committed array, not its allocated capacity).
 */
typedef struct SnapBuildOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;
	pg_crc32c	checksum;

	/* data covered by checksum */

	/* version, in case we want to support pg_upgrade */
	uint32		version;
	/* how large is the on disk data, excluding the constant sized part */
	uint32		length;

	/* version dependent part */
	SnapBuild	builder;

	/* variable amount of TransactionIds follows */
} SnapBuildOnDisk;

/* size of the version independent header that precedes ->builder */
#define SnapBuildOnDiskConstantSize \
	offsetof(SnapBuildOnDisk, builder)
/* size of the leading fields (magic, checksum) not covered by the checksum */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(SnapBuildOnDisk, version)

/* value for SnapBuildOnDisk.magic */
#define SNAPBUILD_MAGIC 0x51A1E001
/*
 * Value for SnapBuildOnDisk.version. Since the embedded SnapBuild is the
 * version dependent part, presumably this must be bumped whenever its layout
 * changes.
 */
#define SNAPBUILD_VERSION 2
1429
1430 /*
1431  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1432  *
1433  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1434  * a record that's a potential location for a serialized snapshot.
1435  */
1436 void
1437 SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1438 {
1439         if (builder->state < SNAPBUILD_CONSISTENT)
1440                 SnapBuildRestore(builder, lsn);
1441         else
1442                 SnapBuildSerialize(builder, lsn);
1443 }
1444
1445 /*
1446  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1447  * been done by another decoding process.
1448  */
1449 static void
1450 SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1451 {
1452         Size            needed_length;
1453         SnapBuildOnDisk *ondisk;
1454         char       *ondisk_c;
1455         int                     fd;
1456         char            tmppath[MAXPGPATH];
1457         char            path[MAXPGPATH];
1458         int                     ret;
1459         struct stat stat_buf;
1460         Size            sz;
1461
1462         Assert(lsn != InvalidXLogRecPtr);
1463         Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
1464                    builder->last_serialized_snapshot <= lsn);
1465
1466         /*
1467          * no point in serializing if we cannot continue to work immediately after
1468          * restoring the snapshot
1469          */
1470         if (builder->state < SNAPBUILD_CONSISTENT)
1471                 return;
1472
1473         /*
1474          * We identify snapshots by the LSN they are valid for. We don't need to
1475          * include timelines in the name as each LSN maps to exactly one timeline
1476          * unless the user used pg_resetxlog or similar. If a user did so, there's
1477          * no hope continuing to decode anyway.
1478          */
1479         sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1480                         (uint32) (lsn >> 32), (uint32) lsn);
1481
1482         /*
1483          * first check whether some other backend already has written the snapshot
1484          * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1485          * as a valid state. Everything else is an unexpected error.
1486          */
1487         ret = stat(path, &stat_buf);
1488
1489         if (ret != 0 && errno != ENOENT)
1490                 ereport(ERROR,
1491                                 (errmsg("could not stat file \"%s\": %m", path)));
1492
1493         else if (ret == 0)
1494         {
1495                 /*
1496                  * somebody else has already serialized to this point, don't overwrite
1497                  * but remember location, so we don't need to read old data again.
1498                  *
1499                  * To be sure it has been synced to disk after the rename() from the
1500                  * tempfile filename to the real filename, we just repeat the fsync.
1501                  * That ought to be cheap because in most scenarios it should already
1502                  * be safely on disk.
1503                  */
1504                 fsync_fname(path, false);
1505                 fsync_fname("pg_logical/snapshots", true);
1506
1507                 builder->last_serialized_snapshot = lsn;
1508                 goto out;
1509         }
1510
1511         /*
1512          * there is an obvious race condition here between the time we stat(2) the
1513          * file and us writing the file. But we rename the file into place
1514          * atomically and all files created need to contain the same data anyway,
1515          * so this is perfectly fine, although a bit of a resource waste. Locking
1516          * seems like pointless complication.
1517          */
1518         elog(DEBUG1, "serializing snapshot to %s", path);
1519
1520         /* to make sure only we will write to this tempfile, include pid */
1521         sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
1522                         (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
1523
1524         /*
1525          * Unlink temporary file if it already exists, needs to have been before a
1526          * crash/error since we won't enter this function twice from within a
1527          * single decoding slot/backend and the temporary file contains the pid of
1528          * the current process.
1529          */
1530         if (unlink(tmppath) != 0 && errno != ENOENT)
1531                 ereport(ERROR,
1532                                 (errcode_for_file_access(),
1533                                  errmsg("could not remove file \"%s\": %m", path)));
1534
1535         needed_length = sizeof(SnapBuildOnDisk) +
1536                 sizeof(TransactionId) * builder->running.xcnt_space +
1537                 sizeof(TransactionId) * builder->committed.xcnt;
1538
1539         ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
1540         ondisk = (SnapBuildOnDisk *) ondisk_c;
1541         ondisk->magic = SNAPBUILD_MAGIC;
1542         ondisk->version = SNAPBUILD_VERSION;
1543         ondisk->length = needed_length;
1544         INIT_CRC32C(ondisk->checksum);
1545         COMP_CRC32C(ondisk->checksum,
1546                                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1547                         SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1548         ondisk_c += sizeof(SnapBuildOnDisk);
1549
1550         memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1551         /* NULL-ify memory-only data */
1552         ondisk->builder.context = NULL;
1553         ondisk->builder.snapshot = NULL;
1554         ondisk->builder.reorder = NULL;
1555         ondisk->builder.running.xip = NULL;
1556         ondisk->builder.committed.xip = NULL;
1557
1558         COMP_CRC32C(ondisk->checksum,
1559                                 &ondisk->builder,
1560                                 sizeof(SnapBuild));
1561
1562         /* copy running xacts */
1563         sz = sizeof(TransactionId) * builder->running.xcnt_space;
1564         memcpy(ondisk_c, builder->running.xip, sz);
1565         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1566         ondisk_c += sz;
1567
1568         /* copy committed xacts */
1569         sz = sizeof(TransactionId) * builder->committed.xcnt;
1570         memcpy(ondisk_c, builder->committed.xip, sz);
1571         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1572         ondisk_c += sz;
1573
1574         FIN_CRC32C(ondisk->checksum);
1575
1576         /* we have valid data now, open tempfile and write it there */
1577         fd = OpenTransientFile(tmppath,
1578                                                    O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1579                                                    S_IRUSR | S_IWUSR);
1580         if (fd < 0)
1581                 ereport(ERROR,
1582                                 (errmsg("could not open file \"%s\": %m", path)));
1583
1584         if ((write(fd, ondisk, needed_length)) != needed_length)
1585         {
1586                 CloseTransientFile(fd);
1587                 ereport(ERROR,
1588                                 (errcode_for_file_access(),
1589                                  errmsg("could not write to file \"%s\": %m", tmppath)));
1590         }
1591
1592         /*
1593          * fsync the file before renaming so that even if we crash after this we
1594          * have either a fully valid file or nothing.
1595          *
1596          * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1597          * some noticeable overhead since it's performed synchronously during
1598          * decoding?
1599          */
1600         if (pg_fsync(fd) != 0)
1601         {
1602                 CloseTransientFile(fd);
1603                 ereport(ERROR,
1604                                 (errcode_for_file_access(),
1605                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1606         }
1607         CloseTransientFile(fd);
1608
1609         fsync_fname("pg_logical/snapshots", true);
1610
1611         /*
1612          * We may overwrite the work from some other backend, but that's ok, our
1613          * snapshot is valid as well, we'll just have done some superfluous work.
1614          */
1615         if (rename(tmppath, path) != 0)
1616         {
1617                 ereport(ERROR,
1618                                 (errcode_for_file_access(),
1619                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
1620                                                 tmppath, path)));
1621         }
1622
1623         /* make sure we persist */
1624         fsync_fname(path, false);
1625         fsync_fname("pg_logical/snapshots", true);
1626
1627         /*
1628          * Now there's no way we can loose the dumped state anymore, remember this
1629          * as a serialization point.
1630          */
1631         builder->last_serialized_snapshot = lsn;
1632
1633 out:
1634         ReorderBufferSetRestartPoint(builder->reorder,
1635                                                                  builder->last_serialized_snapshot);
1636 }
1637
1638 /*
1639  * Restore a snapshot into 'builder' if previously one has been stored at the
1640  * location indicated by 'lsn'. Returns true if successful, false otherwise.
1641  */
1642 static bool
1643 SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1644 {
1645         SnapBuildOnDisk ondisk;
1646         int                     fd;
1647         char            path[MAXPGPATH];
1648         Size            sz;
1649         int                     readBytes;
1650         pg_crc32c       checksum;
1651
1652         /* no point in loading a snapshot if we're already there */
1653         if (builder->state == SNAPBUILD_CONSISTENT)
1654                 return false;
1655
1656         sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1657                         (uint32) (lsn >> 32), (uint32) lsn);
1658
1659         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1660
1661         if (fd < 0 && errno == ENOENT)
1662                 return false;
1663         else if (fd < 0)
1664                 ereport(ERROR,
1665                                 (errcode_for_file_access(),
1666                                  errmsg("could not open file \"%s\": %m", path)));
1667
1668         /* ----
1669          * Make sure the snapshot had been stored safely to disk, that's normally
1670          * cheap.
1671          * Note that we do not need PANIC here, nobody will be able to use the
1672          * slot without fsyncing, and saving it won't succeed without an fsync()
1673          * either...
1674          * ----
1675          */
1676         fsync_fname(path, false);
1677         fsync_fname("pg_logical/snapshots", true);
1678
1679
1680         /* read statically sized portion of snapshot */
1681         readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1682         if (readBytes != SnapBuildOnDiskConstantSize)
1683         {
1684                 CloseTransientFile(fd);
1685                 ereport(ERROR,
1686                                 (errcode_for_file_access(),
1687                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1688                                                 path, readBytes, (int) SnapBuildOnDiskConstantSize)));
1689         }
1690
1691         if (ondisk.magic != SNAPBUILD_MAGIC)
1692                 ereport(ERROR,
1693                                 (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1694                                                 path, ondisk.magic, SNAPBUILD_MAGIC)));
1695
1696         if (ondisk.version != SNAPBUILD_VERSION)
1697                 ereport(ERROR,
1698                                 (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1699                                                 path, ondisk.version, SNAPBUILD_VERSION)));
1700
1701         INIT_CRC32C(checksum);
1702         COMP_CRC32C(checksum,
1703                                 ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1704                         SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1705
1706         /* read SnapBuild */
1707         readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1708         if (readBytes != sizeof(SnapBuild))
1709         {
1710                 CloseTransientFile(fd);
1711                 ereport(ERROR,
1712                                 (errcode_for_file_access(),
1713                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1714                                                 path, readBytes, (int) sizeof(SnapBuild))));
1715         }
1716         COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1717
1718         /* restore running xacts information */
1719         sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
1720         ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
1721         readBytes = read(fd, ondisk.builder.running.xip, sz);
1722         if (readBytes != sz)
1723         {
1724                 CloseTransientFile(fd);
1725                 ereport(ERROR,
1726                                 (errcode_for_file_access(),
1727                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1728                                                 path, readBytes, (int) sz)));
1729         }
1730         COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
1731
1732         /* restore committed xacts information */
1733         sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1734         ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1735         readBytes = read(fd, ondisk.builder.committed.xip, sz);
1736         if (readBytes != sz)
1737         {
1738                 CloseTransientFile(fd);
1739                 ereport(ERROR,
1740                                 (errcode_for_file_access(),
1741                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1742                                                 path, readBytes, (int) sz)));
1743         }
1744         COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1745
1746         CloseTransientFile(fd);
1747
1748         FIN_CRC32C(checksum);
1749
1750         /* verify checksum of what we've read */
1751         if (!EQ_CRC32C(checksum, ondisk.checksum))
1752                 ereport(ERROR,
1753                                 (errcode_for_file_access(),
1754                                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1755                                                 path, checksum, ondisk.checksum)));
1756
1757         /*
1758          * ok, we now have a sensible snapshot here, figure out if it has more
1759          * information than we have.
1760          */
1761
1762         /*
1763          * We are only interested in consistent snapshots for now, comparing
1764          * whether one incomplete snapshot is more "advanced" seems to be
1765          * unnecessarily complex.
1766          */
1767         if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1768                 goto snapshot_not_interesting;
1769
1770         /*
1771          * Don't use a snapshot that requires an xmin that we cannot guarantee to
1772          * be available.
1773          */
1774         if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1775                 goto snapshot_not_interesting;
1776
1777
1778         /* ok, we think the snapshot is sensible, copy over everything important */
1779         builder->xmin = ondisk.builder.xmin;
1780         builder->xmax = ondisk.builder.xmax;
1781         builder->state = ondisk.builder.state;
1782
1783         builder->committed.xcnt = ondisk.builder.committed.xcnt;
1784         /* We only allocated/stored xcnt, not xcnt_space xids ! */
1785         /* don't overwrite preallocated xip, if we don't have anything here */
1786         if (builder->committed.xcnt > 0)
1787         {
1788                 pfree(builder->committed.xip);
1789                 builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1790                 builder->committed.xip = ondisk.builder.committed.xip;
1791         }
1792         ondisk.builder.committed.xip = NULL;
1793
1794         builder->running.xcnt = ondisk.builder.running.xcnt;
1795         if (builder->running.xip)
1796                 pfree(builder->running.xip);
1797         builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
1798         builder->running.xip = ondisk.builder.running.xip;
1799
1800         /* our snapshot is not interesting anymore, build a new one */
1801         if (builder->snapshot != NULL)
1802         {
1803                 SnapBuildSnapDecRefcount(builder->snapshot);
1804         }
1805         builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidTransactionId);
1806         SnapBuildSnapIncRefcount(builder->snapshot);
1807
1808         ReorderBufferSetRestartPoint(builder->reorder, lsn);
1809
1810         Assert(builder->state == SNAPBUILD_CONSISTENT);
1811
1812         ereport(LOG,
1813                         (errmsg("logical decoding found consistent point at %X/%X",
1814                                         (uint32) (lsn >> 32), (uint32) lsn),
1815                          errdetail("Logical decoding will begin using saved snapshot.")));
1816         return true;
1817
1818 snapshot_not_interesting:
1819         if (ondisk.builder.running.xip != NULL)
1820                 pfree(ondisk.builder.running.xip);
1821         if (ondisk.builder.committed.xip != NULL)
1822                 pfree(ondisk.builder.committed.xip);
1823         return false;
1824 }
1825
1826 /*
1827  * Remove all serialized snapshots that are not required anymore because no
1828  * slot can need them. This doesn't actually have to run during a checkpoint,
1829  * but it's a convenient point to schedule this.
1830  *
1831  * NB: We run this during checkpoints even if logical decoding is disabled so
1832  * we cleanup old slots at some point after it got disabled.
1833  */
1834 void
1835 CheckPointSnapBuild(void)
1836 {
1837         XLogRecPtr      cutoff;
1838         XLogRecPtr      redo;
1839         DIR                *snap_dir;
1840         struct dirent *snap_de;
1841         char            path[MAXPGPATH];
1842
1843         /*
1844          * We start of with a minimum of the last redo pointer. No new replication
1845          * slot will start before that, so that's a safe upper bound for removal.
1846          */
1847         redo = GetRedoRecPtr();
1848
1849         /* now check for the restart ptrs from existing slots */
1850         cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1851
1852         /* don't start earlier than the restart lsn */
1853         if (redo < cutoff)
1854                 cutoff = redo;
1855
1856         snap_dir = AllocateDir("pg_logical/snapshots");
1857         while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1858         {
1859                 uint32          hi;
1860                 uint32          lo;
1861                 XLogRecPtr      lsn;
1862                 struct stat statbuf;
1863
1864                 if (strcmp(snap_de->d_name, ".") == 0 ||
1865                         strcmp(snap_de->d_name, "..") == 0)
1866                         continue;
1867
1868                 snprintf(path, MAXPGPATH, "pg_logical/snapshots/%s", snap_de->d_name);
1869
1870                 if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1871                 {
1872                         elog(DEBUG1, "only regular files expected: %s", path);
1873                         continue;
1874                 }
1875
1876                 /*
1877                  * temporary filenames from SnapBuildSerialize() include the LSN and
1878                  * everything but are postfixed by .$pid.tmp. We can just remove them
1879                  * the same as other files because there can be none that are
1880                  * currently being written that are older than cutoff.
1881                  *
1882                  * We just log a message if a file doesn't fit the pattern, it's
1883                  * probably some editors lock/state file or similar...
1884                  */
1885                 if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
1886                 {
1887                         ereport(LOG,
1888                                         (errmsg("could not parse file name \"%s\"", path)));
1889                         continue;
1890                 }
1891
1892                 lsn = ((uint64) hi) << 32 | lo;
1893
1894                 /* check whether we still need it */
1895                 if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1896                 {
1897                         elog(DEBUG1, "removing snapbuild snapshot %s", path);
1898
1899                         /*
1900                          * It's not particularly harmful, though strange, if we can't
1901                          * remove the file here. Don't prevent the checkpoint from
1902                          * completing, that'd be cure worse than the disease.
1903                          */
1904                         if (unlink(path) < 0)
1905                         {
1906                                 ereport(LOG,
1907                                                 (errcode_for_file_access(),
1908                                                  errmsg("could not remove file \"%s\": %m",
1909                                                                 path)));
1910                                 continue;
1911                         }
1912                 }
1913         }
1914         FreeDir(snap_dir);
1915 }