Separation walsender & normal backends
Hi,
I've for a while suspected that the separation & duplication of
infrastructure between walsenders and normal backends isn't nice. But
looking at the code changes in v10 drove that home quite a bit.
The major changes in this area are
1) d1ecd539477fe640455dc890216a7c1561e047b4 - Add a SHOW command to the replication command language.
2) 7c4f52409a8c7d85ed169bbbc1f6092274d03920 - Logical replication support for initial data copy
The first adds SHOW support for the replication protocol. With such
oddities that "SHOW hba_file;" works, but "show hba_file;"
doesn't. Unless your're connected to a database, when feature 2) kicks
in and falls back to the normal SQL parser (as it'd for most other SQL
commands).
With 2) we can execute normal SQL over a walsender connection. That's
useful for logical rep because it needs to able to copy data (via
e.g. COPY), read catalogs (SELECT) and stream data (via
START_REPLICATION).
The problem is that we now end up with fairly similar infrastructure,
that's duplicated in walsender and normal backends. We've already seen a
couple bugs stem from this:
- parallel queries hang when executed over walsender connection
- query cancel doesn't work over walsender connection
- parser/scanner hack deciding between replication and normal grammer
isn't correct
- SnapBuildClearExportedSnapshot isn't called reliably anymore
- config files aren't reloaded while idle in a walsender connection
while all of those are fixable, and several of them already have
proposed fixes, I think this is some indication that the current split
isn't well thought out. I think in hindsight, the whole idea of
introducing a separate protocol/language for the replication protocol
was a rather bad one.
I think we should give up on the current course, and decide to unify as
much as possible. Petr already proposed some fixes for the above
(unifying signal and config file handling), but I don't think that
solves the more fundamental issue of different grammars.
For me it's fairly obvious that we should try to merge the two grammars,
and live with the slight uglyness that'll caused by doing so. The
primary problem here is that both languages "look" different, primary
that the replication protocol uses enforced-all-caps-with-underscores as
style.
Attached is a very rough POC, that unifies the two grammars.
Unsurprisingly that requires the addition of a bunch of additional
keywords, but at least they're all unreserved. Both grammars seem to
mostly merge well, except for repl_scanner.l's RECPTR which I wasn't
able to add to scan.l within a couple minutes (therefore single quotes
are now necessary). This really is WIP, and not meant as a patch to be
reviewed in detail, but just as something to be discussed.
I'm probably going to be tarred and feathered for this position, but I
think we need to fix this before v10 is coming out. We're taking on
quite some architectural burden here, and I'm pretty sure we'd otherwise
have to fix it in v11, so we'll be stuck with some odd-duck v10, that
behaves different from all other versions.
If so, we'd have to:
- gate catalog access more careful in !am_db_walsender connections
- find a better way to deal with repl_scan.l's RECPTR
- remove replnodes.h (or move at least parts of it) into parsenodes.h
- do a lot of other minor cleanup
Greetings,
Andres Freund
Attachments:
0001-WIP-POC-Prototype-Unify-SQL-and-replication-grammars.patchtext/x-patch; charset=us-asciiDownload
From 63866c2c8f9c968957dfb0d5b8c5ceca2c4787ed Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 24 Apr 2017 22:57:49 -0700
Subject: [PATCH] WIP: POC: Prototype: Unify SQL and replication grammars.
---
src/backend/commands/event_trigger.c | 8 +-
src/backend/parser/analyze.c | 10 +
src/backend/parser/gram.y | 396 ++++++++++++++++++--
src/backend/replication/Makefile | 5 +-
.../libpqwalreceiver/libpqwalreceiver.c | 2 +-
src/backend/replication/logical/snapbuild.c | 5 +-
src/backend/replication/repl_gram.y | 408 ---------------------
src/backend/replication/repl_scanner.l | 248 -------------
src/backend/replication/walsender.c | 124 ++-----
src/backend/tcop/postgres.c | 69 ++--
src/backend/tcop/pquery.c | 2 +
src/backend/tcop/utility.c | 46 +++
src/bin/pg_basebackup/pg_recvlogical.c | 2 +-
src/bin/pg_basebackup/receivelog.c | 2 +-
src/include/nodes/nodes.h | 2 +-
src/include/nodes/replnodes.h | 8 +
src/include/parser/gramparse.h | 1 +
src/include/parser/kwlist.h | 18 +
src/include/replication/walsender.h | 5 +-
19 files changed, 538 insertions(+), 823 deletions(-)
delete mode 100644 src/backend/replication/repl_gram.y
delete mode 100644 src/backend/replication/repl_scanner.l
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index d7c199f314..1e441fbe86 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -295,7 +295,13 @@ check_ddl_tag(const char *tag)
pg_strcasecmp(tag, "REVOKE") == 0 ||
pg_strcasecmp(tag, "DROP OWNED") == 0 ||
pg_strcasecmp(tag, "IMPORT FOREIGN SCHEMA") == 0 ||
- pg_strcasecmp(tag, "SECURITY LABEL") == 0)
+ pg_strcasecmp(tag, "SECURITY LABEL") == 0 ||
+ pg_strcasecmp(tag, "IDENTIFY_SYSTEM") == 0 ||
+ pg_strcasecmp(tag, "BASE_BACKUP") == 0 ||
+ pg_strcasecmp(tag, "CREATE_REPLICATION_SLOT") == 0 ||
+ pg_strcasecmp(tag, "DROP_REPLICATION_SLOT") == 0 ||
+ pg_strcasecmp(tag, "START_REPLICATION") == 0 ||
+ pg_strcasecmp(tag, "TIMELINE_HISTORY") == 0)
return EVENT_TRIGGER_COMMAND_TAG_OK;
/*
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 567dd54c6c..d969c0a4e0 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -42,6 +42,7 @@
#include "parser/parse_target.h"
#include "parser/parsetree.h"
#include "rewrite/rewriteManip.h"
+#include "replication/walsender.h"
#include "utils/rel.h"
@@ -104,6 +105,15 @@ parse_analyze(RawStmt *parseTree, const char *sourceText,
pstate->p_sourcetext = sourceText;
+ if (am_walsender && !am_db_walsender &&
+ analyze_requires_snapshot(parseTree))
+ {
+ /* FIXME: message */
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("non-replication statement not supported in pure replication connection")));
+ }
+
if (numParams > 0)
parse_fixed_parameters(pstate, paramTypes, numParams);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 89d2836c49..6ede6d6af0 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -48,6 +48,7 @@
#include <ctype.h>
#include <limits.h>
+#include "access/xlogdefs.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_am.h"
@@ -56,9 +57,12 @@
#include "commands/trigger.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
+#include "nodes/replnodes.h"
#include "parser/gramparse.h"
#include "parser/parser.h"
#include "parser/parse_expr.h"
+#include "replication/walsender.h"
+#include "replication/walsender_private.h"
#include "storage/lmgr.h"
#include "utils/date.h"
#include "utils/datetime.h"
@@ -241,6 +245,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
PartitionSpec *partspec;
PartitionRangeDatum *partrange_datum;
RoleSpec *rolespec;
+ XLogRecPtr recptr;
+ uint32 uintval;
}
%type <node> stmt schema_stmt
@@ -584,6 +590,25 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <partrange_datum> PartitionRangeDatum
%type <list> range_datum_list
+
+/* replication stuff */
+%type <node> replication_command
+%type <node> base_backup start_replication start_logical_replication
+ create_replication_slot drop_replication_slot identify_system
+ timeline_history
+%type <list> base_backup_opt_list
+%type <defelt> base_backup_opt
+%type <uintval> opt_timeline
+%type <list> plugin_options plugin_opt_list
+%type <defelt> plugin_opt_elem
+%type <node> plugin_opt_arg
+%type <str> opt_slot
+%type <boolean> opt_temporary
+%type <list> create_slot_opt_list
+%type <defelt> create_slot_opt
+%type <recptr> recptr
+
+
/*
* Non-keyword token types. These are hard-wired into the "flex" lexer.
* They must be listed first so that their numeric codes do not depend on
@@ -610,7 +635,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
AGGREGATE ALL ALSO ALTER ALWAYS ANALYSE ANALYZE AND ANY ARRAY AS ASC
ASSERTION ASSIGNMENT ASYMMETRIC AT ATTACH ATTRIBUTE AUTHORIZATION
- BACKWARD BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT
+ BACKWARD BASE_BACKUP BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT
BOOLEAN_P BOTH BY
CACHE CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P
@@ -618,29 +643,29 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
CLUSTER COALESCE COLLATE COLLATION COLUMN COLUMNS COMMENT COMMENTS COMMIT
COMMITTED CONCURRENTLY CONFIGURATION CONFLICT CONNECTION CONSTRAINT
CONSTRAINTS CONTENT_P CONTINUE_P CONVERSION_P COPY COST CREATE
- CROSS CSV CUBE CURRENT_P
+ CREATE_REPLICATION_SLOT CROSS CSV CUBE CURRENT_P
CURRENT_CATALOG CURRENT_DATE CURRENT_ROLE CURRENT_SCHEMA
CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE
DATA_P DATABASE DAY_P DEALLOCATE DEC DECIMAL_P DECLARE DEFAULT DEFAULTS
DEFERRABLE DEFERRED DEFINER DELETE_P DELIMITER DELIMITERS DEPENDS DESC
DETACH DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P
- DOUBLE_P DROP
+ DOUBLE_P DROP DROP_REPLICATION_SLOT
EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EVENT EXCEPT
- EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
+ EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXPORT_SNAPSHOT
EXTENSION EXTERNAL EXTRACT
- FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+ FALSE_P FAMILY FAST_P FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING
HANDLER HAVING HEADER_P HOLD HOUR_P
- IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P
- INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
- INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
+ IDENTITY_P IDENTIFY_SYSTEM IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P
+ IMPORT_P IN_P INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY
+ INLINE_P INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION
JOIN
@@ -649,45 +674,46 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LOGICAL_P
- MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
+ MAPPING MATCH MATERIALIZED MAXVALUE MAX_RATE METHOD MINUTE_P MINVALUE MODE
+ MONTH_P MOVE
- NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NONE
+ NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NOEXPORT_SNAPSHOT NONE
NOREFRESH NOT NOTHING NOTIFY NOTNULL NOWAIT NULL_P NULLIF
NULLS_P NUMERIC
OBJECT_P OF OFF OFFSET OIDS OLD ON ONLY OPERATOR OPTION OPTIONS OR
ORDER ORDINALITY OUT_P OUTER_P OVER OVERLAPS OVERLAY OVERRIDING OWNED OWNER
- PARALLEL PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POLICY
- POSITION PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
- PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM PUBLICATION
+ PARALLEL PARSER PARTIAL PARTITION PASSING PASSWORD PHYSICAL PLACING PLANS
+ POLICY POSITION PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
+ PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM PROGRESS PUBLICATION
QUOTE
RANGE READ REAL REASSIGN RECHECK RECURSIVE REF REFERENCES REFERENCING
REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA
- RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP
- ROW ROWS RULE
+ RESERVE_WAL RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE
+ ROLLBACK ROLLUP ROW ROWS RULE
SAVEPOINT SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES
SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW
SIMILAR SIMPLE SKIP SLOT SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P
- START STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P
- SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P
+ START START_REPLICATION STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P
+ STRIP_P SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P
- TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
- TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM TREAT TRIGGER TRIM TRUE_P
- TRUNCATE TRUSTED TYPE_P TYPES_P
+ TABLE TABLES TABLESAMPLE TABLESPACE TABLESPACE_MAP_P TEMP TEMPLATE TEMPORARY
+ TEXT_P THEN TIME TIMELINE_P TIMELINE_HISTORY TIMESTAMP TO TRAILING TRANSACTION
+ TRANSFORM TREAT TRIGGER TRIM TRUE_P TRUNCATE TRUSTED TYPE_P TYPES_P
UNBOUNDED UNCOMMITTED UNENCRYPTED UNION UNIQUE UNKNOWN UNLISTEN UNLOGGED
- UNTIL UPDATE USER USING
+ UNTIL UPDATE USE_SNAPSHOT USER USING
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAL_P WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -942,6 +968,7 @@ stmt :
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | replication_command
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -14487,6 +14514,311 @@ AexprConst: Iconst
}
;
+/* replication protocol stuff */
+
+replication_command:
+ identify_system
+ {
+ ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+ rcmd->replicationCmd = $1;
+ $$ = (Node *) rcmd;
+ }
+ | base_backup
+ {
+ ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+ rcmd->replicationCmd = $1;
+ $$ = (Node *) rcmd;
+ }
+ | start_replication
+ {
+ ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+ rcmd->replicationCmd = $1;
+ $$ = (Node *) rcmd;
+ }
+ | start_logical_replication
+ {
+ ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+ rcmd->replicationCmd = $1;
+ $$ = (Node *) rcmd;
+ }
+ | create_replication_slot
+ {
+ ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+ rcmd->replicationCmd = $1;
+ $$ = (Node *) rcmd;
+ }
+ | drop_replication_slot
+ {
+ ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+ rcmd->replicationCmd = $1;
+ $$ = (Node *) rcmd;
+ }
+ | timeline_history
+ {
+ ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+ rcmd->replicationCmd = $1;
+ $$ = (Node *) rcmd;
+ }
+;
+
+
+/*
+ * IDENTIFY_SYSTEM
+ */
+identify_system:
+ IDENTIFY_SYSTEM
+ {
+ $$ = (Node *) makeNode(IdentifySystemCmd);
+ }
+ ;
+
+/*
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * [MAX_RATE %d] [TABLESPACE_MAP]
+ */
+base_backup:
+ BASE_BACKUP base_backup_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ $$ = (Node *) cmd;
+ }
+ ;
+
+base_backup_opt_list:
+ base_backup_opt_list base_backup_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+base_backup_opt:
+ LABEL SCONST
+ {
+ $$ = makeDefElem("label",
+ (Node *)makeString($2), -1);
+ }
+ | PROGRESS
+ {
+ $$ = makeDefElem("progress",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ | FAST_P
+ {
+ $$ = makeDefElem("fast",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ | WAL_P
+ {
+ $$ = makeDefElem("wal",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ | NOWAIT
+ {
+ $$ = makeDefElem("nowait",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ | MAX_RATE ICONST
+ {
+ $$ = makeDefElem("max_rate",
+ (Node *)makeInteger($2), -1);
+ }
+ | TABLESPACE_MAP_P
+ {
+ $$ = makeDefElem("tablespace_map",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ ;
+
+create_replication_slot:
+ /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
+ CREATE_REPLICATION_SLOT IDENT opt_temporary PHYSICAL create_slot_opt_list
+ {
+ CreateReplicationSlotCmd *cmd;
+ cmd = makeNode(CreateReplicationSlotCmd);
+ cmd->kind = REPLICATION_KIND_PHYSICAL;
+ cmd->slotname = $2;
+ cmd->temporary = $3;
+ cmd->options = $5;
+ $$ = (Node *) cmd;
+ }
+ /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
+ | CREATE_REPLICATION_SLOT IDENT opt_temporary LOGICAL_P IDENT create_slot_opt_list
+ {
+ CreateReplicationSlotCmd *cmd;
+ cmd = makeNode(CreateReplicationSlotCmd);
+ cmd->kind = REPLICATION_KIND_LOGICAL;
+ cmd->slotname = $2;
+ cmd->temporary = $3;
+ cmd->plugin = $5;
+ cmd->options = $6;
+ $$ = (Node *) cmd;
+ }
+ ;
+
+create_slot_opt_list:
+ create_slot_opt_list create_slot_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+create_slot_opt:
+ EXPORT_SNAPSHOT
+ {
+ $$ = makeDefElem("export_snapshot",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ | NOEXPORT_SNAPSHOT
+ {
+ $$ = makeDefElem("export_snapshot",
+ (Node *)makeInteger(FALSE), -1);
+ }
+ | USE_SNAPSHOT
+ {
+ $$ = makeDefElem("use_snapshot",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ | RESERVE_WAL
+ {
+ $$ = makeDefElem("reserve_wal",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ ;
+
+/* DROP_REPLICATION_SLOT slot */
+drop_replication_slot:
+ DROP_REPLICATION_SLOT IDENT
+ {
+ DropReplicationSlotCmd *cmd;
+ cmd = makeNode(DropReplicationSlotCmd);
+ cmd->slotname = $2;
+ $$ = (Node *) cmd;
+ }
+ ;
+
+recptr:
+ SCONST
+ {
+ uint32 hi,
+ lo;
+ if (sscanf($1, "%X/%X", &hi, &lo) != 2)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid frakbar"),
+ parser_errposition(@1)));
+ $$ = ((uint64) hi) << 32 | lo;
+ }
+
+/*
+ * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
+ */
+start_replication:
+ START_REPLICATION opt_slot opt_physical recptr opt_timeline
+ {
+ StartReplicationCmd *cmd;
+
+ cmd = makeNode(StartReplicationCmd);
+ cmd->kind = REPLICATION_KIND_PHYSICAL;
+ cmd->slotname = $2;
+ cmd->startpoint = $4;
+ cmd->timeline = $5;
+ $$ = (Node *) cmd;
+ }
+ ;
+
+/* START_REPLICATION SLOT slot LOGICAL %X/%X options */
+start_logical_replication:
+ START_REPLICATION SLOT IDENT LOGICAL_P recptr plugin_options
+ {
+ StartReplicationCmd *cmd;
+ cmd = makeNode(StartReplicationCmd);
+ cmd->kind = REPLICATION_KIND_LOGICAL;
+ cmd->slotname = $3;
+ cmd->startpoint = $5;
+ cmd->options = $6;
+ $$ = (Node *) cmd;
+ }
+ ;
+/*
+ * TIMELINE_HISTORY %d
+ */
+timeline_history:
+ TIMELINE_HISTORY ICONST
+ {
+ TimeLineHistoryCmd *cmd;
+
+ if ($2 <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ (errmsg("invalid timeline %u", $2))));
+
+ cmd = makeNode(TimeLineHistoryCmd);
+ cmd->timeline = $2;
+
+ $$ = (Node *) cmd;
+ }
+ ;
+
+opt_physical:
+ PHYSICAL
+ | /* EMPTY */
+ ;
+
+opt_temporary:
+ TEMPORARY { $$ = true; }
+ | /* EMPTY */ { $$ = false; }
+ ;
+
+opt_slot:
+ SLOT IDENT
+ { $$ = $2; }
+ | /* EMPTY */
+ { $$ = NULL; }
+ ;
+
+opt_timeline:
+ TIMELINE_P ICONST
+ {
+ if ($2 <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ (errmsg("invalid timeline %u", $2))));
+ $$ = $2;
+ }
+ | /* EMPTY */ { $$ = 0; }
+ ;
+
+
+plugin_options:
+ '(' plugin_opt_list ')' { $$ = $2; }
+ | /* EMPTY */ { $$ = NIL; }
+ ;
+
+plugin_opt_list:
+ plugin_opt_elem
+ {
+ $$ = list_make1($1);
+ }
+ | plugin_opt_list ',' plugin_opt_elem
+ {
+ $$ = lappend($1, $3);
+ }
+ ;
+
+plugin_opt_elem:
+ IDENT plugin_opt_arg
+ {
+ $$ = makeDefElem($1, $2, -1);
+ }
+ ;
+
+plugin_opt_arg:
+ SCONST { $$ = (Node *) makeString($1); }
+ | /* EMPTY */ { $$ = NULL; }
+ ;
+
+
Iconst: ICONST { $$ = $1; };
Sconst: SCONST { $$ = $1; };
@@ -14646,6 +14978,7 @@ unreserved_keyword:
| ATTACH
| ATTRIBUTE
| BACKWARD
+ | BASE_BACKUP
| BEFORE
| BEGIN_P
| BY
@@ -14674,6 +15007,7 @@ unreserved_keyword:
| CONVERSION_P
| COPY
| COST
+ | CREATE_REPLICATION_SLOT
| CSV
| CUBE
| CURRENT_P
@@ -14699,6 +15033,7 @@ unreserved_keyword:
| DOMAIN_P
| DOUBLE_P
| DROP
+ | DROP_REPLICATION_SLOT
| EACH
| ENABLE_P
| ENCODING
@@ -14711,9 +15046,11 @@ unreserved_keyword:
| EXCLUSIVE
| EXECUTE
| EXPLAIN
+ | EXPORT_SNAPSHOT
| EXTENSION
| EXTERNAL
| FAMILY
+ | FAST_P
| FILTER
| FIRST_P
| FOLLOWING
@@ -14728,6 +15065,7 @@ unreserved_keyword:
| HEADER_P
| HOLD
| HOUR_P
+ | IDENTIFY_SYSTEM
| IDENTITY_P
| IF_P
| IMMEDIATE
@@ -14761,10 +15099,12 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LOGICAL_P
| MAPPING
| MATCH
| MATERIALIZED
| MAXVALUE
+ | MAX_RATE
| METHOD
| MINUTE_P
| MINVALUE
@@ -14776,6 +15116,7 @@ unreserved_keyword:
| NEW
| NEXT
| NO
+ | NOEXPORT_SNAPSHOT
| NOREFRESH
| NOTHING
| NOTIFY
@@ -14800,6 +15141,7 @@ unreserved_keyword:
| PARTITION
| PASSING
| PASSWORD
+ | PHYSICAL
| PLANS
| POLICY
| PRECEDING
@@ -14811,6 +15153,7 @@ unreserved_keyword:
| PROCEDURAL
| PROCEDURE
| PROGRAM
+ | PROGRESS
| PUBLICATION
| QUOTE
| RANGE
@@ -14828,6 +15171,7 @@ unreserved_keyword:
| REPEATABLE
| REPLACE
| REPLICA
+ | RESERVE_WAL
| RESET
| RESTART
| RESTRICT
@@ -14862,6 +15206,7 @@ unreserved_keyword:
| STABLE
| STANDALONE_P
| START
+ | START_REPLICATION
| STATEMENT
| STATISTICS
| STDIN
@@ -14874,10 +15219,13 @@ unreserved_keyword:
| SYSTEM_P
| TABLES
| TABLESPACE
+ | TABLESPACE_MAP_P
| TEMP
| TEMPLATE
| TEMPORARY
| TEXT_P
+ | TIMELINE_P
+ | TIMELINE_HISTORY
| TRANSACTION
| TRANSFORM
| TRIGGER
@@ -14893,6 +15241,7 @@ unreserved_keyword:
| UNLOGGED
| UNTIL
| UPDATE
+ | USE_SNAPSHOT
| VACUUM
| VALID
| VALIDATE
@@ -14903,6 +15252,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAL_P
| WHITESPACE_P
| WITHIN
| WITHOUT
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index da8bcf0471..62f8f3a440 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -15,15 +15,12 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
- repl_gram.o slot.o slotfuncs.o syncrep.o syncrep_gram.o
+ slot.o slotfuncs.o syncrep.o syncrep_gram.o
SUBDIRS = logical
include $(top_srcdir)/src/backend/common.mk
-# repl_scanner is compiled as part of repl_gram
-repl_gram.o: repl_scanner.c
-
# syncrep_scanner is complied as part of syncrep_gram
syncrep_gram.o: syncrep_scanner.c
syncrep_scanner.c: FLEXFLAGS = -CF -p -i
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9d7bb25d39..26a53333f8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -352,7 +352,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
if (options->logical)
appendStringInfo(&cmd, " LOGICAL");
- appendStringInfo(&cmd, " %X/%X",
+ appendStringInfo(&cmd, " '%X/%X'",
(uint32) (options->startpoint >> 32),
(uint32) options->startpoint);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 358ec28932..eb261ea02f 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -589,8 +589,7 @@ SnapBuildExportSnapshot(SnapBuild *builder)
Snapshot snap;
char *snapname;
- if (IsTransactionOrTransactionBlock())
- elog(ERROR, "cannot export a snapshot from within a transaction");
+ Assert(IsTransactionOrTransactionBlock());
if (SavedResourceOwnerDuringExport)
elog(ERROR, "can only export one snapshot at a time");
@@ -598,8 +597,6 @@ SnapBuildExportSnapshot(SnapBuild *builder)
SavedResourceOwnerDuringExport = CurrentResourceOwner;
ExportInProgress = true;
- StartTransactionCommand();
-
/* There doesn't seem to a nice API to set these */
XactIsoLevel = XACT_REPEATABLE_READ;
XactReadOnly = true;
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
deleted file mode 100644
index ec047c827c..0000000000
--- a/src/backend/replication/repl_gram.y
+++ /dev/null
@@ -1,408 +0,0 @@
-%{
-/*-------------------------------------------------------------------------
- *
- * repl_gram.y - Parser for the replication commands
- *
- * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- * src/backend/replication/repl_gram.y
- *
- *-------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include "access/xlogdefs.h"
-#include "nodes/makefuncs.h"
-#include "nodes/replnodes.h"
-#include "replication/walsender.h"
-#include "replication/walsender_private.h"
-
-
-/* Result of the parsing is returned here */
-Node *replication_parse_result;
-
-static SQLCmd *make_sqlcmd(void);
-
-
-/*
- * Bison doesn't allocate anything that needs to live across parser calls,
- * so we can easily have it use palloc instead of malloc. This prevents
- * memory leaks if we error out during parsing. Note this only works with
- * bison >= 2.0. However, in bison 1.875 the default is to use alloca()
- * if possible, so there's not really much problem anyhow, at least if
- * you're building with gcc.
- */
-#define YYMALLOC palloc
-#define YYFREE pfree
-
-%}
-
-%expect 0
-%name-prefix="replication_yy"
-
-%union {
- char *str;
- bool boolval;
- uint32 uintval;
-
- XLogRecPtr recptr;
- Node *node;
- List *list;
- DefElem *defelt;
-}
-
-/* Non-keyword tokens */
-%token <str> SCONST IDENT
-%token <uintval> UCONST
-%token <recptr> RECPTR
-%token T_WORD
-
-/* Keyword tokens. */
-%token K_BASE_BACKUP
-%token K_IDENTIFY_SYSTEM
-%token K_SHOW
-%token K_START_REPLICATION
-%token K_CREATE_REPLICATION_SLOT
-%token K_DROP_REPLICATION_SLOT
-%token K_TIMELINE_HISTORY
-%token K_LABEL
-%token K_PROGRESS
-%token K_FAST
-%token K_NOWAIT
-%token K_MAX_RATE
-%token K_WAL
-%token K_TABLESPACE_MAP
-%token K_TIMELINE
-%token K_PHYSICAL
-%token K_LOGICAL
-%token K_SLOT
-%token K_RESERVE_WAL
-%token K_TEMPORARY
-%token K_EXPORT_SNAPSHOT
-%token K_NOEXPORT_SNAPSHOT
-%token K_USE_SNAPSHOT
-
-%type <node> command
-%type <node> base_backup start_replication start_logical_replication
- create_replication_slot drop_replication_slot identify_system
- timeline_history show sql_cmd
-%type <list> base_backup_opt_list
-%type <defelt> base_backup_opt
-%type <uintval> opt_timeline
-%type <list> plugin_options plugin_opt_list
-%type <defelt> plugin_opt_elem
-%type <node> plugin_opt_arg
-%type <str> opt_slot var_name
-%type <boolval> opt_temporary
-%type <list> create_slot_opt_list
-%type <defelt> create_slot_opt
-
-%%
-
-firstcmd: command opt_semicolon
- {
- replication_parse_result = $1;
- }
- ;
-
-opt_semicolon: ';'
- | /* EMPTY */
- ;
-
-command:
- identify_system
- | base_backup
- | start_replication
- | start_logical_replication
- | create_replication_slot
- | drop_replication_slot
- | timeline_history
- | show
- | sql_cmd
- ;
-
-/*
- * IDENTIFY_SYSTEM
- */
-identify_system:
- K_IDENTIFY_SYSTEM
- {
- $$ = (Node *) makeNode(IdentifySystemCmd);
- }
- ;
-
-/*
- * SHOW setting
- */
-show:
- K_SHOW var_name
- {
- VariableShowStmt *n = makeNode(VariableShowStmt);
- n->name = $2;
- $$ = (Node *) n;
- }
-
-var_name: IDENT { $$ = $1; }
- | var_name '.' IDENT
- { $$ = psprintf("%s.%s", $1, $3); }
- ;
-
-/*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
- * [MAX_RATE %d] [TABLESPACE_MAP]
- */
-base_backup:
- K_BASE_BACKUP base_backup_opt_list
- {
- BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
- cmd->options = $2;
- $$ = (Node *) cmd;
- }
- ;
-
-base_backup_opt_list:
- base_backup_opt_list base_backup_opt
- { $$ = lappend($1, $2); }
- | /* EMPTY */
- { $$ = NIL; }
- ;
-
-base_backup_opt:
- K_LABEL SCONST
- {
- $$ = makeDefElem("label",
- (Node *)makeString($2), -1);
- }
- | K_PROGRESS
- {
- $$ = makeDefElem("progress",
- (Node *)makeInteger(TRUE), -1);
- }
- | K_FAST
- {
- $$ = makeDefElem("fast",
- (Node *)makeInteger(TRUE), -1);
- }
- | K_WAL
- {
- $$ = makeDefElem("wal",
- (Node *)makeInteger(TRUE), -1);
- }
- | K_NOWAIT
- {
- $$ = makeDefElem("nowait",
- (Node *)makeInteger(TRUE), -1);
- }
- | K_MAX_RATE UCONST
- {
- $$ = makeDefElem("max_rate",
- (Node *)makeInteger($2), -1);
- }
- | K_TABLESPACE_MAP
- {
- $$ = makeDefElem("tablespace_map",
- (Node *)makeInteger(TRUE), -1);
- }
- ;
-
-create_replication_slot:
- /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
- K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
- {
- CreateReplicationSlotCmd *cmd;
- cmd = makeNode(CreateReplicationSlotCmd);
- cmd->kind = REPLICATION_KIND_PHYSICAL;
- cmd->slotname = $2;
- cmd->temporary = $3;
- cmd->options = $5;
- $$ = (Node *) cmd;
- }
- /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
- | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
- {
- CreateReplicationSlotCmd *cmd;
- cmd = makeNode(CreateReplicationSlotCmd);
- cmd->kind = REPLICATION_KIND_LOGICAL;
- cmd->slotname = $2;
- cmd->temporary = $3;
- cmd->plugin = $5;
- cmd->options = $6;
- $$ = (Node *) cmd;
- }
- ;
-
-create_slot_opt_list:
- create_slot_opt_list create_slot_opt
- { $$ = lappend($1, $2); }
- | /* EMPTY */
- { $$ = NIL; }
- ;
-
-create_slot_opt:
- K_EXPORT_SNAPSHOT
- {
- $$ = makeDefElem("export_snapshot",
- (Node *)makeInteger(TRUE), -1);
- }
- | K_NOEXPORT_SNAPSHOT
- {
- $$ = makeDefElem("export_snapshot",
- (Node *)makeInteger(FALSE), -1);
- }
- | K_USE_SNAPSHOT
- {
- $$ = makeDefElem("use_snapshot",
- (Node *)makeInteger(TRUE), -1);
- }
- | K_RESERVE_WAL
- {
- $$ = makeDefElem("reserve_wal",
- (Node *)makeInteger(TRUE), -1);
- }
- ;
-
-/* DROP_REPLICATION_SLOT slot */
-drop_replication_slot:
- K_DROP_REPLICATION_SLOT IDENT
- {
- DropReplicationSlotCmd *cmd;
- cmd = makeNode(DropReplicationSlotCmd);
- cmd->slotname = $2;
- $$ = (Node *) cmd;
- }
- ;
-
-/*
- * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
- */
-start_replication:
- K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
- {
- StartReplicationCmd *cmd;
-
- cmd = makeNode(StartReplicationCmd);
- cmd->kind = REPLICATION_KIND_PHYSICAL;
- cmd->slotname = $2;
- cmd->startpoint = $4;
- cmd->timeline = $5;
- $$ = (Node *) cmd;
- }
- ;
-
-/* START_REPLICATION SLOT slot LOGICAL %X/%X options */
-start_logical_replication:
- K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
- {
- StartReplicationCmd *cmd;
- cmd = makeNode(StartReplicationCmd);
- cmd->kind = REPLICATION_KIND_LOGICAL;
- cmd->slotname = $3;
- cmd->startpoint = $5;
- cmd->options = $6;
- $$ = (Node *) cmd;
- }
- ;
-/*
- * TIMELINE_HISTORY %d
- */
-timeline_history:
- K_TIMELINE_HISTORY UCONST
- {
- TimeLineHistoryCmd *cmd;
-
- if ($2 <= 0)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- (errmsg("invalid timeline %u", $2))));
-
- cmd = makeNode(TimeLineHistoryCmd);
- cmd->timeline = $2;
-
- $$ = (Node *) cmd;
- }
- ;
-
-opt_physical:
- K_PHYSICAL
- | /* EMPTY */
- ;
-
-opt_temporary:
- K_TEMPORARY { $$ = true; }
- | /* EMPTY */ { $$ = false; }
- ;
-
-opt_slot:
- K_SLOT IDENT
- { $$ = $2; }
- | /* EMPTY */
- { $$ = NULL; }
- ;
-
-opt_timeline:
- K_TIMELINE UCONST
- {
- if ($2 <= 0)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- (errmsg("invalid timeline %u", $2))));
- $$ = $2;
- }
- | /* EMPTY */ { $$ = 0; }
- ;
-
-
-plugin_options:
- '(' plugin_opt_list ')' { $$ = $2; }
- | /* EMPTY */ { $$ = NIL; }
- ;
-
-plugin_opt_list:
- plugin_opt_elem
- {
- $$ = list_make1($1);
- }
- | plugin_opt_list ',' plugin_opt_elem
- {
- $$ = lappend($1, $3);
- }
- ;
-
-plugin_opt_elem:
- IDENT plugin_opt_arg
- {
- $$ = makeDefElem($1, $2, -1);
- }
- ;
-
-plugin_opt_arg:
- SCONST { $$ = (Node *) makeString($1); }
- | /* EMPTY */ { $$ = NULL; }
- ;
-
-sql_cmd:
- IDENT { $$ = (Node *) make_sqlcmd(); }
- ;
-%%
-
-static SQLCmd *
-make_sqlcmd(void)
-{
- SQLCmd *cmd = makeNode(SQLCmd);
- int tok;
-
- /* Just move lexer to the end of command. */
- for (;;)
- {
- tok = yylex();
- if (tok == ';' || tok == 0)
- break;
- }
- return cmd;
-}
-
-#include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
deleted file mode 100644
index 52ae7b343f..0000000000
--- a/src/backend/replication/repl_scanner.l
+++ /dev/null
@@ -1,248 +0,0 @@
-%{
-/*-------------------------------------------------------------------------
- *
- * repl_scanner.l
- * a lexical scanner for the replication commands
- *
- * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- * src/backend/replication/repl_scanner.l
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "utils/builtins.h"
-#include "parser/scansup.h"
-
-/* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
-#undef fprintf
-#define fprintf(file, fmt, msg) fprintf_to_ereport(fmt, msg)
-
-static void
-fprintf_to_ereport(const char *fmt, const char *msg)
-{
- ereport(ERROR, (errmsg_internal("%s", msg)));
-}
-
-/* Handle to the buffer that the lexer uses internally */
-static YY_BUFFER_STATE scanbufhandle;
-
-static StringInfoData litbuf;
-
-static void startlit(void);
-static char *litbufdup(void);
-static void addlit(char *ytext, int yleng);
-static void addlitchar(unsigned char ychar);
-
-%}
-
-%option 8bit
-%option never-interactive
-%option nodefault
-%option noinput
-%option nounput
-%option noyywrap
-%option warn
-%option prefix="replication_yy"
-
-%x xq xd
-
-/* Extended quote
- * xqdouble implements embedded quote, ''''
- */
-xqstart {quote}
-xqdouble {quote}{quote}
-xqinside [^']+
-
-/* Double quote
- * Allows embedded spaces and other special characters into identifiers.
- */
-dquote \"
-xdstart {dquote}
-xdstop {dquote}
-xddouble {dquote}{dquote}
-xdinside [^"]+
-
-digit [0-9]+
-hexdigit [0-9A-Za-z]+
-
-quote '
-quotestop {quote}
-
-ident_start [A-Za-z\200-\377_]
-ident_cont [A-Za-z\200-\377_0-9\$]
-
-identifier {ident_start}{ident_cont}*
-
-%%
-
-BASE_BACKUP { return K_BASE_BACKUP; }
-FAST { return K_FAST; }
-IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; }
-SHOW { return K_SHOW; }
-LABEL { return K_LABEL; }
-NOWAIT { return K_NOWAIT; }
-PROGRESS { return K_PROGRESS; }
-MAX_RATE { return K_MAX_RATE; }
-WAL { return K_WAL; }
-TABLESPACE_MAP { return K_TABLESPACE_MAP; }
-TIMELINE { return K_TIMELINE; }
-START_REPLICATION { return K_START_REPLICATION; }
-CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
-DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
-TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
-PHYSICAL { return K_PHYSICAL; }
-RESERVE_WAL { return K_RESERVE_WAL; }
-LOGICAL { return K_LOGICAL; }
-SLOT { return K_SLOT; }
-TEMPORARY { return K_TEMPORARY; }
-EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
-NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
-USE_SNAPSHOT { return K_USE_SNAPSHOT; }
-
-"," { return ','; }
-";" { return ';'; }
-"(" { return '('; }
-")" { return ')'; }
-
-[\n] ;
-[\t] ;
-" " ;
-
-{digit}+ {
- yylval.uintval = strtoul(yytext, NULL, 10);
- return UCONST;
- }
-
-{hexdigit}+\/{hexdigit}+ {
- uint32 hi,
- lo;
- if (sscanf(yytext, "%X/%X", &hi, &lo) != 2)
- yyerror("invalid streaming start location");
- yylval.recptr = ((uint64) hi) << 32 | lo;
- return RECPTR;
- }
-
-{xqstart} {
- BEGIN(xq);
- startlit();
- }
-
-<xq>{quotestop} {
- yyless(1);
- BEGIN(INITIAL);
- yylval.str = litbufdup();
- return SCONST;
- }
-
-<xq>{xqdouble} {
- addlitchar('\'');
- }
-
-<xq>{xqinside} {
- addlit(yytext, yyleng);
- }
-
-{xdstart} {
- BEGIN(xd);
- startlit();
- }
-
-<xd>{xdstop} {
- int len;
- yyless(1);
- BEGIN(INITIAL);
- yylval.str = litbufdup();
- len = strlen(yylval.str);
- truncate_identifier(yylval.str, len, true);
- return IDENT;
- }
-
-<xd>{xdinside} {
- addlit(yytext, yyleng);
- }
-
-{identifier} {
- int len = strlen(yytext);
-
- yylval.str = downcase_truncate_identifier(yytext, len, true);
- return IDENT;
- }
-
-<xq,xd><<EOF>> { yyerror("unterminated quoted string"); }
-
-
-<<EOF>> {
- yyterminate();
- }
-
-. {
- return T_WORD;
- }
-%%
-
-
-static void
-startlit(void)
-{
- initStringInfo(&litbuf);
-}
-
-static char *
-litbufdup(void)
-{
- return litbuf.data;
-}
-
-static void
-addlit(char *ytext, int yleng)
-{
- appendBinaryStringInfo(&litbuf, ytext, yleng);
-}
-
-static void
-addlitchar(unsigned char ychar)
-{
- appendStringInfoChar(&litbuf, ychar);
-}
-
-void
-yyerror(const char *message)
-{
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg_internal("%s", message)));
-}
-
-
-void
-replication_scanner_init(const char *str)
-{
- Size slen = strlen(str);
- char *scanbuf;
-
- /*
- * Might be left over after ereport()
- */
- if (YY_CURRENT_BUFFER)
- yy_delete_buffer(YY_CURRENT_BUFFER);
-
- /*
- * Make a scan buffer with special termination needed by flex.
- */
- scanbuf = (char *) palloc(slen + 2);
- memcpy(scanbuf, str, slen);
- scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR;
- scanbufhandle = yy_scan_buffer(scanbuf, slen + 2);
-}
-
-void
-replication_scanner_finish(void)
-{
- yy_delete_buffer(scanbufhandle);
- scanbufhandle = NULL;
-}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 064cf5ee28..e3284363e0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -360,18 +360,7 @@ IdentifySystem(void)
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
if (MyDatabaseId != InvalidOid)
- {
- MemoryContext cur = CurrentMemoryContext;
-
- /* syscache access needs a transaction env. */
- StartTransactionCommand();
- /* make dbname live outside TX context */
- MemoryContextSwitchTo(cur);
dbname = get_database_name(MyDatabaseId);
- CommitTransactionCommand();
- /* CommitTransactionCommand switches to TopMemoryContext */
- MemoryContextSwitchTo(cur);
- }
dest = CreateDestReceiver(DestRemoteSimple);
MemSet(nulls, false, sizeof(nulls));
@@ -870,6 +859,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
}
+
if (cmd->kind == REPLICATION_KIND_LOGICAL)
{
LogicalDecodingContext *ctx;
@@ -908,6 +898,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
"must not be called in a subtransaction")));
}
+ /* XXX: document */
+ CommitTransactionCommand();
+
ctx = CreateInitDecodingContext(cmd->plugin, NIL,
logical_read_xlog_page,
WalSndPrepareWrite, WalSndWriteData);
@@ -924,6 +917,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
/* build initial snapshot, might take a while */
DecodingContextFindStartpoint(ctx);
+ /* match*/
+ StartTransactionCommand();
+
/*
* Export or use the snapshot if we've been asked to do so.
*
@@ -940,8 +936,10 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
RestoreTransactionSnapshot(snap, MyProc);
+
}
+
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
@@ -1354,65 +1352,37 @@ WalSndWaitForWal(XLogRecPtr loc)
return RecentFlushPtr;
}
-/*
- * Execute an incoming replication command.
- *
- * Returns true if the cmd_string was recognized as WalSender command, false
- * if not.
- */
bool
-exec_replication_command(const char *cmd_string)
+exec_replication_command(ReplicationCmd *cmd, const char *query_string, bool is_toplevel)
{
- int parse_rc;
- Node *cmd_node;
- MemoryContext cmd_context;
- MemoryContext old_context;
-
- /*
- * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
- * command arrives. Clean up the old stuff if there's anything.
- */
- SnapBuildClearExportedSnapshot();
-
- CHECK_FOR_INTERRUPTS();
-
- cmd_context = AllocSetContextCreate(CurrentMemoryContext,
- "Replication command context",
- ALLOCSET_DEFAULT_SIZES);
- old_context = MemoryContextSwitchTo(cmd_context);
-
- replication_scanner_init(cmd_string);
- parse_rc = replication_yyparse();
- if (parse_rc != 0)
+ if (!am_walsender)
+ {
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- (errmsg_internal("replication command parser returned %d",
- parse_rc))));
-
- cmd_node = replication_parse_result;
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("replication protocol statement not supported in a normal connection")));
+ }
/*
* Log replication command if log_replication_commands is enabled. Even
* when it's disabled, log the command with DEBUG1 level for backward
- * compatibility. Note that SQL commands are not logged here, and will be
- * logged later if log_statement is enabled.
+ * compatibility.
*/
- if (cmd_node->type != T_SQLCmd)
- ereport(log_replication_commands ? LOG : DEBUG1,
- (errmsg("received replication command: %s", cmd_string)));
+ ereport(log_replication_commands ? LOG : DEBUG1,
+ (errmsg("received replication command: %s", query_string)));
/*
* CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
* called outside of transaction the snapshot should be cleared here.
+ *
+ * FIXME: this is completely borked.
*/
if (!IsTransactionBlock())
SnapBuildClearExportedSnapshot();
/*
- * For aborted transactions, don't allow anything except pure SQL,
- * the exec_simple_query() will handle it correctly.
+ * Nothing to do here in an aborted transaction.
*/
- if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
+ if (IsAbortedTransactionBlockState())
ereport(ERROR,
(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
errmsg("current transaction is aborted, "
@@ -1428,7 +1398,7 @@ exec_replication_command(const char *cmd_string)
initStringInfo(&reply_message);
initStringInfo(&tmpbuf);
- switch (cmd_node->type)
+ switch (nodeTag(cmd->replicationCmd))
{
case T_IdentifySystemCmd:
IdentifySystem();
@@ -1436,64 +1406,46 @@ exec_replication_command(const char *cmd_string)
case T_BaseBackupCmd:
PreventTransactionChain(true, "BASE_BACKUP");
- SendBaseBackup((BaseBackupCmd *) cmd_node);
+ CommitTransactionCommand();
+ SendBaseBackup((BaseBackupCmd *) cmd->replicationCmd);
+ StartTransactionCommand();
break;
case T_CreateReplicationSlotCmd:
- CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+ CreateReplicationSlot((CreateReplicationSlotCmd *) cmd->replicationCmd);
break;
case T_DropReplicationSlotCmd:
- DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+ DropReplicationSlot((DropReplicationSlotCmd *) cmd->replicationCmd);
break;
case T_StartReplicationCmd:
{
- StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
+ StartReplicationCmd *scmd = (StartReplicationCmd *) cmd->replicationCmd;
PreventTransactionChain(true, "START_REPLICATION");
- if (cmd->kind == REPLICATION_KIND_PHYSICAL)
- StartReplication(cmd);
+ CommitTransactionCommand();
+
+ if (scmd->kind == REPLICATION_KIND_PHYSICAL)
+ StartReplication(scmd);
else
- StartLogicalReplication(cmd);
+ StartLogicalReplication(scmd);
+
+ StartTransactionCommand();
break;
}
case T_TimeLineHistoryCmd:
PreventTransactionChain(true, "TIMELINE_HISTORY");
- SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
+ SendTimeLineHistory((TimeLineHistoryCmd *) cmd->replicationCmd);
break;
- case T_VariableShowStmt:
- {
- DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
- VariableShowStmt *n = (VariableShowStmt *) cmd_node;
-
- GetPGVariable(n->name, dest);
- }
- break;
-
- case T_SQLCmd:
- if (MyDatabaseId == InvalidOid)
- ereport(ERROR,
- (errmsg("not connected to database")));
-
- /* Tell the caller that this wasn't a WalSender command. */
- return false;
-
default:
elog(ERROR, "unrecognized replication command node tag: %u",
- cmd_node->type);
+ cmd->replicationCmd->type);
}
- /* done */
- MemoryContextSwitchTo(old_context);
- MemoryContextDelete(cmd_context);
-
- /* Send CommandComplete message */
- EndCommand("SELECT", DestRemote);
-
return true;
}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 75c2d9a61d..485e61f709 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -173,7 +173,6 @@ static int InteractiveBackend(StringInfo inBuf);
static int interactive_getc(void);
static int SocketBackend(StringInfo inBuf);
static int ReadCommand(StringInfo inBuf);
-static void forbidden_in_wal_sender(char firstchar);
static List *pg_rewrite_query(Query *query);
static bool check_log_statement(List *stmt_list);
static int errdetail_execute(List *raw_parsetree_list);
@@ -1015,6 +1014,14 @@ exec_simple_query(const char *query_string)
*/
if (analyze_requires_snapshot(parsetree))
{
+ if (am_walsender && !am_db_walsender)
+ {
+ /* FIXME: message */
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("non-replication statement not supported in pure replication connection")));
+ }
+
PushActiveSnapshot(GetTransactionSnapshot());
snapshot_set = true;
}
@@ -1325,6 +1332,14 @@ exec_parse_message(const char *query_string, /* string to execute */
*/
if (analyze_requires_snapshot(raw_parse_tree))
{
+ if (am_walsender && !am_db_walsender)
+ {
+ /* FIXME: message */
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("non-replication statement not supported in pure replication connection")));
+ }
+
PushActiveSnapshot(GetTransactionSnapshot());
snapshot_set = true;
}
@@ -1610,6 +1625,14 @@ exec_bind_message(StringInfo input_message)
(psrc->raw_parse_tree &&
analyze_requires_snapshot(psrc->raw_parse_tree)))
{
+ if (am_walsender && !am_db_walsender)
+ {
+ /* FIXME: message */
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("non-replication statement not supported in pure replication connection")));
+ }
+
PushActiveSnapshot(GetTransactionSnapshot());
snapshot_set = true;
}
@@ -4066,13 +4089,7 @@ PostgresMain(int argc, char *argv[],
query_string = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
- if (am_walsender)
- {
- if (!exec_replication_command(query_string))
- exec_simple_query(query_string);
- }
- else
- exec_simple_query(query_string);
+ exec_simple_query(query_string);
send_ready_for_query = true;
}
@@ -4085,8 +4102,6 @@ PostgresMain(int argc, char *argv[],
int numParams;
Oid *paramTypes = NULL;
- forbidden_in_wal_sender(firstchar);
-
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
@@ -4109,8 +4124,6 @@ PostgresMain(int argc, char *argv[],
break;
case 'B': /* bind */
- forbidden_in_wal_sender(firstchar);
-
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
@@ -4126,8 +4139,6 @@ PostgresMain(int argc, char *argv[],
const char *portal_name;
int max_rows;
- forbidden_in_wal_sender(firstchar);
-
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
@@ -4140,8 +4151,6 @@ PostgresMain(int argc, char *argv[],
break;
case 'F': /* fastpath function call */
- forbidden_in_wal_sender(firstchar);
-
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
@@ -4177,8 +4186,6 @@ PostgresMain(int argc, char *argv[],
int close_type;
const char *close_target;
- forbidden_in_wal_sender(firstchar);
-
close_type = pq_getmsgbyte(&input_message);
close_target = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
@@ -4221,8 +4228,6 @@ PostgresMain(int argc, char *argv[],
int describe_type;
const char *describe_target;
- forbidden_in_wal_sender(firstchar);
-
/* Set statement_timestamp() (needed for xact) */
SetCurrentStatementStartTimestamp();
@@ -4305,30 +4310,6 @@ PostgresMain(int argc, char *argv[],
}
/*
- * Throw an error if we're a WAL sender process.
- *
- * This is used to forbid anything else than simple query protocol messages
- * in a WAL sender process. 'firstchar' specifies what kind of a forbidden
- * message was received, and is used to construct the error message.
- */
-static void
-forbidden_in_wal_sender(char firstchar)
-{
- if (am_walsender)
- {
- if (firstchar == 'F')
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("fastpath function calls not supported in a replication connection")));
- else
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("extended query protocol not supported in a replication connection")));
- }
-}
-
-
-/*
* Obtain platform stack depth limit (in bytes)
*
* Return -1 if unknown
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index e30aeb1c7f..450ee08461 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1155,6 +1155,8 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
IsA(utilityStmt, VariableSetStmt) ||
IsA(utilityStmt, VariableShowStmt) ||
IsA(utilityStmt, ConstraintsSetStmt) ||
+ /* replication commands shouldn't run w/ snapshot */
+ IsA(utilityStmt, ReplicationCmd) ||
/* efficiency hacks from here down */
IsA(utilityStmt, FetchStmt) ||
IsA(utilityStmt, ListenStmt) ||
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 24e5c427c6..a313fde8d9 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -57,10 +57,12 @@
#include "commands/vacuum.h"
#include "commands/view.h"
#include "miscadmin.h"
+#include "nodes/replnodes.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteRemove.h"
+#include "replication/walsender.h"
#include "storage/fd.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
@@ -923,6 +925,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_ReplicationCmd:
+ exec_replication_command((ReplicationCmd *) parsetree, queryString, isTopLevel);
+ strcpy(completionTag, CreateCommandTag(parsetree));
+ break;
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2843,6 +2850,40 @@ CreateCommandTag(Node *parsetree)
}
break;
+ case T_ReplicationCmd:
+ switch (nodeTag(((ReplicationCmd *) parsetree)->replicationCmd))
+ {
+ case T_IdentifySystemCmd:
+ tag = "IDENTIFY_SYSTEM";
+ break;
+
+ case T_BaseBackupCmd:
+ tag = "BASE_BACKUP";
+ break;
+
+ case T_CreateReplicationSlotCmd:
+ tag = "CREATE_REPLICATION_SLOT";
+ break;
+
+ case T_DropReplicationSlotCmd:
+ tag = "DROP_REPLICATION_SLOT";
+ break;
+
+ case T_StartReplicationCmd:
+ tag = "START_REPLICATION";
+ break;
+
+ case T_TimeLineHistoryCmd:
+ tag = "TIMELINE_HISTORY";
+ break;
+ default:
+ elog(WARNING, "unrecognized replication command: %d",
+ (int) nodeTag(parsetree));
+ tag = "???";
+ break;
+ }
+ break;
+
default:
elog(WARNING, "unrecognized node type: %d",
(int) nodeTag(parsetree));
@@ -3349,6 +3390,11 @@ GetCommandLogLevel(Node *parsetree)
}
break;
+ case T_ReplicationCmd:
+ /* FIXME: Invent new category? */
+ lev = LOGSTMT_DDL;
+ break;
+
default:
elog(WARNING, "unrecognized node type: %d",
(int) nodeTag(parsetree));
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 6b081bd737..2950c7b338 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -240,7 +240,7 @@ StreamLogicalLog(void)
replication_slot);
/* Initiate the replication stream at specified location */
- appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
+ appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL '%X/%X'",
replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
/* print options if there are any */
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 8511e57cf7..5a11bb8c4f 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -583,7 +583,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
return true;
/* Initiate the replication stream at specified location */
- snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
+ snprintf(query, sizeof(query), "START_REPLICATION %s'%X/%X' TIMELINE %u",
slotcmd,
(uint32) (stream->startpos >> 32), (uint32) stream->startpos,
stream->timeline);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index f59d719923..107b36fd6e 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -472,13 +472,13 @@ typedef enum NodeTag
/*
* TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
*/
+ T_ReplicationCmd, /* container for all other replication commands */
T_IdentifySystemCmd,
T_BaseBackupCmd,
T_CreateReplicationSlotCmd,
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
- T_SQLCmd,
/*
* TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 92ada41b6d..b4afab85da 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -17,6 +17,14 @@
#include "access/xlogdefs.h"
#include "nodes/pg_list.h"
+
+typedef struct ReplicationCmd
+{
+ NodeTag type;
+
+ Node *replicationCmd;
+} ReplicationCmd;
+
typedef enum ReplicationKind
{
REPLICATION_KIND_PHYSICAL,
diff --git a/src/include/parser/gramparse.h b/src/include/parser/gramparse.h
index 2da98f67f3..0a3edc79e2 100644
--- a/src/include/parser/gramparse.h
+++ b/src/include/parser/gramparse.h
@@ -20,6 +20,7 @@
#define GRAMPARSE_H
#include "nodes/parsenodes.h"
+#include "access/xlogdefs.h"
#include "parser/scanner.h"
/*
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 37542aaee4..7130803e21 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -53,6 +53,7 @@ PG_KEYWORD("attach", ATTACH, UNRESERVED_KEYWORD)
PG_KEYWORD("attribute", ATTRIBUTE, UNRESERVED_KEYWORD)
PG_KEYWORD("authorization", AUTHORIZATION, TYPE_FUNC_NAME_KEYWORD)
PG_KEYWORD("backward", BACKWARD, UNRESERVED_KEYWORD)
+PG_KEYWORD("base_backup", BASE_BACKUP, UNRESERVED_KEYWORD)
PG_KEYWORD("before", BEFORE, UNRESERVED_KEYWORD)
PG_KEYWORD("begin", BEGIN_P, UNRESERVED_KEYWORD)
PG_KEYWORD("between", BETWEEN, COL_NAME_KEYWORD)
@@ -99,6 +100,7 @@ PG_KEYWORD("conversion", CONVERSION_P, UNRESERVED_KEYWORD)
PG_KEYWORD("copy", COPY, UNRESERVED_KEYWORD)
PG_KEYWORD("cost", COST, UNRESERVED_KEYWORD)
PG_KEYWORD("create", CREATE, RESERVED_KEYWORD)
+PG_KEYWORD("create_replication_slot", CREATE_REPLICATION_SLOT, UNRESERVED_KEYWORD)
PG_KEYWORD("cross", CROSS, TYPE_FUNC_NAME_KEYWORD)
PG_KEYWORD("csv", CSV, UNRESERVED_KEYWORD)
PG_KEYWORD("cube", CUBE, UNRESERVED_KEYWORD)
@@ -139,6 +141,7 @@ PG_KEYWORD("document", DOCUMENT_P, UNRESERVED_KEYWORD)
PG_KEYWORD("domain", DOMAIN_P, UNRESERVED_KEYWORD)
PG_KEYWORD("double", DOUBLE_P, UNRESERVED_KEYWORD)
PG_KEYWORD("drop", DROP, UNRESERVED_KEYWORD)
+PG_KEYWORD("drop_replication_slot", DROP_REPLICATION_SLOT, UNRESERVED_KEYWORD)
PG_KEYWORD("each", EACH, UNRESERVED_KEYWORD)
PG_KEYWORD("else", ELSE, RESERVED_KEYWORD)
PG_KEYWORD("enable", ENABLE_P, UNRESERVED_KEYWORD)
@@ -155,11 +158,13 @@ PG_KEYWORD("exclusive", EXCLUSIVE, UNRESERVED_KEYWORD)
PG_KEYWORD("execute", EXECUTE, UNRESERVED_KEYWORD)
PG_KEYWORD("exists", EXISTS, COL_NAME_KEYWORD)
PG_KEYWORD("explain", EXPLAIN, UNRESERVED_KEYWORD)
+PG_KEYWORD("export_snapshot", EXPORT_SNAPSHOT, UNRESERVED_KEYWORD)
PG_KEYWORD("extension", EXTENSION, UNRESERVED_KEYWORD)
PG_KEYWORD("external", EXTERNAL, UNRESERVED_KEYWORD)
PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
+PG_KEYWORD("fast", FAST_P, UNRESERVED_KEYWORD)
PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
@@ -186,6 +191,7 @@ PG_KEYWORD("having", HAVING, RESERVED_KEYWORD)
PG_KEYWORD("header", HEADER_P, UNRESERVED_KEYWORD)
PG_KEYWORD("hold", HOLD, UNRESERVED_KEYWORD)
PG_KEYWORD("hour", HOUR_P, UNRESERVED_KEYWORD)
+PG_KEYWORD("identify_system", IDENTIFY_SYSTEM, UNRESERVED_KEYWORD)
PG_KEYWORD("identity", IDENTITY_P, UNRESERVED_KEYWORD)
PG_KEYWORD("if", IF_P, UNRESERVED_KEYWORD)
PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD)
@@ -240,9 +246,11 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("logical", LOGICAL_P, UNRESERVED_KEYWORD)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
+PG_KEYWORD("max_rate", MAX_RATE, UNRESERVED_KEYWORD)
PG_KEYWORD("maxvalue", MAXVALUE, UNRESERVED_KEYWORD)
PG_KEYWORD("method", METHOD, UNRESERVED_KEYWORD)
PG_KEYWORD("minute", MINUTE_P, UNRESERVED_KEYWORD)
@@ -258,6 +266,7 @@ PG_KEYWORD("nchar", NCHAR, COL_NAME_KEYWORD)
PG_KEYWORD("new", NEW, UNRESERVED_KEYWORD)
PG_KEYWORD("next", NEXT, UNRESERVED_KEYWORD)
PG_KEYWORD("no", NO, UNRESERVED_KEYWORD)
+PG_KEYWORD("noexport_snapshot", NOEXPORT_SNAPSHOT, UNRESERVED_KEYWORD)
PG_KEYWORD("none", NONE, COL_NAME_KEYWORD)
PG_KEYWORD("norefresh", NOREFRESH, UNRESERVED_KEYWORD)
PG_KEYWORD("not", NOT, RESERVED_KEYWORD)
@@ -297,6 +306,7 @@ PG_KEYWORD("partial", PARTIAL, UNRESERVED_KEYWORD)
PG_KEYWORD("partition", PARTITION, UNRESERVED_KEYWORD)
PG_KEYWORD("passing", PASSING, UNRESERVED_KEYWORD)
PG_KEYWORD("password", PASSWORD, UNRESERVED_KEYWORD)
+PG_KEYWORD("physical", PHYSICAL, UNRESERVED_KEYWORD)
PG_KEYWORD("placing", PLACING, RESERVED_KEYWORD)
PG_KEYWORD("plans", PLANS, UNRESERVED_KEYWORD)
PG_KEYWORD("policy", POLICY, UNRESERVED_KEYWORD)
@@ -312,6 +322,7 @@ PG_KEYWORD("privileges", PRIVILEGES, UNRESERVED_KEYWORD)
PG_KEYWORD("procedural", PROCEDURAL, UNRESERVED_KEYWORD)
PG_KEYWORD("procedure", PROCEDURE, UNRESERVED_KEYWORD)
PG_KEYWORD("program", PROGRAM, UNRESERVED_KEYWORD)
+PG_KEYWORD("progress", PROGRESS, UNRESERVED_KEYWORD)
PG_KEYWORD("publication", PUBLICATION, UNRESERVED_KEYWORD)
PG_KEYWORD("quote", QUOTE, UNRESERVED_KEYWORD)
PG_KEYWORD("range", RANGE, UNRESERVED_KEYWORD)
@@ -331,6 +342,7 @@ PG_KEYWORD("rename", RENAME, UNRESERVED_KEYWORD)
PG_KEYWORD("repeatable", REPEATABLE, UNRESERVED_KEYWORD)
PG_KEYWORD("replace", REPLACE, UNRESERVED_KEYWORD)
PG_KEYWORD("replica", REPLICA, UNRESERVED_KEYWORD)
+PG_KEYWORD("reserve_wal", RESERVE_WAL, UNRESERVED_KEYWORD)
PG_KEYWORD("reset", RESET, UNRESERVED_KEYWORD)
PG_KEYWORD("restart", RESTART, UNRESERVED_KEYWORD)
PG_KEYWORD("restrict", RESTRICT, UNRESERVED_KEYWORD)
@@ -374,6 +386,7 @@ PG_KEYWORD("sql", SQL_P, UNRESERVED_KEYWORD)
PG_KEYWORD("stable", STABLE, UNRESERVED_KEYWORD)
PG_KEYWORD("standalone", STANDALONE_P, UNRESERVED_KEYWORD)
PG_KEYWORD("start", START, UNRESERVED_KEYWORD)
+PG_KEYWORD("start_replication", START_REPLICATION, UNRESERVED_KEYWORD)
PG_KEYWORD("statement", STATEMENT, UNRESERVED_KEYWORD)
PG_KEYWORD("statistics", STATISTICS, UNRESERVED_KEYWORD)
PG_KEYWORD("stdin", STDIN, UNRESERVED_KEYWORD)
@@ -390,12 +403,15 @@ PG_KEYWORD("table", TABLE, RESERVED_KEYWORD)
PG_KEYWORD("tables", TABLES, UNRESERVED_KEYWORD)
PG_KEYWORD("tablesample", TABLESAMPLE, TYPE_FUNC_NAME_KEYWORD)
PG_KEYWORD("tablespace", TABLESPACE, UNRESERVED_KEYWORD)
+PG_KEYWORD("tablespace_map", TABLESPACE_MAP_P, UNRESERVED_KEYWORD)
PG_KEYWORD("temp", TEMP, UNRESERVED_KEYWORD)
PG_KEYWORD("template", TEMPLATE, UNRESERVED_KEYWORD)
PG_KEYWORD("temporary", TEMPORARY, UNRESERVED_KEYWORD)
PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeline", TIMELINE_P, UNRESERVED_KEYWORD)
+PG_KEYWORD("timeline_history", TIMELINE_HISTORY, UNRESERVED_KEYWORD)
PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
PG_KEYWORD("to", TO, RESERVED_KEYWORD)
PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -419,6 +435,7 @@ PG_KEYWORD("unlisten", UNLISTEN, UNRESERVED_KEYWORD)
PG_KEYWORD("unlogged", UNLOGGED, UNRESERVED_KEYWORD)
PG_KEYWORD("until", UNTIL, UNRESERVED_KEYWORD)
PG_KEYWORD("update", UPDATE, UNRESERVED_KEYWORD)
+PG_KEYWORD("use_snapshot", USE_SNAPSHOT, UNRESERVED_KEYWORD)
PG_KEYWORD("user", USER, RESERVED_KEYWORD)
PG_KEYWORD("using", USING, RESERVED_KEYWORD)
PG_KEYWORD("vacuum", VACUUM, UNRESERVED_KEYWORD)
@@ -435,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wal", WAL_P, UNRESERVED_KEYWORD)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 2ca903872e..65026eb83b 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -16,6 +16,9 @@
#include "fmgr.h"
+#include "nodes/nodes.h"
+#include "nodes/replnodes.h"
+
/*
* What to do with a snapshot in create replication slot command.
*/
@@ -38,7 +41,7 @@ extern int wal_sender_timeout;
extern bool log_replication_commands;
extern void InitWalSender(void);
-extern bool exec_replication_command(const char *query_string);
+extern bool exec_replication_command(ReplicationCmd *cmd, const char *query_string, bool isToplevel);
extern void WalSndErrorCleanup(void);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
--
2.12.0.264.gd6db3f2165.dirty
Andres Freund <andres@anarazel.de> writes:
I've for a while suspected that the separation & duplication of
infrastructure between walsenders and normal backends isn't nice.
I think we should consider a more radical solution: trying to put
general SQL query capability into the replication protocol was a
bad idea and we should revert it while we still can. The uglinesses
you mention aren't merely implementation issues, they're an indication
that that concept is broken in itself.
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Apr 25, 2017 at 11:34 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Andres Freund <andres@anarazel.de> writes:
I've for a while suspected that the separation & duplication of
infrastructure between walsenders and normal backends isn't nice.I think we should consider a more radical solution: trying to put
general SQL query capability into the replication protocol was a
bad idea and we should revert it while we still can. The uglinesses
you mention aren't merely implementation issues, they're an indication
that that concept is broken in itself.
I think that it's worth considering this option in order to "stabilize"
logical replication stuff before the release. The table sync patch
(which allows walsender to run normal queries) introduced such
uglinesses and increased the complexity in logical rep code.
OTOH, I believe that logical replication is still useful even without
initial table sync feature. So reverting the table sync patch seems
possible idea.
Regards,
--
Fujii Masao
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2017-04-25 10:34:20 -0400, Tom Lane wrote:
Andres Freund <andres@anarazel.de> writes:
I've for a while suspected that the separation & duplication of
infrastructure between walsenders and normal backends isn't nice.I think we should consider a more radical solution: trying to put
general SQL query capability into the replication protocol was a
bad idea and we should revert it while we still can. The uglinesses
you mention aren't merely implementation issues, they're an indication
that that concept is broken in itself.
I don't think that's the right solution, I think it's evidence that the
split was a bad idea in the first place. There's a growing amount of
duplication between the protocols, and a growing number of use-cases
that need facilities of both protocols. We e.g. now already have SHOW
statements in both, except that they behave slightly differently. For
logical rep we'd alternatively add more complexity because we'd need
both replication and non-replication connections (to stream changes, to
copy tables, to query config), which'd also complicate administration
because users & hba config have to be setup so the same user can connect
over both.
Therefore I think it's the implementation that's not perfect, but the
idea is perfectly sound. Having two awkwardly & increasingly different
languages in postgres doesn't sound like a sound idea.
- Andres
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 25/04/17 17:13, Fujii Masao wrote:
On Tue, Apr 25, 2017 at 11:34 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Andres Freund <andres@anarazel.de> writes:
I've for a while suspected that the separation & duplication of
infrastructure between walsenders and normal backends isn't nice.I think we should consider a more radical solution: trying to put
general SQL query capability into the replication protocol was a
bad idea and we should revert it while we still can. The uglinesses
you mention aren't merely implementation issues, they're an indication
that that concept is broken in itself.I think that it's worth considering this option in order to "stabilize"
logical replication stuff before the release. The table sync patch
(which allows walsender to run normal queries) introduced such
uglinesses and increased the complexity in logical rep code.
OTOH, I believe that logical replication is still useful even without
initial table sync feature. So reverting the table sync patch seems
possible idea.
I don't think that's good idea, the usefulness if much lower without the
initial copy. The original patch for this added new commands to
replication protocol, adding generic SQL interface was result of request
in the reviews.
I personally don't mind moving back my original idea of special commands
if that was the consensus, but previous consensus was to do SQL instead.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Apr 25, 2017 at 2:24 PM, Petr Jelinek <petr.jelinek@2ndquadrant.com>
wrote:
On 25/04/17 17:13, Fujii Masao wrote:
On Tue, Apr 25, 2017 at 11:34 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
OTOH, I believe that logical replication is still useful even without
initial table sync feature. So reverting the table sync patch seems
possible idea.I don't think that's good idea, the usefulness if much lower without the
initial copy. The original patch for this added new commands to
replication protocol, adding generic SQL interface was result of request
in the reviews.
Haven't followed this feature closely but my first thoughts when recently
reading about it were related to the initial copy and table synchronization
- so I'd have to agree with Petr here. Full table sync is big and for any
table with activity on it the confidence level of knowing you have
everything is greatly reduced if the system isn't making a guarantee.
David J.
On 2017-04-25 23:24:40 +0200, Petr Jelinek wrote:
On 25/04/17 17:13, Fujii Masao wrote:
On Tue, Apr 25, 2017 at 11:34 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Andres Freund <andres@anarazel.de> writes:
I've for a while suspected that the separation & duplication of
infrastructure between walsenders and normal backends isn't nice.I think we should consider a more radical solution: trying to put
general SQL query capability into the replication protocol was a
bad idea and we should revert it while we still can. The uglinesses
you mention aren't merely implementation issues, they're an indication
that that concept is broken in itself.I think that it's worth considering this option in order to "stabilize"
logical replication stuff before the release. The table sync patch
(which allows walsender to run normal queries) introduced such
uglinesses and increased the complexity in logical rep code.
OTOH, I believe that logical replication is still useful even without
initial table sync feature. So reverting the table sync patch seems
possible idea.I don't think that's good idea, the usefulness if much lower without the
initial copy.
Agreed. I think that'd move us way backwards, and we'd have to tackle
exactly the same issue in a few weeks again.
The original patch for this added new commands to
replication protocol, adding generic SQL interface was result of request
in the reviews.
Yea, I still think it's the right approach in general - I don't think
the patch itself was properly discussed and such though, being
essentially burried in another commit.
I personally don't mind moving back my original idea of special commands
if that was the consensus, but previous consensus was to do SQL instead.
I really don't think that'll solve anything.
- Andres
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 26 April 2017 at 02:36, Andres Freund <andres@anarazel.de> wrote:
For
logical rep we'd alternatively add more complexity because we'd need
both replication and non-replication connections (to stream changes, to
copy tables, to query config), which'd also complicate administration
because users & hba config have to be setup so the same user can connect
over both.
We have experience of this from BDR and pglogical, of course, and it's
definitely a pain and source of user misconfiguration.
I'm not sure how practical it is to merge walsenders and regular
backends at this stage of pg10 development, but it seems like a
worthwhile goal down the track. I'd very much like to reduce the
amount of magic global juggling done by the walsender, unify the
XLogRead paths, unify the timeline following logic for physical
walsenders, logical walsenders and logical decoding on normal
backends, allow normal backends to be signaled when there's new WAL,
etc. I think there's a fair bit to do in order to do this well though.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2017-04-26 08:41:46 +0800, Craig Ringer wrote:
I'd very much like to reduce the amount of magic global juggling done
by the walsender, unify the XLogRead paths, unify the timeline
following logic for physical walsenders, logical walsenders and
logical decoding on normal backends, allow normal backends to be
signaled when there's new WAL, etc. I think there's a fair bit to do
in order to do this well though.
That all seems orthogonal however.
- Andres
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers