From 63866c2c8f9c968957dfb0d5b8c5ceca2c4787ed Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 24 Apr 2017 22:57:49 -0700
Subject: [PATCH] WIP: POC: Prototype: Unify SQL and replication grammars.

---
 src/backend/commands/event_trigger.c               |   8 +-
 src/backend/parser/analyze.c                       |  10 +
 src/backend/parser/gram.y                          | 396 ++++++++++++++++++--
 src/backend/replication/Makefile                   |   5 +-
 .../libpqwalreceiver/libpqwalreceiver.c            |   2 +-
 src/backend/replication/logical/snapbuild.c        |   5 +-
 src/backend/replication/repl_gram.y                | 408 ---------------------
 src/backend/replication/repl_scanner.l             | 248 -------------
 src/backend/replication/walsender.c                | 124 ++-----
 src/backend/tcop/postgres.c                        |  69 ++--
 src/backend/tcop/pquery.c                          |   2 +
 src/backend/tcop/utility.c                         |  46 +++
 src/bin/pg_basebackup/pg_recvlogical.c             |   2 +-
 src/bin/pg_basebackup/receivelog.c                 |   2 +-
 src/include/nodes/nodes.h                          |   2 +-
 src/include/nodes/replnodes.h                      |   8 +
 src/include/parser/gramparse.h                     |   1 +
 src/include/parser/kwlist.h                        |  18 +
 src/include/replication/walsender.h                |   5 +-
 19 files changed, 538 insertions(+), 823 deletions(-)
 delete mode 100644 src/backend/replication/repl_gram.y
 delete mode 100644 src/backend/replication/repl_scanner.l

diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index d7c199f314..1e441fbe86 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -295,7 +295,13 @@ check_ddl_tag(const char *tag)
 		pg_strcasecmp(tag, "REVOKE") == 0 ||
 		pg_strcasecmp(tag, "DROP OWNED") == 0 ||
 		pg_strcasecmp(tag, "IMPORT FOREIGN SCHEMA") == 0 ||
-		pg_strcasecmp(tag, "SECURITY LABEL") == 0)
+		pg_strcasecmp(tag, "SECURITY LABEL") == 0 ||
+		pg_strcasecmp(tag, "IDENTIFY_SYSTEM") == 0 ||
+		pg_strcasecmp(tag, "BASE_BACKUP") == 0 ||
+		pg_strcasecmp(tag, "CREATE_REPLICATION_SLOT") == 0 ||
+		pg_strcasecmp(tag, "DROP_REPLICATION_SLOT") == 0 ||
+		pg_strcasecmp(tag, "START_REPLICATION") == 0 ||
+		pg_strcasecmp(tag, "TIMELINE_HISTORY") == 0)
 		return EVENT_TRIGGER_COMMAND_TAG_OK;
 
 	/*
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 567dd54c6c..d969c0a4e0 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -42,6 +42,7 @@
 #include "parser/parse_target.h"
 #include "parser/parsetree.h"
 #include "rewrite/rewriteManip.h"
+#include "replication/walsender.h"
 #include "utils/rel.h"
 
 
@@ -104,6 +105,15 @@ parse_analyze(RawStmt *parseTree, const char *sourceText,
 
 	pstate->p_sourcetext = sourceText;
 
+	if (am_walsender && !am_db_walsender &&
+		analyze_requires_snapshot(parseTree))
+	{
+		/* FIXME: message */
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("non-replication statement not supported in pure replication connection")));
+	}
+
 	if (numParams > 0)
 		parse_fixed_parameters(pstate, paramTypes, numParams);
 
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 89d2836c49..6ede6d6af0 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -48,6 +48,7 @@
 #include <ctype.h>
 #include <limits.h>
 
+#include "access/xlogdefs.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_am.h"
@@ -56,9 +57,12 @@
 #include "commands/trigger.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
+#include "nodes/replnodes.h"
 #include "parser/gramparse.h"
 #include "parser/parser.h"
 #include "parser/parse_expr.h"
+#include "replication/walsender.h"
+#include "replication/walsender_private.h"
 #include "storage/lmgr.h"
 #include "utils/date.h"
 #include "utils/datetime.h"
@@ -241,6 +245,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	PartitionSpec		*partspec;
 	PartitionRangeDatum	*partrange_datum;
 	RoleSpec			*rolespec;
+	XLogRecPtr			recptr;
+	uint32				uintval;
 }
 
 %type <node>	stmt schema_stmt
@@ -584,6 +590,25 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partrange_datum>	PartitionRangeDatum
 %type <list>		range_datum_list
 
+
+/* replication stuff */
+%type <node>	replication_command
+%type <node>	base_backup start_replication start_logical_replication
+				create_replication_slot drop_replication_slot identify_system
+				timeline_history
+%type <list>	base_backup_opt_list
+%type <defelt>	base_backup_opt
+%type <uintval>	opt_timeline
+%type <list>	plugin_options plugin_opt_list
+%type <defelt>	plugin_opt_elem
+%type <node>	plugin_opt_arg
+%type <str>		opt_slot
+%type <boolean>	opt_temporary
+%type <list>	create_slot_opt_list
+%type <defelt>	create_slot_opt
+%type <recptr>	recptr
+
+
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
  * They must be listed first so that their numeric codes do not depend on
@@ -610,7 +635,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	AGGREGATE ALL ALSO ALTER ALWAYS ANALYSE ANALYZE AND ANY ARRAY AS ASC
 	ASSERTION ASSIGNMENT ASYMMETRIC AT ATTACH ATTRIBUTE AUTHORIZATION
 
-	BACKWARD BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT
+	BACKWARD BASE_BACKUP BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT
 	BOOLEAN_P BOTH BY
 
 	CACHE CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P
@@ -618,29 +643,29 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	CLUSTER COALESCE COLLATE COLLATION COLUMN COLUMNS COMMENT COMMENTS COMMIT
 	COMMITTED CONCURRENTLY CONFIGURATION CONFLICT CONNECTION CONSTRAINT
 	CONSTRAINTS CONTENT_P CONTINUE_P CONVERSION_P COPY COST CREATE
-	CROSS CSV CUBE CURRENT_P
+	CREATE_REPLICATION_SLOT CROSS CSV CUBE CURRENT_P
 	CURRENT_CATALOG CURRENT_DATE CURRENT_ROLE CURRENT_SCHEMA
 	CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE
 
 	DATA_P DATABASE DAY_P DEALLOCATE DEC DECIMAL_P DECLARE DEFAULT DEFAULTS
 	DEFERRABLE DEFERRED DEFINER DELETE_P DELIMITER DELIMITERS DEPENDS DESC
 	DETACH DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P
-	DOUBLE_P DROP
+	DOUBLE_P DROP DROP_REPLICATION_SLOT
 
 	EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EVENT EXCEPT
-	EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
+	EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXPORT_SNAPSHOT
 	EXTENSION EXTERNAL EXTRACT
 
-	FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+	FALSE_P FAMILY FAST_P FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
 	FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
 
 	GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING
 
 	HANDLER HAVING HEADER_P HOLD HOUR_P
 
-	IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P
-	INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
-	INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
+	IDENTITY_P IDENTIFY_SYSTEM IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P
+	IMPORT_P IN_P INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY
+	INLINE_P INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
 	INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION
 
 	JOIN
@@ -649,45 +674,46 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LOGICAL_P
 
-	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
+	MAPPING MATCH MATERIALIZED MAXVALUE MAX_RATE METHOD MINUTE_P MINVALUE MODE
+	MONTH_P MOVE
 
-	NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NONE
+	NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NOEXPORT_SNAPSHOT NONE
 	NOREFRESH NOT NOTHING NOTIFY NOTNULL NOWAIT NULL_P NULLIF
 	NULLS_P NUMERIC
 
 	OBJECT_P OF OFF OFFSET OIDS OLD ON ONLY OPERATOR OPTION OPTIONS OR
 	ORDER ORDINALITY OUT_P OUTER_P OVER OVERLAPS OVERLAY OVERRIDING OWNED OWNER
 
-	PARALLEL PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POLICY
-	POSITION PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
-	PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM PUBLICATION
+	PARALLEL PARSER PARTIAL PARTITION PASSING PASSWORD PHYSICAL PLACING PLANS
+	POLICY POSITION PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
+	PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM PROGRESS PUBLICATION
 
 	QUOTE
 
 	RANGE READ REAL REASSIGN RECHECK RECURSIVE REF REFERENCES REFERENCING
 	REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA
-	RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP
-	ROW ROWS RULE
+	RESERVE_WAL RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE
+	ROLLBACK ROLLUP ROW ROWS RULE
 
 	SAVEPOINT SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES
 	SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW
 	SIMILAR SIMPLE SKIP SLOT SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P
-	START STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P
-	SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P
+	START START_REPLICATION STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P
+	STRIP_P SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P
 
-	TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-	TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM TREAT TRIGGER TRIM TRUE_P
-	TRUNCATE TRUSTED TYPE_P TYPES_P
+	TABLE TABLES TABLESAMPLE TABLESPACE TABLESPACE_MAP_P TEMP TEMPLATE TEMPORARY
+	TEXT_P THEN TIME TIMELINE_P TIMELINE_HISTORY TIMESTAMP TO TRAILING TRANSACTION
+	TRANSFORM TREAT TRIGGER TRIM TRUE_P	TRUNCATE TRUSTED TYPE_P TYPES_P
 
 	UNBOUNDED UNCOMMITTED UNENCRYPTED UNION UNIQUE UNKNOWN UNLISTEN UNLOGGED
-	UNTIL UPDATE USER USING
+	UNTIL UPDATE USE_SNAPSHOT USER USING
 
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAL_P WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
 	XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -942,6 +968,7 @@ stmt :
 			| VariableSetStmt
 			| VariableShowStmt
 			| ViewStmt
+			| replication_command
 			| /*EMPTY*/
 				{ $$ = NULL; }
 		;
@@ -14487,6 +14514,311 @@ AexprConst: Iconst
 				}
 		;
 
+/* replication protocol commands; each is returned wrapped in a ReplicationCmd */
+
+replication_command:
+			identify_system
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;	/* the command's own node */
+				$$ = (Node *) rcmd;
+			}
+			| base_backup
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| start_replication
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| start_logical_replication
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| create_replication_slot
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| drop_replication_slot
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| timeline_history
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+;
+
+
+/*
+ * IDENTIFY_SYSTEM -- takes no arguments; result is a bare IdentifySystemCmd
+ */
+identify_system:
+			IDENTIFY_SYSTEM
+				{
+					$$ = (Node *) makeNode(IdentifySystemCmd);
+				}
+			;
+
+/*
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * [MAX_RATE %d] [TABLESPACE_MAP]  -- options are accepted in any order
+ */
+base_backup:
+			BASE_BACKUP base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;	/* DefElem list; NIL when no options were given */
+					$$ = (Node *) cmd;
+				}
+			;
+
+base_backup_opt_list:	/* zero or more options, kept in input order */
+			base_backup_opt_list base_backup_opt
+				{ $$ = lappend($1, $2); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+base_backup_opt:	/* one option as a DefElem; bare flags carry integer TRUE */
+			LABEL SCONST
+				{
+				  $$ = makeDefElem("label",
+								   (Node *)makeString($2), -1);
+				}
+			| PROGRESS
+				{
+				  $$ = makeDefElem("progress",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| FAST_P
+				{
+				  $$ = makeDefElem("fast",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| WAL_P
+				{
+				  $$ = makeDefElem("wal",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| NOWAIT
+				{
+				  $$ = makeDefElem("nowait",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| MAX_RATE ICONST	/* the integer is stored as given, no range check here */
+				{
+				  $$ = makeDefElem("max_rate",
+								   (Node *)makeInteger($2), -1);
+				}
+			| TABLESPACE_MAP_P
+				{
+				  $$ = makeDefElem("tablespace_map",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			;
+
+create_replication_slot:
+			/* CREATE_REPLICATION_SLOT slot [TEMPORARY] PHYSICAL [option ...] */
+			CREATE_REPLICATION_SLOT IDENT opt_temporary PHYSICAL create_slot_opt_list
+				{
+					CreateReplicationSlotCmd *cmd;
+					cmd = makeNode(CreateReplicationSlotCmd);
+					cmd->kind = REPLICATION_KIND_PHYSICAL;
+					cmd->slotname = $2;
+					cmd->temporary = $3;
+					cmd->options = $5;	/* grammar accepts any create_slot_opt here */
+					$$ = (Node *) cmd;
+				}
+			/* CREATE_REPLICATION_SLOT slot [TEMPORARY] LOGICAL plugin [option ...] */
+			| CREATE_REPLICATION_SLOT IDENT opt_temporary LOGICAL_P IDENT create_slot_opt_list
+				{
+					CreateReplicationSlotCmd *cmd;
+					cmd = makeNode(CreateReplicationSlotCmd);
+					cmd->kind = REPLICATION_KIND_LOGICAL;
+					cmd->slotname = $2;
+					cmd->temporary = $3;
+					cmd->plugin = $5;	/* plugin name, second IDENT of the command */
+					cmd->options = $6;
+					$$ = (Node *) cmd;
+				}
+			;
+
+create_slot_opt_list:	/* zero or more slot options, kept in input order */
+			create_slot_opt_list create_slot_opt
+				{ $$ = lappend($1, $2); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+create_slot_opt:	/* one slot option as a DefElem with an integer TRUE/FALSE value */
+			EXPORT_SNAPSHOT
+				{
+				  $$ = makeDefElem("export_snapshot",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| NOEXPORT_SNAPSHOT	/* same option name as EXPORT_SNAPSHOT, value FALSE */
+				{
+				  $$ = makeDefElem("export_snapshot",
+								   (Node *)makeInteger(FALSE), -1);
+				}
+			| USE_SNAPSHOT
+				{
+				  $$ = makeDefElem("use_snapshot",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| RESERVE_WAL
+				{
+				  $$ = makeDefElem("reserve_wal",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			;
+
+/* DROP_REPLICATION_SLOT slot -- builds a DropReplicationSlotCmd naming the slot */
+drop_replication_slot:
+			DROP_REPLICATION_SLOT IDENT
+				{
+					DropReplicationSlotCmd *cmd;
+					cmd = makeNode(DropReplicationSlotCmd);
+					cmd->slotname = $2;	/* slot name arrives as a plain IDENT token */
+					$$ = (Node *) cmd;
+				}
+			;
+
+recptr:		/* WAL location as a quoted hex pair, e.g. '0/16B3748' */
+			SCONST
+				{
+					uint32	hi, lo;
+					int		used = 0;	/* chars consumed (%n); used to reject trailing junk */
+					if (sscanf($1, "%X/%X%n", &hi, &lo, &used) != 2 || ($1)[used] != '\0')
+						ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("invalid write-ahead log location: \"%s\"", $1),
+								 parser_errposition(@1)));
+					$$ = ((uint64) hi) << 32 | lo;
+				}
+			;
+
+/*
+ * START_REPLICATION [SLOT slot] [PHYSICAL] '%X/%X' [TIMELINE %d]
+ */
+start_replication:
+			START_REPLICATION opt_slot opt_physical recptr opt_timeline
+				{
+					StartReplicationCmd *cmd;
+
+					cmd = makeNode(StartReplicationCmd);
+					cmd->kind = REPLICATION_KIND_PHYSICAL;
+					cmd->slotname = $2;	/* NULL when no SLOT clause was given */
+					cmd->startpoint = $4;
+					cmd->timeline = $5;	/* 0 when no TIMELINE clause was given */
+					$$ = (Node *) cmd;
+				}
+			;
+
+/* START_REPLICATION SLOT slot LOGICAL '%X/%X' [ ( option [, ...] ) ] */
+start_logical_replication:
+			START_REPLICATION SLOT IDENT LOGICAL_P recptr plugin_options
+				{
+					StartReplicationCmd *cmd;
+					cmd = makeNode(StartReplicationCmd);
+					cmd->kind = REPLICATION_KIND_LOGICAL;
+					cmd->slotname = $3;
+					cmd->startpoint = $5;
+					cmd->options = $6;	/* NIL when no option list was given */
+					$$ = (Node *) cmd;
+				}
+			;
+/*
+ * TIMELINE_HISTORY %d -- the timeline ID must be a positive integer
+ */
+timeline_history:
+			TIMELINE_HISTORY ICONST
+				{
+					TimeLineHistoryCmd *cmd;
+
+					if ($2 <= 0)	/* reject non-positive IDs, pointing at the number */
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("invalid timeline %u", $2),
+								 parser_errposition(@2)));
+
+					cmd = makeNode(TimeLineHistoryCmd);
+					cmd->timeline = $2;
+					$$ = (Node *) cmd;
+				}
+			;
+
+opt_physical:	/* optional noise word: no action and no semantic value */
+			PHYSICAL
+			| /* EMPTY */
+			;
+
+opt_temporary:	/* true iff the TEMPORARY keyword was given */
+			TEMPORARY						{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
+opt_slot:		/* optional SLOT clause: the slot name, or NULL when absent */
+			SLOT IDENT
+				{ $$ = $2; }
+			| /* EMPTY */
+				{ $$ = NULL; }
+			;
+
+opt_timeline:	/* optional TIMELINE clause; yields 0 when none was given */
+			TIMELINE_P ICONST
+				{
+					if ($2 <= 0)	/* reject non-positive IDs, pointing at the number */
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("invalid timeline %u", $2),
+								 parser_errposition(@2)));
+					$$ = $2;
+				}
+			| /* EMPTY */					{ $$ = 0; }
+			;
+
+plugin_options:	/* optional parenthesized option list; NIL when omitted */
+			'(' plugin_opt_list ')'			{ $$ = $2; }
+			| /* EMPTY */					{ $$ = NIL; }
+		;
+
+plugin_opt_list:	/* one or more comma-separated options, kept in input order */
+			plugin_opt_elem
+				{
+					$$ = list_make1($1);
+				}
+			| plugin_opt_list ',' plugin_opt_elem
+				{
+					$$ = lappend($1, $3);
+				}
+		;
+
+plugin_opt_elem:	/* option name plus optional string value, as a DefElem */
+			IDENT plugin_opt_arg
+				{
+					$$ = makeDefElem($1, $2, -1);	/* arg is NULL when no value was given */
+				}
+		;
+
+plugin_opt_arg:	/* optional value: a String node, or NULL when omitted */
+			SCONST							{ $$ = (Node *) makeString($1); }
+			| /* EMPTY */					{ $$ = NULL; }
+		;
+
+
 Iconst:		ICONST									{ $$ = $1; };
 Sconst:		SCONST									{ $$ = $1; };
 
@@ -14646,6 +14978,7 @@ unreserved_keyword:
 			| ATTACH
 			| ATTRIBUTE
 			| BACKWARD
+			| BASE_BACKUP
 			| BEFORE
 			| BEGIN_P
 			| BY
@@ -14674,6 +15007,7 @@ unreserved_keyword:
 			| CONVERSION_P
 			| COPY
 			| COST
+			| CREATE_REPLICATION_SLOT
 			| CSV
 			| CUBE
 			| CURRENT_P
@@ -14699,6 +15033,7 @@ unreserved_keyword:
 			| DOMAIN_P
 			| DOUBLE_P
 			| DROP
+			| DROP_REPLICATION_SLOT
 			| EACH
 			| ENABLE_P
 			| ENCODING
@@ -14711,9 +15046,11 @@ unreserved_keyword:
 			| EXCLUSIVE
 			| EXECUTE
 			| EXPLAIN
+			| EXPORT_SNAPSHOT
 			| EXTENSION
 			| EXTERNAL
 			| FAMILY
+			| FAST_P
 			| FILTER
 			| FIRST_P
 			| FOLLOWING
@@ -14728,6 +15065,7 @@ unreserved_keyword:
 			| HEADER_P
 			| HOLD
 			| HOUR_P
+			| IDENTIFY_SYSTEM
 			| IDENTITY_P
 			| IF_P
 			| IMMEDIATE
@@ -14761,10 +15099,12 @@ unreserved_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LOGICAL_P
 			| MAPPING
 			| MATCH
 			| MATERIALIZED
 			| MAXVALUE
+			| MAX_RATE
 			| METHOD
 			| MINUTE_P
 			| MINVALUE
@@ -14776,6 +15116,7 @@ unreserved_keyword:
 			| NEW
 			| NEXT
 			| NO
+			| NOEXPORT_SNAPSHOT
 			| NOREFRESH
 			| NOTHING
 			| NOTIFY
@@ -14800,6 +15141,7 @@ unreserved_keyword:
 			| PARTITION
 			| PASSING
 			| PASSWORD
+			| PHYSICAL
 			| PLANS
 			| POLICY
 			| PRECEDING
@@ -14811,6 +15153,7 @@ unreserved_keyword:
 			| PROCEDURAL
 			| PROCEDURE
 			| PROGRAM
+			| PROGRESS
 			| PUBLICATION
 			| QUOTE
 			| RANGE
@@ -14828,6 +15171,7 @@ unreserved_keyword:
 			| REPEATABLE
 			| REPLACE
 			| REPLICA
+			| RESERVE_WAL
 			| RESET
 			| RESTART
 			| RESTRICT
@@ -14862,6 +15206,7 @@ unreserved_keyword:
 			| STABLE
 			| STANDALONE_P
 			| START
+			| START_REPLICATION
 			| STATEMENT
 			| STATISTICS
 			| STDIN
@@ -14874,10 +15219,13 @@ unreserved_keyword:
 			| SYSTEM_P
 			| TABLES
 			| TABLESPACE
+			| TABLESPACE_MAP_P
 			| TEMP
 			| TEMPLATE
 			| TEMPORARY
 			| TEXT_P
+			| TIMELINE_P
+			| TIMELINE_HISTORY
 			| TRANSACTION
 			| TRANSFORM
 			| TRIGGER
@@ -14893,6 +15241,7 @@ unreserved_keyword:
 			| UNLOGGED
 			| UNTIL
 			| UPDATE
+			| USE_SNAPSHOT
 			| VACUUM
 			| VALID
 			| VALIDATE
@@ -14903,6 +15252,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAL_P
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index da8bcf0471..62f8f3a440 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -15,15 +15,12 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
 
 OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
-	repl_gram.o slot.o slotfuncs.o syncrep.o syncrep_gram.o
+	slot.o slotfuncs.o syncrep.o syncrep_gram.o
 
 SUBDIRS = logical
 
 include $(top_srcdir)/src/backend/common.mk
 
-# repl_scanner is compiled as part of repl_gram
-repl_gram.o: repl_scanner.c
-
 # syncrep_scanner is complied as part of syncrep_gram
 syncrep_gram.o: syncrep_scanner.c
 syncrep_scanner.c: FLEXFLAGS = -CF -p -i
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9d7bb25d39..26a53333f8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -352,7 +352,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 	if (options->logical)
 		appendStringInfo(&cmd, " LOGICAL");
 
-	appendStringInfo(&cmd, " %X/%X",
+	appendStringInfo(&cmd, " '%X/%X'",
 					 (uint32) (options->startpoint >> 32),
 					 (uint32) options->startpoint);
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 358ec28932..eb261ea02f 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -589,8 +589,7 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 	Snapshot	snap;
 	char	   *snapname;
 
-	if (IsTransactionOrTransactionBlock())
-		elog(ERROR, "cannot export a snapshot from within a transaction");
+	Assert(IsTransactionOrTransactionBlock());
 
 	if (SavedResourceOwnerDuringExport)
 		elog(ERROR, "can only export one snapshot at a time");
@@ -598,8 +597,6 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 	SavedResourceOwnerDuringExport = CurrentResourceOwner;
 	ExportInProgress = true;
 
-	StartTransactionCommand();
-
 	/* There doesn't seem to a nice API to set these */
 	XactIsoLevel = XACT_REPEATABLE_READ;
 	XactReadOnly = true;
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
deleted file mode 100644
index ec047c827c..0000000000
--- a/src/backend/replication/repl_gram.y
+++ /dev/null
@@ -1,408 +0,0 @@
-%{
-/*-------------------------------------------------------------------------
- *
- * repl_gram.y				- Parser for the replication commands
- *
- * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- *	  src/backend/replication/repl_gram.y
- *
- *-------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include "access/xlogdefs.h"
-#include "nodes/makefuncs.h"
-#include "nodes/replnodes.h"
-#include "replication/walsender.h"
-#include "replication/walsender_private.h"
-
-
-/* Result of the parsing is returned here */
-Node *replication_parse_result;
-
-static SQLCmd *make_sqlcmd(void);
-
-
-/*
- * Bison doesn't allocate anything that needs to live across parser calls,
- * so we can easily have it use palloc instead of malloc.  This prevents
- * memory leaks if we error out during parsing.  Note this only works with
- * bison >= 2.0.  However, in bison 1.875 the default is to use alloca()
- * if possible, so there's not really much problem anyhow, at least if
- * you're building with gcc.
- */
-#define YYMALLOC palloc
-#define YYFREE   pfree
-
-%}
-
-%expect 0
-%name-prefix="replication_yy"
-
-%union {
-		char					*str;
-		bool					boolval;
-		uint32					uintval;
-
-		XLogRecPtr				recptr;
-		Node					*node;
-		List					*list;
-		DefElem					*defelt;
-}
-
-/* Non-keyword tokens */
-%token <str> SCONST IDENT
-%token <uintval> UCONST
-%token <recptr> RECPTR
-%token T_WORD
-
-/* Keyword tokens. */
-%token K_BASE_BACKUP
-%token K_IDENTIFY_SYSTEM
-%token K_SHOW
-%token K_START_REPLICATION
-%token K_CREATE_REPLICATION_SLOT
-%token K_DROP_REPLICATION_SLOT
-%token K_TIMELINE_HISTORY
-%token K_LABEL
-%token K_PROGRESS
-%token K_FAST
-%token K_NOWAIT
-%token K_MAX_RATE
-%token K_WAL
-%token K_TABLESPACE_MAP
-%token K_TIMELINE
-%token K_PHYSICAL
-%token K_LOGICAL
-%token K_SLOT
-%token K_RESERVE_WAL
-%token K_TEMPORARY
-%token K_EXPORT_SNAPSHOT
-%token K_NOEXPORT_SNAPSHOT
-%token K_USE_SNAPSHOT
-
-%type <node>	command
-%type <node>	base_backup start_replication start_logical_replication
-				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
-%type <list>	base_backup_opt_list
-%type <defelt>	base_backup_opt
-%type <uintval>	opt_timeline
-%type <list>	plugin_options plugin_opt_list
-%type <defelt>	plugin_opt_elem
-%type <node>	plugin_opt_arg
-%type <str>		opt_slot var_name
-%type <boolval>	opt_temporary
-%type <list>	create_slot_opt_list
-%type <defelt>	create_slot_opt
-
-%%
-
-firstcmd: command opt_semicolon
-				{
-					replication_parse_result = $1;
-				}
-			;
-
-opt_semicolon:	';'
-				| /* EMPTY */
-				;
-
-command:
-			identify_system
-			| base_backup
-			| start_replication
-			| start_logical_replication
-			| create_replication_slot
-			| drop_replication_slot
-			| timeline_history
-			| show
-			| sql_cmd
-			;
-
-/*
- * IDENTIFY_SYSTEM
- */
-identify_system:
-			K_IDENTIFY_SYSTEM
-				{
-					$$ = (Node *) makeNode(IdentifySystemCmd);
-				}
-			;
-
-/*
- * SHOW setting
- */
-show:
-			K_SHOW var_name
-				{
-					VariableShowStmt *n = makeNode(VariableShowStmt);
-					n->name = $2;
-					$$ = (Node *) n;
-				}
-
-var_name:	IDENT	{ $$ = $1; }
-			| var_name '.' IDENT
-				{ $$ = psprintf("%s.%s", $1, $3); }
-		;
-
-/*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
- * [MAX_RATE %d] [TABLESPACE_MAP]
- */
-base_backup:
-			K_BASE_BACKUP base_backup_opt_list
-				{
-					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
-					cmd->options = $2;
-					$$ = (Node *) cmd;
-				}
-			;
-
-base_backup_opt_list:
-			base_backup_opt_list base_backup_opt
-				{ $$ = lappend($1, $2); }
-			| /* EMPTY */
-				{ $$ = NIL; }
-			;
-
-base_backup_opt:
-			K_LABEL SCONST
-				{
-				  $$ = makeDefElem("label",
-								   (Node *)makeString($2), -1);
-				}
-			| K_PROGRESS
-				{
-				  $$ = makeDefElem("progress",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_FAST
-				{
-				  $$ = makeDefElem("fast",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_WAL
-				{
-				  $$ = makeDefElem("wal",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_NOWAIT
-				{
-				  $$ = makeDefElem("nowait",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_MAX_RATE UCONST
-				{
-				  $$ = makeDefElem("max_rate",
-								   (Node *)makeInteger($2), -1);
-				}
-			| K_TABLESPACE_MAP
-				{
-				  $$ = makeDefElem("tablespace_map",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			;
-
-create_replication_slot:
-			/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
-			K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
-				{
-					CreateReplicationSlotCmd *cmd;
-					cmd = makeNode(CreateReplicationSlotCmd);
-					cmd->kind = REPLICATION_KIND_PHYSICAL;
-					cmd->slotname = $2;
-					cmd->temporary = $3;
-					cmd->options = $5;
-					$$ = (Node *) cmd;
-				}
-			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
-				{
-					CreateReplicationSlotCmd *cmd;
-					cmd = makeNode(CreateReplicationSlotCmd);
-					cmd->kind = REPLICATION_KIND_LOGICAL;
-					cmd->slotname = $2;
-					cmd->temporary = $3;
-					cmd->plugin = $5;
-					cmd->options = $6;
-					$$ = (Node *) cmd;
-				}
-			;
-
-create_slot_opt_list:
-			create_slot_opt_list create_slot_opt
-				{ $$ = lappend($1, $2); }
-			| /* EMPTY */
-				{ $$ = NIL; }
-			;
-
-create_slot_opt:
-			K_EXPORT_SNAPSHOT
-				{
-				  $$ = makeDefElem("export_snapshot",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_NOEXPORT_SNAPSHOT
-				{
-				  $$ = makeDefElem("export_snapshot",
-								   (Node *)makeInteger(FALSE), -1);
-				}
-			| K_USE_SNAPSHOT
-				{
-				  $$ = makeDefElem("use_snapshot",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_RESERVE_WAL
-				{
-				  $$ = makeDefElem("reserve_wal",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			;
-
-/* DROP_REPLICATION_SLOT slot */
-drop_replication_slot:
-			K_DROP_REPLICATION_SLOT IDENT
-				{
-					DropReplicationSlotCmd *cmd;
-					cmd = makeNode(DropReplicationSlotCmd);
-					cmd->slotname = $2;
-					$$ = (Node *) cmd;
-				}
-			;
-
-/*
- * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
- */
-start_replication:
-			K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
-				{
-					StartReplicationCmd *cmd;
-
-					cmd = makeNode(StartReplicationCmd);
-					cmd->kind = REPLICATION_KIND_PHYSICAL;
-					cmd->slotname = $2;
-					cmd->startpoint = $4;
-					cmd->timeline = $5;
-					$$ = (Node *) cmd;
-				}
-			;
-
-/* START_REPLICATION SLOT slot LOGICAL %X/%X options */
-start_logical_replication:
-			K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
-				{
-					StartReplicationCmd *cmd;
-					cmd = makeNode(StartReplicationCmd);
-					cmd->kind = REPLICATION_KIND_LOGICAL;
-					cmd->slotname = $3;
-					cmd->startpoint = $5;
-					cmd->options = $6;
-					$$ = (Node *) cmd;
-				}
-			;
-/*
- * TIMELINE_HISTORY %d
- */
-timeline_history:
-			K_TIMELINE_HISTORY UCONST
-				{
-					TimeLineHistoryCmd *cmd;
-
-					if ($2 <= 0)
-						ereport(ERROR,
-								(errcode(ERRCODE_SYNTAX_ERROR),
-								 (errmsg("invalid timeline %u", $2))));
-
-					cmd = makeNode(TimeLineHistoryCmd);
-					cmd->timeline = $2;
-
-					$$ = (Node *) cmd;
-				}
-			;
-
-opt_physical:
-			K_PHYSICAL
-			| /* EMPTY */
-			;
-
-opt_temporary:
-			K_TEMPORARY						{ $$ = true; }
-			| /* EMPTY */					{ $$ = false; }
-			;
-
-opt_slot:
-			K_SLOT IDENT
-				{ $$ = $2; }
-			| /* EMPTY */
-				{ $$ = NULL; }
-			;
-
-opt_timeline:
-			K_TIMELINE UCONST
-				{
-					if ($2 <= 0)
-						ereport(ERROR,
-								(errcode(ERRCODE_SYNTAX_ERROR),
-								 (errmsg("invalid timeline %u", $2))));
-					$$ = $2;
-				}
-				| /* EMPTY */			{ $$ = 0; }
-			;
-
-
-plugin_options:
-			'(' plugin_opt_list ')'			{ $$ = $2; }
-			| /* EMPTY */					{ $$ = NIL; }
-		;
-
-plugin_opt_list:
-			plugin_opt_elem
-				{
-					$$ = list_make1($1);
-				}
-			| plugin_opt_list ',' plugin_opt_elem
-				{
-					$$ = lappend($1, $3);
-				}
-		;
-
-plugin_opt_elem:
-			IDENT plugin_opt_arg
-				{
-					$$ = makeDefElem($1, $2, -1);
-				}
-		;
-
-plugin_opt_arg:
-			SCONST							{ $$ = (Node *) makeString($1); }
-			| /* EMPTY */					{ $$ = NULL; }
-		;
-
-sql_cmd:
-			IDENT							{ $$ = (Node *) make_sqlcmd(); }
-		;
-%%
-
-static SQLCmd *
-make_sqlcmd(void)
-{
-	SQLCmd *cmd = makeNode(SQLCmd);
-	int tok;
-
-	/* Just move lexer to the end of command. */
-	for (;;)
-	{
-		tok = yylex();
-		if (tok == ';' || tok == 0)
-			break;
-	}
-	return cmd;
-}
-
-#include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
deleted file mode 100644
index 52ae7b343f..0000000000
--- a/src/backend/replication/repl_scanner.l
+++ /dev/null
@@ -1,248 +0,0 @@
-%{
-/*-------------------------------------------------------------------------
- *
- * repl_scanner.l
- *	  a lexical scanner for the replication commands
- *
- * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- *	  src/backend/replication/repl_scanner.l
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "utils/builtins.h"
-#include "parser/scansup.h"
-
-/* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
-#undef fprintf
-#define fprintf(file, fmt, msg)  fprintf_to_ereport(fmt, msg)
-
-static void
-fprintf_to_ereport(const char *fmt, const char *msg)
-{
-	ereport(ERROR, (errmsg_internal("%s", msg)));
-}
-
-/* Handle to the buffer that the lexer uses internally */
-static YY_BUFFER_STATE scanbufhandle;
-
-static StringInfoData litbuf;
-
-static void startlit(void);
-static char *litbufdup(void);
-static void addlit(char *ytext, int yleng);
-static void addlitchar(unsigned char ychar);
-
-%}
-
-%option 8bit
-%option never-interactive
-%option nodefault
-%option noinput
-%option nounput
-%option noyywrap
-%option warn
-%option prefix="replication_yy"
-
-%x xq xd
-
-/* Extended quote
- * xqdouble implements embedded quote, ''''
- */
-xqstart			{quote}
-xqdouble		{quote}{quote}
-xqinside		[^']+
-
-/* Double quote
- * Allows embedded spaces and other special characters into identifiers.
- */
-dquote			\"
-xdstart			{dquote}
-xdstop			{dquote}
-xddouble		{dquote}{dquote}
-xdinside		[^"]+
-
-digit			[0-9]+
-hexdigit		[0-9A-Za-z]+
-
-quote			'
-quotestop		{quote}
-
-ident_start		[A-Za-z\200-\377_]
-ident_cont		[A-Za-z\200-\377_0-9\$]
-
-identifier		{ident_start}{ident_cont}*
-
-%%
-
-BASE_BACKUP			{ return K_BASE_BACKUP; }
-FAST			{ return K_FAST; }
-IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
-SHOW		{ return K_SHOW; }
-LABEL			{ return K_LABEL; }
-NOWAIT			{ return K_NOWAIT; }
-PROGRESS			{ return K_PROGRESS; }
-MAX_RATE		{ return K_MAX_RATE; }
-WAL			{ return K_WAL; }
-TABLESPACE_MAP			{ return K_TABLESPACE_MAP; }
-TIMELINE			{ return K_TIMELINE; }
-START_REPLICATION	{ return K_START_REPLICATION; }
-CREATE_REPLICATION_SLOT		{ return K_CREATE_REPLICATION_SLOT; }
-DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
-TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
-PHYSICAL			{ return K_PHYSICAL; }
-RESERVE_WAL			{ return K_RESERVE_WAL; }
-LOGICAL				{ return K_LOGICAL; }
-SLOT				{ return K_SLOT; }
-TEMPORARY			{ return K_TEMPORARY; }
-EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
-NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
-USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
-
-","				{ return ','; }
-";"				{ return ';'; }
-"("				{ return '('; }
-")"				{ return ')'; }
-
-[\n]			;
-[\t]			;
-" "				;
-
-{digit}+		{
-					yylval.uintval = strtoul(yytext, NULL, 10);
-					return UCONST;
-				}
-
-{hexdigit}+\/{hexdigit}+		{
-					uint32	hi,
-							lo;
-					if (sscanf(yytext, "%X/%X", &hi, &lo) != 2)
-						yyerror("invalid streaming start location");
-					yylval.recptr = ((uint64) hi) << 32 | lo;
-					return RECPTR;
-				}
-
-{xqstart}		{
-					BEGIN(xq);
-					startlit();
-				}
-
-<xq>{quotestop}	{
-					yyless(1);
-					BEGIN(INITIAL);
-					yylval.str = litbufdup();
-					return SCONST;
-				}
-
-<xq>{xqdouble}	{
-					addlitchar('\'');
-				}
-
-<xq>{xqinside}  {
-					addlit(yytext, yyleng);
-				}
-
-{xdstart}		{
-					BEGIN(xd);
-					startlit();
-				}
-
-<xd>{xdstop}	{
-					int len;
-					yyless(1);
-					BEGIN(INITIAL);
-					yylval.str = litbufdup();
-					len = strlen(yylval.str);
-					truncate_identifier(yylval.str, len, true);
-					return IDENT;
-				}
-
-<xd>{xdinside}  {
-					addlit(yytext, yyleng);
-				}
-
-{identifier}	{
-					int len = strlen(yytext);
-
-					yylval.str = downcase_truncate_identifier(yytext, len, true);
-					return IDENT;
-				}
-
-<xq,xd><<EOF>>	{ yyerror("unterminated quoted string"); }
-
-
-<<EOF>>			{
-					yyterminate();
-				}
-
-.				{
-					return T_WORD;
-				}
-%%
-
-
-static void
-startlit(void)
-{
-	initStringInfo(&litbuf);
-}
-
-static char *
-litbufdup(void)
-{
-	return litbuf.data;
-}
-
-static void
-addlit(char *ytext, int yleng)
-{
-	appendBinaryStringInfo(&litbuf, ytext, yleng);
-}
-
-static void
-addlitchar(unsigned char ychar)
-{
-	appendStringInfoChar(&litbuf, ychar);
-}
-
-void
-yyerror(const char *message)
-{
-	ereport(ERROR,
-			(errcode(ERRCODE_SYNTAX_ERROR),
-			 errmsg_internal("%s", message)));
-}
-
-
-void
-replication_scanner_init(const char *str)
-{
-	Size		slen = strlen(str);
-	char	   *scanbuf;
-
-	/*
-	 * Might be left over after ereport()
-	 */
-	if (YY_CURRENT_BUFFER)
-		yy_delete_buffer(YY_CURRENT_BUFFER);
-
-	/*
-	 * Make a scan buffer with special termination needed by flex.
-	 */
-	scanbuf = (char *) palloc(slen + 2);
-	memcpy(scanbuf, str, slen);
-	scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR;
-	scanbufhandle = yy_scan_buffer(scanbuf, slen + 2);
-}
-
-void
-replication_scanner_finish(void)
-{
-	yy_delete_buffer(scanbufhandle);
-	scanbufhandle = NULL;
-}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 064cf5ee28..e3284363e0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -360,18 +360,7 @@ IdentifySystem(void)
 	snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
 	if (MyDatabaseId != InvalidOid)
-	{
-		MemoryContext cur = CurrentMemoryContext;
-
-		/* syscache access needs a transaction env. */
-		StartTransactionCommand();
-		/* make dbname live outside TX context */
-		MemoryContextSwitchTo(cur);
 		dbname = get_database_name(MyDatabaseId);
-		CommitTransactionCommand();
-		/* CommitTransactionCommand switches to TopMemoryContext */
-		MemoryContextSwitchTo(cur);
-	}
 
 	dest = CreateDestReceiver(DestRemoteSimple);
 	MemSet(nulls, false, sizeof(nulls));
@@ -870,6 +859,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 	}
 
+
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
 	{
 		LogicalDecodingContext *ctx;
@@ -908,6 +898,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 								"must not be called in a subtransaction")));
 		}
 
+		/* XXX: document */
+		CommitTransactionCommand();
+
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData);
@@ -924,6 +917,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		/* build initial snapshot, might take a while */
 		DecodingContextFindStartpoint(ctx);
 
+		/* re-enter a transaction, matching the CommitTransactionCommand() above */
+		StartTransactionCommand();
+
 		/*
 		 * Export or use the snapshot if we've been asked to do so.
 		 *
@@ -940,8 +936,10 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 			snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
 			RestoreTransactionSnapshot(snap, MyProc);
+
 		}
 
+
 		/* don't need the decoding context anymore */
 		FreeDecodingContext(ctx);
 
@@ -1354,65 +1352,37 @@ WalSndWaitForWal(XLogRecPtr loc)
 	return RecentFlushPtr;
 }
 
-/*
- * Execute an incoming replication command.
- *
- * Returns true if the cmd_string was recognized as WalSender command, false
- * if not.
- */
 bool
-exec_replication_command(const char *cmd_string)
+exec_replication_command(ReplicationCmd *cmd, const char *query_string, bool is_toplevel)
 {
-	int			parse_rc;
-	Node	   *cmd_node;
-	MemoryContext cmd_context;
-	MemoryContext old_context;
-
-	/*
-	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
-	 * command arrives. Clean up the old stuff if there's anything.
-	 */
-	SnapBuildClearExportedSnapshot();
-
-	CHECK_FOR_INTERRUPTS();
-
-	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
-										"Replication command context",
-										ALLOCSET_DEFAULT_SIZES);
-	old_context = MemoryContextSwitchTo(cmd_context);
-
-	replication_scanner_init(cmd_string);
-	parse_rc = replication_yyparse();
-	if (parse_rc != 0)
+	if (!am_walsender)
+	{
 		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 (errmsg_internal("replication command parser returned %d",
-								  parse_rc))));
-
-	cmd_node = replication_parse_result;
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("replication protocol statement not supported in a normal connection")));
+	}
 
 	/*
 	 * Log replication command if log_replication_commands is enabled. Even
 	 * when it's disabled, log the command with DEBUG1 level for backward
-	 * compatibility. Note that SQL commands are not logged here, and will be
-	 * logged later if log_statement is enabled.
+	 * compatibility.
 	 */
-	if (cmd_node->type != T_SQLCmd)
-		ereport(log_replication_commands ? LOG : DEBUG1,
-				(errmsg("received replication command: %s", cmd_string)));
+	ereport(log_replication_commands ? LOG : DEBUG1,
+			(errmsg("received replication command: %s", query_string)));
 
 	/*
 	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
 	 * called outside of transaction the snapshot should be cleared here.
+	 *
+	 * FIXME: this is completely borked.
 	 */
 	if (!IsTransactionBlock())
 		SnapBuildClearExportedSnapshot();
 
 	/*
-	 * For aborted transactions, don't allow anything except pure SQL,
-	 * the exec_simple_query() will handle it correctly.
+	 * Nothing to do here in an aborted transaction.
 	 */
-	if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
+	if (IsAbortedTransactionBlockState())
 		ereport(ERROR,
 				(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
 				 errmsg("current transaction is aborted, "
@@ -1428,7 +1398,7 @@ exec_replication_command(const char *cmd_string)
 	initStringInfo(&reply_message);
 	initStringInfo(&tmpbuf);
 
-	switch (cmd_node->type)
+	switch (nodeTag(cmd->replicationCmd))
 	{
 		case T_IdentifySystemCmd:
 			IdentifySystem();
@@ -1436,64 +1406,46 @@ exec_replication_command(const char *cmd_string)
 
 		case T_BaseBackupCmd:
 			PreventTransactionChain(true, "BASE_BACKUP");
-			SendBaseBackup((BaseBackupCmd *) cmd_node);
+			CommitTransactionCommand();
+			SendBaseBackup((BaseBackupCmd *) cmd->replicationCmd);
+			StartTransactionCommand();
 			break;
 
 		case T_CreateReplicationSlotCmd:
-			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd->replicationCmd);
 			break;
 
 		case T_DropReplicationSlotCmd:
-			DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+			DropReplicationSlot((DropReplicationSlotCmd *) cmd->replicationCmd);
 			break;
 
 		case T_StartReplicationCmd:
 			{
-				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
+				StartReplicationCmd *scmd = (StartReplicationCmd *) cmd->replicationCmd;
 
 				PreventTransactionChain(true, "START_REPLICATION");
 
-				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
-					StartReplication(cmd);
+				CommitTransactionCommand();
+
+				if (scmd->kind == REPLICATION_KIND_PHYSICAL)
+					StartReplication(scmd);
 				else
-					StartLogicalReplication(cmd);
+					StartLogicalReplication(scmd);
+
+				StartTransactionCommand();
 				break;
 			}
 
 		case T_TimeLineHistoryCmd:
 			PreventTransactionChain(true, "TIMELINE_HISTORY");
-			SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
+			SendTimeLineHistory((TimeLineHistoryCmd *) cmd->replicationCmd);
 			break;
 
-		case T_VariableShowStmt:
-			{
-				DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
-				VariableShowStmt *n = (VariableShowStmt *) cmd_node;
-
-				GetPGVariable(n->name, dest);
-			}
-			break;
-
-		case T_SQLCmd:
-			if (MyDatabaseId == InvalidOid)
-				ereport(ERROR,
-						(errmsg("not connected to database")));
-
-			/* Tell the caller that this wasn't a WalSender command. */
-			return false;
-
 		default:
 			elog(ERROR, "unrecognized replication command node tag: %u",
-				 cmd_node->type);
+				 cmd->replicationCmd->type);
 	}
 
-	/* done */
-	MemoryContextSwitchTo(old_context);
-	MemoryContextDelete(cmd_context);
-
-	/* Send CommandComplete message */
-	EndCommand("SELECT", DestRemote);
-
 	return true;
 }
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 75c2d9a61d..485e61f709 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -173,7 +173,6 @@ static int	InteractiveBackend(StringInfo inBuf);
 static int	interactive_getc(void);
 static int	SocketBackend(StringInfo inBuf);
 static int	ReadCommand(StringInfo inBuf);
-static void forbidden_in_wal_sender(char firstchar);
 static List *pg_rewrite_query(Query *query);
 static bool check_log_statement(List *stmt_list);
 static int	errdetail_execute(List *raw_parsetree_list);
@@ -1015,6 +1014,14 @@ exec_simple_query(const char *query_string)
 		 */
 		if (analyze_requires_snapshot(parsetree))
 		{
+			if (am_walsender && !am_db_walsender)
+			{
+				/* FIXME: message */
+				ereport(ERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("non-replication statement not supported in pure replication connection")));
+			}
+
 			PushActiveSnapshot(GetTransactionSnapshot());
 			snapshot_set = true;
 		}
@@ -1325,6 +1332,14 @@ exec_parse_message(const char *query_string,	/* string to execute */
 		 */
 		if (analyze_requires_snapshot(raw_parse_tree))
 		{
+			if (am_walsender && !am_db_walsender)
+			{
+				/* FIXME: message */
+				ereport(ERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("non-replication statement not supported in pure replication connection")));
+			}
+
 			PushActiveSnapshot(GetTransactionSnapshot());
 			snapshot_set = true;
 		}
@@ -1610,6 +1625,14 @@ exec_bind_message(StringInfo input_message)
 		(psrc->raw_parse_tree &&
 		 analyze_requires_snapshot(psrc->raw_parse_tree)))
 	{
+		if (am_walsender && !am_db_walsender)
+		{
+			/* FIXME: message */
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("non-replication statement not supported in pure replication connection")));
+		}
+
 		PushActiveSnapshot(GetTransactionSnapshot());
 		snapshot_set = true;
 	}
@@ -4066,13 +4089,7 @@ PostgresMain(int argc, char *argv[],
 					query_string = pq_getmsgstring(&input_message);
 					pq_getmsgend(&input_message);
 
-					if (am_walsender)
-					{
-						if (!exec_replication_command(query_string))
-							exec_simple_query(query_string);
-					}
-					else
-						exec_simple_query(query_string);
+					exec_simple_query(query_string);
 
 					send_ready_for_query = true;
 				}
@@ -4085,8 +4102,6 @@ PostgresMain(int argc, char *argv[],
 					int			numParams;
 					Oid		   *paramTypes = NULL;
 
-					forbidden_in_wal_sender(firstchar);
-
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();
 
@@ -4109,8 +4124,6 @@ PostgresMain(int argc, char *argv[],
 				break;
 
 			case 'B':			/* bind */
-				forbidden_in_wal_sender(firstchar);
-
 				/* Set statement_timestamp() */
 				SetCurrentStatementStartTimestamp();
 
@@ -4126,8 +4139,6 @@ PostgresMain(int argc, char *argv[],
 					const char *portal_name;
 					int			max_rows;
 
-					forbidden_in_wal_sender(firstchar);
-
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();
 
@@ -4140,8 +4151,6 @@ PostgresMain(int argc, char *argv[],
 				break;
 
 			case 'F':			/* fastpath function call */
-				forbidden_in_wal_sender(firstchar);
-
 				/* Set statement_timestamp() */
 				SetCurrentStatementStartTimestamp();
 
@@ -4177,8 +4186,6 @@ PostgresMain(int argc, char *argv[],
 					int			close_type;
 					const char *close_target;
 
-					forbidden_in_wal_sender(firstchar);
-
 					close_type = pq_getmsgbyte(&input_message);
 					close_target = pq_getmsgstring(&input_message);
 					pq_getmsgend(&input_message);
@@ -4221,8 +4228,6 @@ PostgresMain(int argc, char *argv[],
 					int			describe_type;
 					const char *describe_target;
 
-					forbidden_in_wal_sender(firstchar);
-
 					/* Set statement_timestamp() (needed for xact) */
 					SetCurrentStatementStartTimestamp();
 
@@ -4305,30 +4310,6 @@ PostgresMain(int argc, char *argv[],
 }
 
 /*
- * Throw an error if we're a WAL sender process.
- *
- * This is used to forbid anything else than simple query protocol messages
- * in a WAL sender process.  'firstchar' specifies what kind of a forbidden
- * message was received, and is used to construct the error message.
- */
-static void
-forbidden_in_wal_sender(char firstchar)
-{
-	if (am_walsender)
-	{
-		if (firstchar == 'F')
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("fastpath function calls not supported in a replication connection")));
-		else
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("extended query protocol not supported in a replication connection")));
-	}
-}
-
-
-/*
  * Obtain platform stack depth limit (in bytes)
  *
  * Return -1 if unknown
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index e30aeb1c7f..450ee08461 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1155,6 +1155,8 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
 		  IsA(utilityStmt, VariableSetStmt) ||
 		  IsA(utilityStmt, VariableShowStmt) ||
 		  IsA(utilityStmt, ConstraintsSetStmt) ||
+		  /* replication commands shouldn't run with a snapshot */
+		  IsA(utilityStmt, ReplicationCmd) ||
 	/* efficiency hacks from here down */
 		  IsA(utilityStmt, FetchStmt) ||
 		  IsA(utilityStmt, ListenStmt) ||
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 24e5c427c6..a313fde8d9 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -57,10 +57,12 @@
 #include "commands/vacuum.h"
 #include "commands/view.h"
 #include "miscadmin.h"
+#include "nodes/replnodes.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rewriteRemove.h"
+#include "replication/walsender.h"
 #include "storage/fd.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
@@ -923,6 +925,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 				break;
 			}
 
+		case T_ReplicationCmd:
+			exec_replication_command((ReplicationCmd *) parsetree, queryString, isTopLevel);
+			strcpy(completionTag, CreateCommandTag(parsetree));
+			break;
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2843,6 +2850,40 @@ CreateCommandTag(Node *parsetree)
 			}
 			break;
 
+		case T_ReplicationCmd:	/* tag is chosen by the wrapped command node */
+			switch (nodeTag(((ReplicationCmd *) parsetree)->replicationCmd))
+			{
+				case T_IdentifySystemCmd:
+					tag = "IDENTIFY_SYSTEM";
+					break;
+
+				case T_BaseBackupCmd:
+					tag = "BASE_BACKUP";
+					break;
+
+				case T_CreateReplicationSlotCmd:
+					tag = "CREATE_REPLICATION_SLOT";
+					break;
+
+				case T_DropReplicationSlotCmd:
+					tag = "DROP_REPLICATION_SLOT";
+					break;
+
+				case T_StartReplicationCmd:
+					tag = "START_REPLICATION";
+					break;
+
+				case T_TimeLineHistoryCmd:
+					tag = "TIMELINE_HISTORY";
+					break;
+				default:	/* report the inner tag; the outer one is always T_ReplicationCmd */
+					elog(WARNING, "unrecognized replication command: %d",
+						 (int) nodeTag(((ReplicationCmd *) parsetree)->replicationCmd));
+					tag = "???";
+					break;
+			}
+			break;
+
 		default:
 			elog(WARNING, "unrecognized node type: %d",
 				 (int) nodeTag(parsetree));
@@ -3349,6 +3390,11 @@ GetCommandLogLevel(Node *parsetree)
 			}
 			break;
 
+		case T_ReplicationCmd:
+			/* FIXME: Invent new category? */
+			lev = LOGSTMT_DDL;
+			break;
+
 		default:
 			elog(WARNING, "unrecognized node type: %d",
 				 (int) nodeTag(parsetree));
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 6b081bd737..2950c7b338 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -240,7 +240,7 @@ StreamLogicalLog(void)
 				replication_slot);
 
 	/* Initiate the replication stream at specified location */
-	appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
+	appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL '%X/%X'",
 			 replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
 
 	/* print options if there are any */
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 8511e57cf7..5a11bb8c4f 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -583,7 +583,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 			return true;
 
 		/* Initiate the replication stream at specified location */
-		snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
+		snprintf(query, sizeof(query), "START_REPLICATION %s'%X/%X' TIMELINE %u",
 				 slotcmd,
 				 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
 				 stream->timeline);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index f59d719923..107b36fd6e 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -472,13 +472,13 @@ typedef enum NodeTag
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
 	 */
+	T_ReplicationCmd, /* container for all other replication commands */
 	T_IdentifySystemCmd,
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
-	T_SQLCmd,
 
 	/*
 	 * TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 92ada41b6d..b4afab85da 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -17,6 +17,14 @@
 #include "access/xlogdefs.h"
 #include "nodes/pg_list.h"
 
+
+typedef struct ReplicationCmd
+{
+	NodeTag		type;
+
+	Node	   *replicationCmd;
+} ReplicationCmd;
+
 typedef enum ReplicationKind
 {
 	REPLICATION_KIND_PHYSICAL,
diff --git a/src/include/parser/gramparse.h b/src/include/parser/gramparse.h
index 2da98f67f3..0a3edc79e2 100644
--- a/src/include/parser/gramparse.h
+++ b/src/include/parser/gramparse.h
@@ -20,6 +20,7 @@
 #define GRAMPARSE_H
 
 #include "nodes/parsenodes.h"
+#include "access/xlogdefs.h"
 #include "parser/scanner.h"
 
 /*
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 37542aaee4..7130803e21 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -53,6 +53,7 @@ PG_KEYWORD("attach", ATTACH, UNRESERVED_KEYWORD)
 PG_KEYWORD("attribute", ATTRIBUTE, UNRESERVED_KEYWORD)
 PG_KEYWORD("authorization", AUTHORIZATION, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("backward", BACKWARD, UNRESERVED_KEYWORD)
+PG_KEYWORD("base_backup", BASE_BACKUP, UNRESERVED_KEYWORD)
 PG_KEYWORD("before", BEFORE, UNRESERVED_KEYWORD)
 PG_KEYWORD("begin", BEGIN_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("between", BETWEEN, COL_NAME_KEYWORD)
@@ -99,6 +100,7 @@ PG_KEYWORD("conversion", CONVERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("copy", COPY, UNRESERVED_KEYWORD)
 PG_KEYWORD("cost", COST, UNRESERVED_KEYWORD)
 PG_KEYWORD("create", CREATE, RESERVED_KEYWORD)
+PG_KEYWORD("create_replication_slot", CREATE_REPLICATION_SLOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("cross", CROSS, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("csv", CSV, UNRESERVED_KEYWORD)
 PG_KEYWORD("cube", CUBE, UNRESERVED_KEYWORD)
@@ -139,6 +141,7 @@ PG_KEYWORD("document", DOCUMENT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("domain", DOMAIN_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("double", DOUBLE_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("drop", DROP, UNRESERVED_KEYWORD)
+PG_KEYWORD("drop_replication_slot", DROP_REPLICATION_SLOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("each", EACH, UNRESERVED_KEYWORD)
 PG_KEYWORD("else", ELSE, RESERVED_KEYWORD)
 PG_KEYWORD("enable", ENABLE_P, UNRESERVED_KEYWORD)
@@ -155,11 +158,13 @@ PG_KEYWORD("exclusive", EXCLUSIVE, UNRESERVED_KEYWORD)
 PG_KEYWORD("execute", EXECUTE, UNRESERVED_KEYWORD)
 PG_KEYWORD("exists", EXISTS, COL_NAME_KEYWORD)
 PG_KEYWORD("explain", EXPLAIN, UNRESERVED_KEYWORD)
+PG_KEYWORD("export_snapshot", EXPORT_SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("extension", EXTENSION, UNRESERVED_KEYWORD)
 PG_KEYWORD("external", EXTERNAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
 PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
 PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
+PG_KEYWORD("fast", FAST_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
 PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
@@ -186,6 +191,7 @@ PG_KEYWORD("having", HAVING, RESERVED_KEYWORD)
 PG_KEYWORD("header", HEADER_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("hold", HOLD, UNRESERVED_KEYWORD)
 PG_KEYWORD("hour", HOUR_P, UNRESERVED_KEYWORD)
+PG_KEYWORD("identify_system", IDENTIFY_SYSTEM, UNRESERVED_KEYWORD)
 PG_KEYWORD("identity", IDENTITY_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("if", IF_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD)
@@ -240,9 +246,11 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("logical", LOGICAL_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
 PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
+PG_KEYWORD("max_rate", MAX_RATE, UNRESERVED_KEYWORD)
 PG_KEYWORD("maxvalue", MAXVALUE, UNRESERVED_KEYWORD)
 PG_KEYWORD("method", METHOD, UNRESERVED_KEYWORD)
 PG_KEYWORD("minute", MINUTE_P, UNRESERVED_KEYWORD)
@@ -258,6 +266,7 @@ PG_KEYWORD("nchar", NCHAR, COL_NAME_KEYWORD)
 PG_KEYWORD("new", NEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("next", NEXT, UNRESERVED_KEYWORD)
 PG_KEYWORD("no", NO, UNRESERVED_KEYWORD)
+PG_KEYWORD("noexport_snapshot", NOEXPORT_SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("none", NONE, COL_NAME_KEYWORD)
 PG_KEYWORD("norefresh", NOREFRESH, UNRESERVED_KEYWORD)
 PG_KEYWORD("not", NOT, RESERVED_KEYWORD)
@@ -297,6 +306,7 @@ PG_KEYWORD("partial", PARTIAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("partition", PARTITION, UNRESERVED_KEYWORD)
 PG_KEYWORD("passing", PASSING, UNRESERVED_KEYWORD)
 PG_KEYWORD("password", PASSWORD, UNRESERVED_KEYWORD)
+PG_KEYWORD("physical", PHYSICAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("placing", PLACING, RESERVED_KEYWORD)
 PG_KEYWORD("plans", PLANS, UNRESERVED_KEYWORD)
 PG_KEYWORD("policy", POLICY, UNRESERVED_KEYWORD)
@@ -312,6 +322,7 @@ PG_KEYWORD("privileges", PRIVILEGES, UNRESERVED_KEYWORD)
 PG_KEYWORD("procedural", PROCEDURAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("procedure", PROCEDURE, UNRESERVED_KEYWORD)
 PG_KEYWORD("program", PROGRAM, UNRESERVED_KEYWORD)
+PG_KEYWORD("progress", PROGRESS, UNRESERVED_KEYWORD)
 PG_KEYWORD("publication", PUBLICATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("quote", QUOTE, UNRESERVED_KEYWORD)
 PG_KEYWORD("range", RANGE, UNRESERVED_KEYWORD)
@@ -331,6 +342,7 @@ PG_KEYWORD("rename", RENAME, UNRESERVED_KEYWORD)
 PG_KEYWORD("repeatable", REPEATABLE, UNRESERVED_KEYWORD)
 PG_KEYWORD("replace", REPLACE, UNRESERVED_KEYWORD)
 PG_KEYWORD("replica", REPLICA, UNRESERVED_KEYWORD)
+PG_KEYWORD("reserve_wal", RESERVE_WAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("reset", RESET, UNRESERVED_KEYWORD)
 PG_KEYWORD("restart", RESTART, UNRESERVED_KEYWORD)
 PG_KEYWORD("restrict", RESTRICT, UNRESERVED_KEYWORD)
@@ -374,6 +386,7 @@ PG_KEYWORD("sql", SQL_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("stable", STABLE, UNRESERVED_KEYWORD)
 PG_KEYWORD("standalone", STANDALONE_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("start", START, UNRESERVED_KEYWORD)
+PG_KEYWORD("start_replication", START_REPLICATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("statement", STATEMENT, UNRESERVED_KEYWORD)
 PG_KEYWORD("statistics", STATISTICS, UNRESERVED_KEYWORD)
 PG_KEYWORD("stdin", STDIN, UNRESERVED_KEYWORD)
@@ -390,12 +403,15 @@ PG_KEYWORD("table", TABLE, RESERVED_KEYWORD)
 PG_KEYWORD("tables", TABLES, UNRESERVED_KEYWORD)
 PG_KEYWORD("tablesample", TABLESAMPLE, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("tablespace", TABLESPACE, UNRESERVED_KEYWORD)
+PG_KEYWORD("tablespace_map", TABLESPACE_MAP_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("temp", TEMP, UNRESERVED_KEYWORD)
 PG_KEYWORD("template", TEMPLATE, UNRESERVED_KEYWORD)
 PG_KEYWORD("temporary", TEMPORARY, UNRESERVED_KEYWORD)
 PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeline", TIMELINE_P, UNRESERVED_KEYWORD)
+PG_KEYWORD("timeline_history", TIMELINE_HISTORY, UNRESERVED_KEYWORD)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -419,6 +435,7 @@ PG_KEYWORD("unlisten", UNLISTEN, UNRESERVED_KEYWORD)
 PG_KEYWORD("unlogged", UNLOGGED, UNRESERVED_KEYWORD)
 PG_KEYWORD("until", UNTIL, UNRESERVED_KEYWORD)
 PG_KEYWORD("update", UPDATE, UNRESERVED_KEYWORD)
+PG_KEYWORD("use_snapshot", USE_SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("user", USER, RESERVED_KEYWORD)
 PG_KEYWORD("using", USING, RESERVED_KEYWORD)
 PG_KEYWORD("vacuum", VACUUM, UNRESERVED_KEYWORD)
@@ -435,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wal", WAL_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 2ca903872e..65026eb83b 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -16,6 +16,9 @@
 
 #include "fmgr.h"
 
+#include "nodes/nodes.h"
+#include "nodes/replnodes.h"
+
 /*
  * What to do with a snapshot in create replication slot command.
  */
@@ -38,7 +41,7 @@ extern int	wal_sender_timeout;
 extern bool log_replication_commands;
 
 extern void InitWalSender(void);
-extern bool exec_replication_command(const char *query_string);
+extern bool exec_replication_command(ReplicationCmd *cmd, const char *query_string, bool isToplevel);
 extern void WalSndErrorCleanup(void);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
-- 
2.12.0.264.gd6db3f2165.dirty

