* Create a new decoding context, for a logical slot that has previously been
* used already.
*
- * start_lsn contains the LSN of the last received data or InvalidXLogRecPtr
- * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- * perform the use-case dependent, actual, work.
+ * start_lsn
+ * The LSN at which to start decoding. If InvalidXLogRecPtr, restart
+ * from the slot's confirmed_flush; otherwise, start from the specified
+ * location (but move it forwards to confirmed_flush if it's older than
+ * that, see below).
+ *
+ * output_plugin_options
+ * contains options passed to the output plugin.
+ *
+ * read_page, prepare_write, do_write
+ * callbacks that have to be filled to perform the use-case dependent,
+ * actual work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
* replication slot.
*
* Note that in the most cases, we won't be able to immediately use the xmin
- * to increase the xmin horizon, we need to wait till the client has confirmed
+ * to increase the xmin horizon: we need to wait till the client has confirmed
* receiving current_lsn with LogicalConfirmReceivedLocation().
*/
void
MyReplicationSlot->data.confirmed_flush = lsn;
- /* if were past the location required for bumping xmin, do so */
+ /* if we're past the location required for bumping xmin, do so */
if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
MyReplicationSlot->candidate_xmin_lsn <= lsn)
{
SpinLockRelease(&MyReplicationSlot->mutex);
- /* first write new xmin to disk, so we know whats up after a crash */
+ /* first write new xmin to disk, so we know what's up after a crash */
if (updated_xmin || updated_restart)
{
ReplicationSlotMarkDirty();
PG_TRY();
{
- /*
- * Passing InvalidXLogRecPtr here causes replay to start at the slot's
- * confirmed_flush.
- */
+ /* restart at slot's confirmed_flush */
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
logical_read_local_xlog_page,
ctx->output_writer_private = p;
/*
- * We start reading xlog from the restart lsn, even though in
- * CreateDecodingContext we set the snapshot builder up using the
- * slot's confirmed_flush. This means we might read xlog we don't
- * actually decode rows from, but the snapshot builder might need it
- * to get to a consistent point. The point we start returning data to
- * *users* at is the confirmed_flush lsn set up in the decoding
- * context.
+ * Decoding of WAL must start at restart_lsn so that the entirety of
+ * xacts that committed after the slot's confirmed_flush can be
+ * accumulated into reorder buffers.
*/
startptr = MyReplicationSlot->data.restart_lsn;
/* oldest LSN that might be required by this replication slot */
XLogRecPtr restart_lsn;
- /* oldest LSN that the client has acked receipt for */
+ /*
+ * Oldest LSN that the client has acked receipt for. This is used as the
+ * start_lsn point in case the client doesn't specify one, and also as a
+ * safety measure to back off in case the client specifies a start_lsn
+ * that's further in the future than this value.
+ */
XLogRecPtr confirmed_flush;
/* plugin name */
/* all the remaining data is only used for logical slots */
- /* ----
+ /*
* When the client has confirmed flushes >= candidate_xmin_lsn we can
- * advance the catalog xmin, when restart_valid has been passed,
+ * advance the catalog xmin. When restart_valid has been passed,
* restart_lsn can be increased.
- * ----
*/
TransactionId candidate_catalog_xmin;
XLogRecPtr candidate_xmin_lsn;