*
*
* The snapbuild machinery is starting up in several stages, as illustrated
- * by the following graph:
+ * by the following graph describing the SnapBuild->state transitions:
+ *
* +-------------------------+
- * +----|SNAPBUILD_START |-------------+
+ * +----| START |-------------+
* | +-------------------------+ |
* | | |
* | | |
- * | running_xacts with running xacts |
+ * | running_xacts #1 |
* | | |
* | | |
* | v |
* | +-------------------------+ v
- * | |SNAPBUILD_FULL_SNAPSHOT |------------>|
+ * | | BUILDING_SNAPSHOT |------------>|
* | +-------------------------+ |
+ * | | |
+ * | | |
+ * | running_xacts #2, xacts from #1 finished |
+ * | | |
+ * | | |
+ * | v |
+ * | +-------------------------+ v
+ * | | FULL_SNAPSHOT |------------>|
+ * | +-------------------------+ |
+ * | | |
* running_xacts | saved snapshot
* with zero xacts | at running_xacts's lsn
* | | |
- * | all running toplevel TXNs finished |
+ * | running_xacts with xacts from #2 finished |
* | | |
* | v |
* | +-------------------------+ |
* record is read that is sufficiently new (above the safe xmin horizon),
* there's a state transition. If there were no running xacts when the
* running_xacts record was generated, we'll directly go into CONSISTENT
- * state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full
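+ * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Once all
+ * transactions that were running at that point have finished, we'll switch
+ * to the FULL_SNAPSHOT state. Having a full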
* snapshot means that all transactions that start henceforth can be decoded
* in their entirety, but transactions that started previously can't. In
* FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
ReorderBuffer *reorder;
/*
- * Information about initially running transactions
- *
- * When we start building a snapshot there already may be transactions in
- * progress. Those are stored in running.xip. We don't have enough
- * information about those to decode their contents, so until they are
- * finished (xcnt=0) we cannot switch to a CONSISTENT state.
+ * Outdated: This struct isn't used for its original purpose anymore, but
+ * can't be removed / changed in a minor version, because it's stored
+ * on-disk.
*/
struct
{
/*
- * As long as running.xcnt all XIDs < running.xmin and > running.xmax
- * have to be checked whether they still are running.
+ * NB: This field is misused, until a major version can break on-disk
+ * compatibility. See SnapBuildNextPhaseAt() /
+ * SnapBuildStartNextPhaseAt().
*/
- TransactionId xmin;
- TransactionId xmax;
+ TransactionId was_xmin;
+ TransactionId was_xmax;
- size_t xcnt; /* number of used xip entries */
- size_t xcnt_space; /* allocated size of xip */
- TransactionId *xip; /* running xacts array, xidComparator-sorted */
- } running;
+ size_t was_xcnt; /* number of used xip entries */
+ size_t was_xcnt_space; /* allocated size of xip */
+ TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
+ } was_running;
/*
* Array of transactions which could have catalog changes that committed
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
-/* transaction state manipulation functions */
-static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
-
-/* ->running manipulation */
-static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
-
/* ->committed manipulation */
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
/* xlog reading helper functions for SnapBuildProcessRecord */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
+static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
/* serialization functions */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
+/*
+ * Return TransactionId after which the next phase of initial snapshot
+ * building will happen.
+ *
+ * This is the value most recently stored by SnapBuildStartNextPhaseAt().
+ * Callers in this file compare it against xl_running_xacts'
+ * oldestRunningXid to decide whether SnapBuild->state can advance, and
+ * against individual xids while the builder is not yet CONSISTENT.
+ */
+static inline TransactionId
+SnapBuildNextPhaseAt(SnapBuild *builder)
+{
+ /*
+ * For backward compatibility reasons this has to be stored in the wrongly
+ * named field. Will be fixed in next major version.
+ */
+ return builder->was_running.was_xmax;
+}
+
+/*
+ * Set TransactionId after which the next phase of initial snapshot building
+ * will happen.
+ *
+ * 'at' is stored verbatim and is what SnapBuildNextPhaseAt() returns
+ * afterwards. Callers in this file pass xl_running_xacts->nextXid while the
+ * snapshot is still being built, and InvalidTransactionId once the
+ * CONSISTENT state has been reached.
+ */
+static inline void
+SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
+{
+ /*
+ * For backward compatibility reasons this has to be stored in the wrongly
+ * named field. Will be fixed in next major version.
+ */
+ builder->was_running.was_xmax = at;
+}
/*
* Allocate a new snapshot builder.
* we got into the SNAPBUILD_FULL_SNAPSHOT state.
*/
if (builder->state < SNAPBUILD_CONSISTENT &&
- SnapBuildTxnIsRunning(builder, xid))
+ TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder)))
return false;
/*
ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
}
-/*
- * Check whether `xid` is currently 'running'.
- *
- * Running transactions in our parlance are transactions which we didn't
- * observe from the start so we can't properly decode their contents. They
- * only exist after we freshly started from an < CONSISTENT snapshot.
- */
-static bool
-SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
-{
- Assert(builder->state < SNAPBUILD_CONSISTENT);
- Assert(TransactionIdIsNormal(builder->running.xmin));
- Assert(TransactionIdIsNormal(builder->running.xmax));
-
- if (builder->running.xcnt &&
- NormalTransactionIdFollows(xid, builder->running.xmin) &&
- NormalTransactionIdPrecedes(xid, builder->running.xmax))
- {
- TransactionId *search =
- bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
- sizeof(TransactionId), xidComparator);
-
- if (search != NULL)
- {
- Assert(*search == xid);
- return true;
- }
- }
-
- return false;
-}
-
/*
* Add a new Snapshot to all transactions we're decoding that currently are
* in-progress so they can see new catalog contents made by the transaction
pfree(workspace);
}
-/*
- * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
- * keeping track of the amount of running transactions.
- */
-static void
-SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
-{
- if (builder->state == SNAPBUILD_CONSISTENT)
- return;
-
- /*
- * NB: This handles subtransactions correctly even if we started from
- * suboverflowed xl_running_xacts because we only keep track of toplevel
- * transactions. Since the latter are always allocated before their
- * subxids and since they end at the same time it's sufficient to deal
- * with them here.
- */
- if (SnapBuildTxnIsRunning(builder, xid))
- {
- Assert(builder->running.xcnt > 0);
-
- if (!--builder->running.xcnt)
- {
- /*
- * None of the originally running transaction is running anymore,
- * so our incrementally built snapshot now is consistent.
- */
- ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- (uint32) (lsn >> 32), (uint32) lsn),
- errdetail("Transaction ID %u finished; no more running transactions.",
- xid)));
- builder->state = SNAPBUILD_CONSISTENT;
- }
- }
-}
-
-/*
- * Abort a transaction, throw away all state we kept.
- */
-void
-SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
- TransactionId xid,
- int nsubxacts, TransactionId *subxacts)
-{
- int i;
-
- for (i = 0; i < nsubxacts; i++)
- {
- TransactionId subxid = subxacts[i];
-
- SnapBuildEndTxn(builder, lsn, subxid);
- }
-
- SnapBuildEndTxn(builder, lsn, xid);
-}
-
/*
* Handle everything that needs to be done when a transaction commits
*/
{
TransactionId subxid = subxacts[nxact];
- /*
- * make sure txn is not tracked in running txn's anymore, switch state
- */
- SnapBuildEndTxn(builder, lsn, subxid);
-
/*
* If we're forcing timetravel we also need visibility information
* about subtransaction, so keep track of subtransaction's state.
}
}
- /*
- * Make sure toplevel txn is not tracked in running txn's anymore, switch
- * state to consistent if possible.
- */
- SnapBuildEndTxn(builder, lsn, xid);
-
if (forced_timetravel)
{
elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
*
* a) There were no running transactions when the xl_running_xacts record
* was inserted, jump to CONSISTENT immediately. We might find such a
- * state we were waiting for b) or c).
- *
- * b) Wait for all toplevel transactions that were running to end. We
- * simply track the number of in-progress toplevel transactions and
- * lower it whenever one commits or aborts. When that number
- * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
- * to CONSISTENT.
- * NB: We need to search running.xip when seeing a transaction's end to
- * make sure it's a toplevel transaction and it's been one of the
- * initially running ones.
- * Interestingly, in contrast to HS, this allows us not to care about
- * subtransactions - and by extension suboverflowed xl_running_xacts -
- * at all.
+ * state while waiting on c)'s sub-states.
*
- * c) This (in a previous run) or another decoding slot serialized a
+ * b) This (in a previous run) or another decoding slot serialized a
* snapshot to disk that we can use. Can't use this method for the
* initial snapshot when slot is being created and needs full snapshot
* for export or direct use, as that snapshot will only contain catalog
* modifying transactions.
+ *
+ * c) First incrementally build a snapshot for catalog tuples
+ * (BUILDING_SNAPSHOT), that requires all, already in-progress,
+ * transactions to finish. Every transaction starting after that
+ * (FULL_SNAPSHOT state), has enough information to be decoded. But
+ * for older running transactions no viable snapshot exists yet, so
+ * CONSISTENT will only be reached once all of those have finished.
* ---
*/
(uint32) (lsn >> 32), (uint32) lsn),
errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
builder->initial_xmin_horizon, running->oldestRunningXid)));
+
+
+ SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
+
return true;
}
/*
* a) No transaction were running, we can jump to consistent.
*
+ * This is not affected by races around xl_running_xacts, because we can
+ * miss transaction commits, but currently not transactions starting.
+ *
* NB: We might have already started to incrementally assemble a snapshot,
* so we need to be careful to deal with that.
*/
- if (running->xcnt == 0)
+ if (running->oldestRunningXid == running->nextXid)
{
if (builder->start_decoding_at == InvalidXLogRecPtr ||
builder->start_decoding_at <= lsn)
Assert(TransactionIdIsNormal(builder->xmin));
Assert(TransactionIdIsNormal(builder->xmax));
- /* no transactions running now */
- builder->running.xcnt = 0;
- builder->running.xmin = InvalidTransactionId;
- builder->running.xmax = InvalidTransactionId;
-
builder->state = SNAPBUILD_CONSISTENT;
+ SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
return false;
}
- /* c) valid on disk state and not building full snapshot */
+ /* b) valid on disk state and not building full snapshot */
else if (!builder->building_full_snapshot &&
SnapBuildRestore(builder, lsn))
{
/* there won't be any state to cleanup */
return false;
}
-
/*
- * b) first encounter of a useable xl_running_xacts record. If we had
- * found one earlier we would either track running transactions (i.e.
- * builder->running.xcnt != 0) or be consistent (this function wouldn't
- * get called).
+ * c) transition from START to BUILDING_SNAPSHOT.
+ *
+ * We are in START state and have encountered an xl_running_xacts record
+ * with running xacts. In that case, switch to BUILDING_SNAPSHOT state, and
+ * record xl_running_xacts->nextXid. Once all running xacts have finished
+ * (i.e. all xacts still running are >= nextXid), we have a complete catalog
+ * snapshot. It might look like we could use xl_running_xacts' ->xids
+ * information to get there quicker, but that is problematic because
+ * transactions marked as running might already have inserted their commit
+ * record - it's infeasible to change that with locking.
*/
- else if (!builder->running.xcnt)
+ else if (builder->state == SNAPBUILD_START)
{
- int off;
-
- /*
- * We only care about toplevel xids as those are the ones we
- * definitely see in the wal stream. As snapbuild.c tracks committed
- * instead of running transactions we don't need to know anything
- * about uncommitted subtransactions.
- */
+ builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
+ SnapBuildStartNextPhaseAt(builder, running->nextXid);
/*
* Start with an xmin/xmax that's correct for future, when all the
Assert(TransactionIdIsNormal(builder->xmin));
Assert(TransactionIdIsNormal(builder->xmax));
- builder->running.xcnt = running->xcnt;
- builder->running.xcnt_space = running->xcnt;
- builder->running.xip =
- MemoryContextAlloc(builder->context,
- builder->running.xcnt * sizeof(TransactionId));
- memcpy(builder->running.xip, running->xids,
- builder->running.xcnt * sizeof(TransactionId));
-
- /* sort so we can do a binary search */
- qsort(builder->running.xip, builder->running.xcnt,
- sizeof(TransactionId), xidComparator);
-
- builder->running.xmin = builder->running.xip[0];
- builder->running.xmax = builder->running.xip[running->xcnt - 1];
-
- /* makes comparisons cheaper later */
- TransactionIdRetreat(builder->running.xmin);
- TransactionIdAdvance(builder->running.xmax);
-
- builder->state = SNAPBUILD_FULL_SNAPSHOT;
-
ereport(LOG,
(errmsg("logical decoding found initial starting point at %X/%X",
(uint32) (lsn >> 32), (uint32) lsn),
- errdetail_plural("%u transaction needs to finish.",
- "%u transactions need to finish.",
- builder->running.xcnt,
- (uint32) builder->running.xcnt)));
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid)));
- /*
- * Iterate through all xids, wait for them to finish.
- *
- * This isn't required for the correctness of decoding, but to allow
- * isolationtester to notice that we're currently waiting for
- * something.
- */
- for (off = 0; off < builder->running.xcnt; off++)
- {
- TransactionId xid = builder->running.xip[off];
+ SnapBuildWaitSnapshot(running, running->nextXid);
+ }
+ /*
+ * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
+ *
+ * We are in BUILDING_SNAPSHOT state, and this xl_running_xacts'
+ * oldestRunningXid is >= the nextXid recorded when we switched to
+ * BUILDING_SNAPSHOT. This means all transactions starting afterwards have
+ * enough information to be decoded. Switch to FULL_SNAPSHOT.
+ */
+ else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
+ running->oldestRunningXid))
+ {
+ builder->state = SNAPBUILD_FULL_SNAPSHOT;
+ SnapBuildStartNextPhaseAt(builder, running->nextXid);
- /*
- * Upper layers should prevent that we ever need to wait on
- * ourselves. Check anyway, since failing to do so would either
- * result in an endless wait or an Assert() failure.
- */
- if (TransactionIdIsCurrentTransactionId(xid))
- elog(ERROR, "waiting for ourselves");
+ ereport(LOG,
+ (errmsg("logical decoding found initial consistent point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid)));
- XactLockTableWait(xid, NULL, NULL, XLTW_None);
- }
+ SnapBuildWaitSnapshot(running, running->nextXid);
+ }
+ /*
+ * c) transition from FULL_SNAPSHOT to CONSISTENT.
+ *
+ * We are in FULL_SNAPSHOT state, and this xl_running_xacts'
+ * oldestRunningXid is >= the nextXid recorded when we switched to
+ * FULL_SNAPSHOT. This means all transactions that are currently in
+ * progress have a catalog snapshot, and all their changes have been
+ * collected. Switch to CONSISTENT.
+ */
+ else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
+ TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
+ running->oldestRunningXid))
+ {
+ builder->state = SNAPBUILD_CONSISTENT;
+ SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
- /* nothing could have built up so far, so don't perform cleanup */
- return false;
+ ereport(LOG,
+ (errmsg("logical decoding found consistent point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("There are no old transactions anymore.")));
}
/*
* records so incremental cleanup can be performed.
*/
return true;
+
}
+/* ---
+ * Iterate through xids in record, wait for all that do not follow the cutoff
+ * to finish. Then, if possible, log a new xl_running_xacts record.
+ *
+ * This isn't required for the correctness of decoding, but to:
+ * a) allow isolationtester to notice that we're currently waiting for
+ * something.
+ * b) log a new xl_running_xacts record where it'd be helpful, without having
+ * to wait for bgwriter or checkpointer.
+ *
+ * running: record whose xids[0 .. xcnt - 1] entries are examined.
+ * cutoff: entries for which TransactionIdFollows(xid, cutoff) is true are
+ * skipped; every other entry (including one equal to cutoff) is waited on.
+ *
+ * Raises ERROR ("waiting for ourselves") if an examined xid belongs to the
+ * current transaction.
+ * ---
+ */
+static void
+SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
+{
+ int off;
+
+ for (off = 0; off < running->xcnt; off++)
+ {
+ TransactionId xid = running->xids[off];
+
+ /*
+ * Upper layers should prevent that we ever need to wait on
+ * ourselves. Check anyway, since failing to do so would either
+ * result in an endless wait or an Assert() failure.
+ *
+ * Note that this check is made before the cutoff filter below, so it
+ * applies to every xid in the record, not only to those we would
+ * actually wait for.
+ */
+ if (TransactionIdIsCurrentTransactionId(xid))
+ elog(ERROR, "waiting for ourselves");
+
+ if (TransactionIdFollows(xid, cutoff))
+ continue;
+
+ XactLockTableWait(xid, NULL, NULL, XLTW_None);
+ }
+
+ /*
+ * All transactions we needed to finish finished - try to ensure there is
+ * another xl_running_xacts record in a timely manner, without having to
+ * wait for bgwriter or checkpointer to log one. During recovery we
+ * can't enforce that, so we'll have to wait.
+ */
+ if (!RecoveryInProgress())
+ {
+ LogStandbySnapshot();
+ }
+}
/* -----------------------------------
* Snapshot serialization support
errmsg("could not remove file \"%s\": %m", path)));
needed_length = sizeof(SnapBuildOnDisk) +
- sizeof(TransactionId) * builder->running.xcnt_space +
sizeof(TransactionId) * builder->committed.xcnt;
ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
ondisk->builder.context = NULL;
ondisk->builder.snapshot = NULL;
ondisk->builder.reorder = NULL;
- ondisk->builder.running.xip = NULL;
ondisk->builder.committed.xip = NULL;
COMP_CRC32C(ondisk->checksum,
&ondisk->builder,
sizeof(SnapBuild));
- /* copy running xacts */
- sz = sizeof(TransactionId) * builder->running.xcnt_space;
- memcpy(ondisk_c, builder->running.xip, sz);
- COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
- ondisk_c += sz;
+ /* there shouldn't be any running xacts */
+ Assert(builder->was_running.was_xcnt == 0);
/* copy committed xacts */
sz = sizeof(TransactionId) * builder->committed.xcnt;
}
COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
- /* restore running xacts information */
- sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
- ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
+ /* restore running xacts (dead, but kept for backward compat) */
+ sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
+ ondisk.builder.was_running.was_xip =
+ MemoryContextAllocZero(builder->context, sz);
pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
- readBytes = read(fd, ondisk.builder.running.xip, sz);
+ readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
pgstat_report_wait_end();
if (readBytes != sz)
{
errmsg("could not read file \"%s\", read %d of %d: %m",
path, readBytes, (int) sz)));
}
- COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
+ COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
/* restore committed xacts information */
sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
}
ondisk.builder.committed.xip = NULL;
- builder->running.xcnt = ondisk.builder.running.xcnt;
- if (builder->running.xip)
- pfree(builder->running.xip);
- builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
- builder->running.xip = ondisk.builder.running.xip;
-
/* our snapshot is not interesting anymore, build a new one */
if (builder->snapshot != NULL)
{
return true;
snapshot_not_interesting:
- if (ondisk.builder.running.xip != NULL)
- pfree(ondisk.builder.running.xip);
if (ondisk.builder.committed.xip != NULL)
pfree(ondisk.builder.committed.xip);
return false;