? src/backend/access/transam/readahead.c
Index: src/backend/access/gin/ginxlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/gin/ginxlog.c,v
retrieving revision 1.17
diff -c -r1.17 ginxlog.c
*** src/backend/access/gin/ginxlog.c	20 Jan 2009 18:59:36 -0000	1.17
--- src/backend/access/gin/ginxlog.c	3 Mar 2009 02:47:34 -0000
***************
*** 14,19 ****
--- 14,20 ----
  #include "postgres.h"
  
  #include "access/gin.h"
+ #include "access/xlog.h"
  #include "access/xlogutils.h"
  #include "storage/bufmgr.h"
  #include "utils/memutils.h"
***************
*** 521,526 ****
--- 522,631 ----
  	}
  }
  
+ /*
+  * gin_readahead - enqueue information about data pages
+  *
+  * Registers each page named by the GIN record with the readahead module.
+  * Returns false, adding nothing, when ReadAheadHasRoom() refuses the request.
+  *
+  */
+ bool
+ gin_readahead(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	uint8		info;
+ 
+ 	Assert(record);			/* check before the first dereference */
+ 	info = record->xl_info & ~XLR_INFO_MASK;
+ 	switch (info)
+ 	{
+ 		case XLOG_GIN_CREATE_INDEX:
+ 			{
+ 				RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(*node, GIN_ROOT_BLKNO, lsn.xrecoff, false);
+ 				break;
+ 			}
+ 		case XLOG_GIN_CREATE_PTREE:
+ 			{
+ 				ginxlogCreatePostingTree *data =
+ 					(ginxlogCreatePostingTree *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(data->node, data->blkno, lsn.xrecoff, false);
+ 				break;
+ 			}
+ 		case XLOG_GIN_INSERT:
+ 			{
+ 				ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(data->node, data->blkno,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 		case XLOG_GIN_SPLIT:
+ 			{
+ 				int readahead_cnt;
+ 				ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
+ 
+ 				readahead_cnt = 2;
+ 				if (data->isRootSplit)
+ 					readahead_cnt++;
+ 
+ 				if (!ReadAheadHasRoom(readahead_cnt))
+ 					return false;
+ 
+ 				ReadAheadAddEntry(data->node, data->lblkno, lsn.xrecoff, false);
+ 				ReadAheadAddEntry(data->node, data->rblkno, lsn.xrecoff, false);
+ 				if (data->isRootSplit)
+ 				{
+ 					ReadAheadAddEntry(data->node, data->rootBlkno,
+ 							lsn.xrecoff, false);
+ 				}
+ 				break;
+ 			}
+ 		case XLOG_GIN_VACUUM_PAGE:
+ 			{
+ 				ginxlogVacuumPage *data =
+ 					(ginxlogVacuumPage *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(data->node, data->blkno,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 		case XLOG_GIN_DELETE_PAGE:
+ 			{
+ 				int readahead_cnt;
+ 				ginxlogDeletePage *data =
+ 					(ginxlogDeletePage *) XLogRecGetData(record);
+ 				readahead_cnt = 2;
+ 				if (data->leftBlkno != InvalidBlockNumber)
+ 					readahead_cnt++;
+ 
+ 				if (!ReadAheadHasRoom(readahead_cnt))	/* 2, or 3 with a left sibling */
+ 					return false;
+ 				ReadAheadAddEntry(data->node, data->blkno,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				ReadAheadAddEntry(data->node, data->parentBlkno,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_2);
+ 				if (data->leftBlkno != InvalidBlockNumber)
+ 				{
+ 					ReadAheadAddEntry(data->node, data->leftBlkno,
+ 							lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_3);
+ 				}
+ 				break;
+ 			}
+ 	}
+ 
+ 	return true;
+ }
+ 
  void
  gin_xlog_startup(void)
  {
Index: src/backend/access/gist/gistxlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/gist/gistxlog.c,v
retrieving revision 1.32
diff -c -r1.32 gistxlog.c
*** src/backend/access/gist/gistxlog.c	20 Jan 2009 18:59:36 -0000	1.32
--- src/backend/access/gist/gistxlog.c	3 Mar 2009 02:47:34 -0000
***************
*** 14,19 ****
--- 14,20 ----
  #include "postgres.h"
  
  #include "access/gist_private.h"
+ #include "access/xlog.h"
  #include "access/xlogutils.h"
  #include "miscadmin.h"
  #include "storage/bufmgr.h"
***************
*** 501,506 ****
--- 502,585 ----
  	}
  }
  
+ /*
+  * gist_readahead - enqueue information about data pages
+  *
+  * The readahead module stores information about pages that are modified through
+  * redo-ing record.
+  *
+  */
+ bool
+ gist_readahead(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	uint8		info;
+ 
+ 	Assert(record);			/* check before the first dereference */
+ 	info = record->xl_info & ~XLR_INFO_MASK;
+ 	switch (info)
+ 	{
+ 		case XLOG_GIST_PAGE_UPDATE:
+ 		case XLOG_GIST_NEW_ROOT:
+ 			{
+ 				PageUpdateRecord xlrec;
+ 
+ 				decodePageUpdateRecord(&xlrec, record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec.data->node, xlrec.data->blkno,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 		case XLOG_GIST_PAGE_SPLIT:
+ 			{
+ 				int i;
+ 
+ 				PageSplitRecord rec;
+ 				decodePageSplitRecord(&rec, record);
+ 
+ 				if (!ReadAheadHasRoom(rec.data->npage))
+ 					return false;
+ 				for (i = 0; i < rec.data->npage; i++)
+ 				{
+ 					ReadAheadAddEntry(rec.data->node, rec.page[i].header->blkno,
+ 							lsn.xrecoff, false);
+ 				}
+ 				break;
+ 			}
+ 		case XLOG_GIST_INSERT_COMPLETE:
+ 			{
+ 				/*
+ 				 * This WAL record never touches a data page, so there is
+ 				 * nothing to do.
+ 				 */
+ 				break;
+ 			}
+ 		case XLOG_GIST_CREATE_INDEX:
+ 			{
+ 				RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(*node, GIST_ROOT_BLKNO, lsn.xrecoff, false);
+ 				break;
+ 			}
+ 		case XLOG_GIST_PAGE_DELETE:
+ 			{
+ 				gistxlogPageDelete *xldata =
+ 					(gistxlogPageDelete *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xldata->node, xldata->blkno,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 	}
+ 
+ 	return true;
+ }
+ 
  IndexTuple
  gist_form_invalid_tuple(BlockNumber blkno)
  {
Index: src/backend/access/heap/heapam.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/heap/heapam.c,v
retrieving revision 1.274
diff -c -r1.274 heapam.c
*** src/backend/access/heap/heapam.c	20 Jan 2009 18:59:36 -0000	1.274
--- src/backend/access/heap/heapam.c	3 Mar 2009 02:47:35 -0000
***************
*** 49,54 ****
--- 49,55 ----
  #include "access/valid.h"
  #include "access/visibilitymap.h"
  #include "access/xact.h"
+ #include "access/xlog.h"
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
  #include "catalog/namespace.h"
***************
*** 4975,4980 ****
--- 4976,5127 ----
  }
  
  /*
+  * heap_readahead - enqueue information about data pages
+  *
+  * Registers each heap page named by the record with the readahead module.
+  * Returns false, adding nothing, when ReadAheadHasRoom() refuses the request.
+  *
+  */
+ bool
+ heap_readahead(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	uint8		info;
+ 
+ 	Assert(record);			/* check before the first dereference */
+ 	info = record->xl_info & ~XLR_INFO_MASK;
+ 	switch (info & XLOG_HEAP_OPMASK)
+ 	{
+ 		case XLOG_HEAP_INSERT:
+ 			{
+ 				xl_heap_insert *xlrec =
+ 					(xl_heap_insert *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec->target.node,
+ 						ItemPointerGetBlockNumber(&xlrec->target.tid),
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 		case XLOG_HEAP_DELETE:
+ 			{
+ 				xl_heap_delete *xlrec =
+ 					(xl_heap_delete *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec->target.node,
+ 						ItemPointerGetBlockNumber(&xlrec->target.tid),
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 		case XLOG_HEAP_UPDATE:
+ 		case XLOG_HEAP_MOVE:
+ 		case XLOG_HEAP_HOT_UPDATE:
+ 			{
+ 				bool samepage;
+ 				xl_heap_update *xlrec =
+ 					(xl_heap_update *) XLogRecGetData(record);
+ 
+ 				samepage = ItemPointerGetBlockNumber(&xlrec->newtid) ==
+ 					ItemPointerGetBlockNumber(&xlrec->target.tid);
+ 
+ 				if (!ReadAheadHasRoom(1 + (samepage ? 0 : 1)))
+ 					return false;
+ 				/* store page which contains updated tuple. */
+ 				ReadAheadAddEntry(xlrec->target.node,
+ 						ItemPointerGetBlockNumber(&xlrec->target.tid),
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				/* store another page if any. */
+ 				if (!samepage)
+ 					ReadAheadAddEntry(xlrec->target.node,
+ 							ItemPointerGetBlockNumber(&xlrec->newtid),
+ 							lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_2);
+ 				break;
+ 			}
+ 		case XLOG_HEAP_NEWPAGE:
+ 			{
+ 				xl_heap_newpage *xlrec =
+ 					(xl_heap_newpage *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec->node, xlrec->blkno,
+ 						lsn.xrecoff, false);
+ 				break;
+ 			}
+ 		case XLOG_HEAP_LOCK:
+ 			{
+ 				xl_heap_lock *xlrec =
+ 					(xl_heap_lock *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec->target.node,
+ 						ItemPointerGetBlockNumber(&xlrec->target.tid),
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 		case XLOG_HEAP_INPLACE:
+ 			{
+ 				xl_heap_inplace *xlrec =
+ 					(xl_heap_inplace *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec->target.node,
+ 						ItemPointerGetBlockNumber(&xlrec->target.tid),
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 	}
+ 
+ 	return true;
+ }
+ 
+ /*
+  * heap2_readahead - enqueue information about data pages
+  *
+  * The readahead module stores information about pages that are modified through
+  * redo-ing record.
+  *
+  */
+ bool
+ heap2_readahead(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	Assert(record);
+ 	/* strip the flag bits from xl_info the same way heap_readahead does */
+ 	switch ((record->xl_info & ~XLR_INFO_MASK) & XLOG_HEAP_OPMASK)
+ 	{
+ 		case XLOG_HEAP2_FREEZE:
+ 			{
+ 				xl_heap_freeze *xlrec =
+ 					(xl_heap_freeze *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec->node, xlrec->block,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 		case XLOG_HEAP2_CLEAN:
+ 		case XLOG_HEAP2_CLEAN_MOVE:
+ 			{
+ 				xl_heap_clean *xlrec =
+ 					(xl_heap_clean *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec->node, xlrec->block,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 	}
+ 
+ 	return true;
+ }
+ 
+ /*
   * heap_sync - sync a heap, for use when no WAL has been written
   *
   * This forces the heap contents (including TOAST heap if any) down to disk.
Index: src/backend/access/nbtree/nbtxlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/nbtree/nbtxlog.c,v
retrieving revision 1.54
diff -c -r1.54 nbtxlog.c
*** src/backend/access/nbtree/nbtxlog.c	20 Jan 2009 18:59:37 -0000	1.54
--- src/backend/access/nbtree/nbtxlog.c	3 Mar 2009 02:47:35 -0000
***************
*** 16,21 ****
--- 16,22 ----
  
  #include "access/nbtree.h"
  #include "access/transam.h"
+ #include "access/xlog.h"
  #include "storage/bufmgr.h"
  
  /*
***************
*** 880,885 ****
--- 881,1023 ----
  	}
  }
  
+ /*
+  * btree_readahead - enqueue information about data pages
+  *
+  * Registers each page named by the btree record with the readahead module.
+  * Returns false, adding nothing, when ReadAheadHasRoom() refuses the request.
+  *
+  */
+ bool
+ btree_readahead(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	uint8		info;
+ 
+ 	Assert(record);			/* check before the first dereference */
+ 	info = record->xl_info & ~XLR_INFO_MASK;
+ 	switch (info)
+ 	{
+ 		case XLOG_BTREE_INSERT_LEAF:
+ 		case XLOG_BTREE_INSERT_UPPER:
+ 		case XLOG_BTREE_INSERT_META:
+ 			{
+ 				int readahead_cnt;
+ 				xl_btree_insert *xlrec =
+ 					(xl_btree_insert *) XLogRecGetData(record);
+ 
+ 				readahead_cnt = 1;
+ 				if (info == XLOG_BTREE_INSERT_META)
+ 					readahead_cnt++;
+ 
+ 				if (!ReadAheadHasRoom(readahead_cnt))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec->target.node,
+ 						BlockIdGetBlockNumber(&xlrec->target.tid.ip_blkid),
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				if (info == XLOG_BTREE_INSERT_META)
+ 				{
+ 					ReadAheadAddEntry(xlrec->target.node,
+ 							BTREE_METAPAGE, lsn.xrecoff, false);
+ 				}
+ 				break;
+ 			}
+ 		case XLOG_BTREE_SPLIT_L:
+ 		case XLOG_BTREE_SPLIT_L_ROOT:
+ 		case XLOG_BTREE_SPLIT_R:
+ 		case XLOG_BTREE_SPLIT_R_ROOT:
+ 			{
+ 				int readahead_cnt;
+ 				xl_btree_split *xlrec =
+ 					(xl_btree_split *) XLogRecGetData(record);
+ 
+ 				readahead_cnt = 2;
+ 				if (xlrec->rnext != P_NONE)
+ 					readahead_cnt++;
+ 
+ 				if (!ReadAheadHasRoom(readahead_cnt))
+ 					return false;
+ 
+ 				ReadAheadAddEntry(xlrec->node, xlrec->rightsib,
+ 						lsn.xrecoff, false);
+ 				ReadAheadAddEntry(xlrec->node, xlrec->leftsib,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				if (xlrec->rnext != P_NONE)
+ 				{
+ 					ReadAheadAddEntry(xlrec->node, xlrec->rnext,
+ 							lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_2);
+ 				}
+ 				break;
+ 			}
+ 		case XLOG_BTREE_DELETE:
+ 			{
+ 				xl_btree_delete *xlrec =
+ 					(xl_btree_delete *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec->node, xlrec->block,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				break;
+ 			}
+ 		case XLOG_BTREE_DELETE_PAGE:
+ 		case XLOG_BTREE_DELETE_PAGE_META:
+ 		case XLOG_BTREE_DELETE_PAGE_HALF:
+ 			{
+ 				int readahead_cnt;
+ 				xl_btree_delete_page *xlrec =
+ 					(xl_btree_delete_page *) XLogRecGetData(record);
+ 
+ 				readahead_cnt = 3;
+ 				if (info == XLOG_BTREE_DELETE_PAGE_META)
+ 					readahead_cnt++;
+ 				if (xlrec->leftblk != P_NONE)
+ 					readahead_cnt++;
+ 
+ 				/* All entries must fit before any of them is queued. */
+ 				if (!ReadAheadHasRoom(readahead_cnt))
+ 					return false;
+ 
+ 				/* parent page */
+ 				ReadAheadAddEntry(xlrec->target.node,
+ 						ItemPointerGetBlockNumber(&(xlrec->target.tid)),
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_1);
+ 				/* rightsib page */
+ 				ReadAheadAddEntry(xlrec->target.node, xlrec->rightblk,
+ 						lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_2);
+ 				/* leftsib page, if exists */
+ 				if (xlrec->leftblk != P_NONE)
+ 				{
+ 					ReadAheadAddEntry(xlrec->target.node, xlrec->leftblk,
+ 							lsn.xrecoff, record->xl_info & XLR_BKP_BLOCK_3);
+ 				}
+ 				/* target page */
+ 				ReadAheadAddEntry(xlrec->target.node,
+ 						xlrec->deadblk, lsn.xrecoff, false);
+ 				/* metapage, if exists */
+ 				if (info == XLOG_BTREE_DELETE_PAGE_META)
+ 				{
+ 					ReadAheadAddEntry(xlrec->target.node,
+ 							BTREE_METAPAGE, lsn.xrecoff, false);
+ 				}
+ 				break;
+ 			}
+ 		case XLOG_BTREE_NEWROOT:
+ 			{
+ 				xl_btree_newroot *xlrec =
+ 					(xl_btree_newroot *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				/* This entry is queued without a full-page image. */
+ 				ReadAheadAddEntry(xlrec->node, xlrec->rootblk,
+ 						lsn.xrecoff, false);
+ 				break;
+ 			}
+ 	}
+ 
+ 	return true;
+ }
+ 
  void
  btree_xlog_startup(void)
  {
Index: src/backend/access/transam/Makefile
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/Makefile,v
retrieving revision 1.22
diff -c -r1.22 Makefile
*** src/backend/access/transam/Makefile	19 Feb 2008 10:30:07 -0000	1.22
--- src/backend/access/transam/Makefile	3 Mar 2009 02:47:35 -0000
***************
*** 12,18 ****
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o twophase.o twophase_rmgr.o
  
  include $(top_srcdir)/src/backend/common.mk
  
--- 12,18 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o twophase.o twophase_rmgr.o readahead.o
  
  include $(top_srcdir)/src/backend/common.mk
  
Index: src/backend/access/transam/rmgr.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/rmgr.c,v
retrieving revision 1.27
diff -c -r1.27 rmgr.c
*** src/backend/access/transam/rmgr.c	19 Nov 2008 10:34:50 -0000	1.27
--- src/backend/access/transam/rmgr.c	3 Mar 2009 02:47:35 -0000
***************
*** 24,43 ****
  
  
  const RmgrData RmgrTable[RM_MAX_ID + 1] = {
! 	{"XLOG", xlog_redo, xlog_desc, NULL, NULL, NULL},
! 	{"Transaction", xact_redo, xact_desc, NULL, NULL, NULL},
! 	{"Storage", smgr_redo, smgr_desc, NULL, NULL, NULL},
! 	{"CLOG", clog_redo, clog_desc, NULL, NULL, NULL},
! 	{"Database", dbase_redo, dbase_desc, NULL, NULL, NULL},
! 	{"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},
! 	{"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},
! 	{"Reserved 7", NULL, NULL, NULL, NULL, NULL},
! 	{"Reserved 8", NULL, NULL, NULL, NULL, NULL},
! 	{"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},
! 	{"Heap", heap_redo, heap_desc, NULL, NULL, NULL},
! 	{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
! 	{"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
! 	{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
! 	{"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint},
! 	{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}
  };
--- 24,43 ----
  
  
  const RmgrData RmgrTable[RM_MAX_ID + 1] = {
! 	{"XLOG", xlog_redo, xlog_desc, NULL, NULL, NULL, NULL},
! 	{"Transaction", xact_redo, xact_desc, NULL, NULL, NULL, NULL},
! 	{"Storage", smgr_redo, smgr_desc, NULL, NULL, NULL, NULL},
! 	{"CLOG", clog_redo, clog_desc, NULL, NULL, NULL, NULL},
! 	{"Database", dbase_redo, dbase_desc, NULL, NULL, NULL, NULL},
! 	{"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL, NULL},
! 	{"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL, NULL},
! 	{"Reserved 7", NULL, NULL, NULL, NULL, NULL, NULL},
! 	{"Reserved 8", NULL, NULL, NULL, NULL, NULL, NULL},
! 	{"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL, heap2_readahead},
! 	{"Heap", heap_redo, heap_desc, NULL, NULL, NULL, heap_readahead},
! 	{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint, btree_readahead},
! 	{"Hash", hash_redo, hash_desc, NULL, NULL, NULL, NULL},
! 	{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint, gin_readahead},
! 	{"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint, gist_readahead},
! 	{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL, seq_readahead}
  };
Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.332
diff -c -r1.332 xlog.c
*** src/backend/access/transam/xlog.c	23 Feb 2009 09:28:49 -0000	1.332
--- src/backend/access/transam/xlog.c	3 Mar 2009 02:47:36 -0000
***************
*** 416,421 ****
--- 416,433 ----
  static char *readRecordBuf = NULL;
  static uint32 readRecordBufSize = 0;
  
+ /*
+  * Buffer for queued WAL records (fixed size)
+  *
+  * This buffer holds WAL records and their LSNs.  Once all WAL records of one
+  * WAL segment file have been read, they are redone and the buffer is emptied.
+  * Therefore twice XLogSegSize, chosen from the total size of the WAL
+  * records and their LSNs, must be enough for the buffer.
+  */
+ #define RECORD_QUEUE_BUF_SIZE (XLogSegSize * 2)
+ static char *RecordQueueBuf = NULL;
+ static uint32 RecordQueueBufUsed = 0;
+ 
  /* State information for XLOG reading */
  static XLogRecPtr ReadRecPtr;	/* start of last record read */
  static XLogRecPtr EndRecPtr;	/* end+1 of last record read */
***************
*** 489,494 ****
--- 501,509 ----
  static void rm_redo_error_callback(void *arg);
  static int	get_sync_bit(int method);
  
+ static void PushRecord(XLogRecPtr lsn, XLogRecord *record);
+ static void PushReadAhead(XLogRecPtr lsn, XLogRecord *record);
+ static void RedoRecords(void);
  
  /*
   * Insert an XLOG record having the specified RMID and info bytes,
***************
*** 2491,2496 ****
--- 2506,2513 ----
  	ListCell   *cell;
  	int			fd;
  
+ 	ereport(DEBUG1, (errmsg("XLOG switch to %X/%X", log, seg)));
+ 
  	/*
  	 * Loop looking for a suitable timeline ID: we might need to read any of
  	 * the timelines listed in expectedTLIs.
***************
*** 2512,2517 ****
--- 2529,2541 ----
  
  		if (InArchiveRecovery)
  		{
+ 			/*
+ 			 * Waiting for the next WAL segment file might take a long time.
+ 			 * Therefore, redo the stored WAL records and LSNs here first.
+ 			 */
+ 			ereport(DEBUG1, (errmsg("XLOG will be switched")));
+ 			RedoRecords();
+ 
  			/* Report recovery progress in PS display */
  			snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
  					 xlogfname);
***************
*** 3576,3581 ****
--- 3600,3612 ----
  	return (XLogRecord *) buffer;
  
  next_record_is_invalid:;
+ 	/*
+ 	 * Reached the unused area of the current WAL segment file; redo all
+ 	 * WAL records in the queue.
+ 	 */
+ 	ereport(DEBUG1, (errmsg("next record is invalid(maybe unused area)")));
+ 	RedoRecords();
+ 
  	if (readFile >= 0)
  	{
  		close(readFile);
***************
*** 5077,5082 ****
--- 5108,5124 ----
  	ValidateXLOGDirectoryStructure();
  
  	/*
+ 	 * To postpone the actual redo, store WAL records and EndRecPtrs.
+ 	 * Therefore, this buffer must be allocated here because the buffer
+ 	 * will be used by RedoRecords(); ReadRecord() may call RedoRecords().
+ 	 */
+ 	RecordQueueBuf = (char *) malloc(RECORD_QUEUE_BUF_SIZE);
+ 	if (RecordQueueBuf == NULL)	/* must also hold in non-assert builds */
+ 		elog(FATAL, "out of memory");
+ 	/* Allocate the buffer for storing information about data pages. */
+ 	ReadAheadInit();
+ 
+ 	/*
  	 * Initialize on the assumption we want to recover to the same timeline
  	 * that's active according to pg_control.
  	 */
***************
*** 5303,5309 ****
  		bool		recoveryContinue = true;
  		bool		recoveryApply = true;
  		bool		reachedMinRecoveryPoint = false;
- 		ErrorContextCallback errcontext;
  
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
--- 5345,5350 ----
***************
*** 5397,5416 ****
  				break;
  			}
  
- 			/* Setup error traceback support for ereport() */
- 			errcontext.callback = rm_redo_error_callback;
- 			errcontext.arg = (void *) record;
- 			errcontext.previous = error_context_stack;
- 			error_context_stack = &errcontext;
- 
- 			/* nextXid must be beyond record's xid */
- 			if (TransactionIdFollowsOrEquals(record->xl_xid,
- 											 ShmemVariableCache->nextXid))
- 			{
- 				ShmemVariableCache->nextXid = record->xl_xid;
- 				TransactionIdAdvance(ShmemVariableCache->nextXid);
- 			}
- 
  			/*
  			 * Update shared replayEndRecPtr before replaying this
  			 * record, so that XLogFlush will update minRecoveryPoint
--- 5438,5443 ----
***************
*** 5420,5435 ****
  			xlogctl->replayEndRecPtr = EndRecPtr;
  			SpinLockRelease(&xlogctl->info_lck);
  
! 			RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
  
! 			/* Pop the error context stack */
! 			error_context_stack = errcontext.previous;
  
  			LastRec = ReadRecPtr;
  
  			record = ReadRecord(NULL, LOG);
  		} while (record != NULL && recoveryContinue);
  
  		/*
  		 * end of main redo apply loop
  		 */
--- 5447,5480 ----
  			xlogctl->replayEndRecPtr = EndRecPtr;
  			SpinLockRelease(&xlogctl->info_lck);
  
! 			/*
! 			 * Push WAL record in WAL record buffer with its LSN for
! 			 * delayed redo.
! 			 * If the WAL record queue is full, redo all WAL records in the
! 			 * queue and make the queue empty.
! 			 */
! 			ereport(DEBUG1,
! 				(errmsg("WAL record queue is used %u(%u) bytes at %X/%08X.",
! 					RecordQueueBufUsed, record->xl_tot_len,
! 					EndRecPtr.xlogid, EndRecPtr.xrecoff)));
! 			PushRecord(EndRecPtr, record);
  
! 			/*
! 			 * Push page information to prefetch later.
! 			 * If no more space, redo all records in queue and make the
! 			 * queue empty.
! 			 */
! 			PushReadAhead(EndRecPtr, record);
  
  			LastRec = ReadRecPtr;
  
  			record = ReadRecord(NULL, LOG);
  		} while (record != NULL && recoveryContinue);
  
+ 		/* All WAL records are read, redo all queued WAL records. */
+ 		ereport(DEBUG1, (errmsg("end of redo apply loop")));
+ 		RedoRecords();
+ 
  		/*
  		 * end of main redo apply loop
  		 */
***************
*** 5693,5698 ****
--- 5738,5848 ----
  
  		return LocalRecoveryInProgress;
  	}
+ 	/*
+ 	 * NOTE(review): the context directly above ends with a return statement,
+ 	 * so this cleanup looks unreachable where the hunk lands -- confirm the
+ 	 * intended placement.
+ 	 */
+ 	if (RecordQueueBuf)
+ 	{
+ 		free(RecordQueueBuf);
+ 		RecordQueueBuf = NULL;	/* do not leave a dangling pointer */
+ 		RecordQueueBufUsed = 0;
+ 		ReadAheadFinish();
+ 	}
+ }
+ 
+ /*
+  * Push the pair of WAL record and its LSN.
+  * Both are stored at MAXALIGN'ed offsets in RecordQueueBuf.  If the pair
+  * does not fit, RedoRecords() drains the queue first.
+  */
+ static void
+ PushRecord(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	Assert(record);
+ 
+ 	if (RecordQueueBufUsed + MAXALIGN(sizeof(XLogRecPtr)) +
+ 		MAXALIGN(record->xl_tot_len) > RECORD_QUEUE_BUF_SIZE)
+ 	{
+ 		ereport(DEBUG1, (errmsg("WAL record queue is full.")));
+ 		RedoRecords();
+ 	}
+ 
+ 	memcpy(RecordQueueBuf + RecordQueueBufUsed, &lsn, sizeof(XLogRecPtr));
+ 	RecordQueueBufUsed += MAXALIGN(sizeof(XLogRecPtr));
+ 	memcpy(RecordQueueBuf + RecordQueueBufUsed, record, record->xl_tot_len);
+ 	RecordQueueBufUsed += MAXALIGN(record->xl_tot_len);
+ }
+ 
+ /*
+  * Push page information to readahead module.
+  *
+  * Resource managers without an rm_readahead callback are skipped.  While
+  * the callback returns false, RedoRecords() is run and the call retried.
+  */
+ static void
+ PushReadAhead(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	Assert(record);
+ 
+ 	if (!RmgrTable[record->xl_rmid].rm_readahead)
+ 		return;
+ 
+ 	while (!RmgrTable[record->xl_rmid].rm_readahead(lsn, record))
+ 	{
+ 		ereport(DEBUG1, (errmsg("ReadAhead queue is full.")));
+ 		RedoRecords();
+ 	}
+ }
+ 
+ /*
+  * Redo all WAL records in the queue and make it empty.
+  */
+ static void
+ RedoRecords(void)
+ {
+ 	ErrorContextCallback errcontext;
+ 	uint32 off = 0;
+ 
+ 	/* Readahead data pages which will be modified during redo. */
+ 	ReadAheadExecute();
+ 
+ 	while (off < RecordQueueBufUsed)
+ 	{
+ 		XLogRecPtr lsn;
+ 		XLogRecord *record;
+ 
+ 		/* Extract LSN and WAL record image from local buffer. */
+ 		memcpy(&lsn, RecordQueueBuf + off, sizeof(XLogRecPtr));
+ 		off += MAXALIGN(sizeof(XLogRecPtr));
+ 		record = (XLogRecord *)(RecordQueueBuf + off);
+ 
+ 		/* Setup error traceback support for ereport() */
+ 		errcontext.callback = rm_redo_error_callback;
+ 		errcontext.arg = (void *) record;
+ 		errcontext.previous = error_context_stack;
+ 		error_context_stack = &errcontext;
+ 
+ 		/* nextXid must be beyond record's xid */
+ 		if (TransactionIdFollowsOrEquals(record->xl_xid,
+ 										 ShmemVariableCache->nextXid))
+ 		{
+ 			ShmemVariableCache->nextXid = record->xl_xid;
+ 			TransactionIdAdvance(ShmemVariableCache->nextXid);
+ 		}
+ 
+ 		/* Redo with WAL record and its LSN. */
+ 		RmgrTable[record->xl_rmid].rm_redo(lsn, record);
+ 
+ 		/* Pop the error context stack */
+ 		error_context_stack = errcontext.previous;
+ 
+ 		off += MAXALIGN(record->xl_tot_len);
+ 	}
+ 
+ 	/* Make RecordQueueBuf empty; sizeof(pointer) cleared only a few bytes. */
+ 	MemSet(RecordQueueBuf, 0, RecordQueueBufUsed);
+ 	RecordQueueBufUsed = 0;
  }
  
  /*
Index: src/backend/commands/sequence.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/commands/sequence.c,v
retrieving revision 1.158
diff -c -r1.158 sequence.c
*** src/backend/commands/sequence.c	2 Feb 2009 19:31:38 -0000	1.158
--- src/backend/commands/sequence.c	3 Mar 2009 02:47:37 -0000
***************
*** 17,22 ****
--- 17,23 ----
  #include "access/heapam.h"
  #include "access/transam.h"
  #include "access/xact.h"
+ #include "access/xlog.h"
  #include "access/xlogutils.h"
  #include "catalog/dependency.h"
  #include "catalog/namespace.h"
***************
*** 1385,1387 ****
--- 1386,1418 ----
  	appendStringInfo(buf, "rel %u/%u/%u",
  			   xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode);
  }
+ 
+ /*
+  * seq_readahead - enqueue information about data pages
+  *
+  * Registers block 0 of the sequence named by an XLOG_SEQ_LOG record.
+  * Returns false, adding nothing, when ReadAheadHasRoom() refuses the request.
+  *
+  */
+ bool
+ seq_readahead(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	uint8		info;
+ 
+ 	Assert(record);			/* check before the first dereference */
+ 	info = record->xl_info & ~XLR_INFO_MASK;
+ 	switch (info)
+ 	{
+ 		case XLOG_SEQ_LOG:
+ 			{
+ 				xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
+ 
+ 				if (!ReadAheadHasRoom(1))
+ 					return false;
+ 				ReadAheadAddEntry(xlrec->node, 0, lsn.xrecoff, false);
+ 				break;
+ 			}
+ 	}
+ 
+ 	return true;
+ }
Index: src/backend/storage/smgr/md.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/storage/smgr/md.c,v
retrieving revision 1.144
diff -c -r1.144 md.c
*** src/backend/storage/smgr/md.c	12 Jan 2009 05:10:44 -0000	1.144
--- src/backend/storage/smgr/md.c	3 Mar 2009 02:47:47 -0000
***************
*** 560,566 ****
  	off_t		seekpos;
  	MdfdVec    *v;
  
! 	v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL);
  
  	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
  	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
--- 560,568 ----
  	off_t		seekpos;
  	MdfdVec    *v;
  
! 	v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_RETURN_NULL);
! 	if (!v)
! 		return;
  
  	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
  	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
Index: src/include/access/gin.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/gin.h,v
retrieving revision 1.28
diff -c -r1.28 gin.h
*** src/include/access/gin.h	10 Jan 2009 21:08:36 -0000	1.28
--- src/include/access/gin.h	3 Mar 2009 02:48:03 -0000
***************
*** 256,261 ****
--- 256,262 ----
  /* ginxlog.c */
  extern void gin_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void gin_desc(StringInfo buf, uint8 xl_info, char *rec);
+ extern bool gin_readahead(XLogRecPtr lsn, XLogRecord *record);
  extern void gin_xlog_startup(void);
  extern void gin_xlog_cleanup(void);
  extern bool gin_safe_restartpoint(void);
Index: src/include/access/gist_private.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/gist_private.h,v
retrieving revision 1.36
diff -c -r1.36 gist_private.h
*** src/include/access/gist_private.h	1 Jan 2009 17:23:55 -0000	1.36
--- src/include/access/gist_private.h	3 Mar 2009 02:48:03 -0000
***************
*** 250,255 ****
--- 250,256 ----
  /* gistxlog.c */
  extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec);
+ extern bool gist_readahead(XLogRecPtr lsn, XLogRecord *record);
  extern void gist_xlog_startup(void);
  extern void gist_xlog_cleanup(void);
  extern bool gist_safe_restartpoint(void);
Index: src/include/access/heapam.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/heapam.h,v
retrieving revision 1.141
diff -c -r1.141 heapam.h
*** src/include/access/heapam.h	1 Jan 2009 17:23:56 -0000	1.141
--- src/include/access/heapam.h	3 Mar 2009 02:48:03 -0000
***************
*** 124,131 ****
--- 124,133 ----
  
  extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr);
  extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec);
+ extern bool heap_readahead(XLogRecPtr lsn, XLogRecord *rptr);
  extern void heap2_redo(XLogRecPtr lsn, XLogRecord *rptr);
  extern void heap2_desc(StringInfo buf, uint8 xl_info, char *rec);
+ extern bool heap2_readahead(XLogRecPtr lsn, XLogRecord *rptr);
  
  extern XLogRecPtr log_heap_move(Relation reln, Buffer oldbuf,
  			  ItemPointerData from,
Index: src/include/access/nbtree.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/nbtree.h,v
retrieving revision 1.123
diff -c -r1.123 nbtree.h
*** src/include/access/nbtree.h	1 Jan 2009 17:23:56 -0000	1.123
--- src/include/access/nbtree.h	3 Mar 2009 02:48:03 -0000
***************
*** 591,596 ****
--- 591,597 ----
   */
  extern void btree_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void btree_desc(StringInfo buf, uint8 xl_info, char *rec);
+ extern bool btree_readahead(XLogRecPtr lsn, XLogRecord *rptr);
  extern void btree_xlog_startup(void);
  extern void btree_xlog_cleanup(void);
  extern bool btree_safe_restartpoint(void);
Index: src/include/access/xlog.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/xlog.h,v
retrieving revision 1.91
diff -c -r1.91 xlog.h
*** src/include/access/xlog.h	18 Feb 2009 15:58:41 -0000	1.91
--- src/include/access/xlog.h	3 Mar 2009 02:48:03 -0000
***************
*** 14,20 ****
--- 14,23 ----
  #include "access/rmgr.h"
  #include "access/xlogdefs.h"
  #include "lib/stringinfo.h"
+ #include "postgres.h"
  #include "storage/buf.h"
+ #include "storage/relfilenode.h"
+ #include "storage/block.h"
  #include "utils/pg_crc.h"
  #include "utils/timestamp.h"
  
***************
*** 198,203 ****
--- 201,207 ----
  
  extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);
+ extern bool xlog_readahead(XLogRecPtr lsn, XLogRecord *rptr);
  
  extern bool RecoveryInProgress(void);
  
***************
*** 217,220 ****
--- 221,232 ----
  
  extern void StartupProcessMain(void);
  
+ /* Implemented in readahead.c. */
+ extern void ReadAheadInit(void);
+ extern void ReadAheadAddEntry(RelFileNode node, BlockNumber blkno,
+ 		uint32 xrecoff, bool has_fpw);
+ extern bool ReadAheadHasRoom(int num);
+ extern void ReadAheadExecute(void);
+ extern void ReadAheadFinish(void);
+ 
  #endif   /* XLOG_H */
Index: src/include/access/xlog_internal.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/xlog_internal.h,v
retrieving revision 1.25
diff -c -r1.25 xlog_internal.h
*** src/include/access/xlog_internal.h	1 Jan 2009 17:23:56 -0000	1.25
--- src/include/access/xlog_internal.h	3 Mar 2009 02:48:03 -0000
***************
*** 235,240 ****
--- 235,241 ----
  	void		(*rm_startup) (void);
  	void		(*rm_cleanup) (void);
  	bool		(*rm_safe_restartpoint) (void);
+ 	bool		(*rm_readahead) (XLogRecPtr lsn, XLogRecord *rptr);
  } RmgrData;
  
  extern const RmgrData RmgrTable[];
Index: src/include/commands/sequence.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/commands/sequence.h,v
retrieving revision 1.42
diff -c -r1.42 sequence.h
*** src/include/commands/sequence.h	1 Jan 2009 17:23:58 -0000	1.42
--- src/include/commands/sequence.h	3 Mar 2009 02:48:03 -0000
***************
*** 98,102 ****
--- 98,103 ----
  
  extern void seq_redo(XLogRecPtr lsn, XLogRecord *rptr);
  extern void seq_desc(StringInfo buf, uint8 xl_info, char *rec);
+ extern bool seq_readahead(XLogRecPtr lsn, XLogRecord *record);
  
  #endif   /* SEQUENCE_H */