]> granicus.if.org Git - pgbouncer/blob - src/objects.c
40783552214d446d02cbc27349d18275ed11ced7
[pgbouncer] / src / objects.c
1 /*
2  * PgBouncer - Lightweight connection pooler for PostgreSQL.
3  * 
4  * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
5  * 
6  * Permission to use, copy, modify, and/or distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  * 
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18
19 /*
20  * Herding objects between lists happens here.
21  */
22
23 #include "bouncer.h"
24
25 /* These objects are allocated on demand and are never freed. */
26 STATLIST(user_list);
27 STATLIST(database_list);
28 STATLIST(pool_list);
29
/* Tree of users; init_objects() sets it up with user_node_cmp() as comparator. */
30 Tree user_tree;
31
32 /*
33  * Client and server objects are pre-allocated.
34  * They are always in either the active or the free list,
35  * in addition to any other list they belong to.
36  */
37 STATLIST(login_client_list);
38
/* Object caches; created in init_objects() and init_caches(). */
39 ObjectCache *server_cache;
40 ObjectCache *client_cache;
41 ObjectCache *db_cache;
42 ObjectCache *pool_cache;
43 ObjectCache *user_cache;
44 ObjectCache *iobuf_cache;
45
46 /*
47  * libevent may still report events for a socket after event_del()
48  * has been called from somewhere else.  So keep just-freed
49  * PgSockets hidden for one loop.
50  */
51 static STATLIST(justfree_client_list);
52 static STATLIST(justfree_server_list);
53
54 /* List of idle auto-databases; find_database() also searches it. */
55 STATLIST(autodatabase_idle_list);
56
57 /* fast way to get number of active clients */
58 int get_active_client_count(void)
59 {
60         return objcache_active_count(client_cache);
61 }
62
63 /* fast way to get number of active servers */
64 int get_active_server_count(void)
65 {
66         return objcache_active_count(server_cache);
67 }
68
69 static void construct_client(void *obj)
70 {
71         PgSocket *client = obj;
72
73         memset(client, 0, sizeof(PgSocket));
74         list_init(&client->head);
75         sbuf_init(&client->sbuf, client_proto);
76         client->state = CL_FREE;
77 }
78
79 static void construct_server(void *obj)
80 {
81         PgSocket *server = obj;
82
83         memset(server, 0, sizeof(PgSocket));
84         list_init(&server->head);
85         sbuf_init(&server->sbuf, server_proto);
86         server->state = SV_FREE;
87 }
88
89 /* compare string with PgUser->name, for usage with btree */
90 static int user_node_cmp(long userptr, Node *node)
91 {
92         const char *name = (const char *)userptr;
93         PgUser *user = container_of(node, PgUser, tree_node);
94         return strcmp(name, user->name);
95 }
96
97 /* initialization before config loading */
98 void init_objects(void)
99 {
100         tree_init(&user_tree, user_node_cmp, NULL);
101         user_cache = objcache_create("user_cache", sizeof(PgUser), 0, NULL);
102         db_cache = objcache_create("db_cache", sizeof(PgDatabase), 0, NULL);
103         pool_cache = objcache_create("pool_cache", sizeof(PgPool), 0, NULL);
104
105         if (!user_cache || !db_cache || !pool_cache)
106                 fatal("cannot create initial caches");
107 }
108
109 static void do_iobuf_reset(void *arg)
110 {
111         IOBuf *io = arg;
112         iobuf_reset(io);
113 }
114
115 /* initialization after config loading */
116 void init_caches(void)
117 {
118         server_cache = objcache_create("server_cache", sizeof(PgSocket), 0, construct_server);
119         client_cache = objcache_create("client_cache", sizeof(PgSocket), 0, construct_client);
120         iobuf_cache = objcache_create("iobuf_cache", IOBUF_SIZE, 0, do_iobuf_reset);
121 }
122
123 /* state change means moving between lists */
124 void change_client_state(PgSocket *client, SocketState newstate)
125 {
126         PgPool *pool = client->pool;
127
128         /* remove from old location */
129         switch (client->state) {
130         case CL_FREE:
131                 break;
132         case CL_JUSTFREE:
133                 statlist_remove(&client->head, &justfree_client_list);
134                 break;
135         case CL_LOGIN:
136                 statlist_remove(&client->head, &login_client_list);
137                 break;
138         case CL_WAITING:
139                 statlist_remove(&client->head, &pool->waiting_client_list);
140                 break;
141         case CL_ACTIVE:
142                 statlist_remove(&client->head, &pool->active_client_list);
143                 break;
144         case CL_CANCEL:
145                 statlist_remove(&client->head, &pool->cancel_req_list);
146                 break;
147         default:
148                 fatal("bad cur client state: %d", client->state);
149         }
150
151         client->state = newstate;
152
153         /* put to new location */
154         switch (client->state) {
155         case CL_FREE:
156                 obj_free(client_cache, client);
157                 break;
158         case CL_JUSTFREE:
159                 statlist_append(&client->head, &justfree_client_list);
160                 break;
161         case CL_LOGIN:
162                 statlist_append(&client->head, &login_client_list);
163                 break;
164         case CL_WAITING:
165                 statlist_append(&client->head, &pool->waiting_client_list);
166                 break;
167         case CL_ACTIVE:
168                 statlist_append(&client->head, &pool->active_client_list);
169                 break;
170         case CL_CANCEL:
171                 statlist_append(&client->head, &pool->cancel_req_list);
172                 break;
173         default:
174                 fatal("bad new client state: %d", client->state);
175         }
176 }
177
178 /* state change means moving between lists */
179 void change_server_state(PgSocket *server, SocketState newstate)
180 {
181         PgPool *pool = server->pool;
182
183         /* remove from old location */
184         switch (server->state) {
185         case SV_FREE:
186                 break;
187         case SV_JUSTFREE:
188                 statlist_remove(&server->head, &justfree_server_list);
189                 break;
190         case SV_LOGIN:
191                 statlist_remove(&server->head, &pool->new_server_list);
192                 break;
193         case SV_USED:
194                 statlist_remove(&server->head, &pool->used_server_list);
195                 break;
196         case SV_TESTED:
197                 statlist_remove(&server->head, &pool->tested_server_list);
198                 break;
199         case SV_IDLE:
200                 statlist_remove(&server->head, &pool->idle_server_list);
201                 break;
202         case SV_ACTIVE:
203                 statlist_remove(&server->head, &pool->active_server_list);
204                 break;
205         default:
206                 fatal("change_server_state: bad old server state: %d", server->state);
207         }
208
209         server->state = newstate;
210
211         /* put to new location */
212         switch (server->state) {
213         case SV_FREE:
214                 obj_free(server_cache, server);
215                 break;
216         case SV_JUSTFREE:
217                 statlist_append(&server->head, &justfree_server_list);
218                 break;
219         case SV_LOGIN:
220                 statlist_append(&server->head, &pool->new_server_list);
221                 break;
222         case SV_USED:
223                 /* use LIFO */
224                 statlist_prepend(&server->head, &pool->used_server_list);
225                 break;
226         case SV_TESTED:
227                 statlist_append(&server->head, &pool->tested_server_list);
228                 break;
229         case SV_IDLE:
230                 if (server->close_needed || cf_server_round_robin)
231                         /* try to avoid immediate usage then */
232                         statlist_append(&server->head, &pool->idle_server_list);
233                 else
234                         /* otherwise use LIFO */
235                         statlist_prepend(&server->head, &pool->idle_server_list);
236                 break;
237         case SV_ACTIVE:
238                 statlist_append(&server->head, &pool->active_server_list);
239                 break;
240         default:
241                 fatal("bad server state");
242         }
243 }
244
245 /* compare pool names, for use with put_in_order */
246 static int cmp_pool(List *i1, List *i2)
247 {
248         PgPool *p1 = container_of(i1, PgPool, head);
249         PgPool *p2 = container_of(i2, PgPool, head);
250         if (p1->db != p2->db)
251                 return strcmp(p1->db->name, p2->db->name);
252         if (p1->user != p2->user)
253                 return strcmp(p1->user->name, p2->user->name);
254         return 0;
255 }
256
257 /* compare user names, for use with put_in_order */
258 static int cmp_user(List *i1, List *i2)
259 {
260         PgUser *u1 = container_of(i1, PgUser, head);
261         PgUser *u2 = container_of(i2, PgUser, head);
262         return strcmp(u1->name, u2->name);
263 }
264
265 /* compare db names, for use with put_in_order */
266 static int cmp_database(List *i1, List *i2)
267 {
268         PgDatabase *db1 = container_of(i1, PgDatabase, head);
269         PgDatabase *db2 = container_of(i2, PgDatabase, head);
270         return strcmp(db1->name, db2->name);
271 }
272
273 /* put elem into list in correct pos */
274 static void put_in_order(List *newitem, StatList *list, int (*cmpfn)(List *, List *))
275 {
276         int res;
277         List *item;
278
279         statlist_for_each(item, list) {
280                 res = cmpfn(item, newitem);
281                 if (res == 0)
282                         fatal("put_in_order: found existing elem");
283                 else if (res > 0) {
284                         statlist_put_before(newitem, list, item);
285                         return;
286                 }
287         }
288         statlist_append(newitem, list);
289 }
290
291 /* create new object if new, then return it */
292 PgDatabase *add_database(const char *name)
293 {
294         PgDatabase *db = find_database(name);
295
296         /* create new object if needed */
297         if (db == NULL) {
298                 db = obj_alloc(db_cache);
299                 if (!db)
300                         return NULL;
301
302                 list_init(&db->head);
303                 safe_strcpy(db->name, name, sizeof(db->name));
304                 put_in_order(&db->head, &database_list, cmp_database);
305         }
306
307         return db;
308 }
309
310 /* register new auto database */
311 PgDatabase *register_auto_database(const char *name)
312 {
313         PgDatabase *db;
314         int len;
315         char *cs;
316         
317         if (!cf_autodb_connstr)
318                 return NULL;
319
320         len = strlen(cf_autodb_connstr);
321         cs = malloc(len + 1);
322         if (!cs)
323                 return NULL;
324         memcpy(cs, cf_autodb_connstr, len + 1);
325         parse_database((char*)name, cs);
326         free(cs);
327
328         db = find_database(name);
329         if (db) {
330                 db->db_auto = 1;
331                 /* do not forget to check pool_size like in config_postprocess */
332                 if (db->pool_size < 0)
333                         db->pool_size = cf_default_pool_size;
334                 if (db->res_pool_size < 0)
335                         db->res_pool_size = cf_res_pool_size;
336         }
337
338         return db;
339 }
340
341 /* add or update client users */
342 PgUser *add_user(const char *name, const char *passwd)
343 {
344         PgUser *user = find_user(name);
345
346         if (user == NULL) {
347                 user = obj_alloc(user_cache);
348                 if (!user)
349                         return NULL;
350
351                 list_init(&user->head);
352                 list_init(&user->pool_list);
353                 safe_strcpy(user->name, name, sizeof(user->name));
354                 put_in_order(&user->head, &user_list, cmp_user);
355
356                 tree_insert(&user_tree, (long)user->name, &user->tree_node);
357         }
358         safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
359         return user;
360 }
361
362 /* create separate user object for storing server user info */
363 PgUser *force_user(PgDatabase *db, const char *name, const char *passwd)
364 {
365         PgUser *user = db->forced_user;
366         if (!user) {
367                 user = obj_alloc(user_cache);
368                 if (!user)
369                         return NULL;
370                 list_init(&user->head);
371                 list_init(&user->pool_list);
372         }
373         safe_strcpy(user->name, name, sizeof(user->name));
374         safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
375         db->forced_user = user;
376         return user;
377 }
378
379 /* find an existing database */
380 PgDatabase *find_database(const char *name)
381 {
382         List *item, *tmp;
383         PgDatabase *db;
384         statlist_for_each(item, &database_list) {
385                 db = container_of(item, PgDatabase, head);
386                 if (strcmp(db->name, name) == 0)
387                         return db;
388         }
389         /* also trying to find in idle autodatabases list */
390         statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
391                 db = container_of(item, PgDatabase, head);
392                 if (strcmp(db->name, name) == 0) {
393                         db->inactive_time = 0;
394                         statlist_remove(&db->head, &autodatabase_idle_list);
395                         put_in_order(&db->head, &database_list, cmp_database);
396                         return db;
397                 }
398         }
399         return NULL;
400 }
401
402 /* find existing user */
403 PgUser *find_user(const char *name)
404 {
405         PgUser *user = NULL;
406         Node *node;
407
408         node = tree_search(&user_tree, (long)name);
409         user = node ? container_of(node, PgUser, tree_node) : NULL;
410         return user;
411 }
412
413 /* create new pool object */
414 static PgPool *new_pool(PgDatabase *db, PgUser *user)
415 {
416         PgPool *pool;
417
418         pool = obj_alloc(pool_cache);
419         if (!pool)
420                 return NULL;
421
422         list_init(&pool->head);
423         list_init(&pool->map_head);
424
425         pool->user = user;
426         pool->db = db;
427
428         statlist_init(&pool->active_client_list, "active_client_list");
429         statlist_init(&pool->waiting_client_list, "waiting_client_list");
430         statlist_init(&pool->active_server_list, "active_server_list");
431         statlist_init(&pool->idle_server_list, "idle_server_list");
432         statlist_init(&pool->tested_server_list, "tested_server_list");
433         statlist_init(&pool->used_server_list, "used_server_list");
434         statlist_init(&pool->new_server_list, "new_server_list");
435         statlist_init(&pool->cancel_req_list, "cancel_req_list");
436
437         list_append(&pool->map_head, &user->pool_list);
438
439         /* keep pools in db/user order to make stats faster */
440         put_in_order(&pool->head, &pool_list, cmp_pool);
441
442         return pool;
443 }
444
445 /* find pool object, create if needed */
446 PgPool *get_pool(PgDatabase *db, PgUser *user)
447 {
448         List *item;
449         PgPool *pool;
450
451         if (!db || !user)
452                 return NULL;
453
454         list_for_each(item, &user->pool_list) {
455                 pool = container_of(item, PgPool, map_head);
456                 if (pool->db == db)
457                         return pool;
458         }
459
460         return new_pool(db, user);
461 }
462
463 /* deactivate socket and put into wait queue */
464 static void pause_client(PgSocket *client)
465 {
466         Assert(client->state == CL_ACTIVE);
467
468         slog_debug(client, "pause_client");
469         change_client_state(client, CL_WAITING);
470         if (!sbuf_pause(&client->sbuf))
471                 disconnect_client(client, true, "pause failed");
472 }
473
474 /* wake client from wait */
475 void activate_client(PgSocket *client)
476 {
477         Assert(client->state == CL_WAITING);
478
479         slog_debug(client, "activate_client");
480         change_client_state(client, CL_ACTIVE);
481         sbuf_continue(&client->sbuf);
482 }
483
484 /* link if found, otherwise put into wait queue */
485 bool find_server(PgSocket *client)
486 {
487         PgPool *pool = client->pool;
488         PgSocket *server;
489         bool res;
490         bool varchange = false;
491
492         Assert(client->state == CL_ACTIVE);
493
494         if (client->link)
495                 return true;
496
497         /* try to get idle server, if allowed */
498         if (cf_pause_mode == P_PAUSE) {
499                 server = NULL;
500         } else {
501                 while (1) {
502                         server = first_socket(&pool->idle_server_list);
503                         if (!server)
504                                 break;
505                         else if (server->close_needed)
506                                 disconnect_server(server, true, "obsolete connection");
507                         else if (!server->ready)
508                                 disconnect_server(server, true, "idle server got dirty");
509                         else
510                                 break;
511                 }
512
513                 /*
514                  * Don't let clients queue at all, if there is no working server connection.
515                  *
516                  * It must still allow following cases:
517                  * - empty pool on startup
518                  * - idle pool where all servers are removed
519                  *
520                  * Current logic:
521                  * - old server connections will be dropped by query_timeout
522                  * - new server connections fail due to server_connect_timeout, or other failure
523                  */
524                 if (!server && pool->last_connect_failed) {
525                         int cnt = pool_server_count(pool) - statlist_count(&pool->new_server_list);
526                         if (!cnt) {
527                                 disconnect_client(client, true, "no working server connection");
528                                 return false;
529                         }
530                 }
531         }
532         Assert(!server || server->state == SV_IDLE);
533
534         /* send var changes */
535         if (server) {
536                 res = varcache_apply(server, client, &varchange);
537                 if (!res) {
538                         disconnect_server(server, true, "var change failed");
539                         server = NULL;
540                 }
541         }
542
543         /* link or send to waiters list */
544         if (server) {
545                 client->link = server;
546                 server->link = client;
547                 change_server_state(server, SV_ACTIVE);
548                 if (varchange) {
549                         server->setting_vars = 1;
550                         server->ready = 0;
551                         res = false; /* don't process client data yet */
552                         if (!sbuf_pause(&client->sbuf))
553                                 disconnect_client(client, true, "pause failed");
554                 } else
555                         res = true;
556         } else {
557                 pause_client(client);
558                 res = false;
559         }
560         return res;
561 }
562
563 /* pick waiting client */
564 static bool reuse_on_release(PgSocket *server)
565 {
566         bool res = true;
567         PgPool *pool = server->pool;
568         PgSocket *client = first_socket(&pool->waiting_client_list);
569         if (client) {
570                 activate_client(client);
571
572                 /*
573                  * As the activate_client() does full read loop,
574                  * then it may happen that linked client close
575                  * couses server close.  Report it.
576                  */
577                 if (server->state == SV_FREE || server->state == SV_JUSTFREE)
578                         res = false;
579         }
580         return res;
581 }
582
583 /* send reset query */
584 static bool reset_on_release(PgSocket *server)
585 {
586         bool res;
587         
588         Assert(server->state == SV_TESTED);
589
590         slog_debug(server, "Resetting: %s", cf_server_reset_query);
591         SEND_generic(res, server, 'Q', "s", cf_server_reset_query);
592         if (!res)
593                 disconnect_server(server, false, "reset query failed");
594         return res;
595 }
596
597 static bool life_over(PgSocket *server)
598 {
599         PgPool *pool = server->pool;
600         usec_t lifetime_kill_gap = 0;
601         usec_t now = get_cached_time();
602         usec_t age = now - server->connect_time;
603         usec_t last_kill = now - pool->last_lifetime_disconnect;
604
605         if (age < cf_server_lifetime)
606                 return false;
607
608         if (pool->db->pool_size > 0)
609                 lifetime_kill_gap = cf_server_lifetime / pool->db->pool_size;
610
611         if (last_kill >= lifetime_kill_gap)
612                 return true;
613
614         return false;
615 }
616
617 /* connecting/active -> idle, unlink if needed */
618 bool release_server(PgSocket *server)
619 {
620         PgPool *pool = server->pool;
621         SocketState newstate = SV_IDLE;
622
623         Assert(server->ready);
624
625         /* remove from old list */
626         switch (server->state) {
627         case SV_ACTIVE:
628                 server->link->link = NULL;
629                 server->link = NULL;
630
631                 if (*cf_server_reset_query)
632                         /* notify reset is required */
633                         newstate = SV_TESTED;
634                 else if (cf_server_check_delay == 0 && *cf_server_check_query)
635                         /*
636                          * deprecated: before reset_query, the check_delay = 0
637                          * was used to get same effect.  This if() can be removed
638                          * after couple of releases.
639                          */
640                         newstate = SV_USED;
641         case SV_USED:
642         case SV_TESTED:
643                 break;
644         case SV_LOGIN:
645                 pool->last_connect_failed = 0;
646                 break;
647         default:
648                 fatal("bad server state in release_server (%d)", server->state);
649         }
650
651         /* enforce lifetime immidiately on release */
652         if (server->state != SV_LOGIN && life_over(server)) {
653                 disconnect_server(server, true, "server_lifetime");
654                 return false;
655         }
656
657         /* enforce close request */
658         if (server->close_needed) {
659                 disconnect_server(server, true, "close_needed");
660                 return false;
661         }
662
663         Assert(server->link == NULL);
664         slog_noise(server, "release_server: new state=%d", newstate);
665         change_server_state(server, newstate);
666
667         if (newstate == SV_IDLE)
668                 /* immediately process waiters, to give fair chance */
669                 return reuse_on_release(server);
670         else if (newstate == SV_TESTED)
671                 return reset_on_release(server);
672
673         return true;
674 }
675
676 /* drop server connection */
677 void disconnect_server(PgSocket *server, bool notify, const char *reason)
678 {
679         PgPool *pool = server->pool;
680         PgSocket *client = server->link;
681         static const uint8_t pkt_term[] = {'X', 0,0,0,4};
682         int send_term = 1;
683         usec_t now = get_cached_time();
684
685         if (cf_log_disconnections)
686                 slog_info(server, "closing because: %s (age=%llu)", reason,
687                           (now - server->connect_time) / USEC);
688
689         switch (server->state) {
690         case SV_ACTIVE:
691                 client = server->link;
692                 if (client) {
693                         client->link = NULL;
694                         server->link = NULL;
695                         disconnect_client(client, true, reason);
696                 }
697                 break;
698         case SV_TESTED:
699         case SV_USED:
700         case SV_IDLE:
701                 break;
702         case SV_LOGIN:
703                 /*
704                  * usually disconnect means problems in startup phase,
705                  * except when sending cancel packet
706                  */
707                 if (!server->ready)
708                         pool->last_connect_failed = 1;
709                 else
710                         send_term = 0;
711                 break;
712         default:
713                 fatal("disconnect_server: bad server state (%d)", server->state);
714         }
715
716         Assert(server->link == NULL);
717
718         /* notify server and close connection */
719         if (send_term && notify) {
720                 if (!sbuf_answer(&server->sbuf, pkt_term, sizeof(pkt_term)))
721                         /* ignore result */
722                         notify = false;
723         }
724
725         change_server_state(server, SV_JUSTFREE);
726         if (!sbuf_close(&server->sbuf))
727                 log_noise("sbuf_close failed, retry later");
728 }
729
730 /* drop client connection */
731 void disconnect_client(PgSocket *client, bool notify, const char *reason)
732 {
733         usec_t now = get_cached_time();
734
735         if (cf_log_disconnections)
736                 slog_info(client, "closing because: %s (age=%llu)", reason,
737                           (now - client->connect_time) / USEC);
738
739         switch (client->state) {
740         case CL_ACTIVE:
741                 if (client->link) {
742                         PgSocket *server = client->link;
743                         /* ->ready may be set before all is sent */
744                         if (server->ready && sbuf_is_empty(&server->sbuf)) {
745                                 /* retval does not matter here */
746                                 release_server(server);
747                         } else {
748                                 server->link = NULL;
749                                 client->link = NULL;
750                                 disconnect_server(server, true, "unclean server");
751                         }
752                 }
753         case CL_LOGIN:
754         case CL_WAITING:
755         case CL_CANCEL:
756                 break;
757         default:
758                 fatal("bad client state in disconnect_client: %d", client->state);
759         }
760
761         /* send reason to client */
762         if (notify && reason && client->state != CL_CANCEL) {
763                 /*
764                  * don't send Ready pkt here, or client won't notice
765                  * closed connection
766                  */
767                 send_pooler_error(client, false, reason);
768         }
769
770         change_client_state(client, CL_JUSTFREE);
771         if (!sbuf_close(&client->sbuf))
772                 log_noise("sbuf_close failed, retry later");
773 }
774
775 /* the pool needs new connection, if possible */
776 void launch_new_connection(PgPool *pool)
777 {
778         PgSocket *server;
779         int total;
780         const char *unix_dir = cf_unix_socket_dir;
781         bool res;
782
783         /* allow only small number of connection attempts at a time */
784         if (!statlist_empty(&pool->new_server_list)) {
785                 log_debug("launch_new_connection: already progress");
786                 return;
787         }
788
789         /* if server bounces, don't retry too fast */
790         if (pool->last_connect_failed) {
791                 usec_t now = get_cached_time();
792                 if (now - pool->last_connect_time < cf_server_login_retry) {
793                         log_debug("launch_new_connection: last failed, wait");
794                         return;
795                 }
796         }
797
798         /* is it allowed to add servers? */
799         total = pool_server_count(pool);
800         if (total >= pool->db->pool_size && pool->welcome_msg_ready) {
801                 /* should we use reserve pool? */
802                 if (cf_res_pool_timeout && pool->db->res_pool_size) {
803                         usec_t now = get_cached_time();
804                         PgSocket *c = first_socket(&pool->waiting_client_list);
805                         if (c && (now - c->request_time) >= cf_res_pool_timeout) {
806                                 if (total < pool->db->pool_size + pool->db->res_pool_size) {
807                                         log_debug("reserve_pool activated");
808                                         goto allow_new;
809                                 }
810                         }
811                 }
812                 log_debug("launch_new_connection: pool full (%d >= %d)",
813                                 total, pool->db->pool_size);
814                 return;
815         }
816
817 allow_new:
818         /* get free conn object */
819         server = obj_alloc(server_cache);
820         if (!server) {
821                 log_debug("launch_new_connection: no memory");
822                 return;
823         }
824
825         /* initialize it */
826         server->pool = pool;
827         server->auth_user = server->pool->user;
828         server->remote_addr = server->pool->db->addr;
829         server->connect_time = get_cached_time();
830         pool->last_connect_time = get_cached_time();
831         change_server_state(server, SV_LOGIN);
832
833         if (cf_log_connections)
834                 slog_info(server, "new connection to server");
835
836         /* override socket location if requested */
837         if (server->pool->db->unix_socket_dir[0])
838                 unix_dir = server->pool->db->unix_socket_dir;
839
840         /* start connecting */
841         res = sbuf_connect(&server->sbuf, &server->remote_addr, unix_dir,
842                            cf_server_connect_timeout / USEC);
843         if (!res)
844                 log_noise("failed to launch new connection");
845 }
846
847 /* new client connection attempt */
848 PgSocket * accept_client(int sock,
849                          const struct sockaddr_in *addr,
850                          bool is_unix)
851 {
852         bool res;
853         PgSocket *client;
854
855         /* get free PgSocket */
856         client = obj_alloc(client_cache);
857         if (!client) {
858                 log_warning("cannot allocate client struct");
859                 safe_close(sock);
860                 return NULL;
861         }
862
863         client->connect_time = client->request_time = get_cached_time();
864         client->query_start = 0;
865
866         fill_remote_addr(client, sock, is_unix);
867         fill_local_addr(client, sock, is_unix);
868
869         change_client_state(client, CL_LOGIN);
870
871         res = sbuf_accept(&client->sbuf, sock, is_unix);
872         if (!res) {
873                 if (cf_log_connections)
874                         slog_debug(client, "failed connection attempt");
875                 return NULL;
876         }
877
878         return client;
879 }
880
881 /* send cached parameters to client to pretend being server */
882 /* client managed to authenticate, send welcome msg and accept queries */
883 bool finish_client_login(PgSocket *client)
884 {
885         switch (client->state) {
886         case CL_LOGIN:
887                 change_client_state(client, CL_ACTIVE);
888         case CL_ACTIVE:
889                 break;
890         default:
891                 fatal("bad client state");
892         }
893
894         if (!welcome_client(client)) {
895                 log_debug("finish_client_login: no welcome message, pause");
896                 client->wait_for_welcome = 1;
897                 pause_client(client);
898                 if (cf_pause_mode == P_NONE)
899                         launch_new_connection(client->pool);
900                 return false;
901         }
902         client->wait_for_welcome = 0;
903
904         slog_debug(client, "logged in");
905
906         return true;
907 }
908
909 /* client->cancel_key has requested client key */
910 void accept_cancel_request(PgSocket *req)
911 {
912         List *pitem, *citem;
913         PgPool *pool;
914         PgSocket *server = NULL, *client, *main_client = NULL;
915
916         Assert(req->state == CL_LOGIN);
917
918         /* find real client this is for */
919         statlist_for_each(pitem, &pool_list) {
920                 pool = container_of(pitem, PgPool, head);
921                 statlist_for_each(citem, &pool->active_client_list) {
922                         client = container_of(citem, PgSocket, head);
923                         if (memcmp(client->cancel_key, req->cancel_key, 8) == 0) {
924                                 main_client = client;
925                                 goto found;
926                         }
927                 }
928         }
929 found:
930
931         /* wrong key */
932         if (!main_client) {
933                 disconnect_client(req, false, "failed cancel request");
934                 return;
935         }
936
937         /* not linked client, just drop it then */
938         if (!main_client->link) {
939                 bool res;
940
941                 /* let administrative cancel be handled elsewhere */
942                 if (main_client->pool->db->admin) {
943                         disconnect_client(req, false, "cancel request for console client");
944                         admin_handle_cancel(main_client);
945                         return;
946                 }
947
948                 disconnect_client(req, false, "cancel request for idle client");
949
950                 /* notify readiness */
951                 SEND_ReadyForQuery(res, main_client);
952                 if (!res)
953                         disconnect_client(main_client, true,
954                                           "ReadyForQuery for main_client failed");
955                 return;
956         }
957
958         /* drop the connection, if fails, retry later in justfree list */
959         if (!sbuf_close(&req->sbuf))
960                 log_noise("sbuf_close failed, retry later");
961
962         /* remember server key */
963         server = main_client->link;
964         memcpy(req->cancel_key, server->cancel_key, 8);
965
966         /* attach to target pool */
967         req->pool = pool;
968         change_client_state(req, CL_CANCEL);
969
970         /* need fresh connection */
971         launch_new_connection(pool);
972 }
973
974 void forward_cancel_request(PgSocket *server)
975 {
976         bool res;
977         PgSocket *req = first_socket(&server->pool->cancel_req_list);
978
979         Assert(req != NULL && req->state == CL_CANCEL);
980         Assert(server->state == SV_LOGIN);
981
982         SEND_CancelRequest(res, server, req->cancel_key);
983
984         change_client_state(req, CL_JUSTFREE);
985 }
986
987 bool use_client_socket(int fd, PgAddr *addr,
988                        const char *dbname, const char *username,
989                        uint64_t ckey, int oldfd, int linkfd,
990                        const char *client_enc, const char *std_string,
991                        const char *datestyle, const char *timezone)
992 {
993         PgSocket *client;
994         PktBuf tmp;
995
996         client = accept_client(fd, NULL, addr->is_unix);
997         if (client == NULL)
998                 return false;
999         client->suspended = 1;
1000
1001         if (!set_pool(client, dbname, username))
1002                 return false;
1003
1004         change_client_state(client, CL_ACTIVE);
1005
1006         /* store old cancel key */
1007         pktbuf_static(&tmp, client->cancel_key, 8);
1008         pktbuf_put_uint64(&tmp, ckey);
1009
1010         /* store old fds */
1011         client->tmp_sk_oldfd = oldfd;
1012         client->tmp_sk_linkfd = linkfd;
1013
1014         varcache_set(&client->vars, "client_encoding", client_enc);
1015         varcache_set(&client->vars, "standard_conforming_strings", std_string);
1016         varcache_set(&client->vars, "datestyle", datestyle);
1017         varcache_set(&client->vars, "timezone", timezone);
1018
1019         return true;
1020 }
1021
1022 bool use_server_socket(int fd, PgAddr *addr,
1023                        const char *dbname, const char *username,
1024                        uint64_t ckey, int oldfd, int linkfd,
1025                        const char *client_enc, const char *std_string,
1026                        const char *datestyle, const char *timezone)
1027 {
1028         PgDatabase *db = find_database(dbname);
1029         PgUser *user;
1030         PgPool *pool;
1031         PgSocket *server;
1032         PktBuf tmp;
1033         bool res;
1034         
1035         /* if the database not found, it's an auto database -> registering... */
1036         if (!db) {
1037                 db = register_auto_database(dbname);
1038                 if (!db)
1039                         return true;
1040         }
1041
1042         if (db->forced_user)
1043                 user = db->forced_user;
1044         else
1045                 user = find_user(username);
1046
1047         pool = get_pool(db, user);
1048         if (!pool)
1049                 return false;
1050
1051         server = obj_alloc(server_cache);
1052         if (!server)
1053                 return false;
1054
1055         res = sbuf_accept(&server->sbuf, fd, addr->is_unix);
1056         if (!res)
1057                 return false;
1058
1059         server->suspended = 1;
1060         server->pool = pool;
1061         server->auth_user = user;
1062         server->connect_time = server->request_time = get_cached_time();
1063         server->query_start = 0;
1064
1065         fill_remote_addr(server, fd, addr->is_unix);
1066         fill_local_addr(server, fd, addr->is_unix);
1067
1068         if (linkfd) {
1069                 server->ready = 0;
1070                 change_server_state(server, SV_ACTIVE);
1071         } else {
1072                 server->ready = 1;
1073                 change_server_state(server, SV_IDLE);
1074         }
1075
1076         /* store old cancel key */
1077         pktbuf_static(&tmp, server->cancel_key, 8);
1078         pktbuf_put_uint64(&tmp, ckey);
1079
1080         /* store old fds */
1081         server->tmp_sk_oldfd = oldfd;
1082         server->tmp_sk_linkfd = linkfd;
1083
1084         varcache_set(&server->vars, "client_encoding", client_enc);
1085         varcache_set(&server->vars, "standard_conforming_strings", std_string);
1086         varcache_set(&server->vars, "datestyle", datestyle);
1087         varcache_set(&server->vars, "timezone", timezone);
1088
1089         return true;
1090 }
1091
1092 void for_each_server(PgPool *pool, void (*func)(PgSocket *sk))
1093 {
1094         List *item;
1095
1096         statlist_for_each(item, &pool->idle_server_list)
1097                 func(container_of(item, PgSocket, head));
1098
1099         statlist_for_each(item, &pool->used_server_list)
1100                 func(container_of(item, PgSocket, head));
1101
1102         statlist_for_each(item, &pool->tested_server_list)
1103                 func(container_of(item, PgSocket, head));
1104
1105         statlist_for_each(item, &pool->active_server_list)
1106                 func(container_of(item, PgSocket, head));
1107
1108         statlist_for_each(item, &pool->new_server_list)
1109                 func(container_of(item, PgSocket, head));
1110 }
1111
1112 static void tag_dirty(PgSocket *sk)
1113 {
1114         sk->close_needed = 1;
1115 }
1116
1117 void tag_database_dirty(PgDatabase *db)
1118 {
1119         List *item;
1120         PgPool *pool;
1121
1122         statlist_for_each(item, &pool_list) {
1123                 pool = container_of(item, PgPool, head);
1124                 if (pool->db == db)
1125                         for_each_server(pool, tag_dirty);
1126         }
1127 }
1128
1129 /* move objects from justfree_* to free_* lists */
1130 void reuse_just_freed_objects(void)
1131 {
1132         List *tmp, *item;
1133         PgSocket *sk;
1134         bool close_works = true;
1135
1136         /*
1137          * event_del() may fail because of ENOMEM for event handlers
1138          * that need only changes sent to kernel on each loop.
1139          *
1140          * Keep open sbufs in justfree lists until successful.
1141          */
1142
1143         statlist_for_each_safe(item, &justfree_client_list, tmp) {
1144                 sk = container_of(item, PgSocket, head);
1145                 if (sbuf_is_closed(&sk->sbuf))
1146                         change_client_state(sk, CL_FREE);
1147                 else if (close_works)
1148                         close_works = sbuf_close(&sk->sbuf);
1149         }
1150         statlist_for_each_safe(item, &justfree_server_list, tmp) {
1151                 sk = container_of(item, PgSocket, head);
1152                 if (sbuf_is_closed(&sk->sbuf))
1153                         change_server_state(sk, SV_FREE);
1154                 else if (close_works)
1155                         close_works = sbuf_close(&sk->sbuf);
1156         }
1157 }
1158