From 6f1dd6998ba312c3552f137365e3a3118b7935be Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 21 Jan 2015 17:18:09 +0900
Subject: [PATCH] Size limitation feature of FETCH v0

---
 contrib/auto_explain/auto_explain.c             |  8 +--
 contrib/pg_stat_statements/pg_stat_statements.c |  8 +--
 src/backend/commands/copy.c                     |  2 +-
 src/backend/commands/createas.c                 |  2 +-
 src/backend/commands/explain.c                  |  2 +-
 src/backend/commands/extension.c                |  2 +-
 src/backend/commands/matview.c                  |  2 +-
 src/backend/commands/portalcmds.c               |  3 +-
 src/backend/commands/prepare.c                  |  2 +-
 src/backend/executor/execMain.c                 | 35 +++++++--
 src/backend/executor/execUtils.c                |  1 +
 src/backend/executor/functions.c                |  2 +-
 src/backend/executor/spi.c                      |  4 +-
 src/backend/parser/gram.y                       | 59 +++++++++++++++
 src/backend/tcop/postgres.c                     |  2 +
 src/backend/tcop/pquery.c                       | 95 +++++++++++++++++--------
 src/include/executor/executor.h                 |  8 +--
 src/include/nodes/execnodes.h                   |  1 +
 src/include/nodes/parsenodes.h                  |  1 +
 src/include/tcop/pquery.h                       |  3 +-
 src/interfaces/ecpg/preproc/Makefile            |  2 +-
 src/interfaces/ecpg/preproc/ecpg.addons         | 63 ++++++++++++++++
 22 files changed, 248 insertions(+), 59 deletions(-)

diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c
index 2a184ed..f121a33 100644
--- a/contrib/auto_explain/auto_explain.c
+++ b/contrib/auto_explain/auto_explain.c
@@ -57,7 +57,7 @@ void		_PG_fini(void);
 static void explain_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void explain_ExecutorRun(QueryDesc *queryDesc,
 					ScanDirection direction,
-					long count);
+					long count, long size);
 static void explain_ExecutorFinish(QueryDesc *queryDesc);
 static void explain_ExecutorEnd(QueryDesc *queryDesc);
 
@@ -232,15 +232,15 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size)
 {
 	nesting_level++;
 	PG_TRY();
 	{
 		if (prev_ExecutorRun)
-			prev_ExecutorRun(queryDesc, direction, count);
+			prev_ExecutorRun(queryDesc, direction, count, size);
 		else
-			standard_ExecutorRun(queryDesc, direction, count);
+			standard_ExecutorRun(queryDesc, direction, count, size);
 		nesting_level--;
 	}
 	PG_CATCH();
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 2629bfc..a68c11d 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -282,7 +282,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query);
 static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void pgss_ExecutorRun(QueryDesc *queryDesc,
 				 ScanDirection direction,
-				 long count);
+				 long count, long size);
 static void pgss_ExecutorFinish(QueryDesc *queryDesc);
 static void pgss_ExecutorEnd(QueryDesc *queryDesc);
 static void pgss_ProcessUtility(Node *parsetree, const char *queryString,
@@ -863,15 +863,15 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size)
 {
 	nested_level++;
 	PG_TRY();
 	{
 		if (prev_ExecutorRun)
-			prev_ExecutorRun(queryDesc, direction, count);
+			prev_ExecutorRun(queryDesc, direction, count, size);
 		else
-			standard_ExecutorRun(queryDesc, direction, count);
+			standard_ExecutorRun(queryDesc, direction, count, size);
 		nested_level--;
 	}
 	PG_CATCH();
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 0e604b7..b6e6523 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1915,7 +1915,7 @@ CopyTo(CopyState cstate)
 	else
 	{
 		/* run the plan --- the dest receiver will send tuples */
-		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, 0L);
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
 
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index abc0fe8..c5c4478 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -192,7 +192,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
 		dir = ForwardScanDirection;
 
 	/* run the plan */
-	ExecutorRun(queryDesc, dir, 0L);
+	ExecutorRun(queryDesc, dir, 0L, 0L);
 
 	/* save the rowcount if we're given a completionTag to fill */
 	if (completionTag)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 7cfc9bb..2c23e9b 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -489,7 +489,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 			dir = ForwardScanDirection;
 
 		/* run the plan */
-		ExecutorRun(queryDesc, dir, 0L);
+		ExecutorRun(queryDesc, dir, 0L, 0L);
 
 		/* run cleanup too */
 		ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/extension.c b/src/backend/commands/extension.c
index 3b95552..f624567 100644
--- a/src/backend/commands/extension.c
+++ b/src/backend/commands/extension.c
@@ -736,7 +736,7 @@ execute_sql_string(const char *sql, const char *filename)
 										dest, NULL, 0);
 
 				ExecutorStart(qdesc, 0);
-				ExecutorRun(qdesc, ForwardScanDirection, 0);
+				ExecutorRun(qdesc, ForwardScanDirection, 0, 0);
 				ExecutorFinish(qdesc);
 				ExecutorEnd(qdesc);
 
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 74415b8..6530ecb 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -360,7 +360,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 	ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
 
 	/* run the plan */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 	/* and clean up */
 	ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 2794537..255c86e 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -177,6 +177,7 @@ PerformPortalFetch(FetchStmt *stmt,
 	nprocessed = PortalRunFetch(portal,
 								stmt->direction,
 								stmt->howMany,
+								stmt->howLarge,
 								dest);
 
 	/* Return command status if wanted */
@@ -375,7 +376,7 @@ PersistHoldablePortal(Portal portal)
 										true);
 
 		/* Fetch the result set into the tuplestore */
-		ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+		ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 		(*queryDesc->dest->rDestroy) (queryDesc->dest);
 		queryDesc->dest = NULL;
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index 71b08f0..31799f5 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -291,7 +291,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause,
 	 */
 	PortalStart(portal, paramLI, eflags, GetActiveSnapshot());
 
-	(void) PortalRun(portal, count, false, dest, dest, completionTag);
+	(void) PortalRun(portal, count, 0L, false, dest, dest, completionTag);
 
 	PortalDrop(portal, false);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index b9f21c5..9cecc1d 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -78,6 +78,7 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
+			long sizeTuples,
 			ScanDirection direction,
 			DestReceiver *dest);
 static bool ExecCheckRTEPerms(RangeTblEntry *rte);
@@ -248,17 +249,17 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
  */
 void
 ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count)
+			ScanDirection direction, long count, long size)
 {
 	if (ExecutorRun_hook)
-		(*ExecutorRun_hook) (queryDesc, direction, count);
+		(*ExecutorRun_hook) (queryDesc, direction, count, size);
 	else
-		standard_ExecutorRun(queryDesc, direction, count);
+		standard_ExecutorRun(queryDesc, direction, count, size);
 }
 
 void
 standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count)
+					 ScanDirection direction, long count, long size)
 {
 	EState	   *estate;
 	CmdType		operation;
@@ -310,6 +311,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 					operation,
 					sendTuples,
 					count,
+					size,
 					direction,
 					dest);
 
@@ -1450,22 +1452,26 @@ ExecutePlan(EState *estate,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
+			long sizeTuples,
 			ScanDirection direction,
 			DestReceiver *dest)
 {
 	TupleTableSlot *slot;
 	long		current_tuple_count;
+	long		sent_size;
 
 	/*
 	 * initialize local variables
 	 */
 	current_tuple_count = 0;
-
+	sent_size = 0;
 	/*
 	 * Set the direction.
 	 */
 	estate->es_direction = direction;
 
+	estate->es_stoppedbysize = false;
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
 	 */
@@ -1520,6 +1526,25 @@ ExecutePlan(EState *estate,
 		current_tuple_count++;
 		if (numberTuples && numberTuples == current_tuple_count)
 			break;
+
+		/* Count the size of tuples we've sent */
+		if (slot->tts_tuple)
+			sent_size += HEAPTUPLESIZE + slot->tts_tuple->t_len;
+		else
+		{
+			sent_size += HEAPTUPLESIZE +
+				heap_compute_data_size(slot->tts_tupleDescriptor,
+									   slot->tts_values,
+									   slot->tts_isnull);
+		}
+
+		/* Quit once the total size counted so far exceeds the limit */
+		if (sizeTuples > 0 && sizeTuples < sent_size)
+		{
+			estate->es_stoppedbysize = true;
+			break;
+		}
+
 	}
 }
 
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 32697dd..ff2c395 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -133,6 +133,7 @@ CreateExecutorState(void)
 	estate->es_rowMarks = NIL;
 
 	estate->es_processed = 0;
+	estate->es_stoppedbysize = false;
 	estate->es_lastoid = InvalidOid;
 
 	estate->es_top_eflags = 0;
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 84be37c..d64e908 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -850,7 +850,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
 		/* Run regular commands to completion unless lazyEval */
 		long		count = (es->lazyEval) ? 1L : 0L;
 
-		ExecutorRun(es->qd, ForwardScanDirection, count);
+		ExecutorRun(es->qd, ForwardScanDirection, count, 0L);
 
 		/*
 		 * If we requested run to completion OR there was no tuple returned,
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 4b86e91..cb30cfb 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -2369,7 +2369,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount)
 
 	ExecutorStart(queryDesc, eflags);
 
-	ExecutorRun(queryDesc, ForwardScanDirection, tcount);
+	ExecutorRun(queryDesc, ForwardScanDirection, tcount, 0L);
 
 	_SPI_current->processed = queryDesc->estate->es_processed;
 	_SPI_current->lastoid = queryDesc->estate->es_lastoid;
@@ -2447,7 +2447,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
 	/* Run the cursor */
 	nfetched = PortalRunFetch(portal,
 							  direction,
-							  count,
+							  count, 0L,
 							  dest);
 
 	/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 36dac29..3a139ed 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -520,6 +520,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <str>		opt_existing_window_name
 %type <boolean> opt_if_not_exists
 
+%type <ival>	opt_fetch_limit
+
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
  * They must be listed first so that their numeric codes do not depend on
@@ -6021,6 +6023,15 @@ fetch_args:	cursor_name
 					n->howMany = $1;
 					$$ = (Node *)n;
 				}
+			| SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $5;
+					n->direction = FETCH_FORWARD;
+					n->howMany = $1;
+					n->howLarge = $3;
+					$$ = (Node *)n;
+				}
 			| ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6029,6 +6040,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $5;
+					n->direction = FETCH_FORWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $3;
+					$$ = (Node *)n;
+				}
 			| FORWARD opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6045,6 +6065,15 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
+			| FORWARD SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_FORWARD;
+					n->howMany = $2;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| FORWARD ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6053,6 +6082,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| FORWARD ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_FORWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| BACKWARD opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6069,6 +6107,15 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
+			| BACKWARD SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_BACKWARD;
+					n->howMany = $2;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| BACKWARD ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6077,6 +6124,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| BACKWARD ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_BACKWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 		;
 
 from_in:	FROM									{}
@@ -6087,6 +6143,9 @@ opt_from_in:	from_in								{}
 			| /* EMPTY */							{}
 		;
 
+opt_fetch_limit:	LIMIT Iconst					{ $$ = $2;}
+			| /* EMPTY */							{ $$ = 0; }
+		;
 
 /*****************************************************************************
  *
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 8f74353..55f062b 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -1043,6 +1043,7 @@ exec_simple_query(const char *query_string)
 		 */
 		(void) PortalRun(portal,
 						 FETCH_ALL,
+						 0,
 						 isTopLevel,
 						 receiver,
 						 receiver,
@@ -1928,6 +1929,7 @@ exec_execute_message(const char *portal_name, long max_rows)
 
 	completed = PortalRun(portal,
 						  max_rows,
+						  0,
 						  true, /* always top level */
 						  receiver,
 						  receiver,
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 9c14e8a..5f68cc7 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "access/htup_details.h"
 #include "commands/prepare.h"
 #include "executor/tstoreReceiver.h"
 #include "miscadmin.h"
@@ -39,9 +40,10 @@ static void ProcessQuery(PlannedStmt *plan,
 			 DestReceiver *dest,
 			 char *completionTag);
 static void FillPortalStore(Portal portal, bool isTopLevel);
-static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
+static uint32 RunFromStore(Portal portal, ScanDirection direction,
+		     long count, long size, bool *stoppedbysize,
 			 DestReceiver *dest);
-static long PortalRunSelect(Portal portal, bool forward, long count,
+static long PortalRunSelect(Portal portal, bool forward, long count, long size,
 				DestReceiver *dest);
 static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
 				 DestReceiver *dest, char *completionTag);
@@ -51,6 +53,7 @@ static void PortalRunMulti(Portal portal, bool isTopLevel,
 static long DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
+				 long size,
 				 DestReceiver *dest);
 static void DoPortalRewind(Portal portal);
 
@@ -182,7 +185,7 @@ ProcessQuery(PlannedStmt *plan,
 	/*
 	 * Run the plan to completion.
 	 */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 	/*
 	 * Build command completion status string, if caller wants one.
@@ -703,7 +706,7 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
  * suspended due to exhaustion of the count parameter.
  */
 bool
-PortalRun(Portal portal, long count, bool isTopLevel,
+PortalRun(Portal portal, long count, long size, bool isTopLevel,
 		  DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag)
 {
@@ -787,7 +790,7 @@ PortalRun(Portal portal, long count, bool isTopLevel,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				nprocessed = PortalRunSelect(portal, true, count, dest);
+				nprocessed = PortalRunSelect(portal, true, count, size, dest);
 
 				/*
 				 * If the portal result contains a command tag and the caller
@@ -892,11 +895,13 @@ static long
 PortalRunSelect(Portal portal,
 				bool forward,
 				long count,
+				long size,
 				DestReceiver *dest)
 {
 	QueryDesc  *queryDesc;
 	ScanDirection direction;
 	uint32		nprocessed;
+	bool		stoppedbysize;
 
 	/*
 	 * NB: queryDesc will be NULL if we are fetching from a held cursor or a
@@ -939,12 +944,14 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count, dest);
+			nprocessed = RunFromStore(portal, direction, count,
+									  size, &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count);
+			ExecutorRun(queryDesc, direction, count, size);
 			nprocessed = queryDesc->estate->es_processed;
+			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
 		}
 
@@ -954,8 +961,9 @@ PortalRunSelect(Portal portal,
 
 			if (nprocessed > 0)
 				portal->atStart = false;		/* OK to go backward now */
-			if (count == 0 ||
-				(unsigned long) nprocessed < (unsigned long) count)
+			if ((count == 0 ||
+				 (unsigned long) nprocessed < (unsigned long) count) &&
+				!stoppedbysize)
 				portal->atEnd = true;	/* we retrieved 'em all */
 			oldPos = portal->portalPos;
 			portal->portalPos += nprocessed;
@@ -982,12 +990,14 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count, dest);
+			nprocessed = RunFromStore(portal, direction, count,
+									  size, &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count);
+			ExecutorRun(queryDesc, direction, count, size);
 			nprocessed = queryDesc->estate->es_processed;
+			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
 		}
 
@@ -998,8 +1008,9 @@ PortalRunSelect(Portal portal,
 				portal->atEnd = false;	/* OK to go forward now */
 				portal->portalPos++;	/* adjust for endpoint case */
 			}
-			if (count == 0 ||
-				(unsigned long) nprocessed < (unsigned long) count)
+			if ((count == 0 ||
+				 (unsigned long) nprocessed < (unsigned long) count) &&
+				!stoppedbysize)
 			{
 				portal->atStart = true; /* we retrieved 'em all */
 				portal->portalPos = 0;
@@ -1089,10 +1100,13 @@ FillPortalStore(Portal portal, bool isTopLevel)
  */
 static uint32
 RunFromStore(Portal portal, ScanDirection direction, long count,
-			 DestReceiver *dest)
+			 long size_limit, bool *stoppedbysize, DestReceiver *dest)
 {
 	long		current_tuple_count = 0;
 	TupleTableSlot *slot;
+	long			sent_size = 0;
+
+	*stoppedbysize = false;
 
 	slot = MakeSingleTupleTableSlot(portal->tupDesc);
 
@@ -1133,6 +1147,25 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
 			current_tuple_count++;
 			if (count && count == current_tuple_count)
 				break;
+
+			/* Count the size of tuples we've sent */
+			if (slot->tts_tuple)
+				sent_size += HEAPTUPLESIZE + slot->tts_tuple->t_len;
+			else
+			{
+				sent_size += HEAPTUPLESIZE +
+					heap_compute_data_size(slot->tts_tupleDescriptor,
+										   slot->tts_values,
+										   slot->tts_isnull);
+			}
+
+			/* Quit once the total size counted so far exceeds the limit */
+			if (current_tuple_count > 0 &&
+				size_limit > 0 && size_limit < sent_size)
+			{
+				*stoppedbysize = true;
+				break;
+			}
 		}
 	}
 
@@ -1385,6 +1418,7 @@ long
 PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
+			   long size,
 			   DestReceiver *dest)
 {
 	long		result;
@@ -1422,7 +1456,7 @@ PortalRunFetch(Portal portal,
 		switch (portal->strategy)
 		{
 			case PORTAL_ONE_SELECT:
-				result = DoPortalRunFetch(portal, fdirection, count, dest);
+				result = DoPortalRunFetch(portal, fdirection, count, size, dest);
 				break;
 
 			case PORTAL_ONE_RETURNING:
@@ -1439,7 +1473,7 @@ PortalRunFetch(Portal portal,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				result = DoPortalRunFetch(portal, fdirection, count, dest);
+				result = DoPortalRunFetch(portal, fdirection, count, size, dest);
 				break;
 
 			default:
@@ -1484,6 +1518,7 @@ static long
 DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
+				 long size,
 				 DestReceiver *dest)
 {
 	bool		forward;
@@ -1526,7 +1561,7 @@ DoPortalRunFetch(Portal portal,
 				{
 					DoPortalRewind(portal);
 					if (count > 1)
-						PortalRunSelect(portal, true, count - 1,
+						PortalRunSelect(portal, true, count - 1, 0L,
 										None_Receiver);
 				}
 				else
@@ -1536,13 +1571,13 @@ DoPortalRunFetch(Portal portal,
 					if (portal->atEnd)
 						pos++;	/* need one extra fetch if off end */
 					if (count <= pos)
-						PortalRunSelect(portal, false, pos - count + 1,
+						PortalRunSelect(portal, false, pos - count + 1, 0L,
 										None_Receiver);
 					else if (count > pos + 1)
-						PortalRunSelect(portal, true, count - pos - 1,
+						PortalRunSelect(portal, true, count - pos - 1, 0L,
 										None_Receiver);
 				}
-				return PortalRunSelect(portal, true, 1L, dest);
+				return PortalRunSelect(portal, true, 1L, 0L, dest);
 			}
 			else if (count < 0)
 			{
@@ -1553,17 +1588,17 @@ DoPortalRunFetch(Portal portal,
 				 * (Is it worth considering case where count > half of size of
 				 * query?  We could rewind once we know the size ...)
 				 */
-				PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
+				PortalRunSelect(portal, true, FETCH_ALL, 0L, None_Receiver);
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, dest);
+					PortalRunSelect(portal, false, -count - 1, 0, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, dest);
 			}
 			else
 			{
 				/* count == 0 */
 				/* Rewind to start, return zero rows */
 				DoPortalRewind(portal);
-				return PortalRunSelect(portal, true, 0L, dest);
+				return PortalRunSelect(portal, true, 0L, 0L, dest);
 			}
 			break;
 		case FETCH_RELATIVE:
@@ -1573,8 +1608,8 @@ DoPortalRunFetch(Portal portal,
 				 * Definition: advance count-1 rows, return next row (if any).
 				 */
 				if (count > 1)
-					PortalRunSelect(portal, true, count - 1, None_Receiver);
-				return PortalRunSelect(portal, true, 1L, dest);
+					PortalRunSelect(portal, true, count - 1, 0L, None_Receiver);
+				return PortalRunSelect(portal, true, 1L, 0L, dest);
 			}
 			else if (count < 0)
 			{
@@ -1583,8 +1618,8 @@ DoPortalRunFetch(Portal portal,
 				 * any).
 				 */
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, dest);
+					PortalRunSelect(portal, false, -count - 1, 0L, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, dest);
 			}
 			else
 			{
@@ -1630,7 +1665,7 @@ DoPortalRunFetch(Portal portal,
 			 */
 			if (on_row)
 			{
-				PortalRunSelect(portal, false, 1L, None_Receiver);
+				PortalRunSelect(portal, false, 1L, 0L, None_Receiver);
 				/* Set up to fetch one row forward */
 				count = 1;
 				forward = true;
@@ -1652,7 +1687,7 @@ DoPortalRunFetch(Portal portal,
 		return result;
 	}
 
-	return PortalRunSelect(portal, forward, count, dest);
+	return PortalRunSelect(portal, forward, count, size, dest);
 }
 
 /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 40fde83..64a02c3 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -80,8 +80,8 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook;
 
 /* Hook for plugins to get control in ExecutorRun() */
 typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc,
-												   ScanDirection direction,
-												   long count);
+									   ScanDirection direction,
+									   long count, long size);
 extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook;
 
 /* Hook for plugins to get control in ExecutorFinish() */
@@ -176,9 +176,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter,
 extern void ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count);
+			ScanDirection direction, long count, long size);
 extern void standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count);
+		    ScanDirection direction, long count, long size);
 extern void ExecutorFinish(QueryDesc *queryDesc);
 extern void standard_ExecutorFinish(QueryDesc *queryDesc);
 extern void ExecutorEnd(QueryDesc *queryDesc);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 41288ed..d963286 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -376,6 +376,7 @@ typedef struct EState
 	List	   *es_rowMarks;	/* List of ExecRowMarks */
 
 	uint32		es_processed;	/* # of tuples processed */
+	bool		es_stoppedbysize; /* true if processing stopped by size */
 	Oid			es_lastoid;		/* last oid processed (by INSERT) */
 
 	int			es_top_eflags;	/* eflags passed to ExecutorStart */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b1dfa85..9e18331 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2223,6 +2223,7 @@ typedef struct FetchStmt
 	NodeTag		type;
 	FetchDirection direction;	/* see above */
 	long		howMany;		/* number of rows, or position argument */
+	long		howLarge;		/* total bytes of rows */
 	char	   *portalname;		/* name of portal (cursor) */
 	bool		ismove;			/* TRUE if MOVE */
 } FetchStmt;
diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h
index 8073a6e..afffe86 100644
--- a/src/include/tcop/pquery.h
+++ b/src/include/tcop/pquery.h
@@ -33,13 +33,14 @@ extern void PortalStart(Portal portal, ParamListInfo params,
 extern void PortalSetResultFormat(Portal portal, int nFormats,
 					  int16 *formats);
 
-extern bool PortalRun(Portal portal, long count, bool isTopLevel,
+extern bool PortalRun(Portal portal, long count, long size, bool isTopLevel,
 		  DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag);
 
 extern long PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
+			   long size,
 			   DestReceiver *dest);
 
 #endif   /* PQUERY_H */
diff --git a/src/interfaces/ecpg/preproc/Makefile b/src/interfaces/ecpg/preproc/Makefile
index 1ecc405..b492fa7 100644
--- a/src/interfaces/ecpg/preproc/Makefile
+++ b/src/interfaces/ecpg/preproc/Makefile
@@ -48,7 +48,7 @@ ecpg: $(OBJS) | submake-libpgport
 preproc.o: pgc.c
 
 preproc.h: preproc.c ;
-preproc.c: BISONFLAGS += -d
+preproc.c: BISONFLAGS += -r all -d
 
 preproc.y: ../../../backend/parser/gram.y parse.pl ecpg.addons ecpg.header ecpg.tokens ecpg.trailer ecpg.type
 	$(PERL) $(srcdir)/parse.pl $(srcdir) < $< > $@
diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons
index b3b36cf..bdccb68 100644
--- a/src/interfaces/ecpg/preproc/ecpg.addons
+++ b/src/interfaces/ecpg/preproc/ecpg.addons
@@ -220,13 +220,46 @@ ECPG: fetch_argsNEXTopt_from_incursor_name addon
 ECPG: fetch_argsPRIORopt_from_incursor_name addon
 ECPG: fetch_argsFIRST_Popt_from_incursor_name addon
 ECPG: fetch_argsLAST_Popt_from_incursor_name addon
+		add_additional_variables($3, false);
+		if ($3[0] == ':')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
 ECPG: fetch_argsALLopt_from_incursor_name addon
+ECPG: fetch_argsFORWARDopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
 		{
 			free($3);
 			$3 = mm_strdup("$0");
 		}
+ECPG: fetch_argsALLLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($5, false);
+		if ($5[0] == ':')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
+		if ($3[0] == '$')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDALLLIMITIconstopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDALLLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
+		{
+			free($6);
+			$6 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
 ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
@@ -234,11 +267,41 @@ ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 			free($3);
 			$3 = mm_strdup("$0");
 		}
+ECPG: fetch_argsSignedIconstLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($5, false);
+		if ($5[0] == ':')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
 		if ($1[0] == '$')
 		{
 			free($1);
 			$1 = mm_strdup("$0");
 		}
+		if ($3[0] == '$')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDSignedIconstLIMITIconstopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDSignedIconstLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
+		{
+			free($6);
+			$6 = mm_strdup("$0");
+		}
+		if ($2[0] == '$')
+		{
+			free($2);
+			$2 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
 ECPG: fetch_argsFORWARDALLopt_from_incursor_name addon
 ECPG: fetch_argsBACKWARDALLopt_from_incursor_name addon
 		add_additional_variables($4, false);
-- 
2.1.0.GIT

