From b4ac63f9f8d1739366021cee293b14231f753386 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Fri, 24 Sep 2021 00:42:04 +0200
Subject: [PATCH 3/4] Add support for decoding sequences to test_decoding

---
 contrib/test_decoding/Makefile              |   3 +-
 contrib/test_decoding/expected/sequence.out | 321 ++++++++++++++++++++
 contrib/test_decoding/sql/sequence.sql      | 119 ++++++++
 contrib/test_decoding/test_decoding.c       |  69 +++++
 4 files changed, 511 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/sequence.out
 create mode 100644 contrib/test_decoding/sql/sequence.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a31e0b879..56ddc3abae 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill slot truncate stream stats twophase twophase_stream
+	spill slot truncate stream stats twophase twophase_stream \
+	sequence
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 0000000000..17c88990b1
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,321 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_sequence;
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       3
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       4
+(1 row)
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+      14
+(1 row)
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    4000
+(1 row)
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, true);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, false);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3500
+(1 row)
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                              data                                              
+------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0
+ COMMIT
+ sequence public.test_sequence: transactional:0 created:0 last_value:33 log_cnt:0 is_called:1
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:4 log_cnt:0 is_called:1
+ COMMIT
+ sequence public.test_sequence: transactional:0 created:0 last_value:334 log_cnt:0 is_called:1
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:14 log_cnt:32 is_called:1
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:14 log_cnt:0 is_called:1
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:4000 log_cnt:0 is_called:0
+ COMMIT
+ sequence public.test_sequence: transactional:0 created:0 last_value:4320 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3500 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3830 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3500 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3830 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3500 log_cnt:0 is_called:0
+ sequence public.test_sequence: transactional:0 created:0 last_value:3820 log_cnt:0 is_called:1
+(24 rows)
+
+DROP SEQUENCE test_sequence;
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       3
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3001
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3002
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3003
+(1 row)
+
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    6001
+(1 row)
+
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data 
+------
+(0 rows)
+
+-- rollback on table creation with serial column 
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data 
+------
+(0 rows)
+
+-- rollback on table with serial column 
+CREATE TABLE test_table (a SERIAL, b INT);
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+-- check table and sequence values after rollback 
+SELECT * from test_table_a_seq;
+ last_value | log_cnt | is_called 
+------------+---------+-----------
+       3003 |      30 | t
+(1 row)
+
+SELECT nextval('test_table_a_seq');
+ nextval 
+---------
+    3004
+(1 row)
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                               data                                                
+---------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0
+ COMMIT
+ sequence public.test_table_a_seq: transactional:0 created:0 last_value:33 log_cnt:0 is_called:1
+ sequence public.test_table_a_seq: transactional:0 created:0 last_value:3000 log_cnt:0 is_called:1
+ sequence public.test_table_a_seq: transactional:0 created:0 last_value:3033 log_cnt:0 is_called:1
+ BEGIN
+ COMMIT
+(8 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                             data                             
+--------------------------------------------------------------
+ BEGIN
+ table public.test_table: INSERT: a[integer]:1 b[integer]:100
+ table public.test_table: INSERT: a[integer]:2 b[integer]:200
+ COMMIT
+(4 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3001
+(1 row)
+
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ setval 
+--------
+   5000
+(1 row)
+
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+ last_value | log_cnt | is_called 
+------------+---------+-----------
+       3001 |      32 | t
+(1 row)
+
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+  data  
+--------
+ BEGIN
+ COMMIT
+(2 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 0000000000..d8a34738f3
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,119 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_sequence;
+
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, true);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, false);
+SELECT nextval('test_sequence');
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+DROP SEQUENCE test_sequence;
+
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table creation with serial column 
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table with serial column 
+CREATE TABLE test_table (a SERIAL, b INT);
+
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+
+-- check table and sequence values after rollback 
+SELECT * from test_table_a_seq;
+SELECT nextval('test_table_a_seq');
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e5cd84e85e..45765d299e 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -35,6 +35,7 @@ typedef struct
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
 	bool		only_local;
+	bool		skip_sequences;
 } TestDecodingData;
 
 /*
@@ -76,6 +77,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+							  Relation rel, bool transactional, bool created,
+							  int64 last_value, int64 log_cnt, bool is_called);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
 									 TransactionId xid,
 									 const char *gid);
@@ -116,6 +121,10 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
 									 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 									 bool transactional, const char *prefix,
 									 Size sz, const char *message);
+static void pg_decode_stream_sequence(LogicalDecodingContext *ctx,
+									  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+									  Relation rel, bool transactional, bool created,
+									  int64 last_value, int64 log_cnt, bool is_called);
 static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn,
 									  int nrelations, Relation relations[],
@@ -141,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->sequence_cb = pg_decode_sequence;
 	cb->filter_prepare_cb = pg_decode_filter_prepare;
 	cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
 	cb->prepare_cb = pg_decode_prepare_txn;
@@ -153,6 +163,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_commit_cb = pg_decode_stream_commit;
 	cb->stream_change_cb = pg_decode_stream_change;
 	cb->stream_message_cb = pg_decode_stream_message;
+	cb->stream_sequence_cb = pg_decode_stream_sequence;
 	cb->stream_truncate_cb = pg_decode_stream_truncate;
 }
 
@@ -175,6 +186,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->skip_empty_xacts = false;
 	data->only_local = false;
 
+	/* skip sequences by default for backwards compatibility */
+	data->skip_sequences = true;
+
 	ctx->output_plugin_private = data;
 
 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
@@ -265,6 +279,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "skip-sequences") == 0)
+		{
+
+			if (elem->arg == NULL)
+				continue;	/* true by default */
+			else if (!parse_bool(strVal(elem->arg), &data->skip_sequences))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -744,6 +769,28 @@ pg_decode_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				   XLogRecPtr sequence_lsn, Relation rel,
+				   bool transactional, bool created,
+				   int64 last_value, int64 log_cnt, bool is_called)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* return if requested to skip_sequences */
+	if (data->skip_sequences)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfoString(ctx->out, "sequence ");
+	appendStringInfoString(ctx->out,
+						   quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+													  RelationGetRelationName(rel)));
+	appendStringInfo(ctx->out, 	": transactional:%d created:%d last_value:%zu log_cnt:%zu is_called:%d",
+					 transactional, created, last_value, log_cnt, is_called);
+	OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
@@ -943,6 +990,28 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+						  XLogRecPtr sequence_lsn, Relation rel,
+						  bool transactional, bool created,
+						  int64 last_value, int64 log_cnt, bool is_called)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* return if requested to skip_sequences */
+	if (data->skip_sequences)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfoString(ctx->out, "streaming sequence ");
+	appendStringInfoString(ctx->out,
+						   quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+													  RelationGetRelationName(rel)));
+	appendStringInfo(ctx->out, 	": transactional:%d created:%d last_value:%zu log_cnt:%zu is_called:%d",
+					 transactional, created, last_value, log_cnt, is_called);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * In streaming mode, we don't display the detailed information of Truncate.
  * See pg_decode_stream_change.
-- 
2.31.1

