*
* As the percentage of transactions modifying the catalog normally is fairly
* small in comparisons to ones only manipulating user data, we keep track of
- * the committed catalog modifying ones inside (xmin, xmax) instead of keeping
+ * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
* track of all running transactions like it's done in a normal snapshot. Note
* that we're generally only looking at transactions that have acquired an
* xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
*
* The snapbuild machinery is starting up in several stages, as illustrated
* by the following graph:
- * +-------------------------+
- * +----|SNAPBUILD_START |-------------+
- * | +-------------------------+ |
- * | | |
- * | | |
- * | running_xacts with running xacts |
- * | | |
- * | | |
- * | v |
- * | +-------------------------+ v
- * | |SNAPBUILD_FULL_SNAPSHOT |------------>|
- * | +-------------------------+ |
- * running_xacts | saved snapshot
- * with zero xacts | at running_xacts's lsn
- * | | |
- * | all running toplevel TXNs finished |
- * | | |
- * | v |
- * | +-------------------------+ |
- * +--->|SNAPBUILD_CONSISTENT |<------------+
- * +-------------------------+
+ * +-------------------------+
+ * +----|SNAPBUILD_START |-------------+
+ * | +-------------------------+ |
+ * | | |
+ * | | |
+ * | running_xacts with running xacts |
+ * | | |
+ * | | |
+ * | v |
+ * | +-------------------------+ v
+ * | |SNAPBUILD_FULL_SNAPSHOT |------------>|
+ * | +-------------------------+ |
+ * running_xacts | saved snapshot
+ * with zero xacts | at running_xacts's lsn
+ * | | |
+ * | all running toplevel TXNs finished |
+ * | | |
+ * | v |
+ * | +-------------------------+ |
+ * +--->|SNAPBUILD_CONSISTENT |<------------+
+ * +-------------------------+
*
- * Initially the machinery is in the START stage. When a xl_running_xacts
+ * Initially the machinery is in the START stage. When an xl_running_xacts
* record is read that is sufficiently new (above the safe xmin horizon),
* there's a state transition. If there were no running xacts when the
* runnign_xacts record was generated, we'll directly go into CONSISTENT
* is a convenient point to initialize replication from, which is why we
* export a snapshot at that point, which *can* be used to read normal data.
*
- * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ * Copyright (c) 2012-2016, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/snapbuild.c
TransactionId xmax;
/*
- * Don't replay commits from an LSN <= this LSN. This can be set
- * externally but it will also be advanced (never retreat) from within
- * snapbuild.c.
+ * Don't replay commits from an LSN < this LSN. This can be set externally
+ * but it will also be advanced (never retreat) from within snapbuild.c.
*/
- XLogRecPtr transactions_after;
+ XLogRecPtr start_decoding_at;
/*
* Don't start decoding WAL until the "xl_running_xacts" information
- * indicates there are no running xids with a xid smaller than this.
+ * indicates there are no running xids with an xid smaller than this.
*/
TransactionId initial_xmin_horizon;
* Information about initially running transactions
*
* When we start building a snapshot there already may be transactions in
- * progress. Those are stored in running.xip. We don't have enough
+ * progress. Those are stored in running.xip. We don't have enough
* information about those to decode their contents, so until they are
* finished (xcnt=0) we cannot switch to a CONSISTENT state.
*/
* Starting a transaction -- which we need to do while exporting a snapshot --
* removes knowledge about the previously used resowner, so we save it here.
*/
-ResourceOwner SavedResourceOwnerDuringExport = NULL;
-bool ExportInProgress = false;
+static ResourceOwner SavedResourceOwnerDuringExport = NULL;
+static bool ExportInProgress = false;
/* transaction state manipulation functions */
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
/* allocate memory in own context, to have better accountability */
context = AllocSetContextCreate(CurrentMemoryContext,
"snapshot builder context",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ ALLOCSET_DEFAULT_SIZES);
oldcontext = MemoryContextSwitchTo(context);
builder = palloc0(sizeof(SnapBuild));
builder->committed.xip =
palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
builder->committed.includes_all_transactions = true;
- builder->committed.xip =
- palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
+
builder->initial_xmin_horizon = xmin_horizon;
- builder->transactions_after = start_lsn;
+ builder->start_decoding_at = start_lsn;
MemoryContextSwitchTo(oldcontext);
Assert(snap->curcid == FirstCommandId);
Assert(!snap->suboverflowed);
Assert(!snap->takenDuringRecovery);
- Assert(snap->regd_count == 1);
+ Assert(snap->regd_count == 0);
/* slightly more likely, so it's checked even without c-asserts */
if (snap->copied)
bool
SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
{
- return ptr <= builder->transactions_after;
+ return ptr < builder->start_decoding_at;
}
/*
Assert(!snap->suboverflowed);
Assert(!snap->takenDuringRecovery);
- Assert(snap->regd_count == 1);
+ Assert(snap->regd_count == 0);
- Assert(snap->active_count);
+ Assert(snap->active_count > 0);
/* slightly more likely, so it's checked even without casserts */
if (snap->copied)
elog(ERROR, "cannot free a copied snapshot");
snap->active_count--;
- if (!snap->active_count)
+ if (snap->active_count == 0)
SnapBuildFreeSnapshot(snap);
}
snapshot->copied = false;
snapshot->curcid = FirstCommandId;
snapshot->active_count = 0;
- snapshot->regd_count = 1; /* mark as registered so nobody frees it */
+ snapshot->regd_count = 0;
return snapshot;
}
snapname = ExportSnapshot(snap);
ereport(LOG,
- (errmsg("exported logical decoding snapshot: \"%s\" with %u xids",
- snapname, snap->xcnt)));
+ (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
+ "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
+ snap->xcnt,
+ snapname, snap->xcnt)));
return snapname;
}
+/*
+ * Ensure there is a snapshot and if not build one for current transaction.
+ */
+Snapshot
+SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
+{
+ Assert(builder->state == SNAPBUILD_CONSISTENT);
+
+ /* only build a new snapshot if we don't have a prebuilt one */
+ if (builder->snapshot == NULL)
+ {
+ builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+ /* inrease refcount for the snapshot builder */
+ SnapBuildSnapIncRefcount(builder->snapshot);
+ }
+
+ return builder->snapshot;
+}
+
/*
* Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
* any. Aborts the previously started transaction and resets the resource
* owner back to its original value.
*/
void
-SnapBuildClearExportedSnapshot()
+SnapBuildClearExportedSnapshot(void)
{
- /* nothing exported, thats the usual case */
+ /* nothing exported, that is the usual case */
if (!ExportInProgress)
return;
bool
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
{
- bool is_old_tx;
-
/*
* We can't handle data in transactions if we haven't built a snapshot
* yet, so don't store them.
* If the reorderbuffer doesn't yet have a snapshot, add one now, it will
* be needed to decode the change we're currently processing.
*/
- is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
-
- if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
{
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
}
/*
- * Do CommandId/ComboCid handling after reading a xl_heap_new_cid record. This
- * implies that a transaction has done some form of write to system catalogs.
+ * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
+ * This implies that a transaction has done some form of write to system
+ * catalogs.
*/
void
SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
CommandId cid;
/*
- * we only log new_cid's if a catalog tuple was modified, so mark
- * the transaction as containing catalog modifications
+ * we only log new_cid's if a catalog tuple was modified, so mark the
+ * transaction as containing catalog modifications
*/
- ReorderBufferXidSetCatalogChanges(builder->reorder, xid,lsn);
+ ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
- xlrec->target.node, xlrec->target.tid,
+ xlrec->target_node, xlrec->target_tid,
xlrec->cmin, xlrec->cmax,
xlrec->combocid);
cid = xlrec->cmin;
else
{
- cid = InvalidCommandId; /* silence compiler */
+ cid = InvalidCommandId; /* silence compiler */
elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
}
/*
* Iterate through all toplevel transactions. This can include
* subtransactions which we just don't yet know to be that, but that's
- * fine, they will just get an unneccesary snapshot queued.
+ * fine, they will just get an unnecessary snapshot queued.
*/
dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
{
(uint32) builder->committed.xcnt_space);
builder->committed.xip = repalloc(builder->committed.xip,
- builder->committed.xcnt_space * sizeof(TransactionId));
+ builder->committed.xcnt_space * sizeof(TransactionId));
}
/*
/*
* Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->commited array but via
+ * than ->xmin. Those won't ever get checked via the ->committed array but via
* the clog machinery, so we don't need to waste memory on them.
*/
static void
/*
* NB: This handles subtransactions correctly even if we started from
* suboverflowed xl_running_xacts because we only keep track of toplevel
- * transactions. Since the latter are always are allocated before their
+ * transactions. Since the latter are always allocated before their
* subxids and since they end at the same time it's sufficient to deal
* with them here.
*/
* so our incrementaly built snapshot now is consistent.
*/
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
- errdetail("xid %u finished, no running transactions anymore",
- xid)));
+ (errmsg("logical decoding found consistent point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("Transaction ID %u finished; no more running transactions.",
+ xid)));
builder->state = SNAPBUILD_CONSISTENT;
}
}
if (builder->state < SNAPBUILD_CONSISTENT)
{
/* ensure that only commits after this are getting replayed */
- if (builder->transactions_after < lsn)
- builder->transactions_after = lsn;
+ if (builder->start_decoding_at <= lsn)
+ builder->start_decoding_at = lsn + 1;
/*
* We could avoid treating !SnapBuildTxnIsRunning transactions as
* we reached consistency.
*/
forced_timetravel = true;
- elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running to early", xid);
+ elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running too early", xid);
}
for (nxact = 0; nxact < nsubxacts; nxact++)
/* refcount of the snapshot builder for the new snapshot */
SnapBuildSnapIncRefcount(builder->snapshot);
- /* add a new SnapshotNow to all currently running transactions */
+ /* add a new Snapshot to all currently running transactions */
SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
}
else
*/
if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+
/*
* No in-progress transaction, can reuse the last serialized snapshot if
* we have one.
*/
else if (txn == NULL &&
- builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
+ builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
builder->last_serialized_snapshot != InvalidXLogRecPtr)
LogicalIncreaseRestartDecodingForSlot(lsn,
- builder->last_serialized_snapshot);
+ builder->last_serialized_snapshot);
}
* the currently running transactions. There are several ways to do that:
*
* a) There were no running transactions when the xl_running_xacts record
- * was inserted, jump to CONSISTENT immediately. We might find such a
- * state we were waiting for b) and c).
+ * was inserted, jump to CONSISTENT immediately. We might find such a
+ * state we were waiting for b) and c).
*
* b) Wait for all toplevel transactions that were running to end. We
- * simply track the number of in-progress toplevel transactions and
- * lower it whenever one commits or aborts. When that number
- * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
- * to CONSISTENT.
+ * simply track the number of in-progress toplevel transactions and
+ * lower it whenever one commits or aborts. When that number
+ * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
+ * to CONSISTENT.
* NB: We need to search running.xip when seeing a transaction's end to
- * make sure it's a toplevel transaction and it's been one of the
- * intially running ones.
+ * make sure it's a toplevel transaction and it's been one of the
+ * initially running ones.
* Interestingly, in contrast to HS, this allows us not to care about
* subtransactions - and by extension suboverflowed xl_running_xacts -
* at all.
*
* c) This (in a previous run) or another decoding slot serialized a
- * snapshot to disk that we can use.
+ * snapshot to disk that we can use.
* ---
*/
builder->initial_xmin_horizon))
{
ereport(DEBUG1,
- (errmsg("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
- (uint32) (lsn >> 32), (uint32) lsn),
- errdetail("initial xmin horizon of %u vs the snapshot's %u",
- builder->initial_xmin_horizon, running->oldestRunningXid)));
+ (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
+ builder->initial_xmin_horizon, running->oldestRunningXid)));
return true;
}
*/
if (running->xcnt == 0)
{
- if (builder->transactions_after == InvalidXLogRecPtr ||
- builder->transactions_after < lsn)
- builder->transactions_after = lsn;
+ if (builder->start_decoding_at == InvalidXLogRecPtr ||
+ builder->start_decoding_at <= lsn)
+ /* can decode everything after this */
+ builder->start_decoding_at = lsn + 1;
- builder->xmin = running->oldestRunningXid;
- builder->xmax = running->latestCompletedXid;
- TransactionIdAdvance(builder->xmax);
+ /* As no transactions were running xmin/xmax can be trivially set. */
+ builder->xmin = running->nextXid; /* < are finished */
+ builder->xmax = running->nextXid; /* >= are running */
+ /* so we can safely use the faster comparisons */
Assert(TransactionIdIsNormal(builder->xmin));
Assert(TransactionIdIsNormal(builder->xmax));
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
- errdetail("running xacts with xcnt == 0")));
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("There are no running transactions.")));
return false;
}
/* there won't be any state to cleanup */
return false;
}
+
/*
* b) first encounter of a useable xl_running_xacts record. If we had
- * found one earlier we would either track running transactions
- * (i.e. builder->running.xcnt != 0) or be consistent (this function
- * wouldn't get called).
+ * found one earlier we would either track running transactions (i.e.
+ * builder->running.xcnt != 0) or be consistent (this function wouldn't
+ * get called).
*/
else if (!builder->running.xcnt)
{
- int off;
+ int off;
/*
* We only care about toplevel xids as those are the ones we
* instead of running transactions we don't need to know anything
* about uncommitted subtransactions.
*/
- builder->xmin = running->oldestRunningXid;
- builder->xmax = running->latestCompletedXid;
- TransactionIdAdvance(builder->xmax);
+
+ /*
+ * Start with an xmin/xmax that's correct for future, when all the
+ * currently running transactions have finished. We'll update both
+ * while waiting for the pending transactions to finish.
+ */
+ builder->xmin = running->nextXid; /* < are finished */
+ builder->xmax = running->nextXid; /* >= are running */
/* so we can safely use the faster comparisons */
Assert(TransactionIdIsNormal(builder->xmin));
builder->running.xcnt_space = running->xcnt;
builder->running.xip =
MemoryContextAlloc(builder->context,
- builder->running.xcnt * sizeof(TransactionId));
+ builder->running.xcnt * sizeof(TransactionId));
memcpy(builder->running.xip, running->xids,
builder->running.xcnt * sizeof(TransactionId));
builder->state = SNAPBUILD_FULL_SNAPSHOT;
ereport(LOG,
- (errmsg("logical decoding found initial starting point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
- errdetail("%u xacts need to finish", (uint32) builder->running.xcnt)));
+ (errmsg("logical decoding found initial starting point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail_plural("%u transaction needs to finish.",
+ "%u transactions need to finish.",
+ builder->running.xcnt,
+ (uint32) builder->running.xcnt)));
/*
* Iterate through all xids, wait for them to finish.
* isolationtester to notice that we're currently waiting for
* something.
*/
- for(off = 0; off < builder->running.xcnt; off++)
+ for (off = 0; off < builder->running.xcnt; off++)
{
TransactionId xid = builder->running.xip[off];
/* data not covered by checksum */
uint32 magic;
- pg_crc32 checksum;
+ pg_crc32c checksum;
/* data covered by checksum */
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 1
+#define SNAPBUILD_VERSION 2
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
char path[MAXPGPATH];
int ret;
struct stat stat_buf;
- uint32 sz;
+ Size sz;
Assert(lsn != InvalidXLogRecPtr);
Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
* unless the user used pg_resetxlog or similar. If a user did so, there's
* no hope continuing to decode anyway.
*/
- sprintf(path, "pg_llog/snapshots/%X-%X.snap",
+ sprintf(path, "pg_logical/snapshots/%X-%X.snap",
(uint32) (lsn >> 32), (uint32) lsn);
/*
* but remember location, so we don't need to read old data again.
*
* To be sure it has been synced to disk after the rename() from the
- * tempfile filename to the real filename, we just repeat the
- * fsync. That ought to be cheap because in most scenarios it should
- * already be safely on disk.
+ * tempfile filename to the real filename, we just repeat the fsync.
+ * That ought to be cheap because in most scenarios it should already
+ * be safely on disk.
*/
fsync_fname(path, false);
- fsync_fname("pg_llog/snapshots", true);
+ fsync_fname("pg_logical/snapshots", true);
builder->last_serialized_snapshot = lsn;
goto out;
elog(DEBUG1, "serializing snapshot to %s", path);
/* to make sure only we will write to this tempfile, include pid */
- sprintf(tmppath, "pg_llog/snapshots/%X-%X.snap.%u.tmp",
+ sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
(uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
/*
if (unlink(tmppath) != 0 && errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not unlink file \"%s\": %m", path)));
+ errmsg("could not remove file \"%s\": %m", path)));
needed_length = sizeof(SnapBuildOnDisk) +
sizeof(TransactionId) * builder->running.xcnt_space +
ondisk->magic = SNAPBUILD_MAGIC;
ondisk->version = SNAPBUILD_VERSION;
ondisk->length = needed_length;
- INIT_CRC32(ondisk->checksum);
- COMP_CRC32(ondisk->checksum,
- ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
- SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
+ INIT_CRC32C(ondisk->checksum);
+ COMP_CRC32C(ondisk->checksum,
+ ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
+ SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
ondisk_c += sizeof(SnapBuildOnDisk);
memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
ondisk->builder.running.xip = NULL;
ondisk->builder.committed.xip = NULL;
- COMP_CRC32(ondisk->checksum,
- &ondisk->builder,
- sizeof(SnapBuild));
+ COMP_CRC32C(ondisk->checksum,
+ &ondisk->builder,
+ sizeof(SnapBuild));
/* copy running xacts */
sz = sizeof(TransactionId) * builder->running.xcnt_space;
memcpy(ondisk_c, builder->running.xip, sz);
- COMP_CRC32(ondisk->checksum, ondisk_c, sz);
+ COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
ondisk_c += sz;
/* copy committed xacts */
sz = sizeof(TransactionId) * builder->committed.xcnt;
memcpy(ondisk_c, builder->committed.xip, sz);
- COMP_CRC32(ondisk->checksum, ondisk_c, sz);
+ COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
ondisk_c += sz;
+ FIN_CRC32C(ondisk->checksum);
+
/* we have valid data now, open tempfile and write it there */
fd = OpenTransientFile(tmppath,
O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
}
CloseTransientFile(fd);
- fsync_fname("pg_llog/snapshots", true);
+ fsync_fname("pg_logical/snapshots", true);
/*
* We may overwrite the work from some other backend, but that's ok, our
- * snapshot is valid as well, we'll just have done some superflous work.
+ * snapshot is valid as well, we'll just have done some superfluous work.
*/
if (rename(tmppath, path) != 0)
{
/* make sure we persist */
fsync_fname(path, false);
- fsync_fname("pg_llog/snapshots", true);
+ fsync_fname("pg_logical/snapshots", true);
/*
- * Now there's no way we can loose the dumped state anymore, remember
- * this as a serialization point.
+ * Now there's no way we can loose the dumped state anymore, remember this
+ * as a serialization point.
*/
builder->last_serialized_snapshot = lsn;
char path[MAXPGPATH];
Size sz;
int readBytes;
- pg_crc32 checksum;
+ pg_crc32c checksum;
/* no point in loading a snapshot if we're already there */
if (builder->state == SNAPBUILD_CONSISTENT)
return false;
- sprintf(path, "pg_llog/snapshots/%X-%X.snap",
+ sprintf(path, "pg_logical/snapshots/%X-%X.snap",
(uint32) (lsn >> 32), (uint32) lsn);
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
* Make sure the snapshot had been stored safely to disk, that's normally
* cheap.
* Note that we do not need PANIC here, nobody will be able to use the
- * slot without fsyncing, and saving it won't suceed without an fsync()
+ * slot without fsyncing, and saving it won't succeed without an fsync()
* either...
* ----
*/
fsync_fname(path, false);
- fsync_fname("pg_llog/snapshots", true);
+ fsync_fname("pg_logical/snapshots", true);
/* read statically sized portion of snapshot */
if (ondisk.magic != SNAPBUILD_MAGIC)
ereport(ERROR,
- (errmsg("snapbuild state file \"%s\" has wrong magic %u instead of %u",
+ (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
path, ondisk.magic, SNAPBUILD_MAGIC)));
if (ondisk.version != SNAPBUILD_VERSION)
ereport(ERROR,
- (errmsg("snapbuild state file \"%s\" has unsupported version %u instead of %u",
+ (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
path, ondisk.version, SNAPBUILD_VERSION)));
- INIT_CRC32(checksum);
- COMP_CRC32(checksum,
- ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
- SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
+ INIT_CRC32C(checksum);
+ COMP_CRC32C(checksum,
+ ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
+ SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
/* read SnapBuild */
readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
errmsg("could not read file \"%s\", read %d of %d: %m",
path, readBytes, (int) sizeof(SnapBuild))));
}
- COMP_CRC32(checksum, &ondisk.builder, sizeof(SnapBuild));
+ COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
/* restore running xacts information */
sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
- ondisk.builder.running.xip = MemoryContextAlloc(builder->context, sz);
+ ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
readBytes = read(fd, ondisk.builder.running.xip, sz);
if (readBytes != sz)
{
errmsg("could not read file \"%s\", read %d of %d: %m",
path, readBytes, (int) sz)));
}
- COMP_CRC32(checksum, ondisk.builder.running.xip, sz);
+ COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
/* restore committed xacts information */
sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
- ondisk.builder.committed.xip = MemoryContextAlloc(builder->context, sz);
+ ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
readBytes = read(fd, ondisk.builder.committed.xip, sz);
if (readBytes != sz)
{
errmsg("could not read file \"%s\", read %d of %d: %m",
path, readBytes, (int) sz)));
}
- COMP_CRC32(checksum, ondisk.builder.committed.xip, sz);
+ COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
CloseTransientFile(fd);
+ FIN_CRC32C(checksum);
+
/* verify checksum of what we've read */
- if (!EQ_CRC32(checksum, ondisk.checksum))
+ if (!EQ_CRC32C(checksum, ondisk.checksum))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("snapbuild state file %s: checksum mismatch, is %u, should be %u",
+ errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
path, checksum, ondisk.checksum)));
/*
/*
* We are only interested in consistent snapshots for now, comparing
- * whether one imcomplete snapshot is more "advanced" seems to be
+ * whether one incomplete snapshot is more "advanced" seems to be
* unnecessarily complex.
*/
if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
}
ondisk.builder.committed.xip = NULL;
- builder->running.xcnt = ondisk.builder.committed.xcnt;
+ builder->running.xcnt = ondisk.builder.running.xcnt;
if (builder->running.xip)
pfree(builder->running.xip);
- builder->running.xcnt_space = ondisk.builder.committed.xcnt_space;
+ builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
builder->running.xip = ondisk.builder.running.xip;
/* our snapshot is not interesting anymore, build a new one */
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
- errdetail("found initial snapshot in snapbuild file")));
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("Logical decoding will begin using saved snapshot.")));
return true;
snapshot_not_interesting:
if (redo < cutoff)
cutoff = redo;
- snap_dir = AllocateDir("pg_llog/snapshots");
- while ((snap_de = ReadDir(snap_dir, "pg_llog/snapshots")) != NULL)
+ snap_dir = AllocateDir("pg_logical/snapshots");
+ while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
{
uint32 hi;
uint32 lo;
XLogRecPtr lsn;
- struct stat statbuf;
+ struct stat statbuf;
if (strcmp(snap_de->d_name, ".") == 0 ||
strcmp(snap_de->d_name, "..") == 0)
continue;
- snprintf(path, MAXPGPATH, "pg_llog/snapshots/%s", snap_de->d_name);
+ snprintf(path, MAXPGPATH, "pg_logical/snapshots/%s", snap_de->d_name);
if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
{
/*
* temporary filenames from SnapBuildSerialize() include the LSN and
* everything but are postfixed by .$pid.tmp. We can just remove them
- * the same as other files because there can be none that are currently
- * being written that are older than cutoff.
+ * the same as other files because there can be none that are
+ * currently being written that are older than cutoff.
*
* We just log a message if a file doesn't fit the pattern, it's
* probably some editors lock/state file or similar...
if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
{
ereport(LOG,
- (errmsg("could not parse filename \"%s\"", path)));
+ (errmsg("could not parse file name \"%s\"", path)));
continue;
}
{
ereport(LOG,
(errcode_for_file_access(),
- errmsg("could not unlink file \"%s\": %m",
+ errmsg("could not remove file \"%s\": %m",
path)));
continue;
}