Time based lag tracking for logical replication

Started by Petr Jelinek over 8 years ago, 18 messages
#1 Petr Jelinek
petr.jelinek@2ndquadrant.com
1 attachment(s)

Hi,

The time-based lag tracking commit [1] added an interface for logging
the progress of replication so that we can report lag as a time interval
instead of just bytes. But that commit didn't include support for the
built-in logical replication.

So I wrote something that implements this. I didn't much like the API
layering of exporting the walsender's LagTrackerWrite() for direct use
by the plugin. Normally an output plugin does not have to care whether
it's running under walsender or not; it uses an abstracted write
interface, which can be implemented in various ways (that's how we
implement the SQL interface to logical decoding, after all). So I decided
to add another function to the logical decoding write API, called
update_progress, and call that one from the output plugin. The walsender
then implements that new API to call LagTrackerWrite(), while the SQL
interface just does not implement it at all. This seems like a cleaner
way of doing it.

Thoughts?
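
To illustrate the layering described above, here is a minimal standalone
sketch in C. It is not the attached patch: the types are simplified
stand-ins for the PostgreSQL ones, and the names DecodingContext,
UpdateProgressCB, walsender_update_progress and plugin_update_progress
are made up for this example. It only shows the general idea of an
optional callback that one caller fills in and another leaves NULL.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins for the PostgreSQL types (assumptions). */
typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

struct DecodingContext;

/* Optional progress callback, part of the "write API" of the context. */
typedef void (*UpdateProgressCB) (struct DecodingContext *ctx,
								  XLogRecPtr lsn, TransactionId xid);

typedef struct DecodingContext
{
	UpdateProgressCB update_progress;	/* may be NULL */
	XLogRecPtr	write_location;
	TransactionId write_xid;
} DecodingContext;

/* Hypothetical bookkeeping so the effect of the callback is observable. */
XLogRecPtr	last_tracked_lsn = 0;
int			tracked_calls = 0;

/* Walsender-style implementation: remember the reported position. */
void
walsender_update_progress(DecodingContext *ctx, XLogRecPtr lsn,
						  TransactionId xid)
{
	(void) ctx;
	(void) xid;
	last_tracked_lsn = lsn;
	tracked_calls++;
}

/*
 * What an output plugin would call. It does nothing when the caller that
 * created the context did not supply a callback (the SQL-interface case).
 */
void
plugin_update_progress(DecodingContext *ctx)
{
	assert(ctx != NULL);

	if (ctx->update_progress == NULL)
		return;

	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
}
```

With this shape the plugin calls plugin_update_progress() the same way in
both cases and never needs to know which caller it is running under.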

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

Attachments:

Add-support-for-time-based-lag-tracking-to-logical-r.patch (binary/octet-stream)
From cd99766c2090b2090cab27b3c99bad0fad3e0bbe Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Sun, 23 Apr 2017 01:00:25 +0200
Subject: [PATCH] Add support for time based lag tracking to logical
 replication

This patch adds a new write API interface to LogicalDecodingContext called
update_progress, and a wrapper around it called OutputPluginUpdateProgress,
for output plugins to use for progress reporting. This new interface is
optional: walsender uses it to do the time-based lag tracking, while the
SQL interface does not implement it.
---
 src/backend/replication/logical/logical.c      | 34 ++++++++++++++++++++------
 src/backend/replication/logical/logicalfuncs.c |  2 +-
 src/backend/replication/pgoutput/pgoutput.c    |  2 ++
 src/backend/replication/slotfuncs.c            |  3 ++-
 src/backend/replication/walsender.c            | 32 +++++++++++++++++-------
 src/include/replication/logical.h              | 15 +++++++++---
 src/include/replication/output_plugin.h        |  1 +
 7 files changed, 66 insertions(+), 23 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 79c1dd7..cba11da 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   bool need_full_snapshot,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
+	ctx->update_progress = update_progress;
 
 	ctx->output_plugin_options = output_plugin_options;
 
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
  *
  * plugin contains the name of the output plugin
  * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- *		perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ *  	callbacks that have to be filled to perform the use-case dependent,
+ *  	actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
  * as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -298,7 +302,7 @@ CreateInitDecodingContext(char *plugin,
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, read_page, prepare_write,
-								 do_write);
+								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -322,7 +326,7 @@ CreateInitDecodingContext(char *plugin,
  * output_plugin_options
  *		contains options passed to the output plugin.
  *
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -338,7 +342,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -388,7 +393,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -521,6 +527,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
+ * Update progress tracking (if supported).
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+	if (!ctx->update_progress)
+		return;
+
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
+/*
  * Load the output plugin, lookup its output plugin init function, and check
  * that it provides the required callbacks.
  */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index c251b92..27164de 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 2cbd9ce..93615a4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -244,6 +244,8 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
+	OutputPluginUpdateProgress(ctx);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 9775735..22d2644 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(
 									NameStr(*plugin), NIL, false,
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL,
+									NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 78369ae..67fd332 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -239,7 +239,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
 static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
@@ -914,7 +916,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										WalSndUpdateProgress);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1068,10 +1071,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Initialize position to the last ack'ed one, then the xlog records begin
 	 * to be shipped from that position.
 	 */
-	logical_decoding_ctx = CreateDecodingContext(
-											   cmd->startpoint, cmd->options,
+	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+												 WalSndPrepareWrite,
+												 WalSndWriteData,
+												 WalSndUpdateProgress);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1231,6 +1235,17 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 }
 
 /*
+ * LogicalDecodingContext 'update_progress' callback.
+ *
+ * Write the current position to the lag tracker (see XLogSendPhysical).
+ */
+static void
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+	LagTrackerWrite(lsn, GetCurrentTimestamp());
+}
+
+/*
  * Wait till WAL < loc is flushed to disk so it can be safely read.
  */
 static XLogRecPtr
@@ -2690,9 +2705,9 @@ XLogSendLogical(void)
 	if (record != NULL)
 	{
 		/*
-		 * Note the lack of any call to LagTrackerWrite() which is the responsibility
-		 * of the logical decoding plugin. Response messages are handled normally,
-		 * so this responsibility does not extend to needing to call LagTrackerRead().
+		 * Note the lack of any call to LagTrackerWrite() which is handled
+		 * by WalSndUpdateProgress which is called by output plugin through
+		 * logical decoding write api.
 		 */
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
@@ -3215,9 +3230,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
  * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
  * eventually reported to have been written, flushed and applied by the
  * standby in a reply message.
- * Exported to allow logical decoding plugins to call this when they choose.
  */
-void
+static void
 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
 {
 	bool buffer_full;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 80f04c3..6346305 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+typedef void (*LogicalOutputPluginWriterUpdateProgress) (
+										   struct LogicalDecodingContext *lr,
+															XLogRecPtr Ptr,
+															TransactionId xid
+);
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
 	 */
 	LogicalOutputPluginWriterPrepareWrite prepare_write;
 	LogicalOutputPluginWriterWrite write;
+	LogicalOutputPluginWriterUpdateProgress update_progress;
 
 	/*
 	 * Output buffer.
@@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@@ -107,8 +116,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 									  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
-
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 08e962d..2435e2b 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
 
 #endif   /* OUTPUT_PLUGIN_H */
-- 
2.7.4

#2 Petr Jelinek
petr.jelinek@2ndquadrant.com
In reply to: Petr Jelinek (#1)
1 attachment(s)
Re: Time based lag tracking for logical replication

On 23/04/17 01:10, Petr Jelinek wrote:

> Hi,
>
> The time-based lag tracking commit [1] added an interface for logging
> the progress of replication so that we can report lag as a time interval
> instead of just bytes. But that commit didn't include support for the
> built-in logical replication.
>
> So I wrote something that implements this. I didn't much like the API
> layering of exporting the walsender's LagTrackerWrite() for direct use
> by the plugin. Normally an output plugin does not have to care whether
> it's running under walsender or not; it uses an abstracted write
> interface, which can be implemented in various ways (that's how we
> implement the SQL interface to logical decoding, after all). So I decided
> to add another function to the logical decoding write API, called
> update_progress, and call that one from the output plugin. The walsender
> then implements that new API to call LagTrackerWrite(), while the SQL
> interface just does not implement it at all. This seems like a cleaner
> way of doing it.

The original patch no longer applies now that Andres has committed some of
my fixes for the snapshot builder, so here is a rebased version.

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

Attachments:

Add-support-for-time-based-lag-tracking-to-logical-170429.patch (binary/octet-stream)
From 42e5c5c0aebc34f249151e85873e7c3d2cb2c47d Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Sun, 23 Apr 2017 01:00:25 +0200
Subject: [PATCH] Add support for time based lag tracking to logical
 replication

This patch adds a new write API interface to LogicalDecodingContext called
update_progress, and a wrapper around it called OutputPluginUpdateProgress,
for output plugins to use for progress reporting. This new interface is
optional: walsender uses it to do the time-based lag tracking, while the
SQL interface does not implement it.
---
 src/backend/replication/logical/logical.c      | 34 ++++++++++++++++++++------
 src/backend/replication/logical/logicalfuncs.c |  2 +-
 src/backend/replication/pgoutput/pgoutput.c    |  2 ++
 src/backend/replication/slotfuncs.c            |  3 ++-
 src/backend/replication/walsender.c            | 32 +++++++++++++++++-------
 src/include/replication/logical.h              | 15 +++++++++---
 src/include/replication/output_plugin.h        |  1 +
 7 files changed, 66 insertions(+), 23 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ab963c5..7409e5c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   bool need_full_snapshot,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
+	ctx->update_progress = update_progress;
 
 	ctx->output_plugin_options = output_plugin_options;
 
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
  *
  * plugin contains the name of the output plugin
  * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- *		perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ *  	callbacks that have to be filled to perform the use-case dependent,
+ *  	actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
  * as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, read_page, prepare_write,
-								 do_write);
+								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
  * output_plugin_options
  *		contains options passed to the output plugin.
  *
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -504,6 +510,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
+ * Update progress tracking (if supported).
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+	if (!ctx->update_progress)
+		return;
+
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
+/*
  * Load the output plugin, lookup its output plugin init function, and check
  * that it provides the required callbacks.
  */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index c251b92..27164de 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f3eaccf..4ddfbf7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -244,6 +244,8 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
+	OutputPluginUpdateProgress(ctx);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6ee1e68..56a9ca9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
 									false, /* do not build snapshot */
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL,
+									NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2a6c8bb..5349268 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -240,7 +240,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
 static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
@@ -915,7 +917,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										WalSndUpdateProgress);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1069,10 +1072,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Initialize position to the last ack'ed one, then the xlog records begin
 	 * to be shipped from that position.
 	 */
-	logical_decoding_ctx = CreateDecodingContext(
-											   cmd->startpoint, cmd->options,
+	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+												 WalSndPrepareWrite,
+												 WalSndWriteData,
+												 WalSndUpdateProgress);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1232,6 +1236,17 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 }
 
 /*
+ * LogicalDecodingContext 'update_progress' callback.
+ *
+ * Write the current position to the lag tracker (see XLogSendPhysical).
+ */
+static void
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+	LagTrackerWrite(lsn, GetCurrentTimestamp());
+}
+
+/*
  * Wait till WAL < loc is flushed to disk so it can be safely read.
  */
 static XLogRecPtr
@@ -2691,9 +2706,9 @@ XLogSendLogical(void)
 	if (record != NULL)
 	{
 		/*
-		 * Note the lack of any call to LagTrackerWrite() which is the responsibility
-		 * of the logical decoding plugin. Response messages are handled normally,
-		 * so this responsibility does not extend to needing to call LagTrackerRead().
+		 * Note the lack of any call to LagTrackerWrite() which is handled
+		 * by WalSndUpdateProgress which is called by output plugin through
+		 * logical decoding write api.
 		 */
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
@@ -3227,9 +3242,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
  * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
  * eventually reported to have been written, flushed and applied by the
  * standby in a reply message.
- * Exported to allow logical decoding plugins to call this when they choose.
  */
-void
+static void
 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
 {
 	bool buffer_full;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index d0b2e0b..090f9c8 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+typedef void (*LogicalOutputPluginWriterUpdateProgress) (
+										   struct LogicalDecodingContext *lr,
+															XLogRecPtr Ptr,
+															TransactionId xid
+);
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
 	 */
 	LogicalOutputPluginWriterPrepareWrite prepare_write;
 	LogicalOutputPluginWriterWrite write;
+	LogicalOutputPluginWriterUpdateProgress update_progress;
 
 	/*
 	 * Output buffer.
@@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 									  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
-
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 08e962d..2435e2b 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
 
 #endif   /* OUTPUT_PLUGIN_H */
-- 
2.7.4

#3 Simon Riggs
simon@2ndquadrant.com
In reply to: Petr Jelinek (#1)
Re: Time based lag tracking for logical replication

On 23 April 2017 at 01:10, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

> Hi,
>
> The time-based lag tracking commit [1] added an interface for logging
> the progress of replication so that we can report lag as a time interval
> instead of just bytes. But that commit didn't include support for the
> built-in logical replication.
>
> So I wrote something that implements this. I didn't much like the API
> layering of exporting the walsender's LagTrackerWrite() for direct use
> by the plugin. Normally an output plugin does not have to care whether
> it's running under walsender or not; it uses an abstracted write
> interface, which can be implemented in various ways (that's how we
> implement the SQL interface to logical decoding, after all). So I decided
> to add another function to the logical decoding write API, called
> update_progress, and call that one from the output plugin. The walsender
> then implements that new API to call LagTrackerWrite(), while the SQL
> interface just does not implement it at all. This seems like a cleaner
> way of doing it.
>
> Thoughts?

Agree cleaner.

I don't see any pacing or comments about it, nor handling of
intermediate messages while we process a large transaction.

I'll look at adding some pacing code in WalSndUpdateProgress()

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#4 Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Simon Riggs (#3)
Re: Time based lag tracking for logical replication

On Wed, May 3, 2017 at 6:28 PM, Simon Riggs <simon@2ndquadrant.com> wrote:

> On 23 April 2017 at 01:10, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:
>
>> Hi,
>>
>> The time-based lag tracking commit [1] added an interface for logging
>> the progress of replication so that we can report lag as a time interval
>> instead of just bytes. But that commit didn't include support for the
>> built-in logical replication.
>>
>> So I wrote something that implements this. I didn't much like the API
>> layering of exporting the walsender's LagTrackerWrite() for direct use
>> by the plugin. Normally an output plugin does not have to care whether
>> it's running under walsender or not; it uses an abstracted write
>> interface, which can be implemented in various ways (that's how we
>> implement the SQL interface to logical decoding, after all). So I decided
>> to add another function to the logical decoding write API, called
>> update_progress, and call that one from the output plugin. The walsender
>> then implements that new API to call LagTrackerWrite(), while the SQL
>> interface just does not implement it at all. This seems like a cleaner
>> way of doing it.
>>
>> Thoughts?
>
> Agree cleaner.

+1

> I don't see any pacing or comments about it, nor handling of
> intermediate messages while we process a large transaction.
>
> I'll look at adding some pacing code in WalSndUpdateProgress()

By the way, I have a small improvement to the interpolation to
propose. Right now, after a period of idleness it can report a silly
large number based on an ancient time, but you won't usually see it
because it's quickly replaced by a sensible number. I think this
thinko will affect logical rep with Petr's patch more. I had been
meaning to post the improvement but got sidetracked by that recovery
test failure problem. I'll post that in the next few days.

--
Thomas Munro
http://www.enterprisedb.com

#5Petr Jelinek
petr.jelinek@2ndquadrant.com
In reply to: Simon Riggs (#3)
Re: Time based lag tracking for logical replication

On 03/05/17 08:28, Simon Riggs wrote:

On 23 April 2017 at 01:10, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

Hi,

The time based lag tracking commit [1] added interface for logging
progress of replication so that we can report lag as time interval
instead of just bytes. But the patch didn't contain patch for the
builtin logical replication.

So I wrote something that implements this. I didn't like all that much
the API layering in terms of exporting the walsender's LagTrackerWrite()
for use by plugin directly. Normally output plugin does not have to care
if it's running under walsender or not, it uses abstracted write
interface for that which can be implemented in various ways (that's how
we implement SQL interface to logical decoding after all). So I decided
to add another function to the logical decoding write api called
update_progress and call that one from the output plugin. The walsender
then implements that new API to call the LagTrackerWrite() while the SQL
interface just does not implement it at all. This seems like cleaner way
of doing it.

Thoughts?

Agree cleaner.

I don't see any pacing or comments about it, nor handling of
intermediate messages while we process a large transaction.

Agreed, pacing is a good idea, because on a busy server storing info for
every commit could get expensive.

I don't understand what you mean by "handling of intermediate messages
while we process a large transaction". Logical replication is
transaction-based so far; it does not stream the way physical replication
does, so there seems to be limited usefulness in doing this outside of
commit, no?
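
The pacing idea discussed above can be sketched on its own. This is a minimal illustration, not code from the patch: the names Pacer, pacer_should_sample and LAG_SAMPLE_INTERVAL_MS are hypothetical, and timestamps are plain millisecond counters rather than PostgreSQL's TimestampTz.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical interval: accept at most one sample per second. */
#define LAG_SAMPLE_INTERVAL_MS 1000

typedef struct Pacer
{
	int64_t		last_sample_ms;	/* time of the last accepted sample */
	bool		has_sample;		/* false until the first sample is taken */
} Pacer;

/*
 * Return true if a new sample should be recorded at time now_ms, and
 * remember that time. Calls that arrive less than one interval after the
 * last accepted sample return false and leave the state untouched.
 */
bool
pacer_should_sample(Pacer *p, int64_t now_ms)
{
	if (p->has_sample &&
		now_ms - p->last_sample_ms < LAG_SAMPLE_INTERVAL_MS)
		return false;

	p->last_sample_ms = now_ms;
	p->has_sample = true;
	return true;
}
```

A commit callback would call pacer_should_sample() first and only store a lag sample when it returns true, so a burst of small commits within one interval costs a single stored sample.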

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#6Noah Misch
noah@leadboat.com
In reply to: Simon Riggs (#3)
Re: Time based lag tracking for logical replication

On Wed, May 03, 2017 at 08:28:53AM +0200, Simon Riggs wrote:

On 23 April 2017 at 01:10, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

Hi,

The time based lag tracking commit [1] added interface for logging
progress of replication so that we can report lag as time interval
instead of just bytes. But the patch didn't contain patch for the
builtin logical replication.

So I wrote something that implements this. I didn't like all that much
the API layering in terms of exporting the walsender's LagTrackerWrite()
for use by plugin directly. Normally output plugin does not have to care
if it's running under walsender or not, it uses abstracted write
interface for that which can be implemented in various ways (that's how
we implement SQL interface to logical decoding after all). So I decided
to add another function to the logical decoding write api called
update_progress and call that one from the output plugin. The walsender
then implements that new API to call the LagTrackerWrite() while the SQL
interface just does not implement it at all. This seems like cleaner way
of doing it.

Thoughts?

Agree cleaner.

I don't see any pacing or comments about it, nor handling of
intermediate messages while we process a large transaction.

I'll look at adding some pacing code in WalSndUpdateProgress()

[Action required within three days.]

Simon, I understand, from an annotation on the open items list, that you have
volunteered to own this item. Please observe the policy on open item
ownership[1] and send a status update within three calendar days of this
message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.

[1]: /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com

#7Noah Misch
noah@leadboat.com
In reply to: Noah Misch (#6)
Re: Time based lag tracking for logical replication

On Fri, May 05, 2017 at 07:07:26AM +0000, Noah Misch wrote:

[Action required within three days.]

Simon, I understand, from an annotation on the open items list, that you have
volunteered to own this item. Please observe the policy on open item
ownership[1] and send a status update within three calendar days of this
message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.

[1] /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com

This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com

#8Noah Misch
noah@leadboat.com
In reply to: Noah Misch (#7)
Re: Time based lag tracking for logical replication

On Mon, May 08, 2017 at 10:30:45PM -0700, Noah Misch wrote:

On Fri, May 05, 2017 at 07:07:26AM +0000, Noah Misch wrote:

[Action required within three days.]

Simon, I understand, from an annotation on the open items list, that you have
volunteered to own this item. Please observe the policy on open item
ownership[1] and send a status update within three calendar days of this
message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.

[1] /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com

This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com

IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is long past due
for your status update. Please reacquaint yourself with the policy on open
item ownership[1] and then reply immediately. If I do not hear from you by
2017-05-11 06:00 UTC, I will transfer this item to release management team
ownership without further notice.

[1]: /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com

#9Noah Misch
noah@leadboat.com
In reply to: Petr Jelinek (#1)
Re: Time based lag tracking for logical replication

On Sun, Apr 23, 2017 at 01:10:32AM +0200, Petr Jelinek wrote:

The time based lag tracking commit [1] added interface for logging
progress of replication so that we can report lag as time interval
instead of just bytes. But the patch didn't contain patch for the
builtin logical replication.

So I wrote something that implements this.

This is listed as a PostgreSQL 10 open item, but the above makes it sound like
a feature to consider for v11, not a defect in v10. Why is this an open item?

#10Simon Riggs
simon@2ndquadrant.com
In reply to: Noah Misch (#9)
1 attachment(s)
Re: Time based lag tracking for logical replication

On 11 May 2017 at 08:32, Noah Misch <noah@leadboat.com> wrote:

On Sun, Apr 23, 2017 at 01:10:32AM +0200, Petr Jelinek wrote:

The time based lag tracking commit [1] added interface for logging
progress of replication so that we can report lag as time interval
instead of just bytes. But the patch didn't contain patch for the
builtin logical replication.

So I wrote something that implements this.

This is listed as a PostgreSQL 10 open item, but the above makes it sound like
a feature to consider for v11, not a defect in v10. Why is this an open item?

It's an open item because at the time of the Lag Tracker commit it was
believed to be a single line of code that needed to be added to
Logical Replication to make it work with the Lag Tracker
functionality. It didn't make sense for me to add it while the LR code
was still being changed, even though we had code to do that.

Petr's new patch is slightly longer and needed review, plus some minor
code to add a pacing delay.

My own delay in responding has been because of illness. You'll note
that I missed responding to at least one other mail from you.
Apologies for that; it has set me back some way, but I'm better now and
have caught up with other matters. Petr nudged me to look at this
thread again yesterday, so I have been looking at this over the last few
days.

The attached patch is Petr's patch, slightly rebased, with an added pacing
delay similar to that used by HSFeedback.
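
The layering in the attached patch, where the output plugin calls a wrapper that forwards to an optional consumer callback, can be illustrated in isolation. This is a simplified sketch only: DecodingCtx, plugin_update_progress and streaming_update_progress are hypothetical stand-ins for LogicalDecodingContext, OutputPluginUpdateProgress and WalSndUpdateProgress, and the counters exist purely for demonstration.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-ins for the types used in the patch. */
typedef struct DecodingCtx DecodingCtx;
typedef void (*UpdateProgressCB) (DecodingCtx *ctx, uint64_t lsn);

struct DecodingCtx
{
	UpdateProgressCB update_progress;	/* optional, may be NULL */
	uint64_t	write_location;		/* position of the current write */
	uint64_t	last_reported;		/* demo only: last position reported */
	int			progress_calls;		/* demo only: number of reports */
};

/* Wrapper the plugin calls; a no-op when the consumer set no callback. */
void
plugin_update_progress(DecodingCtx *ctx)
{
	if (ctx->update_progress == NULL)
		return;

	ctx->update_progress(ctx, ctx->write_location);
}

/* A consumer that tracks progress, standing in for the walsender. */
void
streaming_update_progress(DecodingCtx *ctx, uint64_t lsn)
{
	ctx->last_reported = lsn;
	ctx->progress_calls++;
}
```

A context created for the streaming consumer would set update_progress to streaming_update_progress, while one created for the SQL interface would leave it NULL. The plugin calls plugin_update_progress() in both cases and never needs to know which consumer it is running under.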

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Attachments:

Add-support-for-time-based-lag-tracking-to-logical-170429.v2.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ab963c5345..7409e5ce3d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   bool need_full_snapshot,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
+	ctx->update_progress = update_progress;
 
 	ctx->output_plugin_options = output_plugin_options;
 
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
  *
  * plugin contains the name of the output plugin
  * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- *		perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ *  	callbacks that have to be filled to perform the use-case dependent,
+ *  	actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
  * as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, read_page, prepare_write,
-								 do_write);
+								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
  * output_plugin_options
  *		contains options passed to the output plugin.
  *
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -504,6 +510,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
+ * Update progress tracking (if supported).
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+	if (!ctx->update_progress)
+		return;
+
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
+/*
  * Load the output plugin, lookup its output plugin init function, and check
  * that it provides the required callbacks.
  */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index c251b92f57..27164de093 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f3eaccffd5..4ddfbf7a98 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -244,6 +244,8 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
+	OutputPluginUpdateProgress(ctx);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6ee1e68819..56a9ca9651 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
 									false, /* do not build snapshot */
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL,
+									NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45d027803a..8bb142a2d4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
 static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
@@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										WalSndUpdateProgress);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Initialize position to the last ack'ed one, then the xlog records begin
 	 * to be shipped from that position.
 	 */
-	logical_decoding_ctx = CreateDecodingContext(
-											   cmd->startpoint, cmd->options,
+	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+												 WalSndPrepareWrite,
+												 WalSndWriteData,
+												 WalSndUpdateProgress);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1240,6 +1244,29 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 }
 
 /*
+ * LogicalDecodingContext 'update_progress' callback.
+ *
+ * Write the current position to the lag tracker (see XLogSendPhysical).
+ */
+static void
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+	TimestampTz now = GetCurrentTimestamp();
+	static TimestampTz sendTime = 0;
+
+	/*
+	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+	 */
+#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 	1000
+	if (!TimestampDifferenceExceeds(sendTime, now,
+									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+		return;
+
+	LagTrackerWrite(lsn, now);
+	sendTime = now;
+}
+
+/*
  * Wait till WAL < loc is flushed to disk so it can be safely read.
  */
 static XLogRecPtr
@@ -2730,9 +2757,9 @@ XLogSendLogical(void)
 	if (record != NULL)
 	{
 		/*
-		 * Note the lack of any call to LagTrackerWrite() which is the responsibility
-		 * of the logical decoding plugin. Response messages are handled normally,
-		 * so this responsibility does not extend to needing to call LagTrackerRead().
+		 * Note the lack of any call to LagTrackerWrite() which is handled
+		 * by WalSndUpdateProgress which is called by output plugin through
+		 * logical decoding write api.
 		 */
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
@@ -3328,9 +3355,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
  * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
  * eventually reported to have been written, flushed and applied by the
  * standby in a reply message.
- * Exported to allow logical decoding plugins to call this when they choose.
  */
-void
+static void
 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
 {
 	bool buffer_full;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index d0b2e0bbae..090f9c8268 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+typedef void (*LogicalOutputPluginWriterUpdateProgress) (
+										   struct LogicalDecodingContext *lr,
+															XLogRecPtr Ptr,
+															TransactionId xid
+);
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
 	 */
 	LogicalOutputPluginWriterPrepareWrite prepare_write;
 	LogicalOutputPluginWriterWrite write;
+	LogicalOutputPluginWriterUpdateProgress update_progress;
 
 	/*
 	 * Output buffer.
@@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 									  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
-
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 08e962d0c0..2435e2be2d 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
 
 #endif   /* OUTPUT_PLUGIN_H */
#11Petr Jelinek
petr.jelinek@2ndquadrant.com
In reply to: Simon Riggs (#10)
Re: Time based lag tracking for logical replication

On 11/05/17 15:01, Simon Riggs wrote:

Attached patch is Petr's patch, slightly rebased with added pacing
delay, similar to that used by HSFeedback.

This looks reasonable. I would perhaps change:

+       /*
+        * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+        */

to something like this for extra clarity:

+       /*
+        * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+        * to avoid flooding the lag tracker on busy servers.
+        */
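
To illustrate why unpaced writes could flood a tracker, here is a deliberately tiny sample buffer. It is a sketch under assumptions and does not reproduce the lag tracker in walsender.c: SampleBuffer, sample_write and sample_read are hypothetical names, the buffer holds only four entries, and a full buffer simply drops new samples.

```c
#include <stdbool.h>
#include <stdint.h>

#define SAMPLE_BUFFER_SIZE 4	/* tiny on purpose, hypothetical */

typedef struct Sample
{
	uint64_t	lsn;		/* WAL position that was sent */
	int64_t		time_ms;	/* when it was sent */
} Sample;

typedef struct SampleBuffer
{
	Sample		samples[SAMPLE_BUFFER_SIZE];
	int			head;		/* next slot to write */
	int			tail;		/* oldest unread slot */
	int			count;		/* number of unread samples */
} SampleBuffer;

/* Record a sample; returns false (dropping it) when the buffer is full. */
bool
sample_write(SampleBuffer *b, uint64_t lsn, int64_t now_ms)
{
	if (b->count == SAMPLE_BUFFER_SIZE)
		return false;

	b->samples[b->head].lsn = lsn;
	b->samples[b->head].time_ms = now_ms;
	b->head = (b->head + 1) % SAMPLE_BUFFER_SIZE;
	b->count++;
	return true;
}

/*
 * Consume every sample at or before acked_lsn and return the lag of the
 * newest one consumed, or -1 if the ack covered no stored sample.
 */
int64_t
sample_read(SampleBuffer *b, uint64_t acked_lsn, int64_t now_ms)
{
	int64_t		lag_ms = -1;

	while (b->count > 0 && b->samples[b->tail].lsn <= acked_lsn)
	{
		lag_ms = now_ms - b->samples[b->tail].time_ms;
		b->tail = (b->tail + 1) % SAMPLE_BUFFER_SIZE;
		b->count--;
	}
	return lag_ms;
}
```

With one sample per commit, a busy server would fill such a buffer long before the subscriber acknowledges anything, which is the situation the pacing interval is meant to avoid.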

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#12Simon Riggs
simon@2ndquadrant.com
In reply to: Petr Jelinek (#11)
1 attachment(s)
Re: Time based lag tracking for logical replication

On 11 May 2017 at 14:12, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

Attached patch is Petr's patch, slightly rebased with added pacing
delay, similar to that used by HSFeedback.

This looks reasonable. I would perhaps change:

+       /*
+        * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+        */

to something like this for extra clarity:

+       /*
+        * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+        * to avoid flooding the lag tracker on busy servers.
+        */

New patch, v3.

Applying in 90 minutes, barring objections.

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Attachments:

Add-support-for-time-based-lag-tracking-to-logical-170429.v3.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ab963c5345..7409e5ce3d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   bool need_full_snapshot,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
+	ctx->update_progress = update_progress;
 
 	ctx->output_plugin_options = output_plugin_options;
 
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
  *
  * plugin contains the name of the output plugin
  * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- *		perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ *  	callbacks that have to be filled to perform the use-case dependent,
+ *  	actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
  * as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, read_page, prepare_write,
-								 do_write);
+								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
  * output_plugin_options
  *		contains options passed to the output plugin.
  *
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -504,6 +510,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
+ * Update progress tracking (if supported).
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+	if (!ctx->update_progress)
+		return;
+
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
+/*
  * Load the output plugin, lookup its output plugin init function, and check
  * that it provides the required callbacks.
  */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index c251b92f57..27164de093 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f3eaccffd5..4ddfbf7a98 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -244,6 +244,8 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
+	OutputPluginUpdateProgress(ctx);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6ee1e68819..56a9ca9651 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
 									false, /* do not build snapshot */
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL,
+									NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45d027803a..e4e5337d54 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
 static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
@@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										WalSndUpdateProgress);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Initialize position to the last ack'ed one, then the xlog records begin
 	 * to be shipped from that position.
 	 */
-	logical_decoding_ctx = CreateDecodingContext(
-											   cmd->startpoint, cmd->options,
+	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+												 WalSndPrepareWrite,
+												 WalSndWriteData,
+												 WalSndUpdateProgress);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1240,6 +1244,30 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 }
 
 /*
+ * LogicalDecodingContext 'update_progress' callback.
+ *
+ * Write the current position to the lag tracker (see XLogSendPhysical).
+ */
+static void
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+	static TimestampTz sendTime = 0;
+	TimestampTz now = GetCurrentTimestamp();
+
+	/*
+	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+	 * to avoid flooding the lag tracker when we commit frequently.
+	 */
+#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 	1000
+	if (!TimestampDifferenceExceeds(sendTime, now,
+									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+		return;
+
+	LagTrackerWrite(lsn, now);
+	sendTime = now;
+}
+
+/*
  * Wait till WAL < loc is flushed to disk so it can be safely read.
  */
 static XLogRecPtr
@@ -2730,9 +2758,9 @@ XLogSendLogical(void)
 	if (record != NULL)
 	{
 		/*
-		 * Note the lack of any call to LagTrackerWrite() which is the responsibility
-		 * of the logical decoding plugin. Response messages are handled normally,
-		 * so this responsibility does not extend to needing to call LagTrackerRead().
+		 * Note the lack of any call to LagTrackerWrite() here. That is handled
+		 * by WalSndUpdateProgress(), which the output plugin calls through the
+		 * logical decoding write API.
 		 */
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
@@ -3328,9 +3356,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
  * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
  * eventually reported to have been written, flushed and applied by the
  * standby in a reply message.
- * Exported to allow logical decoding plugins to call this when they choose.
  */
-void
+static void
 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
 {
 	bool buffer_full;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index d0b2e0bbae..090f9c8268 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+typedef void (*LogicalOutputPluginWriterUpdateProgress) (
+										   struct LogicalDecodingContext *lr,
+															XLogRecPtr Ptr,
+															TransactionId xid
+);
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
 	 */
 	LogicalOutputPluginWriterPrepareWrite prepare_write;
 	LogicalOutputPluginWriterWrite write;
+	LogicalOutputPluginWriterUpdateProgress update_progress;
 
 	/*
 	 * Output buffer.
@@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalOutputPluginWriterUpdateProgress update_progress);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 									  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
-
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 08e962d0c0..2435e2be2d 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
 
 #endif   /* OUTPUT_PLUGIN_H */
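
The patch above combines two ideas: an optional update_progress callback that the plugin-facing wrapper skips when it is unset, and a pacing check so that a lag sample is recorded at most once per interval. They can be sketched in isolation as below. This is a minimal standalone illustration under stated assumptions, not the committed PostgreSQL code: DecodingCtx, Lsn, TimeMs, plugin_update_progress, paced_update_progress and the injected "now" field are hypothetical stand-ins for LogicalDecodingContext, XLogRecPtr, TimestampTz, OutputPluginUpdateProgress, WalSndUpdateProgress and GetCurrentTimestamp(), and the samples_written counter stands in for a call to LagTrackerWrite().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;		/* hypothetical stand-in for XLogRecPtr */
typedef int64_t TimeMs;		/* hypothetical stand-in for TimestampTz, in ms */

struct DecodingCtx;
typedef void (*UpdateProgressCb) (struct DecodingCtx *ctx, Lsn lsn);

typedef struct DecodingCtx
{
	UpdateProgressCb update_progress;	/* optional, may be NULL */
	Lsn			write_location;		/* position of the current write */
	TimeMs		now;				/* injected clock (assumption, for testing) */
	TimeMs		last_sample;		/* when the last sample was recorded */
	bool		have_sample;		/* false until the first sample */
	int			samples_written;	/* stand-in for LagTrackerWrite() calls */
	Lsn			last_lsn;			/* position of the last recorded sample */
} DecodingCtx;

#define LAG_TRACK_INTERVAL_MS 1000

/*
 * Plugin-facing wrapper: does nothing when the caller did not install a
 * callback, so the plugin need not know which kind of caller it runs under.
 */
void
plugin_update_progress(DecodingCtx *ctx)
{
	assert(ctx != NULL);

	if (ctx->update_progress == NULL)
		return;

	ctx->update_progress(ctx, ctx->write_location);
}

/*
 * Walsender-like callback: record a sample only if this is the first one
 * or at least LAG_TRACK_INTERVAL_MS has elapsed since the previous one.
 */
void
paced_update_progress(DecodingCtx *ctx, Lsn lsn)
{
	if (ctx->have_sample &&
		ctx->now - ctx->last_sample < LAG_TRACK_INTERVAL_MS)
		return;

	ctx->samples_written++;
	ctx->last_lsn = lsn;
	ctx->last_sample = ctx->now;
	ctx->have_sample = true;
}
```

A caller that leaves update_progress as NULL (as the SQL interface does in the patch) sees no effect from plugin_update_progress(), while a caller that installs paced_update_progress records one sample per interval however often commits arrive. The sketch keeps the pacing state in the context rather than in a function-local static, as the patch does with sendTime, purely so that the behaviour can be exercised with an injected clock.
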
#13Andres Freund
andres@anarazel.de
In reply to: Simon Riggs (#12)
Re: Time based lag tracking for logical replication

On May 11, 2017 8:08:11 AM PDT, Simon Riggs <simon@2ndquadrant.com> wrote:

On 11 May 2017 at 14:12, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

Attached patch is Petr's patch, slightly rebased with added pacing
delay, similar to that used by HSFeedback.

This looks reasonable. I would perhaps change:

+       /*
+        * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+        */

to something like this for extra clarity:

+       /*
+        * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+        * to avoid flooding the lag tracker on busy servers.
+        */

New patch, v3.

Applying in 90 minutes, barring objections.

Could you please wait till tomorrow? I've bigger pending fixes for related code pending/being tested that I plan to push today. I'd also like to take a look before...

Thanks,

Andres

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#14Simon Riggs
simon@2ndquadrant.com
In reply to: Andres Freund (#13)
Re: Time based lag tracking for logical replication

On 11 May 2017 at 18:13, Andres Freund <andres@anarazel.de> wrote:

New patch, v3.

Applying in 90 minutes, barring objections.

Could you please wait till tomorrow? I've bigger pending fixes for related code pending/being tested that I plan to push today. I'd also like to take a look before...

Sure.

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#15Simon Riggs
simon@2ndquadrant.com
In reply to: Simon Riggs (#14)
Re: Time based lag tracking for logical replication

On 11 May 2017 at 18:29, Simon Riggs <simon@2ndquadrant.com> wrote:

On 11 May 2017 at 18:13, Andres Freund <andres@anarazel.de> wrote:

New patch, v3.

Applying in 90 minutes, barring objections.

Could you please wait till tomorrow? I've bigger pending fixes for related code pending/being tested that I plan to push today. I'd also like to take a look before...

Sure.

The changes I've added are very minor, so I'm not expecting debate.
The main part of the patch is the same as Petr posted 19 days ago.

I'm travelling now, so after waiting till tomorrow as you requested I
have committed the patch.

Cheers

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#16Neha Khatri
nehakhatri5@gmail.com
In reply to: Simon Riggs (#15)
Re: Time based lag tracking for logical replication

On Fri, May 12, 2017 at 8:19 PM, Simon Riggs <simon@2ndquadrant.com> wrote:

On 11 May 2017 at 18:29, Simon Riggs <simon@2ndquadrant.com> wrote:

On 11 May 2017 at 18:13, Andres Freund <andres@anarazel.de> wrote:

New patch, v3.

Applying in 90 minutes, barring objections.

Could you please wait till tomorrow?  I've bigger pending fixes for related code pending/being tested that I plan to push today.  I'd also like to take a look before...

Sure.

The changes I've added are very minor, so I'm not expecting debate.
The main part of the patch is the same as Petr posted 19 days ago.

I'm travelling now, so after waiting till tomorrow as you requested I
have committed the patch.

Prior to this commit CREATE SUBSCRIPTION used to work smoothly.

After this commit 024711bb544645c8b1061e9f02b261e2e336981d I get the
following error while executing CREATE SUBSCRIPTION:

CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres host=localhost
user=neha port=5432' PUBLICATION mypub;
NOTICE: synchronized table states
ERROR: could not create replication slot "sub1": ERROR: could not
load library "/home/neha/postgres/PGCurrentInstall/lib/pgoutput.so":
/home/neha/postgres/PGCurrentInstall/lib/pgoutput.so: undefined
symbol: OutputPluginUpdateProgress

Regards
Neha

#17Petr Jelinek
petr.jelinek@2ndquadrant.com
In reply to: Neha Khatri (#16)
Re: Time based lag tracking for logical replication

On 12/05/17 15:09, Neha Khatri wrote:

On Fri, May 12, 2017 at 8:19 PM, Simon Riggs <simon@2ndquadrant.com> wrote:

On 11 May 2017 at 18:29, Simon Riggs <simon@2ndquadrant.com> wrote:

On 11 May 2017 at 18:13, Andres Freund <andres@anarazel.de> wrote:

New patch, v3.

Applying in 90 minutes, barring objections.

Could you please wait till tomorrow? I've bigger pending fixes for related code pending/being tested that I plan to push today. I'd also like to take a look before...

Sure.

The changes I've added are very minor, so I'm not expecting debate.
The main part of the patch is the same as Petr posted 19 days ago.

I'm travelling now, so after waiting till tomorrow as you requested I
have committed the patch.

Prior to this commit CREATE SUBSCRIPTION used to work smoothly.

After this commit 024711bb544645c8b1061e9f02b261e2e336981d I get the
following error while executing CREATE SUBSCRIPTION:

CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres host=localhost
user=neha port=5432' PUBLICATION mypub;
NOTICE: synchronized table states
ERROR: could not create replication slot "sub1": ERROR: could not
load library "/home/neha/postgres/PGCurrentInstall/lib/pgoutput.so":
/home/neha/postgres/PGCurrentInstall/lib/pgoutput.so: undefined
symbol: OutputPluginUpdateProgress

Hmm, that sounds like a partial rebuild/install (old postgres binary with
a new pgoutput one).

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#18Neha Khatri
nehakhatri5@gmail.com
In reply to: Petr Jelinek (#17)
Re: Time based lag tracking for logical replication

On Sat, May 13, 2017 at 12:04 AM, Petr Jelinek <petr.jelinek@2ndquadrant.com> wrote:

After this commit 024711bb544645c8b1061e9f02b261e2e336981d I get the
following error while executing CREATE SUBSCRIPTION:

CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres host=localhost
user=neha port=5432' PUBLICATION mypub;
NOTICE: synchronized table states
ERROR: could not create replication slot "sub1": ERROR: could not
load library "/home/neha/postgres/PGCurrentInstall/lib/pgoutput.so":
/home/neha/postgres/PGCurrentInstall/lib/pgoutput.so: undefined
symbol: OutputPluginUpdateProgress

Hmm, that sounds like partial rebuild/install (old postgres binary with
new pgoutput one).

That's right. Thanks.

Neha