From 0a9b98a857e8de02394dc8aefe365a72e50a222e Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Tue, 5 Dec 2017 13:32:45 +0800
Subject: [PATCH v1] Clean up reorder buffer files when starting logical
 decoding

We could fail to clean up reorder buffer files if the walsender exited due to a
client disconnect, because we'd skip both the normal exit and error paths.
---
 src/backend/replication/logical/reorderbuffer.c | 84 +++++++++++++++----------
 1 file changed, 50 insertions(+), 34 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index fa95bab..bc562e2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -196,6 +196,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 						   char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferCleanSerializedTXNs(const char *slotname);
 
 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@@ -214,7 +215,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
 
 
 /*
- * Allocate a new ReorderBuffer
+ * Allocate a new ReorderBuffer and clean out any old serialized state from
+ * prior ReorderBuffer instances for the same slot.
  */
 ReorderBuffer *
 ReorderBufferAllocate(void)
@@ -223,6 +225,8 @@ ReorderBufferAllocate(void)
 	HASHCTL		hash_ctl;
 	MemoryContext new_ctx;
 
+	Assert(MyReplicationSlot != NULL);
+
 	/* allocate memory in own context, to have better accountability */
 	new_ctx = AllocSetContextCreate(CurrentMemoryContext,
 									"ReorderBuffer",
@@ -266,6 +270,9 @@ ReorderBufferAllocate(void)
 
 	dlist_init(&buffer->toplevel_by_lsn);
 
+	/* Ensure there's no stale data from prior uses of this slot */
+	ReorderBufferCleanSerializedTXNs(NameStr(MyReplicationSlot->data.name));
+
 	return buffer;
 }
 
@@ -2551,6 +2558,47 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 }
 
 /*
+ * Remove any leftover serialized reorder buffers from a slot directory after a
+ * prior crash or decoding session exit.
+ */
+static void
+ReorderBufferCleanSerializedTXNs(const char *slotname)
+{
+	DIR		   *spill_dir;
+	struct dirent *spill_de;
+	struct stat statbuf;
+	char		path[MAXPGPATH * 2 + 12];
+
+	sprintf(path, "pg_replslot/%s", slotname);
+
+	/* we're only handling directories here, skip if it's not our's */
+	if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+		return;
+
+	spill_dir = AllocateDir(path);
+	while ((spill_de = ReadDir(spill_dir, path)) != NULL)
+	{
+		if (strcmp(spill_de->d_name, ".") == 0 ||
+			strcmp(spill_de->d_name, "..") == 0)
+			continue;
+
+		/* only look at names that can be ours */
+		if (strncmp(spill_de->d_name, "xid", 3) == 0)
+		{
+			sprintf(path, "pg_replslot/%s/%s", slotname,
+					spill_de->d_name);
+
+			if (unlink(path) != 0)
+				ereport(PANIC,
+						(errcode_for_file_access(),
+						 errmsg("could not remove file \"%s\": %m",
+								path)));
+		}
+	}
+	FreeDir(spill_dir);
+}
+
+/*
  * Delete all data spilled to disk after we've restarted/crashed. It will be
  * recreated when the respective slots are reused.
  */
@@ -2560,15 +2608,9 @@ StartupReorderBuffer(void)
 	DIR		   *logical_dir;
 	struct dirent *logical_de;
 
-	DIR		   *spill_dir;
-	struct dirent *spill_de;
-
 	logical_dir = AllocateDir("pg_replslot");
 	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
 	{
-		struct stat statbuf;
-		char		path[MAXPGPATH * 2 + 12];
-
 		if (strcmp(logical_de->d_name, ".") == 0 ||
 			strcmp(logical_de->d_name, "..") == 0)
 			continue;
@@ -2581,33 +2623,7 @@ StartupReorderBuffer(void)
 		 * ok, has to be a surviving logical slot, iterate and delete
 		 * everything starting with xid-*
 		 */
-		sprintf(path, "pg_replslot/%s", logical_de->d_name);
-
-		/* we're only creating directories here, skip if it's not our's */
-		if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
-			continue;
-
-		spill_dir = AllocateDir(path);
-		while ((spill_de = ReadDir(spill_dir, path)) != NULL)
-		{
-			if (strcmp(spill_de->d_name, ".") == 0 ||
-				strcmp(spill_de->d_name, "..") == 0)
-				continue;
-
-			/* only look at names that can be ours */
-			if (strncmp(spill_de->d_name, "xid", 3) == 0)
-			{
-				sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
-						spill_de->d_name);
-
-				if (unlink(path) != 0)
-					ereport(PANIC,
-							(errcode_for_file_access(),
-							 errmsg("could not remove file \"%s\": %m",
-									path)));
-			}
-		}
-		FreeDir(spill_dir);
+		ReorderBufferCleanSerializedTXNs(logical_de->d_name);
 	}
 	FreeDir(logical_dir);
 }
-- 
2.9.5

