Fix up pgstats counting of live and dead tuples to recognize that committed
index 715de0071c1abf612d0250d7085491476fe2b450..b41a16de44ce86435068597a40e0fa3537ccd08b 100644
@@ -11,9 +11,9 @@
  *                     - Add a pgstat config column to pg_database, so this
  *                       entire thing can be enabled/disabled on a per db basis.
  *
- *     Copyright (c) 2001-2006, PostgreSQL Global Development Group
+ *     Copyright (c) 2001-2007, PostgreSQL Global Development Group
  *
- *     $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.138 2006/08/28 19:38:09 tgl Exp $
+ *     $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.156 2007/05/27 03:50:39 tgl Exp $
  * ----------
  */
 #include "postgres.h"
@@ -39,6 +39,7 @@
 
 #include "access/heapam.h"
 #include "access/transam.h"
+#include "access/twophase_rmgr.h"
 #include "access/xact.h"
 #include "catalog/pg_database.h"
 #include "libpq/ip.h"
@@ -98,6 +99,13 @@ bool         pgstat_collect_tuplelevel = false;
 bool           pgstat_collect_blocklevel = false;
 bool           pgstat_collect_querystring = false;
 
+/*
+ * BgWriter global statistics counters (unused in other processes).
+ * Stored directly in a stats message structure so it can be sent
+ * without needing to copy things around.  We assume this inits to zeroes.
+ */
+PgStat_MsgBgWriter BgWriterStats;
+
 /* ----------
  * Local data
  * ----------
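
A minimal usage sketch of the counter struct added above (not part of the patch): the background writer just bumps fields of BgWriterStats in place and periodically hands the whole message to pgstat_send_bgwriter(), which appears further down in this diff. The field names used here are assumptions based on PgStat_MsgBgWriter in pgstat.h of this vintage, and the helper itself is invented for illustration.

    #include "postgres.h"
    #include "pgstat.h"

    /* Hypothetical helper; only the usage pattern matters. */
    static void
    example_count_bgwriter_work(bool timed_checkpoint, int buffers_written)
    {
        if (timed_checkpoint)
            BgWriterStats.m_timed_checkpoints++;                /* assumed field name */
        BgWriterStats.m_buf_written_clean += buffers_written;   /* assumed field name */

        /* later, once per outer-loop iteration or so: */
        pgstat_send_bgwriter();     /* sends the struct and zeroes it again */
    }
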
@@ -111,33 +119,72 @@ static time_t last_pgstat_start_time;
 static bool pgStatRunningInCollector = false;
 
 /*
- * Place where backends store per-table info to be sent to the collector.
- * We store shared relations separately from non-shared ones, to be able to
- * send them in separate messages.
+ * Structures in which backends store per-table info that's waiting to be
+ * sent to the collector.
+ *
+ * NOTE: once allocated, TabStatusArray structures are never moved or deleted
+ * for the life of the backend.  Also, we zero out the t_id fields of the
+ * contained PgStat_TableStatus structs whenever they are not actively in use.
+ * This allows relcache pgstat_info pointers to be treated as long-lived data,
+ * avoiding repeated searches in pgstat_initstats() when a relation is
+ * repeatedly opened during a transaction.
  */
-typedef struct TabStatArray
+#define TABSTAT_QUANTUM                100                     /* we alloc this many at a time */
+
+typedef struct TabStatusArray
 {
-       int                     tsa_alloc;              /* num allocated */
-       int                     tsa_used;               /* num actually used */
-       PgStat_MsgTabstat **tsa_messages;       /* the array itself */
-} TabStatArray;
+       struct TabStatusArray *tsa_next;        /* link to next array, if any */
+       int                     tsa_used;                               /* # entries currently used */
+       PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM];        /* per-table data */
+} TabStatusArray;
+
+static TabStatusArray *pgStatTabList = NULL;
 
-#define TABSTAT_QUANTUM                4       /* we alloc this many at a time */
+/*
+ * Tuple insertion/deletion counts for an open transaction can't be propagated
+ * into PgStat_TableStatus counters until we know if it is going to commit
+ * or abort.  Hence, we keep these counts in per-subxact structs that live
+ * in TopTransactionContext.  This data structure is designed on the assumption
+ * that subxacts won't usually modify very many tables.
+ */
+typedef struct PgStat_SubXactStatus
+{
+       int                     nest_level;                             /* subtransaction nest level */
+       struct PgStat_SubXactStatus *prev;      /* higher-level subxact if any */
+       PgStat_TableXactStatus *first;          /* head of list for this subxact */
+} PgStat_SubXactStatus;
 
-static TabStatArray RegularTabStat = {0, 0, NULL};
-static TabStatArray SharedTabStat = {0, 0, NULL};
+static PgStat_SubXactStatus *pgStatXactStack = NULL;
 
 static int     pgStatXactCommit = 0;
 static int     pgStatXactRollback = 0;
 
-static TransactionId pgStatDBHashXact = InvalidTransactionId;
+/* Record that's written to 2PC state file when pgstat state is persisted */
+typedef struct TwoPhasePgStatRecord
+{
+       PgStat_Counter tuples_inserted; /* tuples inserted in xact */
+       PgStat_Counter tuples_deleted;  /* tuples deleted in xact */
+       Oid                     t_id;                           /* table's OID */
+       bool            t_shared;                       /* is it a shared catalog? */
+} TwoPhasePgStatRecord;
+
+/*
+ * Info about current "snapshot" of stats file
+ */
+static MemoryContext pgStatLocalContext = NULL;
 static HTAB *pgStatDBHash = NULL;
-static TransactionId pgStatLocalStatusXact = InvalidTransactionId;
 static PgBackendStatus *localBackendStatusTable = NULL;
 static int     localNumBackends = 0;
 
-static volatile bool   need_exit = false;
-static volatile bool   need_statwrite = false;
+/*
+ * Cluster wide statistics, kept in the stats collector.
+ * Contains statistics that are not collected per database
+ * or per table.
+ */
+static PgStat_GlobalStats globalStats;
+
+static volatile bool need_exit = false;
+static volatile bool need_statwrite = false;
 
 
 /* ----------
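
The PgStat_SubXactStatus comment in the hunk above describes how per-(sub)transaction insert/delete counts are rolled up. The following is a self-contained toy model of that rule in plain C (it is not PostgreSQL code; the Toy* names are invented): subcommit merges counts into the parent level, subabort turns the subxact's inserts into dead tuples, and top-level commit or abort finally folds everything into the table's live/dead counters.

    #include <stdio.h>

    typedef struct
    {
        long    new_live_tuples;        /* stands in for t_new_live_tuples */
        long    new_dead_tuples;        /* stands in for t_new_dead_tuples */
    } ToyTableCounts;

    typedef struct ToyXactLevel         /* stands in for PgStat_TableXactStatus */
    {
        long    tuples_inserted;
        long    tuples_deleted;
        struct ToyXactLevel *upper;     /* parent (sub)transaction, if any */
    } ToyXactLevel;

    /* subcommit: push the counts up one nesting level */
    static void
    toy_subxact_commit(ToyXactLevel *trans)
    {
        trans->upper->tuples_inserted += trans->tuples_inserted;
        trans->upper->tuples_deleted += trans->tuples_deleted;
    }

    /* subabort: inserted tuples are dead; the deletes never happened */
    static void
    toy_subxact_abort(ToyXactLevel *trans, ToyTableCounts *tab)
    {
        tab->new_dead_tuples += trans->tuples_inserted;
    }

    /* top-level commit or abort: fold the survivors into the table entry */
    static void
    toy_top_end(ToyXactLevel *top, ToyTableCounts *tab, int commit)
    {
        if (commit)
        {
            tab->new_live_tuples += top->tuples_inserted;
            tab->new_dead_tuples += top->tuples_deleted;
        }
        else
            tab->new_dead_tuples += top->tuples_inserted;
    }

    int
    main(void)
    {
        ToyTableCounts tab = {0, 0};
        ToyXactLevel top = {0, 0, NULL};
        ToyXactLevel sub1 = {3, 2, &top};   /* savepoint: 3 inserts, 2 deletes */
        ToyXactLevel sub2 = {4, 0, &top};   /* savepoint: 4 inserts */

        top.tuples_inserted = 5;            /* 5 inserts at nest level 1 */

        toy_subxact_commit(&sub1);          /* RELEASE SAVEPOINT */
        toy_subxact_abort(&sub2, &tab);     /* ROLLBACK TO SAVEPOINT */
        toy_top_end(&top, &tab, 1);         /* COMMIT */

        printf("live=%ld dead=%ld\n", tab.new_live_tuples, tab.new_dead_tuples);
        /* prints live=8 dead=6 */
        return 0;
    }
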
@@ -154,12 +201,18 @@ static void force_statwrite(SIGNAL_ARGS);
 static void pgstat_beshutdown_hook(int code, Datum arg);
 
 static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
-static void pgstat_drop_database(Oid databaseid);
 static void pgstat_write_statsfile(void);
-static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb);
+static HTAB *pgstat_read_statsfile(Oid onlydb);
 static void backend_read_statsfile(void);
 static void pgstat_read_current_status(void);
 
+static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
+static HTAB *pgstat_collect_oids(Oid catalogid);
+
+static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);
+
+static void pgstat_setup_memcxt(void);
+
 static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
 static void pgstat_send(void *msg, int len);
 
@@ -170,6 +223,7 @@ static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
 static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
 static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
 static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
+static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
 
 
 /* ------------------------------------------------------------
@@ -199,13 +253,13 @@ pgstat_init(void)
        char            test_byte;
        int                     sel_res;
        int                     tries = 0;
-       
+
 #define TESTBYTEVAL ((char) 199)
 
        /*
         * Force start of collector daemon if something to collect.  Note that
-        * pgstat_collect_querystring is now an independent facility that does
-        * not require the collector daemon.
+        * pgstat_collect_querystring is now an independent facility that does not
+        * require the collector daemon.
         */
        if (pgstat_collect_tuplelevel ||
                pgstat_collect_blocklevel)
@@ -262,8 +316,8 @@ pgstat_init(void)
 
                if (++tries > 1)
                        ereport(LOG,
-                               (errmsg("trying another address for the statistics collector")));
-               
+                       (errmsg("trying another address for the statistics collector")));
+
                /*
                 * Create the socket.
                 */
@@ -479,7 +533,6 @@ pgstat_forkexec(void)
 
        return postmaster_forkexec(ac, av);
 }
-
 #endif   /* EXEC_BACKEND */
 
 
@@ -572,6 +625,10 @@ pgstat_start(void)
        return 0;
 }
 
+void allow_immediate_pgstat_restart(void)
+{
+               last_pgstat_start_time = 0;
+}
 
 /* ------------------------------------------------------------
  * Public functions used by backends follow
@@ -582,70 +639,136 @@ pgstat_start(void)
 /* ----------
  * pgstat_report_tabstat() -
  *
- *     Called from tcop/postgres.c to send the so far collected
- *     per table access statistics to the collector.
+ *     Called from tcop/postgres.c to send the so far collected per-table
+ *     access statistics to the collector.  Note that this is called only
+ *     when not within a transaction, so it is fair to use transaction stop
+ *     time as an approximation of current time.
  * ----------
  */
 void
-pgstat_report_tabstat(void)
+pgstat_report_tabstat(bool force)
 {
+       /* we assume this inits to all zeroes: */
+       static const PgStat_TableCounts all_zeroes;
+       static TimestampTz last_report = 0;     
+
+       TimestampTz now;
+       PgStat_MsgTabstat regular_msg;
+       PgStat_MsgTabstat shared_msg;
+       TabStatusArray *tsa;
        int                     i;
 
-       if (pgStatSock < 0 ||
-               (!pgstat_collect_tuplelevel &&
-                !pgstat_collect_blocklevel))
-       {
-               /* Not reporting stats, so just flush whatever we have */
-               RegularTabStat.tsa_used = 0;
-               SharedTabStat.tsa_used = 0;
+       /* Don't expend a clock check if nothing to do */
+       if (pgStatTabList == NULL ||
+               pgStatTabList->tsa_used == 0)
                return;
-       }
 
        /*
-        * For each message buffer used during the last query set the header
-        * fields and send it out.
+        * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
+        * msec since we last sent one, or the caller wants to force stats out.
         */
-       for (i = 0; i < RegularTabStat.tsa_used; i++)
+       now = GetCurrentTransactionStopTimestamp();
+       if (!force &&
+               !TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
+               return;
+       last_report = now;
+
+       /*
+        * Scan through the TabStatusArray struct(s) to find tables that actually
+        * have counts, and build messages to send.  We have to separate shared
+        * relations from regular ones because the databaseid field in the
+        * message header has to depend on that.
+        */
+       regular_msg.m_databaseid = MyDatabaseId;
+       shared_msg.m_databaseid = InvalidOid;
+       regular_msg.m_nentries = 0;
+       shared_msg.m_nentries = 0;
+
+       for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
        {
-               PgStat_MsgTabstat *tsmsg = RegularTabStat.tsa_messages[i];
-               int                     n;
-               int                     len;
+               for (i = 0; i < tsa->tsa_used; i++)
+               {
+                       PgStat_TableStatus *entry = &tsa->tsa_entries[i];
+                       PgStat_MsgTabstat *this_msg;
+                       PgStat_TableEntry *this_ent;
+
+                       /* Shouldn't have any pending transaction-dependent counts */
+                       Assert(entry->trans == NULL);
+
+                       /*
+                        * Ignore entries that didn't accumulate any actual counts,
+                        * such as indexes that were opened by the planner but not used.
+                        */
+                       if (memcmp(&entry->t_counts, &all_zeroes,
+                                          sizeof(PgStat_TableCounts)) == 0)
+                               continue;
+                       /*
+                        * OK, insert data into the appropriate message, and send if full.
+                        */
+                       this_msg = entry->t_shared ? &shared_msg : &regular_msg;
+                       this_ent = &this_msg->m_entry[this_msg->m_nentries];
+                       this_ent->t_id = entry->t_id;
+                       memcpy(&this_ent->t_counts, &entry->t_counts,
+                                  sizeof(PgStat_TableCounts));
+                       if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
+                       {
+                               pgstat_send_tabstat(this_msg);
+                               this_msg->m_nentries = 0;
+                       }
+               }
+               /* zero out TableStatus structs after use */
+               MemSet(tsa->tsa_entries, 0,
+                          tsa->tsa_used * sizeof(PgStat_TableStatus));
+               tsa->tsa_used = 0;
+       }
+
+       /*
+        * Send partial messages.  If force is true, make sure that any pending
+        * xact commit/abort gets counted, even if no table stats to send.
+        */
+       if (regular_msg.m_nentries > 0 ||
+               (force && (pgStatXactCommit > 0 || pgStatXactRollback > 0)))
+               pgstat_send_tabstat(&regular_msg);
+       if (shared_msg.m_nentries > 0)
+               pgstat_send_tabstat(&shared_msg);
+}
+
+/*
+ * Subroutine for pgstat_report_tabstat: finish and send a tabstat message
+ */
+static void
+pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg)
+{
+       int                     n;
+       int                     len;
 
-               n = tsmsg->m_nentries;
-               len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
-                       n * sizeof(PgStat_TableEntry);
+       /* It's unlikely we'd get here with no socket, but maybe not impossible */
+       if (pgStatSock < 0)
+               return;
 
+       /*
+        * Report accumulated xact commit/rollback whenever we send a normal
+        * tabstat message
+        */
+       if (OidIsValid(tsmsg->m_databaseid))
+       {
                tsmsg->m_xact_commit = pgStatXactCommit;
                tsmsg->m_xact_rollback = pgStatXactRollback;
                pgStatXactCommit = 0;
                pgStatXactRollback = 0;
-
-               pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
-               tsmsg->m_databaseid = MyDatabaseId;
-               pgstat_send(tsmsg, len);
        }
-       RegularTabStat.tsa_used = 0;
-
-       /* Ditto, for shared relations */
-       for (i = 0; i < SharedTabStat.tsa_used; i++)
+       else
        {
-               PgStat_MsgTabstat *tsmsg = SharedTabStat.tsa_messages[i];
-               int                     n;
-               int                     len;
-
-               n = tsmsg->m_nentries;
-               len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
-                       n * sizeof(PgStat_TableEntry);
-
-               /* We don't report transaction commit/abort here */
                tsmsg->m_xact_commit = 0;
                tsmsg->m_xact_rollback = 0;
-
-               pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
-               tsmsg->m_databaseid = InvalidOid;
-               pgstat_send(tsmsg, len);
        }
-       SharedTabStat.tsa_used = 0;
+
+       n = tsmsg->m_nentries;
+       len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
+               n * sizeof(PgStat_TableEntry);
+
+       pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
+       pgstat_send(tsmsg, len);
 }
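
For orientation, a hedged sketch of the two calling conventions of the reworked pgstat_report_tabstat(); the helper below is invented and is not part of the patch. The force=false path is throttled by PGSTAT_STAT_INTERVAL, while force=true (used by the backend shutdown hook later in this diff) pushes out pending table counts and the accumulated xact commit/abort tallies.

    #include "postgres.h"
    #include "pgstat.h"

    /* Hypothetical helper, mirroring what tcop/postgres.c and the
     * backend shutdown hook are expected to do. */
    static void
    example_flush_backend_stats(bool backend_is_exiting)
    {
        if (backend_is_exiting)
            pgstat_report_tabstat(true);    /* force everything out now */
        else
            pgstat_report_tabstat(false);   /* no-op unless PGSTAT_STAT_INTERVAL elapsed */
    }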
 
 
@@ -658,10 +781,7 @@ pgstat_report_tabstat(void)
 void
 pgstat_vacuum_tabstat(void)
 {
-       List       *oidlist;
-       Relation        rel;
-       HeapScanDesc scan;
-       HeapTuple       tup;
+       HTAB       *htab;
        PgStat_MsgTabpurge msg;
        HASH_SEQ_STATUS hstat;
        PgStat_StatDBEntry *dbentry;
@@ -680,15 +800,7 @@ pgstat_vacuum_tabstat(void)
        /*
         * Read pg_database and make a list of OIDs of all existing databases
         */
-       oidlist = NIL;
-       rel = heap_open(DatabaseRelationId, AccessShareLock);
-       scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
-       while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
-       {
-               oidlist = lappend_oid(oidlist, HeapTupleGetOid(tup));
-       }
-       heap_endscan(scan);
-       heap_close(rel, AccessShareLock);
+       htab = pgstat_collect_oids(DatabaseRelationId);
 
        /*
         * Search the database hash table for dead databases and tell the
@@ -699,12 +811,14 @@ pgstat_vacuum_tabstat(void)
        {
                Oid                     dbid = dbentry->databaseid;
 
-               if (!list_member_oid(oidlist, dbid))
+               CHECK_FOR_INTERRUPTS();
+
+               if (hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
                        pgstat_drop_database(dbid);
        }
 
        /* Clean up */
-       list_free(oidlist);
+       hash_destroy(htab);
 
        /*
         * Lookup our own database entry; if not found, nothing more to do.
@@ -718,15 +832,7 @@ pgstat_vacuum_tabstat(void)
        /*
         * Similarly to above, make a list of all known relations in this DB.
         */
-       oidlist = NIL;
-       rel = heap_open(RelationRelationId, AccessShareLock);
-       scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
-       while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
-       {
-               oidlist = lappend_oid(oidlist, HeapTupleGetOid(tup));
-       }
-       heap_endscan(scan);
-       heap_close(rel, AccessShareLock);
+       htab = pgstat_collect_oids(RelationRelationId);
 
        /*
         * Initialize our messages table counter to zero
@@ -739,13 +845,17 @@ pgstat_vacuum_tabstat(void)
        hash_seq_init(&hstat, dbentry->tables);
        while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
        {
-               if (list_member_oid(oidlist, tabentry->tableid))
+               Oid                     tabid = tabentry->tableid;
+
+               CHECK_FOR_INTERRUPTS();
+
+               if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
                        continue;
 
                /*
                 * Not there, so add this table's Oid to the message
                 */
-               msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
+               msg.m_tableid[msg.m_nentries++] = tabid;
 
                /*
                 * If the message is full, send it out and reinitialize to empty
@@ -777,7 +887,50 @@ pgstat_vacuum_tabstat(void)
        }
 
        /* Clean up */
-       list_free(oidlist);
+       hash_destroy(htab);
+}
+
+
+/* ----------
+ * pgstat_collect_oids() -
+ *
+ *     Collect the OIDs of either all databases or all tables, according to
+ *     the parameter, into a temporary hash table.  Caller should hash_destroy
+ *     the result when done with it.
+ * ----------
+ */
+static HTAB *
+pgstat_collect_oids(Oid catalogid)
+{
+       HTAB       *htab;
+       HASHCTL         hash_ctl;
+       Relation        rel;
+       HeapScanDesc scan;
+       HeapTuple       tup;
+
+       memset(&hash_ctl, 0, sizeof(hash_ctl));
+       hash_ctl.keysize = sizeof(Oid);
+       hash_ctl.entrysize = sizeof(Oid);
+       hash_ctl.hash = oid_hash;
+       htab = hash_create("Temporary table of OIDs",
+                                          PGSTAT_TAB_HASH_SIZE,
+                                          &hash_ctl,
+                                          HASH_ELEM | HASH_FUNCTION);
+
+       rel = heap_open(catalogid, AccessShareLock);
+       scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+       while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
+       {
+               Oid             thisoid = HeapTupleGetOid(tup);
+
+               CHECK_FOR_INTERRUPTS();
+
+               (void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
+       }
+       heap_endscan(scan);
+       heap_close(rel, AccessShareLock);
+
+       return htab;
 }
 
 
@@ -789,7 +942,7 @@ pgstat_vacuum_tabstat(void)
  *     via future invocations of pgstat_vacuum_tabstat().)
  * ----------
  */
-static void
+void
 pgstat_drop_database(Oid databaseid)
 {
        PgStat_MsgDropdb msg;
@@ -823,7 +976,7 @@ pgstat_drop_relation(Oid relid)
        msg.m_tableid[0] = relid;
        msg.m_nentries = 1;
 
-       len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
+       len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) +sizeof(Oid);
 
        pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
        msg.m_databaseid = MyDatabaseId;
@@ -900,7 +1053,7 @@ pgstat_report_vacuum(Oid tableoid, bool shared,
        msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
        msg.m_tableoid = tableoid;
        msg.m_analyze = analyze;
-       msg.m_autovacuum = IsAutoVacuumProcess(); /* is this autovacuum? */
+       msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */
        msg.m_vacuumtime = GetCurrentTimestamp();
        msg.m_tuples = tuples;
        pgstat_send(&msg, sizeof(msg));
@@ -925,7 +1078,7 @@ pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples,
        pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
        msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
        msg.m_tableoid = tableoid;
-       msg.m_autovacuum = IsAutoVacuumProcess(); /* is this autovacuum? */
+       msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */
        msg.m_analyzetime = GetCurrentTimestamp();
        msg.m_live_tuples = livetuples;
        msg.m_dead_tuples = deadtuples;
@@ -951,184 +1104,487 @@ pgstat_ping(void)
        pgstat_send(&msg, sizeof(msg));
 }
 
-/*
- * Enlarge a TabStatArray
- */
-static void
-more_tabstat_space(TabStatArray *tsarr)
-{
-       PgStat_MsgTabstat *newMessages;
-       PgStat_MsgTabstat **msgArray;
-       int                     newAlloc;
-       int                     i;
-
-       AssertArg(PointerIsValid(tsarr));
-
-       newAlloc = tsarr->tsa_alloc + TABSTAT_QUANTUM;
-
-       /* Create (another) quantum of message buffers */
-       newMessages = (PgStat_MsgTabstat *)
-               MemoryContextAllocZero(TopMemoryContext,
-                                                          sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
-
-       /* Create or enlarge the pointer array */
-       if (tsarr->tsa_messages == NULL)
-               msgArray = (PgStat_MsgTabstat **)
-                       MemoryContextAlloc(TopMemoryContext,
-                                                          sizeof(PgStat_MsgTabstat *) * newAlloc);
-       else
-               msgArray = (PgStat_MsgTabstat **)
-                       repalloc(tsarr->tsa_messages,
-                                        sizeof(PgStat_MsgTabstat *) * newAlloc);
-
-       for (i = 0; i < TABSTAT_QUANTUM; i++)
-               msgArray[tsarr->tsa_alloc + i] = newMessages++;
-       tsarr->tsa_messages = msgArray;
-       tsarr->tsa_alloc = newAlloc;
-
-       Assert(tsarr->tsa_used < tsarr->tsa_alloc);
-}
 
 /* ----------
  * pgstat_initstats() -
  *
- *     Called from various places usually dealing with initialization
- *     of Relation or Scan structures. The data placed into these
- *     structures from here tell where later to count for buffer reads,
- *     scans and tuples fetched.
+ *     Initialize a relcache entry to count access statistics.
+ *     Called whenever a relation is opened.
+ *
+ *     We assume that a relcache entry's pgstat_info field is zeroed by
+ *     relcache.c when the relcache entry is made; thereafter it is long-lived
+ *     data.  We can avoid repeated searches of the TabStatus arrays when the
+ *     same relation is touched repeatedly within a transaction.
  * ----------
  */
 void
-pgstat_initstats(PgStat_Info *stats, Relation rel)
+pgstat_initstats(Relation rel)
 {
        Oid                     rel_id = rel->rd_id;
-       PgStat_TableEntry *useent;
-       TabStatArray *tsarr;
-       PgStat_MsgTabstat *tsmsg;
-       int                     mb;
-       int                     i;
+       char            relkind = rel->rd_rel->relkind;
 
-       /*
-        * Initialize data not to count at all.
-        */
-       stats->tabentry = NULL;
+       /* We only count stats for things that have storage */
+       if (!(relkind == RELKIND_RELATION ||
+                 relkind == RELKIND_INDEX ||
+                 relkind == RELKIND_TOASTVALUE))
+       {
+               rel->pgstat_info = NULL;
+               return;
+       }
 
        if (pgStatSock < 0 ||
                !(pgstat_collect_tuplelevel ||
                  pgstat_collect_blocklevel))
+       {
+               /* We're not counting at all */
+               rel->pgstat_info = NULL;
                return;
+       }
 
-       tsarr = rel->rd_rel->relisshared ? &SharedTabStat : &RegularTabStat;
+       /*
+        * If we already set up this relation in the current transaction,
+        * nothing to do.
+        */
+       if (rel->pgstat_info != NULL &&
+               rel->pgstat_info->t_id == rel_id)
+               return;
+
+       /* Else find or make the PgStat_TableStatus entry, and update link */
+       rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared);
+}
+
+/*
+ * get_tabstat_entry - find or create a PgStat_TableStatus entry for rel
+ */
+static PgStat_TableStatus *
+get_tabstat_entry(Oid rel_id, bool isshared)
+{
+       PgStat_TableStatus *entry;
+       TabStatusArray *tsa;
+       TabStatusArray *prev_tsa;
+       int                     i;
 
        /*
-        * Search the already-used message slots for this relation.
+        * Search the already-used tabstat slots for this relation.
         */
-       for (mb = 0; mb < tsarr->tsa_used; mb++)
+       prev_tsa = NULL;
+       for (tsa = pgStatTabList; tsa != NULL; prev_tsa = tsa, tsa = tsa->tsa_next)
        {
-               tsmsg = tsarr->tsa_messages[mb];
-
-               for (i = tsmsg->m_nentries; --i >= 0;)
+               for (i = 0; i < tsa->tsa_used; i++)
                {
-                       if (tsmsg->m_entry[i].t_id == rel_id)
-                       {
-                               stats->tabentry = (void *) &(tsmsg->m_entry[i]);
-                               return;
-                       }
+                       entry = &tsa->tsa_entries[i];
+                       if (entry->t_id == rel_id)
+                               return entry;
                }
 
-               if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
-                       continue;
-
-               /*
-                * Not found, but found a message buffer with an empty slot instead.
-                * Fine, let's use this one.
-                */
-               i = tsmsg->m_nentries++;
-               useent = &tsmsg->m_entry[i];
-               MemSet(useent, 0, sizeof(PgStat_TableEntry));
-               useent->t_id = rel_id;
-               stats->tabentry = (void *) useent;
-               return;
+               if (tsa->tsa_used < TABSTAT_QUANTUM)
+               {
+                       /*
+                        * It must not be present, but we found a free slot instead.
+                        * Fine, let's use this one.  We assume the entry was already
+                        * zeroed, either at creation or after last use.
+                        */
+                       entry = &tsa->tsa_entries[tsa->tsa_used++];
+                       entry->t_id = rel_id;
+                       entry->t_shared = isshared;
+                       return entry;
+               }
        }
 
        /*
-        * If we ran out of message buffers, we just allocate more.
+        * We ran out of tabstat slots, so allocate more.  Be sure they're zeroed.
         */
-       if (tsarr->tsa_used >= tsarr->tsa_alloc)
-               more_tabstat_space(tsarr);
+       tsa = (TabStatusArray *) MemoryContextAllocZero(TopMemoryContext,
+                                                                                                       sizeof(TabStatusArray));
+       if (prev_tsa)
+               prev_tsa->tsa_next = tsa;
+       else
+               pgStatTabList = tsa;
 
        /*
-        * Use the first entry of the next message buffer.
+        * Use the first entry of the new TabStatusArray.
         */
-       mb = tsarr->tsa_used++;
-       tsmsg = tsarr->tsa_messages[mb];
-       tsmsg->m_nentries = 1;
-       useent = &tsmsg->m_entry[0];
-       MemSet(useent, 0, sizeof(PgStat_TableEntry));
-       useent->t_id = rel_id;
-       stats->tabentry = (void *) useent;
+       entry = &tsa->tsa_entries[tsa->tsa_used++];
+       entry->t_id = rel_id;
+       entry->t_shared = isshared;
+       return entry;
+}
+
+/*
+ * get_tabstat_stack_level - add a new (sub)transaction stack entry if needed
+ */
+static PgStat_SubXactStatus *
+get_tabstat_stack_level(int nest_level)
+{
+       PgStat_SubXactStatus *xact_state;
+
+       xact_state = pgStatXactStack;
+       if (xact_state == NULL || xact_state->nest_level != nest_level)
+       {
+               xact_state = (PgStat_SubXactStatus *)
+                       MemoryContextAlloc(TopTransactionContext,
+                                                          sizeof(PgStat_SubXactStatus));
+               xact_state->nest_level = nest_level;
+               xact_state->prev = pgStatXactStack;
+               xact_state->first = NULL;
+               pgStatXactStack = xact_state;
+       }
+       return xact_state;
+}
+
+/*
+ * add_tabstat_xact_level - add a new (sub)transaction state record
+ */
+static void
+add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
+{
+       PgStat_SubXactStatus *xact_state;
+       PgStat_TableXactStatus *trans;
+
+       /*
+        * If this is the first rel to be modified at the current nest level,
+        * we first have to push a transaction stack entry.
+        */
+       xact_state = get_tabstat_stack_level(nest_level);
+
+       /* Now make a per-table stack entry */
+       trans = (PgStat_TableXactStatus *)
+               MemoryContextAllocZero(TopTransactionContext,
+                                                          sizeof(PgStat_TableXactStatus));
+       trans->nest_level = nest_level;
+       trans->upper = pgstat_info->trans;
+       trans->parent = pgstat_info;
+       trans->next = xact_state->first;
+       xact_state->first = trans;
+       pgstat_info->trans = trans;
+}
+
+/*
+ * pgstat_count_heap_insert - count a tuple insertion
+ */
+void
+pgstat_count_heap_insert(Relation rel)
+{
+       PgStat_TableStatus *pgstat_info = rel->pgstat_info;
+
+       if (pgstat_collect_tuplelevel && pgstat_info != NULL)
+       {
+               int             nest_level = GetCurrentTransactionNestLevel();
+
+               /* t_tuples_inserted is nontransactional, so just advance it */
+               pgstat_info->t_counts.t_tuples_inserted++;
+
+               /* We have to log the transactional effect at the proper level */
+               if (pgstat_info->trans == NULL ||
+                       pgstat_info->trans->nest_level != nest_level)
+                       add_tabstat_xact_level(pgstat_info, nest_level);
+
+               pgstat_info->trans->tuples_inserted++;
+       }
+}
+
+/*
+ * pgstat_count_heap_update - count a tuple update
+ */
+void
+pgstat_count_heap_update(Relation rel)
+{
+       PgStat_TableStatus *pgstat_info = rel->pgstat_info;
+
+       if (pgstat_collect_tuplelevel && pgstat_info != NULL)
+       {
+               int             nest_level = GetCurrentTransactionNestLevel();
+
+               /* t_tuples_updated is nontransactional, so just advance it */
+               pgstat_info->t_counts.t_tuples_updated++;
+
+               /* We have to log the transactional effect at the proper level */
+               if (pgstat_info->trans == NULL ||
+                       pgstat_info->trans->nest_level != nest_level)
+                       add_tabstat_xact_level(pgstat_info, nest_level);
+
+               /* An UPDATE both inserts a new tuple and deletes the old */
+               pgstat_info->trans->tuples_inserted++;
+               pgstat_info->trans->tuples_deleted++;
+       }
+}
+
+/*
+ * pgstat_count_heap_delete - count a tuple deletion
+ */
+void
+pgstat_count_heap_delete(Relation rel)
+{
+       PgStat_TableStatus *pgstat_info = rel->pgstat_info;
+
+       if (pgstat_collect_tuplelevel && pgstat_info != NULL)
+       {
+               int             nest_level = GetCurrentTransactionNestLevel();
+
+               /* t_tuples_deleted is nontransactional, so just advance it */
+               pgstat_info->t_counts.t_tuples_deleted++;
+
+               /* We have to log the transactional effect at the proper level */
+               if (pgstat_info->trans == NULL ||
+                       pgstat_info->trans->nest_level != nest_level)
+                       add_tabstat_xact_level(pgstat_info, nest_level);
+
+               pgstat_info->trans->tuples_deleted++;
+       }
 }
 
 
 /* ----------
- * pgstat_count_xact_commit() -
+ * AtEOXact_PgStat
  *
- *     Called from access/transam/xact.c to count transaction commits.
+ *     Called from access/transam/xact.c at top-level transaction commit/abort.
  * ----------
  */
 void
-pgstat_count_xact_commit(void)
+AtEOXact_PgStat(bool isCommit)
 {
-       if      (!pgstat_collect_tuplelevel &&
-                !pgstat_collect_blocklevel)
-               return;
-
-       pgStatXactCommit++;
+       PgStat_SubXactStatus *xact_state;
 
        /*
-        * If there was no relation activity yet, just make one existing message
-        * buffer used without slots, causing the next report to tell new
-        * xact-counters.
+        * Count transaction commit or abort.  (We use counters, not just bools,
+        * in case the reporting message isn't sent right away.)
         */
-       if (RegularTabStat.tsa_alloc == 0)
-               more_tabstat_space(&RegularTabStat);
+       if (isCommit)
+               pgStatXactCommit++;
+       else
+               pgStatXactRollback++;
 
-       if (RegularTabStat.tsa_used == 0)
+       /*
+        * Transfer transactional insert/update counts into the base tabstat
+        * entries.  We don't bother to free any of the transactional state,
+        * since it's all in TopTransactionContext and will go away anyway.
+        */
+       xact_state = pgStatXactStack;
+       if (xact_state != NULL)
        {
-               RegularTabStat.tsa_used++;
-               RegularTabStat.tsa_messages[0]->m_nentries = 0;
+               PgStat_TableXactStatus *trans;
+
+               Assert(xact_state->nest_level == 1);
+               Assert(xact_state->prev == NULL);
+               for (trans = xact_state->first; trans != NULL; trans = trans->next)
+               {
+                       PgStat_TableStatus *tabstat;
+
+                       Assert(trans->nest_level == 1);
+                       Assert(trans->upper == NULL);
+                       tabstat = trans->parent;
+                       Assert(tabstat->trans == trans);
+                       if (isCommit)
+                       {
+                               tabstat->t_counts.t_new_live_tuples += trans->tuples_inserted;
+                               tabstat->t_counts.t_new_dead_tuples += trans->tuples_deleted;
+                       }
+                       else
+                       {
+                               /* inserted tuples are dead, deleted tuples are unaffected */
+                               tabstat->t_counts.t_new_dead_tuples += trans->tuples_inserted;
+                       }
+                       tabstat->trans = NULL;
+               }
        }
-}
+       pgStatXactStack = NULL;
 
+       /* Make sure any stats snapshot is thrown away */
+       pgstat_clear_snapshot();
+}
 
 /* ----------
- * pgstat_count_xact_rollback() -
+ * AtEOSubXact_PgStat
  *
- *     Called from access/transam/xact.c to count transaction rollbacks.
+ *     Called from access/transam/xact.c at subtransaction commit/abort.
  * ----------
  */
 void
-pgstat_count_xact_rollback(void)
+AtEOSubXact_PgStat(bool isCommit, int nestDepth)
 {
-       if      (!pgstat_collect_tuplelevel &&
-                !pgstat_collect_blocklevel)
-               return;
-
-       pgStatXactRollback++;
+       PgStat_SubXactStatus *xact_state;
 
        /*
-        * If there was no relation activity yet, just make one existing message
-        * buffer used without slots, causing the next report to tell new
-        * xact-counters.
+        * Transfer transactional insert/update counts into the next higher
+        * subtransaction state.
         */
-       if (RegularTabStat.tsa_alloc == 0)
-               more_tabstat_space(&RegularTabStat);
+       xact_state = pgStatXactStack;
+       if (xact_state != NULL &&
+               xact_state->nest_level >= nestDepth)
+       {
+               PgStat_TableXactStatus *trans;
+               PgStat_TableXactStatus *next_trans;
+
+               /* delink xact_state from stack immediately to simplify reuse case */
+               pgStatXactStack = xact_state->prev;
+
+               for (trans = xact_state->first; trans != NULL; trans = next_trans)
+               {
+                       PgStat_TableStatus *tabstat;
+
+                       next_trans = trans->next;
+                       Assert(trans->nest_level == nestDepth);
+                       tabstat = trans->parent;
+                       Assert(tabstat->trans == trans);
+                       if (isCommit)
+                       {
+                               if (trans->upper && trans->upper->nest_level == nestDepth - 1)
+                               {
+                                       trans->upper->tuples_inserted += trans->tuples_inserted;
+                                       trans->upper->tuples_deleted += trans->tuples_deleted;
+                                       tabstat->trans = trans->upper;
+                                       pfree(trans);
+                               }
+                               else
+                               {
+                                       /*
+                                        * When there isn't an immediate parent state, we can
+                                        * just reuse the record instead of going through a
+                                        * palloc/pfree pushup (this works since it's all in
+                                        * TopTransactionContext anyway).  We have to re-link
+                                        * it into the parent level, though, and that might mean
+                                        * pushing a new entry into the pgStatXactStack.
+                                        */
+                                       PgStat_SubXactStatus *upper_xact_state;
+
+                                       upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
+                                       trans->next = upper_xact_state->first;
+                                       upper_xact_state->first = trans;
+                                       trans->nest_level = nestDepth - 1;
+                               }
+                       }
+                       else
+                       {
+                               /*
+                                * On abort, inserted tuples are dead (and can be bounced out
+                                * to the top-level tabstat), deleted tuples are unaffected
+                                */
+                               tabstat->t_counts.t_new_dead_tuples += trans->tuples_inserted;
+                               tabstat->trans = trans->upper;
+                               pfree(trans);
+                       }
+               }
+               pfree(xact_state);
+       }
+}
+
+
+/*
+ * AtPrepare_PgStat
+ *             Save the transactional stats state at 2PC transaction prepare.
+ *
+ * In this phase we just generate 2PC records for all the pending
+ * transaction-dependent stats work.
+ */
+void
+AtPrepare_PgStat(void)
+{
+       PgStat_SubXactStatus *xact_state;
+
+       xact_state = pgStatXactStack;
+       if (xact_state != NULL)
+       {
+               PgStat_TableXactStatus *trans;
+
+               Assert(xact_state->nest_level == 1);
+               Assert(xact_state->prev == NULL);
+               for (trans = xact_state->first; trans != NULL; trans = trans->next)
+               {
+                       PgStat_TableStatus *tabstat;
+                       TwoPhasePgStatRecord record;
+
+                       Assert(trans->nest_level == 1);
+                       Assert(trans->upper == NULL);
+                       tabstat = trans->parent;
+                       Assert(tabstat->trans == trans);
+
+                       record.tuples_inserted = trans->tuples_inserted;
+                       record.tuples_deleted = trans->tuples_deleted;
+                       record.t_id = tabstat->t_id;
+                       record.t_shared = tabstat->t_shared;
+
+                       RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
+                                                                  &record, sizeof(TwoPhasePgStatRecord));
+               }
+       }
+}
 
-       if (RegularTabStat.tsa_used == 0)
+/*
+ * PostPrepare_PgStat
+ *             Clean up after successful PREPARE.
+ *
+ * All we need do here is unlink the transaction stats state from the
+ * nontransactional state.  The nontransactional action counts will be
+ * reported to the stats collector immediately, while the effects on live
+ * and dead tuple counts are preserved in the 2PC state file.
+ *
+ * Note: AtEOXact_PgStat is not called during PREPARE.
+ */
+void
+PostPrepare_PgStat(void)
+{
+       PgStat_SubXactStatus *xact_state;
+
+       /*
+        * We don't bother to free any of the transactional state,
+        * since it's all in TopTransactionContext and will go away anyway.
+        */
+       xact_state = pgStatXactStack;
+       if (xact_state != NULL)
        {
-               RegularTabStat.tsa_used++;
-               RegularTabStat.tsa_messages[0]->m_nentries = 0;
+               PgStat_TableXactStatus *trans;
+
+               for (trans = xact_state->first; trans != NULL; trans = trans->next)
+               {
+                       PgStat_TableStatus *tabstat;
+
+                       tabstat = trans->parent;
+                       tabstat->trans = NULL;
+               }
        }
+       pgStatXactStack = NULL;
+
+       /* Make sure any stats snapshot is thrown away */
+       pgstat_clear_snapshot();
+}
+
+/*
+ * 2PC processing routine for COMMIT PREPARED case.
+ *
+ * Load the saved counts into our local pgstats state.
+ */
+void
+pgstat_twophase_postcommit(TransactionId xid, uint16 info,
+                                                  void *recdata, uint32 len)
+{
+       TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
+       PgStat_TableStatus *pgstat_info;
+
+       /* Find or create a tabstat entry for the rel */
+       pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
+
+       pgstat_info->t_counts.t_new_live_tuples += rec->tuples_inserted;
+       pgstat_info->t_counts.t_new_dead_tuples += rec->tuples_deleted;
+}
+
+/*
+ * 2PC processing routine for ROLLBACK PREPARED case.
+ *
+ * Load the saved counts into our local pgstats state, but treat them
+ * as aborted.
+ */
+void
+pgstat_twophase_postabort(TransactionId xid, uint16 info,
+                                                 void *recdata, uint32 len)
+{
+       TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
+       PgStat_TableStatus *pgstat_info;
+
+       /* Find or create a tabstat entry for the rel */
+       pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
+
+       /* inserted tuples are dead, deleted tuples are no-ops */
+       pgstat_info->t_counts.t_new_dead_tuples += rec->tuples_inserted;
 }
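
The prepare path above flattens the per-table transactional counts into fixed-size TwoPhasePgStatRecord entries; the two-phase machinery stores those bytes in the 2PC state file and later hands them back verbatim to pgstat_twophase_postcommit() or pgstat_twophase_postabort(), dispatched through the TWOPHASE_RM_PGSTAT_ID slot (hence the new access/twophase_rmgr.h include). A self-contained toy model of that round trip, with invented Toy* names:

    #include <stdio.h>
    #include <string.h>

    typedef struct                      /* stands in for TwoPhasePgStatRecord */
    {
        long        tuples_inserted;
        long        tuples_deleted;
        unsigned    t_id;
        int         t_shared;
    } ToyPgStatRecord;

    int
    main(void)
    {
        char    statefile[sizeof(ToyPgStatRecord)]; /* stands in for the 2PC state file */
        ToyPgStatRecord rec = {7, 3, 16384, 0};
        ToyPgStatRecord replay;
        long    new_live = 0;
        long    new_dead = 0;

        /* AtPrepare_PgStat: the record is written out verbatim */
        memcpy(statefile, &rec, sizeof(rec));

        /* COMMIT PREPARED: the same bytes come back to the postcommit hook */
        memcpy(&replay, statefile, sizeof(replay));
        new_live += replay.tuples_inserted;     /* 7 new live tuples */
        new_dead += replay.tuples_deleted;      /* 3 new dead tuples */

        /* ROLLBACK PREPARED would instead do:
         *      new_dead += replay.tuples_inserted;
         * because the inserted tuples are dead and the deletes never happened. */

        printf("live=%ld dead=%ld\n", new_live, new_dead);
        return 0;
    }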
 
 
@@ -1254,6 +1710,22 @@ pgstat_fetch_stat_numbackends(void)
        return localNumBackends;
 }
 
+/*
+ * ---------
+ * pgstat_fetch_global() -
+ *
+ *  Support function for the SQL-callable pgstat* functions. Returns
+ *  a pointer to the global statistics struct.
+ * ---------
+ */
+PgStat_GlobalStats *
+pgstat_fetch_global(void)
+{
+       backend_read_statsfile();
+
+       return &globalStats;
+}
+
 
 /* ------------------------------------------------------------
  * Functions for management of the shared-memory PgBackendStatus array
@@ -1319,8 +1791,8 @@ pgstat_bestart(void)
        MyBEEntry = &BackendStatusArray[MyBackendId - 1];
 
        /*
-        * To minimize the time spent modifying the entry, fetch all the
-        * needed data first.
+        * To minimize the time spent modifying the entry, fetch all the needed
+        * data first.
         *
         * If we have a MyProcPort, use its session start time (for consistency,
         * and to save a kernel call).
@@ -1343,18 +1815,20 @@ pgstat_bestart(void)
 
        /*
         * Initialize my status entry, following the protocol of bumping
-        * st_changecount before and after; and make sure it's even afterwards.
-        * We use a volatile pointer here to ensure the compiler doesn't try to
-        * get cute.
+        * st_changecount before and after; and make sure it's even afterwards. We
+        * use a volatile pointer here to ensure the compiler doesn't try to get
+        * cute.
         */
        beentry = MyBEEntry;
-       do {
+       do
+       {
                beentry->st_changecount++;
        } while ((beentry->st_changecount & 1) == 0);
 
        beentry->st_procpid = MyProcPid;
        beentry->st_proc_start_timestamp = proc_start_timestamp;
        beentry->st_activity_start_timestamp = 0;
+       beentry->st_txn_start_timestamp = 0;
        beentry->st_databaseid = MyDatabaseId;
        beentry->st_userid = userid;
        beentry->st_clientaddr = clientaddr;
@@ -1386,12 +1860,12 @@ pgstat_beshutdown_hook(int code, Datum arg)
 {
        volatile PgBackendStatus *beentry = MyBEEntry;
 
-       pgstat_report_tabstat();
+       pgstat_report_tabstat(true);
 
        /*
-        * Clear my status entry, following the protocol of bumping
-        * st_changecount before and after.  We use a volatile pointer here
-        * to ensure the compiler doesn't try to get cute.
+        * Clear my status entry, following the protocol of bumping st_changecount
+        * before and after.  We use a volatile pointer here to ensure the
+        * compiler doesn't try to get cute.
         */
        beentry->st_changecount++;
 
@@ -1420,8 +1894,8 @@ pgstat_report_activity(const char *cmd_str)
                return;
 
        /*
-        * To minimize the time spent modifying the entry, fetch all the
-        * needed data first.
+        * To minimize the time spent modifying the entry, fetch all the needed
+        * data first.
         */
        start_timestamp = GetCurrentStatementStartTimestamp();
 
@@ -1430,8 +1904,8 @@ pgstat_report_activity(const char *cmd_str)
 
        /*
         * Update my status entry, following the protocol of bumping
-        * st_changecount before and after.  We use a volatile pointer here
-        * to ensure the compiler doesn't try to get cute.
+        * st_changecount before and after.  We use a volatile pointer here to
+        * ensure the compiler doesn't try to get cute.
         */
        beentry->st_changecount++;
 
@@ -1443,6 +1917,29 @@ pgstat_report_activity(const char *cmd_str)
        Assert((beentry->st_changecount & 1) == 0);
 }
 
+/*
+ * Set the current transaction start timestamp to the specified
+ * value. If there is no current active transaction, this is signified
+ * by 0.
+ */
+void
+pgstat_report_txn_timestamp(TimestampTz tstamp)
+{
+       volatile PgBackendStatus *beentry = MyBEEntry;
+
+       if (!pgstat_collect_querystring || !beentry)
+               return;
+
+       /*
+        * Update my status entry, following the protocol of bumping
+        * st_changecount before and after.  We use a volatile pointer
+        * here to ensure the compiler doesn't try to get cute.
+        */
+       beentry->st_changecount++;
+       beentry->st_txn_start_timestamp = tstamp;
+       beentry->st_changecount++;
+       Assert((beentry->st_changecount & 1) == 0);
+}
 
 /* ----------
  * pgstat_report_waiting() -
@@ -1480,39 +1977,40 @@ pgstat_report_waiting(bool waiting)
 static void
 pgstat_read_current_status(void)
 {
-       TransactionId topXid = GetTopTransactionId();
        volatile PgBackendStatus *beentry;
+       PgBackendStatus *localtable;
        PgBackendStatus *localentry;
        int                     i;
 
        Assert(!pgStatRunningInCollector);
-       if (TransactionIdEquals(pgStatLocalStatusXact, topXid))
+       if (localBackendStatusTable)
                return;                                 /* already done */
 
-       localBackendStatusTable = (PgBackendStatus *)
-               MemoryContextAlloc(TopTransactionContext,
+       pgstat_setup_memcxt();
+
+       localtable = (PgBackendStatus *)
+               MemoryContextAlloc(pgStatLocalContext,
                                                   sizeof(PgBackendStatus) * MaxBackends);
        localNumBackends = 0;
 
        beentry = BackendStatusArray;
-       localentry = localBackendStatusTable;
+       localentry = localtable;
        for (i = 1; i <= MaxBackends; i++)
        {
                /*
-                * Follow the protocol of retrying if st_changecount changes while
-                * we copy the entry, or if it's odd.  (The check for odd is needed
-                * to cover the case where we are able to completely copy the entry
-                * while the source backend is between increment steps.)  We use a
-                * volatile pointer here to ensure the compiler doesn't try to get
-                * cute.
+                * Follow the protocol of retrying if st_changecount changes while we
+                * copy the entry, or if it's odd.  (The check for odd is needed to
+                * cover the case where we are able to completely copy the entry while
+                * the source backend is between increment steps.)      We use a volatile
+                * pointer here to ensure the compiler doesn't try to get cute.
                 */
                for (;;)
                {
-                       int             save_changecount = beentry->st_changecount;
+                       int                     save_changecount = beentry->st_changecount;
 
                        /*
-                        * XXX if PGBE_ACTIVITY_SIZE is really large, it might be best
-                        * to use strcpy not memcpy for copying the activity string?
+                        * XXX if PGBE_ACTIVITY_SIZE is really large, it might be best to
+                        * use strcpy not memcpy for copying the activity string?
                         */
                        memcpy(localentry, (char *) beentry, sizeof(PgBackendStatus));
 
@@ -1533,7 +2031,8 @@ pgstat_read_current_status(void)
                }
        }
 
-       pgStatLocalStatusXact = topXid;
+       /* Set the pointer only after completion of a valid table */
+       localBackendStatusTable = localtable;
 }
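
The retry rule used above (and on the writer side in pgstat_bestart()) is a small lock-free protocol: a writer makes st_changecount odd while it is mid-update and even again when done, so a reader's copy is valid only if the counter is even and unchanged across the copy. A self-contained toy of the reader side, with invented Toy* names:

    #include <string.h>

    typedef struct
    {
        int     st_changecount;
        int     payload;                /* stands in for the rest of PgBackendStatus */
    } ToyBackendStatus;

    /* Copy *src into *dst, retrying until a consistent snapshot is seen. */
    static void
    toy_read_consistent(volatile ToyBackendStatus *src, ToyBackendStatus *dst)
    {
        for (;;)
        {
            int     before = src->st_changecount;

            memcpy(dst, (char *) src, sizeof(*dst));
            if (before == src->st_changecount && (before & 1) == 0)
                break;                  /* even and unchanged: copy is good */
            /* otherwise a writer was active; try again */
        }
    }

    int
    main(void)
    {
        volatile ToyBackendStatus shared = {0, 42};
        ToyBackendStatus snap;

        toy_read_consistent(&shared, &snap);
        return snap.payload == 42 ? 0 : 1;
    }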
 
 
@@ -1585,11 +2084,43 @@ pgstat_send(void *msg, int len)
 #endif
 }
 
+/* ----------
+ * pgstat_send_bgwriter() -
+ *
+ *      Send bgwriter statistics to the collector
+ * ----------
+ */
+void
+pgstat_send_bgwriter(void)
+{
+       /* We assume this initializes to zeroes */
+       static const PgStat_MsgBgWriter all_zeroes;
+
+       /*
+        * This function can be called even if nothing at all has happened.
+        * In this case, avoid sending a completely empty message to
+        * the stats collector.
+        */
+       if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
+               return;
+
+       /*
+        * Prepare and send the message
+        */
+       pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
+       pgstat_send(&BgWriterStats, sizeof(BgWriterStats));
+
+       /*
+        * Clear out the statistics buffer, so it can be re-used.
+        */
+       MemSet(&BgWriterStats, 0, sizeof(BgWriterStats));
+}
+
 
 /* ----------
  * PgstatCollectorMain() -
  *
- *     Start up the statistics collector process.  This is the body of the
+ *     Start up the statistics collector process.      This is the body of the
  *     postmaster child process.
  *
  *     The argc/argv parameters are valid only in EXEC_BACKEND case.
@@ -1602,17 +2133,31 @@ PgstatCollectorMain(int argc, char *argv[])
        bool            need_timer = false;
        int                     len;
        PgStat_Msg      msg;
+
+#ifndef WIN32
 #ifdef HAVE_POLL
        struct pollfd input_fd;
 #else
        struct timeval sel_timeout;
        fd_set          rfds;
+#endif
 #endif
 
        IsUnderPostmaster = true;       /* we are a postmaster subprocess now */
 
        MyProcPid = getpid();           /* reset MyProcPid */
 
+       /*
+        * If possible, make this process a group leader, so that the postmaster
+        * can signal any child processes too.  (pgstat probably never has
+        * any child processes, but for consistency we make all postmaster
+        * child processes do this.)
+        */
+#ifdef HAVE_SETSID
+       if (setsid() < 0)
+               elog(FATAL, "setsid() failed: %m");
+#endif
+
        /*
         * Ignore all signals usually bound to some action in the postmaster,
         * except SIGQUIT and SIGALRM.
@@ -1645,20 +2190,20 @@ PgstatCollectorMain(int argc, char *argv[])
        /* Preset the delay between status file writes */
        MemSet(&write_timeout, 0, sizeof(struct itimerval));
        write_timeout.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000;
-       write_timeout.it_value.tv_usec = PGSTAT_STAT_INTERVAL % 1000;
+       write_timeout.it_value.tv_usec = (PGSTAT_STAT_INTERVAL % 1000) * 1000;
 
        /*
         * Read in an existing statistics stats file or initialize the stats to
         * zero.
         */
        pgStatRunningInCollector = true;
-       pgstat_read_statsfile(&pgStatDBHash, InvalidOid);
+       pgStatDBHash = pgstat_read_statsfile(InvalidOid);
 
        /*
-        * Setup the descriptor set for select(2).  Since only one bit in the
-        * set ever changes, we need not repeat FD_ZERO each time.
+        * Setup the descriptor set for select(2).      Since only one bit in the set
+        * ever changes, we need not repeat FD_ZERO each time.
         */
-#ifndef HAVE_POLL
+#if !defined(HAVE_POLL) && !defined(WIN32)
        FD_ZERO(&rfds);
 #endif
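
A quick check of the arithmetic behind the tv_usec change near the top of this hunk. PGSTAT_STAT_INTERVAL is a millisecond value (500 in pgstat.h of this era, stated here as an assumption), so the microsecond field needs the remainder scaled by 1000; the old expression left raw milliseconds in tv_usec, making the write timer fire far too early. Standalone demonstration:

    #include <stdio.h>
    #include <sys/time.h>

    #define EXAMPLE_STAT_INTERVAL 500   /* milliseconds, like PGSTAT_STAT_INTERVAL */

    int
    main(void)
    {
        struct timeval tv;

        tv.tv_sec = EXAMPLE_STAT_INTERVAL / 1000;               /* 0 seconds */
        tv.tv_usec = (EXAMPLE_STAT_INTERVAL % 1000) * 1000;     /* 500000 us = 500 ms */

        /* the old expression, EXAMPLE_STAT_INTERVAL % 1000, would have left
         * only 500 us here, i.e. a timeout of half a millisecond */
        printf("%ld s + %ld us\n", (long) tv.tv_sec, (long) tv.tv_usec);
        return 0;
    }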
 
@@ -1666,13 +2211,13 @@ PgstatCollectorMain(int argc, char *argv[])
         * Loop to process messages until we get SIGQUIT or detect ungraceful
         * death of our parent postmaster.
         *
-        * For performance reasons, we don't want to do a PostmasterIsAlive()
-        * test after every message; instead, do it at statwrite time and if
+        * For performance reasons, we don't want to do a PostmasterIsAlive() test
+        * after every message; instead, do it at statwrite time and if
         * select()/poll() is interrupted by timeout.
         */
        for (;;)
        {
-               int             got_data;
+               int                     got_data;
 
                /*
                 * Quit if we get SIGQUIT from the postmaster.
@@ -1681,7 +2226,7 @@ PgstatCollectorMain(int argc, char *argv[])
                        break;
 
                /*
-                * If time to write the stats file, do so.  Note that the alarm
+                * If time to write the stats file, do so.      Note that the alarm
                 * interrupt isn't re-enabled immediately, but only after we next
                 * receive a stats message; so no cycles are wasted when there is
                 * nothing going on.
@@ -1701,12 +2246,14 @@ PgstatCollectorMain(int argc, char *argv[])
                 * Wait for a message to arrive; but not for more than
                 * PGSTAT_SELECT_TIMEOUT seconds. (This determines how quickly we will
                 * shut down after an ungraceful postmaster termination; so it needn't
-                * be very fast.  However, on some systems SIGQUIT won't interrupt
-                * the poll/select call, so this also limits speed of response to
-                * SIGQUIT, which is more important.)
+                * be very fast.  However, on some systems SIGQUIT won't interrupt the
+                * poll/select call, so this also limits speed of response to SIGQUIT,
+                * which is more important.)
                 *
-                * We use poll(2) if available, otherwise select(2)
+                * We use poll(2) if available, otherwise select(2).
+                * Win32 has its own implementation.
                 */
+#ifndef WIN32
 #ifdef HAVE_POLL
                input_fd.fd = pgStatSock;
                input_fd.events = POLLIN | POLLERR;
@@ -1722,7 +2269,6 @@ PgstatCollectorMain(int argc, char *argv[])
                }
 
                got_data = (input_fd.revents != 0);
-
 #else                                                  /* !HAVE_POLL */
 
                FD_SET(pgStatSock, &rfds);
@@ -1744,8 +2290,11 @@ PgstatCollectorMain(int argc, char *argv[])
                }
 
                got_data = FD_ISSET(pgStatSock, &rfds);
-
 #endif   /* HAVE_POLL */
+#else /* WIN32 */
+               got_data = pgwin32_waitforsinglesocket(pgStatSock, FD_READ,
+                                                                                          PGSTAT_SELECT_TIMEOUT*1000);
+#endif
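
[Note] All three wait paths above (poll, select, and pgwin32_waitforsinglesocket) do the same job: block until the stats socket becomes readable or PGSTAT_SELECT_TIMEOUT expires. A minimal sketch of the poll(2) variant, leaving out the EINTR retry that the real loop performs:

    #include <poll.h>

    /* Returns 1 if the socket became readable (or errored) within the
     * timeout, 0 on timeout, -1 if poll() itself failed. */
    static int
    wait_for_stats_message(int sock, int timeout_secs)
    {
        struct pollfd pfd;
        int rc;

        pfd.fd = sock;
        pfd.events = POLLIN | POLLERR;
        pfd.revents = 0;

        rc = poll(&pfd, 1, timeout_secs * 1000);        /* timeout in milliseconds */
        if (rc < 0)
            return -1;                                  /* caller handles EINTR etc. */

        return (pfd.revents != 0);
    }
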
 
                /*
                 * If there is a message on the socket, read it and check for
@@ -1813,6 +2362,10 @@ PgstatCollectorMain(int argc, char *argv[])
                                        pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, len);
                                        break;
 
+                               case PGSTAT_MTYPE_BGWRITER:
+                                       pgstat_recv_bgwriter((PgStat_MsgBgWriter *) &msg, len);
+                                       break;
+
                                default:
                                        break;
                        }
@@ -1826,20 +2379,20 @@ PgstatCollectorMain(int argc, char *argv[])
                        {
                                if (setitimer(ITIMER_REAL, &write_timeout, NULL))
                                        ereport(ERROR,
-                                                 (errmsg("could not set statistics collector timer: %m")));
+                                       (errmsg("could not set statistics collector timer: %m")));
                                need_timer = false;
                        }
                }
                else
                {
                        /*
-                        * We can only get here if the select/poll timeout elapsed.
-                        * Check for postmaster death.
+                        * We can only get here if the select/poll timeout elapsed. Check
+                        * for postmaster death.
                         */
                        if (!PostmasterIsAlive(true))
                                break;
                }
-       } /* end of message-processing loop */
+       }                                                       /* end of message-processing loop */
 
        /*
         * Save the final stats to reuse at next startup.
@@ -1895,6 +2448,11 @@ pgstat_get_db_entry(Oid databaseid, bool create)
                result->n_xact_rollback = 0;
                result->n_blocks_fetched = 0;
                result->n_blocks_hit = 0;
+               result->n_tuples_returned = 0;
+               result->n_tuples_fetched = 0;
+               result->n_tuples_inserted = 0;
+               result->n_tuples_updated = 0;
+               result->n_tuples_deleted = 0;
                result->last_autovac_time = 0;
 
                memset(&hash_ctl, 0, sizeof(hash_ctl));
@@ -1946,6 +2504,11 @@ pgstat_write_statsfile(void)
        format_id = PGSTAT_FILE_FORMAT_ID;
        fwrite(&format_id, sizeof(format_id), 1, fpout);
 
+       /*
+        * Write global stats struct
+        */
+       fwrite(&globalStats, sizeof(globalStats), 1, fpout);
+
        /*
         * Walk through the database table.
         */
@@ -1953,9 +2516,9 @@ pgstat_write_statsfile(void)
        while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
        {
                /*
-                * Write out the DB entry including the number of live backends.
-                * We don't write the tables pointer since it's of no use to any
-                * other process.
+                * Write out the DB entry including the number of live backends. We
+                * don't write the tables pointer since it's of no use to any other
+                * process.
                 */
                fputc('D', fpout);
                fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
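
[Note] With the addition above, the statistics file now begins with the format id, then the global bgwriter counters, then one 'D' record per database (truncated at the tables pointer) followed by that database's per-table records. A minimal sketch of reading the fixed-size header back, assuming the PgStat_GlobalStats type and PGSTAT_FILE_FORMAT_ID constant from pgstat.h; the reader in this file does the equivalent a little further below:

    #include "postgres.h"
    #include "pgstat.h"

    /* Read the fixed-size header written above: format id, then global stats.
     * Returns false if the file is missing data or has the wrong version. */
    static bool
    read_stats_header(FILE *fpin, PgStat_GlobalStats *global)
    {
        int32   format_id;

        if (fread(&format_id, sizeof(format_id), 1, fpin) != 1 ||
            format_id != PGSTAT_FILE_FORMAT_ID)
            return false;       /* wrong version: caller starts from zeroes */

        if (fread(global, sizeof(*global), 1, fpin) != 1)
            return false;       /* truncated or corrupted file */

        return true;            /* 'D' database records follow */
    }
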
@@ -1987,8 +2550,8 @@ pgstat_write_statsfile(void)
        {
                ereport(LOG,
                                (errcode_for_file_access(),
-                                errmsg("could not write temporary statistics file \"%s\": %m",
-                                               PGSTAT_STAT_TMPFILE)));
+                          errmsg("could not write temporary statistics file \"%s\": %m",
+                                         PGSTAT_STAT_TMPFILE)));
                fclose(fpout);
                unlink(PGSTAT_STAT_TMPFILE);
        }
@@ -2018,38 +2581,24 @@ pgstat_write_statsfile(void)
  *     databases' hash table (whose entries point to the tables' hash tables).
  * ----------
  */
-static void
-pgstat_read_statsfile(HTAB **dbhash, Oid onlydb)
+static HTAB *
+pgstat_read_statsfile(Oid onlydb)
 {
        PgStat_StatDBEntry *dbentry;
        PgStat_StatDBEntry dbbuf;
        PgStat_StatTabEntry *tabentry;
        PgStat_StatTabEntry tabbuf;
        HASHCTL         hash_ctl;
+       HTAB       *dbhash;
        HTAB       *tabhash = NULL;
        FILE       *fpin;
        int32           format_id;
        bool            found;
-       MemoryContext use_mcxt;
-       int                     mcxt_flags;
 
        /*
-        * If running in the collector or the autovacuum process, we use the
-        * DynaHashCxt memory context.  If running in a backend, we use the
-        * TopTransactionContext instead, so the caller must only know the last
-        * XactId when this call happened to know if his tables are still valid or
-        * already gone!
+        * The tables will live in pgStatLocalContext.
         */
-       if (pgStatRunningInCollector || IsAutoVacuumProcess())
-       {
-               use_mcxt = NULL;
-               mcxt_flags = 0;
-       }
-       else
-       {
-               use_mcxt = TopTransactionContext;
-               mcxt_flags = HASH_CONTEXT;
-       }
+       pgstat_setup_memcxt();
 
        /*
         * Create the DB hashtable
@@ -2058,9 +2607,15 @@ pgstat_read_statsfile(HTAB **dbhash, Oid onlydb)
        hash_ctl.keysize = sizeof(Oid);
        hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
        hash_ctl.hash = oid_hash;
-       hash_ctl.hcxt = use_mcxt;
-       *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
-                                                 HASH_ELEM | HASH_FUNCTION | mcxt_flags);
+       hash_ctl.hcxt = pgStatLocalContext;
+       dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
+                                                HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+       /*
+        * Clear out global statistics so they start from zero in case we can't
+        * load an existing statsfile.
+        */
+       memset(&globalStats, 0, sizeof(globalStats));
 
        /*
         * Try to open the status file. If it doesn't exist, the backends simply
@@ -2068,7 +2623,7 @@ pgstat_read_statsfile(HTAB **dbhash, Oid onlydb)
         * with empty counters.
         */
        if ((fpin = AllocateFile(PGSTAT_STAT_FILENAME, PG_BINARY_R)) == NULL)
-               return;
+               return dbhash;
 
        /*
         * Verify it's of the expected format.
@@ -2081,6 +2636,16 @@ pgstat_read_statsfile(HTAB **dbhash, Oid onlydb)
                goto done;
        }
 
+       /*
+        * Read global stats struct
+        */
+       if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
+       {
+               ereport(pgStatRunningInCollector ? LOG : WARNING,
+                               (errmsg("corrupted pgstat.stat file")));
+               goto done;
+       }
+
        /*
         * We found an existing collector stats file. Read it and put all the
         * hashtable entries into place.
@@ -2106,7 +2671,7 @@ pgstat_read_statsfile(HTAB **dbhash, Oid onlydb)
                                /*
                                 * Add to the DB hash
                                 */
-                               dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
+                               dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
                                                                                                  (void *) &dbbuf.databaseid,
                                                                                                                         HASH_ENTER,
                                                                                                                         &found);
@@ -2135,11 +2700,11 @@ pgstat_read_statsfile(HTAB **dbhash, Oid onlydb)
                                hash_ctl.keysize = sizeof(Oid);
                                hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
                                hash_ctl.hash = oid_hash;
-                               hash_ctl.hcxt = use_mcxt;
+                               hash_ctl.hcxt = pgStatLocalContext;
                                dbentry->tables = hash_create("Per-database table",
                                                                                          PGSTAT_TAB_HASH_SIZE,
                                                                                          &hash_ctl,
-                                                                        HASH_ELEM | HASH_FUNCTION | mcxt_flags);
+                                                                        HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
 
                                /*
                                 * Arrange that following 'T's add entries to this database's
@@ -2202,44 +2767,74 @@ pgstat_read_statsfile(HTAB **dbhash, Oid onlydb)
 
 done:
        FreeFile(fpin);
+
+       return dbhash;
 }
 
 /*
- * If not done for this transaction, read the statistics collector
- * stats file into some hash tables.
- *
- * Because we store the tables in TopTransactionContext, the result
- * is good for the entire current main transaction.
- *
- * Inside the autovacuum process, the statfile is assumed to be valid
- * "forever", that is one iteration, within one database.  This means
- * we only consider the statistics as they were when the autovacuum
- * iteration started.
+ * If not already done, read the statistics collector stats file into
+ * some hash tables.  The results will be kept until pgstat_clear_snapshot()
+ * is called (typically, at end of transaction).
  */
 static void
 backend_read_statsfile(void)
 {
-       if (IsAutoVacuumProcess())
-       {
-               /* already read it? */
-               if (pgStatDBHash)
-                       return;
-               Assert(!pgStatRunningInCollector);
-               pgstat_read_statsfile(&pgStatDBHash, InvalidOid);
-       }
+       /* already read it? */
+       if (pgStatDBHash)
+               return;
+       Assert(!pgStatRunningInCollector);
+
+       /* Autovacuum launcher wants stats about all databases */
+       if (IsAutoVacuumLauncherProcess())
+               pgStatDBHash = pgstat_read_statsfile(InvalidOid);
        else
-       {
-               TransactionId topXid = GetTopTransactionId();
+               pgStatDBHash = pgstat_read_statsfile(MyDatabaseId);
+}
 
-               if (!TransactionIdEquals(pgStatDBHashXact, topXid))
-               {
-                       Assert(!pgStatRunningInCollector);
-                       pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId);
-                       pgStatDBHashXact = topXid;
-               }
-       }
+
+/* ----------
+ * pgstat_setup_memcxt() -
+ *
+ *     Create pgStatLocalContext, if not already done.
+ * ----------
+ */
+static void
+pgstat_setup_memcxt(void)
+{
+       if (!pgStatLocalContext)
+               pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
+                                                                                                  "Statistics snapshot",
+                                                                                                  ALLOCSET_SMALL_MINSIZE,
+                                                                                                  ALLOCSET_SMALL_INITSIZE,
+                                                                                                  ALLOCSET_SMALL_MAXSIZE);
 }
 
+
+/* ----------
+ * pgstat_clear_snapshot() -
+ *
+ *     Discard any data collected in the current transaction.  Any subsequent
+ *     request will cause new snapshots to be read.
+ *
+ *     This is also invoked during transaction commit or abort to discard
+ *     the no-longer-wanted snapshot.
+ * ----------
+ */
+void
+pgstat_clear_snapshot(void)
+{
+       /* Release memory, if any was allocated */
+       if (pgStatLocalContext)
+               MemoryContextDelete(pgStatLocalContext);
+
+       /* Reset variables */
+       pgStatLocalContext = NULL;
+       pgStatDBHash = NULL;
+       localBackendStatusTable = NULL;
+       localNumBackends = 0;
+}
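+
+[Note] Taken together, pgstat_setup_memcxt() and pgstat_clear_snapshot() turn a backend's view of the collector stats into a per-transaction snapshot: everything read from the stats file lives in pgStatLocalContext, and one MemoryContextDelete() throws it all away at end of transaction. A hypothetical caller, assuming the pgstat_fetch_stat_dbentry() accessor that lives elsewhere in this file:

    #include "postgres.h"
    #include "pgstat.h"

    static void
    example_snapshot_use(Oid dbid)
    {
        PgStat_StatDBEntry *db;

        /* The first stats lookup in a transaction reads pgstat.stat via
         * backend_read_statsfile(); the result lands in pgStatLocalContext. */
        db = pgstat_fetch_stat_dbentry(dbid);
        if (db != NULL)
            elog(LOG, "blocks fetched so far: " INT64_FORMAT, db->n_blocks_fetched);

        /* Later lookups in the same transaction reuse that snapshot;
         * pgstat_clear_snapshot() at commit/abort discards it. */
    }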
+
+
 /* ----------
  * pgstat_recv_tabstat() -
  *
@@ -2278,51 +2873,50 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
                         * If it's a new table entry, initialize counters to the values we
                         * just got.
                         */
-                       tabentry->numscans = tabmsg[i].t_numscans;
-                       tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
-                       tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
-                       tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
-                       tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
-                       tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
-
-                       tabentry->n_live_tuples = tabmsg[i].t_tuples_inserted;
-                       tabentry->n_dead_tuples = tabmsg[i].t_tuples_updated +
-                               tabmsg[i].t_tuples_deleted;
+                       tabentry->numscans = tabmsg[i].t_counts.t_numscans;
+                       tabentry->tuples_returned = tabmsg[i].t_counts.t_tuples_returned;
+                       tabentry->tuples_fetched = tabmsg[i].t_counts.t_tuples_fetched;
+                       tabentry->tuples_inserted = tabmsg[i].t_counts.t_tuples_inserted;
+                       tabentry->tuples_updated = tabmsg[i].t_counts.t_tuples_updated;
+                       tabentry->tuples_deleted = tabmsg[i].t_counts.t_tuples_deleted;
+                       tabentry->n_live_tuples = tabmsg[i].t_counts.t_new_live_tuples;
+                       tabentry->n_dead_tuples = tabmsg[i].t_counts.t_new_dead_tuples;
+                       tabentry->blocks_fetched = tabmsg[i].t_counts.t_blocks_fetched;
+                       tabentry->blocks_hit = tabmsg[i].t_counts.t_blocks_hit;
+
                        tabentry->last_anl_tuples = 0;
                        tabentry->vacuum_timestamp = 0;
                        tabentry->autovac_vacuum_timestamp = 0;
                        tabentry->analyze_timestamp = 0;
                        tabentry->autovac_analyze_timestamp = 0;
-
-                       tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
-                       tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
                }
                else
                {
                        /*
                         * Otherwise add the values to the existing entry.
                         */
-                       tabentry->numscans += tabmsg[i].t_numscans;
-                       tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
-                       tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
-                       tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
-                       tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
-                       tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
-
-                       tabentry->n_live_tuples += tabmsg[i].t_tuples_inserted -
-                               tabmsg[i].t_tuples_deleted;
-                       tabentry->n_dead_tuples += tabmsg[i].t_tuples_updated +
-                               tabmsg[i].t_tuples_deleted;
-
-                       tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
-                       tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
+                       tabentry->numscans += tabmsg[i].t_counts.t_numscans;
+                       tabentry->tuples_returned += tabmsg[i].t_counts.t_tuples_returned;
+                       tabentry->tuples_fetched += tabmsg[i].t_counts.t_tuples_fetched;
+                       tabentry->tuples_inserted += tabmsg[i].t_counts.t_tuples_inserted;
+                       tabentry->tuples_updated += tabmsg[i].t_counts.t_tuples_updated;
+                       tabentry->tuples_deleted += tabmsg[i].t_counts.t_tuples_deleted;
+                       tabentry->n_live_tuples += tabmsg[i].t_counts.t_new_live_tuples;
+                       tabentry->n_dead_tuples += tabmsg[i].t_counts.t_new_dead_tuples;
+                       tabentry->blocks_fetched += tabmsg[i].t_counts.t_blocks_fetched;
+                       tabentry->blocks_hit += tabmsg[i].t_counts.t_blocks_hit;
                }
 
                /*
-                * And add the block IO to the database entry.
+                * Add per-table stats to the per-database entry, too.
                 */
-               dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
-               dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
+               dbentry->n_tuples_returned += tabmsg[i].t_counts.t_tuples_returned;
+               dbentry->n_tuples_fetched += tabmsg[i].t_counts.t_tuples_fetched;
+               dbentry->n_tuples_inserted += tabmsg[i].t_counts.t_tuples_inserted;
+               dbentry->n_tuples_updated += tabmsg[i].t_counts.t_tuples_updated;
+               dbentry->n_tuples_deleted += tabmsg[i].t_counts.t_tuples_deleted;
+               dbentry->n_blocks_fetched += tabmsg[i].t_counts.t_blocks_fetched;
+               dbentry->n_blocks_hit += tabmsg[i].t_counts.t_blocks_hit;
        }
 }
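
[Note] The substantive change in this hunk is that live/dead tuple counts are no longer inferred by the collector from raw insert/update/delete counts; the backend now reports t_new_live_tuples and t_new_dead_tuples deltas that already reflect whether the work committed or aborted. A hedged sketch of that accounting, not the patch's exact transaction-end code (which lives outside this excerpt); the PgStat_TableCounts type name is assumed from pgstat.h:

    #include "postgres.h"
    #include "pgstat.h"

    /* Fold one finished (sub)transaction's row counts into the deltas the
     * collector consumes above: committed updates/deletes leave dead row
     * versions behind, and aborted inserts/updates are dead on arrival. */
    static void
    fold_xact_tuple_counts(PgStat_TableCounts *tc,
                           PgStat_Counter inserted,
                           PgStat_Counter updated,
                           PgStat_Counter deleted,
                           bool committed)
    {
        if (committed)
        {
            tc->t_new_live_tuples += inserted - deleted;
            tc->t_new_dead_tuples += updated + deleted;
        }
        else
            tc->t_new_dead_tuples += inserted + updated;
    }
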
 
@@ -2491,10 +3085,10 @@ pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
        if (tabentry == NULL)
                return;
 
-       if (msg->m_autovacuum) 
+       if (msg->m_autovacuum)
                tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
-       else 
-               tabentry->vacuum_timestamp = msg->m_vacuumtime; 
+       else
+               tabentry->vacuum_timestamp = msg->m_vacuumtime;
        tabentry->n_live_tuples = msg->m_tuples;
        tabentry->n_dead_tuples = 0;
        if (msg->m_analyze)
@@ -2539,11 +3133,30 @@ pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
        if (tabentry == NULL)
                return;
 
-       if (msg->m_autovacuum) 
+       if (msg->m_autovacuum)
                tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
-       else 
+       else
                tabentry->analyze_timestamp = msg->m_analyzetime;
        tabentry->n_live_tuples = msg->m_live_tuples;
        tabentry->n_dead_tuples = msg->m_dead_tuples;
        tabentry->last_anl_tuples = msg->m_live_tuples + msg->m_dead_tuples;
 }
+
+
+/* ----------
+ * pgstat_recv_bgwriter() -
+ *
+ *     Process a BGWRITER message.
+ * ----------
+ */
+static void
+pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
+{
+       globalStats.timed_checkpoints += msg->m_timed_checkpoints;
+       globalStats.requested_checkpoints += msg->m_requested_checkpoints;
+       globalStats.buf_written_checkpoints += msg->m_buf_written_checkpoints;
+       globalStats.buf_written_lru += msg->m_buf_written_lru;
+       globalStats.buf_written_all += msg->m_buf_written_all;
+       globalStats.maxwritten_lru += msg->m_maxwritten_lru;
+       globalStats.maxwritten_all += msg->m_maxwritten_all;
+}
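
[Note] On the sending side, the background writer accumulates these counters in the process-global BgWriterStats struct declared earlier in this file and periodically flushes it as one message. The real sender is outside this excerpt; this fragment (meant to live in pgstat.c, since pgstat_setheader() and pgstat_send() are static helpers there) only illustrates the pattern:

    static void
    example_report_bgwriter_stats(void)
    {
        /* BgWriterStats is already a complete message body; stamp the header,
         * send it, and reset the counters for the next reporting interval. */
        pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
        pgstat_send(&BgWriterStats, sizeof(BgWriterStats));

        MemSet(&BgWriterStats, 0, sizeof(BgWriterStats));
    }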