Index: src/backend/access/heap/pruneheap.c
===================================================================
RCS file: /repositories/postgreshome/cvs/pgsql/src/backend/access/heap/pruneheap.c,v
retrieving revision 1.16
diff -c -p -r1.16 pruneheap.c
*** src/backend/access/heap/pruneheap.c	13 Jul 2008 20:45:47 -0000	1.16
--- src/backend/access/heap/pruneheap.c	8 Dec 2008 13:24:53 -0000
***************
*** 17,22 ****
--- 17,23 ----
  #include "access/heapam.h"
  #include "access/htup.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "miscadmin.h"
  #include "pgstat.h"
  #include "storage/bufmgr.h"
*************** typedef struct
*** 39,44 ****
--- 40,50 ----
  	OffsetNumber nowunused[MaxHeapTuplesPerPage];
  	/* marked[i] is TRUE if item i is entered in one of the above arrays */
  	bool		marked[MaxHeapTuplesPerPage + 1];
+ 	/*
+ 	 * all_visible is TRUE if all tuples in the page are visible to all
+ 	 * transactions and there are no DEAD line pointers in the page.
+ 	 */
+ 	bool		all_visible;
  } PruneState;
  
  /* Local functions */
*************** heap_page_prune_opt(Relation relation, B
*** 118,125 ****
  			(void) heap_page_prune(relation, buffer, OldestXmin, false, true);
  		}
  
! 		/* And release buffer lock */
! 		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  	}
  }
  
--- 124,150 ----
  			(void) heap_page_prune(relation, buffer, OldestXmin, false, true);
  		}
  
! 		/*
! 		 * Pinning the visibility map page may require I/O, so release the
! 		 * buffer lock first and re-take it before setting the map bit.
! 		 */
! 		if (PageIsAllVisible(page))
! 		{
! 			Buffer vmbuffer = InvalidBuffer;
! 			/* Release buffer lock */
! 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
! 
! 			visibilitymap_pin(relation, BufferGetBlockNumber(buffer), &vmbuffer);
! 			LockBuffer(buffer, BUFFER_LOCK_SHARE);
! 			if (PageIsAllVisible(page))
! 				visibilitymap_set(relation, BufferGetBlockNumber(buffer),
! 								  PageGetLSN(page), &vmbuffer);
! 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
! 			if (BufferIsValid(vmbuffer))
! 				ReleaseBuffer(vmbuffer);
! 		}
! 		else
! 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  	}
  }
  
*************** heap_page_prune(Relation relation, Buffe
*** 178,183 ****
--- 203,209 ----
  	prstate.new_prune_xid = InvalidTransactionId;
  	prstate.nredirected = prstate.ndead = prstate.nunused = 0;
  	memset(prstate.marked, 0, sizeof(prstate.marked));
+ 	prstate.all_visible = true;
  
  	/* Scan the page */
  	maxoff = PageGetMaxOffsetNumber(page);
*************** heap_page_prune(Relation relation, Buffe
*** 193,200 ****
  
  		/* Nothing to do if slot is empty or already dead */
  		itemid = PageGetItemId(page, offnum);
! 		if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid))
  			continue;
  
  		/* Process this item or chain of items */
  		ndeleted += heap_prune_chain(relation, buffer, offnum,
--- 219,232 ----
  
  		/* Nothing to do if slot is empty or already dead */
  		itemid = PageGetItemId(page, offnum);
! 		if (!ItemIdIsUsed(itemid))
! 		   continue;
! 		if (ItemIdIsDead(itemid))
! 		{
! 			/* VACUUM is required to reclaim DEAD line pointers */
! 			prstate.all_visible = false;
  			continue;
+ 		}
  
  		/* Process this item or chain of items */
  		ndeleted += heap_prune_chain(relation, buffer, offnum,
*************** heap_page_prune(Relation relation, Buffe
*** 245,250 ****
--- 277,291 ----
  		 */
  		PageClearFull(page);
  
+ 		/* Update the all-visible flag on the page */
+ 		if (!PageIsAllVisible(page) && prstate.all_visible)
+ 			PageSetAllVisible(page);
+ 		else if (PageIsAllVisible(page) && !prstate.all_visible)
+ 		{
+ 			elog(WARNING, "PD_ALL_VISIBLE flag was incorrectly set");
+ 			PageClearAllVisible(page);
+ 		}
+ 
  		MarkBufferDirty(buffer);
  
  		/*
*************** heap_page_prune(Relation relation, Buffe
*** 282,287 ****
--- 323,341 ----
  			PageClearFull(page);
  			SetBufferCommitInfoNeedsSave(buffer);
  		}
+ 
+ 		/* Update the all-visible flag on the page */
+ 		if (!PageIsAllVisible(page) && prstate.all_visible)
+ 		{
+ 			PageSetAllVisible(page);
+ 			SetBufferCommitInfoNeedsSave(buffer);
+ 		}
+ 		else if (PageIsAllVisible(page) && !prstate.all_visible)
+ 		{
+ 			elog(WARNING, "PD_ALL_VISIBLE flag was incorrectly set");
+ 			PageClearAllVisible(page);
+ 			SetBufferCommitInfoNeedsSave(buffer);
+ 		}
  	}
  
  	END_CRIT_SECTION();
*************** heap_page_prune(Relation relation, Buffe
*** 312,318 ****
  	 * In any case, the current FSM implementation doesn't accept
  	 * one-page-at-a-time updates, so this is all academic for now.
  	 */
- 
  	return ndeleted;
  }
  
--- 366,371 ----
*************** heap_prune_chain(Relation relation, Buff
*** 448,454 ****
--- 501,510 ----
  		 * function.)
  		 */
  		if (ItemIdIsDead(lp))
+ 		{
+ 			prstate->all_visible = false;
  			break;
+ 		}
  
  		Assert(ItemIdIsNormal(lp));
  		htup = (HeapTupleHeader) PageGetItem(dp, lp);
*************** heap_prune_chain(Relation relation, Buff
*** 474,479 ****
--- 530,540 ----
  		{
  			case HEAPTUPLE_DEAD:
  				tupdead = true;
+ 				/*
+ 				 * This DEAD tuple will certainly be removed by the prune
+ 				 * operation, so it cannot change the all_visible state of
+ 				 * the page.
+ 				 */
  				break;
  
  			case HEAPTUPLE_RECENTLY_DEAD:
*************** heap_prune_chain(Relation relation, Buff
*** 485,490 ****
--- 546,552 ----
  				 */
  				heap_prune_record_prunable(prstate,
  										   HeapTupleHeaderGetXmax(htup));
+ 				prstate->all_visible = false;
  				break;
  
  			case HEAPTUPLE_DELETE_IN_PROGRESS:
*************** heap_prune_chain(Relation relation, Buff
*** 495,503 ****
--- 557,596 ----
  				 */
  				heap_prune_record_prunable(prstate,
  										   HeapTupleHeaderGetXmax(htup));
+ 				prstate->all_visible = false;
  				break;
  
  			case HEAPTUPLE_LIVE:
+ 				/*
+ 				 * Is the tuple definitely visible to all transactions?
+ 				 *
+ 				 * NB: As with per-tuple hint bits, we can't set the
+ 				 * PD_ALL_VISIBLE flag if the inserter committed
+ 				 * asynchronously (see SetHintBits for more info). For
+ 				 * that reason, require that the HEAP_XMIN_COMMITTED hint
+ 				 * bit is already set.
+ 				 */
+ 				if (prstate->all_visible)
+ 				{
+ 					TransactionId xmin;
+ 
+ 					if (!(htup->t_infomask & HEAP_XMIN_COMMITTED))
+ 					{
+ 						prstate->all_visible = false;
+ 						break;
+ 					}
+ 					/*
+ 					 * The inserter definitely committed. But is it
+ 					 * old enough that everyone sees it as committed?
+ 					 */
+ 					xmin = HeapTupleHeaderGetXmin(htup);
+ 					if (!TransactionIdPrecedes(xmin, OldestXmin))
+ 					{
+ 						prstate->all_visible = false;
+ 						break;
+ 					}
+ 				}
+ 				break;
  			case HEAPTUPLE_INSERT_IN_PROGRESS:
  
  				/*
*************** heap_prune_chain(Relation relation, Buff
*** 506,511 ****
--- 599,605 ----
  				 * But we don't.  See related decisions about when to mark the
  				 * page prunable in heapam.c.
  				 */
+ 				prstate->all_visible = false;
  				break;
  
  			default:
*************** heap_prune_record_dead(PruneState *prsta
*** 673,678 ****
--- 767,774 ----
  	prstate->ndead++;
  	Assert(!prstate->marked[offnum]);
  	prstate->marked[offnum] = true;
+ 	/* DEAD line pointers can only be removed by VACUUM */
+ 	prstate->all_visible = false;
  }
  
  /* Record item pointer to be marked unused */
