logical replication of identity sequences

Started by Peter Eisentrautalmost 4 years ago1 messages
#1Peter Eisentraut
peter.eisentraut@enterprisedb.com
1 attachment(s)

Identity sequences shouldn't be addressed directly by name in normal
use. Therefore, requiring them to be added directly to publications is
a faulty interface. I think they should be considered included in a
publication automatically when their owning table is. See attached
patch for a sketch. (It doesn't actually work quite yet, but it shows
the idea, I think.)

If we end up keeping the logical replication of sequences feature, I
think something like this should be added, too.

Attachments:

v1-0001-Automatically-replicate-identity-sequences-with-t.patchtext/plain; charset=UTF-8; name=v1-0001-Automatically-replicate-identity-sequences-with-t.patchDownload
From ef7699a6f5ebe5d76f2f7ce14975b515a97df925 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter@eisentraut.org>
Date: Wed, 6 Apr 2022 14:47:36 +0200
Subject: [PATCH v1] Automatically replicate identity sequences with their
 tables

This automatically includes an identity sequence into a publication if
the owning table is part of the publication (directly or indirectly).
---
 src/backend/catalog/pg_publication.c        | 10 ++++++++++
 src/backend/replication/pgoutput/pgoutput.c | 18 ++++++++++++++++++
 src/test/subscription/t/030_sequences.pl    | 18 +++++++++++++++++-
 3 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9fe3b18926..0f9bf1a28f 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1392,6 +1392,7 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS)
 		{
 			List	   *relids,
 					   *schemarelids;
+			ListCell   *lc;
 
 			relids = GetPublicationRelations(publication->oid,
 											 PUB_OBJTYPE_SEQUENCE,
@@ -1404,6 +1405,15 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS)
 															PUBLICATION_PART_ROOT :
 															PUBLICATION_PART_LEAF);
 			sequences = list_concat_unique_oid(relids, schemarelids);
+
+			foreach(lc, GetPublicationRelations(publication->oid,
+												PUB_OBJTYPE_TABLE,
+												publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF))
+			{
+				Oid			relid = lfirst_oid(lc);
+
+				sequences = list_concat_unique_oid(sequences, getOwnedSequences(relid));
+			}
 		}
 
 		funcctx->user_fctx = (void *) sequences;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9d33630464..1044458a47 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/tupconvert.h"
+#include "catalog/dependency.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_namespace.h"
@@ -2206,6 +2207,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 					}
 				}
 
+				if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+				{
+					Oid			tableId;
+					int32		colId;
+
+					if (sequenceIsOwned(relid, DEPENDENCY_INTERNAL, &tableId, &colId))
+					{
+						if (GetRelationPublications(tableId) ||
+							GetSchemaPublications(get_rel_namespace(tableId),
+												  PUB_OBJTYPE_TABLE) ||
+							pub->alltables)
+						{
+							ancestor_published = true;
+						}
+					}
+				}
+
 				if (list_member_oid(pubids, pub->oid) ||
 					list_member_oid(schemaPubids, pub->oid) ||
 					ancestor_published)
diff --git a/src/test/subscription/t/030_sequences.pl b/src/test/subscription/t/030_sequences.pl
index 9ae3c03d7d..7b5b735bff 100644
--- a/src/test/subscription/t/030_sequences.pl
+++ b/src/test/subscription/t/030_sequences.pl
@@ -22,6 +22,7 @@
 my $ddl = qq(
 	CREATE TABLE seq_test (v BIGINT);
 	CREATE SEQUENCE s;
+	CREATE TABLE identity_test (v BIGINT GENERATED BY DEFAULT AS IDENTITY);
 );
 
 # Setup structure on the publisher
@@ -33,6 +34,7 @@
 	CREATE TABLE seq_test (v BIGINT);
 	CREATE SEQUENCE s;
 	CREATE SEQUENCE s2;
+	CREATE TABLE identity_test (v BIGINT GENERATED BY DEFAULT AS IDENTITY);
 );
 
 $node_subscriber->safe_psql('postgres', $ddl);
@@ -45,8 +47,14 @@
 $node_publisher->safe_psql('postgres',
 	"ALTER PUBLICATION seq_pub ADD SEQUENCE s");
 
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tab_pub");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tab_pub ADD TABLE identity_test");
+
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub"
+	"CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub, tab_pub"
 );
 
 $node_publisher->wait_for_catchup('seq_sub');
@@ -62,6 +70,7 @@
 	'postgres', qq(
 	-- generate a number of values using the sequence
 	INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100);
+	INSERT INTO identity_test SELECT * FROM generate_series(1,100);
 ));
 
 $node_publisher->wait_for_catchup('seq_sub');
@@ -75,6 +84,13 @@
 is( $result, '132|0|t',
 	'initial test data replicated');
 
+is($node_subscriber->safe_psql('postgres', qq(SELECT max(v) FROM identity_test)),
+	'100',
+	'identity table replicated');
+
+is($node_subscriber->safe_psql('postgres', qq(SELECT * FROM identity_test_v_seq)),
+	'100|0|t',
+	'identity sequence advanced');
 
 # advance the sequence in a rolled-back transaction - the rollback
 # does not wait for the replication, so we could see any intermediate state
-- 
2.35.1