From 0e45c21801c81f35c8718f7d2eafc8b45abeae4c Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 5 Jul 2018 18:50:27 +0900
Subject: [PATCH 8/8] PoC implementation of the stats undo log

To keep stats data consistent within a transaction, this patch adds an
undo log feature to the stats collector. Undo log entries are collected
only while at least one backend needs them. The undo log has a fixed
size (1000 entries); if it fills up, consistency is no longer
guaranteed. In the common case stats are not referenced in long-lived
transactions, and consistency matters little over such a long period.
---
 src/backend/postmaster/autovacuum.c |  16 +--
 src/backend/postmaster/pgstat.c     | 222 ++++++++++++++++++++++++++++++++----
 src/backend/utils/adt/pgstatfuncs.c |  42 +++----
 src/include/pgstat.h                |   2 +-
 4 files changed, 227 insertions(+), 55 deletions(-)

diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 74e3ab6167..dc911a7952 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -981,7 +981,7 @@ rebuild_database_list(Oid newdb)
 		PgStat_StatDBEntry *entry;
 
 		/* only consider this database if it has a pgstat entry */
-		entry = pgstat_fetch_stat_dbentry(newdb);
+		entry = pgstat_fetch_stat_dbentry(newdb, false);
 		if (entry != NULL)
 		{
 			/* we assume it isn't found because the hash was just created */
@@ -1005,7 +1005,7 @@ rebuild_database_list(Oid newdb)
 		 * skip databases with no stat entries -- in particular, this gets rid
 		 * of dropped databases
 		 */
-		entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
+		entry = pgstat_fetch_stat_dbentry(avdb->adl_datid, false);
 		if (entry == NULL)
 			continue;
 
@@ -1029,7 +1029,7 @@ rebuild_database_list(Oid newdb)
 		PgStat_StatDBEntry *entry;
 
 		/* only consider databases with a pgstat entry */
-		entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
+		entry = pgstat_fetch_stat_dbentry(avdb->adw_datid, false);
 		if (entry == NULL)
 			continue;
 
@@ -1239,7 +1239,7 @@ do_start_worker(void)
 			continue;			/* ignore not-at-risk DBs */
 
 		/* Find pgstat entry if any */
-		tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
+		tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid, false);
 
 		/*
 		 * Skip a database with no pgstat entry; it means it hasn't seen any
@@ -1975,7 +1975,7 @@ do_autovacuum(void)
 	 * may be NULL if we couldn't find an entry (only happens if we are
 	 * forcing a vacuum for anti-wrap purposes).
 	 */
-	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
+	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId, false);
 
 	/* Start a transaction so our commands have one to play into. */
 	StartTransactionCommand();
@@ -2025,7 +2025,7 @@ do_autovacuum(void)
 	MemoryContextSwitchTo(AutovacMemCxt);
 
 	/* The database hash where pgstat keeps shared relations */
-	shared = pgstat_fetch_stat_dbentry(InvalidOid);
+	shared = pgstat_fetch_stat_dbentry(InvalidOid, false);
 
 	classRel = heap_open(RelationRelationId, AccessShareLock);
 
@@ -2806,8 +2806,8 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
 	/* use fresh stats */
 	autovac_refresh_stats();
 
-	shared = pgstat_fetch_stat_dbentry(InvalidOid);
-	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
+	shared = pgstat_fetch_stat_dbentry(InvalidOid, false);
+	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId, false);
 
 	/* fetch the relation's relcache entry */
 	classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index af97e6b46b..3e16718057 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -92,6 +92,8 @@
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
 
+/* Maximum number of stats undo entries */
+#define INIT_NUM_UNDOLOGS 1000
 
 /* ----------
  * Total number of backends including auxiliary
@@ -133,18 +135,41 @@ static time_t last_pgstat_start_time;
 
 static bool pgStatRunningInCollector = false;
 
+typedef struct PgStat_TabUndoLogEnt	/* one slot of the shared table-stats undo log */
+{
+	Oid		dboid;	/* database the saved table entry belongs to */
+	PgStat_StatTabEntry ent;	/* copy of the table entry made before it was updated */
+}  PgStat_TabUndoLogEnt;
+
 /* Shared stats bootstrap infomation */
 typedef struct StatsShmemStruct {
 	dsa_handle stats_dsa_handle;
 	dshash_table_handle db_stats_handle;
 	dsa_pointer	global_stats;
 	dsa_pointer	archiver_stats;
+	/* Undo log state; the updates visible in this patch are made while holding StatsLock */
+	int			nundoers;	/* backends currently registered as needing undo */
+	dsa_pointer undoarea;	/* DSA pointer to the array of PgStat_TabUndoLogEnt slots */
+	uint32		nundologs;	/* number of slots in undoarea (INIT_NUM_UNDOLOGS) */
+	uint32		undoinsptr;	/* index of the next slot to write; wraps to 0 at nundologs */
+	uint32		undoinsround;	/* bumped each time undoinsptr wraps; reset with it when the last undoer leaves */
+	uint32		lastundopos;	/* undoinsptr as of the most recent undoer registration */
 } StatsShmemStruct;
 
-static StatsShmemStruct * StatsShmem = NULL;
+typedef struct PgStat_RelIdEnt	/* key, and whole entry, of the undo_logged_tables hash */
+{
+	Oid	dboid;	/* database OID */
+	Oid	reloid;	/* table OID */
+} PgStat_RelIdEnt;
+
+static bool   StatsUndoing = false;	/* Does this backend need undoing? */
+static uint32 StatsUndoPtr = 0;		/* Undo log insert position when this backend's snapshot was taken */
+static uint32 StatsUndoRound = 0;	/* Undo log round when this backend's snapshot was taken */
+static StatsShmemStruct *StatsShmem = NULL;
 static dsa_area *area = NULL;
 static dshash_table *db_stats;
 static HTAB *snapshot_db_stats;
+static HTAB *undo_logged_tables = NULL;	/* (database, table) pairs already written to the undo log */
 
 /* dshash parameter for each type of table */
 static const dshash_parameters dsh_dbparams = {
@@ -169,6 +194,11 @@ static const dshash_parameters dsh_funcparams = {
 	LWTRANCHE_STATS_FUNC_TABLE
 };
 
+typedef struct {	/* NOTE(review): appears unused in this patch; same layout as PgStat_TabUndoLogEnt */
+	Oid					dbid;	/* database OID */
+	PgStat_StatTabEntry	ent;	/* table stats entry */
+} PgStat_StatTabUndo;
+
 /*
  * Structures in which backends store per-table info that's waiting to be
  * sent to the collector.
@@ -274,7 +304,7 @@ static PgStat_ArchiverStats *shared_archiverStats;
 static PgStat_ArchiverStats *snapshot_archiverStats;
 static PgStat_GlobalStats *shared_globalStats;
 static PgStat_GlobalStats *snapshot_globalStats;
-
+static PgStat_TabUndoLogEnt *undodata;
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -318,7 +348,7 @@ static void pgstat_read_statsfiles(void);
 static void pgstat_read_db_statsfile(Oid databaseid, dshash_table *tabhash, dshash_table *funchash);
 
 /* functions used in backends */
-static bool backend_snapshot_database_stats(void);
+static bool backend_snapshot_database_stats(bool snapshot);
 static PgStat_StatFuncEntry *backend_get_func_etnry(PgStat_StatDBEntry *dbent, Oid funcid, bool oneshot);
 static void pgstat_read_current_status(void);
 
@@ -996,6 +1026,10 @@ pgstat_create_shared_stats(void)
 		dsa_allocate0(area, sizeof(PgStat_ArchiverStats));
 	StatsShmem->db_stats_handle =
 		dshash_get_hash_table_handle(db_stats);
+	StatsShmem->nundologs = INIT_NUM_UNDOLOGS;
+	StatsShmem->undoarea =
+		dsa_allocate0(area, sizeof(PgStat_TabUndoLogEnt) * INIT_NUM_UNDOLOGS);
+	StatsShmem->undoinsptr = 0;
 
 	/* connect to the memory */
 	snapshot_db_stats = NULL;
@@ -1003,6 +1037,8 @@ pgstat_create_shared_stats(void)
 		dsa_get_address(area, StatsShmem->global_stats);
 	shared_archiverStats = (PgStat_ArchiverStats *)
 		dsa_get_address(area, StatsShmem->archiver_stats);
+	undodata = (PgStat_TabUndoLogEnt *)
+		dsa_get_address(area, StatsShmem->undoarea);
 	MemoryContextSwitchTo(oldcontext);
 	LWLockRelease(StatsLock);
 }
@@ -1031,7 +1067,7 @@ pgstat_vacuum_stat(void)
 		return;
 
 	/* If not done for this transaction, take a snapshot of stats */
-	if (!backend_snapshot_database_stats())
+	if (!backend_snapshot_database_stats(false))
 		return;
 
 	/*
@@ -2365,10 +2401,10 @@ pgstat_twophase_postabort(TransactionId xid, uint16 info,
  * ----------
  */
 PgStat_StatDBEntry *
-pgstat_fetch_stat_dbentry(Oid dbid)
+pgstat_fetch_stat_dbentry(Oid dbid, bool snapshot)
 {
 	/* If not done for this transaction, take a stats snapshot */
-	if (!backend_snapshot_database_stats())
+	if (!backend_snapshot_database_stats(snapshot))
 		return NULL;
 
 	/*
@@ -2395,7 +2431,7 @@ pgstat_fetch_stat_tabentry(Oid relid)
 	PgStat_StatTabEntry *tabentry;
 
 	/* Lookup our database, then look in its table hash table. */
-	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
+	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId, true);
 	if (dbentry == NULL)
 		return NULL;
 
@@ -2406,7 +2442,7 @@ pgstat_fetch_stat_tabentry(Oid relid)
 	/*
 	 * If we didn't find it, maybe it's a shared table.
 	 */
-	dbentry = pgstat_fetch_stat_dbentry(InvalidOid);
+	dbentry = pgstat_fetch_stat_dbentry(InvalidOid, true);
 	if (dbentry == NULL)
 		return NULL;
 
@@ -2432,11 +2468,11 @@ pgstat_fetch_stat_funcentry(Oid func_id)
 	PgStat_StatFuncEntry *funcentry = NULL;
 
 	/* If not done for this transaction, take a stats snapshot */
-	if (!backend_snapshot_database_stats())
+	if (!backend_snapshot_database_stats(IsTransactionState()))
 		return NULL;
 
 	/* Lookup our database, then find the requested function */
-	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
+	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId, true);
 	if (dbentry == NULL)
 		return NULL;
 
@@ -2519,7 +2555,7 @@ PgStat_ArchiverStats *
 pgstat_fetch_stat_archiver(void)
 {
 	/* If not done for this transaction, take a stats snapshot */
-	if (!backend_snapshot_database_stats())
+	if (!backend_snapshot_database_stats(IsTransactionState()))
 		return NULL;
 
 	return snapshot_archiverStats;
@@ -2538,7 +2574,7 @@ PgStat_GlobalStats *
 pgstat_fetch_global(void)
 {
 	/* If not done for this transaction, take a stats snapshot */
-	if (!backend_snapshot_database_stats())
+	if (!backend_snapshot_database_stats(IsTransactionState()))
 		return NULL;
 
 	return snapshot_globalStats;
@@ -3908,7 +3944,6 @@ pgstat_get_wait_io(WaitEventIO w)
 	return event_name;
 }
 
-
 /* ----------
  * pgstat_get_backend_current_activity() -
  *
@@ -5293,6 +5328,22 @@ backend_clean_snapshot_callback(void *arg)
 	snapshot_globalStats = NULL;
 	snapshot_archiverStats = NULL;
 	snapshot_db_stats = NULL;
+	if (StatsUndoing)
+	{
+		Assert(StatsShmem->nundoers > 0);
+
+		LWLockAcquire(StatsLock, LW_EXCLUSIVE);
+		StatsShmem->nundoers--;
+
+		/* If I was the last undoer, reset shared pointers */
+		if (StatsShmem->nundoers == 0)
+		{
+			StatsShmem->undoinsptr = 0;
+			StatsShmem->undoinsround = 0;
+		}
+		LWLockRelease(StatsLock);
+		StatsUndoing = false;
+	}
 }
 
 /*
@@ -5363,7 +5414,8 @@ static void *
 snapshot_statentry(HTAB **dest,
 				   dshash_table_handle dsh_handle,
 				   const dshash_parameters *dsh_params,
-				   Oid key, size_t keysize, size_t entrysize)
+				   Oid key, size_t keysize, size_t entrysize,
+				   bool *found_in_cache)
 {
 	void *lentry = NULL;
 
@@ -5409,12 +5461,13 @@ snapshot_statentry(HTAB **dest,
 		}
 		dshash_detach(t);
 
+		if (found_in_cache)
+			*found_in_cache = true;
+
 		return NULL;
 	}
 	else if (dest)
 	{
-		bool found;
-
 		/*
 		 * Create new hash for entry cache. Make with arbitrary initial
 		 * entries since we don't know how this hash will grow.
@@ -5422,8 +5475,8 @@ snapshot_statentry(HTAB **dest,
 		if (!*dest)
 			*dest = create_local_stats_hash(keysize, entrysize, 32);
 
-		lentry = hash_search(*dest, &key, HASH_ENTER, &found);
-		if (!found)
+		lentry = hash_search(*dest, &key, HASH_ENTER, found_in_cache);
+		if (!*found_in_cache)	/* NOTE(review): found_in_cache is NULL when called from backend_get_func_etnry; this dereferences it */
 		{
 			dshash_table *t = dshash_attach(area, dsh_params, dsh_handle, NULL);
 			void *sentry = dshash_find(t, &key, false);
@@ -5460,6 +5513,10 @@ snapshot_statentry(HTAB **dest,
 			memcpy(lentry, sentry, entrysize);
 			dshash_release_lock(t, sentry);
 		}
+		
+		if (found_in_cache)
+			*found_in_cache = false;
+
 		dshash_detach(t);
 	}
 	
@@ -5475,7 +5532,7 @@ snapshot_statentry(HTAB **dest,
  * Returns false if the shared stats is not created yet.
  */
 static bool
-backend_snapshot_database_stats(void)
+backend_snapshot_database_stats(bool snapshot)
 {
 	MemoryContext oldcontext;
 	MemoryContextCallback *mcxt_cb;
@@ -5515,6 +5572,18 @@ backend_snapshot_database_stats(void)
 	/* set the timestamp of this snapshot */
 	snapshot_globalStats->stats_timestamp = GetCurrentTimestamp();
 
+	if (snapshot && IsTransactionState())
+	{
+		LWLockAcquire(StatsLock, LW_EXCLUSIVE);
+		StatsUndoing = true;
+		StatsShmem->nundoers++;
+		StatsShmem->lastundopos = StatsShmem->undoinsptr;
+		StatsUndoPtr = StatsShmem->undoinsptr;
+		StatsUndoRound = StatsShmem->undoinsround;
+		LWLockRelease(StatsLock);
+		hash_destroy(undo_logged_tables);
+		undo_logged_tables = NULL;
+	}	
 	take_db_stats_snapshot();
 
 	/* register callback to clear snapshot */
@@ -5537,12 +5606,63 @@ backend_snapshot_database_stats(void)
 PgStat_StatTabEntry *
 backend_get_tab_entry(PgStat_StatDBEntry *dbent, Oid reloid, bool oneshot)
 {
+	bool		found_in_cache = false;	/* stays false if the callee leaves it unset */
+	uint32		round, rounds, end, nent, nundologs, i;
+	PgStat_TabUndoLogEnt *undolist;
+	PgStat_StatTabEntry *ent;
+
 	/* take a local snapshot if we don't have one */
-	return snapshot_statentry(oneshot ? NULL : &dbent->snapshot_tables,
-							  dbent->tables, &dsh_tblparams,
-							  reloid,
-							  dsh_tblparams.key_size,
-							  dsh_tblparams.entry_size);
+	ent = snapshot_statentry(oneshot ? NULL : &dbent->snapshot_tables,
+							 dbent->tables, &dsh_tblparams,
+							 reloid,
+							 dsh_tblparams.key_size,
+							 dsh_tblparams.entry_size,
+							 &found_in_cache);
+
+	/* Return as-is unless this backend must undo a freshly fetched entry */
+	if (oneshot || found_in_cache || ent == NULL ||
+		reloid == InvalidOid || !StatsUndoing)
+		return ent;
+
+	/* Read the current insert position of the shared undo log */
+	LWLockAcquire(StatsLock, LW_SHARED);
+	round = StatsShmem->undoinsround;
+	end = StatsShmem->undoinsptr;
+	nundologs = StatsShmem->nundologs;
+	LWLockRelease(StatsLock);
+
+	/*
+	 * Count the entries logged since our snapshot.  Unsigned arithmetic
+	 * gives the right distance both when the insert pointer has wrapped
+	 * below our start position and when the round counter has overflowed.
+	 */
+	rounds = round - StatsUndoRound;
+	nent = rounds * nundologs + (end - StatsUndoPtr);
+	if (rounds > 1 || nent > nundologs)
+	{
+		elog(WARNING, "Stats undo list wrap arounded. Older state is lost");
+		return ent;
+	}
+
+	/*
+	 * Scan oldest-first so the first match is the state the entry had when
+	 * the snapshot was taken.  The log is a ring, so wrap the slot index.
+	 */
+	undolist = (PgStat_TabUndoLogEnt *)
+		dsa_get_address(area, StatsShmem->undoarea);
+	for (i = 0; i < nent; i++)
+	{
+		PgStat_TabUndoLogEnt *p = &undolist[(StatsUndoPtr + i) % nundologs];
+
+		if (p->dboid == dbent->databaseid &&
+			p->ent.tableid == reloid)
+		{
+			memcpy(ent, &p->ent, sizeof(PgStat_StatTabEntry));
+			break;
+		}
+	}
+
+	return ent;
 }
 
 /* ----------
@@ -5559,7 +5679,8 @@ backend_get_func_etnry(PgStat_StatDBEntry *dbent, Oid funcid, bool oneshot)
 							  dbent->functions, &dsh_funcparams,
 							  funcid,
 							  dsh_funcparams.key_size,
-							  dsh_funcparams.entry_size);
+							  dsh_funcparams.entry_size,
+							  NULL);
 }
 
 /* ----------
@@ -5610,6 +5731,56 @@ pgstat_clear_snapshot(void)
 	backend_clean_snapshot_callback(&param);
 }
 
+/*
+ * record_stats_undo_log() -
+ *
+ * Saves a copy of *tabentry, taken before the caller updates it, into the
+ * shared undo log.  Does nothing while no backend is undoing or when this
+ * process has already logged the same (database, table) pair.
+ * NOTE(review): undo_logged_tables is process-local -- confirm it really
+ * gets reset in the process that calls this function.
+ */
+static void
+record_stats_undo_log(Oid dboid, PgStat_StatTabEntry *tabentry)
+{
+	uint32		inspos;
+	PgStat_RelIdEnt db_relid;
+	bool		found = false;
+
+	/* No lock needed; this check doesn't need to be strict */
+	Assert(StatsShmem->undoinsptr < StatsShmem->nundologs);
+	if (StatsShmem->nundoers == 0)
+		return;
+
+	/* One undo entry per relation is enough; remember what we logged */
+	if (!undo_logged_tables)
+	{
+		HASHCTL		ctl;
+
+		ctl.keysize = ctl.entrysize = sizeof(PgStat_RelIdEnt);
+		undo_logged_tables = hash_create("pgstat undo record hash",
+									128, &ctl, HASH_ELEM | HASH_BLOBS);
+	}
+	db_relid.dboid = dboid;
+	db_relid.reloid = tabentry->tableid;
+	hash_search(undo_logged_tables, &db_relid, HASH_ENTER, &found);
+	if (found)
+		return;
+
+	/* Copy exactly one table entry; the slot's ent member is no larger */
+	inspos = StatsShmem->undoinsptr;
+	undodata[inspos].dboid = dboid;
+	memcpy(&undodata[inspos].ent, tabentry, sizeof(PgStat_StatTabEntry));
+
+	/* Publish the new entry by advancing the insert pointer under lock */
+	LWLockAcquire(StatsLock, LW_EXCLUSIVE);
+	if (++StatsShmem->undoinsptr >= StatsShmem->nundologs)
+	{
+		StatsShmem->undoinsptr = 0;
+		StatsShmem->undoinsround++; /* Don't care for overflow */
+	}
+	LWLockRelease(StatsLock);
+}
 
 /* ----------
  * pgstat_recv_tabstat() -
@@ -5677,6 +5848,7 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
 		}
 		else
 		{
+			record_stats_undo_log(msg->m_databaseid, tabentry);
 			/*
 			 * Otherwise add the values to the existing entry.
 			 */
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index e95e347184..62aa520c1e 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1176,7 +1176,7 @@ pg_stat_get_db_xact_commit(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_xact_commit);
@@ -1192,7 +1192,7 @@ pg_stat_get_db_xact_rollback(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_xact_rollback);
@@ -1208,7 +1208,7 @@ pg_stat_get_db_blocks_fetched(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_blocks_fetched);
@@ -1224,7 +1224,7 @@ pg_stat_get_db_blocks_hit(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_blocks_hit);
@@ -1240,7 +1240,7 @@ pg_stat_get_db_tuples_returned(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_tuples_returned);
@@ -1256,7 +1256,7 @@ pg_stat_get_db_tuples_fetched(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_tuples_fetched);
@@ -1272,7 +1272,7 @@ pg_stat_get_db_tuples_inserted(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_tuples_inserted);
@@ -1288,7 +1288,7 @@ pg_stat_get_db_tuples_updated(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_tuples_updated);
@@ -1304,7 +1304,7 @@ pg_stat_get_db_tuples_deleted(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_tuples_deleted);
@@ -1319,7 +1319,7 @@ pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
 	TimestampTz result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = dbentry->stat_reset_timestamp;
@@ -1337,7 +1337,7 @@ pg_stat_get_db_temp_files(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = dbentry->n_temp_files;
@@ -1353,7 +1353,7 @@ pg_stat_get_db_temp_bytes(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = dbentry->n_temp_bytes;
@@ -1368,7 +1368,7 @@ pg_stat_get_db_conflict_tablespace(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_conflict_tablespace);
@@ -1383,7 +1383,7 @@ pg_stat_get_db_conflict_lock(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_conflict_lock);
@@ -1398,7 +1398,7 @@ pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_conflict_snapshot);
@@ -1413,7 +1413,7 @@ pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_conflict_bufferpin);
@@ -1428,7 +1428,7 @@ pg_stat_get_db_conflict_startup_deadlock(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_conflict_startup_deadlock);
@@ -1443,7 +1443,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (
@@ -1463,7 +1463,7 @@ pg_stat_get_db_deadlocks(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = (int64) (dbentry->n_deadlocks);
@@ -1479,7 +1479,7 @@ pg_stat_get_db_blk_read_time(PG_FUNCTION_ARGS)
 	PgStat_StatDBEntry *dbentry;
 
 	/* convert counter from microsec to millisec for display */
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = ((double) dbentry->n_block_read_time) / 1000.0;
@@ -1495,7 +1495,7 @@ pg_stat_get_db_blk_write_time(PG_FUNCTION_ARGS)
 	PgStat_StatDBEntry *dbentry;
 
 	/* convert counter from microsec to millisec for display */
-	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid, true)) == NULL)
 		result = 0;
 	else
 		result = ((double) dbentry->n_block_write_time) / 1000.0;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 88bb1e636b..438a9e2fb9 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1311,7 +1311,7 @@ extern void pgstat_send_bgwriter(void);
  * generate the pgstat* views.
  * ----------
  */
-extern PgStat_StatDBEntry *pgstat_fetch_stat_dbentry(Oid dbid);
+extern PgStat_StatDBEntry *pgstat_fetch_stat_dbentry(Oid dbid, bool snapshot);
 extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid);
 extern PgBackendStatus *pgstat_fetch_stat_beentry(int beid);
 extern LocalPgBackendStatus *pgstat_fetch_stat_local_beentry(int beid);
-- 
2.16.3

