]> granicus.if.org Git - pgbouncer/blob - src/pooler.c
277d0f216624924bac9fce6ea69ede5cb2a422e4
[pgbouncer] / src / pooler.c
1 /*
2  * PgBouncer - Lightweight connection pooler for PostgreSQL.
3  *
4  * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
5  *
6  * Permission to use, copy, modify, and/or distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18
19 /*
20  * Handling of pooler listening sockets
21  */
22
23 #include "bouncer.h"
24
25 #include <usual/netdb.h>
26
27 struct ListenSocket {
28         struct List node;
29         int fd;
30         bool active;
31         struct event ev;
32         PgAddr addr;
33 };
34
35 static STATLIST(sock_list);
36
37 /* hints for getaddrinfo(listen_addr) */
38 static const struct addrinfo hints = {
39         .ai_family = AF_UNSPEC,
40         .ai_socktype = SOCK_STREAM,
41         .ai_protocol = IPPROTO_TCP,
42         .ai_flags = AI_PASSIVE,
43 };
44
45 /* should listening sockets be active or suspended? */
46 static bool need_active = false;
47 /* is it actually active or suspended? */
48 static bool pooler_active = false;
49
50 /* on accept() failure sleep 5 seconds */
51 static struct event ev_err;
52 static struct timeval err_timeout = {5, 0};
53
54 static void tune_accept(int sock, bool on);
55
56 /* atexit() cleanup func */
57 static void cleanup_sockets(void)
58 {
59         struct ListenSocket *ls;
60         struct List *el;
61
62         /* avoid cleanup if exit() while suspended */
63         if (cf_pause_mode == P_SUSPEND)
64                 return;
65
66         while ((el = statlist_pop(&sock_list)) != NULL) {
67                 ls = container_of(el, struct ListenSocket, node);
68                 if (ls->fd > 0) {
69                         safe_close(ls->fd);
70                         ls->fd = 0;
71                 }
72                 if (pga_is_unix(&ls->addr)) {
73                         char buf[sizeof(struct sockaddr_un) + 20];
74                         snprintf(buf, sizeof(buf), "%s/.s.PGSQL.%d", cf_unix_socket_dir, cf_listen_port);
75                         unlink(buf);
76                 }
77                 statlist_remove(&sock_list, &ls->node);
78                 free(ls);
79         }
80 }
81
82 /*
83  * initialize another listening socket.
84  */
85 static bool add_listen(int af, const struct sockaddr *sa, int salen)
86 {
87         struct ListenSocket *ls;
88         int sock, res;
89         char buf[128];
90         const char *errpos;
91
92         log_debug("add_listen: %s", sa2str(sa, buf, sizeof(buf)));
93
94         /* create socket */
95         errpos = "socket";
96         sock = socket(af, SOCK_STREAM, 0);
97         if (sock < 0)
98                 goto failed;
99
100         /* SO_REUSEADDR behaviour it default in WIN32.  */
101 #ifndef WIN32
102         /* relaxed binding */
103         if (af != AF_UNIX) {
104                 int val = 1;
105                 errpos = "setsockopt";
106                 res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
107                 if (res < 0)
108                         goto failed;
109         }
110 #endif
111
112 #ifdef IPV6_V6ONLY
113         /* avoid ipv6 socket's attempt to takeover ipv4 port */
114         if (af == AF_INET6) {
115                 int val = 1;
116                 errpos = "setsockopt/IPV6_V6ONLY";
117                 res = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
118                 if (res < 0)
119                         goto failed;
120         }
121 #endif
122
123         /* bind it */
124         errpos = "bind";
125         res = bind(sock, sa, salen);
126         if (res < 0)
127                 goto failed;
128
129         /* set common options */
130         errpos = "tune_socket";
131         if (!tune_socket(sock, (af == AF_UNIX)))
132                 goto failed;
133
134         /* finally, accept connections */
135         errpos = "listen";
136         res = listen(sock, cf_listen_backlog);
137         if (res < 0)
138                 goto failed;
139
140         errpos = "calloc";
141         ls = calloc(1, sizeof(*ls));
142         if (!ls)
143                 goto failed;
144
145         list_init(&ls->node);
146         ls->fd = sock;
147         if (sa->sa_family == AF_UNIX) {
148                 pga_set(&ls->addr, AF_UNIX, cf_listen_port);
149         } else {
150                 pga_copy(&ls->addr, sa);
151         }
152
153         if (af == AF_UNIX) {
154                 struct sockaddr_un *un = (struct sockaddr_un *)sa;
155                 change_file_mode(un->sun_path, cf_unix_socket_mode, NULL, cf_unix_socket_group);
156         } else {
157                 tune_accept(sock, cf_tcp_defer_accept);
158         }
159
160         log_info("listening on %s", sa2str(sa, buf, sizeof(buf)));
161         statlist_append(&sock_list, &ls->node);
162         return true;
163
164 failed:
165         log_warning("cannot listen on %s: %s(): %s",
166                     sa2str(sa, buf, sizeof(buf)),
167                     errpos, strerror(errno));
168         if (sock >= 0)
169                 safe_close(sock);
170         return false;
171 }
172
173 static void create_unix_socket(const char *socket_dir, int listen_port)
174 {
175         struct sockaddr_un un;
176         int res;
177         char lockfile[sizeof(struct sockaddr_un) + 10];
178         struct stat st;
179
180         /* fill sockaddr struct */
181         memset(&un, 0, sizeof(un));
182         un.sun_family = AF_UNIX;
183         snprintf(un.sun_path, sizeof(un.sun_path),
184                 "%s/.s.PGSQL.%d", socket_dir, listen_port);
185
186         /* check for lockfile */
187         snprintf(lockfile, sizeof(lockfile), "%s.lock", un.sun_path);
188         res = lstat(lockfile, &st);
189         if (res == 0)
190                 fatal("unix port %d is in use", listen_port);
191
192         /* expect old bouncer gone */
193         unlink(un.sun_path);
194
195         add_listen(AF_UNIX, (const struct sockaddr *)&un, sizeof(un));
196 }
197
198 /*
199  * Notify pooler only when also data is arrived.
200  *
201  * optval specifies how long after connection attempt to wait for data.
202  *
203  * Related to tcp_synack_retries sysctl, default 5 (corresponds 180 secs).
204  *
205  * SO_ACCEPTFILTER needs to be set after listen(), maybe TCP_DEFER_ACCEPT too.
206  */
207 static void tune_accept(int sock, bool on)
208 {
209         const char *act = on ? "install" : "uninstall";
210         int res = 0;
211 #ifdef TCP_DEFER_ACCEPT
212         int val = 45; /* FIXME: proper value */
213         socklen_t vlen = sizeof(val);
214         res = getsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, &vlen);
215         log_noise("old TCP_DEFER_ACCEPT on %d = %d", sock, val);
216         val = on ? 1 : 0;
217         log_noise("%s TCP_DEFER_ACCEPT on %d", act, sock);
218         res = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val));
219 #else
220 #if 0
221 #ifdef SO_ACCEPTFILTER
222         struct accept_filter_arg af, *afp = on ? &af : NULL;
223         socklen_t af_len = on ? sizeof(af) : 0;
224         memset(&af, 0, sizeof(af));
225         strcpy(af.af_name, "dataready");
226         log_noise("%s SO_ACCEPTFILTER on %d", act, sock);
227         res = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, afp, af_len);
228 #endif
229 #endif
230 #endif
231         if (res < 0)
232                 log_warning("tune_accept: %s TCP_DEFER_ACCEPT/SO_ACCEPTFILTER: %s",
233                             act, strerror(errno));
234 }
235
236 void pooler_tune_accept(bool on)
237 {
238         struct List *el;
239         struct ListenSocket *ls;
240         statlist_for_each(el, &sock_list) {
241                 ls = container_of(el, struct ListenSocket, node);
242                 if (!pga_is_unix(&ls->addr))
243                         tune_accept(ls->fd, on);
244         }
245 }
246
247 static void err_wait_func(int sock, short flags, void *arg)
248 {
249         if (cf_pause_mode != P_SUSPEND)
250                 resume_pooler();
251 }
252
253 static const char *addrpair(const PgAddr *src, const PgAddr *dst)
254 {
255         static char ip1buf[PGADDR_BUF], ip2buf[PGADDR_BUF],
256                     buf[2*PGADDR_BUF + 16];
257         const char *ip1, *ip2;
258         if (pga_is_unix(src))
259                 return "unix->unix";
260
261         ip1 = pga_ntop(src, ip1buf, sizeof(ip1buf));
262         ip2 = pga_ntop(src, ip2buf, sizeof(ip2buf));
263         snprintf(buf, sizeof(buf), "%s:%d -> %s:%d",
264                  ip1, pga_port(src), ip2, pga_port(dst));
265         return buf;
266 }
267
268 static const char *conninfo(const PgSocket *sk)
269 {
270         if (is_server_socket(sk)) {
271                 return addrpair(&sk->local_addr, &sk->remote_addr);
272         } else {
273                 return addrpair(&sk->remote_addr, &sk->local_addr);
274         }
275 }
276
277 /* got new connection, associate it with client struct */
278 static void pool_accept(int sock, short flags, void *arg)
279 {
280         struct ListenSocket *ls = arg;
281         int fd;
282         PgSocket *client;
283         union {
284                 struct sockaddr_in in;
285                 struct sockaddr_in6 in6;
286                 struct sockaddr_un un;
287                 struct sockaddr sa;
288         } raddr;
289         socklen_t len = sizeof(raddr);
290         bool is_unix = pga_is_unix(&ls->addr);
291
292         if(!(flags & EV_READ)) {
293                 log_warning("no EV_READ in pool_accept");
294                 return;
295         }
296 loop:
297         /* get fd */
298         fd = safe_accept(sock, &raddr.sa, &len);
299         if (fd < 0) {
300                 if (errno == EAGAIN)
301                         return;
302                 else if (errno == ECONNABORTED)
303                         return;
304
305                 /*
306                  * probably fd limit, pointless to try often
307                  * wait a bit, hope that admin resolves somehow
308                  */
309                 log_error("accept() failed: %s", strerror(errno));
310                 evtimer_set(&ev_err, err_wait_func, NULL);
311                 safe_evtimer_add(&ev_err, &err_timeout);
312                 suspend_pooler();
313                 return;
314         }
315
316         log_noise("new fd from accept=%d", fd);
317         if (is_unix) {
318                 client = accept_client(fd, true);
319         } else {
320                 client = accept_client(fd, false);
321         }
322
323         if (client)
324                 slog_debug(client, "P: got connection: %s", conninfo(client));
325
326         /*
327          * there may be several clients waiting,
328          * avoid context switch by looping
329          */
330         goto loop;
331 }
332
333 bool use_pooler_socket(int sock, bool is_unix)
334 {
335         struct ListenSocket *ls;
336         int res;
337         char buf[PGADDR_BUF];
338
339         if (!tune_socket(sock, is_unix))
340                 return false;
341
342         ls = calloc(1, sizeof(*ls));
343         if (!ls)
344                 return false;
345         ls->fd = sock;
346         if (is_unix) {
347                 pga_set(&ls->addr, AF_UNIX, cf_listen_port);
348         } else {
349                 struct sockaddr_storage ss;
350                 socklen_t len = sizeof(ss);
351                 res = getsockname(sock, (struct sockaddr *)&ss, &len);
352                 if (res < 0) {
353                         log_error("getsockname failed");
354                         free(ls);
355                         return false;
356                 }
357                 pga_copy(&ls->addr, (struct sockaddr *)&ss);
358         }
359         log_info("got pooler socket: %s", pga_str(&ls->addr, buf, sizeof(buf)));
360         statlist_append(&sock_list, &ls->node);
361         return true;
362 }
363
364 void suspend_pooler(void)
365 {
366         struct List *el;
367         struct ListenSocket *ls;
368
369         need_active = false;
370         statlist_for_each(el, &sock_list) {
371                 ls = container_of(el, struct ListenSocket, node);
372                 if (!ls->active)
373                         continue;
374                 if (event_del(&ls->ev) < 0) {
375                         log_warning("suspend_pooler, event_del: %s", strerror(errno));
376                         return;
377                 }
378                 ls->active = false;
379         }
380         pooler_active = false;
381 }
382
383 void resume_pooler(void)
384 {
385         struct List *el;
386         struct ListenSocket *ls;
387
388         need_active = true;
389         statlist_for_each(el, &sock_list) {
390                 ls = container_of(el, struct ListenSocket, node);
391                 if (ls->active)
392                         continue;
393                 event_set(&ls->ev, ls->fd, EV_READ | EV_PERSIST, pool_accept, ls);
394                 if (event_add(&ls->ev, NULL) < 0) {
395                         log_warning("event_add failed: %s", strerror(errno));
396                         return;
397                 }
398                 ls->active = true;
399         }
400         pooler_active = true;
401 }
402
403 /* retry previously failed suspend_pooler() / resume_pooler() */
404 void per_loop_pooler_maint(void)
405 {
406         if (need_active && !pooler_active)
407                 resume_pooler();
408         else if (!need_active && pooler_active)
409                 suspend_pooler();
410 }
411
412 static bool parse_addr(void *arg, const char *addr)
413 {
414         int res;
415         char service[64];
416         struct addrinfo *ai, *gaires = NULL;
417         bool ok;
418
419         if (!*addr)
420                 return true;
421         if (strcmp(addr, "*") == 0)
422                 addr = NULL;
423         snprintf(service, sizeof(service), "%d", cf_listen_port);
424
425         res = getaddrinfo(addr, service, &hints, &gaires);
426         if (res != 0) {
427                 fatal("getaddrinfo('%s', '%d') = %s [%d]", addr ? addr : "*",
428                       cf_listen_port, gai_strerror(res), res);
429         }
430
431         for (ai = gaires; ai; ai = ai->ai_next) {
432                 ok = add_listen(ai->ai_family, ai->ai_addr, ai->ai_addrlen);
433                 /* it's unclear whether all or only first result should be used */
434                 if (0 && ok)
435                         break;
436         }
437
438         freeaddrinfo(gaires);
439         return true;
440 }
441
442 /* listen on socket - should happen after all other initializations */
443 void pooler_setup(void)
444 {
445         bool ok;
446         static int init_done = 0;
447
448         if (!init_done) {
449                 /* remove socket on shutdown */
450                 atexit(cleanup_sockets);
451                 init_done = 1;
452         }
453
454         ok = parse_word_list(cf_listen_addr, parse_addr, NULL);
455         if (!ok)
456                 fatal("failed to parse listen_addr list: %s", cf_listen_addr);
457
458         if (cf_unix_socket_dir && *cf_unix_socket_dir)
459                 create_unix_socket(cf_unix_socket_dir, cf_listen_port);
460
461         if (!statlist_count(&sock_list))
462                 fatal("nowhere to listen on");
463
464         resume_pooler();
465 }
466
467 bool for_each_pooler_fd(pooler_cb cbfunc, void *arg)
468 {
469         struct List *el;
470         struct ListenSocket *ls;
471         bool ok;
472
473         statlist_for_each(el, &sock_list) {
474                 ls = container_of(el, struct ListenSocket, node);
475                 ok = cbfunc(arg, ls->fd, &ls->addr);
476                 if (!ok)
477                         return false;
478         }
479         return true;
480 }