* - Add a pgstat config column to pg_database, so this
* entire thing can be enabled/disabled on a per db basis.
*
- * Copyright (c) 2001-2006, PostgreSQL Global Development Group
+ * Copyright (c) 2001-2007, PostgreSQL Global Development Group
*
- * $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.134 2006/07/13 16:49:15 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.156 2007/05/27 03:50:39 tgl Exp $
* ----------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/transam.h"
+#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "libpq/ip.h"
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
#include "storage/pmsignal.h"
-#include "storage/procarray.h"
-#include "tcop/tcopprot.h"
-#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
-#include "utils/rel.h"
-#include "utils/syscache.h"
/* ----------
bool pgstat_collect_blocklevel = false;
bool pgstat_collect_querystring = false;
+/*
+ * BgWriter global statistics counters (unused in other processes).
+ * Stored directly in a stats message structure so it can be sent
+ * without needing to copy things around. We assume this inits to zeroes.
+ */
+PgStat_MsgBgWriter BgWriterStats;
+
/* ----------
* Local data
* ----------
static bool pgStatRunningInCollector = false;
/*
- * Place where backends store per-table info to be sent to the collector.
- * We store shared relations separately from non-shared ones, to be able to
- * send them in separate messages.
+ * Structures in which backends store per-table info that's waiting to be
+ * sent to the collector.
+ *
+ * NOTE: once allocated, TabStatusArray structures are never moved or deleted
+ * for the life of the backend. Also, we zero out the t_id fields of the
+ * contained PgStat_TableStatus structs whenever they are not actively in use.
+ * This allows relcache pgstat_info pointers to be treated as long-lived data,
+ * avoiding repeated searches in pgstat_initstats() when a relation is
+ * repeatedly opened during a transaction.
+ *
+ * The arrays form a singly linked list through tsa_next, headed by
+ * pgStatTabList. Each array holds up to TABSTAT_QUANTUM entries, of which
+ * the first tsa_used are currently in use.
*/
-typedef struct TabStatArray
+#define TABSTAT_QUANTUM 100 /* we alloc this many at a time */
+
+typedef struct TabStatusArray
{
- int tsa_alloc; /* num allocated */
- int tsa_used; /* num actually used */
- PgStat_MsgTabstat **tsa_messages; /* the array itself */
-} TabStatArray;
+ struct TabStatusArray *tsa_next; /* link to next array, if any */
+ int tsa_used; /* # entries currently used */
+ PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM]; /* per-table data */
+} TabStatusArray;
+
+static TabStatusArray *pgStatTabList = NULL;
-#define TABSTAT_QUANTUM 4 /* we alloc this many at a time */
+/*
+ * Tuple insertion/deletion counts for an open transaction can't be propagated
+ * into PgStat_TableStatus counters until we know if it is going to commit
+ * or abort. Hence, we keep these counts in per-subxact structs that live
+ * in TopTransactionContext. This data structure is designed on the assumption
+ * that subxacts won't usually modify very many tables.
+ *
+ * pgStatXactStack points at the most recently pushed entry; "prev" links
+ * lead back toward outer levels, and "first" heads the list of per-table
+ * records created at this entry's nest level.
+ */
+typedef struct PgStat_SubXactStatus
+{
+ int nest_level; /* subtransaction nest level */
+ struct PgStat_SubXactStatus *prev; /* higher-level subxact if any */
+ PgStat_TableXactStatus *first; /* head of list for this subxact */
+} PgStat_SubXactStatus;
-static TabStatArray RegularTabStat = {0, 0, NULL};
-static TabStatArray SharedTabStat = {0, 0, NULL};
+static PgStat_SubXactStatus *pgStatXactStack = NULL;
static int pgStatXactCommit = 0;
static int pgStatXactRollback = 0;
-static TransactionId pgStatDBHashXact = InvalidTransactionId;
+/*
+ * Record that's written to 2PC state file when pgstat state is persisted.
+ *
+ * AtPrepare_PgStat registers one such record for each table that has
+ * pending transactional counts; the twophase postcommit/postabort routines
+ * read it back to update the local tabstat entry for the table.
+ */
+typedef struct TwoPhasePgStatRecord
+{
+ PgStat_Counter tuples_inserted; /* tuples inserted in xact */
+ PgStat_Counter tuples_deleted; /* tuples deleted in xact */
+ Oid t_id; /* table's OID */
+ bool t_shared; /* is it a shared catalog? */
+} TwoPhasePgStatRecord;
+
+/*
+ * Info about current "snapshot" of stats file
+ */
+static MemoryContext pgStatLocalContext = NULL;
static HTAB *pgStatDBHash = NULL;
-static TransactionId pgStatLocalStatusXact = InvalidTransactionId;
static PgBackendStatus *localBackendStatusTable = NULL;
static int localNumBackends = 0;
-static volatile bool need_exit = false;
-static volatile bool need_statwrite = false;
+/*
+ * Cluster wide statistics, kept in the stats collector.
+ * Contains statistics that are not collected per database
+ * or per table.
+ */
+static PgStat_GlobalStats globalStats;
+
+static volatile bool need_exit = false;
+static volatile bool need_statwrite = false;
/* ----------
static void pgstat_beshutdown_hook(int code, Datum arg);
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
-static void pgstat_drop_database(Oid databaseid);
static void pgstat_write_statsfile(void);
-static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb);
+static HTAB *pgstat_read_statsfile(Oid onlydb);
static void backend_read_statsfile(void);
static void pgstat_read_current_status(void);
+static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
+static HTAB *pgstat_collect_oids(Oid catalogid);
+
+static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);
+
+static void pgstat_setup_memcxt(void);
+
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
static void pgstat_send(void *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
+static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
/* ------------------------------------------------------------
char test_byte;
int sel_res;
int tries = 0;
-
+
#define TESTBYTEVAL ((char) 199)
/*
* Force start of collector daemon if something to collect. Note that
- * pgstat_collect_querystring is now an independent facility that does
- * not require the collector daemon.
+ * pgstat_collect_querystring is now an independent facility that does not
+ * require the collector daemon.
*/
if (pgstat_collect_tuplelevel ||
pgstat_collect_blocklevel)
if (++tries > 1)
ereport(LOG,
- (errmsg("trying another address for the statistics collector")));
-
+ (errmsg("trying another address for the statistics collector")));
+
/*
* Create the socket.
*/
* rules prevent it).
*/
test_byte = TESTBYTEVAL;
+
+retry1:
if (send(pgStatSock, &test_byte, 1, 0) != 1)
{
+ if (errno == EINTR)
+ goto retry1; /* if interrupted, just retry */
ereport(LOG,
(errcode_for_socket_access(),
errmsg("could not send test message on socket for statistics collector: %m")));
test_byte++; /* just make sure variable is changed */
+retry2:
if (recv(pgStatSock, &test_byte, 1, 0) != 1)
{
+ if (errno == EINTR)
+ goto retry2; /* if interrupted, just retry */
ereport(LOG,
(errcode_for_socket_access(),
errmsg("could not receive test message on socket for statistics collector: %m")));
return postmaster_forkexec(ac, av);
}
-
#endif /* EXEC_BACKEND */
return 0;
}
+void allow_immediate_pgstat_restart(void)
+{
+ last_pgstat_start_time = 0; /* forget when the collector was last started;
+ * presumably this lets the next start attempt
+ * skip any restart delay -- confirm at the
+ * place last_pgstat_start_time is checked */
+}
/* ------------------------------------------------------------
* Public functions used by backends follow
/* ----------
* pgstat_report_tabstat() -
*
- * Called from tcop/postgres.c to send the so far collected
- * per table access statistics to the collector.
+ * Called from tcop/postgres.c to send the so far collected per-table
+ * access statistics to the collector. Note that this is called only
+ * when not within a transaction, so it is fair to use transaction stop
+ * time as an approximation of current time.
* ----------
*/
void
-pgstat_report_tabstat(void)
+pgstat_report_tabstat(bool force)
{
+ /* we assume this inits to all zeroes: */
+ static const PgStat_TableCounts all_zeroes;
+ static TimestampTz last_report = 0; /* xact stop time at our last send */
+
+ TimestampTz now;
+ PgStat_MsgTabstat regular_msg;
+ PgStat_MsgTabstat shared_msg;
+ TabStatusArray *tsa;
int i;
- if (pgStatSock < 0 ||
- (!pgstat_collect_tuplelevel &&
- !pgstat_collect_blocklevel))
- {
- /* Not reporting stats, so just flush whatever we have */
- RegularTabStat.tsa_used = 0;
- SharedTabStat.tsa_used = 0;
+ /* Don't expend a clock check if nothing to do */
+ if (pgStatTabList == NULL ||
+ pgStatTabList->tsa_used == 0)
return;
- }
/*
- * For each message buffer used during the last query set the header
- * fields and send it out.
+ * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
+ * msec since we last sent one, or the caller wants to force stats out.
*/
- for (i = 0; i < RegularTabStat.tsa_used; i++)
+ now = GetCurrentTransactionStopTimestamp();
+ if (!force &&
+ !TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
+ return;
+ last_report = now;
+
+ /*
+ * Scan through the TabStatusArray struct(s) to find tables that actually
+ * have counts, and build messages to send. We have to separate shared
+ * relations from regular ones because the databaseid field in the
+ * message header has to depend on that.
+ */
+ regular_msg.m_databaseid = MyDatabaseId;
+ shared_msg.m_databaseid = InvalidOid;
+ regular_msg.m_nentries = 0;
+ shared_msg.m_nentries = 0;
+
+ for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
{
- PgStat_MsgTabstat *tsmsg = RegularTabStat.tsa_messages[i];
- int n;
- int len;
+ for (i = 0; i < tsa->tsa_used; i++)
+ {
+ PgStat_TableStatus *entry = &tsa->tsa_entries[i];
+ PgStat_MsgTabstat *this_msg;
+ PgStat_TableEntry *this_ent;
+
+ /* Shouldn't have any pending transaction-dependent counts */
+ Assert(entry->trans == NULL);
+
+ /*
+ * Ignore entries that didn't accumulate any actual counts,
+ * such as indexes that were opened by the planner but not used.
+ */
+ if (memcmp(&entry->t_counts, &all_zeroes,
+ sizeof(PgStat_TableCounts)) == 0)
+ continue;
+ /*
+ * OK, insert data into the appropriate message, and send if full.
+ */
+ this_msg = entry->t_shared ? &shared_msg : &regular_msg;
+ this_ent = &this_msg->m_entry[this_msg->m_nentries];
+ this_ent->t_id = entry->t_id;
+ memcpy(&this_ent->t_counts, &entry->t_counts,
+ sizeof(PgStat_TableCounts));
+ if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
+ {
+ pgstat_send_tabstat(this_msg);
+ this_msg->m_nentries = 0;
+ }
+ }
+ /* zero out TableStatus structs after use */
+ MemSet(tsa->tsa_entries, 0,
+ tsa->tsa_used * sizeof(PgStat_TableStatus));
+ tsa->tsa_used = 0;
+ }
+
+ /*
+ * Send partial messages. If force is true, make sure that any pending
+ * xact commit/abort gets counted, even if no table stats to send.
+ */
+ if (regular_msg.m_nentries > 0 ||
+ (force && (pgStatXactCommit > 0 || pgStatXactRollback > 0)))
+ pgstat_send_tabstat(&regular_msg);
+ if (shared_msg.m_nentries > 0)
+ pgstat_send_tabstat(&shared_msg);
+}
+
+/*
+ * Subroutine for pgstat_report_tabstat: finish and send a tabstat message
+ *
+ * The caller must already have set tsmsg->m_databaseid and m_nentries.
+ * Here we fill in the xact counters and the header, compute the length of
+ * the part of the message actually in use, and hand it to pgstat_send().
+ * A message with a valid m_databaseid carries (and resets) the pending
+ * commit/rollback counts; one with an invalid m_databaseid reports zeroes.
+ */
+static void
+pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg)
+{
+ int n;
+ int len;
- n = tsmsg->m_nentries;
- len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
- n * sizeof(PgStat_TableEntry);
+ /* It's unlikely we'd get here with no socket, but maybe not impossible */
+ if (pgStatSock < 0)
+ return;
+ /*
+ * Report accumulated xact commit/rollback whenever we send a normal
+ * tabstat message
+ */
+ if (OidIsValid(tsmsg->m_databaseid))
+ {
tsmsg->m_xact_commit = pgStatXactCommit;
tsmsg->m_xact_rollback = pgStatXactRollback;
pgStatXactCommit = 0;
pgStatXactRollback = 0;
-
- pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
- tsmsg->m_databaseid = MyDatabaseId;
- pgstat_send(tsmsg, len);
}
- RegularTabStat.tsa_used = 0;
-
- /* Ditto, for shared relations */
- for (i = 0; i < SharedTabStat.tsa_used; i++)
+ else
{
- PgStat_MsgTabstat *tsmsg = SharedTabStat.tsa_messages[i];
- int n;
- int len;
-
- n = tsmsg->m_nentries;
- len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
- n * sizeof(PgStat_TableEntry);
-
- /* We don't report transaction commit/abort here */
tsmsg->m_xact_commit = 0;
tsmsg->m_xact_rollback = 0;
-
- pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
- tsmsg->m_databaseid = InvalidOid;
- pgstat_send(tsmsg, len);
}
- SharedTabStat.tsa_used = 0;
+
+ n = tsmsg->m_nentries;
+ len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
+ n * sizeof(PgStat_TableEntry);
+
+ pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
+ pgstat_send(tsmsg, len);
}
void
pgstat_vacuum_tabstat(void)
{
- List *oidlist;
- Relation rel;
- HeapScanDesc scan;
- HeapTuple tup;
+ HTAB *htab;
PgStat_MsgTabpurge msg;
HASH_SEQ_STATUS hstat;
PgStat_StatDBEntry *dbentry;
/*
* Read pg_database and make a list of OIDs of all existing databases
*/
- oidlist = NIL;
- rel = heap_open(DatabaseRelationId, AccessShareLock);
- scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
- while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
- {
- oidlist = lappend_oid(oidlist, HeapTupleGetOid(tup));
- }
- heap_endscan(scan);
- heap_close(rel, AccessShareLock);
+ htab = pgstat_collect_oids(DatabaseRelationId);
/*
* Search the database hash table for dead databases and tell the
{
Oid dbid = dbentry->databaseid;
- if (!list_member_oid(oidlist, dbid))
+ CHECK_FOR_INTERRUPTS();
+
+ if (hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
pgstat_drop_database(dbid);
}
/* Clean up */
- list_free(oidlist);
+ hash_destroy(htab);
/*
* Lookup our own database entry; if not found, nothing more to do.
/*
* Similarly to above, make a list of all known relations in this DB.
*/
- oidlist = NIL;
- rel = heap_open(RelationRelationId, AccessShareLock);
- scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
- while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
- {
- oidlist = lappend_oid(oidlist, HeapTupleGetOid(tup));
- }
- heap_endscan(scan);
- heap_close(rel, AccessShareLock);
+ htab = pgstat_collect_oids(RelationRelationId);
/*
* Initialize our messages table counter to zero
hash_seq_init(&hstat, dbentry->tables);
while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
{
- if (list_member_oid(oidlist, tabentry->tableid))
+ Oid tabid = tabentry->tableid;
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
continue;
/*
* Not there, so add this table's Oid to the message
*/
- msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
+ msg.m_tableid[msg.m_nentries++] = tabid;
/*
* If the message is full, send it out and reinitialize to empty
}
/* Clean up */
- list_free(oidlist);
+ hash_destroy(htab);
+}
+
+
+/* ----------
+ * pgstat_collect_oids() -
+ *
+ * Collect the OIDs of either all databases or all tables, according to
+ * the parameter, into a temporary hash table. Caller should hash_destroy
+ * the result when done with it.
+ *
+ * catalogid is the OID of the catalog to scan (the callers here pass
+ * DatabaseRelationId or RelationRelationId). The hash table is used
+ * purely as a set: key and entry are both just the Oid of one tuple
+ * returned by the scan.
+ * ----------
+ */
+static HTAB *
+pgstat_collect_oids(Oid catalogid)
+{
+ HTAB *htab;
+ HASHCTL hash_ctl;
+ Relation rel;
+ HeapScanDesc scan;
+ HeapTuple tup;
+
+ memset(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(Oid);
+ hash_ctl.entrysize = sizeof(Oid);
+ hash_ctl.hash = oid_hash;
+ htab = hash_create("Temporary table of OIDs",
+ PGSTAT_TAB_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_FUNCTION);
+
+ rel = heap_open(catalogid, AccessShareLock);
+ scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+ while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Oid thisoid = HeapTupleGetOid(tup);
+
+ CHECK_FOR_INTERRUPTS();
+
+ (void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
+ }
+ heap_endscan(scan);
+ heap_close(rel, AccessShareLock);
+
+ return htab;
}
* via future invocations of pgstat_vacuum_tabstat().)
* ----------
*/
-static void
+void
pgstat_drop_database(Oid databaseid)
{
PgStat_MsgDropdb msg;
msg.m_tableid[0] = relid;
msg.m_nentries = 1;
- len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
+ len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) +sizeof(Oid);
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
msg.m_databaseid = MyDatabaseId;
msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
msg.m_tableoid = tableoid;
msg.m_analyze = analyze;
- msg.m_autovacuum = IsAutoVacuumProcess(); /* is this autovacuum? */
+ msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */
msg.m_vacuumtime = GetCurrentTimestamp();
msg.m_tuples = tuples;
pgstat_send(&msg, sizeof(msg));
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
msg.m_tableoid = tableoid;
- msg.m_autovacuum = IsAutoVacuumProcess(); /* is this autovacuum? */
+ msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */
msg.m_analyzetime = GetCurrentTimestamp();
msg.m_live_tuples = livetuples;
msg.m_dead_tuples = deadtuples;
pgstat_send(&msg, sizeof(msg));
}
-/*
- * Enlarge a TabStatArray
- */
-static void
-more_tabstat_space(TabStatArray *tsarr)
-{
- PgStat_MsgTabstat *newMessages;
- PgStat_MsgTabstat **msgArray;
- int newAlloc;
- int i;
-
- AssertArg(PointerIsValid(tsarr));
-
- newAlloc = tsarr->tsa_alloc + TABSTAT_QUANTUM;
-
- /* Create (another) quantum of message buffers */
- newMessages = (PgStat_MsgTabstat *)
- MemoryContextAllocZero(TopMemoryContext,
- sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
-
- /* Create or enlarge the pointer array */
- if (tsarr->tsa_messages == NULL)
- msgArray = (PgStat_MsgTabstat **)
- MemoryContextAlloc(TopMemoryContext,
- sizeof(PgStat_MsgTabstat *) * newAlloc);
- else
- msgArray = (PgStat_MsgTabstat **)
- repalloc(tsarr->tsa_messages,
- sizeof(PgStat_MsgTabstat *) * newAlloc);
-
- for (i = 0; i < TABSTAT_QUANTUM; i++)
- msgArray[tsarr->tsa_alloc + i] = newMessages++;
- tsarr->tsa_messages = msgArray;
- tsarr->tsa_alloc = newAlloc;
-
- Assert(tsarr->tsa_used < tsarr->tsa_alloc);
-}
/* ----------
* pgstat_initstats() -
*
- * Called from various places usually dealing with initialization
- * of Relation or Scan structures. The data placed into these
- * structures from here tell where later to count for buffer reads,
- * scans and tuples fetched.
+ * Initialize a relcache entry to count access statistics.
+ * Called whenever a relation is opened.
+ *
+ * We assume that a relcache entry's pgstat_info field is zeroed by
+ * relcache.c when the relcache entry is made; thereafter it is long-lived
+ * data. We can avoid repeated searches of the TabStatus arrays when the
+ * same relation is touched repeatedly within a transaction.
+ *
+ * On return, rel->pgstat_info is NULL if the relation is not a plain
+ * table, index or toast table, or if there is no collector socket or
+ * both tuple-level and block-level collection are off. Otherwise it
+ * points at the PgStat_TableStatus entry for this relation's OID.
* ----------
*/
void
-pgstat_initstats(PgStat_Info *stats, Relation rel)
+pgstat_initstats(Relation rel)
{
Oid rel_id = rel->rd_id;
- PgStat_TableEntry *useent;
- TabStatArray *tsarr;
- PgStat_MsgTabstat *tsmsg;
- int mb;
- int i;
+ char relkind = rel->rd_rel->relkind;
- /*
- * Initialize data not to count at all.
- */
- stats->tabentry = NULL;
+ /* We only count stats for things that have storage */
+ if (!(relkind == RELKIND_RELATION ||
+ relkind == RELKIND_INDEX ||
+ relkind == RELKIND_TOASTVALUE))
+ {
+ rel->pgstat_info = NULL;
+ return;
+ }
if (pgStatSock < 0 ||
!(pgstat_collect_tuplelevel ||
pgstat_collect_blocklevel))
+ {
+ /* We're not counting at all */
+ rel->pgstat_info = NULL;
+ return;
+ }
+
+ /*
+ * If we already set up this relation in the current transaction,
+ * nothing to do.
+ */
+ if (rel->pgstat_info != NULL &&
+ rel->pgstat_info->t_id == rel_id)
return;
- tsarr = rel->rd_rel->relisshared ? &SharedTabStat : &RegularTabStat;
+ /* Else find or make the PgStat_TableStatus entry, and update link */
+ rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared);
+}
+
+/*
+ * get_tabstat_entry - find or create a PgStat_TableStatus entry for rel
+ *
+ * Existing entries are matched on rel_id alone; isshared is only stored
+ * into a newly claimed entry. While walking pgStatTabList, the first array
+ * that does not contain rel_id among its used entries but still has a free
+ * slot supplies the new entry. If every array is full (or none exists yet),
+ * a new zeroed TabStatusArray is allocated in TopMemoryContext and linked
+ * at the tail of the list.
+ */
+static PgStat_TableStatus *
+get_tabstat_entry(Oid rel_id, bool isshared)
+{
+ PgStat_TableStatus *entry;
+ TabStatusArray *tsa;
+ TabStatusArray *prev_tsa;
+ int i;
/*
- * Search the already-used message slots for this relation.
+ * Search the already-used tabstat slots for this relation.
*/
- for (mb = 0; mb < tsarr->tsa_used; mb++)
+ prev_tsa = NULL;
+ for (tsa = pgStatTabList; tsa != NULL; prev_tsa = tsa, tsa = tsa->tsa_next)
{
- tsmsg = tsarr->tsa_messages[mb];
-
- for (i = tsmsg->m_nentries; --i >= 0;)
+ for (i = 0; i < tsa->tsa_used; i++)
{
- if (tsmsg->m_entry[i].t_id == rel_id)
- {
- stats->tabentry = (void *) &(tsmsg->m_entry[i]);
- return;
- }
+ entry = &tsa->tsa_entries[i];
+ if (entry->t_id == rel_id)
+ return entry;
}
- if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
- continue;
-
- /*
- * Not found, but found a message buffer with an empty slot instead.
- * Fine, let's use this one.
- */
- i = tsmsg->m_nentries++;
- useent = &tsmsg->m_entry[i];
- MemSet(useent, 0, sizeof(PgStat_TableEntry));
- useent->t_id = rel_id;
- stats->tabentry = (void *) useent;
- return;
+ if (tsa->tsa_used < TABSTAT_QUANTUM)
+ {
+ /*
+ * It must not be present, but we found a free slot instead.
+ * Fine, let's use this one. We assume the entry was already
+ * zeroed, either at creation or after last use.
+ */
+ entry = &tsa->tsa_entries[tsa->tsa_used++];
+ entry->t_id = rel_id;
+ entry->t_shared = isshared;
+ return entry;
+ }
}
/*
- * If we ran out of message buffers, we just allocate more.
+ * We ran out of tabstat slots, so allocate more. Be sure they're zeroed.
+ */
+ tsa = (TabStatusArray *) MemoryContextAllocZero(TopMemoryContext,
+ sizeof(TabStatusArray));
+ if (prev_tsa)
+ prev_tsa->tsa_next = tsa;
+ else
+ pgStatTabList = tsa;
+
+ /*
+ * Use the first entry of the new TabStatusArray.
*/
- if (tsarr->tsa_used >= tsarr->tsa_alloc)
- more_tabstat_space(tsarr);
+ entry = &tsa->tsa_entries[tsa->tsa_used++];
+ entry->t_id = rel_id;
+ entry->t_shared = isshared;
+ return entry;
+}
+
+/*
+ * get_tabstat_stack_level - add a new (sub)transaction stack entry if needed
+ *
+ * Returns the top entry of pgStatXactStack if it is already at nest_level.
+ * Otherwise allocates a new entry for nest_level in TopTransactionContext,
+ * pushes it on top of the stack (its prev is the old top) with an empty
+ * per-table list, and returns that.
+ */
+static PgStat_SubXactStatus *
+get_tabstat_stack_level(int nest_level)
+{
+ PgStat_SubXactStatus *xact_state;
+
+ xact_state = pgStatXactStack;
+ if (xact_state == NULL || xact_state->nest_level != nest_level)
+ {
+ xact_state = (PgStat_SubXactStatus *)
+ MemoryContextAlloc(TopTransactionContext,
+ sizeof(PgStat_SubXactStatus));
+ xact_state->nest_level = nest_level;
+ xact_state->prev = pgStatXactStack;
+ xact_state->first = NULL;
+ pgStatXactStack = xact_state;
+ }
+ return xact_state;
+}
+
+/*
+ * add_tabstat_xact_level - add a new (sub)transaction state record
+ *
+ * Creates a zeroed PgStat_TableXactStatus for pgstat_info at nest_level.
+ * The record is linked two ways: via "upper" onto the table's own stack
+ * (it becomes pgstat_info->trans), and via "next" onto the list headed by
+ * the stack entry for nest_level.
+ */
+static void
+add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
+{
+ PgStat_SubXactStatus *xact_state;
+ PgStat_TableXactStatus *trans;
/*
- * Use the first entry of the next message buffer.
+ * If this is the first rel to be modified at the current nest level,
+ * we first have to push a transaction stack entry.
*/
- mb = tsarr->tsa_used++;
- tsmsg = tsarr->tsa_messages[mb];
- tsmsg->m_nentries = 1;
- useent = &tsmsg->m_entry[0];
- MemSet(useent, 0, sizeof(PgStat_TableEntry));
- useent->t_id = rel_id;
- stats->tabentry = (void *) useent;
+ xact_state = get_tabstat_stack_level(nest_level);
+
+ /* Now make a per-table stack entry */
+ trans = (PgStat_TableXactStatus *)
+ MemoryContextAllocZero(TopTransactionContext,
+ sizeof(PgStat_TableXactStatus));
+ trans->nest_level = nest_level;
+ trans->upper = pgstat_info->trans;
+ trans->parent = pgstat_info;
+ trans->next = xact_state->first;
+ xact_state->first = trans;
+ pgstat_info->trans = trans;
+}
+
+/*
+ * pgstat_count_heap_insert - count a tuple insertion
+ *
+ * Does nothing unless pgstat_collect_tuplelevel is on and rel has a
+ * pgstat_info entry. Otherwise bumps both the table's running
+ * t_tuples_inserted counter and the tuples_inserted count of the
+ * transactional record for the current nest level, creating that record
+ * if the table doesn't have one at this level yet.
+ */
+void
+pgstat_count_heap_insert(Relation rel)
+{
+ PgStat_TableStatus *pgstat_info = rel->pgstat_info;
+
+ if (pgstat_collect_tuplelevel && pgstat_info != NULL)
+ {
+ int nest_level = GetCurrentTransactionNestLevel();
+
+ /* t_tuples_inserted is nontransactional, so just advance it */
+ pgstat_info->t_counts.t_tuples_inserted++;
+
+ /* We have to log the transactional effect at the proper level */
+ if (pgstat_info->trans == NULL ||
+ pgstat_info->trans->nest_level != nest_level)
+ add_tabstat_xact_level(pgstat_info, nest_level);
+
+ pgstat_info->trans->tuples_inserted++;
+ }
+}
+
+/*
+ * pgstat_count_heap_update - count a tuple update
+ *
+ * Does nothing unless pgstat_collect_tuplelevel is on and rel has a
+ * pgstat_info entry. Otherwise bumps the table's running t_tuples_updated
+ * counter, and in the transactional record for the current nest level
+ * counts the update as one insertion plus one deletion.
+ */
+void
+pgstat_count_heap_update(Relation rel)
+{
+ PgStat_TableStatus *pgstat_info = rel->pgstat_info;
+
+ if (pgstat_collect_tuplelevel && pgstat_info != NULL)
+ {
+ int nest_level = GetCurrentTransactionNestLevel();
+
+ /* t_tuples_updated is nontransactional, so just advance it */
+ pgstat_info->t_counts.t_tuples_updated++;
+
+ /* We have to log the transactional effect at the proper level */
+ if (pgstat_info->trans == NULL ||
+ pgstat_info->trans->nest_level != nest_level)
+ add_tabstat_xact_level(pgstat_info, nest_level);
+
+ /* An UPDATE both inserts a new tuple and deletes the old */
+ pgstat_info->trans->tuples_inserted++;
+ pgstat_info->trans->tuples_deleted++;
+ }
+}
+
+/*
+ * pgstat_count_heap_delete - count a tuple deletion
+ *
+ * Does nothing unless pgstat_collect_tuplelevel is on and rel has a
+ * pgstat_info entry. Otherwise bumps both the table's running
+ * t_tuples_deleted counter and the tuples_deleted count of the
+ * transactional record for the current nest level, creating that record
+ * if the table doesn't have one at this level yet.
+ */
+void
+pgstat_count_heap_delete(Relation rel)
+{
+ PgStat_TableStatus *pgstat_info = rel->pgstat_info;
+
+ if (pgstat_collect_tuplelevel && pgstat_info != NULL)
+ {
+ int nest_level = GetCurrentTransactionNestLevel();
+
+ /* t_tuples_deleted is nontransactional, so just advance it */
+ pgstat_info->t_counts.t_tuples_deleted++;
+
+ /* We have to log the transactional effect at the proper level */
+ if (pgstat_info->trans == NULL ||
+ pgstat_info->trans->nest_level != nest_level)
+ add_tabstat_xact_level(pgstat_info, nest_level);
+
+ pgstat_info->trans->tuples_deleted++;
+ }
}
/* ----------
- * pgstat_count_xact_commit() -
+ * AtEOXact_PgStat
*
- * Called from access/transam/xact.c to count transaction commits.
+ * Called from access/transam/xact.c at top-level transaction commit/abort.
+ *
+ * isCommit says whether the transaction committed. Besides counting the
+ * commit or abort, this folds each table's pending transactional counts
+ * into its base counters, detaches the transactional records from the
+ * tabstat entries, empties pgStatXactStack, and discards any stats
+ * snapshot.
* ----------
*/
void
-pgstat_count_xact_commit(void)
+AtEOXact_PgStat(bool isCommit)
{
- if (!pgstat_collect_tuplelevel &&
- !pgstat_collect_blocklevel)
- return;
-
- pgStatXactCommit++;
+ PgStat_SubXactStatus *xact_state;
/*
- * If there was no relation activity yet, just make one existing message
- * buffer used without slots, causing the next report to tell new
- * xact-counters.
+ * Count transaction commit or abort. (We use counters, not just bools,
+ * in case the reporting message isn't sent right away.)
*/
- if (RegularTabStat.tsa_alloc == 0)
- more_tabstat_space(&RegularTabStat);
+ if (isCommit)
+ pgStatXactCommit++;
+ else
+ pgStatXactRollback++;
- if (RegularTabStat.tsa_used == 0)
+ /*
+ * Transfer transactional insert/delete counts into the base tabstat
+ * entries. We don't bother to free any of the transactional state,
+ * since it's all in TopTransactionContext and will go away anyway.
+ */
+ xact_state = pgStatXactStack;
+ if (xact_state != NULL)
{
- RegularTabStat.tsa_used++;
- RegularTabStat.tsa_messages[0]->m_nentries = 0;
+ PgStat_TableXactStatus *trans;
+
+ Assert(xact_state->nest_level == 1);
+ Assert(xact_state->prev == NULL);
+ for (trans = xact_state->first; trans != NULL; trans = trans->next)
+ {
+ PgStat_TableStatus *tabstat;
+
+ Assert(trans->nest_level == 1);
+ Assert(trans->upper == NULL);
+ tabstat = trans->parent;
+ Assert(tabstat->trans == trans);
+ if (isCommit)
+ {
+ tabstat->t_counts.t_new_live_tuples += trans->tuples_inserted;
+ tabstat->t_counts.t_new_dead_tuples += trans->tuples_deleted;
+ }
+ else
+ {
+ /* inserted tuples are dead, deleted tuples are unaffected */
+ tabstat->t_counts.t_new_dead_tuples += trans->tuples_inserted;
+ }
+ tabstat->trans = NULL;
+ }
}
-}
+ pgStatXactStack = NULL;
+ /* Make sure any stats snapshot is thrown away */
+ pgstat_clear_snapshot();
+}
/* ----------
- * pgstat_count_xact_rollback() -
+ * AtEOSubXact_PgStat
*
- * Called from access/transam/xact.c to count transaction rollbacks.
+ * Called from access/transam/xact.c at subtransaction commit/abort.
+ *
+ * isCommit says whether the subtransaction committed; nestDepth is its
+ * nest level. Nothing happens unless the top stack entry is at nestDepth
+ * or deeper. On commit each per-table record is merged into the table's
+ * record at nestDepth - 1 if one exists, else re-linked to that level.
+ * On abort the inserted tuples are added to the table's base dead-tuple
+ * count and the record is freed. The stack entry itself is then freed.
* ----------
*/
void
-pgstat_count_xact_rollback(void)
+AtEOSubXact_PgStat(bool isCommit, int nestDepth)
{
- if (!pgstat_collect_tuplelevel &&
- !pgstat_collect_blocklevel)
- return;
-
- pgStatXactRollback++;
+ PgStat_SubXactStatus *xact_state;
/*
- * If there was no relation activity yet, just make one existing message
- * buffer used without slots, causing the next report to tell new
- * xact-counters.
+ * Transfer transactional insert/delete counts into the next higher
+ * subtransaction state.
*/
- if (RegularTabStat.tsa_alloc == 0)
- more_tabstat_space(&RegularTabStat);
+ xact_state = pgStatXactStack;
+ if (xact_state != NULL &&
+ xact_state->nest_level >= nestDepth)
+ {
+ PgStat_TableXactStatus *trans;
+ PgStat_TableXactStatus *next_trans;
+
+ /* delink xact_state from stack immediately to simplify reuse case */
+ pgStatXactStack = xact_state->prev;
+
+ for (trans = xact_state->first; trans != NULL; trans = next_trans)
+ {
+ PgStat_TableStatus *tabstat;
+
+ next_trans = trans->next;
+ Assert(trans->nest_level == nestDepth);
+ tabstat = trans->parent;
+ Assert(tabstat->trans == trans);
+ if (isCommit)
+ {
+ if (trans->upper && trans->upper->nest_level == nestDepth - 1)
+ {
+ trans->upper->tuples_inserted += trans->tuples_inserted;
+ trans->upper->tuples_deleted += trans->tuples_deleted;
+ tabstat->trans = trans->upper;
+ pfree(trans);
+ }
+ else
+ {
+ /*
+ * When there isn't an immediate parent state, we can
+ * just reuse the record instead of going through a
+ * palloc/pfree pushup (this works since it's all in
+ * TopTransactionContext anyway). We have to re-link
+ * it into the parent level, though, and that might mean
+ * pushing a new entry into the pgStatXactStack.
+ */
+ PgStat_SubXactStatus *upper_xact_state;
+
+ upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
+ trans->next = upper_xact_state->first;
+ upper_xact_state->first = trans;
+ trans->nest_level = nestDepth - 1;
+ }
+ }
+ else
+ {
+ /*
+ * On abort, inserted tuples are dead (and can be bounced out
+ * to the top-level tabstat), deleted tuples are unaffected
+ */
+ tabstat->t_counts.t_new_dead_tuples += trans->tuples_inserted;
+ tabstat->trans = trans->upper;
+ pfree(trans);
+ }
+ }
+ pfree(xact_state);
+ }
+}
+
+
+/*
+ * AtPrepare_PgStat
+ * Save the transactional stats state at 2PC transaction prepare.
+ *
+ * In this phase we just generate 2PC records for all the pending
+ * transaction-dependent stats work.
+ *
+ * One TwoPhasePgStatRecord is registered (under TWOPHASE_RM_PGSTAT_ID) for
+ * each per-table record on the top-level stack entry. Nothing in the local
+ * stats state is modified here.
+ */
+void
+AtPrepare_PgStat(void)
+{
+ PgStat_SubXactStatus *xact_state;
+
+ xact_state = pgStatXactStack;
+ if (xact_state != NULL)
+ {
+ PgStat_TableXactStatus *trans;
+
+ Assert(xact_state->nest_level == 1);
+ Assert(xact_state->prev == NULL);
+ for (trans = xact_state->first; trans != NULL; trans = trans->next)
+ {
+ PgStat_TableStatus *tabstat;
+ TwoPhasePgStatRecord record;
+
+ Assert(trans->nest_level == 1);
+ Assert(trans->upper == NULL);
+ tabstat = trans->parent;
+ Assert(tabstat->trans == trans);
+
+ record.tuples_inserted = trans->tuples_inserted;
+ record.tuples_deleted = trans->tuples_deleted;
+ record.t_id = tabstat->t_id;
+ record.t_shared = tabstat->t_shared;
+
+ RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
+ &record, sizeof(TwoPhasePgStatRecord));
+ }
+ }
+}
+
+/*
+ * PostPrepare_PgStat
+ * Clean up after successful PREPARE.
+ *
+ * All we need do here is unlink the transaction stats state from the
+ * nontransactional state. The nontransactional action counts will be
+ * reported to the stats collector immediately, while the effects on live
+ * and dead tuple counts are preserved in the 2PC state file.
+ *
+ * Note: AtEOXact_PgStat is not called during PREPARE.
+ *
+ * Unlike AtEOXact_PgStat, this does not touch the commit/rollback counters
+ * or any t_counts field; it only clears the trans links, empties
+ * pgStatXactStack, and discards any stats snapshot.
+ */
+void
+PostPrepare_PgStat(void)
+{
+ PgStat_SubXactStatus *xact_state;
- if (RegularTabStat.tsa_used == 0)
+ /*
+ * We don't bother to free any of the transactional state,
+ * since it's all in TopTransactionContext and will go away anyway.
+ */
+ xact_state = pgStatXactStack;
+ if (xact_state != NULL)
{
- RegularTabStat.tsa_used++;
- RegularTabStat.tsa_messages[0]->m_nentries = 0;
+ PgStat_TableXactStatus *trans;
+
+ for (trans = xact_state->first; trans != NULL; trans = trans->next)
+ {
+ PgStat_TableStatus *tabstat;
+
+ tabstat = trans->parent;
+ tabstat->trans = NULL;
+ }
}
+ pgStatXactStack = NULL;
+
+ /* Make sure any stats snapshot is thrown away */
+ pgstat_clear_snapshot();
+}
+
+/*
+ * 2PC processing routine for COMMIT PREPARED case.
+ *
+ * Load the saved counts into our local pgstats state.
+ *
+ * recdata is interpreted as a TwoPhasePgStatRecord; xid, info and len are
+ * not used. Inserted tuples are added to the table's new-live count and
+ * deleted tuples to its new-dead count, as for an ordinary commit.
+ */
+void
+pgstat_twophase_postcommit(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+{
+ TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
+ PgStat_TableStatus *pgstat_info;
+
+ /* Find or create a tabstat entry for the rel */
+ pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
+
+ pgstat_info->t_counts.t_new_live_tuples += rec->tuples_inserted;
+ pgstat_info->t_counts.t_new_dead_tuples += rec->tuples_deleted;
+}
+
+/*
+ * 2PC processing routine for ROLLBACK PREPARED case.
+ *
+ * Load the saved counts into our local pgstats state, but treat them
+ * as aborted.
+ *
+ * recdata is interpreted as a TwoPhasePgStatRecord; xid, info and len are
+ * not used. Only the inserted-tuple count has any effect: it is added to
+ * the table's new-dead count.
+ */
+void
+pgstat_twophase_postabort(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+{
+ TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
+ PgStat_TableStatus *pgstat_info;
+
+ /* Find or create a tabstat entry for the rel */
+ pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
+
+ /* inserted tuples are dead, deleted tuples are no-ops */
+ pgstat_info->t_counts.t_new_dead_tuples += rec->tuples_inserted;
}
return localNumBackends;
}
+/*
+ * ---------
+ * pgstat_fetch_global() -
+ *
+ * Support function for the SQL-callable pgstat* functions. Returns
+ * a pointer to the global statistics struct.
+ *
+ * The pointer is to the file-static globalStats, so callers must not
+ * free it. backend_read_statsfile() is called first; presumably that
+ * is what fills globalStats from the stats file -- TODO confirm.
+ * ---------
+ */
+PgStat_GlobalStats *
+pgstat_fetch_global(void)
+{
+ backend_read_statsfile();
+
+ return &globalStats;
+}
+
/* ------------------------------------------------------------
* Functions for management of the shared-memory PgBackendStatus array
MyBEEntry = &BackendStatusArray[MyBackendId - 1];
/*
- * To minimize the time spent modifying the entry, fetch all the
- * needed data first.
+ * To minimize the time spent modifying the entry, fetch all the needed
+ * data first.
*
* If we have a MyProcPort, use its session start time (for consistency,
* and to save a kernel call).
/*
* Initialize my status entry, following the protocol of bumping
- * st_changecount before and after; and make sure it's even afterwards.
- * We use a volatile pointer here to ensure the compiler doesn't try to
- * get cute.
+ * st_changecount before and after; and make sure it's even afterwards. We
+ * use a volatile pointer here to ensure the compiler doesn't try to get
+ * cute.
*/
beentry = MyBEEntry;
- do {
+ do
+ {
beentry->st_changecount++;
} while ((beentry->st_changecount & 1) == 0);
beentry->st_procpid = MyProcPid;
beentry->st_proc_start_timestamp = proc_start_timestamp;
beentry->st_activity_start_timestamp = 0;
+ beentry->st_txn_start_timestamp = 0;
beentry->st_databaseid = MyDatabaseId;
beentry->st_userid = userid;
beentry->st_clientaddr = clientaddr;
+ beentry->st_waiting = false;
beentry->st_activity[0] = '\0';
/* Also make sure the last byte in the string area is always 0 */
beentry->st_activity[PGBE_ACTIVITY_SIZE - 1] = '\0';
static void
pgstat_beshutdown_hook(int code, Datum arg)
{
- volatile PgBackendStatus *beentry;
+ volatile PgBackendStatus *beentry = MyBEEntry;
- pgstat_report_tabstat();
+ pgstat_report_tabstat(true);
/*
- * Clear my status entry, following the protocol of bumping
- * st_changecount before and after. We use a volatile pointer here
- * to ensure the compiler doesn't try to get cute.
+ * Clear my status entry, following the protocol of bumping st_changecount
+ * before and after. We use a volatile pointer here to ensure the
+ * compiler doesn't try to get cute.
*/
- beentry = MyBEEntry;
beentry->st_changecount++;
beentry->st_procpid = 0; /* mark invalid */
void
pgstat_report_activity(const char *cmd_str)
{
- volatile PgBackendStatus *beentry;
+ volatile PgBackendStatus *beentry = MyBEEntry;
TimestampTz start_timestamp;
int len;
- if (!pgstat_collect_querystring)
+ if (!pgstat_collect_querystring || !beentry)
return;
/*
- * To minimize the time spent modifying the entry, fetch all the
- * needed data first.
+ * To minimize the time spent modifying the entry, fetch all the needed
+ * data first.
*/
start_timestamp = GetCurrentStatementStartTimestamp();
/*
* Update my status entry, following the protocol of bumping
- * st_changecount before and after. We use a volatile pointer here
- * to ensure the compiler doesn't try to get cute.
+ * st_changecount before and after. We use a volatile pointer here to
+ * ensure the compiler doesn't try to get cute.
*/
- beentry = MyBEEntry;
beentry->st_changecount++;
beentry->st_activity_start_timestamp = start_timestamp;
Assert((beentry->st_changecount & 1) == 0);
}
+/*
+ * Set the current transaction start timestamp to the specified
+ * value. If there is no current active transaction, this is signified
+ * by 0.
+ *
+ * Does nothing when pgstat_collect_querystring is off or when MyBEEntry
+ * is NULL.
+ */
+void
+pgstat_report_txn_timestamp(TimestampTz tstamp)
+{
+ volatile PgBackendStatus *beentry = MyBEEntry;
+
+ if (!pgstat_collect_querystring || !beentry)
+ return;
+
+ /*
+ * Update my status entry, following the protocol of bumping
+ * st_changecount before and after. We use a volatile pointer
+ * here to ensure the compiler doesn't try to get cute.
+ *
+ * The count is odd while the field is being written and even again
+ * afterwards; pgstat_read_current_status() retries its copy when it
+ * sees the count change or finds it odd.
+ */
+ beentry->st_changecount++;
+ beentry->st_txn_start_timestamp = tstamp;
+ beentry->st_changecount++;
+ Assert((beentry->st_changecount & 1) == 0);
+}
+
+/* ----------
+ * pgstat_report_waiting() -
+ *
+ * Called from lock manager to report beginning or end of a lock wait.
+ *
+ * NB: this *must* be able to survive being called before MyBEEntry has been
+ * initialized.
+ *
+ * It does so by returning immediately when MyBEEntry is NULL. It also
+ * does nothing when pgstat_collect_querystring is off.
+ * ----------
+ */
+void
+pgstat_report_waiting(bool waiting)
+{
+ volatile PgBackendStatus *beentry = MyBEEntry;
+
+ if (!pgstat_collect_querystring || !beentry)
+ return;
+
+ /*
+ * Since this is a single-byte field in a struct that only this process
+ * may modify, there seems no need to bother with the st_changecount
+ * protocol. The update must appear atomic in any case.
+ */
+ beentry->st_waiting = waiting;
+}
+
/* ----------
* pgstat_read_current_status() -
static void
pgstat_read_current_status(void)
{
- TransactionId topXid = GetTopTransactionId();
volatile PgBackendStatus *beentry;
+ PgBackendStatus *localtable;
PgBackendStatus *localentry;
int i;
Assert(!pgStatRunningInCollector);
- if (TransactionIdEquals(pgStatLocalStatusXact, topXid))
+ if (localBackendStatusTable)
return; /* already done */
- localBackendStatusTable = (PgBackendStatus *)
- MemoryContextAlloc(TopTransactionContext,
+ pgstat_setup_memcxt();
+
+ localtable = (PgBackendStatus *)
+ MemoryContextAlloc(pgStatLocalContext,
sizeof(PgBackendStatus) * MaxBackends);
localNumBackends = 0;
beentry = BackendStatusArray;
- localentry = localBackendStatusTable;
+ localentry = localtable;
for (i = 1; i <= MaxBackends; i++)
{
/*
- * Follow the protocol of retrying if st_changecount changes while
- * we copy the entry, or if it's odd. (The check for odd is needed
- * to cover the case where we are able to completely copy the entry
- * while the source backend is between increment steps.) We use a
- * volatile pointer here to ensure the compiler doesn't try to get
- * cute.
+ * Follow the protocol of retrying if st_changecount changes while we
+ * copy the entry, or if it's odd. (The check for odd is needed to
+ * cover the case where we are able to completely copy the entry while
+ * the source backend is between increment steps.) We use a volatile
+ * pointer here to ensure the compiler doesn't try to get cute.
*/
for (;;)
{
- int save_changecount = beentry->st_changecount;
+ int save_changecount = beentry->st_changecount;
/*
- * XXX if PGBE_ACTIVITY_SIZE is really large, it might be best
- * to use strcpy not memcpy for copying the activity string?
+ * XXX if PGBE_ACTIVITY_SIZE is really large, it might be best to
+ * use strcpy not memcpy for copying the activity string?
*/
memcpy(localentry, (char *) beentry, sizeof(PgBackendStatus));
}
}
- pgStatLocalStatusXact = topXid;
+ /* Set the pointer only after completion of a valid table */
+ localBackendStatusTable = localtable;
}
static void
pgstat_send(void *msg, int len)
{
+ int rc;
+
if (pgStatSock < 0)
return;
((PgStat_MsgHdr *) msg)->m_size = len;
+ /* We'll retry after EINTR, but ignore all other failures */
+ do
+ {
+ rc = send(pgStatSock, msg, len, 0);
+ } while (rc < 0 && errno == EINTR);
+
#ifdef USE_ASSERT_CHECKING
- if (send(pgStatSock, msg, len, 0) < 0)
+ /* In debug builds, log send failures ... */
+ if (rc < 0)
elog(LOG, "could not send to statistics collector: %m");
-#else
- send(pgStatSock, msg, len, 0);
- /* We deliberately ignore any error from send() */
#endif
}
+/* ----------
+ * pgstat_send_bgwriter() -
+ *
+ * Send bgwriter statistics to the collector
+ *
+ * BgWriterStats is itself the message buffer: it is handed to
+ * pgstat_send() as-is and then zeroed, so the counters start over after
+ * every message that is actually sent.
+ * ----------
+ */
+void
+pgstat_send_bgwriter(void)
+{
+ /* We assume this initializes to zeroes */
+ static const PgStat_MsgBgWriter all_zeroes;
+
+ /*
+ * This function can be called even if nothing at all has happened.
+ * In this case, avoid sending a completely empty message to
+ * the stats collector.
+ *
+ * The comparison covers the whole struct, m_hdr included. That works
+ * because the struct is cleared in full after every send (see the
+ * MemSet below), so the header is zero again by the next call.
+ */
+ if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
+ return;
+
+ /*
+ * Prepare and send the message
+ */
+ pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
+ pgstat_send(&BgWriterStats, sizeof(BgWriterStats));
+
+ /*
+ * Clear out the statistics buffer, so it can be re-used.
+ */
+ MemSet(&BgWriterStats, 0, sizeof(BgWriterStats));
+}
+
/* ----------
* PgstatCollectorMain() -
*
- * Start up the statistics collector process. This is the body of the
+ * Start up the statistics collector process. This is the body of the
* postmaster child process.
*
* The argc/argv parameters are valid only in EXEC_BACKEND case.
bool need_timer = false;
int len;
PgStat_Msg msg;
+
+#ifndef WIN32
#ifdef HAVE_POLL
struct pollfd input_fd;
#else
struct timeval sel_timeout;
fd_set rfds;
+#endif
#endif
IsUnderPostmaster = true; /* we are a postmaster subprocess now */
MyProcPid = getpid(); /* reset MyProcPid */
+ /*
+ * If possible, make this process a group leader, so that the postmaster
+ * can signal any child processes too. (pgstat probably never has
+ * any child processes, but for consistency we make all postmaster
+ * child processes do this.)
+ */
+#ifdef HAVE_SETSID
+ if (setsid() < 0)
+ elog(FATAL, "setsid() failed: %m");
+#endif
+
/*
* Ignore all signals usually bound to some action in the postmaster,
* except SIGQUIT and SIGALRM.
/* Preset the delay between status file writes */
MemSet(&write_timeout, 0, sizeof(struct itimerval));
write_timeout.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000;
- write_timeout.it_value.tv_usec = PGSTAT_STAT_INTERVAL % 1000;
+ write_timeout.it_value.tv_usec = (PGSTAT_STAT_INTERVAL % 1000) * 1000;
/*
* Read in an existing statistics stats file or initialize the stats to
* zero.
*/
pgStatRunningInCollector = true;
- pgstat_read_statsfile(&pgStatDBHash, InvalidOid);
+ pgStatDBHash = pgstat_read_statsfile(InvalidOid);
/*
- * Setup the descriptor set for select(2). Since only one bit in the
- * set ever changes, we need not repeat FD_ZERO each time.
+ * Setup the descriptor set for select(2). Since only one bit in the set
+ * ever changes, we need not repeat FD_ZERO each time.
*/
-#ifndef HAVE_POLL
+#if !defined(HAVE_POLL) && !defined(WIN32)
FD_ZERO(&rfds);
#endif
* Loop to process messages until we get SIGQUIT or detect ungraceful
* death of our parent postmaster.
*
- * For performance reasons, we don't want to do a PostmasterIsAlive()
- * test after every message; instead, do it at statwrite time and if
+ * For performance reasons, we don't want to do a PostmasterIsAlive() test
+ * after every message; instead, do it at statwrite time and if
* select()/poll() is interrupted by timeout.
*/
for (;;)
{
- int got_data;
+ int got_data;
/*
* Quit if we get SIGQUIT from the postmaster.
break;
/*
- * If time to write the stats file, do so. Note that the alarm
+ * If time to write the stats file, do so. Note that the alarm
* interrupt isn't re-enabled immediately, but only after we next
* receive a stats message; so no cycles are wasted when there is
* nothing going on.
* Wait for a message to arrive; but not for more than
* PGSTAT_SELECT_TIMEOUT seconds. (This determines how quickly we will
* shut down after an ungraceful postmaster termination; so it needn't
- * be very fast. However, on some systems SIGQUIT won't interrupt
- * the poll/select call, so this also limits speed of response to
- * SIGQUIT, which is more important.)
+ * be very fast. However, on some systems SIGQUIT won't interrupt the
+ * poll/select call, so this also limits speed of response to SIGQUIT,
+ * which is more important.)
*
- * We use poll(2) if available, otherwise select(2)
+ * We use poll(2) if available, otherwise select(2).
+ * Win32 has its own implementation.
*/
+#ifndef WIN32
#ifdef HAVE_POLL
input_fd.fd = pgStatSock;
input_fd.events = POLLIN | POLLERR;
}
got_data = (input_fd.revents != 0);
-
#else /* !HAVE_POLL */
FD_SET(pgStatSock, &rfds);
}
got_data = FD_ISSET(pgStatSock, &rfds);
-
#endif /* HAVE_POLL */
+#else /* WIN32 */
+ got_data = pgwin32_waitforsinglesocket(pgStatSock, FD_READ,
+ PGSTAT_SELECT_TIMEOUT*1000);
+#endif
/*
* If there is a message on the socket, read it and check for
len = recv(pgStatSock, (char *) &msg,
sizeof(PgStat_Msg), 0);
if (len < 0)
+ {
+ if (errno == EINTR)
+ continue;
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("could not read statistics message: %m")));
+ }
/*
* We ignore messages that are smaller than our common header
pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, len);
break;
+ case PGSTAT_MTYPE_BGWRITER:
+ pgstat_recv_bgwriter((PgStat_MsgBgWriter *) &msg, len);
+ break;
+
default:
break;
}
{
if (setitimer(ITIMER_REAL, &write_timeout, NULL))
ereport(ERROR,
- (errmsg("could not set statistics collector timer: %m")));
+ (errmsg("could not set statistics collector timer: %m")));
need_timer = false;
}
}
else
{
/*
- * We can only get here if the select/poll timeout elapsed.
- * Check for postmaster death.
+ * We can only get here if the select/poll timeout elapsed. Check
+ * for postmaster death.
*/
if (!PostmasterIsAlive(true))
break;
}
- } /* end of message-processing loop */
+ } /* end of message-processing loop */
/*
* Save the final stats to reuse at next startup.
result->n_xact_rollback = 0;
result->n_blocks_fetched = 0;
result->n_blocks_hit = 0;
+ result->n_tuples_returned = 0;
+ result->n_tuples_fetched = 0;
+ result->n_tuples_inserted = 0;
+ result->n_tuples_updated = 0;
+ result->n_tuples_deleted = 0;
result->last_autovac_time = 0;
memset(&hash_ctl, 0, sizeof(hash_ctl));
format_id = PGSTAT_FILE_FORMAT_ID;
fwrite(&format_id, sizeof(format_id), 1, fpout);
+ /*
+ * Write global stats struct
+ */
+ fwrite(&globalStats, sizeof(globalStats), 1, fpout);
+
/*
* Walk through the database table.
*/
while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
{
/*
- * Write out the DB entry including the number of live backends.
- * We don't write the tables pointer since it's of no use to any
- * other process.
+ * Write out the DB entry including the number of live backends. We
+ * don't write the tables pointer since it's of no use to any other
+ * process.
*/
fputc('D', fpout);
fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
{
ereport(LOG,
(errcode_for_file_access(),
- errmsg("could not write temporary statistics file \"%s\": %m",
- PGSTAT_STAT_TMPFILE)));
+ errmsg("could not write temporary statistics file \"%s\": %m",
+ PGSTAT_STAT_TMPFILE)));
fclose(fpout);
unlink(PGSTAT_STAT_TMPFILE);
}
* databases' hash table (whose entries point to the tables' hash tables).
* ----------
*/
-static void
-pgstat_read_statsfile(HTAB **dbhash, Oid onlydb)
+static HTAB *
+pgstat_read_statsfile(Oid onlydb)
{
PgStat_StatDBEntry *dbentry;
PgStat_StatDBEntry dbbuf;
PgStat_StatTabEntry *tabentry;
PgStat_StatTabEntry tabbuf;
HASHCTL hash_ctl;
+ HTAB *dbhash;
HTAB *tabhash = NULL;
FILE *fpin;
int32 format_id;
bool found;
- MemoryContext use_mcxt;
- int mcxt_flags;
/*
- * If running in the collector or the autovacuum process, we use the
- * DynaHashCxt memory context. If running in a backend, we use the
- * TopTransactionContext instead, so the caller must only know the last
- * XactId when this call happened to know if his tables are still valid or
- * already gone!
+ * The tables will live in pgStatLocalContext.
*/
- if (pgStatRunningInCollector || IsAutoVacuumProcess())
- {
- use_mcxt = NULL;
- mcxt_flags = 0;
- }
- else
- {
- use_mcxt = TopTransactionContext;
- mcxt_flags = HASH_CONTEXT;
- }
+ pgstat_setup_memcxt();
/*
* Create the DB hashtable
hash_ctl.keysize = sizeof(Oid);
hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
hash_ctl.hash = oid_hash;
- hash_ctl.hcxt = use_mcxt;
- *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
- HASH_ELEM | HASH_FUNCTION | mcxt_flags);
+ hash_ctl.hcxt = pgStatLocalContext;
+ dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
+ HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+ /*
+ * Clear out global statistics so they start from zero in case we can't
+ * load an existing statsfile.
+ */
+ memset(&globalStats, 0, sizeof(globalStats));
/*
* Try to open the status file. If it doesn't exist, the backends simply
* with empty counters.
*/
if ((fpin = AllocateFile(PGSTAT_STAT_FILENAME, PG_BINARY_R)) == NULL)
- return;
+ return dbhash;
/*
* Verify it's of the expected format.
goto done;
}
+ /*
+ * Read global stats struct
+ */
+ if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted pgstat.stat file")));
+ goto done;
+ }
+
/*
* We found an existing collector stats file. Read it and put all the
* hashtable entries into place.
/*
* Add to the DB hash
*/
- dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
+ dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
(void *) &dbbuf.databaseid,
HASH_ENTER,
&found);
hash_ctl.keysize = sizeof(Oid);
hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
hash_ctl.hash = oid_hash;
- hash_ctl.hcxt = use_mcxt;
+ hash_ctl.hcxt = pgStatLocalContext;
dbentry->tables = hash_create("Per-database table",
PGSTAT_TAB_HASH_SIZE,
&hash_ctl,
- HASH_ELEM | HASH_FUNCTION | mcxt_flags);
+ HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
/*
* Arrange that following 'T's add entries to this database's
done:
FreeFile(fpin);
+
+ return dbhash;
}
/*
- * If not done for this transaction, read the statistics collector
- * stats file into some hash tables.
- *
- * Because we store the tables in TopTransactionContext, the result
- * is good for the entire current main transaction.
- *
- * Inside the autovacuum process, the statfile is assumed to be valid
- * "forever", that is one iteration, within one database. This means
- * we only consider the statistics as they were when the autovacuum
- * iteration started.
+ * If not already done, read the statistics collector stats file into
+ * some hash tables. The results will be kept until pgstat_clear_snapshot()
+ * is called (typically, at end of transaction).
*/
static void
backend_read_statsfile(void)
{
- if (IsAutoVacuumProcess())
- {
- /* already read it? */
- if (pgStatDBHash)
- return;
- Assert(!pgStatRunningInCollector);
- pgstat_read_statsfile(&pgStatDBHash, InvalidOid);
- }
+ /* already read it? */
+ if (pgStatDBHash)
+ return;
+ Assert(!pgStatRunningInCollector);
+
+ /* Autovacuum launcher wants stats about all databases */
+ if (IsAutoVacuumLauncherProcess())
+ pgStatDBHash = pgstat_read_statsfile(InvalidOid);
else
- {
- TransactionId topXid = GetTopTransactionId();
+ pgStatDBHash = pgstat_read_statsfile(MyDatabaseId);
+}
- if (!TransactionIdEquals(pgStatDBHashXact, topXid))
- {
- Assert(!pgStatRunningInCollector);
- pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId);
- pgStatDBHashXact = topXid;
- }
- }
+
+/* ----------
+ * pgstat_setup_memcxt() -
+ *
+ * Create pgStatLocalContext, if not already done.
+ *
+ * The context is created as a child of TopMemoryContext under the name
+ * "Statistics snapshot", using the ALLOCSET_SMALL_* sizing parameters.
+ * Calling this again while the context exists leaves it untouched.
+ * ----------
+ */
+static void
+pgstat_setup_memcxt(void)
+{
+ if (!pgStatLocalContext)
+ pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
+ "Statistics snapshot",
+ ALLOCSET_SMALL_MINSIZE,
+ ALLOCSET_SMALL_INITSIZE,
+ ALLOCSET_SMALL_MAXSIZE);
+}
+
+
+/* ----------
+ * pgstat_clear_snapshot() -
+ *
+ * Discard any data collected in the current transaction. Any subsequent
+ * request will cause new snapshots to be read.
+ *
+ * This is also invoked during transaction commit or abort to discard
+ * the no-longer-wanted snapshot.
+ *
+ * Deleting pgStatLocalContext releases everything allocated in it, which
+ * includes the hash tables built by pgstat_read_statsfile() and the array
+ * built by pgstat_read_current_status(). Resetting pgStatDBHash and
+ * localBackendStatusTable to NULL is what makes the next reader rebuild
+ * its snapshot instead of returning early.
+ * ----------
+ */
+void
+pgstat_clear_snapshot(void)
+{
+ /* Release memory, if any was allocated */
+ if (pgStatLocalContext)
+ MemoryContextDelete(pgStatLocalContext);
+
+ /* Reset variables */
+ pgStatLocalContext = NULL;
+ pgStatDBHash = NULL;
+ localBackendStatusTable = NULL;
+ localNumBackends = 0;
}
+
/* ----------
* pgstat_recv_tabstat() -
*
* If it's a new table entry, initialize counters to the values we
* just got.
*/
- tabentry->numscans = tabmsg[i].t_numscans;
- tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
- tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
- tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
- tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
- tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
-
- tabentry->n_live_tuples = tabmsg[i].t_tuples_inserted;
- tabentry->n_dead_tuples = tabmsg[i].t_tuples_updated +
- tabmsg[i].t_tuples_deleted;
+ tabentry->numscans = tabmsg[i].t_counts.t_numscans;
+ tabentry->tuples_returned = tabmsg[i].t_counts.t_tuples_returned;
+ tabentry->tuples_fetched = tabmsg[i].t_counts.t_tuples_fetched;
+ tabentry->tuples_inserted = tabmsg[i].t_counts.t_tuples_inserted;
+ tabentry->tuples_updated = tabmsg[i].t_counts.t_tuples_updated;
+ tabentry->tuples_deleted = tabmsg[i].t_counts.t_tuples_deleted;
+ tabentry->n_live_tuples = tabmsg[i].t_counts.t_new_live_tuples;
+ tabentry->n_dead_tuples = tabmsg[i].t_counts.t_new_dead_tuples;
+ tabentry->blocks_fetched = tabmsg[i].t_counts.t_blocks_fetched;
+ tabentry->blocks_hit = tabmsg[i].t_counts.t_blocks_hit;
+
tabentry->last_anl_tuples = 0;
tabentry->vacuum_timestamp = 0;
tabentry->autovac_vacuum_timestamp = 0;
tabentry->analyze_timestamp = 0;
tabentry->autovac_analyze_timestamp = 0;
-
- tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
- tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
}
else
{
/*
* Otherwise add the values to the existing entry.
*/
- tabentry->numscans += tabmsg[i].t_numscans;
- tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
- tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
- tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
- tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
- tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
-
- tabentry->n_live_tuples += tabmsg[i].t_tuples_inserted -
- tabmsg[i].t_tuples_deleted;
- tabentry->n_dead_tuples += tabmsg[i].t_tuples_updated +
- tabmsg[i].t_tuples_deleted;
-
- tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
- tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
+ tabentry->numscans += tabmsg[i].t_counts.t_numscans;
+ tabentry->tuples_returned += tabmsg[i].t_counts.t_tuples_returned;
+ tabentry->tuples_fetched += tabmsg[i].t_counts.t_tuples_fetched;
+ tabentry->tuples_inserted += tabmsg[i].t_counts.t_tuples_inserted;
+ tabentry->tuples_updated += tabmsg[i].t_counts.t_tuples_updated;
+ tabentry->tuples_deleted += tabmsg[i].t_counts.t_tuples_deleted;
+ tabentry->n_live_tuples += tabmsg[i].t_counts.t_new_live_tuples;
+ tabentry->n_dead_tuples += tabmsg[i].t_counts.t_new_dead_tuples;
+ tabentry->blocks_fetched += tabmsg[i].t_counts.t_blocks_fetched;
+ tabentry->blocks_hit += tabmsg[i].t_counts.t_blocks_hit;
}
/*
- * And add the block IO to the database entry.
+ * Add per-table stats to the per-database entry, too.
*/
- dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
- dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
+ dbentry->n_tuples_returned += tabmsg[i].t_counts.t_tuples_returned;
+ dbentry->n_tuples_fetched += tabmsg[i].t_counts.t_tuples_fetched;
+ dbentry->n_tuples_inserted += tabmsg[i].t_counts.t_tuples_inserted;
+ dbentry->n_tuples_updated += tabmsg[i].t_counts.t_tuples_updated;
+ dbentry->n_tuples_deleted += tabmsg[i].t_counts.t_tuples_deleted;
+ dbentry->n_blocks_fetched += tabmsg[i].t_counts.t_blocks_fetched;
+ dbentry->n_blocks_hit += tabmsg[i].t_counts.t_blocks_hit;
}
}
if (tabentry == NULL)
return;
- if (msg->m_autovacuum)
+ if (msg->m_autovacuum)
tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
- else
- tabentry->vacuum_timestamp = msg->m_vacuumtime;
+ else
+ tabentry->vacuum_timestamp = msg->m_vacuumtime;
tabentry->n_live_tuples = msg->m_tuples;
tabentry->n_dead_tuples = 0;
if (msg->m_analyze)
if (tabentry == NULL)
return;
- if (msg->m_autovacuum)
+ if (msg->m_autovacuum)
tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
- else
+ else
tabentry->analyze_timestamp = msg->m_analyzetime;
tabentry->n_live_tuples = msg->m_live_tuples;
tabentry->n_dead_tuples = msg->m_dead_tuples;
tabentry->last_anl_tuples = msg->m_live_tuples + msg->m_dead_tuples;
}
+
+
+/* ----------
+ * pgstat_recv_bgwriter() -
+ *
+ * Process a BGWRITER message.
+ *
+ * Each counter carried by the message is added to the matching field of
+ * globalStats. The len argument is not used here.
+ * ----------
+ */
+static void
+pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
+{
+ globalStats.timed_checkpoints += msg->m_timed_checkpoints;
+ globalStats.requested_checkpoints += msg->m_requested_checkpoints;
+ globalStats.buf_written_checkpoints += msg->m_buf_written_checkpoints;
+ globalStats.buf_written_lru += msg->m_buf_written_lru;
+ globalStats.buf_written_all += msg->m_buf_written_all;
+ globalStats.maxwritten_lru += msg->m_maxwritten_lru;
+ globalStats.maxwritten_all += msg->m_maxwritten_all;
+}