int attnum;
} SlotErrCallbackArg;
-static MemoryContext ApplyContext = NULL;
-MemoryContext ApplyCacheContext = NULL;
+static MemoryContext ApplyMessageContext = NULL;
+MemoryContext ApplyContext = NULL;
WalReceiverConn *wrconn = NULL;
/*
* Make sure that we started local transaction.
*
- * Also switches to ApplyContext as necessary.
+ * Also switches to ApplyMessageContext as necessary.
*/
static bool
ensure_transaction(void)
{
if (IsTransactionState())
{
- if (CurrentMemoryContext != ApplyContext)
- MemoryContextSwitchTo(ApplyContext);
+ if (CurrentMemoryContext != ApplyMessageContext)
+ MemoryContextSwitchTo(ApplyMessageContext);
+
return false;
}
if (!MySubscriptionValid)
reread_subscription();
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
return true;
}
FlushPosition *flushpos;
/* Need to do this in permanent context */
- MemoryContextSwitchTo(ApplyCacheContext);
+ MemoryContextSwitchTo(ApplyContext);
/* Track commit lsn */
flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
flushpos->remote_end = remote_lsn;
dlist_push_tail(&lsn_mapping, &flushpos->node);
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
}
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
- /* Init the ApplyContext which we use for easier cleanup. */
- ApplyContext = AllocSetContextCreate(TopMemoryContext,
- "ApplyContext",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ /*
+ * Init the ApplyMessageContext which we clean up after each
+ * replication protocol message.
+ */
+ ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+ "ApplyMessageContext",
+ ALLOCSET_DEFAULT_SIZES);
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false;
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
len = walrcv_receive(wrconn, &buf, &fd);
ping_sent = false;
/* Ensure we are reading the data into our memory context. */
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
s.data = buf;
s.len = len;
UpdateWorkerStats(last_received, timestamp, true);
}
/* other message types are purposefully ignored */
+
+ MemoryContextReset(ApplyMessageContext);
}
len = walrcv_receive(wrconn, &buf, &fd);
}
/* Cleanup the memory. */
- MemoryContextResetAndDeleteChildren(ApplyContext);
+ MemoryContextResetAndDeleteChildren(ApplyMessageContext);
MemoryContextSwitchTo(TopMemoryContext);
/* Check if we need to exit the streaming loop. */
if (!reply_message)
{
- MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
reply_message = makeStringInfo();
MemoryContextSwitchTo(oldctx);
}
}
/* Ensure allocations in permanent context. */
- oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ oldctx = MemoryContextSwitchTo(ApplyContext);
newsub = GetSubscription(MyLogicalRepWorker->subid, true);
MyLogicalRepWorker->userid);
/* Load the subscription into persistent memory context. */
- CreateCacheMemoryContext();
- ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
- "ApplyCacheContext",
+ ApplyContext = AllocSetContextCreate(TopMemoryContext,
+ "ApplyContext",
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
- oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ oldctx = MemoryContextSwitchTo(ApplyContext);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
/* The slot name needs to be allocated in permanent memory context. */
- oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ oldctx = MemoryContextSwitchTo(ApplyContext);
myslotname = pstrdup(syncslotname);
MemoryContextSwitchTo(oldctx);