From ed962c55faf3b6ef28af6672591f460ad2289273 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Thu, 28 May 2026 15:42:20 -0500 Subject: [PATCH v2 1/3] tell client when prep stmts are deallocated --- src/backend/commands/prepare.c | 30 +++++++++++++ src/include/libpq/pqcomm.h | 2 +- src/include/libpq/protocol.h | 1 + src/interfaces/libpq/exports.txt | 1 + src/interfaces/libpq/fe-connect.c | 30 +++++++++++++ src/interfaces/libpq/fe-protocol3.c | 43 +++++++++++++++++++ src/interfaces/libpq/fe-trace.c | 10 +++++ src/interfaces/libpq/libpq-fe.h | 6 +++ src/interfaces/libpq/libpq-int.h | 2 + .../modules/libpq_pipeline/libpq_pipeline.c | 12 +++--- .../libpq_pipeline/traces/prepared.trace | 1 + 11 files changed, 131 insertions(+), 7 deletions(-) diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 876aad2100a..4ca85b10f9e 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -26,6 +26,9 @@ #include "commands/explain_state.h" #include "commands/prepare.h" #include "funcapi.h" +#include "libpq/libpq-be.h" +#include "libpq/pqformat.h" +#include "miscadmin.h" #include "nodes/nodeFuncs.h" #include "parser/parse_coerce.h" #include "parser/parse_collate.h" @@ -512,6 +515,27 @@ DeallocateQuery(DeallocateStmt *stmt) DropAllPreparedStatements(); } +/* + * Tell the client that a prepared statement has been deallocated. Pass an + * empty string to indicate that all statements were deallocated. + * + * No-op unless output goes to a remote client using protocol 3.3 or later.
+ */ +static void +SendStmtDeallocMsg(const char *name) +{ + StringInfoData buf; + + if (whereToSendOutput != DestRemote) + return; + if (!MyProcPort || MyProcPort->proto < PG_PROTOCOL(3, 3)) + return; + + pq_beginmessage(&buf, PqMsg_PrepStmtDeallocated); + pq_sendstring(&buf, name); + pq_endmessage(&buf); +} + /* * Internal version of DEALLOCATE * @@ -530,6 +554,9 @@ DropPreparedStatement(const char *stmt_name, bool showError) /* Release the plancache entry */ DropCachedPlan(entry->plansource); + /* Alert the client before the entry (and its name) is removed */ + SendStmtDeallocMsg(entry->stmt_name); + /* Now we can remove the hash table entry */ hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL); } @@ -548,6 +575,9 @@ DropAllPreparedStatements(void) if (!prepared_queries) return; + /* Alert the client; an empty name means all statements */ + SendStmtDeallocMsg(""); + /* walk over cache */ hash_seq_init(&seq, prepared_queries); while ((entry = hash_seq_search(&seq)) != NULL) diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index a29c9c94d79..28e7944cdf4 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -92,7 +92,7 @@ is_unixsock_path(const char *path) * The earliest and latest frontend/backend protocol version supported.
*/ #define PG_PROTOCOL_EARLIEST PG_PROTOCOL(3,0) -#define PG_PROTOCOL_LATEST PG_PROTOCOL(3,2) +#define PG_PROTOCOL_LATEST PG_PROTOCOL(3,3) /* * Reserved protocol numbers, which have special semantics: diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h index eae8f0e7238..7ea331f7210 100644 --- a/src/include/libpq/protocol.h +++ b/src/include/libpq/protocol.h @@ -53,6 +53,7 @@ #define PqMsg_FunctionCallResponse 'V' #define PqMsg_CopyBothResponse 'W' #define PqMsg_ReadyForQuery 'Z' +#define PqMsg_PrepStmtDeallocated 'i' #define PqMsg_NoData 'n' #define PqMsg_PortalSuspended 's' #define PqMsg_ParameterDescription 't' diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 1e3d5bd5867..effd73ca3e6 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -211,3 +211,4 @@ PQdefaultAuthDataHook 208 PQfullProtocolVersion 209 appendPQExpBufferVA 210 PQgetThreadLock 211 +PQaddPrepStmtDeallocCallback 212 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 4272d386e64..5e41c21c6f6 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -8374,6 +8374,11 @@ pqParseProtocolVersion(const char *value, ProtocolVersion *result, PGconn *conn, *result = PG_PROTOCOL(3, 2); return true; } + if (strcmp(value, "3.3") == 0) + { + *result = PG_PROTOCOL(3, 3); + return true; + } libpq_append_conn_error(conn, "invalid %s value: \"%s\"", context, value); @@ -8426,3 +8431,28 @@ PQgetThreadLock(void) Assert(pg_g_threadlock); return pg_g_threadlock; } + +/* + * Registers cb to be invoked, in registration order, each time the server + * sends a PqMsg_PrepStmtDeallocated message. Returns false if conn or cb is + * NULL, or if every callback slot is already in use.
+ */ +bool +PQaddPrepStmtDeallocCallback(PGconn *conn, PQprepStmtDeallocCallback cb) +{ + if (!conn || !cb) + return false; + + /* Take the first free (NULL) slot to preserve registration order */ + for (int i = 0; i < lengthof(conn->prepStmtDeallocCallbacks); i++) + { + if (conn->prepStmtDeallocCallbacks[i]) + continue; + + conn->prepStmtDeallocCallbacks[i] = cb; + return true; + } + + libpq_append_conn_error(conn, "maximum number of prepared statement deallocation callbacks already registered"); + return false; +} diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 840e018cd18..0407d10362d 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -61,6 +61,32 @@ static size_t build_startup_packet(const PGconn *conn, char *packet, const PQEnvironmentOption *options); +/* + * Attempt to read a PrepStmtDeallocated message and invoke the connection's + * registered callbacks. This is possible in several places, so we break it + * out as a subroutine. Callbacks receive a pointer into conn->workBuffer. + * + * Entry: 'i' message type and length have already been consumed. + * Exit: returns 0 if successfully consumed message and invoked callbacks, or + * EOF if not enough data. + */ +static int +getPrepStmtDeallocated(PGconn *conn) +{ + if (pqGets(&conn->workBuffer, conn)) + return EOF; + + for (int i = 0; i < lengthof(conn->prepStmtDeallocCallbacks); i++) + { + if (!conn->prepStmtDeallocCallbacks[i]) + break; + + (conn->prepStmtDeallocCallbacks[i]) (conn, conn->workBuffer.data); + } + + return 0; +} + /* * parseInput: if appropriate, parse input data from backend * until input is exhausted or a stopping state is reached.
@@ -184,6 +210,11 @@ pqParseInput3(PGconn *conn) if (getParameterStatus(conn)) return; } + else if (id == PqMsg_PrepStmtDeallocated) + { + if (getPrepStmtDeallocated(conn)) + return; + } else { /* Any other case is unexpected and we summarily skip it */ @@ -305,6 +336,10 @@ pqParseInput3(PGconn *conn) if (getParameterStatus(conn)) return; break; + case PqMsg_PrepStmtDeallocated: + if (getPrepStmtDeallocated(conn)) + return; + break; case PqMsg_BackendKeyData: /* @@ -1905,6 +1940,10 @@ getCopyDataMessage(PGconn *conn) if (getParameterStatus(conn)) return 0; break; + case PqMsg_PrepStmtDeallocated: + if (getPrepStmtDeallocated(conn)) + return 0; + break; case PqMsg_CopyData: return msgLength; case PqMsg_CopyDone: @@ -2409,6 +2448,10 @@ pqFunctionCall3(PGconn *conn, Oid fnid, if (getParameterStatus(conn)) continue; break; + case PqMsg_PrepStmtDeallocated: + if (getPrepStmtDeallocated(conn)) + continue; + break; default: /* The backend violates the protocol. */ libpq_append_conn_error(conn, "protocol error: id=0x%x", id); diff --git a/src/interfaces/libpq/fe-trace.c b/src/interfaces/libpq/fe-trace.c index c348b08c39b..e9f734187a2 100644 --- a/src/interfaces/libpq/fe-trace.c +++ b/src/interfaces/libpq/fe-trace.c @@ -543,6 +543,13 @@ pqTraceOutput_ParameterStatus(FILE *f, const char *message, int *cursor) pqTraceOutputString(f, message, cursor, false); } +static void +pqTraceOutput_PrepStmtDeallocated(FILE *f, const char *message, int *cursor) +{ + fprintf(f, "PrepStmtDeallocated\t"); + pqTraceOutputString(f, message, cursor, false); +} + static void pqTraceOutput_ParameterDescription(FILE *f, const char *message, int *cursor, bool regress) { @@ -793,6 +800,9 @@ pqTraceOutputMessage(PGconn *conn, const char *message, bool toServer) else pqTraceOutput_ParameterStatus(conn->Pfdebug, message, &logCursor); break; + case PqMsg_PrepStmtDeallocated: + pqTraceOutput_PrepStmtDeallocated(conn->Pfdebug, message, &logCursor); + break; case PqMsg_ParameterDescription:
pqTraceOutput_ParameterDescription(conn->Pfdebug, message, &logCursor, regress); break; diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 8ecb9b4a4c7..c57bb8806cf 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -486,6 +486,12 @@ typedef void (*pgthreadlock_t) (int acquire); extern pgthreadlock_t PQregisterThreadLock(pgthreadlock_t newhandler); extern pgthreadlock_t PQgetThreadLock(void); +/* prep stmt deallocation callback; name is "" when all were deallocated */ +typedef void (*PQprepStmtDeallocCallback) (PGconn *conn, const char *name); + +extern bool PQaddPrepStmtDeallocCallback(PGconn *conn, + PQprepStmtDeallocCallback cb); + /* === in fe-trace.c === */ extern void PQtrace(PGconn *conn, FILE *debug_port); extern void PQuntrace(PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 461b39620c3..7eca941ddcc 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -532,6 +532,8 @@ struct pg_conn void (*cleanup_async_auth) (PGconn *conn); pgsocket altsock; /* alternative socket for client to poll */ + /* Prep stmt dealloc callbacks in registration order; first NULL ends list */ + PQprepStmtDeallocCallback prepStmtDeallocCallbacks[16]; /* Transient state needed while establishing connection */ PGTargetServerType target_server_type; /* desired session properties */ diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index ee3e2ec7570..b61f33e7cd9 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -1363,7 +1363,7 @@ test_protocol_version(PGconn *conn) Assert(max_protocol_version_index >= 0); /* - * Test default protocol_version (GREASE - should negotiate down to 3.2) + * Test default protocol_version (GREASE - should negotiate down to 3.3) */ vals[max_protocol_version_index] = ""; conn =
PQconnectdbParams(keywords, vals, false); @@ -1373,8 +1373,8 @@ test_protocol_version(PGconn *conn) PQerrorMessage(conn)); protocol_version = PQfullProtocolVersion(conn); - if (protocol_version != 30002) - pg_fatal("expected 30002, got %d", protocol_version); + if (protocol_version != 30003) + pg_fatal("expected 30003, got %d", protocol_version); PQfinish(conn); @@ -1423,7 +1423,7 @@ test_protocol_version(PGconn *conn) PQfinish(conn); /* - * Test max_protocol_version=latest. 'latest' currently means '3.2'. + * Test max_protocol_version=latest. 'latest' currently means '3.3'. */ vals[max_protocol_version_index] = "latest"; conn = PQconnectdbParams(keywords, vals, false); @@ -1433,8 +1433,8 @@ test_protocol_version(PGconn *conn) PQerrorMessage(conn)); protocol_version = PQfullProtocolVersion(conn); - if (protocol_version != 30002) - pg_fatal("expected 30002, got %d", protocol_version); + if (protocol_version != 30003) + pg_fatal("expected 30003, got %d", protocol_version); PQfinish(conn); diff --git a/src/test/modules/libpq_pipeline/traces/prepared.trace b/src/test/modules/libpq_pipeline/traces/prepared.trace index aeb5de109e0..5d36fb0056d 100644 --- a/src/test/modules/libpq_pipeline/traces/prepared.trace +++ b/src/test/modules/libpq_pipeline/traces/prepared.trace @@ -7,6 +7,7 @@ B 113 RowDescription 4 "?column?" NNNN 0 NNNN 4 -1 0 "?column?" NNNN 0 NNNN 655 B 5 ReadyForQuery I F 16 Close S "select_one" F 4 Sync +B 15 PrepStmtDeallocated "select_one" B 4 CloseComplete B 5 ReadyForQuery I F 16 Describe S "select_one" -- 2.50.1 (Apple Git-155)