From 4253c364838daf26c056c56d693bc00b1e3e8f73 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Thu, 16 Nov 2023 17:46:14 +0100
Subject: [PATCH 1/3] debug logging

---
 src/backend/catalog/pg_subscription.c       |  8 ++++++
 src/backend/replication/logical/worker.c    | 11 ++++++++
 src/backend/replication/pgoutput/pgoutput.c | 29 ++++++++++++++++++++-
 3 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d6a978f1362..0a2a644c293 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -238,6 +238,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 
+	elog(LOG, "AddSubscriptionRelState relid %d state %c LSN %X/%X", relid, state, LSN_FORMAT_ARGS(sublsn));
+
 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
@@ -285,6 +287,8 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	Datum		values[Natts_pg_subscription_rel];
 	bool		replaces[Natts_pg_subscription_rel];
 
+	elog(LOG, "UpdateSubscriptionRelState relid %d state %c LSN %X/%X", relid, state, LSN_FORMAT_ARGS(sublsn));
+
 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
@@ -369,6 +373,8 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 
 	table_close(rel, AccessShareLock);
 
+	elog(LOG, "GetSubscriptionRelState relid %d state %c LSN %X/%X", relid, substate, LSN_FORMAT_ARGS(*sublsn));
+
 	return substate;
 }
 
@@ -531,6 +537,8 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 		else
 			relstate->lsn = DatumGetLSN(d);
 
+		elog(LOG, "GetSubscriptionRelations relid %d state %c LSN %X/%X", relstate->relid, relstate->state, LSN_FORMAT_ARGS(relstate->lsn));
+
 		res = lappend(res, relstate);
 	}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 52a9f136ab9..42e5423172b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -503,6 +503,10 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 			return rel->state == SUBREL_STATE_READY;
 
 		case WORKERTYPE_APPLY:
+			elog(LOG, "should_apply_changes_for_rel relid %d return %d", RelationGetRelid(rel->localrel), (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn)));
+
 			return (rel->state == SUBREL_STATE_READY ||
 					(rel->state == SUBREL_STATE_SYNCDONE &&
 					 rel->statelsn <= remote_final_lsn));
@@ -2398,20 +2402,27 @@ apply_handle_insert(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	elog(LOG, "apply_handle_insert");
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
 	 */
 	if (is_skipping_changes() ||
 		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+	{
+		elog(LOG, "apply_handle_insert / skipping changes or streaming");
 		return;
+	}
 
 	begin_replication_step();
 
 	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+	elog(LOG, "apply_handle_insert relid %d state %c lsn %X/%X final %X/%X", RelationGetRelid(rel->localrel), rel->state, LSN_FORMAT_ARGS(rel->statelsn), LSN_FORMAT_ARGS(remote_final_lsn));
 	if (!should_apply_changes_for_rel(rel))
 	{
+		elog(LOG, "apply_handle_insert relid %d skipping", RelationGetRelid(rel->localrel));
 		/*
 		 * The relation can't become interesting in the middle of the
 		 * transaction so it's safe to unlock it.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e8add5ee5d9..09f53bea0a0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1409,9 +1409,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
-
+elog(LOG, "pgoutput_change relid %d LSN %X/%X", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 	if (!is_publishable_relation(relation))
+	{
+		elog(LOG, "pgoutput_change relid %d LSN %X/%X not publishable", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 		return;
+	}
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1429,7 +1432,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				elog(LOG, "pgoutput_change relid %d LSN %X/%X pubinsert=false", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
@@ -1502,7 +1508,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * of the row filter for old and new tuple.
 	 */
 	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+	{
+		elog(LOG, "pgoutput_change relid %d LSN %X/%X pgoutput_row_filter", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 		goto cleanup;
+	}
 
 	/*
 	 * Send BEGIN if we haven't yet.
@@ -1522,10 +1531,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	OutputPluginPrepareWrite(ctx, true);
 
+	elog(LOG, "pgoutput_change relid %d LSN %X/%X sending data", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
+
 	/* Send the data */
 	switch (action)
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
+			elog(LOG, "pgoutput_change relid %d LSN %X/%X write insert message", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
 									data->binary, relentry->columns);
 			break;
@@ -1974,6 +1986,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	/* initialize entry, if it's new */
 	if (!found)
 	{
+		elog(LOG, "get_rel_sync_entry relation %d not found", RelationGetRelid(relation));
 		entry->replicate_valid = false;
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
@@ -1988,6 +2001,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
+	elog(LOG, "get_rel_sync_entry relation %d replicate_valid %d", RelationGetRelid(relation), entry->replicate_valid);
+
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
@@ -2007,6 +2022,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
 
+		elog(LOG, "GetRelationPublications relation %d publications %p %d", relid, pubids, list_length(pubids));
+
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
 		{
@@ -2021,6 +2038,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			publications_valid = true;
 		}
 
+		elog(LOG, "get_rel_sync_entry relation %d publications %d", RelationGetRelid(relation), list_length(data->publications));
+
 		/*
 		 * Reset schema_sent status as the relation definition may have
 		 * changed.  Also reset pubactions to empty in case rel was dropped
@@ -2097,6 +2116,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				}
 			}
 
+			elog(LOG, "get_rel_sync_entry relation %d publish %d (A)", RelationGetRelid(relation), publish);
+
 			if (!publish)
 			{
 				bool		ancestor_published = false;
@@ -2128,12 +2149,16 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 					}
 				}
 
+				elog(LOG, "get_rel_sync_entry relation %d relation pubs %d subscription pub %d", RelationGetRelid(relation), list_length(pubids), pub->oid);
+
 				if (list_member_oid(pubids, pub->oid) ||
 					list_member_oid(schemaPubids, pub->oid) ||
 					ancestor_published)
 					publish = true;
 			}
 
+			elog(LOG, "get_rel_sync_entry relation %d publish %d (B)", RelationGetRelid(relation), publish);
+
 			/*
 			 * If the relation is to be published, determine actions to
 			 * publish, and list of columns, if appropriate.
@@ -2210,6 +2235,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->replicate_valid = true;
 	}
 
+	elog(LOG, "get_rel_sync_entry relation %d pubinsert %d", RelationGetRelid(relation), entry->pubactions.pubinsert);
+
 	return entry;
 }
 
-- 
2.41.0

