From e5fd69c0ec80cde40e3111fe580b8cb43ffabcda Mon Sep 17 00:00:00 2001 From: "shiy.fnst" Date: Wed, 8 Jun 2022 11:10:21 +0800 Subject: [PATCH] Fix partition map cache issues. 1. Fix the bad structure in logicalrep_partmap_invalidate_cb(). 2. Check whether the entry is valid in logicalrep_partition_open() and rebuild the entry if not. 3. Invalidate partition map cache when the publisher send new relation mapping. 4. Fix the column map build for dropped column in partition. Author: Shi yu, Hou Zhijie --- src/backend/replication/logical/relation.c | 120 +++++++++++++++++++---------- src/backend/replication/logical/worker.c | 6 ++ src/include/replication/logicalrelation.h | 1 + src/test/subscription/t/013_partition.pl | 71 ++++++++++++++++- 4 files changed, 154 insertions(+), 44 deletions(-) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 80fb561..34c2f53 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -148,6 +148,30 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) } /* + * Update remote relation information in the relation map entry. + */ +static void +logicalrep_update_remoterel(LogicalRepRelMapEntry *entry, + LogicalRepRelation *remoterel) +{ + int i; + + entry->remoterel.remoteid = remoterel->remoteid; + entry->remoterel.nspname = pstrdup(remoterel->nspname); + entry->remoterel.relname = pstrdup(remoterel->relname); + entry->remoterel.natts = remoterel->natts; + entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); + entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); + for (i = 0; i < remoterel->natts; i++) + { + entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); + entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + } + entry->remoterel.replident = remoterel->replident; + entry->remoterel.attkeys = bms_copy(remoterel->attkeys); +} + +/* * Add new entry or update existing entry in the relation map cache. * * Called when new relation mapping is sent by the publisher to update @@ -159,7 +183,6 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) MemoryContext oldctx; LogicalRepRelMapEntry *entry; bool found; - int i; if (LogicalRepRelMap == NULL) logicalrep_relmap_init(); @@ -177,19 +200,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) /* Make cached copy of the data */ oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); - entry->remoterel.remoteid = remoterel->remoteid; - entry->remoterel.nspname = pstrdup(remoterel->nspname); - entry->remoterel.relname = pstrdup(remoterel->relname); - entry->remoterel.natts = remoterel->natts; - entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); - entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); - for (i = 0; i < remoterel->natts; i++) - { - entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); - entry->remoterel.atttyps[i] = remoterel->atttyps[i]; - } - entry->remoterel.replident = remoterel->replident; - entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + logicalrep_update_remoterel(entry, remoterel); MemoryContextSwitchTo(oldctx); } @@ -451,7 +462,7 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) static void logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) { - LogicalRepRelMapEntry *entry; + LogicalRepPartMapEntry *entry; /* Just to be sure. */ if (LogicalRepPartMap == NULL) @@ -464,11 +475,11 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepPartMap); /* TODO, use inverse lookup hashtable? */ - while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) { - if (entry->localreloid == reloid) + if (entry->relmapentry.localreloid == reloid) { - entry->localrelvalid = false; + entry->relmapentry.localrelvalid = false; hash_seq_term(&status); break; } @@ -481,8 +492,42 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepPartMap); - while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) - entry->localrelvalid = false; + while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) + entry->relmapentry.localrelvalid = false; + } +} + +/* + * Invalidate the entries in the partition map that refer to remoterel + * + * Called when new relation mapping is sent by the publisher to update our + * expected view of incoming data from said publisher. + * + * Note that we don't update the remoterel information in the entry here, + * we will update the information in logicalrep_partition_open to save + * unnecessary work. + */ +void +logicalrep_partmap_invalidate(LogicalRepRelation *remoterel) +{ + HASH_SEQ_STATUS status; + LogicalRepPartMapEntry *part_entry; + LogicalRepRelMapEntry *entry; + + if (LogicalRepPartMap == NULL) + return; + + hash_seq_init(&status, LogicalRepPartMap); + while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) + { + entry = &part_entry->relmapentry; + + if (entry->remoterel.remoteid != remoterel->remoteid) + continue; + + logicalrep_relmap_free_entry(entry); + + memset(entry, 0, sizeof(LogicalRepRelMapEntry)); } } @@ -534,7 +579,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, Oid partOid = RelationGetRelid(partrel); AttrMap *attrmap = root->attrmap; bool found; - int i; MemoryContext oldctx; if (LogicalRepPartMap == NULL) @@ -545,31 +589,23 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, (void *) &partOid, HASH_ENTER, &found); - if (found) - return &part_entry->relmapentry; + entry = &part_entry->relmapentry; - memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); + if (found && entry->localrelvalid) + return entry; /* Switch to longer-lived context. */ oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext); - part_entry->partoid = partOid; - - /* Remote relation is copied as-is from the root entry. */ - entry = &part_entry->relmapentry; - entry->remoterel.remoteid = remoterel->remoteid; - entry->remoterel.nspname = pstrdup(remoterel->nspname); - entry->remoterel.relname = pstrdup(remoterel->relname); - entry->remoterel.natts = remoterel->natts; - entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); - entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); - for (i = 0; i < remoterel->natts; i++) + if (!found) { - entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); - entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); + part_entry->partoid = partOid; } - entry->remoterel.replident = remoterel->replident; - entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + + /* Remote relation is copied as-is from the root entry. */ + if (!entry->remoterel.remoteid) + logicalrep_update_remoterel(entry, remoterel); entry->localrel = partrel; entry->localreloid = partOid; @@ -594,7 +630,11 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, { AttrNumber root_attno = map->attnums[attno]; - entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1]; + /* 0 means it's a dropped attribute */ + if (root_attno == 0) + entry->attrmap->attnums[attno] = -1; + else + entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1]; } } else diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fc210a9..81ce2e9 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1562,6 +1562,12 @@ apply_handle_relation(StringInfo s) rel = logicalrep_read_rel(s); logicalrep_relmap_update(rel); + + /* + * Also invalidate all entries in the partition map that refer to + * remoterel. + */ + logicalrep_partmap_invalidate(rel); } /* diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 7bf8cd2..71a1fa7 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -38,6 +38,7 @@ typedef struct LogicalRepRelMapEntry } LogicalRepRelMapEntry; extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); +extern void logicalrep_partmap_invalidate(LogicalRepRelation *remoterel); extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode); diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index e7f4a94..daff675 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -800,9 +800,72 @@ ok( $logfile =~ qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/, 'delete target row is missing in tab2_1'); -# No need for this until more tests are added. -# $node_subscriber1->append_conf('postgresql.conf', -# "log_min_messages = warning"); -# $node_subscriber1->reload; +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber1->reload; + +# Test the case that target table on subscriber is a partitioned table and +# check that the changes are replicated correctly after changing the schema of +# table on publisher and subcriber. + +$node_publisher->safe_psql( + 'postgres', q{ + CREATE TABLE tab5 (a int NOT NULL, b int); + CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a); + ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;}); + +$node_subscriber2->safe_psql( + 'postgres', q{ + CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a); + CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT; + CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a); + ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx; + ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;}); + +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); + +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Make partition map cache +$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)"); +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b FROM tab5 ORDER BY 1"); +is($result, qq(2|1), 'updates of tab5 replicated correctly'); + +# Change the column order of table on publisher +$node_publisher->safe_psql( + 'postgres', q{ + ALTER TABLE tab5 DROP COLUMN b, ADD COLUMN c INT; + ALTER TABLE tab5 ADD COLUMN b INT;}); + +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b, c FROM tab5 ORDER BY 1"); +is($result, qq(2||1), 'updates of tab5 replicated correctly after altering table on publisher'); + +# Change the column order of partition on subscriber +$node_subscriber2->safe_psql( + 'postgres', q{ + ALTER TABLE tab5 DETACH PARTITION tab5_1; + ALTER TABLE tab5_1 DROP COLUMN b; + ALTER TABLE tab5_1 ADD COLUMN b int; + ALTER TABLE tab5 ATTACH PARTITION tab5_1 DEFAULT}); + +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b, c FROM tab5 ORDER BY 1"); +is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on subscriber'); done_testing(); -- 2.7.2.windows.1