]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/snapbuild.c
Message style improvements
[postgresql] / src / backend / replication / logical / snapbuild.c
1 /*-------------------------------------------------------------------------
2  *
3  * snapbuild.c
4  *
5  *        Infrastructure for building historic catalog snapshots based on contents
6  *        of the WAL, for the purpose of decoding heapam.c style values in the
7  *        WAL.
8  *
9  * NOTES:
10  *
11  * We build snapshots which can *only* be used to read catalog contents and we
12  * do so by reading and interpreting the WAL stream. The aim is to build a
13  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14  * at the time the XLogRecord was generated.
15  *
16  * To build the snapshots we reuse the infrastructure built for Hot
17  * Standby. The in-memory snapshots we build look different than HS' because
18  * we have different needs. To successfully decode data from the WAL we only
19  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20  * tables since the data we decode is wholly contained in the WAL
21  * records. Also, our snapshots need to be different in comparison to normal
22  * MVCC ones because in contrast to those we cannot fully rely on the clog and
23  * pg_subtrans for information about committed transactions because they might
24  * commit in the future from the POV of the WAL entry we're currently
25  * decoding. This definition has the advantage that we only need to prevent
26  * removal of catalog rows, while normal table's rows can still be
27  * removed. This is achieved by using the replication slot mechanism.
28  *
29  * As the percentage of transactions modifying the catalog normally is fairly
30  * small in comparison to ones only manipulating user data, we keep track of
31  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32  * track of all running transactions like it's done in a normal snapshot. Note
33  * that we're generally only looking at transactions that have acquired an
34  * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35  * that we consider committed, everything else is considered aborted/in
36  * progress. That also allows us not to care about subtransactions before they
37  * have committed which means this module, in contrast to HS, doesn't have to
38  * care about suboverflowed subtransactions and similar.
39  *
40  * One complexity of doing this is that to e.g. handle mixed DDL/DML
41  * transactions we need Snapshots that see intermediate versions of the
42  * catalog in a transaction. During normal operation this is achieved by using
43  * CommandIds/cmin/cmax. The problem with that however is that for space
44  * efficiency reasons only one value of that is stored
45  * (c.f. combocid.c). Since ComboCids are only available in memory we log
46  * additional information which allows us to get the original (cmin, cmax)
47  * pair during visibility checks. Check the reorderbuffer.c's comment above
48  * ResolveCminCmaxDuringDecoding() for details.
49  *
50  * To facilitate all this we need our own visibility routine, as the normal
51  * ones are optimized for different usecases.
52  *
53  * To replace the normal catalog snapshots with decoding ones use the
54  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55  *
56  *
57  *
58  * The snapbuild machinery is starting up in several stages, as illustrated
59  * by the following graph:
60  *                 +-------------------------+
61  *        +----|SNAPBUILD_START                  |-------------+
62  *        |    +-------------------------+                         |
63  *        |                                     |                                                  |
64  *        |                                     |                                                  |
65  *        |             running_xacts with running xacts           |
66  *        |                                     |                                                  |
67  *        |                                     |                                                  |
68  *        |                                     v                                                  |
69  *        |    +-------------------------+                         v
70  *        |    |SNAPBUILD_FULL_SNAPSHOT  |------------>|
71  *        |    +-------------------------+                         |
72  * running_xacts                |                                          saved snapshot
73  * with zero xacts              |                                 at running_xacts's lsn
74  *        |                                     |                                                  |
75  *        |             all running toplevel TXNs finished         |
76  *        |                                     |                                                  |
77  *        |                                     v                                                  |
78  *        |    +-------------------------+                         |
79  *        +--->|SNAPBUILD_CONSISTENT     |<------------+
80  *                 +-------------------------+
81  *
82  * Initially the machinery is in the START stage. When an xl_running_xacts
83  * record is read that is sufficiently new (above the safe xmin horizon),
84  * there's a state transition. If there were no running xacts when the
85  * running_xacts record was generated, we'll directly go into CONSISTENT
86  * state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full
87  * snapshot means that all transactions that start henceforth can be decoded
88  * in their entirety, but transactions that started previously can't. In
89  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
90  * running transactions have committed or aborted.
91  *
92  * Only transactions that commit after CONSISTENT state has been reached will
93  * be replayed, even though they might have started while still in
94  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
95  * changes have been exported, but all the following ones will be. That point
96  * is a convenient point to initialize replication from, which is why we
97  * export a snapshot at that point, which *can* be used to read normal data.
98  *
99  * Copyright (c) 2012-2015, PostgreSQL Global Development Group
100  *
101  * IDENTIFICATION
102  *        src/backend/replication/logical/snapbuild.c
103  *
104  *-------------------------------------------------------------------------
105  */
106
107 #include "postgres.h"
108
109 #include <sys/stat.h>
110 #include <sys/types.h>
111 #include <unistd.h>
112
113 #include "miscadmin.h"
114
115 #include "access/heapam_xlog.h"
116 #include "access/transam.h"
117 #include "access/xact.h"
118
119 #include "replication/logical.h"
120 #include "replication/reorderbuffer.h"
121 #include "replication/snapbuild.h"
122
123 #include "utils/builtins.h"
124 #include "utils/memutils.h"
125 #include "utils/snapshot.h"
126 #include "utils/snapmgr.h"
127 #include "utils/tqual.h"
128
129 #include "storage/block.h"              /* debugging output */
130 #include "storage/fd.h"
131 #include "storage/lmgr.h"
132 #include "storage/proc.h"
133 #include "storage/procarray.h"
134 #include "storage/standby.h"
135
/*
 * This struct contains the current state of the snapshot building
 * machinery. Besides a forward declaration in the header, it is not exposed
 * to the public, so we can easily change its contents.
 *
 * Instances are created by AllocateSnapshotBuilder() and destroyed by
 * FreeSnapshotBuilder().
 */
struct SnapBuild
{
        /* how far are we along building our first full snapshot */
        SnapBuildState state;

        /*
         * Private memory context used to allocate memory for this module. The
         * builder itself lives in it, so deleting the context frees the
         * builder too.
         */
        MemoryContext context;

        /* all transactions < than this have committed/aborted */
        TransactionId xmin;

        /* all transactions >= than this are uncommitted */
        TransactionId xmax;

        /*
         * Don't replay commits from an LSN < this LSN. This can be set externally
         * but it will also be advanced (never retreat) from within snapbuild.c.
         */
        XLogRecPtr      start_decoding_at;

        /*
         * Don't start decoding WAL until the "xl_running_xacts" information
         * indicates there are no running xids with an xid smaller than this.
         */
        TransactionId initial_xmin_horizon;

        /*
         * Snapshot that's valid to see the catalog state seen at this moment.
         *
         * NULL until one is needed; the builder holds its own reference on it
         * (see SnapBuildSnapIncRefcount()/SnapBuildSnapDecRefcount()).
         */
        Snapshot        snapshot;

        /*
         * LSN of the last location we are sure a snapshot has been serialized to.
         */
        XLogRecPtr      last_serialized_snapshot;

        /*
         * The reorderbuffer we need to update with usable snapshots et al.
         */
        ReorderBuffer *reorder;

        /*
         * Information about initially running transactions
         *
         * When we start building a snapshot there already may be transactions in
         * progress.  Those are stored in running.xip.  We don't have enough
         * information about those to decode their contents, so until they are
         * finished (xcnt=0) we cannot switch to a CONSISTENT state.
         */
        struct
        {
                /*
                 * While running.xcnt is nonzero, an XID is only looked up in
                 * running.xip if it lies strictly between running.xmin and
                 * running.xmax; everything outside that range is known not to be
                 * one of the initially running transactions (see
                 * SnapBuildTxnIsRunning()).
                 */
                TransactionId xmin;
                TransactionId xmax;

                size_t          xcnt;           /* number of used xip entries */
                size_t          xcnt_space; /* allocated size of xip */
                TransactionId *xip;             /* running xacts array, xidComparator-sorted */
        }                       running;

        /*
         * Array of transactions which could have catalog changes that committed
         * between xmin and xmax.
         */
        struct
        {
                /* number of committed transactions */
                size_t          xcnt;

                /* available space for committed transactions */
                size_t          xcnt_space;

                /*
                 * Until we reach a CONSISTENT state, we record commits of all
                 * transactions, not just the catalog changing ones. Record when that
                 * changes so we know we cannot export a snapshot safely anymore.
                 */
                bool            includes_all_transactions;

                /*
                 * Array of committed transactions that have modified the catalog.
                 *
                 * As this array is frequently modified we do *not* keep it in
                 * xidComparator order. Instead we sort the array when building &
                 * distributing a snapshot.
                 *
                 * TODO: It's unclear whether that reasoning has much merit. Every
                 * time we add something here after becoming consistent will also
                 * require distributing a snapshot. Storing them sorted would
                 * potentially also make it easier to purge (but more complicated wrt
                 * wraparound?). Should be improved if sorting while building the
                 * snapshot shows up in profiles.
                 */
                TransactionId *xip;
        }                       committed;
};
240
/*
 * Starting a transaction -- which we need to do while exporting a snapshot --
 * removes knowledge about the previously used resowner, so we save it here.
 *
 * ExportInProgress is set by SnapBuildExportSnapshot() and reset by
 * SnapBuildClearExportedSnapshot(), which also restores the saved resource
 * owner.
 */
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;

/* transaction state manipulation functions */
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);

/* ->running manipulation */
static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);

/* ->committed manipulation */
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);

/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid);

static void SnapBuildFreeSnapshot(Snapshot snap);

static void SnapBuildSnapIncRefcount(Snapshot snap);

static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);

/* xlog reading helper functions for SnapBuildProcessRecord */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);

/* serialization functions */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
272
273
274 /*
275  * Allocate a new snapshot builder.
276  *
277  * xmin_horizon is the xid >=which we can be sure no catalog rows have been
278  * removed, start_lsn is the LSN >= we want to replay commits.
279  */
280 SnapBuild *
281 AllocateSnapshotBuilder(ReorderBuffer *reorder,
282                                                 TransactionId xmin_horizon,
283                                                 XLogRecPtr start_lsn)
284 {
285         MemoryContext context;
286         MemoryContext oldcontext;
287         SnapBuild  *builder;
288
289         /* allocate memory in own context, to have better accountability */
290         context = AllocSetContextCreate(CurrentMemoryContext,
291                                                                         "snapshot builder context",
292                                                                         ALLOCSET_DEFAULT_MINSIZE,
293                                                                         ALLOCSET_DEFAULT_INITSIZE,
294                                                                         ALLOCSET_DEFAULT_MAXSIZE);
295         oldcontext = MemoryContextSwitchTo(context);
296
297         builder = palloc0(sizeof(SnapBuild));
298
299         builder->state = SNAPBUILD_START;
300         builder->context = context;
301         builder->reorder = reorder;
302         /* Other struct members initialized by zeroing via palloc0 above */
303
304         builder->committed.xcnt = 0;
305         builder->committed.xcnt_space = 128;            /* arbitrary number */
306         builder->committed.xip =
307                 palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
308         builder->committed.includes_all_transactions = true;
309
310         builder->initial_xmin_horizon = xmin_horizon;
311         builder->start_decoding_at = start_lsn;
312
313         MemoryContextSwitchTo(oldcontext);
314
315         return builder;
316 }
317
318 /*
319  * Free a snapshot builder.
320  */
321 void
322 FreeSnapshotBuilder(SnapBuild *builder)
323 {
324         MemoryContext context = builder->context;
325
326         /* free snapshot explicitly, that contains some error checking */
327         if (builder->snapshot != NULL)
328         {
329                 SnapBuildSnapDecRefcount(builder->snapshot);
330                 builder->snapshot = NULL;
331         }
332
333         /* other resources are deallocated via memory context reset */
334         MemoryContextDelete(context);
335 }
336
337 /*
338  * Free an unreferenced snapshot that has previously been built by us.
339  */
340 static void
341 SnapBuildFreeSnapshot(Snapshot snap)
342 {
343         /* make sure we don't get passed an external snapshot */
344         Assert(snap->satisfies == HeapTupleSatisfiesHistoricMVCC);
345
346         /* make sure nobody modified our snapshot */
347         Assert(snap->curcid == FirstCommandId);
348         Assert(!snap->suboverflowed);
349         Assert(!snap->takenDuringRecovery);
350         Assert(snap->regd_count == 0);
351
352         /* slightly more likely, so it's checked even without c-asserts */
353         if (snap->copied)
354                 elog(ERROR, "cannot free a copied snapshot");
355
356         if (snap->active_count)
357                 elog(ERROR, "cannot free an active snapshot");
358
359         pfree(snap);
360 }
361
362 /*
363  * In which state of snapshot building are we?
364  */
365 SnapBuildState
366 SnapBuildCurrentState(SnapBuild *builder)
367 {
368         return builder->state;
369 }
370
371 /*
372  * Should the contents of transaction ending at 'ptr' be decoded?
373  */
374 bool
375 SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
376 {
377         return ptr < builder->start_decoding_at;
378 }
379
380 /*
381  * Increase refcount of a snapshot.
382  *
383  * This is used when handing out a snapshot to some external resource or when
384  * adding a Snapshot as builder->snapshot.
385  */
386 static void
387 SnapBuildSnapIncRefcount(Snapshot snap)
388 {
389         snap->active_count++;
390 }
391
392 /*
393  * Decrease refcount of a snapshot and free if the refcount reaches zero.
394  *
395  * Externally visible, so that external resources that have been handed an
396  * IncRef'ed Snapshot can adjust its refcount easily.
397  */
398 void
399 SnapBuildSnapDecRefcount(Snapshot snap)
400 {
401         /* make sure we don't get passed an external snapshot */
402         Assert(snap->satisfies == HeapTupleSatisfiesHistoricMVCC);
403
404         /* make sure nobody modified our snapshot */
405         Assert(snap->curcid == FirstCommandId);
406         Assert(!snap->suboverflowed);
407         Assert(!snap->takenDuringRecovery);
408
409         Assert(snap->regd_count == 0);
410
411         Assert(snap->active_count > 0);
412
413         /* slightly more likely, so it's checked even without casserts */
414         if (snap->copied)
415                 elog(ERROR, "cannot free a copied snapshot");
416
417         snap->active_count--;
418         if (snap->active_count == 0)
419                 SnapBuildFreeSnapshot(snap);
420 }
421
422 /*
423  * Build a new snapshot, based on currently committed catalog-modifying
424  * transactions.
425  *
426  * In-progress transactions with catalog access are *not* allowed to modify
427  * these snapshots; they have to copy them and fill in appropriate ->curcid
428  * and ->subxip/subxcnt values.
429  */
430 static Snapshot
431 SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
432 {
433         Snapshot        snapshot;
434         Size            ssize;
435
436         Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
437
438         ssize = sizeof(SnapshotData)
439                 + sizeof(TransactionId) * builder->committed.xcnt
440                 + sizeof(TransactionId) * 1 /* toplevel xid */ ;
441
442         snapshot = MemoryContextAllocZero(builder->context, ssize);
443
444         snapshot->satisfies = HeapTupleSatisfiesHistoricMVCC;
445
446         /*
447          * We misuse the original meaning of SnapshotData's xip and subxip fields
448          * to make the more fitting for our needs.
449          *
450          * In the 'xip' array we store transactions that have to be treated as
451          * committed. Since we will only ever look at tuples from transactions
452          * that have modified the catalog it's more efficient to store those few
453          * that exist between xmin and xmax (frequently there are none).
454          *
455          * Snapshots that are used in transactions that have modified the catalog
456          * also use the 'subxip' array to store their toplevel xid and all the
457          * subtransaction xids so we can recognize when we need to treat rows as
458          * visible that are not in xip but still need to be visible. Subxip only
459          * gets filled when the transaction is copied into the context of a
460          * catalog modifying transaction since we otherwise share a snapshot
461          * between transactions. As long as a txn hasn't modified the catalog it
462          * doesn't need to treat any uncommitted rows as visible, so there is no
463          * need for those xids.
464          *
465          * Both arrays are qsort'ed so that we can use bsearch() on them.
466          */
467         Assert(TransactionIdIsNormal(builder->xmin));
468         Assert(TransactionIdIsNormal(builder->xmax));
469
470         snapshot->xmin = builder->xmin;
471         snapshot->xmax = builder->xmax;
472
473         /* store all transactions to be treated as committed by this snapshot */
474         snapshot->xip =
475                 (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
476         snapshot->xcnt = builder->committed.xcnt;
477         memcpy(snapshot->xip,
478                    builder->committed.xip,
479                    builder->committed.xcnt * sizeof(TransactionId));
480
481         /* sort so we can bsearch() */
482         qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
483
484         /*
485          * Initially, subxip is empty, i.e. it's a snapshot to be used by
486          * transactions that don't modify the catalog. Will be filled by
487          * ReorderBufferCopySnap() if necessary.
488          */
489         snapshot->subxcnt = 0;
490         snapshot->subxip = NULL;
491
492         snapshot->suboverflowed = false;
493         snapshot->takenDuringRecovery = false;
494         snapshot->copied = false;
495         snapshot->curcid = FirstCommandId;
496         snapshot->active_count = 0;
497         snapshot->regd_count = 0;
498
499         return snapshot;
500 }
501
502 /*
503  * Export a snapshot so it can be set in another session with SET TRANSACTION
504  * SNAPSHOT.
505  *
506  * For that we need to start a transaction in the current backend as the
507  * importing side checks whether the source transaction is still open to make
508  * sure the xmin horizon hasn't advanced since then.
509  *
510  * After that we convert a locally built snapshot into the normal variant
511  * understood by HeapTupleSatisfiesMVCC et al.
512  */
513 const char *
514 SnapBuildExportSnapshot(SnapBuild *builder)
515 {
516         Snapshot        snap;
517         char       *snapname;
518         TransactionId xid;
519         TransactionId *newxip;
520         int                     newxcnt = 0;
521
522         if (builder->state != SNAPBUILD_CONSISTENT)
523                 elog(ERROR, "cannot export a snapshot before reaching a consistent state");
524
525         if (!builder->committed.includes_all_transactions)
526                 elog(ERROR, "cannot export a snapshot, not all transactions are monitored anymore");
527
528         /* so we don't overwrite the existing value */
529         if (TransactionIdIsValid(MyPgXact->xmin))
530                 elog(ERROR, "cannot export a snapshot when MyPgXact->xmin already is valid");
531
532         if (IsTransactionOrTransactionBlock())
533                 elog(ERROR, "cannot export a snapshot from within a transaction");
534
535         if (SavedResourceOwnerDuringExport)
536                 elog(ERROR, "can only export one snapshot at a time");
537
538         SavedResourceOwnerDuringExport = CurrentResourceOwner;
539         ExportInProgress = true;
540
541         StartTransactionCommand();
542
543         Assert(!FirstSnapshotSet);
544
545         /* There doesn't seem to a nice API to set these */
546         XactIsoLevel = XACT_REPEATABLE_READ;
547         XactReadOnly = true;
548
549         snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
550
551         /*
552          * We know that snap->xmin is alive, enforced by the logical xmin
553          * mechanism. Due to that we can do this without locks, we're only
554          * changing our own value.
555          */
556         MyPgXact->xmin = snap->xmin;
557
558         /* allocate in transaction context */
559         newxip = (TransactionId *)
560                 palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
561
562         /*
563          * snapbuild.c builds transactions in an "inverted" manner, which means it
564          * stores committed transactions in ->xip, not ones in progress. Build a
565          * classical snapshot by marking all non-committed transactions as
566          * in-progress. This can be expensive.
567          */
568         for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
569         {
570                 void       *test;
571
572                 /*
573                  * Check whether transaction committed using the decoding snapshot
574                  * meaning of ->xip.
575                  */
576                 test = bsearch(&xid, snap->xip, snap->xcnt,
577                                            sizeof(TransactionId), xidComparator);
578
579                 if (test == NULL)
580                 {
581                         if (newxcnt >= GetMaxSnapshotXidCount())
582                                 elog(ERROR, "snapshot too large");
583
584                         newxip[newxcnt++] = xid;
585                 }
586
587                 TransactionIdAdvance(xid);
588         }
589
590         snap->xcnt = newxcnt;
591         snap->xip = newxip;
592
593         /*
594          * now that we've built a plain snapshot, use the normal mechanisms for
595          * exporting it
596          */
597         snapname = ExportSnapshot(snap);
598
599         ereport(LOG,
600                         (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
601                 "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
602                                                    snap->xcnt,
603                                                    snapname, snap->xcnt)));
604         return snapname;
605 }
606
607 /*
608  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
609  * any. Aborts the previously started transaction and resets the resource
610  * owner back to its original value.
611  */
612 void
613 SnapBuildClearExportedSnapshot(void)
614 {
615         /* nothing exported, thats the usual case */
616         if (!ExportInProgress)
617                 return;
618
619         if (!IsTransactionState())
620                 elog(ERROR, "clearing exported snapshot in wrong transaction state");
621
622         /* make sure nothing  could have ever happened */
623         AbortCurrentTransaction();
624
625         CurrentResourceOwner = SavedResourceOwnerDuringExport;
626         SavedResourceOwnerDuringExport = NULL;
627         ExportInProgress = false;
628 }
629
630 /*
631  * Handle the effects of a single heap change, appropriate to the current state
632  * of the snapshot builder and returns whether changes made at (xid, lsn) can
633  * be decoded.
634  */
635 bool
636 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
637 {
638         bool            is_old_tx;
639
640         /*
641          * We can't handle data in transactions if we haven't built a snapshot
642          * yet, so don't store them.
643          */
644         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
645                 return false;
646
647         /*
648          * No point in keeping track of changes in transactions that we don't have
649          * enough information about to decode. This means that they started before
650          * we got into the SNAPBUILD_FULL_SNAPSHOT state.
651          */
652         if (builder->state < SNAPBUILD_CONSISTENT &&
653                 SnapBuildTxnIsRunning(builder, xid))
654                 return false;
655
656         /*
657          * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
658          * be needed to decode the change we're currently processing.
659          */
660         is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
661
662         if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
663         {
664                 /* only build a new snapshot if we don't have a prebuilt one */
665                 if (builder->snapshot == NULL)
666                 {
667                         builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
668                         /* inrease refcount for the snapshot builder */
669                         SnapBuildSnapIncRefcount(builder->snapshot);
670                 }
671
672                 /*
673                  * Increase refcount for the transaction we're handing the snapshot
674                  * out to.
675                  */
676                 SnapBuildSnapIncRefcount(builder->snapshot);
677                 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
678                                                                          builder->snapshot);
679         }
680
681         return true;
682 }
683
684 /*
685  * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
686  * This implies that a transaction has done some form of write to system
687  * catalogs.
688  */
689 void
690 SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
691                                            XLogRecPtr lsn, xl_heap_new_cid *xlrec)
692 {
693         CommandId       cid;
694
695         /*
696          * we only log new_cid's if a catalog tuple was modified, so mark the
697          * transaction as containing catalog modifications
698          */
699         ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
700
701         ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
702                                                                  xlrec->target_node, xlrec->target_tid,
703                                                                  xlrec->cmin, xlrec->cmax,
704                                                                  xlrec->combocid);
705
706         /* figure out new command id */
707         if (xlrec->cmin != InvalidCommandId &&
708                 xlrec->cmax != InvalidCommandId)
709                 cid = Max(xlrec->cmin, xlrec->cmax);
710         else if (xlrec->cmax != InvalidCommandId)
711                 cid = xlrec->cmax;
712         else if (xlrec->cmin != InvalidCommandId)
713                 cid = xlrec->cmin;
714         else
715         {
716                 cid = InvalidCommandId; /* silence compiler */
717                 elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
718         }
719
720         ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
721 }
722
723 /*
724  * Check whether `xid` is currently 'running'.
725  *
726  * Running transactions in our parlance are transactions which we didn't
727  * observe from the start so we can't properly decode their contents. They
728  * only exist after we freshly started from an < CONSISTENT snapshot.
729  */
730 static bool
731 SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
732 {
733         Assert(builder->state < SNAPBUILD_CONSISTENT);
734         Assert(TransactionIdIsNormal(builder->running.xmin));
735         Assert(TransactionIdIsNormal(builder->running.xmax));
736
737         if (builder->running.xcnt &&
738                 NormalTransactionIdFollows(xid, builder->running.xmin) &&
739                 NormalTransactionIdPrecedes(xid, builder->running.xmax))
740         {
741                 TransactionId *search =
742                 bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
743                                 sizeof(TransactionId), xidComparator);
744
745                 if (search != NULL)
746                 {
747                         Assert(*search == xid);
748                         return true;
749                 }
750         }
751
752         return false;
753 }
754
755 /*
756  * Add a new Snapshot to all transactions we're decoding that currently are
757  * in-progress so they can see new catalog contents made by the transaction
758  * that just committed. This is necessary because those in-progress
759  * transactions will use the new catalog's contents from here on (at the very
760  * least everything they do needs to be compatible with newer catalog
761  * contents).
762  */
763 static void
764 SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
765 {
766         dlist_iter      txn_i;
767         ReorderBufferTXN *txn;
768
769         /*
770          * Iterate through all toplevel transactions. This can include
771          * subtransactions which we just don't yet know to be that, but that's
772          * fine, they will just get an unnecessary snapshot queued.
773          */
774         dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
775         {
776                 txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
777
778                 Assert(TransactionIdIsValid(txn->xid));
779
780                 /*
781                  * If we don't have a base snapshot yet, there are no changes in this
782                  * transaction which in turn implies we don't yet need a snapshot at
783                  * all. We'll add a snapshot when the first change gets queued.
784                  *
785                  * NB: This works correctly even for subtransactions because
786                  * ReorderBufferCommitChild() takes care to pass the parent the base
787                  * snapshot, and while iterating the changequeue we'll get the change
788                  * from the subtxn.
789                  */
790                 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
791                         continue;
792
793                 elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
794                          txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
795
796                 /*
797                  * increase the snapshot's refcount for the transaction we are handing
798                  * it out to
799                  */
800                 SnapBuildSnapIncRefcount(builder->snapshot);
801                 ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
802                                                                  builder->snapshot);
803         }
804 }
805
806 /*
807  * Keep track of a new catalog changing transaction that has committed.
808  */
809 static void
810 SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
811 {
812         Assert(TransactionIdIsValid(xid));
813
814         if (builder->committed.xcnt == builder->committed.xcnt_space)
815         {
816                 builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
817
818                 elog(DEBUG1, "increasing space for committed transactions to %u",
819                          (uint32) builder->committed.xcnt_space);
820
821                 builder->committed.xip = repalloc(builder->committed.xip,
822                                           builder->committed.xcnt_space * sizeof(TransactionId));
823         }
824
825         /*
826          * TODO: It might make sense to keep the array sorted here instead of
827          * doing it every time we build a new snapshot. On the other hand this
828          * gets called repeatedly when a transaction with subtransactions commits.
829          */
830         builder->committed.xip[builder->committed.xcnt++] = xid;
831 }
832
833 /*
834  * Remove knowledge about transactions we treat as committed that are smaller
835  * than ->xmin. Those won't ever get checked via the ->committed array but via
836  * the clog machinery, so we don't need to waste memory on them.
837  */
838 static void
839 SnapBuildPurgeCommittedTxn(SnapBuild *builder)
840 {
841         int                     off;
842         TransactionId *workspace;
843         int                     surviving_xids = 0;
844
845         /* not ready yet */
846         if (!TransactionIdIsNormal(builder->xmin))
847                 return;
848
849         /* TODO: Neater algorithm than just copying and iterating? */
850         workspace =
851                 MemoryContextAlloc(builder->context,
852                                                    builder->committed.xcnt * sizeof(TransactionId));
853
854         /* copy xids that still are interesting to workspace */
855         for (off = 0; off < builder->committed.xcnt; off++)
856         {
857                 if (NormalTransactionIdPrecedes(builder->committed.xip[off],
858                                                                                 builder->xmin))
859                         ;                                       /* remove */
860                 else
861                         workspace[surviving_xids++] = builder->committed.xip[off];
862         }
863
864         /* copy workspace back to persistent state */
865         memcpy(builder->committed.xip, workspace,
866                    surviving_xids * sizeof(TransactionId));
867
868         elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
869                  (uint32) builder->committed.xcnt, (uint32) surviving_xids,
870                  builder->xmin, builder->xmax);
871         builder->committed.xcnt = surviving_xids;
872
873         pfree(workspace);
874 }
875
876 /*
877  * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
878  * keeping track of the amount of running transactions.
879  */
880 static void
881 SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
882 {
883         if (builder->state == SNAPBUILD_CONSISTENT)
884                 return;
885
886         /*
887          * NB: This handles subtransactions correctly even if we started from
888          * suboverflowed xl_running_xacts because we only keep track of toplevel
889          * transactions. Since the latter are always are allocated before their
890          * subxids and since they end at the same time it's sufficient to deal
891          * with them here.
892          */
893         if (SnapBuildTxnIsRunning(builder, xid))
894         {
895                 Assert(builder->running.xcnt > 0);
896
897                 if (!--builder->running.xcnt)
898                 {
899                         /*
900                          * None of the originally running transaction is running anymore,
901                          * so our incrementaly built snapshot now is consistent.
902                          */
903                         ereport(LOG,
904                                   (errmsg("logical decoding found consistent point at %X/%X",
905                                                   (uint32) (lsn >> 32), (uint32) lsn),
906                                    errdetail("Transaction ID %u finished; no more running transactions.",
907                                                          xid)));
908                         builder->state = SNAPBUILD_CONSISTENT;
909                 }
910         }
911 }
912
913 /*
914  * Abort a transaction, throw away all state we kept.
915  */
916 void
917 SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
918                                   TransactionId xid,
919                                   int nsubxacts, TransactionId *subxacts)
920 {
921         int                     i;
922
923         for (i = 0; i < nsubxacts; i++)
924         {
925                 TransactionId subxid = subxacts[i];
926
927                 SnapBuildEndTxn(builder, lsn, subxid);
928         }
929
930         SnapBuildEndTxn(builder, lsn, xid);
931 }
932
933 /*
934  * Handle everything that needs to be done when a transaction commits
935  */
936 void
937 SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
938                                    int nsubxacts, TransactionId *subxacts)
939 {
940         int                     nxact;
941
942         bool            forced_timetravel = false;
943         bool            sub_needs_timetravel = false;
944         bool            top_needs_timetravel = false;
945
946         TransactionId xmax = xid;
947
948         /*
949          * If we couldn't observe every change of a transaction because it was
950          * already running at the point we started to observe we have to assume it
951          * made catalog changes.
952          *
953          * This has the positive benefit that we afterwards have enough
954          * information to build an exportable snapshot that's usable by pg_dump et
955          * al.
956          */
957         if (builder->state < SNAPBUILD_CONSISTENT)
958         {
959                 /* ensure that only commits after this are getting replayed */
960                 if (builder->start_decoding_at <= lsn)
961                         builder->start_decoding_at = lsn + 1;
962
963                 /*
964                  * We could avoid treating !SnapBuildTxnIsRunning transactions as
965                  * timetravel ones, but we want to be able to export a snapshot when
966                  * we reached consistency.
967                  */
968                 forced_timetravel = true;
969                 elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running to early", xid);
970         }
971
972         for (nxact = 0; nxact < nsubxacts; nxact++)
973         {
974                 TransactionId subxid = subxacts[nxact];
975
976                 /*
977                  * make sure txn is not tracked in running txn's anymore, switch state
978                  */
979                 SnapBuildEndTxn(builder, lsn, subxid);
980
981                 /*
982                  * If we're forcing timetravel we also need visibility information
983                  * about subtransaction, so keep track of subtransaction's state.
984                  */
985                 if (forced_timetravel)
986                 {
987                         SnapBuildAddCommittedTxn(builder, subxid);
988                         if (NormalTransactionIdFollows(subxid, xmax))
989                                 xmax = subxid;
990                 }
991
992                 /*
993                  * Add subtransaction to base snapshot if it DDL, we don't distinguish
994                  * to toplevel transactions there.
995                  */
996                 else if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
997                 {
998                         sub_needs_timetravel = true;
999
1000                         elog(DEBUG1, "found subtransaction %u:%u with catalog changes.",
1001                                  xid, subxid);
1002
1003                         SnapBuildAddCommittedTxn(builder, subxid);
1004
1005                         if (NormalTransactionIdFollows(subxid, xmax))
1006                                 xmax = subxid;
1007                 }
1008         }
1009
1010         /*
1011          * Make sure toplevel txn is not tracked in running txn's anymore, switch
1012          * state to consistent if possible.
1013          */
1014         SnapBuildEndTxn(builder, lsn, xid);
1015
1016         if (forced_timetravel)
1017         {
1018                 elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
1019
1020                 SnapBuildAddCommittedTxn(builder, xid);
1021         }
1022         /* add toplevel transaction to base snapshot */
1023         else if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1024         {
1025                 elog(DEBUG2, "found top level transaction %u, with catalog changes!",
1026                          xid);
1027
1028                 top_needs_timetravel = true;
1029                 SnapBuildAddCommittedTxn(builder, xid);
1030         }
1031         else if (sub_needs_timetravel)
1032         {
1033                 /* mark toplevel txn as timetravel as well */
1034                 SnapBuildAddCommittedTxn(builder, xid);
1035         }
1036
1037         /* if there's any reason to build a historic snapshot, do so now */
1038         if (forced_timetravel || top_needs_timetravel || sub_needs_timetravel)
1039         {
1040                 /*
1041                  * Adjust xmax of the snapshot builder, we only do that for committed,
1042                  * catalog modifying, transactions, everything else isn't interesting
1043                  * for us since we'll never look at the respective rows.
1044                  */
1045                 if (!TransactionIdIsValid(builder->xmax) ||
1046                         TransactionIdFollowsOrEquals(xmax, builder->xmax))
1047                 {
1048                         builder->xmax = xmax;
1049                         TransactionIdAdvance(builder->xmax);
1050                 }
1051
1052                 /*
1053                  * If we haven't built a complete snapshot yet there's no need to hand
1054                  * it out, it wouldn't (and couldn't) be used anyway.
1055                  */
1056                 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1057                         return;
1058
1059                 /*
1060                  * Decrease the snapshot builder's refcount of the old snapshot, note
1061                  * that it still will be used if it has been handed out to the
1062                  * reorderbuffer earlier.
1063                  */
1064                 if (builder->snapshot)
1065                         SnapBuildSnapDecRefcount(builder->snapshot);
1066
1067                 builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
1068
1069                 /* we might need to execute invalidations, add snapshot */
1070                 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1071                 {
1072                         SnapBuildSnapIncRefcount(builder->snapshot);
1073                         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1074                                                                                  builder->snapshot);
1075                 }
1076
1077                 /* refcount of the snapshot builder for the new snapshot */
1078                 SnapBuildSnapIncRefcount(builder->snapshot);
1079
1080                 /* add a new Snapshot to all currently running transactions */
1081                 SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1082         }
1083         else
1084         {
1085                 /* record that we cannot export a general snapshot anymore */
1086                 builder->committed.includes_all_transactions = false;
1087         }
1088 }
1089
1090
1091 /* -----------------------------------
1092  * Snapshot building functions dealing with xlog records
1093  * -----------------------------------
1094  */
1095
1096 /*
1097  * Process a running xacts record, and use its information to first build a
1098  * historic snapshot and later to release resources that aren't needed
1099  * anymore.
1100  */
1101 void
1102 SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1103 {
1104         ReorderBufferTXN *txn;
1105
1106         /*
1107          * If we're not consistent yet, inspect the record to see whether it
1108          * allows to get closer to being consistent. If we are consistent, dump
1109          * our snapshot so others or we, after a restart, can use it.
1110          */
1111         if (builder->state < SNAPBUILD_CONSISTENT)
1112         {
1113                 /* returns false if there's no point in performing cleanup just yet */
1114                 if (!SnapBuildFindSnapshot(builder, lsn, running))
1115                         return;
1116         }
1117         else
1118                 SnapBuildSerialize(builder, lsn);
1119
1120         /*
1121          * Update range of interesting xids based on the running xacts
1122          * information. We don't increase ->xmax using it, because once we are in
1123          * a consistent state we can do that ourselves and much more efficiently
1124          * so, because we only need to do it for catalog transactions since we
1125          * only ever look at those.
1126          *
1127          * NB: Because of that xmax can be lower than xmin, because we only
1128          * increase xmax when a catalog modifying transaction commits. While odd
1129          * looking, it's correct and actually more efficient this way since we hit
1130          * fast paths in tqual.c.
1131          */
1132         builder->xmin = running->oldestRunningXid;
1133
1134         /* Remove transactions we don't need to keep track off anymore */
1135         SnapBuildPurgeCommittedTxn(builder);
1136
1137         elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
1138                  builder->xmin, builder->xmax,
1139                  running->oldestRunningXid);
1140
1141         /*
1142          * Inrease shared memory limits, so vacuum can work on tuples we prevented
1143          * from being pruned till now.
1144          */
1145         LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
1146
1147         /*
1148          * Also tell the slot where we can restart decoding from. We don't want to
1149          * do that after every commit because changing that implies an fsync of
1150          * the logical slot's state file, so we only do it every time we see a
1151          * running xacts record.
1152          *
1153          * Do so by looking for the oldest in progress transaction (determined by
1154          * the first LSN of any of its relevant records). Every transaction
1155          * remembers the last location we stored the snapshot to disk before its
1156          * beginning. That point is where we can restart from.
1157          */
1158
1159         /*
1160          * Can't know about a serialized snapshot's location if we're not
1161          * consistent.
1162          */
1163         if (builder->state < SNAPBUILD_CONSISTENT)
1164                 return;
1165
1166         txn = ReorderBufferGetOldestTXN(builder->reorder);
1167
1168         /*
1169          * oldest ongoing txn might have started when we didn't yet serialize
1170          * anything because we hadn't reached a consistent state yet.
1171          */
1172         if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1173                 LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1174
1175         /*
1176          * No in-progress transaction, can reuse the last serialized snapshot if
1177          * we have one.
1178          */
1179         else if (txn == NULL &&
1180                 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
1181                          builder->last_serialized_snapshot != InvalidXLogRecPtr)
1182                 LogicalIncreaseRestartDecodingForSlot(lsn,
1183                                                                                   builder->last_serialized_snapshot);
1184 }
1185
1186
1187 /*
1188  * Build the start of a snapshot that's capable of decoding the catalog.
1189  *
1190  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1191  * consistent.
1192  *
1193  * Returns true if there is a point in performing internal maintenance/cleanup
1194  * using the xl_running_xacts record.
1195  */
1196 static bool
1197 SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1198 {
1199         /* ---
1200          * Build catalog decoding snapshot incrementally using information about
1201          * the currently running transactions. There are several ways to do that:
1202          *
1203          * a) There were no running transactions when the xl_running_xacts record
1204          *        was inserted, jump to CONSISTENT immediately. We might find such a
1205          *        state we were waiting for b) and c).
1206          *
1207          * b) Wait for all toplevel transactions that were running to end. We
1208          *        simply track the number of in-progress toplevel transactions and
1209          *        lower it whenever one commits or aborts. When that number
1210          *        (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
1211          *        to CONSISTENT.
1212          *        NB: We need to search running.xip when seeing a transaction's end to
1213          *        make sure it's a toplevel transaction and it's been one of the
1214          *        initially running ones.
1215          *        Interestingly, in contrast to HS, this allows us not to care about
1216          *        subtransactions - and by extension suboverflowed xl_running_xacts -
1217          *        at all.
1218          *
1219          * c) This (in a previous run) or another decoding slot serialized a
1220          *        snapshot to disk that we can use.
1221          * ---
1222          */
1223
1224         /*
1225          * xl_running_xact record is older than what we can use, we might not have
1226          * all necessary catalog rows anymore.
1227          */
1228         if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1229                 NormalTransactionIdPrecedes(running->oldestRunningXid,
1230                                                                         builder->initial_xmin_horizon))
1231         {
1232                 ereport(DEBUG1,
1233                                 (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1234                                                                  (uint32) (lsn >> 32), (uint32) lsn),
1235                 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1236                                  builder->initial_xmin_horizon, running->oldestRunningXid)));
1237                 return true;
1238         }
1239
1240         /*
1241          * a) No transaction were running, we can jump to consistent.
1242          *
1243          * NB: We might have already started to incrementally assemble a snapshot,
1244          * so we need to be careful to deal with that.
1245          */
1246         if (running->xcnt == 0)
1247         {
1248                 if (builder->start_decoding_at == InvalidXLogRecPtr ||
1249                         builder->start_decoding_at <= lsn)
1250                         /* can decode everything after this */
1251                         builder->start_decoding_at = lsn + 1;
1252
1253                 /* As no transactions were running xmin/xmax can be trivially set. */
1254                 builder->xmin = running->nextXid;               /* < are finished */
1255                 builder->xmax = running->nextXid;               /* >= are running */
1256
1257                 /* so we can safely use the faster comparisons */
1258                 Assert(TransactionIdIsNormal(builder->xmin));
1259                 Assert(TransactionIdIsNormal(builder->xmax));
1260
1261                 /* no transactions running now */
1262                 builder->running.xcnt = 0;
1263                 builder->running.xmin = InvalidTransactionId;
1264                 builder->running.xmax = InvalidTransactionId;
1265
1266                 builder->state = SNAPBUILD_CONSISTENT;
1267
1268                 ereport(LOG,
1269                                 (errmsg("logical decoding found consistent point at %X/%X",
1270                                                 (uint32) (lsn >> 32), (uint32) lsn),
1271                                  errdetail("There are no running transactions.")));
1272
1273                 return false;
1274         }
1275         /* c) valid on disk state */
1276         else if (SnapBuildRestore(builder, lsn))
1277         {
1278                 /* there won't be any state to cleanup */
1279                 return false;
1280         }
1281
1282         /*
1283          * b) first encounter of a useable xl_running_xacts record. If we had
1284          * found one earlier we would either track running transactions (i.e.
1285          * builder->running.xcnt != 0) or be consistent (this function wouldn't
1286          * get called).
1287          */
1288         else if (!builder->running.xcnt)
1289         {
1290                 int                     off;
1291
1292                 /*
1293                  * We only care about toplevel xids as those are the ones we
1294                  * definitely see in the wal stream. As snapbuild.c tracks committed
1295                  * instead of running transactions we don't need to know anything
1296                  * about uncommitted subtransactions.
1297                  */
1298
1299                 /*
1300                  * Start with an xmin/xmax that's correct for future, when all the
1301                  * currently running transactions have finished. We'll update both
1302                  * while waiting for the pending transactions to finish.
1303                  */
1304                 builder->xmin = running->nextXid;               /* < are finished */
1305                 builder->xmax = running->nextXid;               /* >= are running */
1306
1307                 /* so we can safely use the faster comparisons */
1308                 Assert(TransactionIdIsNormal(builder->xmin));
1309                 Assert(TransactionIdIsNormal(builder->xmax));
1310
1311                 builder->running.xcnt = running->xcnt;
1312                 builder->running.xcnt_space = running->xcnt;
1313                 builder->running.xip =
1314                         MemoryContextAlloc(builder->context,
1315                                                            builder->running.xcnt * sizeof(TransactionId));
1316                 memcpy(builder->running.xip, running->xids,
1317                            builder->running.xcnt * sizeof(TransactionId));
1318
1319                 /* sort so we can do a binary search */
1320                 qsort(builder->running.xip, builder->running.xcnt,
1321                           sizeof(TransactionId), xidComparator);
1322
1323                 builder->running.xmin = builder->running.xip[0];
1324                 builder->running.xmax = builder->running.xip[running->xcnt - 1];
1325
1326                 /* makes comparisons cheaper later */
1327                 TransactionIdRetreat(builder->running.xmin);
1328                 TransactionIdAdvance(builder->running.xmax);
1329
1330                 builder->state = SNAPBUILD_FULL_SNAPSHOT;
1331
1332                 ereport(LOG,
1333                         (errmsg("logical decoding found initial starting point at %X/%X",
1334                                         (uint32) (lsn >> 32), (uint32) lsn),
1335                          errdetail_plural("%u transaction needs to finish.",
1336                                                           "%u transactions need to finish.",
1337                                                           builder->running.xcnt,
1338                                                           (uint32) builder->running.xcnt)));
1339
1340                 /*
1341                  * Iterate through all xids, wait for them to finish.
1342                  *
1343                  * This isn't required for the correctness of decoding, but to allow
1344                  * isolationtester to notice that we're currently waiting for
1345                  * something.
1346                  */
1347                 for (off = 0; off < builder->running.xcnt; off++)
1348                 {
1349                         TransactionId xid = builder->running.xip[off];
1350
1351                         /*
1352                          * Upper layers should prevent that we ever need to wait on
1353                          * ourselves. Check anyway, since failing to do so would either
1354                          * result in an endless wait or an Assert() failure.
1355                          */
1356                         if (TransactionIdIsCurrentTransactionId(xid))
1357                                 elog(ERROR, "waiting for ourselves");
1358
1359                         XactLockTableWait(xid, NULL, NULL, XLTW_None);
1360                 }
1361
1362                 /* nothing could have built up so far, so don't perform cleanup */
1363                 return false;
1364         }
1365
1366         /*
1367          * We already started to track running xacts and need to wait for all
1368          * in-progress ones to finish. We fall through to the normal processing of
1369          * records so incremental cleanup can be performed.
1370          */
1371         return true;
1372 }
1373
1374
1375 /* -----------------------------------
1376  * Snapshot serialization support
1377  * -----------------------------------
1378  */
1379
1380 /*
1381  * We store current state of struct SnapBuild on disk in the following manner:
1382  *
1383  * struct SnapBuildOnDisk;
1384  * TransactionId * running.xcnt_space;
1385  * TransactionId * committed.xcnt; (*not xcnt_space*)
1386  *
1387  */
1388 typedef struct SnapBuildOnDisk
1389 {
1390         /* first part of this struct needs to be version independent */
1391
1392         /* data not covered by checksum */
1393         uint32          magic;
1394         pg_crc32c       checksum;
1395
1396         /* data covered by checksum */
1397
1398         /* version, in case we want to support pg_upgrade */
1399         uint32          version;
1400         /* how large is the on disk data, excluding the constant sized part */
1401         uint32          length;
1402
1403         /* version dependent part */
1404         SnapBuild       builder;
1405
1406         /* variable amount of TransactionIds follows */
1407 } SnapBuildOnDisk;
1408
/* size of the leading, version independent part of SnapBuildOnDisk */
#define SnapBuildOnDiskConstantSize offsetof(SnapBuildOnDisk, builder)
/* size of the leading fields that are excluded from the checksum */
#define SnapBuildOnDiskNotChecksummedSize offsetof(SnapBuildOnDisk, version)

/* identification and format version of serialized snapshots */
#define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 2
1416
1417 /*
1418  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1419  *
1420  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1421  * a record that's a potential location for a serialized snapshot.
1422  */
1423 void
1424 SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1425 {
1426         if (builder->state < SNAPBUILD_CONSISTENT)
1427                 SnapBuildRestore(builder, lsn);
1428         else
1429                 SnapBuildSerialize(builder, lsn);
1430 }
1431
1432 /*
1433  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1434  * been done by another decoding process.
1435  */
1436 static void
1437 SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1438 {
1439         Size            needed_length;
1440         SnapBuildOnDisk *ondisk;
1441         char       *ondisk_c;
1442         int                     fd;
1443         char            tmppath[MAXPGPATH];
1444         char            path[MAXPGPATH];
1445         int                     ret;
1446         struct stat stat_buf;
1447         Size            sz;
1448
1449         Assert(lsn != InvalidXLogRecPtr);
1450         Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
1451                    builder->last_serialized_snapshot <= lsn);
1452
1453         /*
1454          * no point in serializing if we cannot continue to work immediately after
1455          * restoring the snapshot
1456          */
1457         if (builder->state < SNAPBUILD_CONSISTENT)
1458                 return;
1459
1460         /*
1461          * We identify snapshots by the LSN they are valid for. We don't need to
1462          * include timelines in the name as each LSN maps to exactly one timeline
1463          * unless the user used pg_resetxlog or similar. If a user did so, there's
1464          * no hope continuing to decode anyway.
1465          */
1466         sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1467                         (uint32) (lsn >> 32), (uint32) lsn);
1468
1469         /*
1470          * first check whether some other backend already has written the snapshot
1471          * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1472          * as a valid state. Everything else is an unexpected error.
1473          */
1474         ret = stat(path, &stat_buf);
1475
1476         if (ret != 0 && errno != ENOENT)
1477                 ereport(ERROR,
1478                                 (errmsg("could not stat file \"%s\": %m", path)));
1479
1480         else if (ret == 0)
1481         {
1482                 /*
1483                  * somebody else has already serialized to this point, don't overwrite
1484                  * but remember location, so we don't need to read old data again.
1485                  *
1486                  * To be sure it has been synced to disk after the rename() from the
1487                  * tempfile filename to the real filename, we just repeat the fsync.
1488                  * That ought to be cheap because in most scenarios it should already
1489                  * be safely on disk.
1490                  */
1491                 fsync_fname(path, false);
1492                 fsync_fname("pg_logical/snapshots", true);
1493
1494                 builder->last_serialized_snapshot = lsn;
1495                 goto out;
1496         }
1497
1498         /*
1499          * there is an obvious race condition here between the time we stat(2) the
1500          * file and us writing the file. But we rename the file into place
1501          * atomically and all files created need to contain the same data anyway,
1502          * so this is perfectly fine, although a bit of a resource waste. Locking
1503          * seems like pointless complication.
1504          */
1505         elog(DEBUG1, "serializing snapshot to %s", path);
1506
1507         /* to make sure only we will write to this tempfile, include pid */
1508         sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
1509                         (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
1510
1511         /*
1512          * Unlink temporary file if it already exists, needs to have been before a
1513          * crash/error since we won't enter this function twice from within a
1514          * single decoding slot/backend and the temporary file contains the pid of
1515          * the current process.
1516          */
1517         if (unlink(tmppath) != 0 && errno != ENOENT)
1518                 ereport(ERROR,
1519                                 (errcode_for_file_access(),
1520                                  errmsg("could not remove file \"%s\": %m", path)));
1521
1522         needed_length = sizeof(SnapBuildOnDisk) +
1523                 sizeof(TransactionId) * builder->running.xcnt_space +
1524                 sizeof(TransactionId) * builder->committed.xcnt;
1525
1526         ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
1527         ondisk = (SnapBuildOnDisk *) ondisk_c;
1528         ondisk->magic = SNAPBUILD_MAGIC;
1529         ondisk->version = SNAPBUILD_VERSION;
1530         ondisk->length = needed_length;
1531         INIT_CRC32C(ondisk->checksum);
1532         COMP_CRC32C(ondisk->checksum,
1533                                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1534                         SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1535         ondisk_c += sizeof(SnapBuildOnDisk);
1536
1537         memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1538         /* NULL-ify memory-only data */
1539         ondisk->builder.context = NULL;
1540         ondisk->builder.snapshot = NULL;
1541         ondisk->builder.reorder = NULL;
1542         ondisk->builder.running.xip = NULL;
1543         ondisk->builder.committed.xip = NULL;
1544
1545         COMP_CRC32C(ondisk->checksum,
1546                                 &ondisk->builder,
1547                                 sizeof(SnapBuild));
1548
1549         /* copy running xacts */
1550         sz = sizeof(TransactionId) * builder->running.xcnt_space;
1551         memcpy(ondisk_c, builder->running.xip, sz);
1552         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1553         ondisk_c += sz;
1554
1555         /* copy committed xacts */
1556         sz = sizeof(TransactionId) * builder->committed.xcnt;
1557         memcpy(ondisk_c, builder->committed.xip, sz);
1558         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1559         ondisk_c += sz;
1560
1561         FIN_CRC32C(ondisk->checksum);
1562
1563         /* we have valid data now, open tempfile and write it there */
1564         fd = OpenTransientFile(tmppath,
1565                                                    O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1566                                                    S_IRUSR | S_IWUSR);
1567         if (fd < 0)
1568                 ereport(ERROR,
1569                                 (errmsg("could not open file \"%s\": %m", path)));
1570
1571         if ((write(fd, ondisk, needed_length)) != needed_length)
1572         {
1573                 CloseTransientFile(fd);
1574                 ereport(ERROR,
1575                                 (errcode_for_file_access(),
1576                                  errmsg("could not write to file \"%s\": %m", tmppath)));
1577         }
1578
1579         /*
1580          * fsync the file before renaming so that even if we crash after this we
1581          * have either a fully valid file or nothing.
1582          *
1583          * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1584          * some noticeable overhead since it's performed synchronously during
1585          * decoding?
1586          */
1587         if (pg_fsync(fd) != 0)
1588         {
1589                 CloseTransientFile(fd);
1590                 ereport(ERROR,
1591                                 (errcode_for_file_access(),
1592                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1593         }
1594         CloseTransientFile(fd);
1595
1596         fsync_fname("pg_logical/snapshots", true);
1597
1598         /*
1599          * We may overwrite the work from some other backend, but that's ok, our
1600          * snapshot is valid as well, we'll just have done some superfluous work.
1601          */
1602         if (rename(tmppath, path) != 0)
1603         {
1604                 ereport(ERROR,
1605                                 (errcode_for_file_access(),
1606                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
1607                                                 tmppath, path)));
1608         }
1609
1610         /* make sure we persist */
1611         fsync_fname(path, false);
1612         fsync_fname("pg_logical/snapshots", true);
1613
1614         /*
1615          * Now there's no way we can loose the dumped state anymore, remember this
1616          * as a serialization point.
1617          */
1618         builder->last_serialized_snapshot = lsn;
1619
1620 out:
1621         ReorderBufferSetRestartPoint(builder->reorder,
1622                                                                  builder->last_serialized_snapshot);
1623 }
1624
1625 /*
1626  * Restore a snapshot into 'builder' if previously one has been stored at the
1627  * location indicated by 'lsn'. Returns true if successful, false otherwise.
1628  */
1629 static bool
1630 SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1631 {
1632         SnapBuildOnDisk ondisk;
1633         int                     fd;
1634         char            path[MAXPGPATH];
1635         Size            sz;
1636         int                     readBytes;
1637         pg_crc32c       checksum;
1638
1639         /* no point in loading a snapshot if we're already there */
1640         if (builder->state == SNAPBUILD_CONSISTENT)
1641                 return false;
1642
1643         sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1644                         (uint32) (lsn >> 32), (uint32) lsn);
1645
1646         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1647
1648         if (fd < 0 && errno == ENOENT)
1649                 return false;
1650         else if (fd < 0)
1651                 ereport(ERROR,
1652                                 (errcode_for_file_access(),
1653                                  errmsg("could not open file \"%s\": %m", path)));
1654
1655         /* ----
1656          * Make sure the snapshot had been stored safely to disk, that's normally
1657          * cheap.
1658          * Note that we do not need PANIC here, nobody will be able to use the
1659          * slot without fsyncing, and saving it won't succeed without an fsync()
1660          * either...
1661          * ----
1662          */
1663         fsync_fname(path, false);
1664         fsync_fname("pg_logical/snapshots", true);
1665
1666
1667         /* read statically sized portion of snapshot */
1668         readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1669         if (readBytes != SnapBuildOnDiskConstantSize)
1670         {
1671                 CloseTransientFile(fd);
1672                 ereport(ERROR,
1673                                 (errcode_for_file_access(),
1674                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1675                                                 path, readBytes, (int) SnapBuildOnDiskConstantSize)));
1676         }
1677
1678         if (ondisk.magic != SNAPBUILD_MAGIC)
1679                 ereport(ERROR,
1680                                 (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1681                                                 path, ondisk.magic, SNAPBUILD_MAGIC)));
1682
1683         if (ondisk.version != SNAPBUILD_VERSION)
1684                 ereport(ERROR,
1685                                 (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1686                                                 path, ondisk.version, SNAPBUILD_VERSION)));
1687
1688         INIT_CRC32C(checksum);
1689         COMP_CRC32C(checksum,
1690                                 ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1691                         SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1692
1693         /* read SnapBuild */
1694         readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1695         if (readBytes != sizeof(SnapBuild))
1696         {
1697                 CloseTransientFile(fd);
1698                 ereport(ERROR,
1699                                 (errcode_for_file_access(),
1700                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1701                                                 path, readBytes, (int) sizeof(SnapBuild))));
1702         }
1703         COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1704
1705         /* restore running xacts information */
1706         sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
1707         ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
1708         readBytes = read(fd, ondisk.builder.running.xip, sz);
1709         if (readBytes != sz)
1710         {
1711                 CloseTransientFile(fd);
1712                 ereport(ERROR,
1713                                 (errcode_for_file_access(),
1714                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1715                                                 path, readBytes, (int) sz)));
1716         }
1717         COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
1718
1719         /* restore committed xacts information */
1720         sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1721         ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1722         readBytes = read(fd, ondisk.builder.committed.xip, sz);
1723         if (readBytes != sz)
1724         {
1725                 CloseTransientFile(fd);
1726                 ereport(ERROR,
1727                                 (errcode_for_file_access(),
1728                                  errmsg("could not read file \"%s\", read %d of %d: %m",
1729                                                 path, readBytes, (int) sz)));
1730         }
1731         COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1732
1733         CloseTransientFile(fd);
1734
1735         FIN_CRC32C(checksum);
1736
1737         /* verify checksum of what we've read */
1738         if (!EQ_CRC32C(checksum, ondisk.checksum))
1739                 ereport(ERROR,
1740                                 (errcode_for_file_access(),
1741                                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1742                                                 path, checksum, ondisk.checksum)));
1743
1744         /*
1745          * ok, we now have a sensible snapshot here, figure out if it has more
1746          * information than we have.
1747          */
1748
1749         /*
1750          * We are only interested in consistent snapshots for now, comparing
1751          * whether one incomplete snapshot is more "advanced" seems to be
1752          * unnecessarily complex.
1753          */
1754         if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1755                 goto snapshot_not_interesting;
1756
1757         /*
1758          * Don't use a snapshot that requires an xmin that we cannot guarantee to
1759          * be available.
1760          */
1761         if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1762                 goto snapshot_not_interesting;
1763
1764
1765         /* ok, we think the snapshot is sensible, copy over everything important */
1766         builder->xmin = ondisk.builder.xmin;
1767         builder->xmax = ondisk.builder.xmax;
1768         builder->state = ondisk.builder.state;
1769
1770         builder->committed.xcnt = ondisk.builder.committed.xcnt;
1771         /* We only allocated/stored xcnt, not xcnt_space xids ! */
1772         /* don't overwrite preallocated xip, if we don't have anything here */
1773         if (builder->committed.xcnt > 0)
1774         {
1775                 pfree(builder->committed.xip);
1776                 builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1777                 builder->committed.xip = ondisk.builder.committed.xip;
1778         }
1779         ondisk.builder.committed.xip = NULL;
1780
1781         builder->running.xcnt = ondisk.builder.running.xcnt;
1782         if (builder->running.xip)
1783                 pfree(builder->running.xip);
1784         builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
1785         builder->running.xip = ondisk.builder.running.xip;
1786
1787         /* our snapshot is not interesting anymore, build a new one */
1788         if (builder->snapshot != NULL)
1789         {
1790                 SnapBuildSnapDecRefcount(builder->snapshot);
1791         }
1792         builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidTransactionId);
1793         SnapBuildSnapIncRefcount(builder->snapshot);
1794
1795         ReorderBufferSetRestartPoint(builder->reorder, lsn);
1796
1797         Assert(builder->state == SNAPBUILD_CONSISTENT);
1798
1799         ereport(LOG,
1800                         (errmsg("logical decoding found consistent point at %X/%X",
1801                                         (uint32) (lsn >> 32), (uint32) lsn),
1802                          errdetail("Logical decoding will begin using saved snapshot.")));
1803         return true;
1804
1805 snapshot_not_interesting:
1806         if (ondisk.builder.running.xip != NULL)
1807                 pfree(ondisk.builder.running.xip);
1808         if (ondisk.builder.committed.xip != NULL)
1809                 pfree(ondisk.builder.committed.xip);
1810         return false;
1811 }
1812
1813 /*
1814  * Remove all serialized snapshots that are not required anymore because no
1815  * slot can need them. This doesn't actually have to run during a checkpoint,
1816  * but it's a convenient point to schedule this.
1817  *
1818  * NB: We run this during checkpoints even if logical decoding is disabled so
1819  * we cleanup old slots at some point after it got disabled.
1820  */
1821 void
1822 CheckPointSnapBuild(void)
1823 {
1824         XLogRecPtr      cutoff;
1825         XLogRecPtr      redo;
1826         DIR                *snap_dir;
1827         struct dirent *snap_de;
1828         char            path[MAXPGPATH];
1829
1830         /*
1831          * We start of with a minimum of the last redo pointer. No new replication
1832          * slot will start before that, so that's a safe upper bound for removal.
1833          */
1834         redo = GetRedoRecPtr();
1835
1836         /* now check for the restart ptrs from existing slots */
1837         cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1838
1839         /* don't start earlier than the restart lsn */
1840         if (redo < cutoff)
1841                 cutoff = redo;
1842
1843         snap_dir = AllocateDir("pg_logical/snapshots");
1844         while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1845         {
1846                 uint32          hi;
1847                 uint32          lo;
1848                 XLogRecPtr      lsn;
1849                 struct stat statbuf;
1850
1851                 if (strcmp(snap_de->d_name, ".") == 0 ||
1852                         strcmp(snap_de->d_name, "..") == 0)
1853                         continue;
1854
1855                 snprintf(path, MAXPGPATH, "pg_logical/snapshots/%s", snap_de->d_name);
1856
1857                 if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1858                 {
1859                         elog(DEBUG1, "only regular files expected: %s", path);
1860                         continue;
1861                 }
1862
1863                 /*
1864                  * temporary filenames from SnapBuildSerialize() include the LSN and
1865                  * everything but are postfixed by .$pid.tmp. We can just remove them
1866                  * the same as other files because there can be none that are
1867                  * currently being written that are older than cutoff.
1868                  *
1869                  * We just log a message if a file doesn't fit the pattern, it's
1870                  * probably some editors lock/state file or similar...
1871                  */
1872                 if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
1873                 {
1874                         ereport(LOG,
1875                                         (errmsg("could not parse file name \"%s\"", path)));
1876                         continue;
1877                 }
1878
1879                 lsn = ((uint64) hi) << 32 | lo;
1880
1881                 /* check whether we still need it */
1882                 if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1883                 {
1884                         elog(DEBUG1, "removing snapbuild snapshot %s", path);
1885
1886                         /*
1887                          * It's not particularly harmful, though strange, if we can't
1888                          * remove the file here. Don't prevent the checkpoint from
1889                          * completing, that'd be cure worse than the disease.
1890                          */
1891                         if (unlink(path) < 0)
1892                         {
1893                                 ereport(LOG,
1894                                                 (errcode_for_file_access(),
1895                                                  errmsg("could not remove file \"%s\": %m",
1896                                                                 path)));
1897                                 continue;
1898                         }
1899                 }
1900         }
1901         FreeDir(snap_dir);
1902 }