diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
new file mode 100644
index 2812681..f552800
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
*************** RecordTransactionCommitPrepared(Transact
*** 1977,2014 ****
  	xlrec.xid = xid;
  	xlrec.crec.xact_time = GetCurrentTimestamp();
  	xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
- 	xlrec.crec.nmsgs = 0;
- 	xlrec.crec.nrels = nrels;
- 	xlrec.crec.nsubxacts = nchildren;
- 	xlrec.crec.nmsgs = ninvalmsgs;
- 
  	rdata[0].data = (char *) (&xlrec);
  	rdata[0].len = MinSizeOfXactCommitPrepared;
  	rdata[0].buffer = InvalidBuffer;
  	/* dump rels to delete */
  	if (nrels > 0)
  	{
  		rdata[0].next = &(rdata[1]);
! 		rdata[1].data = (char *) rels;
! 		rdata[1].len = nrels * sizeof(RelFileNode);
  		rdata[1].buffer = InvalidBuffer;
  		lastrdata = 1;
  	}
  	/* dump committed child Xids */
  	if (nchildren > 0)
  	{
  		rdata[lastrdata].next = &(rdata[2]);
! 		rdata[2].data = (char *) children;
! 		rdata[2].len = nchildren * sizeof(TransactionId);
  		rdata[2].buffer = InvalidBuffer;
  		lastrdata = 2;
  	}
! 	/* dump cache invalidation messages */
  	if (ninvalmsgs > 0)
  	{
  		rdata[lastrdata].next = &(rdata[3]);
! 		rdata[3].data = (char *) invalmsgs;
! 		rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
  		rdata[3].buffer = InvalidBuffer;
  		lastrdata = 3;
  	}
--- 1977,2039 ----
  	xlrec.xid = xid;
  	xlrec.crec.xact_time = GetCurrentTimestamp();
  	xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
  	rdata[0].data = (char *) (&xlrec);
  	rdata[0].len = MinSizeOfXactCommitPrepared;
  	rdata[0].buffer = InvalidBuffer;
+ 	/*
+ 	 * NOTE(review): if this runs inside the commit critical section, an
+ 	 * ERROR from the palloc() calls below (e.g. out of memory) is promoted
+ 	 * to PANIC; consider allocating these buffers before entering it.
+ 	 */
  	/* dump rels to delete */
  	if (nrels > 0)
  	{
+ 		xl_xact_commit_opt *opt_rels = (xl_xact_commit_opt *)
+ 			palloc(MinSizeOfXactCommitOpt +
+ 				   nrels * sizeof(RelFileNode));
  		rdata[0].next = &(rdata[1]);
! 		xlrec.crec.xinfo |= XACT_COMPLETION_DROP_REL_NODES_PRESENT;
! 		opt_rels->nopts = nrels;
! 		memcpy(&(opt_rels->opts), rels, nrels * sizeof(RelFileNode));
! 		rdata[1].data = (char *) opt_rels;
! 		rdata[1].len = MinSizeOfXactCommitOpt + nrels * sizeof(RelFileNode);
  		rdata[1].buffer = InvalidBuffer;
  		lastrdata = 1;
  	}
  	/* dump committed child Xids */
  	if (nchildren > 0)
  	{
+ 		xl_xact_commit_opt *opt_child =
+ 			(xl_xact_commit_opt *)
+ 			palloc(MinSizeOfXactCommitOpt +
+ 				   nchildren * sizeof(TransactionId));
  		rdata[lastrdata].next = &(rdata[2]);
! 		xlrec.crec.xinfo |= XACT_COMPLETION_COMMITTED_SUB_PRESENT;
! 
! 		opt_child->nopts = nchildren;
! 		memcpy(&(opt_child->opts), children,
! 			   nchildren * sizeof(TransactionId));
! 		rdata[2].data = (char *) opt_child;
! 		rdata[2].len = MinSizeOfXactCommitOpt +
! 			nchildren * sizeof(TransactionId);
  		rdata[2].buffer = InvalidBuffer;
  		lastrdata = 2;
  	}
! 	/* dump shared cache invalidation messages */
  	if (ninvalmsgs > 0)
  	{
+ 		xl_xact_commit_opt *opt_shr =
+ 			(xl_xact_commit_opt *)
+ 			palloc(MinSizeOfXactCommitOpt +
+ 				   ninvalmsgs * sizeof(SharedInvalidationMessage));
  		rdata[lastrdata].next = &(rdata[3]);
! 		xlrec.crec.xinfo |= XACT_COMPLETION_SHARED_INV_PRESENT;
! 		opt_shr->nopts = ninvalmsgs;
! 		memcpy(&(opt_shr->opts), invalmsgs,
! 			   ninvalmsgs * sizeof(SharedInvalidationMessage));
! 		rdata[3].data = (char *) opt_shr;
! 		rdata[3].len = MinSizeOfXactCommitOpt +
! 			ninvalmsgs * sizeof(SharedInvalidationMessage);
  		rdata[3].buffer = InvalidBuffer;
  		lastrdata = 3;
  	}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
new file mode 100644
index 2ca1c14..730f0de
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
*************** RecordTransactionCommit(void)
*** 1003,1038 ****
  
  		SetCurrentTransactionStopTimestamp();
  		xlrec.xact_time = xactStopTimestamp;
- 		xlrec.nrels = nrels;
- 		xlrec.nsubxacts = nchildren;
- 		xlrec.nmsgs = nmsgs;
  		rdata[0].data = (char *) (&xlrec);
  		rdata[0].len = MinSizeOfXactCommit;
  		rdata[0].buffer = InvalidBuffer;
  		/* dump rels to delete */
  		if (nrels > 0)
  		{
  			rdata[0].next = &(rdata[1]);
! 			rdata[1].data = (char *) rels;
! 			rdata[1].len = nrels * sizeof(RelFileNode);
  			rdata[1].buffer = InvalidBuffer;
  			lastrdata = 1;
  		}
  		/* dump committed child Xids */
  		if (nchildren > 0)
  		{
  			rdata[lastrdata].next = &(rdata[2]);
! 			rdata[2].data = (char *) children;
! 			rdata[2].len = nchildren * sizeof(TransactionId);
  			rdata[2].buffer = InvalidBuffer;
  			lastrdata = 2;
  		}
  		/* dump shared cache invalidation messages */
  		if (nmsgs > 0)
  		{
  			rdata[lastrdata].next = &(rdata[3]);
! 			rdata[3].data = (char *) invalMessages;
! 			rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
  			rdata[3].buffer = InvalidBuffer;
  			lastrdata = 3;
  		}
--- 1003,1065 ----
  
  		SetCurrentTransactionStopTimestamp();
  		xlrec.xact_time = xactStopTimestamp;
  		rdata[0].data = (char *) (&xlrec);
  		rdata[0].len = MinSizeOfXactCommit;
  		rdata[0].buffer = InvalidBuffer;
+ 		/*
+ 		 * NOTE(review): if this runs inside the commit critical section, an
+ 		 * ERROR from the palloc() calls below (e.g. out of memory) is promoted
+ 		 * to PANIC; consider allocating these buffers before entering it.
+ 		 */
  		/* dump rels to delete */
  		if (nrels > 0)
  		{
+ 			xl_xact_commit_opt *opt_rels = (xl_xact_commit_opt *)
+ 				palloc(MinSizeOfXactCommitOpt +
+ 					   nrels * sizeof(RelFileNode));
  			rdata[0].next = &(rdata[1]);
! 			xlrec.xinfo |= XACT_COMPLETION_DROP_REL_NODES_PRESENT;
! 			opt_rels->nopts = nrels;
! 			memcpy(&(opt_rels->opts), rels, nrels * sizeof(RelFileNode));
! 			rdata[1].data = (char *) opt_rels;
! 			rdata[1].len = MinSizeOfXactCommitOpt + nrels * sizeof(RelFileNode);
  			rdata[1].buffer = InvalidBuffer;
  			lastrdata = 1;
  		}
  		/* dump committed child Xids */
  		if (nchildren > 0)
  		{
+ 			xl_xact_commit_opt *opt_child =
+ 				(xl_xact_commit_opt *)
+ 				palloc(MinSizeOfXactCommitOpt +
+ 					   nchildren * sizeof(TransactionId));
  			rdata[lastrdata].next = &(rdata[2]);
! 			xlrec.xinfo |= XACT_COMPLETION_COMMITTED_SUB_PRESENT;
! 
! 			opt_child->nopts = nchildren;
! 			memcpy(&(opt_child->opts), children,
! 				   nchildren * sizeof(TransactionId));
! 			rdata[2].data = (char *) opt_child;
! 			rdata[2].len = MinSizeOfXactCommitOpt +
! 				nchildren * sizeof(TransactionId);
  			rdata[2].buffer = InvalidBuffer;
  			lastrdata = 2;
  		}
  		/* dump shared cache invalidation messages */
  		if (nmsgs > 0)
  		{
+ 			xl_xact_commit_opt *opt_shr =
+ 				(xl_xact_commit_opt *)
+ 				palloc(MinSizeOfXactCommitOpt +
+ 					   nmsgs * sizeof(SharedInvalidationMessage));
  			rdata[lastrdata].next = &(rdata[3]);
! 			xlrec.xinfo |= XACT_COMPLETION_SHARED_INV_PRESENT;
! 			opt_shr->nopts = nmsgs;
! 			memcpy(&(opt_shr->opts), invalMessages,
! 				   nmsgs * sizeof(SharedInvalidationMessage));
! 			rdata[3].data = (char *) opt_shr;
! 			rdata[3].len = MinSizeOfXactCommitOpt +
! 				nmsgs * sizeof(SharedInvalidationMessage);
  			rdata[3].buffer = InvalidBuffer;
  			lastrdata = 3;
  		}
*************** xactGetCommittedChildren(TransactionId *
*** 4436,4441 ****
--- 4463,4526 ----
   *	XLOG support routines
   */
  
+ 
+ /*
+  * xact_read_commit
+  *		Locate the optional arrays stored after the fixed part of a commit
+  *		record.
+  *
+  * Each optional array is an xl_xact_commit_opt (an element count followed
+  * by the elements) and is present only when the matching
+  * XACT_COMPLETION_*_PRESENT bit is set in xlrec->xinfo.  Present arrays
+  * appear in this order: relfilenodes to drop, committed subxact XIDs,
+  * shared invalidation messages.  An absent array is reported as a NULL
+  * pointer with a count of zero.
+  *
+  * opts[] is declared as char, so the start of the following block must be
+  * computed as a byte offset (count * element size); indexing opts[] by the
+  * element count would land inside the current array.
+  */
+ static void
+ xact_read_commit(xl_xact_commit *xlrec,
+ 				 RelFileNode **drop_rels,
+ 				 TransactionId **sub_xids,
+ 				 SharedInvalidationMessage **inval_msgs,
+ 				 int *nrels,
+ 				 int *nsubxacts,
+ 				 int *nmsgs)
+ {
+ 	xl_xact_commit_opt *nextOpts = &xlrec->xnodes;
+ 
+ 	*drop_rels = NULL;
+ 	*sub_xids = NULL;
+ 	*inval_msgs = NULL;
+ 	*nrels = 0;
+ 	*nsubxacts = 0;
+ 	*nmsgs = 0;
+ 
+ 	if (XactCompletionDropRelPresent(xlrec))
+ 	{
+ 		*drop_rels = (RelFileNode *) nextOpts->opts;
+ 		*nrels = nextOpts->nopts;
+ 		nextOpts = (xl_xact_commit_opt *)
+ 			(nextOpts->opts + *nrels * sizeof(RelFileNode));
+ 	}
+ 
+ 	if (XactCompletionCommittedSubPresent(xlrec))
+ 	{
+ 		*sub_xids = (TransactionId *) nextOpts->opts;
+ 		*nsubxacts = nextOpts->nopts;
+ 		nextOpts = (xl_xact_commit_opt *)
+ 			(nextOpts->opts + *nsubxacts * sizeof(TransactionId));
+ 	}
+ 
+ 	if (XactCompletionSharedInvPresent(xlrec))
+ 	{
+ 		*inval_msgs = (SharedInvalidationMessage *) nextOpts->opts;
+ 		*nmsgs = nextOpts->nopts;
+ 	}
+ }
+ 
  /*
   * Before 9.0 this was a fairly short function, but now it performs many
   * actions for which the order of execution is critical.
*************** xactGetCommittedChildren(TransactionId *
*** 4443,4459 ****
  static void
  xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
  {
  	TransactionId *sub_xids;
  	SharedInvalidationMessage *inval_msgs;
  	TransactionId max_xid;
  	int			i;
  
! 	/* subxid array follows relfilenodes */
! 	sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
! 	/* invalidation messages array follows subxids */
! 	inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);
! 
! 	max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
  
  	/*
  	 * Make sure nextXid is beyond any XID mentioned in the record.
--- 4502,4519 ----
  static void
  xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
  {
+ 	RelFileNode *drop_rels;
  	TransactionId *sub_xids;
  	SharedInvalidationMessage *inval_msgs;
  	TransactionId max_xid;
  	int			i;
+ 	int			nrels = 0;
+ 	int			nsubxacts = 0;
+ 	int			nmsgs = 0;
  
! 	xact_read_commit(xlrec, &drop_rels, &sub_xids, &inval_msgs, &nrels,
! 					 &nsubxacts, &nmsgs);
! 	max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
  
  	/*
  	 * Make sure nextXid is beyond any XID mentioned in the record.
*************** xact_redo_commit(xl_xact_commit *xlrec,
*** 4476,4482 ****
  		/*
  		 * Mark the transaction committed in pg_clog.
  		 */
! 		TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
  	}
  	else
  	{
--- 4536,4542 ----
  		/*
  		 * Mark the transaction committed in pg_clog.
  		 */
! 		TransactionIdCommitTree(xid, nsubxacts, sub_xids);
  	}
  	else
  	{
*************** xact_redo_commit(xl_xact_commit *xlrec,
*** 4500,4518 ****
  		 * bits set on changes made by transactions that haven't yet
  		 * recovered. It's unlikely but it's good to be safe.
  		 */
! 		TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn);
  
  		/*
  		 * We must mark clog before we update the ProcArray.
  		 */
! 		ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
  
  		/*
  		 * Send any cache invalidations attached to the commit. We must
  		 * maintain the same order of invalidation then release locks as
  		 * occurs in CommitTransaction().
  		 */
! 		ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs,
  								  XactCompletionRelcacheInitFileInval(xlrec),
  								  xlrec->dbId, xlrec->tsId);
  
--- 4560,4578 ----
  		 * bits set on changes made by transactions that haven't yet
  		 * recovered. It's unlikely but it's good to be safe.
  		 */
! 		TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
  
  		/*
  		 * We must mark clog before we update the ProcArray.
  		 */
! 		ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
  
  		/*
  		 * Send any cache invalidations attached to the commit. We must
  		 * maintain the same order of invalidation then release locks as
  		 * occurs in CommitTransaction().
  		 */
! 		ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
  								  XactCompletionRelcacheInitFileInval(xlrec),
  								  xlrec->dbId, xlrec->tsId);
  
*************** xact_redo_commit(xl_xact_commit *xlrec,
*** 4521,4540 ****
  		 * phase transactions. In effect we are ignoring the prepare phase and
  		 * just going straight to lock release.
  		 */
! 		StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
  	}
  
  	/* Make sure files supposed to be dropped are dropped */
! 	for (i = 0; i < xlrec->nrels; i++)
  	{
! 		SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
  		ForkNumber	fork;
  
  		for (fork = 0; fork <= MAX_FORKNUM; fork++)
  		{
  			if (smgrexists(srel, fork))
  			{
! 				XLogDropRelation(xlrec->xnodes[i], fork);
  				smgrdounlink(srel, fork, true);
  			}
  		}
--- 4581,4600 ----
  		 * phase transactions. In effect we are ignoring the prepare phase and
  		 * just going straight to lock release.
  		 */
! 		StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
  	}
  
  	/* Make sure files supposed to be dropped are dropped */
! 	for (i = 0; i < nrels; i++)
  	{
! 		SMgrRelation srel = smgropen(drop_rels[i], InvalidBackendId);
  		ForkNumber	fork;
  
  		for (fork = 0; fork <= MAX_FORKNUM; fork++)
  		{
  			if (smgrexists(srel, fork))
  			{
! 				XLogDropRelation(drop_rels[i], fork);
  				smgrdounlink(srel, fork, true);
  			}
  		}
*************** xact_desc_commit(StringInfo buf, xl_xact
*** 4704,4745 ****
  {
  	int			i;
  	TransactionId *xacts;
  
! 	xacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
  
  	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
  
! 	if (xlrec->nrels > 0)
  	{
  		appendStringInfo(buf, "; rels:");
! 		for (i = 0; i < xlrec->nrels; i++)
  		{
! 			char	   *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
  
  			appendStringInfo(buf, " %s", path);
  			pfree(path);
  		}
  	}
! 	if (xlrec->nsubxacts > 0)
  	{
  		appendStringInfo(buf, "; subxacts:");
! 		for (i = 0; i < xlrec->nsubxacts; i++)
  			appendStringInfo(buf, " %u", xacts[i]);
  	}
! 	if (xlrec->nmsgs > 0)
  	{
- 		SharedInvalidationMessage *msgs;
- 
- 		msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts];
- 
  		if (XactCompletionRelcacheInitFileInval(xlrec))
  			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
  							 xlrec->dbId, xlrec->tsId);
  
  		appendStringInfo(buf, "; inval msgs:");
! 		for (i = 0; i < xlrec->nmsgs; i++)
  		{
! 			SharedInvalidationMessage *msg = &msgs[i];
  
  			if (msg->id >= 0)
  				appendStringInfo(buf, " catcache %d", msg->id);
--- 4764,4807 ----
  {
  	int			i;
  	TransactionId *xacts;
+ 	RelFileNode *drop_rels;
+ 	SharedInvalidationMessage *inval_msgs;
+ 	int			nrels = 0;
+ 	int			nsubxacts = 0;
+ 	int			nmsgs = 0;
  
! 	xact_read_commit(xlrec, &drop_rels, &xacts, &inval_msgs, &nrels,
! 					 &nsubxacts, &nmsgs);
  
  	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
  
! 	if (nrels > 0)
  	{
  		appendStringInfo(buf, "; rels:");
! 		for (i = 0; i < nrels; i++)
  		{
! 			char	   *path = relpathperm(drop_rels[i], MAIN_FORKNUM);
  
  			appendStringInfo(buf, " %s", path);
  			pfree(path);
  		}
  	}
! 	if (nsubxacts > 0)
  	{
  		appendStringInfo(buf, "; subxacts:");
! 		for (i = 0; i < nsubxacts; i++)
  			appendStringInfo(buf, " %u", xacts[i]);
  	}
! 	if (nmsgs > 0)
  	{
  		if (XactCompletionRelcacheInitFileInval(xlrec))
  			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
  							 xlrec->dbId, xlrec->tsId);
  
  		appendStringInfo(buf, "; inval msgs:");
! 		for (i = 0; i < nmsgs; i++)
  		{
! 			SharedInvalidationMessage *msg = &inval_msgs[i];
  
  			if (msg->id >= 0)
  				appendStringInfo(buf, " catcache %d", msg->id);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
new file mode 100644
index cb440d4..9a6a7a2
*** a/src/include/access/xact.h
--- b/src/include/access/xact.h
*************** typedef struct xl_xact_assignment
*** 116,134 ****
  
  #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
  
  typedef struct xl_xact_commit
  {
  	TimestampTz xact_time;		/* time of commit */
  	uint32		xinfo;			/* info flags */
- 	int			nrels;			/* number of RelFileNodes */
- 	int			nsubxacts;		/* number of subtransaction XIDs */
- 	int			nmsgs;			/* number of shared inval msgs */
  	Oid			dbId;			/* MyDatabaseId */
  	Oid			tsId;			/* MyDatabaseTableSpace */
! 	/* Array of RelFileNode(s) to drop at commit */
! 	RelFileNode xnodes[1];		/* VARIABLE LENGTH ARRAY */
! 	/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
! 	/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
  } xl_xact_commit;
  
  #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
--- 116,141 ----
  
  #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
  
+ typedef struct xl_xact_commit_opt
+ {
+ 	int			nopts;			/* number of elements (not bytes) in opts */
+ 	char		opts[1];		/* VARIABLE LENGTH ARRAY of nopts elements */
+ } xl_xact_commit_opt;
+ 
+ #define MinSizeOfXactCommitOpt offsetof(xl_xact_commit_opt, opts)
+ 
  typedef struct xl_xact_commit
  {
  	TimestampTz xact_time;		/* time of commit */
  	uint32		xinfo;			/* info flags */
  	Oid			dbId;			/* MyDatabaseId */
  	Oid			tsId;			/* MyDatabaseTableSpace */
! 	/* Array of RelFileNode(s) to drop at commit (optional) */
! 	xl_xact_commit_opt xnodes;
! 	/* ARRAY OF OPTIONAL xl_xact_commit_opt COMMITTED SUBTRANSACTION XIDs
! 	 * FOLLOWS */
! 	/* ARRAY OF OPTIONAL xl_xact_commit_opt SHARED INVALIDATION MESSAGES
! 	 * FOLLOWS */
  } xl_xact_commit;
  
  #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
*************** typedef struct xl_xact_commit
*** 148,153 ****
--- 155,174 ----
  #define XactCompletionRelcacheInitFileInval(xlrec)	((xlrec)->xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
  #define XactCompletionForceSyncCommit(xlrec) ((xlrec)->xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
  
+ /*
+  * These flags are set in the xinfo field of WAL commit records to indicate
+  * the presence of the optional xl_xact_commit_opt structures
+  * (RelFileNodes to drop at commit, subtransaction XIDs, shared inval msgs)
+  */
+ #define XACT_COMPLETION_DROP_REL_NODES_PRESENT	0x04
+ #define XACT_COMPLETION_COMMITTED_SUB_PRESENT	0x08
+ #define XACT_COMPLETION_SHARED_INV_PRESENT		0x10
+ 
+ /* Access macros for above flags */
+ #define XactCompletionDropRelPresent(xlrec)	((xlrec)->xinfo & XACT_COMPLETION_DROP_REL_NODES_PRESENT)
+ #define XactCompletionCommittedSubPresent(xlrec)	((xlrec)->xinfo & XACT_COMPLETION_COMMITTED_SUB_PRESENT)
+ #define XactCompletionSharedInvPresent(xlrec)	((xlrec)->xinfo & XACT_COMPLETION_SHARED_INV_PRESENT)
+ 
  typedef struct xl_xact_abort
  {
  	TimestampTz xact_time;		/* time of abort */