postgres_fdw binary protocol support
Hi everyone,
I have made a patch that introduces support for libpq binary protocol
in postgres_fdw. The idea is simple, when a user knows that the foreign
server is binary compatible with the local and his workload could
somehow benefit from using binary protocol, it can be switched on for a
particular server or even a particular table.
The patch adds a new foreign server and table option 'binary_format'
(by default off) and implements serialization/deserialization of query
results and parameters for binary protocol. I have tested the patch by
switching foreign servers in postgres_fdw.sql tests to binary_format, the
only diff was in the text of the error for parsing an invalid integer
value, so it worked as expected for the test. There are a few minor
issues I don't like in the code and I am yet to write the tests and
docs for it. It would be great to get some feedback and understand,
whether this is a welcome feature, before proceeding with all of the
abovementioned.
Thanks,
Ilya Gladyshev
Attachments:
0001-postgres_fdw-libpq-binary-proto-support.patch (text/x-patch; charset=UTF-8; name=0001-postgres_fdw-libpq-binary-proto-support.patch) [Download]
From 2cb72df03ed94d55cf51531a2d21a7d3369ae27b Mon Sep 17 00:00:00 2001
From: Ilya Gladyshev <ilya.v.gladyshev@gmail.com>
Date: Sat, 19 Nov 2022 17:47:49 +0400
Subject: [PATCH] postgres_fdw libpq binary proto support
---
contrib/postgres_fdw/option.c | 6 +-
contrib/postgres_fdw/postgres_fdw.c | 389 ++++++++++++++++++++++++----
contrib/postgres_fdw/postgres_fdw.h | 1 +
3 files changed, 338 insertions(+), 58 deletions(-)
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index fa80ee2a55..f96cb79b42 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -125,7 +125,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
strcmp(def->defname, "truncatable") == 0 ||
strcmp(def->defname, "async_capable") == 0 ||
strcmp(def->defname, "parallel_commit") == 0 ||
- strcmp(def->defname, "keep_connections") == 0)
+ strcmp(def->defname, "keep_connections") == 0 ||
+ strcmp(def->defname, "binary_format") == 0)
{
/* these accept only boolean values */
(void) defGetBoolean(def);
@@ -253,6 +254,9 @@ InitPgFdwOptions(void)
/* async_capable is available on both server and table */
{"async_capable", ForeignServerRelationId, false},
{"async_capable", ForeignTableRelationId, false},
+ /* binary_format is available on both server and table */
+ {"binary_format", ForeignServerRelationId, false},
+ {"binary_format", ForeignTableRelationId, false},
{"parallel_commit", ForeignServerRelationId, false},
{"keep_connections", ForeignServerRelationId, false},
{"password_required", UserMappingRelationId, false},
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 8d7500abfb..9344b6f5fc 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -76,6 +76,8 @@ enum FdwScanPrivateIndex
FdwScanPrivateRetrievedAttrs,
/* Integer representing the desired fetch_size */
FdwScanPrivateFetchSize,
+ /* Boolean flag showing whether to use binary or text libpq protocol */
+ FdwScanPrivateBinaryFormat,
/*
* String describing join i.e. names of relations being joined and types
@@ -128,7 +130,8 @@ enum FdwDirectModifyPrivateIndex
/* Integer list of attribute numbers retrieved by RETURNING */
FdwDirectModifyPrivateRetrievedAttrs,
/* set-processed flag (as a Boolean node) */
- FdwDirectModifyPrivateSetProcessed
+ FdwDirectModifyPrivateSetProcessed,
+ FdwDirectModifyPrivateBinaryFormat
};
/*
@@ -154,6 +157,7 @@ typedef struct PgFdwScanState
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
const char **param_values; /* textual values of query parameters */
+ int *param_lengths;
/* for storing result tuples */
HeapTuple *tuples; /* array of currently-retrieved tuples */
@@ -172,6 +176,7 @@ typedef struct PgFdwScanState
MemoryContext temp_cxt; /* context for per-tuple temporary data */
int fetch_size; /* number of tuples per fetch */
+ bool binary_format; /* whether to use libpq binary or text format */
} PgFdwScanState;
/*
@@ -195,6 +200,7 @@ typedef struct PgFdwModifyState
int batch_size; /* value of FDW option "batch_size" */
bool has_returning; /* is there a RETURNING clause? */
List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
+ bool binary_format;
/* info about parameters for prepared statement */
AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
@@ -225,7 +231,7 @@ typedef struct PgFdwDirectModifyState
bool has_returning; /* is there a RETURNING clause? */
List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
bool set_processed; /* do we set the command es_processed? */
-
+ bool binary_format;
/* for remote query execution */
PGconn *conn; /* connection for the update */
PgFdwConnState *conn_state; /* extra per-connection state */
@@ -233,6 +239,7 @@ typedef struct PgFdwDirectModifyState
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
const char **param_values; /* textual values of query parameters */
+ int *param_lengths;
/* for storing result tuples */
PGresult *result; /* result for query */
@@ -256,6 +263,7 @@ typedef struct PgFdwAnalyzeState
Relation rel; /* relcache entry for the foreign table */
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
List *retrieved_attrs; /* attr numbers retrieved by query */
+ bool binary_format;
/* collected sample rows */
HeapTuple *rows; /* array of size targrows */
@@ -470,7 +478,8 @@ static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
TupleTableSlot **slots,
- int numSlots);
+ int numSlots,
+ int **p_lengths);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
static void finish_foreign_modify(PgFdwModifyState *fmstate);
@@ -492,11 +501,15 @@ static void prepare_query_params(PlanState *node,
int numParams,
FmgrInfo **param_flinfo,
List **param_exprs,
- const char ***param_values);
+ const char ***param_values,
+ int **param_lengths,
+ bool binary_format);
static void process_query_params(ExprContext *econtext,
FmgrInfo *param_flinfo,
List *param_exprs,
- const char **param_values);
+ const char **param_values,
+ int *param_lengths,
+ bool binary_format);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
HeapTuple *rows, int targrows,
double *totalrows,
@@ -512,7 +525,8 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
AttInMetadata *attinmeta,
List *retrieved_attrs,
ForeignScanState *fsstate,
- MemoryContext temp_context);
+ MemoryContext temp_context,
+ bool binary_format);
static void conversion_error_callback(void *arg);
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
@@ -540,8 +554,14 @@ static void apply_table_options(PgFdwRelationInfo *fpinfo);
static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const PgFdwRelationInfo *fpinfo_o,
const PgFdwRelationInfo *fpinfo_i);
-static int get_batch_size_option(Relation rel);
+static bool get_binary_format_option(ForeignServer *server, ForeignTable *table);
+static int get_batch_size_option(ForeignServer *server, ForeignTable *table);
+static ForeignScan *find_modifytable_subplan(PlannerInfo *root,
+ ModifyTable *plan,
+ Index rtindex,
+ int subplan_index);
+static AttInMetadata *TupleDescGetAttInBinaryMetadata(TupleDesc tupdesc);
/*
* Foreign-data wrapper handler function: return a struct with pointers
@@ -1404,9 +1424,10 @@ postgresGetForeignPlan(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor.
* Items in the list must match order in enum FdwScanPrivateIndex.
*/
- fdw_private = list_make3(makeString(sql.data),
+ fdw_private = list_make4(makeString(sql.data),
retrieved_attrs,
- makeInteger(fpinfo->fetch_size));
+ makeInteger(fpinfo->fetch_size),
+ makeBoolean(fpinfo->binary_format));
if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
fdw_private = lappend(fdw_private,
makeString(fpinfo->relation_name));
@@ -1542,6 +1563,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
FdwScanPrivateRetrievedAttrs);
fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
FdwScanPrivateFetchSize));
+ fsstate->binary_format = boolVal(list_nth(fsplan->fdw_private,
+ FdwScanPrivateBinaryFormat));
/* Create contexts for batches of tuples and per-tuple temp workspace. */
fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
@@ -1566,8 +1589,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
}
- fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
-
+ if (fsstate->binary_format)
+ fsstate->attinmeta = TupleDescGetAttInBinaryMetadata(fsstate->tupdesc);
+ else
+ fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
/*
* Prepare for processing of parameters used in remote query, if any.
*/
@@ -1579,7 +1604,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
numParams,
&fsstate->param_flinfo,
&fsstate->param_exprs,
- &fsstate->param_values);
+ &fsstate->param_values,
+ &fsstate->param_lengths,
+ fsstate->binary_format);
/* Set the async-capable flag */
fsstate->async_capable = node->ss.ps.async_capable;
@@ -2038,7 +2065,15 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
if (fmstate)
batch_size = fmstate->batch_size;
else
- batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
+ {
+ ForeignServer *fs;
+ ForeignTable *ft;
+
+ ft = GetForeignTable(RelationGetRelid(resultRelInfo->ri_RelationDesc));
+ fs = GetForeignServer(ft->serverid);
+
+ batch_size = get_batch_size_option(fs, ft);
+ }
/*
* Disable batching when we have to use RETURNING, there are any
@@ -2593,10 +2628,11 @@ postgresPlanDirectModify(PlannerInfo *root,
* Update the fdw_private list that will be available to the executor.
* Items in the list must match enum FdwDirectModifyPrivateIndex, above.
*/
- fscan->fdw_private = list_make4(makeString(sql.data),
+ fscan->fdw_private = list_make5(makeString(sql.data),
makeBoolean((retrieved_attrs != NIL)),
retrieved_attrs,
- makeBoolean(plan->canSetTag));
+ makeBoolean(plan->canSetTag),
+ makeBoolean(fpinfo->binary_format));
/*
* Update the foreign-join-related fields.
@@ -2701,6 +2737,8 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private,
FdwDirectModifyPrivateSetProcessed));
+ dmstate->binary_format = boolVal(list_nth(fsplan->fdw_private,
+ FdwDirectModifyPrivateBinaryFormat));
/* Create context for per-tuple temp workspace. */
dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
"postgres_fdw temporary data",
@@ -2716,7 +2754,10 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
else
tupdesc = RelationGetDescr(dmstate->rel);
- dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ if (dmstate->binary_format)
+ dmstate->attinmeta = TupleDescGetAttInBinaryMetadata(tupdesc);
+ else
+ dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
/*
* When performing an UPDATE/DELETE .. RETURNING on a join directly,
@@ -2738,7 +2779,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
numParams,
&dmstate->param_flinfo,
&dmstate->param_exprs,
- &dmstate->param_values);
+ &dmstate->param_values,
+ &dmstate->param_lengths,
+ dmstate->binary_format);
}
/*
@@ -3711,9 +3754,11 @@ create_cursor(ForeignScanState *node)
ExprContext *econtext = node->ss.ps.ps_ExprContext;
int numParams = fsstate->numParams;
const char **values = fsstate->param_values;
+ int *formats = NULL;
PGconn *conn = fsstate->conn;
StringInfoData buf;
PGresult *res;
+ bool binary = fsstate->binary_format;
/* First, process a pending asynchronous request, if any. */
if (fsstate->conn_state->pendingAreq)
@@ -3733,15 +3778,28 @@ create_cursor(ForeignScanState *node)
process_query_params(econtext,
fsstate->param_flinfo,
fsstate->param_exprs,
- values);
+ values,
+ fsstate->param_lengths,
+ binary);
MemoryContextSwitchTo(oldcontext);
+
+ if (binary)
+ {
+ int i;
+
+ formats = palloc(sizeof(int) * numParams);
+ for (i = 0; i < numParams; i++)
+ formats[i] = 1;
+ }
}
/* Construct the DECLARE CURSOR command */
initStringInfo(&buf);
- appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
- fsstate->cursor_number, fsstate->query);
+ appendStringInfo(&buf, "DECLARE c%u %sCURSOR FOR\n%s",
+ fsstate->cursor_number,
+ binary ? "BINARY " : "",
+ fsstate->query);
/*
* Notice that we pass NULL for paramTypes, thus forcing the remote server
@@ -3751,7 +3809,7 @@ create_cursor(ForeignScanState *node)
* server has the same OIDs we do for the parameters' types.
*/
if (!PQsendQueryParams(conn, buf.data, numParams,
- NULL, values, NULL, NULL, 0))
+ NULL, values, fsstate->param_lengths, formats, binary ? 1 : 0))
pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
/*
@@ -3848,7 +3906,8 @@ fetch_more_data(ForeignScanState *node)
fsstate->attinmeta,
fsstate->retrieved_attrs,
node,
- fsstate->temp_cxt);
+ fsstate->temp_cxt,
+ fsstate->binary_format);
}
/* Update fetch_ct_2 */
@@ -3969,11 +4028,13 @@ create_foreign_modify(EState *estate,
TupleDesc tupdesc = RelationGetDescr(rel);
Oid userid;
ForeignTable *table;
+ ForeignServer *server;
UserMapping *user;
AttrNumber n_params;
Oid typefnoid;
bool isvarlena;
ListCell *lc;
+ bool binary_format;
/* Begin constructing PgFdwModifyState. */
fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
@@ -3987,6 +4048,7 @@ create_foreign_modify(EState *estate,
/* Get info about foreign table. */
table = GetForeignTable(RelationGetRelid(rel));
+ server = GetForeignServer(table->serverid);
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
@@ -4000,6 +4062,9 @@ create_foreign_modify(EState *estate,
fmstate->query = pstrdup(fmstate->query);
fmstate->orig_query = pstrdup(fmstate->query);
}
+ binary_format = get_binary_format_option(server, table);
+
+ fmstate->binary_format = binary_format;
fmstate->target_attrs = target_attrs;
fmstate->values_end = values_end;
fmstate->has_returning = has_returning;
@@ -4012,7 +4077,12 @@ create_foreign_modify(EState *estate,
/* Prepare for input conversion of RETURNING results. */
if (fmstate->has_returning)
- fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ {
+ if (binary_format)
+ fmstate->attinmeta = TupleDescGetAttInBinaryMetadata(tupdesc);
+ else
+ fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ }
/* Prepare for output conversion of parameters used in prepared stmt. */
n_params = list_length(fmstate->target_attrs) + 1;
@@ -4030,7 +4100,10 @@ create_foreign_modify(EState *estate,
elog(ERROR, "could not find junk ctid column");
/* First transmittable parameter will be ctid */
- getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ if (binary_format)
+ getTypeBinaryOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ else
+ getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
fmstate->p_nums++;
}
@@ -4048,7 +4121,11 @@ create_foreign_modify(EState *estate,
/* Ignore generated columns; they are set to DEFAULT */
if (attr->attgenerated)
continue;
- getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
+
+ if (binary_format)
+ getTypeBinaryOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
+ else
+ getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
fmstate->p_nums++;
}
@@ -4058,7 +4135,7 @@ create_foreign_modify(EState *estate,
/* Set batch_size from foreign server/table options. */
if (operation == CMD_INSERT)
- fmstate->batch_size = get_batch_size_option(rel);
+ fmstate->batch_size = get_batch_size_option(server, table);
fmstate->num_slots = 1;
@@ -4088,6 +4165,9 @@ execute_foreign_modify(EState *estate,
const char **p_values;
PGresult *res;
int n_rows;
+ int *p_formats = NULL;
+ int numParams = fmstate->p_nums * (*numSlots);
+ int *p_lengths = NULL;
StringInfoData sql;
/* The operation should be INSERT, UPDATE, or DELETE */
@@ -4142,7 +4222,17 @@ execute_foreign_modify(EState *estate,
}
/* Convert parameters needed by prepared statement to text form */
- p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
+ p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots,
+ &p_lengths);
+
+ if (fmstate->binary_format)
+ {
+ int i;
+
+ p_formats = palloc(sizeof(int) * numParams);
+ for (i = 0; i < numParams; i++)
+ p_formats[i] = 1;
+ }
/*
* Execute the prepared statement.
@@ -4151,9 +4241,9 @@ execute_foreign_modify(EState *estate,
fmstate->p_name,
fmstate->p_nums * (*numSlots),
p_values,
- NULL,
- NULL,
- 0))
+ p_lengths,
+ p_formats,
+ fmstate->binary_format ? 1 : 0))
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
/*
@@ -4254,17 +4344,24 @@ static const char **
convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
TupleTableSlot **slots,
- int numSlots)
+ int numSlots,
+ int **param_lengths)
{
const char **p_values;
int i;
int j;
int pindex = 0;
MemoryContext oldcontext;
+ int *p_lengths = NULL;
+
oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
+ if (fmstate->binary_format)
+ p_lengths = palloc(sizeof(int) * fmstate->p_nums * numSlots);
+
+ *param_lengths = p_lengths;
/* ctid is provided only for UPDATE/DELETE, which don't allow batching */
Assert(!(tupleid != NULL && numSlots > 1));
@@ -4274,8 +4371,18 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
{
Assert(numSlots == 1);
/* don't need set_transmission_modes for TID output */
- p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
- PointerGetDatum(tupleid));
+ if (fmstate->binary_format)
+ {
+ bytea *val;
+
+ val = SendFunctionCall(&fmstate->p_flinfo[pindex],
+ PointerGetDatum(tupleid));
+ p_values[pindex] = VARDATA(val);
+ p_lengths[pindex] = VARSIZE(val) - VARHDRSZ;
+ }
+ else
+ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+ PointerGetDatum(tupleid));
pindex++;
}
@@ -4303,7 +4410,21 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
continue;
value = slot_getattr(slots[i], attnum, &isnull);
if (isnull)
+ {
p_values[pindex] = NULL;
+
+ /* Binary params are parsed as NULL if length is -1 */
+ if (fmstate->binary_format)
+ p_lengths[pindex] = -1;
+ }
+ else if (fmstate->binary_format)
+ {
+ bytea *val;
+
+ val = SendFunctionCall(&fmstate->p_flinfo[j], value);
+ p_values[pindex] = VARDATA(val);
+ p_lengths[pindex] = VARSIZE(val) - VARHDRSZ;
+ }
else
p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
value);
@@ -4342,7 +4463,8 @@ store_returning_result(PgFdwModifyState *fmstate,
fmstate->attinmeta,
fmstate->retrieved_attrs,
NULL,
- fmstate->temp_cxt);
+ fmstate->temp_cxt,
+ fmstate->binary_format);
/*
* The returning slot will not necessarily be suitable to store
@@ -4543,6 +4665,8 @@ execute_dml_stmt(ForeignScanState *node)
ExprContext *econtext = node->ss.ps.ps_ExprContext;
int numParams = dmstate->numParams;
const char **values = dmstate->param_values;
+ int *formats = NULL;
+ bool binary = dmstate->binary_format;
/* First, process a pending asynchronous request, if any. */
if (dmstate->conn_state->pendingAreq)
@@ -4552,11 +4676,24 @@ execute_dml_stmt(ForeignScanState *node)
* Construct array of query parameter values in text format.
*/
if (numParams > 0)
+ {
process_query_params(econtext,
dmstate->param_flinfo,
dmstate->param_exprs,
- values);
+ values,
+ dmstate->param_lengths,
+ dmstate->binary_format);
+
+ if (binary)
+ {
+ int i;
+ formats = palloc(sizeof(int) * numParams);
+ for (i = 0; i < numParams; i++)
+ formats[i] = 1;
+ }
+
+ }
/*
* Notice that we pass NULL for paramTypes, thus forcing the remote server
* to infer types for all parameters. Since we explicitly cast every
@@ -4565,7 +4702,7 @@ execute_dml_stmt(ForeignScanState *node)
* server has the same OIDs we do for the parameters' types.
*/
if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
- NULL, values, NULL, NULL, 0))
+ NULL, values, dmstate->param_lengths, formats, binary ? 1 : 0))
pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
/*
@@ -4635,7 +4772,8 @@ get_returning_data(ForeignScanState *node)
dmstate->attinmeta,
dmstate->retrieved_attrs,
node,
- dmstate->temp_cxt);
+ dmstate->temp_cxt,
+ dmstate->binary_format);
ExecStoreHeapTuple(newtup, slot, false);
}
PG_CATCH();
@@ -4841,7 +4979,9 @@ prepare_query_params(PlanState *node,
int numParams,
FmgrInfo **param_flinfo,
List **param_exprs,
- const char ***param_values)
+ const char ***param_values,
+ int **param_lengths,
+ bool binary_format)
{
int i;
ListCell *lc;
@@ -4857,8 +4997,14 @@ prepare_query_params(PlanState *node,
Node *param_expr = (Node *) lfirst(lc);
Oid typefnoid;
bool isvarlena;
-
- getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
+ if (binary_format)
+ {
+ getTypeBinaryOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
+ }
+ else
+ {
+ getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
+ }
fmgr_info(typefnoid, &(*param_flinfo)[i]);
i++;
}
@@ -4875,6 +5021,8 @@ prepare_query_params(PlanState *node,
/* Allocate buffer for text form of query parameters. */
*param_values = (const char **) palloc0(numParams * sizeof(char *));
+ if (binary_format)
+ *param_lengths = (int *) palloc0(numParams * sizeof(int));
}
/*
@@ -4884,7 +5032,9 @@ static void
process_query_params(ExprContext *econtext,
FmgrInfo *param_flinfo,
List *param_exprs,
- const char **param_values)
+ const char **param_values,
+ int *param_lengths,
+ bool binary_format)
{
int nestlevel;
int i;
@@ -4907,7 +5057,19 @@ process_query_params(ExprContext *econtext,
* type-specific output function, unless the value is null.
*/
if (isNull)
+ {
param_values[i] = NULL;
+
+ /* Binary params are parsed as NULL if length is -1 */
+ if (binary_format)
+ param_lengths[i] = -1;
+ }
+ else if (binary_format)
+ {
+ bytea *val = SendFunctionCall(&param_flinfo[i], expr_value);
+ param_values[i] = VARDATA(val);
+ param_lengths[i] = VARSIZE(val) - VARHDRSZ;
+ }
else
param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
@@ -5011,7 +5173,6 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
/* Initialize workspace state */
astate.rel = relation;
- astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
astate.rows = rows;
astate.targrows = targrows;
@@ -5035,12 +5196,17 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
conn = GetConnection(user, false, NULL);
+ astate.binary_format = get_binary_format_option(server, table);
+ if (astate.binary_format)
+ astate.attinmeta = TupleDescGetAttInBinaryMetadata(RelationGetDescr(relation));
+ else
+ astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));;
/*
* Construct cursor that retrieves whole rows from remote.
*/
cursor_number = GetCursorNumber(conn);
initStringInfo(&sql);
- appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
+ appendStringInfo(&sql, "DECLARE c%u %sCURSOR FOR ", cursor_number, astate.binary_format ? "BINARY " : "");
deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
/* In what follows, do not risk leaking any PGresults. */
@@ -5212,7 +5378,8 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
astate->attinmeta,
astate->retrieved_attrs,
NULL,
- astate->temp_cxt);
+ astate->temp_cxt,
+ astate->binary_format);
MemoryContextSwitchTo(oldcontext);
}
@@ -5922,6 +6089,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
(void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
else if (strcmp(def->defname, "async_capable") == 0)
fpinfo->async_capable = defGetBoolean(def);
+ else if (strcmp(def->defname, "binary_format") == 0)
+ fpinfo->binary_format = defGetBoolean(def);
}
}
@@ -5945,6 +6114,9 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
(void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
else if (strcmp(def->defname, "async_capable") == 0)
fpinfo->async_capable = defGetBoolean(def);
+ else if (strcmp(def->defname, "binary_format") == 0)
+ fpinfo->binary_format = defGetBoolean(def);
+
}
}
@@ -5980,6 +6152,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
fpinfo->fetch_size = fpinfo_o->fetch_size;
fpinfo->async_capable = fpinfo_o->async_capable;
+ fpinfo->binary_format = fpinfo_o->binary_format;
/* Merge the table level options from either side of the join. */
if (fpinfo_i)
@@ -6011,6 +6184,9 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
*/
fpinfo->async_capable = fpinfo_o->async_capable ||
fpinfo_i->async_capable;
+
+ /* Join will use binary proto only if both base rels are configured to use it. */
+ fpinfo->binary_format = fpinfo_o->binary_format && fpinfo->binary_format;
}
}
@@ -7209,7 +7385,8 @@ make_tuple_from_result_row(PGresult *res,
AttInMetadata *attinmeta,
List *retrieved_attrs,
ForeignScanState *fsstate,
- MemoryContext temp_context)
+ MemoryContext temp_context,
+ bool binary_format)
{
HeapTuple tuple;
TupleDesc tupdesc;
@@ -7221,6 +7398,10 @@ make_tuple_from_result_row(PGresult *res,
MemoryContext oldcontext;
ListCell *lc;
int j;
+ StringInfo buf = NULL;
+
+ if (binary_format)
+ buf = makeStringInfo();
Assert(row < PQntuples(res));
@@ -7274,6 +7455,11 @@ make_tuple_from_result_row(PGresult *res,
else
valstr = PQgetvalue(res, row, j);
+ if (binary_format && valstr != NULL)
+ {
+ resetStringInfo(buf);
+ appendBinaryStringInfo(buf, valstr, PQgetlength(res, row, j));
+ }
/*
* convert value to internal representation
*
@@ -7286,10 +7472,20 @@ make_tuple_from_result_row(PGresult *res,
Assert(i <= tupdesc->natts);
nulls[i - 1] = (valstr == NULL);
/* Apply the input function even to nulls, to support domains */
- values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
- valstr,
- attinmeta->attioparams[i - 1],
- attinmeta->atttypmods[i - 1]);
+ if (binary_format)
+ {
+ values[i - 1] = ReceiveFunctionCall(&attinmeta->attinfuncs[i - 1],
+ nulls[i - 1] ? NULL : buf,
+ attinmeta->attioparams[i - 1],
+ attinmeta->atttypmods[i - 1]);
+ }
+ else
+ {
+ values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
+ valstr,
+ attinmeta->attioparams[i - 1],
+ attinmeta->atttypmods[i - 1]);
+ }
}
else if (i == SelfItemPointerAttributeNumber)
{
@@ -7298,7 +7494,10 @@ make_tuple_from_result_row(PGresult *res,
{
Datum datum;
- datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
+ if (binary_format)
+ datum = DirectFunctionCall1(tidrecv, PointerGetDatum(buf));
+ else
+ datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
ctid = (ItemPointer) DatumGetPointer(datum);
}
}
@@ -7556,16 +7755,46 @@ find_em_for_rel_target(PlannerInfo *root, EquivalenceClass *ec,
return NULL;
}
+
+static bool
+get_binary_format_option(ForeignServer *server, ForeignTable *table)
+{
+ List *options;
+ ListCell *lc;
+
+ /* Result; stays false (text protocol) when neither level sets the option */
+ bool binary_format = false;
+
+ /*
+ * Load options for table and server. We append server options after table
+ * options, because table options take precedence.
+ */
+ options = NIL;
+ options = list_concat(options, table->options);
+ options = list_concat(options, server->options);
+
+ /* Use the first binary_format found; table entries come before server ones. */
+ foreach(lc, options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "binary_format") == 0)
+ {
+ (void) parse_bool(defGetString(def), &binary_format);
+ break;
+ }
+ }
+ list_free(options);
+ return binary_format;
+}
+
/*
* Determine batch size for a given foreign table. The option specified for
* a table has precedence.
*/
static int
-get_batch_size_option(Relation rel)
+get_batch_size_option(ForeignServer *server, ForeignTable *table)
{
- Oid foreigntableid = RelationGetRelid(rel);
- ForeignTable *table;
- ForeignServer *server;
List *options;
ListCell *lc;
@@ -7576,9 +7805,6 @@ get_batch_size_option(Relation rel)
* Load options for table and server. We append server options after table
* options, because table options take precedence.
*/
- table = GetForeignTable(foreigntableid);
- server = GetForeignServer(table->serverid);
-
options = NIL;
options = list_concat(options, table->options);
options = list_concat(options, server->options);
@@ -7595,5 +7821,54 @@ get_batch_size_option(Relation rel)
}
}
+ list_free(options);
return batch_size;
}
+
+/*
+ * TupleDescGetAttInBinaryMetadata - Basically a copy of TupleDescGetAttInMetadata
+ * where each input function is replaced by the type's binary input function.
+ */
+static AttInMetadata *
+TupleDescGetAttInBinaryMetadata(TupleDesc tupdesc)
+{
+ int natts = tupdesc->natts;
+ int i;
+ Oid atttypeid;
+ Oid attinfuncid;
+ FmgrInfo *attinfuncinfo;
+ Oid *attioparams;
+ int32 *atttypmods;
+ AttInMetadata *attinmeta;
+
+ attinmeta = (AttInMetadata *) palloc(sizeof(AttInMetadata));
+
+ /* "Bless" the tupledesc so that we can make rowtype datums with it */
+ attinmeta->tupdesc = BlessTupleDesc(tupdesc);
+
+ /*
+ * Gather info needed later to call the binary input function for each attribute
+ */
+ attinfuncinfo = (FmgrInfo *) palloc0(natts * sizeof(FmgrInfo));
+ attioparams = (Oid *) palloc0(natts * sizeof(Oid));
+ atttypmods = (int32 *) palloc0(natts * sizeof(int32));
+
+ for (i = 0; i < natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupdesc, i);
+
+ /* Ignore dropped attributes; their array entries keep the palloc0 zeros */
+ if (!att->attisdropped)
+ {
+ atttypeid = att->atttypid;
+ getTypeBinaryInputInfo(atttypeid, &attinfuncid, &attioparams[i]);
+ fmgr_info(attinfuncid, &attinfuncinfo[i]);
+ atttypmods[i] = att->atttypmod;
+ }
+ }
+ attinmeta->attinfuncs = attinfuncinfo;
+ attinmeta->attioparams = attioparams;
+ attinmeta->atttypmods = atttypmods;
+
+ return attinmeta;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index a11d45bedf..993d3f026a 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -80,6 +80,7 @@ typedef struct PgFdwRelationInfo
Cost fdw_tuple_cost;
List *shippable_extensions; /* OIDs of shippable extensions */
bool async_capable;
+ bool binary_format;
/* Cached catalog information. */
ForeignTable *table;
--
2.30.2
Hi Ilya,
On Mon, Nov 21, 2022 at 8:50 PM Ilya Gladyshev
<ilya.v.gladyshev@gmail.com> wrote:
Hi everyone,
I have made a patch that introduces support for libpq binary protocol
in postgres_fdw. The idea is simple, when a user knows that the foreign
server is binary compatible with the local and his workload could
somehow benefit from using binary protocol, it can be switched on for a
particular server or even a particular table.
Why do we need this feature? If it's for performance then do we have
performance numbers?
AFAIU, binary compatibility of two postgresql servers depends upon the
binary compatibility of the platforms on which they run. So probably
postgres_fdw cannot infer the binary compatibility by itself. Is that
true? We have many postgres_fdw options that the user needs to set
manually to benefit from them. It would be good to infer those
automatically as much as possible. Hence this question.
The patch adds a new foreign server and table option 'binary_format'
(by default off) and implements serialization/deserialization of query
results and parameters for binary protocol. I have tested the patch by
switching foreign servers in postgres_fdw.sql tests to binary_format, the
only diff was in the text of the error for parsing an invalid integer
value, so it worked as expected for the test. There are a few minor
issues I don't like in the code and I am yet to write the tests and
docs for it. It would be great to get some feedback and understand,
whether this is a welcome feature, before proceeding with all of the
abovementioned.
About the patch itself, I see a lot of if (binary) {} else {} blocks
that are repeated. It would be good if we could add functions/macros to
avoid the duplication.
--
Best Wishes,
Ashutosh Bapat
On Tue, 22 Nov 2022 at 08:17, Ashutosh Bapat
<ashutosh.bapat.oss@gmail.com> wrote:
AFAIU, binary compatibility of two postgresql servers depends upon the
binary compatibility of the platforms on which they run.
No, libpq binary mode is not architecture-specific. I think you're
thinking of on-disk binary compatibility. But libpq binary mode is
just a binary network representation of the data instead of an ASCII
representation. It should be faster and more efficient, but it still
goes through binary input/output functions (which aren't named
input/output; they are each type's receive/send functions).
I actually wonder if having this would be a good way to get some code
coverage of the binary input/output functions which I suspect is sadly
lacking now. It wouldn't necessarily test that they're doing what
they're supposed to... but at least they would be getting run which I
don't think they are currently?
--
greg
22 нояб. 2022 г., в 17:10, Ashutosh Bapat <ashutosh.bapat.oss@gmail.com> написал(а):
Hi Ilya,
On Mon, Nov 21, 2022 at 8:50 PM Ilya Gladyshev
<ilya.v.gladyshev@gmail.com> wrote:
Hi everyone,
I have made a patch that introduces support for libpq binary protocol
in postgres_fdw. The idea is simple, when a user knows that the foreign
server is binary compatible with the local and his workload could
somehow benefit from using binary protocol, it can be switched on for a
particular server or even a particular table.
Why do we need this feature? If it's for performance then do we have
performance numbers?
Yes, it is for performance, but I have yet to do the benchmarks. My initial idea was that the binary protocol must be more efficient than text because, as I understand it, that’s the whole point of it. However, the minor tests that I have done do not prove this, and I couldn’t find any benchmarks for it online, so I will do further tests to find a use case for it.
About the patch itself, I see a lot of if (binary) {} else {} blocks
that are repeated. It would be good if we could add functions/macros to
avoid the duplication.
Yeah, that’s true; I have some ideas about improving it.