From 1573995f55994ee04dd0d69481de17d662ad8e88 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Thu, 19 Jul 2018 14:15:44 -0400 Subject: [PATCH] Rewrite comments in replication slot advance implementation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The code added by 9c7d06d60680 was a bit obscure; clarify that by rewriting the comments. Lack of clarity has already caused bugs, so it's a worthy goal. Co-authored-by: Arseny Sher Co-authored-by: Michaël Paquier Co-authored-by: Álvaro Herrera Reviewed-by: Petr Jelínek Discussion: https://postgr.es/m/87y3fgoyrn.fsf@ars-thinkpad --- src/backend/replication/logical/logical.c | 5 +- src/backend/replication/slotfuncs.c | 72 +++++++++++++++-------- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c2d0e0c723..3cd4eefb9b 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -338,7 +338,10 @@ CreateInitDecodingContext(char *plugin, * that, see below). * * output_plugin_options - * contains options passed to the output plugin. + * options passed to the output plugin. + * + * fast_forward + * bypass the generation of logical changes. * * read_page, prepare_write, do_write, update_progress * callbacks that have to be filled to perform the use-case dependent, diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6d7474a976..8782bad4a2 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) } /* - * Helper function for advancing physical replication slot forward. - * The LSN position to move to is compared simply to the slot's - * restart_lsn, knowing that any position older than that would be - * removed by successive checkpoints. + * Helper function for advancing our physical replication slot forward. + * + * The LSN position to move to is compared simply to the slot's restart_lsn, + * knowing that any position older than that would be removed by successive + * checkpoints. */ static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr moveto) @@ -340,59 +341,78 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) } /* - * Helper function for advancing logical replication slot forward. + * Helper function for advancing our logical replication slot forward. + * * The slot's restart_lsn is used as start point for reading records, * while confirmed_lsn is used as base point for the decoding context. - * The LSN position to move to is checked by doing a per-record scan and - * logical decoding which makes sure that confirmed_lsn is updated to a - * LSN which allows the future slot consumer to get consistent logical - * changes. + * + * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush, + * because we need to digest WAL to advance restart_lsn allowing to recycle + * WAL and removal of old catalog tuples. As decoding is done in fast_forward + * mode, no changes are generated anyway. */ static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto) { LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; - XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; - XLogRecPtr retlsn = MyReplicationSlot->data.confirmed_flush; + XLogRecPtr startlsn; + XLogRecPtr retlsn; PG_TRY(); { - /* restart at slot's confirmed_flush */ + /* + * Create our decoding context in fast_forward mode, passing start_lsn + * as InvalidXLogRecPtr, so that we start processing from my slot's + * confirmed_flush. + */ ctx = CreateDecodingContext(InvalidXLogRecPtr, NIL, - true, + true, /* fast_forward */ logical_read_local_xlog_page, NULL, NULL, NULL); + /* + * Start reading at the slot's restart_lsn, which we know to point to + * a valid record. + */ + startlsn = MyReplicationSlot->data.restart_lsn; + + /* Initialize our return value in case we don't do anything */ + retlsn = MyReplicationSlot->data.confirmed_flush; + /* invalidate non-timetravel entries */ InvalidateSystemCaches(); - /* Decode until we run out of records */ - while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) || - (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto)) + /* Decode at least one record, until we run out of records */ + while ((!XLogRecPtrIsInvalid(startlsn) && + startlsn < moveto) || + (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) && + ctx->reader->EndRecPtr < moveto)) { - XLogRecord *record; char *errm = NULL; + XLogRecord *record; + /* + * Read records. No changes are generated in fast_forward mode, + * but snapbuilder/slot statuses are updated properly. + */ record = XLogReadRecord(ctx->reader, startlsn, &errm); if (errm) elog(ERROR, "%s", errm); - /* - * Now that we've set up the xlog reader state, subsequent calls - * pass InvalidXLogRecPtr to say "continue from last record" - */ + /* Read sequentially from now on */ startlsn = InvalidXLogRecPtr; /* - * The {begin_txn,change,commit_txn}_wrapper callbacks above will - * store the description into our tuplestore. + * Process the record. Storage-level changes are ignored in + * fast_forward mode, but other modules (such as snapbuilder) + * might still have critical updates to do. */ - if (record != NULL) + if (record) LogicalDecodingProcessRecord(ctx, ctx->reader); - /* Stop once the moving point wanted by caller has been reached */ + /* Stop once the requested target has been reached */ if (moveto <= ctx->reader->EndRecPtr) break; @@ -411,7 +431,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) LogicalConfirmReceivedLocation(moveto); /* - * If only the confirmed_flush_lsn has changed the slot won't get + * If only the confirmed_flush LSN has changed the slot won't get * marked as dirty by the above. Callers on the walsender * interface are expected to keep track of their own progress and * don't need it written out. But SQL-interface users cannot -- 2.40.0