diff --git a/src/backend/access/common/relation.c b/src/backend/access/common/relation.c
index d8a313a2c9..74b1415ad5 100644
--- a/src/backend/access/common/relation.c
+++ b/src/backend/access/common/relation.c
@@ -70,7 +70,10 @@ relation_open(Oid relationId, LOCKMODE lockmode)
 
 	/* Make note that we've accessed a temporary relation */
 	if (RelationUsesLocalBuffers(r))
+	{
 		MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPNAMESPACE;
+		on_commits_filter_add(relationId);
+	}
 
 	pgstat_init_relation(r);
 
@@ -120,7 +123,10 @@ try_relation_open(Oid relationId, LOCKMODE lockmode)
 
 	/* Make note that we've accessed a temporary relation */
 	if (RelationUsesLocalBuffers(r))
+	{
 		MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPNAMESPACE;
+		on_commits_filter_add(relationId);
+	}
 
 	pgstat_init_relation(r);
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d119ab909d..dc7bde98ab 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2122,6 +2122,9 @@ StartTransaction(void)
 	forceSyncCommit = false;
 	MyXactFlags = 0;
 
+	/* When starting a new transaction, init or reset the on-commit filter. */
+	on_commits_filter_init_or_reset();
+
 	/*
 	 * reinitialize within-transaction counters
 	 */
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index cd20ae76ba..77ad85576d 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -98,7 +98,10 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
 	 */
 	relpersistence = get_rel_persistence(relid);
 	if (relpersistence == RELPERSISTENCE_TEMP)
+	{
 		MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPNAMESPACE;
+		on_commits_filter_add(relid);
+	}
 
 	/* Check permissions. */
 	aclresult = LockTableAclCheck(relid, lockmode, GetUserId());
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index dbfe0d6b1c..e2282e6d54 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -68,6 +68,7 @@
 #include "executor/executor.h"
 #include "foreign/fdwapi.h"
 #include "foreign/foreign.h"
+#include "lib/bloomfilter.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
@@ -128,6 +129,12 @@ typedef struct OnCommitItem
 
 static List *on_commits = NIL;
 
+/*
+ * Bloom filter of the temporary tables opened in the current transaction.
+ */
+#define ON_COMMITS_FILTER_THRESHOLD 10	/* min on_commits length to keep a filter */
+#define ON_COMMITS_FILTER_BYTES (1 * 1024)	/* min bitset size requested */
+static bloom_filter *on_commits_filter = NULL;
 
 /*
  * State information for ALTER TABLE
@@ -662,6 +669,7 @@ static void ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab,
 								 AlterTableUtilityContext *context);
 static void ATExecMergePartitions(List **wqueue, AlteredTableInfo *tab, Relation rel,
 								  PartitionCmd *cmd, AlterTableUtilityContext *context);
+static bool on_commits_filter_contains(Oid relid);
 
 /* ----------------------------------------------------------------
  *		DefineRelation
@@ -17429,7 +17437,25 @@ PreCommit_on_commit_actions(void)
 				 * tables, as they must still be empty.
 				 */
 				if ((MyXactFlags & XACT_FLAGS_ACCESSEDTEMPNAMESPACE))
-					oids_to_truncate = lappend_oid(oids_to_truncate, oc->relid);
+				{
+					if (on_commits_filter_contains(oc->relid))
+						oids_to_truncate = lappend_oid(oids_to_truncate, oc->relid);
+#ifdef USE_ASSERT_CHECKING
+					else
+					{
+						/*
+						 * A temporary table that is not in the filter should
+						 * have either no physical file or an empty main fork.
+						 */
+						Relation	rel = table_open(oc->relid, AccessShareLock);
+
+						Assert(!rel->rd_smgr ||
+							   !smgrexists(rel->rd_smgr, MAIN_FORKNUM) ||
+							   smgrnblocks(rel->rd_smgr, MAIN_FORKNUM) == 0);
+						table_close(rel, AccessShareLock);
+					}
+#endif
+				}
 				break;
 			case ONCOMMIT_DROP:
 				oids_to_drop = lappend_oid(oids_to_drop, oc->relid);
@@ -20766,3 +20792,74 @@ ATExecMergePartitions(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	/* Keep the lock until commit. */
 	table_close(newPartRel, NoLock);
 }
+
+/*
+ * on_commits_filter_init_or_reset: Initialize or reset the bloom filter.
+ *
+ * The filter is only kept while at least ON_COMMITS_FILTER_THRESHOLD entries
+ * are present in on_commits; below that it is freed and left NULL.  This is
+ * called when starting a new transaction, that is, when the
+ * XACT_FLAGS_ACCESSEDTEMPNAMESPACE flag is cleared.
+ */
+void
+on_commits_filter_init_or_reset(void)
+{
+	if (list_length(on_commits) < ON_COMMITS_FILTER_THRESHOLD)
+	{
+		if (on_commits_filter != NULL)
+		{
+			bloom_free(on_commits_filter);
+			on_commits_filter = NULL;
+		}
+		return;
+	}
+
+	if (on_commits_filter != NULL)
+		bloom_clear(on_commits_filter);
+	else
+	{
+		MemoryContext oldcxt;
+		int64		total_items;
+
+		/* Allocate in CacheMemoryContext: later transactions reuse the filter */
+		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+		total_items = Max(ON_COMMITS_FILTER_THRESHOLD * 8, 128);
+		on_commits_filter =
+			bloom_create_ex(total_items, work_mem, 0, ON_COMMITS_FILTER_BYTES);
+		MemoryContextSwitchTo(oldcxt);
+	}
+}
+
+/*
+ * on_commits_filter_add: Add the temporary table OID to the bloom filter,
+ * recording that the current transaction has accessed this temporary table,
+ * so that it is considered for truncation at commit.  It is called when the
+ * temporary table is opened or locked.  Does nothing if there is no filter.
+ */
+void
+on_commits_filter_add(Oid relid)
+{
+	if (on_commits_filter != NULL)
+		bloom_add_element(on_commits_filter, (unsigned char *) &relid,
+						  sizeof(relid));
+}
+
+/*
+ * on_commits_filter_contains: Determine whether the specified temporary table
+ * may have been accessed in the current transaction.  A bloom filter can
+ * report false positives but never false negatives.
+ */
+static bool
+on_commits_filter_contains(Oid relid)
+{
+	/*
+	 * If the filter is not initialized, assume that the current transaction
+	 * has accessed the temp table, which keeps the behavior the same as when
+	 * there is no filter at all.
+	 */
+	if (on_commits_filter == NULL)
+		return true;
+
+	return !bloom_lacks_element(on_commits_filter, (unsigned char *) &relid,
+								sizeof(relid));
+}
diff --git a/src/backend/lib/bloomfilter.c b/src/backend/lib/bloomfilter.c
index 360d21ca45..d2e2513150 100644
--- a/src/backend/lib/bloomfilter.c
+++ b/src/backend/lib/bloomfilter.c
@@ -40,6 +40,7 @@
 #include "port/pg_bitutils.h"
 
 #define MAX_HASH_FUNCS		10
+#define MIN_BITSET_BYTES	(1024 * 1024)
 
 struct bloom_filter
 {
@@ -85,6 +86,21 @@ static inline uint32 mod_m(uint32 val, uint64 m);
  */
 bloom_filter *
 bloom_create(int64 total_elems, int bloom_work_mem, uint64 seed)
+{
+	return bloom_create_ex(total_elems, bloom_work_mem, seed, MIN_BITSET_BYTES);
+}
+
+/*
+ * Like bloom_create(), but the caller supplies min_bitset_bytes, intended as
+ * the lower bound on the bitset size, instead of the default
+ * MIN_BITSET_BYTES.
+ *
+ * NOTE(review): verify that the body below really honors min_bitset_bytes
+ * rather than a hard-coded minimum.
+ */
+bloom_filter *
+bloom_create_ex(int64 total_elems, int bloom_work_mem, uint64 seed,
+				uint64 min_bitset_bytes)
 {
 	bloom_filter *filter;
 	int			bloom_power;
@@ -292,3 +308,12 @@ mod_m(uint32 val, uint64 m)
 
 	return val & (m - 1);
 }
+
+/*
+ * Reset the filter to empty by zeroing its bitset, so that it can be reused.
+ */
+void
+bloom_clear(bloom_filter *filter)
+{
+	memset(filter->bitset, 0, filter->m / BITS_PER_BYTE);
+}
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 85cbad3d0c..461bc1ea47 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -107,4 +107,7 @@ extern void RangeVarCallbackOwnsRelation(const RangeVar *relation,
 extern bool PartConstraintImpliedByRelConstraint(Relation scanrel,
 												 List *partConstraint);
 
+extern void on_commits_filter_init_or_reset(void);
+extern void on_commits_filter_add(Oid relid);
+
 #endif							/* TABLECMDS_H */
diff --git a/src/include/lib/bloomfilter.h b/src/include/lib/bloomfilter.h
index 6ec7173843..13a0670e92 100644
--- a/src/include/lib/bloomfilter.h
+++ b/src/include/lib/bloomfilter.h
@@ -17,11 +17,14 @@ typedef struct bloom_filter bloom_filter;
 
 extern bloom_filter *bloom_create(int64 total_elems, int bloom_work_mem,
 								  uint64 seed);
+extern bloom_filter *bloom_create_ex(int64 total_elems, int bloom_work_mem,
+									 uint64 seed, uint64 min_bitset_bytes);
 extern void bloom_free(bloom_filter *filter);
 extern void bloom_add_element(bloom_filter *filter, unsigned char *elem,
 							  size_t len);
 extern bool bloom_lacks_element(bloom_filter *filter, unsigned char *elem,
 								size_t len);
 extern double bloom_prop_bits_set(bloom_filter *filter);
+extern void bloom_clear(bloom_filter *filter);
 
 #endif							/* BLOOMFILTER_H */