From 422114a0bc1d928d257505bf31e99397cb8a6a8c Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Wed, 23 Aug 2023 10:31:16 -0700
Subject: [PATCH v1] CREATE SUBSCRIPTION ... SERVER.

---
 contrib/dblink/dblink.c                       |  17 +-
 contrib/dblink/expected/dblink.out            |  58 ++++-
 contrib/dblink/sql/dblink.sql                 |  34 ++-
 .../postgres_fdw/expected/postgres_fdw.out    |   4 +-
 doc/src/sgml/dblink.sgml                      |  11 +-
 doc/src/sgml/ref/alter_subscription.sgml      |  18 +-
 doc/src/sgml/ref/create_server.sgml           |  68 +++++-
 doc/src/sgml/ref/create_subscription.sgml     |  17 +-
 doc/src/sgml/ref/create_user_mapping.sgml     |  75 ++++++
 doc/src/sgml/user-manag.sgml                  |  21 +-
 src/backend/catalog/pg_subscription.c         |  17 +-
 src/backend/catalog/system_functions.sql      |   2 +
 src/backend/commands/foreigncmds.c            | 117 +++++++--
 src/backend/commands/subscriptioncmds.c       | 207 ++++++++++++++--
 src/backend/foreign/foreign.c                 | 224 +++++++++++++++++-
 src/backend/parser/gram.y                     |  46 ++++
 src/backend/replication/logical/worker.c      |  12 +-
 src/bin/pg_dump/pg_dump.c                     |  75 ++++--
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/tab-complete.c                   |   5 +-
 src/include/catalog/pg_authid.dat             |   5 +
 src/include/catalog/pg_proc.dat               |   4 +
 src/include/catalog/pg_subscription.h         |   5 +-
 src/include/foreign/foreign.h                 |   1 +
 src/include/nodes/parsenodes.h                |   4 +
 src/test/regress/expected/foreign_data.out    |  42 ++++
 src/test/regress/expected/subscription.out    |  38 +++
 src/test/regress/sql/foreign_data.sql         |  34 +++
 src/test/regress/sql/subscription.sql         |  39 +++
 src/test/subscription/t/001_rep_changes.pl    |  57 +++++
 30 files changed, 1139 insertions(+), 119 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 41e1f6c91d..85263f3de6 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -2778,7 +2778,6 @@ get_connect_string(const char *servername)
 	UserMapping *user_mapping;
 	ListCell   *cell;
 	StringInfoData buf;
-	ForeignDataWrapper *fdw;
 	AclResult	aclresult;
 	char	   *srvname;
 
@@ -2815,20 +2814,24 @@ get_connect_string(const char *servername)
 		Oid			userid = GetUserId();
 
 		user_mapping = GetUserMapping(userid, serverid);
-		fdw = GetForeignDataWrapper(fdwid);
 
 		/* Check permissions, user must have usage on the server. */
 		aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
 		if (aclresult != ACLCHECK_OK)
 			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
 
-		foreach(cell, fdw->options)
+		if (OidIsValid(fdwid))
 		{
-			DefElem    *def = lfirst(cell);
+			ForeignDataWrapper *fdw = GetForeignDataWrapper(fdwid);
 
-			if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
-				appendStringInfo(&buf, "%s='%s' ", def->defname,
-								 escape_param_str(strVal(def->arg)));
+			foreach(cell, fdw->options)
+			{
+				DefElem    *def = lfirst(cell);
+
+				if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
+					appendStringInfo(&buf, "%s='%s' ", def->defname,
+									 escape_param_str(strVal(def->arg)));
+			}
 		}
 
 		foreach(cell, foreign_server->options)
diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index 7809f58d96..25127995c4 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -888,31 +888,66 @@ SELECT dblink_disconnect('dtest1');
 CREATE ROLE regress_dblink_user;
 DO $d$
     BEGIN
-        EXECUTE $$CREATE SERVER fdtest FOREIGN DATA WRAPPER dblink_fdw
+        EXECUTE $$CREATE SERVER fdtest_fco FOR CONNECTION ONLY
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+        EXECUTE $$CREATE SERVER fdtest_fdw FOREIGN DATA WRAPPER dblink_fdw
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
     END;
 $d$;
-CREATE USER MAPPING FOR public SERVER fdtest
+CREATE USER MAPPING FOR public SERVER fdtest_fco
+  OPTIONS (server 'localhost');  -- fail, can't specify server here
+ERROR:  invalid user mapping option "server"
+CREATE USER MAPPING FOR public SERVER fdtest_fdw
   OPTIONS (server 'localhost');  -- fail, can't specify server here
 ERROR:  invalid option "server"
-CREATE USER MAPPING FOR public SERVER fdtest OPTIONS (user :'USER');
-GRANT USAGE ON FOREIGN SERVER fdtest TO regress_dblink_user;
+CREATE USER MAPPING FOR public SERVER fdtest_fco OPTIONS (user :'USER', password 'nonsense');
+CREATE USER MAPPING FOR public SERVER fdtest_fdw OPTIONS (user :'USER');
+GRANT USAGE ON FOREIGN SERVER fdtest_fco TO regress_dblink_user;
+GRANT USAGE ON FOREIGN SERVER fdtest_fdw TO regress_dblink_user;
 GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO regress_dblink_user;
 SET SESSION AUTHORIZATION regress_dblink_user;
 -- should fail
-SELECT dblink_connect('myconn', 'fdtest');
+SELECT dblink_connect('myconn1', 'fdtest_fco');
+ERROR:  password or GSSAPI delegated credentials required
+DETAIL:  Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials
+HINT:  Ensure provided credentials match target server's authentication method.
+SELECT dblink_connect('myconn2', 'fdtest_fdw');
 ERROR:  password or GSSAPI delegated credentials required
 DETAIL:  Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.
 -- should succeed
-SELECT dblink_connect_u('myconn', 'fdtest');
+SELECT dblink_connect_u('myconn1', 'fdtest_fco');
  dblink_connect_u 
 ------------------
  OK
 (1 row)
 
-SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+SELECT dblink_connect_u('myconn2', 'fdtest_fdw');
+ dblink_connect_u 
+------------------
+ OK
+(1 row)
+
+SELECT * FROM dblink('myconn1','SELECT * FROM foo') AS t(a int, b text, c text[]);
+ a  | b |       c       
+----+---+---------------
+  0 | a | {a0,b0,c0}
+  1 | b | {a1,b1,c1}
+  2 | c | {a2,b2,c2}
+  3 | d | {a3,b3,c3}
+  4 | e | {a4,b4,c4}
+  5 | f | {a5,b5,c5}
+  6 | g | {a6,b6,c6}
+  7 | h | {a7,b7,c7}
+  8 | i | {a8,b8,c8}
+  9 | j | {a9,b9,c9}
+ 10 | k | {a10,b10,c10}
+(11 rows)
+
+SELECT * FROM dblink('myconn2','SELECT * FROM foo') AS t(a int, b text, c text[]);
  a  | b |       c       
 ----+---+---------------
   0 | a | {a0,b0,c0}
@@ -929,11 +964,14 @@ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[])
 (11 rows)
 
 \c - -
-REVOKE USAGE ON FOREIGN SERVER fdtest FROM regress_dblink_user;
+REVOKE USAGE ON FOREIGN SERVER fdtest_fco FROM regress_dblink_user;
+REVOKE USAGE ON FOREIGN SERVER fdtest_fdw FROM regress_dblink_user;
 REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM regress_dblink_user;
 DROP USER regress_dblink_user;
-DROP USER MAPPING FOR public SERVER fdtest;
-DROP SERVER fdtest;
+DROP USER MAPPING FOR public SERVER fdtest_fco;
+DROP USER MAPPING FOR public SERVER fdtest_fdw;
+DROP SERVER fdtest_fco;
+DROP SERVER fdtest_fdw;
 -- should fail
 ALTER FOREIGN DATA WRAPPER dblink_fdw OPTIONS (nonexistent 'fdw');
 ERROR:  invalid option "nonexistent"
diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql
index 7870ce5d5a..cb8c11a20a 100644
--- a/contrib/dblink/sql/dblink.sql
+++ b/contrib/dblink/sql/dblink.sql
@@ -458,33 +458,47 @@ SELECT dblink_disconnect('dtest1');
 CREATE ROLE regress_dblink_user;
 DO $d$
     BEGIN
-        EXECUTE $$CREATE SERVER fdtest FOREIGN DATA WRAPPER dblink_fdw
+        EXECUTE $$CREATE SERVER fdtest_fco FOR CONNECTION ONLY
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+        EXECUTE $$CREATE SERVER fdtest_fdw FOREIGN DATA WRAPPER dblink_fdw
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
     END;
 $d$;
 
-CREATE USER MAPPING FOR public SERVER fdtest
+CREATE USER MAPPING FOR public SERVER fdtest_fco
+  OPTIONS (server 'localhost');  -- fail, can't specify server here
+CREATE USER MAPPING FOR public SERVER fdtest_fdw
   OPTIONS (server 'localhost');  -- fail, can't specify server here
-CREATE USER MAPPING FOR public SERVER fdtest OPTIONS (user :'USER');
+CREATE USER MAPPING FOR public SERVER fdtest_fco OPTIONS (user :'USER', password 'nonsense');
+CREATE USER MAPPING FOR public SERVER fdtest_fdw OPTIONS (user :'USER');
 
-GRANT USAGE ON FOREIGN SERVER fdtest TO regress_dblink_user;
+GRANT USAGE ON FOREIGN SERVER fdtest_fco TO regress_dblink_user;
+GRANT USAGE ON FOREIGN SERVER fdtest_fdw TO regress_dblink_user;
 GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO regress_dblink_user;
 
 SET SESSION AUTHORIZATION regress_dblink_user;
 -- should fail
-SELECT dblink_connect('myconn', 'fdtest');
+SELECT dblink_connect('myconn1', 'fdtest_fco');
+SELECT dblink_connect('myconn2', 'fdtest_fdw');
 -- should succeed
-SELECT dblink_connect_u('myconn', 'fdtest');
-SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+SELECT dblink_connect_u('myconn1', 'fdtest_fco');
+SELECT dblink_connect_u('myconn2', 'fdtest_fdw');
+SELECT * FROM dblink('myconn1','SELECT * FROM foo') AS t(a int, b text, c text[]);
+SELECT * FROM dblink('myconn2','SELECT * FROM foo') AS t(a int, b text, c text[]);
 
 \c - -
-REVOKE USAGE ON FOREIGN SERVER fdtest FROM regress_dblink_user;
+REVOKE USAGE ON FOREIGN SERVER fdtest_fco FROM regress_dblink_user;
+REVOKE USAGE ON FOREIGN SERVER fdtest_fdw FROM regress_dblink_user;
 REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM regress_dblink_user;
 DROP USER regress_dblink_user;
-DROP USER MAPPING FOR public SERVER fdtest;
-DROP SERVER fdtest;
+DROP USER MAPPING FOR public SERVER fdtest_fco;
+DROP USER MAPPING FOR public SERVER fdtest_fdw;
+DROP SERVER fdtest_fco;
+DROP SERVER fdtest_fdw;
 
 -- should fail
 ALTER FOREIGN DATA WRAPPER dblink_fdw OPTIONS (nonexistent 'fdw');
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 77df7eb8e4..0887f445f5 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2717,10 +2717,10 @@ ALTER FOREIGN TABLE ft4 OPTIONS (ADD use_remote_estimate 'true');
 -- regress_view_owner_another, the view owner, though it fails as expected
 -- due to the lack of a user mapping for that user.
 EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM v4;
-ERROR:  user mapping not found for "regress_view_owner_another"
+ERROR:  user mapping not found for server "loopback" and user "regress_view_owner_another"
 -- Likewise, but with the query under an UNION ALL
 EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM (SELECT * FROM v4 UNION ALL SELECT * FROM v4);
-ERROR:  user mapping not found for "regress_view_owner_another"
+ERROR:  user mapping not found for server "loopback" and user "regress_view_owner_another"
 -- Should not get that error once a user mapping is created
 CREATE USER MAPPING FOR regress_view_owner_another SERVER loopback OPTIONS (password_required 'false');
 EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM v4;
diff --git a/doc/src/sgml/dblink.sgml b/doc/src/sgml/dblink.sgml
index 7d25f24f49..a34d7e60c4 100644
--- a/doc/src/sgml/dblink.sgml
+++ b/doc/src/sgml/dblink.sgml
@@ -54,12 +54,11 @@ dblink_connect(text connname, text connstr) returns text
    </para>
 
    <para>
-    The connection string may also be the name of an existing foreign
-    server.  It is recommended to use the foreign-data wrapper
-    <literal>dblink_fdw</literal> when defining the foreign
-    server.  See the example below, as well as
-    <xref linkend="sql-createserver"/> and
-    <xref linkend="sql-createusermapping"/>.
+    The connection string may also be the name of an existing foreign server.
+    It is recommended to specify either the foreign-data wrapper
+    <literal>dblink_fdw</literal> or <literal>FOR CONNECTION ONLY</literal>
+    when defining the foreign server.  See the example below, as well as <xref
+    linkend="sql-createserver"/> and <xref linkend="sql-createusermapping"/>.
    </para>
 
   </refsect1>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index a85e04e4d6..8f3d13d1aa 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -21,6 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@@ -94,13 +95,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the foreign server
+      <replaceable>servername</replaceable>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
      <para>
-      This clause replaces the connection string originally set by
-      <xref linkend="sql-createsubscription"/>.  See there for more
-      information.
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the connection
+      string <replaceable>conninfo</replaceable>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_server.sgml b/doc/src/sgml/ref/create_server.sgml
index af0a7a06fd..e5834d1e39 100644
--- a/doc/src/sgml/ref/create_server.sgml
+++ b/doc/src/sgml/ref/create_server.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE SERVER [ IF NOT EXISTS ] <replaceable class="parameter">server_name</replaceable> [ TYPE '<replaceable class="parameter">server_type</replaceable>' ] [ VERSION '<replaceable class="parameter">server_version</replaceable>' ]
-    FOREIGN DATA WRAPPER <replaceable class="parameter">fdw_name</replaceable>
+    { FOR CONNECTION ONLY | FOREIGN DATA WRAPPER <replaceable class="parameter">fdw_name</replaceable> }
     [ OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] ) ]
 </synopsis>
  </refsynopsisdiv>
@@ -57,6 +57,22 @@ CREATE SERVER [ IF NOT EXISTS ] <replaceable class="parameter">server_name</repl
 
   <variablelist>
   <varlistentry>
+    <term><literal>FOR CONNECTION ONLY</literal></term>
+    <listitem>
+     <para>
+      Create a foreign server that can be used by <xref
+      linkend="sql-createsubscription"/>, or for other purposes that need only
+      the PostgreSQL connection information, such as <xref
+      linkend="dblink"/>. This foreign server may not be used by a foreign
+      table.
+     </para>
+     <para>
+      See <xref linkend="server-connection-only-options"/> for details.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><literal>IF NOT EXISTS</literal></term>
     <listitem>
      <para>
@@ -113,6 +129,10 @@ CREATE SERVER [ IF NOT EXISTS ] <replaceable class="parameter">server_name</repl
       actual names and values are dependent on the server's
       foreign-data wrapper.
      </para>
+     <para>
+      If <literal>FOR CONNECTION ONLY</literal> is specified, see <xref
+      linkend="server-connection-only-options"/> for available options.
+     </para>
     </listitem>
    </varlistentry>
   </variablelist>
@@ -121,14 +141,44 @@ CREATE SERVER [ IF NOT EXISTS ] <replaceable class="parameter">server_name</repl
  <refsect1>
   <title>Notes</title>
 
-  <para>
-   When using the <xref linkend="dblink"/> module,
-   a foreign server's name can be used
-   as an argument of the <xref linkend="contrib-dblink-connect"/>
-   function to indicate the connection parameters.  It is necessary to have
-   the <literal>USAGE</literal> privilege on the foreign server to be
-   able to use it in this way.
-  </para>
+  <refsect2 id="server-connection-only-options" xreflabel="FOR CONNECTION ONLY Options">
+   <title><literal>FOR CONNECTION ONLY</literal> Options</title>
+
+   <para>
+    A foreign server defined with <literal>FOR CONNECTION ONLY</literal>
+    can have the same options that <application>libpq</application> accepts in
+    connection strings, as described in <xref linkend="libpq-paramkeywords"/>,
+    except that the following options cannot be set:
+
+    <itemizedlist spacing="compact">
+     <listitem>
+      <para>
+       <literal>user</literal>, <literal>password</literal> and
+       <literal>sslpassword</literal> - these must instead be set on the
+       associated user mapping.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       <literal>client_encoding</literal> - will always be the same as the
+       database encoding.
+      </para>
+     </listitem>
+    </itemizedlist>
+   </para>
+  </refsect2>
+
+  <refsect2>
+   <title>Use with dblink</title>
+
+   <para>
+    When using the <xref linkend="dblink"/> module, a foreign server's name
+    can be used as an argument of the <xref linkend="contrib-dblink-connect"/>
+    function to indicate the connection parameters.  It is necessary to have
+    the <literal>USAGE</literal> privilege on the foreign server to be able to
+    use it in this way.
+   </para>
+  </refsect2>
  </refsect1>
 
  <refsect1>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 71652fd918..55b5f629cd 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
-    CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
+    { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
     PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
     [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 </synopsis>
@@ -77,6 +77,17 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-createsubscription-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      The foreign server to use for connecting to the publisher database,
+      which must have <literal>FOR CONNECTION ONLY</literal> specified. See
+      <xref linkend="sql-createserver"/> for details.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-createsubscription-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
@@ -363,6 +374,10 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           The default is <literal>true</literal>. Only superusers can set
           this value to <literal>false</literal>.
          </para>
+         <para>
+          This parameter is only allowed when <literal>CONNECTION</literal> is
+          specified. Otherwise, see <xref linkend="sql-createusermapping"/>.
+         </para>
         </listitem>
        </varlistentry>
 
diff --git a/doc/src/sgml/ref/create_user_mapping.sgml b/doc/src/sgml/ref/create_user_mapping.sgml
index 55debd5401..0e486890aa 100644
--- a/doc/src/sgml/ref/create_user_mapping.sgml
+++ b/doc/src/sgml/ref/create_user_mapping.sgml
@@ -99,6 +99,81 @@ CREATE USER MAPPING [ IF NOT EXISTS ] FOR { <replaceable class="parameter">user_
   </variablelist>
  </refsect1>
 
+ <refsect1>
+  <title>Notes</title>
+
+  <refsect2 id="usermapping-connection-only-options" xreflabel="FOR CONNECTION ONLY Options">
+   <title><literal>FOR CONNECTION ONLY</literal> Options</title>
+
+   <para>
+    If <replaceable>servername</replaceable> was created with <literal>FOR
+    CONNECTION ONLY</literal>, the user mapping supports the following options:
+
+    <itemizedlist spacing="compact">
+     <listitem>
+      <para>
+       <literal>user</literal>, <literal>password</literal> and
+       <literal>sslpassword</literal> - these options have the same meaning as
+       described in <xref linkend="libpq-paramkeywords"/>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       <literal>sslkey</literal> and <literal>sslcert</literal> - these have
+       the same meaning as described in <xref
+       linkend="libpq-paramkeywords"/>, and override any settings of the same
+       name in <replaceable>servername</replaceable>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       <literal>password_required</literal> means that this user mapping must
+       specify the <literal>password</literal> option. The
+       <literal>password_required</literal> option defaults to
+       <literal>true</literal> and can only be set to <literal>false</literal>
+       by a superuser.
+      </para>
+     </listitem>
+    </itemizedlist>
+   </para>
+
+   <para>
+    Only superusers may create or modify user mappings with the
+    <literal>sslcert</literal> or <literal>sslkey</literal> settings.
+   </para>
+   <para>
+    Non-superusers may connect to foreign servers using password
+    authentication or with GSSAPI delegated credentials, so specify the
+    <literal>password</literal> option for user mappings belonging to
+    non-superusers where password authentication is required.
+   </para>
+   <para>
+    A superuser may override this check on a per-user-mapping basis by setting
+    the user mapping option <literal>password_required 'false'</literal>, e.g.,
+<programlisting>
+ALTER USER MAPPING FOR some_non_superuser SERVER loopback_nopw
+OPTIONS (ADD password_required 'false');
+</programlisting>
+    To prevent unprivileged users from exploiting the authentication rights
+    of the unix user the postgres server is running as to escalate to superuser
+    rights, only the superuser may set this option on a user mapping.
+    </para>
+    <para>
+    Care is required to ensure that this does not allow the mapped
+    user the ability to connect as superuser to the mapped database per
+    CVE-2007-3278 and CVE-2007-6601. Don't set
+    <literal>password_required=false</literal>
+    on the <literal>public</literal> role. Keep in mind that the mapped
+    user can potentially use any client certificates,
+    <filename>.pgpass</filename>,
+    <filename>.pg_service.conf</filename> etc. in the unix home directory of the
+    system user the postgres server runs as. They can also use any trust
+    relationship granted by authentication modes like <literal>peer</literal>
+    or <literal>ident</literal> authentication.
+   </para>
+  </refsect2>
+ </refsect1>
+
  <refsect1>
   <title>Examples</title>
 
diff --git a/doc/src/sgml/user-manag.sgml b/doc/src/sgml/user-manag.sgml
index 27c1f3d703..e191d759b5 100644
--- a/doc/src/sgml/user-manag.sgml
+++ b/doc/src/sgml/user-manag.sgml
@@ -688,11 +688,20 @@ DROP ROLE doomed_role;
        <entry>Allow use of connection slots reserved via
        <xref linkend="guc-reserved-connections"/>.</entry>
       </row>
+      <row>
+       <entry>pg_create_connection</entry>
+       <entry>Allow users with <literal>CREATE</literal> permission on the
+       database to issue <link linkend="sql-createserver"><command>CREATE
+       SERVER</command></link> if <literal>FOR CONNECTION ONLY</literal> is
+       specified.</entry>
+      </row>
       <row>
        <entry>pg_create_subscription</entry>
        <entry>Allow users with <literal>CREATE</literal> permission on the
-       database to issue
-       <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>.</entry>
+       database to issue <link
+       linkend="sql-createsubscription"><command>CREATE
+       SUBSCRIPTION</command></link>.  This role is a member of
+       <literal>pg_create_connection</literal>.</entry>
       </row>
      </tbody>
     </tgroup>
@@ -738,6 +747,14 @@ DROP ROLE doomed_role;
   great care should be taken when granting these roles to users.
   </para>
 
+  <para>
+  The <literal>pg_create_subscription</literal> role is a member of
+  <literal>pg_create_connection</literal>. It may be useful to revoke that
+  membership in order to permit roles to create subscriptions only to a
+  foreign server, without allowing them to specify a connection string
+  directly.
+  </para>
+
   <para>
   Care should be taken when granting these roles to ensure they are only used where
   needed and with the understanding that these roles grant access to privileged
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88ce28..7be6725655 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -23,6 +23,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "storage/lmgr.h"
@@ -75,10 +76,18 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->runasowner = subform->subrunasowner;
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
-								   tup,
-								   Anum_pg_subscription_subconninfo);
-	sub->conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(subform->subserver))
+	{
+		sub->conninfo = ForeignServerConnectionString(subform->subowner,
+													  subform->subserver);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+									   tup,
+									   Anum_pg_subscription_subconninfo);
+		sub->conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 07c0d89c4f..6bebe684a6 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -747,3 +747,5 @@ GRANT pg_read_all_settings TO pg_monitor;
 GRANT pg_read_all_stats TO pg_monitor;
 
 GRANT pg_stat_scan_tables TO pg_monitor;
+
+GRANT pg_create_connection TO pg_create_subscription;
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index 0ecff545a9..41e12c3a2d 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -21,6 +21,7 @@
 #include "catalog/dependency.h"
 #include "catalog/indexing.h"
 #include "catalog/objectaccess.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_foreign_data_wrapper.h"
 #include "catalog/pg_foreign_server.h"
 #include "catalog/pg_foreign_table.h"
@@ -35,6 +36,7 @@
 #include "tcop/utility.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -843,11 +845,12 @@ CreateForeignServer(CreateForeignServerStmt *stmt)
 	bool		nulls[Natts_pg_foreign_server];
 	HeapTuple	tuple;
 	Oid			srvId;
+	Oid			fdwId;
+	Oid			fdwvalidator;
 	Oid			ownerId;
 	AclResult	aclresult;
 	ObjectAddress myself;
 	ObjectAddress referenced;
-	ForeignDataWrapper *fdw;
 
 	rel = table_open(ForeignServerRelationId, RowExclusiveLock);
 
@@ -885,15 +888,42 @@ CreateForeignServer(CreateForeignServerStmt *stmt)
 							stmt->servername)));
 	}
 
-	/*
-	 * Check that the FDW exists and that we have USAGE on it. Also get the
-	 * actual FDW for option validation etc.
-	 */
-	fdw = GetForeignDataWrapperByName(stmt->fdwname, false);
+	if (stmt->connection_only)
+	{
+		Assert(stmt->fdwname == NULL);
 
-	aclresult = object_aclcheck(ForeignDataWrapperRelationId, fdw->fdwid, ownerId, ACL_USAGE);
-	if (aclresult != ACLCHECK_OK)
-		aclcheck_error(aclresult, OBJECT_FDW, fdw->fdwname);
+		/*
+		 * We don't want to allow unprivileged users to be able to trigger
+		 * attempts to access arbitrary network destinations, so require the user
+		 * to have been specifically authorized to create connections.
+		 */
+		if (!has_privs_of_role(ownerId, ROLE_PG_CREATE_CONNECTION))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("permission denied to create server connection"),
+					 errdetail("Only roles with privileges of the \"%s\" role may create servers FOR CONNECTION ONLY.",
+							   "pg_create_connection")));
+
+		fdwId = InvalidOid;
+		fdwvalidator = F_PG_CONNECTION_VALIDATOR;
+	}
+	else
+	{
+		/*
+		 * Check that the FDW exists and that we have USAGE on it. Also get
+		 * the option validator oid.
+		 */
+		ForeignDataWrapper *fdw = GetForeignDataWrapperByName(stmt->fdwname,
+															  false);
+
+		aclresult = object_aclcheck(ForeignDataWrapperRelationId, fdw->fdwid,
+									ownerId, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, OBJECT_FDW, fdw->fdwname);
+
+		fdwId = fdw->fdwid;
+		fdwvalidator = fdw->fdwvalidator;
+	}
 
 	/*
 	 * Insert tuple into pg_foreign_server.
@@ -907,7 +937,7 @@ CreateForeignServer(CreateForeignServerStmt *stmt)
 	values[Anum_pg_foreign_server_srvname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->servername));
 	values[Anum_pg_foreign_server_srvowner - 1] = ObjectIdGetDatum(ownerId);
-	values[Anum_pg_foreign_server_srvfdw - 1] = ObjectIdGetDatum(fdw->fdwid);
+	values[Anum_pg_foreign_server_srvfdw - 1] = ObjectIdGetDatum(fdwId);
 
 	/* Add server type if supplied */
 	if (stmt->servertype)
@@ -930,7 +960,7 @@ CreateForeignServer(CreateForeignServerStmt *stmt)
 	srvoptions = transformGenericOptions(ForeignServerRelationId,
 										 PointerGetDatum(NULL),
 										 stmt->options,
-										 fdw->fdwvalidator);
+										 fdwvalidator);
 
 	if (PointerIsValid(DatumGetPointer(srvoptions)))
 		values[Anum_pg_foreign_server_srvoptions - 1] = srvoptions;
@@ -948,10 +978,13 @@ CreateForeignServer(CreateForeignServerStmt *stmt)
 	myself.objectId = srvId;
 	myself.objectSubId = 0;
 
-	referenced.classId = ForeignDataWrapperRelationId;
-	referenced.objectId = fdw->fdwid;
-	referenced.objectSubId = 0;
-	recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	if (OidIsValid(fdwId))
+	{
+		referenced.classId = ForeignDataWrapperRelationId;
+		referenced.objectId = fdwId;
+		referenced.objectSubId = 0;
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	}
 
 	recordDependencyOnOwner(ForeignServerRelationId, srvId, ownerId);
 
@@ -1022,10 +1055,20 @@ AlterForeignServer(AlterForeignServerStmt *stmt)
 
 	if (stmt->options)
 	{
-		ForeignDataWrapper *fdw = GetForeignDataWrapper(srvForm->srvfdw);
+		Oid			fdwvalidator;
 		Datum		datum;
 		bool		isnull;
 
+		if (!OidIsValid(srvForm->srvfdw))
+		{
+			fdwvalidator = F_PG_CONNECTION_VALIDATOR;
+		}
+		else
+		{
+			ForeignDataWrapper *fdw = GetForeignDataWrapper(srvForm->srvfdw);
+			fdwvalidator = fdw->fdwvalidator;
+		}
+
 		/* Extract the current srvoptions */
 		datum = SysCacheGetAttr(FOREIGNSERVEROID,
 								tp,
@@ -1038,7 +1081,7 @@ AlterForeignServer(AlterForeignServerStmt *stmt)
 		datum = transformGenericOptions(ForeignServerRelationId,
 										datum,
 										stmt->options,
-										fdw->fdwvalidator);
+										fdwvalidator);
 
 		if (PointerIsValid(DatumGetPointer(datum)))
 			repl_val[Anum_pg_foreign_server_srvoptions - 1] = datum;
@@ -1106,10 +1149,10 @@ CreateUserMapping(CreateUserMappingStmt *stmt)
 	HeapTuple	tuple;
 	Oid			useId;
 	Oid			umId;
+	Oid			fdwvalidator;
 	ObjectAddress myself;
 	ObjectAddress referenced;
 	ForeignServer *srv;
-	ForeignDataWrapper *fdw;
 	RoleSpec   *role = (RoleSpec *) stmt->user;
 
 	rel = table_open(UserMappingRelationId, RowExclusiveLock);
@@ -1156,7 +1199,15 @@ CreateUserMapping(CreateUserMappingStmt *stmt)
 							stmt->servername)));
 	}
 
-	fdw = GetForeignDataWrapper(srv->fdwid);
+	if (!OidIsValid(srv->fdwid))
+	{
+		fdwvalidator = F_PG_CONNECTION_VALIDATOR;
+	}
+	else
+	{
+		ForeignDataWrapper *fdw = GetForeignDataWrapper(srv->fdwid);
+		fdwvalidator = fdw->fdwvalidator;
+	}
 
 	/*
 	 * Insert tuple into pg_user_mapping.
@@ -1174,7 +1225,7 @@ CreateUserMapping(CreateUserMappingStmt *stmt)
 	useoptions = transformGenericOptions(UserMappingRelationId,
 										 PointerGetDatum(NULL),
 										 stmt->options,
-										 fdw->fdwvalidator);
+										 fdwvalidator);
 
 	if (PointerIsValid(DatumGetPointer(useoptions)))
 		values[Anum_pg_user_mapping_umoptions - 1] = useoptions;
@@ -1267,7 +1318,7 @@ AlterUserMapping(AlterUserMappingStmt *stmt)
 
 	if (stmt->options)
 	{
-		ForeignDataWrapper *fdw;
+		Oid			fdwvalidator;
 		Datum		datum;
 		bool		isnull;
 
@@ -1275,7 +1326,15 @@ AlterUserMapping(AlterUserMappingStmt *stmt)
 		 * Process the options.
 		 */
 
-		fdw = GetForeignDataWrapper(srv->fdwid);
+		if (!OidIsValid(srv->fdwid))
+		{
+			fdwvalidator = F_PG_CONNECTION_VALIDATOR;
+		}
+		else
+		{
+			ForeignDataWrapper *fdw = GetForeignDataWrapper(srv->fdwid);
+			fdwvalidator = fdw->fdwvalidator;
+		}
 
 		datum = SysCacheGetAttr(USERMAPPINGUSERSERVER,
 								tp,
@@ -1288,7 +1347,7 @@ AlterUserMapping(AlterUserMappingStmt *stmt)
 		datum = transformGenericOptions(UserMappingRelationId,
 										datum,
 										stmt->options,
-										fdw->fdwvalidator);
+										fdwvalidator);
 
 		if (PointerIsValid(DatumGetPointer(datum)))
 			repl_val[Anum_pg_user_mapping_umoptions - 1] = datum;
@@ -1437,6 +1496,12 @@ CreateForeignTable(CreateForeignTableStmt *stmt, Oid relid)
 	if (aclresult != ACLCHECK_OK)
 		aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
 
+	if (!OidIsValid(server->fdwid))
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("cannot create foreign table using server that has FOR CONNECTION ONLY specified"),
+				 errhint("Use a foreign server that has a FOREIGN DATA WRAPPER specified instead.")));
+
 	fdw = GetForeignDataWrapper(server->fdwid);
 
 	/*
@@ -1496,6 +1561,12 @@ ImportForeignSchema(ImportForeignSchemaStmt *stmt)
 	if (aclresult != ACLCHECK_OK)
 		aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
 
+	if (!OidIsValid(server->fdwid))
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("cannot import foreign schema using server that has FOR CONNECTION ONLY specified"),
+				 errhint("Use a foreign server that has a FOREIGN DATA WRAPPER specified instead.")));
+
 	/* Check that the schema exists and we have CREATE permissions on it */
 	(void) LookupCreationNamespace(stmt->local_schema);
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 34d881fd94..0297169051 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -25,14 +25,17 @@
 #include "catalog/objectaddress.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_database_d.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "catalog/pg_user_mapping.h"
 #include "commands/dbcommands.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -574,6 +577,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	Datum		values[Natts_pg_subscription];
 	Oid			owner = GetUserId();
 	HeapTuple	tup;
+	Oid			serverid;
+	Oid			umid;
 	char	   *conninfo;
 	char		originname[NAMEDATALEN];
 	List	   *publications;
@@ -594,6 +599,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
+	if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED) && stmt->servername)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("option \"password_required\" invalid on subscriptions to a foreign server"),
+				 errhint("Use the \"password_required\" option on the user mappings associated with the foreign server.")));
+
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
 	 * the transaction leaves the created replication slot.  So we cannot run
@@ -604,9 +615,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
 
 	/*
-	 * We don't want to allow unprivileged users to be able to trigger
-	 * attempts to access arbitrary network destinations, so require the user
-	 * to have been specifically authorized to create subscriptions.
+	 * We don't want to allow unprivileged users to utilize the resources that
+	 * a subscription requires (such as a background worker), so require the
+	 * user to have been specifically authorized to create subscriptions.
 	 */
 	if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
 		ereport(ERROR,
@@ -666,14 +677,59 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.synchronous_commit == NULL)
 		opts.synchronous_commit = "off";
 
-	conninfo = stmt->conninfo;
-	publications = stmt->publication;
-
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
-	/* Check the connection info string. */
-	walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
+	if (stmt->servername)
+	{
+		ForeignServer	*server;
+		UserMapping		*um;
+
+		Assert(!stmt->conninfo);
+		conninfo = NULL;
+
+		server = GetForeignServerByName(stmt->servername, false);
+		aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
+
+		if (OidIsValid(server->fdwid))
+			ereport(ERROR,
+					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					 errmsg("server used for subscription must have FOR CONNECTION ONLY specified")));
+
+		um = GetUserMapping(owner, server->serverid);
+
+		serverid = server->serverid;
+		umid = um->umid;
+		conninfo = ForeignServerConnectionString(owner, serverid);
+	}
+	else
+	{
+		Assert(stmt->conninfo);
+
+		/*
+		 * We don't want to allow unprivileged users to be able to trigger
+		 * attempts to access arbitrary network destinations, so require the user
+		 * to have been specifically authorized to create connections.
+		 */
+		if (!has_privs_of_role(owner, ROLE_PG_CREATE_CONNECTION))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("permission denied to create subscription with a connection string"),
+					 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions with CONNECTION specified.",
+							   "pg_create_connection"),
+					 errhint("Create a subscription to a foreign server by specifying SERVER instead.")));
+
+		/* Check the connection info string. */
+		walrcv_check_conninfo(stmt->conninfo, opts.passwordrequired && !superuser());
+
+		serverid = InvalidOid;
+		umid = InvalidOid;
+		conninfo = stmt->conninfo;
+	}
+
+	publications = stmt->publication;
 
 	/* Everything ok, form a new tuple. */
 	memset(values, 0, sizeof(values));
@@ -697,8 +753,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
-	values[Anum_pg_subscription_subconninfo - 1] =
-		CStringGetTextDatum(conninfo);
+	values[Anum_pg_subscription_subserver - 1] = serverid;
+	if (!OidIsValid(serverid))
+		values[Anum_pg_subscription_subconninfo - 1] =
+			CStringGetTextDatum(conninfo);
+	else
+		nulls[Anum_pg_subscription_subconninfo - 1] = true;
 	if (opts.slot_name)
 		values[Anum_pg_subscription_subslotname - 1] =
 			DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@@ -719,6 +779,20 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
+	if (stmt->servername)
+	{
+		ObjectAddress referenced;
+		Assert(OidIsValid(serverid) && OidIsValid(umid));
+
+		ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+		ObjectAddressSet(referenced, UserMappingRelationId, umid);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	}
+
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
@@ -835,8 +909,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.enabled)
 		ApplyLauncherWakeupAtCommit();
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
 
 	return myself;
@@ -1124,6 +1196,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	memset(nulls, false, sizeof(nulls));
 	memset(replaces, false, sizeof(replaces));
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
 	switch (stmt->kind)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
@@ -1191,6 +1265,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
 				{
+					if (OidIsValid(form->subserver))
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("option \"password_required\" invalid on subscriptions to a foreign server"),
+								 errhint("Use the \"password_required\" option on the user mappings associated with the foreign server.")));
+
 					/* Non-superuser may not disable password_required. */
 					if (!opts.passwordrequired && !superuser())
 						ereport(ERROR,
@@ -1237,7 +1317,82 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_SERVER:
+			{
+				ForeignServer	*new_server;
+				UserMapping		*new_um;
+				ObjectAddress	 referenced;
+				AclResult		 aclresult;
+
+				/*
+				 * Remove what was there before, either another foreign server
+				 * or a connection string.
+				 */
+				if (form->subserver)
+				{
+					UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+
+					deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+													   DEPENDENCY_NORMAL,
+													   ForeignServerRelationId, form->subserver);
+					deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+													   DEPENDENCY_NORMAL,
+													   UserMappingRelationId, old_um->umid);
+				}
+				else
+				{
+					nulls[Anum_pg_subscription_subconninfo - 1] = true;
+					replaces[Anum_pg_subscription_subconninfo - 1] = true;
+				}
+
+				/*
+				 * Find the new server and user mapping. Check ACL of server
+				 * based on current user ID, but find the user mapping based
+				 * on the subscription owner.
+				 */
+				new_server = GetForeignServerByName(stmt->servername, false);
+				aclresult = object_aclcheck(ForeignServerRelationId,
+											new_server->serverid, GetUserId(), ACL_USAGE);
+				if (aclresult != ACLCHECK_OK)
+					aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER,
+								   new_server->servername);
+
+				if (OidIsValid(new_server->fdwid))
+					ereport(ERROR,
+							(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+							 errmsg("server used for subscription must have FOR CONNECTION ONLY specified")));
+
+				new_um = GetUserMapping(form->subowner, new_server->serverid);
+
+				values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+
+				ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
+				recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+				ObjectAddressSet(referenced, UserMappingRelationId, new_um->umid);
+				recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+				update_tuple = true;
+			}
+			break;
+
 		case ALTER_SUBSCRIPTION_CONNECTION:
+			/* remove reference to foreign server and dependencies, if present */
+			if (form->subserver)
+			{
+				UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+
+				deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+												   DEPENDENCY_NORMAL,
+												   ForeignServerRelationId, form->subserver);
+				deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+												   DEPENDENCY_NORMAL,
+												   UserMappingRelationId, old_um->umid);
+
+				values[Anum_pg_subscription_subserver - 1] = InvalidOid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+			}
+
 			/* Load the library providing us libpq calls. */
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
@@ -1448,8 +1603,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
 	/* Wake up related replication workers to handle this change quickly. */
@@ -1534,9 +1687,17 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	subname = pstrdup(NameStr(*DatumGetName(datum)));
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
-								   Anum_pg_subscription_subconninfo);
-	conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(form->subserver))
+	{
+		conninfo = ForeignServerConnectionString(form->subowner,
+												 form->subserver);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
+									   Anum_pg_subscription_subconninfo);
+		conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@@ -1637,6 +1798,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 
 	/* Clean up dependencies */
+	deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
@@ -1846,6 +2008,17 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 		aclcheck_error(aclresult, OBJECT_DATABASE,
 					   get_database_name(MyDatabaseId));
 
+	if (form->subserver)
+	{
+		UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+		UserMapping *new_um = GetUserMapping(newOwnerId, form->subserver);
+
+		if (changeDependencyFor(SubscriptionRelationId, form->oid,
+								UserMappingRelationId, old_um->umid, new_um->umid) != 1)
+			elog(ERROR, "could not change user mapping dependency for subscription %u",
+				 form->oid);
+	}
+
 	form->subowner = newOwnerId;
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
 
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index ca3ad55b62..b93b9a3146 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -18,11 +18,14 @@
 #include "catalog/pg_foreign_server.h"
 #include "catalog/pg_foreign_table.h"
 #include "catalog/pg_user_mapping.h"
+#include "commands/defrem.h"
 #include "foreign/fdwapi.h"
 #include "foreign/foreign.h"
 #include "funcapi.h"
 #include "lib/stringinfo.h"
+#include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "replication/walreceiver.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -190,6 +193,116 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
 }
 
 
+/*
+ * Backslash-escape every backslash and single quote in "val", so the result
+ * can be placed inside a single-quoted connection option value. Helper for
+ * options_to_connstr(); returns the buffer set up by initStringInfo().
+ */
+static char *
+escape_value(char *val)
+{
+	StringInfoData buf;
+
+	initStringInfo(&buf);
+	for (const char *p = val; *p != '\0'; p++)
+	{
+		if (*p == '\\' || *p == '\'')
+			appendStringInfoChar(&buf, '\\');
+		appendStringInfoChar(&buf, *p);
+	}
+
+	return buf.data;
+}
+
+
+/*
+ * Render a List of DefElem as a libpq connection string made of
+ * "name = 'value'" pairs separated by single spaces. Shared by
+ * ForeignServerConnectionString() and pg_connection_validator().
+ *
+ * "password_required" is left out, and client_encoding is always appended.
+ *
+ * XXX: might leak memory, investigate
+ */
+static char *
+options_to_connstr(List *options)
+{
+	StringInfoData	 buf;
+	ListCell		*cell;
+
+	initStringInfo(&buf);
+	foreach(cell, options)
+	{
+		DefElem *def = (DefElem *) lfirst(cell);
+		char *escaped;
+
+		/* password_required is not a libpq option; omit it */
+		if (strcmp(def->defname, "password_required") == 0)
+			continue;
+
+		/* XXX: pfree() result of defGetString() if needed? */
+		escaped = escape_value(defGetString(def));
+
+		/* an empty buffer means nothing was emitted yet: no separator */
+		appendStringInfo(&buf, "%s%s = '%s'",
+						 buf.len == 0 ? "" : " ", def->defname, escaped);
+
+		pfree(escaped);
+	}
+
+	/* override client_encoding with GetDatabaseEncodingName() */
+	appendStringInfo(&buf, "%sclient_encoding = '%s'",
+					 buf.len == 0 ? "" : " ", GetDatabaseEncodingName());
+
+	return buf.data;
+}
+
+
+/*
+ * Return a libpq connection string built from the options of the user mapping
+ * for (userid, serverid) followed by the options of the server itself.
+ * XXX: might leak memory, investigate
+ */
+char *
+ForeignServerConnectionString(Oid userid, Oid serverid)
+{
+	ForeignServer	*srv;
+	UserMapping		*mapping;
+	List			*merged;
+	char			*result;
+
+	srv = GetForeignServer(serverid);
+	mapping = GetUserMapping(userid, serverid);
+	merged = list_concat(mapping->options, srv->options);
+	result = options_to_connstr(merged);
+	pfree(srv);
+	pfree(mapping);
+	list_free(merged);
+	return result;
+}
+
+
+/*
+ * Look up server "serverid" in the syscache and return a pstrdup'd copy of
+ * its name. Raises an error if the lookup finds no such server.
+ */
+static char *
+get_foreign_server_name(Oid serverid)
+{
+	HeapTuple	tuple = SearchSysCache1(FOREIGNSERVEROID,
+										ObjectIdGetDatum(serverid));
+	char	   *name;
+
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "cache lookup failed for server %u", serverid);
+
+	/* copy the name out before the cache entry is released */
+	name = pstrdup(NameStr(((Form_pg_foreign_server) GETSTRUCT(tuple))->srvname));
+	ReleaseSysCache(tuple);
+
+	return name;
+}
+
+
 /*
  * GetUserMapping - look up the user mapping.
  *
@@ -219,7 +332,8 @@ GetUserMapping(Oid userid, Oid serverid)
 	if (!HeapTupleIsValid(tp))
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
-				 errmsg("user mapping not found for \"%s\"",
+				 errmsg("user mapping not found for server \"%s\" and user \"%s\"",
+						get_foreign_server_name(serverid),
 						MappingUserName(userid))));
 
 	um = (UserMapping *) palloc(sizeof(UserMapping));
@@ -596,6 +710,114 @@ is_conninfo_option(const char *option, Oid context)
 }
 
 
+/*
+ * Option validator for CREATE SERVER ... FOR CONNECTION ONLY and its user
+ * mappings. Arg 0 is the options datum, arg 1 the OID of the catalog they
+ * belong to; returns true, or raises an error for an unacceptable option.
+ * XXX: try to unify with validators for CREATE SUBSCRIPTION ... CONNECTION,
+ * postgres_fdw, and dblink. Also investigate if memory leaks are a problem.
+ */
+Datum
+pg_connection_validator(PG_FUNCTION_ARGS)
+{
+	List			*options_list = untransformRelOptions(PG_GETARG_DATUM(0));
+	Oid				 catalog	  = PG_GETARG_OID(1);
+
+	if (catalog == ForeignServerRelationId)
+	{
+		char		*conninfo;
+		ListCell	*lc;
+
+		foreach(lc, options_list)
+		{
+			DefElem *d = (DefElem *) lfirst(lc);
+
+			if (strcmp(d->defname, "client_encoding") == 0)
+				ereport(ERROR,
+						(errmsg("cannot specify client_encoding in server FOR CONNECTION ONLY")));
+
+			if (strcmp(d->defname, "user") == 0 ||
+				strcmp(d->defname, "password") == 0 ||
+				strcmp(d->defname, "sslpassword") == 0 ||
+				strcmp(d->defname, "password_required") == 0)
+				ereport(ERROR,
+						(errmsg("invalid option \"%s\" for server FOR CONNECTION ONLY",
+								d->defname),
+						 errhint("Specify option \"%s\" for a user mapping associated with the server instead.",
+								 d->defname)));
+		}
+
+		conninfo = options_to_connstr(options_list);
+
+		/* Load the library providing us libpq calls. */
+		load_file("libpqwalreceiver", false);
+
+		walrcv_check_conninfo(conninfo, false);
+	}
+	else if (catalog == UserMappingRelationId)
+	{
+		bool		 password_required = true;
+		bool		 password_provided = false;
+		ListCell	*lc;
+
+		foreach(lc, options_list)
+		{
+			DefElem *d = (DefElem *) lfirst(lc);
+
+			if (strcmp(d->defname, "password_required") == 0)
+			{
+				/*
+				 * Only the superuser may set this option on a user mapping, or
+				 * alter a user mapping on which this option is set. We allow a
+				 * user to clear this option if it's set - in fact, we don't have
+				 * a choice since we can't see the old mapping when validating an
+				 * alter.
+				 */
+				/* evaluate once; reused by the password check after the loop */
+				password_required = defGetBoolean(d);
+				if (!password_required && !superuser())
+					ereport(ERROR,
+							(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+							 errmsg("password_required=false is superuser-only"),
+							 errhint("User mappings with the password_required option set to false may only be created or modified by the superuser.")));
+			}
+
+			if ((strcmp(d->defname, "sslkey") == 0 || strcmp(d->defname, "sslcert") == 0) && !superuser())
+				ereport(ERROR,
+						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("sslcert and sslkey are superuser-only"),
+						 errhint("User mappings with the sslcert or sslkey options set may only be created or modified by the superuser.")));
+
+			if (strcmp(d->defname, "password") == 0)
+				password_provided = true;
+
+			if (strcmp(d->defname, "user") != 0 &&
+				strcmp(d->defname, "password") != 0 &&
+				strcmp(d->defname, "sslpassword") != 0 &&
+				strcmp(d->defname, "sslkey") != 0 &&
+				strcmp(d->defname, "sslcert") != 0 &&
+				strcmp(d->defname, "password_required") != 0)
+				elog(ERROR, "invalid user mapping option \"%s\"", d->defname);
+		}
+
+		/* NOTE(review): no superuser() test here, unlike the errdetail wording */
+		if (password_required && !password_provided)
+			ereport(ERROR,
+					(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+					 errmsg("password is required"),
+					 errdetail("Non-superusers must provide a password in the connection string.")));
+	}
+	else if (catalog == ForeignTableRelationId)
+		elog(ERROR, "unexpected call to pg_connection_validator for pg_foreign_table catalog");
+	else if (catalog == AttributeRelationId)
+		elog(ERROR, "unexpected call to pg_connection_validator for pg_attribute catalog");
+	else
+		elog(ERROR, "unexpected call to pg_connection_validator for catalog %u", catalog);
+
+	PG_RETURN_BOOL(true);
+}
+
+
 /*
  * Validate the generic option given to SERVER or USER MAPPING.
  * Raise an ERROR if the option or its value is considered invalid.
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7d2032885e..0becc0ea30 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -5385,6 +5385,32 @@ CreateForeignServerStmt: CREATE SERVER name opt_type opt_foreign_server_version
 					n->if_not_exists = true;
 					$$ = (Node *) n;
 				}
+				| CREATE SERVER name opt_type opt_foreign_server_version
+						 FOR CONNECTION ONLY create_generic_options
+				{
+					CreateForeignServerStmt *n = makeNode(CreateForeignServerStmt);
+
+					n->servername = $3;
+					n->servertype = $4;
+					n->version = $5;
+					n->options = $9;
+					n->connection_only = true;
+					n->if_not_exists = false;
+					$$ = (Node *) n;
+				}
+				| CREATE SERVER IF_P NOT EXISTS name opt_type opt_foreign_server_version
+						 FOR CONNECTION ONLY create_generic_options
+				{
+					CreateForeignServerStmt *n = makeNode(CreateForeignServerStmt);
+
+					n->servername = $6;
+					n->servertype = $7;
+					n->version = $8;
+					n->options = $12;
+					n->connection_only = true;
+					n->if_not_exists = true;
+					$$ = (Node *) n;
+				}
 		;
 
 opt_type:
@@ -10588,6 +10614,16 @@ CreateSubscriptionStmt:
 					n->options = $8;
 					$$ = (Node *) n;
 				}
+			| CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
+				{
+					CreateSubscriptionStmt *n =
+						makeNode(CreateSubscriptionStmt);
+					n->subname = $3;
+					n->servername = $5;
+					n->publication = $7;
+					n->options = $8;
+					$$ = (Node *) n;
+				}
 		;
 
 /*****************************************************************************
@@ -10617,6 +10653,16 @@ AlterSubscriptionStmt:
 					n->conninfo = $5;
 					$$ = (Node *) n;
 				}
+			| ALTER SUBSCRIPTION name SERVER name
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+
+					n->kind = ALTER_SUBSCRIPTION_SERVER;
+					n->subname = $3;
+					n->servername = $5;
+					$$ = (Node *) n;
+				}
 			| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
 				{
 					AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 597947410f..c9ba0e9b15 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3990,7 +3990,9 @@ maybe_reread_subscription(void)
 }
 
 /*
- * Callback from subscription syscache invalidation.
+ * Callback from subscription syscache invalidation. Also needed for server or
+ * user mapping invalidation, which can change the connection information for
+ * subscriptions that connect using a server object.
  */
 static void
 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -4625,6 +4627,14 @@ InitializeLogRepWorker(void)
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
+	/* Foreign server changes can alter a subscription's connection info. */
+	CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+								  subscription_change_cb,
+								  (Datum) 0);
+	/* User mapping changes can alter a subscription's connection info. */
+	CacheRegisterSyscacheCallback(USERMAPPINGOID,
+								  subscription_change_cb,
+								  (Datum) 0);
 
 	if (am_tablesync_worker())
 		ereport(LOG,
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 65f64c282d..1c60dd7c2c 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -48,6 +48,7 @@
 #include "catalog/pg_cast_d.h"
 #include "catalog/pg_class_d.h"
 #include "catalog/pg_default_acl_d.h"
+#include "catalog/pg_foreign_data_wrapper.h"
 #include "catalog/pg_largeobject_d.h"
 #include "catalog/pg_largeobject_metadata_d.h"
 #include "catalog/pg_proc_d.h"
@@ -4587,6 +4588,7 @@ getSubscriptions(Archive *fout)
 	int			i_subtwophasestate;
 	int			i_subdisableonerr;
 	int			i_suborigin;
+	int			i_subservername;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
@@ -4647,17 +4649,26 @@ getSubscriptions(Archive *fout)
 	if (fout->remoteVersion >= 160000)
 		appendPQExpBufferStr(query,
 							 " s.suborigin,\n"
-							 " s.subpasswordrequired\n");
+							 " s.subpasswordrequired,\n");
 	else
 		appendPQExpBuffer(query,
 						  " '%s' AS suborigin,\n"
-						  " 't' AS subpasswordrequired\n",
+						  " 't' AS subpasswordrequired,\n",
 						  LOGICALREP_ORIGIN_ANY);
 
-	appendPQExpBufferStr(query,
-						 "FROM pg_subscription s\n"
-						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
-						 "                   WHERE datname = current_database())");
+	if (fout->remoteVersion >= 170000)
+		appendPQExpBufferStr(query,
+							 " fs.srvname AS subservername\n"
+							 "FROM pg_subscription s LEFT JOIN pg_foreign_server fs\n"
+							 "  ON (s.subserver = fs.oid)\n"
+							 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
+							 "                   WHERE datname = current_database())");
+	else
+		appendPQExpBufferStr(query,
+							 " NULL AS subservername\n"
+							 "FROM pg_subscription s\n"
+							 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
+							 "                   WHERE datname = current_database())");
 
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
@@ -4671,6 +4682,7 @@ getSubscriptions(Archive *fout)
 	i_oid = PQfnumber(res, "oid");
 	i_subname = PQfnumber(res, "subname");
 	i_subowner = PQfnumber(res, "subowner");
+	i_subservername = PQfnumber(res, "subservername");
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -4693,6 +4705,10 @@ getSubscriptions(Archive *fout)
 		AssignDumpId(&subinfo[i].dobj);
 		subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
 		subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
+		if (PQgetisnull(res, i, i_subservername))
+			subinfo[i].subservername = NULL;
+		else
+			subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
 		subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
 		if (PQgetisnull(res, i, i_subslotname))
 			subinfo[i].subslotname = NULL;
@@ -4751,9 +4767,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
 					  qsubname);
 
-	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
 					  qsubname);
-	appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	if (subinfo->subservername)
+	{
+		appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
+	}
+	else
+	{
+		appendPQExpBuffer(query, "CONNECTION ");
+		appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	}
 
 	/* Build list of quoted publications and append them to query. */
 	if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
@@ -14633,9 +14657,9 @@ dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo)
 	PQExpBuffer q;
 	PQExpBuffer delq;
 	PQExpBuffer query;
-	PGresult   *res;
+	PGresult   *res = NULL;
 	char	   *qsrvname;
-	char	   *fdwname;
+	char	   *fdwname = NULL;
 
 	/* Do nothing in data-only dump */
 	if (dopt->dataOnly)
@@ -14647,13 +14671,16 @@ dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo)
 
 	qsrvname = pg_strdup(fmtId(srvinfo->dobj.name));
 
-	/* look up the foreign-data wrapper */
-	appendPQExpBuffer(query, "SELECT fdwname "
-					  "FROM pg_foreign_data_wrapper w "
-					  "WHERE w.oid = '%u'",
-					  srvinfo->srvfdw);
-	res = ExecuteSqlQueryForSingleRow(fout, query->data);
-	fdwname = PQgetvalue(res, 0, 0);
+	if (OidIsValid(srvinfo->srvfdw))
+	{
+		/* look up the foreign-data wrapper */
+		appendPQExpBuffer(query, "SELECT fdwname "
+						  "FROM pg_foreign_data_wrapper w "
+						  "WHERE w.oid = '%u'",
+						  srvinfo->srvfdw);
+		res = ExecuteSqlQueryForSingleRow(fout, query->data);
+		fdwname = PQgetvalue(res, 0, 0);
+	}
 
 	appendPQExpBuffer(q, "CREATE SERVER %s", qsrvname);
 	if (srvinfo->srvtype && strlen(srvinfo->srvtype) > 0)
@@ -14667,8 +14694,15 @@ dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo)
 		appendStringLiteralAH(q, srvinfo->srvversion, fout);
 	}
 
-	appendPQExpBufferStr(q, " FOREIGN DATA WRAPPER ");
-	appendPQExpBufferStr(q, fmtId(fdwname));
+	/* A server without a valid srvfdw is dumped as FOR CONNECTION ONLY. */
+	if (!OidIsValid(srvinfo->srvfdw))
+		appendPQExpBufferStr(q, " FOR CONNECTION ONLY");
+	else
+	{
+		/* fdwname was looked up above, which happens only for a valid srvfdw */
+		appendPQExpBufferStr(q, " FOREIGN DATA WRAPPER ");
+		appendPQExpBufferStr(q, fmtId(fdwname));
+	}
 
 	if (srvinfo->srvoptions && strlen(srvinfo->srvoptions) > 0)
 		appendPQExpBuffer(q, " OPTIONS (\n    %s\n)", srvinfo->srvoptions);
@@ -14710,7 +14744,8 @@ dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo)
 						 srvinfo->rolname,
 						 srvinfo->dobj.catId, srvinfo->dobj.dumpId);
 
-	PQclear(res);
+	/* res is NULL if no FDW lookup was done; PQclear(NULL) is a no-op */
+	PQclear(res);
 
 	free(qsrvname);
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 9036b13f6a..9ed34b9c6f 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -660,6 +660,7 @@ typedef struct _SubscriptionInfo
 {
 	DumpableObject dobj;
 	const char *rolname;
+	char	   *subservername;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subbinary;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 779fdc90cb..4ff8bab9e6 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3193,7 +3193,8 @@ psql_completion(const char *text, int start, int end)
 
 /* CREATE SERVER <name> */
 	else if (Matches("CREATE", "SERVER", MatchAny))
-		COMPLETE_WITH("TYPE", "VERSION", "FOREIGN DATA WRAPPER");
+		COMPLETE_WITH("TYPE", "VERSION", "FOR CONNECTION ONLY",
+					  "FOREIGN DATA WRAPPER");
 
 /* CREATE STATISTICS <name> */
 	else if (Matches("CREATE", "STATISTICS", MatchAny))
@@ -3287,7 +3288,7 @@ psql_completion(const char *text, int start, int end)
 
 /* CREATE SUBSCRIPTION */
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
-		COMPLETE_WITH("CONNECTION");
+		COMPLETE_WITH("SERVER", "CONNECTION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
 		COMPLETE_WITH("PUBLICATION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
diff --git a/src/include/catalog/pg_authid.dat b/src/include/catalog/pg_authid.dat
index 6b4a0aaaad..8fce457ab1 100644
--- a/src/include/catalog/pg_authid.dat
+++ b/src/include/catalog/pg_authid.dat
@@ -94,5 +94,10 @@
   rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
   rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
   rolpassword => '_null_', rolvaliduntil => '_null_' },
+{ oid => '6123', oid_symbol => 'ROLE_PG_CREATE_CONNECTION',
+  rolname => 'pg_create_connection', rolsuper => 'f', rolinherit => 't',
+  rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
+  rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
+  rolpassword => '_null_', rolvaliduntil => '_null_' },
 
 ]
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..1df3d19016 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7483,6 +7483,10 @@
   proname => 'postgresql_fdw_validator', prorettype => 'bool',
   proargtypes => '_text oid', prosrc => 'postgresql_fdw_validator' },
 
+{ oid => '6122', descr => '(internal)',
+  proname => 'pg_connection_validator', prorettype => 'bool',
+  proargtypes => '_text oid', prosrc => 'pg_connection_validator' },
+
 { oid => '2290', descr => 'I/O',
   proname => 'record_in', provolatile => 's', prorettype => 'record',
   proargtypes => 'cstring oid int4', prosrc => 'record_in' },
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 1d40eebc78..01736b0419 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -93,9 +93,12 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subrunasowner;	/* True if replication should execute as the
 								 * subscription owner */
 
+	Oid			subserver;		/* Set if connecting with server */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
-	text		subconninfo BKI_FORCE_NOT_NULL;
+	text		subconninfo BKI_FORCE_NULL;	/* Set if connecting with
+											   connection string */
 
 	/* Slot name on publisher */
 	NameData	subslotname BKI_FORCE_NULL;
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 5256d4d91f..7058335d63 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -69,6 +69,7 @@ extern ForeignServer *GetForeignServerExtended(Oid serverid,
 											   bits16 flags);
 extern ForeignServer *GetForeignServerByName(const char *srvname,
 											 bool missing_ok);
+extern char *ForeignServerConnectionString(Oid userid, Oid serverid);
 extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
 extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
 extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index fef4c714b8..29fca146a3 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2748,6 +2748,7 @@ typedef struct CreateForeignServerStmt
 	char	   *servertype;		/* optional server type */
 	char	   *version;		/* optional server version */
 	char	   *fdwname;		/* FDW name */
+	bool		connection_only;	/* FOR CONNECTION ONLY specified? */
 	bool		if_not_exists;	/* just do nothing if it already exists? */
 	List	   *options;		/* generic options to server */
 } CreateForeignServerStmt;
@@ -4063,6 +4064,7 @@ typedef struct CreateSubscriptionStmt
 {
 	NodeTag		type;
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
@@ -4071,6 +4073,7 @@ typedef struct CreateSubscriptionStmt
 typedef enum AlterSubscriptionType
 {
 	ALTER_SUBSCRIPTION_OPTIONS,
+	ALTER_SUBSCRIPTION_SERVER,
 	ALTER_SUBSCRIPTION_CONNECTION,
 	ALTER_SUBSCRIPTION_SET_PUBLICATION,
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@@ -4085,6 +4088,7 @@ typedef struct AlterSubscriptionStmt
 	NodeTag		type;
 	AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
diff --git a/src/test/regress/expected/foreign_data.out b/src/test/regress/expected/foreign_data.out
index 1dfe23cc1e..0731d887ae 100644
--- a/src/test/regress/expected/foreign_data.out
+++ b/src/test/regress/expected/foreign_data.out
@@ -394,6 +394,48 @@ CREATE SERVER t2 FOREIGN DATA WRAPPER foo;
 
 RESET ROLE;
 REVOKE regress_test_indirect FROM regress_test_role;
+-- test SERVER ... FOR CONNECTION ONLY
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOR CONNECTION ONLY; 			   -- ERROR: not a member of pg_create_connection
+ERROR:  permission denied to create server connection
+DETAIL:  Only roles with privileges of the "pg_create_connection" role may create servers FOR CONNECTION ONLY.
+RESET ROLE;
+GRANT pg_create_connection TO regress_test_role;
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOR CONNECTION ONLY OPTIONS (client_encoding 'foo'); --fails
+ERROR:  cannot specify client_encoding in server FOR CONNECTION ONLY
+CREATE SERVER t3 FOR CONNECTION ONLY OPTIONS (user 'foo'); --fails
+ERROR:  invalid option "user" for server FOR CONNECTION ONLY
+HINT:  Specify option "user" for a user mapping associated with the server instead.
+CREATE SERVER t3 FOR CONNECTION ONLY OPTIONS (password 'foo'); --fails
+ERROR:  invalid option "password" for server FOR CONNECTION ONLY
+HINT:  Specify option "password" for a user mapping associated with the server instead.
+CREATE SERVER t3 FOR CONNECTION ONLY OPTIONS (password_required 'true'); --fails
+ERROR:  invalid option "password_required" for server FOR CONNECTION ONLY
+HINT:  Specify option "password_required" for a user mapping associated with the server instead.
+CREATE SERVER t3 FOR CONNECTION ONLY;
+IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails
+ERROR:  cannot import foreign schema using server that has FOR CONNECTION ONLY specified
+HINT:  Use a foreign server that has a FOREIGN DATA WRAPPER specified instead.
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- fails
+ERROR:  password is required
+DETAIL:  Non-superusers must provide a password in the connection string.
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false'); -- fails
+ERROR:  password_required=false is superuser-only
+HINT:  User mappings with the password_required option set to false may only be created or modified by the superuser.
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', application_name 'nonsense'); -- fails
+ERROR:  invalid user mapping option "application_name"
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+RESET ROLE;
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- still fails
+ERROR:  password is required
+DETAIL:  Non-superusers must provide a password in the connection string.
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+DROP SERVER t3;
 -- ALTER SERVER
 ALTER SERVER s0;                                            -- ERROR
 ERROR:  syntax error at or near ";"
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3c1a0869ec..09d9b5dccc 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -144,6 +144,44 @@ ERROR:  could not connect to the publisher: invalid port number: "-1"
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
+-- temporarily revoke pg_create_connection from pg_create_subscription
+-- to test that CREATE SUBSCRIPTION ... CONNECTION fails
+RESET SESSION AUTHORIZATION;
+REVOKE pg_create_connection FROM pg_create_subscription;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+-- fail - not a member of pg_create_connection, cannot use CONNECTION
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+ERROR:  permission denied to create subscription with a connection string
+DETAIL:  Only roles with privileges of the "pg_create_connection" role may create subscriptions with CONNECTION specified.
+HINT:  Create a subscription to a foreign server by specifying SERVER instead.
+-- re-grant pg_create_connection to pg_create_subscription
+RESET SESSION AUTHORIZATION;
+GRANT pg_create_connection TO pg_create_subscription;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+DROP SUBSCRIPTION regress_testsub6;
+-- test using a server object instead of connection string
+RESET SESSION AUTHORIZATION;
+CREATE SERVER regress_testserver FOR CONNECTION ONLY;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver
+  OPTIONS (password_required 'false');
+GRANT USAGE ON FOREIGN SERVER regress_testserver TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+RESET SESSION AUTHORIZATION;
+ALTER SUBSCRIPTION regress_testsub6 OWNER TO regress_subscription_user; -- fail, no user mapping
+ERROR:  user mapping not found for server "regress_testserver" and user "regress_subscription_user"
+DROP SUBSCRIPTION regress_testsub6;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver;
+DROP SERVER regress_testserver;
+REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user;
 \dRs+
                                                                                                            List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit |          Conninfo           | Skip LSN 
diff --git a/src/test/regress/sql/foreign_data.sql b/src/test/regress/sql/foreign_data.sql
index eefb860adc..8519c34c48 100644
--- a/src/test/regress/sql/foreign_data.sql
+++ b/src/test/regress/sql/foreign_data.sql
@@ -180,6 +180,40 @@ CREATE SERVER t2 FOREIGN DATA WRAPPER foo;
 RESET ROLE;
 REVOKE regress_test_indirect FROM regress_test_role;
 
+-- test SERVER ... FOR CONNECTION ONLY
+
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOR CONNECTION ONLY; 			   -- ERROR: not a member of pg_create_connection
+RESET ROLE;
+GRANT pg_create_connection TO regress_test_role;
+SET ROLE regress_test_role;
+
+CREATE SERVER t3 FOR CONNECTION ONLY OPTIONS (client_encoding 'foo'); --fails
+CREATE SERVER t3 FOR CONNECTION ONLY OPTIONS (user 'foo'); --fails
+CREATE SERVER t3 FOR CONNECTION ONLY OPTIONS (password 'foo'); --fails
+CREATE SERVER t3 FOR CONNECTION ONLY OPTIONS (password_required 'true'); --fails
+CREATE SERVER t3 FOR CONNECTION ONLY;
+
+IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- fails
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false'); -- fails
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', application_name 'nonsense'); -- fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+
+RESET ROLE;
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x'); -- still fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password_required 'false');
+
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+DROP SERVER t3;
+
 -- ALTER SERVER
 ALTER SERVER s0;                                            -- ERROR
 ALTER SERVER s0 OPTIONS (a '1');                            -- ERROR
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 55d7dbc9ab..f5b2ef805b 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -88,6 +88,45 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
 -- fail - invalid connection string during ALTER
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 
+-- temporarily revoke pg_create_connection from pg_create_subscription
+-- to test that CREATE SUBSCRIPTION ... CONNECTION fails
+RESET SESSION AUTHORIZATION;
+REVOKE pg_create_connection FROM pg_create_subscription;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+-- fail - not a member of pg_create_connection, cannot use CONNECTION
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+
+-- re-grant pg_create_connection to pg_create_subscription
+RESET SESSION AUTHORIZATION;
+GRANT pg_create_connection TO pg_create_subscription;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+DROP SUBSCRIPTION regress_testsub6;
+
+-- test using a server object instead of connection string
+
+RESET SESSION AUTHORIZATION;
+CREATE SERVER regress_testserver FOR CONNECTION ONLY;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver
+  OPTIONS (password_required 'false');
+GRANT USAGE ON FOREIGN SERVER regress_testserver TO regress_subscription_user3;
+
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false);
+RESET SESSION AUTHORIZATION;
+
+ALTER SUBSCRIPTION regress_testsub6 OWNER TO regress_subscription_user; -- fail, no user mapping
+DROP SUBSCRIPTION regress_testsub6;
+
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver;
+DROP SERVER regress_testserver;
+REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user;
+
 \dRs+
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 0a399cdb82..13b35868a7 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -27,6 +27,8 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_ins2 AS SELECT generate_series(1,1002) AS a");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a");
 $node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
@@ -60,6 +62,7 @@ $node_publisher->safe_psql('postgres',
 # Setup structure on subscriber
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins2 (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
 $node_subscriber->safe_psql('postgres',
@@ -102,6 +105,22 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
 );
 
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+	"CREATE SERVER tap_sub2_server FOR CONNECTION ONLY OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE USER MAPPING FOR PUBLIC SERVER tap_sub2_server OPTIONS (password_required 'false')"
+);
+
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_simple_pub FOR TABLE tab_ins2");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub2 SERVER tap_sub2_server PUBLICATION tap_simple_pub"
+);
+
 # Wait for initial table sync to finish
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
@@ -113,11 +132,22 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
 is($result, qq(1002), 'check initial data was copied to subscriber');
 
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins2");
+is($result, qq(1002), 'check initial data was copied via server subscription');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr'");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
 $node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a");
 
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins2 SELECT generate_series(1,50)");
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 SERVER tap_sub2_server");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rep SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20");
@@ -147,6 +177,10 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_ins");
 is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
 
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins2");
+is($result, qq(1052|1|1002), 'check replicated inserts via server subscription');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_rep");
 is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
@@ -434,10 +468,27 @@ $node_publisher->poll_query_until('postgres',
   or die
   "Timed out while waiting for apply to restart after changing PUBLICATION";
 
+# Changing the options of the server used by tap_sub2 must make its apply
+# worker restart; detect that as a new pid in pg_stat_replication.
+$oldpid = $node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER SERVER tap_sub2_server OPTIONS (sslmode 'disable')"
+);
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';"
+  )
+  or die
+  "Timed out while waiting for apply to restart after changing SERVER options";
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
 
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins2 SELECT generate_series(1001,1100)");
+
 # Restart the publisher and check the state of the subscriber which
 # should be in a streaming state after catching up.
 $node_publisher->stop('fast');
@@ -450,6 +501,11 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, qq(1152|1|1100),
 	'check replicated inserts after subscription publication change');
 
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins2");
+is($result, qq(1152|1|1100),
+	'check replicated inserts after foreign server option change');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_rep");
 is($result, qq(20|-20|-1),
@@ -518,6 +574,7 @@ $node_publisher->poll_query_until('postgres',
 
 # check all the cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2");
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_subscription");
-- 
2.34.1

