From e3af51822d1318743e554e24163390b74b254751 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 2 Feb 2017 09:05:20 +0900
Subject: [PATCH] Enable logical-replication between databases with different
 encodings

Unlike physical replication, logical replication may run between
databases with different encodings. This patch makes the subscriber
set client_encoding to its database encoding (unless conninfo already
specifies one), and makes the publisher convert the tuple data it
sends to that encoding.
---
 .../libpqwalreceiver/libpqwalreceiver.c            | 35 +++++++++++++++++++++-
 src/backend/replication/logical/proto.c            | 17 +++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 44a89c7..ef38af7 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -24,6 +24,7 @@
 #include "access/xlog.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "mb/pg_wchar.h"
 #include "replication/logicalproto.h"
 #include "replication/walreceiver.h"
 #include "storage/proc.h"
@@ -112,11 +113,43 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 				 char **err)
 {
 	WalReceiverConn *conn;
+	const char *myconninfo = conninfo;
 	const char *keys[5];
 	const char *vals[5];
 	int			i = 0;
 
 	/*
+	 * For logical replication, set database encoding as client_encoding if
+	 * not specified in conninfo
+	 */
+	if (logical)
+	{
+		PQconninfoOption   *opts = NULL;
+
+		opts = PQconninfoParse(conninfo, NULL);
+
+		if (opts != NULL)
+		{
+			while (opts->keyword != NULL &&
+				   strcmp(opts->keyword, "client_encoding") != 0)
+				opts++;
+
+			/* add client_encoding to conninfo if not set */
+			if (opts->keyword == NULL || opts->val == NULL)
+			{
+				StringInfoData s;
+
+				/* Assuming that the memory context here is properly set */
+				initStringInfo(&s);
+				appendStringInfoString(&s, conninfo);
+				appendStringInfo(&s, " client_encoding=\"%s\"",
+								 GetDatabaseEncodingName());
+				myconninfo = s.data;
+			}
+		}
+	}
+
+	/*
 	 * We use the expand_dbname parameter to process the connection string (or
 	 * URI), and pass some extra options. The deliberately undocumented
 	 * parameter "replication=true" makes it a replication connection. The
@@ -124,7 +157,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	 * "replication" for .pgpass lookup.
 	 */
 	keys[i] = "dbname";
-	vals[i] = conninfo;
+	vals[i] = myconninfo;
 	keys[++i] = "replication";
 	vals[i] = logical ? "database" : "true";
 	if (!logical)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1f30de6..97f928e 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -16,6 +16,7 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_type.h"
 #include "libpq/pqformat.h"
+#include "mb/pg_wchar.h"
 #include "replication/logicalproto.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -442,6 +443,22 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
 		pq_sendbyte(out, 't');	/* 'text' data follows */
 
 		outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
+		if (pg_get_client_encoding() != GetDatabaseEncoding())
+		{
+			char *p;
+
+			p = pg_server_to_client(outputstr, strlen(outputstr));
+
+			/* Error out if failed */
+			if (!p)
+				ereport(ERROR,
+						(errcode(ERRCODE_UNTRANSLATABLE_CHARACTER),
+						 errmsg("character conversion failed")));
+
+			pfree(outputstr);
+			outputstr = p;
+		}
+
 		len = strlen(outputstr) + 1;	/* null terminated */
 		pq_sendint(out, len, 4);		/* length */
 		appendBinaryStringInfo(out, outputstr, len); /* data */
-- 
2.9.2

