]> granicus.if.org Git - postgresql/blobdiff - src/backend/replication/logical/snapbuild.c
Add macros to make AllocSetContextCreate() calls simpler and safer.
[postgresql] / src / backend / replication / logical / snapbuild.c
index 82e7d986b42d65f60a22f42c8b114e9324183ac3..8b59fc5a16a15cabac8816edb44215824dd92bf2 100644 (file)
@@ -79,7 +79,7 @@
  *       +--->|SNAPBUILD_CONSISTENT     |<------------+
  *                +-------------------------+
  *
- * Initially the machinery is in the START stage. When a xl_running_xacts
+ * Initially the machinery is in the START stage. When an xl_running_xacts
  * record is read that is sufficiently new (above the safe xmin horizon),
  * there's a state transition. If there were no running xacts when the
  * runnign_xacts record was generated, we'll directly go into CONSISTENT
@@ -96,7 +96,7 @@
  * is a convenient point to initialize replication from, which is why we
  * export a snapshot at that point, which *can* be used to read normal data.
  *
- * Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ * Copyright (c) 2012-2016, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
  *       src/backend/replication/snapbuild.c
@@ -153,15 +153,14 @@ struct SnapBuild
        TransactionId xmax;
 
        /*
-        * Don't replay commits from an LSN < this LSN. This can be set
-        * externally but it will also be advanced (never retreat) from within
-        * snapbuild.c.
+        * Don't replay commits from an LSN < this LSN. This can be set externally
+        * but it will also be advanced (never retreat) from within snapbuild.c.
         */
        XLogRecPtr      start_decoding_at;
 
        /*
         * Don't start decoding WAL until the "xl_running_xacts" information
-        * indicates there are no running xids with a xid smaller than this.
+        * indicates there are no running xids with an xid smaller than this.
         */
        TransactionId initial_xmin_horizon;
 
@@ -244,7 +243,7 @@ struct SnapBuild
  * removes knowledge about the previously used resowner, so we save it here.
  */
 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
-static bool    ExportInProgress = false;
+static bool ExportInProgress = false;
 
 /* transaction state manipulation functions */
 static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
@@ -290,9 +289,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
        /* allocate memory in own context, to have better accountability */
        context = AllocSetContextCreate(CurrentMemoryContext,
                                                                        "snapshot builder context",
-                                                                       ALLOCSET_DEFAULT_MINSIZE,
-                                                                       ALLOCSET_DEFAULT_INITSIZE,
-                                                                       ALLOCSET_DEFAULT_MAXSIZE);
+                                                                       ALLOCSET_DEFAULT_SIZES);
        oldcontext = MemoryContextSwitchTo(context);
 
        builder = palloc0(sizeof(SnapBuild));
@@ -348,7 +345,7 @@ SnapBuildFreeSnapshot(Snapshot snap)
        Assert(snap->curcid == FirstCommandId);
        Assert(!snap->suboverflowed);
        Assert(!snap->takenDuringRecovery);
-       Assert(snap->regd_count == 1);
+       Assert(snap->regd_count == 0);
 
        /* slightly more likely, so it's checked even without c-asserts */
        if (snap->copied)
@@ -407,16 +404,16 @@ SnapBuildSnapDecRefcount(Snapshot snap)
        Assert(!snap->suboverflowed);
        Assert(!snap->takenDuringRecovery);
 
-       Assert(snap->regd_count == 1);
+       Assert(snap->regd_count == 0);
 
-       Assert(snap->active_count);
+       Assert(snap->active_count > 0);
 
        /* slightly more likely, so it's checked even without casserts */
        if (snap->copied)
                elog(ERROR, "cannot free a copied snapshot");
 
        snap->active_count--;
-       if (!snap->active_count)
+       if (snap->active_count == 0)
                SnapBuildFreeSnapshot(snap);
 }
 
@@ -495,7 +492,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
        snapshot->copied = false;
        snapshot->curcid = FirstCommandId;
        snapshot->active_count = 0;
-       snapshot->regd_count = 1;       /* mark as registered so nobody frees it */
+       snapshot->regd_count = 0;
 
        return snapshot;
 }
@@ -599,21 +596,40 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 
        ereport(LOG,
                        (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
-                                                  "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
+               "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
                                                   snap->xcnt,
                                                   snapname, snap->xcnt)));
        return snapname;
 }
 
+/*
+ * Ensure there is a snapshot and if not build one for current transaction.
+ */
+Snapshot
+SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
+{
+       Assert(builder->state == SNAPBUILD_CONSISTENT);
+
+       /* only build a new snapshot if we don't have a prebuilt one */
+       if (builder->snapshot == NULL)
+       {
+               builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+               /* inrease refcount for the snapshot builder */
+               SnapBuildSnapIncRefcount(builder->snapshot);
+       }
+
+       return builder->snapshot;
+}
+
 /*
  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
  * any. Aborts the previously started transaction and resets the resource
  * owner back to its original value.
  */
 void
-SnapBuildClearExportedSnapshot()
+SnapBuildClearExportedSnapshot(void)
 {
-       /* nothing exported, thats the usual case */
+       /* nothing exported, that is the usual case */
        if (!ExportInProgress)
                return;
 
@@ -636,8 +652,6 @@ SnapBuildClearExportedSnapshot()
 bool
 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 {
-       bool            is_old_tx;
-
        /*
         * We can't handle data in transactions if we haven't built a snapshot
         * yet, so don't store them.
@@ -658,9 +672,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
         * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
         * be needed to decode the change we're currently processing.
         */
-       is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
-
-       if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+       if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
        {
                /* only build a new snapshot if we don't have a prebuilt one */
                if (builder->snapshot == NULL)
@@ -683,8 +695,9 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 }
 
 /*
- * Do CommandId/ComboCid handling after reading a xl_heap_new_cid record. This
- * implies that a transaction has done some form of write to system catalogs.
+ * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
+ * This implies that a transaction has done some form of write to system
+ * catalogs.
  */
 void
 SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
@@ -769,7 +782,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
        /*
         * Iterate through all toplevel transactions. This can include
         * subtransactions which we just don't yet know to be that, but that's
-        * fine, they will just get an unneccesary snapshot queued.
+        * fine, they will just get an unnecessary snapshot queued.
         */
        dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
        {
@@ -832,7 +845,7 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
 
 /*
  * Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->commited array but via
+ * than ->xmin. Those won't ever get checked via the ->committed array but via
  * the clog machinery, so we don't need to waste memory on them.
  */
 static void
@@ -886,7 +899,7 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
        /*
         * NB: This handles subtransactions correctly even if we started from
         * suboverflowed xl_running_xacts because we only keep track of toplevel
-        * transactions. Since the latter are always are allocated before their
+        * transactions. Since the latter are always allocated before their
         * subxids and since they end at the same time it's sufficient to deal
         * with them here.
         */
@@ -903,8 +916,8 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
                        ereport(LOG,
                                  (errmsg("logical decoding found consistent point at %X/%X",
                                                  (uint32) (lsn >> 32), (uint32) lsn),
-                               errdetail("Transaction ID %u finished; no more running transactions.",
-                                                 xid)));
+                                  errdetail("Transaction ID %u finished; no more running transactions.",
+                                                        xid)));
                        builder->state = SNAPBUILD_CONSISTENT;
                }
        }
@@ -966,7 +979,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
                 * we reached consistency.
                 */
                forced_timetravel = true;
-               elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running to early", xid);
+               elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running too early", xid);
        }
 
        for (nxact = 0; nxact < nsubxacts; nxact++)
@@ -1211,7 +1224,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
         *        to CONSISTENT.
         *        NB: We need to search running.xip when seeing a transaction's end to
         *        make sure it's a toplevel transaction and it's been one of the
-        *        intially running ones.
+        *        initially running ones.
         *        Interestingly, in contrast to HS, this allows us not to care about
         *        subtransactions - and by extension suboverflowed xl_running_xacts -
         *        at all.
@@ -1231,8 +1244,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
        {
                ereport(DEBUG1,
                                (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
-                                               (uint32) (lsn >> 32), (uint32) lsn),
-                                errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
+                                                                (uint32) (lsn >> 32), (uint32) lsn),
+               errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
                                 builder->initial_xmin_horizon, running->oldestRunningXid)));
                return true;
        }
@@ -1251,8 +1264,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                        builder->start_decoding_at = lsn + 1;
 
                /* As no transactions were running xmin/xmax can be trivially set. */
-               builder->xmin = running->nextXid; /* < are finished */
-               builder->xmax = running->nextXid; /* >= are running */
+               builder->xmin = running->nextXid;               /* < are finished */
+               builder->xmax = running->nextXid;               /* >= are running */
 
                /* so we can safely use the faster comparisons */
                Assert(TransactionIdIsNormal(builder->xmin));
@@ -1301,8 +1314,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                 * currently running transactions have finished. We'll update both
                 * while waiting for the pending transactions to finish.
                 */
-               builder->xmin = running->nextXid; /* < are finished */
-               builder->xmax = running->nextXid;  /* >= are running */
+               builder->xmin = running->nextXid;               /* < are finished */
+               builder->xmax = running->nextXid;               /* >= are running */
 
                /* so we can safely use the faster comparisons */
                Assert(TransactionIdIsNormal(builder->xmin));
@@ -1597,7 +1610,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 
        /*
         * We may overwrite the work from some other backend, but that's ok, our
-        * snapshot is valid as well, we'll just have done some superflous work.
+        * snapshot is valid as well, we'll just have done some superfluous work.
         */
        if (rename(tmppath, path) != 0)
        {
@@ -1656,7 +1669,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
         * Make sure the snapshot had been stored safely to disk, that's normally
         * cheap.
         * Note that we do not need PANIC here, nobody will be able to use the
-        * slot without fsyncing, and saving it won't suceed without an fsync()
+        * slot without fsyncing, and saving it won't succeed without an fsync()
         * either...
         * ----
         */
@@ -1677,17 +1690,17 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 
        if (ondisk.magic != SNAPBUILD_MAGIC)
                ereport(ERROR,
-                               (errmsg("snapbuild state file \"%s\" has wrong magic %u instead of %u",
+                               (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
                                                path, ondisk.magic, SNAPBUILD_MAGIC)));
 
        if (ondisk.version != SNAPBUILD_VERSION)
                ereport(ERROR,
-                               (errmsg("snapbuild state file \"%s\" has unsupported version %u instead of %u",
+                               (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
                                                path, ondisk.version, SNAPBUILD_VERSION)));
 
        INIT_CRC32C(checksum);
        COMP_CRC32C(checksum,
-                          ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
+                               ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
                        SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
 
        /* read SnapBuild */
@@ -1738,7 +1751,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
        if (!EQ_CRC32C(checksum, ondisk.checksum))
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("snapbuild state file %s: checksum mismatch, is %u, should be %u",
+                                errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
                                                path, checksum, ondisk.checksum)));
 
        /*
@@ -1748,7 +1761,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 
        /*
         * We are only interested in consistent snapshots for now, comparing
-        * whether one imcomplete snapshot is more "advanced" seems to be
+        * whether one incomplete snapshot is more "advanced" seems to be
         * unnecessarily complex.
         */
        if (ondisk.builder.state < SNAPBUILD_CONSISTENT)