granicus.if.org Git - postgresql/blob - src/backend/postmaster/pgstat.c
Assume select() might modify struct timeout, so remove previous
[postgresql] / src / backend / postmaster / pgstat.c
1 /* ----------
2  * pgstat.c
3  *
4  *      All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *      TODO:   - Separate collector, postmaster and backend stuff
7  *                        into different files.
8  *
9  *                      - Add some automatic call for pgstat vacuuming.
10  *
11  *                      - Add a pgstat config column to pg_database, so this
12  *                        entire thing can be enabled/disabled on a per db basis.
13  *
14  *      Copyright (c) 2001-2005, PostgreSQL Global Development Group
15  *
16  *      $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.118 2006/01/03 19:54:08 momjian Exp $
17  * ----------
18  */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31
32 #include "pgstat.h"
33
34 #include "access/heapam.h"
35 #include "access/xact.h"
36 #include "catalog/pg_database.h"
37 #include "libpq/libpq.h"
38 #include "libpq/pqsignal.h"
39 #include "mb/pg_wchar.h"
40 #include "miscadmin.h"
41 #include "postmaster/autovacuum.h"
42 #include "postmaster/fork_process.h"
43 #include "postmaster/postmaster.h"
44 #include "storage/backendid.h"
45 #include "storage/fd.h"
46 #include "storage/ipc.h"
47 #include "storage/pg_shmem.h"
48 #include "storage/pmsignal.h"
49 #include "storage/procarray.h"
50 #include "tcop/tcopprot.h"
51 #include "utils/hsearch.h"
52 #include "utils/memutils.h"
53 #include "utils/ps_status.h"
54 #include "utils/rel.h"
55 #include "utils/syscache.h"
56
57
/* ----------
 * Paths for the statistics files (relative to installation's $PGDATA).
 *
 * PGSTAT_STAT_FILENAME is the file pgstat_reset_all() unlinks to discard
 * collected statistics.  NOTE(review): PGSTAT_STAT_TMPFILE is presumably
 * written first and then renamed over PGSTAT_STAT_FILENAME; confirm in
 * pgstat_write_statsfile().
 * ----------
 */
#define PGSTAT_STAT_FILENAME	"global/pgstat.stat"
#define PGSTAT_STAT_TMPFILE		"global/pgstat.tmp"

/* ----------
 * Timer definitions.
 * ----------
 */
#define PGSTAT_STAT_INTERVAL	500		/* How often to write the status file;
										 * in milliseconds. */

#define PGSTAT_DESTROY_DELAY	10000	/* How long to keep destroyed objects
										 * known, to give delayed UDP packets
										 * time to arrive; in milliseconds. */

/*
 * PGSTAT_DESTROY_DELAY expressed as a number of PGSTAT_STAT_INTERVAL
 * periods (10000 / 500 = 20 with the values above).
 */
#define PGSTAT_DESTROY_COUNT	(PGSTAT_DESTROY_DELAY / PGSTAT_STAT_INTERVAL)

/*
 * pgstat_start() refuses to launch a new collector if the previous launch
 * attempt was less than this many seconds ago.
 */
#define PGSTAT_RESTART_INTERVAL 60		/* How often to attempt to restart a
										 * failed statistics collector; in
										 * seconds. */

/* ----------
 * Amount of space reserved in pgstat_recvbuffer(): room for 1024 messages
 * of sizeof(PgStat_Msg) bytes each.
 * ----------
 */
#define PGSTAT_RECVBUFFERSZ		((int) (1024 * sizeof(PgStat_Msg)))

/* ----------
 * The initial size hints for the hash tables used in the collector
 * (per-database, per-backend and per-table tables, judging by the names).
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_BE_HASH_SIZE		512
#define PGSTAT_TAB_HASH_SIZE	512
95
96
/* ----------
 * GUC parameters
 *
 * pgstat_init() forces pgstat_collect_startcollector on whenever any of the
 * querystring/tuplelevel/blocklevel options is set.  It removes the stats
 * file when the collector is off or pgstat_collect_resetonpmstart is set.
 * If no working socket can be created, it switches the collector and the
 * three collection options off again.
 * ----------
 */
bool		pgstat_collect_startcollector = true;
bool		pgstat_collect_resetonpmstart = false;
/* pgstat_report_activity() sends nothing unless this is on */
bool		pgstat_collect_querystring = false;
bool		pgstat_collect_tuplelevel = false;
bool		pgstat_collect_blocklevel = false;

/* ----------
 * Local data
 * ----------
 */
/*
 * UDP socket used to carry statistics messages.  -1 means no working
 * socket; every sender in this file returns early in that case.
 */
NON_EXEC_STATIC int pgStatSock = -1;
/*
 * NOTE(review): presumably the pipe between the buffer and collector
 * processes; not referenced in the visible part of this file.
 */
NON_EXEC_STATIC int pgStatPipe[2] = {-1, -1};
/* Address pgStatSock is bound to; pgstat_init() connects the socket to it. */
static struct sockaddr_storage pgStatAddr;
/* NOTE(review): not referenced in the visible code; meaning assumed from the name. */
static pid_t pgStatCollectorPid = 0;

/* Time of the last launch attempt in pgstat_start(), for rate-limiting. */
static time_t last_pgstat_start_time;

/* NOTE(review): not referenced in the visible code; meaning assumed from the name. */
static long pgStatNumMessages = 0;

/* NOTE(review): not referenced in the visible code; meaning assumed from the name. */
static bool pgStatRunningInCollector = false;

/*
 * Place where backends store per-table info to be sent to the collector.
 * We store shared relations separately from non-shared ones, to be able to
 * send them in separate messages.  pgstat_report_tabstat() sends the first
 * tsa_used messages and then resets tsa_used to 0.
 */
typedef struct TabStatArray
{
	int			tsa_alloc;		/* num allocated */
	int			tsa_used;		/* num actually used */
	PgStat_MsgTabstat **tsa_messages;	/* the array itself */
} TabStatArray;

#define TABSTAT_QUANTUM		4	/* we alloc this many at a time */

static TabStatArray RegularTabStat = {0, 0, NULL};
static TabStatArray SharedTabStat = {0, 0, NULL};

/*
 * Transaction commit/abort counts not yet reported.  pgstat_report_tabstat()
 * attaches them to the first non-shared tabstat message it sends and then
 * zeroes them.
 */
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;

/*
 * Backend-local copy of the collector's data, loaded by
 * backend_read_statsfile().  pgStatDBHash is keyed by database OID (see the
 * hash_search in pgstat_vacuum_tabstat).  NOTE(review): pgStatDBHashXact
 * presumably records the transaction the copy was loaded in, so it is read
 * at most once per transaction; confirm in backend_read_statsfile().
 */
static TransactionId pgStatDBHashXact = InvalidTransactionId;
static HTAB *pgStatDBHash = NULL;
static HTAB *pgStatBeDead = NULL;
static PgStat_StatBeEntry *pgStatBeTable = NULL;
static int	pgStatNumBackends = 0;

/*
 * Request flag for an immediate stats-file write.  NOTE(review): presumably
 * set from the force_statwrite() signal handler; if so,
 * volatile sig_atomic_t is the portable type for such a flag.
 */
static volatile bool	need_statwrite;
149
/* ----------
 * Local function forward declarations
 * ----------
 */
#ifdef EXEC_BACKEND

/*
 * Which statistics process pgstat_forkexec() should launch; mapped to the
 * "-forkbuf" / "-forkcol" command-line switches respectively.
 */
typedef enum STATS_PROCESS_TYPE
{
	STAT_PROC_BUFFER,
	STAT_PROC_COLLECTOR
}	STATS_PROCESS_TYPE;

static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
static void pgstat_parseArgs(int argc, char *argv[]);
#endif

/* Process entry points, signal handlers and the backend exit hook */
NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
static void force_statwrite(SIGNAL_ARGS);
static void pgstat_recvbuffer(void);
static void pgstat_exit(SIGNAL_ARGS);
static void pgstat_die(SIGNAL_ARGS);
/* on_shmem_exit callback registered by pgstat_bestart() */
static void pgstat_beshutdown_hook(int code, Datum arg);

/* Maintenance of the in-memory statistics tables and the stats file */
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static int	pgstat_add_backend(PgStat_MsgHdr *msg);
static void pgstat_sub_backend(int procpid);
static void pgstat_drop_database(Oid databaseid);
static void pgstat_write_statsfile(void);
static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
					  PgStat_StatBeEntry **betab,
					  int *numbackends);
/* Loads the stats file into backend-local hash tables if not done this transaction */
static void backend_read_statsfile(void);

/* Message construction and transmission over pgStatSock */
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
static void pgstat_send(void *msg, int len);

/*
 * One handler per message type.  NOTE(review): presumably dispatched by the
 * collector process on receipt; the dispatcher is not in the visible code.
 */
static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
197
198
199 /* ------------------------------------------------------------
200  * Public functions called from postmaster follow
201  * ------------------------------------------------------------
202  */
203
/* Value of the one-byte probe message sent through a candidate socket. */
#define TESTBYTEVAL ((char) 199)

/* ----------
 * pgstat_init_try_addr() -
 *
 *	Try to build a working, self-connected UDP socket for one candidate
 *	address returned by pg_getaddrinfo_all().  The socket is created, bound
 *	to a kernel-assigned port, connected to its own address, and checked by
 *	sending a one-byte test message and reading it back.  The function waits
 *	at most half a second for that message to arrive.
 *
 *	On success, returns true with pgStatSock open and pgStatAddr holding the
 *	socket's own address.  On any failure, it emits a LOG message, closes
 *	the socket if one was created, leaves pgStatSock at -1 and returns false,
 *	so the caller can move on to the next candidate.
 * ----------
 */
static bool
pgstat_init_try_addr(struct addrinfo *addr)
{
	ACCEPT_TYPE_ARG3 alen;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;

	/*
	 * Create the socket.  There is nothing to close if this fails.
	 */
	if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not create socket for statistics collector: %m")));
		pgStatSock = -1;
		return false;
	}

	/*
	 * Bind it to a kernel assigned port on localhost and get the assigned
	 * port via getsockname().
	 */
	if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not bind socket for statistics collector: %m")));
		goto fail_close;
	}

	alen = sizeof(pgStatAddr);
	if (getsockname(pgStatSock, (struct sockaddr *) & pgStatAddr, &alen) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not get address of socket for statistics collector: %m")));
		goto fail_close;
	}

	/*
	 * Connect the socket to its own address.  This saves a few cycles by not
	 * having to respecify the target address on every send.  It also
	 * provides a kernel-level check that only packets from this same address
	 * will be received.
	 */
	if (connect(pgStatSock, (struct sockaddr *) & pgStatAddr, alen) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not connect socket for statistics collector: %m")));
		goto fail_close;
	}

	/*
	 * Try to send and receive a one-byte test message on the socket.  This
	 * catches situations where the socket can be created but will not
	 * actually pass data (for instance, because kernel packet filtering rules
	 * prevent it).
	 */
	test_byte = TESTBYTEVAL;
	if (send(pgStatSock, &test_byte, 1, 0) != 1)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not send test message on socket for statistics collector: %m")));
		goto fail_close;
	}

	/*
	 * There could be a little delay before the message can be received.  We
	 * arbitrarily allow up to half a second before deciding it's broken.
	 * select() may modify the timeout, so rebuild it (and the fd set) on
	 * every iteration of the EINTR retry loop.
	 */
	for (;;)
	{
		FD_ZERO(&rset);
		FD_SET(pgStatSock, &rset);
		tv.tv_sec = 0;
		tv.tv_usec = 500000;
		sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
		if (sel_res >= 0 || errno != EINTR)
			break;
	}
	if (sel_res < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("select() failed in statistics collector: %m")));
		goto fail_close;
	}
	if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
	{
		/*
		 * This is the case we actually think is likely, so take pains to give
		 * a specific message for it.
		 *
		 * errno will not be set meaningfully here, so don't use it.
		 */
		ereport(LOG,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("test message did not get through on socket for statistics collector")));
		goto fail_close;
	}

	test_byte++;				/* just make sure variable is changed */

	if (recv(pgStatSock, &test_byte, 1, 0) != 1)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not receive test message on socket for statistics collector: %m")));
		goto fail_close;
	}

	if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
	{
		ereport(LOG,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("incorrect test message transmission on socket for statistics collector")));
		goto fail_close;
	}

	/* If we get here, we have a working socket */
	return true;

fail_close:
	/* ereport above has already captured errno for %m, so closing is safe */
	closesocket(pgStatSock);
	pgStatSock = -1;
	return false;
}

/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup.  Create the resources required by
 *	the statistics collector process.  If unable to do so, do not fail ---
 *	better to let the postmaster start with stats collection disabled.
 *
 *	On success, pgStatSock is a non-blocking UDP socket connected to its
 *	own "localhost" address.  On failure, a LOG message is emitted,
 *	pgStatSock is left at -1, and the collector and collection GUCs are
 *	switched off.
 * ----------
 */
void
pgstat_init(void)
{
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;

	/*
	 * Force start of collector daemon if something to collect
	 */
	if (pgstat_collect_querystring ||
		pgstat_collect_tuplelevel ||
		pgstat_collect_blocklevel)
		pgstat_collect_startcollector = true;

	/*
	 * If we don't have to start a collector or should reset the collected
	 * statistics on postmaster start, simply remove the stats file.
	 */
	if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
		pgstat_reset_all();

	/*
	 * Nothing else required if collector will not get started
	 */
	if (!pgstat_collect_startcollector)
		return;

	/*
	 * Create the UDP socket for sending and receiving statistic messages
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		/*
		 * gai_strerror() is only meaningful for a nonzero result; a zero
		 * result with an empty list would otherwise be logged as "Success".
		 */
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						ret ? gai_strerror(ret) : "no addresses returned")));
		goto startup_failed;
	}

	/*
	 * On some platforms, pg_getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination.  We will generate LOG
	 * messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		if (pgstat_init_try_addr(addr))
			break;				/* we have a working socket */
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
		goto startup_failed;

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the collector
	 * falls behind (despite the buffering process), statistics messages will
	 * be discarded; backends won't block waiting to send messages to the
	 * collector.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	pg_freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	ereport(LOG,
			(errmsg("disabling statistics collector for lack of working socket")));

	if (addrs)
		pg_freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/* Adjust GUC variables to suppress useless activity */
	pgstat_collect_startcollector = false;
	pgstat_collect_querystring = false;
	pgstat_collect_tuplelevel = false;
	pgstat_collect_blocklevel = false;
}
459
460 /*
461  * pgstat_reset_all() -
462  *
463  * Remove the stats file.  This is used on server start if the
464  * stats_reset_on_server_start feature is enabled, or if WAL
465  * recovery is needed after a crash.
466  */
467 void
468 pgstat_reset_all(void)
469 {
470         unlink(PGSTAT_STAT_FILENAME);
471 }
472
#ifdef EXEC_BACKEND

/*
 * pgstat_forkexec() -
 *
 * Format up the arglist for, then fork and exec, a statistics (buffer or
 * collector) process.  The argument vector is laid out as:
 *
 *	av[0]  "postgres"
 *	av[1]  "-forkbuf" or "-forkcol", selecting the process type
 *	av[2]  NULL placeholder, filled in by postmaster_forkexec
 *	av[3]  postgres_exec_path
 *
 * pgstat_parseArgs() relies on this exact layout.
 *
 * Returns whatever postmaster_forkexec() returns.  pgstat_start() treats
 * -1 as failure and any other value as the child's PID.
 */
static pid_t
pgstat_forkexec(STATS_PROCESS_TYPE procType)
{
	char	   *av[10];
	int			ac = 0;

	av[ac++] = "postgres";

	switch (procType)
	{
		case STAT_PROC_BUFFER:
			av[ac++] = "-forkbuf";
			break;

		case STAT_PROC_COLLECTOR:
			av[ac++] = "-forkcol";
			break;

		default:
			Assert(false);
	}

	av[ac++] = NULL;			/* filled in by postmaster_forkexec */

	/* postgres_exec_path is not passed by write_backend_variables */
	av[ac++] = postgres_exec_path;

	av[ac] = NULL;
	Assert(ac < lengthof(av));

	return postmaster_forkexec(ac, av);
}


/*
 * pgstat_parseArgs() -
 *
 * Extract data from the arglist for exec'ed statistics (buffer and
 * collector) processes.  Expects exactly the four arguments built by
 * pgstat_forkexec(); argv[3] carries postgres_exec_path.
 */
static void
pgstat_parseArgs(int argc, char *argv[])
{
	Assert(argc == 4);

	StrNCpy(postgres_exec_path, argv[3], MAXPGPATH);
}
#endif   /* EXEC_BACKEND */
538
539
540 /* ----------
541  * pgstat_start() -
542  *
543  *      Called from postmaster at startup or after an existing collector
544  *      died.  Attempt to fire up a fresh statistics collector.
545  *
546  *      Returns PID of child process, or 0 if fail.
547  *
548  *      Note: if fail, we will be called again from the postmaster main loop.
549  * ----------
550  */
551 int
552 pgstat_start(void)
553 {
554         time_t          curtime;
555         pid_t           pgStatPid;
556
557         /*
558          * Do nothing if no collector needed
559          */
560         if (!pgstat_collect_startcollector)
561                 return 0;
562
563         /*
564          * Do nothing if too soon since last collector start.  This is a safety
565          * valve to protect against continuous respawn attempts if the collector
566          * is dying immediately at launch.      Note that since we will be re-called
567          * from the postmaster main loop, we will get another chance later.
568          */
569         curtime = time(NULL);
570         if ((unsigned int) (curtime - last_pgstat_start_time) <
571                 (unsigned int) PGSTAT_RESTART_INTERVAL)
572                 return 0;
573         last_pgstat_start_time = curtime;
574
575         /*
576          * Check that the socket is there, else pgstat_init failed.
577          */
578         if (pgStatSock < 0)
579         {
580                 ereport(LOG,
581                                 (errmsg("statistics collector startup skipped")));
582
583                 /*
584                  * We can only get here if someone tries to manually turn
585                  * pgstat_collect_startcollector on after it had been off.
586                  */
587                 pgstat_collect_startcollector = false;
588                 return 0;
589         }
590
591         /*
592          * Okay, fork off the collector.
593          */
594 #ifdef EXEC_BACKEND
595         switch ((pgStatPid = pgstat_forkexec(STAT_PROC_BUFFER)))
596 #else
597         switch ((pgStatPid = fork_process()))
598 #endif
599         {
600                 case -1:
601                         ereport(LOG,
602                                         (errmsg("could not fork statistics buffer: %m")));
603                         return 0;
604
605 #ifndef EXEC_BACKEND
606                 case 0:
607                         /* in postmaster child ... */
608                         /* Close the postmaster's sockets */
609                         ClosePostmasterPorts(false);
610
611                         /* Drop our connection to postmaster's shared memory, as well */
612                         PGSharedMemoryDetach();
613
614                         PgstatBufferMain(0, NULL);
615                         break;
616 #endif
617
618                 default:
619                         return (int) pgStatPid;
620         }
621
622         /* shouldn't get here */
623         return 0;
624 }
625
626
627 /* ----------
628  * pgstat_beterm() -
629  *
630  *      Called from postmaster to tell collector a backend terminated.
631  * ----------
632  */
633 void
634 pgstat_beterm(int pid)
635 {
636         PgStat_MsgBeterm msg;
637
638         if (pgStatSock < 0)
639                 return;
640
641         /* can't use pgstat_setheader() because it's not called in a backend */
642         MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
643         msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
644         msg.m_hdr.m_procpid = pid;
645
646         pgstat_send(&msg, sizeof(msg));
647 }
648
649
650 /* ----------
651  * pgstat_report_autovac() -
652  *
653  *      Called from autovacuum.c to report startup of an autovacuum process.
654  *      We are called before InitPostgres is done, so can't rely on MyDatabaseId;
655  *      the db OID must be passed in, instead.
656  * ----------
657  */
658 void
659 pgstat_report_autovac(Oid dboid)
660 {
661         PgStat_MsgAutovacStart msg;
662
663         if (pgStatSock < 0)
664                 return;
665
666         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);
667         msg.m_databaseid = dboid;
668         msg.m_start_time = GetCurrentTimestamp();
669
670         pgstat_send(&msg, sizeof(msg));
671 }
672
673 /* ------------------------------------------------------------
674  * Public functions used by backends follow
675  *------------------------------------------------------------
676  */
677
678
679 /* ----------
680  * pgstat_bestart() -
681  *
682  *      Tell the collector that this new backend is soon ready to process
683  *      queries. Called from InitPostgres.
684  * ----------
685  */
686 void
687 pgstat_bestart(void)
688 {
689         PgStat_MsgBestart msg;
690
691         if (pgStatSock < 0)
692                 return;
693
694         /*
695          * We may not have a MyProcPort (eg, if this is the autovacuum process).
696          * For the moment, punt and don't send BESTART --- would be better to work
697          * out a clean way of handling "unknown clientaddr".
698          */
699         if (MyProcPort)
700         {
701                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
702                 msg.m_databaseid = MyDatabaseId;
703                 msg.m_userid = GetSessionUserId();
704                 memcpy(&msg.m_clientaddr, &MyProcPort->raddr, sizeof(msg.m_clientaddr));
705                 pgstat_send(&msg, sizeof(msg));
706         }
707
708         /*
709          * Set up a process-exit hook to ensure we flush the last batch of
710          * statistics to the collector.
711          */
712         on_shmem_exit(pgstat_beshutdown_hook, 0);
713 }
714
715 /* ---------
716  * pgstat_report_vacuum() -
717  *
718  *      Tell the collector about the table we just vacuumed.
719  * ---------
720  */
721 void
722 pgstat_report_vacuum(Oid tableoid, bool shared,
723                                          bool analyze, PgStat_Counter tuples)
724 {
725         PgStat_MsgVacuum msg;
726
727         if (pgStatSock < 0)
728                 return;
729
730         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);
731         msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
732         msg.m_tableoid = tableoid;
733         msg.m_analyze = analyze;
734         msg.m_tuples = tuples;
735         pgstat_send(&msg, sizeof(msg));
736 }
737
738 /* --------
739  * pgstat_report_analyze() -
740  *
741  *      Tell the collector about the table we just analyzed.
742  * --------
743  */
744 void
745 pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples,
746                                           PgStat_Counter deadtuples)
747 {
748         PgStat_MsgAnalyze msg;
749
750         if (pgStatSock < 0)
751                 return;
752
753         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
754         msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
755         msg.m_tableoid = tableoid;
756         msg.m_live_tuples = livetuples;
757         msg.m_dead_tuples = deadtuples;
758         pgstat_send(&msg, sizeof(msg));
759 }
760
761 /*
762  * Flush any remaining statistics counts out to the collector at process
763  * exit.   Without this, operations triggered during backend exit (such as
764  * temp table deletions) won't be counted.
765  */
766 static void
767 pgstat_beshutdown_hook(int code, Datum arg)
768 {
769         pgstat_report_tabstat();
770 }
771
772
773 /* ----------
774  * pgstat_report_activity() -
775  *
776  *      Called from tcop/postgres.c to tell the collector what the backend
777  *      is actually doing (usually "<IDLE>" or the start of the query to
778  *      be executed).
779  * ----------
780  */
781 void
782 pgstat_report_activity(const char *cmd_str)
783 {
784         PgStat_MsgActivity msg;
785         int                     len;
786
787         if (!pgstat_collect_querystring || pgStatSock < 0)
788                 return;
789
790         len = strlen(cmd_str);
791         len = pg_mbcliplen(cmd_str, len, PGSTAT_ACTIVITY_SIZE - 1);
792
793         memcpy(msg.m_cmd_str, cmd_str, len);
794         msg.m_cmd_str[len] = '\0';
795         len += offsetof(PgStat_MsgActivity, m_cmd_str) + 1;
796
797         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
798         pgstat_send(&msg, len);
799 }
800
801
802 /* ----------
803  * pgstat_report_tabstat() -
804  *
805  *      Called from tcop/postgres.c to send the so far collected
806  *      per table access statistics to the collector.
807  * ----------
808  */
809 void
810 pgstat_report_tabstat(void)
811 {
812         int                     i;
813
814         if (pgStatSock < 0 ||
815                 (!pgstat_collect_querystring &&
816                  !pgstat_collect_tuplelevel &&
817                  !pgstat_collect_blocklevel))
818         {
819                 /* Not reporting stats, so just flush whatever we have */
820                 RegularTabStat.tsa_used = 0;
821                 SharedTabStat.tsa_used = 0;
822                 return;
823         }
824
825         /*
826          * For each message buffer used during the last query set the header
827          * fields and send it out.
828          */
829         for (i = 0; i < RegularTabStat.tsa_used; i++)
830         {
831                 PgStat_MsgTabstat *tsmsg = RegularTabStat.tsa_messages[i];
832                 int                     n;
833                 int                     len;
834
835                 n = tsmsg->m_nentries;
836                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
837                         n * sizeof(PgStat_TableEntry);
838
839                 tsmsg->m_xact_commit = pgStatXactCommit;
840                 tsmsg->m_xact_rollback = pgStatXactRollback;
841                 pgStatXactCommit = 0;
842                 pgStatXactRollback = 0;
843
844                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
845                 tsmsg->m_databaseid = MyDatabaseId;
846                 pgstat_send(tsmsg, len);
847         }
848         RegularTabStat.tsa_used = 0;
849
850         /* Ditto, for shared relations */
851         for (i = 0; i < SharedTabStat.tsa_used; i++)
852         {
853                 PgStat_MsgTabstat *tsmsg = SharedTabStat.tsa_messages[i];
854                 int                     n;
855                 int                     len;
856
857                 n = tsmsg->m_nentries;
858                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
859                         n * sizeof(PgStat_TableEntry);
860
861                 /* We don't report transaction commit/abort here */
862                 tsmsg->m_xact_commit = 0;
863                 tsmsg->m_xact_rollback = 0;
864
865                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
866                 tsmsg->m_databaseid = InvalidOid;
867                 pgstat_send(tsmsg, len);
868         }
869         SharedTabStat.tsa_used = 0;
870 }
871
872
873 /* ----------
874  * pgstat_vacuum_tabstat() -
875  *
876  *      Will tell the collector about objects he can get rid of.
877  * ----------
878  */
879 int
880 pgstat_vacuum_tabstat(void)
881 {
882         Relation        dbrel;
883         HeapScanDesc dbscan;
884         HeapTuple       dbtup;
885         Oid                *dbidlist;
886         int                     dbidalloc;
887         int                     dbidused;
888         HASH_SEQ_STATUS hstat;
889         PgStat_StatDBEntry *dbentry;
890         PgStat_StatTabEntry *tabentry;
891         HeapTuple       reltup;
892         int                     nobjects = 0;
893         PgStat_MsgTabpurge msg;
894         int                     len;
895         int                     i;
896
897         if (pgStatSock < 0)
898                 return 0;
899
900         /*
901          * If not done for this transaction, read the statistics collector stats
902          * file into some hash tables.
903          */
904         backend_read_statsfile();
905
906         /*
907          * Lookup our own database entry; if not found, nothing to do.
908          */
909         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
910                                                                                                  (void *) &MyDatabaseId,
911                                                                                                  HASH_FIND, NULL);
912         if (dbentry == NULL)
913                 return -1;
914         if (dbentry->tables == NULL)
915                 return 0;
916
917         /*
918          * Initialize our messages table counter to zero
919          */
920         msg.m_nentries = 0;
921
922         /*
923          * Check for all tables listed in stats hashtable if they still exist.
924          */
925         hash_seq_init(&hstat, dbentry->tables);
926         while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
927         {
928                 /*
929                  * Check if this relation is still alive by looking up it's pg_class
930                  * tuple in the system catalog cache.
931                  */
932                 reltup = SearchSysCache(RELOID,
933                                                                 ObjectIdGetDatum(tabentry->tableid),
934                                                                 0, 0, 0);
935                 if (HeapTupleIsValid(reltup))
936                 {
937                         ReleaseSysCache(reltup);
938                         continue;
939                 }
940
941                 /*
942                  * Add this table's Oid to the message
943                  */
944                 msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
945                 nobjects++;
946
947                 /*
948                  * If the message is full, send it out and reinitialize to zero
949                  */
950                 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
951                 {
952                         len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
953                                 +msg.m_nentries * sizeof(Oid);
954
955                         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
956                         msg.m_databaseid = MyDatabaseId;
957                         pgstat_send(&msg, len);
958
959                         msg.m_nentries = 0;
960                 }
961         }
962
963         /*
964          * Send the rest
965          */
966         if (msg.m_nentries > 0)
967         {
968                 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
969                         +msg.m_nentries * sizeof(Oid);
970
971                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
972                 msg.m_databaseid = MyDatabaseId;
973                 pgstat_send(&msg, len);
974         }
975
976         /*
977          * Read pg_database and remember the Oid's of all existing databases
978          */
979         dbidalloc = 256;
980         dbidused = 0;
981         dbidlist = (Oid *) palloc(sizeof(Oid) * dbidalloc);
982
983         dbrel = heap_open(DatabaseRelationId, AccessShareLock);
984         dbscan = heap_beginscan(dbrel, SnapshotNow, 0, NULL);
985         while ((dbtup = heap_getnext(dbscan, ForwardScanDirection)) != NULL)
986         {
987                 if (dbidused >= dbidalloc)
988                 {
989                         dbidalloc *= 2;
990                         dbidlist = (Oid *) repalloc((char *) dbidlist,
991                                                                                 sizeof(Oid) * dbidalloc);
992                 }
993                 dbidlist[dbidused++] = HeapTupleGetOid(dbtup);
994         }
995         heap_endscan(dbscan);
996         heap_close(dbrel, AccessShareLock);
997
998         /*
999          * Search the database hash table for dead databases and tell the
1000          * collector to drop them as well.
1001          */
1002         hash_seq_init(&hstat, pgStatDBHash);
1003         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
1004         {
1005                 Oid                     dbid = dbentry->databaseid;
1006
1007                 for (i = 0; i < dbidused; i++)
1008                 {
1009                         if (dbidlist[i] == dbid)
1010                         {
1011                                 dbid = InvalidOid;
1012                                 break;
1013                         }
1014                 }
1015
1016                 if (dbid != InvalidOid)
1017                 {
1018                         pgstat_drop_database(dbid);
1019                         nobjects++;
1020                 }
1021         }
1022
1023         /*
1024          * Free the dbid list.
1025          */
1026         pfree(dbidlist);
1027
1028         /*
1029          * Tell the caller how many removeable objects we found
1030          */
1031         return nobjects;
1032 }
1033
1034
1035 /* ----------
1036  * pgstat_drop_database() -
1037  *
1038  *      Tell the collector that we just dropped a database.
1039  *      This is the only message that shouldn't get lost in space. Otherwise
1040  *      the collector will keep the statistics for the dead DB until his
1041  *      stats file got removed while the postmaster is down.
1042  * ----------
1043  */
1044 static void
1045 pgstat_drop_database(Oid databaseid)
1046 {
1047         PgStat_MsgDropdb msg;
1048
1049         if (pgStatSock < 0)
1050                 return;
1051
1052         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
1053         msg.m_databaseid = databaseid;
1054         pgstat_send(&msg, sizeof(msg));
1055 }
1056
1057
1058 /* ----------
1059  * pgstat_reset_counters() -
1060  *
1061  *      Tell the statistics collector to reset counters for our database.
1062  * ----------
1063  */
1064 void
1065 pgstat_reset_counters(void)
1066 {
1067         PgStat_MsgResetcounter msg;
1068
1069         if (pgStatSock < 0)
1070                 return;
1071
1072         if (!superuser())
1073                 ereport(ERROR,
1074                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1075                                  errmsg("must be superuser to reset statistics counters")));
1076
1077         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
1078         msg.m_databaseid = MyDatabaseId;
1079         pgstat_send(&msg, sizeof(msg));
1080 }
1081
1082
1083 /* ----------
1084  * pgstat_ping() -
1085  *
1086  *      Send some junk data to the collector to increase traffic.
1087  * ----------
1088  */
1089 void
1090 pgstat_ping(void)
1091 {
1092         PgStat_MsgDummy msg;
1093
1094         if (pgStatSock < 0)
1095                 return;
1096
1097         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
1098         pgstat_send(&msg, sizeof(msg));
1099 }
1100
1101 /*
1102  * Enlarge a TabStatArray
1103  */
1104 static void
1105 more_tabstat_space(TabStatArray *tsarr)
1106 {
1107         PgStat_MsgTabstat *newMessages;
1108         PgStat_MsgTabstat **msgArray;
1109         int                     newAlloc;
1110         int                     i;
1111
1112         AssertArg(PointerIsValid(tsarr));
1113
1114         newAlloc = tsarr->tsa_alloc + TABSTAT_QUANTUM;
1115
1116         /* Create (another) quantum of message buffers */
1117         newMessages = (PgStat_MsgTabstat *)
1118                 MemoryContextAllocZero(TopMemoryContext,
1119                                                            sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1120
1121         /* Create or enlarge the pointer array */
1122         if (tsarr->tsa_messages == NULL)
1123                 msgArray = (PgStat_MsgTabstat **)
1124                         MemoryContextAlloc(TopMemoryContext,
1125                                                            sizeof(PgStat_MsgTabstat *) * newAlloc);
1126         else
1127                 msgArray = (PgStat_MsgTabstat **)
1128                         repalloc(tsarr->tsa_messages,
1129                                          sizeof(PgStat_MsgTabstat *) * newAlloc);
1130
1131         for (i = 0; i < TABSTAT_QUANTUM; i++)
1132                 msgArray[tsarr->tsa_alloc + i] = newMessages++;
1133         tsarr->tsa_messages = msgArray;
1134         tsarr->tsa_alloc = newAlloc;
1135
1136         Assert(tsarr->tsa_used < tsarr->tsa_alloc);
1137 }
1138
1139 /* ----------
1140  * pgstat_initstats() -
1141  *
1142  *      Called from various places usually dealing with initialization
1143  *      of Relation or Scan structures. The data placed into these
1144  *      structures from here tell where later to count for buffer reads,
1145  *      scans and tuples fetched.
1146  * ----------
1147  */
1148 void
1149 pgstat_initstats(PgStat_Info *stats, Relation rel)
1150 {
1151         Oid                     rel_id = rel->rd_id;
1152         PgStat_TableEntry *useent;
1153         TabStatArray *tsarr;
1154         PgStat_MsgTabstat *tsmsg;
1155         int                     mb;
1156         int                     i;
1157
1158         /*
1159          * Initialize data not to count at all.
1160          */
1161         stats->tabentry = NULL;
1162
1163         if (pgStatSock < 0 ||
1164                 !(pgstat_collect_tuplelevel ||
1165                   pgstat_collect_blocklevel))
1166                 return;
1167
1168         tsarr = rel->rd_rel->relisshared ? &SharedTabStat : &RegularTabStat;
1169
1170         /*
1171          * Search the already-used message slots for this relation.
1172          */
1173         for (mb = 0; mb < tsarr->tsa_used; mb++)
1174         {
1175                 tsmsg = tsarr->tsa_messages[mb];
1176
1177                 for (i = tsmsg->m_nentries; --i >= 0;)
1178                 {
1179                         if (tsmsg->m_entry[i].t_id == rel_id)
1180                         {
1181                                 stats->tabentry = (void *) &(tsmsg->m_entry[i]);
1182                                 return;
1183                         }
1184                 }
1185
1186                 if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
1187                         continue;
1188
1189                 /*
1190                  * Not found, but found a message buffer with an empty slot instead.
1191                  * Fine, let's use this one.
1192                  */
1193                 i = tsmsg->m_nentries++;
1194                 useent = &tsmsg->m_entry[i];
1195                 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1196                 useent->t_id = rel_id;
1197                 stats->tabentry = (void *) useent;
1198                 return;
1199         }
1200
1201         /*
1202          * If we ran out of message buffers, we just allocate more.
1203          */
1204         if (tsarr->tsa_used >= tsarr->tsa_alloc)
1205                 more_tabstat_space(tsarr);
1206
1207         /*
1208          * Use the first entry of the next message buffer.
1209          */
1210         mb = tsarr->tsa_used++;
1211         tsmsg = tsarr->tsa_messages[mb];
1212         tsmsg->m_nentries = 1;
1213         useent = &tsmsg->m_entry[0];
1214         MemSet(useent, 0, sizeof(PgStat_TableEntry));
1215         useent->t_id = rel_id;
1216         stats->tabentry = (void *) useent;
1217 }
1218
1219
1220 /* ----------
1221  * pgstat_count_xact_commit() -
1222  *
1223  *      Called from access/transam/xact.c to count transaction commits.
1224  * ----------
1225  */
1226 void
1227 pgstat_count_xact_commit(void)
1228 {
1229         if      (!pgstat_collect_querystring &&
1230                  !pgstat_collect_tuplelevel &&
1231                  !pgstat_collect_blocklevel)
1232                 return;
1233
1234         pgStatXactCommit++;
1235
1236         /*
1237          * If there was no relation activity yet, just make one existing message
1238          * buffer used without slots, causing the next report to tell new
1239          * xact-counters.
1240          */
1241         if (RegularTabStat.tsa_alloc == 0)
1242                 more_tabstat_space(&RegularTabStat);
1243
1244         if (RegularTabStat.tsa_used == 0)
1245         {
1246                 RegularTabStat.tsa_used++;
1247                 RegularTabStat.tsa_messages[0]->m_nentries = 0;
1248         }
1249 }
1250
1251
1252 /* ----------
1253  * pgstat_count_xact_rollback() -
1254  *
1255  *      Called from access/transam/xact.c to count transaction rollbacks.
1256  * ----------
1257  */
1258 void
1259 pgstat_count_xact_rollback(void)
1260 {
1261         if      (!pgstat_collect_querystring &&
1262                  !pgstat_collect_tuplelevel &&
1263                  !pgstat_collect_blocklevel)
1264                 return;
1265
1266         pgStatXactRollback++;
1267
1268         /*
1269          * If there was no relation activity yet, just make one existing message
1270          * buffer used without slots, causing the next report to tell new
1271          * xact-counters.
1272          */
1273         if (RegularTabStat.tsa_alloc == 0)
1274                 more_tabstat_space(&RegularTabStat);
1275
1276         if (RegularTabStat.tsa_used == 0)
1277         {
1278                 RegularTabStat.tsa_used++;
1279                 RegularTabStat.tsa_messages[0]->m_nentries = 0;
1280         }
1281 }
1282
1283
1284 /* ----------
1285  * pgstat_fetch_stat_dbentry() -
1286  *
1287  *      Support function for the SQL-callable pgstat* functions. Returns
1288  *      the collected statistics for one database or NULL. NULL doesn't mean
1289  *      that the database doesn't exist, it is just not yet known by the
1290  *      collector, so the caller is better off to report ZERO instead.
1291  * ----------
1292  */
1293 PgStat_StatDBEntry *
1294 pgstat_fetch_stat_dbentry(Oid dbid)
1295 {
1296         /*
1297          * If not done for this transaction, read the statistics collector stats
1298          * file into some hash tables.
1299          */
1300         backend_read_statsfile();
1301
1302         /*
1303          * Lookup the requested database; return NULL if not found
1304          */
1305         return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1306                                                                                           (void *) &dbid,
1307                                                                                           HASH_FIND, NULL);
1308 }
1309
1310
1311 /* ----------
1312  * pgstat_fetch_stat_tabentry() -
1313  *
1314  *      Support function for the SQL-callable pgstat* functions. Returns
1315  *      the collected statistics for one table or NULL. NULL doesn't mean
1316  *      that the table doesn't exist, it is just not yet known by the
1317  *      collector, so the caller is better off to report ZERO instead.
1318  * ----------
1319  */
1320 PgStat_StatTabEntry *
1321 pgstat_fetch_stat_tabentry(Oid relid)
1322 {
1323         Oid                     dbid;
1324         PgStat_StatDBEntry *dbentry;
1325         PgStat_StatTabEntry *tabentry;
1326
1327         /*
1328          * If not done for this transaction, read the statistics collector stats
1329          * file into some hash tables.
1330          */
1331         backend_read_statsfile();
1332
1333         /*
1334          * Lookup our database, then look in its table hash table.
1335          */
1336         dbid = MyDatabaseId;
1337         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1338                                                                                                  (void *) &dbid,
1339                                                                                                  HASH_FIND, NULL);
1340         if (dbentry != NULL && dbentry->tables != NULL)
1341         {
1342                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1343                                                                                                            (void *) &relid,
1344                                                                                                            HASH_FIND, NULL);
1345                 if (tabentry)
1346                         return tabentry;
1347         }
1348
1349         /*
1350          * If we didn't find it, maybe it's a shared table.
1351          */
1352         dbid = InvalidOid;
1353         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1354                                                                                                  (void *) &dbid,
1355                                                                                                  HASH_FIND, NULL);
1356         if (dbentry != NULL && dbentry->tables != NULL)
1357         {
1358                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1359                                                                                                            (void *) &relid,
1360                                                                                                            HASH_FIND, NULL);
1361                 if (tabentry)
1362                         return tabentry;
1363         }
1364
1365         return NULL;
1366 }
1367
1368
1369 /* ----------
1370  * pgstat_fetch_stat_beentry() -
1371  *
1372  *      Support function for the SQL-callable pgstat* functions. Returns
1373  *      the actual activity slot of one active backend. The caller is
1374  *      responsible for a check if the actual user is permitted to see
1375  *      that info (especially the querystring).
1376  * ----------
1377  */
1378 PgStat_StatBeEntry *
1379 pgstat_fetch_stat_beentry(int beid)
1380 {
1381         backend_read_statsfile();
1382
1383         if (beid < 1 || beid > pgStatNumBackends)
1384                 return NULL;
1385
1386         return &pgStatBeTable[beid - 1];
1387 }
1388
1389
1390 /* ----------
1391  * pgstat_fetch_stat_numbackends() -
1392  *
1393  *      Support function for the SQL-callable pgstat* functions. Returns
1394  *      the maximum current backend id.
1395  * ----------
1396  */
1397 int
1398 pgstat_fetch_stat_numbackends(void)
1399 {
1400         backend_read_statsfile();
1401
1402         return pgStatNumBackends;
1403 }
1404
1405
1406
1407 /* ------------------------------------------------------------
1408  * Local support functions follow
1409  * ------------------------------------------------------------
1410  */
1411
1412
1413 /* ----------
1414  * pgstat_setheader() -
1415  *
1416  *              Set common header fields in a statistics message
1417  * ----------
1418  */
1419 static void
1420 pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
1421 {
1422         hdr->m_type = mtype;
1423         hdr->m_backendid = MyBackendId;
1424         hdr->m_procpid = MyProcPid;
1425 }
1426
1427
1428 /* ----------
1429  * pgstat_send() -
1430  *
1431  *              Send out one statistics message to the collector
1432  * ----------
1433  */
1434 static void
1435 pgstat_send(void *msg, int len)
1436 {
1437         if (pgStatSock < 0)
1438                 return;
1439
1440         ((PgStat_MsgHdr *) msg)->m_size = len;
1441
1442 #ifdef USE_ASSERT_CHECKING
1443         if (send(pgStatSock, msg, len, 0) < 0)
1444                 elog(LOG, "could not send to statistics collector: %m");
1445 #else
1446         send(pgStatSock, msg, len, 0);
1447         /* We deliberately ignore any error from send() */
1448 #endif
1449 }
1450
1451
1452 /* ----------
1453  * PgstatBufferMain() -
1454  *
1455  *      Start up the statistics buffer process.  This is the body of the
1456  *      postmaster child process.
1457  *
1458  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1459  * ----------
1460  */
1461 NON_EXEC_STATIC void
1462 PgstatBufferMain(int argc, char *argv[])
1463 {
1464         IsUnderPostmaster = true;       /* we are a postmaster subprocess now */
1465
1466         MyProcPid = getpid();           /* reset MyProcPid */
1467
1468         /* Lose the postmaster's on-exit routines */
1469         on_exit_reset();
1470
1471         /*
1472          * Ignore all signals usually bound to some action in the postmaster,
1473          * except for SIGCHLD and SIGQUIT --- see pgstat_recvbuffer.
1474          */
1475         pqsignal(SIGHUP, SIG_IGN);
1476         pqsignal(SIGINT, SIG_IGN);
1477         pqsignal(SIGTERM, SIG_IGN);
1478         pqsignal(SIGQUIT, pgstat_exit);
1479         pqsignal(SIGALRM, SIG_IGN);
1480         pqsignal(SIGPIPE, SIG_IGN);
1481         pqsignal(SIGUSR1, SIG_IGN);
1482         pqsignal(SIGUSR2, SIG_IGN);
1483         pqsignal(SIGCHLD, pgstat_die);
1484         pqsignal(SIGTTIN, SIG_DFL);
1485         pqsignal(SIGTTOU, SIG_DFL);
1486         pqsignal(SIGCONT, SIG_DFL);
1487         pqsignal(SIGWINCH, SIG_DFL);
1488         /* unblock will happen in pgstat_recvbuffer */
1489
1490 #ifdef EXEC_BACKEND
1491         pgstat_parseArgs(argc, argv);
1492 #endif
1493
1494         /*
1495          * Start a buffering process to read from the socket, so we have a little
1496          * more time to process incoming messages.
1497          *
1498          * NOTE: the process structure is: postmaster is parent of buffer process
1499          * is parent of collector process.      This way, the buffer can detect
1500          * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
1501          * collector failure until it tried to write on the pipe.  That would mean
1502          * that after the postmaster started a new collector, we'd have two buffer
1503          * processes competing to read from the UDP socket --- not good.
1504          */
1505         if (pgpipe(pgStatPipe) < 0)
1506                 ereport(ERROR,
1507                                 (errcode_for_socket_access(),
1508                                  errmsg("could not create pipe for statistics buffer: %m")));
1509
1510         /* child becomes collector process */
1511 #ifdef EXEC_BACKEND
1512         pgStatCollectorPid = pgstat_forkexec(STAT_PROC_COLLECTOR);
1513 #else
1514         pgStatCollectorPid = fork();
1515 #endif
1516         switch (pgStatCollectorPid)
1517         {
1518                 case -1:
1519                         ereport(ERROR,
1520                                         (errmsg("could not fork statistics collector: %m")));
1521
1522 #ifndef EXEC_BACKEND
1523                 case 0:
1524                         /* child becomes collector process */
1525                         PgstatCollectorMain(0, NULL);
1526                         break;
1527 #endif
1528
1529                 default:
1530                         /* parent becomes buffer process */
1531                         closesocket(pgStatPipe[0]);
1532                         pgstat_recvbuffer();
1533         }
1534         exit(0);
1535 }
1536
1537
1538 /* ----------
1539  * PgstatCollectorMain() -
1540  *
1541  *      Start up the statistics collector itself.  This is the body of the
1542  *      postmaster grandchild process.
1543  *
1544  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1545  * ----------
1546  */
1547 NON_EXEC_STATIC void
1548 PgstatCollectorMain(int argc, char *argv[])
1549 {
1550         PgStat_Msg      msg;
1551         fd_set          rfds;
1552         int                     readPipe;
1553         int                     len = 0;
1554         struct itimerval timeval;
1555         HASHCTL         hash_ctl;
1556         bool            need_timer = false;
1557         
1558         MyProcPid = getpid();           /* reset MyProcPid */
1559
1560         /*
1561          * Reset signal handling.  With the exception of restoring default SIGCHLD
1562          * and SIGQUIT handling, this is a no-op in the non-EXEC_BACKEND case
1563          * because we'll have inherited these settings from the buffer process;
1564          * but it's not a no-op for EXEC_BACKEND.
1565          */
1566         pqsignal(SIGHUP, SIG_IGN);
1567         pqsignal(SIGINT, SIG_IGN);
1568         pqsignal(SIGTERM, SIG_IGN);
1569 #ifndef WIN32
1570         pqsignal(SIGQUIT, SIG_IGN);
1571 #else
1572         /* kluge to allow buffer process to kill collector; FIXME */
1573         pqsignal(SIGQUIT, pgstat_exit);
1574 #endif
1575         pqsignal(SIGALRM, force_statwrite);
1576         pqsignal(SIGPIPE, SIG_IGN);
1577         pqsignal(SIGUSR1, SIG_IGN);
1578         pqsignal(SIGUSR2, SIG_IGN);
1579         pqsignal(SIGCHLD, SIG_DFL);
1580         pqsignal(SIGTTIN, SIG_DFL);
1581         pqsignal(SIGTTOU, SIG_DFL);
1582         pqsignal(SIGCONT, SIG_DFL);
1583         pqsignal(SIGWINCH, SIG_DFL);
1584         PG_SETMASK(&UnBlockSig);
1585
1586 #ifdef EXEC_BACKEND
1587         pgstat_parseArgs(argc, argv);
1588 #endif
1589
1590         /* Close unwanted files */
1591         closesocket(pgStatPipe[1]);
1592         closesocket(pgStatSock);
1593
1594         /*
1595          * Identify myself via ps
1596          */
1597         init_ps_display("stats collector process", "", "");
1598         set_ps_display("");
1599
1600         need_statwrite = true;
1601
1602         MemSet(&timeval, 0, sizeof(struct itimerval));
1603         timeval.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000;
1604         timeval.it_value.tv_usec = PGSTAT_STAT_INTERVAL % 1000;
1605
1606         /*
1607          * Read in an existing statistics stats file or initialize the stats to
1608          * zero.
1609          */
1610         pgStatRunningInCollector = true;
1611         pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);
1612
1613         /*
1614          * Create the dead backend hashtable
1615          */
1616         memset(&hash_ctl, 0, sizeof(hash_ctl));
1617         hash_ctl.keysize = sizeof(int);
1618         hash_ctl.entrysize = sizeof(PgStat_StatBeDead);
1619         hash_ctl.hash = tag_hash;
1620         pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE,
1621                                                            &hash_ctl, HASH_ELEM | HASH_FUNCTION);
1622
1623         /*
1624          * Create the known backends table
1625          */
1626         pgStatBeTable = (PgStat_StatBeEntry *)
1627                 palloc0(sizeof(PgStat_StatBeEntry) * MaxBackends);
1628
1629         readPipe = pgStatPipe[0];
1630
1631         /*
1632          * Process incoming messages and handle all the reporting stuff until
1633          * there are no more messages.
1634          */
1635         for (;;)
1636         {
1637                 if (need_statwrite)
1638                 {
1639                         pgstat_write_statsfile();
1640                         need_statwrite = false;
1641                         need_timer = true;
1642                 }
1643
1644                 /*
1645                  * Setup the descriptor set for select(2)
1646                  */
1647                 FD_ZERO(&rfds);
1648                 FD_SET(readPipe, &rfds);
1649
1650                 /*
1651                  * Now wait for something to do.
1652                  */
1653                 if (select(readPipe + 1, &rfds, NULL, NULL, NULL) < 0)
1654                 {
1655                         if (errno == EINTR)
1656                                 continue;
1657                         ereport(ERROR,
1658                                         (errcode_for_socket_access(),
1659                                          errmsg("select() failed in statistics collector: %m")));
1660                 }
1661
1662                 /*
1663                  * Check if there is a new statistics message to collect.
1664                  */
1665                 if (FD_ISSET(readPipe, &rfds))
1666                 {
1667                         /*
1668                          * We may need to issue multiple read calls in case the buffer
1669                          * process didn't write the message in a single write, which is
1670                          * possible since it dumps its buffer bytewise. In any case, we'd
1671                          * need two reads since we don't know the message length
1672                          * initially.
1673                          */
1674                         int                     nread = 0;
1675                         int                     targetlen = sizeof(PgStat_MsgHdr);              /* initial */
1676                         bool            pipeEOF = false;
1677
1678                         while (nread < targetlen)
1679                         {
1680                                 len = piperead(readPipe, ((char *) &msg) + nread,
1681                                                            targetlen - nread);
1682                                 if (len < 0)
1683                                 {
1684                                         if (errno == EINTR)
1685                                                 continue;
1686                                         ereport(ERROR,
1687                                                         (errcode_for_socket_access(),
1688                                                          errmsg("could not read from statistics collector pipe: %m")));
1689                                 }
1690                                 if (len == 0)   /* EOF on the pipe! */
1691                                 {
1692                                         pipeEOF = true;
1693                                         break;
1694                                 }
1695                                 nread += len;
1696                                 if (nread == sizeof(PgStat_MsgHdr))
1697                                 {
1698                                         /* we have the header, compute actual msg length */
1699                                         targetlen = msg.msg_hdr.m_size;
1700                                         if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
1701                                                 targetlen > (int) sizeof(msg))
1702                                         {
1703                                                 /*
1704                                                  * Bogus message length implies that we got out of
1705                                                  * sync with the buffer process somehow. Abort so that
1706                                                  * we can restart both processes.
1707                                                  */
1708                                                 ereport(ERROR,
1709                                                           (errmsg("invalid statistics message length")));
1710                                         }
1711                                 }
1712                         }
1713
1714                         /*
1715                          * EOF on the pipe implies that the buffer process exited. Fall
1716                          * out of outer loop.
1717                          */
1718                         if (pipeEOF)
1719                                 break;
1720
1721                         /*
1722                          * Distribute the message to the specific function handling it.
1723                          */
1724                         switch (msg.msg_hdr.m_type)
1725                         {
1726                                 case PGSTAT_MTYPE_DUMMY:
1727                                         break;
1728
1729                                 case PGSTAT_MTYPE_BESTART:
1730                                         pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1731                                         break;
1732
1733                                 case PGSTAT_MTYPE_BETERM:
1734                                         pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1735                                         break;
1736
1737                                 case PGSTAT_MTYPE_TABSTAT:
1738                                         pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1739                                         break;
1740
1741                                 case PGSTAT_MTYPE_TABPURGE:
1742                                         pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1743                                         break;
1744
1745                                 case PGSTAT_MTYPE_ACTIVITY:
1746                                         pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1747                                         break;
1748
1749                                 case PGSTAT_MTYPE_DROPDB:
1750                                         pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1751                                         break;
1752
1753                                 case PGSTAT_MTYPE_RESETCOUNTER:
1754                                         pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1755                                                                                          nread);
1756                                         break;
1757
1758                                 case PGSTAT_MTYPE_AUTOVAC_START:
1759                                         pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, nread);
1760                                         break;
1761
1762                                 case PGSTAT_MTYPE_VACUUM:
1763                                         pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, nread);
1764                                         break;
1765
1766                                 case PGSTAT_MTYPE_ANALYZE:
1767                                         pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, nread);
1768                                         break;
1769
1770                                 default:
1771                                         break;
1772                         }
1773
1774                         /*
1775                          * Globally count messages.
1776                          */
1777                         pgStatNumMessages++;
1778
1779                         if (need_timer)
1780                         {
1781                                 if (setitimer(ITIMER_REAL, &timeval, NULL))
1782                                         ereport(ERROR,
1783                                                   (errmsg("unable to set statistics collector timer: %m")));
1784                                 need_timer = false;
1785                         }
1786                 }
1787
1788                 /*
1789                  * Note that we do NOT check for postmaster exit inside the loop; only
1790                  * EOF on the buffer pipe causes us to fall out.  This ensures we
1791                  * don't exit prematurely if there are still a few messages in the
1792                  * buffer or pipe at postmaster shutdown.
1793                  */
1794         }
1795
1796         /*
1797          * Okay, we saw EOF on the buffer pipe, so there are no more messages to
1798          * process.  If the buffer process quit because of postmaster shutdown, we
1799          * want to save the final stats to reuse at next startup. But if the
1800          * buffer process failed, it seems best not to (there may even now be a
1801          * new collector firing up, and we don't want it to read a
1802          * partially-rewritten stats file).
1803          */
1804         if (!PostmasterIsAlive(false))
1805                 pgstat_write_statsfile();
1806 }
1807
1808
1809 static void
1810 force_statwrite(SIGNAL_ARGS)
1811 {
1812         need_statwrite = true;
1813 }
1814
1815
1816 /* ----------
1817  * pgstat_recvbuffer() -
1818  *
1819  *      This is the body of the separate buffering process. Its only
1820  *      purpose is to receive messages from the UDP socket as fast as
1821  *      possible and forward them over a pipe into the collector itself.
1822  *      If the collector is slow to absorb messages, they are buffered here.
1823  * ----------
1824  */
1825 static void
1826 pgstat_recvbuffer(void)
1827 {
1828         fd_set          rfds;
1829         fd_set          wfds;
1830         struct timeval timeout;
1831         int                     writePipe = pgStatPipe[1];
1832         int                     maxfd;
1833         int                     len;
1834         int                     xfr;
1835         int                     frm;
1836         PgStat_Msg      input_buffer;
1837         char       *msgbuffer;
1838         int                     msg_send = 0;   /* next send index in buffer */
1839         int                     msg_recv = 0;   /* next receive index */
1840         int                     msg_have = 0;   /* number of bytes stored */
1841         bool            overflow = false;
1842
1843         /*
1844          * Identify myself via ps
1845          */
1846         init_ps_display("stats buffer process", "", "");
1847         set_ps_display("");
1848
1849         /*
1850          * We want to die if our child collector process does.  There are two ways
1851          * we might notice that it has died: receive SIGCHLD, or get a write
1852          * failure on the pipe leading to the child.  We can set SIGPIPE to kill
1853          * us here.  Our SIGCHLD handler was already set up before we forked (must
1854          * do it that way, else it's a race condition).
1855          */
1856         pqsignal(SIGPIPE, SIG_DFL);
1857         PG_SETMASK(&UnBlockSig);
1858
1859         /*
1860          * Set the write pipe to nonblock mode, so that we cannot block when the
1861          * collector falls behind.
1862          */
1863         if (!pg_set_noblock(writePipe))
1864                 ereport(ERROR,
1865                                 (errcode_for_socket_access(),
1866                                  errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
1867
1868         /*
1869          * Allocate the message buffer
1870          */
1871         msgbuffer = (char *) palloc(PGSTAT_RECVBUFFERSZ);
1872
1873         /*
1874          * Loop forever
1875          */
1876         for (;;)
1877         {
1878                 FD_ZERO(&rfds);
1879                 FD_ZERO(&wfds);
1880                 maxfd = -1;
1881
1882                 /*
1883                  * As long as we have buffer space we add the socket to the read
1884                  * descriptor set.
1885                  */
1886                 if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
1887                 {
1888                         FD_SET(pgStatSock, &rfds);
1889                         maxfd = pgStatSock;
1890                         overflow = false;
1891                 }
1892                 else
1893                 {
1894                         if (!overflow)
1895                         {
1896                                 ereport(LOG,
1897                                                 (errmsg("statistics buffer is full")));
1898                                 overflow = true;
1899                         }
1900                 }
1901
1902                 /*
1903                  * If we have messages to write out, we add the pipe to the write
1904                  * descriptor set.
1905                  */
1906                 if (msg_have > 0)
1907                 {
1908                         FD_SET(writePipe, &wfds);
1909                         if (writePipe > maxfd)
1910                                 maxfd = writePipe;
1911                 }
1912
1913                 /*
1914                  * Wait for some work to do; but not for more than 10 seconds. (This
1915                  * determines how quickly we will shut down after an ungraceful
1916                  * postmaster termination; so it needn't be very fast.)  struct timeout
1917                  * is modified by some operating systems.
1918                  */
1919                 timeout.tv_sec = 10;
1920                 timeout.tv_usec = 0;
1921
1922                 if (select(maxfd + 1, &rfds, &wfds, NULL, &timeout) < 0)
1923                 {
1924                         if (errno == EINTR)
1925                                 continue;
1926                         ereport(ERROR,
1927                                         (errcode_for_socket_access(),
1928                                          errmsg("select() failed in statistics buffer: %m")));
1929                 }
1930
1931                 /*
1932                  * If there is a message on the socket, read it and check for
1933                  * validity.
1934                  */
1935                 if (FD_ISSET(pgStatSock, &rfds))
1936                 {
1937                         len = recv(pgStatSock, (char *) &input_buffer,
1938                                            sizeof(PgStat_Msg), 0);
1939                         if (len < 0)
1940                                 ereport(ERROR,
1941                                                 (errcode_for_socket_access(),
1942                                                  errmsg("could not read statistics message: %m")));
1943
1944                         /*
1945                          * We ignore messages that are smaller than our common header
1946                          */
1947                         if (len < sizeof(PgStat_MsgHdr))
1948                                 continue;
1949
1950                         /*
1951                          * The received length must match the length in the header
1952                          */
1953                         if (input_buffer.msg_hdr.m_size != len)
1954                                 continue;
1955
1956                         /*
1957                          * O.K. - we accept this message.  Copy it to the circular
1958                          * msgbuffer.
1959                          */
1960                         frm = 0;
1961                         while (len > 0)
1962                         {
1963                                 xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
1964                                 if (xfr > len)
1965                                         xfr = len;
1966                                 Assert(xfr > 0);
1967                                 memcpy(msgbuffer + msg_recv,
1968                                            ((char *) &input_buffer) + frm,
1969                                            xfr);
1970                                 msg_recv += xfr;
1971                                 if (msg_recv == PGSTAT_RECVBUFFERSZ)
1972                                         msg_recv = 0;
1973                                 msg_have += xfr;
1974                                 frm += xfr;
1975                                 len -= xfr;
1976                         }
1977                 }
1978
1979                 /*
1980                  * If the collector is ready to receive, write some data into his
1981                  * pipe.  We may or may not be able to write all that we have.
1982                  *
1983                  * NOTE: if what we have is less than PIPE_BUF bytes but more than the
1984                  * space available in the pipe buffer, most kernels will refuse to
1985                  * write any of it, and will return EAGAIN.  This means we will
1986                  * busy-loop until the situation changes (either because the collector
1987                  * caught up, or because more data arrives so that we have more than
1988                  * PIPE_BUF bytes buffered).  This is not good, but is there any way
1989                  * around it?  We have no way to tell when the collector has caught
1990                  * up...
1991                  */
1992                 if (FD_ISSET(writePipe, &wfds))
1993                 {
1994                         xfr = PGSTAT_RECVBUFFERSZ - msg_send;
1995                         if (xfr > msg_have)
1996                                 xfr = msg_have;
1997                         Assert(xfr > 0);
1998                         len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
1999                         if (len < 0)
2000                         {
2001                                 if (errno == EINTR || errno == EAGAIN)
2002                                         continue;       /* not enough space in pipe */
2003                                 ereport(ERROR,
2004                                                 (errcode_for_socket_access(),
2005                                 errmsg("could not write to statistics collector pipe: %m")));
2006                         }
2007                         /* NB: len < xfr is okay */
2008                         msg_send += len;
2009                         if (msg_send == PGSTAT_RECVBUFFERSZ)
2010                                 msg_send = 0;
2011                         msg_have -= len;
2012                 }
2013
2014                 /*
2015                  * Make sure we forwarded all messages before we check for postmaster
2016                  * termination.
2017                  */
2018                 if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
2019                         continue;
2020
2021                 /*
2022                  * If the postmaster has terminated, we die too.  (This is no longer
2023                  * the normal exit path, however.)
2024                  */
2025                 if (!PostmasterIsAlive(true))
2026                         exit(0);
2027         }
2028 }
2029
/*
 * pgstat_exit - SIGQUIT handler for the buffer process.
 *
 * Exits immediately with status 0 without trying to flush messages that
 * are still buffered: speed of exit wins over completeness here.
 *
 * On WIN32 the handler additionally forwards SIGQUIT to the collector (when
 * its PID is known), because on some broken win32 systems the collector
 * does not shut down by itself due to socket-inheritance issues.
 * XXX fixing the socket inheritance would make this unnecessary.
 */
static void
pgstat_exit(SIGNAL_ARGS)
{
#ifdef WIN32
	if (pgStatCollectorPid <= 0)
		exit(0);				/* no collector to take down with us */
	kill(pgStatCollectorPid, SIGQUIT);
#endif
	exit(0);
}
2051
/*
 * pgstat_die - SIGCHLD handler for the buffer process.
 *
 * The buffer process must not outlive its child collector, so any SIGCHLD
 * terminates it at once with exit status 1.
 */
static void
pgstat_die(SIGNAL_ARGS)
{
	/* Nonzero status: the collector went away underneath us. */
	exit(1);
}
2058
2059
2060 /* ----------
2061  * pgstat_add_backend() -
2062  *
2063  *      Support function to keep our backend list up to date.
2064  * ----------
2065  */
2066 static int
2067 pgstat_add_backend(PgStat_MsgHdr *msg)
2068 {
2069         PgStat_StatBeEntry *beentry;
2070         PgStat_StatBeDead *deadbe;
2071
2072         /*
2073          * Check that the backend ID is valid
2074          */
2075         if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
2076         {
2077                 ereport(LOG,
2078                                 (errmsg("invalid server process ID %d", msg->m_backendid)));
2079                 return -1;
2080         }
2081
2082         /*
2083          * Get the slot for this backendid.
2084          */
2085         beentry = &pgStatBeTable[msg->m_backendid - 1];
2086
2087         /*
2088          * If the slot contains the PID of this backend, everything is fine and we
2089          * have nothing to do. Note that all the slots are zero'd out when the
2090          * collector is started. We assume that a slot is "empty" iff procpid ==
2091          * 0.
2092          */
2093         if (beentry->procpid > 0 && beentry->procpid == msg->m_procpid)
2094                 return 0;
2095
2096         /*
2097          * Lookup if this backend is known to be dead. This can be caused due to
2098          * messages arriving in the wrong order - e.g. postmaster's BETERM message
2099          * might have arrived before we received all the backends stats messages,
2100          * or even a new backend with the same backendid was faster in sending his
2101          * BESTART.
2102          *
2103          * If the backend is known to be dead, we ignore this add.
2104          */
2105         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2106                                                                                            (void *) &(msg->m_procpid),
2107                                                                                            HASH_FIND, NULL);
2108         if (deadbe)
2109                 return 1;
2110
2111         /*
2112          * Backend isn't known to be dead. If it's slot is currently used, we have
2113          * to kick out the old backend.
2114          */
2115         if (beentry->procpid > 0)
2116                 pgstat_sub_backend(beentry->procpid);
2117
2118         /* Must be able to distinguish between empty and non-empty slots */
2119         Assert(msg->m_procpid > 0);
2120
2121         /* Put this new backend into the slot */
2122         beentry->procpid = msg->m_procpid;
2123         beentry->start_timestamp = GetCurrentTimestamp();
2124         beentry->activity_start_timestamp = 0;
2125         beentry->activity[0] = '\0';
2126
2127         /*
2128          * We can't initialize the rest of the data in this slot until we see the
2129          * BESTART message. Therefore, we set the database and user to sentinel
2130          * values, to indicate "undefined". There is no easy way to do this for
2131          * the client address, so make sure to check that the database or user are
2132          * defined before accessing the client address.
2133          */
2134         beentry->userid = InvalidOid;
2135         beentry->databaseid = InvalidOid;
2136
2137         return 0;
2138 }
2139
2140 /*
2141  * Lookup the hash table entry for the specified database. If no hash
2142  * table entry exists, initialize it, if the create parameter is true.
2143  * Else, return NULL.
2144  */
2145 static PgStat_StatDBEntry *
2146 pgstat_get_db_entry(Oid databaseid, bool create)
2147 {
2148         PgStat_StatDBEntry *result;
2149         bool            found;
2150         HASHACTION      action = (create ? HASH_ENTER : HASH_FIND);
2151
2152         /* Lookup or create the hash table entry for this database */
2153         result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2154                                                                                                 &databaseid,
2155                                                                                                 action, &found);
2156
2157         if (!create && !found)
2158                 return NULL;
2159
2160         /* If not found, initialize the new one. */
2161         if (!found)
2162         {
2163                 HASHCTL         hash_ctl;
2164
2165                 result->tables = NULL;
2166                 result->n_xact_commit = 0;
2167                 result->n_xact_rollback = 0;
2168                 result->n_blocks_fetched = 0;
2169                 result->n_blocks_hit = 0;
2170                 result->destroy = 0;
2171                 result->last_autovac_time = 0;
2172
2173                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2174                 hash_ctl.keysize = sizeof(Oid);
2175                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2176                 hash_ctl.hash = oid_hash;
2177                 result->tables = hash_create("Per-database table",
2178                                                                          PGSTAT_TAB_HASH_SIZE,
2179                                                                          &hash_ctl,
2180                                                                          HASH_ELEM | HASH_FUNCTION);
2181         }
2182
2183         return result;
2184 }
2185
2186 /* ----------
2187  * pgstat_sub_backend() -
2188  *
2189  *      Remove a backend from the actual backends list.
2190  * ----------
2191  */
2192 static void
2193 pgstat_sub_backend(int procpid)
2194 {
2195         int                     i;
2196         PgStat_StatBeDead *deadbe;
2197         bool            found;
2198
2199         /*
2200          * Search in the known-backends table for the slot containing this PID.
2201          */
2202         for (i = 0; i < MaxBackends; i++)
2203         {
2204                 if (pgStatBeTable[i].procpid == procpid)
2205                 {
2206                         /*
2207                          * That's him. Add an entry to the known to be dead backends. Due
2208                          * to possible misorder in the arrival of UDP packets it's
2209                          * possible that even if we know the backend is dead, there could
2210                          * still be messages queued that arrive later. Those messages must
2211                          * not cause our number of backends statistics to get screwed up,
2212                          * so we remember for a couple of seconds that this PID is dead
2213                          * and ignore them (only the counting of backends, not the table
2214                          * access stats they sent).
2215                          */
2216                         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2217                                                                                                            (void *) &procpid,
2218                                                                                                            HASH_ENTER,
2219                                                                                                            &found);
2220
2221                         if (!found)
2222                         {
2223                                 deadbe->backendid = i + 1;
2224                                 deadbe->destroy = PGSTAT_DESTROY_COUNT;
2225                         }
2226
2227                         /*
2228                          * Declare the backend slot empty.
2229                          */
2230                         pgStatBeTable[i].procpid = 0;
2231                         return;
2232                 }
2233         }
2234
2235         /*
2236          * No big problem if not found. This can happen if UDP messages arrive out
2237          * of order here.
2238          */
2239 }
2240
2241
2242 /* ----------
2243  * pgstat_write_statsfile() -
2244  *
2245  *      Tell the news.
2246  * ----------
2247  */
2248 static void
2249 pgstat_write_statsfile(void)
2250 {
2251         HASH_SEQ_STATUS hstat;
2252         HASH_SEQ_STATUS tstat;
2253         PgStat_StatDBEntry *dbentry;
2254         PgStat_StatTabEntry *tabentry;
2255         PgStat_StatBeDead *deadbe;
2256         FILE       *fpout;
2257         int                     i;
2258         int32           format_id;
2259
2260         /*
2261          * Open the statistics temp file to write out the current values.
2262          */
2263         fpout = fopen(PGSTAT_STAT_TMPFILE, PG_BINARY_W);
2264         if (fpout == NULL)
2265         {
2266                 ereport(LOG,
2267                                 (errcode_for_file_access(),
2268                                  errmsg("could not open temporary statistics file \"%s\": %m",
2269                                                 PGSTAT_STAT_TMPFILE)));
2270                 return;
2271         }
2272
2273         /*
2274          * Write the file header --- currently just a format ID.
2275          */
2276         format_id = PGSTAT_FILE_FORMAT_ID;
2277         fwrite(&format_id, sizeof(format_id), 1, fpout);
2278
2279         /*
2280          * Walk through the database table.
2281          */
2282         hash_seq_init(&hstat, pgStatDBHash);
2283         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2284         {
2285                 /*
2286                  * If this database is marked destroyed, count down and do so if it
2287                  * reaches 0.
2288                  */
2289                 if (dbentry->destroy > 0)
2290                 {
2291                         if (--(dbentry->destroy) == 0)
2292                         {
2293                                 if (dbentry->tables != NULL)
2294                                         hash_destroy(dbentry->tables);
2295
2296                                 if (hash_search(pgStatDBHash,
2297                                                                 (void *) &(dbentry->databaseid),
2298                                                                 HASH_REMOVE, NULL) == NULL)
2299                                         ereport(ERROR,
2300                                                         (errmsg("database hash table corrupted "
2301                                                                         "during cleanup --- abort")));
2302                         }
2303
2304                         /*
2305                          * Don't include statistics for it.
2306                          */
2307                         continue;
2308                 }
2309
2310                 /*
2311                  * Write out the DB line including the number of live backends.
2312                  */
2313                 fputc('D', fpout);
2314                 fwrite(dbentry, sizeof(PgStat_StatDBEntry), 1, fpout);
2315
2316                 /*
2317                  * Walk through the databases access stats per table.
2318                  */
2319                 hash_seq_init(&tstat, dbentry->tables);
2320                 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2321                 {
2322                         /*
2323                          * If table entry marked for destruction, same as above for the
2324                          * database entry.
2325                          */
2326                         if (tabentry->destroy > 0)
2327                         {
2328                                 if (--(tabentry->destroy) == 0)
2329                                 {
2330                                         if (hash_search(dbentry->tables,
2331                                                                         (void *) &(tabentry->tableid),
2332                                                                         HASH_REMOVE, NULL) == NULL)
2333                                         {
2334                                                 ereport(ERROR,
2335                                                                 (errmsg("tables hash table for "
2336                                                                                 "database %u corrupted during "
2337                                                                                 "cleanup --- abort",
2338                                                                                 dbentry->databaseid)));
2339                                         }
2340                                 }
2341                                 continue;
2342                         }
2343
2344                         /*
2345                          * At least we think this is still a live table. Print its access
2346                          * stats.
2347                          */
2348                         fputc('T', fpout);
2349                         fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2350                 }
2351
2352                 /*
2353                  * Mark the end of this DB
2354                  */
2355                 fputc('d', fpout);
2356         }
2357
2358         /*
2359          * Write out the known running backends to the stats file.
2360          */
2361         i = MaxBackends;
2362         fputc('M', fpout);
2363         fwrite(&i, sizeof(i), 1, fpout);
2364
2365         for (i = 0; i < MaxBackends; i++)
2366         {
2367                 if (pgStatBeTable[i].procpid > 0)
2368                 {
2369                         fputc('B', fpout);
2370                         fwrite(&pgStatBeTable[i], sizeof(PgStat_StatBeEntry), 1, fpout);
2371                 }
2372         }
2373
2374         /*
2375          * No more output to be done. Close the temp file and replace the old
2376          * pgstat.stat with it.
2377          */
2378         fputc('E', fpout);
2379         if (fclose(fpout) < 0)
2380         {
2381                 ereport(LOG,
2382                                 (errcode_for_file_access(),
2383                            errmsg("could not close temporary statistics file \"%s\": %m",
2384                                           PGSTAT_STAT_TMPFILE)));
2385         }
2386         else
2387         {
2388                 if (rename(PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME) < 0)
2389                 {
2390                         ereport(LOG,
2391                                         (errcode_for_file_access(),
2392                                          errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2393                                                         PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME)));
2394                 }
2395         }
2396
2397         /*
2398          * Clear out the dead backends table
2399          */
2400         hash_seq_init(&hstat, pgStatBeDead);
2401         while ((deadbe = (PgStat_StatBeDead *) hash_seq_search(&hstat)) != NULL)
2402         {
2403                 /*
2404                  * Count down the destroy delay and remove entries where it reaches 0.
2405                  */
2406                 if (--(deadbe->destroy) <= 0)
2407                 {
2408                         if (hash_search(pgStatBeDead,
2409                                                         (void *) &(deadbe->procpid),
2410                                                         HASH_REMOVE, NULL) == NULL)
2411                         {
2412                                 ereport(ERROR,
2413                                                 (errmsg("dead-server-process hash table corrupted "
2414                                                                 "during cleanup --- abort")));
2415                         }
2416                 }
2417         }
2418 }
2419
/*
 * qsort/bsearch comparison routine for PIDs
 *
 * Returns a negative, zero or positive value as *v1 is less than, equal to
 * or greater than *v2.  Uses comparisons rather than subtraction, so the
 * result cannot overflow for any pair of int values (no need to assume the
 * PIDs are nonnegative).
 */
static int
comparePids(const void *v1, const void *v2)
{
	int			pid1 = *((const int *) v1);
	int			pid2 = *((const int *) v2);

	return (pid1 > pid2) - (pid1 < pid2);
}
2430
2431 /* ----------
2432  * pgstat_read_statsfile() -
2433  *
2434  *      Reads in an existing statistics collector and initializes the
2435  *      databases' hash table (whose entries point to the tables' hash tables)
2436  *      and the current backend table.
2437  * ----------
2438  */
2439 static void
2440 pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
2441                                           PgStat_StatBeEntry **betab, int *numbackends)
2442 {
2443         PgStat_StatDBEntry *dbentry;
2444         PgStat_StatDBEntry dbbuf;
2445         PgStat_StatTabEntry *tabentry;
2446         PgStat_StatTabEntry tabbuf;
2447         PgStat_StatBeEntry *beentry;
2448         HASHCTL         hash_ctl;
2449         HTAB       *tabhash = NULL;
2450         FILE       *fpin;
2451         int32           format_id;
2452         int                     maxbackends = 0;
2453         int                     havebackends = 0;
2454         bool            found;
2455         int                *live_pids;
2456         MemoryContext use_mcxt;
2457         int                     mcxt_flags;
2458
2459         /*
2460          * If running in the collector or the autovacuum process, we use the
2461          * DynaHashCxt memory context.  If running in a backend, we use the
2462          * TopTransactionContext instead, so the caller must only know the last
2463          * XactId when this call happened to know if his tables are still valid or
2464          * already gone!
2465          *
2466          * Also, if running in a regular backend, we check backend entries against
2467          * the PGPROC array so that we can detect stale entries.  This lets us
2468          * discard entries whose BETERM message got lost for some reason.
2469          */
2470         if (pgStatRunningInCollector || IsAutoVacuumProcess())
2471         {
2472                 use_mcxt = NULL;
2473                 mcxt_flags = 0;
2474                 live_pids = NULL;
2475         }
2476         else
2477         {
2478                 use_mcxt = TopTransactionContext;
2479                 mcxt_flags = HASH_CONTEXT;
2480                 live_pids = GetAllBackendPids();
2481                 /* Sort the PID array so we can use bsearch */
2482                 if (live_pids[0] > 1)
2483                         qsort((void *) &live_pids[1], live_pids[0], sizeof(int),
2484                                   comparePids);
2485         }
2486
2487         /*
2488          * Create the DB hashtable
2489          */
2490         memset(&hash_ctl, 0, sizeof(hash_ctl));
2491         hash_ctl.keysize = sizeof(Oid);
2492         hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2493         hash_ctl.hash = oid_hash;
2494         hash_ctl.hcxt = use_mcxt;
2495         *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2496                                                   HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2497
2498         /*
2499          * Initialize the number of known backends to zero, just in case we do a
2500          * silent error return below.
2501          */
2502         if (numbackends != NULL)
2503                 *numbackends = 0;
2504         if (betab != NULL)
2505                 *betab = NULL;
2506
2507         /*
2508          * Try to open the status file. If it doesn't exist, the backends simply
2509          * return zero for anything and the collector simply starts from scratch
2510          * with empty counters.
2511          */
2512         if ((fpin = AllocateFile(PGSTAT_STAT_FILENAME, PG_BINARY_R)) == NULL)
2513                 return;
2514
2515         /*
2516          * Verify it's of the expected format.
2517          */
2518         if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id)
2519                 || format_id != PGSTAT_FILE_FORMAT_ID)
2520         {
2521                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2522                                 (errmsg("corrupted pgstat.stat file")));
2523                 goto done;
2524         }
2525
2526         /*
2527          * We found an existing collector stats file. Read it and put all the
2528          * hashtable entries into place.
2529          */
2530         for (;;)
2531         {
2532                 switch (fgetc(fpin))
2533                 {
2534                                 /*
2535                                  * 'D'  A PgStat_StatDBEntry struct describing a database
2536                                  * follows. Subsequently, zero to many 'T' entries will follow
2537                                  * until a 'd' is encountered.
2538                                  */
2539                         case 'D':
2540                                 if (fread(&dbbuf, 1, sizeof(dbbuf), fpin) != sizeof(dbbuf))
2541                                 {
2542                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2543                                                         (errmsg("corrupted pgstat.stat file")));
2544                                         goto done;
2545                                 }
2546
2547                                 /*
2548                                  * Add to the DB hash
2549                                  */
2550                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2551                                                                                                   (void *) &dbbuf.databaseid,
2552                                                                                                                          HASH_ENTER,
2553                                                                                                                          &found);
2554                                 if (found)
2555                                 {
2556                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2557                                                         (errmsg("corrupted pgstat.stat file")));
2558                                         goto done;
2559                                 }
2560
2561                                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2562                                 dbentry->tables = NULL;
2563                                 dbentry->destroy = 0;
2564                                 dbentry->n_backends = 0;
2565
2566                                 /*
2567                                  * Don't collect tables if not the requested DB (or the
2568                                  * shared-table info)
2569                                  */
2570                                 if (onlydb != InvalidOid)
2571                                 {
2572                                         if (dbbuf.databaseid != onlydb &&
2573                                                 dbbuf.databaseid != InvalidOid)
2574                                                 break;
2575                                 }
2576
2577                                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2578                                 hash_ctl.keysize = sizeof(Oid);
2579                                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2580                                 hash_ctl.hash = oid_hash;
2581                                 hash_ctl.hcxt = use_mcxt;
2582                                 dbentry->tables = hash_create("Per-database table",
2583                                                                                           PGSTAT_TAB_HASH_SIZE,
2584                                                                                           &hash_ctl,
2585                                                                          HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2586
2587                                 /*
2588                                  * Arrange that following 'T's add entries to this databases
2589                                  * tables hash table.
2590                                  */
2591                                 tabhash = dbentry->tables;
2592                                 break;
2593
2594                                 /*
2595                                  * 'd'  End of this database.
2596                                  */
2597                         case 'd':
2598                                 tabhash = NULL;
2599                                 break;
2600
2601                                 /*
2602                                  * 'T'  A PgStat_StatTabEntry follows.
2603                                  */
2604                         case 'T':
2605                                 if (fread(&tabbuf, 1, sizeof(tabbuf), fpin) != sizeof(tabbuf))
2606                                 {
2607                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2608                                                         (errmsg("corrupted pgstat.stat file")));
2609                                         goto done;
2610                                 }
2611
2612                                 /*
2613                                  * Skip if table belongs to a not requested database.
2614                                  */
2615                                 if (tabhash == NULL)
2616                                         break;
2617
2618                                 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2619                                                                                                         (void *) &tabbuf.tableid,
2620                                                                                                                  HASH_ENTER, &found);
2621
2622                                 if (found)
2623                                 {
2624                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2625                                                         (errmsg("corrupted pgstat.stat file")));
2626                                         goto done;
2627                                 }
2628
2629                                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2630                                 break;
2631
2632                                 /*
2633                                  * 'M'  The maximum number of backends to expect follows.
2634                                  */
2635                         case 'M':
2636                                 if (betab == NULL || numbackends == NULL)
2637                                         goto done;
2638                                 if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2639                                         sizeof(maxbackends))
2640                                 {
2641                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2642                                                         (errmsg("corrupted pgstat.stat file")));
2643                                         goto done;
2644                                 }
2645                                 if (maxbackends == 0)
2646                                         goto done;
2647
2648                                 /*
2649                                  * Allocate space (in TopTransactionContext too) for the
2650                                  * backend table.
2651                                  */
2652                                 if (use_mcxt == NULL)
2653                                         *betab = (PgStat_StatBeEntry *)
2654                                                 palloc(sizeof(PgStat_StatBeEntry) * maxbackends);
2655                                 else
2656                                         *betab = (PgStat_StatBeEntry *)
2657                                                 MemoryContextAlloc(use_mcxt,
2658                                                                    sizeof(PgStat_StatBeEntry) * maxbackends);
2659                                 break;
2660
2661                                 /*
2662                                  * 'B'  A PgStat_StatBeEntry follows.
2663                                  */
2664                         case 'B':
2665                                 if (betab == NULL || numbackends == NULL || *betab == NULL)
2666                                         goto done;
2667
2668                                 if (havebackends >= maxbackends)
2669                                         goto done;
2670
2671                                 /*
2672                                  * Read it directly into the table.
2673                                  */
2674                                 beentry = &(*betab)[havebackends];
2675
2676                                 if (fread(beentry, 1, sizeof(PgStat_StatBeEntry), fpin) !=
2677                                         sizeof(PgStat_StatBeEntry))
2678                                 {
2679                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2680                                                         (errmsg("corrupted pgstat.stat file")));
2681                                         goto done;
2682                                 }
2683
2684                                 /*
2685                                  * If possible, check PID to verify still running
2686                                  */
2687                                 if (live_pids &&
2688                                         (live_pids[0] == 0 ||
2689                                          bsearch((void *) &beentry->procpid,
2690                                                          (void *) &live_pids[1],
2691                                                          live_pids[0],
2692                                                          sizeof(int),
2693                                                          comparePids) == NULL))
2694                                 {
2695                                         /*
2696                                          * Note: we could send a BETERM message to tell the
2697                                          * collector to drop the entry, but I'm a bit worried
2698                                          * about race conditions.  For now, just silently ignore
2699                                          * dead entries; they'll get recycled eventually anyway.
2700                                          */
2701
2702                                         /* Don't accept the entry */
2703                                         memset(beentry, 0, sizeof(PgStat_StatBeEntry));
2704                                         break;
2705                                 }
2706
2707                                 /*
2708                                  * Count backends per database here.
2709                                  */
2710                                 dbentry = (PgStat_StatDBEntry *)
2711                                         hash_search(*dbhash,
2712                                                                 &(beentry->databaseid),
2713                                                                 HASH_FIND,
2714                                                                 NULL);
2715                                 if (dbentry)
2716                                         dbentry->n_backends++;
2717
2718                                 havebackends++;
2719                                 *numbackends = havebackends;
2720
2721                                 break;
2722
2723                                 /*
2724                                  * 'E'  The EOF marker of a complete stats file.
2725                                  */
2726                         case 'E':
2727                                 goto done;
2728
2729                         default:
2730                                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2731                                                 (errmsg("corrupted pgstat.stat file")));
2732                                 goto done;
2733                 }
2734         }
2735
2736 done:
2737         FreeFile(fpin);
2738 }
2739
2740 /*
2741  * If not done for this transaction, read the statistics collector
2742  * stats file into some hash tables.
2743  *
2744  * Because we store the hash tables in TopTransactionContext, the result
2745  * is good for the entire current main transaction.
2746  *
2747  * Inside the autovacuum process, the statfile is assumed to be valid
2748  * "forever", that is one iteration, within one database.  This means
2749  * we only consider the statistics as they were when the autovacuum
2750  * iteration started.
2751  */
2752 static void
2753 backend_read_statsfile(void)
2754 {
2755         if (IsAutoVacuumProcess())
2756         {
2757                 /* already read it? */
2758                 if (pgStatDBHash)
2759                         return;
2760                 Assert(!pgStatRunningInCollector);
2761                 pgstat_read_statsfile(&pgStatDBHash, InvalidOid,
2762                                                           &pgStatBeTable, &pgStatNumBackends);
2763         }
2764         else
2765         {
2766                 TransactionId topXid = GetTopTransactionId();
2767
2768                 if (!TransactionIdEquals(pgStatDBHashXact, topXid))
2769                 {
2770                         Assert(!pgStatRunningInCollector);
2771                         pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
2772                                                                   &pgStatBeTable, &pgStatNumBackends);
2773                         pgStatDBHashXact = topXid;
2774                 }
2775         }
2776 }
2777
2778
2779 /* ----------
2780  * pgstat_recv_bestart() -
2781  *
2782  *      Process a backend startup message.
2783  * ----------
2784  */
2785 static void
2786 pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
2787 {
2788         PgStat_StatBeEntry *entry;
2789
2790         /*
2791          * If the backend is known dead, we ignore the message -- we don't want to
2792          * update the backend entry's state since this BESTART message refers to
2793          * an old, dead backend
2794          */
2795         if (pgstat_add_backend(&msg->m_hdr) != 0)
2796                 return;
2797
2798         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2799         entry->userid = msg->m_userid;
2800         memcpy(&entry->clientaddr, &msg->m_clientaddr, sizeof(entry->clientaddr));
2801         entry->databaseid = msg->m_databaseid;
2802 }
2803
2804
2805 /* ----------
2806  * pgstat_recv_beterm() -
2807  *
2808  *      Process a backend termination message.
2809  * ----------
2810  */
2811 static void
2812 pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
2813 {
2814         pgstat_sub_backend(msg->m_hdr.m_procpid);
2815 }
2816
2817 /* ----------
2818  * pgstat_recv_autovac() -
2819  *
2820  *      Process an autovacuum signalling message.
2821  * ----------
2822  */
2823 static void
2824 pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
2825 {
2826         PgStat_StatDBEntry *dbentry;
2827
2828         /*
2829          * Lookup the database in the hashtable.  Don't create the entry if it
2830          * doesn't exist, because autovacuum may be processing a template
2831          * database.  If this isn't the case, the database is most likely to have
2832          * an entry already.  (If it doesn't, not much harm is done anyway --
2833          * it'll get created as soon as somebody actually uses the database.)
2834          */
2835         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2836         if (dbentry == NULL)
2837                 return;
2838
2839         /*
2840          * Store the last autovacuum time in the database entry.
2841          */
2842         dbentry->last_autovac_time = msg->m_start_time;
2843 }
2844
2845 /* ----------
2846  * pgstat_recv_vacuum() -
2847  *
2848  *      Process a VACUUM message.
2849  * ----------
2850  */
2851 static void
2852 pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
2853 {
2854         PgStat_StatDBEntry *dbentry;
2855         PgStat_StatTabEntry *tabentry;
2856         bool            found;
2857         bool            create;
2858
2859         /*
2860          * If we don't know about the database, ignore the message, because it may
2861          * be autovacuum processing a template database.  But if the message is
2862          * for database InvalidOid, don't ignore it, because we are getting a
2863          * message from vacuuming a shared relation.
2864          */
2865         create = (msg->m_databaseid == InvalidOid);
2866
2867         dbentry = pgstat_get_db_entry(msg->m_databaseid, create);
2868         if (dbentry == NULL)
2869                 return;
2870
2871         tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
2872                                                    HASH_ENTER, &found);
2873
2874         /*
2875          * If we are creating the entry, initialize it.
2876          */
2877         if (!found)
2878         {
2879                 tabentry->numscans = 0;
2880
2881                 tabentry->tuples_returned = 0;
2882                 tabentry->tuples_fetched = 0;
2883                 tabentry->tuples_inserted = 0;
2884                 tabentry->tuples_updated = 0;
2885                 tabentry->tuples_deleted = 0;
2886
2887                 tabentry->n_live_tuples = msg->m_tuples;
2888                 tabentry->n_dead_tuples = 0;
2889
2890                 if (msg->m_analyze)
2891                         tabentry->last_anl_tuples = msg->m_tuples;
2892                 else
2893                         tabentry->last_anl_tuples = 0;
2894
2895                 tabentry->blocks_fetched = 0;
2896                 tabentry->blocks_hit = 0;
2897
2898                 tabentry->destroy = 0;
2899         }
2900         else
2901         {
2902                 tabentry->n_live_tuples = msg->m_tuples;
2903                 tabentry->n_dead_tuples = 0;
2904                 if (msg->m_analyze)
2905                         tabentry->last_anl_tuples = msg->m_tuples;
2906         }
2907 }
2908
2909 /* ----------
2910  * pgstat_recv_analyze() -
2911  *
2912  *      Process an ANALYZE message.
2913  * ----------
2914  */
2915 static void
2916 pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
2917 {
2918         PgStat_StatDBEntry *dbentry;
2919         PgStat_StatTabEntry *tabentry;
2920         bool            found;
2921
2922         /*
2923          * Note that we do create the database entry here, as opposed to what we
2924          * do on AutovacStart and Vacuum messages.      This is because autovacuum
2925          * never executes ANALYZE on template databases.
2926          */
2927         dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
2928
2929         tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
2930                                                    HASH_ENTER, &found);
2931
2932         /*
2933          * If we are creating the entry, initialize it.
2934          */
2935         if (!found)
2936         {
2937                 tabentry->numscans = 0;
2938
2939                 tabentry->tuples_returned = 0;
2940                 tabentry->tuples_fetched = 0;
2941                 tabentry->tuples_inserted = 0;
2942                 tabentry->tuples_updated = 0;
2943                 tabentry->tuples_deleted = 0;
2944
2945                 tabentry->n_live_tuples = msg->m_live_tuples;
2946                 tabentry->n_dead_tuples = msg->m_dead_tuples;
2947                 tabentry->last_anl_tuples = msg->m_live_tuples + msg->m_dead_tuples;
2948
2949                 tabentry->blocks_fetched = 0;
2950                 tabentry->blocks_hit = 0;
2951
2952                 tabentry->destroy = 0;
2953         }
2954         else
2955         {
2956                 tabentry->n_live_tuples = msg->m_live_tuples;
2957                 tabentry->n_dead_tuples = msg->m_dead_tuples;
2958                 tabentry->last_anl_tuples = msg->m_live_tuples + msg->m_dead_tuples;
2959         }
2960 }
2961
2962 /* ----------
2963  * pgstat_recv_activity() -
2964  *
2965  *      Remember what the backend is doing.
2966  * ----------
2967  */
2968 static void
2969 pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
2970 {
2971         PgStat_StatBeEntry *entry;
2972
2973         /*
2974          * Here we check explicitly for 0 return, since we don't want to mangle
2975          * the activity of an active backend by a delayed packet from a dead one.
2976          */
2977         if (pgstat_add_backend(&msg->m_hdr) != 0)
2978                 return;
2979
2980         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2981
2982         StrNCpy(entry->activity, msg->m_cmd_str, PGSTAT_ACTIVITY_SIZE);
2983
2984         entry->activity_start_timestamp = GetCurrentTimestamp();
2985 }
2986
2987
2988 /* ----------
2989  * pgstat_recv_tabstat() -
2990  *
2991  *      Count what the backend has done.
2992  * ----------
2993  */
2994 static void
2995 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2996 {
2997         PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2998         PgStat_StatDBEntry *dbentry;
2999         PgStat_StatTabEntry *tabentry;
3000         int                     i;
3001         bool            found;
3002
3003         /*
3004          * Make sure the backend is counted for.
3005          */
3006         if (pgstat_add_backend(&msg->m_hdr) < 0)
3007                 return;
3008
3009         dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
3010
3011         /*
3012          * If the database is marked for destroy, this is a delayed UDP packet and
3013          * not worth being counted.
3014          */
3015         if (dbentry->destroy > 0)
3016                 return;
3017
3018         dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
3019         dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
3020
3021         /*
3022          * Process all table entries in the message.
3023          */
3024         for (i = 0; i < msg->m_nentries; i++)
3025         {
3026                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
3027                                                                                                   (void *) &(tabmsg[i].t_id),
3028                                                                                                            HASH_ENTER, &found);
3029
3030                 if (!found)
3031                 {
3032                         /*
3033                          * If it's a new table entry, initialize counters to the values we
3034                          * just got.
3035                          */
3036                         tabentry->numscans = tabmsg[i].t_numscans;
3037                         tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
3038                         tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
3039                         tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
3040                         tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
3041                         tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
3042
3043                         tabentry->n_live_tuples = tabmsg[i].t_tuples_inserted;
3044                         tabentry->n_dead_tuples = tabmsg[i].t_tuples_updated +
3045                                 tabmsg[i].t_tuples_deleted;
3046                         tabentry->last_anl_tuples = 0;
3047
3048                         tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
3049                         tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
3050
3051                         tabentry->destroy = 0;
3052                 }
3053                 else
3054                 {
3055                         /*
3056                          * Otherwise add the values to the existing entry.
3057                          */
3058                         tabentry->numscans += tabmsg[i].t_numscans;
3059                         tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
3060                         tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
3061                         tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
3062                         tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
3063                         tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
3064
3065                         tabentry->n_live_tuples += tabmsg[i].t_tuples_inserted;
3066                         tabentry->n_dead_tuples += tabmsg[i].t_tuples_updated +
3067                                 tabmsg[i].t_tuples_deleted;
3068
3069                         tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
3070                         tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
3071                 }
3072
3073                 /*
3074                  * And add the block IO to the database entry.
3075                  */
3076                 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
3077                 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
3078         }
3079 }
3080
3081
3082 /* ----------
3083  * pgstat_recv_tabpurge() -
3084  *
3085  *      Arrange for dead table removal.
3086  * ----------
3087  */
3088 static void
3089 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
3090 {
3091         PgStat_StatDBEntry *dbentry;
3092         PgStat_StatTabEntry *tabentry;
3093         int                     i;
3094
3095         /*
3096          * Make sure the backend is counted for.
3097          */
3098         if (pgstat_add_backend(&msg->m_hdr) < 0)
3099                 return;
3100
3101         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
3102
3103         /*
3104          * No need to purge if we don't even know the database.
3105          */
3106         if (!dbentry || !dbentry->tables)
3107                 return;
3108
3109         /*
3110          * If the database is marked for destroy, this is a delayed UDP packet and
3111          * the tables will go away at DB destruction.
3112          */
3113         if (dbentry->destroy > 0)
3114                 return;
3115
3116         /*
3117          * Process all table entries in the message.
3118          */
3119         for (i = 0; i < msg->m_nentries; i++)
3120         {
3121                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
3122                                                                                            (void *) &(msg->m_tableid[i]),
3123                                                                                                            HASH_FIND, NULL);
3124                 if (tabentry)
3125                         tabentry->destroy = PGSTAT_DESTROY_COUNT;
3126         }
3127 }
3128
3129
3130 /* ----------
3131  * pgstat_recv_dropdb() -
3132  *
3133  *      Arrange for dead database removal
3134  * ----------
3135  */
3136 static void
3137 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
3138 {
3139         PgStat_StatDBEntry *dbentry;
3140
3141         /*
3142          * Make sure the backend is counted for.
3143          */
3144         if (pgstat_add_backend(&msg->m_hdr) < 0)
3145                 return;
3146
3147         /*
3148          * Lookup the database in the hashtable.
3149          */
3150         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
3151
3152         /*
3153          * Mark the database for destruction.
3154          */
3155         if (dbentry)
3156                 dbentry->destroy = PGSTAT_DESTROY_COUNT;
3157 }
3158
3159
3160 /* ----------
3161  * pgstat_recv_resetcounter() -
3162  *
3163  *      Reset the statistics for the specified database.
3164  * ----------
3165  */
3166 static void
3167 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
3168 {
3169         HASHCTL         hash_ctl;
3170         PgStat_StatDBEntry *dbentry;
3171
3172         /*
3173          * Make sure the backend is counted for.
3174          */
3175         if (pgstat_add_backend(&msg->m_hdr) < 0)
3176                 return;
3177
3178         /*
3179          * Lookup the database in the hashtable.  Nothing to do if not there.
3180          */
3181         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
3182
3183         if (!dbentry)
3184                 return;
3185
3186         /*
3187          * We simply throw away all the database's table entries by recreating a
3188          * new hash table for them.
3189          */
3190         if (dbentry->tables != NULL)
3191                 hash_destroy(dbentry->tables);
3192
3193         dbentry->tables = NULL;
3194         dbentry->n_xact_commit = 0;
3195         dbentry->n_xact_rollback = 0;
3196         dbentry->n_blocks_fetched = 0;
3197         dbentry->n_blocks_hit = 0;
3198         dbentry->destroy = 0;
3199
3200         memset(&hash_ctl, 0, sizeof(hash_ctl));
3201         hash_ctl.keysize = sizeof(Oid);
3202         hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3203         hash_ctl.hash = oid_hash;
3204         dbentry->tables = hash_create("Per-database table",
3205                                                                   PGSTAT_TAB_HASH_SIZE,
3206                                                                   &hash_ctl,
3207                                                                   HASH_ELEM | HASH_FUNCTION);
3208 }