Add the replication origin name and commit-LSN to logical replication worker errcontext
Hi all,
We've added some information such as the command and the timestamp to
the error context message by commit abc0910e2. This patch adds further
information to it: replication origin name and commit-LSN.
This will help users pass the origin name and LSN to
pg_replication_origin_advance().
The errcontext message would look as follows:
* Before
ERROR: duplicate key value violates unique constraint "test_pkey"
DETAIL: Key (c)=(1) already exists.
CONTEXT: processing remote data during "INSERT" for replication
target relation "public.test" in transaction 726 at 2022-02-28
20:59:56.005909+09
* After
ERROR: duplicate key value violates unique constraint "test_pkey"
DETAIL: Key (c)=(1) already exists.
CONTEXT: processing remote data during "INSERT" for replication
target relation "public.test" in transaction 726 committed at LSN
0/14BFA88 and timestamp 2022-02-28 20:58:27.964238+09 from replication
origin "pg_16395"
I'm a bit concerned that the message may be too long.
I've attached two patches: the first one changes
apply_error_callback() to use complete sentences with if-else
blocks so that the messages can be translated; the second one adds the
origin name and commit-LSN to the errcontext message.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
Attachments:
0002-Add-the-origin-name-and-remote-commit-LSN-to-logical.patch
From dc6d97c71394c7c216920b9aa1d55bf33c5ac472 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 24 Feb 2022 16:56:58 +0900
Subject: [PATCH 2/2] Add the origin name and remote commit-LSN to logical
replication worker errcontext.
This commit adds both the commit-LSN and replication origin name to
the existing error context message.
This will help users specify the origin name and commit-LSN to the
pg_replication_origin_advance() SQL function to skip a particular transaction.
---
doc/src/sgml/logical-replication.sgml | 19 +++++--
src/backend/replication/logical/worker.c | 71 ++++++++++++++++++------
2 files changed, 67 insertions(+), 23 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 96b4886e08..a96cc21a1c 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -354,12 +354,21 @@
<para>
The resolution can be done either by changing data or permissions on the subscriber so
that it does not conflict with the incoming change or by skipping the
- transaction that conflicts with the existing data. The transaction can be
- skipped by calling the <link linkend="pg-replication-origin-advance">
+ transaction that conflicts with the existing data. When a conflict produces
+ an error, it is shown in the subscriber's server logs as follows:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 725 committed at LSN 0/14C0378 and timestamp 2022-02-28 20:58:27.964238+00 from replication origin "pg_16395"
+</screen>
+ The commit LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found in that output (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). The transaction
+ can be skipped by calling the <link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
- a <parameter>node_name</parameter> corresponding to the subscription name,
- and a position. The current position of origins can be seen in the
- <link linkend="view-pg-replication-origin-status">
+ the <parameter>node_name</parameter> and the LSN immediately following the
+ commit LSN (i.e., 0/14C0379). The current position of origins can be
+ seen in the <link linkend="view-pg-replication-origin-status">
<structname>pg_replication_origin_status</structname></link> system view.
</para>
</sect1>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ac49e73b45..3fe5f50806 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
+ XLogRecPtr commit_lsn;
+ char *origin_name;
TimestampTz ts; /* commit, rollback, or prepare timestamp */
} ApplyErrorCallbackArg;
@@ -235,6 +237,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
+ .commit_lsn = InvalidXLogRecPtr,
+ .origin_name = NULL,
.ts = 0,
};
@@ -334,7 +338,8 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn,
+ TimestampTz ts);
static inline void reset_apply_error_context_info(void);
/*
@@ -787,7 +792,8 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.committime);
+ set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn,
+ begin_data.committime);
remote_final_lsn = begin_data.final_lsn;
@@ -839,7 +845,8 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
+ set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn,
+ begin_data.prepare_time);
remote_final_lsn = begin_data.prepare_lsn;
@@ -938,7 +945,8 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn,
+ prepare_data.commit_time);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -979,7 +987,8 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
+ set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn,
+ rollback_data.rollback_time);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1044,7 +1053,8 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn,
+ prepare_data.prepare_time);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1126,7 +1136,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid, 0);
+ set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr, 0);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1215,7 +1225,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid, 0);
+ set_apply_error_context_xact(xid, InvalidXLogRecPtr, 0);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1241,7 +1251,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid, 0);
+ set_apply_error_context_xact(subxid, InvalidXLogRecPtr, 0);
subidx = -1;
begin_replication_step();
@@ -1426,7 +1436,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid, commit_data.committime);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn, commit_data.committime);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3507,6 +3517,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
else
{
@@ -3550,6 +3571,13 @@ ApplyWorkerMain(Datum main_arg)
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message
+ */
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
/*
@@ -3673,33 +3701,40 @@ apply_error_callback(void *arg)
errcontext("processing remote data during \"%s\"",
logicalrep_message_type(errarg->command));
else
- errcontext("processing remote data during \"%s\" in transaction %u at %s",
+ errcontext("processing remote data during \"%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"",
logicalrep_message_type(errarg->command),
errarg->remote_xid,
- (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+ LSN_FORMAT_ARGS(errarg->commit_lsn),
+ (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)",
+ errarg->origin_name);
}
else if (errarg->remote_attnum < 0)
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u at %s",
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"",
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
errarg->remote_xid,
- (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+ LSN_FORMAT_ARGS(errarg->commit_lsn),
+ (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)",
+ errarg->origin_name);
else
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u at %s",
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"",
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
errarg->rel->remoterel.attnames[errarg->remote_attnum],
errarg->remote_xid,
- (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+ LSN_FORMAT_ARGS(errarg->commit_lsn),
+ (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)",
+ errarg->origin_name);
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn, TimestampTz ts)
{
apply_error_callback_arg.remote_xid = xid;
+ apply_error_callback_arg.commit_lsn = lsn;
apply_error_callback_arg.ts = ts;
}
@@ -3710,5 +3745,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId, 0);
+ set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr, 0);
}
--
2.24.3 (Apple Git-128)
0001-Use-complete-sentences-in-logical-replication-worker.patch
From fd5b78993d0e73144ceedad6dce29cf641eb06ed Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 28 Feb 2022 17:53:28 +0900
Subject: [PATCH 1/2] Use complete sentences in logical replication worker
errcontext.
Previously, the logical replication worker's errcontext message was
built incrementally, which was not translation-friendly. Instead, use
complete sentences with if-else branches.
---
src/backend/replication/logical/worker.c | 49 ++++++++++++------------
1 file changed, 24 insertions(+), 25 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5d9acc6173..ac49e73b45 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3662,38 +3662,37 @@ IsLogicalWorker(void)
static void
apply_error_callback(void *arg)
{
- StringInfoData buf;
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
if (apply_error_callback_arg.command == 0)
return;
- initStringInfo(&buf);
- appendStringInfo(&buf, _("processing remote data during \"%s\""),
- logicalrep_message_type(errarg->command));
-
- /* append relation information */
- if (errarg->rel)
- {
- appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname);
- if (errarg->remote_attnum >= 0)
- appendStringInfo(&buf, _(" column \"%s\""),
- errarg->rel->remoterel.attnames[errarg->remote_attnum]);
- }
-
- /* append transaction information */
- if (TransactionIdIsNormal(errarg->remote_xid))
+ if (errarg->rel == NULL)
{
- appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
- if (errarg->ts != 0)
- appendStringInfo(&buf, _(" at %s"),
- timestamptz_to_str(errarg->ts));
+ if (!TransactionIdIsValid(errarg->remote_xid))
+ errcontext("processing remote data during \"%s\"",
+ logicalrep_message_type(errarg->command));
+ else
+ errcontext("processing remote data during \"%s\" in transaction %u at %s",
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
}
-
- errcontext("%s", buf.data);
- pfree(buf.data);
+ else if (errarg->remote_attnum < 0)
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u at %s",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid,
+ (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+ else
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u at %s",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid,
+ (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
}
/* Set transaction information of apply error callback */
--
2.24.3 (Apple Git-128)
On Mon, Feb 28, 2022 at 5:46 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
We've added some information such as the command and the timestamp to
the error context message by commit abc0910e2. This patch adds further
information to it: replication origin name and commit-LSN.
This will be helpful for users to set the origin name and LSN to
pg_replication_origin_advance().
+1. This will make the use of pg_replication_origin_advance() relatively easy.
--
With Regards,
Amit Kapila.
On Mon, Feb 28, 2022 at 11:16 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Hi all,
We've added some information such as the command and the timestamp to
the error context message by commit abc0910e2. This patch adds further
information to it: replication origin name and commit-LSN.
This will be helpful for users to set the origin name and LSN to
pg_replication_origin_advance().
The errcontext message would become like follows:
*Before
ERROR: duplicate key value violates unique constraint "test_pkey"
DETAIL: Key (c)=(1) already exists.
CONTEXT: processing remote data during "INSERT" for replication
target relation "public.test" in transaction 726 at 2022-02-28
20:59:56.005909+09
* After
ERROR: duplicate key value violates unique constraint "test_pkey"
DETAIL: Key (c)=(1) already exists.
CONTEXT: processing remote data during "INSERT" for replication
target relation "public.test" in transaction 726 committed at LSN
0/14BFA88 and timestamp 2022-02-28 20:58:27.964238+09 from replication
origin "pg_16395"
I'm a bit concerned that the message may be too long.
If you are willing to use abbreviations instead of full
words/sentences, perhaps you can shorten the long CONTEXT part as
below?
Before:
CONTEXT: processing remote data during "INSERT" for replication
target relation "public.test" in transaction 726 committed at LSN
0/14BFA88 and timestamp 2022-02-28 20:58:27.964238+09 from
replication origin "pg_16395"
After:
CONTEXT: processing remote data during "INSERT" for replication target
relation "public.test" (txid 726, LSN 0/14BFA88, ts 2022-02-28
20:58:27.964238+09, origin "pg_16395")
------
Kind Regards,
Peter Smith.
Fujitsu Australia.
On Mon, Feb 28, 2022 at 5:46 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I've attached two patches: the first one changes
apply_error_callback() so that it uses complete sentences with if-else
blocks in order to have a translation work,
This is an improvement over what we have now but I think this is still
not completely correct as per message translation rules:
+ else
+ errcontext("processing remote data during \"%s\" in transaction %u at %s",
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
As per guidelines [1][2], we don't prefer to construct messages at
run-time, so we can do better for the following part:
+ (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)"
I think we need to use if-else here to split it further. If you agree,
then the same needs to be dealt with in other parts of the patch as well.
[1]: https://www.postgresql.org/docs/devel/nls-programmer.html#NLS-GUIDELINES
[2]: Do not construct sentences at run-time, like:
printf("Files were %s.\n", flag ? "copied" : "removed");
The word order within the sentence might be different in other languages.
--
With Regards,
Amit Kapila.
On Wed, Mar 2, 2022 at 8:25 AM Peter Smith <smithpb2250@gmail.com> wrote:
If you are willing to use abbreviations instead of full
words/sentences perhaps you can shorten the long CONTEXT part like
below?
Before:
CONTEXT: processing remote data during "INSERT" for replication
target relation "public.test" in transaction 726 committed at LSN
0/14BFA88 and timestamp 2022-02-28 20:58:27.964238+09 from
replication origin "pg_16395"
After:
CONTEXT: processing remote data during "INSERT" for replication target
relation "public.test" (txid 726, LSN 0/14BFA88, ts 2022-02-28
20:58:27.964238+09, origin "pg_16395")
I am wondering whether we can avoid having a timestamp in the message.
If one wants it, it can also be obtained from the log line itself.
For example, I see the below error on my machine:
2022-02-26 07:45:25.092 IST [17644] ERROR: duplicate key value violates unique constraint "t1_pkey"
2022-02-26 07:45:25.092 IST [17644] DETAIL: Key (c1)=(1) already exists.
2022-02-26 07:45:25.092 IST [17644] CONTEXT: processing remote data during "INSERT" for replication target relation "public.t1" in transaction 724 at 2022-02-26 07:45:09.083848+05:30
Here, doesn't the time at the start of the CONTEXT line serve our
purpose? If we can remove the timestamp, I think the message won't
appear too long. What do you think?
--
With Regards,
Amit Kapila.
On Wed, Mar 2, 2022 at 12:21 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
I am wondering whether we can avoid having a timestamp in the message?
If one wants, it can be retrieved from the errors otherwise as well.
For example, I see the below error in my machine:
2022-02-26 07:45:25.092 IST [17644] ERROR: duplicate key value
violates unique constraint "t1_pkey"
2022-02-26 07:45:25.092 IST [17644] DETAIL: Key (c1)=(1) already exists.
2022-02-26 07:45:25.092 IST [17644] CONTEXT: processing remote data
during "INSERT" for replication target relation "public.t1" in
transaction 724 at 2022-02-26 07:45:09.083848+05:30
Now, here, won't the time at the starting of CONTEXT serves our
purpose. If we can remove the timestamp, I think the message won't
appear too long. What do you think?
The time of the CONTEXT log line and the timestamp in the message can
differ considerably when the subscriber is far behind the publisher. But I
basically agree that the timestamp in the message might not be
important, at least for now. If we support conflict resolution
based on the commit timestamp in the future, we might
want it again.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
On Wed, Mar 2, 2022 at 11:55 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
This is an improvement over what we have now but I think this is still
not completely correct as per message translation rules:
+ else
+ errcontext("processing remote data during \"%s\" in transaction %u at %s",
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
As per guidelines [1][2], we don't prefer to construct messages at
run-time aka we can do better for the following part: + (errarg->ts
!= 0) ? timestamptz_to_str(errarg->ts) : "(not-set)". I think we need
to use if-else here to split it further. If you agree, then the same
needs to be dealt with in other parts of the patch as well.
I intended to use "(not-set)" as a value rather than a word in the
sentence so I think it doesn't violate the guidelines. We can split it
further as you suggested but we will end up having more if-else
branches.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
At Wed, 2 Mar 2022 14:39:54 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in
I intended to use "(not-set)" as a value rather than a word in the
sentence so I think it doesn't violate the guidelines. We can split it
further as you suggested but we will end up having more if-else
branches.
That seems to me to be exactly how we should do it. In the first place,
I doubt that "(not-set)" fits in place of a timestamp even in English.
Moreover, if we (I?) translated the message into Japanese, it would
look like this:
CONTEXT: (not-set)にトランザクション 2352314 内で "TRUNCATE" でのリモートデータの処理中
(literally: "at (not-set), in transaction 2352314, processing remote data during "TRUNCATE"")
I don't think that looks fine. Translating "(not-set)" makes things
even worse.
CONTEXT: (非設定)にトランザクション 2352314 内で "TRUNCATE" でのリモートデータの処理中
(literally: "at (not set), in transaction 2352314, processing remote data during "TRUNCATE"")
Yes, I can alleviate that strangeness a bit by rephrasing it, but it
still looks odd.
CONTEXT: 時刻(非設定)、トランザクション 2352314 内で "TRUNCATE" でのリモートデータの処理中
(literally: "time (not set), in transaction 2352314, processing remote data during "TRUNCATE"")
Rather, I'd prefer simply to list the attributes.
CONTEXT: processing remote data during "MESSAGE". Transaction (unknown). Time (unknown), replication target relation (unknown), column (unknown)
CONTEXT: "MESSAGE"でのリモートデータの処理中. トランザクション (不明). 時刻 (不明), レプリケーション対象リレーション (不明), column (不明)
(the Japanese rendering of the previous line)
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On Wed, Mar 2, 2022 at 9:33 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
The time of the CONTEXT log message and the time in the message would
largely vary when the subscriber is much behind the publisher. But I
basically agree that the timestamp in the message might not be
important, at least for now. If we will support conflict resolution
that resolves based on the commit timestamp in the future, we might
want it again.
Possible, but let's remove it for now as it will simplify the message
and avoid the need for additional branches. What do you think?
--
With Regards,
Amit Kapila.
On Wed, Mar 2, 2022 at 4:14 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
Possible, but let's remove it for now as it will simplify the message
and the need for additional branches. What do you think?
Agreed.
I've attached updated patches.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
Attachments:
v2-0001-Use-complete-sentences-in-logical-replication-wor.patch (application/octet-stream)
From 1b318de52a9b157238467b45f52657f86157518b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 28 Feb 2022 17:53:28 +0900
Subject: [PATCH v2 1/3] Use complete sentences in logical replication worker
errcontext.
Previously, the message for the logical replication worker errcontext was
built incrementally, which was not translation friendly. Instead, we now
use complete sentences with if-else branches.
We also remove the commit timestamp from the context message since
it is not important information and it made the message long.
---
src/backend/replication/logical/worker.c | 73 +++++++++++-------------
1 file changed, 33 insertions(+), 40 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7e267f7960..92aa794706 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,7 +226,6 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
- TimestampTz ts; /* commit, rollback, or prepare timestamp */
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -235,7 +234,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
- .ts = 0,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -334,7 +332,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void set_apply_error_context_xact(TransactionId xid);
static inline void reset_apply_error_context_info(void);
/*
@@ -787,7 +785,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.committime);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.final_lsn;
@@ -839,7 +837,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.prepare_lsn;
@@ -938,7 +936,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
+ set_apply_error_context_xact(prepare_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -979,7 +977,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
+ set_apply_error_context_xact(rollback_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1044,7 +1042,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
+ set_apply_error_context_xact(prepare_data.xid);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1126,7 +1124,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid, 0);
+ set_apply_error_context_xact(stream_xid);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1215,7 +1213,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid, 0);
+ set_apply_error_context_xact(xid);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1241,7 +1239,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid, 0);
+ set_apply_error_context_xact(subxid);
subidx = -1;
begin_replication_step();
@@ -1426,7 +1424,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid, commit_data.committime);
+ set_apply_error_context_xact(xid);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3648,46 +3646,41 @@ IsLogicalWorker(void)
static void
apply_error_callback(void *arg)
{
- StringInfoData buf;
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
if (apply_error_callback_arg.command == 0)
return;
- initStringInfo(&buf);
- appendStringInfo(&buf, _("processing remote data during \"%s\""),
- logicalrep_message_type(errarg->command));
-
- /* append relation information */
- if (errarg->rel)
- {
- appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname);
- if (errarg->remote_attnum >= 0)
- appendStringInfo(&buf, _(" column \"%s\""),
- errarg->rel->remoterel.attnames[errarg->remote_attnum]);
- }
-
- /* append transaction information */
- if (TransactionIdIsNormal(errarg->remote_xid))
+ if (errarg->rel == NULL)
{
- appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
- if (errarg->ts != 0)
- appendStringInfo(&buf, _(" at %s"),
- timestamptz_to_str(errarg->ts));
+ if (!TransactionIdIsValid(errarg->remote_xid))
+ errcontext("processing remote data during \"%s\"",
+ logicalrep_message_type(errarg->command));
+ else
+ errcontext("processing remote data during \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid);
}
-
- errcontext("%s", buf.data);
- pfree(buf.data);
+ else if (errarg->remote_attnum < 0)
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid);
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+set_apply_error_context_xact(TransactionId xid)
{
apply_error_callback_arg.remote_xid = xid;
- apply_error_callback_arg.ts = ts;
}
/* Reset all information of apply error callback */
@@ -3697,5 +3690,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId, 0);
+ set_apply_error_context_xact(InvalidTransactionId);
}
--
2.24.3 (Apple Git-128)
v2-0002-Add-the-origin-name-and-remote-commit-LSN-to-logi.patch (application/octet-stream)
From 69dd71c4bea9ad286041ff5230914880b2fa30a0 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 24 Feb 2022 16:56:58 +0900
Subject: [PATCH v2 2/3] Add the origin name and remote commit-LSN to logical
replication worker errcontext.
This commit adds both the commit-LSN and replication origin name to
the existing error context message.
This will help users specify the origin name and commit-LSN to the
pg_replication_origin_advance() SQL function to skip a particular transaction.
---
doc/src/sgml/logical-replication.sgml | 19 +++++--
src/backend/replication/logical/worker.c | 65 +++++++++++++++++-------
2 files changed, 61 insertions(+), 23 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index fb4472356d..deb9d79b47 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -352,12 +352,21 @@
<para>
The resolution can be done either by changing data or permissions on the subscriber so
that it does not conflict with the incoming change or by skipping the
- transaction that conflicts with the existing data. The transaction can be
- skipped by calling the <link linkend="pg-replication-origin-advance">
+ transaction that conflicts with the existing data. When a conflict produces
+ an error, it is shown in the subscriber's server logs as follows:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 725 committed at LSN 0/14BFA88 from replication origin "pg_16395"
+</screen>
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found from those outputs (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). The transaction
+ can be skipped by calling the <link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
- a <parameter>node_name</parameter> corresponding to the subscription name,
- and a position. The current position of origins can be seen in the
- <link linkend="view-pg-replication-origin-status">
+ the <parameter>node_name</parameter> and the next LSN of the commit LSN
+ (i.e., 0/14C0379) from those outputs. The current position of origins can be
+ seen in the <link linkend="view-pg-replication-origin-status">
<structname>pg_replication_origin_status</structname></link> system view.
</para>
</sect1>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 92aa794706..ffaa82532b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
+ XLogRecPtr commit_lsn;
+ char *origin_name;
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
+ .commit_lsn = InvalidXLogRecPtr,
+ .origin_name = NULL,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
/*
@@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
@@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
remote_final_lsn = begin_data.prepare_lsn;
@@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid);
+ set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid);
+ set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid);
+ set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
subidx = -1;
begin_replication_step();
@@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
else
{
@@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message
+ */
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
/*
@@ -3657,30 +3679,37 @@ apply_error_callback(void *arg)
errcontext("processing remote data during \"%s\"",
logicalrep_message_type(errarg->command));
else
- errcontext("processing remote data during \"%s\" in transaction %u",
+ errcontext("processing remote data during \"%s\" in transaction %u committed at LSN %X/%X from replication origin \"%s\"",
logicalrep_message_type(errarg->command),
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn),
+ errarg->origin_name);
}
else if (errarg->remote_attnum < 0)
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u committed at LSN %X/%X from replication origin \"%s\"",
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn),
+ errarg->origin_name);
else
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u committed at LSN %X/%X from replication origin \"%s\"",
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
errarg->rel->remoterel.attnames[errarg->remote_attnum],
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn),
+ errarg->origin_name);
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{
apply_error_callback_arg.remote_xid = xid;
+ apply_error_callback_arg.commit_lsn = lsn;
}
/* Reset all information of apply error callback */
@@ -3690,5 +3719,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId);
+ set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
--
2.24.3 (Apple Git-128)
On Wed, Mar 2, 2022 at 4:07 PM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:
At Wed, 2 Mar 2022 14:39:54 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in
On Wed, Mar 2, 2022 at 11:55 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Feb 28, 2022 at 5:46 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I've attached two patches: the first one changes
apply_error_callback() so that it uses complete sentences with if-else
blocks in order to make translation work,
This is an improvement over what we have now but I think this is still not completely correct as per message translation rules:
+ else
+ errcontext("processing remote data during \"%s\" in transaction %u at %s",
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
As per guidelines [1][2], we prefer not to construct messages at
run time, so we can do better for the following part:
(errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)". I think we need
to use if-else here to split it further. If you agree, then the same
needs to be dealt with in other parts of the patch as well.
I intended to use "(not-set)" as a value rather than a word in the
sentence, so I think it doesn't violate the guidelines. We can split it
further as you suggested, but we will end up having more if-else
branches.
That seems to me to be exactly our way. In the first place, I doubt
"(not-set)" fits in place of a timestamp even in English.
Moreover, if we (I?) translated the message into Japanese, it would
look like:
CONTEXT: (not-set)にトランザクション 2352314 内で "TRUNCATE" でのリモートデータの処理中
(roughly: processing remote data during "TRUNCATE" in transaction 2352314 at (not-set))
I don't think that looks fine. Translating "(not-set)" makes things
even worse.
CONTEXT: (非設定)にトランザクション 2352314 内で "TRUNCATE" でのリモートデータの処理中
(here "(非設定)" is the Japanese rendering of "(not set)")
Yes, I can alleviate that strangeness a bit by modulating it, but it
still looks odd.
Indeed. But the timestamp is removed in the latest version patch.
CONTEXT: 時刻(非設定)、トランザクション 2352314 内で "TRUNCATE" でのリモートデータの処理中
(roughly: time (not set), processing remote data during "TRUNCATE" in transaction 2352314)
Rather, I'd prefer simply to list the attributes.
CONTEXT: processing remote data during "MESSAGE". Transaction (unknown). Time (unknown), replication target relation (unknown), column (unknown)
CONTEXT: "MESSAGE"でのリモートデータの処理中. トランザクション (不明). 時刻 (不明), レプリケーション対象リレーション (不明), column (不明)
Peter Smith also seems to prefer this style. Looking at existing error
context messages, we use this list style in logical.c:
static void
output_plugin_error_callback(void *arg)
{
    LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;

    /* not all callbacks have an associated LSN */
    if (state->report_location != InvalidXLogRecPtr)
        errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
                   NameStr(state->ctx->slot->data.name),
                   NameStr(state->ctx->slot->data.plugin),
                   state->callback_name,
                   LSN_FORMAT_ARGS(state->report_location));
    else
        errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
                   NameStr(state->ctx->slot->data.name),
                   NameStr(state->ctx->slot->data.plugin),
                   state->callback_name);
}
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
On Wed, Mar 2, 2022 at 1:05 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Wed, Mar 2, 2022 at 4:14 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
I've attached updated patches.
The first patch LGTM. Some comments on the second patch:
1.
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
Can we assign this in LogicalRepSyncTableStart(), where we are already
forming the origin name? That will avoid this extra call to
ReplicationOriginNameForTablesync.
2.
@@ -3657,30 +3679,37 @@ apply_error_callback(void *arg)
errcontext("processing remote data during \"%s\"",
logicalrep_message_type(errarg->command));
else
- errcontext("processing remote data during \"%s\" in transaction %u",
+ errcontext("processing remote data during \"%s\" in transaction %u committed at LSN %X/%X from replication origin \"%s\"",
logicalrep_message_type(errarg->command),
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn),
+ errarg->origin_name);
Won't we set the origin name before the command? If so, it can be used
in the first message as well and we can change the condition in the
beginning such that if the origin or command is not set then we can
return without adding additional context information.
Isn't this error message used for rollback prepared failure as well?
If so, do we need to say "... finished at LSN ..." instead of "...
committed at LSN ..."?
The other point about this message is that saying ".... from
replication origin ..." sounds more like remote information similar to
publication but the origin is of the local node. How about slightly
changing it to "processing remote data for replication origin \"%s\"
during \"%s\" in transaction ..."?
3.
+</screen>
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found from those outputs (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). The transaction
+ can be skipped by calling the <link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
- a <parameter>node_name</parameter> corresponding to the subscription name,
- and a position. The current position of origins can be seen in the
- <link linkend="view-pg-replication-origin-status">
+ the <parameter>node_name</parameter> and the next LSN of the commit LSN
+ (i.e., 0/14C0379) from those outputs.
After node_name, can we specify origin_name similar to what we do for
LSN as that will make things more clear? Also, shall we mention that
users need to disable subscriptions to perform replication origin
advance?
I think for prepared transactions, the user might need to use it twice
because after skipping prepared xact, it will get an error again
during the processing of commit prepared (no such prepare exists). I
thought of mentioning it but felt that might lead to specifying too
many details which can confuse users as well. What do you think?
4. There are places in the patch like apply_handle_stream_start()
which set commit_lsn in the callback arg to InvalidXLogRecPtr, but the
callback function seems to assume that it is always a valid
value. Shouldn't we avoid appending the LSN in such cases?
--
With Regards,
Amit Kapila.
On Thu, Mar 3, 2022 at 3:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Wed, Mar 2, 2022 at 1:05 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Wed, Mar 2, 2022 at 4:14 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
I've attached updated patches.
The first patch LGTM. Some comments on the second patch:
1.
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
Can we assign this in LogicalRepSyncTableStart(), where we are already
forming the origin name? That will avoid this extra call to
ReplicationOriginNameForTablesync.
Yes, but that requires exposing either apply_error_callback_arg or the
tablesync worker's origin name, since the tablesync worker sets its origin
name in tablesync.c. I think it's better to avoid exposing them.
2.
@@ -3657,30 +3679,37 @@ apply_error_callback(void *arg)
errcontext("processing remote data during \"%s\"",
logicalrep_message_type(errarg->command));
else
- errcontext("processing remote data during \"%s\" in transaction %u",
+ errcontext("processing remote data during \"%s\" in transaction %u committed at LSN %X/%X from replication origin \"%s\"",
logicalrep_message_type(errarg->command),
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn),
+ errarg->origin_name);
Won't we set the origin name before the command? If so, it can be used
in the first message as well and we can change the condition in the
beginning such that if the origin or command is not set then we can
return without adding additional context information.
Good point.
Isn't this error message used for rollback prepared failure as well?
If so, do we need to say "... finished at LSN ..." instead of "...
committed at LSN ..."?
Right. Or can we just remove "committed", since the current message is
"transaction %u at %s"? That is, just replace the timestamp with the LSN.
The other point about this message is that saying ".... from
replication origin ..." sounds more like remote information similar to
publication but the origin is of the local node. How about slightly
changing it to "processing remote data for replication origin \"%s\"
during \"%s\" in transaction ..."?
Okay, so the modified message would be like:
"processing remote data for replication origin \"%s\" during \"%s\"
for replication target relation \"%s.%s\" column \"%s\" in transaction
%u finished at LSN %X/%X"
3.
+</screen>
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found from those outputs (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). The transaction
+ can be skipped by calling the <link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
- a <parameter>node_name</parameter> corresponding to the subscription name,
- and a position. The current position of origins can be seen in the
- <link linkend="view-pg-replication-origin-status">
+ the <parameter>node_name</parameter> and the next LSN of the commit LSN
+ (i.e., 0/14C0379) from those outputs.
After node_name, can we specify origin_name similar to what we do for
LSN as that will make things more clear? Also, shall we mention that
users need to disable subscriptions to perform replication origin
advance?
Agreed.
I think for prepared transactions, the user might need to use it twice
because after skipping prepared xact, it will get an error again
during the processing of commit prepared (no such prepare exists).
Good point.
I thought of mentioning it but felt that might lead to specifying too
many details which can confuse users as well. What do you think?
Given that using pg_replication_origin_advance() this way is not
normally the preferred approach, I think we might not need to mention
it. Also, although it has to be done twice for such a transaction, the
steps are the same.
4. There are places in the patch like apply_handle_stream_start()
which set commit_lsn in the callback arg to InvalidXLogRecPtr, but the
callback function seems to assume that it is always a valid
value. Shouldn't we avoid appending the LSN in such cases?
Agreed. Will fix it in the next version patch.
I'm updating the patches and will submit them.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
On Thu, Mar 3, 2022 at 10:02 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I'm updating the patches and will submit them.
Attached updated version patches.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
Attachments:
v3-0002-Add-the-origin-name-and-remote-commit-LSN-to-logi.patch (application/octet-stream)
From 524d47418b800bccc89f18b3ef2bd89ecef625b4 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 24 Feb 2022 16:56:58 +0900
Subject: [PATCH v3 2/3] Add the origin name and remote commit-LSN to logical
replication worker errcontext.
This commit adds both the commit-LSN and replication origin name to
the existing error context message.
This will help users specify the origin name and commit-LSN to the
pg_replication_origin_advance() SQL function to skip a particular transaction.
---
doc/src/sgml/logical-replication.sgml | 21 +++++--
src/backend/replication/logical/worker.c | 73 ++++++++++++++++++------
2 files changed, 70 insertions(+), 24 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index fb4472356d..ca0db358b0 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -352,12 +352,23 @@
<para>
The resolution can be done either by changing data or permissions on the subscriber so
that it does not conflict with the incoming change or by skipping the
- transaction that conflicts with the existing data. The transaction can be
- skipped by calling the <link linkend="pg-replication-origin-advance">
+ transaction that conflicts with the existing data. When a conflict produces
+ an error, it is shown in the subscriber's server logs as follows:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 725 committed at LSN 0/14BFA88 from replication origin "pg_16395"
+</screen>
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found from those outputs (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). To skip the
+ transaction, the subscription needs to be disabled temporarily by
+ <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
+ can be skipped by calling the <link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
- a <parameter>node_name</parameter> corresponding to the subscription name,
- and a position. The current position of origins can be seen in the
- <link linkend="view-pg-replication-origin-status">
+ the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
+ next LSN of the commit LSN (i.e., LSN 0/14C0379) from those outputs. The current
+ position of origins can be seen in the <link linkend="view-pg-replication-origin-status">
<structname>pg_replication_origin_status</structname></link> system view.
</para>
</sect1>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 92aa794706..b9d0336a34 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
+ XLogRecPtr commit_lsn;
+ char *origin_name;
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
+ .commit_lsn = InvalidXLogRecPtr,
+ .origin_name = NULL,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
/*
@@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
@@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
remote_final_lsn = begin_data.prepare_lsn;
@@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid);
+ set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid);
+ set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid);
+ set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
subidx = -1;
begin_replication_step();
@@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
else
{
@@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
/*
@@ -3654,33 +3676,46 @@ apply_error_callback(void *arg)
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
- errcontext("processing remote data during \"%s\"",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
+ errarg->origin_name,
logicalrep_message_type(errarg->command));
- else
- errcontext("processing remote data during \"%s\" in transaction %u",
+ else if (XLogRecPtrIsInvalid(errarg->commit_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn));
}
else if (errarg->remote_attnum < 0)
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn));
else
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
errarg->rel->remoterel.attnames[errarg->remote_attnum],
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn));
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{
apply_error_callback_arg.remote_xid = xid;
+ apply_error_callback_arg.commit_lsn = lsn;
}
/* Reset all information of apply error callback */
@@ -3690,5 +3725,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId);
+ set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
--
2.24.3 (Apple Git-128)
v3-0001-Use-complete-sentences-in-logical-replication-wor.patch
From 1b318de52a9b157238467b45f52657f86157518b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 28 Feb 2022 17:53:28 +0900
Subject: [PATCH v3 1/3] Use complete sentences in logical replication worker
errcontext.
Previously, the message for the logical replication worker errcontext was
built incrementally, which was not translation friendly. Instead, we
use complete sentences with if-else branches.
We also remove the commit timestamp from the context message since
it's not important information and it made the message long.
---
src/backend/replication/logical/worker.c | 73 +++++++++++-------------
1 file changed, 33 insertions(+), 40 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7e267f7960..92aa794706 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,7 +226,6 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
- TimestampTz ts; /* commit, rollback, or prepare timestamp */
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -235,7 +234,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
- .ts = 0,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -334,7 +332,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void set_apply_error_context_xact(TransactionId xid);
static inline void reset_apply_error_context_info(void);
/*
@@ -787,7 +785,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.committime);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.final_lsn;
@@ -839,7 +837,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.prepare_lsn;
@@ -938,7 +936,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
+ set_apply_error_context_xact(prepare_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -979,7 +977,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
+ set_apply_error_context_xact(rollback_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1044,7 +1042,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
+ set_apply_error_context_xact(prepare_data.xid);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1126,7 +1124,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid, 0);
+ set_apply_error_context_xact(stream_xid);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1215,7 +1213,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid, 0);
+ set_apply_error_context_xact(xid);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1241,7 +1239,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid, 0);
+ set_apply_error_context_xact(subxid);
subidx = -1;
begin_replication_step();
@@ -1426,7 +1424,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid, commit_data.committime);
+ set_apply_error_context_xact(xid);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3648,46 +3646,41 @@ IsLogicalWorker(void)
static void
apply_error_callback(void *arg)
{
- StringInfoData buf;
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
if (apply_error_callback_arg.command == 0)
return;
- initStringInfo(&buf);
- appendStringInfo(&buf, _("processing remote data during \"%s\""),
- logicalrep_message_type(errarg->command));
-
- /* append relation information */
- if (errarg->rel)
- {
- appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname);
- if (errarg->remote_attnum >= 0)
- appendStringInfo(&buf, _(" column \"%s\""),
- errarg->rel->remoterel.attnames[errarg->remote_attnum]);
- }
-
- /* append transaction information */
- if (TransactionIdIsNormal(errarg->remote_xid))
+ if (errarg->rel == NULL)
{
- appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
- if (errarg->ts != 0)
- appendStringInfo(&buf, _(" at %s"),
- timestamptz_to_str(errarg->ts));
+ if (!TransactionIdIsValid(errarg->remote_xid))
+ errcontext("processing remote data during \"%s\"",
+ logicalrep_message_type(errarg->command));
+ else
+ errcontext("processing remote data during \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid);
}
-
- errcontext("%s", buf.data);
- pfree(buf.data);
+ else if (errarg->remote_attnum < 0)
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid);
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+set_apply_error_context_xact(TransactionId xid)
{
apply_error_callback_arg.remote_xid = xid;
- apply_error_callback_arg.ts = ts;
}
/* Reset all information of apply error callback */
@@ -3697,5 +3690,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId, 0);
+ set_apply_error_context_xact(InvalidTransactionId);
}
--
2.24.3 (Apple Git-128)
On Friday, March 4, 2022 10:09 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Thu, Mar 3, 2022 at 10:02 PM Masahiko Sawada
<sawada.mshk@gmail.com> wrote:
I'm updating the patches and will submit them.
Attached are the updated versions of the patches.
Thank you for sharing the patch v3.
A few minor comments.
(1) v03-0001, apply_error_callback function
- /* append transaction information */
- if (TransactionIdIsNormal(errarg->remote_xid))
+ if (errarg->rel == NULL)
{
- appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
Should we write !errarg->rel instead?
(2) v03-0002, doc/src/sgml/logical-replication.sgml
+ transaction that conflicts with the existing data. When a conflict produces
+ an error, it is shown in the subscriber's server logs as follows:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 725 committed at LSN 0/14BFA88
+</screen>
We should update the CONTEXT message to match the format produced with v3-0001 applied.
(3) v03-0002, doc/src/sgml/logical-replication.sgml
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found from those outputs (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). To skip the
+ transaction, the subscription needs to be disabled temporarily by
+ <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
+ can be skipped by calling the <link linkend="pg-replication-origin-advance">
The LSN (0/14C0378) is not the same as the one in the error context above.
Please double-check the LSNs written directly in the documentation.
(4) One confirmation
We don't have a TAP test for pg_replication_origin_advance()
in v3 that uses this new log message in a logical replication setup.
This is because the existing tests for this function (in test_decoding) only cover permission checks
and argument validation, and we're only changing the error message itself.
Is this correct?
Best Regards,
Takamichi Osumi
On Fri, Mar 4, 2022 at 11:27 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
On Friday, March 4, 2022 10:09 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Thu, Mar 3, 2022 at 10:02 PM Masahiko Sawada
<sawada.mshk@gmail.com> wrote:
I'm updating the patches and will submit them.
Attached are the updated versions of the patches.
Thank you for sharing the patch v3.
A few minor comments.
(1) v03-0001, apply_error_callback function
- /* append transaction information */
- if (TransactionIdIsNormal(errarg->remote_xid))
+ if (errarg->rel == NULL)
{
- appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
Should we write !errarg->rel instead?
I think either should be fine.
(2) v03-0002, doc/src/sgml/logical-replication.sgml
+ transaction that conflicts with the existing data. When a conflict produces
+ an error, it is shown in the subscriber's server logs as follows:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 725 committed at LSN 0/14BFA88
+</screen>
We should update the CONTEXT message to match the format produced with v3-0001 applied.
(3) v03-0002, doc/src/sgml/logical-replication.sgml
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found from those outputs (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). To skip the
+ transaction, the subscription needs to be disabled temporarily by
+ <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
+ can be skipped by calling the <link linkend="pg-replication-origin-advance">
The LSN (0/14C0378) is not the same as the one in the error context above.
Please double-check the LSNs written directly in the documentation.
Right, I missed checking and updating it.
(4) One confirmation
We don't have a TAP test for pg_replication_origin_advance()
in v3 that uses this new log message in a logical replication setup.
This is because the existing tests for this function (in test_decoding) only cover permission checks
and argument validation, and we're only changing the error message itself.
Is this correct?
Yeah, I think it's a good idea to add tests in general, but I don't
think we should include tests for skipping a transaction using
pg_replication_origin_advance() in this patch: it's an existing method, and the
upcoming ALTER SUBSCRIPTION SKIP patch includes such tests, which makes it the
more appropriate place for them. But if others think this update should
include tests as well, I'm happy to add them.
I've attached updated patches.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
Attachments:
v4-0002-Add-the-origin-name-and-remote-commit-LSN-to-logi.patch
From 5d4e91232c9b3748265c45da4f328b7c75b47c7e Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 24 Feb 2022 16:56:58 +0900
Subject: [PATCH v4 2/3] Add the origin name and remote commit-LSN to logical
replication worker errcontext.
This commit adds both the commit-LSN and the replication origin name to
the existing error context message.
This will help users specify the origin name and commit-LSN to the
pg_replication_origin_advance() SQL function to skip a particular transaction.
---
doc/src/sgml/logical-replication.sgml | 21 +++++--
src/backend/replication/logical/worker.c | 73 ++++++++++++++++++------
2 files changed, 71 insertions(+), 23 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index fb4472356d..59a22b9b32 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -352,11 +352,24 @@
<para>
The resolution can be done either by changing data or permissions on the subscriber so
that it does not conflict with the incoming change or by skipping the
- transaction that conflicts with the existing data. The transaction can be
- skipped by calling the <link linkend="pg-replication-origin-advance">
+ transaction that conflicts with the existing data. When a conflict produces
+ an error, it is shown in the subscriber's server logs as follows:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 at 0/14C0378
+</screen>
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found in the error context message
+ (replication origin <literal>pg_16395</literal> and LSN <literal>0/14C0378</literal>
+ in the above case). To skip the transaction, the subscription needs to be temporarily
+ disabled by <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the
+ transaction can be skipped by calling the
+ <link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
- a <parameter>node_name</parameter> corresponding to the subscription name,
- and a position. The current position of origins can be seen in the
+ the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
+ next LSN of the transaction's LSN (i.e., LSN <literal>0/14C0379</literal>) from
+ those outputs. The current position of origins can be seen in the
<link linkend="view-pg-replication-origin-status">
<structname>pg_replication_origin_status</structname></link> system view.
</para>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 92aa794706..076734af23 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
+ XLogRecPtr commit_lsn;
+ char *origin_name;
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
+ .commit_lsn = InvalidXLogRecPtr,
+ .origin_name = NULL,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
/*
@@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
@@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
remote_final_lsn = begin_data.prepare_lsn;
@@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid);
+ set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid);
+ set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid);
+ set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
subidx = -1;
begin_replication_step();
@@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
else
{
@@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
/*
@@ -3654,33 +3676,46 @@ apply_error_callback(void *arg)
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
- errcontext("processing remote data during \"%s\"",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
+ errarg->origin_name,
logicalrep_message_type(errarg->command));
- else
- errcontext("processing remote data during \"%s\" in transaction %u",
+ else if (XLogRecPtrIsInvalid(errarg->commit_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn));
}
else if (errarg->remote_attnum < 0)
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn));
else
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
errarg->rel->remoterel.attnames[errarg->remote_attnum],
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->commit_lsn));
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{
apply_error_callback_arg.remote_xid = xid;
+ apply_error_callback_arg.commit_lsn = lsn;
}
/* Reset all information of apply error callback */
@@ -3690,5 +3725,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId);
+ set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
--
2.24.3 (Apple Git-128)
v4-0001-Use-complete-sentences-in-logical-replication-wor.patch
From 1b318de52a9b157238467b45f52657f86157518b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 28 Feb 2022 17:53:28 +0900
Subject: [PATCH v4 1/3] Use complete sentences in logical replication worker
errcontext.
Previously, the message for the logical replication worker errcontext was
built incrementally, which was not translation friendly. Instead, we
use complete sentences with if-else branches.
We also remove the commit timestamp from the context message since
it's not important information and it made the message long.
---
src/backend/replication/logical/worker.c | 73 +++++++++++-------------
1 file changed, 33 insertions(+), 40 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7e267f7960..92aa794706 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,7 +226,6 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
- TimestampTz ts; /* commit, rollback, or prepare timestamp */
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -235,7 +234,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
- .ts = 0,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -334,7 +332,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void set_apply_error_context_xact(TransactionId xid);
static inline void reset_apply_error_context_info(void);
/*
@@ -787,7 +785,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.committime);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.final_lsn;
@@ -839,7 +837,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.prepare_lsn;
@@ -938,7 +936,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
+ set_apply_error_context_xact(prepare_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -979,7 +977,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
+ set_apply_error_context_xact(rollback_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1044,7 +1042,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
+ set_apply_error_context_xact(prepare_data.xid);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1126,7 +1124,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid, 0);
+ set_apply_error_context_xact(stream_xid);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1215,7 +1213,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid, 0);
+ set_apply_error_context_xact(xid);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1241,7 +1239,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid, 0);
+ set_apply_error_context_xact(subxid);
subidx = -1;
begin_replication_step();
@@ -1426,7 +1424,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid, commit_data.committime);
+ set_apply_error_context_xact(xid);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3648,46 +3646,41 @@ IsLogicalWorker(void)
static void
apply_error_callback(void *arg)
{
- StringInfoData buf;
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
if (apply_error_callback_arg.command == 0)
return;
- initStringInfo(&buf);
- appendStringInfo(&buf, _("processing remote data during \"%s\""),
- logicalrep_message_type(errarg->command));
-
- /* append relation information */
- if (errarg->rel)
- {
- appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname);
- if (errarg->remote_attnum >= 0)
- appendStringInfo(&buf, _(" column \"%s\""),
- errarg->rel->remoterel.attnames[errarg->remote_attnum]);
- }
-
- /* append transaction information */
- if (TransactionIdIsNormal(errarg->remote_xid))
+ if (errarg->rel == NULL)
{
- appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
- if (errarg->ts != 0)
- appendStringInfo(&buf, _(" at %s"),
- timestamptz_to_str(errarg->ts));
+ if (!TransactionIdIsValid(errarg->remote_xid))
+ errcontext("processing remote data during \"%s\"",
+ logicalrep_message_type(errarg->command));
+ else
+ errcontext("processing remote data during \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid);
}
-
- errcontext("%s", buf.data);
- pfree(buf.data);
+ else if (errarg->remote_attnum < 0)
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid);
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+set_apply_error_context_xact(TransactionId xid)
{
apply_error_callback_arg.remote_xid = xid;
- apply_error_callback_arg.ts = ts;
}
/* Reset all information of apply error callback */
@@ -3697,5 +3690,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId, 0);
+ set_apply_error_context_xact(InvalidTransactionId);
}
--
2.24.3 (Apple Git-128)
On Fri, Mar 4, 2022 at 6:40 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Attached updated version patches.
The patch looks mostly good to me. Few minor comments:
1. I think we can have an Assert for errarg->origin_name in
apply_error_callback after checking the command as this function
assumes that it will always be set.
2. I suggest minor changes in the documentation change:
When a conflict produces an error, the replication won't proceed, and
the apply worker will emit the following kind of message to the
subscriber's server log:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data during "INSERT" for replication
target relation "public.test" in transaction 725 committed at LSN
0/14BFA88 from replication origin "pg_16395"
+</screen>
The LSN of the transaction that contains the change violating the
constraint and the replication origin name can be found from the
server log (LSN 0/14C0378 and replication origin
<literal>pg_16395</literal> in the above case). To skip the
transaction, the subscription needs to be disabled temporarily by
<command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the
transaction can be skipped by calling the <link
linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function
with the <parameter>node_name</parameter> (i.e.,
<literal>pg_16395</literal>) and the next LSN of the commit LSN (i.e.,
LSN 0/14C0379).
--
With Regards,
Amit Kapila.
On Fri, Mar 4, 2022 at 10:53 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Fri, Mar 4, 2022 at 11:27 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
(4) one confirmation
We don't have a TAP test of pg_replication_origin_advance()
for v3, that utilizes this new log in a logical replication setup.
This is because existing tests for this function (in test_decoding) are only for permission checks
and argument validation, and we're just changing the error message itself.
Is this correct?
Yeah, I think it's a good idea to add tests in general, but I don't
think we should include the tests for skipping a transaction by using
pg_replication_origin_advance() in this patch, because it's an existing way and the
upcoming ALTER SUBSCRIPTION SKIP patch includes the tests and it's a
more appropriate way. But if others also think it should do that too
along with this update, I'm happy to add tests.
I also don't see a reason why this patch should add any tests related
to origin.
--
With Regards,
Amit Kapila.
On Friday, March 4, 2022 2:23 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I've attached updated patches.
Hi, thank you for updating the patch.
One comment on v4.
In v4-0002, we introduce 'commit_lsn' in the ApplyErrorCallbackArg.
This member is set for prepare, rollback prepared and stream_abort as well.
The new log message format is useful when we have a prepare transaction
that keeps failing on the subscriber and want to know the target transaction
for the pg_replication_origin_advance(), right?
If this is true, I wasn't sure if the name 'commit_lsn' is the
most accurate name for this variable. Should we adjust the name a bit?
Even when we decide to continue to use 'commit_lsn',
it might be better to add some comments near the definition, I feel.
Best Regards,
Takamichi Osumi
On Fri, Mar 4, 2022 at 11:45 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
On Friday, March 4, 2022 2:23 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I've attached updated patches.
Hi, thank you for updating the patch.
One comment on v4.
In v4-0002, we introduce 'commit_lsn' in the ApplyErrorCallbackArg.
This member is set for prepare, rollback prepared and stream_abort as well.
The new log message format is useful when we have a prepare transaction
that keeps failing on the subscriber and want to know the target transaction
for the pg_replication_origin_advance(), right?
If this is true, I wasn't sure if the name 'commit_lsn' is the
most accurate name for this variable. Should we adjust the name a bit?
If we want to change this variable name, the other options could be
'end_lsn', or 'finish_lsn'.
--
With Regards,
Amit Kapila.
On Fri, Mar 4, 2022 at 2:55 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Mar 4, 2022 at 6:40 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Attached updated version patches.
The patch looks mostly good to me. Few minor comments:
Thank you for the comments!
1. I think we can have an Assert for errarg->origin_name in
apply_error_callback after checking the command as this function
assumes that it will always be set.
Added.
2. I suggest minor changes in the documentation change:
When a conflict produces an error, the replication won't proceed, and
the apply worker will emit the following kind of message to the
subscriber's server log:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data during "INSERT" for replication
target relation "public.test" in transaction 725 committed at LSN
0/14BFA88 from replication origin "pg_16395"
+</screen>
The LSN of the transaction that contains the change violating the
constraint and the replication origin name can be found from the
server log (LSN 0/14C0378 and replication origin
<literal>pg_16395</literal> in the above case). To skip the
transaction, the subscription needs to be disabled temporarily by
<command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the
transaction can be skipped by calling the <link
linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function
with the <parameter>node_name</parameter> (i.e.,
<literal>pg_16395</literal>) and the next LSN of the commit LSN (i.e.,
LSN 0/14C0379).
Fixed.
On Fri, Mar 4, 2022 at 3:21 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Mar 4, 2022 at 11:45 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
On Friday, March 4, 2022 2:23 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I've attached updated patches.
Hi, thank you for updating the patch.
One comment on v4.
In v4-0002, we introduce 'commit_lsn' in the ApplyErrorCallbackArg.
This member is set for prepare, rollback prepared and stream_abort as well.
The new log message format is useful when we have a prepare transaction
that keeps failing on the subscriber and want to know the target transaction
for the pg_replication_origin_advance(), right?
If this is true, I wasn't sure if the name 'commit_lsn' is the
most accurate name for this variable. Should we adjust the name a bit?
If we want to change this variable name, the other options could be
'end_lsn', or 'finish_lsn'.
Agreed with 'finish_lsn'.
I've attached updated patches. Please review them.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
Attachments:
v4-0002-Add-the-origin-name-and-remote-commit-LSN-to-logi.patch (application/octet-stream)
From 5213cada035fcb0c2e2433720df5bacd586829ae Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 24 Feb 2022 16:56:58 +0900
Subject: [PATCH v4 2/3] Add the origin name and remote commit-LSN to logical
replication worker errcontext.
This commit adds both the commit-LSN and replication origin name to
the existing error context message.
This will help users pass the origin name and commit-LSN to the
pg_replication_origin_advance() SQL function to skip the particular transaction.
---
doc/src/sgml/logical-replication.sgml | 22 +++++--
src/backend/replication/logical/worker.c | 75 ++++++++++++++++++------
2 files changed, 74 insertions(+), 23 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index fb4472356d..58aaaa3c6b 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -352,11 +352,25 @@
<para>
The resolution can be done either by changing data or permissions on the subscriber so
that it does not conflict with the incoming change or by skipping the
- transaction that conflicts with the existing data. The transaction can be
- skipped by calling the <link linkend="pg-replication-origin-advance">
+ transaction that conflicts with the existing data. When a conflict produces
+ an error, the replication won't proceed, and the logical replication worker will
+ emit the following kind of message to the subscriber's server log:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378
+</screen>
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found from the server log (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). To skip the
+ transaction, the subscription needs to be disabled temporarily by
+ <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
+ can be skipped by calling the
+ <link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
- a <parameter>node_name</parameter> corresponding to the subscription name,
- and a position. The current position of origins can be seen in the
+ the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
+ next LSN of the transaction's LSN (i.e., LSN 0/14C0379). The current position
+ of origins can be seen in the
<link linkend="view-pg-replication-origin-status">
<structname>pg_replication_origin_status</structname></link> system view.
</para>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 92aa794706..8653e1d840 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
+ XLogRecPtr finish_lsn;
+ char *origin_name;
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
+ .finish_lsn = InvalidXLogRecPtr,
+ .origin_name = NULL,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
/*
@@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
@@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
remote_final_lsn = begin_data.prepare_lsn;
@@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid);
+ set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid);
+ set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid);
+ set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
subidx = -1;
begin_replication_step();
@@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
else
{
@@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
/*
@@ -3651,36 +3673,51 @@ apply_error_callback(void *arg)
if (apply_error_callback_arg.command == 0)
return;
+ Assert(errarg->origin_name);
+
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
- errcontext("processing remote data during \"%s\"",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
+ errarg->origin_name,
logicalrep_message_type(errarg->command));
- else
- errcontext("processing remote data during \"%s\" in transaction %u",
+ else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
}
else if (errarg->remote_attnum < 0)
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
else
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
errarg->rel->remoterel.attnames[errarg->remote_attnum],
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{
apply_error_callback_arg.remote_xid = xid;
+ apply_error_callback_arg.finish_lsn = lsn;
}
/* Reset all information of apply error callback */
@@ -3690,5 +3727,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId);
+ set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
--
2.24.3 (Apple Git-128)
v4-0001-Use-complete-sentences-in-logical-replication-wor.patch (application/octet-stream)
On Fri, Mar 4, 2022, at 2:54 AM, Amit Kapila wrote:
The LSN of the transaction that contains the change violating the
constraint and the replication origin name can be found from the
server log (LSN 0/14C0378 and replication origin
<literal>pg_16395</literal> in the above case). To skip the
transaction, the subscription needs to be disabled temporarily by
<command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the
transaction can be skipped by calling the <link
linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function
with the <parameter>node_name</parameter> (i.e.,
<literal>pg_16395</literal>) and the next LSN of the commit LSN (i.e.,
LSN 0/14C0379).
You could also add:
After that the replication can be resumed by <command>ALTER SUBSCRIPTION ...
ENABLE</command>.
Let's provide complete instructions.
--
Euler Taveira
EDB https://www.enterprisedb.com/
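The skip procedure discussed above comes down to one piece of arithmetic: pg_replication_origin_advance() is given a position one byte past the finish LSN reported in the CONTEXT line. The C sketch below illustrates that step under stated assumptions. It parses an LSN in the "%X/%X" text form used by the errcontext message, adds one, and builds the SQL call. The helper names (parse_lsn, format_lsn, build_skip_call) and the lsn_t typedef are hypothetical and not PostgreSQL APIs. Disabling and re-enabling the subscription is left to the operator.

```c
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for PostgreSQL's XLogRecPtr: a 64-bit WAL position. */
typedef uint64_t lsn_t;

/*
 * Parse an LSN written as "%X/%X" (high 32 bits, slash, low 32 bits), the
 * form used in the errcontext message.  Returns 0 on success, -1 otherwise.
 */
static int
parse_lsn(const char *str, lsn_t *out)
{
	unsigned int hi, lo;
	char		extra;

	/* Exactly two conversions; a third one would mean trailing garbage. */
	if (sscanf(str, "%X/%X%c", &hi, &lo, &extra) != 2)
		return -1;
	*out = ((lsn_t) hi << 32) | lo;
	return 0;
}

/* Print an LSN as "%X/%X", splitting it the way LSN_FORMAT_ARGS() does. */
static void
format_lsn(lsn_t lsn, char *buf, size_t len)
{
	snprintf(buf, len, "%X/%X", (unsigned int) (lsn >> 32), (unsigned int) lsn);
}

/*
 * Build the pg_replication_origin_advance() call that skips the failed
 * transaction: the target position is one byte past its finish LSN.
 * Origin names containing a single quote are rejected rather than escaped.
 */
static int
build_skip_call(const char *origin, const char *finish_lsn,
				char *buf, size_t len)
{
	lsn_t		lsn;
	char		next[32];

	if (strchr(origin, '\'') != NULL || parse_lsn(finish_lsn, &lsn) != 0)
		return -1;
	format_lsn(lsn + 1, next, sizeof(next));
	snprintf(buf, len, "SELECT pg_replication_origin_advance('%s', '%s');",
			 origin, next);
	return 0;
}
```

With the values from the example, build_skip_call("pg_16395", "0/14C0378", ...) produces SELECT pg_replication_origin_advance('pg_16395', '0/14C0379');, which would run after ALTER SUBSCRIPTION ... DISABLE and before ALTER SUBSCRIPTION ... ENABLE. Converting to a 64-bit value, rather than editing the text, handles the carry into the high half (0/FFFFFFFF becomes 1/0).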
On Fri, Mar 4, 2022 at 6:02 PM Euler Taveira <euler@eulerto.com> wrote:
On Fri, Mar 4, 2022, at 2:54 AM, Amit Kapila wrote:
The LSN of the transaction that contains the change violating the
constraint and the replication origin name can be found from the
server log (LSN 0/14C0378 and replication origin
<literal>pg_16395</literal> in the above case). To skip the
transaction, the subscription needs to be disabled temporarily by
<command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the
transaction can be skipped by calling the <link
linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function
with the <parameter>node_name</parameter> (i.e.,
<literal>pg_16395</literal>) and the next LSN of the commit LSN (i.e.,
LSN 0/14C0379).
You could also add:
After that the replication can be resumed by <command>ALTER SUBSCRIPTION ...
ENABLE</command>.
Let's provide complete instructions.
+1. That sounds reasonable to me.
--
With Regards,
Amit Kapila.
On Fri, Mar 4, 2022 at 9:32 PM Euler Taveira <euler@eulerto.com> wrote:
On Fri, Mar 4, 2022, at 2:54 AM, Amit Kapila wrote:
The LSN of the transaction that contains the change violating the
constraint and the replication origin name can be found from the
server log (LSN 0/14C0378 and replication origin
<literal>pg_16395</literal> in the above case). To skip the
transaction, the subscription needs to be disabled temporarily by
<command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the
transaction can be skipped by calling the <link
linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function
with the <parameter>node_name</parameter> (i.e.,
<literal>pg_16395</literal>) and the next LSN of the commit LSN (i.e.,
LSN 0/14C0379).
You could also add:
After that the replication can be resumed by <command>ALTER SUBSCRIPTION ...
ENABLE</command>.
Let's provide complete instructions.
Thank you for the comment. +1.
I've attached updated patches.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
Attachments:
v5-0002-Add-the-origin-name-and-remote-commit-LSN-to-logi.patch (application/octet-stream)
From d95d0672622367076c0bbb1db686a1f0aa2e01c5 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 24 Feb 2022 16:56:58 +0900
Subject: [PATCH v5 2/3] Add the origin name and remote commit-LSN to logical
replication worker errcontext.
This commit adds both the commit-LSN and the replication origin name to
the existing error context message.
This will help users specify the origin name and commit-LSN to the
pg_replication_origin_advance() SQL function to skip a particular transaction.
---
doc/src/sgml/logical-replication.sgml | 23 ++++++--
src/backend/replication/logical/worker.c | 75 ++++++++++++++++++------
2 files changed, 75 insertions(+), 23 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index fb4472356d..4c89b71397 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -352,11 +352,26 @@
<para>
The resolution can be done either by changing data or permissions on the subscriber so
that it does not conflict with the incoming change or by skipping the
- transaction that conflicts with the existing data. The transaction can be
- skipped by calling the <link linkend="pg-replication-origin-advance">
+ transaction that conflicts with the existing data. When a conflict produces
+ an error, the replication won't proceed, and the logical replication worker will
+ emit the following kind of message to the subscriber's server log:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378
+</screen>
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found from the server log (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). To skip the
+ the subscription needs to be disabled temporarily by
+ <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
+ can be skipped by calling the
+ <link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
- a <parameter>node_name</parameter> corresponding to the subscription name,
- and a position. The current position of origins can be seen in the
+ the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
+ next LSN of the transaction's LSN (i.e., LSN 0/14C0379). After that the replication
+ can be resumed by <command>ALTER SUBSCRIPTION ... ENABLE</command>. The current
+ position of origins can be seen in the
<link linkend="view-pg-replication-origin-status">
<structname>pg_replication_origin_status</structname></link> system view.
</para>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 92aa794706..8653e1d840 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
+ XLogRecPtr finish_lsn;
+ char *origin_name;
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
+ .finish_lsn = InvalidXLogRecPtr,
+ .origin_name = NULL,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
/*
@@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
@@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
remote_final_lsn = begin_data.prepare_lsn;
@@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid);
+ set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid);
+ set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid);
+ set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
subidx = -1;
begin_replication_step();
@@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
else
{
@@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
/*
@@ -3651,36 +3673,51 @@ apply_error_callback(void *arg)
if (apply_error_callback_arg.command == 0)
return;
+ Assert(errarg->origin_name);
+
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
- errcontext("processing remote data during \"%s\"",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
+ errarg->origin_name,
logicalrep_message_type(errarg->command));
- else
- errcontext("processing remote data during \"%s\" in transaction %u",
+ else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
}
else if (errarg->remote_attnum < 0)
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
else
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
errarg->rel->remoterel.attnames[errarg->remote_attnum],
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{
apply_error_callback_arg.remote_xid = xid;
+ apply_error_callback_arg.finish_lsn = lsn;
}
/* Reset all information of apply error callback */
@@ -3690,5 +3727,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId);
+ set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
--
2.24.3 (Apple Git-128)
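For readers following the worker.c changes in this version, here is a standalone sketch of the branch structure that apply_error_callback() ends up with. There is one complete format string per case, selected by whether there is a target relation, a target column, a remote XID, and a finish LSN. This is a simplification, not the worker.c code. PostgreSQL types are replaced by plain typedefs, errcontext() by snprintf(), and the ErrCtx struct and format_context() function are hypothetical names. As in the patch, a finish LSN of InvalidXLogRecPtr (passed for streamed chunks) drops the "finished at" clause only in the branches without a target relation.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Plain stand-ins for PostgreSQL types (simplified assumptions). */
typedef unsigned int TransactionId;
typedef uint64_t XLogRecPtr;
#define InvalidTransactionId ((TransactionId) 0)
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical, trimmed-down analogue of ApplyErrorCallbackArg. */
typedef struct ErrCtx
{
	const char *command;		/* message being applied; NULL when idle */
	const char *nspname;		/* target relation, or NULL if none */
	const char *relname;
	const char *attname;		/* target column, or NULL if none */
	TransactionId remote_xid;
	XLogRecPtr	finish_lsn;		/* InvalidXLogRecPtr when not known */
	const char *origin_name;
} ErrCtx;

/*
 * Choose one complete sentence per case instead of appending fragments,
 * so that each variant can be translated as a whole.
 */
static void
format_context(const ErrCtx *c, char *buf, size_t len)
{
	/* Split the LSN into high/low halves for "%X/%X", like LSN_FORMAT_ARGS. */
	unsigned int hi = (unsigned int) (c->finish_lsn >> 32);
	unsigned int lo = (unsigned int) c->finish_lsn;

	buf[0] = '\0';
	if (c->command == NULL)
		return;					/* nothing is being applied */
	assert(c->origin_name != NULL);	/* mirrors the patch's Assert() */

	if (c->nspname == NULL)
	{
		if (c->remote_xid == InvalidTransactionId)
			snprintf(buf, len,
					 "processing remote data for replication origin \"%s\" during \"%s\"",
					 c->origin_name, c->command);
		else if (c->finish_lsn == InvalidXLogRecPtr)
			snprintf(buf, len,
					 "processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
					 c->origin_name, c->command, c->remote_xid);
		else
			snprintf(buf, len,
					 "processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
					 c->origin_name, c->command, c->remote_xid, hi, lo);
	}
	else if (c->attname == NULL)
		snprintf(buf, len,
				 "processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
				 c->origin_name, c->command, c->nspname, c->relname,
				 c->remote_xid, hi, lo);
	else
		snprintf(buf, len,
				 "processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
				 c->origin_name, c->command, c->nspname, c->relname,
				 c->attname, c->remote_xid, hi, lo);
}
```

With command "INSERT", relation public.test, XID 725, finish LSN 0x14C0378 and origin "pg_16395", format_context() yields the same CONTEXT text as the documentation example in this patch.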
v5-0001-Use-complete-sentences-in-logical-replication-wor.patch (application/octet-stream)
From 1b318de52a9b157238467b45f52657f86157518b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 28 Feb 2022 17:53:28 +0900
Subject: [PATCH v5 1/3] Use complete sentences in logical replication worker
errcontext.
Previously, the message for the logical replication worker errcontext was
built incrementally, which was not translation-friendly. Instead, we now
use complete sentences with if-else branches.
We also remove the commit timestamp from the context message since it
is not important information and it made the message long.
---
src/backend/replication/logical/worker.c | 73 +++++++++++-------------
1 file changed, 33 insertions(+), 40 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7e267f7960..92aa794706 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,7 +226,6 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
- TimestampTz ts; /* commit, rollback, or prepare timestamp */
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -235,7 +234,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
- .ts = 0,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -334,7 +332,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void set_apply_error_context_xact(TransactionId xid);
static inline void reset_apply_error_context_info(void);
/*
@@ -787,7 +785,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.committime);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.final_lsn;
@@ -839,7 +837,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.prepare_lsn;
@@ -938,7 +936,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
+ set_apply_error_context_xact(prepare_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -979,7 +977,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
+ set_apply_error_context_xact(rollback_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1044,7 +1042,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
+ set_apply_error_context_xact(prepare_data.xid);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1126,7 +1124,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid, 0);
+ set_apply_error_context_xact(stream_xid);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1215,7 +1213,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid, 0);
+ set_apply_error_context_xact(xid);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1241,7 +1239,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid, 0);
+ set_apply_error_context_xact(subxid);
subidx = -1;
begin_replication_step();
@@ -1426,7 +1424,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid, commit_data.committime);
+ set_apply_error_context_xact(xid);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3648,46 +3646,41 @@ IsLogicalWorker(void)
static void
apply_error_callback(void *arg)
{
- StringInfoData buf;
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
if (apply_error_callback_arg.command == 0)
return;
- initStringInfo(&buf);
- appendStringInfo(&buf, _("processing remote data during \"%s\""),
- logicalrep_message_type(errarg->command));
-
- /* append relation information */
- if (errarg->rel)
- {
- appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname);
- if (errarg->remote_attnum >= 0)
- appendStringInfo(&buf, _(" column \"%s\""),
- errarg->rel->remoterel.attnames[errarg->remote_attnum]);
- }
-
- /* append transaction information */
- if (TransactionIdIsNormal(errarg->remote_xid))
+ if (errarg->rel == NULL)
{
- appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
- if (errarg->ts != 0)
- appendStringInfo(&buf, _(" at %s"),
- timestamptz_to_str(errarg->ts));
+ if (!TransactionIdIsValid(errarg->remote_xid))
+ errcontext("processing remote data during \"%s\"",
+ logicalrep_message_type(errarg->command));
+ else
+ errcontext("processing remote data during \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid);
}
-
- errcontext("%s", buf.data);
- pfree(buf.data);
+ else if (errarg->remote_attnum < 0)
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid);
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+set_apply_error_context_xact(TransactionId xid)
{
apply_error_callback_arg.remote_xid = xid;
- apply_error_callback_arg.ts = ts;
}
/* Reset all information of apply error callback */
@@ -3697,5 +3690,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId, 0);
+ set_apply_error_context_xact(InvalidTransactionId);
}
--
2.24.3 (Apple Git-128)
On Mon, Mar 7, 2022 at 6:36 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Thank you for the comment. +1.
I've attached updated patches.
Pushed the first patch. Fixed one typo in the second patch and
slightly changed the commit message, otherwise, it looks good to me.
I'll push this tomorrow unless there are more comments.
--
With Regards,
Amit Kapila.
Attachments:
v6-0001-Add-the-additional-information-to-the-logical-rep.patch (application/octet-stream)
From 4bf2cabbf3a3b008ad33d69ac0ca95d7f49ebfec Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Mon, 7 Mar 2022 09:40:31 +0530
Subject: [PATCH v6] Add the additional information to the logical replication
worker errcontext.
This commit adds both the finish LSN (commit_lsn if the transaction was
committed, prepare_lsn in the case of a prepared transaction, etc.) and the
replication origin name to the existing error context message.
This will help users specify the origin name and transaction finish
LSN to the pg_replication_origin_advance() SQL function to skip a particular
transaction.
Author: Masahiko Sawada
Reviewed-by: Takamichi Osumi, Euler Taveira, and Amit Kapila
Discussion: https://postgr.es/m/CAD21AoBarBf2oTF71ig2g_o=3Z_Dt6_sOpMQma1kFgbnA5OZ_w@mail.gmail.com
---
doc/src/sgml/logical-replication.sgml | 23 ++++++--
src/backend/replication/logical/worker.c | 75 ++++++++++++++++++------
2 files changed, 75 insertions(+), 23 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index fb4472356d..82326c3901 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -352,11 +352,26 @@
<para>
The resolution can be done either by changing data or permissions on the subscriber so
that it does not conflict with the incoming change or by skipping the
- transaction that conflicts with the existing data. The transaction can be
- skipped by calling the <link linkend="pg-replication-origin-advance">
+ transaction that conflicts with the existing data. When a conflict produces
+ an error, the replication won't proceed, and the logical replication worker will
+ emit the following kind of message to the subscriber's server log:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378
+</screen>
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found from the server log (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). To skip the
+ transaction, the subscription needs to be disabled temporarily by
+ <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
+ can be skipped by calling the
+ <link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
- a <parameter>node_name</parameter> corresponding to the subscription name,
- and a position. The current position of origins can be seen in the
+ the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
+ next LSN of the transaction's LSN (i.e., LSN 0/14C0379). After that the replication
+ can be resumed by <command>ALTER SUBSCRIPTION ... ENABLE</command>. The current
+ position of origins can be seen in the
<link linkend="view-pg-replication-origin-status">
<structname>pg_replication_origin_status</structname></link> system view.
</para>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 92aa794706..8653e1d840 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
+ XLogRecPtr finish_lsn;
+ char *origin_name;
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
+ .finish_lsn = InvalidXLogRecPtr,
+ .origin_name = NULL,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
/*
@@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
@@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
remote_final_lsn = begin_data.prepare_lsn;
@@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid);
+ set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid);
+ set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid);
+ set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
subidx = -1;
begin_replication_step();
@@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
else
{
@@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
/*
@@ -3651,36 +3673,51 @@ apply_error_callback(void *arg)
if (apply_error_callback_arg.command == 0)
return;
+ Assert(errarg->origin_name);
+
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
- errcontext("processing remote data during \"%s\"",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
+ errarg->origin_name,
logicalrep_message_type(errarg->command));
- else
- errcontext("processing remote data during \"%s\" in transaction %u",
+ else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
}
else if (errarg->remote_attnum < 0)
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
else
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
errarg->rel->remoterel.attnames[errarg->remote_attnum],
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{
apply_error_callback_arg.remote_xid = xid;
+ apply_error_callback_arg.finish_lsn = lsn;
}
/* Reset all information of apply error callback */
@@ -3690,5 +3727,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId);
+ set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
--
2.28.0.windows.1
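Because the committed message places the origin name and the finish LSN after fixed phrases, a monitoring tool could extract both from the server log. The C sketch below assumes the untranslated English wording shown in the documentation example (for instance, with lc_messages set to C) and an origin name with no double quote. extract_skip_info() is a hypothetical helper, not part of PostgreSQL. When the CONTEXT line has no "finished at" clause, as for a streamed chunk, it reports failure because there is no LSN to advance past.

```c
#include <string.h>

/*
 * Extract the replication origin name and the "finished at" LSN from an
 * untranslated CONTEXT line.  Returns 0 on success, -1 if either piece is
 * missing or does not fit in the caller's buffer.
 */
static int
extract_skip_info(const char *context, char *origin, size_t origin_len,
				  char *lsn, size_t lsn_len)
{
	const char *o_key = "replication origin \"";
	const char *l_key = " finished at ";
	const char *p;
	const char *end;
	size_t		n;

	/* Origin name: the text between the opening and closing quotes. */
	p = strstr(context, o_key);
	if (p == NULL)
		return -1;
	p += strlen(o_key);
	end = strchr(p, '"');
	if (end == NULL || (size_t) (end - p) >= origin_len)
		return -1;
	memcpy(origin, p, (size_t) (end - p));
	origin[end - p] = '\0';

	/* Finish LSN: uppercase hex digits and the slash after " finished at ". */
	p = strstr(end, l_key);
	if (p == NULL)
		return -1;
	p += strlen(l_key);
	n = strspn(p, "0123456789ABCDEF/");
	if (n == 0 || n >= lsn_len)
		return -1;
	memcpy(lsn, p, n);
	lsn[n] = '\0';
	return 0;
}
```

The extracted LSN still has to be incremented by one before being passed to pg_replication_origin_advance(). Matching on fixed English phrases is fragile, so this approach suits ad hoc tooling rather than anything that must survive message rewording or translation.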
On Mon, Mar 7, 2022 at 10:06 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Mar 7, 2022 at 6:36 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Thank you for the comment. +1.
I've attached updated patches.
Pushed the first patch. Fixed one typo in the second patch and
slightly changed the commit message, otherwise, it looks good to me.
I'll push this tomorrow unless there are more comments.
Pushed.
--
With Regards,
Amit Kapila.
On Tue, Mar 8, 2022 at 7:56 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Mar 7, 2022 at 10:06 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Mar 7, 2022 at 6:36 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Thank you for the comment. +1.
I've attached updated patches.
Pushed the first patch. Fixed one typo in the second patch and
slightly changed the commit message, otherwise, it looks good to me.
I'll push this tomorrow unless there are more comments.
Pushed.
Thank you!
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/