From 43f63100b0e792a3f6e96c9c4f2218b27a731f52 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Thu, 18 Nov 2021 19:48:30 +0100
Subject: [PATCH 2/4] rework

---
 src/backend/access/rmgrdesc/logicalmsgdesc.c  |  13 ++
 src/backend/access/transam/xact.c             |   9 +
 src/backend/commands/sequence.c               | 180 +++++++++------
 src/backend/replication/logical/decode.c      | 104 +++------
 src/backend/replication/logical/message.c     |   3 +-
 .../replication/logical/reorderbuffer.c       | 213 ++++--------------
 src/include/commands/sequence.h               |   2 +
 src/include/replication/message.h             |  15 ++
 src/include/replication/reorderbuffer.h       |  10 +-
 9 files changed, 232 insertions(+), 317 deletions(-)

diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicalmsgdesc.c
index d64ce2e7ef..c84628cd54 100644
--- a/src/backend/access/rmgrdesc/logicalmsgdesc.c
+++ b/src/backend/access/rmgrdesc/logicalmsgdesc.c
@@ -40,6 +40,16 @@ logicalmsg_desc(StringInfo buf, XLogReaderState *record)
 			sep = " ";
 		}
 	}
+	else if (info == XLOG_LOGICAL_SEQUENCE)
+	{
+		xl_logical_sequence *xlrec = (xl_logical_sequence *) rec;
+
+		appendStringInfo(buf, "rel %u/%u/%u last: %lld log_cnt: %lld is_called: %d",
+						 xlrec->node.spcNode,
+						 xlrec->node.dbNode,
+						 xlrec->node.relNode,
+						 (long long) xlrec->last, (long long) xlrec->log_cnt, xlrec->is_called);
+	}
 }
 
 const char *
@@ -48,5 +58,8 @@ logicalmsg_identify(uint8 info)
 	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
 		return "MESSAGE";
 
+	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_SEQUENCE)
+		return "SEQUENCE";
+
 	return NULL;
 }
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8e35c432f5..eac8180250 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -36,6 +36,7 @@
 #include "catalog/storage.h"
 #include "commands/async.h"
 #include "commands/tablecmds.h"
+#include "commands/sequence.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
 #include "libpq/be-fsstubs.h"
@@ -2220,6 +2221,9 @@ CommitTransaction(void)
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
 
+	/* Write state of sequences to WAL */
+	AtEOXact_Sequences(true);
+
 	/* Commit updates to the relation map --- do this as late as possible */
 	AtEOXact_RelationMap(true, is_parallel_worker);
 
@@ -2378,6 +2382,8 @@ CommitTransaction(void)
  *	PrepareTransaction
  *
  * NB: if you change this routine, better look at CommitTransaction too!
+ *
+ * XXX Does this need to do something about logging of sequences?
  */
 static void
 PrepareTransaction(void)
@@ -2776,6 +2782,9 @@ AbortTransaction(void)
 	AtEOXact_RelationMap(false, is_parallel_worker);
 	AtAbort_Twophase();
 
+	/* XXX not sure we need to do anything about sequences at abort */
+	AtEOXact_Sequences(false);
+
 	/*
 	 * Advertise the fact that we aborted in pg_xact (assuming that we got as
 	 * far as assigning an XID to advertise).  But if we're inside a parallel
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index a98fcc2e97..92d1966f34 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -37,6 +37,7 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "parser/parse_type.h"
+#include "replication/message.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/smgr.h"
@@ -75,6 +76,7 @@ typedef struct SeqTableData
 {
 	Oid			relid;			/* pg_class OID of this sequence (hash key) */
 	Oid			filenode;		/* last seen relfilenode of this sequence */
+	Oid			tablespace;		/* last seen tablespace of this sequence */
 	LocalTransactionId lxid;	/* xact in which we last did a seq op */
 	bool		last_valid;		/* do we have a valid "last" value? */
 	int64		last;			/* value last returned by nextval */
@@ -82,6 +84,10 @@ typedef struct SeqTableData
 	/* if last != cached, we have not used up all the cached values */
 	int64		increment;		/* copy of sequence's increment field */
 	/* note that increment is zero until we first do nextval_internal() */
+	bool		need_log;		/* should be written to WAL at commit? */
+	bool		touched;		/* set by init_sequence(), see AtEOXact_Sequences */
+	int64		log_cnt;		/* log_cnt copied into xl_logical_sequence */
+	bool		is_called;		/* is_called copied into xl_logical_sequence */
 } SeqTableData;
 
 typedef SeqTableData *SeqTable;
@@ -94,7 +100,7 @@ static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
  */
 static SeqTableData *last_used_seq = NULL;
 
-static void fill_seq_with_data(Relation rel, HeapTuple tuple, bool create);
+static void fill_seq_with_data(Relation rel, HeapTuple tuple);
 static Relation lock_and_open_sequence(SeqTable seq);
 static void create_seq_hashtable(void);
 static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
@@ -222,7 +228,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
 
 	/* now initialize the sequence's data */
 	tuple = heap_form_tuple(tupDesc, value, null);
-	fill_seq_with_data(rel, tuple, true);
+	fill_seq_with_data(rel, tuple);
 
 	/* process OWNED BY if given */
 	if (owned_by)
@@ -327,12 +333,17 @@ ResetSequence(Oid seq_relid)
 	/*
 	 * Insert the modified tuple into the new storage file.
 	 */
-	fill_seq_with_data(seq_rel, tuple, true);
+	fill_seq_with_data(seq_rel, tuple);
 
 	/* Clear local cache so that we don't think we have cached numbers */
 	/* Note that we do not change the currval() state */
 	elm->cached = elm->last;
 
+	/* Remember we need to write the sequence to WAL at commit. */
+	elm->need_log = true;
+	elm->is_called = false;
+	elm->log_cnt = 0;
+
 	relation_close(seq_rel, NoLock);
 }
 
@@ -340,7 +351,7 @@ ResetSequence(Oid seq_relid)
  * Initialize a sequence's relation with the specified tuple as content
  */
 static void
-fill_seq_with_data(Relation rel, HeapTuple tuple, bool create)
+fill_seq_with_data(Relation rel, HeapTuple tuple)
 {
 	Buffer		buf;
 	Page		page;
@@ -380,27 +391,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple, bool create)
 	if (RelationNeedsWAL(rel))
 	{
 		GetTopTransactionId();
-
-		/*
-		 * Ensure we have a proper XID, which will be included in the XLOG
-		 * record by XLogRecordAssemble. Otherwise the first nextval() in
-		 * a subxact (without any preceding changes) would get XID 0,
-		 * and it'd be impossible to decide which top xact it belongs to.
-		 * It'd also trigger assert in DecodeSequence.
-		 *
-		 * XXX This might seem unnecessary, because if there's no XID the
-		 * transaction couldn't have done anything important yet, e.g. it
-		 * could not have created a sequence. But that's incorrect, as it
-		 * ignores subtransactions. The current subtransaction might not
-		 * have done anything yet (thus no XID), but an earlier one might
-		 * have created the sequence.
-		 *
-		 * XXX Not sure if this is the best solution. Maybe do this only
-		 * with wal_level=logical to minimize the overhead. OTOH advancing
-		 * the sequence is likely followed by using the value(s) for some
-		 * other activity, which assigns the XID.
-		 */
-		GetCurrentTransactionId();
+		// GetCurrentTransactionId();
 	}
 
 	START_CRIT_SECTION();
@@ -422,7 +413,6 @@ fill_seq_with_data(Relation rel, HeapTuple tuple, bool create)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.node = rel->rd_node;
-		xlrec.created = create;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -526,7 +516,11 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
 		/*
 		 * Insert the modified tuple into the new storage file.
 		 */
-		fill_seq_with_data(seqrel, newdatatuple, true);
+		fill_seq_with_data(seqrel, newdatatuple);
+
+		elm->need_log = true;
+		elm->is_called = false;
+		elm->log_cnt = 0;
 	}
 
 	/* process OWNED BY if given */
@@ -792,27 +786,7 @@ nextval_internal(Oid relid, bool check_permissions)
 	if (logit && RelationNeedsWAL(seqrel))
 	{
 		GetTopTransactionId();
-
-		/*
-		 * Ensure we have a proper XID, which will be included in the XLOG
-		 * record by XLogRecordAssemble. Otherwise the first nextval() in
-		 * a subxact (without any preceding changes) would get XID 0,
-		 * and it'd be impossible to decide which top xact it belongs to.
-		 * It'd also trigger assert in DecodeSequence.
-		 *
-		 * XXX This might seem unnecessary, because if there's no XID the
-		 * transaction couldn't have done anything important yet, e.g. it
-		 * could not have created a sequence. But that's incorrect, as it
-		 * ignores subtransactions. The current subtransaction might not
-		 * have done anything yet (thus no XID), but an earlier one might
-		 * have created the sequence.
-		 *
-		 * XXX Not sure if this is the best solution. Maybe do this only
-		 * with wal_level=logical to minimize the overhead. OTOH advancing
-		 * the sequence is likely followed by using the value(s) for some
-		 * other activity, which assigns the XID.
-		 */
-		GetCurrentTransactionId();
+		// GetCurrentTransactionId();
 	}
 
 	/* ready to change the on-disk (or really, in-buffer) tuple */
@@ -850,7 +824,6 @@ nextval_internal(Oid relid, bool check_permissions)
 		seq->log_cnt = 0;
 
 		xlrec.node = seqrel->rd_node;
-		xlrec.created = false;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -858,6 +831,10 @@ nextval_internal(Oid relid, bool check_permissions)
 		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
 
 		PageSetLSN(page, recptr);
+
+		elm->need_log = true;
+		elm->is_called = true;
+		elm->log_cnt = 0;
 	}
 
 	/* Now update sequence tuple to the intended final state */
@@ -1027,27 +1004,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
 	if (RelationNeedsWAL(seqrel))
 	{
 		GetTopTransactionId();
-
-		/*
-		 * Ensure we have a proper XID, which will be included in the XLOG
-		 * record by XLogRecordAssemble. Otherwise the first nextval() in
-		 * a subxact (without any preceding changes) would get XID 0,
-		 * and it'd be impossible to decide which top xact it belongs to.
-		 * It'd also trigger assert in DecodeSequence.
-		 *
-		 * XXX This might seem unnecessary, because if there's no XID the
-		 * transaction couldn't have done anything important yet, e.g. it
-		 * could not have created a sequence. But that's incorrect, as it
-		 * ignores subtransactions. The current subtransaction might not
-		 * have done anything yet (thus no XID), but an earlier one might
-		 * have created the sequence.
-		 *
-		 * XXX Not sure if this is the best solution. Maybe do this only
-		 * with wal_level=logical to minimize the overhead. OTOH advancing
-		 * the sequence is likely followed by using the value(s) for some
-		 * other activity, which assigns the XID.
-		 */
-		GetCurrentTransactionId();
+		// GetCurrentTransactionId();
 	}
 
 	/* ready to change the on-disk (or really, in-buffer) tuple */
@@ -1070,14 +1027,16 @@ do_setval(Oid relid, int64 next, bool iscalled)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.node = seqrel->rd_node;
-		xlrec.created = false;
-
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
 
 		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
 
 		PageSetLSN(page, recptr);
+
+		elm->need_log = true;			/* log this sequence at commit */
+		elm->is_called = iscalled;		/* honor caller's flag, not a constant */
+		elm->log_cnt = 0;
 	}
 
 	END_CRIT_SECTION();
@@ -1196,11 +1155,14 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
 	{
 		/* relid already filled in */
 		elm->filenode = InvalidOid;
+		elm->tablespace = InvalidOid;
 		elm->lxid = InvalidLocalTransactionId;
 		elm->last_valid = false;
 		elm->last = elm->cached = 0;
 	}
 
+	elm->touched = true;
+
 	/*
 	 * Open the sequence relation.
 	 */
@@ -1219,6 +1181,7 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
 	 */
 	if (seqrel->rd_rel->relfilenode != elm->filenode)
 	{
+		elm->tablespace = seqrel->rd_rel->reltablespace;
 		elm->filenode = seqrel->rd_rel->relfilenode;
 		elm->cached = elm->last;
 	}
@@ -1977,6 +1940,8 @@ seq_redo(XLogReaderState *record)
 
 /*
  * Flush cached sequence information.
+ *
+ * XXX Do we need to WAL-log the entries based on need_log?
  */
 void
 ResetSequenceCaches(void)
@@ -2000,3 +1965,74 @@ seq_mask(char *page, BlockNumber blkno)
 
 	mask_unused_space(page);
 }
+
+/*
+ * AtEOXact_Sequences
+ *		At commit, write an XLOG_LOGICAL_SEQUENCE record for each cached
+ *		sequence that is both touched and flagged need_log.  At abort, only
+ *		clear the touched flags; nothing is written.
+ *
+ * XXX Do this only for wal_level = logical, probably?
+ */
+void
+AtEOXact_Sequences(bool isCommit)
+{
+	SeqTable		entry;
+	HASH_SEQ_STATUS	scan;
+
+	if (!seqhashtab)
+		return;
+
+	/*
+	 * XXX If we run just nextval() that returns cached value, the xact may
+	 * not have XID, but we expect it in reorderbuffer. Maybe treat that as
+	 * transactional?  No XID is assigned here.
+	 */
+
+	hash_seq_init(&scan, seqhashtab);
+
+	while ((entry = (SeqTable) hash_seq_search(&scan)))
+	{
+		RelFileNode			rnode;
+		xl_logical_sequence	xlrec;
+
+		if (!isCommit)
+			entry->touched = false;
+
+		/*
+		 * Not touched in this transaction (or aborting): log nothing, but keep
+		 * need_log so a later transaction touching the sequence logs it.
+		 */
+		if (!entry->touched)
+			continue;
+
+		/* Likewise, if the sequence does not need logging, we're done. */
+		if (!entry->need_log)
+			continue;
+
+		/* we're going to log this sequence now, so clear both flags */
+		entry->need_log = false;
+		entry->touched = false;
+
+		rnode.spcNode = entry->tablespace;		/* tablespace */
+		rnode.dbNode = MyDatabaseId;			/* database */
+		rnode.relNode = entry->filenode;		/* relation */
+
+		xlrec.node = rnode;
+		xlrec.reloid = entry->relid;
+
+		/* XXX is it good enough to log values we have in cache? seems
+		 * wrong and we may need to re-read that. */
+		xlrec.last = entry->last;
+		xlrec.log_cnt = entry->log_cnt;
+		xlrec.is_called = entry->is_called;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfLogicalSequence);
+
+		/* allow origin filtering */
+		XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+		(void) XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_SEQUENCE);
+	}
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index b61f2e264a..d29844adf9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -59,6 +59,9 @@ static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
+static void DecodeLogicalMessage(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -75,11 +78,9 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						bool two_phase);
 static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						  xl_xact_parsed_prepare *parsed);
-static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
-static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
 
 /* helper functions for decoding transactions */
 static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -160,10 +161,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			DecodeLogicalMsgOp(ctx, &buf);
 			break;
 
-		case RM_SEQ_ID:
-			DecodeSequence(ctx, &buf);
-			break;
-
 			/*
 			 * Rmgrs irrelevant for logical decoding; they describe stuff not
 			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -626,6 +623,27 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
  */
 static void
 DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+
+	/*
+	 * RM_LOGICALMSG_ID carries two record types.  Generic logical messages
+	 * go to DecodeLogicalMessage(), sequence records go to
+	 * DecodeLogicalSequence(); any other info value is reported as an error.
+	 */
+	if (info == XLOG_LOGICAL_MESSAGE)
+		DecodeLogicalMessage(ctx, buf);
+	else if (info == XLOG_LOGICAL_SEQUENCE)
+		DecodeLogicalSequence(ctx, buf);
+	else
+	{
+		/* record type this decoder does not know about */
+		elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
+	}
+}
+
+static void
+DecodeLogicalMessage(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
@@ -1319,31 +1337,6 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			ctx->fast_forward || FilterByOrigin(ctx, origin_id));
 }
 
-/*
- * Decode Sequence Tuple
- */
-static void
-DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
-{
-	int			datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
-
-	Assert(datalen >= 0);
-
-	tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;;
-
-	ItemPointerSetInvalid(&tuple->tuple.t_self);
-
-	tuple->tuple.t_tableOid = InvalidOid;
-
-	memcpy(((char *) tuple->tuple.t_data),
-		   data + sizeof(xl_seq_rec),
-		   SizeofHeapTupleHeader);
-
-	memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
-		   data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
-		   datalen);
-}
-
 /*
  * Handle sequence decode
  *
@@ -1362,24 +1355,20 @@ DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
  * plugin - it might get confused about which sequence it's related to etc.
  */
 static void
-DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeLogicalSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
-	ReorderBufferTupleBuf *tuplebuf;
 	RelFileNode target_node;
 	XLogReaderState *r = buf->record;
-	char	   *tupledata = NULL;
-	Size		tuplelen;
-	Size		datalen = 0;
 	TransactionId xid = XLogRecGetXid(r);
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
-	xl_seq_rec *xlrec;
+	xl_logical_sequence *xlrec;
 	Snapshot	snapshot;
 	RepOriginId origin_id = XLogRecGetOrigin(r);
-	bool		transactional;
+	bool		transactional = TransactionIdIsValid(xid);
 
 	/* only decode changes flagged with XLOG_SEQ_LOG */
-	if (info != XLOG_SEQ_LOG)
+	if (info != XLOG_LOGICAL_SEQUENCE)
 		elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
 
 	/*
@@ -1390,8 +1379,11 @@ DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		ctx->fast_forward)
 		return;
 
+	/* extract the logical sequence WAL record */
+	xlrec = (xl_logical_sequence *) XLogRecGetData(r);
+
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	target_node = xlrec->node;
 	if (target_node.dbNode != ctx->slot->data.database)
 		return;
 
@@ -1399,44 +1391,18 @@ DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
-	tupledata = XLogRecGetData(r);
-	datalen = XLogRecGetDataLen(r);
-	tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
-
-	/* extract the WAL record, with "created" flag */
-	xlrec = (xl_seq_rec *) XLogRecGetData(r);
-
-	/* XXX how could we have sequence change without data? */
-	if(!datalen || !tupledata)
-		return;
-
-	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
-	DecodeSeqTuple(tupledata, datalen, tuplebuf);
-
-	/*
-	 * Should we handle the sequence increment as transactional or not?
-	 *
-	 * If the sequence was created in a still-running transaction, treat
-	 * it as transactional and queue the increments. Otherwise it needs
-	 * to be treated as non-transactional, in which case we send it to
-	 * the plugin right away.
-	 */
-	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
-														 target_node,
-														 xlrec->created);
-
 	/* Skip the change if already processed (per the snapshot). */
 	if (transactional &&
 		!SnapBuildProcessChange(builder, xid, buf->origptr))
 		return;
 	else if (!transactional &&
 			 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
-			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
+			 SnapBuildXactNeedsSkip(builder, buf->origptr)))
 		return;
 
 	/* Queue the increment (or send immediately if not transactional). */
 	snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
 	ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
-							   origin_id, target_node, transactional,
-							   xlrec->created, tuplebuf);
+							   origin_id, xlrec->reloid, target_node,
+							   xlrec->last, xlrec->log_cnt, xlrec->is_called);
 }
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
index 93bd372421..3f0ac57fc6 100644
--- a/src/backend/replication/logical/message.c
+++ b/src/backend/replication/logical/message.c
@@ -81,7 +81,8 @@ logicalmsg_redo(XLogReaderState *record)
 {
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-	if (info != XLOG_LOGICAL_MESSAGE)
+	if (info != XLOG_LOGICAL_MESSAGE &&
+		info != XLOG_LOGICAL_SEQUENCE)
 		elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
 
 	/* This is only interesting for logical decoding, see decode.c. */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 434926459f..1d9a39ac4b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -569,17 +569,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 				change->data.truncate.relids = NULL;
 			}
 			break;
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			if (change->data.sequence.tuple)
-			{
-				ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
-				change->data.sequence.tuple = NULL;
-			}
-			break;
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
 			break;
 	}
 
@@ -973,9 +967,10 @@ ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
 void
 ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
-						   RelFileNode rnode, bool transactional, bool created,
-						   ReorderBufferTupleBuf *tuplebuf)
+						   Oid reloid, RelFileNode rnode, int64 last, int64 log_cnt, bool is_called)
 {
+	bool transactional = TransactionIdIsValid(xid);
+
 	/*
 	 * Change needs to be handled as transactional, because the sequence was
 	 * created in a transaction that is still running. In that case all the
@@ -992,38 +987,6 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 		MemoryContext oldcontext;
 		ReorderBufferChange *change;
 
-		/* lookup sequence by relfilenode */
-		ReorderBufferSequenceEnt   *ent;
-		bool						found;
-
-		/* transactional changes require a transaction */
-		Assert(xid != InvalidTransactionId);
-
-		/* search the lookup table (we ignore the return value, found is enough) */
-		ent = hash_search(rb->sequences,
-						  (void *) &rnode,
-						  created ? HASH_ENTER : HASH_FIND,
-						  &found);
-
-		/*
-		 * If this is the "create" increment, we must not have found any
-		 * pre-existing entry in the hash table (i.e. there must not be
-		 * any conflicting sequence).
-		 */
-		Assert(!(created && found));
-
-		/* But we must have either created or found an existing entry. */
-		Assert(created || found);
-
-		/*
-		 * When creating the sequence, remember the XID of the transaction
-		 * that created id.
-		 */
-		if (created)
-			ent->xid = xid;
-
-		/* XXX Maybe check that we're still in the same top-level xact? */
-
 		/* OK, allocate and queue the change */
 		oldcontext = MemoryContextSwitchTo(rb->context);
 
@@ -1034,38 +997,26 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 
 		memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
 
-		change->data.sequence.created = created;
-		change->data.sequence.tuple = tuplebuf;
+		change->data.sequence.reloid = reloid;
+		change->data.sequence.last = last;
+		change->data.sequence.log_cnt = log_cnt;
+		change->data.sequence.is_called = is_called;
 
 		/* add it to the same subxact that created the sequence */
-		ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+		ReorderBufferQueueChange(rb, xid, lsn, change, false);
 
 		MemoryContextSwitchTo(oldcontext);
 	}
 	else
 	{
 		/*
-		 * This increment is for a sequence that was not created in any
-		 * running transaction, so we treat it as non-transactional and
-		 * just send it to the output plugin directly.
-		 */
+		* This increment is for a sequence that was not created in any
+		* running transaction, so we treat it as non-transactional and
+		* just send it to the output plugin directly.
+		*/
 		ReorderBufferTXN *txn = NULL;
 		volatile Snapshot snapshot_now = snapshot;
-		bool	using_subtxn;
-
-#ifdef USE_ASSERT_CHECKING
-		/* All "creates" have to be handled as transactional. */
-		Assert(!created);
-
-		/* Make sure the sequence is not in the hash table. */
-		{
-			bool	found;
-			hash_search(rb->sequences,
-						(void *) &rnode,
-						HASH_FIND, &found);
-			Assert(!found);
-		}
-#endif
+		bool    using_subtxn;
 
 		if (xid != InvalidTransactionId)
 			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
@@ -1074,40 +1025,35 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 		SetupHistoricSnapshot(snapshot_now, NULL);
 
 		/*
-		 * Decoding needs access to syscaches et al., which in turn use
-		 * heavyweight locks and such. Thus we need to have enough state around to
-		 * keep track of those.  The easiest way is to simply use a transaction
-		 * internally.  That also allows us to easily enforce that nothing writes
-		 * to the database by checking for xid assignments.
-		 *
-		 * When we're called via the SQL SRF there's already a transaction
-		 * started, so start an explicit subtransaction there.
-		 */
+		* Decoding needs access to syscaches et al., which in turn use
+		* heavyweight locks and such. Thus we need to have enough state around to
+		* keep track of those.  The easiest way is to simply use a transaction
+		* internally.  That also allows us to easily enforce that nothing writes
+		* to the database by checking for xid assignments.
+		*
+		* When we're called via the SQL SRF there's already a transaction
+		* started, so start an explicit subtransaction there.
+		*/
 		using_subtxn = IsTransactionOrTransactionBlock();
 
 		PG_TRY();
 		{
 			Relation	relation;
-			HeapTuple	tuple;
-			bool		isnull;
-			int64		last_value, log_cnt;
-			bool		is_called;
-			Oid			reloid;
+//			Oid			reloid;
 
 			if (using_subtxn)
 				BeginInternalSubTransaction("sequence");
 			else
 				StartTransactionCommand();
-
+/*
 			reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
 
 			if (reloid == InvalidOid)
 				elog(ERROR, "could not map filenode \"%s\" to relation OID",
-					 relpathperm(rnode,
-								 MAIN_FORKNUM));
-
+					relpathperm(rnode,
+								MAIN_FORKNUM));
+*/
 			relation = RelationIdGetRelation(reloid);
-			tuple = &tuplebuf->tuple;
 
 			/*
 			 * Extract the internal sequence values, describing the state.
@@ -1115,12 +1061,8 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 			 * XXX Seems a bit strange to access it directly. Maybe there's
 			 * a better / more correct way?
 			 */
-			last_value = heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull);
-			log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull);
-			is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull);
-
-			rb->sequence(rb, txn, lsn, relation, transactional, created,
-						 last_value, log_cnt, is_called);
+			rb->sequence(rb, txn, lsn, relation, transactional, false,
+						last, log_cnt, is_called);
 
 			RelationClose(relation);
 
@@ -2248,31 +2190,27 @@ ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
 						   Relation relation, ReorderBufferChange *change,
 						   bool streaming)
 {
-	HeapTuple	tuple;
-	bool		isnull;
 	int64		last_value, log_cnt;
 	bool		is_called;
 
-	tuple = &change->data.sequence.tuple->tuple;
-
 	/*
 	 * Extract the internal sequence values, describing the state.
 	 *
 	 * XXX Seems a bit strange to access it directly. Maybe there's
 	 * a better / more correct way?
 	 */
-	last_value = heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull);
-	log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull);
-	is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull);
+	last_value = change->data.sequence.last;
+	log_cnt = change->data.sequence.log_cnt;
+	is_called = change->data.sequence.is_called;
 
 	/* Only ever called from ReorderBufferApplySequence, so transational. */
 	if (streaming)
 		rb->stream_sequence(rb, txn, change->lsn, relation, true,
-							change->data.sequence.created,
+							false,	/* FIXME */
 							last_value, log_cnt, is_called);
 	else
 		rb->sequence(rb, txn, change->lsn, relation, true,
-					 change->data.sequence.created,
+					 false,	/* FIXME */
 					 last_value, log_cnt, is_called);
 }
 
@@ -2722,15 +2660,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_SEQUENCE:
 					Assert(snapshot_now);
 
+					/*
 					reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
 												change->data.sequence.relnode.relNode);
 
 					if (reloid == InvalidOid)
-						elog(ERROR, "could not map filenode \"%s\" to relation OID",
+						elog(ERROR, "zz could not map filenode \"%s\" to relation OID",
 							 relpathperm(change->data.sequence.relnode,
 										 MAIN_FORKNUM));
 
-					relation = RelationIdGetRelation(reloid);
+					*/
+					relation = RelationIdGetRelation(change->data.sequence.reloid);
 
 					if (!RelationIsValid(relation))
 						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
@@ -4127,45 +4067,13 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				memcpy(data, change->data.truncate.relids, size);
 				data += size;
 
-				break;
-			}
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			{
-				char	   *data;
-				ReorderBufferTupleBuf *tup;
-				Size		len = 0;
-
-				tup = change->data.sequence.tuple;
-
-				if (tup)
-				{
-					sz += sizeof(HeapTupleData);
-					len = tup->tuple.t_len;
-					sz += len;
-				}
-
-				/* make sure we have enough space */
-				ReorderBufferSerializeReserve(rb, sz);
-
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
-				if (len)
-				{
-					memcpy(data, &tup->tuple, sizeof(HeapTupleData));
-					data += sizeof(HeapTupleData);
-
-					memcpy(data, tup->tuple.t_data, len);
-					data += len;
-				}
-
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
 			/* ReorderBufferChange contains everything important */
 			break;
 	}
@@ -4424,28 +4332,13 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 			{
 				sz += sizeof(Oid) * change->data.truncate.nrelids;
 
-				break;
-			}
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			{
-				ReorderBufferTupleBuf *tup;
-				Size		len = 0;
-
-				tup = change->data.sequence.tuple;
-
-				if (tup)
-				{
-					sz += sizeof(HeapTupleData);
-					len = tup->tuple.t_len;
-					sz += len;
-				}
-
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
 			/* ReorderBufferChange contains everything important */
 			break;
 	}
@@ -4742,33 +4635,11 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				break;
 			}
 
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			if (change->data.sequence.tuple)
-			{
-				uint32		tuplelen = ((HeapTuple) data)->t_len;
-
-				change->data.sequence.tuple =
-					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
-
-				/* restore ->tuple */
-				memcpy(&change->data.sequence.tuple->tuple, data,
-					   sizeof(HeapTupleData));
-				data += sizeof(HeapTupleData);
-
-				/* reset t_data pointer into the new tuplebuf */
-				change->data.sequence.tuple->tuple.t_data =
-					ReorderBufferTupleBufData(change->data.sequence.tuple);
-
-				/* restore tuple data itself */
-				memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
-				data += tuplelen;
-			}
-			break;
-
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
 			break;
 	}
 
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 5919fb90ee..ddb36b5b17 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -62,6 +62,8 @@ extern void DeleteSequenceTuple(Oid relid);
 extern void ResetSequence(Oid seq_relid);
 extern void ResetSequenceCaches(void);
 
+extern void AtEOXact_Sequences(bool isCommit);
+
 extern void seq_redo(XLogReaderState *rptr);
 extern void seq_desc(StringInfo buf, XLogReaderState *rptr);
 extern const char *seq_identify(uint8 info);
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
index d3fb324c81..a39e8e984d 100644
--- a/src/include/replication/message.h
+++ b/src/include/replication/message.h
@@ -27,13 +27,28 @@ typedef struct xl_logical_message
 	char		message[FLEXIBLE_ARRAY_MEMBER];
 } xl_logical_message;
 
+/*
+ * WAL record describing a sequence state change, for logical decoding.
+ */
+typedef struct xl_logical_sequence
+{
+	RelFileNode	node;
+	Oid			reloid;
+	int64		last;			/* last value emitted for sequence */
+	int64		log_cnt;		/* pre-logged fetches remaining for sequence */
+	bool		is_called;
+} xl_logical_sequence;
+
 #define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
+#define SizeOfLogicalSequence	(sizeof(xl_logical_sequence))
 
 extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
 									size_t size, bool transactional);
 
 /* RMGR API*/
 #define XLOG_LOGICAL_MESSAGE	0x00
+#define XLOG_LOGICAL_SEQUENCE	0x10
+
 void		logicalmsg_redo(XLogReaderState *record);
 void		logicalmsg_desc(StringInfo buf, XLogReaderState *record);
 const char *logicalmsg_identify(uint8 info);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 63e6ed037b..eb78d5dff4 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -164,8 +164,10 @@ typedef struct ReorderBufferChange
 		struct
 		{
 			RelFileNode relnode;
-			bool		created;
-			ReorderBufferTupleBuf *tuple;
+			Oid			reloid;
+			int64		last;
+			int64		log_cnt;
+			bool		is_called;
 		}			sequence;
 	}			data;
 
@@ -672,8 +674,8 @@ void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho
 									  Size message_size, const char *message);
 void		ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 									   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
-									   RelFileNode rnode, bool transactional, bool created,
-									   ReorderBufferTupleBuf *tuplebuf);
+									   Oid reloid, RelFileNode rnode,
+									   int64 last, int64 log_cnt, bool is_called);
 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
-- 
2.31.1

