Speedup twophase transactions
Hello.
While working with cluster stuff (DTM, tsDTM) we noted that postgres 2pc transactions is approximately two times slower than an ordinary commit on workload with fast transactions — few single-row updates and COMMIT or PREPARE/COMMIT. Perf top showed that a lot of time is spent in kernel on fopen/fclose, so it worth a try to reduce file operations with 2pc tx.
Now 2PC in postgres does following:
* on prepare 2pc data (subxacts, commitrels, abortrels, invalmsgs) saved to xlog and to file, but file not is not fsynced
* on commit backend reads data from file
* if checkpoint occurs before commit, then files are fsynced during checkpoint
* if case of crash replay will move data from xlog to files
In this patch I’ve changed this procedures to following:
* on prepare backend writes data only to xlog and store pointer to the start of the xlog record
* if commit occurs before checkpoint then backend reads data from xlog by this pointer
* on checkpoint 2pc data copied to files and fsynced
* if commit happens after checkpoint then backend reads files
* in case of crash replay will move data from xlog to files (as it was before patch)
Most of that ideas was already mentioned in 2009 thread by Michael Paquier /messages/by-id/c64c5f8b0908062031k3ff48428j824a9a46f28180ac@mail.gmail.com where he suggested to store 2pc data in shared memory.
At that time patch was declined because no significant speedup were observed. Now I see performance improvements by my patch at about 60%. Probably old benchmark overall tps was lower and it was harder to hit filesystem fopen/fclose limits.
Now results of benchmark are following (dual 6-core xeon server):
Current master without 2PC: ~42 ktps
Current master with 2PC: ~22 ktps
Current master with 2PC: ~36 ktps
Benchmark done with following script:
\set naccounts 100000 * :scale
\setrandom from_aid 1 :naccounts
\setrandom to_aid 1 :naccounts
\setrandom delta 1 100
\set scale :scale+1
BEGIN;
UPDATE pgbench_accounts SET abalance = abalance - :delta WHERE aid = :from_aid;
UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :to_aid;
PREPARE TRANSACTION ':client_id.:scale';
COMMIT PREPARED ':client_id.:scale';
Attachments:
2pc_xlog.diffapplication/octet-stream; name=2pc_xlog.diffDownload
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 8c47e0f..b1c36e3 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -51,6 +51,7 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
+#include "access/xlogreader.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
#include "funcapi.h"
@@ -60,6 +61,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "replication/logicalfuncs.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
@@ -117,7 +119,11 @@ typedef struct GlobalTransactionData
int pgprocno; /* ID of associated dummy PGPROC */
BackendId dummyBackendId; /* similar to backend id for backends */
TimestampTz prepared_at; /* time of preparation */
- XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
+ XLogRecPtr prepare_lsn; /* XLOG offset of prepare record end */
+ XLogRecPtr prepare_xlogptr; /* XLOG offset of prepare record start
+ * or NULL if twophase data moved to file
+ * after checkpoint.
+ */
Oid owner; /* ID of user that executed the xact */
BackendId locking_backend; /* backend currently working on the xact */
bool valid; /* TRUE if PGPROC entry is in proc array */
@@ -166,6 +172,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
+static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
/*
* Initialization of shared memory
@@ -400,6 +407,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
gxact->prepared_at = prepared_at;
/* initialize LSN to 0 (start of WAL) */
gxact->prepare_lsn = 0;
+ gxact->prepare_xlogptr = 0;
gxact->owner = owner;
gxact->locking_backend = MyBackendId;
gxact->valid = false;
@@ -579,41 +587,6 @@ RemoveGXact(GlobalTransaction gxact)
}
/*
- * TransactionIdIsPrepared
- * True iff transaction associated with the identifier is prepared
- * for two-phase commit
- *
- * Note: only gxacts marked "valid" are considered; but notice we do not
- * check the locking status.
- *
- * This is not currently exported, because it is only needed internally.
- */
-static bool
-TransactionIdIsPrepared(TransactionId xid)
-{
- bool result = false;
- int i;
-
- LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
- for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
- {
- GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
-
- if (gxact->valid && pgxact->xid == xid)
- {
- result = true;
- break;
- }
- }
-
- LWLockRelease(TwoPhaseStateLock);
-
- return result;
-}
-
-/*
* Returns an array of all prepared transactions for the user-level
* function pg_prepared_xact.
*
@@ -1020,14 +993,8 @@ StartPrepare(GlobalTransaction gxact)
void
EndPrepare(GlobalTransaction gxact)
{
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
- char path[MAXPGPATH];
StateFileChunk *record;
- pg_crc32c statefile_crc;
- pg_crc32c bogus_crc;
- int fd;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1048,70 +1015,7 @@ EndPrepare(GlobalTransaction gxact)
errmsg("two-phase state file maximum length exceeded")));
/*
- * Create the 2PC state file.
- */
- TwoPhaseFilePath(path, xid);
-
- fd = OpenTransientFile(path,
- O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
- S_IRUSR | S_IWUSR);
- if (fd < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not create two-phase state file \"%s\": %m",
- path)));
-
- /* Write data to file, and calculate CRC as we pass over it */
- INIT_CRC32C(statefile_crc);
-
- for (record = records.head; record != NULL; record = record->next)
- {
- COMP_CRC32C(statefile_crc, record->data, record->len);
- if ((write(fd, record->data, record->len)) != record->len)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
- }
-
- FIN_CRC32C(statefile_crc);
-
- /*
- * Write a deliberately bogus CRC to the state file; this is just paranoia
- * to catch the case where four more bytes will run us out of disk space.
- */
- bogus_crc = ~statefile_crc;
-
- if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
-
- /* Back up to prepare for rewriting the CRC */
- if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in two-phase state file: %m")));
- }
-
- /*
- * The state file isn't valid yet, because we haven't written the correct
- * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
- *
- * Between the time we have written the WAL entry and the time we write
- * out the correct state file CRC, we have an inconsistency: the xact is
- * prepared according to WAL but not according to our on-disk state. We
- * use a critical section to force a PANIC if we are unable to complete
- * the write --- then, WAL replay should repair the inconsistency. The
- * odds of a PANIC actually occurring should be very tiny given that we
- * were able to write the bogus CRC above.
+ * Now writing 2PC state data to WAL.
*
* We have to set delayChkpt here, too; otherwise a checkpoint starting
* immediately after the WAL record is inserted could complete without
@@ -1136,19 +1040,8 @@ EndPrepare(GlobalTransaction gxact)
/* If we crash now, we have prepared: WAL replay will fix things */
- /* write correct CRC and close file */
- if ((write(fd, &statefile_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
-
- if (CloseTransientFile(fd) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close two-phase state file: %m")));
+ /* Store record's start location to read that later on Commit */
+ gxact->prepare_xlogptr = ProcLastRecPtr;
/*
* Mark the prepared transaction as valid. As soon as xact.c marks
@@ -1315,6 +1208,36 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
+
+/*
+ * Reads 2PC data from xlog. During checkpoint this data will be moved to
+ * twophase files and ReadTwoPhaseFile should be used instead.
+ */
+static void
+XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
+{
+ XLogRecord *record;
+ XLogReaderState *xlogreader;
+ char *errormsg;
+
+ xlogreader = XLogReaderAllocate(&logical_read_local_xlog_page, NULL);
+ if (xlogreader == NULL)
+ elog(ERROR, "failed to open xlogreader for reading 2PC data");
+
+ record = XLogReadRecord(xlogreader, lsn, &errormsg);
+ if (record == NULL)
+ elog(ERROR, "failed to read 2PC record from xlog");
+
+ if (len != NULL)
+ *len = XLogRecGetDataLen(xlogreader);
+
+ *buf = palloc(sizeof(char)*XLogRecGetDataLen(xlogreader));
+ memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char)*XLogRecGetDataLen(xlogreader));
+
+ XLogReaderFree(xlogreader);
+}
+
+
/*
* Confirms an xid is prepared, during recovery
*/
@@ -1364,6 +1287,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
int ndelrels;
SharedInvalidationMessage *invalmsgs;
int i;
+ bool file_used = false;
/*
* Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1375,14 +1299,20 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
xid = pgxact->xid;
/*
- * Read and validate the state file
+ * Read and validate 2PC state data.
+ * State data can be stored in xlog or in files after xlog checkpoint.
+ * While checkpointing we set gxact->prepare_lsn to NULL to signalize
+ * that 2PC data is moved to files.
*/
- buf = ReadTwoPhaseFile(xid, true);
- if (buf == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("two-phase state file for transaction %u is corrupt",
- xid)));
+ if (gxact->prepare_lsn)
+ {
+ XlogReadTwoPhaseData(gxact->prepare_xlogptr, &buf, NULL);
+ }
+ else
+ {
+ buf = ReadTwoPhaseFile(xid, true);
+ file_used = true;
+ }
/*
* Disassemble the header area
@@ -1484,7 +1414,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
/*
* And now we can clean up our mess.
*/
- RemoveTwoPhaseFile(xid, true);
+ if (file_used)
+ RemoveTwoPhaseFile(xid, true);
RemoveGXact(gxact);
MyLockedGxact = NULL;
@@ -1539,7 +1470,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
}
/*
- * Recreates a state file. This is used in WAL replay.
+ * Recreates a state file. This is used in WAL replay and during
+ * checkpoint creation.
*
* Note: content and len don't include CRC.
*/
@@ -1620,85 +1552,37 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
void
CheckPointTwoPhase(XLogRecPtr redo_horizon)
{
- TransactionId *xids;
- int nxids;
- char path[MAXPGPATH];
int i;
+ int len;
+ char *buf;
- /*
- * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
- * it just long enough to make a list of the XIDs that require fsyncing,
- * and then do the I/O afterwards.
- *
- * This approach creates a race condition: someone else could delete a
- * GXACT between the time we release TwoPhaseStateLock and the time we try
- * to open its state file. We handle this by special-casing ENOENT
- * failures: if we see that, we verify that the GXACT is no longer valid,
- * and if so ignore the failure.
- */
if (max_prepared_xacts <= 0)
return; /* nothing to do */
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
- xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
- nxids = 0;
-
+ /*
+ * Here we doing whole I/O while holding TwoPhaseStateLock.
+ * It's also possible to move I/O out of the lock, but on
+ * every error we should check whether somebody commited our
+ * transaction in different backend. Let's leave this optimisation
+ * for future, if somebody will spot that this place cause
+ * bottleneck.
+ */
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- if (gxact->valid &&
- gxact->prepare_lsn <= redo_horizon)
- xids[nxids++] = pgxact->xid;
- }
-
- LWLockRelease(TwoPhaseStateLock);
-
- for (i = 0; i < nxids; i++)
- {
- TransactionId xid = xids[i];
- int fd;
-
- TwoPhaseFilePath(path, xid);
-
- fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
- if (fd < 0)
- {
- if (errno == ENOENT)
- {
- /* OK if gxact is no longer valid */
- if (!TransactionIdIsPrepared(xid))
- continue;
- /* Restore errno in case it was changed */
- errno = ENOENT;
- }
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open two-phase state file \"%s\": %m",
- path)));
- }
-
- if (pg_fsync(fd) != 0)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not fsync two-phase state file \"%s\": %m",
- path)));
+ if (gxact->valid && gxact->prepare_lsn && gxact->prepare_lsn <= redo_horizon){
+ XlogReadTwoPhaseData(gxact->prepare_xlogptr, &buf, &len);
+ RecreateTwoPhaseFile(pgxact->xid, buf, len);
+ gxact->prepare_lsn = (XLogRecPtr) NULL;
+ pfree(buf);
}
-
- if (CloseTransientFile(fd) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close two-phase state file \"%s\": %m",
- path)));
}
-
- pfree(xids);
+ LWLockRelease(TwoPhaseStateLock);
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 86debf4..3683785 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -321,8 +321,7 @@ static TimeLineID curFileTLI;
* stored here. The parallel leader advances its own copy, when necessary,
* in WaitForParallelWorkersToFinish.
*/
-static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
-
+XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 790ca66..a6d04cc 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -86,6 +86,7 @@ typedef enum
RECOVERY_TARGET_IMMEDIATE
} RecoveryTargetType;
+extern XLogRecPtr ProcLastRecPtr;
extern XLogRecPtr XactLastRecEnd;
extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
On Wed, Dec 9, 2015 at 12:44 PM, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:
Now 2PC in postgres does following:
* on prepare 2pc data (subxacts, commitrels, abortrels, invalmsgs) saved to xlog and to file, but file not is not fsynced
* on commit backend reads data from file
* if checkpoint occurs before commit, then files are fsynced during checkpoint
* if case of crash replay will move data from xlog to filesIn this patch I’ve changed this procedures to following:
* on prepare backend writes data only to xlog and store pointer to the start of the xlog record
* if commit occurs before checkpoint then backend reads data from xlog by this pointer
* on checkpoint 2pc data copied to files and fsynced
* if commit happens after checkpoint then backend reads files
* in case of crash replay will move data from xlog to files (as it was before patch)
That sounds like a very good plan to me.
Now results of benchmark are following (dual 6-core xeon server):
Current master without 2PC: ~42 ktps
Current master with 2PC: ~22 ktps
Current master with 2PC: ~36 ktps
I assume that last one should have been *Patched master with 2PC"?
Please add this to the January CommitFest.
--
Kevin Grittner
EDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Thanks, Kevin.
I assume that last one should have been *Patched master with 2PC”?
Yes, this list should look like this:
Current master without 2PC: ~42 ktps
Current master with 2PC: ~22 ktps
Patched master with 2PC: ~36 ktps
And created CommitFest entry for this patch.
--
Stas Kelvich
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
On 10 Dec 2015, at 00:37, Kevin Grittner <kgrittn@gmail.com> wrote:
On Wed, Dec 9, 2015 at 12:44 PM, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:
Now 2PC in postgres does following:
* on prepare 2pc data (subxacts, commitrels, abortrels, invalmsgs) saved to xlog and to file, but file not is not fsynced
* on commit backend reads data from file
* if checkpoint occurs before commit, then files are fsynced during checkpoint
* if case of crash replay will move data from xlog to filesIn this patch I’ve changed this procedures to following:
* on prepare backend writes data only to xlog and store pointer to the start of the xlog record
* if commit occurs before checkpoint then backend reads data from xlog by this pointer
* on checkpoint 2pc data copied to files and fsynced
* if commit happens after checkpoint then backend reads files
* in case of crash replay will move data from xlog to files (as it was before patch)That sounds like a very good plan to me.
Now results of benchmark are following (dual 6-core xeon server):
Current master without 2PC: ~42 ktps
Current master with 2PC: ~22 ktps
Current master with 2PC: ~36 ktpsI assume that last one should have been *Patched master with 2PC"?
Please add this to the January CommitFest.
--
Kevin Grittner
EDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Dec 10, 2015 at 3:44 AM, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:
Most of that ideas was already mentioned in 2009 thread by Michael Paquier /messages/by-id/c64c5f8b0908062031k3ff48428j824a9a46f28180ac@mail.gmail.com where he suggested to store 2pc data in shared memory.
At that time patch was declined because no significant speedup were observed. Now I see performance improvements by my patch at about 60%. Probably old benchmark overall tps was lower and it was harder to hit filesystem fopen/fclose limits.
Glad to see this patch is given a second life 6 years later.
Now results of benchmark are following (dual 6-core xeon server):
Current master without 2PC: ~42 ktps
Current master with 2PC: ~22 ktps
Current master with 2PC: ~36 ktps
That's nice.
+ XLogRecPtr prepare_xlogptr; /* XLOG offset of prepare record start
+ * or NULL if twophase data moved to file
+ * after checkpoint.
+ */
This has better be InvalidXLogRecPtr if unused.
+ if (gxact->prepare_lsn)
+ {
+ XlogReadTwoPhaseData(gxact->prepare_xlogptr, &buf, NULL);
+ }
Perhaps you mean prepare_xlogptr here?
--
Michael
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, Dec 9, 2015 at 10:44 AM, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:
Hello.
While working with cluster stuff (DTM, tsDTM) we noted that postgres 2pc transactions is approximately two times slower than an ordinary commit on workload with fast transactions — few single-row updates and COMMIT or PREPARE/COMMIT. Perf top showed that a lot of time is spent in kernel on fopen/fclose, so it worth a try to reduce file operations with 2pc tx.
I've tested this through my testing harness which forces the database
to go through endless runs of crash recovery and checks for
consistency, and so far it has survived perfectly.
...
Now results of benchmark are following (dual 6-core xeon server):
Current master without 2PC: ~42 ktps
Current master with 2PC: ~22 ktps
Current master with 2PC: ~36 ktps
Can you give the full command line? -j, -c, etc.
Benchmark done with following script:
\set naccounts 100000 * :scale
\setrandom from_aid 1 :naccounts
\setrandom to_aid 1 :naccounts
\setrandom delta 1 100
\set scale :scale+1
Why are you incrementing :scale ?
I very rapidly reach a point where most of the updates are against
tuples that don't exist, and then get integer overflow problems.
BEGIN;
UPDATE pgbench_accounts SET abalance = abalance - :delta WHERE aid = :from_aid;
UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :to_aid;
PREPARE TRANSACTION ':client_id.:scale';
COMMIT PREPARED ':client_id.:scale';
Cheers,
Jeff
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Michael, Jeff thanks for reviewing and testing.
On 10 Dec 2015, at 02:16, Michael Paquier <michael.paquier@gmail.com> wrote:
This has better be InvalidXLogRecPtr if unused.
Yes, that’s better. Changed.
On 10 Dec 2015, at 02:16, Michael Paquier <michael.paquier@gmail.com> wrote:
+ if (gxact->prepare_lsn) + { + XlogReadTwoPhaseData(gxact->prepare_xlogptr, &buf, NULL); + } Perhaps you mean prepare_xlogptr here?
Yes, my bad. But funnily I have this error even number of times: code in CheckPointTwoPhase also uses prepare_lsn instead of xlogptr, so overall this was working well, that’s why it survived my own tests and probably Jeff’s tests.
I think that’s a bad variable naming, for example because lsn in pg_xlogdump points to start of the record, but here start used as xloptr and end as lsn.
So changed both variables to prepare_start_lsn and prepare_end_lsn.
On 10 Dec 2015, at 09:48, Jeff Janes <jeff.janes@gmail.com> wrote:
I've tested this through my testing harness which forces the database
to go through endless runs of crash recovery and checks for
consistency, and so far it has survived perfectly.
Cool! I think that patch is most vulnerable to following type of workload: prepare transaction, do a lot of stuff with database to force checkpoints (or even recovery cycles), and commit it.
On 10 Dec 2015, at 09:48, Jeff Janes <jeff.janes@gmail.com> wrote:
Can you give the full command line? -j, -c, etc.
pgbench -h testhost -i && pgbench -h testhost -f 2pc.pgb -T 300 -P 1 -c 64 -j 16 -r
where 2pc.pgb as in previous message.
Also all this applies to hosts with uniform memory. I tried to run patched postgres on NUMA with 60 physical cores and patch didn’t change anything. Perf top shows that main bottleneck is access to gxact, but on ordinary host with 1/2 cpu’s that access even not in top ten heaviest routines.
On 10 Dec 2015, at 09:48, Jeff Janes <jeff.janes@gmail.com> wrote:
Why are you incrementing :scale ?
That’s a funny part, overall 2pc speed depends on how you will name your prepared transaction. Concretely I tried to use random numbers for gid’s and it was slower than having constantly incrementing gid. Probably that happens due to linear search by gid in gxact array on commit. So I used :scale just as a counter, bacause it is initialised on pgbench start and line like “\set scale :scale+1” works well. (may be there is a different way to do it in pgbench).
I very rapidly reach a point where most of the updates are against
tuples that don't exist, and then get integer overflow problems.
Hmm, that’s strange. Probably you set scale to big value, so that 100000*:scale is bigger that int4? But i thought that pgbench will change aid columns to bigint if scale is more than 20000.
Attachments:
2pc_xlog.v2.diffapplication/octet-stream; name=2pc_xlog.v2.diffDownload
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 8c47e0f..e9a1ff5 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -51,6 +51,7 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
+#include "access/xlogreader.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
#include "funcapi.h"
@@ -60,6 +61,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "replication/logicalfuncs.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
@@ -117,7 +119,11 @@ typedef struct GlobalTransactionData
int pgprocno; /* ID of associated dummy PGPROC */
BackendId dummyBackendId; /* similar to backend id for backends */
TimestampTz prepared_at; /* time of preparation */
- XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
+ XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start
+ * or InvalidXLogRecPtr if twophase data
+ * moved to file after checkpoint.
+ */
+ XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
Oid owner; /* ID of user that executed the xact */
BackendId locking_backend; /* backend currently working on the xact */
bool valid; /* TRUE if PGPROC entry is in proc array */
@@ -166,6 +172,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
+static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
/*
* Initialization of shared memory
@@ -398,8 +405,9 @@ MarkAsPreparing(TransactionId xid, const char *gid,
pgxact->nxids = 0;
gxact->prepared_at = prepared_at;
- /* initialize LSN to 0 (start of WAL) */
- gxact->prepare_lsn = 0;
+ /* initialize LSN to InvalidXLogRecPtr */
+ gxact->prepare_start_lsn = InvalidXLogRecPtr;
+ gxact->prepare_end_lsn = InvalidXLogRecPtr;
gxact->owner = owner;
gxact->locking_backend = MyBackendId;
gxact->valid = false;
@@ -579,41 +587,6 @@ RemoveGXact(GlobalTransaction gxact)
}
/*
- * TransactionIdIsPrepared
- * True iff transaction associated with the identifier is prepared
- * for two-phase commit
- *
- * Note: only gxacts marked "valid" are considered; but notice we do not
- * check the locking status.
- *
- * This is not currently exported, because it is only needed internally.
- */
-static bool
-TransactionIdIsPrepared(TransactionId xid)
-{
- bool result = false;
- int i;
-
- LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
- for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
- {
- GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
-
- if (gxact->valid && pgxact->xid == xid)
- {
- result = true;
- break;
- }
- }
-
- LWLockRelease(TwoPhaseStateLock);
-
- return result;
-}
-
-/*
* Returns an array of all prepared transactions for the user-level
* function pg_prepared_xact.
*
@@ -1020,14 +993,8 @@ StartPrepare(GlobalTransaction gxact)
void
EndPrepare(GlobalTransaction gxact)
{
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
- char path[MAXPGPATH];
StateFileChunk *record;
- pg_crc32c statefile_crc;
- pg_crc32c bogus_crc;
- int fd;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1048,70 +1015,7 @@ EndPrepare(GlobalTransaction gxact)
errmsg("two-phase state file maximum length exceeded")));
/*
- * Create the 2PC state file.
- */
- TwoPhaseFilePath(path, xid);
-
- fd = OpenTransientFile(path,
- O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
- S_IRUSR | S_IWUSR);
- if (fd < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not create two-phase state file \"%s\": %m",
- path)));
-
- /* Write data to file, and calculate CRC as we pass over it */
- INIT_CRC32C(statefile_crc);
-
- for (record = records.head; record != NULL; record = record->next)
- {
- COMP_CRC32C(statefile_crc, record->data, record->len);
- if ((write(fd, record->data, record->len)) != record->len)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
- }
-
- FIN_CRC32C(statefile_crc);
-
- /*
- * Write a deliberately bogus CRC to the state file; this is just paranoia
- * to catch the case where four more bytes will run us out of disk space.
- */
- bogus_crc = ~statefile_crc;
-
- if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
-
- /* Back up to prepare for rewriting the CRC */
- if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in two-phase state file: %m")));
- }
-
- /*
- * The state file isn't valid yet, because we haven't written the correct
- * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
- *
- * Between the time we have written the WAL entry and the time we write
- * out the correct state file CRC, we have an inconsistency: the xact is
- * prepared according to WAL but not according to our on-disk state. We
- * use a critical section to force a PANIC if we are unable to complete
- * the write --- then, WAL replay should repair the inconsistency. The
- * odds of a PANIC actually occurring should be very tiny given that we
- * were able to write the bogus CRC above.
+ * Now writing 2PC state data to WAL.
*
* We have to set delayChkpt here, too; otherwise a checkpoint starting
* immediately after the WAL record is inserted could complete without
@@ -1131,24 +1035,13 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert();
for (record = records.head; record != NULL; record = record->next)
XLogRegisterData(record->data, record->len);
- gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
- XLogFlush(gxact->prepare_lsn);
+ gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+ XLogFlush(gxact->prepare_end_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
- /* write correct CRC and close file */
- if ((write(fd, &statefile_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
-
- if (CloseTransientFile(fd) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close two-phase state file: %m")));
+ /* Store record's start location to read that later on Commit */
+ gxact->prepare_start_lsn = ProcLastRecPtr;
/*
* Mark the prepared transaction as valid. As soon as xact.c marks
@@ -1186,7 +1079,7 @@ EndPrepare(GlobalTransaction gxact)
* Note that at this stage we have marked the prepare, but still show as
* running in the procarray (twice!) and continue to hold locks.
*/
- SyncRepWaitForLSN(gxact->prepare_lsn);
+ SyncRepWaitForLSN(gxact->prepare_end_lsn);
records.tail = records.head = NULL;
records.num_chunks = 0;
@@ -1315,6 +1208,36 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
+
+/*
+ * Reads 2PC data from xlog. During checkpoint this data will be moved to
+ * twophase files and ReadTwoPhaseFile should be used instead.
+ */
+static void
+XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
+{
+ XLogRecord *record;
+ XLogReaderState *xlogreader;
+ char *errormsg;
+
+ xlogreader = XLogReaderAllocate(&logical_read_local_xlog_page, NULL);
+ if (xlogreader == NULL)
+ elog(ERROR, "failed to open xlogreader for reading 2PC data");
+
+ record = XLogReadRecord(xlogreader, lsn, &errormsg);
+ if (record == NULL)
+ elog(ERROR, "failed to read 2PC record from xlog");
+
+ if (len != NULL)
+ *len = XLogRecGetDataLen(xlogreader);
+
+ *buf = palloc(sizeof(char)*XLogRecGetDataLen(xlogreader));
+ memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char)*XLogRecGetDataLen(xlogreader));
+
+ XLogReaderFree(xlogreader);
+}
+
+
/*
* Confirms an xid is prepared, during recovery
*/
@@ -1364,6 +1287,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
int ndelrels;
SharedInvalidationMessage *invalmsgs;
int i;
+ bool file_used = false;
/*
* Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1375,14 +1299,20 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
xid = pgxact->xid;
/*
- * Read and validate the state file
+ * Read and validate 2PC state data.
+ * State data can be stored in xlog or in files after xlog checkpoint.
+ * While checkpointing we set gxact->prepare_start_lsn to NULL to signalize
+ * that 2PC data is moved to files.
*/
- buf = ReadTwoPhaseFile(xid, true);
- if (buf == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("two-phase state file for transaction %u is corrupt",
- xid)));
+ if (gxact->prepare_start_lsn)
+ {
+ XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+ }
+ else
+ {
+ buf = ReadTwoPhaseFile(xid, true);
+ file_used = true;
+ }
/*
* Disassemble the header area
@@ -1484,7 +1414,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
/*
* And now we can clean up our mess.
*/
- RemoveTwoPhaseFile(xid, true);
+ if (file_used)
+ RemoveTwoPhaseFile(xid, true);
RemoveGXact(gxact);
MyLockedGxact = NULL;
@@ -1539,7 +1470,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
}
/*
- * Recreates a state file. This is used in WAL replay.
+ * Recreates a state file. This is used in WAL replay and during
+ * checkpoint creation.
*
* Note: content and len don't include CRC.
*/
@@ -1620,85 +1552,38 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
void
CheckPointTwoPhase(XLogRecPtr redo_horizon)
{
- TransactionId *xids;
- int nxids;
- char path[MAXPGPATH];
int i;
+ int len;
+ char *buf;
- /*
- * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
- * it just long enough to make a list of the XIDs that require fsyncing,
- * and then do the I/O afterwards.
- *
- * This approach creates a race condition: someone else could delete a
- * GXACT between the time we release TwoPhaseStateLock and the time we try
- * to open its state file. We handle this by special-casing ENOENT
- * failures: if we see that, we verify that the GXACT is no longer valid,
- * and if so ignore the failure.
- */
if (max_prepared_xacts <= 0)
return; /* nothing to do */
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
- xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
- nxids = 0;
-
+ /*
+ * Here we doing whole I/O while holding TwoPhaseStateLock.
+ * It's also possible to move I/O out of the lock, but on
+ * every error we should check whether somebody commited our
+ * transaction in different backend. Let's leave this optimisation
+ * for future, if somebody will spot that this place cause
+ * bottleneck.
+ */
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- if (gxact->valid &&
- gxact->prepare_lsn <= redo_horizon)
- xids[nxids++] = pgxact->xid;
- }
-
- LWLockRelease(TwoPhaseStateLock);
-
- for (i = 0; i < nxids; i++)
- {
- TransactionId xid = xids[i];
- int fd;
-
- TwoPhaseFilePath(path, xid);
-
- fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
- if (fd < 0)
- {
- if (errno == ENOENT)
- {
- /* OK if gxact is no longer valid */
- if (!TransactionIdIsPrepared(xid))
- continue;
- /* Restore errno in case it was changed */
- errno = ENOENT;
- }
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open two-phase state file \"%s\": %m",
- path)));
- }
-
- if (pg_fsync(fd) != 0)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not fsync two-phase state file \"%s\": %m",
- path)));
+ if (gxact->valid && gxact->prepare_start_lsn != InvalidXLogRecPtr &&
+ gxact->prepare_end_lsn <= redo_horizon){
+ XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
+ RecreateTwoPhaseFile(pgxact->xid, buf, len);
+ gxact->prepare_start_lsn = InvalidXLogRecPtr;
+ pfree(buf);
}
-
- if (CloseTransientFile(fd) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close two-phase state file \"%s\": %m",
- path)));
}
-
- pfree(xids);
+ LWLockRelease(TwoPhaseStateLock);
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
}
@@ -2030,12 +1915,8 @@ RecoverPreparedTransactions(void)
/*
* Recreate its GXACT and dummy PGPROC
*
- * Note: since we don't have the PREPARE record's WAL location at
- * hand, we leave prepare_lsn zeroes. This means the GXACT will
- * be fsync'd on every future checkpoint. We assume this
- * situation is infrequent enough that the performance cost is
- * negligible (especially since we know the state file has already
- * been fsynced).
+ * MarkAsPreparing sets prepare_start_lsn to InvalidXLogRecPtr
+ * so next checkpoint will skip that transaction.
*/
gxact = MarkAsPreparing(xid, hdr->gid,
hdr->prepared_at,
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 86debf4..3683785 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -321,8 +321,7 @@ static TimeLineID curFileTLI;
* stored here. The parallel leader advances its own copy, when necessary,
* in WaitForParallelWorkersToFinish.
*/
-static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
-
+XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 790ca66..a6d04cc 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -86,6 +86,7 @@ typedef enum
RECOVERY_TARGET_IMMEDIATE
} RecoveryTargetType;
+extern XLogRecPtr ProcLastRecPtr;
extern XLogRecPtr XactLastRecEnd;
extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
On 9 December 2015 at 18:44, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:
In this patch I’ve changed this procedures to following:
* on prepare backend writes data only to xlog and store pointer to the
start of the xlog record
* if commit occurs before checkpoint then backend reads data from xlog by
this pointer
* on checkpoint 2pc data copied to files and fsynced
* if commit happens after checkpoint then backend reads files
* in case of crash replay will move data from xlog to files (as it was
before patch)
This looks sound to me.
I think we could do better still, but this looks like the easiest 80% and
actually removes code.
The lack of substantial comments on the patch is a problem though - the
details above should go in the patch. I'll have a go at reworking this for
you, this time.
--
Simon Riggs http://www.2ndQuadrant.com/
<http://www.2ndquadrant.com/>
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Simon Riggs wrote:
I think we could do better still, but this looks like the easiest 80% and
actually removes code.The lack of substantial comments on the patch is a problem though - the
details above should go in the patch. I'll have a go at reworking this for
you, this time.
Is someone submitting an updated patch here?
--
�lvaro Herrera http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hi.
I’ve updated patch and wrote description of thighs that happens
with 2PC state data in the beginning of the file. I think now this patch is well documented,
but if somebody points me to places that probably requires more detailed description I’m ready
to extend that.
Attachments:
2pc_xlog.diffapplication/octet-stream; name=2pc_xlog.diffDownload
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 8c47e0f..267f99b 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -28,8 +28,21 @@
* In order to survive crashes and shutdowns, all prepared
* transactions must be stored in permanent storage. This includes
* locking information, pending notifications etc. All that state
- * information is written to the per-transaction state file in
- * the pg_twophase directory.
+ * information is written to the WAL and later can be moved to the
+ * per-transaction state file in the pg_twophase directory. Life track of
+ * state data is following:
+ *
+ * * On PREPARE TRANSACTION backend writes state data only to the WAL and
+ * stores pointer to the start of the WAL record in
+ * gxact->prepare_start_lsn.
+ * * If COMMIT occurs before checkpoint then backend reads data from WAL
+ * using prepare_start_lsn.
+ * * On checkpoint state data copied to files in pg_twophase directory and
+ * fsynced
+ * * If COMMIT happens after checkpoint then backend reads state data from
+ * files
+ * * In case of crash replay will move data from xlog to files, if that
+ * hasn't happened before.
*
*-------------------------------------------------------------------------
*/
@@ -51,6 +64,7 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
+#include "access/xlogreader.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
#include "funcapi.h"
@@ -60,6 +74,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "replication/logicalfuncs.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
@@ -117,7 +132,11 @@ typedef struct GlobalTransactionData
int pgprocno; /* ID of associated dummy PGPROC */
BackendId dummyBackendId; /* similar to backend id for backends */
TimestampTz prepared_at; /* time of preparation */
- XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
+ XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start
+ * or InvalidXLogRecPtr if twophase data
+ * moved to file after checkpoint.
+ */
+ XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
Oid owner; /* ID of user that executed the xact */
BackendId locking_backend; /* backend currently working on the xact */
bool valid; /* TRUE if PGPROC entry is in proc array */
@@ -166,6 +185,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
+static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
/*
* Initialization of shared memory
@@ -398,8 +418,9 @@ MarkAsPreparing(TransactionId xid, const char *gid,
pgxact->nxids = 0;
gxact->prepared_at = prepared_at;
- /* initialize LSN to 0 (start of WAL) */
- gxact->prepare_lsn = 0;
+ /* initialize LSN to InvalidXLogRecPtr */
+ gxact->prepare_start_lsn = InvalidXLogRecPtr;
+ gxact->prepare_end_lsn = InvalidXLogRecPtr;
gxact->owner = owner;
gxact->locking_backend = MyBackendId;
gxact->valid = false;
@@ -579,41 +600,6 @@ RemoveGXact(GlobalTransaction gxact)
}
/*
- * TransactionIdIsPrepared
- * True iff transaction associated with the identifier is prepared
- * for two-phase commit
- *
- * Note: only gxacts marked "valid" are considered; but notice we do not
- * check the locking status.
- *
- * This is not currently exported, because it is only needed internally.
- */
-static bool
-TransactionIdIsPrepared(TransactionId xid)
-{
- bool result = false;
- int i;
-
- LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
- for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
- {
- GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
-
- if (gxact->valid && pgxact->xid == xid)
- {
- result = true;
- break;
- }
- }
-
- LWLockRelease(TwoPhaseStateLock);
-
- return result;
-}
-
-/*
* Returns an array of all prepared transactions for the user-level
* function pg_prepared_xact.
*
@@ -1020,14 +1006,8 @@ StartPrepare(GlobalTransaction gxact)
void
EndPrepare(GlobalTransaction gxact)
{
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
- char path[MAXPGPATH];
StateFileChunk *record;
- pg_crc32c statefile_crc;
- pg_crc32c bogus_crc;
- int fd;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1048,70 +1028,7 @@ EndPrepare(GlobalTransaction gxact)
errmsg("two-phase state file maximum length exceeded")));
/*
- * Create the 2PC state file.
- */
- TwoPhaseFilePath(path, xid);
-
- fd = OpenTransientFile(path,
- O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
- S_IRUSR | S_IWUSR);
- if (fd < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not create two-phase state file \"%s\": %m",
- path)));
-
- /* Write data to file, and calculate CRC as we pass over it */
- INIT_CRC32C(statefile_crc);
-
- for (record = records.head; record != NULL; record = record->next)
- {
- COMP_CRC32C(statefile_crc, record->data, record->len);
- if ((write(fd, record->data, record->len)) != record->len)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
- }
-
- FIN_CRC32C(statefile_crc);
-
- /*
- * Write a deliberately bogus CRC to the state file; this is just paranoia
- * to catch the case where four more bytes will run us out of disk space.
- */
- bogus_crc = ~statefile_crc;
-
- if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
-
- /* Back up to prepare for rewriting the CRC */
- if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in two-phase state file: %m")));
- }
-
- /*
- * The state file isn't valid yet, because we haven't written the correct
- * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
- *
- * Between the time we have written the WAL entry and the time we write
- * out the correct state file CRC, we have an inconsistency: the xact is
- * prepared according to WAL but not according to our on-disk state. We
- * use a critical section to force a PANIC if we are unable to complete
- * the write --- then, WAL replay should repair the inconsistency. The
- * odds of a PANIC actually occurring should be very tiny given that we
- * were able to write the bogus CRC above.
+ * Now writing 2PC state data to WAL.
*
* We have to set delayChkpt here, too; otherwise a checkpoint starting
* immediately after the WAL record is inserted could complete without
@@ -1131,24 +1048,13 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert();
for (record = records.head; record != NULL; record = record->next)
XLogRegisterData(record->data, record->len);
- gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
- XLogFlush(gxact->prepare_lsn);
+ gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+ XLogFlush(gxact->prepare_end_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
- /* write correct CRC and close file */
- if ((write(fd, &statefile_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
-
- if (CloseTransientFile(fd) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close two-phase state file: %m")));
+ /* Store record's start location to read that later on Commit */
+ gxact->prepare_start_lsn = ProcLastRecPtr;
/*
* Mark the prepared transaction as valid. As soon as xact.c marks
@@ -1186,7 +1092,7 @@ EndPrepare(GlobalTransaction gxact)
* Note that at this stage we have marked the prepare, but still show as
* running in the procarray (twice!) and continue to hold locks.
*/
- SyncRepWaitForLSN(gxact->prepare_lsn);
+ SyncRepWaitForLSN(gxact->prepare_end_lsn);
records.tail = records.head = NULL;
records.num_chunks = 0;
@@ -1315,6 +1221,36 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
+
+/*
+ * Reads 2PC data from xlog. During checkpoint this data will be moved to
+ * twophase files and ReadTwoPhaseFile should be used instead.
+ */
+static void
+XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
+{
+ XLogRecord *record;
+ XLogReaderState *xlogreader;
+ char *errormsg;
+
+ xlogreader = XLogReaderAllocate(&logical_read_local_xlog_page, NULL);
+ if (xlogreader == NULL)
+ elog(ERROR, "failed to open xlogreader for reading 2PC data");
+
+ record = XLogReadRecord(xlogreader, lsn, &errormsg);
+ if (record == NULL)
+ elog(ERROR, "failed to read 2PC record from xlog");
+
+ if (len != NULL)
+ *len = XLogRecGetDataLen(xlogreader);
+
+ *buf = palloc(sizeof(char)*XLogRecGetDataLen(xlogreader));
+ memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char)*XLogRecGetDataLen(xlogreader));
+
+ XLogReaderFree(xlogreader);
+}
+
+
/*
* Confirms an xid is prepared, during recovery
*/
@@ -1364,6 +1300,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
int ndelrels;
SharedInvalidationMessage *invalmsgs;
int i;
+ bool file_used = false;
/*
* Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1375,14 +1312,20 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
xid = pgxact->xid;
/*
- * Read and validate the state file
+ * Read and validate 2PC state data.
+ * State data can be stored in xlog or in files after xlog checkpoint.
+ * While checkpointing we set gxact->prepare_start_lsn to NULL to signalize
+ * that 2PC data is moved to files.
*/
- buf = ReadTwoPhaseFile(xid, true);
- if (buf == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("two-phase state file for transaction %u is corrupt",
- xid)));
+ if (gxact->prepare_start_lsn)
+ {
+ XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+ }
+ else
+ {
+ buf = ReadTwoPhaseFile(xid, true);
+ file_used = true;
+ }
/*
* Disassemble the header area
@@ -1484,7 +1427,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
/*
* And now we can clean up our mess.
*/
- RemoveTwoPhaseFile(xid, true);
+ if (file_used)
+ RemoveTwoPhaseFile(xid, true);
RemoveGXact(gxact);
MyLockedGxact = NULL;
@@ -1539,7 +1483,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
}
/*
- * Recreates a state file. This is used in WAL replay.
+ * Recreates a state file. This is used in WAL replay and during
+ * checkpoint creation.
*
* Note: content and len don't include CRC.
*/
@@ -1620,85 +1565,38 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
void
CheckPointTwoPhase(XLogRecPtr redo_horizon)
{
- TransactionId *xids;
- int nxids;
- char path[MAXPGPATH];
int i;
+ int len;
+ char *buf;
- /*
- * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
- * it just long enough to make a list of the XIDs that require fsyncing,
- * and then do the I/O afterwards.
- *
- * This approach creates a race condition: someone else could delete a
- * GXACT between the time we release TwoPhaseStateLock and the time we try
- * to open its state file. We handle this by special-casing ENOENT
- * failures: if we see that, we verify that the GXACT is no longer valid,
- * and if so ignore the failure.
- */
if (max_prepared_xacts <= 0)
return; /* nothing to do */
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
- xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
- nxids = 0;
-
+ /*
+ * Here we doing whole I/O while holding TwoPhaseStateLock.
+ * It's also possible to move I/O out of the lock, but on
+ * every error we should check whether somebody commited our
+ * transaction in different backend. Let's leave this optimisation
+ * for future, if somebody will spot that this place cause
+ * bottleneck.
+ */
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- if (gxact->valid &&
- gxact->prepare_lsn <= redo_horizon)
- xids[nxids++] = pgxact->xid;
- }
-
- LWLockRelease(TwoPhaseStateLock);
-
- for (i = 0; i < nxids; i++)
- {
- TransactionId xid = xids[i];
- int fd;
-
- TwoPhaseFilePath(path, xid);
-
- fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
- if (fd < 0)
- {
- if (errno == ENOENT)
- {
- /* OK if gxact is no longer valid */
- if (!TransactionIdIsPrepared(xid))
- continue;
- /* Restore errno in case it was changed */
- errno = ENOENT;
- }
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open two-phase state file \"%s\": %m",
- path)));
- }
-
- if (pg_fsync(fd) != 0)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not fsync two-phase state file \"%s\": %m",
- path)));
+ if (gxact->valid && gxact->prepare_start_lsn != InvalidXLogRecPtr &&
+ gxact->prepare_end_lsn <= redo_horizon){
+ XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
+ RecreateTwoPhaseFile(pgxact->xid, buf, len);
+ gxact->prepare_start_lsn = InvalidXLogRecPtr;
+ pfree(buf);
}
-
- if (CloseTransientFile(fd) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close two-phase state file \"%s\": %m",
- path)));
}
-
- pfree(xids);
+ LWLockRelease(TwoPhaseStateLock);
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
}
@@ -2030,12 +1928,8 @@ RecoverPreparedTransactions(void)
/*
* Recreate its GXACT and dummy PGPROC
*
- * Note: since we don't have the PREPARE record's WAL location at
- * hand, we leave prepare_lsn zeroes. This means the GXACT will
- * be fsync'd on every future checkpoint. We assume this
- * situation is infrequent enough that the performance cost is
- * negligible (especially since we know the state file has already
- * been fsynced).
+ * MarkAsPreparing sets prepare_start_lsn to InvalidXLogRecPtr
+ * so next checkpoint will skip that transaction.
*/
gxact = MarkAsPreparing(xid, hdr->gid,
hdr->prepared_at,
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 147fd53..598faf0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -321,8 +321,7 @@ static TimeLineID curFileTLI;
* stored here. The parallel leader advances its own copy, when necessary,
* in WaitForParallelWorkersToFinish.
*/
-static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
-
+XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 790ca66..a6d04cc 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -86,6 +86,7 @@ typedef enum
RECOVERY_TARGET_IMMEDIATE
} RecoveryTargetType;
+extern XLogRecPtr ProcLastRecPtr;
extern XLogRecPtr XactLastRecEnd;
extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
On 9 January 2016 at 12:26, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:
I’ve updated patch and wrote description of thighs that happens
with 2PC state data in the beginning of the file. I think now this patch
is well documented,
but if somebody points me to places that probably requires more detailed
description I’m ready
to extend that.
Hmm, I was just preparing this for commit.
Please have a look at my mild edits and extended comments.
--
Simon Riggs http://www.2ndQuadrant.com/
<http://www.2ndquadrant.com/>
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachments:
2pc_optimize.v2.patchapplication/octet-stream; name=2pc_optimize.v2.patchDownload
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2251b02..7b8b620 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -25,11 +25,14 @@
* what keeps the XID considered running by TransactionIdIsInProgress.
* It is also convenient as a PGPROC to hook the gxact's locks to.
*
- * In order to survive crashes and shutdowns, all prepared
- * transactions must be stored in permanent storage. This includes
- * locking information, pending notifications etc. All that state
- * information is written to the per-transaction state file in
- * the pg_twophase directory.
+ * Information to recover prepared transactions in case of crash is
+ * now stored in WAL for the common case. In some cases there will be
+ * an extended period between preparing a GXACT and commit/abort, in
+ * which case we need to separately record prepared transaction data
+ * in permanent storage. This includes locking information, pending
+ * notifications etc. All that state information is written to the
+ * per-transaction state file in the pg_twophase directory.
+ * All prepared transactions will be written prior to shutdown.
*
*-------------------------------------------------------------------------
*/
@@ -51,6 +54,7 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
+#include "access/xlogreader.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
#include "funcapi.h"
@@ -60,6 +64,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "replication/logicalfuncs.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
@@ -117,10 +122,21 @@ typedef struct GlobalTransactionData
int pgprocno; /* ID of associated dummy PGPROC */
BackendId dummyBackendId; /* similar to backend id for backends */
TimestampTz prepared_at; /* time of preparation */
- XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
+
+ /*
+ * Note that we need to keep track of two LSNs for each GXACT.
+ * We keep track of the start LSN because this is the address we must
+ * use to read state data back from WAL when committing a prepared GXACT.
+ * We keep track of the end LSN because that is the LSN we need to wait
+ * for prior to commit.
+ */
+ XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
+ XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
+
Oid owner; /* ID of user that executed the xact */
BackendId locking_backend; /* backend currently working on the xact */
bool valid; /* TRUE if PGPROC entry is in proc array */
+ bool ondisk; /* TRUE if prepare state file is on disk */
char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
} GlobalTransactionData;
@@ -166,6 +182,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
+static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
/*
* Initialization of shared memory
@@ -398,8 +415,9 @@ MarkAsPreparing(TransactionId xid, const char *gid,
pgxact->nxids = 0;
gxact->prepared_at = prepared_at;
- /* initialize LSN to 0 (start of WAL) */
- gxact->prepare_lsn = 0;
+ /* initialize LSN to InvalidXLogRecPtr */
+ gxact->prepare_start_lsn = InvalidXLogRecPtr;
+ gxact->prepare_end_lsn = InvalidXLogRecPtr;
gxact->owner = owner;
gxact->locking_backend = MyBackendId;
gxact->valid = false;
@@ -579,41 +597,6 @@ RemoveGXact(GlobalTransaction gxact)
}
/*
- * TransactionIdIsPrepared
- * True iff transaction associated with the identifier is prepared
- * for two-phase commit
- *
- * Note: only gxacts marked "valid" are considered; but notice we do not
- * check the locking status.
- *
- * This is not currently exported, because it is only needed internally.
- */
-static bool
-TransactionIdIsPrepared(TransactionId xid)
-{
- bool result = false;
- int i;
-
- LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
- for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
- {
- GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
-
- if (gxact->valid && pgxact->xid == xid)
- {
- result = true;
- break;
- }
- }
-
- LWLockRelease(TwoPhaseStateLock);
-
- return result;
-}
-
-/*
* Returns an array of all prepared transactions for the user-level
* function pg_prepared_xact.
*
@@ -1020,14 +1003,8 @@ StartPrepare(GlobalTransaction gxact)
void
EndPrepare(GlobalTransaction gxact)
{
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
- char path[MAXPGPATH];
StateFileChunk *record;
- pg_crc32c statefile_crc;
- pg_crc32c bogus_crc;
- int fd;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1048,70 +1025,8 @@ EndPrepare(GlobalTransaction gxact)
errmsg("two-phase state file maximum length exceeded")));
/*
- * Create the 2PC state file.
- */
- TwoPhaseFilePath(path, xid);
-
- fd = OpenTransientFile(path,
- O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
- S_IRUSR | S_IWUSR);
- if (fd < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not create two-phase state file \"%s\": %m",
- path)));
-
- /* Write data to file, and calculate CRC as we pass over it */
- INIT_CRC32C(statefile_crc);
-
- for (record = records.head; record != NULL; record = record->next)
- {
- COMP_CRC32C(statefile_crc, record->data, record->len);
- if ((write(fd, record->data, record->len)) != record->len)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
- }
-
- FIN_CRC32C(statefile_crc);
-
- /*
- * Write a deliberately bogus CRC to the state file; this is just paranoia
- * to catch the case where four more bytes will run us out of disk space.
- */
- bogus_crc = ~statefile_crc;
-
- if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
-
- /* Back up to prepare for rewriting the CRC */
- if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in two-phase state file: %m")));
- }
-
- /*
- * The state file isn't valid yet, because we haven't written the correct
- * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
- *
- * Between the time we have written the WAL entry and the time we write
- * out the correct state file CRC, we have an inconsistency: the xact is
- * prepared according to WAL but not according to our on-disk state. We
- * use a critical section to force a PANIC if we are unable to complete
- * the write --- then, WAL replay should repair the inconsistency. The
- * odds of a PANIC actually occurring should be very tiny given that we
- * were able to write the bogus CRC above.
+ * Now writing 2PC state data to WAL. We let the WAL's CRC protection
+ * cover us, so no need to calculate a separate CRC.
*
* We have to set delayChkpt here, too; otherwise a checkpoint starting
* immediately after the WAL record is inserted could complete without
@@ -1131,24 +1046,13 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert();
for (record = records.head; record != NULL; record = record->next)
XLogRegisterData(record->data, record->len);
- gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
- XLogFlush(gxact->prepare_lsn);
+ gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+ XLogFlush(gxact->prepare_end_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
- /* write correct CRC and close file */
- if ((write(fd, &statefile_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write two-phase state file: %m")));
- }
-
- if (CloseTransientFile(fd) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close two-phase state file: %m")));
+ /* Store record's start location to read that later on Commit */
+ gxact->prepare_start_lsn = ProcLastRecPtr;
/*
* Mark the prepared transaction as valid. As soon as xact.c marks
@@ -1186,7 +1090,7 @@ EndPrepare(GlobalTransaction gxact)
* Note that at this stage we have marked the prepare, but still show as
* running in the procarray (twice!) and continue to hold locks.
*/
- SyncRepWaitForLSN(gxact->prepare_lsn);
+ SyncRepWaitForLSN(gxact->prepare_end_lsn);
records.tail = records.head = NULL;
records.num_chunks = 0;
@@ -1315,6 +1219,36 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
+
+/*
+ * Reads 2PC data from xlog. During checkpoint this data will be moved to
+ * twophase files and ReadTwoPhaseFile should be used instead.
+ */
+static void
+XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
+{
+ XLogRecord *record;
+ XLogReaderState *xlogreader;
+ char *errormsg;
+
+ xlogreader = XLogReaderAllocate(&logical_read_local_xlog_page, NULL);
+ if (xlogreader == NULL)
+ elog(ERROR, "failed to open xlogreader for reading 2PC data");
+
+ record = XLogReadRecord(xlogreader, lsn, &errormsg);
+ if (record == NULL)
+ elog(ERROR, "failed to read 2PC record from xlog");
+
+ if (len != NULL)
+ *len = XLogRecGetDataLen(xlogreader);
+
+ *buf = palloc(sizeof(char)*XLogRecGetDataLen(xlogreader));
+ memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char)*XLogRecGetDataLen(xlogreader));
+
+ XLogReaderFree(xlogreader);
+}
+
+
/*
* Confirms an xid is prepared, during recovery
*/
@@ -1375,14 +1309,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
xid = pgxact->xid;
/*
- * Read and validate the state file
+ * Read and validate 2PC state data.
+ * State data will typically be stored in WAL files if the LSN is after the
+ * last checkpoint record, or moved to disk if for some reason they have
+ * lived for a long time.
*/
- buf = ReadTwoPhaseFile(xid, true);
- if (buf == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("two-phase state file for transaction %u is corrupt",
- xid)));
+ if (gxact->ondisk)
+ buf = ReadTwoPhaseFile(xid, true);
+ else
+ XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+
/*
* Disassemble the header area
@@ -1482,9 +1418,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
AtEOXact_PgStat(isCommit);
/*
- * And now we can clean up our mess.
+ * And now we can clean up any files we may have left.
*/
- RemoveTwoPhaseFile(xid, true);
+ if (gxact->ondisk)
+ RemoveTwoPhaseFile(xid, true);
RemoveGXact(gxact);
MyLockedGxact = NULL;
@@ -1539,7 +1476,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
}
/*
- * Recreates a state file. This is used in WAL replay.
+ * Recreates a state file. This is used in WAL replay and during
+ * checkpoint creation.
*
* Note: content and len don't include CRC.
*/
@@ -1610,97 +1548,71 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
* This is deliberately run as late as possible in the checkpoint sequence,
* because GXACTs ordinarily have short lifespans, and so it is quite
* possible that GXACTs that were valid at checkpoint start will no longer
- * exist if we wait a little bit.
+ * exist if we wait a little bit. With typical checkpoint settings this
+ * will be about 3 minutes for an online checkpoint, so as a result we
+ * we expect that there will be no GXACTs that need to be copied to disk.
*
- * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
- * each time. This is considered unusual enough that we don't bother to
- * expend any extra code to avoid the redundant fsyncs. (They should be
- * reasonably cheap anyway, since they won't cause I/O.)
+ * If a GXACT remains valid across multiple checkpoints, it will already
+ * be on disk so we don't bother to repeat that write.
*/
void
CheckPointTwoPhase(XLogRecPtr redo_horizon)
{
- TransactionId *xids;
- int nxids;
- char path[MAXPGPATH];
int i;
+ int n = 0;
- /*
- * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
- * it just long enough to make a list of the XIDs that require fsyncing,
- * and then do the I/O afterwards.
- *
- * This approach creates a race condition: someone else could delete a
- * GXACT between the time we release TwoPhaseStateLock and the time we try
- * to open its state file. We handle this by special-casing ENOENT
- * failures: if we see that, we verify that the GXACT is no longer valid,
- * and if so ignore the failure.
- */
if (max_prepared_xacts <= 0)
return; /* nothing to do */
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
- xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
- nxids = 0;
-
+ /*
+ * We are expecting there to be zero GXACTs that need to be
+ * copied to disk, so we perform all I/O while holding
+ * TwoPhaseStateLock for simplicity. This prevents any new xacts
+ * from preparing while this occurs, which shouldn't be a problem
+ * since the presence of long-lived prepared xacts indicates the
+ * transaction manager isn't active.
+ *
+ * It's also possible to move I/O out of the lock, but on
+ * every error we should check whether somebody commited our
+ * transaction in different backend. Let's leave this optimisation
+ * for future, if somebody will spot that this place cause
+ * bottleneck.
+ *
+ * Note that it isn't possible for there to be a GXACT with
+ * a prepare_end_lsn set prior to the last checkpoint yet
+ * is marked invalid, because of the efforts with delayChkpt.
+ */
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
if (gxact->valid &&
- gxact->prepare_lsn <= redo_horizon)
- xids[nxids++] = pgxact->xid;
- }
-
- LWLockRelease(TwoPhaseStateLock);
-
- for (i = 0; i < nxids; i++)
- {
- TransactionId xid = xids[i];
- int fd;
-
- TwoPhaseFilePath(path, xid);
-
- fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
- if (fd < 0)
+ !gxact->ondisk &&
+ gxact->prepare_end_lsn <= redo_horizon)
{
- if (errno == ENOENT)
- {
- /* OK if gxact is no longer valid */
- if (!TransactionIdIsPrepared(xid))
- continue;
- /* Restore errno in case it was changed */
- errno = ENOENT;
- }
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open two-phase state file \"%s\": %m",
- path)));
- }
+ char *buf;
+ int len;
- if (pg_fsync(fd) != 0)
- {
- CloseTransientFile(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not fsync two-phase state file \"%s\": %m",
- path)));
+ XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
+ RecreateTwoPhaseFile(pgxact->xid, buf, len);
+ gxact->ondisk = true;
+ pfree(buf);
+ n++;
}
-
- if (CloseTransientFile(fd) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not close two-phase state file \"%s\": %m",
- path)));
}
-
- pfree(xids);
+ LWLockRelease(TwoPhaseStateLock);
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
+
+ if (log_checkpoints)
+ ereport(LOG,
+ (errmsg("%u two-phase state files were written "
+ "for long-running prepared transactions",
+ n)));
}
/*
@@ -2029,17 +1941,11 @@ RecoverPreparedTransactions(void)
/*
* Recreate its GXACT and dummy PGPROC
- *
- * Note: since we don't have the PREPARE record's WAL location at
- * hand, we leave prepare_lsn zeroes. This means the GXACT will
- * be fsync'd on every future checkpoint. We assume this
- * situation is infrequent enough that the performance cost is
- * negligible (especially since we know the state file has already
- * been fsynced).
*/
gxact = MarkAsPreparing(xid, hdr->gid,
hdr->prepared_at,
hdr->owner, hdr->database);
+ gxact->ondisk = true;
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
MarkAsPrepared(gxact);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index aa90503..c41baa0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -321,8 +321,7 @@ static TimeLineID curFileTLI;
* stored here. The parallel leader advances its own copy, when necessary,
* in WaitForParallelWorkersToFinish.
*/
-static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
-
+XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 3de337a..ecd30ce 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -86,6 +86,7 @@ typedef enum
RECOVERY_TARGET_IMMEDIATE
} RecoveryTargetType;
+extern XLogRecPtr ProcLastRecPtr;
extern XLogRecPtr XactLastRecEnd;
extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
On 9 January 2016 at 12:26, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:
I’ve updated patch and wrote description of thighs that happens
with 2PC state data in the beginning of the file. I think now this patch
is well documented,
but if somebody points me to places that probably requires more detailed
description I’m ready
to extend that.
Your comments say
"In case of crash replay will move data from xlog to files, if
that hasn't happened before."
but I don't see that in code. Can you show me where that happens?
--
Simon Riggs http://www.2ndQuadrant.com/
<http://www.2ndquadrant.com/>
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Thanks a lot for your edits, now that patch is much more cleaner.
Your comments say
"In case of crash replay will move data from xlog to files, if that hasn't happened before."
but I don't see that in code. Can you show me where that happens?
xact.c calls RecreateTwoPhaseFile in xact_redo() function (xact.c:5596)
On 09 Jan 2016, at 18:29, Simon Riggs <simon@2ndquadrant.com> wrote:
Hmm, I was just preparing this for commit.
Please have a look at my mild edits and extended comments.
One concern that come into my mind while reading updated
patch is about creating extra bool field in GlobalTransactionData structure. While this improves readability, it
also increases size of that structure and that size have impact on performance on systems with many cores
(say like 60-80). Probably one byte will not make measurable difference, but I think it is good idea to keep
GXact as small as possible. As far as I understand the same logic was behind split of
PGPROC to PGPROC+PGXACT in 9.2 (comment in proc.h:166)
Stas Kelvich
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 9 January 2016 at 20:28, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:
Thanks a lot for your edits, now that patch is much more cleaner.
Your comments say
"In case of crash replay will move data from xlog to files, if that
hasn't happened before."
but I don't see that in code. Can you show me where that happens?
xact.c calls RecreateTwoPhaseFile in xact_redo() function (xact.c:5596)
So we've only optimized half the usage? We're still going to cause
replication delays.
Sounds like we should be fixing both.
We can either
1) Skip fsyncing the RecreateTwoPhaseFile and then fsync during
restartpoints
2) Copy the contents to shmem and then write them at restartpoint as we do
for checkpoint
(preferred)
On 09 Jan 2016, at 18:29, Simon Riggs <simon@2ndquadrant.com> wrote:
Hmm, I was just preparing this for commit.
Please have a look at my mild edits and extended comments.
One concern that come into my mind while reading updated
patch is about creating extra bool field in GlobalTransactionData
structure. While this improves readability, it
also increases size of that structure and that size have impact on
performance on systems with many cores
(say like 60-80). Probably one byte will not make measurable difference,
but I think it is good idea to keep
GXact as small as possible. As far as I understand the same logic was
behind split of
PGPROC to PGPROC+PGXACT in 9.2 (comment in proc.h:166)
I think padding will negate the effects of the additional bool.
If we want to reduce the size of the array GIDSIZE is currently 200, but XA
says maximum 128 bytes.
Anybody know why that is set to 200?
--
Simon Riggs http://www.2ndQuadrant.com/
<http://www.2ndquadrant.com/>
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 10 Jan 2016, at 12:15, Simon Riggs <simon@2ndquadrant.com> wrote:
So we've only optimized half the usage? We're still going to cause replication delays.
Yes, replica will go through old procedures of moving data to and from file.
We can either
1) Skip fsyncing the RecreateTwoPhaseFile and then fsync during restartpoints
From what i’ve seen with old 2pc code main performance bottleneck was caused by frequent creating of files. So better to avoid files if possible.
2) Copy the contents to shmem and then write them at restartpoint as we do for checkpoint
(preferred)
Problem with shared memory is that we can’t really predict size of state data, and anyway it isn’t faster then reading data from WAL
(I have tested that while preparing original patch).
We can just apply the same logic on replica that on master: do not do anything special on prepare, and just read that data from WAL.
If checkpoint occurs during recovery/replay probably existing code will handle moving data to files.
I will update patch to address this issue.
I think padding will negate the effects of the additional bool.
If we want to reduce the size of the array GIDSIZE is currently 200, but XA says maximum 128 bytes.
Anybody know why that is set to 200?
Good catch about GID size.
If we talk about further optimisations i see two ways:
1) Optimising access to GXACT. Here we can try to shrink it; introduce more granular locks,
e.g. move GIDs out of GXACT and lock GIDs array only once while checking new GID uniqueness; try to lock only part of GXACT by hash; etc.
2) Be optimistic about consequent COMMIT PREPARED. In normal workload next command after PREPARE will be COMMIT/ROLLBACK, so we can save
transaction context and release it only if next command isn’t our designated COMMIT/ROLLBACK. But that is a big amount of work and requires
changes to whole transaction pipeline in postgres.
Anyway I suggest that we should consider that as a separate task.
---
Stas Kelvich
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 01/09/2016 10:29 AM, Simon Riggs wrote:
On 9 January 2016 at 12:26, Stas Kelvich <s.kelvich@postgrespro.ru> wrote:
I’ve updated patch and wrote description of thighs that happens
with 2PC state data in the beginning of the file. I think now this patch
is well documented,
but if somebody points me to places that probably requires more detailed
description I’m ready
to extend that.Hmm, I was just preparing this for commit.
Please have a look at my mild edits and extended comments.
I have done a run with the patch and it looks really great.
Attached is the TPS graph - with a 1pc run too - and the perf profile as
a flame graph (28C/56T w/ 256Gb mem, 2 x RAID10 SSD).
Maybe
+static void
+XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
to
+static void
+ReadTwoPhaseDataFromXlog(XLogRecPtr lsn, char **buf, int *len)
Best regards,
Jesper