4 * All the statistics collector stuff hacked up in one big, ugly file.
6 * TODO: - Separate collector, postmaster and backend stuff
7 * into different files.
9 * - Add some automatic call for pgstat vacuuming.
11 * - Add a pgstat config column to pg_database, so this
12 * entire thing can be enabled/disabled on a per db basis.
14 * Copyright (c) 2001-2003, PostgreSQL Global Development Group
16 * $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.76 2004/06/26 16:32:02 tgl Exp $
23 #include <sys/param.h>
25 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
34 #include "access/heapam.h"
35 #include "access/xact.h"
36 #include "catalog/catname.h"
37 #include "catalog/pg_database.h"
38 #include "catalog/pg_shadow.h"
39 #include "libpq/libpq.h"
40 #include "libpq/pqsignal.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "postmaster/postmaster.h"
44 #include "storage/backendid.h"
45 #include "storage/ipc.h"
46 #include "storage/pg_shmem.h"
47 #include "storage/pmsignal.h"
48 #include "tcop/tcopprot.h"
49 #include "utils/hsearch.h"
50 #include "utils/memutils.h"
51 #include "utils/ps_status.h"
52 #include "utils/rel.h"
53 #include "utils/syscache.h"
/*
 * File-level constants and process-local state.
 *
 * PGSTAT_STAT_INTERVAL is in milliseconds; PGSTAT_DESTROY_COUNT expresses
 * PGSTAT_DESTROY_DELAY as a whole number of those intervals (integer
 * division). PGSTAT_RESTART_INTERVAL is compared against differences of
 * time(NULL) values by the routine that starts the collector, i.e. it is
 * in seconds. PGSTAT_RECVBUFFERSZ reserves room for 1024 PgStat_Msg
 * messages.
 */
57 * Paths for the statistics files. The %s is replaced with the
58 * installation's $PGDATA.
61 #define PGSTAT_STAT_FILENAME "%s/global/pgstat.stat"
62 #define PGSTAT_STAT_TMPFILE "%s/global/pgstat.tmp.%d"
68 #define PGSTAT_STAT_INTERVAL 500 /* How often to write the status
69 * file; in milliseconds. */
71 #define PGSTAT_DESTROY_DELAY 10000 /* How long to keep destroyed
72 * objects known, to give delayed
73 * UDP packets time to arrive;
76 #define PGSTAT_DESTROY_COUNT (PGSTAT_DESTROY_DELAY / PGSTAT_STAT_INTERVAL)
78 #define PGSTAT_RESTART_INTERVAL 60 /* How often to attempt to restart
79 * a failed statistics collector;
83 * Amount of space reserved in pgstat_recvbuffer().
86 #define PGSTAT_RECVBUFFERSZ ((int) (1024 * sizeof(PgStat_Msg)))
89 * The initial size hints for the hash tables used in the collector.
92 #define PGSTAT_DB_HASH_SIZE 16
93 #define PGSTAT_BE_HASH_SIZE 512
94 #define PGSTAT_TAB_HASH_SIZE 512
101 bool pgstat_collect_startcollector = true;
102 bool pgstat_collect_resetonpmstart = true;
103 bool pgstat_collect_querystring = false;
104 bool pgstat_collect_tuplelevel = false;
105 bool pgstat_collect_blocklevel = false;
111 NON_EXEC_STATIC int pgStatSock = -1;
112 static int pgStatPipe[2];
113 static struct sockaddr_storage pgStatAddr;
115 static time_t last_pgstat_start_time;
117 static long pgStatNumMessages = 0;
119 static bool pgStatRunningInCollector = FALSE;
121 static int pgStatTabstatAlloc = 0;
122 static int pgStatTabstatUsed = 0;
123 static PgStat_MsgTabstat **pgStatTabstatMessages = NULL;
125 #define TABSTAT_QUANTUM 4 /* we alloc this many at a time */
/*
 * Commit/rollback counts accumulated since the last tabstat report;
 * pgstat_report_tabstat() attaches them to the outgoing message and
 * resets them to zero.
 */
127 static int pgStatXactCommit = 0;
128 static int pgStatXactRollback = 0;
/*
 * In backends, pgStatDBHash, pgStatBeTable and pgStatNumBackends hold a
 * copy of the stats file that is reread whenever the current transaction
 * differs from pgStatDBHashXact. The collector loads pgStatDBHash at
 * startup and also keeps pgStatBeDead, its hash of terminated backends.
 */
130 static TransactionId pgStatDBHashXact = InvalidTransactionId;
131 static HTAB *pgStatDBHash = NULL;
132 static HTAB *pgStatBeDead = NULL;
133 static PgStat_StatBeEntry *pgStatBeTable = NULL;
134 static int pgStatNumBackends = 0;
/* Stats file path, and the temporary file path (set only in the collector). */
136 static char pgStat_fname[MAXPGPATH];
137 static char pgStat_tmpfname[MAXPGPATH];
/*
 * Forward declarations. STATS_PROCESS_TYPE selects which child process
 * (STAT_PROC_BUFFER or STAT_PROC_COLLECTOR) pgstat_forkexec() launches.
 */
141 * Local function forward declarations
146 typedef enum STATS_PROCESS_TYPE
150 } STATS_PROCESS_TYPE;
152 static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
153 static void pgstat_parseArgs(int argc, char *argv[]);
/*
 * Entry points of the buffer and collector child processes; per their
 * header comments, argc/argv are valid only in the EXEC_BACKEND case.
 */
157 NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
158 NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
159 static void pgstat_recvbuffer(void);
160 static void pgstat_exit(SIGNAL_ARGS);
161 static void pgstat_die(SIGNAL_ARGS);
163 static int pgstat_add_backend(PgStat_MsgHdr *msg);
164 static void pgstat_sub_backend(int procpid);
165 static void pgstat_drop_database(Oid databaseid);
166 static void pgstat_write_statsfile(void);
167 static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
168 PgStat_StatBeEntry **betab,
171 static void pgstat_setheader(PgStat_MsgHdr *hdr, int mtype);
172 static void pgstat_send(void *msg, int len);
/* Collector-side handlers for the individual message types. */
174 static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
175 static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
176 static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
177 static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
178 static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
179 static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
180 static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
/*
 * pgstat_init() -- postmaster startup. Forces the collector on when any
 * collection option is enabled, sets pgStat_fname, and removes the old
 * stats file when no collector will run or pgstat_collect_resetonpmstart
 * is set. It then walks the "localhost" addresses looking for one on which
 * a UDP socket can be created, bound, connected to itself, and used to
 * pass a one-byte test message. On failure the socket is closed and the
 * startcollector/querystring/tuplelevel/blocklevel GUC variables are
 * cleared, so statistics collection is disabled instead of failing
 * postmaster startup.
 */
183 /* ------------------------------------------------------------
184 * Public functions called from postmaster follow
185 * ------------------------------------------------------------
191 * Called from postmaster at startup. Create the resources required
192 * by the statistics collector process. If unable to do so, do not
193 * fail --- better to let the postmaster start with stats collection
200 ACCEPT_TYPE_ARG3 alen;
201 struct addrinfo *addrs = NULL,
210 #define TESTBYTEVAL ((char) 199)
213 * Force start of collector daemon if something to collect
215 if (pgstat_collect_querystring ||
216 pgstat_collect_tuplelevel ||
217 pgstat_collect_blocklevel)
218 pgstat_collect_startcollector = true;
221 * Initialize the filename for the status reports. (In the EXEC_BACKEND
222 * case, this only sets the value in the postmaster. The collector
223 * subprocess will recompute the value for itself, and individual
224 * backends must do so also if they want to access the file.)
226 snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
229 * If we don't have to start a collector or should reset the collected
230 * statistics on postmaster start, simply remove the file.
232 if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
233 unlink(pgStat_fname);
236 * Nothing else required if collector will not get started
238 if (!pgstat_collect_startcollector)
242 * Create the UDP socket for sending and receiving statistic messages
244 hints.ai_flags = AI_PASSIVE;
245 hints.ai_family = PF_UNSPEC;
246 hints.ai_socktype = SOCK_DGRAM;
247 hints.ai_protocol = 0;
248 hints.ai_addrlen = 0;
249 hints.ai_addr = NULL;
250 hints.ai_canonname = NULL;
251 hints.ai_next = NULL;
252 ret = getaddrinfo_all("localhost", NULL, &hints, &addrs);
256 (errmsg("could not resolve \"localhost\": %s",
257 gai_strerror(ret))));
262 * On some platforms, getaddrinfo_all() may return multiple addresses
263 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
264 * when kernel will reject IPv6). Worse, the failure may occur at the
265 * bind() or perhaps even connect() stage. So we must loop through the
266 * results till we find a working combination. We will generate LOG
267 * messages, but no error, for bogus combinations.
269 for (addr = addrs; addr; addr = addr->ai_next)
271 #ifdef HAVE_UNIX_SOCKETS
272 /* Ignore AF_UNIX sockets, if any are returned. */
273 if (addr->ai_family == AF_UNIX)
279 if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
282 (errcode_for_socket_access(),
283 errmsg("could not create socket for statistics collector: %m")));
288 * Bind it to a kernel assigned port on localhost and get the assigned
289 * port via getsockname().
291 if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
294 (errcode_for_socket_access(),
295 errmsg("could not bind socket for statistics collector: %m")));
296 closesocket(pgStatSock);
301 alen = sizeof(pgStatAddr);
302 if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
305 (errcode_for_socket_access(),
306 errmsg("could not get address of socket for statistics collector: %m")));
307 closesocket(pgStatSock);
313 * Connect the socket to its own address. This saves a few cycles by
314 * not having to respecify the target address on every send. This also
315 * provides a kernel-level check that only packets from this same
316 * address will be received.
318 if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
321 (errcode_for_socket_access(),
322 errmsg("could not connect socket for statistics collector: %m")));
323 closesocket(pgStatSock);
329 * Try to send and receive a one-byte test message on the socket.
330 * This is to catch situations where the socket can be created but
331 * will not actually pass data (for instance, because kernel packet
332 * filtering rules prevent it).
334 test_byte = TESTBYTEVAL;
335 if (send(pgStatSock, &test_byte, 1, 0) != 1)
338 (errcode_for_socket_access(),
339 errmsg("could not send test message on socket for statistics collector: %m")));
340 closesocket(pgStatSock);
346 * There could possibly be a little delay before the message can be
347 * received. We arbitrarily allow up to half a second before deciding
350 for (;;) /* need a loop to handle EINTR */
353 FD_SET(pgStatSock, &rset);
356 sel_res = select(pgStatSock+1, &rset, NULL, NULL, &tv);
357 if (sel_res >= 0 || errno != EINTR)
363 (errcode_for_socket_access(),
364 errmsg("select() failed in statistics collector: %m")));
365 closesocket(pgStatSock);
369 if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
372 * This is the case we actually think is likely, so take pains to
373 * give a specific message for it.
375 * errno will not be set meaningfully here, so don't use it.
378 (ERRCODE_CONNECTION_FAILURE,
379 errmsg("test message did not get through on socket for statistics collector")));
380 closesocket(pgStatSock);
385 test_byte++; /* just make sure variable is changed */
387 if (recv(pgStatSock, &test_byte, 1, 0) != 1)
390 (errcode_for_socket_access(),
391 errmsg("could not receive test message on socket for statistics collector: %m")));
392 closesocket(pgStatSock);
397 if (test_byte != TESTBYTEVAL) /* strictly paranoia ... */
400 (ERRCODE_INTERNAL_ERROR,
401 errmsg("incorrect test message transmission on socket for statistics collector")));
402 closesocket(pgStatSock);
407 /* If we get here, we have a working socket */
411 /* Did we find a working address? */
/*
 * addr is NULL here when the loop above ran off the end of the address
 * list without finding a combination that passed every check.
 */
412 if (!addr || pgStatSock < 0)
415 (errcode_for_socket_access(),
416 errmsg("disabling statistics collector for lack of working socket")));
421 * Set the socket to non-blocking IO. This ensures that if the
422 * collector falls behind (despite the buffering process), statistics
423 * messages will be discarded; backends won't block waiting to send
424 * messages to the collector.
426 if (!set_noblock(pgStatSock))
429 (errcode_for_socket_access(),
430 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
/*
 * NOTE(review): the first freeaddrinfo_all() below appears to end the
 * success path and the second to begin the failure cleanup; the return
 * and label lines separating them are not visible in this excerpt.
 */
434 freeaddrinfo_all(hints.ai_family, addrs);
440 freeaddrinfo_all(hints.ai_family, addrs);
443 closesocket(pgStatSock);
446 /* Adjust GUC variables to suppress useless activity */
447 pgstat_collect_startcollector = false;
448 pgstat_collect_querystring = false;
449 pgstat_collect_tuplelevel = false;
450 pgstat_collect_blocklevel = false;
/*
 * The visible statements append to av[], in order: "postgres", then
 * "-forkbuf" or "-forkcol" depending on procType, then a NULL placeholder
 * (filled in by postmaster_forkexec, per the inline comment), then
 * postgres_exec_path, then the two pgStatPipe fds formatted as decimal
 * strings. The result is whatever postmaster_forkexec() returns.
 */
457 * pgstat_forkexec() -
459 * Format up the arglist for, then fork and exec, statistics
460 * (buffer and collector) processes
463 pgstat_forkexec(STATS_PROCESS_TYPE procType)
466 int ac = 0, bufc = 0, i;
467 char pgstatBuf[2][32];
469 av[ac++] = "postgres";
473 case STAT_PROC_BUFFER:
474 av[ac++] = "-forkbuf";
477 case STAT_PROC_COLLECTOR:
478 av[ac++] = "-forkcol";
485 av[ac++] = NULL; /* filled in by postmaster_forkexec */
487 /* postgres_exec_path is not passed by write_backend_variables */
488 av[ac++] = postgres_exec_path;
490 /* Pipe file ids (those not passed by write_backend_variables) */
/* pgstat_parseArgs() reads these back in the same order. */
491 snprintf(pgstatBuf[bufc++],32,"%d",pgStatPipe[0]);
492 snprintf(pgstatBuf[bufc++],32,"%d",pgStatPipe[1]);
494 /* Add to the arg list */
495 Assert(bufc <= lengthof(pgstatBuf));
496 for (i = 0; i < bufc; i++)
497 av[ac++] = pgstatBuf[i];
500 Assert(ac < lengthof(av));
502 return postmaster_forkexec(ac, av);
/*
 * Mirror image of the tail of pgstat_forkexec()'s argument list: reads
 * postgres_exec_path, pgStatPipe[0] and pgStatPipe[1], using argc as a
 * running index into argv. The starting value of that index is set on
 * lines not visible in this excerpt.
 * NOTE(review): atoi() reports no errors, so a malformed argument would
 * silently become fd 0. strtol() with an end-pointer check would be safer.
 */
507 * pgstat_parseArgs() -
509 * Extract data from the arglist for exec'ed statistics
510 * (buffer and collector) processes
513 pgstat_parseArgs(int argc, char *argv[])
518 StrNCpy(postgres_exec_path, argv[argc++], MAXPGPATH);
519 pgStatPipe[0] = atoi(argv[argc++]);
520 pgStatPipe[1] = atoi(argv[argc++]);
523 #endif /* EXEC_BACKEND */
529 * Called from postmaster at startup or after an existing collector
530 * died. Attempt to fire up a fresh statistics collector.
532 * Returns PID of child process, or 0 if fail.
534 * Note: if fail, we will be called again from the postmaster main loop.
544 * Do nothing if no collector needed
546 if (!pgstat_collect_startcollector)
550 * Do nothing if too soon since last collector start. This is a
551 * safety valve to protect against continuous respawn attempts if the
552 * collector is dying immediately at launch. Note that since we will
553 * be re-called from the postmaster main loop, we will get another
/*
 * If the system clock moved backwards, curtime - last_pgstat_start_time
 * is negative and the unsigned cast turns it into a large value. A
 * restart is then allowed rather than suppressed until the clock
 * catches up.
 */
556 curtime = time(NULL);
557 if ((unsigned int) (curtime - last_pgstat_start_time) <
558 (unsigned int) PGSTAT_RESTART_INTERVAL)
560 last_pgstat_start_time = curtime;
563 * Check that the socket is there, else pgstat_init failed.
568 (errmsg("statistics collector startup skipped")));
571 * We can only get here if someone tries to manually turn
572 * pgstat_collect_startcollector on after it had been off.
574 pgstat_collect_startcollector = false;
579 * Okay, fork off the collector.
586 /* Specific beos actions before backend startup */
587 beos_before_backend_startup();
/*
 * Two alternative switch heads follow: pgstat_forkexec() for exec'ed
 * children and plain fork() otherwise. Presumably only one is compiled,
 * via an EXEC_BACKEND #ifdef not visible in this excerpt.
 */
591 switch ((pgStatPid = pgstat_forkexec(STAT_PROC_BUFFER)))
593 switch ((pgStatPid = fork()))
598 /* Specific beos actions */
599 beos_backend_startup_failed();
602 (errmsg("could not fork statistics buffer: %m")));
607 /* in postmaster child ... */
609 /* Specific beos actions after backend startup */
610 beos_backend_startup();
612 /* Close the postmaster's sockets */
613 ClosePostmasterPorts();
615 /* Drop our connection to postmaster's shared memory, as well */
616 PGSharedMemoryDetach();
/* Forked child becomes the buffer process; argc/argv are unused here. */
618 PgstatBufferMain(0, NULL);
/* Parent: hand the buffer process's PID back to the postmaster. */
623 return (int) pgStatPid;
626 /* shouldn't get here */
634 * Called from postmaster to tell collector a backend terminated.
638 pgstat_beterm(int pid)
640 PgStat_MsgBeterm msg;
/*
 * Build the header by hand instead of using pgstat_setheader(), which
 * stamps m_procpid with MyProcPid. Here the postmaster is reporting on
 * behalf of the terminated backend identified by pid.
 */
645 MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
646 msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
647 msg.m_hdr.m_procpid = pid;
649 pgstat_send(&msg, sizeof(msg));
/*
 * Backend-side reporting. The first routine below sends a
 * PGSTAT_MTYPE_BESTART message announcing this backend.
 * pgstat_report_activity() sends the string 'what' as the backend's
 * current activity. It skips the report when pgstat_collect_querystring
 * is off or there is no socket. Otherwise it clips the text with
 * pg_mbcliplen() to a limit of PGSTAT_ACTIVITY_SIZE - 1 bytes, leaving
 * room for the NUL terminator, and sends only the used part of the
 * message.
 */
653 /* ------------------------------------------------------------
654 * Public functions used by backends follow
655 *------------------------------------------------------------
662 * Tell the collector that this new backend is soon ready to process
663 * queries. Called from tcop/postgres.c before entering the mainloop.
669 PgStat_MsgBestart msg;
674 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
675 pgstat_send(&msg, sizeof(msg));
680 * pgstat_report_activity() -
682 * Called in tcop/postgres.c to tell the collector what the backend
683 * is actually doing (usually "<IDLE>" or the start of the query to
688 pgstat_report_activity(const char *what)
690 PgStat_MsgActivity msg;
693 if (!pgstat_collect_querystring || pgStatSock < 0)
697 len = pg_mbcliplen((const unsigned char *) what, len,
698 PGSTAT_ACTIVITY_SIZE - 1);
700 memcpy(msg.m_what, what, len);
701 msg.m_what[len] = '\0';
702 len += offsetof(PgStat_MsgActivity, m_what) +1;
704 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
705 pgstat_send(&msg, len);
710 * pgstat_report_tabstat() -
712 * Called from tcop/postgres.c to send the so far collected
713 * per table access statistics to the collector.
717 pgstat_report_tabstat(void)
721 if (pgStatSock < 0 ||
722 !(pgstat_collect_querystring ||
723 pgstat_collect_tuplelevel ||
724 pgstat_collect_blocklevel))
726 /* Not reporting stats, so just flush whatever we have */
727 pgStatTabstatUsed = 0;
732 * For each message buffer used during the last query set the header
733 * fields and send it out.
735 for (i = 0; i < pgStatTabstatUsed; i++)
737 PgStat_MsgTabstat *tsmsg = pgStatTabstatMessages[i];
741 n = tsmsg->m_nentries;
/* Send only the fixed part of the message plus the n entries in use. */
742 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
743 n * sizeof(PgStat_TableEntry);
/*
 * Attach the pending transaction counts to this message and reset them,
 * so that each commit/rollback is reported in exactly one message.
 */
745 tsmsg->m_xact_commit = pgStatXactCommit;
746 tsmsg->m_xact_rollback = pgStatXactRollback;
747 pgStatXactCommit = 0;
748 pgStatXactRollback = 0;
750 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
751 pgstat_send(tsmsg, len);
754 pgStatTabstatUsed = 0;
/*
 * Garbage collection for the collector's data. It works from the
 * per-transaction snapshot of the stats file. For tables of the current
 * database whose pg_class entry no longer exists, it sends
 * PGSTAT_MTYPE_TABPURGE messages of up to PGSTAT_NUM_TABPURGE Oids each.
 * It then calls pgstat_drop_database() for every database in the stats
 * hash that is absent from pg_database.
 */
759 * pgstat_vacuum_tabstat() -
761 * Will tell the collector about objects he can get rid of.
765 pgstat_vacuum_tabstat(void)
773 HASH_SEQ_STATUS hstat;
774 PgStat_StatDBEntry *dbentry;
775 PgStat_StatTabEntry *tabentry;
778 PgStat_MsgTabpurge msg;
786 * If not done for this transaction, read the statistics collector
787 * stats file into some hash tables.
789 if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
791 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
792 &pgStatBeTable, &pgStatNumBackends);
793 pgStatDBHashXact = GetCurrentTransactionId();
797 * Lookup our own database entry
799 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
800 (void *) &MyDatabaseId,
805 if (dbentry->tables == NULL)
809 * Initialize our messages table counter to zero
814 * Check for all tables if they still exist.
816 hash_seq_init(&hstat, dbentry->tables);
817 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
820 * Check if this relation is still alive by looking up it's
821 * pg_class tuple in the system catalog cache.
823 reltup = SearchSysCache(RELOID,
824 ObjectIdGetDatum(tabentry->tableid),
826 if (HeapTupleIsValid(reltup))
828 ReleaseSysCache(reltup);
833 * Add this tables Oid to the message
835 msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
839 * If the message is full, send it out and reinitialize ot zero
841 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
843 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
844 +msg.m_nentries * sizeof(Oid);
846 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
847 pgstat_send(&msg, len);
856 if (msg.m_nentries > 0)
858 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
859 +msg.m_nentries * sizeof(Oid);
861 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
862 pgstat_send(&msg, len);
866 * Read pg_database and remember the Oid's of all existing databases
870 dbidlist = (Oid *) palloc(sizeof(Oid) * dbidalloc);
872 dbrel = heap_openr(DatabaseRelationName, AccessShareLock);
873 dbscan = heap_beginscan(dbrel, SnapshotNow, 0, NULL);
874 while ((dbtup = heap_getnext(dbscan, ForwardScanDirection)) != NULL)
876 if (dbidused >= dbidalloc)
879 dbidlist = (Oid *) repalloc((char *) dbidlist,
880 sizeof(Oid) * dbidalloc);
882 dbidlist[dbidused++] = HeapTupleGetOid(dbtup);
884 heap_endscan(dbscan);
885 heap_close(dbrel, AccessShareLock);
888 * Search the database hash table for dead databases and tell the
889 * collector to drop them as well.
891 hash_seq_init(&hstat, pgStatDBHash);
892 while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
894 Oid dbid = dbentry->databaseid;
896 for (i = 0; i < dbidused; i++)
898 if (dbidlist[i] == dbid)
/*
 * Never ask the collector to drop the InvalidOid entry. It presumably
 * holds statistics not tied to any database (TODO confirm).
 */
905 if (dbid != InvalidOid)
908 pgstat_drop_database(dbid);
913 * Free the dbid list.
915 pfree((char *) dbidlist);
918 * Tell the caller how many removeable objects we found
925 * pgstat_drop_database() -
927 * Tell the collector that we just dropped a database.
928 * This is the only message that shouldn't get lost in space. Otherwise
929 * the collector will keep the statistics for the dead DB until his
930 * stats file got removed while the postmaster is down.
934 pgstat_drop_database(Oid databaseid)
936 PgStat_MsgDropdb msg;
941 msg.m_databaseid = databaseid;
/*
 * NOTE(review): pgstat_send() deliberately ignores send() errors, so
 * despite the warning above this message can still be lost.
 */
943 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
944 pgstat_send(&msg, sizeof(msg));
949 * pgstat_reset_counters() -
951 * Tell the statistics collector to reset counters for our database.
955 pgstat_reset_counters(void)
957 PgStat_MsgResetcounter msg;
964 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
965 errmsg("must be superuser to reset statistics counters")));
/*
 * No fields beyond the common header are set here. pgstat_setheader()
 * stores MyDatabaseId in m_databaseid, which identifies "our database".
 */
967 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
968 pgstat_send(&msg, sizeof(msg));
/*
 * Sends a PGSTAT_MTYPE_DUMMY message. No fields beyond the common header
 * are filled in here.
 */
975 * Send some junk data to the collector to increase traffic.
986 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
987 pgstat_send(&msg, sizeof(msg));
/*
 * Allocates TABSTAT_QUANTUM zeroed PgStat_MsgTabstat buffers as one
 * malloc'd chunk and appends pointers to them to pgStatTabstatMessages,
 * growing that array with realloc(). Callers treat a false result as
 * "no space"; for example, pgstat_initstats then sets no_stats. malloc is
 * used rather than palloc, presumably so the buffers survive
 * memory-context resets (TODO confirm).
 */
991 * Create or enlarge the pgStatTabstatMessages array
994 more_tabstat_space(void)
996 PgStat_MsgTabstat *newMessages;
997 PgStat_MsgTabstat **msgArray;
998 int newAlloc = pgStatTabstatAlloc + TABSTAT_QUANTUM;
1001 /* Create (another) quantum of message buffers */
1002 newMessages = (PgStat_MsgTabstat *)
1003 malloc(sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1004 if (newMessages == NULL)
1007 (errcode(ERRCODE_OUT_OF_MEMORY),
1008 errmsg("out of memory")));
1012 /* Create or enlarge the pointer array */
1013 if (pgStatTabstatMessages == NULL)
1014 msgArray = (PgStat_MsgTabstat **)
1015 malloc(sizeof(PgStat_MsgTabstat *) * newAlloc);
1017 msgArray = (PgStat_MsgTabstat **)
1018 realloc(pgStatTabstatMessages,
1019 sizeof(PgStat_MsgTabstat *) * newAlloc);
/*
 * The allocation result goes to the temporary msgArray, so
 * pgStatTabstatMessages stays valid if realloc() fails.
 * NOTE(review): make sure newMessages is freed on this failure path; the
 * lines between this test and the ereport are not visible here.
 */
1020 if (msgArray == NULL)
1024 (errcode(ERRCODE_OUT_OF_MEMORY),
1025 errmsg("out of memory")));
1029 MemSet(newMessages, 0, sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1030 for (i = 0; i < TABSTAT_QUANTUM; i++)
1031 msgArray[pgStatTabstatAlloc + i] = newMessages++;
1032 pgStatTabstatMessages = msgArray;
1033 pgStatTabstatAlloc = newAlloc;
1039 * pgstat_initstats() -
1041 * Called from various places usually dealing with initialization
1042 * of Relation or Scan structures. The data placed into these
1043 * structures from here tell where later to count for buffer reads,
1044 * scans and tuples fetched.
1048 pgstat_initstats(PgStat_Info *stats, Relation rel)
1050 Oid rel_id = rel->rd_id;
1051 PgStat_TableEntry *useent;
1052 PgStat_MsgTabstat *tsmsg;
1057 * Initialize data not to count at all.
1059 stats->tabentry = NULL;
1060 stats->no_stats = FALSE;
1061 stats->heap_scan_counted = FALSE;
1062 stats->index_scan_counted = FALSE;
1064 if (pgStatSock < 0 ||
1065 !(pgstat_collect_tuplelevel ||
1066 pgstat_collect_blocklevel))
1068 stats->no_stats = TRUE;
/*
 * A new buffer is taken below only when every used buffer is full, so
 * buffers fill in order and only the last used one can have a free slot.
 * The loop looks for an existing entry for rel_id. On reaching a buffer
 * that still has room, it claims that buffer's next slot.
 */
1073 * Search the already-used message slots for this relation.
1075 for (mb = 0; mb < pgStatTabstatUsed; mb++)
1077 tsmsg = pgStatTabstatMessages[mb];
1079 for (i = tsmsg->m_nentries; --i >= 0; )
1081 if (tsmsg->m_entry[i].t_id == rel_id)
1083 stats->tabentry = (void *) &(tsmsg->m_entry[i]);
1088 if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
1092 * Not found, but found a message buffer with an empty slot
1093 * instead. Fine, let's use this one.
1095 i = tsmsg->m_nentries++;
1096 useent = &tsmsg->m_entry[i];
1097 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1098 useent->t_id = rel_id;
1099 stats->tabentry = (void *) useent;
/*
 * Every used buffer is full: start a new one, allocating more buffers
 * first if necessary. If that fails, this relation is not counted
 * (no_stats).
 */
1104 * If we ran out of message buffers, we just allocate more.
1106 if (pgStatTabstatUsed >= pgStatTabstatAlloc)
1108 if (!more_tabstat_space())
1110 stats->no_stats = TRUE;
1113 Assert(pgStatTabstatUsed < pgStatTabstatAlloc);
1117 * Use the first entry of the next message buffer.
1119 mb = pgStatTabstatUsed++;
1120 tsmsg = pgStatTabstatMessages[mb];
1121 tsmsg->m_nentries = 1;
1122 useent = &tsmsg->m_entry[0];
1123 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1124 useent->t_id = rel_id;
1125 stats->tabentry = (void *) useent;
1130 * pgstat_count_xact_commit() -
1132 * Called from access/transam/xact.c to count transaction commits.
1136 pgstat_count_xact_commit(void)
1138 if (!(pgstat_collect_querystring ||
1139 pgstat_collect_tuplelevel ||
1140 pgstat_collect_blocklevel))
1146 * If there was no relation activity yet, just make one existing
1147 * message buffer used without slots, causing the next report to tell
1148 * new xact-counters.
/* Make sure at least one message buffer exists before using buffer 0. */
1150 if (pgStatTabstatAlloc == 0)
1152 if (!more_tabstat_space())
/*
 * An empty buffer 0 still makes pgstat_report_tabstat() send a message,
 * and that message is what carries the transaction counters.
 */
1155 if (pgStatTabstatUsed == 0)
1157 pgStatTabstatUsed++;
1158 pgStatTabstatMessages[0]->m_nentries = 0;
1164 * pgstat_count_xact_rollback() -
1166 * Called from access/transam/xact.c to count transaction rollbacks.
1170 pgstat_count_xact_rollback(void)
1172 if (!(pgstat_collect_querystring ||
1173 pgstat_collect_tuplelevel ||
1174 pgstat_collect_blocklevel))
1177 pgStatXactRollback++;
1180 * If there was no relation activity yet, just make one existing
1181 * message buffer used without slots, causing the next report to tell
1182 * new xact-counters.
/*
 * Make sure at least one message buffer exists before using buffer 0.
 * NOTE(review): this tail duplicates the commit counterpart; a shared
 * helper would keep the two in sync.
 */
1184 if (pgStatTabstatAlloc == 0)
1186 if (!more_tabstat_space())
/*
 * An empty buffer 0 still makes pgstat_report_tabstat() send a message,
 * and that message is what carries the transaction counters.
 */
1189 if (pgStatTabstatUsed == 0)
1191 pgStatTabstatUsed++;
1192 pgStatTabstatMessages[0]->m_nentries = 0;
1198 * pgstat_fetch_stat_dbentry() -
1200 * Support function for the SQL-callable pgstat* functions. Returns
1201 * the collected statistics for one database or NULL. NULL doesn't mean
1202 * that the database doesn't exist, it is just not yet known by the
1203 * collector, so the caller is better off to report ZERO instead.
1206 PgStat_StatDBEntry *
1207 pgstat_fetch_stat_dbentry(Oid dbid)
1209 PgStat_StatDBEntry *dbentry;
1212 * If not done for this transaction, read the statistics collector
1213 * stats file into some hash tables. Be careful with the
1214 * read_statsfile() call below!
/*
 * NOTE(review): the file is loaded with onlydb = MyDatabaseId even though
 * dbid may name another database. Presumably onlydb restricts only the
 * per-table data, so other databases' entries are still present; confirm
 * against pgstat_read_statsfile().
 */
1216 if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1218 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1219 &pgStatBeTable, &pgStatNumBackends);
1220 pgStatDBHashXact = GetCurrentTransactionId();
1224 * Lookup the requested database
1226 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1229 if (dbentry == NULL)
1237 * pgstat_fetch_stat_tabentry() -
1239 * Support function for the SQL-callable pgstat* functions. Returns
1240 * the collected statistics for one table or NULL. NULL doesn't mean
1241 * that the table doesn't exist, it is just not yet known by the
1242 * collector, so the caller is better off to report ZERO instead.
1245 PgStat_StatTabEntry *
1246 pgstat_fetch_stat_tabentry(Oid relid)
1248 PgStat_StatDBEntry *dbentry;
1249 PgStat_StatTabEntry *tabentry;
1252 * If not done for this transaction, read the statistics collector
1253 * stats file into some hash tables. Be careful with the
1254 * read_statsfile() call below!
1256 if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1258 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1259 &pgStatBeTable, &pgStatNumBackends);
1260 pgStatDBHashXact = GetCurrentTransactionId();
1264 * Lookup our database.
1266 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1267 (void *) &MyDatabaseId,
/*
 * relid is only looked up among the current database's tables, since
 * the database entry searched for is MyDatabaseId's.
 */
1269 if (dbentry == NULL)
1273 * Now inside the DB's table hash table lookup the requested one.
1275 if (dbentry->tables == NULL)
1277 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1280 if (tabentry == NULL)
1288 * pgstat_fetch_stat_beentry() -
1290 * Support function for the SQL-callable pgstat* functions. Returns
1291 * the actual activity slot of one active backend. The caller is
1292 * responsible for a check if the actual user is permitted to see
1293 * that info (especially the querystring).
1296 PgStat_StatBeEntry *
1297 pgstat_fetch_stat_beentry(int beid)
1299 if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1301 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1302 &pgStatBeTable, &pgStatNumBackends);
1303 pgStatDBHashXact = GetCurrentTransactionId();
/* beid is 1-based: valid ids are 1 .. pgStatNumBackends. */
1306 if (beid < 1 || beid > pgStatNumBackends)
1309 return &pgStatBeTable[beid - 1];
1314 * pgstat_fetch_stat_numbackends() -
1316 * Support function for the SQL-callable pgstat* functions. Returns
1317 * the maximum current backend id.
1321 pgstat_fetch_stat_numbackends(void)
/*
 * This uses the same once-per-transaction reload as the other fetch
 * functions, so within a transaction the count matches the table that
 * pgstat_fetch_stat_beentry() indexes.
 */
1323 if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1325 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1326 &pgStatBeTable, &pgStatNumBackends);
1327 pgStatDBHashXact = GetCurrentTransactionId();
1330 return pgStatNumBackends;
/*
 * Low-level helpers. pgstat_setheader() stamps a message header with the
 * message type and this process's backend id, pid, database and session
 * user. pgstat_send() stores the message length in the header and sends
 * the message on pgStatSock with a single send(), ignoring any failure.
 */
1335 /* ------------------------------------------------------------
1336 * Local support functions follow
1337 * ------------------------------------------------------------
1342 * pgstat_setheader() -
1344 * Set common header fields in a statistics message
1348 pgstat_setheader(PgStat_MsgHdr *hdr, int mtype)
1350 hdr->m_type = mtype;
1351 hdr->m_backendid = MyBackendId;
1352 hdr->m_procpid = MyProcPid;
1353 hdr->m_databaseid = MyDatabaseId;
1354 hdr->m_userid = GetSessionUserId();
1361 * Send out one statistics message to the collector
1365 pgstat_send(void *msg, int len)
1370 ((PgStat_MsgHdr *) msg)->m_size = len;
1372 send(pgStatSock, msg, len, 0);
1373 /* We deliberately ignore any error from send() */
1378 * PgstatBufferMain() -
1380 * Start up the statistics buffer process. This is the body of the
1381 * postmaster child process.
1383 * The argc/argv parameters are valid only in EXEC_BACKEND case.
1386 NON_EXEC_STATIC void
1387 PgstatBufferMain(int argc, char *argv[])
1389 IsUnderPostmaster = true; /* we are a postmaster subprocess now */
1391 MyProcPid = getpid(); /* reset MyProcPid */
1393 /* Lose the postmaster's on-exit routines */
1397 * Ignore all signals usually bound to some action in the postmaster,
1398 * except for SIGCHLD and SIGQUIT --- see pgstat_recvbuffer.
1400 pqsignal(SIGHUP, SIG_IGN);
1401 pqsignal(SIGINT, SIG_IGN);
1402 pqsignal(SIGTERM, SIG_IGN);
1403 pqsignal(SIGQUIT, pgstat_exit);
1404 pqsignal(SIGALRM, SIG_IGN);
1405 pqsignal(SIGPIPE, SIG_IGN);
1406 pqsignal(SIGUSR1, SIG_IGN);
1407 pqsignal(SIGUSR2, SIG_IGN);
1408 pqsignal(SIGCHLD, pgstat_die);
1409 pqsignal(SIGTTIN, SIG_DFL);
1410 pqsignal(SIGTTOU, SIG_DFL);
1411 pqsignal(SIGCONT, SIG_DFL);
1412 pqsignal(SIGWINCH, SIG_DFL);
1413 /* unblock will happen in pgstat_recvbuffer */
/*
 * Recover postgres_exec_path and the pipe fds passed on the command line
 * by pgstat_forkexec(). Presumably this is compiled only for EXEC_BACKEND;
 * the surrounding #ifdef is not visible in this excerpt.
 */
1416 pgstat_parseArgs(argc,argv);
1420 * Start a buffering process to read from the socket, so we have a
1421 * little more time to process incoming messages.
1423 * NOTE: the process structure is: postmaster is parent of buffer process
1424 * is parent of collector process. This way, the buffer can detect
1425 * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
1426 * collector failure until it tried to write on the pipe. That would
1427 * mean that after the postmaster started a new collector, we'd have
1428 * two buffer processes competing to read from the UDP socket --- not
/*
 * After the fork, the buffer process keeps pgStatPipe[1] and closes
 * pgStatPipe[0] (see the parent branch below). The collector reads from
 * pgStatPipe[0].
 */
1431 if (pgpipe(pgStatPipe) < 0)
1434 (errcode_for_socket_access(),
1435 errmsg("could not create pipe for statistics buffer: %m")));
1440 /* child becomes collector process */
1441 switch (pgstat_forkexec(STAT_PROC_COLLECTOR))
1448 (errmsg("could not fork statistics collector: %m")));
1451 #ifndef EXEC_BACKEND
1453 /* child becomes collector process */
1454 PgstatCollectorMain(0, NULL);
1459 /* parent becomes buffer process */
1460 closesocket(pgStatPipe[0]);
1461 pgstat_recvbuffer();
1468 * PgstatCollectorMain() -
1470 * Start up the statistics collector itself. This is the body of the
1471 * postmaster grandchild process; it reads messages forwarded by the buffer process over pgStatPipe.
1473 * The argc/argv parameters are valid only in the EXEC_BACKEND case; otherwise the caller passes 0/NULL.
1476 NON_EXEC_STATIC void
1477 PgstatCollectorMain(int argc, char *argv[])
1484 struct timeval timeout;
1485 struct timeval next_statwrite;
1486 bool need_statwrite;
1489 MyProcPid = getpid(); /* reset MyProcPid */
1492 * Reset signal handling. Except for SIGCHLD (restored to default) and
1493 * SIGQUIT (ignored here, unlike in the buffer process), this is a no-op in the non-EXEC_BACKEND
1494 * case because we'll have inherited these settings from the buffer
1495 * process; but it's not a no-op for EXEC_BACKEND.
1497 pqsignal(SIGHUP, SIG_IGN);
1498 pqsignal(SIGINT, SIG_IGN);
1499 pqsignal(SIGTERM, SIG_IGN);
1500 pqsignal(SIGQUIT, SIG_IGN);
1501 pqsignal(SIGALRM, SIG_IGN);
1502 pqsignal(SIGPIPE, SIG_IGN);
1503 pqsignal(SIGUSR1, SIG_IGN);
1504 pqsignal(SIGUSR2, SIG_IGN);
1505 pqsignal(SIGCHLD, SIG_DFL);
1506 pqsignal(SIGTTIN, SIG_DFL);
1507 pqsignal(SIGTTOU, SIG_DFL);
1508 pqsignal(SIGCONT, SIG_DFL);
1509 pqsignal(SIGWINCH, SIG_DFL);
1510 PG_SETMASK(&UnBlockSig);
1513 pgstat_parseArgs(argc,argv);
1516 /* Close unwanted files */
1517 closesocket(pgStatPipe[1]);
1518 closesocket(pgStatSock);
1521 * Identify myself via ps
1523 init_ps_display("stats collector process", "", "");
1527 * Initialize filenames needed for status reports.
1529 snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
1530 /* tmpfname need only be set correctly in this process */
1531 snprintf(pgStat_tmpfname, MAXPGPATH, PGSTAT_STAT_TMPFILE,
1535 * Arrange to write the initial status file right away
1537 gettimeofday(&next_statwrite, NULL);
1538 need_statwrite = TRUE;
1541 * Read in an existing statistics file or initialize the stats
1544 pgStatRunningInCollector = TRUE;
1545 pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);
1548 * Create the dead backend hashtable
1550 memset(&hash_ctl, 0, sizeof(hash_ctl));
1551 hash_ctl.keysize = sizeof(int); /* keyed by backend PID */
1552 hash_ctl.entrysize = sizeof(PgStat_StatBeDead);
1553 hash_ctl.hash = tag_hash;
1554 pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE,
1555 &hash_ctl, HASH_ELEM | HASH_FUNCTION);
1556 if (pgStatBeDead == NULL)
1558 /* assume the problem is out-of-memory */
1560 (errcode(ERRCODE_OUT_OF_MEMORY),
1561 errmsg("out of memory in statistics collector --- abort")));
1566 * Create the known backends table
1568 pgStatBeTable = (PgStat_StatBeEntry *) malloc(
1569 sizeof(PgStat_StatBeEntry) * MaxBackends);
1570 if (pgStatBeTable == NULL)
1573 (errcode(ERRCODE_OUT_OF_MEMORY),
1574 errmsg("out of memory in statistics collector --- abort")));
1577 memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends);
1579 readPipe = pgStatPipe[0];
1582 * Process incoming messages and handle all the reporting stuff until
1583 * there are no more messages.
1588 * If we need to write the status file again (there have been
1589 * changes in the statistics since we wrote it last) calculate the
1590 * timeout until we have to do so.
1596 gettimeofday(&now, NULL);
1597 /* avoid assuming that tv_sec is signed */
1598 if (now.tv_sec > next_statwrite.tv_sec ||
1599 (now.tv_sec == next_statwrite.tv_sec &&
1600 now.tv_usec >= next_statwrite.tv_usec))
1603 timeout.tv_usec = 0;
1607 timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;
1608 timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;
1609 if (timeout.tv_usec < 0)
1612 timeout.tv_usec += 1000000;
1618 * Set up the descriptor set for select(2)
1621 FD_SET(readPipe, &rfds);
1624 * Now wait for something to do.
1626 nready = select(readPipe+1, &rfds, NULL, NULL,
1627 (need_statwrite) ? &timeout : NULL);
1633 (errcode_for_socket_access(),
1634 errmsg("select() failed in statistics collector: %m")));
1639 * If there are no descriptors ready, our timeout for writing the
1640 * stats file happened.
1644 pgstat_write_statsfile();
1645 need_statwrite = FALSE;
1651 * Check if there is a new statistics message to collect.
1653 if (FD_ISSET(readPipe, &rfds))
1656 * We may need to issue multiple read calls in case the buffer
1657 * process didn't write the message in a single write, which
1658 * is possible since it dumps its buffer bytewise. In any
1659 * case, we'd need two reads since we don't know the message
1663 int targetlen = sizeof(PgStat_MsgHdr); /* initial */
1664 bool pipeEOF = false;
1666 while (nread < targetlen)
1668 len = piperead(readPipe, ((char *) &msg) + nread,
1675 (errcode_for_socket_access(),
1676 errmsg("could not read from statistics collector pipe: %m")));
1679 if (len == 0) /* EOF on the pipe! */
1685 if (nread == sizeof(PgStat_MsgHdr))
1687 /* we have the header, compute actual msg length */
1688 targetlen = msg.msg_hdr.m_size;
1689 if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
1690 targetlen > (int) sizeof(msg))
1693 * Bogus message length implies that we got out of
1694 * sync with the buffer process somehow. Abort so
1695 * that we can restart both processes.
1698 (errmsg("invalid statistics message length")));
1705 * EOF on the pipe implies that the buffer process exited.
1706 * Fall out of outer loop.
1712 * Distribute the message to the specific function handling
1715 switch (msg.msg_hdr.m_type)
1717 case PGSTAT_MTYPE_DUMMY:
1720 case PGSTAT_MTYPE_BESTART:
1721 pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1724 case PGSTAT_MTYPE_BETERM:
1725 pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1728 case PGSTAT_MTYPE_TABSTAT:
1729 pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1732 case PGSTAT_MTYPE_TABPURGE:
1733 pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1736 case PGSTAT_MTYPE_ACTIVITY:
1737 pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1740 case PGSTAT_MTYPE_DROPDB:
1741 pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1744 case PGSTAT_MTYPE_RESETCOUNTER:
1745 pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1754 * Globally count messages.
1756 pgStatNumMessages++;
1759 * If this is the first message after we wrote the stats file
1760 * the last time, set up the timeout for writing it again.
1762 if (!need_statwrite)
1764 gettimeofday(&next_statwrite, NULL);
1765 next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000); /* msec -> usec */
1766 next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
1767 next_statwrite.tv_usec %= 1000000;
1768 need_statwrite = TRUE;
1773 * Note that we do NOT check for postmaster exit inside the loop;
1774 * only EOF on the buffer pipe causes us to fall out. This
1775 * ensures we don't exit prematurely if there are still a few
1776 * messages in the buffer or pipe at postmaster shutdown.
1781 * Okay, we saw EOF on the buffer pipe, so there are no more messages
1782 * to process. If the buffer process quit because of postmaster
1783 * shutdown, we want to save the final stats to reuse at next startup.
1784 * But if the buffer process failed, it seems best not to (there may
1785 * even now be a new collector firing up, and we don't want it to read
1786 * a partially-rewritten stats file).
1788 if (!PostmasterIsAlive(false))
1789 pgstat_write_statsfile();
1794 * pgstat_recvbuffer() -
1796 * This is the body of the separate buffering process. Its only
1797 * purpose is to receive messages from the UDP socket as fast as
1798 * possible and forward them over a pipe into the collector itself.
1799 * If the collector is slow to absorb messages, they are buffered here.
1803 pgstat_recvbuffer(void)
1807 struct timeval timeout;
1808 int writePipe = pgStatPipe[1];
1814 PgStat_Msg input_buffer;
1816 int msg_send = 0; /* next send index in buffer */
1817 int msg_recv = 0; /* next receive index */
1818 int msg_have = 0; /* number of bytes stored */
1819 bool overflow = false;
1822 * Identify myself via ps
1824 init_ps_display("stats buffer process", "", "");
1828 * We want to die if our child collector process does. There are two
1829 * ways we might notice that it has died: receive SIGCHLD, or get a
1830 * write failure on the pipe leading to the child. We can set SIGPIPE
1831 * to kill us here. Our SIGCHLD handler was already set up before we
1832 * forked (must do it that way, else it's a race condition).
1834 pqsignal(SIGPIPE, SIG_DFL);
1835 PG_SETMASK(&UnBlockSig);
1838 * Set the write pipe to nonblock mode, so that we cannot block when
1839 * the collector falls behind.
1841 if (!set_noblock(writePipe))
1844 (errcode_for_socket_access(),
1845 errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
1850 * Allocate the circular message buffer (PGSTAT_RECVBUFFERSZ bytes)
1852 msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ);
1853 if (msgbuffer == NULL)
1856 (errcode(ERRCODE_OUT_OF_MEMORY),
1857 errmsg("out of memory in statistics collector --- abort")));
1871 * As long as we have buffer space we add the socket to the read
1874 if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg))) /* room for one more max-size message */
1876 FD_SET(pgStatSock, &rfds);
1885 (errmsg("statistics buffer is full")));
1891 * If we have messages to write out, we add the pipe to the write
1896 FD_SET(writePipe, &wfds);
1897 if (writePipe > maxfd)
1902 * Wait for some work to do; but not for more than 10 seconds.
1903 * (This determines how quickly we will shut down after an
1904 * ungraceful postmaster termination; so it needn't be very fast.)
1906 timeout.tv_sec = 10;
1907 timeout.tv_usec = 0;
1909 nready = select(maxfd + 1, &rfds, &wfds, NULL, &timeout);
1915 (errcode_for_socket_access(),
1916 errmsg("select() failed in statistics buffer: %m")));
1921 * If there is a message on the socket, read it and check for
1924 if (FD_ISSET(pgStatSock, &rfds))
1926 len = recv(pgStatSock, (char *) &input_buffer,
1927 sizeof(PgStat_Msg), 0);
1931 (errcode_for_socket_access(),
1932 errmsg("could not read statistics message: %m")));
1937 * We ignore messages that are smaller than our common header
1939 if (len < sizeof(PgStat_MsgHdr))
1943 * The received length must match the length in the header
1945 if (input_buffer.msg_hdr.m_size != len)
1949 * O.K. - we accept this message. Copy it to the circular
1955 xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
1959 memcpy(msgbuffer + msg_recv,
1960 ((char *) &input_buffer) + frm,
1963 if (msg_recv == PGSTAT_RECVBUFFERSZ)
1972 * If the collector is ready to receive, write some data into its
1973 * pipe. We may or may not be able to write all that we have.
1975 * NOTE: if what we have is less than PIPE_BUF bytes but more than
1976 * the space available in the pipe buffer, most kernels will
1977 * refuse to write any of it, and will return EAGAIN. This means
1978 * we will busy-loop until the situation changes (either because
1979 * the collector caught up, or because more data arrives so that
1980 * we have more than PIPE_BUF bytes buffered). This is not good,
1981 * but is there any way around it? We have no way to tell when
1982 * the collector has caught up...
1984 if (FD_ISSET(writePipe, &wfds))
1986 xfr = PGSTAT_RECVBUFFERSZ - msg_send;
1990 len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
1993 if (errno == EINTR || errno == EAGAIN)
1994 continue; /* interrupted, or not enough space in pipe */
1996 (errcode_for_socket_access(),
1997 errmsg("could not write to statistics collector pipe: %m")));
2000 /* NB: len < xfr is okay */
2002 if (msg_send == PGSTAT_RECVBUFFERSZ)
2008 * Make sure we forwarded all messages before we check for
2009 * postmaster termination.
2011 if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
2015 * If the postmaster has terminated, we die too. (This is no longer
2016 * the normal exit path, however.)
2018 if (!PostmasterIsAlive(true))
2023 /* SIGQUIT signal handler for buffer process: exits at once, without flushing buffered messages */
2025 pgstat_exit(SIGNAL_ARGS)
2028 * For now, we just nail the doors shut and get out of town. It might
2029 * be cleaner to allow any pending messages to be sent, but that creates
2030 * a tradeoff against speed of exit.
2035 /* SIGCHLD signal handler for buffer process: we want to die if our child collector process does */
2037 pgstat_die(SIGNAL_ARGS)
2044 * pgstat_add_backend() -
2046 * Support function to keep our backend list up to date: records the sender in its pgStatBeTable slot (unless already present or known dead) and counts a connect to its database.
2050 pgstat_add_backend(PgStat_MsgHdr *msg)
2052 PgStat_StatDBEntry *dbentry;
2053 PgStat_StatBeEntry *beentry;
2054 PgStat_StatBeDead *deadbe;
2058 * Check that the backend ID is valid
2060 if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
2063 (errmsg("invalid server process ID %d", msg->m_backendid)));
2068 * Get the slot for this backendid.
2070 beentry = &pgStatBeTable[msg->m_backendid - 1]; /* backend IDs are 1-based */
2071 if (beentry->databaseid != InvalidOid)
2074 * If the slot contains the PID of this backend, everything is
2075 * fine and we got nothing to do.
2077 if (beentry->procpid == msg->m_procpid)
2082 * Look up whether this backend is known to be dead. This can be caused by
2083 * messages arriving in the wrong order - e.g. the postmaster's BETERM
2084 * message might have arrived before we received all the backend's
2085 * stats messages, or a new backend with the same backendid might have been
2086 * faster in sending its BESTART.
2088 * If the backend is known to be dead, we ignore this add.
2090 deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2091 (void *) &(msg->m_procpid),
2097 * Backend isn't known to be dead. If its slot is currently used, we
2098 * have to kick out the old backend.
2100 if (beentry->databaseid != InvalidOid)
2101 pgstat_sub_backend(beentry->procpid);
2104 * Put this new backend into the slot.
2106 beentry->databaseid = msg->m_databaseid;
2107 beentry->procpid = msg->m_procpid;
2108 beentry->userid = msg->m_userid;
2109 beentry->activity_start_sec = 0;
2110 beentry->activity_start_usec = 0;
2111 MemSet(beentry->activity, 0, PGSTAT_ACTIVITY_SIZE);
2114 * Look up or create the database entry for this backend's DB.
2116 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2117 (void *) &(msg->m_databaseid),
2118 HASH_ENTER, &found);
2119 if (dbentry == NULL)
2122 (errcode(ERRCODE_OUT_OF_MEMORY),
2123 errmsg("out of memory in statistics collector --- abort")));
2128 * If not found, initialize the new one.
2134 dbentry->tables = NULL;
2135 dbentry->n_xact_commit = 0;
2136 dbentry->n_xact_rollback = 0;
2137 dbentry->n_blocks_fetched = 0;
2138 dbentry->n_blocks_hit = 0;
2139 dbentry->n_connects = 0;
2140 dbentry->destroy = 0;
2142 memset(&hash_ctl, 0, sizeof(hash_ctl));
2143 hash_ctl.keysize = sizeof(Oid);
2144 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2145 hash_ctl.hash = tag_hash;
2146 dbentry->tables = hash_create("Per-database table",
2147 PGSTAT_TAB_HASH_SIZE,
2149 HASH_ELEM | HASH_FUNCTION);
2150 if (dbentry->tables == NULL)
2152 /* assume the problem is out-of-memory */
2154 (errcode(ERRCODE_OUT_OF_MEMORY),
2155 errmsg("out of memory in statistics collector --- abort")));
2161 * Count number of connects to the database
2163 dbentry->n_connects++;
2170 * pgstat_sub_backend() -
2172 * Remove the backend with the given PID from the active backends list, remembering it as dead for a while.
2176 pgstat_sub_backend(int procpid)
2179 PgStat_StatBeDead *deadbe;
2183 * Search in the known-backends table for the slot containing this
2186 for (i = 0; i < MaxBackends; i++)
2188 if (pgStatBeTable[i].databaseid != InvalidOid &&
2189 pgStatBeTable[i].procpid == procpid)
2192 * That's him. Add an entry to the known-dead backends table.
2193 * Due to possible misorder in the arrival of UDP packets it's
2194 * possible that even if we know the backend is dead, there
2195 * could still be messages queued that arrive later. Those
2196 * messages must not cause our number of backends statistics
2197 * to get screwed up, so we remember for a couple of seconds
2198 * that this PID is dead and ignore them (only the counting of
2199 * backends, not the table access stats they sent).
2201 deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2208 (errcode(ERRCODE_OUT_OF_MEMORY),
2209 errmsg("out of memory in statistics collector --- abort")));
2214 deadbe->backendid = i + 1;
2215 deadbe->destroy = PGSTAT_DESTROY_COUNT; /* counted down on each stats file write */
2219 * Declare the backend slot empty.
2221 pgStatBeTable[i].databaseid = InvalidOid;
2227 * No big problem if not found. This can happen if UDP messages arrive
2228 * out of order here.
2234 * pgstat_write_statsfile() - write all stats to a temp file, then rename it over pgstat.stat
2240 pgstat_write_statsfile(void)
2242 HASH_SEQ_STATUS hstat;
2243 HASH_SEQ_STATUS tstat;
2244 PgStat_StatDBEntry *dbentry;
2245 PgStat_StatTabEntry *tabentry;
2246 PgStat_StatBeDead *deadbe;
2251 * Open the statistics temp file to write out the current values.
2253 fpout = fopen(pgStat_tmpfname, PG_BINARY_W);
2257 (errcode_for_file_access(),
2258 errmsg("could not open temporary statistics file \"%s\": %m",
2264 * Walk through the database table.
2266 hash_seq_init(&hstat, pgStatDBHash);
2267 while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2270 * If this database is marked destroyed, count down and do so if
2273 if (dbentry->destroy > 0)
2275 if (--(dbentry->destroy) == 0)
2277 if (dbentry->tables != NULL)
2278 hash_destroy(dbentry->tables);
2280 if (hash_search(pgStatDBHash,
2281 (void *) &(dbentry->databaseid),
2282 HASH_REMOVE, NULL) == NULL)
2285 (errmsg("database hash table corrupted "
2286 "during cleanup --- abort")));
2292 * Don't include statistics for it.
2298 * Write out the DB line including the number of live backends.
2301 fwrite(dbentry, sizeof(PgStat_StatDBEntry), 1, fpout); /* NOTE(review): fwrite() results are not checked */
2304 * Walk through the database's access stats per table.
2306 hash_seq_init(&tstat, dbentry->tables);
2307 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2310 * If table entry marked for destruction, same as above for
2311 * the database entry.
2313 if (tabentry->destroy > 0)
2315 if (--(tabentry->destroy) == 0)
2317 if (hash_search(dbentry->tables,
2318 (void *) &(tabentry->tableid),
2319 HASH_REMOVE, NULL) == NULL)
2322 (errmsg("tables hash table for "
2323 "database %u corrupted during "
2324 "cleanup --- abort",
2325 dbentry->databaseid)));
2333 * At least we think this is still a live table. Print its
2337 fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2341 * Mark the end of this DB
2347 * Write out the known running backends to the stats file.
2351 fwrite(&i, sizeof(i), 1, fpout);
2353 for (i = 0; i < MaxBackends; i++)
2355 if (pgStatBeTable[i].databaseid != InvalidOid)
2358 fwrite(&pgStatBeTable[i], sizeof(PgStat_StatBeEntry), 1, fpout);
2363 * No more output to be done. Close the temp file and replace the old
2364 * pgstat.stat with it.
2367 if (fclose(fpout) < 0)
2370 (errcode_for_file_access(),
2371 errmsg("could not close temporary statistics file \"%s\": %m",
2376 if (rename(pgStat_tmpfname, pgStat_fname) < 0)
2379 (errcode_for_file_access(),
2380 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2381 pgStat_tmpfname, pgStat_fname)));
2386 * Age the dead backends table, removing expired entries
2388 hash_seq_init(&hstat, pgStatBeDead);
2389 while ((deadbe = (PgStat_StatBeDead *) hash_seq_search(&hstat)) != NULL)
2392 * Count down the destroy delay and remove entries where it
2395 if (--(deadbe->destroy) <= 0)
2397 if (hash_search(pgStatBeDead,
2398 (void *) &(deadbe->procpid),
2399 HASH_REMOVE, NULL) == NULL)
2402 (errmsg("dead-server-process hash table corrupted "
2403 "during cleanup --- abort")));
2412 * pgstat_read_statsfile() -
2414 * Reads in an existing statistics collector file and initializes the
2415 * databases hash table (whose entries point to the per-database tables hash tables)
2416 * and the current backend table. Table entries are loaded only for onlydb, unless it is InvalidOid.
2420 pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
2421 PgStat_StatBeEntry **betab, int *numbackends)
2423 PgStat_StatDBEntry *dbentry;
2424 PgStat_StatDBEntry dbbuf;
2425 PgStat_StatTabEntry *tabentry;
2426 PgStat_StatTabEntry tabbuf;
2428 HTAB *tabhash = NULL;
2430 int maxbackends = 0;
2431 int havebackends = 0;
2433 MemoryContext use_mcxt;
2437 * If running in the collector we use the DynaHashCxt memory context.
2438 * If running in a backend, we use the TopTransactionContext instead,
2439 * so the caller must remember which transaction this call happened in
2440 * to know whether the returned tables are still valid or already gone!
2442 if (pgStatRunningInCollector)
2449 use_mcxt = TopTransactionContext;
2450 mcxt_flags = HASH_CONTEXT;
2454 * Create the DB hashtable
2456 memset(&hash_ctl, 0, sizeof(hash_ctl));
2457 hash_ctl.keysize = sizeof(Oid);
2458 hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2459 hash_ctl.hash = tag_hash;
2460 hash_ctl.hcxt = use_mcxt;
2461 *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2462 HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2463 if (*dbhash == NULL)
2465 /* assume the problem is out-of-memory */
2466 if (pgStatRunningInCollector)
2469 (errcode(ERRCODE_OUT_OF_MEMORY),
2470 errmsg("out of memory in statistics collector --- abort")));
2473 /* in backend, can do normal error */
2475 (errcode(ERRCODE_OUT_OF_MEMORY),
2476 errmsg("out of memory")));
2480 * Initialize the number of known backends to zero, just in case we do
2481 * a silent error return below.
2483 if (numbackends != NULL)
2489 * In EXEC_BACKEND case, we won't have inherited pgStat_fname from
2490 * postmaster, so compute it first time through.
2493 if (pgStat_fname[0] == '\0')
2495 Assert(DataDir != NULL);
2496 snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
2501 * Try to open the status file. If it doesn't exist, the backends
2502 * simply return zero for anything and the collector simply starts
2503 * from scratch with empty counters.
2505 if ((fpin = fopen(pgStat_fname, PG_BINARY_R)) == NULL)
2509 * We found an existing collector stats file. Read it and put all the
2510 * hashtable entries into place.
2514 switch (fgetc(fpin))
2517 * 'D' A PgStat_StatDBEntry struct describing a database
2518 * follows. Subsequently, zero to many 'T' entries will
2519 * follow until a 'd' is encountered.
2522 if (fread(&dbbuf, 1, sizeof(dbbuf), fpin) != sizeof(dbbuf))
2524 ereport(pgStatRunningInCollector ? LOG : WARNING,
2525 (errmsg("corrupted pgstat.stat file")));
2531 * Add to the DB hash
2533 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2534 (void *) &dbbuf.databaseid,
2537 if (dbentry == NULL)
2539 if (pgStatRunningInCollector)
2542 (errcode(ERRCODE_OUT_OF_MEMORY),
2543 errmsg("out of memory in statistics collector --- abort")));
2550 (errcode(ERRCODE_OUT_OF_MEMORY),
2551 errmsg("out of memory")));
2556 ereport(pgStatRunningInCollector ? LOG : WARNING,
2557 (errmsg("corrupted pgstat.stat file")));
2562 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2563 dbentry->tables = NULL;
2564 dbentry->destroy = 0;
2565 dbentry->n_backends = 0;
2568 * Don't collect tables if not the requested DB
2570 if (onlydb != InvalidOid && onlydb != dbbuf.databaseid)
2573 memset(&hash_ctl, 0, sizeof(hash_ctl));
2574 hash_ctl.keysize = sizeof(Oid);
2575 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2576 hash_ctl.hash = tag_hash;
2577 hash_ctl.hcxt = use_mcxt;
2578 dbentry->tables = hash_create("Per-database table",
2579 PGSTAT_TAB_HASH_SIZE,
2581 HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2582 if (dbentry->tables == NULL)
2584 /* assume the problem is out-of-memory */
2585 if (pgStatRunningInCollector)
2588 (errcode(ERRCODE_OUT_OF_MEMORY),
2589 errmsg("out of memory in statistics collector --- abort")));
2592 /* in backend, can do normal error */
2595 (errcode(ERRCODE_OUT_OF_MEMORY),
2596 errmsg("out of memory")));
2600 * Arrange that following 'T's add entries to this
2601 * database's tables hash table.
2603 tabhash = dbentry->tables;
2607 * 'd' End of this database.
2614 * 'T' A PgStat_StatTabEntry follows.
2617 if (fread(&tabbuf, 1, sizeof(tabbuf), fpin) != sizeof(tabbuf))
2619 ereport(pgStatRunningInCollector ? LOG : WARNING,
2620 (errmsg("corrupted pgstat.stat file")));
2626 * Skip if table belongs to a database that was not requested.
2628 if (tabhash == NULL)
2631 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2632 (void *) &tabbuf.tableid,
2633 HASH_ENTER, &found);
2634 if (tabentry == NULL)
2636 if (pgStatRunningInCollector)
2639 (errcode(ERRCODE_OUT_OF_MEMORY),
2640 errmsg("out of memory in statistics collector --- abort")));
2643 /* in backend, can do normal error */
2646 (errcode(ERRCODE_OUT_OF_MEMORY),
2647 errmsg("out of memory")));
2652 ereport(pgStatRunningInCollector ? LOG : WARNING,
2653 (errmsg("corrupted pgstat.stat file")));
2658 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2662 * 'M' The maximum number of backends to expect follows.
2665 if (betab == NULL || numbackends == NULL)
2670 if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2671 sizeof(maxbackends))
2673 ereport(pgStatRunningInCollector ? LOG : WARNING,
2674 (errmsg("corrupted pgstat.stat file")));
2678 if (maxbackends == 0)
2685 * Allocate space (in TopTransactionContext too) for the
2688 if (use_mcxt == NULL)
2689 *betab = (PgStat_StatBeEntry *) malloc(
2690 sizeof(PgStat_StatBeEntry) * maxbackends);
2692 *betab = (PgStat_StatBeEntry *) MemoryContextAlloc(
2694 sizeof(PgStat_StatBeEntry) * maxbackends);
2698 * 'B' A PgStat_StatBeEntry follows.
2701 if (betab == NULL || numbackends == NULL)
2713 * Read it directly into the table.
2715 if (fread(&(*betab)[havebackends], 1,
2716 sizeof(PgStat_StatBeEntry), fpin) !=
2717 sizeof(PgStat_StatBeEntry))
2719 ereport(pgStatRunningInCollector ? LOG : WARNING,
2720 (errmsg("corrupted pgstat.stat file")));
2726 * Count backends per database here.
2728 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2729 (void *) &((*betab)[havebackends].databaseid),
2732 dbentry->n_backends++;
2735 if (numbackends != 0) /* NOTE(review): pointer test; NULL would be clearer */
2736 *numbackends = havebackends;
2737 if (havebackends >= maxbackends)
2745 * 'E' The EOF marker of a complete stats file.
2752 ereport(pgStatRunningInCollector ? LOG : WARNING,
2753 (errmsg("corrupted pgstat.stat file")));
2764 * pgstat_recv_bestart() -
2766 * Process a backend startup message by registering the backend.
2770 pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
2772 pgstat_add_backend(&msg->m_hdr);
2777 * pgstat_recv_beterm() -
2779 * Process a backend termination message by removing the backend with the sender's PID.
2783 pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
2785 pgstat_sub_backend(msg->m_hdr.m_procpid);
2790 * pgstat_recv_activity() -
2792 * Remember what the backend is doing: store its activity string, stamped with the current time as its start time.
2796 pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
2798 PgStat_StatBeEntry *entry;
2801 * Here we check explicitly for 0 return, since we don't want to
2802 * mangle the activity of an active backend by a delayed packet from a
2805 if (pgstat_add_backend(&msg->m_hdr) != 0)
2808 entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2810 StrNCpy(entry->activity, msg->m_what, PGSTAT_ACTIVITY_SIZE);
2812 entry->activity_start_sec =
2813 GetCurrentAbsoluteTimeUsec(&entry->activity_start_usec);
2818 * pgstat_recv_tabstat() -
2820 * Count what the backend has done: add its transaction and per-table counters to the database and table entries.
2824 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2826 PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2827 PgStat_StatDBEntry *dbentry;
2828 PgStat_StatTabEntry *tabentry;
2833 * Make sure the backend is accounted for.
2835 if (pgstat_add_backend(&msg->m_hdr) < 0)
2839 * Look up the database in the hashtable.
2841 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2842 (void *) &(msg->m_hdr.m_databaseid),
2848 * If the database is marked for destruction, this is a delayed UDP packet
2849 * and not worth being counted.
2851 if (dbentry->destroy > 0)
2854 dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
2855 dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
2858 * Process all table entries in the message.
2860 for (i = 0; i < msg->m_nentries; i++)
2862 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2863 (void *) &(tabmsg[i].t_id),
2864 HASH_ENTER, &found);
2865 if (tabentry == NULL)
2868 (errcode(ERRCODE_OUT_OF_MEMORY),
2869 errmsg("out of memory in statistics collector --- abort")));
2876 * If it's a new table entry, initialize counters to the
2877 * values we just got.
2879 tabentry->numscans = tabmsg[i].t_numscans;
2880 tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
2881 tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
2882 tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
2883 tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
2884 tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
2885 tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
2886 tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
2888 tabentry->destroy = 0;
2893 * Otherwise add the values to the existing entry.
2895 tabentry->numscans += tabmsg[i].t_numscans;
2896 tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
2897 tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
2898 tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
2899 tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
2900 tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
2901 tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
2902 tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
2906 * And add the block IO to the database entry.
2908 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
2909 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
2915 * pgstat_recv_tabpurge() -
2917 * Arrange for dead table removal: mark each listed table for delayed destruction.
2921 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
2923 PgStat_StatDBEntry *dbentry;
2924 PgStat_StatTabEntry *tabentry;
2928 * Make sure the backend is accounted for.
2930 if (pgstat_add_backend(&msg->m_hdr) < 0)
2934 * Look up the database in the hashtable.
2936 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2937 (void *) &(msg->m_hdr.m_databaseid),
2943 * If the database is marked for destruction, this is a delayed UDP packet
2944 * and the tables will go away at DB destruction.
2946 if (dbentry->destroy > 0)
2950 * Process all table entries in the message.
2952 for (i = 0; i < msg->m_nentries; i++)
2954 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2955 (void *) &(msg->m_tableid[i]),
2958 tabentry->destroy = PGSTAT_DESTROY_COUNT; /* removed by pgstat_write_statsfile() when this counts down to 0 */
2964 * pgstat_recv_dropdb() -
2966 * Arrange for dead database removal: mark the database for delayed destruction.
2970 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
2972 PgStat_StatDBEntry *dbentry;
2975 * Make sure the backend is accounted for.
2977 if (pgstat_add_backend(&msg->m_hdr) < 0)
2981 * Look up the dropped database (msg->m_databaseid, not the sender's DB) in the hashtable.
2983 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2984 (void *) &(msg->m_databaseid),
2990 * Mark the database for destruction.
2992 dbentry->destroy = PGSTAT_DESTROY_COUNT;
2997 * pgstat_recv_resetcounter() -
2999 * Reset all counters of the sending backend's database
3003 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
3006 PgStat_StatDBEntry *dbentry;
3009 * Make sure the backend is counted for.
3011 if (pgstat_add_backend(&msg->m_hdr) < 0)
3015 * Lookup the database in the hashtable.
3017 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3018 (void *) &(msg->m_hdr.m_databaseid),
3024 * We simply throw away all the databases table entries by recreating
3025 * a new hash table for them.
3027 if (dbentry->tables != NULL)
3028 hash_destroy(dbentry->tables);
3030 dbentry->tables = NULL;
3031 dbentry->n_xact_commit = 0;
3032 dbentry->n_xact_rollback = 0;
3033 dbentry->n_blocks_fetched = 0;
3034 dbentry->n_blocks_hit = 0;
3035 dbentry->n_connects = 0;
3036 dbentry->destroy = 0;
3038 memset(&hash_ctl, 0, sizeof(hash_ctl));
3039 hash_ctl.keysize = sizeof(Oid);
3040 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3041 hash_ctl.hash = tag_hash;
3042 dbentry->tables = hash_create("Per-database table",
3043 PGSTAT_TAB_HASH_SIZE,
3045 HASH_ELEM | HASH_FUNCTION);
3046 if (dbentry->tables == NULL)
3048 /* assume the problem is out-of-memory */
3050 (errcode(ERRCODE_OUT_OF_MEMORY),
3051 errmsg("out of memory in statistics collector --- abort")));