From f39597adbbdae8498b281a7cd393a9db5b1b1938 Mon Sep 17 00:00:00 2001 From: John Morris Date: Tue, 31 Oct 2023 09:55:15 -0700 Subject: [PATCH] Updated patch to manage Postgres memory allocations. - Tracks memory allocations for DSM and Postgres allocators - Reserves total memory for server and limits how much memory a server can allocate - Provides views to show current memory utilization for individual processes and the server as a whole --- doc/src/sgml/config.sgml | 26 + doc/src/sgml/monitoring.sgml | 295 ++++++++++ src/backend/catalog/system_views.sql | 9 + src/backend/port/atomics.c | 22 + src/backend/postmaster/fork_process.c | 4 + src/backend/postmaster/postmaster.c | 6 + src/backend/storage/ipc/dsm.c | 74 ++- src/backend/storage/ipc/dsm_impl.c | 41 +- src/backend/storage/ipc/shmem.c | 10 + src/backend/utils/activity/Makefile | 2 + src/backend/utils/activity/backend_status.c | 20 +- src/backend/utils/activity/memtrack.c | 189 ++++++ src/backend/utils/activity/meson.build | 2 + src/backend/utils/activity/pgstat.c | 14 +- src/backend/utils/activity/pgstat_memtrack.c | 380 ++++++++++++ src/backend/utils/activity/pgstat_shmem.c | 7 + src/backend/utils/misc/guc_tables.c | 12 + src/backend/utils/misc/postgresql.conf.sample | 3 + src/backend/utils/mmgr/aset.c | 62 +- src/backend/utils/mmgr/generation.c | 22 +- src/backend/utils/mmgr/slab.c | 33 +- src/include/catalog/pg_proc.dat | 26 + src/include/pgstat.h | 18 +- src/include/port/atomics.h | 19 + src/include/port/atomics/fallback.h | 4 + src/include/port/atomics/generic.h | 27 + src/include/storage/pg_shmem.h | 1 + src/include/utils/backend_status.h | 32 ++ src/include/utils/memtrack.h | 267 +++++++++ src/include/utils/memutils_internal.h | 1 + src/include/utils/pgstat_internal.h | 17 +- src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + src/test/modules/test_memtrack/.gitignore | 4 + src/test/modules/test_memtrack/Makefile | 25 + src/test/modules/test_memtrack/README | 36 ++ 
.../test_memtrack/expected/test_memtrack.out | 169 ++++++ src/test/modules/test_memtrack/meson.build | 38 ++ .../test_memtrack/sql/test_memtrack.sql | 91 +++ .../test_memtrack/test_memtrack--1.0.sql | 20 + .../modules/test_memtrack/test_memtrack.c | 541 ++++++++++++++++++ .../modules/test_memtrack/test_memtrack.conf | 1 + .../test_memtrack/test_memtrack.control | 4 + src/test/modules/test_memtrack/worker_pool.c | 433 ++++++++++++++ src/test/modules/test_memtrack/worker_pool.h | 98 ++++ src/test/regress/expected/opr_sanity.out | 8 +- src/test/regress/expected/rules.out | 20 + src/test/regress/regress.c | 15 + 48 files changed, 3064 insertions(+), 86 deletions(-) create mode 100644 src/backend/utils/activity/memtrack.c create mode 100644 src/backend/utils/activity/pgstat_memtrack.c create mode 100644 src/include/utils/memtrack.h create mode 100644 src/test/modules/test_memtrack/.gitignore create mode 100644 src/test/modules/test_memtrack/Makefile create mode 100644 src/test/modules/test_memtrack/README create mode 100644 src/test/modules/test_memtrack/expected/test_memtrack.out create mode 100644 src/test/modules/test_memtrack/meson.build create mode 100644 src/test/modules/test_memtrack/sql/test_memtrack.sql create mode 100644 src/test/modules/test_memtrack/test_memtrack--1.0.sql create mode 100644 src/test/modules/test_memtrack/test_memtrack.c create mode 100644 src/test/modules/test_memtrack/test_memtrack.conf create mode 100644 src/test/modules/test_memtrack/test_memtrack.control create mode 100644 src/test/modules/test_memtrack/worker_pool.c create mode 100644 src/test/modules/test_memtrack/worker_pool.h diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index bd70ff2e4b..49a6e1e80c 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2151,6 +2151,32 @@ include_dir 'conf.d' + + max_total_memory (integer) + + max_total_memory configuration parameter + + + + + Specifies a limit to the total amount of memory a database server + is 
allowed to allocate. + If this value is specified without units, it is taken as megabytes. + If unset or set to 0, the limit is disabled. If set, max_total_memory must be + larger than , and it should include at least an additional + 2MB per connection allowed by . + + + A memory request that would exhaust the limit will + be denied with an out of memory error, causing that process's current + query/transaction to fail and all of the memory allocated for that action to be released. + Total memory currently reserved (total_memory_reserved) is displayed in the + + pg_stat_global_memory_tracking view. + + + + diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index e068f7e247..e171acccf6 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -4563,6 +4563,301 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + <structname>pg_stat_memory_reservation</structname> + + + pg_stat_memory_reservation + + + + The pg_stat_memory_reservation view will have one + row per server process, including the postmaster, showing how much memory is currently + reserved by that process. Note that reservations are taken in chunks and therefore + will always be a bit more than the process has actually requested. + Dynamic shared memory (dsm) is attributed to the backend which created the + shared memory segment, not to backends which only attach to it. + When a backend exits, dsm allocations that have + not been freed are considered long lived and will continue to be part of + dsm_memory_allocated, found in the + + pg_stat_global_memory_tracking view. + Use pg_size_pretty described in + to make these values more easily + readable. + + + + <structname>pg_stat_memory_reservation</structname> View + + + + + Column Type + + + Description + + + + + + + + pid integer + + + Process ID of this backend + + + + + + total_reserved bigint + + + Total memory currently reserved to this backend. 
+ + + + + aset_reserved bigint + + + Subtotal of memory reserved via the allocation set allocator. + + + + + + dsm_reserved bigint + + + Subtotal representing dynamic shared memory (dsm) reserved by this backend. + When a backend exits, dsm allocations that have + not been freed are considered long lived and will continue to be part of + dsm_memory_allocated, found in the + + pg_stat_global_memory_tracking view. + + + + + + generation_reserved bigint + + + Subtotal of memory reserved via the generation allocator. + + + + + + slab_reserved bigint + + + Subtotal of memory reserved via the slab allocator. + + + + + + init_reserved bigint + + + Subtotal of memory which was automatically reserved when a backend starts up. + + + + +
+ +
+ + + <structname>pg_backend_memory_allocation</structname> + + + pg_backend_memory_allocation + + + + The pg_backend_memory_allocation view has + a single row showing memory allocated by the current backend process. + The row contains similar fields to pg_stat_memory_reservation, + except these numbers are the actual amounts allocated and therefore will always be less + than or equal to the amount reserved. + + + + <structname>pg_backend_memory_allocation</structname> View + + + + + Column Type + + + Description + + + + + + + + pid integer + + + Process ID of this backend + + + + + + total_top_context_allocated bigint + + + Total private memory currently allocated by this backend. + Note this total includes the TopMemoryContext, all its children, + along with any free lists maintained by the memory allocators. + This value should match the sum of memory allocated + for the allocation set, generation and slab allocators. + + + + + + aset_allocated bigint + + + Subtotal of memory allocated via the allocation set allocator. + + + + + + dsm_allocated bigint + + + Subtotal representing dynamic shared memory (dsm) allocated by this backend. + When a backend exits, dsm allocations that have + not been freed are considered long lived and will continue to be part of + dsm_memory_allocated, found in the + + pg_stat_global_memory_tracking view. + + + + + + generation_allocated bigint + + + Subtotal of memory allocated via the generation allocator. + + + + + + slab_allocated bigint + + + Subtotal of memory allocated via the slab allocator. + + + + +
+ +
+ + + <structname>pg_stat_global_memory_tracking</structname> + + + pg_stat_global_memory_tracking + + + + The pg_stat_global_memory_tracking view shows + a memory reservation summary for the entire server. This view + has a single row with a column for each global value. + Use pg_size_pretty described in + to make the byte populated values + more easily readable. + + + + <structname>pg_stat_global_memory_tracking</structname> View + + + + + Column Type + + + Description + + + + + + + + total_memory_reserved bigint + + + Reports the total amount of memory (in bytes) currently reserved by the server. + This total includes shared memory as well as private memory allocated by the backends. + + + + + + dsm_memory_allocated bigint + + + Reports the amount of dynamic shared memory (dsm) in bytes currently allocated by the server. + + + + + + total_memory_available bigint + + + Reports how much memory remains available to the server. If a + backend process attempts to allocate more memory than remains, + the process will fail with an out of memory error, resulting in + cancellation of the process's active query/transaction. + If memory is not being limited (i.e., max_total_memory is zero or not set), + this column returns NULL. + . + + + + + + static_shared_memory bigint + + + Reports how much static shared memory (non-DSM shared memory) is being used by + the server. Static shared memory is configured by the postmaster + at server startup. + . + + + + +
+ +
+ Statistics Functions diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b65f6b5249..c6b68e5965 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1348,3 +1348,12 @@ CREATE VIEW pg_stat_subscription_stats AS CREATE VIEW pg_wait_events AS SELECT * FROM pg_get_wait_events(); + +CREATE VIEW pg_stat_memory_reservation AS + SELECT * FROM pg_stat_get_memory_reservation(); + +CREATE VIEW pg_backend_memory_allocation AS + SELECT * FROM pg_get_backend_memory_allocation(); + +CREATE VIEW pg_stat_global_memory_tracking AS + SELECT * FROM pg_stat_get_global_memory_tracking(); diff --git a/src/backend/port/atomics.c b/src/backend/port/atomics.c index 22123929e8..4260297bd6 100644 --- a/src/backend/port/atomics.c +++ b/src/backend/port/atomics.c @@ -236,4 +236,26 @@ pg_atomic_fetch_add_u64_impl(volatile pg_atomic_uint64 *ptr, int64 add_) return oldval; } + +/* + * Emulate add-with-limit using a spinlock: the value is only updated (and true returned) if the new sum neither exceeds limit nor overflows. + */ +bool +pg_atomic_fetch_add_limit_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 add, uint64 limit, uint64 *oldval) +{ + uint64 newval; + bool success; + + SpinLockAcquire((slock_t *) &ptr->sema); + *oldval = ptr->value; + newval = *oldval + add; + + success = newval <= limit && newval >= *oldval; /* overflow check */ + if (success) + ptr->value = newval; + SpinLockRelease((slock_t *) &ptr->sema); + + return success; +} + #endif /* PG_HAVE_ATOMIC_U64_SIMULATION */ diff --git a/src/backend/postmaster/fork_process.c b/src/backend/postmaster/fork_process.c index 6f9c2765d6..41b3439480 100644 --- a/src/backend/postmaster/fork_process.c +++ b/src/backend/postmaster/fork_process.c @@ -20,6 +20,7 @@ #include "libpq/pqsignal.h" #include "postmaster/fork_process.h" +#include "utils/memtrack.h" #ifndef WIN32 /* @@ -111,6 +112,9 @@ fork_process(void) } } + /* Update memory tracking after the fork() */ + fork_tracked_memory(); + /* do post-fork initialization for random number generation */
pg_strong_random_init(); } diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 7b6b613c4a..03764c64bb 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -540,6 +540,7 @@ typedef struct #endif char my_exec_path[MAXPGPATH]; char pkglib_path[MAXPGPATH]; + int max_total_memory_mb; } BackendParameters; static void read_backend_variables(char *id, Port *port); @@ -6096,6 +6097,8 @@ save_backend_variables(BackendParameters *param, Port *port, strlcpy(param->pkglib_path, pkglib_path, MAXPGPATH); + param->max_total_memory_mb = max_total_memory_mb; + return true; } @@ -6324,6 +6327,9 @@ restore_backend_variables(BackendParameters *param, Port *port) strlcpy(pkglib_path, param->pkglib_path, MAXPGPATH); + max_total_memory_mb = param->max_total_memory_mb; + max_total_memory_bytes = (int64) max_total_memory_mb * 1024 * 1024; + /* * We need to restore fd.c's counts of externally-opened FDs; to avoid * confusion, be sure to do this after restoring max_safe_fds. (Note: diff --git a/src/backend/storage/ipc/dsm.c b/src/backend/storage/ipc/dsm.c index 7e4e27810e..d1b76299e8 100644 --- a/src/backend/storage/ipc/dsm.c +++ b/src/backend/storage/ipc/dsm.c @@ -14,6 +14,12 @@ * hard postmaster crash, remaining segments will be removed, if they * still exist, at the next postmaster startup. * + * When invoking the low level functions, the caller must correctly + * set "request_size" when creating and destroying segments. + * This value is needed for tracking server memory utilization. + * To provide the size on "destroy", the original creation size is saved + * in the dsm table.
+ * * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * @@ -82,6 +88,7 @@ typedef struct dsm_control_item size_t first_page; size_t npages; void *impl_private_pm_handle; /* only needed on Windows */ + size_t size; /* the requested size when created */ bool pinned; } dsm_control_item; @@ -103,6 +110,9 @@ static uint64 dsm_control_bytes_needed(uint32 nitems); static inline dsm_handle make_main_region_dsm_handle(int slot); static inline bool is_main_region_dsm_handle(dsm_handle handle); +static Size dsm_segment_size(dsm_segment *seg); +static Size dsm_handle_size(dsm_handle handle); + /* Has this backend initialized the dynamic shared memory system yet? */ static bool dsm_init_done = false; @@ -138,6 +148,7 @@ static dlist_head dsm_segment_list = DLIST_STATIC_INIT(dsm_segment_list); static dsm_handle dsm_control_handle; static dsm_control_header *dsm_control; static Size dsm_control_mapped_size = 0; +static Size dsm_control_size = 0; static void *dsm_control_impl_private = NULL; /* @@ -151,7 +162,6 @@ dsm_postmaster_startup(PGShmemHeader *shim) { void *dsm_control_address = NULL; uint32 maxitems; - Size segsize; Assert(!IsUnderPostmaster); @@ -169,7 +179,7 @@ dsm_postmaster_startup(PGShmemHeader *shim) + PG_DYNSHMEM_SLOTS_PER_BACKEND * MaxBackends; elog(DEBUG2, "dynamic shared memory system will support %u segments", maxitems); - segsize = dsm_control_bytes_needed(maxitems); + dsm_control_size = dsm_control_bytes_needed(maxitems); /* * Loop until we find an unused identifier for the new control segment. 
We @@ -184,7 +194,7 @@ dsm_postmaster_startup(PGShmemHeader *shim) dsm_control_handle = pg_prng_uint32(&pg_global_prng_state) << 1; if (dsm_control_handle == DSM_HANDLE_INVALID) continue; - if (dsm_impl_op(DSM_OP_CREATE, dsm_control_handle, segsize, + if (dsm_impl_op(DSM_OP_CREATE, dsm_control_handle, dsm_control_size, &dsm_control_impl_private, &dsm_control_address, &dsm_control_mapped_size, ERROR)) break; @@ -193,7 +203,7 @@ dsm_postmaster_startup(PGShmemHeader *shim) on_shmem_exit(dsm_postmaster_shutdown, PointerGetDatum(shim)); elog(DEBUG2, "created dynamic shared memory control segment %u (%zu bytes)", - dsm_control_handle, segsize); + dsm_control_handle, dsm_control_size); shim->dsm_control = dsm_control_handle; /* Initialize control segment. */ @@ -371,7 +381,7 @@ dsm_postmaster_shutdown(int code, Datum arg) handle); /* Destroy the segment. */ - dsm_impl_op(DSM_OP_DESTROY, handle, 0, &junk_impl_private, + dsm_impl_op(DSM_OP_DESTROY, handle, dsm_handle_size(handle), &junk_impl_private, &junk_mapped_address, &junk_mapped_size, LOG); } @@ -380,7 +390,7 @@ dsm_postmaster_shutdown(int code, Datum arg) "cleaning up dynamic shared memory control segment with ID %u", dsm_control_handle); dsm_control_address = dsm_control; - dsm_impl_op(DSM_OP_DESTROY, dsm_control_handle, 0, + dsm_impl_op(DSM_OP_DESTROY, dsm_control_handle, dsm_control_size, &dsm_control_impl_private, &dsm_control_address, &dsm_control_mapped_size, LOG); dsm_control = dsm_control_address; @@ -572,6 +582,7 @@ dsm_create(Size size, int flags) dsm_control->item[i].refcnt = 2; dsm_control->item[i].impl_private_pm_handle = NULL; dsm_control->item[i].pinned = false; + dsm_control->item[i].size = size; seg->control_slot = i; LWLockRelease(DynamicSharedMemoryControlLock); return seg; @@ -585,7 +596,7 @@ dsm_create(Size size, int flags) FreePageManagerPut(dsm_main_space_fpm, first_page, npages); LWLockRelease(DynamicSharedMemoryControlLock); if (!using_main_dsm_region) - dsm_impl_op(DSM_OP_DESTROY, 
seg->handle, 0, &seg->impl_private, + dsm_impl_op(DSM_OP_DESTROY, seg->handle, size /* no control slot assigned yet */, &seg->impl_private, &seg->mapped_address, &seg->mapped_size, WARNING); if (seg->resowner != NULL) ResourceOwnerForgetDSM(seg->resowner, seg); @@ -611,6 +622,7 @@ dsm_create(Size size, int flags) dsm_control->item[nitems].refcnt = 2; dsm_control->item[nitems].impl_private_pm_handle = NULL; dsm_control->item[nitems].pinned = false; + dsm_control->item[nitems].size = size; seg->control_slot = nitems; dsm_control->nitems++; LWLockRelease(DynamicSharedMemoryControlLock); @@ -854,7 +866,7 @@ dsm_detach(dsm_segment *seg) * we did. There's not much we can do about that, though. */ if (is_main_region_dsm_handle(seg->handle) || - dsm_impl_op(DSM_OP_DESTROY, seg->handle, 0, &seg->impl_private, + dsm_impl_op(DSM_OP_DESTROY, seg->handle, dsm_handle_size(seg->handle), &seg->impl_private, &seg->mapped_address, &seg->mapped_size, WARNING)) { LWLockAcquire(DynamicSharedMemoryControlLock, LW_EXCLUSIVE); @@ -1026,7 +1038,7 @@ dsm_unpin_segment(dsm_handle handle) * here. */ if (is_main_region_dsm_handle(handle) || - dsm_impl_op(DSM_OP_DESTROY, handle, 0, &junk_impl_private, + dsm_impl_op(DSM_OP_DESTROY, handle, dsm_handle_size(handle), &junk_impl_private, &junk_mapped_address, &junk_mapped_size, WARNING)) { LWLockAcquire(DynamicSharedMemoryControlLock, LW_EXCLUSIVE); @@ -1098,6 +1110,50 @@ dsm_segment_handle(dsm_segment *seg) return seg->handle; } +/* + * Given a handle, get the size used to create a shared memory segment. + * We assume we have a valid handle, meaning the segment + * is either the control segment or it is in the dsm table. + */ +static Size +dsm_handle_size(dsm_handle handle) +{ + int nitems; + int slot; + + /* + * If we're setting up the dsm header, return the size of the control + * segment + */ + if (handle == dsm_control_handle) + return dsm_control_size; + + /* Find the corresponding dsm segment.
*/ + nitems = dsm_control->nitems; + for (slot = 0; slot < nitems; slot++) + if (dsm_control->item[slot].refcnt != 0 && dsm_control->item[slot].handle == handle) + break; + + /* Assert: we should always find the handle */ + Assert(slot < nitems); + + /* We found the handle. Now return the size; */ + return dsm_control->item[slot].size; +} + + +/* + * Given a dsm segment, get the size it was created with. + * The control segment doesn't have a segment structure, + * so this only applies to segments in the table. + */ +static Size +dsm_segment_size(dsm_segment *seg) +{ + return dsm_control->item[seg->control_slot].size; +} + + /* * Register an on-detach callback for a dynamic shared memory segment. */ diff --git a/src/backend/storage/ipc/dsm_impl.c b/src/backend/storage/ipc/dsm_impl.c index 35fa910d6f..43331cf780 100644 --- a/src/backend/storage/ipc/dsm_impl.c +++ b/src/backend/storage/ipc/dsm_impl.c @@ -64,10 +64,12 @@ #include "pgstat.h" #include "portability/mem.h" #include "postmaster/postmaster.h" +#include "storage/dsm.h" #include "storage/dsm_impl.h" #include "storage/fd.h" #include "utils/guc.h" #include "utils/memutils.h" +#include "utils/memtrack.h" #ifdef USE_DSM_POSIX static bool dsm_impl_posix(dsm_op op, dsm_handle handle, Size request_size, @@ -138,7 +140,8 @@ int min_dynamic_shared_memory; * op: The operation to be performed. * handle: The handle of an existing object, or for DSM_OP_CREATE, the * identifier for the new handle the caller wants created. - * request_size: For DSM_OP_CREATE, the requested size. Otherwise, 0. + * request_size: For DSM_OP_CREATE and DSM_OP_DESTROY, the requested size. + * Otherwise, 0. * impl_private: Private, implementation-specific data. 
Will be a pointer * to NULL for the first operation on a shared memory segment within this * backend; thereafter, it will point to the value to which it was set @@ -160,37 +163,61 @@ dsm_impl_op(dsm_op op, dsm_handle handle, Size request_size, void **impl_private, void **mapped_address, Size *mapped_size, int elevel) { - Assert(op == DSM_OP_CREATE || request_size == 0); + bool success; + + Assert(op == DSM_OP_CREATE || op == DSM_OP_DESTROY || request_size == 0); Assert((op != DSM_OP_CREATE && op != DSM_OP_ATTACH) || (*mapped_address == NULL && *mapped_size == 0)); + /* Reserve the memory if we are creating a new segment */ + if (op == DSM_OP_CREATE && !reserve_tracked_memory(request_size, PG_ALLOC_DSM)) + { + ereport(elevel, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("Unable to reserve backend memory for dynamic shared memory segment."), + errhint("Consider increasing the configuration parameter \"max_total_memory\"."))); + return false; + } + + + /* Dispatching the DSM operation to the desired implementation.*/ switch (dynamic_shared_memory_type) { #ifdef USE_DSM_POSIX case DSM_IMPL_POSIX: - return dsm_impl_posix(op, handle, request_size, impl_private, + success = dsm_impl_posix(op, handle, request_size, impl_private, mapped_address, mapped_size, elevel); + break; #endif #ifdef USE_DSM_SYSV case DSM_IMPL_SYSV: - return dsm_impl_sysv(op, handle, request_size, impl_private, + success = dsm_impl_sysv(op, handle, request_size, impl_private, mapped_address, mapped_size, elevel); + break; #endif #ifdef USE_DSM_WINDOWS case DSM_IMPL_WINDOWS: - return dsm_impl_windows(op, handle, request_size, impl_private, + success = dsm_impl_windows(op, handle, request_size, impl_private, mapped_address, mapped_size, elevel); + break; #endif #ifdef USE_DSM_MMAP case DSM_IMPL_MMAP: - return dsm_impl_mmap(op, handle, request_size, impl_private, + success = dsm_impl_mmap(op, handle, request_size, impl_private, mapped_address, mapped_size, elevel); + break; #endif default: elog(ERROR, 
"unexpected dynamic shared memory type: %d", dynamic_shared_memory_type); - return false; + success = false; /* silence compiler */ } + + /* Release the memory if we destroyed the segment or failed to create it */ + if ((success && op == DSM_OP_DESTROY) || (!success && op == DSM_OP_CREATE)) + release_tracked_memory(request_size, PG_ALLOC_DSM); + + return success; } #ifdef USE_DSM_POSIX diff --git a/src/backend/storage/ipc/shmem.c b/src/backend/storage/ipc/shmem.c index 5465fa1964..9c7b7841f5 100644 --- a/src/backend/storage/ipc/shmem.c +++ b/src/backend/storage/ipc/shmem.c @@ -582,3 +582,13 @@ pg_get_shmem_allocations(PG_FUNCTION_ARGS) return (Datum) 0; } + +/* + * Return the size of shared memory + */ +Size +ShmemGetSize(void) +{ + Assert(ShmemSegHdr != NULL); + return ShmemSegHdr->totalsize; +} diff --git a/src/backend/utils/activity/Makefile b/src/backend/utils/activity/Makefile index f57cf3958c..f57cb35d73 100644 --- a/src/backend/utils/activity/Makefile +++ b/src/backend/utils/activity/Makefile @@ -18,6 +18,7 @@ override CPPFLAGS := -I. 
-I$(srcdir) $(CPPFLAGS) OBJS = \ backend_progress.o \ backend_status.o \ + memtrack.o \ pgstat.o \ pgstat_archiver.o \ pgstat_bgwriter.o \ @@ -25,6 +26,7 @@ OBJS = \ pgstat_database.o \ pgstat_function.o \ pgstat_io.o \ + pgstat_memtrack.o \ pgstat_relation.o \ pgstat_replslot.o \ pgstat_shmem.o \ diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c index 6e734c6caf..e46897002d 100644 --- a/src/backend/utils/activity/backend_status.c +++ b/src/backend/utils/activity/backend_status.c @@ -24,6 +24,7 @@ #include "utils/backend_status.h" #include "utils/guc.h" /* for application_name */ #include "utils/memutils.h" +#include "utils/memtrack.h" /* ---------- @@ -73,7 +74,6 @@ static MemoryContext backendStatusSnapContext; static void pgstat_beshutdown_hook(int code, Datum arg); -static void pgstat_read_current_status(void); static void pgstat_setup_backend_status_context(void); @@ -272,6 +272,9 @@ pgstat_beinit(void) /* Set up a process-exit hook to clean up */ on_shmem_exit(pgstat_beshutdown_hook, 0); + + /* Post the memory used so far to pgstats */ + update_global_reservation(0, 0); } @@ -401,6 +404,9 @@ pgstat_bestart(void) lbeentry.st_progress_command_target = InvalidOid; lbeentry.st_query_id = UINT64CONST(0); + /* BEEntry memory should always match reported memory. (None in this case) */ + lbeentry.st_memory = reported_memory; + /* * we don't zero st_progress_param here to save cycles; nobody should * examine it until st_progress_command has been set to something other @@ -471,6 +477,9 @@ pgstat_beshutdown_hook(int code, Datum arg) PGSTAT_END_WRITE_ACTIVITY(beentry); + /* Stop reporting memory allocation changes to shared memory */ + exit_tracked_memory(); + /* so that functions can check if backend_status.c is up via MyBEEntry */ MyBEEntry = NULL; } @@ -724,7 +733,7 @@ pgstat_report_xact_timestamp(TimestampTz tstamp) * if not already done in this transaction. 
* ---------- */ -static void +void pgstat_read_current_status(void) { volatile PgBackendStatus *beentry; @@ -741,6 +750,13 @@ pgstat_read_current_status(void) #endif int i; + /* + * For consistency, take a snapshot of the memtrack globals. + * We do both snapshots to ensure the global memory total matches + * the sum of backend memory. + */ + pgstat_snapshot_fixed(PGSTAT_KIND_MEMORYTRACK); + if (localBackendStatusTable) return; /* already done */ diff --git a/src/backend/utils/activity/memtrack.c b/src/backend/utils/activity/memtrack.c new file mode 100644 index 0000000000..e2c207b883 --- /dev/null +++ b/src/backend/utils/activity/memtrack.c @@ -0,0 +1,189 @@ +/*------------------------------------------------------------------------- + * + * memtrack.c + * track and manage memory usage by the PostgreSQL server. + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/utils/activity/memtrack.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "miscadmin.h" +#include "utils/backend_status.h" +#include "utils/memtrack.h" +#include "storage/proc.h" +#include "storage/pg_shmem.h" +#include "utils/pgstat_internal.h" + +/* + * Max backend memory allocation allowed (MB). 0 = disabled. + * Max backend bytes is the same but in bytes. + * These default to "0", meaning don't check bounds for total memory. + */ +int max_total_memory_mb = 0; +int64 max_total_memory_bytes = 0; + +/* + * Private variables for tracking memory use. + * These values are preset so memory tracking is active on startup. + * After a fork(), they must be reset using 'fork_tracked_memory()'. 
+ */ +PgStat_Memory my_memory = INITIAL_ALLOCATED_MEMORY; +PgStat_Memory reported_memory = NO_ALLOCATED_MEMORY; +int64 reservation_lower_bound = 0; +int64 reservation_upper_bound = 0; + +/* + * Reset memory tracking after a fork. + * We actually keep the memory intact, but + * the memory hasn't been added to the global totals. + * + * The counters are properly initialized at startup, + * so this function only needs to be called after a fork(). + */ +void +fork_tracked_memory(void) +{ + /* This new process hasn't reported any memory yet. */ + reported_memory = NO_ALLOCATED_MEMORY; + + /* Force allocations to be reported once ProcGlobal is initialized. */ + reservation_lower_bound = 0; + reservation_upper_bound = 0; + + /* Release the DSM reservations since we didn't create them. */ + update_local_reservation(-my_memory.subTotal[PG_ALLOC_DSM], PG_ALLOC_DSM); +} + +/* + * Clean up memory counters as backend is exiting. + * + * DSM memory is not automatically returned, so it persists in the counters. + * All other memory will disappear, so those counters are set to zero. + * + * Ideally, this function would be called last, but in practice there are some + * late memory releases that happen after it is called. + */ +void +exit_tracked_memory(void) +{ + /* + * Release all of our private (non-dsm) memory. We don't release dsm + * shared memory since it survives process exit. + */ + for (int type = 0; type < PG_ALLOC_TYPE_MAX; type++) + if (type != PG_ALLOC_DSM) + update_local_reservation(-my_memory.subTotal[type], type); + + /* Report the final values to shmem (just once) */ + (void) update_global_reservation(0, 0); + + /* + * Sometimes we get late memory releases after this function is called. + * We've already reported all our private memory as released. Set the + * bounds to ensure we don't report those late releases twice. + */ + reservation_lower_bound = INT64_MIN; + reservation_upper_bound = INT64_MAX; +} + + +/* + * Update memory reservation for a new request. 
+ *
+ * There are two versions of this function: this one, which updates
+ * global values in shared memory, and an optimized update_local_reservation(),
+ * which only updates private values.
+ *
+ * This routine is the "slow path". We invoke it periodically to update
+ * global values and pgstat statistics.
+ *
+ * We also invoke it whenever we reserve DSM memory. This ensures the
+ * DSM memory counter is up-to-date and, more importantly, ensures it
+ * never goes negative.
+ *
+ * Returns false, leaving all counters unchanged, when a memory limit is set
+ * and the new reservation would exceed it.  Before shared statistics are
+ * available, the result of update_local_reservation() is returned instead.
+ */
+bool
+update_global_reservation(int64 size, pg_allocator_type type)
+{
+	int64		delta;
+	uint64		oldval;			/* required by the limited add; unused here */
+	PgStatShared_Memtrack *global;
+
+	/*
+	 * If we are still initializing, only update the private counters.
+	 * The tests are:
+	 * 1) Is pg shared memory attached?
+	 * 2) Are statistics initialized?
+	 * 3) Is postmaster up and running?
+	 * 4) If backend, is MyBEEntry set up?
+	 */
+	if (UsedShmemSegAddr == NULL || pgStatLocal.shmem == NULL || PostmasterPid == 0 ||
+		(MyProcPid != PostmasterPid && MyBEEntry == NULL))
+		return update_local_reservation(size, type);
+
+	/* Only form this pointer once pgStatLocal.shmem is known to be valid. */
+	global = &pgStatLocal.shmem->memtrack;
+
+	/* Verify totals are not negative. This is both a pre- and post-condition. */
+	Assert((int64) pg_atomic_read_u64(&global->total_memory_reserved) >= 0);
+	Assert((int64) pg_atomic_read_u64(&global->total_dsm_reserved) >= 0);
+
+	/* Calculate total bytes allocated or freed since last report */
+	delta = my_memory.total + size - reported_memory.total;
+
+	/*
+	 * If memory limits are set, we are increasing our reservation and we are
+	 * not the postmaster, update the global total memory counter subject to
+	 * the upper limit.  The postmaster is exempt from the limit check.
+	 */
+	if (max_total_memory_bytes > 0 && size > 0 && MyProcPid != PostmasterPid && delta > 0)
+	{
+		if (!pg_atomic_fetch_add_limit_u64(&global->total_memory_reserved, delta,
+										   max_total_memory_bytes, &oldval))
+			return false;
+	}
+	else
+	{
+		/* Otherwise, update the global counter with no limit checking. */
+		(void) pg_atomic_fetch_add_u64(&global->total_memory_reserved, delta);
+	}
+
+	/*
+	 * Update the private memory counters. This must happen after the limit is
+	 * checked.
+	 */
+	(void) update_local_reservation(size, type);
+
+	/*
+	 * Update the global dsm memory counter. Since we always take this path
+	 * when dsm memory is allocated, the reported value is up-to-date, and we
+	 * can simply add in the new size. We don't need to calculate the delta as
+	 * we do for private memory allocators.
+	 */
+	if (type == PG_ALLOC_DSM)
+		(void) pg_atomic_fetch_add_u64(&global->total_dsm_reserved, size);
+
+	/* Report the current memory allocations for either postmaster or backend */
+	if (MyProcPid == PostmasterPid)
+		pgstat_report_postmaster_memory();
+	else
+		pgstat_report_backend_memory();
+
+	/* Remember the values we just reported */
+	reported_memory = my_memory;
+
+	/* Update bounds so they bracket our new allocation size. */
+	reservation_upper_bound = my_memory.total + allocation_allowance_refill_qty;
+	reservation_lower_bound = my_memory.total - allocation_allowance_refill_qty;
+
+	/*
+	 * Verify totals are not negative. By checking as a post-condition, we are
+	 * more likely to identify the code that caused the problem.
+	 */
+	Assert((int64) pg_atomic_read_u64(&global->total_memory_reserved) >= 0);
+	Assert((int64) pg_atomic_read_u64(&global->total_dsm_reserved) >= 0);
+
+	return true;
+}
diff --git a/src/backend/utils/activity/meson.build b/src/backend/utils/activity/meson.build
index 46a27e7548..d4ba18dd79 100644
--- a/src/backend/utils/activity/meson.build
+++ b/src/backend/utils/activity/meson.build
@@ -3,6 +3,7 @@
 backend_sources += files(
   'backend_progress.c',
   'backend_status.c',
+  'memtrack.c',
   'pgstat.c',
   'pgstat_archiver.c',
   'pgstat_bgwriter.c',
@@ -10,6 +11,7 @@ backend_sources += files(
   'pgstat_database.c',
   'pgstat_function.c',
   'pgstat_io.c',
+  'pgstat_memtrack.c',
   'pgstat_relation.c',
   'pgstat_replslot.c',
   'pgstat_shmem.c',
diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c
index d743fc0b28..1dcb76f2f3 100644
--- a/src/backend/utils/activity/pgstat.c
+++ b/src/backend/utils/activity/pgstat.c
@@ -104,6 +104,7 @@
 #include "storage/pg_shmem.h"
 #include "storage/shmem.h"
 #include "utils/guc_hooks.h"
+#include "utils/memtrack.h"
 #include "utils/memutils.h"
 #include "utils/pgstat_internal.h"
 #include "utils/timestamp.h"
@@ -393,6 +394,12 @@ static const PgStat_KindInfo pgstat_kind_infos[PGSTAT_NUM_KINDS] = {
 		.reset_all_cb = pgstat_wal_reset_all_cb,
 		.snapshot_cb = pgstat_wal_snapshot_cb,
 	},
+
+	[PGSTAT_KIND_MEMORYTRACK] = {
+		.name = "memtrack",
+		.fixed_amount = true,
+		.snapshot_cb = pgstat_memtrack_snapshot_cb,
+	},
 };
 
 
@@ -762,7 +769,10 @@ pgstat_reset_of_kind(PgStat_Kind kind)
 	TimestampTz ts = GetCurrentTimestamp();
 
 	if (kind_info->fixed_amount)
-		kind_info->reset_all_cb(ts);
+	{
+		if (kind_info->reset_all_cb != NULL)
+			kind_info->reset_all_cb(ts);
+	}
 	else
 		pgstat_reset_entries_of_kind(kind, ts);
 }
@@ -1690,7 +1700,7 @@ pgstat_reset_after_failure(void)
 	{
 		const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
 
-		if (!kind_info->fixed_amount)
+		if (!kind_info->fixed_amount || kind_info->reset_all_cb == NULL)
 			continue;
 
 		kind_info->reset_all_cb(ts);
diff --git a/src/backend/utils/activity/pgstat_memtrack.c b/src/backend/utils/activity/pgstat_memtrack.c
new file mode 100644
index 0000000000..f8d646f12a
--- /dev/null
+++ b/src/backend/utils/activity/pgstat_memtrack.c
@@ -0,0 +1,380 @@
+/* -------------------------------------------------------------------------
+ *
+ * pgstat_memtrack.c
+ *	  Implementation of memory tracking statistics.
+ *
+ * This file contains the implementation of memtrack statistics. It is kept
+ * separate from pgstat.c to enforce the line between the statistics access /
+ * storage implementation and the details about individual types of
+ * statistics.
+ *
+ * Copyright (c) 2001-2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/utils/activity/pgstat_memtrack.c
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "storage/pg_shmem.h"
+#include "utils/memtrack.h"
+#include "utils/memutils_internal.h"
+#include "utils/pgstat_internal.h"
+#include "utils/tuplestore.h"
+
+static inline Size asMB(Size bytes);
+static void get_postmaster_reservation_row(bool *nulls, Datum *values);
+static void get_backend_reservation_row(int idx, bool *nulls, Datum *values);
+static void clearRow(bool *nulls, Datum *values, int count);
+
+/*
+ * Report postmaster memory allocations to pgstat.
+ * Note memory statistics are accumulated in my_memory.
+ * This function copies them into pgstat shared memory.
+ * Only the postmaster should call this function.
+ */ +void +pgstat_report_postmaster_memory(void) +{ + PgStatShared_Memtrack *global = &pgStatLocal.shmem->memtrack; + Assert(pgStatLocal.shmem != NULL); + Assert(MyProcPid == PostmasterPid); + + pgstat_begin_changecount_write(&global->postmasterChangeCount); + global->postmasterMemory = my_memory; + pgstat_end_changecount_write(&global->postmasterChangeCount); +} + + +/* + * Report background memory allocations to pgstat. + */ +void +pgstat_report_backend_memory(void) +{ + Assert(MyBEEntry != NULL); + PGSTAT_BEGIN_WRITE_ACTIVITY(MyBEEntry); + MyBEEntry->st_memory = my_memory; + PGSTAT_END_WRITE_ACTIVITY(MyBEEntry); +} + + +/* + * Initialize the pgstat global memory counters, + * Called once during server startup. + */ +void +pgstat_init_memtrack(PgStatShared_Memtrack *global) +{ + Size shmem_bytes; + Size shmem_mb; + + /* Get the size of the shared memory */ + shmem_bytes = ShmemGetSize(); + shmem_mb = asMB(shmem_bytes); + + /* Initialize the global memory counters. Total memory includes shared memory */ + pg_atomic_init_u64(&global->total_memory_reserved, shmem_bytes); + pg_atomic_init_u64(&global->total_dsm_reserved, 0); + + /* + * Validate the server's memory limit if one is set. + */ + if (max_total_memory_mb > 0) + { + Size connection_mb; + Size required_mb; + + /* Error if backend memory limit is less than shared memory size */ + if (max_total_memory_mb < shmem_mb) + ereport(ERROR, + errmsg("configured max_total_memory %dMB is < shared_memory_size %zuMB", + max_total_memory_mb, shmem_mb), + errhint("Disable or increase the configuration parameter \"max_total_memory\".")); + + /* Decide how much memory is needed to support the connections. 
*/ + connection_mb = asMB(MaxConnections * (initial_allocation_allowance + allocation_allowance_refill_qty)); + required_mb = shmem_mb + connection_mb; + + /* Warning if there isn't anough memory to support the connections */ + if (max_total_memory_mb < required_mb) + ereport(WARNING, + errmsg("max_total_memory %dMB should be increased to at least %zuMB to support %d connections", + max_total_memory_mb, required_mb, MaxConnections)); + + /* We prefer to use max_total_memory_mb as bytes rather than MB */ + max_total_memory_bytes = (int64) max_total_memory_mb * 1024 * 1024; + } +} + + +/* + * Take a snapshot of the global memtrack values if not + * already done, and point to the snapshot values. + */ +PgStat_Memtrack * +pgstat_fetch_stat_memtrack(void) +{ + /* Take a snapshot of both the memtrack globals and the backends */ + pgstat_read_current_status(); + + /* Return a pointer to the globals snapshot */ + return &pgStatLocal.snapshot.memtrack; +} + + +/* + * Callback to populate the memtrack globals snapshot with current values. + */ +void +pgstat_memtrack_snapshot_cb(void) +{ + PgStatShared_Memtrack *global = &pgStatLocal.shmem->memtrack; + PgStat_Memtrack *snap = &pgStatLocal.snapshot.memtrack; + + /* Get a copy of the postmaster's memory allocations */ + pgstat_copy_changecounted_stats(&snap->postmasterMemory, + &global->postmasterMemory, + sizeof(snap->postmasterMemory), + &global->postmasterChangeCount); + + /* Get a copy of the global atomic counters. */ + snap->dsm_reserved = (int64) pg_atomic_read_u64(&global->total_dsm_reserved); + snap->total_reserved = (int64) pg_atomic_read_u64(&global->total_memory_reserved); +} + + +/* + * SQL callable function to get the memory allocation of PG backends. 
+ * Returns one row for the postmaster, followed by one row per backend:
+ *	pid - process id
+ *	total_reserved - total bytes reserved by the process
+ *	init_reserved - subtotal attributed to the process at startup
+ *	aset_reserved - subtotal from allocation sets
+ *	dsm_reserved - subtotal from dynamic shared memory (DSM)
+ *	generation_reserved - subtotal from the generation allocator
+ *	slab_reserved - subtotal from the slab allocator
+ */
+Datum
+pg_stat_get_memory_reservation(PG_FUNCTION_ARGS)
+{
+#define RESERVATION_COLS (7)
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	int			num_backends;
+	int			backendIdx;
+	Datum		values[RESERVATION_COLS];
+	bool		nulls[RESERVATION_COLS];
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	/* Take a snapshot if not already done */
+	pgstat_read_current_status();
+
+	/* Get the postmaster memory reservations and output the row */
+	get_postmaster_reservation_row(nulls, values);
+	tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+	/* Do for each backend; snapshot indexes are 1-based */
+	num_backends = pgstat_fetch_stat_numbackends();
+	for (backendIdx = 1; backendIdx <= num_backends; backendIdx++)
+	{
+		/* Get the backend's memory reservations and output the row */
+		get_backend_reservation_row(backendIdx, nulls, values);
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+	}
+
+	return (Datum) 0;
+}
+
+
+/*
+ * Get a backend process' memory reservations as a row of values.
+ */ +static void +get_backend_reservation_row(int idx, bool *nulls, Datum *values) +{ + LocalPgBackendStatus *local_beentry; + PgBackendStatus *beentry; + + /* Fetch the data for the backend */ + local_beentry = pgstat_get_local_beentry_by_index(idx); + beentry = &local_beentry->backendStatus; + + clearRow(nulls, values, RESERVATION_COLS); + + /* Process id */ + values[0] = Int32GetDatum(beentry->st_procpid); + + /* total memory allocated */ + values[1] = UInt64GetDatum(beentry->st_memory.total); + + /* Subtotals of memory */ + values[2] = UInt64GetDatum(beentry->st_memory.subTotal[PG_ALLOC_INIT]); + values[3] = UInt64GetDatum(beentry->st_memory.subTotal[PG_ALLOC_ASET]); + values[4] = UInt64GetDatum(beentry->st_memory.subTotal[PG_ALLOC_DSM]); + values[5] = UInt64GetDatum(beentry->st_memory.subTotal[PG_ALLOC_GENERATION]); + values[6] = UInt64GetDatum(beentry->st_memory.subTotal[PG_ALLOC_SLAB]); +} + + +/* + * Get the Postmaster's memory allocation as a row of values. + */ +static void +get_postmaster_reservation_row(bool *nulls, Datum *values) +{ + PgStat_Memtrack *memtrack; + + clearRow(nulls, values, RESERVATION_COLS); + + /* Fetch the values and build a row */ + memtrack = pgstat_fetch_stat_memtrack(); + + /* postmaster pid */ + values[0] = PostmasterPid; + + /* Report total menory allocated */ + values[1] = UInt64GetDatum(memtrack->postmasterMemory.total); + + /* Report subtotals of memory allocated */ + /* Subtotals of memory */ + values[2] = UInt64GetDatum(memtrack->postmasterMemory.subTotal[PG_ALLOC_INIT]); + values[3] = UInt64GetDatum(memtrack->postmasterMemory.subTotal[PG_ALLOC_ASET]); + values[4] = UInt64GetDatum(memtrack->postmasterMemory.subTotal[PG_ALLOC_DSM]); + values[5] = UInt64GetDatum(memtrack->postmasterMemory.subTotal[PG_ALLOC_GENERATION]); + values[6] = UInt64GetDatum(memtrack->postmasterMemory.subTotal[PG_ALLOC_SLAB]); +} + + +/* + * SQL callable function to get the server's memory reservation statistics. 
+ * Returns a single row with the following values (in bytes)
+ *	total_memory_reserved - total memory reserved by server
+ *	dsm_memory_reserved - dsm memory reserved by server
+ *	total_memory_available - memory remaining (null if no limit set)
+ *	static_shared_memory - configured shared memory
+ */
+Datum
+pg_stat_get_global_memory_tracking(PG_FUNCTION_ARGS)
+{
+#define MEMTRACK_COLS 4
+
+	TupleDesc	tupdesc;
+	int64		total_memory_reserved;
+	Datum		values[MEMTRACK_COLS];
+	bool		nulls[MEMTRACK_COLS];
+	PgStat_Memtrack *snap;
+
+	/* Get access to the snapshot */
+	snap = pgstat_fetch_stat_memtrack();
+
+	/* Initialise attributes information in the tuple descriptor. */
+	tupdesc = CreateTemplateTupleDesc(MEMTRACK_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_memory_reserved",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "dsm_memory_reserved",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "total_memory_available",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "static_shared_memory",
+					   INT8OID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	/* Start with clean row */
+	clearRow(nulls, values, MEMTRACK_COLS);
+
+	/* Get total_memory_reserved */
+	total_memory_reserved = snap->total_reserved;
+	values[0] = Int64GetDatum(total_memory_reserved);
+
+	/* Get dsm_memory_reserved */
+	values[1] = Int64GetDatum(snap->dsm_reserved);
+
+	/* Get total_memory_available */
+	if (max_total_memory_bytes > 0)
+		values[2] = Int64GetDatum(max_total_memory_bytes - total_memory_reserved);
+	else
+		nulls[2] = true;
+
+	/*
+	 * Get the static shared memory size in bytes; more precise than the GUC
+	 * value.  The column is int8, so the value must go through
+	 * Int64GetDatum(): storing the Size directly in the Datum is wrong on
+	 * builds where int8 is passed by reference.
+	 */
+	values[3] = Int64GetDatum((int64) ShmemGetSize());
+
+	/* Return the single record */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
+
+/*
+ * SQL callable function to return the memory reservations
+ * for the calling backend.
+ * This function returns current, accurate numbers, whereas
+ * pg_stat_get_memory_reservation() returns the periodically reported,
+ * slightly out-of-date, approximate numbers.
+ */
+Datum
+pg_get_backend_memory_allocation(PG_FUNCTION_ARGS)
+{
+#define BACKEND_COLS (6)
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Datum		values[BACKEND_COLS];
+	bool		nulls[BACKEND_COLS];
+
+	/* A single row */
+	InitMaterializedSRF(fcinfo, 0);
+	clearRow(nulls, values, BACKEND_COLS);
+
+	/* pid (int4 column) */
+	values[0] = Int32GetDatum(MyProcPid);
+
+	/*
+	 * Get the total memory from scanning the contexts.
+	 */
+	if (TopMemoryContext == NULL)
+		nulls[1] = true;
+	else
+		values[1] = Int64GetDatum(getContextMemoryTotal());
+
+	/* Report subtotals of memory allocated (don't report "initial" reservation) */
+	values[2] = Int64GetDatum(my_memory.subTotal[PG_ALLOC_ASET]);
+	values[3] = Int64GetDatum(my_memory.subTotal[PG_ALLOC_DSM]);
+	values[4] = Int64GetDatum(my_memory.subTotal[PG_ALLOC_GENERATION]);
+	values[5] = Int64GetDatum(my_memory.subTotal[PG_ALLOC_SLAB]);
+
+	/* Return a single tuple */
+	tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+	return (Datum) 0;
+}
+
+/*
+ * How much memory is allocated to contexts.
+ * Scan active contexts starting at the top context, and add in the memory
+ * still held by deleted contexts parked on the aset.c freelists.
+ */
+int64
+getContextMemoryTotal(void)
+{
+	return MemoryContextMemAllocated(TopMemoryContext, true) +
+		AllocSetGetFreeMem();
+}
+
+/*
+ * Clear out a row of values: every column becomes non-null with value 0.
+ */
+static void
+clearRow(bool *nulls, Datum *values, int count)
+{
+	for (int idx = 0; idx < count; idx++)
+	{
+		nulls[idx] = false;
+		values[idx] = (Datum) 0;
+	}
+}
+
+/*
+ * Convert size in bytes to size in MB, rounding up.
+ */ +inline static Size +asMB(Size bytes) +{ + return ((bytes + 1024 * 1024 - 1) / (1024 * 1024)); +} diff --git a/src/backend/utils/activity/pgstat_shmem.c b/src/backend/utils/activity/pgstat_shmem.c index d1149adf70..ef3fc3d9fe 100644 --- a/src/backend/utils/activity/pgstat_shmem.c +++ b/src/backend/utils/activity/pgstat_shmem.c @@ -15,6 +15,7 @@ #include "pgstat.h" #include "storage/shmem.h" #include "utils/memutils.h" +#include "utils/memtrack.h" #include "utils/pgstat_internal.h" @@ -158,6 +159,9 @@ StatsShmemInit(void) /* the allocation of pgStatLocal.shmem itself */ p += MAXALIGN(sizeof(PgStat_ShmemControl)); + /* Initialize memory tracking before any new memory is allocated */ + pgstat_init_memtrack(&ctl->memtrack); + /* * Create a small dsa allocation in plain shared memory. This is * required because postmaster cannot use dsm segments. It also @@ -211,6 +215,9 @@ StatsShmemInit(void) { Assert(found); } + + /* Report startup memory allocations to pgstat */ + update_global_reservation(0, 0); } void diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 7605eff9b9..dcaab2f571 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -79,6 +79,7 @@ #include "utils/float.h" #include "utils/guc_hooks.h" #include "utils/guc_tables.h" +#include "utils/memtrack.h" #include "utils/memutils.h" #include "utils/pg_locale.h" #include "utils/portal.h" @@ -3518,6 +3519,17 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_total_memory", PGC_SU_BACKEND, RESOURCES_MEM, + gettext_noop("Restrict total memory used by the database server."), + gettext_noop("0 turns this feature off."), + GUC_UNIT_MB + }, + &max_total_memory_mb, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 
e48c066a5b..686b3cdbd3 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -160,6 +160,9 @@ #vacuum_buffer_usage_limit = 256kB # size of vacuum and analyze buffer access strategy ring; # 0 to disable vacuum buffer access strategy; # range 128kB to 16GB +#max_total_memory = 0MB # Restrict total backend memory allocations + # to this max (in MB). 0 turns this feature + # off. # - Disk - diff --git a/src/backend/utils/mmgr/aset.c b/src/backend/utils/mmgr/aset.c index c3affaf5a8..1ccd929118 100644 --- a/src/backend/utils/mmgr/aset.c +++ b/src/backend/utils/mmgr/aset.c @@ -48,6 +48,7 @@ #include "port/pg_bitutils.h" #include "utils/memdebug.h" +#include "utils/memtrack.h" #include "utils/memutils.h" #include "utils/memutils_memorychunk.h" #include "utils/memutils_internal.h" @@ -441,7 +442,7 @@ AllocSetContextCreateInternal(MemoryContext parent, * Allocate the initial block. Unlike other aset.c blocks, it starts with * the context header and its block header follows that. 
*/ - set = (AllocSet) malloc(firstBlockSize); + set = (AllocSet) malloc_tracked(firstBlockSize, PG_ALLOC_ASET); if (set == NULL) { if (TopMemoryContext) @@ -581,11 +582,7 @@ AllocSetReset(MemoryContext context) { /* Normal case, release the block */ context->mem_allocated -= block->endptr - ((char *) block); - -#ifdef CLOBBER_FREED_MEMORY - wipe_mem(block, block->freeptr - ((char *) block)); -#endif - free(block); + free_tracked(block, block->endptr - ((char *) block), PG_ALLOC_ASET); } block = next; } @@ -608,7 +605,7 @@ AllocSetDelete(MemoryContext context) { AllocSet set = (AllocSet) context; AllocBlock block = set->blocks; - Size keepersize PG_USED_FOR_ASSERTS_ONLY; + Size keepersize; Assert(AllocSetIsValid(set)); @@ -617,7 +614,7 @@ AllocSetDelete(MemoryContext context) AllocSetCheck(context); #endif - /* Remember keeper block size for Assert below */ + /* Remember keeper block size */ keepersize = KeeperBlock(set)->endptr - ((char *) set); /* @@ -649,7 +646,7 @@ AllocSetDelete(MemoryContext context) freelist->num_free--; /* All that remains is to free the header/initial block */ - free(oldset); + free_tracked(oldset, oldset->header.mem_allocated, PG_ALLOC_ASET); } Assert(freelist->num_free == 0); } @@ -668,14 +665,10 @@ AllocSetDelete(MemoryContext context) AllocBlock next = block->next; if (!IsKeeperBlock(set, block)) + { context->mem_allocated -= block->endptr - ((char *) block); - -#ifdef CLOBBER_FREED_MEMORY - wipe_mem(block, block->freeptr - ((char *) block)); -#endif - - if (!IsKeeperBlock(set, block)) - free(block); + free_tracked(block, block->endptr - ((char *) block), PG_ALLOC_ASET); + } block = next; } @@ -683,7 +676,7 @@ AllocSetDelete(MemoryContext context) Assert(context->mem_allocated == keepersize); /* Finally, free the context header, including the keeper block */ - free(set); + free_tracked(set, keepersize, PG_ALLOC_ASET); } /* @@ -725,7 +718,7 @@ AllocSetAlloc(MemoryContext context, Size size) #endif blksize = chunk_size + ALLOC_BLOCKHDRSZ 
+ ALLOC_CHUNKHDRSZ; - block = (AllocBlock) malloc(blksize); + block = (AllocBlock) malloc_tracked(blksize, PG_ALLOC_ASET); if (block == NULL) return NULL; @@ -925,7 +918,7 @@ AllocSetAlloc(MemoryContext context, Size size) blksize <<= 1; /* Try to allocate it */ - block = (AllocBlock) malloc(blksize); + block = (AllocBlock) malloc_tracked(blksize, PG_ALLOC_ASET); /* * We could be asking for pretty big blocks here, so cope if malloc @@ -936,7 +929,7 @@ AllocSetAlloc(MemoryContext context, Size size) blksize >>= 1; if (blksize < required_size) break; - block = (AllocBlock) malloc(blksize); + block = (AllocBlock) malloc_tracked(blksize, PG_ALLOC_ASET); } if (block == NULL) @@ -1040,11 +1033,7 @@ AllocSetFree(void *pointer) block->next->prev = block->prev; set->header.mem_allocated -= block->endptr - ((char *) block); - -#ifdef CLOBBER_FREED_MEMORY - wipe_mem(block, block->freeptr - ((char *) block)); -#endif - free(block); + free_tracked(block, block->freeptr - ((char *) block), PG_ALLOC_ASET); } else { @@ -1160,9 +1149,11 @@ AllocSetRealloc(void *pointer, Size size) blksize = chksize + ALLOC_BLOCKHDRSZ + ALLOC_CHUNKHDRSZ; oldblksize = block->endptr - ((char *) block); - block = (AllocBlock) realloc(block, blksize); + block = (AllocBlock) realloc_tracked(block, blksize, oldblksize, PG_ALLOC_ASET); if (block == NULL) { + set->header.mem_allocated -= oldblksize; + /* Disallow access to the chunk header. */ VALGRIND_MAKE_MEM_NOACCESS(chunk, ALLOC_CHUNKHDRSZ); return NULL; @@ -1521,6 +1512,25 @@ AllocSetStats(MemoryContext context, } } +#define countof(array) (sizeof(array) / sizeof(array[0])) + +/* + * Get the amount of memory attributed to deleted contexts. 
+ * (contexts parked on context_freelists rather than actually freed).
+ */
+int64
+AllocSetGetFreeMem(void)
+{
+	MemoryContext ctx;			/* freelist entries are chained via nextchild */
+	int64		total = 0;
+
+	for (int idx = 0; idx < countof(context_freelists); idx++)
+		for (ctx = (MemoryContext) context_freelists[idx].first_free; ctx != NULL; ctx = ctx->nextchild)
+			total += ctx->mem_allocated;
+
+	return total;
+}
+
 
 #ifdef MEMORY_CONTEXT_CHECKING
 
diff --git a/src/backend/utils/mmgr/generation.c b/src/backend/utils/mmgr/generation.c
index 92401ccf73..7da212f7dd 100644
--- a/src/backend/utils/mmgr/generation.c
+++ b/src/backend/utils/mmgr/generation.c
@@ -41,6 +41,7 @@
 #include "utils/memutils.h"
 #include "utils/memutils_memorychunk.h"
 #include "utils/memutils_internal.h"
+#include "utils/memtrack.h"
 
 
 #define Generation_BLOCKHDRSZ	MAXALIGN(sizeof(GenerationBlock))
@@ -203,7 +204,7 @@ GenerationContextCreate(MemoryContext parent,
 	 * Allocate the initial block. Unlike other generation.c blocks, it
 	 * starts with the context header and its block header follows that.
 	 */
-	set = (GenerationContext *) malloc(allocSize);
+	set = (GenerationContext *) malloc_tracked(allocSize, PG_ALLOC_GENERATION);
 	if (set == NULL)
 	{
 		MemoryContextStats(TopMemoryContext);
@@ -262,7 +263,7 @@ GenerationContextCreate(MemoryContext parent,
 						parent,
 						name);
 
-	((MemoryContext) set)->mem_allocated = firstBlockSize;
+	((MemoryContext) set)->mem_allocated = allocSize;
 
 	return (MemoryContext) set;
 }
@@ -325,7 +326,7 @@ GenerationDelete(MemoryContext context)
 	/* Reset to release all releasable GenerationBlocks */
 	GenerationReset(context);
 	/* And free the context header and keeper block */
-	free(context);
+	free_tracked(context, context->mem_allocated, PG_ALLOC_GENERATION);
 }
 
 /*
@@ -365,7 +366,7 @@ GenerationAlloc(MemoryContext context, Size size)
 	{
 		Size		blksize = required_size + Generation_BLOCKHDRSZ;
 
-		block = (GenerationBlock *) malloc(blksize);
+		block = (GenerationBlock *) malloc_tracked(blksize, PG_ALLOC_GENERATION);
 		if (block == NULL)
 			return NULL;
 
@@ -467,8 +468,7 @@ GenerationAlloc(MemoryContext context, Size size)
 		if (blksize < required_size)
 			blksize = pg_nextpower2_size_t(required_size);
 
-		block = (GenerationBlock *) malloc(blksize);
-
+		block = (GenerationBlock *) malloc_tracked(blksize, PG_ALLOC_GENERATION);
 		if (block == NULL)
 			return NULL;
 
@@ -607,11 +607,7 @@ GenerationBlockFree(GenerationContext *set, GenerationBlock *block)
 
 	((MemoryContext) set)->mem_allocated -= block->blksize;
 
-#ifdef CLOBBER_FREED_MEMORY
-	wipe_mem(block, block->blksize);
-#endif
-
-	free(block);
+	free_tracked(block, block->blksize, PG_ALLOC_GENERATION);
 }
 
 /*
@@ -725,7 +721,7 @@ GenerationFree(void *pointer)
 	dlist_delete(&block->node);
 
 	set->header.mem_allocated -= block->blksize;
-	free(block);
+	free_tracked(block, block->blksize, PG_ALLOC_GENERATION);
 }
 
 /*
@@ -1028,7 +1024,8 @@ GenerationCheck(MemoryContext context)
 	GenerationContext *gen = (GenerationContext *) context;
 	const char *name = context->name;
 	dlist_iter	iter;
-	Size		total_allocated = 0;
+	/* mem_allocated covers the MAXALIGN'd context header (part of allocSize) */
+	Size		total_allocated = MAXALIGN(sizeof(GenerationContext));
 
 	/* walk all blocks in this context */
 	dlist_foreach(iter, &gen->blocks)
diff --git a/src/backend/utils/mmgr/slab.c b/src/backend/utils/mmgr/slab.c
index 40c1d401c4..aac999003f 100644
--- a/src/backend/utils/mmgr/slab.c
+++ b/src/backend/utils/mmgr/slab.c
@@ -70,6 +70,7 @@
 
 #include "lib/ilist.h"
 #include "utils/memdebug.h"
+#include "utils/memtrack.h"
 #include "utils/memutils.h"
 #include "utils/memutils_memorychunk.h"
 #include "utils/memutils_internal.h"
@@ -359,9 +360,7 @@ SlabContextCreate(MemoryContext parent,
 		elog(ERROR, "block size %zu for slab is too small for %zu-byte chunks",
 			 blockSize, chunkSize);
 
-
-
-	slab = (SlabContext *) malloc(Slab_CONTEXT_HDRSZ(chunksPerBlock));
+	slab = (SlabContext *) malloc_tracked(Slab_CONTEXT_HDRSZ(chunksPerBlock), PG_ALLOC_SLAB);
 	if (slab == NULL)
 	{
 		MemoryContextStats(TopMemoryContext);
@@ -417,6 +416,7 @@ SlabContextCreate(MemoryContext parent,
 						parent,
 						name);
 
+	slab->header.mem_allocated = Slab_CONTEXT_HDRSZ(slab->chunksPerBlock);
 	return (MemoryContext) slab;
 }
 
@@ -448,10 +448,7 @@ SlabReset(MemoryContext context)
 
 		dclist_delete_from(&slab->emptyblocks, miter.cur);
 
-#ifdef CLOBBER_FREED_MEMORY
-		wipe_mem(block, slab->blockSize);
-#endif
-		free(block);
+		free_tracked(block, slab->blockSize, PG_ALLOC_SLAB);
 		context->mem_allocated -= slab->blockSize;
 	}
 
@@ -464,17 +461,14 @@ SlabReset(MemoryContext context)
 
 			dlist_delete(miter.cur);
 
-#ifdef CLOBBER_FREED_MEMORY
-			wipe_mem(block, slab->blockSize);
-#endif
-			free(block);
+			free_tracked(block, slab->blockSize, PG_ALLOC_SLAB);
 			context->mem_allocated -= slab->blockSize;
 		}
 	}
 
 	slab->curBlocklistIndex = 0;
 
-	Assert(context->mem_allocated == 0);
+	Assert(context->mem_allocated == Slab_CONTEXT_HDRSZ(slab->chunksPerBlock));
 }
 
 /*
@@ -487,7 +481,8 @@ SlabDelete(MemoryContext context)
 	/* Reset to release all the SlabBlocks */
 	SlabReset(context);
 	/* And free the context header */
-	free(context);
+	free_tracked(context, Slab_CONTEXT_HDRSZ(((SlabContext *) context)->chunksPerBlock),
+				 PG_ALLOC_SLAB);
 }
 
 /*
@@ -543,8 +538,7 @@ SlabAlloc(MemoryContext context, Size size)
 		}
 		else
 		{
-			block = (SlabBlock *) malloc(slab->blockSize);
-
+			block = (SlabBlock *) malloc_tracked(slab->blockSize, PG_ALLOC_SLAB);
 			if (unlikely(block == NULL))
 				return NULL;
 
@@ -739,10 +733,7 @@ SlabFree(void *pointer)
 			 * When we have enough empty blocks stored already, we actually
 			 * free the block.
*/ -#ifdef CLOBBER_FREED_MEMORY - wipe_mem(block, slab->blockSize); -#endif - free(block); + free_tracked(block, slab->blockSize, PG_ALLOC_SLAB); slab->header.mem_allocated -= slab->blockSize; } @@ -860,7 +851,7 @@ SlabIsEmpty(MemoryContext context) { Assert(SlabIsValid((SlabContext *) context)); - return (context->mem_allocated == 0); + return (context->mem_allocated == Slab_CONTEXT_HDRSZ(((SlabContext *) context)->chunksPerBlock)); } /* @@ -1095,7 +1086,7 @@ SlabCheck(MemoryContext context) /* the stored empty blocks are tracked in mem_allocated too */ nblocks += dclist_count(&slab->emptyblocks); - Assert(nblocks * slab->blockSize == context->mem_allocated); + Assert(nblocks * slab->blockSize + Slab_CONTEXT_HDRSZ(((SlabContext *) context)->chunksPerBlock) == context->mem_allocated); } #endif /* MEMORY_CONTEXT_CHECKING */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 568aa80d92..1667706d0b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5423,6 +5423,32 @@ proname => 'pg_stat_get_backend_idset', prorows => '100', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'int4', proargtypes => '', prosrc => 'pg_stat_get_backend_idset' }, +{ oid => '9890', + descr => 'statistics: memory reserved per process', + proname => 'pg_stat_get_memory_reservation', prorows => '100', proisstrict => 'f', + proretset => 't', provolatile => 's', proparallel => 'r', + prorettype => 'record', proargtypes => '', + proallargtypes => '{int4,int4,int8,int8,int8,int8,int8,int8}', + proargmodes => '{i,o,o,o,o,o,o,o}', + proargnames => '{pid,pid,total_reserved,init_reserved,aset_reserved,dsm_reserved,generation_reserved,slab_reserved}', + prosrc => 'pg_stat_get_memory_reservation' }, +{ oid => '9891', + descr => 'statistics: memory utilized by current backend', + proname => 'pg_get_backend_memory_allocation', prorows => '1', proisstrict => 'f', + proretset => 't', provolatile => 's', proparallel => 
'r', + prorettype => 'record', proargtypes => '', + proallargtypes => '{int4,int4,int8,int8,int8,int8,int8}', + proargmodes => '{i,o,o,o,o,o,o}', + proargnames => '{pid,pid,total_top_context_allocated,aset_allocated,dsm_allocated,generation_allocated,slab_allocated}', + prosrc => 'pg_get_backend_memory_allocation' }, +{ oid => '9892', + descr => 'statistics: total memory reserved for the server', + proname => 'pg_stat_get_global_memory_tracking', proisstrict => 'f', + provolatile => 's', proparallel => 'r', prorettype => 'record', + proargtypes => '', proallargtypes => '{int8,int8,int8,int8}', + proargmodes => '{o,o,o,o}', + proargnames => '{total_memory_reserved,dsm_memory_reserved,total_memory_available,static_shared_memory}', + prosrc =>'pg_stat_get_global_memory_tracking'}, { oid => '2022', descr => 'statistics: information about currently active backends', proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index f95d8db0c4..62a66cc9cc 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -51,10 +51,10 @@ typedef enum PgStat_Kind PGSTAT_KIND_IO, PGSTAT_KIND_SLRU, PGSTAT_KIND_WAL, + PGSTAT_KIND_MEMORYTRACK, } PgStat_Kind; - #define PGSTAT_KIND_FIRST_VALID PGSTAT_KIND_DATABASE -#define PGSTAT_KIND_LAST PGSTAT_KIND_WAL +#define PGSTAT_KIND_LAST PGSTAT_KIND_MEMORYTRACK #define PGSTAT_NUM_KINDS (PGSTAT_KIND_LAST + 1) /* Values for track_functions GUC variable --- order is significant! */ @@ -453,6 +453,15 @@ typedef struct PgStat_PendingWalStats instr_time wal_sync_time; } PgStat_PendingWalStats; +/* + * snapshot of global memory tracking statistics, including postmaster. 
+ */
+typedef struct PgStat_Memtrack
+{
+	PgStat_Memory postmasterMemory;	/* copy of the postmaster's counters */
+	int64		total_reserved;	/* server-wide total, including shared memory */
+	int64		dsm_reserved;	/* server-wide total of DSM reservations */
+} PgStat_Memtrack;
 
 /*
  * Functions in pgstat.c
@@ -717,6 +726,11 @@ extern void pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_
 extern void pgstat_report_wal(bool force);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 
+/*
+ * Functions in pgstat_memtrack.c
+ */
+extern PgStat_Memtrack *pgstat_fetch_stat_memtrack(void);
+extern void pgstat_report_postmaster_memory(void);
 
 /*
  * Variables in pgstat.c
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index bbff945eba..28a31335da 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -514,6 +514,25 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Atomically add "add" to *ptr unless the new sum would exceed "limit" (or
+ * wrap around).  Returns true if the add was done; otherwise returns false
+ * and leaves *ptr unchanged.  *oldval receives the value last read from *ptr.
+ *
+ * This routine implements a "Limit Counter" as described
+ * in "Is Parallel Programming Hard ...?" by Paul McKenney.
+ * https://mirrors.edge.kernel.org/pub/linux/kernel/people/paulmck/perfbook/perfbook.html
+ */
+static inline bool
+pg_atomic_fetch_add_limit_u64(volatile pg_atomic_uint64 *ptr, int64 add,
+							  uint64 limit, uint64 *oldval)
+{
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+	return pg_atomic_fetch_add_limit_u64_impl(ptr, add, limit, oldval);
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
diff --git a/src/include/port/atomics/fallback.h b/src/include/port/atomics/fallback.h
index a9e8e77c03..b7b563c040 100644
--- a/src/include/port/atomics/fallback.h
+++ b/src/include/port/atomics/fallback.h
@@ -167,4 +167,8 @@ extern bool pg_atomic_compare_exchange_u64_impl(volatile pg_atomic_uint64 *ptr,
 #define PG_HAVE_ATOMIC_FETCH_ADD_U64
 extern uint64 pg_atomic_fetch_add_u64_impl(volatile pg_atomic_uint64 *ptr, int64 add_);
 
+#define PG_HAVE_ATOMIC_FETCH_ADD_LIMIT_U64
+extern bool pg_atomic_fetch_add_limit_u64_impl(volatile pg_atomic_uint64 *ptr,
+											   int64 add, uint64 limit, uint64 *oldval);
+
 #endif							/* PG_HAVE_ATOMIC_U64_SIMULATION */
diff --git a/src/include/port/atomics/generic.h b/src/include/port/atomics/generic.h
index cb5804adbf..f05fb73e86 100644
--- a/src/include/port/atomics/generic.h
+++ b/src/include/port/atomics/generic.h
@@ -399,3 +399,30 @@ pg_atomic_sub_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_fetch_sub_u64_impl(ptr, sub_) - sub_;
 }
 #endif
+
+#if !defined(PG_HAVE_ATOMIC_FETCH_ADD_LIMIT_U64) && defined(PG_HAVE_ATOMIC_COMPARE_EXCHANGE_U64)
+#define PG_HAVE_ATOMIC_FETCH_ADD_LIMIT_U64
+
+/*
+ * Emulate the atomic op using a Compare And Swap (CAS) loop.  This version
+ * is compiled only when no platform-specific header has already defined
+ * PG_HAVE_ATOMIC_FETCH_ADD_LIMIT_U64.
+ */ +static inline bool +pg_atomic_fetch_add_limit_u64_impl(volatile pg_atomic_uint64 *sum, uint64 add, uint64 limit, uint64 *oldval) +{ + uint64 newval; + + /* CAS loop until successful or until new sum would be out of bounds */ + *oldval = pg_atomic_read_u64_impl(sum); + do + { + newval = *oldval + add; + if (newval > limit || newval < *oldval) /* Includes overflow test */ + return false; + + } while (!pg_atomic_compare_exchange_u64_impl(sum, oldval, newval)); + + return true; +} +#endif diff --git a/src/include/storage/pg_shmem.h b/src/include/storage/pg_shmem.h index aea769920c..bf4d7af6fb 100644 --- a/src/include/storage/pg_shmem.h +++ b/src/include/storage/pg_shmem.h @@ -89,5 +89,6 @@ extern PGShmemHeader *PGSharedMemoryCreate(Size size, extern bool PGSharedMemoryIsInUse(unsigned long id1, unsigned long id2); extern void PGSharedMemoryDetach(void); extern void GetHugePageSize(Size *hugepagesize, int *mmap_flags); +extern Size ShmemGetSize(void); #endif /* PG_SHMEM_H */ diff --git a/src/include/utils/backend_status.h b/src/include/utils/backend_status.h index 75fc18c432..8c39e707c2 100644 --- a/src/include/utils/backend_status.h +++ b/src/include/utils/backend_status.h @@ -82,6 +82,33 @@ typedef struct PgBackendGSSStatus } PgBackendGSSStatus; +/* + * Track memory coming from the following allocators + */ +typedef enum pg_allocator_type +{ + PG_ALLOC_INIT = 0, /* Assumed as part of process initialization */ + PG_ALLOC_ASET, /* Allocation Set */ + PG_ALLOC_DSM, /* Dynamic shared memory */ + PG_ALLOC_GENERATION, /* Generation allocator (all freed at once) */ + PG_ALLOC_SLAB, /* Slab allocator */ +} pg_allocator_type; + +#define PG_ALLOC_TYPE_MAX (PG_ALLOC_SLAB + 1) + +/* + * Track memory reserved by each process, + * This structure is used both for backends and for the postmaster. + * Note subTotal[PG_ALLOC_DSM] can go negative if one process + * frees DSM memory which was reserved by another process. 
+ */ +typedef struct PgStat_Memory +{ + int64 total; + int64 subTotal[PG_ALLOC_TYPE_MAX]; +} PgStat_Memory; + + /* ---------- * PgBackendStatus * @@ -170,6 +197,10 @@ typedef struct PgBackendStatus /* query identifier, optionally computed using post_parse_analyze_hook */ uint64 st_query_id; + + /* Memory allocated to this backend, both total and subtotals by type. */ + PgStat_Memory st_memory; + } PgBackendStatus; @@ -332,6 +363,7 @@ extern uint64 pgstat_get_my_query_id(void); * generate the pgstat* views. * ---------- */ +extern void pgstat_read_current_status(void); extern int pgstat_fetch_stat_numbackends(void); extern PgBackendStatus *pgstat_get_beentry_by_backend_id(BackendId beid); extern LocalPgBackendStatus *pgstat_get_local_beentry_by_backend_id(BackendId beid); diff --git a/src/include/utils/memtrack.h b/src/include/utils/memtrack.h new file mode 100644 index 0000000000..8d21faf1fe --- /dev/null +++ b/src/include/utils/memtrack.h @@ -0,0 +1,267 @@ +#ifndef MEMTRACK_H +#define MEMTRACK_H + +/* ---------- + * Memory accounting functions. + * Track how much memory each process is using and place an + * overall limit on how much memory a database server can allocate. + * + * The main functions are: + * fork_tracked_memory() + * reserve_tracked_memory() + * release_tracked_memory() + * exit_tracked_memory() + * + * These routines implement an approximate total for memory allocated by the + * database server. For efficiency, each process accurately tracks its own memory, + * but it only periodically updates the global total. This approximate total is + * "close enough" for monitoring memory use by the server. + * + * Note we make an exception for DSM memory. The global total for DSM memory is + * always kept up-to-date. The problem is, an approximate DSM total can go negative as + * DSM is released. While not inherently evil, a negative DSM total could be extremely + * confusing.
+ * + * All private variables are properly initialized at startup, so fork_tracked_memory() + * only needs to be called after a fork() system call. + * + * The reserve/release functions implement both a "fast path" and a "slow path". + * The fast path is used for most allocations, and it only references + * private (non-shared) variables. The slow path is invoked periodically; it updates + * shared memory and checks for limits on total server memory. + * + * The following private variables represent the "TRUTH" of how much memory the process allocated. + * The total can be calculated as the sum of the subtotals, but keeping a separate total simplifies + * and shortens some of the code paths. + * my_memory.total: total amount of memory allocated by this process + * my_memory.subTotal[type]: subtotals by allocator type. + * + * The private values are periodically reported to pgstat. + * The following variables hold the last reported values + * reported_memory.total + * reported_memory.subTotal[type] + * + * The "slow path" is invoked when my_memory.total would cross these bounds (and always for DSM). + * Once invoked, it updates the reported values and sets new bounds. + * reservation_upper_bound: update when my_memory.total would reach or exceed this + * reservation_lower_bound: update when my_memory.total would drop to or below this + * allocation_allowance_refill_qty: amount of memory to allocate or release before updating again. + * + * These counters are the values seen by pgstat. They are a copy of reported_memory. + * proc->st_memory.total: total for pgstat + * proc->st_memory.subTotal[type]: last subtotals for pgstat + * + * Limits on total server memory. If max_total_memory_bytes is zero, there is no limit.
+ * pgStatLocal.shmem->memtrack.total_memory_reserved: total amount of memory reserved by the server, including shared memory + * max_total_memory_bytes: maximum memory the server can allocate + * + * And finally, + * initial_allocation_allowance: each process consumes this much memory simply by existing. + * pgStatLocal.shmem->memtrack.total_dsm_reserved: total amount of DSM memory allocated by the server + * + * Note this header file works in conjunction with memtrack.c and pgstat_memtrack.c. + * The former is focused on gathering memory data and implementing a max + * limit, while the latter implements views for reporting memory statistics. + * The two sets of routines work together and share common data structures. + * ---------- + */ + +#include +#include "postgres.h" +#include "fmgr.h" +#include "memdebug.h" +#include "common/int.h" +#include "port/atomics.h" +#include "utils/backend_status.h" +#include "utils/memtrack.h" +#include "utils/pgstat_internal.h" + +/* This value is a candidate to be a GUC variable. We chose 1MB arbitrarily. */ +static const int64 allocation_allowance_refill_qty = 1024 * 1024; /* 1MB */ + +/* Compile time initialization constants */ +#define initial_allocation_allowance (1024 * 1024) +#define INITIAL_ALLOCATED_MEMORY (PgStat_Memory) \ + {initial_allocation_allowance, {initial_allocation_allowance}} +#define NO_ALLOCATED_MEMORY (PgStat_Memory) \ + {0, {0}} + +/* Manage memory allocation for backends.
*/ +extern PGDLLIMPORT PgStat_Memory my_memory; +extern PGDLLIMPORT PgStat_Memory reported_memory; +extern PGDLLIMPORT PgStat_Memory my_memory_snap; +extern PGDLLIMPORT int64 reservation_upper_bound; +extern PGDLLIMPORT int64 reservation_lower_bound; + +extern PGDLLIMPORT int64 max_total_memory_bytes; +extern PGDLLIMPORT int32 max_total_memory_mb; + +/* These are the main entry points for memory tracking */ +extern void fork_tracked_memory(void); +static inline bool reserve_tracked_memory(int64 size, pg_allocator_type type); +static inline bool release_tracked_memory(int64 size, pg_allocator_type type); +extern void exit_tracked_memory(void); +extern void pgstat_init_memtrack(PgStatShared_Memtrack *global); + +/* Helper functions for memory tracking */ +static inline bool update_local_reservation(int64 size, pg_allocator_type type); +extern bool update_global_reservation(int64 size, pg_allocator_type type); + +/* pgstat helper functions */ +void pgstat_report_backend_memory(void); +void pgstat_report_postmaster_memory(void); +void pgstat_init_memtrack(PgStatShared_Memtrack *global); +void pgstat_backend_memory_reservation_cb(void); +int64 getContextMemoryTotal(void); + +/* SQL Callable functions */ +extern Datum pg_stat_get_memory_reservation(PG_FUNCTION_ARGS); +extern Datum pg_get_backend_memory_allocation(PG_FUNCTION_ARGS); +extern Datum pg_stat_get_global_memory_tracking(PG_FUNCTION_ARGS); + +/*-------------------------------------------- + * Keep track of memory coming from malloc()/free(). + * Replacing malloc()/free() calls with these routines + * keeps track of most Postgres memory allocations. + * For the other cases, allocate memory as desired and + * report the allocations using reserve_tracked_memory() + * and release_tracked_memory(). + *------------------------------------------*/ + +/* + * Allocate tracked memory using malloc.
+ */ +static inline void * +malloc_tracked(int64 size, pg_allocator_type type) +{ + void *ptr; + + /* reserve the memory if able to */ + if (!reserve_tracked_memory(size, type)) + return NULL; + + /* Allocate the memory, releasing the reservation if failed */ + ptr = malloc(size); + if (ptr == NULL) + release_tracked_memory(size, type); + + return ptr; +} + +/* + * Free memory which was allocated with malloc_tracked. + * Note: most mallocs have a non-portable method to + * get the size of a block of memory. Dropping the "size" parameter + * would greatly simplify the calling code. + */ +static inline void +free_tracked(void *ptr, int64 size, pg_allocator_type type) +{ + release_tracked_memory(size, type); +#ifdef CLOBBER_FREED_MEMORY + wipe_mem(ptr, size); +#endif + free(ptr); +} + + +/* + * Realloc tracked memory. Unlike realloc(), the block is freed if the reservation fails. + */ +static inline void * +realloc_tracked(void *block, int64 new_size, int64 old_size, pg_allocator_type type) +{ + void *ptr; + bool success; + + /* try to reserve the new memory size */ + int64 delta = new_size - old_size; + success = (delta > 0)? reserve_tracked_memory(delta, type) + : release_tracked_memory(-delta, type); + + /* If unable, free the memory and return NULL */ + if (!success) + { + free_tracked(block, old_size, type); + return NULL; + } + + /* Now, do the reallocation. On failure "block" is still valid at old_size, so undo the reservation change. */ + ptr = realloc(block, new_size); + if (ptr == NULL) + (void) ((delta > 0) ? release_tracked_memory(delta, type) : reserve_tracked_memory(-delta, type)); + + return ptr; +} + + +/*----------------------------------------------------------------- + * Report memory as it is allocated or released. + * These routines are inlined so the "fast path" through them is + * as efficient as possible. + *---------------------------------------------------------------*/ + +/* + * Report a desired increase in memory for this process. + * Returns true if successful, false otherwise. + */ +static inline bool +reserve_tracked_memory(int64 size, pg_allocator_type type) +{ + Assert(size >= 0); + + /* CASE: no change in reserved memory.
Do nothing. */ + if (size == 0) + return true; + + /* CASE: Bounds reached (or DSM, which is always kept current), take the slow path and update pgstat globals */ + if (my_memory.total + size >= reservation_upper_bound || type == PG_ALLOC_DSM) + return update_global_reservation(size, type); + + /* OTHERWISE: take the fast path and only update local variables */ + return update_local_reservation(size, type); +} + +/* ---------- + * Report a decrease in memory allocated for this process. + * Note we should have already called "reserve_tracked_memory". + * We should never end up with a negative subtotal, except + * for DSM memory when one process allocates it and another process + * releases it. + */ +static inline bool +release_tracked_memory(int64 size, pg_allocator_type type) +{ + Assert(size >= 0); + + /* CASE: no change in reserved memory. Do nothing. */ + if (size == 0) + return true; + + /* CASE: Bounds reached (or DSM, which is always kept current), take the slow path and update pgstat globals */ + if (my_memory.total - size <= reservation_lower_bound || type == PG_ALLOC_DSM) + return update_global_reservation(-size, type); + + /* OTHERWISE: take the fast path and only update local variables */ + return update_local_reservation(-size, type); +} + + +/* + * Fast path for reserving and releasing memory. + * This version is used for most allocations, and it + * is stripped down to the bare minimum to reduce impact + * on performance. It only updates private (non-shared) variables. + */ +static inline bool +update_local_reservation(int64 size, pg_allocator_type type) +{ + /* Update our local memory counters.
*/ + my_memory.total += size; + my_memory.subTotal[type] += size; + + return true; +} + + +#endif /* MEMTRACK_H */ diff --git a/src/include/utils/memutils_internal.h b/src/include/utils/memutils_internal.h index a657430175..0afc02c444 100644 --- a/src/include/utils/memutils_internal.h +++ b/src/include/utils/memutils_internal.h @@ -31,6 +31,7 @@ extern void AllocSetStats(MemoryContext context, MemoryStatsPrintFunc printfunc, void *passthru, MemoryContextCounters *totals, bool print_to_stderr); +extern int64 AllocSetGetFreeMem(void); #ifdef MEMORY_CONTEXT_CHECKING extern void AllocSetCheck(MemoryContext context); #endif diff --git a/src/include/utils/pgstat_internal.h b/src/include/utils/pgstat_internal.h index 60fbf9394b..b3ec3a6446 100644 --- a/src/include/utils/pgstat_internal.h +++ b/src/include/utils/pgstat_internal.h @@ -355,6 +355,15 @@ typedef struct PgStatShared_Wal } PgStatShared_Wal; +typedef struct PgStatShared_Memtrack +{ + LWLock lock; + uint32 postmasterChangeCount; + PgStat_Memory postmasterMemory; + pg_atomic_uint64 total_memory_reserved; + pg_atomic_uint64 total_dsm_reserved; +} PgStatShared_Memtrack; + /* ---------- * Types and definitions for different kinds of variable-amount stats. @@ -394,7 +403,6 @@ typedef struct PgStatShared_ReplSlot PgStat_StatReplSlotEntry stats; } PgStatShared_ReplSlot; - /* * Central shared memory entry for the cumulative stats system.
* @@ -433,6 +441,7 @@ typedef struct PgStat_ShmemControl PgStatShared_IO io; PgStatShared_SLRU slru; PgStatShared_Wal wal; + PgStatShared_Memtrack memtrack; } PgStat_ShmemControl; @@ -460,6 +469,8 @@ typedef struct PgStat_Snapshot PgStat_WalStats wal; + PgStat_Memtrack memtrack; + /* to free snapshot in bulk */ MemoryContext context; struct pgstat_snapshot_hash *stats; @@ -658,6 +669,10 @@ extern PgStat_SubXactStatus *pgstat_get_xact_stack_level(int nest_level); extern void pgstat_drop_transactional(PgStat_Kind kind, Oid dboid, Oid objoid); extern void pgstat_create_transactional(PgStat_Kind kind, Oid dboid, Oid objoid); +/* + * Functions in pgstat_memtrack.c + */ +extern void pgstat_memtrack_snapshot_cb(void); /* * Variables in pgstat.c diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index e81873cb5a..67c1ea18fb 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -21,6 +21,7 @@ SUBDIRS = \ test_ginpostinglist \ test_integerset \ test_lfind \ + test_memtrack \ test_misc \ test_oat_hooks \ test_parser \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index fcd643f6f1..72eb45767c 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -18,6 +18,7 @@ subdir('test_extensions') subdir('test_ginpostinglist') subdir('test_integerset') subdir('test_lfind') +subdir('test_memtrack') subdir('test_misc') subdir('test_oat_hooks') subdir('test_parser') diff --git a/src/test/modules/test_memtrack/.gitignore b/src/test/modules/test_memtrack/.gitignore new file mode 100644 index 0000000000..5dcb3ff972 --- /dev/null +++ b/src/test/modules/test_memtrack/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_memtrack/Makefile b/src/test/modules/test_memtrack/Makefile new file mode 100644 index 0000000000..db44ca0575 --- /dev/null +++ b/src/test/modules/test_memtrack/Makefile @@ -0,0 +1,25 @@ +# 
src/test/modules/test_memtrack/Makefile + +MODULE_big = test_memtrack +OBJS = \ + $(WIN32RES) \ + test_memtrack.o \ + worker_pool.o +PGFILEDESC = "test_memtrack - test memory tracking" + +EXTENSION = test_memtrack +DATA = test_memtrack--1.0.sql + +REGRESS = test_memtrack +REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_memtrack/test_memtrack.conf + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_memtrack +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_memtrack/README b/src/test/modules/test_memtrack/README new file mode 100644 index 0000000000..75a6c1e84a --- /dev/null +++ b/src/test/modules/test_memtrack/README @@ -0,0 +1,36 @@ +test_memtrack tests the memory tracking facilities of PostgreSQL. + + +Functions +========= +test_allocation(# workers, type of memory, # blocks, block size) + + This function controls the test. It creates a pool of workers and passes them messages + asking them to allocate or free memory. It verifies the memory is allocated or freed, + and it confirms the total memory has been freed at the end of the test. + + Like test_shm_mq, this function spawns worker backends which communicate + through message queues. + + Unlike test_shm_mq, each worker is associated with two queues, one for requests + and another for responses. All messages are exchanged between the main test backend + and the workers. The workers do not communicate with each other. + +test_memtrack(# workers, type of memory, # blocks, block size) + + This function is like test_allocation, but it only updates the memory tracking counters + and does not actually allocate or free memory.
+ + +Messages +======== +The message sent to the workers + request type (ALLOCATE or RELEASE) + type of memory + number of blocks to allocate or free + size of each block + +The response sent back. + error code (0 if no error) + starting memory usage totals and subtotals. + ending memory usage totals and subtotals. diff --git a/src/test/modules/test_memtrack/expected/test_memtrack.out b/src/test/modules/test_memtrack/expected/test_memtrack.out new file mode 100644 index 0000000000..79ea7450a5 --- /dev/null +++ b/src/test/modules/test_memtrack/expected/test_memtrack.out @@ -0,0 +1,169 @@ +-- verify the pg_stat_memory_reservation view exists +SELECT + pid > 0, total_reserved >= 0, init_reserved >= 0, aset_reserved >= 0, dsm_reserved >= 0, generation_reserved >= 0, slab_reserved >= 0 +FROM + pg_stat_memory_reservation limit 1; + ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? +----------+----------+----------+----------+----------+----------+---------- + t | t | t | t | t | t | t +(1 row) + +-- verify the pg_stat_global_memory_tracking view exists +SELECT + total_memory_reserved >= 0, dsm_memory_reserved >= 0, total_memory_available >= 0, static_shared_memory >= 0 +FROM + pg_stat_global_memory_tracking; + ?column? | ?column? | ?column? | ?column?
+----------+----------+----------+---------- + t | t | t | t +(1 row) + +-- verify some common backends have reserved memory +SELECT + total_reserved > 0 AS result +FROM + pg_stat_activity ps + JOIN pg_stat_memory_reservation pa ON (pa.pid = ps.pid) +WHERE + backend_type IN ('checkpointer', 'background writer', 'walwriter', 'autovacuum launcher'); + result +-------- + t + t + t + t +(4 rows) + +-- For each process, the total should be the sum of subtotals +SELECT * +FROM + pg_stat_memory_reservation +WHERE total_reserved != (init_reserved + aset_reserved + dsm_reserved + generation_reserved + slab_reserved); + pid | total_reserved | init_reserved | aset_reserved | dsm_reserved | generation_reserved | slab_reserved +-----+----------------+---------------+---------------+--------------+---------------------+--------------- +(0 rows) + +-- For each process, the initial allocation is >= 1 MB +SELECT * +FROM + pg_stat_memory_reservation +WHERE + init_reserved < 1024*1024; + pid | total_reserved | init_reserved | aset_reserved | dsm_reserved | generation_reserved | slab_reserved +-----+----------------+---------------+---------------+--------------+---------------------+--------------- +(0 rows) + +-- For current backend, the top context should match the sum from the allocators. +SELECT * +FROM + pg_backend_memory_allocation +WHERE total_top_context_allocated <> (aset_allocated + generation_allocated + slab_allocated); + pid | total_top_context_allocated | aset_allocated | dsm_allocated | generation_allocated | slab_allocated +-----+-----------------------------+----------------+---------------+----------------------+---------------- +(0 rows) + +CREATE EXTENSION test_memtrack; +-- Make sure we can track memory usage of a single task +-- Since logic is the same for all memory managers, only test one. +SELECT test_memtrack(1, 1, 0, 1024); + test_memtrack +--------------- + +(1 row) + +-- Make sure we can track memory usage of multiple tasks.
+-- By default we are limited to 8 tasks, so stay below the limit. +SELECT test_memtrack(5, 3, 1024, 5*1024); + test_memtrack +--------------- + +(1 row) + +-- Do it again. We had a bug where the second call would fail. +SELECT test_memtrack(5, 3, 1024, 5*1024); + test_memtrack +--------------- + +(1 row) + +-- Now we're going to actually do memory allocations. +-- We'll test each type of memory allocator. +-- Verify we can create and destroy contexts. +SELECT test_allocation(1,1,0,1024); + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(1,2,0, 1024); + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(1,3, 0,1024); + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(1,4,0,1024); + test_allocation +----------------- + +(1 row) + +-- Create and free blocks of memory. +SELECT test_allocation(5,1,5*1024,1024); + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(5,2,5,1024*1024); /* Fewer, don't exceed shmem limit */ + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(5,3,5*1024,1024); + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(5,4,5*1024,1024); + test_allocation +----------------- + +(1 row) + +-- Add up the private memory for each process and compare with the global total +-- The delta should be 0. +SELECT ABS(process_private - global_private) as delta +FROM + (SELECT SUM(total_reserved - dsm_reserved) AS process_private + FROM pg_stat_memory_reservation as p), + (SELECT (total_memory_reserved - dsm_memory_reserved - static_shared_memory) AS global_private + FROM pg_stat_global_memory_tracking as g); + delta +------- + 0 +(1 row) + +-- Verify the global dsm memory is at least the sum of processes dsm memory. +-- The global can be larger if some process pinned dsm and then exited.
+SELECT * +FROM + (SELECT SUM(dsm_reserved) as process_dsm from pg_stat_memory_reservation), + (SELECT dsm_memory_reserved as global_dsm from pg_stat_global_memory_tracking) +WHERE + global_dsm < process_dsm; + process_dsm | global_dsm +-------------+------------ +(0 rows) + +-- Allocate more memory than we have available. +-- (this should fail because we configured max_total_memory to 1024 MB) +SELECT test_memtrack(5, 2, 1024, 1024*1024*1024); +ERROR: worker ran out of memory diff --git a/src/test/modules/test_memtrack/meson.build b/src/test/modules/test_memtrack/meson.build new file mode 100644 index 0000000000..4b861ab62f --- /dev/null +++ b/src/test/modules/test_memtrack/meson.build @@ -0,0 +1,38 @@ +# Copyright (c) 2022-2023, PostgreSQL Global Development Group + +test_memtrack_sources = files( + 'test_memtrack.c', + 'worker_pool.c', +) + +if host_system == 'windows' + test_memtrack_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_memtrack', + '--FILEDESC', 'test_memtrack - test memory tracking',]) +endif + +test_memtrack = shared_module('test_memtrack', + test_memtrack_sources, + kwargs: pg_test_mod_args, +) +test_install_libs += test_memtrack + +test_install_data += files( + 'test_memtrack.control', + 'test_memtrack--1.0.sql', +) + +tests += { + 'name': 'test_memtrack', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_memtrack', + ], + 'regress_args': [ + '--temp-config', files('test_memtrack.conf'), + ], + + }, +} diff --git a/src/test/modules/test_memtrack/sql/test_memtrack.sql b/src/test/modules/test_memtrack/sql/test_memtrack.sql new file mode 100644 index 0000000000..9a1286d19d --- /dev/null +++ b/src/test/modules/test_memtrack/sql/test_memtrack.sql @@ -0,0 +1,91 @@ + +-- verify the pg_stat_memory_reservation view exists +SELECT + pid > 0, total_reserved >= 0, init_reserved >= 0, aset_reserved >= 0, dsm_reserved >= 0, generation_reserved >= 0, slab_reserved >= 0 +FROM +
pg_stat_memory_reservation limit 1; + +-- verify the pg_stat_global_memory_tracking view exists +SELECT + total_memory_reserved >= 0, dsm_memory_reserved >= 0, total_memory_available >= 0, static_shared_memory >= 0 +FROM + pg_stat_global_memory_tracking; + +-- verify some common backends have reserved memory +SELECT + total_reserved > 0 AS result +FROM + pg_stat_activity ps + JOIN pg_stat_memory_reservation pa ON (pa.pid = ps.pid) +WHERE + backend_type IN ('checkpointer', 'background writer', 'walwriter', 'autovacuum launcher'); + + +-- For each process, the total should be the sum of subtotals +SELECT * +FROM + pg_stat_memory_reservation +WHERE total_reserved != (init_reserved + aset_reserved + dsm_reserved + generation_reserved + slab_reserved); + +-- For each process, the initial allocation is >= 1 MB +SELECT * +FROM + pg_stat_memory_reservation +WHERE + init_reserved < 1024*1024; + +-- For current backend, the top context should match the sum from the allocators. +SELECT * +FROM + pg_backend_memory_allocation +WHERE total_top_context_allocated <> (aset_allocated + generation_allocated + slab_allocated); + +CREATE EXTENSION test_memtrack; + +-- Make sure we can track memory usage of a single task +-- Since logic is the same for all memory managers, only test one. +SELECT test_memtrack(1, 1, 0, 1024); + +-- Make sure we can track memory usage of multiple tasks. +-- By default we are limited to 8 tasks, so stay below the limit. +SELECT test_memtrack(5, 3, 1024, 5*1024); + +-- Do it again. We had a bug where the second call would fail. +SELECT test_memtrack(5, 3, 1024, 5*1024); + +-- Now we're going to actually do memory allocations. +-- We'll test each type of memory allocator. + +-- Verify we can create and destroy contexts. +SELECT test_allocation(1,1,0,1024); +SELECT test_allocation(1,2,0, 1024); +SELECT test_allocation(1,3, 0,1024); +SELECT test_allocation(1,4,0,1024); + +-- Create and free blocks of memory.
+SELECT test_allocation(5,1,5*1024,1024); +SELECT test_allocation(5,2,5,1024*1024); /* Fewer, don't exceed shmem limit */ +SELECT test_allocation(5,3,5*1024,1024); +SELECT test_allocation(5,4,5*1024,1024); + +-- Add up the private memory for each process and compare with the global total +-- The delta should be 0. +SELECT ABS(process_private - global_private) as delta +FROM + (SELECT SUM(total_reserved - dsm_reserved) AS process_private + FROM pg_stat_memory_reservation as p), + (SELECT (total_memory_reserved - dsm_memory_reserved - static_shared_memory) AS global_private + FROM pg_stat_global_memory_tracking as g); + +-- Verify the global dsm memory is at least the sum of processes dsm memory. +-- The global can be larger if some process pinned dsm and then exited. +SELECT * +FROM + (SELECT SUM(dsm_reserved) as process_dsm from pg_stat_memory_reservation), + (SELECT dsm_memory_reserved as global_dsm from pg_stat_global_memory_tracking) +WHERE + global_dsm < process_dsm; + +-- Allocate more memory than we have available. +-- (this should fail because we configured max_total_memory to 1024 MB) +SELECT test_memtrack(5, 2, 1024, 1024*1024*1024); diff --git a/src/test/modules/test_memtrack/test_memtrack--1.0.sql b/src/test/modules/test_memtrack/test_memtrack--1.0.sql new file mode 100644 index 0000000000..9a39f24ee4 --- /dev/null +++ b/src/test/modules/test_memtrack/test_memtrack--1.0.sql @@ -0,0 +1,20 @@ +/* src/test/modules/test_memtrack/test_memtrack--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_memtrack" to load this file.
\quit + +CREATE FUNCTION test_memtrack( + num_workers pg_catalog.int4 default 1, + type pg_catalog.int4 default 1, + num_blocks pg_catalog.int4 default 1, + block_size pg_catalog.int4 default 1024) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION test_allocation( + num_workers pg_catalog.int4 default 1, + type pg_catalog.int4 default 1, + num_blocks pg_catalog.int4 default 1, + block_size pg_catalog.int4 default 1024) + RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_memtrack/test_memtrack.c b/src/test/modules/test_memtrack/test_memtrack.c new file mode 100644 index 0000000000..121b36405f --- /dev/null +++ b/src/test/modules/test_memtrack/test_memtrack.c @@ -0,0 +1,541 @@ +/*-------------------------------------------------------------------------- + * + * test_memtrack.c + * Testing the memory tracking features. + * Creates a pool of workers which allocate/free memory, + * and verifies the totals are as expected.
+ * + * Copyright (c) 2013-2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_memtrack/test_memtrack.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "fmgr.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "varatt.h" + +#include "storage/dsm.h" +#include "utils/backend_status.h" +#include "utils/memutils_internal.h" +#include "utils/memtrack.h" + +#include "worker_pool.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(test_memtrack); +PG_FUNCTION_INFO_V1(test_allocation); + +/* Message sent to the worker */ +typedef enum +{ + ALLOCATE, RELEASE +} Action; +typedef struct AllocateRequest +{ + Action action; + pg_allocator_type type; + int32 nWorkers; + int32 nBlocks; + int64 blockSize; +} AllocateRequest; + +/* Message received back from the worker */ +typedef struct ResponseData +{ + int32 errorCode; /* 0 if no error */ + PgStat_Memory memory; /* memory usage after handling the request */ + PgStat_Memory startingMemory; /* memory usage before handling the request */ +} ResponseData; + +/* Forward references */ +static bool reserveBlocks(pg_allocator_type type, int nBlocks, int blockSize); +static bool releaseBlocks(pg_allocator_type type, int nBlocks, int blockSize); +static void validateArgs(int nWorkers, pg_allocator_type type, int nBlocks, int blockSize); +static void sendRequest(WorkerPool * pool, int worker, Action action, pg_allocator_type type, int nBlocks, int blockSize); +static void processReply(WorkerPool * pool, int worker, Action action, pg_allocator_type type, int nBlocks, int blockSize); +static Datum exercise_worker(FunctionCallInfo fcinfo, char *workerFunction); +static void checkAllocations(void); + +/* Test the memory tracking features standalone */ +PGDLLEXPORT Datum test_memtrack(PG_FUNCTION_ARGS); +PGDLLEXPORT void test_memtrack_worker(Datum arg); + +/* Test the memory tracking features with actual allocations */ +PGDLLEXPORT Datum test_allocation(PG_FUNCTION_ARGS); +PGDLLEXPORT void test_allocation_worker(Datum arg); + +/* + * Test the
memory tracking features. + */ +Datum +test_memtrack(PG_FUNCTION_ARGS) +{ + return exercise_worker(fcinfo, "test_memtrack_worker"); +} + + +Datum +test_allocation(PG_FUNCTION_ARGS) +{ + return exercise_worker(fcinfo, "test_allocation_worker"); +} + + + +/* + * Test the memory tracking features + * Creates a pool of workers, issues requests, and verifies the results. + */ +static +Datum +exercise_worker(FunctionCallInfo fcinfo, char *workerFunction) +{ + WorkerPool pool[1]; + int64 delta, + starting_bkend_bytes; + int64 expected, + fudge; + PgStatShared_Memtrack *global = &pgStatLocal.shmem->memtrack; + + /* Get the SQL function arguments */ + int32 nWorkers = PG_GETARG_INT32(0); + pg_allocator_type type = PG_GETARG_INT32(1); + int32 nBlocks = PG_GETARG_INT32(2); + int32 blockSize = PG_GETARG_INT32(3); + + validateArgs(nWorkers, type, nBlocks, blockSize); + + /* Global totals may lag by an allocation allowance per worker; allow nWorkers^2 allowances as slack */ + fudge = nWorkers * allocation_allowance_refill_qty * nWorkers; + expected = (int64) nWorkers * nBlocks * blockSize; /* widen first: the int32 product can overflow */ + + /* Set up pool of background workers */ + initWorkerPool(pool, nWorkers, sizeof(AllocateRequest) * 10, sizeof(ResponseData) * 10, + "test_memtrack", workerFunction); + + /* Remember the total memory before we start allocations (64-bit counter; don't truncate) */ + starting_bkend_bytes = (int64) pg_atomic_read_u64(&global->total_memory_reserved); + + /* Tell the workers to start their first batch of allocations */ + for (int w = 0; w < nWorkers; w++) + sendRequest(pool, w, ALLOCATE, type, nBlocks, blockSize); + + /* Get the workers response and verify all is good */ + for (int w = 0; w < nWorkers; w++) + processReply(pool, w, ALLOCATE, type, nBlocks, blockSize); + + /* Confirm the total backend memory grew by what we just allocated, within the fudge */ + delta = (int64) pg_atomic_read_u64(&global->total_memory_reserved) - starting_bkend_bytes; + if (delta < expected - fudge || delta > expected + fudge) + ereport(ERROR, +
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Total allocated memory %lld doesn't match expected %lld fudge=%lld", (long long) delta, (long long) expected, (long long) fudge))); + + /* Tell the workers to release their memory */ + for (int w = 0; w < nWorkers; w++) + sendRequest(pool, w, RELEASE, type, nBlocks, blockSize); + + /* Get the workers response and verify all is good */ + for (int w = 0; w < nWorkers; w++) + processReply(pool, w, RELEASE, type, nBlocks, blockSize); + + /* Verify the total is back to where it started, within the fudge */ + delta = (int64) pg_atomic_read_u64(&global->total_memory_reserved) - starting_bkend_bytes; + if (delta < -fudge || delta > fudge) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Total allocated memory %lld doesn't match expected %d fudge=%lld", (long long) delta, 0, (long long) fudge))); + + /* Clean up */ + freeWorkerPool(pool); + PG_RETURN_VOID(); +} + + +/* + * Verify the arguments passed to the SQL function are valid. + */ +static +void +validateArgs(int nWorkers, pg_allocator_type type, int nBlocks, int blockSize) +{ + /* A negative blockcount is nonsensical. */ + if (nBlocks < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("repeat count size must be an integer value greater than or equal to zero"))); + + /* a minimum of 1 worker is required. */ + if (nWorkers <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("number of workers must be an integer value greater than zero"))); + + /* block size must be > 0 */ + if (blockSize <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("block size must be an integer value greater than zero"))); + + /* Valid type? */ + if (type < 0 || type >= PG_ALLOC_TYPE_MAX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid allocation type"))); +} + +/* + * Send a request message to a worker.
+ */ +static +void +sendRequest(WorkerPool * pool, int worker, Action action, pg_allocator_type type, int nBlocks, int blockSize) +{ + AllocateRequest req; + int result; + + req = (AllocateRequest) + { + .nBlocks = nBlocks,.action = action,.type = type,.blockSize = blockSize + }; + + result = sendToWorker(pool, worker, &req, sizeof(req)); + if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send message"))); +} + + +/* + * Receive a reply from a worker and verify the results make sense. + */ +void +processReply(WorkerPool * pool, int worker, Action action, pg_allocator_type type, int nBlocks, int blockSize) +{ + int64 delta; + ResponseData *rp; + ResponseData resp[1]; + Size len; + int result; + + /* Receive a message. Returns a pointer to the message and a length */ + result = recvFromWorker(pool, worker, (void *) &rp, &len); + if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not receive message"))); + + /* Copy response message in case the buffer isn't aligned */ + memcpy(resp, rp, sizeof(*resp)); + + /* Check for outof memory error */ + if (resp->errorCode != 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("worker ran out of memory"))); + + /* Verify the totals */ + delta = resp->memory.subTotal[type] - resp->startingMemory.subTotal[type]; + if (action == ALLOCATE && delta < nBlocks * blockSize / 2) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("worker(allocate) increased %zd bytes, expected %d", delta, nBlocks * blockSize))); + + if (action == RELEASE && delta > -nBlocks*blockSize / 2) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("worker(release) decreased %zd bytes, expected %d", -delta, -nBlocks * blockSize))); +} + + +/* + * Worker which bumps the memtrack counters without actually allocating anything. 
+ */ +void +test_memtrack_worker(Datum arg) +{ + AllocateRequest *req; + Size reqSize; + + ResponseData resp[1]; + shm_mq_result result; + + workerInit(arg); + + /* + * Now that we're running, make note of how much memory has been already + * allocated + */ + + do + { + + result = workerRecv((void *) &req, &reqSize); + if (result != SHM_MQ_SUCCESS) + break; + + resp->startingMemory = my_memory; + + /* Allocate or release the blocks */ + if (req->action == ALLOCATE && reserveBlocks(req->type, req->nBlocks, req->blockSize)) + resp->errorCode = 0; + else if (req->action == RELEASE && releaseBlocks(req->type, req->nBlocks, req->blockSize)) + resp->errorCode = 0; + else + resp->errorCode = 1; + + /* Get the current memory totals */ + resp->memory = my_memory; + + /* Send the response */ + result = workerSend(resp, sizeof(resp[1])); + + } while (result == SHM_MQ_SUCCESS); + + workerExit(0); +} + + +bool +reserveBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + /* Allocate the requested number of blocks */ + for (int i = 0; i < nBlocks; i++) + if (!reserve_tracked_memory(blockSize, type)) + return false; + + return true; +} + + +bool +releaseBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + for (int i = 0; i < nBlocks; i++) + release_tracked_memory(blockSize, type); + + return true; +} + +/* Forward references for the allocation worker */ +static bool freeBlocks(pg_allocator_type type, int nBlocks, int blockSize); +static MemoryContext createTestContext(pg_allocator_type type); +static bool allocateBlocks(pg_allocator_type type, int nBlocks, int blockSize); +static bool allocateDSMBlocks(int nBlocks, int blockSize); +static bool freeDSMBlocks(int nBlocks, int blockSize); +static bool allocateContextBlocks(pg_allocator_type type, int nBlocks, int blockSize); +static bool freeContextBlocks(pg_allocator_type type, int nBlocks, int blockSize); + + +/* An array of pointers to the allocated blocks */ +static void **allocations; +static MemoryContext 
testContext; + + +/* + * Worker which actually allocates and releases memory. + */ +void +test_allocation_worker(Datum arg) +{ + AllocateRequest *req; + Size reqSize; + + ResponseData resp[1]; + shm_mq_result result; + + PgStat_Memory startingMemory; + + workerInit(arg); + + do + { + result = workerRecv((void *) &req, &reqSize); + if (result != SHM_MQ_SUCCESS) + break; + + startingMemory = my_memory; + + /* Allocate or release the blocks */ + if (req->action == ALLOCATE && allocateBlocks(req->type, req->nBlocks, req->blockSize)) + resp->errorCode = 0; + else if (req->action == RELEASE && freeBlocks(req->type, req->nBlocks, req->blockSize)) + resp->errorCode = 0; + else + resp->errorCode = 1; + + /* Get the current memory totals */ + resp->memory = my_memory; + resp->startingMemory = startingMemory; + + /* Send the response */ + result = workerSend(resp, sizeof(resp[1])); + + checkAllocations(); + + } while (result == SHM_MQ_SUCCESS); + + workerExit(0); +} + + + +static bool +allocateBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + if (type == PG_ALLOC_DSM) + return allocateDSMBlocks(nBlocks, blockSize); + else + return allocateContextBlocks(type, nBlocks, blockSize); +} + +static bool +freeBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + if (type == PG_ALLOC_DSM) + return freeDSMBlocks(nBlocks, blockSize); + else + return freeContextBlocks(type, nBlocks, blockSize); +} + + + +/* + * Create a memory context for the non-DSM memory allocations. 
+ */ +static +MemoryContext +createTestContext(pg_allocator_type type) +{ + switch (type) + { + case PG_ALLOC_ASET: + return AllocSetContextCreate(TopMemoryContext, "test_aset", 0, 1024, 16 * 1024); + case PG_ALLOC_SLAB: + return SlabContextCreate(TopMemoryContext, "test_slab", 16 * 1024, 1024); + case PG_ALLOC_GENERATION: + return GenerationContextCreate(TopMemoryContext, "test_gen", 0, 1024, 16 * 1024); + default: + return NULL; + + } +} + + +/* + * Allocate blocks of memory from a non-DSM context. + */ +static +bool +allocateContextBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + MemoryContext old; + + if (type == PG_ALLOC_DSM || type == PG_ALLOC_INIT) + return false; + + /* Create a memory context for the allocations */ + testContext = createTestContext(type); + if (testContext == NULL) + return false; + + /* Switch to the new context */ + old = MemoryContextSwitchTo(testContext); + + /* Create a list of block pointers - not in the context */ + allocations = malloc(sizeof(void *) * nBlocks); + if (allocations == NULL) + return false; + + /* Allocate the requested number of blocks */ + for (int i = 0; i < nBlocks; i++) + { + allocations[i] = palloc(blockSize); + if (allocations[i] == NULL) + return false; + } + + /* Switch back to the old context */ + MemoryContextSwitchTo(old); + + return true; +} + + +/* + * Free blocks of memory allocated earlier + */ +static +bool +freeContextBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + for (int i = 0; i < nBlocks; i++) + pfree(allocations[i]); + + MemoryContextDelete(testContext); + free(allocations); + allocations = NULL; + + return true; +} + + +/* + * Allocate blocks of memory from DSM + */ +static bool +allocateDSMBlocks(int nBlocks, int blockSize) +{ + + /* Create a list of block pointers - not in the context */ + allocations = malloc(sizeof(void *) * nBlocks); + if (allocations == NULL) + return false; + + for (int i = 0; i < nBlocks; i++) + { + allocations[i] = dsm_create(blockSize, 
0); + if (allocations[i] == NULL) + return false; + } + + return true; +} + + +/* + * Free blocks of DSM memory allocated earlier + */ +static bool +freeDSMBlocks(int nBlocks, int blockSize) +{ + for (int i = 0; i < nBlocks; i++) + dsm_detach(allocations[i]); + + free(allocations); + allocations = NULL; + + + return true; +} + + +/* + * Compare top context allocations with our private memory numbers. + * They should be the same. + */ +static void +checkAllocations() +{ + int64 my_private = my_memory.total - my_memory.subTotal[PG_ALLOC_INIT] - my_memory.subTotal[PG_ALLOC_DSM]; + int64 context = getContextMemoryTotal(); + Assert(my_private == context); +} diff --git a/src/test/modules/test_memtrack/test_memtrack.conf b/src/test/modules/test_memtrack/test_memtrack.conf new file mode 100644 index 0000000000..0b26f5d74a --- /dev/null +++ b/src/test/modules/test_memtrack/test_memtrack.conf @@ -0,0 +1 @@ +max_total_memory = 8192 diff --git a/src/test/modules/test_memtrack/test_memtrack.control b/src/test/modules/test_memtrack/test_memtrack.control new file mode 100644 index 0000000000..548109e313 --- /dev/null +++ b/src/test/modules/test_memtrack/test_memtrack.control @@ -0,0 +1,4 @@ +comment = 'Test code for PostgreSQL memory tracking' +default_version = '1.0' +module_pathname = '$libdir/test_memtrack' +relocatable = true diff --git a/src/test/modules/test_memtrack/worker_pool.c b/src/test/modules/test_memtrack/worker_pool.c new file mode 100644 index 0000000000..8a91738b8c --- /dev/null +++ b/src/test/modules/test_memtrack/worker_pool.c @@ -0,0 +1,433 @@ +/*------------------------------------------------------------------------- + * + * worker_pool.c + * create a pool of workers for parallel testing. 
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/test/modules/test_memtrack/worker_pool.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+
+#include "postgres.h"
+#include "storage/s_lock.h"
+#include "postmaster/bgworker.h"
+#include "storage/dsm.h"
+#include "storage/shm_toc.h"
+#include "tcop/tcopprot.h"
+#include "storage/procarray.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "worker_pool.h"
+#include "utils/memutils.h"
+#include "miscadmin.h"
+#include "storage/barrier.h"
+#include "utils/backend_status.h"
+#include "utils/memtrack.h"
+
+
+#define WORKER_POOL_MAGIC 0x7843732
+
+/* Forward references */
+static int64 estimateDsmSize(int nWorkers, int inSize, int outSize);
+static void cleanupWorkers(dsm_segment *seg, Datum arg);
+static MemQue attachToQueue(dsm_segment *seg, shm_toc *toc, int workerIdx, int queueIdx, bool isSender);
+
+/*
+ * Create a new pool of backend workers.
+ */
+void initWorkerPool(WorkerPool *pool, int nWorkers, int inSize, int outSize, char *libName, char *procName)
+{
+	int64		dsmSize;
+	MemoryContext oldcontext;
+	BackgroundWorker worker;
+	pid_t		pid;
+	WorkerPool *cleanupArg;
+
+	/*
+	 * Allocate the pool's arrays (and the copy handed to the detach callback)
+	 * in CurTransactionContext rather than the caller's per-call context;
+	 * otherwise they could be freed before the on_dsm_detach hooks run.
+	 */
+	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+
+	/*
+	 * Fill in the local pool object.  The handle array is zeroed so cleanup
+	 * can skip workers that were never registered.
+	 */
+	pool->nWorkers = nWorkers;
+	pool->inQ = palloc0(sizeof(MemQue) * nWorkers);
+	pool->outQ = palloc0(sizeof(MemQue) * nWorkers);
+	pool->handle = palloc0(sizeof(BackgroundWorkerHandle *) * nWorkers);
+
+	/* Estimate the size of the shared memory and allocate shared memory */
+	dsmSize = estimateDsmSize(nWorkers, inSize, outSize);
+	pool->seg = dsm_create(dsmSize, 0);
+
+	/* Create table of contents in dsm so we can access contents */
+	pool->toc = shm_toc_create(WORKER_POOL_MAGIC, dsm_segment_address(pool->seg), dsmSize);
+
+	/* Set up the startup header as region 0 */
+	pool->hdr = shm_toc_allocate(pool->toc, sizeof(WorkerPoolStartup));
+	shm_toc_insert(pool->toc, 0, pool->hdr);
+	pool->hdr->nWorkers = nWorkers;
+	pool->hdr->dbOid = MyDatabaseId;
+	pool->hdr->userOid = GetAuthenticatedUserId();
+	pg_atomic_init_u32(&pool->hdr->nextWorker, 0);
+	BarrierInit(pool->hdr->barrier, nWorkers + 1);
+
+	/* Create memory queues for each worker */
+	for (int w = 0; w < nWorkers; w++)
+	{
+		shm_mq	   *mqIn,
+				   *mqOut;
+
+		/* Allocate the "In" queue */
+		mqIn = shm_toc_allocate(pool->toc, shm_mq_minimum_size + inSize);
+		shm_toc_insert(pool->toc, 1 + 2 * w, mqIn);
+		mqIn = shm_mq_create(mqIn, shm_mq_minimum_size + inSize);
+
+		/* Allocate the "Out" queue */
+		mqOut = shm_toc_allocate(pool->toc, shm_mq_minimum_size + outSize);
+		shm_toc_insert(pool->toc, 2 + 2 * w, mqOut);
+		mqOut = shm_mq_create(mqOut, shm_mq_minimum_size + outSize);
+	}
+
+	/*
+	 * Arrange to kill all the workers if we abort before every one of them
+	 * has attached to the segment and its queues; otherwise the survivors
+	 * could wait at the startup barrier forever.  The callback gets its own
+	 * copy of the pool (sharing the handle array, which is filled in below),
+	 * because the caller's struct may live in a stack frame that an error
+	 * unwinds before the segment is detached.
+	 */
+	cleanupArg = palloc(sizeof(WorkerPool));
+	*cleanupArg = *pool;
+	on_dsm_detach(pool->seg, cleanupWorkers, PointerGetDatum(cleanupArg));
+
+	/* Configure a prototypical worker. */
+	worker = (BackgroundWorker)
+	{
+		.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
+		.bgw_start_time = BgWorkerStart_ConsistentState,
+		.bgw_restart_time = BGW_NEVER_RESTART,
+		.bgw_notify_pid = MyProcPid,
+		.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pool->seg)),
+	};
+	strlcpy(worker.bgw_library_name, libName, sizeof(worker.bgw_library_name));
+	strlcpy(worker.bgw_function_name, procName, sizeof(worker.bgw_function_name));
+	snprintf(worker.bgw_type, sizeof(worker.bgw_type), "%s worker", libName);
+	snprintf(worker.bgw_name, sizeof(worker.bgw_name), "%s/%s worker for [%d]", libName, procName, MyProcPid);
+
+	/* Do for each worker */
+	for (int w = 0; w < nWorkers; w++)
+	{
+		/* Create the worker */
+		if (!RegisterDynamicBackgroundWorker(&worker, &pool->handle[w]))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					 errmsg("could not register background process"),
+					 errhint("You may need to increase max_worker_processes.")));
+
+		/* Attach the worker's memory queues */
+		pool->inQ[w] = attachToQueue(pool->seg, pool->toc, w, 0, true);
+		pool->outQ[w] = attachToQueue(pool->seg, pool->toc, w, 1, false);
+	}
+
+	/*
+	 * Wait for workers to become ready. We could just wait on the barrier,
+	 * but if a worker fails to reach the barrier, we would end up waiting
+	 * forever.
+	 */
+	for (int w = 0; w < nWorkers; w++)
+		if (WaitForBackgroundWorkerStartup(pool->handle[w], &pid) != BGWH_STARTED)
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					 errmsg("could not start background process"),
+					 errhint("You may need to increase max_worker_processes.")));
+
+	/* Wait for workers to attach to the shared memory segment and their queues */
+	BarrierArriveAndWait(pool->hdr->barrier, 0);
+
+	/*
+	 * Once we reach this point, all workers are ready. We no longer need to
+	 * kill them if we die; they'll die on their own as the message queues
+	 * shut down.
+	 */
+	cancel_on_dsm_detach(pool->seg, cleanupWorkers, PointerGetDatum(cleanupArg));
+	pfree(cleanupArg);
+
+	/* Resume using the original memory context */
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Send a message to worker workerIdx, blocking until it has been queued.
+ */
+shm_mq_result
+sendToWorker(WorkerPool * pool, int workerIdx, void *msg, Size len)
+{
+	shm_mq_result result;
+
+	result = shm_mq_send(pool->inQ[workerIdx], len, msg, false, true);
+	CHECK_FOR_INTERRUPTS();
+
+	return result;
+}
+
+/*
+ * Wait for the next message from worker workerIdx; *msg and *len describe it.
+ */
+shm_mq_result
+recvFromWorker(WorkerPool * pool, int workerIdx, void **msg, Size *len)
+{
+	shm_mq_result result;
+
+	result = shm_mq_receive(pool->outQ[workerIdx], len, msg, false);
+	CHECK_FOR_INTERRUPTS();
+
+	return result;
+}
+
+/*
+ * Stop the workers and release the queues, segment and arrays; idempotent.
+ */
+void
+freeWorkerPool(WorkerPool * pool)
+{
+	dsm_segment *seg;
+
+	/* Only free the pool once. (possibly reentrant) */
+	if (pool->seg == NULL)
+		return;
+	seg = pool->seg;
+	pool->seg = NULL;
+
+	/* We are already cleaning up, so don't do again when we detach */
+	cancel_on_dsm_detach(seg, cleanupWorkers, PointerGetDatum(pool));
+
+	/* Detach from the message queues (before cleanupWorkers zeroes nWorkers) */
+	for (int w = 0; w < pool->nWorkers; w++)
+	{
+		shm_mq_detach(pool->inQ[w]);
+		shm_mq_detach(pool->outQ[w]);
+	}
+
+	/* Terminate the background workers */
+	cleanupWorkers(seg, PointerGetDatum(pool));
+
+	/* Detach and destroy the shared memory segment (if we haven't already) */
+	dsm_detach(seg);
+
+	/* Free the pool object */
+	pfree(pool->inQ);
+	pfree(pool->outQ);
+	pfree(pool->handle);
+
+	/* Reset to zero to avoid dangling pointers. */
+	*pool = (WorkerPool){.inQ=NULL, .outQ=NULL, .handle=NULL};
+}
+
+
+/* Terminate and wait for every registered worker; NULL handles are skipped */
+static void
+cleanupWorkers(dsm_segment *seg, Datum arg)
+{
+	WorkerPool *pool = (WorkerPool *) DatumGetPointer(arg);
+
+	for (int w = 0; w < pool->nWorkers; w++)
+		if (pool->handle[w] != NULL)
+			TerminateBackgroundWorker(pool->handle[w]);
+	for (int w = 0; w < pool->nWorkers; w++)
+		if (pool->handle[w] != NULL)
+			WaitForBackgroundWorkerShutdown(pool->handle[w]);
+
+	pool->nWorkers = 0;
+}
+
+
+/*
+ * Estimate how much shared memory we need for the pool of workers.
+ *
+ * Because the TOC machinery may choose to insert padding of oddly-sized
+ * requests, we must estimate each chunk separately.
+ *
+ * We need one key to register the location of the header, and we need
+ * nworkers * 2 keys to track the locations of the message queues.
+ */
+static
+int64
+estimateDsmSize(int nWorkers, int inSize, int outSize)
+{
+	shm_toc_estimator e[1];
+
+	shm_toc_initialize_estimator(e);
+
+	shm_toc_estimate_keys(e, 1 + 2 * nWorkers);
+
+	shm_toc_estimate_chunk(e, sizeof(WorkerPoolStartup));
+
+	for (int w = 0; w < nWorkers; w++)
+	{
+		shm_toc_estimate_chunk(e, shm_mq_minimum_size + inSize);
+		shm_toc_estimate_chunk(e, shm_mq_minimum_size + outSize);
+	}
+
+	return shm_toc_estimate(e);
+}
+
+
+
+/*
+ * ----------------------------------------------------------------
+ * Worker process code.
+ * ----------------------------------------------------------------
+ */
+
+static dsm_segment *seg;
+static shm_toc *toc;
+
+static int	myWorkerNumber;
+static MemQue inQ;
+static MemQue outQ;
+static WorkerPoolStartup * hdr;
+
+
+/*
+ * Worker receives a message.
+ */
+shm_mq_result
+workerRecv(void **msg, Size *msgSize)
+{
+	shm_mq_result result;
+
+	result = shm_mq_receive(inQ, msgSize, msg, false);
+
+	CHECK_FOR_INTERRUPTS();
+	return result;
+}
+
+
+/*
+ * Worker replies to a message
+ */
+shm_mq_result
+workerSend(void *msg, Size msgSize)
+{
+	shm_mq_result result;
+
+	result = shm_mq_send(outQ, msgSize, msg, false, true);
+
+	CHECK_FOR_INTERRUPTS();
+	return result;
+}
+
+
+/*
+ * This must be called from the worker process's main function.
+ */
+void
+workerInit(Datum arg)
+{
+	dsm_handle	handle;
+
+	/* We are passed the dsm handle of the worker pool */
+	handle = DatumGetUInt32(arg);
+
+	/*
+	 * Establish signal handlers.
+	 *
+	 * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
+	 * it would a normal user backend. To make that happen, we use die().
+	 */
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Connect to the dynamic shared memory segment.
+	 *
+	 * The backend that registered this worker passed us the ID of a shared
+	 * memory segment to which we must attach for further instructions. Once
+	 * we've mapped the segment in our address space, attach to the table of
+	 * contents so we can locate the various data structures we'll need to
+	 * find within the segment.
+	 *
+	 * Note: at this point, we have not created any ResourceOwner in this
+	 * process. This will result in our DSM mapping surviving until process
+	 * exit, which is fine. If there were a ResourceOwner, it would acquire
+	 * ownership of the mapping, but we have no need for that.
+	 */
+	seg = dsm_attach(handle);
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(WORKER_POOL_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Attach to the startup header and get our worker idx */
+	hdr = shm_toc_lookup(toc, 0, false);
+	myWorkerNumber = pg_atomic_fetch_add_u32(&hdr->nextWorker, 1);
+	if (myWorkerNumber >= hdr->nWorkers)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("too many workers created in pool. Max=%u", hdr->nWorkers)));
+
+	/* Attach to the in and out message queues. */
+	inQ = attachToQueue(seg, toc, myWorkerNumber, 0, false);
+	outQ = attachToQueue(seg, toc, myWorkerNumber, 1, true);
+	CHECK_FOR_INTERRUPTS();
+
+	/* Connect to the pool owner's database as its authenticated user */
+	BackgroundWorkerInitializeConnectionByOid(hdr->dbOid, hdr->userOid, 0);
+
+	/* Wait for everybody else to become ready */
+	BarrierArriveAndWait(hdr->barrier, 0);
+	CHECK_FOR_INTERRUPTS();
+}
+
+/*
+ * Detach from the pool's segment and exit the worker process with "code".
+ */
+void
+workerExit(int code)
+{
+	/*
+	 * We're done. For cleanliness, explicitly detach from the shared memory
+	 * segment (that would happen anyway during process exit, though).
+	 */
+	dsm_detach(seg);
+	proc_exit(code);
+}
+
+/*
+ * Attach to queue queueIdx (0 = in, 1 = out) of worker workerIdx.
+ */
+static MemQue attachToQueue(dsm_segment *seg, shm_toc *toc, int workerIdx, int queueIdx, bool isSender)
+{
+	MemQue		que;
+	shm_mq	   *mq;
+
+	/* Attach to the appropriate message queues. */
+	mq = shm_toc_lookup(toc, 1 + 2 * workerIdx + queueIdx, false);
+
+	/* Make note whether we are sending or receiving */
+	if (isSender)
+		shm_mq_set_sender(mq, MyProc);
+	else
+		shm_mq_set_receiver(mq, MyProc);
+
+	/* Attach to the queue */
+	que = shm_mq_attach(mq, seg, NULL);
+
+	CHECK_FOR_INTERRUPTS();
+	return que;
+}
diff --git a/src/test/modules/test_memtrack/worker_pool.h b/src/test/modules/test_memtrack/worker_pool.h
new file mode 100644
index 0000000000..ea1fcc4cb0
--- /dev/null
+++ b/src/test/modules/test_memtrack/worker_pool.h
@@ -0,0 +1,98 @@
+/* -------------------------------------------------------------------------
+ * Implement a pool of workers which can be used to perform work in parallel.
+ * This version is derived from the postgres test_shm_mq module, but it is
+ * intended to be more general purpose. Besides being useful for running
+ * tests, it could eventually integrate with the implementation of parallel query.
+ * For now, it is just a piece of infrastructure for running parallel tests.
+ *
+ * The worker pool is created by the owner process. It creates a shared memory
+ * segment and a set of background workers. Each background worker is provided
+ * with its own message queues:
+ *  - inQ: used to send messages to the worker
+ *  - outQ: used to reply to the owner
+ *  - errQ: used to send error exceptions back to the owner.
+ * Note: errQ is not yet implemented.
+ *
+ * The flow of control for the owner is:
+ *    WorkerPool pool[1];
+ *    initWorkerPool(pool, nWorkers, inSize, outSize, libName, procName)
+ *    repeat
+ *       sendToWorker(pool, workerIdx, msg, len)
+ *       recvFromWorker(pool, workerIdx, &msg, &len)
+ *    until done
+ *    freeWorkerPool(pool)
+ *
+ * The flow of control for the worker is:
+ *    workerInit(arg)
+ *    repeat
+ *       workerRecv(&msg, &len)
+ *       workerSend(msg, len)
+ *    until done
+ *    workerExit(code)
+ *
+ * If the workers don't exit on their own, they will be terminated when
+ * the owner calls freeWorkerPool.
+ *
+ * Currently, the worker entry point is passed as the text name of a procedure
+ * and the text name of a shared library. If the library name is "postgres",
+ * then the procedure is assumed to be in the main postgres executable.
+ * --------------------------------------------------------------------------- */
+
+#ifndef WORKER_POOL_H
+#define WORKER_POOL_H
+#include "postmaster/bgworker.h"
+#include "port/atomics.h"
+#include "storage/dsm.h"
+#include "storage/shm_toc.h"
+#include "storage/shm_mq.h"
+#include "storage/s_lock.h"
+#include "storage/barrier.h"
+
+/* Alias for our convenience */
+typedef shm_mq_handle *MemQue;
+
+/*
+ * This structure is stored in the dynamic shared memory segment at toc(0).
+ * We use it to assign worker numbers to each worker.
+ */
+typedef struct WorkerPoolStartup
+{
+	uint32		nWorkers;		/* number of workers in the pool */
+	pg_atomic_uint32 nextWorker;	/* next worker number to hand out */
+	Barrier		barrier[1];		/* owner and workers meet here at startup */
+	Oid			dbOid;			/* database the workers connect to */
+	Oid			userOid;		/* user the workers connect as */
+} WorkerPoolStartup;
+
+/*
+ * Resides in the worker pool owner.
+ * Manages overall control of the workers.
+ */ +typedef struct WorkerPool +{ + dsm_segment *seg; /* segment holding the header and queues; NULL once freed */ + shm_toc *toc; /* table of contents within seg */ + int nWorkers; /* number of workers in the pool */ + MemQue *inQ; /* per-worker queues the owner sends on */ + MemQue *outQ; /* per-worker queues the owner receives on */ + BackgroundWorkerHandle **handle; /* per-worker background worker handles */ + WorkerPoolStartup *hdr; /* startup header stored at toc key 0 */ +} WorkerPool; + +/* + * Procedures called from the worker pool owner + */ +void initWorkerPool(WorkerPool *pool, int nWorkers, int inSize, int outSize, char *libName, char *procName); +shm_mq_result sendToWorker(WorkerPool * pool, int workerIdx, void *msg, Size len); +shm_mq_result recvFromWorker(WorkerPool * pool, int workerIdx, void **msg, Size *len); +void freeWorkerPool(WorkerPool * pool); + +/* + * Procedures called from the worker + */ +void workerInit(Datum arg); +shm_mq_result workerRecv(void **msg, Size *msgSize); +shm_mq_result workerSend(void *msg, Size msgSize); +void workerExit(int code); + +#endif /* WORKER_POOL_H */ diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out index 7a6f36a6a9..6c813ec465 100644 --- a/src/test/regress/expected/opr_sanity.out +++ b/src/test/regress/expected/opr_sanity.out @@ -468,9 +468,11 @@ WHERE proallargtypes IS NOT NULL AND ARRAY(SELECT proallargtypes[i] FROM generate_series(1, array_length(proallargtypes, 1)) g(i) WHERE proargmodes IS NULL OR proargmodes[i] IN ('i', 'b', 'v')); - oid | proname | proargtypes | proallargtypes | proargmodes ------+---------+-------------+----------------+------------- -(0 rows) + oid | proname | proargtypes | proallargtypes | proargmodes +------+----------------------------------+-------------+---------------------------+------------------- + 9890 | pg_stat_get_memory_reservation | | {23,23,20,20,20,20,20,20} | {i,o,o,o,o,o,o,o} + 9891 | pg_get_backend_memory_allocation | | {23,23,20,20,20,20,20} | {i,o,o,o,o,o,o} +(2 rows) -- Check for type of the variadic array parameter's elements.
-- provariadic should be ANYOID if the type of the last element is ANYOID, diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 1442c43d9c..1c50d396f7 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1303,6 +1303,13 @@ pg_available_extensions| SELECT e.name, e.comment FROM (pg_available_extensions() e(name, default_version, comment) LEFT JOIN pg_extension x ON ((e.name = x.extname))); +pg_backend_memory_allocation| SELECT pid, + total_top_context_allocated, + aset_allocated, + dsm_allocated, + generation_allocated, + slab_allocated + FROM pg_get_backend_memory_allocation() pg_get_backend_memory_allocation(pid, total_top_context_allocated, aset_allocated, dsm_allocated, generation_allocated, slab_allocated); pg_backend_memory_contexts| SELECT name, ident, parent, @@ -1872,6 +1879,11 @@ pg_stat_database_conflicts| SELECT oid AS datid, pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock, pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot FROM pg_database d; +pg_stat_global_memory_tracking| SELECT total_memory_reserved, + dsm_memory_reserved, + total_memory_available, + static_shared_memory + FROM pg_stat_get_global_memory_tracking() pg_stat_get_global_memory_tracking(total_memory_reserved, dsm_memory_reserved, total_memory_available, static_shared_memory); pg_stat_gssapi| SELECT pid, gss_auth AS gss_authenticated, gss_princ AS principal, @@ -1898,6 +1910,14 @@ pg_stat_io| SELECT backend_type, fsync_time, stats_reset FROM pg_stat_get_io() b(backend_type, object, context, reads, read_time, writes, write_time, writebacks, writeback_time, extends, extend_time, op_bytes, hits, evictions, reuses, fsyncs, fsync_time, stats_reset); +pg_stat_memory_reservation| SELECT pid, + total_reserved, + init_reserved, + aset_reserved, + dsm_reserved, + generation_reserved, + slab_reserved + FROM pg_stat_get_memory_reservation() pg_stat_get_memory_reservation(pid,
total_reserved, init_reserved, aset_reserved, dsm_reserved, generation_reserved, slab_reserved); pg_stat_progress_analyze| SELECT s.pid, s.datid, d.datname, diff --git a/src/test/regress/regress.c b/src/test/regress/regress.c index bcbc6d910f..be93901912 100644 --- a/src/test/regress/regress.c +++ b/src/test/regress/regress.c @@ -55,6 +55,8 @@ #expr, __FILE__, __LINE__); \ } while (0) +#define EXPECT_FALSE(expr) EXPECT_TRUE(!(expr)) + #define EXPECT_EQ_U32(result_expr, expected_expr) \ do { \ uint32 actual_result = (result_expr); \ @@ -778,6 +780,7 @@ test_atomic_uint64(void) { pg_atomic_uint64 var; uint64 expected; + uint64 oldval; int i; pg_atomic_init_u64(&var, 0); @@ -818,6 +821,18 @@ test_atomic_uint64(void) EXPECT_EQ_U64(pg_atomic_fetch_and_u64(&var, ~1), 1); /* no bits set anymore */ EXPECT_EQ_U64(pg_atomic_fetch_and_u64(&var, ~0), 0); + + /* Verify limit counters add and respect the limit */ + pg_atomic_write_u64(&var, 0); + EXPECT_TRUE(pg_atomic_fetch_add_limit_u64(&var, 1, 1, &oldval)); + EXPECT_EQ_U64(pg_atomic_read_u64(&var), 1); + EXPECT_EQ_U64(oldval, 0); + EXPECT_FALSE(pg_atomic_fetch_add_limit_u64(&var, 1, 1, &oldval)); + EXPECT_EQ_U64(pg_atomic_read_u64(&var), 1); + EXPECT_EQ_U64(oldval, 1); + EXPECT_FALSE(pg_atomic_fetch_add_limit_u64(&var, PG_UINT64_MAX, PG_UINT64_MAX, &oldval)); + EXPECT_EQ_U64(pg_atomic_read_u64(&var), 1); + EXPECT_EQ_U64(oldval, 1); } /* -- 2.33.0