]> granicus.if.org Git - pgbouncer/blob - src/objects.c
Move fast-fail relaunch logic around.
[pgbouncer] / src / objects.c
1 /*
2  * PgBouncer - Lightweight connection pooler for PostgreSQL.
3  * 
4  * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
5  * 
6  * Permission to use, copy, modify, and/or distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  * 
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18
19 /*
20  * Herding objects between lists happens here.
21  */
22
23 #include "bouncer.h"
24
25 /* those items will be allocated as needed, never freed */
26 STATLIST(user_list);
27 STATLIST(database_list);
28 STATLIST(pool_list);
29
30 Tree user_tree;
31
32 /*
33  * client and server objects will be pre-allocated
34  * they are always in either active or free lists
35  * in addition to others.
36  */
37 STATLIST(login_client_list);
38
39 ObjectCache *server_cache;
40 ObjectCache *client_cache;
41 ObjectCache *db_cache;
42 ObjectCache *pool_cache;
43 ObjectCache *user_cache;
44 ObjectCache *iobuf_cache;
45
46 /*
47  * libevent may still report events when event_del()
48  * is called from somewhere else.  So hide just freed
49  * PgSockets for one loop.
50  */
51 static STATLIST(justfree_client_list);
52 static STATLIST(justfree_server_list);
53
54 /* init autodb idle list */
55 STATLIST(autodatabase_idle_list);
56
57 /* fast way to get number of active clients */
58 int get_active_client_count(void)
59 {
60         return objcache_active_count(client_cache);
61 }
62
63 /* fast way to get number of active servers */
64 int get_active_server_count(void)
65 {
66         return objcache_active_count(server_cache);
67 }
68
69 static void construct_client(void *obj)
70 {
71         PgSocket *client = obj;
72
73         memset(client, 0, sizeof(PgSocket));
74         list_init(&client->head);
75         sbuf_init(&client->sbuf, client_proto);
76         client->state = CL_FREE;
77 }
78
79 static void construct_server(void *obj)
80 {
81         PgSocket *server = obj;
82
83         memset(server, 0, sizeof(PgSocket));
84         list_init(&server->head);
85         sbuf_init(&server->sbuf, server_proto);
86         server->state = SV_FREE;
87 }
88
89 /* compare string with PgUser->name, for usage with btree */
90 static int user_node_cmp(long userptr, Node *node)
91 {
92         const char *name = (const char *)userptr;
93         PgUser *user = container_of(node, PgUser, tree_node);
94         return strcmp(name, user->name);
95 }
96
97 /* initialization before config loading */
98 void init_objects(void)
99 {
100         tree_init(&user_tree, user_node_cmp, NULL);
101         user_cache = objcache_create("user_cache", sizeof(PgUser), 0, NULL);
102         db_cache = objcache_create("db_cache", sizeof(PgDatabase), 0, NULL);
103         pool_cache = objcache_create("pool_cache", sizeof(PgPool), 0, NULL);
104
105         if (!user_cache || !db_cache || !pool_cache)
106                 fatal("cannot create initial caches");
107 }
108
109 static void do_iobuf_reset(void *arg)
110 {
111         IOBuf *io = arg;
112         iobuf_reset(io);
113 }
114
115 /* initialization after config loading */
116 void init_caches(void)
117 {
118         server_cache = objcache_create("server_cache", sizeof(PgSocket), 0, construct_server);
119         client_cache = objcache_create("client_cache", sizeof(PgSocket), 0, construct_client);
120         iobuf_cache = objcache_create("iobuf_cache", IOBUF_SIZE, 0, do_iobuf_reset);
121 }
122
123 /* state change means moving between lists */
124 void change_client_state(PgSocket *client, SocketState newstate)
125 {
126         PgPool *pool = client->pool;
127
128         /* remove from old location */
129         switch (client->state) {
130         case CL_FREE:
131                 break;
132         case CL_JUSTFREE:
133                 statlist_remove(&client->head, &justfree_client_list);
134                 break;
135         case CL_LOGIN:
136                 statlist_remove(&client->head, &login_client_list);
137                 break;
138         case CL_WAITING:
139                 statlist_remove(&client->head, &pool->waiting_client_list);
140                 break;
141         case CL_ACTIVE:
142                 statlist_remove(&client->head, &pool->active_client_list);
143                 break;
144         case CL_CANCEL:
145                 statlist_remove(&client->head, &pool->cancel_req_list);
146                 break;
147         default:
148                 fatal("bad cur client state: %d", client->state);
149         }
150
151         client->state = newstate;
152
153         /* put to new location */
154         switch (client->state) {
155         case CL_FREE:
156                 obj_free(client_cache, client);
157                 break;
158         case CL_JUSTFREE:
159                 statlist_append(&client->head, &justfree_client_list);
160                 break;
161         case CL_LOGIN:
162                 statlist_append(&client->head, &login_client_list);
163                 break;
164         case CL_WAITING:
165                 statlist_append(&client->head, &pool->waiting_client_list);
166                 break;
167         case CL_ACTIVE:
168                 statlist_append(&client->head, &pool->active_client_list);
169                 break;
170         case CL_CANCEL:
171                 statlist_append(&client->head, &pool->cancel_req_list);
172                 break;
173         default:
174                 fatal("bad new client state: %d", client->state);
175         }
176 }
177
178 /* state change means moving between lists */
179 void change_server_state(PgSocket *server, SocketState newstate)
180 {
181         PgPool *pool = server->pool;
182
183         /* remove from old location */
184         switch (server->state) {
185         case SV_FREE:
186                 break;
187         case SV_JUSTFREE:
188                 statlist_remove(&server->head, &justfree_server_list);
189                 break;
190         case SV_LOGIN:
191                 statlist_remove(&server->head, &pool->new_server_list);
192                 break;
193         case SV_USED:
194                 statlist_remove(&server->head, &pool->used_server_list);
195                 break;
196         case SV_TESTED:
197                 statlist_remove(&server->head, &pool->tested_server_list);
198                 break;
199         case SV_IDLE:
200                 statlist_remove(&server->head, &pool->idle_server_list);
201                 break;
202         case SV_ACTIVE:
203                 statlist_remove(&server->head, &pool->active_server_list);
204                 break;
205         default:
206                 fatal("change_server_state: bad old server state: %d", server->state);
207         }
208
209         server->state = newstate;
210
211         /* put to new location */
212         switch (server->state) {
213         case SV_FREE:
214                 obj_free(server_cache, server);
215                 break;
216         case SV_JUSTFREE:
217                 statlist_append(&server->head, &justfree_server_list);
218                 break;
219         case SV_LOGIN:
220                 statlist_append(&server->head, &pool->new_server_list);
221                 break;
222         case SV_USED:
223                 /* use LIFO */
224                 statlist_prepend(&server->head, &pool->used_server_list);
225                 break;
226         case SV_TESTED:
227                 statlist_append(&server->head, &pool->tested_server_list);
228                 break;
229         case SV_IDLE:
230                 if (server->close_needed || cf_server_round_robin)
231                         /* try to avoid immediate usage then */
232                         statlist_append(&server->head, &pool->idle_server_list);
233                 else
234                         /* otherwise use LIFO */
235                         statlist_prepend(&server->head, &pool->idle_server_list);
236                 break;
237         case SV_ACTIVE:
238                 statlist_append(&server->head, &pool->active_server_list);
239                 break;
240         default:
241                 fatal("bad server state");
242         }
243 }
244
245 /* compare pool names, for use with put_in_order */
246 static int cmp_pool(List *i1, List *i2)
247 {
248         PgPool *p1 = container_of(i1, PgPool, head);
249         PgPool *p2 = container_of(i2, PgPool, head);
250         if (p1->db != p2->db)
251                 return strcmp(p1->db->name, p2->db->name);
252         if (p1->user != p2->user)
253                 return strcmp(p1->user->name, p2->user->name);
254         return 0;
255 }
256
257 /* compare user names, for use with put_in_order */
258 static int cmp_user(List *i1, List *i2)
259 {
260         PgUser *u1 = container_of(i1, PgUser, head);
261         PgUser *u2 = container_of(i2, PgUser, head);
262         return strcmp(u1->name, u2->name);
263 }
264
265 /* compare db names, for use with put_in_order */
266 static int cmp_database(List *i1, List *i2)
267 {
268         PgDatabase *db1 = container_of(i1, PgDatabase, head);
269         PgDatabase *db2 = container_of(i2, PgDatabase, head);
270         return strcmp(db1->name, db2->name);
271 }
272
273 /* put elem into list in correct pos */
274 static void put_in_order(List *newitem, StatList *list, int (*cmpfn)(List *, List *))
275 {
276         int res;
277         List *item;
278
279         statlist_for_each(item, list) {
280                 res = cmpfn(item, newitem);
281                 if (res == 0)
282                         fatal("put_in_order: found existing elem");
283                 else if (res > 0) {
284                         statlist_put_before(newitem, list, item);
285                         return;
286                 }
287         }
288         statlist_append(newitem, list);
289 }
290
291 /* create new object if new, then return it */
292 PgDatabase *add_database(const char *name)
293 {
294         PgDatabase *db = find_database(name);
295
296         /* create new object if needed */
297         if (db == NULL) {
298                 db = obj_alloc(db_cache);
299                 if (!db)
300                         return NULL;
301
302                 list_init(&db->head);
303                 safe_strcpy(db->name, name, sizeof(db->name));
304                 put_in_order(&db->head, &database_list, cmp_database);
305         }
306
307         return db;
308 }
309
310 /* register new auto database */
311 PgDatabase *register_auto_database(const char *name)
312 {
313         PgDatabase *db;
314         int len;
315         char *cs;
316         
317         if (!cf_autodb_connstr)
318                 return NULL;
319
320         len = strlen(cf_autodb_connstr);
321         cs = malloc(len + 1);
322         if (!cs)
323                 return NULL;
324         memcpy(cs, cf_autodb_connstr, len + 1);
325         parse_database((char*)name, cs);
326         free(cs);
327
328         db = find_database(name);
329         if (db) {
330                 db->db_auto = 1;
331                 /* do not forget to check pool_size like in config_postprocess */
332                 if (db->pool_size < 0)
333                         db->pool_size = cf_default_pool_size;
334                 if (db->res_pool_size < 0)
335                         db->res_pool_size = cf_res_pool_size;
336         }
337
338         return db;
339 }
340
341 /* add or update client users */
342 PgUser *add_user(const char *name, const char *passwd)
343 {
344         PgUser *user = find_user(name);
345
346         if (user == NULL) {
347                 user = obj_alloc(user_cache);
348                 if (!user)
349                         return NULL;
350
351                 list_init(&user->head);
352                 list_init(&user->pool_list);
353                 safe_strcpy(user->name, name, sizeof(user->name));
354                 put_in_order(&user->head, &user_list, cmp_user);
355
356                 tree_insert(&user_tree, (long)user->name, &user->tree_node);
357         }
358         safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
359         return user;
360 }
361
362 /* create separate user object for storing server user info */
363 PgUser *force_user(PgDatabase *db, const char *name, const char *passwd)
364 {
365         PgUser *user = db->forced_user;
366         if (!user) {
367                 user = obj_alloc(user_cache);
368                 if (!user)
369                         return NULL;
370                 list_init(&user->head);
371                 list_init(&user->pool_list);
372         }
373         safe_strcpy(user->name, name, sizeof(user->name));
374         safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
375         db->forced_user = user;
376         return user;
377 }
378
379 /* find an existing database */
380 PgDatabase *find_database(const char *name)
381 {
382         List *item, *tmp;
383         PgDatabase *db;
384         statlist_for_each(item, &database_list) {
385                 db = container_of(item, PgDatabase, head);
386                 if (strcmp(db->name, name) == 0)
387                         return db;
388         }
389         /* also trying to find in idle autodatabases list */
390         statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
391                 db = container_of(item, PgDatabase, head);
392                 if (strcmp(db->name, name) == 0) {
393                         db->inactive_time = 0;
394                         statlist_remove(&db->head, &autodatabase_idle_list);
395                         put_in_order(&db->head, &database_list, cmp_database);
396                         return db;
397                 }
398         }
399         return NULL;
400 }
401
402 /* find existing user */
403 PgUser *find_user(const char *name)
404 {
405         PgUser *user = NULL;
406         Node *node;
407
408         node = tree_search(&user_tree, (long)name);
409         user = node ? container_of(node, PgUser, tree_node) : NULL;
410         return user;
411 }
412
413 /* create new pool object */
414 static PgPool *new_pool(PgDatabase *db, PgUser *user)
415 {
416         PgPool *pool;
417
418         pool = obj_alloc(pool_cache);
419         if (!pool)
420                 return NULL;
421
422         list_init(&pool->head);
423         list_init(&pool->map_head);
424
425         pool->user = user;
426         pool->db = db;
427
428         statlist_init(&pool->active_client_list, "active_client_list");
429         statlist_init(&pool->waiting_client_list, "waiting_client_list");
430         statlist_init(&pool->active_server_list, "active_server_list");
431         statlist_init(&pool->idle_server_list, "idle_server_list");
432         statlist_init(&pool->tested_server_list, "tested_server_list");
433         statlist_init(&pool->used_server_list, "used_server_list");
434         statlist_init(&pool->new_server_list, "new_server_list");
435         statlist_init(&pool->cancel_req_list, "cancel_req_list");
436
437         list_append(&pool->map_head, &user->pool_list);
438
439         /* keep pools in db/user order to make stats faster */
440         put_in_order(&pool->head, &pool_list, cmp_pool);
441
442         return pool;
443 }
444
445 /* find pool object, create if needed */
446 PgPool *get_pool(PgDatabase *db, PgUser *user)
447 {
448         List *item;
449         PgPool *pool;
450
451         if (!db || !user)
452                 return NULL;
453
454         list_for_each(item, &user->pool_list) {
455                 pool = container_of(item, PgPool, map_head);
456                 if (pool->db == db)
457                         return pool;
458         }
459
460         return new_pool(db, user);
461 }
462
463 /* deactivate socket and put into wait queue */
464 static void pause_client(PgSocket *client)
465 {
466         Assert(client->state == CL_ACTIVE);
467
468         slog_debug(client, "pause_client");
469         change_client_state(client, CL_WAITING);
470         if (!sbuf_pause(&client->sbuf))
471                 disconnect_client(client, true, "pause failed");
472 }
473
474 /* wake client from wait */
475 void activate_client(PgSocket *client)
476 {
477         Assert(client->state == CL_WAITING);
478
479         slog_debug(client, "activate_client");
480         change_client_state(client, CL_ACTIVE);
481         sbuf_continue(&client->sbuf);
482 }
483
/*
 * link if found, otherwise put into wait queue
 *
 * Try to pair an active client with an idle server of its pool.
 *
 * Returns true only when the client is (or already was) linked to a server
 * and its data may be processed now.  Returns false when:
 *  - the client was queued via pause_client();
 *  - the client was disconnected by the fast-fail rule below; or
 *  - a server was linked but must first apply variable changes (the
 *    client's sbuf is paused and server->setting_vars is set).
 */
bool find_server(PgSocket *client)
{
	PgPool *pool = client->pool;
	PgSocket *server;
	bool res;
	bool varchange = false;

	Assert(client->state == CL_ACTIVE);

	/* already linked: nothing to do */
	if (client->link)
		return true;

	/* try to get idle server, if allowed */
	if (cf_pause_mode == P_PAUSE) {
		server = NULL;
	} else {
		/*
		 * Take servers from the head of the idle list, dropping ones
		 * marked close_needed or no longer ready, until a usable one
		 * is found or the list is empty.
		 */
		while (1) {
			server = first_socket(&pool->idle_server_list);
			if (!server)
				break;
			else if (server->close_needed)
				disconnect_server(server, true, "obsolete connection");
			else if (!server->ready)
				disconnect_server(server, true, "idle server got dirty");
			else
				break;
		}

		/*
		 * Don't let clients queue at all, if there is no working server connection.
		 *
		 * It must still allow following cases:
		 * - empty pool on startup
		 * - idle pool where all servers are removed
		 *
		 * Current logic:
		 * - old server connections will be dropped by query_timeout
		 * - new server connections fail due to server_connect_timeout, or other failure
		 */
		if (!server && pool->last_connect_failed) {
			/* servers in the pool that are not still logging in */
			int cnt = pool_server_count(pool) - statlist_count(&pool->new_server_list);
			if (!cnt) {
				/* usual relaunch wont work, as there are no waiting clients */
				launch_new_connection(client->pool);

				disconnect_client(client, true, "no working server connection");
				return false;
			}
		}
	}
	Assert(!server || server->state == SV_IDLE);

	/* send var changes */
	if (server) {
		/* varchange is set when the server needs settings changed first */
		res = varcache_apply(server, client, &varchange);
		if (!res) {
			disconnect_server(server, true, "var change failed");
			server = NULL;
		}
	}

	/* link or send to waiters list */
	if (server) {
		client->link = server;
		server->link = client;
		change_server_state(server, SV_ACTIVE);
		if (varchange) {
			server->setting_vars = 1;
			server->ready = 0;
			res = false; /* don't process client data yet */
			if (!sbuf_pause(&client->sbuf))
				disconnect_client(client, true, "pause failed");
		} else
			res = true;
	} else {
		pause_client(client);
		res = false;
	}
	return res;
}
565
566 /* pick waiting client */
567 static bool reuse_on_release(PgSocket *server)
568 {
569         bool res = true;
570         PgPool *pool = server->pool;
571         PgSocket *client = first_socket(&pool->waiting_client_list);
572         if (client) {
573                 activate_client(client);
574
575                 /*
576                  * As the activate_client() does full read loop,
577                  * then it may happen that linked client close
578                  * couses server close.  Report it.
579                  */
580                 if (server->state == SV_FREE || server->state == SV_JUSTFREE)
581                         res = false;
582         }
583         return res;
584 }
585
586 /* send reset query */
587 static bool reset_on_release(PgSocket *server)
588 {
589         bool res;
590         
591         Assert(server->state == SV_TESTED);
592
593         slog_debug(server, "Resetting: %s", cf_server_reset_query);
594         SEND_generic(res, server, 'Q', "s", cf_server_reset_query);
595         if (!res)
596                 disconnect_server(server, false, "reset query failed");
597         return res;
598 }
599
600 static bool life_over(PgSocket *server)
601 {
602         PgPool *pool = server->pool;
603         usec_t lifetime_kill_gap = 0;
604         usec_t now = get_cached_time();
605         usec_t age = now - server->connect_time;
606         usec_t last_kill = now - pool->last_lifetime_disconnect;
607
608         if (age < cf_server_lifetime)
609                 return false;
610
611         if (pool->db->pool_size > 0)
612                 lifetime_kill_gap = cf_server_lifetime / pool->db->pool_size;
613
614         if (last_kill >= lifetime_kill_gap)
615                 return true;
616
617         return false;
618 }
619
620 /* connecting/active -> idle, unlink if needed */
621 bool release_server(PgSocket *server)
622 {
623         PgPool *pool = server->pool;
624         SocketState newstate = SV_IDLE;
625
626         Assert(server->ready);
627
628         /* remove from old list */
629         switch (server->state) {
630         case SV_ACTIVE:
631                 server->link->link = NULL;
632                 server->link = NULL;
633
634                 if (*cf_server_reset_query)
635                         /* notify reset is required */
636                         newstate = SV_TESTED;
637                 else if (cf_server_check_delay == 0 && *cf_server_check_query)
638                         /*
639                          * deprecated: before reset_query, the check_delay = 0
640                          * was used to get same effect.  This if() can be removed
641                          * after couple of releases.
642                          */
643                         newstate = SV_USED;
644         case SV_USED:
645         case SV_TESTED:
646                 break;
647         case SV_LOGIN:
648                 pool->last_connect_failed = 0;
649                 break;
650         default:
651                 fatal("bad server state in release_server (%d)", server->state);
652         }
653
654         /* enforce lifetime immidiately on release */
655         if (server->state != SV_LOGIN && life_over(server)) {
656                 disconnect_server(server, true, "server_lifetime");
657                 return false;
658         }
659
660         /* enforce close request */
661         if (server->close_needed) {
662                 disconnect_server(server, true, "close_needed");
663                 return false;
664         }
665
666         Assert(server->link == NULL);
667         slog_noise(server, "release_server: new state=%d", newstate);
668         change_server_state(server, newstate);
669
670         if (newstate == SV_IDLE)
671                 /* immediately process waiters, to give fair chance */
672                 return reuse_on_release(server);
673         else if (newstate == SV_TESTED)
674                 return reset_on_release(server);
675
676         return true;
677 }
678
679 /* drop server connection */
680 void disconnect_server(PgSocket *server, bool notify, const char *reason)
681 {
682         PgPool *pool = server->pool;
683         PgSocket *client = server->link;
684         static const uint8_t pkt_term[] = {'X', 0,0,0,4};
685         int send_term = 1;
686         usec_t now = get_cached_time();
687
688         if (cf_log_disconnections)
689                 slog_info(server, "closing because: %s (age=%llu)", reason,
690                           (now - server->connect_time) / USEC);
691
692         switch (server->state) {
693         case SV_ACTIVE:
694                 client = server->link;
695                 if (client) {
696                         client->link = NULL;
697                         server->link = NULL;
698                         disconnect_client(client, true, reason);
699                 }
700                 break;
701         case SV_TESTED:
702         case SV_USED:
703         case SV_IDLE:
704                 break;
705         case SV_LOGIN:
706                 /*
707                  * usually disconnect means problems in startup phase,
708                  * except when sending cancel packet
709                  */
710                 if (!server->ready)
711                         pool->last_connect_failed = 1;
712                 else
713                         send_term = 0;
714                 break;
715         default:
716                 fatal("disconnect_server: bad server state (%d)", server->state);
717         }
718
719         Assert(server->link == NULL);
720
721         /* notify server and close connection */
722         if (send_term && notify) {
723                 if (!sbuf_answer(&server->sbuf, pkt_term, sizeof(pkt_term)))
724                         /* ignore result */
725                         notify = false;
726         }
727
728         change_server_state(server, SV_JUSTFREE);
729         if (!sbuf_close(&server->sbuf))
730                 log_noise("sbuf_close failed, retry later");
731 }
732
733 /* drop client connection */
734 void disconnect_client(PgSocket *client, bool notify, const char *reason)
735 {
736         usec_t now = get_cached_time();
737
738         if (cf_log_disconnections)
739                 slog_info(client, "closing because: %s (age=%llu)", reason,
740                           (now - client->connect_time) / USEC);
741
742         switch (client->state) {
743         case CL_ACTIVE:
744                 if (client->link) {
745                         PgSocket *server = client->link;
746                         /* ->ready may be set before all is sent */
747                         if (server->ready && sbuf_is_empty(&server->sbuf)) {
748                                 /* retval does not matter here */
749                                 release_server(server);
750                         } else {
751                                 server->link = NULL;
752                                 client->link = NULL;
753                                 disconnect_server(server, true, "unclean server");
754                         }
755                 }
756         case CL_LOGIN:
757         case CL_WAITING:
758         case CL_CANCEL:
759                 break;
760         default:
761                 fatal("bad client state in disconnect_client: %d", client->state);
762         }
763
764         /* send reason to client */
765         if (notify && reason && client->state != CL_CANCEL) {
766                 /*
767                  * don't send Ready pkt here, or client won't notice
768                  * closed connection
769                  */
770                 send_pooler_error(client, false, reason);
771         }
772
773         change_client_state(client, CL_JUSTFREE);
774         if (!sbuf_close(&client->sbuf))
775                 log_noise("sbuf_close failed, retry later");
776 }
777
778 /* the pool needs new connection, if possible */
779 void launch_new_connection(PgPool *pool)
780 {
781         PgSocket *server;
782         int total;
783         const char *unix_dir = cf_unix_socket_dir;
784         bool res;
785
786         /* allow only small number of connection attempts at a time */
787         if (!statlist_empty(&pool->new_server_list)) {
788                 log_debug("launch_new_connection: already progress");
789                 return;
790         }
791
792         /* if server bounces, don't retry too fast */
793         if (pool->last_connect_failed) {
794                 usec_t now = get_cached_time();
795                 if (now - pool->last_connect_time < cf_server_login_retry) {
796                         log_debug("launch_new_connection: last failed, wait");
797                         return;
798                 }
799         }
800
801         /* is it allowed to add servers? */
802         total = pool_server_count(pool);
803         if (total >= pool->db->pool_size && pool->welcome_msg_ready) {
804                 /* should we use reserve pool? */
805                 if (cf_res_pool_timeout && pool->db->res_pool_size) {
806                         usec_t now = get_cached_time();
807                         PgSocket *c = first_socket(&pool->waiting_client_list);
808                         if (c && (now - c->request_time) >= cf_res_pool_timeout) {
809                                 if (total < pool->db->pool_size + pool->db->res_pool_size) {
810                                         log_debug("reserve_pool activated");
811                                         goto allow_new;
812                                 }
813                         }
814                 }
815                 log_debug("launch_new_connection: pool full (%d >= %d)",
816                                 total, pool->db->pool_size);
817                 return;
818         }
819
820 allow_new:
821         /* get free conn object */
822         server = obj_alloc(server_cache);
823         if (!server) {
824                 log_debug("launch_new_connection: no memory");
825                 return;
826         }
827
828         /* initialize it */
829         server->pool = pool;
830         server->auth_user = server->pool->user;
831         server->remote_addr = server->pool->db->addr;
832         server->connect_time = get_cached_time();
833         pool->last_connect_time = get_cached_time();
834         change_server_state(server, SV_LOGIN);
835
836         if (cf_log_connections)
837                 slog_info(server, "new connection to server");
838
839         /* override socket location if requested */
840         if (server->pool->db->unix_socket_dir[0])
841                 unix_dir = server->pool->db->unix_socket_dir;
842
843         /* start connecting */
844         res = sbuf_connect(&server->sbuf, &server->remote_addr, unix_dir,
845                            cf_server_connect_timeout / USEC);
846         if (!res)
847                 log_noise("failed to launch new connection");
848 }
849
850 /* new client connection attempt */
851 PgSocket * accept_client(int sock,
852                          const struct sockaddr_in *addr,
853                          bool is_unix)
854 {
855         bool res;
856         PgSocket *client;
857
858         /* get free PgSocket */
859         client = obj_alloc(client_cache);
860         if (!client) {
861                 log_warning("cannot allocate client struct");
862                 safe_close(sock);
863                 return NULL;
864         }
865
866         client->connect_time = client->request_time = get_cached_time();
867         client->query_start = 0;
868
869         fill_remote_addr(client, sock, is_unix);
870         fill_local_addr(client, sock, is_unix);
871
872         change_client_state(client, CL_LOGIN);
873
874         res = sbuf_accept(&client->sbuf, sock, is_unix);
875         if (!res) {
876                 if (cf_log_connections)
877                         slog_debug(client, "failed connection attempt");
878                 return NULL;
879         }
880
881         return client;
882 }
883
884 /* send cached parameters to client to pretend being server */
885 /* client managed to authenticate, send welcome msg and accept queries */
886 bool finish_client_login(PgSocket *client)
887 {
888         switch (client->state) {
889         case CL_LOGIN:
890                 change_client_state(client, CL_ACTIVE);
891         case CL_ACTIVE:
892                 break;
893         default:
894                 fatal("bad client state");
895         }
896
897         if (!welcome_client(client)) {
898                 log_debug("finish_client_login: no welcome message, pause");
899                 client->wait_for_welcome = 1;
900                 pause_client(client);
901                 if (cf_pause_mode == P_NONE)
902                         launch_new_connection(client->pool);
903                 return false;
904         }
905         client->wait_for_welcome = 0;
906
907         slog_debug(client, "logged in");
908
909         return true;
910 }
911
912 /* client->cancel_key has requested client key */
913 void accept_cancel_request(PgSocket *req)
914 {
915         List *pitem, *citem;
916         PgPool *pool;
917         PgSocket *server = NULL, *client, *main_client = NULL;
918
919         Assert(req->state == CL_LOGIN);
920
921         /* find real client this is for */
922         statlist_for_each(pitem, &pool_list) {
923                 pool = container_of(pitem, PgPool, head);
924                 statlist_for_each(citem, &pool->active_client_list) {
925                         client = container_of(citem, PgSocket, head);
926                         if (memcmp(client->cancel_key, req->cancel_key, 8) == 0) {
927                                 main_client = client;
928                                 goto found;
929                         }
930                 }
931         }
932 found:
933
934         /* wrong key */
935         if (!main_client) {
936                 disconnect_client(req, false, "failed cancel request");
937                 return;
938         }
939
940         /* not linked client, just drop it then */
941         if (!main_client->link) {
942                 bool res;
943
944                 /* let administrative cancel be handled elsewhere */
945                 if (main_client->pool->db->admin) {
946                         disconnect_client(req, false, "cancel request for console client");
947                         admin_handle_cancel(main_client);
948                         return;
949                 }
950
951                 disconnect_client(req, false, "cancel request for idle client");
952
953                 /* notify readiness */
954                 SEND_ReadyForQuery(res, main_client);
955                 if (!res)
956                         disconnect_client(main_client, true,
957                                           "ReadyForQuery for main_client failed");
958                 return;
959         }
960
961         /* drop the connection, if fails, retry later in justfree list */
962         if (!sbuf_close(&req->sbuf))
963                 log_noise("sbuf_close failed, retry later");
964
965         /* remember server key */
966         server = main_client->link;
967         memcpy(req->cancel_key, server->cancel_key, 8);
968
969         /* attach to target pool */
970         req->pool = pool;
971         change_client_state(req, CL_CANCEL);
972
973         /* need fresh connection */
974         launch_new_connection(pool);
975 }
976
977 void forward_cancel_request(PgSocket *server)
978 {
979         bool res;
980         PgSocket *req = first_socket(&server->pool->cancel_req_list);
981
982         Assert(req != NULL && req->state == CL_CANCEL);
983         Assert(server->state == SV_LOGIN);
984
985         SEND_CancelRequest(res, server, req->cancel_key);
986
987         change_client_state(req, CL_JUSTFREE);
988 }
989
990 bool use_client_socket(int fd, PgAddr *addr,
991                        const char *dbname, const char *username,
992                        uint64_t ckey, int oldfd, int linkfd,
993                        const char *client_enc, const char *std_string,
994                        const char *datestyle, const char *timezone)
995 {
996         PgSocket *client;
997         PktBuf tmp;
998
999         client = accept_client(fd, NULL, addr->is_unix);
1000         if (client == NULL)
1001                 return false;
1002         client->suspended = 1;
1003
1004         if (!set_pool(client, dbname, username))
1005                 return false;
1006
1007         change_client_state(client, CL_ACTIVE);
1008
1009         /* store old cancel key */
1010         pktbuf_static(&tmp, client->cancel_key, 8);
1011         pktbuf_put_uint64(&tmp, ckey);
1012
1013         /* store old fds */
1014         client->tmp_sk_oldfd = oldfd;
1015         client->tmp_sk_linkfd = linkfd;
1016
1017         varcache_set(&client->vars, "client_encoding", client_enc);
1018         varcache_set(&client->vars, "standard_conforming_strings", std_string);
1019         varcache_set(&client->vars, "datestyle", datestyle);
1020         varcache_set(&client->vars, "timezone", timezone);
1021
1022         return true;
1023 }
1024
1025 bool use_server_socket(int fd, PgAddr *addr,
1026                        const char *dbname, const char *username,
1027                        uint64_t ckey, int oldfd, int linkfd,
1028                        const char *client_enc, const char *std_string,
1029                        const char *datestyle, const char *timezone)
1030 {
1031         PgDatabase *db = find_database(dbname);
1032         PgUser *user;
1033         PgPool *pool;
1034         PgSocket *server;
1035         PktBuf tmp;
1036         bool res;
1037         
1038         /* if the database not found, it's an auto database -> registering... */
1039         if (!db) {
1040                 db = register_auto_database(dbname);
1041                 if (!db)
1042                         return true;
1043         }
1044
1045         if (db->forced_user)
1046                 user = db->forced_user;
1047         else
1048                 user = find_user(username);
1049
1050         pool = get_pool(db, user);
1051         if (!pool)
1052                 return false;
1053
1054         server = obj_alloc(server_cache);
1055         if (!server)
1056                 return false;
1057
1058         res = sbuf_accept(&server->sbuf, fd, addr->is_unix);
1059         if (!res)
1060                 return false;
1061
1062         server->suspended = 1;
1063         server->pool = pool;
1064         server->auth_user = user;
1065         server->connect_time = server->request_time = get_cached_time();
1066         server->query_start = 0;
1067
1068         fill_remote_addr(server, fd, addr->is_unix);
1069         fill_local_addr(server, fd, addr->is_unix);
1070
1071         if (linkfd) {
1072                 server->ready = 0;
1073                 change_server_state(server, SV_ACTIVE);
1074         } else {
1075                 server->ready = 1;
1076                 change_server_state(server, SV_IDLE);
1077         }
1078
1079         /* store old cancel key */
1080         pktbuf_static(&tmp, server->cancel_key, 8);
1081         pktbuf_put_uint64(&tmp, ckey);
1082
1083         /* store old fds */
1084         server->tmp_sk_oldfd = oldfd;
1085         server->tmp_sk_linkfd = linkfd;
1086
1087         varcache_set(&server->vars, "client_encoding", client_enc);
1088         varcache_set(&server->vars, "standard_conforming_strings", std_string);
1089         varcache_set(&server->vars, "datestyle", datestyle);
1090         varcache_set(&server->vars, "timezone", timezone);
1091
1092         return true;
1093 }
1094
1095 void for_each_server(PgPool *pool, void (*func)(PgSocket *sk))
1096 {
1097         List *item;
1098
1099         statlist_for_each(item, &pool->idle_server_list)
1100                 func(container_of(item, PgSocket, head));
1101
1102         statlist_for_each(item, &pool->used_server_list)
1103                 func(container_of(item, PgSocket, head));
1104
1105         statlist_for_each(item, &pool->tested_server_list)
1106                 func(container_of(item, PgSocket, head));
1107
1108         statlist_for_each(item, &pool->active_server_list)
1109                 func(container_of(item, PgSocket, head));
1110
1111         statlist_for_each(item, &pool->new_server_list)
1112                 func(container_of(item, PgSocket, head));
1113 }
1114
1115 static void tag_dirty(PgSocket *sk)
1116 {
1117         sk->close_needed = 1;
1118 }
1119
1120 void tag_database_dirty(PgDatabase *db)
1121 {
1122         List *item;
1123         PgPool *pool;
1124
1125         statlist_for_each(item, &pool_list) {
1126                 pool = container_of(item, PgPool, head);
1127                 if (pool->db == db)
1128                         for_each_server(pool, tag_dirty);
1129         }
1130 }
1131
1132 /* move objects from justfree_* to free_* lists */
1133 void reuse_just_freed_objects(void)
1134 {
1135         List *tmp, *item;
1136         PgSocket *sk;
1137         bool close_works = true;
1138
1139         /*
1140          * event_del() may fail because of ENOMEM for event handlers
1141          * that need only changes sent to kernel on each loop.
1142          *
1143          * Keep open sbufs in justfree lists until successful.
1144          */
1145
1146         statlist_for_each_safe(item, &justfree_client_list, tmp) {
1147                 sk = container_of(item, PgSocket, head);
1148                 if (sbuf_is_closed(&sk->sbuf))
1149                         change_client_state(sk, CL_FREE);
1150                 else if (close_works)
1151                         close_works = sbuf_close(&sk->sbuf);
1152         }
1153         statlist_for_each_safe(item, &justfree_server_list, tmp) {
1154                 sk = container_of(item, PgSocket, head);
1155                 if (sbuf_is_closed(&sk->sbuf))
1156                         change_server_state(sk, SV_FREE);
1157                 else if (close_works)
1158                         close_works = sbuf_close(&sk->sbuf);
1159         }
1160 }
1161