Merge compact/non compact commits, make aborts dynamically sized
Hi,
Right now wal_level=logical implies that the compact commit record
format isn't used and similarly 2pc commits also include the non compact
format of commits.
In the course of the 'replication identifier' patch submitted to the
current commitfest I added more information to the non compact commit
record if a xinfo flag was present. I optionally adding data is a better
course than the rigorous split between compact/non compact commits.
In the attached patch I've merged compact/noncompact code, made aborts
use similar logic to avoid including useless bytes and used both for the
2pc equivalents.
To avoid using more space in the compact case the 'xinfo' field
indicating the presence of further data is only included when a byte in
the xl_info flag is set (similar to what heap rmgr does). That means
that transactions without subtransactions and other things are four
bytes smaller than before, but ones with a subtransaction but no other
complications 4 byte larger. database info, nsubxacts, nrels, nmsgs et
al are also only included if required. I think that's a overall win,
even disregarding wal_level = logical.
When compact commits were discussed in
http://archives.postgresql.org/message-id/235610.92468.qm%40web29004.mail.ird.yahoo.com
such a scheme was discussed, but then discarded. I think that was a
mistake; the information in commit records is likely to be growing and
having to decide between the compact/non compact form instead on a more
granular level makes decisions harder. The information about two forms
of commits already has creeped into too many places...
I generally like how the patch looks, the relevant code actually looks
clearer to me afterwards - especially RecordTransactionCommit() being
noticeably shorter, decode.c having to care about less and that
twophase.c knows a less about xact.c type records seems good.
There's one bit that I'm not so sure about though: To avoid duplication
I've added Parse(Commit/Abort)Record(), but unfortunately that has to be
available both in front and backend code - so it's currently living in
xactdesc.c. I think we can live with that, but it's certainly not
pretty.
I'm not going to rebase the replication identifier patch over this for
now, seems premature until some feedback is in.
Comments?
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Debloat-and-deduplicate-transaction-commit-abort-rec.patchtext/x-patch; charset=us-asciiDownload
>From 06bc3275788b494306c717019c9b1b6082ea579a Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 20 Feb 2015 12:30:57 +0100
Subject: [PATCH] Debloat and deduplicate transaction commit/abort records.
---
src/backend/access/rmgrdesc/xactdesc.c | 189 +++++++++-----
src/backend/access/transam/twophase.c | 56 ++---
src/backend/access/transam/xact.c | 416 +++++++++++++++++++------------
src/backend/access/transam/xlog.c | 16 +-
src/backend/replication/logical/decode.c | 101 +++-----
src/include/access/xact.h | 162 ++++++++----
6 files changed, 564 insertions(+), 376 deletions(-)
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 3e87978..0686d55 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -19,48 +19,143 @@
#include "storage/sinval.h"
#include "utils/timestamp.h"
+/* Parse the WAL format of a xact abort into a easier to understand format. */
+void
+ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
+{
+ char *data = ((char *) xlrec) + MinSizeOfXactCommit;
+
+ memset(parsed, 0, sizeof(*parsed));
+
+ parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+ if (info & XLOG_XACT_HAS_INFO)
+ {
+ parsed->xinfo = *(uint32 *) data;
+
+ data += sizeof(uint32);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+ {
+ xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+ parsed->dbId = xl_dbinfo->dbId;
+ parsed->tsId = xl_dbinfo->tsId;
+
+ data += sizeof(xl_xact_dbinfo);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
+
+ parsed->nsubxacts = xl_subxacts->nsubxacts;
+ parsed->subxacts = xl_subxacts->subxacts;
+
+ data += MinSizeOfXactSubxacts;
+ data += parsed->nsubxacts * sizeof(TransactionId);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+ parsed->nrels = xl_relfilenodes->nrels;
+ parsed->xnodes = xl_relfilenodes->xnodes;
+
+ data += MinSizeOfXactRelfilenodes;
+ data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
+ {
+ xl_xact_invals *xl_invals = (xl_xact_invals *) data;
+
+ parsed->nmsgs = xl_invals->nmsgs;
+ parsed->msgs = xl_invals->msgs;
+
+ data += MinSizeOfXactInvals;
+ data += xl_invals->nmsgs * sizeof(SharedInvalidationMessage);
+ }
+}
+
+/* Parse the WAL format of xact abort into a easier to understand format. */
+void
+ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
+{
+ char *data = ((char *) xlrec) + MinSizeOfXactAbort;
+
+ memset(parsed, 0, sizeof(*parsed));
+
+ parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+ if (info & XLOG_XACT_HAS_INFO)
+ {
+ parsed->xinfo = *(uint32 *) data;
+ data += sizeof(uint32);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
+
+ parsed->nsubxacts = xl_subxacts->nsubxacts;
+ parsed->subxacts = xl_subxacts->subxacts;
+
+ data += MinSizeOfXactSubxacts;
+ data += parsed->nsubxacts * sizeof(TransactionId);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+ parsed->nrels = xl_relfilenodes->nrels;
+ parsed->xnodes = xl_relfilenodes->xnodes;
+
+ data += MinSizeOfXactRelfilenodes;
+ data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+ }
+}
static void
-xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
{
+ xl_xact_parsed_commit parsed_c;
int i;
- TransactionId *subxacts;
- subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+ ParseCommitRecord(info, xlrec, &parsed_c);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
- if (xlrec->nrels > 0)
+ if (parsed_c.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed_c.nrels; i++)
{
- char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+ char *path = relpathperm(parsed_c.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
- if (xlrec->nsubxacts > 0)
+ if (parsed_c.nsubxacts > 0)
{
appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", subxacts[i]);
+ for (i = 0; i < parsed_c.nsubxacts; i++)
+ appendStringInfo(buf, " %u", parsed_c.subxacts[i]);
}
- if (xlrec->nmsgs > 0)
+ if (parsed_c.nmsgs > 0)
{
- SharedInvalidationMessage *msgs;
-
- msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
-
- if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
+ if (XactCompletionRelcacheInitFileInval(parsed_c.xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
- xlrec->dbId, xlrec->tsId);
+ parsed_c.dbId, parsed_c.tsId);
appendStringInfoString(buf, "; inval msgs:");
- for (i = 0; i < xlrec->nmsgs; i++)
+ for (i = 0; i < parsed_c.nmsgs; i++)
{
- SharedInvalidationMessage *msg = &msgs[i];
+ SharedInvalidationMessage *msg = &parsed_c.msgs[i];
if (msg->id >= 0)
appendStringInfo(buf, " catcache %d", msg->id);
@@ -83,45 +178,31 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
}
static void
-xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
+xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec)
{
+ xl_xact_parsed_abort parsed_a;
int i;
- appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
-
- if (xlrec->nsubxacts > 0)
- {
- appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", xlrec->subxacts[i]);
- }
-}
-
-static void
-xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
-{
- int i;
+ ParseAbortRecord(info, xlrec, &parsed_a);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
- if (xlrec->nrels > 0)
+ if (parsed_a.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed_a.nrels; i++)
{
- char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+ char *path = relpathperm(parsed_a.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
- if (xlrec->nsubxacts > 0)
- {
- TransactionId *xacts = (TransactionId *)
- &xlrec->xnodes[xlrec->nrels];
+ if (parsed_a.nsubxacts > 0)
+ {
appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", xacts[i]);
+ for (i = 0; i < parsed_a.nsubxacts; i++)
+ appendStringInfo(buf, " %u", parsed_a.subxacts[i]);
}
}
@@ -142,37 +223,33 @@ xact_desc(StringInfo buf, XLogReaderState *record)
char *rec = XLogRecGetData(record);
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (info == XLOG_XACT_COMMIT_COMPACT)
- {
- xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
+ info &= XLOG_XACT_OPMASK;
- xact_desc_commit_compact(buf, xlrec);
- }
- else if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
- xact_desc_commit(buf, xlrec);
+ xact_desc_commit(buf, XLogRecGetInfo(record), xlrec);
}
else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort *) rec;
- xact_desc_abort(buf, xlrec);
+ xact_desc_abort(buf, XLogRecGetInfo(record), xlrec);
}
else if (info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
appendStringInfo(buf, "%u: ", xlrec->xid);
- xact_desc_commit(buf, &xlrec->crec);
+ xact_desc_commit(buf, XLogRecGetInfo(record), &xlrec->crec);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
appendStringInfo(buf, "%u: ", xlrec->xid);
- xact_desc_abort(buf, &xlrec->arec);
+ xact_desc_abort(buf, XLogRecGetInfo(record), &xlrec->arec);
}
else if (info == XLOG_XACT_ASSIGNMENT)
{
@@ -193,7 +270,10 @@ xact_identify(uint8 info)
{
const char *id = NULL;
- switch (info & ~XLR_INFO_MASK)
+ info &= XLR_INFO_MASK;
+ info &= XLOG_XACT_OPMASK;
+
+ switch (info)
{
case XLOG_XACT_COMMIT:
id = "COMMIT";
@@ -213,9 +293,6 @@ xact_identify(uint8 info)
case XLOG_XACT_ASSIGNMENT:
id = "ASSIGNMENT";
break;
- case XLOG_XACT_COMMIT_COMPACT:
- id = "COMMIT_COMPACT";
- break;
}
return id;
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 6c7029e..a69b9fc 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2083,6 +2083,9 @@ RecordTransactionCommitPrepared(TransactionId xid,
bool initfileinval)
{
xl_xact_commit_prepared xlrec;
+
+ uint8 info;
+
XLogRecPtr recptr;
START_CRIT_SECTION();
@@ -2090,37 +2093,20 @@ RecordTransactionCommitPrepared(TransactionId xid,
/* See notes in RecordTransactionCommit */
MyPgXact->delayChkpt = true;
- /* Emit the XLOG commit record */
xlrec.xid = xid;
- xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
-
- xlrec.crec.dbId = MyDatabaseId;
- xlrec.crec.tsId = MyDatabaseTableSpace;
-
- xlrec.crec.xact_time = GetCurrentTimestamp();
- xlrec.crec.nrels = nrels;
- xlrec.crec.nsubxacts = nchildren;
- xlrec.crec.nmsgs = ninvalmsgs;
-
XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
+ /* emit the twophase part of the record */
+ XLogRegisterData((char *) (&xlrec), offsetof(xl_xact_commit_prepared, crec));
- /* dump cache invalidation messages */
- if (ninvalmsgs > 0)
- XLogRegisterData((char *) invalmsgs,
- ninvalmsgs * sizeof(SharedInvalidationMessage));
+ /* and then the embedded commit record */
+ info = XactEmitCommitRecord(GetCurrentTimestamp(),
+ nchildren, children, nrels, rels,
+ ninvalmsgs, invalmsgs,
+ initfileinval, false);
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED | info);
/*
* We don't currently try to sleep before flush here ... nor is there any
@@ -2165,6 +2151,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
{
xl_xact_abort_prepared xlrec;
XLogRecPtr recptr;
+ uint8 info;
/*
* Catch the scenario where we aborted partway through
@@ -2178,23 +2165,18 @@ RecordTransactionAbortPrepared(TransactionId xid,
/* Emit the XLOG abort record */
xlrec.xid = xid;
- xlrec.arec.xact_time = GetCurrentTimestamp();
- xlrec.arec.nrels = nrels;
- xlrec.arec.nsubxacts = nchildren;
XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+ /* emit the twophase part of the record */
+ XLogRegisterData((char *) (&xlrec), offsetof(xl_xact_abort_prepared, arec));
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
+ /* and then the embedded abort record */
+ info = XactEmitAbortRecord(GetCurrentTimestamp(),
+ nchildren, children,
+ nrels, rels);
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED | info);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 97000ef..134f852 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1041,6 +1041,8 @@ RecordTransactionCommit(void)
}
else
{
+ uint8 info = 0;
+
/*
* Begin commit critical section and insert the commit XLOG record.
*/
@@ -1069,70 +1071,14 @@ RecordTransactionCommit(void)
SetCurrentTransactionStopTimestamp();
- /*
- * Do we need the long commit record? If not, use the compact format.
- *
- * For now always use the non-compact version if wal_level=logical, so
- * we can hide commits from other databases. TODO: In the future we
- * should merge compact and non-compact commits and use a flags
- * variable to determine if it contains subxacts, relations or
- * invalidation messages, that's more extensible and degrades more
- * gracefully. Till then, it's just 20 bytes of overhead.
- */
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
- XLogLogicalInfoActive())
- {
- xl_xact_commit xlrec;
-
- /*
- * Set flags required for recovery processing of commits.
- */
- xlrec.xinfo = 0;
- if (RelcacheInitFileInval)
- xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
- if (forceSyncCommit)
- xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
- xlrec.dbId = MyDatabaseId;
- xlrec.tsId = MyDatabaseTableSpace;
-
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
- xlrec.nmsgs = nmsgs;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels,
- nrels * sizeof(RelFileNode));
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
- /* dump shared cache invalidation messages */
- if (nmsgs > 0)
- XLogRegisterData((char *) invalMessages,
- nmsgs * sizeof(SharedInvalidationMessage));
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
- }
- else
- {
- xl_xact_commit_compact xlrec;
-
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nsubxacts = nchildren;
+ XLogBeginInsert();
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact);
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
+ info = XactEmitCommitRecord(xactStopTimestamp,
+ nchildren, children, nrels, rels,
+ nmsgs, invalMessages,
+ RelcacheInitFileInval, forceSyncCommit);
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT);
- }
+ (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT | info);
}
/*
@@ -1419,7 +1365,8 @@ RecordTransactionAbort(bool isSubXact)
RelFileNode *rels;
int nchildren;
TransactionId *children;
- xl_xact_abort xlrec;
+ uint8 info = 0;
+ TimestampTz xact_time;
/*
* If we haven't been assigned an XID, nobody will care whether we aborted
@@ -1459,28 +1406,20 @@ RecordTransactionAbort(bool isSubXact)
/* Write the ABORT record */
if (isSubXact)
- xlrec.xact_time = GetCurrentTimestamp();
+ xact_time = GetCurrentTimestamp();
else
{
SetCurrentTransactionStopTimestamp();
- xlrec.xact_time = xactStopTimestamp;
+ xact_time = xactStopTimestamp;
}
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
+ XactEmitAbortRecord(xact_time,
+ nchildren, children,
+ nrels, rels);
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT);
+ (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT | info);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
@@ -4654,23 +4593,186 @@ xactGetCommittedChildren(TransactionId **ptr)
* XLOG support routines
*/
+
+/*
+ * Emit a commit record based on the passed in parameters. A xlog insertion
+ * already must have been started. The record isn't inserted though.
+ *
+ * Returns info bytes that have to be ORed with the record type passed to
+ * XLogInsert().
+ */
+uint8
+XactEmitCommitRecord(TimestampTz commit_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInval, bool forceSync)
+{
+ uint8 info = 0;
+ /* static, so they're valid until the caller does the XLogInsert() */
+ static uint32 xinfo;
+ static xl_xact_commit xlrec;
+ static xl_xact_dbinfo xl_dbinfo;
+ static xl_xact_subxacts xl_subxacts;
+ static xl_xact_relfilenodes xl_relfilenodes;
+ static xl_xact_invals xl_invals;
+
+ xinfo = 0;
+
+ /*
+ * First figure out and collect all the information needed
+ */
+ xlrec.xact_time = commit_time;
+
+ if (relcacheInval)
+ xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+ if (forceSyncCommit)
+ xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+
+ /*
+ * Relcache invalidation requires information database and so does
+ * logical decoding.
+ */
+ if (nmsgs > 0 || XLogLogicalInfoActive())
+ {
+ xinfo |= XACT_XINFO_HAS_DBINFO;
+ xl_dbinfo.dbId = MyDatabaseId;
+ xl_dbinfo.tsId = MyDatabaseTableSpace;
+ }
+
+ if (nsubxacts > 0)
+ {
+ xinfo |= XACT_XINFO_HAS_SUBXACTS;
+ xl_subxacts.nsubxacts = nsubxacts;
+ }
+
+ if (nrels > 0)
+ {
+ xinfo |= XACT_XINFO_HAS_RELFILENODES;
+ xl_relfilenodes.nrels = nrels;
+ }
+
+ if (nmsgs > 0)
+ {
+ xinfo |= XACT_XINFO_HAS_INVALS;
+ xl_invals.nmsgs = nmsgs;
+ }
+
+ if (xinfo != 0)
+ info |= XLOG_XACT_HAS_INFO;
+
+ /*
+ * Then include all the collected data.
+ */
+ XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit));
+
+ if (xinfo != 0)
+ XLogRegisterData((char *) (&xinfo), sizeof(xinfo));
+
+ if (xinfo & XACT_XINFO_HAS_DBINFO)
+ XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+ if (xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ XLogRegisterData((char *) (&xl_subxacts),
+ MinSizeOfXactSubxacts);
+ XLogRegisterData((char *) subxacts,
+ nsubxacts * sizeof(TransactionId));
+ }
+
+ if (xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ XLogRegisterData((char *) (&xl_relfilenodes),
+ MinSizeOfXactRelfilenodes);
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
+ }
+
+ if (xinfo & XACT_XINFO_HAS_INVALS)
+ {
+ XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
+ XLogRegisterData((char *) msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ }
+
+ return info;
+}
+
+/*
+ * Emit, but don't insert, a abort record.
+ *
+ * See XactEmitCommitRecord for details.
+ */
+uint8
+XactEmitAbortRecord(TimestampTz abort_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels)
+{
+ uint8 info = 0;
+ /* static, so they're valid until the caller does the XLogInsert() */
+ static uint32 xinfo;
+ static xl_xact_commit xlrec;
+ static xl_xact_subxacts xl_subxacts;
+ static xl_xact_relfilenodes xl_relfilenodes;
+
+ xinfo = 0;
+
+ /* collect data to log */
+ if (nsubxacts > 0)
+ {
+ xinfo |= XACT_XINFO_HAS_SUBXACTS;
+ xl_subxacts.nsubxacts = nsubxacts;
+ }
+
+ if (nrels > 0)
+ {
+ xinfo |= XACT_XINFO_HAS_RELFILENODES;
+ xl_relfilenodes.nrels = nrels;
+ }
+
+ if (xinfo != 0)
+ info |= XLOG_XACT_HAS_INFO;
+
+ /* and actually log data */
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
+
+ if (xinfo != 0)
+ XLogRegisterData((char *) (&xinfo), sizeof(xinfo));
+
+ if (xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ XLogRegisterData((char *) (&xl_subxacts),
+ MinSizeOfXactSubxacts);
+ XLogRegisterData((char *) subxacts,
+ nsubxacts * sizeof(TransactionId));
+ }
+
+ if (xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ XLogRegisterData((char *) (&xl_relfilenodes),
+ MinSizeOfXactRelfilenodes);
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
+ }
+
+ return info;
+}
+
/*
* Before 9.0 this was a fairly short function, but now it performs many
* actions for which the order of execution is critical.
*/
static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
- TimestampTz commit_time,
- TransactionId *sub_xids, int nsubxacts,
- SharedInvalidationMessage *inval_msgs, int nmsgs,
- RelFileNode *xnodes, int nrels,
- Oid dbId, Oid tsId,
- uint32 xinfo)
+xact_redo_commit(uint8 info, xl_xact_commit *xlrec,
+ TransactionId xid, XLogRecPtr lsn)
{
+ xl_xact_parsed_commit parsed_c;
TransactionId max_xid;
int i;
- max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
+ ParseCommitRecord(info, xlrec, &parsed_c);
+
+ max_xid = TransactionIdLatest(xid, parsed_c.nsubxacts, parsed_c.subxacts);
/*
* Make sure nextXid is beyond any XID mentioned in the record.
@@ -4689,15 +4791,16 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
}
/* Set the transaction commit timestamp and metadata */
- TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
- commit_time, InvalidCommitTsNodeId, false);
+ TransactionTreeSetCommitTsData(xid, parsed_c.nsubxacts, parsed_c.subxacts,
+ xlrec->xact_time, InvalidCommitTsNodeId,
+ false);
if (standbyState == STANDBY_DISABLED)
{
/*
* Mark the transaction committed in pg_clog.
*/
- TransactionIdCommitTree(xid, nsubxacts, sub_xids);
+ TransactionIdCommitTree(xid, parsed_c.nsubxacts, parsed_c.subxacts);
}
else
{
@@ -4721,21 +4824,21 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
* bits set on changes made by transactions that haven't yet
* recovered. It's unlikely but it's good to be safe.
*/
- TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
+ TransactionIdAsyncCommitTree(xid, parsed_c.nsubxacts, parsed_c.subxacts, lsn);
/*
* We must mark clog before we update the ProcArray.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(xid, parsed_c.nsubxacts, parsed_c.subxacts, max_xid);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as
* occurs in CommitTransaction().
*/
- ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
- XactCompletionRelcacheInitFileInval(xinfo),
- dbId, tsId);
+ ProcessCommittedInvalidationMessages(parsed_c.msgs, parsed_c.nmsgs,
+ XactCompletionRelcacheInitFileInval(parsed_c.xinfo),
+ parsed_c.dbId, parsed_c.tsId);
/*
* Release locks, if any. We do this for both two phase and normal one
@@ -4748,7 +4851,7 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
}
/* Make sure files supposed to be dropped are dropped */
- if (nrels > 0)
+ if (parsed_c.nrels > 0)
{
/*
* First update minimum recovery point to cover this WAL record. Once
@@ -4767,13 +4870,13 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
*/
XLogFlush(lsn);
- for (i = 0; i < nrels; i++)
+ for (i = 0; i < parsed_c.nrels; i++)
{
- SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(parsed_c.xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
- XLogDropRelation(xnodes[i], fork);
+ XLogDropRelation(parsed_c.xnodes[i], fork);
smgrdounlink(srel, true);
smgrclose(srel);
}
@@ -4791,52 +4894,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
* minRecoveryPoint during recovery) helps to reduce that problem window,
* for any user that requested ForceSyncCommit().
*/
- if (XactCompletionForceSyncCommit(xinfo))
+ if (XactCompletionForceSyncCommit(parsed_c.xinfo))
XLogFlush(lsn);
}
/*
- * Utility function to call xact_redo_commit_internal after breaking down xlrec
- */
-static void
-xact_redo_commit(xl_xact_commit *xlrec,
- TransactionId xid, XLogRecPtr lsn)
-{
- TransactionId *subxacts;
- SharedInvalidationMessage *inval_msgs;
-
- /* subxid array follows relfilenodes */
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- /* invalidation messages array follows subxids */
- inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
- subxacts, xlrec->nsubxacts,
- inval_msgs, xlrec->nmsgs,
- xlrec->xnodes, xlrec->nrels,
- xlrec->dbId,
- xlrec->tsId,
- xlrec->xinfo);
-}
-
-/*
- * Utility function to call xact_redo_commit_internal for compact form of message.
- */
-static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
- TransactionId xid, XLogRecPtr lsn)
-{
- xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
- xlrec->subxacts, xlrec->nsubxacts,
- NULL, 0, /* inval msgs */
- NULL, 0, /* relfilenodes */
- InvalidOid, /* dbId */
- InvalidOid, /* tsId */
- 0); /* xinfo */
-}
-
-/*
* Be careful with the order of execution, as with xact_redo_commit().
* The two functions are similar but differ in key places.
*
@@ -4846,14 +4909,49 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
* because subtransaction commit is never WAL logged.
*/
static void
-xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
+xact_redo_abort(uint8 info, xl_xact_abort *xlrec, TransactionId xid)
{
+ int i;
+ char *data = ((char *) xlrec) + MinSizeOfXactCommit;
TransactionId *sub_xids;
TransactionId max_xid;
- int i;
+ uint32 xinfo = 0;
+ int nsubxacts = 0;
+ RelFileNode *xnodes = NULL;
+ int nrels = 0;
+
+ if (info & XLOG_XACT_HAS_INFO)
+ {
+ xinfo = *(uint32 *) data;
+ data += sizeof(uint32);
+ }
+
+ if (xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
+ nsubxacts = xl_subxacts->nsubxacts;
+ sub_xids = xl_subxacts->subxacts;
+
+ data += MinSizeOfXactSubxacts;
+ data += nsubxacts * sizeof(TransactionId);
+
+ max_xid = TransactionIdLatest(xid, xl_subxacts->nsubxacts,
+ xl_subxacts->subxacts);
+ }
+ else
+ max_xid = xid;
+
+ if (xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+ data += MinSizeOfXactRelfilenodes;
+ data += xl_relfilenodes->nrels * sizeof(TransactionId);
+
+ nrels = xl_relfilenodes->nrels;
+ xnodes = xl_relfilenodes->xnodes;
+ }
/*
* Make sure nextXid is beyond any XID mentioned in the record.
@@ -4874,7 +4972,7 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
if (standbyState == STANDBY_DISABLED)
{
/* Mark the transaction aborted in pg_clog, no need for async stuff */
- TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdAbortTree(xid, nsubxacts, sub_xids);
}
else
{
@@ -4890,12 +4988,12 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
RecordKnownAssignedTransactionIds(max_xid);
/* Mark the transaction aborted in pg_clog, no need for async stuff */
- TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdAbortTree(xid, nsubxacts, sub_xids);
/*
* We must update the ProcArray after we have marked clog.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
/*
* There are no flat files that need updating, nor invalidation
@@ -4905,17 +5003,17 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
/*
* Release locks, if any. There are no invalidations to send.
*/
- StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
+ StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
}
/* Make sure files supposed to be dropped are dropped */
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < nrels; i++)
{
- SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
- XLogDropRelation(xlrec->xnodes[i], fork);
+ XLogDropRelation(xnodes[i], fork);
smgrdounlink(srel, true);
smgrclose(srel);
}
@@ -4926,26 +5024,23 @@ xact_redo(XLogReaderState *record)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ info &= XLOG_XACT_OPMASK;
+
/* Backup blocks are not used in xact records */
Assert(!XLogRecHasAnyBlockRefs(record));
- if (info == XLOG_XACT_COMMIT_COMPACT)
- {
- xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
-
- xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr);
- }
- else if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
- xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr);
+ xact_redo_commit(XLogRecGetInfo(record), xlrec,
+ XLogRecGetXid(record), record->EndRecPtr);
}
else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
- xact_redo_abort(xlrec, XLogRecGetXid(record));
+ xact_redo_abort(XLogRecGetInfo(record), xlrec, XLogRecGetXid(record));
}
else if (info == XLOG_XACT_PREPARE)
{
@@ -4957,14 +5052,15 @@ xact_redo(XLogReaderState *record)
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
- xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr);
+ xact_redo_commit(XLogRecGetInfo(record), &xlrec->crec, xlrec->xid,
+ record->EndRecPtr);
RemoveTwoPhaseFile(xlrec->xid, false);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
- xact_redo_abort(&xlrec->arec, xlrec->xid);
+ xact_redo_abort(XLogRecGetInfo(record), &xlrec->arec, xlrec->xid);
RemoveTwoPhaseFile(xlrec->xid, false);
}
else if (info == XLOG_XACT_ASSIGNMENT)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 629a457..f0e83ff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5106,11 +5106,6 @@ getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
- {
- *recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
- return true;
- }
if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
{
*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
@@ -5169,7 +5164,7 @@ recoveryStopsBefore(XLogReaderState *record)
return false;
record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
+ if (record_info == XLOG_XACT_COMMIT)
{
isCommit = true;
recordXid = XLogRecGetXid(record);
@@ -5289,8 +5284,7 @@ recoveryStopsAfter(XLogReaderState *record)
}
if (rmid == RM_XACT_ID &&
- (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
+ (record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED ||
record_info == XLOG_XACT_ABORT ||
record_info == XLOG_XACT_ABORT_PREPARED))
@@ -5326,8 +5320,7 @@ recoveryStopsAfter(XLogReaderState *record)
recoveryStopTime = recordXtime;
recoveryStopName[0] = '\0';
- if (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
+ if (record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED)
{
ereport(LOG,
@@ -5443,8 +5436,7 @@ recoveryApplyDelay(XLogReaderState *record)
*/
record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
if (!(XLogRecGetRmid(record) == RM_XACT_ID &&
- (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
+ (record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED)))
return false;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77c02ba..8211daf 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -64,12 +64,9 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msg);
-static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
- TransactionId xid, TransactionId *sub_xids, int nsubxacts);
+ xl_xact_commit *xlrec, TransactionId xid);
+static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_abort *xlrec, TransactionId xid);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -194,23 +191,17 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
+ info &= XLOG_XACT_OPMASK;
+
switch (info)
{
case XLOG_XACT_COMMIT:
{
xl_xact_commit *xlrec;
- TransactionId *subxacts = NULL;
- SharedInvalidationMessage *invals = NULL;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
- xlrec->xact_time,
- xlrec->nsubxacts, subxacts,
- xlrec->nmsgs, invals);
+ DecodeCommit(ctx, buf, xlrec, XLogRecGetXid(r));
break;
}
@@ -218,63 +209,35 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
xl_xact_commit_prepared *prec;
xl_xact_commit *xlrec;
- TransactionId *subxacts;
- SharedInvalidationMessage *invals = NULL;
/* Prepared commits contain a normal commit record... */
prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
xlrec = &prec->crec;
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- DecodeCommit(ctx, buf, prec->xid, xlrec->dbId,
- xlrec->xact_time,
- xlrec->nsubxacts, subxacts,
- xlrec->nmsgs, invals);
-
- break;
- }
- case XLOG_XACT_COMMIT_COMPACT:
- {
- xl_xact_commit_compact *xlrec;
-
- xlrec = (xl_xact_commit_compact *) XLogRecGetData(r);
+ DecodeCommit(ctx, buf, xlrec, prec->xid);
- DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
- xlrec->xact_time,
- xlrec->nsubxacts, xlrec->subxacts,
- 0, NULL);
break;
}
case XLOG_XACT_ABORT:
{
xl_xact_abort *xlrec;
- TransactionId *sub_xids;
xlrec = (xl_xact_abort *) XLogRecGetData(r);
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-
- DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r),
- sub_xids, xlrec->nsubxacts);
+ DecodeAbort(ctx, buf, xlrec, XLogRecGetXid(r));
break;
}
case XLOG_XACT_ABORT_PREPARED:
{
xl_xact_abort_prepared *prec;
xl_xact_abort *xlrec;
- TransactionId *sub_xids;
/* prepared abort contain a normal commit abort... */
prec = (xl_xact_abort_prepared *) XLogRecGetData(r);
xlrec = &prec->arec;
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-
/* r->xl_xid is committed in a separate record */
- DecodeAbort(ctx, buf->origptr, prec->xid,
- sub_xids, xlrec->nsubxacts);
+ DecodeAbort(ctx, buf, xlrec, prec->xid);
break;
}
@@ -477,27 +440,27 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msgs)
+ xl_xact_commit *xlrec, TransactionId xid)
{
+ xl_xact_parsed_commit parsed_c;
int i;
+ ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed_c);
+
/*
* Process invalidation messages, even if we're not interested in the
* transaction's contents, since the various caches need to always be
* consistent.
*/
- if (ninval_msgs > 0)
+ if (parsed_c.nmsgs > 0)
{
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
- ninval_msgs, msgs);
+ parsed_c.nmsgs, parsed_c.msgs);
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
}
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
- nsubxacts, sub_xids);
+ parsed_c.nsubxacts, parsed_c.subxacts);
/* ----
* Check whether we are interested in this specific transaction, and tell
@@ -524,12 +487,11 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* ---
*/
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (dboid != InvalidOid && dboid != ctx->slot->data.database))
+ (parsed_c.dbId != InvalidOid && parsed_c.dbId != ctx->slot->data.database))
{
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed_c.nsubxacts; i++)
{
- ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr);
- sub_xids++;
+ ReorderBufferForget(ctx->reorder, parsed_c.subxacts[i], buf->origptr);
}
ReorderBufferForget(ctx->reorder, xid, buf->origptr);
@@ -537,16 +499,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
/* tell the reorderbuffer about the surviving subtransactions */
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed_c.nsubxacts; i++)
{
- ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids,
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed_c.subxacts[i],
buf->origptr, buf->endptr);
- sub_xids++;
}
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time);
+ xlrec->xact_time);
}
/*
@@ -554,20 +515,24 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* snapbuild.c and reorderbuffer.c
*/
static void
-DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
- TransactionId *sub_xids, int nsubxacts)
+DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_abort *xlrec, TransactionId xid)
{
+ xl_xact_parsed_abort parsed_a;
int i;
- SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids);
+ ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed_a);
+
+ SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
+ parsed_a.nsubxacts, parsed_a.subxacts);
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed_a.nsubxacts; i++)
{
- ReorderBufferAbort(ctx->reorder, *sub_xids, lsn);
- sub_xids++;
+ ReorderBufferAbort(ctx->reorder, parsed_a.subxacts[i],
+ buf->record->EndRecPtr);
}
- ReorderBufferAbort(ctx->reorder, xid, lsn);
+ ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
}
/*
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index d7e5f64..122a383 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -18,6 +18,7 @@
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "storage/relfilenode.h"
+#include "storage/sinval.h"
#include "utils/datetime.h"
@@ -103,8 +104,8 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
*/
/*
- * XLOG allows to store some information in high 4 bits of log
- * record xl_info field
+ * XLOG allows to store some information in high 4 bits of log record xl_info
+ * field. We use 3 for the opcode, and one about an optional flag variable.
*/
#define XLOG_XACT_COMMIT 0x00
#define XLOG_XACT_PREPARE 0x10
@@ -112,7 +113,36 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XLOG_XACT_COMMIT_PREPARED 0x30
#define XLOG_XACT_ABORT_PREPARED 0x40
#define XLOG_XACT_ASSIGNMENT 0x50
-#define XLOG_XACT_COMMIT_COMPACT 0x60
+/* free opcode 0x60 */
+/* free opcode 0x70 */
+
+#define XLOG_XACT_OPMASK 0x70
+
+#define XLOG_XACT_HAS_INFO 0x80
+
+/*
+ * The following flags, stored in xinfo, determine which information is
+ * contained in commit/abort records.
+ */
+#define XACT_XINFO_HAS_DBINFO (1U << 0)
+#define XACT_XINFO_HAS_SUBXACTS (1U << 1)
+#define XACT_XINFO_HAS_RELFILENODES (1U << 2)
+#define XACT_XINFO_HAS_INVALS (1U << 3)
+
+/*
+ * Also stored in xinfo, these indicating a variety of additional actions that
+ * need to occur when emulating transaction effects during recovery.
+ *
+ * They are named XactCompletion... to differentiate them from
+ * EOXact... routines which run at the end of the original transaction
+ * completion.
+ */
+#define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30)
+#define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31)
+
+/* Access macros for above flags */
+#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
+#define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
typedef struct xl_xact_assignment
{
@@ -123,61 +153,62 @@ typedef struct xl_xact_assignment
#define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
-typedef struct xl_xact_commit_compact
+/* sub-records for commit/abort */
+typedef struct xl_xact_dbinfo
+{
+ Oid dbId; /* MyDatabaseId */
+ Oid tsId; /* MyDatabaseTableSpace */
+} xl_xact_dbinfo;
+
+typedef struct xl_xact_subxacts
{
- TimestampTz xact_time; /* time of commit */
int nsubxacts; /* number of subtransaction XIDs */
- /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
TransactionId subxacts[FLEXIBLE_ARRAY_MEMBER];
-} xl_xact_commit_compact;
+} xl_xact_subxacts;
-#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts)
+#define MinSizeOfXactSubxacts offsetof(xl_xact_subxacts, subxacts)
-typedef struct xl_xact_commit
+typedef struct xl_xact_relfilenodes
{
- TimestampTz xact_time; /* time of commit */
- uint32 xinfo; /* info flags */
- int nrels; /* number of RelFileNodes */
- int nsubxacts; /* number of subtransaction XIDs */
- int nmsgs; /* number of shared inval msgs */
- Oid dbId; /* MyDatabaseId */
- Oid tsId; /* MyDatabaseTableSpace */
- /* Array of RelFileNode(s) to drop at commit */
+ int nrels; /* number of subtransaction XIDs */
RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
- /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
- /* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
-} xl_xact_commit;
+} xl_xact_relfilenodes;
-#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
+#define MinSizeOfXactRelfilenodes offsetof(xl_xact_relfilenodes, xnodes)
-/*
- * These flags are set in the xinfo fields of WAL commit records,
- * indicating a variety of additional actions that need to occur
- * when emulating transaction effects during recovery.
- * They are named XactCompletion... to differentiate them from
- * EOXact... routines which run at the end of the original
- * transaction completion.
- */
-#define XACT_COMPLETION_UPDATE_RELCACHE_FILE 0x01
-#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
+typedef struct xl_xact_invals
+{
+ int nmsgs; /* number of shared inval msgs */
+ SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+} xl_xact_invals;
-/* Access macros for above flags */
-#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
-#define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
+#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
+
+typedef struct xl_xact_commit
+{
+ TimestampTz xact_time; /* time of commit */
+
+ /* xinfo flags follows if XLOG_XACT_HAS_INFO */
+ /* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */
+ /* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */
+ /* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
+ /* xl_xact_invals follows if XINFO_HAS_INVALS */
+} xl_xact_commit;
+
+#define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
typedef struct xl_xact_abort
{
TimestampTz xact_time; /* time of abort */
- int nrels; /* number of RelFileNodes */
- int nsubxacts; /* number of subtransaction XIDs */
- /* Array of RelFileNode(s) to drop at abort */
- RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
- /* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
+
+ /* xinfo flags follows if XLOG_XACT_HAS_INFO */
+ /* xl_xact_subxacts follows if HAS_SUBXACT */
+ /* xl_xact_relfilenodes follows if HAS_RELFILENODES */
} xl_xact_abort;
/* Note the intentional lack of an invalidation message array c.f. commit */
-#define MinSizeOfXactAbort offsetof(xl_xact_abort, xnodes)
+#define MinSizeOfXactAbort sizeof(xl_xact_abort)
/*
* COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records
@@ -192,8 +223,6 @@ typedef struct xl_xact_commit_prepared
/* MORE DATA FOLLOWS AT END OF STRUCT */
} xl_xact_commit_prepared;
-#define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.xnodes)
-
typedef struct xl_xact_abort_prepared
{
TransactionId xid; /* XID of prepared xact */
@@ -201,7 +230,39 @@ typedef struct xl_xact_abort_prepared
/* MORE DATA FOLLOWS AT END OF STRUCT */
} xl_xact_abort_prepared;
-#define MinSizeOfXactAbortPrepared offsetof(xl_xact_abort_prepared, arec.xnodes)
+/*
+ * Commit/Abort records in the above form are a bit verbose to parse, so
+ * there's a deconstructed versions generated by ParseCommit/AbortRecord().
+ */
+typedef struct xl_xact_parsed_commit
+{
+ TimestampTz xact_time;
+ uint32 xinfo;
+
+ Oid dbId; /* MyDatabaseId */
+ Oid tsId; /* MyDatabaseTableSpace */
+
+ int nsubxacts;
+ TransactionId *subxacts;
+
+ int nrels;
+ RelFileNode *xnodes;
+
+ int nmsgs;
+ SharedInvalidationMessage *msgs;
+} xl_xact_parsed_commit;
+
+typedef struct xl_xact_parsed_abort
+{
+ TimestampTz xact_time;
+ uint32 xinfo;
+
+ int nsubxacts;
+ TransactionId *subxacts;
+
+ int nrels;
+ RelFileNode *xnodes;
+} xl_xact_parsed_abort;
/* ----------------
@@ -256,8 +317,23 @@ extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
extern int xactGetCommittedChildren(TransactionId **ptr);
+extern uint8 XactEmitCommitRecord(TimestampTz commit_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInval, bool forceSync);
+
+extern uint8 XactEmitAbortRecord(TimestampTz abort_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels);
extern void xact_redo(XLogReaderState *record);
+
+/* xactdesc.c */
extern void xact_desc(StringInfo buf, XLogReaderState *record);
extern const char *xact_identify(uint8 info);
+/* also in xactdesc.c, so they can be shared between front/backend code */
+extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed);
+extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed);
+
#endif /* XACT_H */
--
2.3.0.149.gf3f4077.dirty
On 02/20/2015 05:21 PM, Andres Freund wrote:
In the attached patch I've merged compact/noncompact code, made aborts
use similar logic to avoid including useless bytes and used both for the
2pc equivalents.
+1 for this approach in general.
To avoid using more space in the compact case the 'xinfo' field
indicating the presence of further data is only included when a byte in
the xl_info flag is set (similar to what heap rmgr does). That means
that transactions without subtransactions and other things are four
bytes smaller than before, but ones with a subtransaction but no other
complications 4 byte larger. database info, nsubxacts, nrels, nmsgs et
al are also only included if required. I think that's a overall win,
even disregarding wal_level = logical.
xinfo is a bit underdocumented now. The comment in xl_xact_commit should
mention that it's a uint32. But actually, why is it 32 bits wide? Only 6
bits are used, so it would in a single byte. I guess that's because of
alignment: now that it's 32 bits wide, that ensures that all the other
structs are 4 byte aligned. That should be documented. Alternatively,
store them unaligned to save the few bytes and use memcpy.
There's one bit that I'm not so sure about though: To avoid duplication
I've added Parse(Commit/Abort)Record(), but unfortunately that has to be
available both in front and backend code - so it's currently living in
xactdesc.c. I think we can live with that, but it's certainly not
pretty.
Yeah, that's ugly. Why does frontend code need that? The old format
isn't exactly trivial for frontend code to decode either.
Regarding XactEmitCommitRecord and XactEmitAbortRecord, I wonder if you
could pass an xl_xact_parsed/abort_commit struct to them, instead of the
individual fields? You could then also avoid the static variables inside
it, passing pointers to that struct to XLogRegisterData() instead.
- Heikki
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 24 February 2015 at 18:51, Heikki Linnakangas
<hlinnakangas@vmware.com> wrote:
On 02/20/2015 05:21 PM, Andres Freund wrote:
In the attached patch I've merged compact/noncompact code, made aborts
use similar logic to avoid including useless bytes and used both for the
2pc equivalents.+1 for this approach in general.
+1 also
The compact commit record idea wasn't useful enough to stick with,
especially in the light of logical rep, possible future extensions to
the commit record and the prevalence of subtransactions.
XactEmitCommitRecord and XactEmitAbortRecord are somewhat hard to
understand. More comments or clearer design please.
The record format itself needs more explanation than I see in the
patch. Might be worth documenting when and how those fields would be
populated also (my bad).
It might be useful to have the optional field descriptions be named
with a _opt suffix to make it clearer.
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, RemoteDBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hi,
On 2015-02-24 20:51:42 +0200, Heikki Linnakangas wrote:
On 02/20/2015 05:21 PM, Andres Freund wrote:
To avoid using more space in the compact case the 'xinfo' field
indicating the presence of further data is only included when a byte in
the xl_info flag is set (similar to what heap rmgr does). That means
that transactions without subtransactions and other things are four
bytes smaller than before, but ones with a subtransaction but no other
complications 4 byte larger. database info, nsubxacts, nrels, nmsgs et
al are also only included if required. I think that's a overall win,
even disregarding wal_level = logical.xinfo is a bit underdocumented now.
Yea, the whole thing needs a bit more comments. I really wanted some
feedback before sinking even more time into this...
The comment in xl_xact_commit should mention that it's a uint32. But
actually, why is it 32 bits wide?
The first reason is that it's essentially just the old 'xinfo' field
moved to a different place ;). I think I'll add a xl_xact_flags or so,
and document it there.
Only 6 bits are used, so it would in a single byte. I guess that's
because of alignment: now that it's 32 bits wide, that ensures that
all the other structs are 4 byte aligned. That should be
documented. Alternatively, store them unaligned to save the few bytes
and use memcpy.
But yes, I didn't think about changing it because of alignment. Given
that commit and abort records can be fairly large, it seems better to
waste three bytes in a few scenarios (not the minimal one!) than
allocate another large chunk of memory for the copied data.
Even in the current code there's some rather underdocumented assumptions
about alignment; namely that RelFileNode+, TransactionId+ always result
in a acceptable alignment for the data after them. That happens to be
the case, but is worth a comment and/or a MAXALIGN.
There's one bit that I'm not so sure about though: To avoid duplication
I've added Parse(Commit/Abort)Record(), but unfortunately that has to be
available both in front and backend code - so it's currently living in
xactdesc.c. I think we can live with that, but it's certainly not
pretty.Yeah, that's ugly. Why does frontend code need that? The old format
isn't exactly trivial for frontend code to decode either.
pg_xlogdump outputs subxacts and such; I don't forsee other
usages. Sure, we could copy the code around, but I think that's worse
than having it in xactdesc.c. Needs a comment explaining why it's there
if I haven't added one already.
Regarding XactEmitCommitRecord and XactEmitAbortRecord, I wonder if you
could pass an xl_xact_parsed/abort_commit struct to them, instead of the
individual fields? You could then also avoid the static variables inside it,
passing pointers to that struct to XLogRegisterData() instead.
Hm, that's an idea. And rename it to xaxt_commit/abort_data?
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, Feb 25, 2015 at 8:10 PM, Andres Freund <andres@2ndquadrant.com> wrote:
On 2015-02-24 20:51:42 +0200, Heikki Linnakangas wrote:
On 02/20/2015 05:21 PM, Andres Freund wrote:
There's one bit that I'm not so sure about though: To avoid duplication
I've added Parse(Commit/Abort)Record(), but unfortunately that has to be
available both in front and backend code - so it's currently living in
xactdesc.c. I think we can live with that, but it's certainly not
pretty.Yeah, that's ugly. Why does frontend code need that? The old format
isn't exactly trivial for frontend code to decode either.pg_xlogdump outputs subxacts and such; I don't forsee other
usages. Sure, we could copy the code around, but I think that's worse
than having it in xactdesc.c. Needs a comment explaining why it's there
if I haven't added one already.
FWIW, I think they would live better in frontend code for client applications.
That's a nice patch. +1 for merging them. Here are a couple of comments:
+/* Parse the WAL format of a xact abort into a easier to understand format. */
+void
+ParseCommitRecord(uint8 info, xl_xact_commit *xlrec,
xl_xact_parsed_commit *parsed)
I think that you mean here of "an xact commit", not abort.
+ * Emit, but don't insert, a abort record.
s/a abort/an abort/
XactEmitAbortRecord has some problems with tabs replaced by 4 spaces
at a couple of places.
+/* free opcode 0x70 */
+
+#define XLOG_XACT_OPMASK 0x70
There is a contradiction here, 0x70 is not free.
Regards,
--
Michael
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2015-02-27 16:26:08 +0900, Michael Paquier wrote:
On Wed, Feb 25, 2015 at 8:10 PM, Andres Freund <andres@2ndquadrant.com> wrote:
On 2015-02-24 20:51:42 +0200, Heikki Linnakangas wrote:
On 02/20/2015 05:21 PM, Andres Freund wrote:
There's one bit that I'm not so sure about though: To avoid duplication
I've added Parse(Commit/Abort)Record(), but unfortunately that has to be
available both in front and backend code - so it's currently living in
xactdesc.c. I think we can live with that, but it's certainly not
pretty.Yeah, that's ugly. Why does frontend code need that? The old format
isn't exactly trivial for frontend code to decode either.pg_xlogdump outputs subxacts and such; I don't forsee other
usages. Sure, we could copy the code around, but I think that's worse
than having it in xactdesc.c. Needs a comment explaining why it's there
if I haven't added one already.FWIW, I think they would live better in frontend code for client applications.
What do you mean with that? You mean you'd rather see a copy in
pg_xlogdump somewhere? How would you trigger that being used instead of
the normal description routine?
+/* free opcode 0x70 */ + +#define XLOG_XACT_OPMASK 0x70 There is a contradiction here, 0x70 is not free.
The above is a mask, not an opcode - so I don't think it's a
contraction. I'll expand on the commentary on the next version.
Thanks for having a look!
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Fri, Feb 27, 2015 at 6:49 PM, Andres Freund <andres@2ndquadrant.com> wrote:
On 2015-02-27 16:26:08 +0900, Michael Paquier wrote:
On Wed, Feb 25, 2015 at 8:10 PM, Andres Freund <andres@2ndquadrant.com> wrote:
On 2015-02-24 20:51:42 +0200, Heikki Linnakangas wrote:
On 02/20/2015 05:21 PM, Andres Freund wrote:
There's one bit that I'm not so sure about though: To avoid duplication
I've added Parse(Commit/Abort)Record(), but unfortunately that has to be
available both in front and backend code - so it's currently living in
xactdesc.c. I think we can live with that, but it's certainly not
pretty.Yeah, that's ugly. Why does frontend code need that? The old format
isn't exactly trivial for frontend code to decode either.pg_xlogdump outputs subxacts and such; I don't forsee other
usages. Sure, we could copy the code around, but I think that's worse
than having it in xactdesc.c. Needs a comment explaining why it's there
if I haven't added one already.FWIW, I think they would live better in frontend code for client applications.
What do you mean with that? You mean you'd rather see a copy in
pg_xlogdump somewhere? How would you trigger that being used instead of
the normal description routine?
No, no. I meant that it is good the way your patch does it in
xactdesc.c, where both frontend and backend can reach it.
--
Michael
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Fri, Feb 27, 2015 at 7:10 AM, Michael Paquier
<michael.paquier@gmail.com> wrote:
No, no. I meant that it is good the way your patch does it in
xactdesc.c, where both frontend and backend can reach it.
Agreed, that seems much better than duplicating it.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2015-02-25 12:10:42 +0100, Andres Freund wrote:
On 2015-02-24 20:51:42 +0200, Heikki Linnakangas wrote:
Regarding XactEmitCommitRecord and XactEmitAbortRecord, I wonder if you
could pass an xl_xact_parsed/abort_commit struct to them, instead of the
individual fields? You could then also avoid the static variables inside it,
passing pointers to that struct to XLogRegisterData() instead.Hm, that's an idea. And rename it to xaxt_commit/abort_data?
So, I just tried this, and it doesn't really seem to come out as a net
positive. More code at the callsites and more complex code in the *Emit*
routines. It's impossible to use [FLEXIBLE_ARRAY_MEMBER] employing
datatypes in xact_commit_data while emitting because there obviously are
several chunks needing it. And avoid using it would make for a slightly
less clear format.
So unless you feel strongly about it, don't think so, I'll keep the
statics, even if they're not particularly pretty.
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 03/02/2015 06:51 PM, Andres Freund wrote:
On 2015-02-25 12:10:42 +0100, Andres Freund wrote:
On 2015-02-24 20:51:42 +0200, Heikki Linnakangas wrote:
Regarding XactEmitCommitRecord and XactEmitAbortRecord, I wonder if you
could pass an xl_xact_parsed/abort_commit struct to them, instead of the
individual fields? You could then also avoid the static variables inside it,
passing pointers to that struct to XLogRegisterData() instead.Hm, that's an idea. And rename it to xaxt_commit/abort_data?
So, I just tried this, and it doesn't really seem to come out as a net
positive. More code at the callsites and more complex code in the *Emit*
routines. It's impossible to use [FLEXIBLE_ARRAY_MEMBER] employing
datatypes in xact_commit_data while emitting because there obviously are
several chunks needing it. And avoid using it would make for a slightly
less clear format.So unless you feel strongly about it, don't think so, I'll keep the
statics, even if they're not particularly pretty.
Come to think of it, it would be cleaner anyway to move the
XLogBeginInsert() and XLogInsert() calls inside XactEmitCommitRecord.
Then those structs don't need to be static either.
- Heikki
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2015-03-02 19:11:15 +0200, Heikki Linnakangas wrote:
Come to think of it, it would be cleaner anyway to move the
XLogBeginInsert() and XLogInsert() calls inside XactEmitCommitRecord. Then
those structs don't need to be static either.
That was my first thought as well - but it doesn't easily work because
of the way commit/abort records are embedded into twophase.c. I
couldn't come with a simple way to change that, and anythign non simple
imo defeats the purpose.
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 03/02/2015 07:14 PM, Andres Freund wrote:
On 2015-03-02 19:11:15 +0200, Heikki Linnakangas wrote:
Come to think of it, it would be cleaner anyway to move the
XLogBeginInsert() and XLogInsert() calls inside XactEmitCommitRecord. Then
those structs don't need to be static either.That was my first thought as well - but it doesn't easily work because
of the way commit/abort records are embedded into twophase.c. I
couldn't come with a simple way to change that, and anythign non simple
imo defeats the purpose.
Pass the prepared XID as yet another argument to XactEmitCommitRecord,
and have XactEmitCommitRecord emit the xl_xact_commit_prepared part of
the record too. It might even make sense to handle the prepared XID like
all the other optional fields and add an xinfo flag for it.
- Heikki
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2015-03-02 19:23:56 +0200, Heikki Linnakangas wrote:
On 03/02/2015 07:14 PM, Andres Freund wrote:
On 2015-03-02 19:11:15 +0200, Heikki Linnakangas wrote:
Come to think of it, it would be cleaner anyway to move the
XLogBeginInsert() and XLogInsert() calls inside XactEmitCommitRecord. Then
those structs don't need to be static either.That was my first thought as well - but it doesn't easily work because
of the way commit/abort records are embedded into twophase.c. I
couldn't come with a simple way to change that, and anythign non simple
imo defeats the purpose.Pass the prepared XID as yet another argument to XactEmitCommitRecord, and
have XactEmitCommitRecord emit the xl_xact_commit_prepared part of the
record too. It might even make sense to handle the prepared XID like all the
other optional fields and add an xinfo flag for it.
That's what I mean with "non simple". Not a fan of teaching xact.c even
more about twophase's dealings than it already knows. We'd have to
either teach XactEmit* to insert different types of records pased a
parameter or switch in xact_redo() based on the availability of the
separate twophase xid via xinfo. Doesn't strike me as an improvement at
all.
I'd rather live with some statics in an isolated location - it's not as
if we're ever going to allow two concurrent xlog inserts to happen at
the same time.
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2015-03-02 18:45:18 +0100, Andres Freund wrote:
On 2015-03-02 19:23:56 +0200, Heikki Linnakangas wrote:
Pass the prepared XID as yet another argument to XactEmitCommitRecord, and
have XactEmitCommitRecord emit the xl_xact_commit_prepared part of the
record too. It might even make sense to handle the prepared XID like all the
other optional fields and add an xinfo flag for it.That's what I mean with "non simple". Not a fan of teaching xact.c even
more about twophase's dealings than it already knows.
I made an effort to show how horrible it would look like. Turns out it's
not that bad ;). Far from pretty, especially in xlog.c, but
acceptable. I think treating them more similar actually might open the
road for reducing the difference further at some point.
The attached patch does pretty much what you suggested above and tries
to address all the point previously made. I plan to push this fairly
soon; unless somebody has further input.
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Merge-the-various-forms-of-transaction-commit-abort-.patchtext/x-patch; charset=us-asciiDownload
>From 7557d0afa860c37a1992240b54e2bf3710296fc7 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 15 Mar 2015 14:42:09 +0100
Subject: [PATCH] Merge the various forms of transaction commit & abort
records.
Since 465883b0a two versions of commit records have existed. A compact
version that was used when no cache invalidations, smgr unlinks and
similar were needed, and a full version that could deal with all
that. Additionally the full version was embedded into twophase commit
records.
That resulted in a measurable reduction in the size of the logged WAL in
some workloads. But more recently additions like logical decoding, which
e.g. needs information about the database something was executed on,
made it applicable in fewer situations. The static split generally made
it hard to expand the commit record, because concerns over the size made
it hard to add anything to the compact version.
Additionally it's not particularly pretty to have twophase.c insert
RM_XACT records.
Rejigger things so that the commit and abort records only have one form
each, including the twophase equivalents. The presence of the various
optional (in the sense of not being in every record) pieces is indicated
by a bits in the 'xinfo' flag. That flag previously was not included in
compact commit records. To prevent an increase in size due to its
presence, it's only included if necessary; signalled by a bit in the
xl_info bits available for xact.c, similar to heapam.c's
XLOG_HEAP_OPMASK/XLOG_HEAP_INIT_PAGE.
Twophase commit/aborts are now the same as their normal
counterparts. The original transaction's xid is included in an optional
data field.
This means that commit records generally are smaller, except in the case
of a transaction with subtransactions, but no other special cases; the
increase there is four bytes, which seems acceptable given that the more
common case of not having subtransactions shrank. The savings are
especially measurable for twophase commits, which previously always used
the full version; but will in practice only infrequently have required
that.
The motivation for this work are not the space savings and and
deduplication though; it's that it makes it easier to extend commit
records with additional information. That's just a few lines of code
now; without impacting the common case where that information is not
needed.
Discussion: 20150220152150.GD4149@awork2.anarazel.de,
235610.92468.qm%40web29004.mail.ird.yahoo.com
Reviewed-By: Heikki Linnakangas, Simon Riggs
---
src/backend/access/rmgrdesc/xactdesc.c | 245 +++++++++++-----
src/backend/access/transam/twophase.c | 59 +---
src/backend/access/transam/xact.c | 476 +++++++++++++++++++------------
src/backend/access/transam/xlog.c | 126 ++++----
src/backend/replication/logical/decode.c | 133 +++------
src/include/access/xact.h | 215 ++++++++++----
src/include/access/xlog_internal.h | 2 +-
7 files changed, 749 insertions(+), 507 deletions(-)
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 3e87978..b036b6d 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -14,53 +14,188 @@
*/
#include "postgres.h"
+#include "access/transam.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "storage/sinval.h"
#include "utils/timestamp.h"
+/*
+ * Parse the WAL format of a xact commit and abort records into a easier to
+ * understand format.
+ *
+ * This routines are in xactdesc.c because they're accessed in backend (when
+ * replaying WAL) and frontend (pg_xlogdump) code. This file is the only xact
+ * specific one shared between both. They're complicated enough that
+ * duplication would be bothersome.
+ */
+
+void
+ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
+{
+ char *data = ((char *) xlrec) + MinSizeOfXactCommit;
+
+ memset(parsed, 0, sizeof(*parsed));
+
+ parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+ parsed->xact_time = xlrec->xact_time;
+
+ if (info & XLOG_XACT_HAS_INFO)
+ {
+ xl_xact_xinfo *xl_xinfo = (xl_xact_xinfo *) data;
+
+ parsed->xinfo = xl_xinfo->xinfo;
+
+ data += sizeof(xl_xact_xinfo);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+ {
+ xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+ parsed->dbId = xl_dbinfo->dbId;
+ parsed->tsId = xl_dbinfo->tsId;
+
+ data += sizeof(xl_xact_dbinfo);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
+
+ parsed->nsubxacts = xl_subxacts->nsubxacts;
+ parsed->subxacts = xl_subxacts->subxacts;
+
+ data += MinSizeOfXactSubxacts;
+ data += parsed->nsubxacts * sizeof(TransactionId);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+ parsed->nrels = xl_relfilenodes->nrels;
+ parsed->xnodes = xl_relfilenodes->xnodes;
+
+ data += MinSizeOfXactRelfilenodes;
+ data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
+ {
+ xl_xact_invals *xl_invals = (xl_xact_invals *) data;
+
+ parsed->nmsgs = xl_invals->nmsgs;
+ parsed->msgs = xl_invals->msgs;
+
+ data += MinSizeOfXactInvals;
+ data += xl_invals->nmsgs * sizeof(SharedInvalidationMessage);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
+ xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+
+ parsed->twophase_xid = xl_twophase->xid;
+
+ data += sizeof(xl_xact_twophase);
+ }
+}
+
+void
+ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
+{
+ char *data = ((char *) xlrec) + MinSizeOfXactAbort;
+
+ memset(parsed, 0, sizeof(*parsed));
+
+ parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+ parsed->xact_time = xlrec->xact_time;
+
+ if (info & XLOG_XACT_HAS_INFO)
+ {
+ xl_xact_xinfo *xl_xinfo = (xl_xact_xinfo *) data;
+
+ parsed->xinfo = xl_xinfo->xinfo;
+
+ data += sizeof(xl_xact_xinfo);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
+
+ parsed->nsubxacts = xl_subxacts->nsubxacts;
+ parsed->subxacts = xl_subxacts->subxacts;
+
+ data += MinSizeOfXactSubxacts;
+ data += parsed->nsubxacts * sizeof(TransactionId);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+ parsed->nrels = xl_relfilenodes->nrels;
+ parsed->xnodes = xl_relfilenodes->xnodes;
+
+ data += MinSizeOfXactRelfilenodes;
+ data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
+ xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+
+ parsed->twophase_xid = xl_twophase->xid;
+
+ data += sizeof(xl_xact_twophase);
+ }
+}
static void
-xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
{
+ xl_xact_parsed_commit parsed;
int i;
- TransactionId *subxacts;
- subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+ ParseCommitRecord(info, xlrec, &parsed);
+
+ /* If this is a prepared xact, show the xid of the original xact */
+ if (TransactionIdIsValid(parsed.twophase_xid))
+ appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
- if (xlrec->nrels > 0)
+ if (parsed.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed.nrels; i++)
{
- char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+ char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
- if (xlrec->nsubxacts > 0)
+ if (parsed.nsubxacts > 0)
{
appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", subxacts[i]);
+ for (i = 0; i < parsed.nsubxacts; i++)
+ appendStringInfo(buf, " %u", parsed.subxacts[i]);
}
- if (xlrec->nmsgs > 0)
+ if (parsed.nmsgs > 0)
{
- SharedInvalidationMessage *msgs;
-
- msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
-
- if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
+ if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
- xlrec->dbId, xlrec->tsId);
+ parsed.dbId, parsed.tsId);
appendStringInfoString(buf, "; inval msgs:");
- for (i = 0; i < xlrec->nmsgs; i++)
+ for (i = 0; i < parsed.nmsgs; i++)
{
- SharedInvalidationMessage *msg = &msgs[i];
+ SharedInvalidationMessage *msg = &parsed.msgs[i];
if (msg->id >= 0)
appendStringInfo(buf, " catcache %d", msg->id);
@@ -80,48 +215,41 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
appendStringInfo(buf, " unknown id %d", msg->id);
}
}
+
+ if (XactCompletionForceSyncCommit(parsed.xinfo))
+ appendStringInfo(buf, "; sync");
}
static void
-xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
+xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec)
{
+ xl_xact_parsed_abort parsed;
int i;
- appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
+ ParseAbortRecord(info, xlrec, &parsed);
- if (xlrec->nsubxacts > 0)
- {
- appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", xlrec->subxacts[i]);
- }
-}
-
-static void
-xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
-{
- int i;
+ /* If this is a prepared xact, show the xid of the original xact */
+ if (TransactionIdIsValid(parsed.twophase_xid))
+ appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
- if (xlrec->nrels > 0)
+ if (parsed.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed.nrels; i++)
{
- char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+ char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
- if (xlrec->nsubxacts > 0)
- {
- TransactionId *xacts = (TransactionId *)
- &xlrec->xnodes[xlrec->nrels];
+ if (parsed.nsubxacts > 0)
+ {
appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", xacts[i]);
+ for (i = 0; i < parsed.nsubxacts; i++)
+ appendStringInfo(buf, " %u", parsed.subxacts[i]);
}
}
@@ -140,39 +268,19 @@ void
xact_desc(StringInfo buf, XLogReaderState *record)
{
char *rec = XLogRecGetData(record);
- uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
- if (info == XLOG_XACT_COMMIT_COMPACT)
- {
- xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
+ uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
- xact_desc_commit_compact(buf, xlrec);
- }
- else if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
- xact_desc_commit(buf, xlrec);
+ xact_desc_commit(buf, XLogRecGetInfo(record), xlrec);
}
- else if (info == XLOG_XACT_ABORT)
+ else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *) rec;
- xact_desc_abort(buf, xlrec);
- }
- else if (info == XLOG_XACT_COMMIT_PREPARED)
- {
- xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
-
- appendStringInfo(buf, "%u: ", xlrec->xid);
- xact_desc_commit(buf, &xlrec->crec);
- }
- else if (info == XLOG_XACT_ABORT_PREPARED)
- {
- xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
-
- appendStringInfo(buf, "%u: ", xlrec->xid);
- xact_desc_abort(buf, &xlrec->arec);
+ xact_desc_abort(buf, XLogRecGetInfo(record), xlrec);
}
else if (info == XLOG_XACT_ASSIGNMENT)
{
@@ -193,7 +301,7 @@ xact_identify(uint8 info)
{
const char *id = NULL;
- switch (info & ~XLR_INFO_MASK)
+ switch (info & XLOG_XACT_OPMASK)
{
case XLOG_XACT_COMMIT:
id = "COMMIT";
@@ -213,9 +321,6 @@ xact_identify(uint8 info)
case XLOG_XACT_ASSIGNMENT:
id = "ASSIGNMENT";
break;
- case XLOG_XACT_COMMIT_COMPACT:
- id = "COMMIT_COMPACT";
- break;
}
return id;
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 6edc227..4075a6f 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2079,7 +2079,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
SharedInvalidationMessage *invalmsgs,
bool initfileinval)
{
- xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
START_CRIT_SECTION();
@@ -2088,36 +2087,11 @@ RecordTransactionCommitPrepared(TransactionId xid,
MyPgXact->delayChkpt = true;
/* Emit the XLOG commit record */
- xlrec.xid = xid;
-
- xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
-
- xlrec.crec.dbId = MyDatabaseId;
- xlrec.crec.tsId = MyDatabaseTableSpace;
-
- xlrec.crec.xact_time = GetCurrentTimestamp();
- xlrec.crec.nrels = nrels;
- xlrec.crec.nsubxacts = nchildren;
- xlrec.crec.nmsgs = ninvalmsgs;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- /* dump cache invalidation messages */
- if (ninvalmsgs > 0)
- XLogRegisterData((char *) invalmsgs,
- ninvalmsgs * sizeof(SharedInvalidationMessage));
-
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
+ recptr = XactLogCommitRecord(GetCurrentTimestamp(),
+ nchildren, children, nrels, rels,
+ ninvalmsgs, invalmsgs,
+ initfileinval, false,
+ xid);
/*
* We don't currently try to sleep before flush here ... nor is there any
@@ -2160,7 +2134,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nrels,
RelFileNode *rels)
{
- xl_xact_abort_prepared xlrec;
XLogRecPtr recptr;
/*
@@ -2174,24 +2147,10 @@ RecordTransactionAbortPrepared(TransactionId xid,
START_CRIT_SECTION();
/* Emit the XLOG abort record */
- xlrec.xid = xid;
- xlrec.arec.xact_time = GetCurrentTimestamp();
- xlrec.arec.nrels = nrels;
- xlrec.arec.nsubxacts = nchildren;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
+ recptr = XactLogAbortRecord(GetCurrentTimestamp(),
+ nchildren, children,
+ nrels, rels,
+ xid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 89769ea..729c02d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1068,70 +1068,11 @@ RecordTransactionCommit(void)
SetCurrentTransactionStopTimestamp();
- /*
- * Do we need the long commit record? If not, use the compact format.
- *
- * For now always use the non-compact version if wal_level=logical, so
- * we can hide commits from other databases. TODO: In the future we
- * should merge compact and non-compact commits and use a flags
- * variable to determine if it contains subxacts, relations or
- * invalidation messages, that's more extensible and degrades more
- * gracefully. Till then, it's just 20 bytes of overhead.
- */
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
- XLogLogicalInfoActive())
- {
- xl_xact_commit xlrec;
-
- /*
- * Set flags required for recovery processing of commits.
- */
- xlrec.xinfo = 0;
- if (RelcacheInitFileInval)
- xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
- if (forceSyncCommit)
- xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
- xlrec.dbId = MyDatabaseId;
- xlrec.tsId = MyDatabaseTableSpace;
-
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
- xlrec.nmsgs = nmsgs;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels,
- nrels * sizeof(RelFileNode));
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
- /* dump shared cache invalidation messages */
- if (nmsgs > 0)
- XLogRegisterData((char *) invalMessages,
- nmsgs * sizeof(SharedInvalidationMessage));
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
- }
- else
- {
- xl_xact_commit_compact xlrec;
-
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nsubxacts = nchildren;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact);
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT);
- }
+ XactLogCommitRecord(xactStopTimestamp,
+ nchildren, children, nrels, rels,
+ nmsgs, invalMessages,
+ RelcacheInitFileInval, forceSyncCommit,
+ InvalidTransactionId /* plain commit */);
}
/*
@@ -1424,7 +1365,7 @@ RecordTransactionAbort(bool isSubXact)
RelFileNode *rels;
int nchildren;
TransactionId *children;
- xl_xact_abort xlrec;
+ TimestampTz xact_time;
/*
* If we haven't been assigned an XID, nobody will care whether we aborted
@@ -1464,28 +1405,17 @@ RecordTransactionAbort(bool isSubXact)
/* Write the ABORT record */
if (isSubXact)
- xlrec.xact_time = GetCurrentTimestamp();
+ xact_time = GetCurrentTimestamp();
else
{
SetCurrentTransactionStopTimestamp();
- xlrec.xact_time = xactStopTimestamp;
+ xact_time = xactStopTimestamp;
}
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT);
+ XactLogAbortRecord(xact_time,
+ nchildren, children,
+ nrels, rels,
+ InvalidTransactionId);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
@@ -4659,23 +4589,230 @@ xactGetCommittedChildren(TransactionId **ptr)
* XLOG support routines
*/
+
+/*
+ * Log the commit record for a plain or twophase transaction commit.
+ *
+ * A 2pc commit will be emitted when twophase_xid is valid, a plain one
+ * otherwise.
+ */
+XLogRecPtr
+XactLogCommitRecord(TimestampTz commit_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInval, bool forceSync,
+ TransactionId twophase_xid)
+{
+ xl_xact_commit xlrec;
+ xl_xact_xinfo xl_xinfo;
+ xl_xact_dbinfo xl_dbinfo;
+ xl_xact_subxacts xl_subxacts;
+ xl_xact_relfilenodes xl_relfilenodes;
+ xl_xact_invals xl_invals;
+ xl_xact_twophase xl_twophase;
+
+ uint8 info;
+
+ Assert(CritSectionCount > 0);
+
+ xl_xinfo.xinfo = 0;
+
+ /* decide between a plain and 2pc commit */
+ if (!TransactionIdIsValid(twophase_xid))
+ info = XLOG_XACT_COMMIT;
+ else
+ info = XLOG_XACT_COMMIT_PREPARED;
+
+ /* First figure out and collect all the information needed */
+
+ xlrec.xact_time = commit_time;
+
+ if (relcacheInval)
+ xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+ if (forceSyncCommit)
+ xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+
+ /*
+ * Relcache invalidations requires information about the current database
+ * and so does logical decoding.
+ */
+ if (nmsgs > 0 || XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+ xl_dbinfo.dbId = MyDatabaseId;
+ xl_dbinfo.tsId = MyDatabaseTableSpace;
+ }
+
+ if (nsubxacts > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
+ xl_subxacts.nsubxacts = nsubxacts;
+ }
+
+ if (nrels > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES;
+ xl_relfilenodes.nrels = nrels;
+ }
+
+ if (nmsgs > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
+ xl_invals.nmsgs = nmsgs;
+ }
+
+ if (TransactionIdIsValid(twophase_xid))
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
+ xl_twophase.xid = twophase_xid;
+ }
+
+ if (xl_xinfo.xinfo != 0)
+ info |= XLOG_XACT_HAS_INFO;
+
+ /* Then include all the collected data into the commit record. */
+
+ XLogBeginInsert();
+
+ XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit));
+
+ if (xl_xinfo.xinfo != 0)
+ XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+ XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ XLogRegisterData((char *) (&xl_subxacts),
+ MinSizeOfXactSubxacts);
+ XLogRegisterData((char *) subxacts,
+ nsubxacts * sizeof(TransactionId));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ XLogRegisterData((char *) (&xl_relfilenodes),
+ MinSizeOfXactRelfilenodes);
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS)
+ {
+ XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
+ XLogRegisterData((char *) msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+
+
+ return XLogInsert(RM_XACT_ID, info);
+}
+
+/*
+ * Log the commit record for a plain or twophase transaction abort.
+ *
+ * A 2pc abort will be emitted when twophase_xid is valid, a plain one
+ * otherwise.
+ */
+XLogRecPtr
+XactLogAbortRecord(TimestampTz abort_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ TransactionId twophase_xid)
+{
+ xl_xact_abort xlrec;
+ xl_xact_xinfo xl_xinfo;
+ xl_xact_subxacts xl_subxacts;
+ xl_xact_relfilenodes xl_relfilenodes;
+ xl_xact_twophase xl_twophase;
+
+ uint8 info;
+
+ Assert(CritSectionCount > 0);
+
+ xl_xinfo.xinfo = 0;
+
+ /* decide between a plain and 2pc abort */
+ if (!TransactionIdIsValid(twophase_xid))
+ info = XLOG_XACT_ABORT;
+ else
+ info = XLOG_XACT_ABORT_PREPARED;
+
+
+ /* First figure out and collect all the information needed */
+
+ xlrec.xact_time = abort_time;
+
+ if (nsubxacts > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
+ xl_subxacts.nsubxacts = nsubxacts;
+ }
+
+ if (nrels > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES;
+ xl_relfilenodes.nrels = nrels;
+ }
+
+ if (TransactionIdIsValid(twophase_xid))
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
+ xl_twophase.xid = twophase_xid;
+ }
+
+ if (xl_xinfo.xinfo != 0)
+ info |= XLOG_XACT_HAS_INFO;
+
+ /* Then include all the collected data into the abort record. */
+
+ XLogBeginInsert();
+
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
+
+ if (xl_xinfo.xinfo != 0)
+ XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ XLogRegisterData((char *) (&xl_subxacts),
+ MinSizeOfXactSubxacts);
+ XLogRegisterData((char *) subxacts,
+ nsubxacts * sizeof(TransactionId));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ XLogRegisterData((char *) (&xl_relfilenodes),
+ MinSizeOfXactRelfilenodes);
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+
+ return XLogInsert(RM_XACT_ID, info);
+}
+
/*
* Before 9.0 this was a fairly short function, but now it performs many
* actions for which the order of execution is critical.
*/
static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
- TimestampTz commit_time,
- TransactionId *sub_xids, int nsubxacts,
- SharedInvalidationMessage *inval_msgs, int nmsgs,
- RelFileNode *xnodes, int nrels,
- Oid dbId, Oid tsId,
- uint32 xinfo)
+xact_redo_commit(xl_xact_parsed_commit *parsed,
+ TransactionId xid,
+ XLogRecPtr lsn)
{
TransactionId max_xid;
int i;
- max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
+ max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
/*
* Make sure nextXid is beyond any XID mentioned in the record.
@@ -4694,15 +4831,16 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
}
/* Set the transaction commit timestamp and metadata */
- TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
- commit_time, InvalidCommitTsNodeId, false);
+ TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
+ parsed->xact_time, InvalidCommitTsNodeId,
+ false);
if (standbyState == STANDBY_DISABLED)
{
/*
* Mark the transaction committed in pg_clog.
*/
- TransactionIdCommitTree(xid, nsubxacts, sub_xids);
+ TransactionIdCommitTree(xid, parsed->nsubxacts, parsed->subxacts);
}
else
{
@@ -4726,21 +4864,24 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
* bits set on changes made by transactions that haven't yet
* recovered. It's unlikely but it's good to be safe.
*/
- TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
+ TransactionIdAsyncCommitTree(
+ xid, parsed->nsubxacts, parsed->subxacts, lsn);
/*
* We must mark clog before we update the ProcArray.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(
+ xid, parsed->nsubxacts, parsed->subxacts, max_xid);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as
* occurs in CommitTransaction().
*/
- ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
- XactCompletionRelcacheInitFileInval(xinfo),
- dbId, tsId);
+ ProcessCommittedInvalidationMessages(
+ parsed->msgs, parsed->nmsgs,
+ XactCompletionRelcacheInitFileInval(parsed->xinfo),
+ parsed->dbId, parsed->tsId);
/*
* Release locks, if any. We do this for both two phase and normal one
@@ -4753,7 +4894,7 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
}
/* Make sure files supposed to be dropped are dropped */
- if (nrels > 0)
+ if (parsed->nrels > 0)
{
/*
* First update minimum recovery point to cover this WAL record. Once
@@ -4772,13 +4913,13 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
*/
XLogFlush(lsn);
- for (i = 0; i < nrels; i++)
+ for (i = 0; i < parsed->nrels; i++)
{
- SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
- XLogDropRelation(xnodes[i], fork);
+ XLogDropRelation(parsed->xnodes[i], fork);
smgrdounlink(srel, true);
smgrclose(srel);
}
@@ -4796,52 +4937,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
* minRecoveryPoint during recovery) helps to reduce that problem window,
* for any user that requested ForceSyncCommit().
*/
- if (XactCompletionForceSyncCommit(xinfo))
+ if (XactCompletionForceSyncCommit(parsed->xinfo))
XLogFlush(lsn);
}
/*
- * Utility function to call xact_redo_commit_internal after breaking down xlrec
- */
-static void
-xact_redo_commit(xl_xact_commit *xlrec,
- TransactionId xid, XLogRecPtr lsn)
-{
- TransactionId *subxacts;
- SharedInvalidationMessage *inval_msgs;
-
- /* subxid array follows relfilenodes */
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- /* invalidation messages array follows subxids */
- inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
- subxacts, xlrec->nsubxacts,
- inval_msgs, xlrec->nmsgs,
- xlrec->xnodes, xlrec->nrels,
- xlrec->dbId,
- xlrec->tsId,
- xlrec->xinfo);
-}
-
-/*
- * Utility function to call xact_redo_commit_internal for compact form of message.
- */
-static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
- TransactionId xid, XLogRecPtr lsn)
-{
- xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
- xlrec->subxacts, xlrec->nsubxacts,
- NULL, 0, /* inval msgs */
- NULL, 0, /* relfilenodes */
- InvalidOid, /* dbId */
- InvalidOid, /* tsId */
- 0); /* xinfo */
-}
-
-/*
* Be careful with the order of execution, as with xact_redo_commit().
* The two functions are similar but differ in key places.
*
@@ -4851,14 +4952,10 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
* because subtransaction commit is never WAL logged.
*/
static void
-xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
+xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
{
- TransactionId *sub_xids;
- TransactionId max_xid;
- int i;
-
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
+ int i;
+ TransactionId max_xid;
/*
* Make sure nextXid is beyond any XID mentioned in the record.
@@ -4867,6 +4964,10 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
* hold a lock while checking this. We still acquire the lock to modify
* it, though.
*/
+ max_xid = TransactionIdLatest(xid,
+ parsed->nsubxacts,
+ parsed->subxacts);
+
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->nextXid))
{
@@ -4879,7 +4980,7 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
if (standbyState == STANDBY_DISABLED)
{
/* Mark the transaction aborted in pg_clog, no need for async stuff */
- TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts);
}
else
{
@@ -4895,12 +4996,13 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
RecordKnownAssignedTransactionIds(max_xid);
/* Mark the transaction aborted in pg_clog, no need for async stuff */
- TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts);
/*
* We must update the ProcArray after we have marked clog.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(
+ xid, parsed->nsubxacts, parsed->subxacts, max_xid);
/*
* There are no flat files that need updating, nor invalidation
@@ -4910,17 +5012,17 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
/*
* Release locks, if any. There are no invalidations to send.
*/
- StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
+ StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts);
}
/* Make sure files supposed to be dropped are dropped */
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed->nrels; i++)
{
- SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
- XLogDropRelation(xlrec->xnodes[i], fork);
+ XLogDropRelation(parsed->xnodes[i], fork);
smgrdounlink(srel, true);
smgrclose(srel);
}
@@ -4929,28 +5031,52 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
void
xact_redo(XLogReaderState *record)
{
- uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
/* Backup blocks are not used in xact records */
Assert(!XLogRecHasAnyBlockRefs(record));
- if (info == XLOG_XACT_COMMIT_COMPACT)
- {
- xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
-
- xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr);
- }
- else if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
+ ParseCommitRecord(XLogRecGetInfo(record), xlrec,
+ &parsed);
- xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr);
+ if (info == XLOG_XACT_COMMIT)
+ {
+ Assert(!TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_commit(&parsed, XLogRecGetXid(record),
+ record->EndRecPtr);
+ }
+ else
+ {
+ Assert(TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_commit(&parsed, parsed.twophase_xid,
+ record->EndRecPtr);
+ RemoveTwoPhaseFile(parsed.twophase_xid, false);
+ }
}
- else if (info == XLOG_XACT_ABORT)
+ else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ ParseAbortRecord(XLogRecGetInfo(record), xlrec,
+ &parsed);
- xact_redo_abort(xlrec, XLogRecGetXid(record));
+ if (info == XLOG_XACT_ABORT)
+ {
+ Assert(!TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_abort(&parsed, XLogRecGetXid(record));
+ }
+ else
+ {
+ Assert(TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_abort(&parsed, parsed.twophase_xid);
+ RemoveTwoPhaseFile(parsed.twophase_xid, false);
+ }
}
else if (info == XLOG_XACT_PREPARE)
{
@@ -4958,20 +5084,6 @@ xact_redo(XLogReaderState *record)
RecreateTwoPhaseFile(XLogRecGetXid(record),
XLogRecGetData(record), XLogRecGetDataLen(record));
}
- else if (info == XLOG_XACT_COMMIT_PREPARED)
- {
- xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
-
- xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr);
- RemoveTwoPhaseFile(xlrec->xid, false);
- }
- else if (info == XLOG_XACT_ABORT_PREPARED)
- {
- xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
-
- xact_redo_abort(&xlrec->arec, xlrec->xid);
- RemoveTwoPhaseFile(xlrec->xid, false);
- }
else if (info == XLOG_XACT_ASSIGNMENT)
{
xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 554491b..d4d9d6a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5200,39 +5200,27 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
static bool
getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
{
- uint8 record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 xact_info = info & XLOG_XACT_OPMASK;
uint8 rmid = XLogRecGetRmid(record);
- if (rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
- {
- *recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
- return true;
- }
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
+ if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED))
{
*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED)
- {
- *recordXtime = ((xl_xact_commit_prepared *) XLogRecGetData(record))->crec.xact_time;
- return true;
- }
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
+ if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED))
{
*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED)
- {
- *recordXtime = ((xl_xact_abort_prepared *) XLogRecGetData(record))->arec.xact_time;
- return true;
- }
return false;
}
@@ -5248,7 +5236,7 @@ static bool
recoveryStopsBefore(XLogReaderState *record)
{
bool stopsHere = false;
- uint8 record_info;
+ uint8 xact_info;
bool isCommit;
TimestampTz recordXtime = 0;
TransactionId recordXid;
@@ -5269,27 +5257,40 @@ recoveryStopsBefore(XLogReaderState *record)
/* Otherwise we only consider stopping before COMMIT or ABORT records. */
if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
+ xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
+
+ if (xact_info == XLOG_XACT_COMMIT)
{
isCommit = true;
recordXid = XLogRecGetXid(record);
}
- else if (record_info == XLOG_XACT_COMMIT_PREPARED)
+ else if (xact_info == XLOG_XACT_COMMIT_PREPARED)
{
+ xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
isCommit = true;
- recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
+ ParseCommitRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
}
- else if (record_info == XLOG_XACT_ABORT)
+ else if (xact_info == XLOG_XACT_ABORT)
{
isCommit = false;
recordXid = XLogRecGetXid(record);
}
- else if (record_info == XLOG_XACT_ABORT_PREPARED)
+ else if (xact_info == XLOG_XACT_ABORT_PREPARED)
{
- isCommit = false;
- recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ isCommit = true;
+ ParseAbortRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
}
else
return false;
@@ -5357,11 +5358,12 @@ recoveryStopsBefore(XLogReaderState *record)
static bool
recoveryStopsAfter(XLogReaderState *record)
{
- uint8 record_info;
+ uint8 info;
+ uint8 xact_info;
uint8 rmid;
TimestampTz recordXtime;
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
rmid = XLogRecGetRmid(record);
/*
@@ -5369,7 +5371,7 @@ recoveryStopsAfter(XLogReaderState *record)
* the first one.
*/
if (recoveryTarget == RECOVERY_TARGET_NAME &&
- rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
xl_restore_point *recordRestorePointData;
@@ -5390,12 +5392,15 @@ recoveryStopsAfter(XLogReaderState *record)
}
}
- if (rmid == RM_XACT_ID &&
- (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED ||
- record_info == XLOG_XACT_ABORT ||
- record_info == XLOG_XACT_ABORT_PREPARED))
+ if (rmid != RM_XACT_ID)
+ return false;
+
+ xact_info = info & XLOG_XACT_OPMASK;
+
+ if (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED ||
+ xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED)
{
TransactionId recordXid;
@@ -5404,10 +5409,26 @@ recoveryStopsAfter(XLogReaderState *record)
SetLatestXTime(recordXtime);
/* Extract the XID of the committed/aborted transaction */
- if (record_info == XLOG_XACT_COMMIT_PREPARED)
- recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
- else if (record_info == XLOG_XACT_ABORT_PREPARED)
- recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
+ if (xact_info == XLOG_XACT_COMMIT_PREPARED)
+ {
+ xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
+ ParseCommitRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
+ }
+ else if (xact_info == XLOG_XACT_ABORT_PREPARED)
+ {
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ ParseAbortRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
+ }
else
recordXid = XLogRecGetXid(record);
@@ -5428,17 +5449,16 @@ recoveryStopsAfter(XLogReaderState *record)
recoveryStopTime = recordXtime;
recoveryStopName[0] = '\0';
- if (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED)
+ if (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED)
{
ereport(LOG,
(errmsg("recovery stopping after commit of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
- else if (record_info == XLOG_XACT_ABORT ||
- record_info == XLOG_XACT_ABORT_PREPARED)
+ else if (xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED)
{
ereport(LOG,
(errmsg("recovery stopping after abort of transaction %u, time %s",
@@ -5526,7 +5546,7 @@ SetRecoveryPause(bool recoveryPause)
static bool
recoveryApplyDelay(XLogReaderState *record)
{
- uint8 record_info;
+ uint8 xact_info;
TimestampTz xtime;
long secs;
int microsecs;
@@ -5543,11 +5563,13 @@ recoveryApplyDelay(XLogReaderState *record)
* so there is already opportunity for issues caused by early conflicts on
* standbys.
*/
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (!(XLogRecGetRmid(record) == RM_XACT_ID &&
- (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED)))
+ if (XLogRecGetRmid(record) != RM_XACT_ID)
+ return false;
+
+ xact_info = XLogRecGetInfo(record) & XLOG_XACT_COMMIT;
+
+ if (xact_info != XLOG_XACT_COMMIT &&
+ xact_info != XLOG_XACT_COMMIT_PREPARED)
return false;
if (!getRecordTimestamp(record, &xtime))
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e7614bd..24c2b59 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -64,12 +64,9 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msg);
-static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
- TransactionId xid, TransactionId *sub_xids, int nsubxacts);
+ xl_xact_parsed_commit *parsed, TransactionId xid);
+static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_abort *parsed, TransactionId xid);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -188,7 +185,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuild *builder = ctx->snapshot_builder;
ReorderBuffer *reorder = ctx->reorder;
XLogReaderState *r = buf->record;
- uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(r) & ~XLOG_XACT_OPMASK;
/* no point in doing anything yet, data could not be decoded anyway */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
@@ -197,87 +194,41 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
switch (info)
{
case XLOG_XACT_COMMIT:
- {
- xl_xact_commit *xlrec;
- TransactionId *subxacts = NULL;
- SharedInvalidationMessage *invals = NULL;
-
- xlrec = (xl_xact_commit *) XLogRecGetData(r);
-
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
- xlrec->xact_time,
- xlrec->nsubxacts, subxacts,
- xlrec->nmsgs, invals);
-
- break;
- }
case XLOG_XACT_COMMIT_PREPARED:
{
- xl_xact_commit_prepared *prec;
xl_xact_commit *xlrec;
- TransactionId *subxacts;
- SharedInvalidationMessage *invals = NULL;
-
- /* Prepared commits contain a normal commit record... */
- prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
- xlrec = &prec->crec;
+ xl_xact_parsed_commit parsed;
+ TransactionId xid;
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- DecodeCommit(ctx, buf, prec->xid, xlrec->dbId,
- xlrec->xact_time,
- xlrec->nsubxacts, subxacts,
- xlrec->nmsgs, invals);
-
- break;
- }
- case XLOG_XACT_COMMIT_COMPACT:
- {
- xl_xact_commit_compact *xlrec;
+ xlrec = (xl_xact_commit *) XLogRecGetData(r);
+ ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
- xlrec = (xl_xact_commit_compact *) XLogRecGetData(r);
+ if (TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
- DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
- xlrec->xact_time,
- xlrec->nsubxacts, xlrec->subxacts,
- 0, NULL);
+ DecodeCommit(ctx, buf, &parsed, xid);
break;
}
case XLOG_XACT_ABORT:
- {
- xl_xact_abort *xlrec;
- TransactionId *sub_xids;
-
- xlrec = (xl_xact_abort *) XLogRecGetData(r);
-
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-
- DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r),
- sub_xids, xlrec->nsubxacts);
- break;
- }
case XLOG_XACT_ABORT_PREPARED:
{
- xl_xact_abort_prepared *prec;
xl_xact_abort *xlrec;
- TransactionId *sub_xids;
+ xl_xact_parsed_abort parsed;
+ TransactionId xid;
- /* prepared abort contain a normal commit abort... */
- prec = (xl_xact_abort_prepared *) XLogRecGetData(r);
- xlrec = &prec->arec;
+ xlrec = (xl_xact_abort *) XLogRecGetData(r);
+ ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ if (TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
- /* r->xl_xid is committed in a separate record */
- DecodeAbort(ctx, buf->origptr, prec->xid,
- sub_xids, xlrec->nsubxacts);
+ DecodeAbort(ctx, buf, &parsed, xid);
break;
}
-
case XLOG_XACT_ASSIGNMENT:
{
xl_xact_assignment *xlrec;
@@ -477,10 +428,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msgs)
+ xl_xact_parsed_commit *parsed, TransactionId xid)
{
int i;
@@ -489,15 +437,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* transaction's contents, since the various caches need to always be
* consistent.
*/
- if (ninval_msgs > 0)
+ if (parsed->nmsgs > 0)
{
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
- ninval_msgs, msgs);
+ parsed->nmsgs, parsed->msgs);
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
}
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
- nsubxacts, sub_xids);
+ parsed->nsubxacts, parsed->subxacts);
/* ----
* Check whether we are interested in this specific transaction, and tell
@@ -524,12 +472,11 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* ---
*/
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (dboid != InvalidOid && dboid != ctx->slot->data.database))
+ (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database))
{
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr);
- sub_xids++;
+ ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
}
ReorderBufferForget(ctx->reorder, xid, buf->origptr);
@@ -537,16 +484,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
/* tell the reorderbuffer about the surviving subtransactions */
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids,
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
buf->origptr, buf->endptr);
- sub_xids++;
}
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time);
+ parsed->xact_time);
}
/*
@@ -554,20 +500,21 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* snapbuild.c and reorderbuffer.c
*/
static void
-DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
- TransactionId *sub_xids, int nsubxacts)
+DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_abort *parsed, TransactionId xid)
{
int i;
- SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids);
+ SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
+ parsed->nsubxacts, parsed->subxacts);
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferAbort(ctx->reorder, *sub_xids, lsn);
- sub_xids++;
+ ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
+ buf->record->EndRecPtr);
}
- ReorderBufferAbort(ctx->reorder, xid, lsn);
+ ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
}
/*
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index d7e5f64..fdf3ea3 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -18,6 +18,7 @@
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "storage/relfilenode.h"
+#include "storage/sinval.h"
#include "utils/datetime.h"
@@ -103,8 +104,8 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
*/
/*
- * XLOG allows to store some information in high 4 bits of log
- * record xl_info field
+ * XLOG allows to store some information in high 4 bits of log record xl_info
+ * field. We use 3 for the opcode, and one about an optional flag variable.
*/
#define XLOG_XACT_COMMIT 0x00
#define XLOG_XACT_PREPARE 0x10
@@ -112,7 +113,41 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XLOG_XACT_COMMIT_PREPARED 0x30
#define XLOG_XACT_ABORT_PREPARED 0x40
#define XLOG_XACT_ASSIGNMENT 0x50
-#define XLOG_XACT_COMMIT_COMPACT 0x60
+/* free opcode 0x60 */
+/* free opcode 0x70 */
+
+/* mask for filtering opcodes out of xl_info */
+#define XLOG_XACT_OPMASK 0x70
+
+/* does this record have a 'xinfo' field or not */
+#define XLOG_XACT_HAS_INFO 0x80
+
+/*
+ * The following flags, stored in xinfo, determine which information is
+ * contained in commit/abort records.
+ */
+#define XACT_XINFO_HAS_DBINFO (1U << 0)
+#define XACT_XINFO_HAS_SUBXACTS (1U << 1)
+#define XACT_XINFO_HAS_RELFILENODES (1U << 2)
+#define XACT_XINFO_HAS_INVALS (1U << 3)
+#define XACT_XINFO_HAS_TWOPHASE (1U << 4)
+
+/*
+ * Also stored in xinfo, these indicating a variety of additional actions that
+ * need to occur when emulating transaction effects during recovery.
+ *
+ * They are named XactCompletion... to differentiate them from
+ * EOXact... routines which run at the end of the original transaction
+ * completion.
+ */
+#define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30)
+#define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31)
+
+/* Access macros for above flags */
+#define XactCompletionRelcacheInitFileInval(xinfo) \
+ (!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE))
+#define XactCompletionForceSyncCommit(xinfo) \
+ (!!(xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT))
typedef struct xl_xact_assignment
{
@@ -123,85 +158,130 @@ typedef struct xl_xact_assignment
#define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
-typedef struct xl_xact_commit_compact
+/*
+ * Commit and abort records can contain a lot of information. But a large
+ * portion of the records won't need all possible pieces of information. So we
+ * only include what's needed.
+ *
+ * A minimal commit/abort record only consists out of a xl_xact_commit/abort
+ * struct. The presence of additional information is indicated by bits set in
+ * 'xl_xact_xinfo->xinfo'. The presence of the xinfo field itself is signalled
+ * by a set XLOG_XACT_HAS_INFO bit in the xl_info field.
+ *
+ * NB: All the individual data chunks should be be sized to multiples of
+ * sizeof(int) and only require int32 alignment.
+ */
+
+/* sub-records for commit/abort */
+
+typedef struct xl_xact_xinfo
+{
+ /*
+ * Even though we right now only require 1 byte of space in xinfo we use
+ * four so following records don't have to care about alignment. Commit
+ * records can be large, so copying large portions isn't attractive.
+ */
+ uint32 xinfo;
+} xl_xact_xinfo;
+
+typedef struct xl_xact_dbinfo
+{
+ Oid dbId; /* MyDatabaseId */
+ Oid tsId; /* MyDatabaseTableSpace */
+} xl_xact_dbinfo;
+
+typedef struct xl_xact_subxacts
{
- TimestampTz xact_time; /* time of commit */
int nsubxacts; /* number of subtransaction XIDs */
- /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
TransactionId subxacts[FLEXIBLE_ARRAY_MEMBER];
-} xl_xact_commit_compact;
+} xl_xact_subxacts;
+#define MinSizeOfXactSubxacts offsetof(xl_xact_subxacts, subxacts)
-#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts)
+typedef struct xl_xact_relfilenodes
+{
+ int nrels; /* number of subtransaction XIDs */
+ RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
+} xl_xact_relfilenodes;
+#define MinSizeOfXactRelfilenodes offsetof(xl_xact_relfilenodes, xnodes)
-typedef struct xl_xact_commit
+typedef struct xl_xact_invals
{
- TimestampTz xact_time; /* time of commit */
- uint32 xinfo; /* info flags */
- int nrels; /* number of RelFileNodes */
- int nsubxacts; /* number of subtransaction XIDs */
int nmsgs; /* number of shared inval msgs */
- Oid dbId; /* MyDatabaseId */
- Oid tsId; /* MyDatabaseTableSpace */
- /* Array of RelFileNode(s) to drop at commit */
- RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
- /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
- /* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
-} xl_xact_commit;
+ SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+} xl_xact_invals;
+#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
-#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
+typedef struct xl_xact_twophase
+{
+ TransactionId xid;
+} xl_xact_twophase;
+#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
-/*
- * These flags are set in the xinfo fields of WAL commit records,
- * indicating a variety of additional actions that need to occur
- * when emulating transaction effects during recovery.
- * They are named XactCompletion... to differentiate them from
- * EOXact... routines which run at the end of the original
- * transaction completion.
- */
-#define XACT_COMPLETION_UPDATE_RELCACHE_FILE 0x01
-#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
+typedef struct xl_xact_commit
+{
+ TimestampTz xact_time; /* time of commit */
-/* Access macros for above flags */
-#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
-#define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
+ /* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */
+ /* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */
+ /* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */
+ /* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
+ /* xl_xact_invals follows if XINFO_HAS_INVALS */
+ /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
+} xl_xact_commit;
+#define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
typedef struct xl_xact_abort
{
TimestampTz xact_time; /* time of abort */
- int nrels; /* number of RelFileNodes */
- int nsubxacts; /* number of subtransaction XIDs */
- /* Array of RelFileNode(s) to drop at abort */
- RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
- /* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
-} xl_xact_abort;
-/* Note the intentional lack of an invalidation message array c.f. commit */
-
-#define MinSizeOfXactAbort offsetof(xl_xact_abort, xnodes)
+ /* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */
+ /* No db_info required */
+ /* xl_xact_subxacts follows if HAS_SUBXACT */
+ /* xl_xact_relfilenodes follows if HAS_RELFILENODES */
+ /* No invalidation messages needed. */
+ /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
+} xl_xact_abort;
+#define MinSizeOfXactAbort sizeof(xl_xact_abort)
/*
- * COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records
- * except that we have to store the XID of the prepared transaction explicitly
- * --- the XID in the record header will be invalid.
+ * Commit/Abort records in the above form are a bit verbose to parse, so
+ * there's a deconstructed versions generated by ParseCommit/AbortRecord() for
+ * easier consumption.
*/
-
-typedef struct xl_xact_commit_prepared
+typedef struct xl_xact_parsed_commit
{
- TransactionId xid; /* XID of prepared xact */
- xl_xact_commit crec; /* COMMIT record */
- /* MORE DATA FOLLOWS AT END OF STRUCT */
-} xl_xact_commit_prepared;
+ TimestampTz xact_time;
+
+ uint32 xinfo;
+
+ Oid dbId; /* MyDatabaseId */
+ Oid tsId; /* MyDatabaseTableSpace */
-#define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.xnodes)
+ int nsubxacts;
+ TransactionId *subxacts;
-typedef struct xl_xact_abort_prepared
+ int nrels;
+ RelFileNode *xnodes;
+
+ int nmsgs;
+ SharedInvalidationMessage *msgs;
+
+ TransactionId twophase_xid; /* only for 2PC */
+} xl_xact_parsed_commit;
+
+typedef struct xl_xact_parsed_abort
{
- TransactionId xid; /* XID of prepared xact */
- xl_xact_abort arec; /* ABORT record */
- /* MORE DATA FOLLOWS AT END OF STRUCT */
-} xl_xact_abort_prepared;
+ TimestampTz xact_time;
+ uint32 xinfo;
+
+ int nsubxacts;
+ TransactionId *subxacts;
-#define MinSizeOfXactAbortPrepared offsetof(xl_xact_abort_prepared, arec.xnodes)
+ int nrels;
+ RelFileNode *xnodes;
+
+ TransactionId twophase_xid; /* only for 2PC */
+} xl_xact_parsed_abort;
/* ----------------
@@ -256,8 +336,25 @@ extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
extern int xactGetCommittedChildren(TransactionId **ptr);
+extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInval, bool forceSync,
+ TransactionId twophase_xid);
+
+extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ TransactionId twophase_xid);
extern void xact_redo(XLogReaderState *record);
+
+/* xactdesc.c */
extern void xact_desc(StringInfo buf, XLogReaderState *record);
extern const char *xact_identify(uint8 info);
+/* also in xactdesc.c, so they can be shared between front/backend code */
+extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed);
+extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed);
+
#endif /* XACT_H */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index f1be598..12a1b61 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD082 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD083 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
--
2.3.0.149.gf3f4077
On 2015-03-15 16:30:16 +0100, Andres Freund wrote:
The attached patch does pretty much what you suggested above and tries
to address all the point previously made. I plan to push this fairly
soon; unless somebody has further input.
Pushed, after fixing two typos in decode.c I introduced while "cleaning
up". This might not yet be perfect, but there seems little point in
waiting further.
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers