From 587c0e3eb954f8da8eb77f101ac714ddb7c876bf Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 24 Feb 2020 19:05:00 +1300
Subject: [PATCH 04/11] Introduce RemoveWaitEvent().

This will allow WaitEventSet objects to be used in longer-lived
scenarios, where sockets are added and removed over time.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/storage/ipc/latch.c | 131 ++++++++++++++++++++++++++++----
 src/include/storage/latch.h     |   3 +
 2 files changed, 120 insertions(+), 14 deletions(-)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 30e461e965..025545fc89 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -84,6 +84,7 @@ struct WaitEventSet
 {
 	int			nevents;		/* number of registered events */
 	int			nevents_space;	/* maximum number of events in this set */
+	int			free_list;		/* position of first free event */
 
 	/*
 	 * Array, of nevents_space length, storing the definition of events this
@@ -119,6 +120,8 @@ struct WaitEventSet
 #elif defined(WAIT_USE_POLL)
 	/* poll expects events to be waited on every poll() call, prepare once */
 	struct pollfd *pollfds;
+	/* track the populated range of pollfds */
+	int			npollfds;
 #elif defined(WAIT_USE_WIN32)
 
 	/*
@@ -127,6 +130,8 @@ struct WaitEventSet
 	 * event->pos + 1).
 	 */
 	HANDLE	   *handles;
+	/* track the populated range of handles */
+	int			nhandles;
 #endif
 };
 
@@ -642,13 +647,16 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 #elif defined(WAIT_USE_POLL)
 	set->pollfds = (struct pollfd *) data;
 	data += MAXALIGN(sizeof(struct pollfd) * nevents);
+	set->npollfds = 0;
 #elif defined(WAIT_USE_WIN32)
 	set->handles = (HANDLE) data;
 	data += MAXALIGN(sizeof(HANDLE) * nevents);
+	set->nhandles = 0;
 #endif
 
 	set->latch = NULL;
 	set->nevents_space = nevents;
+	set->nevents = 0;
 	set->exit_on_postmaster_death = false;
 
 #if defined(WAIT_USE_EPOLL)
@@ -714,11 +722,25 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 	 * Note: pgwin32_signal_event should be first to ensure that it will be
 	 * reported when multiple events are set.  We want to guarantee that
 	 * pending signals are serviced.
+	 *
+	 * We set unused handles to INVALID_HANDLE_VALUE, because
+	 * WaitForMultipleObjects() considers that to mean "this process", which is
+	 * not signaled until the process exits, so it's a way of leaving a hole in
+	 * the middle of the wait set if you remove something (just like -1 in the
+	 * poll implementation).  An alternative would be to fill in holes and
+	 * create a non-1-to-1 mapping between 'events' and 'handles'.
 	 */
 	set->handles[0] = pgwin32_signal_event;
-	StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
+	for (int i = 0; i < nevents; ++i)
+		set->handles[i + 1] = INVALID_HANDLE_VALUE;
 #endif
 
+	/* Set up the free list. */
+	for (int i = 0; i < nevents; ++i)
+		set->events[i].next_free = i + 1;
+	set->events[nevents - 1].next_free = -1;
+	set->free_list = 0;
+
 	return set;
 }
 
@@ -727,7 +749,6 @@ CreateWaitEventSet(MemoryContext context, int nevents)
  *
  * Note: preferably, this shouldn't have to free any resources that could be
  * inherited across an exec().  If it did, we'd likely leak those resources in
- * many scenarios.  For the epoll case, we ensure that by setting FD_CLOEXEC
  * when the FD is created.  For the Windows case, we assume that the handles
  * involved are non-inheritable.
  */
@@ -748,9 +769,12 @@ FreeWaitEventSet(WaitEventSet *set)
 	WaitEvent  *cur_event;
 
 	for (cur_event = set->events;
-		 cur_event < (set->events + set->nevents);
+		 cur_event < (set->events + set->nhandles);
 		 cur_event++)
 	{
+		if (set->handles[cur_event->pos + 1] == INVALID_HANDLE_VALUE)
+			continue;
+
 		if (cur_event->events & WL_LATCH_SET)
 		{
 			/* uses the latch's HANDLE */
@@ -805,9 +829,6 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 {
 	WaitEvent  *event;
 
-	/* not enough space */
-	Assert(set->nevents < set->nevents_space);
-
 	if (events == WL_EXIT_ON_PM_DEATH)
 	{
 		events = WL_POSTMASTER_DEATH;
@@ -833,8 +854,12 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 	if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
 		elog(ERROR, "cannot wait on socket event without a socket");
 
-	event = &set->events[set->nevents];
-	event->pos = set->nevents++;
+	/* Do we have any free slots? */
+	if (set->free_list == -1)
+		elog(ERROR, "WaitEventSet is full");
+
+	event = &set->events[set->free_list];
+	event->pos = set->free_list;
 	event->fd = fd;
 	event->events = events;
 	event->user_data = user_data;
@@ -868,6 +893,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 	WaitEventAdjustWin32(set, event);
 #endif
 
+	/* Remove it from the free list. */
+	set->free_list = event->next_free;
+	event->next_free = -1;
+	set->nevents++;
+
 	return event->pos;
 }
 
@@ -885,7 +915,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 	int			old_events;
 #endif
 
-	Assert(pos < set->nevents);
+	Assert(pos < set->nevents_space);
 
 	event = &set->events[pos];
 #if defined(WAIT_USE_KQUEUE)
@@ -933,6 +963,63 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 #endif
 }
 
+/*
+ * Remove the socket event at 'pos' and put its slot back on the free list.
+ * If the descriptor has already been closed, the kernel should already have
+ * removed it from the wait set (except in WAIT_USE_POLL); pass fd_closed =
+ * true in that case, so we don't try to remove it ourselves.
+ */
+void
+RemoveWaitEvent(WaitEventSet *set, int pos, bool fd_closed)
+{
+	WaitEvent  *event;
+
+	Assert(pos >= 0 && pos < set->nevents_space);
+	event = &set->events[pos];
+
+	/* For now only sockets can be removed */
+	if ((event->events & WL_SOCKET_MASK) == 0)
+		elog(ERROR, "event type cannot be removed");
+
+#if defined(WAIT_USE_EPOLL)
+	if (!fd_closed)
+		WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_KQUEUE)
+	if (!fd_closed)
+	{
+		int old_events = event->events;
+
+		event->events = 0;
+		WaitEventAdjustKqueue(set, event, old_events);
+	}
+#elif defined(WAIT_USE_POLL)
+	/* no kernel state to remove, just blank out the fd */
+	set->pollfds[event->pos].fd = -1;
+	/* see if we can shrink the range of active fds */
+	while (set->npollfds > 0 && set->pollfds[set->npollfds - 1].fd == -1)
+		set->npollfds -= 1;
+#elif defined(WAIT_USE_WIN32)
+	if (!fd_closed)
+		WSAEventSelect(event->fd, NULL, 0);
+	if (set->handles[event->pos + 1] != INVALID_HANDLE_VALUE)
+	{
+		WSACloseEvent(set->handles[event->pos + 1]);
+		set->handles[event->pos + 1] = INVALID_HANDLE_VALUE;
+	}
+	/* see if we can shrink the range of active handles */
+	while (set->nhandles > 0 &&
+		   set->handles[set->nhandles] == INVALID_HANDLE_VALUE)
+		set->nhandles -= 1;
+#endif
+
+	/* Slot is free; keep pos valid, FreeWaitEventSet() still reads it. */
+	memset(event, 0, sizeof(*event));
+	event->pos = pos;
+	event->next_free = set->free_list;
+	set->free_list = pos;
+	set->nevents--;
+}
+
 #if defined(WAIT_USE_EPOLL)
 /*
  * action can be one of EPOLL_CTL_ADD | EPOLL_CTL_MOD | EPOLL_CTL_DEL
@@ -994,6 +1081,9 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 	pollfd->revents = 0;
 	pollfd->fd = event->fd;
 
+	/* extend (never shrink) the populated range; removal shrinks it */
+	set->npollfds = Max(event->pos + 1, set->npollfds);
+
 	/* prepare pollfd entry once */
 	if (event->events == WL_LATCH_SET)
 	{
@@ -1072,7 +1162,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 	Assert(event->events != WL_LATCH_SET || set->latch != NULL);
 	Assert(event->events == WL_LATCH_SET ||
 		   event->events == WL_POSTMASTER_DEATH ||
-		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) ||
+		   (event->events == 0 &&
+			(old_events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))));
 
 	if (event->events == WL_POSTMASTER_DEATH)
 	{
@@ -1149,6 +1241,9 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
 {
 	HANDLE	   *handle = &set->handles[event->pos + 1];
 
+	/* track the known range of populated slots */
+	set->nhandles = Max(event->pos + 1, set->nhandles);
+
 	if (event->events == WL_LATCH_SET)
 	{
 		Assert(set->latch != NULL);
@@ -1169,12 +1264,15 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
 		if (event->events & WL_SOCKET_CONNECTED)
 			flags |= FD_CONNECT;
 
-		if (*handle == WSA_INVALID_EVENT)
+		if (*handle == INVALID_HANDLE_VALUE)
 		{
 			*handle = WSACreateEvent();
 			if (*handle == WSA_INVALID_EVENT)
+			{
+				*handle = INVALID_HANDLE_VALUE;
 				elog(ERROR, "failed to create event for socket: error code %u",
 					 WSAGetLastError());
+			}
 		}
 		if (WSAEventSelect(event->fd, *handle, flags) != 0)
 			elog(ERROR, "failed to set up event for socket: error code %u",
@@ -1304,6 +1402,11 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
 	return returned_events;
 }
 
+int
+WaitEventSetSize(WaitEventSet *set)
+{
+	return set->nevents;		/* number of currently registered events */
+}
 
 #if defined(WAIT_USE_EPOLL)
 
@@ -1589,7 +1692,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	struct pollfd *cur_pollfd;
 
 	/* Sleep */
-	rc = poll(set->pollfds, set->nevents, (int) cur_timeout);
+	rc = poll(set->pollfds, set->npollfds, (int) cur_timeout);
 
 	/* Check return code */
 	if (rc < 0)
@@ -1761,9 +1864,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	/*
 	 * Sleep.
 	 *
-	 * Need to wait for ->nevents + 1, because signal handle is in [0].
+	 * Need to wait for ->nhandles + 1, because signal handle is in [0].
 	 */
-	rc = WaitForMultipleObjects(set->nevents + 1, set->handles, FALSE,
+	rc = WaitForMultipleObjects(set->nhandles + 1, set->handles, FALSE,
 								cur_timeout);
 
 	/* Check return code */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index ec1865a8fd..210f37659e 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -144,6 +144,7 @@ typedef struct WaitEvent
 	uint32		events;			/* triggered events */
 	pgsocket	fd;				/* socket fd associated with event */
 	void	   *user_data;		/* pointer provided in AddWaitEventToSet */
+	int			next_free;		/* free list for internal use */
 #ifdef WIN32
 	bool		reset;			/* Is reset of the event required? */
 #endif
@@ -168,6 +169,8 @@ extern void FreeWaitEventSet(WaitEventSet *set);
 extern int	AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
 							  Latch *latch, void *user_data);
 extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch);
+extern void RemoveWaitEvent(WaitEventSet *set, int pos, bool fd_closed);
+extern int WaitEventSetSize(WaitEventSet *set);
 
 extern int	WaitEventSetWait(WaitEventSet *set, long timeout,
 							 WaitEvent *occurred_events, int nevents,
-- 
2.20.1

