From 024711bb544645c8b1061e9f02b261e2e336981d Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Fri, 12 May 2017 10:50:56 +0100 Subject: [PATCH] Lag tracking for logical replication Lag tracking is called for each commit, but we introduce a pacing delay to ensure we don't swamp the lag tracker. Author: Petr Jelinek, with minor pacing delay code from me --- src/backend/replication/logical/logical.c | 34 ++++++++++---- .../replication/logical/logicalfuncs.c | 2 +- src/backend/replication/pgoutput/pgoutput.c | 2 + src/backend/replication/slotfuncs.c | 3 +- src/backend/replication/walsender.c | 45 +++++++++++++++---- src/include/replication/logical.h | 15 +++++-- src/include/replication/output_plugin.h | 1 + 7 files changed, 79 insertions(+), 23 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index ab963c5345..7409e5ce3d 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress) { ReplicationSlot *slot; MemoryContext context, @@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; + ctx->update_progress = update_progress; ctx->output_plugin_options = output_plugin_options; @@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options, * * plugin contains the name of the output plugin * output_plugin_options contains options passed to the output plugin - * read_page, prepare_write, do_write are callbacks that have to be filled to - * perform the use-case dependent, actual, work. + * read_page, prepare_write, do_write, update_progress + * callbacks that have to be filled to perform the use-case dependent, + * actual, work. * * Needs to be called while in a memory context that's at least as long lived * as the decoding context because further memory contexts will be created @@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress) { TransactionId xmin_horizon = InvalidTransactionId; ReplicationSlot *slot; @@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin, ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, need_full_snapshot, read_page, prepare_write, - do_write); + do_write, update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin, * output_plugin_options * contains options passed to the output plugin. * - * read_page, prepare_write, do_write + * read_page, prepare_write, do_write, update_progress * callbacks that have to be filled to perform the use-case dependent, * actual work. * @@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress) { LogicalDecodingContext *ctx; ReplicationSlot *slot; @@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - read_page, prepare_write, do_write); + read_page, prepare_write, do_write, + update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -503,6 +509,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write) ctx->prepared_write = false; } +/* + * Update progress tracking (if supported). + */ +void +OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx) +{ + if (!ctx->update_progress) + return; + + ctx->update_progress(ctx, ctx->write_location, ctx->write_xid); +} + /* * Load the output plugin, lookup its output plugin init function, and check * that it provides the required callbacks. diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index c251b92f57..27164de093 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin options, logical_read_local_xlog_page, LogicalOutputPrepareWrite, - LogicalOutputWrite); + LogicalOutputWrite, NULL); MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index f3eaccffd5..4ddfbf7a98 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -244,6 +244,8 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + OutputPluginUpdateProgress(ctx); + OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); OutputPluginWrite(ctx, true); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6ee1e68819..56a9ca9651 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) */ ctx = CreateInitDecodingContext(NameStr(*plugin), NIL, false, /* do not build snapshot */ - logical_read_local_xlog_page, NULL, NULL); + logical_read_local_xlog_page, NULL, NULL, + NULL); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 45d027803a..e4e5337d54 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now); static long WalSndComputeSleeptime(TimestampTz now); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); +static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); +static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); @@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, logical_read_xlog_page, - WalSndPrepareWrite, WalSndWriteData); + WalSndPrepareWrite, WalSndWriteData, + WalSndUpdateProgress); /* * Signal that we don't need the timeout mechanism. We're just @@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd) * Initialize position to the last ack'ed one, then the xlog records begin * to be shipped from that position. */ - logical_decoding_ctx = CreateDecodingContext( - cmd->startpoint, cmd->options, + logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, logical_read_xlog_page, - WalSndPrepareWrite, WalSndWriteData); + WalSndPrepareWrite, + WalSndWriteData, + WalSndUpdateProgress); /* Start reading WAL from the oldest required WAL. */ logical_startptr = MyReplicationSlot->data.restart_lsn; @@ -1239,6 +1243,30 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, SetLatch(MyLatch); } +/* + * LogicalDecodingContext 'progress_update' callback. + * + * Write the current position to the log tracker (see XLogSendPhysical). + */ +static void +WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) +{ + static TimestampTz sendTime = 0; + TimestampTz now = GetCurrentTimestamp(); + + /* + * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS + * to avoid flooding the lag tracker when we commit frequently. + */ +#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 + if (!TimestampDifferenceExceeds(sendTime, now, + WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) + return; + + LagTrackerWrite(lsn, now); + sendTime = now; +} + /* * Wait till WAL < loc is flushed to disk so it can be safely read. */ @@ -2730,9 +2758,9 @@ XLogSendLogical(void) if (record != NULL) { /* - * Note the lack of any call to LagTrackerWrite() which is the responsibility - * of the logical decoding plugin. Response messages are handled normally, - * so this responsibility does not extend to needing to call LagTrackerRead(). + * Note the lack of any call to LagTrackerWrite() which is handled + * by WalSndUpdateProgress which is called by output plugin through + * logical decoding write api. */ LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); @@ -3328,9 +3356,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now) * LagTrackerRead can compute the elapsed time (lag) when this WAL position is * eventually reported to have been written, flushed and applied by the * standby in a reply message. - * Exported to allow logical decoding plugins to call this when they choose. */ -void +static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) { bool buffer_full; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index d0b2e0bbae..090f9c8268 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) ( typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite; +typedef void (*LogicalOutputPluginWriterUpdateProgress) ( + struct LogicalDecodingContext *lr, + XLogRecPtr Ptr, + TransactionId xid +); + typedef struct LogicalDecodingContext { /* memory context this is all allocated in */ @@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext */ LogicalOutputPluginWriterPrepareWrite prepare_write; LogicalOutputPluginWriterWrite write; + LogicalOutputPluginWriterUpdateProgress update_progress; /* * Output buffer. @@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write); + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress); extern LogicalDecodingContext *CreateDecodingContext( XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write); + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress); extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void FreeDecodingContext(LogicalDecodingContext *ctx); @@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); -extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); - extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); #endif diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 08e962d0c0..2435e2be2d 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks /* Functions in replication/logical/logical.c */ extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write); extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write); +extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx); #endif /* OUTPUT_PLUGIN_H */ -- 2.40.0