From af85ead5481d844efe3ccd01e8c13ff4ad63cf85 Mon Sep 17 00:00:00 2001 From: "shiy.fnst" Date: Wed, 8 Jun 2022 11:10:21 +0800 Subject: [PATCH v1 1/2] Fix partition map cache issues. 1. Fix the bad structure in logicalrep_partmap_invalidate_cb(). 2. Check whether the entry is valid in logicalrep_partition_open(). 3. Update partition map cache when the publisher send new relation mapping. Author: Shi yu, Hou Zhijie --- src/backend/replication/logical/relation.c | 119 ++++++++++++++------- 1 file changed, 81 insertions(+), 38 deletions(-) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 80fb561a9a..9cc94067d5 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -147,6 +147,66 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) pfree(entry->attrmap); } +/* + * Update remote relation information in the relation map entry. + */ +static void +logicalrep_update_remoterel(LogicalRepRelMapEntry *entry, + LogicalRepRelation *remoterel) +{ + int i; + + entry->remoterel.remoteid = remoterel->remoteid; + entry->remoterel.nspname = pstrdup(remoterel->nspname); + entry->remoterel.relname = pstrdup(remoterel->relname); + entry->remoterel.natts = remoterel->natts; + entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); + entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); + for (i = 0; i < remoterel->natts; i++) + { + entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); + entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + } + entry->remoterel.replident = remoterel->replident; + entry->remoterel.attkeys = bms_copy(remoterel->attkeys); +} + +/* + * Invalidate the existing entry in the partition map. + * + * Called when new relation mapping is sent by the publisher to update our + * expected view of incoming data from said publisher. + */ +static void +logicalrep_partmap_invalidate(LogicalRepRelation *remoterel) +{ + MemoryContext oldctx; + HASH_SEQ_STATUS status; + LogicalRepPartMapEntry *part_entry; + LogicalRepRelMapEntry *entry; + + if (LogicalRepPartMap == NULL) + return; + + hash_seq_init(&status, LogicalRepPartMap); + while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) + { + entry = &part_entry->relmapentry; + + if (entry->remoterel.remoteid != remoterel->remoteid) + continue; + + logicalrep_relmap_free_entry(entry); + + memset(entry, 0, sizeof(LogicalRepRelMapEntry)); + + /* Make cached copy of the data */ + oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext); + logicalrep_update_remoterel(entry, remoterel); + MemoryContextSwitchTo(oldctx); + } +} + /* * Add new entry or update existing entry in the relation map cache. * @@ -159,7 +219,6 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) MemoryContext oldctx; LogicalRepRelMapEntry *entry; bool found; - int i; if (LogicalRepRelMap == NULL) logicalrep_relmap_init(); @@ -177,20 +236,11 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) /* Make cached copy of the data */ oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); - entry->remoterel.remoteid = remoterel->remoteid; - entry->remoterel.nspname = pstrdup(remoterel->nspname); - entry->remoterel.relname = pstrdup(remoterel->relname); - entry->remoterel.natts = remoterel->natts; - entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); - entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); - for (i = 0; i < remoterel->natts; i++) - { - entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); - entry->remoterel.atttyps[i] = remoterel->atttyps[i]; - } - entry->remoterel.replident = remoterel->replident; - entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + logicalrep_update_remoterel(entry, remoterel); MemoryContextSwitchTo(oldctx); + + /* Invalidate the corresponding partition map as well */ + logicalrep_partmap_invalidate(remoterel); } /* @@ -451,7 +501,7 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) static void logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) { - LogicalRepRelMapEntry *entry; + LogicalRepPartMapEntry *entry; /* Just to be sure. */ if (LogicalRepPartMap == NULL) @@ -464,11 +514,11 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepPartMap); /* TODO, use inverse lookup hashtable? */ - while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) { - if (entry->localreloid == reloid) + if (entry->relmapentry.localreloid == reloid) { - entry->localrelvalid = false; + entry->relmapentry.localrelvalid = false; hash_seq_term(&status); break; } @@ -481,8 +531,8 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepPartMap); - while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) - entry->localrelvalid = false; + while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) + entry->relmapentry.localrelvalid = false; } } @@ -545,31 +595,24 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, (void *) &partOid, HASH_ENTER, &found); - if (found) - return &part_entry->relmapentry; + entry = &part_entry->relmapentry; - memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); + if (found && entry->localrelvalid) + return entry; /* Switch to longer-lived context. */ oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext); - part_entry->partoid = partOid; - - /* Remote relation is copied as-is from the root entry. */ - entry = &part_entry->relmapentry; - entry->remoterel.remoteid = remoterel->remoteid; - entry->remoterel.nspname = pstrdup(remoterel->nspname); - entry->remoterel.relname = pstrdup(remoterel->relname); - entry->remoterel.natts = remoterel->natts; - entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); - entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); - for (i = 0; i < remoterel->natts; i++) + if (!found) { - entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); - entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); + + part_entry->partoid = partOid; + + /* Remote relation is copied as-is from the root entry. */ + logicalrep_update_remoterel(entry, remoterel); + } - entry->remoterel.replident = remoterel->replident; - entry->remoterel.attkeys = bms_copy(remoterel->attkeys); entry->localrel = partrel; entry->localreloid = partOid; -- 2.18.4