* +--->|SNAPBUILD_CONSISTENT |<------------+
* +-------------------------+
*
- * Initially the machinery is in the START stage. When a xl_running_xacts
+ * Initially the machinery is in the START stage. When an xl_running_xacts
* record is read that is sufficiently new (above the safe xmin horizon),
* there's a state transition. If there were no running xacts when the
* runnign_xacts record was generated, we'll directly go into CONSISTENT
* is a convenient point to initialize replication from, which is why we
* export a snapshot at that point, which *can* be used to read normal data.
*
- * Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ * Copyright (c) 2012-2016, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/snapbuild.c
TransactionId xmax;
/*
- * Don't replay commits from an LSN < this LSN. This can be set
- * externally but it will also be advanced (never retreat) from within
- * snapbuild.c.
+ * Don't replay commits from an LSN < this LSN. This can be set externally
+ * but it will also be advanced (never retreat) from within snapbuild.c.
*/
XLogRecPtr start_decoding_at;
/*
* Don't start decoding WAL until the "xl_running_xacts" information
- * indicates there are no running xids with a xid smaller than this.
+ * indicates there are no running xids with an xid smaller than this.
*/
TransactionId initial_xmin_horizon;
* removes knowledge about the previously used resowner, so we save it here.
*/
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
-static bool ExportInProgress = false;
+static bool ExportInProgress = false;
/* transaction state manipulation functions */
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
/* allocate memory in own context, to have better accountability */
context = AllocSetContextCreate(CurrentMemoryContext,
"snapshot builder context",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ ALLOCSET_DEFAULT_SIZES);
oldcontext = MemoryContextSwitchTo(context);
builder = palloc0(sizeof(SnapBuild));
Assert(snap->curcid == FirstCommandId);
Assert(!snap->suboverflowed);
Assert(!snap->takenDuringRecovery);
- Assert(snap->regd_count == 1);
+ Assert(snap->regd_count == 0);
/* slightly more likely, so it's checked even without c-asserts */
if (snap->copied)
Assert(!snap->suboverflowed);
Assert(!snap->takenDuringRecovery);
- Assert(snap->regd_count == 1);
+ Assert(snap->regd_count == 0);
- Assert(snap->active_count);
+ Assert(snap->active_count > 0);
/* slightly more likely, so it's checked even without casserts */
if (snap->copied)
elog(ERROR, "cannot free a copied snapshot");
snap->active_count--;
- if (!snap->active_count)
+ if (snap->active_count == 0)
SnapBuildFreeSnapshot(snap);
}
snapshot->copied = false;
snapshot->curcid = FirstCommandId;
snapshot->active_count = 0;
- snapshot->regd_count = 1; /* mark as registered so nobody frees it */
+ snapshot->regd_count = 0;
return snapshot;
}
ereport(LOG,
(errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
- "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
+ "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
snap->xcnt,
snapname, snap->xcnt)));
return snapname;
}
+/*
+ * Ensure there is a snapshot and if not build one for current transaction.
+ */
+Snapshot
+SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
+{
+ Assert(builder->state == SNAPBUILD_CONSISTENT);
+
+ /* only build a new snapshot if we don't have a prebuilt one */
+ if (builder->snapshot == NULL)
+ {
+ builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+ /* inrease refcount for the snapshot builder */
+ SnapBuildSnapIncRefcount(builder->snapshot);
+ }
+
+ return builder->snapshot;
+}
+
/*
* Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
* any. Aborts the previously started transaction and resets the resource
* owner back to its original value.
*/
void
-SnapBuildClearExportedSnapshot()
+SnapBuildClearExportedSnapshot(void)
{
- /* nothing exported, thats the usual case */
+ /* nothing exported, that is the usual case */
if (!ExportInProgress)
return;
bool
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
{
- bool is_old_tx;
-
/*
* We can't handle data in transactions if we haven't built a snapshot
* yet, so don't store them.
* If the reorderbuffer doesn't yet have a snapshot, add one now, it will
* be needed to decode the change we're currently processing.
*/
- is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
-
- if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
{
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
}
/*
- * Do CommandId/ComboCid handling after reading a xl_heap_new_cid record. This
- * implies that a transaction has done some form of write to system catalogs.
+ * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
+ * This implies that a transaction has done some form of write to system
+ * catalogs.
*/
void
SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
/*
* Iterate through all toplevel transactions. This can include
* subtransactions which we just don't yet know to be that, but that's
- * fine, they will just get an unneccesary snapshot queued.
+ * fine, they will just get an unnecessary snapshot queued.
*/
dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
{
/*
* Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->commited array but via
+ * than ->xmin. Those won't ever get checked via the ->committed array but via
* the clog machinery, so we don't need to waste memory on them.
*/
static void
/*
* NB: This handles subtransactions correctly even if we started from
* suboverflowed xl_running_xacts because we only keep track of toplevel
- * transactions. Since the latter are always are allocated before their
+ * transactions. Since the latter are always allocated before their
* subxids and since they end at the same time it's sufficient to deal
* with them here.
*/
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
(uint32) (lsn >> 32), (uint32) lsn),
- errdetail("Transaction ID %u finished; no more running transactions.",
- xid)));
+ errdetail("Transaction ID %u finished; no more running transactions.",
+ xid)));
builder->state = SNAPBUILD_CONSISTENT;
}
}
* we reached consistency.
*/
forced_timetravel = true;
- elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running to early", xid);
+ elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running too early", xid);
}
for (nxact = 0; nxact < nsubxacts; nxact++)
* to CONSISTENT.
* NB: We need to search running.xip when seeing a transaction's end to
* make sure it's a toplevel transaction and it's been one of the
- * intially running ones.
+ * initially running ones.
* Interestingly, in contrast to HS, this allows us not to care about
* subtransactions - and by extension suboverflowed xl_running_xacts -
* at all.
{
ereport(DEBUG1,
(errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
- (uint32) (lsn >> 32), (uint32) lsn),
- errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
builder->initial_xmin_horizon, running->oldestRunningXid)));
return true;
}
builder->start_decoding_at = lsn + 1;
/* As no transactions were running xmin/xmax can be trivially set. */
- builder->xmin = running->nextXid; /* < are finished */
- builder->xmax = running->nextXid; /* >= are running */
+ builder->xmin = running->nextXid; /* < are finished */
+ builder->xmax = running->nextXid; /* >= are running */
/* so we can safely use the faster comparisons */
Assert(TransactionIdIsNormal(builder->xmin));
* currently running transactions have finished. We'll update both
* while waiting for the pending transactions to finish.
*/
- builder->xmin = running->nextXid; /* < are finished */
- builder->xmax = running->nextXid; /* >= are running */
+ builder->xmin = running->nextXid; /* < are finished */
+ builder->xmax = running->nextXid; /* >= are running */
/* so we can safely use the faster comparisons */
Assert(TransactionIdIsNormal(builder->xmin));
/*
* We may overwrite the work from some other backend, but that's ok, our
- * snapshot is valid as well, we'll just have done some superflous work.
+ * snapshot is valid as well, we'll just have done some superfluous work.
*/
if (rename(tmppath, path) != 0)
{
* Make sure the snapshot had been stored safely to disk, that's normally
* cheap.
* Note that we do not need PANIC here, nobody will be able to use the
- * slot without fsyncing, and saving it won't suceed without an fsync()
+ * slot without fsyncing, and saving it won't succeed without an fsync()
* either...
* ----
*/
if (ondisk.magic != SNAPBUILD_MAGIC)
ereport(ERROR,
- (errmsg("snapbuild state file \"%s\" has wrong magic %u instead of %u",
+ (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
path, ondisk.magic, SNAPBUILD_MAGIC)));
if (ondisk.version != SNAPBUILD_VERSION)
ereport(ERROR,
- (errmsg("snapbuild state file \"%s\" has unsupported version %u instead of %u",
+ (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
path, ondisk.version, SNAPBUILD_VERSION)));
INIT_CRC32C(checksum);
COMP_CRC32C(checksum,
- ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
+ ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
/* read SnapBuild */
if (!EQ_CRC32C(checksum, ondisk.checksum))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("snapbuild state file %s: checksum mismatch, is %u, should be %u",
+ errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
path, checksum, ondisk.checksum)));
/*
/*
* We are only interested in consistent snapshots for now, comparing
- * whether one imcomplete snapshot is more "advanced" seems to be
+ * whether one incomplete snapshot is more "advanced" seems to be
* unnecessarily complex.
*/
if (ondisk.builder.state < SNAPBUILD_CONSISTENT)