]> granicus.if.org Git - pgbouncer/blob - src/janitor.c
b37d028e859ef24b0853a1219fc8efa34ada948c
[pgbouncer] / src / janitor.c
1 /*
2  * PgBouncer - Lightweight connection pooler for PostgreSQL.
3  * 
4  * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
5  * 
6  * Permission to use, copy, modify, and/or distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  * 
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18
19 /*
20  * Periodic maintenance.
21  */
22
23 #include "bouncer.h"
24
25 /* do full maintenance 3x per second */
26 static struct timeval full_maint_period = {0, USEC / 3};
27 static struct event full_maint_ev;
28
29 /* close all sockets in server list */
30 static void close_server_list(StatList *sk_list, const char *reason)
31 {
32         List *item, *tmp;
33         PgSocket *server;
34
35         statlist_for_each_safe(item, sk_list, tmp) {
36                 server = container_of(item, PgSocket, head);
37                 disconnect_server(server, true, reason);
38         }
39 }
40
41 static void close_client_list(StatList *sk_list, const char *reason)
42 {
43         List *item, *tmp;
44         PgSocket *client;
45
46         statlist_for_each_safe(item, sk_list, tmp) {
47                 client = container_of(item, PgSocket, head);
48                 disconnect_client(client, true, reason);
49         }
50 }
51
52 bool suspend_socket(PgSocket *sk, bool force_suspend)
53 {
54         if (sk->suspended)
55                 return true;
56
57         if (sbuf_is_empty(&sk->sbuf)) {
58                 if (sbuf_pause(&sk->sbuf))
59                         sk->suspended = 1;
60         }
61
62         if (sk->suspended || !force_suspend)
63                 return sk->suspended;
64
65         if (is_server_socket(sk))
66                 disconnect_server(sk, true, "suspend_timeout");
67         else
68                 disconnect_client(sk, true, "suspend_timeout");
69         return true;
70 }
71
72 /* suspend all sockets in socket list */
73 static int suspend_socket_list(StatList *list, bool force_suspend)
74 {
75         List *item, *tmp;
76         PgSocket *sk;
77         int active = 0;
78
79         statlist_for_each_safe(item, list, tmp) {
80                 sk = container_of(item, PgSocket, head);
81                 if (!suspend_socket(sk, force_suspend))
82                         active++;
83         }
84         return active;
85 }
86
87 /* resume all suspended sockets in socket list */
88 static void resume_socket_list(StatList *list)
89 {
90         List *item, *tmp;
91         PgSocket *sk;
92
93         statlist_for_each_safe(item, list, tmp) {
94                 sk = container_of(item, PgSocket, head);
95                 if (sk->suspended) {
96                         sk->suspended = 0;
97                         sbuf_continue(&sk->sbuf);
98                 }
99         }
100 }
101
102 /* resume all suspended sockets in all pools */
103 static void resume_sockets(void)
104 {
105         List *item;
106         PgPool *pool;
107
108         statlist_for_each(item, &pool_list) {
109                 pool = container_of(item, PgPool, head);
110                 if (pool->db->admin)
111                         continue;
112                 resume_socket_list(&pool->active_client_list);
113                 resume_socket_list(&pool->active_server_list);
114                 resume_socket_list(&pool->idle_server_list);
115                 resume_socket_list(&pool->used_server_list);
116         }
117 }
118
/*
 * Undo a suspend: first resume the sockets held in the pools,
 * then resume the pooler (listening side).
 */
void resume_all(void)
{
        resume_sockets();
        resume_pooler();
}
125
126 /*
127  * send test/reset query to server if needed
128  */
129 static void launch_recheck(PgPool *pool)
130 {
131         const char *q = cf_server_check_query;
132         bool need_check = true;
133         PgSocket *server;
134         bool res = true;
135
136         /* find clean server */
137         while (1) {
138                 server = first_socket(&pool->used_server_list);
139                 if (!server)
140                         return;
141                 if (server->ready)
142                         break;
143                 disconnect_server(server, true, "idle server got dirty");
144         }
145
146         /* is the check needed? */
147         if (q == NULL || q[0] == 0)
148                 need_check = false;
149         else if (cf_server_check_delay > 0) {
150                 usec_t now = get_cached_time();
151                 if (now - server->request_time < cf_server_check_delay)
152                         need_check = false;
153         }
154
155         if (need_check) {
156                 /* send test query, wait for result */
157                 slog_debug(server, "P: Checking: %s", q);
158                 change_server_state(server, SV_TESTED);
159                 SEND_generic(res, server, 'Q', "s", q);
160                 if (!res)
161                         disconnect_server(server, false, "test query failed");
162         } else
163                 /* make immediately available */
164                 release_server(server);
165 }
166
167 /*
168  * make servers available
169  */
170 static void per_loop_activate(PgPool *pool)
171 {
172         List *item, *tmp;
173         PgSocket *client;
174
175         /* see if any server have been freed */
176         statlist_for_each_safe(item, &pool->waiting_client_list, tmp) {
177                 client = container_of(item, PgSocket, head);
178                 if (!statlist_empty(&pool->idle_server_list)) {
179
180                         /* db not fully initialized after reboot */
181                         if (client->wait_for_welcome && !pool->welcome_msg_ready) {
182                                 launch_new_connection(pool);
183                                 continue;
184                         }
185
186                         /* there is a ready server already */
187                         activate_client(client);
188                 } else if (!statlist_empty(&pool->tested_server_list)) {
189                         /* some connections are in testing process */
190                         break;
191                 } else if (!statlist_empty(&pool->used_server_list)) {
192                         /* ask for more connections to be tested */
193                         launch_recheck(pool);
194                         break;
195                 } else {
196                         /* not enough connections */
197                         launch_new_connection(pool);
198                         break;
199                 }
200         }
201 }
202
203 /*
204  * pause active clients
205  */
206 static int per_loop_pause(PgPool *pool)
207 {
208         int active = 0;
209
210         if (pool->db->admin)
211                 return 0;
212
213         close_server_list(&pool->idle_server_list, "pause mode");
214         close_server_list(&pool->used_server_list, "pause mode");
215         close_server_list(&pool->new_server_list, "pause mode");
216
217         active += statlist_count(&pool->active_server_list);
218         active += statlist_count(&pool->tested_server_list);
219
220         return active;
221 }
222
223 /*
224  * suspend active clients and servers
225  */
226 static int per_loop_suspend(PgPool *pool, bool force_suspend)
227 {
228         int active = 0;
229
230         if (pool->db->admin)
231                 return 0;
232
233         active += suspend_socket_list(&pool->active_client_list, force_suspend);
234
235         /* this list is unsuspendable, but still need force_suspend and counting */
236         active += suspend_socket_list(&pool->waiting_client_list, force_suspend);
237         if (active)
238                 per_loop_activate(pool);
239
240         if (!active) {
241                 active += suspend_socket_list(&pool->active_server_list, force_suspend);
242                 active += suspend_socket_list(&pool->idle_server_list, force_suspend);
243
244                 /* as all clients are done, no need for them */
245                 close_server_list(&pool->tested_server_list, "close unsafe file descriptors on suspend");
246                 close_server_list(&pool->used_server_list, "close unsafe file descriptors on suspend");
247         }
248
249         return active;
250 }
251
252 /*
253  * this function is called for each event loop.
254  */
255 void per_loop_maint(void)
256 {
257         List *item;
258         PgPool *pool;
259         int active = 0;
260         int partial_pause = 0;
261         bool force_suspend = false;
262
263         if (cf_pause_mode == P_SUSPEND && cf_suspend_timeout > 0) {
264                 usec_t stime = get_cached_time() - g_suspend_start;
265                 if (stime >= cf_suspend_timeout)
266                         force_suspend = true;
267         }
268
269         statlist_for_each(item, &pool_list) {
270                 pool = container_of(item, PgPool, head);
271                 if (pool->db->admin)
272                         continue;
273                 switch (cf_pause_mode) {
274                 case P_NONE:
275                         if (pool->db->db_paused) {
276                                 partial_pause = 1;
277                                 active += per_loop_pause(pool);
278                         } else
279                                 per_loop_activate(pool);
280                         break;
281                 case P_PAUSE:
282                         active += per_loop_pause(pool);
283                         break;
284                 case P_SUSPEND:
285                         active += per_loop_suspend(pool, force_suspend);
286                         break;
287                 }
288         }
289
290         switch (cf_pause_mode) {
291         case P_SUSPEND:
292                 if (force_suspend) {
293                         close_client_list(&login_client_list, "suspend_timeout");
294                 } else
295                         active += statlist_count(&login_client_list);
296         case P_PAUSE:
297                 if (!active)
298                         admin_pause_done();
299                 break;
300         case P_NONE:
301                 if (partial_pause && !active)
302                         admin_pause_done();
303                 break;
304         }
305 }
306
307 /* maintaining clients in pool */
308 static void pool_client_maint(PgPool *pool)
309 {
310         List *item, *tmp;
311         usec_t now = get_cached_time();
312         PgSocket *client;
313         usec_t age;
314
315         /* force client_idle_timeout */
316         if (cf_client_idle_timeout > 0) {
317                 statlist_for_each_safe(item, &pool->active_client_list, tmp) {
318                         client = container_of(item, PgSocket, head);
319                         Assert(client->state == CL_ACTIVE);
320                         if (client->link)
321                                 continue;
322                         if (now - client->request_time > cf_client_idle_timeout)
323                                 disconnect_client(client, true, "client_idle_timeout");
324                 }
325         }
326
327         /* force timeouts for waiting queries */
328         if (cf_query_timeout > 0 || cf_query_wait_timeout > 0) {
329                 statlist_for_each_safe(item, &pool->waiting_client_list, tmp) {
330                         client = container_of(item, PgSocket, head);
331                         Assert(client->state == CL_WAITING);
332                         if (client->query_start == 0) {
333                                 age = now - client->request_time;
334                                 //log_warning("query_start==0");
335                         } else
336                                 age = now - client->query_start;
337
338                         if (cf_query_timeout > 0 && age > cf_query_timeout)
339                                 disconnect_client(client, true, "query_timeout");
340                         else if (cf_query_wait_timeout > 0 && age > cf_query_wait_timeout)
341                                 disconnect_client(client, true, "query_wait_timeout");
342                 }
343         }
344
345         /* apply client_login_timeout to clients waiting for welcome pkt */
346         if (cf_client_login_timeout > 0 && !pool->welcome_msg_ready) {
347                 statlist_for_each_safe(item, &pool->waiting_client_list, tmp) {
348                         client = container_of(item, PgSocket, head);
349                         age = now - client->connect_time;
350                         if (age > cf_client_login_timeout)
351                                 disconnect_client(client, true, "client_login_timeout (server down)");
352                 }
353         }
354 }
355
356 static void check_unused_servers(PgPool *pool, StatList *slist, bool idle_test)
357 {
358         usec_t now = get_cached_time();
359         List *item, *tmp;
360         usec_t idle, age;
361         PgSocket *server;
362         usec_t lifetime_kill_gap = 0;
363
364         /*
365          * Calculate the time that disconnects because of server_lifetime
366          * must be separated.  This avoids the need to re-launch lot
367          * of connections together.
368          */
369         if (pool->db->pool_size > 0)
370                 lifetime_kill_gap = cf_server_lifetime / pool->db->pool_size;
371
372         /* disconnect idle servers if needed */
373         statlist_for_each_safe(item, slist, tmp) {
374                 server = container_of(item, PgSocket, head);
375
376                 age = now - server->connect_time;
377                 idle = now - server->request_time;
378
379                 if (server->close_needed) {
380                         disconnect_server(server, true, "database configuration changed");
381                 } else if (server->state == SV_IDLE && !server->ready) {
382                         disconnect_server(server, true, "SV_IDLE server got dirty");
383                 } else if (server->state == SV_USED && !server->ready) {
384                         disconnect_server(server, true, "SV_USED server got dirty");
385                 } else if (cf_server_idle_timeout > 0 && idle > cf_server_idle_timeout) {
386                         disconnect_server(server, true, "server idle timeout");
387                 } else if (age >= cf_server_lifetime) {
388                         if (pool->last_lifetime_disconnect + lifetime_kill_gap <= now) {
389                                 disconnect_server(server, true, "server lifetime over");
390                                 pool->last_lifetime_disconnect = now;
391                         }
392                 } else if (cf_pause_mode == P_PAUSE) {
393                         disconnect_server(server, true, "pause mode");
394                 } else if (idle_test && *cf_server_check_query) {
395                         if (idle > cf_server_check_delay)
396                                 change_server_state(server, SV_USED);
397                 }
398         }
399 }
400
401 /*
402  * Check pool size, close conns if too many.  Makes pooler
403  * react faster to the case when admin decreased pool size.
404  */
405 static void check_pool_size(PgPool *pool)
406 {
407         PgSocket *server;
408         int cur = statlist_count(&pool->active_server_list)
409                 + statlist_count(&pool->idle_server_list)
410                 + statlist_count(&pool->used_server_list)
411                 + statlist_count(&pool->tested_server_list);
412                 
413                 /* cancel pkt may create new srv conn without
414                  * taking pool_size into account
415                  *
416                  * statlist_count(&pool->new_server_list)
417                  */
418
419         int many = cur - (pool->db->pool_size + pool->db->res_pool_size);
420
421         Assert(pool->db->pool_size >= 0);
422
423         while (many > 0) {
424                 server = first_socket(&pool->used_server_list);
425                 if (!server)
426                         server = first_socket(&pool->idle_server_list);
427                 if (!server)
428                         break;
429                 disconnect_server(server, true, "too many servers in the pool");
430                 many--;
431         }
432
433         /*
434          * Because of fast-fail we may not have any waiting clients that would
435          * trigger server re-connect.  So do it explicitly.
436          */
437         if (cur == 0 && pool->last_connect_failed)
438                 launch_new_connection(pool);
439 }
440
441 /* maintain servers in a pool */
442 static void pool_server_maint(PgPool *pool)
443 {
444         List *item, *tmp;
445         usec_t age, now = get_cached_time();
446         PgSocket *server;
447
448         /* find and disconnect idle servers */
449         check_unused_servers(pool, &pool->used_server_list, 0);
450         check_unused_servers(pool, &pool->tested_server_list, 0);
451         check_unused_servers(pool, &pool->idle_server_list, 1);
452
453         /* where query got did not get answer in query_timeout */
454         if (cf_query_timeout > 0) {
455                 statlist_for_each_safe(item, &pool->active_server_list, tmp) {
456                         server = container_of(item, PgSocket, head);
457                         Assert(server->state == SV_ACTIVE);
458                         if (server->ready)
459                                 continue;
460                         age = now - server->link->request_time;
461                         if (age > cf_query_timeout)
462                                 disconnect_server(server, true, "query timeout");
463                 }
464         }
465
466         /* find connections that got connect, but could not log in */
467         if (cf_server_connect_timeout > 0) {
468                 statlist_for_each_safe(item, &pool->new_server_list, tmp) {
469                         server = container_of(item, PgSocket, head);
470                         Assert(server->state == SV_LOGIN);
471
472                         age = now - server->connect_time;
473                         if (age > cf_server_connect_timeout)
474                                 disconnect_server(server, true, "connect timeout");
475                 }
476         }
477
478         check_pool_size(pool);
479 }
480
481 static void cleanup_client_logins(void)
482 {
483         List *item, *tmp;
484         PgSocket *client;
485         usec_t age;
486         usec_t now = get_cached_time();
487
488         if (cf_client_login_timeout <= 0)
489                 return;
490
491         statlist_for_each_safe(item, &login_client_list, tmp) {
492                 client = container_of(item, PgSocket, head);
493                 age = now - client->connect_time;
494                 if (age > cf_client_login_timeout)
495                         disconnect_client(client, true, "client_login_timeout");
496         }
497 }
498
499 static void kill_database(PgDatabase *db);
500 static void cleanup_inactive_autodatabases(void)
501 {
502         List *item, *tmp;
503         PgDatabase *db;
504         usec_t age;
505         usec_t now = get_cached_time();
506
507         if (cf_autodb_idle_timeout <= 0)
508                 return;
509
510         statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
511                 db = container_of(item, PgDatabase, head);
512                 age = now - db->inactive_time;
513                 if (age > cf_autodb_idle_timeout) 
514                         kill_database(db);
515                 else
516                         break;
517         }
518 }
519
520 /* full-scale maintenance, done only occasionally */
521 static void do_full_maint(int sock, short flags, void *arg)
522 {
523         List *item, *tmp;
524         PgPool *pool;
525
526         /*
527          * Avoid doing anything that may surprise other pgbouncer.
528          */
529         if (cf_pause_mode == P_SUSPEND)
530                 goto skip_maint;
531
532         statlist_for_each_safe(item, &pool_list, tmp) {
533                 pool = container_of(item, PgPool, head);
534                 if (pool->db->admin)
535                         continue;
536                 pool_server_maint(pool);
537                 pool_client_maint(pool);
538                 if (pool->db->db_auto && pool->db->inactive_time == 0 &&
539                                 pool_client_count(pool) == 0 && pool_server_count(pool) == 0 ) {
540                         pool->db->inactive_time = get_cached_time();
541                         statlist_remove(&pool->db->head, &database_list);
542                         statlist_append(&pool->db->head, &autodatabase_idle_list);
543                 }
544         }
545
546         cleanup_inactive_autodatabases();
547
548         cleanup_client_logins();
549
550         if (cf_shutdown == 1 && get_active_server_count() == 0) {
551                 log_info("server connections dropped, exiting");
552                 cf_shutdown = 2;
553                 event_loopbreak();
554                 return;
555         }
556
557         if (cf_auth_type >= AUTH_TRUST)
558                 loader_users_check();
559
560 skip_maint:
561         safe_evtimer_add(&full_maint_ev, &full_maint_period);
562 }
563
564 /* first-time initializtion */
565 void janitor_setup(void)
566 {
567         /* launch maintenance */
568         evtimer_set(&full_maint_ev, do_full_maint, NULL);
569         safe_evtimer_add(&full_maint_ev, &full_maint_period);
570 }
571
572 static void kill_pool(PgPool *pool)
573 {
574         const char *reason = "database removed";
575
576         close_client_list(&pool->active_client_list, reason);
577         close_client_list(&pool->waiting_client_list, reason);
578         close_client_list(&pool->cancel_req_list, reason);
579
580         close_server_list(&pool->active_server_list, reason);
581         close_server_list(&pool->idle_server_list, reason);
582         close_server_list(&pool->used_server_list, reason);
583         close_server_list(&pool->tested_server_list, reason);
584         close_server_list(&pool->new_server_list, reason);
585
586         list_del(&pool->map_head);
587         statlist_remove(&pool->head, &pool_list);
588         obj_free(pool_cache, pool);
589 }
590
591 static void kill_database(PgDatabase *db)
592 {
593         PgPool *pool;
594         List *item, *tmp;
595
596         log_warning("dropping database '%s' as it does not exist anymore or inactive auto-database", db->name);
597
598         statlist_for_each_safe(item, &pool_list, tmp) {
599                 pool = container_of(item, PgPool, head);
600                 if (pool->db == db)
601                         kill_pool(pool);
602         }
603         if (db->forced_user)
604                 obj_free(user_cache, db->forced_user);
605         if (db->connect_query)
606                 free((void *)db->connect_query);
607         if (db->inactive_time)
608                 statlist_remove(&db->head, &autodatabase_idle_list);
609         else
610                 statlist_remove(&db->head, &database_list);
611         obj_free(db_cache, db);
612 }
613
614 /* as [pgbouncer] section can be loaded after databases,
615    there's need for review */
616 void config_postprocess(void)
617 {
618         List *item, *tmp;
619         PgDatabase *db;
620
621         statlist_for_each_safe(item, &database_list, tmp) {
622                 db = container_of(item, PgDatabase, head);
623                 if (db->db_dead) {
624                         kill_database(db);
625                         continue;
626                 }
627                 if (db->pool_size < 0)
628                         db->pool_size = cf_default_pool_size;
629                 if (db->res_pool_size < 0)
630                         db->res_pool_size = cf_res_pool_size;
631         }
632 }
633