# 
# old_revision [9a68fa59cb0ca3246f03880664062abb98f1a61a]
# 
# add_file "src/backend/storage/ipc/imsg.c"
#  content [3e84c6372a47612a2fe233fee6b122808135580e]
# 
# add_file "src/include/storage/imsg.h"
#  content [3cf37b12a00b90f65b8393fc5e27c98d772dc22b]
# 
# patch "src/backend/storage/ipc/Makefile"
#  from [71276ab6483aebbb27f87c988d77ab876611f190]
#    to [9a99101d3e8bbfe52c97763db536804e94371828]
# 
# patch "src/backend/storage/ipc/ipci.c"
#  from [177f266b4668190a6ab1f2902305f7b7e577ef8d]
#    to [1971e2122ba4455c8b9784e70059d917fdf4f4c8]
# 
============================================================
--- src/backend/storage/ipc/imsg.c	3e84c6372a47612a2fe233fee6b122808135580e
+++ src/backend/storage/ipc/imsg.c	3e84c6372a47612a2fe233fee6b122808135580e
@@ -0,0 +1,375 @@
+/*-------------------------------------------------------------------------
+ *
+ * imsg.c
+ *    internal messages from process to process sent via shared memory.
+ *
+ *
+ * Copyright (c) 2006, Markus Schiltknecht <markus@bluegap.ch>
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+
+#ifdef HAVE_SYS_FILIO_H
+#include <sys/filio.h>
+#endif
+
+#include <sys/ioctl.h>
+
+#include "postgres.h"
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "storage/imsg.h"
+#include "storage/ipc.h"
+#include "storage/buffer.h"
+#include "storage/spin.h"
+#include "utils/elog.h"
+
+/* global variable pointing to the shmem area */
+IMessageCtlData *IMessageCtl = NULL;
+
+/*
+ * Shared memory for internal messages: size calculation, then initialization.
+ */
+int
+IMessageShmemSize(void)
+{
+	return MAXALIGN(IMessageBufferSize);	/* control struct + message area */
+}
+
+void
+IMessageShmemInit(void)
+{
+	bool		already_set_up;
+	IMessage   *first_slot;
+
+#ifdef IMSG_DEBUG
+	elog(DEBUG3, "IMessageShmemInit(): initializing shared memory");
+#endif
+
+	/* look up our shared struct; nothing more to do if it existed already */
+	IMessageCtl = (IMessageCtlData *) ShmemInitStruct("IMsgCtl",
+							MAXALIGN(IMessageBufferSize), &already_set_up);
+	if (already_set_up)
+		return;
+
+	/* first use: zero the control structure together with the message area */
+	memset(IMessageCtl, 0, MAXALIGN(IMessageBufferSize));
+
+	/* an empty queue has both ends at the first usable slot */
+	first_slot = (IMessage *) IMSG_BUFFER_START(IMessageCtl);
+	IMessageCtl->queue_start = first_slot;
+	IMessageCtl->queue_end = first_slot;
+
+	SpinLockInit(&IMessageCtl->msgs_lck);
+}
+
+/*
+ *   IMessageCreate
+ *
+ * Reserves room for msg_size payload bytes addressed to 'recipient' and
+ * returns the new, still inactive header. Raises ERROR if the queue is full.
+ */
+IMessage*
+IMessageCreate(int recipient, int msg_size)
+{
+	IMessage	   *msg;
+	int				remaining_space;
+
+#ifdef IMSG_DEBUG
+	elog(DEBUG3, "IMessageCreate(): recipient: %d, size: %d",
+		recipient, msg_size);
+#endif
+
+	/* assert a reasonable maximum message size */
+	Assert(msg_size < (MAXALIGN(IMessageBufferSize) / 4));
+
+	START_CRIT_SECTION();
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile IMessageCtlData *imsgctl = IMessageCtl;
+
+		SpinLockAcquire(&imsgctl->msgs_lck);
+
+		/*
+		 * Check if there is enough space for the message plus the
+		 * terminating header
+		 */
+		if (imsgctl->queue_end < imsgctl->queue_start)
+			remaining_space = (int) imsgctl->queue_start -
+							  (int) imsgctl->queue_end;
+		else
+			remaining_space = (int) IMSG_BUFFER_END(imsgctl) -
+							  (int) imsgctl->queue_end;
+
+		if (remaining_space < (MAXALIGN(IMessageBufferSize) / 8))
+		{
+#ifdef IMSG_DEBUG
+			elog(DEBUG3, "IMessageCreate(): cleanup starting");
+#endif
+
+			/* Clean up messages that have been removed. */
+			while (imsgctl->queue_start->recipient == 0)
+			{
+				if (imsgctl->queue_start > imsgctl->queue_end)
+				{
+					if ((imsgctl->queue_start->sender == 0) &&
+						(imsgctl->queue_start->recipient == 0))
+					{
+#ifdef IMSG_DEBUG
+						elog(DEBUG3, "IMessageCreate(): cleanup wrapped");
+#endif
+						imsgctl->queue_start = (IMessage*) IMSG_BUFFER_START(imsgctl);
+						continue;
+					}
+				}
+				else if (imsgctl->queue_start >= imsgctl->queue_end)
+					break;
+
+				imsgctl->queue_start = (IMessage*) (
+					(int) imsgctl->queue_start +
+					IMSG_ALIGN(imsgctl->queue_start->size +
+							   sizeof(IMessage)));
+			}
+
+			/* recalc remaining space */
+			if (imsgctl->queue_end < imsgctl->queue_start)
+				remaining_space = (int) imsgctl->queue_start -
+								  (int) imsgctl->queue_end;
+			else
+				remaining_space = (int) IMSG_BUFFER_END(imsgctl) -
+								  (int) imsgctl->queue_end;
+		}
+
+		if (IMSG_ALIGN(msg_size + 2 * sizeof(IMessage)) < remaining_space)
+		{
+			msg = (IMessage*) imsgctl->queue_end;
+			imsgctl->queue_end = (IMessage*) ((int) imsgctl->queue_end +
+								 IMSG_ALIGN(msg_size + sizeof(IMessage)));
+		}
+		else
+		{
+			remaining_space = (int) imsgctl->queue_start -
+							  (int) IMSG_BUFFER_START(imsgctl);
+#ifdef IMSG_DEBUG
+			elog(DEBUG5, "IMessageCreate:    remaining wrap space: %d",
+				 remaining_space);
+#endif
+
+			/* There is not enough space. But maybe we can wrap around? */
+			if ((imsgctl->queue_end >= imsgctl->queue_start) &&
+				((int) IMSG_BUFFER_START(imsgctl) +
+				IMSG_ALIGN(msg_size + 2 * sizeof(IMessage)) <
+				(int) imsgctl->queue_start))
+			{
+				/* Yes, wrap around */
+#ifdef IMSG_DEBUG
+				elog(DEBUG5, "IMessageCreate: wrapped around.");
+#endif
+				msg = (IMessage*) IMSG_BUFFER_START(imsgctl);
+				imsgctl->queue_end = (IMessage*) ((int) msg +
+									IMSG_ALIGN(msg_size + sizeof(IMessage)));
+			}
+			else
+			{
+				/* elog(ERROR) never returns: unlock and leave crit section first */
+				SpinLockRelease(&imsgctl->msgs_lck);
+				END_CRIT_SECTION();
+				elog(ERROR, "Not enough space within IMessages buffer.");
+				return NULL;	/* not reached, keeps the compiler quiet */
+			}
+		}
+
+		/* initialize the message as inactive */
+		msg->sender = 0;
+		msg->recipient = recipient;
+		msg->size = msg_size;
+
+		/* clean the following block */
+		imsgctl->queue_end->sender = 0;
+		imsgctl->queue_end->recipient = 0;
+
+		/* queue editing finished */
+		SpinLockRelease(&imsgctl->msgs_lck);
+
+#ifdef IMSG_DEBUG
+	elog(DEBUG3, "IMessageCreate(): created at %08X size: %d (next: %08X)",
+		 (int) msg, msg->size, (unsigned int) imsgctl->queue_end);
+#endif
+	}
+	END_CRIT_SECTION();
+
+	return msg;
+}
+
+void
+IMessageForward(IMessage *msg, int new_recipient)
+{
+	msg->recipient = new_recipient;		/* re-address the message in place */
+	msg->sender = 0;
+	/* activation stamps our pid as sender and signals the new recipient */
+	IMessageActivate(msg);
+}
+
+void
+IMessageActivate(IMessage *msg)
+{
+	msg->sender = MyProcPid;	/* non-zero sender makes IMessageCheck() report it */
+
+	/* TODO: use PGPROC to determine if the recipient wants to be signaled,
+	 *       probably we can save that signaling step in certain occasions.
+	 */
+
+	/* wake the recipient; NOTE(review): the result of kill() is not checked */
+	kill(msg->recipient, SIGUSR1);
+}
+
+/*
+ *   IMessageRemove
+ *
+ * Marks a message as removable by setting the recipient to zero. The space
+ * is reclaimed later, when IMessageCreate() runs its cleanup while creating
+ * new messages.
+ */
+void
+IMessageRemove(IMessage *msg)
+{
+	msg->recipient = 0;
+}
+
+/*
+ *   IMessageCheck
+ *
+ * Scans the queue for the first activated message (non-zero sender) that is
+ * addressed to this process. Returns its header, or NULL if there is none. The
+ * message remains in the queue and should be removed by IMessageRemove().
+ */
+IMessage*
+IMessageCheck(void)
+{
+	IMessage	   *msg,
+				   *res;
+
+	res = NULL;
+	START_CRIT_SECTION();
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile IMessageCtlData *imsgctl = IMessageCtl;
+
+		SpinLockAcquire(&imsgctl->msgs_lck);
+
+		/* Walk the queue from queue_start up to queue_end; wrapping might
+		 * be required along the way. */
+		msg = imsgctl->queue_start;
+		while (1)
+		{
+			if (((int) msg >= (int) imsgctl->queue_start) &&
+				((int) imsgctl->queue_start > (int) imsgctl->queue_end))
+			{
+				if ((msg->sender == 0) &&
+					(msg->recipient == 0))
+				{	/* zeroed terminator header: resume at buffer start */
+					msg = (IMessage*) IMSG_BUFFER_START(imsgctl);
+					continue;
+				}
+			}
+			else if (msg >= imsgctl->queue_end)
+				break;
+
+			if ((msg->sender != 0) && (msg->recipient == MyProcPid))
+			{
+				res = msg;
+				break;
+			}
+			/* otherwise step over this message's header and payload */
+			msg = (IMessage*) ((int) msg +
+					IMSG_ALIGN(msg->size + sizeof(IMessage)));
+		}
+
+		SpinLockRelease(&imsgctl->msgs_lck);
+	}
+	END_CRIT_SECTION();
+
+#ifdef IMSG_DEBUG
+	if (res == NULL)
+		elog(DEBUG3, "IMessageCheck(): no new message for %d.", MyProcPid);
+	else
+		elog(DEBUG3, "IMessageCheck(): new message of size %d for %d.",
+				msg->size, MyProcPid);
+#endif
+
+	return res;
+}
+
+/*
+ *   IMessageAwait
+ *
+ * Polls until a message for this process shows up; it stays in the queue.
+ */
+IMessage*
+IMessageAwait(void)
+{
+	struct timeval	timeout;
+	IMessage	   *found;
+
+	for (;;)
+	{
+		found = IMessageCheck();
+		if (found != NULL)
+			break;
+
+		/*
+		 * TODO: we want to wait for signals here; check whether select() or
+		 * pause() is the better (portable) choice. Keep a timeout in either
+		 * case, since a signal could be missed.
+		 */
+		timeout.tv_sec = 2;
+		timeout.tv_usec = 0;
+		select(1, NULL, NULL, NULL, &timeout);
+	}
+
+	return found;
+}
+
+/*
+ *   IMessageGetReadBuffer
+ *
+ * Wraps the payload of msg in a palloc'd buffer, fill_size preset to its size.
+ */
+buffer *
+IMessageGetReadBuffer(IMessage *msg)
+{
+	buffer	   *reader;
+
+	Assert(msg);
+	Assert(msg->size > 0);
+
+	reader = palloc(sizeof(buffer));
+	init_buffer(reader, IMSG_DATA(msg), msg->size, NULL, NULL, NULL);
+	reader->fill_size = msg->size;
+	return reader;
+}
+
+/*
+ *   IMessageGetWriteBuffer
+ *
+ * Wraps the payload area of msg in a freshly palloc'd buffer for writing.
+ */
+buffer *
+IMessageGetWriteBuffer(IMessage *msg)
+{
+	buffer	   *writer;
+
+	writer = palloc(sizeof(buffer));
+	init_buffer(writer, IMSG_DATA(msg), msg->size, NULL, NULL, NULL);
+	return writer;
+}
+
+void
+IMessageFreeBuffer(buffer *b)
+{
+	pfree(b);	/* releases the buffer struct palloc'd by IMessageGet*Buffer() */
+}
============================================================
--- src/include/storage/imsg.h	3cf37b12a00b90f65b8393fc5e27c98d772dc22b
+++ src/include/storage/imsg.h	3cf37b12a00b90f65b8393fc5e27c98d772dc22b
@@ -0,0 +1,85 @@
+/*-------------------------------------------------------------------------
+ *
+ * imsg.h
+ *    internal messages from process to process sent via shared memory.
+ *
+ *
+ * Copyright (c) 2006, Markus Schiltknecht <markus@bluegap.ch>
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef IMSG_H
+#define IMSG_H
+
+#include <sys/types.h>
+#include "storage/spin.h"
+#include "storage/buffer.h"
+
+/* TODO: replace with GUC variable to be configurable */
+#define IMessageBufferSize 8388608		/* 8 MB */
+
+/* round up to the 8 byte message alignment, at full size_t width */
+#define IMSG_ALIGN(size) (((size) + 7) & ~((size_t) 7))
+
+/* aligned bounds of the message area behind the control struct, as size_t */
+#define IMSG_BUFFER_START(imsgctl) ((size_t) \
+			(IMSG_ALIGN((size_t) (imsgctl) + sizeof(IMessageCtlData))))
+
+#define IMSG_BUFFER_END(imsgctl) ((size_t) \
+			(IMSG_ALIGN((size_t) (imsgctl) + MAXALIGN(IMessageBufferSize))))
+
+/* get a data pointer from the header */
+#define IMSG_DATA(imsg) ((void *) ((char *) (imsg) + sizeof(IMessage)))
+
+/*
+ * Message descriptor (header) stored directly in front of each payload
+ */
+typedef struct
+{
+	/* pid of the sender; zero means the message is not yet activated */
+	pid_t		sender;
+
+	/* pid of the recipient; zero means the message has been removed */
+	pid_t		recipient;
+
+	/* payload size in bytes, following but not including this header */
+	int			size;
+} IMessage;
+
+/*
+ * Control structure at the start of the shared-memory message pool.
+ */
+typedef struct
+{
+	/* currently active messages (NOTE(review): imsg.c only ever zeroes this) */
+	unsigned int		count_messages;
+
+	/* start of messages within the cycling queue */
+	IMessage		   *queue_start;
+
+	/* next free place, just after the last message */
+	IMessage		   *queue_end;
+
+	/* spinlock taken while creating messages or scanning the queue */
+	slock_t				msgs_lck;
+} IMessageCtlData;
+
+/* pointer to the shared memory control area; defined in imsg.c */
+extern IMessageCtlData *IMessageCtl;
+
+/* shared memory setup, then routines to send and receive internal messages */
+extern int IMessageShmemSize(void);
+extern void IMessageShmemInit(void);
+extern IMessage* IMessageCreate(int recipient, int msg_size);
+extern void IMessageForward(IMessage *msg, int new_recipient);
+extern void IMessageActivate(IMessage *msg);
+extern void IMessageRemove(IMessage *msg);
+extern IMessage* IMessageCheck(void);
+extern IMessage* IMessageAwait(void);
+/* palloc'd buffer wrappers around a message's payload */
+extern buffer *IMessageGetReadBuffer(IMessage *msg);
+extern buffer *IMessageGetWriteBuffer(IMessage *msg);
+extern void IMessageFreeBuffer(buffer *b);
+
+#endif   /* IMSG_H */
============================================================
--- src/backend/storage/ipc/Makefile	71276ab6483aebbb27f87c988d77ab876611f190
+++ src/backend/storage/ipc/Makefile	9a99101d3e8bbfe52c97763db536804e94371828
@@ -16,7 +16,7 @@ OBJS = ipc.o ipci.o pmsignal.o procarray
 endif
 
 OBJS = ipc.o ipci.o pmsignal.o procarray.o shmem.o shmqueue.o \
-	sinval.o sinvaladt.o
+	sinval.o sinvaladt.o imsg.o buffer.o
 
 all: SUBSYS.o
 
============================================================
--- src/backend/storage/ipc/ipci.c	177f266b4668190a6ab1f2902305f7b7e577ef8d
+++ src/backend/storage/ipc/ipci.c	1971e2122ba4455c8b9784e70059d917fdf4f4c8
@@ -24,6 +24,7 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
 #include "storage/freespace.h"
+#include "storage/imsg.h"
 #include "storage/ipc.h"
 #include "storage/pg_shmem.h"
 #include "storage/pmsignal.h"
@@ -110,6 +111,7 @@ CreateSharedMemoryAndSemaphores(bool mak
 		size = add_size(size, FreeSpaceShmemSize());
 		size = add_size(size, BgWriterShmemSize());
 		size = add_size(size, BTreeShmemSize());
+		size = add_size(size, IMessageShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -178,6 +180,7 @@ CreateSharedMemoryAndSemaphores(bool mak
 	SUBTRANSShmemInit();
 	TwoPhaseShmemInit();
 	MultiXactShmemInit();
+	IMessageShmemInit();
 	InitBufferPool();
 
 	/*
