]> granicus.if.org Git - postgresql/commitdiff
Fix race condition leading to hanging logical slot creation.
authorAndres Freund <andres@anarazel.de>
Sat, 13 May 2017 21:21:00 +0000 (14:21 -0700)
committerAndres Freund <andres@anarazel.de>
Sat, 13 May 2017 21:21:00 +0000 (14:21 -0700)
The snapshot assembly during the creation of logical slots relied
waiting for transactions in xl_running_xacts to end, by checking for
their commit/abort records.  Unfortunately, despite locking, it is
possible to see an xl_running_xact record listing transactions as
ready, that have already WAL-logged an commit/abort record, as the
locking just prevents the ProcArray to be adjusted, and the commit
record has to be logged first.

That lead to either delayed or hanging snapshot creation, because
snapbuild.c would wait "forever" to see commit/abort records for some
transactions.  That hang resolved only if a xl_running_xacts record
without any running transactions happened to be logged, far from
certain on a busy server.

It's impractical to prevent that via more heavyweight locking, the
likelihood of deadlocks and significantly increased contention would
be too big.

Instead change the initial snapshot creation to be solely based on
tracking the oldest running transaction via
xl_running_xacts->oldestRunningXid - that actually ends up
significantly simplifying the code.  That has two disadvantages:
1) Because we cannot fully "trust" the contents of xl_running_xacts,
   we cannot use it to build the initial snapshot.  Instead we have to
   wait twice for all running transactions to finish.
2) Previously a slot, unless the race occurred, could be created when
   the all transaction perceived as running based on commit/abort
   records, now we have to wait for the next xl_running_xacts record.
To address that, trigger logging new xl_running_xacts record from
within snapbuild.c exactly when necessary.

Unfortunately snabuild.c's SnapBuild is stored on disk, one of the
stupider ideas of a certain Mr Freund, so we can't change it in a
minor release.  As this is going to be backpatched, we have to hack
around a bit to keep on-disk compatibility.  A later commit will
rejigger that on master.

Author: Andres Freund, based on a quite different patch from Petr Jelinek
Analyzed-By: Petr Jelinek
Reviewed-By: Petr Jelinek
Discussion: https://postgr.es/m/f37e975c-908f-858e-707f-058d3b1eb214@2ndquadrant.com
Backpatch: 9.4-, where logical decoding has been introduced

contrib/test_decoding/expected/ondisk_startup.out
contrib/test_decoding/specs/ondisk_startup.spec
src/backend/replication/logical/decode.c
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/logical/snapbuild.c
src/include/replication/snapbuild.h

index 65115c830a4952961d2b4c60b7a1b2b962481cd7..c7b1f45b46b53dc4e092de401f07614f5d48d755 100644 (file)
@@ -1,21 +1,30 @@
 Parsed test spec with 3 sessions
 
-starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
-step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
+starting permutation: s2b s2txid s1init s3b s3txid s2alter s2c s2b s2txid s3c s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
+step s2b: BEGIN;
+step s2txid: SELECT txid_current() IS NULL;
 ?column?       
 
 f              
 step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
-step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
+step s3b: BEGIN;
+step s3txid: SELECT txid_current() IS NULL;
 ?column?       
 
 f              
 step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int;
 step s2c: COMMIT;
+step s2b: BEGIN;
+step s2txid: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s3c: COMMIT;
 step s1init: <... completed>
 ?column?       
 
 init           
+step s2c: COMMIT;
 step s1insert: INSERT INTO do_write DEFAULT VALUES;
 step s1checkpoint: CHECKPOINT;
 step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');
index 8223705639200584ac58243daacbf4779d31ac21..12c57a813da2b8ed1e3a0e49301a23f2216d56a2 100644 (file)
@@ -24,7 +24,8 @@ step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; }
 session "s2"
 setup { SET synchronous_commit=on; }
 
-step "s2txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
+step "s2b" { BEGIN; }
+step "s2txid" { SELECT txid_current() IS NULL; }
 step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; }
 step "s2c" { COMMIT; }
 
@@ -32,7 +33,8 @@ step "s2c" { COMMIT; }
 session "s3"
 setup { SET synchronous_commit=on; }
 
-step "s3txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
+step "s3b" { BEGIN; }
+step "s3txid" { SELECT txid_current() IS NULL; }
 step "s3c" { COMMIT; }
 
 # Force usage of ondisk snapshot by starting and not finishing a
@@ -40,4 +42,4 @@ step "s3c" { COMMIT; }
 # reached. In combination with a checkpoint forcing a snapshot to be
 # written and a new restart point computed that'll lead to the usage
 # of the snapshot.
-permutation "s2txid" "s1init" "s3txid" "s2alter" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"
+permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2alter" "s2c" "s2b" "s2txid" "s3c" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"
index 5c13d2609989db7b496c66204bc9672e7515a212..68825ef5981e599ad4eda78cc47412a067d3b2c3 100644 (file)
@@ -622,9 +622,6 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 {
        int                     i;
 
-       SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
-                                         parsed->nsubxacts, parsed->subxacts);
-
        for (i = 0; i < parsed->nsubxacts; i++)
        {
                ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
index 0c174be8ed78334380d22e16b844a9865dca961b..9d882dac567763af474867990cb525d358ff157e 100644 (file)
@@ -1725,7 +1725,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
 
                if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
                {
-                       elog(DEBUG1, "aborting old transaction %u", txn->xid);
+                       elog(DEBUG2, "aborting old transaction %u", txn->xid);
 
                        /* remove potential on-disk data, and deallocate this tx */
                        ReorderBufferCleanupTXN(rb, txn);
index 068d214fa1273a79e7a24007538d43238d265a7e..0f2dcb84befb581eafe30737a560f64fcd06dad2 100644 (file)
  *
  *
  * The snapbuild machinery is starting up in several stages, as illustrated
- * by the following graph:
+ * by the following graph describing the SnapBuild->state transitions:
+ *
  *                +-------------------------+
- *       +----|SNAPBUILD_START                  |-------------+
+ *       +----|         START                   |-------------+
  *       |    +-------------------------+                         |
  *       |                                     |                                                  |
  *       |                                     |                                                  |
- *       |             running_xacts with running xacts           |
+ *       |                running_xacts #1                                        |
  *       |                                     |                                                  |
  *       |                                     |                                                  |
  *       |                                     v                                                  |
  *       |    +-------------------------+                         v
- *       |    |SNAPBUILD_FULL_SNAPSHOT  |------------>|
+ *       |    |   BUILDING_SNAPSHOT     |------------>|
  *       |    +-------------------------+                         |
+ *       |                                     |                                                  |
+ *       |                                     |                                                  |
+ *       |     running_xacts #2, xacts from #1 finished   |
+ *       |                                     |                                                  |
+ *       |                                     |                                                  |
+ *       |                                     v                                                  |
+ *       |    +-------------------------+                         v
+ *       |    |       FULL_SNAPSHOT     |------------>|
+ *       |    +-------------------------+                         |
+ *       |                                     |                                                  |
  * running_xacts               |                                          saved snapshot
  * with zero xacts             |                                 at running_xacts's lsn
  *       |                                     |                                                  |
- *       |             all running toplevel TXNs finished         |
+ *       |     running_xacts with xacts from #2 finished  |
  *       |                                     |                                                  |
  *       |                                     v                                                  |
  *       |    +-------------------------+                         |
@@ -83,7 +94,7 @@
  * record is read that is sufficiently new (above the safe xmin horizon),
  * there's a state transition. If there were no running xacts when the
  * running_xacts record was generated, we'll directly go into CONSISTENT
- * state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full
+ * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
  * snapshot means that all transactions that start henceforth can be decoded
  * in their entirety, but transactions that started previously can't. In
  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
@@ -184,26 +195,24 @@ struct SnapBuild
        ReorderBuffer *reorder;
 
        /*
-        * Information about initially running transactions
-        *
-        * When we start building a snapshot there already may be transactions in
-        * progress.  Those are stored in running.xip.  We don't have enough
-        * information about those to decode their contents, so until they are
-        * finished (xcnt=0) we cannot switch to a CONSISTENT state.
+        * Outdated: This struct isn't used for its original purpose anymore, but
+        * can't be removed / changed in a minor version, because it's stored
+        * on-disk.
         */
        struct
        {
                /*
-                * As long as running.xcnt all XIDs < running.xmin and > running.xmax
-                * have to be checked whether they still are running.
+                * NB: This field is misused, until a major version can break on-disk
+                * compatibility. See SnapBuildNextPhaseAt() /
+                * SnapBuildStartNextPhaseAt().
                 */
-               TransactionId xmin;
-               TransactionId xmax;
+               TransactionId was_xmin;
+               TransactionId was_xmax;
 
-               size_t          xcnt;           /* number of used xip entries */
-               size_t          xcnt_space; /* allocated size of xip */
-               TransactionId *xip;             /* running xacts array, xidComparator-sorted */
-       }                       running;
+               size_t          was_xcnt;               /* number of used xip entries */
+               size_t          was_xcnt_space; /* allocated size of xip */
+               TransactionId *was_xip;         /* running xacts array, xidComparator-sorted */
+       }                       was_running;
 
        /*
         * Array of transactions which could have catalog changes that committed
@@ -249,12 +258,6 @@ struct SnapBuild
 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
 static bool ExportInProgress = false;
 
-/* transaction state manipulation functions */
-static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
-
-/* ->running manipulation */
-static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
-
 /* ->committed manipulation */
 static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
 
@@ -269,11 +272,39 @@ static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr
 
 /* xlog reading helper functions for SnapBuildProcessRecord */
 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
+static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
 
 /* serialization functions */
 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
 static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
 
+/*
+ * Return TransactionId after which the next phase of initial snapshot
+ * building will happen.
+ */
+static inline TransactionId
+SnapBuildNextPhaseAt(SnapBuild *builder)
+{
+       /*
+        * For backward compatibility reasons this has to be stored in the wrongly
+        * named field.  Will be fixed in next major version.
+        */
+       return builder->was_running.was_xmax;
+}
+
+/*
+ * Set TransactionId after which the next phase of initial snapshot building
+ * will happen.
+ */
+static inline void
+SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
+{
+       /*
+        * For backward compatibility reasons this has to be stored in the wrongly
+        * named field.  Will be fixed in next major version.
+        */
+       builder->was_running.was_xmax = at;
+}
 
 /*
  * Allocate a new snapshot builder.
@@ -700,7 +731,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
         * we got into the SNAPBUILD_FULL_SNAPSHOT state.
         */
        if (builder->state < SNAPBUILD_CONSISTENT &&
-               SnapBuildTxnIsRunning(builder, xid))
+               TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder)))
                return false;
 
        /*
@@ -768,38 +799,6 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
        ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
 }
 
-/*
- * Check whether `xid` is currently 'running'.
- *
- * Running transactions in our parlance are transactions which we didn't
- * observe from the start so we can't properly decode their contents. They
- * only exist after we freshly started from an < CONSISTENT snapshot.
- */
-static bool
-SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
-{
-       Assert(builder->state < SNAPBUILD_CONSISTENT);
-       Assert(TransactionIdIsNormal(builder->running.xmin));
-       Assert(TransactionIdIsNormal(builder->running.xmax));
-
-       if (builder->running.xcnt &&
-               NormalTransactionIdFollows(xid, builder->running.xmin) &&
-               NormalTransactionIdPrecedes(xid, builder->running.xmax))
-       {
-               TransactionId *search =
-               bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
-                               sizeof(TransactionId), xidComparator);
-
-               if (search != NULL)
-               {
-                       Assert(*search == xid);
-                       return true;
-               }
-       }
-
-       return false;
-}
-
 /*
  * Add a new Snapshot to all transactions we're decoding that currently are
  * in-progress so they can see new catalog contents made by the transaction
@@ -921,63 +920,6 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
        pfree(workspace);
 }
 
-/*
- * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
- * keeping track of the amount of running transactions.
- */
-static void
-SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
-{
-       if (builder->state == SNAPBUILD_CONSISTENT)
-               return;
-
-       /*
-        * NB: This handles subtransactions correctly even if we started from
-        * suboverflowed xl_running_xacts because we only keep track of toplevel
-        * transactions. Since the latter are always allocated before their
-        * subxids and since they end at the same time it's sufficient to deal
-        * with them here.
-        */
-       if (SnapBuildTxnIsRunning(builder, xid))
-       {
-               Assert(builder->running.xcnt > 0);
-
-               if (!--builder->running.xcnt)
-               {
-                       /*
-                        * None of the originally running transaction is running anymore,
-                        * so our incrementally built snapshot now is consistent.
-                        */
-                       ereport(LOG,
-                                 (errmsg("logical decoding found consistent point at %X/%X",
-                                                 (uint32) (lsn >> 32), (uint32) lsn),
-                                  errdetail("Transaction ID %u finished; no more running transactions.",
-                                                        xid)));
-                       builder->state = SNAPBUILD_CONSISTENT;
-               }
-       }
-}
-
-/*
- * Abort a transaction, throw away all state we kept.
- */
-void
-SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
-                                 TransactionId xid,
-                                 int nsubxacts, TransactionId *subxacts)
-{
-       int                     i;
-
-       for (i = 0; i < nsubxacts; i++)
-       {
-               TransactionId subxid = subxacts[i];
-
-               SnapBuildEndTxn(builder, lsn, subxid);
-       }
-
-       SnapBuildEndTxn(builder, lsn, xid);
-}
-
 /*
  * Handle everything that needs to be done when a transaction commits
  */
@@ -1021,11 +963,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
        {
                TransactionId subxid = subxacts[nxact];
 
-               /*
-                * make sure txn is not tracked in running txn's anymore, switch state
-                */
-               SnapBuildEndTxn(builder, lsn, subxid);
-
                /*
                 * If we're forcing timetravel we also need visibility information
                 * about subtransaction, so keep track of subtransaction's state.
@@ -1055,12 +992,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
                }
        }
 
-       /*
-        * Make sure toplevel txn is not tracked in running txn's anymore, switch
-        * state to consistent if possible.
-        */
-       SnapBuildEndTxn(builder, lsn, xid);
-
        if (forced_timetravel)
        {
                elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
@@ -1250,25 +1181,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
         *
         * a) There were no running transactions when the xl_running_xacts record
         *        was inserted, jump to CONSISTENT immediately. We might find such a
-        *        state we were waiting for b) or c).
-        *
-        * b) Wait for all toplevel transactions that were running to end. We
-        *        simply track the number of in-progress toplevel transactions and
-        *        lower it whenever one commits or aborts. When that number
-        *        (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
-        *        to CONSISTENT.
-        *        NB: We need to search running.xip when seeing a transaction's end to
-        *        make sure it's a toplevel transaction and it's been one of the
-        *        initially running ones.
-        *        Interestingly, in contrast to HS, this allows us not to care about
-        *        subtransactions - and by extension suboverflowed xl_running_xacts -
-        *        at all.
+        *        state while waiting on c)'s sub-states.
         *
-        * c) This (in a previous run) or another decoding slot serialized a
+        * b) This (in a previous run) or another decoding slot serialized a
         *        snapshot to disk that we can use.  Can't use this method for the
         *        initial snapshot when slot is being created and needs full snapshot
         *        for export or direct use, as that snapshot will only contain catalog
         *        modifying transactions.
+        *
+        * c) First incrementally build a snapshot for catalog tuples
+        *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
+        *    transactions to finish.  Every transaction starting after that
+        *    (FULL_SNAPSHOT state), has enough information to be decoded.  But
+        *    for older running transactions no viable snapshot exists yet, so
+        *    CONSISTENT will only be reached once all of those have finished.
         * ---
         */
 
@@ -1285,16 +1211,23 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                                                                 (uint32) (lsn >> 32), (uint32) lsn),
                errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
                                 builder->initial_xmin_horizon, running->oldestRunningXid)));
+
+
+               SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
+
                return true;
        }
 
        /*
         * a) No transaction were running, we can jump to consistent.
         *
+        * This is not affected by races around xl_running_xacts, because we can
+        * miss transaction commits, but currently not transactions starting.
+        *
         * NB: We might have already started to incrementally assemble a snapshot,
         * so we need to be careful to deal with that.
         */
-       if (running->xcnt == 0)
+       if (running->oldestRunningXid == running->nextXid)
        {
                if (builder->start_decoding_at == InvalidXLogRecPtr ||
                        builder->start_decoding_at <= lsn)
@@ -1309,12 +1242,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                Assert(TransactionIdIsNormal(builder->xmin));
                Assert(TransactionIdIsNormal(builder->xmax));
 
-               /* no transactions running now */
-               builder->running.xcnt = 0;
-               builder->running.xmin = InvalidTransactionId;
-               builder->running.xmax = InvalidTransactionId;
-
                builder->state = SNAPBUILD_CONSISTENT;
+               SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
 
                ereport(LOG,
                                (errmsg("logical decoding found consistent point at %X/%X",
@@ -1323,30 +1252,29 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 
                return false;
        }
-       /* c) valid on disk state and not building full snapshot */
+       /* b) valid on disk state and not building full snapshot */
        else if (!builder->building_full_snapshot &&
                         SnapBuildRestore(builder, lsn))
        {
                /* there won't be any state to cleanup */
                return false;
        }
-
        /*
-        * b) first encounter of a useable xl_running_xacts record. If we had
-        * found one earlier we would either track running transactions (i.e.
-        * builder->running.xcnt != 0) or be consistent (this function wouldn't
-        * get called).
+        * c) transition from START to BUILDING_SNAPSHOT.
+        *
+        * In START state, and a xl_running_xacts record with running xacts is
+        * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
+        * record xl_running_xacts->nextXid.  Once all running xacts have finished
+        * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
+        * might look that we could use xl_running_xact's ->xids information to
+        * get there quicker, but that is problematic because transactions marked
+        * as running, might already have inserted their commit record - it's
+        * infeasible to change that with locking.
         */
-       else if (!builder->running.xcnt)
+       else if (builder->state == SNAPBUILD_START)
        {
-               int                     off;
-
-               /*
-                * We only care about toplevel xids as those are the ones we
-                * definitely see in the wal stream. As snapbuild.c tracks committed
-                * instead of running transactions we don't need to know anything
-                * about uncommitted subtransactions.
-                */
+               builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
+               SnapBuildStartNextPhaseAt(builder, running->nextXid);
 
                /*
                 * Start with an xmin/xmax that's correct for future, when all the
@@ -1360,59 +1288,57 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                Assert(TransactionIdIsNormal(builder->xmin));
                Assert(TransactionIdIsNormal(builder->xmax));
 
-               builder->running.xcnt = running->xcnt;
-               builder->running.xcnt_space = running->xcnt;
-               builder->running.xip =
-                       MemoryContextAlloc(builder->context,
-                                                          builder->running.xcnt * sizeof(TransactionId));
-               memcpy(builder->running.xip, running->xids,
-                          builder->running.xcnt * sizeof(TransactionId));
-
-               /* sort so we can do a binary search */
-               qsort(builder->running.xip, builder->running.xcnt,
-                         sizeof(TransactionId), xidComparator);
-
-               builder->running.xmin = builder->running.xip[0];
-               builder->running.xmax = builder->running.xip[running->xcnt - 1];
-
-               /* makes comparisons cheaper later */
-               TransactionIdRetreat(builder->running.xmin);
-               TransactionIdAdvance(builder->running.xmax);
-
-               builder->state = SNAPBUILD_FULL_SNAPSHOT;
-
                ereport(LOG,
                        (errmsg("logical decoding found initial starting point at %X/%X",
                                        (uint32) (lsn >> 32), (uint32) lsn),
-                        errdetail_plural("%u transaction needs to finish.",
-                                                         "%u transactions need to finish.",
-                                                         builder->running.xcnt,
-                                                         (uint32) builder->running.xcnt)));
+                        errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+                                          running->xcnt, running->nextXid)));
 
-               /*
-                * Iterate through all xids, wait for them to finish.
-                *
-                * This isn't required for the correctness of decoding, but to allow
-                * isolationtester to notice that we're currently waiting for
-                * something.
-                */
-               for (off = 0; off < builder->running.xcnt; off++)
-               {
-                       TransactionId xid = builder->running.xip[off];
+               SnapBuildWaitSnapshot(running, running->nextXid);
+       }
+       /*
+        * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
+        *
+        * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
+        * is >= than nextXid from when we switched to BUILDING_SNAPSHOT.  This
+        * means all transactions starting afterwards have enough information to
+        * be decoded.  Switch to FULL_SNAPSHOT.
+        */
+       else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
+                        TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
+                                                                                  running->oldestRunningXid))
+       {
+               builder->state = SNAPBUILD_FULL_SNAPSHOT;
+               SnapBuildStartNextPhaseAt(builder, running->nextXid);
 
-                       /*
-                        * Upper layers should prevent that we ever need to wait on
-                        * ourselves. Check anyway, since failing to do so would either
-                        * result in an endless wait or an Assert() failure.
-                        */
-                       if (TransactionIdIsCurrentTransactionId(xid))
-                               elog(ERROR, "waiting for ourselves");
+               ereport(LOG,
+                               (errmsg("logical decoding found initial consistent point at %X/%X",
+                                               (uint32) (lsn >> 32), (uint32) lsn),
+                                errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+                                                  running->xcnt, running->nextXid)));
 
-                       XactLockTableWait(xid, NULL, NULL, XLTW_None);
-               }
+               SnapBuildWaitSnapshot(running, running->nextXid);
+       }
+       /*
+        * c) transition from FULL_SNAPSHOT to CONSISTENT.
+        *
+        * In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts'
+        * oldestRunningXid is >= than nextXid from when we switched to
+        * FULL_SNAPSHOT.  This means all transactions that are currently in
+        * progress have a catalog snapshot, and all their changes have been
+        * collected.  Switch to CONSISTENT.
+        */
+       else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
+                        TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
+                                                                                  running->oldestRunningXid))
+       {
+               builder->state = SNAPBUILD_CONSISTENT;
+               SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
 
-               /* nothing could have built up so far, so don't perform cleanup */
-               return false;
+               ereport(LOG,
+                               (errmsg("logical decoding found consistent point at %X/%X",
+                                               (uint32) (lsn >> 32), (uint32) lsn),
+                                errdetail("There are no old transactions anymore.")));
        }
 
        /*
@@ -1421,8 +1347,54 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
         * records so incremental cleanup can be performed.
         */
        return true;
+
 }
 
+/* ---
+ * Iterate through xids in record, wait for all older than the cutoff to
+ * finish.  Then, if possible, log a new xl_running_xacts record.
+ *
+ * This isn't required for the correctness of decoding, but to:
+ * a) allow isolationtester to notice that we're currently waiting for
+ *    something.
+ * b) log a new xl_running_xacts record where it'd be helpful, without having
+ *    to write for bgwriter or checkpointer.
+ * ---
+ */
+static void
+SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
+{
+       int                     off;
+
+       for (off = 0; off < running->xcnt; off++)
+       {
+               TransactionId xid = running->xids[off];
+
+               /*
+                * Upper layers should prevent that we ever need to wait on
+                * ourselves. Check anyway, since failing to do so would either
+                * result in an endless wait or an Assert() failure.
+                */
+               if (TransactionIdIsCurrentTransactionId(xid))
+                       elog(ERROR, "waiting for ourselves");
+
+               if (TransactionIdFollows(xid, cutoff))
+                       continue;
+
+               XactLockTableWait(xid, NULL, NULL, XLTW_None);
+       }
+
+       /*
+        * All transactions we needed to finish finished - try to ensure there is
+        * another xl_running_xacts record in a timely manner, without having to
+        * write for bgwriter or checkpointer to log one.  During recovery we
+        * can't enforce that, so we'll have to wait.
+        */
+       if (!RecoveryInProgress())
+       {
+               LogStandbySnapshot();
+       }
+}
 
 /* -----------------------------------
  * Snapshot serialization support
@@ -1572,7 +1544,6 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
                                 errmsg("could not remove file \"%s\": %m", path)));
 
        needed_length = sizeof(SnapBuildOnDisk) +
-               sizeof(TransactionId) * builder->running.xcnt_space +
                sizeof(TransactionId) * builder->committed.xcnt;
 
        ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
@@ -1591,18 +1562,14 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
        ondisk->builder.context = NULL;
        ondisk->builder.snapshot = NULL;
        ondisk->builder.reorder = NULL;
-       ondisk->builder.running.xip = NULL;
        ondisk->builder.committed.xip = NULL;
 
        COMP_CRC32C(ondisk->checksum,
                                &ondisk->builder,
                                sizeof(SnapBuild));
 
-       /* copy running xacts */
-       sz = sizeof(TransactionId) * builder->running.xcnt_space;
-       memcpy(ondisk_c, builder->running.xip, sz);
-       COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
-       ondisk_c += sz;
+       /* there shouldn't be any running xacts */
+       Assert(builder->was_running.was_xcnt == 0);
 
        /* copy committed xacts */
        sz = sizeof(TransactionId) * builder->committed.xcnt;
@@ -1762,11 +1729,12 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
        }
        COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
 
-       /* restore running xacts information */
-       sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
-       ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
+       /* restore running xacts (dead, but kept for backward compat) */
+       sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
+       ondisk.builder.was_running.was_xip =
+               MemoryContextAllocZero(builder->context, sz);
        pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-       readBytes = read(fd, ondisk.builder.running.xip, sz);
+       readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
        pgstat_report_wait_end();
        if (readBytes != sz)
        {
@@ -1776,7 +1744,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
                                 errmsg("could not read file \"%s\", read %d of %d: %m",
                                                path, readBytes, (int) sz)));
        }
-       COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
+       COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
 
        /* restore committed xacts information */
        sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
@@ -1842,12 +1810,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
        }
        ondisk.builder.committed.xip = NULL;
 
-       builder->running.xcnt = ondisk.builder.running.xcnt;
-       if (builder->running.xip)
-               pfree(builder->running.xip);
-       builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
-       builder->running.xip = ondisk.builder.running.xip;
-
        /* our snapshot is not interesting anymore, build a new one */
        if (builder->snapshot != NULL)
        {
@@ -1867,8 +1829,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
        return true;
 
 snapshot_not_interesting:
-       if (ondisk.builder.running.xip != NULL)
-               pfree(ondisk.builder.running.xip);
        if (ondisk.builder.committed.xip != NULL)
                pfree(ondisk.builder.committed.xip);
        return false;
index 494751d70ac46e8222bf2a1dc2c94c0a45280e54..ccb5f831c4421b7b578a08a3020f89f7ab80b694 100644 (file)
@@ -20,24 +20,30 @@ typedef enum
        /*
         * Initial state, we can't do much yet.
         */
-       SNAPBUILD_START,
+       SNAPBUILD_START = -1,
+
+       /*
+        * Collecting committed transactions, to build the initial catalog
+        * snapshot.
+        */
+       SNAPBUILD_BUILDING_SNAPSHOT = 0,
 
        /*
         * We have collected enough information to decode tuples in transactions
         * that started after this.
         *
         * Once we reached this we start to collect changes. We cannot apply them
-        * yet because the might be based on transactions that were still running
-        * when we reached them yet.
+        * yet, because they might be based on transactions that were still running
+        * when FULL_SNAPSHOT was reached.
         */
-       SNAPBUILD_FULL_SNAPSHOT,
+       SNAPBUILD_FULL_SNAPSHOT = 1,
 
        /*
-        * Found a point after hitting built_full_snapshot where all transactions
-        * that were running at that point finished. Till we reach that we hold
-        * off calling any commit callbacks.
+        * Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that
+        * were running at that point finished. Till we reach that we hold off
+        * calling any commit callbacks.
         */
-       SNAPBUILD_CONSISTENT
+       SNAPBUILD_CONSISTENT = 2
 } SnapBuildState;
 
 /* forward declare so we don't have to expose the struct to the public */
@@ -73,9 +79,6 @@ extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
                                   TransactionId xid, int nsubxacts,
                                   TransactionId *subxacts);
-extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
-                                 TransactionId xid, int nsubxacts,
-                                 TransactionId *subxacts);
 extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
                                           XLogRecPtr lsn);
 extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,