From 872f7b99210dbd8375790126cec3ba3c71cf30bc Mon Sep 17 00:00:00 2001
From: Filip Janus <fjanus@redhat.com>
Date: Thu, 31 Jul 2025 14:02:45 +0200
Subject: [PATCH v20250930 04/22] Add regression tests for temporary file
 compression

This commit adds comprehensive regression tests for the transparent
temporary file compression feature.

Test coverage:
- join_hash_lz4.sql: Tests hash join operations with LZ4 compression
- join_hash_pglz.sql: Tests hash join operations with PGLZ compression
- Both tests verify compression works correctly for various hash join scenarios
- Expected output files for validation

Test integration:
- LZ4 tests are conditionally enabled when PostgreSQL is built with --with-lz4
- PGLZ tests are always enabled as PGLZ is built-in
- Tests added to parallel regression test schedule
- GNUmakefile updated to include conditional LZ4 test execution

The tests ensure that compression/decompression works transparently
without affecting query results, while providing coverage for both
supported compression algorithms.
---
 src/test/regress/GNUmakefile                 |    4 +
 src/test/regress/expected/join_hash_lz4.out  | 1166 ++++++++++++++++++
 src/test/regress/expected/join_hash_pglz.out | 1166 ++++++++++++++++++
 src/test/regress/parallel_schedule           |    4 +-
 src/test/regress/sql/join_hash_lz4.sql       |  626 ++++++++++
 src/test/regress/sql/join_hash_pglz.sql      |  626 ++++++++++
 6 files changed, 3591 insertions(+), 1 deletion(-)
 create mode 100644 src/test/regress/expected/join_hash_lz4.out
 create mode 100644 src/test/regress/expected/join_hash_pglz.out
 create mode 100644 src/test/regress/sql/join_hash_lz4.sql
 create mode 100644 src/test/regress/sql/join_hash_pglz.sql

diff --git a/src/test/regress/GNUmakefile b/src/test/regress/GNUmakefile
index ef2bddf42ca..94df5649e34 100644
--- a/src/test/regress/GNUmakefile
+++ b/src/test/regress/GNUmakefile
@@ -94,6 +94,10 @@ installdirs-tests: installdirs
 REGRESS_OPTS = --dlpath=. --max-concurrent-tests=20 \
 	$(EXTRA_REGRESS_OPTS)
 
+ifeq ($(with_lz4),yes)
+override EXTRA_TESTS := $(EXTRA_TESTS) join_hash_lz4
+endif
+
 check: all
 	$(pg_regress_check) $(REGRESS_OPTS) --schedule=$(srcdir)/parallel_schedule $(MAXCONNOPT) $(EXTRA_TESTS)
 
diff --git a/src/test/regress/expected/join_hash_lz4.out b/src/test/regress/expected/join_hash_lz4.out
new file mode 100644
index 00000000000..966a5cd8f55
--- /dev/null
+++ b/src/test/regress/expected/join_hash_lz4.out
@@ -0,0 +1,1166 @@
+--
+-- exercises for the hash join code
+--
+begin;
+set local min_parallel_table_scan_size = 0;
+set local parallel_setup_cost = 0;
+set local enable_hashjoin = on;
+set local temp_file_compression = 'lz4';
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function find_hash(node json)
+returns json language plpgsql
+as
+$$
+declare
+  x json;
+  child json;
+begin
+  if node->>'Node Type' = 'Hash' then
+    return node;
+  else
+    for child in select json_array_elements(node->'Plans')
+    loop
+      x := find_hash(child);
+      if x is not null then
+        return x;
+      end if;
+    end loop;
+    return null;
+  end if;
+end;
+$$;
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  whole_plan json;
+  hash_node json;
+begin
+  for whole_plan in
+    execute 'explain (analyze, format ''json'') ' || query
+  loop
+    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
+    original := hash_node->>'Original Hash Batches';
+    final := hash_node->>'Hash Batches';
+    return next;
+  end loop;
+end;
+$$;
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+analyze bigger_than_it_looks;
+update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks';
+-- Make a relation whose size we underestimate and that also has a
+-- kind of skew that breaks our batching scheme.  We want stats to say
+-- 2 rows, but actually there are 20,000 rows with the same key.
+create table extremely_skewed (id int, t text);
+alter table extremely_skewed set (autovacuum_enabled = 'false');
+alter table extremely_skewed set (parallel_workers = 2);
+analyze extremely_skewed;
+insert into extremely_skewed
+  select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+  from generate_series(1, 20000);
+update pg_class
+  set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
+  where relname = 'extremely_skewed';
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+-- The "optimal" case: the hash table fits in memory; we plan for 1
+-- batch, we stick to that number, and peak memory usage stays within
+-- our work_mem budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                      QUERY PLAN                       
+-------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- The "good" case: batches required, but we plan the right number; we
+-- plan for some number of batches, and we stick to that number, and
+-- peak memory usage says within our work_mem budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                      QUERY PLAN                       
+-------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- The "bad" case: during execution we need to increase number of
+-- batches; in this case we plan for 1 batch, and increase at least a
+-- couple of times, and peak memory usage stays within our work_mem
+-- budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+                      QUERY PLAN                      
+------------------------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on bigger_than_it_looks s
+(6 rows)
+
+select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- The "ugly" case: increasing the number of batches during execution
+-- doesn't help, so stop trying to fit in work_mem and hope for the
+-- best; in this case we plan for 1 batch, increases just once and
+-- then stop increasing because that didn't help at all, so we blow
+-- right through the work_mem budget and hope for the best...
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                    QUERY PLAN                    
+--------------------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on extremely_skewed s
+(6 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     2
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                       QUERY PLAN                       
+--------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Hash
+                     ->  Seq Scan on extremely_skewed s
+(8 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     2
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                           QUERY PLAN                            
+-----------------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Parallel Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Parallel Hash
+                     ->  Parallel Seq Scan on extremely_skewed s
+(8 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     4
+(1 row)
+
+rollback to settings;
+-- A couple of other hash join tests unrelated to work_mem management.
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     1
+(1 row)
+
+rollback to settings;
+-- Exercise rescans.  We'll turn off parallel_leader_participation so
+-- that we can check that instrumentation comes back correctly.
+create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
+alter table join_foo set (parallel_workers = 0);
+create table join_bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
+alter table join_bar set (parallel_workers = 2);
+-- multi-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Hash
+                           ->  Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Hash
+                           ->  Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is matched.
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is not matched.
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: ((0 - s.id) = r.id)
+         ->  Seq Scan on simple s
+         ->  Hash
+               ->  Seq Scan on simple r
+(6 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: ((0 - s.id) = r.id)
+         ->  Seq Scan on simple s
+         ->  Hash
+               ->  Seq Scan on simple r
+(6 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Left Join
+                     Hash Cond: (wide.id = wide_1.id)
+                     ->  Parallel Seq Scan on wide
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on wide wide_1
+(9 rows)
+
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+ length 
+--------
+ 320000
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+SAVEPOINT settings;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+CREATE TABLE hjtest_matchbits_t2(id int);
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id
+  ORDER BY t1.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+ROLLBACK TO settings;
+rollback;
+-- Verify that hash key expressions reference the correct
+-- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
+-- need to reference Hash's outer plan (which is below HashJoin's
+-- inner plan). It's not trivial to verify that the references are
+-- correct (we don't display the hashkeys themselves), but if the
+-- hashkeys contain subplan references, those will be displayed. Force
+-- subplans to appear just about everywhere.
+--
+-- Bug report:
+-- https://www.postgresql.org/message-id/CAPpHfdvGVegF_TKKRiBrSmatJL2dR9uwFCuR%2BteQ_8tEXU8mxg%40mail.gmail.com
+--
+BEGIN;
+SET LOCAL enable_sort = OFF; -- avoid mergejoins
+SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
+CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 1, false); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 2); -- matches
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 3, 'another', 7); -- fails id join condition
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 90);  -- fails < 55
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 3); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'text', 1); --  fails hjtest_1.a <> hjtest_2.b;
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+                                           QUERY PLAN                                           
+------------------------------------------------------------------------------------------------
+ Hash Join
+   Output: hjtest_1.a, hjtest_2.a, (hjtest_1.tableoid)::regclass, (hjtest_2.tableoid)::regclass
+   Hash Cond: ((hjtest_1.id = (SubPlan 1)) AND ((SubPlan 2) = (SubPlan 3)))
+   Join Filter: (hjtest_1.a <> hjtest_2.b)
+   ->  Seq Scan on public.hjtest_1
+         Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+         Filter: ((SubPlan 4) < 50)
+         SubPlan 4
+           ->  Result
+                 Output: (hjtest_1.b * 5)
+   ->  Hash
+         Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+         ->  Seq Scan on public.hjtest_2
+               Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+               Filter: ((SubPlan 5) < 55)
+               SubPlan 5
+                 ->  Result
+                       Output: (hjtest_2.c * 5)
+         SubPlan 1
+           ->  Result
+                 Output: 1
+                 One-Time Filter: (hjtest_2.id = 1)
+         SubPlan 3
+           ->  Result
+                 Output: (hjtest_2.c * 5)
+   SubPlan 2
+     ->  Result
+           Output: (hjtest_1.b * 5)
+(28 rows)
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+  a1  | a2 |    t1    |    t2    
+------+----+----------+----------
+ text | t  | hjtest_1 | hjtest_2
+(1 row)
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+                                           QUERY PLAN                                           
+------------------------------------------------------------------------------------------------
+ Hash Join
+   Output: hjtest_1.a, hjtest_2.a, (hjtest_1.tableoid)::regclass, (hjtest_2.tableoid)::regclass
+   Hash Cond: (((SubPlan 1) = hjtest_1.id) AND ((SubPlan 3) = (SubPlan 2)))
+   Join Filter: (hjtest_1.a <> hjtest_2.b)
+   ->  Seq Scan on public.hjtest_2
+         Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+         Filter: ((SubPlan 5) < 55)
+         SubPlan 5
+           ->  Result
+                 Output: (hjtest_2.c * 5)
+   ->  Hash
+         Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+         ->  Seq Scan on public.hjtest_1
+               Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+               Filter: ((SubPlan 4) < 50)
+               SubPlan 4
+                 ->  Result
+                       Output: (hjtest_1.b * 5)
+         SubPlan 2
+           ->  Result
+                 Output: (hjtest_1.b * 5)
+   SubPlan 1
+     ->  Result
+           Output: 1
+           One-Time Filter: (hjtest_2.id = 1)
+   SubPlan 3
+     ->  Result
+           Output: (hjtest_2.c * 5)
+(28 rows)
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+  a1  | a2 |    t1    |    t2    
+------+----+----------+----------
+ text | t  | hjtest_1 | hjtest_2
+(1 row)
+
+ROLLBACK;
+-- Verify that we behave sanely when the inner hash keys contain parameters
+-- (that is, outer or lateral references).  This situation has to defeat
+-- re-use of the inner hash table across rescans.
+begin;
+set local enable_hashjoin = on;
+explain (costs off)
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Nested Loop
+   ->  Seq Scan on int8_tbl i8
+   ->  Sort
+         Sort Key: t1.fivethous, i4.f1
+         ->  Hash Join
+               Hash Cond: (t1.fivethous = (i4.f1 + i8.q2))
+               ->  Seq Scan on tenk1 t1
+               ->  Hash
+                     ->  Seq Scan on int4_tbl i4
+(9 rows)
+
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+ q2  | fivethous | f1 
+-----+-----------+----
+ 456 |       456 |  0
+ 456 |       456 |  0
+ 123 |       123 |  0
+ 123 |       123 |  0
+(4 rows)
+
+rollback;
diff --git a/src/test/regress/expected/join_hash_pglz.out b/src/test/regress/expected/join_hash_pglz.out
new file mode 100644
index 00000000000..99c67f982af
--- /dev/null
+++ b/src/test/regress/expected/join_hash_pglz.out
@@ -0,0 +1,1166 @@
+--
+-- exercises for the hash join code
+--
+begin;
+set local min_parallel_table_scan_size = 0;
+set local parallel_setup_cost = 0;
+set local enable_hashjoin = on;
+set local temp_file_compression = 'pglz';
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function find_hash(node json)
+returns json language plpgsql
+as
+$$
+declare
+  x json;
+  child json;
+begin
+  if node->>'Node Type' = 'Hash' then
+    return node;
+  else
+    for child in select json_array_elements(node->'Plans')
+    loop
+      x := find_hash(child);
+      if x is not null then
+        return x;
+      end if;
+    end loop;
+    return null;
+  end if;
+end;
+$$;
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  whole_plan json;
+  hash_node json;
+begin
+  for whole_plan in
+    execute 'explain (analyze, format ''json'') ' || query
+  loop
+    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
+    original := hash_node->>'Original Hash Batches';
+    final := hash_node->>'Hash Batches';
+    return next;
+  end loop;
+end;
+$$;
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+analyze bigger_than_it_looks;
+update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks';
+-- Make a relation whose size we underestimate and that also has a
+-- kind of skew that breaks our batching scheme.  We want stats to say
+-- 2 rows, but actually there are 20,000 rows with the same key.
+create table extremely_skewed (id int, t text);
+alter table extremely_skewed set (autovacuum_enabled = 'false');
+alter table extremely_skewed set (parallel_workers = 2);
+analyze extremely_skewed;
+insert into extremely_skewed
+  select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+  from generate_series(1, 20000);
+update pg_class
+  set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
+  where relname = 'extremely_skewed';
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+-- The "optimal" case: the hash table fits in memory; we plan for 1
+-- batch, we stick to that number, and peak memory usage stays within
+-- our work_mem budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                      QUERY PLAN                       
+-------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- The "good" case: batches required, but we plan the right number; we
+-- plan for some number of batches, and we stick to that number, and
+-- peak memory usage says within our work_mem budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                      QUERY PLAN                       
+-------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- The "bad" case: during execution we need to increase number of
+-- batches; in this case we plan for 1 batch, and increase at least a
+-- couple of times, and peak memory usage stays within our work_mem
+-- budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+                      QUERY PLAN                      
+------------------------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on bigger_than_it_looks s
+(6 rows)
+
+select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- The "ugly" case: increasing the number of batches during execution
+-- doesn't help, so stop trying to fit in work_mem and hope for the
+-- best; in this case we plan for 1 batch, increases just once and
+-- then stop increasing because that didn't help at all, so we blow
+-- right through the work_mem budget and hope for the best...
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                    QUERY PLAN                    
+--------------------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on extremely_skewed s
+(6 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     2
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                       QUERY PLAN                       
+--------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Hash
+                     ->  Seq Scan on extremely_skewed s
+(8 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     2
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                           QUERY PLAN                            
+-----------------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Parallel Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Parallel Hash
+                     ->  Parallel Seq Scan on extremely_skewed s
+(8 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     4
+(1 row)
+
+rollback to settings;
+-- A couple of other hash join tests unrelated to work_mem management.
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     1
+(1 row)
+
+rollback to settings;
+-- Exercise rescans.  We'll turn off parallel_leader_participation so
+-- that we can check that instrumentation comes back correctly.
+create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
+alter table join_foo set (parallel_workers = 0);
+create table join_bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
+alter table join_bar set (parallel_workers = 2);
+-- multi-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Hash
+                           ->  Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Hash
+                           ->  Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is matched.
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is not matched.
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: ((0 - s.id) = r.id)
+         ->  Seq Scan on simple s
+         ->  Hash
+               ->  Seq Scan on simple r
+(6 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: ((0 - s.id) = r.id)
+         ->  Seq Scan on simple s
+         ->  Hash
+               ->  Seq Scan on simple r
+(6 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Left Join
+                     Hash Cond: (wide.id = wide_1.id)
+                     ->  Parallel Seq Scan on wide
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on wide wide_1
+(9 rows)
+
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+ length 
+--------
+ 320000
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+SAVEPOINT settings;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+CREATE TABLE hjtest_matchbits_t2(id int);
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id
+  ORDER BY t1.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+ROLLBACK TO settings;
+rollback;
+-- Verify that hash key expressions reference the correct
+-- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
+-- need to reference Hash's outer plan (which is below HashJoin's
+-- inner plan). It's not trivial to verify that the references are
+-- correct (we don't display the hashkeys themselves), but if the
+-- hashkeys contain subplan references, those will be displayed. Force
+-- subplans to appear just about everywhere.
+--
+-- Bug report:
+-- https://www.postgresql.org/message-id/CAPpHfdvGVegF_TKKRiBrSmatJL2dR9uwFCuR%2BteQ_8tEXU8mxg%40mail.gmail.com
+--
+BEGIN;
+SET LOCAL enable_sort = OFF; -- avoid mergejoins
+SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
+CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 1, false); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 2); -- matches
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 3, 'another', 7); -- fails id join condition
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 90);  -- fails < 55
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 3); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'text', 1); --  fails hjtest_1.a <> hjtest_2.b;
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+                                           QUERY PLAN                                           
+------------------------------------------------------------------------------------------------
+ Hash Join
+   Output: hjtest_1.a, hjtest_2.a, (hjtest_1.tableoid)::regclass, (hjtest_2.tableoid)::regclass
+   Hash Cond: ((hjtest_1.id = (SubPlan 1)) AND ((SubPlan 2) = (SubPlan 3)))
+   Join Filter: (hjtest_1.a <> hjtest_2.b)
+   ->  Seq Scan on public.hjtest_1
+         Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+         Filter: ((SubPlan 4) < 50)
+         SubPlan 4
+           ->  Result
+                 Output: (hjtest_1.b * 5)
+   ->  Hash
+         Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+         ->  Seq Scan on public.hjtest_2
+               Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+               Filter: ((SubPlan 5) < 55)
+               SubPlan 5
+                 ->  Result
+                       Output: (hjtest_2.c * 5)
+         SubPlan 1
+           ->  Result
+                 Output: 1
+                 One-Time Filter: (hjtest_2.id = 1)
+         SubPlan 3
+           ->  Result
+                 Output: (hjtest_2.c * 5)
+   SubPlan 2
+     ->  Result
+           Output: (hjtest_1.b * 5)
+(28 rows)
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+  a1  | a2 |    t1    |    t2    
+------+----+----------+----------
+ text | t  | hjtest_1 | hjtest_2
+(1 row)
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+                                           QUERY PLAN                                           
+------------------------------------------------------------------------------------------------
+ Hash Join
+   Output: hjtest_1.a, hjtest_2.a, (hjtest_1.tableoid)::regclass, (hjtest_2.tableoid)::regclass
+   Hash Cond: (((SubPlan 1) = hjtest_1.id) AND ((SubPlan 3) = (SubPlan 2)))
+   Join Filter: (hjtest_1.a <> hjtest_2.b)
+   ->  Seq Scan on public.hjtest_2
+         Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+         Filter: ((SubPlan 5) < 55)
+         SubPlan 5
+           ->  Result
+                 Output: (hjtest_2.c * 5)
+   ->  Hash
+         Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+         ->  Seq Scan on public.hjtest_1
+               Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+               Filter: ((SubPlan 4) < 50)
+               SubPlan 4
+                 ->  Result
+                       Output: (hjtest_1.b * 5)
+         SubPlan 2
+           ->  Result
+                 Output: (hjtest_1.b * 5)
+   SubPlan 1
+     ->  Result
+           Output: 1
+           One-Time Filter: (hjtest_2.id = 1)
+   SubPlan 3
+     ->  Result
+           Output: (hjtest_2.c * 5)
+(28 rows)
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+  a1  | a2 |    t1    |    t2    
+------+----+----------+----------
+ text | t  | hjtest_1 | hjtest_2
+(1 row)
+
+ROLLBACK;
+-- Verify that we behave sanely when the inner hash keys contain parameters
+-- (that is, outer or lateral references).  This situation has to defeat
+-- re-use of the inner hash table across rescans.
+begin;
+set local enable_hashjoin = on;
+explain (costs off)
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Nested Loop
+   ->  Seq Scan on int8_tbl i8
+   ->  Sort
+         Sort Key: t1.fivethous, i4.f1
+         ->  Hash Join
+               Hash Cond: (t1.fivethous = (i4.f1 + i8.q2))
+               ->  Seq Scan on tenk1 t1
+               ->  Hash
+                     ->  Seq Scan on int4_tbl i4
+(9 rows)
+
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+ q2  | fivethous | f1 
+-----+-----------+----
+ 456 |       456 |  0
+ 456 |       456 |  0
+ 123 |       123 |  0
+ 123 |       123 |  0
+(4 rows)
+
+rollback;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index fbffc67ae60..d62d44814ef 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -15,7 +15,6 @@ test: test_setup
 # The first group of parallel tests
 # ----------
 test: boolean char name varchar text int2 int4 int8 oid float4 float8 bit numeric txid uuid enum money rangetypes pg_lsn regproc
-
 # ----------
 # The second group of parallel tests
 # multirangetypes depends on rangetypes
@@ -140,3 +139,6 @@ test: fast_default
 # run tablespace test at the end because it drops the tablespace created during
 # setup that other tests may use.
 test: tablespace
+
+# this test is equivalent to join_hash test just the compression is enabled
+test: join_hash_pglz
diff --git a/src/test/regress/sql/join_hash_lz4.sql b/src/test/regress/sql/join_hash_lz4.sql
new file mode 100644
index 00000000000..1d19c1980e1
--- /dev/null
+++ b/src/test/regress/sql/join_hash_lz4.sql
@@ -0,0 +1,626 @@
+--
+-- exercises for the hash join code
+--
+
+begin;
+
+set local min_parallel_table_scan_size = 0;
+set local parallel_setup_cost = 0;
+set local enable_hashjoin = on;
+set local temp_file_compression = 'lz4';
+
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function find_hash(node json)
+returns json language plpgsql
+as
+$$
+declare
+  x json;
+  child json;
+begin
+  if node->>'Node Type' = 'Hash' then
+    return node;
+  else
+    for child in select json_array_elements(node->'Plans')
+    loop
+      x := find_hash(child);
+      if x is not null then
+        return x;
+      end if;
+    end loop;
+    return null;
+  end if;
+end;
+$$;
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  whole_plan json;
+  hash_node json;
+begin
+  for whole_plan in
+    execute 'explain (analyze, format ''json'') ' || query
+  loop
+    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
+    original := hash_node->>'Original Hash Batches';
+    final := hash_node->>'Hash Batches';
+    return next;
+  end loop;
+end;
+$$;
+
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+analyze bigger_than_it_looks;
+update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks';
+
+-- Make a relation whose size we underestimate and that also has a
+-- kind of skew that breaks our batching scheme.  We want stats to say
+-- 2 rows, but actually there are 20,000 rows with the same key.
+create table extremely_skewed (id int, t text);
+alter table extremely_skewed set (autovacuum_enabled = 'false');
+alter table extremely_skewed set (parallel_workers = 2);
+analyze extremely_skewed;
+insert into extremely_skewed
+  select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+  from generate_series(1, 20000);
+update pg_class
+  set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
+  where relname = 'extremely_skewed';
+
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+
+-- The "optimal" case: the hash table fits in memory; we plan for 1
+-- batch, we stick to that number, and peak memory usage stays within
+-- our work_mem budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- The "good" case: batches required, but we plan the right number; we
+-- plan for some number of batches, and we stick to that number, and
+-- peak memory usage says within our work_mem budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- The "bad" case: during execution we need to increase number of
+-- batches; in this case we plan for 1 batch, and increase at least a
+-- couple of times, and peak memory usage stays within our work_mem
+-- budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- The "ugly" case: increasing the number of batches during execution
+-- doesn't help, so stop trying to fit in work_mem and hope for the
+-- best; in this case we plan for 1 batch, increases just once and
+-- then stop increasing because that didn't help at all, so we blow
+-- right through the work_mem budget and hope for the best...
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- A couple of other hash join tests unrelated to work_mem management.
+
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- Exercise rescans.  We'll turn off parallel_leader_participation so
+-- that we can check that instrumentation comes back correctly.
+
+create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
+alter table join_foo set (parallel_workers = 0);
+create table join_bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
+alter table join_bar set (parallel_workers = 2);
+
+-- multi-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- A full outer join where every record is matched.
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- A full outer join where every record is not matched.
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+rollback to settings;
+
+
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+SAVEPOINT settings;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+CREATE TABLE hjtest_matchbits_t2(id int);
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id
+  ORDER BY t1.id;
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id;
+ROLLBACK TO settings;
+
+rollback;
+
+-- Verify that hash key expressions reference the correct
+-- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
+-- need to reference Hash's outer plan (which is below HashJoin's
+-- inner plan). It's not trivial to verify that the references are
+-- correct (we don't display the hashkeys themselves), but if the
+-- hashkeys contain subplan references, those will be displayed. Force
+-- subplans to appear just about everywhere.
+--
+-- Bug report:
+-- https://www.postgresql.org/message-id/CAPpHfdvGVegF_TKKRiBrSmatJL2dR9uwFCuR%2BteQ_8tEXU8mxg%40mail.gmail.com
+--
+BEGIN;
+SET LOCAL enable_sort = OFF; -- avoid mergejoins
+SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
+
+CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 1, false); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 2); -- matches
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 3, 'another', 7); -- fails id join condition
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 90);  -- fails < 55
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 3); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'text', 1); --  fails hjtest_1.a <> hjtest_2.b;
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+ROLLBACK;
+
+-- Verify that we behave sanely when the inner hash keys contain parameters
+-- (that is, outer or lateral references).  This situation has to defeat
+-- re-use of the inner hash table across rescans.
+begin;
+set local enable_hashjoin = on;
+
+explain (costs off)
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+
+rollback;
diff --git a/src/test/regress/sql/join_hash_pglz.sql b/src/test/regress/sql/join_hash_pglz.sql
new file mode 100644
index 00000000000..2686afab272
--- /dev/null
+++ b/src/test/regress/sql/join_hash_pglz.sql
@@ -0,0 +1,626 @@
+--
+-- exercises for the hash join code
+--
+
+begin;
+
+set local min_parallel_table_scan_size = 0;
+set local parallel_setup_cost = 0;
+set local enable_hashjoin = on;
+set local temp_file_compression = 'pglz';
+
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function find_hash(node json)
+returns json language plpgsql
+as
+$$
+declare
+  x json;
+  child json;
+begin
+  if node->>'Node Type' = 'Hash' then
+    return node;
+  else
+    for child in select json_array_elements(node->'Plans')
+    loop
+      x := find_hash(child);
+      if x is not null then
+        return x;
+      end if;
+    end loop;
+    return null;
+  end if;
+end;
+$$;
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  whole_plan json;
+  hash_node json;
+begin
+  for whole_plan in
+    execute 'explain (analyze, format ''json'') ' || query
+  loop
+    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
+    original := hash_node->>'Original Hash Batches';
+    final := hash_node->>'Hash Batches';
+    return next;
+  end loop;
+end;
+$$;
+
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+analyze bigger_than_it_looks;
+update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks';
+
+-- Make a relation whose size we underestimate and that also has a
+-- kind of skew that breaks our batching scheme.  We want stats to say
+-- 2 rows, but actually there are 20,000 rows with the same key.
+create table extremely_skewed (id int, t text);
+alter table extremely_skewed set (autovacuum_enabled = 'false');
+alter table extremely_skewed set (parallel_workers = 2);
+analyze extremely_skewed;
+insert into extremely_skewed
+  select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+  from generate_series(1, 20000);
+update pg_class
+  set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
+  where relname = 'extremely_skewed';
+
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+
+-- The "optimal" case: the hash table fits in memory; we plan for 1
+-- batch, we stick to that number, and peak memory usage stays within
+-- our work_mem budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- The "good" case: batches required, but we plan the right number; we
+-- plan for some number of batches, and we stick to that number, and
+-- peak memory usage says within our work_mem budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- The "bad" case: during execution we need to increase number of
+-- batches; in this case we plan for 1 batch, and increase at least a
+-- couple of times, and peak memory usage stays within our work_mem
+-- budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- The "ugly" case: increasing the number of batches during execution
+-- doesn't help, so stop trying to fit in work_mem and hope for the
+-- best; in this case we plan for 1 batch, increases just once and
+-- then stop increasing because that didn't help at all, so we blow
+-- right through the work_mem budget and hope for the best...
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- A couple of other hash join tests unrelated to work_mem management.
+
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- Exercise rescans.  We'll turn off parallel_leader_participation so
+-- that we can check that instrumentation comes back correctly.
+
+create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
+alter table join_foo set (parallel_workers = 0);
+create table join_bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
+alter table join_bar set (parallel_workers = 2);
+
+-- multi-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- A full outer join where every record is matched.
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- A full outer join where every record is not matched.
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+rollback to settings;
+
+
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+SAVEPOINT settings;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+CREATE TABLE hjtest_matchbits_t2(id int);
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id
+  ORDER BY t1.id;
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id;
+ROLLBACK TO settings;
+
+rollback;
+
+-- Verify that hash key expressions reference the correct
+-- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
+-- need to reference Hash's outer plan (which is below HashJoin's
+-- inner plan). It's not trivial to verify that the references are
+-- correct (we don't display the hashkeys themselves), but if the
+-- hashkeys contain subplan references, those will be displayed. Force
+-- subplans to appear just about everywhere.
+--
+-- Bug report:
+-- https://www.postgresql.org/message-id/CAPpHfdvGVegF_TKKRiBrSmatJL2dR9uwFCuR%2BteQ_8tEXU8mxg%40mail.gmail.com
+--
+BEGIN;
+SET LOCAL enable_sort = OFF; -- avoid mergejoins
+SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
+
+CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 1, false); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 2); -- matches
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 3, 'another', 7); -- fails id join condition
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 90);  -- fails < 55
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 3); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'text', 1); --  fails hjtest_1.a <> hjtest_2.b;
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+ROLLBACK;
+
+-- Verify that we behave sanely when the inner hash keys contain parameters
+-- (that is, outer or lateral references).  This situation has to defeat
+-- re-use of the inner hash table across rescans.
+begin;
+set local enable_hashjoin = on;
+
+explain (costs off)
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+
+rollback;
-- 
2.51.0

