From 489b96e80b96c0eda02575347654e87968f2f5f4 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Tue, 9 May 2017 14:40:42 -0400 Subject: [PATCH] Improve memory use in logical replication apply Previously, the memory used by the logical replication apply worker for processing messages would never be freed, so that could end up using a lot of memory. To improve that, change the existing ApplyContext memory context to ApplyMessageContext and reset that after every message (similar to MessageContext used elsewhere). For consistency of naming, rename the ApplyCacheContext to ApplyContext. Author: Stas Kelvich --- src/backend/replication/logical/worker.c | 51 ++++++++++++----------- src/backend/utils/mmgr/README | 11 +++++ src/include/replication/worker_internal.h | 4 +- 3 files changed, 40 insertions(+), 26 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 362de12457..04813b506e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg int attnum; } SlotErrCallbackArg; -static MemoryContext ApplyContext = NULL; -MemoryContext ApplyCacheContext = NULL; +static MemoryContext ApplyMessageContext = NULL; +MemoryContext ApplyContext = NULL; WalReceiverConn *wrconn = NULL; @@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) /* * Make sure that we started local transaction. * - * Also switches to ApplyContext as necessary. + * Also switches to ApplyMessageContext as necessary. */ static bool ensure_transaction(void) { if (IsTransactionState()) { - if (CurrentMemoryContext != ApplyContext) - MemoryContextSwitchTo(ApplyContext); + if (CurrentMemoryContext != ApplyMessageContext) + MemoryContextSwitchTo(ApplyMessageContext); + return false; } @@ -162,7 +163,7 @@ ensure_transaction(void) if (!MySubscriptionValid) reread_subscription(); - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); return true; } @@ -961,7 +962,7 @@ store_flush_position(XLogRecPtr remote_lsn) FlushPosition *flushpos; /* Need to do this in permanent context */ - MemoryContextSwitchTo(ApplyCacheContext); + MemoryContextSwitchTo(ApplyContext); /* Track commit lsn */ flushpos = (FlushPosition *) palloc(sizeof(FlushPosition)); @@ -969,7 +970,7 @@ store_flush_position(XLogRecPtr remote_lsn) flushpos->remote_end = remote_lsn; dlist_push_tail(&lsn_mapping, &flushpos->node); - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); } @@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) static void LogicalRepApplyLoop(XLogRecPtr last_received) { - /* Init the ApplyContext which we use for easier cleanup. */ - ApplyContext = AllocSetContextCreate(TopMemoryContext, - "ApplyContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + /* + * Init the ApplyMessageContext which we clean up after each + * replication protocol message. + */ + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) TimestampTz last_recv_timestamp = GetCurrentTimestamp(); bool ping_sent = false; - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); len = walrcv_receive(wrconn, &buf, &fd); @@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ping_sent = false; /* Ensure we are reading the data into our memory context. */ - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); s.data = buf; s.len = len; @@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) UpdateWorkerStats(last_received, timestamp, true); } /* other message types are purposefully ignored */ + + MemoryContextReset(ApplyMessageContext); } len = walrcv_receive(wrconn, &buf, &fd); @@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } /* Cleanup the memory. */ - MemoryContextResetAndDeleteChildren(ApplyContext); + MemoryContextResetAndDeleteChildren(ApplyMessageContext); MemoryContextSwitchTo(TopMemoryContext); /* Check if we need to exit the streaming loop. */ @@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) if (!reply_message) { - MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext); + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); reply_message = makeStringInfo(); MemoryContextSwitchTo(oldctx); } @@ -1308,7 +1312,7 @@ reread_subscription(void) } /* Ensure allocations in permanent context. */ - oldctx = MemoryContextSwitchTo(ApplyCacheContext); + oldctx = MemoryContextSwitchTo(ApplyContext); newsub = GetSubscription(MyLogicalRepWorker->subid, true); @@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg) MyLogicalRepWorker->userid); /* Load the subscription into persistent memory context. */ - CreateCacheMemoryContext(); - ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext, - "ApplyCacheContext", + ApplyContext = AllocSetContextCreate(TopMemoryContext, + "ApplyContext", ALLOCSET_DEFAULT_SIZES); StartTransactionCommand(); - oldctx = MemoryContextSwitchTo(ApplyCacheContext); + oldctx = MemoryContextSwitchTo(ApplyContext); MySubscription = GetSubscription(MyLogicalRepWorker->subid, false); MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); @@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg) syncslotname = LogicalRepSyncTableStart(&origin_startpos); /* The slot name needs to be allocated in permanent memory context. */ - oldctx = MemoryContextSwitchTo(ApplyCacheContext); + oldctx = MemoryContextSwitchTo(ApplyContext); myslotname = pstrdup(syncslotname); MemoryContextSwitchTo(oldctx); diff --git a/src/backend/utils/mmgr/README b/src/backend/utils/mmgr/README index 480b1f89d0..387c337985 100644 --- a/src/backend/utils/mmgr/README +++ b/src/backend/utils/mmgr/README @@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees, and don't actually need any storage allocated in their private contexts. +Logical Replication Worker Contexts +----------------------------------- + +ApplyContext --- permanent during whole lifetime of apply worker. It +is possible to use TopMemoryContext here as well, but for simplicity +of memory usage analysis we spin up different context. + +ApplyMessageContext --- short-lived context that is reset after each +logical replication protocol message is processed. + + Transient Contexts During Execution ----------------------------------- diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f6fee102b2..26788fec5c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -56,8 +56,8 @@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; -/* Memory context for cached variables in apply worker. */ -extern MemoryContext ApplyCacheContext; +/* Main memory context for apply worker. Permanent during worker lifetime. */ +extern MemoryContext ApplyContext; /* libpqreceiver connection */ extern struct WalReceiverConn *wrconn; -- 2.40.0