From 0f066cefc8520af93ece7ea10618d037005af5dc Mon Sep 17 00:00:00 2001 From: "shiy.fnst" Date: Wed, 8 Jun 2022 11:10:21 +0800 Subject: [PATCH v2 1/2] Fix partition map cache issues. 1. Fix the bad structure in logicalrep_partmap_invalidate_cb(). 2. Check whether the entry is valid in logicalrep_partition_open(). 3. Update partition map cache when the publisher send new relation mapping. Author: Shi yu, Hou Zhijie --- src/backend/replication/logical/relation.c | 120 ++++++++++++++------- src/test/subscription/t/013_partition.pl | 48 ++++++++- 2 files changed, 125 insertions(+), 43 deletions(-) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 80fb561a9a..85712fb0f4 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -147,6 +147,66 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) pfree(entry->attrmap); } +/* + * Update remote relation information in the relation map entry. + */ +static void +logicalrep_update_remoterel(LogicalRepRelMapEntry *entry, + LogicalRepRelation *remoterel) +{ + int i; + + entry->remoterel.remoteid = remoterel->remoteid; + entry->remoterel.nspname = pstrdup(remoterel->nspname); + entry->remoterel.relname = pstrdup(remoterel->relname); + entry->remoterel.natts = remoterel->natts; + entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); + entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); + for (i = 0; i < remoterel->natts; i++) + { + entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); + entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + } + entry->remoterel.replident = remoterel->replident; + entry->remoterel.attkeys = bms_copy(remoterel->attkeys); +} + +/* + * Invalidate the existing entry in the partition map. + * + * Called when new relation mapping is sent by the publisher to update our + * expected view of incoming data from said publisher. + */ +static void +logicalrep_partmap_invalidate(LogicalRepRelation *remoterel) +{ + MemoryContext oldctx; + HASH_SEQ_STATUS status; + LogicalRepPartMapEntry *part_entry; + LogicalRepRelMapEntry *entry; + + if (LogicalRepPartMap == NULL) + return; + + hash_seq_init(&status, LogicalRepPartMap); + while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) + { + entry = &part_entry->relmapentry; + + if (entry->remoterel.remoteid != remoterel->remoteid) + continue; + + logicalrep_relmap_free_entry(entry); + + memset(entry, 0, sizeof(LogicalRepRelMapEntry)); + + /* Make cached copy of the data */ + oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext); + logicalrep_update_remoterel(entry, remoterel); + MemoryContextSwitchTo(oldctx); + } +} + /* * Add new entry or update existing entry in the relation map cache. * @@ -159,7 +219,6 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) MemoryContext oldctx; LogicalRepRelMapEntry *entry; bool found; - int i; if (LogicalRepRelMap == NULL) logicalrep_relmap_init(); @@ -177,20 +236,11 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) /* Make cached copy of the data */ oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); - entry->remoterel.remoteid = remoterel->remoteid; - entry->remoterel.nspname = pstrdup(remoterel->nspname); - entry->remoterel.relname = pstrdup(remoterel->relname); - entry->remoterel.natts = remoterel->natts; - entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); - entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); - for (i = 0; i < remoterel->natts; i++) - { - entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); - entry->remoterel.atttyps[i] = remoterel->atttyps[i]; - } - entry->remoterel.replident = remoterel->replident; - entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + logicalrep_update_remoterel(entry, remoterel); MemoryContextSwitchTo(oldctx); + + /* Invalidate the corresponding partition map as well */ + logicalrep_partmap_invalidate(remoterel); } /* @@ -451,7 +501,7 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) static void logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) { - LogicalRepRelMapEntry *entry; + LogicalRepPartMapEntry *entry; /* Just to be sure. */ if (LogicalRepPartMap == NULL) @@ -464,11 +514,11 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepPartMap); /* TODO, use inverse lookup hashtable? */ - while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) { - if (entry->localreloid == reloid) + if (entry->relmapentry.localreloid == reloid) { - entry->localrelvalid = false; + entry->relmapentry.localrelvalid = false; hash_seq_term(&status); break; } @@ -481,8 +531,8 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepPartMap); - while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) - entry->localrelvalid = false; + while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) + entry->relmapentry.localrelvalid = false; } } @@ -534,7 +584,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, Oid partOid = RelationGetRelid(partrel); AttrMap *attrmap = root->attrmap; bool found; - int i; MemoryContext oldctx; if (LogicalRepPartMap == NULL) @@ -545,31 +594,24 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, (void *) &partOid, HASH_ENTER, &found); - if (found) - return &part_entry->relmapentry; + entry = &part_entry->relmapentry; - memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); + if (found && entry->localrelvalid) + return entry; /* Switch to longer-lived context. */ oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext); - part_entry->partoid = partOid; - - /* Remote relation is copied as-is from the root entry. */ - entry = &part_entry->relmapentry; - entry->remoterel.remoteid = remoterel->remoteid; - entry->remoterel.nspname = pstrdup(remoterel->nspname); - entry->remoterel.relname = pstrdup(remoterel->relname); - entry->remoterel.natts = remoterel->natts; - entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); - entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); - for (i = 0; i < remoterel->natts; i++) + if (!found) { - entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); - entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); + + part_entry->partoid = partOid; + + /* Remote relation is copied as-is from the root entry. */ + logicalrep_update_remoterel(entry, remoterel); + } - entry->remoterel.replident = remoterel->replident; - entry->remoterel.attkeys = bms_copy(remoterel->attkeys); entry->localrel = partrel; entry->localreloid = partOid; diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index e7f4a94f19..b2183e0232 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -800,9 +800,49 @@ ok( $logfile =~ qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/, 'delete target row is missing in tab2_1'); -# No need for this until more tests are added. -# $node_subscriber1->append_conf('postgresql.conf', -# "log_min_messages = warning"); -# $node_subscriber1->reload; +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber1->reload; + +# Test the case that target table in subscriber is a partitioned table. + +$node_publisher->safe_psql( + 'postgres', q{ + CREATE TABLE tab5 (a int NOT NULL, b int); + CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a); + ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;}); + +$node_subscriber2->safe_psql( + 'postgres', q{ + CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a); + CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT; + CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a); + ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx; + ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;}); + +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); + +# Make cache. +$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)"); +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b FROM tab5 ORDER BY 1"); +is($result, qq(2|1), 'updates of tab5 replicated correctly'); + +# Alter table on publisher. +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab5 ADD COLUMN c INT"); + +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b, c FROM tab5 ORDER BY 1"); +is($result, qq(2|1|1), 'updates of tab5 replicated correctly after altering table on publisher'); done_testing(); -- 2.18.4