diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 656d399..23b7733 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg int attnum; } SlotErrCallbackArg; -static MemoryContext ApplyContext = NULL; -MemoryContext ApplyCacheContext = NULL; +static MemoryContext ApplyMessageContext = NULL; +MemoryContext ApplyContext = NULL; WalReceiverConn *wrconn = NULL; @@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) /* * Make sure that we started local transaction. * - * Also switches to ApplyContext as necessary. + * Also switches to ApplyMessageContext as necessary. */ static bool ensure_transaction(void) { if (IsTransactionState()) { - if (CurrentMemoryContext != ApplyContext) - MemoryContextSwitchTo(ApplyContext); + if (CurrentMemoryContext != ApplyMessageContext) + MemoryContextSwitchTo(ApplyMessageContext); + return false; } @@ -162,7 +163,7 @@ ensure_transaction(void) if (!MySubscriptionValid) reread_subscription(); - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); return true; } @@ -592,6 +593,7 @@ apply_handle_insert(StringInfo s) logicalrep_rel_close(rel, NoLock); CommandCounterIncrement(); + MemoryContextReset(ApplyMessageContext); } /* @@ -746,6 +748,7 @@ apply_handle_update(StringInfo s) logicalrep_rel_close(rel, NoLock); CommandCounterIncrement(); + MemoryContextReset(ApplyMessageContext); } /* @@ -846,6 +849,7 @@ apply_handle_delete(StringInfo s) logicalrep_rel_close(rel, NoLock); CommandCounterIncrement(); + MemoryContextReset(ApplyMessageContext); } @@ -961,7 +965,7 @@ store_flush_position(XLogRecPtr remote_lsn) FlushPosition *flushpos; /* Need to do this in permanent context */ - MemoryContextSwitchTo(ApplyCacheContext); + MemoryContextSwitchTo(ApplyContext); /* Track commit lsn */ flushpos = (FlushPosition *) palloc(sizeof(FlushPosition)); @@ -969,7 +973,7 @@ store_flush_position(XLogRecPtr remote_lsn) flushpos->remote_end = remote_lsn; dlist_push_tail(&lsn_mapping, &flushpos->node); - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); } @@ -993,12 +997,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) static void LogicalRepApplyLoop(XLogRecPtr last_received) { - /* Init the ApplyContext which we use for easier cleanup. */ - ApplyContext = AllocSetContextCreate(TopMemoryContext, - "ApplyContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + /* + * Init the ApplyMessageContext which we clean up after each + * command counter increment. + */ + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -1013,7 +1018,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) TimestampTz last_recv_timestamp = GetCurrentTimestamp(); bool ping_sent = false; - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); len = walrcv_receive(wrconn, &buf, &fd); @@ -1045,7 +1050,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ping_sent = false; /* Ensure we are reading the data into our memory context. */ - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); s.data = buf; s.len = len; @@ -1115,7 +1120,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } /* Cleanup the memory. */ - MemoryContextResetAndDeleteChildren(ApplyContext); + MemoryContextResetAndDeleteChildren(ApplyMessageContext); MemoryContextSwitchTo(TopMemoryContext); /* Check if we need to exit the streaming loop. */ @@ -1258,7 +1263,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) if (!reply_message) { - MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext); + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); reply_message = makeStringInfo(); MemoryContextSwitchTo(oldctx); } @@ -1308,7 +1313,7 @@ reread_subscription(void) } /* Ensure allocations in permanent context. */ - oldctx = MemoryContextSwitchTo(ApplyCacheContext); + oldctx = MemoryContextSwitchTo(ApplyContext); newsub = GetSubscription(MyLogicalRepWorker->subid, true); @@ -1480,12 +1485,11 @@ ApplyWorkerMain(Datum main_arg) MyLogicalRepWorker->userid); /* Load the subscription into persistent memory context. */ - CreateCacheMemoryContext(); - ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext, - "ApplyCacheContext", + ApplyContext = AllocSetContextCreate(TopMemoryContext, + "ApplyContext", ALLOCSET_DEFAULT_SIZES); StartTransactionCommand(); - oldctx = MemoryContextSwitchTo(ApplyCacheContext); + oldctx = MemoryContextSwitchTo(ApplyContext); MySubscription = GetSubscription(MyLogicalRepWorker->subid, false); MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); @@ -1530,7 +1534,7 @@ ApplyWorkerMain(Datum main_arg) syncslotname = LogicalRepSyncTableStart(&origin_startpos); /* The slot name needs to be allocated in permanent memory context. */ - oldctx = MemoryContextSwitchTo(ApplyCacheContext); + oldctx = MemoryContextSwitchTo(ApplyContext); myslotname = pstrdup(syncslotname); MemoryContextSwitchTo(oldctx); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index b8e35d4..b13e671 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -47,8 +47,8 @@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; -/* Memory context for cached variables in apply worker. */ -extern MemoryContext ApplyCacheContext; +/* Main memory context for apply worker. Permanent during worker lifetime. */ +extern MemoryContext ApplyContext; /* libpqreceiver connection */ extern struct WalReceiverConn *wrconn;