pg_background contrib module proposal
Hi All,
I would like to take over the pg_background patch and repost it for
discussion and review.
Robert Haas initially shared this as a parallelism demonstration[1] and
later abandoned it, leaving a summary of the open issues[2] that needed
to be fixed in the pg_background patch. Most of them seem to have been
addressed in core, except the handling of types that lack binary
send/recv functions, and the documentation.
I have added handling for types that don't have binary send/recv
functions in the attached patch and will work on the documentation at
the end.
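The fallback rule for such types can be pictured with a small standalone model. This is only a sketch, not the patch code: the `TypeInfo` struct and the `column_type_ok` helper are hypothetical names invented for illustration, whereas the patch looks at `pg_type.typreceive` through the syscache. The rule it models is the one in the RowDescription handling: a type with a binary receive function must match the declared column type exactly, and a type without one is only accepted when the column is declared as text.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;

#define INT4OID 23				/* OID of the built-in int4 type */
#define TEXTOID 25				/* OID of the built-in text type */

/* Hypothetical stand-in for the result of a pg_type lookup. */
typedef struct TypeInfo
{
	Oid			type_id;			/* type OID reported by the worker */
	bool		has_binary_recv;	/* does the type have a receive function? */
} TypeInfo;

/*
 * Return true if a remote column described by 'remote' may be read into a
 * local column declared with type 'declared'.
 */
static bool
column_type_ok(const TypeInfo *remote, Oid declared)
{
	assert(remote != NULL);

	/* Binary receive available: the declared type must match exactly. */
	if (remote->has_binary_recv)
		return remote->type_id == declared;

	/* No binary receive: the caller has to ask for the value as text. */
	return declared == TEXTOID;
}
```

With this model, an int4 column read as int4 is accepted, an int4 column read as text is rejected, and a type without a receive function is accepted only when the column definition list says text.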
One concern with this patch is code duplication with
exec_simple_query(). We could consider Jim Nasby's patch[3] to overcome
this, but we would certainly end up complicating exec_simple_query() to
make pg_background happy.
As discussed previously[1], pg_background is a contrib module that lets
you launch arbitrary commands in a background worker. Possible uses:
• Running VACUUM in the background.
• Implementing autonomous transactions in a better way than dblink
(i.e. no separate authentication required).
• Performing tasks like CREATE INDEX CONCURRENTLY from a procedural
language.
This module comes with the following SQL APIs:
• pg_background_launch : Takes the SQL command that the user wants to
execute and the size of the queue buffer. Returns the process ID of the
background worker.
• pg_background_result : Takes the process ID as input and returns the
result of the command executed through the background worker.
• pg_background_detach : Takes the process ID and detaches the
background process that is waiting for the user to read its results.
Here's an example of running VACUUM and then fetching the results.
Notice that the notices raised in the background worker are propagated
to our session; if an error had occurred, it would be re-thrown locally
when we try to read the results.
postgres=# create table foo (a int);
CREATE TABLE
postgres=# insert into foo values(generate_series(1,5));
INSERT 0 5
postgres=# select pg_background_launch('vacuum verbose foo');
 pg_background_launch
----------------------
                65427
(1 row)
postgres=# select * from pg_background_result(65427) as (x text);
INFO: vacuuming "public.foo"
INFO: "foo": found 0 removable, 5 nonremovable row versions in 1 out of 1 pages
DETAIL: 0 dead row versions cannot be removed yet.
There were 0 unused item pointers.
Skipped 0 pages due to buffer pins.
0 pages are entirely empty.
CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s.
   x
--------
 VACUUM
(1 row)
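The propagation shown in this session comes from pg_background_result reading protocol messages off the shared memory queue and dispatching on the message type byte. A rough standalone model of that dispatch follows. It is a sketch under assumptions: `MsgAction`, `Level`, `classify_message`, `clamp_level` and `stream_is_complete` are hypothetical names, the severity enum is only an illustrative ordering rather than PostgreSQL's real elevel values, and the patch handles each case inline instead of returning an action code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical action codes, one per branch of the reader's switch. */
typedef enum MsgAction
{
	ACT_RETHROW,				/* 'E' ErrorResponse, 'N' NoticeResponse */
	ACT_FORWARD_NOTIFY,			/* 'A' notification, passed to the client */
	ACT_CHECK_ROWTYPE,			/* 'T' RowDescription */
	ACT_RETURN_ROW,				/* 'D' DataRow */
	ACT_SAVE_COMMAND_TAG,		/* 'C' CommandComplete */
	ACT_REJECT_COPY,			/* 'G', 'H', 'W' COPY protocol */
	ACT_MARK_COMPLETE,			/* 'Z' ReadyForQuery */
	ACT_WARN_UNKNOWN			/* anything else */
} MsgAction;

/* Illustrative severity ordering only. */
typedef enum Level
{
	LVL_NOTICE,
	LVL_WARNING,
	LVL_ERROR,
	LVL_FATAL
} Level;

/* Map a protocol message type byte to the action the reader takes. */
static MsgAction
classify_message(char msgtype)
{
	switch (msgtype)
	{
		case 'E':
		case 'N':
			return ACT_RETHROW;
		case 'A':
			return ACT_FORWARD_NOTIFY;
		case 'T':
			return ACT_CHECK_ROWTYPE;
		case 'D':
			return ACT_RETURN_ROW;
		case 'C':
			return ACT_SAVE_COMMAND_TAG;
		case 'G':
		case 'H':
		case 'W':
			return ACT_REJECT_COPY;
		case 'Z':
			return ACT_MARK_COMPLETE;
		default:
			return ACT_WARN_UNKNOWN;
	}
}

/* A FATAL in the worker must not kill the user session: cap at ERROR. */
static Level
clamp_level(Level lvl)
{
	return (lvl > LVL_ERROR) ? LVL_ERROR : lvl;
}

/*
 * Given the type bytes of the messages received (as a NUL-terminated
 * string), report whether ReadyForQuery was seen.  If not, the reader
 * treats the connection to the worker as lost.
 */
static bool
stream_is_complete(const char *msgtypes)
{
	const char *p;

	assert(msgtypes != NULL);
	for (p = msgtypes; *p != '\0'; p++)
	{
		if (classify_message(*p) == ACT_MARK_COMPLETE)
			return true;
	}
	return false;
}
```

In the VACUUM session above, the worker would send 'N' messages for the INFO lines, then 'C' with the VACUUM tag, then 'Z'. Since no 'T' arrives, the command tag is returned as the single text column.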
Thanks to Vibhor Kumar, Rushabh Lathia and Robert Haas for feedback.
Please let me know your thoughts, and thanks for reading.
[1] /messages/by-id/CA+Tgmoam66dTzCP8N2cRcS6S6dBMFX+JMba+mDf68H=KAkNjPQ@mail.gmail.com
[2] /messages/by-id/CA+TgmobPiT_3Qgjeh3_v+8Cq2nMczkPyAYernF_7_W9a-6T1PA@mail.gmail.com
[3] /messages/by-id/54541779.1010906@BlueTreble.com
Regards,
Amul
Attachments:
0001-pg_background_v7.patch (application/octet-stream)
From 2a2bd34d65313e1c603eadfcdb4f36111845b3cc Mon Sep 17 00:00:00 2001
From: Amul Sul <sulamul@gmail.com>
Date: Fri, 4 Nov 2016 12:30:07 +0530
Subject: [PATCH] pg_background_v7
pg_background: Run commands in a background worker, and get the results.
v1-v6:
The currently-active GUC values from the user session will be copied
to the background worker. If the command returns a result set, you
can retrieve the result set; if not, you can retrieve the command
tags. If the command fails with an error, the same error will be
thrown in the launching process when the results are retrieved.
Warnings and other messages generated by the background worker, and
notifications received by it, are also propagated to the foreground
process.
v7:
Rebased Robert's v6 patch
Added handling for types that don't have binary send/recv functions
---
contrib/Makefile | 1 +
contrib/pg_background/Makefile | 18 +
contrib/pg_background/pg_background--1.0.sql | 24 +
contrib/pg_background/pg_background.c | 1042 ++++++++++++++++++++++++++
contrib/pg_background/pg_background.control | 4 +
src/backend/tcop/postgres.c | 6 +-
src/include/storage/proc.h | 2 +-
src/include/tcop/tcopprot.h | 3 +
8 files changed, 1095 insertions(+), 5 deletions(-)
create mode 100644 contrib/pg_background/Makefile
create mode 100644 contrib/pg_background/pg_background--1.0.sql
create mode 100644 contrib/pg_background/pg_background.c
create mode 100644 contrib/pg_background/pg_background.control
diff --git a/contrib/Makefile b/contrib/Makefile
index 25263c0..04ec28a 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -29,6 +29,7 @@ SUBDIRS = \
oid2name \
pageinspect \
passwordcheck \
+ pg_background \
pg_buffercache \
pg_freespacemap \
pg_prewarm \
diff --git a/contrib/pg_background/Makefile b/contrib/pg_background/Makefile
new file mode 100644
index 0000000..c4e717d
--- /dev/null
+++ b/contrib/pg_background/Makefile
@@ -0,0 +1,18 @@
+# contrib/pg_background/Makefile
+
+MODULE_big = pg_background
+OBJS = pg_background.o
+
+EXTENSION = pg_background
+DATA = pg_background--1.0.sql
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_background
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_background/pg_background--1.0.sql b/contrib/pg_background/pg_background--1.0.sql
new file mode 100644
index 0000000..eb44492
--- /dev/null
+++ b/contrib/pg_background/pg_background--1.0.sql
@@ -0,0 +1,24 @@
+/* contrib/pg_background/pg_background--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_background" to load this file. \quit
+
+CREATE FUNCTION pg_background_launch(sql pg_catalog.text,
+ queue_size pg_catalog.int4 DEFAULT 65536)
+ RETURNS pg_catalog.int4 STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_result(pid pg_catalog.int4)
+ RETURNS SETOF pg_catalog.record STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_detach(pid pg_catalog.int4)
+ RETURNS pg_catalog.void STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+REVOKE ALL ON FUNCTION pg_background_launch(pg_catalog.text, pg_catalog.int4)
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_result(pg_catalog.int4)
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_detach(pg_catalog.int4)
+ FROM public;
diff --git a/contrib/pg_background/pg_background.c b/contrib/pg_background/pg_background.c
new file mode 100644
index 0000000..4190a23
--- /dev/null
+++ b/contrib/pg_background/pg_background.c
@@ -0,0 +1,1042 @@
+/*--------------------------------------------------------------------------
+ *
+ * pg_background.c
+ * Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/pg_background/pg_background.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+
+#include "access/htup_details.h"
+#include "access/printtup.h"
+#include "access/xact.h"
+#include "catalog/pg_type.h"
+#include "commands/async.h"
+#include "commands/dbcommands.h"
+#include "funcapi.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "miscadmin.h"
+#include "parser/analyze.h"
+#include "pgstat.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/pquery.h"
+#include "tcop/utility.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/ps_status.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+#include "utils/timeout.h"
+
+/* Table-of-contents constants for our dynamic shared memory segment. */
+#define PG_BACKGROUND_MAGIC 0x50674267
+#define PG_BACKGROUND_KEY_FIXED_DATA 0
+#define PG_BACKGROUND_KEY_SQL 1
+#define PG_BACKGROUND_KEY_GUC 2
+#define PG_BACKGROUND_KEY_QUEUE 3
+#define PG_BACKGROUND_NKEYS 4
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct pg_background_fixed_data
+{
+ Oid database_id;
+ Oid authenticated_user_id;
+ Oid current_user_id;
+ int sec_context;
+ NameData database;
+ NameData authenticated_user;
+} pg_background_fixed_data;
+
+/* Private state maintained by the launching backend for IPC. */
+typedef struct pg_background_worker_info
+{
+ pid_t pid;
+ Oid current_user_id;
+ dsm_segment *seg;
+ BackgroundWorkerHandle *handle;
+ shm_mq_handle *responseq;
+ bool consumed;
+} pg_background_worker_info;
+
+/* Private state maintained across calls to pg_background_result. */
+typedef struct pg_background_result_state
+{
+ pg_background_worker_info *info;
+ FmgrInfo *receive_functions;
+ Oid *typioparams;
+ bool has_row_description;
+ List *command_tags;
+ bool complete;
+} pg_background_result_state;
+
+static HTAB *worker_hash;
+
+static void cleanup_worker_info(dsm_segment *, Datum pid_datum);
+static pg_background_worker_info *find_worker_info(pid_t pid);
+static void check_rights(pg_background_worker_info *info);
+static void save_worker_info(pid_t pid, dsm_segment *seg,
+ BackgroundWorkerHandle *handle,
+ shm_mq_handle *responseq);
+static void pg_background_error_callback(void *arg);
+
+static HeapTuple form_result_tuple(pg_background_result_state *state,
+ TupleDesc tupdesc, StringInfo msg);
+
+static void handle_sigterm(SIGNAL_ARGS);
+static void execute_sql_string(const char *sql);
+static bool exists_binary_recv_fn(Oid type);
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(pg_background_launch);
+PG_FUNCTION_INFO_V1(pg_background_result);
+PG_FUNCTION_INFO_V1(pg_background_detach);
+
+void pg_background_worker_main(Datum);
+
+/*
+ * Start a dynamic background worker to run a user-specified SQL command.
+ */
+Datum
+pg_background_launch(PG_FUNCTION_ARGS)
+{
+ text *sql = PG_GETARG_TEXT_PP(0);
+ int32 queue_size = PG_GETARG_INT32(1);
+ int32 sql_len = VARSIZE_ANY_EXHDR(sql);
+ Size guc_len;
+ Size segsize;
+ dsm_segment *seg;
+ shm_toc_estimator e;
+ shm_toc *toc;
+ char *sqlp;
+ char *gucstate;
+ shm_mq *mq;
+ BackgroundWorker worker;
+ BackgroundWorkerHandle *worker_handle;
+ pg_background_fixed_data *fdata;
+ pid_t pid;
+ shm_mq_handle *responseq;
+ MemoryContext oldcontext;
+
+ /* Ensure a valid queue size. */
+ if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("queue size must be at least %zu bytes",
+ shm_mq_minimum_size)));
+
+ /* Create dynamic shared memory and table of contents. */
+ shm_toc_initialize_estimator(&e);
+ shm_toc_estimate_chunk(&e, sizeof(pg_background_fixed_data));
+ shm_toc_estimate_chunk(&e, sql_len + 1);
+ guc_len = EstimateGUCStateSpace();
+ shm_toc_estimate_chunk(&e, guc_len);
+ shm_toc_estimate_chunk(&e, (Size) queue_size);
+ shm_toc_estimate_keys(&e, PG_BACKGROUND_NKEYS);
+ segsize = shm_toc_estimate(&e);
+ seg = dsm_create(segsize, 0);
+ toc = shm_toc_create(PG_BACKGROUND_MAGIC, dsm_segment_address(seg),
+ segsize);
+
+ /* Store fixed-size data in dynamic shared memory. */
+ fdata = shm_toc_allocate(toc, sizeof(pg_background_fixed_data));
+ fdata->database_id = MyDatabaseId;
+ fdata->authenticated_user_id = GetAuthenticatedUserId();
+ GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context);
+ namestrcpy(&fdata->database, get_database_name(MyDatabaseId));
+ namestrcpy(&fdata->authenticated_user,
+ GetUserNameFromId(fdata->authenticated_user_id, false));
+ shm_toc_insert(toc, PG_BACKGROUND_KEY_FIXED_DATA, fdata);
+
+ /* Store SQL query in dynamic shared memory. */
+ sqlp = shm_toc_allocate(toc, sql_len + 1);
+ memcpy(sqlp, VARDATA_ANY(sql), sql_len);
+ sqlp[sql_len] = '\0';
+ shm_toc_insert(toc, PG_BACKGROUND_KEY_SQL, sqlp);
+
+ /* Store GUC state in dynamic shared memory. */
+ gucstate = shm_toc_allocate(toc, guc_len);
+ SerializeGUCState(guc_len, gucstate);
+ shm_toc_insert(toc, PG_BACKGROUND_KEY_GUC, gucstate);
+
+ /* Establish message queue in dynamic shared memory. */
+ mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
+ (Size) queue_size);
+ shm_toc_insert(toc, PG_BACKGROUND_KEY_QUEUE, mq);
+ shm_mq_set_receiver(mq, MyProc);
+
+ /*
+ * Attach the queue before launching a worker, so that we'll automatically
+ * detach the queue if we error out. (Otherwise, the worker might sit
+ * there trying to write the queue long after we've gone away.)
+ */
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ responseq = shm_mq_attach(mq, seg, NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Configure a worker. */
+ worker.bgw_flags =
+ BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+ worker.bgw_start_time = BgWorkerStart_ConsistentState;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ worker.bgw_main = NULL; /* new worker might not have library loaded */
+ sprintf(worker.bgw_library_name, "pg_background");
+ sprintf(worker.bgw_function_name, "pg_background_worker_main");
+ snprintf(worker.bgw_name, BGW_MAXLEN,
+ "pg_background by PID %d", MyProcPid);
+ worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+ /* set bgw_notify_pid, so we can detect if the worker stops */
+ worker.bgw_notify_pid = MyProcPid;
+
+ /*
+ * Register the worker.
+ *
+ * We switch contexts so that the background worker handle can outlast
+ * this transaction.
+ */
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ if (!RegisterDynamicBackgroundWorker(&worker, &worker_handle))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("could not register background process"),
+ errhint("You may need to increase max_worker_processes.")));
+ MemoryContextSwitchTo(oldcontext);
+ shm_mq_set_handle(responseq, worker_handle);
+
+ /* Wait for the worker to start. */
+ switch (WaitForBackgroundWorkerStartup(worker_handle, &pid))
+ {
+ case BGWH_STARTED:
+ /* Success. */
+ break;
+ case BGWH_STOPPED:
+ pfree(worker_handle);
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("could not start background process"),
+ errhint("More details may be available in the server log.")));
+ break;
+ case BGWH_POSTMASTER_DIED:
+ pfree(worker_handle);
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("cannot start background processes without postmaster"),
+ errhint("Kill all remaining database processes and restart the database.")));
+ break;
+ default:
+ elog(ERROR, "unexpected bgworker handle status");
+ break;
+ }
+
+ /* Store the relevant details about this worker for future use. */
+ save_worker_info(pid, seg, worker_handle, responseq);
+
+ /*
+ * Now that the worker info is saved, we do not need to, and should not,
+ * automatically detach the segment at resource-owner cleanup time.
+ */
+ dsm_pin_mapping(seg);
+
+ /* Return the worker's PID. */
+ PG_RETURN_INT32(pid);
+}
+
+/*
+ * Retrieve the results of a background query previously launched in this
+ * session.
+ */
+Datum
+pg_background_result(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ shm_mq_result res;
+ FuncCallContext *funcctx;
+ TupleDesc tupdesc;
+ StringInfoData msg;
+ pg_background_result_state *state;
+
+ /* First-time setup. */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+ pg_background_worker_info *info;
+
+ funcctx = SRF_FIRSTCALL_INIT();
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* See if we have a connection to the specified PID. */
+ if ((info = find_worker_info(pid)) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+ check_rights(info);
+
+ /* Can't read results twice. */
+ if (info->consumed)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("results for PID %d have already been consumed", pid)));
+ info->consumed = true;
+
+ /*
+ * Whether we succeed or fail, a future invocation of this function
+ * may not try to read from the DSM once we've begun to do so.
+ * Accordingly, make arrangements to clean things up at end of query.
+ */
+ dsm_unpin_mapping(info->seg);
+
+ /* Set up tuple-descriptor based on column definition list. */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record"),
+ errhint("Try calling the function in the FROM clause "
+ "using a column definition list.")));
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ /* Cache state that will be needed on every call. */
+ state = palloc0(sizeof(pg_background_result_state));
+ state->info = info;
+ if (funcctx->tuple_desc->natts > 0)
+ {
+ int natts = funcctx->tuple_desc->natts;
+ int i;
+
+ state->receive_functions = palloc(sizeof(FmgrInfo) * natts);
+ state->typioparams = palloc(sizeof(Oid) * natts);
+
+ for (i = 0; i < natts; ++i)
+ {
+ Oid receive_function_id;
+
+ getTypeBinaryInputInfo(funcctx->tuple_desc->attrs[i]->atttypid,
+ &receive_function_id,
+ &state->typioparams[i]);
+ fmgr_info(receive_function_id, &state->receive_functions[i]);
+ }
+ }
+ funcctx->user_fctx = state;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+ funcctx = SRF_PERCALL_SETUP();
+ tupdesc = funcctx->tuple_desc;
+ state = funcctx->user_fctx;
+
+ /* Initialize message buffer. */
+ initStringInfo(&msg);
+
+ /* Read and process messages from the shared memory queue. */
+ for (;;)
+ {
+ char msgtype;
+ Size nbytes;
+ void *data;
+
+ /* Get next message. */
+ res = shm_mq_receive(state->info->responseq, &nbytes, &data, false);
+ if (res != SHM_MQ_SUCCESS)
+ break;
+
+ /*
+ * Message-parsing routines operate on a null-terminated StringInfo,
+ * so we must construct one.
+ */
+ resetStringInfo(&msg);
+ enlargeStringInfo(&msg, nbytes);
+ msg.len = nbytes;
+ memcpy(msg.data, data, nbytes);
+ msg.data[nbytes] = '\0';
+ msgtype = pq_getmsgbyte(&msg);
+
+ /* Dispatch on message type. */
+ switch (msgtype)
+ {
+ case 'E':
+ case 'N':
+ {
+ ErrorData edata;
+ ErrorContextCallback context;
+
+ /* Parse ErrorResponse or NoticeResponse. */
+ pq_parse_errornotice(&msg, &edata);
+
+ /*
+ * Limit the maximum error level to ERROR. We don't want
+ * a FATAL inside the background worker to kill the user
+ * session.
+ */
+ if (edata.elevel > ERROR)
+ edata.elevel = ERROR;
+
+ /*
+ * Rethrow the error with an appropriate context method.
+ */
+ context.callback = pg_background_error_callback;
+ context.arg = (void *) &pid;
+ context.previous = error_context_stack;
+ error_context_stack = &context;
+ ThrowErrorData(&edata);
+ error_context_stack = context.previous;
+
+ break;
+ }
+ case 'A':
+ {
+ /* Propagate NotifyResponse. */
+ pq_putmessage(msg.data[0], &msg.data[1], nbytes - 1);
+ break;
+ }
+ case 'T':
+ {
+ int16 natts = pq_getmsgint(&msg, 2);
+ int16 i;
+
+ if (state->has_row_description)
+ elog(ERROR, "multiple RowDescription messages");
+ state->has_row_description = true;
+ if (natts != tupdesc->natts)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+
+ for (i = 0; i < natts; ++i)
+ {
+ Oid type_id;
+
+ (void) pq_getmsgstring(&msg); /* name */
+ (void) pq_getmsgint(&msg, 4); /* table OID */
+ (void) pq_getmsgint(&msg, 2); /* table attnum */
+ type_id = pq_getmsgint(&msg, 4); /* type OID */
+ (void) pq_getmsgint(&msg, 2); /* type length */
+ (void) pq_getmsgint(&msg, 4); /* typmod */
+ (void) pq_getmsgint(&msg, 2); /* format code */
+
+ if (exists_binary_recv_fn(type_id))
+ {
+ if (type_id != tupdesc->attrs[i]->atttypid)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+ }
+ else if (tupdesc->attrs[i]->atttypid != TEXTOID)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype"),
+ errhint("Use the text type for this column instead.")));
+ }
+
+ pq_getmsgend(&msg);
+
+ break;
+ }
+ case 'D':
+ {
+ /* Handle DataRow message. */
+ HeapTuple result;
+
+ result = form_result_tuple(state, tupdesc, &msg);
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result));
+ }
+ case 'C':
+ {
+ /* Handle CommandComplete message. */
+ MemoryContext oldcontext;
+ const char *tag = pq_getmsgstring(&msg);
+
+ oldcontext =
+ MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ state->command_tags = lappend(state->command_tags,
+ pstrdup(tag));
+ MemoryContextSwitchTo(oldcontext);
+ break;
+ }
+ case 'G':
+ case 'H':
+ case 'W':
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY protocol not allowed in pg_background")));
+ }
+
+ case 'Z':
+ {
+ /* Handle ReadyForQuery message. */
+ state->complete = true;
+ break;
+ }
+ default:
+ elog(WARNING, "unknown message type: %c (%zu bytes)",
+ msg.data[0], nbytes);
+ break;
+ }
+ }
+
+ /* Check whether the connection was broken prematurely. */
+ if (!state->complete)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("lost connection to worker process with PID %d",
+ pid)));
+
+ /* If no data rows, return the command tags instead. */
+ if (!state->has_row_description)
+ {
+ if (tupdesc->natts != 1 || tupdesc->attrs[0]->atttypid != TEXTOID)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query did not return a result set, but "
+ "result rowtype is not a single text column")));
+ if (state->command_tags != NIL)
+ {
+ char *tag = linitial(state->command_tags);
+ Datum value;
+ bool isnull;
+ HeapTuple result;
+
+ state->command_tags = list_delete_first(state->command_tags);
+ value = PointerGetDatum(cstring_to_text(tag));
+ isnull = false;
+ result = heap_form_tuple(tupdesc, &value, &isnull);
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result));
+ }
+ }
+
+ /* We're done! */
+ dsm_detach(state->info->seg);
+ SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Parse a DataRow message and form a result tuple.
+ */
+static HeapTuple
+form_result_tuple(pg_background_result_state *state, TupleDesc tupdesc,
+ StringInfo msg)
+{
+ /* Handle DataRow message. */
+ int16 natts = pq_getmsgint(msg, 2);
+ int16 i;
+ Datum *values = NULL;
+ bool *isnull = NULL;
+ StringInfoData buf;
+
+ if (!state->has_row_description)
+ elog(ERROR, "DataRow not preceded by RowDescription");
+ if (natts != tupdesc->natts)
+ elog(ERROR, "malformed DataRow");
+ if (natts > 0)
+ {
+ values = palloc(natts * sizeof(Datum));
+ isnull = palloc(natts * sizeof(bool));
+ }
+ initStringInfo(&buf);
+
+ for (i = 0; i < natts; ++i)
+ {
+ int32 bytes = pq_getmsgint(msg, 4);
+
+ if (bytes < 0)
+ {
+ values[i] = ReceiveFunctionCall(&state->receive_functions[i],
+ NULL,
+ state->typioparams[i],
+ tupdesc->attrs[i]->atttypmod);
+ isnull[i] = true;
+ }
+ else
+ {
+ resetStringInfo(&buf);
+ appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, bytes), bytes);
+ values[i] = ReceiveFunctionCall(&state->receive_functions[i],
+ &buf,
+ state->typioparams[i],
+ tupdesc->attrs[i]->atttypmod);
+ isnull[i] = false;
+ }
+ }
+
+ pq_getmsgend(msg);
+
+ return heap_form_tuple(tupdesc, values, isnull);
+}
+
+/*
+ * Detach from the dynamic shared memory segment used for communication with
+ * a background worker. This prevents the worker from stalling waiting for
+ * us to read its results.
+ */
+Datum
+pg_background_detach(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ pg_background_worker_info *info;
+
+ info = find_worker_info(pid);
+ if (info == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+ check_rights(info);
+ dsm_detach(info->seg);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * When the dynamic shared memory segment associated with a worker is
+ * cleaned up, we need to clean up our associated private data structures.
+ */
+static void
+cleanup_worker_info(dsm_segment *seg, Datum pid_datum)
+{
+ pid_t pid = DatumGetInt32(pid_datum);
+ bool found;
+ pg_background_worker_info *info;
+
+ /* Find any worker info entry for this PID. If none, we're done. */
+ if ((info = find_worker_info(pid)) == NULL)
+ return;
+
+ /* Free memory used by the BackgroundWorkerHandle. */
+ if (info->handle != NULL)
+ {
+ pfree(info->handle);
+ info->handle = NULL;
+ }
+
+ /* Remove the hashtable entry. */
+ hash_search(worker_hash, (void *) &pid, HASH_REMOVE, &found);
+ if (!found)
+ elog(ERROR, "pg_background worker_hash table corrupted");
+}
+
+/*
+ * Find the background worker information for the worker with a given PID.
+ */
+static pg_background_worker_info *
+find_worker_info(pid_t pid)
+{
+ pg_background_worker_info *info = NULL;
+
+ if (worker_hash != NULL)
+ info = hash_search(worker_hash, (void *) &pid, HASH_FIND, NULL);
+
+ return info;
+}
+
+/*
+ * Check whether the current user has rights to manipulate the background
+ * worker with the given PID.
+ */
+static void
+check_rights(pg_background_worker_info *info)
+{
+ Oid current_user_id;
+ int sec_context;
+
+ GetUserIdAndSecContext(&current_user_id, &sec_context);
+ if (!has_privs_of_role(current_user_id, info->current_user_id))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied for background worker with PID \"%d\"",
+ info->pid)));
+}
+
+/*
+ * Save worker information for future IPC.
+ */
+static void
+save_worker_info(pid_t pid, dsm_segment *seg, BackgroundWorkerHandle *handle,
+ shm_mq_handle *responseq)
+{
+ pg_background_worker_info *info;
+ Oid current_user_id;
+ int sec_context;
+
+ /* If the hash table hasn't been set up yet, do that now. */
+ if (worker_hash == NULL)
+ {
+ HASHCTL ctl;
+
+ ctl.keysize = sizeof(pid_t);
+ ctl.entrysize = sizeof(pg_background_worker_info);
+ worker_hash = hash_create("pg_background worker_hash", 8, &ctl,
+ HASH_ELEM | HASH_BLOBS);
+ }
+
+ /* Get current authentication information. */
+ GetUserIdAndSecContext(&current_user_id, &sec_context);
+
+ /*
+ * In the unlikely event that there's an older worker with this PID,
+ * just detach it - unless it has a different user ID than the
+ * currently-active one, in which case someone might be trying to pull
+ * a fast one. Let's kill the backend to make sure we don't break
+ * anyone's expectations.
+ */
+ if ((info = find_worker_info(pid)) != NULL)
+ {
+ if (current_user_id != info->current_user_id)
+ ereport(FATAL,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("background worker with PID \"%d\" already exists",
+ pid)));
+ dsm_detach(info->seg);
+ }
+
+ /* When the DSM is unmapped, clean everything up. */
+ on_dsm_detach(seg, cleanup_worker_info, Int32GetDatum(pid));
+
+ /* Create a new entry for this worker. */
+ info = hash_search(worker_hash, (void *) &pid, HASH_ENTER, NULL);
+ info->seg = seg;
+ info->handle = handle;
+ info->responseq = responseq;
+ info->consumed = false;
+}
+
+/*
+ * Indicate that an error came from a particular background worker.
+ */
+static void
+pg_background_error_callback(void *arg)
+{
+ pid_t pid = * (pid_t *) arg;
+
+ errcontext("background worker, pid %d", pid);
+}
+
+/*
+ * Background worker entrypoint.
+ */
+void
+pg_background_worker_main(Datum main_arg)
+{
+ dsm_segment *seg;
+ shm_toc *toc;
+ pg_background_fixed_data *fdata;
+ char *sql;
+ char *gucstate;
+ shm_mq *mq;
+ shm_mq_handle *responseq;
+
+ /* Establish signal handlers. */
+ pqsignal(SIGTERM, handle_sigterm);
+ BackgroundWorkerUnblockSignals();
+
+ /* Set up a memory context and resource owner. */
+ Assert(CurrentResourceOwner == NULL);
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_background");
+ CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+ "pg_background session",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /* Connect to the dynamic shared memory segment. */
+ seg = dsm_attach(DatumGetUInt32(main_arg));
+ if (seg == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("unable to map dynamic shared memory segment")));
+ toc = shm_toc_attach(PG_BACKGROUND_MAGIC, dsm_segment_address(seg));
+ if (toc == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("bad magic number in dynamic shared memory segment")));
+
+ /* Find data structures in dynamic shared memory. */
+ fdata = shm_toc_lookup(toc, PG_BACKGROUND_KEY_FIXED_DATA);
+ sql = shm_toc_lookup(toc, PG_BACKGROUND_KEY_SQL);
+ gucstate = shm_toc_lookup(toc, PG_BACKGROUND_KEY_GUC);
+ mq = shm_toc_lookup(toc, PG_BACKGROUND_KEY_QUEUE);
+ shm_mq_set_sender(mq, MyProc);
+ responseq = shm_mq_attach(mq, seg, NULL);
+
+ /* Redirect protocol messages to responseq. */
+ pq_redirect_to_shm_mq(seg, responseq);
+
+ /*
+ * Initialize our user and database ID based on the string versions of
+ * the data, and then go back and check that we actually got the database
+ * and user ID that we intended to get. We do this because it's not
+ * impossible for the process that started us to die before we get here,
+ * and the user or database could be renamed in the meantime. We don't
+ * want to latch onto the wrong object by accident. There should probably
+ * be a variant of BackgroundWorkerInitializeConnection that accepts OIDs
+ * rather than strings.
+ */
+ BackgroundWorkerInitializeConnection(NameStr(fdata->database),
+ NameStr(fdata->authenticated_user));
+ if (fdata->database_id != MyDatabaseId ||
+ fdata->authenticated_user_id != GetAuthenticatedUserId())
+ ereport(ERROR,
+ (errmsg("user or database renamed during pg_background startup")));
+
+ /* Restore GUC values from launching backend. */
+ StartTransactionCommand();
+ RestoreGUCState(gucstate);
+ CommitTransactionCommand();
+
+ /* Handle local_preload_libraries and session_preload_libraries. */
+ process_session_preload_libraries();
+
+ /* Restore user ID and security context. */
+ SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context);
+
+ /* Prepare to execute the query. */
+ SetCurrentStatementStartTimestamp();
+ debug_query_string = sql;
+ pgstat_report_activity(STATE_RUNNING, sql);
+
+ /* Execute the query. */
+ execute_sql_string(sql);
+
+ /* Post-execution cleanup. */
+ ProcessCompletedNotifies();
+ pgstat_report_activity(STATE_IDLE, sql);
+ pgstat_report_stat(true);
+
+ /* Signal that we are done. */
+ ReadyForQuery(DestRemote);
+}
+
+/*
+ * Check whether a binary receive function exists for the given type.
+ */
+static bool
+exists_binary_recv_fn(Oid type)
+{
+ HeapTuple typeTuple;
+ Form_pg_type pt;
+ bool exists_recv_fn;
+
+ typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
+ if (!HeapTupleIsValid(typeTuple))
+ elog(ERROR, "cache lookup failed for type %u", type);
+
+ pt = (Form_pg_type) GETSTRUCT(typeTuple);
+ exists_recv_fn = OidIsValid(pt->typreceive);
+ ReleaseSysCache(typeTuple);
+
+ return exists_recv_fn;
+}
+
+/*
+ * Execute given SQL string.
+ *
+ * Using SPI here would preclude backgrounding commands like VACUUM which one
+ * might very well wish to launch in the background. So we do this instead.
+ */
+static void
+execute_sql_string(const char *sql)
+{
+ List *raw_parsetree_list;
+ ListCell *lc1;
+ bool isTopLevel;
+ int commands_remaining;
+ MemoryContext parsecontext;
+ MemoryContext oldcontext;
+
+ /* Start up a transaction command. */
+ start_xact_command();
+
+ /*
+ * Parse the SQL string into a list of raw parse trees.
+ *
+ * Because we allow statements that perform internal transaction control,
+ * we can't do this in TopTransactionContext; the parse trees might get
+ * blown away before we're done executing them.
+ */
+ parsecontext = AllocSetContextCreate(TopMemoryContext,
+ "pg_background parse/plan",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcontext = MemoryContextSwitchTo(parsecontext);
+ raw_parsetree_list = pg_parse_query(sql);
+ commands_remaining = list_length(raw_parsetree_list);
+ isTopLevel = commands_remaining == 1;
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * Do parse analysis, rule rewrite, planning, and execution for each raw
+ * parsetree. We must fully execute each query before beginning parse
+ * analysis on the next one, since there may be interdependencies.
+ */
+ foreach(lc1, raw_parsetree_list)
+ {
+ Node *parsetree = (Node *) lfirst(lc1);
+ const char *commandTag;
+ char completionTag[COMPLETION_TAG_BUFSIZE];
+ List *querytree_list,
+ *plantree_list;
+ bool snapshot_set = false;
+ Portal portal;
+ DestReceiver *receiver;
+
+ /*
+ * We don't allow transaction-control commands like COMMIT and ABORT
+ * here. The entire SQL statement is executed as a single transaction
+ * which commits if no errors are encountered.
+ */
+ if (IsA(parsetree, TransactionStmt))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("transaction control statements are not allowed in pg_background")));
+
+ /*
+ * Get the command name for use in status display (it also becomes the
+ * default completion tag, down inside PortalRun). Set ps_status and
+ * do any special start-of-SQL-command processing needed by the
+ * destination.
+ */
+ commandTag = CreateCommandTag(parsetree);
+ set_ps_display(commandTag, false);
+ BeginCommand(commandTag, DestNone);
+
+ /* Set up a snapshot if parse analysis/planning will need one. */
+ if (analyze_requires_snapshot(parsetree))
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ snapshot_set = true;
+ }
+
+ /*
+ * OK to analyze, rewrite, and plan this query.
+ *
+ * As with parsing, we need to make sure this data outlives the
+ * transaction, because of the possibility that the statement might
+ * perform internal transaction control.
+ */
+ oldcontext = MemoryContextSwitchTo(parsecontext);
+ querytree_list = pg_analyze_and_rewrite(parsetree, sql, NULL, 0);
+ plantree_list = pg_plan_queries(querytree_list, 0, NULL);
+
+ /* Done with the snapshot used for parsing/planning */
+ if (snapshot_set)
+ PopActiveSnapshot();
+
+ /* If we got a cancel signal in analysis or planning, quit */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Execute the query using the unnamed portal.
+ */
+ portal = CreatePortal("", true, true);
+ /* Don't display the portal in pg_cursors */
+ portal->visible = false;
+ PortalDefineQuery(portal, NULL, sql, commandTag, plantree_list, NULL);
+ PortalStart(portal, NULL, 0, InvalidSnapshot);
+
+ /* Use binary format if input function exists otherwise text. */
+ if (portal->tupDesc != NULL)
+ {
+ int i;
+ int natts = portal->tupDesc->natts;
+ Form_pg_attribute *attrs = portal->tupDesc->attrs;
+ int16 *formats = (int16 *) palloc(natts * sizeof(int16));
+
+ for (i = 0; i < natts; i++)
+ formats[i] = exists_binary_recv_fn(attrs[i]->atttypid) ? 1 : 0;
+
+ PortalSetResultFormat(portal, natts, formats);
+ }
+
+ /*
+ * Tuples returned by any command other than the last are simply
+ * discarded; but those returned by the last (or only) command are
+ * redirected to the shared memory queue we're using for communication
+ * with the launching backend. If the launching backend is gone or has
+ * detached us, these messages will just get dropped on the floor.
+ */
+ --commands_remaining;
+ if (commands_remaining > 0)
+ receiver = CreateDestReceiver(DestNone);
+ else
+ {
+ receiver = CreateDestReceiver(DestRemote);
+ SetRemoteDestReceiverParams(receiver, portal);
+ }
+
+ /*
+ * Only once the portal and destreceiver have been established can
+ * we return to the transaction context. All that stuff needs to
+ * survive an internal commit inside PortalRun!
+ */
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Here's where we actually execute the command. */
+ (void) PortalRun(portal, FETCH_ALL, isTopLevel, receiver, receiver,
+ completionTag);
+
+ /* Clean up the receiver. */
+ (*receiver->rDestroy) (receiver);
+
+ /* Clean up the portal. */
+ PortalDrop(portal, false);
+
+ /*
+ * If this is the last parsetree, close down transaction statement
+ * before reporting CommandComplete. Otherwise, we need a
+ * CommandCounterIncrement.
+ */
+ if (lnext(lc1) == NULL)
+ finish_xact_command();
+ else
+ CommandCounterIncrement();
+
+ /*
+ * Send a CommandComplete message even if we suppressed the query
+ * results. The user backend will report the command tags in the
+ * absence of any true query results.
+ */
+ EndCommand(completionTag, DestRemote);
+ }
+
+ /* Make sure there's not still a transaction open. */
+ finish_xact_command();
+}
+
+/*
+ * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
+ * like a normal backend. The next CHECK_FOR_INTERRUPTS() will do the right
+ * thing.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ if (MyProc)
+ SetLatch(&MyProc->procLatch);
+
+ if (!proc_exit_inprogress)
+ {
+ InterruptPending = true;
+ ProcDiePending = true;
+ }
+
+ errno = save_errno;
+}
diff --git a/contrib/pg_background/pg_background.control b/contrib/pg_background/pg_background.control
new file mode 100644
index 0000000..733d0e1
--- /dev/null
+++ b/contrib/pg_background/pg_background.control
@@ -0,0 +1,4 @@
+comment = 'Run SQL queries in the background'
+default_version = '1.0'
+module_pathname = '$libdir/pg_background'
+relocatable = true
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 599874e..3129287 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -180,8 +180,6 @@ static int errdetail_execute(List *raw_parsetree_list);
static int errdetail_params(ParamListInfo params);
static int errdetail_abort(void);
static int errdetail_recovery_conflict(void);
-static void start_xact_command(void);
-static void finish_xact_command(void);
static bool IsTransactionExitStmt(Node *parsetree);
static bool IsTransactionExitStmtList(List *parseTrees);
static bool IsTransactionStmtList(List *parseTrees);
@@ -2422,7 +2420,7 @@ exec_describe_portal_message(const char *portal_name)
/*
* Convenience routines for starting/committing a single command.
*/
-static void
+void
start_xact_command(void)
{
if (!xact_started)
@@ -2442,7 +2440,7 @@ start_xact_command(void)
}
}
-static void
+void
finish_xact_command(void)
{
if (xact_started)
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 7dc8dac..227ff09 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -260,7 +260,7 @@ extern PGPROC *PreparedXactProcs;
/* configurable options */
extern int DeadlockTimeout;
-extern int StatementTimeout;
+extern PGDLLIMPORT int StatementTimeout;
extern int LockTimeout;
extern int IdleInTransactionSessionTimeout;
extern bool log_lock_waits;
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 7254355..2f0453c 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -70,6 +70,9 @@ extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from S
extern void ProcessClientReadInterrupt(bool blocked);
extern void ProcessClientWriteInterrupt(bool blocked);
+extern void start_xact_command(void);
+extern void finish_xact_command(void);
+
extern void process_postgres_switches(int argc, char *argv[],
GucContext ctx, const char **dbname);
extern void PostgresMain(int argc, char *argv[],
--
2.6.2
This is simply wonderful!
Finally, I can implement my favorite sleepsort in postgres:
create table input as select round(random()*20) x from generate_series(1,5,1);
create table output(place int,value int);
create sequence s start 1;
create table handles as select pg_background_launch('select
pg_sleep('||x||'); insert into output values
(nextval(''s''),'||x||');') h from input;
select (select * from pg_background_result(h) as (x text) limit 1) from handles;
select * from output;
Works like a charm. It has perfect running time O(1) where n is
number of items to sort, but it cannot sort more than
max_worker_processes-1 items.
The next step in user concurrency mechanics would be future indexes
(indexes under construction are taken into plans, and the query is
executed once the index is actually built) and promised tables (the
query waits until there is actually data in the table).
I like the idea and implementation and I'm planning to post a review by
the end of December.
Right now I have just one useful idea: I do not see a comfortable way to
check the state of a task (finished\failed) without the risk of hanging
for a long time or catching fire.
Best regards, Andrey Borodin.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
I've read the code and now I have more suggestions.
1. Easy one. The case where the expected and actual results differ in natts
throws the same errmsg as the case of wrong types.
I think we should assist the user more here
if (natts != tupdesc->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match the
specified FROM clause rowtype")));
and here
if (type_id != tupdesc->attrs[i]->atttypid)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match the
specified FROM clause rowtype")));
2. This code
errhint("You may need to increase max_worker_processes.")
The hint is technically absolutely right.
But this extension requires something like PgBouncer, or what is
called a "thread pool".
You just cannot safely fire a background task, because you do not know
how many workers can be spawned. Maybe it'd be better to create a 'pool'
of workers and form a queue of queries for them?
This would make it possible to start millions of subtasks.
3. About getting it into core rather than as an extension: IMO this is too
sharp a thing to place into core; I think its best place is in
contrib.
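For point 1, a standalone C sketch of how the two mismatches could be reported separately. It is not the patch's code: `RowDesc`, `check_rowtype` and the message wording are hypothetical stand-ins for the `tupdesc` comparison and the `ereport()` calls quoted above, and plain integers stand in for type OIDs.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for a tuple descriptor: only the column type OIDs. */
typedef struct RowDesc
{
    int                 natts;  /* number of columns */
    const unsigned int *types;  /* type OID of each column */
} RowDesc;

/*
 * Compare the row type produced by the background query with the row type
 * the caller declared.  Returns 0 on a match.  On a mismatch, writes a
 * message naming what differs into buf and returns -1.
 */
int
check_rowtype(const RowDesc *remote, const RowDesc *expected,
              char *buf, size_t buflen)
{
    int         i;

    assert(remote != NULL && expected != NULL);
    assert(buf != NULL && buflen > 0);

    /* Case 1: the number of columns differs. */
    if (remote->natts != expected->natts)
    {
        snprintf(buf, buflen,
                 "remote query returned %d columns, but %d were specified",
                 remote->natts, expected->natts);
        return -1;
    }

    /* Case 2: same column count, but some column has another type. */
    for (i = 0; i < remote->natts; i++)
    {
        if (remote->types[i] != expected->types[i])
        {
            snprintf(buf, buflen,
                     "column %d has type OID %u, but %u was specified",
                     i + 1, remote->types[i], expected->types[i]);
            return -1;
        }
    }

    buf[0] = '\0';
    return 0;
}
```

In the extension itself, each branch would presumably become its own `ereport(ERROR, ...)` with a distinct `errmsg`, while both keep `ERRCODE_DATATYPE_MISMATCH`.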
Thanks for the work on such a cool and fun extension.
Best regards, Andrey Borodin.
2016-12-09 13:48 GMT+01:00 Andrew Borodin <borodin@octonica.com>:
I've read the code and now I have more suggestions.
1. Easy one. The case of different natts of expected and actual result
throws same errmsg as the case of wrong types.
I think we should assist the user more here
if (natts != tupdesc->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match the
specified FROM clause rowtype")));
and here
if (type_id != tupdesc->attrs[i]->atttypid)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match the
specified FROM clause rowtype")));
2. This code
errhint("You may need to increase max_worker_processes.")
Is technically hinting absolutely right.
But this extension requires something like pg_bouncer or what is
called "thread pool".
You just cannot safely fire a background task because you do not know
how many workers can be spawned. Maybe it'd be better to create 'pool'
of workers and form a queue of queries for them?
This will make possible start a millions of subtasks.
This is not easy, because a background worker is tied to a database, not to the server.
There are servers where you can have thousands of databases.
Regards
Pavel
On Wed, Nov 23, 2016 at 10:46 PM, amul sul <sulamul@gmail.com> wrote:
I would like to take over pg_background patch and repost for
discussion and review.
Initially Robert Haas has share this for parallelism demonstration[1]
and abandoned later with
summary of open issue[2] with this pg_background patch need to be
fixed, most of them seems to be
addressed in core except handling of type exists without binary
send/recv functions and documentation.
I have added handling for types that don't have binary send/recv
functions in the attach patch and will
work on documentation at the end.
One concern with this patch is code duplication with
exec_simple_query(), we could
consider Jim Nasby’s patch[3] to overcome this, but certainly we
will end up by complicating
exec_simple_query() to make pg_background happy.
As discussed previously[1] pg_background is a contrib module that lets
you launch
arbitrary command in a background worker.
It looks like this could be reworked as a client of Peter Eisentraut's
background sessions code, which I think is also derived from
pg_background:
http://archives.postgresql.org/message-id/e1c2d331-ee6a-432d-e9f5-dcf85cffaf29@2ndquadrant.com
That might be good, because then we wouldn't have to maintain two
copies of the code.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
2016-12-09 18:00 GMT+05:00 Robert Haas <robertmhaas@gmail.com>:
It looks like this could be reworked as a client of Peter Eisentraut's
background sessions code, which I think is also derived from
pg_background:
http://archives.postgresql.org/message-id/e1c2d331-ee6a-432d-e9f5-dcf85cffaf29@2ndquadrant.com
That might be good, because then we wouldn't have to maintain two
copies of the code.
The code looks quite different. I mean the bgsession.c code and the pg_background.c code.
Definitely, there is a possibility to refactor both patches to share a
common subset of base routines, since they operate on similar concepts.
But to start, it's better to choose which patch goes first, or merge
them.
It is not possible to build one on top of the other yet, since they both
require some work.
Personally, I like the C code from pg_background more. It is far better
commented and reports more error detail to the user. But the interface of
bgsessions is crisper. Finally, they solve different problems.
I signed up for review there too (in the background sessions patch). I
hope I'll have enough resources to provide a decent review for both in
December, before the commitfest.
Best regards, Andrey Borodin.
Hi!
Just in case you'd like to include sleepsort as a test, here it is
wrapped as a regression test (see attachment). But it has a serious
downside: it runs no less than 5 seconds.
Also I'll list here every parallelism feature I managed to imagine. It
is not to say that I suggest having some of these features at v1. We
certainly have to start somewhere, this list is just for information
purposes.
1. Poolable workers. Just to be sure you can reliably queue your task
somewhere without having "increase max_connections" hint.
2. Inside one pool, you can have task chaining. After completion of
task A (query A) start task B, in case of failure start task C. Task C
is followed by task D.
3. Reliably read task status: running\awaiting\completed\errored\in a
lock. Get a query plan of a task? (I know, there are reasons why it is
not possible now)
4. Query as a future (actually it is implemented already by
pg_background_result)
5. Promised result. Declare that you are waiting for data of a specific
format and execute a query; someone (from another backend) will
eventually place data into the promised result, and your query continues
execution.
6. Cancelation: a way to signal to background query that it's time to
quit gracefully.
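Items 1 and 2 amount to a bounded set of worker slots fed from a FIFO queue. A minimal bookkeeping sketch in plain C, assuming nothing about how pg_background would actually launch workers: `TaskPool`, `POOL_SLOTS`, `QUEUE_CAP` and the three functions are hypothetical names, and a task is represented only by its SQL string.

```c
#include <assert.h>
#include <stddef.h>

#define POOL_SLOTS 2    /* hypothetical cap on concurrently running workers */
#define QUEUE_CAP  8    /* hypothetical bound on tasks waiting for a slot */

typedef struct TaskPool
{
    const char *running[POOL_SLOTS];    /* SQL running in each slot, or NULL */
    const char *waiting[QUEUE_CAP];     /* ring buffer of queued SQL strings */
    int         head;                   /* index of the oldest queued task */
    int         count;                  /* number of queued tasks */
} TaskPool;

void
pool_init(TaskPool *p)
{
    int         i;

    for (i = 0; i < POOL_SLOTS; i++)
        p->running[i] = NULL;
    p->head = 0;
    p->count = 0;
}

/*
 * Submit a task.  Returns the slot index if it started immediately,
 * -1 if it was queued, or -2 if the queue is full.
 */
int
pool_submit(TaskPool *p, const char *sql)
{
    int         i;

    assert(sql != NULL);

    for (i = 0; i < POOL_SLOTS; i++)
    {
        if (p->running[i] == NULL)
        {
            p->running[i] = sql;
            return i;
        }
    }

    if (p->count == QUEUE_CAP)
        return -2;

    p->waiting[(p->head + p->count) % QUEUE_CAP] = sql;
    p->count++;
    return -1;
}

/*
 * Mark the task in the given slot as finished and hand the slot to the
 * oldest queued task, if any.  Returns the SQL now running in the slot,
 * or NULL if the slot is idle.
 */
const char *
pool_finish(TaskPool *p, int slot)
{
    assert(slot >= 0 && slot < POOL_SLOTS);

    p->running[slot] = NULL;
    if (p->count > 0)
    {
        p->running[slot] = p->waiting[p->head];
        p->head = (p->head + 1) % QUEUE_CAP;
        p->count--;
    }
    return p->running[slot];
}
```

With a queue in front of the slots, a caller gets "queued" instead of an error when every worker is busy; chaining (item 2) could be layered on by submitting the follow-up task from wherever `pool_finish()` is called.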
Best regards, Andrey Borodin.
Attachment: pgb_test.diff (text/plain; charset=US-ASCII)
diff --git a/contrib/pg_background/Makefile b/contrib/pg_background/Makefile
index c4e717d..085fbff 100644
--- a/contrib/pg_background/Makefile
+++ b/contrib/pg_background/Makefile
@@ -6,6 +6,8 @@ OBJS = pg_background.o
EXTENSION = pg_background
DATA = pg_background--1.0.sql
+REGRESS = pg_background
+
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/pg_background/expected/pg_background.out b/contrib/pg_background/expected/pg_background.out
new file mode 100644
index 0000000..dbc344e
--- /dev/null
+++ b/contrib/pg_background/expected/pg_background.out
@@ -0,0 +1,26 @@
+CREATE EXTENSION pg_background;
+--run 5 workers which wait about 1 second
+CREATE TABLE input as SELECT x FROM generate_series(1,5,1) x ORDER BY x DESC;
+CREATE TABLE output(place int,value int);
+CREATE sequence s start 1;
+CREATE TABLE handles as SELECT pg_background_launch('select pg_sleep('||x||'); insert into output values (nextval(''s''),'||x||');') h FROM input;
+SELECT (SELECT * FROM pg_background_result(h) as (x text) limit 1) FROM handles;
+ x
+----------
+ SELECT 1
+ SELECT 1
+ SELECT 1
+ SELECT 1
+ SELECT 1
+(5 rows)
+
+SELECT * FROM output ORDER BY place;
+ place | value
+-------+-------
+ 1 | 1
+ 2 | 2
+ 3 | 3
+ 4 | 4
+ 5 | 5
+(5 rows)
+
diff --git a/contrib/pg_background/sql/pg_background.sql b/contrib/pg_background/sql/pg_background.sql
new file mode 100644
index 0000000..d7cbd44
--- /dev/null
+++ b/contrib/pg_background/sql/pg_background.sql
@@ -0,0 +1,9 @@
+CREATE EXTENSION pg_background;
+
+--run 5 workers which wait about 1 second
+CREATE TABLE input as SELECT x FROM generate_series(1,5,1) x ORDER BY x DESC;
+CREATE TABLE output(place int,value int);
+CREATE sequence s start 1;
+CREATE TABLE handles as SELECT pg_background_launch('select pg_sleep('||x||'); insert into output values (nextval(''s''),'||x||');') h FROM input;
+SELECT (SELECT * FROM pg_background_result(h) as (x text) limit 1) FROM handles;
+SELECT * FROM output ORDER BY place;
On 13 December 2016 at 01:17, Andrew Borodin <borodin@octonica.com> wrote:
6. Cancelation: a way to signal to background query that it's time to
quit gracefully.
That at least should be fuss-free. SIGTERM it, and make sure the
worker does CHECK_FOR_INTERRUPTS() in regularly-hit places and loops.
Ensure the worker waits on latches rather than pg_usleep()ing.
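The pattern Craig describes can be shown outside the backend with a flag set from the signal handler and polled between units of work. This is a standalone POSIX C sketch, not PostgreSQL code: `term_pending`, `on_sigterm`, `install_sigterm_handler` and `run_until_interrupted` are hypothetical names, the polling only plays the role that `CHECK_FOR_INTERRUPTS()` plays in a worker, and latch waiting is not modelled.

```c
#include <assert.h>
#include <signal.h>

/* Set by the handler; the handler does nothing that is not signal-safe. */
static volatile sig_atomic_t term_pending = 0;

static void
on_sigterm(int signo)
{
    (void) signo;
    term_pending = 1;
}

/* Install the SIGTERM handler.  Returns 0 on success, -1 on failure. */
int
install_sigterm_handler(void)
{
    return (signal(SIGTERM, on_sigterm) == SIG_ERR) ? -1 : 0;
}

/*
 * Stand-in for a long-running loop.  It performs up to max_steps units of
 * work and checks the flag before each unit.  To simulate an external
 * SIGTERM, the loop raises the signal itself after raise_at units
 * (pass 0 to never raise).  Returns the number of units completed.
 */
int
run_until_interrupted(int max_steps, int raise_at)
{
    int         done = 0;

    assert(max_steps >= 0);

    while (done < max_steps)
    {
        if (term_pending)
            break;              /* graceful exit point */

        done++;                 /* one unit of "work" */

        if (done == raise_at)
            raise(SIGTERM);     /* handler runs before raise() returns */
    }
    return done;
}
```

The loop stops at the first check after the signal arrives, which is why the check has to sit in places that are hit regularly; a sleep that ignores the flag would delay the exit by the full sleep time.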
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Mon, Dec 12, 2016 at 10:47 PM, Andrew Borodin <borodin@octonica.com> wrote:
Hi!
Thanks a lot for your review.
Just in case you'd like to include sleepsort as a test, here it is
wrapped as a regression test (see attachment). But it has serious
downside: it runs no less than 5 seconds.
Also I'll list here every parallelism feature I managed to imagine. It
is not to say that I suggest having some of these features at v1. We
certainly have to start somewhere, this list is just for information
purposes.
1. Poolable workers. Just to be sure you can reliably queue your task
somewhere without having "increase max_connections" hint.
2. Inside one pool, you can have task chaining. After completion of
task A (query A) start task B, in case of failure start task C. Task C
is followed by task D.
I think the background-session code does not deviate much from the
pg_background code. If I understand the background-session patch correctly,
we can manage to reuse it, as Robert suggested. That would let us keep a
session alive in the long run, and we could reuse that session to run
subsequent queries instead of maintaining a separate worker pool. Thoughts?
3. Reliably read task status: running\awaiting\completed\errored\in a
lock. Get a query plan of a task? (I know, there are reasons why it is
not possible now)
+1, Let me check this.
4. Query as a future (actually it is implemented already by
pg_background_result)
5. Promised result. Declare that you are waiting for data of specific
format, execute a query, someone (from another backend) will
eventually place a data to promised result and your query continues
execution.
Could you please elaborate more?
Do you mean two-way communication between the foreground & background processes?
6. Cancelation: a way to signal to background query that it's time to
quit gracefully.
Let me check this too.
Thanks & Regards,
Amul
2016-12-13 12:55 GMT+05:00 amul sul <sulamul@gmail.com>:
I think background-session code is not that much deviated from
pg_background code,
It is not derived from it, though it does not deviate much. The background
sessions code does not have detailed error reporting and code comments, but
it does somewhat similar things.
IIUC background-session patch we can manage to
reuse it, as suggested by Robert. This will allow us to maintain
session in long run, we could reuse this session to run subsequent
queries instead of maintaining separate worker pool. Thoughts?
One API to deal with both features would be much better, sure.
An "object" like a session pool is much easier to implement on top of
a session "object" than on top of worker processes, PIDs etc.
4. Query as a future (actually it is implemented already by
pg_background_result)
5. Promised result. Declare that you are waiting for data of specific
format, execute a query, someone (from another backend) will
eventually place a data to promised result and your query continues
execution.
Could you please elaborate more?
Do you mean two way communication between foreground & background process?
It is from C++11 threading: future, promise and async - these are
related but different ways of acquiring a result across threads.
A future is when a caller Cl starts a thread T (or dequeues a thread from
a pool) and Cl can wait until T finishes and provides a result.
Here, waiting for the result is just like selecting from pg_background_result().
A promise is when you declare a variable and the caller does not know which
thread will put data into it. But any thread reading the
promise will wait until another thread puts data into it.
There are more parallelism patterns there, like async, deferred, lazy,
but futures and promises are, from my point of view, the most used.
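A promise in this sense can be sketched with a mutex and a condition variable. The code below is a generic pthreads illustration, unrelated to shm_mq or to the pg_background sources; `Promise` and the `promise_*` functions are hypothetical names, and the payload is a single int for brevity.

```c
#include <assert.h>
#include <pthread.h>

typedef struct Promise
{
    pthread_mutex_t lock;       /* protects is_set and value */
    pthread_cond_t  ready;      /* signalled once the value is set */
    int             is_set;
    int             value;
} Promise;

void
promise_init(Promise *p)
{
    pthread_mutex_init(&p->lock, NULL);
    pthread_cond_init(&p->ready, NULL);
    p->is_set = 0;
    p->value = 0;
}

/* Fulfil the promise.  Any thread may call this, but only once. */
void
promise_set(Promise *p, int value)
{
    pthread_mutex_lock(&p->lock);
    assert(!p->is_set);
    p->value = value;
    p->is_set = 1;
    pthread_cond_broadcast(&p->ready);  /* wake every waiting reader */
    pthread_mutex_unlock(&p->lock);
}

/* Block until the promise is fulfilled, then return its value. */
int
promise_get(Promise *p)
{
    int         v;

    pthread_mutex_lock(&p->lock);
    while (!p->is_set)
        pthread_cond_wait(&p->ready, &p->lock);
    v = p->value;
    pthread_mutex_unlock(&p->lock);
    return v;
}

/*
 * Non-blocking check.  Returns 1 and stores the value in *out if the
 * promise is fulfilled; returns 0 and leaves *out untouched otherwise.
 */
int
promise_try_get(Promise *p, int *out)
{
    int         ok;

    pthread_mutex_lock(&p->lock);
    ok = p->is_set;
    if (ok)
        *out = p->value;
    pthread_mutex_unlock(&p->lock);
    return ok;
}
```

A reader thread calling `promise_get()` sleeps until some other thread calls `promise_set()`; `promise_try_get()` is the non-blocking variant, the kind of status check that does not risk hanging.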
6. Cancelation: a way to signal to background query that it's time to
quit gracefully.
Let me check this too.
I think Craig is right: any background query must be ready to be shut
down. That's what databases are about, you can safely pull the plug at
any moment.
I've remembered one more parallelism pattern: critical region. It's
when you ask the system "plz no TERM now, and, if you can, postpone
the vacuums, OOMKs and that kind of stuff" from user code. But I do
not think it's usable pattern here.
Thank you for your work.
Best regards, Andrey Borodin.
On Tue, Dec 13, 2016 at 2:05 PM, Andrew Borodin <borodin@octonica.com> wrote:
2016-12-13 12:55 GMT+05:00 amul sul <sulamul@gmail.com>:
I think background-session code is not that much deviated from
pg_background code,
It is not derived, though it is not much deviated. background sessions
code do not have detailed exceptions and code comments, but it is
doing somewhat similar things.
IIUC background-session patch we can manage to
reuse it, as suggested by Robert. This will allow us to maintain
session in long run, we could reuse this session to run subsequent
queries instead of maintaining separate worker pool. Thoughts?
One API to deal with both features would be much better, sure.
"Object" like sessions pool are much easier to implement on top of
session "object" than on top of worker process, PIDs etc.
4. Query as a future (actually it is implemented already by
pg_background_result)
5. Promised result. Declare that you are waiting for data of specific
format, execute a query, someone (from another backend) will
eventually place a data to promised result and your query continues
execution.
Could you please elaborate more?
Do you mean two way communication between foreground & background process?
It is from C++11 threading: future, promise and async - these are
related but different kinds of acquiring result between threads.
Future - is when caller Cl start thread T(or dequeue thread from
pool) and Cl can wait until T finishes and provides result.
Here waiting the result is just like selecting from pg_background_result().
Promise - is when you declare a variable and caller do not know which
thread will put the data to this variable. But any thread reading
promise will wait until other thread put a data to promise.
There are more parallelism patterns there, like async, deferred, lazy,
but futures and promises from my point of view are most used.
Nice, thanks for the detailed explanation.
We can use the shm_mq infrastructure to share any kind of message between
two processes, but we might end up overextending what pg_background was
originally meant for: the user backend launches workers and gives them an
initial set of instructions, and then results stream back from the workers
to the user backend.
6. Cancelation: a way to signal to background query that it's time to
quit gracefully.
Let me check this too.
I think Craig is right: any background query must be ready to be shut
down. That's what databases are about, you can safely pull the plug at
any moment.
SIGTERM is handled in the current pg_background patch; the user can terminate
backend execution using pg_cancel_backend() or pg_terminate_backend(),
as shown below:
postgres=> select pg_background_launch('insert into foo
values(generate_series(1,100000000))');
pg_background_launch
----------------------
67069
(1 row)
postgres=> select pg_terminate_backend(67069);
pg_terminate_backend
----------------------
t
(1 row)
postgres=> select * from pg_background_result(67069) as (x text);
ERROR: terminating connection due to administrator command
CONTEXT: background worker, pid 67069
postgres=>
Thanks & Regards,
Amul
On 13 Dec. 2016 20:54, "amul sul" <sulamul@gmail.com> wrote:
postgres=> select * from pg_background_result(67069) as (x text);
ERROR: terminating connection due to administrator command
CONTEXT: background worker, pid 67069
postgres=>
It'll also want to handle cancellation due to conflict with recovery if you
intend it to be used on a standby, probably by making use
of procsignal_sigusr1_handler. The rest of the work is done by
CHECK_FOR_INTERRUPTS().
This only matters if it's meant to work on standbys of course. I haven't
checked if you write to catalogs or otherwise do non-standby-friendly
things.
On Mon, Dec 12, 2016 at 10:17:24PM +0500, Andrew Borodin wrote:
Hi!
Just in case you'd like to include sleepsort as a test, here it is
wrapped as a regression test(see attachment). But it has serious
downside: it runs no less than 5 seconds.
Couldn't it sleep in increments smaller than a second? Like maybe
milliseconds? Also, it's probably cleaner (or at least more
comprehensible) to write something using format() and dollar quoting
than the line with the double 's.
Best,
David.
--
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778 AIM: dfetter666 Yahoo!: dfetter
Skype: davidfetter XMPP: david(dot)fetter(at)gmail(dot)com
Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
2016-12-19 4:21 GMT+05:00 David Fetter <david@fetter.org>:
Couldn't it sleep in increments smaller than a second? Like maybe
milliseconds? Also, it's probably cleaner (or at least more
comprehensible) to write something using format() and dollar quoting
than the line with the double 's.
Right. Here's a version which waits for half a second. I do not see how
to pass doubles via dollar quoting; pg_background_launch() gets a string
as a parameter, it's not EXECUTE USING.
Best regards, Andrey Borodin.
Attachment: pgb_sleepsort.diff (text/plain; charset=US-ASCII)
diff --git a/contrib/pg_background/Makefile b/contrib/pg_background/Makefile
index c4e717d..085fbff 100644
--- a/contrib/pg_background/Makefile
+++ b/contrib/pg_background/Makefile
@@ -6,6 +6,8 @@ OBJS = pg_background.o
EXTENSION = pg_background
DATA = pg_background--1.0.sql
+REGRESS = pg_background
+
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/pg_background/expected/pg_background.out b/contrib/pg_background/expected/pg_background.out
new file mode 100644
index 0000000..a6613ce
--- /dev/null
+++ b/contrib/pg_background/expected/pg_background.out
@@ -0,0 +1,31 @@
+CREATE EXTENSION pg_background;
+--run 6 workers which wait .0, .1, .2, .3, .4, .5 seconds respectively
+CREATE TABLE input as SELECT x FROM generate_series(0,.5,0.1) x ORDER BY x DESC;
+CREATE TABLE output(place int,value float);
+--sequence for indication of who's finished on which place
+CREATE sequence s start 1;
+CREATE TABLE handles as SELECT pg_background_launch(format('select pg_sleep(%s); insert into output values (nextval(''s''),%s);',x,x)) h FROM input;
+--wait until everyone finishes
+SELECT (SELECT * FROM pg_background_result(h) as (x text) limit 1) FROM handles;
+ x
+----------
+ SELECT 1
+ SELECT 1
+ SELECT 1
+ SELECT 1
+ SELECT 1
+ SELECT 1
+(6 rows)
+
+--output results
+SELECT * FROM output ORDER BY place;
+ place | value
+-------+-------
+ 1 | 0
+ 2 | 0.1
+ 3 | 0.2
+ 4 | 0.3
+ 5 | 0.4
+ 6 | 0.5
+(6 rows)
+
diff --git a/contrib/pg_background/sql/pg_background.sql b/contrib/pg_background/sql/pg_background.sql
new file mode 100644
index 0000000..770f0b8
--- /dev/null
+++ b/contrib/pg_background/sql/pg_background.sql
@@ -0,0 +1,12 @@
+CREATE EXTENSION pg_background;
+
+--run 6 workers which wait .0, .1, .2, .3, .4, .5 seconds respectively
+CREATE TABLE input as SELECT x FROM generate_series(0,.5,0.1) x ORDER BY x DESC;
+CREATE TABLE output(place int,value float);
+--sequence for indication of who's finished on which place
+CREATE sequence s start 1;
+CREATE TABLE handles as SELECT pg_background_launch(format('select pg_sleep(%s); insert into output values (nextval(''s''),%s);',x,x)) h FROM input;
+--wait until everyone finishes
+SELECT (SELECT * FROM pg_background_result(h) as (x text) limit 1) FROM handles;
+--output results
+SELECT * FROM output ORDER BY place;
On Thu, Nov 24, 2016 at 09:16:53AM +0530, amul sul wrote:
Hi All,
I would like to take over pg_background patch and repost for
discussion and review.
This looks great.
Sadly, it now breaks initdb when applied atop
dd728826c538f000220af98de025c00114ad0631 with:
performing post-bootstrap initialization ... TRAP: FailedAssertion("!(((((const Node*)(rinfo))->type) == T_RestrictInfo))", File: "costsize.c", Line: 660)
sh: line 1: 2840 Aborted (core dumped) "/home/shackle/pggit/postgresql/tmp_install/home/shackle/10/bin/postgres" --single -F -O -j -c search_path=pg_catalog -c exit_on_error=true template1 > /dev/null
Best,
David.
--
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778 AIM: dfetter666 Yahoo!: dfetter
Skype: davidfetter XMPP: david(dot)fetter(at)gmail(dot)com
Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
On Tue, Dec 20, 2016 at 12:21 AM, David Fetter <david@fetter.org> wrote:
On Thu, Nov 24, 2016 at 09:16:53AM +0530, amul sul wrote:
Hi All,
I would like to take over pg_background patch and repost for
discussion and review.
This looks great.
Sadly, it now breaks initdb when applied atop
dd728826c538f000220af98de025c00114ad0631 with:
performing post-bootstrap initialization ... TRAP: FailedAssertion("!(((((const Node*)(rinfo))->type) == T_RestrictInfo))", File: "costsize.c", Line: 660)
sh: line 1: 2840 Aborted (core dumped) "/home/shackle/pggit/postgresql/tmp_install/home/shackle/10/bin/postgres" --single -F -O -j -c search_path=pg_catalog -c exit_on_error=true template1 > /dev/null
I've compiled the binary with the --enable-cassert flag and applied the pg_background
patch on top of the described commit as well as on latest HEAD, but I
am not able to reproduce this crash; see this:
[VM postgresql]$ /home/amul/Public/pg_inst/pg-master/bin/postgres
--single -F -O -j -c search_path=pg_catalog -c exit_on_error=true
template1
PostgreSQL stand-alone backend 10devel
backend> CREATE EXTENSION pg_background;
backend> select * from pg_extension;
1: extname (typeid = 19, len = 64, typmod = -1, byval = f)
2: extowner (typeid = 26, len = 4, typmod = -1, byval = t)
3: extnamespace (typeid = 26, len = 4, typmod = -1, byval = t)
4: extrelocatable (typeid = 16, len = 1, typmod = -1, byval = t)
5: extversion (typeid = 25, len = -1, typmod = -1, byval = f)
6: extconfig (typeid = 1028, len = -1, typmod = -1, byval = f)
7: extcondition (typeid = 1009, len = -1, typmod = -1, byval = f)
----
1: extname = "plpgsql" (typeid = 19, len = 64, typmod = -1, byval = f)
2: extowner = "10" (typeid = 26, len = 4, typmod = -1, byval = t)
3: extnamespace = "11" (typeid = 26, len = 4, typmod = -1, byval = t)
4: extrelocatable = "f" (typeid = 16, len = 1, typmod = -1, byval = t)
5: extversion = "1.0" (typeid = 25, len = -1, typmod = -1, byval = f)
----
1: extname = "pg_background" (typeid = 19, len = 64, typmod = -1, byval = f)
2: extowner = "10" (typeid = 26, len = 4, typmod = -1, byval = t)
3: extnamespace = "11" (typeid = 26, len = 4, typmod = -1, byval = t)
4: extrelocatable = "t" (typeid = 16, len = 1, typmod = -1, byval = t)
5: extversion = "1.0" (typeid = 25, len = -1, typmod = -1, byval = f)
----
backend>
I hope I am not missing anything. Thanks!
Regards,
Amul
On Tue, Dec 20, 2016 at 11:11:36AM +0530, amul sul wrote:
On Tue, Dec 20, 2016 at 12:21 AM, David Fetter <david@fetter.org> wrote:
On Thu, Nov 24, 2016 at 09:16:53AM +0530, amul sul wrote:
Hi All,
I would like to take over pg_background patch and repost for
discussion and review.
This looks great.
Sadly, it now breaks initdb when applied atop
dd728826c538f000220af98de025c00114ad0631 with:
performing post-bootstrap initialization ... TRAP: FailedAssertion("!(((((const Node*)(rinfo))->type) == T_RestrictInfo))", File: "costsize.c", Line: 660)
sh: line 1: 2840 Aborted (core dumped) "/home/shackle/pggit/postgresql/tmp_install/home/shackle/10/bin/postgres" --single -F -O -j -c search_path=pg_catalog -c exit_on_error=true template1 > /dev/null
I've compiled the binary with the --enable-cassert flag and applied the pg_background
patch on top of the described commit as well as on latest HEAD, but I
am not able to reproduce this crash; see this:
[VM postgresql]$ /home/amul/Public/pg_inst/pg-master/bin/postgres
--single -F -O -j -c search_path=pg_catalog -c exit_on_error=true
template1
I haven't managed to reproduce it either. Sorry about the noise.
Best,
David.
--
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778 AIM: dfetter666 Yahoo!: dfetter
Skype: davidfetter XMPP: david(dot)fetter(at)gmail(dot)com
Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
On Mon, Dec 19, 2016 at 09:30:32PM +0500, Andrew Borodin wrote:
2016-12-19 4:21 GMT+05:00 David Fetter <david@fetter.org>:
Couldn't it sleep in increments smaller than a second? Like maybe
milliseconds? Also, it's probably cleaner (or at least more
comprehensible) to write something using format() and dollar quoting
than the line with the double 's.
Right. Here's a version which waits for half a second. I do not see how
to pass doubles via dollar sign; pg_background_launch() gets a string
as a parameter, it's not EXECUTE USING.
I see.
I find the following a little easier to follow, and the sleeps still
work even when very short.
Best,
David.
CREATE TABLE input AS
SELECT x, row_number() OVER (ORDER BY x) n
FROM
generate_series(0,.000005,0.000001) x
ORDER BY x DESC;
CREATE TABLE output(place int,value float);
CREATE TABLE handles AS
SELECT pg_background_launch(
format($$
SELECT pg_sleep(%s);
INSERT INTO output VALUES (%s, %s)
$$,
x, n, x
)
) h
FROM input;
--wait until everyone finishes
SELECT * FROM handles JOIN LATERAL pg_background_result(handles.h) AS (x TEXT) ON (true);
--output results
SELECT * FROM output ORDER BY place;
--
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778 AIM: dfetter666 Yahoo!: dfetter
Skype: davidfetter XMPP: david(dot)fetter(at)gmail(dot)com
Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
On 9 December 2016 at 13:00, Robert Haas <robertmhaas@gmail.com> wrote:
That might be good, because then we wouldn't have to maintain two
copies of the code.
So why are there two things at all? Why is this being worked on as
well as Peter's patch? What will that give us?
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Tue, Dec 20, 2016 at 7:32 PM, Simon Riggs <simon@2ndquadrant.com> wrote:
On 9 December 2016 at 13:00, Robert Haas <robertmhaas@gmail.com> wrote:
That might be good, because then we wouldn't have to maintain two
copies of the code.
So why are there two things at all? Why is this being worked on as
well as Peter's patch? What will that give us?
A feature that can be accessed without writing C code.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
2016-12-21 4:55 GMT+05:00 David Fetter <david@fetter.org>:
I see.
I find the following a little easier to follow, and the sleeps still
work even when very short.
Now I know a little more SQL :) Thank you.
I'm not sure every platform supports microsecond sleeps, but it will
work anyway, because this test is deterministic.
Without the sequence there is no race condition: each place is computed up
front by row_number() rather than by completion order. So it is not sleepsort;
it is deterministic. Though I agree that this is a good thing for a test, I'd
still add some milliseconds to the test case, so that the main query for sure
has to wait for the end of another sleeping query.
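The determinism argument can be illustrated outside the database. The following is only a sketch in plain C, not part of the patch or the test script: output_row, assign_places and order_by_place are hypothetical names that mimic the output(place, value) table, the row_number() ranking, and the final ORDER BY place.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical row type mirroring the output(place, value) table. */
typedef struct output_row
{
    int    place;   /* precomputed rank, like row_number() OVER (ORDER BY x) */
    double value;   /* sleep time, like x */
} output_row;

/* qsort comparator: ascending by value. */
static int
cmp_value(const void *a, const void *b)
{
    double va = ((const output_row *) a)->value;
    double vb = ((const output_row *) b)->value;

    return (va > vb) - (va < vb);
}

/* qsort comparator: ascending by place. */
static int
cmp_place(const void *a, const void *b)
{
    int pa = ((const output_row *) a)->place;
    int pb = ((const output_row *) b)->place;

    return (pa > pb) - (pa < pb);
}

/* Assign places 1..n by ascending value, before any "worker" would run. */
void
assign_places(output_row *rows, size_t n)
{
    size_t i;

    qsort(rows, n, sizeof(output_row), cmp_value);
    for (i = 0; i < n; i++)
        rows[i].place = (int) i + 1;
}

/* Read the rows back the way SELECT * FROM output ORDER BY place would. */
void
order_by_place(output_row *rows, size_t n)
{
    qsort(rows, n, sizeof(output_row), cmp_place);
}
```

Whatever order the rows land in the array after assign_places (that is, whatever order the workers happen to finish in), order_by_place yields the same sequence, because the place was fixed before any sleep started.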
CREATE TABLE input AS
SELECT x, row_number() OVER (ORDER BY x) n
FROM
generate_series(0,.000005,0.000001) x
ORDER BY x DESC;
CREATE TABLE output(place int,value float);
CREATE TABLE handles AS
SELECT pg_background_launch(
format($$
SELECT pg_sleep(%s);
INSERT INTO output VALUES (%s, %s)
$$,
x, n, x
)
) h
FROM input;
--wait until everyone finishes
SELECT * FROM handles JOIN LATERAL pg_background_result(handles.h) AS (x TEXT) ON (true);
--output results
SELECT * FROM output ORDER BY place;
Best regards, Andrey Borodin.
On 21 December 2016 at 14:26, Andrew Borodin <borodin@octonica.com> wrote:
I'm not sure every platform supports microsecond sleeps
Windows at least doesn't by default, unless that changed in Win2k12
and Win8 with the same platform/kernel improvements that delivered
https://msdn.microsoft.com/en-us/library/hh706895(v=vs.85).aspx . I'm
not sure. On older systems sleeps are 1ms to 15ms.
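One way to see what a given platform actually delivers is to time a short sleep. The sketch below is a minimal, POSIX-only illustration (it will not build on Windows as written); measure_sleep_us is a hypothetical helper name, not something from PostgreSQL or this patch.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L     /* for clock_gettime() and nanosleep() */
#endif

#include <errno.h>
#include <time.h>

/*
 * Hypothetical helper: request a sleep of requested_us microseconds and
 * return how many microseconds actually elapsed on the monotonic clock.
 * Returns -1 if the clock cannot be read or the sleep fails.
 */
long
measure_sleep_us(long requested_us)
{
    struct timespec start;
    struct timespec end;
    struct timespec req;
    struct timespec rem;
    long long       elapsed_ns;

    req.tv_sec = requested_us / 1000000L;
    req.tv_nsec = (requested_us % 1000000L) * 1000L;

    if (clock_gettime(CLOCK_MONOTONIC, &start) != 0)
        return -1;

    /* nanosleep() may be interrupted by a signal; resume with the remainder. */
    while (nanosleep(&req, &rem) != 0)
    {
        if (errno != EINTR)
            return -1;
        req = rem;
    }

    if (clock_gettime(CLOCK_MONOTONIC, &end) != 0)
        return -1;

    elapsed_ns = (long long) (end.tv_sec - start.tv_sec) * 1000000000LL
        + (long long) (end.tv_nsec - start.tv_nsec);

    return (long) (elapsed_ns / 1000);
}
```

Calling measure_sleep_us(1) and comparing the return value with 1 gives a rough idea of the effective granularity on that machine; the result depends on the kernel and load, so it is a measurement aid rather than a guarantee.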
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Wed, Dec 21, 2016 at 06:31:52PM +0800, Craig Ringer wrote:
On 21 December 2016 at 14:26, Andrew Borodin <borodin@octonica.com> wrote:
I'm not sure every platform supports microsecond sleeps
Windows at least doesn't by default, unless that changed in Win2k12
and Win8 with the same platform/kernel improvements that delivered
https://msdn.microsoft.com/en-us/library/hh706895(v=vs.85).aspx . I'm
not sure. On older systems sleeps are 1ms to 15ms.
Apparently, as of 2011, there were ways to do this. It's not crystal
clear to me just how reliable they are.
http://stackoverflow.com/questions/9116618/cpp-windows-is-there-a-sleep-function-in-microseconds
Best,
David.
--
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778 AIM: dfetter666 Yahoo!: dfetter
Skype: davidfetter XMPP: david(dot)fetter(at)gmail(dot)com
Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
On Wed, Dec 21, 2016 at 10:29 AM, David Fetter <david@fetter.org> wrote:
On Wed, Dec 21, 2016 at 06:31:52PM +0800, Craig Ringer wrote:
On 21 December 2016 at 14:26, Andrew Borodin <borodin@octonica.com> wrote:
I'm not sure every platform supports microsecond sleeps
Windows at least doesn't by default, unless that changed in Win2k12
and Win8 with the same platform/kernel improvements that delivered
https://msdn.microsoft.com/en-us/library/hh706895(v=vs.85).aspx . I'm
not sure. On older systems sleeps are 1ms to 15ms.
Apparently, as of 2011, there were ways to do this. It's not crystal
clear to me just how reliable they are.
http://stackoverflow.com/questions/9116618/cpp-windows-is-there-a-sleep-function-in-microseconds
This whole subthread seems like a distraction to me. I find it hard
to believe that this test case would be stable enough to survive the
buildfarm where, don't forget, we have things like
CLOBBER_CACHE_ALWAYS machines where queries take 100x longer to run.
But even if it is, surely we can pick a less contrived test case. So
why worry about this?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Wed, Dec 21, 2016 at 10:42:18AM -0500, Robert Haas wrote:
On Wed, Dec 21, 2016 at 10:29 AM, David Fetter <david@fetter.org> wrote:
On Wed, Dec 21, 2016 at 06:31:52PM +0800, Craig Ringer wrote:
On 21 December 2016 at 14:26, Andrew Borodin <borodin@octonica.com> wrote:
I'm not sure every platform supports microsecond sleeps
Windows at least doesn't by default, unless that changed in Win2k12
and Win8 with the same platform/kernel improvements that delivered
https://msdn.microsoft.com/en-us/library/hh706895(v=vs.85).aspx . I'm
not sure. On older systems sleeps are 1ms to 15ms.
Apparently, as of 2011, there were ways to do this. It's not crystal
clear to me just how reliable they are.
http://stackoverflow.com/questions/9116618/cpp-windows-is-there-a-sleep-function-in-microseconds
This whole subthread seems like a distraction to me. I find it hard
to believe that this test case would be stable enough to survive the
buildfarm where, don't forget, we have things like
CLOBBER_CACHE_ALWAYS machines where queries take 100x longer to run.
But even if it is, surely we can pick a less contrived test case.
So why worry about this?
I wasn't super worried about the actual sleep times, but I was having
trouble puzzling out what the test was actually doing, so I rewrote it
with what I thought of as more clarity.
Best,
David.
--
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778 AIM: dfetter666 Yahoo!: dfetter
Skype: davidfetter XMPP: david(dot)fetter(at)gmail(dot)com
Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
2016-12-21 20:42 GMT+05:00 Robert Haas <robertmhaas@gmail.com>:
This whole subthread seems like a distraction to me. I find it hard
to believe that this test case would be stable enough to survive the
buildfarm where, don't forget, we have things like
CLOBBER_CACHE_ALWAYS machines where queries take 100x longer to run.
But even if it is, surely we can pick a less contrived test case. So
why worry about this?
David Fetter's test is deterministic and shall pass no matter how slow
and unpredictable performance is on a server.
Best regards, Andrey Borodin.
Hi all,
As we have discussed previously, we need to rework this patch as a client of
Peter Eisentraut's background sessions code[1].
Attaching a trial version of the patch to discuss a possible design and API.
We could have the following APIs:
• pg_background_launch : This function starts and stores a new background
session, and returns the process id of the background worker.
• pg_background_run : This API takes the process id and an SQL command as
input parameters. Using this process id, the stored worker's session is
retrieved and the given SQL command is executed under it.
• pg_background_result : This API takes the process id as an input
parameter and returns the result of the command executed through the
background worker session. Same as it was before, but now results can
be fetched in LIFO order, i.e. the result of the last query executed using
pg_background_run will be fetched first.
• pg_background_detach : This API takes the process id and detaches the
background process. The stored worker's session is not dropped until this
is called.
• TBC : API to discard the result of the last query, or discard altogether?
• TBC : How about having one more API to see all existing sessions?
Kindly share your thoughts/suggestions. Note that the attached patch is a WIP
version; code, comments & behaviour could be vague.
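To make the intended call sequence concrete, here is a small stand-alone C model of the bookkeeping only. It is a sketch under assumptions, not the patch code: no worker is started, the sketch_* functions and worker_slot are hypothetical names, and the model tracks nothing but how many results are still unread for a given process id (it says nothing about the order in which results come back).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a per-worker entry; not the patch's struct. */
typedef struct worker_slot
{
    int      pid;       /* 0 means the slot is free */
    uint32_t pending;   /* commands run whose results are not yet fetched */
} worker_slot;

#define MAX_SLOTS 8
static worker_slot slots[MAX_SLOTS];

/* Look up the slot for a pid; NULL if this "session" does not know it. */
static worker_slot *
find_slot(int pid)
{
    size_t i;

    if (pid == 0)
        return NULL;
    for (i = 0; i < MAX_SLOTS; i++)
        if (slots[i].pid == pid)
            return &slots[i];
    return NULL;
}

/* Model of "launch": remember a new session with no pending results. */
bool
sketch_launch(int pid)
{
    size_t i;

    if (pid == 0 || find_slot(pid) != NULL)
        return false;
    for (i = 0; i < MAX_SLOTS; i++)
    {
        if (slots[i].pid == 0)
        {
            slots[i].pid = pid;
            slots[i].pending = 0;
            return true;
        }
    }
    return false;               /* table full */
}

/* Model of "run": one more result is now waiting to be read. */
bool
sketch_run(int pid)
{
    worker_slot *slot = find_slot(pid);

    if (slot == NULL)
        return false;           /* PID is not attached to this session */
    slot->pending++;
    return true;
}

/* Model of "result": consume one pending result, if there is one. */
bool
sketch_result(int pid)
{
    worker_slot *slot = find_slot(pid);

    if (slot == NULL || slot->pending == 0)
        return false;           /* unknown PID, or results already consumed */
    slot->pending--;
    return true;
}

/* Model of "detach": forget the session entirely. */
bool
sketch_detach(int pid)
{
    worker_slot *slot = find_slot(pid);

    if (slot == NULL)
        return false;
    slot->pid = 0;
    slot->pending = 0;
    return true;
}
```

In this model, calling sketch_result before any sketch_run fails, each sketch_run allows exactly one later sketch_result, and after sketch_detach the process id is unknown again, which is the behaviour the list above describes for the SQL-level functions.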
------------------
Quick demo:
------------------
Apply the attached patch on top of Peter Eisentraut's
0001-Add-background-sessions.patch[1].
postgres=# select pg_background_launch();
pg_background_launch
----------------------
21004
(1 row)
postgres=# select pg_background_run(21004, 'vacuum verbose foo');
pg_background_run
-------------------
(1 row)
postgres=# select * from pg_background_result(21004) as (x text);
INFO: vacuuming "public.foo"
INFO: "foo": found 0 removable, 5 nonremovable row versions in 1 out of 1 pages
DETAIL: 0 dead row versions cannot be removed yet.
There were 0 unused item pointers.
Skipped 0 pages due to buffer pins.
0 pages are entirely empty.
CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s.
x
--------
VACUUM
(1 row)
postgres=# select pg_background_run(21004, 'select * from foo');
pg_background_run
-------------------
(1 row)
postgres=# select * from pg_background_result(21004) as (x int);
x
---
1
2
3
4
5
(5 rows)
postgres=# select pg_background_detach(21004);
pg_background_detach
----------------------
(1 row)
References :
[1]: /messages/by-id/e1c2d331-ee6a-432d-e9f5-dcf85cffaf29@2ndquadrant.com.
Regards,
Amul Sul
Attachments:
0002-pg_background_worker_as_client_of_bgsession_trial.patch (application/octet-stream)
diff --git a/contrib/Makefile b/contrib/Makefile
index 25263c0..04ec28a 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -29,6 +29,7 @@ SUBDIRS = \
oid2name \
pageinspect \
passwordcheck \
+ pg_background \
pg_buffercache \
pg_freespacemap \
pg_prewarm \
diff --git a/contrib/pg_background/Makefile b/contrib/pg_background/Makefile
new file mode 100644
index 0000000..c4e717d
--- /dev/null
+++ b/contrib/pg_background/Makefile
@@ -0,0 +1,18 @@
+# contrib/pg_background/Makefile
+
+MODULE_big = pg_background
+OBJS = pg_background.o
+
+EXTENSION = pg_background
+DATA = pg_background--1.0.sql
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_background
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_background/pg_background--1.0.sql b/contrib/pg_background/pg_background--1.0.sql
new file mode 100644
index 0000000..bc8a881
--- /dev/null
+++ b/contrib/pg_background/pg_background--1.0.sql
@@ -0,0 +1,30 @@
+/* contrib/pg_background/pg_background--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_background" to load this file. \quit
+
+CREATE FUNCTION pg_background_launch()
+ RETURNS pg_catalog.int4 STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_run(pid pg_catalog.int4, sql pg_catalog.text)
+ RETURNS pg_catalog.void STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_result(pid pg_catalog.int4)
+ RETURNS SETOF pg_catalog.record STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_detach(pid pg_catalog.int4)
+ RETURNS pg_catalog.void STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+REVOKE ALL ON FUNCTION pg_background_launch()
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_run(pid pg_catalog.int4,
+ sql pg_catalog.text)
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_result(pg_catalog.int4)
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_detach(pg_catalog.int4)
+ FROM public;
diff --git a/contrib/pg_background/pg_background.c b/contrib/pg_background/pg_background.c
new file mode 100644
index 0000000..8d26b38
--- /dev/null
+++ b/contrib/pg_background/pg_background.c
@@ -0,0 +1,316 @@
+/*--------------------------------------------------------------------------
+ *
+ * pg_background.c
+ * Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/pg_background/pg_background.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "catalog/pg_type.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "tcop/bgsession.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+
+/* Private state maintained by the launching backend for IPC. */
+typedef struct pg_background_worker_info
+{
+ pid_t pid;
+ Oid current_user_id;
+ BackgroundSession *session;
+ uint32 result_count; /* TODO: uint32? */
+} pg_background_worker_info;
+
+/* Private state maintained across calls to pg_background_result. */
+typedef struct pg_background_result_state
+{
+ pg_background_worker_info *info;
+ BackgroundSessionResult *result;
+} pg_background_result_state;
+
+static HTAB *worker_hash;
+
+static void remove_worker_info(pid_t pid);
+static pg_background_worker_info *find_worker_info(pid_t pid);
+static void save_worker_info(pid_t pid, BackgroundSession *session);
+static void check_rights(pg_background_worker_info *info);
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(pg_background_launch);
+PG_FUNCTION_INFO_V1(pg_background_run);
+PG_FUNCTION_INFO_V1(pg_background_result);
+PG_FUNCTION_INFO_V1(pg_background_detach);
+
+/*
+ * Start a dynamic background worker.
+ */
+Datum
+pg_background_launch(PG_FUNCTION_ARGS)
+{
+ BackgroundSession *session;
+ pid_t pid;
+
+ session = BackgroundSessionNew(&pid);
+
+ /* Save worker info */
+ save_worker_info(pid, session);
+
+ /* Return the worker's PID. */
+ PG_RETURN_INT32(pid);
+}
+
+/*
+ * Run a user-specified SQL command.
+ */
+Datum
+pg_background_run(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ char *sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ pg_background_worker_info *info;
+
+ /* See if we have a connection to the specified PID. */
+ if ((info = find_worker_info(pid)) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+ check_rights(info);
+
+ /* Execute the given SQL query */
+ BackgroundSessionExecuteSQL(info->session, sql);
+ info->result_count++;
+
+ PG_RETURN_VOID();
+}
+
+
+/*
+ * Retrieve the results of a background query previously launched in this
+ * session.
+ */
+Datum
+pg_background_result(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ FuncCallContext *funcctx;
+ pg_background_result_state *state;
+ TupleDesc tupdesc;
+ BackgroundSessionResult *result;
+
+ /* First-time setup. */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+ pg_background_worker_info *info;
+
+ funcctx = SRF_FIRSTCALL_INIT();
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* See if we have a connection to the specified PID. */
+ if ((info = find_worker_info(pid)) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+ check_rights(info);
+
+ /* Can't read results twice. */
+ if (info->result_count <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("results for PID %d have already been consumed", pid)));
+ info->result_count--;
+
+ /* Set up tuple-descriptor based on column definition list. */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record"),
+ errhint("Try calling the function in the FROM clause "
+ "using a column definition list.")));
+ result = BackgroundSessionFetchResult(info->session);
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ if (result->tupdesc && tupdesc->natts != result->tupdesc->natts)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match the specified FROM clause rowtype")));
+
+ /* Cache state that will be needed on every call. */
+ state = palloc0(sizeof(pg_background_result_state));
+ state->info = info;
+ state->result = result;
+
+ funcctx->user_fctx = state;
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ funcctx = SRF_PERCALL_SETUP();
+ tupdesc = funcctx->tuple_desc;
+ state = funcctx->user_fctx;
+ result = state->result;
+
+ if (result->tupdesc)
+ {
+ if (result->tuples != NIL)
+ {
+ HeapTuple tuple = (HeapTuple) linitial(result->tuples);
+ Datum *values = (Datum *) palloc(tupdesc->natts * sizeof(Datum));
+ bool *isnull = (bool *) palloc(tupdesc->natts * sizeof(bool));
+
+ /*
+ * Perform conversion of a tuple according to the column
+ * definition list given in the FROM clause
+ */
+ heap_deform_tuple(tuple, result->tupdesc, values, isnull);
+ tuple = heap_form_tuple(tupdesc, values, isnull);
+ result->tuples = list_delete_first(result->tuples);
+
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ }
+ }
+ else /* If no data rows, return the command tags instead. */
+ {
+ if (tupdesc->natts != 1 || tupdesc->attrs[0]->atttypid != TEXTOID)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query did not return a result set, but result rowtype is not a single text column")));
+
+ if (result->command != NULL)
+ {
+ bool isnull = false;
+ Datum value = PointerGetDatum(cstring_to_text(result->command));
+ HeapTuple tuple = heap_form_tuple(tupdesc, &value, &isnull);
+
+ result->command = NULL;
+
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ }
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Detach from the dynamic shared memory segment used for communication with
+ * a background worker. This prevents the worker from stalling waiting for
+ * us to read its results.
+ */
+Datum
+pg_background_detach(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ pg_background_worker_info *info;
+
+ info = find_worker_info(pid);
+ if (info == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+
+ check_rights(info);
+ remove_worker_info(pid);
+ BackgroundSessionEnd(info->session);
+
+ PG_RETURN_VOID();
+}
+
+static void
+remove_worker_info(pid_t pid)
+{
+ bool found;
+
+ /* Remove the hashtable entry. */
+ hash_search(worker_hash, (void *) &pid, HASH_REMOVE, &found);
+ if (!found)
+ elog(ERROR, "pg_background worker_hash table corrupted");
+}
+
+/*
+ * Find the background worker information for the worker with a given PID.
+ */
+static pg_background_worker_info *
+find_worker_info(pid_t pid)
+{
+ pg_background_worker_info *info = NULL;
+
+ if (worker_hash != NULL)
+ info = hash_search(worker_hash, (void *) &pid, HASH_FIND, NULL);
+
+ return info;
+}
+
+/*
+ * Save worker info.
+ */
+static void
+save_worker_info(pid_t pid, BackgroundSession *session)
+{
+ pg_background_worker_info *info;
+ Oid current_user_id;
+ int sec_context;
+
+ /* If the hash table hasn't been set up yet, do that now. */
+ if (worker_hash == NULL)
+ {
+ HASHCTL ctl;
+
+ ctl.keysize = sizeof(pid_t);
+ ctl.entrysize = sizeof(pg_background_worker_info);
+ worker_hash = hash_create("pg_background worker_hash", 8, &ctl,
+ HASH_ELEM);
+ }
+
+ /* Get current authentication information. */
+ GetUserIdAndSecContext(&current_user_id, &sec_context);
+
+ /*
+ * In the unlikely event that there's an older worker with this PID,
+ * just detach it - unless it has a different user ID than the
+ * currently-active one, in which case someone might be trying to pull
+ * a fast one. Let's kill the backend to make sure we don't break
+ * anyone's expectations.
+ */
+ if ((info = find_worker_info(pid)) != NULL)
+ {
+ if (current_user_id != info->current_user_id)
+ ereport(FATAL,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("background worker with PID \"%d\" already exists",
+ pid)));
+ }
+
+ /* Create a new entry for this worker. */
+ info = hash_search(worker_hash, (void *) &pid, HASH_ENTER, NULL);
+ info->session = session;
+ info->result_count = 0;
+ info->current_user_id = current_user_id;
+}
+
+/*
+ * Check whether the current user has rights to manipulate the background
+ * worker with the given PID.
+ */
+static void
+check_rights(pg_background_worker_info *info)
+{
+ Oid current_user_id;
+ int sec_context;
+
+ GetUserIdAndSecContext(&current_user_id, &sec_context);
+ if (!has_privs_of_role(current_user_id, info->current_user_id))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied for background worker with PID \"%d\"",
+ info->pid)));
+}
diff --git a/contrib/pg_background/pg_background.control b/contrib/pg_background/pg_background.control
new file mode 100644
index 0000000..733d0e1
--- /dev/null
+++ b/contrib/pg_background/pg_background.control
@@ -0,0 +1,4 @@
+comment = 'Run SQL queries in the background'
+default_version = '1.0'
+module_pathname = '$libdir/pg_background'
+relocatable = true
diff --git a/src/backend/tcop/bgsession.c b/src/backend/tcop/bgsession.c
index 2cc6438..143a244 100644
--- a/src/backend/tcop/bgsession.c
+++ b/src/backend/tcop/bgsession.c
@@ -115,9 +115,16 @@ static void invalid_protocol_message(char msgtype) pg_attribute_noreturn();
BackgroundSession *
BackgroundSessionStart(void)
{
+ pid_t *pid = 0;
+
+ return BackgroundSessionNew(pid);
+}
+
+BackgroundSession *
+BackgroundSessionNew(pid_t *pid)
+{
ResourceOwner oldowner;
BackgroundWorker worker;
- pid_t pid;
BackgroundSession *session;
shm_toc_estimator e;
Size segsize;
@@ -131,8 +138,11 @@ BackgroundSessionStart(void)
BgwHandleStatus bgwstatus;
StringInfoData msg;
char msgtype;
+ MemoryContext oldcontext;
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
session = palloc(sizeof(*session));
+ MemoryContextSwitchTo(oldcontext);
session->resowner = ResourceOwnerCreate(NULL, "background session");
@@ -184,8 +194,10 @@ BackgroundSessionStart(void)
shm_toc_insert(toc, BGSESSION_KEY_RESPONSE_QUEUE, response_mq);
shm_mq_set_receiver(response_mq, MyProc);
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
session->command_qh = shm_mq_attach(command_mq, seg, NULL);
session->response_qh = shm_mq_attach(response_mq, seg, NULL);
+ MemoryContextSwitchTo(oldcontext);
worker.bgw_flags =
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
@@ -196,16 +208,18 @@ BackgroundSessionStart(void)
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
worker.bgw_notify_pid = MyProcPid;
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
if (!RegisterDynamicBackgroundWorker(&worker, &session->worker_handle))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not register background process"),
errhint("You might need to increase max_worker_processes.")));
+ MemoryContextSwitchTo(oldcontext);
shm_mq_set_handle(session->command_qh, session->worker_handle);
shm_mq_set_handle(session->response_qh, session->worker_handle);
- bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid);
+ bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, pid);
if (bgwstatus != BGWH_STARTED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
@@ -256,24 +270,41 @@ BackgroundSessionEnd(BackgroundSession *session)
dsm_detach(session->seg);
ResourceOwnerRelease(session->resowner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false);
ResourceOwnerDelete(session->resowner);
- pfree(session);
+//TODO: pfree(session);
}
-
BackgroundSessionResult *
BackgroundSessionExecute(BackgroundSession *session, const char *sql)
{
- StringInfoData msg;
- char msgtype;
BackgroundSessionResult *result;
+ BackgroundSessionExecuteSQL(session, sql);
+ result = BackgroundSessionFetchResult(session);
+
+ return result;
+}
+
+void
+BackgroundSessionExecuteSQL(BackgroundSession *session, const char *sql)
+{
+ StringInfoData msg;
+
pq_redirect_to_shm_mq(session->seg, session->command_qh);
pq_beginmessage(&msg, 'Q');
pq_sendstring(&msg, sql);
pq_endmessage(&msg);
pq_stop_redirect_to_shm_mq();
+}
+
+BackgroundSessionResult *
+BackgroundSessionFetchResult(BackgroundSession *session)
+{
+ StringInfoData msg;
+ char msgtype;
+ BackgroundSessionResult *result;
result = palloc0(sizeof(*result));
+ result->command = NULL;
do
{
@@ -322,7 +353,6 @@ BackgroundSessionExecute(BackgroundSession *session, const char *sql)
return result;
}
-
BackgroundSessionPreparedStatement *
BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs,
Oid argtypes[], const char *argnames[])
diff --git a/src/include/tcop/bgsession.h b/src/include/tcop/bgsession.h
index 70dad45..f38cb29 100644
--- a/src/include/tcop/bgsession.h
+++ b/src/include/tcop/bgsession.h
@@ -18,9 +18,11 @@ typedef struct BackgroundSessionResult
} BackgroundSessionResult;
BackgroundSession *BackgroundSessionStart(void);
+BackgroundSession *BackgroundSessionNew(pid_t *pid);
void BackgroundSessionEnd(BackgroundSession *session);
BackgroundSessionResult *BackgroundSessionExecute(BackgroundSession *session, const char *sql);
+void BackgroundSessionExecuteSQL(BackgroundSession *session, const char *sql);
+BackgroundSessionResult * BackgroundSessionFetchResult(BackgroundSession *session);
BackgroundSessionPreparedStatement *BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs, Oid argtypes[], const char *argnames[]);
BackgroundSessionResult *BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[]);
-
#endif /* BGSESSION_H */
On 12/22/16 4:20 AM, amul sul wrote:
• pg_background_detach : This API takes the process id and detach the
background process. Stored worker's session is not dropped until this
called.
When I hear "detach" I think that whatever I'm detaching from is going
to stick around, which I don't think is the case here, right? I'd
suggest pg_background_close() instead.
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Dec 22, 2016 at 4:41 PM, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
On 12/22/16 4:20 AM, amul sul wrote:
• pg_background_detach : This API takes the process id and detach the
background process. Stored worker's session is not dropped until this
called.
When I hear "detach" I think that whatever I'm detaching from is going to
stick around, which I don't think is the case here, right? I'd suggest
pg_background_close() instead.
Uh, I think it is. At least in the original version of this patch,
pg_background_detach() leaves the spawned process running but says
that you don't care to read any results it may generate.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Hi all,
Attaching the latest pg_background patch for review, as per the design
proposed on 22 Dec '16, with the following minor changes to the API.
Changes:
1. pg_background_launch renamed to pg_background_start
2. pg_background_detach renamed to pg_background_close
3. Added a new API to display previously launched background workers and
the number of results waiting to be read for each.
Todo:
1. Documentation.
Thanks to Andrew Borodin & David Fetter for the regression test script,
added in the attached version of the patch.
Note that the attached patches need to be applied on top of Peter
Eisentraut's v2 patch[1].
Patch 0001 makes some changes to the BackgroundSession code required by
pg_background, which we are currently discussing on the BackgroundSession
thread.
References:
[1]: /messages/by-id/attachment/48403/v2-0001-Add-background-sessions.patch
Regards,
Amul
Attachments:
0001-Changes-to-Peter-Eisentraut-s-bgsession-v2-patch.patch (application/octet-stream)
From f0fd108b415abf7bc425d6ca94e8f2f5bf3a827f Mon Sep 17 00:00:00 2001
From: Amul Sul <sulamul@gmail.com>
Date: Fri, 6 Jan 2017 18:13:13 +0530
Subject: [PATCH 1/2] Changes to Peter Eisentraut's bgsession v2 patch
---
src/backend/tcop/bgsession.c | 26 +++++++++-----------------
src/include/tcop/bgsession.h | 19 +++++++++++++++----
src/pl/plpython/plpy_bgsession.c | 2 +-
3 files changed, 25 insertions(+), 22 deletions(-)
diff --git a/src/backend/tcop/bgsession.c b/src/backend/tcop/bgsession.c
index 486819b..26dee53 100644
--- a/src/backend/tcop/bgsession.c
+++ b/src/backend/tcop/bgsession.c
@@ -60,14 +60,11 @@
#include "nodes/pg_list.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
-#include "storage/dsm.h"
-#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "tcop/bgsession.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
-#include "utils/resowner.h"
/* Table-of-contents constants for our dynamic shared memory segment. */
#define BGSESSION_MAGIC 0x50674267
@@ -89,16 +86,6 @@ typedef struct background_session_fixed_data
int sec_context;
} background_session_fixed_data;
-struct BackgroundSession
-{
- ResourceOwner resowner;
- dsm_segment *seg;
- BackgroundWorkerHandle *worker_handle;
- shm_mq_handle *command_qh;
- shm_mq_handle *response_qh;
- int transaction_status;
-};
-
struct BackgroundSessionPreparedStatement
{
BackgroundSession *session;
@@ -117,11 +104,10 @@ static void invalid_protocol_message(char msgtype) pg_attribute_noreturn();
BackgroundSession *
-BackgroundSessionStart(void)
+BackgroundSessionStart(MemoryContext context)
{
ResourceOwner oldowner;
BackgroundWorker worker;
- pid_t pid;
BackgroundSession *session;
shm_toc_estimator e;
Size segsize;
@@ -135,8 +121,9 @@ BackgroundSessionStart(void)
BgwHandleStatus bgwstatus;
StringInfoData msg;
char msgtype;
+ MemoryContext oldcontext;
- session = palloc(sizeof(*session));
+ session = MemoryContextAlloc(context, sizeof(*session));
session->resowner = ResourceOwnerCreate(NULL, "background session");
@@ -188,8 +175,10 @@ BackgroundSessionStart(void)
shm_toc_insert(toc, BGSESSION_KEY_RESPONSE_QUEUE, response_mq);
shm_mq_set_receiver(response_mq, MyProc);
+ oldcontext = MemoryContextSwitchTo(context);
session->command_qh = shm_mq_attach(command_mq, seg, NULL);
session->response_qh = shm_mq_attach(response_mq, seg, NULL);
+ MemoryContextSwitchTo(oldcontext);
worker.bgw_flags =
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
@@ -200,16 +189,19 @@ BackgroundSessionStart(void)
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
worker.bgw_notify_pid = MyProcPid;
+ oldcontext = MemoryContextSwitchTo(context);
if (!RegisterDynamicBackgroundWorker(&worker, &session->worker_handle))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not register background process"),
errhint("You might need to increase max_worker_processes.")));
+ MemoryContextSwitchTo(oldcontext);
shm_mq_set_handle(session->command_qh, session->worker_handle);
shm_mq_set_handle(session->response_qh, session->worker_handle);
- bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid);
+ bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle,
+ &session->pid);
if (bgwstatus != BGWH_STARTED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
diff --git a/src/include/tcop/bgsession.h b/src/include/tcop/bgsession.h
index 71415a6..65dfb56 100644
--- a/src/include/tcop/bgsession.h
+++ b/src/include/tcop/bgsession.h
@@ -3,13 +3,24 @@
#include "access/tupdesc.h"
#include "nodes/pg_list.h"
-
-struct BackgroundSession;
-typedef struct BackgroundSession BackgroundSession;
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "utils/resowner.h"
struct BackgroundSessionPreparedStatement;
typedef struct BackgroundSessionPreparedStatement BackgroundSessionPreparedStatement;
+typedef struct BackgroundSession
+{
+ pid_t pid;
+ ResourceOwner resowner;
+ dsm_segment *seg;
+ BackgroundWorkerHandle *worker_handle;
+ shm_mq_handle *command_qh;
+ shm_mq_handle *response_qh;
+ int transaction_status;
+} BackgroundSession;
+
typedef struct BackgroundSessionResult
{
TupleDesc tupdesc;
@@ -17,7 +28,7 @@ typedef struct BackgroundSessionResult
const char *command;
} BackgroundSessionResult;
-BackgroundSession *BackgroundSessionStart(void);
+BackgroundSession *BackgroundSessionStart(MemoryContext oldcontext);
void BackgroundSessionEnd(BackgroundSession *session);
void BackgroundSessionSend(BackgroundSession *session, const char *sql);
diff --git a/src/pl/plpython/plpy_bgsession.c b/src/pl/plpython/plpy_bgsession.c
index 68f7207..7e6e906 100644
--- a/src/pl/plpython/plpy_bgsession.c
+++ b/src/pl/plpython/plpy_bgsession.c
@@ -110,7 +110,7 @@ PLyBackgroundSession_new(PyTypeObject *type, PyObject *args, PyObject *kw)
PyObject *result = type->tp_alloc(type, 0);
PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) result;
- bgsession->bgsession = BackgroundSessionStart();
+ bgsession->bgsession = BackgroundSessionStart(CurrentMemoryContext);
return result;
}
--
2.6.2
0002-pg_background-as-client-of-BackgroundSession-v1.patch (application/octet-stream)
From a5811a850ebc66b6b6267afe341c3929cbb57b17 Mon Sep 17 00:00:00 2001
From: Amul Sul <sulamul@gmail.com>
Date: Fri, 6 Jan 2017 18:13:37 +0530
Subject: [PATCH 2/2] pg_background as client of BackgroundSession-v1
---
contrib/Makefile | 1 +
contrib/pg_background/Makefile | 20 ++
contrib/pg_background/expected/pg_background.out | 75 +++++
contrib/pg_background/pg_background--1.0.sql | 37 +++
contrib/pg_background/pg_background.c | 397 +++++++++++++++++++++++
contrib/pg_background/pg_background.control | 4 +
contrib/pg_background/sql/pg_background.sql | 30 ++
7 files changed, 564 insertions(+)
create mode 100644 contrib/pg_background/Makefile
create mode 100644 contrib/pg_background/expected/pg_background.out
create mode 100644 contrib/pg_background/pg_background--1.0.sql
create mode 100644 contrib/pg_background/pg_background.c
create mode 100644 contrib/pg_background/pg_background.control
create mode 100644 contrib/pg_background/sql/pg_background.sql
diff --git a/contrib/Makefile b/contrib/Makefile
index 25263c0..04ec28a 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -29,6 +29,7 @@ SUBDIRS = \
oid2name \
pageinspect \
passwordcheck \
+ pg_background \
pg_buffercache \
pg_freespacemap \
pg_prewarm \
diff --git a/contrib/pg_background/Makefile b/contrib/pg_background/Makefile
new file mode 100644
index 0000000..085fbff
--- /dev/null
+++ b/contrib/pg_background/Makefile
@@ -0,0 +1,20 @@
+# contrib/pg_background/Makefile
+
+MODULE_big = pg_background
+OBJS = pg_background.o
+
+EXTENSION = pg_background
+DATA = pg_background--1.0.sql
+
+REGRESS = pg_background
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_background
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_background/expected/pg_background.out b/contrib/pg_background/expected/pg_background.out
new file mode 100644
index 0000000..23679e6
--- /dev/null
+++ b/contrib/pg_background/expected/pg_background.out
@@ -0,0 +1,75 @@
+CREATE EXTENSION pg_background;
+--launch 6 workers which wait .0, .1, .2, .3, .4, .5 seconds respectively
+CREATE TABLE input AS
+ SELECT pg_background_start() pid, x, row_number() OVER (ORDER BY x) n
+ FROM generate_series(0,.5,0.1) x
+ ORDER BY x DESC;
+CREATE TABLE output(place int,value float);
+--display active background workers
+SELECT count(*) as active_background_workers FROM pg_background_show();
+ active_background_workers
+---------------------------
+ 6
+(1 row)
+
+--run sql query
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_run(input.pid,
+ format($$
+ SELECT pg_sleep(%s)::text;
+ INSERT INTO output VALUES (%s, %s);
+ $$,
+ x, n, x
+ )
+ ) ON (true);
+ worker_num
+------------
+ 6
+ 5
+ 4
+ 3
+ 2
+ 1
+(6 rows)
+
+--wait until everyone finishes
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_result(input.pid) AS (x TEXT) ON (true);
+ worker_num
+------------
+ 6
+ 5
+ 4
+ 3
+ 2
+ 1
+(6 rows)
+
+--output results
+SELECT * FROM output ORDER BY place;
+ place | value
+-------+-------
+ 1 | 0
+ 2 | 0.1
+ 3 | 0.2
+ 4 | 0.3
+ 5 | 0.4
+ 6 | 0.5
+(6 rows)
+
+--cleanup
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_close(input.pid) ON (true);
+ worker_num
+------------
+ 6
+ 5
+ 4
+ 3
+ 2
+ 1
+(6 rows)
+
+DROP TABLE input;
+DROP TABLE output;
+DROP EXTENSION pg_background;
diff --git a/contrib/pg_background/pg_background--1.0.sql b/contrib/pg_background/pg_background--1.0.sql
new file mode 100644
index 0000000..62a9d18
--- /dev/null
+++ b/contrib/pg_background/pg_background--1.0.sql
@@ -0,0 +1,37 @@
+/* contrib/pg_background/pg_background--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_background" to load this file. \quit
+
+CREATE FUNCTION pg_background_show()
+ RETURNS TABLE(background_worker_id pg_catalog.int4,
+ num_of_results_waiting_to_read pg_catalog.int4) STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_start()
+ RETURNS pg_catalog.int4 STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_run(pid pg_catalog.int4, sql pg_catalog.text)
+ RETURNS pg_catalog.void STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_result(pid pg_catalog.int4)
+ RETURNS SETOF pg_catalog.record STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_close(pid pg_catalog.int4)
+ RETURNS pg_catalog.void STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+REVOKE ALL ON FUNCTION pg_background_show()
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_start()
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_run(pid pg_catalog.int4,
+ sql pg_catalog.text)
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_result(pg_catalog.int4)
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_close(pg_catalog.int4)
+ FROM public;
diff --git a/contrib/pg_background/pg_background.c b/contrib/pg_background/pg_background.c
new file mode 100644
index 0000000..e5a5b35
--- /dev/null
+++ b/contrib/pg_background/pg_background.c
@@ -0,0 +1,397 @@
+/*--------------------------------------------------------------------------
+ *
+ * pg_background.c
+ * Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2017, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/pg_background/pg_background.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "catalog/pg_type.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "tcop/bgsession.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+
+/* Private state maintained by the launching backend for IPC. */
+typedef struct pg_background_worker_info
+{
+ int32 pid;
+ Oid current_user_id;
+ BackgroundSession *session;
+ uint32 result_count;
+} pg_background_worker_info;
+
+/* Private state maintained across calls to pg_background_result. */
+typedef struct pg_background_result_state
+{
+ pg_background_worker_info *info;
+ BackgroundSessionResult *result;
+} pg_background_result_state;
+
+static HTAB *worker_hash = NULL;
+
+static void remove_worker_info(int32 pid);
+static pg_background_worker_info *find_worker_info(int32 pid);
+static void save_worker_info(int32 pid, BackgroundSession *session);
+static void check_rights(pg_background_worker_info *info);
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(pg_background_show);
+PG_FUNCTION_INFO_V1(pg_background_start);
+PG_FUNCTION_INFO_V1(pg_background_run);
+PG_FUNCTION_INFO_V1(pg_background_result);
+PG_FUNCTION_INFO_V1(pg_background_close);
+
+/*
+ * Display the list of background workers previously launched in this session.
+ */
+Datum
+pg_background_show(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ HASH_SEQ_STATUS *hash_seq;
+ TupleDesc tupdesc;
+ pg_background_worker_info *info;
+
+ /* First-time setup. */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+
+ funcctx = SRF_FIRSTCALL_INIT();
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* Construct a tuple descriptor for the result rows. */
+ tupdesc = CreateTemplateTupleDesc(2, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid",
+ INT4OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "results",
+ INT4OID, -1, 0);
+
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ if (worker_hash)
+ {
+ hash_seq = palloc(sizeof(HASH_SEQ_STATUS));
+
+ hash_seq_init(hash_seq, worker_hash);
+ funcctx->user_fctx = hash_seq;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ funcctx = SRF_PERCALL_SETUP();
+ hash_seq = (HASH_SEQ_STATUS *) funcctx->user_fctx;
+
+ while (hash_seq && (info = hash_seq_search(hash_seq)) != NULL)
+ {
+ Datum values[2];
+ bool nulls[2];
+ HeapTuple tuple;
+
+ values[0] = Int32GetDatum(info->pid);
+ nulls[0] = false;
+ values[1] = Int32GetDatum(info->result_count);
+ nulls[1] = false;
+
+ /* Build and return the tuple. */
+ tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Start a dynamic background worker.
+ */
+Datum
+pg_background_start(PG_FUNCTION_ARGS)
+{
+ BackgroundSession *session;
+ int32 pid;
+
+ session = BackgroundSessionStart(TopMemoryContext);
+ pid = session->pid;
+
+ /* Save worker info */
+ save_worker_info(pid, session);
+
+ /* Return the worker's PID. */
+ PG_RETURN_INT32(pid);
+}
+
+/*
+ * Run a user-specified SQL command.
+ */
+Datum
+pg_background_run(PG_FUNCTION_ARGS)
+{
+ char *sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ int32 pid = PG_GETARG_INT32(0);
+ pg_background_worker_info *info;
+
+ /* See if we have a connection to the specified PID. */
+ if ((info = find_worker_info(pid)) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+ check_rights(info);
+
+ /* Execute the given SQL query */
+ BackgroundSessionSend(info->session, sql);
+ info->result_count++;
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Retrieve the results of a background query previously launched in this
+ * session.
+ */
+Datum
+pg_background_result(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ FuncCallContext *funcctx;
+ pg_background_result_state *state;
+ TupleDesc tupdesc;
+ BackgroundSessionResult *result;
+
+ /* First-time setup. */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+ pg_background_worker_info *info;
+
+ funcctx = SRF_FIRSTCALL_INIT();
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* See if we have a connection to the specified PID. */
+ if ((info = find_worker_info(pid)) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+ check_rights(info);
+
+ /* Can't read results twice. */
+ if (info->result_count <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("results for PID %d have already been consumed", pid)));
+ info->result_count--;
+
+ /* Set up tuple-descriptor based on column definition list. */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record"),
+ errhint("Try calling the function in the FROM clause "
+ "using a column definition list.")));
+ result = BackgroundSessionGetResult(info->session);
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ if (result->tupdesc)
+ {
+ bool datatype_mismatch = false;
+
+ if (tupdesc->natts != result->tupdesc->natts)
+ datatype_mismatch = true;
+ else
+ {
+ int i;
+ for (i = 0; i < tupdesc->natts; ++i)
+ if(tupdesc->attrs[i]->atttypid !=
+ result->tupdesc->attrs[i]->atttypid)
+ {
+ datatype_mismatch = true;
+ break;
+ }
+ }
+
+ if (datatype_mismatch)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match the specified FROM clause rowtype")));
+ }
+
+ /* Cache state that will be needed on every call. */
+ state = palloc0(sizeof(pg_background_result_state));
+ state->info = info;
+ state->result = result;
+
+ funcctx->user_fctx = state;
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ funcctx = SRF_PERCALL_SETUP();
+ tupdesc = funcctx->tuple_desc;
+ state = funcctx->user_fctx;
+ result = state->result;
+
+ if (result->tupdesc)
+ {
+ if (result->tuples != NIL && funcctx->call_cntr > 0)
+ result->tuples = list_delete_first(result->tuples);
+
+ if (result->tuples != NIL)
+ {
+ HeapTuple tuple = (HeapTuple) linitial(result->tuples);
+
+ /*
+ * Set the tuple type ID information fields correctly because
+ * BackgroundSessionGetResult returns it as an anonymous record
+ * type.
+ */
+ HeapTupleHeaderSetTypeId(tuple->t_data, tupdesc->tdtypeid);
+ HeapTupleHeaderSetTypMod(tuple->t_data, tupdesc->tdtypmod);
+
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ }
+ }
+ else /* If no data rows, return the command tags instead. */
+ {
+ if (tupdesc->natts != 1 || tupdesc->attrs[0]->atttypid != TEXTOID)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query did not return a result set, but result rowtype is not a single text column")));
+
+ if (result->command != NULL)
+ {
+ bool isnull = false;
+ Datum value = PointerGetDatum(cstring_to_text(result->command));
+ HeapTuple tuple = heap_form_tuple(tupdesc, &value, &isnull);
+
+ result->command = NULL;
+
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ }
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * End background session and remove hashtable entry.
+ */
+Datum
+pg_background_close(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ pg_background_worker_info *info;
+
+ info = find_worker_info(pid);
+ if (info == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+
+ check_rights(info);
+ remove_worker_info(pid);
+ BackgroundSessionEnd(info->session);
+
+ PG_RETURN_VOID();
+}
+
+static void
+remove_worker_info(int32 pid)
+{
+ bool found;
+
+ /* Remove the hashtable entry. */
+ hash_search(worker_hash, (void *) &pid, HASH_REMOVE, &found);
+ if (!found)
+ elog(ERROR, "pg_background worker_hash table corrupted");
+}
+
+/*
+ * Find the background worker information for the worker with a given PID.
+ */
+static pg_background_worker_info *
+find_worker_info(int32 pid)
+{
+ pg_background_worker_info *info = NULL;
+
+ if (worker_hash != NULL)
+ info = hash_search(worker_hash, (void *) &pid, HASH_FIND, NULL);
+
+ return info;
+}
+
+/*
+ * Save worker info.
+ */
+static void
+save_worker_info(int32 pid, BackgroundSession *session)
+{
+ pg_background_worker_info *info;
+ Oid current_user_id;
+ int sec_context;
+
+ /* If the hash table hasn't been set up yet, do that now. */
+ if (worker_hash == NULL)
+ {
+ HASHCTL ctl;
+
+ ctl.keysize = sizeof(int32);
+ ctl.entrysize = sizeof(pg_background_worker_info);
+ worker_hash = hash_create("pg_background worker_hash", 8, &ctl,
+ HASH_ELEM | HASH_BLOBS);
+ }
+
+ /* Get current authentication information. */
+ GetUserIdAndSecContext(&current_user_id, &sec_context);
+
+ /*
+ * In the unlikely event that there's an older worker with this PID,
+ * just detach it - unless it has a different user ID than the
+ * currently-active one, in which case someone might be trying to pull
+ * a fast one. Let's kill the backend to make sure we don't break
+ * anyone's expectations.
+ */
+ if ((info = find_worker_info(pid)) != NULL)
+ {
+ if (current_user_id != info->current_user_id)
+ ereport(FATAL,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("background worker with PID \"%d\" already exists",
+ pid)));
+ }
+
+ /* Create a new entry for this worker. */
+ info = hash_search(worker_hash, (void *) &pid, HASH_ENTER, NULL);
+ info->session = session;
+ info->result_count = 0;
+ info->current_user_id = current_user_id;
+}
+
+/*
+ * Check whether the current user has rights to manipulate the background
+ * worker with the given PID.
+ */
+static void
+check_rights(pg_background_worker_info *info)
+{
+ Oid current_user_id;
+ int sec_context;
+
+ GetUserIdAndSecContext(&current_user_id, &sec_context);
+ if (!has_privs_of_role(current_user_id, info->current_user_id))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied for background worker with PID \"%d\"",
+ info->pid)));
+}
diff --git a/contrib/pg_background/pg_background.control b/contrib/pg_background/pg_background.control
new file mode 100644
index 0000000..733d0e1
--- /dev/null
+++ b/contrib/pg_background/pg_background.control
@@ -0,0 +1,4 @@
+comment = 'Run SQL queries in the background'
+default_version = '1.0'
+module_pathname = '$libdir/pg_background'
+relocatable = true
diff --git a/contrib/pg_background/sql/pg_background.sql b/contrib/pg_background/sql/pg_background.sql
new file mode 100644
index 0000000..be49dd5
--- /dev/null
+++ b/contrib/pg_background/sql/pg_background.sql
@@ -0,0 +1,30 @@
+CREATE EXTENSION pg_background;
+--launch 6 workers which wait .0, .1, .2, .3, .4, .5 seconds respectively
+CREATE TABLE input AS
+ SELECT pg_background_start() pid, x, row_number() OVER (ORDER BY x) n
+ FROM generate_series(0,.5,0.1) x
+ ORDER BY x DESC;
+CREATE TABLE output(place int,value float);
+--display active background workers
+SELECT count(*) as active_background_workers FROM pg_background_show();
+--run sql query
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_run(input.pid,
+ format($$
+ SELECT pg_sleep(%s)::text;
+ INSERT INTO output VALUES (%s, %s);
+ $$,
+ x, n, x
+ )
+ ) ON (true);
+--wait until everyone finishes
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_result(input.pid) AS (x TEXT) ON (true);
+--output results
+SELECT * FROM output ORDER BY place;
+--cleanup
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_close(input.pid) ON (true);
+DROP TABLE input;
+DROP TABLE output;
+DROP EXTENSION pg_background;
--
2.6.2
Hi!
2017-01-07 11:44 GMT+05:00 amul sul <sulamul@gmail.com>:
Changes:
1. pg_background_launch renamed to pg_background_start
2. pg_background_detach renamed to pg_background_close
3. Added new api to display previously launch background worker & its
result count waiting to be read.
Great work!
Note that attached patches need to apply to the top of the Peter
Eisentraut's v2 patch[1].
I've looked a bit into that patch. I thought you would switch
MemoryContext before calling BackgroundSessionStart() from
pg_background? Have I got something wrong?
Best regards, Andrey Borodin.
On Sat, Jan 7, 2017 at 2:06 PM, Andrew Borodin <borodin@octonica.com> wrote:
Hi!
2017-01-07 11:44 GMT+05:00 amul sul <sulamul@gmail.com>:
Changes:
1. pg_background_launch renamed to pg_background_start
2. pg_background_detach renamed to pg_background_close
3. Added new api to display previously launch background worker & its
result count waiting to be read.
Great work!
Thanks.
Note that attached patches need to apply to the top of the Peter
Eisentraut's v2 patch[1].
I've looked a bit into that patch. I thought you would switch
MemoryContext before calling BackgroundSessionStart() from
pg_background? Have I got something wrong?
Hmm, I understand your point: let the pg_background extension do the
required context setup. The attached version of the patch makes that change.
Thanks.
Regards,
Amul
Attachments:
0001-Changes-to-Peter-Eisentraut-s-bgsession-v2-patch.patch (application/octet-stream)
From fcf10d1741a236012c0bab327f535fc5277071d9 Mon Sep 17 00:00:00 2001
From: Amul Sul <sulamul@gmail.com>
Date: Sat, 7 Jan 2017 18:51:51 +0530
Subject: [PATCH 1/2] Changes to Peter Eisentraut's bgsession v2 patch
---
src/backend/tcop/bgsession.c | 17 ++---------------
src/include/tcop/bgsession.h | 17 ++++++++++++++---
2 files changed, 16 insertions(+), 18 deletions(-)
diff --git a/src/backend/tcop/bgsession.c b/src/backend/tcop/bgsession.c
index 486819b..510e740 100644
--- a/src/backend/tcop/bgsession.c
+++ b/src/backend/tcop/bgsession.c
@@ -60,14 +60,11 @@
#include "nodes/pg_list.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
-#include "storage/dsm.h"
-#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "tcop/bgsession.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
-#include "utils/resowner.h"
/* Table-of-contents constants for our dynamic shared memory segment. */
#define BGSESSION_MAGIC 0x50674267
@@ -89,16 +86,6 @@ typedef struct background_session_fixed_data
int sec_context;
} background_session_fixed_data;
-struct BackgroundSession
-{
- ResourceOwner resowner;
- dsm_segment *seg;
- BackgroundWorkerHandle *worker_handle;
- shm_mq_handle *command_qh;
- shm_mq_handle *response_qh;
- int transaction_status;
-};
-
struct BackgroundSessionPreparedStatement
{
BackgroundSession *session;
@@ -121,7 +108,6 @@ BackgroundSessionStart(void)
{
ResourceOwner oldowner;
BackgroundWorker worker;
- pid_t pid;
BackgroundSession *session;
shm_toc_estimator e;
Size segsize;
@@ -209,7 +195,8 @@ BackgroundSessionStart(void)
shm_mq_set_handle(session->command_qh, session->worker_handle);
shm_mq_set_handle(session->response_qh, session->worker_handle);
- bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid);
+ bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle,
+ &session->pid);
if (bgwstatus != BGWH_STARTED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
diff --git a/src/include/tcop/bgsession.h b/src/include/tcop/bgsession.h
index 71415a6..008fa28 100644
--- a/src/include/tcop/bgsession.h
+++ b/src/include/tcop/bgsession.h
@@ -3,13 +3,24 @@
#include "access/tupdesc.h"
#include "nodes/pg_list.h"
-
-struct BackgroundSession;
-typedef struct BackgroundSession BackgroundSession;
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "utils/resowner.h"
struct BackgroundSessionPreparedStatement;
typedef struct BackgroundSessionPreparedStatement BackgroundSessionPreparedStatement;
+typedef struct BackgroundSession
+{
+ pid_t pid;
+ ResourceOwner resowner;
+ dsm_segment *seg;
+ BackgroundWorkerHandle *worker_handle;
+ shm_mq_handle *command_qh;
+ shm_mq_handle *response_qh;
+ int transaction_status;
+} BackgroundSession;
+
typedef struct BackgroundSessionResult
{
TupleDesc tupdesc;
--
2.6.2
0002-pg_background-as-client-of-BackgroundSession-v2.patch (application/octet-stream)
From e72e4fa60d9f489c33411f83e9e34dc8b202474a Mon Sep 17 00:00:00 2001
From: Amul Sul <sulamul@gmail.com>
Date: Sat, 7 Jan 2017 19:20:07 +0530
Subject: [PATCH 2/2] pg_background as client of BackgroundSession-v2
---
contrib/Makefile | 1 +
contrib/pg_background/Makefile | 20 ++
contrib/pg_background/expected/pg_background.out | 75 +++++
contrib/pg_background/pg_background--1.0.sql | 37 +++
contrib/pg_background/pg_background.c | 400 +++++++++++++++++++++++
contrib/pg_background/pg_background.control | 4 +
contrib/pg_background/sql/pg_background.sql | 30 ++
7 files changed, 567 insertions(+)
create mode 100644 contrib/pg_background/Makefile
create mode 100644 contrib/pg_background/expected/pg_background.out
create mode 100644 contrib/pg_background/pg_background--1.0.sql
create mode 100644 contrib/pg_background/pg_background.c
create mode 100644 contrib/pg_background/pg_background.control
create mode 100644 contrib/pg_background/sql/pg_background.sql
diff --git a/contrib/Makefile b/contrib/Makefile
index 25263c0..04ec28a 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -29,6 +29,7 @@ SUBDIRS = \
oid2name \
pageinspect \
passwordcheck \
+ pg_background \
pg_buffercache \
pg_freespacemap \
pg_prewarm \
diff --git a/contrib/pg_background/Makefile b/contrib/pg_background/Makefile
new file mode 100644
index 0000000..085fbff
--- /dev/null
+++ b/contrib/pg_background/Makefile
@@ -0,0 +1,20 @@
+# contrib/pg_background/Makefile
+
+MODULE_big = pg_background
+OBJS = pg_background.o
+
+EXTENSION = pg_background
+DATA = pg_background--1.0.sql
+
+REGRESS = pg_background
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_background
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_background/expected/pg_background.out b/contrib/pg_background/expected/pg_background.out
new file mode 100644
index 0000000..23679e6
--- /dev/null
+++ b/contrib/pg_background/expected/pg_background.out
@@ -0,0 +1,75 @@
+CREATE EXTENSION pg_background;
+--launch 6 workers which wait .0, .1, .2, .3, .4, .5 seconds respectively
+CREATE TABLE input AS
+ SELECT pg_background_start() pid, x, row_number() OVER (ORDER BY x) n
+ FROM generate_series(0,.5,0.1) x
+ ORDER BY x DESC;
+CREATE TABLE output(place int,value float);
+--display active background workers
+SELECT count(*) as active_background_workers FROM pg_background_show();
+ active_background_workers
+---------------------------
+ 6
+(1 row)
+
+--run sql query
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_run(input.pid,
+ format($$
+ SELECT pg_sleep(%s)::text;
+ INSERT INTO output VALUES (%s, %s);
+ $$,
+ x, n, x
+ )
+ ) ON (true);
+ worker_num
+------------
+ 6
+ 5
+ 4
+ 3
+ 2
+ 1
+(6 rows)
+
+--wait until everyone finishes
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_result(input.pid) AS (x TEXT) ON (true);
+ worker_num
+------------
+ 6
+ 5
+ 4
+ 3
+ 2
+ 1
+(6 rows)
+
+--output results
+SELECT * FROM output ORDER BY place;
+ place | value
+-------+-------
+ 1 | 0
+ 2 | 0.1
+ 3 | 0.2
+ 4 | 0.3
+ 5 | 0.4
+ 6 | 0.5
+(6 rows)
+
+--cleanup
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_close(input.pid) ON (true);
+ worker_num
+------------
+ 6
+ 5
+ 4
+ 3
+ 2
+ 1
+(6 rows)
+
+DROP TABLE input;
+DROP TABLE output;
+DROP EXTENSION pg_background;
diff --git a/contrib/pg_background/pg_background--1.0.sql b/contrib/pg_background/pg_background--1.0.sql
new file mode 100644
index 0000000..62a9d18
--- /dev/null
+++ b/contrib/pg_background/pg_background--1.0.sql
@@ -0,0 +1,37 @@
+/* contrib/pg_background/pg_background--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_background" to load this file. \quit
+
+CREATE FUNCTION pg_background_show()
+ RETURNS TABLE(background_worker_id pg_catalog.int4,
+ num_of_results_waiting_to_read pg_catalog.int4) STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_start()
+ RETURNS pg_catalog.int4 STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_run(pid pg_catalog.int4, sql pg_catalog.text)
+ RETURNS pg_catalog.void STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_result(pid pg_catalog.int4)
+ RETURNS SETOF pg_catalog.record STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_close(pid pg_catalog.int4)
+ RETURNS pg_catalog.void STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+REVOKE ALL ON FUNCTION pg_background_show()
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_start()
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_run(pid pg_catalog.int4,
+ sql pg_catalog.text)
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_result(pg_catalog.int4)
+ FROM public;
+REVOKE ALL ON FUNCTION pg_background_close(pg_catalog.int4)
+ FROM public;
diff --git a/contrib/pg_background/pg_background.c b/contrib/pg_background/pg_background.c
new file mode 100644
index 0000000..d8aefab
--- /dev/null
+++ b/contrib/pg_background/pg_background.c
@@ -0,0 +1,400 @@
+/*--------------------------------------------------------------------------
+ *
+ * pg_background.c
+ * Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2017, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/pg_background/pg_background.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "catalog/pg_type.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "tcop/bgsession.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+
+/* Private state maintained by the launching backend for IPC. */
+typedef struct pg_background_worker_info
+{
+ int32 pid;
+ Oid current_user_id;
+ BackgroundSession *session;
+ uint32 result_count;
+} pg_background_worker_info;
+
+/* Private state maintained across calls to pg_background_result. */
+typedef struct pg_background_result_state
+{
+ pg_background_worker_info *info;
+ BackgroundSessionResult *result;
+} pg_background_result_state;
+
+static HTAB *worker_hash = NULL;
+
+static void remove_worker_info(int32 pid);
+static pg_background_worker_info *find_worker_info(int32 pid);
+static void save_worker_info(int32 pid, BackgroundSession *session);
+static void check_rights(pg_background_worker_info *info);
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(pg_background_show);
+PG_FUNCTION_INFO_V1(pg_background_start);
+PG_FUNCTION_INFO_V1(pg_background_run);
+PG_FUNCTION_INFO_V1(pg_background_result);
+PG_FUNCTION_INFO_V1(pg_background_close);
+
+/*
+ * Display the list of background workers previously launched in this session.
+ */
+Datum
+pg_background_show(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ HASH_SEQ_STATUS *hash_seq;
+ TupleDesc tupdesc;
+ pg_background_worker_info *info;
+
+ /* First-time setup. */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+
+ funcctx = SRF_FIRSTCALL_INIT();
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* Construct a tuple descriptor for the result rows. */
+ tupdesc = CreateTemplateTupleDesc(2, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid",
+ INT4OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "results",
+ INT4OID, -1, 0);
+
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ if (worker_hash)
+ {
+ hash_seq = palloc(sizeof(HASH_SEQ_STATUS));
+
+ hash_seq_init(hash_seq, worker_hash);
+ funcctx->user_fctx = hash_seq;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ funcctx = SRF_PERCALL_SETUP();
+ hash_seq = (HASH_SEQ_STATUS *) funcctx->user_fctx;
+
+ while (hash_seq && (info = hash_seq_search(hash_seq)) != NULL)
+ {
+ Datum values[2];
+ bool nulls[2];
+ HeapTuple tuple;
+
+ values[0] = Int32GetDatum(info->pid);
+ nulls[0] = false;
+ values[1] = Int32GetDatum(info->result_count);
+ nulls[1] = false;
+
+ /* Build and return the tuple. */
+ tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Start a dynamic background worker.
+ */
+Datum
+pg_background_start(PG_FUNCTION_ARGS)
+{
+ BackgroundSession *session;
+ int32 pid;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ session = BackgroundSessionStart();
+ MemoryContextSwitchTo(oldcontext);
+ pid = session->pid;
+
+ /* Save worker info */
+ save_worker_info(pid, session);
+
+ /* Return the worker's PID. */
+ PG_RETURN_INT32(pid);
+}
+
+/*
+ * Run a user-specified SQL command.
+ */
+Datum
+pg_background_run(PG_FUNCTION_ARGS)
+{
+ char *sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ int32 pid = PG_GETARG_INT32(0);
+ pg_background_worker_info *info;
+
+ /* See if we have a connection to the specified PID. */
+ if ((info = find_worker_info(pid)) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+ check_rights(info);
+
+ /* Execute the given SQL query. */
+ BackgroundSessionSend(info->session, sql);
+ info->result_count++;
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Retrieve the results of a background query previously launched in this
+ * session.
+ */
+Datum
+pg_background_result(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ FuncCallContext *funcctx;
+ pg_background_result_state *state;
+ TupleDesc tupdesc;
+ BackgroundSessionResult *result;
+
+ /* First-time setup. */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+ pg_background_worker_info *info;
+
+ funcctx = SRF_FIRSTCALL_INIT();
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* See if we have a connection to the specified PID. */
+ if ((info = find_worker_info(pid)) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+ check_rights(info);
+
+ /* Can't read results twice. */
+ if (info->result_count <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("results for PID %d have already been consumed", pid)));
+ info->result_count--;
+
+ /* Set up tuple-descriptor based on column definition list. */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record"),
+ errhint("Try calling the function in the FROM clause "
+ "using a column definition list.")));
+ result = BackgroundSessionGetResult(info->session);
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ if (result->tupdesc)
+ {
+ bool datatype_mismatch = false;
+
+ if (tupdesc->natts != result->tupdesc->natts)
+ datatype_mismatch = true;
+ else
+ {
+ int i;
+ for (i = 0; i < tupdesc->natts; ++i)
+ if(tupdesc->attrs[i]->atttypid !=
+ result->tupdesc->attrs[i]->atttypid)
+ {
+ datatype_mismatch = true;
+ break;
+ }
+ }
+
+ if (datatype_mismatch)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match the specified FROM clause rowtype")));
+ }
+
+ /* Cache state that will be needed on every call. */
+ state = palloc0(sizeof(pg_background_result_state));
+ state->info = info;
+ state->result = result;
+
+ funcctx->user_fctx = state;
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ funcctx = SRF_PERCALL_SETUP();
+ tupdesc = funcctx->tuple_desc;
+ state = funcctx->user_fctx;
+ result = state->result;
+
+ if (result->tupdesc)
+ {
+ if (result->tuples != NIL && funcctx->call_cntr > 0)
+ result->tuples = list_delete_first(result->tuples);
+
+ if (result->tuples != NIL)
+ {
+ HeapTuple tuple = (HeapTuple) linitial(result->tuples);
+
+ /*
+ * Set the tuple type ID information fields correctly because
+ * BackgroundSessionFetchResult returns it as an anonymous record
+ * type.
+ */
+ HeapTupleHeaderSetTypeId(tuple->t_data, tupdesc->tdtypeid);
+ HeapTupleHeaderSetTypMod(tuple->t_data, tupdesc->tdtypmod);
+
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ }
+ }
+ else /* If no data rows, return the command tags instead. */
+ {
+ if (tupdesc->natts != 1 || tupdesc->attrs[0]->atttypid != TEXTOID)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query did not return a result set, but result rowtype is not a single text column")));
+
+ if (result->command != NULL)
+ {
+ bool isnull = false;
+ Datum value = PointerGetDatum(cstring_to_text(result->command));
+ HeapTuple tuple = heap_form_tuple(tupdesc, &value, &isnull);
+
+ result->command = NULL;
+
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ }
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * End background session and remove hashtable entry.
+ */
+Datum
+pg_background_close(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ pg_background_worker_info *info;
+
+ info = find_worker_info(pid);
+ if (info == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+
+ check_rights(info);
+ remove_worker_info(pid);
+ BackgroundSessionEnd(info->session);
+
+ PG_RETURN_VOID();
+}
+
+static void
+remove_worker_info(int32 pid)
+{
+ bool found;
+
+ /* Remove the hashtable entry. */
+ hash_search(worker_hash, (void *) &pid, HASH_REMOVE, &found);
+ if (!found)
+ elog(ERROR, "pg_background worker_hash table corrupted");
+}
+
+/*
+ * Find the background worker information for the worker with a given PID.
+ */
+static pg_background_worker_info *
+find_worker_info(int32 pid)
+{
+ pg_background_worker_info *info = NULL;
+
+ if (worker_hash != NULL)
+ info = hash_search(worker_hash, (void *) &pid, HASH_FIND, NULL);
+
+ return info;
+}
+
+/*
+ * Save worker info.
+ */
+static void
+save_worker_info(int32 pid, BackgroundSession *session)
+{
+ pg_background_worker_info *info;
+ Oid current_user_id;
+ int sec_context;
+
+ /* If the hash table hasn't been set up yet, do that now. */
+ if (worker_hash == NULL)
+ {
+ HASHCTL ctl;
+
+ ctl.keysize = sizeof(int32);
+ ctl.entrysize = sizeof(pg_background_worker_info);
+ worker_hash = hash_create("pg_background worker_hash", 8, &ctl,
+ HASH_ELEM);
+ }
+
+ /* Get current authentication information. */
+ GetUserIdAndSecContext(&current_user_id, &sec_context);
+
+ /*
+ * In the unlikely event that there's an older worker with this PID,
+ * just detach it - unless it has a different user ID than the
+ * currently-active one, in which case someone might be trying to pull
+ * a fast one. Let's kill the backend to make sure we don't break
+ * anyone's expectations.
+ */
+ if ((info = find_worker_info(pid)) != NULL)
+ {
+ if (current_user_id != info->current_user_id)
+ ereport(FATAL,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("background worker with PID \"%d\" already exists",
+ pid)));
+ }
+
+ /* Create a new entry for this worker. */
+ info = hash_search(worker_hash, (void *) &pid, HASH_ENTER, NULL);
+ info->session = session;
+ info->result_count = 0;
+ info->current_user_id = current_user_id;
+}
+
+/*
+ * Check whether the current user has rights to manipulate the background
+ * worker with the given PID.
+ */
+static void
+check_rights(pg_background_worker_info *info)
+{
+ Oid current_user_id;
+ int sec_context;
+
+ GetUserIdAndSecContext(&current_user_id, &sec_context);
+ if (!has_privs_of_role(current_user_id, info->current_user_id))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied for background worker with PID \"%d\"",
+ info->pid)));
+}
diff --git a/contrib/pg_background/pg_background.control b/contrib/pg_background/pg_background.control
new file mode 100644
index 0000000..733d0e1
--- /dev/null
+++ b/contrib/pg_background/pg_background.control
@@ -0,0 +1,4 @@
+comment = 'Run SQL queries in the background'
+default_version = '1.0'
+module_pathname = '$libdir/pg_background'
+relocatable = true
diff --git a/contrib/pg_background/sql/pg_background.sql b/contrib/pg_background/sql/pg_background.sql
new file mode 100644
index 0000000..be49dd5
--- /dev/null
+++ b/contrib/pg_background/sql/pg_background.sql
@@ -0,0 +1,30 @@
+CREATE EXTENSION pg_background;
+--launch 6 workers which wait .0, .1, .2, .3, .4, .5 seconds respectively
+CREATE TABLE input AS
+ SELECT pg_background_start() pid, x, row_number() OVER (ORDER BY x) n
+ FROM generate_series(0,.5,0.1) x
+ ORDER BY x DESC;
+CREATE TABLE output(place int,value float);
+--display active background workers
+SELECT count(*) as active_background_workers FROM pg_background_show();
+--run sql query
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_run(input.pid,
+ format($$
+ SELECT pg_sleep(%s)::text;
+ INSERT INTO output VALUES (%s, %s);
+ $$,
+ x, n, x
+ )
+ ) ON (true);
+--wait until everyone finishes
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_result(input.pid) AS (x TEXT) ON (true);
+--output results
+SELECT * FROM output ORDER BY place;
+--cleanup
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_close(input.pid) ON (true);
+DROP TABLE input;
+DROP TABLE output;
+DROP EXTENSION pg_background;
--
2.6.2
On 12/22/16 4:30 PM, Robert Haas wrote:
On Thu, Dec 22, 2016 at 4:41 PM, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
On 12/22/16 4:20 AM, amul sul wrote:
• pg_background_detach : This API takes the process id and detach the
background process. Stored worker's session is not dropped until this
called.
When I hear "detach" I think that whatever I'm detaching from is going to
stick around, which I don't think is the case here, right? I'd suggest
pg_background_close() instead.
Uh, I think it is. At least in the original version of this patch,
pg_background_detach() leaves the spawned process running but says
that you don't care to read any results it may generate.
Oh, hmm. So I guess if you do that when the background process is idle
it's the same as a close?
I think we need some way to safeguard against accidental forkbombs for
cases where users aren't intending to leave something running in the
background. There's other reasons to use this besides spawning long
running processes, and I'd certainly want to be able to ensure the
calling function wasn't accidentally leaving things running that it
didn't mean to. (Maybe the patch already does this...)
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Sun, Jan 8, 2017 at 8:50 AM, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
[skipped...]
Oh, hmm. So I guess if you do that when the background process is idle it's
the same as a close?
I think we need some way to safeguard against accidental forkbombs for cases
where users aren't intending to leave something running in the background.
There's other reasons to use this besides spawning long running processes,
and I'd certainly want to be able to ensure the calling function wasn't
accidentally leaving things running that it didn't mean to. (Maybe the patch
already does this...)
The current pg_background patch, built on top of the BackgroundSession code,
takes care of that: the user needs to call pg_background_close() to
gracefully close a previously forked background worker. Even if the user
session that forked the worker exits without calling pg_background_close(),
the background worker is forced to exit with the following log:
ERROR: could not read from message queue: Other process has detached queue
LOG: could not send on message queue: Other process has detached queue
LOG: worker process: background session by PID 61242 (PID 61358) exited with exit code 1
Does this make sense to you?
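As a rough illustration of the lifecycle described above, the following psql sketch starts one worker, queues a query, checks the pending-result counter, reads the result, and closes the worker. It assumes a server built with the background sessions patch plus this extension, and a role that has been granted EXECUTE on the functions (the install script revokes them from PUBLIC). The query text is made up for illustration.

```sql
-- Assumption: the background sessions patch and pg_background are installed.
CREATE EXTENSION pg_background;

-- Start one worker and keep its PID in the psql variable "pid".
SELECT pg_background_start() AS pid \gset

-- Hand a query to the worker.
SELECT pg_background_run(:pid, 'SELECT 42');

-- List the workers started by this session and their unread result counts.
SELECT * FROM pg_background_show();

-- Read the result; the column definition list must match the query's
-- result types (an integer literal is int4).
SELECT * FROM pg_background_result(:pid) AS (answer int4);

-- Gracefully end the worker and forget its PID.
SELECT pg_background_close(:pid);
```

If the session exits before the last statement runs, the worker is expected to terminate on its own with the log messages quoted above, going by the description in this message.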
Regards,
Amul
On 1/9/17 7:22 AM, amul sul wrote:
On Sun, Jan 8, 2017 at 8:50 AM, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
[skipped...]
Oh, hmm. So I guess if you do that when the background process is idle it's
the same as a close?
I think we need some way to safeguard against accidental forkbombs for cases
where users aren't intending to leave something running in the background.
There's other reasons to use this besides spawning long running processes,
and I'd certainly want to be able to ensure the calling function wasn't
accidentally leaving things running that it didn't mean to. (Maybe the patch
already does this...)
The current pg_background patch, built on top of the BackgroundSession code,
takes care of that: the user needs to call pg_background_close() to
gracefully close a previously forked background worker. Even if the user
session that forked the worker exits without calling pg_background_close(),
the background worker is forced to exit with the following log:
ERROR: could not read from message queue: Other process has detached queue
LOG: could not send on message queue: Other process has detached queue
LOG: worker process: background session by PID 61242 (PID 61358) exited with exit code 1
Does this make sense to you?
I'm specifically thinking of autonomous transactions, where you do NOT
want to run the command in the background, but you do want to run it in
another connection.
The other example is running a command in another database. Not the same
as the autonomous transaction use case, but once again you likely don't
want that command running in the background.
Put another way, when you launch a new process in a shell script, you
have to do something specific for that process to run in the background.
My concern here is that AIUI the only way to launch a bgworker is with
it already backgrounded. That makes it much more likely that a bug in
your code produces an unintended fork-bomb (connection-bomb?). That
would be especially true if the function was re-entrant.
Since one of the major points of bgworkers is exactly to run stuff in
the background, while the parent connection is doing something else, I
can certainly understand the default case being to background something,
but I think it'd be good to have a simple way to start a bgworker, have
it do something, and wait until it's done.
Make sense?
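One way the "start a bgworker, have it do something, and wait until it's done" pattern could be layered on top of the patch's SQL API is a small PL/pgSQL wrapper. This is only a sketch under several assumptions: the function name run_in_worker_and_wait is hypothetical, the command is assumed to return no result set (so pg_background_result() yields the command tag as a single text column), and pg_background_result() is assumed to block until the worker has finished, as the regression test comment "wait until everyone finishes" suggests. Whether a worker is still usable after an error is not established in this thread, so the handler simply closes it.

```sql
-- Hypothetical helper, not part of the patch.
CREATE FUNCTION run_in_worker_and_wait(cmd text) RETURNS text
LANGUAGE plpgsql AS $$
DECLARE
    worker_pid int4;
    result_tag text;
BEGIN
    worker_pid := pg_background_start();
    BEGIN
        -- Hand the command to the worker, then wait for its command tag.
        PERFORM pg_background_run(worker_pid, cmd);
        SELECT r.tag INTO result_tag
          FROM pg_background_result(worker_pid) AS r(tag text);
    EXCEPTION WHEN OTHERS THEN
        -- Do not leave the worker behind if anything failed.
        PERFORM pg_background_close(worker_pid);
        RAISE;
    END;
    -- Normal path: the worker is closed before the caller gets control back.
    PERFORM pg_background_close(worker_pid);
    RETURN result_tag;
END;
$$;
```

A call such as SELECT run_in_worker_and_wait('INSERT INTO audit_log VALUES (''done'')'); (with audit_log being a made-up table) would then never leave a worker running after it returns, which is the safeguard being asked for here.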
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
On Tue, Jan 10, 2017 at 7:09 AM, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
On 1/9/17 7:22 AM, amul sul wrote:
On Sun, Jan 8, 2017 at 8:50 AM, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
[skipped...]
Oh, hmm. So I guess if you do that when the background process is idle it's
the same as a close?
I think we need some way to safeguard against accidental forkbombs for cases
where users aren't intending to leave something running in the background.
There's other reasons to use this besides spawning long running processes,
and I'd certainly want to be able to ensure the calling function wasn't
accidentally leaving things running that it didn't mean to. (Maybe the patch
already does this...)
The current pg_background patch, built on top of the BackgroundSession code,
takes care of that: the user needs to call pg_background_close() to
gracefully close a previously forked background worker. Even if the user
session that forked the worker exits without calling pg_background_close(),
the background worker is forced to exit with the following log:
ERROR: could not read from message queue: Other process has detached queue
LOG: could not send on message queue: Other process has detached queue
LOG: worker process: background session by PID 61242 (PID 61358) exited with exit code 1
Does this make sense to you?
I'm specifically thinking of autonomous transactions, where you do NOT want
to run the command in the background, but you do want to run it in another
connection.
The other example is running a command in another database. Not the same as
the autonomous transaction use case, but once again you likely don't want
that command running in the background.
Put another way, when you launch a new process in a shell script, you have
to do something specific for that process to run in the background.
My concern here is that AIUI the only way to launch a bgworker is with it
already backgrounded. That makes it much more likely that a bug in your code
produces an unintended fork-bomb (connection-bomb?). That would be
especially true if the function was re-entrant.
Since one of the major points of bgworkers is exactly to run stuff in the
background, while the parent connection is doing something else, I can
certainly understand the default case being to background something, but I
think it'd be good to have a simple way to start a bgworker, have it do
something, and wait until it's done.
Make sense?
I am not sure that I've understood this completely, but note that in the
current pg_background design we simply launch the worker once and feed it
subsequent queries using the pg_background_run() API.
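To make the "launch once, feed subsequent queries" point concrete, here is a short psql sketch that sends two statements to the same worker, one after the other. The table name audit_log is made up, and the column definition lists assume what pg_background_result() in the patch checks for: a single text column for a statement without a result set, and matching column types otherwise.

```sql
-- Assumption: pg_background is installed and EXECUTE has been granted.
CREATE TABLE audit_log(msg text);

-- One worker for the whole session.
SELECT pg_background_start() AS pid \gset

-- First statement: no result set, so the command tag comes back as text.
SELECT pg_background_run(:pid, $$INSERT INTO audit_log VALUES ('first')$$);
SELECT * FROM pg_background_result(:pid) AS (tag text);

-- Second statement, same worker: count(*) is bigint, so declare int8.
SELECT pg_background_run(:pid, 'SELECT count(*) FROM audit_log');
SELECT * FROM pg_background_result(:pid) AS (n int8);

-- Only now is the worker shut down.
SELECT pg_background_close(:pid);
```

In this shape the number of workers is tied to explicit pg_background_start() calls rather than to the number of queries run.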
Regards,
Amul
The following review has been posted through the commitfest application:
make installcheck-world: tested, passed
Implements feature: tested, passed
Spec compliant: tested, passed
Documentation: tested, failed
I’ll summarize here my thoughts as a reviewer on the current state of pg_background:
1. The current version of the code [1] is fine from my point of view; I have no suggestions for improving it. There is no documentation, but the code is commented.
2. The patch depends on the background sessions patch from the same commitfest.
3. More features could be added, but for v1 there are surely enough.
4. There is some controversy on where the implemented feature should live: in a separate extension (as in this patch), in dblink, in some PL API, in an FDW, or somewhere else. I think that a new extension is an appropriate place for the feature, but I’m not certain.
Summarizing these points, the appropriate statuses for the patch are ‘Ready for committer’ or ‘Rejected’. Between these two I choose ‘Ready for committer’; I think the patch is committable (after bg sessions).
Best regards, Andrey Borodin.
The new status of this patch is: Ready for Committer
On 1/19/17 12:47 PM, Andrey Borodin wrote:
4. There is some controversy on where the implemented feature should live: in a separate extension (as in this patch), in dblink, in some PL API, in an FDW, or somewhere else. I think that a new extension is an appropriate place for the feature, but I’m not certain.
I suppose we should decide first whether we want pg_background as a
separate extension or rather pursue extending dblink as proposed elsewhere.
I don't know if pg_background allows any use case that dblink can't
handle (yet).
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Fri, Jan 27, 2017 at 9:14 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 1/19/17 12:47 PM, Andrey Borodin wrote:
4. There is some controversy on where the implemented feature should live: in a separate extension (as in this patch), in dblink, in some PL API, in an FDW, or somewhere else. I think that a new extension is an appropriate place for the feature, but I’m not certain.
I suppose we should decide first whether we want pg_background as a
separate extension or rather pursue extending dblink as proposed elsewhere.
I don't know if pg_background allows any use case that dblink can't
handle (yet).
For the record, I have no big problem with extending dblink to allow
this instead of adding pg_background. But I think we should try to
get one or the other done in time for this release.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
2017-01-27 19:14 GMT+05:00 Peter Eisentraut <peter.eisentraut@2ndquadrant.com>:
I suppose we should decide first whether we want pg_background as a
separate extension or rather pursue extending dblink as proposed elsewhere.
I don't know if pg_background allows any use case that dblink can't
handle (yet).
pg_background in its current version is just the start of a feature.
The question is: are the two coherent in their desired features? I do not know.
(E.g. will it be possible to copy from stdin in dblink and in possible
incarnations of pg_background functionality?)
2017-01-27 19:38 GMT+05:00 Robert Haas <robertmhaas@gmail.com>:
For the record, I have no big problem with extending dblink to allow
this instead of adding pg_background. But I think we should try to
get one or the other done in time for this release.
+1!
That's why I hesitate between keeping my points to myself and stirring up
controversy... we need to settle it somehow.
Parallelism is a "selling" feature; everything has had to be parallel for
a decade already (don't we have parallel sequential scan yet?).
It's fine to go with dblink, but the dblink docs start with roughly "this
is an outdated substandard feature" (not a direct quote [0]).
What will we add there? "Do not use dblink for linking to databases.
This is the standard for doing concurrency."?
Please excuse my exaggeration. BTW, pg_background does not have docs at all.
[0]: https://www.postgresql.org/docs/devel/static/dblink.html
Best regards, Andrey Borodin.
On Fri, Jan 27, 2017 at 11:38 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Fri, Jan 27, 2017 at 9:14 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 1/19/17 12:47 PM, Andrey Borodin wrote:
4. There is some controversy on where the implemented feature should live: in a separate extension (as in this patch), in dblink, in some PL API, in an FDW, or somewhere else. I think that a new extension is an appropriate place for the feature, but I’m not certain.
I suppose we should decide first whether we want pg_background as a
separate extension or rather pursue extending dblink as proposed elsewhere.
I don't know if pg_background allows any use case that dblink can't
handle (yet).
For the record, I have no big problem with extending dblink to allow
this instead of adding pg_background. But I think we should try to
get one or the other done in time for this release.
Moved to CF 2017-03 as the discussion is not over yet.
--
Michael
On 2/1/17 22:03, Michael Paquier wrote:
On Fri, Jan 27, 2017 at 11:38 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Fri, Jan 27, 2017 at 9:14 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 1/19/17 12:47 PM, Andrey Borodin wrote:
4. There is some controversy on where the implemented feature should live: in a separate extension (as in this patch), in dblink, in some PL API, in an FDW, or somewhere else. I think that a new extension is an appropriate place for the feature, but I’m not certain.
I suppose we should decide first whether we want pg_background as a
separate extension or rather pursue extending dblink as proposed elsewhere.
I don't know if pg_background allows any use case that dblink can't
handle (yet).
For the record, I have no big problem with extending dblink to allow
this instead of adding pg_background. But I think we should try to
get one or the other done in time for this release.
Moved to CF 2017-03 as the discussion is not over yet.
Set to returned with feedback, since the same was done to the background
sessions patch.
I would like to continue working on this for the next release.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services