]> granicus.if.org Git - postgresql/blob - src/backend/postmaster/pgstat.c
Allow the pgstat process to restart immediately after a receiving
[postgresql] / src / backend / postmaster / pgstat.c
1 /* ----------
2  * pgstat.c
3  *
4  *      All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *      TODO:   - Separate collector, postmaster and backend stuff
7  *                        into different files.
8  *
9  *                      - Add some automatic call for pgstat vacuuming.
10  *
11  *                      - Add a pgstat config column to pg_database, so this
12  *                        entire thing can be enabled/disabled on a per db basis.
13  *
14  *      Copyright (c) 2001-2007, PostgreSQL Global Development Group
15  *
16  *      $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.150 2007/03/22 19:53:30 momjian Exp $
17  * ----------
18  */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31 #ifdef HAVE_POLL_H
32 #include <poll.h>
33 #endif
34 #ifdef HAVE_SYS_POLL_H
35 #include <sys/poll.h>
36 #endif
37
38 #include "pgstat.h"
39
40 #include "access/heapam.h"
41 #include "access/transam.h"
42 #include "access/xact.h"
43 #include "catalog/pg_database.h"
44 #include "libpq/ip.h"
45 #include "libpq/libpq.h"
46 #include "libpq/pqsignal.h"
47 #include "mb/pg_wchar.h"
48 #include "miscadmin.h"
49 #include "postmaster/autovacuum.h"
50 #include "postmaster/fork_process.h"
51 #include "postmaster/postmaster.h"
52 #include "storage/backendid.h"
53 #include "storage/fd.h"
54 #include "storage/ipc.h"
55 #include "storage/pg_shmem.h"
56 #include "storage/pmsignal.h"
57 #include "utils/memutils.h"
58 #include "utils/ps_status.h"
59
60
/* ----------
 * Where the statistics file lives, plus the temporary name it is written
 * under first.  Both paths are relative to the data directory ($PGDATA).
 * ----------
 */
#define PGSTAT_STAT_FILENAME	"global/pgstat.stat"
#define PGSTAT_STAT_TMPFILE		"global/pgstat.tmp"

/* ----------
 * Timing parameters
 * ----------
 */

/* Interval between writes of the status file (milliseconds). */
#define PGSTAT_STAT_INTERVAL	500

/* Minimum spacing between attempts to (re)start the collector (seconds). */
#define PGSTAT_RESTART_INTERVAL 60

/* Interval between checks for postmaster death (seconds). */
#define PGSTAT_SELECT_TIMEOUT	2

/* ----------
 * Starting size hints for the collector's hash tables
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_TAB_HASH_SIZE	512

/* ----------
 * GUC parameters, initialized to their compiled-in defaults
 * ----------
 */
bool		pgstat_collect_startcollector = true;
bool		pgstat_collect_resetonpmstart = false;
bool		pgstat_collect_tuplelevel = false;
bool		pgstat_collect_blocklevel = false;
bool		pgstat_collect_querystring = false;
100
101 /* ----------
102  * Local data
103  * ----------
104  */
105 NON_EXEC_STATIC int pgStatSock = -1;
106
107 static struct sockaddr_storage pgStatAddr;
108
109 static time_t last_pgstat_start_time;
110
111 static bool pgStatRunningInCollector = false;
112
113 /*
114  * Place where backends store per-table info to be sent to the collector.
115  * We store shared relations separately from non-shared ones, to be able to
116  * send them in separate messages.
117  */
118 typedef struct TabStatArray
119 {
120         int                     tsa_alloc;              /* num allocated */
121         int                     tsa_used;               /* num actually used */
122         PgStat_MsgTabstat **tsa_messages;       /* the array itself */
123 } TabStatArray;
124
125 #define TABSTAT_QUANTUM         4       /* we alloc this many at a time */
126
127 static TabStatArray RegularTabStat = {0, 0, NULL};
128 static TabStatArray SharedTabStat = {0, 0, NULL};
129
130 static int      pgStatXactCommit = 0;
131 static int      pgStatXactRollback = 0;
132
133 static MemoryContext pgStatLocalContext = NULL;
134 static HTAB *pgStatDBHash = NULL;
135 static PgBackendStatus *localBackendStatusTable = NULL;
136 static int      localNumBackends = 0;
137
138 static volatile bool need_exit = false;
139 static volatile bool need_statwrite = false;
140
141
142 /* ----------
143  * Local function forward declarations
144  * ----------
145  */
146 #ifdef EXEC_BACKEND
147 static pid_t pgstat_forkexec(void);
148 #endif
149
150 NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
151 static void pgstat_exit(SIGNAL_ARGS);
152 static void force_statwrite(SIGNAL_ARGS);
153 static void pgstat_beshutdown_hook(int code, Datum arg);
154
155 static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
156 static void pgstat_write_statsfile(void);
157 static HTAB *pgstat_read_statsfile(Oid onlydb);
158 static void backend_read_statsfile(void);
159 static void pgstat_read_current_status(void);
160 static HTAB *pgstat_collect_oids(Oid catalogid);
161
162 static void pgstat_setup_memcxt(void);
163
164 static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
165 static void pgstat_send(void *msg, int len);
166
167 static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
168 static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
169 static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
170 static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
171 static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
172 static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
173 static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
174
175
176 /* ------------------------------------------------------------
177  * Public functions called from postmaster follow
178  * ------------------------------------------------------------
179  */
180
/* Value of the one-byte message used to check that the socket passes data. */
#define TESTBYTEVAL ((char) 199)

/* ----------
 * pgstat_try_address() -
 *
 *	Subroutine of pgstat_init(): try to build a working statistics socket
 *	for one candidate address.  The socket is created, bound to a
 *	kernel-assigned port at that address, connected to its own address,
 *	and then checked by sending a test byte and reading it back.
 *
 *	On success, returns true with pgStatSock open and pgStatAddr holding
 *	the socket's own address.  On any failure, LOGs the reason (no error
 *	is thrown), closes the socket, leaves pgStatSock = -1 and returns
 *	false so that the caller can move on to the next address.
 *
 *	Each ereport() immediately follows the failing call, so that %m still
 *	reflects that call's errno; the socket is closed only afterwards.
 * ----------
 */
static bool
pgstat_try_address(struct addrinfo *addr)
{
	ACCEPT_TYPE_ARG3 alen;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;

	/*
	 * Create the socket.
	 */
	if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
			errmsg("could not create socket for statistics collector: %m")));
		return false;			/* nothing to close; pgStatSock is -1 */
	}

	/*
	 * Bind it to a kernel assigned port on localhost and get the assigned
	 * port via getsockname().
	 */
	if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
			  errmsg("could not bind socket for statistics collector: %m")));
		goto fail;
	}

	alen = sizeof(pgStatAddr);
	if (getsockname(pgStatSock, (struct sockaddr *) & pgStatAddr, &alen) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not get address of socket for statistics collector: %m")));
		goto fail;
	}

	/*
	 * Connect the socket to its own address.  This saves a few cycles by not
	 * having to respecify the target address on every send. This also
	 * provides a kernel-level check that only packets from this same address
	 * will be received.
	 */
	if (connect(pgStatSock, (struct sockaddr *) & pgStatAddr, alen) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
			errmsg("could not connect socket for statistics collector: %m")));
		goto fail;
	}

	/*
	 * Try to send and receive a one-byte test message on the socket. This is
	 * to catch situations where the socket can be created but will not
	 * actually pass data (for instance, because kernel packet filtering rules
	 * prevent it).
	 */
	test_byte = TESTBYTEVAL;

	while (send(pgStatSock, &test_byte, 1, 0) != 1)
	{
		if (errno == EINTR)
			continue;			/* if interrupted, just retry */
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not send test message on socket for statistics collector: %m")));
		goto fail;
	}

	/*
	 * There could possibly be a little delay before the message can be
	 * received.  We arbitrarily allow up to half a second before deciding
	 * it's broken.
	 */
	for (;;)					/* need a loop to handle EINTR */
	{
		FD_ZERO(&rset);
		FD_SET(pgStatSock, &rset);
		tv.tv_sec = 0;
		tv.tv_usec = 500000;
		sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
		if (sel_res >= 0 || errno != EINTR)
			break;
	}
	if (sel_res < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("select() failed in statistics collector: %m")));
		goto fail;
	}
	if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
	{
		/*
		 * This is the case we actually think is likely, so take pains to give
		 * a specific message for it.
		 *
		 * errno will not be set meaningfully here, so don't use it.
		 */
		ereport(LOG,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("test message did not get through on socket for statistics collector")));
		goto fail;
	}

	test_byte++;				/* just make sure variable is changed */

	while (recv(pgStatSock, &test_byte, 1, 0) != 1)
	{
		if (errno == EINTR)
			continue;			/* if interrupted, just retry */
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not receive test message on socket for statistics collector: %m")));
		goto fail;
	}

	if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
	{
		ereport(LOG,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("incorrect test message transmission on socket for statistics collector")));
		goto fail;
	}

	/* If we get here, we have a working socket */
	return true;

fail:
	closesocket(pgStatSock);
	pgStatSock = -1;
	return false;
}

/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success pgStatSock is a working, non-blocking UDP socket connected
 *	to itself.  On failure pgStatSock is -1 and the collector-related GUC
 *	variables are switched off so that nothing tries to use it.
 * ----------
 */
void
pgstat_init(void)
{
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	int			tries = 0;

	/*
	 * Force start of collector daemon if something to collect.  Note that
	 * pgstat_collect_querystring is now an independent facility that does not
	 * require the collector daemon.
	 */
	if (pgstat_collect_tuplelevel ||
		pgstat_collect_blocklevel)
		pgstat_collect_startcollector = true;

	/*
	 * If we don't have to start a collector or should reset the collected
	 * statistics on postmaster start, simply remove the stats file.
	 */
	if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
		pgstat_reset_all();

	/*
	 * Nothing else required if collector will not get started
	 */
	if (!pgstat_collect_startcollector)
		return;

	/*
	 * Create the UDP socket for sending and receiving statistic messages
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, pg_getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination. We will generate LOG
	 * messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		if (++tries > 1)
			ereport(LOG,
			(errmsg("trying another address for the statistics collector")));

		if (pgstat_try_address(addr))
			break;				/* we have a working socket */
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
		goto startup_failed;

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the collector
	 * falls behind, statistics messages will be discarded; backends won't
	 * block waiting to send messages to the collector.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	pg_freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	ereport(LOG,
	  (errmsg("disabling statistics collector for lack of working socket")));

	if (addrs)
		pg_freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/* Adjust GUC variables to suppress useless activity */
	pgstat_collect_startcollector = false;
	pgstat_collect_tuplelevel = false;
	pgstat_collect_blocklevel = false;
}
447
448 /*
449  * pgstat_reset_all() -
450  *
451  * Remove the stats file.  This is used on server start if the
452  * stats_reset_on_server_start feature is enabled, or if WAL
453  * recovery is needed after a crash.
454  */
455 void
456 pgstat_reset_all(void)
457 {
458         unlink(PGSTAT_STAT_FILENAME);
459 }
460
#ifdef EXEC_BACKEND

/*
 * pgstat_forkexec() -
 *
 * Assemble the argument vector for a statistics collector child and pass
 * it to postmaster_forkexec(), which forks and execs the new process.
 * Returns postmaster_forkexec()'s result; pgstat_start() treats -1 as
 * failure and anything else as the child's PID.
 */
static pid_t
pgstat_forkexec(void)
{
	char	   *argv[10];
	int			argc;

	argc = 0;
	argv[argc++] = "postgres";
	argv[argc++] = "--forkcol";
	argv[argc++] = NULL;		/* slot filled in by postmaster_forkexec */

	/* terminate the vector; it must still fit in argv[] */
	argv[argc] = NULL;
	Assert(argc < lengthof(argv));

	return postmaster_forkexec(argc, argv);
}

#endif   /* EXEC_BACKEND */
484
485
486 /* ----------
487  * pgstat_start() -
488  *
489  *      Called from postmaster at startup or after an existing collector
490  *      died.  Attempt to fire up a fresh statistics collector.
491  *
492  *      Returns PID of child process, or 0 if fail.
493  *
494  *      Note: if fail, we will be called again from the postmaster main loop.
495  * ----------
496  */
497 int
498 pgstat_start(void)
499 {
500         time_t          curtime;
501         pid_t           pgStatPid;
502
503         /*
504          * Do nothing if no collector needed
505          */
506         if (!pgstat_collect_startcollector)
507                 return 0;
508
509         /*
510          * Do nothing if too soon since last collector start.  This is a safety
511          * valve to protect against continuous respawn attempts if the collector
512          * is dying immediately at launch.      Note that since we will be re-called
513          * from the postmaster main loop, we will get another chance later.
514          */
515         curtime = time(NULL);
516         if ((unsigned int) (curtime - last_pgstat_start_time) <
517                 (unsigned int) PGSTAT_RESTART_INTERVAL)
518                 return 0;
519         last_pgstat_start_time = curtime;
520
521         /*
522          * Check that the socket is there, else pgstat_init failed.
523          */
524         if (pgStatSock < 0)
525         {
526                 ereport(LOG,
527                                 (errmsg("statistics collector startup skipped")));
528
529                 /*
530                  * We can only get here if someone tries to manually turn
531                  * pgstat_collect_startcollector on after it had been off.
532                  */
533                 pgstat_collect_startcollector = false;
534                 return 0;
535         }
536
537         /*
538          * Okay, fork off the collector.
539          */
540 #ifdef EXEC_BACKEND
541         switch ((pgStatPid = pgstat_forkexec()))
542 #else
543         switch ((pgStatPid = fork_process()))
544 #endif
545         {
546                 case -1:
547                         ereport(LOG,
548                                         (errmsg("could not fork statistics collector: %m")));
549                         return 0;
550
551 #ifndef EXEC_BACKEND
552                 case 0:
553                         /* in postmaster child ... */
554                         /* Close the postmaster's sockets */
555                         ClosePostmasterPorts(false);
556
557                         /* Lose the postmaster's on-exit routines */
558                         on_exit_reset();
559
560                         /* Drop our connection to postmaster's shared memory, as well */
561                         PGSharedMemoryDetach();
562
563                         PgstatCollectorMain(0, NULL);
564                         break;
565 #endif
566
567                 default:
568                         return (int) pgStatPid;
569         }
570
571         /* shouldn't get here */
572         return 0;
573 }
574
575 void allow_immediate_pgstat_restart(void)
576 {
577                 last_pgstat_start_time = 0;
578 }
579
580 /* ------------------------------------------------------------
581  * Public functions used by backends follow
582  *------------------------------------------------------------
583  */
584
585
586 /* ----------
587  * pgstat_report_tabstat() -
588  *
589  *      Called from tcop/postgres.c to send the so far collected
590  *      per table access statistics to the collector.
591  * ----------
592  */
593 void
594 pgstat_report_tabstat(void)
595 {
596         int                     i;
597
598         if (pgStatSock < 0 ||
599                 (!pgstat_collect_tuplelevel &&
600                  !pgstat_collect_blocklevel))
601         {
602                 /* Not reporting stats, so just flush whatever we have */
603                 RegularTabStat.tsa_used = 0;
604                 SharedTabStat.tsa_used = 0;
605                 return;
606         }
607
608         /*
609          * For each message buffer used during the last query set the header
610          * fields and send it out.
611          */
612         for (i = 0; i < RegularTabStat.tsa_used; i++)
613         {
614                 PgStat_MsgTabstat *tsmsg = RegularTabStat.tsa_messages[i];
615                 int                     n;
616                 int                     len;
617
618                 n = tsmsg->m_nentries;
619                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
620                         n * sizeof(PgStat_TableEntry);
621
622                 tsmsg->m_xact_commit = pgStatXactCommit;
623                 tsmsg->m_xact_rollback = pgStatXactRollback;
624                 pgStatXactCommit = 0;
625                 pgStatXactRollback = 0;
626
627                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
628                 tsmsg->m_databaseid = MyDatabaseId;
629                 pgstat_send(tsmsg, len);
630         }
631         RegularTabStat.tsa_used = 0;
632
633         /* Ditto, for shared relations */
634         for (i = 0; i < SharedTabStat.tsa_used; i++)
635         {
636                 PgStat_MsgTabstat *tsmsg = SharedTabStat.tsa_messages[i];
637                 int                     n;
638                 int                     len;
639
640                 n = tsmsg->m_nentries;
641                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
642                         n * sizeof(PgStat_TableEntry);
643
644                 /* We don't report transaction commit/abort here */
645                 tsmsg->m_xact_commit = 0;
646                 tsmsg->m_xact_rollback = 0;
647
648                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
649                 tsmsg->m_databaseid = InvalidOid;
650                 pgstat_send(tsmsg, len);
651         }
652         SharedTabStat.tsa_used = 0;
653 }
654
655
656 /* ----------
657  * pgstat_vacuum_tabstat() -
658  *
659  *      Will tell the collector about objects he can get rid of.
660  * ----------
661  */
662 void
663 pgstat_vacuum_tabstat(void)
664 {
665         HTAB       *htab;
666         PgStat_MsgTabpurge msg;
667         HASH_SEQ_STATUS hstat;
668         PgStat_StatDBEntry *dbentry;
669         PgStat_StatTabEntry *tabentry;
670         int                     len;
671
672         if (pgStatSock < 0)
673                 return;
674
675         /*
676          * If not done for this transaction, read the statistics collector stats
677          * file into some hash tables.
678          */
679         backend_read_statsfile();
680
681         /*
682          * Read pg_database and make a list of OIDs of all existing databases
683          */
684         htab = pgstat_collect_oids(DatabaseRelationId);
685
686         /*
687          * Search the database hash table for dead databases and tell the
688          * collector to drop them.
689          */
690         hash_seq_init(&hstat, pgStatDBHash);
691         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
692         {
693                 Oid                     dbid = dbentry->databaseid;
694
695                 CHECK_FOR_INTERRUPTS();
696
697                 if (hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
698                         pgstat_drop_database(dbid);
699         }
700
701         /* Clean up */
702         hash_destroy(htab);
703
704         /*
705          * Lookup our own database entry; if not found, nothing more to do.
706          */
707         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
708                                                                                                  (void *) &MyDatabaseId,
709                                                                                                  HASH_FIND, NULL);
710         if (dbentry == NULL || dbentry->tables == NULL)
711                 return;
712
713         /*
714          * Similarly to above, make a list of all known relations in this DB.
715          */
716         htab = pgstat_collect_oids(RelationRelationId);
717
718         /*
719          * Initialize our messages table counter to zero
720          */
721         msg.m_nentries = 0;
722
723         /*
724          * Check for all tables listed in stats hashtable if they still exist.
725          */
726         hash_seq_init(&hstat, dbentry->tables);
727         while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
728         {
729                 Oid                     tabid = tabentry->tableid;
730
731                 CHECK_FOR_INTERRUPTS();
732
733                 if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
734                         continue;
735
736                 /*
737                  * Not there, so add this table's Oid to the message
738                  */
739                 msg.m_tableid[msg.m_nentries++] = tabid;
740
741                 /*
742                  * If the message is full, send it out and reinitialize to empty
743                  */
744                 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
745                 {
746                         len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
747                                 +msg.m_nentries * sizeof(Oid);
748
749                         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
750                         msg.m_databaseid = MyDatabaseId;
751                         pgstat_send(&msg, len);
752
753                         msg.m_nentries = 0;
754                 }
755         }
756
757         /*
758          * Send the rest
759          */
760         if (msg.m_nentries > 0)
761         {
762                 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
763                         +msg.m_nentries * sizeof(Oid);
764
765                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
766                 msg.m_databaseid = MyDatabaseId;
767                 pgstat_send(&msg, len);
768         }
769
770         /* Clean up */
771         hash_destroy(htab);
772 }
773
774
775 /* ----------
776  * pgstat_collect_oids() -
777  *
778  *      Collect the OIDs of either all databases or all tables, according to
779  *      the parameter, into a temporary hash table.  Caller should hash_destroy
780  *      the result when done with it.
781  * ----------
782  */
783 static HTAB *
784 pgstat_collect_oids(Oid catalogid)
785 {
786         HTAB       *htab;
787         HASHCTL         hash_ctl;
788         Relation        rel;
789         HeapScanDesc scan;
790         HeapTuple       tup;
791
792         memset(&hash_ctl, 0, sizeof(hash_ctl));
793         hash_ctl.keysize = sizeof(Oid);
794         hash_ctl.entrysize = sizeof(Oid);
795         hash_ctl.hash = oid_hash;
796         htab = hash_create("Temporary table of OIDs",
797                                            PGSTAT_TAB_HASH_SIZE,
798                                            &hash_ctl,
799                                            HASH_ELEM | HASH_FUNCTION);
800
801         rel = heap_open(catalogid, AccessShareLock);
802         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
803         while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
804         {
805                 Oid             thisoid = HeapTupleGetOid(tup);
806
807                 CHECK_FOR_INTERRUPTS();
808
809                 (void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
810         }
811         heap_endscan(scan);
812         heap_close(rel, AccessShareLock);
813
814         return htab;
815 }
816
817
818 /* ----------
819  * pgstat_drop_database() -
820  *
821  *      Tell the collector that we just dropped a database.
822  *      (If the message gets lost, we will still clean the dead DB eventually
823  *      via future invocations of pgstat_vacuum_tabstat().)
824  * ----------
825  */
826 void
827 pgstat_drop_database(Oid databaseid)
828 {
829         PgStat_MsgDropdb msg;
830
831         if (pgStatSock < 0)
832                 return;
833
834         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
835         msg.m_databaseid = databaseid;
836         pgstat_send(&msg, sizeof(msg));
837 }
838
839
840 /* ----------
841  * pgstat_drop_relation() -
842  *
843  *      Tell the collector that we just dropped a relation.
844  *      (If the message gets lost, we will still clean the dead entry eventually
845  *      via future invocations of pgstat_vacuum_tabstat().)
846  * ----------
847  */
848 void
849 pgstat_drop_relation(Oid relid)
850 {
851         PgStat_MsgTabpurge msg;
852         int                     len;
853
854         if (pgStatSock < 0)
855                 return;
856
857         msg.m_tableid[0] = relid;
858         msg.m_nentries = 1;
859
860         len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) +sizeof(Oid);
861
862         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
863         msg.m_databaseid = MyDatabaseId;
864         pgstat_send(&msg, len);
865 }
866
867
868 /* ----------
869  * pgstat_reset_counters() -
870  *
871  *      Tell the statistics collector to reset counters for our database.
872  * ----------
873  */
874 void
875 pgstat_reset_counters(void)
876 {
877         PgStat_MsgResetcounter msg;
878
879         if (pgStatSock < 0)
880                 return;
881
882         if (!superuser())
883                 ereport(ERROR,
884                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
885                                  errmsg("must be superuser to reset statistics counters")));
886
887         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
888         msg.m_databaseid = MyDatabaseId;
889         pgstat_send(&msg, sizeof(msg));
890 }
891
892
893 /* ----------
894  * pgstat_report_autovac() -
895  *
896  *      Called from autovacuum.c to report startup of an autovacuum process.
897  *      We are called before InitPostgres is done, so can't rely on MyDatabaseId;
898  *      the db OID must be passed in, instead.
899  * ----------
900  */
901 void
902 pgstat_report_autovac(Oid dboid)
903 {
904         PgStat_MsgAutovacStart msg;
905
906         if (pgStatSock < 0)
907                 return;
908
909         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);
910         msg.m_databaseid = dboid;
911         msg.m_start_time = GetCurrentTimestamp();
912
913         pgstat_send(&msg, sizeof(msg));
914 }
915
916
917 /* ---------
918  * pgstat_report_vacuum() -
919  *
920  *      Tell the collector about the table we just vacuumed.
921  * ---------
922  */
923 void
924 pgstat_report_vacuum(Oid tableoid, bool shared,
925                                          bool analyze, PgStat_Counter tuples)
926 {
927         PgStat_MsgVacuum msg;
928
929         if (pgStatSock < 0 ||
930                 !pgstat_collect_tuplelevel)
931                 return;
932
933         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);
934         msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
935         msg.m_tableoid = tableoid;
936         msg.m_analyze = analyze;
937         msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */
938         msg.m_vacuumtime = GetCurrentTimestamp();
939         msg.m_tuples = tuples;
940         pgstat_send(&msg, sizeof(msg));
941 }
942
943 /* --------
944  * pgstat_report_analyze() -
945  *
946  *      Tell the collector about the table we just analyzed.
947  * --------
948  */
949 void
950 pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples,
951                                           PgStat_Counter deadtuples)
952 {
953         PgStat_MsgAnalyze msg;
954
955         if (pgStatSock < 0 ||
956                 !pgstat_collect_tuplelevel)
957                 return;
958
959         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
960         msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
961         msg.m_tableoid = tableoid;
962         msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */
963         msg.m_analyzetime = GetCurrentTimestamp();
964         msg.m_live_tuples = livetuples;
965         msg.m_dead_tuples = deadtuples;
966         pgstat_send(&msg, sizeof(msg));
967 }
968
969
970 /* ----------
971  * pgstat_ping() -
972  *
973  *      Send some junk data to the collector to increase traffic.
974  * ----------
975  */
976 void
977 pgstat_ping(void)
978 {
979         PgStat_MsgDummy msg;
980
981         if (pgStatSock < 0)
982                 return;
983
984         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
985         pgstat_send(&msg, sizeof(msg));
986 }
987
988 /*
989  * Enlarge a TabStatArray
990  */
991 static void
992 more_tabstat_space(TabStatArray *tsarr)
993 {
994         PgStat_MsgTabstat *newMessages;
995         PgStat_MsgTabstat **msgArray;
996         int                     newAlloc;
997         int                     i;
998
999         AssertArg(PointerIsValid(tsarr));
1000
1001         newAlloc = tsarr->tsa_alloc + TABSTAT_QUANTUM;
1002
1003         /* Create (another) quantum of message buffers */
1004         newMessages = (PgStat_MsgTabstat *)
1005                 MemoryContextAllocZero(TopMemoryContext,
1006                                                            sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1007
1008         /* Create or enlarge the pointer array */
1009         if (tsarr->tsa_messages == NULL)
1010                 msgArray = (PgStat_MsgTabstat **)
1011                         MemoryContextAlloc(TopMemoryContext,
1012                                                            sizeof(PgStat_MsgTabstat *) * newAlloc);
1013         else
1014                 msgArray = (PgStat_MsgTabstat **)
1015                         repalloc(tsarr->tsa_messages,
1016                                          sizeof(PgStat_MsgTabstat *) * newAlloc);
1017
1018         for (i = 0; i < TABSTAT_QUANTUM; i++)
1019                 msgArray[tsarr->tsa_alloc + i] = newMessages++;
1020         tsarr->tsa_messages = msgArray;
1021         tsarr->tsa_alloc = newAlloc;
1022
1023         Assert(tsarr->tsa_used < tsarr->tsa_alloc);
1024 }
1025
1026 /* ----------
1027  * pgstat_initstats() -
1028  *
1029  *      Called from various places usually dealing with initialization
1030  *      of Relation or Scan structures. The data placed into these
1031  *      structures from here tell where later to count for buffer reads,
1032  *      scans and tuples fetched.
1033  * ----------
1034  */
1035 void
1036 pgstat_initstats(PgStat_Info *stats, Relation rel)
1037 {
1038         Oid                     rel_id = rel->rd_id;
1039         PgStat_TableEntry *useent;
1040         TabStatArray *tsarr;
1041         PgStat_MsgTabstat *tsmsg;
1042         int                     mb;
1043         int                     i;
1044
1045         /*
1046          * Initialize data not to count at all.
1047          */
1048         stats->tabentry = NULL;
1049
1050         if (pgStatSock < 0 ||
1051                 !(pgstat_collect_tuplelevel ||
1052                   pgstat_collect_blocklevel))
1053                 return;
1054
1055         tsarr = rel->rd_rel->relisshared ? &SharedTabStat : &RegularTabStat;
1056
1057         /*
1058          * Search the already-used message slots for this relation.
1059          */
1060         for (mb = 0; mb < tsarr->tsa_used; mb++)
1061         {
1062                 tsmsg = tsarr->tsa_messages[mb];
1063
1064                 for (i = tsmsg->m_nentries; --i >= 0;)
1065                 {
1066                         if (tsmsg->m_entry[i].t_id == rel_id)
1067                         {
1068                                 stats->tabentry = (void *) &(tsmsg->m_entry[i]);
1069                                 return;
1070                         }
1071                 }
1072
1073                 if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
1074                         continue;
1075
1076                 /*
1077                  * Not found, but found a message buffer with an empty slot instead.
1078                  * Fine, let's use this one.
1079                  */
1080                 i = tsmsg->m_nentries++;
1081                 useent = &tsmsg->m_entry[i];
1082                 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1083                 useent->t_id = rel_id;
1084                 stats->tabentry = (void *) useent;
1085                 return;
1086         }
1087
1088         /*
1089          * If we ran out of message buffers, we just allocate more.
1090          */
1091         if (tsarr->tsa_used >= tsarr->tsa_alloc)
1092                 more_tabstat_space(tsarr);
1093
1094         /*
1095          * Use the first entry of the next message buffer.
1096          */
1097         mb = tsarr->tsa_used++;
1098         tsmsg = tsarr->tsa_messages[mb];
1099         tsmsg->m_nentries = 1;
1100         useent = &tsmsg->m_entry[0];
1101         MemSet(useent, 0, sizeof(PgStat_TableEntry));
1102         useent->t_id = rel_id;
1103         stats->tabentry = (void *) useent;
1104 }
1105
1106
1107 /* ----------
1108  * pgstat_count_xact_commit() -
1109  *
1110  *      Called from access/transam/xact.c to count transaction commits.
1111  * ----------
1112  */
1113 void
1114 pgstat_count_xact_commit(void)
1115 {
1116         if (!pgstat_collect_tuplelevel &&
1117                 !pgstat_collect_blocklevel)
1118                 return;
1119
1120         pgStatXactCommit++;
1121
1122         /*
1123          * If there was no relation activity yet, just make one existing message
1124          * buffer used without slots, causing the next report to tell new
1125          * xact-counters.
1126          */
1127         if (RegularTabStat.tsa_alloc == 0)
1128                 more_tabstat_space(&RegularTabStat);
1129
1130         if (RegularTabStat.tsa_used == 0)
1131         {
1132                 RegularTabStat.tsa_used++;
1133                 RegularTabStat.tsa_messages[0]->m_nentries = 0;
1134         }
1135 }
1136
1137
1138 /* ----------
1139  * pgstat_count_xact_rollback() -
1140  *
1141  *      Called from access/transam/xact.c to count transaction rollbacks.
1142  * ----------
1143  */
1144 void
1145 pgstat_count_xact_rollback(void)
1146 {
1147         if (!pgstat_collect_tuplelevel &&
1148                 !pgstat_collect_blocklevel)
1149                 return;
1150
1151         pgStatXactRollback++;
1152
1153         /*
1154          * If there was no relation activity yet, just make one existing message
1155          * buffer used without slots, causing the next report to tell new
1156          * xact-counters.
1157          */
1158         if (RegularTabStat.tsa_alloc == 0)
1159                 more_tabstat_space(&RegularTabStat);
1160
1161         if (RegularTabStat.tsa_used == 0)
1162         {
1163                 RegularTabStat.tsa_used++;
1164                 RegularTabStat.tsa_messages[0]->m_nentries = 0;
1165         }
1166 }
1167
1168
1169 /* ----------
1170  * pgstat_fetch_stat_dbentry() -
1171  *
1172  *      Support function for the SQL-callable pgstat* functions. Returns
1173  *      the collected statistics for one database or NULL. NULL doesn't mean
1174  *      that the database doesn't exist, it is just not yet known by the
1175  *      collector, so the caller is better off to report ZERO instead.
1176  * ----------
1177  */
1178 PgStat_StatDBEntry *
1179 pgstat_fetch_stat_dbentry(Oid dbid)
1180 {
1181         /*
1182          * If not done for this transaction, read the statistics collector stats
1183          * file into some hash tables.
1184          */
1185         backend_read_statsfile();
1186
1187         /*
1188          * Lookup the requested database; return NULL if not found
1189          */
1190         return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1191                                                                                           (void *) &dbid,
1192                                                                                           HASH_FIND, NULL);
1193 }
1194
1195
1196 /* ----------
1197  * pgstat_fetch_stat_tabentry() -
1198  *
1199  *      Support function for the SQL-callable pgstat* functions. Returns
1200  *      the collected statistics for one table or NULL. NULL doesn't mean
1201  *      that the table doesn't exist, it is just not yet known by the
1202  *      collector, so the caller is better off to report ZERO instead.
1203  * ----------
1204  */
1205 PgStat_StatTabEntry *
1206 pgstat_fetch_stat_tabentry(Oid relid)
1207 {
1208         Oid                     dbid;
1209         PgStat_StatDBEntry *dbentry;
1210         PgStat_StatTabEntry *tabentry;
1211
1212         /*
1213          * If not done for this transaction, read the statistics collector stats
1214          * file into some hash tables.
1215          */
1216         backend_read_statsfile();
1217
1218         /*
1219          * Lookup our database, then look in its table hash table.
1220          */
1221         dbid = MyDatabaseId;
1222         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1223                                                                                                  (void *) &dbid,
1224                                                                                                  HASH_FIND, NULL);
1225         if (dbentry != NULL && dbentry->tables != NULL)
1226         {
1227                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1228                                                                                                            (void *) &relid,
1229                                                                                                            HASH_FIND, NULL);
1230                 if (tabentry)
1231                         return tabentry;
1232         }
1233
1234         /*
1235          * If we didn't find it, maybe it's a shared table.
1236          */
1237         dbid = InvalidOid;
1238         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1239                                                                                                  (void *) &dbid,
1240                                                                                                  HASH_FIND, NULL);
1241         if (dbentry != NULL && dbentry->tables != NULL)
1242         {
1243                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1244                                                                                                            (void *) &relid,
1245                                                                                                            HASH_FIND, NULL);
1246                 if (tabentry)
1247                         return tabentry;
1248         }
1249
1250         return NULL;
1251 }
1252
1253
1254 /* ----------
1255  * pgstat_fetch_stat_beentry() -
1256  *
1257  *      Support function for the SQL-callable pgstat* functions. Returns
1258  *      our local copy of the current-activity entry for one backend.
1259  *
1260  *      NB: caller is responsible for a check if the user is permitted to see
1261  *      this info (especially the querystring).
1262  * ----------
1263  */
1264 PgBackendStatus *
1265 pgstat_fetch_stat_beentry(int beid)
1266 {
1267         pgstat_read_current_status();
1268
1269         if (beid < 1 || beid > localNumBackends)
1270                 return NULL;
1271
1272         return &localBackendStatusTable[beid - 1];
1273 }
1274
1275
1276 /* ----------
1277  * pgstat_fetch_stat_numbackends() -
1278  *
1279  *      Support function for the SQL-callable pgstat* functions. Returns
1280  *      the maximum current backend id.
1281  * ----------
1282  */
1283 int
1284 pgstat_fetch_stat_numbackends(void)
1285 {
1286         pgstat_read_current_status();
1287
1288         return localNumBackends;
1289 }
1290
1291
/* ------------------------------------------------------------
 * Functions for management of the shared-memory PgBackendStatus array
 * ------------------------------------------------------------
 */

/* Base of the shared array (MaxBackends entries); set by CreateSharedBackendStatus() */
static PgBackendStatus *BackendStatusArray = NULL;
/* This backend's own slot in that array; set by pgstat_bestart(), NULL before that */
static PgBackendStatus *MyBEEntry = NULL;
1299
1300
1301 /*
1302  * Report shared-memory space needed by CreateSharedBackendStatus.
1303  */
1304 Size
1305 BackendStatusShmemSize(void)
1306 {
1307         Size            size;
1308
1309         size = mul_size(sizeof(PgBackendStatus), MaxBackends);
1310         return size;
1311 }
1312
1313 /*
1314  * Initialize the shared status array during postmaster startup.
1315  */
1316 void
1317 CreateSharedBackendStatus(void)
1318 {
1319         Size            size = BackendStatusShmemSize();
1320         bool            found;
1321
1322         /* Create or attach to the shared array */
1323         BackendStatusArray = (PgBackendStatus *)
1324                 ShmemInitStruct("Backend Status Array", size, &found);
1325
1326         if (!found)
1327         {
1328                 /*
1329                  * We're the first - initialize.
1330                  */
1331                 MemSet(BackendStatusArray, 0, size);
1332         }
1333 }
1334
1335
1336 /* ----------
1337  * pgstat_bestart() -
1338  *
1339  *      Initialize this backend's entry in the PgBackendStatus array,
1340  *      and set up an on-proc-exit hook that will clear it again.
1341  *      Called from InitPostgres.  MyBackendId and MyDatabaseId must be set.
1342  * ----------
1343  */
1344 void
1345 pgstat_bestart(void)
1346 {
1347         volatile PgBackendStatus *beentry;
1348         TimestampTz proc_start_timestamp;
1349         Oid                     userid;
1350         SockAddr        clientaddr;
1351
1352         Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);
1353         MyBEEntry = &BackendStatusArray[MyBackendId - 1];
1354
1355         /*
1356          * To minimize the time spent modifying the entry, fetch all the needed
1357          * data first.
1358          *
1359          * If we have a MyProcPort, use its session start time (for consistency,
1360          * and to save a kernel call).
1361          */
1362         if (MyProcPort)
1363                 proc_start_timestamp = MyProcPort->SessionStartTime;
1364         else
1365                 proc_start_timestamp = GetCurrentTimestamp();
1366         userid = GetSessionUserId();
1367
1368         /*
1369          * We may not have a MyProcPort (eg, if this is the autovacuum process).
1370          * If so, use all-zeroes client address, which is dealt with specially in
1371          * pg_stat_get_backend_client_addr and pg_stat_get_backend_client_port.
1372          */
1373         if (MyProcPort)
1374                 memcpy(&clientaddr, &MyProcPort->raddr, sizeof(clientaddr));
1375         else
1376                 MemSet(&clientaddr, 0, sizeof(clientaddr));
1377
1378         /*
1379          * Initialize my status entry, following the protocol of bumping
1380          * st_changecount before and after; and make sure it's even afterwards. We
1381          * use a volatile pointer here to ensure the compiler doesn't try to get
1382          * cute.
1383          */
1384         beentry = MyBEEntry;
1385         do
1386         {
1387                 beentry->st_changecount++;
1388         } while ((beentry->st_changecount & 1) == 0);
1389
1390         beentry->st_procpid = MyProcPid;
1391         beentry->st_proc_start_timestamp = proc_start_timestamp;
1392         beentry->st_activity_start_timestamp = 0;
1393         beentry->st_txn_start_timestamp = 0;
1394         beentry->st_databaseid = MyDatabaseId;
1395         beentry->st_userid = userid;
1396         beentry->st_clientaddr = clientaddr;
1397         beentry->st_waiting = false;
1398         beentry->st_activity[0] = '\0';
1399         /* Also make sure the last byte in the string area is always 0 */
1400         beentry->st_activity[PGBE_ACTIVITY_SIZE - 1] = '\0';
1401
1402         beentry->st_changecount++;
1403         Assert((beentry->st_changecount & 1) == 0);
1404
1405         /*
1406          * Set up a process-exit hook to clean up.
1407          */
1408         on_shmem_exit(pgstat_beshutdown_hook, 0);
1409 }
1410
/*
 * Shut down a single backend's statistics reporting at process exit.
 * Registered with on_shmem_exit() by pgstat_bestart().
 *
 * First, flush any remaining statistics counts out to the collector.
 * Without this, operations triggered during backend exit (such as
 * temp table deletions) won't be counted.
 *
 * Lastly, clear out our entry in the PgBackendStatus array.
 *
 * code, arg: the standard on_shmem_exit callback arguments; unused here.
 */
static void
pgstat_beshutdown_hook(int code, Datum arg)
{
	volatile PgBackendStatus *beentry = MyBEEntry;

	pgstat_report_tabstat();

	/*
	 * Clear my status entry, following the protocol of bumping st_changecount
	 * before and after: the count is odd while the entry is being changed and
	 * even once it is stable again.  We use a volatile pointer here to ensure
	 * the compiler doesn't try to get cute.
	 */
	beentry->st_changecount++;

	beentry->st_procpid = 0;	/* mark invalid; readers skip entries whose st_procpid is not > 0 */

	beentry->st_changecount++;
	Assert((beentry->st_changecount & 1) == 0);
}
1439
1440
1441 /* ----------
1442  * pgstat_report_activity() -
1443  *
1444  *      Called from tcop/postgres.c to report what the backend is actually doing
1445  *      (usually "<IDLE>" or the start of the query to be executed).
1446  * ----------
1447  */
1448 void
1449 pgstat_report_activity(const char *cmd_str)
1450 {
1451         volatile PgBackendStatus *beentry = MyBEEntry;
1452         TimestampTz start_timestamp;
1453         int                     len;
1454
1455         if (!pgstat_collect_querystring || !beentry)
1456                 return;
1457
1458         /*
1459          * To minimize the time spent modifying the entry, fetch all the needed
1460          * data first.
1461          */
1462         start_timestamp = GetCurrentStatementStartTimestamp();
1463
1464         len = strlen(cmd_str);
1465         len = pg_mbcliplen(cmd_str, len, PGBE_ACTIVITY_SIZE - 1);
1466
1467         /*
1468          * Update my status entry, following the protocol of bumping
1469          * st_changecount before and after.  We use a volatile pointer here to
1470          * ensure the compiler doesn't try to get cute.
1471          */
1472         beentry->st_changecount++;
1473
1474         beentry->st_activity_start_timestamp = start_timestamp;
1475         memcpy((char *) beentry->st_activity, cmd_str, len);
1476         beentry->st_activity[len] = '\0';
1477
1478         beentry->st_changecount++;
1479         Assert((beentry->st_changecount & 1) == 0);
1480 }
1481
1482 /*
1483  * Set the current transaction start timestamp to the specified
1484  * value. If there is no current active transaction, this is signified
1485  * by 0.
1486  */
1487 void
1488 pgstat_report_txn_timestamp(TimestampTz tstamp)
1489 {
1490         volatile PgBackendStatus *beentry = MyBEEntry;
1491
1492         if (!pgstat_collect_querystring || !beentry)
1493                 return;
1494
1495         /*
1496          * Update my status entry, following the protocol of bumping
1497          * st_changecount before and after.  We use a volatile pointer
1498          * here to ensure the compiler doesn't try to get cute.
1499          */
1500         beentry->st_changecount++;
1501         beentry->st_txn_start_timestamp = tstamp;
1502         beentry->st_changecount++;
1503         Assert((beentry->st_changecount & 1) == 0);
1504 }
1505
1506 /* ----------
1507  * pgstat_report_waiting() -
1508  *
1509  *      Called from lock manager to report beginning or end of a lock wait.
1510  *
1511  * NB: this *must* be able to survive being called before MyBEEntry has been
1512  * initialized.
1513  * ----------
1514  */
1515 void
1516 pgstat_report_waiting(bool waiting)
1517 {
1518         volatile PgBackendStatus *beentry = MyBEEntry;
1519
1520         if (!pgstat_collect_querystring || !beentry)
1521                 return;
1522
1523         /*
1524          * Since this is a single-byte field in a struct that only this process
1525          * may modify, there seems no need to bother with the st_changecount
1526          * protocol.  The update must appear atomic in any case.
1527          */
1528         beentry->st_waiting = waiting;
1529 }
1530
1531
/* ----------
 * pgstat_read_current_status() -
 *
 *	Copy the current contents of the PgBackendStatus array to local memory,
 *	if not already done in this transaction.
 *
 *	The copy (localBackendStatusTable, allocated in pgStatLocalContext)
 *	contains only entries whose st_procpid is > 0, packed at the front,
 *	and localNumBackends is set to how many there are.  Not to be called
 *	in the collector process itself.
 * ----------
 */
static void
pgstat_read_current_status(void)
{
	volatile PgBackendStatus *beentry;
	PgBackendStatus *localtable;
	PgBackendStatus *localentry;
	int			i;

	Assert(!pgStatRunningInCollector);
	if (localBackendStatusTable)
		return;					/* already done */

	pgstat_setup_memcxt();

	/* room for every possible backend; only the valid ones get kept */
	localtable = (PgBackendStatus *)
		MemoryContextAlloc(pgStatLocalContext,
						   sizeof(PgBackendStatus) * MaxBackends);
	localNumBackends = 0;

	beentry = BackendStatusArray;
	localentry = localtable;
	for (i = 1; i <= MaxBackends; i++)
	{
		/*
		 * Follow the protocol of retrying if st_changecount changes while we
		 * copy the entry, or if it's odd.  (The check for odd is needed to
		 * cover the case where we are able to completely copy the entry while
		 * the source backend is between increment steps.)  We use a volatile
		 * pointer here to ensure the compiler doesn't try to get cute.
		 */
		for (;;)
		{
			int			save_changecount = beentry->st_changecount;

			/*
			 * XXX if PGBE_ACTIVITY_SIZE is really large, it might be best to
			 * use strcpy not memcpy for copying the activity string?
			 */
			memcpy(localentry, (char *) beentry, sizeof(PgBackendStatus));

			/* consistent copy: count unchanged during the copy, and even */
			if (save_changecount == beentry->st_changecount &&
				(save_changecount & 1) == 0)
				break;

			/* Make sure we can break out of loop if stuck... */
			CHECK_FOR_INTERRUPTS();
		}

		beentry++;

		/*
		 * Only valid entries get included into the local array.  For an
		 * invalid entry localentry is not advanced, so its slot is simply
		 * overwritten by the next copy.
		 */
		if (localentry->st_procpid > 0)
		{
			localentry++;
			localNumBackends++;
		}
	}

	/*
	 * Set the pointer only after completion of a valid table.  If we leave
	 * early (e.g. via CHECK_FOR_INTERRUPTS), it stays NULL and the next call
	 * rebuilds the snapshot from scratch.
	 */
	localBackendStatusTable = localtable;
}
1599
1600
1601 /* ------------------------------------------------------------
1602  * Local support functions follow
1603  * ------------------------------------------------------------
1604  */
1605
1606
1607 /* ----------
1608  * pgstat_setheader() -
1609  *
1610  *              Set common header fields in a statistics message
1611  * ----------
1612  */
1613 static void
1614 pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
1615 {
1616         hdr->m_type = mtype;
1617 }
1618
1619
1620 /* ----------
1621  * pgstat_send() -
1622  *
1623  *              Send out one statistics message to the collector
1624  * ----------
1625  */
1626 static void
1627 pgstat_send(void *msg, int len)
1628 {
1629         int                     rc;
1630
1631         if (pgStatSock < 0)
1632                 return;
1633
1634         ((PgStat_MsgHdr *) msg)->m_size = len;
1635
1636         /* We'll retry after EINTR, but ignore all other failures */
1637         do
1638         {
1639                 rc = send(pgStatSock, msg, len, 0);
1640         } while (rc < 0 && errno == EINTR);
1641
1642 #ifdef USE_ASSERT_CHECKING
1643         /* In debug builds, log send failures ... */
1644         if (rc < 0)
1645                 elog(LOG, "could not send to statistics collector: %m");
1646 #endif
1647 }
1648
1649
1650 /* ----------
1651  * PgstatCollectorMain() -
1652  *
1653  *      Start up the statistics collector process.      This is the body of the
1654  *      postmaster child process.
1655  *
1656  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1657  * ----------
1658  */
1659 NON_EXEC_STATIC void
1660 PgstatCollectorMain(int argc, char *argv[])
1661 {
1662         struct itimerval write_timeout;
1663         bool            need_timer = false;
1664         int                     len;
1665         PgStat_Msg      msg;
1666
1667 #ifndef WIN32
1668 #ifdef HAVE_POLL
1669         struct pollfd input_fd;
1670 #else
1671         struct timeval sel_timeout;
1672         fd_set          rfds;
1673 #endif
1674 #endif
1675
1676         IsUnderPostmaster = true;       /* we are a postmaster subprocess now */
1677
1678         MyProcPid = getpid();           /* reset MyProcPid */
1679
1680         /*
1681          * If possible, make this process a group leader, so that the postmaster
1682          * can signal any child processes too.  (pgstat probably never has
1683          * any child processes, but for consistency we make all postmaster
1684          * child processes do this.)
1685          */
1686 #ifdef HAVE_SETSID
1687         if (setsid() < 0)
1688                 elog(FATAL, "setsid() failed: %m");
1689 #endif
1690
1691         /*
1692          * Ignore all signals usually bound to some action in the postmaster,
1693          * except SIGQUIT and SIGALRM.
1694          */
1695         pqsignal(SIGHUP, SIG_IGN);
1696         pqsignal(SIGINT, SIG_IGN);
1697         pqsignal(SIGTERM, SIG_IGN);
1698         pqsignal(SIGQUIT, pgstat_exit);
1699         pqsignal(SIGALRM, force_statwrite);
1700         pqsignal(SIGPIPE, SIG_IGN);
1701         pqsignal(SIGUSR1, SIG_IGN);
1702         pqsignal(SIGUSR2, SIG_IGN);
1703         pqsignal(SIGCHLD, SIG_DFL);
1704         pqsignal(SIGTTIN, SIG_DFL);
1705         pqsignal(SIGTTOU, SIG_DFL);
1706         pqsignal(SIGCONT, SIG_DFL);
1707         pqsignal(SIGWINCH, SIG_DFL);
1708         PG_SETMASK(&UnBlockSig);
1709
1710         /*
1711          * Identify myself via ps
1712          */
1713         init_ps_display("stats collector process", "", "", "");
1714
1715         /*
1716          * Arrange to write the initial status file right away
1717          */
1718         need_statwrite = true;
1719
1720         /* Preset the delay between status file writes */
1721         MemSet(&write_timeout, 0, sizeof(struct itimerval));
1722         write_timeout.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000;
1723         write_timeout.it_value.tv_usec = (PGSTAT_STAT_INTERVAL % 1000) * 1000;
1724
1725         /*
1726          * Read in an existing statistics stats file or initialize the stats to
1727          * zero.
1728          */
1729         pgStatRunningInCollector = true;
1730         pgStatDBHash = pgstat_read_statsfile(InvalidOid);
1731
1732         /*
1733          * Setup the descriptor set for select(2).      Since only one bit in the set
1734          * ever changes, we need not repeat FD_ZERO each time.
1735          */
1736 #if !defined(HAVE_POLL) && !defined(WIN32)
1737         FD_ZERO(&rfds);
1738 #endif
1739
1740         /*
1741          * Loop to process messages until we get SIGQUIT or detect ungraceful
1742          * death of our parent postmaster.
1743          *
1744          * For performance reasons, we don't want to do a PostmasterIsAlive() test
1745          * after every message; instead, do it at statwrite time and if
1746          * select()/poll() is interrupted by timeout.
1747          */
1748         for (;;)
1749         {
1750                 int                     got_data;
1751
1752                 /*
1753                  * Quit if we get SIGQUIT from the postmaster.
1754                  */
1755                 if (need_exit)
1756                         break;
1757
1758                 /*
1759                  * If time to write the stats file, do so.      Note that the alarm
1760                  * interrupt isn't re-enabled immediately, but only after we next
1761                  * receive a stats message; so no cycles are wasted when there is
1762                  * nothing going on.
1763                  */
1764                 if (need_statwrite)
1765                 {
1766                         /* Check for postmaster death; if so we'll write file below */
1767                         if (!PostmasterIsAlive(true))
1768                                 break;
1769
1770                         pgstat_write_statsfile();
1771                         need_statwrite = false;
1772                         need_timer = true;
1773                 }
1774
1775                 /*
1776                  * Wait for a message to arrive; but not for more than
1777                  * PGSTAT_SELECT_TIMEOUT seconds. (This determines how quickly we will
1778                  * shut down after an ungraceful postmaster termination; so it needn't
1779                  * be very fast.  However, on some systems SIGQUIT won't interrupt the
1780                  * poll/select call, so this also limits speed of response to SIGQUIT,
1781                  * which is more important.)
1782                  *
1783                  * We use poll(2) if available, otherwise select(2).
1784                  * Win32 has its own implementation.
1785                  */
1786 #ifndef WIN32
1787 #ifdef HAVE_POLL
1788                 input_fd.fd = pgStatSock;
1789                 input_fd.events = POLLIN | POLLERR;
1790                 input_fd.revents = 0;
1791
1792                 if (poll(&input_fd, 1, PGSTAT_SELECT_TIMEOUT * 1000) < 0)
1793                 {
1794                         if (errno == EINTR)
1795                                 continue;
1796                         ereport(ERROR,
1797                                         (errcode_for_socket_access(),
1798                                          errmsg("poll() failed in statistics collector: %m")));
1799                 }
1800
1801                 got_data = (input_fd.revents != 0);
1802 #else                                                   /* !HAVE_POLL */
1803
1804                 FD_SET(pgStatSock, &rfds);
1805
1806                 /*
1807                  * timeout struct is modified by select() on some operating systems,
1808                  * so re-fill it each time.
1809                  */
1810                 sel_timeout.tv_sec = PGSTAT_SELECT_TIMEOUT;
1811                 sel_timeout.tv_usec = 0;
1812
1813                 if (select(pgStatSock + 1, &rfds, NULL, NULL, &sel_timeout) < 0)
1814                 {
1815                         if (errno == EINTR)
1816                                 continue;
1817                         ereport(ERROR,
1818                                         (errcode_for_socket_access(),
1819                                          errmsg("select() failed in statistics collector: %m")));
1820                 }
1821
1822                 got_data = FD_ISSET(pgStatSock, &rfds);
1823 #endif   /* HAVE_POLL */
1824 #else /* WIN32 */
1825                 got_data = pgwin32_waitforsinglesocket(pgStatSock, FD_READ,
1826                                                                                            PGSTAT_SELECT_TIMEOUT*1000);
1827 #endif
1828
1829                 /*
1830                  * If there is a message on the socket, read it and check for
1831                  * validity.
1832                  */
1833                 if (got_data)
1834                 {
1835                         len = recv(pgStatSock, (char *) &msg,
1836                                            sizeof(PgStat_Msg), 0);
1837                         if (len < 0)
1838                         {
1839                                 if (errno == EINTR)
1840                                         continue;
1841                                 ereport(ERROR,
1842                                                 (errcode_for_socket_access(),
1843                                                  errmsg("could not read statistics message: %m")));
1844                         }
1845
1846                         /*
1847                          * We ignore messages that are smaller than our common header
1848                          */
1849                         if (len < sizeof(PgStat_MsgHdr))
1850                                 continue;
1851
1852                         /*
1853                          * The received length must match the length in the header
1854                          */
1855                         if (msg.msg_hdr.m_size != len)
1856                                 continue;
1857
1858                         /*
1859                          * O.K. - we accept this message.  Process it.
1860                          */
1861                         switch (msg.msg_hdr.m_type)
1862                         {
1863                                 case PGSTAT_MTYPE_DUMMY:
1864                                         break;
1865
1866                                 case PGSTAT_MTYPE_TABSTAT:
1867                                         pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, len);
1868                                         break;
1869
1870                                 case PGSTAT_MTYPE_TABPURGE:
1871                                         pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, len);
1872                                         break;
1873
1874                                 case PGSTAT_MTYPE_DROPDB:
1875                                         pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, len);
1876                                         break;
1877
1878                                 case PGSTAT_MTYPE_RESETCOUNTER:
1879                                         pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1880                                                                                          len);
1881                                         break;
1882
1883                                 case PGSTAT_MTYPE_AUTOVAC_START:
1884                                         pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, len);
1885                                         break;
1886
1887                                 case PGSTAT_MTYPE_VACUUM:
1888                                         pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, len);
1889                                         break;
1890
1891                                 case PGSTAT_MTYPE_ANALYZE:
1892                                         pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, len);
1893                                         break;
1894
1895                                 default:
1896                                         break;
1897                         }
1898
1899                         /*
1900                          * If this is the first message after we wrote the stats file the
1901                          * last time, enable the alarm interrupt to make it be written
1902                          * again later.
1903                          */
1904                         if (need_timer)
1905                         {
1906                                 if (setitimer(ITIMER_REAL, &write_timeout, NULL))
1907                                         ereport(ERROR,
1908                                         (errmsg("could not set statistics collector timer: %m")));
1909                                 need_timer = false;
1910                         }
1911                 }
1912                 else
1913                 {
1914                         /*
1915                          * We can only get here if the select/poll timeout elapsed. Check
1916                          * for postmaster death.
1917                          */
1918                         if (!PostmasterIsAlive(true))
1919                                 break;
1920                 }
1921         }                                                       /* end of message-processing loop */
1922
1923         /*
1924          * Save the final stats to reuse at next startup.
1925          */
1926         pgstat_write_statsfile();
1927
1928         exit(0);
1929 }
1930
1931
/*
 * SIGQUIT signal handler for the collector process.
 *
 * Only sets need_exit.  The main message loop tests that flag at the top of
 * each iteration, leaves the loop, writes the final stats file and exits.
 * All real work is deliberately left to the main loop; the handler itself
 * does nothing but the flag store.
 */
static void
pgstat_exit(SIGNAL_ARGS)
{
	need_exit = true;
}
1938
/*
 * SIGALRM signal handler for the collector process.
 *
 * The main loop arms an ITIMER_REAL interval timer (setitimer with
 * write_timeout) after the first message following a stats-file write.
 * When it fires, this handler just sets need_statwrite.  The main loop
 * then rewrites the stats file on its next iteration.
 */
static void
force_statwrite(SIGNAL_ARGS)
{
	need_statwrite = true;
}
1945
1946
1947 /*
1948  * Lookup the hash table entry for the specified database. If no hash
1949  * table entry exists, initialize it, if the create parameter is true.
1950  * Else, return NULL.
1951  */
1952 static PgStat_StatDBEntry *
1953 pgstat_get_db_entry(Oid databaseid, bool create)
1954 {
1955         PgStat_StatDBEntry *result;
1956         bool            found;
1957         HASHACTION      action = (create ? HASH_ENTER : HASH_FIND);
1958
1959         /* Lookup or create the hash table entry for this database */
1960         result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1961                                                                                                 &databaseid,
1962                                                                                                 action, &found);
1963
1964         if (!create && !found)
1965                 return NULL;
1966
1967         /* If not found, initialize the new one. */
1968         if (!found)
1969         {
1970                 HASHCTL         hash_ctl;
1971
1972                 result->tables = NULL;
1973                 result->n_xact_commit = 0;
1974                 result->n_xact_rollback = 0;
1975                 result->n_blocks_fetched = 0;
1976                 result->n_blocks_hit = 0;
1977                 result->n_tuples_returned = 0;
1978                 result->n_tuples_fetched = 0;
1979                 result->n_tuples_inserted = 0;
1980                 result->n_tuples_updated = 0;
1981                 result->n_tuples_deleted = 0;
1982                 result->last_autovac_time = 0;
1983
1984                 memset(&hash_ctl, 0, sizeof(hash_ctl));
1985                 hash_ctl.keysize = sizeof(Oid);
1986                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
1987                 hash_ctl.hash = oid_hash;
1988                 result->tables = hash_create("Per-database table",
1989                                                                          PGSTAT_TAB_HASH_SIZE,
1990                                                                          &hash_ctl,
1991                                                                          HASH_ELEM | HASH_FUNCTION);
1992         }
1993
1994         return result;
1995 }
1996
1997
1998 /* ----------
1999  * pgstat_write_statsfile() -
2000  *
2001  *      Tell the news.
2002  * ----------
2003  */
2004 static void
2005 pgstat_write_statsfile(void)
2006 {
2007         HASH_SEQ_STATUS hstat;
2008         HASH_SEQ_STATUS tstat;
2009         PgStat_StatDBEntry *dbentry;
2010         PgStat_StatTabEntry *tabentry;
2011         FILE       *fpout;
2012         int32           format_id;
2013
2014         /*
2015          * Open the statistics temp file to write out the current values.
2016          */
2017         fpout = fopen(PGSTAT_STAT_TMPFILE, PG_BINARY_W);
2018         if (fpout == NULL)
2019         {
2020                 ereport(LOG,
2021                                 (errcode_for_file_access(),
2022                                  errmsg("could not open temporary statistics file \"%s\": %m",
2023                                                 PGSTAT_STAT_TMPFILE)));
2024                 return;
2025         }
2026
2027         /*
2028          * Write the file header --- currently just a format ID.
2029          */
2030         format_id = PGSTAT_FILE_FORMAT_ID;
2031         fwrite(&format_id, sizeof(format_id), 1, fpout);
2032
2033         /*
2034          * Walk through the database table.
2035          */
2036         hash_seq_init(&hstat, pgStatDBHash);
2037         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2038         {
2039                 /*
2040                  * Write out the DB entry including the number of live backends. We
2041                  * don't write the tables pointer since it's of no use to any other
2042                  * process.
2043                  */
2044                 fputc('D', fpout);
2045                 fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
2046
2047                 /*
2048                  * Walk through the database's access stats per table.
2049                  */
2050                 hash_seq_init(&tstat, dbentry->tables);
2051                 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2052                 {
2053                         fputc('T', fpout);
2054                         fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2055                 }
2056
2057                 /*
2058                  * Mark the end of this DB
2059                  */
2060                 fputc('d', fpout);
2061         }
2062
2063         /*
2064          * No more output to be done. Close the temp file and replace the old
2065          * pgstat.stat with it.  The ferror() check replaces testing for error
2066          * after each individual fputc or fwrite above.
2067          */
2068         fputc('E', fpout);
2069
2070         if (ferror(fpout))
2071         {
2072                 ereport(LOG,
2073                                 (errcode_for_file_access(),
2074                            errmsg("could not write temporary statistics file \"%s\": %m",
2075                                           PGSTAT_STAT_TMPFILE)));
2076                 fclose(fpout);
2077                 unlink(PGSTAT_STAT_TMPFILE);
2078         }
2079         else if (fclose(fpout) < 0)
2080         {
2081                 ereport(LOG,
2082                                 (errcode_for_file_access(),
2083                            errmsg("could not close temporary statistics file \"%s\": %m",
2084                                           PGSTAT_STAT_TMPFILE)));
2085                 unlink(PGSTAT_STAT_TMPFILE);
2086         }
2087         else if (rename(PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME) < 0)
2088         {
2089                 ereport(LOG,
2090                                 (errcode_for_file_access(),
2091                                  errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2092                                                 PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME)));
2093                 unlink(PGSTAT_STAT_TMPFILE);
2094         }
2095 }
2096
2097
2098 /* ----------
2099  * pgstat_read_statsfile() -
2100  *
2101  *      Reads in an existing statistics collector file and initializes the
2102  *      databases' hash table (whose entries point to the tables' hash tables).
2103  * ----------
2104  */
2105 static HTAB *
2106 pgstat_read_statsfile(Oid onlydb)
2107 {
2108         PgStat_StatDBEntry *dbentry;
2109         PgStat_StatDBEntry dbbuf;
2110         PgStat_StatTabEntry *tabentry;
2111         PgStat_StatTabEntry tabbuf;
2112         HASHCTL         hash_ctl;
2113         HTAB       *dbhash;
2114         HTAB       *tabhash = NULL;
2115         FILE       *fpin;
2116         int32           format_id;
2117         bool            found;
2118
2119         /*
2120          * The tables will live in pgStatLocalContext.
2121          */
2122         pgstat_setup_memcxt();
2123
2124         /*
2125          * Create the DB hashtable
2126          */
2127         memset(&hash_ctl, 0, sizeof(hash_ctl));
2128         hash_ctl.keysize = sizeof(Oid);
2129         hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2130         hash_ctl.hash = oid_hash;
2131         hash_ctl.hcxt = pgStatLocalContext;
2132         dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2133                                                  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
2134
2135         /*
2136          * Try to open the status file. If it doesn't exist, the backends simply
2137          * return zero for anything and the collector simply starts from scratch
2138          * with empty counters.
2139          */
2140         if ((fpin = AllocateFile(PGSTAT_STAT_FILENAME, PG_BINARY_R)) == NULL)
2141                 return dbhash;
2142
2143         /*
2144          * Verify it's of the expected format.
2145          */
2146         if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id)
2147                 || format_id != PGSTAT_FILE_FORMAT_ID)
2148         {
2149                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2150                                 (errmsg("corrupted pgstat.stat file")));
2151                 goto done;
2152         }
2153
2154         /*
2155          * We found an existing collector stats file. Read it and put all the
2156          * hashtable entries into place.
2157          */
2158         for (;;)
2159         {
2160                 switch (fgetc(fpin))
2161                 {
2162                                 /*
2163                                  * 'D'  A PgStat_StatDBEntry struct describing a database
2164                                  * follows. Subsequently, zero to many 'T' entries will follow
2165                                  * until a 'd' is encountered.
2166                                  */
2167                         case 'D':
2168                                 if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
2169                                                   fpin) != offsetof(PgStat_StatDBEntry, tables))
2170                                 {
2171                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2172                                                         (errmsg("corrupted pgstat.stat file")));
2173                                         goto done;
2174                                 }
2175
2176                                 /*
2177                                  * Add to the DB hash
2178                                  */
2179                                 dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
2180                                                                                                   (void *) &dbbuf.databaseid,
2181                                                                                                                          HASH_ENTER,
2182                                                                                                                          &found);
2183                                 if (found)
2184                                 {
2185                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2186                                                         (errmsg("corrupted pgstat.stat file")));
2187                                         goto done;
2188                                 }
2189
2190                                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2191                                 dbentry->tables = NULL;
2192
2193                                 /*
2194                                  * Don't collect tables if not the requested DB (or the
2195                                  * shared-table info)
2196                                  */
2197                                 if (onlydb != InvalidOid)
2198                                 {
2199                                         if (dbbuf.databaseid != onlydb &&
2200                                                 dbbuf.databaseid != InvalidOid)
2201                                                 break;
2202                                 }
2203
2204                                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2205                                 hash_ctl.keysize = sizeof(Oid);
2206                                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2207                                 hash_ctl.hash = oid_hash;
2208                                 hash_ctl.hcxt = pgStatLocalContext;
2209                                 dbentry->tables = hash_create("Per-database table",
2210                                                                                           PGSTAT_TAB_HASH_SIZE,
2211                                                                                           &hash_ctl,
2212                                                                          HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
2213
2214                                 /*
2215                                  * Arrange that following 'T's add entries to this database's
2216                                  * tables hash table.
2217                                  */
2218                                 tabhash = dbentry->tables;
2219                                 break;
2220
2221                                 /*
2222                                  * 'd'  End of this database.
2223                                  */
2224                         case 'd':
2225                                 tabhash = NULL;
2226                                 break;
2227
2228                                 /*
2229                                  * 'T'  A PgStat_StatTabEntry follows.
2230                                  */
2231                         case 'T':
2232                                 if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
2233                                                   fpin) != sizeof(PgStat_StatTabEntry))
2234                                 {
2235                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2236                                                         (errmsg("corrupted pgstat.stat file")));
2237                                         goto done;
2238                                 }
2239
2240                                 /*
2241                                  * Skip if table belongs to a not requested database.
2242                                  */
2243                                 if (tabhash == NULL)
2244                                         break;
2245
2246                                 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2247                                                                                                         (void *) &tabbuf.tableid,
2248                                                                                                                  HASH_ENTER, &found);
2249
2250                                 if (found)
2251                                 {
2252                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2253                                                         (errmsg("corrupted pgstat.stat file")));
2254                                         goto done;
2255                                 }
2256
2257                                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2258                                 break;
2259
2260                                 /*
2261                                  * 'E'  The EOF marker of a complete stats file.
2262                                  */
2263                         case 'E':
2264                                 goto done;
2265
2266                         default:
2267                                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2268                                                 (errmsg("corrupted pgstat.stat file")));
2269                                 goto done;
2270                 }
2271         }
2272
2273 done:
2274         FreeFile(fpin);
2275
2276         return dbhash;
2277 }
2278
2279 /*
2280  * If not already done, read the statistics collector stats file into
2281  * some hash tables.  The results will be kept until pgstat_clear_snapshot()
2282  * is called (typically, at end of transaction).
2283  */
2284 static void
2285 backend_read_statsfile(void)
2286 {
2287         /* already read it? */
2288         if (pgStatDBHash)
2289                 return;
2290         Assert(!pgStatRunningInCollector);
2291
2292         /* Autovacuum launcher wants stats about all databases */
2293         if (IsAutoVacuumLauncherProcess())
2294                 pgStatDBHash = pgstat_read_statsfile(InvalidOid);
2295         else
2296                 pgStatDBHash = pgstat_read_statsfile(MyDatabaseId);
2297 }
2298
2299
2300 /* ----------
2301  * pgstat_setup_memcxt() -
2302  *
2303  *      Create pgStatLocalContext, if not already done.
2304  * ----------
2305  */
2306 static void
2307 pgstat_setup_memcxt(void)
2308 {
2309         if (!pgStatLocalContext)
2310                 pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
2311                                                                                                    "Statistics snapshot",
2312                                                                                                    ALLOCSET_SMALL_MINSIZE,
2313                                                                                                    ALLOCSET_SMALL_INITSIZE,
2314                                                                                                    ALLOCSET_SMALL_MAXSIZE);
2315 }
2316
2317
2318 /* ----------
2319  * pgstat_clear_snapshot() -
2320  *
2321  *      Discard any data collected in the current transaction.  Any subsequent
2322  *      request will cause new snapshots to be read.
2323  *
2324  *      This is also invoked during transaction commit or abort to discard
2325  *      the no-longer-wanted snapshot.
2326  * ----------
2327  */
2328 void
2329 pgstat_clear_snapshot(void)
2330 {
2331         /* In an autovacuum worker process we keep the stats forever */
2332         if (IsAutoVacuumWorkerProcess())
2333                 return;
2334
2335         /* Release memory, if any was allocated */
2336         if (pgStatLocalContext)
2337                 MemoryContextDelete(pgStatLocalContext);
2338
2339         /* Reset variables */
2340         pgStatLocalContext = NULL;
2341         pgStatDBHash = NULL;
2342         localBackendStatusTable = NULL;
2343         localNumBackends = 0;
2344 }
2345
2346
2347 /* ----------
2348  * pgstat_recv_tabstat() -
2349  *
2350  *      Count what the backend has done.
2351  * ----------
2352  */
2353 static void
2354 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2355 {
2356         PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2357         PgStat_StatDBEntry *dbentry;
2358         PgStat_StatTabEntry *tabentry;
2359         int                     i;
2360         bool            found;
2361
2362         dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
2363
2364         /*
2365          * Update database-wide stats.
2366          */
2367         dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
2368         dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
2369
2370         /*
2371          * Process all table entries in the message.
2372          */
2373         for (i = 0; i < msg->m_nentries; i++)
2374         {
2375                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2376                                                                                                   (void *) &(tabmsg[i].t_id),
2377                                                                                                            HASH_ENTER, &found);
2378
2379                 if (!found)
2380                 {
2381                         /*
2382                          * If it's a new table entry, initialize counters to the values we
2383                          * just got.
2384                          */
2385                         tabentry->numscans = tabmsg[i].t_numscans;
2386                         tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
2387                         tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
2388                         tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
2389                         tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
2390                         tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
2391
2392                         tabentry->n_live_tuples = tabmsg[i].t_tuples_inserted;
2393                         tabentry->n_dead_tuples = tabmsg[i].t_tuples_updated +
2394                                 tabmsg[i].t_tuples_deleted;
2395                         tabentry->last_anl_tuples = 0;
2396                         tabentry->vacuum_timestamp = 0;
2397                         tabentry->autovac_vacuum_timestamp = 0;
2398                         tabentry->analyze_timestamp = 0;
2399                         tabentry->autovac_analyze_timestamp = 0;
2400
2401                         tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
2402                         tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
2403                 }
2404                 else
2405                 {
2406                         /*
2407                          * Otherwise add the values to the existing entry.
2408                          */
2409                         tabentry->numscans += tabmsg[i].t_numscans;
2410                         tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
2411                         tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
2412                         tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
2413                         tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
2414                         tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
2415
2416                         tabentry->n_live_tuples += tabmsg[i].t_tuples_inserted -
2417                                 tabmsg[i].t_tuples_deleted;
2418                         tabentry->n_dead_tuples += tabmsg[i].t_tuples_updated +
2419                                 tabmsg[i].t_tuples_deleted;
2420
2421                         tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
2422                         tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
2423                 }
2424
2425                 /*
2426                  * Add table stats to the database entry.
2427                  */
2428                 dbentry->n_tuples_returned += tabmsg[i].t_tuples_returned;
2429                 dbentry->n_tuples_fetched += tabmsg[i].t_tuples_fetched;
2430                 dbentry->n_tuples_inserted += tabmsg[i].t_tuples_inserted;
2431                 dbentry->n_tuples_updated += tabmsg[i].t_tuples_updated;
2432                 dbentry->n_tuples_deleted += tabmsg[i].t_tuples_deleted;
2433
2434                 /*
2435                  * And add the block IO to the database entry.
2436                  */
2437                 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
2438                 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
2439         }
2440 }
2441
2442
2443 /* ----------
2444  * pgstat_recv_tabpurge() -
2445  *
2446  *      Arrange for dead table removal.
2447  * ----------
2448  */
2449 static void
2450 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
2451 {
2452         PgStat_StatDBEntry *dbentry;
2453         int                     i;
2454
2455         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2456
2457         /*
2458          * No need to purge if we don't even know the database.
2459          */
2460         if (!dbentry || !dbentry->tables)
2461                 return;
2462
2463         /*
2464          * Process all table entries in the message.
2465          */
2466         for (i = 0; i < msg->m_nentries; i++)
2467         {
2468                 /* Remove from hashtable if present; we don't care if it's not. */
2469                 (void) hash_search(dbentry->tables,
2470                                                    (void *) &(msg->m_tableid[i]),
2471                                                    HASH_REMOVE, NULL);
2472         }
2473 }
2474
2475
2476 /* ----------
2477  * pgstat_recv_dropdb() -
2478  *
2479  *      Arrange for dead database removal
2480  * ----------
2481  */
2482 static void
2483 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
2484 {
2485         PgStat_StatDBEntry *dbentry;
2486
2487         /*
2488          * Lookup the database in the hashtable.
2489          */
2490         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2491
2492         /*
2493          * If found, remove it.
2494          */
2495         if (dbentry)
2496         {
2497                 if (dbentry->tables != NULL)
2498                         hash_destroy(dbentry->tables);
2499
2500                 if (hash_search(pgStatDBHash,
2501                                                 (void *) &(dbentry->databaseid),
2502                                                 HASH_REMOVE, NULL) == NULL)
2503                         ereport(ERROR,
2504                                         (errmsg("database hash table corrupted "
2505                                                         "during cleanup --- abort")));
2506         }
2507 }
2508
2509
2510 /* ----------
2511  * pgstat_recv_resetcounter() -
2512  *
2513  *      Reset the statistics for the specified database.
2514  * ----------
2515  */
2516 static void
2517 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
2518 {
2519         HASHCTL         hash_ctl;
2520         PgStat_StatDBEntry *dbentry;
2521
2522         /*
2523          * Lookup the database in the hashtable.  Nothing to do if not there.
2524          */
2525         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2526
2527         if (!dbentry)
2528                 return;
2529
2530         /*
2531          * We simply throw away all the database's table entries by recreating a
2532          * new hash table for them.
2533          */
2534         if (dbentry->tables != NULL)
2535                 hash_destroy(dbentry->tables);
2536
2537         dbentry->tables = NULL;
2538         dbentry->n_xact_commit = 0;
2539         dbentry->n_xact_rollback = 0;
2540         dbentry->n_blocks_fetched = 0;
2541         dbentry->n_blocks_hit = 0;
2542
2543         memset(&hash_ctl, 0, sizeof(hash_ctl));
2544         hash_ctl.keysize = sizeof(Oid);
2545         hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2546         hash_ctl.hash = oid_hash;
2547         dbentry->tables = hash_create("Per-database table",
2548                                                                   PGSTAT_TAB_HASH_SIZE,
2549                                                                   &hash_ctl,
2550                                                                   HASH_ELEM | HASH_FUNCTION);
2551 }
2552
2553 /* ----------
2554  * pgstat_recv_autovac() -
2555  *
2556  *      Process an autovacuum signalling message.
2557  * ----------
2558  */
2559 static void
2560 pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
2561 {
2562         PgStat_StatDBEntry *dbentry;
2563
2564         /*
2565          * Lookup the database in the hashtable.  Don't create the entry if it
2566          * doesn't exist, because autovacuum may be processing a template
2567          * database.  If this isn't the case, the database is most likely to have
2568          * an entry already.  (If it doesn't, not much harm is done anyway --
2569          * it'll get created as soon as somebody actually uses the database.)
2570          */
2571         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2572         if (dbentry == NULL)
2573                 return;
2574
2575         /*
2576          * Store the last autovacuum time in the database entry.
2577          */
2578         dbentry->last_autovac_time = msg->m_start_time;
2579 }
2580
2581 /* ----------
2582  * pgstat_recv_vacuum() -
2583  *
2584  *      Process a VACUUM message.
2585  * ----------
2586  */
2587 static void
2588 pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
2589 {
2590         PgStat_StatDBEntry *dbentry;
2591         PgStat_StatTabEntry *tabentry;
2592
2593         /*
2594          * Don't create either the database or table entry if it doesn't already
2595          * exist.  This avoids bloating the stats with entries for stuff that is
2596          * only touched by vacuum and not by live operations.
2597          */
2598         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2599         if (dbentry == NULL)
2600                 return;
2601
2602         tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
2603                                                    HASH_FIND, NULL);
2604         if (tabentry == NULL)
2605                 return;
2606
2607         if (msg->m_autovacuum)
2608                 tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
2609         else
2610                 tabentry->vacuum_timestamp = msg->m_vacuumtime;
2611         tabentry->n_live_tuples = msg->m_tuples;
2612         tabentry->n_dead_tuples = 0;
2613         if (msg->m_analyze)
2614         {
2615                 tabentry->last_anl_tuples = msg->m_tuples;
2616                 if (msg->m_autovacuum)
2617                         tabentry->autovac_analyze_timestamp = msg->m_vacuumtime;
2618                 else
2619                         tabentry->analyze_timestamp = msg->m_vacuumtime;
2620         }
2621         else
2622         {
2623                 /* last_anl_tuples must never exceed n_live_tuples */
2624                 tabentry->last_anl_tuples = Min(tabentry->last_anl_tuples,
2625                                                                                 msg->m_tuples);
2626         }
2627 }
2628
2629 /* ----------
2630  * pgstat_recv_analyze() -
2631  *
2632  *      Process an ANALYZE message.
2633  * ----------
2634  */
2635 static void
2636 pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
2637 {
2638         PgStat_StatDBEntry *dbentry;
2639         PgStat_StatTabEntry *tabentry;
2640
2641         /*
2642          * Don't create either the database or table entry if it doesn't already
2643          * exist.  This avoids bloating the stats with entries for stuff that is
2644          * only touched by analyze and not by live operations.
2645          */
2646         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2647         if (dbentry == NULL)
2648                 return;
2649
2650         tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
2651                                                    HASH_FIND, NULL);
2652         if (tabentry == NULL)
2653                 return;
2654
2655         if (msg->m_autovacuum)
2656                 tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
2657         else
2658                 tabentry->analyze_timestamp = msg->m_analyzetime;
2659         tabentry->n_live_tuples = msg->m_live_tuples;
2660         tabentry->n_dead_tuples = msg->m_dead_tuples;
2661         tabentry->last_anl_tuples = msg->m_live_tuples + msg->m_dead_tuples;
2662 }