From c662b8865c58cba714655148401ac86a21c10f3c Mon Sep 17 00:00:00 2001
From: Mikko Tiihonen <mikko.tiihonen@nitorcreations.com>
Date: Sat, 1 Nov 2014 15:43:19 +0200
Subject: [PATCH] Example pipelined single-shot query

---
 org/postgresql/core/QueryExecutor.java             |  13 +++
 org/postgresql/core/v2/QueryExecutorImpl.java      |   5 +
 org/postgresql/core/v3/QueryExecutorImpl.java      |  41 +++++++
 org/postgresql/jdbc2/AbstractJdbc2Statement.java   |  79 +++++++++++++
 .../test/jdbc2/PipelineExecutionTest.java          | 129 +++++++++++++++++++++
 5 files changed, 267 insertions(+)
 create mode 100644 org/postgresql/test/jdbc2/PipelineExecutionTest.java

diff --git a/org/postgresql/core/QueryExecutor.java b/org/postgresql/core/QueryExecutor.java
index e80a23c..b8e46a6 100644
--- a/org/postgresql/core/QueryExecutor.java
+++ b/org/postgresql/core/QueryExecutor.java
@@ -8,6 +8,7 @@
 */
 package org.postgresql.core;
 
+import java.io.IOException;
 import java.sql.SQLException;
 
 import org.postgresql.copy.CopyOperation;
@@ -101,6 +102,16 @@ public interface QueryExecutor {
     static int QUERY_NO_BINARY_TRANSFER = 256;
 
     /**
+     * Flag for pipeline executions with responses read later.
+     */
+    static int QUERY_PIPELINE = 512;
+
+    /**
+     * Flag set internally while a previously queued pipeline response is being read back.
+     */
+    static int QUERY_DEQUEUE_PIPELINE = 1024;
+
+    /**
      * Execute a Query, passing results to a provided ResultHandler.
      *
      * @param query the query to execute; must be a query returned from
@@ -125,6 +136,8 @@ public interface QueryExecutor {
                  int flags)
     throws SQLException;
 
+    void processPipelinedResult(ResultHandler handler) throws SQLException;
+
     /**
      * Execute several Query, passing results to a provided ResultHandler.
      *
diff --git a/org/postgresql/core/v2/QueryExecutorImpl.java b/org/postgresql/core/v2/QueryExecutorImpl.java
index 33c0048..5a6f607 100644
--- a/org/postgresql/core/v2/QueryExecutorImpl.java
+++ b/org/postgresql/core/v2/QueryExecutorImpl.java
@@ -616,4 +616,9 @@ public class QueryExecutorImpl implements QueryExecutor {
     public CopyOperation startCopy(String sql, boolean suppressBegin) throws SQLException {
         throw new PSQLException(GT.tr("Copy not implemented for protocol version 2"), PSQLState.NOT_IMPLEMENTED);
     }
+
+    @Override // pipelined execution is not supported by this (protocol version 2) executor
+    public void processPipelinedResult(ResultHandler handler) throws SQLException {
+        throw new PSQLException(GT.tr("Pipelined execution not implemented for protocol version 2"), PSQLState.NOT_IMPLEMENTED);
+    }
 }
diff --git a/org/postgresql/core/v3/QueryExecutorImpl.java b/org/postgresql/core/v3/QueryExecutorImpl.java
index 966a6f6..7297764 100644
--- a/org/postgresql/core/v3/QueryExecutorImpl.java
+++ b/org/postgresql/core/v3/QueryExecutorImpl.java
@@ -11,6 +11,7 @@ package org.postgresql.core.v3;
 import org.postgresql.core.*;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Properties;
@@ -1713,7 +1714,33 @@ public class QueryExecutorImpl implements QueryExecutor {
         }
     }
 
+    public void processPipelinedResult(ResultHandler handler) throws SQLException {
+        // Read queued pipeline responses in submission order until the given handler's turn is done.
+        while (!pipelineResultHandlers.isEmpty()) { // remove(0) throws on an empty list, it never returns null
+            ResultHandlerHolder holder = pipelineResultHandlers.remove(0);
+            try {
+                processResults(holder.handler, holder.flags & (~QUERY_PIPELINE) | QUERY_DEQUEUE_PIPELINE);
+            } catch (IOException e) {
+                protoConnection.close();
+                handler.handleError(new PSQLException(GT.tr("An I/O error occurred while sending to the backend."), PSQLState.CONNECTION_FAILURE, e));
+            }
+            holder.handler.handleCompletion();
+            if (holder.handler == handler) {
+                return;
+            }
+        }
+    }
+
     protected void processResults(ResultHandler handler, int flags) throws IOException {
+        if ((flags & QUERY_PIPELINE) != 0) {
+            pipelineResultHandlers.add(new ResultHandlerHolder(handler, flags));
+            return;
+        } else {
+            if (!pipelineResultHandlers.isEmpty() && (flags & QUERY_DEQUEUE_PIPELINE) == 0) {
+                handler.handleError(new PSQLException(GT.tr("Pipelining still in progress pending."), PSQLState.TRANSACTION_STATE_INVALID));
+                return;
+            }
+        }
         boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
         boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;
 
@@ -2019,6 +2046,10 @@ public class QueryExecutorImpl implements QueryExecutor {
                     failedQuery.unprepare();
                 }
 
+                if ((flags & QUERY_DEQUEUE_PIPELINE) != 0 || parseIndex < pendingExecuteQueue.size()) {
+                    break;
+                }
+
                 pendingParseQueue.clear();              // No more ParseComplete messages expected.
                 pendingDescribeStatementQueue.clear();  // No more ParameterDescription messages expected.
                 pendingDescribePortalQueue.clear();     // No more RowDescription messages expected.
@@ -2282,6 +2313,16 @@ public class QueryExecutorImpl implements QueryExecutor {
     private final ArrayList pendingExecuteQueue = new ArrayList(); // list of {SimpleQuery,Portal} object arrays
     private final ArrayList pendingDescribeStatementQueue = new ArrayList(); // list of {SimpleQuery, SimpleParameterList, Boolean} object arrays
     private final ArrayList pendingDescribePortalQueue = new ArrayList(); // list of SimpleQuery
+    private final List<ResultHandlerHolder> pipelineResultHandlers = new LinkedList<ResultHandlerHolder>();
+
+    static class ResultHandlerHolder { // one queued pipeline entry: a handler plus its query flags
+        final int flags; // flags the query was submitted with
+        final ResultHandler handler; // receives the results when the queue is drained
+        public ResultHandlerHolder(ResultHandler handler, int flags) {
+            this.flags = flags;
+            this.handler = handler;
+        }
+    }
 
     private long nextUniqueID = 1;
     private final ProtocolConnectionImpl protoConnection;
diff --git a/org/postgresql/jdbc2/AbstractJdbc2Statement.java b/org/postgresql/jdbc2/AbstractJdbc2Statement.java
index b50aa7a..7260d1e 100644
--- a/org/postgresql/jdbc2/AbstractJdbc2Statement.java
+++ b/org/postgresql/jdbc2/AbstractJdbc2Statement.java
@@ -20,6 +20,10 @@ import java.util.Map;
 import java.util.TimerTask;
 import java.util.TimeZone;
 import java.util.Calendar;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.postgresql.Driver;
 import org.postgresql.largeobject.*;
@@ -33,6 +37,8 @@ import org.postgresql.util.PSQLState;
 import org.postgresql.util.PGobject;
 import org.postgresql.util.GT;
 
+import com.sun.corba.se.spi.legacy.connection.GetEndPointInfoAgainException;
+
 /**
  * This class defines methods of the jdbc2 specification.
  * The real Statement class (for jdbc2) is org.postgresql.jdbc2.Jdbc2Statement
@@ -197,6 +203,50 @@ public abstract class AbstractJdbc2Statement implements BaseStatement
         return false;
     }
 
+    class ResultSetFutureFuture implements Future<ResultSet> { // deferred result of one pipelined query
+        private final StatementResultHandler handler;
+        private boolean done; // true once get() has been asked to read this query's response
+
+        public ResultSetFutureFuture(StatementResultHandler handler) {
+            this.handler = handler;
+        }
+
+        @Override // cancellation is not supported, so this never succeeds
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return done;
+        }
+
+        @Override
+        public ResultSet get() throws InterruptedException, ExecutionException {
+            done = true; // Future contract: isDone() must hold after get() returns or throws
+            try {
+                connection.getQueryExecutor().processPipelinedResult(handler);
+            } catch (SQLException e) {
+                throw new ExecutionException(e);
+            }
+            handleResults(handler);
+            if (result.getNext() != null)
+                throw new ExecutionException(new PSQLException(GT.tr("Multiple ResultSets were returned by the query."),
+                                        PSQLState.TOO_MANY_RESULTS));
+            return result.getResultSet();
+        }
+
+        @Override // NOTE: the timeout is ignored; this blocks exactly like get()
+        public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return get();
+        }
+    }
+
     //
     // ResultHandler implementations for updates, queries, and either-or.
     //
@@ -294,6 +344,26 @@ public abstract class AbstractJdbc2Statement implements BaseStatement
     }
 
     /*
+     * Start a SQL query in pipeline mode and return a handle for fetching its ResultSet later.
+     *
+     * @param p_sql typically a static SQL SELECT statement
+     * @return a Future whose get() processes the queued response and returns the query's ResultSet
+     * @exception SQLException if called on a PreparedStatement or a database access error occurs
+     */
+    public Future<java.sql.ResultSet> pipelineQuery(String p_sql) throws SQLException
+    {
+        if (preparedQuery != null)
+            throw new PSQLException(GT.tr("Can''t use query methods that take a query string on a PreparedStatement."),
+                                    PSQLState.WRONG_OBJECT_TYPE);
+
+        checkClosed();
+        p_sql = replaceProcessing(p_sql);
+        Query simpleQuery = connection.getQueryExecutor().createSimpleQuery(p_sql);
+        StatementResultHandler handler = asyncExecute(simpleQuery, null, QueryExecutor.QUERY_ONESHOT | QueryExecutor.QUERY_PIPELINE);
+        return new ResultSetFutureFuture(handler);
+    }
+
+    /*
      * A Prepared SQL query is executed and its ResultSet is returned
      *
      * @return a ResultSet that contains the data produced by the
@@ -514,6 +584,11 @@ public abstract class AbstractJdbc2Statement implements BaseStatement
     }
 
     protected void execute(Query queryToExecute, ParameterList queryParameters, int flags) throws SQLException {
+        // Synchronous path: run the query, then publish the handler's results on this statement right away.
+        handleResults(asyncExecute(queryToExecute, queryParameters, flags));
+    }
+
+    protected StatementResultHandler asyncExecute(Query queryToExecute, ParameterList queryParameters, int flags) throws SQLException {
         closeForNextExecution();
 
         // Enable cursor-based resultset if possible.
@@ -573,6 +648,10 @@ public abstract class AbstractJdbc2Statement implements BaseStatement
         {
             killTimer();
         }
+        return handler;
+    }
+
+    protected void handleResults(StatementResultHandler handler) {
         result = firstUnclosedResult = handler.getResults();
 
         if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways)
diff --git a/org/postgresql/test/jdbc2/PipelineExecutionTest.java b/org/postgresql/test/jdbc2/PipelineExecutionTest.java
new file mode 100644
index 0000000..8d251b3
--- /dev/null
+++ b/org/postgresql/test/jdbc2/PipelineExecutionTest.java
@@ -0,0 +1,129 @@
+/*-------------------------------------------------------------------------
+ *
+ * Copyright (c) 2004-2014, PostgreSQL Global Development Group
+ *
+ *
+ *-------------------------------------------------------------------------
+ */
+package org.postgresql.test.jdbc2;
+
+import static java.lang.System.currentTimeMillis;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import junit.framework.TestCase;
+
+import org.postgresql.jdbc2.AbstractJdbc2Statement;
+import org.postgresql.test.TestUtil;
+
+/*
+ * TestCase to test the pipelining of queries.
+ */
+public class PipelineExecutionTest extends TestCase {
+    private static final int ITERATIONS = 50; // timing rounds per test; the fastest round is reported
+    private static final int COUNT = 200; // rows inserted by setUp, and queries issued per round
+
+    private Connection con;
+
+    /*
+     * Constructor; the name is passed through to junit's TestCase.
+     */
+    public PipelineExecutionTest(String name) {
+        super(name);
+    }
+
+    // Set up the fixture for this testcase: create test_p and fill it with COUNT rows.
+    protected void setUp() throws Exception {
+        con = TestUtil.openDB();
+        TestUtil.createTable(con, "test_p", "key int4, val text");
+        Statement st = con.createStatement();
+        for (int i = 0; i < COUNT; ++i) {
+            st.executeUpdate("insert into test_p (key, val) values (" + i + ", 'text value " + i + "')");
+        }
+        st.close();
+        TestUtil.closeDB(con);
+    }
+
+    // Tear down the fixture for this test case: drop test_p over a fresh connection.
+    protected void tearDown() throws Exception {
+        TestUtil.closeDB(con);
+        con = TestUtil.openDB();
+        TestUtil.dropTable(con, "test_p");
+        TestUtil.closeDB(con);
+    }
+
+    public void testNonPipelinedExecuteQuery() throws Exception {
+        con = TestUtil.openDB();
+        long min = Long.MAX_VALUE;
+        for (int n = 0; n < ITERATIONS; ++n) {
+            long start = currentTimeMillis();
+            Statement stat = con.createStatement();
+            for (int i = 0; i < COUNT; ++i) {
+                ResultSet rs = stat.executeQuery("select val from test_p where key = " + i);
+                assertTrue("no row for key " + i, rs.next());
+                assertEquals("text value " + i, rs.getString(1));
+                rs.close();
+            }
+            stat.close();
+            long stop = currentTimeMillis();
+            min = Math.min(min, stop-start);
+            Thread.sleep(50);
+        }
+
+        System.out.printf("Normal min: %dms%n", min);
+    }
+
+    public void testPipelinedExecuteQuery() throws Exception {
+        con = TestUtil.openDB();
+        long min = Long.MAX_VALUE;
+        for (int n = 0; n < ITERATIONS; ++n) {
+            long start = currentTimeMillis();
+            AbstractJdbc2Statement stat = (AbstractJdbc2Statement) con.createStatement();
+            Deque<Future<ResultSet>> results = new ArrayDeque<Future<ResultSet>>();
+
+            int count = 0;
+            for (int i = 0; i < COUNT; ++i) {
+                results.add(stat.pipelineQuery("select val from test_p where key = " + i));
+                if (results.size() >= 10) {
+                    count = processFutureResults(results, count);
+                }
+            }
+
+            // process any left-over results; all COUNT queries must have been answered by now
+            assertEquals(COUNT, processFutureResults(results, count));
+            stat.close();
+            long stop = currentTimeMillis();
+            min = Math.min(min, stop-start);
+            Thread.sleep(50);
+        }
+
+        System.out.printf("Pipeline min: %dms%n", min);
+
+        // verify that sync query works after pipelined requests
+        Statement stat = con.createStatement();
+        ResultSet rs = stat.executeQuery("select val from test_p where key = 0");
+        assertTrue("no row for key 0", rs.next());
+        assertEquals("text value 0", rs.getString(1));
+        rs.close();
+        stat.close();
+    }
+
+    private int processFutureResults(Deque<Future<ResultSet>> results, int count) throws InterruptedException,
+            ExecutionException, SQLException {
+        Future<ResultSet> future;
+        while ((future = results.pollFirst()) != null) {
+            ResultSet rs = future.get();
+            assertTrue("no row for key " + count, rs.next());
+            assertEquals("text value " + (count++), rs.getString(1));
+            rs.close();
+        }
+        return count;
+    }
+}
-- 
2.1.0

