Index: src/backend/storage/ipc/sinval.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/ipc/sinval.c,v
retrieving revision 1.85
diff -c -r1.85 sinval.c
*** src/backend/storage/ipc/sinval.c	17 Mar 2008 11:50:26 -0000	1.85
--- src/backend/storage/ipc/sinval.c	19 Mar 2008 17:51:19 -0000
***************
*** 69,74 ****
--- 69,86 ----
   * Hence, we must be holding no SI resources when we call them.  The only
   * bad side-effect is that SIDelExpiredDataEntries might be called extra
   * times on the way out of a nested call.
+  *
+  * We minimise the number of times SIDelExpiredDataEntries is called by
+  * appointing a single backend as the deleter. No deleter is appointed
+  * until the queue is more than 50% full. Once appointed, that backend
+  * performs the delete and then passes the role on to another backend. If
+  * the appointed backend goes away, newly inserted messages will cause
+  * another backend to be appointed once the queue gets long enough. This
+  * works well if just one backend has the longest queue length. If several
+  * backends share the longest queue length then we may experience some
+  * inefficiency. This is unlikely with long queue lengths, but could be
+  * fairly common with short queues, so we unset the appointed deleter when
+  * the queue length falls to 25% of the maximum queue length or less.
   */
  void
  ReceiveSharedInvalidMessages(
***************
*** 77,83 ****
  {
  	SharedInvalidationMessage data;
  	int			getResult;
! 	bool		gotMessage = false;
  
  	for (;;)
  	{
--- 89,95 ----
  {
  	SharedInvalidationMessage data;
  	int			getResult;
! 	bool			appointed_deleter = false;
  
  	for (;;)
  	{
***************
*** 87,93 ****
  		 */
  		catchupInterruptOccurred = 0;
  
! 		getResult = SIGetDataEntry(MyBackendId, &data);
  
  		if (getResult == 0)
  			break;				/* nothing more to do */
--- 99,105 ----
  		 */
  		catchupInterruptOccurred = 0;
  
! 		getResult = SIGetDataEntry(MyBackendId, &data, &appointed_deleter);
  
  		if (getResult == 0)
  			break;				/* nothing more to do */
***************
*** 102,112 ****
  			/* got a normal data message */
  			invalFunction(&data);
  		}
- 		gotMessage = true;
  	}
  
! 	/* If we got any messages, try to release dead messages */
! 	if (gotMessage)
  		SIDelExpiredDataEntries(false);
  }
  
--- 114,123 ----
  			/* got a normal data message */
  			invalFunction(&data);
  		}
  	}
  
! 	/* If we are the appointed deleter, try to release dead messages */
! 	if (appointed_deleter)
  		SIDelExpiredDataEntries(false);
  }
  
Index: src/backend/storage/ipc/sinvaladt.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/ipc/sinvaladt.c,v
retrieving revision 1.68
diff -c -r1.68 sinvaladt.c
*** src/backend/storage/ipc/sinvaladt.c	17 Mar 2008 11:50:27 -0000	1.68
--- src/backend/storage/ipc/sinvaladt.c	19 Mar 2008 18:11:35 -0000
***************
*** 83,88 ****
--- 83,92 ----
  	int			lastBackend;	/* index of last active procState entry, +1 */
  	int			maxBackends;	/* size of procState array */
  	int			freeBackends;	/* number of empty procState slots */
+ 	int			appointed_deleter;      /* BackendId of the backend appointed
+ 							 * to delete expired messages from
+ 							 * the queue, or -1 if none
+ 							 */
  
  	/*
  	 * Next LocalTransactionId to use for each idle backend slot.  We keep
***************
*** 159,164 ****
--- 163,170 ----
  	shmInvalBuffer->maxBackends = MaxBackends;
  	shmInvalBuffer->freeBackends = MaxBackends;
  
+ 	shmInvalBuffer->appointed_deleter = -1;
+ 
  	/* The buffer[] array is initially all unused, so we need not fill it */
  
  	/* Mark all backends inactive, and initialize nextLXID */
***************
*** 321,326 ****
--- 327,340 ----
  	}
  
  	/*
+ 	 * Try to prevent table overflow.  When the table is more than 50% full,
+ 	 * issue a delete roughly every 6% of message insertions. We do this
+ 	 * to avoid hitting the next higher limit where we wake everybody.
+ 	 */
+ 	if ((numMsgs > (MAXNUMMESSAGES / 2)) && (segP->maxMsgNum % (MAXNUMMESSAGES / 16) == 0))
+ 		SIDelExpiredDataEntries(true);
+ 
+ 	/*
  	 * Try to prevent table overflow.  When the table is 70% full send a
  	 * WAKEN_CHILDREN request to the postmaster.  The postmaster will send a
  	 * SIGUSR1 signal to all the backends, which will cause sinval.c to read
***************
*** 333,338 ****
--- 347,354 ----
  	if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
  		IsUnderPostmaster)
  	{
+ 		SIDelExpiredDataEntries(true);
+ 
  		elog(DEBUG4, "SI table is 70%% full, signaling postmaster");
  		signal_postmaster = true;
  	}
***************
*** 365,370 ****
--- 381,388 ----
  	segP->minMsgNum = 0;
  	segP->maxMsgNum = 0;
  
+ 	segP->appointed_deleter = -1;
+ 
  	for (i = 0; i < segP->lastBackend; i++)
  	{
  		if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
***************
*** 394,400 ****
   * allowing SInvalLock to be grabbed in shared mode for any other reason!
   */
  int
! SIGetDataEntry(int backendId, SharedInvalidationMessage *data)
  {
  	ProcState  *stateP;
  	SISeg	   *segP;
--- 412,418 ----
   * allowing SInvalLock to be grabbed in shared mode for any other reason!
   */
  int
! SIGetDataEntry(int backendId, SharedInvalidationMessage *data, bool *appointed_deleter)
  {
  	ProcState  *stateP;
  	SISeg	   *segP;
***************
*** 433,438 ****
--- 451,459 ----
  	 * delete it here. SIDelExpiredDataEntries() should be called to remove
  	 * dead messages.
  	 */
+ 	if (segP->appointed_deleter == MyBackendId)
+ 		*appointed_deleter = true;
+ 
  
  	LWLockRelease(SInvalLock);
  	return 1;					/* got a message */
***************
*** 449,454 ****
--- 470,476 ----
  	int			min,
  				i,
  				h;
+ 	int			deleter = -1;
  
  	if (!locked)
  		LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
***************
*** 469,479 ****
--- 491,517 ----
  		if (h >= 0)
  		{						/* backend active */
  			if (h < min)
+ 			{
  				min = h;
+ 				deleter = i + 1;
+ 			}
  		}
  	}
  	segP->minMsgNum = min;
  
+ 	/* 
+ 	 * Appoint the first backend found above with the minimum message number
+ 	 * (the comparison is strict, so later ties do not replace it). Having
+ 	 * only one appointed deleter minimises calls to this function, which
+ 	 * requires us to hold an Exclusive lock. Only assign a deleter if the
+ 	 * message queue is still long, so we can avoid nibbling away at the
+ 	 * queue.
+ 	 */
+ 	if ((segP->maxMsgNum - segP->minMsgNum) > (MAXNUMMESSAGES / 4)) 
+ 		segP->appointed_deleter = deleter;
+ 	else
+ 		segP->appointed_deleter = -1;
+ 
  	/*
  	 * When minMsgNum gets really large, decrement all message counters so as
  	 * to forestall overflow of the counters.
Index: src/include/storage/sinvaladt.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/sinvaladt.h,v
retrieving revision 1.47
diff -c -r1.47 sinvaladt.h
*** src/include/storage/sinvaladt.h	17 Mar 2008 11:50:27 -0000	1.47
--- src/include/storage/sinvaladt.h	19 Mar 2008 17:34:06 -0000
***************
*** 32,38 ****
  extern void SharedInvalBackendInit(void);
  
  extern bool SIInsertDataEntry(SharedInvalidationMessage *data);
! extern int SIGetDataEntry(int backendId, SharedInvalidationMessage *data);
  extern void SIDelExpiredDataEntries(bool locked);
  
  extern LocalTransactionId GetNextLocalTransactionId(void);
--- 32,39 ----
  extern void SharedInvalBackendInit(void);
  
  extern bool SIInsertDataEntry(SharedInvalidationMessage *data);
! extern int SIGetDataEntry(int backendId, SharedInvalidationMessage *data
! 						, bool *appointed_deleter);
  extern void SIDelExpiredDataEntries(bool locked);
  
  extern LocalTransactionId GetNextLocalTransactionId(void);
