Sending notifications from the master to the standby

Started by Joachim Wieland about 14 years ago, 17 messages
#1 Joachim Wieland
joe@mcknight.de
1 attachment(s)

People have always expressed interest in $subject, so I wondered how
hard it could possibly be and came up with the attached patch.

Notifications that are generated on the master and are forwarded to
the standby can be used as a convenient way to find out which changes
have already made it to the standby. The idea would be that you run a
transaction on the master, add a "NOTIFY changes_made", and listen on
the standby for this event. Once it gets delivered, you know that your
transaction got replayed to the standby.

Note that this feature is only about LISTEN on the standby; it still
doesn't allow sending NOTIFYs from the standby.

As a reminder, the current implementation of notifications
(LISTEN/NOTIFY) in a few words is:

- a transaction that executes "NOTIFY channel, payload" adds the
notification to backend-local memory
- upon commit, it inserts the notifications along with its transaction
id into a large SLRU-mapped ring buffer and signals the listening
backends
- each backend that's listening has a pointer into this ring buffer.
After each transaction, the backend starts reading from this pointer
position to the end of the ring buffer. It delivers all matching
notifications to its frontend if the transaction that has inserted
them is known to have committed.
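The three steps above can be illustrated with a toy, single-process model of the queue. This is a sketch under simplifying assumptions, not the async.c code. It uses fixed-size entries instead of variable-length entries in SLRU pages, a small status[] array stands in for the clog lookups, and all names (Queue, queue_insert, queue_read) are hypothetical. The model also encodes one reader-side detail that the summary glosses over: an entry from a still-running transaction stops the scan so it can be retried later, and entries from aborted transactions are skipped.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Toy model of the NOTIFY ring buffer; all names are hypothetical. */
#define QUEUE_SLOTS   8
#define MAX_LISTENERS 4
#define MAX_XIDS      16

typedef enum { XACT_IN_PROGRESS, XACT_COMMITTED, XACT_ABORTED } XactStatus;

typedef struct
{
    int  xid;                       /* inserting transaction, < MAX_XIDS */
    char channel[16];
    char payload[32];
} Entry;

typedef struct
{
    Entry      slots[QUEUE_SLOTS];
    long       head;                /* monotonically increasing write position */
    long       pos[MAX_LISTENERS];  /* per-listener read position */
    bool       active[MAX_LISTENERS];
    XactStatus status[MAX_XIDS];    /* stand-in for clog, indexed by xid */
} Queue;

/* Oldest position that some active listener still has to read (the "tail"). */
static long queue_tail(const Queue *q)
{
    long tail = q->head;

    for (int i = 0; i < MAX_LISTENERS; i++)
        if (q->active[i] && q->pos[i] < tail)
            tail = q->pos[i];
    return tail;
}

/* Commit-time insert of one notification; false if the ring is full. */
static bool queue_insert(Queue *q, int xid, const char *channel, const char *payload)
{
    if (q->head - queue_tail(q) >= QUEUE_SLOTS)
        return false;

    Entry *e = &q->slots[q->head % QUEUE_SLOTS];

    e->xid = xid;
    snprintf(e->channel, sizeof(e->channel), "%s", channel);
    snprintf(e->payload, sizeof(e->payload), "%s", payload);
    q->head++;
    return true;
}

/*
 * A listener reads from its pointer towards the head.  Committed entries on
 * `channel` are delivered, aborted ones are skipped, and the scan stops at an
 * entry whose transaction is still in progress.  Returns the number delivered.
 */
static int queue_read(Queue *q, int listener, const char *channel)
{
    int delivered = 0;

    assert(q->active[listener]);
    while (q->pos[listener] < q->head)
    {
        const Entry *e = &q->slots[q->pos[listener] % QUEUE_SLOTS];
        XactStatus   st = q->status[e->xid];

        if (st == XACT_IN_PROGRESS)
            break;                  /* retry this entry on the next read */
        if (st == XACT_COMMITTED && strcmp(e->channel, channel) == 0)
        {
            printf("deliver %s: %s\n", e->channel, e->payload);
            delivered++;
        }
        q->pos[listener]++;
    }
    return delivered;
}
```

Because the slowest active listener defines the tail, a single listener that never reads is enough to fill the ring. That is the situation the recovery conflict further down deals with.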

In the patch I added a new WAL record type, XLOG_NOTIFY, which is
written out when the notifications are written into the pages of the
SLRU ring buffer. Whenever an SLRU page is found to be full, a new WAL
record is created. That's just a more or less arbitrary way of
batching notifications together, but it's easy to do, and most often I
think there won't be more than a few records per transaction anyway.
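The record body that asyncAddNotifyXLog builds in the patch consists of a small header (database OID, body length, isLast flag) followed by channel\0payload\0 pairs. Each notification therefore contributes strlen(channel) + strlen(payload) + 2 bytes, which is what the patch's xlogLength accumulates. The sketch below reproduces only that layout. It uses hypothetical names (NotifyRecord, Notif, pack_notify_record), malloc instead of palloc, and a C99 flexible array member where the patch uses a one-byte buffer and subtracts one. It packs a single batch and does not model the one-record-per-SLRU-page split.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the patch's NotifyData header plus body. */
typedef struct
{
    uint32_t dboid;      /* database the notifications belong to */
    uint32_t len;        /* bytes used in buf */
    bool     is_last;    /* last XLOG_NOTIFY record of this transaction? */
    char     buf[];      /* channel\0payload\0channel\0payload\0... */
} NotifyRecord;

typedef struct
{
    const char *channel;
    const char *payload;
} Notif;

/* Bytes one notification occupies in the body: both strings plus terminators. */
static uint32_t notif_wire_len(const Notif *n)
{
    return (uint32_t) (strlen(n->channel) + strlen(n->payload) + 2);
}

/* Pack n[0..count) into one malloc'd record; the caller frees it. */
static NotifyRecord *pack_notify_record(const Notif *n, int count,
                                        uint32_t dboid, bool is_last)
{
    uint32_t len = 0;

    for (int i = 0; i < count; i++)
        len += notif_wire_len(&n[i]);

    NotifyRecord *rec = malloc(sizeof(NotifyRecord) + len);

    if (rec == NULL)
        return NULL;

    char *p = rec->buf;

    for (int i = 0; i < count; i++)
    {
        size_t clen = strlen(n[i].channel) + 1;   /* include the '\0' */
        size_t plen = strlen(n[i].payload) + 1;

        memcpy(p, n[i].channel, clen);
        p += clen;
        memcpy(p, n[i].payload, plen);
        p += plen;
    }
    assert(p == rec->buf + len);    /* same sanity check as the patch */

    rec->dboid = dboid;
    rec->len = len;
    rec->is_last = is_last;
    return rec;
}
```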

The recovery process on the standby adds the notifications to the
standby's SLRU ring buffer and, once the last notification has been
added (which might be after a couple more WAL records), it signals the
listening backends.
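On the replay side, the patch decodes each XLOG_NOTIFY record in asyncGetNotifyFromXLog and appends its entries to the standby's queue right away. The listening backends are woken later, when the transaction's commit or abort record is replayed with sent_notifications set (AtEOXact_HS_Notify). notify_isLast only tells the startup process when it may free its memory context. The following is a minimal model of that ordering. The decoder walks the same channel\0payload\0 layout and, unlike the patch, which only Asserts, rejects malformed bodies. StandbyState, decode_notify_body, and the redo_* functions are hypothetical names, and the counters stand in for the real queue insert and SignalBackends().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Called once per decoded (channel, payload) pair. */
typedef void (*notify_cb) (const char *channel, const char *payload, void *arg);

/* Walk a channel\0payload\0... body of len bytes; -1 if malformed. */
static int decode_notify_body(const char *buf, size_t len, notify_cb cb, void *arg)
{
    size_t off = 0;
    int    pairs = 0;

    assert(buf != NULL || len == 0);
    while (off < len)
    {
        const char *channel = buf + off;
        const char *nul = memchr(channel, '\0', len - off);

        if (nul == NULL)
            return -1;                  /* unterminated channel */
        off = (size_t) (nul - buf) + 1;
        if (off >= len)
            return -1;                  /* channel without a payload */

        const char *payload = buf + off;

        nul = memchr(payload, '\0', len - off);
        if (nul == NULL)
            return -1;                  /* unterminated payload */
        off = (size_t) (nul - buf) + 1;

        cb(channel, payload, arg);
        pairs++;
    }
    return pairs;
}

typedef struct
{
    int queued;     /* entries appended to the standby's queue */
    int signals;    /* times listening backends were woken */
} StandbyState;

static void count_entry(const char *channel, const char *payload, void *arg)
{
    (void) channel;
    (void) payload;
    ((StandbyState *) arg)->queued++;   /* stand-in for the SLRU queue insert */
}

/* Replay of one XLOG_NOTIFY record: queue the entries, but do not signal. */
static int redo_notify_record(StandbyState *st, const char *body, size_t len)
{
    return decode_notify_body(body, len, count_entry, st);
}

/* Replay of the commit/abort record: wake listeners if the xact notified. */
static void redo_xact_end(StandbyState *st, bool sent_notifications)
{
    if (sent_notifications)
        st->signals++;                  /* stand-in for SignalBackends() */
}
```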

Theoretically we could also run into a full queue on the standby:
imagine a backend with a long-running transaction that doesn't advance
its pointer in the ring buffer, so that no new notifications can be
stored. The patch introduces a new type of recovery conflict for this
reason.

One further optimization (that is not included for now) would be to
keep track of how many backends are actually listening on some channel
and if nobody is listening, discard incoming notifications.
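Because that optimization is not part of the patch, the following is only a hypothetical sketch of the bookkeeping it might need: a per-channel listener count that replay would consult before queuing a notification. All names are invented. A real version would have to live in shared memory, be protected by a lock, and be kept in sync with LISTEN, UNLISTEN, and backend exit.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_CHANNELS 16
#define CHANNEL_LEN  64             /* like NAMEDATALEN, including the '\0' */

typedef struct
{
    char name[CHANNEL_LEN];
    int  listeners;                 /* backends currently listening on name */
} ChannelCount;

typedef struct
{
    ChannelCount ch[MAX_CHANNELS];
    int          n;                 /* slots in use */
} ListenerRegistry;

static ChannelCount *registry_find(ListenerRegistry *r, const char *name)
{
    assert(r->n >= 0 && r->n <= MAX_CHANNELS);
    for (int i = 0; i < r->n; i++)
        if (strcmp(r->ch[i].name, name) == 0)
            return &r->ch[i];
    return NULL;
}

/* LISTEN name; false if the registry is full or the name is too long. */
static bool registry_listen(ListenerRegistry *r, const char *name)
{
    ChannelCount *c = registry_find(r, name);

    if (c == NULL)
    {
        if (r->n == MAX_CHANNELS || strlen(name) >= CHANNEL_LEN)
            return false;
        c = &r->ch[r->n++];
        strcpy(c->name, name);
        c->listeners = 0;
    }
    c->listeners++;
    return true;
}

/* UNLISTEN name, or a listening backend exits. */
static void registry_unlisten(ListenerRegistry *r, const char *name)
{
    ChannelCount *c = registry_find(r, name);

    if (c != NULL && c->listeners > 0)
        c->listeners--;
}

/* Replay-side filter: queue a notification only if someone could receive it. */
static bool should_queue(ListenerRegistry *r, const char *channel)
{
    ChannelCount *c = registry_find(r, channel);

    return c != NULL && c->listeners > 0;
}
```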

Attachments:

notify-standby.diff (text/x-patch; charset=US-ASCII)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e22bdac..1b2eca3 100644
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
*************** RecordTransactionCommit(void)
*** 988,994 ****
  		/*
  		 * Do we need the long commit record? If not, use the compact format.
  		 */
! 		if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
  		{
  			XLogRecData rdata[4];
  			int			lastrdata = 0;
--- 988,995 ----
  		/*
  		 * Do we need the long commit record? If not, use the compact format.
  		 */
! 		if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit
! 			|| backendHasSentNotifications)
  		{
  			XLogRecData rdata[4];
  			int			lastrdata = 0;
*************** RecordTransactionCommit(void)
*** 1009,1014 ****
--- 1010,1016 ----
  			xlrec.nrels = nrels;
  			xlrec.nsubxacts = nchildren;
  			xlrec.nmsgs = nmsgs;
+ 			xlrec.sent_notifications = backendHasSentNotifications;
  			rdata[0].data = (char *) (&xlrec);
  			rdata[0].len = MinSizeOfXactCommit;
  			rdata[0].buffer = InvalidBuffer;
*************** RecordTransactionAbort(bool isSubXact)
*** 1409,1414 ****
--- 1411,1417 ----
  	}
  	xlrec.nrels = nrels;
  	xlrec.nsubxacts = nchildren;
+ 	xlrec.sent_notifications = backendHasSentNotifications;
  	rdata[0].data = (char *) (&xlrec);
  	rdata[0].len = MinSizeOfXactAbort;
  	rdata[0].buffer = InvalidBuffer;
*************** xact_redo_commit_internal(TransactionId
*** 4510,4516 ****
  					TransactionId *sub_xids, int nsubxacts,
  					SharedInvalidationMessage *inval_msgs, int nmsgs,
  					RelFileNode *xnodes, int nrels,
! 					Oid dbId, Oid tsId,
  					uint32 xinfo)
  {
  	TransactionId max_xid;
--- 4513,4519 ----
  					TransactionId *sub_xids, int nsubxacts,
  					SharedInvalidationMessage *inval_msgs, int nmsgs,
  					RelFileNode *xnodes, int nrels,
! 					Oid dbId, Oid tsId, bool sent_notifications,
  					uint32 xinfo)
  {
  	TransactionId max_xid;
*************** xact_redo_commit_internal(TransactionId
*** 4616,4621 ****
--- 4619,4626 ----
  	if (XactCompletionForceSyncCommit(xinfo))
  		XLogFlush(lsn);
  
+ 	if (sent_notifications)
+ 		AtEOXact_HS_Notify();
  }
  /*
   * Utility function to call xact_redo_commit_internal after breaking down xlrec
*************** xact_redo_commit(xl_xact_commit *xlrec,
*** 4637,4642 ****
--- 4642,4648 ----
  								xlrec->xnodes, xlrec->nrels,
  								xlrec->dbId,
  								xlrec->tsId,
+ 								xlrec->sent_notifications,
  								xlrec->xinfo);
  }
  
*************** xact_redo_commit_compact(xl_xact_commit_
*** 4652,4657 ****
--- 4658,4664 ----
  								NULL, 0,		/* relfilenodes */
  								InvalidOid,		/* dbId */
  								InvalidOid,		/* tsId */
+ 								false,			/* sent_notifications */
  								0);				/* xinfo */
  }
  
*************** xact_redo_abort(xl_xact_abort *xlrec, Tr
*** 4740,4745 ****
--- 4747,4755 ----
  		}
  		smgrclose(srel);
  	}
+ 
+ 	if (xlrec->sent_notifications)
+ 		AtEOXact_HS_Notify();
  }
  
  void
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8e65962..78d8905 100644
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 35,40 ****
--- 35,41 ----
  #include "catalog/catversion.h"
  #include "catalog/pg_control.h"
  #include "catalog/pg_database.h"
+ #include "commands/async.h"
  #include "libpq/pqsignal.h"
  #include "miscadmin.h"
  #include "pgstat.h"
*************** RequestXLogSwitch(void)
*** 8306,8311 ****
--- 8307,8326 ----
  	return RecPtr;
  }
  
+ void
+ XLogInsertNotifyData(NotifyData *NotifyRecord)
+ {
+ 	XLogRecData	rdata;
+ 
+ 	rdata.buffer = InvalidBuffer;
+ 	rdata.data = (char *) NotifyRecord;
+ 	/* -1 because one character is already contained in sizeof(NotifyData) */
+ 	rdata.len = sizeof(NotifyData) + NotifyRecord->notify_len - 1;
+ 	rdata.next = NULL;
+ 
+ 	(void) XLogInsert(RM_XLOG_ID, XLOG_NOTIFY, &rdata);
+ }
+ 
  /*
   * Write a RESTORE POINT record
   */
*************** xlog_redo(XLogRecPtr lsn, XLogRecord *re
*** 8529,8534 ****
--- 8544,8554 ----
  	{
  		/* nothing to do here */
  	}
+ 	else if (info == XLOG_NOTIFY)
+ 	{
+ 		NotifyData *rdata = (NotifyData *) XLogRecGetData(record);
+ 		asyncGetNotifyFromXLog(rdata, record->xl_xid);
+ 	}
  	else if (info == XLOG_BACKUP_END)
  	{
  		XLogRecPtr	startpoint;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 50ba20c..051595c 100644
*** a/src/backend/catalog/system_views.sql
--- b/src/backend/catalog/system_views.sql
*************** CREATE VIEW pg_stat_database_conflicts A
*** 585,591 ****
              pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
              pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
              pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
!             pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
      FROM pg_database D;
  
  CREATE VIEW pg_stat_user_functions AS
--- 585,592 ----
              pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
              pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
              pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
!             pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
!             pg_stat_get_db_conflict_notify_queue(D.oid) AS confl_notify
      FROM pg_database D;
  
  CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index a81f7c7..b9528b4 100644
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 118,123 ****
--- 118,124 ----
  #include "access/slru.h"
  #include "access/transam.h"
  #include "access/xact.h"
+ #include "access/xlog.h"
  #include "catalog/pg_database.h"
  #include "commands/async.h"
  #include "funcapi.h"
***************
*** 128,133 ****
--- 129,135 ----
  #include "storage/lmgr.h"
  #include "storage/procsignal.h"
  #include "storage/sinval.h"
+ #include "storage/standby.h"
  #include "tcop/tcopprot.h"
  #include "utils/builtins.h"
  #include "utils/memutils.h"
***************
*** 160,165 ****
--- 162,168 ----
  typedef struct AsyncQueueEntry
  {
  	int			length;			/* total allocated length of entry */
+ 	int			str_length;		/* only the length of the two string fields */
  	Oid			dboid;			/* sender's database OID */
  	TransactionId xid;			/* sender's XID */
  	int32		srcPid;			/* sender's PID */
*************** static volatile sig_atomic_t notifyInter
*** 351,357 ****
  static bool unlistenExitRegistered = false;
  
  /* has this backend sent notifications in the current transaction? */
! static bool backendHasSentNotifications = false;
  
  /* has this backend executed its first LISTEN in the current transaction? */
  static bool backendHasExecutedInitialListen = false;
--- 354,360 ----
  static bool unlistenExitRegistered = false;
  
  /* has this backend sent notifications in the current transaction? */
! bool backendHasSentNotifications = false;
  
  /* has this backend executed its first LISTEN in the current transaction? */
  static bool backendHasExecutedInitialListen = false;
*************** static bool IsListeningOn(const char *ch
*** 371,378 ****
  static void asyncQueueUnregister(void);
  static bool asyncQueueIsFull(void);
  static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
! static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
! static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
  static void asyncQueueFillWarning(void);
  static bool SignalBackends(void);
  static void asyncQueueReadAllNotifications(void);
--- 374,388 ----
  static void asyncQueueUnregister(void);
  static bool asyncQueueIsFull(void);
  static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
! static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe,
! 										  Oid dboid, TransactionId xid);
! static ListCell *asyncQueueAddEntries(ListCell *nextNotify, Oid dboid,
! 									  TransactionId xid);
! static void asyncAddNotifyXLog(ListCell *HSStart, ListCell *HSEnd,
! 							   uint32 xlogLength, Oid dboid, TransactionId xid,
! 							   bool isLast);
! static void asyncAddNotificationsIntoQueue(Oid dboid, TransactionId xid);
! static int32 asyncQueueGetSlowestPid(int *backendId);
  static void asyncQueueFillWarning(void);
  static bool SignalBackends(void);
  static void asyncQueueReadAllNotifications(void);
*************** PreCommit_Notify(void)
*** 768,773 ****
--- 778,784 ----
  	if (Trace_notify)
  		elog(DEBUG1, "PreCommit_Notify");
  
+ 	Assert(backendHasSentNotifications == false);
  	Assert(backendHasExecutedInitialListen == false);
  
  	/* Preflight for any pending listen/unlisten actions */
*************** PreCommit_Notify(void)
*** 788,853 ****
  				break;
  		}
  	}
- 
  	/* Queue any pending notifies */
! 	if (pendingNotifies)
! 	{
! 		ListCell   *nextNotify;
  
! 		/*
! 		 * Make sure that we have an XID assigned to the current transaction.
! 		 * GetCurrentTransactionId is cheap if we already have an XID, but not
! 		 * so cheap if we don't, and we'd prefer not to do that work while
! 		 * holding AsyncQueueLock.
! 		 */
! 		(void) GetCurrentTransactionId();
  
  		/*
! 		 * Serialize writers by acquiring a special lock that we hold till
! 		 * after commit.  This ensures that queue entries appear in commit
! 		 * order, and in particular that there are never uncommitted queue
! 		 * entries ahead of committed ones, so an uncommitted transaction
! 		 * can't block delivery of deliverable notifications.
  		 *
! 		 * We use a heavyweight lock so that it'll automatically be released
! 		 * after either commit or abort.  This also allows deadlocks to be
! 		 * detected, though really a deadlock shouldn't be possible here.
  		 *
! 		 * The lock is on "database 0", which is pretty ugly but it doesn't
! 		 * seem worth inventing a special locktag category just for this.
! 		 * (Historical note: before PG 9.0, a similar lock on "database 0" was
! 		 * used by the flatfiles mechanism.)
  		 */
! 		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
! 						 AccessExclusiveLock);
  
! 		/* Now push the notifications into the queue */
! 		backendHasSentNotifications = true;
  
! 		nextNotify = list_head(pendingNotifies);
! 		while (nextNotify != NULL)
  		{
- 			/*
- 			 * Add the pending notifications to the queue.	We acquire and
- 			 * release AsyncQueueLock once per page, which might be overkill
- 			 * but it does allow readers to get in while we're doing this.
- 			 *
- 			 * A full queue is very uncommon and should really not happen,
- 			 * given that we have so much space available in the SLRU pages.
- 			 * Nevertheless we need to deal with this possibility. Note that
- 			 * when we get here we are in the process of committing our
- 			 * transaction, but we have not yet committed to clog, so at this
- 			 * point in time we can still roll the transaction back.
- 			 */
- 			LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
- 			asyncQueueFillWarning();
  			if (asyncQueueIsFull())
  				ereport(ERROR,
  						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
  					  errmsg("too many notifications in the NOTIFY queue")));
- 			nextNotify = asyncQueueAddEntries(nextNotify);
- 			LWLockRelease(AsyncQueueLock);
  		}
  	}
  }
  
--- 799,919 ----
  				break;
  		}
  	}
  	/* Queue any pending notifies */
! 	asyncAddNotificationsIntoQueue(MyDatabaseId, InvalidTransactionId);
! }
  
! static void
! asyncAddNotificationsIntoQueue(Oid dboid, TransactionId xid)
! {
! 	ListCell   *nextNotify;
! 
! 	if (pendingNotifies == NIL)
! 		return;
! 
! 	/*
! 	 * Make sure that we have an XID assigned to the current transaction.
! 	 * GetCurrentTransactionId is cheap if we already have an XID, but not
! 	 * so cheap if we don't, and we'd prefer not to do that work while
! 	 * holding AsyncQueueLock.
! 	 */
! 	if (!TransactionIdIsValid(xid))
! 		xid = GetCurrentTransactionId();
  
+ 	/*
+ 	 * Serialize writers by acquiring a special lock that we hold till
+ 	 * after commit.  This ensures that queue entries appear in commit
+ 	 * order, and in particular that there are never uncommitted queue
+ 	 * entries ahead of committed ones, so an uncommitted transaction
+ 	 * can't block delivery of deliverable notifications.
+ 	 *
+ 	 * We use a heavyweight lock so that it'll automatically be released
+ 	 * after either commit or abort.  This also allows deadlocks to be
+ 	 * detected, though really a deadlock shouldn't be possible here.
+ 	 *
+ 	 * The lock is on "database 0", which is pretty ugly but it doesn't
+ 	 * seem worth inventing a special locktag category just for this.
+ 	 * (Historical note: before PG 9.0, a similar lock on "database 0" was
+ 	 * used by the flatfiles mechanism.)
+ 	 */
+ 	if (!RecoveryInProgress())
+ 		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
+ 						 AccessExclusiveLock);
+ 
+ 	/* Now push the notifications into the queue */
+ 	backendHasSentNotifications = true;
+ 
+ 	nextNotify = list_head(pendingNotifies);
+ 	while (nextNotify != NULL)
+ 	{
  		/*
! 		 * Add the pending notifications to the queue.	We acquire and
! 		 * release AsyncQueueLock once per page, which might be overkill
! 		 * but it does allow readers to get in while we're doing this.
  		 *
! 		 * A full queue is very uncommon and should really not happen,
! 		 * given that we have so much space available in the SLRU pages.
! 		 * Nevertheless we need to deal with this possibility.
  		 *
! 		 * On the primary, note that when we get here we are in the process of
! 		 * committing our transaction, but we have not yet committed to clog,
! 		 * so at this point in time we can still roll the transaction back.
! 		 *
! 		 * On the standby, if there were no listening processes, we'd never
! 		 * run into this issue, because if the master overflows, we don't
! 		 * even get the XLOG record that would cause the overflow. So the only
! 		 * reason why it can happen on the standby is that there are backend
! 		 * processes that are busy and haven't had the time to catch up. We
! 		 * need to kill these now so that we can go on.
  		 */
! 		LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! 		asyncQueueFillWarning();
! 		if (RecoveryInProgress())
! 		{
! 			while (asyncQueueIsFull())
! 			{
! 				int32 pid;
! 				int backendId = 1;
! 				bool found = false;
! 				while ((pid = asyncQueueGetSlowestPid(&backendId)) != InvalidPid)
! 				{
! 					SendRecoveryConflictWithNotifyQueue(pid);
! 					/* continue checking the next backend and not the same one again */
! 					backendId++;
! 					found = true;
! 				}
  
! 				/*
! 				 * This shouldn't really happen. It could happen if the master
! 				 * had a larger queue than the standby and if we filled up the queue
! 				 * in one step (for only one transaction) or if we do only have
! 				 * uncommitted transactions in the queue. But since the master
! 				 * always has the same queue, such a transaction would have
! 				 * errored out on the master already.
! 				 */
! 				if (!found)
! 					elog(PANIC, "Notify queue is full but nobody is blocking it");
  
! 				/* Need to release the lock so that others can catch up */
! 				LWLockRelease(AsyncQueueLock);
! 				/*
! 				 * wait a bit (250ms) for other processes to terminate their
! 				 * transactions and catch up.
! 				 */
! 				pg_usleep(250000);
! 				/* Grab the lock again before looping */
! 				LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! 			}
! 		}
! 		else
  		{
  			if (asyncQueueIsFull())
  				ereport(ERROR,
  						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
  					  errmsg("too many notifications in the NOTIFY queue")));
  		}
+ 		nextNotify = asyncQueueAddEntries(nextNotify, dboid, xid);
+ 		LWLockRelease(AsyncQueueLock);
  	}
  }
  
*************** AtCommit_Notify(void)
*** 903,908 ****
--- 969,993 ----
  }
  
  /*
+  * AtEOXact_HS_Notify
+  *
+  *		This is called at transaction commit/abort, after committing/aborting
+  *		to clog.
+  *
+  *		Wake up listening backends after the replayed transaction has ended.
+  */
+ void
+ AtEOXact_HS_Notify(void)
+ {
+ 	/*
+ 	 * Signal the listening backends; if no other backend is listening,
+ 	 * advance the queue tail ourselves.
+ 	 */
+ 	if (!SignalBackends())
+ 		asyncQueueAdvanceTail();
+ }
+ 
+ /*
   * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
   *
   * This function must make sure we are ready to catch any incoming messages.
*************** asyncQueueAdvance(QueuePosition *positio
*** 1245,1251 ****
   * Fill the AsyncQueueEntry at *qe with an outbound notification message.
   */
  static void
! asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
  {
  	size_t		channellen = strlen(n->channel);
  	size_t		payloadlen = strlen(n->payload);
--- 1330,1337 ----
   * Fill the AsyncQueueEntry at *qe with an outbound notification message.
   */
  static void
! asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe,
! 							  Oid dboid, TransactionId xid)
  {
  	size_t		channellen = strlen(n->channel);
  	size_t		payloadlen = strlen(n->payload);
*************** asyncQueueNotificationToEntry(Notificati
*** 1255,1265 ****
  	Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
  
  	/* The terminators are already included in AsyncQueueEntryEmptySize */
! 	entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
  	entryLength = QUEUEALIGN(entryLength);
  	qe->length = entryLength;
! 	qe->dboid = MyDatabaseId;
! 	qe->xid = GetCurrentTransactionId();
  	qe->srcPid = MyProcPid;
  	memcpy(qe->data, n->channel, channellen + 1);
  	memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1);
--- 1341,1352 ----
  	Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
  
  	/* The terminators are already included in AsyncQueueEntryEmptySize */
! 	qe->str_length = channellen + payloadlen;
! 	entryLength = AsyncQueueEntryEmptySize + qe->str_length;
  	entryLength = QUEUEALIGN(entryLength);
  	qe->length = entryLength;
! 	qe->dboid = dboid;
! 	qe->xid = xid;
  	qe->srcPid = MyProcPid;
  	memcpy(qe->data, n->channel, channellen + 1);
  	memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1);
*************** asyncQueueNotificationToEntry(Notificati
*** 1282,1293 ****
   * locally in this function.
   */
  static ListCell *
! asyncQueueAddEntries(ListCell *nextNotify)
  {
  	AsyncQueueEntry qe;
  	int			pageno;
  	int			offset;
  	int			slotno;
  
  	/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
  	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
--- 1369,1383 ----
   * locally in this function.
   */
  static ListCell *
! asyncQueueAddEntries(ListCell *nextNotify, Oid dboid, TransactionId xid)
  {
  	AsyncQueueEntry qe;
  	int			pageno;
  	int			offset;
  	int			slotno;
+ 	ListCell   *HSStart = NULL;
+ 	ListCell   *HSEnd = NULL;
+ 	uint32		xlogLength = 0;
  
  	/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
  	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
*************** asyncQueueAddEntries(ListCell *nextNotif
*** 1301,1315 ****
  	while (nextNotify != NULL)
  	{
  		Notification *n = (Notification *) lfirst(nextNotify);
  
  		/* Construct a valid queue entry in local variable qe */
! 		asyncQueueNotificationToEntry(n, &qe);
  
  		offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
  
  		/* Check whether the entry really fits on the current page */
  		if (offset + qe.length <= QUEUE_PAGESIZE)
  		{
  			/* OK, so advance nextNotify past this item */
  			nextNotify = lnext(nextNotify);
  		}
--- 1391,1410 ----
  	while (nextNotify != NULL)
  	{
  		Notification *n = (Notification *) lfirst(nextNotify);
+ 		if (!HSStart)
+ 			HSStart = nextNotify;
  
  		/* Construct a valid queue entry in local variable qe */
! 		asyncQueueNotificationToEntry(n, &qe, dboid, xid);
  
  		offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
  
  		/* Check whether the entry really fits on the current page */
  		if (offset + qe.length <= QUEUE_PAGESIZE)
  		{
+ 			xlogLength += qe.str_length + 2;
+ 			HSEnd = nextNotify;
+ 
  			/* OK, so advance nextNotify past this item */
  			nextNotify = lnext(nextNotify);
  		}
*************** asyncQueueAddEntries(ListCell *nextNotif
*** 1350,1360 ****
--- 1445,1489 ----
  
  	LWLockRelease(AsyncCtlLock);
  
+ 	/*
+ 	 * Send out the notifications for hot standby servers. We write one XLOG
+ 	 * record per notification SLRU page; we could batch more or less, but
+ 	 * this is the most convenient.
+ 	 * If HSStart == NULL: No notifications sent
+ 	 * If HSEnd == NULL: No more space on this page, so no notifications for this page
+ 	 * Only one entry if HSStart == HSEnd.
+ 	 */
+ 	if (!RecoveryInProgress() && HSStart && HSEnd)
+ 		asyncAddNotifyXLog(HSStart, HSEnd, xlogLength, dboid, xid, nextNotify == NULL);
+ 
  	return nextNotify;
  }
  
  /*
+  * Report pids that have their pointer into the queue set to the same position
+  * as the global tail pointer.
+  * The function receives a pointer to the backend id from where it should start
+  * scanning and updates it as it iterates over the backends. When it returns a
+  * pid, then the backend's id that has this pid is in *backendId.
+  */
+ static int32
+ asyncQueueGetSlowestPid(int *backendId)
+ {
+ 	Assert(backendId != NULL);
+ 	/* backend ids start with 1, 0 is unused, we expect to always get at least 1. */
+ 	Assert(*backendId >= 1);
+ 
+ 	for (; *backendId <= MaxBackends; (*backendId)++)
+ 		if (QUEUE_BACKEND_PID(*backendId) != InvalidPid)
+ 			if (QUEUE_POS_EQUAL(QUEUE_TAIL, QUEUE_BACKEND_POS(*backendId)))
+ 				return QUEUE_BACKEND_PID(*backendId);
+ 
+ 	return InvalidPid;
+ }
+ 
+ /*
   * Check whether the queue is at least half full, and emit a warning if so.
+  * Also report the pid of one of the backends that aren't catching up.
   *
   * This is unlikely given the size of the queue, but possible.
   * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
*************** asyncQueueFillWarning(void)
*** 1391,1410 ****
  	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
  								   t, QUEUE_FULL_WARN_INTERVAL))
  	{
! 		QueuePosition min = QUEUE_HEAD;
! 		int32		minPid = InvalidPid;
! 		int			i;
! 
! 		for (i = 1; i <= MaxBackends; i++)
! 		{
! 			if (QUEUE_BACKEND_PID(i) != InvalidPid)
! 			{
! 				min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
! 				if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
! 					minPid = QUEUE_BACKEND_PID(i);
! 			}
! 		}
! 
  		ereport(WARNING,
  				(errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
  				 (minPid != InvalidPid ?
--- 1520,1528 ----
  	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
  								   t, QUEUE_FULL_WARN_INTERVAL))
  	{
! 		/* backend ids start at 1, slot 0 is unused */
! 		int backendId = 1;
! 		int32 minPid = asyncQueueGetSlowestPid(&backendId);
  		ereport(WARNING,
  				(errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
  				 (minPid != InvalidPid ?
*************** asyncQueueFillWarning(void)
*** 1418,1423 ****
--- 1536,1650 ----
  	}
  }
  
+ /*
+  * Create an Xlog entry from the notifications of HSStart up to and including HSEnd.
+  * The XLOG entry has a header and then
+  * channel\0payload\0channel\0payload\0...
+  */
+ static void
+ asyncAddNotifyXLog(ListCell *HSStart, ListCell *HSEnd, uint32 xlogLength,
+ 				   Oid dboid, TransactionId xid, bool isLast)
+ {
+ 	/* -1 because NotifyData already contains buffer[1] */
+ 	NotifyData *rdata = (NotifyData *) palloc(sizeof(NotifyData) + xlogLength - 1);
+ 	char	   *dataPtr = rdata->notify_buffer;
+ 	ListCell   *nextNotify = HSStart;
+ 	char	   *p;
+ 
+ 	Assert(HSEnd != NULL);
+ 
+ 	for (;;)
+ 	{
+ 		Notification *n = (Notification *) lfirst(nextNotify);
+ 
+ 		p = n->channel;
+ 		while (*p)
+ 			*dataPtr++ = *p++;
+ 		*dataPtr++ = *p++;
+ 
+ 		p = n->payload;
+ 		while (*p)
+ 			*dataPtr++ = *p++;
+ 		*dataPtr++ = *p++;
+ 
+ 		if (nextNotify == HSEnd)
+ 			break;
+ 
+ 		nextNotify = lnext(nextNotify);
+ 	}
+ 
+ 	Assert(dataPtr == rdata->notify_buffer + xlogLength);
+ 
+ 	rdata->notify_dboid = dboid;
+ 	rdata->notify_len = xlogLength;
+ 	rdata->notify_isLast = isLast;
+ 
+ 	XLogInsertNotifyData(rdata);
+ }
+ 
+ /*
+  * This function is only run from the recovery process (which is why we can use
+  * a static variable here). The function always runs in NotifyCtx but only for
+  * the time the function is run. NotifyCtx itself is kept until a record
+  * with notify_isLast set has been processed.
+  */
+ void
+ asyncGetNotifyFromXLog(NotifyData *rdata, TransactionId xid)
+ {
+ 	static MemoryContext NotifyCtx = NULL;
+ 	MemoryContext oldctx;
+ 	char	   *data = rdata->notify_buffer;
+ 
+ 	Assert(CurrentMemoryContext != NotifyCtx);
+ 
+ 	if (!NotifyCtx)
+ 	{
+ 		NotifyCtx = AllocSetContextCreate(TopMemoryContext,
+ 										  "NotifyXLogMemoryContext",
+ 										  ALLOCSET_DEFAULT_MINSIZE,
+ 										  ALLOCSET_DEFAULT_INITSIZE,
+ 										  ALLOCSET_DEFAULT_MAXSIZE);
+ 	}
+ 	oldctx = MemoryContextSwitchTo(NotifyCtx);
+ 
+ 	while ((data - rdata->notify_buffer) < rdata->notify_len)
+ 	{
+ 		Notification *n = (Notification *) palloc(sizeof(Notification));
+ 
+ 		n->channel = data;
+ 		while (*data)
+ 			data++;
+ 		data++;
+ 
+ 		n->payload = data;
+ 		while (*data)
+ 			data++;
+ 		data++;
+ 
+ 		pendingNotifies = lappend(pendingNotifies, n);
+ 	}
+ 
+ 	/* We should hit it exactly */
+ 	Assert((data - rdata->notify_buffer) == rdata->notify_len);
+ 
+ 	asyncAddNotificationsIntoQueue(rdata->notify_dboid, xid);
+ 	pendingNotifies = NIL;
+ 
+ 	MemoryContextSwitchTo(oldctx);
+ 
+ 	if (rdata->notify_isLast)
+ 	{
+ 		/*
+ 		 * XXX Maybe we should only do a MemoryContextReset for performance
+ 		 * reasons. However then we keep the space allocated that the
+ 		 * transaction with most/biggest notifications needed. And that might be
+ 		 * carried on forever.
+ 		 */
+ 		MemoryContextDelete(NotifyCtx);
+ 		NotifyCtx = NULL;
+ 	}
+ }
+ 
  /*
   * Send signals to all listening backends (except our own).
   *
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 323d42b..21c10fe 100644
*** a/src/backend/postmaster/pgstat.c
--- b/src/backend/postmaster/pgstat.c
*************** pgstat_get_db_entry(Oid databaseid, bool
*** 3266,3271 ****
--- 3266,3272 ----
  		result->n_conflict_snapshot = 0;
  		result->n_conflict_bufferpin = 0;
  		result->n_conflict_startup_deadlock = 0;
+ 		result->n_conflict_notify_queue = 0;
  
  		result->stat_reset_timestamp = GetCurrentTimestamp();
  
*************** pgstat_recv_recoveryconflict(PgStat_MsgR
*** 4399,4404 ****
--- 4400,4408 ----
  		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
  			dbentry->n_conflict_startup_deadlock++;
  			break;
+ 		case PROCSIG_RECOVERY_CONFLICT_NOTIFY_QUEUE:
+ 			dbentry->n_conflict_notify_queue++;
+ 			break;
  	}
  }
  
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 6ea0a28..a802992 100644
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
*************** GetCurrentVirtualXIDs(TransactionId limi
*** 1999,2004 ****
--- 1999,2021 ----
  }
  
  /*
+  * GetVirtualXIDfromPid -- returns the current VXID of a certain backend pid
+  */
+ VirtualTransactionId
+ GetVirtualXIDfromPid(int pid)
+ {
+ 	VirtualTransactionId vxid;
+ 	PGPROC *proc = BackendPidGetProc(pid);
+ 
+ 	if (proc)
+ 		GET_VXID_FROM_PGPROC(vxid, *proc);
+ 	else
+ 		SetInvalidVirtualTransactionId(vxid);
+ 
+ 	return vxid;
+ }
+ 
+ /*
   * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
   *
   * Usage is limited to conflict resolution during recovery on standby servers.
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 3d7e85f..7f0bf52 100644
*** a/src/backend/storage/ipc/procsignal.c
--- b/src/backend/storage/ipc/procsignal.c
*************** procsignal_sigusr1_handler(SIGNAL_ARGS)
*** 276,281 ****
--- 276,284 ----
  	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
  		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
  
+ 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_NOTIFY_QUEUE))
+ 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_NOTIFY_QUEUE);
+ 
  	latch_sigusr1_handler();
  
  	errno = save_errno;
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index c88557c..535f456 100644
*** a/src/backend/storage/ipc/standby.c
--- b/src/backend/storage/ipc/standby.c
*************** SendRecoveryConflictWithBufferPin(ProcSi
*** 458,463 ****
--- 458,472 ----
  	CancelDBBackends(InvalidOid, reason, false);
  }
  
+ void
+ SendRecoveryConflictWithNotifyQueue(int pid)
+ {
+ 	VirtualTransactionId vxid = GetVirtualXIDfromPid(pid);
+ 	if (!VirtualTransactionIdIsValid(vxid))
+ 		return;
+ 	CancelVirtualTransaction(vxid, PROCSIG_RECOVERY_CONFLICT_NOTIFY_QUEUE);
+ }
+ 
  /*
   * In Hot Standby perform early deadlock detection.  We abort the lock
   * wait if we are about to sleep while holding the buffer pin that Startup
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 59a287f..29664e9 100644
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
*************** errdetail_recovery_conflict(void)
*** 2196,2201 ****
--- 2196,2204 ----
  		case PROCSIG_RECOVERY_CONFLICT_DATABASE:
  			errdetail("User was connected to a database that must be dropped.");
  			break;
+ 		case PROCSIG_RECOVERY_CONFLICT_NOTIFY_QUEUE:
+ 			errdetail("User was not processing notify messages promptly enough.");
+ 			break;
  		default:
  			break;
  			/* no errdetail */
*************** RecoveryConflictInterrupt(ProcSignalReas
*** 2698,2703 ****
--- 2701,2707 ----
  			case PROCSIG_RECOVERY_CONFLICT_LOCK:
  			case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
  			case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
+ 			case PROCSIG_RECOVERY_CONFLICT_NOTIFY_QUEUE:
  
  				/*
  				 * If we aren't in a transaction any longer then ignore.
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 923da6a..f538290 100644
*** a/src/backend/tcop/utility.c
--- b/src/backend/tcop/utility.c
*************** standard_ProcessUtility(Node *parsetree,
*** 971,977 ****
  			{
  				ListenStmt *stmt = (ListenStmt *) parsetree;
  
- 				PreventCommandDuringRecovery("LISTEN");
  				CheckRestrictedOperation("LISTEN");
  				Async_Listen(stmt->conditionname);
  			}
--- 971,976 ----
*************** standard_ProcessUtility(Node *parsetree,
*** 981,987 ****
  			{
  				UnlistenStmt *stmt = (UnlistenStmt *) parsetree;
  
- 				PreventCommandDuringRecovery("UNLISTEN");
  				CheckRestrictedOperation("UNLISTEN");
  				if (stmt->conditionname)
  					Async_Unlisten(stmt->conditionname);
--- 980,985 ----
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index b4986d8..bd25f67 100644
*** a/src/backend/utils/adt/pgstatfuncs.c
--- b/src/backend/utils/adt/pgstatfuncs.c
*************** extern Datum pg_stat_get_db_conflict_loc
*** 77,82 ****
--- 77,83 ----
  extern Datum pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS);
  extern Datum pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS);
  extern Datum pg_stat_get_db_conflict_startup_deadlock(PG_FUNCTION_ARGS);
+ extern Datum pg_stat_get_db_conflict_notify_queue(PG_FUNCTION_ARGS);
  extern Datum pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS);
  extern Datum pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS);
  
*************** pg_stat_get_db_conflict_startup_deadlock
*** 1253,1258 ****
--- 1254,1274 ----
  
  	PG_RETURN_INT64(result);
  }
+ 
+ Datum
+ pg_stat_get_db_conflict_notify_queue(PG_FUNCTION_ARGS)
+ {
+ 	Oid			dbid = PG_GETARG_OID(0);
+ 	int64		result;
+ 	PgStat_StatDBEntry *dbentry;
+ 
+ 	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+ 		result = 0;
+ 	else
+ 		result = (int64) (dbentry->n_conflict_notify_queue);
+ 
+ 	PG_RETURN_INT64(result);
+ }
  
  Datum
  pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 5f063a9..333350f 100644
*** a/src/include/access/xact.h
--- b/src/include/access/xact.h
*************** typedef struct xl_xact_commit
*** 135,140 ****
--- 135,141 ----
  	int			nmsgs;			/* number of shared inval msgs */
  	Oid			dbId;			/* MyDatabaseId */
  	Oid			tsId;			/* MyDatabaseTableSpace */
+ 	bool		sent_notifications;  /* true if notifications have been sent */
  	/* Array of RelFileNode(s) to drop at commit */
  	RelFileNode xnodes[1];		/* VARIABLE LENGTH ARRAY */
  	/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
*************** typedef struct xl_xact_abort
*** 163,168 ****
--- 164,170 ----
  	TimestampTz xact_time;		/* time of abort */
  	int			nrels;			/* number of RelFileNodes */
  	int			nsubxacts;		/* number of subtransaction XIDs */
+ 	bool		sent_notifications;  /* true if notifications have been sent */
  	/* Array of RelFileNode(s) to drop at abort */
  	RelFileNode xnodes[1];		/* VARIABLE LENGTH ARRAY */
  	/* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 93622c4..c95902d 100644
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
*************** typedef struct CheckpointStatsData
*** 263,268 ****
--- 263,277 ----
  
  extern CheckpointStatsData CheckpointStats;
  
+ typedef struct NotifyData
+ {
+ 	Oid			notify_dboid;
+ 	bool		notify_isLast;
+ 	uint32		notify_len;
+ 	char	    notify_buffer[1];
+ 	/* notify_buffer with variable-length down here */
+ } NotifyData;
+ 
  extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
  extern void XLogFlush(XLogRecPtr RecPtr);
  extern void XLogBackgroundFlush(void);
*************** extern void InitXLOGAccess(void);
*** 306,311 ****
--- 315,321 ----
  extern void CreateCheckPoint(int flags);
  extern bool CreateRestartPoint(int flags);
  extern void XLogPutNextOid(Oid nextOid);
+ extern void XLogInsertNotifyData(NotifyData *notifyRecord);
  extern XLogRecPtr XLogRestorePoint(const char *rpName);
  extern XLogRecPtr GetRedoRecPtr(void);
  extern XLogRecPtr GetInsertRecPtr(void);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index f7af5fd..a2764d4 100644
*** a/src/include/catalog/catversion.h
--- b/src/include/catalog/catversion.h
***************
*** 52,57 ****
--- 52,58 ----
   * catalog changes on the same day...)
   */
  
+ /* XXX Catversion needs new version with the Notify-to-Standby feature */
  /*							yyyymmddN */
  #define CATALOG_VERSION_NO	201112241
  
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index d0d2e9e..a2fe517 100644
*** a/src/include/catalog/pg_control.h
--- b/src/include/catalog/pg_control.h
*************** typedef struct CheckPoint
*** 60,65 ****
--- 60,66 ----
  #define XLOG_BACKUP_END					0x50
  #define XLOG_PARAMETER_CHANGE			0x60
  #define XLOG_RESTORE_POINT				0x70
+ #define XLOG_NOTIFY						0x80
  
  
  /*
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 355c61a..c7be249 100644
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 3068 (  pg_stat_get_db
*** 2629,2634 ****
--- 2629,2636 ----
  DESCR("statistics: recovery conflicts in database caused by shared buffer pin");
  DATA(insert OID = 3069 (  pg_stat_get_db_conflict_startup_deadlock PGNSP PGUID 12 1 0 0 0 f f f t f s 1 0 20 "26" _null_ _null_ _null_ _null_ pg_stat_get_db_conflict_startup_deadlock _null_ _null_ _null_ ));
  DESCR("statistics: recovery conflicts in database caused by buffer deadlock");
+ DATA(insert OID = 3144 (  pg_stat_get_db_conflict_notify_queue PGNSP PGUID 12 1 0 0 0 f f f t f s 1 0 20 "26" _null_ _null_ _null_ _null_ pg_stat_get_db_conflict_notify_queue _null_ _null_ _null_ ));
+ DESCR("statistics: recovery conflicts in database caused by the notify queue");
  DATA(insert OID = 3070 (  pg_stat_get_db_conflict_all PGNSP PGUID 12 1 0 0 0 f f f t f s 1 0 20 "26" _null_ _null_ _null_ _null_ pg_stat_get_db_conflict_all _null_ _null_ _null_ ));
  DESCR("statistics: recovery conflicts in database");
  DATA(insert OID = 3074 (  pg_stat_get_db_stat_reset_time PGNSP PGUID 12 1 0 0 0 f f f t f s 1 0 1184 "26" _null_ _null_ _null_ _null_ pg_stat_get_db_stat_reset_time _null_ _null_ _null_ ));
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index 3d1cf68..3da9bbe 100644
*** a/src/include/commands/async.h
--- b/src/include/commands/async.h
***************
*** 21,26 ****
--- 21,27 ----
  #define NUM_ASYNC_BUFFERS	8
  
  extern bool Trace_notify;
+ extern bool backendHasSentNotifications;
  
  extern Size AsyncShmemSize(void);
  extern void AsyncShmemInit(void);
*************** extern Datum pg_notify(PG_FUNCTION_ARGS)
*** 38,43 ****
--- 39,45 ----
  /* perform (or cancel) outbound notify processing at transaction commit */
  extern void PreCommit_Notify(void);
  extern void AtCommit_Notify(void);
+ extern void AtEOXact_HS_Notify(void);
  extern void AtAbort_Notify(void);
  extern void AtSubStart_Notify(void);
  extern void AtSubCommit_Notify(void);
*************** extern void HandleNotifyInterrupt(void);
*** 56,59 ****
--- 58,67 ----
  extern void EnableNotifyInterrupt(void);
  extern bool DisableNotifyInterrupt(void);
  
+ /*
+  * Called from xlog_redo in xlog.c to inject notifications into a replicated
+  * database.
+  */
+ extern void asyncGetNotifyFromXLog(NotifyData *rdata, TransactionId xid);
+ 
  #endif   /* ASYNC_H */
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index b8c6d82..a14f733 100644
*** a/src/include/pgstat.h
--- b/src/include/pgstat.h
*************** typedef struct PgStat_StatDBEntry
*** 507,512 ****
--- 507,513 ----
  	PgStat_Counter n_conflict_snapshot;
  	PgStat_Counter n_conflict_bufferpin;
  	PgStat_Counter n_conflict_startup_deadlock;
+ 	PgStat_Counter n_conflict_notify_queue;
  	TimestampTz stat_reset_timestamp;
  
  
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 0b0aa35..64f9433 100644
*** a/src/include/storage/procarray.h
--- b/src/include/storage/procarray.h
*************** extern bool IsBackendPid(int pid);
*** 62,67 ****
--- 62,68 ----
  extern VirtualTransactionId *GetCurrentVirtualXIDs(TransactionId limitXmin,
  					  bool excludeXmin0, bool allDbs, int excludeVacuum,
  					  int *nvxids);
+ extern VirtualTransactionId GetVirtualXIDfromPid(int pid);
  extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid);
  extern pid_t CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode);
  
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 715e5be..181dbde 100644
*** a/src/include/storage/procsignal.h
--- b/src/include/storage/procsignal.h
*************** typedef enum
*** 39,44 ****
--- 39,45 ----
  	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
  	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
  	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
+ 	PROCSIG_RECOVERY_CONFLICT_NOTIFY_QUEUE,
  
  	NUM_PROCSIGNALS				/* Must be last! */
  } ProcSignalReason;
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index a539ec2..671fe0d 100644
*** a/src/include/storage/standby.h
--- b/src/include/storage/standby.h
*************** extern void ResolveRecoveryConflictWithD
*** 35,40 ****
--- 35,41 ----
  
  extern void ResolveRecoveryConflictWithBufferPin(void);
  extern void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
+ extern void SendRecoveryConflictWithNotifyQueue(int pid);
  extern void CheckRecoveryConflictDeadlock(void);
  
  /*
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 454e1f9..a273d64 100644
*** a/src/test/regress/expected/rules.out
--- b/src/test/regress/expected/rules.out
*************** SELECT viewname, definition FROM pg_view
*** 1297,1303 ****
   pg_stat_all_tables              | SELECT c.oid AS relid, n.nspname AS schemaname, c.relname, pg_stat_get_numscans(c.oid) AS seq_scan, pg_stat_get_tuples_returned(c.oid) AS seq_tup_read, (sum(pg_stat_get_numscans(i.indexrelid)))::bigint AS idx_scan, ((sum(pg_stat_get_tuples_fetched(i.indexrelid)))::bigint + pg_stat_get_tuples_fetched(c.oid)) AS idx_tup_fetch, pg_stat_get_tuples_inserted(c.oid) AS n_tup_ins, pg_stat_get_tuples_updated(c.oid) AS n_tup_upd, pg_stat_get_tuples_deleted(c.oid) AS n_tup_del, pg_stat_get_tuples_hot_updated(c.oid) AS n_tup_hot_upd, pg_stat_get_live_tuples(c.oid) AS n_live_tup, pg_stat_get_dead_tuples(c.oid) AS n_dead_tup, pg_stat_get_last_vacuum_time(c.oid) AS last_vacuum, pg_stat_get_last_autovacuum_time(c.oid) AS last_autovacuum, pg_stat_get_last_analyze_time(c.oid) AS last_analyze, pg_stat_get_last_autoanalyze_time(c.oid) AS last_autoanalyze, pg_stat_get_vacuum_count(c.oid) AS vacuum_count, pg_stat_get_autovacuum_count(c.oid) AS autovacuum_count, pg_stat_get_analyze_count(c.oid) AS analyze_count, pg_stat_get_autoanalyze_count(c.oid) AS autoanalyze_count FROM ((pg_class c LEFT JOIN pg_index i ON ((c.oid = i.indrelid))) LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace))) WHERE (c.relkind = ANY (ARRAY['r'::"char", 't'::"char"])) GROUP BY c.oid, n.nspname, c.relname;
   pg_stat_bgwriter                | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
   pg_stat_database                | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
!  pg_stat_database_conflicts      | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d;
   pg_stat_replication             | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location, w.sync_priority, w.sync_state FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
   pg_stat_sys_indexes             | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
   pg_stat_sys_tables              | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
--- 1297,1303 ----
   pg_stat_all_tables              | SELECT c.oid AS relid, n.nspname AS schemaname, c.relname, pg_stat_get_numscans(c.oid) AS seq_scan, pg_stat_get_tuples_returned(c.oid) AS seq_tup_read, (sum(pg_stat_get_numscans(i.indexrelid)))::bigint AS idx_scan, ((sum(pg_stat_get_tuples_fetched(i.indexrelid)))::bigint + pg_stat_get_tuples_fetched(c.oid)) AS idx_tup_fetch, pg_stat_get_tuples_inserted(c.oid) AS n_tup_ins, pg_stat_get_tuples_updated(c.oid) AS n_tup_upd, pg_stat_get_tuples_deleted(c.oid) AS n_tup_del, pg_stat_get_tuples_hot_updated(c.oid) AS n_tup_hot_upd, pg_stat_get_live_tuples(c.oid) AS n_live_tup, pg_stat_get_dead_tuples(c.oid) AS n_dead_tup, pg_stat_get_last_vacuum_time(c.oid) AS last_vacuum, pg_stat_get_last_autovacuum_time(c.oid) AS last_autovacuum, pg_stat_get_last_analyze_time(c.oid) AS last_analyze, pg_stat_get_last_autoanalyze_time(c.oid) AS last_autoanalyze, pg_stat_get_vacuum_count(c.oid) AS vacuum_count, pg_stat_get_autovacuum_count(c.oid) AS autovacuum_count, pg_stat_get_analyze_count(c.oid) AS analyze_count, pg_stat_get_autoanalyze_count(c.oid) AS autoanalyze_count FROM ((pg_class c LEFT JOIN pg_index i ON ((c.oid = i.indrelid))) LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace))) WHERE (c.relkind = ANY (ARRAY['r'::"char", 't'::"char"])) GROUP BY c.oid, n.nspname, c.relname;
   pg_stat_bgwriter                | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
   pg_stat_database                | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
!  pg_stat_database_conflicts      | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock, pg_stat_get_db_conflict_notify_queue(d.oid) AS confl_notify FROM pg_database d;
   pg_stat_replication             | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location, w.sync_priority, w.sync_state FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
   pg_stat_sys_indexes             | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
   pg_stat_sys_tables              | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
#2Tom Lane
tgl@sss.pgh.pa.us
In reply to: Joachim Wieland (#1)
Re: Sending notifications from the master to the standby

Joachim Wieland <joe@mcknight.de> writes:

[ send NOTIFYs to slaves by means of: ]
> In the patch I added a new WAL message type, XLOG_NOTIFY that writes
> out WAL records when the notifications are written into the pages of
> the SLRU ring buffer. Whenever an SLRU page is found to be full, a new
> WAL record will be created, that's just a more or less arbitrary form
> of batching a bunch of them together but that's easy to do and most
> often, I think there won't be more than at most a few record per
> transaction anyway.

I'm having a hard time wrapping my mind around why you'd do it that way.
ISTM there are two fairly serious problems:

1. Emitting WAL records for NOTIFY traffic results in significantly
more overhead, with no benefit whatever, for existing non-replicated
NOTIFY-using applications. Those folk are going to see a performance
degradation, and they're going to complain.

2. Batching NOTIFY traffic will result in a delay in receipt, which will
annoy anybody who's trying to make actual use of the notifications on
standby servers. The worst case here happens if notify traffic on the
master is bursty: the last few messages in a burst might not get to the
slave for a long time, certainly long after the commits that the
messages were supposed to be telling people about.
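Point 2 can be made concrete with a small model of page-full batching as the quoted paragraph describes it. PAGE_CAPACITY, PageBatcher, batcher_add and batcher_pending are hypothetical names. The model deliberately ignores any end-of-transaction flush the patch may perform (its NotifyData struct does carry a notify_isLast field), so treat it as an illustration of the concern rather than of the patch itself.

```c
#include <assert.h>

/* Hypothetical: how many notifications fit on one SLRU page in this model. */
#define PAGE_CAPACITY 4

typedef struct
{
    int on_page;    /* notifications on the current, not-yet-full page */
    int shipped;    /* notifications already covered by an emitted WAL record */
    int records;    /* WAL records emitted so far */
} PageBatcher;

/* Append one notification; a WAL record is emitted only when the page fills. */
static void
batcher_add(PageBatcher *b)
{
    assert(b->on_page < PAGE_CAPACITY);
    b->on_page++;
    if (b->on_page == PAGE_CAPACITY)
    {
        b->shipped += b->on_page;
        b->records++;
        b->on_page = 0;
    }
}

/* Notifications the standby cannot see yet because their page is not full. */
static int
batcher_pending(const PageBatcher *b)
{
    return b->on_page;
}
```

With four entries per page, a burst of ten notifications yields two WAL records covering eight messages; the last two stay pending until later traffic fills the page, which is exactly the tail-of-burst delay described above.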

So this design is non-optimal both for existing uses and for the
proposed new uses, which means nobody will like it. You could
ameliorate #1 by adding a GUC that determines whether NOTIFY actually
writes WAL, but that's pretty ugly. In any case ISTM that problem #2
means this design is basically broken.

I wonder whether it'd be practical to not involve WAL per se in this
at all, but to transmit NOTIFY messages by having walsender processes
follow the notify stream (as though they were listeners) and send the
notify traffic as a separate message stream interleaved with the WAL
traffic. We already have, as of a few days ago, the concept of
additional traffic in the walsender stream besides the WAL data itself,
so adding notify traffic as another message type should be
straightforward. It might be a bit tricky to get walreceivers to inject
the data into the slave-side ring buffer at the right time, ie, not
until after the commit a given message describes has been replayed;
but I don't immediately see a reason to think that's infeasible.

Going in this direction would mean that slave-side LISTEN only works
when using walsender/walreceiver, and not with old-style log shipping.
But personally I don't see a problem with that. If you're trying to
LISTEN you probably want pretty up-to-date data anyway.

regards, tom lane

#3Simon Riggs
simon@2ndQuadrant.com
In reply to: Tom Lane (#2)
Re: Sending notifications from the master to the standby

On Tue, Jan 10, 2012 at 5:00 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

> Joachim Wieland <joe@mcknight.de> writes:
>
> [ send NOTIFYs to slaves by means of: ]

Good idea.

> I wonder whether it'd be practical to not involve WAL per se in this
> at all, but to transmit NOTIFY messages by having walsender processes
> follow the notify stream (as though they were listeners) and send the
> notify traffic as a separate message stream interleaved with the WAL
> traffic.  We already have, as of a few days ago, the concept of
> additional traffic in the walsender stream besides the WAL data itself,
> so adding notify traffic as another message type should be
> straightforward.

Also good idea.

> It might be a bit tricky to get walreceivers to inject
> the data into the slave-side ring buffer at the right time, ie, not
> until after the commit a given message describes has been replayed;
> but I don't immediately see a reason to think that's infeasible.

When a transaction commits, it would use full-size commit records and set
a (new) flag in xl_xact_commit.xinfo to show the commit is paired with
notify traffic.

Get messages in walreceiver.c XLogWalRcvProcessMsg() and put them in a
shared hash table. Messages would need to contain the xid of the notifying
transaction and any other info needed for LISTEN.

When we hit xact.c xact_redo_commit() on the standby, we'd check for
messages in the hash table if the notify flag is set and execute the
normal notify code as if the NOTIFY had run locally on the standby. We
can sweep the hash table clean of any old messages each time we run
ProcArrayApplyRecoveryInfo().
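The outline above can be sketched as follows, assuming a fixed-size array scanned linearly in place of the shared hash table. TransactionId is redeclared here as a stand-in, stash_notify, replay_commit and sweep_pending are hypothetical, and the sweep compares xids with a plain '<', ignoring xid wraparound.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t TransactionId;   /* stand-in for the PostgreSQL typedef */

#define MAX_PENDING 64            /* hypothetical table size */

typedef struct
{
    TransactionId xid;            /* notifying transaction on the master */
    char          channel[64];
    bool          in_use;
} PendingNotify;

static PendingNotify pending[MAX_PENDING];

/* walreceiver side: keep a message until its commit record is replayed. */
static void
stash_notify(TransactionId xid, const char *channel)
{
    for (int i = 0; i < MAX_PENDING; i++)
    {
        if (!pending[i].in_use)
        {
            pending[i].xid = xid;
            strncpy(pending[i].channel, channel, sizeof(pending[i].channel) - 1);
            pending[i].channel[sizeof(pending[i].channel) - 1] = '\0';
            pending[i].in_use = true;
            return;
        }
    }
    assert(!"pending notify table is full");
}

/* Startup process side: replaying a commit record.  Returns how many
 * stashed messages would be handed to the normal notify code. */
static int
replay_commit(TransactionId xid, bool has_notify_flag)
{
    int delivered = 0;

    if (!has_notify_flag)
        return 0;
    for (int i = 0; i < MAX_PENDING; i++)
    {
        if (pending[i].in_use && pending[i].xid == xid)
        {
            /* the real code would run the local NOTIFY path here */
            pending[i].in_use = false;
            delivered++;
        }
    }
    return delivered;
}

/* Periodic cleanup: drop messages of transactions older than the oldest
 * running xid (e.g. aborted ones).  Plain '<' ignores wraparound. */
static int
sweep_pending(TransactionId oldest_running)
{
    int removed = 0;

    for (int i = 0; i < MAX_PENDING; i++)
    {
        if (pending[i].in_use && pending[i].xid < oldest_running)
        {
            pending[i].in_use = false;
            removed++;
        }
    }
    return removed;
}
```

Note that slot reuse means a structure like this does not by itself release messages in arrival order; ordering has to come from somewhere else.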

Add new message type to walprotocol.h. Message code 'L' appears to be
available.

Suggest we add something to the initial handshake from the standby to say
"please send me notify traffic", which we can link to a parameter that
defines the size of standby_notify_buffer. We don't want all standbys to
receive such traffic unless they really want it, and pg_basebackup
probably doesn't want it either.

If you wanted to get really fancy you could send only some of the
traffic to each standby based on a hash or roundrobin algorithm, so we
can spread the listeners across multiple standbys.
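One way to read the hash-based split is to route each channel to a fixed standby, so that everything sent on a channel arrives in one place. The sketch below assumes 32-bit FNV-1a as the hash; both the hash choice and the function names are illustrative, not part of the proposal.

```c
#include <assert.h>
#include <stdint.h>

/* 32-bit FNV-1a over a NUL-terminated channel name. */
static uint32_t
fnv1a(const char *s)
{
    uint32_t h = 2166136261u;            /* FNV offset basis */

    for (; *s; s++)
    {
        h ^= (unsigned char) *s;
        h *= 16777619u;                  /* FNV prime */
    }
    return h;
}

/* Hypothetical routing rule: which standby gets this channel's traffic. */
static int
standby_for_channel(const char *channel, int nstandbys)
{
    assert(nstandbys > 0);
    return (int) (fnv1a(channel) % (uint32_t) nstandbys);
}
```

Hashing by channel keeps each channel's notifications together, so a client must LISTEN on the standby its channel maps to; round-robin per message would spread load more evenly but would split a single channel's traffic across standbys.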

I'll be your reviewer, if you want.

> Going in this direction would mean that slave-side LISTEN only works
> when using walsender/walreceiver, and not with old-style log shipping.
> But personally I don't see a problem with that.  If you're trying to
> LISTEN you probably want pretty up-to-date data anyway.

Which fits the expected use case also.

--
 Simon Riggs                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

#4Joachim Wieland
joe@mcknight.de
In reply to: Tom Lane (#2)
Re: Sending notifications from the master to the standby

On Tue, Jan 10, 2012 at 12:00 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

> So this design is non-optimal both for existing uses and for the
> proposed new uses, which means nobody will like it.  You could
> ameliorate #1 by adding a GUC that determines whether NOTIFY actually
> writes WAL, but that's pretty ugly.  In any case ISTM that problem #2
> means this design is basically broken.

I chose to do it this way because it seemed like the most natural way
to do it (which of course doesn't mean it's the best) :-). I agree
that there should be a way to avoid the replication of the NOTIFYs.
Regarding your second point though, remember that on the master we
write notifications to the queue in pre-commit. And we also don't
interleave notifications of different transactions. So once the commit
record makes it to the standby, all the notifications are already
there, just as on the master. In a burst of notifications, both
solutions should more or less behave the same way but yes, the one
involving the WAL file would be slower as it goes to the file system
and back.
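The argument rests on how listeners read the queue: an entry is delivered only once its transaction is known to have committed, so notifications written at pre-commit stay invisible until the commit record has been replayed. The sketch models that rule as we understand the async.c reader to behave (stop at an in-progress entry, skip aborted ones, deliver committed ones). read_queue, QueueEntry and the status array are hypothetical simplifications, not the real API.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;   /* stand-in for the PostgreSQL typedef */

typedef enum
{
    XACT_IN_PROGRESS,
    XACT_COMMITTED,
    XACT_ABORTED
} XactStatus;

typedef struct
{
    TransactionId xid;        /* transaction that queued the notification */
    int           payload;    /* stand-in for channel name and payload */
} QueueEntry;

/*
 * Read entries [*pos, head).  status[xid] is a toy replacement for the
 * commit-log lookup; on a standby it reflects what replay has applied.
 * Committed entries go to out[], aborted ones are skipped, and reading
 * stops at the first in-progress entry so it is examined again later.
 */
static int
read_queue(const QueueEntry *q, int head, int *pos,
           const XactStatus *status, int *out, int out_cap)
{
    int n = 0;

    while (*pos < head)
    {
        XactStatus st = status[q[*pos].xid];

        if (st == XACT_IN_PROGRESS)
            break;              /* keep *pos here; retry on the next wakeup */
        if (st == XACT_COMMITTED)
        {
            assert(n < out_cap);
            out[n++] = q[*pos].payload;
        }
        (*pos)++;
    }
    return n;
}
```

Because the reader parks at the first in-progress entry, it does not matter whether the notifications or the commit record reach the standby first; nothing is delivered until both are there.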

> I wonder whether it'd be practical to not involve WAL per se in this
> at all, but to transmit NOTIFY messages by having walsender processes
> follow the notify stream (as though they were listeners) and send the
> notify traffic as a separate message stream interleaved with the WAL
> traffic.

Agreed, having walsender/receiver work as NOTIFY proxies is kinda smart...

Joachim

#5Simon Riggs
simon@2ndQuadrant.com
In reply to: Joachim Wieland (#4)
Re: Sending notifications from the master to the standby

On Tue, Jan 10, 2012 at 12:56 PM, Joachim Wieland <joe@mcknight.de> wrote:

> I chose to do it this way because it seemed like the most natural way
> to do it (which of course doesn't mean it's the best)  :-).

If it's any consolation, it's exactly how I would have done it too, up
until about 2 months ago, and I remember discussing almost exactly the
design you presented with someone in Rome last year.

Anyway, it's a good feature, so I hope you have time.


#6Tom Lane
tgl@sss.pgh.pa.us
In reply to: Simon Riggs (#3)
Re: Sending notifications from the master to the standby

Simon Riggs <simon@2ndQuadrant.com> writes:

> On Tue, Jan 10, 2012 at 5:00 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>
>> It might be a bit tricky to get walreceivers to inject
>> the data into the slave-side ring buffer at the right time, ie, not
>> until after the commit a given message describes has been replayed;
>> but I don't immediately see a reason to think that's infeasible.

[ Simon sketches a design for that ]

Seems a bit overcomplicated. I was just thinking of having walreceiver
note the WAL endpoint at the instant of receipt of a notify message,
and not release the notify message to the slave ring buffer until WAL
replay has advanced that far. You'd need to lay down ground rules about
how the walsender times the insertion of notify messages relative to
WAL in its output. But I don't see the need for either explicit markers
in the WAL stream or a hash table. Indeed, a hash table scares me
because it doesn't clearly guarantee that notifies will be released in
arrival order.
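This alternative needs only a FIFO: each arriving message is tagged with the WAL position received so far and released once replay has passed that position. The sketch assumes the walsender never sends a notify message before the WAL containing the corresponding commit record (one possible form of the "ground rules" mentioned above). XLogRecPtr is modelled as a plain 64-bit position, a simplification, and HeldQueue, hold_notify and release_notifies are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;   /* simplified: a WAL position as one integer */

#define MAX_HELD 128           /* hypothetical capacity; no wraparound here */

typedef struct
{
    XLogRecPtr release_at;     /* WAL received up to here when message arrived */
    int        msg;            /* stand-in for the notify message */
} HeldNotify;

typedef struct
{
    HeldNotify items[MAX_HELD];
    int        head;           /* oldest message still held */
    int        tail;           /* next free slot */
} HeldQueue;

/* walreceiver: a notify message arrives; note the WAL end received so far. */
static void
hold_notify(HeldQueue *q, XLogRecPtr received_upto, int msg)
{
    assert(q->tail < MAX_HELD);
    q->items[q->tail].release_at = received_upto;
    q->items[q->tail].msg = msg;
    q->tail++;
}

/* Release, oldest first, every message whose position replay has reached.
 * Stops at the first message not yet due, so arrival order is preserved. */
static int
release_notifies(HeldQueue *q, XLogRecPtr replayed_upto, int *out, int out_cap)
{
    int n = 0;

    while (q->head < q->tail && q->items[q->head].release_at <= replayed_upto)
    {
        assert(n < out_cap);
        out[n++] = q->items[q->head].msg;
        q->head++;
    }
    return n;
}
```

Because release stops at the first message that is not yet due, messages leave the queue strictly in arrival order, without any per-xid lookup.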

> Suggest we add something to initial handshake from standby to say
> "please send me notify traffic",

+1 on that.

regards, tom lane

#7Simon Riggs
simon@2ndQuadrant.com
In reply to: Tom Lane (#6)
Re: Sending notifications from the master to the standby

On Tue, Jan 10, 2012 at 4:55 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

> Simon Riggs <simon@2ndQuadrant.com> writes:
>
>> On Tue, Jan 10, 2012 at 5:00 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>>
>>> It might be a bit tricky to get walreceivers to inject
>>> the data into the slave-side ring buffer at the right time, ie, not
>>> until after the commit a given message describes has been replayed;
>>> but I don't immediately see a reason to think that's infeasible.
>
> [ Simon sketches a design for that ]
>
> Seems a bit overcomplicated.  I was just thinking of having walreceiver
> note the WAL endpoint at the instant of receipt of a notify message,
> and not release the notify message to the slave ring buffer until WAL
> replay has advanced that far.  You'd need to lay down ground rules about
> how the walsender times the insertion of notify messages relative to
> WAL in its output.

You have to store the messages somewhere until they're needed. If that
somewhere isn't on the standby, very close to the Startup process, then
it's going to be very slow. Putting a marker in the WAL stream
guarantees arrival order. The hash table was just a place to store
them until they're needed; it could be a ring buffer as well.

Inserts into the slave ring buffer already have an xid on them, so the
test will probably already cope with messages inserted but for which
the parent xid has not committed. The only problem is coping with
possible out of sequence messages.

> But I don't see the need for either explicit markers
> in the WAL stream or a hash table.  Indeed, a hash table scares me
> because it doesn't clearly guarantee that notifies will be released in
> arrival order.

The hash table is clearly not the thing providing an arrival order
guarantee, it was just a cache.

You have a few choices: (1) send the message while holding an
exclusive lock; (2) send them as they come and buffer them, then
reorder them using the WAL sequence, since that matches the original
commit sequence; or (3) add a sequence number to the messages sent by
the WALSender, so that the WALReceiver can buffer them locally and
insert them in the correct order into the normal ring buffer. In (3)
the message sequence and the WAL sequence match, but the mechanism is
different.

(1) is out because the purpose of offloading to the standby is to give
the master more capacity. If we slow it down in order to serve the
standby we're doing things the wrong way around.

I was choosing (2), maybe you prefer (3) or another design entirely.
They look very similar to me and are about the same complexity; it's just
copying data and preserving sequence.
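Option (3) amounts to a reorder buffer on the receiving side. A minimal sketch, assuming the walsender stamps each notify with a consecutive sequence number starting at 0 and that no message arrives more than `REORDER_WINDOW` ahead of the next expected one; `SeqNotify`, `ReorderBuffer`, `reorder_accept` and `ring_buffer_insert` are hypothetical names, and the last one merely records the order in which messages would be inserted into the standby's ring buffer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define REORDER_WINDOW 16

/* Hypothetical: a notify stamped by the walsender with a sequence number. */
typedef struct SeqNotify
{
    uint64_t    seqno;
    bool        present;
    char        payload[64];
} SeqNotify;

typedef struct ReorderBuffer
{
    SeqNotify   slots[REORDER_WINDOW];
    uint64_t    next_seqno;     /* next number to hand to the ring buffer */
} ReorderBuffer;

/* Stand-in for the standby's ring buffer: just records the insertion order. */
static uint64_t delivered_log[64];
static int  delivered_count = 0;

static void
ring_buffer_insert(const SeqNotify *n)
{
    if (delivered_count < 64)
        delivered_log[delivered_count++] = n->seqno;
}

/*
 * Accept one message in whatever order it arrived, then insert every
 * message that is now contiguous with what was already inserted.
 * Returns how many were inserted, or -1 for a stale or out-of-window seqno.
 */
static int
reorder_accept(ReorderBuffer *rb, uint64_t seqno, const char *payload)
{
    if (seqno < rb->next_seqno || seqno >= rb->next_seqno + REORDER_WINDOW)
        return -1;

    SeqNotify  *slot = &rb->slots[seqno % REORDER_WINDOW];

    slot->seqno = seqno;
    slot->present = true;
    snprintf(slot->payload, sizeof(slot->payload), "%s", payload);

    int         inserted = 0;

    for (;;)
    {
        SeqNotify  *next = &rb->slots[rb->next_seqno % REORDER_WINDOW];

        if (!next->present)
            break;
        /* Only in-window numbers are stored, so a present slot must match. */
        assert(next->seqno == rb->next_seqno);
        ring_buffer_insert(next);
        next->present = false;
        rb->next_seqno++;
        inserted++;
    }
    return inserted;
}
```

Option (2) differs mainly in the ordering key: WAL positions are not consecutive, so buffered messages would be released as replay reaches each marker rather than by counting sequence numbers.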

--
 Simon Riggs                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

#8Joachim Wieland
joe@mcknight.de
In reply to: Tom Lane (#6)
Re: Sending notifications from the master to the standby

On Tue, Jan 10, 2012 at 11:55 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Simon Riggs <simon@2ndQuadrant.com> writes:
[ Tom sketches a design ]
Seems a bit overcomplicated.  I was just thinking of having walreceiver
note the WAL endpoint at the instant of receipt of a notify message,
and not release the notify message to the slave ring buffer until WAL
replay has advanced that far.

How about this: We mark a notify message specially if it is the last
message sent by a transaction and also add a flag to
commit/abort records, indicating whether or not the transaction has
sent notifies. Now if such a last message is being put into the regular
ring buffer on the standby and the xid is known to have committed or
aborted, signal the backends. Also signal from a commit/abort record
if the flag is set.

If the notify messages make it to the standby first, we just put
messages of a not-yet-committed transaction into the queue, just as on
the master. Listeners will get signaled when the commit record
arrives. If the commit record arrives first, we signal, but the
listeners won't find anything (at least not the latest notifications).
When the last notify of that transaction finally arrives, the
transaction is known to have committed and the listeners will get
signaled.
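Joachim's proposal boils down to two signalling rules plus the listener's usual reading behaviour. The sketch below assumes a toy transaction-status table in place of clog and a queue held in a plain array; `QueueEntry`, `is_last`, `should_signal_on_insert`, `should_signal_on_xact_end` and `read_queue` are hypothetical names chosen for illustration, not code from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

typedef enum XactState
{
    XACT_IN_PROGRESS = 0,       /* default for zero-initialized entries */
    XACT_COMMITTED,
    XACT_ABORTED
} XactState;

/* Toy stand-in for clog on the standby, indexed directly by xid. */
#define MAX_XID 16
static XactState xact_state[MAX_XID];

/* Hypothetical queue entry; is_last marks a transaction's final notify. */
typedef struct QueueEntry
{
    TransactionId xid;
    bool        is_last;
} QueueEntry;

/* Rule 1: inserting the last notify of an already-finished xact signals. */
static bool
should_signal_on_insert(const QueueEntry *e)
{
    assert(e->xid < MAX_XID);
    return e->is_last && xact_state[e->xid] != XACT_IN_PROGRESS;
}

/* Rule 2: replaying a commit/abort record signals if its flag is set. */
static bool
should_signal_on_xact_end(TransactionId xid, bool committed, bool has_notifies)
{
    assert(xid < MAX_XID);
    xact_state[xid] = committed ? XACT_COMMITTED : XACT_ABORTED;
    return has_notifies;
}

/*
 * Listener side, modeled on the master's behaviour: deliver committed
 * entries, skip aborted ones, and stop at the first in-progress xact.
 * Returns the position to resume from next time.
 */
static int
read_queue(const QueueEntry *q, int len, int pos, int *delivered)
{
    while (pos < len)
    {
        assert(q[pos].xid < MAX_XID);
        XactState   s = xact_state[q[pos].xid];

        if (s == XACT_IN_PROGRESS)
            break;
        if (s == XACT_COMMITTED)
            (*delivered)++;
        pos++;
    }
    return pos;
}
```

In both arrival orders described above, the second of the two events (commit record or last notify) triggers a signal that finds the whole batch readable; the reader never needs to know which came first.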

What could still happen is that the standby receives notifies, the
commit message and more notifies. Listeners would still eventually get
all the messages but potentially not all of them at once. Is this a
problem? If so, then we could add a special "stop reading" record into
the queue before we write the notifies, which we subsequently change
into a "continue reading" record once all notifications are in the
queue. Readers would treat a "stop reading" record just like a
not-yet-committed transaction and ignore a "continue reading" record.
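The "stop reading" / "continue reading" idea behaves like a barrier in the queue. A minimal sketch, ignoring xids entirely and assuming a single writer and a fixed-size array in place of the SLRU ring buffer; `EntryKind`, `NotifyQueue`, `begin_batch`, `append_notify`, `end_batch` and `read_until_barrier` are hypothetical names.

```c
#include <assert.h>

typedef enum EntryKind
{
    ENTRY_NOTIFY,
    ENTRY_STOP_READING,
    ENTRY_CONTINUE_READING
} EntryKind;

typedef struct Entry
{
    EntryKind   kind;
    int         payload_id;     /* stand-in for channel and payload */
} Entry;

#define QUEUE_LEN 32

typedef struct NotifyQueue
{
    Entry       entries[QUEUE_LEN];
    int         tail;           /* next free slot */
} NotifyQueue;

/* Writer: place a "stop reading" barrier before the batch; return its index. */
static int
begin_batch(NotifyQueue *q)
{
    assert(q->tail < QUEUE_LEN);
    int         barrier = q->tail;

    q->entries[q->tail++] = (Entry) {ENTRY_STOP_READING, 0};
    return barrier;
}

static void
append_notify(NotifyQueue *q, int payload_id)
{
    assert(q->tail < QUEUE_LEN);
    q->entries[q->tail++] = (Entry) {ENTRY_NOTIFY, payload_id};
}

/* Writer: all notifications are in, so turn the barrier into "continue". */
static void
end_batch(NotifyQueue *q, int barrier)
{
    q->entries[barrier].kind = ENTRY_CONTINUE_READING;
}

/*
 * Reader: deliver notifies from pos onward, halt at a "stop reading"
 * record as if it were an uncommitted xact, skip "continue reading".
 * Returns the position to resume from.
 */
static int
read_until_barrier(const NotifyQueue *q, int pos, int *delivered)
{
    while (pos < q->tail)
    {
        const Entry *e = &q->entries[pos];

        if (e->kind == ENTRY_STOP_READING)
            break;
        if (e->kind == ENTRY_NOTIFY)
            (*delivered)++;
        pos++;
    }
    return pos;
}
```

Flipping the barrier in place rather than removing it keeps queue positions stable, so a reader stopped at the barrier simply resumes from the same position on its next pass.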

Suggest we add something to initial handshake from standby to say
"please send me notify traffic",

+1 on that.

From what you said I imagined this walsender listener as a regular
listener that listens on the union of all sets of channels that
anybody on the standby is listening on, with the LISTEN transaction on
the standby returning from commit once the listener is known to have been
set up on the master.

Joachim

#9Tom Lane
tgl@sss.pgh.pa.us
In reply to: Joachim Wieland (#8)
Re: Sending notifications from the master to the standby

Joachim Wieland <joe@mcknight.de> writes:

On Tue, Jan 10, 2012 at 11:55 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Simon Riggs <simon@2ndQuadrant.com> writes:

Suggest we add something to initial handshake from standby to say
"please send me notify traffic",

+1 on that.

From what you said I imagined this walsender listener as a regular
listener that listens on the union of all sets of channels that
anybody on the standby is listening on, with the LISTEN transaction on
the standby returning from commit once the listener is known to have been
set up on the master.

This seems vastly overcomplicated too. I'd just vote for a simple
yes/no flag, so that receivers that have no interest in notifies don't
have to deal with them.

regards, tom lane
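The two handshake options discussed here can be contrasted in a small walsender-side filter. This is a sketch under assumptions: `StandbyNotifyOptions` and `should_forward_notify` are hypothetical names invented for this illustration. With `nchannels == 0` it reduces to Tom's simple yes/no flag, while a non-empty list models Joachim's union of the channels listened on at the standby.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical per-connection options a standby might send in its handshake. */
typedef struct StandbyNotifyOptions
{
    bool        want_notifies;  /* Tom's proposal: a simple yes/no flag */
    const char **channels;      /* Joachim's variant: union of standby channels */
    size_t      nchannels;      /* 0 means "no channel filter" */
} StandbyNotifyOptions;

/* Decide whether the walsender should forward a notify on this channel. */
static bool
should_forward_notify(const StandbyNotifyOptions *opts, const char *channel)
{
    assert(opts != NULL && channel != NULL);
    if (!opts->want_notifies)
        return false;           /* standby never asked for notify traffic */
    if (opts->nchannels == 0)
        return true;            /* yes/no flag only: forward everything */
    for (size_t i = 0; i < opts->nchannels; i++)
        if (strcmp(opts->channels[i], channel) == 0)
            return true;
    return false;
}
```

The filter itself is trivial; the extra complexity of the channel-union variant lies elsewhere, in keeping the master's copy of the channel list in sync with LISTEN and UNLISTEN on every standby.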

#10Tom Lane
tgl@sss.pgh.pa.us
In reply to: Tom Lane (#9)
Re: Sending notifications from the master to the standby

BTW ... it occurs to me to ask whether we really have a solid use-case
for having listeners attached to slave servers. I have personally never
seen an application for LISTEN/NOTIFY in which the listeners were
entirely read-only. Even if there are one or two cases out there, it's
not clear to me that supporting it is worth the extra complexity that
seems to be needed.

regards, tom lane

#11Simon Riggs
simon@2ndQuadrant.com
In reply to: Tom Lane (#10)
Re: Sending notifications from the master to the standby

On Wed, Jan 11, 2012 at 4:33 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

BTW ... it occurs to me to ask whether we really have a solid use-case
for having listeners attached to slave servers.  I have personally never
seen an application for LISTEN/NOTIFY in which the listeners were
entirely read-only.  Even if there are one or two cases out there, it's
not clear to me that supporting it is worth the extra complexity that
seems to be needed.

The idea is to support external caches that re-read the data when it changes.

If we can do that from the standby then we offload from the master.

Yes, there are other applications for LISTEN/NOTIFY and we wouldn't be
able to support them all with this.

#12Josh Berkus
josh@agliodbs.com
In reply to: Tom Lane (#10)
Re: Sending notifications from the master to the standby

Tom,

BTW ... it occurs to me to ask whether we really have a solid use-case
for having listeners attached to slave servers. I have personally never
seen an application for LISTEN/NOTIFY in which the listeners were
entirely read-only. Even if there are one or two cases out there, it's
not clear to me that supporting it is worth the extra complexity that
seems to be needed.

Actually, I've seen requests for it from my clients and on IRC. Not
sure how serious those are, but users have brought it up. Certainly
users intuitively think they should be able to LISTEN on a standby, and
are surprised when they find out they can't.

The basic idea is that if we can replicate LISTENs, then you can use
replication as a simple distributed (and lossy) queueing system. This
is especially useful if the replica is geographically distant, and there
are a lot of listeners.

The obvious first use case for this is for cache invalidation. For
example, we have one application where we're using Redis to queue cache
invalidation messages; if LISTEN/NOTIFY were replicated, we could use it
instead and simplify our infrastructure.

--
Josh Berkus
PostgreSQL Experts Inc.
http://pgexperts.com

#13Tom Lane
tgl@sss.pgh.pa.us
In reply to: Josh Berkus (#12)
Re: Sending notifications from the master to the standby

Josh Berkus <josh@agliodbs.com> writes:

BTW ... it occurs to me to ask whether we really have a solid use-case
for having listeners attached to slave servers. I have personally never
seen an application for LISTEN/NOTIFY in which the listeners were
entirely read-only. Even if there are one or two cases out there, it's
not clear to me that supporting it is worth the extra complexity that
seems to be needed.

The basic idea is that if we can replicate LISTENs, then you can use
replication as a simple distributed (and lossy) queueing system.

Well, this is exactly what I don't believe. A queueing system requires
that recipients be able to remove things from the queue. You can't do
that on a slave server, because you can't make any change in the
database that would be visible to other users.

The obvious first use case for this is for cache invalidation.

Yeah, upthread Simon pointed out that propagating notifies would be
useful for flushing caches in applications that watch the database in a
read-only fashion. I grant that such a use-case is technically possible
within the limitations of a slave server; I'm just dubious that it's a
sufficiently attractive use-case to justify the complexity and future
maintenance costs of the sort of designs we are talking about. Or in
other words: so far, cache invalidation is not the "first" use-case,
it's the ONLY POSSIBLE use-case. That's not useful enough.

regards, tom lane

#14Josh Berkus
josh@agliodbs.com
In reply to: Tom Lane (#13)
Re: Sending notifications from the master to the standby

Yeah, upthread Simon pointed out that propagating notifies would be
useful for flushing caches in applications that watch the database in a
read-only fashion. I grant that such a use-case is technically possible
within the limitations of a slave server; I'm just dubious that it's a
sufficiently attractive use-case to justify the complexity and future
maintenance costs of the sort of designs we are talking about. Or in
other words: so far, cache invalidation is not the "first" use-case,
it's the ONLY POSSIBLE use-case. That's not useful enough.

Well, cache invalidation is a pretty common task; probably more than 50%
of all database applications need to do it. Note that we're not just
talking about memcached for web applications here. For example, one of
the companies quoted for the PostgreSQL 9.0 release uses LISTEN/NOTIFY to
inform remote devices (POS systems) that there's new data available for
them. That's a form of cache invalidation. It's certainly a more common
design pattern than using XML in the database.

However, there's the question of whether or not this patch actually
allows a master-slave replication system to support more listeners more
efficiently than having them all simply listen to the master, and what
impact it has on the performance of LISTEN/NOTIFY on standalone systems.

#15Peter Geoghegan
peter@2ndquadrant.com
In reply to: Josh Berkus (#14)
Re: Sending notifications from the master to the standby

On 11 January 2012 23:51, Josh Berkus <josh@agliodbs.com> wrote:

Yeah, upthread Simon pointed out that propagating notifies would be
useful for flushing caches in applications that watch the database in a
read-only fashion.  I grant that such a use-case is technically possible
within the limitations of a slave server; I'm just dubious that it's a
sufficiently attractive use-case to justify the complexity and future
maintenance costs of the sort of designs we are talking about.  Or in
other words: so far, cache invalidation is not the "first" use-case,
it's the ONLY POSSIBLE use-case.  That's not useful enough.

Well, cache invalidation is a pretty common task; probably more than 50%
of all database applications need to do it.

I agree that it would be nice to support this type of cache
invalidation - without commenting on the implementation, I think that
the concept is very useful, and of immediate benefit to a significant
number of people.

--
Peter Geoghegan       http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training and Services

#16Simon Riggs
simon@2ndQuadrant.com
In reply to: Tom Lane (#13)
Re: Sending notifications from the master to the standby

On Wed, Jan 11, 2012 at 11:38 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

The obvious first use case for this is for cache invalidation.

Yeah, upthread Simon pointed out that propagating notifies would be
useful for flushing caches in applications that watch the database in a
read-only fashion.  I grant that such a use-case is technically possible
within the limitations of a slave server; I'm just dubious that it's a
sufficiently attractive use-case to justify the complexity and future
maintenance costs of the sort of designs we are talking about.  Or in
other words: so far, cache invalidation is not the "first" use-case,
it's the ONLY POSSIBLE use-case.  That's not useful enough.

Many people clearly do think this is useful.

I personally don't think it will be that complex. I'm willing to
review and maintain it if the patch works the way we want it to.

#17Josh Berkus
josh@agliodbs.com
In reply to: Simon Riggs (#16)
Re: Sending notifications from the master to the standby

Many people clearly do think this is useful.

It also comes under the heading of "avoiding surprising behavior". That
is, users instinctively expect to be able to LISTEN on standbys, and are
surprised when they can't.

I personally don't think it will be that complex. I'm willing to
review and maintain it if the patch works the way we want it to.

I think the review needs some performance testing to be valid.

1) How does this patch affect the speed and throughput of LISTEN/NOTIFY
on a standalone server?

2) Can we actually attach more LISTENers to multiple standbys than we
could to a single Master?

Unfortunately, I don't have an application which can LISTEN in a way
that doesn't eclipse any differences in throughput or response time we
would see on the DB side. Does anyone?
