diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c new file mode 100644 index 2ca1c14..727b86b *** a/src/backend/access/transam/xact.c --- b/src/backend/access/transam/xact.c *************** RecordTransactionCommit(void) *** 962,987 **** /* * Begin commit critical section and insert the commit XLOG record. */ - XLogRecData rdata[4]; - int lastrdata = 0; - xl_xact_commit xlrec; - /* Tell bufmgr and smgr to prepare for commit */ BufmgrCommit(); /* - * Set flags required for recovery processing of commits. - */ - xlrec.xinfo = 0; - if (RelcacheInitFileInval) - xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE; - if (forceSyncCommit) - xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; - - xlrec.dbId = MyDatabaseId; - xlrec.tsId = MyDatabaseTableSpace; - - /* * Mark ourselves as within our "commit critical section". This * forces any concurrent checkpoint to wait until we've updated * pg_clog. Without this, it is possible for the checkpoint to set --- 962,971 ---- *************** RecordTransactionCommit(void) *** 1002,1044 **** MyProc->inCommit = true; SetCurrentTransactionStopTimestamp(); ! xlrec.xact_time = xactStopTimestamp; ! xlrec.nrels = nrels; ! xlrec.nsubxacts = nchildren; ! xlrec.nmsgs = nmsgs; ! rdata[0].data = (char *) (&xlrec); ! rdata[0].len = MinSizeOfXactCommit; ! rdata[0].buffer = InvalidBuffer; ! /* dump rels to delete */ ! if (nrels > 0) ! { ! rdata[0].next = &(rdata[1]); ! rdata[1].data = (char *) rels; ! rdata[1].len = nrels * sizeof(RelFileNode); ! rdata[1].buffer = InvalidBuffer; ! lastrdata = 1; ! } ! /* dump committed child Xids */ ! if (nchildren > 0) { ! rdata[lastrdata].next = &(rdata[2]); ! rdata[2].data = (char *) children; ! rdata[2].len = nchildren * sizeof(TransactionId); ! rdata[2].buffer = InvalidBuffer; ! lastrdata = 2; } ! /* dump shared cache invalidation messages */ ! if (nmsgs > 0) { ! rdata[lastrdata].next = &(rdata[3]); ! rdata[3].data = (char *) invalMessages; ! rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage); ! rdata[3].buffer = InvalidBuffer; ! lastrdata = 3; ! } ! rdata[lastrdata].next = NULL; ! (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata); } /* --- 986,1071 ---- MyProc->inCommit = true; SetCurrentTransactionStopTimestamp(); ! ! if (nrels == 0 && nmsgs == 0) { ! XLogRecData rdata[2]; ! int lastrdata = 0; ! xl_xact_commit xlrec; ! xlrec.xact_time = xactStopTimestamp; ! xlrec.nsubxacts = nchildren; ! rdata[0].data = (char *) (&xlrec); ! rdata[0].len = MinSizeOfXactCommit; ! rdata[0].buffer = InvalidBuffer; ! /* dump committed child Xids */ ! if (nchildren > 0) ! { ! rdata[0].next = &(rdata[1]); ! rdata[1].data = (char *) children; ! rdata[1].len = nchildren * sizeof(TransactionId); ! rdata[1].buffer = InvalidBuffer; ! lastrdata = 1; ! } ! rdata[lastrdata].next = NULL; ! ! (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata); ! } ! else { ! XLogRecData rdata[4]; ! int lastrdata = 0; ! xl_xact_commit_with_info xlrec; ! /* ! * Set flags required for recovery processing of commits. ! */ ! xlrec.xinfo = 0; ! if (RelcacheInitFileInval) ! xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE; ! if (forceSyncCommit) ! xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; ! xlrec.dbId = MyDatabaseId; ! xlrec.tsId = MyDatabaseTableSpace; ! ! xlrec.xact_time = xactStopTimestamp; ! xlrec.nrels = nrels; ! xlrec.nsubxacts = nchildren; ! xlrec.nmsgs = nmsgs; ! rdata[0].data = (char *) (&xlrec); ! rdata[0].len = MinSizeOfXactCommit; ! rdata[0].buffer = InvalidBuffer; ! /* dump rels to delete */ ! if (nrels > 0) ! { ! rdata[0].next = &(rdata[1]); ! rdata[1].data = (char *) rels; ! rdata[1].len = nrels * sizeof(RelFileNode); ! rdata[1].buffer = InvalidBuffer; ! lastrdata = 1; ! } ! /* dump committed child Xids */ ! if (nchildren > 0) ! { ! rdata[lastrdata].next = &(rdata[2]); ! rdata[2].data = (char *) children; ! rdata[2].len = nchildren * sizeof(TransactionId); ! rdata[2].buffer = InvalidBuffer; ! lastrdata = 2; ! } ! /* dump shared cache invalidation messages */ ! if (nmsgs > 0) ! { ! rdata[lastrdata].next = &(rdata[3]); ! rdata[3].data = (char *) invalMessages; ! rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage); ! rdata[3].buffer = InvalidBuffer; ! lastrdata = 3; ! } ! rdata[lastrdata].next = NULL; ! ! (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_WITH_INFO, rdata); ! } } /* *************** xactGetCommittedChildren(TransactionId * *** 4441,4459 **** * actions for which the order of execution is critical. */ static void ! xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) { - TransactionId *sub_xids; - SharedInvalidationMessage *inval_msgs; TransactionId max_xid; int i; ! /* subxid array follows relfilenodes */ ! sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); ! /* invalidation messages array follows subxids */ ! inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]); ! ! max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids); /* * Make sure nextXid is beyond any XID mentioned in the record. --- 4468,4483 ---- * actions for which the order of execution is critical. */ static void ! xact_redo_commit(RelFileNode *xnodes, int nrels, ! TransactionId *sub_xids, int nsubxacts, ! SharedInvalidationMessage *inval_msgs, int nmsgs, ! Oid dbId, Oid tsId, uint32 xinfo, ! TransactionId xid, XLogRecPtr lsn) { TransactionId max_xid; int i; ! max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids); /* * Make sure nextXid is beyond any XID mentioned in the record. *************** xact_redo_commit(xl_xact_commit *xlrec, *** 4476,4482 **** /* * Mark the transaction committed in pg_clog. */ ! TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids); } else { --- 4500,4506 ---- /* * Mark the transaction committed in pg_clog. */ ! TransactionIdCommitTree(xid, nsubxacts, sub_xids); } else { *************** xact_redo_commit(xl_xact_commit *xlrec, *** 4500,4540 **** * bits set on changes made by transactions that haven't yet * recovered. It's unlikely but it's good to be safe. */ ! TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn); /* * We must mark clog before we update the ProcArray. */ ! ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid); /* * Send any cache invalidations attached to the commit. We must * maintain the same order of invalidation then release locks as * occurs in CommitTransaction(). */ ! ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs, ! XactCompletionRelcacheInitFileInval(xlrec), ! xlrec->dbId, xlrec->tsId); /* * Release locks, if any. We do this for both two phase and normal one * phase transactions. In effect we are ignoring the prepare phase and * just going straight to lock release. */ ! StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids); } /* Make sure files supposed to be dropped are dropped */ ! for (i = 0; i < xlrec->nrels; i++) { ! SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId); ForkNumber fork; for (fork = 0; fork <= MAX_FORKNUM; fork++) { if (smgrexists(srel, fork)) { ! XLogDropRelation(xlrec->xnodes[i], fork); smgrdounlink(srel, fork, true); } } --- 4524,4564 ---- * bits set on changes made by transactions that haven't yet * recovered. It's unlikely but it's good to be safe. */ ! TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn); /* * We must mark clog before we update the ProcArray. */ ! ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid); /* * Send any cache invalidations attached to the commit. We must * maintain the same order of invalidation then release locks as * occurs in CommitTransaction(). */ ! ProcessCommittedInvalidationMessages(inval_msgs, nmsgs, ! XactCompletionRelcacheInitFileInval(xinfo), ! dbId, tsId); /* * Release locks, if any. We do this for both two phase and normal one * phase transactions. In effect we are ignoring the prepare phase and * just going straight to lock release. */ ! StandbyReleaseLockTree(xid, nsubxacts, sub_xids); } /* Make sure files supposed to be dropped are dropped */ ! for (i = 0; i < nrels; i++) { ! SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId); ForkNumber fork; for (fork = 0; fork <= MAX_FORKNUM; fork++) { if (smgrexists(srel, fork)) { ! XLogDropRelation(xnodes[i], fork); smgrdounlink(srel, fork, true); } } *************** xact_redo_commit(xl_xact_commit *xlrec, *** 4553,4560 **** * to reduce that problem window, for any user that requested * ForceSyncCommit(). */ ! if (XactCompletionForceSyncCommit(xlrec)) XLogFlush(lsn); } /* --- 4577,4616 ---- * to reduce that problem window, for any user that requested * ForceSyncCommit(). */ ! if (XactCompletionForceSyncCommit(xinfo)) XLogFlush(lsn); + + } + /* + * utility function to call xact_redo_commit with parameters found in xlrec + */ + static void + xact_redo_commit_with_info(xl_xact_commit_with_info *xlrec, + TransactionId xid, XLogRecPtr lsn) + { + TransactionId *sub_xids; + SharedInvalidationMessage *inval_msgs; + + /* subxid array follows relfilenodes */ + sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); + /* invalidation messages array follows subxids */ + inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]); + + xact_redo_commit(xlrec->xnodes, xlrec->nrels, sub_xids, xlrec->nsubxacts, + inval_msgs, xlrec->nmsgs, xlrec->dbId, xlrec->tsId, + xlrec->xinfo, xid, lsn); + } + + /* + * utility function to call xact_redo_commit with proper parameters: + * parameters not available in xl_xact_commit are defaulted to 0. + */ + static void + xact_redo_commit_without_info(xl_xact_commit *xlrec, + TransactionId xid, XLogRecPtr lsn) + { + xact_redo_commit(NULL, 0, xlrec->children, xlrec->nsubxacts, + NULL, 0, InvalidOid, InvalidOid, 0, xid, lsn); } /* *************** xact_redo(XLogRecPtr lsn, XLogRecord *re *** 4659,4665 **** { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); ! xact_redo_commit(xlrec, record->xl_xid, lsn); } else if (info == XLOG_XACT_ABORT) { --- 4715,4727 ---- { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); ! xact_redo_commit_without_info(xlrec, record->xl_xid, lsn); ! } ! else if (info == XLOG_XACT_COMMIT_WITH_INFO) ! { ! xl_xact_commit_with_info *xlrec = (xl_xact_commit_with_info *) XLogRecGetData(record); ! ! xact_redo_commit_with_info(xlrec, record->xl_xid, lsn); } else if (info == XLOG_XACT_ABORT) { *************** xact_redo(XLogRecPtr lsn, XLogRecord *re *** 4677,4683 **** { xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record); ! xact_redo_commit(&xlrec->crec, xlrec->xid, lsn); RemoveTwoPhaseFile(xlrec->xid, false); } else if (info == XLOG_XACT_ABORT_PREPARED) --- 4739,4745 ---- { xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record); ! xact_redo_commit_with_info(&xlrec->crec, xlrec->xid, lsn); RemoveTwoPhaseFile(xlrec->xid, false); } else if (info == XLOG_XACT_ABORT_PREPARED) *************** xact_redo(XLogRecPtr lsn, XLogRecord *re *** 4700,4706 **** } static void ! xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) { int i; TransactionId *xacts; --- 4762,4768 ---- } static void ! xact_desc_commit_with_info(StringInfo buf, xl_xact_commit_with_info *xlrec) { int i; TransactionId *xacts; *************** xact_desc_commit(StringInfo buf, xl_xact *** 4732,4738 **** msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts]; ! if (XactCompletionRelcacheInitFileInval(xlrec)) appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", xlrec->dbId, xlrec->tsId); --- 4794,4800 ---- msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts]; ! if (XactCompletionRelcacheInitFileInval(xlrec->xinfo)) appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", xlrec->dbId, xlrec->tsId); *************** xact_desc_commit(StringInfo buf, xl_xact *** 4759,4764 **** --- 4821,4841 ---- } static void + xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) + { + int i; + + appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time)); + + if (xlrec->nsubxacts > 0) + { + appendStringInfo(buf, "; subxacts:"); + for (i = 0; i < xlrec->nsubxacts; i++) + appendStringInfo(buf, " %u", xlrec->children[i]); + } + } + + static void xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec) { int i; *************** xact_desc(StringInfo buf, uint8 xl_info, *** 4809,4814 **** --- 4886,4898 ---- appendStringInfo(buf, "commit: "); xact_desc_commit(buf, xlrec); } + else if (info == XLOG_XACT_COMMIT_WITH_INFO) + { + xl_xact_commit_with_info *xlrec = (xl_xact_commit_with_info *) rec; + + appendStringInfo(buf, "commit: "); + xact_desc_commit_with_info(buf, xlrec); + } else if (info == XLOG_XACT_ABORT) { xl_xact_abort *xlrec = (xl_xact_abort *) rec; *************** xact_desc(StringInfo buf, uint8 xl_info, *** 4825,4831 **** xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec; appendStringInfo(buf, "commit prepared %u: ", xlrec->xid); ! xact_desc_commit(buf, &xlrec->crec); } else if (info == XLOG_XACT_ABORT_PREPARED) { --- 4909,4915 ---- xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec; appendStringInfo(buf, "commit prepared %u: ", xlrec->xid); ! xact_desc_commit_with_info(buf, &xlrec->crec); } else if (info == XLOG_XACT_ABORT_PREPARED) { diff --git a/src/include/access/xact.h b/src/include/access/xact.h new file mode 100644 index cb440d4..a75be3f *** a/src/include/access/xact.h --- b/src/include/access/xact.h *************** typedef void (*SubXactCallback) (SubXact *** 106,111 **** --- 106,112 ---- #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 + #define XLOG_XACT_COMMIT_WITH_INFO 0x60 typedef struct xl_xact_assignment { *************** typedef struct xl_xact_assignment *** 119,124 **** --- 120,136 ---- typedef struct xl_xact_commit { TimestampTz xact_time; /* time of commit */ + int nsubxacts; /* number of subtransaction XIDs */ + /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */ + TransactionId children[1]; /* VARIABLE LENGTH ARRAY */ + } xl_xact_commit; + + #define MinSizeOfXactCommit offsetof(xl_xact_commit, children) + + + typedef struct xl_xact_commit_with_info + { + TimestampTz xact_time; /* time of commit */ uint32 xinfo; /* info flags */ int nrels; /* number of RelFileNodes */ int nsubxacts; /* number of subtransaction XIDs */ *************** typedef struct xl_xact_commit *** 129,137 **** RelFileNode xnodes[1]; /* VARIABLE LENGTH ARRAY */ /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */ /* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */ ! } xl_xact_commit; ! #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes) /* * These flags are set in the xinfo fields of WAL commit records, --- 141,149 ---- RelFileNode xnodes[1]; /* VARIABLE LENGTH ARRAY */ /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */ /* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */ ! } xl_xact_commit_with_info; ! #define MinSizeOfXactCommitWithInfo offsetof(xl_xact_commit_with_info, xnodes) /* * These flags are set in the xinfo fields of WAL commit records, *************** typedef struct xl_xact_commit *** 145,152 **** #define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02 /* Access macros for above flags */ ! #define XactCompletionRelcacheInitFileInval(xlrec) ((xlrec)->xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) ! #define XactCompletionForceSyncCommit(xlrec) ((xlrec)->xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT) typedef struct xl_xact_abort { --- 157,164 ---- #define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02 /* Access macros for above flags */ ! #define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) ! #define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT) typedef struct xl_xact_abort { *************** typedef struct xl_xact_abort *** 172,178 **** typedef struct xl_xact_commit_prepared { TransactionId xid; /* XID of prepared xact */ ! xl_xact_commit crec; /* COMMIT record */ /* MORE DATA FOLLOWS AT END OF STRUCT */ } xl_xact_commit_prepared; --- 184,190 ---- typedef struct xl_xact_commit_prepared { TransactionId xid; /* XID of prepared xact */ ! xl_xact_commit_with_info crec; /* COMMIT record */ /* MORE DATA FOLLOWS AT END OF STRUCT */ } xl_xact_commit_prepared;