From 82c93205d2dba8cf82c2753dc0b90540810f0a57 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Thu, 17 Jul 2014 14:51:27 -0400
Subject: [PATCH 1/6] Extend shm_mq API with new functions shm_mq_sendv and
 shm_mq_set_handle.

shm_mq_sendv sends a message to the queue assembled from multiple
locations.  This is expected to be used by forthcoming patches to
allow frontend/backend protocol messages to be sent via shm_mq, but
might be useful for other purposes as well.

shm_mq_set_handle associates a BackgroundWorkerHandle with an
already-existing shm_mq_handle.  This solves a timing problem when
creating a shm_mq to communicate with a newly-launched background
worker: if you attach to the queue first, and the background worker
fails to start, you might block forever trying to do I/O on the
queue; but if you start the background worker first, but then die
before attaching to the queue, the background worrker might block
forever trying to do I/O on the queue.  This lets you attach before
starting the worker (so that the worker is protected) and then associate
the BackgroundWorkerHandle later (so that you are also protected).
---
 src/backend/storage/ipc/shm_mq.c |  126 +++++++++++++++++++++++++++++++++-----
 src/include/storage/shm_mq.h     |   14 ++++-
 2 files changed, 124 insertions(+), 16 deletions(-)

diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index d96627a..90df593 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -139,7 +139,7 @@ struct shm_mq_handle
 };
 
 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
-				  void *data, bool nowait, Size *bytes_written);
+				  const void *data, bool nowait, Size *bytes_written);
 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
 					 bool nowait, Size *nbytesp, void **datap);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
@@ -301,7 +301,33 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 }
 
 /*
+ * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
+ * been passed to shm_mq_attach.
+ */
+void
+shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
+{
+	Assert(mqh->mqh_handle == NULL);
+	mqh->mqh_handle = handle;
+}
+
+/*
  * Write a message into a shared message queue.
+ */
+shm_mq_result
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+{
+	shm_mq_iovec	iov;
+
+	iov.data = data;
+	iov.len = nbytes;
+
+	return shm_mq_sendv(mqh, &iov, 1, nowait);
+}
+
+/*
+ * Write a message into a shared message queue, gathered from multiple
+ * addresses.
  *
  * When nowait = false, we'll wait on our process latch when the ring buffer
  * fills up, and then continue writing once the receiver has drained some data.
@@ -315,14 +341,22 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
  * the length or payload will corrupt the queue.)
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 {
 	shm_mq_result res;
 	shm_mq	   *mq = mqh->mqh_queue;
+	Size		nbytes = 0;
 	Size		bytes_written;
+	int			i;
+	int			which_iov = 0;
+	Size		offset;
 
 	Assert(mq->mq_sender == MyProc);
 
+	/* Compute total size of write. */
+	for (i = 0; i < iovcnt; ++i)
+		nbytes += iov[i].len;
+
 	/* Try to write, or finish writing, the length word into the buffer. */
 	while (!mqh->mqh_length_word_complete)
 	{
@@ -348,18 +382,80 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
 
 	/* Write the actual data bytes into the buffer. */
 	Assert(mqh->mqh_partial_bytes <= nbytes);
-	res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes,
-							((char *) data) + mqh->mqh_partial_bytes,
-							nowait, &bytes_written);
-	if (res == SHM_MQ_WOULD_BLOCK)
-		mqh->mqh_partial_bytes += bytes_written;
-	else
+	offset = mqh->mqh_partial_bytes;
+	do
 	{
-		mqh->mqh_partial_bytes = 0;
-		mqh->mqh_length_word_complete = false;
-	}
-	if (res != SHM_MQ_SUCCESS)
-		return res;
+		Size	chunksize;
+
+		/* Figure out which bytes need to be sent next. */
+		if (offset >= iov[which_iov].len)
+		{
+			offset -= iov[which_iov].len;
+			++which_iov;
+			if (which_iov >= iovcnt)
+				break;
+			continue;
+		}
+
+		/*
+		 * We want to avoid copying the data if at all possible, but every
+		 * chunk of bytes we write into the queue has to be MAXALIGN'd,
+		 * except the last.  Thus, if a chunk other than the last one ends
+		 * on a non-MAXALIGN'd boundary, we have to combine the tail end of
+		 * its data with data from one or more following chunks until we
+		 * either reach the last chunk or accumulate a number of bytes which
+		 * is MAXALIGN'd.
+		 */
+		if (which_iov + 1 < iovcnt &&
+			offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
+		{
+			char	tmpbuf[MAXIMUM_ALIGNOF];
+			int		j = 0;
+
+			for (;;)
+			{
+				if (offset < iov[which_iov].len)
+				{
+					tmpbuf[j] = iov[which_iov].data[offset];
+					j++;
+					offset++;
+					if (j == MAXIMUM_ALIGNOF)
+						break;
+				}
+				else
+				{
+					offset -= iov[which_iov].len;
+					which_iov++;
+					if (which_iov >= iovcnt)
+						break;
+				}
+			}
+			res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
+			mqh->mqh_partial_bytes += bytes_written;
+			if (res != SHM_MQ_SUCCESS)
+				return res;
+			continue;
+		}
+
+		/*
+		 * If this is the last chunk, we can write all the data, even if it
+		 * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
+		 * MAXALIGN_DOWN the write size.
+		 */
+		chunksize = iov[which_iov].len - offset;
+		if (which_iov + 1 < iovcnt)
+			chunksize = MAXALIGN_DOWN(chunksize);
+		res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
+								nowait, &bytes_written);
+		mqh->mqh_partial_bytes += bytes_written;
+		offset += bytes_written;
+		if (res != SHM_MQ_SUCCESS)
+			return res;
+	} while (mqh->mqh_partial_bytes < nbytes);
+
+	/* Reset for next message. */
+	mqh->mqh_partial_bytes = 0;
+	mqh->mqh_length_word_complete = false;
 
 	/* Notify receiver of the newly-written data, and return. */
 	return shm_mq_notify_receiver(mq);
@@ -653,8 +749,8 @@ shm_mq_detach(shm_mq *mq)
  * Write bytes into a shared message queue.
  */
 static shm_mq_result
-shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait,
-				  Size *bytes_written)
+shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
+				  bool nowait, Size *bytes_written)
 {
 	shm_mq	   *mq = mqh->mqh_queue;
 	Size		sent = 0;
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 5bae380..063400a 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -25,6 +25,13 @@ typedef struct shm_mq shm_mq;
 struct shm_mq_handle;
 typedef struct shm_mq_handle shm_mq_handle;
 
+/* Descriptors for a single write spanning multiple locations. */
+typedef struct
+{
+	const char  *data;
+	Size	len;
+} shm_mq_iovec;
+
 /* Possible results of a send or receive operation. */
 typedef enum
 {
@@ -52,12 +59,17 @@ extern PGPROC *shm_mq_get_sender(shm_mq *);
 extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
 			  BackgroundWorkerHandle *handle);
 
+/* Associate worker handle with shm_mq. */
+extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
+
 /* Break connection. */
 extern void shm_mq_detach(shm_mq *);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-			Size nbytes, void *data, bool nowait);
+			Size nbytes, const void *data, bool nowait);
+extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
+			shm_mq_iovec *iov, int iovcnt, bool nowait);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
 			   Size *nbytesp, void **datap, bool nowait);
 
-- 
1.7.9.6 (Apple Git-31.1)

