Initial Schema Sync for Logical Replication

Started by Kumar, Sachin · almost 3 years ago · 64 messages
#1Kumar, Sachin
ssetiya@amazon.com

Hi Everyone,

I am working on initial schema sync for logical replication. Currently, users have to
manually create the schema on the subscriber side. The aim of this feature is to add an option to
CREATE SUBSCRIPTION so that the schema sync can happen automatically. I am sharing the design doc below,
but there are some corner cases where the design does not work. Please share your opinion
on whether the design can be improved so that we can get rid of the corner cases. This design is loosely
based on pglogical.
DDL replication is required for this feature.
(/messages/by-id/CAAD30U+pVmfKwUKy8cbZOnUXyguJ-uBNejwD75Kyo=OjdQGJ9g@mail.gmail.com)

SQL Changes:-
CREATE SUBSCRIPTION subscription_name
CONNECTION 'conninfo'
PUBLICATION publication_name [, ...]
[ WITH ( subscription_parameter [= value] [, ... ] ) ]
sync_initial_schema (enum) will be added as a subscription_parameter.
It can have 3 values:-
TABLES, ALL, NONE (default)
With ALL, everything will be synced, including global objects.

Restrictions :- sync_initial_schema=ALL can only be used with a publication created FOR ALL TABLES
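
For illustration, a subscription using the proposed option might be created like this
(sync_initial_schema is only proposed here; the exact spelling may change):

CREATE SUBSCRIPTION mysub
    CONNECTION 'host=pubhost dbname=postgres user=repuser'
    PUBLICATION mypub
    WITH (sync_initial_schema = 'TABLES');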

Design:-

Publisher :-
The publisher has to implement `SHOW CREATE TABLE_NAME`; this table definition will be used by
the subscriber to create the exact schema of the table on the subscriber side. One alternative
would be to do this on the subscriber side itself: we could create a function similar to
describeOneTableDetails and call it from the subscriber. We also need to maintain the same
ownership as on the publisher.
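
For example, the output of such a publisher-side interface could look roughly like this (purely
illustrative; neither a SHOW CREATE TABLE command nor such a function exists in PostgreSQL today):

SHOW CREATE TABLE public.t1;
-- CREATE TABLE public.t1 (
--     a integer NOT NULL,
--     b text,
--     CONSTRAINT t1_pkey PRIMARY KEY (a)
-- );
-- ALTER TABLE public.t1 OWNER TO some_owner;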

The publisher should also have publication of DDL commands turned on.

Subscriber :-

1. In CreateSubscription(), when we create the replication slot (walrcv_create_slot()), we should
use CRS_EXPORT_SNAPSHOT so that we can use this snapshot later in pg_dump.

2. Now we can call pg_dump with the above snapshot from CreateSubscription. This happens inside
the opts.connect && opts.create_slot if statement. If we fail in this step, we have to drop
the replication slot and create a new one, because we need a snapshot and creating a
replication slot is the way to get one. The reason for running pg_dump with the above
snapshot is that we don't want to execute the DDLs in the WAL twice. With the above snapshot we
get the state of the database as of the replication slot's origin, and any changes after
the snapshot will be in the WAL.

We will save the pg_dump output to a file (custom archive format). So the pg_dump call will be similar to
pg_dump <connection_string> --schema-only --snapshot=xyz -Fc --file initSchema

If sync_initial_schema=TABLES we don't have to call pg_dump/pg_restore at all. The TableSync process
will take care of it.

3. If we have to sync global objects, we need to call pg_dumpall --globals-only as well. But pg_dumpall
does not support the --snapshot option, so if the user creates a new global object between the creation
of the replication slot and the run of pg_dumpall, that global object will be created twice on the
subscriber (once from the pg_dumpall output and once via DDL replication), which will error out the
applier process.

4. walrcv_disconnect should be called only after pg_dump has finished; otherwise the snapshot will
not be valid.

5. Users with only the replication role cannot call pg_dump, so the replication user has to be a
superuser. This is a major problem.
postgres=# create role s4 WITH LOGIN REPLICATION;
CREATE ROLE
$ pg_dump postgres -s -U s4
pg_dump: error: query failed: ERROR: permission denied for table t1
pg_dump: detail: Query was: LOCK TABLE public.t1, public.t2 IN ACCESS SHARE MODE

6. The pg_subscription_rel column srsubstate will get one more state,
SUBREL_STATE_CREATE 'c'. If sync_initial_schema is enabled, we will set the table state to 'c'.
The above 6 steps will be done even if the subscription is not enabled, as long as connect is true.
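
To illustrate, the per-table state could then be inspected on the subscriber like this (the 'c'
state is the proposed addition; existing states such as 'i', 'd', 's' and 'r' stay unchanged):

SELECT srrelid::regclass AS relation, srsubstate
FROM pg_subscription_rel;
--  relation  | srsubstate
-- -----------+------------
--  public.t1 | c    <- proposed SUBREL_STATE_CREATE: schema still to be recreated by tablesync
--  public.t2 | r    <- ready, as today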

7. The leader applier process should check whether the initSchema file exists; if it does, it should
call pg_restore. We are not using the --pre-data and --post-data sections the way it is done in
pglogical, because post-data works on tables that already have data, but we will fill data into the
tables at a later stage. pg_restore can be called like this

pg_restore <connection_string> -1 file_name
The -1 option will execute every command inside one transaction. If there is any error,
everything will be rolled back.
pg_restore should be called quite early in the applier process code, before any tablesync
process can be created.
Instead of checking whether the file exists, maybe the pg_subscription table could be extended with a
column SyncInitialSchema, and the applier process would check SyncInitialSchema == SYNC_PENDING.

8. The TableSync process should check the state of the table; if it is SUBREL_STATE_CREATE, it should
get the latest definition from the publisher and recreate the table. (We have to recreate
the table even if there are no changes.) Then it should go into copy-table mode as usual, as
sketched below.

It might seem that TableSync is duplicating work already done by pg_restore. We are doing
it this way because of concurrent DDLs and the REFRESH PUBLICATION command.
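
A minimal sketch of what the tablesync worker would do for a table in state 'c' (table and
column names are just examples):

-- drop the definition created earlier by pg_restore
DROP TABLE IF EXISTS public.t1;
-- recreate it from the latest definition just fetched from the publisher
CREATE TABLE public.t1 (a integer, b integer, c integer);
-- then perform the usual initial copy
COPY public.t1 FROM STDIN;  -- fed from the publisher's COPY ... TO STDOUT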

Concurrent DDL :-
A user can execute a DDL command on table t1 at the same time the subscriber is trying to sync
it. Pictorial representation: https://imgur.com/a/ivrIEv8 [1]

The tablesync process makes a connection to the publisher and sees a table state that can be in
the future with respect to the subscriber's copy of the schema, which can introduce conflicts.
For example:-

CASE 1:- { The publisher removed column b from table t1 while the subscriber was doing pg_restore
(or at any point in the concurrent DDL window described in picture [1]). When the tableSync
process starts its transaction on the publisher, it will request the data of table t1
including column b, which no longer exists on the publisher. } This is why the tableSync process
asks for the latest definition.
If we say that we will delay the tableSync worker until all the DDL related to table t1 has been
applied by the applier process, we can still have a window where the publisher issues a DDL
command just before tableSync starts its transaction, therefore making the tableSync and
publisher table definitions incompatible (thanks to Masahiko for pointing out this race
condition).
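
As a concrete illustration of CASE 1 (an illustrative timeline only; the exact statements and
timings are made up):

-- publisher:
CREATE TABLE t1 (a int, b int);
-- subscriber: CREATE SUBSCRIPTION runs; pg_dump/pg_restore creates t1(a, b)
-- publisher, inside the concurrent DDL window:
ALTER TABLE t1 DROP COLUMN b;
-- subscriber: tableSync starts and requests columns a and b,
-- but column b no longer exists on the publisher -> the copy fails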

The applier process will skip all DDLs/DMLs related to table t1, and tableSync will apply those
in the catchup phase.
There is one open issue, though: what will happen to views or functions which depend on the table?
I think they should wait until table_state is > SUBREL_STATE_CREATE (meaning we have the latest
schema definition from the publisher).
There might be corner cases to this approach, or maybe a better way to handle concurrent DDL.
One simple solution might be to disallow DDLs on the publisher until all of the schema is
synced and all tables have state >= SUBREL_STATE_DATASYNC (we can have the CASE 1 issue
even with DDL replication, so we have to wait until all the tables have table_state >=
SUBREL_STATE_DATASYNC). That might be a big window for big databases.

Refresh publication :-
In refresh publication, the subscriber does create a new replication slot, hence we can't run
pg_dump with a snapshot which starts from the origin (maybe this is not an issue at all). In this case
it makes more sense for the tableSync worker to do the schema sync.

If the community is happy with the above design, I can start working on a prototype.

Credits :- This design is inspired by pglogical. Also thanks to Zane, Masahiko, and Amit for reviewing earlier designs.

Regards
Sachin Kumar
Amazon Web Services

#2Peter Smith
smithpb2250@gmail.com
In reply to: Kumar, Sachin (#1)
Re: Initial Schema Sync for Logical Replication

Hi,

I have a couple of questions.

Q1.

What happens if the subscriber already has some tables present? For
example, I did not see the post saying anything like "Only if the
table does not already exist then it will be created".

On the contrary, the post seemed to say SUBREL_STATE_CREATE 'c' would
*always* be set when this subscriber mode is enabled. And then it
seemed to say the table would *always* get re-created by the tablesync
in this new mode.

Won't this cause problems
- if the user wanted a slightly different subscriber-side table? (eg
some extra columns on the subscriber-side table)
- if there was some pre-existing table data on the subscriber-side
table that you now are about to re-create and clobber?

Or does the idea intend that the CREATE TABLE DDL that will be
executed is like "CREATE TABLE ... IF NOT EXISTS"?

~~~

Q2.

The post says. "DDL replication is required for this feature". And "It
should also have turned on publication of DDL commands."

It wasn't entirely clear to me why those must be a requirement. Is
that just to make implementation easier?

Sure, I see that the idea might have some (or maybe a lot?) of common
internal code with the table DDL replication work, but OTOH an
auto-create feature for subscriber tables seems like it might be a
useful feature to have regardless of the value of the publication
'ddl' parameter.

------
Kind Regards,
Peter Smith.
Fujitsu Australia.

#3Kumar, Sachin
ssetiya@amazon.com
In reply to: Peter Smith (#2)
RE: Initial Schema Sync for Logical Replication

Hi Peter,

Hi,

I have a couple of questions.

Q1.

What happens if the subscriber already has some tables present? For
example, I did not see the post saying anything like "Only if the table does
not already exist then it will be created".

My assumption was that if the subscriber is doing initial schema sync, it does not have
any conflicting database objects.

On the contrary, the post seemed to say SUBREL_STATE_CREATE 'c' would
*always* be set when this subscriber mode is enabled. And then it seemed
to say the table would *always* get re-created by the tablesync in this new
mode.

Right

Won't this cause problems
- if the user wanted a slightly different subscriber-side table? (eg some extra
columns on the subscriber-side table)
- if there was some pre-existing table data on the subscriber-side table that
you now are about to re-create and clobber?

Or does the idea intend that the CREATE TABLE DDL that will be executed is
like "CREATE TABLE ... IF NOT EXISTS"?

pg_dump does not support --if-not-exists, but I think it could be added so that we get a
dump with IF NOT EXISTS.
On the subscriber side we get the table OID list; we can use this to change table_state
to SUBREL_STATE_INIT so that the table won't be recreated.

~~~

Q2.

The post says. "DDL replication is required for this feature". And "It should
also have turned on publication of DDL commands."

It wasn't entirely clear to me why those must be a requirement. Is that just to
make implementation easier?

DDL replication is needed to handle concurrent DDL, so that we don't have to
worry about schema changes happening while the subscriber is initializing.
If we can block the publisher so that it does not do DDLs, or if the subscriber can simply
error out when it sees conflicting table information, then we don't need to use DDL
replication.
Regards
Sachin

Sure, I see that the idea might have some (or maybe a lot?) of common
internal code with the table DDL replication work, but OTOH an auto-create
feature for subscriber tables seems like it might be a useful feature to have
regardless of the value of the publication 'ddl' parameter.

------
Kind Regards,
Peter Smith.
Fujitsu Australia.

#4Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Kumar, Sachin (#1)
Re: Initial Schema Sync for Logical Replication

On 2023-Mar-15, Kumar, Sachin wrote:

1. In CreateSubscription() when we create replication slot(walrcv_create_slot()), should
use CRS_EXPORT_SNAPSHOT, So that we can use this snapshot later in the pg_dump.

2. Now we can call pg_dump with above snapshot from CreateSubscription.

Overall I'm not on board with the idea that logical replication would
depend on pg_dump; that seems like it could run into all sorts of
trouble (what if calling external binaries requires additional security
setup? what about pg_hba connection requirements? what about
max_connections in tight circumstances?).

It would be much better, I think, to handle this internally in the
publisher instead: similar to how DDL sync would work, except it'd
somehow generate the CREATE statements from the existing tables instead
of waiting for DDL events to occur. I grant that this does require
writing a bunch of new code for each object type, a lot of which would
duplicate the pg_dump logic, but it would probably be a lot more robust.

--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/

#5Amit Kapila
amit.kapila16@gmail.com
In reply to: Kumar, Sachin (#3)
Re: Initial Schema Sync for Logical Replication

On Thu, Mar 16, 2023 at 10:27 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

Hi,

I have a couple of questions.

Q1.

What happens if the subscriber already has some tables present? For
example, I did not see the post saying anything like "Only if the table does
not already exist then it will be created".

My assumption was the if subscriber is doing initial schema sync , It does not have
any conflicting database objects.

Can't we simply error out in such a case with "obj already exists"?
This would be similar to how we deal with conflicting rows with
unique/primary keys.

--
With Regards,
Amit Kapila.

#6Kumar, Sachin
ssetiya@amazon.com
In reply to: Amit Kapila (#5)
RE: Initial Schema Sync for Logical Replication

Hi Amit,

From: Amit Kapila <amit.kapila16@gmail.com>

Hi,

I have a couple of questions.

Q1.

What happens if the subscriber already has some tables present? For
example, I did not see the post saying anything like "Only if the
table does not already exist then it will be created".

My assumption was the if subscriber is doing initial schema sync , It
does not have any conflicting database objects.

Can't we simply error out in such a case with "obj already exists"?
This would be similar to how we deal with conflicting rows with unique/primary
keys.

Right, this is the default behaviour. We will run pg_restore with --single-transaction,
so if we get an error while executing a CREATE TABLE, the whole pg_restore will fail and
the user will be notified.
Regards
Sachin

#7Kumar, Sachin
ssetiya@amazon.com
In reply to: Alvaro Herrera (#4)
RE: Initial Schema Sync for Logical Replication

Hi Alvaro,

From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Subject: RE: [EXTERNAL]Initial Schema Sync for Logical Replication
On 2023-Mar-15, Kumar, Sachin wrote:

1. In CreateSubscription() when we create replication
slot(walrcv_create_slot()), should use CRS_EXPORT_SNAPSHOT, So that we

can use this snapshot later in the pg_dump.

2. Now we can call pg_dump with above snapshot from CreateSubscription.

Overall I'm not on board with the idea that logical replication would depend on
pg_dump; that seems like it could run into all sorts of trouble (what if calling
external binaries requires additional security setup? what about pg_hba
connection requirements? what about max_connections in tight
circumstances?).
what if calling external binaries requires additional security setup

I am not sure what kind of security restriction would apply in this case; maybe the pg_dump
binary can be changed?

what about pg_hba connection requirements?

We will use the same connection string that the subscriber process uses to connect to
the publisher.

what about max_connections in tight circumstances?

Right, that might be an issue, but I don't think it will be a big one. We will create the dump
of the database in the CreateSubscription() function itself, so if we have already reached
max_connections when pg_dump is called, we fail before the tableSync process even starts; tableSync
would not have been successful either.

It would be much better, I think, to handle this internally in the publisher instead:
similar to how DDL sync would work, except it'd somehow generate the CREATE
statements from the existing tables instead of waiting for DDL events to occur. I
grant that this does require writing a bunch of new code for each object type, a
lot of which would duplicate the pg_dump logic, but it would probably be a lot
more robust.

Agreed, but we might have a lot of code duplication: essentially almost all of the pg_dump
code would need to be duplicated, which might cause issues when modifying/adding new
DDLs.
I am not sure, but maybe it is possible to move the code pg_dump depends on into the common/
folder to avoid duplication.

Regards
Sachin

#8Euler Taveira
euler@eulerto.com
In reply to: Kumar, Sachin (#7)
Re: Initial Schema Sync for Logical Replication

On Mon, Mar 20, 2023, at 10:10 PM, Kumar, Sachin wrote:

From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Subject: RE: [EXTERNAL]Initial Schema Sync for Logical Replication
On 2023-Mar-15, Kumar, Sachin wrote:

1. In CreateSubscription() when we create replication
slot(walrcv_create_slot()), should use CRS_EXPORT_SNAPSHOT, So that we

can use this snapshot later in the pg_dump.

2. Now we can call pg_dump with above snapshot from CreateSubscription.

Overall I'm not on board with the idea that logical replication would depend on
pg_dump; that seems like it could run into all sorts of trouble (what if calling
external binaries requires additional security setup? what about pg_hba
connection requirements? what about max_connections in tight
circumstances?).
what if calling external binaries requires additional security setup

I am not sure what kind of security restriction would apply in this case, maybe pg_dump
binary can be changed ?

Using pg_dump as part of this implementation is not acceptable because we
expect the backend to be decoupled from the client. Besides that, pg_dump
provides all table dependencies (such as tablespaces, privileges, security
labels, comments), and not all of those dependencies should be replicated. You would have to
exclude them by removing these objects from the TOC before running pg_restore, or by
adding a few pg_dump options to exclude these objects. Another issue is related
to different versions. Let's say the publisher has a version ahead of the
subscriber version; new table syntax can easily break your logical
replication setup. IMO pg_dump doesn't seem like a good solution for initial
synchronization.

Instead, the backend should provide infrastructure to obtain the required DDL
commands for the specific (set of) tables. This can work around the issues from
the previous paragraph:

* you can selectively choose dependencies;
* don't require additional client packages;
* don't need to worry about different versions.

This infrastructure can also be useful for other use cases such as:

* client tools that provide create commands (such as psql, pgAdmin);
* other logical replication solutions;
* other logical backup solutions.
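
For instance (purely hypothetical, no such function exists today), the backend could expose
something along the lines of:

SELECT pg_get_tabledef('public.t1'::regclass);
-- would return the complete CREATE TABLE command for public.t1,
-- analogous to the existing pg_get_viewdef()/pg_get_functiondef() functions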

--
Euler Taveira
EDB https://www.enterprisedb.com/

#9Amit Kapila
amit.kapila16@gmail.com
In reply to: Euler Taveira (#8)
Re: Initial Schema Sync for Logical Replication

On Tue, Mar 21, 2023 at 7:32 AM Euler Taveira <euler@eulerto.com> wrote:

On Mon, Mar 20, 2023, at 10:10 PM, Kumar, Sachin wrote:

From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Subject: RE: [EXTERNAL]Initial Schema Sync for Logical Replication
On 2023-Mar-15, Kumar, Sachin wrote:

1. In CreateSubscription() when we create replication
slot(walrcv_create_slot()), should use CRS_EXPORT_SNAPSHOT, So that we

can use this snapshot later in the pg_dump.

2. Now we can call pg_dump with above snapshot from CreateSubscription.

Overall I'm not on board with the idea that logical replication would depend on
pg_dump; that seems like it could run into all sorts of trouble (what if calling
external binaries requires additional security setup? what about pg_hba
connection requirements? what about max_connections in tight
circumstances?).
what if calling external binaries requires additional security setup

I am not sure what kind of security restriction would apply in this case, maybe pg_dump
binary can be changed ?

Using pg_dump as part of this implementation is not acceptable because we
expect the backend to be decoupled from the client. Besides that, pg_dump
provides all table dependencies (such as tablespaces, privileges, security
labels, comments); not all dependencies shouldn't be replicated.

I agree that in the initial version we may not support sync of all
objects, but why shouldn't that be possible in later versions?

You should
exclude them removing these objects from the TOC before running pg_restore or
adding a few pg_dump options to exclude these objects. Another issue is related
to different version. Let's say the publisher has a version ahead of the
subscriber version, a new table syntax can easily break your logical
replication setup. IMO pg_dump doesn't seem like a good solution for initial
synchronization.

Instead, the backend should provide infrastructure to obtain the required DDL
commands for the specific (set of) tables. This can work around the issues from
the previous paragraph:

...

* don't need to worry about different versions.

AFAICU some of the reasons why pg_dump is not allowed to dump from the
newer version are as follows: (a) there could be more columns in the
newer version of the system catalog and then Select * type of stuff
won't work because the client won't have knowledge of additional
columns. (b) the newer version could have new features (represented by
say new columns in existing catalogs or new catalogs) that the older
version of pg_dump has no knowledge of, so it will fail to get that data,
resulting in an inconsistent dump. The subscriber can easily end up out of
sync due to that.

Now, how do we avoid these problems even if we have our own version of
functionality similar to pg_dump for selected objects? I guess we will
face similar problems. If so, we may need to deny schema sync in any
such case.

--
With Regards,
Amit Kapila.

#10Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#9)
Re: Initial Schema Sync for Logical Replication

On Tue, Mar 21, 2023 at 8:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 21, 2023 at 7:32 AM Euler Taveira <euler@eulerto.com> wrote:

On Mon, Mar 20, 2023, at 10:10 PM, Kumar, Sachin wrote:

From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Subject: RE: [EXTERNAL]Initial Schema Sync for Logical Replication
On 2023-Mar-15, Kumar, Sachin wrote:

1. In CreateSubscription() when we create replication
slot(walrcv_create_slot()), should use CRS_EXPORT_SNAPSHOT, So that we

can use this snapshot later in the pg_dump.

2. Now we can call pg_dump with above snapshot from CreateSubscription.

Overall I'm not on board with the idea that logical replication would depend on
pg_dump; that seems like it could run into all sorts of trouble (what if calling
external binaries requires additional security setup? what about pg_hba
connection requirements? what about max_connections in tight
circumstances?).
what if calling external binaries requires additional security setup

I am not sure what kind of security restriction would apply in this case, maybe pg_dump
binary can be changed ?

Using pg_dump as part of this implementation is not acceptable because we
expect the backend to be decoupled from the client. Besides that, pg_dump
provides all table dependencies (such as tablespaces, privileges, security
labels, comments); not all dependencies shouldn't be replicated.

I agree that in the initial version we may not support sync of all
objects but why that shouldn't be possible in the later versions?

You should
exclude them removing these objects from the TOC before running pg_restore or
adding a few pg_dump options to exclude these objects. Another issue is related
to different version. Let's say the publisher has a version ahead of the
subscriber version, a new table syntax can easily break your logical
replication setup. IMO pg_dump doesn't seem like a good solution for initial
synchronization.

Instead, the backend should provide infrastructure to obtain the required DDL
commands for the specific (set of) tables. This can work around the issues from
the previous paragraph:

...

* don't need to worry about different versions.

AFAICU some of the reasons why pg_dump is not allowed to dump from the
newer version are as follows: (a) there could be more columns in the
newer version of the system catalog and then Select * type of stuff
won't work because the client won't have knowledge of additional
columns. (b) the newer version could have new features (represented by
say new columns in existing catalogs or new catalogs) that the older
version of pg_dump has no knowledge of and will fail to get that data
and hence an inconsistent dump. The subscriber will easily be not in
sync due to that.

Now, how do we avoid these problems even if we have our own version of
functionality similar to pg_dump for selected objects? I guess we will
face similar problems.

Right. I think that such functionality needs to return DDL commands
that can be executed on the requested version.

If so, we may need to deny schema sync in any such case.

Yes. Do we have any concrete use case where the subscriber is an older
version, in the first place?

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#11Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#10)
Re: Initial Schema Sync for Logical Replication

On Wed, Mar 22, 2023 at 8:29 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Tue, Mar 21, 2023 at 8:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 21, 2023 at 7:32 AM Euler Taveira <euler@eulerto.com> wrote:

You should
exclude them removing these objects from the TOC before running pg_restore or
adding a few pg_dump options to exclude these objects. Another issue is related
to different version. Let's say the publisher has a version ahead of the
subscriber version, a new table syntax can easily break your logical
replication setup. IMO pg_dump doesn't seem like a good solution for initial
synchronization.

Instead, the backend should provide infrastructure to obtain the required DDL
commands for the specific (set of) tables. This can work around the issues from
the previous paragraph:

...

* don't need to worry about different versions.

AFAICU some of the reasons why pg_dump is not allowed to dump from the
newer version are as follows: (a) there could be more columns in the
newer version of the system catalog and then Select * type of stuff
won't work because the client won't have knowledge of additional
columns. (b) the newer version could have new features (represented by
say new columns in existing catalogs or new catalogs) that the older
version of pg_dump has no knowledge of and will fail to get that data
and hence an inconsistent dump. The subscriber will easily be not in
sync due to that.

Now, how do we avoid these problems even if we have our own version of
functionality similar to pg_dump for selected objects? I guess we will
face similar problems.

Right. I think that such functionality needs to return DDL commands
that can be executed on the requested version.

If so, we may need to deny schema sync in any such case.

Yes. Do we have any concrete use case where the subscriber is an older
version, in the first place?

As per my understanding, it is mostly due to the reason that it can
work today. Today, during an off-list discussion with Jonathan on this
point, he pointed me to a similar incompatibility in MySQL
replication. See the "SQL incompatibilities" section in doc[1]https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html. Also,
please note that this applies not only to initial sync but also to
schema sync during replication. I don't think it would be feasible to
keep such cross-version compatibility for DDL replication.

Having said above, I don't intend that we must use pg_dump from the
subscriber for the purpose of initial sync. I think the idea at this
stage is to primarily write a POC patch to see what difficulties we
may face. The other options that we could try out are (a) try to
duplicate parts of pg_dump code in some way (by extracting required
code) for the subscription's initial sync, or (b) have a common code
(probably as a library or some other way) for the required
functionality. There could be more possibilities that we may not have
thought of yet. But the main point is that for approaches other than
using pg_dump, we should consider ways to avoid duplicity of various
parts of its code. Due to this, I think before ruling out using
pg_dump, we should be clear about its risks and limitations.

Thoughts?

[1]: https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html
[2]: /messages/by-id/CAAD30U+pVmfKwUKy8cbZOnUXyguJ-uBNejwD75Kyo=OjdQGJ9g@mail.gmail.com

--
With Regards,
Amit Kapila.

#12houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#11)
RE: Initial Schema Sync for Logical Replication

On Wednesday, March 22, 2023 1:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 22, 2023 at 8:29 AM Masahiko Sawada
<sawada.mshk@gmail.com> wrote:

On Tue, Mar 21, 2023 at 8:18 PM Amit Kapila <amit.kapila16@gmail.com>

wrote:

On Tue, Mar 21, 2023 at 7:32 AM Euler Taveira <euler@eulerto.com> wrote:

You should
exclude them removing these objects from the TOC before running
pg_restore or adding a few pg_dump options to exclude these
objects. Another issue is related to different version. Let's say
the publisher has a version ahead of the subscriber version, a new
table syntax can easily break your logical replication setup. IMO
pg_dump doesn't seem like a good solution for initial synchronization.

Instead, the backend should provide infrastructure to obtain the
required DDL commands for the specific (set of) tables. This can
work around the issues from the previous paragraph:

...

* don't need to worry about different versions.

AFAICU some of the reasons why pg_dump is not allowed to dump from
the newer version are as follows: (a) there could be more columns in
the newer version of the system catalog and then Select * type of
stuff won't work because the client won't have knowledge of
additional columns. (b) the newer version could have new features
(represented by say new columns in existing catalogs or new
catalogs) that the older version of pg_dump has no knowledge of and
will fail to get that data and hence an inconsistent dump. The
subscriber will easily be not in sync due to that.

Now, how do we avoid these problems even if we have our own version
of functionality similar to pg_dump for selected objects? I guess we
will face similar problems.

Right. I think that such functionality needs to return DDL commands
that can be executed on the requested version.

If so, we may need to deny schema sync in any such case.

Yes. Do we have any concrete use case where the subscriber is an older
version, in the first place?

As per my understanding, it is mostly due to the reason that it can work today.
Today, during an off-list discussion with Jonathan on this point, he pointed me
to a similar incompatibility in MySQL replication. See the "SQL
incompatibilities" section in doc[1]. Also, please note that this applies not only
to initial sync but also to schema sync during replication. I don't think it would
be feasible to keep such cross-version compatibility for DDL replication.

Having said above, I don't intend that we must use pg_dump from the
subscriber for the purpose of initial sync. I think the idea at this stage is to
primarily write a POC patch to see what difficulties we may face. The other
options that we could try out are (a) try to duplicate parts of pg_dump code in
some way (by extracting required
code) for the subscription's initial sync, or (b) have a common code (probably
as a library or some other way) for the required functionality. There could be
more possibilities that we may not have thought of yet. But the main point is
that for approaches other than using pg_dump, we should consider ways to
avoid duplicity of various parts of its code. Due to this, I think before ruling out
using pg_dump, we should be clear about its risks and limitations.

I thought about some possible problems with the design of using pg_dump.

1) According to the design, it will internally call pg_dump when creating the
subscription, but calling pg_dump requires a powerful user.
Currently, that may not be a problem because CREATE SUBSCRIPTION also requires
superuser. But people have recently discussed allowing non-superusers to
create subscriptions [1]; if that is accepted, then it seems not great to
internally use a superuser to call pg_dump while the user creating the
subscription is a non-superuser.

2) I think it's possible that some cloud DB services don't allow users to use
the client commands (pg_dump, ...) directly, and the user that logs in to the
database may not have the permission to execute the client commands.
[1]: /messages/by-id/20230308194743.23rmgjgwahh4i4rg@awork3.anarazel.de

Best Regards,
Hou zj

#13Kumar, Sachin
ssetiya@amazon.com
In reply to: Amit Kapila (#11)
RE: Initial Schema Sync for Logical Replication

From: Amit Kapila <amit.kapila16@gmail.com>
Sent: Wednesday, March 22, 2023 5:16 AM
To: Masahiko Sawada <sawada.mshk@gmail.com>
Cc: Euler Taveira <euler@eulerto.com>; Kumar, Sachin
<ssetiya@amazon.com>; Alvaro Herrera <alvherre@alvh.no-ip.org>; pgsql-
hackers@lists.postgresql.org; Jonathan S. Katz <jkatz@postgresql.org>
Subject: RE: [EXTERNAL]Initial Schema Sync for Logical Replication

On Wed, Mar 22, 2023 at 8:29 AM Masahiko Sawada
<sawada.mshk@gmail.com> wrote:

On Tue, Mar 21, 2023 at 8:18 PM Amit Kapila <amit.kapila16@gmail.com>

wrote:

On Tue, Mar 21, 2023 at 7:32 AM Euler Taveira <euler@eulerto.com>

wrote:

You should
exclude them removing these objects from the TOC before running
pg_restore or adding a few pg_dump options to exclude these
objects. Another issue is related to different version. Let's say
the publisher has a version ahead of the subscriber version, a new
table syntax can easily break your logical replication setup. IMO
pg_dump doesn't seem like a good solution for initial synchronization.

Instead, the backend should provide infrastructure to obtain the
required DDL commands for the specific (set of) tables. This can
work around the issues from the previous paragraph:

...

* don't need to worry about different versions.

AFAICU some of the reasons why pg_dump is not allowed to dump from
the newer version are as follows: (a) there could be more columns in
the newer version of the system catalog and then Select * type of
stuff won't work because the client won't have knowledge of
additional columns. (b) the newer version could have new features
(represented by say new columns in existing catalogs or new
catalogs) that the older version of pg_dump has no knowledge of and
will fail to get that data and hence an inconsistent dump. The
subscriber will easily be not in sync due to that.

Now, how do we avoid these problems even if we have our own version
of functionality similar to pg_dump for selected objects? I guess we
will face similar problems.

Right. I think that such functionality needs to return DDL commands
that can be executed on the requested version.

If so, we may need to deny schema sync in any such case.

Yes. Do we have any concrete use case where the subscriber is an older
version, in the first place?

As per my understanding, it is mostly due to the reason that it can work
today. Today, during an off-list discussion with Jonathan on this point, he
pointed me to a similar incompatibility in MySQL replication. See the "SQL
incompatibilities" section in doc[1]. Also, please note that this applies not
only to initial sync but also to schema sync during replication. I don't think it
would be feasible to keep such cross-version compatibility for DDL
replication.

Having said above, I don't intend that we must use pg_dump from the
subscriber for the purpose of initial sync. I think the idea at this stage is to
primarily write a POC patch to see what difficulties we may face. The other
options that we could try out are (a) try to duplicate parts of pg_dump code
in some way (by extracting required
code) for the subscription's initial sync, or (b) have a common code (probably
as a library or some other way) for the required functionality. There could be
more possibilities that we may not have thought of yet. But the main point is
that for approaches other than using pg_dump, we should consider ways to
avoid duplicity of various parts of its code. Due to this, I think before ruling
out using pg_dump, we should be clear about its risks and limitations.

Thoughts?

There is one more thing which needs to be considered even if we use pg_dump/pg_restore:
we still need a way to get the CREATE TABLE statement for individual tables if we want to support
concurrent DDLs on the publisher.

8. TableSync process should check the state of table , if it is SUBREL_STATE_CREATE it should
get the latest definition from the publisher and recreate the table. (We have to recreate
the table even if there are no changes). Then it should go into copy table mode as usual.

Unless there is a different way to support concurrent DDLs, or we go for blocking the publisher
till the initial sync is completed.
Regards
Sachin

[1] - https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html
[2] - https://www.postgresql.org/message-id/CAAD30U%2BpVmfKwUKy8cbZOnUXyguJ-uBNejwD75Kyo%3DOjdQGJ9g%40mail.gmail.com

--
With Regards,
Amit Kapila.

#14Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#11)
Re: Initial Schema Sync for Logical Replication

On Wed, Mar 22, 2023 at 2:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 22, 2023 at 8:29 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Tue, Mar 21, 2023 at 8:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Mar 21, 2023 at 7:32 AM Euler Taveira <euler@eulerto.com> wrote:

You should
exclude them removing these objects from the TOC before running pg_restore or
adding a few pg_dump options to exclude these objects. Another issue is related
to different version. Let's say the publisher has a version ahead of the
subscriber version, a new table syntax can easily break your logical
replication setup. IMO pg_dump doesn't seem like a good solution for initial
synchronization.

Instead, the backend should provide infrastructure to obtain the required DDL
commands for the specific (set of) tables. This can work around the issues from
the previous paragraph:

...

* don't need to worry about different versions.

AFAICU some of the reasons why pg_dump is not allowed to dump from the
newer version are as follows: (a) there could be more columns in the
newer version of the system catalog and then Select * type of stuff
won't work because the client won't have knowledge of additional
columns. (b) the newer version could have new features (represented by
say new columns in existing catalogs or new catalogs) that the older
version of pg_dump has no knowledge of and will fail to get that data
and hence an inconsistent dump. The subscriber will easily be not in
sync due to that.

Now, how do we avoid these problems even if we have our own version of
functionality similar to pg_dump for selected objects? I guess we will
face similar problems.

Right. I think that such functionality needs to return DDL commands
that can be executed on the requested version.

If so, we may need to deny schema sync in any such case.

Yes. Do we have any concrete use case where the subscriber is an older
version, in the first place?

As per my understanding, it is mostly due to the reason that it can
work today. Today, during an off-list discussion with Jonathan on this
point, he pointed me to a similar incompatibility in MySQL
replication. See the "SQL incompatibilities" section in doc[1]. Also,
please note that this applies not only to initial sync but also to
schema sync during replication. I don't think it would be feasible to
keep such cross-version compatibility for DDL replication.

Makes sense to me.

Having said above, I don't intend that we must use pg_dump from the
subscriber for the purpose of initial sync. I think the idea at this
stage is to primarily write a POC patch to see what difficulties we
may face. The other options that we could try out are (a) try to
duplicate parts of pg_dump code in some way (by extracting required
code) for the subscription's initial sync, or (b) have a common code
(probably as a library or some other way) for the required
functionality. There could be more possibilities that we may not have
thought of yet. But the main point is that for approaches other than
using pg_dump, we should consider ways to avoid duplicity of various
parts of its code. Due to this, I think before ruling out using
pg_dump, we should be clear about its risks and limitations.

Thoughts?

Agreed. My biggest concern about approaches other than using pg_dump
is the same: the code duplication could increase the maintenance
costs. We should clarify on what points using pg_dump is not a good
idea, and also analyze alternative ideas in depth.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#15Zheng Li
zhengli10@gmail.com
In reply to: Amit Kapila (#11)
Re: Initial Schema Sync for Logical Replication

Yes. Do we have any concrete use case where the subscriber is an older
version, in the first place?

As per my understanding, it is mostly due to the reason that it can
work today. Today, during an off-list discussion with Jonathan on this
point, he pointed me to a similar incompatibility in MySQL
replication. See the "SQL incompatibilities" section in doc[1]. Also,
please note that this applies not only to initial sync but also to
schema sync during replication. I don't think it would be feasible to
keep such cross-version compatibility for DDL replication.

I think it's possible to make DDL replication cross-version
compatible, by making the DDL deparser version-aware: the deparsed
JSON blob can have a PG version in it, and the destination server can
process the versioned JSON blob by transforming anything incompatible
according to the original version and its own version.

Regards,
Zane

#16Euler Taveira
euler@eulerto.com
In reply to: Amit Kapila (#9)
Re: Initial Schema Sync for Logical Replication

On Tue, Mar 21, 2023, at 8:18 AM, Amit Kapila wrote:

Now, how do we avoid these problems even if we have our own version of
functionality similar to pg_dump for selected objects? I guess we will
face similar problems. If so, we may need to deny schema sync in any
such case.

There are 2 approaches for initial DDL synchronization:

1) generate the DDL command on the publisher, stream it and apply it as-is on
the subscriber;
2) generate a DDL representation (JSON, for example) on the publisher, stream
it, transform it into a DDL command on the subscriber, and apply it.

Option (1) is simpler and faster than option (2) because it does not
require an additional step (transformation). However, option (2) is more
flexible than option (1) because it allows you to create a DDL command even if a
feature was removed from the subscriber and the publisher version is less than
the subscriber version, or a feature was added to the publisher and the
publisher version is greater than the subscriber version. Of course there are
exceptions, and in those cases it should forbid the transformation (this can be
controlled by the protocol version -- LOGICALREP_PROTO_FOOBAR_VERSION_NUM). A
decision must be made: simple/restricted vs complex/flexible.

One of the main use cases for logical replication is migration (X -> Y where X
< Y). Postgres generally does not remove features but it might happen (such as
WITH OIDS syntax) and it would break the DDL replication (option 1). In the
downgrade case (X -> Y where X > Y), it might break the DDL replication if a
new syntax is introduced in X. Having said that, IMO option (1) is fragile if
we want to support DDL replication between different Postgres versions. It
might eventually work but there is no guarantee.

Per discussion [1], I think if we agree that Alvaro's DDL deparse patch is
the way to go for DDL replication, it seems wise that it should be used for
initial DDL synchronization as well.

[1]: /messages/by-id/CAA4eK1+w_dFytBiv3RxbOL76_noMzmX0QGTc8uS=bc2WaPVoow@mail.gmail.com

--
Euler Taveira
EDB https://www.enterprisedb.com/

#17Amit Kapila
amit.kapila16@gmail.com
In reply to: Kumar, Sachin (#1)
Re: Initial Schema Sync for Logical Replication

On Wed, Mar 15, 2023 at 11:12 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

Concurrent DDL :-

User can execute a DDL command to table t1 at the same time when subscriber is trying to sync
it. pictorial representation https://imgur.com/a/ivrIEv8 [1]

In tablesync process, it makes a connection to the publisher and it sees the
table state which can be in future wrt to the publisher, which can introduce conflicts.
For example:-

CASE 1:- { Publisher removed the column b from the table t1 when subscriber was doing pg_restore
(or any point in concurrent DDL window described in picture [1] ), when tableSync
process will start transaction on the publisher it will see request data of table t1
including column b, which does not exist on the publisher.} So that is why tableSync process
asks for the latest definition.

If we say that we will delay tableSync worker till all the DDL related to table t1 is
applied by the applier process , we can still have a window when publisher issues a DDL
command just before tableSync starts its transaction, and therefore making tableSync and
publisher table definition incompatible (Thanks to Masahiko for pointing out this race
condition).

IIUC, this is possible only if the tablesync process uses a snapshot
different from the snapshot we have used to perform the initial schema
sync; otherwise, this shouldn't be a problem. Let me try to explain my
understanding with an example (the LSNs used are just to explain the
problem):

1. Create Table t1(c1, c2); --LSN: 90
2. Insert t1 (1, 1); --LSN 100
3. Insert t1 (2, 2); --LSN 110
4. Alter t1 Add Column c3; --LSN 120
5. Insert t1 (3, 3, 3); --LSN 130

Now, say before starting the tablesync worker, the apply process performs the
initial schema sync and uses a snapshot corresponding to LSN 100. Then
it starts the tablesync process to allow the initial copy of data in t1.
Here, if the tablesync process tries to establish a new snapshot, it
may get data up to LSN 130, and when it tries to copy that into the
subscriber it will fail. Is my understanding correct about the problem
you described? If so, can't we allow the tablesync process to use the same
exported snapshot as we used for the initial schema sync, and won't
that solve the problem you described?
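
For reference, reusing an exported snapshot would look roughly like this (a sketch only; the
snapshot name is just an example):

-- walrcv_create_slot() runs, on the publisher, something equivalent to
-- CREATE_REPLICATION_SLOT "sub_slot" LOGICAL pgoutput (SNAPSHOT 'export'),
-- which returns a snapshot name such as '00000003-0000001B-1'.
-- The tablesync connection could then import that very snapshot:
BEGIN ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
-- COPY the table data here; it is consistent with the schema dump
COMMIT;
-- Note: the snapshot can only be imported while the exporting session still holds it open.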

Applier process will skip all DDL/DMLs related to the table t1 and tableSync will apply those
in Catchup phase.
Although there is one issue what will happen to views/ or functions which depend on the table
. I think they should wait till table_state is > SUBREL_STATE_CREATE (means we have the latest
schema definition from the publisher).
There might be corner cases to this approach or maybe a better way to handle concurrent DDL
One simple solution might be to disallow DDLs on the publisher till all the schema is
synced and all tables have state >= SUBREL_STATE_DATASYNC (We can have CASE 1: issue ,
even with DDL replication, so we have to wait till all the tables have table_state
SUBREL_STATE_DATASYNC). Which might be a big window for big databases.

Refresh publication :-

In refresh publication, subscriber does create a new replication slot hence , we can’t run
pg_dump with a snapshot which starts from origin(maybe this is not an issue at all). In this case
it makes more sense for tableSync worker to do schema sync.

Can you please explain this problem with some examples?

--
With Regards,
Amit Kapila.

#18Amit Kapila
amit.kapila16@gmail.com
In reply to: Euler Taveira (#16)
Re: Initial Schema Sync for Logical Replication

On Thu, Mar 23, 2023 at 2:48 AM Euler Taveira <euler@eulerto.com> wrote:

On Tue, Mar 21, 2023, at 8:18 AM, Amit Kapila wrote:

Now, how do we avoid these problems even if we have our own version of
functionality similar to pg_dump for selected objects? I guess we will
face similar problems. If so, we may need to deny schema sync in any
such case.

There are 2 approaches for initial DDL synchronization:

1) generate the DDL command on the publisher, stream it and apply it as-is on
the subscriber;
2) generate a DDL representation (JSON, for example) on the publisher, stream
it, transform it into a DDL command on subscriber and apply it.

The option (1) is simpler and faster than option (2) because it does not
require an additional step (transformation). However, option (2) is more
flexible than option (1) because it allow you to create a DDL command even if a
feature was removed from the subscriber and the publisher version is less than
the subscriber version or a feature was added to the publisher and the
publisher version is greater than the subscriber version.

Is this practically possible? Say the publisher has a higher version
that has introduced a new object type corresponding to which it has
either a new catalog or some new columns in the existing catalog. Now,
I don't think the older version of the subscriber can modify the
command received from the publisher so that the same can be applied to
the subscriber because it won't have any knowledge of the new feature.
In the other case, where the subscriber is of a newer version, we
should anyway be able to support it with pg_dump, as there doesn't
appear to be any restriction with that. Am I missing something?

One of the main use cases for logical replication is migration (X -> Y where X
< Y).

I don't think we need to restrict this case even if we decide to use pg_dump.

Per discussion [1], I think if we agree that the Alvaro's DDL deparse patch is
the way to go with DDL replication, it seems wise that it should be used for
initial DDL synchronization as well.

Even if we decide to use the deparse approach, it would still need to
mimic stuff from pg_dump to construct commands based on only catalog
contents. I am not against using this approach but we shouldn't ignore
the duplicity required in this approach.

--
With Regards,
Amit Kapila.

#19Kumar, Sachin
ssetiya@amazon.com
In reply to: Amit Kapila (#17)
RE: Initial Schema Sync for Logical Replication

From: Amit Kapila <amit.kapila16@gmail.com>
IIUC, this is possible only if tablesync process uses a snapshot different than the
snapshot we have used to perform the initial schema sync, otherwise, this
shouldn't be a problem. Let me try to explain my understanding with an example
(the LSNs used are just explain the
problem):

1. Create Table t1(c1, c2); --LSN: 90
2. Insert t1 (1, 1); --LSN 100
3. Insert t1 (2, 2); --LSN 110
4. Alter t1 Add Column c3; --LSN 120
5. Insert t1 (3, 3, 3); --LSN 130

Now, say before starting tablesync worker, apply process performs initial
schema sync and uses a snapshot corresponding to LSN 100. Then it starts
tablesync process to allow the initial copy of data in t1.
Here, if the table sync process tries to establish a new snapshot, it may get data
till LSN 130 and when it will try to copy the same in subscriber it will fail. Is my
understanding correct about the problem you described?

Right

If so, can't we allow
tablesync process to use the same exported snapshot as we used for the initial
schema sync and won't that solve the problem you described?

I think we won't be able to use the same snapshot because the transaction will have been committed.
In CreateSubscription() we can use the transaction snapshot from walrcv_create_slot()
until walrcv_disconnect() is called. (I am not sure about this part; maybe walrcv_disconnect()
commits internally?)
So somehow we need to keep this snapshot alive even after the transaction is committed (or delay
committing the transaction, but we can have CREATE SUBSCRIPTION with ENABLED=FALSE, so there can be a
restart before tableSync is able to use the same snapshot).

Refresh publication :-

In refresh publication, subscriber does create a new replication slot

Typo-> subscriber does not

hence , we can’t run

pg_dump with a snapshot which starts from origin(maybe this is not an
issue at all). In this case

it makes more sense for tableSync worker to do schema sync.

Can you please explain this problem with some examples?

I think we can have the same issues as you mentioned.
A new table t1 is added to the publication, and the user does a REFRESH PUBLICATION.
pg_dump/pg_restore restores the table definition, but before tableSync
can start, steps 2 to 5 happen on the publisher.

1. Create Table t1(c1, c2); --LSN: 90
2. Insert t1 (1, 1); --LSN 100
3. Insert t1 (2, 2); --LSN 110
4. Alter t1 Add Column c3; --LSN 120
5. Insert t1 (3, 3, 3); --LSN 130

And table sync errors out.
There can be one more issue, since we took the pg_dump without a snapshot (wrt the replication slot).
(I am not 100 percent sure about this.)
Let's imagine the applier process is lagging behind the publisher.
Events on the publisher:
1. alter t1 drop column c; LSN 100 <-- applier process tries to execute this DDL
2. alter t1 drop column d; LSN 110
3. insert into t1 values(..); LSN 120 <-- (REFRESH PUBLICATION called) pg_dump/pg_restore restores this version
The applier process executing 1 will fail because t1 does not have column c.
Regards
Sachin

#20Euler Taveira
euler@eulerto.com
In reply to: Amit Kapila (#18)
Re: Initial Schema Sync for Logical Replication

On Thu, Mar 23, 2023, at 8:44 AM, Amit Kapila wrote:

On Thu, Mar 23, 2023 at 2:48 AM Euler Taveira <euler@eulerto.com> wrote:

On Tue, Mar 21, 2023, at 8:18 AM, Amit Kapila wrote:

Now, how do we avoid these problems even if we have our own version of
functionality similar to pg_dump for selected objects? I guess we will
face similar problems. If so, we may need to deny schema sync in any
such case.

There are 2 approaches for initial DDL synchronization:

1) generate the DDL command on the publisher, stream it and apply it as-is on
the subscriber;
2) generate a DDL representation (JSON, for example) on the publisher, stream
it, transform it into a DDL command on subscriber and apply it.

The option (1) is simpler and faster than option (2) because it does not
require an additional step (transformation). However, option (2) is more
flexible than option (1) because it allow you to create a DDL command even if a
feature was removed from the subscriber and the publisher version is less than
the subscriber version or a feature was added to the publisher and the
publisher version is greater than the subscriber version.

Is this practically possible? Say the publisher has a higher version
that has introduced a new object type corresponding to which it has
either a new catalog or some new columns in the existing catalog. Now,
I don't think the older version of the subscriber can modify the
command received from the publisher so that the same can be applied to
the subscriber because it won't have any knowledge of the new feature.
In the other case where the subscriber is of a newer version, we
anyway should be able to support it with pg_dump as there doesn't
appear to be any restriction with that, am, I missing something?

I think so (with some limitations). Since the publisher knows the subscriber
version, the publisher knows that the subscriber does not contain the new object
type, so the publisher can decide whether this case is critical (and reject the
replication) or optional (and silently not include feature X, because it
is not essential for logical replication). If required, the transformation
should be done on the publisher.

Even if we decide to use deparse approach, it would still need to
mimic stuff from pg_dump to construct commands based on only catalog
contents. I am not against using this approach but we shouldn't ignore
the duplicity required in this approach.

It is fine to share code between pg_dump and this new infrastructure. However,
the old code should coexist to support older versions, because the new set of
functions does not exist in older server versions. Hence, duplicity would exist
for a long time (if you consider that the current policy is to allow dumping from
9.2, we are talking about 10 years or so). There are some threads [1][2] that
discussed this topic: provide a SQL command based on the catalog
representation. You can probably find other discussions searching for "pg_dump
library" or "getddl".

[1]: /messages/by-id/82EFF560-2A09-4C3D-81CC-A2A5EC438CE5@eggerapps.at
[2]: /messages/by-id/71e01949.2e16b.13df4707405.Coremail.shuai900217@126.com

--
Euler Taveira
EDB https://www.enterprisedb.com/

#21Amit Kapila
amit.kapila16@gmail.com
In reply to: Kumar, Sachin (#19)
Re: Initial Schema Sync for Logical Replication

On Thu, Mar 23, 2023 at 9:24 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

From: Amit Kapila <amit.kapila16@gmail.com>
IIUC, this is possible only if tablesync process uses a snapshot different than the
snapshot we have used to perform the initial schema sync, otherwise, this
shouldn't be a problem. Let me try to explain my understanding with an example
(the LSNs used are just to explain the
problem):

1. Create Table t1(c1, c2); --LSN: 90
2. Insert t1 (1, 1); --LSN 100
3. Insert t1 (2, 2); --LSN 110
4. Alter t1 Add Column c3; --LSN 120
5. Insert t1 (3, 3, 3); --LSN 130

Now, say before starting tablesync worker, apply process performs initial
schema sync and uses a snapshot corresponding to LSN 100. Then it starts
tablesync process to allow the initial copy of data in t1.
Here, if the table sync process tries to establish a new snapshot, it may get data
till LSN 130 and when it will try to copy the same in subscriber it will fail. Is my
understanding correct about the problem you described?

Right

If so, can't we allow
tablesync process to use the same exported snapshot as we used for the initial
schema sync and won't that solve the problem you described?

I think we won't be able to use the same snapshot because the transaction will be committed.
In CreateSubscription() we can use the transaction snapshot from walrcv_create_slot()
till walrcv_disconnect() is called. (I am not sure about this part; maybe walrcv_disconnect()
commits internally?) So somehow we need to keep this snapshot alive even after the transaction
is committed (or delay committing the transaction; but we can have CREATE SUBSCRIPTION with
ENABLED=FALSE, so we can have a restart before tableSync is able to use the same snapshot.)
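
To illustrate the lifetime constraint with plain SQL (a generic psql sketch of exported
snapshots, not the walreceiver code path; the snapshot identifier is a placeholder):

-- session 1: exports a snapshot
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT pg_export_snapshot();    -- returns e.g. '00000003-0000001B-1'
COMMIT;                         -- the exported snapshot is discarded here

-- session 2: tries to import it after session 1 has committed
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000003-0000001B-1';   -- fails; only valid while session 1's transaction is open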

Can we think of getting the table data as well along with the schema via
pg_dump? Won't both the schema and the initial data then correspond to
the same snapshot?
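
For instance (connection string, snapshot identifier and file name are placeholders), a
single dump taken without --schema-only would carry both the definitions and the rows as
of the exported snapshot:

pg_dump "$conninfo" --snapshot=00000003-0000001B-1 -Fc --file=initSchemaAndData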

Refresh publication :-

In refresh publication, subscriber does create a new replication slot

Typo-> subscriber does not

hence, we can't run pg_dump with a snapshot which starts from origin (maybe this is not an
issue at all). In this case it makes more sense for tableSync worker to do schema sync.

Can you please explain this problem with some examples?

I think we can have the same issues as you mentioned.
A new table t1 is added to the publication, and the user does a refresh publication.
pg_dump/pg_restore restores the table definition. But before tableSync
can start, steps 2 to 5 happen on the publisher.

1. Create Table t1(c1, c2); --LSN: 90
2. Insert t1 (1, 1); --LSN 100
3. Insert t1 (2, 2); --LSN 110
4. Alter t1 Add Column c3; --LSN 120
5. Insert t1 (3, 3, 3); --LSN 130

And table sync errors out.
There can be one more issue, since we took the pg_dump without a snapshot (wrt the replication slot).

To avoid both the problems mentioned for Refresh Publication, we can
do one of the following: (a) create a new slot along with a snapshot
for this operation and drop it afterward; or (b) using the existing
slot, establish a new snapshot using a technique proposed in email [1].

Note - Please keep one empty line before and after your inline
responses, otherwise, it is slightly difficult to understand your
response.

[1]: /messages/by-id/CAGPVpCRWEVhXa7ovrhuSQofx4to7o22oU9iKtrOgAOtz_=Y6vg@mail.gmail.com

--
With Regards,
Amit Kapila.

#22houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Euler Taveira (#20)
RE: Initial Schema Sync for Logical Replication

On Friday, March 24, 2023 12:02 AM Euler Taveira <euler@eulerto.com> wrote:

On Thu, Mar 23, 2023, at 8:44 AM, Amit Kapila wrote:

On Thu, Mar 23, 2023 at 2:48 AM Euler Taveira <euler@eulerto.com> wrote:

On Tue, Mar 21, 2023, at 8:18 AM, Amit Kapila wrote:

Now, how do we avoid these problems even if we have our own version of
functionality similar to pg_dump for selected objects? I guess we will
face similar problems. If so, we may need to deny schema sync in any
such case.

There are 2 approaches for initial DDL synchronization:

1) generate the DDL command on the publisher, stream it and apply it as-is on
the subscriber;
2) generate a DDL representation (JSON, for example) on the publisher, stream
it, transform it into a DDL command on subscriber and apply it.

The option (1) is simpler and faster than option (2) because it does not
require an additional step (transformation). However, option (2) is more
flexible than option (1) because it allow you to create a DDL command even if a
feature was removed from the subscriber and the publisher version is less than
the subscriber version or a feature was added to the publisher and the
publisher version is greater than the subscriber version.

Is this practically possible? Say the publisher has a higher version
that has introduced a new object type corresponding to which it has
either a new catalog or some new columns in the existing catalog. Now,
I don't think the older version of the subscriber can modify the
command received from the publisher so that the same can be applied to
the subscriber because it won't have any knowledge of the new feature.
In the other case where the subscriber is of a newer version, we
anyway should be able to support it with pg_dump as there doesn't
appear to be any restriction with that, am, I missing something?

I think so (with some limitations). Since the publisher knows the subscriber
version, publisher knows that the subscriber does not contain the new object
type then publisher can decide if this case is critical (and reject the
replication) or optional (and silently not include the feature X -- because it
is not essential for logical replication). If required, the transformation
should be done on the publisher.

I am not sure if it's feasible to support the use case of replicating DDL to an old
subscriber.

First, I think the current publisher doesn't know the version number of
client(subscriber) so we need to check the feasibility of same. Also, having
client's version number checks doesn't seem to be a good idea.

Besides, I thought about the problems that will happen if we try to support
replicating from a newer PG to an older PG. The following examples assume that we
support DDL replication in the mentioned versions.

1) Assume we want to replicate from a newer PG to an older PG where partitioned
tables have not been introduced. I think even if the publisher is aware of
that, it doesn't have a good way to transform a partition-related command;
maybe one could say we can transform it into an inheritance table, but I feel that
introduces too much complexity.

2) Another example is generated columns: replicating from a newer PG that has
this feature to an older PG without it. I am concerned that there may be no way
to transform this without causing inconsistent behavior.

Even if we decide to simply skip sending such unsupported commands or skip
applying them, it's likely that the subsequent DML replication will cause
data inconsistency.

So, it seems we cannot completely support this use case; there would be some
limitations. Personally, I am not sure if it's worth introducing complexity to
support it partially.

Best Regards,
Hou zj

#23Kumar, Sachin
ssetiya@amazon.com
In reply to: Amit Kapila (#21)
RE: Initial Schema Sync for Logical Replication

From: Amit Kapila <amit.kapila16@gmail.com>

I think we won't be able to use same snapshot because the transaction will
be committed.
In CreateSubscription() we can use the transaction snapshot from
walrcv_create_slot() till walrcv_disconnect() is called.(I am not sure
about this part maybe walrcv_disconnect() calls the commits internally ?).
So somehow we need to keep this snapshot alive, even after transaction
is committed(or delay committing the transaction , but we can have
CREATE SUBSCRIPTION with ENABLED=FALSE, so we can have a restart
before tableSync is able to use the same snapshot.)

Can we think of getting the table data as well along with schema via
pg_dump? Won't then both schema and initial data will correspond to the
same snapshot?

Right , that will work, Thanks!

I think we can have same issues as you mentioned New table t1 is added
to the publication , User does a refresh publication.
pg_dump / pg_restore restores the table definition. But before
tableSync can start, steps from 2 to 5 happen on the publisher.

1. Create Table t1(c1, c2); --LSN: 90 2. Insert t1 (1, 1); --LSN 100
3. Insert t1 (2, 2); --LSN 110 4. Alter t1 Add Column c3; --LSN 120
5. Insert t1 (3, 3, 3); --LSN 130

And table sync errors out
There can be one more issue , since we took the pg_dump without snapshot (wrt to replication slot).

To avoid both the problems mentioned for Refresh Publication, we can do
one of the following: (a) create a new slot along with a snapshot for this
operation and drop it afterward; or (b) using the existing slot, establish a
new snapshot using a technique proposed in email [1].

Thanks, I think option (b) will be perfect, since we don’t have to create a new slot.

Regards
Sachin

#24Euler Taveira
euler@eulerto.com
In reply to: houzj.fnst@fujitsu.com (#22)
Re: Initial Schema Sync for Logical Replication

On Fri, Mar 24, 2023, at 8:57 AM, houzj.fnst@fujitsu.com wrote:

First, I think the current publisher doesn't know the version number of
client(subscriber) so we need to check the feasibility of same. Also, having
client's version number checks doesn't seem to be a good idea.

walrcv_server_version().

Besides, I thought about the problems that will happen if we try to support
replicating New PG to older PG. The following examples assume that we support the
DDL replication in the mentioned PG.

1) Assume we want to replicate from a newer PG to a older PG where partition
table has not been introduced. I think even if the publisher is aware of
that, it doesn't have a good way to transform the partition related command,
maybe one could say we can transform that to inherit table, but I feel that
introduces too much complexity.

2) Another example is generated column. To replicate the newer PG which has
this feature to a older PG without this. I am concerned that is there a way
to transform this without causing inconsistent behavior.

Even if we decide to simply skip sending such unsupported commands or skip
applying them, then it's likely that the following dml replication will cause
data inconsistency.

As I mentioned in a previous email [1], the publisher can contain code to
decide if it can proceed or not, in case you are doing a downgrade. I said
downgrade but it can also happen if we decide to deprecate a syntax. For
example, when WITH OIDS was deprecated, pg_dump treats it as an acceptable
removal. The transformation can be (dis)allowed by the protocol version or
another constant [2].

So, it seems we cannot completely support this use case, there would be some
limitations. Personally, I am not sure if it's worth introducing complexity to
support it partially.

Limitations are fine; they have different versions. I wouldn't like to forbid
downgrade just because I don't want to maintain compatibility with previous
versions. IMO it is important to be able to downgrade in case of any
incompatibility with an application. You might argue that this isn't possible
due to time or patch size and that there is a workaround for it but I wouldn't
want to close the door for downgrade in the future.

[1]: /messages/by-id/fb7894e4-b44e-4ae3-a74d-7c5650f69f1a@app.fastmail.com
[2]: /messages/by-id/78149fa6-4c77-4128-8518-197a631c29c3@app.fastmail.com

--
Euler Taveira
EDB https://www.enterprisedb.com/

#25Kumar, Sachin
ssetiya@amazon.com
In reply to: houzj.fnst@fujitsu.com (#22)
RE: Initial Schema Sync for Logical Replication

I am not if it's feasible to support the use case the replicate DDL to old
subscriber.

+1

First, I think the current publisher doesn't know the version number of
client(subscriber) so we need to check the feasibility of same. Also, having
client's version number checks doesn't seem to be a good idea.

Besides, I thought about the problems that will happen if we try to support
replicating New PG to older PG. The following examples assume that we
support the DDL replication in the mentioned PG.

1) Assume we want to replicate from a newer PG to a older PG where
partition
table has not been introduced. I think even if the publisher is aware of
that, it doesn't have a good way to transform the partition related
command,
maybe one could say we can transform that to inherit table, but I feel that
introduces too much complexity.

2) Another example is generated column. To replicate the newer PG which
has
this feature to a older PG without this. I am concerned that is there a way
to transform this without causing inconsistent behavior.

Even if we decide to simply skip sending such unsupported commands or
skip applying them, then it's likely that the following dml replication will
cause data inconsistency.

So, it seems we cannot completely support this use case, there would be
some limitations. Personally, I am not sure if it's worth introducing
complexity to support it partially.

+1

Regards
Sachin

#26houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Euler Taveira (#24)
RE: Initial Schema Sync for Logical Replication

On Friday, March 24, 2023 11:01 PM Euler Taveira <euler@eulerto.com> wrote:

Hi,

On Fri, Mar 24, 2023, at 8:57 AM, houzj.fnst@fujitsu.com wrote:

First, I think the current publisher doesn't know the version number of
client(subscriber) so we need to check the feasibility of same. Also, having
client's version number checks doesn't seem to be a good idea.

walrcv_server_version().

I don't think this function works, as it only shows the server version (e.g.
publisher/walsender).

Besides, I thought about the problems that will happen if we try to support
replicating New PG to older PG. The following examples assume that we support the
DDL replication in the mentioned PG.

1) Assume we want to replicate from a newer PG to a older PG where partition
table has not been introduced. I think even if the publisher is aware of
that, it doesn't have a good way to transform the partition related command,
maybe one could say we can transform that to inherit table, but I feel that
introduces too much complexity.

2) Another example is generated column. To replicate the newer PG which has
this feature to a older PG without this. I am concerned that is there a way
to transform this without causing inconsistent behavior.

Even if we decide to simply skip sending such unsupported commands or skip
applying them, then it's likely that the following dml replication will cause
data inconsistency.

As I mentioned in a previous email [1], the publisher can contain code to
decide if it can proceed or not, in case you are doing a downgrade. I said
downgrade but it can also happen if we decide to deprecate a syntax. For
example, when WITH OIDS was deprecated, pg_dump treats it as an acceptable
removal. The transformation can be (dis)allowed by the protocol version or
another constant [2].

If most of the new DDL-related features cannot be transformed for an old
subscriber, I don't see a point in supporting this use case.

I think cases like the removal of WITH OIDS are rare enough that we don't need
to worry about them, and they don't affect data consistency. But new DDL features
are different.

Not only features like partitioning or generated columns, but also features like
nulls_not_distinct are tricky to transform without causing
inconsistent behavior.

So, it seems we cannot completely support this use case, there would be some
limitations. Personally, I am not sure if it's worth introducing complexity to
support it partially.

Limitations are fine; they have different versions. I wouldn't like to forbid
downgrade just because I don't want to maintain compatibility with previous
versions. IMO it is important to be able to downgrade in case of any
incompatibility with an application. You might argue that this isn't possible
due to time or patch size and that there is a workaround for it but I wouldn't
want to close the door for downgrade in the future.

The biggest problem is the data inconsistency that it would cause. I am not
aware of a generic solution for replicating newly introduced DDL to an old subscriber
which wouldn't cause data inconsistency. And apart from that, IMO the
complexity and maintainability of the feature also matter.

Best Regards,
Hou zj

#27Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Kumar, Sachin (#23)
Re: Initial Schema Sync for Logical Replication

On Fri, Mar 24, 2023 at 11:51 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

From: Amit Kapila <amit.kapila16@gmail.com>

I think we won't be able to use same snapshot because the transaction will
be committed.
In CreateSubscription() we can use the transaction snapshot from
walrcv_create_slot() till walrcv_disconnect() is called.(I am not sure
about this part maybe walrcv_disconnect() calls the commits internally ?).
So somehow we need to keep this snapshot alive, even after transaction
is committed(or delay committing the transaction , but we can have
CREATE SUBSCRIPTION with ENABLED=FALSE, so we can have a restart
before tableSync is able to use the same snapshot.)

Can we think of getting the table data as well along with schema via
pg_dump? Won't then both schema and initial data will correspond to the
same snapshot?

Right , that will work, Thanks!

While it works, we cannot get the initial data in parallel, no?

I think we can have same issues as you mentioned New table t1 is added
to the publication , User does a refresh publication.
pg_dump / pg_restore restores the table definition. But before
tableSync can start, steps from 2 to 5 happen on the publisher.

1. Create Table t1(c1, c2); --LSN: 90 2. Insert t1 (1, 1); --LSN 100
3. Insert t1 (2, 2); --LSN 110 4. Alter t1 Add Column c3; --LSN 120
5. Insert t1 (3, 3, 3); --LSN 130

And table sync errors out
There can be one more issue , since we took the pg_dump without snapshot (wrt to replication slot).

To avoid both the problems mentioned for Refresh Publication, we can do
one of the following: (a) create a new slot along with a snapshot for this
operation and drop it afterward; or (b) using the existing slot, establish a
new snapshot using a technique proposed in email [1].

Thanks, I think option (b) will be perfect, since we don’t have to create a new slot.

Regarding (b), does it mean that apply worker stops streaming,
requests to create a snapshot, and then resumes the streaming?

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#28Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#27)
Re: Initial Schema Sync for Logical Replication

On Mon, Mar 27, 2023 at 8:17 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Fri, Mar 24, 2023 at 11:51 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

From: Amit Kapila <amit.kapila16@gmail.com>

I think we won't be able to use same snapshot because the transaction will
be committed.
In CreateSubscription() we can use the transaction snapshot from
walrcv_create_slot() till walrcv_disconnect() is called.(I am not sure
about this part maybe walrcv_disconnect() calls the commits internally ?).
So somehow we need to keep this snapshot alive, even after transaction
is committed(or delay committing the transaction , but we can have
CREATE SUBSCRIPTION with ENABLED=FALSE, so we can have a restart
before tableSync is able to use the same snapshot.)

Can we think of getting the table data as well along with schema via
pg_dump? Won't then both schema and initial data will correspond to the
same snapshot?

Right , that will work, Thanks!

While it works, we cannot get the initial data in parallel, no?

Another possibility is that we dump/restore the schema of each table
along with its data. One thing we can explore is whether the parallel
option of dump can be useful here. Do you have any other ideas?
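
For reference, parallel dump requires the directory format, along the lines of
(connection string, job count and output path are placeholders):

pg_dump "$conninfo" -Fd --jobs=4 --file=dumpdir

Whether that can be combined with the snapshot handling discussed above is an open question.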

One related idea is that currently, we fetch the table list
corresponding to publications in subscription and create the entries
for those in pg_subscription_rel during Create Subscription, can we
think of postponing that work till after the initial schema sync? We
seem to be already storing publications list in pg_subscription, so it
appears possible if we somehow remember the value of copy_data. If
this is feasible then I think that may give us the flexibility to
perform the initial sync at a later point by the background worker.

I think we can have same issues as you mentioned New table t1 is added
to the publication , User does a refresh publication.
pg_dump / pg_restore restores the table definition. But before
tableSync can start, steps from 2 to 5 happen on the publisher.

1. Create Table t1(c1, c2); --LSN: 90 2. Insert t1 (1, 1); --LSN 100
3. Insert t1 (2, 2); --LSN 110 4. Alter t1 Add Column c3; --LSN 120
5. Insert t1 (3, 3, 3); --LSN 130

And table sync errors out
There can be one more issue , since we took the pg_dump without snapshot (wrt to replication slot).

To avoid both the problems mentioned for Refresh Publication, we can do
one of the following: (a) create a new slot along with a snapshot for this
operation and drop it afterward; or (b) using the existing slot, establish a
new snapshot using a technique proposed in email [1].

Thanks, I think option (b) will be perfect, since we don’t have to create a new slot.

Regarding (b), does it mean that apply worker stops streaming,
requests to create a snapshot, and then resumes the streaming?

Shouldn't this be done by the backend performing a REFRESH publication?

--
With Regards,
Amit Kapila.

#29Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#28)
Re: Initial Schema Sync for Logical Replication

On Tue, Mar 28, 2023 at 6:47 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Mar 27, 2023 at 8:17 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Fri, Mar 24, 2023 at 11:51 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

From: Amit Kapila <amit.kapila16@gmail.com>

I think we won't be able to use same snapshot because the transaction will
be committed.
In CreateSubscription() we can use the transaction snapshot from
walrcv_create_slot() till walrcv_disconnect() is called.(I am not sure
about this part maybe walrcv_disconnect() calls the commits internally ?).
So somehow we need to keep this snapshot alive, even after transaction
is committed(or delay committing the transaction , but we can have
CREATE SUBSCRIPTION with ENABLED=FALSE, so we can have a restart
before tableSync is able to use the same snapshot.)

Can we think of getting the table data as well along with schema via
pg_dump? Won't then both schema and initial data will correspond to the
same snapshot?

Right , that will work, Thanks!

While it works, we cannot get the initial data in parallel, no?

Another possibility is that we dump/restore the schema of each table
along with its data. One thing we can explore is whether the parallel
option of dump can be useful here. Do you have any other ideas?

A downside of the idea of dumping both table schema and table data
would be that we need to temporarily store data twice the size of the
table (the dump file and the table itself) during the load. One might
think that we can redirect the pg_dump output into the backend so that
it can load it via SPI, but it doesn't work since "COPY tbl FROM
stdin;" doesn't work via SPI. The --inserts option of pg_dump could
help it out but it makes restoration very slow.

One related idea is that currently, we fetch the table list
corresponding to publications in subscription and create the entries
for those in pg_subscription_rel during Create Subscription, can we
think of postponing that work till after the initial schema sync? We
seem to be already storing publications list in pg_subscription, so it
appears possible if we somehow remember the value of copy_data. If
this is feasible then I think that may give us the flexibility to
perform the initial sync at a later point by the background worker.

It sounds possible. With this idea, we will be able to have the apply
worker restore the table schemas (and create pg_subscription_rel
entries) as the first thing. Another point we might need to consider
is that the initial schema sync (i.e. creating tables) and creating
pg_subscription_rel entries need to be done in the same transaction.
Otherwise, we could end up committing only one of the two changes. I think it
depends on how we restore the schema data.

I think we can have same issues as you mentioned New table t1 is added
to the publication , User does a refresh publication.
pg_dump / pg_restore restores the table definition. But before
tableSync can start, steps from 2 to 5 happen on the publisher.

1. Create Table t1(c1, c2); --LSN: 90 2. Insert t1 (1, 1); --LSN 100
3. Insert t1 (2, 2); --LSN 110 4. Alter t1 Add Column c3; --LSN 120
5. Insert t1 (3, 3, 3); --LSN 130

And table sync errors out
There can be one more issue , since we took the pg_dump without snapshot (wrt to replication slot).

To avoid both the problems mentioned for Refresh Publication, we can do
one of the following: (a) create a new slot along with a snapshot for this
operation and drop it afterward; or (b) using the existing slot, establish a
new snapshot using a technique proposed in email [1].

Thanks, I think option (b) will be perfect, since we don’t have to create a new slot.

Regarding (b), does it mean that apply worker stops streaming,
requests to create a snapshot, and then resumes the streaming?

Shouldn't this be done by the backend performing a REFRESH publication?

Hmm, I might be missing something but the idea (b) uses the existing
slot to establish a new snapshot, right? What existing replication
slot do we use for that? I thought it was the one used by the apply
worker.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#30Kumar, Sachin
ssetiya@amazon.com
In reply to: Amit Kapila (#28)
RE: Initial Schema Sync for Logical Replication

From: Amit Kapila <amit.kapila16@gmail.com>

I think we won't be able to use same snapshot because the
transaction will be committed.
In CreateSubscription() we can use the transaction snapshot from
walrcv_create_slot() till walrcv_disconnect() is called.(I am
not sure about this part maybe walrcv_disconnect() calls the commits internally ?).

So somehow we need to keep this snapshot alive, even after
transaction is committed(or delay committing the transaction ,
but we can have CREATE SUBSCRIPTION with ENABLED=FALSE, so we
can have a restart before tableSync is able to use the same
snapshot.)

Can we think of getting the table data as well along with schema
via pg_dump? Won't then both schema and initial data will
correspond to the same snapshot?

Right , that will work, Thanks!

While it works, we cannot get the initial data in parallel, no?

I was thinking each TableSync process will call pg_dump --table. This way, if we have N
tableSync processes, we can have N pg_dump --table=table_name invocations running in parallel.
In fact, we can use --schema-only to get the schema and then let COPY take care of data
syncing. We will use the same snapshot for pg_dump as well as for the COPY of the table.
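
Roughly, each tableSync worker would then run something like the following (table name,
snapshot identifier, file name and database name are placeholders):

pg_dump "$conninfo" --schema-only --table=public.t1 --snapshot=00000003-0000001B-1 -Fc --file=t1_schema
pg_restore --dbname=subscriber_db t1_schema

followed by the usual COPY of public.t1 under the same exported snapshot.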

Regards
Sachin

#31Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#29)
Re: Initial Schema Sync for Logical Replication

On Tue, Mar 28, 2023 at 8:30 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Tue, Mar 28, 2023 at 6:47 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

I think we can have same issues as you mentioned New table t1 is added
to the publication , User does a refresh publication.
pg_dump / pg_restore restores the table definition. But before
tableSync can start, steps from 2 to 5 happen on the publisher.

1. Create Table t1(c1, c2); --LSN: 90 2. Insert t1 (1, 1); --LSN 100
3. Insert t1 (2, 2); --LSN 110 4. Alter t1 Add Column c3; --LSN 120
5. Insert t1 (3, 3, 3); --LSN 130

And table sync errors out
There can be one more issue , since we took the pg_dump without snapshot (wrt to replication slot).

To avoid both the problems mentioned for Refresh Publication, we can do
one of the following: (a) create a new slot along with a snapshot for this
operation and drop it afterward; or (b) using the existing slot, establish a
new snapshot using a technique proposed in email [1].

Thanks, I think option (b) will be perfect, since we don’t have to create a new slot.

Regarding (b), does it mean that apply worker stops streaming,
requests to create a snapshot, and then resumes the streaming?

Shouldn't this be done by the backend performing a REFRESH publication?

Hmm, I might be missing something but the idea (b) uses the existing
slot to establish a new snapshot, right? What existing replication
slot do we use for that? I thought it was the one used by the apply
worker.

Right, it will be the same as the one for apply worker. I think if we
decide to do initial sync via apply worker then in this case also, we
need to let apply worker restart and perform initial sync as the first
thing.

--
With Regards,
Amit Kapila.

#32Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Kumar, Sachin (#30)
Re: Initial Schema Sync for Logical Replication

On Wed, Mar 29, 2023 at 7:57 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

From: Amit Kapila <amit.kapila16@gmail.com>

I think we won't be able to use same snapshot because the
transaction will be committed.
In CreateSubscription() we can use the transaction snapshot from
walrcv_create_slot() till walrcv_disconnect() is called.(I am
not sure about this part maybe walrcv_disconnect() calls the commits internally ?).

So somehow we need to keep this snapshot alive, even after
transaction is committed(or delay committing the transaction ,
but we can have CREATE SUBSCRIPTION with ENABLED=FALSE, so we
can have a restart before tableSync is able to use the same
snapshot.)

Can we think of getting the table data as well along with schema
via pg_dump? Won't then both schema and initial data will
correspond to the same snapshot?

Right , that will work, Thanks!

While it works, we cannot get the initial data in parallel, no?

I was thinking each TableSync process will call pg_dump --table, This way if we have N
tableSync process, we can have N pg_dump --table=table_name called in parallel.
In fact we can use --schema-only to get schema and then let COPY take care of data
syncing . We will use same snapshot for pg_dump as well as COPY table.

How can we postpone creating the pg_subscription_rel entries until the
tablesync worker starts and does the schema sync? I think that since
pg_subscription_rel entry needs the table OID, we need either to do
the schema sync before creating the entry (i.e, during CREATE
SUBSCRIPTION) or to postpone creating entries as Amit proposed[1]. The
apply worker needs the information of tables to sync in order to
launch the tablesync workers, but it needs to create the table schema
to get that information.
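
That constraint is visible in the catalog itself; srrelid is a relation OID, for example:

SELECT srsubid, srrelid::regclass AS rel, srsubstate, srsublsn
FROM pg_subscription_rel;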

Regards,

[1]: /messages/by-id/CAA4eK1Ld9-5ueomE_J5CA6LfRo=wemdTrUp5qdBhRFwGT+dOUw@mail.gmail.com

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#33Kumar, Sachin
ssetiya@amazon.com
In reply to: Masahiko Sawada (#29)
RE: Initial Schema Sync for Logical Replication

From: Masahiko Sawada <sawada.mshk@gmail.com>

One related idea is that currently, we fetch the table list
corresponding to publications in subscription and create the entries
for those in pg_subscription_rel during Create Subscription, can we
think of postponing that work till after the initial schema sync? We
seem to be already storing publications list in pg_subscription, so it
appears possible if we somehow remember the value of copy_data. If
this is feasible then I think that may give us the flexibility to
perform the initial sync at a later point by the background worker.

Maybe we need to add a column to pg_subscription to store the copy_data state?

It sounds possible. With this idea, we will be able to have the apply worker
restore the table schemas (and create pg_subscription_rel
entries) as the first thing. Another point we might need to consider is that the
initial schema sync (i.e. creating tables) and creating pg_subscription_rel entries
need to be done in the same transaction.
Otherwise, we could end up committing either one change. I think it depends on
how we restore the schema data.

I think we have to add one more column to pg_subscription to track the initial sync
state {OFF, SCHEMA_DUMPED, SCHEMA_RESTORED, COMPLETED} (COMPLETED will
show that pg_subscription_rel is filled). I don't think we will be able
to do pg_restore and fill pg_subscription_rel in a single transaction, but we can use
initial_sync_state to start from where we left off after an abrupt crash/shutdown.
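
Roughly, the progression such an initial_sync_state column (the name is only a placeholder)
would record is:

OFF -> SCHEMA_DUMPED (pg_dump file written)
    -> SCHEMA_RESTORED (pg_restore finished)
    -> COMPLETED (pg_subscription_rel entries created)

so that on restart the worker can resume from the first step not yet marked as done.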

Regards
Sachin

#34Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Masahiko Sawada (#32)
Re: Initial Schema Sync for Logical Replication

On Thu, Mar 30, 2023 at 12:18 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Wed, Mar 29, 2023 at 7:57 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

From: Amit Kapila <amit.kapila16@gmail.com>

I think we won't be able to use same snapshot because the
transaction will be committed.
In CreateSubscription() we can use the transaction snapshot from
walrcv_create_slot() till walrcv_disconnect() is called.(I am
not sure about this part maybe walrcv_disconnect() calls the commits internally ?).

So somehow we need to keep this snapshot alive, even after
transaction is committed(or delay committing the transaction ,
but we can have CREATE SUBSCRIPTION with ENABLED=FALSE, so we
can have a restart before tableSync is able to use the same
snapshot.)

Can we think of getting the table data as well along with schema
via pg_dump? Won't then both schema and initial data will
correspond to the same snapshot?

Right , that will work, Thanks!

While it works, we cannot get the initial data in parallel, no?

I was thinking each TableSync process will call pg_dump --table, This way if we have N
tableSync process, we can have N pg_dump --table=table_name called in parallel.
In fact we can use --schema-only to get schema and then let COPY take care of data
syncing . We will use same snapshot for pg_dump as well as COPY table.

How can we postpone creating the pg_subscription_rel entries until the
tablesync worker starts and does the schema sync? I think that since
pg_subscription_rel entry needs the table OID, we need either to do
the schema sync before creating the entry (i.e, during CREATE
SUBSCRIPTION) or to postpone creating entries as Amit proposed[1]. The
apply worker needs the information of tables to sync in order to
launch the tablesync workers, but it needs to create the table schema
to get that information.

For the above reason, I think that step 6 of the initial proposal won't work.

If we can have the tablesync worker create an entry of
pg_subscription_rel after creating the table, it may give us the
flexibility to perform the initial sync. One idea is that we add a
relname field to pg_subscription_rel so that we can create entries
with relname instead of OID if the table is not created yet. Once the
table is created, we clear the relname field and set the OID of the
table instead. It's not an ideal solution but we might make it simpler
later.

Assuming that it's feasible, I'm considering another approach for the
initial sync in order to address the concurrent DDLs.

The basic idea is to somewhat follow how pg_dump/pg_restore dump and restore
the database data. We divide the synchronization phase (including both schema
and data) up into three phases: pre-data, table-data, post-data. These mostly
follow the --section option of pg_dump.
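
For reference, pg_dump already exposes this three-way split; the commands below only
illustrate the sections, not the proposed implementation:

pg_dump mydb --section=pre-data  --file=pre.sql    # types, operators, bare table definitions, ...
pg_dump mydb --section=data      --file=data.sql   # table data
pg_dump mydb --section=post-data --file=post.sql   # indexes, constraints, triggers, rules, ...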

1. The backend process performing CREATE SUBSCRIPTION creates the
subscription but doesn't create pg_subscription_rel entries yet.

2. Before starting the streaming, the apply worker fetches the table
list from the publisher, creates pg_subscription_rel entries for them,
and dumps+restores database objects that tables could depend on but
that don't themselves depend on tables, such as TYPE, OPERATOR, ACCESS
METHOD etc. (i.e. pre-data).

3. The apply worker launches the tablesync workers for tables that
need to be synchronized.

There might be DDLs executed on the publisher for tables before the
tablesync worker starts. But the apply worker needs to apply DDLs for
pre-data database objects. OTOH, it can ignore DDLs for not-synced-yet
tables and other database objects such as INDEX, TRIGGER, RULE, etc
(i.e. post-data).

4. The tablesync worker creates its replication slot, dumps+restores
the table schema, updates pg_subscription_rel, and performs COPY.

These operations should be done in the same transaction.

5. After finishing COPY, the tablesync worker dumps indexes (and
perhaps constraints) of the table and creates them (which possibly
takes a long time). Then it starts to catch up, same as today. The
apply worker needs to wait for the tablesync worker to catch up.

We need to repeat these steps until we complete the initial data copy
and create indexes for all tables, IOW until all pg_subscription_rel
status becomes READY.

6. If the apply worker confirms all tables are READY, it starts
another sync worker who is responsible for the post-data database
objects such as TRIGGER, RULE, POLICY etc (i.e. post-data).

While the sync worker is starting up or working, the apply worker
applies changes for pre-data database objects as well as READY tables.

7. Similar to the tablesync worker, this sync worker creates its
replication slot and sets the returned LSN somewhere, say
pg_subscription.

8. The sync worker dumps and restores these objects. This could take
time since it would need to create FK constraints. Then it starts to
catch up if the apply worker is ahead. The apply worker waits for the
sync worker to catch up.

9. Once the sync worker catches up, the apply worker starts applying
changes for all database objects.

IIUC with this approach, we can resolve the concurrent DDL problem
Sachin mentioned, and indexes (and constraints) are created after the
initial data copy.

The procedures are still very complex and not fully considered yet but
I hope there are some useful things at least for discussion.

Probably we can start with supporting only tables. In this case, we
would not need the post-data phase (i.e. step 6-9). It seems to me
that we need to have the subscription state somewhere (maybe
pg_subscription) so that the apply worker can figure out the next step.
Since we need to dump and restore different objects on different
timings, we probably cannot directly use pg_dump/pg_restore. I've not
considered how the concurrent REFRESH PUBLICATION works.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#35Kumar, Sachin
ssetiya@amazon.com
In reply to: Masahiko Sawada (#34)
RE: Initial Schema Sync for Logical Replication

-----Original Message-----
From: Masahiko Sawada <sawada.mshk@gmail.com>

I was thinking each TableSync process will call pg_dump --table,
This way if we have N tableSync process, we can have N pg_dump --table=table_name called in parallel.

In fact we can use --schema-only to get schema and then let COPY
take care of data syncing . We will use same snapshot for pg_dump as well as COPY table.

How can we postpone creating the pg_subscription_rel entries until the
tablesync worker starts and does the schema sync? I think that since
pg_subscription_rel entry needs the table OID, we need either to do
the schema sync before creating the entry (i.e, during CREATE
SUBSCRIPTION) or to postpone creating entries as Amit proposed[1]. The
apply worker needs the information of tables to sync in order to
launch the tablesync workers, but it needs to create the table schema
to get that information.

For the above reason, I think that step 6 of the initial proposal won't work.

If we can have the tablesync worker create an entry of pg_subscription_rel after
creating the table, it may give us the flexibility to perform the initial sync. One
idea is that we add a relname field to pg_subscription_rel so that we can create
entries with relname instead of OID if the table is not created yet. Once the
table is created, we clear the relname field and set the OID of the table instead.
It's not an ideal solution but we might make it simpler later.

Assuming that it's feasible, I'm considering another approach for the initial sync
in order to address the concurrent DDLs.

The basic idea is to somewhat follow how pg_dump/restore to dump/restore
the database data. We divide the synchronization phase (including both schema
and data) up into three phases: pre-data, table-data, post-data. These mostly
follow the --section option of pg_dump.

1. The backend process performing CREATE SUBSCRIPTION creates the
subscription but doesn't create pg_subscription_rel entries yet.

2. Before starting the streaming, the apply worker fetches the table list from the
publisher, create pg_subscription_rel entries for them, and dumps+restores
database objects that tables could depend on but don't depend on tables such as
TYPE, OPERATOR, ACCESS METHOD etc (i.e.
pre-data).

We will not have the slot's starting snapshot, so somehow we have to get a new snapshot
and skip all the WAL between the start of the slot and the snapshot creation LSN?

3. The apply worker launches the tablesync workers for tables that need to be
synchronized.

There might be DDLs executed on the publisher for tables before the tablesync
worker starts. But the apply worker needs to apply DDLs for pre-data database
objects. OTOH, it can ignore DDLs for not-synced-yet tables and other database
objects such as INDEX, TRIGGER, RULE, etc (i.e. post-data).

4. The tablesync worker creates its replication slot, dumps+restores the table
schema, update the pg_subscription_rel, and perform COPY.

These operations should be done in the same transaction.

pg_restore won't be rollbackable, so we need to maintain states in pg_subscription_rel.

5. After finishing COPY, the tablesync worker dumps indexes (and perhaps
constraints) of the table and creates them (which possibly takes a long time).
Then it starts to catch up, same as today. The apply worker needs to wait for the
tablesync worker to catch up.

I don't think we can have a CATCHUP stage. We can have a DDL on the publisher which
adds a new column (and this DDL will be executed by the applier later). Then we get an INSERT,
and because we have the old definition of the table, the insert will fail.

We need to repeat these steps until we complete the initial data copy and create
indexes for all tables, IOW until all pg_subscription_rel status becomes READY.

6. If the apply worker confirms all tables are READY, it starts another sync
worker who is responsible for the post-data database objects such as TRIGGER,
RULE, POLICY etc (i.e. post-data).

While the sync worker is starting up or working, the apply worker applies
changes for pre-data database objects as well as READY tables.

We might have some issues if we have a CREATE TABLE like
CREATE TABLE table_name AS SELECT * FROM materialized_view;

7. Similar to the tablesync worker, this sync worker creates its replication slot
and sets the returned LSN somewhere, say pg_subscription.

8. The sync worker dumps and restores these objects. Which could take a time
since it would need to create FK constraints. Then it starts to catch up if the
apply worker is ahead. The apply worker waits for the sync worker to catch up.

9. Once the sync worker catches up, the apply worker starts applying changes
for all database objects.

IIUC with this approach, we can resolve the concurrent DDL problem Sachin
mentioned, and indexes (and constraints) are created after the initial data copy.

The procedures are still very complex and not fully considered yet but I hope
there are some useful things at least for discussion.

Probably we can start with supporting only tables. In this case, we would not
need the post-data phase (i.e. step 6-9). It seems to me that we need to have
the subscription state somewhere (maybe
pg_subscription) so that the apply worker figure out the next step.
Since we need to dump and restore different objects on different timings, we
probably cannot directly use pg_dump/pg_restore. I've not considered how the
concurrent REFRESH PUBLICATION works.

I think the above prototype will work and will have the least amount of side effects, but
it might be too complex to implement and I am not sure about corner cases.

I was thinking of other ways of doing the initial sync, which are less complex but each
with its own set of bottlenecks

On Publisher Side:-
1) Locking the publisher:- Easiest one to implement: the applier process will take an ACCESS SHARE
lock on all the published tables (we don't have to worry about newly created concurrent tables).
As tableSync finishes syncing a table, it will release that table's lock, so we will release
table locks in steps. Users can still perform DML on the tables, but DDLs won't be allowed.
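
For example (the table name is only illustrative), the lock level involved would be:

LOCK TABLE published_table IN ACCESS SHARE MODE;
-- DML on the table still proceeds, but DDL that needs ACCESS EXCLUSIVE
-- (most ALTER TABLE forms) blocks until the lock is released.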

On Subscriber Side:-
So the main issue is that the tableSync process can see a future version of the table
data/definition relative to the applier process, so we have to find a way to ensure that the
tableSync and applier processes see the same table version.

2) Using pg_dump/pg_restore for schema and data:- As Amit mentioned we can use pg_dump/
pg_restore [1]. Although it might have the side effect of using double the storage, we can
take a pg_dump of each table separately and delete the dump as soon as the table is synced.
The tableSync process will read the dump and call pg_restore on the table.
If we crash in the middle of restoring the tables we can start pg_dump(--clean)/restore again
with the left-out tables.
With this we can reduce space usage but we might create too many files.

3) Using publisher snapshot:- The applier process will do pg_dump/pg_restore as usual.
Then the applier process will start a new process P1 which will connect to the
publisher and start a transaction; it will call pg_export_snapshot() to export the
snapshot. The applier process will then take the snapshot string and pass it to the tableSync
process as an argument. tableSync will use this snapshot for COPY TABLE. tableSync should only
do COPY TABLE and then exit, so we won't do any catchup phase in tableSync. After
all tables finish COPY, the transaction will be committed by the P1 process and it will exit
(see the sketch below). In the case of a crash/restart we can simply start from the beginning,
since nothing is committed till every table is synced. There are 2 main issues with this approach:
1. I am not sure what side effects we might have on the publisher, since we might have to keep
the transaction open for a long time.
2. The applier process will simply wait till all tables are synced.
Since the applier process won't be able to apply any WAL till all tables are synced,
maybe instead of creating a new process the applier process itself can start the transaction /
export the snapshot, and the tableSync process will use that snapshot. After all tables are synced
it can start WAL streaming.

I think approach no 3 might be the best way.
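
A rough sketch of the approach-3 handshake in plain SQL (the snapshot identifier and table
name are placeholders; error handling and the real worker plumbing are omitted):

-- process P1, connected to the publisher, holds the snapshot open:
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT pg_export_snapshot();              -- e.g. '00000003-0000001B-1', passed to each tableSync

-- each tableSync worker, on its own publisher connection:
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
COPY public.t1 TO STDOUT;                 -- data as of the shared snapshot
COMMIT;

-- P1 commits (releasing the snapshot) only after every table has been copied.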

[1]: /messages/by-id/CAA4eK1Ld9-5ueomE_J5CA6LfRo=wemdTrUp5qdBhRFwGT+dOUw@mail.gmail.com

Regards
Sachin

#36Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Kumar, Sachin (#35)
Re: Initial Schema Sync for Logical Replication

On Mon, Apr 3, 2023 at 3:54 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

-----Original Message-----
From: Masahiko Sawada <sawada.mshk@gmail.com>

I was thinking each TableSync process will call pg_dump --table,
This way if we have N tableSync process, we can have N pg_dump --table=table_name called in parallel.

In fact we can use --schema-only to get schema and then let COPY
take care of data syncing . We will use same snapshot for pg_dump as well as COPY table.

How can we postpone creating the pg_subscription_rel entries until the
tablesync worker starts and does the schema sync? I think that since
pg_subscription_rel entry needs the table OID, we need either to do
the schema sync before creating the entry (i.e, during CREATE
SUBSCRIPTION) or to postpone creating entries as Amit proposed[1]. The
apply worker needs the information of tables to sync in order to
launch the tablesync workers, but it needs to create the table schema
to get that information.

For the above reason, I think that step 6 of the initial proposal won't work.

If we can have the tablesync worker create an entry of pg_subscription_rel after
creating the table, it may give us the flexibility to perform the initial sync. One
idea is that we add a relname field to pg_subscription_rel so that we can create
entries with relname instead of OID if the table is not created yet. Once the
table is created, we clear the relname field and set the OID of the table instead.
It's not an ideal solution but we might make it simpler later.

Assuming that it's feasible, I'm considering another approach for the initial sync
in order to address the concurrent DDLs.

The basic idea is to somewhat follow how pg_dump/restore to dump/restore
the database data. We divide the synchronization phase (including both schema
and data) up into three phases: pre-data, table-data, post-data. These mostly
follow the --section option of pg_dump.

1. The backend process performing CREATE SUBSCRIPTION creates the
subscription but doesn't create pg_subscription_rel entries yet.

2. Before starting the streaming, the apply worker fetches the table list from the
publisher, create pg_subscription_rel entries for them, and dumps+restores
database objects that tables could depend on but don't depend on tables such as
TYPE, OPERATOR, ACCESS METHOD etc (i.e.
pre-data).

We will not have slot starting snapshot, So somehow we have to get a new snapshot
And skip all the wal_log between starting of slot and snapshot creation lsn ? .

Yes. Or we can somehow postpone creating pg_subscription_rel entries
until the tablesync workers create tables, or we request walsender to
establish a new snapshot using a technique proposed in email [1].

3. The apply worker launches the tablesync workers for tables that need to be
synchronized.

There might be DDLs executed on the publisher for tables before the tablesync
worker starts. But the apply worker needs to apply DDLs for pre-data database
objects. OTOH, it can ignore DDLs for not-synced-yet tables and other database
objects such as INDEX, TRIGGER, RULE, etc (i.e. post-data).

4. The tablesync worker creates its replication slot, dumps+restores the table
schema, update the pg_subscription_rel, and perform COPY.

These operations should be done in the same transaction.

pg_restore wont be rollbackable, So we need to maintain states in pg_subscription_rel.

Yes. But I think it depends on how we restore them. For example, if we
have the tablesync worker somehow restore the table using a new SQL
function returning the table schema as we discussed, or executing the
dump file via SPI, we can do that in the same transaction.

5. After finishing COPY, the tablesync worker dumps indexes (and perhaps
constraints) of the table and creates them (which possibly takes a long time).
Then it starts to catch up, same as today. The apply worker needs to wait for the
tablesync worker to catch up.

I don’t think we can have CATCHUP stage. We can have a DDL on publisher which
can add a new column (And this DDL will be executed by applier later). Then we get a INSERT
because we have old definition of table, insert will fail.

All DMLs and DDLs associated with the table being synchronized are
applied by the tablesync worker until it catches up with the apply
worker.

We need to repeat these steps until we complete the initial data copy and create
indexes for all tables, IOW until all pg_subscription_rel status becomes READY.

6. If the apply worker confirms all tables are READY, it starts another sync
worker who is responsible for the post-data database objects such as TRIGGER,
RULE, POLICY etc (i.e. post-data).

While the sync worker is starting up or working, the apply worker applies
changes for pre-data database objects as well as READY tables.

We might have some issue if we have create table like
Create table_name as select * from materialized_view.

Could you elaborate on the scenario where we could have an issue with such DDL?

7. Similar to the tablesync worker, this sync worker creates its replication slot
and sets the returned LSN somewhere, say pg_subscription.

8. The sync worker dumps and restores these objects. Which could take a time
since it would need to create FK constraints. Then it starts to catch up if the
apply worker is ahead. The apply worker waits for the sync worker to catch up.

9. Once the sync worker catches up, the apply worker starts applying changes
for all database objects.

IIUC with this approach, we can resolve the concurrent DDL problem Sachin
mentioned, and indexes (and constraints) are created after the initial data copy.

The procedures are still very complex and not fully considered yet but I hope
there are some useful things at least for discussion.

Probably we can start with supporting only tables. In this case, we would not
need the post-data phase (i.e. step 6-9). It seems to me that we need to have
the subscription state somewhere (maybe
pg_subscription) so that the apply worker figure out the next step.
Since we need to dump and restore different objects on different timings, we
probably cannot directly use pg_dump/pg_restore. I've not considered how the
concurrent REFRESH PUBLICATION works.

I think above prototype will work and will have least amount of side effects, but
It might be too complex to implement and I am not sure about corner cases.

I was thinking of other ways of doing Initial Sync , which are less complex but each
with separate set of bottlenecks

On Publisher Side:-
1) Locking the publisher:- Easiest one to implement, applier process will get Access Shared
lock on the all the published tables. (We don't have to worry newly created concurrent table)
As tableSync will finish syncing the table, it will release table lock, So we will release
table locks in steps. Users can still perform DML on tables, but DDLs wont be allowed.

Do you mean that the apply worker acquires table locks and the
tablesync workers release them? If so, how can we implement it?

2) Using pg_dump/pg_restore for schema and data:- As Amit mentioned we can use pg_dump/
pg_restore [1], Although it might have side effect of using double storage , we can
table pg_dump of each table separately and delete the dump as soon as table is synced.
tableSync process will read the dump and call pg_restore on the table.
If we crash in middle of restoring the tables we can start pg_dump(--clean)/restore again
with left out tables.
With this we can reduce space usage but we might create too many files.

With this idea, who does pg_dump and pg_restore? and when do we create
pg_subscription_rel entries?

3) Using publisher snapshot:- Applier process will do pg_dump/pg_restore as usual,
Then applier process will start a new process P1 which will connect to
publisher and start a transaction , it will call pg_export_snapshot() to export the
snapshot.Then applier process will take snapshot string and pass it to the tableSync process
as a argument. tableSync will use this snapshot for COPY TABLE. tableSync should only
do COPY TABLE and then will exit , So we wont do any catchup phase in tableSync. After
all tables finish COPY table transaction will be committed by P1 process and it will exit.
In the case of crash/restart we can simple start from beginning since nothing is committed
till every table is synced. There are 2 main issues with this approach
1. I am not sure what side-effects we might have on publisher since we might have to keep
the transaction open for long time.

I'm concerned that it would not be an acceptable downside that we keep
a transaction open until all tables are synchronized.

2. Applier process will simple wait till all tables are synced.
since applier process wont be able to apply any wal_logs till all tables are synced
maybe instead of creating new process Applier process itself can start transaction/
export snapshot and tableSync process will use that snapshot. After all tables are synced
it can start wal_streaming.

I think that after users execute REFRESH PUBLICATION, there are mixed
non-ready and ready tables in the subscription. In this case, it's a
huge restriction for users that logical replication for the ready
tables stops until all newly-subscribed tables are synchronized.

Regards,

[1]: /messages/by-id/CAGPVpCRWEVhXa7ovrhuSQofx4to7o22oU9iKtrOgAOtz_=Y6vg@mail.gmail.com

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#37Kumar, Sachin
ssetiya@amazon.com
In reply to: Masahiko Sawada (#36)
RE: Initial Schema Sync for Logical Replication

From: Masahiko Sawada <sawada.mshk@gmail.com>

3. The apply worker launches the tablesync workers for tables that
need to be synchronized.

There might be DDLs executed on the publisher for tables before the
tablesync worker starts. But the apply worker needs to apply DDLs
for pre-data database objects. OTOH, it can ignore DDLs for
not-synced-yet tables and other database objects such as INDEX,
TRIGGER, RULE, etc (i.e. post-data).

4. The tablesync worker creates its replication slot, dumps+restores
the table schema, updates pg_subscription_rel, and performs COPY.

These operations should be done in the same transaction.

pg_restore won't be rollbackable, so we need to maintain states in
pg_subscription_rel.

Yes. But I think it depends on how we restore them. For example, if we have
the tablesync worker somehow restore the table using a new SQL function
returning the table schema as we discussed, or executing the dump file via
SPI, we can do that in the same transaction.

Okay.

5. After finishing COPY, the tablesync worker dumps indexes (and perhaps
constraints) of the table and creates them (which possibly takes a long time).

Then it starts to catch up, same as today. The apply worker needs to
wait for the tablesync worker to catch up.

I don't think we can have a CATCHUP stage. We can have a DDL on the
publisher which adds a new column (and this DDL will be executed by the
apply worker later). Then we get an INSERT and, because we have the old
definition of the table, the insert will fail.

All DMLs and DDLs associated with the table being synchronized are applied
by the tablesync worker until it catches up with the apply worker.

Right, sorry, I forgot that in the above case if the definition on the publisher changes we
will also have the corresponding DDLs.

We need to repeat these steps until we complete the initial data
copy and create indexes for all tables, IOW until all pg_subscription_rel
statuses become READY.

6. If the apply worker confirms all tables are READY, it starts
another sync worker who is responsible for the post-data database
objects such as TRIGGER, RULE, POLICY etc (i.e. post-data).

While the sync worker is starting up or working, the apply worker
applies changes for pre-data database objects as well as READY tables.

We might have an issue if we have a CREATE TABLE like CREATE TABLE
table_name AS SELECT * FROM materialized_view.

Could you elaborate on the scenario where we could have an issue with such
DDL?

Since the publisher's materialized view has not been created on the subscriber yet,
a DDL which does a CREATE TABLE using that materialized view will fail.
I am not sure how the DDL patch is handling CREATE TABLE AS statements.
If it is rewritten to become like a normal CREATE TABLE then we won't have any issues.

7. Similar to the tablesync worker, this sync worker creates its
replication slot and sets the returned LSN somewhere, say
pg_subscription.

8. The sync worker dumps and restores these objects, which could
take time since it would need to create FK constraints. Then it
starts to catch up if the apply worker is ahead. The apply worker waits for
the sync worker to catch up.

9. Once the sync worker catches up, the apply worker starts applying
changes for all database objects.

IIUC with this approach, we can resolve the concurrent DDL problem
Sachin mentioned, and indexes (and constraints) are created after the
initial data copy.

The procedures are still very complex and not fully considered yet
but I hope there are some useful things at least for discussion.

Probably we can start with supporting only tables. In this case, we
would not need the post-data phase (i.e. steps 6-9). It seems to me
that we need to keep the subscription state somewhere (maybe
pg_subscription) so that the apply worker can figure out the next step.
Since we need to dump and restore different objects at different
times, we probably cannot directly use pg_dump/pg_restore. I've
not considered how concurrent REFRESH PUBLICATION works.

I think the above prototype will work and will have the least amount of side
effects, but it might be too complex to implement and I am not sure about
corner cases.

I was thinking of other ways of doing the initial sync, which are less
complex but each with its own set of bottlenecks.

On Publisher Side:-
1) Locking the publisher:- Easiest one to implement. The apply process
will take an ACCESS SHARE lock on all the published tables (we don't
have to worry about newly created concurrent tables). As each tableSync finishes
syncing its table, it will release that table's lock, so we will release table locks in
steps. Users can still perform DML on the tables, but DDL won't be allowed.

Do you mean that the apply worker acquires table locks and the tablesync
workers release them? If so, how can we implement it?

I think releasing locks in steps would be impossible (given the postgres lock implementation),
so the apply process has to create a new transaction and lock all the published tables in
ACCESS SHARE mode. After tableSync is completed the transaction will be committed to release
the locks. So 1 and 3 are similar: we have to keep one transaction open till the tables are synced.
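
For illustration, the effect on the publisher would be roughly equivalent to the apply
process keeping a transaction like the following open until every table is synced (a
sketch only; the table names are placeholders):

    BEGIN;
    LOCK TABLE public.t1 IN ACCESS SHARE MODE;
    LOCK TABLE public.t2 IN ACCESS SHARE MODE;
    -- ... one LOCK per published table ...
    -- DML on these tables keeps working, but DDL that needs ACCESS EXCLUSIVE
    -- (e.g. ALTER TABLE, DROP TABLE) blocks here
    COMMIT;  -- only after all tableSync workers have finished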

2) Using pg_dump/pg_restore for schema and data:- As Amit mentioned we
can use pg_dump/pg_restore [1]. Although it might have the side effect of
using double the storage, we can take a pg_dump of each table separately and
delete the dump as soon as the table is synced.

The tableSync process will read the dump and call pg_restore for the table.
If we crash in the middle of restoring the tables we can run
pg_dump(--clean)/restore again for the remaining tables.
With this we can reduce space usage but we might create too many files.

With this idea, who runs pg_dump and pg_restore? And when do we create the
pg_subscription_rel entries?

The apply process will do pg_dump/pg_restore. pg_subscription_rel entries can be created
after pg_restore; we can create a new column with the relname and keep the OID empty, as
you have suggested earlier.
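
As a rough sketch of the per-table dump/restore step (standard pg_dump/pg_restore options;
the connection strings and the snapshot name are placeholders):

    pg_dump "$PUBLISHER_CONNINFO" --table=public.t1 --snapshot=00000003-0000001B-1 \
            -Fc --file /tmp/t1.dump
    pg_restore --dbname="$SUBSCRIBER_DB" /tmp/t1.dump
    rm /tmp/t1.dump    # free the space as soon as the table is synced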

3) Using publisher snapshot:- The apply process will do
pg_dump/pg_restore as usual. Then the apply process will start a new
process P1 which will connect to the publisher and start a transaction;
it will call pg_export_snapshot() to export the snapshot. Then the apply
process will take the snapshot string and pass it to the tableSync process
as an argument. tableSync will use this snapshot for COPY TABLE.
tableSync should only do COPY TABLE and then exit, so we won't have
any catchup phase in tableSync. After all tables finish COPY, the transaction
will be committed by the P1 process and it will exit.

In the case of crash/restart we can simply start from the beginning since
nothing is committed till every table is synced. There are 2 main
issues with this approach. 1. I am not sure what side effects we might
have on the publisher since we might have to keep the transaction open for
a long time.

I'm concerned that it would not be an acceptable downside that we keep a
transaction open until all tables are synchronized.

Okay. There is one more issue: just using the same snapshot will not stop table DDL
modifications; we need to have at least an ACCESS SHARE lock on each table.
So this will keep the tables locked on the publisher, which is essentially the same as 1.
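
For reference, the snapshot hand-off itself would use the standard snapshot-export syntax,
roughly like this (the snapshot name below is a placeholder for whatever
pg_export_snapshot() returns):

    -- session P1 on the publisher, kept open until all tables are copied
    BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
    SELECT pg_export_snapshot();          -- e.g. returns '00000003-0000001B-1'

    -- each tableSync connection to the publisher
    BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
    SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
    COPY public.t1 TO STDOUT;             -- sees the same consistent snapshot
    COMMIT;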

Regards
Sachin

#38Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Masahiko Sawada (#34)
Re: Initial Schema Sync for Logical Replication

On Thu, Mar 30, 2023 at 10:11 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Thu, Mar 30, 2023 at 12:18 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Wed, Mar 29, 2023 at 7:57 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

From: Amit Kapila <amit.kapila16@gmail.com>

I think we won't be able to use the same snapshot because the
transaction will be committed.
In CreateSubscription() we can use the transaction snapshot from
walrcv_create_slot() till walrcv_disconnect() is called. (I am
not sure about this part; maybe walrcv_disconnect() commits
internally?)

So somehow we need to keep this snapshot alive, even after the
transaction is committed (or delay committing the transaction,
but we can have CREATE SUBSCRIPTION with ENABLED=FALSE, so we
can have a restart before tableSync is able to use the same
snapshot).

Can we think of getting the table data as well, along with the schema,
via pg_dump? Won't both schema and initial data then
correspond to the same snapshot?

Right, that will work, thanks!

While it works, we cannot get the initial data in parallel, no?

I was thinking each tableSync process will call pg_dump --table. This way if we have N
tableSync processes, we can have N pg_dump --table=table_name invocations running in parallel.
In fact we can use --schema-only to get the schema and then let COPY take care of data
syncing. We will use the same snapshot for pg_dump as well as for the COPY of the table.
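
As a concrete sketch of that per-table flow (the connection strings and the snapshot name
are placeholders for whatever the tableSync worker has at that point):

    pg_dump "$PUBLISHER_CONNINFO" --schema-only --table=public.t1 \
            --snapshot=00000003-0000001B-1 | psql "$SUBSCRIBER_DB"
    # ...then COPY public.t1 runs on the publisher connection that uses the same
    # snapshot, so the table definition and the copied data are consistent.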

How can we postpone creating the pg_subscription_rel entries until the
tablesync worker starts and does the schema sync? I think that since
pg_subscription_rel entry needs the table OID, we need either to do
the schema sync before creating the entry (i.e, during CREATE
SUBSCRIPTION) or to postpone creating entries as Amit proposed[1]. The
apply worker needs the information of tables to sync in order to
launch the tablesync workers, but it needs to create the table schema
to get that information.

For the above reason, I think that step 6 of the initial proposal won't work.

If we can have the tablesync worker create an entry of
pg_subscription_rel after creating the table, it may give us the
flexibility to perform the initial sync. One idea is that we add a
relname field to pg_subscription_rel so that we can create entries
with relname instead of OID if the table is not created yet. Once the
table is created, we clear the relname field and set the OID of the
table instead. It's not an ideal solution but we might make it simpler
later.

While writing a PoC patch, I found some difficulties in this idea.
First, I tried to add schemaname+relname to pg_subscription_rel but I
could not define the primary key of pg_subscription_rel. The primary
key on (srsubid, srrelid) doesn't work since srrelid could be NULL.
Similarly, the primary key on (srsubid, srrelid, schemaname, relname)
also doesn't work. So I tried another idea: that we generate a new OID
for srrelid and the tablesync worker will replace it with the new
table's OID once it creates the table. However, since we use srrelid
in replication slot names, changing srrelid during the initial
schema+data sync is not straightforward (please note that the slot is
created by the tablesync worker but is removed by the apply worker).
Using relname in slot name instead of srrelid is not a good idea since
it requires all pg_subscription_rel entries have relname, and slot
names could be duplicated, for example, when the relname is very long
and we cut it.

I'm trying to consider the idea from another angle: the apply worker
fetches the table list and passes the relname to the tablesync worker.
But a problem of this approach is that the table list is not
persisted. If the apply worker restarts during the initial table sync,
it could not get the same list as before.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#39Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#38)
Re: Initial Schema Sync for Logical Replication

On Thu, Apr 6, 2023 at 6:57 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Thu, Mar 30, 2023 at 10:11 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Thu, Mar 30, 2023 at 12:18 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

How can we postpone creating the pg_subscription_rel entries until the
tablesync worker starts and does the schema sync? I think that since
pg_subscription_rel entry needs the table OID, we need either to do
the schema sync before creating the entry (i.e, during CREATE
SUBSCRIPTION) or to postpone creating entries as Amit proposed[1]. The
apply worker needs the information of tables to sync in order to
launch the tablesync workers, but it needs to create the table schema
to get that information.

For the above reason, I think that step 6 of the initial proposal won't work.

If we can have the tablesync worker create an entry of
pg_subscription_rel after creating the table, it may give us the
flexibility to perform the initial sync. One idea is that we add a
relname field to pg_subscription_rel so that we can create entries
with relname instead of OID if the table is not created yet. Once the
table is created, we clear the relname field and set the OID of the
table instead. It's not an ideal solution but we might make it simpler
later.

While writing a PoC patch, I found some difficulties in this idea.
First, I tried to add schemaname+relname to pg_subscription_rel but I
could not define the primary key of pg_subscription_rel. The primary
key on (srsubid, srrelid) doesn't work since srrelid could be NULL.
Similarly, the primary key on (srsubid, srrelid, schemaname, relname)
also doesn't work.

Can we think of having a separate catalog table, say
pg_subscription_remote_rel, for this? You can have srsubid,
remote_schema_name, remote_rel_name, etc. We may need some other state
to be maintained during the initial schema sync where this table can
be used. Basically, this can be used to maintain the state till the
initial schema sync is complete, because we can create a relation entry
in pg_subscription_rel only after the initial schema sync is
complete.
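
Just to make the shape of that idea concrete, such a catalog could look roughly like the
following (SQL shown only for illustration; the srsubstate column is an assumed
placeholder for the "other state" mentioned above, not something decided here):

    CREATE TABLE pg_subscription_remote_rel (
        srsubid            oid    NOT NULL,  -- owning subscription
        remote_schema_name name   NOT NULL,  -- schema name on the publisher
        remote_rel_name    name   NOT NULL,  -- table name on the publisher
        srsubstate         "char" NOT NULL   -- sync state until the local table exists
    );

Once the tablesync worker has created the table locally, the entry could be replaced by a
normal pg_subscription_rel row keyed by the local OID.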

So I tried another idea: that we generate a new OID
for srrelid and the tablesync worker will replace it with the new
table's OID once it creates the table. However, since we use srrelid
in replication slot names, changing srrelid during the initial
schema+data sync is not straightforward (please note that the slot is
created by the tablesync worker but is removed by the apply worker).
Using relname in slot name instead of srrelid is not a good idea since
it requires all pg_subscription_rel entries have relname, and slot
names could be duplicated, for example, when the relname is very long
and we cut it.

I'm trying to consider the idea from another angle: the apply worker
fetches the table list and passes the relname to the tablesync worker.
But a problem of this approach is that the table list is not
persisted. If the apply worker restarts during the initial table sync,
it could not get the same list as before.

Agreed, this has some drawbacks. We can try to explore this if the
above idea of the new catalog table doesn't solve this problem.

--
With Regards,
Amit Kapila.

#40Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#39)
Re: Initial Schema Sync for Logical Replication

On Fri, Apr 7, 2023 at 6:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Apr 6, 2023 at 6:57 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Thu, Mar 30, 2023 at 10:11 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Thu, Mar 30, 2023 at 12:18 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

How can we postpone creating the pg_subscription_rel entries until the
tablesync worker starts and does the schema sync? I think that since
pg_subscription_rel entry needs the table OID, we need either to do
the schema sync before creating the entry (i.e, during CREATE
SUBSCRIPTION) or to postpone creating entries as Amit proposed[1]. The
apply worker needs the information of tables to sync in order to
launch the tablesync workers, but it needs to create the table schema
to get that information.

For the above reason, I think that step 6 of the initial proposal won't work.

If we can have the tablesync worker create an entry of
pg_subscription_rel after creating the table, it may give us the
flexibility to perform the initial sync. One idea is that we add a
relname field to pg_subscription_rel so that we can create entries
with relname instead of OID if the table is not created yet. Once the
table is created, we clear the relname field and set the OID of the
table instead. It's not an ideal solution but we might make it simpler
later.

While writing a PoC patch, I found some difficulties in this idea.
First, I tried to add schemaname+relname to pg_subscription_rel but I
could not define the primary key of pg_subscription_rel. The primary
key on (srsubid, srrelid) doesn't work since srrelid could be NULL.
Similarly, the primary key on (srsubid, srrelid, schemaname, relname)
also doesn't work.

Can we think of having a separate catalog table say
pg_subscription_remote_rel for this? You can have srsubid,
remote_schema_name, remote_rel_name, etc. We may need some other state
to be maintained during the initial schema sync where this table can
be used. Basically, this can be used to maintain the state till the
initial schema sync is complete because we can create a relation entry
in pg_subscription_rel only after the initial schema sync is
complete.

It might not be ideal but I guess it works. But I think we need to
modify the name of the replication slot for the initial sync, as it currently
includes the OID of the table:

void
ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
                                char *syncslotname, Size szslot)
{
    snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
             relid, GetSystemIdentifier());
}

If we use both schema name and table name, it's possible that slot
names are duplicated if schema and/or table names are long. Another
idea is to use the hash value of schema+table names, but it cannot
completely eliminate that possibility, and probably would make
investigation and debugging hard in case of any failure. Probably we
can use the OID of each entry in pg_subscription_remote_rel instead,
but I'm not sure it's a good idea, mainly because we will end up using
twice as many OIDs as before.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#41Kumar, Sachin
ssetiya@amazon.com
In reply to: Masahiko Sawada (#40)
RE: Initial Schema Sync for Logical Replication

From: Masahiko Sawada <sawada.mshk@gmail.com>

While writing a PoC patch, I found some difficulties in this idea.
First, I tried to add schemaname+relname to pg_subscription_rel but
I could not define the primary key of pg_subscription_rel. The
primary key on (srsubid, srrelid) doesn't work since srrelid could be NULL.
Similarly, the primary key on (srsubid, srrelid, schemaname,
relname) also doesn't work.

Can we think of having a separate catalog table say
pg_subscription_remote_rel for this? You can have srsubid,
remote_schema_name, remote_rel_name, etc. We may need some other

state

to be maintained during the initial schema sync where this table can
be used. Basically, this can be used to maintain the state till the
initial schema sync is complete because we can create a relation entry
in pg_subscription_rel only after the initial schema sync is
complete.

It might not be ideal but I guess it works. But I think we need to modify the name
of replication slot for initial sync as it currently includes OID of the table:

void
ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
                                char *syncslotname, Size szslot)
{
    snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
             relid, GetSystemIdentifier());
}

If we use both schema name and table name, it's possible that slot names are
duplicated if schema and/or table names are long. Another idea is to use the
hash value of schema+table names, but it cannot completely eliminate that
possibility, and probably would make investigation and debugging hard in case
of any failure. Probably we can use the OID of each entry in
pg_subscription_remote_rel instead, but I'm not sure it's a good idea, mainly
because we will end up using twice as many OIDs as before.

Maybe we can create a serial primary key for the pg_subscription_remote_rel table
and use this key for creating the replication slot?

Regards
Sachin

#42Kumar, Sachin
ssetiya@amazon.com
In reply to: Kumar, Sachin (#41)
RE: Initial Schema Sync for Logical Replication

I am working on a prototype with the above idea, and will send it for review by Sunday/Monday.

Regards
Sachin

#43Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#40)
Re: Initial Schema Sync for Logical Replication

On Mon, Apr 17, 2023 at 9:12 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Fri, Apr 7, 2023 at 6:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Apr 6, 2023 at 6:57 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

While writing a PoC patch, I found some difficulties in this idea.
First, I tried to add schemaname+relname to pg_subscription_rel but I
could not define the primary key of pg_subscription_rel. The primary
key on (srsubid, srrelid) doesn't work since srrelid could be NULL.
Similarly, the primary key on (srsubid, srrelid, schemaname, relname)
also doesn't work.

Can we think of having a separate catalog table say
pg_subscription_remote_rel for this? You can have srsubid,
remote_schema_name, remote_rel_name, etc. We may need some other state
to be maintained during the initial schema sync where this table can
be used. Basically, this can be used to maintain the state till the
initial schema sync is complete because we can create a relation entry
in pg_subscription_rel only after the initial schema sync is
complete.

It might not be ideal but I guess it works. But I think we need to
modify the name of replication slot for initial sync as it currently
includes OID of the table:

void
ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
char *syncslotname, Size szslot)
{
snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
relid, GetSystemIdentifier());
}

If we use both schema name and table name, it's possible that slot
names are duplicated if schema and/or table names are long. Another
idea is to use the hash value of schema+table names, but it cannot
completely eliminate that possibility, and probably would make
investigation and debugging hard in case of any failure. Probably we
can use the OID of each entry in pg_subscription_remote_rel instead,
but I'm not sure it's a good idea, mainly because we will end up using
twice as many OIDs as before.

The other possibility is to use worker_pid. To make debugging easier,
we may want to LOG schema_name+rel_name vs slot_name mapping at DEBUG1
log level.

--
With Regards,
Amit Kapila.

#44Kumar, Sachin
ssetiya@amazon.com
In reply to: Kumar, Sachin (#41)
Re: Initial Schema Sync for Logical Replication

I am working on a prototype with the above discussed idea; I think I will send it for initial review by Monday.

Regards
Sachin

#45Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Kumar, Sachin (#44)
Re: Initial Schema Sync for Logical Replication

On Thu, Apr 20, 2023 at 9:41 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

I am working on a prototype with the above discussed idea; I think I will send it for initial review by Monday.

Okay, but which idea are you referring to? The pg_subscription_remote_rel
+ worker_pid idea Amit proposed?

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#46Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#43)
1 attachment(s)
Re: Initial Schema Sync for Logical Replication

On Thu, Apr 20, 2023 at 8:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Apr 17, 2023 at 9:12 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Fri, Apr 7, 2023 at 6:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Apr 6, 2023 at 6:57 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

While writing a PoC patch, I found some difficulties in this idea.
First, I tried to add schemaname+relname to pg_subscription_rel but I
could not define the primary key of pg_subscription_rel. The primary
key on (srsubid, srrelid) doesn't work since srrelid could be NULL.
Similarly, the primary key on (srsubid, srrelid, schemaname, relname)
also doesn't work.

Can we think of having a separate catalog table say
pg_subscription_remote_rel for this? You can have srsubid,
remote_schema_name, remote_rel_name, etc. We may need some other state
to be maintained during the initial schema sync where this table can
be used. Basically, this can be used to maintain the state till the
initial schema sync is complete because we can create a relation entry
in pg_subscription_rel only after the initial schema sync is
complete.

It might not be ideal but I guess it works. But I think we need to
modify the name of replication slot for initial sync as it currently
includes OID of the table:

void
ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
char *syncslotname, Size szslot)
{
snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
relid, GetSystemIdentifier());
}

If we use both schema name and table name, it's possible that slot
names are duplicated if schema and/or table names are long. Another
idea is to use the hash value of schema+table names, but it cannot
completely eliminate that possibility, and probably would make
investigation and debugging hard in case of any failure. Probably we
can use the OID of each entry in pg_subscription_remote_rel instead,
but I'm not sure it's a good idea, mainly because we will end up using
twice as many OIDs as before.

The other possibility is to use worker_pid. To make debugging easier,
we may want to LOG schema_name+rel_name vs slot_name mapping at DEBUG1
log level.

Since worker_pid changes after the worker restarts, a new worker
cannot find the slot that had been used, no?

After thinking it over, a better solution would be to add an oid
column, an nspname column, and a relname column to pg_subscription_rel, with
the primary key on the oid. If the table is not present on the
subscriber we store the schema name and table name in the catalog;
otherwise we store the local table OID, same as today. The local table
OID will be filled in after the schema sync. The names of the origin and
replication slot the tablesync worker uses are based on this oid instead of the
table OID.

I've attached a PoC patch of this idea (very rough patch and has many
TODO comments). It mixes the following changes:

1. Add oid column to the pg_subscription_rel. The oid is used as the
primary key and in the names of origin and slot the tablesync workers
use.

2. Add copy_schema = on/off option to CREATE SUBSCRIPTION (not yet
support for ALTER SUBSCRIPTION).

3. Add CRS_EXPORT_USE_SNAPSHOT new action in order to use the same
snapshot by both walsender and other processes (e.g. pg_dump). In this
patch, the snapshot is exported for pg_dump and is used by the
walsender for COPY.

It seems to work well, but there might be pitfalls as I've not fully
implemented it.
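
For reference, with the attached PoC the subscription-side usage would look roughly like
the following (connection string and object names are placeholders):

    CREATE SUBSCRIPTION mysub
        CONNECTION 'host=publisher.example.com dbname=postgres'
        PUBLICATION mypub
        WITH (copy_schema = on, copy_data = on);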

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

Attachments:

0001-Poc-initial-table-structure-synchronization-in-logic.patch
From ed0512f321bc98dc16a2bc2cc84a919325060685 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 21 Apr 2023 17:14:04 +0900
Subject: [PATCH] Poc: initial table structure synchronization in logical
 replication.

The key idea is to postpone creation of pg_subscription_rel entry to
the point where the tablesync worker synchronizes and creates the
table on the subscriber.

It is a PoC patch so mixed the following changes:

- Add oid column to the pg_subscription_rel.
  - use it as the primary key.
  - use it in the names of origin and slot the tablesync workers use.
- Add copy_schema = on/off option to CREATE SUBSCRIPTION.
  - not yet support for ALTER SUBSCRIPTION.
- Add CRS_EXPORT_USE_SNAPSHOT new action.
  - to use the same snapshot by walsender AND other processes (e.g. pg_dump).
  - the snapshot is exported for pg_dump and is used for COPY.
---
 src/backend/catalog/heap.c                    |   2 +-
 src/backend/catalog/pg_subscription.c         | 270 +++++++++++++++---
 src/backend/catalog/system_views.sql          |   2 +-
 src/backend/commands/subscriptioncmds.c       | 107 +++++--
 .../libpqwalreceiver/libpqwalreceiver.c       |   6 +
 .../replication/logical/applyparallelworker.c |   2 +-
 src/backend/replication/logical/launcher.c    |  34 +--
 src/backend/replication/logical/relation.c    |   6 +-
 src/backend/replication/logical/snapbuild.c   |  25 +-
 src/backend/replication/logical/tablesync.c   | 246 ++++++++++++----
 src/backend/replication/logical/worker.c      |  18 +-
 src/backend/replication/walsender.c           |  11 +-
 src/backend/utils/cache/syscache.c            |   7 +-
 src/include/catalog/pg_proc.dat               |   2 +-
 src/include/catalog/pg_subscription_rel.h     |  39 ++-
 src/include/replication/snapbuild.h           |   2 +-
 src/include/replication/walsender.h           |   3 +-
 src/include/replication/worker_internal.h     |  10 +-
 src/include/utils/syscache.h                  |   2 +-
 19 files changed, 629 insertions(+), 165 deletions(-)

diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 2a0d82aedd..32414c30cf 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -1865,7 +1865,7 @@ heap_drop_with_catalog(Oid relid)
 	/*
 	 * Remove any associated relation synchronization states.
 	 */
-	RemoveSubscriptionRel(InvalidOid, relid);
+	RemoveSubscriptionRel(InvalidOid, relid, InvalidOid);
 
 	/*
 	 * Forget any ON COMMIT action for the rel
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88ce28..b52168ce9b 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -35,6 +35,7 @@
 #include "utils/syscache.h"
 
 static List *textarray_to_stringlist(ArrayType *textarray);
+static SubscriptionRelState *deconstruct_subrelstate(HeapTuple tup);
 
 /*
  * Fetch the subscription from the syscache.
@@ -227,8 +228,8 @@ textarray_to_stringlist(ArrayType *textarray)
  * Add new state record for a subscription table.
  */
 void
-AddSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn)
+AddSubscriptionRelState(Oid subid, Oid relid, char state, bool syncschema, bool syncdata,
+						XLogRecPtr sublsn, char *nspname, char *relname)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -239,25 +240,40 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
 
-	/* Try finding existing mapping. */
-	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
-							  ObjectIdGetDatum(relid),
-							  ObjectIdGetDatum(subid));
-	if (HeapTupleIsValid(tup))
-		elog(ERROR, "subscription table %u in subscription %u already exists",
-			 relid, subid);
+	/* XXX: existence check */
 
 	/* Form the tuple. */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
+	values[Anum_pg_subscription_rel_oid - 1] =
+		GetNewOidWithIndex(rel, SubscriptionRelObjectIdIndexId,
+						   Anum_pg_subscription_rel_oid);
 	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+
+	if (OidIsValid(relid))
+		values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+	else
+		nulls[Anum_pg_subscription_rel_srrelid - 1] = true;
+
 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
 	if (sublsn != InvalidXLogRecPtr)
 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
 	else
 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
+	if (nspname)
+		values[Anum_pg_subscription_rel_srnspname - 1] = CStringGetDatum(nspname);
+	else
+		nulls[Anum_pg_subscription_rel_srnspname - 1] = true;
+
+	if (relname)
+		values[Anum_pg_subscription_rel_srrelname - 1] = CStringGetDatum(relname);
+	else
+		nulls[Anum_pg_subscription_rel_srrelname - 1] = true;
+
+	values[Anum_pg_subscription_rel_srsyncschema - 1] = BoolGetDatum(syncschema);
+	values[Anum_pg_subscription_rel_srsyncdata - 1] = BoolGetDatum(syncdata);
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -269,11 +285,49 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 	table_close(rel, NoLock);
 }
 
+/* Update srrelid to the given relid */
+void
+UpdateSubscriptionRelRelid(Oid subid, Oid subrelid, Oid relid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid));
+
+	/* XXX: need to distinguish from message in UpdateSubscriptionRelState() */
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 subrelid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_rel_srrelid - 1] = true;
+	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	table_close(rel, NoLock);
+}
+
 /*
  * Update the state of a subscription table.
  */
 void
-UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+UpdateSubscriptionRelState(Oid subid, Oid subrelid, char state,
 						   XLogRecPtr sublsn)
 {
 	Relation	rel;
@@ -287,12 +341,10 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	/* Try finding existing mapping. */
-	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
-							  ObjectIdGetDatum(relid),
-							  ObjectIdGetDatum(subid));
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid));
 	if (!HeapTupleIsValid(tup))
 		elog(ERROR, "subscription table %u in subscription %u does not exist",
-			 relid, subid);
+			 subrelid, subid);
 
 	/* Update the tuple. */
 	memset(values, 0, sizeof(values));
@@ -318,13 +370,76 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	table_close(rel, NoLock);
 }
 
+/*
+ * Similar to GetSubscriptionRelState, but search the entry by the relid.
+ *
+ * XXX: Note that we cannot use syscache for searching the pg_subscription_rel entry
+ * by (srsubid, srrelid) because cache key columns should always be not  NULL (see
+ * CatalogCacheInitializeCache() for details).
+ *
+ * XXX: duplicated with GetSubscriptionRelState().
+ */
+char
+GetSubscriptoinRelStateByRelid(Oid subid, Oid relid, XLogRecPtr *sublsn)
+{
+	Relation rel;
+	ScanKeyData skey[2];
+	SysScanDesc scan;
+	HeapTuple tup;
+	char substate;
+	Datum d;
+	bool isnull;
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[0],
+				Anum_pg_subscription_rel_srrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(relid));
+	ScanKeyInit(&skey[1],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	scan = systable_beginscan(rel, SubscriptionRelSrrelidSrsubidIndexId, true,
+							  NULL, 2, skey);
+
+	tup = systable_getnext(scan);
+
+
+	if (!HeapTupleIsValid(tup))
+	{
+		systable_endscan(scan);
+		table_close(rel, AccessShareLock);
+		*sublsn = InvalidXLogRecPtr;
+		return SUBREL_STATE_UNKNOWN;
+	}
+
+	/* Get the state. */
+	substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
+
+	/* Get the LSN */
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
+						Anum_pg_subscription_rel_srsublsn, &isnull);
+	if (isnull)
+		*sublsn = InvalidXLogRecPtr;
+	else
+		*sublsn = DatumGetLSN(d);
+
+	/* Cleanup */
+	systable_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	return substate;
+}
+
 /*
  * Get state of subscription table.
  *
  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
  */
 char
-GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
+GetSubscriptionRelState(Oid subid, Oid subrelid, XLogRecPtr *sublsn)
 {
 	HeapTuple	tup;
 	char		substate;
@@ -339,9 +454,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
 
 	/* Try finding the mapping. */
-	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
-						  ObjectIdGetDatum(relid),
-						  ObjectIdGetDatum(subid));
+	tup = SearchSysCache1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid));
 
 	if (!HeapTupleIsValid(tup))
 	{
@@ -354,7 +467,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 	substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
 
 	/* Get the LSN */
-	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
 						Anum_pg_subscription_rel_srsublsn, &isnull);
 	if (isnull)
 		*sublsn = InvalidXLogRecPtr;
@@ -369,16 +482,105 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 	return substate;
 }
 
+/* Get palloc'ed SubscriptionRelState of the given subrelid */
+SubscriptionRelState *
+GetSubscriptionRelByOid(Oid subrelid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	SubscriptionRelState *relstate;
+
+	/*
+	 * This is to avoid the race condition with AlterSubscription which tries
+	 * to remove this relstate.
+	 */
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	tup = SearchSysCache1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid));
+
+	if (!HeapTupleIsValid(tup))
+	{
+		table_close(rel, AccessShareLock);
+		return NULL;
+	}
+
+	relstate = deconstruct_subrelstate(tup);
+
+	/* Cleanup */
+	ReleaseSysCache(tup);
+	table_close(rel, AccessShareLock);
+
+	return relstate;
+}
+
+/*
+ * Extract subscription relation state information from the heap tuple and return palloc'ed
+ * SubscriptionRelState.
+ */
+static SubscriptionRelState *
+deconstruct_subrelstate(HeapTuple tup)
+{
+	Form_pg_subscription_rel subrel_form;
+	SubscriptionRelState *relstate;
+	Datum d;
+	bool isnull;
+
+	subrel_form = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+	relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
+	relstate->oid = subrel_form->oid;
+
+	/* Get the LSN */
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
+						Anum_pg_subscription_rel_srrelid, &isnull);
+	if (isnull)
+		relstate->relid = InvalidOid;
+	else
+		relstate->relid = DatumGetObjectId(d);
+
+	/* Get the LSN */
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
+						Anum_pg_subscription_rel_srsublsn, &isnull);
+	if (isnull)
+		relstate->lsn = InvalidXLogRecPtr;
+	else
+		relstate->lsn = DatumGetLSN(d);
+
+	relstate->state = subrel_form->srsubstate;
+
+	/* srnspname */
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
+						Anum_pg_subscription_rel_srnspname, &isnull);
+	if (isnull)
+		relstate->nspname = NULL;
+	else
+		relstate->nspname = pstrdup(NameStr(*DatumGetName(d)));
+
+	/* srrelname */
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
+						Anum_pg_subscription_rel_srrelname, &isnull);
+	if (isnull)
+		relstate->relname = NULL;
+	else
+		relstate->relname = pstrdup(NameStr(*DatumGetName(d)));
+
+	/* syncflags */
+	relstate->syncflags =
+		(((subrel_form->srsyncschema) ? SUBREL_SYNC_KIND_SCHEMA : 0) |
+		 ((subrel_form->srsyncdata) ? SUBREL_SYNC_KIND_DATA : 0));
+
+	return relstate;
+}
 /*
  * Drop subscription relation mapping. These can be for a particular
  * subscription, or for a particular relation, or both.
  */
 void
-RemoveSubscriptionRel(Oid subid, Oid relid)
+RemoveSubscriptionRel(Oid subid, Oid relid, Oid subrelid)
 {
 	Relation	rel;
 	TableScanDesc scan;
-	ScanKeyData skey[2];
+	ScanKeyData skey[3];
 	HeapTuple	tup;
 	int			nkeys = 0;
 
@@ -402,6 +604,15 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 					ObjectIdGetDatum(relid));
 	}
 
+	if (OidIsValid(subrelid))
+	{
+		ScanKeyInit(&skey[nkeys++],
+					Anum_pg_subscription_rel_oid,
+					BTEqualStrategyNumber,
+					F_OIDEQ,
+					ObjectIdGetDatum(subrelid));
+	}
+
 	/* Do the search and delete what we found. */
 	scan = table_beginscan_catalog(rel, nkeys, skey);
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
@@ -511,22 +722,9 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 
 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
 	{
-		Form_pg_subscription_rel subrel;
 		SubscriptionRelState *relstate;
-		Datum		d;
-		bool		isnull;
-
-		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
 
-		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
-		relstate->relid = subrel->srrelid;
-		relstate->state = subrel->srsubstate;
-		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
-							Anum_pg_subscription_rel_srsublsn, &isnull);
-		if (isnull)
-			relstate->lsn = InvalidXLogRecPtr;
-		else
-			relstate->lsn = DatumGetLSN(d);
+		relstate = deconstruct_subrelstate(tup);
 
 		res = lappend(res, relstate);
 	}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 48aacf66ee..de9988e72e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -951,7 +951,7 @@ CREATE VIEW pg_stat_subscription AS
             su.subname,
             st.pid,
             st.leader_pid,
-            st.relid,
+            st.subrelid,
             st.received_lsn,
             st.last_msg_send_time,
             st.last_msg_receipt_time,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 56eafbff10..a08412fb11 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -71,6 +71,7 @@
 #define SUBOPT_RUN_AS_OWNER			0x00001000
 #define SUBOPT_LSN					0x00002000
 #define SUBOPT_ORIGIN				0x00004000
+#define SUBOPT_COPY_SCHEMA			0x00008000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -88,6 +89,8 @@ typedef struct SubOpts
 	bool		enabled;
 	bool		create_slot;
 	bool		copy_data;
+	/* XXX: want to choose synchronizing only tables or all objects? */
+	bool		copy_schema;
 	bool		refresh;
 	bool		binary;
 	char		streaming;
@@ -141,6 +144,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->create_slot = true;
 	if (IsSet(supported_opts, SUBOPT_COPY_DATA))
 		opts->copy_data = true;
+	if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA))
+		opts->copy_data = true;
 	if (IsSet(supported_opts, SUBOPT_REFRESH))
 		opts->refresh = true;
 	if (IsSet(supported_opts, SUBOPT_BINARY))
@@ -214,6 +219,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_COPY_DATA;
 			opts->copy_data = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA) &&
+				 strcmp(defel->defname, "copy_schema") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_COPY_SCHEMA;
+			opts->copy_schema = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
 				 strcmp(defel->defname, "synchronous_commit") == 0)
 		{
@@ -388,10 +402,18 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "copy_data = true")));
 
+		if (opts->copy_schema &&
+			IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "copy_schema = true")));
+
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
 		opts->copy_data = false;
+		opts->copy_schema = false;
 	}
 
 	/*
@@ -587,7 +609,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	 * Connection and publication should not be specified here.
 	 */
 	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
-					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
+					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
@@ -752,7 +774,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
 			 */
-			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+			if (opts.copy_data || opts.copy_schema)
+				table_state = SUBREL_STATE_INIT;
+			else
+				table_state = SUBREL_STATE_READY;
 
 			/*
 			 * Get the table list from publisher and build local table status
@@ -762,16 +787,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			foreach(lc, tables)
 			{
 				RangeVar   *rv = (RangeVar *) lfirst(lc);
-				Oid			relid;
+				Oid			relid = InvalidOid;
+				char 		*nspname, *relname = NULL;
 
-				relid = RangeVarGetRelid(rv, AccessShareLock, false);
+				if (opts.copy_schema)
+				{
+					nspname = rv->schemaname;
+					relname = rv->relname;
+				}
+				else
+				{
+					/* The relation should already be present on the subscriber */
+					relid = RangeVarGetRelid(rv, AccessShareLock, false);
 
-				/* Check for supported relkind. */
-				CheckSubscriptionRelkind(get_rel_relkind(relid),
-										 rv->schemaname, rv->relname);
+					/* Check for supported relkind. */
+					CheckSubscriptionRelkind(get_rel_relkind(relid),
+											 rv->schemaname, rv->relname);
+				}
 
-				AddSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr);
+				AddSubscriptionRelState(subid, relid, table_state, opts.copy_schema,
+										opts.copy_data, InvalidXLogRecPtr, nspname,
+										relname);
 			}
 
 			/*
@@ -898,7 +934,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		{
 			SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
 
-			subrel_local_oids[off++] = relstate->relid;
+			subrel_local_oids[off++] = relstate->oid;
 		}
 		qsort(subrel_local_oids, subrel_count,
 			  sizeof(Oid), oid_cmp);
@@ -939,9 +975,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			if (!bsearch(&relid, subrel_local_oids,
 						 subrel_count, sizeof(Oid), oid_cmp))
 			{
+				/* XXX: support sync schema */
 				AddSubscriptionRelState(sub->oid, relid,
 										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-										InvalidXLogRecPtr);
+										false, copy_data,
+										InvalidXLogRecPtr, NULL, NULL);
 				ereport(DEBUG1,
 						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
 										 rv->schemaname, rv->relname, sub->name)));
@@ -958,13 +996,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		remove_rel_len = 0;
 		for (off = 0; off < subrel_count; off++)
 		{
-			Oid			relid = subrel_local_oids[off];
+			Oid			subrelid = subrel_local_oids[off];
 
-			if (!bsearch(&relid, pubrel_local_oids,
+			if (!bsearch(&subrelid, pubrel_local_oids,
 						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
 			{
+				SubscriptionRelState *relstate;
 				char		state;
-				XLogRecPtr	statelsn;
 
 				/*
 				 * Lock pg_subscription_rel with AccessExclusiveLock to
@@ -984,14 +1022,16 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 					rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
 
 				/* Last known rel state. */
-				state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+				relstate = GetSubscriptionRelByOid(subrelid);
 
-				sub_remove_rels[remove_rel_len].relid = relid;
+				state = relstate->state;
+
+				sub_remove_rels[remove_rel_len].relid = subrelid;
 				sub_remove_rels[remove_rel_len++].state = state;
 
-				RemoveSubscriptionRel(sub->oid, relid);
+				RemoveSubscriptionRel(InvalidOid, InvalidOid, subrelid);
 
-				logicalrep_worker_stop(sub->oid, relid);
+				logicalrep_worker_stop(sub->oid, subrelid);
 
 				/*
 				 * For READY state, we would have already dropped the
@@ -1011,16 +1051,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 					 * origin and by this time the origin might be already
 					 * removed. For these reasons, passing missing_ok = true.
 					 */
-					ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
+					ReplicationOriginNameForLogicalRep(sub->oid, subrelid, originname,
 													   sizeof(originname));
 					replorigin_drop_by_name(originname, true, false);
 				}
 
-				ereport(DEBUG1,
-						(errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
-										 get_namespace_name(get_rel_namespace(relid)),
-										 get_rel_name(relid),
-										 sub->name)));
+				if (OidIsValid(relstate->relid))
+					ereport(DEBUG1,
+							(errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+											 get_namespace_name(get_rel_namespace(relstate->relid)),
+											 get_rel_name(relstate->relid),
+											 sub->name)));
+				else
+					ereport(DEBUG1,
+							(errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+											 relstate->nspname, relstate->relname,
+											 sub->name)));
 			}
 		}
 
@@ -1250,6 +1296,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 		case ALTER_SUBSCRIPTION_SET_PUBLICATION:
 			{
+				/*
+				 * XXX support SET PUBLICATION WITH (copy_schema = xx)
+				 */
 				supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1297,6 +1346,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				List	   *publist;
 				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
 
+				/*
+				 * XXX support ADD/DROP PUBLICATION WITH (copy_schema = xx)
+				 */
 				supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1357,6 +1409,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
+				/*
+				 * XXX support REFRESH PUBLICATION WITH (copy_schema = xx)
+				 */
 				parse_subscription_options(pstate, stmt->options,
 										   SUBOPT_COPY_DATA, &opts);
 
@@ -1589,7 +1644,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-		logicalrep_worker_stop(w->subid, w->relid);
+		logicalrep_worker_stop(w->subid, w->subrelid);
 	}
 	list_free(subworkers);
 
@@ -1638,7 +1693,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
-	RemoveSubscriptionRel(subid, InvalidOid);
+	RemoveSubscriptionRel(subid, InvalidOid, InvalidOid);
 
 	/* Remove the origin tracking if exists. */
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 052505e46f..8bc5bbe935 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -927,6 +927,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 				case CRS_USE_SNAPSHOT:
 					appendStringInfoString(&cmd, "SNAPSHOT 'use'");
 					break;
+				case CRS_EXPORT_USE_SNAPSHOT:
+					appendStringInfoString(&cmd, "SNAPSHOT 'export-use'");
+					break;
 			}
 		}
 		else
@@ -942,6 +945,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 				case CRS_USE_SNAPSHOT:
 					appendStringInfoString(&cmd, "USE_SNAPSHOT");
 					break;
+				case CRS_EXPORT_USE_SNAPSHOT:
+					elog(ERROR, "XXX CREATE_REPLICATION_SLOT ... EXPORT_USE_SNAPSHOT is not supported yet");
+					break;
 			}
 		}
 
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 4518683779..489205436d 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -958,7 +958,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
 	 */
-	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELOID,
 								  invalidate_syncing_table_states,
 								  (Datum) 0);
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 970d170e73..9e0291d9d9 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -241,12 +241,12 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 
 /*
  * Walks the workers array and searches for one that matches given
- * subscription id and relid.
+ * subscription id and subrelid.
  *
  * We are only interested in the leader apply worker or table sync worker.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid subid, Oid subrelid, bool only_running)
 {
 	int			i;
 	LogicalRepWorker *res = NULL;
@@ -262,7 +262,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		if (isParallelApplyWorker(w))
 			continue;
 
-		if (w->in_use && w->subid == subid && w->relid == relid &&
+		if (w->in_use && w->subid == subid && w->subrelid == subrelid &&
 			(!only_running || w->proc))
 		{
 			res = w;
@@ -304,7 +304,7 @@ logicalrep_workers_find(Oid subid, bool only_running)
  */
 bool
 logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
-						 Oid relid, dsm_handle subworker_dsm)
+						 Oid subrelid, dsm_handle subworker_dsm)
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
@@ -318,7 +318,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
 
 	/* Sanity check - tablesync worker cannot be a subworker */
-	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+	Assert(!(is_parallel_apply_worker && OidIsValid(subrelid)));
 
 	ereport(DEBUG1,
 			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +393,7 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+	if (OidIsValid(subrelid) && nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -434,7 +434,8 @@ retry:
 	worker->dbid = dbid;
 	worker->userid = userid;
 	worker->subid = subid;
-	worker->relid = relid;
+	worker->subrelid = subrelid;
+	worker->relid = InvalidOid; /* will be filled by the tablesync worker */
 	worker->relstate = SUBREL_STATE_UNKNOWN;
 	worker->relstate_lsn = InvalidXLogRecPtr;
 	worker->stream_fileset = NULL;
@@ -463,9 +464,9 @@ retry:
 	else
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
 
-	if (OidIsValid(relid))
+	if (OidIsValid(subrelid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication worker for subscription %u sync %u", subid, relid);
+				 "logical replication worker for subscription %u sync %u", subid, subrelid);
 	else if (is_parallel_apply_worker)
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
 				 "logical replication parallel apply worker for subscription %u", subid);
@@ -591,13 +592,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
  * Stop the logical replication worker for subid/relid, if any.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid subid, Oid subrelid)
 {
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-	worker = logicalrep_worker_find(subid, relid, false);
+	worker = logicalrep_worker_find(subid, subrelid, false);
 
 	if (worker)
 	{
@@ -640,13 +641,13 @@ logicalrep_pa_worker_stop(int slot_no, uint16 generation)
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(Oid subid, Oid subrelid)
 {
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-	worker = logicalrep_worker_find(subid, relid, true);
+	worker = logicalrep_worker_find(subid, subrelid, true);
 
 	if (worker)
 		logicalrep_worker_wakeup_ptr(worker);
@@ -760,6 +761,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 	worker->userid = InvalidOid;
 	worker->subid = InvalidOid;
 	worker->relid = InvalidOid;
+	worker->subrelid = InvalidOid;
 	worker->leader_pid = InvalidPid;
 	worker->parallel_apply = false;
 }
@@ -820,7 +822,7 @@ logicalrep_sync_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && OidIsValid(w->relid))
+		if (w->subid == subid && OidIsValid(w->subrelid))
 			res++;
 	}
 
@@ -1263,8 +1265,8 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		worker_pid = worker.proc->pid;
 
 		values[0] = ObjectIdGetDatum(worker.subid);
-		if (OidIsValid(worker.relid))
-			values[1] = ObjectIdGetDatum(worker.relid);
+		if (OidIsValid(worker.subrelid))
+			values[1] = ObjectIdGetDatum(worker.subrelid);
 		else
 			nulls[1] = true;
 		values[2] = Int32GetDatum(worker_pid);
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 57ad22b48a..78c91f6f36 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -457,9 +457,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 	}
 
 	if (entry->state != SUBREL_STATE_READY)
-		entry->state = GetSubscriptionRelState(MySubscription->oid,
-											   entry->localreloid,
-											   &entry->statelsn);
+		entry->state = GetSubscriptoinRelStateByRelid(MySubscription->oid,
+													  entry->localreloid,
+													  &entry->statelsn);
 
 	return entry;
 }
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 62542827e4..d4e1738df6 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -277,6 +277,7 @@ struct SnapBuild
  */
 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
 static bool ExportInProgress = false;
+static bool UsingExportedSnapshot = false;
 
 /* ->committed and ->catchange manipulation */
 static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
@@ -661,12 +662,12 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
  * sure the xmin horizon hasn't advanced since then.
  */
 const char *
-SnapBuildExportSnapshot(SnapBuild *builder)
+SnapBuildExportSnapshot(SnapBuild *builder, bool use_it)
 {
 	Snapshot	snap;
 	char	   *snapname;
 
-	if (IsTransactionOrTransactionBlock())
+	if (!use_it && IsTransactionOrTransactionBlock())
 		elog(ERROR, "cannot export a snapshot from within a transaction");
 
 	if (SavedResourceOwnerDuringExport)
@@ -674,15 +675,24 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 
 	SavedResourceOwnerDuringExport = CurrentResourceOwner;
 	ExportInProgress = true;
+	UsingExportedSnapshot = use_it;
 
-	StartTransactionCommand();
+	if (!use_it)
+	{
+		StartTransactionCommand();
 
-	/* There doesn't seem to a nice API to set these */
-	XactIsoLevel = XACT_REPEATABLE_READ;
-	XactReadOnly = true;
+		/* There doesn't seem to a nice API to set these */
+		XactIsoLevel = XACT_REPEATABLE_READ;
+		XactReadOnly = true;
+	}
+	else
+		Assert(IsTransactionBlock());
 
 	snap = SnapBuildInitialSnapshot(builder);
 
+	if (use_it)
+		RestoreTransactionSnapshot(snap, MyProc);
+
 	/*
 	 * now that we've built a plain snapshot, make it active and use the
 	 * normal mechanisms for exporting it
@@ -727,7 +737,7 @@ SnapBuildClearExportedSnapshot(void)
 	ResourceOwner tmpResOwner;
 
 	/* nothing exported, that is the usual case */
-	if (!ExportInProgress)
+	if (!ExportInProgress || UsingExportedSnapshot)
 		return;
 
 	if (!IsTransactionState())
@@ -753,6 +763,7 @@ SnapBuildResetExportedSnapshotState(void)
 {
 	SavedResourceOwnerDuringExport = NULL;
 	ExportInProgress = false;
+	UsingExportedSnapshot = false;
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0c71ae9ba7..6710d4bbcc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -97,9 +97,12 @@
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_namespace.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "executor/spi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
@@ -127,6 +130,8 @@ static bool FetchTableStates(bool *started_tx);
 
 static StringInfo copybuf = NULL;
 
+static Oid synchronize_table_schema(char *nspname, char *relname, char *snapshot_name);
+
 /*
  * Exit routine for synchronization worker.
  */
@@ -306,7 +311,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 			StartTransactionCommand();
 
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->subrelid,
 								   MyLogicalRepWorker->relstate,
 								   MyLogicalRepWorker->relstate_lsn);
 
@@ -324,7 +329,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 * able to rollback dropped slot.
 		 */
 		ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
-										MyLogicalRepWorker->relid,
+										MyLogicalRepWorker->subrelid,
 										syncslotname,
 										sizeof(syncslotname));
 
@@ -495,7 +500,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * Update the state to READY only after the origin cleanup.
 				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-										   rstate->relid, rstate->state,
+										   rstate->oid, rstate->state,
 										   rstate->lsn);
 			}
 		}
@@ -509,7 +514,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-												rstate->relid, false);
+												rstate->oid, false);
 
 			if (syncworker)
 			{
@@ -578,7 +583,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					struct tablesync_start_time_mapping *hentry;
 					bool		found;
 
-					hentry = hash_search(last_start_times, &rstate->relid,
+					hentry = hash_search(last_start_times, &rstate->oid,
 										 HASH_ENTER, &found);
 
 					if (!found ||
@@ -589,7 +594,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 MySubscription->oid,
 												 MySubscription->name,
 												 MyLogicalRepWorker->userid,
-												 rstate->relid,
+												 rstate->oid,
 												 DSM_HANDLE_INVALID);
 						hentry->last_start_time = now;
 					}
@@ -1225,11 +1230,11 @@ copy_table(Relation rel)
  * had changed.
  */
 void
-ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
+ReplicationSlotNameForTablesync(Oid suboid, Oid subrelid,
 								char *syncslotname, Size szslot)
 {
 	snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
-			 relid, GetSystemIdentifier());
+			 subrelid, GetSystemIdentifier());
 }
 
 /*
@@ -1245,32 +1250,50 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 {
 	char	   *slotname;
 	char	   *err;
-	char		relstate;
-	XLogRecPtr	relstate_lsn;
+	char 	   *snapshot;
 	Relation	rel;
 	AclResult	aclresult;
+	SubscriptionRelState *relstate;
+	CRSSnapshotAction action;
 	WalRcvExecResult *res;
 	char		originname[NAMEDATALEN];
+	char		nspname[NAMEDATALEN];
+	char		relname[NAMEDATALEN];
+	uint32		syncflags;
 	RepOriginId originid;
 	bool		must_use_password;
+	bool		set_snapshot = false;
 
-	/* Check the state of the table synchronization. */
+	/*
+	 * Fetch the state of the table synchronization.
+	 *
+	 * XXX: Currently we copy some relation state data from relstate
+	 * before freeing the memory at the commit. Probably we need to find
+	 * a better way.
+	 */
 	StartTransactionCommand();
-	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
-									   MyLogicalRepWorker->relid,
-									   &relstate_lsn);
-	CommitTransactionCommand();
+	relstate = GetSubscriptionRelByOid(MyLogicalRepWorker->subrelid);
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-	MyLogicalRepWorker->relstate = relstate;
-	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
+	MyLogicalRepWorker->relstate = relstate->state;
+	MyLogicalRepWorker->relstate_lsn = relstate->lsn;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+	if (relstate->nspname)
+		strlcpy(nspname, relstate->nspname, NAMEDATALEN);
+
+	if (relstate->relname)
+		strlcpy(relname, relstate->relname, NAMEDATALEN);
+
+	syncflags = relstate->syncflags;
+
+	CommitTransactionCommand();
+
 	/*
 	 * If synchronization is already done or no longer necessary, exit now
 	 * that we've updated shared memory state.
 	 */
-	switch (relstate)
+	switch (MyLogicalRepWorker->relstate)
 	{
 		case SUBREL_STATE_SYNCDONE:
 		case SUBREL_STATE_READY:
@@ -1281,7 +1304,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	/* Calculate the name of the tablesync slot. */
 	slotname = (char *) palloc(NAMEDATALEN);
 	ReplicationSlotNameForTablesync(MySubscription->oid,
-									MyLogicalRepWorker->relid,
+									MyLogicalRepWorker->subrelid,
 									slotname,
 									NAMEDATALEN);
 
@@ -1309,7 +1332,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	/* Assign the origin tracking record name. */
 	ReplicationOriginNameForLogicalRep(MySubscription->oid,
-									   MyLogicalRepWorker->relid,
+									   MyLogicalRepWorker->subrelid,
 									   originname,
 									   sizeof(originname));
 
@@ -1358,7 +1381,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	/* Update the state and make it visible to others. */
 	StartTransactionCommand();
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-							   MyLogicalRepWorker->relid,
+							   MyLogicalRepWorker->subrelid,
 							   MyLogicalRepWorker->relstate,
 							   MyLogicalRepWorker->relstate_lsn);
 	CommitTransactionCommand();
@@ -1366,6 +1389,62 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	StartTransactionCommand();
 
+	/*
+	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
+	 * ensures that both the replication slot we create (see below) and the
+	 * COPY are consistent with each other.
+	 */
+	res = walrcv_exec(LogRepWorkerWalRcvConn,
+					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
+					  0, NULL);
+	if (res->status != WALRCV_OK_COMMAND)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("table copy could not start transaction on publisher: %s",
+						res->err)));
+	walrcv_clear_result(res);
+
+	if ((syncflags & SUBREL_SYNC_KIND_SCHEMA) != 0)
+		action = CRS_EXPORT_USE_SNAPSHOT;
+	else
+		action = CRS_USE_SNAPSHOT;
+
+	/*
+	 * Create a new permanent logical decoding slot. This slot will be used
+	 * for the catchup phase after COPY is done, so tell it to use the
+	 * snapshot to make the final data consistent.
+	 */
+	snapshot = walrcv_create_slot(LogRepWorkerWalRcvConn,
+								  slotname, false /* permanent */ , false /* two_phase */ ,
+								  action, origin_startpos);
+
+	if ((syncflags & SUBREL_SYNC_KIND_SCHEMA) != 0)
+	{
+		Oid newrelid;
+
+		Assert(snapshot);
+
+		set_snapshot = true;
+		PushActiveSnapshot(GetTransactionSnapshot());
+
+		/* Create the empty table */
+		newrelid = synchronize_table_schema(nspname, relname, snapshot);
+
+		/* Update the srrelid of the catalog */
+		UpdateSubscriptionRelRelid(MyLogicalRepWorker->subid, MyLogicalRepWorker->subrelid,
+								   newrelid);
+		MyLogicalRepWorker->relid = newrelid;
+
+		/*
+		 * XXX: Currently, schema sync has to be performed in the same transaction
+		 * as we do COPY (i.e. initial data sync). It might be preferable that we
+		 * do schema sync in a separate transaction so that we can continue from
+		 * the initial data sync even in case where we failed between them. To do
+		 * that, probably we need a new relstate (say) SUBREL_STATE_SCHEMASYNC.
+		 */
+		CommandCounterIncrement();
+	}
+
 	/*
 	 * Use a standard write lock here. It might be better to disallow access
 	 * to the table while it's being synchronized. But we don't want to block
@@ -1399,30 +1478,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 						GetUserNameFromId(GetUserId(), true),
 						RelationGetRelationName(rel))));
 
-	/*
-	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
-	 * ensures that both the replication slot we create (see below) and the
-	 * COPY are consistent with each other.
-	 */
-	res = walrcv_exec(LogRepWorkerWalRcvConn,
-					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
-					  0, NULL);
-	if (res->status != WALRCV_OK_COMMAND)
-		ereport(ERROR,
-				(errcode(ERRCODE_CONNECTION_FAILURE),
-				 errmsg("table copy could not start transaction on publisher: %s",
-						res->err)));
-	walrcv_clear_result(res);
-
-	/*
-	 * Create a new permanent logical decoding slot. This slot will be used
-	 * for the catchup phase after COPY is done, so tell it to use the
-	 * snapshot to make the final data consistent.
-	 */
-	walrcv_create_slot(LogRepWorkerWalRcvConn,
-					   slotname, false /* permanent */ , false /* two_phase */ ,
-					   CRS_USE_SNAPSHOT, origin_startpos);
-
 	/*
 	 * Setup replication origin tracking. The purpose of doing this before the
 	 * copy is to avoid doing the copy again due to any error in setting up
@@ -1457,8 +1512,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	}
 
 	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
+	if ((syncflags & SUBREL_SYNC_KIND_DATA) != 0)
+	{
+		if (!set_snapshot)
+			PushActiveSnapshot(GetTransactionSnapshot());
+
+		copy_table(rel);
+	}
+
 	PopActiveSnapshot();
 
 	res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
@@ -1479,7 +1540,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * visible to others.
 	 */
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-							   MyLogicalRepWorker->relid,
+							   MyLogicalRepWorker->subrelid,
 							   SUBREL_STATE_FINISHEDCOPY,
 							   MyLogicalRepWorker->relstate_lsn);
 
@@ -1637,3 +1698,92 @@ UpdateTwoPhaseState(Oid suboid, char new_state)
 	heap_freetuple(tup);
 	table_close(rel, RowExclusiveLock);
 }
+
+/*
+ * Fetch the given table definition using pg_dump and restore it. snapshot_name is
+ * the name of the snapshot exported on the publisher. Return the oid of the newly
+ * created table.
+ */
+static Oid
+synchronize_table_schema(char *nspname, char *relname, char *snapshot_name)
+{
+	FILE *handle;
+	Oid relid;
+	Oid nspoid;
+	StringInfoData command;
+	StringInfoData querybuf;
+	char full_path[MAXPGPATH];
+	char buf[1024];
+	int ret;
+
+   if (find_my_exec("pg_dump", full_path) < 0)
+	   elog(ERROR, "\"%s\" was not found", "pg_dump");
+
+   /* Open SPI context. */
+   if (SPI_connect() != SPI_OK_CONNECT)
+	   elog(ERROR, "SPI_connect failed");
+
+   /* Create namespace if not exist */
+   nspoid = get_namespace_oid(nspname, true);
+   if (!OidIsValid(nspoid))
+   {
+	   /* XXX who should be the owner of the new schema? */
+	   nspoid = NamespaceCreate(nspname, GetUserId(), false);
+	   CommandCounterIncrement();
+   }
+
+   /* Construct pg_dump command */
+   initStringInfo(&command);
+   appendStringInfo(&command, "%s -Fp --schema-only -U %s -d \"%s\" --snapshot=%s -f - -t %s.%s",
+					full_path, GetUserNameFromId(GetUserId(), true),
+					MySubscription->conninfo, snapshot_name, nspname, relname);
+   elog(LOG, "XXX pg_dump command \"%s\"", command.data);
+
+   /*
+	* Execute the pg_dump command.
+	*
+	* XXX what if the table already doesn't exist?
+	*/
+   PG_TRY();
+   {
+	   /*
+		* XXX: Currently we execute the pg_dump command with a pipe, but with this way
+		* we cannot handle the command failure. So probably we should dump schema to
+		* the file and perform DDL while reading the dump file.
+		*/
+	   handle = OpenPipeStream(command.data, "r");
+	   if (handle == NULL)
+		   elog(ERROR, "command \"%s\" failed", command.data);
+
+	   initStringInfo(&querybuf);
+	   while (fgets(buf, sizeof(buf), handle))
+	   {
+		   appendStringInfoString(&querybuf, buf);
+
+		   if (buf[strlen(buf) - 2] != ';')
+			   continue;
+
+		   ret = SPI_exec(querybuf.data, 0);
+		   if (ret != SPI_OK_UTILITY && ret != SPI_OK_SELECT)
+			   elog(ERROR, "SPI_exec failed %d: %s", ret, querybuf.data);
+
+		   resetStringInfo(&querybuf);
+	   }
+   }
+   PG_FINALLY();
+   {
+	   ClosePipeStream(handle);
+   }
+   PG_END_TRY();
+
+   CommandCounterIncrement();
+
+   /* Close SPI context */
+   if (SPI_finish() != SPI_OK_FINISH)
+	   elog(ERROR, "SPI_finish failed");
+
+   relid = get_relname_relid(relname, nspoid);
+   Assert(OidIsValid(relid));
+
+   return relid;
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 37bb884127..ff81c65aa2 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4502,10 +4502,18 @@ InitializeApplyWorker(void)
 								  (Datum) 0);
 
 	if (am_tablesync_worker())
-		ereport(LOG,
-				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
-						MySubscription->name,
-						get_rel_name(MyLogicalRepWorker->relid))));
+	{
+		if (OidIsValid(MyLogicalRepWorker->relid))
+			ereport(LOG,
+					(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+							MySubscription->name,
+							get_rel_name(MyLogicalRepWorker->relid))));
+		else
+			ereport(LOG,
+					(errmsg("logical replication table synchronization worker for subscription \"%s\", relid %u has started",
+							MySubscription->name,
+							MyLogicalRepWorker->subrelid)));
+	}
 	else
 		ereport(LOG,
 		/* translator: first %s is the name of logical replication worker */
@@ -4621,7 +4629,7 @@ ApplyWorkerMain(Datum main_arg)
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
 	 */
-	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELOID,
 								  invalidate_syncing_table_states,
 								  (Datum) 0);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45b8b3684f..7f959d2765 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1004,6 +1004,8 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 				*snapshot_action = CRS_NOEXPORT_SNAPSHOT;
 			else if (strcmp(action, "use") == 0)
 				*snapshot_action = CRS_USE_SNAPSHOT;
+			else if (strcmp(action, "export-use") == 0)
+				*snapshot_action = CRS_EXPORT_USE_SNAPSHOT;
 			else
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -1097,7 +1099,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 			need_full_snapshot = true;
 		}
-		else if (snapshot_action == CRS_USE_SNAPSHOT)
+		else if (snapshot_action == CRS_USE_SNAPSHOT ||
+				 snapshot_action == CRS_EXPORT_USE_SNAPSHOT)
 		{
 			if (!IsTransactionBlock())
 				ereport(ERROR,
@@ -1158,9 +1161,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 * snapshot when doing this.
 		 */
 		if (snapshot_action == CRS_EXPORT_SNAPSHOT)
-		{
-			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
-		}
+			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder, false);
+		else if (snapshot_action == CRS_EXPORT_USE_SNAPSHOT)
+			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder, true);
 		else if (snapshot_action == CRS_USE_SNAPSHOT)
 		{
 			Snapshot	snap;
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index 4e4a34bde8..a975d169fe 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -565,11 +565,10 @@ static const struct cachedesc cacheinfo[] = {
 		KEY(Anum_pg_subscription_oid),
 		4
 	},
-	[SUBSCRIPTIONRELMAP] = {
+	[SUBSCRIPTIONRELOID] = {
 		SubscriptionRelRelationId,
-		SubscriptionRelSrrelidSrsubidIndexId,
-		KEY(Anum_pg_subscription_rel_srrelid,
-			Anum_pg_subscription_rel_srsubid),
+		SubscriptionRelObjectIdIndexId,
+		KEY(Anum_pg_subscription_rel_oid),
 		64
 	},
 	[TABLESPACEOID] = {
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b2bc81b15f..3600b4d7d5 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5493,7 +5493,7 @@
   prorettype => 'record', proargtypes => 'oid',
   proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
   proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proargnames => '{subid,subid,subrelid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 60a2bcca23..530c12f148 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -30,16 +30,28 @@
  */
 CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId)
 {
+	Oid			oid;	/* Oid */
 	Oid			srsubid BKI_LOOKUP(pg_subscription);	/* Oid of subscription */
-	Oid			srrelid BKI_LOOKUP(pg_class);	/* Oid of relation */
 	char		srsubstate;		/* state of the relation in subscription */
 
+	/*
+	 * schema name and table name used only when the table is not created
+	 * on the subscriber yet.
+	 */
+	NameData	srnspname BKI_FORCE_NULL;
+	NameData	srrelname BKI_FORCE_NULL;
+
+	/* What part do we need to synchronize? */
+	bool		srsyncschema;
+	bool		srsyncdata;
+
 	/*
 	 * Although srsublsn is a fixed-width type, it is allowed to be NULL, so
 	 * we prevent direct C code access to it just as for a varlena field.
 	 */
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 
+	Oid			srrelid BKI_FORCE_NULL;
 	XLogRecPtr	srsublsn BKI_FORCE_NULL;	/* remote LSN of the state change
 											 * used for synchronization
 											 * coordination, or NULL if not
@@ -49,7 +61,8 @@ CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId)
 
 typedef FormData_pg_subscription_rel *Form_pg_subscription_rel;
 
-DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, SubscriptionRelSrrelidSrsubidIndexId, on pg_subscription_rel using btree(srrelid oid_ops, srsubid oid_ops));
+DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_oid_index, 9161, SubscriptionRelObjectIdIndexId, on pg_subscription_rel using btree(oid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, SubscriptionRelSrrelidSrsubidIndexId, on pg_subscription_rel using btree(srrelid oid_ops, srsubid oid_ops));
 
 #ifdef EXPOSE_TO_CLIENT_CODE
 
@@ -73,19 +86,31 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, Subsc
 
 #endif							/* EXPOSE_TO_CLIENT_CODE */
 
+#define SUBREL_SYNC_KIND_SCHEMA	0x01
+#define SUBREL_SYNC_KIND_DATA	0x02
+
 typedef struct SubscriptionRelState
 {
+	Oid			oid;
 	Oid			relid;
 	XLogRecPtr	lsn;
 	char		state;
+
+	char		*nspname;
+	char		*relname;
+	uint32		syncflags;	 /* OR of SUBREL_SYNC_KIND_XXX */
 } SubscriptionRelState;
 
-extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
-									XLogRecPtr sublsn);
-extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, bool syncschema,
+									bool syncdata, XLogRecPtr sublsn, char *nspname,
+									char *relname);
+extern void UpdateSubscriptionRelRelid(Oid subid, Oid subrelid, Oid relid);
+extern void UpdateSubscriptionRelState(Oid subid, Oid subrelid, char state,
 									   XLogRecPtr sublsn);
-extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
-extern void RemoveSubscriptionRel(Oid subid, Oid relid);
+extern char GetSubscriptionRelState(Oid subid, Oid subrelid, XLogRecPtr *sublsn);
+extern char GetSubscriptoinRelStateByRelid(Oid subid, Oid relid, XLogRecPtr *sublsn);
+extern SubscriptionRelState *GetSubscriptionRelByOid(Oid subrelid);
+extern void RemoveSubscriptionRel(Oid subid, Oid relid, Oid subrelid);
 
 extern bool HasSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index f49b941b53..de43703443 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -68,7 +68,7 @@ extern void FreeSnapshotBuilder(SnapBuild *builder);
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
 
 extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder);
-extern const char *SnapBuildExportSnapshot(SnapBuild *builder);
+extern const char *SnapBuildExportSnapshot(SnapBuild *builder, bool use_it);
 extern void SnapBuildClearExportedSnapshot(void);
 extern void SnapBuildResetExportedSnapshotState(void);
 
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 9df7e50f94..b48f1c0d2a 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -21,7 +21,8 @@ typedef enum
 {
 	CRS_EXPORT_SNAPSHOT,
 	CRS_NOEXPORT_SNAPSHOT,
-	CRS_USE_SNAPSHOT
+	CRS_USE_SNAPSHOT,
+	CRS_EXPORT_USE_SNAPSHOT
 } CRSSnapshotAction;
 
 /* global state */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dce71d2c50..8f982d4a0d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -50,7 +50,13 @@ typedef struct LogicalRepWorker
 	/* Subscription id for the worker. */
 	Oid			subid;
 
-	/* Used for initial table synchronization. */
+	/*
+	 * Used for initial table synchronization.
+	 *
+	 * relid is an invalid oid if the table is not created on the subscriber
+	 * yet.
+	 */
+	Oid			subrelid;
 	Oid			relid;
 	char		relstate;
 	XLogRecPtr	relstate_lsn;
@@ -308,7 +314,7 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 static inline bool
 am_tablesync_worker(void)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return OidIsValid(MyLogicalRepWorker->subrelid);
 }
 
 static inline bool
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index 67ea6e4945..1c08ff3c92 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -97,7 +97,7 @@ enum SysCacheIdentifier
 	STATRELATTINH,
 	SUBSCRIPTIONNAME,
 	SUBSCRIPTIONOID,
-	SUBSCRIPTIONRELMAP,
+	SUBSCRIPTIONRELOID,
 	TABLESPACEOID,
 	TRFOID,
 	TRFTYPELANG,
-- 
2.31.1

#47Wei Wang (Fujitsu)
wangw.fnst@fujitsu.com
In reply to: Masahiko Sawada (#46)
RE: Initial Schema Sync for Logical Replication

On Fri, Apr 21, 2023 at 16:48 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Thu, Apr 20, 2023 at 8:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Apr 17, 2023 at 9:12 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Fri, Apr 7, 2023 at 6:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Apr 6, 2023 at 6:57 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

While writing a PoC patch, I found some difficulties in this idea.
First, I tried to add schemaname+relname to pg_subscription_rel but I
could not define the primary key of pg_subscription_rel. The primary
key on (srsubid, srrelid) doesn't work since srrelid could be NULL.
Similarly, the primary key on (srsubid, srrelid, schemaname, relname)
also doesn't work.

Can we think of having a separate catalog table say
pg_subscription_remote_rel for this? You can have srsubid,
remote_schema_name, remote_rel_name, etc. We may need some other state
to be maintained during the initial schema sync where this table can
be used. Basically, this can be used to maintain the state till the
initial schema sync is complete because we can create a relation entry
in pg_subscritption_rel only after the initial schema sync is
complete.

It might not be ideal but I guess it works. But I think we need to
modify the name of replication slot for initial sync as it currently
includes OID of the table:

void
ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
char *syncslotname, Size szslot)
{
snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
         relid, GetSystemIdentifier());
}

If we use both schema name and table name, it's possible that slot
names are duplicated if schema and/or table names are long. Another
idea is to use the hash value of schema+table names, but it cannot
completely eliminate that possibility, and probably would make
investigation and debugging hard in case of any failure. Probably we
can use the OID of each entry in pg_subscription_remote_rel instead,
but I'm not sure it's a good idea, mainly because we will end up using
twice as many OIDs as before.

The other possibility is to use worker_pid. To make debugging easier,
we may want to LOG schema_name+rel_name vs slot_name mapping at DEBUG1
log level.

Since worker_pid changes after the worker restarts, a new worker
cannot find the slot that had been used, no?

After thinking it over, a better solution would be that we add an oid
column, nspname column, and relname column to pg_subscription_rel and
the primary key on the oid. If the table is not present on the
subscriber we store the schema name and table name to the catalog, and
otherwise we store the local table oid same as today. The local table
oid will be filled after the schema sync. The names of origin and
replication slot the tablesync worker uses use the oid instead of the
table oid.

I've attached a PoC patch of this idea (very rough patch and has many
TODO comments). It mixes the following changes:

1. Add oid column to the pg_subscription_rel. The oid is used as the
primary key and in the names of origin and slot the tablesync workers
use.

2. Add copy_schema = on/off option to CREATE SUBSCRIPTION (not yet
support for ALTER SUBSCRIPTION).

3. Add CRS_EXPORT_USE_SNAPSHOT new action in order to use the same
snapshot by both walsender and other processes (e.g. pg_dump). In this
patch, the snapshot is exported for pg_dump and is used by the
walsender for COPY.

It seems to work well but there might be a pitfall as I've not fully
implemented it.

Thanks for your POC patch.
After reviewing this patch, I have a question below that I want to confirm:

1. In the function synchronize_table_schema.
I think the pg_dump result includes some GUC settings (SET commands) and SQL
for table-related objects, and in this PoC all of these SQLs will be executed.
Do we need to alter the pg_dump result to execute only the table-schema-related SQLs?
For example, if we have below table schema in the publisher-side:
```
create table tbl(a int, b int);
create index idx_t on tbl (a);
CREATE FUNCTION trigger_func() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ BEGIN INSERT INTO public.tbl VALUES (NEW.*); RETURN NEW; END; $$;
CREATE TRIGGER tri_tbl BEFORE INSERT ON public.tbl FOR EACH ROW EXECUTE PROCEDURE trigger_func();
```
The result of pg_dump executed on the subscriber-side:
```
SET statement_timeout = 0;
SET lock_timeout = 0;
SET idle_in_transaction_session_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;
SET default_tablespace = '';
SET default_table_access_method = heap;

CREATE TABLE public.tbl (
a integer,
b integer
);

ALTER TABLE public.tbl OWNER TO postgres;

CREATE INDEX idx_t ON public.tbl USING btree (a);

CREATE TRIGGER tri_tbl BEFORE INSERT ON public.tbl FOR EACH ROW EXECUTE FUNCTION public.trigger_func();
```
And this will cause an error when executing `CREATE TRIGGER`, because we did not
dump the function trigger_func.

Regards,
Wang Wei

#48Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Wei Wang (Fujitsu) (#47)
Re: Initial Schema Sync for Logical Replication

On Thu, Apr 27, 2023 at 12:02 PM Wei Wang (Fujitsu)
<wangw.fnst@fujitsu.com> wrote:

On Fri, Apr 21, 2023 at 16:48 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Thu, Apr 20, 2023 at 8:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Apr 17, 2023 at 9:12 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Fri, Apr 7, 2023 at 6:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Apr 6, 2023 at 6:57 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

While writing a PoC patch, I found some difficulties in this idea.
First, I tried to add schemaname+relname to pg_subscription_rel but I
could not define the primary key of pg_subscription_rel. The primary
key on (srsubid, srrelid) doesn't work since srrelid could be NULL.
Similarly, the primary key on (srsubid, srrelid, schemaname, relname)
also doesn't work.

Can we think of having a separate catalog table say
pg_subscription_remote_rel for this? You can have srsubid,
remote_schema_name, remote_rel_name, etc. We may need some other state
to be maintained during the initial schema sync where this table can
be used. Basically, this can be used to maintain the state till the
initial schema sync is complete because we can create a relation entry
in pg_subscritption_rel only after the initial schema sync is
complete.

It might not be ideal but I guess it works. But I think we need to
modify the name of replication slot for initial sync as it currently
includes OID of the table:

void
ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
char *syncslotname, Size szslot)
{
snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
         relid, GetSystemIdentifier());
}

If we use both schema name and table name, it's possible that slot
names are duplicated if schema and/or table names are long. Another
idea is to use the hash value of schema+table names, but it cannot
completely eliminate that possibility, and probably would make
investigation and debugging hard in case of any failure. Probably we
can use the OID of each entry in pg_subscription_remote_rel instead,
but I'm not sure it's a good idea, mainly because we will end up using
twice as many OIDs as before.

The other possibility is to use worker_pid. To make debugging easier,
we may want to LOG schema_name+rel_name vs slot_name mapping at DEBUG1
log level.

Since worker_pid changes after the worker restarts, a new worker
cannot find the slot that had been used, no?

After thinking it over, a better solution would be that we add an oid
column, nspname column, and relname column to pg_subscription_rel and
the primary key on the oid. If the table is not present on the
subscriber we store the schema name and table name to the catalog, and
otherwise we store the local table oid same as today. The local table
oid will be filled after the schema sync. The names of origin and
replication slot the tablesync worker uses use the oid instead of the
table oid.

I've attached a PoC patch of this idea (very rough patch and has many
TODO comments). It mixes the following changes:

1. Add oid column to the pg_subscription_rel. The oid is used as the
primary key and in the names of origin and slot the tablesync workers
use.

2. Add copy_schema = on/off option to CREATE SUBSCRIPTION (not yet
support for ALTER SUBSCRIPTION).

3. Add CRS_EXPORT_USE_SNAPSHOT new action in order to use the same
snapshot by both walsender and other processes (e.g. pg_dump). In this
patch, the snapshot is exported for pg_dump and is used by the
walsender for COPY.

It seems to work well but there might be a pitfall as I've not fully
implemented it.

Thanks for your POC patch.
After reviewing this patch, I have a question below that I want to confirm:

1. In the function synchronize_table_schema.
I think the pg_dump result includes some GUC settings (SET commands) and SQL
for table-related objects, and in this PoC all of these SQLs will be executed.
Do we need to alter the pg_dump result to execute only the table-schema-related SQLs?

Yes, in this approach we need to dump/restore objects while
specifying them with fine granularity. Ideally, the table sync worker
dumps and restores the table schema, copies the initial data, and then
creates indexes; triggers and other table-related objects are created
after that. So if we go with the pg_dump approach to copy the schema
of individual tables, we need to change pg_dump (or libpgdump needs to
be able to do this) to support it.
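
To make the intended granularity concrete, here is a rough sketch of the
ordering a tablesync worker would ideally follow. The helper names below
are purely hypothetical placeholders for the steps described above; they
are not existing functions:

/* hypothetical ordering only; none of these helpers exist */
static void
sync_one_table(const char *nspname, const char *relname, const char *snapshot_name)
{
	/* 1. restore only the CREATE TABLE part of the dump */
	restore_table_definition(nspname, relname, snapshot_name);

	/* 2. copy the initial data while the table has no indexes or triggers yet */
	copy_initial_data(nspname, relname);

	/* 3. create indexes, then triggers and other table-related objects */
	restore_table_dependents(nspname, relname, snapshot_name);
}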

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#49Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Masahiko Sawada (#48)
Re: Initial Schema Sync for Logical Replication

On Fri, Apr 28, 2023 at 4:16 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

Yes, in this approach we need to dump/restore objects while
specifying them with fine granularity. Ideally, the table sync worker
dumps and restores the table schema, copies the initial data, and then
creates indexes; triggers and other table-related objects are created
after that. So if we go with the pg_dump approach to copy the schema
of individual tables, we need to change pg_dump (or libpgdump needs to
be able to do this) to support it.

We have been discussing how to sync schema but I'd like to step back a
bit and discuss use cases and requirements of this feature.

Suppose that a table belongs to a publication: what objects related to
the table do we want to sync by the initial schema sync feature? IOW, do
we want to sync the table's ACLs, tablespace settings, triggers, and
security labels too?

If we want to replicate the whole database, e.g. when using logical
replication for major version upgrade, it would be convenient if it
synchronizes all table-related objects. However, if we have only this
option, it could be useless in some cases. For example, in a case
where users have different database users on the subscriber than the
publisher, they might want to sync only CREATE TABLE, and set ACL etc
by themselves. In this case, it would not be necessary to sync ACL and
security labels.

What use case do we want to support by this feature? I think the
implementation could be varied depending on how to select what objects
to sync.

One possible idea is to select objects to sync depending on how DDL
replication is set in the publisher. It's straightforward but I'm not
sure the design of DDL replication syntax has been decided. Also, even
if we create a publication with ddl = 'table' option, it's not clear
to me that we want to sync table-dependent triggers, indexes, and
rules too by the initial sync feature.

Second idea is to make it configurable by users so that they can
specify what objects to sync. But it would make the feature complex
and I'm not sure users can use it properly.

Third idea is that, since the use case of synchronizing the whole
database is achievable even by pg_dump(all), we support
synchronizing only tables (+ indexes) in the initial sync feature,
which cannot be achieved by pg_dump.

Feedback is very welcome.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#50Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#49)
Re: Initial Schema Sync for Logical Replication

On Mon, May 22, 2023 at 6:37 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Fri, Apr 28, 2023 at 4:16 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

Yes, in this approach we need to dump/restore objects while
specifying them with fine granularity. Ideally, the table sync worker
dumps and restores the table schema, copies the initial data, and then
creates indexes; triggers and other table-related objects are created
after that. So if we go with the pg_dump approach to copy the schema
of individual tables, we need to change pg_dump (or libpgdump needs to
be able to do this) to support it.

We have been discussing how to sync schema but I'd like to step back a
bit and discuss use cases and requirements of this feature.

Suppose that a table belongs to a publication: what objects related to
the table do we want to sync by the initial schema sync feature? IOW, do
we want to sync the table's ACLs, tablespace settings, triggers, and
security labels too?

If we want to replicate the whole database, e.g. when using logical
replication for major version upgrade, it would be convenient if it
synchronizes all table-related objects. However, if we have only this
option, it could be useless in some cases. For example, in a case
where users have different database users on the subscriber than the
publisher, they might want to sync only CREATE TABLE, and set ACL etc
by themselves. In this case, it would not be necessary to sync ACL and
security labels.

What use case do we want to support by this feature? I think the
implementation could be varied depending on how to select what objects
to sync.

One possible idea is to select objects to sync depending on how DDL
replication is set in the publisher. It's straightforward but I'm not
sure the design of DDL replication syntax has been decided. Also, even
if we create a publication with ddl = 'table' option, it's not clear
to me that we want to sync table-dependent triggers, indexes, and
rules too by the initial sync feature.

I think it is better to keep the initial sync the same as the
replication. So, if the publication specifies 'table' then we should
just synchronize tables. Otherwise, it will look odd that the initial
sync has synchronized say index-related DDLs but then later
replication didn't replicate it. OTOH, if we want to do initial sync
of table-dependent objects like triggers, indexes, rules, etc. when
the user has specified ddl = 'table' then the replication should also
follow the same. The main reason to exclude the other objects during
replication is to reduce the scope of deparsing patch but if we have a
finite set of objects (say all dependent on the table) then we can
probably try to address those.

Second idea is to make it configurable by users so that they can
specify what objects to sync. But it would make the feature complex
and I'm not sure users can use it properly.

Third idea is that, since the use case of synchronizing the whole
database is achievable even by pg_dump(all), we support
synchronizing only tables (+ indexes) in the initial sync feature,
which cannot be achieved by pg_dump.

Can't we add some switch to dump only the table and not its dependents
if we want to go with that approach?

--
With Regards,
Amit Kapila.

#51Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Amit Kapila (#50)
Re: Initial Schema Sync for Logical Replication

On Tue, May 23, 2023 at 2:31 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, May 22, 2023 at 6:37 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Fri, Apr 28, 2023 at 4:16 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

Yes, in this approach we need to dump/restore objects while
specifying them with fine granularity. Ideally, the table sync worker
dumps and restores the table schema, copies the initial data, and then
creates indexes; triggers and other table-related objects are created
after that. So if we go with the pg_dump approach to copy the schema
of individual tables, we need to change pg_dump (or libpgdump needs to
be able to do this) to support it.

We have been discussing how to sync schema but I'd like to step back a
bit and discuss use cases and requirements of this feature.

Suppose that a table belongs to a publication: what objects related to
the table do we want to sync by the initial schema sync feature? IOW, do
we want to sync the table's ACLs, tablespace settings, triggers, and
security labels too?

If we want to replicate the whole database, e.g. when using logical
replication for major version upgrade, it would be convenient if it
synchronizes all table-related objects. However, if we have only this
option, it could be useless in some cases. For example, in a case
where users have different database users on the subscriber than the
publisher, they might want to sync only CREATE TABLE, and set ACL etc
by themselves. In this case, it would not be necessary to sync ACL and
security labels.

What use case do we want to support by this feature? I think the
implementation could be varied depending on how to select what objects
to sync.

One possible idea is to select objects to sync depending on how DDL
replication is set in the publisher. It's straightforward but I'm not
sure the design of DDL replication syntax has been decided. Also, even
if we create a publication with ddl = 'table' option, it's not clear
to me that we want to sync table-dependent triggers, indexes, and
rules too by the initial sync feature.

I think it is better to keep the initial sync the same as the
replication. So, if the publication specifies 'table' then we should
just synchronize tables. Otherwise, it will look odd that the initial
sync has synchronized say index-related DDLs but then later
replication didn't replicate it. OTOH, if we want to do initial sync
of table-dependent objects like triggers, indexes, rules, etc. when
the user has specified ddl = 'table' then the replication should also
follow the same. The main reason to exclude the other objects during
replication is to reduce the scope of deparsing patch but if we have a
finite set of objects (say all dependent on the table) then we can
probably try to address those.

We have discussed several ideas for how to synchronize schemas between
the publisher and subscribers, and the points are summarized on the Wiki
page[1]. As for the idea of using pg_dump, we were concerned that
pg_dump needs to be present along with the server binary if the user
wants to use the initial schema synchronization feature. Since these
binaries are typically included in different packages, users need to
install both. During PGCon we discussed this with some senior hackers,
and the consensus was that it would be an acceptable limitation for
users. When executing CREATE/ALTER SUBSCRIPTION, we check if pg_dump is
available and raise an error if not. We've also discussed the idea of
using pg_dump_library but no one preferred this idea because of its
implementation costs. Therefore, I'm going to do further evaluation of
the pg_dump idea.

I agree with Amit that the initial schema synchronization should
behave the same as DDL replication. We can support only table
schemas as the first step. To do that, we need a new switch, say
--exclude-table-dependents, in pg_dump to dump only table schemas,
excluding table-related objects such as triggers and indexes. Then, we
can support synchronizing tables and table-related objects such as
triggers, indexes, and rules as the second step, which can be done
with the --schema and --table options. Finally, we can synchronize the
whole database by using the --schema option.
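
For illustration, here is a minimal sketch of how the PoC's
synchronize_table_schema() could build the pg_dump command with such a
switch. Note that --exclude-table-dependents is only the hypothetical
option proposed above and does not exist in pg_dump today; the rest
mirrors the command construction already in the PoC patch:

	initStringInfo(&command);
	appendStringInfo(&command,
					 "%s -Fp --schema-only --exclude-table-dependents "
					 "-U %s -d \"%s\" --snapshot=%s -t %s.%s -f -",
					 full_path, GetUserNameFromId(GetUserId(), true),
					 MySubscription->conninfo, snapshot_name, nspname, relname);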

We also need to research how to integrate the initial schema
synchronization with tablesync workers. We have a PoC patch[2].

Regards,

[1]: https://wiki.postgresql.org/wiki/Logical_replication_of_DDLs#Initial_Schema_Sync
[2]: /messages/by-id/CAD21AoCdfg506__qKz+HX8vqfdyKgQ5qeehgqq9bi1L-6p5Pwg@mail.gmail.com

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#52Peter Smith
smithpb2250@gmail.com
In reply to: Masahiko Sawada (#51)
Re: Initial Schema Sync for Logical Replication

On Thu, Jun 8, 2023 at 1:24 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

...

We also need to research how to integrate the initial schema
synchronization with tablesync workers. We have a PoC patch[2].

Regards,

[1] https://wiki.postgresql.org/wiki/Logical_replication_of_DDLs#Initial_Schema_Sync
[2] /messages/by-id/CAD21AoCdfg506__qKz+HX8vqfdyKgQ5qeehgqq9bi1L-6p5Pwg@mail.gmail.com

FYI -- the PoC patch fails to apply using HEAD fetched today.

git apply ../patches_misc/0001-Poc-initial-table-structure-synchronization-in-logic.patch
error: patch failed: src/backend/replication/logical/tablesync.c:1245
error: src/backend/replication/logical/tablesync.c: patch does not apply

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#53Peter Smith
smithpb2250@gmail.com
In reply to: Peter Smith (#52)
Re: Initial Schema Sync for Logical Replication

On Thu, Jun 15, 2023 at 4:14 PM Peter Smith <smithpb2250@gmail.com> wrote:

On Thu, Jun 8, 2023 at 1:24 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

...

We also need to research how to integrate the initial schema
synchronization with tablesync workers. We have a PoC patch[2].

Regards,

[1] https://wiki.postgresql.org/wiki/Logical_replication_of_DDLs#Initial_Schema_Sync
[2] /messages/by-id/CAD21AoCdfg506__qKz+HX8vqfdyKgQ5qeehgqq9bi1L-6p5Pwg@mail.gmail.com

FYI -- the PoC patch fails to apply using HEAD fetched today.

git apply ../patches_misc/0001-Poc-initial-table-structure-synchronization-in-logic.patch
error: patch failed: src/backend/replication/logical/tablesync.c:1245
error: src/backend/replication/logical/tablesync.c: patch does not apply

After rebasing the PoC patch locally, I found the 'make check' still
did not pass 100%.

# 2 of 215 tests failed.

Here are the differences:

diff -U3 /home/postgres/oss_postgres_misc/src/test/regress/expected/rules.out
/home/postgres/oss_postgres_misc/src/test/regress/results/rules.out
--- /home/postgres/oss_postgres_misc/src/test/regress/expected/rules.out
   2023-06-02 23:12:32.073864475 +1000
+++ /home/postgres/oss_postgres_misc/src/test/regress/results/rules.out
2023-06-15 16:53:29.352622676 +1000
@@ -2118,14 +2118,14 @@
     su.subname,
     st.pid,
     st.leader_pid,
-    st.relid,
+    st.subrelid,
     st.received_lsn,
     st.last_msg_send_time,
     st.last_msg_receipt_time,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid,
pid, leader_pid, received_lsn, last_msg_send_time,
last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid
= su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid,
subrelid, pid, leader_pid, received_lsn, last_msg_send_time,
last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid
= su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
diff -U3 /home/postgres/oss_postgres_misc/src/test/regress/expected/oidjoins.out
/home/postgres/oss_postgres_misc/src/test/regress/results/oidjoins.out
--- /home/postgres/oss_postgres_misc/src/test/regress/expected/oidjoins.out
2022-10-04 15:11:32.457834981 +1100
+++ /home/postgres/oss_postgres_misc/src/test/regress/results/oidjoins.out
 2023-06-15 16:54:07.159839010 +1000
@@ -265,4 +265,3 @@
 NOTICE:  checking pg_subscription {subdbid} => pg_database {oid}
 NOTICE:  checking pg_subscription {subowner} => pg_authid {oid}
 NOTICE:  checking pg_subscription_rel {srsubid} => pg_subscription {oid}
-NOTICE:  checking pg_subscription_rel {srrelid} => pg_class {oid}

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#54Peter Smith
smithpb2250@gmail.com
In reply to: Masahiko Sawada (#46)
Re: Initial Schema Sync for Logical Replication

Hi,

Below are my review comments for the PoC patch 0001.

In addition, the patch needed rebasing, and after I rebased it
locally in my private environment there were still test failures:
a) The 'make check' tests fail, but only in a minor way, due to changed column names
b) the subscription TAP test did not work at all for me -- many errors.

======
Commit message.

1.
- Add oid column to the pg_subscription_rel.
- use it as the primary key.
- use it in the names of origin and slot the tablesync workers use.

~

IIUC, I think there were lots of variables called 'subrelid' referring
to this new 'oid' field. But, somehow I found that very confusing with
the other similarly named 'relid'. I wonder if all those can be named
like 'sroid' or 'srid' to reduce the confusion of such similar names?

======
src/backend/catalog/pg_subscription.c

2. AddSubscriptionRelState

I felt there should be some sanity check Asserts for the args here. E.g.
cannot have a valid relid when copy_schema == true, etc.
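
For example, based on the new AddSubscriptionRelState() parameter names
(just an illustration of what the "etc." might look like):

	/* a table that already exists locally should not also request schema sync */
	Assert(!(syncschema && OidIsValid(relid)));
	/* if there is no local relation yet, we need names to create it later */
	Assert(OidIsValid(relid) || (nspname != NULL && relname != NULL));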

~~~

3.
+ if (nspname)
+ values[Anum_pg_subscription_rel_srnspname - 1] = CStringGetDatum(nspname);
+ else
+ nulls[Anum_pg_subscription_rel_srnspname - 1] = true;
+
+ if (relname)
+ values[Anum_pg_subscription_rel_srrelname - 1] = CStringGetDatum(relname);
+ else
+ nulls[Anum_pg_subscription_rel_srrelname - 1] = true;

Here is where I was wondering why not pass the nspname and relname all
the time, even for valid 'relid' (when copy_schema is false). It
should simplify some code, as well as putting more useful/readable
information into the catalog.

~~~

4. UpdateSubscriptionRelRelid

+ /* XXX: need to distinguish from message in UpdateSubscriptionRelState() */
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u does not exist",
+ subrelid, subid);

Is that ERROR msg correct? IIUC the 'subrelid' is the Oid of the row
in the catalog -- it is not the "subscription table" Oid.

~~~

5. UpdateSubscriptionRelState

  if (!HeapTupleIsValid(tup))
  elog(ERROR, "subscription table %u in subscription %u does not exist",
- relid, subid);
+ subrelid, subid);

(ditto previous review comment)

Is that ERROR msg correct? IIUC the subrelid is the Oid of the row in
the catalog -- it is not the "subscription table" Oid.

~~~

6. GetSubscriptoinRelStateByRelid

There is a spelling mistake in this function name

/Subscriptoin/Subscription/

~~~

7.
+ ScanKeyInit(&skey[0],
+ Anum_pg_subscription_rel_srrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(relid));
+ ScanKeyInit(&skey[1],
+ Anum_pg_subscription_rel_srsubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(subid));

Won't it be better to swap the order of these so it matches the
function comment "(srsubid, srrelid)".

~~~

8.
+ tup = systable_getnext(scan);
+
+
+ if (!HeapTupleIsValid(tup))

Double blank lines

~~~

9.
/* Get palloc'ed SubscriptionRelState of the given subrelid */
SubscriptionRelState *
GetSubscriptionRelByOid(Oid subrelid)

~

There seems to be some function name confusion because the struct is
called SubscriptionRelState and it also has a 'state' field.

e.g. The functions named GetSubscriptionRelStateXXX return only the
state field of the struct. OTOH, this function returns the
SubscriptionRelState* but it is NOT called
GetSubscriptionRelStateByOid (??).

~~~

10. deconstruct_subrelstate

+ /* syncflags */
+ relstate->syncflags =
+ (((subrel_form->srsyncschema) ? SUBREL_SYNC_KIND_SCHEMA : 0) |
+ ((subrel_form->srsyncdata) ? SUBREL_SYNC_KIND_DATA : 0));

Seems excessive parens.

~~~

11.
+ return relstate;
+}
 /*
  * Drop subscription relation mapping. These can be for a particular
  * subscription, or for a particular relation, or both.
  */
 void
-RemoveSubscriptionRel(Oid subid, Oid relid)
+RemoveSubscriptionRel(Oid subid, Oid relid, Oid subrelid)

~

There is no blank line before this function

~~~

12. RemoveSubscriptionRel

-RemoveSubscriptionRel(Oid subid, Oid relid)
+RemoveSubscriptionRel(Oid subid, Oid relid, Oid subrelid)
 {

~

IIUC what you called 'subrelid' is the PK, so would it make more sense
for that to be the 1st parameter for this function?

======
src/backend/commands/subscriptioncmds.c

13. struct SubOpts

  bool copy_data;
+ /* XXX: want to choose synchronizing only tables or all objects? */
+ bool copy_schema;

I wonder if it would be more natural to put the 'copy_schema' field
before the 'copy_data' field?

~~~

14. parse_subscription_options

  if (IsSet(supported_opts, SUBOPT_COPY_DATA))
  opts->copy_data = true;
+ if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA))
+ opts->copy_data = true;

14a.
I wonder if it would be more natural to put the COPY_SCHEMA logic
before the COPY_DATA logic?

~

14b.
Is this a bug? Why is this assigning copy_data = true, instead of
copy_schema = true?
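
Presumably the intent was to set the schema flag, e.g.:

	if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA))
		opts->copy_schema = true;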

~~~

15.
  opts->specified_opts |= SUBOPT_COPY_DATA;
  opts->copy_data = defGetBoolean(defel);
  }
+ else if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA) &&
+ strcmp(defel->defname, "copy_schema") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_COPY_SCHEMA;
+ opts->copy_schema = defGetBoolean(defel);
+ }

I wonder if it would be more natural to put the COPY_SCHEMA logic
before the COPY_DATA logic?

~~~

16.
+ if (opts->copy_schema &&
+ IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("%s and %s are mutually exclusive options",
+ "connect = false", "copy_schema = true")));
+

I wonder if it would be more natural to put the COPY_SCHEMA logic
before the COPY_DATA logic?

~~~

17. CreateSubscription

  * Set sync state based on if we were asked to do data copy or
  * not.
  */
- table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ if (opts.copy_data || opts.copy_schema)
+ table_state = SUBREL_STATE_INIT;
+ else
+ table_state = SUBREL_STATE_READY;

The comment prior to this code needs updating, it still only mentions
"data copy".

~~~

18. AlterSubscription_refresh

+ sub_remove_rels[remove_rel_len].relid = subrelid;
sub_remove_rels[remove_rel_len++].state = state;

~

Is that right?

IIUC that 'subrelid' is the OID PK of the row in pg_subscription_rel,
which is not the same as the 'relid'.

Shouldn't this be sub_remove_rels[remove_rel_len].relid = relstate->relid;

~~~

19.
+ if (OidIsValid(relstate->relid))
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relstate->relid)),
+ get_rel_name(relstate->relid),
+ sub->name)));
+ else
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+ relstate->nspname, relstate->relname,
+ sub->name)));

I wondered why can't we just always store nspname and relname even for
the valid 'relid' when there is no copy_schema? Won't that simplify
code such as this?

======
src/backend/replication/logical/launcher.c

20. logicalrep_worker_find

- if (w->in_use && w->subid == subid && w->relid == relid &&
+ if (w->in_use && w->subid == subid && w->subrelid == subrelid &&
  (!only_running || w->proc))
  {

~

Maybe I misunderstand something, but somehow it seems strange to be
checking both the 'subid' and the Oid PK ('subrelid') here. Isn't
it that when subrelid is valid you need to test only 'subrelid' (aka
tablesync) for equality? But when subrelid is InvalidOid (aka not a
tablesync worker) you only need to test subid for equality?
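
In code form, I am wondering whether something like this (untested
sketch) would be enough:

if (w->in_use &&
    (OidIsValid(subrelid) ? w->subrelid == subrelid : w->subid == subid) &&
    (!only_running || w->proc))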

~~~

21. logicalrep_worker_launch

bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);

  /* Sanity check - tablesync worker cannot be a subworker */
- Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+ Assert(!(is_parallel_apply_worker && OidIsValid(subrelid)));

IIUC I thought this code might be easier to understand if you
introduced another variable

bool is_tablesync_worker = OidIsValid(subrelid);

~~~

22.
+ if (OidIsValid(subrelid) && nsyncworkers >= max_sync_workers_per_subscription)

(ditto previous comment)

~~~

23.
- if (OidIsValid(relid))
+ if (OidIsValid(subrelid))
  snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication worker for subscription %u sync %u", subid, relid);
+ "logical replication worker for subscription %u sync %u", subid, subrelid);

This name seems somehow less useful to the user now. IIUC 'subrelid'
is just the PK of the pg_subscription_rel catalog instead of the
relid. Does this require changes to the documentation that might have
been saying this is the relid?

~~~

24. logicalrep_worker_stop

  * Stop the logical replication worker for subid/relid, if any.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid subid, Oid subrelid)

The function comment still is talking about relid.

======
src/backend/replication/logical/snapbuild.c

25. SnapBuildExportSnapshot

-SnapBuildExportSnapshot(SnapBuild *builder)
+SnapBuildExportSnapshot(SnapBuild *builder, bool use_it)

'use_it' does not seem like a good parameter name. At least, maybe the
function comment can describe the meaning of 'use_it'.

~~~

26.
- /* There doesn't seem to a nice API to set these */
- XactIsoLevel = XACT_REPEATABLE_READ;
- XactReadOnly = true;
+ /* There doesn't seem to a nice API to set these */
+ XactIsoLevel = XACT_REPEATABLE_READ;
+ XactReadOnly = true;
+ }
+ else
+ Assert(IsTransactionBlock());

Although it is not introduced by this patch, since you are changing the
indentation of this line anyway, you might as well fix the typo in it at
the same time.

/seem to a nice/seem to be a nice/

======
src/backend/replication/logical/tablesync.c

27. process_syncing_tables_for_sync

UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->subrelid,
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);

IIUC the 'subrelid' is now the PK. Isn't it better for that to be the 1st param?

~~~

28.

+ if ((syncflags & SUBREL_SYNC_KIND_SCHEMA) != 0)

There are several checks like the code shown above. Would it be better
to have some macro for that expression? Or maybe simply assign this
result to a local variable instead of testing the same thing multiple
times.
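
e.g. Just as a sketch, such a macro could look like this (the name is
invented):

#define SUBREL_NEEDS_SCHEMA_SYNC(syncflags) \
    (((syncflags) & SUBREL_SYNC_KIND_SCHEMA) != 0)

or, with a local variable:

bool sync_schema = (syncflags & SUBREL_SYNC_KIND_SCHEMA) != 0;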

~~~

29. synchronize_table_schema

FILE *handle;
Oid relid;
Oid nspoid;
StringInfoData command;
StringInfoData querybuf;
char full_path[MAXPGPATH];
char buf[1024];
int ret;

if (find_my_exec("pg_dump", full_path) < 0)
elog(ERROR, "\"%s\" was not found", "pg_dump")

~

Something is not quite right with the indentation in this new function.

~~~

30.
+ * XXX what if the table already doesn't exist?

I didn't understand the meaning of the comment. Is it supposed to say
"What if the table already exists?" (??)

======
src/backend/replication/logical/worker.c

31. InitializeApplyWorker

+ {
+ if (OidIsValid(MyLogicalRepWorker->relid))
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for
subscription \"%s\", table \"%s\" has started",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid))));
+ else
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for
subscription \"%s\", relid %u has started",
+ MySubscription->name,
+ MyLogicalRepWorker->subrelid)));
+ }

~

IIUC it doesn't seem right to say "relid %u has started". Because
that's not really a relid is it? I thought it is just a PK Oid of the
row in the catalog.

======
src/include/catalog/pg_subscription_rel.h

32. pg_subscription_rel

+ /* What part do we need to synchronize? */
+ bool srsyncschema;
+ bool srsyncdata;

These aren't really "parts".

SUGGESTION
/* What to synchronize? */

~~~

33.
typedef struct SubscriptionRelState
{
+ Oid oid;

Is that the pg_subscription_rel's oid? Maybe it would be better to
call this field 'sroid'? (see the general comment in the commit
message)

======
src/include/replication/walsender.h

34. CRSSnapshotAction

  CRS_EXPORT_SNAPSHOT,
  CRS_NOEXPORT_SNAPSHOT,
- CRS_USE_SNAPSHOT
+ CRS_USE_SNAPSHOT,
+ CRS_EXPORT_USE_SNAPSHOT
 } CRSSnapshotAction;

~

Should the CRS_USE_SNAPSHOT be renamed to CRS_NOEXPORT_USE_SNAPSHOT to
have a more consistent naming pattern?

======
src/include/replication/worker_internal.h

35.
- /* Used for initial table synchronization. */
+ /*
+ * Used for initial table synchronization.
+ *
+ * relid is an invalid oid if the table is not created on the subscriber
+ * yet.
+ */
+ Oid subrelid;
  Oid relid;

It would be good to have more explanation of the different meaning of
'subrelid' versus 'relid' (see also the general comment suggesting to
rename this).

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#55Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Peter Smith (#54)
Re: Initial Schema Sync for Logical Replication

On Mon, Jun 19, 2023 at 5:29 PM Peter Smith <smithpb2250@gmail.com> wrote:

Hi,

Below are my review comments for the PoC patch 0001.

In addition, the patch needed rebasing, and, after I rebased it
locally in my private environment there were still test failures:
a) The 'make check' tests fail but only in a minor way due to changes colname
b) the subscription TAP test did not work at all for me -- many errors.

Thank you for reviewing the patch.

While updating the patch, I realized that the current approach won't
work well or at least has the problem with partition tables. If a
publication has a partitioned table with publish_via_root = false, the
subscriber launches tablesync workers for its partitions so that each
tablesync worker copies data of each partition. Similarly, if it has a
partition table with publish_via_root = true, the subscriber launches
a tablesync worker for the parent table. With the current design,
since the tablesync worker is responsible for both schema and data
synchronization for the target table, it won't be possible to
synchronize both the parent table's schema and partitions' schema. For
example, there is no pg_subscription_rel entry for the parent table if
the publication has publish_via_root = false. In addition to that, we
need to be careful about the order of synchronization of the parent
table and its partitions. We cannot start schema synchronization for
partitions before its parent table. So it seems to me that we need to
consider another approach.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#56Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Masahiko Sawada (#55)
2 attachment(s)
Re: Initial Schema Sync for Logical Replication

On Wed, Jul 5, 2023 at 11:14 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Mon, Jun 19, 2023 at 5:29 PM Peter Smith <smithpb2250@gmail.com> wrote:

Hi,

Below are my review comments for the PoC patch 0001.

In addition, the patch needed rebasing, and, after I rebased it
locally in my private environment there were still test failures:
a) The 'make check' tests fail but only in a minor way due to changes colname
b) the subscription TAP test did not work at all for me -- many errors.

Thank you for reviewing the patch.

While updating the patch, I realized that the current approach won't
work well or at least has the problem with partition tables. If a
publication has a partitioned table with publish_via_root = false, the
subscriber launches tablesync workers for its partitions so that each
tablesync worker copies data of each partition. Similarly, if it has a
partition table with publish_via_root = true, the subscriber launches
a tablesync worker for the parent table. With the current design,
since the tablesync worker is responsible for both schema and data
synchronization for the target table, it won't be possible to
synchronize both the parent table's schema and partitions' schema. For
example, there is no pg_subscription_rel entry for the parent table if
the publication has publish_via_root = false. In addition to that, we
need to be careful about the order of synchronization of the parent
table and its partitions. We cannot start schema synchronization for
partitions before its parent table. So it seems to me that we need to
consider another approach.

So I've implemented a different approach; doing schema synchronization
at a CREATE SUBSCRIPTION time. The backend executing CREATE
SUBSCRIPTION uses pg_dump and restores the table schemas including
both partitioned tables and their partitions regardless of
publish_via_partition_root option, and then creates
pg_subscription_rel entries for tables while respecting
publish_via_partition_root option.

There is a window between table creations and the tablesync workers
starting to process the tables. If DDLs are executed in this window,
the tablesync worker might fail because the table schema might have
already been changed. We need to mention this note in the
documentation. BTW, I think we will be able to get rid of this
downside if we support DDL replication. DDLs executed in the window
are applied by the apply worker and it takes over the data copy to the
tablesync worker at a certain LSN.

I've attached PoC patches. It has regression tests but doesn't have
the documentation yet.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

Attachments:

v2-0001-PoC-Add-no-table-dependents-option-to-pg_dump.patch (application/octet-stream)
From 17b31313014c8432a7e435d11d4f68e57ef7b623 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 3 Jul 2023 16:07:48 +0900
Subject: [PATCH v2 1/2] PoC: Add --no-table-dependents option to pg_dump.

---
 doc/src/sgml/ref/pg_dump.sgml | 12 ++++++++++++
 src/bin/pg_dump/pg_backup.h   |  1 +
 src/bin/pg_dump/pg_dump.c     | 37 ++++++++++++++++++++++++++++++-----
 src/bin/pg_dump/pg_dump.h     |  3 +++
 4 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index a3cf0608f5..2e6221ffcb 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -994,6 +994,18 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--no-table-dependents</option></term>
+      <listitem>
+       <para>
+        Do not output commands to select table-dependent objects such as
+        indexes and triggers.  This option is not valid unless
+        <option>--table</option> or <option>--table-and-children</option>
+        is also specified.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--no-tablespaces</option></term>
       <listitem>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index aba780ef4b..eb7727ea93 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -178,6 +178,7 @@ typedef struct _dumpOptions
 	int			no_security_labels;
 	int			no_publications;
 	int			no_subscriptions;
+	int			no_table_dependents;
 	int			no_toast_compression;
 	int			no_unlogged_table_data;
 	int			serializable_deferrable;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 5dab1ba9ea..27f1a2d274 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -422,6 +422,7 @@ main(int argc, char **argv)
 		{"no-publications", no_argument, &dopt.no_publications, 1},
 		{"no-security-labels", no_argument, &dopt.no_security_labels, 1},
 		{"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
+		{"no-table-dependents", no_argument, &dopt.no_table_dependents, 15},
 		{"no-toast-compression", no_argument, &dopt.no_toast_compression, 1},
 		{"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
 		{"no-sync", no_argument, NULL, 7},
@@ -707,6 +708,10 @@ main(int argc, char **argv)
 	if (dopt.if_exists && !dopt.outputClean)
 		pg_fatal("option --if-exists requires option -c/--clean");
 
+	if (dopt.no_table_dependents && table_include_patterns.head == NULL &&
+		table_include_patterns_and_children.head == NULL)
+		pg_fatal("option --no-table-dependents requires option -t/--table or --table-and-children");
+
 	/*
 	 * --inserts are already implied above if --column-inserts or
 	 * --rows-per-insert were specified.
@@ -1114,6 +1119,8 @@ help(const char *progname)
 	printf(_("  --no-security-labels         do not dump security label assignments\n"));
 	printf(_("  --no-subscriptions           do not dump subscriptions\n"));
 	printf(_("  --no-table-access-method     do not dump table access methods\n"));
+	printf(_("  --no-table-dependents        do not dump table-dependent objects such as indexes\n"
+			 "                               and triggers\n"));
 	printf(_("  --no-tablespaces             do not dump tablespace assignments\n"));
 	printf(_("  --no-toast-compression       do not dump TOAST compression methods\n"));
 	printf(_("  --no-unlogged-table-data     do not dump unlogged table data\n"));
@@ -1803,9 +1810,26 @@ selectDumpableTable(TableInfo *tbinfo, Archive *fout)
 	 * according to the parent namespace's dump flag.
 	 */
 	if (table_include_oids.head != NULL)
-		tbinfo->dobj.dump = simple_oid_list_member(&table_include_oids,
-												   tbinfo->dobj.catId.oid) ?
-			DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
+	{
+		if (simple_oid_list_member(&table_include_oids,
+								   tbinfo->dobj.catId.oid))
+		{
+			if (!fout->dopt->no_table_dependents)
+				tbinfo->dobj.dump = DUMP_COMPONENT_ALL;
+			else
+			{
+				/*
+				 * If --exclude-table-dependents option is specified, we
+				 * dump either/both the table schema or/and the table data.
+				 */
+				tbinfo->dobj.dump = fout->dopt->schemaOnly ?
+					DUMP_COMPONENT_DEFINITION :
+					(DUMP_COMPONENT_DEFINITION | DUMP_COMPONENT_DATA);
+			}
+		}
+		else
+			tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
+	}
 	else
 		tbinfo->dobj.dump = tbinfo->dobj.namespace->dobj.dump_contains;
 
@@ -6982,6 +7006,9 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
 		if (!tbinfo->hasindex)
 			continue;
 
+		if (!(tbinfo->dobj.dump & DUMP_COMPONENT_INDEX))
+			continue;
+
 		/*
 		 * We can ignore indexes of uninteresting tables.
 		 */
@@ -7352,7 +7379,7 @@ getConstraints(Archive *fout, TableInfo tblinfo[], int numTables)
 		 */
 		if ((!tinfo->hastriggers &&
 			 tinfo->relkind != RELKIND_PARTITIONED_TABLE) ||
-			!(tinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
+			!(tinfo->dobj.dump & DUMP_COMPONENT_CONSTRAINT))
 			continue;
 
 		/* OK, we need info for this table */
@@ -7737,7 +7764,7 @@ getTriggers(Archive *fout, TableInfo tblinfo[], int numTables)
 		TableInfo  *tbinfo = &tblinfo[i];
 
 		if (!tbinfo->hastriggers ||
-			!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
+			!(tbinfo->dobj.dump & DUMP_COMPONENT_TRIGGER))
 			continue;
 
 		/* OK, we need info for this table */
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index bc8f2ec36d..64d2b685fa 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -100,6 +100,9 @@ typedef uint32 DumpComponents;
 #define DUMP_COMPONENT_ACL			(1 << 4)
 #define DUMP_COMPONENT_POLICY		(1 << 5)
 #define DUMP_COMPONENT_USERMAP		(1 << 6)
+#define DUMP_COMPONENT_INDEX		(1 << 7)
+#define DUMP_COMPONENT_TRIGGER		(1 << 8)
+#define DUMP_COMPONENT_CONSTRAINT	(1 << 9)
 #define DUMP_COMPONENT_ALL			(0xFFFF)
 
 /*
-- 
2.31.1

v2-0002-PoC-intitial-table-schema-synchronization-in-logi.patch (application/octet-stream)
From 5c7710edbc55b8590ac977ca96cea41465a88e13 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 7 Jul 2023 12:20:25 +0900
Subject: [PATCH v2 2/2] PoC: intitial table schema synchronization in logical
 replication.

---
 src/backend/catalog/pg_publication.c        |  20 +-
 src/backend/catalog/system_views.sql        |   2 +-
 src/backend/commands/subscriptioncmds.c     | 299 +++++++++++++++++---
 src/backend/replication/logical/tablesync.c |   4 +-
 src/include/catalog/pg_proc.dat             |   8 +-
 src/include/catalog/pg_publication.h        |   2 +-
 src/test/regress/expected/rules.out         |   2 +-
 7 files changed, 285 insertions(+), 52 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index c488b6370b..b392458642 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -802,7 +802,7 @@ GetAllTablesPublications(void)
  * root partitioned tables.
  */
 List *
-GetAllTablesPublicationRelations(bool pubviaroot)
+GetAllTablesPublicationRelations(bool pubviaroot, bool include_all_partitions)
 {
 	Relation	classRel;
 	ScanKeyData key[1];
@@ -825,13 +825,13 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 		Oid			relid = relForm->oid;
 
 		if (is_publishable_class(relid, relForm) &&
-			!(relForm->relispartition && pubviaroot))
+			(include_all_partitions || !(relForm->relispartition && pubviaroot)))
 			result = lappend_oid(result, relid);
 	}
 
 	table_endscan(scan);
 
-	if (pubviaroot)
+	if (pubviaroot || include_all_partitions)
 	{
 		ScanKeyInit(&key[0],
 					Anum_pg_class_relkind,
@@ -846,7 +846,7 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 			Oid			relid = relForm->oid;
 
 			if (is_publishable_class(relid, relForm) &&
-				!relForm->relispartition)
+				(include_all_partitions || !relForm->relispartition))
 				result = lappend_oid(result, relid);
 		}
 
@@ -1057,6 +1057,7 @@ Datum
 pg_get_publication_tables(PG_FUNCTION_ARGS)
 {
 #define NUM_PUBLICATION_TABLES_ELEM	4
+	bool		include_all_partitions = PG_GETARG_BOOL(0);
 	FuncCallContext *funcctx;
 	List	   *table_infos = NIL;
 
@@ -1081,7 +1082,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * Deconstruct the parameter into elements where each element is a
 		 * publication name.
 		 */
-		arr = PG_GETARG_ARRAYTYPE_P(0);
+		arr = PG_GETARG_ARRAYTYPE_P(1);
 		deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
 						  &elems, NULL, &nelems);
 
@@ -1101,17 +1102,22 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 			 * those. Otherwise, get the partitioned table itself.
 			 */
 			if (pub_elem->alltables)
-				pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
+				pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot,
+																   include_all_partitions);
 			else
 			{
 				List	   *relids,
 						   *schemarelids;
 
 				relids = GetPublicationRelations(pub_elem->oid,
+												 include_all_partitions ?
+												 PUBLICATION_PART_ALL :
 												 pub_elem->pubviaroot ?
 												 PUBLICATION_PART_ROOT :
 												 PUBLICATION_PART_LEAF);
 				schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
+																include_all_partitions ?
+																PUBLICATION_PART_ALL :
 																pub_elem->pubviaroot ?
 																PUBLICATION_PART_ROOT :
 																PUBLICATION_PART_LEAF);
@@ -1148,7 +1154,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * data of the child table to be double-published on the subscriber
 		 * side.
 		 */
-		if (viaroot)
+		if (viaroot && !include_all_partitions)
 			filter_partitions(table_infos);
 
 		/* Construct a tuple descriptor for the result rows. */
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c18fea8362..1698dd5374 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -376,7 +376,7 @@ CREATE VIEW pg_publication_tables AS
         ) AS attnames,
         pg_get_expr(GPT.qual, GPT.relid) AS rowfilter
     FROM pg_publication P,
-         LATERAL pg_get_publication_tables(P.pubname) GPT,
+         LATERAL pg_get_publication_tables(false, P.pubname) GPT,
          pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
     WHERE C.oid = GPT.relid;
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d4e798baeb..bdcfb8829e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -33,6 +33,7 @@
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -71,6 +72,7 @@
 #define SUBOPT_RUN_AS_OWNER			0x00001000
 #define SUBOPT_LSN					0x00002000
 #define SUBOPT_ORIGIN				0x00004000
+#define SUBOPT_COPY_SCHEMA			0x00008000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -87,6 +89,7 @@ typedef struct SubOpts
 	bool		connect;
 	bool		enabled;
 	bool		create_slot;
+	bool		copy_schema;
 	bool		copy_data;
 	bool		refresh;
 	bool		binary;
@@ -99,13 +102,16 @@ typedef struct SubOpts
 	XLogRecPtr	lsn;
 } SubOpts;
 
-static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_table_list(WalReceiverConn *wrconn, List *publications,
+							  bool all_partitions);
 static void check_publications_origin(WalReceiverConn *wrconn,
 									  List *publications, bool copydata,
 									  char *origin, Oid *subrel_local_oids,
 									  int subrel_count, char *subname);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
+static void check_pg_dump_available(void);
+static void synchronize_table_schema(char *conninfo, List *tables, char *snapshot_name);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 
 
@@ -139,6 +145,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->enabled = true;
 	if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
 		opts->create_slot = true;
+	if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA))
+		opts->copy_schema = false;
 	if (IsSet(supported_opts, SUBOPT_COPY_DATA))
 		opts->copy_data = true;
 	if (IsSet(supported_opts, SUBOPT_REFRESH))
@@ -205,6 +213,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			else
 				ReplicationSlotValidateName(opts->slot_name, ERROR);
 		}
+		else if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA) &&
+				 strcmp(defel->defname, "copy_schema") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_COPY_SCHEMA;
+			opts->copy_schema = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
 				 strcmp(defel->defname, "copy_data") == 0)
 		{
@@ -388,12 +405,30 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "copy_data = true")));
 
+		if (opts->copy_schema &&
+			IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "copy_schema = true")));
+
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
+		opts->copy_schema = false;
 		opts->copy_data = false;
 	}
 
+	/*
+	 * The initial schema sync needs a snapshot that is created and exported
+	 * while creating a replication slot.
+	 */
+	if (!opts->create_slot && IsSet(supported_opts, SUBOPT_CONNECT) &&
+		opts->copy_schema)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("subscription with %s must also set %s",
+						"copy_schema = on", "create_slot = true")));
 	/*
 	 * Do additional checking for disallowed combination when slot_name = NONE
 	 * was used.
@@ -591,7 +626,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | SUBOPT_COPY_SCHEMA);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -636,6 +671,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				 errmsg("password_required=false is superuser-only"),
 				 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
 
+	if (opts.copy_schema)
+		check_pg_dump_available();
+
 	/*
 	 * If built with appropriate switch, whine when regression-testing
 	 * conventions for subscription names are violated.
@@ -750,31 +788,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			check_publications_origin(wrconn, publications, opts.copy_data,
 									  opts.origin, NULL, 0, stmt->subname);
 
-			/*
-			 * Set sync state based on if we were asked to do data copy or
-			 * not.
-			 */
-			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
-
 			/*
 			 * Get the table list from publisher and build local table status
 			 * info.
 			 */
-			tables = fetch_table_list(wrconn, publications);
-			foreach(lc, tables)
-			{
-				RangeVar   *rv = (RangeVar *) lfirst(lc);
-				Oid			relid;
-
-				relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
-				/* Check for supported relkind. */
-				CheckSubscriptionRelkind(get_rel_relkind(relid),
-										 rv->schemaname, rv->relname);
-
-				AddSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr);
-			}
+			tables = fetch_table_list(wrconn, publications, false);
 
 			/*
 			 * If requested, create permanent slot for the subscription. We
@@ -783,10 +801,22 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 */
 			if (opts.create_slot)
 			{
+				List		*tables_schema_sync;
 				bool		twophase_enabled = false;
+				char		*snapshot_name = NULL;
 
 				Assert(opts.slot_name);
 
+				/*
+				 * Get the list of tables in publications including both partitioned
+				 * tables and theirs partitions. We have to fetch the list from the
+				 * publisher before creating the replication slot below. Otherwise,
+				 * the exported snapshot will be invalidated when fetching the table
+				 * list.
+				 */
+				if (opts.copy_schema)
+					tables_schema_sync = fetch_table_list(wrconn, publications, true);
+
 				/*
 				 * Even if two_phase is set, don't create the slot with
 				 * two-phase enabled. Will enable it once all the tables are
@@ -806,8 +836,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				if (opts.twophase && !opts.copy_data && tables != NIL)
 					twophase_enabled = true;
 
-				walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
-								   CRS_NOEXPORT_SNAPSHOT, NULL);
+				snapshot_name = walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
+												   opts.copy_schema ? CRS_EXPORT_SNAPSHOT:
+												   CRS_NOEXPORT_SNAPSHOT, NULL);
+
+				/* Synchronize schemas of tables that will be subscribed */
+				if (opts.copy_schema)
+				{
+					Assert(snapshot_name != NULL);
+					synchronize_table_schema(conninfo, tables_schema_sync, snapshot_name);
+				}
 
 				if (twophase_enabled)
 					UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
@@ -816,6 +854,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 						(errmsg("created replication slot \"%s\" on publisher",
 								opts.slot_name)));
 			}
+
+			/*
+			 * Set sync state based on if we were asked to do data copy or
+			 * not.
+			 */
+			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+			foreach(lc, tables)
+			{
+				RangeVar   *rv = (RangeVar *) lfirst(lc);
+				Oid			relid;
+
+				relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+				/* Check for supported relkind. */
+				CheckSubscriptionRelkind(get_rel_relkind(relid),
+										 rv->schemaname, rv->relname);
+
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
+			}
+
 		}
 		PG_FINALLY();
 		{
@@ -843,7 +902,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data,
+AlterSubscription_refresh(Subscription *sub, bool copy_data, bool copy_schema,
 						  List *validate_publications)
 {
 	char	   *err;
@@ -868,6 +927,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
+	if (copy_schema)
+		check_pg_dump_available();
+
 	/* Try to connect to the publisher. */
 	must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
 	wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
@@ -883,7 +945,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			check_publications(wrconn, validate_publications);
 
 		/* Get the table list from publisher. */
-		pubrel_names = fetch_table_list(wrconn, sub->publications);
+		pubrel_names = fetch_table_list(wrconn, sub->publications, false);
 
 		/* Get local table list. */
 		subrel_states = GetSubscriptionRelations(sub->oid, false);
@@ -909,6 +971,35 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 								  sub->origin, subrel_local_oids,
 								  subrel_count, sub->name);
 
+		if (copy_schema)
+		{
+			List	   *rels_schema_sync = NIL;
+
+			foreach(lc, pubrel_names)
+			{
+				RangeVar   *rv = (RangeVar *) lfirst(lc);
+				Oid			relid;
+
+				relid = RangeVarGetRelid(rv, AccessShareLock, true);
+
+				if (!bsearch(&relid, subrel_local_oids,
+							 subrel_count, sizeof(Oid), oid_cmp))
+					rels_schema_sync = lappend(rels_schema_sync, rv);
+			}
+
+			/*
+			 * Synchronize table schemas for tables that are not present
+			 * on the subscriber node.
+			 *
+			 * XXX: There is a window between creating the table and the
+			 * tablesync worker starts processing it. If there is a DDL
+			 * for the table, the data sync could fail.
+			 */
+			if (list_length(rels_schema_sync) > 0)
+				synchronize_table_schema(sub->conninfo, rels_schema_sync,
+										 NULL);
+		}
+
 		/*
 		 * Rels that we want to remove from subscription and drop any slots
 		 * and origins corresponding to them.
@@ -1252,7 +1343,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 		case ALTER_SUBSCRIPTION_SET_PUBLICATION:
 			{
-				supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
+				supported_opts = SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA | SUBOPT_REFRESH;
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
@@ -1286,7 +1377,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, opts.copy_data,
+					AlterSubscription_refresh(sub, opts.copy_data, opts.copy_schema,
 											  stmt->publication);
 				}
 
@@ -1299,7 +1390,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				List	   *publist;
 				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
 
-				supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
+				supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA;
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
@@ -1345,7 +1436,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					/* Refresh the new list of publications. */
 					sub->publications = publist;
 
-					AlterSubscription_refresh(sub, opts.copy_data,
+					AlterSubscription_refresh(sub, opts.copy_data, opts.copy_schema,
 											  validate_publications);
 				}
 
@@ -1360,7 +1451,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
 				parse_subscription_options(pstate, stmt->options,
-										   SUBOPT_COPY_DATA, &opts);
+										   SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA, &opts);
 
 				/*
 				 * The subscription option "two_phase" requires that
@@ -1387,7 +1478,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, opts.copy_data, NULL);
+				AlterSubscription_refresh(sub, opts.copy_data, opts.copy_schema, NULL);
 
 				break;
 			}
@@ -1957,7 +2048,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
 	appendStringInfoString(&cmd,
 						   "SELECT DISTINCT P.pubname AS pubname\n"
 						   "FROM pg_publication P,\n"
-						   "     LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+						   "     LATERAL pg_get_publication_tables(false, P.pubname) GPT\n"
 						   "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
 						   "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
 						   "WHERE C.oid = GPT.relid AND P.pubname IN (");
@@ -2044,7 +2135,8 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
  * list and row filter are specified for different publications.
  */
 static List *
-fetch_table_list(WalReceiverConn *wrconn, List *publications)
+fetch_table_list(WalReceiverConn *wrconn, List *publications,
+				 bool all_partitions)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
@@ -2081,10 +2173,11 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
 						 "       FROM pg_class c\n"
 						 "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
-						 "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
+						 "         JOIN ( SELECT (pg_get_publication_tables(%s, VARIADIC array_agg(pubname::text))).*\n"
 						 "                FROM pg_publication\n"
 						 "                WHERE pubname IN ( %s )) AS gpt\n"
 						 "             ON gpt.relid = c.oid\n",
+						 all_partitions ? "true" : "false",
 						 pub_names.data);
 
 		pfree(pub_names.data);
@@ -2290,6 +2383,140 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *
 	return oldpublist;
 }
 
+/*
+ * Check and raise an ERROR if table schema copy is requested but pg_dump command is
+ * neither not found nor executable.
+ */
+static void
+check_pg_dump_available(void)
+{
+	char path[MAXPGPATH];
+
+	if (find_my_exec("pg_dump", path) < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not find \"%s\" in executable path", "pg_dump")));
+}
+
+/*
+ * Fetch the table schema of the given table and restore into the subscriber.
+ *
+ * XXX currently it doesn't schema (IOW namespace) so the schema has to already
+ * be present on the subscriber, is that okay? or do we want to create it? But
+ * if we want to do that, we need to consider the case where the schema has
+ * non-default options.
+ */
+static void
+synchronize_table_schema(char *conninfo, List *tables, char *snapshot_name)
+{
+	StringInfoData command;
+	FILE *handle;
+	char full_path[MAXPGPATH];
+	ListCell *lc;
+	int ret;
+
+	/*
+	 * We've checked the availability of pg_dump at a CREATE SUBSCRIPTION or a
+	 * ALTER SUBSCRIPTION ... REFRESH PUBLICATION time, but check it again in
+	 * case the pg_dump command becomes unavailable.
+	 */
+	if (find_my_exec("pg_dump", full_path) < 0)
+		ereport(ERROR,
+				(errmsg("could not find \"%s\" in executable path", "pg_dump")));
+
+	/*
+	 * Construct pg_dump command. We dump only the table definition without
+	 * any table-dependent objects such as indexes and triggers. Also, we specify
+	 * the snapshot that has been exported while creating the replication slot for
+	 * tablesync. The table name in --table option must be quoted to avoid the
+	 * table name from being interpreted as a regular expression.
+	 *
+	 * Since the publisher could be a different major version PostgreSQL, we
+	 * use --quote-all-identifiers option.
+	 *
+	 * The outputs are redirected to this backend's input and executed via SPI.
+	 *
+	 * XXX: who should be the owner of the new table?
+	 */
+	initStringInfo(&command);
+	appendStringInfo(&command,
+					 "%s --format=p --schema-only --username %s --dbname \"%s\" --no-table-dependents --quote-all-identifiers --file -",
+					 full_path, GetUserNameFromId(GetUserId(), true),
+					 conninfo);
+
+	if (snapshot_name)
+		appendStringInfo(&command, " --snapshot=%s", snapshot_name);
+
+	foreach(lc, tables)
+	{
+		RangeVar *rv = (RangeVar *) lfirst(lc);
+
+		/*
+		 * Error if the table is already present on the subscriber. Please note
+		 * that concurrent DDLs can create the table as we don't acquire any lock
+		 * on the table.
+		 *
+		 * XXX: do we want to overwrite it (or optionally)?
+		 */
+		if (OidIsValid(RangeVarGetRelid(rv, AccessShareLock, true)))
+			ereport(ERROR,
+					(errmsg("existing table %s cannot synchronize table schema",
+							rv->relname)));
+
+		appendStringInfo(&command, " --table '%s'",
+						 quote_qualified_identifier(rv->schemaname, rv->relname));
+	}
+
+	/* Open SPI context. */
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	PG_TRY();
+	{
+		char	buf[1024];
+		StringInfoData querybuf;
+
+		elog(DEBUG3, "executing pg_dump command \"%s\"", command.data);
+
+		handle = OpenPipeStream(command.data, "r");
+		if (handle == NULL)
+			elog(ERROR, "could not execute command \"%s\": %m", command.data);
+
+		initStringInfo(&querybuf);
+
+		/*
+		 * Gathering all commands into one string. Since we dump only schema of the
+		 * particular table, the command would not be long.
+		 */
+		while (fgets(buf, sizeof(buf), handle))
+			appendStringInfoString(&querybuf, buf);
+
+		/*
+		 * If the pg_dump command failed, there is no output in the result handle
+		 * and the pg_dump's error messages are written into the server log.
+		 */
+		if (querybuf.len == 0)
+			elog(ERROR, "failed to execute command \"%s\"", command.data);
+
+		elog(DEBUG5, "executing dumped DDLs %s", querybuf.data);
+		ret = SPI_exec(querybuf.data, 0);
+		if (ret != SPI_OK_UTILITY && ret != SPI_OK_SELECT)
+			elog(ERROR, "SPI_exec returned %d: %s", ret, querybuf.data);
+	}
+	PG_FINALLY();
+	{
+		ClosePipeStream(handle);
+	}
+	PG_END_TRY();
+
+	/* Close SPI context */
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+
+	/* make the newly created table visible to us */
+	CommandCounterIncrement();
+}
+
 /*
  * Extract the streaming mode value from a DefElem.  This is like
  * defGetBoolean() but also accepts the special value of "parallel".
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab..d870a9f69c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -846,7 +846,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 						 "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
 						 "   THEN NULL ELSE gpt.attrs END)"
 						 "  FROM pg_publication p,"
-						 "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
+						 "  LATERAL pg_get_publication_tables(false, p.pubname) gpt,"
 						 "  pg_class c"
 						 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
 						 "   AND p.pubname IN ( %s )",
@@ -1028,7 +1028,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		appendStringInfo(&cmd,
 						 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
 						 "  FROM pg_publication p,"
-						 "  LATERAL pg_get_publication_tables(p.pubname) gpt"
+						 "  LATERAL pg_get_publication_tables(false, p.pubname) gpt"
 						 " WHERE gpt.relid = %u"
 						 "   AND p.pubname IN ( %s )",
 						 lrel->remoteid,
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6996073989..ec55a22fe1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11823,10 +11823,10 @@
   descr => 'get information of the tables that are part of the specified publications',
   proname => 'pg_get_publication_tables', prorows => '1000',
   provariadic => 'text', proretset => 't', provolatile => 's',
-  prorettype => 'record', proargtypes => '_text',
-  proallargtypes => '{_text,oid,oid,int2vector,pg_node_tree}',
-  proargmodes => '{v,o,o,o,o}',
-  proargnames => '{pubname,pubid,relid,attrs,qual}',
+  prorettype => 'record', proargtypes => 'bool _text',
+  proallargtypes => '{bool,_text,oid,oid,int2vector,pg_node_tree}',
+  proargmodes => '{i,v,o,o,o,o}',
+  proargnames => '{include_all_partitions,pubname,pubid,relid,attrs,qual}',
   prosrc => 'pg_get_publication_tables' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 6ecaa2a01e..dd3c27d319 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -132,7 +132,7 @@ typedef enum PublicationPartOpt
 
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
-extern List *GetAllTablesPublicationRelations(bool pubviaroot);
+extern List *GetAllTablesPublicationRelations(bool pubviaroot, bool include_all_partitions);
 extern List *GetPublicationSchemas(Oid pubid);
 extern List *GetSchemaPublications(Oid schemaid);
 extern List *GetSchemaPublicationRelations(Oid schemaid,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 7fd81e6a7d..fd805595bd 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1449,7 +1449,7 @@ pg_publication_tables| SELECT p.pubname,
           WHERE ((a.attrelid = gpt.relid) AND (a.attnum = ANY ((gpt.attrs)::smallint[])))) AS attnames,
     pg_get_expr(gpt.qual, gpt.relid) AS rowfilter
    FROM pg_publication p,
-    LATERAL pg_get_publication_tables(VARIADIC ARRAY[(p.pubname)::text]) gpt(pubid, relid, attrs, qual),
+    LATERAL pg_get_publication_tables(false, VARIADIC ARRAY[(p.pubname)::text]) gpt(pubid, relid, attrs, qual),
     (pg_class c
      JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
   WHERE (c.oid = gpt.relid);
-- 
2.31.1

#57Kumar, Sachin
ssetiya@amazon.com
In reply to: Masahiko Sawada (#56)
RE: Initial Schema Sync for Logical Replication

From: Masahiko Sawada <sawada.mshk@gmail.com>
So I've implemented a different approach; doing schema synchronization at a
CREATE SUBSCRIPTION time. The backend executing CREATE SUBSCRIPTION
uses pg_dump and restores the table schemas including both partitioned tables
and their partitions regardless of publish_via_partition_root option, and then
creates pg_subscription_rel entries for tables while respecting
publish_via_partition_root option.

There is a window between table creations and the tablesync workers starting to
process the tables. If DDLs are executed in this window, the tablesync worker
might fail because the table schema might have already been changed. We need
to mention this note in the documentation. BTW, I think we will be able to get
rid of this downside if we support DDL replication. DDLs executed in the window
are applied by the apply worker and it takes over the data copy to the tablesync
worker at a certain LSN.

I don’t think even with DDL replication we will be able to get rid of this window.
There are some issues:
1. Even with the tablesync worker taking over at a certain LSN, the publisher can make more changes until
the tablesync worker acquires a lock on the publisher table via the table copy.
2. How will we make sure that the applier worker has caught up with all the changes from the publisher
before it starts the tablesync worker? It can lag behind the publisher.

I think the easiest option would be to just recreate the table; this way we don’t have to worry about
complex race conditions. tablesync already makes a slot for the data copy, so we can use the same slot for
getting an up-to-date table definition, and dropping the table won't be very expensive since there won't be
any data in it. The applier worker will skip all the DDLs/DMLs till the table is synced.

Although for partitioned tables we will be able to keep up with the published table's schema changes only when
publish_via_partition_root is true.

Regards
Sachin
Amazon Web Services: https://aws.amazon.com

#58Amit Kapila
amit.kapila16@gmail.com
In reply to: Masahiko Sawada (#55)
Re: Initial Schema Sync for Logical Replication

On Wed, Jul 5, 2023 at 7:45 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Mon, Jun 19, 2023 at 5:29 PM Peter Smith <smithpb2250@gmail.com> wrote:

Hi,

Below are my review comments for the PoC patch 0001.

In addition, the patch needed rebasing, and, after I rebased it
locally in my private environment there were still test failures:
a) The 'make check' tests fail but only in a minor way due to changes colname
b) the subscription TAP test did not work at all for me -- many errors.

Thank you for reviewing the patch.

While updating the patch, I realized that the current approach won't
work well or at least has the problem with partition tables. If a
publication has a partitioned table with publish_via_root = false, the
subscriber launches tablesync workers for its partitions so that each
tablesync worker copies data of each partition. Similarly, if it has a
partition table with publish_via_root = true, the subscriber launches
a tablesync worker for the parent table. With the current design,
since the tablesync worker is responsible for both schema and data
synchronization for the target table, it won't be possible to
synchronize both the parent table's schema and partitions' schema.

I think one possibility to make this design work is that when
publish_via_root is false, then we assume that subscriber already has
parent table and then the individual tablesync workers can sync the
schema of partitions and their data. And when publish_via_root is
true, then the table sync worker is responsible to sync parent and
child tables along with data. Do you think such a mechanism can
address the partition table related cases?

--
With Regards,
Amit Kapila.

#59Kumar, Sachin
ssetiya@amazon.com
In reply to: Amit Kapila (#58)
RE: Initial Schema Sync for Logical Replication

From: Amit Kapila <amit.kapila16@gmail.com>
On Wed, Jul 5, 2023 at 7:45 AM Masahiko Sawada
<sawada.mshk@gmail.com> wrote:

On Mon, Jun 19, 2023 at 5:29 PM Peter Smith <smithpb2250@gmail.com>

wrote:

Hi,

Below are my review comments for the PoC patch 0001.

In addition, the patch needed rebasing, and, after I rebased it
locally in my private environment there were still test failures:
a) The 'make check' tests fail but only in a minor way due to
changes colname
b) the subscription TAP test did not work at all for me -- many errors.

Thank you for reviewing the patch.

While updating the patch, I realized that the current approach won't
work well or at least has the problem with partition tables. If a
publication has a partitioned table with publish_via_root = false, the
subscriber launches tablesync workers for its partitions so that each
tablesync worker copies data of each partition. Similarly, if it has a
partition table with publish_via_root = true, the subscriber launches
a tablesync worker for the parent table. With the current design,
since the tablesync worker is responsible for both schema and data
synchronization for the target table, it won't be possible to
synchronize both the parent table's schema and partitions' schema.

I think one possibility to make this design work is that when publish_via_root
is false, then we assume that subscriber already has parent table and then
the individual tablesync workers can sync the schema of partitions and their
data.

Since publish_via_partition_root is false by default, users have to create the parent table by themselves,
which I think is not a good user experience.

And when publish_via_root is true, then the table sync worker is
responsible to sync parent and child tables along with data. Do you think
such a mechanism can address the partition table related cases?

#60Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Kumar, Sachin (#59)
Re: Initial Schema Sync for Logical Replication

On Mon, Jul 10, 2023 at 8:06 PM Kumar, Sachin <ssetiya@amazon.com> wrote:

From: Amit Kapila <amit.kapila16@gmail.com>
On Wed, Jul 5, 2023 at 7:45 AM Masahiko Sawada
<sawada.mshk@gmail.com> wrote:

On Mon, Jun 19, 2023 at 5:29 PM Peter Smith <smithpb2250@gmail.com>

wrote:

Hi,

Below are my review comments for the PoC patch 0001.

In addition, the patch needed rebasing, and, after I rebased it
locally in my private environment there were still test failures:
a) The 'make check' tests fail but only in a minor way due to
changes colname
b) the subscription TAP test did not work at all for me -- many errors.

Thank you for reviewing the patch.

While updating the patch, I realized that the current approach won't
work well or at least has the problem with partition tables. If a
publication has a partitioned table with publish_via_root = false, the
subscriber launches tablesync workers for its partitions so that each
tablesync worker copies data of each partition. Similarly, if it has a
partition table with publish_via_root = true, the subscriber launches
a tablesync worker for the parent table. With the current design,
since the tablesync worker is responsible for both schema and data
synchronization for the target table, it won't be possible to
synchronize both the parent table's schema and partitions' schema.

I think one possibility to make this design work is that when publish_via_root
is false, then we assume that subscriber already has parent table and then
the individual tablesync workers can sync the schema of partitions and their
data.

Since publish_via_partition_root is false by default, users have to create the parent table by themselves,
which I think is not a good user experience.

I have the same concern. I think that users normally use
publish_via_partition_root = false if the partitioned table on the
subscriber consists of the same set of partitions as the publisher's
ones. And such users would expect both the partitioned table and its
partitions to be synchronized.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#61Kumar, Sachin
ssetiya@amazon.com
In reply to: Masahiko Sawada (#60)
1 attachment(s)
RE: Initial Schema Sync for Logical Replication

Hi Everyone, based on internal discussion with Masahiko
I have implemented concurrent DDL support for initial schema sync.

Concurrent Patch workflow

1. When the TableSync worker creates a replication slot, it will
save the slot LSN into pg_subscription_rel with the
SUBREL_SYNC_SCHEMA_DATA_SYNC state, and it will wait for
its state to become SUBREL_STATE_DATASYNC.

2. The applier process will apply DDLs till the tablesync LSN, and then
it will change the pg_subscription_rel state to SUBREL_STATE_DATASYNC.

3. TableSync will continue applying pending DMLs/DDLs till it catches up.

This patch needs DDL replication to apply concurrent DDLs; I have
cherry-picked this DDL patch [0].

Issues
1) Needs testing for concurrent DDLs; not sure how to make the tablesync process wait so that
concurrent DDLs can be issued on the publisher.
2) In my testing the created table does not appear on the same connection on the subscriber,
I have to reconnect to see the table.
3) Maybe use different chars for SUBREL_SYNC_SCHEMA_DATA_INIT and SUBREL_SYNC_SCHEMA_DATA_SYNC,
currently they are 'x' and 'y'.
4) I need to add SUBREL_SYNC_SCHEMA_DATA_INIT and SUBREL_SYNC_SCHEMA_DATA_SYNC to
pg_subscription_rel_d.h to make it compile successfully.
5) It only implements concurrent ALTER as of now.

[0]: /messages/by-id/OS0PR01MB57163E6487EFF7378CB8E17C9438A@OS0PR01MB5716.jpnprd01.prod.outlook.com

Attachments:

ConcurrentDDL.patch (application/octet-stream)
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 2d82fbfad2..9231c7e0c9 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -538,3 +539,123 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 
 	return res;
 }
+/*
+ *  Does any TableSync workers are waiting for applier process to apply pending schema updates ?
+ *  We will scan pg_subscription_rel to see if any table is in SUBREL_SYNC_SCHEMA_DATA_SYNC
+ *  state.
+ */
+bool HasTableSchemaSyncPending(Oid subid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	ScanKeyData skey[1];
+	SysScanDesc scan;
+	bool result = false;
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[0],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	scan = systable_beginscan(rel, InvalidOid, false,
+							  NULL, 1, skey);
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_subscription_rel subrel;
+
+		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+		if (subrel->srsubstate == SUBREL_SYNC_SCHEMA_DATA_SYNC)
+		{
+			result = true;
+			break;
+		}
+	}
+
+	/* Cleanup */
+	systable_endscan(scan);
+	table_close(rel, AccessShareLock);
+	return result;
+}
+
+/*
+ * Update pg_subscription_rel state if we have already applied the lsn > srsublsn
+ * If there is corresponding TableSync worker we will also update its relstate, So
+ * that it can wake up.
+ * return true if no entry in pg_subscription_rel are in SUBREL_SYNC_SCHEMA_DATA_SYNC
+ * else false.
+ */
+bool UpdateTableSchemaSyncPending(Oid subid, XLogRecPtr sublsn)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	ScanKeyData skey[1];
+	SysScanDesc scan;
+	bool result = true;
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[0],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	scan = systable_beginscan(rel, InvalidOid, false,
+							  NULL, 1, skey);
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_subscription_rel subrel;
+		bool		isnull;
+		Datum		d;
+		XLogRecPtr tablesync_sublsn;
+		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+		/* Get the LSN */
+		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+							Anum_pg_subscription_rel_srsublsn, &isnull);
+		if (isnull)
+			tablesync_sublsn = InvalidXLogRecPtr;
+		else
+			tablesync_sublsn = DatumGetLSN(d);
+		if (subrel->srsubstate == SUBREL_SYNC_SCHEMA_DATA_SYNC)
+		{
+			/* We don't have to exit here; we can unblock other tablesync workers */
+			if (sublsn < tablesync_sublsn)
+			{
+				result = false;
+				continue;
+			}
+			bool		nulls[Natts_pg_subscription_rel];
+			Datum		values[Natts_pg_subscription_rel];
+			bool		replaces[Natts_pg_subscription_rel];	
+			/* Update the tuple. */
+			memset(values, 0, sizeof(values));
+			memset(nulls, false, sizeof(nulls));
+			memset(replaces, false, sizeof(replaces));
+			replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+			values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(SUBREL_STATE_DATASYNC);
+			replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+			tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+									replaces);
+			/* Update the catalog. */
+			CatalogTupleUpdate(rel, &tup->t_self, tup);
+			/* Update the tablesync worker state */
+			LogicalRepWorker *worker;
+			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+			worker = logicalrep_worker_find(subid, subrel->srrelid, false);
+			LWLockRelease(LogicalRepWorkerLock);
+			if (worker)
+			{
+				SpinLockAcquire(&worker->relmutex);
+				worker->relstate = SUBREL_STATE_DATASYNC;
+				SpinLockRelease(&worker->relmutex);
+			}
+		}
+	}
+
+	/* Cleanup */
+	systable_endscan(scan);
+	table_close(rel, AccessShareLock);
+	return result;
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9a534fbb00..d07580f799 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -809,6 +809,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 */
 			tables = fetch_table_list(wrconn, publications, false);
 
+			/*
+			 * Set sync state based on if we were asked to do data copy or
+			 * not.
+			 */
+			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 			/*
 			 * If requested, create permanent slot for the subscription. We
 			 * won't use the initial snapshot for anything, so no need to
@@ -860,6 +865,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				{
 					Assert(snapshot_name != NULL);
 					synchronize_table_schema(conninfo, tables_schema_sync, snapshot_name);
+					/* We only support schema sync when copy_data is enabled */
+					if (table_state == SUBREL_STATE_INIT)
+						table_state = SUBREL_SYNC_SCHEMA_DATA_INIT;
 				}
 
 				if (twophase_enabled)
@@ -870,11 +878,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 								opts.slot_name)));
 			}
 
-			/*
-			 * Set sync state based on if we were asked to do data copy or
-			 * not.
-			 */
-			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 			foreach(lc, tables)
 			{
 				RangeVar   *rv = (RangeVar *) lfirst(lc);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d870a9f69c..f708af25dc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1310,7 +1310,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
 		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
-		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY ||
+		   MyLogicalRepWorker->relstate == SUBREL_SYNC_SCHEMA_DATA_INIT ||
+		   MyLogicalRepWorker->relstate == SUBREL_SYNC_SCHEMA_DATA_SYNC);
 
 	/* Assign the origin tracking record name. */
 	ReplicationOriginNameForLogicalRep(MySubscription->oid,
@@ -1355,29 +1357,35 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		goto copy_table_done;
 	}
 
-	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
-	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
-	SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
-	/* Update the state and make it visible to others. */
-	StartTransactionCommand();
-	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-							   MyLogicalRepWorker->relid,
-							   MyLogicalRepWorker->relstate,
-							   MyLogicalRepWorker->relstate_lsn);
-	CommitTransactionCommand();
-	pgstat_report_stat(true);
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_INIT)
+	{	
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+		MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
+		MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
+		SpinLockRelease(&MyLogicalRepWorker->relmutex);
+		
+		/* Update the state and make it visible to others. */
+		StartTransactionCommand();
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->relstate,
+								   MyLogicalRepWorker->relstate_lsn);
+		CommitTransactionCommand();
+	}
 
 	StartTransactionCommand();
 
-	/*
-	 * Use a standard write lock here. It might be better to disallow access
-	 * to the table while it's being synchronized. But we don't want to block
-	 * the main apply process from working and it has to open the relation in
-	 * RowExclusiveLock when remapping remote relation id to local one.
-	 */
-	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+	if (MyLogicalRepWorker->relstate != SUBREL_SYNC_SCHEMA_DATA_INIT &&
+		   MyLogicalRepWorker->relstate != SUBREL_SYNC_SCHEMA_DATA_SYNC)
+	{
+		/*
+		 * Use a standard write lock here. It might be better to disallow access
+		 * to the table while it's being synchronized. But we don't want to block
+		 * the main apply process from working and it has to open the relation in
+		 * RowExclusiveLock when remapping remote relation id to local one.
+		 */
+		rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+	}
 
 	/*
 	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
@@ -1402,6 +1410,27 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	walrcv_create_slot(LogRepWorkerWalRcvConn,
 					   slotname, false /* permanent */ , false /* two_phase */ ,
 					   CRS_USE_SNAPSHOT, origin_startpos);
+	if (MyLogicalRepWorker->relstate == SUBREL_SYNC_SCHEMA_DATA_INIT ||
+			MyLogicalRepWorker->relstate == SUBREL_SYNC_SCHEMA_DATA_SYNC)
+	{
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+		/*
+		 * relstate_lsn should be updated first; see the wait loop in
+		 * should_apply_ddl_changes_for_rel().
+		 */
+		MyLogicalRepWorker->relstate_lsn = *origin_startpos;
+		MyLogicalRepWorker->relstate = SUBREL_SYNC_SCHEMA_DATA_SYNC;
+		SpinLockRelease(&MyLogicalRepWorker->relmutex);
+		/* Update the state and make it visible to others. */
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+									MyLogicalRepWorker->relid,
+									MyLogicalRepWorker->relstate,
+									MyLogicalRepWorker->relstate_lsn);
+		CommitTransactionCommand();
+		wait_for_worker_state_change(SUBREL_STATE_DATASYNC);
+		StartTransactionCommand();
+		rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+	}
 
 	/*
 	 * Setup replication origin tracking. The purpose of doing this before the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c233a365ba..4cff394adc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -323,6 +323,8 @@ static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+bool        tablesync_schema_sync_pending = true;
+static XLogRecPtr last_received_cache = InvalidXLogRecPtr;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -525,6 +527,45 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 				 rel->statelsn <= remote_final_lsn));
 }
 
+/*
+ * Should the apply worker apply DDL changes for the given relation?
+ */
+static bool
+should_apply_ddl_changes_for_rel(Oid relid, XLogRecPtr lsn)
+{
+	if (am_tablesync_worker())
+		return MyLogicalRepWorker->relid == relid;
+	else
+	{
+		/* Find out whether there is a tablesync worker active for this table */
+		LogicalRepWorker *worker;
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+		/* Tablesync worker does not exist */
+		if (!worker)
+			return true;
+		for (;;)
+		{
+			if (worker->relstate != SUBREL_SYNC_SCHEMA_DATA_INIT)
+			{
+				/* The slot was created after the DDL was issued on the publisher */
+				if (worker->relstate_lsn > lsn)
+					return true;
+				return false;
+			}
+			int rc;
+			rc = WaitLatch(MyLatch,
+						   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+						   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+		
+			if (rc & WL_LATCH_SET)
+				ResetLatch(MyLatch);
+		}
+	}
+	return true;
+}
+
 /*
  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
  *
@@ -3419,6 +3460,38 @@ apply_handle_ddl(StringInfo s)
 
 		plantree_list = pg_plan_queries(querytree_list, ddl_command, 0, NULL);
 
+		if (commandTag == CMDTAG_ALTER_TABLE)
+		{
+			AlterTableStmt *alstmt = (AlterTableStmt *) command->stmt;
+			char	   *schemaname = NULL;
+			char	   *relname = NULL;
+			Oid			relid;
+			Oid			relnamespace_oid = InvalidOid;
+			RangeVar *rv = alstmt->relation;
+			if (!rv)
+			{
+				MemoryContextSwitchTo(oldcontext);
+				return;
+			}
+			schemaname = rv->schemaname;
+			relname = rv->relname;
+			if (schemaname != NULL)
+				relnamespace_oid = get_namespace_oid(schemaname, false);
+			
+			if (OidIsValid(relnamespace_oid))
+				relid = get_relname_relid(relname, relnamespace_oid);
+			else
+				relid = RelnameGetRelid(relname);
+			if (OidIsValid(relid))
+			{
+				if(!should_apply_ddl_changes_for_rel(relid, lsn))
+				{
+					MemoryContextSwitchTo(oldcontext);
+					return;
+				}
+			}
+		}
+
 		/* Done with the snapshot used for parsing/planning */
 		if (snapshot_set)
 			PopActiveSnapshot();
@@ -3686,6 +3759,31 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 		MyLogicalRepWorker->reply_time = send_time;
 	}
 }
+/*
+ * Signal TableSync workers waiting for the apply worker to process pending DDL
+ * changes, and update pg_subscription_rel for SUBREL_SYNC_SCHEMA_DATA_SYNC entries.
+ */
+static void signal_tablesync_worker(XLogRecPtr last_received)
+{
+	/* HasTableSchemaSyncPending scans pg_subscription_rel, which is an
+	 * expensive operation, so we cache the last-received LSN to avoid
+	 * scanning on every call. */
+	if (tablesync_schema_sync_pending && !IsTransactionState())
+	{
+		if (last_received_cache != last_received)
+		{
+			StartTransactionCommand();
+			if (HasTableSchemaSyncPending(MyLogicalRepWorker->subid))
+			{
+				/* TODO: update logicalrep_rel_entry */
+				if (UpdateTableSchemaSyncPending(MyLogicalRepWorker->subid, last_received))
+					tablesync_schema_sync_pending = false;
+				last_received_cache = last_received;
+			}
+			CommitTransactionCommand();
+		}
+	}
+}
 
 /*
  * Apply main loop.
@@ -3825,6 +3923,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 					MemoryContextReset(ApplyMessageContext);
 				}
+				signal_tablesync_worker(last_received);
 
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
@@ -3832,6 +3931,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		/* confirm all writes so far */
 		send_feedback(last_received, false, false);
+		signal_tablesync_worker(last_received);
 
 		if (!in_remote_transaction && !in_streamed_transaction)
 		{
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 60a2bcca23..80d119530a 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -57,6 +57,8 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, Subsc
  *		substate constants
  * ----------------
  */
+#define SUBREL_SYNC_SCHEMA_DATA_INIT      'x'
+#define SUBREL_SYNC_SCHEMA_DATA_SYNC      'y'
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
@@ -89,5 +91,7 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern bool HasSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
+extern bool HasTableSchemaSyncPending(Oid subid);
+extern bool UpdateTableSchemaSyncPending(Oid subid, XLogRecPtr sublsn);
 
 #endif							/* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 921b9974db..c2058fcd70 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -50,5 +50,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
 extern bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo);
 extern Oid	GetRelationIdentityOrPK(Relation rel);
+extern LogicalRepRelMapEntry* logicalrep_rel_find(LogicalRepRelId remoteid);
 
 #endif							/* LOGICALRELATION_H */
#62vignesh C
vignesh21@gmail.com
In reply to: Kumar, Sachin (#61)
Re: Initial Schema Sync for Logical Replication

On Thu, 31 Aug 2023 at 17:18, Kumar, Sachin <ssetiya@amazon.com> wrote:

Hi Everyone, based on internal discussion with Masahiko
I have implemented concurrent DDL support for initial schema sync.

Concurrent Patch workflow

1. When the TableSync worker creates a replication slot, it saves the
slot LSN into pg_subscription_rel with the
SUBREL_SYNC_SCHEMA_DATA_SYNC state, and then waits for
its state to become SUBREL_STATE_DATASYNC.

2. The apply process applies DDLs up to the tablesync LSN, and then
changes the pg_subscription_rel state to SUBREL_STATE_DATASYNC.

3. The TableSync worker then continues applying pending DML/DDLs until it catches up.

This patch needs DDL replication to apply concurrent DDLs, so I have cherry-
picked the DDL patch [0]

Can you rebase the patch and post the complete set of required changes
for the concurrent DDL? I will have a look at them.

Regards,
Vignesh

#63vignesh C
vignesh21@gmail.com
In reply to: Masahiko Sawada (#56)
Re: Initial Schema Sync for Logical Replication

On Fri, 7 Jul 2023 at 12:41, Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Wed, Jul 5, 2023 at 11:14 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

On Mon, Jun 19, 2023 at 5:29 PM Peter Smith <smithpb2250@gmail.com> wrote:

Hi,

Below are my review comments for the PoC patch 0001.

In addition, the patch needed rebasing, and, after I rebased it
locally in my private environment there were still test failures:
a) The 'make check' tests fail, but only in a minor way, due to column name changes
b) the subscription TAP test did not work at all for me -- many errors.

Thank you for reviewing the patch.

While updating the patch, I realized that the current approach won't
work well, or at least has a problem with partitioned tables. If a
publication has a partitioned table with publish_via_partition_root =
false, the subscriber launches tablesync workers for its partitions so
that each tablesync worker copies the data of one partition. Similarly,
if it has a partitioned table with publish_via_partition_root = true,
the subscriber launches a tablesync worker for the parent table. With
the current design, since the tablesync worker is responsible for both
schema and data synchronization for its target table, it won't be
possible to synchronize both the parent table's schema and the
partitions' schemas. For example, there is no pg_subscription_rel entry
for the parent table if the publication has publish_via_partition_root
= false. In addition to that, we need to be careful about the order of
synchronization of the parent table and its partitions: we cannot start
schema synchronization for partitions before their parent table. So it
seems to me that we need to consider another approach.
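
A minimal SQL sketch of the two cases described above (object names are
made up for illustration):

CREATE TABLE meas (ts timestamptz, v int) PARTITION BY RANGE (ts);
CREATE TABLE meas_2023 PARTITION OF meas
    FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');
-- publish_via_partition_root = false: tablesync workers run per partition
CREATE PUBLICATION pub_leaf FOR TABLE meas
    WITH (publish_via_partition_root = false);
-- publish_via_partition_root = true: a single tablesync worker handles the parent
CREATE PUBLICATION pub_root FOR TABLE meas
    WITH (publish_via_partition_root = true);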

So I've implemented a different approach: doing schema synchronization
at CREATE SUBSCRIPTION time. The backend executing CREATE SUBSCRIPTION
uses pg_dump and restores the table schemas, including both partitioned
tables and their partitions regardless of the
publish_via_partition_root option, and then creates pg_subscription_rel
entries for the tables while respecting the publish_via_partition_root
option.
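
Assuming the subscription option ends up being called copy_schema (the
name used later in this thread), the user-facing flow would be roughly:

CREATE SUBSCRIPTION sub1
    CONNECTION 'host=pubhost dbname=postgres user=repuser'
    PUBLICATION pub1
    WITH (copy_schema = true);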

There is a window between the table creations and the tablesync
workers starting to process the tables. If DDLs are executed in this
window, the tablesync worker might fail because the table schema might
have already been changed. We need to mention this in the
documentation. BTW, I think we will be able to get rid of this downside
if we support DDL replication: DDLs executed in the window would be
applied by the apply worker, which then hands over the data copy to the
tablesync worker at a certain LSN.

I've attached PoC patches. It has regression tests but doesn't have
the documentation yet.

Few thoughts:
1) There might be a scenario where we create multiple
subscriptions with tables overlapping across the subscriptions; in
that case, the table will already be present when the 2nd subscription
is being created. Can we do something in this case:
+               /*
+                * Error if the table is already present on the
subscriber. Please note
+                * that concurrent DDLs can create the table as we
don't acquire any lock
+                * on the table.
+                *
+                * XXX: do we want to overwrite it (or optionally)?
+                */
+               if (OidIsValid(RangeVarGetRelid(rv, AccessShareLock, true)))
+                       ereport(ERROR,
+                                       (errmsg("existing table %s
cannot synchronize table schema",
+                                                       rv->relname)));

2) Should we clean up the replication slot in case of failures?
Currently the replication slot is left over.

3) Is it expected that all dependencies like types/domains etc. should
be created by the user before creating a subscription with
copy_schema? Currently we are taking care of creating the sequences
for tables; is this an exception?

4) If a column-list publication is created, we are currently getting
all of the columns; should we get only the specified columns in this
case (see the sketch below)?
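
For point 4, a sketch of the case in question (names are illustrative);
presumably only the listed columns should be created and synchronized
on the subscriber:

CREATE TABLE t1 (a int PRIMARY KEY, b text, c text);
CREATE PUBLICATION pub_cols FOR TABLE t1 (a, b);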

Regards,
Vignesh

#64Kumar, Sachin
ssetiya@amazon.com
In reply to: vignesh C (#62)
RE: Initial Schema Sync for Logical Replication

From: vignesh C <vignesh21@gmail.com>
Sent: Thursday, October 19, 2023 10:41 AM
Can you rebase the patch and post the complete set of required changes for
the concurrent DDL? I will have a look at them.

Sure, I will try to send the complete rebased patch within a week.

Regards
Sachin