X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fpostmaster%2Fpgstat.c;h=1d80c311d879d9cf9009621860cda3ab19c6dea9;hb=b4b6923e03f4d29636a94f6f4cc2f5cf6298b8c8;hp=fd19d5741c2fb930103ce4943cd818ca97fd8404;hpb=7d4c9a5793b49633be0fae7653552f3fb4a812c0;p=postgresql diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index fd19d5741c..1d80c311d8 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -11,9 +11,9 @@ * - Add a pgstat config column to pg_database, so this * entire thing can be enabled/disabled on a per db basis. * - * Copyright (c) 2001-2007, PostgreSQL Global Development Group + * Copyright (c) 2001-2011, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.151 2007/03/28 22:17:12 alvherre Exp $ + * src/backend/postmaster/pgstat.c * ---------- */ #include "postgres.h" @@ -39,13 +39,16 @@ #include "access/heapam.h" #include "access/transam.h" +#include "access/twophase_rmgr.h" #include "access/xact.h" #include "catalog/pg_database.h" +#include "catalog/pg_proc.h" #include "libpq/ip.h" #include "libpq/libpq.h" #include "libpq/pqsignal.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "pg_trace.h" #include "postmaster/autovacuum.h" #include "postmaster/fork_process.h" #include "postmaster/postmaster.h" @@ -54,23 +57,33 @@ #include "storage/ipc.h" #include "storage/pg_shmem.h" #include "storage/pmsignal.h" +#include "storage/procsignal.h" +#include "utils/guc.h" #include "utils/memutils.h" #include "utils/ps_status.h" +#include "utils/rel.h" +#include "utils/tqual.h" /* ---------- * Paths for the statistics files (relative to installation's $PGDATA). * ---------- */ -#define PGSTAT_STAT_FILENAME "global/pgstat.stat" -#define PGSTAT_STAT_TMPFILE "global/pgstat.tmp" +#define PGSTAT_STAT_PERMANENT_FILENAME "global/pgstat.stat" +#define PGSTAT_STAT_PERMANENT_TMPFILE "global/pgstat.tmp" /* ---------- * Timer definitions. * ---------- */ -#define PGSTAT_STAT_INTERVAL 500 /* How often to write the status file; - * in milliseconds. */ +#define PGSTAT_STAT_INTERVAL 500 /* Minimum time between stats file + * updates; in milliseconds. */ + +#define PGSTAT_RETRY_DELAY 10 /* How long to wait between statistics + * update requests; in milliseconds. */ + +#define PGSTAT_MAX_WAIT_TIME 5000 /* Maximum time to wait for a stats + * file update; in milliseconds. */ #define PGSTAT_RESTART_INTERVAL 60 /* How often to attempt to restart a * failed statistics collector; in @@ -79,6 +92,8 @@ #define PGSTAT_SELECT_TIMEOUT 2 /* How often to check for postmaster * death; in seconds. */ +#define PGSTAT_POLL_LOOP_COUNT (PGSTAT_MAX_WAIT_TIME / PGSTAT_RETRY_DELAY) + /* ---------- * The initial size hints for the hash tables used in the collector. @@ -86,23 +101,37 @@ */ #define PGSTAT_DB_HASH_SIZE 16 #define PGSTAT_TAB_HASH_SIZE 512 +#define PGSTAT_FUNCTION_HASH_SIZE 512 /* ---------- * GUC parameters * ---------- */ -bool pgstat_collect_startcollector = true; -bool pgstat_collect_resetonpmstart = false; -bool pgstat_collect_tuplelevel = false; -bool pgstat_collect_blocklevel = false; -bool pgstat_collect_querystring = false; +bool pgstat_track_activities = false; +bool pgstat_track_counts = false; +int pgstat_track_functions = TRACK_FUNC_OFF; +int pgstat_track_activity_query_size = 1024; + +/* ---------- + * Built from GUC parameter + * ---------- + */ +char *pgstat_stat_filename = NULL; +char *pgstat_stat_tmpname = NULL; + +/* + * BgWriter global statistics counters (unused in other processes). + * Stored directly in a stats message structure so it can be sent + * without needing to copy things around. We assume this inits to zeroes. + */ +PgStat_MsgBgWriter BgWriterStats; /* ---------- * Local data * ---------- */ -NON_EXEC_STATIC int pgStatSock = -1; +NON_EXEC_STATIC pgsocket pgStatSock = PGINVALID_SOCKET; static struct sockaddr_storage pgStatAddr; @@ -111,32 +140,98 @@ static time_t last_pgstat_start_time; static bool pgStatRunningInCollector = false; /* - * Place where backends store per-table info to be sent to the collector. - * We store shared relations separately from non-shared ones, to be able to - * send them in separate messages. + * Structures in which backends store per-table info that's waiting to be + * sent to the collector. + * + * NOTE: once allocated, TabStatusArray structures are never moved or deleted + * for the life of the backend. Also, we zero out the t_id fields of the + * contained PgStat_TableStatus structs whenever they are not actively in use. + * This allows relcache pgstat_info pointers to be treated as long-lived data, + * avoiding repeated searches in pgstat_initstats() when a relation is + * repeatedly opened during a transaction. */ -typedef struct TabStatArray +#define TABSTAT_QUANTUM 100 /* we alloc this many at a time */ + +typedef struct TabStatusArray { - int tsa_alloc; /* num allocated */ - int tsa_used; /* num actually used */ - PgStat_MsgTabstat **tsa_messages; /* the array itself */ -} TabStatArray; + struct TabStatusArray *tsa_next; /* link to next array, if any */ + int tsa_used; /* # entries currently used */ + PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM]; /* per-table data */ +} TabStatusArray; + +static TabStatusArray *pgStatTabList = NULL; + +/* + * Backends store per-function info that's waiting to be sent to the collector + * in this hash table (indexed by function OID). + */ +static HTAB *pgStatFunctions = NULL; -#define TABSTAT_QUANTUM 4 /* we alloc this many at a time */ +/* + * Indicates if backend has some function stats that it hasn't yet + * sent to the collector. + */ +static bool have_function_stats = false; + +/* + * Tuple insertion/deletion counts for an open transaction can't be propagated + * into PgStat_TableStatus counters until we know if it is going to commit + * or abort. Hence, we keep these counts in per-subxact structs that live + * in TopTransactionContext. This data structure is designed on the assumption + * that subxacts won't usually modify very many tables. + */ +typedef struct PgStat_SubXactStatus +{ + int nest_level; /* subtransaction nest level */ + struct PgStat_SubXactStatus *prev; /* higher-level subxact if any */ + PgStat_TableXactStatus *first; /* head of list for this subxact */ +} PgStat_SubXactStatus; -static TabStatArray RegularTabStat = {0, 0, NULL}; -static TabStatArray SharedTabStat = {0, 0, NULL}; +static PgStat_SubXactStatus *pgStatXactStack = NULL; static int pgStatXactCommit = 0; static int pgStatXactRollback = 0; +/* Record that's written to 2PC state file when pgstat state is persisted */ +typedef struct TwoPhasePgStatRecord +{ + PgStat_Counter tuples_inserted; /* tuples inserted in xact */ + PgStat_Counter tuples_updated; /* tuples updated in xact */ + PgStat_Counter tuples_deleted; /* tuples deleted in xact */ + Oid t_id; /* table's OID */ + bool t_shared; /* is it a shared catalog? */ +} TwoPhasePgStatRecord; + +/* + * Info about current "snapshot" of stats file + */ static MemoryContext pgStatLocalContext = NULL; static HTAB *pgStatDBHash = NULL; static PgBackendStatus *localBackendStatusTable = NULL; static int localNumBackends = 0; +/* + * Cluster wide statistics, kept in the stats collector. + * Contains statistics that are not collected per database + * or per table. + */ +static PgStat_GlobalStats globalStats; + +/* Last time the collector successfully wrote the stats file */ +static TimestampTz last_statwrite; + +/* Latest statistics request time from backends */ +static TimestampTz last_statrequest; + static volatile bool need_exit = false; -static volatile bool need_statwrite = false; +static volatile bool got_SIGHUP = false; + +/* + * Total time charged to functions so far in the current backend. + * We use this to help separate "self" and "other" time charges. + * (We assume this initializes to zero.) + */ +static instr_time total_func_time; /* ---------- @@ -149,28 +244,42 @@ static pid_t pgstat_forkexec(void); NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]); static void pgstat_exit(SIGNAL_ARGS); -static void force_statwrite(SIGNAL_ARGS); static void pgstat_beshutdown_hook(int code, Datum arg); +static void pgstat_sighup_handler(SIGNAL_ARGS); static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create); -static void pgstat_write_statsfile(void); -static HTAB *pgstat_read_statsfile(Oid onlydb); +static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, + Oid tableoid, bool create); +static void pgstat_write_statsfile(bool permanent); +static HTAB *pgstat_read_statsfile(Oid onlydb, bool permanent); static void backend_read_statsfile(void); static void pgstat_read_current_status(void); + +static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg); +static void pgstat_send_funcstats(void); static HTAB *pgstat_collect_oids(Oid catalogid); +static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared); + static void pgstat_setup_memcxt(void); static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype); static void pgstat_send(void *msg, int len); +static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len); static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len); static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len); static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len); static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len); +static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len); +static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len); static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len); static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len); static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); +static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len); +static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len); +static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len); +static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len); /* ------------------------------------------------------------ @@ -203,28 +312,6 @@ pgstat_init(void) #define TESTBYTEVAL ((char) 199) - /* - * Force start of collector daemon if something to collect. Note that - * pgstat_collect_querystring is now an independent facility that does not - * require the collector daemon. - */ - if (pgstat_collect_tuplelevel || - pgstat_collect_blocklevel) - pgstat_collect_startcollector = true; - - /* - * If we don't have to start a collector or should reset the collected - * statistics on postmaster start, simply remove the stats file. - */ - if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart) - pgstat_reset_all(); - - /* - * Nothing else required if collector will not get started - */ - if (!pgstat_collect_startcollector) - return; - /* * Create the UDP socket for sending and receiving statistic messages */ @@ -268,7 +355,7 @@ pgstat_init(void) /* * Create the socket. */ - if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0) + if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) == PGINVALID_SOCKET) { ereport(LOG, (errcode_for_socket_access(), @@ -286,7 +373,7 @@ pgstat_init(void) (errcode_for_socket_access(), errmsg("could not bind socket for statistics collector: %m"))); closesocket(pgStatSock); - pgStatSock = -1; + pgStatSock = PGINVALID_SOCKET; continue; } @@ -297,7 +384,7 @@ pgstat_init(void) (errcode_for_socket_access(), errmsg("could not get address of socket for statistics collector: %m"))); closesocket(pgStatSock); - pgStatSock = -1; + pgStatSock = PGINVALID_SOCKET; continue; } @@ -313,7 +400,7 @@ pgstat_init(void) (errcode_for_socket_access(), errmsg("could not connect socket for statistics collector: %m"))); closesocket(pgStatSock); - pgStatSock = -1; + pgStatSock = PGINVALID_SOCKET; continue; } @@ -334,7 +421,7 @@ retry1: (errcode_for_socket_access(), errmsg("could not send test message on socket for statistics collector: %m"))); closesocket(pgStatSock); - pgStatSock = -1; + pgStatSock = PGINVALID_SOCKET; continue; } @@ -347,6 +434,7 @@ retry1: { FD_ZERO(&rset); FD_SET(pgStatSock, &rset); + tv.tv_sec = 0; tv.tv_usec = 500000; sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv); @@ -359,7 +447,7 @@ retry1: (errcode_for_socket_access(), errmsg("select() failed in statistics collector: %m"))); closesocket(pgStatSock); - pgStatSock = -1; + pgStatSock = PGINVALID_SOCKET; continue; } if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset)) @@ -374,7 +462,7 @@ retry1: (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("test message did not get through on socket for statistics collector"))); closesocket(pgStatSock); - pgStatSock = -1; + pgStatSock = PGINVALID_SOCKET; continue; } @@ -389,7 +477,7 @@ retry2: (errcode_for_socket_access(), errmsg("could not receive test message on socket for statistics collector: %m"))); closesocket(pgStatSock); - pgStatSock = -1; + pgStatSock = PGINVALID_SOCKET; continue; } @@ -399,7 +487,7 @@ retry2: (errcode(ERRCODE_INTERNAL_ERROR), errmsg("incorrect test message transmission on socket for statistics collector"))); closesocket(pgStatSock); - pgStatSock = -1; + pgStatSock = PGINVALID_SOCKET; continue; } @@ -408,7 +496,7 @@ retry2: } /* Did we find a working address? */ - if (!addr || pgStatSock < 0) + if (!addr || pgStatSock == PGINVALID_SOCKET) goto startup_failed; /* @@ -435,27 +523,30 @@ startup_failed: if (addrs) pg_freeaddrinfo_all(hints.ai_family, addrs); - if (pgStatSock >= 0) + if (pgStatSock != PGINVALID_SOCKET) closesocket(pgStatSock); - pgStatSock = -1; + pgStatSock = PGINVALID_SOCKET; - /* Adjust GUC variables to suppress useless activity */ - pgstat_collect_startcollector = false; - pgstat_collect_tuplelevel = false; - pgstat_collect_blocklevel = false; + /* + * Adjust GUC variables to suppress useless activity, and for debugging + * purposes (seeing track_counts off is a clue that we failed here). We + * use PGC_S_OVERRIDE because there is no point in trying to turn it back + * on from postgresql.conf without a restart. + */ + SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE); } /* * pgstat_reset_all() - * - * Remove the stats file. This is used on server start if the - * stats_reset_on_server_start feature is enabled, or if WAL + * Remove the stats file. This is currently used only if WAL * recovery is needed after a crash. */ void pgstat_reset_all(void) { - unlink(PGSTAT_STAT_FILENAME); + unlink(pgstat_stat_filename); + unlink(PGSTAT_STAT_PERMANENT_FILENAME); } #ifdef EXEC_BACKEND @@ -483,7 +574,7 @@ pgstat_forkexec(void) #endif /* EXEC_BACKEND */ -/* ---------- +/* * pgstat_start() - * * Called from postmaster at startup or after an existing collector @@ -492,7 +583,6 @@ pgstat_forkexec(void) * Returns PID of child process, or 0 if fail. * * Note: if fail, we will be called again from the postmaster main loop. - * ---------- */ int pgstat_start(void) @@ -501,9 +591,10 @@ pgstat_start(void) pid_t pgStatPid; /* - * Do nothing if no collector needed + * Check that the socket is there, else pgstat_init failed and we can do + * nothing useful. */ - if (!pgstat_collect_startcollector) + if (pgStatSock == PGINVALID_SOCKET) return 0; /* @@ -518,22 +609,6 @@ pgstat_start(void) return 0; last_pgstat_start_time = curtime; - /* - * Check that the socket is there, else pgstat_init failed. - */ - if (pgStatSock < 0) - { - ereport(LOG, - (errmsg("statistics collector startup skipped"))); - - /* - * We can only get here if someone tries to manually turn - * pgstat_collect_startcollector on after it had been off. - */ - pgstat_collect_startcollector = false; - return 0; - } - /* * Okay, fork off the collector. */ @@ -572,9 +647,10 @@ pgstat_start(void) return 0; } -void allow_immediate_pgstat_restart(void) +void +allow_immediate_pgstat_restart(void) { - last_pgstat_start_time = 0; + last_pgstat_start_time = 0; } /* ------------------------------------------------------------ @@ -584,92 +660,219 @@ void allow_immediate_pgstat_restart(void) /* ---------- - * pgstat_report_tabstat() - + * pgstat_report_stat() - * - * Called from tcop/postgres.c to send the so far collected - * per table access statistics to the collector. + * Called from tcop/postgres.c to send the so far collected per-table + * and function usage statistics to the collector. Note that this is + * called only when not within a transaction, so it is fair to use + * transaction stop time as an approximation of current time. * ---------- */ void -pgstat_report_tabstat(void) +pgstat_report_stat(bool force) { + /* we assume this inits to all zeroes: */ + static const PgStat_TableCounts all_zeroes; + static TimestampTz last_report = 0; + + TimestampTz now; + PgStat_MsgTabstat regular_msg; + PgStat_MsgTabstat shared_msg; + TabStatusArray *tsa; int i; - if (pgStatSock < 0 || - (!pgstat_collect_tuplelevel && - !pgstat_collect_blocklevel)) - { - /* Not reporting stats, so just flush whatever we have */ - RegularTabStat.tsa_used = 0; - SharedTabStat.tsa_used = 0; + /* Don't expend a clock check if nothing to do */ + if ((pgStatTabList == NULL || pgStatTabList->tsa_used == 0) + && !have_function_stats) return; - } /* - * For each message buffer used during the last query set the header - * fields and send it out. + * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL + * msec since we last sent one, or the caller wants to force stats out. + */ + now = GetCurrentTransactionStopTimestamp(); + if (!force && + !TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL)) + return; + last_report = now; + + /* + * Scan through the TabStatusArray struct(s) to find tables that actually + * have counts, and build messages to send. We have to separate shared + * relations from regular ones because the databaseid field in the message + * header has to depend on that. */ - for (i = 0; i < RegularTabStat.tsa_used; i++) + regular_msg.m_databaseid = MyDatabaseId; + shared_msg.m_databaseid = InvalidOid; + regular_msg.m_nentries = 0; + shared_msg.m_nentries = 0; + + for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next) { - PgStat_MsgTabstat *tsmsg = RegularTabStat.tsa_messages[i]; - int n; - int len; + for (i = 0; i < tsa->tsa_used; i++) + { + PgStat_TableStatus *entry = &tsa->tsa_entries[i]; + PgStat_MsgTabstat *this_msg; + PgStat_TableEntry *this_ent; + + /* Shouldn't have any pending transaction-dependent counts */ + Assert(entry->trans == NULL); + + /* + * Ignore entries that didn't accumulate any actual counts, such + * as indexes that were opened by the planner but not used. + */ + if (memcmp(&entry->t_counts, &all_zeroes, + sizeof(PgStat_TableCounts)) == 0) + continue; + + /* + * OK, insert data into the appropriate message, and send if full. + */ + this_msg = entry->t_shared ? &shared_msg : ®ular_msg; + this_ent = &this_msg->m_entry[this_msg->m_nentries]; + this_ent->t_id = entry->t_id; + memcpy(&this_ent->t_counts, &entry->t_counts, + sizeof(PgStat_TableCounts)); + if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES) + { + pgstat_send_tabstat(this_msg); + this_msg->m_nentries = 0; + } + } + /* zero out TableStatus structs after use */ + MemSet(tsa->tsa_entries, 0, + tsa->tsa_used * sizeof(PgStat_TableStatus)); + tsa->tsa_used = 0; + } + + /* + * Send partial messages. If force is true, make sure that any pending + * xact commit/abort gets counted, even if no table stats to send. + */ + if (regular_msg.m_nentries > 0 || + (force && (pgStatXactCommit > 0 || pgStatXactRollback > 0))) + pgstat_send_tabstat(®ular_msg); + if (shared_msg.m_nentries > 0) + pgstat_send_tabstat(&shared_msg); + + /* Now, send function statistics */ + pgstat_send_funcstats(); +} + +/* + * Subroutine for pgstat_report_stat: finish and send a tabstat message + */ +static void +pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg) +{ + int n; + int len; - n = tsmsg->m_nentries; - len = offsetof(PgStat_MsgTabstat, m_entry[0]) + - n * sizeof(PgStat_TableEntry); + /* It's unlikely we'd get here with no socket, but maybe not impossible */ + if (pgStatSock == PGINVALID_SOCKET) + return; + /* + * Report accumulated xact commit/rollback whenever we send a normal + * tabstat message + */ + if (OidIsValid(tsmsg->m_databaseid)) + { tsmsg->m_xact_commit = pgStatXactCommit; tsmsg->m_xact_rollback = pgStatXactRollback; pgStatXactCommit = 0; pgStatXactRollback = 0; - - pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT); - tsmsg->m_databaseid = MyDatabaseId; - pgstat_send(tsmsg, len); } - RegularTabStat.tsa_used = 0; + else + { + tsmsg->m_xact_commit = 0; + tsmsg->m_xact_rollback = 0; + } + + n = tsmsg->m_nentries; + len = offsetof(PgStat_MsgTabstat, m_entry[0]) + + n * sizeof(PgStat_TableEntry); + + pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT); + pgstat_send(tsmsg, len); +} + +/* + * Subroutine for pgstat_report_stat: populate and send a function stat message + */ +static void +pgstat_send_funcstats(void) +{ + /* we assume this inits to all zeroes: */ + static const PgStat_FunctionCounts all_zeroes; + + PgStat_MsgFuncstat msg; + PgStat_BackendFunctionEntry *entry; + HASH_SEQ_STATUS fstat; - /* Ditto, for shared relations */ - for (i = 0; i < SharedTabStat.tsa_used; i++) + if (pgStatFunctions == NULL) + return; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_FUNCSTAT); + msg.m_databaseid = MyDatabaseId; + msg.m_nentries = 0; + + hash_seq_init(&fstat, pgStatFunctions); + while ((entry = (PgStat_BackendFunctionEntry *) hash_seq_search(&fstat)) != NULL) { - PgStat_MsgTabstat *tsmsg = SharedTabStat.tsa_messages[i]; - int n; - int len; + PgStat_FunctionEntry *m_ent; - n = tsmsg->m_nentries; - len = offsetof(PgStat_MsgTabstat, m_entry[0]) + - n * sizeof(PgStat_TableEntry); + /* Skip it if no counts accumulated since last time */ + if (memcmp(&entry->f_counts, &all_zeroes, + sizeof(PgStat_FunctionCounts)) == 0) + continue; - /* We don't report transaction commit/abort here */ - tsmsg->m_xact_commit = 0; - tsmsg->m_xact_rollback = 0; + /* need to convert format of time accumulators */ + m_ent = &msg.m_entry[msg.m_nentries]; + m_ent->f_id = entry->f_id; + m_ent->f_numcalls = entry->f_counts.f_numcalls; + m_ent->f_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_time); + m_ent->f_time_self = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_time_self); - pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT); - tsmsg->m_databaseid = InvalidOid; - pgstat_send(tsmsg, len); + if (++msg.m_nentries >= PGSTAT_NUM_FUNCENTRIES) + { + pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) + + msg.m_nentries * sizeof(PgStat_FunctionEntry)); + msg.m_nentries = 0; + } + + /* reset the entry's counts */ + MemSet(&entry->f_counts, 0, sizeof(PgStat_FunctionCounts)); } - SharedTabStat.tsa_used = 0; + + if (msg.m_nentries > 0) + pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) + + msg.m_nentries * sizeof(PgStat_FunctionEntry)); + + have_function_stats = false; } /* ---------- - * pgstat_vacuum_tabstat() - + * pgstat_vacuum_stat() - * * Will tell the collector about objects he can get rid of. * ---------- */ void -pgstat_vacuum_tabstat(void) +pgstat_vacuum_stat(void) { HTAB *htab; PgStat_MsgTabpurge msg; + PgStat_MsgFuncpurge f_msg; HASH_SEQ_STATUS hstat; PgStat_StatDBEntry *dbentry; PgStat_StatTabEntry *tabentry; + PgStat_StatFuncEntry *funcentry; int len; - if (pgStatSock < 0) + if (pgStatSock == PGINVALID_SOCKET) return; /* @@ -694,7 +897,9 @@ pgstat_vacuum_tabstat(void) CHECK_FOR_INTERRUPTS(); - if (hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL) + /* the DB entry for shared tables (with InvalidOid) is never dropped */ + if (OidIsValid(dbid) && + hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL) pgstat_drop_database(dbid); } @@ -769,15 +974,72 @@ pgstat_vacuum_tabstat(void) /* Clean up */ hash_destroy(htab); + + /* + * Now repeat the above steps for functions. However, we needn't bother + * in the common case where no function stats are being collected. + */ + if (dbentry->functions != NULL && + hash_get_num_entries(dbentry->functions) > 0) + { + htab = pgstat_collect_oids(ProcedureRelationId); + + pgstat_setheader(&f_msg.m_hdr, PGSTAT_MTYPE_FUNCPURGE); + f_msg.m_databaseid = MyDatabaseId; + f_msg.m_nentries = 0; + + hash_seq_init(&hstat, dbentry->functions); + while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL) + { + Oid funcid = funcentry->functionid; + + CHECK_FOR_INTERRUPTS(); + + if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL) + continue; + + /* + * Not there, so add this function's Oid to the message + */ + f_msg.m_functionid[f_msg.m_nentries++] = funcid; + + /* + * If the message is full, send it out and reinitialize to empty + */ + if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE) + { + len = offsetof(PgStat_MsgFuncpurge, m_functionid[0]) + +f_msg.m_nentries * sizeof(Oid); + + pgstat_send(&f_msg, len); + + f_msg.m_nentries = 0; + } + } + + /* + * Send the rest + */ + if (f_msg.m_nentries > 0) + { + len = offsetof(PgStat_MsgFuncpurge, m_functionid[0]) + +f_msg.m_nentries * sizeof(Oid); + + pgstat_send(&f_msg, len); + } + + hash_destroy(htab); + } } /* ---------- * pgstat_collect_oids() - * - * Collect the OIDs of either all databases or all tables, according to - * the parameter, into a temporary hash table. Caller should hash_destroy - * the result when done with it. + * Collect the OIDs of all objects listed in the specified system catalog + * into a temporary hash table. Caller should hash_destroy the result + * when done with it. (However, we make the table in CurrentMemoryContext + * so that it will be freed properly in event of an error.) * ---------- */ static HTAB * @@ -793,16 +1055,17 @@ pgstat_collect_oids(Oid catalogid) hash_ctl.keysize = sizeof(Oid); hash_ctl.entrysize = sizeof(Oid); hash_ctl.hash = oid_hash; + hash_ctl.hcxt = CurrentMemoryContext; htab = hash_create("Temporary table of OIDs", PGSTAT_TAB_HASH_SIZE, &hash_ctl, - HASH_ELEM | HASH_FUNCTION); + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); rel = heap_open(catalogid, AccessShareLock); scan = heap_beginscan(rel, SnapshotNow, 0, NULL); while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL) { - Oid thisoid = HeapTupleGetOid(tup); + Oid thisoid = HeapTupleGetOid(tup); CHECK_FOR_INTERRUPTS(); @@ -820,7 +1083,7 @@ pgstat_collect_oids(Oid catalogid) * * Tell the collector that we just dropped a database. * (If the message gets lost, we will still clean the dead DB eventually - * via future invocations of pgstat_vacuum_tabstat().) + * via future invocations of pgstat_vacuum_stat().) * ---------- */ void @@ -828,7 +1091,7 @@ pgstat_drop_database(Oid databaseid) { PgStat_MsgDropdb msg; - if (pgStatSock < 0) + if (pgStatSock == PGINVALID_SOCKET) return; pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB); @@ -842,16 +1105,20 @@ pgstat_drop_database(Oid databaseid) * * Tell the collector that we just dropped a relation. * (If the message gets lost, we will still clean the dead entry eventually - * via future invocations of pgstat_vacuum_tabstat().) + * via future invocations of pgstat_vacuum_stat().) + * + * Currently not used for lack of any good place to call it; we rely + * entirely on pgstat_vacuum_stat() to clean out stats for dead rels. * ---------- */ +#ifdef NOT_USED void pgstat_drop_relation(Oid relid) { PgStat_MsgTabpurge msg; int len; - if (pgStatSock < 0) + if (pgStatSock == PGINVALID_SOCKET) return; msg.m_tableid[0] = relid; @@ -863,6 +1130,7 @@ pgstat_drop_relation(Oid relid) msg.m_databaseid = MyDatabaseId; pgstat_send(&msg, len); } +#endif /* NOT_USED */ /* ---------- @@ -876,7 +1144,7 @@ pgstat_reset_counters(void) { PgStat_MsgResetcounter msg; - if (pgStatSock < 0) + if (pgStatSock == PGINVALID_SOCKET) return; if (!superuser()) @@ -889,6 +1157,63 @@ pgstat_reset_counters(void) pgstat_send(&msg, sizeof(msg)); } +/* ---------- + * pgstat_reset_shared_counters() - + * + * Tell the statistics collector to reset cluster-wide shared counters. + * ---------- + */ +void +pgstat_reset_shared_counters(const char *target) +{ + PgStat_MsgResetsharedcounter msg; + + if (pgStatSock == PGINVALID_SOCKET) + return; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to reset statistics counters"))); + + if (strcmp(target, "bgwriter") == 0) + msg.m_resettarget = RESET_BGWRITER; + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized reset target: \"%s\"", target), + errhint("Target must be \"bgwriter\"."))); + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER); + pgstat_send(&msg, sizeof(msg)); +} + +/* ---------- + * pgstat_reset_single_counter() - + * + * Tell the statistics collector to reset a single counter. + * ---------- + */ +void +pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type) +{ + PgStat_MsgResetsinglecounter msg; + + if (pgStatSock == PGINVALID_SOCKET) + return; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to reset statistics counters"))); + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSINGLECOUNTER); + msg.m_databaseid = MyDatabaseId; + msg.m_resettype = type; + msg.m_objectid = objoid; + + pgstat_send(&msg, sizeof(msg)); +} /* ---------- * pgstat_report_autovac() - @@ -903,7 +1228,7 @@ pgstat_report_autovac(Oid dboid) { PgStat_MsgAutovacStart msg; - if (pgStatSock < 0) + if (pgStatSock == PGINVALID_SOCKET) return; pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START); @@ -921,20 +1246,17 @@ pgstat_report_autovac(Oid dboid) * --------- */ void -pgstat_report_vacuum(Oid tableoid, bool shared, - bool analyze, PgStat_Counter tuples) +pgstat_report_vacuum(Oid tableoid, bool shared, PgStat_Counter tuples) { PgStat_MsgVacuum msg; - if (pgStatSock < 0 || - !pgstat_collect_tuplelevel) + if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts) return; pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM); msg.m_databaseid = shared ? InvalidOid : MyDatabaseId; msg.m_tableoid = tableoid; - msg.m_analyze = analyze; - msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */ + msg.m_autovacuum = IsAutoVacuumWorkerProcess(); msg.m_vacuumtime = GetCurrentTimestamp(); msg.m_tuples = tuples; pgstat_send(&msg, sizeof(msg)); @@ -947,25 +1269,69 @@ pgstat_report_vacuum(Oid tableoid, bool shared, * -------- */ void -pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples, - PgStat_Counter deadtuples) +pgstat_report_analyze(Relation rel, + PgStat_Counter livetuples, PgStat_Counter deadtuples) { PgStat_MsgAnalyze msg; - if (pgStatSock < 0 || - !pgstat_collect_tuplelevel) + if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts) return; + /* + * Unlike VACUUM, ANALYZE might be running inside a transaction that has + * already inserted and/or deleted rows in the target table. ANALYZE will + * have counted such rows as live or dead respectively. Because we will + * report our counts of such rows at transaction end, we should subtract + * off these counts from what we send to the collector now, else they'll + * be double-counted after commit. (This approach also ensures that the + * collector ends up with the right numbers if we abort instead of + * committing.) + */ + if (rel->pgstat_info != NULL) + { + PgStat_TableXactStatus *trans; + + for (trans = rel->pgstat_info->trans; trans; trans = trans->upper) + { + livetuples -= trans->tuples_inserted - trans->tuples_deleted; + deadtuples -= trans->tuples_updated + trans->tuples_deleted; + } + /* count stuff inserted by already-aborted subxacts, too */ + deadtuples -= rel->pgstat_info->t_counts.t_delta_dead_tuples; + /* Since ANALYZE's counts are estimates, we could have underflowed */ + livetuples = Max(livetuples, 0); + deadtuples = Max(deadtuples, 0); + } + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE); - msg.m_databaseid = shared ? InvalidOid : MyDatabaseId; - msg.m_tableoid = tableoid; - msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */ + msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId; + msg.m_tableoid = RelationGetRelid(rel); + msg.m_autovacuum = IsAutoVacuumWorkerProcess(); msg.m_analyzetime = GetCurrentTimestamp(); msg.m_live_tuples = livetuples; msg.m_dead_tuples = deadtuples; pgstat_send(&msg, sizeof(msg)); } +/* -------- + * pgstat_report_recovery_conflict() - + * + * Tell the collector about a Hot Standby recovery conflict. + * -------- + */ +void +pgstat_report_recovery_conflict(int reason) +{ + PgStat_MsgRecoveryConflict msg; + + if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts) + return; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT); + msg.m_databaseid = MyDatabaseId; + msg.m_reason = reason; + pgstat_send(&msg, sizeof(msg)); +} /* ---------- * pgstat_ping() - @@ -978,194 +1344,707 @@ pgstat_ping(void) { PgStat_MsgDummy msg; - if (pgStatSock < 0) + if (pgStatSock == PGINVALID_SOCKET) return; pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY); pgstat_send(&msg, sizeof(msg)); } -/* - * Enlarge a TabStatArray +/* ---------- + * pgstat_send_inquiry() - + * + * Notify collector that we need fresh data. + * ts specifies the minimum acceptable timestamp for the stats file. + * ---------- */ static void -more_tabstat_space(TabStatArray *tsarr) +pgstat_send_inquiry(TimestampTz ts) { - PgStat_MsgTabstat *newMessages; - PgStat_MsgTabstat **msgArray; - int newAlloc; - int i; - - AssertArg(PointerIsValid(tsarr)); + PgStat_MsgInquiry msg; - newAlloc = tsarr->tsa_alloc + TABSTAT_QUANTUM; - - /* Create (another) quantum of message buffers */ - newMessages = (PgStat_MsgTabstat *) - MemoryContextAllocZero(TopMemoryContext, - sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM); - - /* Create or enlarge the pointer array */ - if (tsarr->tsa_messages == NULL) - msgArray = (PgStat_MsgTabstat **) - MemoryContextAlloc(TopMemoryContext, - sizeof(PgStat_MsgTabstat *) * newAlloc); - else - msgArray = (PgStat_MsgTabstat **) - repalloc(tsarr->tsa_messages, - sizeof(PgStat_MsgTabstat *) * newAlloc); - - for (i = 0; i < TABSTAT_QUANTUM; i++) - msgArray[tsarr->tsa_alloc + i] = newMessages++; - tsarr->tsa_messages = msgArray; - tsarr->tsa_alloc = newAlloc; - - Assert(tsarr->tsa_used < tsarr->tsa_alloc); + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_INQUIRY); + msg.inquiry_time = ts; + pgstat_send(&msg, sizeof(msg)); } -/* ---------- - * pgstat_initstats() - - * - * Called from various places usually dealing with initialization - * of Relation or Scan structures. The data placed into these - * structures from here tell where later to count for buffer reads, - * scans and tuples fetched. - * ---------- + +/* + * Initialize function call usage data. + * Called by the executor before invoking a function. */ void -pgstat_initstats(PgStat_Info *stats, Relation rel) +pgstat_init_function_usage(FunctionCallInfoData *fcinfo, + PgStat_FunctionCallUsage *fcu) { - Oid rel_id = rel->rd_id; - PgStat_TableEntry *useent; - TabStatArray *tsarr; - PgStat_MsgTabstat *tsmsg; - int mb; - int i; - - /* - * Initialize data not to count at all. - */ - stats->tabentry = NULL; + PgStat_BackendFunctionEntry *htabent; + bool found; - if (pgStatSock < 0 || - !(pgstat_collect_tuplelevel || - pgstat_collect_blocklevel)) + if (pgstat_track_functions <= fcinfo->flinfo->fn_stats) + { + /* stats not wanted */ + fcu->fs = NULL; return; + } - tsarr = rel->rd_rel->relisshared ? &SharedTabStat : &RegularTabStat; - - /* - * Search the already-used message slots for this relation. - */ - for (mb = 0; mb < tsarr->tsa_used; mb++) + if (!pgStatFunctions) { - tsmsg = tsarr->tsa_messages[mb]; + /* First time through - initialize function stat table */ + HASHCTL hash_ctl; - for (i = tsmsg->m_nentries; --i >= 0;) - { - if (tsmsg->m_entry[i].t_id == rel_id) - { - stats->tabentry = (void *) &(tsmsg->m_entry[i]); - return; - } - } + memset(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_BackendFunctionEntry); + hash_ctl.hash = oid_hash; + pgStatFunctions = hash_create("Function stat entries", + PGSTAT_FUNCTION_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_FUNCTION); + } - if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES) - continue; + /* Get the stats entry for this function, create if necessary */ + htabent = hash_search(pgStatFunctions, &fcinfo->flinfo->fn_oid, + HASH_ENTER, &found); + if (!found) + MemSet(&htabent->f_counts, 0, sizeof(PgStat_FunctionCounts)); - /* - * Not found, but found a message buffer with an empty slot instead. - * Fine, let's use this one. - */ - i = tsmsg->m_nentries++; - useent = &tsmsg->m_entry[i]; - MemSet(useent, 0, sizeof(PgStat_TableEntry)); - useent->t_id = rel_id; - stats->tabentry = (void *) useent; + fcu->fs = &htabent->f_counts; + + /* save stats for this function, later used to compensate for recursion */ + fcu->save_f_time = htabent->f_counts.f_time; + + /* save current backend-wide total time */ + fcu->save_total = total_func_time; + + /* get clock time as of function start */ + INSTR_TIME_SET_CURRENT(fcu->f_start); +} + +/* + * find_funcstat_entry - find any existing PgStat_BackendFunctionEntry entry + * for specified function + * + * If no entry, return NULL, don't create a new one + */ +PgStat_BackendFunctionEntry * +find_funcstat_entry(Oid func_id) +{ + if (pgStatFunctions == NULL) + return NULL; + + return (PgStat_BackendFunctionEntry *) hash_search(pgStatFunctions, + (void *) &func_id, + HASH_FIND, NULL); +} + +/* + * Calculate function call usage and update stat counters. + * Called by the executor after invoking a function. + * + * In the case of a set-returning function that runs in value-per-call mode, + * we will see multiple pgstat_init_function_usage/pgstat_end_function_usage + * calls for what the user considers a single call of the function. The + * finalize flag should be TRUE on the last call. + */ +void +pgstat_end_function_usage(PgStat_FunctionCallUsage *fcu, bool finalize) +{ + PgStat_FunctionCounts *fs = fcu->fs; + instr_time f_total; + instr_time f_others; + instr_time f_self; + + /* stats not wanted? */ + if (fs == NULL) return; - } - /* - * If we ran out of message buffers, we just allocate more. - */ - if (tsarr->tsa_used >= tsarr->tsa_alloc) - more_tabstat_space(tsarr); + /* total elapsed time in this function call */ + INSTR_TIME_SET_CURRENT(f_total); + INSTR_TIME_SUBTRACT(f_total, fcu->f_start); + + /* self usage: elapsed minus anything already charged to other calls */ + f_others = total_func_time; + INSTR_TIME_SUBTRACT(f_others, fcu->save_total); + f_self = f_total; + INSTR_TIME_SUBTRACT(f_self, f_others); + + /* update backend-wide total time */ + INSTR_TIME_ADD(total_func_time, f_self); /* - * Use the first entry of the next message buffer. + * Compute the new total f_time as the total elapsed time added to the + * pre-call value of f_time. This is necessary to avoid double-counting + * any time taken by recursive calls of myself. (We do not need any + * similar kluge for self time, since that already excludes any recursive + * calls.) */ - mb = tsarr->tsa_used++; - tsmsg = tsarr->tsa_messages[mb]; - tsmsg->m_nentries = 1; - useent = &tsmsg->m_entry[0]; - MemSet(useent, 0, sizeof(PgStat_TableEntry)); - useent->t_id = rel_id; - stats->tabentry = (void *) useent; + INSTR_TIME_ADD(f_total, fcu->save_f_time); + + /* update counters in function stats table */ + if (finalize) + fs->f_numcalls++; + fs->f_time = f_total; + INSTR_TIME_ADD(fs->f_time_self, f_self); + + /* indicate that we have something to send */ + have_function_stats = true; } /* ---------- - * pgstat_count_xact_commit() - + * pgstat_initstats() - + * + * Initialize a relcache entry to count access statistics. + * Called whenever a relation is opened. * - * Called from access/transam/xact.c to count transaction commits. + * We assume that a relcache entry's pgstat_info field is zeroed by + * relcache.c when the relcache entry is made; thereafter it is long-lived + * data. We can avoid repeated searches of the TabStatus arrays when the + * same relation is touched repeatedly within a transaction. * ---------- */ void -pgstat_count_xact_commit(void) +pgstat_initstats(Relation rel) { - if (!pgstat_collect_tuplelevel && - !pgstat_collect_blocklevel) + Oid rel_id = rel->rd_id; + char relkind = rel->rd_rel->relkind; + + /* We only count stats for things that have storage */ + if (!(relkind == RELKIND_RELATION || + relkind == RELKIND_INDEX || + relkind == RELKIND_TOASTVALUE || + relkind == RELKIND_SEQUENCE)) + { + rel->pgstat_info = NULL; return; + } + + if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts) + { + /* We're not counting at all */ + rel->pgstat_info = NULL; + return; + } + + /* + * If we already set up this relation in the current transaction, nothing + * to do. + */ + if (rel->pgstat_info != NULL && + rel->pgstat_info->t_id == rel_id) + return; + + /* Else find or make the PgStat_TableStatus entry, and update link */ + rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared); +} - pgStatXactCommit++; +/* + * get_tabstat_entry - find or create a PgStat_TableStatus entry for rel + */ +static PgStat_TableStatus * +get_tabstat_entry(Oid rel_id, bool isshared) +{ + PgStat_TableStatus *entry; + TabStatusArray *tsa; + TabStatusArray *prev_tsa; + int i; /* - * If there was no relation activity yet, just make one existing message - * buffer used without slots, causing the next report to tell new - * xact-counters. + * Search the already-used tabstat slots for this relation. */ - if (RegularTabStat.tsa_alloc == 0) - more_tabstat_space(&RegularTabStat); + prev_tsa = NULL; + for (tsa = pgStatTabList; tsa != NULL; prev_tsa = tsa, tsa = tsa->tsa_next) + { + for (i = 0; i < tsa->tsa_used; i++) + { + entry = &tsa->tsa_entries[i]; + if (entry->t_id == rel_id) + return entry; + } + + if (tsa->tsa_used < TABSTAT_QUANTUM) + { + /* + * It must not be present, but we found a free slot instead. Fine, + * let's use this one. We assume the entry was already zeroed, + * either at creation or after last use. + */ + entry = &tsa->tsa_entries[tsa->tsa_used++]; + entry->t_id = rel_id; + entry->t_shared = isshared; + return entry; + } + } + + /* + * We ran out of tabstat slots, so allocate more. Be sure they're zeroed. + */ + tsa = (TabStatusArray *) MemoryContextAllocZero(TopMemoryContext, + sizeof(TabStatusArray)); + if (prev_tsa) + prev_tsa->tsa_next = tsa; + else + pgStatTabList = tsa; + + /* + * Use the first entry of the new TabStatusArray. + */ + entry = &tsa->tsa_entries[tsa->tsa_used++]; + entry->t_id = rel_id; + entry->t_shared = isshared; + return entry; +} + +/* + * find_tabstat_entry - find any existing PgStat_TableStatus entry for rel + * + * If no entry, return NULL, don't create a new one + */ +PgStat_TableStatus * +find_tabstat_entry(Oid rel_id) +{ + PgStat_TableStatus *entry; + TabStatusArray *tsa; + int i; + + for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next) + { + for (i = 0; i < tsa->tsa_used; i++) + { + entry = &tsa->tsa_entries[i]; + if (entry->t_id == rel_id) + return entry; + } + } + + /* Not present */ + return NULL; +} + +/* + * get_tabstat_stack_level - add a new (sub)transaction stack entry if needed + */ +static PgStat_SubXactStatus * +get_tabstat_stack_level(int nest_level) +{ + PgStat_SubXactStatus *xact_state; + + xact_state = pgStatXactStack; + if (xact_state == NULL || xact_state->nest_level != nest_level) + { + xact_state = (PgStat_SubXactStatus *) + MemoryContextAlloc(TopTransactionContext, + sizeof(PgStat_SubXactStatus)); + xact_state->nest_level = nest_level; + xact_state->prev = pgStatXactStack; + xact_state->first = NULL; + pgStatXactStack = xact_state; + } + return xact_state; +} + +/* + * add_tabstat_xact_level - add a new (sub)transaction state record + */ +static void +add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level) +{ + PgStat_SubXactStatus *xact_state; + PgStat_TableXactStatus *trans; + + /* + * If this is the first rel to be modified at the current nest level, we + * first have to push a transaction stack entry. + */ + xact_state = get_tabstat_stack_level(nest_level); + + /* Now make a per-table stack entry */ + trans = (PgStat_TableXactStatus *) + MemoryContextAllocZero(TopTransactionContext, + sizeof(PgStat_TableXactStatus)); + trans->nest_level = nest_level; + trans->upper = pgstat_info->trans; + trans->parent = pgstat_info; + trans->next = xact_state->first; + xact_state->first = trans; + pgstat_info->trans = trans; +} + +/* + * pgstat_count_heap_insert - count a tuple insertion + */ +void +pgstat_count_heap_insert(Relation rel) +{ + PgStat_TableStatus *pgstat_info = rel->pgstat_info; + + if (pgstat_info != NULL) + { + /* We have to log the effect at the proper transactional level */ + int nest_level = GetCurrentTransactionNestLevel(); + + if (pgstat_info->trans == NULL || + pgstat_info->trans->nest_level != nest_level) + add_tabstat_xact_level(pgstat_info, nest_level); + + pgstat_info->trans->tuples_inserted++; + } +} + +/* + * pgstat_count_heap_update - count a tuple update + */ +void +pgstat_count_heap_update(Relation rel, bool hot) +{ + PgStat_TableStatus *pgstat_info = rel->pgstat_info; + + if (pgstat_info != NULL) + { + /* We have to log the effect at the proper transactional level */ + int nest_level = GetCurrentTransactionNestLevel(); + + if (pgstat_info->trans == NULL || + pgstat_info->trans->nest_level != nest_level) + add_tabstat_xact_level(pgstat_info, nest_level); + + pgstat_info->trans->tuples_updated++; + + /* t_tuples_hot_updated is nontransactional, so just advance it */ + if (hot) + pgstat_info->t_counts.t_tuples_hot_updated++; + } +} + +/* + * pgstat_count_heap_delete - count a tuple deletion + */ +void +pgstat_count_heap_delete(Relation rel) +{ + PgStat_TableStatus *pgstat_info = rel->pgstat_info; - if (RegularTabStat.tsa_used == 0) + if (pgstat_info != NULL) { - RegularTabStat.tsa_used++; - RegularTabStat.tsa_messages[0]->m_nentries = 0; + /* We have to log the effect at the proper transactional level */ + int nest_level = GetCurrentTransactionNestLevel(); + + if (pgstat_info->trans == NULL || + pgstat_info->trans->nest_level != nest_level) + add_tabstat_xact_level(pgstat_info, nest_level); + + pgstat_info->trans->tuples_deleted++; } } +/* + * pgstat_update_heap_dead_tuples - update dead-tuples count + * + * The semantics of this are that we are reporting the nontransactional + * recovery of "delta" dead tuples; so t_delta_dead_tuples decreases + * rather than increasing, and the change goes straight into the per-table + * counter, not into transactional state. + */ +void +pgstat_update_heap_dead_tuples(Relation rel, int delta) +{ + PgStat_TableStatus *pgstat_info = rel->pgstat_info; + + if (pgstat_info != NULL) + pgstat_info->t_counts.t_delta_dead_tuples -= delta; +} + /* ---------- - * pgstat_count_xact_rollback() - + * AtEOXact_PgStat * - * Called from access/transam/xact.c to count transaction rollbacks. + * Called from access/transam/xact.c at top-level transaction commit/abort. * ---------- */ void -pgstat_count_xact_rollback(void) +AtEOXact_PgStat(bool isCommit) { - if (!pgstat_collect_tuplelevel && - !pgstat_collect_blocklevel) - return; + PgStat_SubXactStatus *xact_state; - pgStatXactRollback++; + /* + * Count transaction commit or abort. (We use counters, not just bools, + * in case the reporting message isn't sent right away.) + */ + if (isCommit) + pgStatXactCommit++; + else + pgStatXactRollback++; /* - * If there was no relation activity yet, just make one existing message - * buffer used without slots, causing the next report to tell new - * xact-counters. + * Transfer transactional insert/update counts into the base tabstat + * entries. We don't bother to free any of the transactional state, since + * it's all in TopTransactionContext and will go away anyway. */ - if (RegularTabStat.tsa_alloc == 0) - more_tabstat_space(&RegularTabStat); + xact_state = pgStatXactStack; + if (xact_state != NULL) + { + PgStat_TableXactStatus *trans; + + Assert(xact_state->nest_level == 1); + Assert(xact_state->prev == NULL); + for (trans = xact_state->first; trans != NULL; trans = trans->next) + { + PgStat_TableStatus *tabstat; + + Assert(trans->nest_level == 1); + Assert(trans->upper == NULL); + tabstat = trans->parent; + Assert(tabstat->trans == trans); + /* count attempted actions regardless of commit/abort */ + tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted; + tabstat->t_counts.t_tuples_updated += trans->tuples_updated; + tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted; + if (isCommit) + { + /* insert adds a live tuple, delete removes one */ + tabstat->t_counts.t_delta_live_tuples += + trans->tuples_inserted - trans->tuples_deleted; + /* update and delete each create a dead tuple */ + tabstat->t_counts.t_delta_dead_tuples += + trans->tuples_updated + trans->tuples_deleted; + /* insert, update, delete each count as one change event */ + tabstat->t_counts.t_changed_tuples += + trans->tuples_inserted + trans->tuples_updated + + trans->tuples_deleted; + } + else + { + /* inserted tuples are dead, deleted tuples are unaffected */ + tabstat->t_counts.t_delta_dead_tuples += + trans->tuples_inserted + trans->tuples_updated; + /* an aborted xact generates no changed_tuple events */ + } + tabstat->trans = NULL; + } + } + pgStatXactStack = NULL; + + /* Make sure any stats snapshot is thrown away */ + pgstat_clear_snapshot(); +} - if (RegularTabStat.tsa_used == 0) +/* ---------- + * AtEOSubXact_PgStat + * + * Called from access/transam/xact.c at subtransaction commit/abort. + * ---------- + */ +void +AtEOSubXact_PgStat(bool isCommit, int nestDepth) +{ + PgStat_SubXactStatus *xact_state; + + /* + * Transfer transactional insert/update counts into the next higher + * subtransaction state. + */ + xact_state = pgStatXactStack; + if (xact_state != NULL && + xact_state->nest_level >= nestDepth) { - RegularTabStat.tsa_used++; - RegularTabStat.tsa_messages[0]->m_nentries = 0; + PgStat_TableXactStatus *trans; + PgStat_TableXactStatus *next_trans; + + /* delink xact_state from stack immediately to simplify reuse case */ + pgStatXactStack = xact_state->prev; + + for (trans = xact_state->first; trans != NULL; trans = next_trans) + { + PgStat_TableStatus *tabstat; + + next_trans = trans->next; + Assert(trans->nest_level == nestDepth); + tabstat = trans->parent; + Assert(tabstat->trans == trans); + if (isCommit) + { + if (trans->upper && trans->upper->nest_level == nestDepth - 1) + { + trans->upper->tuples_inserted += trans->tuples_inserted; + trans->upper->tuples_updated += trans->tuples_updated; + trans->upper->tuples_deleted += trans->tuples_deleted; + tabstat->trans = trans->upper; + pfree(trans); + } + else + { + /* + * When there isn't an immediate parent state, we can just + * reuse the record instead of going through a + * palloc/pfree pushup (this works since it's all in + * TopTransactionContext anyway). We have to re-link it + * into the parent level, though, and that might mean + * pushing a new entry into the pgStatXactStack. + */ + PgStat_SubXactStatus *upper_xact_state; + + upper_xact_state = get_tabstat_stack_level(nestDepth - 1); + trans->next = upper_xact_state->first; + upper_xact_state->first = trans; + trans->nest_level = nestDepth - 1; + } + } + else + { + /* + * On abort, update top-level tabstat counts, then forget the + * subtransaction + */ + + /* count attempted actions regardless of commit/abort */ + tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted; + tabstat->t_counts.t_tuples_updated += trans->tuples_updated; + tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted; + /* inserted tuples are dead, deleted tuples are unaffected */ + tabstat->t_counts.t_delta_dead_tuples += + trans->tuples_inserted + trans->tuples_updated; + tabstat->trans = trans->upper; + pfree(trans); + } + } + pfree(xact_state); } } +/* + * AtPrepare_PgStat + * Save the transactional stats state at 2PC transaction prepare. + * + * In this phase we just generate 2PC records for all the pending + * transaction-dependent stats work. + */ +void +AtPrepare_PgStat(void) +{ + PgStat_SubXactStatus *xact_state; + + xact_state = pgStatXactStack; + if (xact_state != NULL) + { + PgStat_TableXactStatus *trans; + + Assert(xact_state->nest_level == 1); + Assert(xact_state->prev == NULL); + for (trans = xact_state->first; trans != NULL; trans = trans->next) + { + PgStat_TableStatus *tabstat; + TwoPhasePgStatRecord record; + + Assert(trans->nest_level == 1); + Assert(trans->upper == NULL); + tabstat = trans->parent; + Assert(tabstat->trans == trans); + + record.tuples_inserted = trans->tuples_inserted; + record.tuples_updated = trans->tuples_updated; + record.tuples_deleted = trans->tuples_deleted; + record.t_id = tabstat->t_id; + record.t_shared = tabstat->t_shared; + + RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0, + &record, sizeof(TwoPhasePgStatRecord)); + } + } +} + +/* + * PostPrepare_PgStat + * Clean up after successful PREPARE. + * + * All we need do here is unlink the transaction stats state from the + * nontransactional state. The nontransactional action counts will be + * reported to the stats collector immediately, while the effects on live + * and dead tuple counts are preserved in the 2PC state file. + * + * Note: AtEOXact_PgStat is not called during PREPARE. + */ +void +PostPrepare_PgStat(void) +{ + PgStat_SubXactStatus *xact_state; + + /* + * We don't bother to free any of the transactional state, since it's all + * in TopTransactionContext and will go away anyway. + */ + xact_state = pgStatXactStack; + if (xact_state != NULL) + { + PgStat_TableXactStatus *trans; + + for (trans = xact_state->first; trans != NULL; trans = trans->next) + { + PgStat_TableStatus *tabstat; + + tabstat = trans->parent; + tabstat->trans = NULL; + } + } + pgStatXactStack = NULL; + + /* Make sure any stats snapshot is thrown away */ + pgstat_clear_snapshot(); +} + +/* + * 2PC processing routine for COMMIT PREPARED case. + * + * Load the saved counts into our local pgstats state. + */ +void +pgstat_twophase_postcommit(TransactionId xid, uint16 info, + void *recdata, uint32 len) +{ + TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata; + PgStat_TableStatus *pgstat_info; + + /* Find or create a tabstat entry for the rel */ + pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared); + + /* Same math as in AtEOXact_PgStat, commit case */ + pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted; + pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated; + pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted; + pgstat_info->t_counts.t_delta_live_tuples += + rec->tuples_inserted - rec->tuples_deleted; + pgstat_info->t_counts.t_delta_dead_tuples += + rec->tuples_updated + rec->tuples_deleted; + pgstat_info->t_counts.t_changed_tuples += + rec->tuples_inserted + rec->tuples_updated + + rec->tuples_deleted; +} + +/* + * 2PC processing routine for ROLLBACK PREPARED case. + * + * Load the saved counts into our local pgstats state, but treat them + * as aborted. + */ +void +pgstat_twophase_postabort(TransactionId xid, uint16 info, + void *recdata, uint32 len) +{ + TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata; + PgStat_TableStatus *pgstat_info; + + /* Find or create a tabstat entry for the rel */ + pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared); + + /* Same math as in AtEOXact_PgStat, abort case */ + pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted; + pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated; + pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted; + pgstat_info->t_counts.t_delta_dead_tuples += + rec->tuples_inserted + rec->tuples_updated; +} + + /* ---------- * pgstat_fetch_stat_dbentry() - * @@ -1251,6 +2130,35 @@ pgstat_fetch_stat_tabentry(Oid relid) } +/* ---------- + * pgstat_fetch_stat_funcentry() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * the collected statistics for one function or NULL. + * ---------- + */ +PgStat_StatFuncEntry * +pgstat_fetch_stat_funcentry(Oid func_id) +{ + PgStat_StatDBEntry *dbentry; + PgStat_StatFuncEntry *funcentry = NULL; + + /* load the stats file if needed */ + backend_read_statsfile(); + + /* Lookup our database, then find the requested function. */ + dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId); + if (dbentry != NULL && dbentry->functions != NULL) + { + funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions, + (void *) &func_id, + HASH_FIND, NULL); + } + + return funcentry; +} + + /* ---------- * pgstat_fetch_stat_beentry() - * @@ -1288,6 +2196,22 @@ pgstat_fetch_stat_numbackends(void) return localNumBackends; } +/* + * --------- + * pgstat_fetch_global() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * a pointer to the global statistics struct. + * --------- + */ +PgStat_GlobalStats * +pgstat_fetch_global(void) +{ + backend_read_statsfile(); + + return &globalStats; +} + /* ------------------------------------------------------------ * Functions for management of the shared-memory PgBackendStatus array @@ -1296,6 +2220,9 @@ pgstat_fetch_stat_numbackends(void) static PgBackendStatus *BackendStatusArray = NULL; static PgBackendStatus *MyBEEntry = NULL; +static char *BackendClientHostnameBuffer = NULL; +static char *BackendAppnameBuffer = NULL; +static char *BackendActivityBuffer = NULL; /* @@ -1307,19 +2234,29 @@ BackendStatusShmemSize(void) Size size; size = mul_size(sizeof(PgBackendStatus), MaxBackends); + size = add_size(size, + mul_size(NAMEDATALEN, MaxBackends)); + size = add_size(size, + mul_size(pgstat_track_activity_query_size, MaxBackends)); + size = add_size(size, + mul_size(NAMEDATALEN, MaxBackends)); return size; } /* - * Initialize the shared status array during postmaster startup. + * Initialize the shared status array and several string buffers + * during postmaster startup. */ void CreateSharedBackendStatus(void) { - Size size = BackendStatusShmemSize(); + Size size; bool found; + int i; + char *buffer; /* Create or attach to the shared array */ + size = mul_size(sizeof(PgBackendStatus), MaxBackends); BackendStatusArray = (PgBackendStatus *) ShmemInitStruct("Backend Status Array", size, &found); @@ -1330,31 +2267,104 @@ CreateSharedBackendStatus(void) */ MemSet(BackendStatusArray, 0, size); } + + /* Create or attach to the shared appname buffer */ + size = mul_size(NAMEDATALEN, MaxBackends); + BackendAppnameBuffer = (char *) + ShmemInitStruct("Backend Application Name Buffer", size, &found); + + if (!found) + { + MemSet(BackendAppnameBuffer, 0, size); + + /* Initialize st_appname pointers. */ + buffer = BackendAppnameBuffer; + for (i = 0; i < MaxBackends; i++) + { + BackendStatusArray[i].st_appname = buffer; + buffer += NAMEDATALEN; + } + } + + /* Create or attach to the shared client hostname buffer */ + size = mul_size(NAMEDATALEN, MaxBackends); + BackendClientHostnameBuffer = (char *) + ShmemInitStruct("Backend Client Host Name Buffer", size, &found); + + if (!found) + { + MemSet(BackendClientHostnameBuffer, 0, size); + + /* Initialize st_clienthostname pointers. */ + buffer = BackendClientHostnameBuffer; + for (i = 0; i < MaxBackends; i++) + { + BackendStatusArray[i].st_clienthostname = buffer; + buffer += NAMEDATALEN; + } + } + + /* Create or attach to the shared activity buffer */ + size = mul_size(pgstat_track_activity_query_size, MaxBackends); + BackendActivityBuffer = (char *) + ShmemInitStruct("Backend Activity Buffer", size, &found); + + if (!found) + { + MemSet(BackendActivityBuffer, 0, size); + + /* Initialize st_activity pointers. */ + buffer = BackendActivityBuffer; + for (i = 0; i < MaxBackends; i++) + { + BackendStatusArray[i].st_activity = buffer; + buffer += pgstat_track_activity_query_size; + } + } } +/* ---------- + * pgstat_initialize() - + * + * Initialize pgstats state, and set up our on-proc-exit hook. + * Called from InitPostgres. MyBackendId must be set, + * but we must not have started any transaction yet (since the + * exit hook must run after the last transaction exit). + * NOTE: MyDatabaseId isn't set yet; so the shutdown hook has to be careful. + * ---------- + */ +void +pgstat_initialize(void) +{ + /* Initialize MyBEEntry */ + Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends); + MyBEEntry = &BackendStatusArray[MyBackendId - 1]; + + /* Set up a process-exit hook to clean up */ + on_shmem_exit(pgstat_beshutdown_hook, 0); +} + /* ---------- * pgstat_bestart() - * - * Initialize this backend's entry in the PgBackendStatus array, - * and set up an on-proc-exit hook that will clear it again. - * Called from InitPostgres. MyBackendId and MyDatabaseId must be set. + * Initialize this backend's entry in the PgBackendStatus array. + * Called from InitPostgres. + * MyDatabaseId, session userid, and application_name must be set + * (hence, this cannot be combined with pgstat_initialize). * ---------- */ void pgstat_bestart(void) { - volatile PgBackendStatus *beentry; TimestampTz proc_start_timestamp; Oid userid; SockAddr clientaddr; - - Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends); - MyBEEntry = &BackendStatusArray[MyBackendId - 1]; + volatile PgBackendStatus *beentry; /* - * To minimize the time spent modifying the entry, fetch all the needed - * data first. + * To minimize the time spent modifying the PgBackendStatus entry, fetch + * all the needed data first. * * If we have a MyProcPort, use its session start time (for consistency, * and to save a kernel call). @@ -1390,22 +2400,28 @@ pgstat_bestart(void) beentry->st_procpid = MyProcPid; beentry->st_proc_start_timestamp = proc_start_timestamp; beentry->st_activity_start_timestamp = 0; - beentry->st_txn_start_timestamp = 0; + beentry->st_xact_start_timestamp = 0; beentry->st_databaseid = MyDatabaseId; beentry->st_userid = userid; beentry->st_clientaddr = clientaddr; + beentry->st_clienthostname[0] = '\0'; beentry->st_waiting = false; + beentry->st_appname[0] = '\0'; beentry->st_activity[0] = '\0'; - /* Also make sure the last byte in the string area is always 0 */ - beentry->st_activity[PGBE_ACTIVITY_SIZE - 1] = '\0'; + /* Also make sure the last byte in each string area is always 0 */ + beentry->st_clienthostname[NAMEDATALEN - 1] = '\0'; + beentry->st_appname[NAMEDATALEN - 1] = '\0'; + beentry->st_activity[pgstat_track_activity_query_size - 1] = '\0'; beentry->st_changecount++; Assert((beentry->st_changecount & 1) == 0); - /* - * Set up a process-exit hook to clean up. - */ - on_shmem_exit(pgstat_beshutdown_hook, 0); + if (MyProcPort && MyProcPort->remote_hostname) + strlcpy(beentry->st_clienthostname, MyProcPort->remote_hostname, NAMEDATALEN); + + /* Update app name to current GUC setting */ + if (application_name) + pgstat_report_appname(application_name); } /* @@ -1422,7 +2438,14 @@ pgstat_beshutdown_hook(int code, Datum arg) { volatile PgBackendStatus *beentry = MyBEEntry; - pgstat_report_tabstat(); + /* + * If we got as far as discovering our own database ID, we can report what + * we did to the collector. Otherwise, we'd be sending an invalid + * database ID, so forget it. (This means that accesses to pg_database + * during failed backend starts might never get counted.) + */ + if (OidIsValid(MyDatabaseId)) + pgstat_report_stat(true); /* * Clear my status entry, following the protocol of bumping st_changecount @@ -1452,7 +2475,9 @@ pgstat_report_activity(const char *cmd_str) TimestampTz start_timestamp; int len; - if (!pgstat_collect_querystring || !beentry) + TRACE_POSTGRESQL_STATEMENT_STATUS(cmd_str); + + if (!pgstat_track_activities || !beentry) return; /* @@ -1462,7 +2487,7 @@ pgstat_report_activity(const char *cmd_str) start_timestamp = GetCurrentStatementStartTimestamp(); len = strlen(cmd_str); - len = pg_mbcliplen(cmd_str, len, PGBE_ACTIVITY_SIZE - 1); + len = pg_mbcliplen(cmd_str, len, pgstat_track_activity_query_size - 1); /* * Update my status entry, following the protocol of bumping @@ -1479,26 +2504,57 @@ pgstat_report_activity(const char *cmd_str) Assert((beentry->st_changecount & 1) == 0); } +/* ---------- + * pgstat_report_appname() - + * + * Called to update our application name. + * ---------- + */ +void +pgstat_report_appname(const char *appname) +{ + volatile PgBackendStatus *beentry = MyBEEntry; + int len; + + if (!beentry) + return; + + /* This should be unnecessary if GUC did its job, but be safe */ + len = pg_mbcliplen(appname, strlen(appname), NAMEDATALEN - 1); + + /* + * Update my status entry, following the protocol of bumping + * st_changecount before and after. We use a volatile pointer here to + * ensure the compiler doesn't try to get cute. + */ + beentry->st_changecount++; + + memcpy((char *) beentry->st_appname, appname, len); + beentry->st_appname[len] = '\0'; + + beentry->st_changecount++; + Assert((beentry->st_changecount & 1) == 0); +} + /* - * Set the current transaction start timestamp to the specified - * value. If there is no current active transaction, this is signified - * by 0. + * Report current transaction start timestamp as the specified value. + * Zero means there is no active transaction. */ void -pgstat_report_txn_timestamp(TimestampTz tstamp) +pgstat_report_xact_timestamp(TimestampTz tstamp) { volatile PgBackendStatus *beentry = MyBEEntry; - if (!pgstat_collect_querystring || !beentry) + if (!pgstat_track_activities || !beentry) return; /* * Update my status entry, following the protocol of bumping - * st_changecount before and after. We use a volatile pointer - * here to ensure the compiler doesn't try to get cute. + * st_changecount before and after. We use a volatile pointer here to + * ensure the compiler doesn't try to get cute. */ beentry->st_changecount++; - beentry->st_txn_start_timestamp = tstamp; + beentry->st_xact_start_timestamp = tstamp; beentry->st_changecount++; Assert((beentry->st_changecount & 1) == 0); } @@ -1517,7 +2573,7 @@ pgstat_report_waiting(bool waiting) { volatile PgBackendStatus *beentry = MyBEEntry; - if (!pgstat_collect_querystring || !beentry) + if (!pgstat_track_activities || !beentry) return; /* @@ -1542,6 +2598,8 @@ pgstat_read_current_status(void) volatile PgBackendStatus *beentry; PgBackendStatus *localtable; PgBackendStatus *localentry; + char *localappname, + *localactivity; int i; Assert(!pgStatRunningInCollector); @@ -1553,6 +2611,12 @@ pgstat_read_current_status(void) localtable = (PgBackendStatus *) MemoryContextAlloc(pgStatLocalContext, sizeof(PgBackendStatus) * MaxBackends); + localappname = (char *) + MemoryContextAlloc(pgStatLocalContext, + NAMEDATALEN * MaxBackends); + localactivity = (char *) + MemoryContextAlloc(pgStatLocalContext, + pgstat_track_activity_query_size * MaxBackends); localNumBackends = 0; beentry = BackendStatusArray; @@ -1570,11 +2634,20 @@ pgstat_read_current_status(void) { int save_changecount = beentry->st_changecount; - /* - * XXX if PGBE_ACTIVITY_SIZE is really large, it might be best to - * use strcpy not memcpy for copying the activity string? - */ - memcpy(localentry, (char *) beentry, sizeof(PgBackendStatus)); + localentry->st_procpid = beentry->st_procpid; + if (localentry->st_procpid > 0) + { + memcpy(localentry, (char *) beentry, sizeof(PgBackendStatus)); + + /* + * strcpy is safe even if the string is modified concurrently, + * because there's always a \0 at the end of the buffer. + */ + strcpy(localappname, (char *) beentry->st_appname); + localentry->st_appname = localappname; + strcpy(localactivity, (char *) beentry->st_activity); + localentry->st_activity = localactivity; + } if (save_changecount == beentry->st_changecount && (save_changecount & 1) == 0) @@ -1589,6 +2662,8 @@ pgstat_read_current_status(void) if (localentry->st_procpid > 0) { localentry++; + localappname += NAMEDATALEN; + localactivity += pgstat_track_activity_query_size; localNumBackends++; } } @@ -1598,6 +2673,80 @@ pgstat_read_current_status(void) } +/* ---------- + * pgstat_get_backend_current_activity() - + * + * Return a string representing the current activity of the backend with + * the specified PID. This looks directly at the BackendStatusArray, + * and so will provide current information regardless of the age of our + * transaction's snapshot of the status array. + * + * It is the caller's responsibility to invoke this only for backends whose + * state is expected to remain stable while the result is in use. The + * only current use is in deadlock reporting, where we can expect that + * the target backend is blocked on a lock. (There are corner cases + * where the target's wait could get aborted while we are looking at it, + * but the very worst consequence is to return a pointer to a string + * that's been changed, so we won't worry too much.) + * + * Note: return strings for special cases match pg_stat_get_backend_activity. + * ---------- + */ +const char * +pgstat_get_backend_current_activity(int pid, bool checkUser) +{ + PgBackendStatus *beentry; + int i; + + beentry = BackendStatusArray; + for (i = 1; i <= MaxBackends; i++) + { + /* + * Although we expect the target backend's entry to be stable, that + * doesn't imply that anyone else's is. To avoid identifying the + * wrong backend, while we check for a match to the desired PID we + * must follow the protocol of retrying if st_changecount changes + * while we examine the entry, or if it's odd. (This might be + * unnecessary, since fetching or storing an int is almost certainly + * atomic, but let's play it safe.) We use a volatile pointer here to + * ensure the compiler doesn't try to get cute. + */ + volatile PgBackendStatus *vbeentry = beentry; + bool found; + + for (;;) + { + int save_changecount = vbeentry->st_changecount; + + found = (vbeentry->st_procpid == pid); + + if (save_changecount == vbeentry->st_changecount && + (save_changecount & 1) == 0) + break; + + /* Make sure we can break out of loop if stuck... */ + CHECK_FOR_INTERRUPTS(); + } + + if (found) + { + /* Now it is safe to use the non-volatile pointer */ + if (checkUser && !superuser() && beentry->st_userid != GetUserId()) + return ""; + else if (*(beentry->st_activity) == '\0') + return ""; + else + return beentry->st_activity; + } + + beentry++; + } + + /* If we get here, caller is in error ... */ + return ""; +} + + /* ------------------------------------------------------------ * Local support functions follow * ------------------------------------------------------------ @@ -1628,7 +2777,7 @@ pgstat_send(void *msg, int len) { int rc; - if (pgStatSock < 0) + if (pgStatSock == PGINVALID_SOCKET) return; ((PgStat_MsgHdr *) msg)->m_size = len; @@ -1646,6 +2795,38 @@ pgstat_send(void *msg, int len) #endif } +/* ---------- + * pgstat_send_bgwriter() - + * + * Send bgwriter statistics to the collector + * ---------- + */ +void +pgstat_send_bgwriter(void) +{ + /* We assume this initializes to zeroes */ + static const PgStat_MsgBgWriter all_zeroes; + + /* + * This function can be called even if nothing at all has happened. In + * this case, avoid sending a completely empty message to the stats + * collector. + */ + if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0) + return; + + /* + * Prepare and send the message + */ + pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER); + pgstat_send(&BgWriterStats, sizeof(BgWriterStats)); + + /* + * Clear out the statistics buffer, so it can be re-used. + */ + MemSet(&BgWriterStats, 0, sizeof(BgWriterStats)); +} + /* ---------- * PgstatCollectorMain() - @@ -1659,8 +2840,6 @@ pgstat_send(void *msg, int len) NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) { - struct itimerval write_timeout; - bool need_timer = false; int len; PgStat_Msg msg; @@ -1677,11 +2856,13 @@ PgstatCollectorMain(int argc, char *argv[]) MyProcPid = getpid(); /* reset MyProcPid */ + MyStartTime = time(NULL); /* record Start Time for logging */ + /* * If possible, make this process a group leader, so that the postmaster - * can signal any child processes too. (pgstat probably never has - * any child processes, but for consistency we make all postmaster - * child processes do this.) + * can signal any child processes too. (pgstat probably never has any + * child processes, but for consistency we make all postmaster child + * processes do this.) */ #ifdef HAVE_SETSID if (setsid() < 0) @@ -1690,13 +2871,13 @@ PgstatCollectorMain(int argc, char *argv[]) /* * Ignore all signals usually bound to some action in the postmaster, - * except SIGQUIT and SIGALRM. + * except SIGQUIT. */ - pqsignal(SIGHUP, SIG_IGN); + pqsignal(SIGHUP, pgstat_sighup_handler); pqsignal(SIGINT, SIG_IGN); pqsignal(SIGTERM, SIG_IGN); pqsignal(SIGQUIT, pgstat_exit); - pqsignal(SIGALRM, force_statwrite); + pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, SIG_IGN); pqsignal(SIGUSR2, SIG_IGN); @@ -1715,19 +2896,15 @@ PgstatCollectorMain(int argc, char *argv[]) /* * Arrange to write the initial status file right away */ - need_statwrite = true; - - /* Preset the delay between status file writes */ - MemSet(&write_timeout, 0, sizeof(struct itimerval)); - write_timeout.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000; - write_timeout.it_value.tv_usec = (PGSTAT_STAT_INTERVAL % 1000) * 1000; + last_statrequest = GetCurrentTimestamp(); + last_statwrite = last_statrequest - 1; /* * Read in an existing statistics stats file or initialize the stats to * zero. */ pgStatRunningInCollector = true; - pgStatDBHash = pgstat_read_statsfile(InvalidOid); + pgStatDBHash = pgstat_read_statsfile(InvalidOid, true); /* * Setup the descriptor set for select(2). Since only one bit in the set @@ -1742,8 +2919,9 @@ PgstatCollectorMain(int argc, char *argv[]) * death of our parent postmaster. * * For performance reasons, we don't want to do a PostmasterIsAlive() test - * after every message; instead, do it at statwrite time and if - * select()/poll() is interrupted by timeout. + * after every message; instead, do it only when select()/poll() is + * interrupted by timeout. In essence, we'll stay alive as long as + * backends keep sending us stuff often, even if the postmaster is gone. */ for (;;) { @@ -1756,22 +2934,21 @@ PgstatCollectorMain(int argc, char *argv[]) break; /* - * If time to write the stats file, do so. Note that the alarm - * interrupt isn't re-enabled immediately, but only after we next - * receive a stats message; so no cycles are wasted when there is - * nothing going on. + * Reload configuration if we got SIGHUP from the postmaster. */ - if (need_statwrite) + if (got_SIGHUP) { - /* Check for postmaster death; if so we'll write file below */ - if (!PostmasterIsAlive(true)) - break; - - pgstat_write_statsfile(); - need_statwrite = false; - need_timer = true; + ProcessConfigFile(PGC_SIGHUP); + got_SIGHUP = false; } + /* + * Write the stats file if a new request has arrived that is not + * satisfied by existing file. + */ + if (last_statwrite < last_statrequest) + pgstat_write_statsfile(false); + /* * Wait for a message to arrive; but not for more than * PGSTAT_SELECT_TIMEOUT seconds. (This determines how quickly we will @@ -1780,8 +2957,8 @@ PgstatCollectorMain(int argc, char *argv[]) * poll/select call, so this also limits speed of response to SIGQUIT, * which is more important.) * - * We use poll(2) if available, otherwise select(2). - * Win32 has its own implementation. + * We use poll(2) if available, otherwise select(2). Win32 has its own + * implementation. */ #ifndef WIN32 #ifdef HAVE_POLL @@ -1821,9 +2998,9 @@ PgstatCollectorMain(int argc, char *argv[]) got_data = FD_ISSET(pgStatSock, &rfds); #endif /* HAVE_POLL */ -#else /* WIN32 */ +#else /* WIN32 */ got_data = pgwin32_waitforsinglesocket(pgStatSock, FD_READ, - PGSTAT_SELECT_TIMEOUT*1000); + PGSTAT_SELECT_TIMEOUT * 1000); #endif /* @@ -1863,6 +3040,10 @@ PgstatCollectorMain(int argc, char *argv[]) case PGSTAT_MTYPE_DUMMY: break; + case PGSTAT_MTYPE_INQUIRY: + pgstat_recv_inquiry((PgStat_MsgInquiry *) &msg, len); + break; + case PGSTAT_MTYPE_TABSTAT: pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, len); break; @@ -1880,6 +3061,18 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_RESETSHAREDCOUNTER: + pgstat_recv_resetsharedcounter( + (PgStat_MsgResetsharedcounter *) &msg, + len); + break; + + case PGSTAT_MTYPE_RESETSINGLECOUNTER: + pgstat_recv_resetsinglecounter( + (PgStat_MsgResetsinglecounter *) &msg, + len); + break; + case PGSTAT_MTYPE_AUTOVAC_START: pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, len); break; @@ -1892,21 +3085,24 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, len); break; - default: + case PGSTAT_MTYPE_BGWRITER: + pgstat_recv_bgwriter((PgStat_MsgBgWriter *) &msg, len); break; - } - /* - * If this is the first message after we wrote the stats file the - * last time, enable the alarm interrupt to make it be written - * again later. - */ - if (need_timer) - { - if (setitimer(ITIMER_REAL, &write_timeout, NULL)) - ereport(ERROR, - (errmsg("could not set statistics collector timer: %m"))); - need_timer = false; + case PGSTAT_MTYPE_FUNCSTAT: + pgstat_recv_funcstat((PgStat_MsgFuncstat *) &msg, len); + break; + + case PGSTAT_MTYPE_FUNCPURGE: + pgstat_recv_funcpurge((PgStat_MsgFuncpurge *) &msg, len); + break; + + case PGSTAT_MTYPE_RECOVERYCONFLICT: + pgstat_recv_recoveryconflict((PgStat_MsgRecoveryConflict *) &msg, len); + break; + + default: + break; } } else @@ -1923,7 +3119,7 @@ PgstatCollectorMain(int argc, char *argv[]) /* * Save the final stats to reuse at next startup. */ - pgstat_write_statsfile(); + pgstat_write_statsfile(true); exit(0); } @@ -1936,11 +3132,11 @@ pgstat_exit(SIGNAL_ARGS) need_exit = true; } -/* SIGALRM signal handler for collector process */ +/* SIGHUP handler for collector process */ static void -force_statwrite(SIGNAL_ARGS) +pgstat_sighup_handler(SIGNAL_ARGS) { - need_statwrite = true; + got_SIGHUP = true; } @@ -1970,6 +3166,7 @@ pgstat_get_db_entry(Oid databaseid, bool create) HASHCTL hash_ctl; result->tables = NULL; + result->functions = NULL; result->n_xact_commit = 0; result->n_xact_rollback = 0; result->n_blocks_fetched = 0; @@ -1980,6 +3177,13 @@ pgstat_get_db_entry(Oid databaseid, bool create) result->n_tuples_updated = 0; result->n_tuples_deleted = 0; result->last_autovac_time = 0; + result->n_conflict_tablespace = 0; + result->n_conflict_lock = 0; + result->n_conflict_snapshot = 0; + result->n_conflict_bufferpin = 0; + result->n_conflict_startup_deadlock = 0; + + result->stat_reset_timestamp = GetCurrentTimestamp(); memset(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(Oid); @@ -1989,6 +3193,63 @@ pgstat_get_db_entry(Oid databaseid, bool create) PGSTAT_TAB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_FUNCTION); + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry); + hash_ctl.hash = oid_hash; + result->functions = hash_create("Per-database function", + PGSTAT_FUNCTION_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_FUNCTION); + } + + return result; +} + + +/* + * Lookup the hash table entry for the specified table. If no hash + * table entry exists, initialize it, if the create parameter is true. + * Else, return NULL. + */ +static PgStat_StatTabEntry * +pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create) +{ + PgStat_StatTabEntry *result; + bool found; + HASHACTION action = (create ? HASH_ENTER : HASH_FIND); + + /* Lookup or create the hash table entry for this table */ + result = (PgStat_StatTabEntry *) hash_search(dbentry->tables, + &tableoid, + action, &found); + + if (!create && !found) + return NULL; + + /* If not found, initialize the new one. */ + if (!found) + { + result->numscans = 0; + result->tuples_returned = 0; + result->tuples_fetched = 0; + result->tuples_inserted = 0; + result->tuples_updated = 0; + result->tuples_deleted = 0; + result->tuples_hot_updated = 0; + result->n_live_tuples = 0; + result->n_dead_tuples = 0; + result->changes_since_analyze = 0; + result->blocks_fetched = 0; + result->blocks_hit = 0; + result->vacuum_timestamp = 0; + result->vacuum_count = 0; + result->autovac_vacuum_timestamp = 0; + result->autovac_vacuum_count = 0; + result->analyze_timestamp = 0; + result->analyze_count = 0; + result->autovac_analyze_timestamp = 0; + result->autovac_analyze_count = 0; } return result; @@ -1999,37 +3260,55 @@ pgstat_get_db_entry(Oid databaseid, bool create) * pgstat_write_statsfile() - * * Tell the news. + * If writing to the permanent file (happens when the collector is + * shutting down only), remove the temporary file so that backends + * starting up under a new postmaster can't read the old data before + * the new collector is ready. * ---------- */ static void -pgstat_write_statsfile(void) +pgstat_write_statsfile(bool permanent) { HASH_SEQ_STATUS hstat; HASH_SEQ_STATUS tstat; + HASH_SEQ_STATUS fstat; PgStat_StatDBEntry *dbentry; PgStat_StatTabEntry *tabentry; + PgStat_StatFuncEntry *funcentry; FILE *fpout; int32 format_id; + const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; + const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; /* * Open the statistics temp file to write out the current values. */ - fpout = fopen(PGSTAT_STAT_TMPFILE, PG_BINARY_W); + fpout = AllocateFile(tmpfile, PG_BINARY_W); if (fpout == NULL) { ereport(LOG, (errcode_for_file_access(), errmsg("could not open temporary statistics file \"%s\": %m", - PGSTAT_STAT_TMPFILE))); + tmpfile))); return; } + /* + * Set the timestamp of the stats file. + */ + globalStats.stats_timestamp = GetCurrentTimestamp(); + /* * Write the file header --- currently just a format ID. */ format_id = PGSTAT_FILE_FORMAT_ID; fwrite(&format_id, sizeof(format_id), 1, fpout); + /* + * Write global stats struct + */ + fwrite(&globalStats, sizeof(globalStats), 1, fpout); + /* * Walk through the database table. */ @@ -2038,8 +3317,8 @@ pgstat_write_statsfile(void) { /* * Write out the DB entry including the number of live backends. We - * don't write the tables pointer since it's of no use to any other - * process. + * don't write the tables or functions pointers, since they're of no + * use to any other process. */ fputc('D', fpout); fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout); @@ -2054,6 +3333,16 @@ pgstat_write_statsfile(void) fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout); } + /* + * Walk through the database's function stats table. + */ + hash_seq_init(&fstat, dbentry->functions); + while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL) + { + fputc('F', fpout); + fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout); + } + /* * Mark the end of this DB */ @@ -2072,26 +3361,58 @@ pgstat_write_statsfile(void) ereport(LOG, (errcode_for_file_access(), errmsg("could not write temporary statistics file \"%s\": %m", - PGSTAT_STAT_TMPFILE))); - fclose(fpout); - unlink(PGSTAT_STAT_TMPFILE); + tmpfile))); + FreeFile(fpout); + unlink(tmpfile); } - else if (fclose(fpout) < 0) + else if (FreeFile(fpout) < 0) { ereport(LOG, (errcode_for_file_access(), errmsg("could not close temporary statistics file \"%s\": %m", - PGSTAT_STAT_TMPFILE))); - unlink(PGSTAT_STAT_TMPFILE); + tmpfile))); + unlink(tmpfile); } - else if (rename(PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME) < 0) + else if (rename(tmpfile, statfile) < 0) { ereport(LOG, (errcode_for_file_access(), errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m", - PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME))); - unlink(PGSTAT_STAT_TMPFILE); + tmpfile, statfile))); + unlink(tmpfile); } + else + { + /* + * Successful write, so update last_statwrite. + */ + last_statwrite = globalStats.stats_timestamp; + + /* + * If there is clock skew between backends and the collector, we could + * receive a stats request time that's in the future. If so, complain + * and reset last_statrequest. Resetting ensures that no inquiry + * message can cause more than one stats file write to occur. + */ + if (last_statrequest > last_statwrite) + { + char *reqtime; + char *mytime; + + /* Copy because timestamptz_to_str returns a static buffer */ + reqtime = pstrdup(timestamptz_to_str(last_statrequest)); + mytime = pstrdup(timestamptz_to_str(last_statwrite)); + elog(LOG, "last_statrequest %s is later than collector's time %s", + reqtime, mytime); + pfree(reqtime); + pfree(mytime); + + last_statrequest = last_statwrite; + } + } + + if (permanent) + unlink(pgstat_stat_filename); } @@ -2103,18 +3424,22 @@ pgstat_write_statsfile(void) * ---------- */ static HTAB * -pgstat_read_statsfile(Oid onlydb) +pgstat_read_statsfile(Oid onlydb, bool permanent) { PgStat_StatDBEntry *dbentry; PgStat_StatDBEntry dbbuf; PgStat_StatTabEntry *tabentry; PgStat_StatTabEntry tabbuf; + PgStat_StatFuncEntry funcbuf; + PgStat_StatFuncEntry *funcentry; HASHCTL hash_ctl; HTAB *dbhash; HTAB *tabhash = NULL; + HTAB *funchash = NULL; FILE *fpin; int32 format_id; bool found; + const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; /* * The tables will live in pgStatLocalContext. @@ -2132,22 +3457,55 @@ pgstat_read_statsfile(Oid onlydb) dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + /* + * Clear out global statistics so they start from zero in case we can't + * load an existing statsfile. + */ + memset(&globalStats, 0, sizeof(globalStats)); + + /* + * Set the current timestamp (will be kept only in case we can't load an + * existing statsfile. + */ + globalStats.stat_reset_timestamp = GetCurrentTimestamp(); + /* * Try to open the status file. If it doesn't exist, the backends simply * return zero for anything and the collector simply starts from scratch * with empty counters. + * + * ENOENT is a possibility if the stats collector is not running or has + * not yet written the stats file the first time. Any other failure + * condition is suspicious. */ - if ((fpin = AllocateFile(PGSTAT_STAT_FILENAME, PG_BINARY_R)) == NULL) + if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL) + { + if (errno != ENOENT) + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errcode_for_file_access(), + errmsg("could not open statistics file \"%s\": %m", + statfile))); return dbhash; + } + + /* + * Verify it's of the expected format. + */ + if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) + || format_id != PGSTAT_FILE_FORMAT_ID) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", statfile))); + goto done; + } /* - * Verify it's of the expected format. + * Read global stats struct */ - if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) - || format_id != PGSTAT_FILE_FORMAT_ID) + if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats)) { ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted pgstat.stat file"))); + (errmsg("corrupted statistics file \"%s\"", statfile))); goto done; } @@ -2161,15 +3519,16 @@ pgstat_read_statsfile(Oid onlydb) { /* * 'D' A PgStat_StatDBEntry struct describing a database - * follows. Subsequently, zero to many 'T' entries will follow - * until a 'd' is encountered. + * follows. Subsequently, zero to many 'T' and 'F' entries + * will follow until a 'd' is encountered. */ case 'D': if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables), fpin) != offsetof(PgStat_StatDBEntry, tables)) { ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted pgstat.stat file"))); + (errmsg("corrupted statistics file \"%s\"", + statfile))); goto done; } @@ -2183,12 +3542,14 @@ pgstat_read_statsfile(Oid onlydb) if (found) { ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted pgstat.stat file"))); + (errmsg("corrupted statistics file \"%s\"", + statfile))); goto done; } memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry)); dbentry->tables = NULL; + dbentry->functions = NULL; /* * Don't collect tables if not the requested DB (or the @@ -2209,13 +3570,23 @@ pgstat_read_statsfile(Oid onlydb) dbentry->tables = hash_create("Per-database table", PGSTAT_TAB_HASH_SIZE, &hash_ctl, - HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry); + hash_ctl.hash = oid_hash; + hash_ctl.hcxt = pgStatLocalContext; + dbentry->functions = hash_create("Per-database function", + PGSTAT_FUNCTION_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); /* - * Arrange that following 'T's add entries to this database's - * tables hash table. + * Arrange that following records add entries to this + * database's hash tables. */ tabhash = dbentry->tables; + funchash = dbentry->functions; break; /* @@ -2223,6 +3594,7 @@ pgstat_read_statsfile(Oid onlydb) */ case 'd': tabhash = NULL; + funchash = NULL; break; /* @@ -2233,7 +3605,8 @@ pgstat_read_statsfile(Oid onlydb) fpin) != sizeof(PgStat_StatTabEntry)) { ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted pgstat.stat file"))); + (errmsg("corrupted statistics file \"%s\"", + statfile))); goto done; } @@ -2250,13 +3623,48 @@ pgstat_read_statsfile(Oid onlydb) if (found) { ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted pgstat.stat file"))); + (errmsg("corrupted statistics file \"%s\"", + statfile))); goto done; } memcpy(tabentry, &tabbuf, sizeof(tabbuf)); break; + /* + * 'F' A PgStat_StatFuncEntry follows. + */ + case 'F': + if (fread(&funcbuf, 1, sizeof(PgStat_StatFuncEntry), + fpin) != sizeof(PgStat_StatFuncEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + /* + * Skip if function belongs to a not requested database. + */ + if (funchash == NULL) + break; + + funcentry = (PgStat_StatFuncEntry *) hash_search(funchash, + (void *) &funcbuf.functionid, + HASH_ENTER, &found); + + if (found) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + memcpy(funcentry, &funcbuf, sizeof(funcbuf)); + break; + /* * 'E' The EOF marker of a complete stats file. */ @@ -2265,7 +3673,8 @@ pgstat_read_statsfile(Oid onlydb) default: ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted pgstat.stat file"))); + (errmsg("corrupted statistics file \"%s\"", + statfile))); goto done; } } @@ -2273,9 +3682,70 @@ pgstat_read_statsfile(Oid onlydb) done: FreeFile(fpin); + if (permanent) + unlink(PGSTAT_STAT_PERMANENT_FILENAME); + return dbhash; } +/* ---------- + * pgstat_read_statsfile_timestamp() - + * + * Attempt to fetch the timestamp of an existing stats file. + * Returns TRUE if successful (timestamp is stored at *ts). + * ---------- + */ +static bool +pgstat_read_statsfile_timestamp(bool permanent, TimestampTz *ts) +{ + PgStat_GlobalStats myGlobalStats; + FILE *fpin; + int32 format_id; + const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; + + /* + * Try to open the status file. As above, anything but ENOENT is worthy + * of complaining about. + */ + if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL) + { + if (errno != ENOENT) + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errcode_for_file_access(), + errmsg("could not open statistics file \"%s\": %m", + statfile))); + return false; + } + + /* + * Verify it's of the expected format. + */ + if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) + || format_id != PGSTAT_FILE_FORMAT_ID) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", statfile))); + FreeFile(fpin); + return false; + } + + /* + * Read global stats struct + */ + if (fread(&myGlobalStats, 1, sizeof(myGlobalStats), fpin) != sizeof(myGlobalStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", statfile))); + FreeFile(fpin); + return false; + } + + *ts = myGlobalStats.stats_timestamp; + + FreeFile(fpin); + return true; +} + /* * If not already done, read the statistics collector stats file into * some hash tables. The results will be kept until pgstat_clear_snapshot() @@ -2284,16 +3754,63 @@ done: static void backend_read_statsfile(void) { + TimestampTz min_ts; + int count; + /* already read it? */ if (pgStatDBHash) return; Assert(!pgStatRunningInCollector); + /* + * We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL msec + * before now. This indirectly ensures that the collector needn't write + * the file more often than PGSTAT_STAT_INTERVAL. In an autovacuum + * worker, however, we want a lower delay to avoid using stale data, so we + * use PGSTAT_RETRY_DELAY (since the number of worker is low, this + * shouldn't be a problem). + * + * Note that we don't recompute min_ts after sleeping; so we might end up + * accepting a file a bit older than PGSTAT_STAT_INTERVAL. In practice + * that shouldn't happen, though, as long as the sleep time is less than + * PGSTAT_STAT_INTERVAL; and we don't want to lie to the collector about + * what our cutoff time really is. + */ + if (IsAutoVacuumWorkerProcess()) + min_ts = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + -PGSTAT_RETRY_DELAY); + else + min_ts = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + -PGSTAT_STAT_INTERVAL); + + /* + * Loop until fresh enough stats file is available or we ran out of time. + * The stats inquiry message is sent repeatedly in case collector drops + * it. + */ + for (count = 0; count < PGSTAT_POLL_LOOP_COUNT; count++) + { + TimestampTz file_ts = 0; + + CHECK_FOR_INTERRUPTS(); + + if (pgstat_read_statsfile_timestamp(false, &file_ts) && + file_ts >= min_ts) + break; + + /* Not there or too old, so kick the collector and wait a bit */ + pgstat_send_inquiry(min_ts); + pg_usleep(PGSTAT_RETRY_DELAY * 1000L); + } + + if (count >= PGSTAT_POLL_LOOP_COUNT) + elog(WARNING, "pgstat wait timeout"); + /* Autovacuum launcher wants stats about all databases */ if (IsAutoVacuumLauncherProcess()) - pgStatDBHash = pgstat_read_statsfile(InvalidOid); + pgStatDBHash = pgstat_read_statsfile(InvalidOid, false); else - pgStatDBHash = pgstat_read_statsfile(MyDatabaseId); + pgStatDBHash = pgstat_read_statsfile(MyDatabaseId, false); } @@ -2318,7 +3835,7 @@ pgstat_setup_memcxt(void) /* ---------- * pgstat_clear_snapshot() - * - * Discard any data collected in the current transaction. Any subsequent + * Discard any data collected in the current transaction. Any subsequent * request will cause new snapshots to be read. * * This is also invoked during transaction commit or abort to discard @@ -2340,6 +3857,20 @@ pgstat_clear_snapshot(void) } +/* ---------- + * pgstat_recv_inquiry() - + * + * Process stat inquiry requests. + * ---------- + */ +static void +pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len) +{ + if (msg->inquiry_time > last_statrequest) + last_statrequest = msg->inquiry_time; +} + + /* ---------- * pgstat_recv_tabstat() - * @@ -2349,7 +3880,6 @@ pgstat_clear_snapshot(void) static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len) { - PgStat_TableEntry *tabmsg = &(msg->m_entry[0]); PgStat_StatDBEntry *dbentry; PgStat_StatTabEntry *tabentry; int i; @@ -2368,8 +3898,10 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len) */ for (i = 0; i < msg->m_nentries; i++) { + PgStat_TableEntry *tabmsg = &(msg->m_entry[i]); + tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables, - (void *) &(tabmsg[i].t_id), + (void *) &(tabmsg->t_id), HASH_ENTER, &found); if (!found) @@ -2378,60 +3910,62 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len) * If it's a new table entry, initialize counters to the values we * just got. */ - tabentry->numscans = tabmsg[i].t_numscans; - tabentry->tuples_returned = tabmsg[i].t_tuples_returned; - tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched; - tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted; - tabentry->tuples_updated = tabmsg[i].t_tuples_updated; - tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted; - - tabentry->n_live_tuples = tabmsg[i].t_tuples_inserted; - tabentry->n_dead_tuples = tabmsg[i].t_tuples_updated + - tabmsg[i].t_tuples_deleted; - tabentry->last_anl_tuples = 0; + tabentry->numscans = tabmsg->t_counts.t_numscans; + tabentry->tuples_returned = tabmsg->t_counts.t_tuples_returned; + tabentry->tuples_fetched = tabmsg->t_counts.t_tuples_fetched; + tabentry->tuples_inserted = tabmsg->t_counts.t_tuples_inserted; + tabentry->tuples_updated = tabmsg->t_counts.t_tuples_updated; + tabentry->tuples_deleted = tabmsg->t_counts.t_tuples_deleted; + tabentry->tuples_hot_updated = tabmsg->t_counts.t_tuples_hot_updated; + tabentry->n_live_tuples = tabmsg->t_counts.t_delta_live_tuples; + tabentry->n_dead_tuples = tabmsg->t_counts.t_delta_dead_tuples; + tabentry->changes_since_analyze = tabmsg->t_counts.t_changed_tuples; + tabentry->blocks_fetched = tabmsg->t_counts.t_blocks_fetched; + tabentry->blocks_hit = tabmsg->t_counts.t_blocks_hit; + tabentry->vacuum_timestamp = 0; + tabentry->vacuum_count = 0; tabentry->autovac_vacuum_timestamp = 0; + tabentry->autovac_vacuum_count = 0; tabentry->analyze_timestamp = 0; + tabentry->analyze_count = 0; tabentry->autovac_analyze_timestamp = 0; - - tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched; - tabentry->blocks_hit = tabmsg[i].t_blocks_hit; + tabentry->autovac_analyze_count = 0; } else { /* * Otherwise add the values to the existing entry. */ - tabentry->numscans += tabmsg[i].t_numscans; - tabentry->tuples_returned += tabmsg[i].t_tuples_returned; - tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched; - tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted; - tabentry->tuples_updated += tabmsg[i].t_tuples_updated; - tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted; - - tabentry->n_live_tuples += tabmsg[i].t_tuples_inserted - - tabmsg[i].t_tuples_deleted; - tabentry->n_dead_tuples += tabmsg[i].t_tuples_updated + - tabmsg[i].t_tuples_deleted; - - tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched; - tabentry->blocks_hit += tabmsg[i].t_blocks_hit; + tabentry->numscans += tabmsg->t_counts.t_numscans; + tabentry->tuples_returned += tabmsg->t_counts.t_tuples_returned; + tabentry->tuples_fetched += tabmsg->t_counts.t_tuples_fetched; + tabentry->tuples_inserted += tabmsg->t_counts.t_tuples_inserted; + tabentry->tuples_updated += tabmsg->t_counts.t_tuples_updated; + tabentry->tuples_deleted += tabmsg->t_counts.t_tuples_deleted; + tabentry->tuples_hot_updated += tabmsg->t_counts.t_tuples_hot_updated; + tabentry->n_live_tuples += tabmsg->t_counts.t_delta_live_tuples; + tabentry->n_dead_tuples += tabmsg->t_counts.t_delta_dead_tuples; + tabentry->changes_since_analyze += tabmsg->t_counts.t_changed_tuples; + tabentry->blocks_fetched += tabmsg->t_counts.t_blocks_fetched; + tabentry->blocks_hit += tabmsg->t_counts.t_blocks_hit; } - /* - * Add table stats to the database entry. - */ - dbentry->n_tuples_returned += tabmsg[i].t_tuples_returned; - dbentry->n_tuples_fetched += tabmsg[i].t_tuples_fetched; - dbentry->n_tuples_inserted += tabmsg[i].t_tuples_inserted; - dbentry->n_tuples_updated += tabmsg[i].t_tuples_updated; - dbentry->n_tuples_deleted += tabmsg[i].t_tuples_deleted; + /* Clamp n_live_tuples in case of negative delta_live_tuples */ + tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0); + /* Likewise for n_dead_tuples */ + tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0); /* - * And add the block IO to the database entry. + * Add per-table stats to the per-database entry, too. */ - dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched; - dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit; + dbentry->n_tuples_returned += tabmsg->t_counts.t_tuples_returned; + dbentry->n_tuples_fetched += tabmsg->t_counts.t_tuples_fetched; + dbentry->n_tuples_inserted += tabmsg->t_counts.t_tuples_inserted; + dbentry->n_tuples_updated += tabmsg->t_counts.t_tuples_updated; + dbentry->n_tuples_deleted += tabmsg->t_counts.t_tuples_deleted; + dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched; + dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit; } } @@ -2492,6 +4026,8 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len) { if (dbentry->tables != NULL) hash_destroy(dbentry->tables); + if (dbentry->functions != NULL) + hash_destroy(dbentry->functions); if (hash_search(pgStatDBHash, (void *) &(dbentry->databaseid), @@ -2529,12 +4065,28 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len) */ if (dbentry->tables != NULL) hash_destroy(dbentry->tables); + if (dbentry->functions != NULL) + hash_destroy(dbentry->functions); dbentry->tables = NULL; + dbentry->functions = NULL; + + /* + * Reset database-level stats too. This should match the initialization + * code in pgstat_get_db_entry(). + */ dbentry->n_xact_commit = 0; dbentry->n_xact_rollback = 0; dbentry->n_blocks_fetched = 0; dbentry->n_blocks_hit = 0; + dbentry->n_tuples_returned = 0; + dbentry->n_tuples_fetched = 0; + dbentry->n_tuples_inserted = 0; + dbentry->n_tuples_updated = 0; + dbentry->n_tuples_deleted = 0; + dbentry->last_autovac_time = 0; + + dbentry->stat_reset_timestamp = GetCurrentTimestamp(); memset(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(Oid); @@ -2544,33 +4096,82 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len) PGSTAT_TAB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_FUNCTION); + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry); + hash_ctl.hash = oid_hash; + dbentry->functions = hash_create("Per-database function", + PGSTAT_FUNCTION_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_FUNCTION); } /* ---------- - * pgstat_recv_autovac() - + * pgstat_recv_resetshared() - * - * Process an autovacuum signalling message. + * Reset some shared statistics of the cluster. * ---------- */ static void -pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len) +pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len) { - PgStat_StatDBEntry *dbentry; + if (msg->m_resettarget == RESET_BGWRITER) + { + /* Reset the global background writer statistics for the cluster. */ + memset(&globalStats, 0, sizeof(globalStats)); + globalStats.stat_reset_timestamp = GetCurrentTimestamp(); + } /* - * Lookup the database in the hashtable. Don't create the entry if it - * doesn't exist, because autovacuum may be processing a template - * database. If this isn't the case, the database is most likely to have - * an entry already. (If it doesn't, not much harm is done anyway -- - * it'll get created as soon as somebody actually uses the database.) + * Presumably the sender of this message validated the target, don't + * complain here if it's not valid */ +} + +/* ---------- + * pgstat_recv_resetsinglecounter() - + * + * Reset a statistics for a single object + * ---------- + */ +static void +pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len) +{ + PgStat_StatDBEntry *dbentry; + dbentry = pgstat_get_db_entry(msg->m_databaseid, false); - if (dbentry == NULL) + + if (!dbentry) return; + /* Set the reset timestamp for the whole database */ + dbentry->stat_reset_timestamp = GetCurrentTimestamp(); + + /* Remove object if it exists, ignore it if not */ + if (msg->m_resettype == RESET_TABLE) + (void) hash_search(dbentry->tables, (void *) &(msg->m_objectid), + HASH_REMOVE, NULL); + else if (msg->m_resettype == RESET_FUNCTION) + (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid), + HASH_REMOVE, NULL); +} + +/* ---------- + * pgstat_recv_autovac() - + * + * Process an autovacuum signalling message. + * ---------- + */ +static void +pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len) +{ + PgStat_StatDBEntry *dbentry; + /* - * Store the last autovacuum time in the database entry. + * Store the last autovacuum time in the database's hashtable entry. */ + dbentry = pgstat_get_db_entry(msg->m_databaseid, true); + dbentry->last_autovac_time = msg->m_start_time; } @@ -2587,38 +4188,25 @@ pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len) PgStat_StatTabEntry *tabentry; /* - * Don't create either the database or table entry if it doesn't already - * exist. This avoids bloating the stats with entries for stuff that is - * only touched by vacuum and not by live operations. + * Store the data in the table's hashtable entry. */ - dbentry = pgstat_get_db_entry(msg->m_databaseid, false); - if (dbentry == NULL) - return; + dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - tabentry = hash_search(dbentry->tables, &(msg->m_tableoid), - HASH_FIND, NULL); - if (tabentry == NULL) - return; + tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true); - if (msg->m_autovacuum) - tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime; - else - tabentry->vacuum_timestamp = msg->m_vacuumtime; tabentry->n_live_tuples = msg->m_tuples; + /* Resetting dead_tuples to 0 is an approximation ... */ tabentry->n_dead_tuples = 0; - if (msg->m_analyze) + + if (msg->m_autovacuum) { - tabentry->last_anl_tuples = msg->m_tuples; - if (msg->m_autovacuum) - tabentry->autovac_analyze_timestamp = msg->m_vacuumtime; - else - tabentry->analyze_timestamp = msg->m_vacuumtime; + tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime; + tabentry->autovac_vacuum_count++; } else { - /* last_anl_tuples must never exceed n_live_tuples */ - tabentry->last_anl_tuples = Min(tabentry->last_anl_tuples, - msg->m_tuples); + tabentry->vacuum_timestamp = msg->m_vacuumtime; + tabentry->vacuum_count++; } } @@ -2635,24 +4223,169 @@ pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len) PgStat_StatTabEntry *tabentry; /* - * Don't create either the database or table entry if it doesn't already - * exist. This avoids bloating the stats with entries for stuff that is - * only touched by analyze and not by live operations. + * Store the data in the table's hashtable entry. */ - dbentry = pgstat_get_db_entry(msg->m_databaseid, false); - if (dbentry == NULL) - return; + dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - tabentry = hash_search(dbentry->tables, &(msg->m_tableoid), - HASH_FIND, NULL); - if (tabentry == NULL) - return; + tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true); + + tabentry->n_live_tuples = msg->m_live_tuples; + tabentry->n_dead_tuples = msg->m_dead_tuples; + + /* + * We reset changes_since_analyze to zero, forgetting any changes that + * occurred while the ANALYZE was in progress. + */ + tabentry->changes_since_analyze = 0; if (msg->m_autovacuum) + { tabentry->autovac_analyze_timestamp = msg->m_analyzetime; + tabentry->autovac_analyze_count++; + } else + { tabentry->analyze_timestamp = msg->m_analyzetime; - tabentry->n_live_tuples = msg->m_live_tuples; - tabentry->n_dead_tuples = msg->m_dead_tuples; - tabentry->last_anl_tuples = msg->m_live_tuples + msg->m_dead_tuples; + tabentry->analyze_count++; + } +} + + +/* ---------- + * pgstat_recv_bgwriter() - + * + * Process a BGWRITER message. + * ---------- + */ +static void +pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len) +{ + globalStats.timed_checkpoints += msg->m_timed_checkpoints; + globalStats.requested_checkpoints += msg->m_requested_checkpoints; + globalStats.buf_written_checkpoints += msg->m_buf_written_checkpoints; + globalStats.buf_written_clean += msg->m_buf_written_clean; + globalStats.maxwritten_clean += msg->m_maxwritten_clean; + globalStats.buf_written_backend += msg->m_buf_written_backend; + globalStats.buf_fsync_backend += msg->m_buf_fsync_backend; + globalStats.buf_alloc += msg->m_buf_alloc; +} + +/* ---------- + * pgstat_recv_recoveryconflict() - + * + * Process as RECOVERYCONFLICT message. + * ---------- + */ +static void +pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len) +{ + PgStat_StatDBEntry *dbentry; + + dbentry = pgstat_get_db_entry(msg->m_databaseid, true); + + switch (msg->m_reason) + { + case PROCSIG_RECOVERY_CONFLICT_DATABASE: + + /* + * Since we drop the information about the database as soon as it + * replicates, there is no point in counting these conflicts. + */ + break; + case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: + dbentry->n_conflict_tablespace++; + break; + case PROCSIG_RECOVERY_CONFLICT_LOCK: + dbentry->n_conflict_lock++; + break; + case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: + dbentry->n_conflict_snapshot++; + break; + case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: + dbentry->n_conflict_bufferpin++; + break; + case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: + dbentry->n_conflict_startup_deadlock++; + break; + } +} + +/* ---------- + * pgstat_recv_funcstat() - + * + * Count what the backend has done. + * ---------- + */ +static void +pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len) +{ + PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]); + PgStat_StatDBEntry *dbentry; + PgStat_StatFuncEntry *funcentry; + int i; + bool found; + + dbentry = pgstat_get_db_entry(msg->m_databaseid, true); + + /* + * Process all function entries in the message. + */ + for (i = 0; i < msg->m_nentries; i++, funcmsg++) + { + funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions, + (void *) &(funcmsg->f_id), + HASH_ENTER, &found); + + if (!found) + { + /* + * If it's a new function entry, initialize counters to the values + * we just got. + */ + funcentry->f_numcalls = funcmsg->f_numcalls; + funcentry->f_time = funcmsg->f_time; + funcentry->f_time_self = funcmsg->f_time_self; + } + else + { + /* + * Otherwise add the values to the existing entry. + */ + funcentry->f_numcalls += funcmsg->f_numcalls; + funcentry->f_time += funcmsg->f_time; + funcentry->f_time_self += funcmsg->f_time_self; + } + } +} + +/* ---------- + * pgstat_recv_funcpurge() - + * + * Arrange for dead function removal. + * ---------- + */ +static void +pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len) +{ + PgStat_StatDBEntry *dbentry; + int i; + + dbentry = pgstat_get_db_entry(msg->m_databaseid, false); + + /* + * No need to purge if we don't even know the database. + */ + if (!dbentry || !dbentry->functions) + return; + + /* + * Process all function entries in the message. + */ + for (i = 0; i < msg->m_nentries; i++) + { + /* Remove from hashtable if present; we don't care if it's not. */ + (void) hash_search(dbentry->functions, + (void *) &(msg->m_functionid[i]), + HASH_REMOVE, NULL); + } }