From 9575133ca5da2b2f6827828cb0c26c6122b328f9 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:43 +0300
Subject: [PATCH v3 5/5] Allow pg_rewind to use a standby server as the source
 system.

Using a hot standby server as the source has not been possible, because
pg_rewind creates a temporary table in the source system, to hold the
list of file ranges that need to be fetched. Refactor it to queue up the
file fetch requests in pg_rewind's memory, so that the temporary table
is no longer needed.

Also update the logic to compute 'minRecoveryPoint' correctly, when the
source is a standby server.
---
 src/bin/pg_rewind/libpq_fetch.c | 281 ++++++++++++++++++++++----------
 src/bin/pg_rewind/pg_rewind.c   |  67 ++++++--
 2 files changed, 253 insertions(+), 95 deletions(-)

diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index 52c4e147e1..f61a424299 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -15,39 +15,61 @@
 #include "fetch.h"
 #include "file_ops.h"
 #include "filemap.h"
+#include "lib/stringinfo.h"
 #include "pg_rewind.h"
 #include "port/pg_bswap.h"
 
 /*
- * Files are fetched max CHUNKSIZE bytes at a time.
- *
- * (This only applies to files that are copied in whole, or for truncated
- * files where we copy the tail. Relation files, where we know the individual
- * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
+ * Files are fetched max CHUNK_SIZE bytes at a time, and with a
+ * maximum of MAX_CHUNKS_PER_QUERY chunks in a single query.
  */
-#define CHUNKSIZE 1000000
+#define CHUNK_SIZE (1024 * 1024)
+#define MAX_CHUNKS_PER_QUERY 1000
+
+/* represents the request to fetch a piece of a file from the source */
+typedef struct
+{
+	const char *path;		/* path relative to data directory root */
+	uint64		offset;
+	uint32		length;
+} fetch_range_request;
 
 typedef struct
 {
 	rewind_source common;	/* common interface functions */
 
 	PGconn	   *conn;
+
+	/*
+	 * Queue of chunks that have been requested with the queue_fetch_range()
+	 * function, but have not been fetched from the remote server yet.
+	 */
+	int			num_requests;
+	fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY];
+
+	/* temporary space for process_queued_fetch_requests() */
+	StringInfoData paths;
+	StringInfoData offsets;
+	StringInfoData lengths;
 } libpq_source;
 
 static void init_libpq_conn(PGconn *conn);
 static char *run_simple_query(PGconn *conn, const char *sql);
 static void run_simple_command(PGconn *conn, const char *sql);
+static void appendArrayEscapedString(StringInfo buf, const char *str);
+
+static void process_queued_fetch_requests(libpq_source *src);
 
 /* public interface functions */
 static void libpq_traverse_files(rewind_source *source,
 								 process_file_callback_t callback);
-static char *libpq_fetch_file(rewind_source *source, const char *path,
-							  size_t *filesize);
 static void libpq_queue_fetch_range(rewind_source *source, const char *path,
 									uint64 off, size_t len);
 static void libpq_finish_fetch(rewind_source *source);
-static void libpq_destroy(rewind_source *source);
+static char *libpq_fetch_file(rewind_source *source, const char *path,
+							  size_t *filesize);
 static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
+static void libpq_destroy(rewind_source *source);
 
 /*
  * Create a new libpq source.
@@ -73,6 +95,10 @@ init_libpq_source(PGconn *conn)
 
 	src->conn = conn;
 
+	initStringInfo(&src->paths);
+	initStringInfo(&src->offsets);
+	initStringInfo(&src->lengths);
+
 	return &src->common;
 }
 
@@ -90,7 +116,10 @@ init_libpq_conn(PGconn *conn)
 	run_simple_command(conn, "SET lock_timeout = 0");
 	run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
 
-	/* we don't intend do any updates. Put the connection in read-only mode to keep us honest */
+	/*
+	 * We don't intend to do any updates.  Put the connection in read-only mode
+	 * to keep us honest.
+	 */
 	run_simple_command(conn, "SET default_transaction_read_only = on");
 
 	/* secure search_path */
@@ -100,17 +129,6 @@ init_libpq_conn(PGconn *conn)
 				 PQresultErrorMessage(res));
 	PQclear(res);
 
-	/*
-	 * Check that the server is not in hot standby mode. There is no
-	 * fundamental reason that couldn't be made to work, but it doesn't
-	 * currently because we use a temporary table. Better to check for it
-	 * explicitly than error out, for a better error message.
-	 */
-	str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
-	if (strcmp(str, "f") != 0)
-		pg_fatal("source server must not be in recovery mode");
-	pg_free(str);
-
 	/*
 	 * Also check that full_page_writes is enabled.  We can get torn pages if
 	 * a page is modified while we read it with pg_read_binary_file(), and we
@@ -121,15 +139,15 @@ init_libpq_conn(PGconn *conn)
 		pg_fatal("full_page_writes must be enabled in the source server");
 	pg_free(str);
 
-	/*
-	 * First create a temporary table, and COPY to load it with the list of
-	 * blocks that we need to fetch.
-	 */
-	run_simple_command(conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+	/* Prepare a statement we'll use to fetch files */
+	res = PQprepare(conn, "fetch_chunks_stmt",
+					"SELECT path, begin,\n"
+					"  pg_read_binary_file(path, begin, len, true) AS chunk\n"
+					"FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
+					3, NULL);
 
-	res = PQexec(conn, "COPY fetchchunks FROM STDIN");
-	if (PQresultStatus(res) != PGRES_COPY_IN)
-		pg_fatal("could not send file list: %s",
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("could not prepare statement to fetch file contents: %s",
 				 PQresultErrorMessage(res));
 	PQclear(res);
 }
@@ -297,91 +315,143 @@ libpq_queue_fetch_range(rewind_source *source, const char *path, uint64 off,
 						size_t len)
 {
 	libpq_source *src = (libpq_source *) source;
-	uint64		begin = off;
-	uint64		end = off + len;
 
 	/*
-	 * Write the file range to a temporary table in the server.
+	 * Does this request happen to be a continuation of the previous chunk?
+	 * If so, merge it with the previous one.
 	 *
-	 * The range is sent to the server as a COPY formatted line, to be inserted
-	 * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
-	 * the temporary table to actually fetch the data.
+	 * XXX: We use pointer equality to compare the path. That's good enough
+	 * for our purposes; the caller always passes the same pointer for the
+	 * same filename. If it didn't, we would fail to merge requests, but it
+	 * wouldn't affect correctness.
 	 */
-
-	/* Split the range into CHUNKSIZE chunks */
-	while (end - begin > 0)
+	if (src->num_requests > 0)
 	{
-		char		linebuf[MAXPGPATH + 23];
-		unsigned int len;
+		fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
+
+		if (prev->offset + prev->length == off &&
+			prev->length < CHUNK_SIZE &&
+			prev->path == path)
+		{
+			/*
+			 * Extend the previous request to cover as much of this new request
+			 * as possible, without exceeding CHUNK_SIZE.
+			 */
+			int32		thislen;
+
+			thislen = Min(len, CHUNK_SIZE - prev->length);
+			src->request_queue[src->num_requests - 1].length += thislen;
 
-		/* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
-		if (end - begin > CHUNKSIZE)
-			len = CHUNKSIZE;
-		else
-			len = (unsigned int) (end - begin);
+			off += thislen;
+			len -= thislen;
+
+			/*
+			 * Fall through to create new requests for any remaining 'len' that
+			 * didn't fit in the previous chunk.
+			 */
+		}
+	}
+
+	/* Divide the request into pieces of CHUNK_SIZE bytes each */
+	while (len > 0)
+	{
+		int32		thislen;
 
-		begin += len;
+		/* if the queue is full, perform all the work queued up so far */
+		if (src->num_requests == MAX_CHUNKS_PER_QUERY)
+			process_queued_fetch_requests(src);
 
-		snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
+		thislen = Min(len, CHUNK_SIZE);
+		src->request_queue[src->num_requests].path = path;
+		src->request_queue[src->num_requests].offset = off;
+		src->request_queue[src->num_requests].length = thislen;
+		src->num_requests++;
 
-		if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
-			pg_fatal("could not send COPY data: %s",
-					 PQerrorMessage(src->conn));
+		off += thislen;
+		len -= thislen;
 	}
 }
 
-
-/*----
- * Runs a query, which returns pieces of files from the remote source data
- * directory, and overwrites the corresponding parts of target files with
- * the received parts. The result set is expected to be of format:
- *
- * path		text	-- path in the data directory, e.g "base/1/123"
- * begin	int8	-- offset within the file
- * chunk	bytea	-- file content
- *----
- */
 /*
- * Receive all the queued chunks and write them to the target data directory.
+ * Fetch all the queued chunks and write them to the target data directory.
  */
 static void
 libpq_finish_fetch(rewind_source *source)
 {
-	libpq_source *src = (libpq_source *) source;
+	process_queued_fetch_requests((libpq_source *) source);
+}
+
+/*
+ * Receive all the queued chunks and write them to the target data directory.
+ */
+static void
+process_queued_fetch_requests(libpq_source *src)
+{
+	const char *params[3];
 	PGresult   *res;
-	const char *sql;
+	int			chunkno;
 
-	if (PQputCopyEnd(src->conn, NULL) != 1)
-		pg_fatal("could not send end-of-COPY: %s",
-				 PQerrorMessage(src->conn));
+	if (src->num_requests == 0)
+		return;
 
-	while ((res = PQgetResult(src->conn)) != NULL)
+	pg_log_debug("getting %d file chunks", src->num_requests);
+
+	/*
+	 * The prepared statement, 'fetch_chunks_stmt', takes three arrays
+	 * with the same length as parameters: paths, offsets and lengths.
+	 * Construct the string representations of the parameter arrays.
+	 */
+	resetStringInfo(&src->paths);
+	resetStringInfo(&src->offsets);
+	resetStringInfo(&src->lengths);
+
+	appendStringInfoChar(&src->paths, '{');
+	appendStringInfoChar(&src->offsets, '{');
+	appendStringInfoChar(&src->lengths, '{');
+	for (int i = 0; i < src->num_requests; i++)
 	{
-		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			pg_fatal("unexpected result while sending file list: %s",
-					 PQresultErrorMessage(res));
-		PQclear(res);
+		fetch_range_request *rq = &src->request_queue[i];
+
+		if (i > 0)
+		{
+			appendStringInfoChar(&src->paths, ',');
+			appendStringInfoChar(&src->offsets, ',');
+			appendStringInfoChar(&src->lengths, ',');
+		}
+
+		appendArrayEscapedString(&src->paths, rq->path);
+		appendStringInfo(&src->offsets, INT64_FORMAT, rq->offset);
+		appendStringInfo(&src->lengths, "%d", rq->length);
 	}
+	appendStringInfoChar(&src->paths, '}');
+	appendStringInfoChar(&src->offsets, '}');
+	appendStringInfoChar(&src->lengths, '}');
 
 	/*
-	 * We've now copied the list of file ranges that we need to fetch to the
-	 * temporary table. Now, actually fetch all of those ranges.
+	 * Execute the prepared statement.
 	 */
-	sql =
-		"SELECT path, begin,\n"
-		"  pg_read_binary_file(path, begin, len, true) AS chunk\n"
-		"FROM fetchchunks\n";
+	params[0] = src->paths.data;
+	params[1] = src->offsets.data;
+	params[2] = src->lengths.data;
 
-	if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+	if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
 		pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
 
-	pg_log_debug("getting file chunks");
-
 	if (PQsetSingleRowMode(src->conn) != 1)
 		pg_fatal("could not set libpq connection to single row mode");
 
+	/*----
+	 * The result set is of format:
+	 *
+	 * path		text	-- path in the data directory, e.g "base/1/123"
+	 * begin	int8	-- offset within the file
+	 * chunk	bytea	-- file content
+	 *----
+	 */
+	chunkno = 0;
 	while ((res = PQgetResult(src->conn)) != NULL)
 	{
+		fetch_range_request *rq = &src->request_queue[chunkno];
 		char	   *filename;
 		int			filenamelen;
 		int64		chunkoff;
@@ -402,6 +472,9 @@ libpq_finish_fetch(rewind_source *source)
 						 PQresultErrorMessage(res));
 		}
 
+		if (chunkno >= src->num_requests)
+			pg_fatal("received more data chunks than requested");
+
 		/* sanity check the result set */
 		if (PQnfields(res) != 3 || PQntuples(res) != 1)
 			pg_fatal("unexpected result set size while fetching remote files");
@@ -446,9 +519,7 @@ libpq_finish_fetch(rewind_source *source)
 		 * If a file has been deleted on the source, remove it on the target
 		 * as well.  Note that multiple unlink() calls may happen on the same
 		 * file if multiple data chunks are associated with it, hence ignore
-		 * unconditionally anything missing.  If this file is not a relation
-		 * data file, then it has been already truncated when creating the
-		 * file chunk list at the previous execution of the filemap.
+		 * unconditionally anything missing.
 		 */
 		if (PQgetisnull(res, 0, 2))
 		{
@@ -463,14 +534,54 @@ libpq_finish_fetch(rewind_source *source)
 		pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
 					 filename, (long long int) chunkoff, chunksize);
 
+		if (strcmp(filename, rq->path) != 0)
+		{
+			pg_fatal("received data for file \"%s\", when requested for \"%s\"",
+					 filename, rq->path);
+		}
+		if (chunkoff != rq->offset)
+			pg_fatal("received data at offset " UINT64_FORMAT" of file \"%s\", when requested for offset " UINT64_FORMAT,
+					 chunkoff, rq->path, rq->offset);
+		if (chunksize > rq->length)
+		{
+			pg_fatal("received more than requested for file \"%s\"",
+					 rq->path);
+			/* receiving less is OK, though */
+		}
+
 		open_target_file(filename, false);
-
 		write_target_range(chunk, chunkoff, chunksize);
 
 		pg_free(filename);
 
 		PQclear(res);
+		chunkno++;
 	}
+	if (chunkno != src->num_requests)
+		pg_fatal("unexpected number of data chunks received");
+
+	src->num_requests = 0;
+}
+
+/*
+ * Escape a string to be used as element in a text array constant
+ */
+static void
+appendArrayEscapedString(StringInfo buf, const char *str)
+{
+	appendStringInfoCharMacro(buf, '\"');
+	while (*str)
+	{
+		char		ch = *str;
+
+		if (ch == '"' || ch == '\\')
+			appendStringInfoCharMacro(buf, '\\');
+
+		appendStringInfoCharMacro(buf, ch);
+
+		str++;
+	}
+	appendStringInfoCharMacro(buf, '\"');
 }
 
 /*
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index c5ee70272a..e4e773deeb 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -45,6 +45,7 @@ static void disconnect_atexit(void);
 
 static ControlFileData ControlFile_target;
 static ControlFileData ControlFile_source;
+static ControlFileData ControlFile_source_after;
 
 const char *progname;
 int			WalSegSz;
@@ -446,30 +447,76 @@ main(int argc, char **argv)
 
 	progress_report(true);
 
+	/*
+	 * Fetch the control file from the source last. This ensures that the
+	 * minRecoveryPoint is up-to-date.
+	 */
+	buffer = source->fetch_file(source, "global/pg_control", &size);
+	digestControlFile(&ControlFile_source_after, buffer, size);
+	pg_free(buffer);
+
+	/*
+	 * Sanity check: If the source is a local system, the control file should
+	 * not have changed since we started.
+	 *
+	 * XXX: We assume it hasn't been modified, but actually, what could go
+	 * wrong? The logic handles a libpq source that's modified concurrently,
+	 * why not a local datadir?
+	 */
+	if (datadir_source &&
+		memcmp(&ControlFile_source, &ControlFile_source_after,
+			   sizeof(ControlFileData)) != 0)
+	{
+		pg_fatal("source system was modified while pg_rewind was running");
+	}
+
 	if (showprogress)
 		pg_log_info("creating backup label and updating control file");
 	createBackupLabel(chkptredo, chkpttli, chkptrec);
 
 	/*
 	 * Update control file of target. Make it ready to perform archive
-	 * recovery when restarting.
+	 * recovery when restarting, starting from the last common checkpoint.
 	 *
-	 * minRecoveryPoint is set to the current WAL insert location in the
-	 * source server. Like in an online backup, it's important that we recover
-	 * all the WAL that was generated while we copied the files over.
+	 * Like in an online backup, it's important that we replay all the WAL
+	 * that was generated while we copied the files over. To enforce that,
+	 * set 'minRecoveryPoint' in the control file.
 	 */
-	memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData));
-
 	if (connstr_source)
 	{
-		endrec = source->get_current_wal_insert_lsn(source);
-		endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+		if (ControlFile_source_after.state == DB_IN_ARCHIVE_RECOVERY)
+		{
+			/*
+			 * Source is a standby server. We must replay to its
+			 * minRecoveryPoint.
+			 */
+			endrec = ControlFile_source_after.minRecoveryPoint;
+			endtli = ControlFile_source_after.minRecoveryPointTLI;
+		}
+		else
+		{
+			/*
+			 * Source is a production, non-standby, server. We must recover up
+			 * to the last WAL insert location.
+			 */
+			if (ControlFile_source_after.state != DB_IN_PRODUCTION)
+				pg_fatal("source system was in unexpected state at end of rewind");
+
+			endrec = source->get_current_wal_insert_lsn(source);
+			endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
+		}
 	}
 	else
 	{
-		endrec = ControlFile_source.checkPoint;
-		endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+		/*
+		 * Source is a local data directory. It should've shut down cleanly,
+	 * and we must replay to the latest shutdown checkpoint.
+		 */
+		endrec = ControlFile_source_after.checkPoint;
+		endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
 	}
+
+	memcpy(&ControlFile_new, &ControlFile_source_after, sizeof(ControlFileData));
 	ControlFile_new.minRecoveryPoint = endrec;
 	ControlFile_new.minRecoveryPointTLI = endtli;
 	ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
-- 
2.18.4

