From 53094143e3c1fc9a8090cce66e73e26d58c67b93 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 19 Feb 2016 12:07:51 -0800
Subject: [PATCH v7 1/2] Basic obstruction-free single producer, multiple
 consumer ringbuffer.

This is pretty darn limited, supporting only small queues - but could
easily be improved.
---
 src/backend/lib/Makefile  |   3 +-
 src/backend/lib/ringbuf.c | 161 ++++++++++++++++++++++++++++++++++++++
 src/include/lib/ringbuf.h |  72 +++++++++++++++++
 3 files changed, 235 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/lib/ringbuf.c
 create mode 100644 src/include/lib/ringbuf.h

diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index 3c1ee1df83a..b0a63fba309 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -13,6 +13,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = binaryheap.o bipartite_match.o bloomfilter.o dshash.o hyperloglog.o \
-       ilist.o integerset.o knapsack.o pairingheap.o rbtree.o stringinfo.o
+       ilist.o integerset.o knapsack.o pairingheap.o rbtree.o ringbuf.o \
+       stringinfo.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/lib/ringbuf.c b/src/backend/lib/ringbuf.c
new file mode 100644
index 00000000000..3de2a4977d8
--- /dev/null
+++ b/src/backend/lib/ringbuf.c
@@ -0,0 +1,161 @@
+/*-------------------------------------------------------------------------
+ *
+ * ringbuf.c
+ *
+ *	  Single producer, multiple consumer ringbuffer where consumption is
+ *	  obstruction-free (i.e. no progress guarantee, but a consumer that is
+ *	  stopped will not block progress).
+ *
+ * Implemented by essentially using an optimistic lock on the read side.
+ *
+ * XXX: It'd be nice if we could modify this so there's variants for push/pop
+ * that work for different concurrency scenarios. E.g. having spsc_push(),
+ * spmc_push(), ... - that'd avoid having to use different interfaces for
+ * different needs.
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/lib/ringbuf.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "lib/ringbuf.h"
+#include "storage/proc.h"
+
+static inline uint32
+ringbuf_backendid(ringbuf *rb, uint32 pos)
+{
+	return pos & 0xffff0000;
+}
+
+uint32
+ringbuf_elements(ringbuf *rb)
+{
+	uint32 read_off = ringbuf_pos(rb, pg_atomic_read_u32(&rb->read_state));
+	uint32 write_off = ringbuf_pos(rb, rb->write_off);
+
+	/* not wrapped around */
+	if (read_off <= write_off)
+	{
+		return write_off - read_off;
+	}
+
+	/* wrapped around */
+	return (rb->size - read_off) + write_off;
+}
+
+size_t
+ringbuf_size(size_t nelems)
+{
+	Assert(nelems <= 0x0000FFFF);
+	return sizeof(ringbuf) + sizeof(void *) * nelems;
+}
+
+/*
+ * Memory needs to be externally allocated and be at least
+ * ringbuf_size(nelems) large.
+ */
+ringbuf *
+ringbuf_create(void *target, size_t nelems)
+{
+	ringbuf *rb = (ringbuf *) target;
+
+	Assert(nelems <= 0x0000FFFF);
+
+	memset(target, 0, ringbuf_size(nelems));
+
+	rb->size = nelems;
+	pg_atomic_init_u32(&rb->read_state, 0);
+	rb->write_off = 0;
+
+	return rb;
+}
+
+bool
+ringbuf_push(ringbuf *rb, void *data)
+{
+	uint32 read_off = pg_atomic_read_u32(&rb->read_state);
+
+	/*
+	 * Check if full - can be outdated, but that's ok. New readers are just
+	 * going to further consume elements, never cause the buffer to become
+	 * full.
+	 */
+	if (ringbuf_pos(rb, read_off)
+		== ringbuf_pos(rb, ringbuf_advance_pos(rb, rb->write_off)))
+	{
+		return false;
+	}
+
+	rb->elements[ringbuf_pos(rb, rb->write_off)] = data;
+
+	/*
+	 * The write adding the data needs to be visible before the corresponding
+	 * increase of write_off is visible.
+	 */
+	pg_write_barrier();
+
+	rb->write_off = ringbuf_advance_pos(rb, rb->write_off);
+
+	return true;
+}
+
+
/*
 * Remove the oldest element from the ringbuffer, returning it in *data.
 * Returns false, leaving *data untouched, if the buffer is empty.
 *
 * Safe to call from multiple consumers concurrently (and concurrently
 * with the producer).  Consumption is obstruction-free: a consumer that
 * stalls between the two compare-exchanges below cannot block others —
 * competing consumers simply restamp read_state themselves, which makes
 * the stalled consumer's second CAS fail and retry.
 */
bool
ringbuf_pop(ringbuf *rb, void **data)
{
	void *ret;
	uint32 mybackend = MyProc->backendId;

	/* the backend id has to fit into the upper 16 bits of read_state */
	Assert((mybackend & 0x0000ffff) == mybackend);

	while (true)
	{
		uint32 read_state = pg_atomic_read_u32(&rb->read_state);
		uint32 read_off = ringbuf_pos(rb, read_state);
		uint32 old_read_state = read_state;

		/* check if empty - can be outdated, but that's ok */
		if (read_off == ringbuf_pos(rb, rb->write_off))
			return false;

		/*
		 * Add our backend id to the position, to detect wrap around: if
		 * other consumers pop entries while we are stalled below, the id
		 * and/or offset in read_state changes, so our second
		 * compare-exchange fails and we retry instead of consuming a slot
		 * that was already handed out (classic ABA hazard).
		 *
		 * XXX: Skip if the ID already is ours. That's probably likely enough
		 * to warrant the additional branch.
		 */
		read_state = (read_state & 0x0000ffff) | mybackend << 16;

		/*
		 * Mix the reader position into the current read_off, otherwise
		 * unchanged. If the offset changed since, retry from start.
		 *
		 * NB: This also serves as the read barrier pairing with the write
		 * barrier in ringbuf_push().
		 */
		if (!pg_atomic_compare_exchange_u32(&rb->read_state, &old_read_state,
											read_state))
			continue;
		old_read_state = read_state; /* with backend id mixed in */

		/* finally read the data */
		ret = rb->elements[read_off];

		/* compute next offset */
		read_state = ringbuf_advance_pos(rb, read_state);

		/*
		 * Publish the advanced read position.  If this CAS fails, another
		 * consumer claimed the slot meanwhile and ret may be garbage (the
		 * producer could even have overwritten the slot); discard it and
		 * retry from the top.
		 */
		if (pg_atomic_compare_exchange_u32(&rb->read_state, &old_read_state,
										   read_state))
			break;
	}

	*data = ret;

	return true;
}
diff --git a/src/include/lib/ringbuf.h b/src/include/lib/ringbuf.h
new file mode 100644
index 00000000000..3be450bb8f8
--- /dev/null
+++ b/src/include/lib/ringbuf.h
@@ -0,0 +1,72 @@
+/*
+ * ringbuf.h
+ *
+ * Single writer, multiple reader lockless & obstruction-free ringbuffer.
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * src/include/lib/ringbuf.h
+ */
+#ifndef RINGBUF_H
+#define RINGBUF_H
+
+#include "port/atomics.h"
+
+typedef struct ringbuf
+{
+	uint32 size;
+
+	/* 16 bit reader id, 16 bit offset */
+	/* XXX: probably should be on separate cachelines */
+	pg_atomic_uint32 read_state;
+	uint32_t write_off;
+
+	void *elements[FLEXIBLE_ARRAY_MEMBER];
+} ringbuf;
+
+size_t ringbuf_size(size_t nelems);
+
+ringbuf *ringbuf_create(void *target, size_t nelems);
+
+static inline uint32
+ringbuf_pos(ringbuf *rb, uint32 pos)
+{
+	/*
+	 * XXX: replacing rb->size with a bitmask op would avoid expensive
+	 * divisions. Requiring a pow2 size seems ok.
+	 */
+	return (pos & 0x0000ffff) % rb->size;
+}
+
+/*
+ * Compute the new offset, slightly complicated by the fact that we only want
+ * to modify the lower 16 bits.
+ */
+static inline uint32
+ringbuf_advance_pos(ringbuf *rb, uint32 pos)
+{
+	return ((ringbuf_pos(rb, pos) + 1) & 0x0000FFFF) | (pos & 0xFFFF0000);
+}
+
+static inline bool
+ringbuf_empty(ringbuf *rb)
+{
+	uint32 read_state = pg_atomic_read_u32(&rb->read_state);
+
+	return ringbuf_pos(rb, read_state) == ringbuf_pos(rb, rb->write_off);
+}
+
+static inline bool
+ringbuf_full(ringbuf *rb)
+{
+	uint32 read_state = pg_atomic_read_u32(&rb->read_state);
+
+	return ringbuf_pos(rb, read_state) ==
+		ringbuf_pos(rb, ringbuf_advance_pos(rb, rb->write_off));
+}
+
+uint32 ringbuf_elements(ringbuf *rb);
+bool ringbuf_push(ringbuf *rb, void *data);
+bool ringbuf_pop(ringbuf *rb, void **data);
+
+#endif
-- 
2.22.0.dirty

