From 307209588737de34573d39d2b2376ce6f689a0f6 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 26 Jun 2015 17:31:26 +0900
Subject: [PATCH 3/3] Add experimental (POC) adaptive fetch size feature.

---
 contrib/postgres_fdw/postgres_fdw.c | 114 ++++++++++++++++++++++++++++++++++--
 1 file changed, 108 insertions(+), 6 deletions(-)

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 40cac3b..108b4ba 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -48,6 +48,27 @@ PG_MODULE_MAGIC;
 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
 #define DEFAULT_FDW_TUPLE_COST		0.01
 
+/* Initial (and minimum) fetch size. This might better be a GUC parameter. */
+#define MIN_FETCH_SIZE 100
+
+/* Maximum fetch size. This might better be a GUC parameter. */
+#define MAX_FETCH_SIZE 1000
+
+/*
+ * Target maximum size of the fetch buffer, in kilobytes. Ditto.
+ *
+ * This should be far larger than sizeof(HeapTuple) * MAX_FETCH_SIZE. It is
+ * not a hard limit because the average length of the returned rows cannot be
+ * known in advance.
+ */
+#define MAX_FETCH_BUFFER_SIZE 10000	/* 10MB */
+
+/* Maximum duration allowed for a single fetch, in milliseconds */
+#define MAX_FETCH_DURATION 500
+
+/* Number of successive async fetches required before fetch_size is doubled */
+#define INCREASE_FETCH_SIZE_THRESHOLD 8
+
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
@@ -157,6 +178,12 @@ typedef struct PgFdwScanState
 	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
 	int			num_tuples;		/* # of tuples in array */
 	int			next_tuple;		/* index of next one to return */
+	int			fetch_size;		/* rows to be fetched at once */
+	int			successive_async; /* # of successive async fetches at the
+									 * current fetch_size */
+	long		last_fetch_req_at;  /* The time of the last fetch request, in
+									 * milliseconds*/
+	int			last_buf_size;	/* Buffer size required for the last fetch */
 
 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
@@ -886,6 +913,7 @@ postgresGetForeignPlan(PlannerInfo *root,
 							NIL /* no custom tlist */ );
 }
 
+
 /* call back function to kick the query to start on remote */
 static void
 postgresPreExecCallback(EState *estate, Node *node)
@@ -1015,6 +1043,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 
 	fsstate->econtext = node->ss.ps.ps_ExprContext;
 
+	fsstate->fetch_size = MIN_FETCH_SIZE;
+	fsstate->successive_async = 0;
+	fsstate->last_buf_size = 0;
+
 	/*
 	 * Register this node to be asynchronously executed if this is the first
 	 * scan on this connection
@@ -2092,18 +2124,72 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 	{
 		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
-		int			fetch_size;
 		int			numrows, addrows, restrows;
 		HeapTuple  *tmptuples;
+		int			prev_fetch_size = fsstate->fetch_size;
+		int 		new_fetch_size = fsstate->fetch_size;
 		int			i;
+		struct timeval tv = {0, 0};
+		long		current_time;
 		int			fetch_buf_size;
 
-		/* The fetch size is arbitrary, but shouldn't be enormous. */
-		fetch_size = 100;
+		gettimeofday(&tv, NULL);
+		current_time = tv.tv_sec * 1000 + tv.tv_usec / 1000;
+
+		/*
+		 * Calculate adaptive fetch size
+		 *
+		 * Calculate fetch_size based on maximal allowed duration and buffer
+		 * space. The fetch buffer size shouldn't be enormous so we try to
+		 * keep it under MAX_FETCH_BUFFER_SIZE.
+		 */
+
+		/* Scale fetch_size down proportionally if the buffer needed by the
+		 * previous fetch exceeded MAX_FETCH_BUFFER_SIZE. */
+		if (fsstate->last_buf_size > MAX_FETCH_BUFFER_SIZE)
+		{
+			new_fetch_size =
+				(int)((double)fsstate->fetch_size * MAX_FETCH_BUFFER_SIZE /
+					  fsstate->last_buf_size);
+		}
+		/*
+		 * Halve fetch_size if PFCisBusy() reports the connection busy and more
+		 * than MAX_FETCH_DURATION ms have passed since the last fetch request.
+		 */
+		if (PFCisBusy(conn) &&
+			fsstate->fetch_size > MIN_FETCH_SIZE &&
+			fsstate->last_fetch_req_at + MAX_FETCH_DURATION <
+			current_time)
+		{
+			int tmp_fetch_size = fsstate->fetch_size / 2;
+			if (tmp_fetch_size < new_fetch_size)
+				new_fetch_size = tmp_fetch_size;
+		}
+
+		/*
+		 * Double fetch_size if it was not decreased above, it is still below
+		 * MAX_FETCH_SIZE and successive_async reached the threshold.
+		 */
+		if (new_fetch_size == fsstate->fetch_size &&
+			fsstate->successive_async >= INCREASE_FETCH_SIZE_THRESHOLD &&
+			fsstate->fetch_size < MAX_FETCH_SIZE)
+			new_fetch_size *= 2;
+
+		/* Change fetch_size as calculated above */
+		if (new_fetch_size != fsstate->fetch_size)
+		{
+			if (new_fetch_size > MAX_FETCH_SIZE)
+				fsstate->fetch_size = MAX_FETCH_SIZE;
+			else if (new_fetch_size < MIN_FETCH_SIZE)
+				fsstate->fetch_size = MIN_FETCH_SIZE;
+			else
+				fsstate->fetch_size = new_fetch_size;
+			fsstate->successive_async = 0;
+		}
 
 		/* Make the query to fetch tuples */
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-				 fetch_size, fsstate->cursor_number);
+				 fsstate->fetch_size, fsstate->cursor_number);
 
 		if (PFCisAsyncRunning(conn))
 		{
@@ -2123,7 +2209,7 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 			{
 				/* Get result of running async fetch */
 				res = PFCgetResult(conn);
-				if (PQntuples(res) == fetch_size)
+				if (PQntuples(res) == prev_fetch_size)
 				{
 					/*
 					 * Connection state doesn't go to IDLE even if all data
@@ -2144,6 +2230,7 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 				if (!PFCsendQuery(conn, sql))
 					pgfdw_report_error(ERROR, res, conn, false,
 									   fsstate->query);
+				fsstate->last_fetch_req_at = current_time;
 
 				PFCsetAsyncScan(conn, fsstate);
 				goto end_of_fetch;
@@ -2188,12 +2275,14 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 			fetch_buf_size += (HEAPTUPLESIZE + tup->t_len);
 		}
 
+		fsstate->last_buf_size = fetch_buf_size / 1024; /* in kilobytes */
+
 		/* Update fetch_ct_2 */
 		if (fsstate->fetch_ct_2 < 2)
 			fsstate->fetch_ct_2++;
 
 		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fetch_size);
+		fsstate->eof_reached = (numrows < prev_fetch_size);
 
 		PQclear(res);
 		res = NULL;
@@ -2208,6 +2297,7 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 				 */
 				if (!PFCsendQuery(conn, sql))
 					pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+				fsstate->last_fetch_req_at = current_time;
 				PFCsetAsyncScan(conn, fsstate);
 			}
 		}
@@ -2223,6 +2313,18 @@ end_of_fetch:
 	}
 	PG_END_TRY();
 
+	if (PFCisAsyncRunning(fsstate->conn))
+	{
+		if (fsstate->successive_async < INCREASE_FETCH_SIZE_THRESHOLD)
+			fsstate->successive_async++;
+	}
+	else
+	{
+		/* Reset fetch_size if the async_fetch stopped */
+		fsstate->successive_async = 0;
+		fsstate->fetch_size = MIN_FETCH_SIZE;
+	}
+
 	MemoryContextSwitchTo(oldcontext);
 }
 
-- 
1.8.3.1

