#include "access/xact.h"
#include "catalog/pg_database.h"
#include "catalog/pg_proc.h"
+#include "lib/ilist.h"
#include "libpq/ip.h"
#include "libpq/libpq.h"
#include "libpq/pqsignal.h"
* Paths for the statistics files (relative to installation's $PGDATA).
* ----------
*/
-#define PGSTAT_STAT_PERMANENT_FILENAME "global/pgstat.stat"
-#define PGSTAT_STAT_PERMANENT_TMPFILE "global/pgstat.tmp"
+#define PGSTAT_STAT_PERMANENT_DIRECTORY "pg_stat"
+#define PGSTAT_STAT_PERMANENT_FILENAME "pg_stat/global.stat"
+#define PGSTAT_STAT_PERMANENT_TMPFILE "pg_stat/global.tmp"
/* ----------
* Timer definitions.
* Built from GUC parameter
* ----------
*/
+char *pgstat_stat_directory = NULL;
char *pgstat_stat_filename = NULL;
char *pgstat_stat_tmpname = NULL;
*/
static PgStat_GlobalStats globalStats;
-/* Last time the collector successfully wrote the stats file */
-static TimestampTz last_statwrite;
+/*
+ * Write request info for each database.  pgstat_recv_inquiry keeps at most
+ * one entry per database, linked into the last_statrequests list.
+ */
+typedef struct DBWriteRequest
+{
+ Oid databaseid; /* OID of the database to write */
+ TimestampTz request_time; /* timestamp of the last write request */
+ slist_node next; /* list link used by last_statrequests */
+} DBWriteRequest;
-/* Latest statistics request time from backends */
-static TimestampTz last_statrequest;
+/* Latest statistics request times from backends */
+static slist_head last_statrequests = SLIST_STATIC_INIT(last_statrequests);
static volatile bool need_exit = false;
static volatile bool got_SIGHUP = false;
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
Oid tableoid, bool create);
-static void pgstat_write_statsfile(bool permanent);
-static HTAB *pgstat_read_statsfile(Oid onlydb, bool permanent);
+static void pgstat_write_statsfiles(bool permanent, bool allDbs);
+static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
+static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
+static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
static void backend_read_statsfile(void);
static void pgstat_read_current_status(void);
+static bool pgstat_write_statsfile_needed(void);
+static bool pgstat_db_requested(Oid databaseid);
+
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
static void pgstat_send_funcstats(void);
static HTAB *pgstat_collect_oids(Oid catalogid);
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
-
/* ------------------------------------------------------------
* Public functions called from postmaster follow
* ------------------------------------------------------------
SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
}
+/*
+ * pgstat_reset_remove_files() -
+ *
+ *	Subroutine for pgstat_reset_all: unlink every entry found in the
+ *	given directory, skipping only "." and "..".
+ *
+ *	'directory' is the path of the stats directory to clean out.  The
+ *	result of unlink() is not checked.
+ */
+static void
+pgstat_reset_remove_files(const char *directory)
+{
+	DIR		   *dir;
+	struct dirent *entry;
+	char		fname[MAXPGPATH];
+
+	/*
+	 * Scan the directory we were asked to clean.  It is important to use
+	 * the caller's argument here rather than pgstat_stat_directory, because
+	 * pgstat_reset_all also calls us for the permanent stats directory.
+	 */
+	dir = AllocateDir(directory);
+	while ((entry = ReadDir(dir, directory)) != NULL)
+	{
+		if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
+			continue;
+
+		/* XXX should we try to ignore files other than the ones we write? */
+
+		snprintf(fname, MAXPGPATH, "%s/%s", directory, entry->d_name);
+		unlink(fname);
+	}
+	FreeDir(dir);
+}
+
/*
* pgstat_reset_all() -
*
- * Remove the stats file. This is currently used only if WAL
+ * Remove the stats files. This is currently used only if WAL
* recovery is needed after a crash.
+ *
+ * pgstat_reset_remove_files() is asked to clean out both the temporary
+ * stats directory (pgstat_stat_directory) and the permanent one
+ * (PGSTAT_STAT_PERMANENT_DIRECTORY).
*/
void
pgstat_reset_all(void)
{
- unlink(pgstat_stat_filename);
- unlink(PGSTAT_STAT_PERMANENT_FILENAME);
+ pgstat_reset_remove_files(pgstat_stat_directory);
+ pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY);
}
#ifdef EXEC_BACKEND
* ----------
*/
static void
-pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time)
+pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid)
{
PgStat_MsgInquiry msg;
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_INQUIRY);
msg.clock_time = clock_time;
msg.cutoff_time = cutoff_time;
+ msg.databaseid = databaseid; /* database whose stats file the sender wants written */
pgstat_send(&msg, sizeof(msg));
}
elog(FATAL, "setsid() failed: %m");
#endif
- InitializeLatchSupport(); /* needed for latch waits */
+ InitializeLatchSupport(); /* needed for latch waits */
/* Initialize private latch for use by signal handlers */
InitLatch(&pgStatLatch);
*/
init_ps_display("stats collector process", "", "", "");
- /*
- * Arrange to write the initial status file right away
- */
- last_statrequest = GetCurrentTimestamp();
- last_statwrite = last_statrequest - 1;
-
/*
* Read in an existing statistics stats file or initialize the stats to
* zero.
*/
pgStatRunningInCollector = true;
- pgStatDBHash = pgstat_read_statsfile(InvalidOid, true);
+ pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
/*
* Loop to process messages until we get SIGQUIT or detect ungraceful
* Write the stats file if a new request has arrived that is not
* satisfied by existing file.
*/
- if (last_statwrite < last_statrequest)
- pgstat_write_statsfile(false);
+ if (pgstat_write_statsfile_needed())
+ pgstat_write_statsfiles(false, false);
/*
* Try to receive and process a message. This will not block,
/*
* Save the final stats to reuse at next startup.
*/
- pgstat_write_statsfile(true);
+ pgstat_write_statsfiles(true, true);
exit(0);
}
errno = save_errno;
}
+/*
+ * reset_dbentry_counters() -
+ *
+ *	Put a database entry back into its initial state: every counter is
+ *	zeroed, the reset timestamp is set to the current time, and new, empty
+ *	hash tables are created for the per-table and per-function stats.
+ *
+ *	The entry's previous 'tables' and 'functions' pointers are overwritten
+ *	here, not destroyed.
+ */
+static void
+reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
+{
+	HASHCTL		ctl;
+
+	/* Timestamps: reset happens now, nothing has been written yet. */
+	dbentry->stat_reset_timestamp = GetCurrentTimestamp();
+	dbentry->stats_timestamp = 0;
+	dbentry->last_autovac_time = 0;
+
+	/* Transaction and block counters. */
+	dbentry->n_xact_commit = 0;
+	dbentry->n_xact_rollback = 0;
+	dbentry->n_blocks_fetched = 0;
+	dbentry->n_blocks_hit = 0;
+	dbentry->n_block_read_time = 0;
+	dbentry->n_block_write_time = 0;
+
+	/* Tuple counters. */
+	dbentry->n_tuples_returned = 0;
+	dbentry->n_tuples_fetched = 0;
+	dbentry->n_tuples_inserted = 0;
+	dbentry->n_tuples_updated = 0;
+	dbentry->n_tuples_deleted = 0;
+
+	/* Recovery conflict counters. */
+	dbentry->n_conflict_tablespace = 0;
+	dbentry->n_conflict_lock = 0;
+	dbentry->n_conflict_snapshot = 0;
+	dbentry->n_conflict_bufferpin = 0;
+	dbentry->n_conflict_startup_deadlock = 0;
+
+	/* Temp file and deadlock counters. */
+	dbentry->n_temp_files = 0;
+	dbentry->n_temp_bytes = 0;
+	dbentry->n_deadlocks = 0;
+
+	/* Fresh hash table for per-table stats, keyed by OID. */
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.hash = oid_hash;
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(PgStat_StatTabEntry);
+	dbentry->tables = hash_create("Per-database table",
+								  PGSTAT_TAB_HASH_SIZE,
+								  &ctl,
+								  HASH_ELEM | HASH_FUNCTION);
+
+	/* Same again for per-function stats; only the entry size differs. */
+	ctl.hash = oid_hash;
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(PgStat_StatFuncEntry);
+	dbentry->functions = hash_create("Per-database function",
+									 PGSTAT_FUNCTION_HASH_SIZE,
+									 &ctl,
+									 HASH_ELEM | HASH_FUNCTION);
+}
/*
* Lookup the hash table entry for the specified database. If no hash
if (!create && !found)
return NULL;
- /* If not found, initialize the new one. */
+ /*
+ * If not found, initialize the new one. This creates empty hash tables
+ * for tables and functions, too.
+ */
if (!found)
- {
- HASHCTL hash_ctl;
-
- result->tables = NULL;
- result->functions = NULL;
- result->n_xact_commit = 0;
- result->n_xact_rollback = 0;
- result->n_blocks_fetched = 0;
- result->n_blocks_hit = 0;
- result->n_tuples_returned = 0;
- result->n_tuples_fetched = 0;
- result->n_tuples_inserted = 0;
- result->n_tuples_updated = 0;
- result->n_tuples_deleted = 0;
- result->last_autovac_time = 0;
- result->n_conflict_tablespace = 0;
- result->n_conflict_lock = 0;
- result->n_conflict_snapshot = 0;
- result->n_conflict_bufferpin = 0;
- result->n_conflict_startup_deadlock = 0;
- result->n_temp_files = 0;
- result->n_temp_bytes = 0;
- result->n_deadlocks = 0;
- result->n_block_read_time = 0;
- result->n_block_write_time = 0;
-
- result->stat_reset_timestamp = GetCurrentTimestamp();
-
- memset(&hash_ctl, 0, sizeof(hash_ctl));
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
- hash_ctl.hash = oid_hash;
- result->tables = hash_create("Per-database table",
- PGSTAT_TAB_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_FUNCTION);
-
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
- hash_ctl.hash = oid_hash;
- result->functions = hash_create("Per-database function",
- PGSTAT_FUNCTION_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_FUNCTION);
- }
+ reset_dbentry_counters(result);
return result;
}
/* ----------
- * pgstat_write_statsfile() -
+ * pgstat_write_statsfiles() -
+ * Write the global statistics file, as well as requested DB files.
*
- * Tell the news.
- * If writing to the permanent file (happens when the collector is
- * shutting down only), remove the temporary file so that backends
+ * If writing to the permanent files (happens when the collector is
+ * shutting down only), remove the temporary files so that backends
* starting up under a new postmaster can't read the old data before
* the new collector is ready.
+ *
+ * When 'allDbs' is false, only the requested databases (listed in
+ * last_statrequests) will be written; otherwise, all databases will be
+ * written.
* ----------
*/
static void
-pgstat_write_statsfile(bool permanent)
+pgstat_write_statsfiles(bool permanent, bool allDbs)
{
HASH_SEQ_STATUS hstat;
- HASH_SEQ_STATUS tstat;
- HASH_SEQ_STATUS fstat;
PgStat_StatDBEntry *dbentry;
- PgStat_StatTabEntry *tabentry;
- PgStat_StatFuncEntry *funcentry;
FILE *fpout;
int32 format_id;
const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
int rc;
+ elog(DEBUG2, "writing statsfile '%s'", statfile);
+
/*
* Open the statistics temp file to write out the current values.
*/
while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
{
/*
- * Write out the DB entry including the number of live backends. We
- * don't write the tables or functions pointers, since they're of no
- * use to any other process.
- */
- fputc('D', fpout);
- rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
-
- /*
- * Walk through the database's access stats per table.
- */
- hash_seq_init(&tstat, dbentry->tables);
- while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
- {
- fputc('T', fpout);
- rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
- }
-
- /*
- * Walk through the database's function stats table.
+ * Write out the tables and functions into the DB stat file, if
+ * required.
+ *
+ * We need to do this before the dbentry write, to ensure the
+ * timestamps written to both are consistent.
*/
- hash_seq_init(&fstat, dbentry->functions);
- while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
+ if (allDbs || pgstat_db_requested(dbentry->databaseid))
{
- fputc('F', fpout);
- rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
+ dbentry->stats_timestamp = globalStats.stats_timestamp;
+ pgstat_write_db_statsfile(dbentry, permanent);
}
/*
- * Mark the end of this DB
+ * Write out the DB entry. We don't write the tables or functions
+ * pointers, since they're of no use to any other process.
*/
- fputc('d', fpout);
+ fputc('D', fpout);
+ rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
}
/*
tmpfile, statfile)));
unlink(tmpfile);
}
- else
+
+ if (permanent)
+ unlink(pgstat_stat_filename);
+
+ /*
+ * Now throw away the list of requests. Note that requests sent after we
+ * started the write are still waiting on the network socket.
+ */
+ if (!slist_is_empty(&last_statrequests))
{
- /*
- * Successful write, so update last_statwrite.
- */
- last_statwrite = globalStats.stats_timestamp;
+ slist_mutable_iter iter;
- /*
- * If there is clock skew between backends and the collector, we could
- * receive a stats request time that's in the future. If so, complain
- * and reset last_statrequest. Resetting ensures that no inquiry
- * message can cause more than one stats file write to occur.
- */
- if (last_statrequest > last_statwrite)
+ slist_foreach_modify(iter, &last_statrequests)
{
- char *reqtime;
- char *mytime;
+ DBWriteRequest *req;
- /* Copy because timestamptz_to_str returns a static buffer */
- reqtime = pstrdup(timestamptz_to_str(last_statrequest));
- mytime = pstrdup(timestamptz_to_str(last_statwrite));
- elog(LOG, "last_statrequest %s is later than collector's time %s",
- reqtime, mytime);
- pfree(reqtime);
- pfree(mytime);
-
- last_statrequest = last_statwrite;
+ req = slist_container(DBWriteRequest, next, iter.cur);
+ pfree(req);
}
+
+ slist_init(&last_statrequests);
}
+}
- if (permanent)
- unlink(pgstat_stat_filename);
+/*
+ * get_dbstat_filename() -
+ *
+ *	Build the path of a per-database stats file into 'filename', an output
+ *	buffer of 'len' bytes.
+ *
+ *	permanent:	use PGSTAT_STAT_PERMANENT_DIRECTORY rather than
+ *				pgstat_stat_directory as the directory part.
+ *	tempname:	produce the "tmp" suffix instead of "stat".
+ *	databaseid: OID embedded in the file name ("db_<oid>").
+ *
+ *	Raises ERROR if the path does not fit into the buffer.
+ */
+static void
+get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
+					char *filename, int len)
+{
+	int			printed;
+
+	printed = snprintf(filename, len, "%s/db_%u.%s",
+					   permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY :
+					   pgstat_stat_directory,
+					   databaseid,
+					   tempname ? "tmp" : "stat");
+
+	/*
+	 * snprintf reports the length the complete string would have had, not
+	 * counting the terminating NUL.  A result equal to len therefore
+	 * already means the last character was cut off.
+	 */
+	if (printed >= len)
+		elog(ERROR, "overlength pgstat path");
}
+/* ----------
+ * pgstat_write_db_statsfile() -
+ * Write the stat file for a single database.
+ *
+ * The data is written to a temporary file first, which is then renamed
+ * over the database's stat file. The file consists of the format ID, one
+ * 'T' record per table entry, one 'F' record per function entry, and a
+ * closing 'E' marker.
+ *
+ * dbentry: database entry whose table and function hashes are dumped.
+ * permanent: selects the permanent directory instead of the temporary
+ * one when building the file names (see get_dbstat_filename).
+ *
+ * Failures to open, write, close or rename the temporary file are reported
+ * at LOG level rather than as errors.
+ *
+ * If writing to the permanent file (happens when the collector is
+ * shutting down only), remove the temporary file so that backends
+ * starting up under a new postmaster can't read the old data before
+ * the new collector is ready.
+ * ----------
+ */
+static void
+pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
+{
+ HASH_SEQ_STATUS tstat;
+ HASH_SEQ_STATUS fstat;
+ PgStat_StatTabEntry *tabentry;
+ PgStat_StatFuncEntry *funcentry;
+ FILE *fpout;
+ int32 format_id;
+ Oid dbid = dbentry->databaseid;
+ int rc;
+ char tmpfile[MAXPGPATH];
+ char statfile[MAXPGPATH];
+
+ get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH); /* name we write to */
+ get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH); /* name we rename to */
+
+ elog(DEBUG2, "writing statsfile '%s'", statfile);
+
+ /*
+ * Open the statistics temp file to write out the current values.
+ */
+ fpout = AllocateFile(tmpfile, PG_BINARY_W);
+ if (fpout == NULL)
+ {
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("could not open temporary statistics file \"%s\": %m",
+ tmpfile)));
+ return;
+ }
+
+ /*
+ * Write the file header --- currently just a format ID.
+ */
+ format_id = PGSTAT_FILE_FORMAT_ID;
+ rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+
+ /*
+ * Walk through the database's access stats per table.
+ */
+ hash_seq_init(&tstat, dbentry->tables);
+ while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
+ {
+ fputc('T', fpout);
+ rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
+
+ /*
+ * Walk through the database's function stats table.
+ */
+ hash_seq_init(&fstat, dbentry->functions);
+ while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
+ {
+ fputc('F', fpout);
+ rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
+
+ /*
+ * No more output to be done. Write the 'E' end-of-file marker, then
+ * close the temp file and rename it over the database's stat file. The
+ * ferror() check replaces testing for error after each individual fputc
+ * or fwrite above.
+ */
+ fputc('E', fpout);
+
+ if (ferror(fpout))
+ {
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("could not write temporary statistics file \"%s\": %m",
+ tmpfile)));
+ FreeFile(fpout);
+ unlink(tmpfile);
+ }
+ else if (FreeFile(fpout) < 0)
+ {
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("could not close temporary statistics file \"%s\": %m",
+ tmpfile)));
+ unlink(tmpfile);
+ }
+ else if (rename(tmpfile, statfile) < 0)
+ {
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
+ tmpfile, statfile)));
+ unlink(tmpfile);
+ }
+
+ if (permanent)
+ {
+ get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH); /* statfile now names the non-permanent copy */
+
+ elog(DEBUG2, "removing temporary stat file '%s'", statfile);
+ unlink(statfile);
+ }
+}
/* ----------
- * pgstat_read_statsfile() -
+ * pgstat_read_statsfiles() -
*
- * Reads in an existing statistics collector file and initializes the
- * databases' hash table (whose entries point to the tables' hash tables).
+ * Reads in the existing statistics collector files and initializes the
+ * databases' hash table. If the permanent file name is requested (which
+ * only happens in the stats collector itself), also remove the file after
+ * reading; the in-memory status is now authoritative, and the permanent file
+ * would be out of date in case somebody else reads it.
+ *
+ * If a deep read is requested, table/function stats are read also, otherwise
+ * the table/function hash tables remain empty.
* ----------
*/
static HTAB *
-pgstat_read_statsfile(Oid onlydb, bool permanent)
+pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
{
PgStat_StatDBEntry *dbentry;
PgStat_StatDBEntry dbbuf;
- PgStat_StatTabEntry *tabentry;
- PgStat_StatTabEntry tabbuf;
- PgStat_StatFuncEntry funcbuf;
- PgStat_StatFuncEntry *funcentry;
HASHCTL hash_ctl;
HTAB *dbhash;
- HTAB *tabhash = NULL;
- HTAB *funchash = NULL;
FILE *fpin;
int32 format_id;
bool found;
globalStats.stat_reset_timestamp = GetCurrentTimestamp();
/*
- * Try to open the status file. If it doesn't exist, the backends simply
+ * Try to open the stats file. If it doesn't exist, the backends simply
* return zero for anything and the collector simply starts from scratch
* with empty counters.
*
/*
* Verify it's of the expected format.
*/
- if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id)
- || format_id != PGSTAT_FILE_FORMAT_ID)
+ if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
+ format_id != PGSTAT_FILE_FORMAT_ID)
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"", statfile)));
{
/*
* 'D' A PgStat_StatDBEntry struct describing a database
- * follows. Subsequently, zero to many 'T' and 'F' entries
- * will follow until a 'd' is encountered.
+ * follows.
*/
case 'D':
if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
/*
- * Arrange that following records add entries to this
- * database's hash tables.
+ * If requested, read the data from the database-specific
+ * file. If there was onlydb specified (!= InvalidOid), we
+ * would not get here because of a break above. So we don't
+ * need to recheck.
*/
- tabhash = dbentry->tables;
- funchash = dbentry->functions;
- break;
+ if (deep)
+ pgstat_read_db_statsfile(dbentry->databaseid,
+ dbentry->tables,
+ dbentry->functions,
+ permanent);
- /*
- * 'd' End of this database.
- */
- case 'd':
- tabhash = NULL;
- funchash = NULL;
break;
+ case 'E':
+ goto done;
+
+ default:
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+ }
+
+done:
+ FreeFile(fpin);
+
+ /* If requested to read the permanent file, also get rid of it. */
+ if (permanent)
+ {
+ elog(DEBUG2, "removing permanent stats file '%s'", statfile);
+ unlink(statfile);
+ }
+
+ return dbhash;
+}
+
+
+/* ----------
+ * pgstat_read_db_statsfile() -
+ *
+ * Reads in the existing statistics collector file for the given database,
+ * and initializes the tables and functions hash tables.
+ *
+ * As pgstat_read_statsfiles, if the permanent file is requested, it is
+ * removed after reading.
+ * ----------
+ */
+static void
+pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
+ bool permanent)
+{
+ PgStat_StatTabEntry *tabentry;
+ PgStat_StatTabEntry tabbuf;
+ PgStat_StatFuncEntry funcbuf;
+ PgStat_StatFuncEntry *funcentry;
+ FILE *fpin;
+ int32 format_id;
+ bool found;
+ char statfile[MAXPGPATH];
+
+ get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH);
+
+ /*
+ * Try to open the stats file. If it doesn't exist, the backends simply
+ * return zero for anything and the collector simply starts from scratch
+ * with empty counters.
+ *
+ * ENOENT is a possibility if the stats collector is not running or has
+ * not yet written the stats file the first time. Any other failure
+ * condition is suspicious.
+ */
+ if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
+ {
+ if (errno != ENOENT)
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not open statistics file \"%s\": %m",
+ statfile)));
+ return;
+ }
+
+ /*
+ * Verify it's of the expected format.
+ */
+ if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
+ format_id != PGSTAT_FILE_FORMAT_ID)
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"", statfile)));
+ goto done;
+ }
+
+ /*
+ * We found an existing collector stats file. Read it and put all the
+ * hashtable entries into place.
+ */
+ for (;;)
+ {
+ switch (fgetc(fpin))
+ {
/*
* 'T' A PgStat_StatTabEntry follows.
*/
FreeFile(fpin);
if (permanent)
- unlink(PGSTAT_STAT_PERMANENT_FILENAME);
+ {
+ elog(DEBUG2, "removing permanent stats file '%s'", statfile);
+ unlink(statfile);
+ }
- return dbhash;
+ return;
}
/* ----------
- * pgstat_read_statsfile_timestamp() -
+ * pgstat_read_db_statsfile_timestamp() -
+ *
+ * Attempt to determine the timestamp of the last db statfile write.
+ * Returns TRUE if successful; the timestamp is stored in *ts.
+ *
+ * This needs to be careful about handling databases for which no stats file
+ * exists, such as databases without a stat entry or those not yet written:
*
- * Attempt to fetch the timestamp of an existing stats file.
- * Returns TRUE if successful (timestamp is stored at *ts).
+ * - if there's a database entry in the global file, return the corresponding
+ * stats_timestamp value.
+ *
+ * - if there's no db stat entry (e.g. for a new or inactive database),
+ * there's no stat_timestamp value, but also nothing to write so we return
+ * the timestamp of the global statfile.
* ----------
*/
static bool
-pgstat_read_statsfile_timestamp(bool permanent, TimestampTz *ts)
+pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
+ TimestampTz *ts)
{
+ PgStat_StatDBEntry dbentry;
PgStat_GlobalStats myGlobalStats;
FILE *fpin;
int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
/*
- * Try to open the status file. As above, anything but ENOENT is worthy
- * of complaining about.
+ * Try to open the stats file. As above, anything but ENOENT is worthy of
+ * complaining about.
*/
if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
{
/*
* Verify it's of the expected format.
*/
- if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id)
- || format_id != PGSTAT_FILE_FORMAT_ID)
+ if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
+ format_id != PGSTAT_FILE_FORMAT_ID)
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"", statfile)));
/*
* Read global stats struct
*/
- if (fread(&myGlobalStats, 1, sizeof(myGlobalStats), fpin) != sizeof(myGlobalStats))
+ if (fread(&myGlobalStats, 1, sizeof(myGlobalStats),
+ fpin) != sizeof(myGlobalStats))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"", statfile)));
return false;
}
+ /* By default, we're going to return the timestamp of the global file. */
*ts = myGlobalStats.stats_timestamp;
+ /*
+ * We found an existing collector stats file. Read it and look for a
+ * record for the requested database. If found, use its timestamp.
+ */
+ for (;;)
+ {
+ switch (fgetc(fpin))
+ {
+ /*
+ * 'D' A PgStat_StatDBEntry struct describing a database
+ * follows.
+ */
+ case 'D':
+ if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables),
+ fpin) != offsetof(PgStat_StatDBEntry, tables))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+
+ /*
+ * If this is the DB we're looking for, save its timestamp and
+ * we're done.
+ */
+ if (dbentry.databaseid == databaseid)
+ {
+ *ts = dbentry.stats_timestamp;
+ goto done;
+ }
+
+ break;
+
+ case 'E':
+ goto done;
+
+ default:
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+ }
+
+done:
FreeFile(fpin);
return true;
}
CHECK_FOR_INTERRUPTS();
- ok = pgstat_read_statsfile_timestamp(false, &file_ts);
+ ok = pgstat_read_db_statsfile_timestamp(MyDatabaseId, false, &file_ts);
cur_ts = GetCurrentTimestamp();
/* Calculate min acceptable timestamp, if we didn't already */
/*
* We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL
* msec before now. This indirectly ensures that the collector
- * needn't write the file more often than PGSTAT_STAT_INTERVAL.
- * In an autovacuum worker, however, we want a lower delay to
- * avoid using stale data, so we use PGSTAT_RETRY_DELAY (since the
+ * needn't write the file more often than PGSTAT_STAT_INTERVAL. In
+ * an autovacuum worker, however, we want a lower delay to avoid
+ * using stale data, so we use PGSTAT_RETRY_DELAY (since the
* number of workers is low, this shouldn't be a problem).
*
* We don't recompute min_ts after sleeping, except in the
* unlikely case that cur_ts went backwards. So we might end up
- * accepting a file a bit older than PGSTAT_STAT_INTERVAL. In
+ * accepting a file a bit older than PGSTAT_STAT_INTERVAL. In
* practice that shouldn't happen, though, as long as the sleep
* time is less than PGSTAT_STAT_INTERVAL; and we don't want to
* tell the collector that our cutoff time is less than what we'd
pfree(mytime);
}
- pgstat_send_inquiry(cur_ts, min_ts);
+ pgstat_send_inquiry(cur_ts, min_ts, MyDatabaseId);
break;
}
/* Not there or too old, so kick the collector and wait a bit */
if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
- pgstat_send_inquiry(cur_ts, min_ts);
+ pgstat_send_inquiry(cur_ts, min_ts, MyDatabaseId);
pg_usleep(PGSTAT_RETRY_DELAY * 1000L);
}
if (count >= PGSTAT_POLL_LOOP_COUNT)
elog(WARNING, "pgstat wait timeout");
- /* Autovacuum launcher wants stats about all databases */
+ /*
+ * Autovacuum launcher wants stats about all databases, but a shallow read
+ * is sufficient.
+ */
if (IsAutoVacuumLauncherProcess())
- pgStatDBHash = pgstat_read_statsfile(InvalidOid, false);
+ pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
else
- pgStatDBHash = pgstat_read_statsfile(MyDatabaseId, false);
+ pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true);
}
static void
pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
{
+ slist_iter iter;
+ bool found = false;
+ DBWriteRequest *newreq;
+ PgStat_StatDBEntry *dbentry;
+
+ elog(DEBUG2, "received inquiry for %d", msg->databaseid);
+
+ /*
+ * Find the last write request for this DB (found=true in that case).
+ * Plain linear search, not really worth doing any magic here (probably).
+ */
+ slist_foreach(iter, &last_statrequests)
+ {
+ DBWriteRequest *req = slist_container(DBWriteRequest, next, iter.cur);
+
+ if (req->databaseid != msg->databaseid)
+ continue;
+
+ if (msg->cutoff_time > req->request_time)
+ req->request_time = msg->cutoff_time;
+ found = true;
+ return;
+ }
+
/*
- * Advance last_statrequest if this requestor has a newer cutoff time
- * than any previous request.
+ * There's no request for this DB yet, so create one.
*/
- if (msg->cutoff_time > last_statrequest)
- last_statrequest = msg->cutoff_time;
+ newreq = palloc(sizeof(DBWriteRequest));
+
+ newreq->databaseid = msg->databaseid;
+ newreq->request_time = msg->clock_time;
+ slist_push_head(&last_statrequests, &newreq->next);
/*
- * If the requestor's local clock time is older than last_statwrite, we
+ * If the requestor's local clock time is older than stats_timestamp, we
* should suspect a clock glitch, ie system time going backwards; though
* the more likely explanation is just delayed message receipt. It is
* worth expending a GetCurrentTimestamp call to be sure, since a large
* retreat in the system clock reading could otherwise cause us to neglect
* to update the stats file for a long time.
*/
- if (msg->clock_time < last_statwrite)
+ dbentry = pgstat_get_db_entry(msg->databaseid, false);
+ if ((dbentry != NULL) && (msg->clock_time < dbentry->stats_timestamp))
{
TimestampTz cur_ts = GetCurrentTimestamp();
- if (cur_ts < last_statwrite)
+ if (cur_ts < dbentry->stats_timestamp)
{
/*
* Sure enough, time went backwards. Force a new stats file write
char *mytime;
/* Copy because timestamptz_to_str returns a static buffer */
- writetime = pstrdup(timestamptz_to_str(last_statwrite));
+ writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp));
mytime = pstrdup(timestamptz_to_str(cur_ts));
- elog(LOG, "last_statwrite %s is later than collector's time %s",
- writetime, mytime);
+ elog(LOG,
+ "stats_timestamp %s is later than collector's time %s for db %d",
+ writetime, mytime, dbentry->databaseid);
pfree(writetime);
pfree(mytime);
- last_statrequest = cur_ts;
- last_statwrite = last_statrequest - 1;
+ newreq->request_time = cur_ts;
+ dbentry->stats_timestamp = cur_ts - 1;
}
}
}
static void
pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
{
+ Oid dbid = msg->m_databaseid;
PgStat_StatDBEntry *dbentry;
/*
* Lookup the database in the hashtable.
*/
- dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
+ dbentry = pgstat_get_db_entry(dbid, false);
/*
- * If found, remove it.
+ * If found, remove it (along with the db statfile).
*/
if (dbentry)
{
+ char statfile[MAXPGPATH];
+
+ /*
+ * While the collector is running, the per-database stat file lives
+ * in the temporary stats directory; the permanent files are written
+ * only at shutdown and removed again once read at startup. So it is
+ * the non-permanent file that has to be removed here.
+ */
+ get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
+
+ elog(DEBUG2, "removing %s", statfile);
+ unlink(statfile);
+
if (dbentry->tables != NULL)
hash_destroy(dbentry->tables);
if (dbentry->functions != NULL)
hash_destroy(dbentry->functions);
if (hash_search(pgStatDBHash,
- (void *) &(dbentry->databaseid),
+ (void *) &dbid,
HASH_REMOVE, NULL) == NULL)
ereport(ERROR,
- (errmsg("database hash table corrupted "
- "during cleanup --- abort")));
+ (errmsg("database hash table corrupted during cleanup --- abort")));
}
}
static void
pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
{
- HASHCTL hash_ctl;
PgStat_StatDBEntry *dbentry;
/*
dbentry->functions = NULL;
/*
- * Reset database-level stats too. This should match the initialization
- * code in pgstat_get_db_entry().
+ * Reset database-level stats, too. This creates empty hash tables for
+ * tables and functions.
*/
- dbentry->n_xact_commit = 0;
- dbentry->n_xact_rollback = 0;
- dbentry->n_blocks_fetched = 0;
- dbentry->n_blocks_hit = 0;
- dbentry->n_tuples_returned = 0;
- dbentry->n_tuples_fetched = 0;
- dbentry->n_tuples_inserted = 0;
- dbentry->n_tuples_updated = 0;
- dbentry->n_tuples_deleted = 0;
- dbentry->last_autovac_time = 0;
- dbentry->n_temp_bytes = 0;
- dbentry->n_temp_files = 0;
- dbentry->n_deadlocks = 0;
- dbentry->n_block_read_time = 0;
- dbentry->n_block_write_time = 0;
-
- dbentry->stat_reset_timestamp = GetCurrentTimestamp();
-
- memset(&hash_ctl, 0, sizeof(hash_ctl));
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
- hash_ctl.hash = oid_hash;
- dbentry->tables = hash_create("Per-database table",
- PGSTAT_TAB_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_FUNCTION);
-
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
- hash_ctl.hash = oid_hash;
- dbentry->functions = hash_create("Per-database function",
- PGSTAT_FUNCTION_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_FUNCTION);
+ reset_dbentry_counters(dbentry);
}
/* ----------
HASH_REMOVE, NULL);
}
}
+
+/* ----------
+ * pgstat_write_statsfile_needed() -
+ *
+ *	Report whether a stats file write is pending, i.e. whether at least
+ *	one database write request is queued in last_statrequests.
+ * ----------
+ */
+static bool
+pgstat_write_statsfile_needed(void)
+{
+	/* An empty request list means everything was written recently. */
+	return !slist_is_empty(&last_statrequests);
+}
+
+/* ----------
+ * pgstat_db_requested() -
+ *
+ *	Tell whether a write of the given database's stats file has been
+ *	requested, i.e. whether last_statrequests holds an entry whose
+ *	databaseid matches.
+ * ----------
+ */
+static bool
+pgstat_db_requested(Oid databaseid)
+{
+	bool		requested = false;
+	slist_iter	cursor;
+
+	/* Linear scan of the pending requests; stop at the first match. */
+	slist_foreach(cursor, &last_statrequests)
+	{
+		DBWriteRequest *pending;
+
+		pending = slist_container(DBWriteRequest, next, cursor.cur);
+		if (pending->databaseid == databaseid)
+		{
+			requested = true;
+			break;
+		}
+	}
+
+	return requested;
+}