Per-table freeze limit proposal

Started by Alvaro Herrera over 20 years ago, 14 messages
#1 Alvaro Herrera
alvherre@alvh.no-ip.org

Hackers,

As you've probably heard too many times already, I'm thinking of
improving vacuum, so we can keep track of the freeze Xid on a table
level, rather than database level. Hopefully this will eliminate the
need for database-wide vacuums.

In fact this seems pretty easy to do. Add a field to pg_class, tell
VACUUM to update it using the determined freezeLimit, and that's it.
(Note that if we ever implement partial vacuum, it won't be able to
update the freeze point. But that was true before anyway.)

We also need to teach autovacuum to update pg_database.datfreezexid,
using the minimum from pg_class. (I don't think it's a good idea to
seqscan pg_class to find out the minimum on each VACUUM call.) So, an
autovacuum iteration would issue all needed VACUUM/ANALYZE calls, then
get the minimum freezexid from pg_class to update pg_database. This
way, GetNewTransactionId can continue checking pg_database.datfreezexid
as the hard limit for issuing warnings for Xid wraparound.
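
Taking "the minimum from pg_class" needs a comparison that respects XID wraparound, since a numerically large XID can be logically older than a small one. The following is a minimal sketch of that step, not code from the proposal: `Xid`, `INVALID_XID`, `xid_precedes` and `min_table_xid` are hypothetical names, and the array stands in for the per-table values that would be read from pg_class.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;           /* hypothetical stand-in for TransactionId */

#define INVALID_XID ((Xid) 0)   /* assumption: 0 means "no value recorded" */

/* True if a is logically older than b on the 32-bit XID circle. */
static int
xid_precedes(Xid a, Xid b)
{
    return (int32_t) (a - b) < 0;
}

/*
 * Return the oldest valid entry among n per-table values, or
 * INVALID_XID if none of them is valid.
 */
static Xid
min_table_xid(const Xid *table_xids, size_t n)
{
    Xid     oldest = INVALID_XID;
    size_t  i;

    for (i = 0; i < n; i++)
    {
        if (table_xids[i] == INVALID_XID)
            continue;           /* entry has no XID to track */
        if (oldest == INVALID_XID || xid_precedes(table_xids[i], oldest))
            oldest = table_xids[i];
    }
    return oldest;
}
```

With the values {500, 0, 120, 4000000000}, the sketch returns 4000000000: the counter has wrapped, so that value is older than 120 even though it is numerically larger.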

Does anyone see a need for anything other than the autovacuum process to
be updating pg_database.datfreezexid? Of course, if autovacuum is not
in use, things would continue as now, that is, manual database-wide
VACUUM calls updating pg_database.datfreezexid. But note that you can
mark all tables as disabled on pg_autovacuum, issue your manual VACUUM
calls as needed (from cron or whatever), and use autovacuum to set
pg_database.datfreezexid -- so autovacuum would in fact do nothing
except set the freeze limit.

The problem is, this seems so awfully simple that I fear I am missing
something ... Otherwise, does this sound like a plan?

--
Alvaro Herrera -- Valdivia, Chile Architect, www.EnterpriseDB.com
The easiest way to resolve [trivial code guidelines disputes] is to fire
one or both of the people involved. (Damian Conway)

#2 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Alvaro Herrera (#1)
Re: Per-table freeze limit proposal

Alvaro Herrera <alvherre@alvh.no-ip.org> writes:

In fact this seems pretty easy to do. Add a field to pg_class, tell
VACUUM to update it using the determined freezeLimit, and that's it.

I think that it'd be worth fixing things so that the recorded value
is not the freeze cutoff value (as now), but the actual lowest
not-frozen XID present anywhere in the table. The present code does not
do that because it's painful to track across multiple tables, but on a
per-table basis it seems easy. In particular this rule allows you to
set a sane value for the pg_class field when the table is created (ie,
current transaction's XMIN, rather than a billion less).

(Note that if we ever implement partial vacuum, it won't be able to
update the freeze point. But that was true before anyway.)

Sure.

We also need to teach autovacuum to update pg_database.datfreezexid,
using the minimum from pg_class.

No, no, no. autovacuum is not a required part of the system and it's
not going to become so any time soon. Updating the pg_database entry
will have to be the responsibility of VACUUM itself. It's not that
terrible: you don't have to scan pg_class unless you see that the
pg_class.relfreezexid value you are replacing is equal to
pg_database.datfreezexid, and with the exact computation suggested
above, that won't be a common occurrence.
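
The rule described above can be sketched as follows. This is an illustration under stated assumptions, not the eventual implementation: the `Catalog` struct and `record_vacuum` are hypothetical, the arrays stand in for pg_class and pg_database, every table is assumed to hold a valid value, and a table's recorded value is assumed to only ever move forward.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;           /* hypothetical stand-in for TransactionId */

typedef struct
{
    Xid         db_xid;         /* stand-in for the pg_database value */
    Xid        *rel_xid;        /* stand-in for the per-table pg_class values */
    size_t      nrels;          /* assumed to be at least 1 */
    unsigned    rescans;        /* number of full scans performed so far */
} Catalog;

/* True if a is logically older than b on the 32-bit XID circle. */
static int
xid_precedes(Xid a, Xid b)
{
    return (int32_t) (a - b) < 0;
}

/* Record the result of vacuuming table number rel. */
static void
record_vacuum(Catalog *cat, size_t rel, Xid new_xid)
{
    Xid     old_xid = cat->rel_xid[rel];
    size_t  i;

    cat->rel_xid[rel] = new_xid;

    /* Some other table still holds the minimum: nothing more to do. */
    if (old_xid != cat->db_xid)
        return;

    /* The table we just advanced held the minimum, so recompute it. */
    cat->rescans++;
    cat->db_xid = cat->rel_xid[0];
    for (i = 1; i < cat->nrels; i++)
    {
        if (xid_precedes(cat->rel_xid[i], cat->db_xid))
            cat->db_xid = cat->rel_xid[i];
    }
}
```

In this sketch, vacuuming a table whose old value differs from the database value costs no scan at all; only the table that currently supplies the minimum triggers one.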

regards, tom lane

#3 Jim Buttafuoco
jim@contactbda.com
In reply to: Alvaro Herrera (#1)
Re: Per-table freeze limit proposal

While you are at it, can you put in some audit timestamps as to when the vacuum occurred (full vs not full)?

---------- Original Message -----------
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
To: Hackers <pgsql-hackers@postgresql.org>
Sent: Wed, 14 Sep 2005 22:14:23 -0400
Subject: [HACKERS] Per-table freeze limit proposal

------- End of Original Message -------

#4 Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Tom Lane (#2)
Re: Per-table freeze limit proposal

On Wed, Sep 14, 2005 at 11:30:52PM -0400, Tom Lane wrote:

Alvaro Herrera <alvherre@alvh.no-ip.org> writes:

In fact this seems pretty easy to do. Add a field to pg_class, tell
VACUUM to update it using the determined freezeLimit, and that's it.

I think that it'd be worth fixing things so that the recorded value
is not the freeze cutoff value (as now), but the actual lowest
not-frozen XID present anywhere in the table.

Cool. I wonder if the exact figure should be

min(lowest non-frozen Xid in table, GetOldestXmin(false))

just in case a long-running transaction inserts a new tuple after the
vacuum is done. Also GetOldestXmin should be the value used for empty
tables. For shared relations, we'd use GetOldestXmin(true).

Also, in light of this, it seems a bad idea to use the name "freezexid"
for the pg_class column; I would name it relminxid or something like
that (suggestions welcome). Not sure about renaming the pg_database
column -- I don't see why not.

--
Alvaro Herrera -- Valdivia, Chile Architect, www.EnterpriseDB.com
"La Primavera ha venido. Nadie sabe como ha sido" (A. Machado)

#5 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Alvaro Herrera (#4)
Re: Per-table freeze limit proposal

Alvaro Herrera <alvherre@alvh.no-ip.org> writes:

Cool. I wonder if the exact figure should be
min(lowest non-frozen Xid in table, GetOldestXmin(false))

Actually just min(lowest Xid in table, RecentXmin). You only need to be
sure there are no running transactions older than what you put into the
field; their xmins are not at issue.
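
As a rough illustration of the formula above, the value for one table could be computed like this. The names `Xid`, `FIRST_NORMAL_XID` and `table_min_xid` are hypothetical, the array stands in for the XIDs VACUUM sees while scanning the heap, and `recent_xmin` stands in for RecentXmin. The sketch relies on PostgreSQL reserving XIDs below 3 for special purposes (invalid, bootstrap, frozen).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;               /* hypothetical stand-in for TransactionId */

#define FIRST_NORMAL_XID ((Xid) 3)  /* smaller values are special XIDs */

/* True if a is logically older than b on the 32-bit XID circle. */
static int
xid_precedes(Xid a, Xid b)
{
    return (int32_t) (a - b) < 0;
}

/*
 * min(lowest non-frozen XID in the table, recent_xmin).
 * An empty or fully frozen table simply yields recent_xmin.
 */
static Xid
table_min_xid(const Xid *tuple_xids, size_t ntuples, Xid recent_xmin)
{
    Xid     result = recent_xmin;
    size_t  i;

    for (i = 0; i < ntuples; i++)
    {
        if (tuple_xids[i] < FIRST_NORMAL_XID)
            continue;               /* frozen or otherwise special */
        if (xid_precedes(tuple_xids[i], result))
            result = tuple_xids[i];
    }
    return result;
}
```

Starting from `recent_xmin` covers the empty-table case without a special branch, and it caps the result so that a tuple newer than every running transaction never pushes the recorded value past them.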

Also, in light of this, it seems a bad idea to use the name "freezexid"
for the pg_class column; I would name it relminxid or something like
that (suggestions welcome).

Works for me.

regards, tom lane

#6 Jim C. Nasby
jnasby@pervasive.com
In reply to: Jim Buttafuoco (#3)
Re: Per-table freeze limit proposal

It would also be very handy to be able to tell how many transactions (or
inserts/updates/deletes) have occurred since the last vacuum. Presumably
autovacuum needs to know this already, but is it exposed?

On Thu, Sep 15, 2005 at 07:46:26AM -0400, Jim Buttafuoco wrote:

while you are at it, can you put in some audit timestamps as to when the vacuum occurred (full vs not full).

---------- Original Message -----------
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
To: Hackers <pgsql-hackers@postgresql.org>
Sent: Wed, 14 Sep 2005 22:14:23 -0400
Subject: [HACKERS] Per-table freeze limit proposal

Hackers,

As you've probably heard too many times already, I'm thinking of
improving vacuum, so we can keep track of the freeze Xid on a table
level, rather than database level. Hopefully this will eliminate the
need for database-wide vacuums.

--
Jim C. Nasby, Sr. Engineering Consultant jnasby@pervasive.com
Pervasive Software http://pervasive.com work: 512-231-6117
vcard: http://jim.nasby.net/pervasive.vcf cell: 512-569-9461

#7 Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Tom Lane (#2)
1 attachment(s)
Re: [HACKERS] Per-table freeze limit proposal

On Wed, Sep 14, 2005 at 11:30:52PM -0400, Tom Lane wrote:

Updating the pg_database entry
will have to be the responsibility of VACUUM itself. It's not that
terrible: you don't have to scan pg_class unless you see that the
pg_class.relfreezexid value you are replacing is equal to
pg_database.datfreezexid, and with the exact computation suggested
above, that won't be a common occurrence.

Ok, this patch is a first cut at that. I made it slightly smarter,
because we want to do it only once if we issue a database-wide vacuum,
instead of once per table. However, I also had to cope with the
possibility that the table with the minimum Xid value is dropped, so I
made that set the datminxid to InvalidTransactionId. If after a VACUUM the
datminxid is found to be invalid, pg_class is scanned unconditionally.
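
The drop handling described above might look roughly like this in isolation. It is a simplified sketch and not the patch's code: `on_table_drop` and `refresh_db_min` are hypothetical helpers, plain variables stand in for the catalog rows, and no locking is modeled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;           /* hypothetical stand-in for TransactionId */

#define INVALID_XID ((Xid) 0)

/* True if a is logically older than b on the 32-bit XID circle. */
static int
xid_precedes(Xid a, Xid b)
{
    return (int32_t) (a - b) < 0;
}

/* On DROP TABLE: forget the database minimum if this table supplied it. */
static void
on_table_drop(Xid *db_min, Xid dropped_rel_min)
{
    if (dropped_rel_min != INVALID_XID && *db_min == dropped_rel_min)
        *db_min = INVALID_XID;
}

/*
 * At the end of VACUUM: if the minimum was invalidated, recompute it
 * from the remaining tables.  Returns true if a scan was performed.
 */
static bool
refresh_db_min(Xid *db_min, const Xid *rel_min, size_t nrels)
{
    size_t  i;

    if (*db_min != INVALID_XID)
        return false;

    for (i = 0; i < nrels; i++)
    {
        if (rel_min[i] == INVALID_XID)
            continue;           /* entry has no XID to track */
        if (*db_min == INVALID_XID || xid_precedes(rel_min[i], *db_min))
            *db_min = rel_min[i];
    }
    return true;
}
```

Dropping a table that does not hold the minimum leaves the database value alone, so the full scan is only paid for after the table holding the minimum has gone away.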

There's the usual gotcha with shared relations: vacuuming them on one
database is not going to update their pg_class entries in other
databases, so they will be vacuumed more frequently than really
necessary. I don't see how to fix that -- one way would be storing
their stats in one database only, but it seems dangerous.

Note that I use LockSharedObject() to lock the database while we are
updating the pg_database row. This means that more than one database
can be updated concurrently (this is important because we have to keep
the lock while we seqscan pg_class). This may be a bad idea from the
point of view of the buffer manager; maybe we need an additional
LockBuffer() just before we are going to modify the tuple.

With this in place, it's no longer necessary to issue database-wide
vacuums. Note that I haven't tested the part where the clog
grows enough to be truncated, nor really anything more complicated than
a single backend doing database-wide or single-table vacuums (with an
optional parallel backend with an open transaction). Still, it shows
what the basics of the patch are.

--
Alvaro Herrera http://www.amazon.com/gp/registry/CTMLCN8V17R4
"Lo esencial es invisible para los ojos" (A. de Saint-Exupéry)

Attachments:

vacuum-min-xid.patch (text/plain; charset=us-ascii)
Index: src/backend/access/transam/varsup.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/access/transam/varsup.c,v
retrieving revision 1.66
diff -c -r1.66 varsup.c
*** src/backend/access/transam/varsup.c	22 Aug 2005 16:59:47 -0000	1.66
--- src/backend/access/transam/varsup.c	20 Sep 2005 20:31:22 -0000
***************
*** 171,181 ****
  
  /*
   * Determine the last safe XID to allocate given the currently oldest
!  * datfrozenxid (ie, the oldest XID that might exist in any database
   * of our cluster).
   */
  void
! SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
  					  Name oldest_datname)
  {
  	TransactionId xidWarnLimit;
--- 171,181 ----
  
  /*
   * Determine the last safe XID to allocate given the currently oldest
!  * datminxid (ie, the oldest XID that might exist in any database
   * of our cluster).
   */
  void
! SetTransactionIdLimit(TransactionId oldest_datminxid,
  					  Name oldest_datname)
  {
  	TransactionId xidWarnLimit;
***************
*** 183,198 ****
  	TransactionId xidWrapLimit;
  	TransactionId curXid;
  
! 	Assert(TransactionIdIsValid(oldest_datfrozenxid));
  
  	/*
  	 * The place where we actually get into deep trouble is halfway around
! 	 * from the oldest potentially-existing XID.  (This calculation is
! 	 * probably off by one or two counts, because the special XIDs reduce the
! 	 * size of the loop a little bit.  But we throw in plenty of slop below,
! 	 * so it doesn't matter.)
  	 */
! 	xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
  	if (xidWrapLimit < FirstNormalTransactionId)
  		xidWrapLimit += FirstNormalTransactionId;
  
--- 183,198 ----
  	TransactionId xidWrapLimit;
  	TransactionId curXid;
  
! 	Assert(TransactionIdIsValid(oldest_datminxid));
  
  	/*
  	 * The place where we actually get into deep trouble is halfway around
! 	 * from the oldest existing XID.  (This calculation is probably off by one
! 	 * or two counts, because the special XIDs reduce the size of the loop a
! 	 * little bit.  But we throw in plenty of slop below, so it doesn't
! 	 * matter.)
  	 */
! 	xidWrapLimit = oldest_datminxid + (MaxTransactionId >> 1);
  	if (xidWrapLimit < FirstNormalTransactionId)
  		xidWrapLimit += FirstNormalTransactionId;
  
Index: src/backend/catalog/heap.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/catalog/heap.c,v
retrieving revision 1.290
diff -c -r1.290 heap.c
*** src/backend/catalog/heap.c	26 Aug 2005 03:07:12 -0000	1.290
--- src/backend/catalog/heap.c	24 Sep 2005 00:43:44 -0000
***************
*** 38,43 ****
--- 38,44 ----
  #include "catalog/indexing.h"
  #include "catalog/pg_attrdef.h"
  #include "catalog/pg_constraint.h"
+ #include "catalog/pg_database.h"
  #include "catalog/pg_inherits.h"
  #include "catalog/pg_namespace.h"
  #include "catalog/pg_statistic.h"
***************
*** 576,591 ****
--- 577,598 ----
  			/* The relation is real, but as yet empty */
  			new_rel_reltup->relpages = 0;
  			new_rel_reltup->reltuples = 0;
+ 			/* Use the minimum Xid that could put tuples in the table */
+ 			new_rel_reltup->relminxid = RecentXmin;
  			break;
  		case RELKIND_SEQUENCE:
  			/* Sequences always have a known size */
  			new_rel_reltup->relpages = 1;
  			new_rel_reltup->reltuples = 1;
+ 			/* Sequences will never have Xids */
+ 			new_rel_reltup->relminxid = InvalidTransactionId;
  			break;
  		default:
  			/* Views, etc, have no disk storage */
  			new_rel_reltup->relpages = 0;
  			new_rel_reltup->reltuples = 0;
+ 			/* Neither will views nor anything else */
+ 			new_rel_reltup->relminxid = InvalidTransactionId;
  			break;
  	}
  
***************
*** 1128,1133 ****
--- 1135,1186 ----
  }
  
  /*
+  * Invalidate (set to invalid) the datminxid of a database, when we
+  * drop the table that has the minimum pg_class.relminxid.  
+  */
+ static void
+ InvalidateDbMinxid(TransactionId relminxid)
+ {
+ 	Oid				dbid = MyDatabaseId;
+ 	Relation		dbrel;
+ 	HeapTuple		tuple;
+ 	HeapScanDesc	scan;
+ 	Form_pg_database dbform;
+ 	ScanKeyData		entry[1];
+ 
+ 	if (!TransactionIdIsValid(relminxid))
+ 		return;
+ 
+ 	dbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
+ 
+ 	/* Must use a heap scan, since there's no syscache for pg_database */
+ 	ScanKeyInit(&entry[0],
+ 				ObjectIdAttributeNumber,
+ 				BTEqualStrategyNumber, F_OIDEQ,
+ 				ObjectIdGetDatum(dbid));
+ 
+ 	scan = heap_beginscan(dbrel, SnapshotNow, 1, entry);
+ 
+ 	tuple = heap_getnext(scan, ForwardScanDirection);
+ 
+ 	if (!HeapTupleIsValid(tuple))
+ 		elog(ERROR, "could not find tuple for database %u", dbid);
+ 
+ 	/* Ensure no one does this at the same time */
+ 	LockSharedObject(DatabaseRelationId, dbid, 0, AccessExclusiveLock);
+ 
+ 	dbform = (Form_pg_database) GETSTRUCT(tuple);
+ 	if (TransactionIdEquals(dbform->datminxid, relminxid))
+ 		dbform->datminxid = InvalidTransactionId;
+ 
+ 	UnlockSharedObject(DatabaseRelationId, dbid, 0, AccessExclusiveLock);
+ 
+ 	heap_endscan(scan);
+ 
+ 	heap_close(dbrel, RowExclusiveLock);
+ }
+ 
+ /*
   * heap_drop_with_catalog	- removes specified relation from catalogs
   *
   * Note that this routine is not responsible for dropping objects that are
***************
*** 1156,1161 ****
--- 1209,1219 ----
  		smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
  	}
  
+ 	/* Invalidate pg_database.datminxid, if appropriate */
+ 	if ((rel->rd_rel->relkind == RELKIND_RELATION) &&
+ 		(!rel->rd_istemp))
+ 		InvalidateDbMinxid(rel->rd_rel->relminxid);
+ 
  	/*
  	 * Close relcache entry, but *keep* AccessExclusiveLock on the
  	 * relation until transaction commit.  This ensures no one else will
Index: src/backend/commands/analyze.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/commands/analyze.c,v
retrieving revision 1.88
diff -c -r1.88 analyze.c
*** src/backend/commands/analyze.c	29 Jul 2005 19:30:03 -0000	1.88
--- src/backend/commands/analyze.c	15 Sep 2005 21:50:29 -0000
***************
*** 424,430 ****
  		vac_update_relstats(RelationGetRelid(onerel),
  							RelationGetNumberOfBlocks(onerel),
  							totalrows,
! 							hasindex);
  		for (ind = 0; ind < nindexes; ind++)
  		{
  			AnlIndexData *thisdata = &indexdata[ind];
--- 424,431 ----
  		vac_update_relstats(RelationGetRelid(onerel),
  							RelationGetNumberOfBlocks(onerel),
  							totalrows,
! 							hasindex,
! 							InvalidTransactionId);
  		for (ind = 0; ind < nindexes; ind++)
  		{
  			AnlIndexData *thisdata = &indexdata[ind];
***************
*** 434,440 ****
  			vac_update_relstats(RelationGetRelid(Irel[ind]),
  								RelationGetNumberOfBlocks(Irel[ind]),
  								totalindexrows,
! 								false);
  		}
  
  		/* report results to the stats collector, too */
--- 435,441 ----
  			vac_update_relstats(RelationGetRelid(Irel[ind]),
  								RelationGetNumberOfBlocks(Irel[ind]),
  								totalindexrows,
! 								false, InvalidTransactionId);
  		}
  
  		/* report results to the stats collector, too */
Index: src/backend/commands/dbcommands.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/commands/dbcommands.c,v
retrieving revision 1.171
diff -c -r1.171 dbcommands.c
*** src/backend/commands/dbcommands.c	22 Aug 2005 17:38:20 -0000	1.171
--- src/backend/commands/dbcommands.c	20 Sep 2005 21:05:01 -0000
***************
*** 56,62 ****
  static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
  			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
  			Oid *dbLastSysOidP,
! 			TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
  			Oid *dbTablespace);
  static bool have_createdb_privilege(void);
  static void remove_dbtablespaces(Oid db_id);
--- 56,62 ----
  static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
  			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
  			Oid *dbLastSysOidP,
! 			TransactionId *dbMinXidP,
  			Oid *dbTablespace);
  static bool have_createdb_privilege(void);
  static void remove_dbtablespaces(Oid db_id);
***************
*** 76,83 ****
  	bool		src_istemplate;
  	bool		src_allowconn;
  	Oid			src_lastsysoid;
! 	TransactionId src_vacuumxid;
! 	TransactionId src_frozenxid;
  	Oid			src_deftablespace;
  	volatile Oid dst_deftablespace;
  	volatile Relation pg_database_rel;
--- 76,82 ----
  	bool		src_istemplate;
  	bool		src_allowconn;
  	Oid			src_lastsysoid;
! 	TransactionId src_minxid;
  	Oid			src_deftablespace;
  	volatile Oid dst_deftablespace;
  	volatile Relation pg_database_rel;
***************
*** 224,230 ****
  	 * after we grab the exclusive lock.
  	 */
  	if (get_db_info(dbname, NULL, NULL, NULL,
! 					NULL, NULL, NULL, NULL, NULL, NULL))
  		ereport(ERROR,
  				(errcode(ERRCODE_DUPLICATE_DATABASE),
  				 errmsg("database \"%s\" already exists", dbname)));
--- 223,229 ----
  	 * after we grab the exclusive lock.
  	 */
  	if (get_db_info(dbname, NULL, NULL, NULL,
! 					NULL, NULL, NULL, NULL, NULL))
  		ereport(ERROR,
  				(errcode(ERRCODE_DUPLICATE_DATABASE),
  				 errmsg("database \"%s\" already exists", dbname)));
***************
*** 237,243 ****
  
  	if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
  					 &src_istemplate, &src_allowconn, &src_lastsysoid,
! 					 &src_vacuumxid, &src_frozenxid, &src_deftablespace))
  		ereport(ERROR,
  				(errcode(ERRCODE_UNDEFINED_DATABASE),
  		 errmsg("template database \"%s\" does not exist", dbtemplate)));
--- 236,242 ----
  
  	if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
  					 &src_istemplate, &src_allowconn, &src_lastsysoid,
! 					 &src_minxid, &src_deftablespace))
  		ereport(ERROR,
  				(errcode(ERRCODE_UNDEFINED_DATABASE),
  		 errmsg("template database \"%s\" does not exist", dbtemplate)));
***************
*** 336,349 ****
  	}
  
  	/*
! 	 * Normally we mark the new database with the same datvacuumxid and
! 	 * datfrozenxid as the source.  However, if the source is not allowing
! 	 * connections then we assume it is fully frozen, and we can set the
! 	 * current transaction ID as the xid limits.  This avoids immediately
! 	 * starting to generate warnings after cloning template0.
  	 */
  	if (!src_allowconn)
! 		src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
  
  	/*
  	 * Preassign OID for pg_database tuple, so that we can compute db
--- 335,348 ----
  	}
  
  	/*
! 	 * Normally we mark the new database with the same datminxid as the source.
! 	 * However, if the source is not allowing connections then we assume it is
! 	 * fully frozen, and we can set the current transaction ID as the xid
! 	 * limit.  This avoids immediately starting to generate warnings after
! 	 * cloning template0.
  	 */
  	if (!src_allowconn)
! 		src_minxid = GetCurrentTransactionId();
  
  	/*
  	 * Preassign OID for pg_database tuple, so that we can compute db
***************
*** 441,447 ****
  
  		/* Check to see if someone else created same DB name meanwhile. */
  		if (get_db_info(dbname, NULL, NULL, NULL,
! 						NULL, NULL, NULL, NULL, NULL, NULL))
  			ereport(ERROR,
  					(errcode(ERRCODE_DUPLICATE_DATABASE),
  					 errmsg("database \"%s\" already exists", dbname)));
--- 440,446 ----
  
  		/* Check to see if someone else created same DB name meanwhile. */
  		if (get_db_info(dbname, NULL, NULL, NULL,
! 						NULL, NULL, NULL, NULL, NULL))
  			ereport(ERROR,
  					(errcode(ERRCODE_DUPLICATE_DATABASE),
  					 errmsg("database \"%s\" already exists", dbname)));
***************
*** 463,470 ****
  		new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
  		new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
  		new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
! 		new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
! 		new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
  		new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
  
  		/*
--- 462,468 ----
  		new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
  		new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
  		new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
! 		new_record[Anum_pg_database_datminxid - 1] = TransactionIdGetDatum(src_minxid);
  		new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
  
  		/*
***************
*** 583,589 ****
  	pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
  
  	if (!get_db_info(dbname, &db_id, NULL, NULL,
! 					 &db_istemplate, NULL, NULL, NULL, NULL, NULL))
  		ereport(ERROR,
  				(errcode(ERRCODE_UNDEFINED_DATABASE),
  				 errmsg("database \"%s\" does not exist", dbname)));
--- 581,587 ----
  	pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
  
  	if (!get_db_info(dbname, &db_id, NULL, NULL,
! 					 &db_istemplate, NULL, NULL, NULL, NULL))
  		ereport(ERROR,
  				(errcode(ERRCODE_UNDEFINED_DATABASE),
  				 errmsg("database \"%s\" does not exist", dbname)));
***************
*** 1084,1091 ****
  static bool
  get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
  			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
! 			Oid *dbLastSysOidP,
! 			TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
  			Oid *dbTablespace)
  {
  	Relation	relation;
--- 1082,1088 ----
  static bool
  get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
  			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
! 			Oid *dbLastSysOidP, TransactionId *dbMinXidP,
  			Oid *dbTablespace)
  {
  	Relation	relation;
***************
*** 1132,1143 ****
  		/* last system OID used in database */
  		if (dbLastSysOidP)
  			*dbLastSysOidP = dbform->datlastsysoid;
! 		/* limit of vacuumed XIDs */
! 		if (dbVacuumXidP)
! 			*dbVacuumXidP = dbform->datvacuumxid;
! 		/* limit of frozen XIDs */
! 		if (dbFrozenXidP)
! 			*dbFrozenXidP = dbform->datfrozenxid;
  		/* default tablespace for this database */
  		if (dbTablespace)
  			*dbTablespace = dbform->dattablespace;
--- 1129,1137 ----
  		/* last system OID used in database */
  		if (dbLastSysOidP)
  			*dbLastSysOidP = dbform->datlastsysoid;
! 		/* limit of min XIDs */
! 		if (dbMinXidP)
! 			*dbMinXidP = dbform->datminxid;
  		/* default tablespace for this database */
  		if (dbTablespace)
  			*dbTablespace = dbform->dattablespace;
Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.314
diff -c -r1.314 vacuum.c
*** src/backend/commands/vacuum.c	2 Sep 2005 19:02:19 -0000	1.314
--- src/backend/commands/vacuum.c	24 Sep 2005 01:46:44 -0000
***************
*** 124,129 ****
--- 124,130 ----
  	Size		min_tlen;
  	Size		max_tlen;
  	bool		hasindex;
+ 	TransactionId minxid;	/* Minimum Xid present anywhere on table */
  	/* vtlinks array for tuple chain following - sorted by new_tid */
  	int			num_vtlinks;
  	VTupleLink	vtlinks;
***************
*** 191,215 ****
  
  static int	elevel = -1;
  
- static TransactionId OldestXmin;
- static TransactionId FreezeLimit;
- 
  
  /* non-export function prototypes */
  static List *get_rel_oids(List *relids, const RangeVar *vacrel,
  						  const char *stmttype);
! static void vac_update_dbstats(Oid dbid,
! 				   TransactionId vacuumXID,
! 				   TransactionId frozenXID);
! static void vac_truncate_clog(TransactionId vacuumXID,
! 				  TransactionId frozenXID);
! static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
! static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
  static void scan_heap(VRelStats *vacrelstats, Relation onerel,
! 		  VacPageList vacuum_pages, VacPageList fraged_pages);
  static void repair_frag(VRelStats *vacrelstats, Relation onerel,
  			VacPageList vacuum_pages, VacPageList fraged_pages,
! 			int nindexes, Relation *Irel);
  static void move_chain_tuple(Relation rel,
  				 Buffer old_buf, Page old_page, HeapTuple old_tup,
  				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
--- 192,211 ----
  
  static int	elevel = -1;
  
  
  /* non-export function prototypes */
  static List *get_rel_oids(List *relids, const RangeVar *vacrel,
  						  const char *stmttype);
! static void vac_update_dbminxid(Oid dbid, TransactionId prevminXID);
! static void vac_truncate_clog(TransactionId minXID);
! static TransactionId vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
! static TransactionId full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
  static void scan_heap(VRelStats *vacrelstats, Relation onerel,
! 		  VacPageList vacuum_pages, VacPageList fraged_pages,
! 		  TransactionId FreezeLimit, TransactionId OldestXmin);
  static void repair_frag(VRelStats *vacrelstats, Relation onerel,
  			VacPageList vacuum_pages, VacPageList fraged_pages,
! 			int nindexes, Relation *Irel, TransactionId OldestXmin);
  static void move_chain_tuple(Relation rel,
  				 Buffer old_buf, Page old_page, HeapTuple old_tup,
  				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
***************
*** 266,278 ****
  vacuum(VacuumStmt *vacstmt, List *relids)
  {
  	const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
- 	TransactionId initialOldestXmin = InvalidTransactionId;
- 	TransactionId initialFreezeLimit = InvalidTransactionId;
  	volatile MemoryContext anl_context = NULL;
  	volatile bool all_rels,
  				in_outer_xact,
  				use_own_xacts;
  	List	   *relations;
  
  	if (vacstmt->verbose)
  		elevel = INFO;
--- 262,274 ----
  vacuum(VacuumStmt *vacstmt, List *relids)
  {
  	const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
  	volatile MemoryContext anl_context = NULL;
  	volatile bool all_rels,
  				in_outer_xact,
  				use_own_xacts;
  	List	   *relations;
+ 	TransactionId prevmin;
+ 	TransactionId currmin = InvalidTransactionId;
  
  	if (vacstmt->verbose)
  		elevel = INFO;
***************
*** 351,382 ****
  	 */
  	relations = get_rel_oids(relids, vacstmt->relation, stmttype);
  
- 	if (vacstmt->vacuum && all_rels)
- 	{
- 		/*
- 		 * It's a database-wide VACUUM.
- 		 *
- 		 * Compute the initially applicable OldestXmin and FreezeLimit XIDs,
- 		 * so that we can record these values at the end of the VACUUM.
- 		 * Note that individual tables may well be processed with newer
- 		 * values, but we can guarantee that no (non-shared) relations are
- 		 * processed with older ones.
- 		 *
- 		 * It is okay to record non-shared values in pg_database, even though
- 		 * we may vacuum shared relations with older cutoffs, because only
- 		 * the minimum of the values present in pg_database matters.  We
- 		 * can be sure that shared relations have at some time been
- 		 * vacuumed with cutoffs no worse than the global minimum; for, if
- 		 * there is a backend in some other DB with xmin = OLDXMIN that's
- 		 * determining the cutoff with which we vacuum shared relations,
- 		 * it is not possible for that database to have a cutoff newer
- 		 * than OLDXMIN recorded in pg_database.
- 		 */
- 		vacuum_set_xid_limits(vacstmt, false,
- 							  &initialOldestXmin,
- 							  &initialFreezeLimit);
- 	}
- 
  	/*
  	 * Decide whether we need to start/commit our own transactions.
  	 *
--- 347,352 ----
***************
*** 446,453 ****
  
  			if (vacstmt->vacuum)
  			{
! 				if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
! 					all_rels = false;	/* forget about updating dbstats */
  			}
  			if (vacstmt->analyze)
  			{
--- 416,427 ----
  
  			if (vacstmt->vacuum)
  			{
! 				prevmin = vacuum_rel(relid, vacstmt, RELKIND_RELATION);
! 				
! 				if (!TransactionIdIsValid(currmin) ||
! 					(TransactionIdIsNormal(prevmin) &&
! 					 TransactionIdPrecedes(prevmin, currmin)))
! 					currmin = prevmin;
  			}
  			if (vacstmt->analyze)
  			{
***************
*** 524,538 ****
  			PrintFreeSpaceMapStatistics(elevel);
  
  		/*
! 		 * If we completed a database-wide VACUUM without skipping any
! 		 * relations, update the database's pg_database row with info
! 		 * about the transaction IDs used, and try to truncate pg_clog.
  		 */
! 		if (all_rels)
  		{
! 			vac_update_dbstats(MyDatabaseId,
! 							   initialOldestXmin, initialFreezeLimit);
! 			vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
  		}
  	}
  
--- 498,514 ----
  			PrintFreeSpaceMapStatistics(elevel);
  
  		/*
! 		 * Skip these steps if the current minxid is InvalidTransactionId.
! 		 * It shouldn't happen on normal operation, but it happens during
! 		 * initdb.
  		 */
! 		if (TransactionIdIsValid(currmin))
  		{
! 			/* Update pg_database.datminxid, if necessary. */
! 			vac_update_dbminxid(MyDatabaseId, currmin);
! 
! 			/* Try to truncate pg_clog. */
! 			vac_truncate_clog(currmin);
  		}
  	}
  
***************
*** 613,620 ****
   */
  void
  vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
! 					  TransactionId *oldestXmin,
! 					  TransactionId *freezeLimit)
  {
  	TransactionId limit;
  
--- 589,596 ----
   */
  void
  vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
!                       TransactionId *oldestXmin,
!                       TransactionId *freezeLimit)
  {
  	TransactionId limit;
  
***************
*** 656,662 ****
  	*freezeLimit = limit;
  }
  
- 
  /*
   *	vac_update_relstats() -- update statistics for one relation
   *
--- 632,637 ----
***************
*** 665,670 ****
--- 640,648 ----
   *		doing ANALYZE, but we always update these stats.  This routine works
   *		for both index and heap relation entries in pg_class.
   *
+  *		The return value is the relminxid previous to the change, in the
+  *		case of a plain relation, or InvalidTransactionId otherwise.
+  *
   *		We violate no-overwrite semantics here by storing new values for the
   *		statistics columns directly into the pg_class tuple that's already on
   *		the page.  The reason for this is that if we updated these tuples in
***************
*** 676,690 ****
   *		This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
   *		ANALYZE.
   */
! void
  vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
! 					bool hasindex)
  {
  	Relation	rd;
  	HeapTupleData rtup;
  	HeapTuple	ctup;
  	Form_pg_class pgcform;
  	Buffer		buffer;
  
  	/*
  	 * update number of tuples and number of pages in pg_class
--- 654,669 ----
   *		This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
   *		ANALYZE.
   */
! TransactionId
  vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
! 					bool hasindex, TransactionId minxid)
  {
  	Relation	rd;
  	HeapTupleData rtup;
  	HeapTuple	ctup;
  	Form_pg_class pgcform;
  	Buffer		buffer;
+ 	TransactionId prevmin;
  
  	/*
  	 * update number of tuples and number of pages in pg_class
***************
*** 710,715 ****
--- 689,704 ----
  
  	/* overwrite the existing statistics in the tuple */
  	pgcform = (Form_pg_class) GETSTRUCT(&rtup);
+ 
+ 	/* Set relminxid only for regular tables */
+ 	if (pgcform->relkind == RELKIND_RELATION)
+ 	{
+ 		prevmin = pgcform->relminxid;
+ 		pgcform->relminxid = minxid;
+ 	}
+ 	else
+ 		prevmin = pgcform->relminxid = InvalidTransactionId;
+ 
  	pgcform->relpages = (int32) num_pages;
  	pgcform->reltuples = (float4) num_tuples;
  	pgcform->relhasindex = hasindex;
***************
*** 735,769 ****
  	WriteBuffer(buffer);
  
  	heap_close(rd, RowExclusiveLock);
  }
  
  
  /*
!  *	vac_update_dbstats() -- update statistics for one database
   *
!  *		Update the whole-database statistics that are kept in its pg_database
!  *		row, and the flat-file copy of pg_database.
   *
!  *		We violate no-overwrite semantics here by storing new values for the
!  *		statistics columns directly into the tuple that's already on the page.
   *		As with vac_update_relstats, this avoids leaving dead tuples behind
   *		after a VACUUM.
   *
!  *		This routine is shared by full and lazy VACUUM.  Note that it is only
!  *		applied after a database-wide VACUUM operation.
   */
  static void
! vac_update_dbstats(Oid dbid,
! 				   TransactionId vacuumXID,
! 				   TransactionId frozenXID)
  {
! 	Relation	relation;
! 	ScanKeyData entry[1];
  	HeapScanDesc scan;
  	HeapTuple	tuple;
  	Form_pg_database dbform;
  
! 	relation = heap_open(DatabaseRelationId, RowExclusiveLock);
  
  	/* Must use a heap scan, since there's no syscache for pg_database */
  	ScanKeyInit(&entry[0],
--- 724,771 ----
  	WriteBuffer(buffer);
  
  	heap_close(rd, RowExclusiveLock);
+ 
+ 	return prevmin;
  }
  
  
  /*
!  *	vac_update_dbminxid() -- update the minimum Xid present in one database
   *
!  * 		Update pg_database.datminxid, and the flat-file copy of pg_database.
!  * 		The prevminXID argument is the minimum of all the relminxids that
!  * 		the vacuumed table(s) had before this vacuum operation.  The
!  * 		datminxid is updated to the minimum of all relminxids found in
!  * 		pg_class, if this prevminXID is found to be equal to the current
!  * 		datminxid -- that is, one of the processed tables was the one
!  * 		holding the datminxid back.
   *
!  * 		Note that it's possible for pg_database.datminxid to be
!  * 		InvalidTransactionId -- for example, if a table is dropped.  This is
!  * 		to cope with the possibility that the dropped table was the one with
!  * 		minimum relminxid.  In this case, we need to search for the minimum
!  * 		unconditionally.
!  *
!  *		We violate no-overwrite semantics here by storing a new value for the
!  *		statistic column directly into the tuple that's already on the page.
   *		As with vac_update_relstats, this avoids leaving dead tuples behind
   *		after a VACUUM.
   *
!  *		This routine is shared by full and lazy VACUUM.
   */
  static void
! vac_update_dbminxid(Oid dbid, TransactionId prevminXID)
  {
! 	Relation	dbrel;
! 	ScanKeyData	entry[1];
  	HeapScanDesc scan;
  	HeapTuple	tuple;
  	Form_pg_database dbform;
+ 	TransactionId prevmin;
+ 
+ 	Assert(TransactionIdIsValid(prevminXID));
  
! 	dbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
  
  	/* Must use a heap scan, since there's no syscache for pg_database */
  	ScanKeyInit(&entry[0],
***************
*** 771,828 ****
  				BTEqualStrategyNumber, F_OIDEQ,
  				ObjectIdGetDatum(dbid));
  
! 	scan = heap_beginscan(relation, SnapshotNow, 1, entry);
  
  	tuple = heap_getnext(scan, ForwardScanDirection);
  
  	if (!HeapTupleIsValid(tuple))
  		elog(ERROR, "could not find tuple for database %u", dbid);
  
! 	/* ensure no one else does this at the same time */
! 	LockBuffer(scan->rs_cbuf, BUFFER_LOCK_EXCLUSIVE);
  
  	dbform = (Form_pg_database) GETSTRUCT(tuple);
  
! 	/* overwrite the existing statistics in the tuple */
! 	dbform->datvacuumxid = vacuumXID;
! 	dbform->datfrozenxid = frozenXID;
  
! 	LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
  
! 	/* invalidate the tuple in the cache and write the buffer */
! 	CacheInvalidateHeapTuple(relation, tuple);
! 	WriteNoReleaseBuffer(scan->rs_cbuf);
  
! 	heap_endscan(scan);
  
! 	heap_close(relation, RowExclusiveLock);
  
! 	/* Mark the flat-file copy of pg_database for update at commit */
! 	database_file_update_needed();
! }
  
  
  /*
   *	vac_truncate_clog() -- attempt to truncate the commit log
   *
!  *		Scan pg_database to determine the system-wide oldest datvacuumxid,
!  *		and use it to truncate the transaction commit log (pg_clog).
!  *		Also update the XID wrap limit point maintained by varsup.c.
   *
!  *		We also generate a warning if the system-wide oldest datfrozenxid
   *		seems to be in danger of wrapping around.  This is a long-in-advance
   *		warning; if we start getting uncomfortably close, GetNewTransactionId
   *		will generate more-annoying warnings, and ultimately refuse to issue
   *		any more new XIDs.
   *
!  *		The passed XIDs are simply the ones I just wrote into my pg_database
!  *		entry.	They're used to initialize the "min" calculations.
   *
!  *		This routine is shared by full and lazy VACUUM.  Note that it is only
!  *		applied after a database-wide VACUUM operation.
   */
  static void
! vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
  {
  	TransactionId myXID = GetCurrentTransactionId();
  	Relation	relation;
--- 773,868 ----
  				BTEqualStrategyNumber, F_OIDEQ,
  				ObjectIdGetDatum(dbid));
  
! 	scan = heap_beginscan(dbrel, SnapshotNow, 1, entry);
  
  	tuple = heap_getnext(scan, ForwardScanDirection);
  
  	if (!HeapTupleIsValid(tuple))
  		elog(ERROR, "could not find tuple for database %u", dbid);
  
! 	/* Ensure no one does this at the same time */
! 	LockSharedObject(DatabaseRelationId, dbid, 0, AccessExclusiveLock);
  
  	dbform = (Form_pg_database) GETSTRUCT(tuple);
+ 	prevmin = dbform->datminxid;
  
! 	/*
! 	 * If the table we just vacuumed was holding the minimum back, try to
! 	 * update datminxid.
! 	 */
! 	if (!TransactionIdIsValid(prevmin) ||
! 		TransactionIdEquals(prevmin, prevminXID))
! 	{
! 		Relation	classRel;
! 		SysScanDesc	classScan;
! 		TransactionId newMinXID = InvalidTransactionId;
! 		HeapTuple	classTup;
  
! 		/* scan pg_class searching for the minimum relminxid */
! 		classRel = heap_open(RelationRelationId, AccessShareLock);
  
! 		classScan = systable_beginscan(classRel, InvalidOid, false,
! 									   SnapshotNow, 0, NULL);
  
! 		while ((classTup = systable_getnext(classScan)) != NULL)
! 		{
! 			Form_pg_class classForm;
  
! 			classForm = (Form_pg_class) GETSTRUCT(classTup);
  
! 			/* Only consider normal tables */
! 			if (classForm->relkind != RELKIND_RELATION)
! 				continue;
! 
! 			if (!TransactionIdIsValid(newMinXID) ||
! 				(TransactionIdIsNormal(classForm->relminxid) &&
! 				 TransactionIdPrecedes(classForm->relminxid, newMinXID)))
! 				newMinXID = classForm->relminxid;
! 		}
! 
! 		systable_endscan(classScan);
  
+ 		heap_close(classRel, AccessShareLock);
+ 
+ 		Assert(TransactionIdIsValid(newMinXID));
+ 
+ 		dbform->datminxid = newMinXID;
+ 		
+ 		/* invalidate the tuple in the cache and write the buffer */
+ 		CacheInvalidateHeapTuple(dbrel, tuple);
+ 		WriteNoReleaseBuffer(scan->rs_cbuf);
+ 		
+ 		/* Mark the flat-file copy of pg_database for update at commit */
+ 		database_file_update_needed();
+ 	}
+ 
+ 	UnlockSharedObject(DatabaseRelationId, dbid, 0, AccessExclusiveLock);
+ 
+ 	heap_endscan(scan);
+ 
+ 	heap_close(dbrel, RowExclusiveLock);
+ }
  
  /*
   *	vac_truncate_clog() -- attempt to truncate the commit log
   *
!  *		Truncate the transaction commit log (pg_clog) using the minimum
!  *		Xid found in pg_database.  Also update the XID wrap limit point
!  *		maintained by varsup.c.
   *
!  *		We also generate a warning if the system-wide oldest datminxid
   *		seems to be in danger of wrapping around.  This is a long-in-advance
   *		warning; if we start getting uncomfortably close, GetNewTransactionId
   *		will generate more-annoying warnings, and ultimately refuse to issue
   *		any more new XIDs.
   *
!  *		The passed XID is simply the one I just wrote into my pg_database
!  *		entry.	It's used to initialize the "min" calculation.
   *
!  *		This routine is shared by full and lazy VACUUM.
   */
  static void
! vac_truncate_clog(TransactionId minXID)
  {
  	TransactionId myXID = GetCurrentTransactionId();
  	Relation	relation;
***************
*** 830,844 ****
  	HeapTuple	tuple;
  	int32		age;
  	NameData	oldest_datname;
! 	bool		vacuumAlreadyWrapped = false;
! 	bool		frozenAlreadyWrapped = false;
  
! 	/* init oldest_datname to sync with my frozenXID */
  	namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
  
  	/*
! 	 * Note: the "already wrapped" cases should now be impossible due to the
! 	 * defenses in GetNewTransactionId, but we keep them anyway.
  	 */
  	relation = heap_open(DatabaseRelationId, AccessShareLock);
  
--- 870,885 ----
  	HeapTuple	tuple;
  	int32		age;
  	NameData	oldest_datname;
! 	bool		alreadyWrapped = false;
! 
! 	Assert(TransactionIdIsValid(minXID));
  
! 	/* init oldest_datname to sync with my minXID */
  	namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
  
  	/*
! 	 * Note: the "already wrapped" case should now be impossible due to the
! 	 * defenses in GetNewTransactionId, but we keep it anyway.
  	 */
  	relation = heap_open(DatabaseRelationId, AccessShareLock);
  
***************
*** 853,872 ****
  		if (!dbform->datallowconn)
  			continue;
  
! 		if (TransactionIdIsNormal(dbform->datvacuumxid))
  		{
! 			if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
! 				vacuumAlreadyWrapped = true;
! 			else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
! 				vacuumXID = dbform->datvacuumxid;
! 		}
! 		if (TransactionIdIsNormal(dbform->datfrozenxid))
! 		{
! 			if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
! 				frozenAlreadyWrapped = true;
! 			else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
  			{
! 				frozenXID = dbform->datfrozenxid;
  				namecpy(&oldest_datname, &dbform->datname);
  			}
  		}
--- 894,906 ----
  		if (!dbform->datallowconn)
  			continue;
  
! 		if (TransactionIdIsNormal(dbform->datminxid))
  		{
! 			if (TransactionIdPrecedes(myXID, dbform->datminxid))
! 				alreadyWrapped = true;
! 			else if (TransactionIdPrecedes(dbform->datminxid, minXID))
  			{
! 				minXID = dbform->datminxid;
  				namecpy(&oldest_datname, &dbform->datname);
  			}
  		}
***************
*** 876,901 ****
  
  	heap_close(relation, AccessShareLock);
  
! 	/*
! 	 * Do not truncate CLOG if we seem to have suffered wraparound
! 	 * already; the computed minimum XID might be bogus.
! 	 */
! 	if (vacuumAlreadyWrapped)
! 	{
! 		ereport(WARNING,
! 				(errmsg("some databases have not been vacuumed in over 2 billion transactions"),
! 				 errdetail("You may have already suffered transaction-wraparound data loss.")));
! 		return;
! 	}
! 
! 	/* Truncate CLOG to the oldest vacuumxid */
! 	TruncateCLOG(vacuumXID);
  
  	/*
  	 * Do not update varsup.c if we seem to have suffered wraparound
  	 * already; the computed XID might be bogus.
  	 */
! 	if (frozenAlreadyWrapped)
  	{
  		ereport(WARNING,
  				(errmsg("some databases have not been vacuumed in over 1 billion transactions"),
--- 910,923 ----
  
  	heap_close(relation, AccessShareLock);
  
! 	/* Truncate CLOG to the oldest minxid */
! 	TruncateCLOG(minXID);
  
  	/*
  	 * Do not update varsup.c if we seem to have suffered wraparound
  	 * already; the computed XID might be bogus.
  	 */
! 	if (alreadyWrapped)
  	{
  		ereport(WARNING,
  				(errmsg("some databases have not been vacuumed in over 1 billion transactions"),
***************
*** 904,913 ****
  	}
  
  	/* Update the wrap limit for GetNewTransactionId */
! 	SetTransactionIdLimit(frozenXID, &oldest_datname);
  
  	/* Give warning about impending wraparound problems */
! 	age = (int32) (myXID - frozenXID);
  	if (age > (int32) ((MaxTransactionId >> 3) * 3))
  		ereport(WARNING,
  				(errmsg("database \"%s\" must be vacuumed within %u transactions",
--- 926,935 ----
  	}
  
  	/* Update the wrap limit for GetNewTransactionId */
! 	SetTransactionIdLimit(minXID, &oldest_datname);
  
  	/* Give warning about impending wraparound problems */
! 	age = (int32) (myXID - minXID);
  	if (age > (int32) ((MaxTransactionId >> 3) * 3))
  		ereport(WARNING,
  				(errmsg("database \"%s\" must be vacuumed within %u transactions",
***************
*** 929,955 ****
  /*
   *	vacuum_rel() -- vacuum one heap relation
   *
-  *		Returns TRUE if we actually processed the relation (or can ignore it
-  *		for some reason), FALSE if we failed to process it due to permissions
-  *		or other reasons.  (A FALSE result really means that some data
-  *		may have been left unvacuumed, so we can't update XID stats.)
-  *
   *		Doing one heap at a time incurs extra overhead, since we need to
   *		check that the heap exists again just before we vacuum it.	The
   *		reason that we do this is so that vacuuming can be spread across
   *		many small transactions.  Otherwise, two-phase locking would require
   *		us to lock the entire database during one pass of the vacuum cleaner.
   *
   *		At entry and exit, we are not inside a transaction.
   */
! static bool
  vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
  {
  	LOCKMODE	lmode;
  	Relation	onerel;
  	LockRelId	onerelid;
  	Oid			toast_relid;
! 	bool		result;
  
  	/* Begin a transaction for vacuuming this relation */
  	StartTransactionCommand();
--- 951,974 ----
  /*
   *	vacuum_rel() -- vacuum one heap relation
   *
   *		Doing one heap at a time incurs extra overhead, since we need to
   *		check that the heap exists again just before we vacuum it.	The
   *		reason that we do this is so that vacuuming can be spread across
   *		many small transactions.  Otherwise, two-phase locking would require
   *		us to lock the entire database during one pass of the vacuum cleaner.
   *
+  *		The return value is the pg_class.relminxid previous to the vacuum.
+  *
   *		At entry and exit, we are not inside a transaction.
   */
! static TransactionId
  vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
  {
  	LOCKMODE	lmode;
  	Relation	onerel;
  	LockRelId	onerelid;
  	Oid			toast_relid;
! 	TransactionId prevmin;
  
  	/* Begin a transaction for vacuuming this relation */
  	StartTransactionCommand();
***************
*** 978,984 ****
  	{
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return true;			/* okay 'cause no data there */
  	}
  
  	/*
--- 997,1003 ----
  	{
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return InvalidTransactionId;
  	}
  
  	/*
***************
*** 1012,1018 ****
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return false;
  	}
  
  	/*
--- 1031,1037 ----
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return InvalidTransactionId;
  	}
  
  	/*
***************
*** 1027,1033 ****
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return false;
  	}
  
  	/*
--- 1046,1052 ----
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return InvalidTransactionId;
  	}
  
  	/*
***************
*** 1042,1049 ****
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return true;			/* assume no long-lived data in temp
! 								 * tables */
  	}
  
  	/*
--- 1061,1068 ----
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return InvalidTransactionId;	/* assume no long-lived data in temp
! 										 * tables */
  	}
  
  	/*
***************
*** 1068,1078 ****
  	 * Do the actual work --- either FULL or "lazy" vacuum
  	 */
  	if (vacstmt->full)
! 		full_vacuum_rel(onerel, vacstmt);
  	else
! 		lazy_vacuum_rel(onerel, vacstmt);
! 
! 	result = true;				/* did the vacuum */
  
  	/* all done with this class, but hold lock until commit */
  	relation_close(onerel, NoLock);
--- 1087,1095 ----
  	 * Do the actual work --- either FULL or "lazy" vacuum
  	 */
  	if (vacstmt->full)
! 		prevmin = full_vacuum_rel(onerel, vacstmt);
  	else
! 		prevmin = lazy_vacuum_rel(onerel, vacstmt);
  
  	/* all done with this class, but hold lock until commit */
  	relation_close(onerel, NoLock);
***************
*** 1089,1099 ****
  	 * "analyze" will not get done on the toast table.	This is good,
  	 * because the toaster always uses hardcoded index access and
  	 * statistics are totally unimportant for toast relations.
  	 */
  	if (toast_relid != InvalidOid)
  	{
! 		if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
! 			result = false;		/* failed to vacuum the TOAST table? */
  	}
  
  	/*
--- 1106,1117 ----
  	 * "analyze" will not get done on the toast table.	This is good,
  	 * because the toaster always uses hardcoded index access and
  	 * statistics are totally unimportant for toast relations.
+ 	 *
+ 	 * Note we ignore the TOAST table for the "minxid" calculations.
  	 */
  	if (toast_relid != InvalidOid)
  	{
! 		vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE);
  	}
  
  	/*
***************
*** 1101,1107 ****
  	 */
  	UnlockRelationForSession(&onerelid, lmode);
  
! 	return result;
  }
  
  
--- 1119,1125 ----
  	 */
  	UnlockRelationForSession(&onerelid, lmode);
  
! 	return prevmin;
  }
  
  
***************
*** 1121,1128 ****
   *
   *		At entry, we have already established a transaction and opened
   *		and locked the relation.
   */
! static void
  full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
  {
  	VacPageListData vacuum_pages;		/* List of pages to vacuum and/or
--- 1139,1148 ----
   *
   *		At entry, we have already established a transaction and opened
   *		and locked the relation.
+  *
+  *		The return value is the pg_class.relminxid previous to the vacuum.
   */
! static TransactionId
  full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
  {
  	VacPageListData vacuum_pages;		/* List of pages to vacuum and/or
***************
*** 1133,1138 ****
--- 1153,1161 ----
  	int			nindexes,
  				i;
  	VRelStats  *vacrelstats;
+ 	TransactionId FreezeLimit,
+ 				  OldestXmin,
+ 				  prevmin;
  
  	vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
  						  &OldestXmin, &FreezeLimit);
***************
*** 1145,1153 ****
  	vacrelstats->rel_tuples = 0;
  	vacrelstats->hasindex = false;
  
  	/* scan the heap */
  	vacuum_pages.num_pages = fraged_pages.num_pages = 0;
! 	scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
  
  	/* Now open all indexes of the relation */
  	vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
--- 1168,1183 ----
  	vacrelstats->rel_tuples = 0;
  	vacrelstats->hasindex = false;
  
+ 	/*
+ 	 * Set initial minimum Xid, which will be updated if a smaller Xid is found
+ 	 * in the relation.
+ 	 */
+ 	vacrelstats->minxid = RecentXmin;
+ 
  	/* scan the heap */
  	vacuum_pages.num_pages = fraged_pages.num_pages = 0;
! 	scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages, FreezeLimit,
! 			  OldestXmin);
  
  	/* Now open all indexes of the relation */
  	vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
***************
*** 1175,1181 ****
  	{
  		/* Try to shrink heap */
  		repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
! 					nindexes, Irel);
  		vac_close_indexes(nindexes, Irel, NoLock);
  	}
  	else
--- 1205,1211 ----
  	{
  		/* Try to shrink heap */
  		repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
! 					nindexes, Irel, OldestXmin);
  		vac_close_indexes(nindexes, Irel, NoLock);
  	}
  	else
***************
*** 1192,1203 ****
  	vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
  
  	/* update statistics in pg_class */
! 	vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
! 						vacrelstats->rel_tuples, vacrelstats->hasindex);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
  		   				 vacstmt->analyze, vacrelstats->rel_tuples);
  }
  
  
--- 1222,1237 ----
  	vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
  
  	/* update statistics in pg_class */
! 	prevmin = vac_update_relstats(RelationGetRelid(onerel),
! 								  vacrelstats->rel_pages,
! 								  vacrelstats->rel_tuples,
! 								  vacrelstats->hasindex, vacrelstats->minxid);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
  		   				 vacstmt->analyze, vacrelstats->rel_tuples);
+ 
+ 	return prevmin;
  }
  
  
***************
*** 1207,1222 ****
   *		This routine sets commit status bits, constructs vacuum_pages (list
   *		of pages we need to compact free space on and/or clean indexes of
   *		deleted tuples), constructs fraged_pages (list of pages with free
!  *		space that tuples could be moved into), and calculates statistics
!  *		on the number of live tuples in the heap.
   */
  static void
  scan_heap(VRelStats *vacrelstats, Relation onerel,
! 		  VacPageList vacuum_pages, VacPageList fraged_pages)
  {
  	BlockNumber nblocks,
  				blkno;
- 	HeapTupleData tuple;
  	char	   *relname;
  	VacPage		vacpage;
  	BlockNumber empty_pages,
--- 1241,1257 ----
   *		This routine sets commit status bits, constructs vacuum_pages (list
   *		of pages we need to compact free space on and/or clean indexes of
   *		deleted tuples), constructs fraged_pages (list of pages with free
!  *		space that tuples could be moved into), calculates statistics on the
!  *		number of live tuples in the heap, and figures out the minimum normal
!  *		Xid present anywhere on the table.
   */
  static void
  scan_heap(VRelStats *vacrelstats, Relation onerel,
! 		  VacPageList vacuum_pages, VacPageList fraged_pages,
! 		  TransactionId FreezeLimit, TransactionId OldestXmin)
  {
  	BlockNumber nblocks,
  				blkno;
  	char	   *relname;
  	VacPage		vacpage;
  	BlockNumber empty_pages,
***************
*** 1325,1330 ****
--- 1360,1366 ----
  		{
  			ItemId		itemid = PageGetItemId(page, offnum);
  			bool		tupgone = false;
+ 			HeapTupleData tuple;
  
  			/*
  			 * Collect un-used items too - it's possible to have indexes
***************
*** 1468,1473 ****
--- 1504,1521 ----
  					min_tlen = tuple.t_len;
  				if (tuple.t_len > max_tlen)
  					max_tlen = tuple.t_len;
+ 
+ 				/* Checks for pg_class.relminxid */
+ 				if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
+ 					TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+ 										  vacrelstats->minxid))
+ 					vacrelstats->minxid = HeapTupleHeaderGetXmin(tuple.t_data);
+ 
+ 				if (TransactionIdIsNormal(HeapTupleHeaderGetXmax(tuple.t_data)) &&
+ 					TransactionIdPrecedes(HeapTupleHeaderGetXmax(tuple.t_data),
+ 										  vacrelstats->minxid))
+ 					vacrelstats->minxid = HeapTupleHeaderGetXmax(tuple.t_data);
+ 
  			}
  		}						/* scan along page */
  
***************
*** 1609,1615 ****
  static void
  repair_frag(VRelStats *vacrelstats, Relation onerel,
  			VacPageList vacuum_pages, VacPageList fraged_pages,
! 			int nindexes, Relation *Irel)
  {
  	TransactionId myXID = GetCurrentTransactionId();
  	Buffer		dst_buffer = InvalidBuffer;
--- 1657,1663 ----
  static void
  repair_frag(VRelStats *vacrelstats, Relation onerel,
  			VacPageList vacuum_pages, VacPageList fraged_pages,
! 			int nindexes, Relation *Irel, TransactionId OldestXmin)
  {
  	TransactionId myXID = GetCurrentTransactionId();
  	Buffer		dst_buffer = InvalidBuffer;
***************
*** 2967,2973 ****
  	/* now update statistics in pg_class */
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages, stats->num_index_tuples,
! 						false);
  
  	ereport(elevel,
  	   (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
--- 3015,3021 ----
  	/* now update statistics in pg_class */
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages, stats->num_index_tuples,
! 						false, InvalidTransactionId);
  
  	ereport(elevel,
  	   (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
***************
*** 3034,3040 ****
  	/* now update statistics in pg_class */
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages, stats->num_index_tuples,
! 						false);
  
  	ereport(elevel,
  	   (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
--- 3082,3088 ----
  	/* now update statistics in pg_class */
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages, stats->num_index_tuples,
! 						false, InvalidTransactionId);
  
  	ereport(elevel,
  	   (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
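
[Reader's note between file diffs, not part of the patch.] The vacuum.c hunks above keep a running minimum Xid in two places (currmin in the per-relation loop, newMinXID in the pg_class scan) using the same test: take the candidate if no minimum has been recorded yet, or if the candidate is a normal Xid that logically precedes the current minimum. A minimal standalone sketch of that rule follows. The names xid_t, xid_is_valid, xid_is_normal, xid_precedes and xid_track_min are hypothetical stand-ins for TransactionId, TransactionIdIsValid, TransactionIdIsNormal, TransactionIdPrecedes and the inline update. The comparison assumes the usual modulo-2^32 ordering with 0 as the invalid Xid and 3 as the first normal one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for TransactionId: a 32-bit counter that wraps. */
typedef uint32_t xid_t;

#define XID_INVALID      ((xid_t) 0)   /* assumed: InvalidTransactionId */
#define XID_FIRST_NORMAL ((xid_t) 3)   /* assumed: FirstNormalTransactionId */

static bool xid_is_valid(xid_t x)  { return x != XID_INVALID; }
static bool xid_is_normal(xid_t x) { return x >= XID_FIRST_NORMAL; }

/*
 * True if a is logically older than b.  Normal Xids are compared
 * modulo 2^32, so the answer stays right across counter wraparound;
 * special (non-normal) Xids fall back to a plain unsigned comparison.
 */
static bool
xid_precedes(xid_t a, xid_t b)
{
    if (!xid_is_normal(a) || !xid_is_normal(b))
        return a < b;
    return (int32_t) (a - b) < 0;
}

/*
 * The update rule applied to currmin / newMinXID in the patch:
 * adopt the candidate if nothing has been recorded yet, or if it is
 * a normal Xid that precedes the current minimum.
 */
static xid_t
xid_track_min(xid_t currmin, xid_t candidate)
{
    if (!xid_is_valid(currmin) ||
        (xid_is_normal(candidate) && xid_precedes(candidate, currmin)))
        return candidate;
    return currmin;
}

/* Small self-check showing the intended behaviour. */
static void
xid_selfcheck(void)
{
    assert(xid_track_min(XID_INVALID, 100) == 100);  /* first value wins */
    assert(xid_track_min(100, 50) == 50);            /* older Xid replaces */
    assert(xid_track_min(100, 200) == 100);          /* newer Xid ignored */
    assert(xid_track_min(100, XID_INVALID) == 100);  /* skipped relation */
}
```

Under this rule a relation that vacuum_rel skipped (and so reported InvalidTransactionId) leaves an already-recorded minimum untouched. Note that the very first candidate is adopted whatever it is, so the result is only meaningful once at least one normal Xid has been seen.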
Index: src/backend/commands/vacuumlazy.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/commands/vacuumlazy.c,v
retrieving revision 1.58
diff -c -r1.58 vacuumlazy.c
*** src/backend/commands/vacuumlazy.c	2 Sep 2005 19:02:20 -0000	1.58
--- src/backend/commands/vacuumlazy.c	22 Sep 2005 14:30:45 -0000
***************
*** 70,75 ****
--- 70,76 ----
  	double		tuples_deleted;
  	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
  	Size		threshold;		/* minimum interesting free space */
+ 	TransactionId minxid;		/* minimum Xid present anywhere in table */
  	/* List of TIDs of tuples we intend to delete */
  	/* NB: this list is ordered by TID address */
  	int			num_dead_tuples;	/* current # of entries */
***************
*** 86,98 ****
  
  static int	elevel = -1;
  
- static TransactionId OldestXmin;
- static TransactionId FreezeLimit;
- 
  
  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_scan_index(Relation indrel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
--- 87,97 ----
  
  static int	elevel = -1;
  
  
  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes, TransactionId FreezeLimit,
! 			   TransactionId OldestXmin);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_scan_index(Relation indrel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
***************
*** 101,109 ****
  							  LVRelStats *vacrelstats);
  static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
  				 int tupindex, LVRelStats *vacrelstats);
! static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
  static BlockNumber count_nondeletable_pages(Relation onerel,
! 						 LVRelStats *vacrelstats);
  static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
  static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
  					   ItemPointer itemptr);
--- 100,109 ----
  							  LVRelStats *vacrelstats);
  static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
  				 int tupindex, LVRelStats *vacrelstats);
! static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats,
! 							   TransactionId OldestXmin);
  static BlockNumber count_nondeletable_pages(Relation onerel,
! 						 LVRelStats *vacrelstats, TransactionId OldestXmin);
  static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
  static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
  					   ItemPointer itemptr);
***************
*** 124,131 ****
   *
   *		At entry, we have already established a transaction and opened
   *		and locked the relation.
   */
! void
  lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
  {
  	LVRelStats *vacrelstats;
--- 124,133 ----
   *
   *		At entry, we have already established a transaction and opened
   *		and locked the relation.
+  *
+  *		The return value is the pg_class.relminxid previous to the vacuum.
   */
! TransactionId
  lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
  {
  	LVRelStats *vacrelstats;
***************
*** 133,138 ****
--- 135,143 ----
  	int			nindexes;
  	bool		hasindex;
  	BlockNumber possibly_freeable;
+ 	TransactionId prevmin,
+ 				  OldestXmin,
+ 				  FreezeLimit;
  
  	if (vacstmt->verbose)
  		elevel = INFO;
***************
*** 148,159 ****
  	/* XXX should we scale it up or down?  Adjust vacuum.c too, if so */
  	vacrelstats->threshold = GetAvgFSMRequestSize(&onerel->rd_node);
  
  	/* Open all indexes of the relation */
  	vac_open_indexes(onerel, ShareUpdateExclusiveLock, &nindexes, &Irel);
  	hasindex = (nindexes > 0);
  
  	/* Do the vacuuming */
! 	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes);
  
  	/* Done with indexes */
  	vac_close_indexes(nindexes, Irel, NoLock);
--- 153,167 ----
  	/* XXX should we scale it up or down?  Adjust vacuum.c too, if so */
  	vacrelstats->threshold = GetAvgFSMRequestSize(&onerel->rd_node);
  
+ 	/* Set initial minimum Xid in table */
+ 	vacrelstats->minxid = RecentXmin;
+ 
  	/* Open all indexes of the relation */
  	vac_open_indexes(onerel, ShareUpdateExclusiveLock, &nindexes, &Irel);
  	hasindex = (nindexes > 0);
  
  	/* Do the vacuuming */
! 	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, FreezeLimit, OldestXmin);
  
  	/* Done with indexes */
  	vac_close_indexes(nindexes, Irel, NoLock);
***************
*** 167,186 ****
  	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
  	if (possibly_freeable >= REL_TRUNCATE_MINIMUM ||
  	 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION)
! 		lazy_truncate_heap(onerel, vacrelstats);
  
  	/* Update shared free space map with final free space info */
  	lazy_update_fsm(onerel, vacrelstats);
  
  	/* Update statistics in pg_class */
! 	vac_update_relstats(RelationGetRelid(onerel),
! 						vacrelstats->rel_pages,
! 						vacrelstats->rel_tuples,
! 						hasindex);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
  		   				 vacstmt->analyze, vacrelstats->rel_tuples);
  }
  
  
--- 175,196 ----
  	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
  	if (possibly_freeable >= REL_TRUNCATE_MINIMUM ||
  	 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION)
! 		lazy_truncate_heap(onerel, vacrelstats, OldestXmin);
  
  	/* Update shared free space map with final free space info */
  	lazy_update_fsm(onerel, vacrelstats);
  
  	/* Update statistics in pg_class */
! 	prevmin = vac_update_relstats(RelationGetRelid(onerel),
! 								  vacrelstats->rel_pages,
! 								  vacrelstats->rel_tuples, hasindex,
! 								  vacrelstats->minxid);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
  		   				 vacstmt->analyze, vacrelstats->rel_tuples);
+ 
+ 	return prevmin;
  }
  
  
***************
*** 191,200 ****
   *		and pages with free space, and calculates statistics on the number
   *		of live tuples in the heap.  When done, or when we run low on space
   *		for dead-tuple TIDs, invoke vacuuming of indexes and heap.
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes)
  {
  	BlockNumber nblocks,
  				blkno;
--- 201,214 ----
   *		and pages with free space, and calculates statistics on the number
   *		of live tuples in the heap.  When done, or when we run low on space
   *		for dead-tuple TIDs, invoke vacuuming of indexes and heap.
+  *
+  *		It also updates the minimum Xid found anywhere on the table, for
+  *		pg_class.relminxid.
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes, TransactionId FreezeLimit,
! 			   TransactionId OldestXmin)
  {
  	BlockNumber nblocks,
  				blkno;
***************
*** 420,425 ****
--- 434,450 ----
  			{
  				num_tuples += 1;
  				hastup = true;
+ 
+ 				/* Checks for pg_class.relminxid */
+ 				if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
+ 					TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+ 										  vacrelstats->minxid))
+ 					vacrelstats->minxid = HeapTupleHeaderGetXmin(tuple.t_data);
+ 
+ 				if (TransactionIdIsNormal(HeapTupleHeaderGetXmax(tuple.t_data)) &&
+ 					TransactionIdPrecedes(HeapTupleHeaderGetXmax(tuple.t_data),
+ 										  vacrelstats->minxid))
+ 					vacrelstats->minxid = HeapTupleHeaderGetXmax(tuple.t_data);
  			}
  		}						/* scan along page */
  
***************
*** 644,650 ****
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages,
  						stats->num_index_tuples,
! 						false);
  
  	ereport(elevel,
  	   (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
--- 669,675 ----
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages,
  						stats->num_index_tuples,
! 						false, InvalidTransactionId);
  
  	ereport(elevel,
  	   (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
***************
*** 720,726 ****
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages,
  						stats->num_index_tuples,
! 						false);
  
  	ereport(elevel,
  	   (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
--- 745,751 ----
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages,
  						stats->num_index_tuples,
! 						false, InvalidTransactionId);
  
  	ereport(elevel,
  	   (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
***************
*** 741,747 ****
   * lazy_truncate_heap - try to truncate off any empty pages at the end
   */
  static void
! lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats)
  {
  	BlockNumber old_rel_pages = vacrelstats->rel_pages;
  	BlockNumber new_rel_pages;
--- 766,773 ----
   * lazy_truncate_heap - try to truncate off any empty pages at the end
   */
  static void
! lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats,
! 				   TransactionId OldestXmin)
  {
  	BlockNumber old_rel_pages = vacrelstats->rel_pages;
  	BlockNumber new_rel_pages;
***************
*** 783,789 ****
  	 * optional, because other backends could have added tuples to these
  	 * pages whilst we were vacuuming.
  	 */
! 	new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);
  
  	if (new_rel_pages >= old_rel_pages)
  	{
--- 809,815 ----
  	 * optional, because other backends could have added tuples to these
  	 * pages whilst we were vacuuming.
  	 */
! 	new_rel_pages = count_nondeletable_pages(onerel, vacrelstats, OldestXmin);
  
  	if (new_rel_pages >= old_rel_pages)
  	{
***************
*** 838,844 ****
   * Returns number of nondeletable pages (last nonempty page + 1).
   */
  static BlockNumber
! count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
  {
  	BlockNumber blkno;
  	HeapTupleData tuple;
--- 864,871 ----
   * Returns number of nondeletable pages (last nonempty page + 1).
   */
  static BlockNumber
! count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats,
! 						 TransactionId OldestXmin)
  {
  	BlockNumber blkno;
  	HeapTupleData tuple;
Index: src/backend/libpq/hba.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/libpq/hba.c,v
retrieving revision 1.147
diff -c -r1.147 hba.c
*** src/backend/libpq/hba.c	11 Aug 2005 21:11:44 -0000	1.147
--- src/backend/libpq/hba.c	20 Sep 2005 20:42:35 -0000
***************
*** 1000,1015 ****
   *	dbname: gets database name (must be of size NAMEDATALEN bytes)
   *	dboid: gets database OID
   *	dbtablespace: gets database's default tablespace's OID
!  *	dbfrozenxid: gets database's frozen XID
!  *	dbvacuumxid: gets database's vacuum XID
   *
   * This is not much related to the other functions in hba.c, but we put it
   * here because it uses the next_token() infrastructure.
   */
  bool
  read_pg_database_line(FILE *fp, char *dbname, Oid *dboid,
! 					  Oid *dbtablespace, TransactionId *dbfrozenxid,
! 					  TransactionId *dbvacuumxid)
  {
  	char		buf[MAX_TOKEN];
  
--- 1000,1013 ----
   *	dbname: gets database name (must be of size NAMEDATALEN bytes)
   *	dboid: gets database OID
   *	dbtablespace: gets database's default tablespace's OID
!  *	dbminxid: gets database's minimum XID
   *
   * This is not much related to the other functions in hba.c, but we put it
   * here because it uses the next_token() infrastructure.
   */
  bool
  read_pg_database_line(FILE *fp, char *dbname, Oid *dboid,
! 					  Oid *dbtablespace, TransactionId *dbminxid)
  {
  	char		buf[MAX_TOKEN];
  
***************
*** 1031,1041 ****
  	next_token(fp, buf, sizeof(buf));
  	if (!isdigit((unsigned char) buf[0]))
  		elog(FATAL, "bad data in flat pg_database file");
! 	*dbfrozenxid = atoxid(buf);
! 	next_token(fp, buf, sizeof(buf));
! 	if (!isdigit((unsigned char) buf[0]))
! 		elog(FATAL, "bad data in flat pg_database file");
! 	*dbvacuumxid = atoxid(buf);
  	/* expect EOL next */
  	if (next_token(fp, buf, sizeof(buf)))
  		elog(FATAL, "bad data in flat pg_database file");
--- 1029,1035 ----
  	next_token(fp, buf, sizeof(buf));
  	if (!isdigit((unsigned char) buf[0]))
  		elog(FATAL, "bad data in flat pg_database file");
! 	*dbminxid = atoxid(buf);
  	/* expect EOL next */
  	if (next_token(fp, buf, sizeof(buf)))
  		elog(FATAL, "bad data in flat pg_database file");
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.4
diff -c -r1.4 autovacuum.c
*** src/backend/postmaster/autovacuum.c	15 Aug 2005 16:25:17 -0000	1.4
--- src/backend/postmaster/autovacuum.c	20 Sep 2005 20:45:55 -0000
***************
*** 76,83 ****
  {
  	Oid				oid;
  	char		   *name;
! 	TransactionId	frozenxid;
! 	TransactionId	vacuumxid;
  	PgStat_StatDBEntry *entry;
  	int32			age;
  } autovac_dbase;
--- 76,82 ----
  {
  	Oid				oid;
  	char		   *name;
! 	TransactionId	minxid;
  	PgStat_StatDBEntry *entry;
  	int32			age;
  } autovac_dbase;
***************
*** 328,335 ****
  	{
  		autovac_dbase  *tmp = lfirst(cell);
  		bool			this_whole_db;
- 		int32			freeze_age,
- 						vacuum_age;
  
  		/*
  		 * We look for the database that most urgently needs a database-wide
--- 327,332 ----
***************
*** 337,349 ****
  		 * transactions sooner than vacuum.c's vac_truncate_clog() would
  		 * decide to start giving warnings.  If any such db is found, we
  		 * ignore all other dbs.
! 		 *
! 		 * Unlike vacuum.c, we also look at vacuumxid.  This is so that
! 		 * pg_clog can be kept trimmed to a reasonable size.
! 		 */
! 		freeze_age = (int32) (nextXid - tmp->frozenxid);
! 		vacuum_age = (int32) (nextXid - tmp->vacuumxid);
! 		tmp->age = Max(freeze_age, vacuum_age);
  
  		this_whole_db = (tmp->age >
  						 (int32) ((MaxTransactionId >> 3) * 3 - 100000));
--- 334,341 ----
  		 * transactions sooner than vacuum.c's vac_truncate_clog() would
  		 * decide to start giving warnings.  If any such db is found, we
  		 * ignore all other dbs.
! 		 */
! 		tmp->age = (int32) (nextXid - tmp->minxid);
  
  		this_whole_db = (tmp->age >
  						 (int32) ((MaxTransactionId >> 3) * 3 - 100000));
***************
*** 443,450 ****
  	FILE   *db_file;
  	Oid		db_id;
  	Oid		db_tablespace;
! 	TransactionId db_frozenxid;
! 	TransactionId db_vacuumxid;
  
  	filename = database_getflatfilename();
  	db_file = AllocateFile(filename, "r");
--- 435,441 ----
  	FILE   *db_file;
  	Oid		db_id;
  	Oid		db_tablespace;
! 	TransactionId db_minxid;
  
  	filename = database_getflatfilename();
  	db_file = AllocateFile(filename, "r");
***************
*** 454,461 ****
  				 errmsg("could not open file \"%s\": %m", filename)));
  
  	while (read_pg_database_line(db_file, thisname, &db_id,
! 								 &db_tablespace, &db_frozenxid,
! 								 &db_vacuumxid))
  	{
  		autovac_dbase	*db;
  
--- 445,451 ----
  				 errmsg("could not open file \"%s\": %m", filename)));
  
  	while (read_pg_database_line(db_file, thisname, &db_id,
! 								 &db_tablespace, &db_minxid))
  	{
  		autovac_dbase	*db;
  
***************
*** 463,470 ****
  
  		db->oid = db_id;
  		db->name = pstrdup(thisname);
! 		db->frozenxid = db_frozenxid;
! 		db->vacuumxid = db_vacuumxid;
  		/* these get set later: */
  		db->entry = NULL;
  		db->age = 0;
--- 453,459 ----
  
  		db->oid = db_id;
  		db->name = pstrdup(thisname);
! 		db->minxid = db_minxid;
  		/* these get set later: */
  		db->entry = NULL;
  		db->age = 0;
Index: src/backend/utils/init/flatfiles.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/utils/init/flatfiles.c,v
retrieving revision 1.14
diff -c -r1.14 flatfiles.c
*** src/backend/utils/init/flatfiles.c	11 Aug 2005 21:11:46 -0000	1.14
--- src/backend/utils/init/flatfiles.c	20 Sep 2005 20:25:32 -0000
***************
*** 163,169 ****
  /*
   * write_database_file: update the flat database file
   *
!  * A side effect is to determine the oldest database's datfrozenxid
   * so we can set or update the XID wrap limit.
   */
  static void
--- 163,169 ----
  /*
   * write_database_file: update the flat database file
   *
!  * A side effect is to determine the oldest database's datminxid
   * so we can set or update the XID wrap limit.
   */
  static void
***************
*** 177,183 ****
  	HeapScanDesc scan;
  	HeapTuple	tuple;
  	NameData	oldest_datname;
! 	TransactionId oldest_datfrozenxid = InvalidTransactionId;
  
  	/*
  	 * Create a temporary filename to be renamed later.  This prevents the
--- 177,183 ----
  	HeapScanDesc scan;
  	HeapTuple	tuple;
  	NameData	oldest_datname;
! 	TransactionId oldest_datminxid = InvalidTransactionId;
  
  	/*
  	 * Create a temporary filename to be renamed later.  This prevents the
***************
*** 208,234 ****
  		char	   *datname;
  		Oid			datoid;
  		Oid			dattablespace;
! 		TransactionId datfrozenxid,
! 					  datvacuumxid;
  
  		datname = NameStr(dbform->datname);
  		datoid = HeapTupleGetOid(tuple);
  		dattablespace = dbform->dattablespace;
! 		datfrozenxid = dbform->datfrozenxid;
! 		datvacuumxid = dbform->datvacuumxid;
  
  		/*
! 		 * Identify the oldest datfrozenxid, ignoring databases that are not
  		 * connectable (we assume they are safely frozen).  This must match
  		 * the logic in vac_truncate_clog() in vacuum.c.
  		 */
  		if (dbform->datallowconn &&
! 			TransactionIdIsNormal(datfrozenxid))
  		{
! 			if (oldest_datfrozenxid == InvalidTransactionId ||
! 				TransactionIdPrecedes(datfrozenxid, oldest_datfrozenxid))
  			{
! 				oldest_datfrozenxid = datfrozenxid;
  				namestrcpy(&oldest_datname, datname);
  			}
  		}
--- 208,232 ----
  		char	   *datname;
  		Oid			datoid;
  		Oid			dattablespace;
! 		TransactionId datminxid;
  
  		datname = NameStr(dbform->datname);
  		datoid = HeapTupleGetOid(tuple);
  		dattablespace = dbform->dattablespace;
! 		datminxid = dbform->datminxid;
  
  		/*
! 		 * Identify the oldest datminxid, ignoring databases that are not
  		 * connectable (we assume they are safely frozen).  This must match
  		 * the logic in vac_truncate_clog() in vacuum.c.
  		 */
  		if (dbform->datallowconn &&
! 			TransactionIdIsNormal(datminxid))
  		{
! 			if (oldest_datminxid == InvalidTransactionId ||
! 				TransactionIdPrecedes(datminxid, oldest_datminxid))
  			{
! 				oldest_datminxid = datminxid;
  				namestrcpy(&oldest_datname, datname);
  			}
  		}
***************
*** 244,257 ****
  		}
  
  		/*
! 		 * The file format is: "dbname" oid tablespace frozenxid vacuumxid
  		 *
! 		 * The xids are not needed for backend startup, but are of use to
  		 * autovacuum, and might also be helpful for forensic purposes.
  		 */
  		fputs_quote(datname, fp);
! 		fprintf(fp, " %u %u %u %u\n",
! 				datoid, dattablespace, datfrozenxid, datvacuumxid);
  	}
  	heap_endscan(scan);
  
--- 242,255 ----
  		}
  
  		/*
! 		 * The file format is: "dbname" oid tablespace minxid
  		 *
! 		 * The minxid is not needed for backend startup, but is of use to
  		 * autovacuum, and might also be helpful for forensic purposes.
  		 */
  		fputs_quote(datname, fp);
! 		fprintf(fp, " %u %u %u\n",
! 				datoid, dattablespace, datminxid);
  	}
  	heap_endscan(scan);
  
***************
*** 272,281 ****
  						tempname, filename)));
  
  	/*
! 	 * Set the transaction ID wrap limit using the oldest datfrozenxid
  	 */
! 	if (oldest_datfrozenxid != InvalidTransactionId)
! 		SetTransactionIdLimit(oldest_datfrozenxid, &oldest_datname);
  }
  
  
--- 270,279 ----
  						tempname, filename)));
  
  	/*
! 	 * Set the transaction ID wrap limit using the oldest datminxid
  	 */
! 	if (oldest_datminxid != InvalidTransactionId)
! 		SetTransactionIdLimit(oldest_datminxid, &oldest_datname);
  }
  
  
Index: src/backend/utils/init/postinit.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/utils/init/postinit.c,v
retrieving revision 1.157
diff -c -r1.157 postinit.c
*** src/backend/utils/init/postinit.c	11 Aug 2005 21:11:46 -0000	1.157
--- src/backend/utils/init/postinit.c	20 Sep 2005 21:01:46 -0000
***************
*** 91,98 ****
  				 errmsg("could not open file \"%s\": %m", filename)));
  
  	while (read_pg_database_line(db_file, thisname, db_id,
! 								 db_tablespace, &dummyxid,
! 								 &dummyxid))
  	{
  		if (strcmp(thisname, name) == 0)
  		{
--- 91,97 ----
  				 errmsg("could not open file \"%s\": %m", filename)));
  
  	while (read_pg_database_line(db_file, thisname, db_id,
! 								 db_tablespace, &dummyxid))
  	{
  		if (strcmp(thisname, name) == 0)
  		{
Index: src/include/access/transam.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/access/transam.h,v
retrieving revision 1.55
diff -c -r1.55 transam.h
*** src/include/access/transam.h	12 Aug 2005 01:36:03 -0000	1.55
--- src/include/access/transam.h	20 Sep 2005 20:31:50 -0000
***************
*** 123,129 ****
  /* in transam/varsup.c */
  extern TransactionId GetNewTransactionId(bool isSubXact);
  extern TransactionId ReadNewTransactionId(void);
! extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
  								  Name oldest_datname);
  extern Oid	GetNewObjectId(void);
  
--- 123,129 ----
  /* in transam/varsup.c */
  extern TransactionId GetNewTransactionId(bool isSubXact);
  extern TransactionId ReadNewTransactionId(void);
! extern void SetTransactionIdLimit(TransactionId oldest_datminxid,
  								  Name oldest_datname);
  extern Oid	GetNewObjectId(void);
  
Index: src/include/catalog/pg_attribute.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/catalog/pg_attribute.h,v
retrieving revision 1.118
diff -c -r1.118 pg_attribute.h
*** src/include/catalog/pg_attribute.h	28 Jun 2005 05:09:04 -0000	1.118
--- src/include/catalog/pg_attribute.h	15 Sep 2005 21:50:29 -0000
***************
*** 406,412 ****
  { 1259, {"relhaspkey"},    16, -1,	1, 22, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
  { 1259, {"relhasrules"},   16, -1,	1, 23, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
  { 1259, {"relhassubclass"},16, -1,	1, 24, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
! { 1259, {"relacl"},		 1034, -1, -1, 25, 1, -1, -1, false, 'x', 'i', false, false, false, true, 0 }
  
  DATA(insert ( 1259 relname			19 -1 NAMEDATALEN	1 0 -1 -1 f p i t f f t 0));
  DATA(insert ( 1259 relnamespace		26 -1 4   2 0 -1 -1 t p i t f f t 0));
--- 406,413 ----
  { 1259, {"relhaspkey"},    16, -1,	1, 22, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
  { 1259, {"relhasrules"},   16, -1,	1, 23, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
  { 1259, {"relhassubclass"},16, -1,	1, 24, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
! { 1259, {"relminxid"},  28, -1,	4, 25, 0, -1, -1, true, 'p', 'i', true, false, false, true, 0 }, \
! { 1259, {"relacl"},		 1034, -1, -1, 26, 1, -1, -1, false, 'x', 'i', false, false, false, true, 0 }
  
  DATA(insert ( 1259 relname			19 -1 NAMEDATALEN	1 0 -1 -1 f p i t f f t 0));
  DATA(insert ( 1259 relnamespace		26 -1 4   2 0 -1 -1 t p i t f f t 0));
***************
*** 432,438 ****
  DATA(insert ( 1259 relhaspkey		16 -1 1  22 0 -1 -1 t p c t f f t 0));
  DATA(insert ( 1259 relhasrules		16 -1 1  23 0 -1 -1 t p c t f f t 0));
  DATA(insert ( 1259 relhassubclass	16 -1 1  24 0 -1 -1 t p c t f f t 0));
! DATA(insert ( 1259 relacl		  1034 -1 -1 25 1 -1 -1 f x i f f f t 0));
  DATA(insert ( 1259 ctid				27 0  6  -1 0 -1 -1 f p s t f f t 0));
  DATA(insert ( 1259 oid				26 0  4  -2 0 -1 -1 t p i t f f t 0));
  DATA(insert ( 1259 xmin				28 0  4  -3 0 -1 -1 t p i t f f t 0));
--- 433,440 ----
  DATA(insert ( 1259 relhaspkey		16 -1 1  22 0 -1 -1 t p c t f f t 0));
  DATA(insert ( 1259 relhasrules		16 -1 1  23 0 -1 -1 t p c t f f t 0));
  DATA(insert ( 1259 relhassubclass	16 -1 1  24 0 -1 -1 t p c t f f t 0));
! DATA(insert ( 1259 relminxid		28 -1 4  25 0 -1 -1 t p i t f f t 0));
! DATA(insert ( 1259 relacl		  1034 -1 -1 26 1 -1 -1 f x i f f f t 0));
  DATA(insert ( 1259 ctid				27 0  6  -1 0 -1 -1 f p s t f f t 0));
  DATA(insert ( 1259 oid				26 0  4  -2 0 -1 -1 t p i t f f t 0));
  DATA(insert ( 1259 xmin				28 0  4  -3 0 -1 -1 t p i t f f t 0));
Index: src/include/catalog/pg_class.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/catalog/pg_class.h,v
retrieving revision 1.89
diff -c -r1.89 pg_class.h
*** src/include/catalog/pg_class.h	28 Jun 2005 05:09:05 -0000	1.89
--- src/include/catalog/pg_class.h	15 Sep 2005 21:50:29 -0000
***************
*** 74,79 ****
--- 74,80 ----
  	bool		relhaspkey;		/* has PRIMARY KEY index */
  	bool		relhasrules;	/* has associated rules */
  	bool		relhassubclass; /* has derived classes */
+ 	TransactionId relminxid;	/* minimum Xid present in table */
  
  	/*
  	 * relacl may or may not be present, see note above!
***************
*** 83,89 ****
  
  /* Size of fixed part of pg_class tuples, not counting relacl or padding */
  #define CLASS_TUPLE_SIZE \
! 	 (offsetof(FormData_pg_class,relhassubclass) + sizeof(bool))
  
  /* ----------------
   *		Form_pg_class corresponds to a pointer to a tuple with
--- 84,90 ----
  
  /* Size of fixed part of pg_class tuples, not counting relacl or padding */
  #define CLASS_TUPLE_SIZE \
! 	 (offsetof(FormData_pg_class,relminxid) + sizeof(TransactionId))
  
  /* ----------------
   *		Form_pg_class corresponds to a pointer to a tuple with
***************
*** 103,110 ****
   *		relacl field.  This is a kluge.
   * ----------------
   */
! #define Natts_pg_class_fixed			24
! #define Natts_pg_class					25
  #define Anum_pg_class_relname			1
  #define Anum_pg_class_relnamespace		2
  #define Anum_pg_class_reltype			3
--- 104,111 ----
   *		relacl field.  This is a kluge.
   * ----------------
   */
! #define Natts_pg_class_fixed			25
! #define Natts_pg_class					26
  #define Anum_pg_class_relname			1
  #define Anum_pg_class_relnamespace		2
  #define Anum_pg_class_reltype			3
***************
*** 129,135 ****
  #define Anum_pg_class_relhaspkey		22
  #define Anum_pg_class_relhasrules		23
  #define Anum_pg_class_relhassubclass	24
! #define Anum_pg_class_relacl			25
  
  /* ----------------
   *		initial contents of pg_class
--- 130,137 ----
  #define Anum_pg_class_relhaspkey		22
  #define Anum_pg_class_relhasrules		23
  #define Anum_pg_class_relhassubclass	24
! #define Anum_pg_class_relminxid		25
! #define Anum_pg_class_relacl			26
  
  /* ----------------
   *		initial contents of pg_class
***************
*** 139,151 ****
   * ----------------
   */
  
! DATA(insert OID = 1247 (  pg_type		PGNSP 71 PGUID 0 1247 0 0 0 0 0 f f r 23 0 0 0 0 0 t f f f _null_ ));
  DESCR("");
! DATA(insert OID = 1249 (  pg_attribute	PGNSP 75 PGUID 0 1249 0 0 0 0 0 f f r 17 0 0 0 0 0 f f f f _null_ ));
  DESCR("");
! DATA(insert OID = 1255 (  pg_proc		PGNSP 81 PGUID 0 1255 0 0 0 0 0 f f r 18 0 0 0 0 0 t f f f _null_ ));
  DESCR("");
! DATA(insert OID = 1259 (  pg_class		PGNSP 83 PGUID 0 1259 0 0 0 0 0 f f r 25 0 0 0 0 0 t f f f _null_ ));
  DESCR("");
  
  #define		  RELKIND_INDEX			  'i'		/* secondary index */
--- 141,153 ----
   * ----------------
   */
  
! DATA(insert OID = 1247 (  pg_type		PGNSP 71 PGUID 0 1247 0 0 0 0 0 f f r 23 0 0 0 0 0 t f f f 0 _null_ ));
  DESCR("");
! DATA(insert OID = 1249 (  pg_attribute	PGNSP 75 PGUID 0 1249 0 0 0 0 0 f f r 17 0 0 0 0 0 f f f f 0 _null_ ));
  DESCR("");
! DATA(insert OID = 1255 (  pg_proc		PGNSP 81 PGUID 0 1255 0 0 0 0 0 f f r 18 0 0 0 0 0 t f f f 0 _null_ ));
  DESCR("");
! DATA(insert OID = 1259 (  pg_class		PGNSP 83 PGUID 0 1259 0 0 0 0 0 f f r 26 0 0 0 0 0 t f f f 0 _null_ ));
  DESCR("");
  
  #define		  RELKIND_INDEX			  'i'		/* secondary index */
Index: src/include/catalog/pg_database.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/catalog/pg_database.h,v
retrieving revision 1.37
diff -c -r1.37 pg_database.h
*** src/include/catalog/pg_database.h	31 Jul 2005 17:19:21 -0000	1.37
--- src/include/catalog/pg_database.h	20 Sep 2005 19:48:59 -0000
***************
*** 42,49 ****
  	bool		datallowconn;	/* new connections allowed? */
  	int4		datconnlimit;	/* max connections allowed (-1=no limit) */
  	Oid			datlastsysoid;	/* highest OID to consider a system OID */
! 	TransactionId datvacuumxid; /* all XIDs before this are vacuumed */
! 	TransactionId datfrozenxid; /* all XIDs before this are frozen */
  	Oid			dattablespace;	/* default table space for this DB */
  	text		datconfig[1];	/* database-specific GUC (VAR LENGTH) */
  	aclitem		datacl[1];		/* access permissions (VAR LENGTH) */
--- 42,48 ----
  	bool		datallowconn;	/* new connections allowed? */
  	int4		datconnlimit;	/* max connections allowed (-1=no limit) */
  	Oid			datlastsysoid;	/* highest OID to consider a system OID */
! 	TransactionId datminxid;	/* no table contains an Xid below this one */
  	Oid			dattablespace;	/* default table space for this DB */
  	text		datconfig[1];	/* database-specific GUC (VAR LENGTH) */
  	aclitem		datacl[1];		/* access permissions (VAR LENGTH) */
***************
*** 60,66 ****
   *		compiler constants for pg_database
   * ----------------
   */
! #define Natts_pg_database				12
  #define Anum_pg_database_datname		1
  #define Anum_pg_database_datdba			2
  #define Anum_pg_database_encoding		3
--- 59,65 ----
   *		compiler constants for pg_database
   * ----------------
   */
! #define Natts_pg_database				11
  #define Anum_pg_database_datname		1
  #define Anum_pg_database_datdba			2
  #define Anum_pg_database_encoding		3
***************
*** 68,80 ****
  #define Anum_pg_database_datallowconn	5
  #define Anum_pg_database_datconnlimit	6
  #define Anum_pg_database_datlastsysoid	7
! #define Anum_pg_database_datvacuumxid	8
! #define Anum_pg_database_datfrozenxid	9
! #define Anum_pg_database_dattablespace	10
! #define Anum_pg_database_datconfig		11
! #define Anum_pg_database_datacl			12
  
! DATA(insert OID = 1 (  template1 PGUID ENCODING t t -1 0 0 0 1663 _null_ _null_ ));
  DESCR("Default template database");
  #define TemplateDbOid			1
  
--- 67,78 ----
  #define Anum_pg_database_datallowconn	5
  #define Anum_pg_database_datconnlimit	6
  #define Anum_pg_database_datlastsysoid	7
! #define Anum_pg_database_datminxid		8
! #define Anum_pg_database_dattablespace	9
! #define Anum_pg_database_datconfig		10
! #define Anum_pg_database_datacl			11
  
! DATA(insert OID = 1 (  template1 PGUID ENCODING t t -1 0 0 1663 _null_ _null_ ));
  DESCR("Default template database");
  #define TemplateDbOid			1
  
Index: src/include/commands/vacuum.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/commands/vacuum.h,v
retrieving revision 1.60
diff -c -r1.60 vacuum.h
*** src/include/commands/vacuum.h	14 Jul 2005 05:13:43 -0000	1.60
--- src/include/commands/vacuum.h	21 Sep 2005 15:05:37 -0000
***************
*** 129,138 ****
  extern void vac_open_indexes(Relation relation, LOCKMODE lockmode,
  							 int *nindexes, Relation **Irel);
  extern void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode);
! extern void vac_update_relstats(Oid relid,
! 					BlockNumber num_pages,
! 					double num_tuples,
! 					bool hasindex);
  extern void vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
  					  TransactionId *oldestXmin,
  					  TransactionId *freezeLimit);
--- 129,137 ----
  extern void vac_open_indexes(Relation relation, LOCKMODE lockmode,
  							 int *nindexes, Relation **Irel);
  extern void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode);
! extern TransactionId vac_update_relstats(Oid relid, BlockNumber num_pages,
! 										 double num_tuples, bool hasindex,
! 										 TransactionId minxid);
  extern void vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
  					  TransactionId *oldestXmin,
  					  TransactionId *freezeLimit);
***************
*** 142,148 ****
  extern void vacuum_delay_point(void);
  
  /* in commands/vacuumlazy.c */
! extern void lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
  
  /* in commands/analyze.c */
  extern void analyze_rel(Oid relid, VacuumStmt *vacstmt);
--- 141,147 ----
  extern void vacuum_delay_point(void);
  
  /* in commands/vacuumlazy.c */
! extern TransactionId lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
  
  /* in commands/analyze.c */
  extern void analyze_rel(Oid relid, VacuumStmt *vacstmt);
Index: src/include/libpq/hba.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/libpq/hba.h,v
retrieving revision 1.40
diff -c -r1.40 hba.h
*** src/include/libpq/hba.h	11 Aug 2005 21:11:48 -0000	1.40
--- src/include/libpq/hba.h	20 Sep 2005 20:47:04 -0000
***************
*** 37,43 ****
  extern int	hba_getauthmethod(hbaPort *port);
  extern int	authident(hbaPort *port);
  extern bool	read_pg_database_line(FILE *fp, char *dbname, Oid *dboid,
! 								  Oid *dbtablespace, TransactionId *dbfrozenxid,
! 								  TransactionId *dbvacuumxid);
  
  #endif /* HBA_H */
--- 37,42 ----
  extern int	hba_getauthmethod(hbaPort *port);
  extern int	authident(hbaPort *port);
  extern bool	read_pg_database_line(FILE *fp, char *dbname, Oid *dboid,
! 								  Oid *dbtablespace, TransactionId *dbminxid);
  
  #endif /* HBA_H */
#8Alvaro Herrera
alvherre@commandprompt.com
In reply to: Tom Lane (#5)
1 attachment(s)
Re: [HACKERS] Per-table freeze limit proposal

Tom Lane wrote:

Alvaro Herrera <alvherre@alvh.no-ip.org> writes:

Cool. I wonder if the exact figure should be
min(lowest non-frozen Xid in table, GetOldestXmin(false))

Actually just min(lowest Xid in table, RecentXmin). You only need to be
sure there are no running transactions older than what you put into the
field; their xmins are not at issue.
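
As a rough illustration of the min(lowest Xid in table, RecentXmin) rule quoted above, here is a standalone sketch. It is not the patch's code: `table_min_xid`, `xid_is_normal` and `xid_precedes` are hypothetical names, and the comparison is a simplified version of the modulo-2^32 test that assumes both arguments are normal Xids.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Xids 0..2 are reserved (invalid, bootstrap, frozen); 3 is the first normal one. */
#define FirstNormalTransactionId ((TransactionId) 3)

static int
xid_is_normal(TransactionId xid)
{
	return xid >= FirstNormalTransactionId;
}

/* Simplified circular comparison: valid only when both Xids are normal. */
static int
xid_precedes(TransactionId a, TransactionId b)
{
	return (int32_t) (a - b) < 0;
}

/*
 * Start from RecentXmin and lower the result whenever a normal Xid found
 * in the table precedes it.  Frozen or invalid Xids are skipped.
 */
static TransactionId
table_min_xid(const TransactionId *xids, size_t n, TransactionId recent_xmin)
{
	TransactionId minxid = recent_xmin;
	size_t		i;

	assert(xid_is_normal(recent_xmin));
	for (i = 0; i < n; i++)
	{
		if (xid_is_normal(xids[i]) && xid_precedes(xids[i], minxid))
			minxid = xids[i];
	}
	return minxid;
}
```

For a table whose tuples are all frozen, the loop never lowers the value and the result is simply RecentXmin.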

Ok, patch attached. Two new columns in pg_class store two Xids: the
"relminxid" is the one in the equation above. The relvacuumxid is the
OldestXmin. Two columns in pg_database replace the previous two:
datminxid is the minimum of all relminxid in the database, and
datvacuumxid is the minimum of all relvacuumxid. (datfrozenxid is no
more.)
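
To make the relationship between the per-table and per-database columns concrete, here is a minimal sketch of the aggregation, assuming the pg_class values have already been read into an array. The `RelXids` and `DbXids` structs and the `compute_db_xids` function are hypothetical and only mirror the column names; the real patch works on catalog tuples.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId		((TransactionId) 0)
#define FirstNormalTransactionId	((TransactionId) 3)

typedef struct
{
	TransactionId relminxid;
	TransactionId relvacuumxid;
} RelXids;

typedef struct
{
	TransactionId datminxid;
	TransactionId datvacuumxid;
} DbXids;

/* Simplified circular comparison: valid only when both Xids are normal. */
static int
xid_precedes(TransactionId a, TransactionId b)
{
	return (int32_t) (a - b) < 0;
}

/* Keep the older of the running minimum and a candidate, ignoring non-normal Xids. */
static TransactionId
older_of(TransactionId cur, TransactionId cand)
{
	if (cand < FirstNormalTransactionId)
		return cur;
	if (cur == InvalidTransactionId || xid_precedes(cand, cur))
		return cand;
	return cur;
}

/* One pass over the per-table values yields both database-wide minimums. */
static DbXids
compute_db_xids(const RelXids *rels, size_t nrels)
{
	DbXids		out = {InvalidTransactionId, InvalidTransactionId};
	size_t		i;

	for (i = 0; i < nrels; i++)
	{
		out.datminxid = older_of(out.datminxid, rels[i].relminxid);
		out.datvacuumxid = older_of(out.datvacuumxid, rels[i].relvacuumxid);
	}
	return out;
}
```

The two minimums are tracked independently, so they may come from different tables.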

So we can check the Xid wrap horizon using datminxid, and truncate clog
using datvacuumxid. (Actually I was going to post the patch yesterday
without the datvacuumxid/relvacuumxid part, when I noticed that I was
truncating clog on datminxid which seemed a bad idea.)
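
For reference, the wrap-horizon check against datminxid can be sketched as below. The threshold expression is the one the earlier autovacuum.c hunk in this thread uses; the function names `xid_age` and `needs_whole_db_vacuum` are made up for the example, and clog truncation (which would use datvacuumxid instead) is not modeled.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define MaxTransactionId ((TransactionId) 0xFFFFFFFF)

/* Age of minxid relative to the next Xid to be assigned, modulo 2^32. */
static int32_t
xid_age(TransactionId next_xid, TransactionId minxid)
{
	return (int32_t) (next_xid - minxid);
}

/* True once the database's oldest Xid is old enough to require a database-wide vacuum. */
static int
needs_whole_db_vacuum(TransactionId next_xid, TransactionId datminxid)
{
	int32_t		limit = (int32_t) ((MaxTransactionId >> 3) * 3 - 100000);

	assert(limit > 0);
	return xid_age(next_xid, datminxid) > limit;
}
```

Because the subtraction is done modulo 2^32, the age stays small and positive even when the Xid counter has wrapped past datminxid numerically.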

Additionally I made DROP TABLE invalidate datminxid and datvacuumxid
when the table with the minimum is dropped. New values for the
pg_database columns are calculated by scanning pg_class at the next
vacuum when any of them is invalid, or when the table which was holding
the minimum back is vacuumed. New values for the pg_class columns are
updated on every vacuum, as appropriate.
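
The invalidate-and-recompute rule can be modeled roughly as follows. This is only a sketch under one assumption not spelled out in the message: that "the table holding the minimum" is detected by comparing the table's previous relminxid with datminxid. The `Db` struct and both function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId ((TransactionId) 0)

typedef struct
{
	TransactionId datminxid;
} Db;

/* DROP TABLE: forget the database minimum if the dropped table was holding it. */
static void
on_drop_table(Db *db, TransactionId dropped_relminxid)
{
	assert(db != NULL);
	if (dropped_relminxid == db->datminxid)
		db->datminxid = InvalidTransactionId;
}

/*
 * After vacuuming one table: a pg_class scan is needed if the database
 * value is invalid, or if this table was the one holding the minimum back.
 */
static int
needs_rescan_after_vacuum(const Db *db, TransactionId prev_relminxid)
{
	assert(db != NULL);
	return db->datminxid == InvalidTransactionId ||
		prev_relminxid == db->datminxid;
}
```

The same pattern would apply to datvacuumxid and relvacuumxid.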

The whole thing is pretty fragile if somebody manually updates a
catalog. But we tell people not to do that, so it should be their
fault, right?

I discovered one problem with the whole approach. Per this patch, we
only store normal Xids in relminxid/relvacuumxid. So if a table is
completely frozen, we will store RecentXmin. We do this because it
would actually be unsafe to store, say, FrozenXid: if another
transaction stores/changes a tuple while we are vacuuming the table, the
Frozen mark wouldn't be correct, and thus the table could be corrupted if
an Xid wraparound happens (which is why we use RecentXmin in the first
place: to cope with the possibility of someone else using the table
during the vacuum).

The problem comes when this is done to template1, and it is copied to
another database after some millions of transactions have come and gone --
it will seem like the database has suffered wraparound. We would need
to vacuum it completely after it is copied for the stats to be accurate.
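
A small worked example of the template1 problem, under simplifying assumptions: the copy inherits the template's stored relminxid verbatim, and age is computed as next Xid minus the stored value. `copy_relminxid` and `apparent_age` are hypothetical helpers; treating a frozen marker as age zero reflects the alternative floated below, not what the patch currently does.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FrozenTransactionId			((TransactionId) 2)
#define FirstNormalTransactionId	((TransactionId) 3)

/* Copying a database carries the template's catalog values along unchanged. */
static TransactionId
copy_relminxid(TransactionId template_relminxid)
{
	return template_relminxid;
}

/* Apparent age of a stored minimum; a non-normal (e.g. frozen) marker never ages. */
static uint32_t
apparent_age(TransactionId next_xid, TransactionId stored_minxid)
{
	if (stored_minxid < FirstNormalTransactionId)
		return 0;
	return next_xid - stored_minxid;
}
```

If the template was vacuumed when RecentXmin was 1000 and the copy is made when the next Xid is 50001000, the copy looks 50 million transactions old even though every tuple in it is frozen.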

I'm not sure what to do about that. I think storing FrozenXid may not
actually be a totally bad idea. Comments?

--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.

Attachments:

vacuum-minxid-2.patch (text/plain; charset=us-ascii)
Index: doc/src/sgml/catalogs.sgml
===================================================================
RCS file: /home/alvherre/cvs/pgsql/doc/src/sgml/catalogs.sgml,v
retrieving revision 2.115
diff -c -r2.115 catalogs.sgml
*** doc/src/sgml/catalogs.sgml	4 Nov 2005 23:13:59 -0000	2.115
--- doc/src/sgml/catalogs.sgml	14 Nov 2005 23:25:57 -0000
***************
*** 1613,1618 ****
--- 1613,1642 ----
       </row>
  
       <row>
+       <entry><structfield>relminxid</structfield></entry>
+       <entry><type>xid</type></entry>
+       <entry></entry>
+       <entry>
+        The minimum transaction ID present in all rows in this table.  This
+        value is used to determine the database-global
+        <structname>pg_database</>.<structfield>datminxid</> value.
+       </entry>
+      </row>
+ 
+      <row>
+       <entry><structfield>relvacuumxid</structfield></entry>
+       <entry><type>xid</type></entry>
+       <entry></entry>
+       <entry>
+        The transaction ID that was used as cleaning point as of the last vacuum
+        operation.  All rows inserted, updated or deleted in this table by
+        transactions whose IDs are below this one have been marked as known good
+        or deleted.  This is used to determine the database-global
+        <structname>pg_database</>.<structfield>datvacuumxid</> value.
+       </entry>
+      </row>
+ 
+      <row>
        <entry><structfield>relacl</structfield></entry>
        <entry><type>aclitem[]</type></entry>
        <entry></entry>
***************
*** 1980,2004 ****
       </row>
  
       <row>
!       <entry><structfield>datvacuumxid</structfield></entry>
        <entry><type>xid</type></entry>
        <entry></entry>
        <entry>
!        All rows inserted or deleted by transaction IDs before this one
!        have been marked as known committed or known aborted in this database.
!        This is used to determine when commit-log space can be recycled.
        </entry>
       </row>
  
       <row>
!       <entry><structfield>datfrozenxid</structfield></entry>
        <entry><type>xid</type></entry>
        <entry></entry>
        <entry>
!        All rows inserted by transaction IDs before this one have been
!        relabeled with a permanent (<quote>frozen</>) transaction ID in this
!        database.  This is useful to check whether a database must be vacuumed
!        soon to avoid transaction ID wrap-around problems.
        </entry>
       </row>
  
--- 2004,2034 ----
       </row>
  
       <row>
!       <entry><structfield>datminxid</structfield></entry>
        <entry><type>xid</type></entry>
        <entry></entry>
        <entry>
!        The minimum transaction ID present in all tables in this database.
!        All rows inserted by transaction IDs before this one have been
!        relabeled with a permanent (<quote>frozen</>) transaction ID in this
!        database.  This is used to check whether a database must be
!        vacuumed soon to avoid transaction ID wrap-around problems.
!        If InvalidTransactionId, then the minimum is unknown and can be
!        determined by scanning <structname>pg_class</>.<structfield>relminxid</>.
        </entry>
       </row>
  
       <row>
!       <entry><structfield>datvacuumxid</structfield></entry>
        <entry><type>xid</type></entry>
        <entry></entry>
        <entry>
!        The transaction ID that was used as cleaning point as of the last vacuum
!        operation.  All rows inserted, updated or deleted by transactions whose
!        IDs are below this one have been marked as known good or deleted.  This
!        is used to determine when commit-log space can be recycled.
!        If InvalidTransactionId, then the minimum is unknown and can be
!        determined by scanning <structname>pg_class</>.<structfield>relvacuumxid</>.
        </entry>
       </row>
  
Index: src/backend/access/transam/varsup.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/access/transam/varsup.c,v
retrieving revision 1.68
diff -c -r1.68 varsup.c
*** src/backend/access/transam/varsup.c	29 Oct 2005 00:31:50 -0000	1.68
--- src/backend/access/transam/varsup.c	7 Nov 2005 12:06:20 -0000
***************
*** 168,178 ****
  
  /*
   * Determine the last safe XID to allocate given the currently oldest
!  * datfrozenxid (ie, the oldest XID that might exist in any database
   * of our cluster).
   */
  void
! SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
  					  Name oldest_datname)
  {
  	TransactionId xidWarnLimit;
--- 168,178 ----
  
  /*
   * Determine the last safe XID to allocate given the currently oldest
!  * datminxid (ie, the oldest XID that might exist in any database
   * of our cluster).
   */
  void
! SetTransactionIdLimit(TransactionId oldest_datminxid,
  					  Name oldest_datname)
  {
  	TransactionId xidWarnLimit;
***************
*** 180,195 ****
  	TransactionId xidWrapLimit;
  	TransactionId curXid;
  
! 	Assert(TransactionIdIsValid(oldest_datfrozenxid));
  
  	/*
  	 * The place where we actually get into deep trouble is halfway around
! 	 * from the oldest potentially-existing XID.  (This calculation is
! 	 * probably off by one or two counts, because the special XIDs reduce the
! 	 * size of the loop a little bit.  But we throw in plenty of slop below,
! 	 * so it doesn't matter.)
  	 */
! 	xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
  	if (xidWrapLimit < FirstNormalTransactionId)
  		xidWrapLimit += FirstNormalTransactionId;
  
--- 180,195 ----
  	TransactionId xidWrapLimit;
  	TransactionId curXid;
  
! 	Assert(TransactionIdIsValid(oldest_datminxid));
  
  	/*
  	 * The place where we actually get into deep trouble is halfway around
! 	 * from the oldest existing XID.  (This calculation is probably off by one
! 	 * or two counts, because the special XIDs reduce the size of the loop a
! 	 * little bit.  But we throw in plenty of slop below, so it doesn't
! 	 * matter.)
  	 */
! 	xidWrapLimit = oldest_datminxid + (MaxTransactionId >> 1);
  	if (xidWrapLimit < FirstNormalTransactionId)
  		xidWrapLimit += FirstNormalTransactionId;
  
Index: src/backend/catalog/heap.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/catalog/heap.c,v
retrieving revision 1.292
diff -c -r1.292 heap.c
*** src/backend/catalog/heap.c	18 Oct 2005 01:06:23 -0000	1.292
--- src/backend/catalog/heap.c	14 Nov 2005 23:09:48 -0000
***************
*** 38,43 ****
--- 38,44 ----
  #include "catalog/indexing.h"
  #include "catalog/pg_attrdef.h"
  #include "catalog/pg_constraint.h"
+ #include "catalog/pg_database.h"
  #include "catalog/pg_inherits.h"
  #include "catalog/pg_namespace.h"
  #include "catalog/pg_statistic.h"
***************
*** 576,591 ****
--- 577,601 ----
  			/* The relation is real, but as yet empty */
  			new_rel_reltup->relpages = 0;
  			new_rel_reltup->reltuples = 0;
+ 			/* Use the minimum Xid that could put tuples in the table */
+ 			new_rel_reltup->relminxid = RecentXmin;
+ 			new_rel_reltup->relvacuumxid = RecentXmin;
  			break;
  		case RELKIND_SEQUENCE:
  			/* Sequences always have a known size */
  			new_rel_reltup->relpages = 1;
  			new_rel_reltup->reltuples = 1;
+ 			/* Sequences will never have Xids */
+ 			new_rel_reltup->relminxid = InvalidTransactionId;
+ 			new_rel_reltup->relvacuumxid = InvalidTransactionId;
  			break;
  		default:
  			/* Views, etc, have no disk storage */
  			new_rel_reltup->relpages = 0;
  			new_rel_reltup->reltuples = 0;
+ 			/* Views, etc, won't have Xids either */
+ 			new_rel_reltup->relminxid = InvalidTransactionId;
+ 			new_rel_reltup->relvacuumxid = InvalidTransactionId;
  			break;
  	}
  
***************
*** 1125,1130 ****
--- 1135,1189 ----
  }
  
  /*
+  * Invalidate (set to invalid) the datminxid and/or datvacuumxid of a database,
+  * when we drop the table that has the minimum Xids.
+  */
+ static void
+ InvalidateDbMinxid(TransactionId relminxid, TransactionId relvacuumxid)
+ {
+ 	Oid				dbid = MyDatabaseId;
+ 	Relation		dbrel;
+ 	HeapTuple		tuple;
+ 	HeapScanDesc	scan;
+ 	Form_pg_database dbform;
+ 	ScanKeyData		entry[1];
+ 
+ 	dbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
+ 
+ 	/* Must use a heap scan, since there's no syscache for pg_database */
+ 	ScanKeyInit(&entry[0],
+ 				ObjectIdAttributeNumber,
+ 				BTEqualStrategyNumber, F_OIDEQ,
+ 				ObjectIdGetDatum(dbid));
+ 
+ 	scan = heap_beginscan(dbrel, SnapshotNow, 1, entry);
+ 
+ 	tuple = heap_getnext(scan, ForwardScanDirection);
+ 
+ 	if (!HeapTupleIsValid(tuple))
+ 		elog(ERROR, "could not find tuple for database %u", dbid);
+ 
+ 	/* Ensure no one does this at the same time */
+ 	LockBuffer(scan->rs_cbuf, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 	dbform = (Form_pg_database) GETSTRUCT(tuple);
+ 
+ 	/* Note we don't actually care if dbform->datminxid is already Invalid */
+ 	if (TransactionIdEquals(dbform->datminxid, relminxid))
+ 		dbform->datminxid = InvalidTransactionId;
+ 
+ 	/* ditto */
+ 	if (TransactionIdEquals(dbform->datvacuumxid, relvacuumxid))
+ 		dbform->datvacuumxid = InvalidTransactionId;
+ 
+ 	LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+ 
+ 	heap_endscan(scan);
+ 
+ 	heap_close(dbrel, RowExclusiveLock);
+ }
+ 
+ /*
   * heap_drop_with_catalog	- removes specified relation from catalogs
   *
   * Note that this routine is not responsible for dropping objects that are
***************
*** 1153,1158 ****
--- 1212,1221 ----
  		smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
  	}
  
+ 	/* Invalidate pg_database.datminxid, if appropriate */
+ 	if ((rel->rd_rel->relkind == RELKIND_RELATION) && (!rel->rd_istemp))
+ 		InvalidateDbMinxid(rel->rd_rel->relminxid, rel->rd_rel->relvacuumxid);
+ 
  	/*
  	 * Close relcache entry, but *keep* AccessExclusiveLock on the relation
  	 * until transaction commit.  This ensures no one else will try to do
Index: src/backend/commands/analyze.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/commands/analyze.c,v
retrieving revision 1.89
diff -c -r1.89 analyze.c
*** src/backend/commands/analyze.c	15 Oct 2005 02:49:15 -0000	1.89
--- src/backend/commands/analyze.c	14 Nov 2005 21:17:38 -0000
***************
*** 419,426 ****
  	{
  		vac_update_relstats(RelationGetRelid(onerel),
  							RelationGetNumberOfBlocks(onerel),
! 							totalrows,
! 							hasindex);
  		for (ind = 0; ind < nindexes; ind++)
  		{
  			AnlIndexData *thisdata = &indexdata[ind];
--- 419,429 ----
  	{
  		vac_update_relstats(RelationGetRelid(onerel),
  							RelationGetNumberOfBlocks(onerel),
! 							totalrows, hasindex, false,
! 							InvalidTransactionId,
! 							InvalidTransactionId,
! 							NULL, NULL);
! 
  		for (ind = 0; ind < nindexes; ind++)
  		{
  			AnlIndexData *thisdata = &indexdata[ind];
***************
*** 430,436 ****
  			vac_update_relstats(RelationGetRelid(Irel[ind]),
  								RelationGetNumberOfBlocks(Irel[ind]),
  								totalindexrows,
! 								false);
  		}
  
  		/* report results to the stats collector, too */
--- 433,440 ----
  			vac_update_relstats(RelationGetRelid(Irel[ind]),
  								RelationGetNumberOfBlocks(Irel[ind]),
  								totalindexrows,
! 								false, false, InvalidTransactionId,
! 								InvalidTransactionId, NULL, NULL);
  		}
  
  		/* report results to the stats collector, too */
Index: src/backend/commands/dbcommands.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/commands/dbcommands.c,v
retrieving revision 1.173
diff -c -r1.173 dbcommands.c
*** src/backend/commands/dbcommands.c	15 Oct 2005 02:49:15 -0000	1.173
--- src/backend/commands/dbcommands.c	7 Nov 2005 12:10:03 -0000
***************
*** 56,62 ****
  static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
  			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
  			Oid *dbLastSysOidP,
! 			TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
  			Oid *dbTablespace);
  static bool have_createdb_privilege(void);
  static void remove_dbtablespaces(Oid db_id);
--- 56,62 ----
  static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
  			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
  			Oid *dbLastSysOidP,
! 			TransactionId *dbMinXidP,
  			Oid *dbTablespace);
  static bool have_createdb_privilege(void);
  static void remove_dbtablespaces(Oid db_id);
***************
*** 76,83 ****
  	bool		src_istemplate;
  	bool		src_allowconn;
  	Oid			src_lastsysoid;
! 	TransactionId src_vacuumxid;
! 	TransactionId src_frozenxid;
  	Oid			src_deftablespace;
  	volatile Oid dst_deftablespace;
  	volatile Relation pg_database_rel;
--- 76,82 ----
  	bool		src_istemplate;
  	bool		src_allowconn;
  	Oid			src_lastsysoid;
! 	TransactionId src_minxid;
  	Oid			src_deftablespace;
  	volatile Oid dst_deftablespace;
  	volatile Relation pg_database_rel;
***************
*** 224,230 ****
  	 * grab the exclusive lock.
  	 */
  	if (get_db_info(dbname, NULL, NULL, NULL,
! 					NULL, NULL, NULL, NULL, NULL, NULL))
  		ereport(ERROR,
  				(errcode(ERRCODE_DUPLICATE_DATABASE),
  				 errmsg("database \"%s\" already exists", dbname)));
--- 223,229 ----
  	 * grab the exclusive lock.
  	 */
  	if (get_db_info(dbname, NULL, NULL, NULL,
! 					NULL, NULL, NULL, NULL, NULL))
  		ereport(ERROR,
  				(errcode(ERRCODE_DUPLICATE_DATABASE),
  				 errmsg("database \"%s\" already exists", dbname)));
***************
*** 237,243 ****
  
  	if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
  					 &src_istemplate, &src_allowconn, &src_lastsysoid,
! 					 &src_vacuumxid, &src_frozenxid, &src_deftablespace))
  		ereport(ERROR,
  				(errcode(ERRCODE_UNDEFINED_DATABASE),
  			 errmsg("template database \"%s\" does not exist", dbtemplate)));
--- 236,242 ----
  
  	if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
  					 &src_istemplate, &src_allowconn, &src_lastsysoid,
! 					 &src_minxid, &src_deftablespace))
  		ereport(ERROR,
  				(errcode(ERRCODE_UNDEFINED_DATABASE),
  			 errmsg("template database \"%s\" does not exist", dbtemplate)));
***************
*** 336,349 ****
  	}
  
  	/*
! 	 * Normally we mark the new database with the same datvacuumxid and
! 	 * datfrozenxid as the source.	However, if the source is not allowing
! 	 * connections then we assume it is fully frozen, and we can set the
! 	 * current transaction ID as the xid limits.  This avoids immediately
! 	 * starting to generate warnings after cloning template0.
  	 */
  	if (!src_allowconn)
! 		src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
  
  	/*
  	 * Preassign OID for pg_database tuple, so that we can compute db path.
--- 335,348 ----
  	}
  
  	/*
! 	 * Normally we mark the new database with the same datminxid as the source.
! 	 * However, if the source is not allowing connections then we assume it is
! 	 * fully frozen, and we can set the current transaction ID as the xid
! 	 * limit.  This avoids immediately starting to generate warnings after
! 	 * cloning template0.
  	 */
  	if (!src_allowconn)
! 		src_minxid = GetCurrentTransactionId();
  
  	/*
  	 * Preassign OID for pg_database tuple, so that we can compute db path.
***************
*** 441,447 ****
  
  		/* Check to see if someone else created same DB name meanwhile. */
  		if (get_db_info(dbname, NULL, NULL, NULL,
! 						NULL, NULL, NULL, NULL, NULL, NULL))
  			ereport(ERROR,
  					(errcode(ERRCODE_DUPLICATE_DATABASE),
  					 errmsg("database \"%s\" already exists", dbname)));
--- 440,446 ----
  
  		/* Check to see if someone else created same DB name meanwhile. */
  		if (get_db_info(dbname, NULL, NULL, NULL,
! 						NULL, NULL, NULL, NULL, NULL))
  			ereport(ERROR,
  					(errcode(ERRCODE_DUPLICATE_DATABASE),
  					 errmsg("database \"%s\" already exists", dbname)));
***************
*** 463,470 ****
  		new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
  		new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
  		new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
! 		new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
! 		new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
  		new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
  
  		/*
--- 462,468 ----
  		new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
  		new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
  		new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
! 		new_record[Anum_pg_database_datminxid - 1] = TransactionIdGetDatum(src_minxid);
  		new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
  
  		/*
***************
*** 584,590 ****
  	pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
  
  	if (!get_db_info(dbname, &db_id, NULL, NULL,
! 					 &db_istemplate, NULL, NULL, NULL, NULL, NULL))
  		ereport(ERROR,
  				(errcode(ERRCODE_UNDEFINED_DATABASE),
  				 errmsg("database \"%s\" does not exist", dbname)));
--- 582,588 ----
  	pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
  
  	if (!get_db_info(dbname, &db_id, NULL, NULL,
! 					 &db_istemplate, NULL, NULL, NULL, NULL))
  		ereport(ERROR,
  				(errcode(ERRCODE_UNDEFINED_DATABASE),
  				 errmsg("database \"%s\" does not exist", dbname)));
***************
*** 1082,1089 ****
  static bool
  get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
  			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
! 			Oid *dbLastSysOidP,
! 			TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
  			Oid *dbTablespace)
  {
  	Relation	relation;
--- 1080,1086 ----
  static bool
  get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
  			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
! 			Oid *dbLastSysOidP, TransactionId *dbMinXidP,
  			Oid *dbTablespace)
  {
  	Relation	relation;
***************
*** 1130,1141 ****
  		/* last system OID used in database */
  		if (dbLastSysOidP)
  			*dbLastSysOidP = dbform->datlastsysoid;
! 		/* limit of vacuumed XIDs */
! 		if (dbVacuumXidP)
! 			*dbVacuumXidP = dbform->datvacuumxid;
! 		/* limit of frozen XIDs */
! 		if (dbFrozenXidP)
! 			*dbFrozenXidP = dbform->datfrozenxid;
  		/* default tablespace for this database */
  		if (dbTablespace)
  			*dbTablespace = dbform->dattablespace;
--- 1127,1135 ----
  		/* last system OID used in database */
  		if (dbLastSysOidP)
  			*dbLastSysOidP = dbform->datlastsysoid;
! 		/* limit of min XIDs */
! 		if (dbMinXidP)
! 			*dbMinXidP = dbform->datminxid;
  		/* default tablespace for this database */
  		if (dbTablespace)
  			*dbTablespace = dbform->dattablespace;
Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.317
diff -c -r1.317 vacuum.c
*** src/backend/commands/vacuum.c	15 Oct 2005 02:49:16 -0000	1.317
--- src/backend/commands/vacuum.c	14 Nov 2005 21:53:16 -0000
***************
*** 125,130 ****
--- 125,131 ----
  	Size		min_tlen;
  	Size		max_tlen;
  	bool		hasindex;
+ 	TransactionId minxid;	/* Minimum Xid present anywhere on table */
  	/* vtlinks array for tuple chain following - sorted by new_tid */
  	int			num_vtlinks;
  	VTupleLink	vtlinks;
***************
*** 192,216 ****
  
  static int	elevel = -1;
  
- static TransactionId OldestXmin;
- static TransactionId FreezeLimit;
- 
  
  /* non-export function prototypes */
  static List *get_rel_oids(List *relids, const RangeVar *vacrel,
! 			 const char *stmttype);
! static void vac_update_dbstats(Oid dbid,
! 				   TransactionId vacuumXID,
! 				   TransactionId frozenXID);
! static void vac_truncate_clog(TransactionId vacuumXID,
! 				  TransactionId frozenXID);
! static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
! static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
  static void scan_heap(VRelStats *vacrelstats, Relation onerel,
! 		  VacPageList vacuum_pages, VacPageList fraged_pages);
  static void repair_frag(VRelStats *vacrelstats, Relation onerel,
  			VacPageList vacuum_pages, VacPageList fraged_pages,
! 			int nindexes, Relation *Irel);
  static void move_chain_tuple(Relation rel,
  				 Buffer old_buf, Page old_page, HeapTuple old_tup,
  				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
--- 193,216 ----
  
  static int	elevel = -1;
  
  
  /* non-export function prototypes */
  static List *get_rel_oids(List *relids, const RangeVar *vacrel,
! 						  const char *stmttype);
! static void vac_update_dbxids(Oid dbid, TransactionId prevminXid,
! 							  TransactionId prevvacXid);
! static void vac_truncate_clog(TransactionId minXID);
! static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind,
! 		   TransactionId *prevmin, TransactionId *prevvac);
! static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
! 				TransactionId *relminxid, TransactionId *relvacuumxid);
! 
  static void scan_heap(VRelStats *vacrelstats, Relation onerel,
! 		  VacPageList vacuum_pages, VacPageList fraged_pages,
! 		  TransactionId FreezeLimit, TransactionId OldestXmin);
  static void repair_frag(VRelStats *vacrelstats, Relation onerel,
  			VacPageList vacuum_pages, VacPageList fraged_pages,
! 			int nindexes, Relation *Irel, TransactionId OldestXmin);
  static void move_chain_tuple(Relation rel,
  				 Buffer old_buf, Page old_page, HeapTuple old_tup,
  				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
***************
*** 267,279 ****
  vacuum(VacuumStmt *vacstmt, List *relids)
  {
  	const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
- 	TransactionId initialOldestXmin = InvalidTransactionId;
- 	TransactionId initialFreezeLimit = InvalidTransactionId;
  	volatile MemoryContext anl_context = NULL;
  	volatile bool all_rels,
  				in_outer_xact,
  				use_own_xacts;
  	List	   *relations;
  
  	if (vacstmt->verbose)
  		elevel = INFO;
--- 267,281 ----
  vacuum(VacuumStmt *vacstmt, List *relids)
  {
  	const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
  	volatile MemoryContext anl_context = NULL;
  	volatile bool all_rels,
  				in_outer_xact,
  				use_own_xacts;
  	List	   *relations;
+ 	TransactionId prevmin,
+ 				  prevvac;
+ 	TransactionId currmin = InvalidTransactionId,
+ 				  currvac = InvalidTransactionId;
  
  	if (vacstmt->verbose)
  		elevel = INFO;
***************
*** 350,381 ****
  	 */
  	relations = get_rel_oids(relids, vacstmt->relation, stmttype);
  
- 	if (vacstmt->vacuum && all_rels)
- 	{
- 		/*
- 		 * It's a database-wide VACUUM.
- 		 *
- 		 * Compute the initially applicable OldestXmin and FreezeLimit XIDs, so
- 		 * that we can record these values at the end of the VACUUM. Note that
- 		 * individual tables may well be processed with newer values, but we
- 		 * can guarantee that no (non-shared) relations are processed with
- 		 * older ones.
- 		 *
- 		 * It is okay to record non-shared values in pg_database, even though we
- 		 * may vacuum shared relations with older cutoffs, because only the
- 		 * minimum of the values present in pg_database matters.  We can be
- 		 * sure that shared relations have at some time been vacuumed with
- 		 * cutoffs no worse than the global minimum; for, if there is a
- 		 * backend in some other DB with xmin = OLDXMIN that's determining the
- 		 * cutoff with which we vacuum shared relations, it is not possible
- 		 * for that database to have a cutoff newer than OLDXMIN recorded in
- 		 * pg_database.
- 		 */
- 		vacuum_set_xid_limits(vacstmt, false,
- 							  &initialOldestXmin,
- 							  &initialFreezeLimit);
- 	}
- 
  	/*
  	 * Decide whether we need to start/commit our own transactions.
  	 *
--- 352,357 ----
***************
*** 444,451 ****
  
  			if (vacstmt->vacuum)
  			{
! 				if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
! 					all_rels = false;	/* forget about updating dbstats */
  			}
  			if (vacstmt->analyze)
  			{
--- 420,437 ----
  
  			if (vacstmt->vacuum)
  			{
! 				vacuum_rel(relid, vacstmt, RELKIND_RELATION, &prevmin,
! 						   &prevvac);
! 				
! 				if (!TransactionIdIsValid(currmin) ||
! 					(TransactionIdIsNormal(prevmin) &&
! 					 TransactionIdPrecedes(prevmin, currmin)))
! 					currmin = prevmin;
! 
! 				if (!TransactionIdIsValid(currvac) ||
! 					(TransactionIdIsNormal(prevvac) &&
! 					 TransactionIdPrecedes(prevvac, currvac)))
! 					currvac = prevvac;
  			}
  			if (vacstmt->analyze)
  			{
***************
*** 522,536 ****
  			PrintFreeSpaceMapStatistics(elevel);
  
  		/*
! 		 * If we completed a database-wide VACUUM without skipping any
! 		 * relations, update the database's pg_database row with info about
! 		 * the transaction IDs used, and try to truncate pg_clog.
  		 */
! 		if (all_rels)
  		{
! 			vac_update_dbstats(MyDatabaseId,
! 							   initialOldestXmin, initialFreezeLimit);
! 			vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
  		}
  	}
  
--- 508,526 ----
  			PrintFreeSpaceMapStatistics(elevel);
  
  		/*
! 		 * Skip these steps if the current minxid is InvalidTransactionId.
! 		 * It shouldn't happen on normal operation, but it happens during
! 		 * initdb.
  		 */
! 		if (TransactionIdIsValid(currmin))
  		{
! 			Assert(TransactionIdIsValid(currvac));
! 
! 			/* Update pg_database.datminxid, if necessary. */
! 			vac_update_dbxids(MyDatabaseId, currmin, currvac);
! 
! 			/* Try to truncate pg_clog. */
! 			vac_truncate_clog(currvac);
  		}
  	}
  
***************
*** 663,668 ****
--- 653,662 ----
   *		doing ANALYZE, but we always update these stats.  This routine works
   *		for both index and heap relation entries in pg_class.
   *
+  *		relminxid and relvacuumxid are output parameters, and correspond
+  *		to the values in pg_class that existed prior to the execution of
+  *		this vacuum.
+  *
   *		We violate no-overwrite semantics here by storing new values for the
   *		statistics columns directly into the pg_class tuple that's already on
   *		the page.  The reason for this is that if we updated these tuples in
***************
*** 676,682 ****
   */
  void
  vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
! 					bool hasindex)
  {
  	Relation	rd;
  	HeapTupleData rtup;
--- 670,678 ----
   */
  void
  vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
! 					bool hasindex, bool update_xids, TransactionId minxid,
! 					TransactionId vacuumxid, TransactionId *relminxid,
! 					TransactionId *relvacuumxid)
  {
  	Relation	rd;
  	HeapTupleData rtup;
***************
*** 708,713 ****
--- 704,721 ----
  
  	/* overwrite the existing statistics in the tuple */
  	pgcform = (Form_pg_class) GETSTRUCT(&rtup);
+ 
+ 	/* Set relminxid/relvacuumxid only if asked to */
+ 	if (update_xids)
+ 	{
+ 		AssertArg(relminxid != NULL && relvacuumxid != NULL);
+ 
+ 		*relminxid = pgcform->relminxid;
+ 		pgcform->relminxid = minxid;
+ 		*relvacuumxid = pgcform->relvacuumxid;
+ 		pgcform->relvacuumxid = vacuumxid;
+ 	}
+ 
  	pgcform->relpages = (int32) num_pages;
  	pgcform->reltuples = (float4) num_tuples;
  	pgcform->relhasindex = hasindex;
***************
*** 737,767 ****
  
  
  /*
!  *	vac_update_dbstats() -- update statistics for one database
   *
!  *		Update the whole-database statistics that are kept in its pg_database
!  *		row, and the flat-file copy of pg_database.
   *
!  *		We violate no-overwrite semantics here by storing new values for the
!  *		statistics columns directly into the tuple that's already on the page.
   *		As with vac_update_relstats, this avoids leaving dead tuples behind
   *		after a VACUUM.
   *
!  *		This routine is shared by full and lazy VACUUM.  Note that it is only
!  *		applied after a database-wide VACUUM operation.
   */
  static void
! vac_update_dbstats(Oid dbid,
! 				   TransactionId vacuumXID,
! 				   TransactionId frozenXID)
  {
! 	Relation	relation;
! 	ScanKeyData entry[1];
  	HeapScanDesc scan;
  	HeapTuple	tuple;
  	Form_pg_database dbform;
  
! 	relation = heap_open(DatabaseRelationId, RowExclusiveLock);
  
  	/* Must use a heap scan, since there's no syscache for pg_database */
  	ScanKeyInit(&entry[0],
--- 745,792 ----
  
  
  /*
!  *	vac_update_dbxids() -- update the minimum Xid present in one database
   *
!  * 		Update pg_database's datminxid and datvacuumxid and the flat-file copy
!  * 		of pg_database.
   *
!  * 		The prevminXid argument (resp. prevvacXid) is the minimum of all the
!  * 		relminxids (resp. relvacuumxid) that the vacuumed table(s) had before
!  * 		this vacuum operation.
!  *
!  * 		The Xids are updated to the minimum of all relminxid/relvacuumxid found
!  * 		in pg_class, if this prevminXid/prevvacXid is found to be equal to the
!  * 		current minimum -- that is, one of the processed tables was the one
!  * 		holding the minimum back.
!  *
!  * 		Note that it's possible for the data in pg_database to be
!  * 		InvalidTransactionId -- for example, if a table is dropped.  This is
!  * 		to cope with the possibility that the dropped table was the one with
!  * 		the minimum.  In this case, we need to search the minimum
!  * 		unconditionally.
!  *
!  *		We violate no-overwrite semantics here by storing a new value for the
!  *		statistic column directly into the tuple that's already on the page.
   *		As with vac_update_relstats, this avoids leaving dead tuples behind
   *		after a VACUUM.
   *
!  *		This routine is shared by full and lazy VACUUM.
   */
  static void
! vac_update_dbxids(Oid dbid, TransactionId prevminXid, TransactionId prevvacXid)
  {
! 	Relation	dbrel;
! 	ScanKeyData	entry[1];
  	HeapScanDesc scan;
  	HeapTuple	tuple;
  	Form_pg_database dbform;
+ 	TransactionId prevmin,
+ 				  prevvac;
+ 
+ 	Assert(TransactionIdIsValid(prevminXid));
+ 	Assert(TransactionIdIsValid(prevvacXid));
  
! 	dbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
  
  	/* Must use a heap scan, since there's no syscache for pg_database */
  	ScanKeyInit(&entry[0],
***************
*** 769,826 ****
  				BTEqualStrategyNumber, F_OIDEQ,
  				ObjectIdGetDatum(dbid));
  
! 	scan = heap_beginscan(relation, SnapshotNow, 1, entry);
  
  	tuple = heap_getnext(scan, ForwardScanDirection);
  
  	if (!HeapTupleIsValid(tuple))
  		elog(ERROR, "could not find tuple for database %u", dbid);
  
! 	/* ensure no one else does this at the same time */
! 	LockBuffer(scan->rs_cbuf, BUFFER_LOCK_EXCLUSIVE);
  
  	dbform = (Form_pg_database) GETSTRUCT(tuple);
  
! 	/* overwrite the existing statistics in the tuple */
! 	dbform->datvacuumxid = vacuumXID;
! 	dbform->datfrozenxid = frozenXID;
  
! 	LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
  
! 	/* invalidate the tuple in the cache and write the buffer */
! 	CacheInvalidateHeapTuple(relation, tuple);
! 	WriteNoReleaseBuffer(scan->rs_cbuf);
  
! 	heap_endscan(scan);
  
! 	heap_close(relation, RowExclusiveLock);
  
! 	/* Mark the flat-file copy of pg_database for update at commit */
! 	database_file_update_needed();
! }
  
  
  /*
   *	vac_truncate_clog() -- attempt to truncate the commit log
   *
!  *		Scan pg_database to determine the system-wide oldest datvacuumxid,
!  *		and use it to truncate the transaction commit log (pg_clog).
!  *		Also update the XID wrap limit point maintained by varsup.c.
   *
!  *		We also generate a warning if the system-wide oldest datfrozenxid
   *		seems to be in danger of wrapping around.  This is a long-in-advance
   *		warning; if we start getting uncomfortably close, GetNewTransactionId
   *		will generate more-annoying warnings, and ultimately refuse to issue
   *		any more new XIDs.
   *
!  *		The passed XIDs are simply the ones I just wrote into my pg_database
!  *		entry.	They're used to initialize the "min" calculations.
   *
!  *		This routine is shared by full and lazy VACUUM.  Note that it is only
!  *		applied after a database-wide VACUUM operation.
   */
  static void
! vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
  {
  	TransactionId myXID = GetCurrentTransactionId();
  	Relation	relation;
--- 794,903 ----
  				BTEqualStrategyNumber, F_OIDEQ,
  				ObjectIdGetDatum(dbid));
  
! 	scan = heap_beginscan(dbrel, SnapshotNow, 1, entry);
  
  	tuple = heap_getnext(scan, ForwardScanDirection);
  
  	if (!HeapTupleIsValid(tuple))
  		elog(ERROR, "could not find tuple for database %u", dbid);
  
! 	/* Ensure no one else does this at the same time */
! 	LockSharedObject(DatabaseRelationId, dbid, 0, AccessExclusiveLock);
  
  	dbform = (Form_pg_database) GETSTRUCT(tuple);
+ 	prevmin = dbform->datminxid;
+ 	prevvac = dbform->datvacuumxid;
  
! 	/*
! 	 * If the table we just vacuumed was holding one of the minima back, or
! 	 * either of them was unset, update pg_database.
! 	 */
! 	if (!TransactionIdIsValid(prevmin) ||
! 		!TransactionIdIsValid(prevvac) ||
! 		TransactionIdEquals(prevmin, prevminXid) ||
! 		TransactionIdEquals(prevvac, prevvacXid))
! 	{
! 		Relation		classRel;
! 		SysScanDesc		classScan;
! 		HeapTuple		classTup;
! 		TransactionId	newMinXid = InvalidTransactionId,
! 						newVacXid = InvalidTransactionId;
  
! 		/* scan pg_class searching for the minimum relminxid and relvacuumxid */
! 		classRel = heap_open(RelationRelationId, AccessShareLock);
  
! 		classScan = systable_beginscan(classRel, InvalidOid, false,
! 									   SnapshotNow, 0, NULL);
  
! 		while ((classTup = systable_getnext(classScan)) != NULL)
! 		{
! 			Form_pg_class classForm;
  
! 			classForm = (Form_pg_class) GETSTRUCT(classTup);
  
! 			/* Only consider normal tables */
! 			if (classForm->relkind != RELKIND_RELATION)
! 				continue;
! 
! 			if (!TransactionIdIsValid(newMinXid) ||
! 				(TransactionIdIsNormal(classForm->relminxid) &&
! 				 TransactionIdPrecedes(classForm->relminxid, newMinXid)))
! 				newMinXid = classForm->relminxid;
! 
! 			if (!TransactionIdIsValid(newVacXid) ||
! 				(TransactionIdIsNormal(classForm->relvacuumxid) &&
! 				 TransactionIdPrecedes(classForm->relvacuumxid, newVacXid)))
! 				newVacXid = classForm->relvacuumxid;
! 		}
! 
! 		systable_endscan(classScan);
! 		heap_close(classRel, AccessShareLock);
! 
! 		Assert(TransactionIdIsValid(newMinXid));
! 		Assert(TransactionIdIsValid(newVacXid));
! 
! 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_EXCLUSIVE);
! 
! 		dbform->datminxid = newMinXid;
! 		dbform->datvacuumxid = newVacXid;
! 		
! 		/* invalidate the tuple in the cache and write the buffer */
! 		CacheInvalidateHeapTuple(dbrel, tuple);
! 		WriteNoReleaseBuffer(scan->rs_cbuf);
! 
! 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
! 		
! 		/* Mark the flat-file copy of pg_database for update at commit */
! 		database_file_update_needed();
! 	}
  
+ 	UnlockSharedObject(DatabaseRelationId, dbid, 0, AccessExclusiveLock);
+ 
+ 	heap_endscan(scan);
+ 
+ 	heap_close(dbrel, RowExclusiveLock);
+ }
  
  /*
   *	vac_truncate_clog() -- attempt to truncate the commit log
   *
!  *		Truncate the transaction commit log (pg_clog) using the minimum
!  *		Xid found in pg_database.  Also update the XID wrap limit point
!  *		maintained by varsup.c.
   *
!  *		We also generate a warning if the system-wide oldest datminxid
   *		seems to be in danger of wrapping around.  This is a long-in-advance
   *		warning; if we start getting uncomfortably close, GetNewTransactionId
   *		will generate more-annoying warnings, and ultimately refuse to issue
   *		any more new XIDs.
   *
!  *		The passed XID is simply the one I just wrote into my pg_database
!  *		entry.	It's used to initialize the "min" calculation.
   *
!  *		This routine is shared by full and lazy VACUUM.
   */
  static void
! vac_truncate_clog(TransactionId minXID)
  {
  	TransactionId myXID = GetCurrentTransactionId();
  	Relation	relation;
***************
*** 828,842 ****
  	HeapTuple	tuple;
  	int32		age;
  	NameData	oldest_datname;
! 	bool		vacuumAlreadyWrapped = false;
! 	bool		frozenAlreadyWrapped = false;
  
! 	/* init oldest_datname to sync with my frozenXID */
  	namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
  
  	/*
! 	 * Note: the "already wrapped" cases should now be impossible due to the
! 	 * defenses in GetNewTransactionId, but we keep them anyway.
  	 */
  	relation = heap_open(DatabaseRelationId, AccessShareLock);
  
--- 905,920 ----
  	HeapTuple	tuple;
  	int32		age;
  	NameData	oldest_datname;
! 	bool		alreadyWrapped = false;
! 
! 	Assert(TransactionIdIsValid(minXID));
  
! 	/* init oldest_datname to sync with my minXID */
  	namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
  
  	/*
! 	 * Note: the "already wrapped" case should now be impossible due to the
! 	 * defenses in GetNewTransactionId, but we keep it anyway.
  	 */
  	relation = heap_open(DatabaseRelationId, AccessShareLock);
  
***************
*** 851,870 ****
  		if (!dbform->datallowconn)
  			continue;
  
! 		if (TransactionIdIsNormal(dbform->datvacuumxid))
  		{
! 			if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
! 				vacuumAlreadyWrapped = true;
! 			else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
! 				vacuumXID = dbform->datvacuumxid;
! 		}
! 		if (TransactionIdIsNormal(dbform->datfrozenxid))
! 		{
! 			if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
! 				frozenAlreadyWrapped = true;
! 			else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
  			{
! 				frozenXID = dbform->datfrozenxid;
  				namecpy(&oldest_datname, &dbform->datname);
  			}
  		}
--- 929,941 ----
  		if (!dbform->datallowconn)
  			continue;
  
! 		if (TransactionIdIsNormal(dbform->datminxid))
  		{
! 			if (TransactionIdPrecedes(myXID, dbform->datminxid))
! 				alreadyWrapped = true;
! 			else if (TransactionIdPrecedes(dbform->datminxid, minXID))
  			{
! 				minXID = dbform->datminxid;
  				namecpy(&oldest_datname, &dbform->datname);
  			}
  		}
***************
*** 875,899 ****
  	heap_close(relation, AccessShareLock);
  
  	/*
! 	 * Do not truncate CLOG if we seem to have suffered wraparound already;
! 	 * the computed minimum XID might be bogus.
! 	 */
! 	if (vacuumAlreadyWrapped)
! 	{
! 		ereport(WARNING,
! 				(errmsg("some databases have not been vacuumed in over 2 billion transactions"),
! 				 errdetail("You may have already suffered transaction-wraparound data loss.")));
! 		return;
! 	}
! 
! 	/* Truncate CLOG to the oldest vacuumxid */
! 	TruncateCLOG(vacuumXID);
! 
! 	/*
! 	 * Do not update varsup.c if we seem to have suffered wraparound already;
! 	 * the computed XID might be bogus.
  	 */
! 	if (frozenAlreadyWrapped)
  	{
  		ereport(WARNING,
  				(errmsg("some databases have not been vacuumed in over 1 billion transactions"),
--- 946,955 ----
  	heap_close(relation, AccessShareLock);
  
  	/*
! 	 * Do not truncate CLOG or update varsup.c if we seem to have suffered
! 	 * wraparound already; the computed XID might be bogus.
  	 */
! 	if (alreadyWrapped)
  	{
  		ereport(WARNING,
  				(errmsg("some databases have not been vacuumed in over 1 billion transactions"),
***************
*** 901,911 ****
  		return;
  	}
  
  	/* Update the wrap limit for GetNewTransactionId */
! 	SetTransactionIdLimit(frozenXID, &oldest_datname);
  
  	/* Give warning about impending wraparound problems */
! 	age = (int32) (myXID - frozenXID);
  	if (age > (int32) ((MaxTransactionId >> 3) * 3))
  		ereport(WARNING,
  		   (errmsg("database \"%s\" must be vacuumed within %u transactions",
--- 957,970 ----
  		return;
  	}
  
+ 	/* Truncate CLOG to the oldest minxid */
+ 	TruncateCLOG(minXID);
+ 
  	/* Update the wrap limit for GetNewTransactionId */
! 	SetTransactionIdLimit(minXID, &oldest_datname);
  
  	/* Give warning about impending wraparound problems */
! 	age = (int32) (myXID - minXID);
  	if (age > (int32) ((MaxTransactionId >> 3) * 3))
  		ereport(WARNING,
  		   (errmsg("database \"%s\" must be vacuumed within %u transactions",
***************
*** 927,953 ****
  /*
   *	vacuum_rel() -- vacuum one heap relation
   *
-  *		Returns TRUE if we actually processed the relation (or can ignore it
-  *		for some reason), FALSE if we failed to process it due to permissions
-  *		or other reasons.  (A FALSE result really means that some data
-  *		may have been left unvacuumed, so we can't update XID stats.)
-  *
   *		Doing one heap at a time incurs extra overhead, since we need to
   *		check that the heap exists again just before we vacuum it.	The
   *		reason that we do this is so that vacuuming can be spread across
   *		many small transactions.  Otherwise, two-phase locking would require
   *		us to lock the entire database during one pass of the vacuum cleaner.
   *
   *		At entry and exit, we are not inside a transaction.
   */
! static bool
! vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
  {
  	LOCKMODE	lmode;
  	Relation	onerel;
  	LockRelId	onerelid;
  	Oid			toast_relid;
- 	bool		result;
  
  	/* Begin a transaction for vacuuming this relation */
  	StartTransactionCommand();
--- 986,1010 ----
  /*
   *	vacuum_rel() -- vacuum one heap relation
   *
   *		Doing one heap at a time incurs extra overhead, since we need to
   *		check that the heap exists again just before we vacuum it.	The
   *		reason that we do this is so that vacuuming can be spread across
   *		many small transactions.  Otherwise, two-phase locking would require
   *		us to lock the entire database during one pass of the vacuum cleaner.
   *
+  *		prevmin and prevvac are output parameters, and correspond to the values
+  *		in pg_class that existed prior to the execution of this vacuum.
+  *
   *		At entry and exit, we are not inside a transaction.
   */
! static void
! vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind,
! 		   TransactionId *prevmin, TransactionId *prevvac)
  {
  	LOCKMODE	lmode;
  	Relation	onerel;
  	LockRelId	onerelid;
  	Oid			toast_relid;
  
  	/* Begin a transaction for vacuuming this relation */
  	StartTransactionCommand();
***************
*** 976,982 ****
  	{
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return true;			/* okay 'cause no data there */
  	}
  
  	/*
--- 1033,1039 ----
  	{
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return;
  	}
  
  	/*
***************
*** 1008,1014 ****
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return false;
  	}
  
  	/*
--- 1065,1071 ----
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return;
  	}
  
  	/*
***************
*** 1023,1029 ****
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return false;
  	}
  
  	/*
--- 1080,1086 ----
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return;
  	}
  
  	/*
***************
*** 1038,1044 ****
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return true;			/* assume no long-lived data in temp tables */
  	}
  
  	/*
--- 1095,1101 ----
  		relation_close(onerel, lmode);
  		StrategyHintVacuum(false);
  		CommitTransactionCommand();
! 		return;			/* assume no long-lived data in temp tables */
  	}
  
  	/*
***************
*** 1063,1073 ****
  	 * Do the actual work --- either FULL or "lazy" vacuum
  	 */
  	if (vacstmt->full)
! 		full_vacuum_rel(onerel, vacstmt);
  	else
! 		lazy_vacuum_rel(onerel, vacstmt);
! 
! 	result = true;				/* did the vacuum */
  
  	/* all done with this class, but hold lock until commit */
  	relation_close(onerel, NoLock);
--- 1120,1128 ----
  	 * Do the actual work --- either FULL or "lazy" vacuum
  	 */
  	if (vacstmt->full)
! 		full_vacuum_rel(onerel, vacstmt, prevmin, prevvac);
  	else
! 		lazy_vacuum_rel(onerel, vacstmt, prevmin, prevvac);
  
  	/* all done with this class, but hold lock until commit */
  	relation_close(onerel, NoLock);
***************
*** 1081,1102 ****
  	/*
  	 * If the relation has a secondary toast rel, vacuum that too while we
  	 * still hold the session lock on the master table.  Note however that
! 	 * "analyze" will not get done on the toast table.	This is good, because
! 	 * the toaster always uses hardcoded index access and statistics are
! 	 * totally unimportant for toast relations.
  	 */
  	if (toast_relid != InvalidOid)
! 	{
! 		if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
! 			result = false;		/* failed to vacuum the TOAST table? */
! 	}
  
  	/*
  	 * Now release the session-level lock on the master table.
  	 */
  	UnlockRelationForSession(&onerelid, lmode);
- 
- 	return result;
  }
  
  
--- 1136,1154 ----
  	/*
  	 * If the relation has a secondary toast rel, vacuum that too while we
  	 * still hold the session lock on the master table.  Note however that
! 	 * "analyze" will not get done on the toast table.	This is good,
! 	 * because the toaster always uses hardcoded index access and
! 	 * statistics are totally unimportant for toast relations.
! 	 *
! 	 * Note we ignore the TOAST table for the "minxid" calculations.
  	 */
  	if (toast_relid != InvalidOid)
! 		vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE, NULL, NULL);
  
  	/*
  	 * Now release the session-level lock on the master table.
  	 */
  	UnlockRelationForSession(&onerelid, lmode);
  }
  
  
***************
*** 1116,1124 ****
   *
   *		At entry, we have already established a transaction and opened
   *		and locked the relation.
   */
  static void
! full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
  {
  	VacPageListData vacuum_pages;		/* List of pages to vacuum and/or
  										 * clean indexes */
--- 1168,1182 ----
   *
   *		At entry, we have already established a transaction and opened
   *		and locked the relation.
+  *
+  *		relminxid and relvacuumxid are output parameters, and correspond
+  *		to the values in pg_class that existed prior to the execution of
+  *		this vacuum.
+  *
   */
  static void
! full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt, TransactionId *relminxid,
! 				TransactionId *relvacuumxid)
  {
  	VacPageListData vacuum_pages;		/* List of pages to vacuum and/or
  										 * clean indexes */
***************
*** 1128,1133 ****
--- 1186,1193 ----
  	int			nindexes,
  				i;
  	VRelStats  *vacrelstats;
+ 	TransactionId FreezeLimit,
+ 				  OldestXmin;
  
  	vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
  						  &OldestXmin, &FreezeLimit);
***************
*** 1140,1148 ****
  	vacrelstats->rel_tuples = 0;
  	vacrelstats->hasindex = false;
  
  	/* scan the heap */
  	vacuum_pages.num_pages = fraged_pages.num_pages = 0;
! 	scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
  
  	/* Now open all indexes of the relation */
  	vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
--- 1200,1215 ----
  	vacrelstats->rel_tuples = 0;
  	vacrelstats->hasindex = false;
  
+ 	/*
+ 	 * Set initial minimum Xid, which will be updated if a smaller Xid is found
+ 	 * in the relation.
+ 	 */
+ 	vacrelstats->minxid = RecentXmin;
+ 
  	/* scan the heap */
  	vacuum_pages.num_pages = fraged_pages.num_pages = 0;
! 	scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages, FreezeLimit,
! 			  OldestXmin);
  
  	/* Now open all indexes of the relation */
  	vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
***************
*** 1170,1176 ****
  	{
  		/* Try to shrink heap */
  		repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
! 					nindexes, Irel);
  		vac_close_indexes(nindexes, Irel, NoLock);
  	}
  	else
--- 1237,1243 ----
  	{
  		/* Try to shrink heap */
  		repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
! 					nindexes, Irel, OldestXmin);
  		vac_close_indexes(nindexes, Irel, NoLock);
  	}
  	else
***************
*** 1186,1194 ****
  	/* update shared free space map with final free space info */
  	vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
  
! 	/* update statistics in pg_class */
  	vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
! 						vacrelstats->rel_tuples, vacrelstats->hasindex);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
--- 1253,1267 ----
  	/* update shared free space map with final free space info */
  	vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
  
! 	/*
! 	 * Update statistics in pg_class.  Note we only want the Xids
! 	 * if this is a plain relation (i.e. not a TOAST table).
! 	 */
  	vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
! 						vacrelstats->rel_tuples, vacrelstats->hasindex,
! 						(onerel->rd_rel->relkind == RELKIND_RELATION),
! 						vacrelstats->minxid, OldestXmin, relminxid,
! 						relvacuumxid);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
***************
*** 1202,1217 ****
   *		This routine sets commit status bits, constructs vacuum_pages (list
   *		of pages we need to compact free space on and/or clean indexes of
   *		deleted tuples), constructs fraged_pages (list of pages with free
!  *		space that tuples could be moved into), and calculates statistics
!  *		on the number of live tuples in the heap.
   */
  static void
  scan_heap(VRelStats *vacrelstats, Relation onerel,
! 		  VacPageList vacuum_pages, VacPageList fraged_pages)
  {
  	BlockNumber nblocks,
  				blkno;
- 	HeapTupleData tuple;
  	char	   *relname;
  	VacPage		vacpage;
  	BlockNumber empty_pages,
--- 1275,1291 ----
   *		This routine sets commit status bits, constructs vacuum_pages (list
   *		of pages we need to compact free space on and/or clean indexes of
   *		deleted tuples), constructs fraged_pages (list of pages with free
!  *		space that tuples could be moved into), calculates statistics on the
!  *		number of live tuples in the heap, and figures out the minimum normal
!  *		Xid present anywhere in the table.
   */
  static void
  scan_heap(VRelStats *vacrelstats, Relation onerel,
! 		  VacPageList vacuum_pages, VacPageList fraged_pages,
! 		  TransactionId FreezeLimit, TransactionId OldestXmin)
  {
  	BlockNumber nblocks,
  				blkno;
  	char	   *relname;
  	VacPage		vacpage;
  	BlockNumber empty_pages,
***************
*** 1326,1331 ****
--- 1400,1406 ----
  		{
  			ItemId		itemid = PageGetItemId(page, offnum);
  			bool		tupgone = false;
+ 			HeapTupleData tuple;
  
  			/*
  			 * Collect un-used items too - it's possible to have indexes
***************
*** 1468,1473 ****
--- 1543,1560 ----
  					min_tlen = tuple.t_len;
  				if (tuple.t_len > max_tlen)
  					max_tlen = tuple.t_len;
+ 
+ 				/* Checks for pg_class.relminxid */
+ 				if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
+ 					TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+ 										  vacrelstats->minxid))
+ 					vacrelstats->minxid = HeapTupleHeaderGetXmin(tuple.t_data);
+ 
+ 				if (TransactionIdIsNormal(HeapTupleHeaderGetXmax(tuple.t_data)) &&
+ 					TransactionIdPrecedes(HeapTupleHeaderGetXmax(tuple.t_data),
+ 										  vacrelstats->minxid))
+ 					vacrelstats->minxid = HeapTupleHeaderGetXmax(tuple.t_data);
+ 
  			}
  		}						/* scan along page */
  
***************
*** 1609,1615 ****
  static void
  repair_frag(VRelStats *vacrelstats, Relation onerel,
  			VacPageList vacuum_pages, VacPageList fraged_pages,
! 			int nindexes, Relation *Irel)
  {
  	TransactionId myXID = GetCurrentTransactionId();
  	Buffer		dst_buffer = InvalidBuffer;
--- 1696,1702 ----
  static void
  repair_frag(VRelStats *vacrelstats, Relation onerel,
  			VacPageList vacuum_pages, VacPageList fraged_pages,
! 			int nindexes, Relation *Irel, TransactionId OldestXmin)
  {
  	TransactionId myXID = GetCurrentTransactionId();
  	Buffer		dst_buffer = InvalidBuffer;
***************
*** 2962,2968 ****
  	/* now update statistics in pg_class */
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages, stats->num_index_tuples,
! 						false);
  
  	ereport(elevel,
  			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
--- 3049,3056 ----
  	/* now update statistics in pg_class */
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages, stats->num_index_tuples,
! 						false, false, InvalidTransactionId,
! 						InvalidTransactionId, NULL, NULL);
  
  	ereport(elevel,
  			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
***************
*** 3029,3035 ****
  	/* now update statistics in pg_class */
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages, stats->num_index_tuples,
! 						false);
  
  	ereport(elevel,
  			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
--- 3117,3124 ----
  	/* now update statistics in pg_class */
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages, stats->num_index_tuples,
! 						false, false, InvalidTransactionId,
! 						InvalidTransactionId, NULL, NULL);
  
  	ereport(elevel,
  			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
Index: src/backend/commands/vacuumlazy.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/commands/vacuumlazy.c,v
retrieving revision 1.61
diff -c -r1.61 vacuumlazy.c
*** src/backend/commands/vacuumlazy.c	15 Oct 2005 02:49:16 -0000	1.61
--- src/backend/commands/vacuumlazy.c	14 Nov 2005 21:35:48 -0000
***************
*** 71,76 ****
--- 71,77 ----
  	double		tuples_deleted;
  	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
  	Size		threshold;		/* minimum interesting free space */
+ 	TransactionId minxid;		/* minimum Xid present anywhere in table */
  	/* List of TIDs of tuples we intend to delete */
  	/* NB: this list is ordered by TID address */
  	int			num_dead_tuples;	/* current # of entries */
***************
*** 87,99 ****
  
  static int	elevel = -1;
  
- static TransactionId OldestXmin;
- static TransactionId FreezeLimit;
- 
  
  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_scan_index(Relation indrel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
--- 88,98 ----
  
  static int	elevel = -1;
  
  
  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes, TransactionId FreezeLimit,
! 			   TransactionId OldestXmin);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_scan_index(Relation indrel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
***************
*** 102,110 ****
  				  LVRelStats *vacrelstats);
  static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
  				 int tupindex, LVRelStats *vacrelstats);
! static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
  static BlockNumber count_nondeletable_pages(Relation onerel,
! 						 LVRelStats *vacrelstats);
  static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
  static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
  					   ItemPointer itemptr);
--- 101,110 ----
  				  LVRelStats *vacrelstats);
  static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
  				 int tupindex, LVRelStats *vacrelstats);
! static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats,
! 							   TransactionId OldestXmin);
  static BlockNumber count_nondeletable_pages(Relation onerel,
! 						 LVRelStats *vacrelstats, TransactionId OldestXmin);
  static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
  static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
  					   ItemPointer itemptr);
***************
*** 121,139 ****
   *	lazy_vacuum_rel() -- perform LAZY VACUUM for one heap relation
   *
   *		This routine vacuums a single heap, cleans out its indexes, and
!  *		updates its num_pages and num_tuples statistics.
   *
   *		At entry, we have already established a transaction and opened
   *		and locked the relation.
   */
  void
! lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
  {
  	LVRelStats *vacrelstats;
  	Relation   *Irel;
  	int			nindexes;
  	bool		hasindex;
  	BlockNumber possibly_freeable;
  
  	if (vacstmt->verbose)
  		elevel = INFO;
--- 121,147 ----
   *	lazy_vacuum_rel() -- perform LAZY VACUUM for one heap relation
   *
   *		This routine vacuums a single heap, cleans out its indexes, and
!  *		updates its relpages and reltuples statistics, as well as the
!  *		relminxid and relvacuumxid information.
   *
   *		At entry, we have already established a transaction and opened
   *		and locked the relation.
+  *
+  *		relminxid and relvacuumxid are output parameters, and correspond
+  *		to the values in pg_class that existed prior to the execution of
+  *		this vacuum.
   */
  void
! lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt, TransactionId *relminxid,
! 				TransactionId *relvacuumxid)
  {
  	LVRelStats *vacrelstats;
  	Relation   *Irel;
  	int			nindexes;
  	bool		hasindex;
  	BlockNumber possibly_freeable;
+ 	TransactionId OldestXmin,
+ 				  FreezeLimit;
  
  	if (vacstmt->verbose)
  		elevel = INFO;
***************
*** 149,160 ****
  	/* XXX should we scale it up or down?  Adjust vacuum.c too, if so */
  	vacrelstats->threshold = GetAvgFSMRequestSize(&onerel->rd_node);
  
  	/* Open all indexes of the relation */
  	vac_open_indexes(onerel, ShareUpdateExclusiveLock, &nindexes, &Irel);
  	hasindex = (nindexes > 0);
  
  	/* Do the vacuuming */
! 	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes);
  
  	/* Done with indexes */
  	vac_close_indexes(nindexes, Irel, NoLock);
--- 157,171 ----
  	/* XXX should we scale it up or down?  Adjust vacuum.c too, if so */
  	vacrelstats->threshold = GetAvgFSMRequestSize(&onerel->rd_node);
  
+ 	/* Set initial minimum Xid in table */
+ 	vacrelstats->minxid = RecentXmin;
+ 
  	/* Open all indexes of the relation */
  	vac_open_indexes(onerel, ShareUpdateExclusiveLock, &nindexes, &Irel);
  	hasindex = (nindexes > 0);
  
  	/* Do the vacuuming */
! 	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, FreezeLimit, OldestXmin);
  
  	/* Done with indexes */
  	vac_close_indexes(nindexes, Irel, NoLock);
***************
*** 168,183 ****
  	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
  	if (possibly_freeable >= REL_TRUNCATE_MINIMUM ||
  		possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION)
! 		lazy_truncate_heap(onerel, vacrelstats);
  
  	/* Update shared free space map with final free space info */
  	lazy_update_fsm(onerel, vacrelstats);
  
  	/* Update statistics in pg_class */
! 	vac_update_relstats(RelationGetRelid(onerel),
! 						vacrelstats->rel_pages,
! 						vacrelstats->rel_tuples,
! 						hasindex);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
--- 179,195 ----
  	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
  	if (possibly_freeable >= REL_TRUNCATE_MINIMUM ||
  		possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION)
! 		lazy_truncate_heap(onerel, vacrelstats, OldestXmin);
  
  	/* Update shared free space map with final free space info */
  	lazy_update_fsm(onerel, vacrelstats);
  
  	/* Update statistics in pg_class */
! 	vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
! 						vacrelstats->rel_tuples, hasindex,
! 						(onerel->rd_rel->relkind == RELKIND_RELATION),
! 						vacrelstats->minxid, OldestXmin, relminxid,
! 						relvacuumxid);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
***************
*** 192,201 ****
   *		and pages with free space, and calculates statistics on the number
   *		of live tuples in the heap.  When done, or when we run low on space
   *		for dead-tuple TIDs, invoke vacuuming of indexes and heap.
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes)
  {
  	BlockNumber nblocks,
  				blkno;
--- 204,217 ----
   *		and pages with free space, and calculates statistics on the number
   *		of live tuples in the heap.  When done, or when we run low on space
   *		for dead-tuple TIDs, invoke vacuuming of indexes and heap.
+  *
+  *		It also updates the minimum Xid found anywhere in the table, for
+  *		pg_class.relminxid.
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes, TransactionId FreezeLimit,
! 			   TransactionId OldestXmin)
  {
  	BlockNumber nblocks,
  				blkno;
***************
*** 420,425 ****
--- 436,452 ----
  			{
  				num_tuples += 1;
  				hastup = true;
+ 
+ 				/* Checks for pg_class.relminxid */
+ 				if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
+ 					TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+ 										  vacrelstats->minxid))
+ 					vacrelstats->minxid = HeapTupleHeaderGetXmin(tuple.t_data);
+ 
+ 				if (TransactionIdIsNormal(HeapTupleHeaderGetXmax(tuple.t_data)) &&
+ 					TransactionIdPrecedes(HeapTupleHeaderGetXmax(tuple.t_data),
+ 										  vacrelstats->minxid))
+ 					vacrelstats->minxid = HeapTupleHeaderGetXmax(tuple.t_data);
  			}
  		}						/* scan along page */
  
***************
*** 644,650 ****
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages,
  						stats->num_index_tuples,
! 						false);
  
  	ereport(elevel,
  			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
--- 671,678 ----
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages,
  						stats->num_index_tuples,
! 						false, false, InvalidTransactionId,
! 						InvalidTransactionId, NULL, NULL);
  
  	ereport(elevel,
  			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
***************
*** 720,726 ****
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages,
  						stats->num_index_tuples,
! 						false);
  
  	ereport(elevel,
  			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
--- 748,755 ----
  	vac_update_relstats(RelationGetRelid(indrel),
  						stats->num_pages,
  						stats->num_index_tuples,
! 						false, false, InvalidTransactionId,
! 						InvalidTransactionId, NULL, NULL);
  
  	ereport(elevel,
  			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
***************
*** 741,747 ****
   * lazy_truncate_heap - try to truncate off any empty pages at the end
   */
  static void
! lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats)
  {
  	BlockNumber old_rel_pages = vacrelstats->rel_pages;
  	BlockNumber new_rel_pages;
--- 770,777 ----
   * lazy_truncate_heap - try to truncate off any empty pages at the end
   */
  static void
! lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats,
! 				   TransactionId OldestXmin)
  {
  	BlockNumber old_rel_pages = vacrelstats->rel_pages;
  	BlockNumber new_rel_pages;
***************
*** 782,788 ****
  	 * because other backends could have added tuples to these pages whilst we
  	 * were vacuuming.
  	 */
! 	new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);
  
  	if (new_rel_pages >= old_rel_pages)
  	{
--- 812,818 ----
  	 * because other backends could have added tuples to these pages whilst we
  	 * were vacuuming.
  	 */
! 	new_rel_pages = count_nondeletable_pages(onerel, vacrelstats, OldestXmin);
  
  	if (new_rel_pages >= old_rel_pages)
  	{
***************
*** 837,843 ****
   * Returns number of nondeletable pages (last nonempty page + 1).
   */
  static BlockNumber
! count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
  {
  	BlockNumber blkno;
  	HeapTupleData tuple;
--- 867,874 ----
   * Returns number of nondeletable pages (last nonempty page + 1).
   */
  static BlockNumber
! count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats,
! 						 TransactionId OldestXmin)
  {
  	BlockNumber blkno;
  	HeapTupleData tuple;
Index: src/backend/libpq/hba.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/libpq/hba.c,v
retrieving revision 1.149
diff -c -r1.149 hba.c
*** src/backend/libpq/hba.c	17 Oct 2005 16:24:19 -0000	1.149
--- src/backend/libpq/hba.c	14 Nov 2005 23:41:13 -0000
***************
*** 1001,1007 ****
   *	dbname: gets database name (must be of size NAMEDATALEN bytes)
   *	dboid: gets database OID
   *	dbtablespace: gets database's default tablespace's OID
!  *	dbfrozenxid: gets database's frozen XID
   *	dbvacuumxid: gets database's vacuum XID
   *
   * This is not much related to the other functions in hba.c, but we put it
--- 1001,1007 ----
   *	dbname: gets database name (must be of size NAMEDATALEN bytes)
   *	dboid: gets database OID
   *	dbtablespace: gets database's default tablespace's OID
!  *	dbminxid: gets database's minimum XID
   *	dbvacuumxid: gets database's vacuum XID
   *
   * This is not much related to the other functions in hba.c, but we put it
***************
*** 1009,1015 ****
   */
  bool
  read_pg_database_line(FILE *fp, char *dbname, Oid *dboid,
! 					  Oid *dbtablespace, TransactionId *dbfrozenxid,
  					  TransactionId *dbvacuumxid)
  {
  	char		buf[MAX_TOKEN];
--- 1009,1015 ----
   */
  bool
  read_pg_database_line(FILE *fp, char *dbname, Oid *dboid,
! 					  Oid *dbtablespace, TransactionId *dbminxid,
  					  TransactionId *dbvacuumxid)
  {
  	char		buf[MAX_TOKEN];
***************
*** 1032,1038 ****
  	next_token(fp, buf, sizeof(buf));
  	if (!isdigit((unsigned char) buf[0]))
  		elog(FATAL, "bad data in flat pg_database file");
! 	*dbfrozenxid = atoxid(buf);
  	next_token(fp, buf, sizeof(buf));
  	if (!isdigit((unsigned char) buf[0]))
  		elog(FATAL, "bad data in flat pg_database file");
--- 1032,1038 ----
  	next_token(fp, buf, sizeof(buf));
  	if (!isdigit((unsigned char) buf[0]))
  		elog(FATAL, "bad data in flat pg_database file");
! 	*dbminxid = atoxid(buf);
  	next_token(fp, buf, sizeof(buf));
  	if (!isdigit((unsigned char) buf[0]))
  		elog(FATAL, "bad data in flat pg_database file");
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.5
diff -c -r1.5 autovacuum.c
*** src/backend/postmaster/autovacuum.c	15 Oct 2005 02:49:23 -0000	1.5
--- src/backend/postmaster/autovacuum.c	15 Nov 2005 01:10:42 -0000
***************
*** 76,82 ****
  {
  	Oid			oid;
  	char	   *name;
! 	TransactionId frozenxid;
  	TransactionId vacuumxid;
  	PgStat_StatDBEntry *entry;
  	int32		age;
--- 76,82 ----
  {
  	Oid			oid;
  	char	   *name;
! 	TransactionId minxid;
  	TransactionId vacuumxid;
  	PgStat_StatDBEntry *entry;
  	int32		age;
***************
*** 326,332 ****
  	{
  		autovac_dbase *tmp = lfirst(cell);
  		bool		this_whole_db;
! 		int32		freeze_age,
  					vacuum_age;
  
  		/*
--- 326,332 ----
  	{
  		autovac_dbase *tmp = lfirst(cell);
  		bool		this_whole_db;
! 		int32		true_age,
  					vacuum_age;
  
  		/*
***************
*** 339,347 ****
  		 * Unlike vacuum.c, we also look at vacuumxid.	This is so that pg_clog
  		 * can be kept trimmed to a reasonable size.
  		 */
! 		freeze_age = (int32) (nextXid - tmp->frozenxid);
  		vacuum_age = (int32) (nextXid - tmp->vacuumxid);
! 		tmp->age = Max(freeze_age, vacuum_age);
  
  		this_whole_db = (tmp->age >
  						 (int32) ((MaxTransactionId >> 3) * 3 - 100000));
--- 339,347 ----
  		 * Unlike vacuum.c, we also look at vacuumxid.	This is so that pg_clog
  		 * can be kept trimmed to a reasonable size.
  		 */
! 		true_age = (int32) (nextXid - tmp->minxid);
  		vacuum_age = (int32) (nextXid - tmp->vacuumxid);
! 		tmp->age = Max(true_age, vacuum_age);
  
  		this_whole_db = (tmp->age >
  						 (int32) ((MaxTransactionId >> 3) * 3 - 100000));
***************
*** 441,447 ****
  	FILE	   *db_file;
  	Oid			db_id;
  	Oid			db_tablespace;
! 	TransactionId db_frozenxid;
  	TransactionId db_vacuumxid;
  
  	filename = database_getflatfilename();
--- 441,447 ----
  	FILE	   *db_file;
  	Oid			db_id;
  	Oid			db_tablespace;
! 	TransactionId db_minxid;
  	TransactionId db_vacuumxid;
  
  	filename = database_getflatfilename();
***************
*** 452,458 ****
  				 errmsg("could not open file \"%s\": %m", filename)));
  
  	while (read_pg_database_line(db_file, thisname, &db_id,
! 								 &db_tablespace, &db_frozenxid,
  								 &db_vacuumxid))
  	{
  		autovac_dbase *db;
--- 452,458 ----
  				 errmsg("could not open file \"%s\": %m", filename)));
  
  	while (read_pg_database_line(db_file, thisname, &db_id,
! 								 &db_tablespace, &db_minxid,
  								 &db_vacuumxid))
  	{
  		autovac_dbase *db;
***************
*** 461,467 ****
  
  		db->oid = db_id;
  		db->name = pstrdup(thisname);
! 		db->frozenxid = db_frozenxid;
  		db->vacuumxid = db_vacuumxid;
  		/* these get set later: */
  		db->entry = NULL;
--- 461,467 ----
  
  		db->oid = db_id;
  		db->name = pstrdup(thisname);
! 		db->minxid = db_minxid;
  		db->vacuumxid = db_vacuumxid;
  		/* these get set later: */
  		db->entry = NULL;
Index: src/backend/utils/init/flatfiles.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/utils/init/flatfiles.c,v
retrieving revision 1.15
diff -c -r1.15 flatfiles.c
*** src/backend/utils/init/flatfiles.c	15 Oct 2005 02:49:33 -0000	1.15
--- src/backend/utils/init/flatfiles.c	15 Nov 2005 01:12:30 -0000
***************
*** 163,169 ****
  /*
   * write_database_file: update the flat database file
   *
!  * A side effect is to determine the oldest database's datfrozenxid
   * so we can set or update the XID wrap limit.
   */
  static void
--- 163,169 ----
  /*
   * write_database_file: update the flat database file
   *
!  * A side effect is to determine the oldest database's datminxid
   * so we can set or update the XID wrap limit.
   */
  static void
***************
*** 177,183 ****
  	HeapScanDesc scan;
  	HeapTuple	tuple;
  	NameData	oldest_datname;
! 	TransactionId oldest_datfrozenxid = InvalidTransactionId;
  
  	/*
  	 * Create a temporary filename to be renamed later.  This prevents the
--- 177,183 ----
  	HeapScanDesc scan;
  	HeapTuple	tuple;
  	NameData	oldest_datname;
! 	TransactionId oldest_datminxid = InvalidTransactionId;
  
  	/*
  	 * Create a temporary filename to be renamed later.  This prevents the
***************
*** 208,234 ****
  		char	   *datname;
  		Oid			datoid;
  		Oid			dattablespace;
! 		TransactionId datfrozenxid,
! 					datvacuumxid;
  
  		datname = NameStr(dbform->datname);
  		datoid = HeapTupleGetOid(tuple);
  		dattablespace = dbform->dattablespace;
! 		datfrozenxid = dbform->datfrozenxid;
  		datvacuumxid = dbform->datvacuumxid;
  
  		/*
! 		 * Identify the oldest datfrozenxid, ignoring databases that are not
! 		 * connectable (we assume they are safely frozen).	This must match
  		 * the logic in vac_truncate_clog() in vacuum.c.
  		 */
  		if (dbform->datallowconn &&
! 			TransactionIdIsNormal(datfrozenxid))
  		{
! 			if (oldest_datfrozenxid == InvalidTransactionId ||
! 				TransactionIdPrecedes(datfrozenxid, oldest_datfrozenxid))
  			{
! 				oldest_datfrozenxid = datfrozenxid;
  				namestrcpy(&oldest_datname, datname);
  			}
  		}
--- 208,235 ----
  		char	   *datname;
  		Oid			datoid;
  		Oid			dattablespace;
! 		TransactionId datminxid,
! 					  datvacuumxid;
! 					  
  
  		datname = NameStr(dbform->datname);
  		datoid = HeapTupleGetOid(tuple);
  		dattablespace = dbform->dattablespace;
! 		datminxid = dbform->datminxid;
  		datvacuumxid = dbform->datvacuumxid;
  
  		/*
! 		 * Identify the oldest datminxid, ignoring databases that are not
! 		 * connectable (we assume they are safely frozen).  This must match
  		 * the logic in vac_truncate_clog() in vacuum.c.
  		 */
  		if (dbform->datallowconn &&
! 			TransactionIdIsNormal(datminxid))
  		{
! 			if (oldest_datminxid == InvalidTransactionId ||
! 				TransactionIdPrecedes(datminxid, oldest_datminxid))
  			{
! 				oldest_datminxid = datminxid;
  				namestrcpy(&oldest_datname, datname);
  			}
  		}
***************
*** 244,257 ****
  		}
  
  		/*
! 		 * The file format is: "dbname" oid tablespace frozenxid vacuumxid
  		 *
! 		 * The xids are not needed for backend startup, but are of use to
  		 * autovacuum, and might also be helpful for forensic purposes.
  		 */
  		fputs_quote(datname, fp);
  		fprintf(fp, " %u %u %u %u\n",
! 				datoid, dattablespace, datfrozenxid, datvacuumxid);
  	}
  	heap_endscan(scan);
  
--- 245,258 ----
  		}
  
  		/*
! 		 * The file format is: "dbname" oid tablespace minxid vacuumxid
  		 *
! 		 * The minxid is not needed for backend startup, but is of use to
  		 * autovacuum, and might also be helpful for forensic purposes.
  		 */
  		fputs_quote(datname, fp);
  		fprintf(fp, " %u %u %u %u\n",
! 				datoid, dattablespace, datminxid, datvacuumxid);
  	}
  	heap_endscan(scan);
  
***************
*** 272,281 ****
  						tempname, filename)));
  
  	/*
! 	 * Set the transaction ID wrap limit using the oldest datfrozenxid
  	 */
! 	if (oldest_datfrozenxid != InvalidTransactionId)
! 		SetTransactionIdLimit(oldest_datfrozenxid, &oldest_datname);
  }
  
  
--- 273,282 ----
  						tempname, filename)));
  
  	/*
! 	 * Set the transaction ID wrap limit using the oldest datminxid
  	 */
! 	if (oldest_datminxid != InvalidTransactionId)
! 		SetTransactionIdLimit(oldest_datminxid, &oldest_datname);
  }
  
  
Index: src/include/access/transam.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/access/transam.h,v
retrieving revision 1.56
diff -c -r1.56 transam.h
*** src/include/access/transam.h	15 Oct 2005 02:49:42 -0000	1.56
--- src/include/access/transam.h	7 Nov 2005 12:35:01 -0000
***************
*** 123,130 ****
  /* in transam/varsup.c */
  extern TransactionId GetNewTransactionId(bool isSubXact);
  extern TransactionId ReadNewTransactionId(void);
! extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
! 					  Name oldest_datname);
  extern Oid	GetNewObjectId(void);
  
  #endif   /* TRAMSAM_H */
--- 123,130 ----
  /* in transam/varsup.c */
  extern TransactionId GetNewTransactionId(bool isSubXact);
  extern TransactionId ReadNewTransactionId(void);
! extern void SetTransactionIdLimit(TransactionId oldest_datminxid,
! 								  Name oldest_datname);
  extern Oid	GetNewObjectId(void);
  
  #endif   /* TRAMSAM_H */
Index: src/include/catalog/pg_attribute.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/catalog/pg_attribute.h,v
retrieving revision 1.119
diff -c -r1.119 pg_attribute.h
*** src/include/catalog/pg_attribute.h	15 Oct 2005 02:49:42 -0000	1.119
--- src/include/catalog/pg_attribute.h	14 Nov 2005 19:56:02 -0000
***************
*** 404,410 ****
  { 1259, {"relhaspkey"},    16, -1,	1, 22, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
  { 1259, {"relhasrules"},   16, -1,	1, 23, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
  { 1259, {"relhassubclass"},16, -1,	1, 24, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
! { 1259, {"relacl"},		 1034, -1, -1, 25, 1, -1, -1, false, 'x', 'i', false, false, false, true, 0 }
  
  DATA(insert ( 1259 relname			19 -1 NAMEDATALEN	1 0 -1 -1 f p i t f f t 0));
  DATA(insert ( 1259 relnamespace		26 -1 4   2 0 -1 -1 t p i t f f t 0));
--- 404,412 ----
  { 1259, {"relhaspkey"},    16, -1,	1, 22, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
  { 1259, {"relhasrules"},   16, -1,	1, 23, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
  { 1259, {"relhassubclass"},16, -1,	1, 24, 0, -1, -1, true, 'p', 'c', true, false, false, true, 0 }, \
! { 1259, {"relminxid"},     28, -1,	4, 25, 0, -1, -1, true, 'p', 'i', true, false, false, true, 0 }, \
! { 1259, {"relvacuumxid"},  28, -1,	4, 26, 0, -1, -1, true, 'p', 'i', true, false, false, true, 0 }, \
! { 1259, {"relacl"},		 1034, -1, -1, 27, 1, -1, -1, false, 'x', 'i', false, false, false, true, 0 }
  
  DATA(insert ( 1259 relname			19 -1 NAMEDATALEN	1 0 -1 -1 f p i t f f t 0));
  DATA(insert ( 1259 relnamespace		26 -1 4   2 0 -1 -1 t p i t f f t 0));
***************
*** 430,436 ****
  DATA(insert ( 1259 relhaspkey		16 -1 1  22 0 -1 -1 t p c t f f t 0));
  DATA(insert ( 1259 relhasrules		16 -1 1  23 0 -1 -1 t p c t f f t 0));
  DATA(insert ( 1259 relhassubclass	16 -1 1  24 0 -1 -1 t p c t f f t 0));
! DATA(insert ( 1259 relacl		  1034 -1 -1 25 1 -1 -1 f x i f f f t 0));
  DATA(insert ( 1259 ctid				27 0  6  -1 0 -1 -1 f p s t f f t 0));
  DATA(insert ( 1259 oid				26 0  4  -2 0 -1 -1 t p i t f f t 0));
  DATA(insert ( 1259 xmin				28 0  4  -3 0 -1 -1 t p i t f f t 0));
--- 432,440 ----
  DATA(insert ( 1259 relhaspkey		16 -1 1  22 0 -1 -1 t p c t f f t 0));
  DATA(insert ( 1259 relhasrules		16 -1 1  23 0 -1 -1 t p c t f f t 0));
  DATA(insert ( 1259 relhassubclass	16 -1 1  24 0 -1 -1 t p c t f f t 0));
! DATA(insert ( 1259 relminxid		28 -1 4  25 0 -1 -1 t p i t f f t 0));
! DATA(insert ( 1259 relvacuumxid		28 -1 4  26 0 -1 -1 t p i t f f t 0));
! DATA(insert ( 1259 relacl		  1034 -1 -1 27 1 -1 -1 f x i f f f t 0));
  DATA(insert ( 1259 ctid				27 0  6  -1 0 -1 -1 f p s t f f t 0));
  DATA(insert ( 1259 oid				26 0  4  -2 0 -1 -1 t p i t f f t 0));
  DATA(insert ( 1259 xmin				28 0  4  -3 0 -1 -1 t p i t f f t 0));
Index: src/include/catalog/pg_class.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/catalog/pg_class.h,v
retrieving revision 1.90
diff -c -r1.90 pg_class.h
*** src/include/catalog/pg_class.h	15 Oct 2005 02:49:42 -0000	1.90
--- src/include/catalog/pg_class.h	14 Nov 2005 20:46:10 -0000
***************
*** 74,79 ****
--- 74,81 ----
  	bool		relhaspkey;		/* has PRIMARY KEY index */
  	bool		relhasrules;	/* has associated rules */
  	bool		relhassubclass; /* has derived classes */
+ 	TransactionId relminxid;	/* minimum Xid present in table */
+ 	TransactionId relvacuumxid;	/* OldestXmin of latest vacuum */
  
  	/*
  	 * relacl may or may not be present, see note above!
***************
*** 83,89 ****
  
  /* Size of fixed part of pg_class tuples, not counting relacl or padding */
  #define CLASS_TUPLE_SIZE \
! 	 (offsetof(FormData_pg_class,relhassubclass) + sizeof(bool))
  
  /* ----------------
   *		Form_pg_class corresponds to a pointer to a tuple with
--- 85,91 ----
  
  /* Size of fixed part of pg_class tuples, not counting relacl or padding */
  #define CLASS_TUPLE_SIZE \
! 	 (offsetof(FormData_pg_class,relvacuumxid) + sizeof(TransactionId))
  
  /* ----------------
   *		Form_pg_class corresponds to a pointer to a tuple with
***************
*** 103,110 ****
   *		relacl field.  This is a kluge.
   * ----------------
   */
! #define Natts_pg_class_fixed			24
! #define Natts_pg_class					25
  #define Anum_pg_class_relname			1
  #define Anum_pg_class_relnamespace		2
  #define Anum_pg_class_reltype			3
--- 105,112 ----
   *		relacl field.  This is a kluge.
   * ----------------
   */
! #define Natts_pg_class_fixed			26
! #define Natts_pg_class					27
  #define Anum_pg_class_relname			1
  #define Anum_pg_class_relnamespace		2
  #define Anum_pg_class_reltype			3
***************
*** 129,135 ****
  #define Anum_pg_class_relhaspkey		22
  #define Anum_pg_class_relhasrules		23
  #define Anum_pg_class_relhassubclass	24
! #define Anum_pg_class_relacl			25
  
  /* ----------------
   *		initial contents of pg_class
--- 131,139 ----
  #define Anum_pg_class_relhaspkey		22
  #define Anum_pg_class_relhasrules		23
  #define Anum_pg_class_relhassubclass	24
! #define Anum_pg_class_relminxid			25
! #define Anum_pg_class_relvacuumxid		26
! #define Anum_pg_class_relacl			27
  
  /* ----------------
   *		initial contents of pg_class
***************
*** 139,151 ****
   * ----------------
   */
  
! DATA(insert OID = 1247 (  pg_type		PGNSP 71 PGUID 0 1247 0 0 0 0 0 f f r 23 0 0 0 0 0 t f f f _null_ ));
  DESCR("");
! DATA(insert OID = 1249 (  pg_attribute	PGNSP 75 PGUID 0 1249 0 0 0 0 0 f f r 17 0 0 0 0 0 f f f f _null_ ));
  DESCR("");
! DATA(insert OID = 1255 (  pg_proc		PGNSP 81 PGUID 0 1255 0 0 0 0 0 f f r 18 0 0 0 0 0 t f f f _null_ ));
  DESCR("");
! DATA(insert OID = 1259 (  pg_class		PGNSP 83 PGUID 0 1259 0 0 0 0 0 f f r 25 0 0 0 0 0 t f f f _null_ ));
  DESCR("");
  
  #define		  RELKIND_INDEX			  'i'		/* secondary index */
--- 143,155 ----
   * ----------------
   */
  
! DATA(insert OID = 1247 (  pg_type		PGNSP 71 PGUID 0 1247 0 0 0 0 0 f f r 23 0 0 0 0 0 t f f f 0 0 _null_ ));
  DESCR("");
! DATA(insert OID = 1249 (  pg_attribute	PGNSP 75 PGUID 0 1249 0 0 0 0 0 f f r 17 0 0 0 0 0 f f f f 0 0 _null_ ));
  DESCR("");
! DATA(insert OID = 1255 (  pg_proc		PGNSP 81 PGUID 0 1255 0 0 0 0 0 f f r 18 0 0 0 0 0 t f f f 0 0 _null_ ));
  DESCR("");
! DATA(insert OID = 1259 (  pg_class		PGNSP 83 PGUID 0 1259 0 0 0 0 0 f f r 27 0 0 0 0 0 t f f f 0 0 _null_ ));
  DESCR("");
  
  #define		  RELKIND_INDEX			  'i'		/* secondary index */
Index: src/include/catalog/pg_database.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/catalog/pg_database.h,v
retrieving revision 1.38
diff -c -r1.38 pg_database.h
*** src/include/catalog/pg_database.h	15 Oct 2005 02:49:42 -0000	1.38
--- src/include/catalog/pg_database.h	14 Nov 2005 21:42:41 -0000
***************
*** 42,49 ****
  	bool		datallowconn;	/* new connections allowed? */
  	int4		datconnlimit;	/* max connections allowed (-1=no limit) */
  	Oid			datlastsysoid;	/* highest OID to consider a system OID */
! 	TransactionId datvacuumxid; /* all XIDs before this are vacuumed */
! 	TransactionId datfrozenxid; /* all XIDs before this are frozen */
  	Oid			dattablespace;	/* default table space for this DB */
  	text		datconfig[1];	/* database-specific GUC (VAR LENGTH) */
  	aclitem		datacl[1];		/* access permissions (VAR LENGTH) */
--- 42,50 ----
  	bool		datallowconn;	/* new connections allowed? */
  	int4		datconnlimit;	/* max connections allowed (-1=no limit) */
  	Oid			datlastsysoid;	/* highest OID to consider a system OID */
! 	TransactionId datminxid;	/* no table contains an Xid below this one */
! 	TransactionId datvacuumxid;	/* all Xids in all tables before this have
! 								 * been marked known-committed or -aborted */
  	Oid			dattablespace;	/* default table space for this DB */
  	text		datconfig[1];	/* database-specific GUC (VAR LENGTH) */
  	aclitem		datacl[1];		/* access permissions (VAR LENGTH) */
***************
*** 68,75 ****
  #define Anum_pg_database_datallowconn	5
  #define Anum_pg_database_datconnlimit	6
  #define Anum_pg_database_datlastsysoid	7
! #define Anum_pg_database_datvacuumxid	8
! #define Anum_pg_database_datfrozenxid	9
  #define Anum_pg_database_dattablespace	10
  #define Anum_pg_database_datconfig		11
  #define Anum_pg_database_datacl			12
--- 69,76 ----
  #define Anum_pg_database_datallowconn	5
  #define Anum_pg_database_datconnlimit	6
  #define Anum_pg_database_datlastsysoid	7
! #define Anum_pg_database_datminxid		8
! #define Anum_pg_database_datvacuumxid	9
  #define Anum_pg_database_dattablespace	10
  #define Anum_pg_database_datconfig		11
  #define Anum_pg_database_datacl			12
Index: src/include/commands/vacuum.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/commands/vacuum.h,v
retrieving revision 1.62
diff -c -r1.62 vacuum.h
*** src/include/commands/vacuum.h	15 Oct 2005 02:49:44 -0000	1.62
--- src/include/commands/vacuum.h	14 Nov 2005 21:17:05 -0000
***************
*** 114,123 ****
  extern void vac_open_indexes(Relation relation, LOCKMODE lockmode,
  				 int *nindexes, Relation **Irel);
  extern void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode);
! extern void vac_update_relstats(Oid relid,
! 					BlockNumber num_pages,
! 					double num_tuples,
! 					bool hasindex);
  extern void vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
  					  TransactionId *oldestXmin,
  					  TransactionId *freezeLimit);
--- 114,123 ----
  extern void vac_open_indexes(Relation relation, LOCKMODE lockmode,
  				 int *nindexes, Relation **Irel);
  extern void vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode);
! extern void vac_update_relstats(Oid relid, BlockNumber num_pages,
! 					double num_tuples, bool hasindex, bool update_xids,
! 					TransactionId minxid, TransactionId vacuumxid,
! 					TransactionId *relminxid, TransactionId *relvacuumxid);
  extern void vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
  					  TransactionId *oldestXmin,
  					  TransactionId *freezeLimit);
***************
*** 125,131 ****
  extern void vacuum_delay_point(void);
  
  /* in commands/vacuumlazy.c */
! extern void lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
  
  /* in commands/analyze.c */
  extern void analyze_rel(Oid relid, VacuumStmt *vacstmt);
--- 125,132 ----
  extern void vacuum_delay_point(void);
  
  /* in commands/vacuumlazy.c */
! extern void lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
! 				TransactionId *relminxid, TransactionId *relvacuumxid);
  
  /* in commands/analyze.c */
  extern void analyze_rel(Oid relid, VacuumStmt *vacstmt);
Index: src/include/libpq/hba.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/libpq/hba.h,v
retrieving revision 1.41
diff -c -r1.41 hba.h
*** src/include/libpq/hba.h	15 Oct 2005 02:49:44 -0000	1.41
--- src/include/libpq/hba.h	15 Nov 2005 01:04:19 -0000
***************
*** 36,43 ****
  extern void load_role(void);
  extern int	hba_getauthmethod(hbaPort *port);
  extern int	authident(hbaPort *port);
! extern bool read_pg_database_line(FILE *fp, char *dbname, Oid *dboid,
! 					  Oid *dbtablespace, TransactionId *dbfrozenxid,
! 					  TransactionId *dbvacuumxid);
  
  #endif   /* HBA_H */
--- 36,43 ----
  extern void load_role(void);
  extern int	hba_getauthmethod(hbaPort *port);
  extern int	authident(hbaPort *port);
! extern bool	read_pg_database_line(FILE *fp, char *dbname, Oid *dboid,
! 								  Oid *dbtablespace, TransactionId *dbminxid,
! 								  TransactionId *dbvacuumxid);
  
  #endif   /* HBA_H */
#9Simon Riggs
simon@2ndquadrant.com
In reply to: Alvaro Herrera (#8)
Re: [HACKERS] Per-table freeze limit proposal

On Mon, 2005-11-14 at 23:40 -0300, Alvaro Herrera wrote:

The whole thing is pretty fragile if somebody manually updates a
catalog. But we tell people not to do that, so it should be their
fault, right?

Hmmmm...sounds scary. Cool ideas in the patch though.

I discovered one problem with the whole approach. Per this patch, we
only store normal Xids in relminxid/relvacuumxid. So if a table is
completely frozen, we will store RecentXmin. We do this because it
would actually be unsafe to store, say, FrozenXid: if another
transaction stores/changes a tuple while we are vacuuming it, the Frozen
mark wouldn't be correct and thus the table could be corrupted if a Xid
wraparound happens (which is why we use RecentXmin in the first place:
to cope with the possibility of someone else using the table during the
vacuum.)

Yep. And because VACUUM FULL FREEZE is no longer possible.

The problem comes when this is done to template1, and it is copied to
another database after some millions of transactions have come and gone --
it will seem like the database has suffered wraparound. We would need
to vacuum it completely after it is copied for the stats to be accurate.

I don't understand the issue, can you explain more? I see no problem. If
an identical copy gives a problem then surely template1 should also.

I'm not sure what to do about that. I think storing FrozenXid may not
actually be a totally bad idea. Comments?

It's not a totally bad idea, but it has some risk, which where
transactions are concerned is not really acceptable.

Perhaps we should reinstate VACUUM FULL FREEZE to do just a FREEZE with
a table lock and skip all that moving data around.

Best Regards, Simon Riggs

#10Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Simon Riggs (#9)
Re: [HACKERS] Per-table freeze limit proposal

Simon Riggs wrote:

On Mon, 2005-11-14 at 23:40 -0300, Alvaro Herrera wrote:

The whole thing is pretty fragile if somebody manually updates a
catalog. But we tell people not to do that, so it should be their
fault, right?

Hmmmm...sounds scary. Cool ideas in the patch though.

Yeah, well, actually the problem is solved very easily by setting the
pg_database tuple manually, either to InvalidTransactionId or to the
minimum computed from pg_class.
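
As a rough illustration of "the minimum computed from pg_class", the
sketch below picks the oldest normal relminxid out of an array, using the
same circular comparison idea as TransactionIdPrecedes. The names
xid_precedes and oldest_relminxid, and the array standing in for a scan of
pg_class, are made up for the example; this is not code from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId        ((TransactionId) 0)
#define FirstNormalTransactionId    ((TransactionId) 3)
#define TransactionIdIsNormal(xid)  ((xid) >= FirstNormalTransactionId)

/* Circular comparison of two normal Xids: true if a is logically older. */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) < 0;
}

/*
 * Hypothetical helper: return the oldest normal relminxid in the array,
 * or InvalidTransactionId if no entry holds a normal Xid.
 */
static TransactionId
oldest_relminxid(const TransactionId *relminxids, size_t nrels)
{
    TransactionId oldest = InvalidTransactionId;
    size_t      i;

    assert(relminxids != NULL || nrels == 0);

    for (i = 0; i < nrels; i++)
    {
        TransactionId xid = relminxids[i];

        if (!TransactionIdIsNormal(xid))
            continue;           /* unset or special value: ignore it */
        if (oldest == InvalidTransactionId || xid_precedes(xid, oldest))
            oldest = xid;
    }
    return oldest;
}
```

With relminxid values {5000, 0, 4200, 4800} this yields 4200, and with
only invalid entries it yields InvalidTransactionId, which matches the two
manual settings mentioned above.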

The problem comes when this is done to template1, and it is copied to
another database after some millions of transactions have come and gone --
it will seem like the database has suffered wraparound. We would need
to vacuum it completely after it is copied for the stats to be accurate.

I don't understand the issue, can you explain more? I see no problem. If
an identical copy gives a problem then surely template1 should also.

Actually, template1 has the problem too. The scenario is this:

- template1 is frozen. datminxid <- X
- a long time passes, say INT_MAX * 0.75 transactions
- a new database D is created, which coming from template1 has datminxid=X
- the Xid counter is past the vacuum horizon for D.datminxid, so the
system determines that the Xid counter could be wrapped already.
- The system automatically decides to stop accepting new transactions.

In fact there's no problem because in D, just like in template1, all
tuples are frozen. How should we mark this on the catalogs? I don't
see any way.
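
To put numbers on the scenario, here is a minimal sketch that assumes the
age test applied in autovacuum.c (nextXid minus minxid, compared against
(MaxTransactionId >> 3) * 3 - 100000). The function names xid_age and
looks_close_to_wraparound are hypothetical, and the hard limit enforced
through SetTransactionIdLimit is not modelled here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define MaxTransactionId            ((TransactionId) 0xFFFFFFFF)
#define FirstNormalTransactionId    ((TransactionId) 3)
#define TransactionIdIsNormal(xid)  ((xid) >= FirstNormalTransactionId)

/* Age of minxid relative to the next Xid to be assigned (modulo 2^32). */
static int32_t
xid_age(TransactionId nextXid, TransactionId minxid)
{
    assert(TransactionIdIsNormal(minxid));
    return (int32_t) (nextXid - minxid);
}

/*
 * Hypothetical helper: true if the stored datminxid is old enough that a
 * database-wide vacuum would be forced, regardless of whether the tuples
 * in the database are in fact all frozen.
 */
static bool
looks_close_to_wraparound(TransactionId nextXid, TransactionId datminxid)
{
    return xid_age(nextXid, datminxid) >
        (int32_t) ((MaxTransactionId >> 3) * 3 - 100000);
}
```

With X = 1000 and nextXid = X + 1610612735 (about INT_MAX * 0.75), the age
exceeds the threshold of 1610512733, so both template1 and a database D
freshly copied from it are flagged, even though every tuple in them is
frozen.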

Note that setting relminxid = FrozenTransactionId is bogus in any case,
because even if we correctly lock and freeze the table, the next
transaction after the vacuum could insert a new tuple into the table.
But we don't want INSERT to be checking pg_class.relminxid! (Or do we?)

Now, restating the problem, certainly template1 has the problem too. In
fact we have a bigger problem: we are forcing all tables to be vacuumed
every so often, even if they have been completely frozen before! This
is because setting relminxid = Frozen is really bogus.

I'm not sure what to do about that. I think storing FrozenXid may not
actually be a totally bad idea. Comments?

It's not a totally bad idea, but it has some risk, which where
transactions are concerned is not really acceptable.

Perhaps we should reinstate VACUUM FULL FREEZE to do just a FREEZE with
a table lock and skip all that moving data around.

Doesn't work either because of the argument above.

What about assuming that if somebody executes a database-wide FREEZE, he
knows what he is doing and thus we can mark datminxid as
FrozenTransactionId?

Sadly, I see all this as proof that the whole idea doesn't work. It
seems better than the current state of the system, where we rely on the
user to do certain things, or on pgstat which is inherently inexact.
But there is a big hole in the whole reasoning which hasn't been filled
yet.

Any ideas welcome. The idea of any insert/delete/update operation
checking a bit in the Relation and resetting relminxid to
TopTransactionId if it's marked Frozen is the only one I have right now.
What do people think about it?
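
A toy model of that last idea, only to make the proposal concrete. The
struct ToyRelation and the functions toy_vacuum_freeze and toy_note_write
are invented for the sketch; a real version would have to update the
pg_class tuple, which is where the cost and the locking questions come in.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FrozenTransactionId         ((TransactionId) 2)
#define FirstNormalTransactionId    ((TransactionId) 3)
#define TransactionIdIsNormal(xid)  ((xid) >= FirstNormalTransactionId)

/* Stand-in for the per-table state; only the field we care about. */
typedef struct ToyRelation
{
    TransactionId relminxid;    /* FrozenTransactionId = all tuples frozen */
} ToyRelation;

/* Assumes the freeze ran with no concurrent writers on the table. */
static void
toy_vacuum_freeze(ToyRelation *rel)
{
    rel->relminxid = FrozenTransactionId;
}

/*
 * Called by every insert/delete/update.  If the table is marked Frozen,
 * reset relminxid to the writer's top-level Xid.  Returns true when the
 * mark had to be changed, i.e. when a catalog update would be needed.
 */
static bool
toy_note_write(ToyRelation *rel, TransactionId top_xid)
{
    assert(TransactionIdIsNormal(top_xid));

    if (rel->relminxid != FrozenTransactionId)
        return false;           /* already unfrozen: nothing to do */

    rel->relminxid = top_xid;
    return true;
}
```

Only the first write after a freeze pays for the update; later writes see
a normal relminxid and return immediately.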

--
Alvaro Herrera http://www.PlanetPostgreSQL.org
"No hay cielo posible sin hundir nuestras raíces
en la profundidad de la tierra" (Malucha Pinto)

#11Simon Riggs
simon@2ndquadrant.com
In reply to: Alvaro Herrera (#10)
Re: [HACKERS] Per-table freeze limit proposal

On Tue, 2005-11-15 at 21:58 -0300, Alvaro Herrera wrote:

In fact there's no problem because in D, just like in template1, all
tuples are frozen. How should we mark this on the catalogs? I don't
see any way.

All tuples might be frozen or might not be, the point is you don't know.
That's why you can't use FrozenTransactionId.

Perhaps we should reinstate VACUUM FULL FREEZE to do just a FREEZE with
a table lock and skip all that moving data around.

Doesn't work either because of the argument above.

What about assuming that if somebody executes a database-wide FREEZE, he
knows what he is doing and thus we can mark datminxid as
FrozenTransactionId?

If you lock the table before FREEZE then you will guarantee that all
rows will be frozen and you really can then set FrozenTransactionId.

Making VACUUM FREEZE take full table locks seems like a very useful
thing to me, and it would solve your problems also.

Best Regards, Simon Riggs

#12Simon Riggs
simon@2ndquadrant.com
In reply to: Simon Riggs (#11)
Re: [HACKERS] Per-table freeze limit proposal

On Wed, 2005-11-16 at 08:31 +0000, Simon Riggs wrote:

On Tue, 2005-11-15 at 21:58 -0300, Alvaro Herrera wrote:

In fact there's no problem because in D, just like in template1, all
tuples are frozen. How should we mark this on the catalogs? I don't
see any way.

All tuples might be frozen or might not be, the point is you don't know.
That's why you can't use FrozenTransactionId.

Perhaps we should reinstate VACUUM FULL FREEZE to do just a FREEZE with
a table lock and skip all that moving data around.

Doesn't work either because of the argument above.

What about assuming that if somebody executes a database-wide FREEZE, he
knows what he is doing and thus we can mark datminxid as
FrozenTransactionId?

If you lock the table before FREEZE then you will guarantee that all
rows will be frozen and you really can then set FrozenTransactionId.

Making VACUUM FREEZE take full table locks seems like a very useful
thing to me, and it would solve your problems also.

Thinking some more, when initdb issues VACUUM FREEZE we know for certain
that nobody else is issuing commands against the database at that point,
which is equivalent to a table lock. So we should be able to have a
VACUUM FREEZE detect that and if so, set FrozenTransactionId.

In normal concurrent running, I would like VACUUM FREEZE to issue a full
table SHARE lock to ensure that we can set FrozenTransactionId for that
also. Otherwise we will not be able to move frozen tables to read only
media.

Best Regards, Simon Riggs

#13Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Simon Riggs (#12)
Re: [HACKERS] Per-table freeze limit proposal

Simon Riggs wrote:

On Wed, 2005-11-16 at 08:31 +0000, Simon Riggs wrote:

All tuples might be frozen or might not be, the point is you don't know.
That's why you can't use FrozenTransactionId.

Thinking some more, when initdb issues VACUUM FREEZE we know for certain
that nobody else is issuing commands against the database at that point,
which is equivalent to a table lock. So we should be able to have a
VACUUM FREEZE detect that and if so, set FrozenTransactionId.

In normal concurrent running, I would like VACUUM FREEZE to issue a full
table SHARE lock to ensure that we can set FrozenTransactionId for that
also. Otherwise we will not be able to move frozen tables to read only
media.

You missed one point however. Even if VACUUM FREEZE freezes all tuples,
any transaction following that one is able to insert non-frozen tuples
into the table. At that instant, having marked the table with Frozen is
bogus, no matter what amount of locks you took on it.

We can only do that (mark the table Frozen) if and only if all following
transactions are forced to mark the table "unfrozen" as soon as they
change it. Are we going to do that?

Note that trying to unfreeze a table might be difficult -- we might need
to obtain a lock on pg_class after we have opened and locked the target
relation, leading to possible deadlock on INSERT. Yikes.

--
Alvaro Herrera Valdivia, Chile ICBM: S 39° 49' 17.7", W 73° 14' 26.8"
"Ciencias políticas es la ciencia de entender por qué
los políticos actúan como lo hacen" (netfunny.com)

#14Simon Riggs
simon@2ndquadrant.com
In reply to: Alvaro Herrera (#13)
Re: [HACKERS] Per-table freeze limit proposal

On Wed, 2005-11-16 at 07:52 -0300, Alvaro Herrera wrote:

Simon Riggs wrote:

On Wed, 2005-11-16 at 08:31 +0000, Simon Riggs wrote:

All tuples might be frozen or might not be, the point is you don't know.
That's why you can't use FrozenTransactionId.

Thinking some more, when initdb issues VACUUM FREEZE we know for certain
that nobody else is issuing commands against the database at that point,
which is equivalent to a table lock. So we should be able to have a
VACUUM FREEZE detect that and if so, set FrozenTransactionId.

In normal concurrent running, I would like VACUUM FREEZE to issue a full
table SHARE lock to ensure that we can set FrozenTransactionId for that
also. Otherwise we will not be able to move frozen tables to read only
media.

You missed one point however. Even if VACUUM FREEZE freezes all tuples,
any transaction following that one is able to insert non-frozen tuples
into the table. At that instant, having marked the table with Frozen is
bogus, no matter what amount of locks you took on it.

(OK I think we are getting there now, after my usual comms errors.)

We need something stronger than VACUUM FREEZE then. Perhaps an ALTER
TABLE READONLY. That would do a FREEZE and place a permanent table share
lock, so we wouldn't need to set/unset the Frozen state. We'd do that as
a permissions thing, rather than an actual lock. That way copies of the
data could still be taken with ease and the copies would not themselves
be READONLY.
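
A toy model of the READONLY idea, treating it as a permission check rather
than a lock. ToyRelation, toy_make_readonly and toy_write_allowed are
hypothetical names, and ALTER TABLE READONLY itself is only a suggestion
here, not an existing command.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FrozenTransactionId         ((TransactionId) 2)
#define FirstNormalTransactionId    ((TransactionId) 3)
#define TransactionIdIsNormal(xid)  ((xid) >= FirstNormalTransactionId)

/* Stand-in for the per-table state. */
typedef struct ToyRelation
{
    TransactionId relminxid;    /* FrozenTransactionId = all tuples frozen */
    bool        readonly;       /* checked like a permission on each write */
} ToyRelation;

/*
 * Freeze the table and forbid further writes.  Assumes the freeze itself
 * ran with writers excluded, so the Frozen mark is accurate when set.
 */
static void
toy_make_readonly(ToyRelation *rel)
{
    assert(TransactionIdIsNormal(rel->relminxid) ||
           rel->relminxid == FrozenTransactionId);
    rel->relminxid = FrozenTransactionId;
    rel->readonly = true;
}

/* A write is refused outright, so the Frozen mark can never go stale. */
static bool
toy_write_allowed(const ToyRelation *rel)
{
    return !rel->readonly;
}
```

Because writes are rejected instead of being allowed to "unfreeze" the
table, nothing has to touch the catalog on the write path.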

Not sure what you'd call it to make a whole database readonly all at
once...but whatever we call it we know initdb wants to run it on
template1.

Then your original thought becomes fully viable.

This is particularly important because I see the need to be able to
freeze older data partitions and migrate them to readonly media as part
of very high volume data applications.

Best Regards, Simon Riggs