granicus.if.org Git - postgresql/commitdiff
Rewrite comments in replication slot advance implementation
author Alvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 19 Jul 2018 18:15:44 +0000 (14:15 -0400)
committer Alvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 19 Jul 2018 18:15:44 +0000 (14:15 -0400)
The code added by 9c7d06d60680 was a bit obscure; clarify that by
rewriting the comments.  Lack of clarity has already caused bugs, so
it's a worthy goal.

Co-authored-by: Arseny Sher <a.sher@postgrespro.ru>
Co-authored-by: Michaël Paquier <michael@paquier.xyz>
Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Petr Jelínek <petr.jelinek@2ndquadrant.com>
Discussion: https://postgr.es/m/87y3fgoyrn.fsf@ars-thinkpad

src/backend/replication/logical/logical.c
src/backend/replication/slotfuncs.c

index 61588d626f608006196c769ad9807f1d3ac592e9..c9bbdcda74d40b7ae7a70a8130fb009cfc643330 100644 (file)
@@ -338,7 +338,10 @@ CreateInitDecodingContext(char *plugin,
  *             that, see below).
  *
  * output_plugin_options
- *             contains options passed to the output plugin.
+ *             options passed to the output plugin.
+ *
+ * fast_forward
+ *             bypass the generation of logical changes.
  *
  * read_page, prepare_write, do_write, update_progress
  *             callbacks that have to be filled to perform the use-case dependent,
index 23af32355b79fc09dfdaeaaddcf08e3cf00329e9..9c8c86f12b78d9bed3e82a0cd72d62fa1a027c6e 100644 (file)
@@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 }
 
 /*
- * Helper function for advancing physical replication slot forward.
- * The LSN position to move to is compared simply to the slot's
- * restart_lsn, knowing that any position older than that would be
- * removed by successive checkpoints.
+ * Helper function for advancing our physical replication slot forward.
+ *
+ * The LSN position to move to is compared simply to the slot's restart_lsn,
+ * knowing that any position older than that would be removed by successive
+ * checkpoints.
  */
 static XLogRecPtr
 pg_physical_replication_slot_advance(XLogRecPtr moveto)
@@ -340,68 +341,89 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 }
 
 /*
- * Helper function for advancing logical replication slot forward.
+ * Helper function for advancing our logical replication slot forward.
+ *
  * The slot's restart_lsn is used as start point for reading records,
  * while confirmed_lsn is used as base point for the decoding context.
- * The LSN position to move to is checked by doing a per-record scan and
- * logical decoding which makes sure that confirmed_lsn is updated to a
- * LSN which allows the future slot consumer to get consistent logical
- * changes.
+ *
+ * We cannot just call LogicalConfirmReceivedLocation to update
+ * confirmed_flush, because we need to digest WAL to advance restart_lsn,
+ * which allows WAL to be recycled and old catalog tuples to be removed.  As
+ * decoding is done in fast_forward mode, no changes are generated anyway.
  */
 static XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
        LogicalDecodingContext *ctx;
        ResourceOwner old_resowner = CurrentResourceOwner;
-       XLogRecPtr      startlsn = MyReplicationSlot->data.restart_lsn;
-       XLogRecPtr      retlsn = MyReplicationSlot->data.confirmed_flush;
+       XLogRecPtr      startlsn;
+       XLogRecPtr      retlsn;
 
        PG_TRY();
        {
-               /* restart at slot's confirmed_flush */
+               /*
+                * Create our decoding context in fast_forward mode, passing start_lsn
+                * as InvalidXLogRecPtr, so that we start processing from my slot's
+                * confirmed_flush.
+                */
                ctx = CreateDecodingContext(InvalidXLogRecPtr,
                                                                        NIL,
-                                                                       true,
+                                                                       true,   /* fast_forward */
                                                                        logical_read_local_xlog_page,
                                                                        NULL, NULL, NULL);
 
-               CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
-                                                                                                  "logical decoding");
+               /*
+                * Start reading at the slot's restart_lsn, which we know to point to
+                * a valid record.
+                */
+               startlsn = MyReplicationSlot->data.restart_lsn;
+
+               /* Initialize our return value in case we don't do anything */
+               retlsn = MyReplicationSlot->data.confirmed_flush;
 
                /* invalidate non-timetravel entries */
                InvalidateSystemCaches();
 
-               /* Decode until we run out of records */
-               while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
-                          (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+               /* Decode at least one record, until we run out of records */
+               while ((!XLogRecPtrIsInvalid(startlsn) &&
+                               startlsn < moveto) ||
+                          (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
+                               ctx->reader->EndRecPtr < moveto))
                {
-                       XLogRecord *record;
                        char       *errm = NULL;
+                       XLogRecord *record;
 
+                       /*
+                        * Read records.  No changes are generated in fast_forward mode,
+                        * but snapbuilder/slot statuses are updated properly.
+                        */
                        record = XLogReadRecord(ctx->reader, startlsn, &errm);
                        if (errm)
                                elog(ERROR, "%s", errm);
 
-                       /*
-                        * Now that we've set up the xlog reader state, subsequent calls
-                        * pass InvalidXLogRecPtr to say "continue from last record"
-                        */
+                       /* Read sequentially from now on */
                        startlsn = InvalidXLogRecPtr;
 
                        /*
-                        * The {begin_txn,change,commit_txn}_wrapper callbacks above will
-                        * store the description into our tuplestore.
+                        * Process the record.  Storage-level changes are ignored in
+                        * fast_forward mode, but other modules (such as snapbuilder)
+                        * might still have critical updates to do.
                         */
-                       if (record != NULL)
+                       if (record)
                                LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-                       /* Stop once the moving point wanted by caller has been reached */
+                       /* Stop once the requested target has been reached */
                        if (moveto <= ctx->reader->EndRecPtr)
                                break;
 
                        CHECK_FOR_INTERRUPTS();
                }
 
+               /*
+                * Logical decoding could have clobbered CurrentResourceOwner during
+                * transaction management, so restore the executor's value.  (This is
+                * a kluge, but it's not worth cleaning up right now.)
+                */
                CurrentResourceOwner = old_resowner;
 
                if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
@@ -409,7 +431,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
                        LogicalConfirmReceivedLocation(moveto);
 
                        /*
-                        * If only the confirmed_flush_lsn has changed the slot won't get
+                        * If only the confirmed_flush LSN has changed the slot won't get
                         * marked as dirty by the above. Callers on the walsender
                         * interface are expected to keep track of their own progress and
                         * don't need it written out. But SQL-interface users cannot