Is SPI + Procedures (with COMMIT) inside a bgworker broken?

Started by Fabrízio de Royes Mello over 4 years ago, 3 messages
#1Fabrízio de Royes Mello
fabriziomello@gmail.com
1 attachment(s)

Hi all,

I'm trying to execute a PROCEDURE (with COMMIT inside) called from a
background worker using SPI but I'm always getting the error below:

2021-09-13 09:36:43.568 -03 [23845] LOG: worker_spi worker 2 initialized
with schema2.counted
2021-09-13 09:36:43.568 -03 [23846] LOG: worker_spi worker 1 initialized
with schema1.counted
2021-09-13 09:36:43.571 -03 [23846] ERROR: invalid transaction termination
2021-09-13 09:36:43.571 -03 [23846] CONTEXT: PL/pgSQL function
schema1.counted_proc() line 1 at COMMIT
SQL statement "CALL "schema1"."counted_proc"()"
2021-09-13 09:36:43.571 -03 [23846] STATEMENT: CALL
"schema1"."counted_proc"()
2021-09-13 09:36:43.571 -03 [23845] ERROR: invalid transaction termination
2021-09-13 09:36:43.571 -03 [23845] CONTEXT: PL/pgSQL function
schema2.counted_proc() line 1 at COMMIT
SQL statement "CALL "schema2"."counted_proc"()"
2021-09-13 09:36:43.571 -03 [23845] STATEMENT: CALL
"schema2"."counted_proc"()
2021-09-13 09:36:43.571 -03 [23838] LOG: background worker "worker_spi"
(PID 23845) exited with exit code 1
2021-09-13 09:36:43.571 -03 [23838] LOG: background worker "worker_spi"
(PID 23846) exited with exit code 1

I changed the worker_spi example (attached) a bit to execute a simple
procedure. Even using SPI_connect_ext(SPI_OPT_NONATOMIC) I'm getting the
error "invalid transaction termination".

Is there something wrong with the attached example, or am I missing
something?

Regards,

--
Fabrízio de Royes Mello

Attachments:

worker_spi_procedure.patch (text/x-patch; charset=US-ASCII; name=worker_spi_procedure.patch) [Download]
diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c
index d0acef2652..0b9e4e9335 100644
--- a/src/test/modules/worker_spi/worker_spi.c
+++ b/src/test/modules/worker_spi/worker_spi.c
@@ -108,8 +108,20 @@ initialize_worker_spi(worktable *table)
 						 "		type text CHECK (type IN ('total', 'delta')), "
 						 "		value	integer)"
 						 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
-						 "WHERE type = 'total'",
-						 table->schema, table->name, table->name, table->name);
+						 "WHERE type = 'total'; "
+						 "CREATE PROCEDURE \"%s\".\"%s_proc\"() AS $$ "
+						 "DECLARE "
+						 "  i INTEGER; "
+						 "BEGIN "
+						 "  FOR i IN 1..10 "
+						 "  LOOP "
+						 "    INSERT INTO \"%s\".\"%s\" VALUES ('delta', i); "
+						 "    COMMIT; "
+						 "  END LOOP; "
+						 "END; "
+						 "$$ LANGUAGE plpgsql; ",
+						 table->schema, table->name, table->name, table->name,
+						 table->schema, table->name, table->schema, table->name);
 
 		/* set statement start time */
 		SetCurrentStatementStartTimestamp();
@@ -168,20 +180,8 @@ worker_spi_main(Datum main_arg)
 	table->name = quote_identifier(table->name);
 
 	initStringInfo(&buf);
-	appendStringInfo(&buf,
-					 "WITH deleted AS (DELETE "
-					 "FROM %s.%s "
-					 "WHERE type = 'delta' RETURNING value), "
-					 "total AS (SELECT coalesce(sum(value), 0) as sum "
-					 "FROM deleted) "
-					 "UPDATE %s.%s "
-					 "SET value = %s.value + total.sum "
-					 "FROM total WHERE type = 'total' "
-					 "RETURNING %s.value",
-					 table->schema, table->name,
-					 table->schema, table->name,
-					 table->name,
-					 table->name);
+
+	appendStringInfo(&buf, "CALL \"%s\".\"%s_proc\"()", table->schema, table->name);
 
 	/*
 	 * Main loop: do this until SIGTERM is received and processed by
@@ -232,7 +232,7 @@ worker_spi_main(Datum main_arg)
 		 */
 		SetCurrentStatementStartTimestamp();
 		StartTransactionCommand();
-		SPI_connect();
+		SPI_connect_ext(SPI_OPT_NONATOMIC);
 		PushActiveSnapshot(GetTransactionSnapshot());
 		debug_query_string = buf.data;
 		pgstat_report_activity(STATE_RUNNING, buf.data);
@@ -240,30 +240,15 @@ worker_spi_main(Datum main_arg)
 		/* We can now execute queries via SPI */
 		ret = SPI_execute(buf.data, false, 0);
 
-		if (ret != SPI_OK_UPDATE_RETURNING)
-			elog(FATAL, "cannot select from table %s.%s: error code %d",
-				 table->schema, table->name, ret);
-
-		if (SPI_processed > 0)
-		{
-			bool		isnull;
-			int32		val;
-
-			val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
-											  SPI_tuptable->tupdesc,
-											  1, &isnull));
-			if (!isnull)
-				elog(LOG, "%s: count in %s.%s is now %d",
-					 MyBgworkerEntry->bgw_name,
-					 table->schema, table->name, val);
-		}
+		if (ret != SPI_OK_UTILITY)
+			elog(FATAL, "failed to call procedure");
 
 		/*
 		 * And finish our transaction.
 		 */
-		SPI_finish();
 		PopActiveSnapshot();
 		CommitTransactionCommand();
+		SPI_finish();
 		debug_query_string = NULL;
 		pgstat_report_stat(false);
 		pgstat_report_activity(STATE_IDLE, NULL);
#2Tom Lane
tgl@sss.pgh.pa.us
In reply to: Fabrízio de Royes Mello (#1)
Re: Is SPI + Procedures (with COMMIT) inside a bgworker broken?

Fabrízio de Royes Mello <fabriziomello@gmail.com> writes:

> I'm trying to execute a PROCEDURE (with COMMIT inside) called from a
> background worker using SPI but I'm always getting the error below:
> 2021-09-13 09:36:43.571 -03 [23846] ERROR: invalid transaction termination

The direct cause of that is that SPI_execute() doesn't permit the called
query to perform COMMIT/ROLLBACK, which is because most callers would fail
to cope with that. You can instruct SPI to allow that by replacing the
SPI_execute() call with something like

SPIExecuteOptions options;

...
memset(&options, 0, sizeof(options));
options.allow_nonatomic = true;

ret = SPI_execute_extended(buf.data, &options);

However, that's not enough to make this example work :-(.
I find that it still fails inside the procedure's COMMIT,
with

2021-09-13 15:14:54.775 EDT worker_spi[476310] ERROR: portal snapshots (0) did not account for all active snapshots (1)
2021-09-13 15:14:54.775 EDT worker_spi[476310] CONTEXT: PL/pgSQL function schema4.counted_proc() line 1 at COMMIT
SQL statement "CALL "schema4"."counted_proc"()"

I think what this indicates is that worker_spi_main's cavalier
management of the active snapshot isn't up to snuff for this
use-case. The error is coming from ForgetPortalSnapshots, which
is expecting that all active snapshots are attached to Portals;
but that one isn't.

Probably the most appropriate fix is to make worker_spi_main
set up a Portal to run the query inside of. There are other
bits of code that are not happy if they're not inside a Portal,
so if you're hoping to run arbitrary SQL this way, sooner or
later you're going to have to cross that bridge.

(I remain of the opinion that replication/logical/worker.c
is going to have to do that eventually, too...)

regards, tom lane

#3Fabrízio de Royes Mello
fabriziomello@gmail.com
In reply to: Tom Lane (#2)
1 attachment(s)
Re: Is SPI + Procedures (with COMMIT) inside a bgworker broken?

On Mon, Sep 13, 2021 at 4:30 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:

The direct cause of that is that SPI_execute() doesn't permit the called
query to perform COMMIT/ROLLBACK, which is because most callers would fail
to cope with that. You can instruct SPI to allow that by replacing the
SPI_execute() call with something like

SPIExecuteOptions options;

...
memset(&options, 0, sizeof(options));
options.allow_nonatomic = true;

ret = SPI_execute_extended(buf.data, &options);

I completely forgot about the SPI execute options... Thanks for the
explanation!!!

However, that's not enough to make this example work :-(.
I find that it still fails inside the procedure's COMMIT,
with

2021-09-13 15:14:54.775 EDT worker_spi[476310] ERROR: portal snapshots (0) did not account for all active snapshots (1)
2021-09-13 15:14:54.775 EDT worker_spi[476310] CONTEXT: PL/pgSQL function schema4.counted_proc() line 1 at COMMIT
SQL statement "CALL "schema4"."counted_proc"()"

I think what this indicates is that worker_spi_main's cavalier
management of the active snapshot isn't up to snuff for this
use-case. The error is coming from ForgetPortalSnapshots, which
is expecting that all active snapshots are attached to Portals;
but that one isn't.

That is exactly the root cause of all my investigation.

At Timescale we have a scheduler (a background worker) that launches another
background worker to "execute a job"; executing a job means calling a
function [1] or a procedure [2] directly, without SPI.

But now a user has raised an issue about snapshots [3], and when I saw the code
for the first time I tried to use SPI, which didn't work as expected.

Even after tweaking worker_spi to execute the procedure without SPI, by
calling ExecuteCallStmt directly (attached), we end up with the same error
about the active snapshots:

2021-09-13 20:14:36.654 -03 [21483] LOG: worker_spi worker 2 initialized
with schema2.counted
2021-09-13 20:14:36.655 -03 [21484] LOG: worker_spi worker 1 initialized
with schema1.counted
2021-09-13 20:14:36.657 -03 [21483] ERROR: portal snapshots (0) did not
account for all active snapshots (1)
2021-09-13 20:14:36.657 -03 [21483] CONTEXT: PL/pgSQL function
schema2.counted_proc() line 1 at COMMIT
2021-09-13 20:14:36.657 -03 [21484] ERROR: portal snapshots (0) did not
account for all active snapshots (1)
2021-09-13 20:14:36.657 -03 [21484] CONTEXT: PL/pgSQL function
schema1.counted_proc() line 1 at COMMIT
2021-09-13 20:14:36.659 -03 [21476] LOG: background worker "worker_spi"
(PID 21483) exited with exit code 1
2021-09-13 20:14:36.659 -03 [21476] LOG: background worker "worker_spi"
(PID 21484) exited with exit code 1

Probably the most appropriate fix is to make worker_spi_main
set up a Portal to run the query inside of. There are other
bits of code that are not happy if they're not inside a Portal,
so if you're hoping to run arbitrary SQL this way, sooner or
later you're going to have to cross that bridge.

I started digging into it [4] by creating a Portal from scratch to execute
the function or procedure, and it worked.

We're wondering whether we can avoid going through the parser when using PortalRun. Is that possible?

Regards,

[1]: https://github.com/timescale/timescaledb/blob/master/tsl/src/bgw_policy/job.c#L726
[2]: https://github.com/timescale/timescaledb/blob/master/tsl/src/bgw_policy/job.c#L741
[3]: https://github.com/timescale/timescaledb/issues/3545
[4]: https://github.com/fabriziomello/timescaledb/blob/issue/3545/tsl/src/bgw_policy/job.c#L824

--
Fabrízio de Royes Mello

Attachments:

worker_spi_executecallstmt.patch (text/x-patch; charset=US-ASCII; name=worker_spi_executecallstmt.patch) [Download]
diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c
index d0acef2652..dace150fa7 100644
--- a/src/test/modules/worker_spi/worker_spi.c
+++ b/src/test/modules/worker_spi/worker_spi.c
@@ -42,6 +42,12 @@
 #include "utils/snapmgr.h"
 #include "tcop/utility.h"
 
+#include "nodes/makefuncs.h"
+#include "nodes/nodes.h"
+#include "nodes/pg_list.h"
+#include "parser/parse_func.h"
+#include "commands/defrem.h"
+
 PG_MODULE_MAGIC;
 
 PG_FUNCTION_INFO_V1(worker_spi_launch);
@@ -59,6 +65,7 @@ typedef struct worktable
 {
 	const char *schema;
 	const char *name;
+	const char *proc;
 } worktable;
 
 /*
@@ -108,8 +115,20 @@ initialize_worker_spi(worktable *table)
 						 "		type text CHECK (type IN ('total', 'delta')), "
 						 "		value	integer)"
 						 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
-						 "WHERE type = 'total'",
-						 table->schema, table->name, table->name, table->name);
+						 "WHERE type = 'total'; "
+						 "CREATE PROCEDURE \"%s\".\"%s\"() AS $$ "
+						 "DECLARE "
+						 "  i INTEGER; "
+						 "BEGIN "
+						 "  FOR i IN 1..10 "
+						 "  LOOP "
+						 "    INSERT INTO \"%s\".\"%s\" VALUES ('delta', i); "
+						 "    COMMIT; "
+						 "  END LOOP; "
+						 "END; "
+						 "$$ LANGUAGE plpgsql; ",
+						 table->schema, table->name, table->name, table->name,
+						 table->schema, table->proc, table->schema, table->name);
 
 		/* set statement start time */
 		SetCurrentStatementStartTimestamp();
@@ -137,11 +156,16 @@ worker_spi_main(Datum main_arg)
 	worktable  *table;
 	StringInfoData buf;
 	char		name[20];
+	FuncExpr 	*funcexpr;
+	Oid 		proc;
+	ObjectWithArgs *object;
+	MemoryContext oldcontext = CurrentMemoryContext;
 
 	table = palloc(sizeof(worktable));
 	sprintf(name, "schema%d", index);
 	table->schema = pstrdup(name);
 	table->name = pstrdup("counted");
+	table->proc = pstrdup("counted_proc");
 
 	/* Establish signal handlers before unblocking signals. */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -157,6 +181,27 @@ worker_spi_main(Datum main_arg)
 		 MyBgworkerEntry->bgw_name, table->schema, table->name);
 	initialize_worker_spi(table);
 
+	StartTransactionCommand();
+
+	/* build a function expression call */
+	object = makeNode(ObjectWithArgs);
+	object->objname = list_make2(makeString((char *)table->schema),
+								 makeString((char *)table->proc));
+	proc = LookupFuncWithArgs(OBJECT_ROUTINE, object, false);
+
+	CommitTransactionCommand();
+
+	MemoryContextSwitchTo(oldcontext);
+
+	funcexpr = makeFuncExpr(proc,
+							VOIDOID,
+							NIL,
+							InvalidOid,
+							InvalidOid,
+							COERCE_EXPLICIT_CALL);
+
+	MemoryContextSwitchTo(oldcontext);
+
 	/*
 	 * Quote identifiers passed to us.  Note that this must be done after
 	 * initialize_worker_spi, because that routine assumes the names are not
@@ -166,22 +211,10 @@ worker_spi_main(Datum main_arg)
 	 */
 	table->schema = quote_identifier(table->schema);
 	table->name = quote_identifier(table->name);
+	table->proc = quote_identifier(table->proc);
 
 	initStringInfo(&buf);
-	appendStringInfo(&buf,
-					 "WITH deleted AS (DELETE "
-					 "FROM %s.%s "
-					 "WHERE type = 'delta' RETURNING value), "
-					 "total AS (SELECT coalesce(sum(value), 0) as sum "
-					 "FROM deleted) "
-					 "UPDATE %s.%s "
-					 "SET value = %s.value + total.sum "
-					 "FROM total WHERE type = 'total' "
-					 "RETURNING %s.value",
-					 table->schema, table->name,
-					 table->schema, table->name,
-					 table->name,
-					 table->name);
+	appendStringInfo(&buf, "CALL %s.%s()", table->schema, table->proc);
 
 	/*
 	 * Main loop: do this until SIGTERM is received and processed by
@@ -189,7 +222,8 @@ worker_spi_main(Datum main_arg)
 	 */
 	for (;;)
 	{
-		int			ret;
+		CallStmt 		*call;
+		DestReceiver 	*dest;
 
 		/*
 		 * Background workers mustn't call usleep() or any direct equivalent:
@@ -232,39 +266,19 @@ worker_spi_main(Datum main_arg)
 		 */
 		SetCurrentStatementStartTimestamp();
 		StartTransactionCommand();
-		SPI_connect();
 		PushActiveSnapshot(GetTransactionSnapshot());
-		debug_query_string = buf.data;
 		pgstat_report_activity(STATE_RUNNING, buf.data);
 
-		/* We can now execute queries via SPI */
-		ret = SPI_execute(buf.data, false, 0);
-
-		if (ret != SPI_OK_UPDATE_RETURNING)
-			elog(FATAL, "cannot select from table %s.%s: error code %d",
-				 table->schema, table->name, ret);
-
-		if (SPI_processed > 0)
-		{
-			bool		isnull;
-			int32		val;
-
-			val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
-											  SPI_tuptable->tupdesc,
-											  1, &isnull));
-			if (!isnull)
-				elog(LOG, "%s: count in %s.%s is now %d",
-					 MyBgworkerEntry->bgw_name,
-					 table->schema, table->name, val);
-		}
+		call = makeNode(CallStmt);
+		call->funcexpr = funcexpr;
+		dest = CreateDestReceiver(DestNone);
+		ExecuteCallStmt(call, NULL, false, dest);
 
 		/*
 		 * And finish our transaction.
 		 */
-		SPI_finish();
 		PopActiveSnapshot();
 		CommitTransactionCommand();
-		debug_query_string = NULL;
 		pgstat_report_stat(false);
 		pgstat_report_activity(STATE_IDLE, NULL);
 	}