--- postgresql-8.4.0.orig/src/backend/access/transam/twophase.c	2009-06-26 04:05:52.000000000 +0900
+++ postgresql-8.4.0/src/backend/access/transam/twophase.c	2009-08-06 10:03:40.000000000 +0900
@@ -69,6 +69,7 @@
 
 /* GUC variable, can't be changed after startup */
 int			max_prepared_xacts = 0;
+int			state_file_max_space = 0;
 
 /*
  * This struct describes one global transaction that is in prepared state
@@ -115,6 +116,10 @@ typedef struct GlobalTransactionData
 	TransactionId locking_xid;	/* top-level XID of backend working on xact */
 	bool		valid;			/* TRUE if fully prepared */
 	char		gid[GIDSIZE];	/* The GID assigned to the prepared xact */
+	int			stateFileLength;	/* bytes of state data held in the cache */
+	bool		in_cache;		/* true if state data lives in shared memory */
+	char	   *cache_entry;	/* start of this xact's cache block, or NULL */
+	int			BlockId;		/* 1-based cache block number; 0 = none */
 } GlobalTransactionData;
 
 /*
@@ -138,6 +143,8 @@ typedef struct TwoPhaseStateData
 
 static TwoPhaseStateData *TwoPhaseState;
 
+/* Base address of the shared-memory state file cache; NULL when disabled */
+static char *StateFileCacheFreeList = NULL;
 
 static void RecordTransactionCommitPrepared(TransactionId xid,
 								int nchildren,
@@ -172,6 +179,17 @@ TwoPhaseShmemSize(void)
 	return size;
 }
 
+/*
+ * StateFileShmemSize
+ *		Report shared memory needed for the state file cache: one block of
+ *		state_file_max_space bytes per possible prepared transaction.
+ */
+Size
+StateFileShmemSize(void)
+{
+	return mul_size(max_prepared_xacts, state_file_max_space);
+}
+
 void
 TwoPhaseShmemInit(void)
 {
@@ -206,6 +224,35 @@ TwoPhaseShmemInit(void)
 		Assert(found);
 }
 
+/*
+ * StateFileShmemInit
+ *		Set up the shared-memory cache for two-phase state files.
+ */
+void
+StateFileShmemInit(void)
+{
+	bool		found;
+
+	/* With no blocks to hand out the cache is disabled: files go to disk */
+	if (state_file_max_space == 0 || max_prepared_xacts == 0)
+	{
+		StateFileCacheFreeList = NULL;
+		return;
+	}
+
+	/*
+	 * Use ShmemInitStruct rather than ShmemAlloc: the size comes from the
+	 * overflow-checked StateFileShmemSize(), and a process that runs this
+	 * again attaches to the existing area instead of allocating a new one.
+	 */
+	StateFileCacheFreeList = (char *)
+		ShmemInitStruct("Two-Phase State File Cache",
+						StateFileShmemSize(), &found);
+	if (StateFileCacheFreeList == NULL)
+		ereport(FATAL,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("not enough shared memory for two-phase state file cache")));
+}
 
 /*
  * MarkAsPreparing
@@ -865,7 +912,7 @@ EndPrepare(GlobalTransaction gxact)
 	XLogRecData *record;
 	pg_crc32	statefile_crc;
 	pg_crc32	bogus_crc;
-	int			fd;
+	int			fd = -1;
 
 	/* Add the end sentinel to the list of 2PC records */
 	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -892,58 +939,123 @@ EndPrepare(GlobalTransaction gxact)
 	 * the FD gets closed in any error exit path.  Once we get into the
 	 * critical section, though, it doesn't matter since any failure causes
 	 * PANIC anyway.
+	 *
+	 * When the cache is enabled and the records fit in one cache block, the
+	 * state data is kept in shared memory instead and no file is created
+	 * here; a later checkpoint may write it out if it is still around.
 	 */
-	TwoPhaseFilePath(path, xid);
-
-	fd = BasicOpenFile(path,
-					   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
-					   S_IRUSR | S_IWUSR);
-	if (fd < 0)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not create two-phase state file \"%s\": %m",
-						path)));
-
-	/* Write data to file, and calculate CRC as we pass over it */
-	INIT_CRC32(statefile_crc);
-
-	for (record = records.head; record != NULL; record = record->next)
-	{
-		COMP_CRC32(statefile_crc, record->data, record->len);
-		if ((write(fd, record->data, record->len)) != record->len)
-		{
-			close(fd);
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not write two-phase state file: %m")));
-		}
-	}
-
-	FIN_CRC32(statefile_crc);
-
-	/*
-	 * Write a deliberately bogus CRC to the state file; this is just paranoia
-	 * to catch the case where four more bytes will run us out of disk space.
-	 */
-	bogus_crc = ~statefile_crc;
-
-	if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
-	{
-		close(fd);
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write two-phase state file: %m")));
-	}
-
-	/* Back up to prepare for rewriting the CRC */
-	if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
-	{
-		close(fd);
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not seek in two-phase state file: %m")));
-	}
+	gxact->stateFileLength = records.total_len;
+	gxact->in_cache = false;
+	gxact->BlockId = 0;
+	gxact->cache_entry = NULL;
+
+	if (StateFileCacheFreeList != NULL &&
+		hdr->total_len < state_file_max_space)
+	{
+		/* taken[b] is true when cache block b + 1 already has an owner */
+		bool	   *taken = (bool *) palloc0(max_prepared_xacts * sizeof(bool));
+		int			i;
+
+		/*
+		 * Hold the lock from the scan until our claim is recorded, so that
+		 * no other backend can pick the same block in between.
+		 */
+		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+		for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+		{
+			GlobalTransaction gxactloc = TwoPhaseState->prepXacts[i];
+
+			if (gxactloc != gxact &&
+				gxactloc->BlockId > 0 &&
+				gxactloc->BlockId <= max_prepared_xacts)
+				taken[gxactloc->BlockId - 1] = true;
+		}
+
+		/* claim the first free block; if none, fall back to a disk file */
+		for (i = 0; i < max_prepared_xacts; i++)
+		{
+			if (!taken[i])
+			{
+				gxact->BlockId = i + 1;
+				gxact->cache_entry = StateFileCacheFreeList +
+					(Size) state_file_max_space * i;
+				gxact->in_cache = true;
+				break;
+			}
+		}
+
+		LWLockRelease(TwoPhaseStateLock);
+		pfree(taken);
+	}
+
+	if (gxact->in_cache)
+	{
+		/* Copy the records back to back into our cache block (no CRC) */
+		int			locTotalLen = 0;
+
+		for (record = records.head; record != NULL; record = record->next)
+		{
+			memcpy(gxact->cache_entry + locTotalLen, record->data, record->len);
+			locTotalLen += record->len;
+		}
+		Assert(locTotalLen == gxact->stateFileLength);
+	}
+	else
+	{
+		TwoPhaseFilePath(path, xid);
+
+		fd = BasicOpenFile(path,
+						   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+						   S_IRUSR | S_IWUSR);
+		if (fd < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not create two-phase state file \"%s\": %m",
+							path)));
+
+		/* Write data to file, and calculate CRC as we pass over it */
+		INIT_CRC32(statefile_crc);
+
+		for (record = records.head; record != NULL; record = record->next)
+		{
+			COMP_CRC32(statefile_crc, record->data, record->len);
+			if ((write(fd, record->data, record->len)) != record->len)
+			{
+				close(fd);
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not write two-phase state file: %m")));
+			}
+		}
+
+		FIN_CRC32(statefile_crc);
+
+		/*
+		 * Write a deliberately bogus CRC to the state file; this is just
+		 * paranoia to catch the case where four more bytes will run us out
+		 * of disk space.
+		 */
+		bogus_crc = ~statefile_crc;
+
+		if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
+		{
+			close(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not write two-phase state file: %m")));
+		}
+
+		/* Back up to prepare for rewriting the CRC */
+		if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
+		{
+			close(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not seek in two-phase state file: %m")));
+		}
+	}
 
 	/*
 	 * The state file isn't valid yet, because we haven't written the correct
 	 * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
@@ -974,21 +1086,24 @@ EndPrepare(GlobalTransaction gxact)
 	XLogFlush(gxact->prepare_lsn);
 
 	/* If we crash now, we have prepared: WAL replay will fix things */
 
-	/* write correct CRC and close file */
-	if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
-	{
-		close(fd);
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write two-phase state file: %m")));
-	}
-
-	if (close(fd) != 0)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not close two-phase state file: %m")));
+	if (!gxact->in_cache)
+	{
+		/* write correct CRC and close file */
+		if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
+		{
+			close(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not write two-phase state file: %m")));
+		}
+
+		if (close(fd) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close two-phase state file: %m")));
+	}
 
 	/*
 	 * Mark the prepared transaction as valid.	As soon as xact.c marks MyProc
 	 * as not running our XID (which it will do immediately after this
@@ -1165,7 +1280,21 @@ FinishPreparedTransaction(const char *gi
 	/*
 	 * Read and validate the state file
 	 */
-	buf = ReadTwoPhaseFile(xid);
+	/*
+	 * Take the lock while looking at the cache: a concurrent checkpoint may
+	 * be moving this state data to disk (FlushTwoPhaseStateFile).
+	 */
+	buf = NULL;
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+	if (gxact->in_cache)
+	{
+		buf = (char *) palloc(state_file_max_space);
+		memcpy(buf, gxact->cache_entry, state_file_max_space);
+	}
+	LWLockRelease(TwoPhaseStateLock);
+
+	if (buf == NULL)
+		buf = ReadTwoPhaseFile(xid);
 	if (buf == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_DATA_CORRUPTED),
@@ -1258,7 +1387,29 @@ FinishPreparedTransaction(const char *gi
 	/*
 	 * And now we can clean up our mess.
 	 */
-	RemoveTwoPhaseFile(xid, true);
+	{
+		bool		was_cached;
+
+		/*
+		 * Decide under the lock whether the state data is still cached; a
+		 * concurrent checkpoint may have moved it to a disk file.
+		 */
+		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+		was_cached = gxact->in_cache;
+		if (was_cached)
+		{
+			/* scrub the block, then give it up by clearing BlockId */
+			memset(gxact->cache_entry, 0, gxact->stateFileLength);
+			gxact->BlockId = 0;
+			gxact->in_cache = false;
+			gxact->cache_entry = NULL;
+			gxact->stateFileLength = 0;
+		}
+		LWLockRelease(TwoPhaseStateLock);
+
+		if (!was_cached)
+			RemoveTwoPhaseFile(xid, true);
+	}
 
 	RemoveGXact(gxact);
 
@@ -1373,6 +1524,81 @@ RecreateTwoPhaseFile(TransactionId xid,
 				 errmsg("could not close two-phase state file: %m")));
 }
 
+/*
+ * FlushTwoPhaseStateFile
+ *		Move the cached state data of the given xact from shared memory to
+ *		a regular state file on disk.  Does nothing if the xact is gone or
+ *		its state data is not cached.
+ */
+static void
+FlushTwoPhaseStateFile(TransactionId xid)
+{
+	char	   *buffer;
+	int			len = 0;
+	int			i;
+	bool		found = false;
+
+	buffer = (char *) palloc(state_file_max_space);
+
+	/* Copy the cached state data to local memory under the lock */
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+		if (gxact->proc.xid == xid)
+		{
+			if (gxact->in_cache)
+			{
+				len = gxact->stateFileLength;
+				memcpy(buffer, gxact->cache_entry, len);
+				found = true;
+			}
+			break;
+		}
+	}
+	LWLockRelease(TwoPhaseStateLock);
+
+	if (!found)
+	{
+		pfree(buffer);
+		return;
+	}
+
+	/* Do the file I/O without holding the lock */
+	RecreateTwoPhaseFile(xid, buffer, len);
+	pfree(buffer);
+
+	/* The data is now on disk too; release the cache block */
+	found = false;
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+		if (gxact->proc.xid == xid)
+		{
+			if (gxact->in_cache)
+			{
+				memset(gxact->cache_entry, 0, gxact->stateFileLength);
+				gxact->in_cache = false;
+				gxact->BlockId = 0;
+				gxact->cache_entry = NULL;
+				found = true;
+			}
+			break;
+		}
+	}
+	LWLockRelease(TwoPhaseStateLock);
+
+	/*
+	 * If the xact was finished (or gave up its block) while we were writing,
+	 * nobody else knows about the file we just created, so remove it.
+	 */
+	if (!found)
+		RemoveTwoPhaseFile(xid, true);
+}
+
 /*
  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
  *
@@ -1394,6 +1620,7 @@ void
 CheckPointTwoPhase(XLogRecPtr redo_horizon)
 {
 	TransactionId *xids;
+	bool	   *in_cache;
 	int			nxids;
 	char		path[MAXPGPATH];
 	int			i;
@@ -1415,6 +1642,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horiz
 	TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
 
 	xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
+	in_cache = (bool *) palloc(max_prepared_xacts * sizeof(bool));
 	nxids = 0;
 
 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
@@ -1425,7 +1653,11 @@ CheckPointTwoPhase(XLogRecPtr redo_horiz
 
 		if (gxact->valid &&
 			XLByteLE(gxact->prepare_lsn, redo_horizon))
-			xids[nxids++] = gxact->proc.xid;
+		{
+			xids[nxids] = gxact->proc.xid;
+			in_cache[nxids] = gxact->in_cache;
+			nxids++;
+		}
 	}
 
 	LWLockRelease(TwoPhaseStateLock);
@@ -1435,6 +1667,10 @@ CheckPointTwoPhase(XLogRecPtr redo_horiz
 		TransactionId xid = xids[i];
 		int			fd;
 
+		/* cached state data must exist as a file before it is opened below */
+		if (in_cache[i])
+			FlushTwoPhaseStateFile(xid);
+
 		TwoPhaseFilePath(path, xid);
 
 		fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
--- postgresql-8.4.0.orig/src/backend/utils/misc/guc.c	2009-06-11 23:49:06.000000000 +0900
+++ postgresql-8.4.0/src/backend/utils/misc/guc.c	2009-08-06 10:03:40.000000000 +0900
@@ -1506,6 +1506,14 @@ static struct config_int ConfigureNamesI
 		&max_prepared_xacts,
 		0, 0, INT_MAX / 4, NULL, NULL
 	},
+	{
+		{"state_file_max_space", PGC_POSTMASTER, RESOURCES_MEM,
+			gettext_noop("Sets the maximum space usable by a state file in shared memory."),
+			NULL
+		},
+		&state_file_max_space,
+		0, 0, INT_MAX, NULL, NULL
+	},
 
 #ifdef LOCK_DEBUG
 	{
--- postgresql-8.4.0.orig/src/include/access/twophase.h	2009-01-02 02:23:56.000000000 +0900
+++ postgresql-8.4.0/src/include/access/twophase.h	2009-08-06 10:03:41.000000000 +0900
@@ -26,10 +26,14 @@ typedef struct GlobalTransactionData *Gl
 
 /* GUC variable */
 extern int	max_prepared_xacts;
+extern int	state_file_max_space;
 
 extern Size TwoPhaseShmemSize(void);
 extern void TwoPhaseShmemInit(void);
 
+extern Size StateFileShmemSize(void);
+extern void StateFileShmemInit(void);
+
 extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid);
 
 extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid,
--- postgresql-8.4.0.orig/src/backend/storage/ipc/ipci.c	2009-05-06 04:59:00.000000000 +0900
+++ postgresql-8.4.0/src/backend/storage/ipc/ipci.c	2009-08-06 10:03:40.000000000 +0900
@@ -106,6 +106,7 @@ CreateSharedMemoryAndSemaphores(bool mak
 		size = add_size(size, CLOGShmemSize());
 		size = add_size(size, SUBTRANSShmemSize());
 		size = add_size(size, TwoPhaseShmemSize());
+		size = add_size(size, StateFileShmemSize());
 		size = add_size(size, MultiXactShmemSize());
 		size = add_size(size, LWLockShmemSize());
 		size = add_size(size, ProcArrayShmemSize());
@@ -183,6 +184,7 @@ CreateSharedMemoryAndSemaphores(bool mak
 	CLOGShmemInit();
 	SUBTRANSShmemInit();
 	TwoPhaseShmemInit();
+	StateFileShmemInit();
 	MultiXactShmemInit();
 	InitBufferPool();
 
--- postgresql-8.4.0.orig/src/backend/utils/misc/postgresql.conf.sample	2009-04-23 09:23:45.000000000 +0900
+++ postgresql-8.4.0/src/backend/utils/misc/postgresql.conf.sample	2009-08-06 10:03:40.000000000 +0900
@@ -112,6 +112,11 @@
 # per transaction slot, plus lock space (see max_locks_per_transaction).
 # It is not advisable to set max_prepared_transactions nonzero unless you
 # actively intend to use prepared transactions.
+
+#state_file_max_space = 0		# max bytes of shared memory per state file;
+					# 0 writes all state files to disk; an
+					# average state file needs about 768
+					# (change requires restart)
 #work_mem = 1MB				# min 64kB
 #maintenance_work_mem = 16MB		# min 1MB
 #max_stack_depth = 2MB			# min 100kB