Use WALReadFromBuffers in more places
Hi,
Commit 91f2cae7a4e, which introduced WALReadFromBuffers, used it only for
physical walsenders. It can also be used in more places, benefiting
logical walsenders and backends running pg_walinspect or logical
decoding functions, whenever the requested WAL is still available in WAL
buffers. I'm attaching a 0001 patch for this.
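To illustrate the idea, here is a simplified, self-contained sketch (not code from the patch). It models "try WAL buffers first, then read whatever is left from the WAL file". MockWal, mock_read_from_buffers, mock_read_from_file and read_wal are hypothetical stand-ins invented for this example; only the overall shape (a buffer read that may return fewer bytes than requested, followed by a file read for the remainder) mirrors what the 0001 patch does with WALReadFromBuffers and WALRead.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical model of a WAL stream; none of this is a PostgreSQL API. */
typedef struct MockWal
{
	const char *file;		/* full WAL contents, as if on disk */
	size_t		file_len;	/* number of valid bytes in "file" */
	size_t		buf_start;	/* offsets in [buf_start, buf_end) are */
	size_t		buf_end;	/* assumed to be cached in "WAL buffers" */
} MockWal;

/*
 * Stand-in for a buffer read: copies as many bytes as are cached starting
 * at "start". May return fewer bytes than requested, or 0 on a miss.
 */
static size_t
mock_read_from_buffers(const MockWal *wal, char *dst, size_t start, size_t count)
{
	size_t		n;

	if (start < wal->buf_start || start >= wal->buf_end)
		return 0;

	n = wal->buf_end - start;
	if (n > count)
		n = count;
	memcpy(dst, wal->file + start, n);
	return n;
}

/* Stand-in for a file read: all or nothing, returns 1 on success. */
static int
mock_read_from_file(const MockWal *wal, char *dst, size_t start, size_t count)
{
	if (start + count > wal->file_len)
		return 0;
	memcpy(dst, wal->file + start, count);
	return 1;
}

/*
 * Read "count" bytes at "start": buffers first, the remainder from the
 * file. Reports how many bytes were served from buffers.
 */
static int
read_wal(const MockWal *wal, char *dst, size_t start, size_t count,
		 size_t *from_buffers)
{
	size_t		rbytes = mock_read_from_buffers(wal, dst, start, count);

	assert(rbytes <= count);
	*from_buffers = rbytes;

	/* advance past what the buffers already supplied */
	dst += rbytes;
	start += rbytes;
	count -= rbytes;

	/* now read the remaining bytes, if any, from the file */
	if (count > 0 && !mock_read_from_file(wal, dst, start, count))
		return 0;

	return 1;
}
```

In this model a full buffer hit never touches the file, a miss behaves like the existing file-only path, and a partial hit splits the request at the point where the buffered data ends.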
While at it, I've also added a test module in the 0002 patch to
demonstrate two things: 1) how the caller can ensure the requested WAL
is fully copied to WAL buffers using WaitXLogInsertionsToFinish before
reading from WAL buffers, and 2) how one can implement an xlogreader
page_read callback to read not-yet-flushed WAL directly from WAL
buffers. FWIW, a separate test module to explicitly test the new
function was suggested here:
/messages/by-id/CAFiTN-sE7CJn-ZFj+-0Wv6TNytv_fp4n+eCszspxJ3mt77t5ig@mail.gmail.com.
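The two patterns above both reduce to small pieces of arithmetic on WAL positions. The following is a rough, self-contained sketch of that arithmetic, not the test module itself. Lsn, WAL_PAGE_SIZE, clamp_read_count and page_read_count are hypothetical names chosen for this example; WAL_PAGE_SIZE stands in for XLOG_BLCKSZ, and "inserted_upto" stands in for the position up to which insertions are known to be complete. The early return for a start position at or beyond "inserted_upto" is an extra guard assumed here for the example.

```c
#include <assert.h>
#include <stdint.h>

#define WAL_PAGE_SIZE 8192		/* stand-in for XLOG_BLCKSZ */

typedef uint64_t Lsn;			/* stand-in for XLogRecPtr */

/*
 * Pattern 1: clamp a request for [start, start + count) so that it never
 * extends past the position up to which WAL is fully copied to buffers.
 */
static uint64_t
clamp_read_count(Lsn start, uint64_t count, Lsn inserted_upto)
{
	Lsn			upto = start + count;

	/* nothing at or after "start" is complete yet (assumed guard) */
	if (inserted_upto <= start)
		return 0;

	if (upto > inserted_upto)
		upto = inserted_upto;

	return upto - start;
}

/*
 * Pattern 2: decide how many bytes a page_read-style callback should
 * return for the page at "page_ptr", given that WAL is readable up to
 * "read_upto". Returns -1 if fewer than "req_len" bytes are available.
 */
static int
page_read_count(Lsn page_ptr, int req_len, Lsn read_upto)
{
	assert(req_len >= 0 && req_len <= WAL_PAGE_SIZE);

	/* a whole page is available; return just that page */
	if (page_ptr + WAL_PAGE_SIZE <= read_upto)
		return WAL_PAGE_SIZE;

	/* not enough data there */
	if (page_ptr + (Lsn) req_len > read_upto)
		return -1;

	/* partial page, but enough to satisfy the request */
	return (int) (read_upto - page_ptr);
}
```

For example, a request for 50 bytes at position 100 with insertions complete up to 120 is clamped to 20 bytes, and a page starting at 8192 with 500 readable bytes satisfies a 100-byte request by returning 500.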
Please have a look at the attached patches.
I will register this for the next commit fest.
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Attachments:
v1-0001-Use-WALReadFromBuffers-in-more-places.patch (application/octet-stream)
From 4525c65690ce801fd67ebb52fe1a50adfa7b1f61 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 24 Apr 2024 15:21:48 +0000
Subject: [PATCH v1 1/2] Use WALReadFromBuffers in more places
---
src/backend/access/transam/xlogutils.c | 13 ++++++++++++-
src/backend/replication/walsender.c | 16 +++++++++++++---
2 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 5295b85fe0..1e1f5b5306 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -892,6 +892,8 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
int count;
WALReadError errinfo;
TimeLineID currTLI;
+ Size nbytes;
+ Size rbytes;
loc = targetPagePtr + reqLen;
@@ -1004,7 +1006,16 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
count = read_upto - targetPagePtr;
}
- if (!WALRead(state, cur_page, targetPagePtr, count, tli,
+ /* attempt to read WAL from WAL buffers first */
+ nbytes = count;
+ rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI);
+ cur_page += rbytes;
+ targetPagePtr += rbytes;
+ nbytes -= rbytes;
+
+ /* now read the remaining WAL from WAL file */
+ if (nbytes > 0 &&
+ !WALRead(state, cur_page, targetPagePtr, nbytes, tli,
&errinfo))
WALReadRaiseError(&errinfo);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9bf7c67f37..1ee3b94888 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1056,6 +1056,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
WALReadError errinfo;
XLogSegNo segno;
TimeLineID currTLI;
+ Size nbytes;
+ Size rbytes;
/*
* Make sure we have enough WAL available before retrieving the current
@@ -1092,11 +1094,19 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
else
count = flushptr - targetPagePtr; /* part of the page available */
- /* now actually read the data, we know it's there */
- if (!WALRead(state,
+ /* attempt to read WAL from WAL buffers first */
+ nbytes = count;
+ rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI);
+ cur_page += rbytes;
+ targetPagePtr += rbytes;
+ nbytes -= rbytes;
+
+ /* now read the remaining WAL from WAL file */
+ if (nbytes > 0 &&
+ !WALRead(state,
cur_page,
targetPagePtr,
- count,
+ nbytes,
currTLI, /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new TLI
* is needed. */
--
2.34.1
v1-0002-Add-test-module-to-demonstrate-reading-from-WAL-b.patch (application/octet-stream)
From c9d4900430d9c709f13abfcda74d14a4ff45b86b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 24 Apr 2024 15:22:12 +0000
Subject: [PATCH v1 2/2] Add test module to demonstrate reading from WAL
buffers patterns
This commit adds a test module to demonstrate a few patterns for
reading from WAL buffers using WALReadFromBuffers added by commit
91f2cae7a4e.
1. This module contains a test function to read the WAL that's
fully copied to WAL buffers. Whether or not the WAL is fully
copied to WAL buffers is ensured by WaitXLogInsertionsToFinish
before WALReadFromBuffers.
2. This module contains an implementation of xlogreader page_read
callback to read unflushed/not-yet-flushed WAL directly from WAL
buffers.
---
src/backend/access/transam/xlog.c | 3 +-
src/backend/access/transam/xlogreader.c | 3 +-
src/include/access/xlog.h | 1 +
src/test/modules/Makefile | 1 +
src/test/modules/meson.build | 1 +
.../modules/read_wal_from_buffers/.gitignore | 4 +
.../modules/read_wal_from_buffers/Makefile | 23 ++
.../modules/read_wal_from_buffers/meson.build | 33 ++
.../read_wal_from_buffers--1.0.sql | 37 ++
.../read_wal_from_buffers.c | 318 ++++++++++++++++++
.../read_wal_from_buffers.control | 4 +
.../read_wal_from_buffers/t/001_basic.pl | 111 ++++++
12 files changed, 536 insertions(+), 3 deletions(-)
create mode 100644 src/test/modules/read_wal_from_buffers/.gitignore
create mode 100644 src/test/modules/read_wal_from_buffers/Makefile
create mode 100644 src/test/modules/read_wal_from_buffers/meson.build
create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
create mode 100644 src/test/modules/read_wal_from_buffers/t/001_basic.pl
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 34a2c71812..f053ad5919 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -699,7 +699,6 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -1495,7 +1494,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
* uninitialized page), and the inserter might need to evict an old WAL buffer
* to make room for a new one, which in turn requires WALWriteLock.
*/
-static XLogRecPtr
+XLogRecPtr
WaitXLogInsertionsToFinish(XLogRecPtr upto)
{
uint64 bytepos;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 37d2a57961..12dddf64cc 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1033,7 +1033,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* record is. This is so that we can check the additional identification
* info that is present in the first page's "long" header.
*/
- if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
+ if (state->seg.ws_segno != 0 &&
+ targetSegNo != state->seg.ws_segno && targetPageOff != 0)
{
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 76787a8267..74606a6846 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -252,6 +252,7 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
extern void SetWalWriterSleeping(bool sleeping);
+extern XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
TimeLineID tli);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 256799f520..c39b407e5b 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -12,6 +12,7 @@ SUBDIRS = \
dummy_seclabel \
libpq_pipeline \
plsample \
+ read_wal_from_buffers \
spgist_name_ops \
test_bloomfilter \
test_copy_callbacks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index d8fe059d23..222fa1cd72 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -10,6 +10,7 @@ subdir('injection_points')
subdir('ldap_password_func')
subdir('libpq_pipeline')
subdir('plsample')
+subdir('read_wal_from_buffers')
subdir('spgist_name_ops')
subdir('ssl_passphrase_callback')
subdir('test_bloomfilter')
diff --git a/src/test/modules/read_wal_from_buffers/.gitignore b/src/test/modules/read_wal_from_buffers/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/read_wal_from_buffers/Makefile b/src/test/modules/read_wal_from_buffers/Makefile
new file mode 100644
index 0000000000..9e57a837f9
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/read_wal_from_buffers/Makefile
+
+MODULE_big = read_wal_from_buffers
+OBJS = \
+ $(WIN32RES) \
+ read_wal_from_buffers.o
+PGFILEDESC = "read_wal_from_buffers - test module to read WAL from WAL buffers"
+
+EXTENSION = read_wal_from_buffers
+DATA = read_wal_from_buffers--1.0.sql
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/read_wal_from_buffers
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/read_wal_from_buffers/meson.build b/src/test/modules/read_wal_from_buffers/meson.build
new file mode 100644
index 0000000000..3fac00d616
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+read_wal_from_buffers_sources = files(
+ 'read_wal_from_buffers.c',
+)
+
+if host_system == 'windows'
+ read_wal_from_buffers_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'read_wal_from_buffers',
+ '--FILEDESC', 'read_wal_from_buffers - test module to read WAL from WAL buffers',])
+endif
+
+read_wal_from_buffers = shared_module('read_wal_from_buffers',
+ read_wal_from_buffers_sources,
+ kwargs: pg_test_mod_args,
+)
+test_install_libs += read_wal_from_buffers
+
+test_install_data += files(
+ 'read_wal_from_buffers.control',
+ 'read_wal_from_buffers--1.0.sql',
+)
+
+tests += {
+ 'name': 'read_wal_from_buffers',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'tap': {
+ 'tests': [
+ 't/001_basic.pl',
+ ],
+ },
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
new file mode 100644
index 0000000000..72d05522fc
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
@@ -0,0 +1,37 @@
+/* src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION read_wal_from_buffers" to load this file. \quit
+
+--
+-- read_wal_from_buffers()
+--
+-- SQL function to read WAL from WAL buffers. Returns number of bytes read.
+--
+CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int,
+ bytes_read OUT int)
+AS 'MODULE_PATHNAME', 'read_wal_from_buffers'
+LANGUAGE C STRICT;
+
+--
+-- get_wal_records_info_from_buffers()
+--
+-- SQL function to get info of WAL records available in WAL buffers.
+--
+CREATE FUNCTION get_wal_records_info_from_buffers(IN start_lsn pg_lsn,
+ IN end_lsn pg_lsn,
+ OUT start_lsn pg_lsn,
+ OUT end_lsn pg_lsn,
+ OUT prev_lsn pg_lsn,
+ OUT xid xid,
+ OUT resource_manager text,
+ OUT record_type text,
+ OUT record_length int4,
+ OUT main_data_length int4,
+ OUT fpi_length int4,
+ OUT description text,
+ OUT block_ref text
+)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'get_wal_records_info_from_buffers'
+LANGUAGE C STRICT PARALLEL SAFE;
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
new file mode 100644
index 0000000000..ed33a14127
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
@@ -0,0 +1,318 @@
+/*--------------------------------------------------------------------------
+ *
+ * read_wal_from_buffers.c
+ * Test module to read WAL from WAL buffers.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+static int read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+ int reqLen, XLogRecPtr targetRecPtr,
+ char *cur_page);
+
+static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
+ bool *nulls, uint32 ncols);
+static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
+ XLogRecPtr start_lsn,
+ XLogRecPtr end_lsn);
+
+/*
+ * SQL function to read WAL from WAL buffers. Returns number of bytes read.
+ */
+PG_FUNCTION_INFO_V1(read_wal_from_buffers);
+Datum
+read_wal_from_buffers(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr startptr = PG_GETARG_LSN(0);
+ int32 count = PG_GETARG_INT32(1);
+ Size read;
+ char *data = palloc0(count);
+ XLogRecPtr upto = startptr + count;
+ XLogRecPtr insert_pos = GetXLogInsertRecPtr();
+ TimeLineID tli = GetWALInsertionTimeLine();
+
+ /*
+ * The requested WAL may be very recent, so wait for any in-progress WAL
+ * insertions to WAL buffers to finish.
+ */
+ if (upto > insert_pos)
+ {
+ XLogRecPtr writtenUpto = WaitXLogInsertionsToFinish(upto);
+
+ upto = Min(upto, writtenUpto);
+ count = upto - startptr;
+ }
+
+ read = WALReadFromBuffers(data, startptr, count, tli);
+
+ pfree(data);
+
+ PG_RETURN_INT32(read);
+}
+
+/*
+ * XLogReaderRoutine->page_read callback for reading WAL from WAL buffers.
+ */
+static int
+read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+ int reqLen, XLogRecPtr targetRecPtr,
+ char *cur_page)
+{
+ XLogRecPtr read_upto,
+ loc;
+ TimeLineID tli = GetWALInsertionTimeLine();
+ Size count;
+ Size read = 0;
+
+ loc = targetPagePtr + reqLen;
+
+ /* Loop waiting for xlog to be available if necessary */
+ while (1)
+ {
+ read_upto = GetXLogInsertRecPtr();
+
+ if (loc <= read_upto)
+ break;
+
+ WaitXLogInsertionsToFinish(loc);
+
+ CHECK_FOR_INTERRUPTS();
+ pg_usleep(1000L);
+ }
+
+ if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+ {
+ /*
+ * more than one block available; read only that block, have caller
+ * come back if they need more.
+ */
+ count = XLOG_BLCKSZ;
+ }
+ else if (targetPagePtr + reqLen > read_upto)
+ {
+ /* not enough data there */
+ return -1;
+ }
+ else
+ {
+ /* enough bytes available to satisfy the request */
+ count = read_upto - targetPagePtr;
+ }
+
+ /* read WAL from WAL buffers */
+ read = WALReadFromBuffers(cur_page, targetPagePtr, count, tli);
+
+ if (read != count)
+ ereport(ERROR,
+ errmsg("could not read fully from WAL buffers; expected %zu, read %zu",
+ count, read));
+
+ return count;
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ *
+ * This function and its helpers below are similar to pg_walinspect's
+ * pg_get_wal_records_info() except that it will get info of WAL records
+ * available in WAL buffers.
+ */
+PG_FUNCTION_INFO_V1(get_wal_records_info_from_buffers);
+Datum
+get_wal_records_info_from_buffers(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr start_lsn = PG_GETARG_LSN(0);
+ XLogRecPtr end_lsn = PG_GETARG_LSN(1);
+
+ /*
+ * Validate start and end LSNs coming from the function inputs.
+ *
+ * Reading WAL below the first page of the first segments isn't allowed.
+ * This is a bootstrap WAL page and the page_read callback fails to read
+ * it.
+ */
+ if (start_lsn < XLOG_BLCKSZ)
+ ereport(ERROR,
+ (errmsg("could not read WAL at LSN %X/%X",
+ LSN_FORMAT_ARGS(start_lsn))));
+
+ if (start_lsn > end_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("WAL start LSN must be less than end LSN")));
+
+ GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Read next WAL record.
+ */
+static XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+ XLogRecord *record;
+ char *errormsg;
+
+ record = XLogReadRecord(xlogreader, &errormsg);
+
+ if (record == NULL)
+ {
+ if (errormsg)
+ ereport(ERROR,
+ errmsg("could not read WAL at %X/%X: %s",
+ LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg));
+ else
+ ereport(ERROR,
+ errmsg("could not read WAL at %X/%X",
+ LSN_FORMAT_ARGS(xlogreader->EndRecPtr)));
+ }
+
+ return record;
+}
+
+/*
+ * Output values that make up a row describing caller's WAL record.
+ */
+static void
+GetWALRecordInfo(XLogReaderState *record, Datum *values,
+ bool *nulls, uint32 ncols)
+{
+ const char *record_type;
+ RmgrData desc;
+ uint32 fpi_len = 0;
+ StringInfoData rec_desc;
+ StringInfoData rec_blk_ref;
+ int i = 0;
+
+ desc = GetRmgr(XLogRecGetRmid(record));
+ record_type = desc.rm_identify(XLogRecGetInfo(record));
+
+ if (record_type == NULL)
+ record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+
+ initStringInfo(&rec_desc);
+ desc.rm_desc(&rec_desc, record);
+
+ if (XLogRecHasAnyBlockRefs(record))
+ {
+ initStringInfo(&rec_blk_ref);
+ XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
+ }
+
+ values[i++] = LSNGetDatum(record->ReadRecPtr);
+ values[i++] = LSNGetDatum(record->EndRecPtr);
+ values[i++] = LSNGetDatum(XLogRecGetPrev(record));
+ values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
+ values[i++] = CStringGetTextDatum(desc.rm_name);
+ values[i++] = CStringGetTextDatum(record_type);
+ values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
+ values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
+ values[i++] = UInt32GetDatum(fpi_len);
+
+ if (rec_desc.len > 0)
+ values[i++] = CStringGetTextDatum(rec_desc.data);
+ else
+ nulls[i++] = true;
+
+ if (XLogRecHasAnyBlockRefs(record))
+ values[i++] = CStringGetTextDatum(rec_blk_ref.data);
+ else
+ nulls[i++] = true;
+
+ Assert(i == ncols);
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ */
+static void
+GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
+ XLogRecPtr end_lsn)
+{
+#define GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS 11
+ XLogReaderState *xlogreader;
+ XLogRecPtr first_valid_record;
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ MemoryContext old_cxt;
+ MemoryContext tmp_cxt;
+
+ Assert(start_lsn <= end_lsn);
+
+ InitMaterializedSRF(fcinfo, 0);
+
+ xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &read_from_wal_buffers,
+ .segment_open = NULL,
+ .segment_close = NULL),
+ NULL);
+
+ if (xlogreader == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Failed while allocating a WAL reading processor.")));
+
+ /* first find a valid recptr to start from */
+ first_valid_record = XLogFindNextRecord(xlogreader, start_lsn);
+
+ if (XLogRecPtrIsInvalid(first_valid_record))
+ {
+ ereport(LOG,
+ (errmsg("could not find a valid record after %X/%X",
+ LSN_FORMAT_ARGS(start_lsn))));
+
+ return;
+ }
+
+ tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
+ "GetWALRecordsInfo temporary cxt",
+ ALLOCSET_DEFAULT_SIZES);
+
+ while (ReadNextXLogRecord(xlogreader) &&
+ xlogreader->EndRecPtr <= end_lsn)
+ {
+ Datum values[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+ bool nulls[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+
+ /* Use the tmp context so we can clean up after each tuple is done */
+ old_cxt = MemoryContextSwitchTo(tmp_cxt);
+
+ GetWALRecordInfo(xlogreader, values, nulls,
+ GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS);
+
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+ values, nulls);
+
+ /* clean up and switch back */
+ MemoryContextSwitchTo(old_cxt);
+ MemoryContextReset(tmp_cxt);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ MemoryContextDelete(tmp_cxt);
+ XLogReaderFree(xlogreader);
+
+#undef GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
new file mode 100644
index 0000000000..b14d24751c
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
@@ -0,0 +1,4 @@
+comment = 'Test module to read WAL from WAL buffers'
+default_version = '1.0'
+module_pathname = '$libdir/read_wal_from_buffers'
+relocatable = true
diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
new file mode 100644
index 0000000000..15ef550c8c
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
@@ -0,0 +1,111 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Setup a new node. The configuration chosen here minimizes the number
+# of arbitrary records that could get generated in a cluster. Enlarging
+# checkpoint_timeout avoids noise with checkpoint activity. wal_level
+# set to "minimal" avoids random standby snapshot records. Autovacuum
+# could also trigger randomly, generating random WAL activity of its own.
+# Enlarging wal_writer_delay and wal_writer_flush_after avoids background
+# WAL flushes by walwriter.
+my $node = PostgreSQL::Test::Cluster->new("node");
+$node->init;
+$node->append_conf(
+ 'postgresql.conf',
+ q[wal_level = minimal
+ autovacuum = off
+ checkpoint_timeout = '30min'
+ wal_writer_delay = 10000ms
+ wal_writer_flush_after = 1GB
+]);
+$node->start;
+
+# Setup.
+$node->safe_psql('postgres', 'CREATE EXTENSION read_wal_from_buffers;');
+
+$node->safe_psql('postgres', 'CREATE TABLE t (c int);');
+
+my $result = 0;
+my $lsn;
+my $to_read;
+
+# Wait until we read from WAL buffers
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+ # Get current insert LSN. After this, we generate some WAL which is guaranteed
+ # to be in WAL buffers as there is no other WAL generating activity
+ # happening on the server. We then verify if we can read the WAL from WAL
+ # buffers using this LSN.
+ $lsn =
+ $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+
+ my $logstart = -s $node->logfile;
+
+ # Generate minimal WAL so that WAL buffers don't get overwritten.
+ $node->safe_psql('postgres', "INSERT INTO t VALUES ($i);");
+
+ $to_read = 8192;
+
+ my $res = $node->safe_psql('postgres',
+ qq{SELECT read_wal_from_buffers(lsn := '$lsn', bytes_to_read := $to_read) > 0;}
+ );
+
+ my $log = $node->log_contains(
+ "request to flush past end of generated WAL; request .*, current position .*",
+ $logstart);
+
+ if ($res eq 't' && $log > 0)
+ {
+ $result = 1;
+ last;
+ }
+
+ usleep(100_000);
+}
+ok($result, 'waited until WAL is successfully read from WAL buffers');
+
+$result = 0;
+
+# Wait until we get info of WAL records available in WAL buffers.
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+ $node->safe_psql('postgres', "DROP TABLE IF EXISTS foo, bar;");
+ $node->safe_psql('postgres',
+ "CREATE TABLE foo AS SELECT * FROM generate_series(1, 2);");
+ my $start_lsn =
+ $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+ my $tbl_oid = $node->safe_psql('postgres',
+ "SELECT oid FROM pg_class WHERE relname = 'foo';");
+ $node->safe_psql('postgres',
+ "INSERT INTO foo SELECT * FROM generate_series(1, 10);");
+ my $end_lsn =
+ $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+ $node->safe_psql('postgres',
+ "CREATE TABLE bar AS SELECT * FROM generate_series(1, 2);");
+
+ my $res = $node->safe_psql(
+ 'postgres',
+ "SELECT count(*) FROM get_wal_records_info_from_buffers('$start_lsn', '$end_lsn')
+ WHERE block_ref LIKE concat('%', '$tbl_oid', '%') AND
+ resource_manager = 'Heap' AND
+ record_type = 'INSERT';");
+
+ if ($res eq 10)
+ {
+ $result = 1;
+ last;
+ }
+
+ usleep(100_000);
+}
+ok($result,
+ 'waited until we get info of WAL records available in WAL buffers.');
+
+done_testing();
--
2.34.1
Hi, Bharath. I've been testing this. It's cool. Is there any way we could
monitor the hit rate of reads served directly from WAL buffers, for example
by exposing it in a view?
---
Regards, Jingtang
On Wed, May 8, 2024 at 9:51 AM Jingtang Zhang <mrdrivingduck@gmail.com> wrote:
Hi, Bharath. I've been testing this. It's cool. Is there any way we could
monitor the hit rate of reads served directly from WAL buffers, for example
by exposing it in a view?
Thanks for looking into this. I used purpose-built patches for
verifying the WAL buffers hit ratio, please check
USE-ON-HEAD-Collect-WAL-read-from-file-stats.txt and
USE-ON-PATCH-Collect-WAL-read-from-buffers-and-file-stats.txt at
/messages/by-id/CALj2ACU9cfAcfVsGwUqXMace_7rfSBJ7+hXVJfVV1jnspTDGHQ@mail.gmail.com.
In the long run, it's better to extend what's proposed in the thread
/messages/by-id/CALj2ACU_f5_c8F+xyNR4HURjG=Jziiz07wCpQc=AqAJUFh7+8w@mail.gmail.com.
I haven't had a chance to dive deep into that thread yet, but a quick
glance over v8 patch tells me that it hasn't yet implemented the idea
of collecting WAL read stats for both walsenders and the backends
reading WAL. If that's done, we can extend it for WAL read from WAL
buffers.
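As a rough idea of what such monitoring would compute, here is a minimal, self-contained sketch. WalReadStats, wal_read_stats_record and wal_read_hit_ratio are hypothetical names made up for this example; they are not taken from the stats patches mentioned above or from any existing PostgreSQL view. The sketch only assumes that each read knows how many bytes were requested and how many of those came from WAL buffers.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical per-process counters; not an existing PostgreSQL struct. */
typedef struct WalReadStats
{
	uint64_t	bytes_from_buffers; /* bytes served from WAL buffers */
	uint64_t	bytes_from_file;	/* bytes that had to come from WAL files */
} WalReadStats;

/*
 * Account for one read of "requested" bytes, of which "from_buffers"
 * bytes were served from WAL buffers and the rest from the WAL file.
 */
static void
wal_read_stats_record(WalReadStats *stats, uint64_t requested,
					  uint64_t from_buffers)
{
	assert(from_buffers <= requested);

	stats->bytes_from_buffers += from_buffers;
	stats->bytes_from_file += requested - from_buffers;
}

/* Fraction of all bytes read that came from WAL buffers, in [0, 1]. */
static double
wal_read_hit_ratio(const WalReadStats *stats)
{
	uint64_t	total = stats->bytes_from_buffers + stats->bytes_from_file;

	if (total == 0)
		return 0.0;

	return (double) stats->bytes_from_buffers / (double) total;
}
```

Counting bytes rather than calls is a choice made for this sketch; it keeps partial hits (where only the first part of a request is found in WAL buffers) from being counted as either a full hit or a full miss.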
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Hi Bharath,
I spent some time examining the patch. Here are my observations from the review.
- I believe there’s no need for an extra variable ‘nbytes’ in this
context. We can reuse the ‘count’ variable for the same purpose.
If necessary, we might think about renaming ‘count’ to ‘nbytes’.
- The operations performed by logical_read_xlog_page() and
read_local_xlog_page_guts() are identical. It might be beneficial to
create a shared function to minimize code repetition.
Best Regards,
Nitin Jadhav
Azure Database for PostgreSQL
Microsoft
On Mon, May 13, 2024 at 12:17 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Wed, May 8, 2024 at 9:51 AM Jingtang Zhang <mrdrivingduck@gmail.com> wrote:
Hi, Bharath. I've been testing this. It's cool. Is there any way we could
monitor the hit rate of reads served directly from WAL buffers, for example
by exposing it in a view?
Thanks for looking into this. I used purpose-built patches for
verifying the WAL buffers hit ratio, please check
USE-ON-HEAD-Collect-WAL-read-from-file-stats.txt and
USE-ON-PATCH-Collect-WAL-read-from-buffers-and-file-stats.txt at
/messages/by-id/CALj2ACU9cfAcfVsGwUqXMace_7rfSBJ7+hXVJfVV1jnspTDGHQ@mail.gmail.com.
In the long run, it's better to extend what's proposed in the thread
/messages/by-id/CALj2ACU_f5_c8F+xyNR4HURjG=Jziiz07wCpQc=AqAJUFh7+8w@mail.gmail.com.
I haven't had a chance to dive deep into that thread yet, but a quick
glance over v8 patch tells me that it hasn't yet implemented the idea
of collecting WAL read stats for both walsenders and the backends
reading WAL. If that's done, we can extend it for WAL read from WAL
buffers.
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Hi,
On Sat, Jun 8, 2024 at 5:24 PM Nitin Jadhav <nitinjadhavpostgres@gmail.com>
wrote:
I spent some time examining the patch. Here are my observations from the
review.
Thanks.
- I believe there’s no need for an extra variable ‘nbytes’ in this
context. We can reuse the ‘count’ variable for the same purpose.
If necessary, we might think about renaming ‘count’ to ‘nbytes’.
The 'count' variable can't be altered once determined, because the page_read
callbacks need to return the total number of bytes read. However, I ended
up removing 'nbytes', as shown in the attached v2 patch.
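One detail of computing the remainder inline is worth spelling out. In the callbacks 'count' is an int while the value returned by WALReadFromBuffers is a Size, so on typical platforms an expression such as (count - rbytes) is evaluated in unsigned arithmetic and only behaves as intended while rbytes never exceeds count. The self-contained sketch below makes that assumption explicit; remaining_after_buffers is a hypothetical helper written for this example and is not part of the patch.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Bytes still to be read from the WAL file after "rbytes" of the
 * "count" requested bytes were served from WAL buffers.
 *
 * Assumes the buffer read never returns more than was requested; the
 * assertions document that assumption instead of relying on unsigned
 * wraparound.
 */
static size_t
remaining_after_buffers(int count, size_t rbytes)
{
	assert(count >= 0);
	assert(rbytes <= (size_t) count);

	return (size_t) count - rbytes;
}
```

With a helper like this the caller can leave 'count' untouched, pass the remainder to the file read only when it is non-zero, and still return 'count' as the total number of bytes read.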
- The operations performed by logical_read_xlog_page() and
read_local_xlog_page_guts() are identical. It might be beneficial to
create a shared function to minimize code repetition.
IMO, creating another function just to wrap two other function calls isn't
worthwhile.
I attached v2 patches for further review. No changes in 0002 patch.
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Attachments:
v2-0001-Use-WALReadFromBuffers-in-more-places.patch (application/x-patch)
From 6902b207cf7497396493aef369a9e275900a86e7 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 12 Jun 2024 14:46:16 +0000
Subject: [PATCH v2 1/2] Use WALReadFromBuffers in more places
Commit 91f2cae introduced WALReadFromBuffers and used it only for
physical replication walsenders. A couple of other callers that use
the read_local_xlog_page page_read callback, as well as logical
replication walsenders, can also benefit from reading WAL from WAL
buffers using the new function. This commit uses the new function
for these callers.
Author: Bharath Rupireddy
Reviewed-by: Jingtang Zhang, Nitin Jadhav
Discussion: https://www.postgresql.org/message-id/CALj2ACVfF2Uj9NoFy-5m98HNtjHpuD17EDE9twVeJng-jTAe7A%40mail.gmail.com
---
src/backend/access/transam/xlogutils.c | 10 +++++++++-
src/backend/replication/walsender.c | 13 ++++++++++---
2 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 5295b85fe0..24a7ef0479 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -892,6 +892,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
int count;
WALReadError errinfo;
TimeLineID currTLI;
+ Size rbytes;
loc = targetPagePtr + reqLen;
@@ -1004,7 +1005,14 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
count = read_upto - targetPagePtr;
}
- if (!WALRead(state, cur_page, targetPagePtr, count, tli,
+ /* attempt to read WAL from WAL buffers first */
+ rbytes = WALReadFromBuffers(cur_page, targetPagePtr, count, currTLI);
+ cur_page += rbytes;
+ targetPagePtr += rbytes;
+
+ /* now read the remaining WAL from WAL file */
+ if ((count - rbytes) > 0 &&
+ !WALRead(state, cur_page, targetPagePtr, (count - rbytes), tli,
&errinfo))
WALReadRaiseError(&errinfo);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c623b07cf0..bd0decef3d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1056,6 +1056,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
WALReadError errinfo;
XLogSegNo segno;
TimeLineID currTLI;
+ Size rbytes;
/*
* Make sure we have enough WAL available before retrieving the current
@@ -1092,11 +1093,17 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
else
count = flushptr - targetPagePtr; /* part of the page available */
- /* now actually read the data, we know it's there */
- if (!WALRead(state,
+ /* attempt to read WAL from WAL buffers first */
+ rbytes = WALReadFromBuffers(cur_page, targetPagePtr, count, currTLI);
+ cur_page += rbytes;
+ targetPagePtr += rbytes;
+
+ /* now read the remaining WAL from WAL file */
+ if ((count - rbytes) > 0 &&
+ !WALRead(state,
cur_page,
targetPagePtr,
- count,
+ (count - rbytes),
currTLI, /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new TLI
* is needed. */
--
2.34.1
v2-0002-Add-test-module-to-demonstrate-reading-from-WAL-b.patchapplication/x-patch; name=v2-0002-Add-test-module-to-demonstrate-reading-from-WAL-b.patchDownload
From 6aca7856208907d2fbde58d507e4a48319a614ad Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 12 Jun 2024 14:55:31 +0000
Subject: [PATCH v2 2/2] Add test module to demonstrate reading from WAL
buffers patterns.
This commit adds a test module to demonstrate a few patterns for
reading from WAL buffers using WALReadFromBuffers added by commit
91f2cae7a4e.
1. This module contains a test function to read the WAL that's
fully copied to WAL buffers. Whether or not the WAL is fully
copied to WAL buffers is ensured by WaitXLogInsertionsToFinish
before WALReadFromBuffers.
2. This module contains an implementation of xlogreader page_read
callback to read unflushed/not-yet-flushed WAL directly from WAL
buffers.
Author: Bharath Rupireddy
Discussion: https://www.postgresql.org/message-id/CAFiTN-sE7CJn-ZFj%2B-0Wv6TNytv_fp4n%2BeCszspxJ3mt77t5ig%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CALj2ACVfF2Uj9NoFy-5m98HNtjHpuD17EDE9twVeJng-jTAe7A%40mail.gmail.com
---
src/backend/access/transam/xlog.c | 3 +-
src/backend/access/transam/xlogreader.c | 3 +-
src/include/access/xlog.h | 1 +
src/test/modules/Makefile | 1 +
src/test/modules/meson.build | 1 +
.../modules/read_wal_from_buffers/.gitignore | 4 +
.../modules/read_wal_from_buffers/Makefile | 23 ++
.../modules/read_wal_from_buffers/meson.build | 33 ++
.../read_wal_from_buffers--1.0.sql | 37 ++
.../read_wal_from_buffers.c | 318 ++++++++++++++++++
.../read_wal_from_buffers.control | 4 +
.../read_wal_from_buffers/t/001_basic.pl | 111 ++++++
12 files changed, 536 insertions(+), 3 deletions(-)
create mode 100644 src/test/modules/read_wal_from_buffers/.gitignore
create mode 100644 src/test/modules/read_wal_from_buffers/Makefile
create mode 100644 src/test/modules/read_wal_from_buffers/meson.build
create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
create mode 100644 src/test/modules/read_wal_from_buffers/t/001_basic.pl
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 330e058c5f..7e2f787743 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -699,7 +699,6 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -1495,7 +1494,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
* uninitialized page), and the inserter might need to evict an old WAL buffer
* to make room for a new one, which in turn requires WALWriteLock.
*/
-static XLogRecPtr
+XLogRecPtr
WaitXLogInsertionsToFinish(XLogRecPtr upto)
{
uint64 bytepos;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 37d2a57961..12dddf64cc 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1033,7 +1033,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* record is. This is so that we can check the additional identification
* info that is present in the first page's "long" header.
*/
- if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
+ if (state->seg.ws_segno != 0 &&
+ targetSegNo != state->seg.ws_segno && targetPageOff != 0)
{
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1a1f11a943..36bf90cf58 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -252,6 +252,7 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
extern void SetWalWriterSleeping(bool sleeping);
+extern XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
TimeLineID tli);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 256799f520..c39b407e5b 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -12,6 +12,7 @@ SUBDIRS = \
dummy_seclabel \
libpq_pipeline \
plsample \
+ read_wal_from_buffers \
spgist_name_ops \
test_bloomfilter \
test_copy_callbacks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index d8fe059d23..222fa1cd72 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -10,6 +10,7 @@ subdir('injection_points')
subdir('ldap_password_func')
subdir('libpq_pipeline')
subdir('plsample')
+subdir('read_wal_from_buffers')
subdir('spgist_name_ops')
subdir('ssl_passphrase_callback')
subdir('test_bloomfilter')
diff --git a/src/test/modules/read_wal_from_buffers/.gitignore b/src/test/modules/read_wal_from_buffers/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/read_wal_from_buffers/Makefile b/src/test/modules/read_wal_from_buffers/Makefile
new file mode 100644
index 0000000000..9e57a837f9
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/read_wal_from_buffers/Makefile
+
+MODULE_big = read_wal_from_buffers
+OBJS = \
+ $(WIN32RES) \
+ read_wal_from_buffers.o
+PGFILEDESC = "read_wal_from_buffers - test module to read WAL from WAL buffers"
+
+EXTENSION = read_wal_from_buffers
+DATA = read_wal_from_buffers--1.0.sql
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/read_wal_from_buffers
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/read_wal_from_buffers/meson.build b/src/test/modules/read_wal_from_buffers/meson.build
new file mode 100644
index 0000000000..3fac00d616
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+read_wal_from_buffers_sources = files(
+ 'read_wal_from_buffers.c',
+)
+
+if host_system == 'windows'
+ read_wal_from_buffers_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'read_wal_from_buffers',
+ '--FILEDESC', 'read_wal_from_buffers - test module to read WAL from WAL buffers',])
+endif
+
+read_wal_from_buffers = shared_module('read_wal_from_buffers',
+ read_wal_from_buffers_sources,
+ kwargs: pg_test_mod_args,
+)
+test_install_libs += read_wal_from_buffers
+
+test_install_data += files(
+ 'read_wal_from_buffers.control',
+ 'read_wal_from_buffers--1.0.sql',
+)
+
+tests += {
+ 'name': 'read_wal_from_buffers',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'tap': {
+ 'tests': [
+ 't/001_basic.pl',
+ ],
+ },
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
new file mode 100644
index 0000000000..72d05522fc
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
@@ -0,0 +1,37 @@
+/* src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION read_wal_from_buffers" to load this file. \quit
+
+--
+-- read_wal_from_buffers()
+--
+-- SQL function to read WAL from WAL buffers. Returns number of bytes read.
+--
+CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int,
+ bytes_read OUT int)
+AS 'MODULE_PATHNAME', 'read_wal_from_buffers'
+LANGUAGE C STRICT;
+
+--
+-- get_wal_records_info_from_buffers()
+--
+-- SQL function to get info of WAL records available in WAL buffers.
+--
+CREATE FUNCTION get_wal_records_info_from_buffers(IN start_lsn pg_lsn,
+ IN end_lsn pg_lsn,
+ OUT start_lsn pg_lsn,
+ OUT end_lsn pg_lsn,
+ OUT prev_lsn pg_lsn,
+ OUT xid xid,
+ OUT resource_manager text,
+ OUT record_type text,
+ OUT record_length int4,
+ OUT main_data_length int4,
+ OUT fpi_length int4,
+ OUT description text,
+ OUT block_ref text
+)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'get_wal_records_info_from_buffers'
+LANGUAGE C STRICT PARALLEL SAFE;
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
new file mode 100644
index 0000000000..ed33a14127
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
@@ -0,0 +1,318 @@
+/*--------------------------------------------------------------------------
+ *
+ * read_wal_from_buffers.c
+ * Test module to read WAL from WAL buffers.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+static int read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+ int reqLen, XLogRecPtr targetRecPtr,
+ char *cur_page);
+
+static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
+ bool *nulls, uint32 ncols);
+static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
+ XLogRecPtr start_lsn,
+ XLogRecPtr end_lsn);
+
+/*
+ * SQL function to read WAL from WAL buffers. Returns number of bytes read.
+ */
+PG_FUNCTION_INFO_V1(read_wal_from_buffers);
+Datum
+read_wal_from_buffers(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr startptr = PG_GETARG_LSN(0);
+ int32 count = PG_GETARG_INT32(1);
+ Size read;
+ char *data = palloc0(count);
+ XLogRecPtr upto = startptr + count;
+ XLogRecPtr insert_pos = GetXLogInsertRecPtr();
+ TimeLineID tli = GetWALInsertionTimeLine();
+
+ /*
+ * The requested WAL may be very recent, so wait for any in-progress WAL
+ * insertions to WAL buffers to finish.
+ */
+ if (upto > insert_pos)
+ {
+ XLogRecPtr writtenUpto = WaitXLogInsertionsToFinish(upto);
+
+ upto = Min(upto, writtenUpto);
+ count = upto - startptr;
+ }
+
+ read = WALReadFromBuffers(data, startptr, count, tli);
+
+ pfree(data);
+
+ PG_RETURN_INT32(read);
+}
+
+/*
+ * XLogReaderRoutine->page_read callback for reading WAL from WAL buffers.
+ */
+static int
+read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+ int reqLen, XLogRecPtr targetRecPtr,
+ char *cur_page)
+{
+ XLogRecPtr read_upto,
+ loc;
+ TimeLineID tli = GetWALInsertionTimeLine();
+ Size count;
+ Size read = 0;
+
+ loc = targetPagePtr + reqLen;
+
+ /* Loop waiting for xlog to be available if necessary */
+ while (1)
+ {
+ read_upto = GetXLogInsertRecPtr();
+
+ if (loc <= read_upto)
+ break;
+
+ WaitXLogInsertionsToFinish(loc);
+
+ CHECK_FOR_INTERRUPTS();
+ pg_usleep(1000L);
+ }
+
+ if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+ {
+ /*
+ * more than one block available; read only that block, have caller
+ * come back if they need more.
+ */
+ count = XLOG_BLCKSZ;
+ }
+ else if (targetPagePtr + reqLen > read_upto)
+ {
+ /* not enough data there */
+ return -1;
+ }
+ else
+ {
+ /* enough bytes available to satisfy the request */
+ count = read_upto - targetPagePtr;
+ }
+
+ /* read WAL from WAL buffers */
+ read = WALReadFromBuffers(cur_page, targetPagePtr, count, tli);
+
+ if (read != count)
+ ereport(ERROR,
+ errmsg("could not read fully from WAL buffers; expected %zu, read %zu",
+ count, read));
+
+ return count;
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ *
+ * This function and its helpers below are similar to pg_walinspect's
+ * pg_get_wal_records_info() except that it will get info of WAL records
+ * available in WAL buffers.
+ */
+PG_FUNCTION_INFO_V1(get_wal_records_info_from_buffers);
+Datum
+get_wal_records_info_from_buffers(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr start_lsn = PG_GETARG_LSN(0);
+ XLogRecPtr end_lsn = PG_GETARG_LSN(1);
+
+ /*
+ * Validate start and end LSNs coming from the function inputs.
+ *
+ * Reading WAL below the first page of the first segments isn't allowed.
+ * This is a bootstrap WAL page and the page_read callback fails to read
+ * it.
+ */
+ if (start_lsn < XLOG_BLCKSZ)
+ ereport(ERROR,
+ (errmsg("could not read WAL at LSN %X/%X",
+ LSN_FORMAT_ARGS(start_lsn))));
+
+ if (start_lsn > end_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("WAL start LSN must be less than end LSN")));
+
+ GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Read next WAL record.
+ */
+static XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+ XLogRecord *record;
+ char *errormsg;
+
+ record = XLogReadRecord(xlogreader, &errormsg);
+
+ if (record == NULL)
+ {
+ if (errormsg)
+ ereport(ERROR,
+ errmsg("could not read WAL at %X/%X: %s",
+ LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg));
+ else
+ ereport(ERROR,
+ errmsg("could not read WAL at %X/%X",
+ LSN_FORMAT_ARGS(xlogreader->EndRecPtr)));
+ }
+
+ return record;
+}
+
+/*
+ * Output values that make up a row describing caller's WAL record.
+ */
+static void
+GetWALRecordInfo(XLogReaderState *record, Datum *values,
+ bool *nulls, uint32 ncols)
+{
+ const char *record_type;
+ RmgrData desc;
+ uint32 fpi_len = 0;
+ StringInfoData rec_desc;
+ StringInfoData rec_blk_ref;
+ int i = 0;
+
+ desc = GetRmgr(XLogRecGetRmid(record));
+ record_type = desc.rm_identify(XLogRecGetInfo(record));
+
+ if (record_type == NULL)
+ record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+
+ initStringInfo(&rec_desc);
+ desc.rm_desc(&rec_desc, record);
+
+ if (XLogRecHasAnyBlockRefs(record))
+ {
+ initStringInfo(&rec_blk_ref);
+ XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
+ }
+
+ values[i++] = LSNGetDatum(record->ReadRecPtr);
+ values[i++] = LSNGetDatum(record->EndRecPtr);
+ values[i++] = LSNGetDatum(XLogRecGetPrev(record));
+ values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
+ values[i++] = CStringGetTextDatum(desc.rm_name);
+ values[i++] = CStringGetTextDatum(record_type);
+ values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
+ values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
+ values[i++] = UInt32GetDatum(fpi_len);
+
+ if (rec_desc.len > 0)
+ values[i++] = CStringGetTextDatum(rec_desc.data);
+ else
+ nulls[i++] = true;
+
+ if (XLogRecHasAnyBlockRefs(record))
+ values[i++] = CStringGetTextDatum(rec_blk_ref.data);
+ else
+ nulls[i++] = true;
+
+ Assert(i == ncols);
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ */
+static void
+GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
+ XLogRecPtr end_lsn)
+{
+#define GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS 11
+ XLogReaderState *xlogreader;
+ XLogRecPtr first_valid_record;
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ MemoryContext old_cxt;
+ MemoryContext tmp_cxt;
+
+ Assert(start_lsn <= end_lsn);
+
+ InitMaterializedSRF(fcinfo, 0);
+
+ xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &read_from_wal_buffers,
+ .segment_open = NULL,
+ .segment_close = NULL),
+ NULL);
+
+ if (xlogreader == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Failed while allocating a WAL reading processor.")));
+
+ /* first find a valid recptr to start from */
+ first_valid_record = XLogFindNextRecord(xlogreader, start_lsn);
+
+ if (XLogRecPtrIsInvalid(first_valid_record))
+ {
+ ereport(LOG,
+ (errmsg("could not find a valid record after %X/%X",
+ LSN_FORMAT_ARGS(start_lsn))));
+
+ return;
+ }
+
+ tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
+ "GetWALRecordsInfo temporary cxt",
+ ALLOCSET_DEFAULT_SIZES);
+
+ while (ReadNextXLogRecord(xlogreader) &&
+ xlogreader->EndRecPtr <= end_lsn)
+ {
+ Datum values[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+ bool nulls[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+
+ /* Use the tmp context so we can clean up after each tuple is done */
+ old_cxt = MemoryContextSwitchTo(tmp_cxt);
+
+ GetWALRecordInfo(xlogreader, values, nulls,
+ GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS);
+
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+ values, nulls);
+
+ /* clean up and switch back */
+ MemoryContextSwitchTo(old_cxt);
+ MemoryContextReset(tmp_cxt);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ MemoryContextDelete(tmp_cxt);
+ XLogReaderFree(xlogreader);
+
+#undef GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
new file mode 100644
index 0000000000..b14d24751c
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
@@ -0,0 +1,4 @@
+comment = 'Test module to read WAL from WAL buffers'
+default_version = '1.0'
+module_pathname = '$libdir/read_wal_from_buffers'
+relocatable = true
diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
new file mode 100644
index 0000000000..15ef550c8c
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
@@ -0,0 +1,111 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Setup a new node. The configuration chosen here minimizes the number
+# of arbitrary records that could get generated in a cluster. Enlarging
+# checkpoint_timeout avoids noise with checkpoint activity. wal_level
+# set to "minimal" avoids random standby snapshot records. Autovacuum
+# could also trigger randomly, generating random WAL activity of its own.
+# Enlarging wal_writer_delay and wal_writer_flush_after avoids background
+# WAL flushes by walwriter.
+my $node = PostgreSQL::Test::Cluster->new("node");
+$node->init;
+$node->append_conf(
+ 'postgresql.conf',
+ q[wal_level = minimal
+ autovacuum = off
+ checkpoint_timeout = '30min'
+ wal_writer_delay = 10000ms
+ wal_writer_flush_after = 1GB
+]);
+$node->start;
+
+# Setup.
+$node->safe_psql('postgres', 'CREATE EXTENSION read_wal_from_buffers;');
+
+$node->safe_psql('postgres', 'CREATE TABLE t (c int);');
+
+my $result = 0;
+my $lsn;
+my $to_read;
+
+# Wait until we read from WAL buffers
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+ # Get current insert LSN. After this, we generate some WAL which is guaranteed
+ # to be in WAL buffers as there is no other WAL-generating activity
+ # happening on the server. We then verify if we can read the WAL from WAL
+ # buffers using this LSN.
+ $lsn =
+ $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+
+ my $logstart = -s $node->logfile;
+
+ # Generate minimal WAL so that WAL buffers don't get overwritten.
+ $node->safe_psql('postgres', "INSERT INTO t VALUES ($i);");
+
+ $to_read = 8192;
+
+ my $res = $node->safe_psql('postgres',
+ qq{SELECT read_wal_from_buffers(lsn := '$lsn', bytes_to_read := $to_read) > 0;}
+ );
+
+ my $log = $node->log_contains(
+ "request to flush past end of generated WAL; request .*, current position .*",
+ $logstart);
+
+ if ($res eq 't' && $log > 0)
+ {
+ $result = 1;
+ last;
+ }
+
+ usleep(100_000);
+}
+ok($result, 'waited until WAL is successfully read from WAL buffers');
+
+$result = 0;
+
+# Wait until we get info of WAL records available in WAL buffers.
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+ $node->safe_psql('postgres', "DROP TABLE IF EXISTS foo, bar;");
+ $node->safe_psql('postgres',
+ "CREATE TABLE foo AS SELECT * FROM generate_series(1, 2);");
+ my $start_lsn =
+ $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+ my $tbl_oid = $node->safe_psql('postgres',
+ "SELECT oid FROM pg_class WHERE relname = 'foo';");
+ $node->safe_psql('postgres',
+ "INSERT INTO foo SELECT * FROM generate_series(1, 10);");
+ my $end_lsn =
+ $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+ $node->safe_psql('postgres',
+ "CREATE TABLE bar AS SELECT * FROM generate_series(1, 2);");
+
+ my $res = $node->safe_psql(
+ 'postgres',
+ "SELECT count(*) FROM get_wal_records_info_from_buffers('$start_lsn', '$end_lsn')
+ WHERE block_ref LIKE concat('%', '$tbl_oid', '%') AND
+ resource_manager = 'Heap' AND
+ record_type = 'INSERT';");
+
+ if ($res eq 10)
+ {
+ $result = 1;
+ last;
+ }
+
+ usleep(100_000);
+}
+ok($result,
+ 'waited until we get info of WAL records available in WAL buffers.');
+
+done_testing();
--
2.34.1
Hi all.
I've been back to this patch for a while recently. I witness that if a WAL
writer works fast, the already flushed WAL buffers will be zeroed out and
re-initialized for future use by AdvanceXLInsertBuffer in
XLogBackgroundFlush, so that WALReadFromBuffers will miss even though the
space of WAL buffer is enough. It is much more unfriendly for logical
walsenders than physical walsenders, because logical ones consume WAL
slower than physical ones due to the extra decoding phase. Seems that the aim
of AdvanceXLInsertBuffer in WAL writer contradicts with our reading from
WAL buffer. Any thoughts?
Hi,
On Tue, Oct 15, 2024 at 1:22 AM Jingtang Zhang <mrdrivingduck@gmail.com> wrote:
I've been back to this patch for a while recently. I witness that if a WAL
writer works fast, the already flushed WAL buffers will be zeroed out and
re-initialized for future use by AdvanceXLInsertBuffer in
XLogBackgroundFlush, so that WALReadFromBuffers will miss even though the
space of WAL buffer is enough. It is much more unfriendly for logical
walsenders than physical walsenders, because logical ones consume WAL
slower than physical ones due to the extra decoding phase. Seems that the aim
of AdvanceXLInsertBuffer in WAL writer contradicts with our reading from
WAL buffer. Any thoughts?
Thanks for looking at this. Yes, the WAL writers can zero out flushed
buffers before WALReadFromBuffers gets to them. However,
WALReadFromBuffers was intentionally designed as an opportunistic
optimization - it's a "try this first, quickly" approach before
falling back to reading from WAL files. The no-locks design ensures it
never gets in the way of backends generating WAL, which is critical
for overall system performance.
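As a rough illustration of the "try buffers first, then fall back to
the file" pattern described above, here is a minimal, self-contained C
sketch. WalSource, read_from_buffers, read_from_file and read_wal are
hypothetical stand-ins invented for this example; they are not
PostgreSQL functions, and the real WALReadFromBuffers and WALRead have
different signatures and error handling.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical model: a flat "WAL file" plus a window still held in buffers. */
typedef struct WalSource
{
	const char *file;			/* simulated on-disk WAL contents */
	size_t		file_len;		/* number of valid bytes in file */
	size_t		cached_from;	/* offsets [cached_from, cached_to) are */
	size_t		cached_to;		/* assumed to still be in WAL buffers */
} WalSource;

/* Opportunistic read: returns bytes copied (possibly 0), never fails. */
static size_t
read_from_buffers(const WalSource *src, char *dst, size_t start, size_t count)
{
	size_t		n;

	if (start < src->cached_from || start >= src->cached_to)
		return 0;				/* miss: caller falls back to the file */

	n = src->cached_to - start;
	if (n > count)
		n = count;
	memcpy(dst, src->file + start, n);
	return n;
}

/* Authoritative read: returns 1 on success, 0 if the range is unavailable. */
static int
read_from_file(const WalSource *src, char *dst, size_t start, size_t count)
{
	if (start > src->file_len || count > src->file_len - start)
		return 0;
	memcpy(dst, src->file + start, count);
	return 1;
}

/* Try the buffers first, then read only the remainder from the file. */
static int
read_wal(const WalSource *src, char *dst, size_t start, size_t count)
{
	size_t		nread;

	assert(src != NULL && dst != NULL);

	nread = read_from_buffers(src, dst, start, count);
	if (nread < count &&
		!read_from_file(src, dst + nread, start + nread, count - nread))
		return 0;
	return 1;
}
```

A buffer miss is not an error in this model; it only means the file
read has to cover the whole request.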
I rebased and attached the v3 patch. I discarded the test extension
patch that demonstrated WALReadFromBuffers' behavior (i.e., waiting
for WAL to be fully copied to WAL buffers with
WaitXLogInsertionsToFinish), as I believe the comment at the top of
WALReadFromBuffers is sufficient documentation. I can reintroduce the
test extension if there's interest.
Thoughts?
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Attachments:
v3-0001-Use-WALReadFromBuffers-in-more-places.patchapplication/octet-stream; name=v3-0001-Use-WALReadFromBuffers-in-more-places.patchDownload
From 60aa2e18df060fcc341847585aa8cd4dcddea5ed Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <rupiredd@amazon.com>
Date: Sun, 14 Sep 2025 04:54:12 +0000
Subject: [PATCH v3] Use WALReadFromBuffers in more places
Commit 91f2cae introduced WALReadFromBuffers but used it only for
physical replication walsenders. There are several other callers
that use the read_local_xlog_page page_read callback, and logical
replication walsenders can also benefit from reading WAL from WAL
buffers using the new function. This commit extends the use of
WALReadFromBuffers to these callers.
Author: Bharath Rupireddy
Reviewed-by: Jingtang Zhang, Nitin Jadhav
Discussion: https://www.postgresql.org/message-id/CALj2ACVfF2Uj9NoFy-5m98HNtjHpuD17EDE9twVeJng-jTAe7A%40mail.gmail.com
---
src/backend/access/transam/xlogutils.c | 23 +++++++-
src/backend/replication/walsender.c | 77 +++++++++++++++++---------
2 files changed, 70 insertions(+), 30 deletions(-)
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 38176d9688e..3e5b4f75feb 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -876,6 +876,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
int count;
WALReadError errinfo;
TimeLineID currTLI;
+ Size bytesRead;
loc = targetPagePtr + reqLen;
@@ -995,9 +996,25 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
count = read_upto - targetPagePtr;
}
- if (!WALRead(state, cur_page, targetPagePtr, count, tli,
- &errinfo))
- WALReadRaiseError(&errinfo);
+ /* First attempt to read from WAL buffers */
+ bytesRead = WALReadFromBuffers(cur_page, targetPagePtr, count, currTLI);
+
+ /* If we still have bytes to read, get them from WAL file */
+ if (bytesRead < count)
+ {
+ if (!WALRead(state,
+ cur_page + bytesRead,
+ targetPagePtr + bytesRead,
+ count - bytesRead,
+ tli,
+ &errinfo))
+ {
+ WALReadRaiseError(&errinfo);
+ }
+ bytesRead = count; /* All requested bytes read */
+ }
+
+ Assert(bytesRead == count);
/* number of valid bytes in the buffer */
return count;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 59822f22b8d..937936c3550 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1036,6 +1036,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
WALReadError errinfo;
XLogSegNo segno;
TimeLineID currTLI;
+ Size bytesRead;
/*
* Make sure we have enough WAL available before retrieving the current
@@ -1073,16 +1074,29 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
else
count = flushptr - targetPagePtr; /* part of the page available */
- /* now actually read the data, we know it's there */
- if (!WALRead(state,
- cur_page,
- targetPagePtr,
- count,
- currTLI, /* Pass the current TLI because only
+ /* First attempt to read from WAL buffers */
+ bytesRead = WALReadFromBuffers(cur_page, targetPagePtr, count, currTLI);
+
+ targetPagePtr += bytesRead;
+
+ /* If we still have bytes to read, get them from WAL file */
+ if (bytesRead < count)
+ {
+ if (!WALRead(state,
+ cur_page + bytesRead,
+ targetPagePtr,
+ count - bytesRead,
+ currTLI, /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new TLI
* is needed. */
- &errinfo))
- WALReadRaiseError(&errinfo);
+ &errinfo))
+ {
+ WALReadRaiseError(&errinfo);
+ }
+ bytesRead = count; /* All requested bytes read */
+ }
+
+ Assert(bytesRead == count);
/*
* After reading into the buffer, check that what we read was valid. We do
@@ -3176,7 +3190,7 @@ XLogSendPhysical(void)
Size nbytes;
XLogSegNo segno;
WALReadError errinfo;
- Size rbytes;
+ Size bytesRead;
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
@@ -3392,24 +3406,33 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes);
retry:
- /* attempt to read WAL from WAL buffers first */
- rbytes = WALReadFromBuffers(&output_message.data[output_message.len],
- startptr, nbytes, xlogreader->seg.ws_tli);
- output_message.len += rbytes;
- startptr += rbytes;
- nbytes -= rbytes;
-
- /* now read the remaining WAL from WAL file */
- if (nbytes > 0 &&
- !WALRead(xlogreader,
- &output_message.data[output_message.len],
- startptr,
- nbytes,
- xlogreader->seg.ws_tli, /* Pass the current TLI because
- * only WalSndSegmentOpen controls
- * whether new TLI is needed. */
- &errinfo))
- WALReadRaiseError(&errinfo);
+ /* First attempt to read from WAL buffers */
+ bytesRead = WALReadFromBuffers(&output_message.data[output_message.len],
+ startptr,
+ nbytes,
+ xlogreader->seg.ws_tli);
+
+ startptr += bytesRead;
+
+ /* If we still have bytes to read, get them from WAL file */
+ if (bytesRead < nbytes)
+ {
+ if (!WALRead(xlogreader,
+ &output_message.data[output_message.len + bytesRead],
+ startptr,
+ nbytes - bytesRead,
+ xlogreader->seg.ws_tli, /* Pass the current TLI
+ * because only
+ * WalSndSegmentOpen controls
+ * whether new TLI is needed. */
+ &errinfo))
+ {
+ WALReadRaiseError(&errinfo);
+ }
+ bytesRead = nbytes; /* All requested bytes read */
+ }
+
+ Assert(bytesRead == nbytes);
/* See logical_read_xlog_page(). */
XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
--
2.47.3
Hi Bharath,
Hi,
Commit 91f2cae7a4e that introduced WALReadFromBuffers only used it for
physical walsenders. It can also be used in more places benefitting
logical walsenders, backends running pg_walinspect and logical
decoding functions if the WAL is available in WAL buffers. I'm
attaching a 0001 patch for this.
Thank you for working on this. It seems like a useful optimization. Do you
have any information on how much this improves the performance of the new
callers of WALReadFromBuffers?
Regarding the v3 version of the patch, do you also intend to include other
callers of WALRead, such as walsummarizer.c and pg_waldump.c?
In WALReadFromBuffers, the buffer scan currently stops when it encounters a
buffer that doesn't have the needed WAL page. However, it's possible to
continue scanning past the missing page and find other relevant pages further
along. By keeping track of which pages are missing, we could read only those
specific pages from files, instead of reading everything after the first
missing page. I am wondering if this was taken into account during the design
of the function.
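To make the bookkeeping in that idea concrete, here is a small C sketch
that turns per-page hit/miss flags into runs of consecutive missing
pages, which is what a caller would then fetch from WAL files. PageRun
and collect_missing_runs are hypothetical names made up for this
example. It only illustrates the tracking step and says nothing about
whether the lock-free scan in WALReadFromBuffers could safely be
extended this way.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* A run of consecutive pages that were not found in WAL buffers. */
typedef struct PageRun
{
	size_t		first;			/* index of the first missing page */
	size_t		npages;			/* number of consecutive missing pages */
} PageRun;

/*
 * Given one flag per requested page (true = found in buffers), collect the
 * runs of missing pages into runs[]. Returns the number of runs stored,
 * which is at most max_runs.
 */
static size_t
collect_missing_runs(const bool *in_buffers, size_t npages,
					 PageRun *runs, size_t max_runs)
{
	size_t		nruns = 0;
	size_t		i = 0;

	assert(in_buffers != NULL && runs != NULL);

	while (i < npages && nruns < max_runs)
	{
		size_t		start;

		if (in_buffers[i])
		{
			i++;
			continue;
		}

		/* extend the run until the next page that was a hit */
		start = i;
		while (i < npages && !in_buffers[i])
			i++;

		runs[nruns].first = start;
		runs[nruns].npages = i - start;
		nruns++;
	}

	return nruns;
}
```

Coalescing into runs keeps the number of file reads small when misses
are clustered, at the cost of scanning every requested page up front.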
Thank you,
Rahila Syed
Hi~
Thanks for looking at this. Yes, the WAL writers can zero out flushed
buffers before WALReadFromBuffers gets to them. However,
WALReadFromBuffers was intentionally designed as an opportunistic
optimization - it's a "try this first, quickly" approach before
falling back to reading from WAL files. The no-locks design ensures it
never gets in the way of backends generating WAL, which is critical
for overall system performance.
Yes, it is actually an interesting point, beyond the current topic. Since
we are using buffered I/O, even if we cannot read from WAL buffers due
to the opportunistic AdvanceXLInsertBuffer by the WAL writer, a later WALRead
may still find the page in the OS page cache with high probability, because
the page has just been written out. So WALRead will be fast, too.
But if we move to direct I/O some day in the future, the cost difference
between WALReadFromBuffers and WALRead might become noticeable. Maybe the
opportunistic WAL buffer initialization could keep a small ratio of old pages
inside WAL buffers so these pages can still be hit by WALReadFromBuffers.
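One way to picture the "keep a small ratio of old pages" suggestion is
the arithmetic below. This is purely a sketch under assumptions:
pages_to_preinitialize and keep_ratio are hypothetical names, no such
setting exists in PostgreSQL, and how the WAL writer would actually
choose pages is left open.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical policy: out of 'reusable' already-flushed pages, leave up to
 * nbuffers * keep_ratio of them untouched so readers can still hit them,
 * and pre-initialize only the rest.
 */
static size_t
pages_to_preinitialize(size_t nbuffers, size_t reusable, double keep_ratio)
{
	size_t		keep;

	assert(keep_ratio >= 0.0 && keep_ratio <= 1.0);

	keep = (size_t) ((double) nbuffers * keep_ratio);
	return (reusable > keep) ? reusable - keep : 0;
}
```

With 16 buffers, 10 reusable pages and a ratio of 0.25, this policy
would pre-initialize 6 pages and leave 4 old ones readable.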
I rebased and attached the v3 patch.
The v3 patch LGTM.
—
Regards,
Jingtang
Alibaba Cloud
On Sat, 2025-09-13 at 22:04 -0700, Bharath Rupireddy wrote:
Thanks for looking at this. Yes, the WAL writers can zero out flushed
buffers before WALReadFromBuffers gets to them. However,
WALReadFromBuffers was intentionally designed as an opportunistic
optimization - it's a "try this first, quickly" approach before
falling back to reading from WAL files.
IIRC, one motivation (perhaps the primary motivation?) was to make it
possible to read buffers before they are flushed. It was always
possible to read already-flushed buffers.
The benefit of reading unflushed buffers is that we can replicate the
WAL sooner (though it can't be replayed until the primary flushes it).
Is that right?
Regards,
Jeff Davis
Hi,
On Mon, Sep 22, 2025 at 8:56 PM Jeff Davis <pgsql@j-davis.com> wrote:
On Sat, 2025-09-13 at 22:04 -0700, Bharath Rupireddy wrote:
Thanks for looking at this. Yes, the WAL writers can zero out flushed
buffers before WALReadFromBuffers gets to them. However,
WALReadFromBuffers was intentionally designed as an opportunistic
optimization - it's a "try this first, quickly" approach before
falling back to reading from WAL files.
IIRC, one motivation (perhaps the primary motivation?) was to make it
possible to read buffers before they are flushed. It was always
possible to read already-flushed buffers.
The benefit of reading unflushed buffers is that we can replicate the
WAL sooner (though it can't be replayed until the primary flushes it).
Is that right?
I'm not certain about the primary motivation, but as it stands,
WALReadFromBuffers only reads WAL records present in buffers up to the
flush pointer. This is because XLogSendPhysical currently sends records
only up to the flush pointer, not beyond.
I am currently testing a patch developed by Melih Mutlu that implements the
functionality you described, sending unflushed buffers during physical
replication. After some tuning, the patch has shown a 5 percent improvement
in TPS for synchronous replication with remote_write. I am working on
further improving the patch before sharing it on the hackers mailing list.
Thank you,
Rahila Syed
Hi,
On Mon, Sep 22, 2025 at 8:26 AM Jeff Davis <pgsql@j-davis.com> wrote:
On Sat, 2025-09-13 at 22:04 -0700, Bharath Rupireddy wrote:
Thanks for looking at this. Yes, the WAL writers can zero out flushed
buffers before WALReadFromBuffers gets to them. However,
WALReadFromBuffers was intentionally designed as an opportunistic
optimization - it's a "try this first, quickly" approach before
falling back to reading from WAL files.
IIRC, one motivation (perhaps the primary motivation?) was to make it
possible to read buffers before they are flushed. It was always
possible to read already-flushed buffers.
The benefit of reading unflushed buffers is that we can replicate the
WAL sooner (though it can't be replayed until the primary flushes it).
Is that right?
Right. Reading unflushed WAL buffers for replication was one of the
motivations. But, in general, WALReadFromBuffers has more benefits
since it lets WAL buffers act as a cache for reads, avoiding the need
to re-read WAL from disk for (both physical and logical) replication.
For example, it makes the use of direct I/O for WAL more realistic and
can provide significant performance benefits [1].
[1]: /messages/by-id/20230114203403.4zpg72fw2qb34awf@awork3.anarazel.de
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Wed, 2025-09-24 at 07:26 -0700, Bharath Rupireddy wrote:
Right. Reading unflushed WAL buffers for replication was one of the
motivations. But, in general, WALReadFromBuffers has more benefits
since it lets WAL buffers act as a cache for reads, avoiding the need
to re-read WAL from disk for (both physical and logical) replication.
For example, it makes the use of direct I/O for WAL more realistic and
can provide significant performance benefits [1].
Is it possible to do a POC that shows the potential benefit, or are we
still too far away?
Regards,
Jeff Davis
Hi,
On Wed, Sep 24, 2025 at 11:27 AM Jeff Davis <pgsql@j-davis.com> wrote:
On Wed, 2025-09-24 at 07:26 -0700, Bharath Rupireddy wrote:
Right. Reading unflushed WAL buffers for replication was one of the
motivations. But, in general, WALReadFromBuffers has more benefits
since it lets WAL buffers act as a cache for reads, avoiding the need
to re-read WAL from disk for (both physical and logical) replication.
For example, it makes the use of direct I/O for WAL more realistic and
can provide significant performance benefits [1].
Is it possible to do a POC that shows the potential benefit, or are we
still too far away?
Thanks for looking into this. I did performance analysis with WAL direct
I/O to see how reading from WAL buffers affects walsenders:
/messages/by-id/CALj2ACV6rS+7iZx5+oAvyXJaN4AG-djAQeM1mrM=YSDkVrUs7g@mail.gmail.com.
Following is from that thread. Please let me know if you have any specific
cases in mind. I'm happy to run the same test for logical replication.
It helps WAL DIO; since there's no OS
page cache, using WAL buffers as read cache helps a lot. It is clearly
evident from my experiment with WAL DIO patch [1], see the results [2]
and attached graph. As expected, WAL DIO brings down the TPS, whereas
WAL buffers read i.e. this patch brings it up.
[2]: Test case is an insert pgbench workload. Values are TPS.
clients |   HEAD | WAL DIO | WAL DIO & WAL BUFFERS READ | WAL BUFFERS READ
      1 |   1404 |    1070 |                       1424 |             1375
      2 |   1487 |     796 |                       1454 |             1517
      4 |   3064 |    1743 |                       3011 |             3019
      8 |   6114 |    3556 |                       6026 |             5954
     16 |  11560 |    7051 |                      12216 |            12132
     32 |  23181 |   13079 |                      23449 |            23561
     64 |  43607 |   26983 |                      43997 |            45636
    128 |  80723 |   45169 |                      81515 |            81911
    256 | 110925 |   90185 |                     107332 |           114046
    512 | 119354 |  109817 |                     110287 |           117506
    768 | 112435 |  105795 |                     106853 |           111605
   1024 | 107554 |  105541 |                     105942 |           109370
   2048 |  88552 |   79024 |                      80699 |            90555
   4096 |  61323 |   54814 |                      58704 |            61743
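To read the table as relative changes rather than raw TPS, a small helper like the following can be used. It is only an illustrative sketch: TpsRow, tps_rows and pct_change are names made up here, and the numbers are copied verbatim from the results above.

```c
#include <assert.h>
#include <stddef.h>

/* One row of the results table; all values are TPS. */
typedef struct TpsRow
{
    int    clients;
    double head;     /* HEAD */
    double dio;      /* WAL DIO */
    double dio_buf;  /* WAL DIO & WAL BUFFERS READ */
    double buf;      /* WAL BUFFERS READ */
} TpsRow;

static const TpsRow tps_rows[] = {
    {1, 1404, 1070, 1424, 1375},
    {2, 1487, 796, 1454, 1517},
    {4, 3064, 1743, 3011, 3019},
    {8, 6114, 3556, 6026, 5954},
    {16, 11560, 7051, 12216, 12132},
    {32, 23181, 13079, 23449, 23561},
    {64, 43607, 26983, 43997, 45636},
    {128, 80723, 45169, 81515, 81911},
    {256, 110925, 90185, 107332, 114046},
    {512, 119354, 109817, 110287, 117506},
    {768, 112435, 105795, 106853, 111605},
    {1024, 107554, 105541, 105942, 109370},
    {2048, 88552, 79024, 80699, 90555},
    {4096, 61323, 54814, 58704, 61743},
};

static const size_t tps_nrows = sizeof(tps_rows) / sizeof(tps_rows[0]);

/* Percentage change of "value" relative to "base". */
static double
pct_change(double base, double value)
{
    assert(base > 0.0);
    return (value - base) / base * 100.0;
}
```

For the 2-client row, pct_change(head, dio) is roughly -46%, and pct_change(dio, dio_buf) is roughly +83%, which matches the statement that WAL DIO alone brings TPS down while reading from WAL buffers brings it back up.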
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com