]> granicus.if.org Git - postgresql/blobdiff - src/backend/replication/logical/snapbuild.c
Add macros to make AllocSetContextCreate() calls simpler and safer.
[postgresql] / src / backend / replication / logical / snapbuild.c
index c462e9059d64018db90a37c32f36da521d2feb48..8b59fc5a16a15cabac8816edb44215824dd92bf2 100644 (file)
@@ -28,7 +28,7 @@
  *
  * As the percentage of transactions modifying the catalog normally is fairly
  * small in comparisons to ones only manipulating user data, we keep track of
- * the committed catalog modifying ones inside (xmin, xmax) instead of keeping
+ * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
  * track of all running transactions like it's done in a normal snapshot. Note
  * that we're generally only looking at transactions that have acquired an
  * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
  *
  * The snapbuild machinery is starting up in several stages, as illustrated
  * by the following graph:
- *         +-------------------------+
- *    +----|SNAPBUILD_START          |-------------+
- *    |    +-------------------------+             |
- *    |                 |                          |
- *    |                 |                          |
- *    |     running_xacts with running xacts       |
- *    |                 |                          |
- *    |                 |                          |
- *    |                 v                          |
- *    |    +-------------------------+             v
- *    |    |SNAPBUILD_FULL_SNAPSHOT  |------------>|
- *    |    +-------------------------+             |
- * running_xacts        |                      saved snapshot
- * with zero xacts      |                 at running_xacts's lsn
- *    |                 |                          |
- *    |     all running toplevel TXNs finished     |
- *    |                 |                          |
- *    |                 v                          |
- *    |    +-------------------------+             |
- *    +--->|SNAPBUILD_CONSISTENT     |<------------+
- *         +-------------------------+
+ *                +-------------------------+
+ *       +----|SNAPBUILD_START                  |-------------+
+ *       |    +-------------------------+                         |
+ *       |                                     |                                                  |
+ *       |                                     |                                                  |
+ *       |             running_xacts with running xacts           |
+ *       |                                     |                                                  |
+ *       |                                     |                                                  |
+ *       |                                     v                                                  |
+ *       |    +-------------------------+                         v
+ *       |    |SNAPBUILD_FULL_SNAPSHOT  |------------>|
+ *       |    +-------------------------+                         |
+ * running_xacts               |                                          saved snapshot
+ * with zero xacts             |                                 at running_xacts's lsn
+ *       |                                     |                                                  |
+ *       |             all running toplevel TXNs finished         |
+ *       |                                     |                                                  |
+ *       |                                     v                                                  |
+ *       |    +-------------------------+                         |
+ *       +--->|SNAPBUILD_CONSISTENT     |<------------+
+ *                +-------------------------+
  *
- * Initially the machinery is in the START stage. When a xl_running_xacts
+ * Initially the machinery is in the START stage. When an xl_running_xacts
  * record is read that is sufficiently new (above the safe xmin horizon),
  * there's a state transition. If there were no running xacts when the
  * runnign_xacts record was generated, we'll directly go into CONSISTENT
@@ -96,7 +96,7 @@
  * is a convenient point to initialize replication from, which is why we
  * export a snapshot at that point, which *can* be used to read normal data.
  *
- * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ * Copyright (c) 2012-2016, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
  *       src/backend/replication/snapbuild.c
@@ -153,15 +153,14 @@ struct SnapBuild
        TransactionId xmax;
 
        /*
-        * Don't replay commits from an LSN <= this LSN. This can be set
-        * externally but it will also be advanced (never retreat) from within
-        * snapbuild.c.
+        * Don't replay commits from an LSN < this LSN. This can be set externally
+        * but it will also be advanced (never retreat) from within snapbuild.c.
         */
-       XLogRecPtr      transactions_after;
+       XLogRecPtr      start_decoding_at;
 
        /*
         * Don't start decoding WAL until the "xl_running_xacts" information
-        * indicates there are no running xids with a xid smaller than this.
+        * indicates there are no running xids with an xid smaller than this.
         */
        TransactionId initial_xmin_horizon;
 
@@ -184,7 +183,7 @@ struct SnapBuild
         * Information about initially running transactions
         *
         * When we start building a snapshot there already may be transactions in
-        * progress.  Those are stored in running.xip.  We don't have enough
+        * progress.  Those are stored in running.xip.  We don't have enough
         * information about those to decode their contents, so until they are
         * finished (xcnt=0) we cannot switch to a CONSISTENT state.
         */
@@ -243,8 +242,8 @@ struct SnapBuild
  * Starting a transaction -- which we need to do while exporting a snapshot --
  * removes knowledge about the previously used resowner, so we save it here.
  */
-ResourceOwner SavedResourceOwnerDuringExport = NULL;
-bool ExportInProgress = false;
+static ResourceOwner SavedResourceOwnerDuringExport = NULL;
+static bool ExportInProgress = false;
 
 /* transaction state manipulation functions */
 static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
@@ -290,9 +289,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
        /* allocate memory in own context, to have better accountability */
        context = AllocSetContextCreate(CurrentMemoryContext,
                                                                        "snapshot builder context",
-                                                                       ALLOCSET_DEFAULT_MINSIZE,
-                                                                       ALLOCSET_DEFAULT_INITSIZE,
-                                                                       ALLOCSET_DEFAULT_MAXSIZE);
+                                                                       ALLOCSET_DEFAULT_SIZES);
        oldcontext = MemoryContextSwitchTo(context);
 
        builder = palloc0(sizeof(SnapBuild));
@@ -307,10 +304,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
        builder->committed.xip =
                palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
        builder->committed.includes_all_transactions = true;
-       builder->committed.xip =
-               palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
+
        builder->initial_xmin_horizon = xmin_horizon;
-       builder->transactions_after = start_lsn;
+       builder->start_decoding_at = start_lsn;
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -349,7 +345,7 @@ SnapBuildFreeSnapshot(Snapshot snap)
        Assert(snap->curcid == FirstCommandId);
        Assert(!snap->suboverflowed);
        Assert(!snap->takenDuringRecovery);
-       Assert(snap->regd_count == 1);
+       Assert(snap->regd_count == 0);
 
        /* slightly more likely, so it's checked even without c-asserts */
        if (snap->copied)
@@ -376,7 +372,7 @@ SnapBuildCurrentState(SnapBuild *builder)
 bool
 SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
 {
-       return ptr <= builder->transactions_after;
+       return ptr < builder->start_decoding_at;
 }
 
 /*
@@ -408,16 +404,16 @@ SnapBuildSnapDecRefcount(Snapshot snap)
        Assert(!snap->suboverflowed);
        Assert(!snap->takenDuringRecovery);
 
-       Assert(snap->regd_count == 1);
+       Assert(snap->regd_count == 0);
 
-       Assert(snap->active_count);
+       Assert(snap->active_count > 0);
 
        /* slightly more likely, so it's checked even without casserts */
        if (snap->copied)
                elog(ERROR, "cannot free a copied snapshot");
 
        snap->active_count--;
-       if (!snap->active_count)
+       if (snap->active_count == 0)
                SnapBuildFreeSnapshot(snap);
 }
 
@@ -496,7 +492,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
        snapshot->copied = false;
        snapshot->curcid = FirstCommandId;
        snapshot->active_count = 0;
-       snapshot->regd_count = 1; /* mark as registered so nobody frees it */
+       snapshot->regd_count = 0;
 
        return snapshot;
 }
@@ -599,20 +595,41 @@ SnapBuildExportSnapshot(SnapBuild *builder)
        snapname = ExportSnapshot(snap);
 
        ereport(LOG,
-                       (errmsg("exported logical decoding snapshot: \"%s\" with %u xids",
-                                       snapname, snap->xcnt)));
+                       (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
+               "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
+                                                  snap->xcnt,
+                                                  snapname, snap->xcnt)));
        return snapname;
 }
 
+/*
+ * Ensure there is a snapshot and if not build one for current transaction.
+ */
+Snapshot
+SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
+{
+       Assert(builder->state == SNAPBUILD_CONSISTENT);
+
+       /* only build a new snapshot if we don't have a prebuilt one */
+       if (builder->snapshot == NULL)
+       {
+               builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+               /* inrease refcount for the snapshot builder */
+               SnapBuildSnapIncRefcount(builder->snapshot);
+       }
+
+       return builder->snapshot;
+}
+
 /*
  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
  * any. Aborts the previously started transaction and resets the resource
  * owner back to its original value.
  */
 void
-SnapBuildClearExportedSnapshot()
+SnapBuildClearExportedSnapshot(void)
 {
-       /* nothing exported, thats the usual case */
+       /* nothing exported, that is the usual case */
        if (!ExportInProgress)
                return;
 
@@ -635,8 +652,6 @@ SnapBuildClearExportedSnapshot()
 bool
 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 {
-       bool is_old_tx;
-
        /*
         * We can't handle data in transactions if we haven't built a snapshot
         * yet, so don't store them.
@@ -657,9 +672,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
         * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
         * be needed to decode the change we're currently processing.
         */
-       is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
-
-       if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+       if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
        {
                /* only build a new snapshot if we don't have a prebuilt one */
                if (builder->snapshot == NULL)
@@ -682,8 +695,9 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 }
 
 /*
- * Do CommandId/ComboCid handling after reading a xl_heap_new_cid record. This
- * implies that a transaction has done some form of write to system catalogs.
+ * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
+ * This implies that a transaction has done some form of write to system
+ * catalogs.
  */
 void
 SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
@@ -692,13 +706,13 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
        CommandId       cid;
 
        /*
-        * we only log new_cid's if a catalog tuple was modified, so mark
-        * the transaction as containing catalog modifications
+        * we only log new_cid's if a catalog tuple was modified, so mark the
+        * transaction as containing catalog modifications
         */
-       ReorderBufferXidSetCatalogChanges(builder->reorder, xid,lsn);
+       ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
 
        ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
-                                                                xlrec->target.node, xlrec->target.tid,
+                                                                xlrec->target_node, xlrec->target_tid,
                                                                 xlrec->cmin, xlrec->cmax,
                                                                 xlrec->combocid);
 
@@ -712,7 +726,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
                cid = xlrec->cmin;
        else
        {
-               cid = InvalidCommandId;         /* silence compiler */
+               cid = InvalidCommandId; /* silence compiler */
                elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
        }
 
@@ -768,7 +782,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
        /*
         * Iterate through all toplevel transactions. This can include
         * subtransactions which we just don't yet know to be that, but that's
-        * fine, they will just get an unneccesary snapshot queued.
+        * fine, they will just get an unnecessary snapshot queued.
         */
        dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
        {
@@ -818,7 +832,7 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
                         (uint32) builder->committed.xcnt_space);
 
                builder->committed.xip = repalloc(builder->committed.xip,
-                                       builder->committed.xcnt_space * sizeof(TransactionId));
+                                         builder->committed.xcnt_space * sizeof(TransactionId));
        }
 
        /*
@@ -831,7 +845,7 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
 
 /*
  * Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->commited array but via
+ * than ->xmin. Those won't ever get checked via the ->committed array but via
  * the clog machinery, so we don't need to waste memory on them.
  */
 static void
@@ -885,7 +899,7 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
        /*
         * NB: This handles subtransactions correctly even if we started from
         * suboverflowed xl_running_xacts because we only keep track of toplevel
-        * transactions. Since the latter are always are allocated before their
+        * transactions. Since the latter are always allocated before their
         * subxids and since they end at the same time it's sufficient to deal
         * with them here.
         */
@@ -900,10 +914,10 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
                         * so our incrementaly built snapshot now is consistent.
                         */
                        ereport(LOG,
-                                       (errmsg("logical decoding found consistent point at %X/%X",
-                                                       (uint32)(lsn >> 32), (uint32)lsn),
-                                        errdetail("xid %u finished, no running transactions anymore",
-                                                          xid)));
+                                 (errmsg("logical decoding found consistent point at %X/%X",
+                                                 (uint32) (lsn >> 32), (uint32) lsn),
+                                  errdetail("Transaction ID %u finished; no more running transactions.",
+                                                        xid)));
                        builder->state = SNAPBUILD_CONSISTENT;
                }
        }
@@ -956,8 +970,8 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
        if (builder->state < SNAPBUILD_CONSISTENT)
        {
                /* ensure that only commits after this are getting replayed */
-               if (builder->transactions_after < lsn)
-                       builder->transactions_after = lsn;
+               if (builder->start_decoding_at <= lsn)
+                       builder->start_decoding_at = lsn + 1;
 
                /*
                 * We could avoid treating !SnapBuildTxnIsRunning transactions as
@@ -965,7 +979,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
                 * we reached consistency.
                 */
                forced_timetravel = true;
-               elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running to early", xid);
+               elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running too early", xid);
        }
 
        for (nxact = 0; nxact < nsubxacts; nxact++)
@@ -1076,7 +1090,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
                /* refcount of the snapshot builder for the new snapshot */
                SnapBuildSnapIncRefcount(builder->snapshot);
 
-               /* add a new SnapshotNow to all currently running transactions */
+               /* add a new Snapshot to all currently running transactions */
                SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
        }
        else
@@ -1170,15 +1184,16 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
         */
        if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
                LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+
        /*
         * No in-progress transaction, can reuse the last serialized snapshot if
         * we have one.
         */
        else if (txn == NULL &&
-                        builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
+               builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
                         builder->last_serialized_snapshot != InvalidXLogRecPtr)
                LogicalIncreaseRestartDecodingForSlot(lsn,
-                                                                                  builder->last_serialized_snapshot);
+                                                                                 builder->last_serialized_snapshot);
 }
 
 
@@ -1199,23 +1214,23 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
         * the currently running transactions. There are several ways to do that:
         *
         * a) There were no running transactions when the xl_running_xacts record
-        *    was inserted, jump to CONSISTENT immediately. We might find such a
-        *    state we were waiting for b) and c).
+        *        was inserted, jump to CONSISTENT immediately. We might find such a
+        *        state we were waiting for b) and c).
         *
         * b) Wait for all toplevel transactions that were running to end. We
-        *    simply track the number of in-progress toplevel transactions and
-        *    lower it whenever one commits or aborts. When that number
-        *    (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
-        *    to CONSISTENT.
+        *        simply track the number of in-progress toplevel transactions and
+        *        lower it whenever one commits or aborts. When that number
+        *        (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
+        *        to CONSISTENT.
         *        NB: We need to search running.xip when seeing a transaction's end to
-        *    make sure it's a toplevel transaction and it's been one of the
-        *    intially running ones.
+        *        make sure it's a toplevel transaction and it's been one of the
+        *        initially running ones.
         *        Interestingly, in contrast to HS, this allows us not to care about
         *        subtransactions - and by extension suboverflowed xl_running_xacts -
         *        at all.
         *
         * c) This (in a previous run) or another decoding slot serialized a
-        *    snapshot to disk that we can use.
+        *        snapshot to disk that we can use.
         * ---
         */
 
@@ -1228,10 +1243,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                                                                        builder->initial_xmin_horizon))
        {
                ereport(DEBUG1,
-                               (errmsg("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
-                                               (uint32) (lsn >> 32), (uint32) lsn),
-                                errdetail("initial xmin horizon of %u vs the snapshot's %u",
-                                                  builder->initial_xmin_horizon, running->oldestRunningXid)));
+                               (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
+                                                                (uint32) (lsn >> 32), (uint32) lsn),
+               errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
+                                builder->initial_xmin_horizon, running->oldestRunningXid)));
                return true;
        }
 
@@ -1243,14 +1258,16 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
         */
        if (running->xcnt == 0)
        {
-               if (builder->transactions_after == InvalidXLogRecPtr ||
-                       builder->transactions_after < lsn)
-                       builder->transactions_after = lsn;
+               if (builder->start_decoding_at == InvalidXLogRecPtr ||
+                       builder->start_decoding_at <= lsn)
+                       /* can decode everything after this */
+                       builder->start_decoding_at = lsn + 1;
 
-               builder->xmin = running->oldestRunningXid;
-               builder->xmax = running->latestCompletedXid;
-               TransactionIdAdvance(builder->xmax);
+               /* As no transactions were running xmin/xmax can be trivially set. */
+               builder->xmin = running->nextXid;               /* < are finished */
+               builder->xmax = running->nextXid;               /* >= are running */
 
+               /* so we can safely use the faster comparisons */
                Assert(TransactionIdIsNormal(builder->xmin));
                Assert(TransactionIdIsNormal(builder->xmax));
 
@@ -1263,8 +1280,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 
                ereport(LOG,
                                (errmsg("logical decoding found consistent point at %X/%X",
-                                               (uint32)(lsn >> 32), (uint32)lsn),
-                                errdetail("running xacts with xcnt == 0")));
+                                               (uint32) (lsn >> 32), (uint32) lsn),
+                                errdetail("There are no running transactions.")));
 
                return false;
        }
@@ -1274,15 +1291,16 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                /* there won't be any state to cleanup */
                return false;
        }
+
        /*
         * b) first encounter of a useable xl_running_xacts record. If we had
-        * found one earlier we would either track running transactions
-        * (i.e. builder->running.xcnt != 0) or be consistent (this function
-        * wouldn't get called).
+        * found one earlier we would either track running transactions (i.e.
+        * builder->running.xcnt != 0) or be consistent (this function wouldn't
+        * get called).
         */
        else if (!builder->running.xcnt)
        {
-               int off;
+               int                     off;
 
                /*
                 * We only care about toplevel xids as those are the ones we
@@ -1290,9 +1308,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                 * instead of running transactions we don't need to know anything
                 * about uncommitted subtransactions.
                 */
-               builder->xmin = running->oldestRunningXid;
-               builder->xmax = running->latestCompletedXid;
-               TransactionIdAdvance(builder->xmax);
+
+               /*
+                * Start with an xmin/xmax that's correct for future, when all the
+                * currently running transactions have finished. We'll update both
+                * while waiting for the pending transactions to finish.
+                */
+               builder->xmin = running->nextXid;               /* < are finished */
+               builder->xmax = running->nextXid;               /* >= are running */
 
                /* so we can safely use the faster comparisons */
                Assert(TransactionIdIsNormal(builder->xmin));
@@ -1302,7 +1325,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                builder->running.xcnt_space = running->xcnt;
                builder->running.xip =
                        MemoryContextAlloc(builder->context,
-                                                       builder->running.xcnt * sizeof(TransactionId));
+                                                          builder->running.xcnt * sizeof(TransactionId));
                memcpy(builder->running.xip, running->xids,
                           builder->running.xcnt * sizeof(TransactionId));
 
@@ -1320,9 +1343,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                builder->state = SNAPBUILD_FULL_SNAPSHOT;
 
                ereport(LOG,
-                               (errmsg("logical decoding found initial starting point at %X/%X",
-                                               (uint32)(lsn >> 32), (uint32)lsn),
-                                errdetail("%u xacts need to finish", (uint32) builder->running.xcnt)));
+                       (errmsg("logical decoding found initial starting point at %X/%X",
+                                       (uint32) (lsn >> 32), (uint32) lsn),
+                        errdetail_plural("%u transaction needs to finish.",
+                                                         "%u transactions need to finish.",
+                                                         builder->running.xcnt,
+                                                         (uint32) builder->running.xcnt)));
 
                /*
                 * Iterate through all xids, wait for them to finish.
@@ -1331,7 +1357,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
                 * isolationtester to notice that we're currently waiting for
                 * something.
                 */
-               for(off = 0; off < builder->running.xcnt; off++)
+               for (off = 0; off < builder->running.xcnt; off++)
                {
                        TransactionId xid = builder->running.xip[off];
 
@@ -1378,7 +1404,7 @@ typedef struct SnapBuildOnDisk
 
        /* data not covered by checksum */
        uint32          magic;
-       pg_crc32        checksum;
+       pg_crc32c       checksum;
 
        /* data covered by checksum */
 
@@ -1399,7 +1425,7 @@ typedef struct SnapBuildOnDisk
        offsetof(SnapBuildOnDisk, version)
 
 #define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 1
+#define SNAPBUILD_VERSION 2
 
 /*
  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
@@ -1431,7 +1457,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
        char            path[MAXPGPATH];
        int                     ret;
        struct stat stat_buf;
-       uint32          sz;
+       Size            sz;
 
        Assert(lsn != InvalidXLogRecPtr);
        Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
@@ -1450,7 +1476,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
         * unless the user used pg_resetxlog or similar. If a user did so, there's
         * no hope continuing to decode anyway.
         */
-       sprintf(path, "pg_llog/snapshots/%X-%X.snap",
+       sprintf(path, "pg_logical/snapshots/%X-%X.snap",
                        (uint32) (lsn >> 32), (uint32) lsn);
 
        /*
@@ -1471,12 +1497,12 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
                 * but remember location, so we don't need to read old data again.
                 *
                 * To be sure it has been synced to disk after the rename() from the
-                * tempfile filename to the real filename, we just repeat the
-                * fsync. That ought to be cheap because in most scenarios it should
-                * already be safely on disk.
+                * tempfile filename to the real filename, we just repeat the fsync.
+                * That ought to be cheap because in most scenarios it should already
+                * be safely on disk.
                 */
                fsync_fname(path, false);
-               fsync_fname("pg_llog/snapshots", true);
+               fsync_fname("pg_logical/snapshots", true);
 
                builder->last_serialized_snapshot = lsn;
                goto out;
@@ -1492,7 +1518,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
        elog(DEBUG1, "serializing snapshot to %s", path);
 
        /* to make sure only we will write to this tempfile, include pid */
-       sprintf(tmppath, "pg_llog/snapshots/%X-%X.snap.%u.tmp",
+       sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
                        (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
 
        /*
@@ -1504,7 +1530,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
        if (unlink(tmppath) != 0 && errno != ENOENT)
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not unlink file \"%s\": %m",     path)));
+                                errmsg("could not remove file \"%s\": %m", path)));
 
        needed_length = sizeof(SnapBuildOnDisk) +
                sizeof(TransactionId) * builder->running.xcnt_space +
@@ -1515,10 +1541,10 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
        ondisk->magic = SNAPBUILD_MAGIC;
        ondisk->version = SNAPBUILD_VERSION;
        ondisk->length = needed_length;
-       INIT_CRC32(ondisk->checksum);
-       COMP_CRC32(ondisk->checksum,
-                          ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
-                          SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
+       INIT_CRC32C(ondisk->checksum);
+       COMP_CRC32C(ondisk->checksum,
+                               ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
+                       SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
        ondisk_c += sizeof(SnapBuildOnDisk);
 
        memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
@@ -1529,22 +1555,24 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
        ondisk->builder.running.xip = NULL;
        ondisk->builder.committed.xip = NULL;
 
-       COMP_CRC32(ondisk->checksum,
-                          &ondisk->builder,
-                          sizeof(SnapBuild));
+       COMP_CRC32C(ondisk->checksum,
+                               &ondisk->builder,
+                               sizeof(SnapBuild));
 
        /* copy running xacts */
        sz = sizeof(TransactionId) * builder->running.xcnt_space;
        memcpy(ondisk_c, builder->running.xip, sz);
-       COMP_CRC32(ondisk->checksum, ondisk_c, sz);
+       COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
        ondisk_c += sz;
 
        /* copy committed xacts */
        sz = sizeof(TransactionId) * builder->committed.xcnt;
        memcpy(ondisk_c, builder->committed.xip, sz);
-       COMP_CRC32(ondisk->checksum, ondisk_c, sz);
+       COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
        ondisk_c += sz;
 
+       FIN_CRC32C(ondisk->checksum);
+
        /* we have valid data now, open tempfile and write it there */
        fd = OpenTransientFile(tmppath,
                                                   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
@@ -1578,11 +1606,11 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
        }
        CloseTransientFile(fd);
 
-       fsync_fname("pg_llog/snapshots", true);
+       fsync_fname("pg_logical/snapshots", true);
 
        /*
         * We may overwrite the work from some other backend, but that's ok, our
-        * snapshot is valid as well, we'll just have done some superflous work.
+        * snapshot is valid as well, we'll just have done some superfluous work.
         */
        if (rename(tmppath, path) != 0)
        {
@@ -1594,11 +1622,11 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 
        /* make sure we persist */
        fsync_fname(path, false);
-       fsync_fname("pg_llog/snapshots", true);
+       fsync_fname("pg_logical/snapshots", true);
 
        /*
-        * Now there's no way we can loose the dumped state anymore, remember
-        * this as a serialization point.
+        * Now there's no way we can loose the dumped state anymore, remember this
+        * as a serialization point.
         */
        builder->last_serialized_snapshot = lsn;
 
@@ -1619,13 +1647,13 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
        char            path[MAXPGPATH];
        Size            sz;
        int                     readBytes;
-       pg_crc32        checksum;
+       pg_crc32c       checksum;
 
        /* no point in loading a snapshot if we're already there */
        if (builder->state == SNAPBUILD_CONSISTENT)
                return false;
 
-       sprintf(path, "pg_llog/snapshots/%X-%X.snap",
+       sprintf(path, "pg_logical/snapshots/%X-%X.snap",
                        (uint32) (lsn >> 32), (uint32) lsn);
 
        fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
@@ -1641,12 +1669,12 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
         * Make sure the snapshot had been stored safely to disk, that's normally
         * cheap.
         * Note that we do not need PANIC here, nobody will be able to use the
-        * slot without fsyncing, and saving it won't suceed without an fsync()
+        * slot without fsyncing, and saving it won't succeed without an fsync()
         * either...
         * ----
         */
        fsync_fname(path, false);
-       fsync_fname("pg_llog/snapshots", true);
+       fsync_fname("pg_logical/snapshots", true);
 
 
        /* read statically sized portion of snapshot */
@@ -1662,18 +1690,18 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 
        if (ondisk.magic != SNAPBUILD_MAGIC)
                ereport(ERROR,
-                               (errmsg("snapbuild state file \"%s\" has wrong magic %u instead of %u",
+                               (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
                                                path, ondisk.magic, SNAPBUILD_MAGIC)));
 
        if (ondisk.version != SNAPBUILD_VERSION)
                ereport(ERROR,
-                               (errmsg("snapbuild state file \"%s\" has unsupported version %u instead of %u",
+                               (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
                                                path, ondisk.version, SNAPBUILD_VERSION)));
 
-       INIT_CRC32(checksum);
-       COMP_CRC32(checksum,
-                          ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
-                          SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
+       INIT_CRC32C(checksum);
+       COMP_CRC32C(checksum,
+                               ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
+                       SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
 
        /* read SnapBuild */
        readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
@@ -1685,11 +1713,11 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
                                 errmsg("could not read file \"%s\", read %d of %d: %m",
                                                path, readBytes, (int) sizeof(SnapBuild))));
        }
-       COMP_CRC32(checksum, &ondisk.builder, sizeof(SnapBuild));
+       COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
 
        /* restore running xacts information */
        sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
-       ondisk.builder.running.xip = MemoryContextAlloc(builder->context, sz);
+       ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
        readBytes = read(fd, ondisk.builder.running.xip, sz);
        if (readBytes != sz)
        {
@@ -1699,11 +1727,11 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
                                 errmsg("could not read file \"%s\", read %d of %d: %m",
                                                path, readBytes, (int) sz)));
        }
-       COMP_CRC32(checksum, ondisk.builder.running.xip, sz);
+       COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
 
        /* restore committed xacts information */
        sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
-       ondisk.builder.committed.xip = MemoryContextAlloc(builder->context, sz);
+       ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
        readBytes = read(fd, ondisk.builder.committed.xip, sz);
        if (readBytes != sz)
        {
@@ -1713,15 +1741,17 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
                                 errmsg("could not read file \"%s\", read %d of %d: %m",
                                                path, readBytes, (int) sz)));
        }
-       COMP_CRC32(checksum, ondisk.builder.committed.xip, sz);
+       COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
 
        CloseTransientFile(fd);
 
+       FIN_CRC32C(checksum);
+
        /* verify checksum of what we've read */
-       if (!EQ_CRC32(checksum, ondisk.checksum))
+       if (!EQ_CRC32C(checksum, ondisk.checksum))
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("snapbuild state file %s: checksum mismatch, is %u, should be %u",
+                                errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
                                                path, checksum, ondisk.checksum)));
 
        /*
@@ -1731,7 +1761,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 
        /*
         * We are only interested in consistent snapshots for now, comparing
-        * whether one imcomplete snapshot is more "advanced" seems to be
+        * whether one incomplete snapshot is more "advanced" seems to be
         * unnecessarily complex.
         */
        if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
@@ -1761,10 +1791,10 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
        }
        ondisk.builder.committed.xip = NULL;
 
-       builder->running.xcnt = ondisk.builder.committed.xcnt;
+       builder->running.xcnt = ondisk.builder.running.xcnt;
        if (builder->running.xip)
                pfree(builder->running.xip);
-       builder->running.xcnt_space = ondisk.builder.committed.xcnt_space;
+       builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
        builder->running.xip = ondisk.builder.running.xip;
 
        /* our snapshot is not interesting anymore, build a new one */
@@ -1781,8 +1811,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 
        ereport(LOG,
                        (errmsg("logical decoding found consistent point at %X/%X",
-                                       (uint32)(lsn >> 32), (uint32)lsn),
-                        errdetail("found initial snapshot in snapbuild file")));
+                                       (uint32) (lsn >> 32), (uint32) lsn),
+                        errdetail("Logical decoding will begin using saved snapshot.")));
        return true;
 
 snapshot_not_interesting:
@@ -1823,19 +1853,19 @@ CheckPointSnapBuild(void)
        if (redo < cutoff)
                cutoff = redo;
 
-       snap_dir = AllocateDir("pg_llog/snapshots");
-       while ((snap_de = ReadDir(snap_dir, "pg_llog/snapshots")) != NULL)
+       snap_dir = AllocateDir("pg_logical/snapshots");
+       while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
        {
                uint32          hi;
                uint32          lo;
                XLogRecPtr      lsn;
-               struct stat     statbuf;
+               struct stat statbuf;
 
                if (strcmp(snap_de->d_name, ".") == 0 ||
                        strcmp(snap_de->d_name, "..") == 0)
                        continue;
 
-               snprintf(path, MAXPGPATH, "pg_llog/snapshots/%s", snap_de->d_name);
+               snprintf(path, MAXPGPATH, "pg_logical/snapshots/%s", snap_de->d_name);
 
                if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
                {
@@ -1846,8 +1876,8 @@ CheckPointSnapBuild(void)
                /*
                 * temporary filenames from SnapBuildSerialize() include the LSN and
                 * everything but are postfixed by .$pid.tmp. We can just remove them
-                * the same as other files because there can be none that are currently
-                * being written that are older than cutoff.
+                * the same as other files because there can be none that are
+                * currently being written that are older than cutoff.
                 *
                 * We just log a message if a file doesn't fit the pattern, it's
                 * probably some editors lock/state file or similar...
@@ -1855,7 +1885,7 @@ CheckPointSnapBuild(void)
                if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
                {
                        ereport(LOG,
-                                       (errmsg("could not parse filename \"%s\"", path)));
+                                       (errmsg("could not parse file name \"%s\"", path)));
                        continue;
                }
 
@@ -1875,7 +1905,7 @@ CheckPointSnapBuild(void)
                        {
                                ereport(LOG,
                                                (errcode_for_file_access(),
-                                                errmsg("could not unlink file \"%s\": %m",
+                                                errmsg("could not remove file \"%s\": %m",
                                                                path)));
                                continue;
                        }