>From 4e5937d33e92d908462d567fa3264ae11404ecac Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 21 Jan 2015 17:18:09 +0900
Subject: [PATCH 1/2] Size limitation feature of FETCH v1

- Row size calculation is based on detoasted size.
---
 contrib/auto_explain/auto_explain.c             |   8 +-
 contrib/pg_stat_statements/pg_stat_statements.c |   8 +-
 contrib/postgres_fdw/postgres_fdw.c             | 103 +++++++++++++++++++-----
 src/backend/access/common/heaptuple.c           |  36 +++++++++
 src/backend/commands/copy.c                     |   2 +-
 src/backend/commands/createas.c                 |   2 +-
 src/backend/commands/explain.c                  |   2 +-
 src/backend/commands/extension.c                |   2 +-
 src/backend/commands/matview.c                  |   2 +-
 src/backend/commands/portalcmds.c               |   3 +-
 src/backend/commands/prepare.c                  |   2 +-
 src/backend/executor/execMain.c                 |  33 ++++++--
 src/backend/executor/execUtils.c                |   1 +
 src/backend/executor/functions.c                |   2 +-
 src/backend/executor/spi.c                      |   4 +-
 src/backend/parser/gram.y                       |  54 +++++++++++++
 src/backend/tcop/postgres.c                     |   2 +
 src/backend/tcop/pquery.c                       |  87 +++++++++++++-------
 src/include/access/htup_details.h               |   2 +
 src/include/executor/executor.h                 |   8 +-
 src/include/nodes/execnodes.h                   |   1 +
 src/include/nodes/parsenodes.h                  |   1 +
 src/include/tcop/pquery.h                       |   3 +-
 src/interfaces/ecpg/preproc/Makefile            |   2 +-
 src/interfaces/ecpg/preproc/ecpg.addons         |  63 +++++++++++++++
 25 files changed, 353 insertions(+), 80 deletions(-)

diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c
index 2a184ed..f121a33 100644
--- a/contrib/auto_explain/auto_explain.c
+++ b/contrib/auto_explain/auto_explain.c
@@ -57,7 +57,7 @@ void		_PG_fini(void);
 static void explain_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void explain_ExecutorRun(QueryDesc *queryDesc,
 					ScanDirection direction,
-					long count);
+					long count, long size);
 static void explain_ExecutorFinish(QueryDesc *queryDesc);
 static void explain_ExecutorEnd(QueryDesc *queryDesc);
 
@@ -232,15 +232,15 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size)
 {
 	nesting_level++;
 	PG_TRY();
 	{
 		if (prev_ExecutorRun)
-			prev_ExecutorRun(queryDesc, direction, count);
+			prev_ExecutorRun(queryDesc, direction, count, size);
 		else
-			standard_ExecutorRun(queryDesc, direction, count);
+			standard_ExecutorRun(queryDesc, direction, count, size);
 		nesting_level--;
 	}
 	PG_CATCH();
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 2629bfc..a68c11d 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -282,7 +282,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query);
 static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void pgss_ExecutorRun(QueryDesc *queryDesc,
 				 ScanDirection direction,
-				 long count);
+				 long count, long size);
 static void pgss_ExecutorFinish(QueryDesc *queryDesc);
 static void pgss_ExecutorEnd(QueryDesc *queryDesc);
 static void pgss_ProcessUtility(Node *parsetree, const char *queryString,
@@ -863,15 +863,15 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size)
 {
 	nested_level++;
 	PG_TRY();
 	{
 		if (prev_ExecutorRun)
-			prev_ExecutorRun(queryDesc, direction, count);
+			prev_ExecutorRun(queryDesc, direction, count, size);
 		else
-			standard_ExecutorRun(queryDesc, direction, count);
+			standard_ExecutorRun(queryDesc, direction, count, size);
 		nested_level--;
 	}
 	PG_CATCH();
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d76e739..b3bf27e 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -46,6 +46,11 @@ PG_MODULE_MAGIC;
 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
 #define DEFAULT_FDW_TUPLE_COST		0.01
 
+/* Maximum tuples per fetch */
+#define MAX_FETCH_SIZE				30000
+
+/* Maximum memory usable for retrieved data  */
+#define MAX_FETCH_MEM				(2 * 1024 * 1024)
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
@@ -156,6 +161,8 @@ typedef struct PgFdwScanState
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+
+	long		max_palloced_mem; /* For test, remove me later */
 } PgFdwScanState;
 
 /*
@@ -321,6 +328,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 							  double *totaldeadrows);
 static void analyze_row_processor(PGresult *res, int row,
 					  PgFdwAnalyzeState *astate);
+static Size estimate_tuple_overhead(TupleDesc tupDesc,
+									List *retrieved_attrs);
 static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   int row,
 						   Relation rel,
@@ -1095,6 +1104,7 @@ postgresEndForeignScan(ForeignScanState *node)
 	if (fsstate == NULL)
 		return;
 
+	elog(LOG, "Max memory for tuple store = %ld", fsstate->max_palloced_mem);
 	/* Close the cursor if open, to prevent accumulation of cursors */
 	if (fsstate->cursor_exists)
 		close_cursor(fsstate->conn, fsstate->cursor_number);
@@ -2029,12 +2039,18 @@ fetch_more_data(ForeignScanState *node)
 		int			fetch_size;
 		int			numrows;
 		int			i;
+		long		alloc_size = 0;
 
 		/* The fetch size is arbitrary, but shouldn't be enormous. */
-		fetch_size = 100;
-
-		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-				 fetch_size, fsstate->cursor_number);
+		fetch_size = MAX_FETCH_MEM -
+			MAX_FETCH_SIZE *
+				estimate_tuple_overhead(fsstate->attinmeta->tupdesc,
+										fsstate->retrieved_attrs);
+
+		snprintf(sql, sizeof(sql), "FETCH %d LIMIT %d FROM c%u",
+				 MAX_FETCH_SIZE,
+				 fetch_size,
+				 fsstate->cursor_number);
 
 		res = PQexec(conn, sql);
 		/* On error, report the original query, not the FETCH. */
@@ -2043,27 +2059,34 @@ fetch_more_data(ForeignScanState *node)
 
 		/* Convert the data into HeapTuples */
 		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-		fsstate->num_tuples = numrows;
-		fsstate->next_tuple = 0;
-
-		for (i = 0; i < numrows; i++)
+		if (numrows == 0)
+			fsstate->eof_reached = true;
+		else
 		{
-			fsstate->tuples[i] =
-				make_tuple_from_result_row(res, i,
-										   fsstate->rel,
-										   fsstate->attinmeta,
-										   fsstate->retrieved_attrs,
-										   fsstate->temp_cxt);
-		}
+			alloc_size = numrows * sizeof(HeapTuple);
+			fsstate->tuples = (HeapTuple *) palloc0(alloc_size);
+			fsstate->num_tuples = numrows;
+			fsstate->next_tuple = 0;
 
-		/* Update fetch_ct_2 */
-		if (fsstate->fetch_ct_2 < 2)
-			fsstate->fetch_ct_2++;
+			for (i = 0; i < numrows; i++)
+			{
+				fsstate->tuples[i] =
+					make_tuple_from_result_row(res, i,
+											   fsstate->rel,
+											   fsstate->attinmeta,
+											   fsstate->retrieved_attrs,
+											   fsstate->temp_cxt);
+				alloc_size += fsstate->tuples[i]->t_len;
+			}
 
-		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fetch_size);
+			if (alloc_size > fsstate->max_palloced_mem)
+				fsstate->max_palloced_mem = alloc_size;
 
+			/* Update fetch_ct_2 */
+			if (fsstate->fetch_ct_2 < 2)
+				fsstate->fetch_ct_2++;
+		}
+
 		PQclear(res);
 		res = NULL;
 	}
@@ -2835,6 +2858,44 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 }
 
 /*
+ * Compute the estimated overhead of the result tuples
+ * See heap_form_tuple for the details of this calculation.
+ */
+static Size
+estimate_tuple_overhead(TupleDesc tupDesc,
+						List *retrieved_attrs)
+{
+	Size size = 0;
+	int	 ncol = list_length(retrieved_attrs);
+	int  nadded = 0;
+	ListCell	*lc;
+
+	size += offsetof(HeapTupleHeaderData, t_bits);
+	size += BITMAPLEN(ncol);
+
+	if (tupDesc->tdhasoid)
+		size += sizeof(Oid);
+
+	size = MAXALIGN(size);
+
+	size += sizeof(Datum) * ncol;
+	size += sizeof(bool) * ncol;
+
+	foreach (lc, retrieved_attrs)
+	{
+		int i = lfirst_int(lc);
+
+		if (i > 0)
+		{
+			if (tupDesc->attrs[i - 1]->attbyval)
+				size -= (sizeof(Datum) - tupDesc->attrs[i - 1]->attlen);
+		}
+	}
+
+	return size;
+}
+
+/*
  * Create a tuple from the specified row of the PGresult.
  *
  * rel is the local representation of the foreign table, attinmeta is
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c
index 867035d..2a577e5 100644
--- a/src/backend/access/common/heaptuple.c
+++ b/src/backend/access/common/heaptuple.c
@@ -60,6 +60,7 @@
 #include "access/sysattr.h"
 #include "access/tuptoaster.h"
 #include "executor/tuptable.h"
+#include "utils/pg_lzcompress.h"
 
 
 /* Does att's datatype allow packing into the 1-byte-header varlena format? */
@@ -120,6 +121,41 @@ heap_compute_data_size(TupleDesc tupleDesc,
 	return data_length;
 }
 
+Size
+slot_compute_attr_size(TupleTableSlot *slot)
+{
+	TupleDesc	 tupleDesc = slot->tts_tupleDescriptor;
+	Datum		*values = slot->tts_values;
+	bool		*isnull = slot->tts_isnull;
+	int			 nattrs = tupleDesc->natts;
+	int i;
+	Size		 sumattlen = 0;
+
+	if (slot->tts_nvalid < nattrs)
+	{
+		/*  We need all attributes deformed */
+		slot_getallattrs(slot);
+	}
+	for (i = 0 ; i < nattrs ; i++)
+	{
+		Form_pg_attribute thisatt = tupleDesc->attrs[i];
+
+		if (isnull[i]) continue;
+
+		if (thisatt->attbyval)
+			sumattlen += thisatt->attlen;
+		else if (VARATT_IS_COMPRESSED(DatumGetPointer(values[i])))
+		{
+			sumattlen += PGLZ_RAW_SIZE((PGLZ_Header *) DatumGetPointer(values[i]));
+		}
+		else if (VARATT_IS_SHORT(DatumGetPointer(values[i])))
+			sumattlen += VARSIZE_SHORT(DatumGetPointer(values[i])) - VARHDRSZ_SHORT;
+		else
+			sumattlen += VARSIZE(DatumGetPointer(values[i])) - VARHDRSZ;
+	}
+	return sumattlen;
+}
+
 /*
  * heap_fill_tuple
  *		Load data portion of a tuple from values/isnull arrays
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 0e604b7..b6e6523 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1915,7 +1915,7 @@ CopyTo(CopyState cstate)
 	else
 	{
 		/* run the plan --- the dest receiver will send tuples */
-		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, 0L);
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
 
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index abc0fe8..c5c4478 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -192,7 +192,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
 		dir = ForwardScanDirection;
 
 	/* run the plan */
-	ExecutorRun(queryDesc, dir, 0L);
+	ExecutorRun(queryDesc, dir, 0L, 0L);
 
 	/* save the rowcount if we're given a completionTag to fill */
 	if (completionTag)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 7cfc9bb..2c23e9b 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -489,7 +489,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 			dir = ForwardScanDirection;
 
 		/* run the plan */
-		ExecutorRun(queryDesc, dir, 0L);
+		ExecutorRun(queryDesc, dir, 0L, 0L);
 
 		/* run cleanup too */
 		ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/extension.c b/src/backend/commands/extension.c
index 3b95552..f624567 100644
--- a/src/backend/commands/extension.c
+++ b/src/backend/commands/extension.c
@@ -736,7 +736,7 @@ execute_sql_string(const char *sql, const char *filename)
 										dest, NULL, 0);
 
 				ExecutorStart(qdesc, 0);
-				ExecutorRun(qdesc, ForwardScanDirection, 0);
+				ExecutorRun(qdesc, ForwardScanDirection, 0, 0);
 				ExecutorFinish(qdesc);
 				ExecutorEnd(qdesc);
 
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 74415b8..6530ecb 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -360,7 +360,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 	ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
 
 	/* run the plan */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 	/* and clean up */
 	ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 2794537..255c86e 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -177,6 +177,7 @@ PerformPortalFetch(FetchStmt *stmt,
 	nprocessed = PortalRunFetch(portal,
 								stmt->direction,
 								stmt->howMany,
+								stmt->howLarge,
 								dest);
 
 	/* Return command status if wanted */
@@ -375,7 +376,7 @@ PersistHoldablePortal(Portal portal)
 										true);
 
 		/* Fetch the result set into the tuplestore */
-		ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+		ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 		(*queryDesc->dest->rDestroy) (queryDesc->dest);
 		queryDesc->dest = NULL;
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index 71b08f0..31799f5 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -291,7 +291,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause,
 	 */
 	PortalStart(portal, paramLI, eflags, GetActiveSnapshot());
 
-	(void) PortalRun(portal, count, false, dest, dest, completionTag);
+	(void) PortalRun(portal, count, 0L, false, dest, dest, completionTag);
 
 	PortalDrop(portal, false);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index b9f21c5..d976bf3 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -78,6 +78,7 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
+			long sizeTuples,
 			ScanDirection direction,
 			DestReceiver *dest);
 static bool ExecCheckRTEPerms(RangeTblEntry *rte);
@@ -248,17 +249,17 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
  */
 void
 ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count)
+			ScanDirection direction, long count, long size)
 {
 	if (ExecutorRun_hook)
-		(*ExecutorRun_hook) (queryDesc, direction, count);
+		(*ExecutorRun_hook) (queryDesc, direction, count, size);
 	else
-		standard_ExecutorRun(queryDesc, direction, count);
+		standard_ExecutorRun(queryDesc, direction, count, size);
 }
 
 void
 standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count)
+					 ScanDirection direction, long count, long size)
 {
 	EState	   *estate;
 	CmdType		operation;
@@ -310,6 +311,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 					operation,
 					sendTuples,
 					count,
+					size,
 					direction,
 					dest);
 
@@ -1450,22 +1452,26 @@ ExecutePlan(EState *estate,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
+			long sizeTuples,
 			ScanDirection direction,
 			DestReceiver *dest)
 {
 	TupleTableSlot *slot;
 	long		current_tuple_count;
+	long		sent_size;
 
 	/*
 	 * initialize local variables
 	 */
 	current_tuple_count = 0;
-
+	sent_size = 0;
 	/*
 	 * Set the direction.
 	 */
 	estate->es_direction = direction;
 
+	estate->es_stoppedbysize = false;
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
 	 */
@@ -1520,6 +1526,23 @@ ExecutePlan(EState *estate,
 		current_tuple_count++;
 		if (numberTuples && numberTuples == current_tuple_count)
 			break;
+
+		if (sizeTuples > 0)
+		{
+			/*
+			 * Count the size of tuples we've sent
+			 *
+			 * This needs all attributes deformed so a bit slow on some cases.
+			 */
+			sent_size += slot_compute_attr_size(slot);
+
+			/* Quit when the size limit will be exceeded by this tuple */
+			if (sizeTuples < sent_size)
+			{
+				estate->es_stoppedbysize = true;
+				break;
+			}
+		}
 	}
 }
 
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 32697dd..ff2c395 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -133,6 +133,7 @@ CreateExecutorState(void)
 	estate->es_rowMarks = NIL;
 
 	estate->es_processed = 0;
+	estate->es_stoppedbysize = false;
 	estate->es_lastoid = InvalidOid;
 
 	estate->es_top_eflags = 0;
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 84be37c..d64e908 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -850,7 +850,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
 		/* Run regular commands to completion unless lazyEval */
 		long		count = (es->lazyEval) ? 1L : 0L;
 
-		ExecutorRun(es->qd, ForwardScanDirection, count);
+		ExecutorRun(es->qd, ForwardScanDirection, count, 0L);
 
 		/*
 		 * If we requested run to completion OR there was no tuple returned,
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 4b86e91..cb30cfb 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -2369,7 +2369,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount)
 
 	ExecutorStart(queryDesc, eflags);
 
-	ExecutorRun(queryDesc, ForwardScanDirection, tcount);
+	ExecutorRun(queryDesc, ForwardScanDirection, tcount, 0L);
 
 	_SPI_current->processed = queryDesc->estate->es_processed;
 	_SPI_current->lastoid = queryDesc->estate->es_lastoid;
@@ -2447,7 +2447,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
 	/* Run the cursor */
 	nfetched = PortalRunFetch(portal,
 							  direction,
-							  count,
+							  count, 0L,
 							  dest);
 
 	/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 36dac29..e559d1a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -6021,6 +6021,15 @@ fetch_args:	cursor_name
 					n->howMany = $1;
 					$$ = (Node *)n;
 				}
+			| SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $5;
+					n->direction = FETCH_FORWARD;
+					n->howMany = $1;
+					n->howLarge = $3;
+					$$ = (Node *)n;
+				}
 			| ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6029,6 +6038,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $5;
+					n->direction = FETCH_FORWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $3;
+					$$ = (Node *)n;
+				}
 			| FORWARD opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6045,6 +6063,15 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
+			| FORWARD SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_FORWARD;
+					n->howMany = $2;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| FORWARD ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6053,6 +6080,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| FORWARD ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_FORWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| BACKWARD opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6069,6 +6105,15 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
+			| BACKWARD SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_BACKWARD;
+					n->howMany = $2;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| BACKWARD ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6077,6 +6122,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| BACKWARD ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_BACKWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 		;
 
 from_in:	FROM									{}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 8f74353..55f062b 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -1043,6 +1043,7 @@ exec_simple_query(const char *query_string)
 		 */
 		(void) PortalRun(portal,
 						 FETCH_ALL,
+						 0,
 						 isTopLevel,
 						 receiver,
 						 receiver,
@@ -1928,6 +1929,7 @@ exec_execute_message(const char *portal_name, long max_rows)
 
 	completed = PortalRun(portal,
 						  max_rows,
+						  0,
 						  true, /* always top level */
 						  receiver,
 						  receiver,
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 9c14e8a..1456c5a 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "access/htup_details.h"
 #include "commands/prepare.h"
 #include "executor/tstoreReceiver.h"
 #include "miscadmin.h"
@@ -39,9 +40,10 @@ static void ProcessQuery(PlannedStmt *plan,
 			 DestReceiver *dest,
 			 char *completionTag);
 static void FillPortalStore(Portal portal, bool isTopLevel);
-static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
+static uint32 RunFromStore(Portal portal, ScanDirection direction,
+		     long count, long size, bool *stoppedbysize,
 			 DestReceiver *dest);
-static long PortalRunSelect(Portal portal, bool forward, long count,
+static long PortalRunSelect(Portal portal, bool forward, long count, long size,
 				DestReceiver *dest);
 static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
 				 DestReceiver *dest, char *completionTag);
@@ -51,6 +53,7 @@ static void PortalRunMulti(Portal portal, bool isTopLevel,
 static long DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
+				 long size,
 				 DestReceiver *dest);
 static void DoPortalRewind(Portal portal);
 
@@ -182,7 +185,7 @@ ProcessQuery(PlannedStmt *plan,
 	/*
 	 * Run the plan to completion.
 	 */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 	/*
 	 * Build command completion status string, if caller wants one.
@@ -703,7 +706,7 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
  * suspended due to exhaustion of the count parameter.
  */
 bool
-PortalRun(Portal portal, long count, bool isTopLevel,
+PortalRun(Portal portal, long count, long size, bool isTopLevel,
 		  DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag)
 {
@@ -787,7 +790,7 @@ PortalRun(Portal portal, long count, bool isTopLevel,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				nprocessed = PortalRunSelect(portal, true, count, dest);
+				nprocessed = PortalRunSelect(portal, true, count, size, dest);
 
 				/*
 				 * If the portal result contains a command tag and the caller
@@ -892,11 +895,13 @@ static long
 PortalRunSelect(Portal portal,
 				bool forward,
 				long count,
+				long size,
 				DestReceiver *dest)
 {
 	QueryDesc  *queryDesc;
 	ScanDirection direction;
 	uint32		nprocessed;
+	bool		stoppedbysize;
 
 	/*
 	 * NB: queryDesc will be NULL if we are fetching from a held cursor or a
@@ -939,12 +944,14 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count, dest);
+			nprocessed = RunFromStore(portal, direction, count,
+									  size, &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count);
+			ExecutorRun(queryDesc, direction, count, size);
 			nprocessed = queryDesc->estate->es_processed;
+			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
 		}
 
@@ -954,8 +961,9 @@ PortalRunSelect(Portal portal,
 
 			if (nprocessed > 0)
 				portal->atStart = false;		/* OK to go backward now */
-			if (count == 0 ||
-				(unsigned long) nprocessed < (unsigned long) count)
+			if ((count == 0 ||
+				 (unsigned long) nprocessed < (unsigned long) count) &&
+				!stoppedbysize)
 				portal->atEnd = true;	/* we retrieved 'em all */
 			oldPos = portal->portalPos;
 			portal->portalPos += nprocessed;
@@ -982,12 +990,14 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count, dest);
+			nprocessed = RunFromStore(portal, direction, count,
+									  size, &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count);
+			ExecutorRun(queryDesc, direction, count, size);
 			nprocessed = queryDesc->estate->es_processed;
+			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
 		}
 
@@ -998,8 +1008,9 @@ PortalRunSelect(Portal portal,
 				portal->atEnd = false;	/* OK to go forward now */
 				portal->portalPos++;	/* adjust for endpoint case */
 			}
-			if (count == 0 ||
-				(unsigned long) nprocessed < (unsigned long) count)
+			if ((count == 0 ||
+				 (unsigned long) nprocessed < (unsigned long) count) &&
+				!stoppedbysize)
 			{
 				portal->atStart = true; /* we retrieved 'em all */
 				portal->portalPos = 0;
@@ -1089,10 +1100,13 @@ FillPortalStore(Portal portal, bool isTopLevel)
  */
 static uint32
 RunFromStore(Portal portal, ScanDirection direction, long count,
-			 DestReceiver *dest)
+			 long size_limit, bool *stoppedbysize, DestReceiver *dest)
 {
 	long		current_tuple_count = 0;
 	TupleTableSlot *slot;
+	long			sent_size = 0;
+
+	*stoppedbysize = false;
 
 	slot = MakeSingleTupleTableSlot(portal->tupDesc);
 
@@ -1123,6 +1137,9 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
 
 			(*dest->receiveSlot) (slot, dest);
 
+			/* Count the size of tuples we've sent */
+			sent_size += slot_compute_attr_size(slot);
+
 			ExecClearTuple(slot);
 
 			/*
@@ -1133,6 +1150,14 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
 			current_tuple_count++;
 			if (count && count == current_tuple_count)
 				break;
+
+			/* Quit when the size limit will be exceeded by this tuple */
+			if (current_tuple_count > 0 &&
+				size_limit > 0 && size_limit < sent_size)
+			{
+				*stoppedbysize = true;
+				break;
+			}
 		}
 	}
 
@@ -1385,6 +1410,7 @@ long
 PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
+			   long size,
 			   DestReceiver *dest)
 {
 	long		result;
@@ -1422,7 +1448,7 @@ PortalRunFetch(Portal portal,
 		switch (portal->strategy)
 		{
 			case PORTAL_ONE_SELECT:
-				result = DoPortalRunFetch(portal, fdirection, count, dest);
+				result = DoPortalRunFetch(portal, fdirection, count, size, dest);
 				break;
 
 			case PORTAL_ONE_RETURNING:
@@ -1439,7 +1465,7 @@ PortalRunFetch(Portal portal,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				result = DoPortalRunFetch(portal, fdirection, count, dest);
+				result = DoPortalRunFetch(portal, fdirection, count, size, dest);
 				break;
 
 			default:
@@ -1484,6 +1510,7 @@ static long
 DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
+				 long size,
 				 DestReceiver *dest)
 {
 	bool		forward;
@@ -1526,7 +1553,7 @@ DoPortalRunFetch(Portal portal,
 				{
 					DoPortalRewind(portal);
 					if (count > 1)
-						PortalRunSelect(portal, true, count - 1,
+						PortalRunSelect(portal, true, count - 1, 0L,
 										None_Receiver);
 				}
 				else
@@ -1536,13 +1563,13 @@ DoPortalRunFetch(Portal portal,
 					if (portal->atEnd)
 						pos++;	/* need one extra fetch if off end */
 					if (count <= pos)
-						PortalRunSelect(portal, false, pos - count + 1,
+						PortalRunSelect(portal, false, pos - count + 1, 0L,
 										None_Receiver);
 					else if (count > pos + 1)
-						PortalRunSelect(portal, true, count - pos - 1,
+						PortalRunSelect(portal, true, count - pos - 1, 0L,
 										None_Receiver);
 				}
-				return PortalRunSelect(portal, true, 1L, dest);
+				return PortalRunSelect(portal, true, 1L, 0L, dest);
 			}
 			else if (count < 0)
 			{
@@ -1553,17 +1580,17 @@ DoPortalRunFetch(Portal portal,
 				 * (Is it worth considering case where count > half of size of
 				 * query?  We could rewind once we know the size ...)
 				 */
-				PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
+				PortalRunSelect(portal, true, FETCH_ALL, 0L, None_Receiver);
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, dest);
+					PortalRunSelect(portal, false, -count - 1, 0L, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, dest);
 			}
 			else
 			{
 				/* count == 0 */
 				/* Rewind to start, return zero rows */
 				DoPortalRewind(portal);
-				return PortalRunSelect(portal, true, 0L, dest);
+				return PortalRunSelect(portal, true, 0L, 0L, dest);
 			}
 			break;
 		case FETCH_RELATIVE:
@@ -1573,8 +1600,8 @@ DoPortalRunFetch(Portal portal,
 				 * Definition: advance count-1 rows, return next row (if any).
 				 */
 				if (count > 1)
-					PortalRunSelect(portal, true, count - 1, None_Receiver);
-				return PortalRunSelect(portal, true, 1L, dest);
+					PortalRunSelect(portal, true, count - 1, 0L, None_Receiver);
+				return PortalRunSelect(portal, true, 1L, 0L, dest);
 			}
 			else if (count < 0)
 			{
@@ -1583,8 +1610,8 @@ DoPortalRunFetch(Portal portal,
 				 * any).
 				 */
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, dest);
+					PortalRunSelect(portal, false, -count - 1, 0L, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, dest);
 			}
 			else
 			{
@@ -1630,7 +1657,7 @@ DoPortalRunFetch(Portal portal,
 			 */
 			if (on_row)
 			{
-				PortalRunSelect(portal, false, 1L, None_Receiver);
+				PortalRunSelect(portal, false, 1L, 0L, None_Receiver);
 				/* Set up to fetch one row forward */
 				count = 1;
 				forward = true;
@@ -1652,7 +1679,7 @@ DoPortalRunFetch(Portal portal,
 		return result;
 	}
 
-	return PortalRunSelect(portal, forward, count, dest);
+	return PortalRunSelect(portal, forward, count, size, dest);
 }
 
 /*
diff --git a/src/include/access/htup_details.h b/src/include/access/htup_details.h
index d2ad910..2eeba00 100644
--- a/src/include/access/htup_details.h
+++ b/src/include/access/htup_details.h
@@ -20,6 +20,7 @@
 #include "access/transam.h"
 #include "storage/bufpage.h"
 
+#include "executor/tuptable.h"
 /*
  * MaxTupleAttributeNumber limits the number of (user) columns in a tuple.
  * The key limit on this value is that the size of the fixed overhead for
@@ -723,6 +724,7 @@ extern Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
 /* prototypes for functions in common/heaptuple.c */
 extern Size heap_compute_data_size(TupleDesc tupleDesc,
 					   Datum *values, bool *isnull);
+extern Size slot_compute_attr_size(TupleTableSlot *slot);
 extern void heap_fill_tuple(TupleDesc tupleDesc,
 				Datum *values, bool *isnull,
 				char *data, Size data_size,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 40fde83..64a02c3 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -80,8 +80,8 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook;
 
 /* Hook for plugins to get control in ExecutorRun() */
 typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc,
-												   ScanDirection direction,
-												   long count);
+									   ScanDirection direction,
+									   long count, long size);
 extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook;
 
 /* Hook for plugins to get control in ExecutorFinish() */
@@ -176,9 +176,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter,
 extern void ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count);
+			ScanDirection direction, long count, long size);
 extern void standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count);
+					 ScanDirection direction, long count, long size);
 extern void ExecutorFinish(QueryDesc *queryDesc);
 extern void standard_ExecutorFinish(QueryDesc *queryDesc);
 extern void ExecutorEnd(QueryDesc *queryDesc);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 41288ed..d963286 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -376,6 +376,7 @@ typedef struct EState
 	List	   *es_rowMarks;	/* List of ExecRowMarks */
 
 	uint32		es_processed;	/* # of tuples processed */
+	bool		es_stoppedbysize; /* true if processing stopped by size */
 	Oid			es_lastoid;		/* last oid processed (by INSERT) */
 
 	int			es_top_eflags;	/* eflags passed to ExecutorStart */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b1dfa85..9e18331 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2223,6 +2223,7 @@ typedef struct FetchStmt
 	NodeTag		type;
 	FetchDirection direction;	/* see above */
 	long		howMany;		/* number of rows, or position argument */
+	long		howLarge;		/* total bytes of rows */
 	char	   *portalname;		/* name of portal (cursor) */
 	bool		ismove;			/* TRUE if MOVE */
 } FetchStmt;
diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h
index 8073a6e..afffe86 100644
--- a/src/include/tcop/pquery.h
+++ b/src/include/tcop/pquery.h
@@ -33,13 +33,14 @@ extern void PortalStart(Portal portal, ParamListInfo params,
 extern void PortalSetResultFormat(Portal portal, int nFormats,
 					  int16 *formats);
 
-extern bool PortalRun(Portal portal, long count, bool isTopLevel,
+extern bool PortalRun(Portal portal, long count, long size, bool isTopLevel,
 		  DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag);
 
 extern long PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
+			   long size,
 			   DestReceiver *dest);
 
 #endif   /* PQUERY_H */
diff --git a/src/interfaces/ecpg/preproc/Makefile b/src/interfaces/ecpg/preproc/Makefile
index 1ecc405..b492fa7 100644
--- a/src/interfaces/ecpg/preproc/Makefile
+++ b/src/interfaces/ecpg/preproc/Makefile
@@ -48,7 +48,7 @@ ecpg: $(OBJS) | submake-libpgport
 preproc.o: pgc.c
 
 preproc.h: preproc.c ;
-preproc.c: BISONFLAGS += -d
+preproc.c: BISONFLAGS += -d
 
 preproc.y: ../../../backend/parser/gram.y parse.pl ecpg.addons ecpg.header ecpg.tokens ecpg.trailer ecpg.type
 	$(PERL) $(srcdir)/parse.pl $(srcdir) < $< > $@
diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons
index b3b36cf..bdccb68 100644
--- a/src/interfaces/ecpg/preproc/ecpg.addons
+++ b/src/interfaces/ecpg/preproc/ecpg.addons
@@ -220,13 +220,46 @@ ECPG: fetch_argsNEXTopt_from_incursor_name addon
 ECPG: fetch_argsPRIORopt_from_incursor_name addon
 ECPG: fetch_argsFIRST_Popt_from_incursor_name addon
 ECPG: fetch_argsLAST_Popt_from_incursor_name addon
+		add_additional_variables($3, false);
+		if ($3[0] == ':')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
 ECPG: fetch_argsALLopt_from_incursor_name addon
+ECPG: fetch_argsFORWARDopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
 		{
 			free($3);
 			$3 = mm_strdup("$0");
 		}
+ECPG: fetch_argsALLLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($5, false);
+		if ($5[0] == ':')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
+		if ($3[0] == '$')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDALLLIMITIconstopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDALLLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
+		{
+			free($6);
+			$6 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
 ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
@@ -234,11 +267,46 @@ ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 			free($3);
 			$3 = mm_strdup("$0");
 		}
 		if ($1[0] == '$')
 		{
 			free($1);
 			$1 = mm_strdup("$0");
 		}
+ECPG: fetch_argsSignedIconstLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($5, false);
+		if ($5[0] == ':')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
+		if ($1[0] == '$')
+		{
+			free($1);
+			$1 = mm_strdup("$0");
+		}
+		if ($3[0] == '$')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDSignedIconstLIMITIconstopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDSignedIconstLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
+		{
+			free($6);
+			$6 = mm_strdup("$0");
+		}
+		if ($2[0] == '$')
+		{
+			free($2);
+			$2 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
 ECPG: fetch_argsFORWARDALLopt_from_incursor_name addon
 ECPG: fetch_argsBACKWARDALLopt_from_incursor_name addon
 		add_additional_variables($4, false);
-- 
2.1.0.GIT

