]> granicus.if.org Git - postgresql/commitdiff
Improve memory use in logical replication apply
authorPeter Eisentraut <peter_e@gmx.net>
Tue, 9 May 2017 18:40:42 +0000 (14:40 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Tue, 9 May 2017 18:51:49 +0000 (14:51 -0400)
Previously, the memory used by the logical replication apply worker for
processing messages would never be freed, so that could end up using a
lot of memory.  To improve that, change the existing ApplyContext memory
context to ApplyMessageContext and reset that after every
message (similar to MessageContext used elsewhere).  For consistency of
naming, rename the ApplyCacheContext to ApplyContext.

Author: Stas Kelvich <s.kelvich@postgrespro.ru>

src/backend/replication/logical/worker.c
src/backend/utils/mmgr/README
src/include/replication/worker_internal.h

index 362de12457b54f34a352a00bc382e290f8e26e70..04813b506e149f9e032b1ffaee49c4300a85a69c 100644 (file)
@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
        int                     attnum;
 } SlotErrCallbackArg;
 
-static MemoryContext   ApplyContext = NULL;
-MemoryContext                  ApplyCacheContext = NULL;
+static MemoryContext   ApplyMessageContext = NULL;
+MemoryContext                  ApplyContext = NULL;
 
 WalReceiverConn           *wrconn = NULL;
 
@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 /*
  * Make sure that we started local transaction.
  *
- * Also switches to ApplyContext as necessary.
+ * Also switches to ApplyMessageContext as necessary.
  */
 static bool
 ensure_transaction(void)
 {
        if (IsTransactionState())
        {
-               if (CurrentMemoryContext != ApplyContext)
-                       MemoryContextSwitchTo(ApplyContext);
+               if (CurrentMemoryContext != ApplyMessageContext)
+                       MemoryContextSwitchTo(ApplyMessageContext);
+
                return false;
        }
 
@@ -162,7 +163,7 @@ ensure_transaction(void)
        if (!MySubscriptionValid)
                reread_subscription();
 
-       MemoryContextSwitchTo(ApplyContext);
+       MemoryContextSwitchTo(ApplyMessageContext);
        return true;
 }
 
@@ -961,7 +962,7 @@ store_flush_position(XLogRecPtr remote_lsn)
        FlushPosition *flushpos;
 
        /* Need to do this in permanent context */
-       MemoryContextSwitchTo(ApplyCacheContext);
+       MemoryContextSwitchTo(ApplyContext);
 
        /* Track commit lsn  */
        flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
@@ -969,7 +970,7 @@ store_flush_position(XLogRecPtr remote_lsn)
        flushpos->remote_end = remote_lsn;
 
        dlist_push_tail(&lsn_mapping, &flushpos->node);
-       MemoryContextSwitchTo(ApplyContext);
+       MemoryContextSwitchTo(ApplyMessageContext);
 }
 
 
@@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-       /* Init the ApplyContext which we use for easier cleanup. */
-       ApplyContext = AllocSetContextCreate(TopMemoryContext,
-                                                                                "ApplyContext",
-                                                                                ALLOCSET_DEFAULT_MINSIZE,
-                                                                                ALLOCSET_DEFAULT_INITSIZE,
-                                                                                ALLOCSET_DEFAULT_MAXSIZE);
+       /*
+        * Init the ApplyMessageContext which we clean up after each
+        * replication protocol message.
+        */
+       ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+                                                                                "ApplyMessageContext",
+                                                                                ALLOCSET_DEFAULT_SIZES);
 
        /* mark as idle, before starting to loop */
        pgstat_report_activity(STATE_IDLE, NULL);
@@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                TimestampTz last_recv_timestamp = GetCurrentTimestamp();
                bool            ping_sent = false;
 
-               MemoryContextSwitchTo(ApplyContext);
+               MemoryContextSwitchTo(ApplyMessageContext);
 
                len = walrcv_receive(wrconn, &buf, &fd);
 
@@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                                        ping_sent = false;
 
                                        /* Ensure we are reading the data into our memory context. */
-                                       MemoryContextSwitchTo(ApplyContext);
+                                       MemoryContextSwitchTo(ApplyMessageContext);
 
                                        s.data = buf;
                                        s.len = len;
@@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                                                UpdateWorkerStats(last_received, timestamp, true);
                                        }
                                        /* other message types are purposefully ignored */
+
+                                       MemoryContextReset(ApplyMessageContext);
                                }
 
                                len = walrcv_receive(wrconn, &buf, &fd);
@@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                }
 
                /* Cleanup the memory. */
-               MemoryContextResetAndDeleteChildren(ApplyContext);
+               MemoryContextResetAndDeleteChildren(ApplyMessageContext);
                MemoryContextSwitchTo(TopMemoryContext);
 
                /* Check if we need to exit the streaming loop. */
@@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 
        if (!reply_message)
        {
-               MemoryContext   oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+               MemoryContext   oldctx = MemoryContextSwitchTo(ApplyContext);
                reply_message = makeStringInfo();
                MemoryContextSwitchTo(oldctx);
        }
@@ -1308,7 +1312,7 @@ reread_subscription(void)
        }
 
        /* Ensure allocations in permanent context. */
-       oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+       oldctx = MemoryContextSwitchTo(ApplyContext);
 
        newsub = GetSubscription(MyLogicalRepWorker->subid, true);
 
@@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg)
                                                                                          MyLogicalRepWorker->userid);
 
        /* Load the subscription into persistent memory context. */
-       CreateCacheMemoryContext();
-       ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
-                                                                                         "ApplyCacheContext",
+       ApplyContext = AllocSetContextCreate(TopMemoryContext,
+                                                                                         "ApplyContext",
                                                                                          ALLOCSET_DEFAULT_SIZES);
        StartTransactionCommand();
-       oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+       oldctx = MemoryContextSwitchTo(ApplyContext);
        MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
        MySubscriptionValid = true;
        MemoryContextSwitchTo(oldctx);
@@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg)
                syncslotname = LogicalRepSyncTableStart(&origin_startpos);
 
                /* The slot name needs to be allocated in permanent memory context. */
-               oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+               oldctx = MemoryContextSwitchTo(ApplyContext);
                myslotname = pstrdup(syncslotname);
                MemoryContextSwitchTo(oldctx);
 
index 480b1f89d0243388522d718c63cab51761680618..387c337985fe97d95de8beffecffc9a4014c0455 100644 (file)
@@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees,
 and don't actually need any storage allocated in their private contexts.
 
 
+Logical Replication Worker Contexts
+-----------------------------------
+
+ApplyContext --- permanent during whole lifetime of apply worker.  It
+is possible to use TopMemoryContext here as well, but for simplicity
+of memory usage analysis we spin up different context.
+
+ApplyMessageContext --- short-lived context that is reset after each
+logical replication protocol message is processed.
+
+
 Transient Contexts During Execution
 -----------------------------------
 
index f6fee102b2abefaeb47b5ecc2451febb353ad2b2..26788fec5c188d34bd74c634433d0361d93cb3bb 100644 (file)
@@ -56,8 +56,8 @@ typedef struct LogicalRepWorker
        TimestampTz     reply_time;
 } LogicalRepWorker;
 
-/* Memory context for cached variables in apply worker. */
-extern MemoryContext                           ApplyCacheContext;
+/* Main memory context for apply worker. Permanent during worker lifetime. */
+extern MemoryContext                           ApplyContext;
 
 /* libpqreceiver connection */
 extern struct WalReceiverConn     *wrconn;