Bugs in pgoutput.c

Started by Tom Lane about 4 years ago, 10 messages
#1Tom Lane
tgl@sss.pgh.pa.us
1 attachment(s)

Commit 6ce16088b caused me to look at pgoutput.c's handling of
cache invalidations, and I was pretty appalled by what I found.

* rel_sync_cache_relation_cb does the wrong thing when called for
a cache flush (i.e., relid == 0). Instead of invalidating all
RelationSyncCache entries as it should, it will do nothing.

* When rel_sync_cache_relation_cb does invalidate an entry,
it immediately zaps the entry->map structure, even though that
might still be in use (as per the adjacent comment that carefully
explains why this isn't safe). I'm not sure if this could lead
to a dangling-pointer core dump, but it sure seems like it could
lead to failing to translate tuples that are about to be sent.

* Similarly, rel_sync_cache_publication_cb is way too eager to
reset the pubactions flags, which would likely lead to failing
to transmit changes that we should transmit.
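
As an illustration of the first two points, here is a minimal, self-contained sketch of the "mark invalid now, rebuild later" pattern. It is not the pgoutput.c code: DemoEntry, demo_cache, demo_invalidate_cb and demo_get_entry are hypothetical stand-ins, a fixed array replaces the dynahash table, and a malloc'd int stands in for entry->map. The callback only clears the validity flag (for every entry when relid is 0) and leaves the substructure alone; the lookup function is the only place that frees and rebuilds.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins; these are not the real pgoutput.c structures. */
#define CACHE_SIZE 4

typedef struct DemoEntry
{
	unsigned int relid;		/* 0 means the slot is unused */
	bool		valid;		/* plays the role of replicate_valid */
	int		   *map;		/* plays the role of entry->map */
	int			rebuilds;	/* how many times the entry was rebuilt */
} DemoEntry;

/* Static storage is zero-initialized, so every slot starts unused. */
static DemoEntry demo_cache[CACHE_SIZE];

/* Linear search; passing 0 finds the first unused slot. */
static DemoEntry *
demo_find(unsigned int relid)
{
	for (size_t i = 0; i < CACHE_SIZE; i++)
	{
		if (demo_cache[i].relid == relid)
			return &demo_cache[i];
	}
	return NULL;
}

/*
 * Invalidation callback: only clears the flag.  It must not free
 * entry->map, because a caller may still be using it.
 */
static void
demo_invalidate_cb(unsigned int relid)
{
	if (relid != 0)
	{
		/* Not finding the relation is normal; nothing to do then. */
		DemoEntry  *entry = demo_find(relid);

		if (entry != NULL)
			entry->valid = false;
	}
	else
	{
		/* relid == 0 means "flush everything". */
		for (size_t i = 0; i < CACHE_SIZE; i++)
			demo_cache[i].valid = false;
	}
}

/* Lookup: the only place where stale substructure is freed and rebuilt. */
static DemoEntry *
demo_get_entry(unsigned int relid)
{
	DemoEntry  *entry;

	assert(relid != 0);
	entry = demo_find(relid);
	if (entry == NULL)
	{
		entry = demo_find(0);
		if (entry == NULL)
			return NULL;		/* cache full */
		entry->relid = relid;
		entry->valid = false;
		entry->map = NULL;
		entry->rebuilds = 0;
	}

	if (!entry->valid)
	{
		free(entry->map);		/* safe here: no caller holds the old map */
		entry->map = malloc(sizeof(int));
		if (entry->map == NULL)
			return NULL;
		*entry->map = (int) relid;
		entry->rebuilds++;
		entry->valid = true;
	}
	return entry;
}
```

With this shape, a caller that fetched an entry and then triggers demo_invalidate_cb() part-way through its work still sees an intact map; the cost of the rebuild is paid on the next demo_get_entry() call.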

The attached patch fixes these things, but I'm still pretty
unhappy with the general design of the data structures in
pgoutput.c, because there is this weird random mishmash of
static variables along with a palloc'd PGOutputData struct.
This cannot work if there are ever two active LogicalDecodingContexts
in the same process. I don't think serial use of LogicalDecodingContexts
(ie, destroy one and then make another) works very well either,
because pgoutput_shutdown is a mere fig leaf that ignores all the
junk the module previously made (in CacheMemoryContext no less).
So I wonder whether either of those scenarios is possible/supported/
expected to be needed in future.

Also ... maybe I'm not looking in the right place, but I do not
see anything anywhere in logical decoding that is taking any lock
on the relation being processed. How can that be safe? In
particular, how do we know that the data collected by get_rel_sync_entry
isn't already stale by the time we return from the function?
Setting replicate_valid = true at the bottom of the function would
overwrite any notification we might have gotten from a syscache callback
while reading catalog data.
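
The overwrite concern in the last paragraph can be shown with a small, self-contained model. Everything here is hypothetical (RaceEntry, demo_read_catalog, demo_concurrent_ddl and the one-shot hook are invented for illustration), and the recheck loop is just one conventional way to avoid losing a notification, not something the attached patch does. The naive rebuild sets the flag after reading, so an invalidation delivered during the read is lost; the recheck variant sets the flag first and retries if it was cleared in the meantime.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model; none of these names exist in PostgreSQL. */
typedef struct RaceEntry
{
	bool		valid;			/* plays the role of replicate_valid */
	int			version_seen;	/* catalog version the entry was built from */
	int			attempts;		/* number of catalog reads performed */
} RaceEntry;

static int	demo_catalog_version = 1;

/* One-shot hook simulating an invalidation that arrives during a read. */
static void (*demo_read_hook) (RaceEntry *) = NULL;

/* Simulated concurrent DDL: changes the catalog and invalidates the entry. */
static void
demo_concurrent_ddl(RaceEntry *entry)
{
	demo_catalog_version++;
	entry->valid = false;
}

/* Reads the "catalog"; the hook, if armed, fires once after the read. */
static int
demo_read_catalog(RaceEntry *entry)
{
	int			seen = demo_catalog_version;

	if (demo_read_hook != NULL)
	{
		void		(*hook) (RaceEntry *) = demo_read_hook;

		demo_read_hook = NULL;
		hook(entry);
	}
	return seen;
}

/* Sets the flag last, so it overwrites any invalidation seen meanwhile. */
static void
demo_rebuild_naive(RaceEntry *entry)
{
	entry->version_seen = demo_read_catalog(entry);
	entry->attempts++;
	entry->valid = true;
}

/* Sets the flag first and retries if a callback cleared it during the read. */
static void
demo_rebuild_recheck(RaceEntry *entry)
{
	do
	{
		entry->valid = true;
		entry->version_seen = demo_read_catalog(entry);
		entry->attempts++;
	} while (!entry->valid);
}
```

In the naive variant the entry ends up marked valid while holding data from before the simulated DDL; in the recheck variant the second pass picks up the new version.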

regards, tom lane

Attachments:

clean-up-pgoutput-cache-invalidation.patch (text/x-diff; charset=us-ascii)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index a08da859b4..f7e991cd16 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -108,11 +108,13 @@ typedef struct RelationSyncEntry
 {
 	Oid			relid;			/* relation oid */
 
+	bool		replicate_valid;	/* overall validity flag for entry */
+
 	bool		schema_sent;
 	List	   *streamed_txns;	/* streamed toplevel transactions with this
 								 * schema */
 
-	bool		replicate_valid;
+	/* are we publishing this rel? */
 	PublicationActions pubactions;
 
 	/*
@@ -903,7 +905,9 @@ LoadPublications(List *pubnames)
 }
 
 /*
- * Publication cache invalidation callback.
+ * Publication syscache invalidation callback.
+ *
+ * Called for invalidations on pg_publication.
  */
 static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -1130,13 +1134,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 											  HASH_ENTER, &found);
 	Assert(entry != NULL);
 
-	/* Not found means schema wasn't sent */
+	/* initialize entry, if it's new */
 	if (!found)
 	{
-		/* immediately make a new entry valid enough to satisfy callbacks */
+		entry->replicate_valid = false;
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
-		entry->replicate_valid = false;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->publish_as_relid = InvalidOid;
@@ -1166,13 +1169,40 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 			if (data->publications)
+			{
 				list_free_deep(data->publications);
-
+				data->publications = NIL;
+			}
 			data->publications = LoadPublications(data->publication_names);
 			MemoryContextSwitchTo(oldctx);
 			publications_valid = true;
 		}
 
+		/*
+		 * Reset schema_sent status as the relation definition may have
+		 * changed.  Also reset pubactions to empty in case rel was dropped
+		 * from a publication.  Also free any objects that depended on the
+		 * earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+		entry->pubactions.pubinsert = false;
+		entry->pubactions.pubupdate = false;
+		entry->pubactions.pubdelete = false;
+		entry->pubactions.pubtruncate = false;
+		if (entry->map)
+		{
+			/*
+			 * Must free the TupleDescs contained in the map explicitly,
+			 * because free_conversion_map() doesn't.
+			 */
+			FreeTupleDesc(entry->map->indesc);
+			FreeTupleDesc(entry->map->outdesc);
+			free_conversion_map(entry->map);
+		}
+		entry->map = NULL;
+
 		/*
 		 * Build publication cache. We can't use one provided by relcache as
 		 * relcache considers all publications given relation is in, but here
@@ -1212,16 +1242,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 					foreach(lc2, ancestors)
 					{
 						Oid			ancestor = lfirst_oid(lc2);
+						List	   *apubids = GetRelationPublications(ancestor);
+						List	   *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
 
-						if (list_member_oid(GetRelationPublications(ancestor),
-											pub->oid) ||
-							list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)),
-											pub->oid))
+						if (list_member_oid(apubids, pub->oid) ||
+							list_member_oid(aschemaPubids, pub->oid))
 						{
 							ancestor_published = true;
 							if (pub->pubviaroot)
 								publish_as_relid = ancestor;
 						}
+						list_free(apubids);
+						list_free(aschemaPubids);
 					}
 				}
 
@@ -1251,6 +1283,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		}
 
 		list_free(pubids);
+		list_free(schemaPubids);
 
 		entry->publish_as_relid = publish_as_relid;
 		entry->replicate_valid = true;
@@ -1322,43 +1355,40 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	/*
 	 * Nobody keeps pointers to entries in this hash table around outside
 	 * logical decoding callback calls - but invalidation events can come in
-	 * *during* a callback if we access the relcache in the callback. Because
-	 * of that we must mark the cache entry as invalid but not remove it from
-	 * the hash while it could still be referenced, then prune it at a later
-	 * safe point.
-	 *
-	 * Getting invalidations for relations that aren't in the table is
-	 * entirely normal, since there's no way to unregister for an invalidation
-	 * event. So we don't care if it's found or not.
+	 * *during* a callback if we do any syscache or table access in the
+	 * callback.  Because of that we must mark the cache entry as invalid but
+	 * not damage any of its substructure here.  The next get_rel_sync_entry()
+	 * call will rebuild it all.
 	 */
-	entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
-											  HASH_FIND, NULL);
-
-	/*
-	 * Reset schema sent status as the relation definition may have changed.
-	 * Also free any objects that depended on the earlier definition.
-	 */
-	if (entry != NULL)
+	if (OidIsValid(relid))
 	{
-		entry->schema_sent = false;
-		list_free(entry->streamed_txns);
-		entry->streamed_txns = NIL;
-		if (entry->map)
+		/*
+		 * Getting invalidations for relations that aren't in the table is
+		 * entirely normal.  So we don't care if it's found or not.
+		 */
+		entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
+												  HASH_FIND, NULL);
+		if (entry != NULL)
+			entry->replicate_valid = false;
+	}
+	else
+	{
+		/* Whole cache must be flushed. */
+		HASH_SEQ_STATUS status;
+
+		hash_seq_init(&status, RelationSyncCache);
+		while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 		{
-			/*
-			 * Must free the TupleDescs contained in the map explicitly,
-			 * because free_conversion_map() doesn't.
-			 */
-			FreeTupleDesc(entry->map->indesc);
-			FreeTupleDesc(entry->map->outdesc);
-			free_conversion_map(entry->map);
+			entry->replicate_valid = false;
 		}
-		entry->map = NULL;
 	}
 }
 
 /*
  * Publication relation/schema map syscache invalidation callback
+ *
+ * Called for invalidations on pg_publication, pg_publication_rel, and
+ * pg_publication_namespace.
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -1382,15 +1412,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 	{
 		entry->replicate_valid = false;
-
-		/*
-		 * There might be some relations dropped from the publication so we
-		 * don't need to publish the changes for them.
-		 */
-		entry->pubactions.pubinsert = false;
-		entry->pubactions.pubupdate = false;
-		entry->pubactions.pubdelete = false;
-		entry->pubactions.pubtruncate = false;
 	}
 }
 
#2Amit Kapila
amit.kapila16@gmail.com
In reply to: Tom Lane (#1)
Re: Bugs in pgoutput.c

On Thu, Jan 6, 2022 at 3:42 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Commit 6ce16088b caused me to look at pgoutput.c's handling of
cache invalidations, and I was pretty appalled by what I found.

* rel_sync_cache_relation_cb does the wrong thing when called for
a cache flush (i.e., relid == 0). Instead of invalidating all
RelationSyncCache entries as it should, it will do nothing.

* When rel_sync_cache_relation_cb does invalidate an entry,
it immediately zaps the entry->map structure, even though that
might still be in use (as per the adjacent comment that carefully
explains why this isn't safe). I'm not sure if this could lead
to a dangling-pointer core dump, but it sure seems like it could
lead to failing to translate tuples that are about to be sent.

* Similarly, rel_sync_cache_publication_cb is way too eager to
reset the pubactions flags, which would likely lead to failing
to transmit changes that we should transmit.

The attached patch fixes these things, but I'm still pretty
unhappy with the general design of the data structures in
pgoutput.c, because there is this weird random mishmash of
static variables along with a palloc'd PGOutputData struct.
This cannot work if there are ever two active LogicalDecodingContexts
in the same process. I don't think serial use of LogicalDecodingContexts
(ie, destroy one and then make another) works very well either,
because pgoutput_shutdown is a mere fig leaf that ignores all the
junk the module previously made (in CacheMemoryContext no less).
So I wonder whether either of those scenarios is possible/supported/
expected to be needed in future.

Also ... maybe I'm not looking in the right place, but I do not
see anything anywhere in logical decoding that is taking any lock
on the relation being processed. How can that be safe?

We don't need to acquire a lock on relation while decoding changes
from WAL because it uses a historic snapshot to build a relcache entry
and all the later changes to the rel are absorbed while decoding WAL.

It is important not to acquire a lock on user-defined relations during
decoding; otherwise it could lead to a deadlock, as explained in the
email [1].

* Would it be better if we move all the initialization done by patch
in get_rel_sync_entry() to a separate function as I expect future
patches might need to reset more things?

*
  * logical decoding callback calls - but invalidation events can come in
- * *during* a callback if we access the relcache in the callback. Because
- * of that we must mark the cache entry as invalid but not remove it from
- * the hash while it could still be referenced, then prune it at a later
- * safe point.
- *
- * Getting invalidations for relations that aren't in the table is
- * entirely normal, since there's no way to unregister for an invalidation
- * event. So we don't care if it's found or not.
+ * *during* a callback if we do any syscache or table access in the
+ * callback.

As we don't take locks on tables, can invalidation events be accepted
during table access? I could be missing something but I see relation.c
accepts invalidation messages only when lock mode is not 'NoLock'.

[1]: /messages/by-id/CAA4eK1Ks+p8wDbzhDr7yMYEWDbWFRJAd_uOY-moikc+zr9ER+g@mail.gmail.com

--
With Regards,
Amit Kapila.

#3Tom Lane
tgl@sss.pgh.pa.us
In reply to: Amit Kapila (#2)
Re: Bugs in pgoutput.c

Amit Kapila <amit.kapila16@gmail.com> writes:

On Thu, Jan 6, 2022 at 3:42 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Also ... maybe I'm not looking in the right place, but I do not
see anything anywhere in logical decoding that is taking any lock
on the relation being processed. How can that be safe?

We don't need to acquire a lock on relation while decoding changes
from WAL because it uses a historic snapshot to build a relcache entry
and all the later changes to the rel are absorbed while decoding WAL.

That might be okay for the system catalog entries, but I don't see
how it prevents some other session from dropping the table entirely,
thereby causing the on-disk storage to go away. Is it guaranteed
that logical decoding will never try to fetch any on-disk data?
(I can sort of believe that that might be true, but there are scary
corner cases for toasted data, such as an UPDATE that carries forward
a pre-existing toast datum.)

* Would it be better if we move all the initialization done by patch
in get_rel_sync_entry() to a separate function as I expect future
patches might need to reset more things?

Don't see that it helps particularly.

+ * *during* a callback if we do any syscache or table access in the
+ * callback.

As we don't take locks on tables, can invalidation events be accepted
during table access? I could be missing something but I see relation.c
accepts invalidation messages only when lock mode is not 'NoLock'.

The core point here is that you're assuming that NO code path taken
during logical decoding would try to take a lock. I don't believe it,
at least not unless you can point me to some debugging cross-check that
guarantees it.

Given that we're interested in historic not current snapshots, I can
buy that it might be workable to manage syscache invalidations totally
differently than the way it's done in normal processing, in which case
(*if* it's done like that) maybe no invals would need to be recognized
while an output plugin is executing. But (a) the comment here is
entirely wrong if that's so, and (b) I don't see anything in inval.c
that makes it work differently.

regards, tom lane

#4Robert Haas
robertmhaas@gmail.com
In reply to: Tom Lane (#3)
Re: Bugs in pgoutput.c

On Thu, Jan 6, 2022 at 2:58 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:

That might be okay for the system catalog entries, but I don't see
how it prevents some other session from dropping the table entirely,
thereby causing the on-disk storage to go away. Is it guaranteed
that logical decoding will never try to fetch any on-disk data?
(I can sort of believe that that might be true, but there are scary
corner cases for toasted data, such as an UPDATE that carries forward
a pre-existing toast datum.)

If I'm not mistaken, logical decoding is only allowed to read data
from system catalog tables or tables that are flagged as being "like
system catalog tables for logical decoding purposes only". See
RelationIsAccessibleInLogicalDecoding and
RelationIsUsedAsCatalogTable. As far as the actual table data is
concerned, it has to be reconstructed solely from the WAL.

I am not sure what locking is required here and am not taking a
position on that ... but it's definitely not the case that a logical
decoding plugin can decide to just read from any old table it likes.

--
Robert Haas
EDB: http://www.enterprisedb.com

#5Amit Kapila
amit.kapila16@gmail.com
In reply to: Tom Lane (#3)
Re: Bugs in pgoutput.c

On Fri, Jan 7, 2022 at 1:28 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Amit Kapila <amit.kapila16@gmail.com> writes:

+ * *during* a callback if we do any syscache or table access in the
+ * callback.

As we don't take locks on tables, can invalidation events be accepted
during table access? I could be missing something but I see relation.c
accepts invalidation messages only when lock mode is not 'NoLock'.

The core point here is that you're assuming that NO code path taken
during logical decoding would try to take a lock. I don't believe it,
at least not unless you can point me to some debugging cross-check that
guarantees it.

AFAIK, there is currently no such debugging cross-check in the locking
APIs, but we can add one to ensure that we don't acquire a lock on
user-defined tables during logical decoding. As pointed out by Robert, I
also don't think accessing user tables will work during logical
decoding.

Given that we're interested in historic not current snapshots, I can
buy that it might be workable to manage syscache invalidations totally
differently than the way it's done in normal processing, in which case
(*if* it's done like that) maybe no invals would need to be recognized
while an output plugin is executing. But (a) the comment here is
entirely wrong if that's so, and (b) I don't see anything in inval.c
that makes it work differently.

I think we need invalidations to work in output plugin to ensure that
the RelationSyncEntry has correct information.

--
With Regards,
Amit Kapila.

#6Amit Kapila
amit.kapila16@gmail.com
In reply to: Tom Lane (#1)
Re: Bugs in pgoutput.c

On Thu, Jan 6, 2022 at 3:42 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Commit 6ce16088b caused me to look at pgoutput.c's handling of
cache invalidations, and I was pretty appalled by what I found.

* rel_sync_cache_relation_cb does the wrong thing when called for
a cache flush (i.e., relid == 0). Instead of invalidating all
RelationSyncCache entries as it should, it will do nothing.

* When rel_sync_cache_relation_cb does invalidate an entry,
it immediately zaps the entry->map structure, even though that
might still be in use (as per the adjacent comment that carefully
explains why this isn't safe). I'm not sure if this could lead
to a dangling-pointer core dump, but it sure seems like it could
lead to failing to translate tuples that are about to be sent.

* Similarly, rel_sync_cache_publication_cb is way too eager to
reset the pubactions flags, which would likely lead to failing
to transmit changes that we should transmit.

Are you planning to proceed with this patch? AFAICS, this is good to
go. Yesterday, while debugging/analyzing one cfbot failure [1] with one
of my colleagues for the row filter patch [2], we saw the problem
due to the exact reason (the second one) you outlined here. After applying
your patch and adapting the row filter patch atop it, we see the problem
is fixed.

[1]: https://cirrus-ci.com/task/5450648090050560?logs=test_world#L3975
[2]: https://commitfest.postgresql.org/36/2906/

--
With Regards,
Amit Kapila.

#7Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#6)
Re: Bugs in pgoutput.c

On Sat, Jan 29, 2022 at 8:32 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Jan 6, 2022 at 3:42 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Commit 6ce16088b caused me to look at pgoutput.c's handling of
cache invalidations, and I was pretty appalled by what I found.

* rel_sync_cache_relation_cb does the wrong thing when called for
a cache flush (i.e., relid == 0). Instead of invalidating all
RelationSyncCache entries as it should, it will do nothing.

* When rel_sync_cache_relation_cb does invalidate an entry,
it immediately zaps the entry->map structure, even though that
might still be in use (as per the adjacent comment that carefully
explains why this isn't safe). I'm not sure if this could lead
to a dangling-pointer core dump, but it sure seems like it could
lead to failing to translate tuples that are about to be sent.

* Similarly, rel_sync_cache_publication_cb is way too eager to
reset the pubactions flags, which would likely lead to failing
to transmit changes that we should transmit.

Are you planning to proceed with this patch?

Tom, is it okay for you if I go ahead with this patch after some testing?

--
With Regards,
Amit Kapila.

#8Tom Lane
tgl@sss.pgh.pa.us
In reply to: Amit Kapila (#7)
Re: Bugs in pgoutput.c

Amit Kapila <amit.kapila16@gmail.com> writes:

Tom, is it okay for you if I go ahead with this patch after some testing?

I've been too busy to get back to it, so sure.

regards, tom lane

#9Amit Kapila
amit.kapila16@gmail.com
In reply to: Tom Lane (#8)
1 attachment(s)
Re: Bugs in pgoutput.c

On Thu, Feb 3, 2022 at 8:18 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Amit Kapila <amit.kapila16@gmail.com> writes:

Tom, is it okay for you if I go ahead with this patch after some testing?

I've been too busy to get back to it, so sure.

Thanks. I have tested the patch by generating an invalidation message
for table DDL before accessing the syscache in
logicalrep_write_tuple(). I see that it correctly invalidates the
entry and rebuilds it for the next operation. I couldn't come up with
some automatic test for it so used the debugger to test it. I have
made a minor change in one of the comments. I am planning to push this
tomorrow unless there are comments or suggestions.

--
With Regards,
Amit Kapila.

Attachments:

v2-0001-Improve-invalidation-handling-in-pgoutput.c.patch (application/octet-stream)
From 91860283e0a5f6bca9e242c5be3162b4361751ca Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Thu, 3 Feb 2022 14:55:37 +0530
Subject: [PATCH v2] Improve invalidation handling in pgoutput.c.

Fix the following issues in pgoutput.c:

* rel_sync_cache_relation_cb does the wrong thing when called for a cache
flush (i.e., relid == 0). Instead of invalidating all RelationSyncCache
entries as it should, it does nothing.

* When rel_sync_cache_relation_cb does invalidate an entry, it immediately
zaps the entry->map structure, even though that might still be in use. We
instead just mark the entry as invalid and rebuild it at a later safe
point.

* Similarly, rel_sync_cache_publication_cb is way too eager to reset the
pubactions flags, which would likely lead to failing to transmit changes
that we should transmit. In this case also, we just mark the entry as
invalid and rebuild it at a later safe point.

Author: Tom Lane
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/885288.1641420714@sss.pgh.pa.us
---
 src/backend/replication/pgoutput/pgoutput.c | 115 ++++++++++++--------
 1 file changed, 68 insertions(+), 47 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index af8d51aee9..6df705f90f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -108,11 +108,13 @@ typedef struct RelationSyncEntry
 {
 	Oid			relid;			/* relation oid */
 
+	bool		replicate_valid;	/* overall validity flag for entry */
+
 	bool		schema_sent;
 	List	   *streamed_txns;	/* streamed toplevel transactions with this
 								 * schema */
 
-	bool		replicate_valid;
+	/* are we publishing this rel? */
 	PublicationActions pubactions;
 
 	/*
@@ -903,7 +905,9 @@ LoadPublications(List *pubnames)
 }
 
 /*
- * Publication cache invalidation callback.
+ * Publication syscache invalidation callback.
+ *
+ * Called for invalidations on pg_publication.
  */
 static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -1130,13 +1134,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 											  HASH_ENTER, &found);
 	Assert(entry != NULL);
 
-	/* Not found means schema wasn't sent */
+	/* initialize entry, if it's new */
 	if (!found)
 	{
-		/* immediately make a new entry valid enough to satisfy callbacks */
+		entry->replicate_valid = false;
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
-		entry->replicate_valid = false;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->publish_as_relid = InvalidOid;
@@ -1166,13 +1169,40 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 			if (data->publications)
+			{
 				list_free_deep(data->publications);
-
+				data->publications = NIL;
+			}
 			data->publications = LoadPublications(data->publication_names);
 			MemoryContextSwitchTo(oldctx);
 			publications_valid = true;
 		}
 
+		/*
+		 * Reset schema_sent status as the relation definition may have
+		 * changed.  Also reset pubactions to empty in case rel was dropped
+		 * from a publication.  Also free any objects that depended on the
+		 * earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+		entry->pubactions.pubinsert = false;
+		entry->pubactions.pubupdate = false;
+		entry->pubactions.pubdelete = false;
+		entry->pubactions.pubtruncate = false;
+		if (entry->map)
+		{
+			/*
+			 * Must free the TupleDescs contained in the map explicitly,
+			 * because free_conversion_map() doesn't.
+			 */
+			FreeTupleDesc(entry->map->indesc);
+			FreeTupleDesc(entry->map->outdesc);
+			free_conversion_map(entry->map);
+		}
+		entry->map = NULL;
+
 		/*
 		 * Build publication cache. We can't use one provided by relcache as
 		 * relcache considers all publications given relation is in, but here
@@ -1212,16 +1242,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 					foreach(lc2, ancestors)
 					{
 						Oid			ancestor = lfirst_oid(lc2);
+						List	   *apubids = GetRelationPublications(ancestor);
+						List	   *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
 
-						if (list_member_oid(GetRelationPublications(ancestor),
-											pub->oid) ||
-							list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)),
-											pub->oid))
+						if (list_member_oid(apubids, pub->oid) ||
+							list_member_oid(aschemaPubids, pub->oid))
 						{
 							ancestor_published = true;
 							if (pub->pubviaroot)
 								publish_as_relid = ancestor;
 						}
+						list_free(apubids);
+						list_free(aschemaPubids);
 					}
 				}
 
@@ -1251,6 +1283,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		}
 
 		list_free(pubids);
+		list_free(schemaPubids);
 
 		entry->publish_as_relid = publish_as_relid;
 		entry->replicate_valid = true;
@@ -1322,43 +1355,40 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	/*
 	 * Nobody keeps pointers to entries in this hash table around outside
 	 * logical decoding callback calls - but invalidation events can come in
-	 * *during* a callback if we access the relcache in the callback. Because
-	 * of that we must mark the cache entry as invalid but not remove it from
-	 * the hash while it could still be referenced, then prune it at a later
-	 * safe point.
-	 *
-	 * Getting invalidations for relations that aren't in the table is
-	 * entirely normal, since there's no way to unregister for an invalidation
-	 * event. So we don't care if it's found or not.
+	 * *during* a callback if we do any syscache access in the callback.
+	 * Because of that we must mark the cache entry as invalid but not damage
+	 * any of its substructure here.  The next get_rel_sync_entry() call will
+	 * rebuild it all.
 	 */
-	entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
-											  HASH_FIND, NULL);
-
-	/*
-	 * Reset schema sent status as the relation definition may have changed.
-	 * Also free any objects that depended on the earlier definition.
-	 */
-	if (entry != NULL)
+	if (OidIsValid(relid))
 	{
-		entry->schema_sent = false;
-		list_free(entry->streamed_txns);
-		entry->streamed_txns = NIL;
-		if (entry->map)
+		/*
+		 * Getting invalidations for relations that aren't in the table is
+		 * entirely normal.  So we don't care if it's found or not.
+		 */
+		entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
+												  HASH_FIND, NULL);
+		if (entry != NULL)
+			entry->replicate_valid = false;
+	}
+	else
+	{
+		/* Whole cache must be flushed. */
+		HASH_SEQ_STATUS status;
+
+		hash_seq_init(&status, RelationSyncCache);
+		while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 		{
-			/*
-			 * Must free the TupleDescs contained in the map explicitly,
-			 * because free_conversion_map() doesn't.
-			 */
-			FreeTupleDesc(entry->map->indesc);
-			FreeTupleDesc(entry->map->outdesc);
-			free_conversion_map(entry->map);
+			entry->replicate_valid = false;
 		}
-		entry->map = NULL;
 	}
 }
 
 /*
  * Publication relation/schema map syscache invalidation callback
+ *
+ * Called for invalidations on pg_publication, pg_publication_rel, and
+ * pg_publication_namespace.
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -1382,15 +1412,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 	{
 		entry->replicate_valid = false;
-
-		/*
-		 * There might be some relations dropped from the publication so we
-		 * don't need to publish the changes for them.
-		 */
-		entry->pubactions.pubinsert = false;
-		entry->pubactions.pubupdate = false;
-		entry->pubactions.pubdelete = false;
-		entry->pubactions.pubtruncate = false;
 	}
 }
 
-- 
2.28.0.windows.1

#10Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#9)
Re: Bugs in pgoutput.c

On Thu, Feb 3, 2022 at 5:24 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Feb 3, 2022 at 8:18 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Amit Kapila <amit.kapila16@gmail.com> writes:

Tom, is it okay for you if I go ahead with this patch after some testing?

I've been too busy to get back to it, so sure.

Thanks. I have tested the patch by generating an invalidation message
for table DDL before accessing the syscache in
logicalrep_write_tuple(). I see that it correctly invalidates the
entry and rebuilds it for the next operation. I couldn't come up with
some automatic test for it so used the debugger to test it. I have
made a minor change in one of the comments. I am planning to push this
tomorrow unless there are comments or suggestions.

Pushed!

--
With Regards,
Amit Kapila.