From 719ca0128dd9d4ded7c61d86875956114a51cf08 Mon Sep 17 00:00:00 2001 From: "kuroda.hayato%40jp.fujitsu.com" Date: Tue, 17 May 2022 08:03:31 +0000 Subject: [PATCH v3 1/3] (PoC) implement LRG Logical Replication Group (LRG) is a way to create a node group that replicates data objects and their changes to each other. All nodes in the group can execute Read-Write queries, and their changes will "eventually" be sent to the other nodes. In order to implement this feature, two processes "LRG launcher" and "LRG worker" have been introduced. LRG launcher process will boot when the server promotes, and it has a responsibility for starting LRG worker processes. LRG worker process has a responsibility for connecting to other nodes, CREATE PUB/SUB, update system catalogs, and so on. Note that for using libpq functions in LRG worker processes, a new library, libpqlrg, has also been introduced. --- src/Makefile | 1 + src/backend/catalog/Makefile | 3 +- src/backend/postmaster/bgworker.c | 7 + src/backend/postmaster/postmaster.c | 3 + src/backend/replication/Makefile | 4 +- src/backend/replication/libpqlrg/Makefile | 38 ++ src/backend/replication/libpqlrg/libpqlrg.c | 352 +++++++++++ src/backend/replication/lrg/Makefile | 22 + src/backend/replication/lrg/lrg.c | 522 ++++++++++++++++ src/backend/replication/lrg/lrg_launcher.c | 341 ++++++++++ src/backend/replication/lrg/lrg_worker.c | 652 ++++++++++++++++++++ src/backend/storage/ipc/ipci.c | 2 + src/include/catalog/pg_lrg_info.h | 47 ++ src/include/catalog/pg_lrg_nodes.h | 54 ++ src/include/catalog/pg_lrg_pub.h | 46 ++ src/include/catalog/pg_lrg_sub.h | 46 ++ src/include/catalog/pg_proc.dat | 30 + src/include/replication/libpqlrg.h | 99 +++ src/include/replication/lrg.h | 68 ++ src/test/regress/expected/oidjoins.out | 6 + 20 files changed, 2341 insertions(+), 2 deletions(-) create mode 100644 src/backend/replication/libpqlrg/Makefile create mode 100644 src/backend/replication/libpqlrg/libpqlrg.c create mode 100644 
src/backend/replication/lrg/Makefile create mode 100644 src/backend/replication/lrg/lrg.c create mode 100644 src/backend/replication/lrg/lrg_launcher.c create mode 100644 src/backend/replication/lrg/lrg_worker.c create mode 100644 src/include/catalog/pg_lrg_info.h create mode 100644 src/include/catalog/pg_lrg_nodes.h create mode 100644 src/include/catalog/pg_lrg_pub.h create mode 100644 src/include/catalog/pg_lrg_sub.h create mode 100644 src/include/replication/libpqlrg.h create mode 100644 src/include/replication/lrg.h diff --git a/src/Makefile b/src/Makefile index 79e274a476..75db706762 100644 --- a/src/Makefile +++ b/src/Makefile @@ -23,6 +23,7 @@ SUBDIRS = \ interfaces \ backend/replication/libpqwalreceiver \ backend/replication/pgoutput \ + backend/replication/libpqlrg \ fe_utils \ bin \ pl \ diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 89a0221ec9..744fdf4fb8 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -72,7 +72,8 @@ CATALOG_HEADERS := \ pg_collation.h pg_parameter_acl.h pg_partitioned_table.h \ pg_range.h pg_transform.h \ pg_sequence.h pg_publication.h pg_publication_namespace.h \ - pg_publication_rel.h pg_subscription.h pg_subscription_rel.h + pg_publication_rel.h pg_subscription.h pg_subscription_rel.h \ + pg_lrg_info.h pg_lrg_nodes.h pg_lrg_pub.h pg_lrg_sub.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 40601aefd9..49d8ff1878 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -20,6 +20,7 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/interrupt.h" #include "postmaster/postmaster.h" +#include "replication/lrg.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "storage/dsm.h" @@ -128,6 +129,12 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain + }, + { + 
"lrg_launcher_main", lrg_launcher_main + }, + { + "lrg_worker_main", lrg_worker_main } }; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 3b73e26956..b900008cdd 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -118,6 +118,7 @@ #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "replication/logicallauncher.h" +#include "replication/lrg.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -1020,6 +1021,8 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + LrgLauncherRegister(); + /* * process any libraries that should be preloaded at postmaster start */ diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 3d8fb70c0e..49ffc243f6 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -35,7 +35,9 @@ OBJS = \ walreceiverfuncs.o \ walsender.o -SUBDIRS = logical +SUBDIRS = \ + logical \ + lrg include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/libpqlrg/Makefile b/src/backend/replication/libpqlrg/Makefile new file mode 100644 index 0000000000..72d911a918 --- /dev/null +++ b/src/backend/replication/libpqlrg/Makefile @@ -0,0 +1,38 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/lrg/libpqlrg +# +# IDENTIFICATION +# src/backend/replication/lrg/libpqlrg/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/replication/lrg/libpqlrg +top_builddir = ../../../.. 
+include $(top_builddir)/src/Makefile.global + +override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) + +OBJS = \ + $(WIN32RES) \ + libpqlrg.o + +SHLIB_LINK_INTERNAL = $(libpq) +SHLIB_LINK = $(filter -lintl, $(LIBS)) +SHLIB_PREREQS = submake-libpq +PGFILEDESC = "libpqlrg" +NAME = libpqlrg + +all: all-shared-lib + +include $(top_srcdir)/src/Makefile.shlib + +install: all installdirs install-lib + +installdirs: installdirs-lib + +uninstall: uninstall-lib + +clean distclean maintainer-clean: clean-lib + rm -f $(OBJS) diff --git a/src/backend/replication/libpqlrg/libpqlrg.c b/src/backend/replication/libpqlrg/libpqlrg.c new file mode 100644 index 0000000000..b313e7c0b8 --- /dev/null +++ b/src/backend/replication/libpqlrg/libpqlrg.c @@ -0,0 +1,352 @@ +/*------------------------------------------------------------------------- + * + * libpqlrg.c + * + * This file contains the libpq-specific parts of lrg feature. It's + * loaded as a dynamic module to avoid linking the main server binary with + * libpq. 
+ *------------------------------------------------------------------------- + */ + + +#include "postgres.h" + +#include "access/heapam.h" +#include "funcapi.h" +#include "libpq-fe.h" +#include "lib/stringinfo.h" +#include "replication/libpqlrg.h" +#include "replication/lrg.h" +#include "utils/snapmgr.h" + +PG_MODULE_MAGIC; + +void _PG_init(void); + +/* Prototypes for interface functions */ +static void libpqlrg_connect(const char *connstring, PGconn **conn); +static bool libpqlrg_check_group(PGconn *conn, const char *group_name); +static void libpqlrg_copy_lrg_nodes(PGconn *remoteconn, PGconn *localconn); +static void libpqlrg_insert_into_lrg_nodes(PGconn *remoteconn, + const char *node_id, LRG_NODE_STATE status, + const char *node_name, const char *local_connstring, + const char *upstream_connstring); + +static void libpqlrg_create_subscription(const char *group_name, const char *publisher_connstring, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn, const char *options); + +static void libpqlrg_drop_publication(const char *group_name, + PGconn *publisherconn); + +static void libpqlrg_drop_subscription(const char *group_name, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn); + +static void libpqlrg_delete_from_nodes(PGconn *conn, const char *node_id); + +static void libpqlrg_cleanup(PGconn *conn); + +static void libpqlrg_disconnect(PGconn *conn); + +static lrg_function_types PQLrgFunctionTypes = +{ + libpqlrg_connect, + libpqlrg_check_group, + libpqlrg_copy_lrg_nodes, + libpqlrg_insert_into_lrg_nodes, + libpqlrg_create_subscription, + libpqlrg_drop_publication, + libpqlrg_drop_subscription, + libpqlrg_delete_from_nodes, + libpqlrg_cleanup, + libpqlrg_disconnect +}; + +/* + * Just a wrapper for PQconnectdb() and PQstatus(). 
+ */ +static void +libpqlrg_connect(const char *connstring, PGconn **conn) +{ + *conn = PQconnectdb(connstring); + if (PQstatus(*conn) != CONNECTION_OK) + elog(ERROR, "failed to connect"); +} + +/* + * Check whether the node is in the specified group or not. + */ +static bool +libpqlrg_check_group(PGconn *conn, const char *group_name) +{ + PGresult *result; + StringInfoData query; + bool ret; + + Assert(PQstatus(conn) == CONNECTION_OK); + initStringInfo(&query); + appendStringInfo(&query, "SELECT COUNT(*) FROM pg_lrg_info WHERE groupname = '%s'", group_name); + + result = PQexec(conn, query.data); + + ret = atoi(PQgetvalue(result, 0, 0)); + pfree(query.data); + + return ret != 0; +} + +/* + * Copy pg_lrg_nodes from remoteconn. + */ +static void +libpqlrg_copy_lrg_nodes(PGconn *remoteconn, PGconn *localconn) +{ + PGresult *result; + StringInfoData query; + int i, num_tuples; + + Assert(PQstatus(remoteconn) == CONNECTION_OK + && PQstatus(localconn) == CONNECTION_OK); + initStringInfo(&query); + + + /* + * Note that COPY command cannot be used here because group_oid + * might be different between remote and local. 
+ */ + appendStringInfo(&query, "SELECT nodeid, status, nodename, " + "localconn, upstreamconn FROM pg_lrg_nodes"); + result = PQexec(remoteconn, query.data); + if (PQresultStatus(result) != PGRES_TUPLES_OK) + elog(ERROR, "failed to read pg_lrg_nodes"); + + resetStringInfo(&query); + + num_tuples = PQntuples(result); + + for(i = 0; i < num_tuples; i++) + { + char *node_id; + char *status; + char *nodename; + char *localconn; + char *upstreamconn; + + node_id = PQgetvalue(result, i, 0); + status = PQgetvalue(result, i, 1); + nodename = PQgetvalue(result, i, 2); + localconn = PQgetvalue(result, i, 3); + upstreamconn = PQgetvalue(result, i, 4); + + StartTransactionCommand(); + (void) GetTransactionSnapshot(); + /* + * group_oid is adjusted to local value + */ + lrg_add_nodes(node_id, get_group_info(NULL), atoi(status), nodename, localconn, upstreamconn); + CommitTransactionCommand(); + } +} + +/* + * Insert data to remote's pg_lrg_nodes. It will be done + * via internal SQL function. + */ +static void +libpqlrg_insert_into_lrg_nodes(PGconn *remoteconn, + const char *node_id, LRG_NODE_STATE status, + const char *node_name, const char *local_connstring, + const char *upstream_connstring) +{ + StringInfoData query; + PGresult *result; + + Assert(PQstatus(remoteconn) == CONNECTION_OK + && node_id != NULL + && node_name != NULL + && local_connstring != NULL + && upstream_connstring != NULL); + + initStringInfo(&query); + appendStringInfo(&query, "SELECT lrg_insert_into_nodes('%s', %d, '%s', '%s', '%s')", + node_id, status, node_name, local_connstring, upstream_connstring); + + result = PQexec(remoteconn, query.data); + if (PQresultStatus(result) != PGRES_TUPLES_OK) + elog(ERROR, "failed to execute libpqlrg_insert_to_remote_lrg_nodes: %s", query.data); + PQclear(result); + + pfree(query.data); +} + +/* + * Create a subscription with given name and parameters, and + * add a tuple to remote's pg_lrg_sub. 
+ * + * Note that both of this and libpqlrg_insert_into_lrg_nodes() + * must be called during attaching a node. + */ +static void +libpqlrg_create_subscription(const char *group_name, const char *publisher_connstring, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn, const char *options) +{ + StringInfoData query, sub_name; + PGresult *result; + + Assert(publisher_connstring != NULL && subscriberconn != NULL); + + /* + * the name of subscriber is just concat of two node_id. + */ + initStringInfo(&query); + initStringInfo(&sub_name); + + /* + * construct the name of subscription and query. + */ + appendStringInfo(&sub_name, "sub_%s_%s", subscriber_node_id, publisher_node_id); + appendStringInfo(&query, "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION pub_for_%s", + sub_name.data, publisher_connstring, group_name); + + if (options) + appendStringInfo(&query, " WITH (%s)", options); + + result = PQexec(subscriberconn, query.data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + elog(ERROR, "failed to create subscription: %s", query.data); + PQclear(result); + + resetStringInfo(&query); + appendStringInfo(&query, "SELECT lrg_insert_into_sub('%s')", sub_name.data); + result = PQexec(subscriberconn, query.data); + if (PQresultStatus(result) != PGRES_TUPLES_OK) + elog(ERROR, "failed to execute lrg_insert_into_sub: %s", query.data); + PQclear(result); + + pfree(sub_name.data); + pfree(query.data); +} + +/* + * Drop a given publication and delete a tuple + * from remote's pg_lrg_pub. 
+ */ +static void +libpqlrg_drop_publication(const char *group_name, + PGconn *publisherconn) +{ + StringInfoData query, pub_name; + PGresult *result; + + Assert(PQstatus(publisherconn) == CONNECTION_OK); + + initStringInfo(&query); + initStringInfo(&pub_name); + + appendStringInfo(&pub_name, "pub_for_%s", group_name); + appendStringInfo(&query, "DROP PUBLICATION %s", pub_name.data); + + result = PQexec(publisherconn, query.data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + elog(ERROR, "failed to drop publication: %s", query.data); + PQclear(result); + pfree(pub_name.data); + pfree(query.data); +} + +/* + * same as above, but for subscription. + */ +static void +libpqlrg_drop_subscription(const char *group_name, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn) +{ + StringInfoData query, sub_name; + PGresult *result; + + Assert(PQstatus(subscriberconn) == CONNECTION_OK); + + /* + * the name of subscriber is just concat of two node_id. + */ + initStringInfo(&query); + initStringInfo(&sub_name); + + /* + * construct the name of subscription and query. + */ + appendStringInfo(&sub_name, "sub_%s_%s", subscriber_node_id, publisher_node_id); + appendStringInfo(&query, "DROP SUBSCRIPTION %s", sub_name.data); + + result = PQexec(subscriberconn, query.data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + elog(ERROR, "failed to drop subscription: %s", query.data); + PQclear(result); + pfree(sub_name.data); + pfree(query.data); +} + +/* + * Delete data to remote's pg_lrg_nodes. It will be done + * via internal SQL function. 
+ */ +static void +libpqlrg_delete_from_nodes(PGconn *conn, const char *node_id) +{ + StringInfoData query; + PGresult *result; + + Assert(PQstatus(conn) == CONNECTION_OK); + + initStringInfo(&query); + appendStringInfo(&query, "DELETE FROM pg_lrg_nodes WHERE nodeid = '%s'", node_id); + + result = PQexec(conn, query.data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + elog(ERROR, "failed to delete from pg_lrg_nodes: %s", query.data); + + PQclear(result); + pfree(query.data); +} + +/* + * Delete all data from LRG catalogs + */ +static void +libpqlrg_cleanup(PGconn *conn) +{ + PGresult *result; + Assert(PQstatus(conn) == CONNECTION_OK); + + result = PQexec(conn, "DELETE FROM pg_lrg_pub;" + "DELETE FROM pg_lrg_sub;" + "DELETE FROM pg_lrg_nodes;" + "DELETE FROM pg_lrg_info;"); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + elog(ERROR, "failed to DELETE"); + + PQclear(result); +} + +/* + * Just a wrapper for PQfinish() + */ +static void +libpqlrg_disconnect(PGconn *conn) +{ + PQfinish(conn); +} + +/* + * Module initialization function + */ +void +_PG_init(void) +{ + if (LrgFunctionTypes != NULL) + elog(ERROR, "libpqlrg already loaded"); + LrgFunctionTypes = &PQLrgFunctionTypes; +} diff --git a/src/backend/replication/lrg/Makefile b/src/backend/replication/lrg/Makefile new file mode 100644 index 0000000000..4ce929b6a4 --- /dev/null +++ b/src/backend/replication/lrg/Makefile @@ -0,0 +1,22 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/lrg +# +# IDENTIFICATION +# src/backend/replication/lrg/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/replication/lrg +top_builddir = ../../../.. 
+include $(top_builddir)/src/Makefile.global + +override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) + +OBJS = \ + lrg.o \ + lrg_launcher.o \ + lrg_worker.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/lrg/lrg.c b/src/backend/replication/lrg/lrg.c new file mode 100644 index 0000000000..be8a757162 --- /dev/null +++ b/src/backend/replication/lrg/lrg.c @@ -0,0 +1,522 @@ +/*------------------------------------------------------------------------- + * + * lrg.c + * Constructs a logical replication group + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/relscan.h" +#include "access/table.h" +#include "access/xlog.h" +#include "catalog/catalog.h" +#include "catalog/indexing.h" +#include "catalog/pg_lrg_info.h" +#include "catalog/pg_lrg_nodes.h" +#include "catalog/pg_lrg_sub.h" +#include "catalog/pg_subscription.h" +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "replication/libpqlrg.h" +#include "replication/logicallauncher.h" +#include "replication/lrg.h" +#include "storage/lock.h" +#include "utils/builtins.h" +#include "utils/fmgrprotos.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" + +#include "storage/proc.h" +#include "utils/guc.h" + +LrgPerdbCtxStruct *LrgPerdbCtx; + +static Size lrg_worker_array_size(void); +static Oid lrg_add_info(char *group_name, bool puballtables); +static Oid find_subscription(const char *subname); + +/* + * Helpler function for LrgLauncherShmemInit. 
+ */ +static Size +lrg_worker_array_size(void) +{ + Size size; + + size = sizeof(LrgPerdbCtxStruct); + size = MAXALIGN(size); + /* XXX: for simplify the size of the array is set to max_worker_processes */ + size = add_size(size, mul_size(max_worker_processes, sizeof(LrgPerdbCtxStruct))); + + return size; +} + +/* + * Allocate LrgPerdbCtxStruct to the shared memory. + */ +void +LrgLauncherShmemInit(void) +{ + bool found; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + LrgPerdbCtx = (LrgPerdbCtxStruct *) + ShmemInitStruct("Lrg Launcher Data", + lrg_worker_array_size(), + &found); + if (!found) + { + MemSet(LrgPerdbCtx, 0, lrg_worker_array_size()); + LWLockInitialize(&(LrgPerdbCtx->lock), LWLockNewTrancheId()); + } + LWLockRelease(AddinShmemInitLock); + LWLockRegisterTranche(LrgPerdbCtx->lock.tranche, "lrg"); +} + +void +LrgLauncherRegister(void) +{ + BackgroundWorker worker; + + /* + * LRG deeply depends on the logical replication mechanism, so + * skip registering the LRG launcher if logical replication + * cannot be used. + */ + if (max_logical_replication_workers == 0) + return; + + /* + * Build struct BackgroundWorker for launcher. + */ + MemSet(&worker, 0, sizeof(BackgroundWorker)); + + snprintf(worker.bgw_name, BGW_MAXLEN, "lrg launcher"); + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_restart_time = BGW_NEVER_RESTART; + snprintf(worker.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(worker.bgw_function_name, BGW_MAXLEN, "lrg_launcher_main"); + RegisterBackgroundWorker(&worker); +} + +/* + * construct node_id. + * + * TODO: construct proper node_id. Currently it is just concat of + * sytem identifier and dbid. + */ +void +construct_node_id(char *out_node_id, int size) +{ + snprintf(out_node_id, size, UINT64_FORMAT "%u", GetSystemIdentifier(), MyDatabaseId); +} + +/* + * Actual work for adding a tuple to pg_lrg_nodes. 
+ */ +void +lrg_add_nodes(char *node_id, Oid group_id, LRG_NODE_STATE status, char *node_name, char *local_connstring, char *upstream_connstring) +{ + Relation rel; + bool nulls[Natts_pg_lrg_nodes]; + Datum values[Natts_pg_lrg_nodes]; + HeapTuple tup; + + Oid lrgnodesoid; + + rel = table_open(LrgNodesRelationId, ExclusiveLock); + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + lrgnodesoid = GetNewOidWithIndex(rel, LrgNodesRelationIndexId, Anum_pg_lrg_nodes_oid); + values[Anum_pg_lrg_nodes_oid - 1] = ObjectIdGetDatum(lrgnodesoid); + values[Anum_pg_lrg_nodes_nodeid - 1] = CStringGetDatum(node_id); + values[Anum_pg_lrg_nodes_groupid - 1] = ObjectIdGetDatum(group_id); + values[Anum_pg_lrg_nodes_status - 1] = Int32GetDatum(status); + values[Anum_pg_lrg_nodes_dbid - 1] = ObjectIdGetDatum(MyDatabaseId); + values[Anum_pg_lrg_nodes_nodename - 1] = CStringGetDatum(node_name); + values[Anum_pg_lrg_nodes_localconn - 1] = CStringGetDatum(local_connstring); + + if (upstream_connstring != NULL) + values[Anum_pg_lrg_nodes_upstreamconn - 1] = CStringGetDatum(upstream_connstring); + else + nulls[Anum_pg_lrg_nodes_upstreamconn - 1] = true; + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + /* Insert tuple into catalog. */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + table_close(rel, ExclusiveLock); +} + +/* + * read pg_lrg_info and get oid. + * + * XXX: This function assumes that there is only one tuple + * in thepg_lrg_info. 
+ */ +Oid +get_group_info(char **group_name) +{ + Relation rel; + HeapTuple tup; + TableScanDesc scan; + Oid group_oid = InvalidOid; + Form_pg_lrg_info infoform; + bool is_opened = false; + + if (!IsTransactionState()) + { + is_opened = true; + StartTransactionCommand(); + (void) GetTransactionSnapshot(); + } + + rel = table_open(LrgInfoRelationId, AccessShareLock); + scan = table_beginscan_catalog(rel, 0, NULL); + tup = heap_getnext(scan, ForwardScanDirection); + + if (tup != NULL) + { + infoform = (Form_pg_lrg_info) GETSTRUCT(tup); + group_oid = infoform->oid; + if (group_name != NULL) + { + MemoryContext old; + old = MemoryContextSwitchTo(TopMemoryContext); + *group_name = pstrdup(NameStr(infoform->groupname)); + MemoryContextSwitchTo(old); + } + } + + table_endscan(scan); + table_close(rel, AccessShareLock); + + if (is_opened) + CommitTransactionCommand(); + + return group_oid; +} + +/* + * Actual work for adding a tuple to pg_lrg_info. + */ +static Oid +lrg_add_info(char *group_name, bool puballtables) +{ + Relation rel; + bool nulls[Natts_pg_lrg_info]; + Datum values[Natts_pg_lrg_info]; + HeapTuple tup; + Oid lrgoid; + + rel = table_open(LrgInfoRelationId, ExclusiveLock); + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + lrgoid = GetNewOidWithIndex(rel, LrgInfoRelationIndexId, Anum_pg_lrg_info_oid); + values[Anum_pg_lrg_info_oid - 1] = ObjectIdGetDatum(lrgoid); + values[Anum_pg_lrg_info_groupname - 1] = CStringGetDatum(group_name); + values[Anum_pg_lrg_info_puballtables - 1] = BoolGetDatum(puballtables); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + /* Insert tuple into catalog. 
*/ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + table_close(rel, ExclusiveLock); + + return lrgoid; +} + +/* + * helper function for lrg_insert_into_sub + */ +static Oid +find_subscription(const char *subname) +{ + /* for scannning */ + Relation rel; + HeapTuple tup; + Form_pg_subscription form; + + rel = table_open(SubscriptionRelationId, AccessExclusiveLock); + tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId, + CStringGetDatum(subname)); + + if (!HeapTupleIsValid(tup)) + { + table_close(rel, NoLock); + return InvalidOid; + } + + form = (Form_pg_subscription) GETSTRUCT(tup); + table_close(rel, NoLock); + + return form->oid; +} + +/* + * ================================ + * Public APIs + * ================================ + */ + +/* + * SQL function for creating a new logical replication group. + * + * This function adds a tuple to pg_lrg_info and pg_lrg_nodes, + * and after that kick lrg launcher. + */ +Datum +lrg_create(PG_FUNCTION_ARGS) +{ + Oid lrgoid; + char *group_name; + char *pub_type; + char *local_connstring; + char *node_name; + + /* XXX: for simplify the fixed array is used */ + char node_id[64]; + + if (get_group_info(NULL) != InvalidOid) + elog(ERROR, "This node is already a member of a node group"); + + group_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + pub_type = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(1))); + + if (pg_strcasecmp(pub_type, "FOR ALL TABLES") != 0) + elog(ERROR, "'only 'FOR ALL TABLES' is support"); + + lrgoid = lrg_add_info(group_name, true); + + construct_node_id(node_id, sizeof(node_id)); + local_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(2))); + node_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(3))); + lrg_add_nodes(node_id, lrgoid, LRG_STATE_INIT, node_name, local_connstring, NULL); + + lrg_launcher_wakeup(); + PG_RETURN_VOID(); +} + + +/* + * SQL function for attaching to a specified group + * + * This function adds a 
tuple to pg_lrg_info and pg_lrg_nodes, + * and after that kick lrg launcher. + */ +Datum +lrg_node_attach(PG_FUNCTION_ARGS) +{ + Oid lrgoid; + char *group_name; + char *local_connstring; + char *upstream_connstring; + char *node_name; + PGconn *upstreamconn = NULL; + + /* XXX: for simplify the fixed array is used */ + char node_id[64]; + + if (get_group_info(NULL) != InvalidOid) + elog(ERROR, "This node is already a member of a node group"); + + group_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + local_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(1))); + upstream_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(2))); + node_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(3))); + + /* + * For sanity check the backend process must connect to the upstream node. + * libpqlrg shared library will be used for that. + */ + load_file("libpqlrg", false); + lrg_connect(upstream_connstring, &upstreamconn); + if (!lrg_check_group(upstreamconn, group_name)) + elog(ERROR, "specified group is not exist"); + lrg_disconnect(upstreamconn); + + lrgoid = lrg_add_info(group_name, true); + construct_node_id(node_id, sizeof(node_id)); + lrg_add_nodes(node_id, lrgoid, LRG_STATE_INIT, node_name, local_connstring, upstream_connstring); + + lrg_launcher_wakeup(); + PG_RETURN_VOID(); +} + +/* + * SQL function for detaching from a group + */ +Datum +lrg_node_detach(PG_FUNCTION_ARGS) +{ + char *node_name; + char *given_group_name; + char *group_name_from_catalog = NULL; + + given_group_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + node_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(1))); + + (void) get_group_info(&group_name_from_catalog); + if (group_name_from_catalog == NULL || + strcmp(given_group_name, group_name_from_catalog) != 0) + elog(ERROR, "This node is not a member of the specified group: %s", given_group_name); + + 
update_node_status_by_nodename(node_name, LRG_STATE_TO_BE_DETACHED, true); + lrg_launcher_wakeup(); + PG_RETURN_VOID(); +} + +/* + * SQL function for dropping a group. + */ +Datum +lrg_drop(PG_FUNCTION_ARGS) +{ + char node_id[64]; + char *given_group_name; + char *group_name_from_catalog = NULL; + + construct_node_id(node_id, sizeof(node_id)); + + given_group_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + + (void) get_group_info(&group_name_from_catalog); + if (group_name_from_catalog == NULL || + strcmp(given_group_name, group_name_from_catalog) != 0) + elog(ERROR, "This node is not a member of the specified group: %s", given_group_name); + + /* TODO: add a check whether there are not other members in the group or not */ + update_node_status_by_nodeid(node_id, LRG_STATE_TO_BE_DETACHED, true); + lrg_launcher_wakeup(); + PG_RETURN_VOID(); +} + +/* + * Wait until lrg related functions are done + */ +Datum +lrg_wait(PG_FUNCTION_ARGS) +{ + if (get_group_info(NULL) == InvalidOid) + PG_RETURN_NULL(); + + for (;;) + { + Relation rel; + HeapTuple tup; + TableScanDesc scan; + bool need_more_loop = false; + + CHECK_FOR_INTERRUPTS(); + + rel = table_open(LrgNodesRelationId, AccessShareLock); + scan = table_beginscan_catalog(rel, 0, NULL); + + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) + { + Form_pg_lrg_nodes nodesform = (Form_pg_lrg_nodes) GETSTRUCT(tup); + + /* + * Set a flag if we must wait more. 
+ */ + if (nodesform->status != LRG_STATE_READY) + need_more_loop = true; + } + + table_endscan(scan); + table_close(rel, NoLock); + + if (!need_more_loop) + break; + + elog(LOG, "we need to wait more..."); + +#define TEMPORARY_NAP_TIME 500L + WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + TEMPORARY_NAP_TIME, 0); + } + + PG_RETURN_VOID(); +} + +/* + * ================================ + * Internal SQL functions + * ================================ + */ + +/* + * Wrapper for adding a tuple into pg_lrg_sub + */ +Datum +lrg_insert_into_sub(PG_FUNCTION_ARGS) +{ + char *sub_name; + Oid group_oid, sub_oid, lrgsub_oid; + Relation rel; + bool nulls[Natts_pg_lrg_sub]; + Datum values[Natts_pg_lrg_sub]; + HeapTuple tup; + + sub_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + + group_oid = get_group_info(NULL); + sub_oid = find_subscription(sub_name); + + rel = table_open(LrgSubscriptionId, ExclusiveLock); + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + lrgsub_oid = GetNewOidWithIndex(rel, LrgSubscriptionOidIndexId, Anum_pg_lrg_sub_oid); + + values[Anum_pg_lrg_sub_oid - 1] = ObjectIdGetDatum(lrgsub_oid); + values[Anum_pg_lrg_sub_groupid - 1] = ObjectIdGetDatum(group_oid); + values[Anum_pg_lrg_sub_subid - 1] = ObjectIdGetDatum(sub_oid); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + /* Insert tuple into catalog. 
*/ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + table_close(rel, ExclusiveLock); + + PG_RETURN_VOID(); +} + +/* + * Wrapper for adding a tuple into pg_lrg_nodes + */ +Datum +lrg_insert_into_nodes(PG_FUNCTION_ARGS) +{ + char *node_id; + LRG_NODE_STATE status; + char *node_name; + char *local_connstring; + char *upstream_connstring; + Oid group_oid; + + node_id = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + status = DatumGetInt32(PG_GETARG_DATUM(1)); + node_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(2))); + local_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(3))); + upstream_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(4))); + + group_oid = get_group_info(NULL); + + lrg_add_nodes(node_id, group_oid, status, node_name, local_connstring, upstream_connstring); + + PG_RETURN_VOID(); +} diff --git a/src/backend/replication/lrg/lrg_launcher.c b/src/backend/replication/lrg/lrg_launcher.c new file mode 100644 index 0000000000..2a63546ffb --- /dev/null +++ b/src/backend/replication/lrg/lrg_launcher.c @@ -0,0 +1,341 @@ +/*------------------------------------------------------------------------- + * + * lrg_launcher.c + * functions for lrg launcher + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/heapam.h" +#include "access/relscan.h" +#include "access/table.h" +#include "catalog/pg_database.h" +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logicallauncher.h" +#include "replication/lrg.h" +#include "storage/ipc.h" +#include "storage/proc.h" +#include "tcop/tcopprot.h" +#include "utils/memutils.h" +#include "utils/snapmgr.h" + +static void launch_lrg_worker(Oid dbid); +static LrgPerdbWorker* find_perdb_worker(Oid dbid); +static List* get_db_list(void); +static void scan_and_launch(void); +static 
void lrglauncher_worker_onexit(int code, Datum arg);
+
+static bool ishook_registered = false;
+static bool isworker_needed = false;
+
+typedef struct db_list_cell
+{
+	Oid			dbid;
+	char	   *dbname;
+} db_list_cell;
+
+/*
+ * Launch a lrg worker related with the given database
+ *
+ * A free entry of LrgPerdbCtx->workers[] (one whose dbid is InvalidOid) is
+ * reserved for "dbid" under the controller lock, then a dynamic background
+ * worker running lrg_worker_main() is registered with the slot index as
+ * its argument.  Raises an ERROR when no slot is free or when the
+ * background worker cannot be registered; in the latter case the reserved
+ * slot is released first.
+ */
+static void
+launch_lrg_worker(Oid dbid)
+{
+	BackgroundWorker bgw;
+	LrgPerdbWorker *worker = NULL;
+	int			slot = 0;
+
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+
+	/*
+	 * Find a free worker slot.
+	 */
+	for (int i = 0; i < max_logical_replication_workers; i++)
+	{
+		LrgPerdbWorker *pw = &LrgPerdbCtx->workers[i];
+
+		if (pw->dbid == InvalidOid)
+		{
+			worker = pw;
+			slot = i;
+			break;
+		}
+	}
+
+	/*
+	 * If there are no more free worker slots, raise an ERROR now.
+	 *
+	 * TODO: cleanup the array?
+	 */
+	if (worker == NULL)
+	{
+		LWLockRelease(&LrgPerdbCtx->lock);
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("out of worker slots")));
+	}
+
+
+	/* Prepare the worker slot. */
+	worker->dbid = dbid;
+
+	LWLockRelease(&LrgPerdbCtx->lock);
+
+	MemSet(&bgw, 0, sizeof(BackgroundWorker));
+
+	snprintf(bgw.bgw_name, BGW_MAXLEN, "lrg worker for database %u", dbid);
+	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "lrg_worker_main");
+	/* lrg_worker_main() reads the argument back with DatumGetInt32() */
+	bgw.bgw_main_arg = Int32GetDatum(slot);
+
+	if (!RegisterDynamicBackgroundWorker(&bgw, NULL))
+	{
+		/* Failed to start worker, so clean up the worker slot. */
+		LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+		lrg_worker_cleanup(worker);
+		LWLockRelease(&LrgPerdbCtx->lock);
+
+		/*
+		 * This is a different limit than the one reported above: our own
+		 * array had room, but no background worker could be registered.
+		 */
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("out of background worker slots"),
+				 errhint("You might need to increase max_worker_processes.")));
+	}
+}
+
+/*
+ * Find a launched lrg worker that related with the given database.
+ * This returns NULL if not exist.
+ */
+static LrgPerdbWorker*
+find_perdb_worker(Oid dbid)
+{
+	int			i;
+
+	/* the slot array lives in shared memory; callers must hold the lock */
+	Assert(LWLockHeldByMe(&LrgPerdbCtx->lock));
+
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LrgPerdbWorker *worker = &LrgPerdbCtx->workers[i];
+		if (worker->dbid == dbid)
+			return worker;
+	}
+	return NULL;
+}
+
+/*
+ * Load the list of databases in this server.
+ *
+ * Returns a List of db_list_cell, one per pg_database row that allows
+ * connections.  The scan runs in its own transaction; the list and its
+ * cells are allocated in the memory context that was current on entry,
+ * so they survive the commit.
+ */
+static List*
+get_db_list(void)
+{
+	List	   *res = NIL;
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tup;
+	/* We will allocate the output data in the current memory context */
+	MemoryContext resultcxt = CurrentMemoryContext;
+
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	rel = table_open(DatabaseRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tup);
+		db_list_cell *cell;
+		MemoryContext oldcxt;
+
+		/* skip if connection is not allowed */
+		if (!dbform->datallowconn)
+			continue;
+
+		/*
+		 * Allocate our results in the caller's context
+		 */
+		oldcxt = MemoryContextSwitchTo(resultcxt);
+
+		cell = (db_list_cell *) palloc0(sizeof(db_list_cell));
+		cell->dbid = dbform->oid;
+		cell->dbname = pstrdup(NameStr(dbform->datname));
+		res = lappend(res, cell);
+
+		MemoryContextSwitchTo(oldcxt);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+	CommitTransactionCommand();
+
+	return res;
+}
+
+/*
+ * Scan the database list and launch a lrg worker for every database
+ * that does not have one yet.
+ */
+static void
+scan_and_launch(void)
+{
+	List	   *list;
+	ListCell   *lc;
+	MemoryContext subctx;
+	MemoryContext oldctx;
+
+	/* the database list is only needed for this pass; keep it separate */
+	subctx = AllocSetContextCreate(TopMemoryContext,
+								   "Lrg Launcher list",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(subctx);
+
+	list = get_db_list();
+
+	foreach(lc, list)
+	{
+		db_list_cell *cell = (db_list_cell *)lfirst(lc);
+		LrgPerdbWorker *worker;
+
+		LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+		worker = find_perdb_worker(cell->dbid);
+		LWLockRelease(&LrgPerdbCtx->lock);
+
+		/* a slot is already assigned to this database */
+		if (worker != NULL)
+			continue;
+
+		launch_lrg_worker(cell->dbid);
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory. */
+	MemoryContextDelete(subctx);
+}
+
+
+/*
+ * Callback for process exit. cleanup the controller
+ */
+static void
+lrglauncher_worker_onexit(int code, Datum arg)
+{
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+	LrgPerdbCtx->launcher_pid = InvalidPid;
+	LrgPerdbCtx->launcher_latch = NULL;
+	LWLockRelease(&LrgPerdbCtx->lock);
+}
+
+/*
+ * Entry point for lrg launcher
+ *
+ * Publishes this process's pid and latch in the shared controller, then
+ * sleeps on the latch; each time the latch is set it scans the database
+ * list and launches the missing per-database workers.
+ */
+void
+lrg_launcher_main(Datum arg)
+{
+	/* Establish signal handlers. */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Clear the latch before it becomes reachable through the controller,
+	 * so that a wakeup sent right after registration is not wiped out.
+	 */
+	ResetLatch(&MyProc->procLatch);
+
+	/*
+	 * Register my latch to the controller
+	 * for receiving notifications from lrg background worker.
+	 *
+	 * A previous launcher leaves InvalidPid behind in its exit callback,
+	 * so both values mean "no launcher is running".
+	 */
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+	Assert(LrgPerdbCtx->launcher_pid == 0 ||
+		   LrgPerdbCtx->launcher_pid == InvalidPid);
+	LrgPerdbCtx->launcher_latch = &MyProc->procLatch;
+	LrgPerdbCtx->launcher_pid = MyProcPid;
+	LWLockRelease(&LrgPerdbCtx->lock);
+	before_shmem_exit(lrglauncher_worker_onexit, (Datum) 0);
+
+	/*
+	 * we did not connect specific database, because launcher
+	 * will read only pg_database.
+	 */
+	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+
+	/*
+	 * main loop
+	 */
+	for (;;)
+	{
+		int			rc = 0;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * XXX: for simplify launcher will start a loop at fixed intervals,
+		 * but it will be no-op if no one sets a latch.
+		 */
+#define TEMPORARY_NAP_TIME 180000L
+
+		rc = WaitLatch(&MyProc->procLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   TEMPORARY_NAP_TIME, 0);
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(&MyProc->procLatch);
+			CHECK_FOR_INTERRUPTS();
+			scan_and_launch();
+		}
+
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+	}
+	/* Not reachable */
+}
+
+/*
+ * xact callback for launcher/worker.
+ */
+static void
+lrg_perdb_wakeup_callback(XactEvent event, void *arg)
+{
+	switch (event)
+	{
+		case XACT_EVENT_COMMIT:
+			if (isworker_needed)
+			{
+				LrgPerdbWorker *worker;
+				LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+				worker = find_perdb_worker(MyDatabaseId);
+
+				/*
+				 * If lrg worker related with this db has been
+				 * launched, notify to the worker.
+				 * If not, maybe it means that someone has called lrg_create()/lrg_node_attach(),
+				 * notify to the launcher.
+				 *
+				 * Either latch pointer can still be NULL: a slot is
+				 * reserved before its worker has published a latch, and
+				 * the launcher clears its latch pointer when it exits.
+				 * SetLatch() must not be handed a NULL pointer.
+				 */
+				if (worker != NULL)
+				{
+					if (worker->worker_latch != NULL)
+						SetLatch(worker->worker_latch);
+				}
+				else if (LrgPerdbCtx->launcher_latch != NULL)
+					SetLatch(LrgPerdbCtx->launcher_latch);
+
+				LWLockRelease(&LrgPerdbCtx->lock);
+			}
+			isworker_needed = false;
+			break;
+		case XACT_EVENT_ABORT:
+			/* the request belonged to the transaction that just failed */
+			isworker_needed = false;
+			break;
+		default:
+			break;
+	}
+}
+
+/*
+ * Register a callback for notifying to launcher, and set a flag
+ *
+ * The callback is registered once per backend; the flag makes the next
+ * commit of this backend wake up the worker or the launcher.
+ */
+void
+lrg_launcher_wakeup(void)
+{
+	if (!ishook_registered)
+	{
+		RegisterXactCallback(lrg_perdb_wakeup_callback, NULL);
+		ishook_registered = true;
+	}
+	isworker_needed = true;
+}
diff --git a/src/backend/replication/lrg/lrg_worker.c b/src/backend/replication/lrg/lrg_worker.c
new file mode 100644
index 0000000000..f4ccf3cc1c
--- /dev/null
+++ b/src/backend/replication/lrg/lrg_worker.c
@@ -0,0 +1,652 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrg_worker.c
+ *		  functions for lrg worker
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/relscan.h"
+#include "access/table.h"
+#include "catalog/catalog.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_lrg_info.h"
+#include "catalog/pg_lrg_nodes.h"
+#include "catalog/pg_lrg_pub.h"
+#include "catalog/pg_publication.h"
+#include "executor/spi.h"
+#include "libpq-fe.h"
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/interrupt.h"
+#include "replication/libpqlrg.h"
+#include "replication/lrg.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "tcop/tcopprot.h"
+#include "utils/fmgroids.h"
+#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+
+typedef struct LrgNode {
+	Oid	  group_oid;
+	char *node_id;
+	char *node_name;
+	char *local_connstring;
+	char *upstream_connstring;
+} LrgNode;
+
+lrg_function_types *LrgFunctionTypes = NULL;
+
+static LrgPerdbWorker* my_lrg_worker = NULL;
+
+static void lrg_worker_onexit(int code, Datum arg);
+static
void do_node_management(void);
+static void get_node_information(LrgNode **node, LRG_NODE_STATE *status);
+static void advance_state_machine(LrgNode *node, LRG_NODE_STATE initial_status);
+static void update_node_status_internal(const char *node_id, const char *node_name, LRG_NODE_STATE state, bool is_in_txn);
+static void detach_node(LrgNode *node);
+static void create_publication(const char* group_name, const char* node_id, Oid group_oid);
+static Oid find_publication(const char *pubname);
+static List* get_lrg_nodes_list(const char *local_nodeid);
+static void synchronise_system_tables(PGconn *localconn, PGconn *upstreamconn);
+
+/*
+ * Mark a slot of the per-database worker array as unused.
+ *
+ * The caller must hold LrgPerdbCtx->lock in exclusive mode.  A dbid of
+ * InvalidOid is what the launcher looks for when it searches a free slot.
+ */
+void
+lrg_worker_cleanup(LrgPerdbWorker *worker)
+{
+	Assert(LWLockHeldByMeInMode(&LrgPerdbCtx->lock, LW_EXCLUSIVE));
+
+	worker->dbid = InvalidOid;
+	worker->worker_pid = InvalidPid;
+	worker->worker_latch = NULL;
+}
+
+/*
+ * Callback for process exit. cleanup the array.
+ *
+ * Releases the slot this worker was started with (my_lrg_worker).
+ */
+static void
+lrg_worker_onexit(int code, Datum arg)
+{
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+	lrg_worker_cleanup(my_lrg_worker);
+	LWLockRelease(&LrgPerdbCtx->lock);
+}
+
+/*
+ * Synchronise system tables from upstream node.
+ *
+ * Currently it will read and insert pg_lrg_nodes only.
+ */
+static void
+synchronise_system_tables(PGconn *localconn, PGconn *upstreamconn)
+{
+	lrg_copy_lrg_nodes(upstreamconn, localconn);
+}
+
+/*
+ * Load the list of lrg_nodes, except the given node
+ *
+ * Returns a List of LrgNode built from pg_lrg_nodes, skipping the row
+ * whose nodeid equals "excepted_node" (no row is skipped when it is
+ * NULL).  The scan runs in its own transaction; the result is allocated
+ * in the memory context that was current on entry.
+ */
+static List*
+get_lrg_nodes_list(const char *excepted_node)
+{
+	List	   *res = NIL;
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tup;
+	/* We will allocate the output data in the current memory context */
+	MemoryContext resultcxt = CurrentMemoryContext;
+
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	rel = table_open(LrgNodesRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_lrg_nodes nodesform = (Form_pg_lrg_nodes) GETSTRUCT(tup);
+		LrgNode	   *node;
+		MemoryContext oldcxt;
+		Datum		upstream;
+		bool		upstream_isnull;
+
+		if (excepted_node != NULL &&
+			strcmp(NameStr(nodesform->nodeid), excepted_node) == 0)
+			continue;
+
+		/*
+		 * upstreamconn is declared BKI_FORCE_NULL, so it may be absent from
+		 * the tuple; it must be fetched with heap_getattr() rather than
+		 * read through the C struct.
+		 */
+		upstream = heap_getattr(tup, Anum_pg_lrg_nodes_upstreamconn,
+								RelationGetDescr(rel), &upstream_isnull);
+
+		/*
+		 * Allocate our results in the caller's context, not the transaction's.
+		 */
+		oldcxt = MemoryContextSwitchTo(resultcxt);
+
+		node = (LrgNode *)palloc0(sizeof(LrgNode));
+		node->group_oid = nodesform->groupid;
+		node->node_id = pstrdup(NameStr(nodesform->nodeid));
+		node->node_name = pstrdup(NameStr(nodesform->nodename));
+		node->local_connstring = pstrdup(NameStr(nodesform->localconn));
+
+		/* a NULL and an empty upstreamconn both mean "no upstream" */
+		if (!upstream_isnull &&
+			strlen(NameStr(*DatumGetName(upstream))) != 0)
+			node->upstream_connstring = pstrdup(NameStr(*DatumGetName(upstream)));
+		else
+			node->upstream_connstring = NULL;
+
+		res = lappend(res, node);
+
+		MemoryContextSwitchTo(oldcxt);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+	CommitTransactionCommand();
+
+	return res;
+}
+
+/*
+ * Internal routine for updating the status of the node.
+ *
+ * Exactly one of node_id / node_name must be non-NULL; it selects the
+ * pg_lrg_nodes row to update.  When is_in_txn is false the update runs in
+ * a transaction of its own, otherwise the caller's transaction is used.
+ * The key is passed to SPI as a parameter, never pasted into the query
+ * text.
+ *
+ * TODO: implement as C func instead of SPI interface
+ */
+static void
+update_node_status_internal(const char *node_id, const char *node_name, LRG_NODE_STATE state, bool is_in_txn)
+{
+	const char *query;
+	const char *key;
+	NameData	keyname;
+	Oid			argtypes[2] = {INT4OID, NAMEOID};
+	Datum		values[2];
+	int			ret;
+
+	Assert(!(node_id == NULL && node_name == NULL)
+		   && !(node_id != NULL && node_name != NULL));
+
+	/* only these target states can be written through this routine */
+	switch (state)
+	{
+		case LRG_STATE_CREATE_PUBLICATION:
+		case LRG_STATE_CREATE_SUBSCRIPTION:
+		case LRG_STATE_READY:
+		case LRG_STATE_TO_BE_DETACHED:
+			break;
+		default:
+			elog(ERROR, "not implemented yet");
+	}
+
+	if (node_id != NULL)
+	{
+		query = "UPDATE pg_lrg_nodes SET status = $1 WHERE nodeid = $2";
+		key = node_id;
+	}
+	else
+	{
+		query = "UPDATE pg_lrg_nodes SET status = $1 WHERE nodename = $2";
+		key = node_name;
+	}
+
+	/* a name datum must point at a full, zero-padded NameData */
+	memset(&keyname, 0, sizeof(keyname));
+	strlcpy(NameStr(keyname), key, NAMEDATALEN);
+
+	values[0] = Int32GetDatum((int32) state);
+	values[1] = NameGetDatum(&keyname);
+
+	if (!is_in_txn)
+	{
+		StartTransactionCommand();
+		PushActiveSnapshot(GetTransactionSnapshot());
+	}
+	SPI_connect();
+
+	ret = SPI_execute_with_args(query, 2, argtypes, values, NULL, false, 0);
+	if (ret != SPI_OK_UPDATE)
+		elog(ERROR, "SPI error while updating a table");
+	SPI_finish();
+
+	if (!is_in_txn)
+	{
+		PopActiveSnapshot();
+		CommitTransactionCommand();
+	}
+}
+
+/*
+ * Update the status of node, that is specified by the name
+ */
+void
+update_node_status_by_nodename(const char *node_name, LRG_NODE_STATE state, bool is_in_txn)
+{
+	update_node_status_internal(NULL, node_name, state, is_in_txn);
+}
+
+/*
+ * Same as above, but node_id is used for the key
+ */
+void
+update_node_status_by_nodeid(const char *node_id, LRG_NODE_STATE state, bool is_in_txn)
+{
+	update_node_status_internal(node_id, NULL, state, is_in_txn);
+}
+
+
+/*
+ * Look up a publication by name.
+ *
+ * Returns its OID, or InvalidOid when no publication has that name.
+ * This is a plain syscache lookup: no lock is taken on pg_publication
+ * and no tuple copy is left behind.
+ */
+static Oid
+find_publication(const char *pubname)
+{
+	return GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
+						   CStringGetDatum(pubname));
+}
+
+/*
+ * Create publication via SPI interface, and insert its oid
+ * to the system catalog pg_lrg_pub.
+ *
+ * The publication is named "pub_for_<group_name>" and covers all tables.
+ * Two transactions are used: one for CREATE PUBLICATION, one for the
+ * catalog row.
+ *
+ * NOTE(review): group_name is pasted into the command without identifier
+ * quoting, and node_id is not used here.
+ */
+static void
+create_publication(const char* group_name, const char* node_id, Oid group_oid)
+{
+	int			ret;
+	StringInfoData query, pub_name;
+	Oid			pub_oid;
+	Oid			lrgpub_oid;
+	Relation	rel;
+	bool		nulls[Natts_pg_lrg_pub];
+	Datum		values[Natts_pg_lrg_pub];
+	HeapTuple	tup;
+
+	initStringInfo(&query);
+	initStringInfo(&pub_name);
+
+	/* Firstly do CREATE PUBLICATION */
+
+	StartTransactionCommand();
+	SPI_connect();
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	appendStringInfo(&pub_name, "pub_for_%s", group_name);
+	appendStringInfo(&query, "CREATE PUBLICATION %s %s", pub_name.data, "FOR ALL TABLES");
+
+	ret = SPI_execute(query.data, false, 0);
+	if (ret != SPI_OK_UTILITY)
+		elog(ERROR, "SPI error while creating publication");
+
+	PopActiveSnapshot();
+	SPI_finish();
+	CommitTransactionCommand();
+
+	/* ...And record its oid */
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	pub_oid = find_publication(pub_name.data);
+	if (pub_oid == InvalidOid)
+		elog(ERROR, "publication is not found");
+
+	rel = table_open(LrgPublicationId, ExclusiveLock);
+
+	memset(nulls, 0, sizeof(nulls));
+	memset(values, 0, sizeof(values));
+
+	lrgpub_oid = GetNewOidWithIndex(rel, LrgPublicationOidIndexId, Anum_pg_lrg_pub_oid);
+
+	values[Anum_pg_lrg_pub_oid - 1] = ObjectIdGetDatum(lrgpub_oid);
+	values[Anum_pg_lrg_pub_groupid - 1] = ObjectIdGetDatum(group_oid);
+	values[Anum_pg_lrg_pub_pubid - 1] = ObjectIdGetDatum(pub_oid);
+
+	tup =
heap_form_tuple(RelationGetDescr(rel), values, nulls);
+	/* Insert tuple into catalog. */
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+	table_close(rel, ExclusiveLock);
+
+	CommitTransactionCommand();
+
+	pfree(pub_name.data);
+	pfree(query.data);
+}
+
+/*
+ * Some work for detaching and dropping
+ *
+ * Connects to the node being detached and to every other node of the
+ * group, drops the subscriptions in both directions and removes the
+ * node's row from the other nodes.  When there is no other node the row
+ * is removed on the detached node itself.  Finally the publication is
+ * dropped and lrg_cleanup() is run on the detached node.
+ */
+static void
+detach_node(LrgNode *node)
+{
+	PGconn	   *tobedetached = NULL;
+	List	   *list;
+	ListCell   *lc;
+	MemoryContext subctx;
+	MemoryContext oldctx;
+	char	   *group_name = NULL;
+
+	get_group_info(&group_name);
+
+	if (LrgFunctionTypes == NULL)
+		load_file("libpqlrg", false);
+
+	lrg_connect(node->local_connstring, &tobedetached);
+
+	subctx = AllocSetContextCreate(TopMemoryContext,
+								   "Lrg Launcher list",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(subctx);
+
+	list = get_lrg_nodes_list(node->node_id);
+
+	if (list != NIL)
+	{
+		foreach(lc, list)
+		{
+			LrgNode *other_node = (LrgNode *)lfirst(lc);
+			PGconn *otherconn = NULL;
+			lrg_connect(other_node->local_connstring, &otherconn);
+
+			lrg_drop_subscription(group_name, node->node_id, other_node->node_id, otherconn);
+			lrg_drop_subscription(group_name, other_node->node_id, node->node_id, tobedetached);
+
+			lrg_delete_from_nodes(otherconn, node->node_id);
+			lrg_disconnect(otherconn);
+		}
+	}
+	else
+		lrg_delete_from_nodes(tobedetached, node->node_id);
+
+	MemoryContextSwitchTo(oldctx);
+	MemoryContextDelete(subctx);
+
+	lrg_drop_publication(group_name, tobedetached);
+	lrg_cleanup(tobedetached);
+	lrg_disconnect(tobedetached);
+
+	pfree(group_name);
+}
+
+/*
+ * advance the state machine for creating/attaching
+ *
+ * Starting from "initial_status" the local node is taken through
+ * INIT -> CREATE_PUBLICATION -> CREATE_SUBSCRIPTION -> READY, recording
+ * every step in pg_lrg_nodes.  A non-NULL upstream_connstring means the
+ * node is attaching to an existing group; only then are other nodes
+ * contacted.
+ *
+ * The function can be entered in LRG_STATE_CREATE_PUBLICATION (work
+ * resumed after the publication step).  Everything that step needs - the
+ * libpqlrg library, the group name and the local connection - is
+ * therefore set up on demand instead of being assumed from the INIT step.
+ */
+static void
+advance_state_machine(LrgNode *local_node, LRG_NODE_STATE initial_status)
+{
+	PGconn	   *localconn = NULL;
+	PGconn	   *upstreamconn = NULL;
+	char	   *group_name = NULL;
+	LRG_NODE_STATE state = initial_status;
+	char		node_id[64];
+	bool		is_attaching = (local_node->upstream_connstring != NULL);
+
+	/*
+	 * Assuming that the specified node is local
+	 */
+	construct_node_id(node_id, sizeof(node_id));
+	Assert(strcmp(node_id, local_node->node_id) == 0);
+
+	/* the lrg_* connection routines live in libpqlrg */
+	if (is_attaching && LrgFunctionTypes == NULL)
+		load_file("libpqlrg", false);
+
+	if (state == LRG_STATE_INIT)
+	{
+		/* Establish connection if we are in the attaching case */
+		if (is_attaching)
+		{
+			lrg_connect(local_node->upstream_connstring, &upstreamconn);
+			lrg_connect(local_node->local_connstring, &localconn);
+
+			/* and get pg_lrg_nodes from upstream */
+			synchronise_system_tables(localconn, upstreamconn);
+		}
+		get_group_info(&group_name);
+
+		create_publication(group_name, local_node->node_id, local_node->group_oid);
+
+		state = LRG_STATE_CREATE_PUBLICATION;
+		update_node_status_by_nodename(local_node->node_name, LRG_STATE_CREATE_PUBLICATION, false);
+	}
+
+	if (state == LRG_STATE_CREATE_PUBLICATION)
+	{
+		if (is_attaching)
+		{
+			List	   *list;
+			ListCell   *lc;
+			MemoryContext subctx;
+			MemoryContext oldctx;
+
+			/*
+			 * When we were entered in this state the INIT step did not
+			 * run.  Fetch what it would have provided, before switching
+			 * to the short-lived context below so that group_name
+			 * outlives it.
+			 */
+			if (group_name == NULL)
+				get_group_info(&group_name);
+			if (localconn == NULL)
+				lrg_connect(local_node->local_connstring, &localconn);
+
+			subctx = AllocSetContextCreate(TopMemoryContext,
+										   "Lrg Launcher list",
+										   ALLOCSET_DEFAULT_SIZES);
+			oldctx = MemoryContextSwitchTo(subctx);
+
+			/* Get a node list that belong to the group */
+			list = get_lrg_nodes_list(local_node->node_id);
+
+			/* and do CREATE SUBSCRIPTION on all nodes! */
+			foreach(lc, list)
+			{
+				LrgNode *other_node = (LrgNode *)lfirst(lc);
+				PGconn *otherconn = NULL;
+				lrg_connect(other_node->local_connstring, &otherconn);
+				lrg_create_subscription(group_name, local_node->local_connstring,
+										local_node->node_id, other_node->node_id,
+										otherconn, "only_local = true, copy_data = false");
+				lrg_create_subscription(group_name, other_node->local_connstring,
+										other_node->node_id, local_node->node_id,
+										localconn, "only_local = true, copy_data = false");
+
+				/*
+				 * XXX: adding a tuple into remote's pg_lrg_nodes here,
+				 * but it is bad. it should be end of this function.
+				 */
+				lrg_insert_into_lrg_nodes(otherconn, local_node->node_id,
+										  LRG_STATE_READY, local_node->node_name,
+										  local_node->local_connstring, local_node->upstream_connstring);
+				lrg_disconnect(otherconn);
+			}
+			MemoryContextSwitchTo(oldctx);
+			MemoryContextDelete(subctx);
+		}
+
+		state = LRG_STATE_CREATE_SUBSCRIPTION;
+		update_node_status_by_nodename(local_node->node_name, LRG_STATE_CREATE_SUBSCRIPTION, false);
+	}
+
+	state = LRG_STATE_READY;
+	update_node_status_by_nodename(local_node->node_name, LRG_STATE_READY, false);
+
+	/*
+	 * clean up phase
+	 */
+	if (localconn != NULL)
+		lrg_disconnect(localconn);
+	if (upstreamconn != NULL)
+		lrg_disconnect(upstreamconn);
+	if (group_name != NULL)
+		pfree(group_name);
+}
+
+/*
+ * Get node-specific information that status is not ready.
+ */
+static void
+get_node_information(LrgNode **node, LRG_NODE_STATE *status)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	TableScanDesc scan;
+
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	rel = table_open(LrgNodesRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_lrg_nodes nodesform = (Form_pg_lrg_nodes) GETSTRUCT(tup);
+		MemoryContext oldcxt;
+		LrgNode *tmp;
+
+		/*
+		 * If the status is ready, we skip it.
+		 */
+		if (nodesform->status == LRG_STATE_READY)
+			continue;
+
+		/* the result must outlive the transaction opened above */
+		oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+		tmp = (LrgNode *)palloc0(sizeof(LrgNode));
+		tmp->group_oid = nodesform->groupid;
+		tmp->node_id = pstrdup(NameStr(nodesform->nodeid));
+		tmp->node_name = pstrdup(NameStr(nodesform->nodename));
+		tmp->local_connstring = pstrdup(NameStr(nodesform->localconn));
+		{
+			/*
+			 * upstreamconn is declared BKI_FORCE_NULL, so it may be absent
+			 * from the tuple; fetch it with heap_getattr() instead of
+			 * reading it through the C struct.  NULL and empty both mean
+			 * "no upstream".
+			 */
+			bool		upstream_isnull;
+			Datum		upstream = heap_getattr(tup, Anum_pg_lrg_nodes_upstreamconn,
+												RelationGetDescr(rel), &upstream_isnull);
+
+			if (!upstream_isnull &&
+				strlen(NameStr(*DatumGetName(upstream))) != 0)
+				tmp->upstream_connstring = pstrdup(NameStr(*DatumGetName(upstream)));
+			else
+				tmp->upstream_connstring = NULL;
+		}
+
+		*node = tmp;
+		*status = nodesform->status;
+
+		MemoryContextSwitchTo(oldcxt);
+		break;
+	}
+
+	table_endscan(scan);
+	table_close(rel, NoLock);
+	CommitTransactionCommand();
+}
+
+/*
+ * Process one pending node operation, if there is one.
+ *
+ * The first pg_lrg_nodes row whose status is not READY is picked up and
+ * either detached or advanced through the create/attach state machine.
+ */
+static void
+do_node_management(void)
+{
+	LrgNode *node = NULL;
+	LRG_NODE_STATE status;
+
+	/*
+	 * read information from pg_lrg_nodes
+	 */
+	get_node_information(&node, &status);
+
+	if (node == NULL)
+	{
+		/*
+		 * If we reach here status of nodes are READY,
+		 * it means that no operations are needed.
+		 */
+		return;
+	}
+
+	/*
+	 * XXX: for simplify the case for detaching/dropping is completely separated
+	 * from the creating/attaching.
+	 */
+	if (status == LRG_STATE_TO_BE_DETACHED)
+		detach_node(node);
+	else
+	{
+		/*
+		 * advance the state machine for creating or attaching.
+		 */
+		advance_state_machine(node, status);
+	}
+
+	pfree(node->node_id);
+	pfree(node->node_name);
+	pfree(node->local_connstring);
+	if (node->upstream_connstring != NULL)
+		pfree(node->upstream_connstring);
+	pfree(node);
+}
+
+/*
+ * Entry point for lrg worker
+ *
+ * "arg" is the index of the slot the launcher reserved for this worker
+ * in LrgPerdbCtx->workers[].  The worker publishes its pid and latch
+ * there, connects to the slot's database, exits at once when the
+ * database is not part of a node group, and otherwise processes node
+ * operations each time its latch is set.
+ */
+void
+lrg_worker_main(Datum arg)
+{
+	int			slot = DatumGetInt32(arg);
+
+	/* Establish signal handlers. */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Get information from the controller. The index
+	 * is given as the argument
+	 *
+	 * The slot is modified here, so the lock is taken exclusively.
+	 */
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+	my_lrg_worker = &LrgPerdbCtx->workers[slot];
+	my_lrg_worker->worker_pid = MyProcPid;
+	my_lrg_worker->worker_latch = &MyProc->procLatch;
+	LWLockRelease(&LrgPerdbCtx->lock);
+
+	before_shmem_exit(lrg_worker_onexit, (Datum) 0);
+
+	BackgroundWorkerInitializeConnectionByOid(my_lrg_worker->dbid, 0, 0);
+
+	elog(DEBUG3, "per-db worker for %u was launched", my_lrg_worker->dbid);
+
+	/*
+	 * The launcher launches the worker without considering
+	 * the existence of lrg related data.
+	 * So firstly workers must check their catalogs, and exit
+	 * if there is no data.
+	 * In any cases pg_lrg_info will have tuples if
+	 * this node is in a node group, so we reads it.
+	 */
+	if (get_group_info(NULL) == InvalidOid)
+	{
+		elog(DEBUG3, "This database %u is not a member of lrg", MyDatabaseId);
+		proc_exit(0);
+	}
+
+	/*
+	 * Clear the latch before the first pass, not after it: a wakeup that
+	 * arrives while the pass is running must still be seen by the loop
+	 * below.
+	 */
+	ResetLatch(&MyProc->procLatch);
+
+	do_node_management();
+
+	/*
+	 * Wait for detaching or dropping.
+	 */
+	for (;;)
+	{
+		int			rc;
+		bool		is_latch_set = false;
+
+		CHECK_FOR_INTERRUPTS();
+
+#define TEMPORARY_NAP_TIME 180000L
+		rc = WaitLatch(&MyProc->procLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   TEMPORARY_NAP_TIME, 0);
+
+		if (rc & WL_LATCH_SET)
+		{
+			is_latch_set = true;
+			ResetLatch(&MyProc->procLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		if (is_latch_set)
+		{
+			do_node_management();
+			is_latch_set = false;
+		}
+	}
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 26372d95b3..15b77405bc 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -32,6 +32,7 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
+#include "replication/lrg.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -284,6 +285,7 @@ CreateSharedMemoryAndSemaphores(void)
 	WalRcvShmemInit();
 	PgArchShmemInit();
 	ApplyLauncherShmemInit();
+	LrgLauncherShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/include/catalog/pg_lrg_info.h b/src/include/catalog/pg_lrg_info.h
new file mode 100644
index 0000000000..0067aac389
--- /dev/null
+++ b/src/include/catalog/pg_lrg_info.h
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_info.h
+ *	  definition of the "logical replication group information" system
+ *	  catalog (pg_lrg_info)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_info.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_INFO_H
+#define PG_LRG_INFO_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_info_d.h"
+
+/* ----------------
+ *		pg_lrg_info definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_info
+ * ----------------
+ */
+CATALOG(pg_lrg_info,8337,LrgInfoRelationId)
+{
+	Oid			oid;			/* oid */
+
+	NameData	groupname;		/* name of the logical replication group */
+	bool		puballtables;	/* NOTE(review): presumably "publish all tables" - confirm */
+} FormData_pg_lrg_info;
+
+/* ----------------
+ *		Form_pg_lrg_info corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_info relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_info *Form_pg_lrg_info;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_info_oid_index, 8338, LrgInfoRelationIndexId, on pg_lrg_info using btree(oid oid_ops));
+
+#endif							/* PG_LRG_INFO_H */
diff --git a/src/include/catalog/pg_lrg_nodes.h b/src/include/catalog/pg_lrg_nodes.h
new file mode 100644
index 0000000000..0ef32185ad
--- /dev/null
+++ b/src/include/catalog/pg_lrg_nodes.h
@@ -0,0 +1,54 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_nodes.h
+ *	  definition of the "logical replication nodes" system
+ *	  catalog (pg_lrg_nodes)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_nodes.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_NODES_H
+#define PG_LRG_NODES_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_nodes_d.h"
+
+/* ----------------
+ *		pg_lrg_nodes definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_nodes
+ * ----------------
+ */
+CATALOG(pg_lrg_nodes,8339,LrgNodesRelationId)
+{
+	Oid			oid;			/* oid */
+
+	NameData	nodeid;			/* identifier of the node (unique) */
+	Oid			groupid BKI_LOOKUP(pg_lrg_info);	/* group the node belongs to */
+	Oid			dbid BKI_LOOKUP(pg_database);	/* database of the node */
+	int32		status;			/* node state, an LRG_NODE_STATE value */
+	NameData	nodename;		/* name of the node (unique) */
+	NameData	localconn;		/* connection string of the node itself */
+	NameData	upstreamconn BKI_FORCE_NULL;	/* connection string of the upstream node; may be NULL */
+} FormData_pg_lrg_nodes;
+
+/* ----------------
+ *		Form_pg_lrg_nodes corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_nodes relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_nodes *Form_pg_lrg_nodes;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_nodes_oid_index, 8340, LrgNodesRelationIndexId, on pg_lrg_nodes using btree(oid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_lrg_node_id_index, 8346, LrgNodeIdIndexId, on pg_lrg_nodes using btree(nodeid name_ops));
+DECLARE_UNIQUE_INDEX(pg_lrg_nodes_name_index, 8347, LrgNodeNameIndexId, on pg_lrg_nodes using btree(nodename name_ops));
+
+#endif							/* PG_LRG_NODES_H */
diff --git a/src/include/catalog/pg_lrg_pub.h b/src/include/catalog/pg_lrg_pub.h
new file mode 100644
index 0000000000..d65dc51d4d
--- /dev/null
+++ b/src/include/catalog/pg_lrg_pub.h
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_pub.h
+ *	  definition of the "logical replication group publication" system
+ *	  catalog (pg_lrg_pub)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_pub.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_PUB_H
+#define PG_LRG_PUB_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_pub_d.h"
+
+/* ----------------
+ *		pg_lrg_pub definition.
cpp turns this into
+ *		typedef struct FormData_pg_lrg_pub
+ * ----------------
+ */
+CATALOG(pg_lrg_pub,8341,LrgPublicationId)
+{
+	Oid			oid;			/* oid */
+	Oid			groupid BKI_LOOKUP(pg_lrg_info);	/* owning node group */
+	Oid			pubid BKI_LOOKUP(pg_publication);	/* publication created for the group */
+} FormData_pg_lrg_pub;
+
+/* ----------------
+ *		Form_pg_lrg_pub corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_pub relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_pub *Form_pg_lrg_pub;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_pub_oid_index, 8344, LrgPublicationOidIndexId, on pg_lrg_pub using btree(oid oid_ops));
+
+#endif							/* PG_LRG_PUB_H */
diff --git a/src/include/catalog/pg_lrg_sub.h b/src/include/catalog/pg_lrg_sub.h
new file mode 100644
index 0000000000..398c8e8971
--- /dev/null
+++ b/src/include/catalog/pg_lrg_sub.h
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_sub.h
+ *	  definition of the "logical replication group subscription" system
+ *	  catalog (pg_lrg_sub)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_sub.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_SUB_H
+#define PG_LRG_SUB_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_sub_d.h"
+
+/* ----------------
+ *		pg_lrg_sub definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_sub
+ * ----------------
+ */
+CATALOG(pg_lrg_sub,8343,LrgSubscriptionId)
+{
+	Oid			oid;			/* oid */
+	Oid			groupid BKI_LOOKUP(pg_lrg_info);	/* owning node group */
+	Oid			subid BKI_LOOKUP(pg_subscription);	/* subscription created for the group */
+} FormData_pg_lrg_sub;
+
+/* ----------------
+ *		Form_pg_lrg_sub corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_sub relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_sub *Form_pg_lrg_sub;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_sub_oid_index, 8345, LrgSubscriptionOidIndexId, on pg_lrg_sub using btree(oid oid_ops));
+
+#endif							/* PG_LRG_SUB_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index babe16f00a..c64b4b2420 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11885,4 +11885,34 @@
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+# lrg (these functions modify catalogs or wait on them, hence volatile)
+{ oid => '8143', descr => 'create logical replication group',
+  proname => 'lrg_create', provolatile => 'v', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text text text',
+  prosrc => 'lrg_create' },
+{ oid => '8144', descr => 'attach to logical replication group',
+  proname => 'lrg_node_attach', provolatile => 'v', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text text text',
+  prosrc => 'lrg_node_attach' },
+{ oid => '8145', descr => 'detach from logical replication group',
+  proname => 'lrg_node_detach', provolatile => 'v', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text',
+  prosrc => 'lrg_node_detach' },
+{ oid => '8146', descr => 'delete logical replication group',
+  proname => 'lrg_drop', provolatile => 'v', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text',
+  prosrc => 'lrg_drop' },
+{ oid => '8147', descr => 'insert a tuple to pg_lrg_sub',
+  proname => 'lrg_insert_into_sub', provolatile => 'v', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text',
+  prosrc => 'lrg_insert_into_sub' },
+{ oid => '8148', descr => 'insert a tuple to pg_lrg_nodes',
+  proname => 'lrg_insert_into_nodes', provolatile => 'v', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text int4 text text text',
+  prosrc => 'lrg_insert_into_nodes' },
+{ oid => '8149', descr => 'wait until node operations are done.',
+  proname => 'lrg_wait', provolatile => 'v', proparallel => 'r',
+  prorettype => 'void', proargtypes => '',
+  prosrc => 'lrg_wait' },
+
 ]
diff --git
a/src/include/replication/libpqlrg.h b/src/include/replication/libpqlrg.h new file mode 100644 index 0000000000..f13b4934d3 --- /dev/null +++ b/src/include/replication/libpqlrg.h @@ -0,0 +1,99 @@ +/*------------------------------------------------------------------------- + * + * libpqlrg.h + * Function-pointer interface to the libpqlrg library + * + *------------------------------------------------------------------------- + */ +#ifndef LIBPQLRG_H +#define LIBPQLRG_H + +#include "postgres.h" +#include "libpq-fe.h" +#include "replication/lrg.h" + +/* function pointers for libpqlrg */ + +typedef void (*libpqlrg_connect_fn) (const char *connstring, PGconn **conn); +typedef bool (*libpqlrg_check_group_fn) (PGconn *conn, const char *group_name); +typedef void (*libpqlrg_copy_lrg_nodes_fn) (PGconn *remoteconn, PGconn *localconn); +typedef void (*libpqlrg_insert_into_lrg_nodes_fn) (PGconn *remoteconn, + const char *node_id, LRG_NODE_STATE status, + const char *node_name, const char *local_connstring, + const char *upstream_connstring); +typedef void (*libpqlrg_create_subscription_fn) (const char *group_name, const char *publisher_connstring, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn, const char *options); + +typedef void (*libpqlrg_drop_publication_fn) (const char *group_name, + PGconn *publisherconn); + +typedef void (*libpqlrg_drop_subscription_fn) (const char *group_name, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn); + +typedef void (*libpqlrg_delete_from_nodes_fn) (PGconn *conn, const char *node_id); +typedef void (*libpqlrg_cleanup_fn) (PGconn *conn); + +typedef void (*libpqlrg_disconnect_fn) (PGconn *conn); + +typedef struct lrg_function_types +{ + libpqlrg_connect_fn libpqlrg_connect; + libpqlrg_check_group_fn libpqlrg_check_group; + libpqlrg_copy_lrg_nodes_fn libpqlrg_copy_lrg_nodes; + libpqlrg_insert_into_lrg_nodes_fn libpqlrg_insert_into_lrg_nodes; +
libpqlrg_create_subscription_fn libpqlrg_create_subscription; + libpqlrg_drop_publication_fn libpqlrg_drop_publication; + libpqlrg_drop_subscription_fn libpqlrg_drop_subscription; + libpqlrg_delete_from_nodes_fn libpqlrg_delete_from_nodes; + libpqlrg_cleanup_fn libpqlrg_cleanup; + libpqlrg_disconnect_fn libpqlrg_disconnect; +} lrg_function_types; + +extern PGDLLIMPORT lrg_function_types *LrgFunctionTypes; + +#define lrg_connect(connstring, conn) \ + LrgFunctionTypes->libpqlrg_connect(connstring, conn) +#define lrg_check_group(conn, group_name) \ + LrgFunctionTypes->libpqlrg_check_group(conn, group_name) +#define lrg_copy_lrg_nodes(remoteconn, localconn) \ + LrgFunctionTypes->libpqlrg_copy_lrg_nodes(remoteconn, localconn) + +#define lrg_insert_into_lrg_nodes(remoteconn, \ + node_id, status, \ + node_name, local_connstring, \ + upstream_connstring) \ + LrgFunctionTypes->libpqlrg_insert_into_lrg_nodes(remoteconn, \ + node_id, status, \ + node_name, local_connstring, \ + upstream_connstring) +#define lrg_create_subscription(group_name, publisher_connstring, \ + publisher_node_id, subscriber_node_id, \ + subscriberconn, options) \ + LrgFunctionTypes->libpqlrg_create_subscription(group_name, publisher_connstring, \ + publisher_node_id, subscriber_node_id, \ + subscriberconn, options) + +#define lrg_drop_publication(group_name, \ + publisherconn) \ + LrgFunctionTypes->libpqlrg_drop_publication(group_name, \ + publisherconn) + +#define lrg_drop_subscription(group_name, \ + publisher_node_id, subscriber_node_id, \ + subscriberconn) \ + LrgFunctionTypes->libpqlrg_drop_subscription(group_name, \ + publisher_node_id, subscriber_node_id, \ + subscriberconn) + +#define lrg_delete_from_nodes(conn, node_id) \ + LrgFunctionTypes->libpqlrg_delete_from_nodes(conn, node_id) + +#define lrg_cleanup(conn) \ + LrgFunctionTypes->libpqlrg_cleanup(conn) + +#define lrg_disconnect(conn) \ + LrgFunctionTypes->libpqlrg_disconnect(conn) + +#endif /* LIBPQLRG_H */ diff
--git a/src/include/replication/lrg.h b/src/include/replication/lrg.h new file mode 100644 index 0000000000..f0e38696cf --- /dev/null +++ b/src/include/replication/lrg.h @@ -0,0 +1,68 @@ +/*------------------------------------------------------------------------- + * + * lrg.h + * Constructs a logical replication group + * + *------------------------------------------------------------------------- + */ +#ifndef LRG_H +#define LRG_H + +#include "postgres.h" + +#include "storage/latch.h" +#include "storage/lock.h" +#include "storage/lwlock.h" + +/* + * enumeration representing the state of a node + */ +typedef enum +{ + LRG_STATE_INIT = 0, + LRG_STATE_CREATE_PUBLICATION, + LRG_STATE_CREATE_SUBSCRIPTION, + LRG_STATE_READY, + LRG_STATE_TO_BE_DETACHED, +} LRG_NODE_STATE; + +/* + * working space for each lrg per-db worker. + */ +typedef struct LrgPerdbWorker { + pid_t worker_pid; + Oid dbid; + Latch *worker_latch; +} LrgPerdbWorker; + +/* + * controller for lrg per-db worker. + * This will be held by the launcher.
+ */ +typedef struct LrgPerdbCtxStruct { + LWLock lock; + pid_t launcher_pid; + Latch *launcher_latch; + LrgPerdbWorker workers[FLEXIBLE_ARRAY_MEMBER]; +} LrgPerdbCtxStruct; + +extern LrgPerdbCtxStruct *LrgPerdbCtx; + +/* lrg.c */ +extern void LrgLauncherShmemInit(void); +extern void LrgLauncherRegister(void); +extern void lrg_add_nodes(char *node_id, Oid group_id, LRG_NODE_STATE status, char *node_name, char *local_connstring, char *upstream_connstring); +extern Oid get_group_info(char **group_name); +extern void construct_node_id(char *out_node_id, int size); +extern void update_node_status_by_nodename(const char *node_name, LRG_NODE_STATE state, bool is_in_txn); +extern void update_node_status_by_nodeid(const char *node_id, LRG_NODE_STATE state, bool is_in_txn); + +/* lrg_launcher.c */ +extern void lrg_launcher_main(Datum arg) pg_attribute_noreturn(); +extern void lrg_launcher_wakeup(void); + +/* lrg_worker.c */ +extern void lrg_worker_main(Datum arg) pg_attribute_noreturn(); +extern void lrg_worker_cleanup(LrgPerdbWorker *worker); + +#endif /* LRG_H */ diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 215eb899be..da0ef150a2 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -266,3 +266,9 @@ NOTICE: checking pg_subscription {subdbid} => pg_database {oid} NOTICE: checking pg_subscription {subowner} => pg_authid {oid} NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid} NOTICE: checking pg_subscription_rel {srrelid} => pg_class {oid} +NOTICE: checking pg_lrg_nodes {groupid} => pg_lrg_info {oid} +NOTICE: checking pg_lrg_nodes {dbid} => pg_database {oid} +NOTICE: checking pg_lrg_pub {groupid} => pg_lrg_info {oid} +NOTICE: checking pg_lrg_pub {pubid} => pg_publication {oid} +NOTICE: checking pg_lrg_sub {groupid} => pg_lrg_info {oid} +NOTICE: checking pg_lrg_sub {subid} => pg_subscription {oid} -- 2.27.0