>From b66a78366a7234842d8c0a351d090fc1855d0da3 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Fri, 1 Jan 2016 04:27:32 +0100 Subject: [PATCH] WAL Messages --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/expected/messages.out | 56 +++++++++++++++++ contrib/test_decoding/sql/messages.sql | 17 +++++ contrib/test_decoding/test_decoding.c | 21 ++++++- doc/src/sgml/func.sgml | 42 +++++++++++++ doc/src/sgml/logicaldecoding.sgml | 22 +++++++ src/backend/access/rmgrdesc/standbydesc.c | 8 +++ src/backend/replication/logical/decode.c | 19 ++++++ src/backend/replication/logical/logical.c | 38 ++++++++++++ src/backend/replication/logical/logicalfuncs.c | 27 ++++++++ src/backend/replication/logical/reorderbuffer.c | 73 ++++++++++++++++++++++ src/backend/storage/ipc/standby.c | 82 +++++++++++++++++++++++++ src/include/catalog/pg_proc.h | 4 ++ src/include/replication/logicalfuncs.h | 2 + src/include/replication/output_plugin.h | 13 ++++ src/include/replication/reorderbuffer.h | 21 +++++++ src/include/storage/standby.h | 15 +++++ 17 files changed, 460 insertions(+), 2 deletions(-) create mode 100644 contrib/test_decoding/expected/messages.out create mode 100644 contrib/test_decoding/sql/messages.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index a362e69..8fdcfbc 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -38,7 +38,7 @@ submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \ - binary prepared replorigin + binary prepared replorigin messages regresscheck: | submake-regress submake-test_decoding temp-install $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out new file mode 100644 index 0000000..d256851 --- /dev/null +++ b/contrib/test_decoding/expected/messages.out @@ -0,0 +1,56 @@ +-- predictability +SET 
synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1'); + ?column? +---------- + msg1 +(1 row) + +SELECT 'msg2' FROM pg_logical_send_message(false, 'test', 'msg2'); + ?column? +---------- + msg2 +(1 row) + +BEGIN; +SELECT 'msg3' FROM pg_logical_send_message(true, 'test', 'msg3'); + ?column? +---------- + msg3 +(1 row) + +SELECT 'msg4' FROM pg_logical_send_message(false, 'test', 'msg4'); + ?column? +---------- + msg4 +(1 row) + +SELECT 'msg5' FROM pg_logical_send_message(true, 'test', 'msg5'); + ?column? +---------- + msg5 +(1 row) + +COMMIT; +SELECT regexp_replace(data, 'lsn: [^ ]+', '') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); + regexp_replace +------------------------------------------------------------- + message: transactional: 1 prefix: test, sz: 4 content:msg1 + message: transactional: 0 prefix: test, sz: 4 content:msg2 + message: transactional: 0 prefix: test, sz: 4 content:msg4 + message: transactional: 1 prefix: test, sz: 4 content:msg3 + message: transactional: 1 prefix: test, sz: 4 content:msg5 +(5 rows) + +SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); + ?column? 
+---------- + init +(1 row) + diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql new file mode 100644 index 0000000..7f462b8 --- /dev/null +++ b/contrib/test_decoding/sql/messages.sql @@ -0,0 +1,17 @@ +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1'); +SELECT 'msg2' FROM pg_logical_send_message(false, 'test', 'msg2'); + +BEGIN; +SELECT 'msg3' FROM pg_logical_send_message(true, 'test', 'msg3'); +SELECT 'msg4' FROM pg_logical_send_message(false, 'test', 'msg4'); +SELECT 'msg5' FROM pg_logical_send_message(true, 'test', 'msg5'); +COMMIT; + +SELECT regexp_replace(data, 'lsn: [^ ]+', '') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); + +SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 80fc5f4..064bb5f 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -63,11 +63,15 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); void _PG_init(void) { - /* other plugins can perform things here */ + RegisterStandbyMsgPrefix("test"); } /* specify output plugin callbacks */ @@ -82,6 +86,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->commit_cb = pg_decode_commit_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; + cb->message_cb = pg_decode_message; } @@ -471,3 +476,17 @@ pg_decode_change(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } + +static void +pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, + bool transactional, const char *prefix, + Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "message: lsn: %X/%X transactional: %d prefix: %s, sz: %zu content:", + (uint32)(lsn >> 32), (uint32)lsn, transactional, prefix, + sz); + appendBinaryStringInfo(ctx->out, message, sz); + OutputPluginWrite(ctx, true); +} diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 8ef9fce..e0fe571 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -17580,6 +17580,48 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); + + + + pg_logical_send_message + + pg_logical_send_message(transactional bool, prefix text, content text) + + + void + + + Write text standby message. This can used by logical decoding for + sending generic messages. The parameter + transactional specifies if the message should + be part of current transaction or if it should be written and decoded + immediately. The prefix has to be prefix which + was registered by a plugin. The content is + content of the message. + + + + + + + pg_logical_send_message + + pg_logical_send_message(transactional bool, prefix text, content bytea) + + + void + + + Write binary standby message. This can used by logical decoding for + sending generic messages. The parameter + transactional specifies if the message should + be part of current transaction or if it should be written and decoded + immediately. The prefix has to be prefix which + was registered by a plugin. The content is + content of the message. 
+ + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 02794cc..eae3639 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -363,6 +363,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; @@ -602,6 +603,27 @@ typedef bool (*LogicalDecodeChangeCB) ( more efficient. + + + Generic Message Callback + + + The optional message_cb callback is called whenever + a standby message has been decoded. + +typedef void (*LogicalDecodeMessageCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message +); + + + + diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c index 3d35f38..ecf4f6e 100644 --- a/src/backend/access/rmgrdesc/standbydesc.c +++ b/src/backend/access/rmgrdesc/standbydesc.c @@ -58,6 +58,14 @@ standby_desc(StringInfo buf, XLogReaderState *record) standby_desc_running_xacts(buf, xlrec); } + else if (info == XLOG_STANDBY_MESSAGE) + { + xl_standby_message *xlrec = (xl_standby_message *) rec; + + appendStringInfo(buf, "%s message size %zu bytes", + xlrec->transactional ? 
"transactional" : "nontransactional", + xlrec->message_size); + } } const char * diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 9f60687..008301a 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -298,6 +298,25 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; case XLOG_STANDBY_LOCK: break; + case XLOG_STANDBY_MESSAGE: + { + xl_standby_message *message = (xl_standby_message *) XLogRecGetData(r); + + if (message->transactional && + !SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr)) + break; + else if(!message->transactional && + (SnapBuildCurrentState(ctx->snapshot_builder) != SNAPBUILD_CONSISTENT || + SnapBuildXactNeedsSkip(builder, buf->origptr))) + break; + + ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r), + buf->endptr, message->transactional, + message->message, /* first part of message is prefix */ + message->message_size, + message->message + message->prefix_size); + break; + } default: elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2e14a3e..d4d58d9 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -62,6 +62,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); +static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size sz, const char *message); static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); @@ -178,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->begin = begin_cb_wrapper; ctx->reorder->apply_change = 
change_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; + ctx->reorder->message = message_cb_wrapper; ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; @@ -702,6 +706,40 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) return ret; } +static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + if (ctx->callbacks.message_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "message"; + state.report_location = message_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, + prefix, sz, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 012987a..a76f8c4 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -23,6 +23,8 @@ #include "access/xlog_internal.h" +#include "access/xact.h" + #include "catalog/pg_type.h" #include "nodes/makefuncs.h" @@ -42,6 +44,7 @@ #include "replication/logicalfuncs.h" #include "storage/fd.h" +#include "storage/standby.h" /* private date for writing out data */ typedef struct DecodingOutputState @@ -517,3 +520,27 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) return ret; } + + +/* + * SQL function returning the changestream in binary, only peeking ahead. + */ +Datum +pg_logical_send_message_bytea(PG_FUNCTION_ARGS) +{ + bool transactional = PG_GETARG_BOOL(0); + char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1)); + bytea *data = PG_GETARG_BYTEA_PP(2); + XLogRecPtr lsn; + + lsn = LogStandbyMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data), + transactional); + PG_RETURN_LSN(lsn); +} + +Datum +pg_logical_send_message_text(PG_FUNCTION_ARGS) +{ + /* bytea and text are compatible */ + return pg_logical_send_message_bytea(fcinfo); +} diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index cdc7bd7..4ee7bb9 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -414,6 +414,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) change->data.tp.oldtuple = NULL; } break; + case REORDER_BUFFER_CHANGE_MESSAGE: + if (change->data.msg.message != NULL) + pfree(change->data.msg.message); + change->data.msg.message = NULL; + break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: if (change->data.snapshot) { @@ -599,6 +604,39 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferCheckSerializeTXN(rb, txn); } +void 
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, + bool transactional, const char *prefix, Size msg_sz, + const char *msg) +{ + ReorderBufferTXN *txn = NULL; + + if (transactional) + { + ReorderBufferChange *change; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + Assert(xid != InvalidTransactionId); + Assert(txn != NULL); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_MESSAGE; + change->data.msg.transactional = true; + change->data.msg.prefix = pstrdup(prefix); + change->data.msg.message_size = msg_sz; + change->data.msg.message = palloc(msg_sz); + memcpy(change->data.msg.message, msg, msg_sz); + + ReorderBufferQueueChange(rb, xid, lsn, change); + } + else + { + rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg); + } +} + + static void AssertTXNLsnOrder(ReorderBuffer *rb) { @@ -1465,6 +1503,14 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_MESSAGE: + rb->message(rb, txn, change->lsn, + change->data.msg.transactional, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: /* get rid of the old */ TeardownHistoricSnapshot(false); @@ -2117,6 +2163,21 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } break; } + case REORDER_BUFFER_CHANGE_MESSAGE: + { + char *data; + size_t prefix_size = strlen(change->data.msg.prefix) + 1; + + sz += prefix_size + change->data.msg.message_size; + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + memcpy(data, change->data.msg.prefix, + prefix_size); + memcpy(data + prefix_size, change->data.msg.message, + change->data.msg.message_size); + break; + } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot snap; @@ -2354,6 +2415,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN 
*txn, data += len; } break; + case REORDER_BUFFER_CHANGE_MESSAGE: + { + Size message_size = change->data.msg.message_size; + Size prefix_size = strlen(data) + 1; + + change->data.msg.prefix = pstrdup(data); + change->data.msg.message = palloc(message_size); + memcpy(change->data.msg.message, data + prefix_size, + message_size); + + data += prefix_size + message_size; + } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot oldsnap; diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 292bed5..9e2d01b 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -28,6 +28,7 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/standby.h" +#include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" @@ -39,6 +40,8 @@ int max_standby_streaming_delay = 30 * 1000; static List *RecoveryLockList; +static List *StandbyMsgPrefixList = NIL; + static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, ProcSignalReason reason); static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid); @@ -795,6 +798,10 @@ standby_redo(XLogReaderState *record) ProcArrayApplyRecoveryInfo(&running); } + else if (info == XLOG_STANDBY_MESSAGE) + { + /* Only interesting to logical decoding. Check decode.c */ + } else elog(PANIC, "standby_redo: unknown op code %u", info); } @@ -1038,3 +1045,78 @@ LogAccessExclusiveLockPrepare(void) */ (void) GetTopTransactionId(); } + +XLogRecPtr +LogStandbyMessage(const char *prefix, const char *message, size_t size, + bool transactional) +{ + ListCell *lc; + bool found = false; + xl_standby_message xlrec; + + /* Check if the provided prefix is known to us. 
*/ + foreach(lc, StandbyMsgPrefixList) + { + char *mp = lfirst(lc); + + if (strcmp(prefix, mp) == 0) + { + found = true; + break; + } + } + if (!found) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("standby message prefix \"%s\" is not registered", + prefix))); + + /* + * Force xid to be allocated if we're sending a transactional message. + */ + if (transactional) + { + Assert(IsTransactionState()); + GetCurrentTransactionId(); + } + + xlrec.transactional = transactional; + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.message_size = size; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfStandbyMessage); + XLogRegisterData((char *) prefix, xlrec.prefix_size); + XLogRegisterData((char *) message, size); + + return XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_MESSAGE); +} + +void +RegisterStandbyMsgPrefix(const char *prefix) +{ + ListCell *lc; + bool found = false; + MemoryContext oldcxt; + + /* Check for duplicit registrations. */ + foreach(lc, StandbyMsgPrefixList) + { + char *mp = lfirst(lc); + + if (strcmp(prefix, mp) == 0) + { + found = true; + break; + } + } + if (found) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("standby message prefix \"%s\" is already registered", + prefix))); + + oldcxt = MemoryContextSwitchTo(TopMemoryContext); + StandbyMsgPrefixList = lappend(StandbyMsgPrefixList, pstrdup(prefix)); + MemoryContextSwitchTo(oldcxt); +} diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index d8640db..8871207 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5215,6 +5215,10 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 DESCR("peek at changes from replication slot"); DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" 
"{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ )); DESCR("peek at binary changes from replication slot"); +DATA(insert OID = 3577 ( pg_logical_send_message PGNSP PGUID 12 1 0 0 0 f f f f f f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_send_message_text _null_ _null_ _null_ )); +DESCR("send a textual message"); +DATA(insert OID = 3578 ( pg_logical_send_message PGNSP PGUID 12 1 0 0 0 f f f f f f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_send_message_bytea _null_ _null_ _null_ )); +DESCR("send a binary message"); /* event triggers */ DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ )); diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index 03fbc4a..bf94193 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -21,4 +21,6 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS); +extern Datum pg_logical_send_message_bytea(PG_FUNCTION_ARGS); +extern Datum pg_logical_send_message_text(PG_FUNCTION_ARGS); #endif diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 17c3de2..d3df560 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -74,6 +74,18 @@ typedef void (*LogicalDecodeCommitCB) ( XLogRecPtr commit_lsn); /* + * Called for messages sent by C code. 
+ */ +typedef void (*LogicalDecodeMessageCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + +/* * Filter changes by origin. */ typedef bool (*LogicalDecodeFilterByOriginCB) ( @@ -96,6 +108,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index c9a11db..8d2fca7 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -51,6 +51,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, + REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, @@ -95,6 +96,14 @@ typedef struct ReorderBufferChange ReorderBufferTupleBuf *newtuple; } tp; + struct + { + bool transactional; + char *prefix; + size_t message_size; + char *message; + } msg; + /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ Snapshot snapshot; @@ -271,6 +280,15 @@ typedef void (*ReorderBufferCommitCB) ( ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* message callback signature */ +typedef void (*ReorderBufferMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, Size sz, + const char *message); + struct ReorderBuffer { /* @@ -297,6 +315,7 @@ struct ReorderBuffer ReorderBufferBeginCB begin; ReorderBufferApplyChangeCB apply_change; ReorderBufferCommitCB commit; + ReorderBufferMessageCB message; /* * Pointer that will be passed untouched to the callbacks. 
/*
 * Fixed header of an XLOG_STANDBY_MESSAGE WAL record.
 *
 * LogStandbyMessage() registers SizeOfStandbyMessage bytes of this struct,
 * then the prefix (prefix_size bytes, including its terminating NUL), then
 * message_size bytes of message content.  Readers therefore find the prefix
 * at message[0] and the content at message + prefix_size.
 */
typedef struct xl_standby_message
{
	bool		transactional;	/* is message transactional? */
	size_t		prefix_size;	/* length of prefix, counting the
								 * terminating NUL */
	size_t		message_size;	/* size of the message content, not
								 * counting the prefix */
	char		message[FLEXIBLE_ARRAY_MEMBER]; /* message including the null
												 * terminated prefix of length
												 * prefix_size */
} xl_standby_message;

/* size of the fixed part, i.e. everything before the variable-length data */
#define SizeOfStandbyMessage (offsetof(xl_standby_message, message))