From 5840a0f2c7a7546c98485382e2c173b6be943f54 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Wed, 28 May 2014 19:30:21 -0400
Subject: [PATCH 3/6] Support frontend-backend protocol communication using a
 shm_mq.

A background worker can use pq_redirect_to_shm_mq() to direct protocol
that would normally be sent to the frontend to a shm_mq so that another
process may read them.

The receiving process may use pq_parse_errornotice() to parse an
ErrorResponse or NoticeResponse from the background worker and, if
it wishes, ThrowErrorData() to propagate the error (with or without
further modification).
---
 src/backend/libpq/Makefile       |    2 +-
 src/backend/libpq/pqcomm.c       |   13 +++
 src/backend/libpq/pqmq.c         |  202 ++++++++++++++++++++++++++++++++++++++
 src/backend/utils/adt/numutils.c |    2 +-
 src/backend/utils/error/elog.c   |   51 ++++++++++
 src/include/libpq/libpq.h        |    3 +
 src/include/libpq/pqmq.h         |   22 +++++
 src/include/utils/builtins.h     |    2 +-
 src/include/utils/elog.h         |    1 +
 9 files changed, 295 insertions(+), 3 deletions(-)
 create mode 100644 src/backend/libpq/pqmq.c
 create mode 100644 src/include/libpq/pqmq.h

diff --git a/src/backend/libpq/Makefile b/src/backend/libpq/Makefile
index e929864..f548b2f 100644
--- a/src/backend/libpq/Makefile
+++ b/src/backend/libpq/Makefile
@@ -15,6 +15,6 @@ include $(top_builddir)/src/Makefile.global
 # be-fsstubs is here for historical reasons, probably belongs elsewhere
 
 OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o pqcomm.o \
-       pqformat.o pqsignal.o
+       pqformat.o pqmq.o pqsignal.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 605d891..b02fb0e 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -102,6 +102,9 @@
 int			Unix_socket_permissions;
 char	   *Unix_socket_group;
 
+/* Hooks for protocol message redirect */
+int	(*pq_putmessage_hook)(char msgtype, const char *s, size_t len);
+int	(*pq_flush_hook)(void);
 
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
@@ -792,6 +795,11 @@ TouchSocketFiles(void)
 static void
 pq_set_nonblocking(bool nonblocking)
 {
+	if (MyProcPort == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+				 errmsg("there is no client connection")));
+
 	if (MyProcPort->noblock == nonblocking)
 		return;
 
@@ -1220,6 +1228,9 @@ pq_flush(void)
 {
 	int			res;
 
+	if (pq_flush_hook != NULL)
+		return pq_flush_hook();
+
 	/* No-op if reentrant call */
 	if (PqCommBusy)
 		return 0;
@@ -1378,6 +1389,8 @@ pq_is_send_pending(void)
 int
 pq_putmessage(char msgtype, const char *s, size_t len)
 {
+	if (pq_putmessage_hook != NULL)
+		return pq_putmessage_hook(msgtype, s, len);
 	if (DoingCopyOut || PqCommBusy)
 		return 0;
 	PqCommBusy = true;
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
new file mode 100644
index 0000000..c29cc7c
--- /dev/null
+++ b/src/backend/libpq/pqmq.c
@@ -0,0 +1,202 @@
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.c
+ *	  Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *	src/backend/libpq/pqmq.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+
+static shm_mq *pq_mq;
+static shm_mq_handle *pq_mq_handle;
+static bool pq_mq_busy = false;
+
+static int pq_putmessage_mq(char msgtype, const char *s, size_t len);
+static int pq_flush_mq(void);
+
+/*
+ * Arrange to redirect frontend/backend protocol messages to a shared-memory
+ * message queue.
+ */
+void
+pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
+{
+	pq_putmessage_hook = pq_putmessage_mq;
+	pq_flush_hook = pq_flush_mq;
+	pq_mq = mq;
+	pq_mq_handle = mqh;
+	whereToSendOutput = DestRemote;
+	FrontendProtocol = PG_PROTOCOL_LATEST;
+}
+
+/*
+ * Transmit a libpq protocol message to the shared memory message queue
+ * selected via pq_mq_handle.  We don't include a length word, because the
+ * receiver will know the length of the message from shm_mq_receive().
+ */
+static int
+pq_putmessage_mq(char msgtype, const char *s, size_t len)
+{
+	shm_mq_iovec	iov[2];
+	shm_mq_result	result;
+
+	/*
+	 * If we're sending a message, and we have to wait because the
+	 * queue is full, and then we get interrupted, and that interrupt
+	 * results in trying to send another message, we respond by detaching
+	 * the queue.  There's no way to return to the original context, but
+	 * even if there were, just queueing the message would amount to
+	 * indefinitely postponing the response to the interrupt.  So we do
+	 * this instead.
+	 */
+	if (pq_mq_busy)
+	{
+		if (pq_mq != NULL)
+			shm_mq_detach(pq_mq);
+		pq_mq = NULL;
+		return EOF;
+	}
+
+	pq_mq_busy = true;
+
+	iov[0].data = &msgtype;
+	iov[0].len = 1;
+	iov[1].data = s;
+	iov[1].len = len;
+
+	Assert(pq_mq_handle != NULL);
+	result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+	pq_mq_busy = false;
+
+	Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
+	if (result != SHM_MQ_SUCCESS)
+		return EOF;
+	return 0;
+}
+
+/*
+ * Flush is not required.
+ */
+static int
+pq_flush_mq(void)
+{
+	return 0;
+}
+
+/*
+ * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
+ * structure with the results.
+ */
+void
+pq_parse_errornotice(StringInfo msg, ErrorData *edata)
+{
+	/* Initialize edata with reasonable defaults. */
+	MemSet(edata, 0, sizeof(ErrorData));
+	edata->elevel = ERROR;
+	edata->assoc_context = CurrentMemoryContext;
+
+	/* Loop over fields and extract each one. */
+	for (;;)
+	{
+		char	code = pq_getmsgbyte(msg);
+		const char *value;
+
+		if (code == '\0')
+		{
+			pq_getmsgend(msg);
+			break;
+		}
+	 	value = pq_getmsgstring(msg);
+
+		switch (code)
+		{
+			case PG_DIAG_SEVERITY:
+				if (strcmp(value, "DEBUG") == 0)
+					edata->elevel = DEBUG1;	/* or some other DEBUG level */
+				else if (strcmp(value, "LOG") == 0)
+					edata->elevel = LOG;	/* can't be COMMERROR */
+				else if (strcmp(value, "INFO") == 0)
+					edata->elevel = INFO;
+				else if (strcmp(value, "NOTICE") == 0)
+					edata->elevel = NOTICE;
+				else if (strcmp(value, "WARNING") == 0)
+					edata->elevel = WARNING;
+				else if (strcmp(value, "ERROR") == 0)
+					edata->elevel = ERROR;
+				else if (strcmp(value, "FATAL") == 0)
+					edata->elevel = FATAL;
+				else if (strcmp(value, "PANIC") == 0)
+					edata->elevel = PANIC;
+				else
+					elog(ERROR, "unknown error severity");
+				break;
+			case PG_DIAG_SQLSTATE:
+				if (strlen(value) != 5)
+					elog(ERROR, "malformed sql state");
+				edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
+												  value[3], value[4]);
+				break;
+			case PG_DIAG_MESSAGE_PRIMARY:
+				edata->message = pstrdup(value);
+				break;
+			case PG_DIAG_MESSAGE_DETAIL:
+				edata->detail = pstrdup(value);
+				break;
+			case PG_DIAG_MESSAGE_HINT:
+				edata->hint = pstrdup(value);
+				break;
+			case PG_DIAG_STATEMENT_POSITION:
+				edata->cursorpos = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_INTERNAL_POSITION:
+				edata->internalpos = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_INTERNAL_QUERY:
+				edata->internalquery = pstrdup(value);
+				break;
+			case PG_DIAG_CONTEXT:
+				edata->context = pstrdup(value);
+				break;
+			case PG_DIAG_SCHEMA_NAME:
+				edata->schema_name = pstrdup(value);
+				break;
+			case PG_DIAG_TABLE_NAME:
+				edata->table_name = pstrdup(value);
+				break;
+			case PG_DIAG_COLUMN_NAME:
+				edata->column_name = pstrdup(value);
+				break;
+			case PG_DIAG_DATATYPE_NAME:
+				edata->datatype_name = pstrdup(value);
+				break;
+			case PG_DIAG_CONSTRAINT_NAME:
+				edata->constraint_name = pstrdup(value);
+				break;
+			case PG_DIAG_SOURCE_FILE:
+				edata->filename = pstrdup(value);
+				break;
+			case PG_DIAG_SOURCE_LINE:
+				edata->lineno = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_SOURCE_FUNCTION:
+				edata->funcname = pstrdup(value);
+				break;
+			default:
+				elog(ERROR, "unknown error field: %d", (int) code);
+				break;
+		}
+	}
+}
diff --git a/src/backend/utils/adt/numutils.c b/src/backend/utils/adt/numutils.c
index ca5a8a5..1d13363 100644
--- a/src/backend/utils/adt/numutils.c
+++ b/src/backend/utils/adt/numutils.c
@@ -34,7 +34,7 @@
  * overflow.
  */
 int32
-pg_atoi(char *s, int size, int c)
+pg_atoi(const char *s, int size, int c)
 {
 	long		l;
 	char	   *badp;
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 32a9663..2316464 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1577,6 +1577,57 @@ FlushErrorState(void)
 }
 
 /*
+ * ThrowErrorData --- report an error described by an ErrorData structure
+ *
+ * This is intended to be used to re-report errors originally thrown by
+ * background worker processes and then propagated (with or without
+ * modification) to the backend responsible for them.
+ */
+void
+ThrowErrorData(ErrorData *edata)
+{
+	ErrorData *newedata;
+	MemoryContext	oldcontext;
+
+	if (!errstart(edata->elevel, edata->filename, edata->lineno,
+				  edata->funcname, NULL))
+		return;
+
+	newedata = &errordata[errordata_stack_depth];
+	oldcontext = MemoryContextSwitchTo(edata->assoc_context);
+
+	/* Copy the supplied fields to the error stack. */
+	if (edata->sqlerrcode > 0)
+		newedata->sqlerrcode = edata->sqlerrcode;
+	if (edata->message)
+		newedata->message = pstrdup(edata->message);
+	if (edata->detail)
+		newedata->detail = pstrdup(edata->detail);
+	if (edata->detail_log)
+		newedata->detail_log = pstrdup(edata->detail_log);
+	if (edata->hint)
+		newedata->hint = pstrdup(edata->hint);
+	if (edata->context)
+		newedata->context = pstrdup(edata->context);
+	if (edata->schema_name)
+		newedata->schema_name = pstrdup(edata->schema_name);
+	if (edata->table_name)
+		newedata->table_name = pstrdup(edata->table_name);
+	if (edata->column_name)
+		newedata->column_name = pstrdup(edata->column_name);
+	if (edata->datatype_name)
+		newedata->datatype_name = pstrdup(edata->datatype_name);
+	if (edata->constraint_name)
+		newedata->constraint_name = pstrdup(edata->constraint_name);
+	if (edata->internalquery)
+		newedata->internalquery = pstrdup(edata->internalquery);
+
+	MemoryContextSwitchTo(oldcontext);
+
+	errfinish(0);
+}
+
+/*
  * ReThrowError --- re-throw a previously copied error
  *
  * A handler can do CopyErrorData/FlushErrorState to get out of the error
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index e4e354d..eee80b3 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -75,6 +75,9 @@ extern char *ssl_key_file;
 extern char *ssl_ca_file;
 extern char *ssl_crl_file;
 
+extern int	(*pq_putmessage_hook)(char msgtype, const char *s, size_t len);
+extern int  (*pq_flush_hook)(void);
+
 extern int	secure_initialize(void);
 extern bool secure_loaded_verify_locations(void);
 extern void secure_destroy(void);
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
new file mode 100644
index 0000000..6bb24d9
--- /dev/null
+++ b/src/include/libpq/pqmq.h
@@ -0,0 +1,22 @@
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.h
+ *	  Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/pqmq.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PQMQ_H
+#define PQMQ_H
+
+#include "storage/shm_mq.h"
+
+extern void	pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+
+extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
+
+#endif   /* PQMQ_H */
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index bbb5d39..2e8783f 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -285,7 +285,7 @@ extern Datum current_schema(PG_FUNCTION_ARGS);
 extern Datum current_schemas(PG_FUNCTION_ARGS);
 
 /* numutils.c */
-extern int32 pg_atoi(char *s, int size, int c);
+extern int32 pg_atoi(const char *s, int size, int c);
 extern void pg_itoa(int16 i, char *a);
 extern void pg_ltoa(int32 l, char *a);
 extern void pg_lltoa(int64 ll, char *a);
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 92073be..87438b8 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -415,6 +415,7 @@ extern ErrorData *CopyErrorData(void);
 extern void FreeErrorData(ErrorData *edata);
 extern void FlushErrorState(void);
 extern void ReThrowError(ErrorData *edata) __attribute__((noreturn));
+extern void ThrowErrorData(ErrorData *edata);
 extern void pg_re_throw(void) __attribute__((noreturn));
 
 extern char *GetErrorContextStack(void);
-- 
1.7.9.6 (Apple Git-31.1)

