Add \syncpipeline command to pgbench
Hi,
PQsendSyncMessage() was added in
https://commitfest.postgresql.org/46/4262/. It allows users to add a
Sync message without flushing the buffer.
As a follow-up, this change adds an additional meta-command to
pgbench, \syncpipeline, which will call PQsendSyncMessage(). This will
make it possible to benchmark impact and improvements of using
PQsendSyncMessage through pgbench.
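To give a rough idea of what there is to measure, here is a toy model of the send buffer, not libpq code. It assumes a flushing sync (the PQpipelineSync() style) costs one socket write per call, while a non-flushing sync (PQsendPipelineSync(), the name the attached patch uses for what this message calls PQsendSyncMessage()) is only queued. The ToyConn type and the toy_* functions are hypothetical names made up for this sketch, and the model ignores that libpq may flush on its own once its buffer grows large.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model of a client-side send buffer; this is NOT libpq code. */
typedef struct
{
	size_t		buffered;	/* messages queued, not yet written out */
	size_t		flushes;	/* number of simulated socket writes */
} ToyConn;

/* Queue one protocol message (a query or a Sync) without writing it. */
static void
toy_queue_message(ToyConn *conn)
{
	conn->buffered++;
}

/* Write everything queued so far in one simulated socket write. */
static void
toy_flush(ToyConn *conn)
{
	if (conn->buffered > 0)
	{
		conn->buffered = 0;
		conn->flushes++;
	}
}

/*
 * Send nqueries queries, each followed by a Sync.  If flush_on_sync is
 * nonzero, every Sync also flushes (assumed PQpipelineSync() behaviour);
 * otherwise Syncs are only queued (assumed PQsendPipelineSync() behaviour)
 * and a single flush happens at the end of the pipeline.
 * Returns the number of simulated socket writes.
 */
size_t
toy_run_pipeline(size_t nqueries, int flush_on_sync)
{
	ToyConn		conn = {0, 0};

	for (size_t i = 0; i < nqueries; i++)
	{
		toy_queue_message(&conn);	/* the query */
		toy_queue_message(&conn);	/* the Sync */
		if (flush_on_sync)
			toy_flush(&conn);
	}
	toy_flush(&conn);			/* final flush at the end of the pipeline */
	assert(conn.buffered == 0);
	return conn.flushes;
}
```

In this model, three queries cost three writes with flushing syncs and one write with queued syncs. Whether that translates into a measurable difference against a real server is exactly what \syncpipeline is meant to let pgbench answer.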
Regards,
Anthonin
Attachments:
v1-0001-pgbench-Add-syncpipeline-to-pgbench.patch (application/octet-stream)
From 4b43e825f9ac7b3149b2e5a30e79e686d00b3589 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Mon, 13 Nov 2023 08:14:11 +0100
Subject: [PATCH v1] pgbench: Add \syncpipeline to pgbench
This change adds a new metacommand \syncpipeline to pgbench. This
command adds a sync message without flushing, using
PQsendPipelineSync().
This makes it possible to benchmark the possible improvements of using
PQsendPipelineSync().
---
doc/src/sgml/ref/pgbench.sgml | 11 ++++++++
src/bin/pgbench/pgbench.c | 27 +++++++++++++++++---
src/bin/pgbench/t/001_pgbench_with_server.pl | 18 +++++++++++++
3 files changed, 53 insertions(+), 3 deletions(-)
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 05d3f81619..0781c5fba2 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -1399,6 +1399,17 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
</listitem>
</varlistentry>
+ <varlistentry id="pgbench-metacommand-syncpipeline">
+ <term><literal>\syncpipeline</literal></term>
+
+ <listitem>
+ <para>
+ Sends a synchronisation point without ending the ongoing pipeline
+ and flushing the buffer. Needs an ongoing pipeline.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</refsect2>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 2574454839..df96480bd5 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -608,6 +608,7 @@ typedef struct
int use_file; /* index in sql_script for this client */
int command; /* command number in script */
+ int num_syncs; /* Number of ongoing sync commands */
/* client variables */
Variables variables;
@@ -697,6 +698,7 @@ typedef enum MetaCommand
META_ELSE, /* \else */
META_ENDIF, /* \endif */
META_STARTPIPELINE, /* \startpipeline */
+ META_SYNCPIPELINE, /* \syncpipeline */
META_ENDPIPELINE, /* \endpipeline */
} MetaCommand;
@@ -2902,6 +2904,8 @@ getMetaCommand(const char *cmd)
mc = META_ASET;
else if (pg_strcasecmp(cmd, "startpipeline") == 0)
mc = META_STARTPIPELINE;
+ else if (pg_strcasecmp(cmd, "syncpipeline") == 0)
+ mc = META_SYNCPIPELINE;
else if (pg_strcasecmp(cmd, "endpipeline") == 0)
mc = META_ENDPIPELINE;
else
@@ -3317,8 +3321,9 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
break;
case PGRES_PIPELINE_SYNC:
- pg_log_debug("client %d pipeline ending", st->id);
- if (PQexitPipelineMode(st->con) != 1)
+ pg_log_debug("client %d pipeline ending, num_syncs: %d", st->id, st->num_syncs);
+ st->num_syncs--;
+ if (st->num_syncs == 0 && PQexitPipelineMode(st->con) != 1)
pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
PQerrorMessage(st->con));
break;
@@ -4438,6 +4443,20 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
return CSTATE_ABORTED;
}
}
+ else if (command->meta == META_SYNCPIPELINE)
+ {
+ if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+ {
+ commandFailed(st, "syncpipeline", "not in pipeline mode");
+ return CSTATE_ABORTED;
+ }
+ if (PQsendPipelineSync(st->con) == 0)
+ {
+ commandFailed(st, "syncpipeline", "failed to send a pipeline sync");
+ return CSTATE_ABORTED;
+ }
+ st->num_syncs++;
+ }
else if (command->meta == META_ENDPIPELINE)
{
if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
@@ -4450,6 +4469,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
commandFailed(st, "endpipeline", "failed to send a pipeline sync");
return CSTATE_ABORTED;
}
+ st->num_syncs++;
/* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
/* collect pending results before getting out of pipeline mode */
return CSTATE_WAIT_RESULT;
@@ -5783,7 +5803,8 @@ process_backslash_command(PsqlScanState sstate, const char *source)
}
else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
my_command->meta == META_STARTPIPELINE ||
- my_command->meta == META_ENDPIPELINE)
+ my_command->meta == META_ENDPIPELINE ||
+ my_command->meta == META_SYNCPIPELINE)
{
if (my_command->argc != 1)
syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 977bc7cdcc..6404eee529 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -814,6 +814,24 @@ $node->pgbench(
}
});
+# Working \startpipeline with \syncpipeline
+$node->pgbench(
+ '-t 1 -n -M extended',
+ 0,
+ [ qr{type: .*/001_pgbench_pipeline_sync}, qr{actually processed: 1/1} ],
+ [],
+ 'working \startpipeline with \syncpipeline',
+ {
+ '001_pgbench_pipeline_sync' => q{
+-- test startpipeline
+\startpipeline
+select 1;
+\syncpipeline
+select 2;
+\endpipeline
+}
+ });
+
# Working \startpipeline in prepared query mode
$node->pgbench(
'-t 1 -n -M prepared',
--
2.39.3 (Apple Git-145)
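The sync accounting in the patch above can be read as a small counter protocol: every Sync sent (by \syncpipeline or \endpipeline) increments num_syncs, every PGRES_PIPELINE_SYNC received decrements it, and pipeline mode is left only when the counter reaches zero. Below is a minimal sketch of that idea, assuming results are consumed only after \endpipeline, as the "Now wait for the PGRES_PIPELINE_SYNC" comment in the patch suggests. ToyClient and the toy_* functions are hypothetical stand-ins, not pgbench code.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the pipeline-related part of pgbench's CState. */
typedef struct
{
	bool		in_pipeline;	/* models PQpipelineStatus() == PQ_PIPELINE_ON */
	int			num_syncs;		/* Syncs sent whose result is still pending */
} ToyClient;

/* \startpipeline: enter pipeline mode; fails if already in it. */
bool
toy_start_pipeline(ToyClient *c)
{
	if (c->in_pipeline)
		return false;
	c->in_pipeline = true;
	return true;
}

/* \syncpipeline or \endpipeline: a Sync was sent, one more result is due. */
bool
toy_send_sync(ToyClient *c)
{
	if (!c->in_pipeline)
		return false;			/* "not in pipeline mode" */
	c->num_syncs++;
	return true;
}

/*
 * A PGRES_PIPELINE_SYNC result arrived.  Pipeline mode is left only once
 * every Sync sent so far has been answered.
 */
void
toy_receive_sync(ToyClient *c)
{
	assert(c->num_syncs > 0);
	c->num_syncs--;
	if (c->num_syncs == 0)
		c->in_pipeline = false;	/* models PQexitPipelineMode() */
}
```

With one \syncpipeline followed by \endpipeline, two Syncs are outstanding, so the first toy_receive_sync() call leaves the client in pipeline mode and only the second one exits it.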
On Thu, Jan 18, 2024 at 09:48:28AM +0100, Anthonin Bonnefoy wrote:
PQsendSyncMessage() was added in
https://commitfest.postgresql.org/46/4262/. It allows users to add a
Sync message without flushing the buffer.
As a follow-up, this change adds an additional meta-command to
pgbench, \syncpipeline, which will call PQsendSyncMessage(). This will
make it possible to benchmark impact and improvements of using
PQsendSyncMessage through pgbench.
Thanks for sending that as a separate patch.
As a matter of fact, I have already looked at what you are proposing
here for the sake of the other thread when checking the difference in
numbers with PQsendSyncMessage(). The logic looks sound, but I have a
comment about the docs: could it be better to group \syncpipeline with
\startpipeline and \endpipeline? \syncpipeline requires a pipeline to
work.
--
Michael
On Fri, Jan 19, 2024 at 5:08 AM Michael Paquier <michael@paquier.xyz> wrote:
The logic looks sound, but I have a
comment about the docs: could it be better to group \syncpipeline with
\startpipeline and \endpipeline? \syncpipeline requires a pipeline to
work.
I've updated the doc to group the commands. It does look better and
more consistent with similar command groups like \if.
Regards,
Anthonin
Attachments:
v2-0001-pgbench-Add-syncpipeline-to-pgbench.patch (application/octet-stream)
From 2be134add92a1a74c9e800b79225a290890f6b30 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Mon, 13 Nov 2023 08:14:11 +0100
Subject: [PATCH v2] pgbench: Add \syncpipeline to pgbench
This change adds a new metacommand \syncpipeline to pgbench. This
command adds a sync message without flushing, using
PQsendPipelineSync().
This makes it possible to benchmark the possible improvements of using
PQsendPipelineSync().
---
doc/src/sgml/ref/pgbench.sgml | 12 ++++++---
src/bin/pgbench/pgbench.c | 27 +++++++++++++++++---
src/bin/pgbench/t/001_pgbench_with_server.pl | 21 +++++++++++++++
3 files changed, 54 insertions(+), 6 deletions(-)
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 05d3f81619..279bb0ad7d 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -1386,13 +1386,19 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
<varlistentry id="pgbench-metacommand-pipeline">
<term><literal>\startpipeline</literal></term>
+ <term><literal>\syncpipeline</literal></term>
<term><literal>\endpipeline</literal></term>
<listitem>
<para>
- These commands delimit the start and end of a pipeline of SQL
- statements. In pipeline mode, statements are sent to the server
- without waiting for the results of previous statements. See
+ This group of commands implements pipelining of SQL statements.
+ A pipeline must begin with a <command>\startpipeline</command>
+ and end with an <command>\endpipeline</command>. In between there
+ may be any number of <command>\syncpipeline</command> commands,
+ which sends a <link linkend="protocol-flow-ext-query">sync message</link>
+ without ending the ongoing pipeline and flushing the send buffer.
+ In pipeline mode, statements are sent to the server without waiting
+ for the results of previous statements. See
<xref linkend="libpq-pipeline-mode"/> for more details.
Pipeline mode requires the use of extended query protocol.
</para>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 2574454839..df96480bd5 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -608,6 +608,7 @@ typedef struct
int use_file; /* index in sql_script for this client */
int command; /* command number in script */
+ int num_syncs; /* Number of ongoing sync commands */
/* client variables */
Variables variables;
@@ -697,6 +698,7 @@ typedef enum MetaCommand
META_ELSE, /* \else */
META_ENDIF, /* \endif */
META_STARTPIPELINE, /* \startpipeline */
+ META_SYNCPIPELINE, /* \syncpipeline */
META_ENDPIPELINE, /* \endpipeline */
} MetaCommand;
@@ -2902,6 +2904,8 @@ getMetaCommand(const char *cmd)
mc = META_ASET;
else if (pg_strcasecmp(cmd, "startpipeline") == 0)
mc = META_STARTPIPELINE;
+ else if (pg_strcasecmp(cmd, "syncpipeline") == 0)
+ mc = META_SYNCPIPELINE;
else if (pg_strcasecmp(cmd, "endpipeline") == 0)
mc = META_ENDPIPELINE;
else
@@ -3317,8 +3321,9 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
break;
case PGRES_PIPELINE_SYNC:
- pg_log_debug("client %d pipeline ending", st->id);
- if (PQexitPipelineMode(st->con) != 1)
+ pg_log_debug("client %d pipeline ending, num_syncs: %d", st->id, st->num_syncs);
+ st->num_syncs--;
+ if (st->num_syncs == 0 && PQexitPipelineMode(st->con) != 1)
pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
PQerrorMessage(st->con));
break;
@@ -4438,6 +4443,20 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
return CSTATE_ABORTED;
}
}
+ else if (command->meta == META_SYNCPIPELINE)
+ {
+ if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+ {
+ commandFailed(st, "syncpipeline", "not in pipeline mode");
+ return CSTATE_ABORTED;
+ }
+ if (PQsendPipelineSync(st->con) == 0)
+ {
+ commandFailed(st, "syncpipeline", "failed to send a pipeline sync");
+ return CSTATE_ABORTED;
+ }
+ st->num_syncs++;
+ }
else if (command->meta == META_ENDPIPELINE)
{
if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
@@ -4450,6 +4469,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
commandFailed(st, "endpipeline", "failed to send a pipeline sync");
return CSTATE_ABORTED;
}
+ st->num_syncs++;
/* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
/* collect pending results before getting out of pipeline mode */
return CSTATE_WAIT_RESULT;
@@ -5783,7 +5803,8 @@ process_backslash_command(PsqlScanState sstate, const char *source)
}
else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
my_command->meta == META_STARTPIPELINE ||
- my_command->meta == META_ENDPIPELINE)
+ my_command->meta == META_ENDPIPELINE ||
+ my_command->meta == META_SYNCPIPELINE)
{
if (my_command->argc != 1)
syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index fc57facf9e..b7c755b0ad 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -814,6 +814,27 @@ $node->pgbench(
}
});
+# Working \startpipeline with \syncpipeline
+$node->pgbench(
+ '-t 1 -n -M extended',
+ 0,
+ [ qr{type: .*/001_pgbench_pipeline_sync}, qr{actually processed: 1/1} ],
+ [],
+ 'working \startpipeline with \syncpipeline',
+ {
+ '001_pgbench_pipeline_sync' => q{
+-- test startpipeline
+\startpipeline
+select 1;
+\syncpipeline
+\syncpipeline
+select 2;
+\syncpipeline
+select 3;
+\endpipeline
+}
+ });
+
# Working \startpipeline in prepared query mode
$node->pgbench(
'-t 1 -n -M prepared',
--
2.39.3 (Apple Git-145)
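On the parsing side, the patch above amounts to a case-insensitive name match in getMetaCommand() plus a check that the pipeline commands take no arguments. The following is a rough, standalone sketch of those two steps. The ToyMetaCommand enum and the toy_* helpers are hypothetical, and a hand-written comparison stands in for pg_strcasecmp().

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
	TOY_META_NONE,				/* not a pipeline meta-command */
	TOY_META_STARTPIPELINE,		/* \startpipeline */
	TOY_META_SYNCPIPELINE,		/* \syncpipeline */
	TOY_META_ENDPIPELINE		/* \endpipeline */
} ToyMetaCommand;

/* Portable case-insensitive comparison, standing in for pg_strcasecmp(). */
static int
toy_strcasecmp(const char *a, const char *b)
{
	while (*a != '\0' && *b != '\0')
	{
		int			ca = tolower((unsigned char) *a);
		int			cb = tolower((unsigned char) *b);

		if (ca != cb)
			return ca - cb;
		a++;
		b++;
	}
	return tolower((unsigned char) *a) - tolower((unsigned char) *b);
}

/* Map a command name (without the backslash) to its meta-command. */
ToyMetaCommand
toy_get_meta_command(const char *cmd)
{
	static const struct
	{
		const char *name;
		ToyMetaCommand mc;
	}			table[] = {
		{"startpipeline", TOY_META_STARTPIPELINE},
		{"syncpipeline", TOY_META_SYNCPIPELINE},
		{"endpipeline", TOY_META_ENDPIPELINE},
	};

	if (cmd == NULL)
		return TOY_META_NONE;
	for (size_t i = 0; i < sizeof(table) / sizeof(table[0]); i++)
	{
		if (toy_strcasecmp(cmd, table[i].name) == 0)
			return table[i].mc;
	}
	return TOY_META_NONE;
}

/* argc counts the command name itself, so 1 means "no arguments". */
bool
toy_pipeline_argc_ok(ToyMetaCommand mc, int argc)
{
	return mc != TOY_META_NONE && argc == 1;
}
```

A table keeps the sketch short; the patch itself extends the existing if/else chain, which is the smaller change for pgbench.c.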
On Fri, Jan 19, 2024 at 08:55:31AM +0100, Anthonin Bonnefoy wrote:
I've updated the doc to group the commands. It does look better and
more consistent with similar command groups like \if.
I was playing with a few meta command scenarios while looking at this
patch, and this sequence generates an error that should never happen:
$ cat /tmp/test.sql
\startpipeline
\syncpipeline
$ pgbench -n -f /tmp/test.sql -M extended
[...]
pgbench: error: unexpected transaction status 1
pgbench: error: client 0 aborted while receiving the transaction status
It looks to me that we need to be much smarter than that for the error
handling we'd need when a sync request is optionally sent when a
transaction stops at the end of pgbench. Could you look at it?
--
Michael
That looks like a bug with how opened pipelines are not caught at the
end of the script processing. startpipeline seems to have similar
related issues.
$ cat only_startpipeline.sql
\startpipeline
SELECT 1;
With only 1 transaction, pgbench will consider this a success despite
not sending anything since the pipeline was not flushed:
pgbench -t1 -Mextended -f only_startpipeline.sql
[...]
number of transactions per client: 1
number of transactions actually processed: 1/1
With 2 transactions, the error will happen when \startpipeline is
called a second time:
pgbench -t2 -Mextended -f only_startpipeline.sql
[...]
pgbench: error: client 0 aborted in command 0 (startpipeline) of
script 0; already in pipeline mode
number of transactions per client: 2
number of transactions actually processed: 1/2
I've split the changes into two patches.
0001 introduces a new error when the end of a pgbench script is
reached while there's still an ongoing pipeline.
0002 adds the \syncpipeline command (original patch with an additional
test case).
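To illustrate the check that 0001 adds, here is a simplified model of one script run. It assumes the only state that matters is whether a pipeline is open; real pgbench asks PQpipelineStatus() instead, and leaves pipeline mode when the sync result arrives rather than when \endpipeline is issued. ToyCommand, ToyOutcome and toy_run_script() are hypothetical names for this sketch only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
	TOY_CMD_SQL,				/* any SQL statement */
	TOY_CMD_STARTPIPELINE,		/* \startpipeline */
	TOY_CMD_SYNCPIPELINE,		/* \syncpipeline */
	TOY_CMD_ENDPIPELINE			/* \endpipeline */
} ToyCommand;

typedef enum
{
	TOY_TX_OK,					/* script ran to completion */
	TOY_TX_ABORTED				/* client aborted */
} ToyOutcome;

/* Run one transaction's worth of script commands and report the outcome. */
ToyOutcome
toy_run_script(const ToyCommand *script, size_t ncommands)
{
	bool		pipeline_open = false;

	for (size_t i = 0; i < ncommands; i++)
	{
		switch (script[i])
		{
			case TOY_CMD_STARTPIPELINE:
				if (pipeline_open)
					return TOY_TX_ABORTED;	/* already in pipeline mode */
				pipeline_open = true;
				break;
			case TOY_CMD_SYNCPIPELINE:
				if (!pipeline_open)
					return TOY_TX_ABORTED;	/* not in pipeline mode */
				break;
			case TOY_CMD_ENDPIPELINE:
				if (!pipeline_open)
					return TOY_TX_ABORTED;	/* not in pipeline mode */
				pipeline_open = false;
				break;
			case TOY_CMD_SQL:
				break;
		}
	}

	/* The check modelled on 0001: a pipeline left open is an error. */
	if (pipeline_open)
		return TOY_TX_ABORTED;
	return TOY_TX_OK;
}
```

In this model both the \startpipeline-only script and the \startpipeline plus \syncpipeline script abort on the first transaction, instead of being reported as a success with -t 1.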
Regards,
Anthonin
On Mon, Jan 22, 2024 at 7:16 AM Michael Paquier <michael@paquier.xyz> wrote:
On Fri, Jan 19, 2024 at 08:55:31AM +0100, Anthonin Bonnefoy wrote:
I've updated the doc to group the commands. It does look better and
more consistent with similar command groups like \if.
I was playing with a few meta command scenarios while looking at this
patch, and this sequence generates an error that should never happen:
$ cat /tmp/test.sql
\startpipeline
\syncpipeline
$ pgbench -n -f /tmp/test.sql -M extended
[...]
pgbench: error: unexpected transaction status 1
pgbench: error: client 0 aborted while receiving the transaction status
It looks to me that we need to be much smarter than that for the error
handling we'd need when a sync request is optionally sent when a
transaction stops at the end of pgbench. Could you look at it?
--
Michael
Attachments:
v3-0001-Abort-pgbench-if-script-end-is-reached-with-an-op.patch (application/octet-stream)
From 5475f855141251776e6d1a08902fde895fb99782 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Mon, 22 Jan 2024 09:19:20 +0100
Subject: [PATCH v3 1/2] Abort pgbench if script end is reached with an opened
pipeline
When a pipeline is opened with \startpipeline and not closed, pgbench
will either fail on the next transaction with an "already in pipeline
mode" error or end successfully if this was the last transaction,
despite not sending anything that was queued in the pipeline.
This change adds an error if the end of script is reached while there's
an opened pipeline.
---
src/bin/pgbench/pgbench.c | 8 +++++-
src/bin/pgbench/t/001_pgbench_with_server.pl | 28 ++++++++++++++++++++
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 2574454839..03ad74a738 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -3766,7 +3766,13 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
/* Transition to script end processing if done */
if (command == NULL)
{
- st->state = CSTATE_END_TX;
+ if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+ {
+ pg_log_error("client %d aborted: end of script reached without closing the ongoing pipeline",
+ st->id);
+ st->state = CSTATE_ABORTED;
+ } else
+ st->state = CSTATE_END_TX;
break;
}
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index fc57facf9e..532e4a5e98 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -876,6 +876,34 @@ select 1 \gset f
}
});
+# Try \startpipeline without \endpipeline in a single transaction
+$node->pgbench(
+ '-t 1 -n -M extended',
+ 2,
+ [],
+ [qr{end of script reached without closing the ongoing pipeline}],
+ 'error: call \startpipeline without \endpipeline in a single transaction',
+ {
+ '001_pgbench_pipeline_5' => q{
+-- startpipeline only with single transaction
+\startpipeline
+}
+ });
+
+# Try \startpipeline without \endpipeline
+$node->pgbench(
+ '-t 2 -n -M extended',
+ 2,
+ [],
+ [qr{end of script reached without closing the ongoing pipeline}],
+ 'error: call \startpipeline without \endpipeline',
+ {
+ '001_pgbench_pipeline_6' => q{
+-- startpipeline only
+\startpipeline
+}
+ });
+
# Working \startpipeline in prepared query mode with serializable
$node->pgbench(
'-c4 -t 10 -n -M prepared',
--
2.39.3 (Apple Git-145)
v3-0002-pgbench-Add-syncpipeline-to-pgbench.patch (application/octet-stream)
From fae2e6a098c70940d0d87351472e7347a94ffc4f Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Mon, 13 Nov 2023 08:14:11 +0100
Subject: [PATCH v3 2/2] pgbench: Add \syncpipeline to pgbench
This change adds a new metacommand \syncpipeline to pgbench. This
command adds a sync message without flushing, using
PQsendPipelineSync().
This makes it possible to benchmark the possible improvements of using
PQsendPipelineSync().
---
doc/src/sgml/ref/pgbench.sgml | 12 +++++--
src/bin/pgbench/pgbench.c | 27 +++++++++++++--
src/bin/pgbench/t/001_pgbench_with_server.pl | 36 ++++++++++++++++++++
3 files changed, 69 insertions(+), 6 deletions(-)
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 05d3f81619..279bb0ad7d 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -1386,13 +1386,19 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
<varlistentry id="pgbench-metacommand-pipeline">
<term><literal>\startpipeline</literal></term>
+ <term><literal>\syncpipeline</literal></term>
<term><literal>\endpipeline</literal></term>
<listitem>
<para>
- These commands delimit the start and end of a pipeline of SQL
- statements. In pipeline mode, statements are sent to the server
- without waiting for the results of previous statements. See
+ This group of commands implements pipelining of SQL statements.
+ A pipeline must begin with a <command>\startpipeline</command>
+ and end with an <command>\endpipeline</command>. In between there
+ may be any number of <command>\syncpipeline</command> commands,
+ which sends a <link linkend="protocol-flow-ext-query">sync message</link>
+ without ending the ongoing pipeline and flushing the send buffer.
+ In pipeline mode, statements are sent to the server without waiting
+ for the results of previous statements. See
<xref linkend="libpq-pipeline-mode"/> for more details.
Pipeline mode requires the use of extended query protocol.
</para>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 03ad74a738..eb25f62be7 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -608,6 +608,7 @@ typedef struct
int use_file; /* index in sql_script for this client */
int command; /* command number in script */
+ int num_syncs; /* Number of ongoing sync commands */
/* client variables */
Variables variables;
@@ -697,6 +698,7 @@ typedef enum MetaCommand
META_ELSE, /* \else */
META_ENDIF, /* \endif */
META_STARTPIPELINE, /* \startpipeline */
+ META_SYNCPIPELINE, /* \syncpipeline */
META_ENDPIPELINE, /* \endpipeline */
} MetaCommand;
@@ -2902,6 +2904,8 @@ getMetaCommand(const char *cmd)
mc = META_ASET;
else if (pg_strcasecmp(cmd, "startpipeline") == 0)
mc = META_STARTPIPELINE;
+ else if (pg_strcasecmp(cmd, "syncpipeline") == 0)
+ mc = META_SYNCPIPELINE;
else if (pg_strcasecmp(cmd, "endpipeline") == 0)
mc = META_ENDPIPELINE;
else
@@ -3317,8 +3321,9 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
break;
case PGRES_PIPELINE_SYNC:
- pg_log_debug("client %d pipeline ending", st->id);
- if (PQexitPipelineMode(st->con) != 1)
+ pg_log_debug("client %d pipeline ending, num_syncs: %d", st->id, st->num_syncs);
+ st->num_syncs--;
+ if (st->num_syncs == 0 && PQexitPipelineMode(st->con) != 1)
pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
PQerrorMessage(st->con));
break;
@@ -4444,6 +4449,20 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
return CSTATE_ABORTED;
}
}
+ else if (command->meta == META_SYNCPIPELINE)
+ {
+ if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+ {
+ commandFailed(st, "syncpipeline", "not in pipeline mode");
+ return CSTATE_ABORTED;
+ }
+ if (PQsendPipelineSync(st->con) == 0)
+ {
+ commandFailed(st, "syncpipeline", "failed to send a pipeline sync");
+ return CSTATE_ABORTED;
+ }
+ st->num_syncs++;
+ }
else if (command->meta == META_ENDPIPELINE)
{
if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
@@ -4456,6 +4475,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
commandFailed(st, "endpipeline", "failed to send a pipeline sync");
return CSTATE_ABORTED;
}
+ st->num_syncs++;
/* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
/* collect pending results before getting out of pipeline mode */
return CSTATE_WAIT_RESULT;
@@ -5789,7 +5809,8 @@ process_backslash_command(PsqlScanState sstate, const char *source)
}
else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
my_command->meta == META_STARTPIPELINE ||
- my_command->meta == META_ENDPIPELINE)
+ my_command->meta == META_ENDPIPELINE ||
+ my_command->meta == META_SYNCPIPELINE)
{
if (my_command->argc != 1)
syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 532e4a5e98..acb1d699fe 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -814,6 +814,27 @@ $node->pgbench(
}
});
+# Working \startpipeline with \syncpipeline
+$node->pgbench(
+ '-t 1 -n -M extended',
+ 0,
+ [ qr{type: .*/001_pgbench_pipeline_sync}, qr{actually processed: 1/1} ],
+ [],
+ 'working \startpipeline with \syncpipeline',
+ {
+ '001_pgbench_pipeline_sync' => q{
+-- test startpipeline
+\startpipeline
+select 1;
+\syncpipeline
+\syncpipeline
+select 2;
+\syncpipeline
+select 3;
+\endpipeline
+}
+ });
+
# Working \startpipeline in prepared query mode
$node->pgbench(
'-t 1 -n -M prepared',
@@ -904,6 +925,21 @@ $node->pgbench(
}
});
+# Try \startpipeline with \syncpipeline without \endpipeline
+$node->pgbench(
+ '-t 2 -n -M extended',
+ 2,
+ [],
+ [qr{end of script reached without closing the ongoing pipeline}],
+ 'error: call \startpipeline and \syncpipeline without \endpipeline',
+ {
+ '001_pgbench_pipeline_7' => q{
+-- startpipeline with \syncpipeline only
+\startpipeline
+\syncpipeline
+}
+ });
+
# Working \startpipeline in prepared query mode with serializable
$node->pgbench(
'-c4 -t 10 -n -M prepared',
--
2.39.3 (Apple Git-145)
On 2024-Jan-22, Anthonin Bonnefoy wrote:
That looks like a bug with how opened pipelines are not caught at the
end of the script processing. startpipeline seems to have similar
related issues.
Ah, yeah. Your fix looks necessary on a quick look. I'll review and
see about backpatching this.
0002 adds the \syncpipeline command (original patch with an additional
test case).
I can look into this one later, unless Michael wants to.
--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
"No me acuerdo, pero no es cierto. No es cierto, y si fuera cierto,
no me acuerdo." (Augusto Pinochet a una corte de justicia)
On 2024-Jan-22, Anthonin Bonnefoy wrote:
0001 introduces a new error when the end of a pgbench script is
reached while there's still an ongoing pipeline.
Pushed, backpatched to 14. I reworded the error message to be
client %d aborted: end of script reached with pipeline open
I hope this is OK. I debated a half a dozen alternatives ("with open
pipeline", "without closing pipeline", "with unclosed pipeline" (???),
"leaving pipeline open") and decided this was the least ugly.
Thanks,
--
Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/
<inflex> really, I see PHP as like a strange amalgamation of C, Perl, Shell
<crab> inflex: you know that "amalgam" means "mixture with mercury",
more or less, right?
<crab> i.e., "deadly poison"
On Mon, Jan 22, 2024 at 05:53:13PM +0100, Alvaro Herrera wrote:
I hope this is OK. I debated a half a dozen alternatives ("with open
pipeline", "without closing pipeline", "with unclosed pipeline" (???),
"leaving pipeline open") and decided this was the least ugly.
That looks OK to me. Thanks for looking at that!
--
Michael
On Mon, Jan 22, 2024 at 01:59:00PM +0100, Alvaro Herrera wrote:
On 2024-Jan-22, Anthonin Bonnefoy wrote:
0002 adds the \syncpipeline command (original patch with an additional
test case).
I can look into this one later, unless Michael wants to.
The patch seemed rather OK at a quick glance, as long as there is a test
to check the error path with a \syncpipeline still on the stack of
metacommands to handle.
Anyway, I wanted to study this one and learn a bit more about the
error stuff that was happening on pgbench side. Now, if you feel
strongly about it, please go ahead!
--
Michael
On Tue, Jan 23, 2024 at 01:08:24PM +0900, Michael Paquier wrote:
Anyway, I wanted to study this one and learn a bit more about the
error stuff that was happening on pgbench side.
Well, I've spent some time studying this part, and the error handling
was looking correct based on the safety measures added in
49f7c6c44a5f, so I've just moved on and applied it.
--
Michael