From 32a460c977384e580d38b58e3c5ac5359a7f56b9 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 24 Sep 2020 19:35:39 +0300
Subject: [PATCH v3 5/5] Allow pg_rewind to use a standby server as the source
 system.

Using a hot standby server as the source has not been possible, because
pg_rewind creates a temporary table in the source system, to hold the
list of file ranges that need to be fetched. Refactor it to queue up the
file fetch requests in pg_rewind's memory, so that the temporary table
is no longer needed.

Also update the logic to compute 'minRecoveryPoint' correctly, when the
source is a standby server.

Reviewed-by: Kyotaro Horiguchi, Soumyadeep Chakraborty
Discussion: https://www.postgresql.org/message-id/0c5b3783-af52-3ee5-f8fa-6e794061f70d%40iki.fi
---
 src/bin/pg_rewind/libpq_source.c | 277 ++++++++++++++++++++++---------
 src/bin/pg_rewind/pg_rewind.c    |  74 +++++++--
 2 files changed, 259 insertions(+), 92 deletions(-)

diff --git a/src/bin/pg_rewind/libpq_source.c b/src/bin/pg_rewind/libpq_source.c
index 30294d582ee..5e0f02911e0 100644
--- a/src/bin/pg_rewind/libpq_source.c
+++ b/src/bin/pg_rewind/libpq_source.c
@@ -14,30 +14,51 @@
 #include "datapagemap.h"
 #include "file_ops.h"
 #include "filemap.h"
+#include "lib/stringinfo.h"
 #include "pg_rewind.h"
 #include "port/pg_bswap.h"
 #include "rewind_source.h"
 
 /*
- * Files are fetched max CHUNKSIZE bytes at a time.
- *
- * (This only applies to files that are copied in whole, or for truncated
- * files where we copy the tail. Relation files, where we know the individual
- * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
+ * Files are fetched in chunks of at most MAX_CHUNK_SIZE bytes, with at most
+ * MAX_CHUNKS_PER_QUERY chunks requested in a single query.
  */
-#define CHUNKSIZE 1000000
+#define MAX_CHUNK_SIZE (1024 * 1024)
+#define MAX_CHUNKS_PER_QUERY 1000
+
+/* represents the request to fetch a piece of a file from the source */
+typedef struct
+{
+	const char *path;		/* path relative to data directory root */
+	off_t		offset;
+	size_t		length;
+} fetch_range_request;
 
 typedef struct
 {
 	rewind_source common;	/* common interface functions */
 
 	PGconn	   *conn;
-	bool		copy_started;
+
+	/*
+	 * Queue of chunks that have been requested with the queue_fetch_range()
+	 * function, but have not been fetched from the remote server yet.
+	 */
+	int			num_requests;
+	fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY];
+
+	/* temporary space for process_queued_fetch_requests() */
+	StringInfoData paths;
+	StringInfoData offsets;
+	StringInfoData lengths;
 } libpq_source;
 
 static void init_libpq_conn(PGconn *conn);
 static char *run_simple_query(PGconn *conn, const char *sql);
 static void run_simple_command(PGconn *conn, const char *sql);
+static void appendArrayEscapedString(StringInfo buf, const char *str);
+
+static void process_queued_fetch_requests(libpq_source *src);
 
 /* public interface functions */
 static void libpq_traverse_files(rewind_source *source,
@@ -74,6 +95,10 @@ init_libpq_source(PGconn *conn)
 
 	src->conn = conn;
 
+	initStringInfo(&src->paths);
+	initStringInfo(&src->offsets);
+	initStringInfo(&src->lengths);
+
 	return &src->common;
 }
 
@@ -91,6 +116,12 @@ init_libpq_conn(PGconn *conn)
 	run_simple_command(conn, "SET lock_timeout = 0");
 	run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
 
+	/*
+	 * we don't intend to do any updates, put the connection in read-only mode
+	 * to keep us honest
+	 */
+	run_simple_command(conn, "SET default_transaction_read_only = on");
+
 	/* secure search_path */
 	res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -98,17 +129,6 @@ init_libpq_conn(PGconn *conn)
 				 PQresultErrorMessage(res));
 	PQclear(res);
 
-	/*
-	 * Check that the server is not in hot standby mode. There is no
-	 * fundamental reason that couldn't be made to work, but it doesn't
-	 * currently because we use a temporary table. Better to check for it
-	 * explicitly than error out, for a better error message.
-	 */
-	str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
-	if (strcmp(str, "f") != 0)
-		pg_fatal("source server must not be in recovery mode");
-	pg_free(str);
-
 	/*
 	 * Also check that full_page_writes is enabled.  We can get torn pages if
 	 * a page is modified while we read it with pg_read_binary_file(), and we
@@ -118,6 +138,18 @@ init_libpq_conn(PGconn *conn)
 	if (strcmp(str, "on") != 0)
 		pg_fatal("full_page_writes must be enabled in the source server");
 	pg_free(str);
+
+	/* Prepare a statement we'll use to fetch files */
+	res = PQprepare(conn, "fetch_chunks_stmt",
+					"SELECT path, begin,\n"
+					"  pg_read_binary_file(path, begin, len, true) AS chunk\n"
+					"FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
+					3, NULL);
+
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("could not prepare statement to fetch file contents: %s",
+				 PQresultErrorMessage(res));
+	PQclear(res);
 }
 
 /*
@@ -283,94 +315,128 @@ libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
 						size_t len)
 {
 	libpq_source *src = (libpq_source *) source;
-	uint64		begin = off;
-	uint64		end = off + len;
 
 	/*
-	 * On first call, create a temporary table, and start COPYing to it.
-	 * We will load it with the list of blocks that we need to fetch.
+	 * Does this request happen to be a continuation of the previous chunk?
+	 * If so, merge it with the previous one.
+	 *
+	 * XXX: We use pointer equality to compare the path. That's good enough
+	 * for our purposes; the caller always passes the same pointer for the
+	 * same filename. If it didn't, we would fail to merge requests, but it
+	 * wouldn't affect correctness.
 	 */
-	if (!src->copy_started)
+	if (src->num_requests > 0)
 	{
-		PGresult   *res;
+		fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
 
-		run_simple_command(src->conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+		if (prev->offset + prev->length == off &&
+			prev->length < MAX_CHUNK_SIZE &&
+			prev->path == path)
+		{
+			/*
+			 * Extend the previous request to cover as much of this new request
+			 * as possible, without exceeding MAX_CHUNK_SIZE.
+			 */
+			size_t		thislen;
 
-		res = PQexec(src->conn, "COPY fetchchunks FROM STDIN");
-		if (PQresultStatus(res) != PGRES_COPY_IN)
-			pg_fatal("could not send file list: %s",
-					 PQresultErrorMessage(res));
-		PQclear(res);
+			thislen = Min(len, MAX_CHUNK_SIZE - prev->length);
+			prev->length += thislen;
 
-		src->copy_started = true;
-	}
+			off += thislen;
+			len -= thislen;
 
-	/*
-	 * Write the file range to a temporary table in the server.
-	 *
-	 * The range is sent to the server as a COPY formatted line, to be inserted
-	 * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
-	 * the temporary table to actually fetch the data.
-	 */
+			/*
+			 * Fall through to create new requests for any remaining 'len' that
+			 * didn't fit in the previous chunk.
+			 */
+		}
+	}
 
-	/* Split the range into CHUNKSIZE chunks */
-	while (end - begin > 0)
+	/* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
+	while (len > 0)
 	{
-		char		linebuf[MAXPGPATH + 23];
-		unsigned int len;
+		int32		thislen;
 
-		/* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
-		if (end - begin > CHUNKSIZE)
-			len = CHUNKSIZE;
-		else
-			len = (unsigned int) (end - begin);
-
-		snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
+		/* if the queue is full, perform all the work queued up so far */
+		if (src->num_requests == MAX_CHUNKS_PER_QUERY)
+			process_queued_fetch_requests(src);
 
-		if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
-			pg_fatal("could not send COPY data: %s",
-					 PQerrorMessage(src->conn));
+		thislen = Min(len, MAX_CHUNK_SIZE);
+		src->request_queue[src->num_requests].path = path;
+		src->request_queue[src->num_requests].offset = off;
+		src->request_queue[src->num_requests].length = thislen;
+		src->num_requests++;
 
-		begin += len;
+		off += thislen;
+		len -= thislen;
 	}
 }
 
 /*
- * Receive all the queued chunks and write them to the target data directory.
+ * Fetch all the queued chunks and write them to the target data directory.
  */
 static void
 libpq_finish_fetch(rewind_source *source)
 {
-	libpq_source *src = (libpq_source *) source;
+	process_queued_fetch_requests((libpq_source *) source);
+}
+
+/*
+ * Receive all the queued chunks and write them to the target data directory.
+ */
+static void
+process_queued_fetch_requests(libpq_source *src)
+{
+	const char *params[3];
 	PGresult   *res;
-	const char *sql;
+	int			chunkno;
 
-	if (PQputCopyEnd(src->conn, NULL) != 1)
-		pg_fatal("could not send end-of-COPY: %s",
-				 PQerrorMessage(src->conn));
+	if (src->num_requests == 0)
+		return;
 
-	while ((res = PQgetResult(src->conn)) != NULL)
+	pg_log_debug("getting %d file chunks", src->num_requests);
+
+	/*
+	 * The prepared statement, 'fetch_chunks_stmt', takes three arrays
+	 * with the same length as parameters: paths, offsets and lengths.
+	 * Construct the string representations of the parameter arrays.
+	 */
+	resetStringInfo(&src->paths);
+	resetStringInfo(&src->offsets);
+	resetStringInfo(&src->lengths);
+
+	appendStringInfoChar(&src->paths, '{');
+	appendStringInfoChar(&src->offsets, '{');
+	appendStringInfoChar(&src->lengths, '{');
+	for (int i = 0; i < src->num_requests; i++)
 	{
-		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			pg_fatal("unexpected result while sending file list: %s",
-					 PQresultErrorMessage(res));
-		PQclear(res);
+		fetch_range_request *rq = &src->request_queue[i];
+
+		if (i > 0)
+		{
+			appendStringInfoChar(&src->paths, ',');
+			appendStringInfoChar(&src->offsets, ',');
+			appendStringInfoChar(&src->lengths, ',');
+		}
+
+		appendArrayEscapedString(&src->paths, rq->path);
+		appendStringInfo(&src->offsets, INT64_FORMAT, (int64) rq->offset);
+		appendStringInfo(&src->lengths, INT64_FORMAT, (int64) rq->length);
 	}
+	appendStringInfoChar(&src->paths, '}');
+	appendStringInfoChar(&src->offsets, '}');
+	appendStringInfoChar(&src->lengths, '}');
 
 	/*
-	 * We've now copied the list of file ranges that we need to fetch to the
-	 * temporary table. Now, actually fetch all of those ranges.
+	 * Execute the prepared statement.
 	 */
-	sql =
-		"SELECT path, begin,\n"
-		"  pg_read_binary_file(path, begin, len, true) AS chunk\n"
-		"FROM fetchchunks\n";
+	params[0] = src->paths.data;
+	params[1] = src->offsets.data;
+	params[2] = src->lengths.data;
 
-	if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+	if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
 		pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
 
-	pg_log_debug("getting file chunks");
-
 	if (PQsetSingleRowMode(src->conn) != 1)
 		pg_fatal("could not set libpq connection to single row mode");
 
@@ -382,8 +448,10 @@ libpq_finish_fetch(rewind_source *source)
 	 * chunk	bytea	-- file content
 	 *----
 	 */
+	chunkno = 0;
 	while ((res = PQgetResult(src->conn)) != NULL)
 	{
+		fetch_range_request *rq = &src->request_queue[chunkno];
 		char	   *filename;
 		int			filenamelen;
 		int64		chunkoff;
@@ -404,6 +472,9 @@ libpq_finish_fetch(rewind_source *source)
 						 PQresultErrorMessage(res));
 		}
 
+		if (chunkno >= src->num_requests)	/* chunkno is a 0-based queue index */
+			pg_fatal("received more data chunks than requested");
+
 		/* sanity check the result set */
 		if (PQnfields(res) != 3 || PQntuples(res) != 1)
 			pg_fatal("unexpected result set size while fetching remote files");
@@ -448,9 +519,7 @@ libpq_finish_fetch(rewind_source *source)
 		 * If a file has been deleted on the source, remove it on the target
 		 * as well.  Note that multiple unlink() calls may happen on the same
 		 * file if multiple data chunks are associated with it, hence ignore
-		 * unconditionally anything missing.  If this file is not a relation
-		 * data file, then it has been already truncated when creating the
-		 * file chunk list at the previous execution of the filemap.
+		 * unconditionally anything missing.
 		 */
 		if (PQgetisnull(res, 0, 2))
 		{
@@ -465,6 +534,26 @@ libpq_finish_fetch(rewind_source *source)
 		pg_log_debug("received chunk for file \"%s\", offset " INT64_FORMAT ", size %d",
 					 filename, chunkoff, chunksize);
 
+		if (strcmp(filename, rq->path) != 0)
+		{
+			pg_fatal("received data for file \"%s\", when requested for \"%s\"",
+					 filename, rq->path);
+		}
+		if (chunkoff != rq->offset)
+			pg_fatal("received data at offset " INT64_FORMAT" of file \"%s\", when requested for offset " INT64_FORMAT,
+					 chunkoff, rq->path, (int64) rq->offset);
+
+		/*
+		 * We should not receive more data than we requested, or
+		 * pg_read_binary_file() messed up.  We could receive less, though, if
+		 * the file was truncated in the source after we checked its size.
+		 * That's OK, there should be a WAL record of the truncation, which
+		 * will get replayed when you start the target system for the first
+		 * time after pg_rewind has completed.
+		 */
+		if (chunksize > rq->length)
+			pg_fatal("received more than requested for file \"%s\"", rq->path);
+
 		open_target_file(filename, false);
 
 		write_target_range(chunk, chunkoff, chunksize);
@@ -472,7 +561,33 @@ libpq_finish_fetch(rewind_source *source)
 		pg_free(filename);
 
 		PQclear(res);
+		chunkno++;
+	}
+	if (chunkno != src->num_requests)
+		pg_fatal("unexpected number of data chunks received");
+
+	src->num_requests = 0;
+}
+
+/*
+ * Escape a string to be used as element in a text array constant
+ */
+static void
+appendArrayEscapedString(StringInfo buf, const char *str)
+{
+	appendStringInfoCharMacro(buf, '\"');
+	while (*str)
+	{
+		char		ch = *str;
+
+		if (ch == '"' || ch == '\\')
+			appendStringInfoCharMacro(buf, '\\');
+
+		appendStringInfoCharMacro(buf, ch);
+
+		str++;
 	}
+	appendStringInfoCharMacro(buf, '\"');
 }
 
 /*
@@ -521,6 +636,12 @@ libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
 static void
 libpq_destroy(rewind_source *source)
 {
-	pfree(source);
+	libpq_source *src = (libpq_source *) source;
+
+	pfree(src->paths.data);
+	pfree(src->offsets.data);
+	pfree(src->lengths.data);
+	pfree(src);
+
 	/* NOTE: we don't close the connection here, as it was not opened by us. */
 }
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 33042035424..036f24c332b 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -50,6 +50,7 @@ static void disconnect_atexit(void);
 
 static ControlFileData ControlFile_target;
 static ControlFileData ControlFile_source;
+static ControlFileData ControlFile_source_after;
 
 const char *progname;
 int			WalSegSz;
@@ -487,6 +488,8 @@ perform_rewind(filemap_t *filemap, rewind_source *source,
 	XLogRecPtr	endrec;
 	TimeLineID	endtli;
 	ControlFileData ControlFile_new;
+	size_t		size;
+	char	   *buffer;
 
 	/*
 	 * Execute the actions in the file map, fetching data from the source
@@ -553,40 +556,83 @@ perform_rewind(filemap_t *filemap, rewind_source *source,
 		}
 	}
 
-	/*
-	 * We've now copied the list of file ranges that we need to fetch to the
-	 * temporary table. Now, actually fetch all of those ranges.
-	 */
+	/* Complete any remaining range-fetches that we queued up above. */
 	source->finish_fetch(source);
 
 	close_target_file();
 
 	progress_report(true);
 
+	/*
+	 * Fetch the control file from the source last. This ensures that the
+	 * minRecoveryPoint is up-to-date.
+	 */
+	buffer = source->fetch_file(source, "global/pg_control", &size);
+	digestControlFile(&ControlFile_source_after, buffer, size);
+	pg_free(buffer);
+
+	/*
+	 * Sanity check: If the source is a local system, the control file should
+	 * not have changed since we started.
+	 *
+	 * XXX: We assume it hasn't been modified, but actually, what could go
+	 * wrong? The logic handles a libpq source that's modified concurrently,
+	 * why not a local datadir?
+	 */
+	if (datadir_source &&
+		memcmp(&ControlFile_source, &ControlFile_source_after,
+			   sizeof(ControlFileData)) != 0)
+	{
+		pg_fatal("source system was modified while pg_rewind was running");
+	}
+
 	if (showprogress)
 		pg_log_info("creating backup label and updating control file");
 	createBackupLabel(chkptredo, chkpttli, chkptrec);
 
 	/*
 	 * Update control file of target. Make it ready to perform archive
-	 * recovery when restarting.
+	 * recovery when restarting, starting from the last common checkpoint.
 	 *
-	 * minRecoveryPoint is set to the current WAL insert location in the
-	 * source server. Like in an online backup, it's important that we recover
-	 * all the WAL that was generated while we copied the files over.
+	 * Like in an online backup, it's important that we replay all the WAL
+	 * that was generated while we copied the files over. To enforce that,
+	 * set 'minRecoveryPoint' in the control file.
 	 */
-	memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData));
-
 	if (connstr_source)
 	{
-		endrec = source->get_current_wal_insert_lsn(source);
-		endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+		if (ControlFile_source_after.state == DB_IN_ARCHIVE_RECOVERY)
+		{
+			/*
+			 * Source is a standby server. We must replay to its
+			 * minRecoveryPoint.
+			 */
+			endrec = ControlFile_source_after.minRecoveryPoint;
+			endtli = ControlFile_source_after.minRecoveryPointTLI;
+		}
+		else
+		{
+			/*
+			 * Source is a production, non-standby, server. We must recover up
+			 * to the last WAL insert location.
+			 */
+			if (ControlFile_source_after.state != DB_IN_PRODUCTION)
+				pg_fatal("source system was in unexpected state at end of rewind");
+
+			endrec = source->get_current_wal_insert_lsn(source);
+			endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
+		}
 	}
 	else
 	{
-		endrec = ControlFile_source.checkPoint;
-		endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+		/*
+		 * Source is a local data directory. It should've shut down cleanly,
+		 * and we must replay to the latest shutdown checkpoint.
+		 */
+		endrec = ControlFile_source_after.checkPoint;
+		endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
 	}
+
+	memcpy(&ControlFile_new, &ControlFile_source_after, sizeof(ControlFileData));
 	ControlFile_new.minRecoveryPoint = endrec;
 	ControlFile_new.minRecoveryPointTLI = endtli;
 	ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
-- 
2.20.1

