Bloom filter Pushdown Optimization for Merge Join
Hello,
A bloom filter provides early filtering of rows that cannot be joined,
before those rows would reach the join operator; this optimization is
also called a semi join filter (SJF) pushdown. Such a filter can be
created when one child of the join operator must materialize its derived
table before the other child is evaluated.
For example, a bloom filter can be created using the join keys of
the build (inner) side of a hash join or of the outer side of a merge
join; the bloom filter can then be used to pre-filter rows on the
other side of the join operator during the scan of the base relation.
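For readers unfamiliar with the data structure, here is a minimal sketch of a bloom filter in C. This is not the patch's actual implementation; the two hash mixers and the 1024-bit sizing are illustrative only.

```c
#include <stdbool.h>
#include <stdint.h>

#define FILTER_BITS 1024			/* power of two, so masking works */

typedef struct
{
	uint64_t	bits[FILTER_BITS / 64];
} BloomFilter;

/* Two cheap integer mixers standing in for real hash functions. */
static uint64_t
hash1(uint64_t k)
{
	k ^= k >> 33;
	k *= UINT64_C(0xff51afd7ed558ccd);
	return k ^ (k >> 33);
}

static uint64_t
hash2(uint64_t k)
{
	k *= UINT64_C(0x9e3779b97f4a7c15);
	return k ^ (k >> 29);
}

static void
bloom_add(BloomFilter *f, uint64_t key)
{
	uint64_t	b1 = hash1(key) & (FILTER_BITS - 1);
	uint64_t	b2 = hash2(key) & (FILTER_BITS - 1);

	f->bits[b1 / 64] |= UINT64_C(1) << (b1 % 64);
	f->bits[b2 / 64] |= UINT64_C(1) << (b2 % 64);
}

/* false => key is definitely absent; true => key may be present */
static bool
bloom_maybe_contains(const BloomFilter *f, uint64_t key)
{
	uint64_t	b1 = hash1(key) & (FILTER_BITS - 1);
	uint64_t	b2 = hash2(key) & (FILTER_BITS - 1);

	return (f->bits[b1 / 64] & (UINT64_C(1) << (b1 % 64))) != 0 &&
		   (f->bits[b2 / 64] & (UINT64_C(1) << (b2 % 64))) != 0;
}
```

During the scan, a row whose join key fails bloom_maybe_contains() can be discarded immediately; a true result only means the row must still be passed to the join, since bloom filters allow false positives but never false negatives.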
The thread “Hash Joins vs. Bloom Filters / take 2” [1] is a good
discussion of using such an optimization for hash join, without going
into pushdown of the filter, which could further increase its
performance gain.
We worked on prototyping bloom filter pushdown for both hash join and
merge join. Attached is a patch set for bloom filter pushdown for
merge join. We also plan to send the patch for hash join once we have
it rebased.
Here is a summary of the patch set:
1. Bloom Filter Pushdown optimizes Merge Join by filtering rows early
during the table scan rather than at the join node.
-The bloom filter is pushed down along the execution tree to
the target SeqScan nodes.
-Experiments show that this optimization can speed up Merge
Join by up to 36%.
2. The planner makes the decision to use the bloom filter based on the
estimated filtering rate and the expected performance gain.
-The planner accomplishes this by estimating four numbers per
variable - the total number of rows of the relation, the number of
distinct values for a given variable, and the minimum and maximum
value of the variable (when applicable). Using these numbers, the
planner estimates a filtering rate of a potential filter.
        -Because building and probing the filter adds extra
operations, there is a minimum filtering rate below which the filter
is not worthwhile. Based on testing, we check whether the estimated
filtering rate is higher than 35%, and that informs our decision to
use a filter or not.
3. If using a bloom filter, the planner also adjusts the expected cost
of Merge Join based on expected performance gain.
4. Capability to build the bloom filter in parallel in case of
parallel SeqScan. This is done efficiently by populating a local bloom
filter for each parallel worker and then taking a bitwise OR over all
the local bloom filters to form a shared bloom filter at the end of
the parallel SeqScan.
5. The optimization is GUC controlled, with settings of
enable_mergejoin_semijoin_filter and force_mergejoin_semijoin_filter.
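As a sketch, the planner decision in item 2 reduces to a threshold comparison. The following hypothetical helper mirrors the constants used in the patch's final_cost_mergejoin() changes (a 35% estimated filtering rate and a floor of 1000 filtered rows):

```c
#include <stdbool.h>

/*
 * Decide whether a semijoin filter is worth building. The 0.35 rate
 * threshold and the 1000-row floor are the constants from the patch;
 * the function itself is an illustrative simplification.
 */
static bool
semijoin_filter_is_useful(double est_filtering_rate,
						  double est_rows_filtered,
						  bool force)
{
	if (force)
		return true;			/* force_mergejoin_semijoin_filter */
	return est_filtering_rate >= 0.35 && est_rows_filtered > 1000;
}
```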
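The parallel build in item 4 relies on a bloom filter being a plain bitmap: the bitwise OR of the workers' local filters is exactly the filter a single serial build would have produced. A simplified sketch of the merge step (the filter size is illustrative):

```c
#include <stdint.h>

#define FILTER_WORDS 16			/* illustrative size */

typedef struct
{
	uint64_t	bits[FILTER_WORDS];
} LocalBloomFilter;

/*
 * Fold one worker's local filter into the shared filter. In the
 * described scheme this runs once per worker at the end of the
 * parallel SeqScan.
 */
static void
bloom_filter_merge(LocalBloomFilter *shared, const LocalBloomFilter *local)
{
	for (int i = 0; i < FILTER_WORDS; i++)
		shared->bits[i] |= local->bits[i];
}
```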
We found in experiments that there is a significant improvement
when using the bloom filter during Merge Join. One experiment involved
joining two large tables of the same size while varying the
theoretical filtering rate (TFR) between the two tables; the TFR is
defined as the percentage of the two datasets that is disjoint. We
varied the TFR to observe the change in the filtering optimization.
For example, let’s imagine t0 has 10 million rows, which contain the
numbers 1 through 10 million randomly shuffled, while t1 has the
numbers 4 million through 14 million randomly shuffled. Then the TFR
for a join of these two tables is 40%, since 40% of each table is
disjoint from the other table (1 through 4 million for t0, 10 million
through 14 million for t1).
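Treating the key sets as half-open integer ranges, the TFR of this example can be computed directly. The helper below is hypothetical, and it simplifies 1..10M to [0, 10M) for exact arithmetic:

```c
/*
 * Theoretical filtering rate for two equal-sized, contiguous key ranges
 * [a_lo, a_hi) and [b_lo, b_hi): the fraction of each table that falls
 * outside the ranges' overlap.
 */
static double
theoretical_filtering_rate(long a_lo, long a_hi, long b_lo, long b_hi)
{
	long		lo = (a_lo > b_lo) ? a_lo : b_lo;
	long		hi = (a_hi < b_hi) ? a_hi : b_hi;
	long		overlap = (hi > lo) ? hi - lo : 0;

	return 1.0 - (double) overlap / (double) (a_hi - a_lo);
}
```

For t0 = [0, 10M) and t1 = [4M, 14M) the overlap is 6 million keys, giving a TFR of 0.4 as described above.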
Here are the performance test results for joining the two tables:
TFR: theoretical filtering rate
EFR: estimated filtering rate
AFR: actual filtering rate
HJ: hash join
MJ Default: default merge join
MJ Filter: merge join with bloom filter optimization enabled
MJ Filter Forced: merge join with bloom filter optimization forced
TFR    EFR      AFR     HJ     MJ Default   MJ Filter   MJ Filter Forced
-------------------------------------------------------------------------
10     33.46     7.41   6529   22638        21949       23160
20     37.27    14.85   6483   22290        21928       21930
30     41.32    22.25   6395   22374        20718       20794
40     45.67    29.70   6272   21969        19449       19410
50     50.41    37.10   6210   21412        18222       18224
60     55.64    44.51   6052   21108        17060       17018
70     61.59    51.98   5947   21020        15682       15737
80     68.64    59.36   5761   20812        14411       14437
90     77.83    66.86   5701   20585        13171       13200
Table. Execution Time (ms) vs Filtering Rate (%) for Joining Two
Tables of 10M Rows.
Attached you can find figures of the same performance test and a SQL script
to reproduce the performance test.
The first thing to notice is that Hash Join is generally the most
efficient join strategy. This is because Hash Join is better at
dealing with small tables, and our size of 10 million rows is still
small enough that Hash Join outperforms the other join strategies.
Future experiments can investigate using much larger tables.
However, comparing just within the different Merge Join variants, we
see that using the bloom filter greatly improves performance.
Intuitively, all of these execution times scale roughly linearly with
the filtering rate.
Comparing forced filtering versus default, we can see that the default
Merge Join outperforms Merge Join with filtering at low filter rates,
but after about 20% TFR, the Merge Join with filtering outperforms
default Merge Join. This makes intuitive sense, as there are some
fixed costs associated with building and probing the bloom
filter. In the worst case, at only 10% TFR, the bloom filter makes
Merge Join less than 5% slower. However, in the best case, at 90% TFR,
the bloom filter improves Merge Join by 36%.
Based on the results of the above experiments, we fitted a formula
for the performance ratio of using the filter pushdown as a function
of the filtering rate. Based on the numbers presented in the figure,
this is the equation:
T_filter / T_no_filter = 1 / (0.83 * estimated filtering rate + 0.863)
For example, this means that with an estimated filtering rate of 0.4,
the execution time of merge join is estimated to be improved by 16.3%.
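Numerically, the fitted equation can be checked like this (semijoin_cost_ratio is a hypothetical name; the coefficients are the fitted values from the equation above):

```c
/*
 * Fitted performance ratio from the experiments above:
 *     T_filter / T_no_filter = 1 / (0.83 * est_rate + 0.863)
 */
static double
semijoin_cost_ratio(double est_filtering_rate)
{
	return 1.0 / (0.83 * est_filtering_rate + 0.863);
}
```

At an estimated filtering rate of 0.4 the ratio is about 0.837, i.e. a 16.3% improvement; at a rate of about 0.165 the ratio crosses 1.0, consistent with the roughly 17% break-even point used to justify the 35% threshold.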
Note that the estimated filtering rate is used in the equation, not
the theoretical filtering rate or the actual filtering rate because it
is what we have during planning. In practice the estimated filtering
rate isn’t usually accurate. In fact, the estimated filtering rate can
differ from the theoretical filtering rate by as much as 17% in our
experiments. One way to mitigate the power loss of bloom filter caused
by inaccurate estimated filtering rate is to adaptively turn it off at
execution time, this is yet to be implemented.
Here is a list of tasks we plan to work on in order to improve this patch:
1. More regression testing to guarantee correctness.
2. More performance testing involving larger tables and complicated query plans.
3. Improve the cost model.
4. Explore runtime tuning such as making the bloom filter checking adaptive.
5. Currently, only the best single join key is used for building the
Bloom filter. However, if there are several keys and we know that
their distributions are somewhat disjoint, we could leverage this fact
and use multiple keys for the bloom filter.
6. Currently, Bloom filter pushdown is only implemented for SeqScan
nodes. However, it would be possible to allow push down to other types
of scan nodes.
7. Explore whether the Bloom filter could be pushed down through a
foreign scan when the foreign server is capable of handling it, which
could be made true for postgres_fdw.
8. Better EXPLAIN output on the usage of bloom filters.
This patch set is prepared by Marcus Ma, Lyu Pan and myself. Feedback
is appreciated.
With Regards,
Zheng Li
Amazon RDS/Aurora for PostgreSQL
[1]: /messages/by-id/c902844d-837f-5f63-ced3-9f7fd222f175@2ndquadrant.com
Attachments:
0001-Support-semijoin-filter-in-the-planner-optimizer.patch (application/octet-stream)
From 43c31315278890febb411eccd0652ab109215010 Mon Sep 17 00:00:00 2001
From: Lyu Pan <lyup@amazon.com>
Date: Thu, 15 Sep 2022 22:16:23 +0000
Subject: [PATCH 1/5] Support semijoin filter in the planner/optimizer.
1. Introduces two GUCs: enable_mergejoin_semijoin_filter and force_mergejoin_semijoin_filter. enable_mergejoin_semijoin_filter enables the use of a bloom filter during a merge join; if such a filter is available, the planner adjusts the merge join cost and uses the filter only if it meets a certain threshold (the estimated filtering rate has to be higher than a certain value).
force_mergejoin_semijoin_filter forces the use of a bloom filter during a merge join if a valid filter is available.
2. In this prototype, only a single join clause where both sides map to a base column will be considered as the key to build the bloom filter. For example:
A bloom filter may be used in this query:
SELECT * FROM a JOIN b ON a.col1 = b.col1;
A bloom filter will not be used in the following query (the left-hand side of the join clause is an expression):
SELECT * FROM a JOIN b ON a.col1 + a.col2 = b.col1;
3. In this prototype, the cost model is based on the assumption that there is a linear relationship between the performance gain from using a semijoin filter and the estimated filtering rate:
% improvement to Merge Join cost = 0.83 * estimated filtering rate - 0.137.
---
src/backend/optimizer/path/costsize.c | 1298 ++++++++++++++++-
src/backend/optimizer/plan/createplan.c | 611 ++++++++
src/backend/utils/adt/selfuncs.c | 24 +-
src/backend/utils/misc/guc_tables.c | 20 +
src/backend/utils/misc/postgresql.conf.sample | 2 +
src/include/nodes/pathnodes.h | 13 +
src/include/nodes/plannodes.h | 8 +
src/include/optimizer/cost.h | 47 +
src/include/utils/selfuncs.h | 4 +-
src/test/regress/expected/sysviews.out | 49 +-
10 files changed, 2037 insertions(+), 39 deletions(-)
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index f486d42441..d1663e5a37 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -192,6 +192,30 @@ static double relation_byte_size(double tuples, int width);
static double page_size(double tuples, int width);
static double get_parallel_divisor(Path *path);
+/*
+ * Local functions and option variables to support
+ * semijoin pushdowns from join nodes
+ */
+static double evaluate_semijoin_filtering_rate(JoinPath *join_path,
+ const List *hash_equijoins,
+ const PlannerInfo *root,
+ JoinCostWorkspace *workspace,
+ int *best_clause,
+ int *rows_filtered);
+static bool verify_valid_pushdown(const Path *p,
+ const Index pushdown_target_key_no,
+ const PlannerInfo *root);
+static TargetEntry *get_nth_targetentry(int posn,
+ const List *targetlist);
+static bool is_fk_pk(const Var *outer_var,
+ const Var *inner_var,
+ Oid op_oid,
+ const PlannerInfo *root);
+static List *get_switched_clauses(List *clauses, Relids outerrelids);
+
+/* Global variables to store semijoin control options */
+bool enable_mergejoin_semijoin_filter;
+bool force_mergejoin_semijoin_filter;
/*
* clamp_row_est
@@ -3650,6 +3674,10 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
innerstartsel = 0.0;
innerendsel = 1.0;
}
+ workspace->outer_min_val = cache->leftmin;
+ workspace->outer_max_val = cache->leftmax;
+ workspace->inner_min_val = cache->rightmin;
+ workspace->inner_max_val = cache->rightmax;
}
else
{
@@ -3811,6 +3839,10 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path,
double mergejointuples,
rescannedtuples;
double rescanratio;
+ List *mergeclauses_for_sjf;
+ double filteringRate;
+ int best_filter_clause;
+ int rows_filtered;
/* Protect some assumptions below that rowcounts aren't zero */
if (inner_path_rows <= 0)
@@ -3863,6 +3895,49 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path,
else
path->skip_mark_restore = false;
+ if (enable_mergejoin_semijoin_filter)
+ {
+ /*
+ * determine if merge join should use a semijoin filter. We need to
+ * rearrange the merge clauses so they match the status of the clauses
+ * during Plan creation.
+ */
+ mergeclauses_for_sjf = get_actual_clauses(path->path_mergeclauses);
+ mergeclauses_for_sjf = get_switched_clauses(path->path_mergeclauses,
+ path->jpath.outerjoinpath->parent->relids);
+ filteringRate = evaluate_semijoin_filtering_rate((JoinPath *) path, mergeclauses_for_sjf, root,
+ workspace, &best_filter_clause, &rows_filtered);
+ if (force_mergejoin_semijoin_filter ||
+ (filteringRate >= 0 && rows_filtered >= 0 && best_filter_clause >= 0))
+ {
+ /* found a valid SJF at the very least */
+ /* want at least 1000 rows_filtered to avoid any nasty edge cases */
+ if (force_mergejoin_semijoin_filter || (filteringRate >= 0.35 && rows_filtered > 1000))
+ {
+ double improvement;
+
+ path->use_semijoinfilter = true;
+ path->best_mergeclause = best_filter_clause;
+ path->filteringRate = filteringRate;
+
+ /*
+ * Based on experimental data, we have found that there is a
+ * linear relationship between the estimated filtering rate
+ * and improvement to the cost of Merge Join. In fact, this
+ * improvement can be modeled by this equation: improvement =
+ * 0.83 * filtering rate - 0.137 i.e., a filtering rate of 0.4
+ * yields an improvement of 19.5%. This equation also
+ * concludes that a 17% filtering rate is the break-even
+ * point, so we use 35% just to be conservative. We use this
+ * information to adjust the MergeJoin's planned cost.
+ */
+ improvement = 0.83 * filteringRate - 0.137;
+ run_cost = (1 - improvement) * run_cost;
+ workspace->run_cost = run_cost;
+ }
+ }
+ }
+
/*
* Get approx # tuples passing the mergequals. We use approx_tuple_count
* here because we need an estimate done with JOIN_INNER semantics.
@@ -4044,6 +4119,10 @@ cached_scansel(PlannerInfo *root, RestrictInfo *rinfo, PathKey *pathkey)
leftendsel,
rightstartsel,
rightendsel;
+ Datum leftmin,
+ leftmax,
+ rightmin,
+ rightmax;
MemoryContext oldcontext;
/* Do we have this result already? */
@@ -4066,7 +4145,11 @@ cached_scansel(PlannerInfo *root, RestrictInfo *rinfo, PathKey *pathkey)
&leftstartsel,
&leftendsel,
&rightstartsel,
- &rightendsel);
+ &rightendsel,
+ &leftmin,
+ &leftmax,
+ &rightmin,
+ &rightmax);
/* Cache the result in suitably long-lived workspace */
oldcontext = MemoryContextSwitchTo(root->planner_cxt);
@@ -4080,6 +4163,10 @@ cached_scansel(PlannerInfo *root, RestrictInfo *rinfo, PathKey *pathkey)
cache->leftendsel = leftendsel;
cache->rightstartsel = rightstartsel;
cache->rightendsel = rightendsel;
+ cache->leftmin = leftmin;
+ cache->leftmax = leftmax;
+ cache->rightmin = rightmin;
+ cache->rightmax = rightmax;
rinfo->scansel_cache = lappend(rinfo->scansel_cache, cache);
@@ -6552,3 +6639,1212 @@ compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel, Path *bitmapqual,
return pages_fetched;
}
+
+/* The following code was modified from previous commits on the
+ * HashJoin SemiJoinFilter CR.
+ */
+
+/*
+ * Conditionally compiled tracing is available for the semijoin
+ * decision process. Tracing in only included for debug builds,
+ * and only if the TRACE_SJPD flag is defined.
+ */
+
+#define TRACE_SJPD 0
+#define DEBUG_BUILD 0
+
+#if defined(DEBUG_BUILD) && defined(TRACE_SJPD) && DEBUG_BUILD && TRACE_SJPD
+#define debug_sj1(x) elog(INFO, (x))
+#define debug_sj2(x,y) elog(INFO, (x), (y))
+#define debug_sj3(x,y,z) elog(INFO, (x), (y), (z))
+#define debug_sj4(w,x,y,z) elog(INFO, (w), (x), (y), (z))
+#define debug_sj_md(x,y,z) debug_sj_expr_metadata((x), (y), (z))
+#else
+#define debug_sj1(x)
+#define debug_sj2(x,y)
+#define debug_sj3(x,y,z)
+#define debug_sj4(w,x,y,z)
+#define debug_sj_md(x,y,z)
+#endif
+
+
+static void
+init_expr_metadata(ExprMetadata * md)
+{
+ /* Should only be called by analyze_expr_for_metadata */
+ Assert(md);
+
+ md->is_or_maps_to_constant = false;
+ md->is_or_maps_to_base_column = false;
+ md->local_column_expr = NULL;
+ md->local_relation = NULL;
+ md->est_col_width = 0;
+ md->base_column_expr = NULL;
+ md->base_rel = NULL;
+ md->base_rel_root = NULL;
+ md->base_rel_row_count = 0.0;
+ md->base_rel_filt_row_count = 0.0;
+ md->base_col_distincts = -1.0;
+ md->est_distincts_reliable = false;
+ md->expr_est_distincts = -1.0;
+}
+
+
+#if defined(DEBUG_BUILD) && defined(TRACE_SJPD) && DEBUG_BUILD && TRACE_SJPD
+
+static void
+debug_sj_expr_metadata(const char *side, int ord, const ExprMetadata * md)
+{
+ debug_sj4("SJPD: %s key [%d] is constant: %d",
+ side, ord, md->is_or_maps_to_constant);
+ debug_sj4("SJPD: %s key [%d] is base col: %d",
+ side, ord, md->is_or_maps_to_base_column);
+ debug_sj4("SJPD: %s key [%d] est reliable: %d",
+ side, ord, md->est_distincts_reliable);
+ debug_sj4("SJPD: %s key [%d] trows_bf: %.1lf",
+ side, ord, md->base_rel_row_count);
+ debug_sj4("SJPD: %s key [%d] trows_af: %.1lf",
+ side, ord, md->base_rel_filt_row_count);
+ debug_sj4("SJPD: %s key [%d] bcol_dist: %.1lf",
+ side, ord, md->base_col_distincts);
+ debug_sj4("SJPD: %s key [%d] est width: %d",
+ side, ord, md->est_col_width);
+
+ if (md->local_relation && md->local_relation != md->base_rel)
+ {
+ debug_sj4("SJPD: %s key [%d] logical relid: %d",
+ side, ord, md->local_relation->relid);
+ }
+ if (md->base_rel)
+ {
+ debug_sj4("SJPD: %s key [%d] base relid: %d",
+ side, ord, md->base_rel->relid);
+ }
+
+ if (md->base_rel && (md->base_rel->reloptkind == RELOPT_BASEREL ||
+ md->base_rel->reloptkind == RELOPT_OTHER_MEMBER_REL))
+ {
+ /* include the column name, if we can get it */
+ const RelOptInfo *cur_relation = md->base_rel;
+ Oid cur_var_reloid = InvalidOid;
+ const Var *cur_var = (const Var *) md->base_column_expr;
+
+ Assert(IsA(cur_var, Var));
+ cur_var_reloid = (planner_rt_fetch(cur_relation->relid,
+ md->base_rel_root))->relid;
+ if (cur_var_reloid != InvalidOid && cur_var->varattno > 0)
+ {
+ const char *base_attribute_name =
+ get_attname(cur_var_reloid, md->base_column_expr->varattno,
+ true);
+ const char *base_rel_name = get_rel_name(cur_var_reloid);
+ char name_str[260] = "";
+
+ if (base_rel_name && base_attribute_name)
+ {
+ snprintf(name_str, sizeof(name_str), "%s.%s",
+ base_rel_name, base_attribute_name);
+ }
+ else if (base_attribute_name)
+ {
+ snprintf(name_str, sizeof(name_str), "%s",
+ base_attribute_name);
+ }
+ if (base_attribute_name)
+ {
+ debug_sj4("SJPD: %s key [%d] base col name: %s",
+ side, ord, name_str);
+ }
+ }
+ }
+ debug_sj4("SJPD: %s key [%d] est distincts: %.1lf",
+ side, ord, md->expr_est_distincts);
+}
+#endif
+
+
+static double
+estimate_distincts_remaining(double original_table_row_count,
+ double original_distinct_count,
+ double est_row_count_after_predicates)
+{
+ /*
+ * Estimates the number of distinct values still present within a column
+ * after some local filtering has been applied to that table and thereby
+ * restricted the set of relevant rows.
+ *
+ * This method assumes that the original_distinct_count comes from a
+ * column whose values are uncorrelated with the row restricting
+ * condition(s) on this table. Other mechanisms need to be added to more
+ * accurately handle the cases where the row restricting condition is
+ * directly on the current column.
+ *
+ * The most probable number of distinct values remaining can be computed
+ * exactly using Yao's iterative expansion formula from: "Approximating
+ * block accesses in database organizations", S. B. Yao, CACM, V20, N4,
+ * April 1977, p. 260-261 However, this formula gets very expensive to
+ * compute whenever the number of distinct values is large.
+ *
+ * This function instead uses a non-iterative approximation of Yao's
+ * iterative formula from: "Estimating Block Accesses in Database
+ * Organizations: A Closed Noniterative Formula", Kyu-Young Whang, Gio
+ * Wiederhold, and Daniel Sagalowicz, CACM V26, N11, November 1983, p.
+ * 945-947. This approximation starts with terms for the first three
+ * iterations of Yao's formula, and then inserts two adjustment factors
+ * into the third term which minimize the total error related to the
+ * missing subsequent terms.
+ *
+ * Internally this function uses M, N, P, and K as variables to match the
+ * notation used in the equation in the paper.
+ */
+ double n = original_table_row_count;
+ double m = original_distinct_count;
+ double k = est_row_count_after_predicates;
+ double p = 0.0; /* avg rows per distinct */
+ double result;
+
+ /* The three partial probability terms */
+ double term_1 = 0.0;
+ double term_2 = 0.0;
+ double term_3 = 0.0;
+ double sum_terms = 0.0;
+
+ /* In debug builds, validate the sanity of the inputs */
+ Assert(isfinite(original_table_row_count));
+ Assert(isfinite(original_distinct_count));
+ Assert(isfinite(est_row_count_after_predicates));
+ Assert(original_table_row_count >= 0.0);
+ Assert(original_distinct_count >= 0.0);
+ Assert(est_row_count_after_predicates >= 0.0);
+ Assert(original_distinct_count <= original_table_row_count);
+ Assert(est_row_count_after_predicates <= original_table_row_count);
+
+ if (n > 0.0 && m > 0.0)
+ {
+ p = (n / m);
+ }
+ Assert(isfinite(p));
+
+ if (k > (n - p))
+ { /* All distincts almost guaranteed to still be
+ * present */
+ result = m;
+ }
+ else if (m < 0.000001)
+ { /* When all values are NULL, avoid division by
+ * zero */
+ result = 0.0;
+ }
+ else if (k <= 1.000001)
+ { /* When only one or zero rows after filtering */
+ result = k;
+ }
+ else
+ {
+ /*
+ * When this is not a special case, compute the partial probabilities.
+ * However, if the probability calculation overflows, then revert to
+ * the estimate we can get from the upper bound analysis.
+ */
+ result = fmin(original_distinct_count, est_row_count_after_predicates);
+
+ if (isfinite(1.0 / m) && isfinite(pow((1.0 - (1.0 / m)), k)))
+ {
+ term_1 = (1.0 - pow((1.0 - (1.0 / m)), k));
+
+ if (isfinite(term_1))
+ {
+ /*
+ * As long as we at least have a usable term_1, then proceed
+ * to the much smaller term_2 and to the even smaller term_3.
+ *
+ * If no usable term_1, then just use the hard upper bounds.
+ */
+ if (isfinite(m * m * p)
+ && isfinite(pow((1.0 - (1.0 / m)), (k - 1.0))))
+ {
+ term_2 = ((1.0 / (m * m * p))
+ * ((k * (k - 1.0)) / 2.0)
+ * pow((1.0 - (1.0 / m)), (k - 1.0))
+ );
+ }
+ if (!isfinite(term_2))
+ {
+ term_2 = 0.0;
+ }
+ if (isfinite(pow(m, 3.0))
+ && isfinite(pow(p, 4.0))
+ && isfinite(pow(m, 3.0) * pow(p, 4.0))
+ && isfinite(k * (k - 1.0) * ((2 * k) - 1.0))
+ && isfinite(pow((1.0 - (1.0 / m)), (k - 1.0))))
+ {
+ term_3 = ((1.5 / (pow(m, 3.0) * pow(p, 4.0)))
+ * ((k * (k - 1.0) * ((2 * k) - 1.0)) / 6.0)
+ * pow((1.0 - (1.0 / m)), (k - 1.0)));
+ }
+ if (!isfinite(term_3))
+ {
+ term_3 = 0.0;
+ }
+ sum_terms = term_1 + term_2 + term_3;
+
+ /* In debug builds, validate the partial probability terms */
+ Assert(term_1 <= 1.0 && term_1 >= 0.0);
+ Assert(term_2 <= 1.0 && term_2 >= 0.0);
+ Assert(term_3 <= 1.0 && term_3 >= 0.0);
+ Assert(term_1 > term_2);
+ Assert(term_2 >= term_3);
+ Assert(isfinite(sum_terms));
+ Assert(sum_terms <= 1.0);
+
+ if (isfinite(m * sum_terms))
+ {
+ result = round(m * sum_terms);
+ }
+
+ /* Ensure hard upper bounds still satisfied */
+ result = fmin(result,
+ fmin(original_distinct_count,
+ est_row_count_after_predicates));
+
+ /* Since empty tables were handled above, must be >= 1 */
+ result = fmax(result, 1.0);
+ }
+ }
+ }
+
+ Assert(result >= 0.0);
+ Assert(result <= original_distinct_count);
+ Assert(result <= est_row_count_after_predicates);
+
+ return result;
+}
+
+
+static void
+gather_base_column_metadata(const Var *base_col,
+ const RelOptInfo *base_rel,
+ const PlannerInfo *root,
+ ExprMetadata * md)
+{
+ /*
+ * Given a Var for a base column, gather metadata about that column Should
+ * only be called indirectly under analyze_expr_for_metadata
+ */
+ VariableStatData base_col_vardata;
+ Oid base_col_reloid = InvalidOid;
+ bool is_default;
+
+ /* Oid var_sortop; */
+
+ Assert(md && base_rel && root);
+ Assert(base_col && IsA(base_col, Var));
+ Assert(base_rel->reloptkind == RELOPT_BASEREL ||
+ base_rel->reloptkind == RELOPT_OTHER_MEMBER_REL);
+ Assert(base_rel->rtekind == RTE_RELATION);
+
+ md->base_column_expr = base_col;
+ md->base_rel = base_rel;
+ md->base_rel_root = root;
+ md->is_or_maps_to_base_column = true;
+
+ examine_variable((PlannerInfo *) root, (Node *) base_col, 0,
+ &base_col_vardata);
+ Assert(base_col_vardata.rel);
+ Assert(base_col_vardata.rel == base_rel);
+
+ md->base_rel_row_count = md->base_rel->tuples;
+ md->base_rel_filt_row_count = md->base_rel->rows;
+ md->base_col_distincts =
+ fmin(get_variable_numdistinct(&base_col_vardata, &is_default),
+ md->base_rel_row_count);
+ md->est_distincts_reliable = !is_default;
+
+ /*
+ * For indirectly filtered columns estimate the effect of the rows
+ * filtered on the remaining column distinct count.
+ */
+ md->expr_est_distincts =
+ fmax(1.0,
+ estimate_distincts_remaining(md->base_rel_row_count,
+ md->base_col_distincts,
+ md->base_rel_filt_row_count));
+
+ base_col_reloid = (planner_rt_fetch(base_rel->relid, root))->relid;
+ if (base_col_reloid != InvalidOid && base_col->varattno > 0)
+ {
+ md->est_col_width =
+ get_attavgwidth(base_col_reloid, base_col->varattno);
+ }
+ ReleaseVariableStats(base_col_vardata);
+}
+
+static Expr *
+get_subquery_var_occluded_reference(const Expr *ex, const PlannerInfo *root)
+{
+ /*
+ * Given a virtual column from an unflattened subquery, return the
+ * expression it immediately occludes
+ */
+ Var *outside_subq_var = (Var *) ex;
+ RelOptInfo *outside_subq_relation = NULL;
+ RangeTblEntry *outside_subq_rte = NULL;
+ TargetEntry *te = NULL;
+ Expr *inside_subq_expr = NULL;
+
+ Assert(ex && root);
+ Assert(IsA(ex, Var));
+ Assert(outside_subq_var->varno < root->simple_rel_array_size);
+
+ outside_subq_relation = root->simple_rel_array[outside_subq_var->varno];
+ outside_subq_rte = root->simple_rte_array[outside_subq_var->varno];
+
+ /*
+ * If inheritance, subquery has append, leg of append in subquery may not
+ * have subroot, we may be able to better process it according to
+ * root->append_rel_list. For now just return the first leg... TODO better
+ * handling of Union All, we only return statistics of the first leg atm.
+ * TODO similarly, need better handling of partitioned tables, according
+ * to outside_subq_relation->part_scheme and part_rels.
+ */
+ if (outside_subq_rte->inh)
+ {
+ AppendRelInfo *appendRelInfo = NULL;
+
+ Assert(root->append_rel_list);
+
+ /* TODO remove this check once we add better handling of inheritance */
+ if (force_mergejoin_semijoin_filter)
+ {
+ appendRelInfo = list_nth(root->append_rel_list, 0);
+ Assert(appendRelInfo->parent_relid == outside_subq_var->varno);
+
+ Assert(appendRelInfo->translated_vars &&
+ outside_subq_var->varattno <=
+ list_length(appendRelInfo->translated_vars));
+ inside_subq_expr = list_nth(appendRelInfo->translated_vars,
+ outside_subq_var->varattno - 1);
+ }
+ }
+
+ /* Subquery without append and partitioned tables */
+ else
+ {
+ Assert(outside_subq_relation && IsA(outside_subq_relation, RelOptInfo));
+ Assert(outside_subq_relation->reloptkind == RELOPT_BASEREL);
+ Assert(outside_subq_relation->rtekind == RTE_SUBQUERY);
+ Assert(outside_subq_relation->subroot->processed_tlist);
+
+ te = get_nth_targetentry(outside_subq_var->varattno,
+ outside_subq_relation->subroot->processed_tlist);
+ Assert(te && outside_subq_var->varattno == te->resno);
+ inside_subq_expr = te->expr;
+
+ /*
+ * Strip off any Relabel present, and return the underlying expression
+ */
+ while (inside_subq_expr && IsA(inside_subq_expr, RelabelType))
+ {
+ inside_subq_expr = ((RelabelType *) inside_subq_expr)->arg;
+ }
+ }
+
+ return inside_subq_expr;
+}
+
+
+static void
+recursively_analyze_expr_metadata(const Expr *ex,
+ const PlannerInfo *root,
+ ExprMetadata * md)
+{
+ /* Should only be called by analyze_expr_for_metadata, or itself */
+ Assert(md && ex && root);
+
+ if (IsA(ex, Const))
+ {
+ md->is_or_maps_to_constant = true;
+ md->expr_est_distincts = 1.0;
+ md->est_distincts_reliable = true;
+ }
+ else if (IsA(ex, RelabelType))
+ {
+ recursively_analyze_expr_metadata(((RelabelType *) ex)->arg, root, md);
+ }
+ else if (IsA(ex, Var))
+ {
+ Var *local_var = (Var *) ex;
+ RelOptInfo *local_relation = NULL;
+
+ Assert(local_var->varno < root->simple_rel_array_size);
+
+ /* Bail out if varno is invalid */
+ if (local_var->varno == InvalidOid)
+ return;
+
+ local_relation = root->simple_rel_array[local_var->varno];
+ Assert(local_relation && IsA(local_relation, RelOptInfo));
+
+ /*
+ * For top level call (i.e. not a recursive invocation) cache the
+ * relation pointer
+ */
+ if (!md->local_relation
+ && (local_relation->reloptkind == RELOPT_BASEREL ||
+ local_relation->reloptkind == RELOPT_OTHER_MEMBER_REL))
+ {
+ md->local_relation = local_relation;
+ md->local_column_expr = local_var;
+ }
+
+ if ((local_relation->reloptkind == RELOPT_BASEREL ||
+ local_relation->reloptkind == RELOPT_OTHER_MEMBER_REL)
+ && local_relation->rtekind == RTE_RELATION)
+ {
+ /* Found Var is a base column, so gather the metadata we can */
+ gather_base_column_metadata(local_var, local_relation, root, md);
+ }
+ else if (local_relation->reloptkind == RELOPT_BASEREL
+ && local_relation->rtekind == RTE_SUBQUERY)
+ {
+ RangeTblEntry *outside_subq_rte =
+ root->simple_rte_array[local_relation->relid];
+
+ /* root doesn't change for inheritance case, e.g. for UNION ALL */
+ const PlannerInfo *new_root = outside_subq_rte->inh ?
+ root : local_relation->subroot;
+
+ /*
+ * Found that this Var is a subquery SELECT list item, so continue
+ * to recurse on the occluded expression
+ */
+ Expr *occluded_expr =
+ get_subquery_var_occluded_reference(ex, root);
+
+ if (occluded_expr)
+ {
+ recursively_analyze_expr_metadata(occluded_expr,
+ new_root, md);
+ }
+ }
+ }
+}
+
+
+void
+analyze_expr_for_metadata(const Expr *ex,
+ const PlannerInfo *root,
+ ExprMetadata * md)
+{
+ /*
+ * Analyze the supplied expression, and if possible, gather metadata about
+ * it. Currently handles: base table columns, constants, and virtual
+ * columns from unflattened subquery blocks. The metadata collected is
+ * placed into the supplied ExprMetadata object.
+ */
+ Assert(md && ex && root);
+
+ init_expr_metadata(md);
+ recursively_analyze_expr_metadata(ex, root, md);
+}
+
+
+/*
+ * Function: evaluate_semijoin_filtering_rate
+ *
+ * Given a merge join path, determine two things.
+ * First, can a Bloom filter based semijoin be created on the
+ * outer scan relation and checked on the inner scan relation to
+ * filter out rows from the inner relation? And second, if this
+ * is possible, determine the single equijoin condition that is most
+ * useful as well as the estimated filtering rate of the filter.
+ *
+ * The output args, inner_semijoin_keys and
+ * outer_semijoin_keys, will each contain a single key column
+ * from one of the hash equijoin conditions. probe_semijoin_keys
+ * contains keys from the target relation to probe the semijoin filter.
+ *
+ * A potential semijoin will be deemed valid only if all
+ * of the following are true:
+ * a) The enable_mergejoin_semijoin_filter option is set true
+ * b) The equijoin key from the outer side is or maps
+ * to a base table column
+ * c) The equijoin key from the inner side is or maps to
+ * a base column
+ *
+ * A potential semijoin will be deemed useful only if the
+ * force_mergejoin_semijoin_filter is set true, or if all of the
+ * following are true:
+ * a) The equijoin key base column from the outer side has
+ * reliable metadata (i.e. ANALYZE was done on it)
+ * b) The key column(s) from the outer side equijoin keys
+ * have width metadata available.
+ * c) The estimated outer side key column width(s) are not
+ * excessively wide.
+ * d) The equijoin key from the inner side either:
+ * 1) maps to a base column with reliable metadata, or
+ * 2) is constrained by the incoming estimated tuple
+ * count to have a distinct count smaller than the
+ * outer side key column's distinct count.
+ * e) The semijoin must be estimated to filter at least some of
+ * the rows from the inner relation. However, the exact filtering
+ * rate where the semijoin is deemed useful is determined by the
+ * mergejoin cost model itself, not this function.
+ *
+ * If there is more than one equijoin condition, we favor the one with the
+ * higher estimated filtering rate.
+ *
+ * If this function finds an appropriate semijoin, it will
+ * allocate a PushdownSemijoinMetadata object to store the
+ * semijoin metadata, and then attach it to the Join plan node.
+ */
+#define MAX_SEMIJOIN_SINGLE_KEY_WIDTH 128
+
+static double
+evaluate_semijoin_filtering_rate(JoinPath *join_path,
+ const List *equijoin_list,
+ const PlannerInfo *root,
+ JoinCostWorkspace *workspace,
+ int *best_clause,
+ int *rows_filtered)
+{
+ const Path *outer_path;
+ const Path *inner_path;
+ ListCell *equijoin_lc = NULL;
+ int equijoin_ordinal = -1;
+ int best_single_col_sj_ordinal = -1;
+ double best_sj_selectivity = 1.01;
+ double best_sj_inner_rows_filtered = -1.0;
+ int num_md;
+ ExprMetadata *outer_md_array = NULL;
+ ExprMetadata *inner_md_array = NULL;
+
+ Assert(equijoin_list);
+ Assert(list_length(equijoin_list) > 0);
+
+ if (!enable_mergejoin_semijoin_filter && !force_mergejoin_semijoin_filter)
+ {
+ return 0; /* option setting disabled semijoin insertion */
+ }
+
+ num_md = list_length(equijoin_list);
+ outer_md_array = alloca(sizeof(ExprMetadata) * num_md);
+ inner_md_array = alloca(sizeof(ExprMetadata) * num_md);
+ if (!outer_md_array || !inner_md_array)
+ {
+ return 0; /* a stack array allocation failed */
+ }
+
+ outer_path = join_path->outerjoinpath;
+ inner_path = join_path->innerjoinpath;
+
+ debug_sj1("SJPD: start evaluate_semijoin_filtering_rate");
+ debug_sj2("SJPD: join inner est rows: %.1lf",
+ inner_path->rows);
+ debug_sj2("SJPD: join outer est rows: %.1lf",
+ outer_path->rows);
+
+ /*
+ * Consider each of the individual equijoin conditions as a possible basis
+ * for creating a semijoin condition
+ */
+ foreach(equijoin_lc, equijoin_list)
+ {
+ OpExpr *equijoin;
+ Node *outer_equijoin_arg = NULL;
+ ExprMetadata *outer_arg_md = NULL;
+ Node *inner_equijoin_arg = NULL;
+ ExprMetadata *inner_arg_md = NULL;
+ double est_sj_selectivity = 1.01;
+ double est_sj_inner_rows_filtered = -1.0;
+
+ equijoin_ordinal++;
+ equijoin = (OpExpr *) lfirst(equijoin_lc);
+
+ Assert(IsA(equijoin, OpExpr));
+ Assert(list_length(equijoin->args) == 2);
+
+ outer_equijoin_arg = linitial(equijoin->args);
+ outer_arg_md = &(outer_md_array[equijoin_ordinal]);
+ analyze_expr_for_metadata((Expr *) outer_equijoin_arg,
+ root, outer_arg_md);
+
+ inner_equijoin_arg = llast(equijoin->args);
+ inner_arg_md = &(inner_md_array[equijoin_ordinal]);
+ analyze_expr_for_metadata((Expr *) inner_equijoin_arg,
+ root, inner_arg_md);
+
+ debug_sj2("SJPD: equijoin condition [%d]", equijoin_ordinal);
+ debug_sj_md("outer", equijoin_ordinal, outer_arg_md);
+ debug_sj_md("inner", equijoin_ordinal, inner_arg_md);
+
+ if (outer_arg_md->base_column_expr &&
+ inner_arg_md->base_column_expr)
+ {
+			/*
+			 * If the outer and inner keys have an FK/PK relationship and
+			 * there is no restriction on the primary-key side, the semijoin
+			 * filter will be useless, so we bail out even if the
+			 * force_mergejoin_semijoin_filter GUC is set. There might be
+			 * exceptions if the outer key has restrictions on the key
+			 * variable, but we won't be able to tell until the Plan level,
+			 * so we conservatively assume that an FK/PK relationship will
+			 * yield a useless filter.
+			 */
+ if (
+ is_fk_pk(outer_arg_md->base_column_expr,
+ inner_arg_md->base_column_expr,
+ equijoin->opno, root))
+ {
+ debug_sj2("SJPD: inner and outer equijoin columns %s",
+ "are PK/FK; semijoin would not be useful");
+ continue;
+ }
+ }
+
+ /* Now see if we can push a semijoin to its source scan node */
+ if (!outer_arg_md->local_column_expr || !inner_arg_md->local_column_expr)
+ {
+ debug_sj2("SJPD: could not find a local outer or inner column to%s",
+ " use as semijoin basis; semijoin is not valid");
+ continue; /* Continue on to the next equijoin condition */
+ }
+
+ if (!verify_valid_pushdown((Path *) (join_path->innerjoinpath),
+ inner_arg_md->local_column_expr->varno, root))
+ {
+ debug_sj2("SJPD: could not find a place to evaluate %s",
+ "a semijoin condition; semijoin is not valid");
+ continue; /* Continue on to the next equijoin condition */
+ }
+
+ /*
+ * Adjust cached estimated inner key distinct counts down using the
+ * inner side tuple count as an upper bound
+ */
+ inner_arg_md->expr_est_distincts =
+ fmax(1.0, fmin(inner_path->rows,
+ inner_arg_md->expr_est_distincts));
+
+		/*
+		 * We need to estimate the outer key distinct count as close as
+		 * possible to where the semijoin filter will actually be applied,
+		 * ignoring the effects of any indirect filtering that would occur
+		 * after the semijoin.
+		 */
+ outer_arg_md->expr_est_distincts =
+ fmax(1.0, fmin(outer_path->rows,
+ outer_arg_md->expr_est_distincts));
+
+ /* Next, see if this equijoin is valid as a semijoin basis */
+		if (!outer_arg_md->is_or_maps_to_base_column
+			&& !outer_arg_md->is_or_maps_to_constant)
+		{
+			debug_sj2("SJPD: outer equijoin arg maps neither to %s",
+					  "a base column nor a constant; semijoin is not valid");
+ continue; /* Continue on to the next equijoin condition */
+ }
+ if (!inner_arg_md->is_or_maps_to_base_column
+ && !inner_arg_md->is_or_maps_to_constant)
+ {
+			debug_sj2("SJPD: inner equijoin arg maps neither to %s",
+					  "a base column nor a constant; semijoin is not valid");
+ continue; /* Continue on to the next equijoin condition */
+ }
+
+ /*
+ * If force_mergejoin_semijoin_filter is used, set the default clause
+ * as the first valid one.
+ */
+ if (force_mergejoin_semijoin_filter && best_single_col_sj_ordinal == -1)
+ {
+ best_single_col_sj_ordinal = equijoin_ordinal;
+ }
+ /* Now we know it's valid, see if this potential semijoin is useful */
+ if (!outer_arg_md->est_distincts_reliable)
+ {
+ debug_sj2("SJPD: outer equijoin column's distinct %s",
+ "estimates are not reliable; condition rejected");
+ continue; /* Continue on to the next equijoin condition */
+ }
+ if (outer_arg_md->est_col_width == 0)
+ {
+ debug_sj2("SJPD: outer equijoin column's width %s",
+ "could not be estimated; condition rejected");
+ continue; /* Continue on to the next equijoin condition */
+ }
+ if (outer_arg_md->est_col_width > MAX_SEMIJOIN_SINGLE_KEY_WIDTH)
+ {
+ debug_sj2("SJPD: outer equijoin column's width %s",
+ "was excessive; condition rejected");
+ continue; /* Continue on to the next equijoin condition */
+ }
+		if (!(inner_arg_md->is_or_maps_to_constant
+ || (inner_arg_md->is_or_maps_to_base_column
+ && inner_arg_md->est_distincts_reliable)
+ || (inner_path->rows
+ < outer_arg_md->expr_est_distincts)))
+ {
+ debug_sj2("SJPD: inner equijoin arg does not have %s",
+ "a reliable distinct count; condition rejected");
+ continue; /* Continue on to the next equijoin condition */
+ }
+
+ /*
+ * We now try to estimate the filtering rate (1 minus selectivity) and
+ * rows filtered of the filter. We first start by finding the ranges
+ * of both the outer and inner var, and find the overlap between these
+ * ranges. We assume an equal distribution of variables among this
+ * range, and we can then calculate the amount of filtering our SJF
+ * would do.
+ */
+ if (workspace->inner_min_val > workspace->outer_max_val
+ || workspace->inner_max_val < workspace->outer_min_val)
+ {
+			/*
+			 * This means that the outer and inner value ranges are
+			 * completely disjoint, which suggests everything would be
+			 * filtered. We will not be that optimistic, and just assign a
+			 * filtering rate of 95%.
+			 */
+ est_sj_selectivity = 0.05; /* selectivity is 1 minus filtering
+ * rate */
+ est_sj_inner_rows_filtered = 0.95 * inner_arg_md->base_rel_filt_row_count;
+ }
+ else
+ {
+#define APPROACH_1_DAMPENING_FACTOR 0.8
+#define APPROACH_2_DAMPENING_FACTOR 0.66
+ /*
+ * There are two approaches to estimating the filtering rate. We
+ * have already outlined the first approach above, finding the
+ * range and assuming an equal distribution. For the second
+ * approach, we do not assume anything about the distribution, but
+ * compare the number of distincts. If, for example, the inner
+ * relation has 1000 distincts and the outer has 500, then there
+ * is guaranteed to be at least 500 rows filtered from the inner
+ * relation, regardless of the data distribution. We make an
+ * assumption here that the distribution of distinct variables is
+ * equal to the distribution of all rows so we can multiply by the
+ * ratio of duplicate values. We then take the geometric mean of
+ * these two approaches for our final estimated filtering rate. We
+ * also multiply these values by dampening factors, which we have
+ * found via experimentation and probably need fine-tuning.
+ */
+ double approach_1_selectivity; /* finding selectivity instead
+ * of filtering rate for
+ * legacy code reasons */
+ double approach_2_selectivity;
+			double		inner_overlapping_range = workspace->outer_max_val - workspace->inner_min_val;
+
+			/* we are assuming an equal distribution of values */
+			double		inner_overlapping_ratio = inner_overlapping_range /
+				(workspace->inner_max_val - workspace->inner_min_val);
+
+ Assert(inner_overlapping_ratio >= 0 && inner_overlapping_ratio <= 1);
+
+ /*
+			 * testing has found that this method is generally over-optimistic,
+ * so we multiply by a dampening effect.
+ */
+ approach_1_selectivity = inner_overlapping_ratio * APPROACH_1_DAMPENING_FACTOR;
+ if (inner_arg_md->expr_est_distincts > outer_arg_md->expr_est_distincts)
+ {
+				double		inner_more_distincts = inner_arg_md->expr_est_distincts - outer_arg_md->expr_est_distincts;
+
+				approach_2_selectivity = 1 - inner_more_distincts / inner_arg_md->expr_est_distincts;
+
+ /*
+				 * testing has found that this method is generally
+ * over-optimistic, so we multiply by a dampening effect.
+ */
+ approach_2_selectivity = 1 - ((1 - approach_2_selectivity) * APPROACH_2_DAMPENING_FACTOR);
+ }
+ else
+ {
+ /*
+ * This means that the outer relation has the same or more
+ * distincts than the inner relation, which is not good for
+ * our filtering rate. We will assume a base filtering rate of
+ * 10% in this case.
+ */
+ approach_2_selectivity = 0.9;
+ }
+ est_sj_selectivity = sqrt(approach_1_selectivity * approach_2_selectivity);
+ est_sj_inner_rows_filtered = (1 - est_sj_selectivity) * inner_arg_md->base_rel_filt_row_count;
+ }
+ est_sj_selectivity = fmin(1.0, est_sj_selectivity);
+ est_sj_inner_rows_filtered = fmax(1.0, est_sj_inner_rows_filtered);
+
+ debug_sj2("SJPD: eligible semijoin selectivity: %.7lf",
+ est_sj_selectivity);
+ debug_sj2("SJPD: eligible semijoin rows filtered: %.7lf",
+ est_sj_inner_rows_filtered);
+
+ if (est_sj_selectivity < best_sj_selectivity)
+ {
+ debug_sj1("SJPD: found most useful semijoin seen so far");
+ best_sj_selectivity = est_sj_selectivity;
+ best_sj_inner_rows_filtered = est_sj_inner_rows_filtered;
+ best_single_col_sj_ordinal = equijoin_ordinal;
+ }
+ else
+ { /* This semijoin was rejected, so explain why */
+ debug_sj2("SJPD: found useful single column semijoin; %s",
+ "not as useful as best found so far, so rejected");
+ }
+ }
+
+ if (best_single_col_sj_ordinal != -1)
+ {
+ debug_sj2("SJPD: best single column sj selectivity: %.7lf",
+ best_sj_selectivity);
+ debug_sj2("SJPD: best single column rows filtered: %.7lf",
+ best_sj_inner_rows_filtered);
+ }
+
+ debug_sj1("SJPD: finish evaluate_semijoin_filtering_rate");
+ *best_clause = best_single_col_sj_ordinal;
+ *rows_filtered = best_sj_inner_rows_filtered;
+ return 1 - best_sj_selectivity;
+}
+
+/*
+ * Determine whether a semijoin condition could be pushed from the join
+ * all the way to the leaf scan node.
+ *
+ * Parameters:
+ *	path: path node to be considered for semijoin push down.
+ *	target_var_no: varno of the inner side join key's relation.
+ *	root: the PlannerInfo for the current query level.
+ */
+static bool
+verify_valid_pushdown(const Path *path,
+ const Index target_var_no,
+ const PlannerInfo *root)
+{
+ Assert(path);
+ Assert(target_var_no > 0);
+
+ if (path == NULL)
+ {
+ return false;
+ }
+
+ /* Guard against stack overflow due to overly complex plan trees */
+ check_stack_depth();
+
+ switch (path->pathtype)
+ {
+ /* directly push through these paths */
+ case T_Material:
+ {
+ return verify_valid_pushdown(((MaterialPath *) path)->subpath, target_var_no, root);
+ }
+ case T_Gather:
+ {
+ return verify_valid_pushdown(((GatherPath *) path)->subpath, target_var_no, root);
+ }
+ case T_GatherMerge:
+ {
+ return verify_valid_pushdown(((GatherMergePath *) path)->subpath, target_var_no, root);
+ }
+ case T_Sort:
+ {
+ return verify_valid_pushdown(((SortPath *) path)->subpath, target_var_no, root);
+ }
+ case T_Unique:
+ {
+ return verify_valid_pushdown(((UniquePath *) path)->subpath, target_var_no, root);
+ }
+
+ case T_Agg:
+ { /* We can directly push bloom through GROUP
+ * BYs and DISTINCTs, as long as there are no
+ * grouping sets. However, we cannot validate
+ * this fact until the Plan has been created.
+ * We will push through for now, but verify
+ * again during Plan creation. */
+ return verify_valid_pushdown(((AggPath *) path)->subpath, target_var_no, root);
+ }
+
+ case T_Append:
+ case T_SubqueryScan:
+ {
+			/*
+			 * Both Append and SubqueryScan paths are currently
+			 * unimplemented, so we just return false, though in principle a
+			 * filter can be pushed through them. The earlier HashJoin patch
+			 * implemented these cases, but that code runs after the plan
+			 * has been created, so it would need to be adjusted to work
+			 * during Path evaluation.
+			 */
+ return false;
+ }
+
+ /* Leaf nodes */
+ case T_IndexScan:
+ case T_BitmapHeapScan:
+ {
+ /*
+ * We could definitely implement pushdown filters for Index
+ * and Bitmap Scans, but currently it is only implemented for
+ * SeqScan. For now, we return false.
+ */
+ return false;
+ }
+ case T_SeqScan:
+ {
+ if (path->parent->relid == target_var_no)
+ {
+ /*
+ * Found source of target var! We know that the pushdown
+ * is valid now.
+ */
+ return true;
+ }
+ return false;
+ }
+
+ case T_NestLoop:
+ case T_MergeJoin:
+ case T_HashJoin:
+ {
+ /*
+ * since this is going to be a sub-join, we can push through
+ * both sides and don't need to worry about left/right/inner
+ * joins.
+ */
+ JoinPath *join = (JoinPath *) path;
+
+ return verify_valid_pushdown(join->outerjoinpath, target_var_no, root) ||
+ verify_valid_pushdown(join->innerjoinpath, target_var_no, root);
+ }
+
+ default:
+ {
+ return false;
+ }
+ }
+}
+
+static TargetEntry *
+get_nth_targetentry(int n, const List *targetlist)
+{
+ int i = 1;
+ ListCell *lc = NULL;
+
+ Assert(n > 0);
+ Assert(targetlist && nodeTag(targetlist) == T_List);
+ Assert(list_length(targetlist) >= n);
+
+ if (targetlist && list_length(targetlist) >= n)
+ {
+ foreach(lc, targetlist)
+ {
+ if (i == n)
+ {
+ TargetEntry *te = lfirst(lc);
+
+ return te;
+ }
+ i++;
+ }
+ }
+ return NULL;
+}
+
+/*
+ * expressions_match_foreign_key
+ *		True if the given con_exprs, ref_exprs and operators will exactly
+ * reflect the expressions referenced by the given foreign key fk.
+ *
+ * Note: This function expects con_exprs and ref_exprs to only contain Var types.
+ * Expression indexes are not supported by foreign keys.
+ */
+bool
+expressions_match_foreign_key(ForeignKeyOptInfo *fk,
+ List *con_exprs,
+ List *ref_exprs,
+ List *operators)
+{
+ ListCell *lc;
+ ListCell *lc2;
+ ListCell *lc3;
+ int col;
+ Bitmapset *all_vars;
+ Bitmapset *matched_vars;
+ int idx;
+
+ Assert(list_length(con_exprs) == list_length(ref_exprs));
+ Assert(list_length(con_exprs) == list_length(operators));
+
+	/*
+	 * Fast path out if there are not enough conditions to match each column
+	 * in the foreign key. Note that we cannot check that the number of
+	 * expressions is equal here, since that would cause any duplicated
+	 * expressions not to match.
+	 */
+ if (list_length(con_exprs) < fk->nkeys)
+ return false;
+
+ /*
+ * We need to ensure that each item in con_exprs/ref_exprs can be matched
+ * to a foreign key column in the actual foreign key data fk. We can do
+ * this by looping over each fk column and checking that we find a
+	 * matching con_expr/ref_expr in con_exprs/ref_exprs. This method does
+	 * not, however, allow us to ensure that there are no additional items in
+	 * con_exprs/ref_exprs that have not been matched. To remedy this we will
+ * create 2 bitmapsets, one which will keep track of all of the vars, the
+ * other which will keep track of the vars that we have matched. After
+ * matching is complete, we will ensure that these bitmapsets are equal to
+ * ensure we have complete mapping in both directions (fk cols to vars and
+ * vars to fk cols)
+ */
+ all_vars = NULL;
+ matched_vars = NULL;
+
+ /*
+ * Build a bitmapset which tracks all vars by their index
+ */
+ for (idx = 0; idx < list_length(con_exprs); idx++)
+ all_vars = bms_add_member(all_vars, idx);
+
+ for (col = 0; col < fk->nkeys; col++)
+ {
+ bool matched = false;
+
+ idx = 0;
+
+ forthree(lc, con_exprs, lc2, ref_exprs, lc3, operators)
+ {
+ Var *con_expr = (Var *) lfirst(lc);
+ Var *ref_expr = (Var *) lfirst(lc2);
+ Oid opr = lfirst_oid(lc3);
+
+ Assert(IsA(con_expr, Var));
+ Assert(IsA(ref_expr, Var));
+
+ /* Does this join qual match up to the current fkey column? */
+ if (fk->conkey[col] == con_expr->varattno &&
+ fk->confkey[col] == ref_expr->varattno &&
+ equality_ops_are_compatible(opr, fk->conpfeqop[col]))
+ {
+ matched = true;
+
+ /* mark the index of this var as matched */
+ matched_vars = bms_add_member(matched_vars, idx);
+
+ /*
+ * Don't break here as there may be duplicate expressions that
+ * match this column that we also need to mark as matched
+ */
+ }
+ idx++;
+ }
+
+		/*
+		 * This fkey column has no matching join condition, so the
+		 * expressions do not match this foreign key.
+		 */
+ if (!matched)
+ return false;
+ }
+
+ /*
+ * Ensure that we managed to match every var in con_var/ref_var to a
+ * foreign key constraint.
+ */
+ if (!bms_equal(all_vars, matched_vars))
+ return false;
+ return true;
+}
+
+/*
+ * Determine if the given outer and inner Exprs satisfy any fk-pk
+ * relationship.
+ */
+static bool
+is_fk_pk(const Var *outer_var,
+ const Var *inner_var,
+ Oid op_oid,
+ const PlannerInfo *root)
+{
+ ListCell *lc = NULL;
+ List *outer_key_list = list_make1((Var *) outer_var);
+ List *inner_key_list = list_make1((Var *) inner_var);
+ List *operators = list_make1_oid(op_oid);
+
+ foreach(lc, root->fkey_list)
+ {
+ ForeignKeyOptInfo *fk = (ForeignKeyOptInfo *) lfirst(lc);
+
+ if (expressions_match_foreign_key(fk,
+ outer_key_list,
+ inner_key_list,
+ operators))
+ {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/*
+ * get_switched_clauses
+ * Given a list of merge or hash joinclauses (as RestrictInfo nodes),
+ * extract the bare clauses, and rearrange the elements within the
+ * clauses, if needed, so the outer join variable is on the left and
+ * the inner is on the right. The original clause data structure is not
+ * touched; a modified list is returned. We do, however, set the transient
+ * outer_is_left field in each RestrictInfo to show which side was which.
+ */
+static List *
+get_switched_clauses(List *clauses, Relids outerrelids)
+{
+ List *t_list = NIL;
+ ListCell *l;
+
+ foreach(l, clauses)
+ {
+ RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(l);
+ OpExpr *clause = (OpExpr *) restrictinfo->clause;
+
+ Assert(is_opclause(clause));
+
+		/* TODO: handle the case where the operator doesn't have a commutator */
+ if (bms_is_subset(restrictinfo->right_relids, outerrelids)
+ && OidIsValid(get_commutator(clause->opno)))
+ {
+ /*
+ * Duplicate just enough of the structure to allow commuting the
+ * clause without changing the original list. Could use
+ * copyObject, but a complete deep copy is overkill.
+ */
+ OpExpr *temp = makeNode(OpExpr);
+
+ temp->opno = clause->opno;
+ temp->opfuncid = InvalidOid;
+ temp->opresulttype = clause->opresulttype;
+ temp->opretset = clause->opretset;
+ temp->opcollid = clause->opcollid;
+ temp->inputcollid = clause->inputcollid;
+ temp->args = list_copy(clause->args);
+ temp->location = clause->location;
+ /* Commute it --- note this modifies the temp node in-place. */
+ CommuteOpExpr(temp);
+ t_list = lappend(t_list, temp);
+ restrictinfo->outer_is_left = false;
+ }
+ else
+ {
+ /*
+ * TODO: check if Assert(bms_is_subset(restrictinfo->left_relids,
+ * outerrelids)) is necessary.
+ */
+ t_list = lappend(t_list, clause);
+ restrictinfo->outer_is_left = true;
+ }
+ }
+ return t_list;
+}
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index ab4d8e201d..98a673b9b9 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -315,6 +315,36 @@ static ModifyTable *make_modifytable(PlannerInfo *root, Plan *subplan,
static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
GatherMergePath *best_path);
+/*
+ * Local functions and option variables to support
+ * semijoin pushdowns from join nodes
+ */
+static int depth_of_semijoin_target(Plan *pn,
+ const Var *target_var,
+ Bitmapset *target_relids,
+ int cur_depth,
+ const PlannerInfo *root,
+ Plan **target_node);
+static bool is_side_of_join_source_of_var(const Plan *pn,
+ bool testing_outer_side,
+ const Var *target_var);
+static bool
+ is_table_scan_node_source_of_relids_or_var(const Plan *pn,
+ const Var *target_var,
+ Bitmapset *target_relids);
+static int position_of_var_in_targetlist(const Var *target_var,
+ const List *targetlist);
+static TargetEntry *get_nth_targetentry(int posn,
+ const List *targetlist);
+static void get_partition_table_relids(RelOptInfo *rel,
+ Bitmapset **target_relids);
+static int get_appendrel_occluded_references(const Expr *ex,
+ Expr **occluded_exprs,
+ int num_exprs,
+ const PlannerInfo *root);
+static Expr *get_subquery_var_occluded_reference(const Expr *ex,
+ const PlannerInfo *root);
+
/*
* create_plan
@@ -4691,6 +4721,39 @@ create_mergejoin_plan(PlannerInfo *root,
/* Costs of sort and material steps are included in path cost already */
copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path);
+ /* Check if we should attach a pushdown semijoin to this join */
+ if (best_path->use_semijoinfilter)
+ {
+ if (best_path->best_mergeclause != -1)
+ {
+ ListCell *clause_cell = list_nth_cell(mergeclauses, best_path->best_mergeclause);
+ OpExpr *joinclause = (OpExpr *) lfirst(clause_cell);
+ Node *outer_join_arg = linitial(joinclause->args);
+ Node *inner_join_arg = llast(joinclause->args);
+ ExprMetadata *outer_arg_md = (ExprMetadata *) palloc0(sizeof(ExprMetadata));
+ ExprMetadata *inner_arg_md = (ExprMetadata *) palloc0(sizeof(ExprMetadata));
+ int outer_depth;
+ int inner_depth;
+
+ Assert(IsA(joinclause, OpExpr));
+ Assert(list_length(joinclause->args) == 2);
+ analyze_expr_for_metadata((Expr *) outer_join_arg, root, outer_arg_md);
+ analyze_expr_for_metadata((Expr *) inner_join_arg, root, inner_arg_md);
+ outer_depth = depth_of_semijoin_target((Plan *) join_plan,
+ outer_arg_md->local_column_expr, NULL, 0, root, &join_plan->buildingNode);
+ inner_depth = depth_of_semijoin_target((Plan *) join_plan,
+ inner_arg_md->local_column_expr, NULL, 0, root, &join_plan->checkingNode);
+ if (inner_depth > -1 && outer_depth > -1)
+ {
+ join_plan->applySemiJoinFilter = true;
+ join_plan->bestExpr = best_path->best_mergeclause;
+ join_plan->filteringRate = best_path->filteringRate;
+ }
+ pfree(outer_arg_md);
+ pfree(inner_arg_md);
+ }
+ }
+
return join_plan;
}
@@ -7216,3 +7279,551 @@ is_projection_capable_plan(Plan *plan)
}
return true;
}
+
+/*
+ * Determine whether a semijoin condition could be pushed from the join
+ * all the way to the leaf scan node. If so, determine the number of
+ * nodes between the join and the scan node (inclusive of the scan node).
+ * If the search was terminated by a node the semijoin could not be
+ * pushed through, the function returns -1.
+ *
+ * Parameters:
+ *	pn: plan node to be considered for semijoin push down.
+ *	target_var: the join key (outer or inner side) for a potential semijoin.
+ *	target_relids: relids of all target leaf relations,
+ *	used only for partitioned tables.
+ *	cur_depth: current depth from the root join plan node.
+ *	target_node: stores the target plan node where the filter will be applied
+ */
+static int
+depth_of_semijoin_target(Plan *pn,
+ const Var *target_var,
+ Bitmapset *target_relids,
+ int cur_depth,
+ const PlannerInfo *root,
+ Plan **target_node)
+{
+ int depth = -1;
+
+ Assert(pn);
+ Assert(target_var && IsA(target_var, Var));
+ Assert(target_var->varno > 0);
+
+ if (pn == NULL)
+ {
+ return -1;
+ }
+
+ /* Guard against stack overflow due to overly complex plan trees */
+ check_stack_depth();
+
+ switch (nodeTag(pn))
+ {
+ case T_Hash:
+ case T_Material:
+ case T_Gather:
+ case T_GatherMerge:
+ case T_Sort:
+ case T_Unique:
+ { /* Directly push bloom through these node
+ * types */
+ depth = depth_of_semijoin_target(pn->lefttree, target_var,
+ target_relids, cur_depth + 1, root, target_node);
+ break;
+ }
+
+ case T_Agg:
+ { /* Directly push bloom through GROUP BYs and
+ * DISTINCTs, as long as there are no grouping
+ * sets */
+ Agg *agg_pn = (Agg *) pn;
+
+ if (!agg_pn->groupingSets
+ || list_length(agg_pn->groupingSets) == 0)
+ {
+ depth = depth_of_semijoin_target(pn->lefttree, target_var,
+ target_relids, cur_depth + 1,
+ root, target_node);
+ }
+ break;
+ }
+
+ case T_SubqueryScan:
+ {
+ /*
+ * Directly push semijoin into subquery if we can, but we need
+ * to map the target var to the occluded expression within the
+ * SELECT list of the subquery
+ */
+ SubqueryScan *subq_scan = (SubqueryScan *) pn;
+ RelOptInfo *rel = NULL;
+ RangeTblEntry *rte = NULL;
+ Var *subq_target_var = NULL;
+
+ /*
+ * To travel into a subquery we need to use the subquery's
+ * PlannerInfo, the root of subquery's plan tree, and the
+ * subquery's SELECT list item that was occluded by the Var
+ * used within this query block
+ */
+ rte = root->simple_rte_array[subq_scan->scan.scanrelid];
+ Assert(rte);
+ Assert(rte->subquery);
+ Assert(rte->rtekind == RTE_SUBQUERY);
+ Assert(rte->subquery->targetList);
+
+ rel = find_base_rel((PlannerInfo *) root,
+ subq_scan->scan.scanrelid);
+ Assert(rel->rtekind == RTE_SUBQUERY);
+ Assert(rel->subroot);
+
+ if (rel && rel->subroot
+ && rte && rte->subquery && rte->subquery->targetList)
+ {
+ /* Find the target_var's occluded expression */
+ Expr *occluded_expr =
+ get_subquery_var_occluded_reference((Expr *) target_var,
+ root);
+
+ if (occluded_expr && IsA(occluded_expr, Var))
+ {
+ subq_target_var = (Var *) occluded_expr;
+ if (subq_target_var->varno > 0)
+ depth = depth_of_semijoin_target(subq_scan->subplan,
+ subq_target_var,
+ target_relids,
+ cur_depth + 1,
+ rel->subroot,
+ target_node);
+ }
+ }
+ break;
+ }
+
+ /* Either from a partitioned table or Union All */
+ case T_Append:
+ {
+ int max_depth = -1;
+ Append *append = (Append *) pn;
+ RelOptInfo *rel = NULL;
+ RangeTblEntry *rte = NULL;
+
+ rte = root->simple_rte_array[target_var->varno];
+ rel = find_base_rel((PlannerInfo *) root, target_var->varno);
+
+ if (rte->inh && append->appendplans)
+ {
+ int num_exprs = list_length(append->appendplans);
+ Expr **occluded_exprs = alloca(num_exprs * sizeof(Expr *));
+ int idx = 0;
+ ListCell *lc = NULL;
+
+ /* Partitioned table */
+ if (rel->part_scheme && rel->part_rels)
+ {
+ get_partition_table_relids(rel, &target_relids);
+
+ foreach(lc, append->appendplans)
+ {
+ Plan *appendplan = (Plan *) lfirst(lc);
+
+ depth = depth_of_semijoin_target(appendplan,
+ target_var,
+ target_relids,
+ cur_depth + 1,
+ root,
+ target_node);
+
+ if (depth > max_depth)
+ max_depth = depth;
+ }
+ }
+ /* Union All, not partitioned table */
+ else if (num_exprs == get_appendrel_occluded_references(
+ (Expr *) target_var,
+ occluded_exprs,
+ num_exprs,
+ root))
+ {
+ Var *subq_target_var = NULL;
+
+ foreach(lc, append->appendplans)
+ {
+ Expr *occluded_expr = occluded_exprs[idx++];
+ Plan *appendplan = (Plan *) lfirst(lc);
+
+ if (occluded_expr && IsA(occluded_expr, Var))
+ {
+ subq_target_var = (Var *) occluded_expr;
+
+ depth = depth_of_semijoin_target(appendplan,
+ subq_target_var,
+ target_relids,
+ cur_depth + 1,
+ root,
+ target_node);
+
+ if (depth > max_depth)
+ max_depth = depth;
+ }
+ }
+ }
+ }
+ depth = max_depth;
+ break;
+ }
+
+ /* Leaf nodes */
+ case T_IndexScan:
+ case T_BitmapHeapScan:
+ {
+ return -1;
+ }
+ case T_SeqScan:
+ {
+ if (is_table_scan_node_source_of_relids_or_var(pn, target_var, target_relids))
+ {
+ /* Found ultimate source of the join key! */
+ *target_node = pn;
+ depth = cur_depth;
+ }
+ break;
+ }
+
+ case T_NestLoop:
+ case T_MergeJoin:
+ case T_HashJoin:
+ {
+ /*
+ * pn->path_jointype is not always the same as join->jointype.
+ * Avoid using pn->path_jointype when you need accurate
+ * jointype, use join->jointype instead.
+ */
+ Join *join = (Join *) pn;
+
+				/*
+				 * Push the bloom filter to the outer node if the target
+				 * relation is under the outer plan node (decided by
+				 * is_side_of_join_source_of_var()) and any of the following
+				 * holds: 1. this is an inner join or semi join; 2. this is a
+				 * root-level right join; 3. this is an intermediate left
+				 * join.
+				 */
+ if (is_side_of_join_source_of_var(pn, true, target_var))
+ {
+ if (join->jointype == JOIN_INNER
+ || join->jointype == JOIN_SEMI
+ || (join->jointype == JOIN_RIGHT && cur_depth == 0)
+ || (join->jointype == JOIN_LEFT && cur_depth > 0))
+ {
+ depth = depth_of_semijoin_target(pn->lefttree, target_var,
+ target_relids, cur_depth + 1, root,
+ target_node);
+ }
+ }
+ else
+ {
+					/*
+					 * Push the bloom filter to the inner node if the target
+					 * rel is under the inner node (decided by
+					 * is_side_of_join_source_of_var()) and any of the
+					 * following holds: 1. this is an inner join or semi
+					 * join; 2. this is an intermediate right join.
+					 */
+ Assert(is_side_of_join_source_of_var(pn, false, target_var));
+ if (join->jointype == JOIN_INNER
+ || join->jointype == JOIN_SEMI
+ || (join->jointype == JOIN_RIGHT && cur_depth > 0))
+ {
+ depth = depth_of_semijoin_target(pn->righttree, target_var,
+ target_relids, cur_depth + 1, root,
+ target_node);
+ }
+ }
+ break;
+ }
+
+ default:
+ { /* For all other node types, just bail out and
+ * apply the semijoin filter somewhere above
+ * this node. */
+ depth = -1;
+ }
+ }
+ return depth;
+}
+
+static bool
+is_side_of_join_source_of_var(const Plan *pn,
+ bool testing_outer_side,
+ const Var *target_var)
+{
+ /* Determine if target_var is from the indicated child of the join */
+ Plan *target_child = NULL;
+
+ Assert(pn);
+ Assert(target_var && nodeTag(target_var) == T_Var);
+ Assert(nodeTag(pn) == T_NestLoop || nodeTag(pn) == T_MergeJoin
+ || nodeTag(pn) == T_HashJoin);
+
+ if (testing_outer_side)
+ {
+ target_child = pn->lefttree;
+ }
+ else
+ {
+ target_child = pn->righttree;
+ }
+
+ return (position_of_var_in_targetlist(target_var,
+ target_child->targetlist) >= 0);
+}
+
+/*
+ * Determine if this scan node is the source of the specified relids,
+ * or the source of the specified var if target_relids is not given.
+ */
+static bool
+is_table_scan_node_source_of_relids_or_var(const Plan *pn,
+ const Var *target_var,
+ Bitmapset *target_relids)
+{
+ Scan *scan_node = (Scan *) pn;
+ Index scan_node_varno = 0;
+
+ Assert(pn);
+ Assert(target_var && nodeTag(target_var) == T_Var);
+ Assert(nodeTag(pn) == T_SeqScan || nodeTag(pn) == T_IndexScan
+ || nodeTag(pn) == T_BitmapHeapScan);
+
+ scan_node_varno = scan_node->scanrelid;
+
+ if (target_relids)
+ {
+ return bms_is_member(scan_node_varno, target_relids);
+ }
+ else if (scan_node_varno == target_var->varno)
+ {
+		/*
+		 * This should never be called for a column that is not being
+		 * projected at its table scan node
+		 */
+ Assert(position_of_var_in_targetlist(target_var, pn->targetlist) >= 0);
+
+ return true;
+ }
+
+ return false;
+}
+
+static int
+position_of_var_in_targetlist(const Var *target_var, const List *targetlist)
+{
+ ListCell *lc = NULL;
+ int i = 1;
+
+ Assert(target_var && nodeTag(target_var) == T_Var);
+ Assert(targetlist && nodeTag(targetlist) == T_List);
+
+ if (targetlist && target_var)
+ {
+ foreach(lc, targetlist)
+ {
+ TargetEntry *te = lfirst(lc);
+
+ if (IsA(te->expr, Var))
+ {
+ Var *cur_var = (Var *) te->expr;
+
+ if (cur_var->varno == target_var->varno
+ && cur_var->varattno == target_var->varattno)
+ {
+ return i;
+ }
+ }
+ i++;
+ }
+ }
+ return -1;
+}
+
+/*
+ * Recursively gather all relids of the given partitioned table rel.
+ */
+static void
+get_partition_table_relids(RelOptInfo *rel, Bitmapset **target_relids)
+{
+ int i;
+
+ Assert(rel->part_scheme && rel->part_rels);
+
+ for (i = 0; i < rel->nparts; i++)
+ {
+ RelOptInfo *part_rel = rel->part_rels[i];
+
+ if (part_rel->part_scheme && part_rel->part_rels)
+ {
+ get_partition_table_relids(part_rel, target_relids);
+ }
+ else
+ {
+ *target_relids = bms_union(*target_relids,
+ part_rel->relids);
+ }
+ }
+}
+
+/*
+ * Given a virtual column from a UNION ALL subquery,
+ * return the expressions it immediately occludes that satisfy
+ * the inheritance condition,
+ * i.e. appendRelInfo->parent_relid == outside_subq_var->varno
+ */
+static int
+get_appendrel_occluded_references(const Expr *ex,
+ Expr **occluded_exprs,
+ int num_exprs,
+ const PlannerInfo *root)
+{
+ Var *outside_subq_var = (Var *) ex;
+ RangeTblEntry *outside_subq_rte = NULL;
+ int idx = 0;
+
+
+ Assert(ex && root);
+ Assert(IsA(ex, Var));
+ Assert(outside_subq_var->varno < root->simple_rel_array_size);
+
+ outside_subq_rte = root->simple_rte_array[outside_subq_var->varno];
+
+ /* System columns and whole-row Vars have varattno <= 0, don't bother */
+ if (outside_subq_var->varattno <= 0)
+ return 0;
+
+ /*
+ * With inheritance, the subquery contains an Append; a leg of that
+ * Append may not have a subroot, so process it via root->append_rel_list.
+ */
+ if (outside_subq_rte->inh)
+ {
+ ListCell *lc = NULL;
+
+ Assert(root->append_rel_list &&
+ num_exprs <= list_length(root->append_rel_list));
+
+ foreach(lc, root->append_rel_list)
+ {
+ AppendRelInfo *appendRelInfo = lfirst(lc);
+
+ if (appendRelInfo->parent_relid == outside_subq_var->varno)
+ {
+ Assert(appendRelInfo->translated_vars &&
+ outside_subq_var->varattno <=
+ list_length(appendRelInfo->translated_vars));
+
+ occluded_exprs[idx++] =
+ list_nth(appendRelInfo->translated_vars,
+ outside_subq_var->varattno - 1);
+ }
+ }
+ }
+
+ return idx;
+}
+
+static Expr *
+get_subquery_var_occluded_reference(const Expr *ex, const PlannerInfo *root)
+{
+ /*
+ * Given a virtual column from an unflattened subquery, return the
+ * expression it immediately occludes
+ */
+ Var *outside_subq_var = (Var *) ex;
+ RelOptInfo *outside_subq_relation = NULL;
+ RangeTblEntry *outside_subq_rte = NULL;
+ TargetEntry *te = NULL;
+ Expr *inside_subq_expr = NULL;
+
+ Assert(ex && root);
+ Assert(IsA(ex, Var));
+ Assert(outside_subq_var->varno < root->simple_rel_array_size);
+
+ outside_subq_relation = root->simple_rel_array[outside_subq_var->varno];
+ outside_subq_rte = root->simple_rte_array[outside_subq_var->varno];
+
+ /*
+ * With inheritance, the subquery contains an Append; a leg of that
+ * Append may not have a subroot, so we may be able to process it better
+ * via root->append_rel_list. For now just return the first leg.
+ * TODO: better handling of UNION ALL; we only return statistics of the
+ * first leg at the moment. TODO: similarly, better handling of
+ * partitioned tables, per outside_subq_relation->part_scheme and part_rels.
+ */
+ if (outside_subq_rte->inh)
+ {
+ AppendRelInfo *appendRelInfo = NULL;
+
+ Assert(root->append_rel_list);
+
+ /* TODO remove this check once we add better handling of inheritance */
+ appendRelInfo = list_nth(root->append_rel_list, 0);
+ Assert(appendRelInfo->parent_relid == outside_subq_var->varno);
+
+ Assert(appendRelInfo->translated_vars &&
+ outside_subq_var->varattno <=
+ list_length(appendRelInfo->translated_vars));
+ inside_subq_expr = list_nth(appendRelInfo->translated_vars,
+ outside_subq_var->varattno - 1);
+ }
+
+ /* Subquery without append and partitioned tables */
+ else
+ {
+ Assert(outside_subq_relation && IsA(outside_subq_relation, RelOptInfo));
+ Assert(outside_subq_relation->reloptkind == RELOPT_BASEREL);
+ Assert(outside_subq_relation->rtekind == RTE_SUBQUERY);
+ Assert(outside_subq_relation->subroot->processed_tlist);
+
+ te = get_nth_targetentry(outside_subq_var->varattno,
+ outside_subq_relation->subroot->processed_tlist);
+ Assert(te && outside_subq_var->varattno == te->resno);
+ inside_subq_expr = te->expr;
+
+ /*
+ * Strip off any Relabel present, and return the underlying expression
+ */
+ while (inside_subq_expr && IsA(inside_subq_expr, RelabelType))
+ {
+ inside_subq_expr = ((RelabelType *) inside_subq_expr)->arg;
+ }
+ }
+
+ return inside_subq_expr;
+}
+
+
+static TargetEntry *
+get_nth_targetentry(int n, const List *targetlist)
+{
+ int i = 1;
+ ListCell *lc = NULL;
+
+ Assert(n > 0);
+ Assert(targetlist && nodeTag(targetlist) == T_List);
+ Assert(list_length(targetlist) >= n);
+
+ if (targetlist && list_length(targetlist) >= n)
+ {
+ foreach(lc, targetlist)
+ {
+ if (i == n)
+ {
+ TargetEntry *te = lfirst(lc);
+
+ return te;
+ }
+ i++;
+ }
+ }
+ return NULL;
+}
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index 1808388397..b62de16899 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -2905,7 +2905,9 @@ void
mergejoinscansel(PlannerInfo *root, Node *clause,
Oid opfamily, int strategy, bool nulls_first,
Selectivity *leftstart, Selectivity *leftend,
- Selectivity *rightstart, Selectivity *rightend)
+ Selectivity *rightstart, Selectivity *rightend,
+ Datum *leftmin, Datum *leftmax,
+ Datum *rightmin, Datum *rightmax)
{
Node *left,
*right;
@@ -2925,10 +2927,6 @@ mergejoinscansel(PlannerInfo *root, Node *clause,
revltop,
revleop;
bool isgt;
- Datum leftmin,
- leftmax,
- rightmin,
- rightmax;
double selec;
/* Set default results if we can't figure anything out. */
@@ -3075,20 +3073,20 @@ mergejoinscansel(PlannerInfo *root, Node *clause,
if (!isgt)
{
if (!get_variable_range(root, &leftvar, lstatop, collation,
- &leftmin, &leftmax))
+ leftmin, leftmax))
goto fail; /* no range available from stats */
if (!get_variable_range(root, &rightvar, rstatop, collation,
- &rightmin, &rightmax))
+ rightmin, rightmax))
goto fail; /* no range available from stats */
}
else
{
/* need to swap the max and min */
if (!get_variable_range(root, &leftvar, lstatop, collation,
- &leftmax, &leftmin))
+ leftmax, leftmin))
goto fail; /* no range available from stats */
if (!get_variable_range(root, &rightvar, rstatop, collation,
- &rightmax, &rightmin))
+ rightmax, rightmin))
goto fail; /* no range available from stats */
}
@@ -3098,13 +3096,13 @@ mergejoinscansel(PlannerInfo *root, Node *clause,
* non-default estimates, else stick with our 1.0.
*/
selec = scalarineqsel(root, leop, isgt, true, collation, &leftvar,
- rightmax, op_righttype);
+ *rightmax, op_righttype);
if (selec != DEFAULT_INEQ_SEL)
*leftend = selec;
/* And similarly for the right variable. */
selec = scalarineqsel(root, revleop, isgt, true, collation, &rightvar,
- leftmax, op_lefttype);
+ *leftmax, op_lefttype);
if (selec != DEFAULT_INEQ_SEL)
*rightend = selec;
@@ -3128,13 +3126,13 @@ mergejoinscansel(PlannerInfo *root, Node *clause,
* our own default.
*/
selec = scalarineqsel(root, ltop, isgt, false, collation, &leftvar,
- rightmin, op_righttype);
+ *rightmin, op_righttype);
if (selec != DEFAULT_INEQ_SEL)
*leftstart = selec;
/* And similarly for the right variable. */
selec = scalarineqsel(root, revltop, isgt, false, collation, &rightvar,
- leftmin, op_lefttype);
+ *leftmin, op_lefttype);
if (selec != DEFAULT_INEQ_SEL)
*rightstart = selec;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index fda3f9befb..084cfdf11f 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -885,6 +885,26 @@ struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
+ {
+ {"enable_mergejoin_semijoin_filter", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's use of semijoin Bloom filters during merge joins."),
+ NULL,
+ GUC_EXPLAIN
+ },
+ &enable_mergejoin_semijoin_filter,
+ false,
+ NULL, NULL, NULL
+ },
+ {
+ {"force_mergejoin_semijoin_filter", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Forces the planner's use of semijoin Bloom filters during merge joins, overriding enable_mergejoin_semijoin_filter."),
+ NULL,
+ GUC_EXPLAIN
+ },
+ &force_mergejoin_semijoin_filter,
+ false,
+ NULL, NULL, NULL
+ },
{
{"enable_hashjoin", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's use of hash join plans."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 2ae76e5cfb..1f3d2c772a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -379,6 +379,8 @@
#enable_material = on
#enable_memoize = on
#enable_mergejoin = on
+#enable_mergejoin_semijoin_filter = on
+#force_mergejoin_semijoin_filter = on
#enable_nestloop = on
#enable_parallel_append = on
#enable_parallel_hash = on
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 294cfe9c47..663069455e 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -2014,6 +2014,9 @@ typedef struct MergePath
List *innersortkeys; /* keys for explicit sort, if any */
bool skip_mark_restore; /* can executor skip mark/restore? */
bool materialize_inner; /* add Materialize to inner? */
+ bool use_semijoinfilter; /* should we use a semijoin filter? */
+ double filteringRate; /* estimated filtering rate of SJF */
+ int best_mergeclause; /* best clause to build SJF on */
} MergePath;
/*
@@ -2591,6 +2594,12 @@ typedef struct MergeScanSelCache
Selectivity leftendsel; /* last-join fraction for clause left side */
Selectivity rightstartsel; /* first-join fraction for clause right side */
Selectivity rightendsel; /* last-join fraction for clause right side */
+
+ Datum leftmin; /* min and max values for left and right
+ * clauses */
+ Datum leftmax;
+ Datum rightmin;
+ Datum rightmax;
} MergeScanSelCache;
/*
@@ -3138,6 +3147,10 @@ typedef struct JoinCostWorkspace
Cardinality inner_rows;
Cardinality outer_skip_rows;
Cardinality inner_skip_rows;
+ Datum outer_min_val;
+ Datum outer_max_val;
+ Datum inner_min_val;
+ Datum inner_max_val;
/* private for cost_hashjoin code */
int numbuckets;
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 21e642a64c..a418a406af 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -22,6 +22,7 @@
#include "nodes/lockoptions.h"
#include "nodes/parsenodes.h"
#include "nodes/primnodes.h"
+#include "optimizer/pathnode.h"
/* ----------------------------------------------------------------
@@ -844,6 +845,13 @@ typedef struct MergeJoin
/* per-clause nulls ordering */
bool *mergeNullsFirst pg_node_attr(array_size(mergeclauses));
+
+ /* fields for using a SemiJoinFilter */
+ bool applySemiJoinFilter;
+ double filteringRate;
+ int bestExpr;
+ Plan *buildingNode;
+ Plan *checkingNode;
} MergeJoin;
/* ----------------
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index f27d11eaa9..cca624a1d8 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -61,6 +61,8 @@ extern PGDLLIMPORT bool enable_nestloop;
extern PGDLLIMPORT bool enable_material;
extern PGDLLIMPORT bool enable_memoize;
extern PGDLLIMPORT bool enable_mergejoin;
+extern PGDLLIMPORT bool enable_mergejoin_semijoin_filter;
+extern PGDLLIMPORT bool force_mergejoin_semijoin_filter;
extern PGDLLIMPORT bool enable_hashjoin;
extern PGDLLIMPORT bool enable_gathermerge;
extern PGDLLIMPORT bool enable_partitionwise_join;
@@ -213,4 +215,49 @@ extern PathTarget *set_pathtarget_cost_width(PlannerInfo *root, PathTarget *targ
extern double compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel,
Path *bitmapqual, int loop_count, Cost *cost, double *tuple);
+/*
+ * Container for metadata about an expression, used by semijoin decision logic
+ */
+typedef struct expr_metadata
+{
+ bool is_or_maps_to_constant;
+ bool is_or_maps_to_base_column;
+
+ /* Var and relation from the current query block, if it is a Var */
+ const Var *local_column_expr;
+ const RelOptInfo *local_relation;
+
+ int32 est_col_width;
+
+ /*
+ * The following will be the same as local Var and relation when the local
+ * relation is a base table (i.e. no occluding query blocks). Otherwise
+ * it will be the occluded base column, if the final occluded expression
+ * is a base column.
+ */
+ const Var *base_column_expr;
+ const RelOptInfo *base_rel;
+ const PlannerInfo *base_rel_root;
+ double base_rel_row_count;
+ double base_rel_filt_row_count;
+ double base_col_distincts;
+ Datum base_col_min_value;
+ Datum base_col_max_value;
+
+ /* True if the distinct est is based on something meaningful */
+ bool est_distincts_reliable;
+ bool est_minmax_reliable;
+
+ /* Estimated distincts after local filtering, and row count adjustments */
+ double expr_est_distincts;
+} ExprMetadata;
+
+extern void analyze_expr_for_metadata(const Expr *ex,
+ const PlannerInfo *root,
+ ExprMetadata * md);
+extern bool expressions_match_foreign_key(ForeignKeyOptInfo *fk,
+ List *con_exprs,
+ List *ref_exprs,
+ List *operators);
+
#endif /* COST_H */
diff --git a/src/include/utils/selfuncs.h b/src/include/utils/selfuncs.h
index d485b9bfcd..2d67efd295 100644
--- a/src/include/utils/selfuncs.h
+++ b/src/include/utils/selfuncs.h
@@ -208,7 +208,9 @@ extern Selectivity rowcomparesel(PlannerInfo *root,
extern void mergejoinscansel(PlannerInfo *root, Node *clause,
Oid opfamily, int strategy, bool nulls_first,
Selectivity *leftstart, Selectivity *leftend,
- Selectivity *rightstart, Selectivity *rightend);
+ Selectivity *rightstart, Selectivity *rightend,
+ Datum *leftmin, Datum *leftmax,
+ Datum *rightmin, Datum *rightmax);
extern double estimate_num_groups(PlannerInfo *root, List *groupExprs,
double input_rows, List **pgset,
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 4e775af175..c2c3946e95 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -109,30 +109,31 @@ select count(*) = 0 as ok from pg_stat_wal_receiver;
-- This is to record the prevailing planner enable_foo settings during
-- a regression test run.
select name, setting from pg_settings where name like 'enable%';
- name | setting
---------------------------------+---------
- enable_async_append | on
- enable_bitmapscan | on
- enable_gathermerge | on
- enable_group_by_reordering | on
- enable_hashagg | on
- enable_hashjoin | on
- enable_incremental_sort | on
- enable_indexonlyscan | on
- enable_indexscan | on
- enable_material | on
- enable_memoize | on
- enable_mergejoin | on
- enable_nestloop | on
- enable_parallel_append | on
- enable_parallel_hash | on
- enable_partition_pruning | on
- enable_partitionwise_aggregate | off
- enable_partitionwise_join | off
- enable_seqscan | on
- enable_sort | on
- enable_tidscan | on
-(21 rows)
+ name | setting
+----------------------------------+---------
+ enable_async_append | on
+ enable_bitmapscan | on
+ enable_gathermerge | on
+ enable_group_by_reordering | on
+ enable_hashagg | on
+ enable_hashjoin | on
+ enable_incremental_sort | on
+ enable_indexonlyscan | on
+ enable_indexscan | on
+ enable_material | on
+ enable_memoize | on
+ enable_mergejoin | on
+ enable_mergejoin_semijoin_filter | off
+ enable_nestloop | on
+ enable_parallel_append | on
+ enable_parallel_hash | on
+ enable_partition_pruning | on
+ enable_partitionwise_aggregate | off
+ enable_partitionwise_join | off
+ enable_seqscan | on
+ enable_sort | on
+ enable_tidscan | on
+(22 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail
--
2.37.1
[Attachment: 0003-Support-semijoin-filter-in-the-executor-for-parallel.patch]
From 0fb9ec964369c2af246cfc53e3866e3beb2b7ecc Mon Sep 17 00:00:00 2001
From: Lyu Pan <lyup@amazon.com>
Date: Fri, 16 Sep 2022 21:28:52 +0000
Subject: [PATCH 3/5] Support semijoin filter in the executor for parallel
mergejoin.
Each worker process creates its own bloom filter and updates it during the outer/left plan scan (no lock is needed). After all processes finish the scan, the bloom filters are merged (by performing OR operations on the bit arrays) into a shared bloom filter in the dynamic shared memory area (using a lock for synchronization). Once that completes, the merged bloom filter is copied back to every worker process and used for filtering during the inner/right plan scan.
---
src/backend/executor/execScan.c | 3 +-
src/backend/executor/nodeMergejoin.c | 221 +++++++++++++++++++++++++--
src/backend/executor/nodeSeqscan.c | 173 ++++++++++++++++++++-
src/backend/lib/bloomfilter.c | 74 +++++++++
src/include/executor/nodeMergejoin.h | 4 +-
src/include/lib/bloomfilter.h | 7 +
src/include/nodes/execnodes.h | 31 ++++
7 files changed, 495 insertions(+), 18 deletions(-)
diff --git a/src/backend/executor/execScan.c b/src/backend/executor/execScan.c
index 4b97c39455..954d99392b 100644
--- a/src/backend/executor/execScan.c
+++ b/src/backend/executor/execScan.c
@@ -215,7 +215,8 @@ ExecScan(ScanState *node,
{
SemiJoinFilterFinishScan(
((SeqScanState *) node)->semiJoinFilters,
- node->ss_currentRelation->rd_id);
+ node->ss_currentRelation->rd_id,
+ node->ps.state->es_query_dsa);
}
if (projInfo)
return ExecClearTuple(projInfo->pi_state.resultslot);
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 81c253aad5..d8fb352313 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -99,6 +99,10 @@
#include "executor/nodeMergejoin.h"
#include "lib/bloomfilter.h"
#include "miscadmin.h"
+#include "storage/dsm.h"
+#include "storage/lwlock.h"
+#include "storage/shm_toc.h"
+#include "utils/dsa.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -659,6 +663,26 @@ ExecMergeJoin(PlanState *pstate)
outerTupleSlot = ExecProcNode(outerPlan);
node->mj_OuterTupleSlot = outerTupleSlot;
+ /*
+ * Check if outer plan has an SJF and the inner plan does not.
+ * This case will only arise during parallel execution, when
+ * the outer plan is initialized with the SJF but the inner
+ * plan is not, because the filter is not included in the
+ * memory copied over during worker creation. In that case,
+ * push the filter down to the inner plan to correct this
+ * and then proceed as normal.
+ */
+ if (GetSemiJoinFilter(outerPlan, pstate->plan->plan_node_id)
+ && !GetSemiJoinFilter(innerPlan,
+ pstate->plan->plan_node_id))
+ {
+ SemiJoinFilter *sjf = GetSemiJoinFilter(outerPlan,
+ pstate->plan->plan_node_id);
+
+ PushDownFilter(innerPlan, NULL, sjf, &sjf->checkingId,
+ NULL);
+ }
+
/* Compute join values and check for unmatchability */
switch (MJEvalOuterValues(node))
{
@@ -1819,11 +1843,14 @@ PushDownDirection(PlanState *node)
}
}
-/* Recursively pushes down the filter until an appropriate SeqScan node is reached. Then, it
- * verifies if that SeqScan node is the one we want to push the filter to, and if it is, then
- * appends the SJF to the node. */
+/*
+ * Recursively pushes down the filter until an appropriate SeqScan node is
+ * reached. Then, it verifies if that SeqScan node is the one we want to push
+ * the filter to, and if it is, then appends the SJF to the node.
+ */
void
-PushDownFilter(PlanState *node, Plan *plan, SemiJoinFilter * sjf, Oid *relId, int64 *nodeRows)
+PushDownFilter(PlanState *node, Plan *plan, SemiJoinFilter * sjf, Oid *relId,
+ int64 *nodeRows)
{
if (node == NULL)
{
@@ -1838,8 +1865,8 @@ PushDownFilter(PlanState *node, Plan *plan, SemiJoinFilter * sjf, Oid *relId, in
Assert(IsA(scan, SeqScanState));
/*
- * found the right Scan node that we want to apply the filter onto via
- * matching relId
+ * Found the right Scan node that we want to apply the filter onto via
+ * matching relId.
*/
if (scan->ss.ss_currentRelation->rd_id == *relId)
{
@@ -1876,13 +1903,15 @@ PushDownFilter(PlanState *node, Plan *plan, SemiJoinFilter * sjf, Oid *relId, in
}
/*
- * If this table is the building-side table for the SemiJoinFilter, adds the element to
- * the bloom filter and always returns true. If this table is the checking-side table for the SemiJoinFilter,
- * then checks the element against the bloom filter and returns true if the element is (probably) in the set,
- * and false if the element is not in the bloom filter.
+ * If this table is the building-side table for the SemiJoinFilter, adds the
+ * element to the bloom filter and always returns true. If this table is the
+ * checking-side table for the SemiJoinFilter, then checks the element
+ * against the bloom filter and returns true if the element is (probably)
+ * in the set, and false if the element is not in the bloom filter.
*/
bool
-SemiJoinFilterExamineSlot(List *semiJoinFilters, TupleTableSlot *slot, Oid tableId)
+SemiJoinFilterExamineSlot(List *semiJoinFilters,
+ TupleTableSlot *slot, Oid tableId)
{
ListCell *cell;
@@ -1903,7 +1932,8 @@ SemiJoinFilterExamineSlot(List *semiJoinFilters, TupleTableSlot *slot, Oid table
* include multiple join keys.
*/
val = slot->tts_values[sjf->buildingAttr];
- bloom_add_element(sjf->filter, (unsigned char *) &val, sizeof(val));
+ bloom_add_element(sjf->filter, (unsigned char *) &val,
+ sizeof(val));
}
else if (sjf->doneBuilding && tableId == sjf->checkingId)
{
@@ -1912,7 +1942,8 @@ SemiJoinFilterExamineSlot(List *semiJoinFilters, TupleTableSlot *slot, Oid table
slot_getsomeattrs(slot, sjf->checkingAttr + 1);
sjf->elementsChecked++;
val = slot->tts_values[sjf->checkingAttr];
- if (bloom_lacks_element(sjf->filter, (unsigned char *) &val, sizeof(val)))
+ if (bloom_lacks_element(sjf->filter,
+ (unsigned char *) &val, sizeof(val)))
{
sjf->elementsFiltered++;
return false;
@@ -1923,7 +1954,8 @@ SemiJoinFilterExamineSlot(List *semiJoinFilters, TupleTableSlot *slot, Oid table
}
void
-SemiJoinFilterFinishScan(List *semiJoinFilters, Oid tableId)
+SemiJoinFilterFinishScan(List *semiJoinFilters, Oid tableId,
+ dsa_area *parallel_area)
{
ListCell *cell;
@@ -1933,7 +1965,166 @@ SemiJoinFilterFinishScan(List *semiJoinFilters, Oid tableId)
if (!sjf->doneBuilding && tableId == sjf->buildingId)
{
- sjf->doneBuilding = true;
+ if (!sjf->isParallel)
+ {
+ /*
+ * Not parallel, so only one process running and that process
+ * is now complete.
+ */
+ sjf->doneBuilding = true;
+ }
+ else
+ {
+ /* parallel, so need to sync with the other processes */
+ SemiJoinFilterParallelState *parallelState =
+ (SemiJoinFilterParallelState *) dsa_get_address(
+ parallel_area, sjf->parallelState);
+ bloom_filter *shared_bloom =
+ (bloom_filter *) dsa_get_address(
+ parallel_area, parallelState->bloom_dsa_address);
+
+ /*
+ * This process takes control of the lock and updates the
+ * shared bloom filter. These locks are created by the
+ * SemiJoinFilterParallelState and are unique to that struct.
+ */
+ LWLockAcquire(&parallelState->lock, LW_EXCLUSIVE);
+ parallelState->elementsAdded += sjf->elementsAdded;
+ add_to_filter(shared_bloom, sjf->filter);
+ parallelState->workersDone++;
+ LWLockRelease(&parallelState->lock);
+
+ /*
+ * We need to wait until all threads have had their chance to
+ * update the shared bloom filter, since our next step is to
+ * copy the finished bloom filter back into all of the
+ * separate processes.
+ */
+ if (parallelState->workersDone == parallelState->numProcesses)
+ {
+ LWLockUpdateVar(&parallelState->secondlock,
+ &parallelState->lockStop, 1);
+ }
+ LWLockWaitForVar(&parallelState->secondlock,
+ &parallelState->lockStop, 0,
+ &parallelState->lockStop);
+
+ /*
+ * Now the shared Bloom filter is fully updated, so each
+ * individual process copies the finished Bloom filter to the
+ * local SemiJoinFilter.
+ */
+ LWLockAcquire(&parallelState->lock, LW_EXCLUSIVE);
+ replace_bitset(sjf->filter, shared_bloom);
+ sjf->elementsAdded = parallelState->elementsAdded;
+ sjf->doneBuilding = true;
+ parallelState->workersDone++;
+ LWLockRelease(&parallelState->lock);
+
+ /*
+ * Again, we need to wait for all processes to finish copying
+ * the completed bloom filter because the main process will
+ * free the shared memory afterwards.
+ */
+ if (parallelState->workersDone ==
+ 2 * parallelState->numProcesses)
+ {
+ LWLockUpdateVar(&parallelState->secondlock,
+ &parallelState->lockStop, 2);
+ }
+ LWLockWaitForVar(&parallelState->secondlock,
+ &parallelState->lockStop, 1,
+ &parallelState->lockStop);
+ /* release allocated shared memory in main process */
+ if (!sjf->isWorker)
+ {
+ LWLockRelease(&parallelState->secondlock);
+ bloom_free_in_dsa(parallel_area,
+ parallelState->bloom_dsa_address);
+ dsa_free(parallel_area, sjf->parallelState);
+ }
+ }
}
}
}
+
+dsa_pointer
+CreateFilterParallelState(dsa_area *area, SemiJoinFilter * sjf,
+ int sjf_num)
+{
+ dsa_pointer bloom_dsa_address =
+ bloom_create_in_dsa(area, sjf->num_elements, sjf->work_mem, sjf->seed);
+ dsa_pointer parallel_address =
+ dsa_allocate0(area, sizeof(SemiJoinFilterParallelState));
+ SemiJoinFilterParallelState *parallelState =
+ (SemiJoinFilterParallelState *) dsa_get_address(area,
+ parallel_address);
+
+ /* copy over information to parallel state */
+ parallelState->doneBuilding = sjf->doneBuilding;
+ parallelState->seed = sjf->seed;
+ parallelState->num_elements = sjf->num_elements;
+ parallelState->work_mem = sjf->work_mem;
+ parallelState->buildingId = sjf->buildingId;
+ parallelState->checkingId = sjf->checkingId;
+ parallelState->buildingAttr = sjf->buildingAttr;
+ parallelState->checkingAttr = sjf->checkingAttr;
+ parallelState->bloom_dsa_address = bloom_dsa_address;
+ parallelState->sjf_num = sjf_num;
+ parallelState->mergejoin_plan_id = sjf->mergejoin_plan_id;
+ /* initialize locks */
+ LWLockInitialize(&parallelState->lock, LWLockNewTrancheId());
+ LWLockInitialize(&parallelState->secondlock, LWLockNewTrancheId());
+ /* should be main process that acquires lock */
+ LWLockAcquire(&parallelState->secondlock, LW_EXCLUSIVE);
+ return parallel_address;
+}
+
+/*
+ * Walks a side of the execution tree and fetches the SJF whose
+ * mergejoin plan ID matches the given plan ID. Used during parallel
+ * execution, where SJF information is lost when plan state is copied
+ * to the worker.
+ */
+SemiJoinFilter *
+GetSemiJoinFilter(PlanState *node, int plan_id)
+{
+ if (node == NULL)
+ {
+ return NULL;
+ }
+ check_stack_depth();
+ if (node->type == T_SeqScanState)
+ {
+ SeqScanState *scan = (SeqScanState *) node;
+
+ Assert(IsA(scan, SeqScanState));
+ if (scan->applySemiJoinFilter)
+ {
+ ListCell *lc;
+
+ foreach(lc, scan->semiJoinFilters)
+ {
+ SemiJoinFilter *sjf = (SemiJoinFilter *) lfirst(lc);
+
+ if (sjf->mergejoin_plan_id == plan_id)
+ {
+ return sjf;
+ }
+ }
+ return NULL;
+ }
+ }
+ if (PushDownDirection(node) == 1)
+ {
+ /* check both children and return the non-null one */
+ SemiJoinFilter *sjf = GetSemiJoinFilter(node->lefttree, plan_id);
+
+ return sjf != NULL ? sjf :
+ GetSemiJoinFilter(node->righttree, plan_id);
+ }
+ if (PushDownDirection(node) == 0)
+ {
+ return GetSemiJoinFilter(node->lefttree, plan_id);
+ }
+ return NULL;
+}
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index e43ce3f8d0..c796d0c416 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -30,9 +30,18 @@
#include "access/relscan.h"
#include "access/tableam.h"
#include "executor/execdebug.h"
+#include "executor/nodeMergejoin.h"
#include "executor/nodeSeqscan.h"
+#include "storage/lwlock.h"
+#include "storage/shm_toc.h"
#include "utils/rel.h"
+/*
+ * Magic number for location of shared dsa pointer if scan is using a semi-join
+ * filter.
+ */
+#define DSA_LOCATION_KEY_FOR_SJF UINT64CONST(0xE00000000000FFFF)
+
static TupleTableSlot *SeqNext(SeqScanState *node);
/* ----------------------------------------------------------------
@@ -157,7 +166,8 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
/* and create slot with the appropriate rowtype */
ExecInitScanTupleSlot(estate, &scanstate->ss,
RelationGetDescr(scanstate->ss.ss_currentRelation),
- table_slot_callbacks(scanstate->ss.ss_currentRelation));
+ table_slot_callbacks(
+ scanstate->ss.ss_currentRelation));
/*
* Initialize result type and projection.
@@ -262,6 +272,20 @@ ExecSeqScanEstimate(SeqScanState *node,
estate->es_snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /*
+ * Estimate space for extra dsa_pointer address for when parallel
+ * sequential scans use a semi-join filter.
+ */
+ if (node->ss.ps.plan->parallel_aware && node->applySemiJoinFilter)
+ {
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ if (node->semiJoinFilters)
+ {
+ shm_toc_estimate_keys(&pcxt->estimator,
+ sizeof(dsa_pointer) * list_length(node->semiJoinFilters));
+ }
+ }
}
/* ----------------------------------------------------------------
@@ -277,6 +301,51 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
EState *estate = node->ss.ps.state;
ParallelTableScanDesc pscan;
+ /*
+ * If scan is using a semi-join filter, then initialize dsa pointer of
+ * shared sjf.
+ */
+ if (node->applySemiJoinFilter)
+ {
+ int sjf_num = list_length(node->semiJoinFilters);
+ dsa_pointer *dsa_pointer_address; /* an array of size sjf_num */
+ ListCell *lc;
+ int i = 0;
+
+ dsa_pointer_address = (dsa_pointer *) shm_toc_allocate(pcxt->toc,
+ sizeof(dsa_pointer) * sjf_num);
+ foreach(lc, node->semiJoinFilters)
+ {
+ SemiJoinFilter *sjf = (SemiJoinFilter *) (lfirst(lc));
+ SemiJoinFilterParallelState *parallelState;
+ dsa_area *area = node->ss.ps.state->es_query_dsa;
+
+ sjf->parallelState = CreateFilterParallelState(area, sjf, sjf_num);
+ sjf->isParallel = true;
+ /* check: will the main process always run? */
+ parallelState = (SemiJoinFilterParallelState *) dsa_get_address(area, sjf->parallelState);
+ parallelState->numProcesses = 1;
+ /* update parallelState with built bloom filter */
+ if (sjf->doneBuilding &&
+ node->ss.ss_currentRelation->rd_id == sjf->checkingId)
+ {
+ bloom_filter *parallel_bloom = (bloom_filter *) dsa_get_address(area, parallelState->bloom_dsa_address);
+
+ replace_bitset(parallel_bloom, sjf->filter);
+ LWLockRelease(&parallelState->secondlock);
+ }
+ dsa_pointer_address[i] = sjf->parallelState;
+ i++;
+ }
+
+ /*
+ * Add plan_id to magic number so this is also unique for each plan
+ * node.
+ */
+ shm_toc_insert(pcxt->toc, DSA_LOCATION_KEY_FOR_SJF +
+ node->ss.ps.plan->plan_node_id, dsa_pointer_address);
+ }
+
pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
table_parallelscan_initialize(node->ss.ss_currentRelation,
pscan,
@@ -317,4 +386,106 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
node->ss.ss_currentScanDesc =
table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+
+ /*
+ * Create worker's semi-join filter for merge join, if using it. We first
+ * need to check shm_toc to see if a sjf exists, then create the local
+ * backend sjf.
+ */
+ if (shm_toc_lookup(pwcxt->toc,
+ DSA_LOCATION_KEY_FOR_SJF + node->ss.ps.plan->plan_node_id, 1))
+ {
+ dsa_pointer *parallel_addresses = (dsa_pointer *)
+ shm_toc_lookup(pwcxt->toc,
+ DSA_LOCATION_KEY_FOR_SJF + node->ss.ps.plan->plan_node_id, 1);
+
+ /*
+ * We know there is at least one SJF; we update the count below if
+ * the parallel state says there are more (this avoids an
+ * additional shm_toc allocation).
+ */
+ int sjf_num = 1;
+
+ /*
+ * If a copy of any sjf already exists on the backend, we want to free
+ * it and create a new one.
+ */
+ if (node->applySemiJoinFilter)
+ {
+ while (list_length(node->semiJoinFilters) > 0)
+ {
+ SemiJoinFilter *sjf = (SemiJoinFilter *)
+ (list_head(node->semiJoinFilters)->ptr_value);
+
+ node->semiJoinFilters =
+ list_delete_nth_cell(node->semiJoinFilters, 0);
+ FreeSemiJoinFilter(sjf);
+ }
+ }
+
+ /*
+ * Here, we create the process-local SJFs, which will later be
+ * combined into the single SJF after all parallel work is done.
+ */
+ for (int i = 0; i < sjf_num; i++)
+ {
+ dsa_pointer parallel_address = parallel_addresses[i];
+ SemiJoinFilterParallelState *parallelState =
+ (SemiJoinFilterParallelState *)
+ dsa_get_address(node->ss.ps.state->es_query_dsa,
+ parallel_address);
+ SemiJoinFilter *sjf;
+ MemoryContext oldContext;
+
+ sjf_num = parallelState->sjf_num;
+ oldContext = MemoryContextSwitchTo(GetMemoryChunkContext(node));
+ sjf = (SemiJoinFilter *) palloc0(sizeof(SemiJoinFilter));
+ sjf->filter = bloom_create(parallelState->num_elements,
+ parallelState->work_mem,
+ parallelState->seed);
+ sjf->buildingId = parallelState->buildingId;
+ sjf->checkingId = parallelState->checkingId;
+ sjf->seed = parallelState->seed;
+ sjf->isParallel = true;
+ sjf->isWorker = true;
+ sjf->doneBuilding = parallelState->doneBuilding;
+ sjf->parallelState = parallel_address;
+ node->applySemiJoinFilter = true;
+ sjf->buildingAttr = parallelState->buildingAttr;
+ sjf->checkingAttr = parallelState->checkingAttr;
+ node->semiJoinFilters =
+ lappend(node->semiJoinFilters, (void *) sjf);
+ sjf->mergejoin_plan_id = parallelState->mergejoin_plan_id;
+ /* copy over bloom filter if already built */
+ if (sjf->doneBuilding &&
+ parallelState->checkingId ==
+ node->ss.ss_currentRelation->rd_id)
+ {
+ bloom_filter *shared_bloom = (bloom_filter *)
+ dsa_get_address(node->ss.ps.state->es_query_dsa,
+ parallelState->bloom_dsa_address);
+
+ replace_bitset(sjf->filter, shared_bloom);
+ }
+ else if (!sjf->doneBuilding &&
+ parallelState->buildingId ==
+ node->ss.ss_currentRelation->rd_id)
+ {
+ /*
+ * Add this process to the number of scan processes. We need a
+ * lock in case multiple workers update it at the same time. We
+ * avoid relying on the planned number of workers because that
+ * can be wrong.
+ */
+ LWLockAcquire(&parallelState->lock, LW_EXCLUSIVE);
+ parallelState->numProcesses += 1;
+ LWLockRelease(&parallelState->lock);
+ }
+ MemoryContextSwitchTo(oldContext);
+ }
+ }
}
diff --git a/src/backend/lib/bloomfilter.c b/src/backend/lib/bloomfilter.c
index 3ef67d35ac..0a05ada9b6 100644
--- a/src/backend/lib/bloomfilter.c
+++ b/src/backend/lib/bloomfilter.c
@@ -128,6 +128,56 @@ bloom_free(bloom_filter *filter)
pfree(filter);
}
+/*
+ * Create Bloom filter in dsa shared memory
+ */
+dsa_pointer
+bloom_create_in_dsa(dsa_area *area, int64 total_elems, int bloom_work_mem, uint64 seed)
+{
+ dsa_pointer filter_dsa_address;
+ bloom_filter *filter;
+ int bloom_power;
+ uint64 bitset_bytes;
+ uint64 bitset_bits;
+
+ /*
+ * Aim for two bytes per element; this is sufficient to get a false
+ * positive rate below 1%, independent of the size of the bitset or total
+ * number of elements. Also, if rounding down the size of the bitset to
+ * the next lowest power of two turns out to be a significant drop, the
+ * false positive rate still won't exceed 2% in almost all cases.
+ */
+ bitset_bytes = Min(bloom_work_mem * UINT64CONST(1024), total_elems * 2);
+ bitset_bytes = Max(1024 * 1024, bitset_bytes);
+
+ /*
+ * Size in bits should be the highest power of two <= target. bitset_bits
+ * is uint64 because PG_UINT32_MAX is 2^32 - 1, not 2^32
+ */
+ bloom_power = my_bloom_power(bitset_bytes * BITS_PER_BYTE);
+ bitset_bits = UINT64CONST(1) << bloom_power;
+ bitset_bytes = bitset_bits / BITS_PER_BYTE;
+
+ /* Allocate bloom filter with unset bitset */
+ filter_dsa_address = dsa_allocate0(area, offsetof(bloom_filter, bitset) +
+ sizeof(unsigned char) * bitset_bytes);
+ filter = (bloom_filter *) dsa_get_address(area, filter_dsa_address);
+ filter->k_hash_funcs = optimal_k(bitset_bits, total_elems);
+ filter->seed = seed;
+ filter->m = bitset_bits;
+
+ return filter_dsa_address;
+}
+
+/*
+ * Free Bloom filter in dsa shared memory
+ */
+void
+bloom_free_in_dsa(dsa_area *area, dsa_pointer filter_dsa_address)
+{
+ dsa_free(area, filter_dsa_address);
+}
+
/*
* Add element to Bloom filter
*/
@@ -292,3 +342,27 @@ mod_m(uint32 val, uint64 m)
return val & (m - 1);
}
+
+/*
+ * OR a secondary filter into the main filter, combining the two filters.
+ * This happens in place, with main_filter holding the combined result.
+ * Both filters must have the same seed and size for this to work.
+ */
+void
+add_to_filter(bloom_filter *main_filter, bloom_filter *to_add)
+{
+ Assert(main_filter->seed == to_add->seed);
+ Assert(main_filter->m == to_add->m);
+ /* m is in bits not bytes */
+ for (int i = 0; i < main_filter->m / BITS_PER_BYTE; i++)
+ {
+ main_filter->bitset[i] = main_filter->bitset[i] | to_add->bitset[i];
+ }
+}
+
+/*
+ * Overwrite main_filter's bitset with overriding_filter's bitset. Both
+ * filters must be the same size.
+ */
+void
+replace_bitset(bloom_filter *main_filter, bloom_filter *overriding_filter)
+{
+ Assert(main_filter->m == overriding_filter->m);
+ memcpy(&main_filter->bitset, &overriding_filter->bitset, main_filter->m / BITS_PER_BYTE);
+}
diff --git a/src/include/executor/nodeMergejoin.h b/src/include/executor/nodeMergejoin.h
index c311c7ed80..d4cc439315 100644
--- a/src/include/executor/nodeMergejoin.h
+++ b/src/include/executor/nodeMergejoin.h
@@ -23,6 +23,8 @@ extern void FreeSemiJoinFilter(SemiJoinFilter * sjf);
extern int PushDownDirection(PlanState *node);
extern void PushDownFilter(PlanState *node, Plan *plan, SemiJoinFilter * sjf, Oid *relId, int64 *nodeRows);
extern bool SemiJoinFilterExamineSlot(List *semiJoinFilters, TupleTableSlot *slot, Oid tableId);
-extern void SemiJoinFilterFinishScan(List *semiJoinFilters, Oid tableId);
+extern void SemiJoinFilterFinishScan(List *semiJoinFilters, Oid tableId, dsa_area *parallel_area);
+extern dsa_pointer CreateFilterParallelState(dsa_area *area, SemiJoinFilter * sjf, int sjf_num);
+extern SemiJoinFilter * GetSemiJoinFilter(PlanState *node, int plan_id);
#endif /* NODEMERGEJOIN_H */
diff --git a/src/include/lib/bloomfilter.h b/src/include/lib/bloomfilter.h
index 8146d8e7fd..3b5d1821a5 100644
--- a/src/include/lib/bloomfilter.h
+++ b/src/include/lib/bloomfilter.h
@@ -13,15 +13,22 @@
#ifndef BLOOMFILTER_H
#define BLOOMFILTER_H
+#include "utils/dsa.h"
+
typedef struct bloom_filter bloom_filter;
extern bloom_filter *bloom_create(int64 total_elems, int bloom_work_mem,
uint64 seed);
extern void bloom_free(bloom_filter *filter);
+extern dsa_pointer bloom_create_in_dsa(dsa_area *area, int64 total_elems,
+ int bloom_work_mem, uint64 seed);
+extern void bloom_free_in_dsa(dsa_area *area, dsa_pointer filter_dsa_address);
extern void bloom_add_element(bloom_filter *filter, unsigned char *elem,
size_t len);
extern bool bloom_lacks_element(bloom_filter *filter, unsigned char *elem,
size_t len);
extern double bloom_prop_bits_set(bloom_filter *filter);
+extern void add_to_filter(bloom_filter *main_filter, bloom_filter *to_add);
+extern void replace_bitset(bloom_filter *main_filter, bloom_filter *overriding_filter);
#endif /* BLOOMFILTER_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6964462720..6ca6de437b 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -40,6 +40,8 @@
#include "nodes/tidbitmap.h"
#include "partitioning/partdefs.h"
#include "storage/condition_variable.h"
+#include "storage/lwlock.h"
+#include "storage/shm_toc.h"
#include "utils/hsearch.h"
#include "utils/queryenvironment.h"
#include "utils/reltrigger.h"
@@ -2026,6 +2028,10 @@ typedef struct SemiJoinFilter
Oid checkingId;
int checkingAttr;
bool doneBuilding;
+ /* Parallel information */
+ bool isParallel;
+ bool isWorker;
+ dsa_pointer parallelState;
/* metadata */
uint64 seed;
int64 num_elements;
@@ -2036,6 +2042,31 @@ typedef struct SemiJoinFilter
int mergejoin_plan_id;
} SemiJoinFilter;
+typedef struct SemiJoinFilterParallelState
+{
+ /* bloom filter information */
+ uint64 seed;
+ int64 num_elements;
+ int work_mem;
+ dsa_pointer bloom_dsa_address;
+ /* information to copy over to worker processes */
+ int numAttr;
+ Oid buildingId;
+ Oid checkingId;
+ int buildingAttr;
+ int checkingAttr;
+ int elementsAdded;
+ /* information for parallelization and locking */
+ bool doneBuilding;
+ int workersDone;
+ int numProcesses;
+ uint64 lockStop;
+ LWLock lock;
+ LWLock secondlock;
+ int sjf_num;
+ int mergejoin_plan_id;
+} SemiJoinFilterParallelState;
+
typedef struct MergeJoinState
{
JoinState js; /* its first field is NodeTag */
--
2.37.1
0004-Integrate-EXPLAIN-command-with-semijoin-filter.patch
From e328d8ad83c21eb8a34c8101cd1dee1e4276e50e Mon Sep 17 00:00:00 2001
From: Lyu Pan <lyup@amazon.com>
Date: Fri, 16 Sep 2022 21:40:59 +0000
Subject: [PATCH 4/5] Integrate EXPLAIN command with semijoin filter.
When explaining a Merge Join node that uses a semijoin filter, related metadata is displayed, including the filter clause, the estimated filtering rate, and the actual filtering rate if EXPLAIN ANALYZE is used.
For example:
Merge Join (...)
Merge Cond: (...)
SemiJoin Filter Created Based on: (...)
SemiJoin Estimated Filtering Rate: XXX
SemiJoin Actual Filtering Rate: XXX
---
src/backend/commands/explain.c | 40 +++++++++++++++++++++++++++++++++-
1 file changed, 39 insertions(+), 1 deletion(-)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index f86983c660..438792e31a 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -153,7 +153,8 @@ static void ExplainIndentText(ExplainState *es);
static void ExplainJSONLineEnding(ExplainState *es);
static void ExplainYAMLLineStarting(ExplainState *es);
static void escape_yaml(StringInfo buf, const char *str);
-
+static void show_semijoin_metadata(List *equijoins, PlanState *planstate,
+ List *ancestors, ExplainState *es);
/*
@@ -1981,6 +1982,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
"Merge Cond", planstate, ancestors, es);
show_upper_qual(((MergeJoin *) plan)->join.joinqual,
"Join Filter", planstate, ancestors, es);
+ if (((MergeJoinState *) planstate)->sjf)
+ {
+ show_semijoin_metadata(((MergeJoin *) plan)->mergeclauses,
+ planstate, ancestors, es);
+ }
if (((MergeJoin *) plan)->join.joinqual)
show_instrumentation_count("Rows Removed by Join Filter", 1,
planstate, es);
@@ -5041,3 +5047,35 @@ escape_yaml(StringInfo buf, const char *str)
{
escape_json(buf, str);
}
+
+static void
+show_semijoin_metadata(List *equijoins, PlanState *planstate,
+ List *ancestors, ExplainState *es)
+{
+ char createStr[256];
+ int clause_ordinal;
+ Node *best_equijoin_clause;
+ MergeJoin *mj = ((MergeJoin *) planstate->plan);
+
+ Assert(planstate);
+ Assert(nodeTag(planstate) == T_MergeJoinState);
+ Assert(planstate->plan);
+ Assert(nodeTag(planstate->plan) == T_MergeJoin);
+
+ snprintf(createStr, sizeof(createStr), "%s",
+ "SemiJoin Filter Created Based on");
+ clause_ordinal = mj->bestExpr;
+ best_equijoin_clause =
+ (Node *) list_nth_node(OpExpr, equijoins, clause_ordinal);
+ show_expression(best_equijoin_clause, createStr, planstate, ancestors,
+ true, es);
+ ExplainPropertyFloat("SemiJoin Estimated Filtering Rate", NULL,
+ mj->filteringRate, 4, es);
+ if (es->analyze)
+ {
+ SemiJoinFilter *sjf = ((MergeJoinState *) planstate)->sjf;
+
+ ExplainPropertyFloat("SemiJoin Actual Filtering Rate", NULL,
+ sjf->elementsChecked > 0 ?
+ ((double) sjf->elementsFiltered) / sjf->elementsChecked : 0.0,
+ 4, es);
+ }
+}
--
2.37.1
0002-Support-semijoin-filter-in-the-executor-for-non-para.patch
From 071e35b4fc038cd791479beb6fcbe8b0a4c0bee8 Mon Sep 17 00:00:00 2001
From: Lyu Pan <lyup@amazon.com>
Date: Fri, 16 Sep 2022 03:35:06 +0000
Subject: [PATCH 2/5] Support semijoin filter in the executor for non-parallel
mergejoin.
During MergeJoinState initialization, if the planner decided that a semijoin filter should be used in the MergeJoin node, a SemiJoinFilter struct is initialized. The relation id and attribute number used to build/check the bloom filter are then calculated, and the related information is pushed down to the scan node (only SeqScan for now). The bloom filter is always built on the outer/left tree and checked in the inner/right tree.
---
src/backend/executor/execScan.c | 52 +++++-
src/backend/executor/nodeMergejoin.c | 259 +++++++++++++++++++++++++++
src/backend/executor/nodeSeqscan.c | 6 +
src/include/executor/nodeMergejoin.h | 5 +
src/include/nodes/execnodes.h | 25 +++
5 files changed, 340 insertions(+), 7 deletions(-)
diff --git a/src/backend/executor/execScan.c b/src/backend/executor/execScan.c
index 043bb83f55..4b97c39455 100644
--- a/src/backend/executor/execScan.c
+++ b/src/backend/executor/execScan.c
@@ -19,8 +19,10 @@
#include "postgres.h"
#include "executor/executor.h"
+#include "executor/nodeMergejoin.h"
#include "miscadmin.h"
#include "utils/memutils.h"
+#include "utils/rel.h"
@@ -173,10 +175,12 @@ ExecScan(ScanState *node,
/* interrupt checks are in ExecScanFetch */
/*
- * If we have neither a qual to check nor a projection to do, just skip
- * all the overhead and return the raw scan tuple.
+ * If we have neither a qual to check nor a projection to do, nor a bloom
+ * filter to check, just skip all the overhead and return the raw scan
+ * tuple.
*/
- if (!qual && !projInfo)
+ if (!qual && !projInfo &&
+ (!IsA(node, SeqScanState) ||
+ !((SeqScanState *) node)->applySemiJoinFilter))
{
ResetExprContext(econtext);
return ExecScanFetch(node, accessMtd, recheckMtd);
@@ -206,6 +210,13 @@ ExecScan(ScanState *node,
*/
if (TupIsNull(slot))
{
+ if (IsA(node, SeqScanState) &&
+ ((SeqScanState *) node)->applySemiJoinFilter)
+ {
+ SemiJoinFilterFinishScan(
+ ((SeqScanState *) node)->semiJoinFilters,
+ node->ss_currentRelation->rd_id);
+ }
if (projInfo)
return ExecClearTuple(projInfo->pi_state.resultslot);
else
@@ -232,16 +243,43 @@ ExecScan(ScanState *node,
if (projInfo)
{
/*
- * Form a projection tuple, store it in the result tuple slot
- * and return it.
+ * Form a projection tuple, store it in the result tuple slot,
+ * check against SemiJoinFilter, then return it.
*/
- return ExecProject(projInfo);
+ TupleTableSlot *projectedSlot = ExecProject(projInfo);
+
+ if (IsA(node, SeqScanState) &&
+ ((SeqScanState *) node)->applySemiJoinFilter)
+ {
+ if (!SemiJoinFilterExamineSlot(
+ ((SeqScanState *) node)->semiJoinFilters,
+ projectedSlot, node->ss_currentRelation->rd_id))
+ {
+ /* slot did not pass the SemiJoinFilter, so skip it. */
+ ResetExprContext(econtext);
+ continue;
+ }
+ }
+ return projectedSlot;
}
else
{
/*
- * Here, we aren't projecting, so just return scan tuple.
+ * Here, we aren't projecting, so check against
+ * SemiJoinFilter, then return tuple.
*/
+ if (IsA(node, SeqScanState) &&
+ ((SeqScanState *) node)->applySemiJoinFilter)
+ {
+ if (!SemiJoinFilterExamineSlot(
+ ((SeqScanState *) node)->semiJoinFilters, slot,
+ node->ss_currentRelation->rd_id))
+ {
+ /* slot did not pass the SemiJoinFilter, so skip it. */
+ ResetExprContext(econtext);
+ continue;
+ }
+ }
return slot;
}
}
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index fed345eae5..81c253aad5 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -93,8 +93,11 @@
#include "postgres.h"
#include "access/nbtree.h"
+#include "common/pg_prng.h"
#include "executor/execdebug.h"
+#include "executor/execExpr.h"
#include "executor/nodeMergejoin.h"
+#include "lib/bloomfilter.h"
#include "miscadmin.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -1603,6 +1606,95 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
node->mergeNullsFirst,
(PlanState *) mergestate);
+ /*
+ * initialize SemiJoinFilter, if planner decided to do so
+ */
+ if (((MergeJoin *) mergestate->js.ps.plan)->applySemiJoinFilter)
+ {
+ SemiJoinFilter *sjf;
+ Plan *buildingNode;
+ Plan *checkingNode;
+ uint64 seed;
+ MergeJoinClause clause;
+
+ /* create Bloom filter */
+ sjf = (SemiJoinFilter *) palloc0(sizeof(SemiJoinFilter));
+
+ /*
+ * Push the filter down the outer and inner subtrees and apply it to
+ * the nodes that correspond to the ones identified during planning.
+ * We are pushing down first because we need some metadata from the
+ * scan nodes (i.e. Relation Id's and planner-estimated number of
+ * rows).
+ */
+ buildingNode = ((MergeJoin *) mergestate->js.ps.plan)->buildingNode;
+ checkingNode = ((MergeJoin *) mergestate->js.ps.plan)->checkingNode;
+ sjf->buildingId = -1;
+ sjf->checkingId = -1;
+ PushDownFilter(mergestate->js.ps.lefttree, buildingNode, sjf, &sjf->buildingId, &sjf->num_elements);
+ PushDownFilter(mergestate->js.ps.righttree, checkingNode, sjf, &sjf->checkingId, &sjf->num_elements);
+
+ /* Initialize SJF data and create Bloom filter */
+ seed = pg_prng_uint64(&pg_global_prng_state);
+ sjf->filter = bloom_create(sjf->num_elements, work_mem, seed);
+ sjf->work_mem = work_mem;
+ sjf->seed = seed;
+ sjf->doneBuilding = false;
+
+ /*
+ * From the plan level, we already know which mergeclause to build the
+ * filter on. However, to implement this on the scan level, we look at
+ * the expression and figure out which slot we need to examine in
+ * ExecScan to build/check the filter on.
+ */
+
+ clause = &mergestate->mj_Clauses[((MergeJoin *) mergestate->js.ps.plan)->bestExpr];
+
+ /*
+ * Look through expression steps to determine which relation attribute
+ * slot they are comparing and take note of it. All merge join clauses
+ * eventually fetch tuples from either the outer or inner slot, so we
+ * just need to check for those specific ExprEvalOp's.
+ */
+ for (int j = 0; j < clause->lexpr->steps_len; j++)
+ {
+ ExprEvalOp leftOp = clause->lexpr->steps[j].opcode;
+ int leftAttr = clause->lexpr->steps[j].d.var.attnum;
+
+ if (leftOp == EEOP_OUTER_FETCHSOME)
+ {
+ /* attribute numbers are 1-indexed */
+ sjf->buildingAttr = leftAttr - 1;
+ break;
+ }
+ else if (leftOp == EEOP_INNER_FETCHSOME)
+ {
+ sjf->checkingAttr = leftAttr - 1;
+ break;
+ }
+ }
+ /* do it again for right expression */
+ for (int j = 0; j < clause->rexpr->steps_len; j++)
+ {
+ ExprEvalOp rightOp = clause->rexpr->steps[j].opcode;
+ int rightAttr = clause->rexpr->steps[j].d.var.attnum;
+
+ if (rightOp == EEOP_OUTER_FETCHSOME)
+ {
+ /* attribute numbers are 1-indexed */
+ sjf->buildingAttr = rightAttr - 1;
+ break;
+ }
+ else if (rightOp == EEOP_INNER_FETCHSOME)
+ {
+ sjf->checkingAttr = rightAttr - 1;
+ break;
+ }
+ }
+ sjf->mergejoin_plan_id = mergestate->js.ps.plan->plan_node_id;
+ mergestate->sjf = sjf;
+ }
+
/*
* initialize join state
*/
@@ -1645,6 +1737,14 @@ ExecEndMergeJoin(MergeJoinState *node)
ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
ExecClearTuple(node->mj_MarkedTupleSlot);
+ /*
+ * free SemiJoinFilter
+ */
+ if (node->sjf)
+ {
+ FreeSemiJoinFilter(node->sjf);
+ }
+
/*
* shut down the subplans
*/
@@ -1678,3 +1778,162 @@ ExecReScanMergeJoin(MergeJoinState *node)
if (innerPlan->chgParam == NULL)
ExecReScan(innerPlan);
}
+
+void
+FreeSemiJoinFilter(SemiJoinFilter * sjf)
+{
+ bloom_free(sjf->filter);
+ pfree(sjf);
+}
+
+/*
+ * Determines the direction in which a pushdown filter can be pushed. This
+ * does not need to be very robust, because the careful calculations were
+ * already done at the plan level; even if we end up pushing somewhere we
+ * are not supposed to, the planner's verifications make that harmless.
+ */
+int
+PushDownDirection(PlanState *node)
+{
+ switch (nodeTag(node))
+ {
+ case T_HashState:
+ case T_MaterialState:
+ case T_GatherState:
+ case T_GatherMergeState:
+ case T_SortState:
+ case T_UniqueState:
+ case T_AggState:
+ {
+ return 0;
+ }
+ case T_NestLoopState:
+ case T_MergeJoinState:
+ case T_HashJoinState:
+ {
+ return 1;
+ }
+ default:
+ {
+ return -1;
+ }
+ }
+}
+
+/*
+ * Recursively pushes down the filter until an appropriate SeqScan node is
+ * reached. Then it verifies whether that SeqScan node is the one we want
+ * to push the filter to, and if so, appends the SJF to the node.
+ */
+void
+PushDownFilter(PlanState *node, Plan *plan, SemiJoinFilter * sjf, Oid *relId, int64 *nodeRows)
+{
+ if (node == NULL)
+ {
+ return;
+ }
+
+ check_stack_depth();
+ if (node->type == T_SeqScanState)
+ {
+ SeqScanState *scan = (SeqScanState *) node;
+
+ Assert(IsA(scan, SeqScanState));
+
+ /*
+ * found the right Scan node that we want to apply the filter onto via
+ * matching relId
+ */
+ if (scan->ss.ss_currentRelation->rd_id == *relId)
+ {
+ scan->applySemiJoinFilter = true;
+ scan->semiJoinFilters = lappend(scan->semiJoinFilters, sjf);
+ }
+
+ /*
+ * Check if right Scan node, based on matching Plan nodes. This will
+ * be the most common way of matching Scan nodes to the filter, the
+ * above use case is only for fringe parallel-execution cases.
+ */
+ else if (scan->ss.ps.plan == plan)
+ {
+ scan->applySemiJoinFilter = true;
+ scan->semiJoinFilters = lappend(scan->semiJoinFilters, sjf);
+ *relId = scan->ss.ss_currentRelation->rd_id;
+ /* double row estimate to reduce error rate for Bloom filter */
+ *nodeRows = Max(*nodeRows, scan->ss.ps.plan->plan_rows * 2);
+ }
+ }
+ else
+ {
+ if (PushDownDirection(node) == 1)
+ {
+ PushDownFilter(node->lefttree, plan, sjf, relId, nodeRows);
+ PushDownFilter(node->righttree, plan, sjf, relId, nodeRows);
+ }
+ else if (PushDownDirection(node) == 0)
+ {
+ PushDownFilter(node->lefttree, plan, sjf, relId, nodeRows);
+ }
+ }
+}
+
+/*
+ * If this table is the building-side table for the SemiJoinFilter, adds the element to
+ * the bloom filter and always returns true. If this table is the checking-side table for the SemiJoinFilter,
+ * then checks the element against the bloom filter and returns true if the element is (probably) in the set,
+ * and false if the element is not in the bloom filter.
+ */
+bool
+SemiJoinFilterExamineSlot(List *semiJoinFilters, TupleTableSlot *slot, Oid tableId)
+{
+ ListCell *cell;
+
+ foreach(cell, semiJoinFilters)
+ {
+ SemiJoinFilter *sjf = ((SemiJoinFilter *) (lfirst(cell)));
+
+ /* check if this table's relation ID matches the filter's relation ID */
+ if (!sjf->doneBuilding && tableId == sjf->buildingId)
+ {
+ Datum val;
+
+ slot_getsomeattrs(slot, sjf->buildingAttr + 1);
+ sjf->elementsAdded++;
+
+ /*
+ * We are only using one key for now. Later functionality might
+ * include multiple join keys.
+ */
+ val = slot->tts_values[sjf->buildingAttr];
+ bloom_add_element(sjf->filter, (unsigned char *) &val, sizeof(val));
+ }
+ else if (sjf->doneBuilding && tableId == sjf->checkingId)
+ {
+ Datum val;
+
+ slot_getsomeattrs(slot, sjf->checkingAttr + 1);
+ sjf->elementsChecked++;
+ val = slot->tts_values[sjf->checkingAttr];
+ if (bloom_lacks_element(sjf->filter, (unsigned char *) &val, sizeof(val)))
+ {
+ sjf->elementsFiltered++;
+ return false;
+ }
+ }
+ }
+ return true;
+}
+
+void
+SemiJoinFilterFinishScan(List *semiJoinFilters, Oid tableId)
+{
+ ListCell *cell;
+
+ foreach(cell, semiJoinFilters)
+ {
+ SemiJoinFilter *sjf = ((SemiJoinFilter *) (lfirst(cell)));
+
+ if (!sjf->doneBuilding && tableId == sjf->buildingId)
+ {
+ sjf->doneBuilding = true;
+ }
+ }
+}
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 7b58cd9162..e43ce3f8d0 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -207,6 +207,12 @@ ExecEndSeqScan(SeqScanState *node)
*/
if (scanDesc != NULL)
table_endscan(scanDesc);
+
+ /*
+ * free the semijoin filter list, if any
+ */
+ if (node->semiJoinFilters)
+ list_free(node->semiJoinFilters);
}
/* ----------------------------------------------------------------
diff --git a/src/include/executor/nodeMergejoin.h b/src/include/executor/nodeMergejoin.h
index 26ab517508..c311c7ed80 100644
--- a/src/include/executor/nodeMergejoin.h
+++ b/src/include/executor/nodeMergejoin.h
@@ -19,5 +19,10 @@
extern MergeJoinState *ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags);
extern void ExecEndMergeJoin(MergeJoinState *node);
extern void ExecReScanMergeJoin(MergeJoinState *node);
+extern void FreeSemiJoinFilter(SemiJoinFilter * sjf);
+extern int PushDownDirection(PlanState *node);
+extern void PushDownFilter(PlanState *node, Plan *plan, SemiJoinFilter * sjf, Oid *relId, int64 *nodeRows);
+extern bool SemiJoinFilterExamineSlot(List *semiJoinFilters, TupleTableSlot *slot, Oid tableId);
+extern void SemiJoinFilterFinishScan(List *semiJoinFilters, Oid tableId);
#endif /* NODEMERGEJOIN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 01b1727fc0..6964462720 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -32,6 +32,7 @@
#include "access/tupconvert.h"
#include "executor/instrument.h"
#include "fmgr.h"
+#include "lib/bloomfilter.h"
#include "lib/ilist.h"
#include "lib/pairingheap.h"
#include "nodes/params.h"
@@ -1451,6 +1452,9 @@ typedef struct SeqScanState
{
ScanState ss; /* its first field is NodeTag */
Size pscan_len; /* size of parallel heap scan descriptor */
+ /* for use of SemiJoinFilters during merge join */
+ bool applySemiJoinFilter;
+ List *semiJoinFilters;
} SeqScanState;
/* ----------------
@@ -2012,6 +2016,26 @@ typedef struct NestLoopState
/* private in nodeMergejoin.c: */
typedef struct MergeJoinClauseData *MergeJoinClause;
+typedef struct SemiJoinFilter
+{
+ bloom_filter *filter;
+ /* Relation that Bloom Filter is built on */
+ Oid buildingId;
+ int buildingAttr;
+ /* Relation that Bloom Filter is checked on */
+ Oid checkingId;
+ int checkingAttr;
+ bool doneBuilding;
+ /* metadata */
+ uint64 seed;
+ int64 num_elements;
+ int work_mem;
+ int elementsAdded;
+ int elementsChecked;
+ int elementsFiltered;
+ int mergejoin_plan_id;
+} SemiJoinFilter;
+
typedef struct MergeJoinState
{
JoinState js; /* its first field is NodeTag */
@@ -2032,6 +2056,7 @@ typedef struct MergeJoinState
TupleTableSlot *mj_NullInnerTupleSlot;
ExprContext *mj_OuterEContext;
ExprContext *mj_InnerEContext;
+ SemiJoinFilter *sjf;
} MergeJoinState;
/* ----------------
--
2.37.1
0005-Add-basic-regress-tests-for-semijoin-filter.patch
From ccc7c86b38f541bf9c819f13f32c548812e216e6 Mon Sep 17 00:00:00 2001
From: Lyu Pan <lyup@amazon.com>
Date: Mon, 19 Sep 2022 23:25:30 +0000
Subject: [PATCH 5/5] Add basic regress tests for semijoin filter.
Because of implementation limits (and bugs) in the prototype, only some basic SQL statements are tested.
1. Test that when force_mergejoin_semijoin_filter is ON, the semijoin filter is used.
2. Test that the semijoin filter works in a two-table inner merge join.
3. Test that the semijoin filter can be pushed through a Sort node.
4. Test that the semijoin filter works in a basic three-table merge join. However, due to implementation bugs, it doesn't work in all kinds of three-table merge joins.
---
.../expected/mergejoin_semijoinfilter.out | 158 ++++++++++++++++++
src/test/regress/parallel_schedule | 1 +
.../regress/sql/mergejoin_semijoinfilter.sql | 62 +++++++
3 files changed, 221 insertions(+)
create mode 100644 src/test/regress/expected/mergejoin_semijoinfilter.out
create mode 100644 src/test/regress/sql/mergejoin_semijoinfilter.sql
diff --git a/src/test/regress/expected/mergejoin_semijoinfilter.out b/src/test/regress/expected/mergejoin_semijoinfilter.out
new file mode 100644
index 0000000000..07c4acc253
--- /dev/null
+++ b/src/test/regress/expected/mergejoin_semijoinfilter.out
@@ -0,0 +1,158 @@
+SET enable_hashjoin = OFF;
+SET enable_nestloop = OFF;
+SET enable_mergejoin = ON;
+SET enable_mergejoin_semijoin_filter = ON;
+SET force_mergejoin_semijoin_filter = ON;
+CREATE TABLE t1 (
+ i integer,
+ j integer
+);
+CREATE TABLE t2 (
+ i integer,
+ k integer
+);
+CREATE TABLE t3 (
+ i integer,
+ m integer
+);
+INSERT INTO t1 (i, j)
+ SELECT
+ generate_series(1,100000) AS i,
+ generate_series(1,100000) AS j;
+INSERT INTO t2 (i, k)
+ SELECT
+ generate_series(1,100000) AS i,
+ generate_series(1,100000) AS k;
+INSERT INTO t3 (i, m)
+ SELECT
+ generate_series(1,100000) AS i,
+ generate_series(1,100000) AS m;
+-- Semijoin filter is not used when force_mergejoin_semijoin_filter is OFF.
+SET force_mergejoin_semijoin_filter = OFF;
+EXPLAIN (VERBOSE, COSTS OFF) SELECT COUNT(*) FROM t1 JOIN t2 ON t1.i = t2.i;
+ QUERY PLAN
+-----------------------------------------
+ Aggregate
+ Output: count(*)
+ -> Merge Join
+ Merge Cond: (t1.i = t2.i)
+ -> Sort
+ Output: t1.i
+ Sort Key: t1.i
+ -> Seq Scan on public.t1
+ Output: t1.i
+ -> Sort
+ Output: t2.i
+ Sort Key: t2.i
+ -> Seq Scan on public.t2
+ Output: t2.i
+(14 rows)
+
+SET force_mergejoin_semijoin_filter = ON;
+-- One level of inner mergejoin: push semi-join filter to outer scan.
+EXPLAIN (VERBOSE, COSTS OFF) SELECT COUNT(*) FROM t1 JOIN t2 ON t1.i = t2.i;
+ QUERY PLAN
+---------------------------------------------------------
+ Aggregate
+ Output: count(*)
+ -> Merge Join
+ Merge Cond: (t1.i = t2.i)
+ SemiJoin Filter Created Based on: (t1.i = t2.i)
+ SemiJoin Estimated Filtering Rate: -0.0100
+ -> Sort
+ Output: t1.i
+ Sort Key: t1.i
+ -> Seq Scan on public.t1
+ Output: t1.i
+ -> Sort
+ Output: t2.i
+ Sort Key: t2.i
+ -> Seq Scan on public.t2
+ Output: t2.i
+(16 rows)
+
+SELECT COUNT(*) FROM t1 JOIN t2 ON t1.i = t2.i;
+ count
+--------
+ 100000
+(1 row)
+
+-- Push semijoin filter through SORT node.
+EXPLAIN (VERBOSE, COSTS OFF) SELECT COUNT(*) FROM (SELECT DISTINCT t1.i FROM t1 ORDER BY t1.i) x JOIN t2 ON x.i = t2.i;
+ QUERY PLAN
+---------------------------------------------------------
+ Aggregate
+ Output: count(*)
+ -> Merge Join
+ Merge Cond: (t1.i = t2.i)
+ SemiJoin Filter Created Based on: (t1.i = t2.i)
+ SemiJoin Estimated Filtering Rate: -0.0100
+ -> Sort
+ Output: t1.i
+ Sort Key: t1.i
+ -> HashAggregate
+ Output: t1.i
+ Group Key: t1.i
+ -> Seq Scan on public.t1
+ Output: t1.i, t1.j
+ -> Sort
+ Output: t2.i
+ Sort Key: t2.i
+ -> Seq Scan on public.t2
+ Output: t2.i
+(19 rows)
+
+SELECT COUNT(*) FROM (SELECT DISTINCT t1.i FROM t1 ORDER BY t1.i) x JOIN t2 ON x.i = t2.i;
+ count
+--------
+ 100000
+(1 row)
+
+-- Two levels of MergeJoin
+EXPLAIN (VERBOSE, COSTS OFF) SELECT COUNT(*) FROM t1 JOIN t2 ON t1.i = t2.i JOIN t3 ON t2.i = t3.i;
+ QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+ Output: count(*)
+ -> Merge Join
+ Merge Cond: (t3.i = t1.i)
+ SemiJoin Filter Created Based on: (t3.i = t1.i)
+ SemiJoin Estimated Filtering Rate: -0.0100
+ -> Sort
+ Output: t3.i
+ Sort Key: t3.i
+ -> Seq Scan on public.t3
+ Output: t3.i
+ -> Materialize
+ Output: t1.i, t2.i
+ -> Merge Join
+ Output: t1.i, t2.i
+ Merge Cond: (t1.i = t2.i)
+ SemiJoin Filter Created Based on: (t1.i = t2.i)
+ SemiJoin Estimated Filtering Rate: -0.0100
+ -> Sort
+ Output: t1.i
+ Sort Key: t1.i
+ -> Seq Scan on public.t1
+ Output: t1.i
+ -> Sort
+ Output: t2.i
+ Sort Key: t2.i
+ -> Seq Scan on public.t2
+ Output: t2.i
+(28 rows)
+
+SELECT COUNT(*) FROM t1 JOIN t2 ON t1.i = t2.i JOIN t3 ON t2.i = t3.i;
+ count
+--------
+ 100000
+(1 row)
+
+DROP TABLE t1;
+DROP TABLE t2;
+DROP TABLE t3;
+RESET enable_hashjoin;
+RESET enable_nestloop;
+RESET enable_mergejoin;
+RESET enable_mergejoin_semijoin_filter;
+RESET force_mergejoin_semijoin_filter;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 9f644a0c1b..6bde05b8cb 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -98,6 +98,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
test: select_parallel
test: write_parallel
test: vacuum_parallel
+test: mergejoin_semijoinfilter
# no relation related tests can be put in this group
test: publication subscription
diff --git a/src/test/regress/sql/mergejoin_semijoinfilter.sql b/src/test/regress/sql/mergejoin_semijoinfilter.sql
new file mode 100644
index 0000000000..6c6418f71a
--- /dev/null
+++ b/src/test/regress/sql/mergejoin_semijoinfilter.sql
@@ -0,0 +1,62 @@
+SET enable_hashjoin = OFF;
+SET enable_nestloop = OFF;
+SET enable_mergejoin = ON;
+SET enable_mergejoin_semijoin_filter = ON;
+SET force_mergejoin_semijoin_filter = ON;
+
+CREATE TABLE t1 (
+ i integer,
+ j integer
+);
+
+CREATE TABLE t2 (
+ i integer,
+ k integer
+);
+
+CREATE TABLE t3 (
+ i integer,
+ m integer
+);
+
+INSERT INTO t1 (i, j)
+ SELECT
+ generate_series(1,100000) AS i,
+ generate_series(1,100000) AS j;
+
+INSERT INTO t2 (i, k)
+ SELECT
+ generate_series(1,100000) AS i,
+ generate_series(1,100000) AS k;
+
+INSERT INTO t3 (i, m)
+ SELECT
+ generate_series(1,100000) AS i,
+ generate_series(1,100000) AS m;
+
+-- Semijoin filter is not used when force_mergejoin_semijoin_filter is OFF.
+SET force_mergejoin_semijoin_filter = OFF;
+EXPLAIN (VERBOSE, COSTS OFF) SELECT COUNT(*) FROM t1 JOIN t2 ON t1.i = t2.i;
+SET force_mergejoin_semijoin_filter = ON;
+
+-- One level of inner mergejoin: push semijoin filter to outer scan.
+EXPLAIN (VERBOSE, COSTS OFF) SELECT COUNT(*) FROM t1 JOIN t2 ON t1.i = t2.i;
+SELECT COUNT(*) FROM t1 JOIN t2 ON t1.i = t2.i;
+
+-- Push semijoin filter through SORT node.
+EXPLAIN (VERBOSE, COSTS OFF) SELECT COUNT(*) FROM (SELECT DISTINCT t1.i FROM t1 ORDER BY t1.i) x JOIN t2 ON x.i = t2.i;
+SELECT COUNT(*) FROM (SELECT DISTINCT t1.i FROM t1 ORDER BY t1.i) x JOIN t2 ON x.i = t2.i;
+
+-- Two levels of MergeJoin
+EXPLAIN (VERBOSE, COSTS OFF) SELECT COUNT(*) FROM t1 JOIN t2 ON t1.i = t2.i JOIN t3 ON t2.i = t3.i;
+SELECT COUNT(*) FROM t1 JOIN t2 ON t1.i = t2.i JOIN t3 ON t2.i = t3.i;
+
+DROP TABLE t1;
+DROP TABLE t2;
+DROP TABLE t3;
+
+RESET enable_hashjoin;
+RESET enable_nestloop;
+RESET enable_mergejoin;
+RESET enable_mergejoin_semijoin_filter;
+RESET force_mergejoin_semijoin_filter;
--
2.37.1
[Attachment: theoretical_filtering_rate.png (binary image data omitted)]