diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 7de03c79f6f..22ee8fdc0b8 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -466,3 +466,80 @@ SELECT pg_drop_replication_slot('physical_slot'); (1 row) +-- +-- Test that temporary slots are properly dropped on error, even when +-- the error is caught by a PL/pgSQL EXCEPTION handler (which doesn't +-- terminate the session). +-- Test 1: create_logical_replication_slot with invalid plugin (temporary=true) +DO $$ +BEGIN + PERFORM pg_create_logical_replication_slot('regression_slot_error', 'nonexistent_plugin_xyz', true); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'caught: %', SQLERRM; +END; +$$; +NOTICE: caught: could not access file "nonexistent_plugin_xyz": No such file or directory +SELECT count(*) = 0 AS slot_was_dropped FROM pg_replication_slots + WHERE slot_name = 'regression_slot_error'; + slot_was_dropped +------------------ + t +(1 row) + +-- Test 2: session remains usable after the error (MyReplicationSlot cleared) +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t3', 'test_decoding', true); + ?column? +---------- + init +(1 row) + +SELECT count(*) = 1 AS slot_exists FROM pg_replication_slots + WHERE slot_name = 'regression_slot_t3'; + slot_exists +------------- + t +(1 row) + +-- Test 3: pg_replication_slot_advance failing after it acquired the slot +-- (moving backwards); print SQLSTATE since the message has unstable LSNs.
+DO $$ +BEGIN + PERFORM pg_replication_slot_advance('regression_slot_t3', '0/1'); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'caught: %', SQLSTATE; +END; +$$; +NOTICE: caught: 55000 +-- The slot was released, so it can be acquired and advanced again +SELECT slot_name FROM pg_replication_slot_advance('regression_slot_t3', pg_current_wal_lsn()); + slot_name +-------------------- + regression_slot_t3 +(1 row) + +-- Test 4: pg_copy_logical_replication_slot onto an already-existing name. +-- The copy fails while creating the destination slot; the session must +-- stay usable and the source slot must be left intact. +DO $$ +BEGIN + PERFORM pg_copy_logical_replication_slot('regression_slot_t3', 'regression_slot_t3'); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'caught: %', SQLERRM; +END; +$$; +NOTICE: caught: replication slot "regression_slot_t3" already exists +-- The original slot must still exist and be usable +SELECT count(*) = 1 AS orig_slot_ok FROM pg_replication_slots + WHERE slot_name = 'regression_slot_t3'; + orig_slot_ok +-------------- + t +(1 row) + +-- Cleanup (slot is temporary, will drop on disconnect anyway, but be explicit) +SELECT pg_drop_replication_slot('regression_slot_t3'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 580e3ae3bef..29abf302413 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -190,3 +190,52 @@ SELECT pg_drop_replication_slot('failover_true_slot'); SELECT pg_drop_replication_slot('failover_false_slot'); SELECT pg_drop_replication_slot('failover_default_slot'); SELECT pg_drop_replication_slot('physical_slot'); + +-- +-- Test that temporary slots are properly dropped on error, even when +-- the error is caught by a PL/pgSQL EXCEPTION handler (which doesn't +-- terminate the session).
+-- Test 1: create_logical_replication_slot with invalid plugin (temporary=true) +DO $$ +BEGIN + PERFORM pg_create_logical_replication_slot('regression_slot_error', 'nonexistent_plugin_xyz', true); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'caught: %', SQLERRM; +END; +$$; +SELECT count(*) = 0 AS slot_was_dropped FROM pg_replication_slots + WHERE slot_name = 'regression_slot_error'; + +-- Test 2: session remains usable after the error (MyReplicationSlot cleared) +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t3', 'test_decoding', true); +SELECT count(*) = 1 AS slot_exists FROM pg_replication_slots + WHERE slot_name = 'regression_slot_t3'; + +-- Test 3: pg_replication_slot_advance failing after it acquired the slot +-- (moving backwards); print SQLSTATE since the message has unstable LSNs. +DO $$ +BEGIN + PERFORM pg_replication_slot_advance('regression_slot_t3', '0/1'); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'caught: %', SQLSTATE; +END; +$$; +-- The slot was released, so it can be acquired and advanced again +SELECT slot_name FROM pg_replication_slot_advance('regression_slot_t3', pg_current_wal_lsn()); + +-- Test 4: pg_copy_logical_replication_slot onto an already-existing name. +-- The copy fails while creating the destination slot; the session must +-- stay usable and the source slot must be left intact.
+DO $$ +BEGIN + PERFORM pg_copy_logical_replication_slot('regression_slot_t3', 'regression_slot_t3'); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'caught: %', SQLERRM; +END; +$$; +-- The original slot must still exist and be usable +SELECT count(*) = 1 AS orig_slot_ok FROM pg_replication_slots + WHERE slot_name = 'regression_slot_t3'; + +-- Cleanup (slot is temporary, will drop on disconnect anyway, but be explicit) +SELECT pg_drop_replication_slot('regression_slot_t3'); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 71fbaf72269..aa56e90bfab 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -197,10 +197,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(NULL); - ReplicationSlotAcquire(NameStr(*name), true, true); - PG_TRY(); { + ReplicationSlotAcquire(NameStr(*name), true, true); + /* restart at slot's confirmed_flush */ ctx = CreateDecodingContext(InvalidXLogRecPtr, options, @@ -320,6 +320,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* clear all timetravel entries */ InvalidateSystemCaches(); + if (MyReplicationSlot != NULL) /* acquisition may have failed */ + ReplicationSlotRelease(); + PG_RE_THROW(); } PG_END_TRY(); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 16fbd383735..acc643ac749 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -148,38 +148,55 @@ create_logical_replication_slot(char *name, char *plugin, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, false, failover, false); - /* - * Ensure the logical decoding is enabled before initializing the logical - * decoding context.
- */ - EnsureLogicalDecodingEnabled(); - Assert(IsLogicalDecodingEnabled()); + PG_TRY(); + { + /* + * Ensure the logical decoding is enabled before initializing the logical + * decoding context. + */ + EnsureLogicalDecodingEnabled(); + Assert(IsLogicalDecodingEnabled()); - /* - * Create logical decoding context to find start point or, if we don't - * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. - * - * Note: when !find_startpoint this is still important, because it's at - * this point that the output plugin is validated. - */ - ctx = CreateInitDecodingContext(plugin, NIL, - false, /* just catalogs is OK */ - false, /* not repack */ - restart_lsn, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), - NULL, NULL, NULL); + /* + * Create logical decoding context to find start point or, if we don't + * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. + * + * Note: when !find_startpoint this is still important, because it's at + * this point that the output plugin is validated. + */ + ctx = CreateInitDecodingContext(plugin, NIL, + false, /* just catalogs is OK */ + false, /* not repack */ + restart_lsn, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); - /* - * If caller needs us to determine the decoding start point, do so now. - * This might take a while. - */ - if (find_startpoint) - DecodingContextFindStartpoint(ctx); + /* + * If caller needs us to determine the decoding start point, do so now. + * This might take a while. + */ + if (find_startpoint) + DecodingContextFindStartpoint(ctx); - /* don't need the decoding context anymore */ - FreeDecodingContext(ctx); + /* don't need the decoding context anymore */ + FreeDecodingContext(ctx); + } + PG_CATCH(); + { + /* + * Drop the slot on error.
ReplicationSlotRelease() only drops + * RS_EPHEMERAL slots, so RS_TEMPORARY slots must be dropped here too, + * or they would outlive an error caught by a PL/pgSQL EXCEPTION handler. + * NOTE(review): this runs inside PG_CATCH, before (sub)transaction abort + * cleanup; confirm the drop cannot itself fail or block at this point. + */ + if (MyReplicationSlot != NULL) + ReplicationSlotDropAcquired(); + PG_RE_THROW(); + } + PG_END_TRY(); } /* @@ -566,49 +583,58 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) else moveto = Min(moveto, GetXLogReplayRecPtr(NULL)); - /* Acquire the slot so we "own" it */ - ReplicationSlotAcquire(NameStr(*slotname), true, true); - - /* A slot whose restart_lsn has never been reserved cannot be advanced */ - if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("replication slot \"%s\" cannot be advanced", - NameStr(*slotname)), - errdetail("This slot has never previously reserved WAL, or it has been invalidated."))); + PG_TRY(); + { + /* Acquire the slot so we "own" it */ + ReplicationSlotAcquire(NameStr(*slotname), true, true); + /* A slot whose restart_lsn has never been reserved cannot be advanced */ + if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot \"%s\" cannot be advanced", + NameStr(*slotname)), + errdetail("This slot has never previously reserved WAL, or it has been invalidated."))); - /* - * Check if the slot is not moving backwards. Physical slots rely simply - * on restart_lsn as a minimum point, while logical slots have confirmed - * consumption up to confirmed_flush, meaning that in both cases data - * older than that is not available anymore. - */ - if (OidIsValid(MyReplicationSlot->data.database)) - minlsn = MyReplicationSlot->data.confirmed_flush; - else - minlsn = MyReplicationSlot->data.restart_lsn; + /* + * Check if the slot is not moving backwards.
Physical slots rely simply + * on restart_lsn as a minimum point, while logical slots have confirmed + * consumption up to confirmed_flush, meaning that in both cases data + * older than that is not available anymore. + */ + if (OidIsValid(MyReplicationSlot->data.database)) + minlsn = MyReplicationSlot->data.confirmed_flush; + else + minlsn = MyReplicationSlot->data.restart_lsn; - if (moveto < minlsn) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X", - LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn)))); + if (moveto < minlsn) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X", + LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn)))); - /* Do the actual slot update, depending on the slot type */ - if (OidIsValid(MyReplicationSlot->data.database)) - endlsn = pg_logical_replication_slot_advance(moveto); - else - endlsn = pg_physical_replication_slot_advance(moveto); + /* Do the actual slot update, depending on the slot type */ + if (OidIsValid(MyReplicationSlot->data.database)) + endlsn = pg_logical_replication_slot_advance(moveto); + else + endlsn = pg_physical_replication_slot_advance(moveto); - values[0] = NameGetDatum(&MyReplicationSlot->data.name); - nulls[0] = false; + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + nulls[0] = false; - /* - * Recompute the minimum LSN and xmin across all slots to adjust with the - * advancing potentially done. - */ - ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + /* + * Recompute the minimum LSN and xmin across all slots to adjust with the + * advancing potentially done.
+ */ + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + } + PG_CATCH(); + { + if (MyReplicationSlot != NULL) /* acquisition may have failed */ + ReplicationSlotRelease(); + PG_RE_THROW(); + } + PG_END_TRY(); ReplicationSlotRelease(); @@ -763,7 +789,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) /* * Update the destination slot to current values of the source slot; * recheck that the source slot is still the one we saw previously. + * + * Use PG_TRY to ensure we drop the destination slot if any validation + * error occurs. Without this, an error caught by a PL/pgSQL EXCEPTION + * handler would leave the new destination slot acquired (MyReplicationSlot + * set) for the rest of the session. */ + PG_TRY(); { TransactionId copy_effective_xmin; TransactionId copy_effective_catalog_xmin; @@ -797,9 +829,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) * or the restart_lsn either is invalid or has gone backward. (The * restart_lsn could go backwards if the source slot is dropped and * copied from an older slot during installation.) - * - * Since erroring out will release and drop the destination slot we - * don't need to release it here. */ if (copy_restart_lsn < src_restart_lsn || src_islogical != copy_islogical || @@ -857,6 +886,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) } #endif } + PG_CATCH(); + { + /* + * Drop the newly-created destination slot on error. Same as in + * create_logical_replication_slot(): use ReplicationSlotDropAcquired() + * to handle both RS_EPHEMERAL and RS_TEMPORARY slots. + */ + if (MyReplicationSlot != NULL) + ReplicationSlotDropAcquired(); + PG_RE_THROW(); + } + PG_END_TRY(); /* target slot fully created, mark as persistent if needed */ if (logical_slot && !temporary)