X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Freplication%2Flogical%2Fsnapbuild.c;h=8b59fc5a16a15cabac8816edb44215824dd92bf2;hb=ea268cdc9a2631da4a5748b00059a9fd43470d0e;hp=c462e9059d64018db90a37c32f36da521d2feb48;hpb=150a9df5288d2ba59a26767659a99c44b683fe8f;p=postgresql diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index c462e9059d..8b59fc5a16 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -28,7 +28,7 @@ * * As the percentage of transactions modifying the catalog normally is fairly * small in comparisons to ones only manipulating user data, we keep track of - * the committed catalog modifying ones inside (xmin, xmax) instead of keeping + * the committed catalog modifying ones inside [xmin, xmax) instead of keeping * track of all running transactions like it's done in a normal snapshot. Note * that we're generally only looking at transactions that have acquired an * xid. That is we keep a list of transactions between snapshot->(xmin, xmax) @@ -57,29 +57,29 @@ * * The snapbuild machinery is starting up in several stages, as illustrated * by the following graph: - * +-------------------------+ - * +----|SNAPBUILD_START |-------------+ - * | +-------------------------+ | - * | | | - * | | | - * | running_xacts with running xacts | - * | | | - * | | | - * | v | - * | +-------------------------+ v - * | |SNAPBUILD_FULL_SNAPSHOT |------------>| - * | +-------------------------+ | - * running_xacts | saved snapshot - * with zero xacts | at running_xacts's lsn - * | | | - * | all running toplevel TXNs finished | - * | | | - * | v | - * | +-------------------------+ | - * +--->|SNAPBUILD_CONSISTENT |<------------+ - * +-------------------------+ + * +-------------------------+ + * +----|SNAPBUILD_START |-------------+ + * | +-------------------------+ | + * | | | + * | | | + * | running_xacts with running xacts | + * | | | + * | | | + * | v | + * | +-------------------------+ v + * | |SNAPBUILD_FULL_SNAPSHOT |------------>| + * | +-------------------------+ | + * running_xacts | saved snapshot + * with zero xacts | at running_xacts's lsn + * | | | + * | all running toplevel TXNs finished | + * | | | + * | v | + * | +-------------------------+ | + * +--->|SNAPBUILD_CONSISTENT |<------------+ + * +-------------------------+ * - * Initially the machinery is in the START stage. When a xl_running_xacts + * Initially the machinery is in the START stage. When an xl_running_xacts * record is read that is sufficiently new (above the safe xmin horizon), * there's a state transition. If there were no running xacts when the * runnign_xacts record was generated, we'll directly go into CONSISTENT @@ -96,7 +96,7 @@ * is a convenient point to initialize replication from, which is why we * export a snapshot at that point, which *can* be used to read normal data. * - * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * Copyright (c) 2012-2016, PostgreSQL Global Development Group * * IDENTIFICATION * src/backend/replication/snapbuild.c @@ -153,15 +153,14 @@ struct SnapBuild TransactionId xmax; /* - * Don't replay commits from an LSN <= this LSN. This can be set - * externally but it will also be advanced (never retreat) from within - * snapbuild.c. + * Don't replay commits from an LSN < this LSN. This can be set externally + * but it will also be advanced (never retreat) from within snapbuild.c. */ - XLogRecPtr transactions_after; + XLogRecPtr start_decoding_at; /* * Don't start decoding WAL until the "xl_running_xacts" information - * indicates there are no running xids with a xid smaller than this. + * indicates there are no running xids with an xid smaller than this. */ TransactionId initial_xmin_horizon; @@ -184,7 +183,7 @@ struct SnapBuild * Information about initially running transactions * * When we start building a snapshot there already may be transactions in - * progress. Those are stored in running.xip. We don't have enough + * progress. Those are stored in running.xip. We don't have enough * information about those to decode their contents, so until they are * finished (xcnt=0) we cannot switch to a CONSISTENT state. */ @@ -243,8 +242,8 @@ struct SnapBuild * Starting a transaction -- which we need to do while exporting a snapshot -- * removes knowledge about the previously used resowner, so we save it here. */ -ResourceOwner SavedResourceOwnerDuringExport = NULL; -bool ExportInProgress = false; +static ResourceOwner SavedResourceOwnerDuringExport = NULL; +static bool ExportInProgress = false; /* transaction state manipulation functions */ static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid); @@ -290,9 +289,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, /* allocate memory in own context, to have better accountability */ context = AllocSetContextCreate(CurrentMemoryContext, "snapshot builder context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + ALLOCSET_DEFAULT_SIZES); oldcontext = MemoryContextSwitchTo(context); builder = palloc0(sizeof(SnapBuild)); @@ -307,10 +304,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->committed.xip = palloc0(builder->committed.xcnt_space * sizeof(TransactionId)); builder->committed.includes_all_transactions = true; - builder->committed.xip = - palloc0(builder->committed.xcnt_space * sizeof(TransactionId)); + builder->initial_xmin_horizon = xmin_horizon; - builder->transactions_after = start_lsn; + builder->start_decoding_at = start_lsn; MemoryContextSwitchTo(oldcontext); @@ -349,7 +345,7 @@ SnapBuildFreeSnapshot(Snapshot snap) Assert(snap->curcid == FirstCommandId); Assert(!snap->suboverflowed); Assert(!snap->takenDuringRecovery); - Assert(snap->regd_count == 1); + Assert(snap->regd_count == 0); /* slightly more likely, so it's checked even without c-asserts */ if (snap->copied) @@ -376,7 +372,7 @@ SnapBuildCurrentState(SnapBuild *builder) bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr) { - return ptr <= builder->transactions_after; + return ptr < builder->start_decoding_at; } /* @@ -408,16 +404,16 @@ SnapBuildSnapDecRefcount(Snapshot snap) Assert(!snap->suboverflowed); Assert(!snap->takenDuringRecovery); - Assert(snap->regd_count == 1); + Assert(snap->regd_count == 0); - Assert(snap->active_count); + Assert(snap->active_count > 0); /* slightly more likely, so it's checked even without casserts */ if (snap->copied) elog(ERROR, "cannot free a copied snapshot"); snap->active_count--; - if (!snap->active_count) + if (snap->active_count == 0) SnapBuildFreeSnapshot(snap); } @@ -496,7 +492,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid) snapshot->copied = false; snapshot->curcid = FirstCommandId; snapshot->active_count = 0; - snapshot->regd_count = 1; /* mark as registered so nobody frees it */ + snapshot->regd_count = 0; return snapshot; } @@ -599,20 +595,41 @@ SnapBuildExportSnapshot(SnapBuild *builder) snapname = ExportSnapshot(snap); ereport(LOG, - (errmsg("exported logical decoding snapshot: \"%s\" with %u xids", - snapname, snap->xcnt))); + (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID", + "exported logical decoding snapshot: \"%s\" with %u transaction IDs", + snap->xcnt, + snapname, snap->xcnt))); return snapname; } +/* + * Ensure there is a snapshot and if not build one for current transaction. + */ +Snapshot +SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid) +{ + Assert(builder->state == SNAPBUILD_CONSISTENT); + + /* only build a new snapshot if we don't have a prebuilt one */ + if (builder->snapshot == NULL) + { + builder->snapshot = SnapBuildBuildSnapshot(builder, xid); + /* inrease refcount for the snapshot builder */ + SnapBuildSnapIncRefcount(builder->snapshot); + } + + return builder->snapshot; +} + /* * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is * any. Aborts the previously started transaction and resets the resource * owner back to its original value. */ void -SnapBuildClearExportedSnapshot() +SnapBuildClearExportedSnapshot(void) { - /* nothing exported, thats the usual case */ + /* nothing exported, that is the usual case */ if (!ExportInProgress) return; @@ -635,8 +652,6 @@ SnapBuildClearExportedSnapshot() bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) { - bool is_old_tx; - /* * We can't handle data in transactions if we haven't built a snapshot * yet, so don't store them. @@ -657,9 +672,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) * If the reorderbuffer doesn't yet have a snapshot, add one now, it will * be needed to decode the change we're currently processing. */ - is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid); - - if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) + if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) { /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) @@ -682,8 +695,9 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) } /* - * Do CommandId/ComboCid handling after reading a xl_heap_new_cid record. This - * implies that a transaction has done some form of write to system catalogs. + * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record. + * This implies that a transaction has done some form of write to system + * catalogs. */ void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, @@ -692,13 +706,13 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, CommandId cid; /* - * we only log new_cid's if a catalog tuple was modified, so mark - * the transaction as containing catalog modifications + * we only log new_cid's if a catalog tuple was modified, so mark the + * transaction as containing catalog modifications */ - ReorderBufferXidSetCatalogChanges(builder->reorder, xid,lsn); + ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn, - xlrec->target.node, xlrec->target.tid, + xlrec->target_node, xlrec->target_tid, xlrec->cmin, xlrec->cmax, xlrec->combocid); @@ -712,7 +726,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, cid = xlrec->cmin; else { - cid = InvalidCommandId; /* silence compiler */ + cid = InvalidCommandId; /* silence compiler */ elog(ERROR, "xl_heap_new_cid record without a valid CommandId"); } @@ -768,7 +782,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) /* * Iterate through all toplevel transactions. This can include * subtransactions which we just don't yet know to be that, but that's - * fine, they will just get an unneccesary snapshot queued. + * fine, they will just get an unnecessary snapshot queued. */ dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn) { @@ -818,7 +832,7 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) (uint32) builder->committed.xcnt_space); builder->committed.xip = repalloc(builder->committed.xip, - builder->committed.xcnt_space * sizeof(TransactionId)); + builder->committed.xcnt_space * sizeof(TransactionId)); } /* @@ -831,7 +845,7 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) /* * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->commited array but via + * than ->xmin. Those won't ever get checked via the ->committed array but via * the clog machinery, so we don't need to waste memory on them. */ static void @@ -885,7 +899,7 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid) /* * NB: This handles subtransactions correctly even if we started from * suboverflowed xl_running_xacts because we only keep track of toplevel - * transactions. Since the latter are always are allocated before their + * transactions. Since the latter are always allocated before their * subxids and since they end at the same time it's sufficient to deal * with them here. */ @@ -900,10 +914,10 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid) * so our incrementaly built snapshot now is consistent. */ ereport(LOG, - (errmsg("logical decoding found consistent point at %X/%X", - (uint32)(lsn >> 32), (uint32)lsn), - errdetail("xid %u finished, no running transactions anymore", - xid))); + (errmsg("logical decoding found consistent point at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail("Transaction ID %u finished; no more running transactions.", + xid))); builder->state = SNAPBUILD_CONSISTENT; } } @@ -956,8 +970,8 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, if (builder->state < SNAPBUILD_CONSISTENT) { /* ensure that only commits after this are getting replayed */ - if (builder->transactions_after < lsn) - builder->transactions_after = lsn; + if (builder->start_decoding_at <= lsn) + builder->start_decoding_at = lsn + 1; /* * We could avoid treating !SnapBuildTxnIsRunning transactions as @@ -965,7 +979,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, * we reached consistency. */ forced_timetravel = true; - elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running to early", xid); + elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running too early", xid); } for (nxact = 0; nxact < nsubxacts; nxact++) @@ -1076,7 +1090,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, /* refcount of the snapshot builder for the new snapshot */ SnapBuildSnapIncRefcount(builder->snapshot); - /* add a new SnapshotNow to all currently running transactions */ + /* add a new Snapshot to all currently running transactions */ SnapBuildDistributeNewCatalogSnapshot(builder, lsn); } else @@ -1170,15 +1184,16 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact */ if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr) LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn); + /* * No in-progress transaction, can reuse the last serialized snapshot if * we have one. */ else if (txn == NULL && - builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr && + builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr && builder->last_serialized_snapshot != InvalidXLogRecPtr) LogicalIncreaseRestartDecodingForSlot(lsn, - builder->last_serialized_snapshot); + builder->last_serialized_snapshot); } @@ -1199,23 +1214,23 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * the currently running transactions. There are several ways to do that: * * a) There were no running transactions when the xl_running_xacts record - * was inserted, jump to CONSISTENT immediately. We might find such a - * state we were waiting for b) and c). + * was inserted, jump to CONSISTENT immediately. We might find such a + * state we were waiting for b) and c). * * b) Wait for all toplevel transactions that were running to end. We - * simply track the number of in-progress toplevel transactions and - * lower it whenever one commits or aborts. When that number - * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT - * to CONSISTENT. + * simply track the number of in-progress toplevel transactions and + * lower it whenever one commits or aborts. When that number + * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT + * to CONSISTENT. * NB: We need to search running.xip when seeing a transaction's end to - * make sure it's a toplevel transaction and it's been one of the - * intially running ones. + * make sure it's a toplevel transaction and it's been one of the + * initially running ones. * Interestingly, in contrast to HS, this allows us not to care about * subtransactions - and by extension suboverflowed xl_running_xacts - * at all. * * c) This (in a previous run) or another decoding slot serialized a - * snapshot to disk that we can use. + * snapshot to disk that we can use. * --- */ @@ -1228,10 +1243,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn builder->initial_xmin_horizon)) { ereport(DEBUG1, - (errmsg("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low", - (uint32) (lsn >> 32), (uint32) lsn), - errdetail("initial xmin horizon of %u vs the snapshot's %u", - builder->initial_xmin_horizon, running->oldestRunningXid))); + (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail_internal("initial xmin horizon of %u vs the snapshot's %u", + builder->initial_xmin_horizon, running->oldestRunningXid))); return true; } @@ -1243,14 +1258,16 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn */ if (running->xcnt == 0) { - if (builder->transactions_after == InvalidXLogRecPtr || - builder->transactions_after < lsn) - builder->transactions_after = lsn; + if (builder->start_decoding_at == InvalidXLogRecPtr || + builder->start_decoding_at <= lsn) + /* can decode everything after this */ + builder->start_decoding_at = lsn + 1; - builder->xmin = running->oldestRunningXid; - builder->xmax = running->latestCompletedXid; - TransactionIdAdvance(builder->xmax); + /* As no transactions were running xmin/xmax can be trivially set. */ + builder->xmin = running->nextXid; /* < are finished */ + builder->xmax = running->nextXid; /* >= are running */ + /* so we can safely use the faster comparisons */ Assert(TransactionIdIsNormal(builder->xmin)); Assert(TransactionIdIsNormal(builder->xmax)); @@ -1263,8 +1280,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn ereport(LOG, (errmsg("logical decoding found consistent point at %X/%X", - (uint32)(lsn >> 32), (uint32)lsn), - errdetail("running xacts with xcnt == 0"))); + (uint32) (lsn >> 32), (uint32) lsn), + errdetail("There are no running transactions."))); return false; } @@ -1274,15 +1291,16 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn /* there won't be any state to cleanup */ return false; } + /* * b) first encounter of a useable xl_running_xacts record. If we had - * found one earlier we would either track running transactions - * (i.e. builder->running.xcnt != 0) or be consistent (this function - * wouldn't get called). + * found one earlier we would either track running transactions (i.e. + * builder->running.xcnt != 0) or be consistent (this function wouldn't + * get called). */ else if (!builder->running.xcnt) { - int off; + int off; /* * We only care about toplevel xids as those are the ones we @@ -1290,9 +1308,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * instead of running transactions we don't need to know anything * about uncommitted subtransactions. */ - builder->xmin = running->oldestRunningXid; - builder->xmax = running->latestCompletedXid; - TransactionIdAdvance(builder->xmax); + + /* + * Start with an xmin/xmax that's correct for future, when all the + * currently running transactions have finished. We'll update both + * while waiting for the pending transactions to finish. + */ + builder->xmin = running->nextXid; /* < are finished */ + builder->xmax = running->nextXid; /* >= are running */ /* so we can safely use the faster comparisons */ Assert(TransactionIdIsNormal(builder->xmin)); @@ -1302,7 +1325,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn builder->running.xcnt_space = running->xcnt; builder->running.xip = MemoryContextAlloc(builder->context, - builder->running.xcnt * sizeof(TransactionId)); + builder->running.xcnt * sizeof(TransactionId)); memcpy(builder->running.xip, running->xids, builder->running.xcnt * sizeof(TransactionId)); @@ -1320,9 +1343,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn builder->state = SNAPBUILD_FULL_SNAPSHOT; ereport(LOG, - (errmsg("logical decoding found initial starting point at %X/%X", - (uint32)(lsn >> 32), (uint32)lsn), - errdetail("%u xacts need to finish", (uint32) builder->running.xcnt))); + (errmsg("logical decoding found initial starting point at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail_plural("%u transaction needs to finish.", + "%u transactions need to finish.", + builder->running.xcnt, + (uint32) builder->running.xcnt))); /* * Iterate through all xids, wait for them to finish. @@ -1331,7 +1357,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * isolationtester to notice that we're currently waiting for * something. */ - for(off = 0; off < builder->running.xcnt; off++) + for (off = 0; off < builder->running.xcnt; off++) { TransactionId xid = builder->running.xip[off]; @@ -1378,7 +1404,7 @@ typedef struct SnapBuildOnDisk /* data not covered by checksum */ uint32 magic; - pg_crc32 checksum; + pg_crc32c checksum; /* data covered by checksum */ @@ -1399,7 +1425,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 1 +#define SNAPBUILD_VERSION 2 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. @@ -1431,7 +1457,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) char path[MAXPGPATH]; int ret; struct stat stat_buf; - uint32 sz; + Size sz; Assert(lsn != InvalidXLogRecPtr); Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr || @@ -1450,7 +1476,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) * unless the user used pg_resetxlog or similar. If a user did so, there's * no hope continuing to decode anyway. */ - sprintf(path, "pg_llog/snapshots/%X-%X.snap", + sprintf(path, "pg_logical/snapshots/%X-%X.snap", (uint32) (lsn >> 32), (uint32) lsn); /* @@ -1471,12 +1497,12 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) * but remember location, so we don't need to read old data again. * * To be sure it has been synced to disk after the rename() from the - * tempfile filename to the real filename, we just repeat the - * fsync. That ought to be cheap because in most scenarios it should - * already be safely on disk. + * tempfile filename to the real filename, we just repeat the fsync. + * That ought to be cheap because in most scenarios it should already + * be safely on disk. */ fsync_fname(path, false); - fsync_fname("pg_llog/snapshots", true); + fsync_fname("pg_logical/snapshots", true); builder->last_serialized_snapshot = lsn; goto out; @@ -1492,7 +1518,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) elog(DEBUG1, "serializing snapshot to %s", path); /* to make sure only we will write to this tempfile, include pid */ - sprintf(tmppath, "pg_llog/snapshots/%X-%X.snap.%u.tmp", + sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp", (uint32) (lsn >> 32), (uint32) lsn, MyProcPid); /* @@ -1504,7 +1530,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) if (unlink(tmppath) != 0 && errno != ENOENT) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not unlink file \"%s\": %m", path))); + errmsg("could not remove file \"%s\": %m", path))); needed_length = sizeof(SnapBuildOnDisk) + sizeof(TransactionId) * builder->running.xcnt_space + @@ -1515,10 +1541,10 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk->magic = SNAPBUILD_MAGIC; ondisk->version = SNAPBUILD_VERSION; ondisk->length = needed_length; - INIT_CRC32(ondisk->checksum); - COMP_CRC32(ondisk->checksum, - ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize, - SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); + INIT_CRC32C(ondisk->checksum); + COMP_CRC32C(ondisk->checksum, + ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize, + SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); ondisk_c += sizeof(SnapBuildOnDisk); memcpy(&ondisk->builder, builder, sizeof(SnapBuild)); @@ -1529,22 +1555,24 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk->builder.running.xip = NULL; ondisk->builder.committed.xip = NULL; - COMP_CRC32(ondisk->checksum, - &ondisk->builder, - sizeof(SnapBuild)); + COMP_CRC32C(ondisk->checksum, + &ondisk->builder, + sizeof(SnapBuild)); /* copy running xacts */ sz = sizeof(TransactionId) * builder->running.xcnt_space; memcpy(ondisk_c, builder->running.xip, sz); - COMP_CRC32(ondisk->checksum, ondisk_c, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); ondisk_c += sz; /* copy committed xacts */ sz = sizeof(TransactionId) * builder->committed.xcnt; memcpy(ondisk_c, builder->committed.xip, sz); - COMP_CRC32(ondisk->checksum, ondisk_c, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); ondisk_c += sz; + FIN_CRC32C(ondisk->checksum); + /* we have valid data now, open tempfile and write it there */ fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, @@ -1578,11 +1606,11 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) } CloseTransientFile(fd); - fsync_fname("pg_llog/snapshots", true); + fsync_fname("pg_logical/snapshots", true); /* * We may overwrite the work from some other backend, but that's ok, our - * snapshot is valid as well, we'll just have done some superflous work. + * snapshot is valid as well, we'll just have done some superfluous work. */ if (rename(tmppath, path) != 0) { @@ -1594,11 +1622,11 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) /* make sure we persist */ fsync_fname(path, false); - fsync_fname("pg_llog/snapshots", true); + fsync_fname("pg_logical/snapshots", true); /* - * Now there's no way we can loose the dumped state anymore, remember - * this as a serialization point. + * Now there's no way we can loose the dumped state anymore, remember this + * as a serialization point. */ builder->last_serialized_snapshot = lsn; @@ -1619,13 +1647,13 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) char path[MAXPGPATH]; Size sz; int readBytes; - pg_crc32 checksum; + pg_crc32c checksum; /* no point in loading a snapshot if we're already there */ if (builder->state == SNAPBUILD_CONSISTENT) return false; - sprintf(path, "pg_llog/snapshots/%X-%X.snap", + sprintf(path, "pg_logical/snapshots/%X-%X.snap", (uint32) (lsn >> 32), (uint32) lsn); fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0); @@ -1641,12 +1669,12 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) * Make sure the snapshot had been stored safely to disk, that's normally * cheap. * Note that we do not need PANIC here, nobody will be able to use the - * slot without fsyncing, and saving it won't suceed without an fsync() + * slot without fsyncing, and saving it won't succeed without an fsync() * either... * ---- */ fsync_fname(path, false); - fsync_fname("pg_llog/snapshots", true); + fsync_fname("pg_logical/snapshots", true); /* read statically sized portion of snapshot */ @@ -1662,18 +1690,18 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) if (ondisk.magic != SNAPBUILD_MAGIC) ereport(ERROR, - (errmsg("snapbuild state file \"%s\" has wrong magic %u instead of %u", + (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u", path, ondisk.magic, SNAPBUILD_MAGIC))); if (ondisk.version != SNAPBUILD_VERSION) ereport(ERROR, - (errmsg("snapbuild state file \"%s\" has unsupported version %u instead of %u", + (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u", path, ondisk.version, SNAPBUILD_VERSION))); - INIT_CRC32(checksum); - COMP_CRC32(checksum, - ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize, - SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); + INIT_CRC32C(checksum); + COMP_CRC32C(checksum, + ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize, + SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); /* read SnapBuild */ readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild)); @@ -1685,11 +1713,11 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) errmsg("could not read file \"%s\", read %d of %d: %m", path, readBytes, (int) sizeof(SnapBuild)))); } - COMP_CRC32(checksum, &ondisk.builder, sizeof(SnapBuild)); + COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild)); /* restore running xacts information */ sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space; - ondisk.builder.running.xip = MemoryContextAlloc(builder->context, sz); + ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz); readBytes = read(fd, ondisk.builder.running.xip, sz); if (readBytes != sz) { @@ -1699,11 +1727,11 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) errmsg("could not read file \"%s\", read %d of %d: %m", path, readBytes, (int) sz))); } - COMP_CRC32(checksum, ondisk.builder.running.xip, sz); + COMP_CRC32C(checksum, ondisk.builder.running.xip, sz); /* restore committed xacts information */ sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; - ondisk.builder.committed.xip = MemoryContextAlloc(builder->context, sz); + ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz); readBytes = read(fd, ondisk.builder.committed.xip, sz); if (readBytes != sz) { @@ -1713,15 +1741,17 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) errmsg("could not read file \"%s\", read %d of %d: %m", path, readBytes, (int) sz))); } - COMP_CRC32(checksum, ondisk.builder.committed.xip, sz); + COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz); CloseTransientFile(fd); + FIN_CRC32C(checksum); + /* verify checksum of what we've read */ - if (!EQ_CRC32(checksum, ondisk.checksum)) + if (!EQ_CRC32C(checksum, ondisk.checksum)) ereport(ERROR, (errcode_for_file_access(), - errmsg("snapbuild state file %s: checksum mismatch, is %u, should be %u", + errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u", path, checksum, ondisk.checksum))); /* @@ -1731,7 +1761,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) /* * We are only interested in consistent snapshots for now, comparing - * whether one imcomplete snapshot is more "advanced" seems to be + * whether one incomplete snapshot is more "advanced" seems to be * unnecessarily complex. */ if (ondisk.builder.state < SNAPBUILD_CONSISTENT) @@ -1761,10 +1791,10 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) } ondisk.builder.committed.xip = NULL; - builder->running.xcnt = ondisk.builder.committed.xcnt; + builder->running.xcnt = ondisk.builder.running.xcnt; if (builder->running.xip) pfree(builder->running.xip); - builder->running.xcnt_space = ondisk.builder.committed.xcnt_space; + builder->running.xcnt_space = ondisk.builder.running.xcnt_space; builder->running.xip = ondisk.builder.running.xip; /* our snapshot is not interesting anymore, build a new one */ @@ -1781,8 +1811,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) ereport(LOG, (errmsg("logical decoding found consistent point at %X/%X", - (uint32)(lsn >> 32), (uint32)lsn), - errdetail("found initial snapshot in snapbuild file"))); + (uint32) (lsn >> 32), (uint32) lsn), + errdetail("Logical decoding will begin using saved snapshot."))); return true; snapshot_not_interesting: @@ -1823,19 +1853,19 @@ CheckPointSnapBuild(void) if (redo < cutoff) cutoff = redo; - snap_dir = AllocateDir("pg_llog/snapshots"); - while ((snap_de = ReadDir(snap_dir, "pg_llog/snapshots")) != NULL) + snap_dir = AllocateDir("pg_logical/snapshots"); + while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL) { uint32 hi; uint32 lo; XLogRecPtr lsn; - struct stat statbuf; + struct stat statbuf; if (strcmp(snap_de->d_name, ".") == 0 || strcmp(snap_de->d_name, "..") == 0) continue; - snprintf(path, MAXPGPATH, "pg_llog/snapshots/%s", snap_de->d_name); + snprintf(path, MAXPGPATH, "pg_logical/snapshots/%s", snap_de->d_name); if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode)) { @@ -1846,8 +1876,8 @@ CheckPointSnapBuild(void) /* * temporary filenames from SnapBuildSerialize() include the LSN and * everything but are postfixed by .$pid.tmp. We can just remove them - * the same as other files because there can be none that are currently - * being written that are older than cutoff. + * the same as other files because there can be none that are + * currently being written that are older than cutoff. * * We just log a message if a file doesn't fit the pattern, it's * probably some editors lock/state file or similar... @@ -1855,7 +1885,7 @@ CheckPointSnapBuild(void) if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2) { ereport(LOG, - (errmsg("could not parse filename \"%s\"", path))); + (errmsg("could not parse file name \"%s\"", path))); continue; } @@ -1875,7 +1905,7 @@ CheckPointSnapBuild(void) { ereport(LOG, (errcode_for_file_access(), - errmsg("could not unlink file \"%s\": %m", + errmsg("could not remove file \"%s\": %m", path))); continue; }