User defined data types in Logical Replication
Hi,
We are getting the error below while trying to use Logical Replication
with user-defined data types in a C program (when the elog function is called).
ERROR: XX000: cache lookup failed for type XXXXX
# XXXXX is the remote type's OID
It occurs in worker.c:slot_store_error_callback when remotetypoid does not exist in the local pg_type.
I have tried to write a patch (attached).
I think it is not user-friendly to replace the type name with the OID,
but I could not find an easy way to get the type name from the OID on the remote host.
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
Attachments:
logicalrep_typmap.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index eedc3a8..11a5883 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -289,11 +289,11 @@ slot_store_error_callback(void *arg)
remotetypoid = errarg->rel->atttyps[errarg->attnum];
localtypoid = logicalrep_typmap_getid(remotetypoid);
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
+ "remote type's OID %u, local type's OID %u",
errarg->rel->nspname, errarg->rel->relname,
errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
- format_type_be(localtypoid));
+ remotetypoid,
+ localtypoid);
}
/*
Hi,
We are getting the below error while trying use Logical Replication with
user defined data types in a C program (when call elog function).
ERROR: XX000: cache lookup failed for type XXXXX
Sorry for raising this topic again, but am I missing something here?
I mean that if a type's OID on the PUBLICATION host does not exist in the SUBSCRIPTION host's pg_type,
an unintended error (the XX000 above) can be raised when elog or ereport is executed.
In more detail, it happens in slot_store_error_callback when it calls format_type_be(localtypoid) for errcontext.
slot_store_error_callback is set in the slot_store_cstrings and slot_modify_cstrings functions and is also unset there, so the impact is small, but it does happen.
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
On Wed, Nov 15, 2017 at 7:55 PM, Huong Dangminh
<huo-dangminh@ys.jp.nec.com> wrote:
Hi,
We are getting the below error while trying use Logical Replication with
user defined data types in a C program (when call elog function).
ERROR: XX000: cache lookup failed for type XXXXX
Sorry for continuously disturbing in this topic, but am I missing something here?
No, but I'd suggest providing a procedure for reproducing it if
possible, which would be helpful for investigation.
I mean that in case of type's OID in PUBLICATION host does not exists in SUBSCRIPTION host's pg_type,
it could returns unintended error (the XX000 above) when elog or ereport is executed.
For more details, it happen in slot_store_error_callback when it try to call format_type_be(localtypoid) for errcontext.
slot_store_error_callback is set in slot_store_cstrings, slot_modify_cstrings function and it also be unset here, so the effect here is small but it happens.
I think I found the cause of this issue, and it is a bug. It
can be reproduced, for example, if the input function of the data type
calls elog() while changes are being applied in an environment where the
OIDs of the data type on the publisher and subscriber differ. The cause
is that we call format_type_be() with remotetypoid. If the OIDs
of the data type on the publisher and subscriber differ, we look it up
in the syscache by an OID that doesn't exist on the subscriber.
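To illustrate the failure mode described above, here is a minimal sketch in plain C. It is not PostgreSQL code: the ToyType struct, the toy_* functions and the OID values 16384 and 16500 are all made up for illustration, and returning NULL stands in for the "cache lookup failed" error.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for one row of the subscriber's pg_type. */
typedef struct ToyType
{
    unsigned int oid;
    const char *name;
} ToyType;

/*
 * Assumption: the publisher created "mytype" with OID 16384, while the
 * subscriber created the same type later and got OID 16500.
 */
static const ToyType subscriber_types[] = {
    {23, "integer"},
    {16500, "mytype"},
};

#define TOY_NTYPES (sizeof(subscriber_types) / sizeof(subscriber_types[0]))

/*
 * Toy analogue of a by-OID catalog lookup. Returns NULL where the real
 * code would raise "cache lookup failed for type %u".
 */
static const char *
toy_lookup_type_name(unsigned int oid)
{
    for (size_t i = 0; i < TOY_NTYPES; i++)
    {
        if (subscriber_types[i].oid == oid)
            return subscriber_types[i].name;
    }
    return NULL;
}

/* Lookup by name still works across servers; 0 plays the role of InvalidOid. */
static unsigned int
toy_lookup_type_oid(const char *name)
{
    assert(name != NULL);
    for (size_t i = 0; i < TOY_NTYPES; i++)
    {
        if (strcmp(subscriber_types[i].name, name) == 0)
            return subscriber_types[i].oid;
    }
    return 0;
}
```

Looking up the publisher's OID 16384 finds nothing on this toy subscriber, while looking up the name "mytype" succeeds.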
As for the details of your patch, I don't think this is the right
direction. Since the subscriber already has a LogicalRepTyp cache entry
for the type, we can report the error message using the data type name.
So I think this issue can be fixed by using the remote type name obtained
from the cache.
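As a rough sketch of that idea (plain C, not the actual patch): the map below is keyed by the remote OID and carries the names the publisher sent, so producing the name needs no local catalog lookup. ToyRemoteType and toy_typmap_gettypname are hypothetical names, loosely modelled on the LogicalRepTyp entry mentioned above.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified analogue of a LogicalRepTyp cache entry. */
typedef struct ToyRemoteType
{
    unsigned int remoteid;      /* type OID on the publisher */
    const char *nspname;        /* schema name sent by the publisher */
    const char *typname;        /* type name sent by the publisher */
} ToyRemoteType;

/*
 * Return the remote type's name from the map, or NULL if the publisher
 * never sent an entry for this OID. The OID is used only as a map key.
 */
static const char *
toy_typmap_gettypname(const ToyRemoteType *map, size_t nentries,
                      unsigned int remoteid)
{
    assert(map != NULL);
    for (size_t i = 0; i < nentries; i++)
    {
        if (map[i].remoteid == remoteid)
            return map[i].typname;
    }
    return NULL;
}
```

The caller can then print the returned string directly in the error context, whatever OID the type has locally.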
Also, I'm confused about the errcontext message. Currently we store
the local data type OID corresponding to the remote data type name
in the cache, and then we look up the local data type name by the
local data type OID stored in the cache. This means that the
local data type OID and the remote data type OID always refer to the same
data type. We use both data type OIDs for the log message in
slot_store_error_callback, but I think what the function wants to do is
to show different type names when the table definitions on the two
servers differ (e.g. sending jsonb column data to a text column).
I think we should use the type of the local relation attribute
rather than the remote one.
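A small sketch of the message this would produce, assuming both type names are already available as strings (the remote one from the type map, the local one from the local column definition). toy_format_context is a hypothetical helper written in plain C, not part of the patch; the message wording follows the existing errcontext text.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Build the context line from names only. Returns what snprintf returns:
 * the length the full message needs, excluding the terminating NUL.
 */
static int
toy_format_context(char *buf, size_t buflen,
                   const char *nspname, const char *relname,
                   const char *attname,
                   const char *remotetypname, const char *localtypname)
{
    assert(buf != NULL && buflen > 0);
    return snprintf(buf, buflen,
                    "processing remote data for replication target relation "
                    "\"%s.%s\" column \"%s\", remote type %s, local type %s",
                    nspname, relname, attname, remotetypname, localtypname);
}
```

With a jsonb column on the publisher and a text column on the subscriber, the two names in the message differ, which is the case the message is meant to expose.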
The attached draft patch fixes this issue, at least in my environment.
Please review it.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
fix_slot_store_error_callback.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 4b2d8a1..f21ba5c 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -442,8 +442,8 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
/*
* Fetch type info from the cache.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
@@ -456,7 +456,7 @@ logicalrep_typmap_getid(Oid remoteid)
ereport(ERROR,
(errmsg("built-in type %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
@@ -472,7 +472,7 @@ logicalrep_typmap_getid(Oid remoteid)
/* Found and mapped, return the oid. */
if (OidIsValid(entry->typoid))
- return entry->typoid;
+ return entry->typname;
/* Otherwise, try to map to local type. */
nspoid = LookupExplicitNamespace(entry->nspname, true);
@@ -489,5 +489,5 @@ logicalrep_typmap_getid(Oid remoteid)
errmsg("data type \"%s.%s\" required for logical replication does not exist",
entry->nspname, entry->typname)));
- return entry->typoid;
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 0e68670..b132a23 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int remote_attnum;
+ int local_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -282,17 +283,20 @@ slot_store_error_callback(void *arg)
SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
Oid remotetypoid,
localtypoid;
+ char *remotetypname;
- if (errarg->attnum < 0)
+ if (errarg->remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ remotetypoid = errarg->rel->remoterel.atttyps[errarg->remote_attnum];
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ errarg->rel->remoterel.nspname, errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -313,8 +317,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -332,7 +337,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
@@ -378,8 +384,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -402,7 +409,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705..563bb4f 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
Sawada-san,
Thanks for your response.
# And sorry again because I could not reply to your gmail
# address from my environment due to security restriction.
We are getting the below error while trying use Logical Replication
with user defined data types in a C program (when call elog function).
ERROR: XX000: cache lookup failed for type XXXXX
Sorry for continuously disturbing in this topic, but am I missing something
here?
No, but I'd suggest to provide a procedure for reproducing if possible,
which will be helpful for investigation.
Sorry, I will be careful next time.
I mean that in case of type's OID in PUBLICATION host does not exists
in SUBSCRIPTION host's pg_type, it could returns unintended error (the
XX000 above) when elog or ereport is executed.
For more details, it happen in slot_store_error_callback when it try to
call format_type_be(localtypoid) for errcontext.
slot_store_error_callback is set in slot_store_cstrings,
slot_modify_cstrings function and it also be unset here, so the effect here
is small but it happens.
I think I found out the cause of this issue, and this is a bug. This can
be reproduced, for example, if the input function of the data type calls
elog() during applying on the environment where OIDs of the data type on
publisher and subscriber are different. The cause of this issue is that
we call format_type_be() with remotetypoid. If the OIDs of data type on
publisher and subscriber are different we search it from syscache by the
OID that doesn't exist on subscriber.
Yes, I think so too.
On detail of your patch, I don't think this direction is good. Since the
subscriber already has a LogicalRepTyp cache entry for the type we can report
the error message using the data type name. So I think this issue can be
fixed by using the remote type name got from the cache.
Thanks,
I had not noticed LogicalRepRelMapEntry; the remote type name is already available there.
Also I'm confused about the message of errcontext; currently we store the
local data type OID corresponding to the remote data type name into the
cache, and then we search the local data type name by the local data type
OID stored in the cache. So it means the both the local data type OID and
the remote data type OID always imply the same data type. We use the both
data type OIDs for log message in slot_store_error_callback, but I think
what the function want to do is to show the different type names if the
table definitions on both server are different (e.g. sending jsonb column
data to text column data). I think we should use the type of the local relation
attribute rather than remote's one.
Attached draft patch fixed this issue, at least on my environment.
It works well for me.
Please review it.
I will review it soon.
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
Sorry for not replying sooner.
Attached draft patch fixed this issue, at least on my environment.
It works good for me.
Please review it.
I will review it soon.
There is one more case in which a user-defined data type is not supported in Logical Replication:
when the remote data type's name does not exist on the subscriber.
In relation.c:logicalrep_typmap_gettypname,
we look up the OID in the syscache by the remote data type's name and map it; if the name does not exist in the syscache,
we hit the error below.
if (!OidIsValid(entry->typoid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("data type \"%s.%s\" required for logical replication does not exist",
entry->nspname, entry->typname)));
I think it is not necessary to check typoid here, which would avoid the above case. Is that right?
I attached a patch based on Sawada-san's patch, with a few messages modified and the above check removed.
Could somebody check it for me, or should I add it to the CF?
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
Attachments:
fix_slot_store_error_callback_V2.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 4b2d8a1..e24725c 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -442,8 +442,8 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
/*
* Fetch type info from the cache.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
@@ -454,9 +454,9 @@ logicalrep_typmap_getid(Oid remoteid)
{
if (!get_typisdefined(remoteid))
ereport(ERROR,
- (errmsg("built-in type %u not found", remoteid),
+ (errmsg("built-in type oid %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
@@ -467,27 +467,21 @@ logicalrep_typmap_getid(Oid remoteid)
HASH_FIND, &found);
if (!found)
- elog(ERROR, "no type map entry for remote type %u",
+ elog(ERROR, "no type map entry for remote type oid %u",
remoteid);
-
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
-
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
+
+ if (!OidIsValid(entry->typoid))
+ {
+ /* Try to map to local type. */
+ nspoid = LookupExplicitNamespace(entry->nspname, true);
+ if (OidIsValid(nspoid))
+ entry->typoid = GetSysCacheOid2(TYPENAMENSP,
PointerGetDatum(entry->typname),
ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
-
- return entry->typoid;
+ else
+ entry->typoid = InvalidOid;
+ }
+
+ /* Return type name */
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 0e68670..b132a23 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int remote_attnum;
+ int local_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -282,17 +283,20 @@ slot_store_error_callback(void *arg)
SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
Oid remotetypoid,
localtypoid;
+ char *remotetypname;
- if (errarg->attnum < 0)
+ if (errarg->remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ remotetypoid = errarg->rel->remoterel.atttyps[errarg->remote_attnum];
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ errarg->rel->remoterel.nspname, errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -313,8 +317,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -332,7 +337,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
@@ -378,8 +384,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -402,7 +409,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705..563bb4f 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
Hi,
From: Huong Dangminh [mailto:huo-dangminh@ys.jp.nec.com]
I attached a patch based on Sawada-san's patch with a bit of messages
modified and remove the above check.
Could somebody check it for me or should I add it into CF?
Sorry, I have added this thread to the next CF.
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
On Wed, Nov 22, 2017 at 12:25 AM, Huong Dangminh
<huo-dangminh@ys.jp.nec.com> wrote:
Thanks for your response.
# And sorry again because I could not reply to your gmail
# address from my environment due to security restriction.
It's okay. I can understand your environment :-)
Sorry for not replying sooner.
Attached draft patch fixed this issue, at least on my environment.
It works good for me.
Please review it.
I will review it soon.
There is one more case that user-defined data type is not supported in Logical Replication.
That is when remote data type's name does not exist in SUBSCRIBE.
In relation.c:logicalrep_typmap_gettypname
We search OID in syscache by remote's data type name and mapping it, if it does not exist in syscache
We will be faced with the below error.
if (!OidIsValid(entry->typoid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("data type \"%s.%s\" required for logical replication does not exist",
entry->nspname, entry->typname)));
I think, it is not necessary to check typoid here in order to avoid above case, is that right?
I think it's not right. We should end up with an error in the case
where the same type name doesn't exist on subscriber. With your
proposed patch, logicalrep_typmap_gettypname() can return an invalid
string (entry->typname) in that case, which can be a cause of SEGV.
Sorry, I have added this thread to the next CF.
Thank you for adding it.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Hi Sawada-san,
Sorry for my late response.
On 2017/12/05 0:11, Masahiko Sawada wrote:
There is one more case that user-defined data type is not supported in Logical Replication.
That is when remote data type's name does not exist in SUBSCRIBE.
In relation.c:logicalrep_typmap_gettypname
We search OID in syscache by remote's data type name and mapping it, if it does not exist in syscache
We will be faced with the below error.
if (!OidIsValid(entry->typoid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("data type \"%s.%s\" required for logical replication does not exist",
entry->nspname, entry->typname)));
I think, it is not necessary to check typoid here in order to avoid above case, is that right?
I think it's not right. We should end up with an error in the case
where the same type name doesn't exist on subscriber. With your
proposed patch, logicalrep_typmap_gettypname() can return an invalid
string (entry->typname) in that case, which can be a cause of SEGV.
Thanks, I think you are right.
# I thought that entry->typname was received from the walsender and was
already qualified in logicalrep_write_typ.
# But we also need to check it on the subscriber, because information could
be lost in transmission.
Also, we do not need a fix for the case above, because the user can
change the type name with an ALTER TYPE statement.
I attached the patch, which is based on your patch with slightly modified
messages.
---
Thanks and best regards,
Dang Minh Huong
Attachments:
fix_slot_store_error_callback_V3.patch (text/plain; charset=UTF-8)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 46e515e4b6..0ae676a8d6 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -441,8 +441,8 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
/*
* Fetch type info from the cache.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
@@ -453,9 +453,9 @@ logicalrep_typmap_getid(Oid remoteid)
{
if (!get_typisdefined(remoteid))
ereport(ERROR,
- (errmsg("built-in type %u not found", remoteid),
+ (errmsg("built-in type oid %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
@@ -466,12 +466,12 @@ logicalrep_typmap_getid(Oid remoteid)
HASH_FIND, &found);
if (!found)
- elog(ERROR, "no type map entry for remote type %u",
+ elog(ERROR, "no type map entry for remote type oid %u",
remoteid);
- /* Found and mapped, return the oid. */
+ /* Found and mapped, return the type name. */
if (OidIsValid(entry->typoid))
- return entry->typoid;
+ return entry->typname;
/* Otherwise, try to map to local type. */
nspoid = LookupExplicitNamespace(entry->nspname, true);
@@ -488,5 +488,6 @@ logicalrep_typmap_getid(Oid remoteid)
errmsg("data type \"%s.%s\" required for logical replication does not exist",
entry->nspname, entry->typname)));
- return entry->typoid;
+ /* Return type name */
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index eedc3a8816..6cf037684b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int remote_attnum;
+ int local_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -282,17 +283,20 @@ slot_store_error_callback(void *arg)
SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
Oid remotetypoid,
localtypoid;
+ char *remotetypname;
- if (errarg->attnum < 0)
+ if (errarg->remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ remotetypoid = errarg->rel->remoterel.atttyps[errarg->remote_attnum];
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ errarg->rel->remoterel.nspname, errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -313,8 +317,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -332,7 +337,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
@@ -378,8 +384,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -402,7 +409,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705650..563bb4f37d 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
On Thu, Dec 7, 2017 at 12:23 AM, Dang Minh Huong <kakalot49@gmail.com> wrote:
Hi Sawada-san,
Sorry for my late response.
On 2017/12/05 0:11, Masahiko Sawada wrote:
There is one more case that user-defined data type is not supported in
Logical Replication.
That is when remote data type's name does not exist in SUBSCRIBE.
In relation.c:logicalrep_typmap_gettypname
We search OID in syscache by remote's data type name and mapping it, if it
does not exist in syscache
We will be faced with the below error.
if (!OidIsValid(entry->typoid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("data type \"%s.%s\" required for logical replication does not exist",
entry->nspname, entry->typname)));
I think, it is not necessary to check typoid here in order to avoid above
case, is that right?
I think it's not right. We should end up with an error in the case where the
same type name doesn't exist on subscriber. With your proposed patch,
logicalrep_typmap_gettypname() can return an invalid string (entry->typname)
in that case, which can be a cause of SEGV.
Thanks, I think you are right.
# I thought that entry->typname was received from walsender, and it is
already be qualified in logicalrep_write_typ.
# But we also need check it in subscriber, because it could be lost info in
transmission.
Oops, the last sentence of my previous mail was wrong.
logicalrep_typmap_gettypname() doesn't return an invalid string, since
entry->typname is initialized with the type name received from the WAL sender.
After more thought, we might not need to raise an error even if
the same data type does not exist on both the publisher and the subscriber.
Because data is sent after being converted to its text representation and
is then converted to a data type according to the local table definition,
subscribers don't always need to have the same data type. If that's
right, slot_store_error_callback() doesn't need to find the
corresponding local data type OID; it only needs to find the corresponding
type name by the remote data type OID. What do you think?
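The point about the text representation can be sketched as follows. This is a toy in plain C under my own assumptions, not how the apply worker is written: ToyLocalType, ToyDatum and toy_input are hypothetical, and the only thing the sketch is meant to show is that the conversion is chosen by the local column type alone.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

typedef enum ToyLocalType
{
    TOY_LOCAL_INT,
    TOY_LOCAL_TEXT
} ToyLocalType;

typedef struct ToyDatum
{
    ToyLocalType type;
    long intval;            /* valid when type == TOY_LOCAL_INT */
    const char *textval;    /* valid when type == TOY_LOCAL_TEXT */
} ToyDatum;

/*
 * Convert the text form received from the publisher using only the
 * local column's type. Returns false if the text is not valid input
 * for that type; the remote type is never consulted.
 */
static bool
toy_input(ToyLocalType localtype, const char *text, ToyDatum *out)
{
    char *end;
    long value;

    assert(text != NULL && out != NULL);
    out->type = localtype;
    out->intval = 0;
    out->textval = NULL;

    if (localtype == TOY_LOCAL_TEXT)
    {
        out->textval = text;    /* any text is acceptable as-is */
        return true;
    }

    errno = 0;
    value = strtol(text, &end, 10);
    if (errno != 0 || end == text || *end != '\0')
        return false;
    out->intval = value;
    return true;
}
```

A jsonb value sent as text loads into a text column without the subscriber knowing anything about jsonb, but fails when the local column is an integer.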
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int remote_attnum;
+ int local_attnum;
} SlotErrCallbackArg;
Since LogicalRepRelMapEntry has a map of local attributes to remote
ones, we don't need to have two attribute numbers.
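In other words, storing only the local attribute number is enough, because the remote one can be derived from the map. A minimal sketch in plain C, with a hypothetical ToyRelMapEntry standing in for the real entry and -1 assumed to mean "no matching remote column":

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified analogue of a relation map entry. */
typedef struct ToyRelMapEntry
{
    int natts;              /* number of local columns */
    const int *attrmap;     /* local column index -> remote index, or -1 */
} ToyRelMapEntry;

/* Derive the remote attribute number from the local one. */
static int
toy_remote_attnum(const ToyRelMapEntry *rel, int local_attnum)
{
    assert(rel != NULL && rel->attrmap != NULL);
    if (local_attnum < 0 || local_attnum >= rel->natts)
        return -1;
    return rel->attrmap[local_attnum];
}
```

The error callback argument would then carry the entry plus a single local attribute number.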
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Thu, Dec 7, 2017 at 11:07 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Attached is a patch incorporating what I have in mind. Please review it.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
fix_slot_store_error_callback_v4.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 4b2d8a1..1c23491 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -440,7 +440,42 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
+ */
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
+{
+ LogicalRepTyp *entry;
+ bool found;
+
+ /* Internal types are mapped directly. */
+ if (remoteid < FirstNormalObjectId)
+ {
+ if (!get_typisdefined(remoteid))
+ ereport(ERROR,
+ (errmsg("built-in type %u not found", remoteid),
+ errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
+ return format_type_be(remoteid);
+ }
+
+ if (LogicalRepTypMap == NULL)
+ logicalrep_relmap_init();
+
+ /* Try finding the mapping. */
+ entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
+ HASH_FIND, &found);
+
+ if (!found)
+ elog(ERROR, "no type map entry for remote type %u",
+ remoteid);
+
+ Assert(OidIsValid(entry->remoteid));
+
+ return entry->typname;
+}
+
+/*
+ * Fetch local type OID from the cache by remote type OID.
*/
Oid
logicalrep_typmap_getid(Oid remoteid)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fa5d9bb..a51c882 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,8 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -282,19 +282,31 @@ static void
slot_store_error_callback(void *arg)
{
SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
+ int remote_attnum;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ remotetypoid = rel->remoterel.atttyps[remote_attnum];
+
+ /* Fetch remote type name from the LogicalRepTypMap cache */
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -315,8 +327,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -334,7 +346,7 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
@@ -380,8 +392,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -404,7 +416,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705..474563c 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -38,5 +38,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
Hi Sawada-san,
Sorry for the late reply.
Attached the patch incorporated what I have on mind. Please review it.
Thanks for the patch. I will review it this weekend.
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
On 2017/12/08 13:18, Huong Dangminh wrote:
On Thu, Dec 7, 2017 at 11:07 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Oops, the last sentence of my previous mail was wrong.
logicalrep_typmap_gettypname() doesn't return an invalid string since
entry->typname is initialized with a type name got from wal sender.
Yeah, so we do not need to check whether the publisher's type name
exists on the subscriber.
After more thought, we might not need to raise an error even if there
is not the same data type on both publisher and subscriber. Because
data is sent after converted to the text representation and is
converted to a data type according to the local table definition
subscribers don't always need to have the same data type. If it's
right, slot_store_error_callback() doesn't need to find a
corresponding local data type OID but just finds the corresponding
type name by remote data type OID. What do you think?
I totally agree. It will make logical replication more flexible with
respect to data types.
Attached the patch incorporated what I have on mind. Please review it.
Your patch looks fine to me.
But logicalrep_typmap_getid will be unused.
I attached a patch that removes it.
---
Thanks and best regards,
Dang Minh Huong
Attachments:
fix_slot_store_error_callback_v5.patch (text/plain; charset=UTF-8)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 46e515e4b6..7858cecbb6 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -439,14 +439,13 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
- Oid nspoid;
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)
@@ -455,7 +454,7 @@ logicalrep_typmap_getid(Oid remoteid)
ereport(ERROR,
(errmsg("built-in type %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
@@ -469,24 +468,7 @@ logicalrep_typmap_getid(Oid remoteid)
elog(ERROR, "no type map entry for remote type %u",
remoteid);
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
+ Assert(OidIsValid(entry->remoteid));
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
- PointerGetDatum(entry->typname),
- ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
-
- return entry->typoid;
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index eedc3a8816..8c96982608 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,8 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -279,20 +279,32 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
+ int remote_attnum;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ remotetypoid = rel->remoterel.atttyps[remote_attnum];
+
+ /* Fetch remote type name from the LogicalRepTypMap cache */
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -313,8 +325,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -332,7 +344,7 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
@@ -378,8 +390,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -402,7 +414,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705650..563bb4f37d 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
On Sun, Dec 10, 2017 at 12:33 AM, Dang Minh Huong <kakalot49@gmail.com> wrote:
Your patch is fine for me.
But logicalrep_typmap_getid will be unused.
Yeah, actually we don't use LogicalRepTyp.typoid either. If we allow
replicating data into a column of a different data type, what is the
data type mapping on the subscriber for? It seems enough to have the
remote data type's OID, namespace, and name.
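A cache that keeps only the remote OID, namespace, and name could look roughly like the sketch below. It is a standalone illustration, not the patch: `RemoteTypeEntry`, `remote_typname` and `is_builtin_type` are hypothetical names, and a linear array stands in for the hash table. The constant mirrors PostgreSQL's FirstNormalObjectId (16384), below which OIDs belong to built-in objects that the patches in this thread resolve locally instead of through the cache.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Mirrors PostgreSQL's FirstNormalObjectId; lower OIDs are built-in. */
#define FIRST_NORMAL_OBJECT_ID 16384u

/* Everything the subscriber remembers about a remote user-defined type. */
typedef struct
{
    unsigned int remoteid;  /* type OID on the publisher */
    const char  *nspname;   /* schema name on the publisher */
    const char  *typname;   /* type name on the publisher */
} RemoteTypeEntry;

/* True when the OID is in the built-in range and needs no cache entry. */
static int
is_builtin_type(unsigned int oid)
{
    return oid < FIRST_NORMAL_OBJECT_ID;
}

/*
 * Look up the remote type name by remote OID.
 * Returns NULL when the publisher never sent this type.
 */
static const char *
remote_typname(const RemoteTypeEntry *cache, size_t n, unsigned int remoteid)
{
    assert(cache != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        if (cache[i].remoteid == remoteid)
            return cache[i].typname;
    }
    return NULL;
}
```

Note that no local type OID appears anywhere in the entry, so nothing has to be invalidated or re-resolved when local types change; the name is only used for reporting.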
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Hi Sawada-san,
Sent: Tuesday, December 12, 2017 9:41 AM
To: Dang Minh Huong <kakalot49@gmail.com>
Cc: PostgreSQL Hackers <pgsql-hackers@postgresql.org>; Yanagisawa Hiroshi(柳澤 博) <hir-yanagisawa@ut.jp.nec.com>; Dangminh Huong(ダンM フーン) <huo-dangminh@ys.jp.nec.com>
Subject: Re: User defined data types in Logical Replication
Your patch is fine for me.
But logicalrep_typmap_getid will be unused.
Yeah, actually we don't use LogicalRepTyp.typoid as well. If we allow to
replicate data to an another data type of column, what is the data type
mapping on subscriber for? It seems enough to have remote data type oid,
namespace and name.
Yeah, so the type map would lose its meaning, wouldn't it?
I updated the patch to remove LogicalRepTyp.typoid and to change the
concept from "typmap" to "remote type".
What do you think?
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
Attachments:
fix_slot_store_error_callback_v6.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 46e515e..4300429 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -33,9 +33,9 @@
static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
-static HTAB *LogicalRepTypMap = NULL;
+static HTAB *LogicalRepRemoteTyp = NULL;
-static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
+static void logicalrep_remotetyp_invalidate_cb(Datum arg, int cacheid,
uint32 hashvalue);
/*
@@ -72,7 +72,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
/* invalidate all cache entries */
HASH_SEQ_STATUS status;
- hash_seq_init(&status, LogicalRepRelMap);
+ hash_seq_init(&status, LogicalRepRemoteTyp);
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
entry->localreloid = InvalidOid;
@@ -109,13 +109,13 @@ logicalrep_relmap_init(void)
ctl.hcxt = LogicalRepRelMapContext;
/* This will usually be small. */
- LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
+ LogicalRepRemoteTyp = hash_create("logicalrep remote type cache", 2, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
(Datum) 0);
- CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
+ CacheRegisterSyscacheCallback(TYPEOID, logicalrep_remotetyp_invalidate_cb,
(Datum) 0);
}
@@ -376,58 +376,52 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
/*
- * Type cache invalidation callback for our type map cache.
+ * Type cache invalidation callback for our remote type cache.
*/
static void
-logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
+logicalrep_remotetyp_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
- LogicalRepTyp *entry;
/* Just to be sure. */
- if (LogicalRepTypMap == NULL)
+ if (LogicalRepRemoteTyp == NULL)
return;
/* invalidate all cache entries */
- hash_seq_init(&status, LogicalRepTypMap);
-
- while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
- entry->typoid = InvalidOid;
+ hash_seq_init(&status, LogicalRepRemoteTyp);
}
/*
- * Free the type map cache entry data.
+ * Free the remote type cache entry data.
*/
static void
-logicalrep_typmap_free_entry(LogicalRepTyp *entry)
+logicalrep_remotetyp_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
-
- entry->typoid = InvalidOid;
}
/*
- * Add new entry or update existing entry in the type map cache.
+ * Add new entry or update existing entry in the remote type cache.
*/
void
-logicalrep_typmap_update(LogicalRepTyp *remotetyp)
+logicalrep_remotetyp_update(LogicalRepTyp *remotetyp)
{
MemoryContext oldctx;
LogicalRepTyp *entry;
bool found;
- if (LogicalRepTypMap == NULL)
+ if (LogicalRepRemoteTyp == NULL)
logicalrep_relmap_init();
/*
* HASH_ENTER returns the existing entry if present or creates a new one.
*/
- entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
+ entry = hash_search(LogicalRepRemoteTyp, (void *) &remotetyp->remoteid,
HASH_ENTER, &found);
if (found)
- logicalrep_typmap_free_entry(entry);
+ logicalrep_remotetyp_free_entry(entry);
/* Make cached copy of the data */
entry->remoteid = remotetyp->remoteid;
@@ -435,58 +429,39 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
- entry->typoid = InvalidOid;
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_get_remotetypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
- Oid nspoid;
- /* Internal types are mapped directly. */
+ /* If internal types, we can get from local. */
if (remoteid < FirstNormalObjectId)
{
if (!get_typisdefined(remoteid))
ereport(ERROR,
(errmsg("built-in type %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
- if (LogicalRepTypMap == NULL)
+ if (LogicalRepRemoteTyp == NULL)
logicalrep_relmap_init();
- /* Try finding the mapping. */
- entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
+ /* Try finding in cache. */
+ entry = hash_search(LogicalRepRemoteTyp, (void *) &remoteid,
HASH_FIND, &found);
if (!found)
elog(ERROR, "no type map entry for remote type %u",
remoteid);
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
-
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
- PointerGetDatum(entry->typname),
- ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
+ Assert(OidIsValid(entry->remoteid));
- return entry->typoid;
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e46a62e..7ef4fe5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,8 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -281,20 +281,32 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
+ int remote_attnum;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ remotetypoid = rel->remoterel.atttyps[remote_attnum];
+
+ /* Fetch remote type name from cache */
+ remotetypname = logicalrep_get_remotetypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -315,8 +327,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -334,8 +346,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
-
+ errarg.local_attnum = i;
+ elog(LOG,"test");
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
values[remoteattnum],
@@ -380,8 +392,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -404,7 +416,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
@@ -535,7 +547,7 @@ apply_handle_type(StringInfo s)
LogicalRepTyp typ;
logicalrep_read_typ(s, &typ);
- logicalrep_typmap_update(&typ);
+ logicalrep_remotetyp_update(&typ);
}
/*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a9736e1..b8a7957 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -52,13 +52,12 @@ typedef struct LogicalRepRelation
Bitmapset *attkeys; /* Bitmap of key columns */
} LogicalRepRelation;
-/* Type mapping info */
+/* Remote type info */
typedef struct LogicalRepTyp
{
- Oid remoteid; /* unique id of the type */
- char *nspname; /* schema name */
- char *typname; /* name of the type */
- Oid typoid; /* local type Oid */
+ Oid remoteid; /* unique id of the remote type */
+ char *nspname; /* schema name of remote type */
+ char *typname; /* remote type name */
} LogicalRepTyp;
/* Transaction info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705..1016e68 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -36,7 +36,7 @@ extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
-extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern void logicalrep_remotetyp_update(LogicalRepTyp *remotetyp);
+extern char *logicalrep_get_remotetypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
Hi Sawada-san,
Sent: Tuesday, December 12, 2017 9:41 AM
To: Dang Minh Huong <kakalot49@gmail.com>
Cc: PostgreSQL Hackers <pgsql-hackers@postgresql.org>; Yanagisawa Hiroshi(柳澤 博) <hir-yanagisawa@ut.jp.nec.com>; Dangminh Huong(ダンM フーン) <huo-dangminh@ys.jp.nec.com>
Subject: Re: User defined data types in Logical Replication
On Sun, Dec 10, 2017 at 12:33 AM, Dang Minh Huong <kakalot49@gmail.com> wrote:
On 2017/12/08 13:18, Huong Dangminh wrote:
Hi Sawada-san,
On Thu, Dec 7, 2017 at 11:07 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Thu, Dec 7, 2017 at 12:23 AM, Dang Minh Huong <kakalot49@gmail.com> wrote:
Hi Sawada-san,
Sorry for my late response.
On 2017/12/05 0:11, Masahiko Sawada wrote:
There is one more case in which a user-defined data type is not
supported in Logical Replication:
when the remote data type's name does not exist on the subscriber.
In relation.c:logicalrep_typmap_gettypname
we look up the OID in the syscache by the remote data type's name and map
it; if it does not exist in the syscache, we get the
error below.
if (!OidIsValid(entry->typoid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("data type \"%s.%s\" required for logical replication does not exist",
entry->nspname, entry->typname)));
I think it is not necessary to check typoid here, in order to
avoid the above case. Is that right?
I think it's not right. We should end up with an error in the
case where the same type name doesn't exist on subscriber. With
your proposed patch,
logicalrep_typmap_gettypname() can return an invalid string
(entry->typname) in that case, which can be a cause of SEGV.
Thanks, I think you are right.
# I thought that entry->typname was received from walsender, and
that it is already qualified in logicalrep_write_typ.
# But we also need to check it on the subscriber, because information
could be lost in transmission.
Oops, the last sentence of my previous mail was wrong.
logicalrep_typmap_gettypname() doesn't return an invalid string
since
entry->typname is initialized with a type name got from wal sender.
Yeah, so we do not need to check for the existence of the publisher's type
name on the subscriber.
After more thought, we might not need to raise an error even if
there is not the same data type on both publisher and subscriber.
Because data is sent after being converted to its text representation
and is then converted to a data type according to the local table
definition, subscribers don't always need to have the same data
type. If that's right, slot_store_error_callback() doesn't need to
find a corresponding local data type OID but just finds the
corresponding type name by remote data type OID. What do you think?
I totally agree. It will make logical replication more flexible with
data types.
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int remote_attnum;
+ int local_attnum;
} SlotErrCallbackArg;
Since LogicalRepRelMapEntry has a map of local attributes to
remote ones we don't need to have two attribute numbers.
Sorry for the late reply.
Attached is a patch incorporating what I have in mind. Please review
it.
Thanks for the patch, I will review it this weekend.
Your patch is fine with me.
But logicalrep_typmap_getid will be unused.
Yeah, actually we don't use LogicalRepTyp.typoid either. If we allow
replicating data to a column of a different data type, what is the data
type mapping on the subscriber for? It seems enough to have the remote data
type OID, namespace and name.
Yeah, so the type map loses its meaning, doesn't it?
I updated the patch, removing LogicalRepTyp.typoid and changing
the concept of "typmap" to "remote type".
What do you think?
Sorry, please confirm V7 of the patch (attached to this mail).
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
Attachments:
fix_slot_store_error_callback_v7.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 46e515e..4300429 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -33,9 +33,9 @@
static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
-static HTAB *LogicalRepTypMap = NULL;
+static HTAB *LogicalRepRemoteTyp = NULL;
-static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
+static void logicalrep_remotetyp_invalidate_cb(Datum arg, int cacheid,
uint32 hashvalue);
/*
@@ -72,7 +72,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
/* invalidate all cache entries */
HASH_SEQ_STATUS status;
- hash_seq_init(&status, LogicalRepRelMap);
+ hash_seq_init(&status, LogicalRepRemoteTyp);
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
entry->localreloid = InvalidOid;
@@ -109,13 +109,13 @@ logicalrep_relmap_init(void)
ctl.hcxt = LogicalRepRelMapContext;
/* This will usually be small. */
- LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
+ LogicalRepRemoteTyp = hash_create("logicalrep remote type cache", 2, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
(Datum) 0);
- CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
+ CacheRegisterSyscacheCallback(TYPEOID, logicalrep_remotetyp_invalidate_cb,
(Datum) 0);
}
@@ -376,58 +376,52 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
/*
- * Type cache invalidation callback for our type map cache.
+ * Type cache invalidation callback for our remote type cache.
*/
static void
-logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
+logicalrep_remotetyp_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
- LogicalRepTyp *entry;
/* Just to be sure. */
- if (LogicalRepTypMap == NULL)
+ if (LogicalRepRemoteTyp == NULL)
return;
/* invalidate all cache entries */
- hash_seq_init(&status, LogicalRepTypMap);
-
- while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
- entry->typoid = InvalidOid;
+ hash_seq_init(&status, LogicalRepRemoteTyp);
}
/*
- * Free the type map cache entry data.
+ * Free the remote type cache entry data.
*/
static void
-logicalrep_typmap_free_entry(LogicalRepTyp *entry)
+logicalrep_remotetyp_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
-
- entry->typoid = InvalidOid;
}
/*
- * Add new entry or update existing entry in the type map cache.
+ * Add new entry or update existing entry in the remote type cache.
*/
void
-logicalrep_typmap_update(LogicalRepTyp *remotetyp)
+logicalrep_remotetyp_update(LogicalRepTyp *remotetyp)
{
MemoryContext oldctx;
LogicalRepTyp *entry;
bool found;
- if (LogicalRepTypMap == NULL)
+ if (LogicalRepRemoteTyp == NULL)
logicalrep_relmap_init();
/*
* HASH_ENTER returns the existing entry if present or creates a new one.
*/
- entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
+ entry = hash_search(LogicalRepRemoteTyp, (void *) &remotetyp->remoteid,
HASH_ENTER, &found);
if (found)
- logicalrep_typmap_free_entry(entry);
+ logicalrep_remotetyp_free_entry(entry);
/* Make cached copy of the data */
entry->remoteid = remotetyp->remoteid;
@@ -435,58 +429,39 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
- entry->typoid = InvalidOid;
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_get_remotetypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
- Oid nspoid;
- /* Internal types are mapped directly. */
+ /* If internal types, we can get from local. */
if (remoteid < FirstNormalObjectId)
{
if (!get_typisdefined(remoteid))
ereport(ERROR,
(errmsg("built-in type %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
- if (LogicalRepTypMap == NULL)
+ if (LogicalRepRemoteTyp == NULL)
logicalrep_relmap_init();
- /* Try finding the mapping. */
- entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
+ /* Try finding in cache. */
+ entry = hash_search(LogicalRepRemoteTyp, (void *) &remoteid,
HASH_FIND, &found);
if (!found)
elog(ERROR, "no type map entry for remote type %u",
remoteid);
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
-
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
- PointerGetDatum(entry->typname),
- ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
+ Assert(OidIsValid(entry->remoteid));
- return entry->typoid;
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e46a62e..1380800 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,8 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -281,20 +281,32 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
+ int remote_attnum;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ remotetypoid = rel->remoterel.atttyps[remote_attnum];
+
+ /* Fetch remote type name from cache */
+ remotetypname = logicalrep_get_remotetypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -315,8 +327,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -334,8 +346,7 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
-
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
values[remoteattnum],
@@ -380,8 +391,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -404,7 +415,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
@@ -535,7 +546,7 @@ apply_handle_type(StringInfo s)
LogicalRepTyp typ;
logicalrep_read_typ(s, &typ);
- logicalrep_typmap_update(&typ);
+ logicalrep_remotetyp_update(&typ);
}
/*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a9736e1..b8a7957 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -52,13 +52,12 @@ typedef struct LogicalRepRelation
Bitmapset *attkeys; /* Bitmap of key columns */
} LogicalRepRelation;
-/* Type mapping info */
+/* Remote type info */
typedef struct LogicalRepTyp
{
- Oid remoteid; /* unique id of the type */
- char *nspname; /* schema name */
- char *typname; /* name of the type */
- Oid typoid; /* local type Oid */
+ Oid remoteid; /* unique id of the remote type */
+ char *nspname; /* schema name of remote type */
+ char *typname; /* remote type name */
} LogicalRepTyp;
/* Transaction info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705..1016e68 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -36,7 +36,7 @@ extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
-extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern void logicalrep_remotetyp_update(LogicalRepTyp *remotetyp);
+extern char *logicalrep_get_remotetypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
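As an illustration of the idea behind the patch above (the error context only needs the remote type's name, looked up by remote OID, with no local type lookup), here is a minimal standalone sketch. It is not the PostgreSQL code: RemoteTypeEntry, remote_type_name, format_remote_type_context, the fixed array used as the "cache", and the OIDs and type names in it are all hypothetical stand-ins for LogicalRepTyp and the dynahash table.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef unsigned int Oid;

/* Hypothetical stand-in for LogicalRepTyp: remote information only. */
typedef struct RemoteTypeEntry
{
    Oid         remoteid;   /* OID of the type on the publisher */
    const char *nspname;    /* schema name on the publisher */
    const char *typname;    /* type name on the publisher */
} RemoteTypeEntry;

/* Assumed sample data; a real cache is filled from messages sent by the publisher. */
static const RemoteTypeEntry remote_types[] = {
    {16385, "public", "mood"},
    {16390, "app", "price"},
};

/* Return the remote type name for a remote OID, or NULL if it is not cached. */
static const char *
remote_type_name(Oid remoteid)
{
    size_t      n = sizeof(remote_types) / sizeof(remote_types[0]);
    size_t      i;

    for (i = 0; i < n; i++)
    {
        if (remote_types[i].remoteid == remoteid)
            return remote_types[i].typname;
    }
    return NULL;
}

/* Build the context text without consulting any local type catalog. */
static int
format_remote_type_context(char *buf, size_t buflen, Oid remoteid)
{
    const char *name = remote_type_name(remoteid);

    assert(buf != NULL && buflen > 0);
    return snprintf(buf, buflen, "remote type \"%s\"", name ? name : "???");
}
```

Calling format_remote_type_context(buf, sizeof(buf), 16385) fills buf with the quoted remote name even if no type called "mood" exists locally, which is the property the discussion relies on.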
Hi,
On 18/12/17 14:28, Huong Dangminh wrote:
Your patch is fine with me.
But logicalrep_typmap_getid will be unused.
Yeah, actually we don't use LogicalRepTyp.typoid either. If we allow
replicating data to a column of a different data type, what is the data
type mapping on the subscriber for? It seems enough to have the remote data
type OID, namespace and name.
Yeah, so the type map loses its meaning, doesn't it?
I updated the patch, removing LogicalRepTyp.typoid and changing
the concept of "typmap" to "remote type".
What do you think?
Sorry, please confirm V7 of the patch (attached to this mail).
I think the changes make sense in terms of how it all works now.
That said I don't think the renaming idea is a good one, the naming was
chosen to be future proof because eventually we'll need to map types to
local oid (and possibly more) where the local info is cached so that we
can interpret binary representation of replicated data (which we'll add
at some point since it's big performance boost).
So I am afraid that if we do the rename of typmap to remotetype in this
patch it will a) make backports of fixes in the related code harder, b)
force us to rename it back again in the future.
I'd keep your general approach but keep using typmap naming.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Hi Petr Jelinek, Sawada-san
I think the changes make sense in terms of how it all works now.
That said I don't think the renaming idea is a good one, the naming was
chosen to be future proof because eventually we'll need to map types to
local oid (and possibly more) where the local info is cached so that we
can interpret binary representation of replicated data (which we'll add
at some point since it's big performance boost).
So I am afraid that if we do the rename of typmap to remotetype in this
patch it will a) make backports of fixes in the related code harder, b)
force us to rename it back again in the future.
Thanks for your comment.
I'd keep your general approach but keep using typmap naming.
I updated the patch as Petr Jelinek suggested, keeping the typmap naming.
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
Attachments:
fix_slot_store_error_callback_v8.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 46e515e..99c1bba 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -382,7 +382,6 @@ static void
logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
- LogicalRepTyp *entry;
/* Just to be sure. */
if (LogicalRepTypMap == NULL)
@@ -390,9 +389,6 @@ logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
/* invalidate all cache entries */
hash_seq_init(&status, LogicalRepTypMap);
-
- while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
- entry->typoid = InvalidOid;
}
/*
@@ -403,8 +399,6 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
-
- entry->typoid = InvalidOid;
}
/*
@@ -435,18 +429,16 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
- entry->typoid = InvalidOid;
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
- Oid nspoid;
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)
@@ -455,7 +447,7 @@ logicalrep_typmap_getid(Oid remoteid)
ereport(ERROR,
(errmsg("built-in type %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
@@ -469,24 +461,7 @@ logicalrep_typmap_getid(Oid remoteid)
elog(ERROR, "no type map entry for remote type %u",
remoteid);
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
-
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
- PointerGetDatum(entry->typname),
- ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
+ Assert(OidIsValid(entry->remoteid));
- return entry->typoid;
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e46a62e..108e1e3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,8 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -281,20 +281,32 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
+ int remote_attnum;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ remotetypoid = rel->remoterel.atttyps[remote_attnum];
+
+ /* Fetch remote type name from the LogicalRepTypMap cache */
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -315,8 +327,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -334,7 +346,7 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
@@ -380,8 +392,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -404,7 +416,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a9736e1..90ca358 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -55,10 +55,9 @@ typedef struct LogicalRepRelation
/* Type mapping info */
typedef struct LogicalRepTyp
{
- Oid remoteid; /* unique id of the type */
- char *nspname; /* schema name */
- char *typname; /* name of the type */
- Oid typoid; /* local type Oid */
+ Oid remoteid; /* unique id of the remote type */
+ char *nspname; /* schema name of remote type */
+ char *typname; /* name of the remote type */
} LogicalRepTyp;
/* Transaction info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705..563bb4f 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
On Mon, Dec 18, 2017 at 11:25 PM, Huong Dangminh
<huo-dangminh@ys.jp.nec.com> wrote:
eventually we'll need to map types to
local oid (and possibly more) where the local info is cached so that we
can interpret binary representation of replicated data (which we'll add
at some point since it's big performance boost).
Sounds good.
So I am afraid that if we do the rename of typmap to remotetype in this
patch it will a) make backports of fixes in the related code harder, b)
force us to rename it back again in the future.
Thanks for your comment.
I'd keep your general approach but keep using typmap naming.
I updated the patch as Petr Jelinek suggested, keeping the typmap naming.
Thank you for updating the patch. Here is a review comment.
- if (errarg->attnum < 0)
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
I think errarg->local_attnum can be -1 here and access an invalid
address if slot_store_error_callback() is called before setting
errarg.local_attnum. I cannot see such a call path in the current code
so far, but it would need to be fixed.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
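The review comment above concerns the order of operations: the local attribute number has to be validated before it is used as an index into attrmap. A minimal standalone sketch of that ordering follows. RelMapSketch and remote_attnum_for are hypothetical names rather than PostgreSQL's, and the upper-bound check against natts is an extra assumption that was not part of the review.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for LogicalRepRelMapEntry. */
typedef struct RelMapSketch
{
    int         natts;      /* number of local columns */
    const int  *attrmap;    /* local column index -> remote column index,
                             * or -1 when the column has no remote match */
} RelMapSketch;

/*
 * Return the remote column index for a local column, or -1 when there is
 * nothing to report (no column selected yet, or no remote counterpart).
 */
static int
remote_attnum_for(const RelMapSketch *rel, int local_attnum)
{
    assert(rel != NULL && rel->attrmap != NULL);

    /*
     * Check the index first: -1 means the callback fired before any column
     * was selected, and attrmap[-1] would read outside the array.
     */
    if (local_attnum < 0 || local_attnum >= rel->natts)
        return -1;

    return rel->attrmap[local_attnum];
}
```

A caller would return early from the error callback whenever this yields -1, so both "no column yet" and "column not replicated" take the same path.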
Hi Sawada-san,
eventually we'll need to map types to local oid (and possibly more)
where the local info is cached so that we can interpret binary
representation of replicated data (which we'll add at some point
since it's big performance boost).
Sounds good.
So I am afraid that if we do the rename of typmap to remotetype in
this patch it will a) make backports of fixes in the related code
harder, b) force us to rename it back again in the future.
Thanks for your comment.
I'd keep your general approach but keep using typmap naming.
I updated the patch as Petr Jelinek suggested, keeping the typmap naming.
Thank you for updating the patch. Here is a review comment.
Thanks for reviewing.
- if (errarg->attnum < 0)
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
I think errarg->local_attnum can be -1 here and access an invalid address
if slot_store_error_callback() is called before setting
errarg.local_attnum. I cannot see such a call path in the current code so
far, but it would need to be fixed.
I updated the patch to fix it.
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
Attachments:
fix_slot_store_error_callback_v9.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 46e515e..99c1bba 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -382,7 +382,6 @@ static void
logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
- LogicalRepTyp *entry;
/* Just to be sure. */
if (LogicalRepTypMap == NULL)
@@ -390,9 +389,6 @@ logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
/* invalidate all cache entries */
hash_seq_init(&status, LogicalRepTypMap);
-
- while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
- entry->typoid = InvalidOid;
}
/*
@@ -403,8 +399,6 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
-
- entry->typoid = InvalidOid;
}
/*
@@ -435,18 +429,16 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
- entry->typoid = InvalidOid;
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
- Oid nspoid;
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)
@@ -455,7 +447,7 @@ logicalrep_typmap_getid(Oid remoteid)
ereport(ERROR,
(errmsg("built-in type %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
@@ -469,24 +461,7 @@ logicalrep_typmap_getid(Oid remoteid)
elog(ERROR, "no type map entry for remote type %u",
remoteid);
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
-
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
- PointerGetDatum(entry->typname),
- ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
+ Assert(OidIsValid(entry->remoteid));
- return entry->typoid;
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e46a62e..e25f845 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,8 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -281,20 +281,35 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
+ int remote_attnum;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ if (errarg->local_attnum <0)
+ return;
+
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ remotetypoid = rel->remoterel.atttyps[remote_attnum];
+
+ /* Fetch remote type name from the LogicalRepTypMap cache */
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -315,8 +330,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -333,9 +348,7 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
{
Oid typinput;
Oid typioparam;
-
- errarg.attnum = remoteattnum;
-
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
values[remoteattnum],
@@ -380,8 +393,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -404,7 +417,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a9736e1..90ca358 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -55,10 +55,9 @@ typedef struct LogicalRepRelation
/* Type mapping info */
typedef struct LogicalRepTyp
{
- Oid remoteid; /* unique id of the type */
- char *nspname; /* schema name */
- char *typname; /* name of the type */
- Oid typoid; /* local type Oid */
+ Oid remoteid; /* unique id of the remote type */
+ char *nspname; /* schema name of remote type */
+ char *typname; /* name of the remote type */
} LogicalRepTyp;
/* Transaction info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705..563bb4f 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
On Wed, Dec 20, 2017 at 2:28 PM, Huong Dangminh
<huo-dangminh@ys.jp.nec.com> wrote:
Hi Sawada-san,
eventually we'll need to map types to local oid (and possibly more)
where the local info is cached so that we can interpret binary
representation of replicated data (which we'll add at some point
since it's a big performance boost).
Sounds good.
So I am afraid that if we do the rename of typmap to remotetype in
this patch it will a) make backports of fixes in the related code
harder, b) force us to rename it back again in the future.
Thanks for your comment.
I'd keep your general approach but keep using typmap naming.
I updated the patch as Petr Jelinek suggested, keeping the typmap naming.
Thank you for updating the patch. Here is a review comment.
Thanks for reviewing.
- if (errarg->attnum < 0)
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
I think errarg->local_attnum can be -1 here, which would access an invalid
address if slot_store_error_callback() is called before errarg.local_attnum
is set. I cannot see such a call path in the current code so far, but it
would need to be fixed.
I updated the patch to fix it.
Thank you for the quick response. The changes look good to me. But I
wonder whether the following changes need some comments describing what
each check is for.
- if (errarg->attnum < 0)
+ if (errarg->local_attnum <0)
+ return;
+
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
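The two checks quoted above can be sketched in isolation. This is only an illustration under stated assumptions: DemoRelMapEntry and demo_remote_attnum_for_context are hypothetical names, not PostgreSQL code, and the upper-bound test on natts is an extra safety check that is not part of the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the relation map entry (assumption). */
typedef struct DemoRelMapEntry
{
    int         natts;      /* number of local columns */
    const int  *attrmap;    /* local column index -> remote index, or -1 */
} DemoRelMapEntry;

/*
 * Return the remote attribute number to report in the error context, or -1
 * when there is nothing to report.
 */
static int
demo_remote_attnum_for_context(const DemoRelMapEntry *rel, int local_attnum)
{
    assert(rel != NULL);

    /* First check: no column selected yet, so attrmap must not be indexed. */
    if (local_attnum < 0 || local_attnum >= rel->natts)
        return -1;

    /* Second check: the local column has no remote counterpart. */
    if (rel->attrmap[local_attnum] < 0)
        return -1;

    return rel->attrmap[local_attnum];
}
```

The first check protects the array access itself; the second only filters out columns that are not mapped to a remote attribute.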
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Hi Sawada-san,
Thank you for the quick response. The changes look good to me. But I wonder
whether the following changes need some comments describing what each check
is for.
- if (errarg->attnum < 0)
+ if (errarg->local_attnum <0)
+ return;
+
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
Thanks, I have added some comments as you mentioned.
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
Attachments:
fix_slot_store_error_callback_v10.patch
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 46e515e..99c1bba 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -382,7 +382,6 @@ static void
logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
- LogicalRepTyp *entry;
/* Just to be sure. */
if (LogicalRepTypMap == NULL)
@@ -390,9 +389,6 @@ logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
/* invalidate all cache entries */
hash_seq_init(&status, LogicalRepTypMap);
-
- while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
- entry->typoid = InvalidOid;
}
/*
@@ -403,8 +399,6 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
-
- entry->typoid = InvalidOid;
}
/*
@@ -435,18 +429,16 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
- entry->typoid = InvalidOid;
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
- Oid nspoid;
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)
@@ -455,7 +447,7 @@ logicalrep_typmap_getid(Oid remoteid)
ereport(ERROR,
(errmsg("built-in type %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
@@ -469,24 +461,7 @@ logicalrep_typmap_getid(Oid remoteid)
elog(ERROR, "no type map entry for remote type %u",
remoteid);
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
-
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
- PointerGetDatum(entry->typname),
- ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
+ Assert(OidIsValid(entry->remoteid));
- return entry->typoid;
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e46a62e..cd7576f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,8 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -281,20 +281,38 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
+ int remote_attnum;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ /* Check case of slot_store_error_callback() is called before
+ * errarg.local_attnum is set. */
+ if (errarg->local_attnum <0)
+ return;
+
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ /* Check if invalid attribute here, e.g. dropped column. */
+ if (remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ remotetypoid = rel->remoterel.atttyps[remote_attnum];
+
+ /* Fetch remote type name from the LogicalRepTypMap cache */
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -315,8 +333,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -333,9 +351,7 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
{
Oid typinput;
Oid typioparam;
-
- errarg.attnum = remoteattnum;
-
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
values[remoteattnum],
@@ -380,8 +396,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -404,7 +420,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a9736e1..90ca358 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -55,10 +55,9 @@ typedef struct LogicalRepRelation
/* Type mapping info */
typedef struct LogicalRepTyp
{
- Oid remoteid; /* unique id of the type */
- char *nspname; /* schema name */
- char *typname; /* name of the type */
- Oid typoid; /* local type Oid */
+ Oid remoteid; /* unique id of the remote type */
+ char *nspname; /* schema name of remote type */
+ char *typname; /* name of the remote type */
} LogicalRepTyp;
/* Transaction info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705..563bb4f 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
On Wed, Dec 20, 2017 at 5:39 PM, Huong Dangminh
<huo-dangminh@ys.jp.nec.com> wrote:
Hi Sawada-san,
Thank you for the quick response. The changes look good to me. But I wonder
whether the following changes need some comments describing what each check
is for.
- if (errarg->attnum < 0)
+ if (errarg->local_attnum <0)
+ return;
+
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
Thanks, I have added some comments as you mentioned.
Thank you for updating the patch.
- if (errarg->attnum < 0)
+ /* Check case of slot_store_error_callback() is called before
+ * errarg.local_attnum is set. */
+ if (errarg->local_attnum <0)
This comment style does not follow the PostgreSQL coding conventions. Please refer to
https://www.postgresql.org/docs/current/static/source-format.html
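For illustration, a multi-line comment in the style used throughout the PostgreSQL source looks roughly like the sketch below. The function demo_attnum_is_set is a hypothetical name used only to give the comments something to document.

```c
#include <stdbool.h>

/*
 * Report whether an attribute number has been set.
 *
 * In a multi-line comment the opening and closing markers sit on their own
 * lines, and every line in between starts with " * ".
 */
static bool
demo_attnum_is_set(int attnum)
{
    /* A single-line comment keeps both markers on the same line. */
    return attnum >= 0;
}
```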
--
$ git diff --check
src/backend/replication/logical/worker.c:291: trailing whitespace.
+ /* Check case of slot_store_error_callback() is called before
There is trailing whitespace in the patch.
I'm inclined to think SlotErrCallbackArg can have remote_attnum and
pass it to the callback function. That way, we don't need to handle the
case where local_attnum is not set yet.
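A minimal sketch of that idea, assuming both attribute numbers are initialized to -1 and always set together by the caller. DemoErrCallbackArg and demo_error_context are hypothetical simplifications, not the actual worker.c definitions, and the message text is invented for the example.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical, simplified stand-in for SlotErrCallbackArg (assumption). */
typedef struct DemoErrCallbackArg
{
    const char *const *attnames;    /* remote column names */
    int         local_attnum;       /* -1 until a column is being processed */
    int         remote_attnum;      /* -1 until a column is being processed */
} DemoErrCallbackArg;

/*
 * Write a context line into buf and return 1, or return 0 when no column
 * has been selected yet.
 */
static int
demo_error_context(const DemoErrCallbackArg *arg, char *buf, size_t buflen)
{
    assert(arg != NULL && buf != NULL);

    /* A single check is enough because the remote number arrives ready-made. */
    if (arg->remote_attnum < 0)
        return 0;

    snprintf(buf, buflen, "column \"%s\" (local attribute %d)",
             arg->attnames[arg->remote_attnum], arg->local_attnum + 1);
    return 1;
}
```

Carrying the remote number in the argument means the callback never has to index a mapping array with a possibly unset local number.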
Attached is a new version of the patch incorporating the above. Please review it.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
fix_slot_store_error_callback_v11.patch
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 4b2d8a1..efd7265 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -383,7 +383,6 @@ static void
logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
- LogicalRepTyp *entry;
/* Just to be sure. */
if (LogicalRepTypMap == NULL)
@@ -391,9 +390,6 @@ logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
/* invalidate all cache entries */
hash_seq_init(&status, LogicalRepTypMap);
-
- while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
- entry->typoid = InvalidOid;
}
/*
@@ -404,8 +400,6 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
-
- entry->typoid = InvalidOid;
}
/*
@@ -436,18 +430,16 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
- entry->typoid = InvalidOid;
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
- Oid nspoid;
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)
@@ -456,7 +448,7 @@ logicalrep_typmap_getid(Oid remoteid)
ereport(ERROR,
(errmsg("built-in type %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
@@ -470,24 +462,7 @@ logicalrep_typmap_getid(Oid remoteid)
elog(ERROR, "no type map entry for remote type %u",
remoteid);
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
-
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
- PointerGetDatum(entry->typname),
- ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
+ Assert(OidIsValid(entry->remoteid));
- return entry->typoid;
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fa5d9bb..2b8abb1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
+ int remote_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -281,20 +282,30 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ /* Return if remote attribute number is not set */
+ if (errarg->remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ rel = errarg->rel;
+ remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
+
+ /* Fetch remote type name from the LogicalRepTypMap cache */
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[errarg->remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -315,8 +326,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -334,8 +346,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
-
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
values[remoteattnum],
@@ -380,8 +392,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -404,7 +417,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a9736e1..90ca358 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -55,10 +55,9 @@ typedef struct LogicalRepRelation
/* Type mapping info */
typedef struct LogicalRepTyp
{
- Oid remoteid; /* unique id of the type */
- char *nspname; /* schema name */
- char *typname; /* name of the type */
- Oid typoid; /* local type Oid */
+ Oid remoteid; /* unique id of the remote type */
+ char *nspname; /* schema name of remote type */
+ char *typname; /* name of the remote type */
} LogicalRepTyp;
/* Transaction info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 8352705..563bb4f 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
On 2017/12/21 10:05, Masahiko Sawada wrote:
On Wed, Dec 20, 2017 at 5:39 PM, Huong Dangminh
<huo-dangminh@ys.jp.nec.com> wrote:
Hi Sawada-san,
Thank you for the quick response. The changes look good to me. But I wonder
whether the following changes need some comments describing what each check
is for.
- if (errarg->attnum < 0)
+ if (errarg->local_attnum <0)
+ return;
+
+ rel = errarg->rel;
+ remote_attnum = rel->attrmap[errarg->local_attnum];
+
+ if (remote_attnum < 0)
return;
Thanks, I have added some comments as you mentioned.
Thank you for updating the patch.
- if (errarg->attnum < 0)
+ /* Check case of slot_store_error_callback() is called before
+ * errarg.local_attnum is set. */
+ if (errarg->local_attnum <0)
This comment style does not follow the PostgreSQL coding conventions. Please refer to
https://www.postgresql.org/docs/current/static/source-format.html
--
$ git diff --check
src/backend/replication/logical/worker.c:291: trailing whitespace.
+ /* Check case of slot_store_error_callback() is called before
Thanks, I will be more careful next time.
There is trailing whitespace in the patch.
I'm inclined to think SlotErrCallbackArg can have remote_attnum and
pass it to the callback function. That way, we don't need to handle the
case where local_attnum is not set yet.
Nice.
Attached is a new version of the patch incorporating the above. Please review it.
Thanks for updating the patch.
It looks fine to me.
---
Thanks and best regards,
Dang Minh Huong
On Sat, Dec 23, 2017 at 4:08 PM, Dang Minh Huong <kakalot49@gmail.com> wrote:
Thanks for updating the patch.
It looks fine to me.
Thank you for confirmation.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
From: Masahiko Sawada [mailto:sawada.mshk@gmail.com]
Attached is a new version of the patch incorporating the above. Please review it.
Thanks for updating the patch.
It looks fine to me.
I mean that it passes make check, and subscription TAP tests too.
Thank you for confirmation.
Also, I have changed the status in the CF to Ready for Committer.
I would be glad if it could be applied starting from PostgreSQL 10.
---
Thanks and best regards,
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
hash_seq_init in logicalrep_typmap_invalidate_cb is useless after your
patch. If you remove it, the function becomes empty, so why is there
an invalidation callback at all?
Are we now leaking memory if types keep repeatedly being re-created in
the origin? I suppose it's not a common use pattern, but it'd be good
to avoid everlasting memleaks.
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Sat, Jan 6, 2018 at 3:53 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
hash_seq_init in logicalrep_typmap_invalidate_cb is useless after your
patch. If you remove it, the function becomes empty, so why is there
an invalidation callback at all?
Thank you for the comment. Yeah, logicalrep_typmap_invalidate_cb is no
longer needed. Attached is an updated patch.
Are we now leaking memory if types keep repeatedly being re-created in
the origin?
The type name and namespace name in LogicalRepTyp are freed when an
entry is updated, but the LogicalRepTyp entry itself could be leaked.
The same can happen with the relation map: since we never remove hash
entries in response to changes on the origin, relation map entries are
leaked if the publication repeatedly adds and drops tables and the
subscription refreshes it.
I suppose it's not a common use pattern, but it'd be good
to avoid everlasting memleaks.
I agree. Can we remove the entry from the hash table in the callbacks,
instead of setting InvalidOid, when the caches are invalidated?
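To make the question concrete, here is a toy sketch of a cache whose invalidation step removes entries and frees their memory rather than marking them invalid. It is an assumption-laden illustration: the fixed-size array and all demo_ names are hypothetical, whereas the real code uses a dynahash HTAB, where removal would go through hash_search() with HASH_REMOVE.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define DEMO_CACHE_SIZE 8

/* Hypothetical toy cache entry (assumption; not LogicalRepTyp). */
typedef struct DemoTypEntry
{
    unsigned int remoteid;  /* 0 means the slot is free */
    char       *typname;    /* heap-allocated copy of the type name */
} DemoTypEntry;

static DemoTypEntry demo_cache[DEMO_CACHE_SIZE];

/* Insert or overwrite an entry; returns 0 on success, -1 if full or OOM. */
static int
demo_cache_put(unsigned int remoteid, const char *typname)
{
    DemoTypEntry *slot = NULL;
    char       *copy;

    assert(remoteid != 0 && typname != NULL);

    for (int i = 0; i < DEMO_CACHE_SIZE; i++)
    {
        if (demo_cache[i].remoteid == remoteid)
        {
            slot = &demo_cache[i];  /* existing entry wins */
            break;
        }
        if (slot == NULL && demo_cache[i].remoteid == 0)
            slot = &demo_cache[i];  /* remember the first free slot */
    }
    if (slot == NULL)
        return -1;

    copy = malloc(strlen(typname) + 1);
    if (copy == NULL)
        return -1;
    strcpy(copy, typname);

    free(slot->typname);            /* releases the old name on update */
    slot->typname = copy;
    slot->remoteid = remoteid;
    return 0;
}

/* Look up a type name; returns NULL when there is no entry. */
static const char *
demo_cache_get(unsigned int remoteid)
{
    for (int i = 0; i < DEMO_CACHE_SIZE; i++)
        if (remoteid != 0 && demo_cache[i].remoteid == remoteid)
            return demo_cache[i].typname;
    return NULL;
}

/* Invalidation: remove every entry so nothing stale stays allocated. */
static int
demo_cache_invalidate_all(void)
{
    int         removed = 0;

    for (int i = 0; i < DEMO_CACHE_SIZE; i++)
    {
        if (demo_cache[i].remoteid == 0)
            continue;
        free(demo_cache[i].typname);
        demo_cache[i].typname = NULL;
        demo_cache[i].remoteid = 0;
        removed++;
    }
    return removed;
}
```

Removing entries bounds the cache by the number of live types, at the cost of re-populating it after each invalidation.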
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
fix_slot_store_error_callback_v12.patch
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e492d26..fb391b6 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -35,8 +35,6 @@ static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL;
-static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
- uint32 hashvalue);
/*
* Relcache invalidation callback for our relation map cache.
@@ -115,8 +113,6 @@ logicalrep_relmap_init(void)
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
(Datum) 0);
- CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
- (Datum) 0);
}
/*
@@ -375,27 +371,6 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
rel->localrel = NULL;
}
-
-/*
- * Type cache invalidation callback for our type map cache.
- */
-static void
-logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
-{
- HASH_SEQ_STATUS status;
- LogicalRepTyp *entry;
-
- /* Just to be sure. */
- if (LogicalRepTypMap == NULL)
- return;
-
- /* invalidate all cache entries */
- hash_seq_init(&status, LogicalRepTypMap);
-
- while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
- entry->typoid = InvalidOid;
-}
-
/*
* Free the type map cache entry data.
*/
@@ -404,8 +379,6 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
-
- entry->typoid = InvalidOid;
}
/*
@@ -436,18 +409,16 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
- entry->typoid = InvalidOid;
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
- Oid nspoid;
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)
@@ -456,7 +427,7 @@ logicalrep_typmap_getid(Oid remoteid)
ereport(ERROR,
(errmsg("built-in type %u not found", remoteid),
errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
@@ -470,24 +441,7 @@ logicalrep_typmap_getid(Oid remoteid)
elog(ERROR, "no type map entry for remote type %u",
remoteid);
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
-
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
- PointerGetDatum(entry->typname),
- ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
+ Assert(OidIsValid(entry->remoteid));
- return entry->typoid;
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 83c6909..bd596e8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
+ int remote_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -281,20 +282,30 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ /* Return if remote attribute number is not set */
+ if (errarg->remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ rel = errarg->rel;
+ remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
+
+ /* Fetch remote type name from the LogicalRepTypMap cache */
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[errarg->remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -315,8 +326,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -334,8 +346,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
-
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
values[remoteattnum],
@@ -380,8 +392,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -404,7 +417,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0eb2105..56f6095 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -55,10 +55,9 @@ typedef struct LogicalRepRelation
/* Type mapping info */
typedef struct LogicalRepTyp
{
- Oid remoteid; /* unique id of the type */
- char *nspname; /* schema name */
- char *typname; /* name of the type */
- Oid typoid; /* local type Oid */
+ Oid remoteid; /* unique id of the remote type */
+ char *nspname; /* schema name of remote type */
+ char *typname; /* name of the remote type */
} LogicalRepTyp;
/* Transaction info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index d4250c2..fc2d808 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
Hi,
It seems to be in the middle of discussion, but I became a reviewer of
this problem several days ago so I've tested the latest patch
'fix_slot_store_error_callback_v12.patch'.
I reproduced the ERROR below by inserting elog() into the INPUT function
of CREATE TYPE, and confirmed that the above patch solves the problem.
ERROR: XX000: cache lookup failed for type XXXXX
I also ran make check-world and there was no error.
Is the only remaining topic memory leaks?
On 2018/01/09 17:22, Masahiko Sawada wrote:
I suppose it's not a common use pattern, but it'd be good
to avoid everlasting memleaks.
I agree. Can we remove the entry from the hash table in the callbacks instead
of setting InvalidOid when the caches are invalidated?
--
Atsushi Torikoshi
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Tue, Jan 30, 2018 at 6:36 PM, atorikoshi
<torikoshi_atsushi_z2@lab.ntt.co.jp> wrote:
Hi,
It seems to be in the middle of discussion, but I became a reviewer of
this problem several days ago so I've tested the latest patch
'fix_slot_store_error_callback_v12.patch'.
I reproduced the below ERROR by inserting elog() to INPUT function
of CREATE TYPE, and confirmed that above patch solves the problem.
ERROR: XX000: cache lookup failed for type XXXXX
I also ran make check-world and there was no error.
Is the only remaining topic memory leaks?
Yeah, but I think that the patch for avoiding memleaks should be a
separate patch.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
I noticed that logicalrep_typmap_gettypname's only caller is
slot_store_error_callback, which is an error callback. So by raising an
error from the former, we would effectively just hide the actual reason
for the error behind the error that the cache entry cannot be found.
Therefore, I'm inclined to make this function raise a warning, then
return a substitute value (something like "unrecognized type XYZ"). Do
the same for the case with the mismatched major versions. Lastly, if
LogicalRepTypMap is not defined, also throw a warning and return a
substitute value rather than try to initialize the hash table:
initializing the table is not going to make the entry show up in it,
anyway, so it's pretty pointless; and there's plenty chance for things
to go wrong, which is not a good idea in an error callback.
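The substitute-value idea described above can be sketched in plain C. This is only a standalone model, not PostgreSQL code: `ToyTypeEntry`, `toy_typmap`, `toy_gettypname` and the OIDs in the table are hypothetical names and values invented for illustration. The point it shows is that a lookup used from an error callback returns a placeholder string on a miss instead of reporting anything itself.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef unsigned int Oid;

/* Hypothetical stand-in for one entry of the remote type map. */
typedef struct ToyTypeEntry
{
    Oid         remoteid;   /* type OID as known on the publisher */
    const char *typname;    /* type name sent by the publisher */
} ToyTypeEntry;

/* Hypothetical cache contents; the OIDs are made up. */
static const ToyTypeEntry toy_typmap[] = {
    {16385, "dummytext"},
    {16390, "mood"},
};

/*
 * Return the remote type name, or a substitute string when the OID is
 * unknown. The function never reports an error or a warning, because the
 * caller is assumed to be running inside an error callback already.
 */
static const char *
toy_gettypname(Oid remoteid, char *buf, size_t buflen)
{
    size_t      i;

    assert(buf != NULL && buflen > 0);

    for (i = 0; i < sizeof(toy_typmap) / sizeof(toy_typmap[0]); i++)
    {
        if (toy_typmap[i].remoteid == remoteid)
            return toy_typmap[i].typname;
    }

    /* Miss: build a placeholder rather than complaining. */
    snprintf(buf, buflen, "unrecognized type %u", remoteid);
    return buf;
}
```

With this shape, the original error is still reported with its own message, and the context line merely carries a less precise type name when the lookup misses.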
Do we need a new TAP test for this? Judging by
https://coverage.postgresql.org/src/backend/replication/logical/relation.c.gcov.html
showing all zeroes for the last function, we do. Same with
slot_store_error_callback in
https://coverage.postgresql.org/src/backend/replication/logical/worker.c.gcov.html
Thanks,
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Tue, Mar 6, 2018 at 8:35 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
I noticed that logicalrep_typmap_gettypname's only caller is
slot_store_error_callback, which is an error callback. So by raising an
error from the former, we would effectively just hide the actual reason
for the error behind the error that the cache entry cannot be found.
Therefore, I'm inclined to make this function raise a warning, then
return a substitute value (something like "unrecognized type XYZ"). Do
the same for the case with the mismatched major versions. Lastly, if
LogicalRepTypMap is not defined, also throw a warning and return a
substitute value rather than try to initialize the hash table:
initializing the table is not going to make the entry show up in it,
anyway, so it's pretty pointless; and there's plenty chance for things
to go wrong, which is not a good idea in an error callback.
I agree with you about not hiding the actual reason for the error but
if we raise a warning at logicalrep_typmap_gettypname don't we call
slot_store_error_callback recursively?
Do we need a new TAP test for this? Judging by
https://coverage.postgresql.org/src/backend/replication/logical/relation.c.gcov.html
showing all zeroes for the last function, we do. Same with
slot_store_error_callback in
https://coverage.postgresql.org/src/backend/replication/logical/worker.c.gcov.html
Agreed. Will add a TAP test.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Masahiko Sawada wrote:
On Tue, Mar 6, 2018 at 8:35 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
Therefore, I'm inclined to make this function raise a warning, then
return a substitute value (something like "unrecognized type XYZ").
[...]
I agree with you about not hiding the actual reason for the error but
if we raise a warning at logicalrep_typmap_gettypname don't we call
slot_store_error_callback recursively?
Hmm, now that you mention it, I don't really know. I think it's
supposed not to happen, since calling ereport() again opens a new
recursion level, but then maybe errcontext doesn't depend on the
recursion level ... I haven't checked. This is why the TAP test would
be handy :-)
Agreed. Will add a TAP test.
Great. This patch waits on that, then.
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Wed, Mar 7, 2018 at 2:52 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
Masahiko Sawada wrote:
On Tue, Mar 6, 2018 at 8:35 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
Therefore, I'm inclined to make this function raise a warning, then
return a substitute value (something like "unrecognized type XYZ").
[...]
I agree with you about not hiding the actual reason for the error but
if we raise a warning at logicalrep_typmap_gettypname don't we call
slot_store_error_callback recursively?
Hmm, now that you mention it, I don't really know. I think it's
supposed not to happen, since calling ereport() again opens a new
recursion level, but then maybe errcontext doesn't depend on the
recursion level ... I haven't checked. This is why the TAP test would
be handy :-)
Calling ereport() opens a new recursion level. ereport(ERROR) does not
return to its caller, but ereport(WARNING) does, so calling
ereport(WARNING) from the callback recurses until it exceeds the
errordata stack size. So it seems to me that we can set errcontext in
logicalrep_typmap_gettypname() instead of raising a warning or an error.
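The recursion concern can be illustrated with a toy model in plain C. Nothing here is PostgreSQL code: `toy_report_warning`, `ToyContext` and `TOY_STACK_SIZE` are hypothetical, and the limit of 5 is an assumed value standing in for the errordata stack size. The model only assumes that each report runs every registered context callback before it finishes, so a callback that itself reports a warning re-enters the reporting path.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical limit, standing in for the errordata stack size. */
#define TOY_STACK_SIZE 5

typedef struct ToyContext
{
    void        (*callback) (void *arg);
    void       *arg;
    struct ToyContext *previous;
} ToyContext;

static ToyContext *toy_context_stack = NULL;
static int  toy_depth = 0;          /* currently open report levels */
static int  toy_overflowed = 0;     /* set once the limit is hit */
static int  toy_callback_calls = 0; /* how often a callback ran */

/* Toy report: open a level, run all context callbacks, close the level. */
static void
toy_report_warning(void)
{
    ToyContext *c;

    if (toy_overflowed)
        return;
    if (toy_depth >= TOY_STACK_SIZE)
    {
        toy_overflowed = 1;     /* a real server would give up here */
        return;
    }

    toy_depth++;
    for (c = toy_context_stack; c != NULL; c = c->previous)
        c->callback(c->arg);
    toy_depth--;
}

/* A callback that reports a warning itself: re-enters toy_report_warning. */
static void
toy_warning_callback(void *arg)
{
    (void) arg;
    toy_callback_calls++;
    toy_report_warning();
}

/* A callback that only adds context and reports nothing. */
static void
toy_quiet_callback(void *arg)
{
    (void) arg;
    toy_callback_calls++;
}

/* Register one callback, issue one report, return 1 if the limit was hit. */
static int
toy_run(void (*callback) (void *arg))
{
    ToyContext  ctx;

    ctx.callback = callback;
    ctx.arg = NULL;
    ctx.previous = toy_context_stack;
    toy_context_stack = &ctx;

    toy_depth = 0;
    toy_overflowed = 0;
    toy_callback_calls = 0;
    toy_report_warning();

    toy_context_stack = ctx.previous;
    return toy_overflowed;
}
```

In this model the warning-raising callback runs once per level until the limit is reached, while the quiet callback runs exactly once, which is the behaviour the paragraph above argues for.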
Agreed. Will add a TAP test.
Great. This patch waits on that, then.
Okay. I think the simplest and most convenient way to reproduce this
issue is to call elog(LOG) in the input function of a user-defined data
type. So I'm thinking of creating the test in the src/test/modules directory.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Wed, Mar 7, 2018 at 9:19 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Wed, Mar 7, 2018 at 2:52 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
Masahiko Sawada wrote:
On Tue, Mar 6, 2018 at 8:35 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
Therefore, I'm inclined to make this function raise a warning, then
return a substitute value (something like "unrecognized type XYZ").
[...]
I agree with you about not hiding the actual reason for the error but
if we raise a warning at logicalrep_typmap_gettypname don't we call
slot_store_error_callback recursively?
Hmm, now that you mention it, I don't really know. I think it's
supposed not to happen, since calling ereport() again opens a new
recursion level, but then maybe errcontext doesn't depend on the
recursion level ... I haven't checked. This is why the TAP test would
be handy :-)
Calling ereport() opens a new recursion level. ereport(ERROR) does not
return to its caller, but ereport(WARNING) does, so calling
ereport(WARNING) from the callback recurses until it exceeds the
errordata stack size. So it seems to me that we can set errcontext in
logicalrep_typmap_gettypname() instead of raising a warning or an error.
Agreed. Will add a TAP test.
Great. This patch waits on that, then.
Okay. I think the simplest and most convenient way to reproduce this
issue is to call elog(LOG) in the input function of a user-defined data
type. So I'm thinking of creating the test in the src/test/modules directory.
After more thought, I think that since the errors in
logicalrep_typmap_gettypname are not relevant to the actual error
(e.g. a type conversion error), it would not be a good idea to show the
error message "could not find data type entry" in errcontext.
It might be more appropriate to return a substitute value
("unrecognized type" or "unrecognized built-in type") without raising
either an error or a warning. Thoughts?
Regarding the regression test, I added a new test module,
test_subscription, that creates a new user-defined data type. In a
subscription regression test that uses test_subscription, we make the
subscriber call slot_store_error_callback and check whether the subscriber
can correctly look up both the remote and local data type strings. One
downside of this regression test is that in a failure case the
test takes a long time (up to 180 seconds) because it has to
wait for the polling timeout.
Attached is an updated patch with a regression test.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
fix_slot_store_error_callback_v13.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e492d26..01ace8f 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -35,8 +35,6 @@ static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL;
-static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
- uint32 hashvalue);
/*
* Relcache invalidation callback for our relation map cache.
@@ -115,8 +113,6 @@ logicalrep_relmap_init(void)
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
(Datum) 0);
- CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
- (Datum) 0);
}
/*
@@ -375,27 +371,6 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
rel->localrel = NULL;
}
-
-/*
- * Type cache invalidation callback for our type map cache.
- */
-static void
-logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
-{
- HASH_SEQ_STATUS status;
- LogicalRepTyp *entry;
-
- /* Just to be sure. */
- if (LogicalRepTypMap == NULL)
- return;
-
- /* invalidate all cache entries */
- hash_seq_init(&status, LogicalRepTypMap);
-
- while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
- entry->typoid = InvalidOid;
-}
-
/*
* Free the type map cache entry data.
*/
@@ -404,8 +379,6 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
-
- entry->typoid = InvalidOid;
}
/*
@@ -436,58 +409,57 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
- entry->typoid = InvalidOid;
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID. Return a
+ * substitute value if we could not find the data type name.
+ * Since this function is called by slot_store_error_callback,
+ * which is an error context callback function, we don't complain
+ * here.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
- bool found;
- Oid nspoid;
+ StringInfoData invalid_type_str;
+ bool found;
+
+ initStringInfo(&invalid_type_str);
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)
{
if (!get_typisdefined(remoteid))
- ereport(ERROR,
- (errmsg("built-in type %u not found", remoteid),
- errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ {
+ /*
+ * This can be caused by having a publisher with
+ * a higher PostgreSQL major version than the
+ * subscriber.
+ */
+ appendStringInfo(&invalid_type_str, "unrecognized built-in type %u", remoteid);
+ return invalid_type_str.data;
+ }
+
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
- logicalrep_relmap_init();
+ {
+ appendStringInfo(&invalid_type_str, "unrecognized type %u", remoteid);
+ return invalid_type_str.data;
+ }
/* Try finding the mapping. */
entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
HASH_FIND, &found);
if (!found)
- elog(ERROR, "no type map entry for remote type %u",
- remoteid);
-
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
-
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
- PointerGetDatum(entry->typname),
- ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
+ {
+ appendStringInfo(&invalid_type_str, "unrecognized type %u", remoteid);
+ return invalid_type_str.data;
+ }
- return entry->typoid;
+ Assert(OidIsValid(entry->remoteid));
+ return entry->typname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 04985c9..c6a9f88 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
+ int remote_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -281,20 +282,30 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ /* Return if remote attribute number is not set */
+ if (errarg->remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ rel = errarg->rel;
+ remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
+
+ /* Fetch remote type name from the LogicalRepTypMap cache */
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(errarg->rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
- "remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ "remote type \"%s\", local type \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[errarg->remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -315,8 +326,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -334,8 +346,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
-
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
values[remoteattnum],
@@ -380,8 +392,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -404,7 +417,8 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0eb2105..56f6095 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -55,10 +55,9 @@ typedef struct LogicalRepRelation
/* Type mapping info */
typedef struct LogicalRepTyp
{
- Oid remoteid; /* unique id of the type */
- char *nspname; /* schema name */
- char *typname; /* name of the type */
- Oid typoid; /* local type Oid */
+ Oid remoteid; /* unique id of the remote type */
+ char *nspname; /* schema name of remote type */
+ char *typname; /* name of the remote type */
} LogicalRepTyp;
/* Transaction info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index d4250c2..fc2d808 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 7294b69..a455634 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -17,6 +17,7 @@ SUBDIRS = \
test_rbtree \
test_rls_hooks \
test_shm_mq \
+ test_subscription \
worker_spi
all: submake-generated-headers
diff --git a/src/test/modules/test_subscription/Makefile b/src/test/modules/test_subscription/Makefile
new file mode 100644
index 0000000..ec3d13d
--- /dev/null
+++ b/src/test/modules/test_subscription/Makefile
@@ -0,0 +1,19 @@
+# src/test/modules/test_subscription/Makefile
+
+MODULE_big = test_subscription
+OBJS = test_subscription.o $(WIN32RES)
+PGFILEDESC = "test_subscription - module for subscription test"
+
+EXTENSION = test_subscription
+DATA = test_subscription--1.0.sql
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_subscription
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_subscription/TAGS b/src/test/modules/test_subscription/TAGS
new file mode 120000
index 0000000..1a96393
--- /dev/null
+++ b/src/test/modules/test_subscription/TAGS
@@ -0,0 +1 @@
+/home/masahiko/source/postgresql/TAGS
\ No newline at end of file
diff --git a/src/test/modules/test_subscription/test_subscription--1.0.sql b/src/test/modules/test_subscription/test_subscription--1.0.sql
new file mode 100644
index 0000000..42bc85f
--- /dev/null
+++ b/src/test/modules/test_subscription/test_subscription--1.0.sql
@@ -0,0 +1,22 @@
+/* src/test/modules/test_subscription/test_subscription--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_subscription" to load this file. \quit
+
+CREATE TYPE dummytext;
+
+CREATE FUNCTION dummytext_in(cstring)
+RETURNS dummytext
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
+
+CREATE FUNCTION dummytext_out(dummytext)
+RETURNS cstring
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
+
+CREATE TYPE dummytext (
+ INTERNALLENGTH = -1,
+ INPUT = dummytext_in,
+ OUTPUT = dummytext_out
+);
diff --git a/src/test/modules/test_subscription/test_subscription.c b/src/test/modules/test_subscription/test_subscription.c
new file mode 100644
index 0000000..b2578d4
--- /dev/null
+++ b/src/test/modules/test_subscription/test_subscription.c
@@ -0,0 +1,44 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_subscription.c
+ * Code for testing logical replication subscriptions.
+ *
+ * Copyright (c) 2018, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_subscription/test_subscription.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+
+#include <utils/builtins.h>
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(dummytext_in);
+PG_FUNCTION_INFO_V1(dummytext_out);
+
+/* Dummy input of data type function */
+Datum
+dummytext_in(PG_FUNCTION_ARGS)
+{
+ char *inputText = PG_GETARG_CSTRING(0);
+
+ if (inputText)
+ elog(LOG, "input text: \"%s\"", inputText);
+
+ PG_RETURN_TEXT_P(cstring_to_text(inputText));
+}
+
+/* Dummy output of data type function */
+Datum
+dummytext_out(PG_FUNCTION_ARGS)
+{
+ Datum txt = PG_GETARG_DATUM(0);
+
+ PG_RETURN_CSTRING(TextDatumGetCString(txt));
+}
diff --git a/src/test/modules/test_subscription/test_subscription.control b/src/test/modules/test_subscription/test_subscription.control
new file mode 100644
index 0000000..ffea087
--- /dev/null
+++ b/src/test/modules/test_subscription/test_subscription.control
@@ -0,0 +1,4 @@
+comment = 'Test subscription with an extension'
+default_version = '1.0'
+module_pathname = '$libdir/test_subscription'
+relocatable = true
diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile
index 0f3d209..f5e76bb 100644
--- a/src/test/subscription/Makefile
+++ b/src/test/subscription/Makefile
@@ -13,7 +13,7 @@ subdir = src/test/subscription
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-EXTRA_INSTALL = contrib/hstore
+EXTRA_INSTALL = contrib/hstore src/test/modules/test_subscription
check:
$(prove_check)
diff --git a/src/test/subscription/t/010_slot_store_error.pl b/src/test/subscription/t/010_slot_store_error.pl
new file mode 100644
index 0000000..8c4b49f
--- /dev/null
+++ b/src/test/subscription/t/010_slot_store_error.pl
@@ -0,0 +1,56 @@
+# This tests that errors during data type conversion are correctly
+# handled by logical replication apply workers
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Set up the same table by different steps so that publisher
+# and subscriber get different OIDs for the dummytext data type.
+# This is necessary for checking that the subscriber can correctly
+# look up both remote and local data type strings.
+my $ddl = qq(
+CREATE EXTENSION test_subscription;
+CREATE TABLE test (a dummytext););
+
+$node_publisher->safe_psql('postgres', qq(
+CREATE EXTENSION hstore;));
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', qq(
+CREATE PUBLICATION tap_pub FOR TABLE test
+));
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres', qq(
+CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot, copy_data = false)
+));
+
+# Insert test data, which will lead to calling the callback
+# function for the data type conversion on subscriber.
+$node_publisher->safe_psql('postgres', qq(
+INSERT INTO test VALUES ('1');
+));
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check the data on subscriber
+my $result = $node_subscriber->safe_psql('postgres', qq(
+SELECT a FROM test;
+));
+
+# Inserted data is replicated correctly
+is( $result, '1');
Masahiko Sawada wrote:
After more thought, I think that since the errors in
logicalrep_typmap_gettypname are not relevant to the actual error
(e.g. a type conversion error), it would not be a good idea to show the
error message "could not find data type entry" in errcontext.
It might be more appropriate to return a substitute value
("unrecognized type" or "unrecognized built-in type") without raising
either an error or a warning. Thoughts?
Yeah, seems a reasonable idea to me. Here's a tidied-up version of your
patch, minus the regression test changes (I may end up committing that
one separately). But I now hesitate to push it because I'm unsure
how type mapping actually works, and whether it's still working
after this patch -- for example if we create two user-defined datatypes
in opposite orders in the nodes (so they get different OIDs), are we
able to replicate data correctly from one side to the other? If there's
code to support this case, it is not at all obvious where it is.
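One possible reading of the patch hunks above is that slot_store_cstrings picks the input function from the local column's type (getTypeInputInfo(att->atttypid, ...)) and feeds it the value it received as a C string, so the remote OID would matter only for messages. The toy model below sketches that reading in plain C. It is an assumption, not a confirmed description of the apply worker, and every name in it (`ToyLocalType`, `toy_store_value`, the OIDs, the two input functions) is hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef unsigned int Oid;
typedef long (*ToyInputFn) (const char *text);

/* Hypothetical stand-in for a subscriber-side type with an input function. */
typedef struct ToyLocalType
{
    Oid         oid;
    const char *name;
    ToyInputFn  input;
} ToyLocalType;

/* Two deliberately incompatible toy input functions. */
static long
toy_int_in(const char *text)
{
    return strtol(text, NULL, 10);
}

static long
toy_len_in(const char *text)
{
    return (long) strlen(text);
}

/*
 * Subscriber catalog: the types were created in the opposite order from the
 * publisher, so the OIDs here do not match the remote ones (made-up values).
 */
static const ToyLocalType toy_local_types[] = {
    {20001, "type_b", toy_len_in},
    {20002, "type_a", toy_int_in},
};

static const ToyLocalType *
toy_lookup_local(Oid oid)
{
    size_t      i;

    for (i = 0; i < sizeof(toy_local_types) / sizeof(toy_local_types[0]); i++)
    {
        if (toy_local_types[i].oid == oid)
            return &toy_local_types[i];
    }
    return NULL;
}

/*
 * Convert a text value using the LOCAL column type. The remote OID is
 * accepted but unused for conversion; it would only feed error context.
 * Returns 1 on success, 0 if the local type is unknown.
 */
static int
toy_store_value(Oid local_col_typoid, Oid remote_typoid,
                const char *text, long *result)
{
    const ToyLocalType *t = toy_lookup_local(local_col_typoid);

    (void) remote_typoid;
    assert(text != NULL && result != NULL);

    if (t == NULL)
        return 0;

    *result = t->input(text);
    return 1;
}
```

Under this assumption, creating the types in opposite orders on the two nodes would not break conversion, because the subscriber never resolves the remote OID locally. Whether that holds in the real code is exactly the open question in the paragraph above.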
Regarding the regression test, I added a new test module,
test_subscription, that creates a new user-defined data type.
I think this is a good thing to add. I wonder if we could have the
module create two extensions, and have the TAP test create first A then
B in one node, and first B in the other node, replicate a table that
uses type A, see how it fails, then create type A and ensure replication
works correctly. (Ideally the types would not be binary compatible, so
that it'd not work by accident.)
One downside of this regression test is that in a failure case, the
duration of the test will be long (up to 180sec) because it has to
wait for the polling timeout.
This is bad. I haven't read it.
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Alvaro Herrera wrote:
Yeah, seems a reasonable idea to me. Here's a tidied-up version of your
patch, minus the regression test changes (I may end up committing that
one separately).
... and patch.
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachments:
0001-logical-replication-change-how-datatype-mapping-is-u.patch (text/plain; charset=us-ascii)
From 7e14d8afb10744508313a56a76c86b2a4c6b38f0 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 14 Mar 2018 14:15:28 -0300
Subject: [PATCH] logical replication: change how datatype mapping is used
---
src/backend/replication/logical/relation.c | 90 ++++++++++--------------------
src/backend/replication/logical/worker.c | 63 +++++++++++++--------
src/include/replication/logicalproto.h | 7 +--
src/include/replication/logicalrelation.h | 2 +-
4 files changed, 74 insertions(+), 88 deletions(-)
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e492d26d18..1f20df5680 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -35,8 +35,6 @@ static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL;
-static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
- uint32 hashvalue);
/*
* Relcache invalidation callback for our relation map cache.
@@ -115,8 +113,6 @@ logicalrep_relmap_init(void)
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
(Datum) 0);
- CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
- (Datum) 0);
}
/*
@@ -375,27 +371,6 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
rel->localrel = NULL;
}
-
-/*
- * Type cache invalidation callback for our type map cache.
- */
-static void
-logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
-{
- HASH_SEQ_STATUS status;
- LogicalRepTyp *entry;
-
- /* Just to be sure. */
- if (LogicalRepTypMap == NULL)
- return;
-
- /* invalidate all cache entries */
- hash_seq_init(&status, LogicalRepTypMap);
-
- while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
- entry->typoid = InvalidOid;
-}
-
/*
* Free the type map cache entry data.
*/
@@ -404,8 +379,6 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
-
- entry->typoid = InvalidOid;
}
/*
@@ -436,58 +409,53 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
- entry->typoid = InvalidOid;
}
/*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
+ *
+ * Return a substitute value if we cannot find the data type; no message is
+ * sent to the log in that case, because this is used by error callback
+ * already.
*/
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
- Oid nspoid;
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)
{
if (!get_typisdefined(remoteid))
- ereport(ERROR,
- (errmsg("built-in type %u not found", remoteid),
- errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
- return remoteid;
+ {
+ /*
+ * This can be caused by having a publisher with a higher
+ * PostgreSQL major version than the subscriber.
+ */
+ return psprintf("unrecognized %u", remoteid);
+ }
+
+ return format_type_be(remoteid);
}
if (LogicalRepTypMap == NULL)
- logicalrep_relmap_init();
+ {
+ /*
+ * If the typemap is not initialized yet, we cannot possibly attempt
+ * to search the hash table; but there's no way we know the type
+ * locally yet, since we haven't received a message about this type,
+ * so this is the best we can do.
+ */
+ return psprintf("unrecognized %u", remoteid);
+ }
- /* Try finding the mapping. */
+ /* search the mapping */
entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
HASH_FIND, &found);
-
if (!found)
- elog(ERROR, "no type map entry for remote type %u",
- remoteid);
+ return psprintf("unrecognized %u", remoteid);
- /* Found and mapped, return the oid. */
- if (OidIsValid(entry->typoid))
- return entry->typoid;
-
- /* Otherwise, try to map to local type. */
- nspoid = LookupExplicitNamespace(entry->nspname, true);
- if (OidIsValid(nspoid))
- entry->typoid = GetSysCacheOid2(TYPENAMENSP,
- PointerGetDatum(entry->typname),
- ObjectIdGetDatum(nspoid));
- else
- entry->typoid = InvalidOid;
-
- if (!OidIsValid(entry->typoid))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("data type \"%s.%s\" required for logical replication does not exist",
- entry->nspname, entry->typname)));
-
- return entry->typoid;
+ Assert(OidIsValid(entry->remoteid));
+ return psprintf("%s.%s", entry->nspname, entry->typname);
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 04985c9f91..fdace7eea2 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
- int attnum;
+ LogicalRepRelMapEntry *rel;
+ int local_attnum;
+ int remote_attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
@@ -282,19 +283,29 @@ static void
slot_store_error_callback(void *arg)
{
SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+ char *remotetypname;
Oid remotetypoid,
localtypoid;
- if (errarg->attnum < 0)
+ /* Nothing to do if remote attribute number is not set */
+ if (errarg->remote_attnum < 0)
return;
- remotetypoid = errarg->rel->atttyps[errarg->attnum];
- localtypoid = logicalrep_typmap_getid(remotetypoid);
+ rel = errarg->rel;
+ remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
+
+ /* Fetch remote type name from the LogicalRepTypMap cache */
+ remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+ /* Fetch local type OID from the local sys cache */
+ localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
+
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
"remote type %s, local type %s",
- errarg->rel->nspname, errarg->rel->relname,
- errarg->rel->attnames[errarg->attnum],
- format_type_be(remotetypoid),
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[errarg->remote_attnum],
+ remotetypname,
format_type_be(localtypoid));
}
@@ -315,8 +326,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -334,14 +346,17 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
- slot->tts_values[i] = OidInputFunctionCall(typinput,
- values[remoteattnum],
- typioparam,
- att->atttypmod);
+ slot->tts_values[i] =
+ OidInputFunctionCall(typinput, values[remoteattnum],
+ typioparam, att->atttypmod);
slot->tts_isnull[i] = false;
+
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
}
else
{
@@ -380,8 +395,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
/* Push callback + info on the error context stack */
- errarg.rel = &rel->remoterel;
- errarg.attnum = -1;
+ errarg.rel = rel;
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
@@ -404,14 +420,17 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Oid typinput;
Oid typioparam;
- errarg.attnum = remoteattnum;
+ errarg.local_attnum = i;
+ errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
- slot->tts_values[i] = OidInputFunctionCall(typinput,
- values[remoteattnum],
- typioparam,
- att->atttypmod);
+ slot->tts_values[i] =
+ OidInputFunctionCall(typinput, values[remoteattnum],
+ typioparam, att->atttypmod);
slot->tts_isnull[i] = false;
+
+ errarg.local_attnum = -1;
+ errarg.remote_attnum = -1;
}
else
{
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0eb21057c5..116f16f42d 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -55,10 +55,9 @@ typedef struct LogicalRepRelation
/* Type mapping info */
typedef struct LogicalRepTyp
{
- Oid remoteid; /* unique id of the type */
- char *nspname; /* schema name */
- char *typname; /* name of the type */
- Oid typoid; /* local type Oid */
+ Oid remoteid; /* unique id of the remote type */
+ char *nspname; /* schema name of remote type */
+ char *typname; /* name of the remote type */
} LogicalRepTyp;
/* Transaction info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index d4250c2608..73e4805827 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
#endif /* LOGICALRELATION_H */
--
2.11.0
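To illustrate the fallback behaviour the patch gives logicalrep_typmap_gettypname(), here is a minimal standalone C sketch. It is not PostgreSQL code: the RemoteTypeEntry struct, the sample_typmap table, its OIDs and the remote_type_display_name() helper are all hypothetical stand-ins, and the built-in-type branch of the real function is left out. It only shows that the lookup never raises an error and falls back to an "unrecognized <oid>" string.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for a type map entry received from the publisher. */
typedef struct RemoteTypeEntry
{
    unsigned int remoteid;      /* type OID on the publisher */
    const char *nspname;        /* schema name of the remote type */
    const char *typname;        /* name of the remote type */
} RemoteTypeEntry;

/* Made-up sample data; the OIDs are illustrative only. */
static const RemoteTypeEntry sample_typmap[] = {
    {16385, "public", "dummyint"},
    {16390, "public", "dummytext"},
};

/*
 * Write a display name for a remote type OID into buf.
 *
 * This never fails: an OID with no map entry yields "unrecognized <oid>".
 * An error-context callback should not raise a second error while the
 * first one is still being reported.
 */
static void
remote_type_display_name(unsigned int remoteid, char *buf, size_t buflen)
{
    size_t n = sizeof(sample_typmap) / sizeof(sample_typmap[0]);
    size_t i;

    for (i = 0; i < n; i++)
    {
        if (sample_typmap[i].remoteid == remoteid)
        {
            /* Found: report the schema-qualified remote name. */
            snprintf(buf, buflen, "%s.%s",
                     sample_typmap[i].nspname, sample_typmap[i].typname);
            return;
        }
    }

    /* Not found: substitute a placeholder instead of erroring out. */
    snprintf(buf, buflen, "unrecognized %u", remoteid);
}
```

A caller would pass a local buffer, for example char buf[64]; remote_type_display_name(16390, buf, sizeof(buf));. The real function returns a palloc'd string instead, which is the more natural choice inside the backend.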
Masahiko Sawada wrote:
Regarding the regression test, I added a new test module
test_subscription that creates a new user-defined data type. In a
subscription regression test, using test_subscription we make
subscriber call slot_store_error_callback and check if the subscriber
can correctly look up both remote and local data type strings. One
downside of this regression test is that in a failure case, the
duration of the test will be long (up to 180sec) because it has to
wait for the polling timeout.
Attached an updated patch with a regression test.
Pushed the fix to pg10 and master. Thanks to all involved for the
report, patches and review.
Here's the regression test patch. The problem with it is that the TAP
test is not verifying much -- I tried applying it before the fix commit,
and it succeeds! The only funny thing is that the errcontext messages are
wrong; they look like this:
2018-03-14 20:31:03.564 -03 [763018] LOG: input int: 1
2018-03-14 20:31:03.564 -03 [763018] CONTEXT: processing remote data for replication target relation "public.test" column "b", remote type dummytext, local type dummyint
2018-03-14 20:31:03.564 -03 [763018] LOG: input text: "one"
2018-03-14 20:31:03.564 -03 [763018] CONTEXT: processing remote data for replication target relation "public.test" column "a", remote type dummyint, local type dummytext
but I think it would be better to verify them. (With your version I
think you were trusting that the OID would not match anything, giving
rise to the "cache lookup failed" error before the patch. I'm not sure
that that's very trustworthy either.)
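One plausible reading of the swapped names in the log above: the pre-fix callback passed the publisher's type OID to format_type_be(), which resolves it against the subscriber's catalog. The standalone C sketch below models that with two tiny made-up catalogs. The OIDs, the TypeRow struct and lookup_name() are assumptions for illustration only, not PostgreSQL APIs.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical one-row-per-type catalog, standing in for pg_type. */
typedef struct TypeRow
{
    unsigned int oid;
    const char *name;
} TypeRow;

/*
 * Made-up OIDs.  The two nodes created the extensions in opposite order,
 * so the same OID names a different type on each side.
 */
static const TypeRow publisher_types[] = {
    {16385, "dummyint"},
    {16390, "dummytext"},
};
static const TypeRow subscriber_types[] = {
    {16385, "dummytext"},
    {16390, "dummyint"},
};

/* Return the type name for oid in the given catalog, or NULL if absent. */
static const char *
lookup_name(const TypeRow *catalog, size_t nrows, unsigned int oid)
{
    size_t i;

    for (i = 0; i < nrows; i++)
    {
        if (catalog[i].oid == oid)
            return catalog[i].name;
    }
    return NULL;
}
```

With this data, lookup_name(subscriber_types, 2, 16385) yields "dummytext" even though the publisher means "dummyint" by OID 16385. When the OID is absent from the local catalog altogether, the lookup returns NULL, which loosely corresponds to the "cache lookup failed" case reported at the start of the thread.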
I think this is a worthwhile test, but IMO it should be improved a bit
before we include it. Also, we can come up with a better name for the
test surely, not just refer to this particular bug. Maybe "typemap".
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachments:
0001-test-module.patch (text/plain; charset=us-ascii)
From 2da536c0f6a081a50cb708de3ad6631345ec8dca Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 14 Mar 2018 16:47:44 -0300
Subject: [PATCH] test module
---
src/test/modules/Makefile | 1 +
src/test/modules/test_subscription/Makefile | 18 +++++++
.../modules/test_subscription/testsub1--1.0.sql | 22 +++++++++
src/test/modules/test_subscription/testsub1.c | 42 ++++++++++++++++
.../modules/test_subscription/testsub1.control | 4 ++
.../modules/test_subscription/testsub2--1.0.sql | 22 +++++++++
src/test/modules/test_subscription/testsub2.c | 45 +++++++++++++++++
.../modules/test_subscription/testsub2.control | 4 ++
src/test/subscription/Makefile | 2 +-
src/test/subscription/t/010_slot_store_error.pl | 56 ++++++++++++++++++++++
10 files changed, 215 insertions(+), 1 deletion(-)
create mode 100644 src/test/modules/test_subscription/Makefile
create mode 100644 src/test/modules/test_subscription/testsub1--1.0.sql
create mode 100644 src/test/modules/test_subscription/testsub1.c
create mode 100644 src/test/modules/test_subscription/testsub1.control
create mode 100644 src/test/modules/test_subscription/testsub2--1.0.sql
create mode 100644 src/test/modules/test_subscription/testsub2.c
create mode 100644 src/test/modules/test_subscription/testsub2.control
create mode 100644 src/test/subscription/t/010_slot_store_error.pl
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 7294b6958b..a455634538 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -17,6 +17,7 @@ SUBDIRS = \
test_rbtree \
test_rls_hooks \
test_shm_mq \
+ test_subscription \
worker_spi
all: submake-generated-headers
diff --git a/src/test/modules/test_subscription/Makefile b/src/test/modules/test_subscription/Makefile
new file mode 100644
index 0000000000..12993e0ac5
--- /dev/null
+++ b/src/test/modules/test_subscription/Makefile
@@ -0,0 +1,18 @@
+# src/test/modules/test_subscription/Makefile
+
+MODULES = testsub1 testsub2
+EXTENSION = testsub1 testsub2
+DATA = testsub1--1.0.sql testsub2--1.0.sql
+
+PGFILEDESC = "test_subscription - mock extensions for subscription test"
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_subscription
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_subscription/testsub1--1.0.sql b/src/test/modules/test_subscription/testsub1--1.0.sql
new file mode 100644
index 0000000000..43ab7187df
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub1--1.0.sql
@@ -0,0 +1,22 @@
+/* src/test/modules/test_subscription/testsub1--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION testsub1" to load this file. \quit
+
+CREATE TYPE dummytext;
+
+CREATE FUNCTION dummytext_in(cstring)
+RETURNS dummytext
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
+
+CREATE FUNCTION dummytext_out(dummytext)
+RETURNS cstring
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
+
+CREATE TYPE dummytext (
+ INTERNALLENGTH = -1,
+ INPUT = dummytext_in,
+ OUTPUT = dummytext_out
+);
diff --git a/src/test/modules/test_subscription/testsub1.c b/src/test/modules/test_subscription/testsub1.c
new file mode 100644
index 0000000000..e94fbe735a
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub1.c
@@ -0,0 +1,42 @@
+/*--------------------------------------------------------------------------
+ *
+ * testsub1.c
+ * Code for testing logical replication subscriptions.
+ *
+ * Copyright (c) 2018, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_subscription/testsub1.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "utils/builtins.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(dummytext_in);
+PG_FUNCTION_INFO_V1(dummytext_out);
+
+/* Dummy input of data type function */
+Datum
+dummytext_in(PG_FUNCTION_ARGS)
+{
+ char *inputText = PG_GETARG_CSTRING(0);
+
+ if (inputText)
+ elog(LOG, "input text: \"%s\"", inputText);
+
+ PG_RETURN_TEXT_P(cstring_to_text(inputText));
+}
+
+/* Dummy output of data type function */
+Datum
+dummytext_out(PG_FUNCTION_ARGS)
+{
+ Datum txt = PG_GETARG_DATUM(0);
+
+ PG_RETURN_CSTRING(TextDatumGetCString(txt));
+}
diff --git a/src/test/modules/test_subscription/testsub1.control b/src/test/modules/test_subscription/testsub1.control
new file mode 100644
index 0000000000..746bba3442
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub1.control
@@ -0,0 +1,4 @@
+comment = 'extension testsub1 to test subscriptions'
+default_version = '1.0'
+module_pathname = '$libdir/testsub1'
+relocatable = true
diff --git a/src/test/modules/test_subscription/testsub2--1.0.sql b/src/test/modules/test_subscription/testsub2--1.0.sql
new file mode 100644
index 0000000000..4c94dd3c92
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub2--1.0.sql
@@ -0,0 +1,22 @@
+/* src/test/modules/test_subscription/testsub2--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION testsub2" to load this file. \quit
+
+CREATE TYPE dummyint;
+
+CREATE FUNCTION dummyint_in(cstring)
+RETURNS dummyint
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
+
+CREATE FUNCTION dummyint_out(dummyint)
+RETURNS cstring
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
+
+CREATE TYPE dummyint (
+ LIKE = pg_catalog.int4,
+ INPUT = dummyint_in,
+ OUTPUT = dummyint_out
+);
diff --git a/src/test/modules/test_subscription/testsub2.c b/src/test/modules/test_subscription/testsub2.c
new file mode 100644
index 0000000000..d1a91b6656
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub2.c
@@ -0,0 +1,45 @@
+/*--------------------------------------------------------------------------
+ *
+ * testsub2.c
+ * Code for testing logical replication subscriptions.
+ *
+ * Copyright (c) 2018, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_subscription/testsub2.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "utils/builtins.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(dummyint_in);
+PG_FUNCTION_INFO_V1(dummyint_out);
+
+/* Dummy input of data type function */
+Datum
+dummyint_in(PG_FUNCTION_ARGS)
+{
+ char *num = PG_GETARG_CSTRING(0);
+ int32 val;
+
+ val = pg_atoi(num, sizeof(int32), '\0');
+ elog(LOG, "input int: %d", val);
+
+ PG_RETURN_INT32(val);
+}
+
+/* Dummy output of data type function */
+Datum
+dummyint_out(PG_FUNCTION_ARGS)
+{
+ int32 arg1 = PG_GETARG_INT32(0);
+ char *result = (char *) palloc(12); /* sign, 10 digits, '\0' */
+
+ pg_ltoa(arg1, result);
+ PG_RETURN_CSTRING(result);
+}
diff --git a/src/test/modules/test_subscription/testsub2.control b/src/test/modules/test_subscription/testsub2.control
new file mode 100644
index 0000000000..8a0341c86f
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub2.control
@@ -0,0 +1,4 @@
+comment = 'extension testsub2 to test subscriptions'
+default_version = '1.0'
+module_pathname = '$libdir/testsub2'
+relocatable = true
diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile
index 0f3d2098ad..f5e76bb264 100644
--- a/src/test/subscription/Makefile
+++ b/src/test/subscription/Makefile
@@ -13,7 +13,7 @@ subdir = src/test/subscription
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-EXTRA_INSTALL = contrib/hstore
+EXTRA_INSTALL = contrib/hstore src/test/modules/test_subscription
check:
$(prove_check)
diff --git a/src/test/subscription/t/010_slot_store_error.pl b/src/test/subscription/t/010_slot_store_error.pl
new file mode 100644
index 0000000000..b525bc03dc
--- /dev/null
+++ b/src/test/subscription/t/010_slot_store_error.pl
@@ -0,0 +1,56 @@
+# This tests that the errors when data type conversion are correctly
+# handled by logical replication apply workers
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Setup same table by different steps so that publisher and subscriber get
+# different datatype OIDs.
+$node_publisher->safe_psql('postgres', qq[
+CREATE EXTENSION testsub2;
+CREATE EXTENSION testsub1;
+CREATE TABLE test (a dummytext, b dummyint);]);
+
+$node_subscriber->safe_psql('postgres', qq[
+CREATE EXTENSION testsub1;
+CREATE EXTENSION testsub2;
+CREATE TABLE test (b dummyint, a dummytext);]);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', qq[
+CREATE PUBLICATION tap_pub FOR TABLE test
+]);
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres', qq[
+CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot, copy_data = false)
+]);
+
+# Insert test data, which will lead to call the callback function for the date
+# type conversion on subscriber.
+$node_publisher->safe_psql('postgres', qq(
+INSERT INTO test VALUES ('one', '1');
+));
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check the data on subscriber
+my $result = $node_subscriber->safe_psql('postgres', qq(
+SELECT a FROM test;
+));
+
+# Inserted data is replicated correctly
+is( $result, 'one');
--
2.11.0
On Thu, Mar 15, 2018 at 9:41 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
Masahiko Sawada wrote:
Regarding the regression test, I added a new test module
test_subscription that creates a new user-defined data type. In a
subscription regression test, using test_subscription we make
subscriber call slot_store_error_callback and check if the subscriber
can correctly look up both remote and local data type strings. One
downside of this regression test is that in a failure case, the
duration of the test will be long (up to 180sec) because it has to
wait for the polling timeout.
Attached an updated patch with a regression test.
Pushed the fix to pg10 and master. Thanks to all involved for the
report, patches and review.
Thank you for committing!
Here's the regression test patch. The problem with it is that the TAP
test is not verifying much -- I tried applying it before the fix commit,
and it succeeds! The only funny thing is that the errcontext messages are
wrong; they look like this:
2018-03-14 20:31:03.564 -03 [763018] LOG: input int: 1
2018-03-14 20:31:03.564 -03 [763018] CONTEXT: processing remote data for replication target relation "public.test" column "b", remote type dummytext, local type dummyint
2018-03-14 20:31:03.564 -03 [763018] LOG: input text: "one"
2018-03-14 20:31:03.564 -03 [763018] CONTEXT: processing remote data for replication target relation "public.test" column "a", remote type dummyint, local type dummytext
but I think it would be better to verify them. (With your version I
think you were trusting that the OID would not match anything, giving
rise to the "cache lookup failed" error before the patch. I'm not sure
that that's very trustworthy either.)
Agreed. Also, since my patch tried to provoke the error, it takes a
long time to find out that the test failed, which is not good.
I think this is a worthwhile test, but IMO it should be improved a bit
before we include it. Also, we can come up with a better name for the
test surely, not just refer to this particular bug. Maybe "typemap".
It might be useful if we have the facility of TAP test to check the
log message and regexp-match the message to the expected string.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Masahiko Sawada wrote:
On Thu, Mar 15, 2018 at 9:41 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
I think this is a worthwhile test, but IMO it should be improved a bit
before we include it. Also, we can come up with a better name for the
test surely, not just refer to this particular bug. Maybe "typemap".
It might be useful if we have the facility of TAP test to check the
log message and regexp-match the message to the expected string.
Something similar to PostgresNode::issues_sql_like() perhaps?
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Masahiko Sawada wrote:
Regarding the regression test, I added a new test module
test_subscription that creates a new user-defined data type. In a
subscription regression test, using test_subscription we make
subscriber call slot_store_error_callback and check if the subscriber
can correctly look up both remote and local data type strings. One
downside of this regression test is that in a failure case, the
duration of the test will be long (up to 180sec) because it has to
wait for the polling timeout.
Attached an updated patch with a regression test.
Pushed the fix to pg10 and master. Thanks to all involved for the report,
patches and review.
Thank you all for fixing it.
---
Dang Minh Huong
NEC Solution Innovators, Ltd.
http://www.nec-solutioninnovators.co.jp/en/
On Fri, Mar 16, 2018 at 10:24 AM, Alvaro Herrera
<alvherre@alvh.no-ip.org> wrote:
Masahiko Sawada wrote:
On Thu, Mar 15, 2018 at 9:41 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
I think this is a worthwhile test, but IMO it should be improved a bit
before we include it. Also, we can come up with a better name for the
test surely, not just refer to this particular bug. Maybe "typemap".
It might be useful if we have the facility of TAP test to check the
log message and regexp-match the message to the expected string.
Something similar to PostgresNode::issues_sql_like() perhaps?
Yeah, I didn't know about that, but I think it's a good idea. Unlike
issues_sql_like(), we don't issue anything to the subscriber. So maybe
we need to truncate the log file before the insertion and verify only
what is logged after that point. The test code would look like this:
$node_subscriber->safe_psql('postgres', 'CREATE SUBSCRIPTION...');
truncate $node_subscriber->logfile, 0;
$node_publisher->safe_psql('postgres', 'INSERT .. ');
my $log = TestLib::slurp_file($node_subscriber->logfile);
# Verify logs
like($log, qr/processing remote data for replication target relation "public.test" column "b", remote type dummyint, local type dummyint/,
'callback function of datatype conversion1');
like($log, qr/processing remote data for replication target relation "public.test" column "a", remote type dummytext, local type dummytext/,
'callback function of datatype conversion2');
Thoughts?
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
At Mon, 19 Mar 2018 12:50:55 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in <CAD21AoD=kiDVxfZ2aT_Oeg7+5etkxg0eqmsRE-gcUbptKNir6g@mail.gmail.com>
On Fri, Mar 16, 2018 at 10:24 AM, Alvaro Herrera
<alvherre@alvh.no-ip.org> wrote:
Masahiko Sawada wrote:
On Thu, Mar 15, 2018 at 9:41 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
I think this is a worthwhile test, but IMO it should be improved a bit
before we include it. Also, we can come up with a better name for the
test surely, not just refer to this particular bug. Maybe "typemap".
It might be useful if we have the facility of TAP test to check the
log message and regexp-match the message to the expected string.
Something similar to PostgresNode::issues_sql_like() perhaps?
Yeah, I didn't know about that, but I think it's a good idea. Unlike
issues_sql_like(), we don't issue anything to the subscriber. So maybe
we need to truncate the log file before the insertion and verify only
what is logged after that point. The test code would look like this:
$node_subscriber->safe_psql('postgres', 'CREATE SUBSCRIPTION...');
truncate $node_subscriber->logfile, 0;
$node_publisher->safe_psql('postgres', 'INSERT .. ');
my $log = TestLib::slurp_file($node_subscriber->logfile);
# Verify logs
like($log, qr/processing remote data for replication target relation "public.test" column "b", remote type dummyint, local type dummyint/,
'callback function of datatype conversion1');
like($log, qr/processing remote data for replication target relation "public.test" column "a", remote type dummytext, local type dummytext/,
'callback function of datatype conversion2');
Thoughts?
FWIW, something like that is in a currently proposed patch.
/messages/by-id/20180129.194023.228030941.horiguchi.kyotaro@lab.ntt.co.jp
0003 contains the following function.
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+ my ($node, $pat, $off) = @_;
+
+ $off = 0 unless defined $off;
+ my $log = TestLib::slurp_file($node->logfile);
+ return 0 if (length($log) <= $off);
+
+ $log = substr($log, $off);
+
+ return $log =~ m/$pat/;
+}
It is used as follows.
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+ if (find_in_log($node_standby,
+ "requested WAL segment [0-9A-F]+ has already been removed",
+ $logstart))
+ {
+ $failed = 1;
+ last;
+ }
+ usleep(100_000);
+}
+ok($failed, 'check replication has been broken');
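For readers who do not read Perl, the offset-based search that find_in_log performs can be sketched in standalone C using POSIX regcomp()/regexec(). find_in_log_text() is a hypothetical name, it works on an in-memory string rather than a node's log file, and POSIX extended regular expressions are not identical to Perl's, so treat it as an approximation of the idea only.

```c
#include <assert.h>
#include <regex.h>
#include <stddef.h>
#include <string.h>

/*
 * Search log text for pattern pat, ignoring the first off bytes.
 *
 * Returns 1 on a match, 0 on no match (or if the log is not longer than
 * off), and -1 if the pattern cannot be compiled.  Skipping the first
 * off bytes is what lets a test ignore messages written before the step
 * it is interested in.
 */
static int
find_in_log_text(const char *log, const char *pat, size_t off)
{
    regex_t re;
    int rc;

    /* Nothing new has been written since the recorded offset. */
    if (strlen(log) <= off)
        return 0;

    if (regcomp(&re, pat, REG_EXTENDED | REG_NOSUB) != 0)
        return -1;

    /* Match only against the part of the log after the offset. */
    rc = regexec(&re, log + off, 0, NULL, 0);
    regfree(&re);

    return rc == 0;
}
```

Recording the log size before the action under test and passing it as off avoids truncating the log file, so earlier messages remain available for debugging.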
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
On Mon, Mar 19, 2018 at 12:50 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Fri, Mar 16, 2018 at 10:24 AM, Alvaro Herrera
<alvherre@alvh.no-ip.org> wrote:
Masahiko Sawada wrote:
On Thu, Mar 15, 2018 at 9:41 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
I think this is a worthwhile test, but IMO it should be improved a bit
before we include it. Also, we can come up with a better name for the
test surely, not just refer to this particular bug. Maybe "typemap".
It might be useful if we have the facility of TAP test to check the
log message and regexp-match the message to the expected string.
Something similar to PostgresNode::issues_sql_like() perhaps?
Yeah, I didn't know about that, but I think it's a good idea. Unlike
issues_sql_like(), we don't issue anything to the subscriber. So maybe
we need to truncate the log file before the insertion and verify only
what is logged after that point. The test code would look like this:
$node_subscriber->safe_psql('postgres', 'CREATE SUBSCRIPTION...');
truncate $node_subscriber->logfile, 0;
$node_publisher->safe_psql('postgres', 'INSERT .. ');
my $log = TestLib::slurp_file($node_subscriber->logfile);
# Verify logs
like($log, qr/processing remote data for replication target relation "public.test" column "b", remote type dummyint, local type dummyint/,
'callback function of datatype conversion1');
like($log, qr/processing remote data for replication target relation "public.test" column "a", remote type dummytext, local type dummytext/,
'callback function of datatype conversion2');
Thoughts?
Attached is an updated test patch that adds the above test (the 0002
patch). Since the existing test functions are enough for this test case,
I didn't create new ones. I also found that the local data type name in
the log message for data type conversion isn't schema-qualified, whereas
the remote data type name always is. The attached 0001 patch fixes that.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
0001-Qualify-datatype-name-in-log-of-data-type-conversion.patch (application/octet-stream)
From f8415b4683b099421c0039ffc467de25dd5ea02e Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 19 Mar 2018 19:44:36 +0900
Subject: [PATCH 1/2] Qualify datatype name in log of data type conversion on subscriber.
---
src/backend/replication/logical/worker.c | 2 +-
1 files changed, 1 insertions(+), 1 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fdace7e..319d0bf 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -306,7 +306,7 @@ slot_store_error_callback(void *arg)
rel->remoterel.nspname, rel->remoterel.relname,
rel->remoterel.attnames[errarg->remote_attnum],
remotetypname,
- format_type_be(localtypoid));
+ format_type_be_qualified(localtypoid));
}
/*
--
1.7.1
0002-test-module.patch (application/octet-stream)
From 563b2e7b6564c9444afe4e06f6f99c36a3e8403d Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 19 Mar 2018 19:43:44 +0900
Subject: [PATCH 2/2] test module.
---
src/test/modules/Makefile | 1 +
src/test/modules/test_subscription/Makefile | 18 +++++
src/test/modules/test_subscription/TAGS | 1 +
.../modules/test_subscription/testsub1--1.0.sql | 22 ++++++
src/test/modules/test_subscription/testsub1.c | 42 ++++++++++++
.../modules/test_subscription/testsub1.control | 4 +
.../modules/test_subscription/testsub2--1.0.sql | 22 ++++++
src/test/modules/test_subscription/testsub2.c | 45 ++++++++++++
.../modules/test_subscription/testsub2.control | 4 +
src/test/subscription/Makefile | 2 +-
src/test/subscription/t/010_typemap.pl | 71 ++++++++++++++++++++
11 files changed, 231 insertions(+), 1 deletions(-)
create mode 100644 src/test/modules/test_subscription/Makefile
create mode 120000 src/test/modules/test_subscription/TAGS
create mode 100644 src/test/modules/test_subscription/testsub1--1.0.sql
create mode 100644 src/test/modules/test_subscription/testsub1.c
create mode 100644 src/test/modules/test_subscription/testsub1.control
create mode 100644 src/test/modules/test_subscription/testsub2--1.0.sql
create mode 100644 src/test/modules/test_subscription/testsub2.c
create mode 100644 src/test/modules/test_subscription/testsub2.control
create mode 100644 src/test/subscription/t/010_typemap.pl
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 7294b69..a455634 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -17,6 +17,7 @@ SUBDIRS = \
test_rbtree \
test_rls_hooks \
test_shm_mq \
+ test_subscription \
worker_spi
all: submake-generated-headers
diff --git a/src/test/modules/test_subscription/Makefile b/src/test/modules/test_subscription/Makefile
new file mode 100644
index 0000000..12993e0
--- /dev/null
+++ b/src/test/modules/test_subscription/Makefile
@@ -0,0 +1,18 @@
+# src/test/modules/test_subscription/Makefile
+
+MODULES = testsub1 testsub2
+EXTENSION = testsub1 testsub2
+DATA = testsub1--1.0.sql testsub2--1.0.sql
+
+PGFILEDESC = "test_subscription - mock extensions for subscription test"
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_subscription
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_subscription/TAGS b/src/test/modules/test_subscription/TAGS
new file mode 120000
index 0000000..1a96393
--- /dev/null
+++ b/src/test/modules/test_subscription/TAGS
@@ -0,0 +1 @@
+/home/masahiko/source/postgresql/TAGS
\ No newline at end of file
diff --git a/src/test/modules/test_subscription/testsub1--1.0.sql b/src/test/modules/test_subscription/testsub1--1.0.sql
new file mode 100644
index 0000000..43ab718
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub1--1.0.sql
@@ -0,0 +1,22 @@
+/* src/test/modules/test_subscription/testsub1--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION testsub1" to load this file. \quit
+
+CREATE TYPE dummytext;
+
+CREATE FUNCTION dummytext_in(cstring)
+RETURNS dummytext
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
+
+CREATE FUNCTION dummytext_out(dummytext)
+RETURNS cstring
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
+
+CREATE TYPE dummytext (
+ INTERNALLENGTH = -1,
+ INPUT = dummytext_in,
+ OUTPUT = dummytext_out
+);
diff --git a/src/test/modules/test_subscription/testsub1.c b/src/test/modules/test_subscription/testsub1.c
new file mode 100644
index 0000000..e94fbe7
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub1.c
@@ -0,0 +1,42 @@
+/*--------------------------------------------------------------------------
+ *
+ * testsub1.c
+ * Code for testing logical replication subscriptions.
+ *
+ * Copyright (c) 2018, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_subscription/testsub1.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "utils/builtins.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(dummytext_in);
+PG_FUNCTION_INFO_V1(dummytext_out);
+
+/* Dummy input function of the data type */
+Datum
+dummytext_in(PG_FUNCTION_ARGS)
+{
+ char *inputText = PG_GETARG_CSTRING(0);
+
+ if (inputText)
+ elog(LOG, "input text: \"%s\"", inputText);
+
+ PG_RETURN_TEXT_P(cstring_to_text(inputText));
+}
+
+/* Dummy output function of the data type */
+Datum
+dummytext_out(PG_FUNCTION_ARGS)
+{
+ Datum txt = PG_GETARG_DATUM(0);
+
+ PG_RETURN_CSTRING(TextDatumGetCString(txt));
+}
diff --git a/src/test/modules/test_subscription/testsub1.control b/src/test/modules/test_subscription/testsub1.control
new file mode 100644
index 0000000..746bba3
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub1.control
@@ -0,0 +1,4 @@
+comment = 'extension testsub1 to test subscriptions'
+default_version = '1.0'
+module_pathname = '$libdir/testsub1'
+relocatable = true
diff --git a/src/test/modules/test_subscription/testsub2--1.0.sql b/src/test/modules/test_subscription/testsub2--1.0.sql
new file mode 100644
index 0000000..4c94dd3
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub2--1.0.sql
@@ -0,0 +1,22 @@
+/* src/test/modules/test_subscription/testsub2--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION testsub2" to load this file. \quit
+
+CREATE TYPE dummyint;
+
+CREATE FUNCTION dummyint_in(cstring)
+RETURNS dummyint
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
+
+CREATE FUNCTION dummyint_out(dummyint)
+RETURNS cstring
+AS 'MODULE_PATHNAME'
+LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
+
+CREATE TYPE dummyint (
+ LIKE = pg_catalog.int4,
+ INPUT = dummyint_in,
+ OUTPUT = dummyint_out
+);
diff --git a/src/test/modules/test_subscription/testsub2.c b/src/test/modules/test_subscription/testsub2.c
new file mode 100644
index 0000000..d1a91b6
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub2.c
@@ -0,0 +1,45 @@
+/*--------------------------------------------------------------------------
+ *
+ * testsub2.c
+ * Code for testing logical replication subscriptions.
+ *
+ * Copyright (c) 2018, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_subscription/testsub2.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "utils/builtins.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(dummyint_in);
+PG_FUNCTION_INFO_V1(dummyint_out);
+
+/* Dummy input function of the data type */
+Datum
+dummyint_in(PG_FUNCTION_ARGS)
+{
+ char *num = PG_GETARG_CSTRING(0);
+ int32 val;
+
+ val = pg_atoi(num, sizeof(int32), '\0');
+ elog(LOG, "input int: %d", val);
+
+ PG_RETURN_INT32(val);
+}
+
+/* Dummy output function of the data type */
+Datum
+dummyint_out(PG_FUNCTION_ARGS)
+{
+ int32 arg1 = PG_GETARG_INT32(0);
+ char *result = (char *) palloc(12); /* sign, 10 digits, '\0' */
+
+ pg_ltoa(arg1, result);
+ PG_RETURN_CSTRING(result);
+}
diff --git a/src/test/modules/test_subscription/testsub2.control b/src/test/modules/test_subscription/testsub2.control
new file mode 100644
index 0000000..8a0341c
--- /dev/null
+++ b/src/test/modules/test_subscription/testsub2.control
@@ -0,0 +1,4 @@
+comment = 'extension testsub2 to test subscriptions'
+default_version = '1.0'
+module_pathname = '$libdir/testsub2'
+relocatable = true
diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile
index 0f3d209..f5e76bb 100644
--- a/src/test/subscription/Makefile
+++ b/src/test/subscription/Makefile
@@ -13,7 +13,7 @@ subdir = src/test/subscription
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-EXTRA_INSTALL = contrib/hstore
+EXTRA_INSTALL = contrib/hstore src/test/modules/test_subscription
check:
$(prove_check)
diff --git a/src/test/subscription/t/010_typemap.pl b/src/test/subscription/t/010_typemap.pl
new file mode 100644
index 0000000..f893622
--- /dev/null
+++ b/src/test/subscription/t/010_typemap.pl
@@ -0,0 +1,71 @@
+# Test that errors during data type conversion are correctly
+# handled by logical replication apply workers.
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Set up the same table in a different order of steps so that the
+# publisher and subscriber get different datatype OIDs.
+$node_publisher->safe_psql('postgres', qq[
+CREATE EXTENSION testsub2;
+CREATE EXTENSION testsub1;
+CREATE TABLE test (a dummytext, b dummyint);]);
+
+$node_subscriber->safe_psql('postgres', qq[
+CREATE EXTENSION testsub1;
+CREATE EXTENSION testsub2;
+CREATE TABLE test (b dummyint, a dummytext);]);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', qq[
+CREATE PUBLICATION tap_pub FOR TABLE test
+]);
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres', qq[
+CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot, copy_data = false)
+]);
+
+# Truncate the logfile on subscriber before insertion in
+# order to capture logs emitted by the callback function
+# for the datatype conversion.
+truncate $node_subscriber->logfile, 0;
+
+# Insert test data, which makes the subscriber invoke the callback
+# function for the data type conversion.
+$node_publisher->safe_psql('postgres', qq(
+INSERT INTO test VALUES ('one', '1');
+));
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check the callback function behavior for datatype conversion
+# by checking the logs.
+my $log = TestLib::slurp_file($node_subscriber->logfile);
+like ($log,
+ qr/processing remote data for replication target relation "public.test" column "a", remote type public.dummytext, local type public.dummytext/,
+ 'callback function of datatype conversion1');
+like ($log,
+ qr/processing remote data for replication target relation "public.test" column "b", remote type public.dummyint, local type public.dummyint/,
+ 'callback function of datatype conversion2');
+
+# Check the data on subscriber
+my $result = $node_subscriber->safe_psql('postgres', qq(
+SELECT a FROM test;
+));
+
+# Inserted data is replicated correctly
+is($result, 'one', 'inserted data is replicated correctly');
--
1.7.1
On Mon, Mar 19, 2018 at 7:57 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Mon, Mar 19, 2018 at 12:50 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Fri, Mar 16, 2018 at 10:24 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
Masahiko Sawada wrote:
On Thu, Mar 15, 2018 at 9:41 AM, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
I think this is a worthwhile test, but IMO it should be improved a bit
before we include it. Also, we can come up with a better name for the
test surely, not just refer to this particular bug. Maybe "typemap".
It might be useful if we have the facility of TAP test to check the
log message and regexp-match the message to the expected string.
Something similar to PostgresNode::issues_sql_like() perhaps?
Yeah, I didn't know that but I think it's a good idea. Unlike
issues_sql_like() we don't issue anything to the subscriber. So maybe
we need truncate logfile before insertion and verify logfile of
particular period. The test code would be like follows.
$node_subscriber->safe_psql('postgres', 'CREATE SUBSCRIPTION...");
truncate $node_subscriber->logfile, 0;
$node_publisher->safe_psql('postgres', 'INSERT .. ')
my $log = TestLib::slurp_file($node_subscriber->logfile);
# Verify logs
like($log, qr/processing remote data for replication target relation
"public.test" column "b", remote type dummyint, local type dummyint/,
'callback function of datatype conversion1');
like($log, qr/processing remote data for replication target relation
"public.test" column "a", remote type dummytext, local type
dummytext/, 'callback function of datatype conversion2');
Thoughts?
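The truncate-then-match approach proposed above is not specific to Perl. As a rough illustration only, the same three steps (empty the log, let new output arrive, slurp the file and match a pattern) can be sketched in plain C with POSIX regex.h. The helper names truncate_log(), slurp_file() and log_matches() are made up for this sketch and are not part of PostgreSQL or its TAP framework.

```c
#include <assert.h>
#include <regex.h>
#include <stdio.h>
#include <stdlib.h>

/* Empty the log so later matches only see newly written output. */
static int
truncate_log(const char *path)
{
	FILE	   *fp = fopen(path, "w");

	if (fp == NULL)
		return -1;
	return fclose(fp);
}

/* Read the whole file into a malloc'd, NUL-terminated buffer. */
static char *
slurp_file(const char *path)
{
	FILE	   *fp = fopen(path, "rb");
	long		len;
	char	   *buf;

	if (fp == NULL)
		return NULL;
	if (fseek(fp, 0, SEEK_END) != 0 || (len = ftell(fp)) < 0)
	{
		fclose(fp);
		return NULL;
	}
	rewind(fp);
	buf = malloc((size_t) len + 1);
	if (buf != NULL)
	{
		size_t		nread = fread(buf, 1, (size_t) len, fp);

		buf[nread] = '\0';
	}
	fclose(fp);
	return buf;
}

/*
 * Return 1 if the log content matches the POSIX extended regular
 * expression, 0 if it does not, and -1 on any error.
 */
static int
log_matches(const char *path, const char *pattern)
{
	regex_t		re;
	char	   *log;
	int			rc;

	assert(path != NULL && pattern != NULL);
	if (regcomp(&re, pattern, REG_EXTENDED | REG_NOSUB) != 0)
		return -1;
	log = slurp_file(path);
	if (log == NULL)
	{
		regfree(&re);
		return -1;
	}
	rc = (regexec(&re, log, 0, NULL, 0) == 0) ? 1 : 0;
	free(log);
	regfree(&re);
	return rc;
}
```

Truncating first matters because the subscriber log may already hold similar lines from earlier activity. Without it, a match could come from stale output rather than from the INSERT under test.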
Attached is an updated test patch that adds the above test (the 0002
patch). Since the existing test functions are enough for this test
case, I didn't create new ones. I also found that the local data type
name in the log message for data type conversion isn't schema-qualified,
whereas the remote data type name always is. The attached 0001 patch
fixes that.
The original issue has been fixed and this CommitFest entry has been
marked as "Committed", but there is still work to do on improving the
tests. Perhaps I should register a new entry for the remaining patches
in the next CommitFest.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
2018-04-11 10:16 GMT+09:00 Masahiko Sawada <sawada.mshk@gmail.com>:
On Mon, Mar 19, 2018 at 7:57 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Attached is an updated test patch that adds the above test (the 0002
patch). Since the existing test functions are enough for this test
case, I didn't create new ones. I also found that the local data type
name in the log message for data type conversion isn't schema-qualified,
whereas the remote data type name always is. The attached 0001 patch
fixes that.
The original issue has been fixed and this CommitFest entry has been
marked as "Committed", but there is still work to do on improving the
tests. Perhaps I should register a new entry for the remaining patches
in the next CommitFest.
Thanks. I appreciate it.
---
Dang Minh Huong