diff --git a/contrib/test_decoding/t/002_online_activation.pl b/contrib/test_decoding/t/002_online_activation.pl
new file mode 100644
index 00000000000..08512a05ae6
--- /dev/null
+++ b/contrib/test_decoding/t/002_online_activation.pl
@@ -0,0 +1,90 @@
+
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# Test activating and deactivating logical decoding at run time with
+# pg_activate_logical_decoding() and pg_deactivate_logical_decoding().
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize the test node
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init(allows_streaming => 1);
+$node->append_conf('postgresql.conf', 'synchronous_commit = on');
+$node->start;
+
+my ($ret, $stdout, $stderr);
+$ret = $node->safe_psql('postgres', 'show wal_level');
+note "wal_level: $ret";
+
+$ret = $node->safe_psql('postgres', 'SELECT pg_get_logical_decoding_status()');
+is($ret, 'disabled', 'logical decoding must be disabled');
+
+# Check that a logical replication slot cannot be created while logical
+# decoding is disabled.
+($ret, $stdout, $stderr) = $node->psql(
+	'postgres',
+	q[SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding')]
+	);
+like($stderr, qr/ERROR:\s+logical decoding requires "wal_level" >= "logical"/,
+	"logical replication slot cannot be created while logical decoding is disabled");
+
+# Activate logical info logging.
+$node->safe_psql('postgres',
+	'SELECT pg_activate_logical_decoding()');
+$ret = $node->safe_psql('postgres', 'SELECT pg_get_logical_decoding_status()');
+is($ret, 'ready', 'logical decoding gets activated');
+
+# Now we can create a logical replication slot.
+$ret = $node->safe_psql(
+	'postgres',
+	q[SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding')]
+	);
+is($ret, 'init', 'successfully create logical replication slot');
+
+# Cannot deactivate logical decoding while having valid slots.
+($ret, $stdout, $stderr) = $node->psql(
+	'postgres',
+	q[SELECT pg_deactivate_logical_decoding()]
+	);
+like($stderr, qr/ERROR:\s+cannot deactivate logical decoding while having valid logical replication slots/,
+	"cannot deactivate logical decoding while having valid logical slots");
+
+$node->safe_psql(
+	'postgres',
+	q[
+CREATE TABLE test (a int primary key, b int);
+INSERT INTO test values (1, 100), (2, 200);
+UPDATE test SET b = b + 10 WHERE a = 1;
+	]);
+
+$ret = $node->safe_psql(
+	'postgres',
+	q[SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+	);
+is($ret, 'BEGIN
+table public.test: INSERT: a[integer]:1 b[integer]:100
+table public.test: INSERT: a[integer]:2 b[integer]:200
+COMMIT
+BEGIN
+table public.test: UPDATE: a[integer]:1 b[integer]:110
+COMMIT', 'logical decoding works fine');
+
+# Drop the replication slot.
+$node->safe_psql(
+	'postgres',
+	q[SELECT pg_drop_replication_slot('regression_slot1')]);
+
+# Deactivate logical decoding.
+$node->safe_psql(
+	'postgres',
+	q[SELECT pg_deactivate_logical_decoding()]);
+
+$ret = $node->safe_psql('postgres', 'SELECT pg_get_logical_decoding_status()');
+is($ret, 'disabled', 'logical decoding gets deactivated');
+
+done_testing();
+
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 6cdc68d981a..66c62745718 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8894,8 +8894,8 @@ log_heap_update(Relation reln, Buffer oldbuf,
 /*
  * Perform XLogInsert of an XLOG_HEAP2_NEW_CID record
  *
- * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog
- * tuples.
+ * This is only used when XLogLogicalInfoActive() is true, and only for
+ * catalog tuples.
*/ static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup) diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 363294d6234..8c5be07aa37 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -167,6 +167,13 @@ xlog_desc(StringInfo buf, XLogReaderState *record) memcpy(&wal_level, rec, sizeof(int)); appendStringInfo(buf, "wal_level %s", get_wal_level_string(wal_level)); } + else if (info == XLOG_LOGICAL_DECODING_STATUS) + { + bool logical_decoding; + + memcpy(&logical_decoding, rec, sizeof(bool)); + appendStringInfoString(buf, logical_decoding ? "true" : "false"); + } } const char * @@ -218,6 +225,9 @@ xlog_identify(uint8 info) case XLOG_CHECKPOINT_REDO: id = "CHECKPOINT_REDO"; break; + case XLOG_LOGICAL_DECODING_STATUS: + id = "LOGICAL_DECODING_STATUS"; + break; } return id; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6f58412bcab..649bde82095 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -78,6 +78,7 @@ #include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/origin.h" +#include "replication/logicalxlog.h" #include "replication/slot.h" #include "replication/snapbuild.h" #include "replication/walreceiver.h" @@ -5078,6 +5079,7 @@ BootStrapXLOG(uint32 data_checksum_version) checkPoint.ThisTimeLineID = BootstrapTimeLineID; checkPoint.PrevTimeLineID = BootstrapTimeLineID; checkPoint.fullPageWrites = fullPageWrites; + checkPoint.logicalDecoding = false; checkPoint.wal_level = wal_level; checkPoint.nextXid = FullTransactionIdFromEpochAndXid(0, FirstNormalTransactionId); @@ -5594,6 +5596,12 @@ StartupXLOG(void) */ RelationCacheInitFileRemove(); + /* + * Initialize logical decoding status, before initializing replication + * slots. 
+ */ + StartupLogicalDecodingStatus(checkPoint.logicalDecoding); + /* * Initialize replication slots, before there's a chance to remove * required resources. @@ -7002,6 +7010,7 @@ CreateCheckPoint(int flags) checkPoint.fullPageWrites = Insert->fullPageWrites; checkPoint.wal_level = wal_level; + checkPoint.logicalDecoding = IsLogicalDecodingActive(); if (shutdown) { @@ -8583,6 +8592,25 @@ xlog_redo(XLogReaderState *record) { /* nothing to do here, just for informational purposes */ } + else if (info == XLOG_LOGICAL_DECODING_STATUS) + { + bool enabled; + + memcpy(&enabled, XLogRecGetData(record), sizeof(bool)); + + /* + * Invalidate logical slots if we are in hot standby and the primary + * disables the logical decoding. + */ + if (InRecovery && InHotStandby && + IsLogicalDecodingActive() && + !enabled) + InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL, + 0, InvalidOid, + InvalidTransactionId); + + UpdateLogicalDecodingStatus(enabled); + } } /* diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 5050057a7e4..85953ba2588 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -898,11 +898,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); - if (wal_level != WAL_LEVEL_LOGICAL) + if (!XLogLogicalInfoActive()) ereport(WARNING, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("\"wal_level\" is insufficient to publish logical changes"), - errhint("Set \"wal_level\" to \"logical\" before creating subscriptions."))); + errmsg("logical decoding needs to be enabled to publish logical changes"), + errhint("Set \"wal_level\" to \"logical\" or activate logical decoding before creating subscriptions."))); return myself; } diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 1e08bbbd4eb..95bcf25a02b 100644 --- 
a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -21,6 +21,7 @@ OBJS = \
 	launcher.o \
 	logical.o \
 	logicalfuncs.o \
+	logicalxlog.o \
 	message.o \
 	origin.o \
 	proto.o \
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 4dc14fdb495..e85d26aac14 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -35,6 +35,7 @@
 #include "pgstat.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicalxlog.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slotsync.h"
 #include "replication/snapbuild.h"
@@ -115,7 +116,7 @@ CheckLogicalDecodingRequirements(void)
 	 * needs the same check.
 	 */
 
-	if (wal_level < WAL_LEVEL_LOGICAL)
+	if (!XLogLogicalInfoActive())
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires \"wal_level\" >= \"logical\"")));
@@ -138,7 +139,7 @@ CheckLogicalDecodingRequirements(void)
 		if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
+					 errmsg("logical decoding on standby requires to enable logical decoding on the primary")));
 	}
 }
 
diff --git a/src/backend/replication/logical/logicalxlog.c b/src/backend/replication/logical/logicalxlog.c
new file mode 100644
index 00000000000..d409efffc28
--- /dev/null
+++ b/src/backend/replication/logical/logicalxlog.c
@@ -0,0 +1,424 @@
+/*-------------------------------------------------------------------------
+ * logicalxlog.c
+ *	  This module contains the code for enabling and disabling the inclusion
+ *	  of logical information in WAL records, and logical decoding.
+ *
+ * Copyright (c) 2012-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/logicalxlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "access/xloginsert.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_control.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "replication/logicalxlog.h"
+#include "replication/slot.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "utils/builtins.h"
+#include "utils/wait_event.h"
+
+/*
+ * Shared-memory state for logical decoding.  "status" holds the current
+ * LogicalDecodingStatus, "mutex" is taken by the activation and deactivation
+ * code around accesses to it, and "cv" is broadcast when an activation
+ * attempt completes or is aborted.
+ */
+typedef struct XLogLogicalInfoCtlData
+{
+	LogicalDecodingStatus status;
+	slock_t		mutex;
+	ConditionVariable cv;
+} XLogLogicalInfoCtlData;
+XLogLogicalInfoCtlData *XLogLogicalInfoCtl = NULL;
+
+static void logical_decoding_activation_abort_callback(int code, Datum arg);
+static void do_activate_logical_decoding(void);
+
+/*
+ * Initialization of shared memory.
+ */
+
+Size
+LogicalXlogShmemSize(void)
+{
+	return sizeof(XLogLogicalInfoCtlData);
+}
+
+void
+LogicalXlogShmemInit(void)
+{
+	bool		found;
+
+	XLogLogicalInfoCtl = ShmemInitStruct("Logical Info Logging",
+										 sizeof(XLogLogicalInfoCtlData),
+										 &found);
+
+	if (!found)
+	{
+		XLogLogicalInfoCtl->status = LOGICAL_DECODING_STATUS_DISABLED;
+		SpinLockInit(&XLogLogicalInfoCtl->mutex);
+		ConditionVariableInit(&XLogLogicalInfoCtl->cv);
+	}
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * before calling StartupReplicationSlots().
+ */
+void
+StartupLogicalDecodingStatus(bool enabled_at_last_checkpoint)
+{
+	LogicalDecodingStatus status;
+
+	if (enabled_at_last_checkpoint)
+		status = LOGICAL_DECODING_STATUS_READY;
+	else
+		status = LOGICAL_DECODING_STATUS_DISABLED;
+
+	/*
+	 * On standbys, we always start with the status in the last checkpoint
+	 * record. If changes of wal_level or logical decoding status is sent from
+	 * the primary, we will enable or disable the logical decoding while
+	 * replaying the WAL record and invalidate slots if necessary.
+	 */
+	if (!StandbyMode)
+	{
+		/*
+		 * Disable logical decoding if replication slots are not available.
+		 */
+		if (max_replication_slots == 0)
+			status = LOGICAL_DECODING_STATUS_DISABLED;
+
+		/*
+		 * If previously the logical decoding was active but the server
+		 * restarted with wal_level < 'replica', we disable the logical
+		 * decoding.
+		 */
+		if (wal_level < WAL_LEVEL_REPLICA)
+			status = LOGICAL_DECODING_STATUS_DISABLED;
+
+		/*
+		 * Setting wal_level to 'logical' immediately enables logical decoding
+		 * and WAL-logging logical info.
+		 */
+		if (wal_level >= WAL_LEVEL_LOGICAL)
+			status = LOGICAL_DECODING_STATUS_READY;
+	}
+
+	XLogLogicalInfoCtl->status = status;
+}
+
+/*
+ * Set the status to READY if "activate" is true, otherwise to DISABLED.
+ * xlog_redo() calls this while replaying an XLOG_LOGICAL_DECODING_STATUS
+ * record.  The spinlock is not taken here.
+ */
+void
+UpdateLogicalDecodingStatus(bool activate)
+{
+	XLogLogicalInfoCtl->status = activate
+		? LOGICAL_DECODING_STATUS_READY
+		: LOGICAL_DECODING_STATUS_DISABLED;
+}
+
+/*
+ * Is WAL-Logging logical info enabled?
+ */
+bool
+XLogLogicalInfoEnabled(void)
+{
+	/*
+	 * Use volatile pointer to make sure we make a fresh read of the shared
+	 * variable.
+	 */
+	volatile XLogLogicalInfoCtlData *ctl = XLogLogicalInfoCtl;
+
+	return ctl->status >= LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO;
+}
+
+/*
+ * Is logical decoding active?
+ */
+bool
+IsLogicalDecodingActive(void)
+{
+	/*
+	 * Use volatile pointer to make sure we make a fresh read of the shared
+	 * variable.
+	 */
+	volatile XLogLogicalInfoCtlData *ctl = XLogLogicalInfoCtl;
+
+	return ctl->status == LOGICAL_DECODING_STATUS_READY;
+}
+
+/*
+ * Perform the activation: switch the status to XLOG_LOGICALINFO, wait for
+ * every transaction that was running at that point to finish, then switch
+ * the status to READY and WAL-log the change (unless in recovery or
+ * wal_level is below replica).  Returns immediately if the status is already
+ * XLOG_LOGICALINFO or READY.  If an error is raised while waiting, the
+ * cleanup callback resets the status to DISABLED.
+ */
+static void
+do_activate_logical_decoding(void)
+{
+	bool		recoveryInProgress = RecoveryInProgress();
+
+	/*
+	 * Get the latest status of logical info logging. If it's already
+	 * activated, quick return.
+	 */
+	SpinLockAcquire(&XLogLogicalInfoCtl->mutex);
+	if (XLogLogicalInfoCtl->status >= LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO)
+	{
+		SpinLockRelease(&XLogLogicalInfoCtl->mutex);
+		return;
+	}
+
+	/* Activate the logical info logging */
+	XLogLogicalInfoCtl->status = LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO;
+	SpinLockRelease(&XLogLogicalInfoCtl->mutex);
+
+	PG_ENSURE_ERROR_CLEANUP(logical_decoding_activation_abort_callback, 0);
+	{
+		RunningTransactions running_xacts;
+
+		running_xacts = GetRunningTransactionData();
+
+		/*
+		 * GetRunningTransactionData() acquired both ProcArrayLock and
+		 * XidGenLock, we must release them.
+		 */
+		LWLockRelease(ProcArrayLock);
+		LWLockRelease(XidGenLock);
+
+		for (int i = 0; i < running_xacts->xcnt; i++)
+		{
+			TransactionId xid = running_xacts->xids[i];
+
+			/*
+			 * Upper layers should prevent that we ever need to wait on
+			 * ourselves. Check anyway, since failing to do so would either
+			 * result in an endless wait or an Assert() failure.
+			 */
+			if (TransactionIdIsCurrentTransactionId(xid))
+				elog(ERROR, "waiting for ourselves");
+
+			XactLockTableWait(xid, NULL, NULL, XLTW_None);
+		}
+	}
+	PG_END_ENSURE_ERROR_CLEANUP(logical_decoding_activation_abort_callback, 0);
+
+	START_CRIT_SECTION();
+
+	/* Activate logical decoding on the database cluster */
+	SpinLockAcquire(&XLogLogicalInfoCtl->mutex);
+	XLogLogicalInfoCtl->status = LOGICAL_DECODING_STATUS_READY;
+	SpinLockRelease(&XLogLogicalInfoCtl->mutex);
+
+	if (XLogStandbyInfoActive() && !recoveryInProgress)
+	{
+		bool		enabled = true;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&enabled), sizeof(bool));
+
+		XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS);
+	}
+
+	END_CRIT_SECTION();
+
+	/* Let everybody know we've activated the logical decoding */
+	ConditionVariableBroadcast(&XLogLogicalInfoCtl->cv);
+}
+
+/*
+ * Error-cleanup callback for do_activate_logical_decoding(): reset the status
+ * to DISABLED and wake up any backends waiting on the condition variable.
+ */
+static void
+logical_decoding_activation_abort_callback(int code, Datum arg)
+{
+	SpinLockAcquire(&XLogLogicalInfoCtl->mutex);
+	XLogLogicalInfoCtl->status = LOGICAL_DECODING_STATUS_DISABLED;
+	SpinLockRelease(&XLogLogicalInfoCtl->mutex);
+
+	/* Let everybody know we failed to activate the logical decoding */
+	ConditionVariableBroadcast(&XLogLogicalInfoCtl->cv);
+}
+
+/*
+ * SQL-callable function to activate logical decoding.
+ *
+ * Returns once the status is READY, either after performing the activation
+ * itself or after waiting for one already in progress.  Raises an error if
+ * max_replication_slots is 0, if wal_level is below replica, or if the
+ * current transaction has already been assigned a transaction ID.
+ */
+Datum
+pg_activate_logical_decoding(PG_FUNCTION_ARGS)
+{
+	LogicalDecodingStatus status;
+
+	if (max_replication_slots == 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("logical decoding can only be activated if \"max_replication_slots\" > 0")));
+
+	if (wal_level < WAL_LEVEL_REPLICA)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("logical decoding can only be activated if \"wal_level\" >= \"replica\"")));
+
+	if (IsTransactionState() &&
+		GetTopTransactionIdIfAny() != InvalidTransactionId)
+		ereport(ERROR,
+				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+				 errmsg("cannot activate logical decoding in transaction that has performed writes")));
+
+
+	SpinLockAcquire(&(XLogLogicalInfoCtl->mutex));
+	status = XLogLogicalInfoCtl->status;
+	SpinLockRelease(&(XLogLogicalInfoCtl->mutex));
+
+	if (status == LOGICAL_DECODING_STATUS_READY)
+		PG_RETURN_VOID();
+
+	/*
+	 * Get ready to sleep until the logical decoding gets activated. We may
+	 * end up not sleeping, but we don't want to do this while holding the
+	 * spinlock.
+	 */
+	ConditionVariablePrepareToSleep(&XLogLogicalInfoCtl->cv);
+
+	for (;;)
+	{
+		SpinLockAcquire(&XLogLogicalInfoCtl->mutex);
+		status = XLogLogicalInfoCtl->status;
+		SpinLockRelease(&XLogLogicalInfoCtl->mutex);
+
+		/* Return if the logical decoding is activated */
+		if (status == LOGICAL_DECODING_STATUS_READY)
+			break;
+
+		if (status == LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO)
+		{
+			/*
+			 * Someone has already started the activation process. Wait for
+			 * the activation to complete and check the status again.
+			 */
+			ConditionVariableSleep(&XLogLogicalInfoCtl->cv,
+								   WAIT_EVENT_LOGICAL_DECODING_ACTIVATION);
+
+			continue;
+		}
+
+		/* Activate logical decoding */
+		do_activate_logical_decoding();
+	}
+
+	ConditionVariableCancelSleep();
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable function to deactivate logical decoding.
+ *
+ * Raises an error if any logical replication slot is in use and has not been
+ * invalidated.  Otherwise WAL-logs the change (unless in recovery or
+ * wal_level is below replica), sets the status to DISABLED and returns true.
+ */
+Datum
+pg_deactivate_logical_decoding(PG_FUNCTION_ARGS)
+{
+	int			valid_logical_slots = 0;
+	bool		recoveryInProgress = RecoveryInProgress();
+
+	/*
+	 * Hold ReplicationSlotAllocationLock to prevent slots from newly
+	 * being created while deactivating the logical decoding.
+	 */
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
+
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (!s->in_use)
+			continue;
+
+		if (SlotIsPhysical(s))
+			continue;
+
+		if (s->data.invalidated != RS_INVAL_NONE)
+			continue;
+
+		valid_logical_slots++;
+	}
+
+	if (valid_logical_slots > 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot deactivate logical decoding while having valid logical replication slots")));
+
+	if (XLogStandbyInfoActive() && !recoveryInProgress)
+	{
+		bool		enabled = false;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&enabled), sizeof(bool));
+
+		XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS);
+	}
+
+	SpinLockAcquire(&(XLogLogicalInfoCtl->mutex));
+	XLogLogicalInfoCtl->status = LOGICAL_DECODING_STATUS_DISABLED;
+	SpinLockRelease(&(XLogLogicalInfoCtl->mutex));
+
+	LWLockRelease(ReplicationSlotAllocationLock);
+
+	PG_RETURN_BOOL(true);
+}
+
+/*
+ * SQL-callable function returning the current logical decoding status as
+ * text: "disabled", "xlog-logicalinfo" or "ready".
+ */
+Datum
+pg_get_logical_decoding_status(PG_FUNCTION_ARGS)
+{
+	LogicalDecodingStatus status;
+	const char *status_str = "unknown";
+
+	SpinLockAcquire(&(XLogLogicalInfoCtl->mutex));
+	status = XLogLogicalInfoCtl->status;
+	SpinLockRelease(&(XLogLogicalInfoCtl->mutex));
+
+	switch (status)
+	{
+		case LOGICAL_DECODING_STATUS_DISABLED:
+			status_str = "disabled";
+			break;
+		case LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO:
+			status_str = "xlog-logicalinfo";
+			break;
+		case LOGICAL_DECODING_STATUS_READY:
+			status_str = "ready";
+			break;
+	}
+
+	PG_RETURN_TEXT_P(cstring_to_text(status_str));
+}
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 3d36249d8ad..82cec21d5ed 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -7,6 +7,7 @@ backend_sources += files(
   'launcher.c',
   'logical.c',
   'logicalfuncs.c',
+  'logicalxlog.c',
   'message.c',
   'origin.c',
   'proto.c',
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f4f80b23129..85bf4a7be99 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -1042,10 +1042,10 @@ ValidateSlotSyncParams(int elevel)
 	 * Since altering the wal_level requires a server restart, so error out in
 	 * this case regardless of elevel provided by caller.
 	 */
-	if (wal_level < WAL_LEVEL_LOGICAL)
+	if (!XLogLogicalInfoActive())
 		ereport(ERROR,
 				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\""));
+				errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\" or to activate logical decoding"));
 
 	/*
 	 * A physical replication slot(primary_slot_name) is required on the
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 4a206f95277..5d90a113bf2 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -50,6 +50,7 @@
 #include "replication/slotsync.h"
 #include "replication/slot.h"
 #include "replication/walsender_private.h"
+#include "replication/logicalxlog.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
@@ -2355,12 +2356,12 @@ RestoreSlotFromDisk(const char *name)
 	 * NB: Changing the requirements here also requires adapting
 	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
*/ - if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL) + if (cp.slotdata.database != InvalidOid && !XLogLogicalInfoActive()) ereport(FATAL, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"", NameStr(cp.slotdata.name)), - errhint("Change \"wal_level\" to be \"logical\" or higher."))); + errhint("Change \"wal_level\" to be \"logical\" or higher, or activate logical decoding with \"replica\"."))); else if (wal_level < WAL_LEVEL_REPLICA) ereport(FATAL, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 7783ba854fc..8a2bf9a54db 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -32,6 +32,7 @@ #include "postmaster/bgwriter.h" #include "postmaster/walsummarizer.h" #include "replication/logicallauncher.h" +#include "replication/logicalxlog.h" #include "replication/origin.h" #include "replication/slot.h" #include "replication/slotsync.h" @@ -148,6 +149,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, WaitEventCustomShmemSize()); size = add_size(size, InjectionPointShmemSize()); size = add_size(size, SlotSyncShmemSize()); + size = add_size(size, LogicalXlogShmemSize()); /* include additional requested shmem from preload libraries */ size = add_size(size, total_addin_request); @@ -330,6 +332,7 @@ CreateOrAttachShmemStructs(void) PgArchShmemInit(); ApplyLauncherShmemInit(); SlotSyncShmemInit(); + LogicalXlogShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 87027f27eb7..719ec72cd9a 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -23,6 +23,7 @@ #include "pgstat.h" #include "port/pg_bitutils.h" #include "replication/logicalworker.h" +#include "replication/logicalxlog.h" #include 
"replication/walsender.h" #include "storage/condition_variable.h" #include "storage/ipc.h" diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 25267f0f85d..c02ead5bad7 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -498,7 +498,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, * seems OK, given that this kind of conflict should not normally be * reached, e.g. due to using a physical replication slot. */ - if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel) + if (XLogLogicalInfoActive() && isCatalogRel) InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid, snapshotConflictHorizon); } @@ -1313,13 +1313,13 @@ LogStandbySnapshot(void) * record. Fortunately this routine isn't executed frequently, and it's * only a shared lock. */ - if (wal_level < WAL_LEVEL_LOGICAL) + if (!XLogLogicalInfoActive()) LWLockRelease(ProcArrayLock); recptr = LogCurrentRunningXacts(running); /* Release lock if we kept it longer ... 
*/ - if (wal_level >= WAL_LEVEL_LOGICAL) + if (XLogLogicalInfoActive()) LWLockRelease(ProcArrayLock); /* GetRunningTransactionData() acquired XidGenLock, we must release it */ diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 720ef99ee83..8a8ba6765f7 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -41,6 +41,7 @@ #include "postmaster/autovacuum.h" #include "replication/slotsync.h" #include "replication/syncrep.h" +#include "replication/logicalxlog.h" #include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/lmgr.h" diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 16144c2b72d..45bf8da63db 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -131,6 +131,7 @@ HASH_GROW_BUCKETS_ELECT "Waiting to elect a Parallel Hash participant to allocat HASH_GROW_BUCKETS_REALLOCATE "Waiting for an elected Parallel Hash participant to finish allocating more buckets." HASH_GROW_BUCKETS_REINSERT "Waiting for other Parallel Hash participants to finish inserting tuples into new buckets." LOGICAL_APPLY_SEND_DATA "Waiting for a logical replication leader apply process to send data to a parallel apply process." +LOGICAL_DECODING_ACTIVATION "Waiting for logical decoding to be activated." LOGICAL_PARALLEL_APPLY_STATE_CHANGE "Waiting for a logical replication parallel apply process to change state." LOGICAL_SYNC_DATA "Waiting for a logical replication remote server to send data for initial table synchronization." LOGICAL_SYNC_STATE_CHANGE "Waiting for a logical replication remote server to change state." 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 34ad46c067b..e5f661be85e 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -14,6 +14,7 @@ #include "access/xlogbackup.h" #include "access/xlogdefs.h" #include "datatype/timestamp.h" +#include "replication/logicalxlog.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" @@ -123,7 +124,7 @@ extern PGDLLIMPORT int wal_level; #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA) /* Do we need to WAL-log information required only for logical replication? */ -#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) +#define XLogLogicalInfoActive() (XLogLogicalInfoEnabled()) #ifdef WAL_DEBUG extern PGDLLIMPORT bool XLOG_DEBUG; diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index e80ff8e4140..764e22798dc 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -40,6 +40,7 @@ typedef struct CheckPoint TimeLineID PrevTimeLineID; /* previous TLI, if this record begins a new * timeline (equals ThisTimeLineID otherwise) */ bool fullPageWrites; /* current full_page_writes */ + bool logicalDecoding; /* true if logical decoding is enabled */ int wal_level; /* current wal_level */ FullTransactionId nextXid; /* next free transaction ID */ Oid nextOid; /* next free OID */ @@ -80,6 +81,7 @@ typedef struct CheckPoint /* 0xC0 is used in Postgres 9.5-11 */ #define XLOG_OVERWRITE_CONTRECORD 0xD0 #define XLOG_CHECKPOINT_REDO 0xE0 +#define XLOG_LOGICAL_DECODING_STATUS 0xF0 /* diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 2dcc2d42dac..5d04212ac1f 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11425,6 +11425,22 @@ proname => 'pg_sync_replication_slots', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => '', prosrc => 'pg_sync_replication_slots' }, +{ oid => '8976', + descr => 'enable both logical info 
WAL-logging and logical decoding facility',
+  proname => 'pg_activate_logical_decoding', provolatile => 'v',
+  proparallel => 'u', prorettype => 'void', proargtypes => '',
+  prosrc => 'pg_activate_logical_decoding' },
+{ oid => '8977',
+  descr => 'deactivate logical decoding facility',
+  proname => 'pg_deactivate_logical_decoding', provolatile => 'v',
+  proparallel => 'u', prorettype => 'bool', proargtypes => '',
+  prosrc => 'pg_deactivate_logical_decoding' },
+{ oid => '8978',
+  descr => 'get the current logical decoding state',
+  proname => 'pg_get_logical_decoding_status', provolatile => 'v',
+  proparallel => 'u', prorettype => 'text', proargtypes => '',
+  prosrc => 'pg_get_logical_decoding_status' },
+
 # event triggers
 { oid => '3566', descr => 'list objects dropped by the current command',
diff --git a/src/include/replication/logicalxlog.h b/src/include/replication/logicalxlog.h
new file mode 100644
index 00000000000..7a95b947c36
--- /dev/null
+++ b/src/include/replication/logicalxlog.h
@@ -0,0 +1,35 @@
+/*-------------------------------------------------------------------------
+ * logicalxlog.h
+ *	  Exports from replication/logical/logicalxlog.c.
+ *
+ * Copyright (c) 2012-2024, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALXLOG_H
+#define LOGICALXLOG_H
+
+/*
+ * The status of WAL-logging logical info and logical decoding.
+ *
+ * The order of the values is significant: XLogLogicalInfoEnabled() tests
+ * status >= LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO.
+ */
+typedef enum LogicalDecodingStatus
+{
+	LOGICAL_DECODING_STATUS_DISABLED = 0,	/* neither is enabled */
+	LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO,	/* logical info is WAL-logged */
+	LOGICAL_DECODING_STATUS_READY,	/* logical decoding is active as well */
+} LogicalDecodingStatus;
+
+typedef struct XLogLogicalInfoCtlData XLogLogicalInfoCtlData;
+extern XLogLogicalInfoCtlData *XLogLogicalInfoCtl;
+
+extern Size LogicalXlogShmemSize(void);
+extern void LogicalXlogShmemInit(void);
+extern void StartupLogicalDecodingStatus(bool enabled_at_last_checkpoint);
+extern bool XLogLogicalInfoEnabled(void);
+extern bool IsLogicalDecodingActive(void);
+extern void UpdateLogicalDecodingStatus(bool activate);
+
+#endif							/* LOGICALXLOG_H */
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index d658fdf417c..3bc06afcd8c 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -801,7 +801,7 @@ $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # We are not able to read from the slot as it requires wal_level >= logical on the primary server
 check_pg_recvlogical_stderr($handle,
-	"logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"
+	"logical decoding on standby requires to enable logical decoding on the primary"
 );
 
 # Restore primary wal_level
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index d377e7ae2b6..4a3fec73914 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -573,7 +573,7 @@ CREATE PUBLICATION tap_pub2 FOR TABLE skip_wal;
 ROLLBACK;
 });
 ok( $reterr =~
-	  m/WARNING:  "wal_level" is insufficient to publish logical changes/,
+	  m/WARNING:  logical decoding needs to be enabled to publish logical changes/,
 	'CREATE PUBLICATION while "wal_level=minimal"');
 
 done_testing();