synchronous_commit = apply

Started by Thomas Munroover 10 years ago10 messages
#1Thomas Munro
thomas.munro@enterprisedb.com
1 attachment(s)

Hi

Do you think it's reasonable to want to COMMIT a particular transaction on
a master node, and then immediately run a read-only query on a hot standby
node that is guaranteed to see that transaction?

A friend of mine who works with a different RDBMS technology that can do
that asked me how to achieve this with Postgres, and I suggested waiting
for the standby's pg_last_xlog_replay_location() to be >= the master's
pg_current_xlog_location() after COMMIT, which might involve some looping
and sleeping.

As a quick weekend learning exercise/hack I recently went looking into how
we could support $SUBJECT. I discovered we already report the apply
progress back to the master, and the synchronous waiting facility seemed to
be all ready to support this. In fact it seemed a little too easy so
something tells me it must be wrong! But anyway, please see the attached
toy POC patch which does that.

The next problem is that the master can be waiting quite a long time for a
reply from the remote walreceiver containing the desired apply LSN: in the
best case it learns of apply progress from replies to subsequent unrelated
records (which might be very soon on a busy system but still involves
waiting for the next transaction's WAL flush), and in the worst case it
needs to wait for wal_receiver_status_interval (10 seconds by default),
which makes for a long COMMIT delay. I was thinking that the solution to
that may be to teach StartupLOG to signal the walreceiver after it updates
XLogCtl->lastReplayedEndRecPtr, which should cause walrcv_receive to be
interrupted and return early, and then walreceiver could send a reply if it
sees that lastReplayedEndRecPtr has moved. Maybe that would generate an
unacceptably high frequency of signals, and maybe there is a better form of
IPC for this. Without introducing any new IPC, the walreceiver could
instead simply report apply progress to the master whenever it sees that
the apply LSN has moved after its regular NAPTIME_PER_CYCLE wait (100ms),
but that would still introduces bogus latency. A quick and dirty way to
see that on top of the attached patch is to set requestReply = true in
WalReceiverMain to force a send after every nap.

I can see that using synchronous_commit = apply in the practice might prove
difficult: how does a client know which node is the synchronous standby?
Perhaps those sorts of practical problems are the reason no one has done or
wanted this.

Thoughts?
Thanks for reading!

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

synchronous-commit-apply.patchapplication/octet-stream; name=synchronous-commit-apply.patchDownload
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 325239d..2e18768 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -462,6 +462,11 @@ SyncRepReleaseWaiters(void)
 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
 		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
 	}
+	if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+	{
+		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+	}
 
 	LWLockRelease(SyncRepLock);
 
@@ -728,6 +733,9 @@ assign_synchronous_commit(int newval, void *extra)
 		case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
 			SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
 			break;
+		case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+			SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+			break;
 		default:
 			SyncRepWaitMode = SYNC_REP_NO_WAIT;
 			break;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b3dac51..bbabc58 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -351,6 +351,7 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
 static const struct config_enum_entry synchronous_commit_options[] = {
 	{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
 	{"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
+	{"apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false},
 	{"on", SYNCHRONOUS_COMMIT_ON, false},
 	{"off", SYNCHRONOUS_COMMIT_OFF, false},
 	{"true", SYNCHRONOUS_COMMIT_ON, true},
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index cb1c2db..d8433e2 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -60,7 +60,9 @@ typedef enum
 	SYNCHRONOUS_COMMIT_LOCAL_FLUSH,		/* wait for local flush only */
 	SYNCHRONOUS_COMMIT_REMOTE_WRITE,	/* wait for local flush and remote
 										 * write */
-	SYNCHRONOUS_COMMIT_REMOTE_FLUSH		/* wait for local and remote flush */
+	SYNCHRONOUS_COMMIT_REMOTE_FLUSH,	/* wait for local and remote flush */
+	SYNCHRONOUS_COMMIT_REMOTE_APPLY		/* wait for local flush and remote
+										 * apply */
 }	SyncCommitLevel;
 
 /* Define the default setting for synchonous_commit */
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 71e2857..8e0fe00 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -23,8 +23,9 @@
 #define SYNC_REP_NO_WAIT		-1
 #define SYNC_REP_WAIT_WRITE		0
 #define SYNC_REP_WAIT_FLUSH		1
+#define SYNC_REP_WAIT_APPLY		2
 
-#define NUM_SYNC_REP_WAIT_MODE	2
+#define NUM_SYNC_REP_WAIT_MODE	3
 
 /* syncRepState */
 #define SYNC_REP_NOT_WAITING		0
#2Jaime Casanova
jaime.casanova@2ndquadrant.com
In reply to: Thomas Munro (#1)
Re: synchronous_commit = apply

On 1 September 2015 at 20:25, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Hi

Do you think it's reasonable to want to COMMIT a particular transaction on a
master node, and then immediately run a read-only query on a hot standby
node that is guaranteed to see that transaction?

well, that is important to make load balancing completely safe (not
returning old data when is important to get the latest).
Having said that, i have never seen a case where the apply lag
postgres has really matters or where the cause of the apply lag (I/O)
doesn't get worst if we try to apply immediatly.

Other solutions use a cache on top to apply in-memory at the cost of
getting inconsistent in a failure.

A friend of mine who works with a different RDBMS technology that can do
that asked me how to achieve this with Postgres, and I suggested waiting for
the standby's pg_last_xlog_replay_location() to be >= the master's
pg_current_xlog_location() after COMMIT, which might involve some looping
and sleeping.

As a quick weekend learning exercise/hack I recently went looking into how
we could support $SUBJECT. I discovered we already report the apply
progress back to the master, and the synchronous waiting facility seemed to
be all ready to support this. In fact it seemed a little too easy so
something tells me it must be wrong! But anyway, please see the attached
toy POC patch which does that.

i haven't seen the patch, but probably is as easy as you see it...
IIRC, Simon proposed a patch for this a few years ago and this was
actually contempleted from the beggining in the design of SR.

I guess there were good reasons the patch didn't get applied, i found
this thread and in this one Simon suggest is not the first time he
submitted that option so it should be other threads too:
/messages/by-id/AANLkTinxoYmWoWBsJxmnpJHJh_YAN9vFmnmhNJDMev4M@mail.gmail.com

--
Jaime Casanova www.2ndQuadrant.com
Professional PostgreSQL: Soporte 24x7 y capacitación

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Fujii Masao
masao.fujii@gmail.com
In reply to: Thomas Munro (#1)
Re: synchronous_commit = apply

On Wed, Sep 2, 2015 at 10:25 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Hi

Do you think it's reasonable to want to COMMIT a particular transaction on a
master node, and then immediately run a read-only query on a hot standby
node that is guaranteed to see that transaction?

A friend of mine who works with a different RDBMS technology that can do
that asked me how to achieve this with Postgres, and I suggested waiting for
the standby's pg_last_xlog_replay_location() to be >= the master's
pg_current_xlog_location() after COMMIT, which might involve some looping
and sleeping.

As a quick weekend learning exercise/hack I recently went looking into how
we could support $SUBJECT. I discovered we already report the apply
progress back to the master, and the synchronous waiting facility seemed to
be all ready to support this. In fact it seemed a little too easy so
something tells me it must be wrong! But anyway, please see the attached
toy POC patch which does that.

The next problem is that the master can be waiting quite a long time for a
reply from the remote walreceiver containing the desired apply LSN: in the
best case it learns of apply progress from replies to subsequent unrelated
records (which might be very soon on a busy system but still involves
waiting for the next transaction's WAL flush), and in the worst case it
needs to wait for wal_receiver_status_interval (10 seconds by default),
which makes for a long COMMIT delay. I was thinking that the solution to
that may be to teach StartupLOG to signal the walreceiver after it updates
XLogCtl->lastReplayedEndRecPtr, which should cause walrcv_receive to be
interrupted and return early, and then walreceiver could send a reply if it
sees that lastReplayedEndRecPtr has moved. Maybe that would generate an
unacceptably high frequency of signals

One idea is to change the standby so that it manages the locations
that the backends in "apply" mode are waiting for in the master,
and to make the startup process wake the walreceiver up whenever
the replay location reaches either of those locations. In this idea,
walreceiver sends back the "apply" location to the master only when
needed.

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#4Robert Haas
robertmhaas@gmail.com
In reply to: Thomas Munro (#1)
Re: synchronous_commit = apply

On Tue, Sep 1, 2015 at 9:25 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

The next problem is that the master can be waiting quite a long time for a
reply from the remote walreceiver containing the desired apply LSN: in the
best case it learns of apply progress from replies to subsequent unrelated
records (which might be very soon on a busy system but still involves
waiting for the next transaction's WAL flush), and in the worst case it
needs to wait for wal_receiver_status_interval (10 seconds by default),
which makes for a long COMMIT delay. I was thinking that the solution to
that may be to teach StartupLOG to signal the walreceiver after it updates
XLogCtl->lastReplayedEndRecPtr, which should cause walrcv_receive to be
interrupted and return early, and then walreceiver could send a reply if it
sees that lastReplayedEndRecPtr has moved. Maybe that would generate an
unacceptably high frequency of signals, and maybe there is a better form of
IPC for this.

Yeah, that could be a problem, as could reply volume. If you've got a
bunch of heap inserts of narrow rows into some table, you don't really
want to send a reply after each one. That would be a lot of replies,
and nobody can really care about them anyway, at least not for
synchronous_commit purposes. But what if you only sent a signal when
the just-replayed record was a COMMIT record? I suppose that could
still be a lot of replies on something like a full-tilt pgbench
workload, but even in that case it would help a lot.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#5Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Jaime Casanova (#2)
2 attachment(s)
Re: synchronous_commit = apply

[Combining replies to emails from different authors into one message]

On Wed, Sep 2, 2015 at 2:21 PM, Jaime Casanova <
jaime.casanova@2ndquadrant.com> wrote:

On 1 September 2015 at 20:25, Thomas Munro <thomas.munro@enterprisedb.com>
wrote:

As a quick weekend learning exercise/hack I recently went looking into how

we could support $SUBJECT. I discovered we already report the apply
progress back to the master, and the synchronous waiting facility seemed

to

be all ready to support this. In fact it seemed a little too easy so
something tells me it must be wrong! But anyway, please see the attached
toy POC patch which does that.

i haven't seen the patch, but probably is as easy as you see it...
IIRC, Simon proposed a patch for this a few years ago and this was
actually contempleted from the beggining in the design of SR.

Ah, thanks, that certainly explains that. The source code practically had
big arrows pointing to the place to type. I don't want to step on anyone's
toes, so if Simon or anyone else is actively working on this, please let me
know, I'll happily cease and desist.

On Thu, Sep 3, 2015 at 12:35 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Tue, Sep 1, 2015 at 9:25 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

The next problem is that the master can be waiting quite a long time for

a

reply from the remote walreceiver containing the desired apply LSN: in

the

best case it learns of apply progress from replies to subsequent

unrelated

records (which might be very soon on a busy system but still involves
waiting for the next transaction's WAL flush), and in the worst case it
needs to wait for wal_receiver_status_interval (10 seconds by default),
which makes for a long COMMIT delay. I was thinking that the solution to
that may be to teach StartupLOG to signal the walreceiver after it

updates

XLogCtl->lastReplayedEndRecPtr, which should cause walrcv_receive to be
interrupted and return early, and then walreceiver could send a reply if

it

sees that lastReplayedEndRecPtr has moved. Maybe that would generate an
unacceptably high frequency of signals, and maybe there is a better form

of

IPC for this.

Yeah, that could be a problem, as could reply volume. If you've got a
bunch of heap inserts of narrow rows into some table, you don't really
want to send a reply after each one. That would be a lot of replies,
and nobody can really care about them anyway, at least not for
synchronous_commit purposes. But what if you only sent a signal when
the just-replayed record was a COMMIT record? I suppose that could
still be a lot of replies on something like a full-tilt pgbench
workload, but even in that case it would help a lot.

Here's a version that does that. It's still ugly POC code for now -- the
flow control in walreceiver.c probably needs a bit of refactoring so it
doesn't have to do the same work in two different places, and it needs some
thought about how it balances time spent write wal and sending replies.
But ... it seems to work for simple tests.

I have also attached a test program. Here are some numbers I measured with
master and standby running on my laptop using that program:

synchronous_commit loops Time TPS
off 10000 0.841s 11890
local 10000 1.869s 5350
remote_write 10000 3.123s 3202
on 10000 3.085s 3241
apply 10000 3.361s 2975

If you run it with "--check" you can see that the changes are not always
immediately visible in anything below "apply" and are always visible in
"apply". (I can't explain why "on" consistently beats "remote_write" on my
machine by a small margin... Maybe something to do with being an assert
build.)

On Thu, Sep 3, 2015 at 12:02 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

One idea is to change the standby so that it manages the locations
that the backends in "apply" mode are waiting for in the master,
and to make the startup process wake the walreceiver up whenever
the replay location reaches either of those locations. In this idea,
walreceiver sends back the "apply" location to the master only when
needed.

Hmm. So maybe commit records could have a flag saying 'someone is waiting
for this to commit to apply', and the startup process's apply loop would
only bother to signal the walreceiver if it sees that flag. I will try
that.

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

synchronous-commit-apply-v2.patchapplication/octet-stream; name=synchronous-commit-apply-v2.patchDownload
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 127bc58..3cd46af 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6871,6 +6871,20 @@ StartupXLOG(void)
 				XLogCtl->lastReplayedTLI = ThisTimeLineID;
 				SpinLockRelease(&XLogCtl->info_lck);
 
+				/*
+				 * Tell walreceiver to notify the master so that any backends
+				 * blocked in COMMIT with synchronous_commit = apply can
+				 * continue.
+				 */
+				if (record->xl_rmid == RM_XACT_ID)
+				{
+					uint8 xact_info = record->xl_info & XLOG_XACT_OPMASK;
+
+					if (xact_info == XLOG_XACT_COMMIT ||
+						xact_info == XLOG_XACT_COMMIT_PREPARED)
+						WalRcvWakeup();
+				}
+
 				/* Remember this record as the last-applied one */
 				LastRec = ReadRecPtr;
 
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 325239d..2e18768 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -462,6 +462,11 @@ SyncRepReleaseWaiters(void)
 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
 		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
 	}
+	if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+	{
+		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+	}
 
 	LWLockRelease(SyncRepLock);
 
@@ -728,6 +733,9 @@ assign_synchronous_commit(int newval, void *extra)
 		case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
 			SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
 			break;
+		case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+			SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+			break;
 		default:
 			SyncRepWaitMode = SYNC_REP_NO_WAIT;
 			break;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 41e57f2..279c096 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -96,6 +96,7 @@ static uint32 recvOff = 0;
  */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t check_applied_lsn_flag = false;
 
 /*
  * LogstreamResult indicates the byte positions that we have already
@@ -138,7 +139,7 @@ static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
+static void XLogWalRcvSendReply(bool force, bool requestReply, XLogRecPtr applyLsn);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 
@@ -350,6 +351,7 @@ WalReceiverMain(void)
 								  slotname[0] != '\0' ? slotname : NULL))
 		{
 			bool		endofwal = false;
+			XLogRecPtr	last_sent_applied_lsn = InvalidXLogRecPtr;
 
 			if (first_stream)
 				ereport(LOG,
@@ -423,6 +425,9 @@ WalReceiverMain(void)
 							last_recv_timestamp = GetCurrentTimestamp();
 							ping_sent = false;
 							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
+							/* Check if we need to break out of this loop to send a reply. */
+							if (check_applied_lsn_flag)
+								break;
 						}
 						else if (len == 0)
 							break;
@@ -439,8 +444,20 @@ WalReceiverMain(void)
 						len = walrcv_receive(0, &buf);
 					}
 
-					/* Let the master know that we received some data. */
-					XLogWalRcvSendReply(false, false);
+					/* Let the master know that we applied commit records. */
+					if (check_applied_lsn_flag)
+					{
+						XLogRecPtr applied_lsn_snapshot = GetXLogReplayRecPtr(NULL);
+
+						check_applied_lsn_flag = false;
+						if (last_sent_applied_lsn != applied_lsn_snapshot)
+						{
+							last_sent_applied_lsn = applied_lsn_snapshot;
+							XLogWalRcvSendReply(true, true, applied_lsn_snapshot);
+						}
+					}
+					else
+						XLogWalRcvSendReply(false, false, InvalidXLogRecPtr);
 
 					/*
 					 * If we've written some records, flush them to disk and
@@ -461,6 +478,7 @@ WalReceiverMain(void)
 					 * WAL.
 					 */
 					bool		requestReply = false;
+					XLogRecPtr	applied_lsn = InvalidXLogRecPtr;
 
 					/*
 					 * Check if time since last receive from standby has
@@ -495,7 +513,29 @@ WalReceiverMain(void)
 						}
 					}
 
-					XLogWalRcvSendReply(requestReply, requestReply);
+					/*
+					 * Check if the startup process has signalled us to report
+					 * that an interesting commit record has been applied.
+					 */
+					if (check_applied_lsn_flag)
+					{
+						/*
+						 * Check if the apply LSN has moved since we last
+						 * notified the master of our apply position.
+						 */
+						XLogRecPtr applied_lsn_snapshot;
+
+						check_applied_lsn_flag = false;
+						applied_lsn_snapshot = GetXLogReplayRecPtr(NULL);
+						if (applied_lsn_snapshot != last_sent_applied_lsn)
+						{
+							requestReply = true;
+							last_sent_applied_lsn = applied_lsn_snapshot;
+							applied_lsn = applied_lsn_snapshot;
+						}
+					}
+
+					XLogWalRcvSendReply(requestReply, requestReply, applied_lsn);
 					XLogWalRcvSendHSFeedback(false);
 				}
 			}
@@ -734,6 +774,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS)
 {
 	int			save_errno = errno;
 
+	check_applied_lsn_flag = true;
 	latch_sigusr1_handler();
 
 	errno = save_errno;
@@ -846,7 +887,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
-					XLogWalRcvSendReply(true, false);
+					XLogWalRcvSendReply(true, false, InvalidXLogRecPtr);
 				break;
 			}
 		default:
@@ -1010,7 +1051,7 @@ XLogWalRcvFlush(bool dying)
 		/* Also let the master know that we made some progress */
 		if (!dying)
 		{
-			XLogWalRcvSendReply(false, false);
+			XLogWalRcvSendReply(false, false, InvalidXLogRecPtr);
 			XLogWalRcvSendHSFeedback(false);
 		}
 	}
@@ -1028,9 +1069,12 @@ XLogWalRcvFlush(bool dying)
  * If 'requestReply' is true, requests the server to reply immediately upon
  * receiving this message. This is used for heartbearts, when approaching
  * wal_receiver_timeout.
+ *
+ * If 'apply_lsn' is InvalidXLogRecPtr, the apply LSN is looked up in shmem if
+ * it is needed.  Otherwise, the value provided is used.
  */
 static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(bool force, bool requestReply, XLogRecPtr apply_lsn)
 {
 	static XLogRecPtr writePtr = 0;
 	static XLogRecPtr flushPtr = 0;
@@ -1221,3 +1265,19 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 		pfree(receipttime);
 	}
 }
+
+/*
+ * Wake up the walreceiver if it happens to be blocked in walrcv_receive,
+ * and tell it that a commit record has been applied.
+ *
+ * This is called by the startup process whenever interesting xlog records
+ * are applied, so that walreceiver can check if it needs to send an apply
+ * notification back to the master which may be waiting in a COMMIT with
+ * synchronous_commit = apply.
+ */
+void
+WalRcvWakeup(void)
+{
+	if (WalRcv->pid != 0)
+		kill(WalRcv->pid, SIGUSR1);
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b3dac51..bbabc58 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -351,6 +351,7 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
 static const struct config_enum_entry synchronous_commit_options[] = {
 	{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
 	{"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
+	{"apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false},
 	{"on", SYNCHRONOUS_COMMIT_ON, false},
 	{"off", SYNCHRONOUS_COMMIT_OFF, false},
 	{"true", SYNCHRONOUS_COMMIT_ON, true},
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index cb1c2db..d8433e2 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -60,7 +60,9 @@ typedef enum
 	SYNCHRONOUS_COMMIT_LOCAL_FLUSH,		/* wait for local flush only */
 	SYNCHRONOUS_COMMIT_REMOTE_WRITE,	/* wait for local flush and remote
 										 * write */
-	SYNCHRONOUS_COMMIT_REMOTE_FLUSH		/* wait for local and remote flush */
+	SYNCHRONOUS_COMMIT_REMOTE_FLUSH,	/* wait for local and remote flush */
+	SYNCHRONOUS_COMMIT_REMOTE_APPLY		/* wait for local flush and remote
+										 * apply */
 }	SyncCommitLevel;
 
 /* Define the default setting for synchonous_commit */
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 71e2857..8e0fe00 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -23,8 +23,9 @@
 #define SYNC_REP_NO_WAIT		-1
 #define SYNC_REP_WAIT_WRITE		0
 #define SYNC_REP_WAIT_FLUSH		1
+#define SYNC_REP_WAIT_APPLY		2
 
-#define NUM_SYNC_REP_WAIT_MODE	2
+#define NUM_SYNC_REP_WAIT_MODE	3
 
 /* syncRepState */
 #define SYNC_REP_NOT_WAITING		0
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 61255a9..3256ed3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -160,5 +160,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
+extern void WalRcvWakeup(void);
 
 #endif   /* _WALRECEIVER_H */
test-sync-apply.ctext/x-csrc; charset=US-ASCII; name=test-sync-apply.cDownload
#6Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Thomas Munro (#5)
1 attachment(s)
Re: synchronous_commit = apply

On Tue, Sep 8, 2015 at 1:11 AM, Thomas Munro <thomas.munro@enterprisedb.com>
wrote:

On Thu, Sep 3, 2015 at 12:02 AM, Fujii Masao <masao.fujii@gmail.com>
wrote:

One idea is to change the standby so that it manages the locations
that the backends in "apply" mode are waiting for in the master,
and to make the startup process wake the walreceiver up whenever
the replay location reaches either of those locations. In this idea,
walreceiver sends back the "apply" location to the master only when
needed.

Hmm. So maybe commit records could have a flag saying 'someone is waiting
for this to commit to apply', and the startup process's apply loop would
only bother to signal the walreceiver if it sees that flag. I will try
that.

Here is a version that does that, using a bit in xinfo to request apply
feedback from standbys when running with synchronous_commit = apply.

I am not very happy with the way that xact_redo communicates with the main
apply loop when it sees that bit, through calls to
XLogAppliedSynchronousCommit (essentially a global variable), but I
couldn't immediately see a better way to get information out of xact_redo
into the apply loop without changing the rm_redo interface. Perhaps xinfo
is the wrong place for that information. Thoughts?

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

synchronous-commit-apply-v3.patchapplication/octet-stream; name=synchronous-commit-apply-v3.patchDownload
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 56a1cb4..2f13b63 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5099,6 +5099,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
 
 	/*
+	 * Check if the caller would like to ask standbys for immediate feedback
+	 * once this commit is applied.
+	 */
+	if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
+		xl_xinfo.xinfo |= XACT_COMPLETION_SYNC_APPLY_FEEDBACK;
+
+	/*
 	 * Relcache invalidations requires information about the current database
 	 * and so does logical decoding.
 	 */
@@ -5435,6 +5442,12 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 	if (XactCompletionForceSyncCommit(parsed->xinfo))
 		XLogFlush(lsn);
 
+	/*
+	 * This commit record has someone waiting for apply feedback, so we tell
+	 * the xlog apply loop about that so it can generate a reply.
+	 */
+	if (XactCompletionSyncApplyFeedback(parsed->xinfo))
+		XLogAppliedSynchronousCommit(true);
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a092aad..373204a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6871,6 +6871,15 @@ StartupXLOG(void)
 				XLogCtl->lastReplayedTLI = ThisTimeLineID;
 				SpinLockRelease(&XLogCtl->info_lck);
 
+				/*
+				 * If rm_redo reported that it applied a commit record that
+				 * the master is waiting for, then we wake up the receiver so
+				 * that it notices the updated lastReplayedEndRecPtr and sends
+				 * a reply to the master.
+				 */
+				if (XLogAppliedSynchronousCommit(false))
+					WalRcvWakeup();
+
 				/* Remember this record as the last-applied one */
 				LastRec = ReadRecPtr;
 
@@ -11637,3 +11646,17 @@ SetWalWriterSleeping(bool sleeping)
 	XLogCtl->WalWriterSleeping = sleeping;
 	SpinLockRelease(&XLogCtl->info_lck);
 }
+
+/*
+ * Update the flag to indicate that a commit record has been applied, and
+ * return the previous value.
+ */
+bool
+XLogAppliedSynchronousCommit(bool value)
+{
+	static bool last_value = false;
+	bool result = last_value;
+
+	last_value = value;
+	return result;
+}
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 325239d..2e18768 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -462,6 +462,11 @@ SyncRepReleaseWaiters(void)
 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
 		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
 	}
+	if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+	{
+		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+	}
 
 	LWLockRelease(SyncRepLock);
 
@@ -728,6 +733,9 @@ assign_synchronous_commit(int newval, void *extra)
 		case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
 			SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
 			break;
+		case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+			SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+			break;
 		default:
 			SyncRepWaitMode = SYNC_REP_NO_WAIT;
 			break;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 41e57f2..fde7d97 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -96,6 +96,7 @@ static uint32 recvOff = 0;
  */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t got_SIGUSR1 = false;
 
 /*
  * LogstreamResult indicates the byte positions that we have already
@@ -138,7 +139,7 @@ static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
+static void XLogWalRcvSendReply(bool force, bool requestReply, XLogRecPtr applyLsn);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 
@@ -182,6 +183,24 @@ DisableWalRcvImmediateExit(void)
 	ProcessWalRcvInterrupts();
 }
 
+/*
+ * Check if the applied LSN has moved.  If it has, send a message to the
+ * primary server.  This is called by the walreceiver when it receives a
+ * signal from the startup process.
+ */
+static void
+SendApplyNotificationIfMoved(XLogRecPtr *last_sent_applied_lsn)
+{
+	XLogRecPtr applied_lsn;
+
+	applied_lsn = GetXLogReplayRecPtr(NULL);
+	if (applied_lsn != *last_sent_applied_lsn)
+	{
+		XLogWalRcvSendReply(true, true, applied_lsn);
+		*last_sent_applied_lsn = applied_lsn;
+	}
+}
+
 /* Main entry point for walreceiver process */
 void
 WalReceiverMain(void)
@@ -350,6 +369,7 @@ WalReceiverMain(void)
 								  slotname[0] != '\0' ? slotname : NULL))
 		{
 			bool		endofwal = false;
+			XLogRecPtr	last_sent_applied_lsn = InvalidXLogRecPtr;
 
 			if (first_stream)
 				ereport(LOG,
@@ -412,7 +432,7 @@ WalReceiverMain(void)
 					 * Process the received data, and any subsequent data we
 					 * can read without blocking.
 					 */
-					for (;;)
+					while (!got_SIGUSR1)
 					{
 						if (len > 0)
 						{
@@ -439,8 +459,15 @@ WalReceiverMain(void)
 						len = walrcv_receive(0, &buf);
 					}
 
+					/* Check if the startup process has signaled us. */
+					if (got_SIGUSR1)
+					{
+						got_SIGUSR1 = false;
+						SendApplyNotificationIfMoved(&last_sent_applied_lsn);
+					}
+
 					/* Let the master know that we received some data. */
-					XLogWalRcvSendReply(false, false);
+					XLogWalRcvSendReply(false, false, InvalidXLogRecPtr);
 
 					/*
 					 * If we've written some records, flush them to disk and
@@ -495,7 +522,14 @@ WalReceiverMain(void)
 						}
 					}
 
-					XLogWalRcvSendReply(requestReply, requestReply);
+					/* Check if the startup process has signaled us. */
+					if (got_SIGUSR1)
+					{
+						got_SIGUSR1 = false;
+						SendApplyNotificationIfMoved(&last_sent_applied_lsn);
+					}
+
+					XLogWalRcvSendReply(requestReply, requestReply, InvalidXLogRecPtr);
 					XLogWalRcvSendHSFeedback(false);
 				}
 			}
@@ -734,6 +768,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS)
 {
 	int			save_errno = errno;
 
+	got_SIGUSR1 = true;
 	latch_sigusr1_handler();
 
 	errno = save_errno;
@@ -846,7 +881,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
-					XLogWalRcvSendReply(true, false);
+					XLogWalRcvSendReply(true, false, InvalidXLogRecPtr);
 				break;
 			}
 		default:
@@ -1010,7 +1045,7 @@ XLogWalRcvFlush(bool dying)
 		/* Also let the master know that we made some progress */
 		if (!dying)
 		{
-			XLogWalRcvSendReply(false, false);
+			XLogWalRcvSendReply(false, false, InvalidXLogRecPtr);
 			XLogWalRcvSendHSFeedback(false);
 		}
 	}
@@ -1028,9 +1063,12 @@ XLogWalRcvFlush(bool dying)
  * If 'requestReply' is true, requests the server to reply immediately upon
  * receiving this message. This is used for heartbearts, when approaching
  * wal_receiver_timeout.
+ *
+ * If 'apply_lsn' is InvalidXLogRecPtr, the apply LSN is looked up in shmem if
+ * it is needed.  Otherwise, the value provided is used.
  */
 static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(bool force, bool requestReply, XLogRecPtr apply_lsn)
 {
 	static XLogRecPtr writePtr = 0;
 	static XLogRecPtr flushPtr = 0;
@@ -1221,3 +1259,19 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 		pfree(receipttime);
 	}
 }
+
+/*
+ * Wake up the walreceiver if it happens to be blocked in walrcv_receive,
+ * and tell it that a commit record has been applied.
+ *
+ * This is called by the startup process whenever interesting xlog records
+ * are applied, so that walreceiver can check if it needs to send an apply
+ * notification back to the master which may be waiting in a COMMIT with
+ * synchronous_commit = apply.
+ */
+void
+WalRcvWakeup(void)
+{
+	if (WalRcv->pid != 0)
+		kill(WalRcv->pid, SIGUSR1);
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 8ebf424..39f14fc 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -351,6 +351,7 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
 static const struct config_enum_entry synchronous_commit_options[] = {
 	{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
 	{"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
+	{"apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false},
 	{"on", SYNCHRONOUS_COMMIT_ON, false},
 	{"off", SYNCHRONOUS_COMMIT_OFF, false},
 	{"true", SYNCHRONOUS_COMMIT_ON, true},
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index cb1c2db..7fec300 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -60,7 +60,9 @@ typedef enum
 	SYNCHRONOUS_COMMIT_LOCAL_FLUSH,		/* wait for local flush only */
 	SYNCHRONOUS_COMMIT_REMOTE_WRITE,	/* wait for local flush and remote
 										 * write */
-	SYNCHRONOUS_COMMIT_REMOTE_FLUSH		/* wait for local and remote flush */
+	SYNCHRONOUS_COMMIT_REMOTE_FLUSH,	/* wait for local and remote flush */
+	SYNCHRONOUS_COMMIT_REMOTE_APPLY		/* wait for local flush and remote
+										 * apply */
 }	SyncCommitLevel;
 
 /* Define the default setting for synchonous_commit */
@@ -144,10 +146,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
  * EOXact... routines which run at the end of the original transaction
  * completion.
  */
+#define XACT_COMPLETION_SYNC_APPLY_FEEDBACK		(1U << 29)
 #define XACT_COMPLETION_UPDATE_RELCACHE_FILE	(1U << 30)
 #define XACT_COMPLETION_FORCE_SYNC_COMMIT		(1U << 31)
 
 /* Access macros for above flags */
+#define XactCompletionSyncApplyFeedback(xinfo) \
+	(!!(xinfo & XACT_COMPLETION_SYNC_APPLY_FEEDBACK))
 #define XactCompletionRelcacheInitFileInval(xinfo) \
 	(!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE))
 #define XactCompletionForceSyncCommit(xinfo) \
@@ -368,6 +373,8 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   TransactionId twophase_xid);
 extern void xact_redo(XLogReaderState *record);
 
+extern bool XactSyncApplyFeedbackRequested(XLogReaderState *record);
+
 /* xactdesc.c */
 extern void xact_desc(StringInfo buf, XLogReaderState *record);
 extern const char *xact_identify(uint8 info);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 790ca66..93efa29 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -266,6 +266,7 @@ extern void RemovePromoteSignalFiles(void);
 extern bool CheckPromoteSignal(void);
 extern void WakeupRecovery(void);
 extern void SetWalWriterSleeping(bool sleeping);
+extern bool XLogAppliedSynchronousCommit(bool value);
 
 extern void assign_max_wal_size(int newval, void *extra);
 extern void assign_checkpoint_completion_target(double newval, void *extra);
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 71e2857..8e0fe00 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -23,8 +23,9 @@
 #define SYNC_REP_NO_WAIT		-1
 #define SYNC_REP_WAIT_WRITE		0
 #define SYNC_REP_WAIT_FLUSH		1
+#define SYNC_REP_WAIT_APPLY		2
 
-#define NUM_SYNC_REP_WAIT_MODE	2
+#define NUM_SYNC_REP_WAIT_MODE	3
 
 /* syncRepState */
 #define SYNC_REP_NOT_WAITING		0
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 61255a9..3256ed3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -160,5 +160,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
+extern void WalRcvWakeup(void);
 
 #endif   /* _WALRECEIVER_H */
#7Simon Riggs
simon@2ndQuadrant.com
In reply to: Thomas Munro (#1)
Re: synchronous_commit = apply

On 1 September 2015 at 20:25, Thomas Munro <thomas.munro@enterprisedb.com>
wrote:

Do you think it's reasonable to want to COMMIT a particular transaction on
a master node, and then immediately run a read-only query on a hot standby
node that is guaranteed to see that transaction?

Yes, that is reasonable and we've been discussing it for a few years now.

A friend of mine who works with a different RDBMS technology that can do
that asked me how to achieve this with Postgres, and I suggested waiting
for the standby's pg_last_xlog_replay_location() to be >= the master's
pg_current_xlog_location() after COMMIT, which might involve some looping
and sleeping.

As a quick weekend learning exercise/hack I recently went looking into how
we could support $SUBJECT. I discovered we already report the apply
progress back to the master, and the synchronous waiting facility seemed to
be all ready to support this. In fact it seemed a little too easy so
something tells me it must be wrong! But anyway, please see the attached
toy POC patch which does that.

As you say, that is the easy part.

The next problem is that the master can be waiting quite a long time for a
reply from the remote walreceiver containing the desired apply LSN: in the
best case it learns of apply progress from replies to subsequent unrelated
records (which might be very soon on a busy system but still involves
waiting for the next transaction's WAL flush), and in the worst case it
needs to wait for wal_receiver_status_interval (10 seconds by default),
which makes for a long COMMIT delay. I was thinking that the solution to
that may be to teach StartupLOG to signal the walreceiver after it updates
XLogCtl->lastReplayedEndRecPtr, which should cause walrcv_receive to be
interrupted and return early, and then walreceiver could send a reply if it
sees that lastReplayedEndRecPtr has moved. Maybe that would generate an
unacceptably high frequency of signals, and maybe there is a better form of
IPC for this. Without introducing any new IPC, the walreceiver could
instead simply report apply progress to the master whenever it sees that
the apply LSN has moved after its regular NAPTIME_PER_CYCLE wait (100ms),
but that would still introduces bogus latency. A quick and dirty way to
see that on top of the attached patch is to set requestReply = true in
WalReceiverMain to force a send after every nap.

This problem is exactly why I wrote my recent patch to make WALWriter work
in recovery.

Currently, the WALReceiver issues regular fsyncs that prevent it from
replying in time. Also, the WALReceiver waits on incoming data only, so we
can't (yet) set a latch when the Startup process has applied some records.

I've solved the first problem and know how to solve the second, just
haven't coded it yet. I was expecting to do that for CF3 or CF4.

I don't think we should be using signals, nor would I expect them to work
effectively while in an fsync.

I can see that using synchronous_commit = apply in the practice might
prove difficult: how does a client know which node is the synchronous
standby? Perhaps those sorts of practical problems are the reason no one
has done or wanted this.

It means we need quorum sync rep as well, to make this useful in practice
without sacrificing HA.

Bringing my patch and Beena's patch together will solve this for us in 9.6

So yes, 1) we have thought of it and want it, 2) the basic patch is
trivial, 3) but it isn't the main problem.

--
Simon Riggs http://www.2ndQuadrant.com/
<http://www.2ndquadrant.com/&gt;
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#8Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Simon Riggs (#7)
Re: synchronous_commit = apply

On Thu, Sep 17, 2015 at 12:50 AM, Simon Riggs <simon@2ndquadrant.com> wrote:

On 1 September 2015 at 20:25, Thomas Munro <thomas.munro@enterprisedb.com>
wrote:

The next problem is that the master can be waiting quite a long time for a
reply from the remote walreceiver containing the desired apply LSN: in the
best case it learns of apply progress from replies to subsequent unrelated
records (which might be very soon on a busy system but still involves
waiting for the next transaction's WAL flush), and in the worst case it
needs to wait for wal_receiver_status_interval (10 seconds by default),
which makes for a long COMMIT delay. I was thinking that the solution to
that may be to teach StartupLOG to signal the walreceiver after it updates
XLogCtl->lastReplayedEndRecPtr, which should cause walrcv_receive to be
interrupted and return early, and then walreceiver could send a reply if it
sees that lastReplayedEndRecPtr has moved. Maybe that would generate an
unacceptably high frequency of signals, and maybe there is a better form of
IPC for this. Without introducing any new IPC, the walreceiver could
instead simply report apply progress to the master whenever it sees that the
apply LSN has moved after its regular NAPTIME_PER_CYCLE wait (100ms), but
that would still introduces bogus latency. A quick and dirty way to see
that on top of the attached patch is to set requestReply = true in
WalReceiverMain to force a send after every nap.

This problem is exactly why I wrote my recent patch to make WALWriter work
in recovery.

Currently, the WALReceiver issues regular fsyncs that prevent it from
replying in time. Also, the WALReceiver waits on incoming data only, so we
can't (yet) set a latch when the Startup process has applied some records.

I've solved the first problem and know how to solve the second, just haven't
coded it yet. I was expecting to do that for CF3 or CF4.

I don't think we should be using signals, nor would I expect them to work
effectively while in an fsync.

That sounds much better. I had noticed that with my patch the
walreceiver loop was basically trying to do far too much. I was
contemplating investigating a pipe for IPC, so that it could
select/poll on both the socket connected to master + the new apply
feedback pipe, rather that using raw signals (directly or via latches)
and interrupting syscalls.

I can see that using synchronous_commit = apply in the practice might
prove difficult: how does a client know which node is the synchronous
standby? Perhaps those sorts of practical problems are the reason no one
has done or wanted this.

It means we need quorum sync rep as well, to make this useful in practice
without sacrificing HA.

Bringing my patch and Beena's patch together will solve this for us in 9.6

I've been looking at that patch. It makes sense for adding redundancy
in synchronous_commit = on mode (waiting for WAL flush but not apply).
But it strikes me that to make multi-server synchronous_commit = apply
really useful, it is not enough to wait for a quorum of any N servers
in a group to reply, because a client connected to a given standby
doesn't know whether that standby was one of the N and therefore
whether it is guaranteed to see the effects of a committed transaction
that it has heard about. Do you have a plan that could address that?

I have been working on a proposal that adds support for reliable
"causal" and "ready-your-writes" consistency, while still allowing for
some number of standbys to fail/fall behind without blocking all
transactions forever. After a COMMIT with synchronous_commit = apply
returns successfully, you can run a query on any standby node, or tell
another process to run a query on any standby node, and it is
guaranteed to either see the committed transaction or receive a new
error "standby not synchronized". This behaviour is activated by also
setting synchronous_commit = apply on the standby, and works by adding
some two-way timeout logic. I will have more to say about this soon
(I have some other work to get out of the way first).

I will not be at all surprised to hear that you already have this
covered and are 18 steps ahead of me!

So yes, 1) we have thought of it and want it, 2) the basic patch is trivial,
3) but it isn't the main problem.

Agreed. I had a go at this because I needed the trivial plumbing in
so I could work on the more difficult problem above, and I didn't know
you had it in the pipeline already. I'm glad to hear that you do, and
that you have solved the problem of the interleaving of operations in
walreceiver, and I will be following along with interest.

--
Thomas Munro
http://www.enterprisedb.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#9Kyotaro HORIGUCHI
horiguchi.kyotaro@lab.ntt.co.jp
In reply to: Thomas Munro (#6)
Re: synchronous_commit = apply

Hello, I have some random comments.

At Wed, 16 Sep 2015 23:07:03 +1200, Thomas Munro <thomas.munro@enterprisedb.com> wrote in <CAEepm=2_dDqQxgGc83_a48rYza3T4P4vPTpSC6xkHcMEoGyspw@mail.gmail.com>

On Tue, Sep 8, 2015 at 1:11 AM, Thomas Munro <thomas.munro@enterprisedb.com>
wrote:

On Thu, Sep 3, 2015 at 12:02 AM, Fujii Masao <masao.fujii@gmail.com>
wrote:
Hmm. So maybe commit records could have a flag saying 'someone is waiting
for this to commit to apply', and the startup process's apply loop would
only bother to signal the walreceiver if it sees that flag. I will try
that.

Here is a version that does that, using a bit in xinfo to request apply
feedback from standbys when running with synchronous_commit = apply.

The paramter apply_lsn of XLogWalRcvSendReply seems not used in
the function. Maybe

- applyPtr = GetXLogReplayRecPtr(NULL);
+ applyPtr = apply_lsn != InvalidXLogRecPtr ?
+                 apply_lsn : GetXLogReplayRecPtr(NULL);

However, walreceiver already sends feedback containing apply lsn
always so I think it is useless if walreceiver is woke up after
the commit record is applied.

I am not very happy with the way that xact_redo communicates with the main
apply loop when it sees that bit, through calls to
XLogAppliedSynchronousCommit (essentially a global variable), but I
couldn't immediately see a better way to get information out of xact_redo
into the apply loop without changing the rm_redo interface. Perhaps xinfo
is the wrong place for that information. Thoughts?

I think it is better to avoid xact_redo_commit to be involved in
the standby side mechanism.

walreceiver don't seem to be the place to read XLogRecord.
StartXOG already parses records in recoveryStopsBefore/After. So
we can do the following thing in place of
XLogAppliedSynchronousCommit() if additional parsing of xlog
records in redo loop is acceptable.

XLogImmediatFeedbackAppliedLSN(XLogReaderState *record)
{
if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
if (xact_info != XLOG_XACT_COMMIT &&
xact_info != XLOG_XACT_COMMIT_PREPARED)
return false;
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
xl_xact_parsed_commit parsed;
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
if (! (parsed->xinfo.xinfo & XACT_XINFO_NEED_APPLY_FEEDBACK))
return false;

WalRcvWakeup();
}

In WalRcvMain, there's a bit too many if(got_SIGUSR1)'s in the
main loop. And the current patch seems to simply double the
walreceiver reply when got_SIGUSR1.

I found one trival mistake,

--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -462,6 +462,11 @@ SyncRepReleaseWaiters(void)
         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     }
+    if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+    {
+        walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+        numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+    }

This overwrites numflush by the value which is to be numapply. So
the following DEBUG3 message will be wrong.

elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);

regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#10Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Kyotaro HORIGUCHI (#9)
1 attachment(s)
Re: synchronous_commit = apply

On Fri, Sep 18, 2015 at 7:06 PM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:

Hello, I have some random comments.

Thanks for the feedback! I have fixed several of the things that you
found in the attached new version -- see comments inline below.
However, I now know that Simon has a better patch in development to do
this, so I won't be developing this further. (Until that work is
available, this patch is temporarily useful as a prerequisite for
something else that I'm working on so I'll still be using it...)

At Wed, 16 Sep 2015 23:07:03 +1200, Thomas Munro <thomas.munro@enterprisedb.com> wrote in <CAEepm=2_dDqQxgGc83_a48rYza3T4P4vPTpSC6xkHcMEoGyspw@mail.gmail.com>

On Tue, Sep 8, 2015 at 1:11 AM, Thomas Munro <thomas.munro@enterprisedb.com>
wrote:

On Thu, Sep 3, 2015 at 12:02 AM, Fujii Masao <masao.fujii@gmail.com>
wrote:
Hmm. So maybe commit records could have a flag saying 'someone is waiting
for this to commit to apply', and the startup process's apply loop would
only bother to signal the walreceiver if it sees that flag. I will try
that.

Here is a version that does that, using a bit in xinfo to request apply
feedback from standbys when running with synchronous_commit = apply.

The paramter apply_lsn of XLogWalRcvSendReply seems not used in
the function. Maybe

- applyPtr = GetXLogReplayRecPtr(NULL);
+ applyPtr = apply_lsn != InvalidXLogRecPtr ?
+                 apply_lsn : GetXLogReplayRecPtr(NULL);

You're right, that is what I meant to do. Fixed.

However, walreceiver already sends feedback containing apply lsn
always so I think it is useless if walreceiver is woke up after
the commit record is applied.

No, XLogWalRcvSendReply only sends feedback sometimes (see the
conditional early returns).

I am not very happy with the way that xact_redo communicates with the main
apply loop when it sees that bit, through calls to
XLogAppliedSynchronousCommit (essentially a global variable), but I
couldn't immediately see a better way to get information out of xact_redo
into the apply loop without changing the rm_redo interface. Perhaps xinfo
is the wrong place for that information. Thoughts?

I think it is better to avoid xact_redo_commit to be involved in
the standby side mechanism.

I agree that this doesn't seem quite right...

walreceiver don't seem to be the place to read XLogRecord.
StartXOG already parses records in recoveryStopsBefore/After. So
we can do the following thing in place of
XLogAppliedSynchronousCommit() if additional parsing of xlog
records in redo loop is acceptable.

XLogImmediatFeedbackAppliedLSN(XLogReaderState *record)
{
if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
if (xact_info != XLOG_XACT_COMMIT &&
xact_info != XLOG_XACT_COMMIT_PREPARED)
return false;
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
xl_xact_parsed_commit parsed;
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
if (! (parsed->xinfo.xinfo & XACT_XINFO_NEED_APPLY_FEEDBACK))
return false;

WalRcvWakeup();
}

... but I don't think it's a good idea to parse every commit record
twice. Maybe there could be an XactGetXinfo function which just takes
reads the xinfo field from the front.

In WalRcvMain, there's a bit too many if(got_SIGUSR1)'s in the
main loop.

I agree that this control flow is not ideal, but I won't try to
improve that now that I know that Simon has a patch that doesn't use
signals for this and probably rearranges this loop considerably.

And the current patch seems to simply double the
walreceiver reply when got_SIGUSR1.

I don't think so -- the pre-existing call to XLogWalRcvSendReply
doesn't send anything unless certain conditions are met. You can see
this by testing the first version of the patch I posted in this
thread, which didn't do any of this SIGUSR1 stuff -- in that version,
the test program "test-sync-apply --level apply --loops 5" had to wait
~10 seconds for every commit.

I found one trival mistake,

--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -462,6 +462,11 @@ SyncRepReleaseWaiters(void)
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
+    if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+    {
+        walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+        numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+    }

This overwrites numflush by the value which is to be numapply. So
the following DEBUG3 message will be wrong.

Oops, right. Fixed.

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

synchronous-commit-apply-v4.patchapplication/octet-stream; name=synchronous-commit-apply-v4.patchDownload
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 56a1cb4..2f13b63 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5099,6 +5099,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
 
 	/*
+	 * Check if the caller would like to ask standbys for immediate feedback
+	 * once this commit is applied.
+	 */
+	if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
+		xl_xinfo.xinfo |= XACT_COMPLETION_SYNC_APPLY_FEEDBACK;
+
+	/*
 	 * Relcache invalidations requires information about the current database
 	 * and so does logical decoding.
 	 */
@@ -5435,6 +5442,12 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 	if (XactCompletionForceSyncCommit(parsed->xinfo))
 		XLogFlush(lsn);
 
+	/*
+	 * This commit record has someone waiting for apply feedback, so we tell
+	 * the xlog apply loop about that so it can generate a reply.
+	 */
+	if (XactCompletionSyncApplyFeedback(parsed->xinfo))
+		XLogAppliedSynchronousCommit(true);
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a092aad..373204a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6871,6 +6871,15 @@ StartupXLOG(void)
 				XLogCtl->lastReplayedTLI = ThisTimeLineID;
 				SpinLockRelease(&XLogCtl->info_lck);
 
+				/*
+				 * If rm_redo reported that it applied a commit record that
+				 * the master is waiting for, then we wake up the receiver so
+				 * that it notices the updated lastReplayedEndRecPtr and sends
+				 * a reply to the master.
+				 */
+				if (XLogAppliedSynchronousCommit(false))
+					WalRcvWakeup();
+
 				/* Remember this record as the last-applied one */
 				LastRec = ReadRecPtr;
 
@@ -11637,3 +11646,17 @@ SetWalWriterSleeping(bool sleeping)
 	XLogCtl->WalWriterSleeping = sleeping;
 	SpinLockRelease(&XLogCtl->info_lck);
 }
+
+/*
+ * Update the flag to indicate that a commit record has been applied, and
+ * return the previous value.
+ */
+bool
+XLogAppliedSynchronousCommit(bool value)
+{
+	static bool last_value = false;
+	bool result = last_value;
+
+	last_value = value;
+	return result;
+}
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 325239d..4524dcb 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -416,6 +416,7 @@ SyncRepReleaseWaiters(void)
 	WalSnd	   *syncWalSnd;
 	int			numwrite = 0;
 	int			numflush = 0;
+	int			numapply = 0;
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
@@ -462,12 +463,18 @@ SyncRepReleaseWaiters(void)
 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
 		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
 	}
+	if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+	{
+		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+		numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+	}
 
 	LWLockRelease(SyncRepLock);
 
-	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
-	   numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+		 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
+		 numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);
 
 	/*
 	 * If we are managing the highest priority standby, though we weren't
@@ -728,6 +735,9 @@ assign_synchronous_commit(int newval, void *extra)
 		case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
 			SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
 			break;
+		case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+			SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+			break;
 		default:
 			SyncRepWaitMode = SYNC_REP_NO_WAIT;
 			break;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 41e57f2..7d3acac 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -96,6 +96,7 @@ static uint32 recvOff = 0;
  */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t got_SIGUSR1 = false;
 
 /*
  * LogstreamResult indicates the byte positions that we have already
@@ -138,7 +139,7 @@ static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
+static void XLogWalRcvSendReply(bool force, bool requestReply, XLogRecPtr applyLsn);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 
@@ -182,6 +183,24 @@ DisableWalRcvImmediateExit(void)
 	ProcessWalRcvInterrupts();
 }
 
+/*
+ * Check if the applied LSN has moved.  If it has, send a message to the
+ * primary server.  This is called by the walreceiver when it receives a
+ * signal from the startup process.
+ */
+static void
+SendApplyNotificationIfMoved(XLogRecPtr *last_sent_applied_lsn)
+{
+	XLogRecPtr applied_lsn;
+
+	applied_lsn = GetXLogReplayRecPtr(NULL);
+	if (applied_lsn != *last_sent_applied_lsn)
+	{
+		XLogWalRcvSendReply(true, true, applied_lsn);
+		*last_sent_applied_lsn = applied_lsn;
+	}
+}
+
 /* Main entry point for walreceiver process */
 void
 WalReceiverMain(void)
@@ -350,6 +369,7 @@ WalReceiverMain(void)
 								  slotname[0] != '\0' ? slotname : NULL))
 		{
 			bool		endofwal = false;
+			XLogRecPtr	last_sent_applied_lsn = InvalidXLogRecPtr;
 
 			if (first_stream)
 				ereport(LOG,
@@ -412,7 +432,7 @@ WalReceiverMain(void)
 					 * Process the received data, and any subsequent data we
 					 * can read without blocking.
 					 */
-					for (;;)
+					while (!got_SIGUSR1)
 					{
 						if (len > 0)
 						{
@@ -439,8 +459,15 @@ WalReceiverMain(void)
 						len = walrcv_receive(0, &buf);
 					}
 
+					/* Check if the startup process has signaled us. */
+					if (got_SIGUSR1)
+					{
+						got_SIGUSR1 = false;
+						SendApplyNotificationIfMoved(&last_sent_applied_lsn);
+					}
+
 					/* Let the master know that we received some data. */
-					XLogWalRcvSendReply(false, false);
+					XLogWalRcvSendReply(false, false, InvalidXLogRecPtr);
 
 					/*
 					 * If we've written some records, flush them to disk and
@@ -495,7 +522,14 @@ WalReceiverMain(void)
 						}
 					}
 
-					XLogWalRcvSendReply(requestReply, requestReply);
+					/* Check if the startup process has signaled us. */
+					if (got_SIGUSR1)
+					{
+						got_SIGUSR1 = false;
+						SendApplyNotificationIfMoved(&last_sent_applied_lsn);
+					}
+
+					XLogWalRcvSendReply(requestReply, requestReply, InvalidXLogRecPtr);
 					XLogWalRcvSendHSFeedback(false);
 				}
 			}
@@ -734,6 +768,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS)
 {
 	int			save_errno = errno;
 
+	got_SIGUSR1 = true;
 	latch_sigusr1_handler();
 
 	errno = save_errno;
@@ -846,7 +881,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
-					XLogWalRcvSendReply(true, false);
+					XLogWalRcvSendReply(true, false, InvalidXLogRecPtr);
 				break;
 			}
 		default:
@@ -1010,7 +1045,7 @@ XLogWalRcvFlush(bool dying)
 		/* Also let the master know that we made some progress */
 		if (!dying)
 		{
-			XLogWalRcvSendReply(false, false);
+			XLogWalRcvSendReply(false, false, InvalidXLogRecPtr);
 			XLogWalRcvSendHSFeedback(false);
 		}
 	}
@@ -1028,9 +1063,12 @@ XLogWalRcvFlush(bool dying)
  * If 'requestReply' is true, requests the server to reply immediately upon
  * receiving this message. This is used for heartbearts, when approaching
  * wal_receiver_timeout.
+ *
+ * If 'apply_lsn' is InvalidXLogRecPtr, the apply LSN is looked up in shmem if
+ * it is needed.  Otherwise, the value provided is used.
  */
 static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(bool force, bool requestReply, XLogRecPtr apply_lsn)
 {
 	static XLogRecPtr writePtr = 0;
 	static XLogRecPtr flushPtr = 0;
@@ -1068,7 +1106,8 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	/* Construct a new message */
 	writePtr = LogstreamResult.Write;
 	flushPtr = LogstreamResult.Flush;
-	applyPtr = GetXLogReplayRecPtr(NULL);
+	applyPtr = apply_lsn != InvalidXLogRecPtr ?
+		apply_lsn : GetXLogReplayRecPtr(NULL);
 
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'r');
@@ -1221,3 +1260,19 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 		pfree(receipttime);
 	}
 }
+
+/*
+ * Wake up the walreceiver if it happens to be blocked in walrcv_receive,
+ * and tell it that a commit record has been applied.
+ *
+ * This is called by the startup process whenever interesting xlog records
+ * are applied, so that walreceiver can check if it needs to send an apply
+ * notification back to the master which may be waiting in a COMMIT with
+ * synchronous_commit = apply.
+ */
+void
+WalRcvWakeup(void)
+{
+	if (WalRcv->pid != 0)
+		kill(WalRcv->pid, SIGUSR1);
+}
diff --git a/src/backend/utils/adt/tsvector_op.c b/src/backend/utils/adt/tsvector_op.c
index 05c23da..e822ba8 100644
--- a/src/backend/utils/adt/tsvector_op.c
+++ b/src/backend/utils/adt/tsvector_op.c
@@ -21,6 +21,7 @@
 #include "funcapi.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "parser/parse_coerce.h"
 #include "tsearch/ts_utils.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 8ebf424..39f14fc 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -351,6 +351,7 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
 static const struct config_enum_entry synchronous_commit_options[] = {
 	{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
 	{"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
+	{"apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false},
 	{"on", SYNCHRONOUS_COMMIT_ON, false},
 	{"off", SYNCHRONOUS_COMMIT_OFF, false},
 	{"true", SYNCHRONOUS_COMMIT_ON, true},
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index cb1c2db..bfb8fa2 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -60,7 +60,9 @@ typedef enum
 	SYNCHRONOUS_COMMIT_LOCAL_FLUSH,		/* wait for local flush only */
 	SYNCHRONOUS_COMMIT_REMOTE_WRITE,	/* wait for local flush and remote
 										 * write */
-	SYNCHRONOUS_COMMIT_REMOTE_FLUSH		/* wait for local and remote flush */
+	SYNCHRONOUS_COMMIT_REMOTE_FLUSH,	/* wait for local and remote flush */
+	SYNCHRONOUS_COMMIT_REMOTE_APPLY		/* wait for local flush and remote
+										 * apply */
 }	SyncCommitLevel;
 
 /* Define the default setting for synchonous_commit */
@@ -144,10 +146,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
  * EOXact... routines which run at the end of the original transaction
  * completion.
  */
+#define XACT_COMPLETION_SYNC_APPLY_FEEDBACK		(1U << 29)
 #define XACT_COMPLETION_UPDATE_RELCACHE_FILE	(1U << 30)
 #define XACT_COMPLETION_FORCE_SYNC_COMMIT		(1U << 31)
 
 /* Access macros for above flags */
+#define XactCompletionSyncApplyFeedback(xinfo) \
+	(!!(xinfo & XACT_COMPLETION_SYNC_APPLY_FEEDBACK))
 #define XactCompletionRelcacheInitFileInval(xinfo) \
 	(!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE))
 #define XactCompletionForceSyncCommit(xinfo) \
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 790ca66..93efa29 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -266,6 +266,7 @@ extern void RemovePromoteSignalFiles(void);
 extern bool CheckPromoteSignal(void);
 extern void WakeupRecovery(void);
 extern void SetWalWriterSleeping(bool sleeping);
+extern bool XLogAppliedSynchronousCommit(bool value);
 
 extern void assign_max_wal_size(int newval, void *extra);
 extern void assign_checkpoint_completion_target(double newval, void *extra);
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 71e2857..8e0fe00 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -23,8 +23,9 @@
 #define SYNC_REP_NO_WAIT		-1
 #define SYNC_REP_WAIT_WRITE		0
 #define SYNC_REP_WAIT_FLUSH		1
+#define SYNC_REP_WAIT_APPLY		2
 
-#define NUM_SYNC_REP_WAIT_MODE	2
+#define NUM_SYNC_REP_WAIT_MODE	3
 
 /* syncRepState */
 #define SYNC_REP_NOT_WAITING		0
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 61255a9..3256ed3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -160,5 +160,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
+extern void WalRcvWakeup(void);
 
 #endif   /* _WALRECEIVER_H */