Simplify some codes in pgoutput

Started by houzj.fnst@fujitsu.com almost 3 years ago. 10 messages.
#1houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
1 attachment(s)

Hi,

I noticed that there is some duplicated code in the pgoutput_change() function
that can be simplified, and here is an attempt to do that.

Best Regards,
Hou Zhijie

Attachments:

0001-simplify-the-code-in-pgoutput_change.patch (application/octet-stream)
From 76ab9f02a71c3c2dcae7d87681a29c011ad67a98 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Wed, 15 Mar 2023 15:57:13 +0800
Subject: [PATCH] simplify the code in pgoutput_change

---
 src/backend/replication/pgoutput/pgoutput.c | 204 +++++---------------
 1 file changed, 51 insertions(+), 153 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 00a2d73dab..fdaaaa1922 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1448,173 +1448,68 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	/* Send the data */
-	switch (action)
+	/* Switch relation if publishing via root. */
+	if (relentry->publish_as_relid != RelationGetRelid(relation))
 	{
-		case REORDER_BUFFER_CHANGE_INSERT:
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
+		Assert(relation->rd_rel->relispartition);
+		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+		targetrel = ancestor;
+	}
 
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuple if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
+	if (change->data.tp.newtuple)
+	{
+		new_slot = relentry->new_slot;
+		ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
 
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
+		if (relentry->attrmap)
+			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot,
+											 MakeTupleTableSlot(RelationGetDescr(targetrel),
+																&TTSOpsVirtual));
+	}
 
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
-				break;
+	if (change->data.tp.oldtuple)
+	{
+		old_slot = relentry->old_slot;
+		ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
 
-			/*
-			 * Send BEGIN if we haven't yet.
-			 *
-			 * We send the BEGIN message after ensuring that we will actually
-			 * send the change. This avoids sending a pair of BEGIN/COMMIT
-			 * messages for empty transactions.
-			 */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
+		if (relentry->attrmap)
+			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot,
+											 MakeTupleTableSlot(RelationGetDescr(targetrel),
+																&TTSOpsVirtual));
+	}
 
-			/*
-			 * Schema should be sent using the original relation because it
-			 * also sends the ancestor's relation.
-			 */
-			maybe_send_schema(ctx, change, relation, relentry);
+	/*
+	 * Check row filter.
+	 *
+	 * Updates could be transformed to inserts or deletes based on the results
+	 * of the row filter for old and new tuple.
+	 */
+	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+		goto cleanup;
 
-			OutputPluginPrepareWrite(ctx, true);
+	/* Send BEGIN if we haven't yet */
+	if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
+
+	maybe_send_schema(ctx, change, relation, relentry);
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	/* Send the data */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
 									data->binary, relentry->columns);
-			OutputPluginWrite(ctx, true);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
-			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-			}
-
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
-
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuples if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-					if (old_slot)
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
-
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-									 relentry, &action))
-				break;
-
-			/* Send BEGIN if we haven't yet */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
-
-			maybe_send_schema(ctx, change, relation, relentry);
-
-			OutputPluginPrepareWrite(ctx, true);
-
-			/*
-			 * Updates could be transformed to inserts or deletes based on the
-			 * results of the row filter for old and new tuple.
-			 */
-			switch (action)
-			{
-				case REORDER_BUFFER_CHANGE_INSERT:
-					logicalrep_write_insert(ctx->out, xid, targetrel,
-											new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_UPDATE:
-					logicalrep_write_update(ctx->out, xid, targetrel,
-											old_slot, new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_DELETE:
-					logicalrep_write_delete(ctx->out, xid, targetrel,
-											old_slot, data->binary,
-											relentry->columns);
-					break;
-				default:
-					Assert(false);
-			}
-
-			OutputPluginWrite(ctx, true);
+			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+									new_slot, data->binary, relentry->columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-
-				/* Switch relation if publishing via root. */
-				if (relentry->publish_as_relid != RelationGetRelid(relation))
-				{
-					Assert(relation->rd_rel->relispartition);
-					ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-					targetrel = ancestor;
-					/* Convert tuple if needed. */
-					if (relentry->attrmap)
-					{
-						TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-					}
-				}
-
-				/* Check row filter */
-				if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-										 relentry, &action))
-					break;
-
-				/* Send BEGIN if we haven't yet */
-				if (txndata && !txndata->sent_begin_txn)
-					pgoutput_send_begin(ctx, txn);
-
-				maybe_send_schema(ctx, change, relation, relentry);
-
-				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, xid, targetrel,
-										old_slot, data->binary,
-										relentry->columns);
-				OutputPluginWrite(ctx, true);
-			}
+				logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+										data->binary, relentry->columns);
 			else
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
 			break;
@@ -1622,6 +1517,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	OutputPluginWrite(ctx, true);
+
+cleanup:
 	if (RelationIsValid(ancestor))
 	{
 		RelationClose(ancestor);
-- 
2.30.0.windows.2

#2Amit Kapila
amit.kapila16@gmail.com
In reply to: houzj.fnst@fujitsu.com (#1)
Re: Simplify some codes in pgoutput

On Wed, Mar 15, 2023 at 2:00 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

I noticed that there is some duplicated code in the pgoutput_change() function
that can be simplified, and here is an attempt to do that.

For REORDER_BUFFER_CHANGE_DELETE, when the old tuple is missing, after
this patch, we will still send BEGIN and do OutputPluginWrite, etc.
Also, it will try to perform row_filter when neither old_slot nor
new_slot is set. I don't know for which particular case we have this
handling of missing old tuples for deletes, but that might require
changes in your proposed patch.

--
With Regards,
Amit Kapila.
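
To make the concern above concrete, here is a minimal sketch of the control flow in the first patch version. It is a toy model, not PostgreSQL code: `Change`, `Trace`, `row_filter_passes` and `change_v1_shape` are hypothetical names, and the row filter is assumed to let every change through.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; not PostgreSQL types. */
typedef enum { ACT_INSERT, ACT_UPDATE, ACT_DELETE } Action;

typedef struct
{
    Action      action;
    const char *oldtuple;   /* NULL models a missing old tuple */
    const char *newtuple;   /* NULL for DELETE */
} Change;

typedef struct
{
    int begin_sent;     /* BEGIN messages emitted */
    int rows_written;   /* row payloads serialized */
    int writes_done;    /* completed prepare/write pairs */
} Trace;

/* Assumption: the row filter lets every change through. */
static bool
row_filter_passes(const Change *change)
{
    (void) change;
    return true;
}

/* Control flow shaped like the first version of the patch. */
static void
change_v1_shape(const Change *change, Trace *trace)
{
    if (!row_filter_passes(change))
        return;

    trace->begin_sent++;        /* shared step, runs before the switch */

    switch (change->action)
    {
        case ACT_INSERT:
        case ACT_UPDATE:
            trace->rows_written++;
            break;
        case ACT_DELETE:
            if (change->oldtuple != NULL)
                trace->rows_written++;
            /* else: only a debug log, but BEGIN has already gone out */
            break;
        default:
            assert(false);
    }

    trace->writes_done++;       /* shared step, runs after the switch */
}
```

In this model, a DELETE with a NULL `oldtuple` still bumps `begin_sent` and `writes_done` while `rows_written` stays at zero, which is the situation the review comment describes.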

#3Peter Smith
smithpb2250@gmail.com
In reply to: houzj.fnst@fujitsu.com (#1)
Re: Simplify some codes in pgoutput

On Wed, Mar 15, 2023 at 7:30 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

Hi,

I noticed that there is some duplicated code in the pgoutput_change() function
that can be simplified, and here is an attempt to do that.

Best Regards,
Hou Zhijie

Hi Hou-san.

I had a quick look at the 0001 patch.

Here are some first comments.

======

1.
+ if (relentry->attrmap)
+ old_slot = execute_attr_map_slot(relentry->attrmap, old_slot,
+ MakeTupleTableSlot(RelationGetDescr(targetrel),
+ &TTSOpsVirtual));

1a.
IMO maybe it was more readable before when there was a separate
'tupdesc' variable, instead of trying to squeeze too much into one
statement.

1b.
Should you retain the old comments that said "/* Convert tuple if needed. */"

~~~

2.
- if (old_slot)
- old_slot = execute_attr_map_slot(relentry->attrmap,
- old_slot,
- MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));

The original code for REORDER_BUFFER_CHANGE_UPDATE was checking "if
(old_slot)" but that check seems no longer present. Is it OK?

~~~

3.
- /*
- * Send BEGIN if we haven't yet.
- *
- * We send the BEGIN message after ensuring that we will actually
- * send the change. This avoids sending a pair of BEGIN/COMMIT
- * messages for empty transactions.
- */

That original longer comment has been replaced with just "/* Send
BEGIN if we haven't yet */". Won't it be better to retain the more
informative longer comment?

~~~

4.
+
+cleanup:
if (RelationIsValid(ancestor))
{
RelationClose(ancestor);

~

Since you've introduced a new label 'cleanup:' then IMO you can remove
that old comment "/* Cleanup */".

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#4houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#2)
1 attachment(s)
RE: Simplify some codes in pgoutput

On Thursday, March 16, 2023 12:30 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 15, 2023 at 2:00 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

I noticed that there is some duplicated code in the pgoutput_change() function
that can be simplified, and here is an attempt to do that.

For REORDER_BUFFER_CHANGE_DELETE, when the old tuple is missing, after
this patch, we will still send BEGIN and do OutputPluginWrite, etc.
Also, it will try to perform row_filter when neither old_slot nor
new_slot is set. I don't know for which particular case we have this
handling of missing old tuples for deletes, but that might require
changes in your proposed patch.

I researched this a bit. I think the old tuple will be null only if the
modified table doesn't have a primary key (PK) or replica identity (RI) when
the DELETE happens (see heap_delete()), but in that case the DELETE won't be
allowed to be replicated (e.g. the DELETE will either error out or be filtered
by the table-level filter in pgoutput_change).

I also checked this for system tables: in that case it is null, but
reorderbuffer doesn't forward it. For user_catalog_table, similarly, the
DELETE should be filtered by the table filter in pgoutput_change as well.

So, I think we can remove this check and log.
Here is a new version of the patch that removes it for now.

Best Regards,
Hou zj

Attachments:

v2-0001-simplify-the-code-in-pgoutput_change.patch (application/octet-stream)
From 3810ff2dbd58eb812b95eb6c801fa23a010e82bc Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Wed, 15 Mar 2023 15:57:13 +0800
Subject: [PATCH v2] simplify the code in pgoutput_change

---
 src/backend/replication/pgoutput/pgoutput.c | 224 ++++++--------------
 1 file changed, 68 insertions(+), 156 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 3a2d2e357e..baccca30e8 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1448,187 +1448,99 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	/* Send the data */
-	switch (action)
+	/* Switch relation if publishing via root. */
+	if (relentry->publish_as_relid != RelationGetRelid(relation))
 	{
-		case REORDER_BUFFER_CHANGE_INSERT:
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
-
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuple if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
-
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
-				break;
-
-			/*
-			 * Send BEGIN if we haven't yet.
-			 *
-			 * We send the BEGIN message after ensuring that we will actually
-			 * send the change. This avoids sending a pair of BEGIN/COMMIT
-			 * messages for empty transactions.
-			 */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
-
-			/*
-			 * Schema should be sent using the original relation because it
-			 * also sends the ancestor's relation.
-			 */
-			maybe_send_schema(ctx, change, relation, relentry);
+		Assert(relation->rd_rel->relispartition);
+		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+		targetrel = ancestor;
+	}
 
-			OutputPluginPrepareWrite(ctx, true);
-			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
-			OutputPluginWrite(ctx, true);
-			break;
-		case REORDER_BUFFER_CHANGE_UPDATE:
-			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-			}
+	if (change->data.tp.newtuple)
+	{
+		new_slot = relentry->new_slot;
+		ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
 
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
+		/* Convert tuple if needed. */
+		if (relentry->attrmap)
+		{
+			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+													  &TTSOpsVirtual);
 
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuples if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
+			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
+		}
+	}
 
-					if (old_slot)
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+	if (change->data.tp.oldtuple)
+	{
+		old_slot = relentry->old_slot;
+		ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
 
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
+		/* Convert tuple if needed. */
+		if (relentry->attrmap)
+		{
+			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+													  &TTSOpsVirtual);
 
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-									 relentry, &action))
-				break;
+			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
+		}
+	}
 
-			/* Send BEGIN if we haven't yet */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
+	/*
+	 * Check row filter.
+	 *
+	 * Updates could be transformed to inserts or deletes based on the results
+	 * of the row filter for old and new tuple.
+	 */
+	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+		goto cleanup;
 
-			maybe_send_schema(ctx, change, relation, relentry);
+	/*
+	 * Send BEGIN if we haven't yet.
+	 *
+	 * We send the BEGIN message after ensuring that we will actually send the
+	 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
+	 * transactions.
+	 */
+	if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
 
-			OutputPluginPrepareWrite(ctx, true);
+	/*
+	 * Schema should be sent using the original relation because it also sends
+	 * the ancestor's relation.
+	 */
+	maybe_send_schema(ctx, change, relation, relentry);
 
-			/*
-			 * Updates could be transformed to inserts or deletes based on the
-			 * results of the row filter for old and new tuple.
-			 */
-			switch (action)
-			{
-				case REORDER_BUFFER_CHANGE_INSERT:
-					logicalrep_write_insert(ctx->out, xid, targetrel,
-											new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_UPDATE:
-					logicalrep_write_update(ctx->out, xid, targetrel,
-											old_slot, new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_DELETE:
-					logicalrep_write_delete(ctx->out, xid, targetrel,
-											old_slot, data->binary,
-											relentry->columns);
-					break;
-				default:
-					Assert(false);
-			}
+	OutputPluginPrepareWrite(ctx, true);
 
-			OutputPluginWrite(ctx, true);
+	/* Send the data */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+									data->binary, relentry->columns);
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+									new_slot, data->binary, relentry->columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
-			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-
-				/* Switch relation if publishing via root. */
-				if (relentry->publish_as_relid != RelationGetRelid(relation))
-				{
-					Assert(relation->rd_rel->relispartition);
-					ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-					targetrel = ancestor;
-					/* Convert tuple if needed. */
-					if (relentry->attrmap)
-					{
-						TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-					}
-				}
-
-				/* Check row filter */
-				if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-										 relentry, &action))
-					break;
-
-				/* Send BEGIN if we haven't yet */
-				if (txndata && !txndata->sent_begin_txn)
-					pgoutput_send_begin(ctx, txn);
-
-				maybe_send_schema(ctx, change, relation, relentry);
-
-				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, xid, targetrel,
-										old_slot, data->binary,
-										relentry->columns);
-				OutputPluginWrite(ctx, true);
-			}
-			else
-				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+									data->binary, relentry->columns);
 			break;
 		default:
 			Assert(false);
 	}
 
+	OutputPluginWrite(ctx, true);
+
+cleanup:
 	if (RelationIsValid(ancestor))
 	{
 		RelationClose(ancestor);
 		ancestor = NULL;
 	}
 
-	/* Cleanup */
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
 }
-- 
2.30.0.windows.2

#5houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Peter Smith (#3)
RE: Simplify some codes in pgoutput

On Friday, March 17, 2023 11:49 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Wed, Mar 15, 2023 at 7:30 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

Hi,

I noticed that there is some duplicated code in the pgoutput_change()
function that can be simplified, and here is an attempt to do that.

Hi Hou-san.

I had a quick look at the 0001 patch.

Here are some first comments.

Thanks for the comments.

======

1.
+ if (relentry->attrmap)
+ old_slot = execute_attr_map_slot(relentry->attrmap, old_slot,
+ MakeTupleTableSlot(RelationGetDescr(targetrel),
+ &TTSOpsVirtual));

1a.
IMO maybe it was more readable before when there was a separate 'tupdesc'
variable, instead of trying to squeeze too much into one statement.

1b.
Should you retain the old comments that said "/* Convert tuple if needed. */"

Added.

~~~

2.
- if (old_slot)
- old_slot = execute_attr_map_slot(relentry->attrmap,
- old_slot,
- MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));

The original code for REORDER_BUFFER_CHANGE_UPDATE was checking "if
(old_slot)" but that check seems no longer present. Is it OK?

I think the logic is the same.

~~~

3.
- /*
- * Send BEGIN if we haven't yet.
- *
- * We send the BEGIN message after ensuring that we will actually
- * send the change. This avoids sending a pair of BEGIN/COMMIT
- * messages for empty transactions.
- */

That original longer comment has been replaced with just "/* Send BEGIN if we
haven't yet */". Won't it be better to retain the more informative longer
comment?

Added.

~~~

4.
+
+cleanup:
if (RelationIsValid(ancestor))
{
RelationClose(ancestor);

~

Since you've introduced a new label 'cleanup:' then IMO you can remove that
old comment "/* Cleanup */".

Removed.

Best Regards,
Hou zj
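
On review comment 2 above (the dropped "if (old_slot)" check), the following toy model sketches why the two shapes can be read as equivalent. It assumes that old_slot is non-NULL exactly when oldtuple is present, and it ignores the publish-via-root condition that wrapped the original conversion. All function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Original UPDATE branch: old_slot is set only when an old tuple exists,
 * and the conversion is additionally guarded by "if (old_slot)".
 */
static bool
old_slot_converted_before(bool has_oldtuple, bool has_attrmap)
{
    bool old_slot_set = has_oldtuple;
    bool converted = false;

    if (has_attrmap)
    {
        if (old_slot_set)
            converted = true;
    }
    return converted;
}

/* Patched code: the conversion is nested inside the oldtuple check. */
static bool
old_slot_converted_after(bool has_oldtuple, bool has_attrmap)
{
    bool converted = false;

    if (has_oldtuple)
    {
        if (has_attrmap)
            converted = true;
    }
    return converted;
}

/* Returns true when both shapes agree on every input combination. */
static bool
shapes_agree(void)
{
    for (int o = 0; o <= 1; o++)
        for (int a = 0; a <= 1; a++)
            if (old_slot_converted_before(o, a) !=
                old_slot_converted_after(o, a))
                return false;
    return true;
}
```

Under those assumptions the explicit old_slot test is subsumed by the enclosing oldtuple test, so the conversion runs in the same cases before and after.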

#6houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: houzj.fnst@fujitsu.com (#4)
1 attachment(s)
RE: Simplify some codes in pgoutput

On Monday, March 20, 2023 5:20 PM houzj.fnst@fujitsu.com wrote:

On Thursday, March 16, 2023 12:30 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

On Wed, Mar 15, 2023 at 2:00 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

I noticed that there is some duplicated code in the pgoutput_change() function
that can be simplified, and here is an attempt to do that.

For REORDER_BUFFER_CHANGE_DELETE, when the old tuple is missing, after
this patch, we will still send BEGIN and do OutputPluginWrite, etc.
Also, it will try to perform row_filter when neither old_slot nor
new_slot is set. I don't know for which particular case we have this
handling of missing old tuples for deletes, but that might require
changes in your proposed patch.

I researched this a bit. I think the old tuple will be null only if the modified table
doesn't have a primary key (PK) or replica identity (RI) when the DELETE happens
(see heap_delete()), but in that case the DELETE won't be allowed to be replicated
(e.g. the DELETE will either error out or be filtered by the table-level filter in
pgoutput_change).

I also checked this for system tables: in that case it is null, but reorderbuffer
doesn't forward it. For user_catalog_table, similarly, the DELETE should be
filtered by the table filter in pgoutput_change as well.

So, I think we can remove this check and log.
Here is a new version of the patch that removes it for now.

After rethinking this, it seems better to leave this check in place for now.
Although it may be unnecessary, we can remove it later in a separate patch once
we are sure about this. So, here is a patch that adds this check back.

Best Regards,
Hou zj
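
As a rough illustration of the shape this version takes, the sketch below moves the missing-oldtuple check ahead of all shared work, so nothing is emitted for such a DELETE. It is a toy model under stated assumptions: `Change`, `Trace` and `change_v3_shape` are hypothetical names, and the row filter and schema steps are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; not PostgreSQL types. */
typedef enum { ACT_INSERT, ACT_UPDATE, ACT_DELETE } Action;

typedef struct
{
    Action      action;
    const char *oldtuple;   /* NULL models a missing old tuple */
    const char *newtuple;   /* NULL for DELETE */
} Change;

typedef struct
{
    int begin_sent;     /* BEGIN messages emitted */
    int rows_written;   /* row payloads serialized */
    int writes_done;    /* completed prepare/write pairs */
} Trace;

/* Control flow shaped like the third version of the patch. */
static void
change_v3_shape(const Change *change, Trace *trace)
{
    /*
     * Per-action precheck, done before any shared work.  The real patch
     * also logs a DEBUG1 message at this point.
     */
    if (change->action == ACT_DELETE && change->oldtuple == NULL)
        return;

    trace->begin_sent++;        /* shared step, runs before the switch */

    switch (change->action)
    {
        case ACT_INSERT:
        case ACT_UPDATE:
        case ACT_DELETE:
            trace->rows_written++;
            break;
        default:
            assert(false);
    }

    trace->writes_done++;       /* shared step, runs after the switch */
}
```

With the guard up front, the DELETE case in the final switch no longer needs its own oldtuple test, which is what lets the three write calls share one prepare/write pair.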

Attachments:

v3-0001-simplify-the-code-in-pgoutput_change.patch (application/octet-stream)
From 451ad9a768bedd68938bcb454c537b5b64c331c7 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Wed, 15 Mar 2023 15:57:13 +0800
Subject: [PATCH v3] simplify the code in pgoutput_change

---
 src/backend/replication/pgoutput/pgoutput.c | 236 +++++++-------------
 1 file changed, 80 insertions(+), 156 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 3a2d2e357e..9d5aed8448 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1440,6 +1440,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
 				return;
+
+			/*
+			 * If replica identity columns are not defined on this table, we
+			 * can only determine that a row was deleted. Since the DELETE
+			 * action cannot be published in this scenario, we simply return
+			 * here.
+			 */
+			if (!change->data.tp.oldtuple)
+			{
+				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+				return;
+			}
 			break;
 		default:
 			Assert(false);
@@ -1448,187 +1460,99 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	/* Send the data */
-	switch (action)
+	/* Switch relation if publishing via root. */
+	if (relentry->publish_as_relid != RelationGetRelid(relation))
 	{
-		case REORDER_BUFFER_CHANGE_INSERT:
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
-
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuple if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
-
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
-				break;
-
-			/*
-			 * Send BEGIN if we haven't yet.
-			 *
-			 * We send the BEGIN message after ensuring that we will actually
-			 * send the change. This avoids sending a pair of BEGIN/COMMIT
-			 * messages for empty transactions.
-			 */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
-
-			/*
-			 * Schema should be sent using the original relation because it
-			 * also sends the ancestor's relation.
-			 */
-			maybe_send_schema(ctx, change, relation, relentry);
+		Assert(relation->rd_rel->relispartition);
+		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+		targetrel = ancestor;
+	}
 
-			OutputPluginPrepareWrite(ctx, true);
-			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
-			OutputPluginWrite(ctx, true);
-			break;
-		case REORDER_BUFFER_CHANGE_UPDATE:
-			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-			}
+	if (change->data.tp.newtuple)
+	{
+		new_slot = relentry->new_slot;
+		ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
 
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
+		/* Convert tuple if needed. */
+		if (relentry->attrmap)
+		{
+			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+													  &TTSOpsVirtual);
 
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuples if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
+			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
+		}
+	}
 
-					if (old_slot)
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+	if (change->data.tp.oldtuple)
+	{
+		old_slot = relentry->old_slot;
+		ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
 
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
+		/* Convert tuple if needed. */
+		if (relentry->attrmap)
+		{
+			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+													  &TTSOpsVirtual);
 
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-									 relentry, &action))
-				break;
+			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
+		}
+	}
 
-			/* Send BEGIN if we haven't yet */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
+	/*
+	 * Check row filter.
+	 *
+	 * Updates could be transformed to inserts or deletes based on the results
+	 * of the row filter for old and new tuple.
+	 */
+	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+		goto cleanup;
 
-			maybe_send_schema(ctx, change, relation, relentry);
+	/*
+	 * Send BEGIN if we haven't yet.
+	 *
+	 * We send the BEGIN message after ensuring that we will actually send the
+	 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
+	 * transactions.
+	 */
+	if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
 
-			OutputPluginPrepareWrite(ctx, true);
+	/*
+	 * Schema should be sent using the original relation because it also sends
+	 * the ancestor's relation.
+	 */
+	maybe_send_schema(ctx, change, relation, relentry);
 
-			/*
-			 * Updates could be transformed to inserts or deletes based on the
-			 * results of the row filter for old and new tuple.
-			 */
-			switch (action)
-			{
-				case REORDER_BUFFER_CHANGE_INSERT:
-					logicalrep_write_insert(ctx->out, xid, targetrel,
-											new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_UPDATE:
-					logicalrep_write_update(ctx->out, xid, targetrel,
-											old_slot, new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_DELETE:
-					logicalrep_write_delete(ctx->out, xid, targetrel,
-											old_slot, data->binary,
-											relentry->columns);
-					break;
-				default:
-					Assert(false);
-			}
+	OutputPluginPrepareWrite(ctx, true);
 
-			OutputPluginWrite(ctx, true);
+	/* Send the data */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+									data->binary, relentry->columns);
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+									new_slot, data->binary, relentry->columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
-			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-
-				/* Switch relation if publishing via root. */
-				if (relentry->publish_as_relid != RelationGetRelid(relation))
-				{
-					Assert(relation->rd_rel->relispartition);
-					ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-					targetrel = ancestor;
-					/* Convert tuple if needed. */
-					if (relentry->attrmap)
-					{
-						TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-					}
-				}
-
-				/* Check row filter */
-				if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-										 relentry, &action))
-					break;
-
-				/* Send BEGIN if we haven't yet */
-				if (txndata && !txndata->sent_begin_txn)
-					pgoutput_send_begin(ctx, txn);
-
-				maybe_send_schema(ctx, change, relation, relentry);
-
-				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, xid, targetrel,
-										old_slot, data->binary,
-										relentry->columns);
-				OutputPluginWrite(ctx, true);
-			}
-			else
-				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+									data->binary, relentry->columns);
 			break;
 		default:
 			Assert(false);
 	}
 
+	OutputPluginWrite(ctx, true);
+
+cleanup:
 	if (RelationIsValid(ancestor))
 	{
 		RelationClose(ancestor);
 		ancestor = NULL;
 	}
 
-	/* Cleanup */
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
 }
-- 
2.30.0.windows.2

#7Peter Smith
smithpb2250@gmail.com
In reply to: houzj.fnst@fujitsu.com (#6)
Re: Simplify some codes in pgoutput

Hi Hou-san,

I tried to compare the logic of patch v3-0001 versus the original HEAD code.

IMO this patch logic is not exactly the same as before -- there are
some subtle differences. I am not sure if these differences represent
real problems or not.

Below are all my review comments:

======

1.
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
targetrel = ancestor;
}

~

The "switch relation if publishing via root" logic is now happening
first, whereas the original code was doing this after the slot
assignments. AFAIK it does not matter, it's just a small point of
difference.

======

2.
/* Convert tuple if needed. */
if (relentry->attrmap)
{
...
}

The "Convert tuple if needed." logic looks the same, but when it is
executed is NOT the same. It could be a problem.

Previously, the conversion would only happen within the "Switch
relation if publishing via root." condition. But the patch no longer
has that extra condition -- now I think it attempts conversion every
time regardless of "publishing via root".

I would expect the "publish via root" case to be less common, so even
if the current code works, by omitting that check won't this patch
have an unnecessary performance hit due to the extra conversions?

~~

3.
if (old_slot)
old_slot = execute_attr_map_slot(relentry->attrmap, old_slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));

~

The previous conversion code for UPDATE (shown above) was checking
"if (old_slot)". Actually, I don't know why that check was even
necessary before but it seems to have been accounting for a
possibility that UPDATE might not have "oldtuple".

But this combination (if indeed it was possible) is not handled
anymore with the patch code because the old_slot is unconditionally
assigned in the same block doing this conversion. Perhaps that
original HEAD extra check was just overkill? TAP tests obviously
still are passing with the patch, but anyway, this is yet another
small point of difference for the refactored patch code.

======

4.
AFAIK, the "if (change->data.tp.newtuple)" can only be true for INSERT
or UPDATE, so the code would be better to include a sanity Assert.

SUGGESTION
if (change->data.tp.newtuple)
{
Assert(action == REORDER_BUFFER_CHANGE_INSERT || action ==
REORDER_BUFFER_CHANGE_UPDATE);
...
}

======

5.
AFAIK, the "if (change->data.tp.oldtuple)" can only be true for UPDATE
or DELETE, so the code would be better to include a sanity Assert.

SUGGESTION
if (change->data.tp.oldtuple)
{
Assert(action == REORDER_BUFFER_CHANGE_UPDATE || action ==
REORDER_BUFFER_CHANGE_DELETE);
...
}

======

6.
I suggest moving the "change->data.tp.oldtuple" check before the
"change->data.tp.newtuple" check. I don't think it makes any
difference, but it seems more natural IMO to have old before new.

------
Kind Regards,
Peter Smith

#8houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Peter Smith (#7)
1 attachment(s)
RE: Simplify some codes in pgoutput

On Thursday, March 30, 2023 9:15 AM Peter Smith <smithpb2250@gmail.com> wrote:

Hi Hou-san,

I tried to compare the logic of patch v3-0001 versus the original HEAD code.

IMO this patch logic is not exactly the same as before -- there are
some subtle differences. I am not sure if these differences represent
real problems or not.

Below are all my review comments:

Thanks for the check and comments.

======

1.
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
targetrel = ancestor;
}

~

The "switch relation if publishing via root" logic is now happening
first, whereas the original code was doing this after the slot
assignments. AFAIK it does not matter, it's just a small point of
difference.

I also think it doesn't matter.

======

2.
/* Convert tuple if needed. */
if (relentry->attrmap)
{
...
}

The "Convert tuple if needed." logic looks the same, but when it is
executed is NOT the same. It could be a problem.

Previously, the conversion would only happen within the "Switch
relation if publishing via root." condition. But the patch no longer
has that extra condition -- now I think it attempts conversion every
time regardless of "publishing via root".

I would expect the "publish via root" case to be less common, so even
if the current code works, by omitting that check won't this patch
have an unnecessary performance hit due to the extra conversions?

No, the conversions won't happen in normal cases because "if (relentry->attrmap)"
will pass only if we need to switch relation (publish via root).
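
The equivalence claimed in this reply can be sketched with a small standalone model. This is only an illustration, not pgoutput code: MockEntry, needs_conversion_nested() and needs_conversion_flat() are hypothetical names, and the model simply assumes, as stated above, that attrmap is non-NULL only when publishing via root.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the two RelationSyncEntry fields involved. */
typedef struct MockEntry
{
	unsigned	publish_as_relid;	/* relation the change is published as */
	const int  *attrmap;			/* assumed non-NULL only when publishing
									 * via root with a different column layout */
} MockEntry;

/* HEAD-style guard: the attrmap check is nested in the via-root check. */
static bool
needs_conversion_nested(const MockEntry *entry, unsigned relid)
{
	if (entry->publish_as_relid != relid)
	{
		if (entry->attrmap)
			return true;
	}
	return false;
}

/* Patch-style guard: only attrmap is checked. */
static bool
needs_conversion_flat(const MockEntry *entry)
{
	return entry->attrmap != NULL;
}
```

Under that assumption both guards give the same answer for every entry, so dropping the outer condition adds no conversions for relations that are not published via root.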

~~

3.
if (old_slot)
old_slot = execute_attr_map_slot(relentry->attrmap, old_slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));

~

The previous conversion code for UPDATE (shown above) was checking
"if (old_slot)". Actually, I don't know why that check was even
necessary before but it seems to have been accounting for a
possibility that UPDATE might not have "oldtuple".

If the RI key wasn't updated, then it's possible the old tuple is null.

But this combination (if indeed it was possible) is not handled
anymore with the patch code because the old_slot is unconditionally
assigned in the same block doing this conversion.

I think this case is handled by the generic old slot conversion in the patch.

======

4.
AFAIK, the "if (change->data.tp.newtuple)" can only be true for INSERT
or UPDATE, so the code would be better to include a sanity Assert.

SUGGESTION
if (change->data.tp.newtuple)
{
Assert(action == REORDER_BUFFER_CHANGE_INSERT || action ==
REORDER_BUFFER_CHANGE_UPDATE);
...
}

======

5.
AFAIK, the "if (change->data.tp.oldtuple)" can only be true for UPDATE
or DELETE, so the code would be better to include a sanity Assert.

SUGGESTION
if (change->data.tp.oldtuple)
{
Assert(action == REORDER_BUFFER_CHANGE_UPDATE || action ==
REORDER_BUFFER_CHANGE_DELETE);
...

It might be fine but I am not sure if it's necessary to add this in this
patch as we don't have such assertion before.

======

6.
I suggest moving the "change->data.tp.oldtuple" check before the
"change->data.tp.newtuple" check. I don't think it makes any
difference, but it seems more natural IMO to have old before new.

Changed.

Best Regards,
Hou zj

Attachments:

v4-0001-simplify-the-code-in-pgoutput_change.patch (application/octet-stream)
From 7691acce6b754e392ff5ae1e52bc962df783c52d Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Wed, 15 Mar 2023 15:57:13 +0800
Subject: [PATCH v4] simplify the code in pgoutput_change

---
 src/backend/replication/pgoutput/pgoutput.c | 236 +++++++-------------
 1 file changed, 80 insertions(+), 156 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 3a2d2e357e..d3ad48f480 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1440,6 +1440,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
 				return;
+
+			/*
+			 * If replica identity columns are not defined on this table, we
+			 * can only determine that a row was deleted. Since the DELETE
+			 * action cannot be published in this scenario, we simply return
+			 * here.
+			 */
+			if (!change->data.tp.oldtuple)
+			{
+				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+				return;
+			}
 			break;
 		default:
 			Assert(false);
@@ -1448,187 +1460,99 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	/* Send the data */
-	switch (action)
+	/* Switch relation if publishing via root. */
+	if (relentry->publish_as_relid != RelationGetRelid(relation))
 	{
-		case REORDER_BUFFER_CHANGE_INSERT:
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
-
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuple if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
-
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
-				break;
-
-			/*
-			 * Send BEGIN if we haven't yet.
-			 *
-			 * We send the BEGIN message after ensuring that we will actually
-			 * send the change. This avoids sending a pair of BEGIN/COMMIT
-			 * messages for empty transactions.
-			 */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
-
-			/*
-			 * Schema should be sent using the original relation because it
-			 * also sends the ancestor's relation.
-			 */
-			maybe_send_schema(ctx, change, relation, relentry);
+		Assert(relation->rd_rel->relispartition);
+		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+		targetrel = ancestor;
+	}
 
-			OutputPluginPrepareWrite(ctx, true);
-			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
-			OutputPluginWrite(ctx, true);
-			break;
-		case REORDER_BUFFER_CHANGE_UPDATE:
-			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-			}
+	if (change->data.tp.oldtuple)
+	{
+		old_slot = relentry->old_slot;
+		ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
 
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
+		/* Convert tuple if needed. */
+		if (relentry->attrmap)
+		{
+			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+													  &TTSOpsVirtual);
 
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuples if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
+			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
+		}
+	}
 
-					if (old_slot)
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+	if (change->data.tp.newtuple)
+	{
+		new_slot = relentry->new_slot;
+		ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
 
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
+		/* Convert tuple if needed. */
+		if (relentry->attrmap)
+		{
+			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+													  &TTSOpsVirtual);
 
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-									 relentry, &action))
-				break;
+			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
+		}
+	}
 
-			/* Send BEGIN if we haven't yet */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
+	/*
+	 * Check row filter.
+	 *
+	 * Updates could be transformed to inserts or deletes based on the results
+	 * of the row filter for old and new tuple.
+	 */
+	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+		goto cleanup;
 
-			maybe_send_schema(ctx, change, relation, relentry);
+	/*
+	 * Send BEGIN if we haven't yet.
+	 *
+	 * We send the BEGIN message after ensuring that we will actually send the
+	 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
+	 * transactions.
+	 */
+	if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
 
-			OutputPluginPrepareWrite(ctx, true);
+	/*
+	 * Schema should be sent using the original relation because it also sends
+	 * the ancestor's relation.
+	 */
+	maybe_send_schema(ctx, change, relation, relentry);
 
-			/*
-			 * Updates could be transformed to inserts or deletes based on the
-			 * results of the row filter for old and new tuple.
-			 */
-			switch (action)
-			{
-				case REORDER_BUFFER_CHANGE_INSERT:
-					logicalrep_write_insert(ctx->out, xid, targetrel,
-											new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_UPDATE:
-					logicalrep_write_update(ctx->out, xid, targetrel,
-											old_slot, new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_DELETE:
-					logicalrep_write_delete(ctx->out, xid, targetrel,
-											old_slot, data->binary,
-											relentry->columns);
-					break;
-				default:
-					Assert(false);
-			}
+	OutputPluginPrepareWrite(ctx, true);
 
-			OutputPluginWrite(ctx, true);
+	/* Send the data */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+									data->binary, relentry->columns);
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+									new_slot, data->binary, relentry->columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
-			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-
-				/* Switch relation if publishing via root. */
-				if (relentry->publish_as_relid != RelationGetRelid(relation))
-				{
-					Assert(relation->rd_rel->relispartition);
-					ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-					targetrel = ancestor;
-					/* Convert tuple if needed. */
-					if (relentry->attrmap)
-					{
-						TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-					}
-				}
-
-				/* Check row filter */
-				if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-										 relentry, &action))
-					break;
-
-				/* Send BEGIN if we haven't yet */
-				if (txndata && !txndata->sent_begin_txn)
-					pgoutput_send_begin(ctx, txn);
-
-				maybe_send_schema(ctx, change, relation, relentry);
-
-				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, xid, targetrel,
-										old_slot, data->binary,
-										relentry->columns);
-				OutputPluginWrite(ctx, true);
-			}
-			else
-				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+									data->binary, relentry->columns);
 			break;
 		default:
 			Assert(false);
 	}
 
+	OutputPluginWrite(ctx, true);
+
+cleanup:
 	if (RelationIsValid(ancestor))
 	{
 		RelationClose(ancestor);
 		ancestor = NULL;
 	}
 
-	/* Cleanup */
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
 }
-- 
2.30.0.windows.2

#9Peter Smith
smithpb2250@gmail.com
In reply to: houzj.fnst@fujitsu.com (#8)
Re: Simplify some codes in pgoutput

Hi Hou-san,

I looked again at v4-0001.

On Thu, Mar 30, 2023 at 2:01 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Thursday, March 30, 2023 9:15 AM Peter Smith <smithpb2250@gmail.com> wrote:

...

2.
/* Convert tuple if needed. */
if (relentry->attrmap)
{
...
}

The "Convert tuple if needed." logic looks the same, but when it is
executed is NOT the same. It could be a problem.

Previously, the conversion would only happen within the "Switch
relation if publishing via root." condition. But the patch no longer
has that extra condition -- now I think it attempts conversion every
time regardless of "publishing via root".

I would expect the "publish via root" case to be less common, so even
if the current code works, by omitting that check won't this patch
have an unnecessary performance hit due to the extra conversions?

No, the conversions won't happen in normal cases because "if (relentry->attrmap)"
will pass only if we need to switch relation (publish via root).

OK.

~~

3.
if (old_slot)
old_slot = execute_attr_map_slot(relentry->attrmap, old_slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));

~

The previous conversion code for UPDATE (shown above) was checking
"if (old_slot)". Actually, I don't know why that check was even
necessary before but it seems to have been accounting for a
possibility that UPDATE might not have "oldtuple".

If the RI key wasn't updated, then it's possible the old tuple is null.

But this combination (if indeed it was possible) is not handled
anymore with the patch code because the old_slot is unconditionally
assigned in the same block doing this conversion.

I think this case is handled by the generic old slot conversion in the patch.

Yeah, I think you are right. Sorry, this was my mistake when reading v3.

======

4.
AFAIK, the "if (change->data.tp.newtuple)" can only be true for INSERT
or UPDATE, so the code would be better to include a sanity Assert.

SUGGESTION
if (change->data.tp.newtuple)
{
Assert(action == REORDER_BUFFER_CHANGE_INSERT || action ==
REORDER_BUFFER_CHANGE_UPDATE);
...
}

======

5.
AFAIK, the "if (change->data.tp.oldtuple)" can only be true for UPDATE
or DELETE, so the code would be better to include a sanity Assert.

SUGGESTION
if (change->data.tp.oldtuple)
{
Assert(action == REORDER_BUFFER_CHANGE_UPDATE || action ==
REORDER_BUFFER_CHANGE_DELETE);
...

It might be fine but I am not sure if it's necessary to add this in this
patch as we don't have such assertion before.

The Asserts are just for sanity and self-documentation regarding what
actions can get into this logic. IMO including them does no harm,
rather it does some small amount of good, so why not do it?

You can't really use the fact they were not there before as a reason
to not add them now -- There were no Asserts in the original code
because this same logic was duplicated multiple times and was always
within obvious scope of a particular switch (action) case:

~

Apart from the question of the Asserts, I have no more review comments
for this patch.

(FYI - patch v4 applied cleanly and the regression tests and TAP
subscription tests all pass OK)
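
For reference, the Asserts proposed in comments 4 and 5 can be sketched against a standalone model. This is only an illustration under stated assumptions: MockAction, MockChange and store_slots() are hypothetical stand-ins, and the standard assert() is used in place of the PostgreSQL Assert() macro.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the REORDER_BUFFER_CHANGE_* actions. */
typedef enum MockAction
{
	MOCK_INSERT,
	MOCK_UPDATE,
	MOCK_DELETE
} MockAction;

typedef struct MockChange
{
	MockAction	action;
	const char *oldtuple;		/* NULL when no old tuple was logged */
	const char *newtuple;		/* NULL when there is no new tuple */
} MockChange;

/* Returns how many slots would be filled; the asserts document the callers. */
static int
store_slots(const MockChange *change)
{
	int			stored = 0;

	if (change->oldtuple)
	{
		/* An old tuple is only expected for UPDATE or DELETE. */
		assert(change->action == MOCK_UPDATE ||
			   change->action == MOCK_DELETE);
		stored++;
	}

	if (change->newtuple)
	{
		/* A new tuple is only expected for INSERT or UPDATE. */
		assert(change->action == MOCK_INSERT ||
			   change->action == MOCK_UPDATE);
		stored++;
	}

	return stored;
}
```

An UPDATE whose old tuple is missing still passes through the model with only the new slot stored, which matches the case discussed in comment 3.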

------
Kind Regards,
Peter Smith.

#10Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#9)
Re: Simplify some codes in pgoutput

On Thu, Mar 30, 2023 at 11:12 AM Peter Smith <smithpb2250@gmail.com> wrote:

5.
AFAIK, the "if (change->data.tp.oldtuple)" can only be true for UPDATE
or DELETE, so the code would be better to include a sanity Assert.

SUGGESTION
if (change->data.tp.oldtuple)
{
Assert(action == REORDER_BUFFER_CHANGE_UPDATE || action ==
REORDER_BUFFER_CHANGE_DELETE);
...

It might be fine but I am not sure if it's necessary to add this in this
patch as we don't have such assertion before.

The Asserts are just for sanity and self-documentation regarding what
actions can get into this logic. IMO including them does no harm,
rather it does some small amount of good, so why not do it?

You can't really use the fact they were not there before as a reason
to not add them now -- There were no Asserts in the original code
because this same logic was duplicated multiple times and was always
within obvious scope of a particular switch (action) case:

I see your point but like Hou-San I am also not really sure if these
new Asserts will be better. The patch looks good to me, so will push
in some time.

--
With Regards,
Amit Kapila.