diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index de1b6926581..cbf25cb2236 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -900,6 +900,9 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; TestDecodingTxnData *txndata = txn->output_plugin_private; + Form_pg_class class_form; + TupleDesc tupdesc; + MemoryContext old; /* output stream start if we haven't yet */ if (data->skip_empty_xacts && !txndata->stream_wrote_changes) @@ -913,6 +916,71 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); else appendStringInfoString(ctx->out, "streaming change for transaction"); + + class_form = RelationGetForm(relation); + tupdesc = RelationGetDescr(relation); + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfoString(ctx->out, "table "); + appendStringInfoString(ctx->out, + quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), + class_form->relrewrite ? + get_rel_name(class_form->relrewrite) : + NameStr(class_form->relname))); + appendStringInfoChar(ctx->out, ':'); + + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + appendStringInfoString(ctx->out, " INSERT:"); + if (change->data.tp.newtuple == NULL) + appendStringInfoString(ctx->out, " (no-tuple-data)"); + else + tuple_to_stringinfo(ctx->out, tupdesc, + &change->data.tp.newtuple->tuple, + false); + break; + case REORDER_BUFFER_CHANGE_UPDATE: + appendStringInfoString(ctx->out, " UPDATE:"); + if (change->data.tp.oldtuple != NULL) + { + appendStringInfoString(ctx->out, " old-key:"); + tuple_to_stringinfo(ctx->out, tupdesc, + &change->data.tp.oldtuple->tuple, + true); + appendStringInfoString(ctx->out, " new-tuple:"); + } + + if (change->data.tp.newtuple == NULL) + appendStringInfoString(ctx->out, " (no-tuple-data)"); + else + tuple_to_stringinfo(ctx->out, tupdesc, + &change->data.tp.newtuple->tuple, + false); + break; + case REORDER_BUFFER_CHANGE_DELETE: + appendStringInfoString(ctx->out, " DELETE:"); + + /* if there was no PK, we only know that a delete happened */ + if (change->data.tp.oldtuple == NULL) + appendStringInfoString(ctx->out, " (no-tuple-data)"); + /* In DELETE, only the replica identity is present; display that */ + else + tuple_to_stringinfo(ctx->out, tupdesc, + &change->data.tp.oldtuple->tuple, + true); + break; + default: + Assert(false); + } + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + OutputPluginWrite(ctx, true); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b0ab91cc71b..76ecfc0079c 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2183,7 +2183,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * they're not required anymore. The creator of the * tuple tells us. */ - if (change->data.tp.clear_toast_afterwards) + if (change->data.tp.clear_toast_afterwards || streaming) ReorderBufferToastReset(rb, txn); } /* we're not interested in toast deletions */