From 8fdc2932080a7ad7151d72639a7983f980876ee1 Mon Sep 17 00:00:00 2001 From: Alexander Korotkov Date: Thu, 19 Sep 2024 15:34:18 +0300 Subject: [PATCH v1 2/2] Attempt to implement pg_wal_replay_wait_status() --- src/backend/access/transam/xlogfuncs.c | 90 ++++++++++++++++++++---- src/backend/catalog/system_functions.sql | 5 ++ src/include/catalog/pg_proc.dat | 6 ++ 3 files changed, 87 insertions(+), 14 deletions(-) diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index 74b493e437e..ecc47c045de 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -752,20 +752,11 @@ pg_promote(PG_FUNCTION_ARGS) } /* - * Waits until recovery replays the target LSN with optional timeout. + * Prepare for waiting for LSN replay. */ -Datum -pg_wal_replay_wait(PG_FUNCTION_ARGS) +static void +pg_wal_replay_wait_prepare(const char *funcname) { - XLogRecPtr target_lsn = PG_GETARG_LSN(0); - int64 timeout = PG_GETARG_INT64(1); - WaitLSNResult result; - - if (timeout < 0) - ereport(ERROR, - (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), - errmsg("\"timeout\" must not be negative"))); - /* * We are going to wait for the LSN replay. We should first care that we * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. @@ -791,14 +782,33 @@ pg_wal_replay_wait(PG_FUNCTION_ARGS) if (GetOldestSnapshot()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("pg_wal_replay_wait() must be only called without an active or registered snapshot"), - errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction with an isolation level higher than READ COMMITTED, another procedure, or a function."))); + errmsg("%s must be only called without an active or registered snapshot", funcname), + errdetail("Make sure %s isn't called within a transaction with an isolation level higher than READ COMMITTED, another procedure, or a function.", funcname))); /* * As the result we should hold no snapshot, and correspondingly our xmin * should be unset. */ Assert(MyProc->xmin == InvalidTransactionId); +} + +/* + * Waits until recovery replays the target LSN with optional timeout. Throw + * an error on failure. + */ +Datum +pg_wal_replay_wait(PG_FUNCTION_ARGS) +{ + XLogRecPtr target_lsn = PG_GETARG_LSN(0); + int64 timeout = PG_GETARG_INT64(1); + WaitLSNResult result; + + if (timeout < 0) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("\"timeout\" must not be negative"))); + + pg_wal_replay_wait_prepare("pg_wal_replay_wait()"); result = WaitForLSNReplay(target_lsn, timeout); @@ -839,3 +849,55 @@ pg_wal_replay_wait(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +/* + * Waits until recovery replays the target LSN with optional timeout. Return + * the waiting result as a text. + */ +Datum +pg_wal_replay_wait_status(PG_FUNCTION_ARGS) +{ + XLogRecPtr target_lsn = PG_GETARG_LSN(0); + int64 timeout = PG_GETARG_INT64(1); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + WaitLSNResult result; + const char *result_string; + Datum values[1]; + bool nulls[1]; + + if (timeout < 0) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("\"timeout\" must not be negative"))); + + pg_wal_replay_wait_prepare("pg_wal_replay_wait_status()"); + + result = WaitForLSNReplay(target_lsn, timeout); + + /* + * Process the result of WaitForLSNReplay(). Throw appropriate error if + * needed. + */ + switch (result) + { + case WaitLSNResultSuccess: + result_string = "success"; + break; + + case WaitLSNResultTimeout: + result_string = "timeout"; + break; + + case WaitLSNResultNotInRecovery: + result_string = "not in recovery"; + break; + + case WaitLSNResultPromotedConcurrently: + result_string = "promoted concurrently"; + break; + } + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, + values, nulls); + return (Datum) 0; +} diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index b0d0de051e7..ed092b748ef 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -417,6 +417,11 @@ CREATE OR REPLACE FUNCTION CREATE OR REPLACE PROCEDURE pg_wal_replay_wait(target_lsn pg_lsn, timeout int8 DEFAULT 0) LANGUAGE internal AS 'pg_wal_replay_wait'; +CREATE OR REPLACE PROCEDURE pg_wal_replay_wait_status(OUT status text, + target_lsn pg_lsn, + timeout int8 DEFAULT 0) + LANGUAGE internal AS 'pg_wal_replay_wait_status'; + CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes( IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}', OUT lsn pg_lsn, OUT xid xid, OUT data text) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 43f608d7a0a..22e58d26db6 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6647,6 +6647,12 @@ proname => 'pg_wal_replay_wait', prokind => 'p', prorettype => 'void', proargtypes => 'pg_lsn int8', proargnames => '{target_lsn,timeout}', prosrc => 'pg_wal_replay_wait' }, +{ oid => '226', + descr => 'wait for the target LSN to be replayed on standby with an optional timeout and returning the waiting result', + proname => 'pg_wal_replay_wait_status', prokind => 'p', prorettype => 'void', + proargtypes => 'pg_lsn int8 text', proargmodes => '{i,i,o}', + proargnames => '{target_lsn,timeout,status}', + prosrc => 'pg_wal_replay_wait_status' }, { oid => '6224', descr => 'get resource managers loaded in system', proname => 'pg_get_wal_resource_managers', prorows => '50', proretset => 't', -- 2.39.5 (Apple Git-154)