]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/snapbuild.c
Assorted message fixes and improvements
[postgresql] / src / backend / replication / logical / snapbuild.c
1 /*-------------------------------------------------------------------------
2  *
3  * snapbuild.c
4  *
5  *        Infrastructure for building historic catalog snapshots based on contents
6  *        of the WAL, for the purpose of decoding heapam.c style values in the
7  *        WAL.
8  *
9  * NOTES:
10  *
11  * We build snapshots which can *only* be used to read catalog contents and we
12  * do so by reading and interpreting the WAL stream. The aim is to build a
13  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14  * at the time the XLogRecord was generated.
15  *
16  * To build the snapshots we reuse the infrastructure built for Hot
17  * Standby. The in-memory snapshots we build look different than HS' because
18  * we have different needs. To successfully decode data from the WAL we only
19  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20  * tables since the data we decode is wholly contained in the WAL
21  * records. Also, our snapshots need to be different in comparison to normal
22  * MVCC ones because in contrast to those we cannot fully rely on the clog and
23  * pg_subtrans for information about committed transactions because they might
24  * commit in the future from the POV of the WAL entry we're currently
25  * decoding. This definition has the advantage that we only need to prevent
26  * removal of catalog rows, while normal tables' rows can still be
27  * removed. This is achieved by using the replication slot mechanism.
28  *
29  * As the percentage of transactions modifying the catalog normally is fairly
30  * small in comparison to ones only manipulating user data, we keep track of
31  * the committed catalog modifying ones inside (xmin, xmax) instead of keeping
32  * track of all running transactions like it's done in a normal snapshot. Note
33  * that we're generally only looking at transactions that have acquired an
34  * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35  * that we consider committed, everything else is considered aborted/in
36  * progress. That also allows us not to care about subtransactions before they
37  * have committed which means this module, in contrast to HS, doesn't have to
38  * care about suboverflowed subtransactions and similar.
39  *
40  * One complexity of doing this is that to e.g. handle mixed DDL/DML
41  * transactions we need Snapshots that see intermediate versions of the
42  * catalog in a transaction. During normal operation this is achieved by using
43  * CommandIds/cmin/cmax. The problem with that however is that for space
44  * efficiency reasons only one value of that is stored
45  * (cf. combocid.c). Since ComboCids are only available in memory we log
46  * additional information which allows us to get the original (cmin, cmax)
47  * pair during visibility checks. Check the comment in reorderbuffer.c above
48  * ResolveCminCmaxDuringDecoding() for details.
49  *
50  * To facilitate all this we need our own visibility routine, as the normal
51  * ones are optimized for different use cases.
52  *
53  * To replace the normal catalog snapshots with decoding ones use the
54  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55  *
56  *
57  *
58  * The snapbuild machinery starts up in several stages, as illustrated
59  * by the following graph:
60  *                 +-------------------------+
61  *        +----|SNAPBUILD_START                  |-------------+
62  *        |    +-------------------------+                         |
63  *        |                                     |                                                  |
64  *        |                                     |                                                  |
65  *        |             running_xacts with running xacts           |
66  *        |                                     |                                                  |
67  *        |                                     |                                                  |
68  *        |                                     v                                                  |
69  *        |    +-------------------------+                         v
70  *        |    |SNAPBUILD_FULL_SNAPSHOT  |------------>|
71  *        |    +-------------------------+                         |
72  * running_xacts                |                                          saved snapshot
73  * with zero xacts              |                                 at running_xacts's lsn
74  *        |                                     |                                                  |
75  *        |             all running toplevel TXNs finished         |
76  *        |                                     |                                                  |
77  *        |                                     v                                                  |
78  *        |    +-------------------------+                         |
79  *        +--->|SNAPBUILD_CONSISTENT     |<------------+
80  *                 +-------------------------+
81  *
82  * Initially the machinery is in the START stage. When a xl_running_xacts
83  * record is read that is sufficiently new (above the safe xmin horizon),
84  * there's a state transition. If there were no running xacts when the
85  * running_xacts record was generated, we'll directly go into CONSISTENT
86  * state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full
87  * snapshot means that all transactions that start henceforth can be decoded
88  * in their entirety, but transactions that started previously can't. In
89  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
90  * running transactions have committed or aborted.
91  *
92  * Only transactions that commit after CONSISTENT state has been reached will
93  * be replayed, even though they might have started while still in
94  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
95  * changes have been exported, but all the following ones will be. That point
96  * is a convenient point to initialize replication from, which is why we
97  * export a snapshot at that point, which *can* be used to read normal data.
98  *
99  * Copyright (c) 2012-2014, PostgreSQL Global Development Group
100  *
101  * IDENTIFICATION
102  *        src/backend/replication/logical/snapbuild.c
103  *
104  *-------------------------------------------------------------------------
105  */
106
107 #include "postgres.h"
108
109 #include <sys/stat.h>
110 #include <sys/types.h>
111 #include <unistd.h>
112
113 #include "miscadmin.h"
114
115 #include "access/heapam_xlog.h"
116 #include "access/transam.h"
117 #include "access/xact.h"
118
119 #include "replication/logical.h"
120 #include "replication/reorderbuffer.h"
121 #include "replication/snapbuild.h"
122
123 #include "utils/builtins.h"
124 #include "utils/memutils.h"
125 #include "utils/snapshot.h"
126 #include "utils/snapmgr.h"
127 #include "utils/tqual.h"
128
129 #include "storage/block.h"              /* debugging output */
130 #include "storage/fd.h"
131 #include "storage/lmgr.h"
132 #include "storage/proc.h"
133 #include "storage/procarray.h"
134 #include "storage/standby.h"
135
136 /*
137  * This struct contains the current state of the snapshot building
138  * machinery. Besides a forward declaration in the header, it is not exposed
139  * to the public, so we can easily change its contents.
140  */
141 struct SnapBuild
142 {
143         /* how far are we along building our first full snapshot */
144         SnapBuildState state;
145
146         /* private memory context used to allocate memory for this module. */
147         MemoryContext context;
148
149         /* all transactions < than this have committed/aborted */
150         TransactionId xmin;
151
152         /* all transactions >= than this are uncommitted */
153         TransactionId xmax;
154
155         /*
156          * Don't replay commits from an LSN < this LSN. This can be set
157          * externally but it will also be advanced (never retreat) from within
158          * snapbuild.c.
159          */
160         XLogRecPtr      start_decoding_at;
161
162         /*
163          * Don't start decoding WAL until the "xl_running_xacts" information
164          * indicates there are no running xids with a xid smaller than this.
165          */
166         TransactionId initial_xmin_horizon;
167
168         /*
169          * Snapshot that's valid to see the catalog state seen at this moment.
170          */
171         Snapshot        snapshot;
172
173         /*
174          * LSN of the last location we are sure a snapshot has been serialized to.
175          */
176         XLogRecPtr      last_serialized_snapshot;
177
178         /*
179          * The reorderbuffer we need to update with usable snapshots et al.
180          */
181         ReorderBuffer *reorder;
182
183         /*
184          * Information about initially running transactions
185          *
186          * When we start building a snapshot there already may be transactions in
187          * progress.  Those are stored in running.xip.  We don't have enough
188          * information about those to decode their contents, so until they are
189          * finished (xcnt=0) we cannot switch to a CONSISTENT state.
190          */
191         struct
192         {
193                 /*
194                  * As long as running.xcnt all XIDs < running.xmin and > running.xmax
195                  * have to be checked whether they still are running.
196                  */
197                 TransactionId xmin;
198                 TransactionId xmax;
199
200                 size_t          xcnt;           /* number of used xip entries */
201                 size_t          xcnt_space; /* allocated size of xip */
202                 TransactionId *xip;             /* running xacts array, xidComparator-sorted */
203         }                       running;
204
205         /*
206          * Array of transactions which could have catalog changes that committed
207          * between xmin and xmax.
208          */
209         struct
210         {
211                 /* number of committed transactions */
212                 size_t          xcnt;
213
214                 /* available space for committed transactions */
215                 size_t          xcnt_space;
216
217                 /*
218                  * Until we reach a CONSISTENT state, we record commits of all
219                  * transactions, not just the catalog changing ones. Record when that
220                  * changes so we know we cannot export a snapshot safely anymore.
221                  */
222                 bool            includes_all_transactions;
223
224                 /*
225                  * Array of committed transactions that have modified the catalog.
226                  *
227                  * As this array is frequently modified we do *not* keep it in
228                  * xidComparator order. Instead we sort the array when building &
229                  * distributing a snapshot.
230                  *
231                  * TODO: It's unclear whether that reasoning has much merit. Every
232                  * time we add something here after becoming consistent will also
233                  * require distributing a snapshot. Storing them sorted would
234                  * potentially also make it easier to purge (but more complicated wrt
235                  * wraparound?). Should be improved if sorting while building the
236                  * snapshot shows up in profiles.
237                  */
238                 TransactionId *xip;
239         }                       committed;
240 };
241
242 /*
243  * Starting a transaction -- which we need to do while exporting a snapshot --
244  * removes knowledge about the previously used resowner, so we save it here.
245  */
246 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
247 static bool     ExportInProgress = false;
248
249 /* transaction state manipulation functions */
250 static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
251
252 /* ->running manipulation */
253 static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
254
255 /* ->committed manipulation */
256 static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
257
258 /* snapshot building/manipulation/distribution functions */
259 static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid);
260
261 static void SnapBuildFreeSnapshot(Snapshot snap);
262
263 static void SnapBuildSnapIncRefcount(Snapshot snap);
264
265 static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
266
267 /* xlog reading helper functions for SnapBuildProcessRecord */
268 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
269
270 /* serialization functions */
271 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
272 static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
273
274
275 /*
276  * Allocate a new snapshot builder.
277  *
278  * xmin_horizon is the xid >=which we can be sure no catalog rows have been
279  * removed, start_lsn is the LSN >= we want to replay commits.
280  */
281 SnapBuild *
282 AllocateSnapshotBuilder(ReorderBuffer *reorder,
283                                                 TransactionId xmin_horizon,
284                                                 XLogRecPtr start_lsn)
285 {
286         MemoryContext context;
287         MemoryContext oldcontext;
288         SnapBuild  *builder;
289
290         /* allocate memory in own context, to have better accountability */
291         context = AllocSetContextCreate(CurrentMemoryContext,
292                                                                         "snapshot builder context",
293                                                                         ALLOCSET_DEFAULT_MINSIZE,
294                                                                         ALLOCSET_DEFAULT_INITSIZE,
295                                                                         ALLOCSET_DEFAULT_MAXSIZE);
296         oldcontext = MemoryContextSwitchTo(context);
297
298         builder = palloc0(sizeof(SnapBuild));
299
300         builder->state = SNAPBUILD_START;
301         builder->context = context;
302         builder->reorder = reorder;
303         /* Other struct members initialized by zeroing via palloc0 above */
304
305         builder->committed.xcnt = 0;
306         builder->committed.xcnt_space = 128;            /* arbitrary number */
307         builder->committed.xip =
308                 palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
309         builder->committed.includes_all_transactions = true;
310
311         builder->initial_xmin_horizon = xmin_horizon;
312         builder->start_decoding_at = start_lsn;
313
314         MemoryContextSwitchTo(oldcontext);
315
316         return builder;
317 }
318
319 /*
320  * Free a snapshot builder.
321  */
322 void
323 FreeSnapshotBuilder(SnapBuild *builder)
324 {
325         MemoryContext context = builder->context;
326
327         /* free snapshot explicitly, that contains some error checking */
328         if (builder->snapshot != NULL)
329         {
330                 SnapBuildSnapDecRefcount(builder->snapshot);
331                 builder->snapshot = NULL;
332         }
333
334         /* other resources are deallocated via memory context reset */
335         MemoryContextDelete(context);
336 }
337
338 /*
339  * Free an unreferenced snapshot that has previously been built by us.
340  */
341 static void
342 SnapBuildFreeSnapshot(Snapshot snap)
343 {
344         /* make sure we don't get passed an external snapshot */
345         Assert(snap->satisfies == HeapTupleSatisfiesHistoricMVCC);
346
347         /* make sure nobody modified our snapshot */
348         Assert(snap->curcid == FirstCommandId);
349         Assert(!snap->suboverflowed);
350         Assert(!snap->takenDuringRecovery);
351         Assert(snap->regd_count == 1);
352
353         /* slightly more likely, so it's checked even without c-asserts */
354         if (snap->copied)
355                 elog(ERROR, "cannot free a copied snapshot");
356
357         if (snap->active_count)
358                 elog(ERROR, "cannot free an active snapshot");
359
360         pfree(snap);
361 }
362
363 /*
364  * In which state of snapshot building are we?
365  */
366 SnapBuildState
367 SnapBuildCurrentState(SnapBuild *builder)
368 {
369         return builder->state;
370 }
371
372 /*
373  * Should the contents of transaction ending at 'ptr' be decoded?
374  */
375 bool
376 SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
377 {
378         return ptr < builder->start_decoding_at;
379 }
380
381 /*
382  * Increase refcount of a snapshot.
383  *
384  * This is used when handing out a snapshot to some external resource or when
385  * adding a Snapshot as builder->snapshot.
386  */
387 static void
388 SnapBuildSnapIncRefcount(Snapshot snap)
389 {
390         snap->active_count++;
391 }
392
393 /*
394  * Decrease refcount of a snapshot and free if the refcount reaches zero.
395  *
396  * Externally visible, so that external resources that have been handed an
397  * IncRef'ed Snapshot can adjust its refcount easily.
398  */
399 void
400 SnapBuildSnapDecRefcount(Snapshot snap)
401 {
402         /* make sure we don't get passed an external snapshot */
403         Assert(snap->satisfies == HeapTupleSatisfiesHistoricMVCC);
404
405         /* make sure nobody modified our snapshot */
406         Assert(snap->curcid == FirstCommandId);
407         Assert(!snap->suboverflowed);
408         Assert(!snap->takenDuringRecovery);
409
410         Assert(snap->regd_count == 1);
411
412         Assert(snap->active_count);
413
414         /* slightly more likely, so it's checked even without casserts */
415         if (snap->copied)
416                 elog(ERROR, "cannot free a copied snapshot");
417
418         snap->active_count--;
419         if (!snap->active_count)
420                 SnapBuildFreeSnapshot(snap);
421 }
422
423 /*
424  * Build a new snapshot, based on currently committed catalog-modifying
425  * transactions.
426  *
427  * In-progress transactions with catalog access are *not* allowed to modify
428  * these snapshots; they have to copy them and fill in appropriate ->curcid
429  * and ->subxip/subxcnt values.
430  */
431 static Snapshot
432 SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
433 {
434         Snapshot        snapshot;
435         Size            ssize;
436
437         Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
438
439         ssize = sizeof(SnapshotData)
440                 + sizeof(TransactionId) * builder->committed.xcnt
441                 + sizeof(TransactionId) * 1 /* toplevel xid */ ;
442
443         snapshot = MemoryContextAllocZero(builder->context, ssize);
444
445         snapshot->satisfies = HeapTupleSatisfiesHistoricMVCC;
446
447         /*
448          * We misuse the original meaning of SnapshotData's xip and subxip fields
449          * to make the more fitting for our needs.
450          *
451          * In the 'xip' array we store transactions that have to be treated as
452          * committed. Since we will only ever look at tuples from transactions
453          * that have modified the catalog it's more efficient to store those few
454          * that exist between xmin and xmax (frequently there are none).
455          *
456          * Snapshots that are used in transactions that have modified the catalog
457          * also use the 'subxip' array to store their toplevel xid and all the
458          * subtransaction xids so we can recognize when we need to treat rows as
459          * visible that are not in xip but still need to be visible. Subxip only
460          * gets filled when the transaction is copied into the context of a
461          * catalog modifying transaction since we otherwise share a snapshot
462          * between transactions. As long as a txn hasn't modified the catalog it
463          * doesn't need to treat any uncommitted rows as visible, so there is no
464          * need for those xids.
465          *
466          * Both arrays are qsort'ed so that we can use bsearch() on them.
467          */
468         Assert(TransactionIdIsNormal(builder->xmin));
469         Assert(TransactionIdIsNormal(builder->xmax));
470
471         snapshot->xmin = builder->xmin;
472         snapshot->xmax = builder->xmax;
473
474         /* store all transactions to be treated as committed by this snapshot */
475         snapshot->xip =
476                 (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
477         snapshot->xcnt = builder->committed.xcnt;
478         memcpy(snapshot->xip,
479                    builder->committed.xip,
480                    builder->committed.xcnt * sizeof(TransactionId));
481
482         /* sort so we can bsearch() */
483         qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
484
485         /*
486          * Initially, subxip is empty, i.e. it's a snapshot to be used by
487          * transactions that don't modify the catalog. Will be filled by
488          * ReorderBufferCopySnap() if necessary.
489          */
490         snapshot->subxcnt = 0;
491         snapshot->subxip = NULL;
492
493         snapshot->suboverflowed = false;
494         snapshot->takenDuringRecovery = false;
495         snapshot->copied = false;
496         snapshot->curcid = FirstCommandId;
497         snapshot->active_count = 0;
498         snapshot->regd_count = 1;       /* mark as registered so nobody frees it */
499
500         return snapshot;
501 }
502
503 /*
504  * Export a snapshot so it can be set in another session with SET TRANSACTION
505  * SNAPSHOT.
506  *
507  * For that we need to start a transaction in the current backend as the
508  * importing side checks whether the source transaction is still open to make
509  * sure the xmin horizon hasn't advanced since then.
510  *
511  * After that we convert a locally built snapshot into the normal variant
512  * understood by HeapTupleSatisfiesMVCC et al.
513  */
514 const char *
515 SnapBuildExportSnapshot(SnapBuild *builder)
516 {
517         Snapshot        snap;
518         char       *snapname;
519         TransactionId xid;
520         TransactionId *newxip;
521         int                     newxcnt = 0;
522
523         if (builder->state != SNAPBUILD_CONSISTENT)
524                 elog(ERROR, "cannot export a snapshot before reaching a consistent state");
525
526         if (!builder->committed.includes_all_transactions)
527                 elog(ERROR, "cannot export a snapshot, not all transactions are monitored anymore");
528
529         /* so we don't overwrite the existing value */
530         if (TransactionIdIsValid(MyPgXact->xmin))
531                 elog(ERROR, "cannot export a snapshot when MyPgXact->xmin already is valid");
532
533         if (IsTransactionOrTransactionBlock())
534                 elog(ERROR, "cannot export a snapshot from within a transaction");
535
536         if (SavedResourceOwnerDuringExport)
537                 elog(ERROR, "can only export one snapshot at a time");
538
539         SavedResourceOwnerDuringExport = CurrentResourceOwner;
540         ExportInProgress = true;
541
542         StartTransactionCommand();
543
544         Assert(!FirstSnapshotSet);
545
546         /* There doesn't seem to a nice API to set these */
547         XactIsoLevel = XACT_REPEATABLE_READ;
548         XactReadOnly = true;
549
550         snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
551
552         /*
553          * We know that snap->xmin is alive, enforced by the logical xmin
554          * mechanism. Due to that we can do this without locks, we're only
555          * changing our own value.
556          */
557         MyPgXact->xmin = snap->xmin;
558
559         /* allocate in transaction context */
560         newxip = (TransactionId *)
561                 palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
562
563         /*
564          * snapbuild.c builds transactions in an "inverted" manner, which means it
565          * stores committed transactions in ->xip, not ones in progress. Build a
566          * classical snapshot by marking all non-committed transactions as
567          * in-progress. This can be expensive.
568          */
569         for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
570         {
571                 void       *test;
572
573                 /*
574                  * Check whether transaction committed using the decoding snapshot
575                  * meaning of ->xip.
576                  */
577                 test = bsearch(&xid, snap->xip, snap->xcnt,
578                                            sizeof(TransactionId), xidComparator);
579
580                 if (test == NULL)
581                 {
582                         if (newxcnt >= GetMaxSnapshotXidCount())
583                                 elog(ERROR, "snapshot too large");
584
585                         newxip[newxcnt++] = xid;
586                 }
587
588                 TransactionIdAdvance(xid);
589         }
590
591         snap->xcnt = newxcnt;
592         snap->xip = newxip;
593
594         /*
595          * now that we've built a plain snapshot, use the normal mechanisms for
596          * exporting it
597          */
598         snapname = ExportSnapshot(snap);
599
600         ereport(LOG,
601                         (errmsg("exported logical decoding snapshot: \"%s\" with %u xids",
602                                         snapname, snap->xcnt)));
603         return snapname;
604 }
605
606 /*
607  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
608  * any. Aborts the previously started transaction and resets the resource
609  * owner back to its original value.
610  */
611 void
612 SnapBuildClearExportedSnapshot()
613 {
614         /* nothing exported, thats the usual case */
615         if (!ExportInProgress)
616                 return;
617
618         if (!IsTransactionState())
619                 elog(ERROR, "clearing exported snapshot in wrong transaction state");
620
621         /* make sure nothing  could have ever happened */
622         AbortCurrentTransaction();
623
624         CurrentResourceOwner = SavedResourceOwnerDuringExport;
625         SavedResourceOwnerDuringExport = NULL;
626         ExportInProgress = false;
627 }
628
629 /*
630  * Handle the effects of a single heap change, appropriate to the current state
631  * of the snapshot builder and returns whether changes made at (xid, lsn) can
632  * be decoded.
633  */
634 bool
635 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
636 {
637         bool            is_old_tx;
638
639         /*
640          * We can't handle data in transactions if we haven't built a snapshot
641          * yet, so don't store them.
642          */
643         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
644                 return false;
645
646         /*
647          * No point in keeping track of changes in transactions that we don't have
648          * enough information about to decode. This means that they started before
649          * we got into the SNAPBUILD_FULL_SNAPSHOT state.
650          */
651         if (builder->state < SNAPBUILD_CONSISTENT &&
652                 SnapBuildTxnIsRunning(builder, xid))
653                 return false;
654
655         /*
656          * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
657          * be needed to decode the change we're currently processing.
658          */
659         is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
660
661         if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
662         {
663                 /* only build a new snapshot if we don't have a prebuilt one */
664                 if (builder->snapshot == NULL)
665                 {
666                         builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
667                         /* inrease refcount for the snapshot builder */
668                         SnapBuildSnapIncRefcount(builder->snapshot);
669                 }
670
671                 /*
672                  * Increase refcount for the transaction we're handing the snapshot
673                  * out to.
674                  */
675                 SnapBuildSnapIncRefcount(builder->snapshot);
676                 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
677                                                                          builder->snapshot);
678         }
679
680         return true;
681 }
682
683 /*
684  * Do CommandId/ComboCid handling after reading a xl_heap_new_cid record. This
685  * implies that a transaction has done some form of write to system catalogs.
686  */
687 void
688 SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
689                                            XLogRecPtr lsn, xl_heap_new_cid *xlrec)
690 {
691         CommandId       cid;
692
693         /*
694          * we only log new_cid's if a catalog tuple was modified, so mark the
695          * transaction as containing catalog modifications
696          */
697         ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
698
699         ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
700                                                                  xlrec->target.node, xlrec->target.tid,
701                                                                  xlrec->cmin, xlrec->cmax,
702                                                                  xlrec->combocid);
703
704         /* figure out new command id */
705         if (xlrec->cmin != InvalidCommandId &&
706                 xlrec->cmax != InvalidCommandId)
707                 cid = Max(xlrec->cmin, xlrec->cmax);
708         else if (xlrec->cmax != InvalidCommandId)
709                 cid = xlrec->cmax;
710         else if (xlrec->cmin != InvalidCommandId)
711                 cid = xlrec->cmin;
712         else
713         {
714                 cid = InvalidCommandId; /* silence compiler */
715                 elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
716         }
717
718         ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
719 }
720
721 /*
722  * Check whether `xid` is currently 'running'.
723  *
724  * Running transactions in our parlance are transactions which we didn't
725  * observe from the start so we can't properly decode their contents. They
726  * only exist after we freshly started from an < CONSISTENT snapshot.
727  */
728 static bool
729 SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
730 {
731         Assert(builder->state < SNAPBUILD_CONSISTENT);
732         Assert(TransactionIdIsNormal(builder->running.xmin));
733         Assert(TransactionIdIsNormal(builder->running.xmax));
734
735         if (builder->running.xcnt &&
736                 NormalTransactionIdFollows(xid, builder->running.xmin) &&
737                 NormalTransactionIdPrecedes(xid, builder->running.xmax))
738         {
739                 TransactionId *search =
740                 bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
741                                 sizeof(TransactionId), xidComparator);
742
743                 if (search != NULL)
744                 {
745                         Assert(*search == xid);
746                         return true;
747                 }
748         }
749
750         return false;
751 }
752
753 /*
754  * Add a new Snapshot to all transactions we're decoding that currently are
755  * in-progress so they can see new catalog contents made by the transaction
756  * that just committed. This is necessary because those in-progress
757  * transactions will use the new catalog's contents from here on (at the very
758  * least everything they do needs to be compatible with newer catalog
759  * contents).
760  */
761 static void
762 SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
763 {
764         dlist_iter      txn_i;
765         ReorderBufferTXN *txn;
766
767         /*
768          * Iterate through all toplevel transactions. This can include
769          * subtransactions which we just don't yet know to be that, but that's
770          * fine, they will just get an unneccesary snapshot queued.
771          */
772         dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
773         {
774                 txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
775
776                 Assert(TransactionIdIsValid(txn->xid));
777
778                 /*
779                  * If we don't have a base snapshot yet, there are no changes in this
780                  * transaction which in turn implies we don't yet need a snapshot at
781                  * all. We'll add a snapshot when the first change gets queued.
782                  *
783                  * NB: This works correctly even for subtransactions because
784                  * ReorderBufferCommitChild() takes care to pass the parent the base
785                  * snapshot, and while iterating the changequeue we'll get the change
786                  * from the subtxn.
787                  */
788                 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
789                         continue;
790
791                 elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
792                          txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
793
794                 /*
795                  * increase the snapshot's refcount for the transaction we are handing
796                  * it out to
797                  */
798                 SnapBuildSnapIncRefcount(builder->snapshot);
799                 ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
800                                                                  builder->snapshot);
801         }
802 }
803
804 /*
805  * Keep track of a new catalog changing transaction that has committed.
806  */
807 static void
808 SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
809 {
810         Assert(TransactionIdIsValid(xid));
811
812         if (builder->committed.xcnt == builder->committed.xcnt_space)
813         {
814                 builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
815
816                 elog(DEBUG1, "increasing space for committed transactions to %u",
817                          (uint32) builder->committed.xcnt_space);
818
819                 builder->committed.xip = repalloc(builder->committed.xip,
820                                           builder->committed.xcnt_space * sizeof(TransactionId));
821         }
822
823         /*
824          * TODO: It might make sense to keep the array sorted here instead of
825          * doing it every time we build a new snapshot. On the other hand this
826          * gets called repeatedly when a transaction with subtransactions commits.
827          */
828         builder->committed.xip[builder->committed.xcnt++] = xid;
829 }
830
831 /*
832  * Remove knowledge about transactions we treat as committed that are smaller
833  * than ->xmin. Those won't ever get checked via the ->commited array but via
834  * the clog machinery, so we don't need to waste memory on them.
835  */
836 static void
837 SnapBuildPurgeCommittedTxn(SnapBuild *builder)
838 {
839         int                     off;
840         TransactionId *workspace;
841         int                     surviving_xids = 0;
842
843         /* not ready yet */
844         if (!TransactionIdIsNormal(builder->xmin))
845                 return;
846
847         /* TODO: Neater algorithm than just copying and iterating? */
848         workspace =
849                 MemoryContextAlloc(builder->context,
850                                                    builder->committed.xcnt * sizeof(TransactionId));
851
852         /* copy xids that still are interesting to workspace */
853         for (off = 0; off < builder->committed.xcnt; off++)
854         {
855                 if (NormalTransactionIdPrecedes(builder->committed.xip[off],
856                                                                                 builder->xmin))
857                         ;                                       /* remove */
858                 else
859                         workspace[surviving_xids++] = builder->committed.xip[off];
860         }
861
862         /* copy workspace back to persistent state */
863         memcpy(builder->committed.xip, workspace,
864                    surviving_xids * sizeof(TransactionId));
865
866         elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
867                  (uint32) builder->committed.xcnt, (uint32) surviving_xids,
868                  builder->xmin, builder->xmax);
869         builder->committed.xcnt = surviving_xids;
870
871         pfree(workspace);
872 }
873
874 /*
875  * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
876  * keeping track of the amount of running transactions.
877  */
878 static void
879 SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
880 {
881         if (builder->state == SNAPBUILD_CONSISTENT)
882                 return;
883
884         /*
885          * NB: This handles subtransactions correctly even if we started from
886          * suboverflowed xl_running_xacts because we only keep track of toplevel
887          * transactions. Since the latter are always are allocated before their
888          * subxids and since they end at the same time it's sufficient to deal
889          * with them here.
890          */
891         if (SnapBuildTxnIsRunning(builder, xid))
892         {
893                 Assert(builder->running.xcnt > 0);
894
895                 if (!--builder->running.xcnt)
896                 {
897                         /*
898                          * None of the originally running transaction is running anymore,
899                          * so our incrementaly built snapshot now is consistent.
900                          */
901                         ereport(LOG,
902                                   (errmsg("logical decoding found consistent point at %X/%X",
903                                                   (uint32) (lsn >> 32), (uint32) lsn),
904                                 errdetail("xid %u finished, no running transactions anymore",
905                                                   xid)));
906                         builder->state = SNAPBUILD_CONSISTENT;
907                 }
908         }
909 }
910
911 /*
912  * Abort a transaction, throw away all state we kept.
913  */
914 void
915 SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
916                                   TransactionId xid,
917                                   int nsubxacts, TransactionId *subxacts)
918 {
919         int                     i;
920
921         for (i = 0; i < nsubxacts; i++)
922         {
923                 TransactionId subxid = subxacts[i];
924
925                 SnapBuildEndTxn(builder, lsn, subxid);
926         }
927
928         SnapBuildEndTxn(builder, lsn, xid);
929 }
930
931 /*
932  * Handle everything that needs to be done when a transaction commits
933  */
934 void
935 SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
936                                    int nsubxacts, TransactionId *subxacts)
937 {
938         int                     nxact;
939
940         bool            forced_timetravel = false;
941         bool            sub_needs_timetravel = false;
942         bool            top_needs_timetravel = false;
943
944         TransactionId xmax = xid;
945
946         /*
947          * If we couldn't observe every change of a transaction because it was
948          * already running at the point we started to observe we have to assume it
949          * made catalog changes.
950          *
951          * This has the positive benefit that we afterwards have enough
952          * information to build an exportable snapshot that's usable by pg_dump et
953          * al.
954          */
955         if (builder->state < SNAPBUILD_CONSISTENT)
956         {
957                 /* ensure that only commits after this are getting replayed */
958                 if (builder->start_decoding_at <= lsn)
959                         builder->start_decoding_at = lsn + 1;
960
961                 /*
962                  * We could avoid treating !SnapBuildTxnIsRunning transactions as
963                  * timetravel ones, but we want to be able to export a snapshot when
964                  * we reached consistency.
965                  */
966                 forced_timetravel = true;
967                 elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running to early", xid);
968         }
969
970         for (nxact = 0; nxact < nsubxacts; nxact++)
971         {
972                 TransactionId subxid = subxacts[nxact];
973
974                 /*
975                  * make sure txn is not tracked in running txn's anymore, switch state
976                  */
977                 SnapBuildEndTxn(builder, lsn, subxid);
978
979                 /*
980                  * If we're forcing timetravel we also need visibility information
981                  * about subtransaction, so keep track of subtransaction's state.
982                  */
983                 if (forced_timetravel)
984                 {
985                         SnapBuildAddCommittedTxn(builder, subxid);
986                         if (NormalTransactionIdFollows(subxid, xmax))
987                                 xmax = subxid;
988                 }
989
990                 /*
991                  * Add subtransaction to base snapshot if it DDL, we don't distinguish
992                  * to toplevel transactions there.
993                  */
994                 else if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
995                 {
996                         sub_needs_timetravel = true;
997
998                         elog(DEBUG1, "found subtransaction %u:%u with catalog changes.",
999                                  xid, subxid);
1000
1001                         SnapBuildAddCommittedTxn(builder, subxid);
1002
1003                         if (NormalTransactionIdFollows(subxid, xmax))
1004                                 xmax = subxid;
1005                 }
1006         }
1007
1008         /*
1009          * Make sure toplevel txn is not tracked in running txn's anymore, switch
1010          * state to consistent if possible.
1011          */
1012         SnapBuildEndTxn(builder, lsn, xid);
1013
1014         if (forced_timetravel)
1015         {
1016                 elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
1017
1018                 SnapBuildAddCommittedTxn(builder, xid);
1019         }
1020         /* add toplevel transaction to base snapshot */
1021         else if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1022         {
1023                 elog(DEBUG2, "found top level transaction %u, with catalog changes!",
1024                          xid);
1025
1026                 top_needs_timetravel = true;
1027                 SnapBuildAddCommittedTxn(builder, xid);
1028         }
1029         else if (sub_needs_timetravel)
1030         {
1031                 /* mark toplevel txn as timetravel as well */
1032                 SnapBuildAddCommittedTxn(builder, xid);
1033         }
1034
1035         /* if there's any reason to build a historic snapshot, do so now */
1036         if (forced_timetravel || top_needs_timetravel || sub_needs_timetravel)
1037         {
1038                 /*
1039                  * Adjust xmax of the snapshot builder, we only do that for committed,
1040                  * catalog modifying, transactions, everything else isn't interesting
1041                  * for us since we'll never look at the respective rows.
1042                  */
1043                 if (!TransactionIdIsValid(builder->xmax) ||
1044                         TransactionIdFollowsOrEquals(xmax, builder->xmax))
1045                 {
1046                         builder->xmax = xmax;
1047                         TransactionIdAdvance(builder->xmax);
1048                 }
1049
1050                 /*
1051                  * If we haven't built a complete snapshot yet there's no need to hand
1052                  * it out, it wouldn't (and couldn't) be used anyway.
1053                  */
1054                 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1055                         return;
1056
1057                 /*
1058                  * Decrease the snapshot builder's refcount of the old snapshot, note
1059                  * that it still will be used if it has been handed out to the
1060                  * reorderbuffer earlier.
1061                  */
1062                 if (builder->snapshot)
1063                         SnapBuildSnapDecRefcount(builder->snapshot);
1064
1065                 builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
1066
1067                 /* we might need to execute invalidations, add snapshot */
1068                 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1069                 {
1070                         SnapBuildSnapIncRefcount(builder->snapshot);
1071                         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1072                                                                                  builder->snapshot);
1073                 }
1074
1075                 /* refcount of the snapshot builder for the new snapshot */
1076                 SnapBuildSnapIncRefcount(builder->snapshot);
1077
1078                 /* add a new SnapshotNow to all currently running transactions */
1079                 SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1080         }
1081         else
1082         {
1083                 /* record that we cannot export a general snapshot anymore */
1084                 builder->committed.includes_all_transactions = false;
1085         }
1086 }
1087
1088
1089 /* -----------------------------------
1090  * Snapshot building functions dealing with xlog records
1091  * -----------------------------------
1092  */
1093
1094 /*
1095  * Process a running xacts record, and use its information to first build a
1096  * historic snapshot and later to release resources that aren't needed
1097  * anymore.
1098  */
1099 void
1100 SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1101 {
1102         ReorderBufferTXN *txn;
1103
1104         /*
1105          * If we're not consistent yet, inspect the record to see whether it
1106          * allows to get closer to being consistent. If we are consistent, dump
1107          * our snapshot so others or we, after a restart, can use it.
1108          */
1109         if (builder->state < SNAPBUILD_CONSISTENT)
1110         {
1111                 /* returns false if there's no point in performing cleanup just yet */
1112                 if (!SnapBuildFindSnapshot(builder, lsn, running))
1113                         return;
1114         }
1115         else
1116                 SnapBuildSerialize(builder, lsn);
1117
1118         /*
1119          * Update range of interesting xids based on the running xacts
1120          * information. We don't increase ->xmax using it, because once we are in
1121          * a consistent state we can do that ourselves and much more efficiently
1122          * so, because we only need to do it for catalog transactions since we
1123          * only ever look at those.
1124          *
1125          * NB: Because of that xmax can be lower than xmin, because we only
1126          * increase xmax when a catalog modifying transaction commits. While odd
1127          * looking, it's correct and actually more efficient this way since we hit
1128          * fast paths in tqual.c.
1129          */
1130         builder->xmin = running->oldestRunningXid;
1131
1132         /* Remove transactions we don't need to keep track off anymore */
1133         SnapBuildPurgeCommittedTxn(builder);
1134
1135         elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
1136                  builder->xmin, builder->xmax,
1137                  running->oldestRunningXid);
1138
1139         /*
1140          * Inrease shared memory limits, so vacuum can work on tuples we prevented
1141          * from being pruned till now.
1142          */
1143         LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
1144
1145         /*
1146          * Also tell the slot where we can restart decoding from. We don't want to
1147          * do that after every commit because changing that implies an fsync of
1148          * the logical slot's state file, so we only do it every time we see a
1149          * running xacts record.
1150          *
1151          * Do so by looking for the oldest in progress transaction (determined by
1152          * the first LSN of any of its relevant records). Every transaction
1153          * remembers the last location we stored the snapshot to disk before its
1154          * beginning. That point is where we can restart from.
1155          */
1156
1157         /*
1158          * Can't know about a serialized snapshot's location if we're not
1159          * consistent.
1160          */
1161         if (builder->state < SNAPBUILD_CONSISTENT)
1162                 return;
1163
1164         txn = ReorderBufferGetOldestTXN(builder->reorder);
1165
1166         /*
1167          * oldest ongoing txn might have started when we didn't yet serialize
1168          * anything because we hadn't reached a consistent state yet.
1169          */
1170         if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1171                 LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1172
1173         /*
1174          * No in-progress transaction, can reuse the last serialized snapshot if
1175          * we have one.
1176          */
1177         else if (txn == NULL &&
1178                 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
1179                          builder->last_serialized_snapshot != InvalidXLogRecPtr)
1180                 LogicalIncreaseRestartDecodingForSlot(lsn,
1181                                                                                   builder->last_serialized_snapshot);
1182 }
1183
1184
/*
 * Build the start of a snapshot that's capable of decoding the catalog.
 *
 * Helper function for SnapBuildProcessRunningXacts() while we're not yet
 * consistent.
 *
 * Returns true if there is a point in performing internal maintenance/cleanup
 * using the xl_running_xacts record. Returns false in three cases:
 *   - we just jumped to CONSISTENT;
 *   - a serialized snapshot was restored;
 *   - we only now started tracking the running transactions.
 */
static bool
SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
	/* ---
	 * Build catalog decoding snapshot incrementally using information about
	 * the currently running transactions. There are several ways to do that:
	 *
	 * a) There were no running transactions when the xl_running_xacts record
	 *	  was inserted: jump to CONSISTENT immediately. We might find such a
	 *	  record while we are still waiting for b) or c).
	 *
	 * b) Wait for all toplevel transactions that were running to end. We
	 *	  simply track the number of in-progress toplevel transactions and
	 *	  lower it whenever one commits or aborts. When that number
	 *	  (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
	 *	  to CONSISTENT.
	 *	  NB: We need to search running.xip when seeing a transaction's end to
	 *	  make sure it's a toplevel transaction and it's been one of the
	 *	  initially running ones.
	 *	  Interestingly, in contrast to HS, this allows us not to care about
	 *	  subtransactions - and by extension suboverflowed xl_running_xacts -
	 *	  at all.
	 *
	 * c) This (in a previous run) or another decoding slot serialized a
	 *	  snapshot to disk that we can use.
	 * ---
	 */

	/*
	 * The xl_running_xacts record is older than what we can use; we might
	 * not have all necessary catalog rows anymore.
	 */
	if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
		NormalTransactionIdPrecedes(running->oldestRunningXid,
									builder->initial_xmin_horizon))
	{
		ereport(DEBUG1,
				(errmsg("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
						(uint32) (lsn >> 32), (uint32) lsn),
				 errdetail("initial xmin horizon of %u vs the snapshot's %u",
				 builder->initial_xmin_horizon, running->oldestRunningXid)));
		/* unusable for snapshot building, but the caller may still clean up */
		return true;
	}

	/*
	 * a) No transactions were running, we can jump to consistent.
	 *
	 * NB: We might have already started to incrementally assemble a snapshot,
	 * so we need to be careful to deal with that.
	 */
	if (running->xcnt == 0)
	{
		if (builder->start_decoding_at == InvalidXLogRecPtr ||
			builder->start_decoding_at <= lsn)
			/* can decode everything after this */
			builder->start_decoding_at = lsn + 1;

		/* xmax is set to one past the latest completed xid */
		builder->xmin = running->oldestRunningXid;
		builder->xmax = running->latestCompletedXid;
		TransactionIdAdvance(builder->xmax);

		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		/* no transactions running now */
		builder->running.xcnt = 0;
		builder->running.xmin = InvalidTransactionId;
		builder->running.xmax = InvalidTransactionId;

		builder->state = SNAPBUILD_CONSISTENT;

		ereport(LOG,
				(errmsg("logical decoding found consistent point at %X/%X",
						(uint32) (lsn >> 32), (uint32) lsn),
				 errdetail("running xacts with xcnt == 0")));

		return false;
	}
	/* c) valid on disk state */
	else if (SnapBuildRestore(builder, lsn))
	{
		/* there won't be any state to cleanup */
		return false;
	}

	/*
	 * b) First encounter of a usable xl_running_xacts record. If we had found
	 * one earlier, we would either be tracking running transactions (i.e.
	 * builder->running.xcnt != 0) or be consistent (in which case this
	 * function wouldn't get called).
	 */
	else if (!builder->running.xcnt)
	{
		int			off;

		/*
		 * We only care about toplevel xids, as those are the ones we
		 * definitely see in the WAL stream. As snapbuild.c tracks committed
		 * instead of running transactions, we don't need to know anything
		 * about uncommitted subtransactions.
		 */
		builder->xmin = running->oldestRunningXid;
		builder->xmax = running->latestCompletedXid;
		TransactionIdAdvance(builder->xmax);

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		/*
		 * xcnt_space records the array length. xcnt is later decremented as
		 * tracked transactions end, without shrinking the array.
		 */
		builder->running.xcnt = running->xcnt;
		builder->running.xcnt_space = running->xcnt;
		builder->running.xip =
			MemoryContextAlloc(builder->context,
							   builder->running.xcnt * sizeof(TransactionId));
		memcpy(builder->running.xip, running->xids,
			   builder->running.xcnt * sizeof(TransactionId));

		/* sort so we can do a binary search */
		qsort(builder->running.xip, builder->running.xcnt,
			  sizeof(TransactionId), xidComparator);

		builder->running.xmin = builder->running.xip[0];
		builder->running.xmax = builder->running.xip[running->xcnt - 1];

		/*
		 * Widen the bounds by one in each direction, which makes comparisons
		 * cheaper later: strict comparisons suffice.
		 */
		TransactionIdRetreat(builder->running.xmin);
		TransactionIdAdvance(builder->running.xmax);

		builder->state = SNAPBUILD_FULL_SNAPSHOT;

		ereport(LOG,
			(errmsg("logical decoding found initial starting point at %X/%X",
					(uint32) (lsn >> 32), (uint32) lsn),
			 errdetail("%u xacts need to finish", (uint32) builder->running.xcnt)));

		/*
		 * Iterate through all xids, wait for them to finish.
		 *
		 * This isn't required for the correctness of decoding, but to allow
		 * isolationtester to notice that we're currently waiting for
		 * something.
		 */
		for (off = 0; off < builder->running.xcnt; off++)
		{
			TransactionId xid = builder->running.xip[off];

			/*
			 * Upper layers should prevent that we ever need to wait on
			 * ourselves. Check anyway, since failing to do so would either
			 * result in an endless wait or an Assert() failure.
			 */
			if (TransactionIdIsCurrentTransactionId(xid))
				elog(ERROR, "waiting for ourselves");

			XactLockTableWait(xid, NULL, NULL, XLTW_None);
		}

		/* nothing could have built up so far, so don't perform cleanup */
		return false;
	}

	/*
	 * We already started to track running xacts and need to wait for all
	 * in-progress ones to finish. We fall through to the normal processing of
	 * records so incremental cleanup can be performed.
	 */
	return true;
}
1362
1363
1364 /* -----------------------------------
1365  * Snapshot serialization support
1366  * -----------------------------------
1367  */
1368
/*
 * We store the current state of struct SnapBuild on disk in the following
 * manner:
 *
 * struct SnapBuildOnDisk;
 * TransactionId * running.xcnt_space;
 * TransactionId * committed.xcnt; (*not xcnt_space*)
 *
 * The field order of SnapBuildOnDisk defines the file layout; do not reorder
 * it without changing SNAPBUILD_VERSION.
 */
typedef struct SnapBuildOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;
	pg_crc32	checksum;

	/* data covered by checksum */

	/* version, in case we want to support pg_upgrade */
	uint32		version;
	/* how large is the on disk data, excluding the constant sized part */
	uint32		length;

	/* version dependent part */
	SnapBuild	builder;

	/* variable amount of TransactionIds follows */
} SnapBuildOnDisk;

/* size of the version-independent leading part, i.e. everything before ->builder */
#define SnapBuildOnDiskConstantSize \
	offsetof(SnapBuildOnDisk, builder)
/* size of the leading part not covered by the checksum (->magic, ->checksum) */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(SnapBuildOnDisk, version)

/* magic number for serialized snapshot files (cf. ->magic) */
#define SNAPBUILD_MAGIC 0x51A1E001
/* format version of the serialized data (cf. ->version) */
#define SNAPBUILD_VERSION 1
1405
1406 /*
1407  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1408  *
1409  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1410  * a record that's a potential location for a serialized snapshot.
1411  */
1412 void
1413 SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1414 {
1415         if (builder->state < SNAPBUILD_CONSISTENT)
1416                 SnapBuildRestore(builder, lsn);
1417         else
1418                 SnapBuildSerialize(builder, lsn);
1419 }
1420
1421 /*
1422  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1423  * been done by another decoding process.
1424  */
1425 static void
1426 SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1427 {
1428         Size            needed_length;
1429         SnapBuildOnDisk *ondisk;
1430         char       *ondisk_c;
1431         int                     fd;
1432         char            tmppath[MAXPGPATH];
1433         char            path[MAXPGPATH];
1434         int                     ret;
1435         struct stat stat_buf;
1436         Size            sz;
1437
1438         Assert(lsn != InvalidXLogRecPtr);
1439         Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
1440                    builder->last_serialized_snapshot <= lsn);
1441
1442         /*
1443          * no point in serializing if we cannot continue to work immediately after
1444          * restoring the snapshot
1445          */
1446         if (builder->state < SNAPBUILD_CONSISTENT)
1447                 return;
1448
1449         /*
1450          * We identify snapshots by the LSN they are valid for. We don't need to
1451          * include timelines in the name as each LSN maps to exactly one timeline
1452          * unless the user used pg_resetxlog or similar. If a user did so, there's
1453          * no hope continuing to decode anyway.
1454          */
1455         sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1456                         (uint32) (lsn >> 32), (uint32) lsn);
1457
1458         /*
1459          * first check whether some other backend already has written the snapshot
1460          * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1461          * as a valid state. Everything else is an unexpected error.
1462          */
1463         ret = stat(path, &stat_buf);
1464
1465         if (ret != 0 && errno != ENOENT)
1466                 ereport(ERROR,
1467                                 (errmsg("could not stat file \"%s\": %m", path)));
1468
1469         else if (ret == 0)
1470         {
1471                 /*
1472                  * somebody else has already serialized to this point, don't overwrite
1473                  * but remember location, so we don't need to read old data again.
1474                  *
1475                  * To be sure it has been synced to disk after the rename() from the
1476                  * tempfile filename to the real filename, we just repeat the fsync.
1477                  * That ought to be cheap because in most scenarios it should already
1478                  * be safely on disk.
1479                  */
1480                 fsync_fname(path, false);
1481                 fsync_fname("pg_logical/snapshots", true);
1482
1483                 builder->last_serialized_snapshot = lsn;
1484                 goto out;
1485         }
1486
1487         /*
1488          * there is an obvious race condition here between the time we stat(2) the
1489          * file and us writing the file. But we rename the file into place
1490          * atomically and all files created need to contain the same data anyway,
1491          * so this is perfectly fine, although a bit of a resource waste. Locking
1492          * seems like pointless complication.
1493          */
1494         elog(DEBUG1, "serializing snapshot to %s", path);
1495
1496         /* to make sure only we will write to this tempfile, include pid */
1497         sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
1498                         (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
1499
1500         /*
1501          * Unlink temporary file if it already exists, needs to have been before a
1502          * crash/error since we won't enter this function twice from within a
1503          * single decoding slot/backend and the temporary file contains the pid of
1504          * the current process.
1505          */
1506         if (unlink(tmppath) != 0 && errno != ENOENT)
1507                 ereport(ERROR,
1508                                 (errcode_for_file_access(),
1509                                  errmsg("could not remove file \"%s\": %m", path)));
1510
1511         needed_length = sizeof(SnapBuildOnDisk) +
1512                 sizeof(TransactionId) * builder->running.xcnt_space +
1513                 sizeof(TransactionId) * builder->committed.xcnt;
1514
1515         ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
1516         ondisk = (SnapBuildOnDisk *) ondisk_c;
1517         ondisk->magic = SNAPBUILD_MAGIC;
1518         ondisk->version = SNAPBUILD_VERSION;
1519         ondisk->length = needed_length;
1520         INIT_CRC32(ondisk->checksum);
1521         COMP_CRC32(ondisk->checksum,
1522                            ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1523                         SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1524         ondisk_c += sizeof(SnapBuildOnDisk);
1525
1526         memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1527         /* NULL-ify memory-only data */
1528         ondisk->builder.context = NULL;
1529         ondisk->builder.snapshot = NULL;
1530         ondisk->builder.reorder = NULL;
1531         ondisk->builder.running.xip = NULL;
1532         ondisk->builder.committed.xip = NULL;
1533
1534         COMP_CRC32(ondisk->checksum,
1535                            &ondisk->builder,
1536                            sizeof(SnapBuild));
1537
1538         /* copy running xacts */
1539         sz = sizeof(TransactionId) * builder->running.xcnt_space;
1540         memcpy(ondisk_c, builder->running.xip, sz);
1541         COMP_CRC32(ondisk->checksum, ondisk_c, sz);
1542         ondisk_c += sz;
1543
1544         /* copy committed xacts */
1545         sz = sizeof(TransactionId) * builder->committed.xcnt;
1546         memcpy(ondisk_c, builder->committed.xip, sz);
1547         COMP_CRC32(ondisk->checksum, ondisk_c, sz);
1548         ondisk_c += sz;
1549
1550         /* we have valid data now, open tempfile and write it there */
1551         fd = OpenTransientFile(tmppath,
1552                                                    O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1553                                                    S_IRUSR | S_IWUSR);
1554         if (fd < 0)
1555                 ereport(ERROR,
1556                                 (errmsg("could not open file \"%s\": %m", path)));
1557
1558         if ((write(fd, ondisk, needed_length)) != needed_length)
1559         {
1560                 CloseTransientFile(fd);
1561                 ereport(ERROR,
1562                                 (errcode_for_file_access(),
1563                                  errmsg("could not write to file \"%s\": %m", tmppath)));
1564         }
1565
1566         /*
1567          * fsync the file before renaming so that even if we crash after this we
1568          * have either a fully valid file or nothing.
1569          *
1570          * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1571          * some noticeable overhead since it's performed synchronously during
1572          * decoding?
1573          */
1574         if (pg_fsync(fd) != 0)
1575         {
1576                 CloseTransientFile(fd);
1577                 ereport(ERROR,
1578                                 (errcode_for_file_access(),
1579                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1580         }
1581         CloseTransientFile(fd);
1582
1583         fsync_fname("pg_logical/snapshots", true);
1584
1585         /*
1586          * We may overwrite the work from some other backend, but that's ok, our
1587          * snapshot is valid as well, we'll just have done some superflous work.
1588          */
1589         if (rename(tmppath, path) != 0)
1590         {
1591                 ereport(ERROR,
1592                                 (errcode_for_file_access(),
1593                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
1594                                                 tmppath, path)));
1595         }
1596
1597         /* make sure we persist */
1598         fsync_fname(path, false);
1599         fsync_fname("pg_logical/snapshots", true);
1600
1601         /*
1602          * Now there's no way we can loose the dumped state anymore, remember this
1603          * as a serialization point.
1604          */
1605         builder->last_serialized_snapshot = lsn;
1606
1607 out:
1608         ReorderBufferSetRestartPoint(builder->reorder,
1609                                                                  builder->last_serialized_snapshot);
1610 }
1611
1612 /*
1613  * Restore a snapshot into 'builder' if previously one has been stored at the
1614  * location indicated by 'lsn'. Returns true if successful, false otherwise.
1615  */
1616 static bool
1617 SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1618 {
1619         SnapBuildOnDisk ondisk;
1620         int                     fd;
1621         char            path[MAXPGPATH];
1622         Size            sz;
1623         int                     readBytes;
1624         pg_crc32        checksum;
1625
1626         /* no point in loading a snapshot if we're already there */
1627         if (builder->state == SNAPBUILD_CONSISTENT)
1628                 return false;
1629
1630         sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1631                         (uint32) (lsn >> 32), (uint32) lsn);
1632
1633         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1634
1635         if (fd < 0 && errno == ENOENT)
1636                 return false;
1637         else if (fd < 0)
1638                 ereport(ERROR,
1639                                 (errcode_for_file_access(),
1640                                  errmsg("could not open file \"%s\": %m", path)));
1641
1642         /* ----
1643          * Make sure the snapshot had been stored safely to disk, that's normally
1644          * cheap.
1645          * Note that we do not need PANIC here, nobody will be able to use the
1646          * slot without fsyncing, and saving it won't suceed without an fsync()
1647          * either...
1648          * ----
1649          */
1650         fsync_fname(path, false);
1651         fsync_fname("pg_logical/snapshots", true);
1652
1653
1654         /* read statically sized portion of snapshot */
1655         readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1656         if (readBytes != SnapBuildOnDiskConstantSize)
1657         {
1658                 CloseTransientFile(fd);
1659                 ereport(ERROR,
1660                                 (errcode_for_file_access(),
1661                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1662                                                 path, readBytes, (int) SnapBuildOnDiskConstantSize)));
1663         }
1664
1665         if (ondisk.magic != SNAPBUILD_MAGIC)
1666                 ereport(ERROR,
1667                                 (errmsg("snapbuild state file \"%s\" has wrong magic %u instead of %u",
1668                                                 path, ondisk.magic, SNAPBUILD_MAGIC)));
1669
1670         if (ondisk.version != SNAPBUILD_VERSION)
1671                 ereport(ERROR,
1672                                 (errmsg("snapbuild state file \"%s\" has unsupported version %u instead of %u",
1673                                                 path, ondisk.version, SNAPBUILD_VERSION)));
1674
1675         INIT_CRC32(checksum);
1676         COMP_CRC32(checksum,
1677                            ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1678                         SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1679
1680         /* read SnapBuild */
1681         readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1682         if (readBytes != sizeof(SnapBuild))
1683         {
1684                 CloseTransientFile(fd);
1685                 ereport(ERROR,
1686                                 (errcode_for_file_access(),
1687                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1688                                                 path, readBytes, (int) sizeof(SnapBuild))));
1689         }
1690         COMP_CRC32(checksum, &ondisk.builder, sizeof(SnapBuild));
1691
1692         /* restore running xacts information */
1693         sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
1694         ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
1695         readBytes = read(fd, ondisk.builder.running.xip, sz);
1696         if (readBytes != sz)
1697         {
1698                 CloseTransientFile(fd);
1699                 ereport(ERROR,
1700                                 (errcode_for_file_access(),
1701                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1702                                                 path, readBytes, (int) sz)));
1703         }
1704         COMP_CRC32(checksum, ondisk.builder.running.xip, sz);
1705
1706         /* restore committed xacts information */
1707         sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1708         ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1709         readBytes = read(fd, ondisk.builder.committed.xip, sz);
1710         if (readBytes != sz)
1711         {
1712                 CloseTransientFile(fd);
1713                 ereport(ERROR,
1714                                 (errcode_for_file_access(),
1715                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1716                                                 path, readBytes, (int) sz)));
1717         }
1718         COMP_CRC32(checksum, ondisk.builder.committed.xip, sz);
1719
1720         CloseTransientFile(fd);
1721
1722         /* verify checksum of what we've read */
1723         if (!EQ_CRC32(checksum, ondisk.checksum))
1724                 ereport(ERROR,
1725                                 (errcode_for_file_access(),
1726                                  errmsg("snapbuild state file %s: checksum mismatch, is %u, should be %u",
1727                                                 path, checksum, ondisk.checksum)));
1728
1729         /*
1730          * ok, we now have a sensible snapshot here, figure out if it has more
1731          * information than we have.
1732          */
1733
1734         /*
1735          * We are only interested in consistent snapshots for now, comparing
1736          * whether one imcomplete snapshot is more "advanced" seems to be
1737          * unnecessarily complex.
1738          */
1739         if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1740                 goto snapshot_not_interesting;
1741
1742         /*
1743          * Don't use a snapshot that requires an xmin that we cannot guarantee to
1744          * be available.
1745          */
1746         if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1747                 goto snapshot_not_interesting;
1748
1749
1750         /* ok, we think the snapshot is sensible, copy over everything important */
1751         builder->xmin = ondisk.builder.xmin;
1752         builder->xmax = ondisk.builder.xmax;
1753         builder->state = ondisk.builder.state;
1754
1755         builder->committed.xcnt = ondisk.builder.committed.xcnt;
1756         /* We only allocated/stored xcnt, not xcnt_space xids ! */
1757         /* don't overwrite preallocated xip, if we don't have anything here */
1758         if (builder->committed.xcnt > 0)
1759         {
1760                 pfree(builder->committed.xip);
1761                 builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1762                 builder->committed.xip = ondisk.builder.committed.xip;
1763         }
1764         ondisk.builder.committed.xip = NULL;
1765
1766         builder->running.xcnt = ondisk.builder.running.xcnt;
1767         if (builder->running.xip)
1768                 pfree(builder->running.xip);
1769         builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
1770         builder->running.xip = ondisk.builder.running.xip;
1771
1772         /* our snapshot is not interesting anymore, build a new one */
1773         if (builder->snapshot != NULL)
1774         {
1775                 SnapBuildSnapDecRefcount(builder->snapshot);
1776         }
1777         builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidTransactionId);
1778         SnapBuildSnapIncRefcount(builder->snapshot);
1779
1780         ReorderBufferSetRestartPoint(builder->reorder, lsn);
1781
1782         Assert(builder->state == SNAPBUILD_CONSISTENT);
1783
1784         ereport(LOG,
1785                         (errmsg("logical decoding found consistent point at %X/%X",
1786                                         (uint32) (lsn >> 32), (uint32) lsn),
1787                          errdetail("found initial snapshot in snapbuild file")));
1788         return true;
1789
1790 snapshot_not_interesting:
1791         if (ondisk.builder.running.xip != NULL)
1792                 pfree(ondisk.builder.running.xip);
1793         if (ondisk.builder.committed.xip != NULL)
1794                 pfree(ondisk.builder.committed.xip);
1795         return false;
1796 }
1797
1798 /*
1799  * Remove all serialized snapshots that are not required anymore because no
1800  * slot can need them. This doesn't actually have to run during a checkpoint,
1801  * but it's a convenient point to schedule this.
1802  *
1803  * NB: We run this during checkpoints even if logical decoding is disabled so
1804  * we cleanup old slots at some point after it got disabled.
1805  */
1806 void
1807 CheckPointSnapBuild(void)
1808 {
1809         XLogRecPtr      cutoff;
1810         XLogRecPtr      redo;
1811         DIR                *snap_dir;
1812         struct dirent *snap_de;
1813         char            path[MAXPGPATH];
1814
1815         /*
1816          * We start of with a minimum of the last redo pointer. No new replication
1817          * slot will start before that, so that's a safe upper bound for removal.
1818          */
1819         redo = GetRedoRecPtr();
1820
1821         /* now check for the restart ptrs from existing slots */
1822         cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1823
1824         /* don't start earlier than the restart lsn */
1825         if (redo < cutoff)
1826                 cutoff = redo;
1827
1828         snap_dir = AllocateDir("pg_logical/snapshots");
1829         while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1830         {
1831                 uint32          hi;
1832                 uint32          lo;
1833                 XLogRecPtr      lsn;
1834                 struct stat statbuf;
1835
1836                 if (strcmp(snap_de->d_name, ".") == 0 ||
1837                         strcmp(snap_de->d_name, "..") == 0)
1838                         continue;
1839
1840                 snprintf(path, MAXPGPATH, "pg_logical/snapshots/%s", snap_de->d_name);
1841
1842                 if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1843                 {
1844                         elog(DEBUG1, "only regular files expected: %s", path);
1845                         continue;
1846                 }
1847
1848                 /*
1849                  * temporary filenames from SnapBuildSerialize() include the LSN and
1850                  * everything but are postfixed by .$pid.tmp. We can just remove them
1851                  * the same as other files because there can be none that are
1852                  * currently being written that are older than cutoff.
1853                  *
1854                  * We just log a message if a file doesn't fit the pattern, it's
1855                  * probably some editors lock/state file or similar...
1856                  */
1857                 if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
1858                 {
1859                         ereport(LOG,
1860                                         (errmsg("could not parse file name \"%s\"", path)));
1861                         continue;
1862                 }
1863
1864                 lsn = ((uint64) hi) << 32 | lo;
1865
1866                 /* check whether we still need it */
1867                 if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1868                 {
1869                         elog(DEBUG1, "removing snapbuild snapshot %s", path);
1870
1871                         /*
1872                          * It's not particularly harmful, though strange, if we can't
1873                          * remove the file here. Don't prevent the checkpoint from
1874                          * completing, that'd be cure worse than the disease.
1875                          */
1876                         if (unlink(path) < 0)
1877                         {
1878                                 ereport(LOG,
1879                                                 (errcode_for_file_access(),
1880                                                  errmsg("could not remove file \"%s\": %m",
1881                                                                 path)));
1882                                 continue;
1883                         }
1884                 }
1885         }
1886         FreeDir(snap_dir);
1887 }