}
/*
- * Helper function for advancing physical replication slot forward.
- * The LSN position to move to is compared simply to the slot's
- * restart_lsn, knowing that any position older than that would be
- * removed by successive checkpoints.
+ * Helper function for advancing our physical replication slot forward.
+ *
+ * The LSN position to move to is compared simply to the slot's restart_lsn,
+ * knowing that any position older than that would be removed by successive
+ * checkpoints.
*/
static XLogRecPtr
pg_physical_replication_slot_advance(XLogRecPtr moveto)
}
/*
- * Helper function for advancing logical replication slot forward.
+ * Helper function for advancing our logical replication slot forward.
+ *
* The slot's restart_lsn is used as start point for reading records,
* while confirmed_lsn is used as base point for the decoding context.
- * The LSN position to move to is checked by doing a per-record scan and
- * logical decoding which makes sure that confirmed_lsn is updated to a
- * LSN which allows the future slot consumer to get consistent logical
- * changes.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
+ * WAL and removal of old catalog tuples. As decoding is done in fast_forward
+ * mode, no changes are generated anyway.
*/
static XLogRecPtr
pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
- XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
- XLogRecPtr retlsn = MyReplicationSlot->data.confirmed_flush;
+ XLogRecPtr startlsn;
+ XLogRecPtr retlsn;
PG_TRY();
{
- /* restart at slot's confirmed_flush */
+ /*
+ * Create our decoding context in fast_forward mode, passing start_lsn
+ * as InvalidXLogRecPtr, so that we start processing from my slot's
+ * confirmed_flush.
+ */
ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL,
- true,
+ true, /* fast_forward */
logical_read_local_xlog_page,
NULL, NULL, NULL);
- CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
- "logical decoding");
+ /*
+ * Start reading at the slot's restart_lsn, which we know to point to
+ * a valid record.
+ */
+ startlsn = MyReplicationSlot->data.restart_lsn;
+
+ /* Initialize our return value in case we don't do anything */
+ retlsn = MyReplicationSlot->data.confirmed_flush;
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
- /* Decode until we run out of records */
- while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
- (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+ /* Decode at least one record, until we run out of records */
+ while ((!XLogRecPtrIsInvalid(startlsn) &&
+ startlsn < moveto) ||
+ (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
+ ctx->reader->EndRecPtr < moveto))
{
- XLogRecord *record;
char *errm = NULL;
+ XLogRecord *record;
+ /*
+ * Read records. No changes are generated in fast_forward mode,
+ * but snapbuilder/slot statuses are updated properly.
+ */
record = XLogReadRecord(ctx->reader, startlsn, &errm);
if (errm)
elog(ERROR, "%s", errm);
- /*
- * Now that we've set up the xlog reader state, subsequent calls
- * pass InvalidXLogRecPtr to say "continue from last record"
- */
+ /* Read sequentially from now on */
startlsn = InvalidXLogRecPtr;
/*
- * The {begin_txn,change,commit_txn}_wrapper callbacks above will
- * store the description into our tuplestore.
+ * Process the record. Storage-level changes are ignored in
+ * fast_forward mode, but other modules (such as snapbuilder)
+ * might still have critical updates to do.
*/
- if (record != NULL)
+ if (record)
LogicalDecodingProcessRecord(ctx, ctx->reader);
- /* Stop once the moving point wanted by caller has been reached */
+ /* Stop once the requested target has been reached */
if (moveto <= ctx->reader->EndRecPtr)
break;
CHECK_FOR_INTERRUPTS();
}
+ /*
+ * Logical decoding could have clobbered CurrentResourceOwner during
+ * transaction management, so restore the executor's value. (This is
+ * a kluge, but it's not worth cleaning up right now.)
+ */
CurrentResourceOwner = old_resowner;
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
LogicalConfirmReceivedLocation(moveto);
/*
- * If only the confirmed_flush_lsn has changed the slot won't get
+ * If only the confirmed_flush LSN has changed the slot won't get
* marked as dirty by the above. Callers on the walsender
* interface are expected to keep track of their own progress and
* don't need it written out. But SQL-interface users cannot