Logical replication ApplyContext bloat

Started by Stas Kelvichover 8 years ago17 messages
#1Stas Kelvich
s.kelvich@postgrespro.ru
1 attachment(s)

Hi,

currently logical replication worker uses ApplyContext to decode received data
and that context is never freed during transaction processing. Hence if publication
side is performing something like 10M row inserts in single transaction, then
subscription worker will occupy more than 10G of ram (and can be killed by OOM).

Attached patch resets ApplyContext after each insert/update/delete/commit.
I’ve tried to reset context only on each 100/1000/10000 value of CommandCounter,
but didn’t spotted any measurable difference in speed.

Problem spotted by Mikhail Shurutov.

Attachments:

applycontext_bloat.patchapplication/octet-stream; name=applycontext_bloat.patch; x-unix-mode=0644Download
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 656d399..d839324 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -147,23 +147,25 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
  *
  * Also switches to ApplyContext as necessary.
  */
-static bool
+static void
 ensure_transaction(void)
 {
-	if (IsTransactionState())
+	if (!IsTransactionState())
 	{
-		if (CurrentMemoryContext != ApplyContext)
-			MemoryContextSwitchTo(ApplyContext);
-		return false;
-	}
+		StartTransactionCommand();
 
-	StartTransactionCommand();
+		if (!MySubscriptionValid)
+			reread_subscription();
+	}
 
-	if (!MySubscriptionValid)
-		reread_subscription();
+	if (CurrentMemoryContext != ApplyContext)
+		MemoryContextSwitchTo(ApplyContext);
 
-	MemoryContextSwitchTo(ApplyContext);
-	return true;
+	/*
+	 * Free memory that was used to construct tuples in logicalrep_read_*
+	 * functions.
+	 */
+	MemoryContextReset(ApplyContext);
 }
 
 
#2Petr Jelinek
petr.jelinek@2ndquadrant.com
In reply to: Stas Kelvich (#1)
Re: Logical replication ApplyContext bloat

On 18/04/17 13:45, Stas Kelvich wrote:

Hi,

currently logical replication worker uses ApplyContext to decode received data
and that context is never freed during transaction processing. Hence if publication
side is performing something like 10M row inserts in single transaction, then
subscription worker will occupy more than 10G of ram (and can be killed by OOM).

Attached patch resets ApplyContext after each insert/update/delete/commit.
I�ve tried to reset context only on each 100/1000/10000 value of CommandCounter,
but didn�t spotted any measurable difference in speed.

Problem spotted by Mikhail Shurutov.

Hmm we also do replication protocol handling inside the ApplyContext
(LogicalRepApplyLoop), are you sure this change is safe in that regard?

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Stas Kelvich
s.kelvich@postgrespro.ru
In reply to: Petr Jelinek (#2)
Re: Logical replication ApplyContext bloat

On 19 Apr 2017, at 12:37, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

On 18/04/17 13:45, Stas Kelvich wrote:

Hi,

currently logical replication worker uses ApplyContext to decode received data
and that context is never freed during transaction processing. Hence if publication
side is performing something like 10M row inserts in single transaction, then
subscription worker will occupy more than 10G of ram (and can be killed by OOM).

Attached patch resets ApplyContext after each insert/update/delete/commit.
I’ve tried to reset context only on each 100/1000/10000 value of CommandCounter,
but didn’t spotted any measurable difference in speed.

Problem spotted by Mikhail Shurutov.

Hmm we also do replication protocol handling inside the ApplyContext
(LogicalRepApplyLoop), are you sure this change is safe in that regard?

Memory is bloated by logicalrep_read_* when palloc happens inside.
I’ve inserted context reset in ensure_transaction() which is only called in beginning
of hande_insert/update/delete/commit when data from previous call of that
function isn’t needed. So probably it is safe to clear memory there. At least
i don’t see any state that can be accessed later in this functions. Do you
have any specific concerns?

Stas Kelvich
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#4Petr Jelinek
petr.jelinek@2ndquadrant.com
In reply to: Stas Kelvich (#3)
Re: Logical replication ApplyContext bloat

On 19/04/17 11:55, Stas Kelvich wrote:

On 19 Apr 2017, at 12:37, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

On 18/04/17 13:45, Stas Kelvich wrote:

Hi,

currently logical replication worker uses ApplyContext to decode received data
and that context is never freed during transaction processing. Hence if publication
side is performing something like 10M row inserts in single transaction, then
subscription worker will occupy more than 10G of ram (and can be killed by OOM).

Attached patch resets ApplyContext after each insert/update/delete/commit.
I’ve tried to reset context only on each 100/1000/10000 value of CommandCounter,
but didn’t spotted any measurable difference in speed.

Problem spotted by Mikhail Shurutov.

Hmm we also do replication protocol handling inside the ApplyContext
(LogicalRepApplyLoop), are you sure this change is safe in that regard?

Memory is bloated by logicalrep_read_* when palloc happens inside.
I’ve inserted context reset in ensure_transaction() which is only called in beginning
of hande_insert/update/delete/commit when data from previous call of that
function isn’t needed. So probably it is safe to clear memory there. At least
i don’t see any state that can be accessed later in this functions. Do you
have any specific concerns?

I wanted to make sure you checked things that are happening outside of
the apply_handle_* stuff. I checked myself now, the allocations that
happen outside don't use postgres memory contexts (libpqrcv_receive) so
that should not be problem. Other allocations that I see in neighboring
code switch to permanent contexts anyway. So looks safe on first look
indeed. Eventually we'll want to cache some of the executor stuff so
this will be problem but not in v10.

I'd still like you to add comment to the ApplyContext saying that it's
cleaned after handling each message so nothing that needs to survive for
example whole transaction can be allocated in it as that's not very well
visible IMHO (and I know I will forget about it when writing next patch
in that area).

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#5Simon Riggs
simon@2ndquadrant.com
In reply to: Petr Jelinek (#4)
Re: Logical replication ApplyContext bloat

On 19 April 2017 at 11:24, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

I'd still like you to add comment to the ApplyContext saying that it's
cleaned after handling each message so nothing that needs to survive for
example whole transaction can be allocated in it as that's not very well
visible IMHO (and I know I will forget about it when writing next patch
in that area).

It would be better to invent the contexts we want now, so we get the
architecture right for future work. Otherwise we have problems with
"can't backpatch this fix because that infrastructure doesn't exist in
PG10" in a couple of years.

So I suggest we have

ApplyMessageContext - cleaned after every message
ApplyTransactionContext - cleaned at EOXact
ApplyContext - persistent

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#6Stas Kelvich
s.kelvich@postgrespro.ru
In reply to: Simon Riggs (#5)
Re: Logical replication ApplyContext bloat

On 19 Apr 2017, at 13:34, Simon Riggs <simon@2ndquadrant.com> wrote:

On 19 April 2017 at 11:24, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

I'd still like you to add comment to the ApplyContext saying that it's
cleaned after handling each message so nothing that needs to survive for
example whole transaction can be allocated in it as that's not very well
visible IMHO (and I know I will forget about it when writing next patch
in that area).

It would be better to invent the contexts we want now, so we get the
architecture right for future work. Otherwise we have problems with
"can't backpatch this fix because that infrastructure doesn't exist in
PG10" in a couple of years.

So I suggest we have

ApplyMessageContext - cleaned after every message
ApplyTransactionContext - cleaned at EOXact
ApplyContext - persistent

Right now ApplyContext cleaned after each transaction and by this patch I basically
suggest to clean it after each command counter increment.

So in your definitions that is ApplyMessageContext, most short-lived one. We can
rename now ApplyContext -> ApplyMessageContext to make that clear and avoid
possible name conflict when somebody will decide to keep something during whole
transaction or worker lifespan.

P.S. It also seems to me now that resetting context in ensure_transaction() isn’t that
good idea, probably better just explicitly call reset at the end of each function involved.

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Stas Kelvich
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#7Petr Jelinek
petr.jelinek@2ndquadrant.com
In reply to: Stas Kelvich (#6)
Re: Logical replication ApplyContext bloat

On 19/04/17 12:46, Stas Kelvich wrote:

On 19 Apr 2017, at 13:34, Simon Riggs <simon@2ndquadrant.com> wrote:

On 19 April 2017 at 11:24, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

I'd still like you to add comment to the ApplyContext saying that it's
cleaned after handling each message so nothing that needs to survive for
example whole transaction can be allocated in it as that's not very well
visible IMHO (and I know I will forget about it when writing next patch
in that area).

It would be better to invent the contexts we want now, so we get the
architecture right for future work. Otherwise we have problems with
"can't backpatch this fix because that infrastructure doesn't exist in
PG10" in a couple of years.

So I suggest we have

ApplyMessageContext - cleaned after every message
ApplyTransactionContext - cleaned at EOXact
ApplyContext - persistent

Right now ApplyContext cleaned after each transaction and by this patch I basically
suggest to clean it after each command counter increment.

So in your definitions that is ApplyMessageContext, most short-lived one. We can
rename now ApplyContext -> ApplyMessageContext to make that clear and avoid
possible name conflict when somebody will decide to keep something during whole
transaction or worker lifespan.

Yeah we can rename, and then rename the ApplyCacheContext to
ApplyContext. That would leave the ApplyTransactionContext from Simon's
proposal which is not really need it anywhere right now and I am not
sure it will be needed given there is still TopTransactionContext
present. If we really want ApplyTransactionContext we can probably just
assign it to TopTransactionContext in ensure_transaction.

P.S. It also seems to me now that resetting context in ensure_transaction() isn’t that
good idea, probably better just explicitly call reset at the end of each function involved.

Well it would definitely improve clarity.

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#8Stas Kelvich
s.kelvich@postgrespro.ru
In reply to: Petr Jelinek (#7)
1 attachment(s)
Re: Logical replication ApplyContext bloat

On 19 Apr 2017, at 14:30, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

On 19/04/17 12:46, Stas Kelvich wrote:

Right now ApplyContext cleaned after each transaction and by this patch I basically
suggest to clean it after each command counter increment.

So in your definitions that is ApplyMessageContext, most short-lived one. We can
rename now ApplyContext -> ApplyMessageContext to make that clear and avoid
possible name conflict when somebody will decide to keep something during whole
transaction or worker lifespan.

Yeah we can rename, and then rename the ApplyCacheContext to
ApplyContext. That would leave the ApplyTransactionContext from Simon's
proposal which is not really need it anywhere right now and I am not
sure it will be needed given there is still TopTransactionContext
present. If we really want ApplyTransactionContext we can probably just
assign it to TopTransactionContext in ensure_transaction.

P.S. It also seems to me now that resetting context in ensure_transaction() isn’t that
good idea, probably better just explicitly call reset at the end of each function involved.

Well it would definitely improve clarity.

Okay, done.

With patch MemoryContextStats() shows following hierarchy during slot operations in
apply worker:

TopMemoryContext: 83824 total in 5 blocks; 9224 free (8 chunks); 74600 used
ApplyContext: 8192 total in 1 blocks; 6520 free (4 chunks); 1672 used
ApplyMessageContext: 8192 total in 1 blocks; 6632 free (11 chunks); 1560 used
ExecutorState: 8192 total in 1 blocks; 7624 free (0 chunks); 568 used
ExprContext: 0 total in 0 blocks; 0 free (0 chunks); 0 used

Attachments:

apply_contexts.patchapplication/octet-stream; name=apply_contexts.patch; x-unix-mode=0644Download
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 656d399..23b7733 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
 	int			attnum;
 } SlotErrCallbackArg;
 
-static MemoryContext	ApplyContext = NULL;
-MemoryContext			ApplyCacheContext = NULL;
+static MemoryContext	ApplyMessageContext = NULL;
+MemoryContext			ApplyContext = NULL;
 
 WalReceiverConn	   *wrconn = NULL;
 
@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 /*
  * Make sure that we started local transaction.
  *
- * Also switches to ApplyContext as necessary.
+ * Also switches to ApplyMessageContext as necessary.
  */
 static bool
 ensure_transaction(void)
 {
 	if (IsTransactionState())
 	{
-		if (CurrentMemoryContext != ApplyContext)
-			MemoryContextSwitchTo(ApplyContext);
+		if (CurrentMemoryContext != ApplyMessageContext)
+			MemoryContextSwitchTo(ApplyMessageContext);
+
 		return false;
 	}
 
@@ -162,7 +163,7 @@ ensure_transaction(void)
 	if (!MySubscriptionValid)
 		reread_subscription();
 
-	MemoryContextSwitchTo(ApplyContext);
+	MemoryContextSwitchTo(ApplyMessageContext);
 	return true;
 }
 
@@ -592,6 +593,7 @@ apply_handle_insert(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	CommandCounterIncrement();
+	MemoryContextReset(ApplyMessageContext);
 }
 
 /*
@@ -746,6 +748,7 @@ apply_handle_update(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	CommandCounterIncrement();
+	MemoryContextReset(ApplyMessageContext);
 }
 
 /*
@@ -846,6 +849,7 @@ apply_handle_delete(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	CommandCounterIncrement();
+	MemoryContextReset(ApplyMessageContext);
 }
 
 
@@ -961,7 +965,7 @@ store_flush_position(XLogRecPtr remote_lsn)
 	FlushPosition *flushpos;
 
 	/* Need to do this in permanent context */
-	MemoryContextSwitchTo(ApplyCacheContext);
+	MemoryContextSwitchTo(ApplyContext);
 
 	/* Track commit lsn  */
 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
@@ -969,7 +973,7 @@ store_flush_position(XLogRecPtr remote_lsn)
 	flushpos->remote_end = remote_lsn;
 
 	dlist_push_tail(&lsn_mapping, &flushpos->node);
-	MemoryContextSwitchTo(ApplyContext);
+	MemoryContextSwitchTo(ApplyMessageContext);
 }
 
 
@@ -993,12 +997,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	/* Init the ApplyContext which we use for easier cleanup. */
-	ApplyContext = AllocSetContextCreate(TopMemoryContext,
-										 "ApplyContext",
-										 ALLOCSET_DEFAULT_MINSIZE,
-										 ALLOCSET_DEFAULT_INITSIZE,
-										 ALLOCSET_DEFAULT_MAXSIZE);
+	/*
+	 * Init the ApplyMessageContext which we clean up after each
+	 * command counter increment.
+	 */
+	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+										 "ApplyMessageContext",
+										 ALLOCSET_DEFAULT_SIZES);
 
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
@@ -1013,7 +1018,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		TimestampTz last_recv_timestamp = GetCurrentTimestamp();
 		bool		ping_sent = false;
 
-		MemoryContextSwitchTo(ApplyContext);
+		MemoryContextSwitchTo(ApplyMessageContext);
 
 		len = walrcv_receive(wrconn, &buf, &fd);
 
@@ -1045,7 +1050,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					ping_sent = false;
 
 					/* Ensure we are reading the data into our memory context. */
-					MemoryContextSwitchTo(ApplyContext);
+					MemoryContextSwitchTo(ApplyMessageContext);
 
 					s.data = buf;
 					s.len = len;
@@ -1115,7 +1120,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		}
 
 		/* Cleanup the memory. */
-		MemoryContextResetAndDeleteChildren(ApplyContext);
+		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
 		MemoryContextSwitchTo(TopMemoryContext);
 
 		/* Check if we need to exit the streaming loop. */
@@ -1258,7 +1263,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 
 	if (!reply_message)
 	{
-		MemoryContext	oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+		MemoryContext	oldctx = MemoryContextSwitchTo(ApplyContext);
 		reply_message = makeStringInfo();
 		MemoryContextSwitchTo(oldctx);
 	}
@@ -1308,7 +1313,7 @@ reread_subscription(void)
 	}
 
 	/* Ensure allocations in permanent context. */
-	oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+	oldctx = MemoryContextSwitchTo(ApplyContext);
 
 	newsub = GetSubscription(MyLogicalRepWorker->subid, true);
 
@@ -1480,12 +1485,11 @@ ApplyWorkerMain(Datum main_arg)
 											  MyLogicalRepWorker->userid);
 
 	/* Load the subscription into persistent memory context. */
-	CreateCacheMemoryContext();
-	ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
-											  "ApplyCacheContext",
+	ApplyContext = AllocSetContextCreate(TopMemoryContext,
+											  "ApplyContext",
 											  ALLOCSET_DEFAULT_SIZES);
 	StartTransactionCommand();
-	oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+	oldctx = MemoryContextSwitchTo(ApplyContext);
 	MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
 	MySubscriptionValid = true;
 	MemoryContextSwitchTo(oldctx);
@@ -1530,7 +1534,7 @@ ApplyWorkerMain(Datum main_arg)
 		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
 
 		/* The slot name needs to be allocated in permanent memory context. */
-		oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+		oldctx = MemoryContextSwitchTo(ApplyContext);
 		myslotname = pstrdup(syncslotname);
 		MemoryContextSwitchTo(oldctx);
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index b8e35d4..b13e671 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -47,8 +47,8 @@ typedef struct LogicalRepWorker
 	TimestampTz	reply_time;
 } LogicalRepWorker;
 
-/* Memory context for cached variables in apply worker. */
-extern MemoryContext				ApplyCacheContext;
+/* Main memory context for apply worker. Permanent during worker lifetime. */
+extern MemoryContext				ApplyContext;
 
 /* libpqreceiver connection */
 extern struct WalReceiverConn	   *wrconn;
#9Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Stas Kelvich (#8)
Re: Logical replication ApplyContext bloat

Stas Kelvich wrote:

With patch MemoryContextStats() shows following hierarchy during slot operations in
apply worker:

TopMemoryContext: 83824 total in 5 blocks; 9224 free (8 chunks); 74600 used
ApplyContext: 8192 total in 1 blocks; 6520 free (4 chunks); 1672 used
ApplyMessageContext: 8192 total in 1 blocks; 6632 free (11 chunks); 1560 used
ExecutorState: 8192 total in 1 blocks; 7624 free (0 chunks); 568 used
ExprContext: 0 total in 0 blocks; 0 free (0 chunks); 0 used

Please update src/backend/utils/mmgr/README to list the new contexts.

--
�lvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#10Stas Kelvich
s.kelvich@postgrespro.ru
In reply to: Alvaro Herrera (#9)
1 attachment(s)
Re: Logical replication ApplyContext bloat

On 19 Apr 2017, at 16:07, Alvaro Herrera <alvherre@2ndquadrant.com> wrote:

Stas Kelvich wrote:

With patch MemoryContextStats() shows following hierarchy during slot operations in
apply worker:

TopMemoryContext: 83824 total in 5 blocks; 9224 free (8 chunks); 74600 used
ApplyContext: 8192 total in 1 blocks; 6520 free (4 chunks); 1672 used
ApplyMessageContext: 8192 total in 1 blocks; 6632 free (11 chunks); 1560 used
ExecutorState: 8192 total in 1 blocks; 7624 free (0 chunks); 568 used
ExprContext: 0 total in 0 blocks; 0 free (0 chunks); 0 used

Please update src/backend/utils/mmgr/README to list the new contexts.

Thanks for noting.

Added short description of ApplyContext and ApplyMessageContext to README.

Attachments:

apply_contexts_2.patchapplication/octet-stream; name=apply_contexts_2.patch; x-unix-mode=0644Download
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fa4f3b6..c053f33 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
 	int			attnum;
 } SlotErrCallbackArg;
 
-static MemoryContext	ApplyContext = NULL;
-MemoryContext			ApplyCacheContext = NULL;
+static MemoryContext	ApplyMessageContext = NULL;
+MemoryContext			ApplyContext = NULL;
 
 WalReceiverConn	   *wrconn = NULL;
 
@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 /*
  * Make sure that we started local transaction.
  *
- * Also switches to ApplyContext as necessary.
+ * Also switches to ApplyMessageContext as necessary.
  */
 static bool
 ensure_transaction(void)
 {
 	if (IsTransactionState())
 	{
-		if (CurrentMemoryContext != ApplyContext)
-			MemoryContextSwitchTo(ApplyContext);
+		if (CurrentMemoryContext != ApplyMessageContext)
+			MemoryContextSwitchTo(ApplyMessageContext);
+
 		return false;
 	}
 
@@ -162,7 +163,7 @@ ensure_transaction(void)
 	if (!MySubscriptionValid)
 		reread_subscription();
 
-	MemoryContextSwitchTo(ApplyContext);
+	MemoryContextSwitchTo(ApplyMessageContext);
 	return true;
 }
 
@@ -592,6 +593,7 @@ apply_handle_insert(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	CommandCounterIncrement();
+	MemoryContextReset(ApplyMessageContext);
 }
 
 /*
@@ -746,6 +748,7 @@ apply_handle_update(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	CommandCounterIncrement();
+	MemoryContextReset(ApplyMessageContext);
 }
 
 /*
@@ -846,6 +849,7 @@ apply_handle_delete(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	CommandCounterIncrement();
+	MemoryContextReset(ApplyMessageContext);
 }
 
 
@@ -961,7 +965,7 @@ store_flush_position(XLogRecPtr remote_lsn)
 	FlushPosition *flushpos;
 
 	/* Need to do this in permanent context */
-	MemoryContextSwitchTo(ApplyCacheContext);
+	MemoryContextSwitchTo(ApplyContext);
 
 	/* Track commit lsn  */
 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
@@ -969,7 +973,7 @@ store_flush_position(XLogRecPtr remote_lsn)
 	flushpos->remote_end = remote_lsn;
 
 	dlist_push_tail(&lsn_mapping, &flushpos->node);
-	MemoryContextSwitchTo(ApplyContext);
+	MemoryContextSwitchTo(ApplyMessageContext);
 }
 
 
@@ -993,12 +997,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	/* Init the ApplyContext which we use for easier cleanup. */
-	ApplyContext = AllocSetContextCreate(TopMemoryContext,
-										 "ApplyContext",
-										 ALLOCSET_DEFAULT_MINSIZE,
-										 ALLOCSET_DEFAULT_INITSIZE,
-										 ALLOCSET_DEFAULT_MAXSIZE);
+	/*
+	 * Init the ApplyMessageContext which we clean up after each
+	 * command counter increment.
+	 */
+	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+										 "ApplyMessageContext",
+										 ALLOCSET_DEFAULT_SIZES);
 
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
@@ -1013,7 +1018,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		TimestampTz last_recv_timestamp = GetCurrentTimestamp();
 		bool		ping_sent = false;
 
-		MemoryContextSwitchTo(ApplyContext);
+		MemoryContextSwitchTo(ApplyMessageContext);
 
 		len = walrcv_receive(wrconn, &buf, &fd);
 
@@ -1045,7 +1050,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					ping_sent = false;
 
 					/* Ensure we are reading the data into our memory context. */
-					MemoryContextSwitchTo(ApplyContext);
+					MemoryContextSwitchTo(ApplyMessageContext);
 
 					s.data = buf;
 					s.len = len;
@@ -1115,7 +1120,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		}
 
 		/* Cleanup the memory. */
-		MemoryContextResetAndDeleteChildren(ApplyContext);
+		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
 		MemoryContextSwitchTo(TopMemoryContext);
 
 		/* Check if we need to exit the streaming loop. */
@@ -1258,7 +1263,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 
 	if (!reply_message)
 	{
-		MemoryContext	oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+		MemoryContext	oldctx = MemoryContextSwitchTo(ApplyContext);
 		reply_message = makeStringInfo();
 		MemoryContextSwitchTo(oldctx);
 	}
@@ -1308,7 +1313,7 @@ reread_subscription(void)
 	}
 
 	/* Ensure allocations in permanent context. */
-	oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+	oldctx = MemoryContextSwitchTo(ApplyContext);
 
 	newsub = GetSubscription(MyLogicalRepWorker->subid, true);
 
@@ -1480,12 +1485,11 @@ ApplyWorkerMain(Datum main_arg)
 											  MyLogicalRepWorker->userid);
 
 	/* Load the subscription into persistent memory context. */
-	CreateCacheMemoryContext();
-	ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
-											  "ApplyCacheContext",
+	ApplyContext = AllocSetContextCreate(TopMemoryContext,
+											  "ApplyContext",
 											  ALLOCSET_DEFAULT_SIZES);
 	StartTransactionCommand();
-	oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+	oldctx = MemoryContextSwitchTo(ApplyContext);
 	MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
 	MySubscriptionValid = true;
 	MemoryContextSwitchTo(oldctx);
@@ -1530,7 +1534,7 @@ ApplyWorkerMain(Datum main_arg)
 		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
 
 		/* The slot name needs to be allocated in permanent memory context. */
-		oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+		oldctx = MemoryContextSwitchTo(ApplyContext);
 		myslotname = pstrdup(syncslotname);
 		MemoryContextSwitchTo(oldctx);
 
diff --git a/src/backend/utils/mmgr/README b/src/backend/utils/mmgr/README
index 480b1f8..69fb642 100644
--- a/src/backend/utils/mmgr/README
+++ b/src/backend/utils/mmgr/README
@@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees,
 and don't actually need any storage allocated in their private contexts.
 
 
+Logical replication worker contexts
+-----------------------------------
+
+ApplyContext --- permanent during whole lifetime of apply worker. It is
+possible to use TopMemoryContext here as well, but for simplicity of memory usage
+analysys we spin up different context.
+
+ApplyMessageContext --- short-lived context that is cleaned after each
+update/insert/delete is applied.
+
+
 Transient Contexts During Execution
 -----------------------------------
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index b8e35d4..b13e671 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -47,8 +47,8 @@ typedef struct LogicalRepWorker
 	TimestampTz	reply_time;
 } LogicalRepWorker;
 
-/* Memory context for cached variables in apply worker. */
-extern MemoryContext				ApplyCacheContext;
+/* Main memory context for apply worker. Permanent during worker lifetime. */
+extern MemoryContext				ApplyContext;
 
 /* libpqreceiver connection */
 extern struct WalReceiverConn	   *wrconn;
#11Dilip Kumar
dilipbalaut@gmail.com
In reply to: Stas Kelvich (#10)
Re: Logical replication ApplyContext bloat

On Thu, Apr 20, 2017 at 7:04 PM, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:

Thanks for noting.

Added short description of ApplyContext and ApplyMessageContext to README.

+ApplyContext --- permanent during whole lifetime of apply worker. It is
+possible to use TopMemoryContext here as well, but for simplicity of
memory usage
+analysys we spin up different context.

Typo

/analysys/analysis

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#12Stas Kelvich
s.kelvich@postgrespro.ru
In reply to: Dilip Kumar (#11)
1 attachment(s)
Re: Logical replication ApplyContext bloat

On 20 Apr 2017, at 17:01, Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Thu, Apr 20, 2017 at 7:04 PM, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:

Thanks for noting.

Added short description of ApplyContext and ApplyMessageContext to README.

Typo

/analysys/analysis

Fixed, thanks.

Added this to open items list.

Attachments:

apply_contexts_3.patchapplication/octet-stream; name=apply_contexts_3.patch; x-unix-mode=0644Download
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fa4f3b6..c053f33 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
 	int			attnum;
 } SlotErrCallbackArg;
 
-static MemoryContext	ApplyContext = NULL;
-MemoryContext			ApplyCacheContext = NULL;
+static MemoryContext	ApplyMessageContext = NULL;
+MemoryContext			ApplyContext = NULL;
 
 WalReceiverConn	   *wrconn = NULL;
 
@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 /*
  * Make sure that we started local transaction.
  *
- * Also switches to ApplyContext as necessary.
+ * Also switches to ApplyMessageContext as necessary.
  */
 static bool
 ensure_transaction(void)
 {
 	if (IsTransactionState())
 	{
-		if (CurrentMemoryContext != ApplyContext)
-			MemoryContextSwitchTo(ApplyContext);
+		if (CurrentMemoryContext != ApplyMessageContext)
+			MemoryContextSwitchTo(ApplyMessageContext);
+
 		return false;
 	}
 
@@ -162,7 +163,7 @@ ensure_transaction(void)
 	if (!MySubscriptionValid)
 		reread_subscription();
 
-	MemoryContextSwitchTo(ApplyContext);
+	MemoryContextSwitchTo(ApplyMessageContext);
 	return true;
 }
 
@@ -592,6 +593,7 @@ apply_handle_insert(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	CommandCounterIncrement();
+	MemoryContextReset(ApplyMessageContext);
 }
 
 /*
@@ -746,6 +748,7 @@ apply_handle_update(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	CommandCounterIncrement();
+	MemoryContextReset(ApplyMessageContext);
 }
 
 /*
@@ -846,6 +849,7 @@ apply_handle_delete(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	CommandCounterIncrement();
+	MemoryContextReset(ApplyMessageContext);
 }
 
 
@@ -961,7 +965,7 @@ store_flush_position(XLogRecPtr remote_lsn)
 	FlushPosition *flushpos;
 
 	/* Need to do this in permanent context */
-	MemoryContextSwitchTo(ApplyCacheContext);
+	MemoryContextSwitchTo(ApplyContext);
 
 	/* Track commit lsn  */
 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
@@ -969,7 +973,7 @@ store_flush_position(XLogRecPtr remote_lsn)
 	flushpos->remote_end = remote_lsn;
 
 	dlist_push_tail(&lsn_mapping, &flushpos->node);
-	MemoryContextSwitchTo(ApplyContext);
+	MemoryContextSwitchTo(ApplyMessageContext);
 }
 
 
@@ -993,12 +997,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	/* Init the ApplyContext which we use for easier cleanup. */
-	ApplyContext = AllocSetContextCreate(TopMemoryContext,
-										 "ApplyContext",
-										 ALLOCSET_DEFAULT_MINSIZE,
-										 ALLOCSET_DEFAULT_INITSIZE,
-										 ALLOCSET_DEFAULT_MAXSIZE);
+	/*
+	 * Init the ApplyMessageContext which we clean up after each
+	 * command counter increment.
+	 */
+	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+										 "ApplyMessageContext",
+										 ALLOCSET_DEFAULT_SIZES);
 
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
@@ -1013,7 +1018,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		TimestampTz last_recv_timestamp = GetCurrentTimestamp();
 		bool		ping_sent = false;
 
-		MemoryContextSwitchTo(ApplyContext);
+		MemoryContextSwitchTo(ApplyMessageContext);
 
 		len = walrcv_receive(wrconn, &buf, &fd);
 
@@ -1045,7 +1050,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					ping_sent = false;
 
 					/* Ensure we are reading the data into our memory context. */
-					MemoryContextSwitchTo(ApplyContext);
+					MemoryContextSwitchTo(ApplyMessageContext);
 
 					s.data = buf;
 					s.len = len;
@@ -1115,7 +1120,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		}
 
 		/* Cleanup the memory. */
-		MemoryContextResetAndDeleteChildren(ApplyContext);
+		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
 		MemoryContextSwitchTo(TopMemoryContext);
 
 		/* Check if we need to exit the streaming loop. */
@@ -1258,7 +1263,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 
 	if (!reply_message)
 	{
-		MemoryContext	oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+		MemoryContext	oldctx = MemoryContextSwitchTo(ApplyContext);
 		reply_message = makeStringInfo();
 		MemoryContextSwitchTo(oldctx);
 	}
@@ -1308,7 +1313,7 @@ reread_subscription(void)
 	}
 
 	/* Ensure allocations in permanent context. */
-	oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+	oldctx = MemoryContextSwitchTo(ApplyContext);
 
 	newsub = GetSubscription(MyLogicalRepWorker->subid, true);
 
@@ -1480,12 +1485,11 @@ ApplyWorkerMain(Datum main_arg)
 											  MyLogicalRepWorker->userid);
 
 	/* Load the subscription into persistent memory context. */
-	CreateCacheMemoryContext();
-	ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
-											  "ApplyCacheContext",
+	ApplyContext = AllocSetContextCreate(TopMemoryContext,
+											  "ApplyContext",
 											  ALLOCSET_DEFAULT_SIZES);
 	StartTransactionCommand();
-	oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+	oldctx = MemoryContextSwitchTo(ApplyContext);
 	MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
 	MySubscriptionValid = true;
 	MemoryContextSwitchTo(oldctx);
@@ -1530,7 +1534,7 @@ ApplyWorkerMain(Datum main_arg)
 		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
 
 		/* The slot name needs to be allocated in permanent memory context. */
-		oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+		oldctx = MemoryContextSwitchTo(ApplyContext);
 		myslotname = pstrdup(syncslotname);
 		MemoryContextSwitchTo(oldctx);
 
diff --git a/src/backend/utils/mmgr/README b/src/backend/utils/mmgr/README
index 480b1f8..69fb642 100644
--- a/src/backend/utils/mmgr/README
+++ b/src/backend/utils/mmgr/README
@@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees,
 and don't actually need any storage allocated in their private contexts.
 
 
+Logical replication worker contexts
+-----------------------------------
+
+ApplyContext --- permanent during whole lifetime of apply worker. It is
+possible to use TopMemoryContext here as well, but for simplicity of memory usage
+analysis we spin up different context.
+
+ApplyMessageContext --- short-lived context that is cleaned after each
+update/insert/delete is applied.
+
+
 Transient Contexts During Execution
 -----------------------------------
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index b8e35d4..b13e671 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -47,8 +47,8 @@ typedef struct LogicalRepWorker
 	TimestampTz	reply_time;
 } LogicalRepWorker;
 
-/* Memory context for cached variables in apply worker. */
-extern MemoryContext				ApplyCacheContext;
+/* Main memory context for apply worker. Permanent during worker lifetime. */
+extern MemoryContext				ApplyContext;
 
 /* libpqreceiver connection */
 extern struct WalReceiverConn	   *wrconn;
#13Noah Misch
noah@leadboat.com
In reply to: Stas Kelvich (#12)
Re: Logical replication ApplyContext bloat

On Wed, May 03, 2017 at 12:02:41PM +0300, Stas Kelvich wrote:

On 20 Apr 2017, at 17:01, Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Thu, Apr 20, 2017 at 7:04 PM, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:

Thanks for noting.

Added short description of ApplyContext and ApplyMessageContext to README.

Typo

/analysys/analysis

Fixed, thanks.

Added this to open items list.

[Action required within three days. This is a generic notification.]

The above-described topic is currently a PostgreSQL 10 open item. Peter,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1]/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.

[1]: /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#14Peter Eisentraut
peter.eisentraut@2ndquadrant.com
In reply to: Stas Kelvich (#12)
Re: Logical replication ApplyContext bloat

I'm not sure about some of the details.

I think it would make more sense to reset ApplyMessageContext in
apply_dispatch(), so that it is done consistently after every message
(as the name implies), not only some messages.

Also, perhaps ApplyMessageContext should be a child of
TopTransactionContext. (You have it as a child of ApplyContext, which
is under TopMemoryContext.)

--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#15Peter Eisentraut
peter.eisentraut@2ndquadrant.com
In reply to: Noah Misch (#13)
Re: Logical replication ApplyContext bloat

On 5/5/17 03:09, Noah Misch wrote:

[Action required within three days. This is a generic notification.]

The above-described topic is currently a PostgreSQL 10 open item. Peter,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1] and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.

The discussion is still ongoing. I will focus on it this week and
report back on Wednesday.

--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#16Peter Eisentraut
peter.eisentraut@2ndquadrant.com
In reply to: Peter Eisentraut (#14)
Re: Logical replication ApplyContext bloat

On 5/8/17 11:26, Peter Eisentraut wrote:

I think it would make more sense to reset ApplyMessageContext in
apply_dispatch(), so that it is done consistently after every message
(as the name implies), not only some messages.

Committed that.

Also, perhaps ApplyMessageContext should be a child of
TopTransactionContext. (You have it as a child of ApplyContext, which
is under TopMemoryContext.)

Left that as is.

--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#17Stas Kelvich
s.kelvich@postgrespro.ru
In reply to: Peter Eisentraut (#16)
Re: Logical replication ApplyContext bloat

On 9 May 2017, at 21:53, Peter Eisentraut <peter.eisentraut@2ndquadrant.com> wrote:

On 5/8/17 11:26, Peter Eisentraut wrote:

I think it would make more sense to reset ApplyMessageContext in
apply_dispatch(), so that it is done consistently after every message
(as the name implies), not only some messages.

Committed that.

Also, perhaps ApplyMessageContext should be a child of
TopTransactionContext. (You have it as a child of ApplyContext, which
is under TopMemoryContext.)

Left that as is.

Thanks!

Stas Kelvich
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers