]> granicus.if.org Git - postgresql/blob - src/backend/postmaster/pgstat.c
Code review for EXEC_BACKEND changes. Reduce the number of #ifdefs by
[postgresql] / src / backend / postmaster / pgstat.c
1 /* ----------
2  * pgstat.c
3  *
4  *      All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *      TODO:   - Separate collector, postmaster and backend stuff
7  *                        into different files.
8  *
9  *                      - Add some automatic call for pgstat vacuuming.
10  *
11  *                      - Add a pgstat config column to pg_database, so this
12  *                        entire thing can be enabled/disabled on a per db basis.
13  *
14  *      Copyright (c) 2001-2003, PostgreSQL Global Development Group
15  *
16  *      $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.72 2004/05/28 05:12:58 tgl Exp $
17  * ----------
18  */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netdb.h>
28 #include <netinet/in.h>
29 #include <arpa/inet.h>
30 #include <errno.h>
31 #include <signal.h>
32
33 #include "pgstat.h"
34
35 #include "access/xact.h"
36 #include "access/heapam.h"
37 #include "catalog/catname.h"
38 #include "catalog/pg_shadow.h"
39 #include "catalog/pg_database.h"
40 #include "libpq/pqsignal.h"
41 #include "libpq/libpq.h"
42 #include "mb/pg_wchar.h"
43 #include "miscadmin.h"
44 #include "utils/memutils.h"
45 #include "storage/backendid.h"
46 #include "storage/ipc.h"
47 #include "storage/pg_shmem.h"
48 #include "tcop/tcopprot.h"
49 #include "utils/rel.h"
50 #include "utils/hsearch.h"
51 #include "utils/ps_status.h"
52 #include "utils/syscache.h"
53
54
/* ----------
 * GUC parameters
 * ----------
 */
/*
 * Whether to launch the statistics collector at all.  pgstat_init() forces
 * this on when any of the three pgstat_collect_* reporting flags is set,
 * and forces it off again if it cannot set up a working socket.
 */
bool		pgstat_collect_startcollector = true;
/* If set, pgstat_init() removes the existing stats file at postmaster start */
bool		pgstat_collect_resetonpmstart = true;
/* If set, pgstat_report_activity() sends the current query string */
bool		pgstat_collect_querystring = false;
/*
 * Any of querystring/tuplelevel/blocklevel enables sending of the buffered
 * per-table messages in pgstat_report_tabstat().
 */
bool		pgstat_collect_tuplelevel = false;
bool		pgstat_collect_blocklevel = false;

/* ----------
 * Other global variables
 * ----------
 */
/*
 * Set by pgstat_start() once it has launched a stats process, and cleared
 * by pgstat_ispgstat() when that process is reported as terminated.
 */
bool		pgstat_is_running = false;

/* ----------
 * Local data
 * ----------
 */
/*
 * UDP socket for statistics messages, bound and connected to its own
 * localhost address by pgstat_init(); -1 when no working socket exists.
 * NOTE(review): NON_EXEC_STATIC presumably drops "static" in EXEC_BACKEND
 * builds so the value can be handed to exec'd children -- confirm.
 */
NON_EXEC_STATIC int pgStatSock = -1;
/*
 * Passed on the command line to exec'd stats processes by pgstat_forkexec();
 * where it is created is not visible in this part of the file.
 */
static int	pgStatPipe[2];
/* Address pgStatSock is bound to, as reported by getsockname() */
static struct sockaddr_storage pgStatAddr;
/* Control pipes created by pgstat_init(), closed by pgstat_close_sockets() */
static int	pgStatPmPipe[2] = {-1, -1};
static int	pgStatCollectorPmPipe[2] = {-1, -1};

/* PID returned when pgstat_start() launched the stats process */
static int	pgStatPid;
/* Time of the last launch attempt; pgstat_start() uses it to throttle respawns */
static time_t last_pgstat_start_time;

static long pgStatNumMessages = 0;

static bool pgStatRunningInCollector = FALSE;

/*
 * Buffered per-table statistics messages.  pgstat_report_tabstat() sends
 * the first pgStatTabstatUsed entries and then resets the count to zero.
 */
static int	pgStatTabstatAlloc = 0;
static int	pgStatTabstatUsed = 0;
static PgStat_MsgTabstat **pgStatTabstatMessages = NULL;

#define TABSTAT_QUANTUM		4	/* we alloc this many at a time */

/*
 * Transaction counts not yet reported; pgstat_report_tabstat() copies them
 * into the first outgoing message and zeroes them.
 */
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;

/*
 * Local copy of the stats file.  pgstat_read_statsfile() fills
 * pgStatDBHash, pgStatBeTable and pgStatNumBackends, and pgStatDBHashXact
 * records the transaction it was read in so it is re-read at most once
 * per transaction.
 */
static TransactionId pgStatDBHashXact = InvalidTransactionId;
static HTAB *pgStatDBHash = NULL;
static HTAB *pgStatBeDead = NULL;
static PgStat_StatBeEntry *pgStatBeTable = NULL;
static int	pgStatNumBackends = 0;

/*
 * Stats file path, set from DataDir in pgstat_init().  pgStat_tmpfname is
 * not set in the code visible here; presumably a temporary path used when
 * writing the file -- confirm.
 */
static char pgStat_fname[MAXPGPATH];
static char pgStat_tmpfname[MAXPGPATH];
105
106
/* ----------
 * Local function forward declarations
 * ----------
 */
#ifdef EXEC_BACKEND

/* Which statistics process pgstat_forkexec() should launch */
typedef enum STATS_PROCESS_TYPE
{
	STAT_PROC_BUFFER,			/* launched with the "-forkbuf" switch */
	STAT_PROC_COLLECTOR			/* launched with the "-forkcol" switch */
} STATS_PROCESS_TYPE;

static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
static void pgstat_parseArgs(int argc, char *argv[]);

#endif

/*
 * PgstatBufferMain runs in the child forked by pgstat_start();
 * PgstatCollectorMain is presumably the collector's counterpart.
 */
NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
static void pgstat_recvbuffer(void);
static void pgstat_die(SIGNAL_ARGS);

static int	pgstat_add_backend(PgStat_MsgHdr *msg);
static void pgstat_sub_backend(int procpid);
static void pgstat_drop_database(Oid databaseid);
static void pgstat_write_statsfile(void);
static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
					  PgStat_StatBeEntry **betab,
					  int *numbackends);

/* Message header setup and transmission over pgStatSock */
static void pgstat_setheader(PgStat_MsgHdr *hdr, int mtype);
static void pgstat_send(void *msg, int len);

/* Receive handlers, presumably one per PGSTAT_MTYPE_* message type */
static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
147
148
149 /* ------------------------------------------------------------
150  * Public functions called from postmaster follow
151  * ------------------------------------------------------------
152  */
153
/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success, pgStatSock is a non-blocking UDP socket bound and
 *	connected to its own localhost address, and the control pipes
 *	pgStatPmPipe / pgStatCollectorPmPipe are open.  On any failure,
 *	everything acquired here (address list, socket, control pipes) is
 *	released again, pgStatSock is left at -1 and the collection GUC
 *	variables are forced off.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;

#define TESTBYTEVAL ((char) 199)

	/*
	 * Force start of collector daemon if something to collect
	 */
	if (pgstat_collect_querystring ||
		pgstat_collect_tuplelevel ||
		pgstat_collect_blocklevel)
		pgstat_collect_startcollector = true;

	/*
	 * Initialize the filename for the status reports.  (In the EXEC_BACKEND
	 * case, this only sets the value in the postmaster.  The collector
	 * subprocess will recompute the value for itself, and individual
	 * backends must do so also if they want to access the file.)
	 */
	snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);

	/*
	 * If we don't have to start a collector or should reset the collected
	 * statistics on postmaster start, simply remove the file.
	 */
	if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
		unlink(pgStat_fname);

	/*
	 * Nothing else required if collector will not get started
	 */
	if (!pgstat_collect_startcollector)
		return;

	/*
	 * Create the UDP socket for sending and receiving statistic messages
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination.  We will generate LOG
	 * messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif
		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the assigned
		 * port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles by
		 * not having to respecify the target address on every send. This also
		 * provides a kernel-level check that only packets from this same
		 * address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket.
		 * This is to catch situations where the socket can be created but
		 * will not actually pass data (for instance, because kernel packet
		 * filtering rules prevent it).
		 */
		test_byte = TESTBYTEVAL;
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can be
		 * received.  We arbitrarily allow up to half a second before deciding
		 * it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);
			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains to
			 * give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.  The
			 * SQLSTATE has to be supplied through errcode(); a bare ERRCODE_xxx
			 * constant in the ereport argument list is never applied.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("disabling statistics collector for lack of working socket")));
		goto startup_failed;
	}

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the
	 * collector falls behind (despite the buffering process), statistics
	 * messages will be discarded; backends won't block waiting to send
	 * messages to the collector.
	 */
	if (!set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	/*
	 * Create the pipe that controls the statistics collector shutdown
	 */
	if (pgpipe(pgStatPmPipe) < 0 || pgpipe(pgStatCollectorPmPipe) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not create pipe for statistics collector: %m")));
		goto startup_failed;
	}

	freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	if (addrs)
		freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/*
	 * Release any control pipe descriptors created before the failure (for
	 * instance when the first pgpipe() succeeded but the second did not), so
	 * the postmaster does not keep useless descriptors open.  This only
	 * closes entries that are >= 0 and resets them all to -1.
	 */
	pgstat_close_sockets();

	/* Adjust GUC variables to suppress useless activity */
	pgstat_collect_startcollector = false;
	pgstat_collect_querystring = false;
	pgstat_collect_tuplelevel = false;
	pgstat_collect_blocklevel = false;
}
429
430
#ifdef EXEC_BACKEND

/*
 * pgstat_forkexec() -
 *
 * Build the argument vector for an exec'd statistics process (buffer or
 * collector) and launch it through postmaster_forkexec(), returning its
 * result.
 *
 * Argument layout (10 entries, followed by a terminating NULL):
 *	[0]		"postgres"
 *	[1]		"-forkbuf" or "-forkcol", depending on procType
 *	[2]		NULL placeholder, filled in by postmaster_forkexec
 *	[3]		postgres_exec_path
 *	[4..9]	pgStatPmPipe[0..1], pgStatCollectorPmPipe[0..1], pgStatPipe[0..1]
 *			as decimal strings
 */
static pid_t
pgstat_forkexec(STATS_PROCESS_TYPE procType)
{
	char	   *args[12];
	char		fdtext[7][32];
	int			fds[6];
	int			nargs = 0;
	int			nfds;
	int			k;

	args[nargs++] = "postgres";

	if (procType == STAT_PROC_BUFFER)
		args[nargs++] = "-forkbuf";
	else if (procType == STAT_PROC_COLLECTOR)
		args[nargs++] = "-forkcol";
	else
		Assert(false);

	args[nargs++] = NULL;		/* filled in by postmaster_forkexec */

	/* postgres_exec_path is not passed by write_backend_variables */
	args[nargs++] = postgres_exec_path;

	/* Sockets + pipes (those not passed by write_backend_variables) */
	fds[0] = pgStatPmPipe[0];
	fds[1] = pgStatPmPipe[1];
	fds[2] = pgStatCollectorPmPipe[0];
	fds[3] = pgStatCollectorPmPipe[1];
	fds[4] = pgStatPipe[0];
	fds[5] = pgStatPipe[1];
	nfds = lengthof(fds);

	Assert(nfds <= lengthof(fdtext));
	for (k = 0; k < nfds; k++)
	{
		snprintf(fdtext[k], sizeof(fdtext[k]), "%d", fds[k]);
		args[nargs++] = fdtext[k];
	}

	args[nargs] = NULL;
	Assert(nargs < lengthof(args));

	return postmaster_forkexec(nargs, args);
}


/*
 * pgstat_parseArgs() -
 *
 * Inverse of pgstat_forkexec(): pull the executable path and the pipe
 * descriptors back out of an exec'd statistics process's argv.  Entries
 * 0..2 (program name, fork switch, placeholder) are not used here.
 */
static void
pgstat_parseArgs(int argc, char *argv[])
{
	Assert(argc == 10);

	StrNCpy(postgres_exec_path, argv[3], MAXPGPATH);
	pgStatPmPipe[0] = atoi(argv[4]);
	pgStatPmPipe[1] = atoi(argv[5]);
	pgStatCollectorPmPipe[0] = atoi(argv[6]);
	pgStatCollectorPmPipe[1] = atoi(argv[7]);
	pgStatPipe[0] = atoi(argv[8]);
	pgStatPipe[1] = atoi(argv[9]);
}

#endif   /* EXEC_BACKEND */
509
510
511 /* ----------
512  * pgstat_start() -
513  *
514  *      Called from postmaster at startup or after an existing collector
515  *      died.  Attempt to fire up a fresh statistics collector.
516  *
517  *      Note: if fail, we will be called again from the postmaster main loop.
518  * ----------
519  */
520 void
521 pgstat_start(void)
522 {
523         time_t          curtime;
524
525         /*
526          * Do nothing if no collector needed
527          */
528         if (pgstat_is_running || !pgstat_collect_startcollector)
529                 return;
530
531         /*
532          * Do nothing if too soon since last collector start.  This is a
533          * safety valve to protect against continuous respawn attempts if the
534          * collector is dying immediately at launch.  Note that since we will
535          * be re-called from the postmaster main loop, we will get another
536          * chance later.
537          */
538         curtime = time(NULL);
539         if ((unsigned int) (curtime - last_pgstat_start_time) <
540                 (unsigned int) PGSTAT_RESTART_INTERVAL)
541                 return;
542         last_pgstat_start_time = curtime;
543
544         /*
545          * Check that the socket is there, else pgstat_init failed.
546          */
547         if (pgStatSock < 0)
548         {
549                 ereport(LOG,
550                                 (errmsg("statistics collector startup skipped")));
551
552                 /*
553                  * We can only get here if someone tries to manually turn
554                  * pgstat_collect_startcollector on after it had been off.
555                  */
556                 pgstat_collect_startcollector = false;
557                 return;
558         }
559
560         /*
561          * Okay, fork off the collector.  Remember its PID for
562          * pgstat_ispgstat.
563          */
564
565         fflush(stdout);
566         fflush(stderr);
567
568 #ifdef __BEOS__
569         /* Specific beos actions before backend startup */
570         beos_before_backend_startup();
571 #endif
572
573 #ifdef EXEC_BACKEND
574         switch ((pgStatPid = (int) pgstat_forkexec(STAT_PROC_BUFFER)))
575 #else
576         switch ((pgStatPid = (int) fork()))
577 #endif
578         {
579                 case -1:
580 #ifdef __BEOS__
581                         /* Specific beos actions */
582                         beos_backend_startup_failed();
583 #endif
584                         ereport(LOG,
585                                         (errmsg("could not fork statistics buffer: %m")));
586                         return;
587
588 #ifndef EXEC_BACKEND
589                 case 0:
590                         /* in postmaster child ... */
591 #ifdef __BEOS__
592                         /* Specific beos actions after backend startup */
593                         beos_backend_startup();
594 #endif
595                         /* Close the postmaster's sockets, except for pgstat link */
596                         ClosePostmasterPorts(false);
597
598                         /* Drop our connection to postmaster's shared memory, as well */
599                         PGSharedMemoryDetach();
600
601                         PgstatBufferMain(0, NULL);
602                         break;
603 #endif
604
605                 default:
606                         pgstat_is_running = true;
607                         return;
608         }
609 }
610
611
612 /* ----------
613  * pgstat_ispgstat() -
614  *
615  *      Called from postmaster to check if a terminated child process
616  *      was the statistics collector.
617  * ----------
618  */
619 bool
620 pgstat_ispgstat(int pid)
621 {
622         if (!pgstat_is_running)
623                 return false;
624
625         if (pgStatPid != pid)
626                 return false;
627
628         /* Oh dear ... */
629         pgstat_is_running = false;
630
631         return true;
632 }
633
634
635 /* ----------
636  * pgstat_close_sockets() -
637  *
638  *      Called when postmaster forks a non-pgstat child process, to close off
639  *      file descriptors that should not be held open in child processes.
640  * ----------
641  */
642 void
643 pgstat_close_sockets(void)
644 {
645         if (pgStatPmPipe[0] >= 0)
646                 closesocket(pgStatPmPipe[0]);
647         pgStatPmPipe[0] = -1;
648         if (pgStatPmPipe[1] >= 0)
649                 closesocket(pgStatPmPipe[1]);
650         pgStatPmPipe[1] = -1;
651         if (pgStatCollectorPmPipe[0] >= 0)
652                 closesocket(pgStatCollectorPmPipe[0]);
653         pgStatCollectorPmPipe[0] = -1;
654         if (pgStatCollectorPmPipe[1] >= 0)
655                 closesocket(pgStatCollectorPmPipe[1]);
656         pgStatCollectorPmPipe[1] = -1;
657 }
658
659
660 /* ----------
661  * pgstat_beterm() -
662  *
663  *      Called from postmaster to tell collector a backend terminated.
664  * ----------
665  */
666 void
667 pgstat_beterm(int pid)
668 {
669         PgStat_MsgBeterm msg;
670
671         if (pgStatSock < 0)
672                 return;
673
674         MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
675         msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
676         msg.m_hdr.m_procpid = pid;
677
678         pgstat_send(&msg, sizeof(msg));
679 }
680
681
682 /* ------------------------------------------------------------
683  * Public functions used by backends follow
684  *------------------------------------------------------------
685  */
686
687
688 /* ----------
689  * pgstat_bestart() -
690  *
691  *      Tell the collector that this new backend is soon ready to process
692  *      queries. Called from tcop/postgres.c before entering the mainloop.
693  * ----------
694  */
695 void
696 pgstat_bestart(void)
697 {
698         PgStat_MsgBestart msg;
699
700         if (pgStatSock < 0)
701                 return;
702
703         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
704         pgstat_send(&msg, sizeof(msg));
705 }
706
707
708 /* ----------
709  * pgstat_report_activity() -
710  *
711  *      Called in tcop/postgres.c to tell the collector what the backend
712  *      is actually doing (usually "<IDLE>" or the start of the query to
713  *      be executed).
714  * ----------
715  */
716 void
717 pgstat_report_activity(const char *what)
718 {
719         PgStat_MsgActivity msg;
720         int                     len;
721
722         if (!pgstat_collect_querystring || pgStatSock < 0)
723                 return;
724
725         len = strlen(what);
726         len = pg_mbcliplen((const unsigned char *) what, len,
727                                            PGSTAT_ACTIVITY_SIZE - 1);
728
729         memcpy(msg.m_what, what, len);
730         msg.m_what[len] = '\0';
731         len += offsetof(PgStat_MsgActivity, m_what) +1;
732
733         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
734         pgstat_send(&msg, len);
735 }
736
737
738 /* ----------
739  * pgstat_report_tabstat() -
740  *
741  *      Called from tcop/postgres.c to send the so far collected
742  *      per table access statistics to the collector.
743  * ----------
744  */
745 void
746 pgstat_report_tabstat(void)
747 {
748         int                     i;
749
750         if (pgStatSock < 0 ||
751                 !(pgstat_collect_querystring ||
752                   pgstat_collect_tuplelevel ||
753                   pgstat_collect_blocklevel))
754         {
755                 /* Not reporting stats, so just flush whatever we have */
756                 pgStatTabstatUsed = 0;
757                 return;
758         }
759
760         /*
761          * For each message buffer used during the last query set the header
762          * fields and send it out.
763          */
764         for (i = 0; i < pgStatTabstatUsed; i++)
765         {
766                 PgStat_MsgTabstat *tsmsg = pgStatTabstatMessages[i];
767                 int                     n;
768                 int                     len;
769
770                 n = tsmsg->m_nentries;
771                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
772                         n * sizeof(PgStat_TableEntry);
773
774                 tsmsg->m_xact_commit = pgStatXactCommit;
775                 tsmsg->m_xact_rollback = pgStatXactRollback;
776                 pgStatXactCommit = 0;
777                 pgStatXactRollback = 0;
778
779                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
780                 pgstat_send(tsmsg, len);
781         }
782
783         pgStatTabstatUsed = 0;
784 }
785
786
787 /* ----------
788  * pgstat_vacuum_tabstat() -
789  *
790  *      Will tell the collector about objects he can get rid of.
791  * ----------
792  */
793 int
794 pgstat_vacuum_tabstat(void)
795 {
796         Relation        dbrel;
797         HeapScanDesc dbscan;
798         HeapTuple       dbtup;
799         Oid                *dbidlist;
800         int                     dbidalloc;
801         int                     dbidused;
802         HASH_SEQ_STATUS hstat;
803         PgStat_StatDBEntry *dbentry;
804         PgStat_StatTabEntry *tabentry;
805         HeapTuple       reltup;
806         int                     nobjects = 0;
807         PgStat_MsgTabpurge msg;
808         int                     len;
809         int                     i;
810
811         if (pgStatSock < 0)
812                 return 0;
813
814         /*
815          * If not done for this transaction, read the statistics collector
816          * stats file into some hash tables.
817          */
818         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
819         {
820                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
821                                                           &pgStatBeTable, &pgStatNumBackends);
822                 pgStatDBHashXact = GetCurrentTransactionId();
823         }
824
825         /*
826          * Lookup our own database entry
827          */
828         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
829                                                                                                  (void *) &MyDatabaseId,
830                                                                                                  HASH_FIND, NULL);
831         if (dbentry == NULL)
832                 return -1;
833
834         if (dbentry->tables == NULL)
835                 return 0;
836
837         /*
838          * Initialize our messages table counter to zero
839          */
840         msg.m_nentries = 0;
841
842         /*
843          * Check for all tables if they still exist.
844          */
845         hash_seq_init(&hstat, dbentry->tables);
846         while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
847         {
848                 /*
849                  * Check if this relation is still alive by looking up it's
850                  * pg_class tuple in the system catalog cache.
851                  */
852                 reltup = SearchSysCache(RELOID,
853                                                                 ObjectIdGetDatum(tabentry->tableid),
854                                                                 0, 0, 0);
855                 if (HeapTupleIsValid(reltup))
856                 {
857                         ReleaseSysCache(reltup);
858                         continue;
859                 }
860
861                 /*
862                  * Add this tables Oid to the message
863                  */
864                 msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
865                 nobjects++;
866
867                 /*
868                  * If the message is full, send it out and reinitialize ot zero
869                  */
870                 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
871                 {
872                         len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
873                                 +msg.m_nentries * sizeof(Oid);
874
875                         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
876                         pgstat_send(&msg, len);
877
878                         msg.m_nentries = 0;
879                 }
880         }
881
882         /*
883          * Send the rest
884          */
885         if (msg.m_nentries > 0)
886         {
887                 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
888                         +msg.m_nentries * sizeof(Oid);
889
890                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
891                 pgstat_send(&msg, len);
892         }
893
894         /*
895          * Read pg_database and remember the Oid's of all existing databases
896          */
897         dbidalloc = 256;
898         dbidused = 0;
899         dbidlist = (Oid *) palloc(sizeof(Oid) * dbidalloc);
900
901         dbrel = heap_openr(DatabaseRelationName, AccessShareLock);
902         dbscan = heap_beginscan(dbrel, SnapshotNow, 0, NULL);
903         while ((dbtup = heap_getnext(dbscan, ForwardScanDirection)) != NULL)
904         {
905                 if (dbidused >= dbidalloc)
906                 {
907                         dbidalloc *= 2;
908                         dbidlist = (Oid *) repalloc((char *) dbidlist,
909                                                                                 sizeof(Oid) * dbidalloc);
910                 }
911                 dbidlist[dbidused++] = HeapTupleGetOid(dbtup);
912         }
913         heap_endscan(dbscan);
914         heap_close(dbrel, AccessShareLock);
915
916         /*
917          * Search the database hash table for dead databases and tell the
918          * collector to drop them as well.
919          */
920         hash_seq_init(&hstat, pgStatDBHash);
921         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
922         {
923                 Oid                     dbid = dbentry->databaseid;
924
925                 for (i = 0; i < dbidused; i++)
926                 {
927                         if (dbidlist[i] == dbid)
928                         {
929                                 dbid = InvalidOid;
930                                 break;
931                         }
932                 }
933
934                 if (dbid != InvalidOid)
935                 {
936                         nobjects++;
937                         pgstat_drop_database(dbid);
938                 }
939         }
940
941         /*
942          * Free the dbid list.
943          */
944         pfree((char *) dbidlist);
945
946         /*
947          * Tell the caller how many removeable objects we found
948          */
949         return nobjects;
950 }
951
952
953 /* ----------
954  * pgstat_drop_database() -
955  *
956  *      Tell the collector that we just dropped a database.
957  *      This is the only message that shouldn't get lost in space. Otherwise
958  *      the collector will keep the statistics for the dead DB until his
959  *      stats file got removed while the postmaster is down.
960  * ----------
961  */
962 static void
963 pgstat_drop_database(Oid databaseid)
964 {
965         PgStat_MsgDropdb msg;
966
967         if (pgStatSock < 0)
968                 return;
969
970         msg.m_databaseid = databaseid;
971
972         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
973         pgstat_send(&msg, sizeof(msg));
974 }
975
976
977 /* ----------
978  * pgstat_reset_counters() -
979  *
980  *      Tell the statistics collector to reset counters for our database.
981  * ----------
982  */
983 void
984 pgstat_reset_counters(void)
985 {
986         PgStat_MsgResetcounter msg;
987
988         if (pgStatSock < 0)
989                 return;
990
991         if (!superuser())
992                 ereport(ERROR,
993                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
994                           errmsg("must be superuser to reset statistics counters")));
995
996         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
997         pgstat_send(&msg, sizeof(msg));
998 }
999
1000
1001 /* ----------
1002  * pgstat_ping() -
1003  *
1004  *      Send some junk data to the collector to increase traffic.
1005  * ----------
1006  */
1007 void
1008 pgstat_ping(void)
1009 {
1010         PgStat_MsgDummy msg;
1011
1012         if (pgStatSock < 0)
1013                 return;
1014
1015         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
1016         pgstat_send(&msg, sizeof(msg));
1017 }
1018
1019 /*
1020  * Create or enlarge the pgStatTabstatMessages array
1021  */
1022 static bool
1023 more_tabstat_space(void)
1024 {
1025         PgStat_MsgTabstat *newMessages;
1026         PgStat_MsgTabstat **msgArray;
1027         int                     newAlloc = pgStatTabstatAlloc + TABSTAT_QUANTUM;
1028         int                     i;
1029
1030         /* Create (another) quantum of message buffers */
1031         newMessages = (PgStat_MsgTabstat *)
1032                 malloc(sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1033         if (newMessages == NULL)
1034         {
1035                 ereport(LOG,
1036                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1037                                  errmsg("out of memory")));
1038                 return false;
1039         }
1040
1041         /* Create or enlarge the pointer array */
1042         if (pgStatTabstatMessages == NULL)
1043                 msgArray = (PgStat_MsgTabstat **)
1044                         malloc(sizeof(PgStat_MsgTabstat *) * newAlloc);
1045         else
1046                 msgArray = (PgStat_MsgTabstat **)
1047                         realloc(pgStatTabstatMessages,
1048                                         sizeof(PgStat_MsgTabstat *) * newAlloc);
1049         if (msgArray == NULL)
1050         {
1051                 free(newMessages);
1052                 ereport(LOG,
1053                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1054                                  errmsg("out of memory")));
1055                 return false;
1056         }
1057
1058         MemSet(newMessages, 0, sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1059         for (i = 0; i < TABSTAT_QUANTUM; i++)
1060                 msgArray[pgStatTabstatAlloc + i] = newMessages++;
1061         pgStatTabstatMessages = msgArray;
1062         pgStatTabstatAlloc = newAlloc;
1063
1064         return true;
1065 }
1066
1067 /* ----------
1068  * pgstat_initstats() -
1069  *
1070  *      Called from various places usually dealing with initialization
1071  *      of Relation or Scan structures. The data placed into these
1072  *      structures from here tell where later to count for buffer reads,
1073  *      scans and tuples fetched.
1074  * ----------
1075  */
1076 void
1077 pgstat_initstats(PgStat_Info *stats, Relation rel)
1078 {
1079         Oid                     rel_id = rel->rd_id;
1080         PgStat_TableEntry *useent;
1081         PgStat_MsgTabstat *tsmsg;
1082         int                     mb;
1083         int                     i;
1084
1085         /*
1086          * Initialize data not to count at all.
1087          */
1088         stats->tabentry = NULL;
1089         stats->no_stats = FALSE;
1090         stats->heap_scan_counted = FALSE;
1091         stats->index_scan_counted = FALSE;
1092
1093         if (pgStatSock < 0 ||
1094                 !(pgstat_collect_tuplelevel ||
1095                   pgstat_collect_blocklevel))
1096         {
1097                 stats->no_stats = TRUE;
1098                 return;
1099         }
1100
1101         /*
1102          * Search the already-used message slots for this relation.
1103          */
1104         for (mb = 0; mb < pgStatTabstatUsed; mb++)
1105         {
1106                 tsmsg = pgStatTabstatMessages[mb];
1107
1108                 for (i = tsmsg->m_nentries; --i >= 0; )
1109                 {
1110                         if (tsmsg->m_entry[i].t_id == rel_id)
1111                         {
1112                                 stats->tabentry = (void *) &(tsmsg->m_entry[i]);
1113                                 return;
1114                         }
1115                 }
1116
1117                 if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
1118                         continue;
1119
1120                 /*
1121                  * Not found, but found a message buffer with an empty slot
1122                  * instead. Fine, let's use this one.
1123                  */
1124                 i = tsmsg->m_nentries++;
1125                 useent = &tsmsg->m_entry[i];
1126                 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1127                 useent->t_id = rel_id;
1128                 stats->tabentry = (void *) useent;
1129                 return;
1130         }
1131
1132         /*
1133          * If we ran out of message buffers, we just allocate more.
1134          */
1135         if (pgStatTabstatUsed >= pgStatTabstatAlloc)
1136         {
1137                 if (!more_tabstat_space())
1138                 {
1139                         stats->no_stats = TRUE;
1140                         return;
1141                 }
1142                 Assert(pgStatTabstatUsed < pgStatTabstatAlloc);
1143         }
1144
1145         /*
1146          * Use the first entry of the next message buffer.
1147          */
1148         mb = pgStatTabstatUsed++;
1149         tsmsg = pgStatTabstatMessages[mb];
1150         tsmsg->m_nentries = 1;
1151         useent = &tsmsg->m_entry[0];
1152         MemSet(useent, 0, sizeof(PgStat_TableEntry));
1153         useent->t_id = rel_id;
1154         stats->tabentry = (void *) useent;
1155 }
1156
1157
1158 /* ----------
1159  * pgstat_count_xact_commit() -
1160  *
1161  *      Called from access/transam/xact.c to count transaction commits.
1162  * ----------
1163  */
1164 void
1165 pgstat_count_xact_commit(void)
1166 {
1167         if (!(pgstat_collect_querystring ||
1168                   pgstat_collect_tuplelevel ||
1169                   pgstat_collect_blocklevel))
1170                 return;
1171
1172         pgStatXactCommit++;
1173
1174         /*
1175          * If there was no relation activity yet, just make one existing
1176          * message buffer used without slots, causing the next report to tell
1177          * new xact-counters.
1178          */
1179         if (pgStatTabstatAlloc == 0)
1180         {
1181                 if (!more_tabstat_space())
1182                         return;
1183         }
1184         if (pgStatTabstatUsed == 0)
1185         {
1186                 pgStatTabstatUsed++;
1187                 pgStatTabstatMessages[0]->m_nentries = 0;
1188         }
1189 }
1190
1191
1192 /* ----------
1193  * pgstat_count_xact_rollback() -
1194  *
1195  *      Called from access/transam/xact.c to count transaction rollbacks.
1196  * ----------
1197  */
1198 void
1199 pgstat_count_xact_rollback(void)
1200 {
1201         if (!(pgstat_collect_querystring ||
1202                   pgstat_collect_tuplelevel ||
1203                   pgstat_collect_blocklevel))
1204                 return;
1205
1206         pgStatXactRollback++;
1207
1208         /*
1209          * If there was no relation activity yet, just make one existing
1210          * message buffer used without slots, causing the next report to tell
1211          * new xact-counters.
1212          */
1213         if (pgStatTabstatAlloc == 0)
1214         {
1215                 if (!more_tabstat_space())
1216                         return;
1217         }
1218         if (pgStatTabstatUsed == 0)
1219         {
1220                 pgStatTabstatUsed++;
1221                 pgStatTabstatMessages[0]->m_nentries = 0;
1222         }
1223 }
1224
1225
1226 /* ----------
1227  * pgstat_fetch_stat_dbentry() -
1228  *
1229  *      Support function for the SQL-callable pgstat* functions. Returns
1230  *      the collected statistics for one database or NULL. NULL doesn't mean
1231  *      that the database doesn't exist, it is just not yet known by the
1232  *      collector, so the caller is better off to report ZERO instead.
1233  * ----------
1234  */
1235 PgStat_StatDBEntry *
1236 pgstat_fetch_stat_dbentry(Oid dbid)
1237 {
1238         PgStat_StatDBEntry *dbentry;
1239
1240         /*
1241          * If not done for this transaction, read the statistics collector
1242          * stats file into some hash tables. Be careful with the
1243          * read_statsfile() call below!
1244          */
1245         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1246         {
1247                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1248                                                           &pgStatBeTable, &pgStatNumBackends);
1249                 pgStatDBHashXact = GetCurrentTransactionId();
1250         }
1251
1252         /*
1253          * Lookup the requested database
1254          */
1255         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1256                                                                                                  (void *) &dbid,
1257                                                                                                  HASH_FIND, NULL);
1258         if (dbentry == NULL)
1259                 return NULL;
1260
1261         return dbentry;
1262 }
1263
1264
1265 /* ----------
1266  * pgstat_fetch_stat_tabentry() -
1267  *
1268  *      Support function for the SQL-callable pgstat* functions. Returns
1269  *      the collected statistics for one table or NULL. NULL doesn't mean
1270  *      that the table doesn't exist, it is just not yet known by the
1271  *      collector, so the caller is better off to report ZERO instead.
1272  * ----------
1273  */
1274 PgStat_StatTabEntry *
1275 pgstat_fetch_stat_tabentry(Oid relid)
1276 {
1277         PgStat_StatDBEntry *dbentry;
1278         PgStat_StatTabEntry *tabentry;
1279
1280         /*
1281          * If not done for this transaction, read the statistics collector
1282          * stats file into some hash tables. Be careful with the
1283          * read_statsfile() call below!
1284          */
1285         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1286         {
1287                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1288                                                           &pgStatBeTable, &pgStatNumBackends);
1289                 pgStatDBHashXact = GetCurrentTransactionId();
1290         }
1291
1292         /*
1293          * Lookup our database.
1294          */
1295         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1296                                                                                                  (void *) &MyDatabaseId,
1297                                                                                                  HASH_FIND, NULL);
1298         if (dbentry == NULL)
1299                 return NULL;
1300
1301         /*
1302          * Now inside the DB's table hash table lookup the requested one.
1303          */
1304         if (dbentry->tables == NULL)
1305                 return NULL;
1306         tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1307                                                                                                    (void *) &relid,
1308                                                                                                    HASH_FIND, NULL);
1309         if (tabentry == NULL)
1310                 return NULL;
1311
1312         return tabentry;
1313 }
1314
1315
1316 /* ----------
1317  * pgstat_fetch_stat_beentry() -
1318  *
1319  *      Support function for the SQL-callable pgstat* functions. Returns
1320  *      the actual activity slot of one active backend. The caller is
1321  *      responsible for a check if the actual user is permitted to see
1322  *      that info (especially the querystring).
1323  * ----------
1324  */
1325 PgStat_StatBeEntry *
1326 pgstat_fetch_stat_beentry(int beid)
1327 {
1328         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1329         {
1330                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1331                                                           &pgStatBeTable, &pgStatNumBackends);
1332                 pgStatDBHashXact = GetCurrentTransactionId();
1333         }
1334
1335         if (beid < 1 || beid > pgStatNumBackends)
1336                 return NULL;
1337
1338         return &pgStatBeTable[beid - 1];
1339 }
1340
1341
1342 /* ----------
1343  * pgstat_fetch_stat_numbackends() -
1344  *
1345  *      Support function for the SQL-callable pgstat* functions. Returns
1346  *      the maximum current backend id.
1347  * ----------
1348  */
1349 int
1350 pgstat_fetch_stat_numbackends(void)
1351 {
1352         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1353         {
1354                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1355                                                           &pgStatBeTable, &pgStatNumBackends);
1356                 pgStatDBHashXact = GetCurrentTransactionId();
1357         }
1358
1359         return pgStatNumBackends;
1360 }
1361
1362
1363
1364 /* ------------------------------------------------------------
1365  * Local support functions follow
1366  * ------------------------------------------------------------
1367  */
1368
1369
1370 /* ----------
1371  * pgstat_setheader() -
1372  *
1373  *              Set common header fields in a statistics message
1374  * ----------
1375  */
1376 static void
1377 pgstat_setheader(PgStat_MsgHdr *hdr, int mtype)
1378 {
1379         hdr->m_type = mtype;
1380         hdr->m_backendid = MyBackendId;
1381         hdr->m_procpid = MyProcPid;
1382         hdr->m_databaseid = MyDatabaseId;
1383         hdr->m_userid = GetSessionUserId();
1384 }
1385
1386
1387 /* ----------
1388  * pgstat_send() -
1389  *
1390  *              Send out one statistics message to the collector
1391  * ----------
1392  */
1393 static void
1394 pgstat_send(void *msg, int len)
1395 {
1396         if (pgStatSock < 0)
1397                 return;
1398
1399         ((PgStat_MsgHdr *) msg)->m_size = len;
1400
1401         send(pgStatSock, msg, len, 0);
1402         /* We deliberately ignore any error from send() */
1403 }
1404
1405
1406 /* ----------
1407  * PgstatBufferMain() -
1408  *
1409  *      Start up the statistics buffer process.  This is the body of the
1410  *      postmaster child process.
1411  *
1412  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1413  * ----------
1414  */
1415 NON_EXEC_STATIC void
1416 PgstatBufferMain(int argc, char *argv[])
1417 {
1418         IsUnderPostmaster = true;       /* we are a postmaster subprocess now */
1419
1420         MyProcPid = getpid();           /* reset MyProcPid */
1421
1422         /* Lose the postmaster's on-exit routines */
1423         on_exit_reset();
1424
1425         /*
1426          * Ignore all signals usually bound to some action in the postmaster,
1427          * except for SIGCHLD --- see pgstat_recvbuffer.
1428          */
1429         pqsignal(SIGHUP, SIG_IGN);
1430         pqsignal(SIGINT, SIG_IGN);
1431         pqsignal(SIGTERM, SIG_IGN);
1432         pqsignal(SIGQUIT, SIG_IGN);
1433         pqsignal(SIGALRM, SIG_IGN);
1434         pqsignal(SIGPIPE, SIG_IGN);
1435         pqsignal(SIGUSR1, SIG_IGN);
1436         pqsignal(SIGUSR2, SIG_IGN);
1437         pqsignal(SIGCHLD, pgstat_die);
1438         pqsignal(SIGTTIN, SIG_DFL);
1439         pqsignal(SIGTTOU, SIG_DFL);
1440         pqsignal(SIGCONT, SIG_DFL);
1441         pqsignal(SIGWINCH, SIG_DFL);
1442         /* unblock will happen in pgstat_recvbuffer */
1443
1444 #ifdef EXEC_BACKEND
1445         pgstat_parseArgs(argc,argv);
1446 #endif
1447
1448         /*
1449          * Close the writing end of the postmaster pipe, so we'll see it
1450          * closing when the postmaster terminates and can terminate as well.
1451          */
1452         closesocket(pgStatPmPipe[1]);
1453         pgStatPmPipe[1] = -1;
1454         closesocket(pgStatCollectorPmPipe[1]);
1455         pgStatCollectorPmPipe[1] = -1;
1456
1457         /*
1458          * Start a buffering process to read from the socket, so we have a
1459          * little more time to process incoming messages.
1460          *
1461          * NOTE: the process structure is: postmaster is parent of buffer process
1462          * is parent of collector process.      This way, the buffer can detect
1463          * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
1464          * collector failure until it tried to write on the pipe.  That would
1465          * mean that after the postmaster started a new collector, we'd have
1466          * two buffer processes competing to read from the UDP socket --- not
1467          * good.
1468          */
1469         if (pgpipe(pgStatPipe) < 0)
1470         {
1471                 ereport(LOG,
1472                                 (errcode_for_socket_access(),
1473                          errmsg("could not create pipe for statistics buffer: %m")));
1474                 exit(1);
1475         }
1476
1477 #ifdef EXEC_BACKEND
1478         /* child becomes collector process */
1479         switch (pgstat_forkexec(STAT_PROC_COLLECTOR))
1480 #else
1481         switch (fork())
1482 #endif
1483         {
1484                 case -1:
1485                         ereport(LOG,
1486                                         (errmsg("could not fork statistics collector: %m")));
1487                         exit(1);
1488
1489 #ifndef EXEC_BACKEND
1490                 case 0:
1491                         /* child becomes collector process */
1492                         PgstatCollectorMain(0, NULL);
1493                         break;
1494 #endif
1495
1496                 default:
1497                         /* parent becomes buffer process */
1498                         closesocket(pgStatPipe[0]);
1499                         pgstat_recvbuffer();
1500         }
1501         exit(0);
1502 }
1503
1504
1505 /* ----------
1506  * PgstatCollectorMain() -
1507  *
1508  *      Start up the statistics collector itself.  This is the body of the
1509  *      postmaster grandchild process.
1510  *
1511  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1512  * ----------
1513  */
1514 NON_EXEC_STATIC void
1515 PgstatCollectorMain(int argc, char *argv[])
1516 {
1517         PgStat_Msg      msg;
1518         fd_set          rfds;
1519         int                     readPipe;
1520         int                     pmPipe;
1521         int                     nready;
1522         int                     len = 0;
1523         struct timeval timeout;
1524         struct timeval next_statwrite;
1525         bool            need_statwrite;
1526         HASHCTL         hash_ctl;
1527
1528         MyProcPid = getpid();           /* reset MyProcPid */
1529
1530         /*
1531          * Reset signal handling.  With the exception of restoring default
1532          * SIGCHLD handling, this is a no-op in the non-EXEC_BACKEND case
1533          * because we'll have inherited these settings from the buffer process;
1534          * but it's not a no-op for EXEC_BACKEND.
1535          */
1536         pqsignal(SIGHUP, SIG_IGN);
1537         pqsignal(SIGINT, SIG_IGN);
1538         pqsignal(SIGTERM, SIG_IGN);
1539         pqsignal(SIGQUIT, SIG_IGN);
1540         pqsignal(SIGALRM, SIG_IGN);
1541         pqsignal(SIGPIPE, SIG_IGN);
1542         pqsignal(SIGUSR1, SIG_IGN);
1543         pqsignal(SIGUSR2, SIG_IGN);
1544         pqsignal(SIGCHLD, SIG_DFL);
1545         pqsignal(SIGTTIN, SIG_DFL);
1546         pqsignal(SIGTTOU, SIG_DFL);
1547         pqsignal(SIGCONT, SIG_DFL);
1548         pqsignal(SIGWINCH, SIG_DFL);
1549         PG_SETMASK(&UnBlockSig);
1550
1551 #ifdef EXEC_BACKEND
1552         pgstat_parseArgs(argc,argv);
1553 #endif
1554
1555         /* Close unwanted files */
1556         closesocket(pgStatPipe[1]);
1557         closesocket(pgStatSock);
1558         pmPipe = pgStatCollectorPmPipe[0];
1559
1560         /*
1561          * Identify myself via ps
1562          */
1563         init_ps_display("stats collector process", "", "");
1564         set_ps_display("");
1565
1566         /*
1567          * Initialize filenames needed for status reports.
1568          */
1569         snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
1570         /* tmpfname need only be set correctly in this process */
1571         snprintf(pgStat_tmpfname, MAXPGPATH, PGSTAT_STAT_TMPFILE,
1572                          DataDir, getpid());
1573
1574         /*
1575          * Arrange to write the initial status file right away
1576          */
1577         gettimeofday(&next_statwrite, NULL);
1578         need_statwrite = TRUE;
1579
1580         /*
1581          * Read in an existing statistics stats file or initialize the stats
1582          * to zero.
1583          */
1584         pgStatRunningInCollector = TRUE;
1585         pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);
1586
1587         /*
1588          * Create the dead backend hashtable
1589          */
1590         memset(&hash_ctl, 0, sizeof(hash_ctl));
1591         hash_ctl.keysize = sizeof(int);
1592         hash_ctl.entrysize = sizeof(PgStat_StatBeDead);
1593         hash_ctl.hash = tag_hash;
1594         pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE,
1595                                                            &hash_ctl, HASH_ELEM | HASH_FUNCTION);
1596         if (pgStatBeDead == NULL)
1597         {
1598                 /* assume the problem is out-of-memory */
1599                 ereport(LOG,
1600                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1601                                  errmsg("out of memory in statistics collector --- abort")));
1602                 exit(1);
1603         }
1604
1605         /*
1606          * Create the known backends table
1607          */
1608         pgStatBeTable = (PgStat_StatBeEntry *) malloc(
1609                                                            sizeof(PgStat_StatBeEntry) * MaxBackends);
1610         if (pgStatBeTable == NULL)
1611         {
1612                 ereport(LOG,
1613                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1614                                  errmsg("out of memory in statistics collector --- abort")));
1615                 exit(1);
1616         }
1617         memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends);
1618
1619         readPipe = pgStatPipe[0];
1620
1621         /*
1622          * Process incoming messages and handle all the reporting stuff until
1623          * there are no more messages.
1624          */
1625         for (;;)
1626         {
1627                 /*
1628                  * If we need to write the status file again (there have been
1629                  * changes in the statistics since we wrote it last) calculate the
1630                  * timeout until we have to do so.
1631                  */
1632                 if (need_statwrite)
1633                 {
1634                         struct timeval now;
1635
1636                         gettimeofday(&now, NULL);
1637                         /* avoid assuming that tv_sec is signed */
1638                         if (now.tv_sec > next_statwrite.tv_sec ||
1639                                 (now.tv_sec == next_statwrite.tv_sec &&
1640                                  now.tv_usec >= next_statwrite.tv_usec))
1641                         {
1642                                 timeout.tv_sec = 0;
1643                                 timeout.tv_usec = 0;
1644                         }
1645                         else
1646                         {
1647                                 timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;
1648                                 timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;
1649                                 if (timeout.tv_usec < 0)
1650                                 {
1651                                         timeout.tv_sec--;
1652                                         timeout.tv_usec += 1000000;
1653                                 }
1654                         }
1655                 }
1656
1657                 /*
1658                  * Setup the descriptor set for select(2)
1659                  */
1660                 FD_ZERO(&rfds);
1661                 FD_SET(readPipe, &rfds);
1662
1663                 /*
1664                  * Now wait for something to do.
1665                  */
1666                 nready = select(readPipe+1, &rfds, NULL, NULL,
1667                                                 (need_statwrite) ? &timeout : NULL);
1668                 if (nready < 0)
1669                 {
1670                         if (errno == EINTR)
1671                                 continue;
1672                         ereport(LOG,
1673                                         (errcode_for_socket_access(),
1674                                    errmsg("select() failed in statistics collector: %m")));
1675                         exit(1);
1676                 }
1677
1678                 /*
1679                  * If there are no descriptors ready, our timeout for writing the
1680                  * stats file happened.
1681                  */
1682                 if (nready == 0)
1683                 {
1684                         pgstat_write_statsfile();
1685                         need_statwrite = FALSE;
1686
1687                         continue;
1688                 }
1689
1690                 /*
1691                  * Check if there is a new statistics message to collect.
1692                  */
1693                 if (FD_ISSET(readPipe, &rfds))
1694                 {
1695                         /*
1696                          * We may need to issue multiple read calls in case the buffer
1697                          * process didn't write the message in a single write, which
1698                          * is possible since it dumps its buffer bytewise. In any
1699                          * case, we'd need two reads since we don't know the message
1700                          * length initially.
1701                          */
1702                         int                     nread = 0;
1703                         int                     targetlen = sizeof(PgStat_MsgHdr);              /* initial */
1704                         bool            pipeEOF = false;
1705
1706                         while (nread < targetlen)
1707                         {
1708                                 len = piperead(readPipe, ((char *) &msg) + nread,
1709                                                                 targetlen - nread);
1710                                 if (len < 0)
1711                                 {
1712                                         if (errno == EINTR)
1713                                                 continue;
1714                                         ereport(LOG,
1715                                                         (errcode_for_socket_access(),
1716                                                          errmsg("could not read from statistics collector pipe: %m")));
1717                                         exit(1);
1718                                 }
1719                                 if (len == 0)   /* EOF on the pipe! */
1720                                 {
1721                                         pipeEOF = true;
1722                                         break;
1723                                 }
1724                                 nread += len;
1725                                 if (nread == sizeof(PgStat_MsgHdr))
1726                                 {
1727                                         /* we have the header, compute actual msg length */
1728                                         targetlen = msg.msg_hdr.m_size;
1729                                         if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
1730                                                 targetlen > (int) sizeof(msg))
1731                                         {
1732                                                 /*
1733                                                  * Bogus message length implies that we got out of
1734                                                  * sync with the buffer process somehow. Abort so
1735                                                  * that we can restart both processes.
1736                                                  */
1737                                                 ereport(LOG,
1738                                                   (errmsg("invalid statistics message length")));
1739                                                 exit(1);
1740                                         }
1741                                 }
1742                         }
1743
1744                         /*
1745                          * EOF on the pipe implies that the buffer process exited.
1746                          * Fall out of outer loop.
1747                          */
1748                         if (pipeEOF)
1749                                 break;
1750
1751                         /*
1752                          * Distribute the message to the specific function handling
1753                          * it.
1754                          */
1755                         switch (msg.msg_hdr.m_type)
1756                         {
1757                                 case PGSTAT_MTYPE_DUMMY:
1758                                         break;
1759
1760                                 case PGSTAT_MTYPE_BESTART:
1761                                         pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1762                                         break;
1763
1764                                 case PGSTAT_MTYPE_BETERM:
1765                                         pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1766                                         break;
1767
1768                                 case PGSTAT_MTYPE_TABSTAT:
1769                                         pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1770                                         break;
1771
1772                                 case PGSTAT_MTYPE_TABPURGE:
1773                                         pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1774                                         break;
1775
1776                                 case PGSTAT_MTYPE_ACTIVITY:
1777                                         pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1778                                         break;
1779
1780                                 case PGSTAT_MTYPE_DROPDB:
1781                                         pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1782                                         break;
1783
1784                                 case PGSTAT_MTYPE_RESETCOUNTER:
1785                                         pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1786                                                                                          nread);
1787                                         break;
1788
1789                                 default:
1790                                         break;
1791                         }
1792
1793                         /*
1794                          * Globally count messages.
1795                          */
1796                         pgStatNumMessages++;
1797
1798                         /*
1799                          * If this is the first message after we wrote the stats file
1800                          * the last time, setup the timeout that it'd be written.
1801                          */
1802                         if (!need_statwrite)
1803                         {
1804                                 gettimeofday(&next_statwrite, NULL);
1805                                 next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
1806                                 next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
1807                                 next_statwrite.tv_usec %= 1000000;
1808                                 need_statwrite = TRUE;
1809                         }
1810                 }
1811
1812                 /*
1813                  * Note that we do NOT check for postmaster exit inside the loop;
1814                  * only EOF on the buffer pipe causes us to fall out.  This
1815                  * ensures we don't exit prematurely if there are still a few
1816                  * messages in the buffer or pipe at postmaster shutdown.
1817                  */
1818         }
1819
1820         /*
1821          * Okay, we saw EOF on the buffer pipe, so there are no more messages
1822          * to process.  If the buffer process quit because of postmaster
1823          * shutdown, we want to save the final stats to reuse at next startup.
1824          * But if the buffer process failed, it seems best not to (there may
1825          * even now be a new collector firing up, and we don't want it to read
1826          * a partially- rewritten stats file).  We can tell whether the
1827          * postmaster is still alive by checking to see if the postmaster pipe
1828          * is still open.  If it is read-ready (ie, EOF), the postmaster must
1829          * have quit.
1830          */
1831         FD_ZERO(&rfds);
1832         FD_SET(pmPipe, &rfds);
1833         timeout.tv_sec = 0;
1834         timeout.tv_usec = 0;
1835         nready = select(pmPipe+1,&rfds,NULL,NULL,&timeout);
1836         if (nready > 0 && FD_ISSET(pmPipe, &rfds)) 
1837                 pgstat_write_statsfile();
1838 }
1839
1840
1841 /* ----------
1842  * pgstat_recvbuffer() -
1843  *
1844  *      This is the body of the separate buffering process. Its only
1845  *      purpose is to receive messages from the UDP socket as fast as
1846  *      possible and forward them over a pipe into the collector itself.
1847  *      If the collector is slow to absorb messages, they are buffered here.
1848  * ----------
1849  */
1850 static void
1851 pgstat_recvbuffer(void)
1852 {
1853         fd_set          rfds;
1854         fd_set          wfds;
1855         int                     writePipe = pgStatPipe[1];
1856         int                     pmPipe = pgStatPmPipe[0];
1857         int                     maxfd;
1858         int                     nready;
1859         int                     len;
1860         int                     xfr;
1861         int                     frm;
1862         PgStat_Msg      input_buffer;
1863         char       *msgbuffer;
1864         int                     msg_send = 0;   /* next send index in buffer */
1865         int                     msg_recv = 0;   /* next receive index */
1866         int                     msg_have = 0;   /* number of bytes stored */
1867         bool            overflow = false;
1868
1869         /*
1870          * Identify myself via ps
1871          */
1872         init_ps_display("stats buffer process", "", "");
1873         set_ps_display("");
1874
1875         /*
1876          * We want to die if our child collector process does.  There are two
1877          * ways we might notice that it has died: receive SIGCHLD, or get a
1878          * write failure on the pipe leading to the child.      We can set SIGPIPE
1879          * to kill us here.  Our SIGCHLD handler was already set up before we
1880          * forked (must do it that way, else it's a race condition).
1881          */
1882         pqsignal(SIGPIPE, SIG_DFL);
1883         PG_SETMASK(&UnBlockSig);
1884
1885         /*
1886          * Set the write pipe to nonblock mode, so that we cannot block when
1887          * the collector falls behind.
1888          */
1889         if (!set_noblock(writePipe))
1890         {
1891                 ereport(LOG,
1892                                 (errcode_for_socket_access(),
1893                   errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
1894                 exit(1);
1895         }
1896
1897         /*
1898          * Allocate the message buffer
1899          */
1900         msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ);
1901         if (msgbuffer == NULL)
1902         {
1903                 ereport(LOG,
1904                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1905                          errmsg("out of memory in statistics collector --- abort")));
1906                 exit(1);
1907         }
1908
1909         /*
1910          * Loop forever
1911          */
1912         for (;;)
1913         {
1914                 FD_ZERO(&rfds);
1915                 FD_ZERO(&wfds);
1916                 maxfd = -1;
1917
1918                 /*
1919                  * As long as we have buffer space we add the socket to the read
1920                  * descriptor set.
1921                  */
1922                 if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
1923                 {
1924                         FD_SET(pgStatSock, &rfds);
1925                         maxfd = pgStatSock;
1926                         overflow = false;
1927                 }
1928                 else
1929                 {
1930                         if (!overflow)
1931                         {
1932                                 ereport(LOG,
1933                                                 (errmsg("statistics buffer is full")));
1934                                 overflow = true;
1935                         }
1936                 }
1937
1938                 /*
1939                  * If we have messages to write out, we add the pipe to the write
1940                  * descriptor set. Otherwise, we check if the postmaster might
1941                  * have terminated.
1942                  */
1943                 if (msg_have > 0)
1944                 {
1945                         FD_SET(writePipe, &wfds);
1946                         if (writePipe > maxfd)
1947                                 maxfd = writePipe;
1948                 }
1949                 else
1950                 {
1951                         FD_SET(pmPipe, &rfds);
1952                         if (pmPipe > maxfd)
1953                                 maxfd = pmPipe;
1954                 }
1955
1956                 /*
1957                  * Wait for some work to do.
1958                  */
1959                 nready = select(maxfd + 1, &rfds, &wfds, NULL, NULL);
1960                 if (nready < 0)
1961                 {
1962                         if (errno == EINTR)
1963                                 continue;
1964                         ereport(LOG,
1965                                         (errcode_for_socket_access(),
1966                                          errmsg("select() failed in statistics buffer: %m")));
1967                         exit(1);
1968                 }
1969
1970                 /*
1971                  * If there is a message on the socket, read it and check for
1972                  * validity.
1973                  */
1974                 if (FD_ISSET(pgStatSock, &rfds))
1975                 {
1976                         len = recv(pgStatSock, (char *) &input_buffer,
1977                                            sizeof(PgStat_Msg), 0);
1978                         if (len < 0)
1979                         {
1980                                 ereport(LOG,
1981                                                 (errcode_for_socket_access(),
1982                                            errmsg("could not read statistics message: %m")));
1983                                 exit(1);
1984                         }
1985
1986                         /*
1987                          * We ignore messages that are smaller than our common header
1988                          */
1989                         if (len < sizeof(PgStat_MsgHdr))
1990                                 continue;
1991
1992                         /*
1993                          * The received length must match the length in the header
1994                          */
1995                         if (input_buffer.msg_hdr.m_size != len)
1996                                 continue;
1997
1998                         /*
1999                          * O.K. - we accept this message.  Copy it to the circular
2000                          * msgbuffer.
2001                          */
2002                         frm = 0;
2003                         while (len > 0)
2004                         {
2005                                 xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
2006                                 if (xfr > len)
2007                                         xfr = len;
2008                                 Assert(xfr > 0);
2009                                 memcpy(msgbuffer + msg_recv,
2010                                            ((char *) &input_buffer) + frm,
2011                                            xfr);
2012                                 msg_recv += xfr;
2013                                 if (msg_recv == PGSTAT_RECVBUFFERSZ)
2014                                         msg_recv = 0;
2015                                 msg_have += xfr;
2016                                 frm += xfr;
2017                                 len -= xfr;
2018                         }
2019                 }
2020
2021                 /*
2022                  * If the collector is ready to receive, write some data into his
2023                  * pipe.  We may or may not be able to write all that we have.
2024                  *
2025                  * NOTE: if what we have is less than PIPE_BUF bytes but more than
2026                  * the space available in the pipe buffer, most kernels will
2027                  * refuse to write any of it, and will return EAGAIN.  This means
2028                  * we will busy-loop until the situation changes (either because
2029                  * the collector caught up, or because more data arrives so that
2030                  * we have more than PIPE_BUF bytes buffered).  This is not good,
2031                  * but is there any way around it?      We have no way to tell when
2032                  * the collector has caught up...
2033                  */
2034                 if (FD_ISSET(writePipe, &wfds))
2035                 {
2036                         xfr = PGSTAT_RECVBUFFERSZ - msg_send;
2037                         if (xfr > msg_have)
2038                                 xfr = msg_have;
2039                         Assert(xfr > 0);
2040                         len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
2041                         if (len < 0)
2042                         {
2043                                 if (errno == EINTR || errno == EAGAIN)
2044                                         continue;       /* not enough space in pipe */
2045                                 ereport(LOG,
2046                                                 (errcode_for_socket_access(),
2047                                                  errmsg("could not write to statistics collector pipe: %m")));
2048                                 exit(1);
2049                         }
2050                         /* NB: len < xfr is okay */
2051                         msg_send += len;
2052                         if (msg_send == PGSTAT_RECVBUFFERSZ)
2053                                 msg_send = 0;
2054                         msg_have -= len;
2055                 }
2056
2057                 /*
2058                  * Make sure we forwarded all messages before we check for
2059                  * Postmaster termination.
2060                  */
2061                 if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
2062                         continue;
2063
2064                 /*
2065                  * If the pipe from the postmaster is ready for reading, the
2066                  * kernel must have closed it on exit() (the postmaster never
2067                  * really writes to it). So we've done our job.
2068                  */
2069                 if (FD_ISSET(pmPipe, &rfds))
2070                         exit(0);
2071         }
2072 }
2073
/* ----------
 * pgstat_die() -
 *
 *	Signal handler that unconditionally terminates the current process
 *	with exit status 1; the signal number it receives is not examined.
 *
 *	NOTE(review): exit() runs atexit handlers and is not on the POSIX
 *	async-signal-safe list; it is kept here to preserve existing behavior.
 * ----------
 */
static void
pgstat_die(SIGNAL_ARGS)
{
	/* Nothing to clean up: just leave with a failure status. */
	exit(1);
}
2079
2080
2081 /* ----------
2082  * pgstat_add_backend() -
2083  *
2084  *      Support function to keep our backend list up to date.
2085  * ----------
2086  */
2087 static int
2088 pgstat_add_backend(PgStat_MsgHdr *msg)
2089 {
2090         PgStat_StatDBEntry *dbentry;
2091         PgStat_StatBeEntry *beentry;
2092         PgStat_StatBeDead *deadbe;
2093         bool            found;
2094
2095         /*
2096          * Check that the backend ID is valid
2097          */
2098         if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
2099         {
2100                 ereport(LOG,
2101                                 (errmsg("invalid server process ID %d", msg->m_backendid)));
2102                 return -1;
2103         }
2104
2105         /*
2106          * Get the slot for this backendid.
2107          */
2108         beentry = &pgStatBeTable[msg->m_backendid - 1];
2109         if (beentry->databaseid != InvalidOid)
2110         {
2111                 /*
2112                  * If the slot contains the PID of this backend, everything is
2113                  * fine and we got nothing to do.
2114                  */
2115                 if (beentry->procpid == msg->m_procpid)
2116                         return 0;
2117         }
2118
2119         /*
2120          * Lookup if this backend is known to be dead. This can be caused due
2121          * to messages arriving in the wrong order - i.e. Postmaster's BETERM
2122          * message might have arrived before we received all the backends
2123          * stats messages, or even a new backend with the same backendid was
2124          * faster in sending his BESTART.
2125          *
2126          * If the backend is known to be dead, we ignore this add.
2127          */
2128         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2129                                                                                            (void *) &(msg->m_procpid),
2130                                                                                            HASH_FIND, NULL);
2131         if (deadbe)
2132                 return 1;
2133
2134         /*
2135          * Backend isn't known to be dead. If it's slot is currently used, we
2136          * have to kick out the old backend.
2137          */
2138         if (beentry->databaseid != InvalidOid)
2139                 pgstat_sub_backend(beentry->procpid);
2140
2141         /*
2142          * Put this new backend into the slot.
2143          */
2144         beentry->databaseid = msg->m_databaseid;
2145         beentry->procpid = msg->m_procpid;
2146         beentry->userid = msg->m_userid;
2147         beentry->activity_start_sec = 0;
2148         beentry->activity_start_usec = 0;
2149         MemSet(beentry->activity, 0, PGSTAT_ACTIVITY_SIZE);
2150
2151         /*
2152          * Lookup or create the database entry for this backends DB.
2153          */
2154         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2155                                                                                    (void *) &(msg->m_databaseid),
2156                                                                                                  HASH_ENTER, &found);
2157         if (dbentry == NULL)
2158         {
2159                 ereport(LOG,
2160                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2161                          errmsg("out of memory in statistics collector --- abort")));
2162                 exit(1);
2163         }
2164
2165         /*
2166          * If not found, initialize the new one.
2167          */
2168         if (!found)
2169         {
2170                 HASHCTL         hash_ctl;
2171
2172                 dbentry->tables = NULL;
2173                 dbentry->n_xact_commit = 0;
2174                 dbentry->n_xact_rollback = 0;
2175                 dbentry->n_blocks_fetched = 0;
2176                 dbentry->n_blocks_hit = 0;
2177                 dbentry->n_connects = 0;
2178                 dbentry->destroy = 0;
2179
2180                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2181                 hash_ctl.keysize = sizeof(Oid);
2182                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2183                 hash_ctl.hash = tag_hash;
2184                 dbentry->tables = hash_create("Per-database table",
2185                                                                           PGSTAT_TAB_HASH_SIZE,
2186                                                                           &hash_ctl,
2187                                                                           HASH_ELEM | HASH_FUNCTION);
2188                 if (dbentry->tables == NULL)
2189                 {
2190                         /* assume the problem is out-of-memory */
2191                         ereport(LOG,
2192                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2193                          errmsg("out of memory in statistics collector --- abort")));
2194                         exit(1);
2195                 }
2196         }
2197
2198         /*
2199          * Count number of connects to the database
2200          */
2201         dbentry->n_connects++;
2202
2203         return 0;
2204 }
2205
2206
2207 /* ----------
2208  * pgstat_sub_backend() -
2209  *
2210  *      Remove a backend from the actual backends list.
2211  * ----------
2212  */
2213 static void
2214 pgstat_sub_backend(int procpid)
2215 {
2216         int                     i;
2217         PgStat_StatBeDead *deadbe;
2218         bool            found;
2219
2220         /*
2221          * Search in the known-backends table for the slot containing this
2222          * PID.
2223          */
2224         for (i = 0; i < MaxBackends; i++)
2225         {
2226                 if (pgStatBeTable[i].databaseid != InvalidOid &&
2227                         pgStatBeTable[i].procpid == procpid)
2228                 {
2229                         /*
2230                          * That's him. Add an entry to the known to be dead backends.
2231                          * Due to possible misorder in the arrival of UDP packets it's
2232                          * possible that even if we know the backend is dead, there
2233                          * could still be messages queued that arrive later. Those
2234                          * messages must not cause our number of backends statistics
2235                          * to get screwed up, so we remember for a couple of seconds
2236                          * that this PID is dead and ignore them (only the counting of
2237                          * backends, not the table access stats they sent).
2238                          */
2239                         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2240                                                                                                            (void *) &procpid,
2241                                                                                                            HASH_ENTER,
2242                                                                                                            &found);
2243                         if (deadbe == NULL)
2244                         {
2245                                 ereport(LOG,
2246                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2247                                                  errmsg("out of memory in statistics collector --- abort")));
2248                                 exit(1);
2249                         }
2250                         if (!found)
2251                         {
2252                                 deadbe->backendid = i + 1;
2253                                 deadbe->destroy = PGSTAT_DESTROY_COUNT;
2254                         }
2255
2256                         /*
2257                          * Declare the backend slot empty.
2258                          */
2259                         pgStatBeTable[i].databaseid = InvalidOid;
2260                         return;
2261                 }
2262         }
2263
2264         /*
2265          * No big problem if not found. This can happen if UDP messages arrive
2266          * out of order here.
2267          */
2268 }
2269
2270
2271 /* ----------
2272  * pgstat_write_statsfile() -
2273  *
2274  *      Tell the news.
2275  * ----------
2276  */
2277 static void
2278 pgstat_write_statsfile(void)
2279 {
2280         HASH_SEQ_STATUS hstat;
2281         HASH_SEQ_STATUS tstat;
2282         PgStat_StatDBEntry *dbentry;
2283         PgStat_StatTabEntry *tabentry;
2284         PgStat_StatBeDead *deadbe;
2285         FILE       *fpout;
2286         int                     i;
2287
2288         /*
2289          * Open the statistics temp file to write out the current values.
2290          */
2291         fpout = fopen(pgStat_tmpfname, PG_BINARY_W);
2292         if (fpout == NULL)
2293         {
2294                 ereport(LOG,
2295                                 (errcode_for_file_access(),
2296                                  errmsg("could not open temporary statistics file \"%s\": %m",
2297                                                 pgStat_tmpfname)));
2298                 return;
2299         }
2300
2301         /*
2302          * Walk through the database table.
2303          */
2304         hash_seq_init(&hstat, pgStatDBHash);
2305         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2306         {
2307                 /*
2308                  * If this database is marked destroyed, count down and do so if
2309                  * it reaches 0.
2310                  */
2311                 if (dbentry->destroy > 0)
2312                 {
2313                         if (--(dbentry->destroy) == 0)
2314                         {
2315                                 if (dbentry->tables != NULL)
2316                                         hash_destroy(dbentry->tables);
2317
2318                                 if (hash_search(pgStatDBHash,
2319                                                                 (void *) &(dbentry->databaseid),
2320                                                                 HASH_REMOVE, NULL) == NULL)
2321                                 {
2322                                         ereport(LOG,
2323                                                         (errmsg("database hash table corrupted "
2324                                                                         "during cleanup --- abort")));
2325                                         exit(1);
2326                                 }
2327                         }
2328
2329                         /*
2330                          * Don't include statistics for it.
2331                          */
2332                         continue;
2333                 }
2334
2335                 /*
2336                  * Write out the DB line including the number of life backends.
2337                  */
2338                 fputc('D', fpout);
2339                 fwrite(dbentry, sizeof(PgStat_StatDBEntry), 1, fpout);
2340
2341                 /*
2342                  * Walk through the databases access stats per table.
2343                  */
2344                 hash_seq_init(&tstat, dbentry->tables);
2345                 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2346                 {
2347                         /*
2348                          * If table entry marked for destruction, same as above for
2349                          * the database entry.
2350                          */
2351                         if (tabentry->destroy > 0)
2352                         {
2353                                 if (--(tabentry->destroy) == 0)
2354                                 {
2355                                         if (hash_search(dbentry->tables,
2356                                                                         (void *) &(tabentry->tableid),
2357                                                                         HASH_REMOVE, NULL) == NULL)
2358                                         {
2359                                                 ereport(LOG,
2360                                                                 (errmsg("tables hash table for "
2361                                                                                 "database %u corrupted during "
2362                                                                                 "cleanup --- abort",
2363                                                                                 dbentry->databaseid)));
2364                                                 exit(1);
2365                                         }
2366                                 }
2367                                 continue;
2368                         }
2369
2370                         /*
2371                          * At least we think this is still a life table. Print it's
2372                          * access stats.
2373                          */
2374                         fputc('T', fpout);
2375                         fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2376                 }
2377
2378                 /*
2379                  * Mark the end of this DB
2380                  */
2381                 fputc('d', fpout);
2382         }
2383
2384         /*
2385          * Write out the known running backends to the stats file.
2386          */
2387         i = MaxBackends;
2388         fputc('M', fpout);
2389         fwrite(&i, sizeof(i), 1, fpout);
2390
2391         for (i = 0; i < MaxBackends; i++)
2392         {
2393                 if (pgStatBeTable[i].databaseid != InvalidOid)
2394                 {
2395                         fputc('B', fpout);
2396                         fwrite(&pgStatBeTable[i], sizeof(PgStat_StatBeEntry), 1, fpout);
2397                 }
2398         }
2399
2400         /*
2401          * No more output to be done. Close the temp file and replace the old
2402          * pgstat.stat with it.
2403          */
2404         fputc('E', fpout);
2405         if (fclose(fpout) < 0)
2406         {
2407                 ereport(LOG,
2408                                 (errcode_for_file_access(),
2409                                  errmsg("could not close temporary statistics file \"%s\": %m",
2410                                                 pgStat_tmpfname)));
2411         }
2412         else
2413         {
2414                 if (rename(pgStat_tmpfname, pgStat_fname) < 0)
2415                 {
2416                         ereport(LOG,
2417                                         (errcode_for_file_access(),
2418                                          errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2419                                                         pgStat_tmpfname, pgStat_fname)));
2420                 }
2421         }
2422
2423         /*
2424          * Clear out the dead backends table
2425          */
2426         hash_seq_init(&hstat, pgStatBeDead);
2427         while ((deadbe = (PgStat_StatBeDead *) hash_seq_search(&hstat)) != NULL)
2428         {
2429                 /*
2430                  * Count down the destroy delay and remove entries where it
2431                  * reaches 0.
2432                  */
2433                 if (--(deadbe->destroy) <= 0)
2434                 {
2435                         if (hash_search(pgStatBeDead,
2436                                                         (void *) &(deadbe->procpid),
2437                                                         HASH_REMOVE, NULL) == NULL)
2438                         {
2439                                 ereport(LOG,
2440                                                 (errmsg("dead-server-process hash table corrupted "
2441                                                                 "during cleanup --- abort")));
2442                                 exit(1);
2443                         }
2444                 }
2445         }
2446 }
2447
2448
2449 /* ----------
2450  * pgstat_read_statsfile() -
2451  *
2452  *      Reads in an existing statistics collector and initializes the
2453  *      databases hash table (who's entries point to the tables hash tables)
2454  *      and the current backend table.
2455  * ----------
2456  */
2457 static void
2458 pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
2459                                           PgStat_StatBeEntry **betab, int *numbackends)
2460 {
2461         PgStat_StatDBEntry *dbentry;
2462         PgStat_StatDBEntry dbbuf;
2463         PgStat_StatTabEntry *tabentry;
2464         PgStat_StatTabEntry tabbuf;
2465         HASHCTL         hash_ctl;
2466         HTAB       *tabhash = NULL;
2467         FILE       *fpin;
2468         int                     maxbackends = 0;
2469         int                     havebackends = 0;
2470         bool            found;
2471         MemoryContext use_mcxt;
2472         int                     mcxt_flags;
2473
2474         /*
2475          * If running in the collector we use the DynaHashCxt memory context.
2476          * If running in a backend, we use the TopTransactionContext instead,
2477          * so the caller must only know the last XactId when this call
2478          * happened to know if his tables are still valid or already gone!
2479          */
2480         if (pgStatRunningInCollector)
2481         {
2482                 use_mcxt = NULL;
2483                 mcxt_flags = 0;
2484         }
2485         else
2486         {
2487                 use_mcxt = TopTransactionContext;
2488                 mcxt_flags = HASH_CONTEXT;
2489         }
2490
2491         /*
2492          * Create the DB hashtable
2493          */
2494         memset(&hash_ctl, 0, sizeof(hash_ctl));
2495         hash_ctl.keysize = sizeof(Oid);
2496         hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2497         hash_ctl.hash = tag_hash;
2498         hash_ctl.hcxt = use_mcxt;
2499         *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2500                                                   HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2501         if (*dbhash == NULL)
2502         {
2503                 /* assume the problem is out-of-memory */
2504                 if (pgStatRunningInCollector)
2505                 {
2506                         ereport(LOG,
2507                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2508                          errmsg("out of memory in statistics collector --- abort")));
2509                         exit(1);
2510                 }
2511                 /* in backend, can do normal error */
2512                 ereport(ERROR,
2513                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2514                                  errmsg("out of memory")));
2515         }
2516
2517         /*
2518          * Initialize the number of known backends to zero, just in case we do
2519          * a silent error return below.
2520          */
2521         if (numbackends != NULL)
2522                 *numbackends = 0;
2523         if (betab != NULL)
2524                 *betab = NULL;
2525
2526         /*
2527          * In EXEC_BACKEND case, we won't have inherited pgStat_fname from
2528          * postmaster, so compute it first time through.
2529          */
2530 #ifdef EXEC_BACKEND
2531         if (pgStat_fname[0] == '\0')
2532         {
2533                 Assert(DataDir != NULL);
2534                 snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
2535         }
2536 #endif
2537
2538         /*
2539          * Try to open the status file. If it doesn't exist, the backends
2540          * simply return zero for anything and the collector simply starts
2541          * from scratch with empty counters.
2542          */
2543         if ((fpin = fopen(pgStat_fname, PG_BINARY_R)) == NULL)
2544                 return;
2545
2546         /*
2547          * We found an existing collector stats file. Read it and put all the
2548          * hashtable entries into place.
2549          */
2550         for (;;)
2551         {
2552                 switch (fgetc(fpin))
2553                 {
2554                                 /*
2555                                  * 'D'  A PgStat_StatDBEntry struct describing a database
2556                                  * follows. Subsequently, zero to many 'T' entries will
2557                                  * follow until a 'd' is encountered.
2558                                  */
2559                         case 'D':
2560                                 if (fread(&dbbuf, 1, sizeof(dbbuf), fpin) != sizeof(dbbuf))
2561                                 {
2562                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2563                                                         (errmsg("corrupted pgstat.stat file")));
2564                                         fclose(fpin);
2565                                         return;
2566                                 }
2567
2568                                 /*
2569                                  * Add to the DB hash
2570                                  */
2571                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2572                                                                                           (void *) &dbbuf.databaseid,
2573                                                                                                                          HASH_ENTER,
2574                                                                                                                          &found);
2575                                 if (dbentry == NULL)
2576                                 {
2577                                         if (pgStatRunningInCollector)
2578                                         {
2579                                                 ereport(LOG,
2580                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2581                                                                  errmsg("out of memory in statistics collector --- abort")));
2582                                                 exit(1);
2583                                         }
2584                                         else
2585                                         {
2586                                                 fclose(fpin);
2587                                                 ereport(ERROR,
2588                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2589                                                                  errmsg("out of memory")));
2590                                         }
2591                                 }
2592                                 if (found)
2593                                 {
2594                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2595                                                         (errmsg("corrupted pgstat.stat file")));
2596                                         fclose(fpin);
2597                                         return;
2598                                 }
2599
2600                                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2601                                 dbentry->tables = NULL;
2602                                 dbentry->destroy = 0;
2603                                 dbentry->n_backends = 0;
2604
2605                                 /*
2606                                  * Don't collect tables if not the requested DB
2607                                  */
2608                                 if (onlydb != InvalidOid && onlydb != dbbuf.databaseid)
2609                                         break;
2610
2611                                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2612                                 hash_ctl.keysize = sizeof(Oid);
2613                                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2614                                 hash_ctl.hash = tag_hash;
2615                                 hash_ctl.hcxt = use_mcxt;
2616                                 dbentry->tables = hash_create("Per-database table",
2617                                                                                           PGSTAT_TAB_HASH_SIZE,
2618                                                                                           &hash_ctl,
2619                                                                  HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2620                                 if (dbentry->tables == NULL)
2621                                 {
2622                                         /* assume the problem is out-of-memory */
2623                                         if (pgStatRunningInCollector)
2624                                         {
2625                                                 ereport(LOG,
2626                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2627                                                                  errmsg("out of memory in statistics collector --- abort")));
2628                                                 exit(1);
2629                                         }
2630                                         /* in backend, can do normal error */
2631                                         fclose(fpin);
2632                                         ereport(ERROR,
2633                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2634                                                          errmsg("out of memory")));
2635                                 }
2636
2637                                 /*
2638                                  * Arrange that following 'T's add entries to this
2639                                  * databases tables hash table.
2640                                  */
2641                                 tabhash = dbentry->tables;
2642                                 break;
2643
2644                                 /*
2645                                  * 'd'  End of this database.
2646                                  */
2647                         case 'd':
2648                                 tabhash = NULL;
2649                                 break;
2650
2651                                 /*
2652                                  * 'T'  A PgStat_StatTabEntry follows.
2653                                  */
2654                         case 'T':
2655                                 if (fread(&tabbuf, 1, sizeof(tabbuf), fpin) != sizeof(tabbuf))
2656                                 {
2657                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2658                                                         (errmsg("corrupted pgstat.stat file")));
2659                                         fclose(fpin);
2660                                         return;
2661                                 }
2662
2663                                 /*
2664                                  * Skip if table belongs to a not requested database.
2665                                  */
2666                                 if (tabhash == NULL)
2667                                         break;
2668
2669                                 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2670                                                                                                 (void *) &tabbuf.tableid,
2671                                                                                                          HASH_ENTER, &found);
2672                                 if (tabentry == NULL)
2673                                 {
2674                                         if (pgStatRunningInCollector)
2675                                         {
2676                                                 ereport(LOG,
2677                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2678                                                                  errmsg("out of memory in statistics collector --- abort")));
2679                                                 exit(1);
2680                                         }
2681                                         /* in backend, can do normal error */
2682                                         fclose(fpin);
2683                                         ereport(ERROR,
2684                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2685                                                          errmsg("out of memory")));
2686                                 }
2687
2688                                 if (found)
2689                                 {
2690                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2691                                                         (errmsg("corrupted pgstat.stat file")));
2692                                         fclose(fpin);
2693                                         return;
2694                                 }
2695
2696                                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2697                                 break;
2698
2699                                 /*
2700                                  * 'M'  The maximum number of backends to expect follows.
2701                                  */
2702                         case 'M':
2703                                 if (betab == NULL || numbackends == NULL)
2704                                 {
2705                                         fclose(fpin);
2706                                         return;
2707                                 }
2708                                 if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2709                                         sizeof(maxbackends))
2710                                 {
2711                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2712                                                         (errmsg("corrupted pgstat.stat file")));
2713                                         fclose(fpin);
2714                                         return;
2715                                 }
2716                                 if (maxbackends == 0)
2717                                 {
2718                                         fclose(fpin);
2719                                         return;
2720                                 }
2721
2722                                 /*
2723                                  * Allocate space (in TopTransactionContext too) for the
2724                                  * backend table.
2725                                  */
2726                                 if (use_mcxt == NULL)
2727                                         *betab = (PgStat_StatBeEntry *) malloc(
2728                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2729                                 else
2730                                         *betab = (PgStat_StatBeEntry *) MemoryContextAlloc(
2731                                                                                                                                 use_mcxt,
2732                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2733                                 break;
2734
2735                                 /*
2736                                  * 'B'  A PgStat_StatBeEntry follows.
2737                                  */
2738                         case 'B':
2739                                 if (betab == NULL || numbackends == NULL)
2740                                 {
2741                                         fclose(fpin);
2742                                         return;
2743                                 }
2744                                 if (*betab == NULL)
2745                                 {
2746                                         fclose(fpin);
2747                                         return;
2748                                 }
2749
2750                                 /*
2751                                  * Read it directly into the table.
2752                                  */
2753                                 if (fread(&(*betab)[havebackends], 1,
2754                                                   sizeof(PgStat_StatBeEntry), fpin) !=
2755                                         sizeof(PgStat_StatBeEntry))
2756                                 {
2757                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2758                                                         (errmsg("corrupted pgstat.stat file")));
2759                                         fclose(fpin);
2760                                         return;
2761                                 }
2762
2763                                 /*
2764                                  * Count backends per database here.
2765                                  */
2766                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2767                                                    (void *) &((*betab)[havebackends].databaseid),
2768                                                                                                                 HASH_FIND, NULL);
2769                                 if (dbentry)
2770                                         dbentry->n_backends++;
2771
2772                                 havebackends++;
2773                                 if (numbackends != 0)
2774                                         *numbackends = havebackends;
2775                                 if (havebackends >= maxbackends)
2776                                 {
2777                                         fclose(fpin);
2778                                         return;
2779                                 }
2780                                 break;
2781
2782                                 /*
2783                                  * 'E'  The EOF marker of a complete stats file.
2784                                  */
2785                         case 'E':
2786                                 fclose(fpin);
2787                                 return;
2788
2789                         default:
2790                                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2791                                                 (errmsg("corrupted pgstat.stat file")));
2792                                 fclose(fpin);
2793                                 return;
2794                 }
2795         }
2796
2797         fclose(fpin);
2798 }
2799
2800
2801 /* ----------
2802  * pgstat_recv_bestart() -
2803  *
2804  *      Process a backend starup message.
2805  * ----------
2806  */
2807 static void
2808 pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
2809 {
2810         pgstat_add_backend(&msg->m_hdr);
2811 }
2812
2813
2814 /* ----------
2815  * pgstat_recv_beterm() -
2816  *
2817  *      Process a backend termination message.
2818  * ----------
2819  */
2820 static void
2821 pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
2822 {
2823         pgstat_sub_backend(msg->m_hdr.m_procpid);
2824 }
2825
2826
2827 /* ----------
2828  * pgstat_recv_activity() -
2829  *
2830  *      Remember what the backend is doing.
2831  * ----------
2832  */
2833 static void
2834 pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
2835 {
2836         PgStat_StatBeEntry *entry;
2837
2838         /*
2839          * Here we check explicitly for 0 return, since we don't want to
2840          * mangle the activity of an active backend by a delayed packed from a
2841          * dead one.
2842          */
2843         if (pgstat_add_backend(&msg->m_hdr) != 0)
2844                 return;
2845
2846         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2847
2848         strncpy(entry->activity, msg->m_what, PGSTAT_ACTIVITY_SIZE);
2849
2850         entry->activity_start_sec =
2851                 GetCurrentAbsoluteTimeUsec(&entry->activity_start_usec);
2852 }
2853
2854
2855 /* ----------
2856  * pgstat_recv_tabstat() -
2857  *
2858  *      Count what the backend has done.
2859  * ----------
2860  */
2861 static void
2862 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2863 {
2864         PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2865         PgStat_StatDBEntry *dbentry;
2866         PgStat_StatTabEntry *tabentry;
2867         int                     i;
2868         bool            found;
2869
2870         /*
2871          * Make sure the backend is counted for.
2872          */
2873         if (pgstat_add_backend(&msg->m_hdr) < 0)
2874                 return;
2875
2876         /*
2877          * Lookup the database in the hashtable.
2878          */
2879         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2880                                                                          (void *) &(msg->m_hdr.m_databaseid),
2881                                                                                                  HASH_FIND, NULL);
2882         if (!dbentry)
2883                 return;
2884
2885         /*
2886          * If the database is marked for destroy, this is a delayed UDP packet
2887          * and not worth being counted.
2888          */
2889         if (dbentry->destroy > 0)
2890                 return;
2891
2892         dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
2893         dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
2894
2895         /*
2896          * Process all table entries in the message.
2897          */
2898         for (i = 0; i < msg->m_nentries; i++)
2899         {
2900                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2901                                                                                           (void *) &(tabmsg[i].t_id),
2902                                                                                                          HASH_ENTER, &found);
2903                 if (tabentry == NULL)
2904                 {
2905                         ereport(LOG,
2906                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2907                          errmsg("out of memory in statistics collector --- abort")));
2908                         exit(1);
2909                 }
2910
2911                 if (!found)
2912                 {
2913                         /*
2914                          * If it's a new table entry, initialize counters to the
2915                          * values we just got.
2916                          */
2917                         tabentry->numscans = tabmsg[i].t_numscans;
2918                         tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
2919                         tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
2920                         tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
2921                         tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
2922                         tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
2923                         tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
2924                         tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
2925
2926                         tabentry->destroy = 0;
2927                 }
2928                 else
2929                 {
2930                         /*
2931                          * Otherwise add the values to the existing entry.
2932                          */
2933                         tabentry->numscans += tabmsg[i].t_numscans;
2934                         tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
2935                         tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
2936                         tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
2937                         tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
2938                         tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
2939                         tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
2940                         tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
2941                 }
2942
2943                 /*
2944                  * And add the block IO to the database entry.
2945                  */
2946                 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
2947                 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
2948         }
2949 }
2950
2951
2952 /* ----------
2953  * pgstat_recv_tabpurge() -
2954  *
2955  *      Arrange for dead table removal.
2956  * ----------
2957  */
2958 static void
2959 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
2960 {
2961         PgStat_StatDBEntry *dbentry;
2962         PgStat_StatTabEntry *tabentry;
2963         int                     i;
2964
2965         /*
2966          * Make sure the backend is counted for.
2967          */
2968         if (pgstat_add_backend(&msg->m_hdr) < 0)
2969                 return;
2970
2971         /*
2972          * Lookup the database in the hashtable.
2973          */
2974         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2975                                                                          (void *) &(msg->m_hdr.m_databaseid),
2976                                                                                                  HASH_FIND, NULL);
2977         if (!dbentry)
2978                 return;
2979
2980         /*
2981          * If the database is marked for destroy, this is a delayed UDP packet
2982          * and the tables will go away at DB destruction.
2983          */
2984         if (dbentry->destroy > 0)
2985                 return;
2986
2987         /*
2988          * Process all table entries in the message.
2989          */
2990         for (i = 0; i < msg->m_nentries; i++)
2991         {
2992                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2993                                                                                    (void *) &(msg->m_tableid[i]),
2994                                                                                                            HASH_FIND, NULL);
2995                 if (tabentry)
2996                         tabentry->destroy = PGSTAT_DESTROY_COUNT;
2997         }
2998 }
2999
3000
3001 /* ----------
3002  * pgstat_recv_dropdb() -
3003  *
3004  *      Arrange for dead database removal
3005  * ----------
3006  */
3007 static void
3008 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
3009 {
3010         PgStat_StatDBEntry *dbentry;
3011
3012         /*
3013          * Make sure the backend is counted for.
3014          */
3015         if (pgstat_add_backend(&msg->m_hdr) < 0)
3016                 return;
3017
3018         /*
3019          * Lookup the database in the hashtable.
3020          */
3021         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3022                                                                                    (void *) &(msg->m_databaseid),
3023                                                                                                  HASH_FIND, NULL);
3024         if (!dbentry)
3025                 return;
3026
3027         /*
3028          * Mark the database for destruction.
3029          */
3030         dbentry->destroy = PGSTAT_DESTROY_COUNT;
3031 }
3032
3033
3034 /* ----------
3035  * pgstat_recv_dropdb() -
3036  *
3037  *      Arrange for dead database removal
3038  * ----------
3039  */
3040 static void
3041 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
3042 {
3043         HASHCTL         hash_ctl;
3044         PgStat_StatDBEntry *dbentry;
3045
3046         /*
3047          * Make sure the backend is counted for.
3048          */
3049         if (pgstat_add_backend(&msg->m_hdr) < 0)
3050                 return;
3051
3052         /*
3053          * Lookup the database in the hashtable.
3054          */
3055         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3056                                                                          (void *) &(msg->m_hdr.m_databaseid),
3057                                                                                                  HASH_FIND, NULL);
3058         if (!dbentry)
3059                 return;
3060
3061         /*
3062          * We simply throw away all the databases table entries by recreating
3063          * a new hash table for them.
3064          */
3065         if (dbentry->tables != NULL)
3066                 hash_destroy(dbentry->tables);
3067
3068         dbentry->tables = NULL;
3069         dbentry->n_xact_commit = 0;
3070         dbentry->n_xact_rollback = 0;
3071         dbentry->n_blocks_fetched = 0;
3072         dbentry->n_blocks_hit = 0;
3073         dbentry->n_connects = 0;
3074         dbentry->destroy = 0;
3075
3076         memset(&hash_ctl, 0, sizeof(hash_ctl));
3077         hash_ctl.keysize = sizeof(Oid);
3078         hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3079         hash_ctl.hash = tag_hash;
3080         dbentry->tables = hash_create("Per-database table",
3081                                                                   PGSTAT_TAB_HASH_SIZE,
3082                                                                   &hash_ctl,
3083                                                                   HASH_ELEM | HASH_FUNCTION);
3084         if (dbentry->tables == NULL)
3085         {
3086                 /* assume the problem is out-of-memory */
3087                 ereport(LOG,
3088                                 (errcode(ERRCODE_OUT_OF_MEMORY),
3089                          errmsg("out of memory in statistics collector --- abort")));
3090                 exit(1);
3091         }
3092 }