From d2438ef88dc6295d8f670f02188e01e38518f49a Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Tue, 14 Oct 2025 15:03:38 +0530 Subject: [PATCH v1] Support large object decoding Introduce support for decoding changes to large objects in logical replication. Changes to 'pg_largeobject' are now intercepted in 'heap_decode' based on LargeObjectRelationId. Since a single large object operation (LO_WRITE) spans multiple physical rows in 'pg_largeobject', the changes are decoded and converted into a dedicated logical operation: REORDER_BUFFER_CHANGE_LOWRITE. --- contrib/test_decoding/Makefile | 2 +- .../expected/decoding_largeobject.out | 216 ++++++++++++++++++ .../sql/decoding_largeobject.sql | 94 ++++++++ contrib/test_decoding/test_decoding.c | 39 ++++ src/backend/replication/logical/decode.c | 133 +++++++++++ src/backend/replication/logical/proto.c | 41 ++++ .../replication/logical/reorderbuffer.c | 61 +++++ src/backend/replication/pgoutput/pgoutput.c | 23 ++ src/include/replication/logicalproto.h | 5 + src/include/replication/reorderbuffer.h | 12 + src/include/utils/rel.h | 6 +- 11 files changed, 630 insertions(+), 2 deletions(-) create mode 100644 contrib/test_decoding/expected/decoding_largeobject.out create mode 100644 contrib/test_decoding/sql/decoding_largeobject.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index acbcaed2feb..d1f02500cf3 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate stream stats twophase twophase_stream + spill slot truncate stream stats twophase twophase_stream decoding_largeobject ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot 
slot_creation_error catalog_change_snapshot \ diff --git a/contrib/test_decoding/expected/decoding_largeobject.out b/contrib/test_decoding/expected/decoding_largeobject.out new file mode 100644 index 00000000000..a2720d82064 --- /dev/null +++ b/contrib/test_decoding/expected/decoding_largeobject.out @@ -0,0 +1,216 @@ +-- test that we can insert into the large objects and decode the changes +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +-- slot works +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +-- Create a new large object +CREATE TABLE lotest_stash_values (loid oid, fd integer); +INSERT INTO lotest_stash_values (loid) SELECT lo_creat(42); +-- NOTE: large objects require transactions +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, 'large object test data') FROM lotest_stash_values; + lowrite +--------- + 22 +(1 row) + +SELECT lo_close(fd) FROM lotest_stash_values; + lo_close +---------- + 0 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: INSERT: loid[oid]:16970 fd[integer]:null + COMMIT + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 0 datalen: 22 data: large object test data + COMMIT +(7 rows) + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 10, 0) FROM lotest_stash_values; + lo_lseek +---------- + 10 +(1 row) + +SELECT lowrite(fd, 'overwrite some data') FROM lotest_stash_values; + lowrite +--------- + 19 
+(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 0 datalen: 29 data: large objeoverwrite some data + COMMIT +(4 rows) + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 2048, 0) FROM lotest_stash_values; + lo_lseek +---------- + 2048 +(1 row) + +SELECT lowrite(fd, 'write into new page') FROM lotest_stash_values; + lowrite +--------- + 19 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 2048 datalen: 19 data: write into new page + COMMIT +(4 rows) + +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, ' +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large 
data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data 
more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +') FROM lotest_stash_values; + lowrite +--------- + 3829 +(1 row) + +SELECT lo_close(fd) FROM lotest_stash_values; + lo_close +---------- + 0 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 0 datalen: 2048 data: + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test 
large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data m + LO_WRITE: loid: 16970 offset: 2048 datalen: 1781 data: ore in 2048 test large data more in 2048+ + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test 
large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + + COMMIT +(5 rows) + +-- Clean up the slot +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/decoding_largeobject.sql b/contrib/test_decoding/sql/decoding_largeobject.sql new file mode 100644 
index 00000000000..ff392de2fba --- /dev/null +++ b/contrib/test_decoding/sql/decoding_largeobject.sql @@ -0,0 +1,94 @@ +-- test that we can insert into the large objects and decode the changes + +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +-- slot works +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Create a new large object +CREATE TABLE lotest_stash_values (loid oid, fd integer); + +INSERT INTO lotest_stash_values (loid) SELECT lo_creat(42); + +-- NOTE: large objects require transactions +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, 'large object test data') FROM lotest_stash_values; +SELECT lo_close(fd) FROM lotest_stash_values; +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 10, 0) FROM lotest_stash_values; +SELECT lowrite(fd, 'overwrite some data') FROM lotest_stash_values; +END; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 2048, 0) FROM lotest_stash_values; +SELECT lowrite(fd, 'write into new page') FROM lotest_stash_values; +END; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, ' +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 
2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 
test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +') FROM lotest_stash_values; +SELECT lo_close(fd) FROM lotest_stash_values; +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + + +-- Clean up the slot +SELECT 
pg_drop_replication_slot('regression_slot');
\ No newline at end of file
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 36e77c69e1c..d34a73666f4 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -470,6 +470,38 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+/*
+ * Emit a decoded large object write (REORDER_BUFFER_CHANGE_LOWRITE) as a
+ * human-readable "LO_WRITE: ..." line: large object id, absolute byte
+ * offset, payload length, then the payload bytes verbatim.
+ */
+static void
+pg_decode_lo_write(LogicalDecodingContext *ctx,
+				   ReorderBufferChange *change)
+{
+	TestDecodingData *data;
+	MemoryContext old;
+	Oid			loid = change->data.lo_write.loid;
+	int64		offset = change->data.lo_write.offset;
+	Size		datalen = change->data.lo_write.datalen;
+	char	   *lodata = change->data.lo_write.data;
+
+	data = ctx->output_plugin_private;
+
+	/* Avoid leaking memory by using and resetting our own context */
+	old = MemoryContextSwitchTo(data->context);
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfoString(ctx->out, "LO_WRITE:");
+	appendStringInfo(ctx->out, " loid: %u offset: " INT64_FORMAT " datalen: %zu data: ",
+					 loid, offset, datalen);
+
+	/* The payload is appended unescaped; it may contain arbitrary bytes. */
+	appendBinaryStringInfo(ctx->out, lodata, datalen);
+
+	/* ctx->out lives in the decoding context, so resetting ours is safe */
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
@@ -619,6 +651,13 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	txndata->xact_wrote_changes = true;
 
+	/* Handle large object changes independent of the table changes.
*/ + if (change->action == REORDER_BUFFER_CHANGE_LOWRITE) + { + pg_decode_lo_write(ctx, change); + return; + } + class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index cc03f0706e9..d948df84065 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -26,6 +26,7 @@ */ #include "postgres.h" +#include "access/detoast.h" #include "access/heapam_xlog.h" #include "access/transam.h" #include "access/xact.h" @@ -33,11 +34,13 @@ #include "access/xlogreader.h" #include "access/xlogrecord.h" #include "catalog/pg_control.h" +#include "catalog/pg_largeobject.h" #include "replication/decode.h" #include "replication/logical.h" #include "replication/message.h" #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" +#include "storage/large_object.h" #include "storage/standbydefs.h" /* individual record(group)'s handlers */ @@ -56,6 +59,10 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool two_phase); static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_prepare *parsed); +static void DecodeLargeObjectInsert(LogicalDecodingContext *ctx, + XLogRecordBuffer *buf); +static void DecodeLargeObjectChanges(uint8 info, LogicalDecodingContext *ctx, + XLogRecordBuffer *buf); /* common function to decode tuples */ @@ -471,6 +478,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + RelFileLocator target_locator; + XLogReaderState *r = buf->record; ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); @@ -485,6 +494,22 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; + 
XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+
+	/*
+	 * Check if the WAL record pertains to 'pg_largeobject'. If it does,
+	 * handle the large object changes separately via
+	 * DecodeLargeObjectChanges, bypassing the standard heap table decoding
+	 * logic that follows.
+	 */
+	if (target_locator.relNumber == LargeObjectRelationId)
+	{
+		if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+			!ctx->fast_forward)
+			DecodeLargeObjectChanges(info, ctx, buf);
+		return;
+	}
+
 	switch (info)
 	{
 		case XLOG_HEAP_INSERT:
@@ -1323,3 +1348,111 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 	return false;
 }
+
+/*
+ * Helper function to decode a 'pg_largeobject' INSERT record into a
+ * 'REORDER_BUFFER_CHANGE_LOWRITE' change.
+ *
+ * Each row in 'pg_largeobject' represents only a small page (or chunk) of
+ * a large object's data. Logically, these individual page-level inserts
+ * are not meaningful on their own to a consumer. Therefore, instead of
+ * treating them as regular heap tuple changes, we convert the physical
+ * page insert into a single, more meaningful logical operation: a
+ * 'LO_WRITE' change, which can be applied as an independent large object
+ * operation.
+ */
+static void
+DecodeLargeObjectInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	ReorderBufferChange *change;
+	Size		datalen;
+	char	   *tupledata;
+	HeapTuple	tuple;
+	bytea	   *data_chunk;
+	Oid			loid;
+	int32		pageno;
+	int64		offset;
+	Size		chunk_datalen;
+	char	   *data_copy;
+	bool		freeit = false;
+	Form_pg_largeobject largeobject;
+
+	/* Records that carry no tuple data have nothing to decode. */
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+	if (datalen == 0)
+		return;
+
+	tuple = ReorderBufferAllocTupleBuf(ctx->reorder, datalen - SizeOfHeapHeader);
+	DecodeXLogTuple(tupledata, datalen, tuple);
+	largeobject = (Form_pg_largeobject) GETSTRUCT(tuple);
+
+	/* Fetch loid, pageno and actual data from the pg_largeobject tuple. */
+	loid = largeobject->loid;
+	pageno = largeobject->pageno;
+	data_chunk = &(largeobject->data);
+	if (VARATT_IS_EXTENDED(data_chunk))
+	{
+		/* Inline-compressed chunk; expand it before reading the payload. */
+		data_chunk = (bytea *)
+			detoast_attr((struct varlena *) data_chunk);
+		freeit = true;
+	}
+	chunk_datalen = VARSIZE(data_chunk) - VARHDRSZ;
+
+	/*
+	 * Convert the single 'pg_largeobject' row (which represents a data page)
+	 * into a logical 'LOWRITE' operation. The absolute offset for this write
+	 * is computed by multiplying the page number ('pageno') by the fixed
+	 * large object block size (LOBLKSIZE).
+	 */
+	offset = (int64) pageno * LOBLKSIZE;
+	/* Copy the chunk payload into memory owned by the reorder buffer. */
+	data_copy = ReorderBufferAllocRawBuffer(ctx->reorder, chunk_datalen);
+	memcpy(data_copy, VARDATA(data_chunk), chunk_datalen);
+
+
+	/* Create the LOWRITE change */
+	change = ReorderBufferAllocChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_LOWRITE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	change->data.lo_write.loid = loid;
+	change->data.lo_write.offset = offset;
+	change->data.lo_write.datalen = chunk_datalen;
+	change->data.lo_write.data = data_copy;
+
+	/* Enqueue the change */
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change, false);
+	ReorderBufferFreeTupleBuf(tuple);
+	if (freeit)
+		pfree(data_chunk);
+}
+
+/*
+ * Processes and decodes all logical changes for large objects (LOs).
+ * Since LO data is spread across 'pg_largeobject' rows, this function
+ * maps physical changes (INSERT/UPDATE) to a single logical 'LO_WRITE'
+ * operation.
+ *
+ * TODO: Temporarily ignoring LO_UNLINK (DELETE), which will be
+ * handled during a later phase.
+ */
+static void
+DecodeLargeObjectChanges(uint8 info, LogicalDecodingContext *ctx,
+						 XLogRecordBuffer *buf)
+{
+	switch (info)
+	{
+		case XLOG_HEAP_INSERT:
+		case XLOG_HEAP_HOT_UPDATE:
+		case XLOG_HEAP_UPDATE:
+			DecodeLargeObjectInsert(ctx, buf);
+			break;
+		case XLOG_HEAP_DELETE:
+			/* LO_UNLINK (delete) is handled in a later phase */
+			break;
+		default:
+			/* Ignore other operations on pg_largeobject for now */
+			break;
+	}
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f0a913892b9..68d1a4306e4 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -346,6 +346,45 @@ logicalrep_read_rollback_prepared(StringInfo in,
 	strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
 }
 
+/*
+ * Write an LO_WRITE message: large object id, absolute offset, payload
+ * length and payload bytes.  'xid' is currently unused on the wire.
+ */
+void
+logicalrep_write_lo_write(StringInfo out, TransactionId xid, Oid loid,
+						  int64 offset, Size datalen, const char *data)
+{
+	pq_sendbyte(out, LOGICAL_REP_MSG_LOWRITE);
+
+	/* Write LO ID, offset, and data length */
+	pq_sendint32(out, loid);
+	pq_sendint64(out, offset);
+	pq_sendint32(out, (int32) datalen);
+
+	/* Write the data chunk */
+	pq_sendbytes(out, data, datalen);
+}
+
+/* Read an LO_WRITE message; the payload is palloc'd into *data. */
+void
+logicalrep_read_lo_write(StringInfo s, Oid *loid, int64 *offset, Size *datalen,
+						 char **data)
+{
+	/* Read fields, incorporating validation */
+	*loid = pq_getmsgint(s, 4);
+	if (!OidIsValid(*loid))
+		elog(ERROR, "large object ID is not set in LO write message");
+
+	*offset = pq_getmsgint64(s);
+	if (*offset < 0)
+		elog(ERROR, "invalid offset " INT64_FORMAT " in LO write message", *offset);
+
+	/* Size is unsigned, so test the wire value as a signed 32-bit int */
+	*datalen = pq_getmsgint(s, 4);
+	if ((int32) *datalen < 0)
+		elog(ERROR, "invalid data length %d in LO write message", (int32) *datalen);
+
+	/* Allocate memory for the data payload */
+	*data = palloc(*datalen);
+
+	/* Read the data payload directly into the new buffer */
+	pq_copymsgbytes(s, *data, *datalen);
+}
+
 /*
  * Write STREAM PREPARE to the output stream.
*/ @@ -1235,6 +1274,8 @@ logicalrep_message_type(LogicalRepMsgType action) return "TYPE"; case LOGICAL_REP_MSG_MESSAGE: return "MESSAGE"; + case LOGICAL_REP_MSG_LOWRITE: + return "LOWRITE"; case LOGICAL_REP_MSG_BEGIN_PREPARE: return "BEGIN PREPARE"; case LOGICAL_REP_MSG_PREPARE: diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index eb6a84554b7..aeb381b9d8e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -579,6 +579,13 @@ ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; + case REORDER_BUFFER_CHANGE_LOWRITE: + if (change->data.lo_write.data != NULL) + { + pfree(change->data.lo_write.data); + change->data.lo_write.data = NULL; + } + break; } pfree(change); @@ -930,6 +937,19 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } } +/* + * Allocate a raw memory from reorder buffer. 
+ */
+void *
+ReorderBufferAllocRawBuffer(ReorderBuffer *rb, Size alloc_len)
+{
+	void	   *buffer;
+
+	buffer = MemoryContextAlloc(rb->tup_context, alloc_len);
+
+	return buffer;
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -2585,6 +2605,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
+
+				case REORDER_BUFFER_CHANGE_LOWRITE:
+					ReorderBufferApplyChange(rb, txn, NULL, change, streaming);
+					break;
 			}
 
 			/*
@@ -4270,6 +4294,26 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				char	   *data;
+				Size		datalen = change->data.lo_write.datalen;
+
+				sz += datalen;
+
+				/* make sure we have enough space */
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				/* Copy the payload; the change struct itself was copied above */
+				memcpy(data, change->data.lo_write.data, datalen);
+
+				break;
+			}
 	}
 
 	ondisk->size = sz;
@@ -4534,6 +4578,11 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				sz += change->data.lo_write.datalen;
+				break;
+			}
 	}
 
 	return sz;
@@ -4833,6 +4882,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				Size		datalen = change->data.lo_write.datalen;
+
+				/* Allocate memory for the data payload */
+				change->data.lo_write.data =
MemoryContextAlloc(rb->context, datalen); + + /* Copy the data payload */ + memcpy(change->data.lo_write.data, data, datalen); + + break; + } } dlist_push_tail(&txn->changes, &change->node); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 942e1abdb58..0b126005ab4 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1491,6 +1491,29 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; + if (change->action == REORDER_BUFFER_CHANGE_LOWRITE) + { + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + + /* Remember the xid for the change in streaming mode. */ + if (data->in_streaming) + xid = change->txn->xid; + + OutputPluginPrepareWrite(ctx, true); + + /* Use the new helper to serialize the LO payload */ + logicalrep_write_lo_write(ctx->out, xid, + change->data.lo_write.loid, + change->data.lo_write.offset, + change->data.lo_write.datalen, + change->data.lo_write.data); + + OutputPluginWrite(ctx, true); + + return; + } + if (!is_publishable_relation(relation)) return; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index b261c60d3fa..dc4ad3898c8 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -66,6 +66,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_LOWRITE = 'W', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P', LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', @@ -214,6 +215,10 @@ extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN TimestampTz prepare_time); extern void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data); +extern void logicalrep_write_lo_write(StringInfo out, 
TransactionId xid, Oid loid, + int64 offset, Size datalen, const char *data); +extern void logicalrep_read_lo_write(StringInfo s, Oid *loid, int64 *offset, Size *datalen, + char **data); extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); extern void logicalrep_read_stream_prepare(StringInfo in, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 3cbe106a3c7..031a83fa6d5 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -61,6 +61,7 @@ typedef enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE, + REORDER_BUFFER_CHANGE_LOWRITE, } ReorderBufferChangeType; /* forward declaration */ @@ -154,6 +155,16 @@ typedef struct ReorderBufferChange uint32 ninvalidations; /* Number of messages */ SharedInvalidationMessage *invalidations; /* invalidation message */ } inval; + + /* Lo write */ + struct + { + Oid loid; + int64 offset; + Size datalen; + char *data; + } lo_write; + } data; /* @@ -722,6 +733,7 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +extern void *ReorderBufferAllocRawBuffer(ReorderBuffer *rb, Size alloc_len); extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 80286076a11..d8303612d14 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -19,6 +19,7 @@ #include "catalog/catalog.h" #include "catalog/pg_class.h" #include "catalog/pg_index.h" +#include "catalog/pg_largeobject.h" #include "catalog/pg_publication.h" #include "nodes/bitmapset.h" #include 
"partitioning/partdefs.h" @@ -707,12 +708,15 @@ RelationCloseSmgr(Relation relation) * it would complicate decoding slightly for little gain). Note that we *do* * log information for user defined catalog tables since they presumably are * interesting to the user... + * + * TODO: Logically log pg_largeobject rows with a configuration parameter + * instead of doing it unconditionally. */ #define RelationIsLogicallyLogged(relation) \ (XLogLogicalInfoActive() && \ RelationNeedsWAL(relation) && \ (relation)->rd_rel->relkind != RELKIND_FOREIGN_TABLE && \ - !IsCatalogRelation(relation)) + !(IsCatalogRelation(relation) && RelationGetRelid(relation) != LargeObjectRelationId)) /* routines in utils/cache/relcache.c */ extern void RelationIncrementReferenceCount(Relation rel); -- 2.49.0