From 4a8fa778fe5ee1807b91d2587775b7a7ad250829 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 23 Jan 2024 11:16:23 +0200
Subject: [PATCH v3 3/4] Use libpq-be-fe-helpers.h wrappers more

---
 .../libpqwalreceiver/libpqwalreceiver.c       | 148 ++++--------------
 1 file changed, 31 insertions(+), 117 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index db779dc6ca6..c60a121093c 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -102,8 +102,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
 
 /*
@@ -212,8 +210,9 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQexec(conn->streamConn,
-							  ALWAYS_SECURE_SEARCH_PATH_SQL);
+		res = libpqsrv_exec(conn->streamConn,
+							ALWAYS_SECURE_SEARCH_PATH_SQL,
+							WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
 			PQclear(res);
@@ -385,7 +384,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqsrv_exec(conn->streamConn,
+						"IDENTIFY_SYSTEM",
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -518,7 +519,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);
 
 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -548,7 +551,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PGresult   *res;
 
 	/*
-	 * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
+	 * Send copy-end message.  As in libpqsrv_exec, this could theoretically
 	 * block, but the risk seems small.
 	 */
 	if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
@@ -568,7 +571,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -583,7 +587,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -597,7 +602,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));
 
 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -608,7 +614,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (res != NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -633,7 +640,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -663,107 +672,6 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	PQclear(res);
 }
 
-/*
- * Send a query and wait for the results by using the asynchronous libpq
- * functions and socket readiness events.
- *
- * The function is modeled on libpqsrv_exec(), with the behavior difference
- * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
- * skips try/catch, since all errors terminate the process.
- *
- * May return NULL, rather than an error result, on failure.
- */
-static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
-{
-	PGresult   *lastResult = NULL;
-
-	/*
-	 * PQexec() silently discards any prior query results on the connection.
-	 * This is not required for this function as it's expected that the caller
-	 * (which is this library in all cases) will behave correctly and we don't
-	 * have to be backwards compatible with old libpq.
-	 */
-
-	/*
-	 * Submit the query.  Since we don't use non-blocking mode, this could
-	 * theoretically block.  In practice, since we don't send very long query
-	 * strings, the risk seems negligible.
-	 */
-	if (!PQsendQuery(streamConn, query))
-		return NULL;
-
-	for (;;)
-	{
-		/* Wait for, and collect, the next PGresult. */
-		PGresult   *result;
-
-		result = libpqrcv_PQgetResult(streamConn);
-		if (result == NULL)
-			break;				/* query is complete, or failure */
-
-		/*
-		 * Emulate PQexec()'s behavior of returning the last result when there
-		 * are many.  We are fine with returning just last error message.
-		 */
-		PQclear(lastResult);
-		lastResult = result;
-
-		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
-			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
-			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
-			PQstatus(streamConn) == CONNECTION_BAD)
-			break;
-	}
-
-	return lastResult;
-}
-
-/*
- * Perform the equivalent of PQgetResult(), but watch for interrupts.
- */
-static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
-{
-	/*
-	 * Collect data until PQgetResult is ready to get the result without
-	 * blocking.
-	 */
-	while (PQisBusy(streamConn))
-	{
-		int			rc;
-
-		/*
-		 * We don't need to break down the sleep into smaller increments,
-		 * since we'll get interrupted by signals and can handle any
-		 * interrupts here.
-		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Consume whatever data is available from the socket */
-		if (PQconsumeInput(streamConn) == 0)
-		{
-			/* trouble; return NULL */
-			return NULL;
-		}
-	}
-
-	/* Now we can collect and return the next PGresult */
-	return PQgetResult(streamConn);
-}
-
 /*
  * Disconnect connection to primary, if any.
  */
@@ -824,13 +732,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);
 
 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqsrv_get_result(conn->streamConn,
+									  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -972,7 +882,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 			appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1099,7 +1011,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));
 
-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqsrv_exec(conn->streamConn,
+						  query,
+						  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
 	switch (PQresultStatus(pgres))
 	{
-- 
2.39.2

