2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 * Handling of pooler listening sockets
25 #include <usual/netdb.h>
/* List of ListenSocket entries; add_listen() and use_pooler_socket() append to it. */
35 static STATLIST(sock_list);
37 /* hints for getaddrinfo(listen_addr) */
/* Any address family, TCP stream sockets; AI_PASSIVE asks for addresses suitable for bind(). */
38 static const struct addrinfo hints = {
39 .ai_family = AF_UNSPEC,
40 .ai_socktype = SOCK_STREAM,
41 .ai_protocol = IPPROTO_TCP,
42 .ai_flags = AI_PASSIVE,
45 /* should listening sockets be active or suspended? */
/* NOTE(review): the assignments to need_active are not visible in this excerpt. */
46 static bool need_active = false;
47 /* is it actually active or suspended? */
/* Cleared in suspend_pooler(), set in resume_pooler(), compared with need_active in per_loop_pooler_maint(). */
48 static bool pooler_active = false;
50 /* on accept() failure sleep 5 seconds */
/* ev_err is the timer event armed by pool_accept(); err_timeout is the delay it is armed with. */
51 static struct event ev_err;
52 static struct timeval err_timeout = {5, 0};
/* Forward declaration: add_listen() calls tune_accept() before its definition. */
54 static void tune_accept(int sock, bool on);
56 /* atexit() cleanup func; registered by pooler_setup() */
57 static void cleanup_sockets(void)
59 struct ListenSocket *ls;
62 /* avoid cleanup if exit() while suspended */
63 if (cf_pause_mode == P_SUSPEND)
/* drain sock_list one entry at a time */
66 while ((el = statlist_pop(&sock_list)) != NULL) {
67 ls = container_of(el, struct ListenSocket, node);
/* for unix sockets, rebuild the socket file path from the configured dir and port */
/* NOTE(review): the statement that consumes buf is not visible in this excerpt; presumably it removes the socket file -- confirm */
72 if (pga_is_unix(&ls->addr)) {
73 char buf[sizeof(struct sockaddr_un) + 20];
74 snprintf(buf, sizeof(buf), "%s/.s.PGSQL.%d", cf_unix_socket_dir, cf_listen_port);
77 statlist_remove(&sock_list, &ls->node);
83 * initialize another listening socket.
/*
 * Create a socket, bind it, start listening and register it in sock_list.
 *
 * af    - address family passed to socket()
 * sa    - address to bind to
 * salen - length of sa, passed to bind()
 *
 * errpos is updated before each step so that the warning at the end can
 * name the call that failed.
 * NOTE(review): the return statements are not visible in this excerpt;
 * parse_addr() stores the result in its ok flag.
 */
85 static bool add_listen(int af, const struct sockaddr *sa, int salen)
87 struct ListenSocket *ls;
92 log_debug("add_listen: %s", sa2str(sa, buf, sizeof(buf)));
96 sock = socket(af, SOCK_STREAM, 0);
100 /* SO_REUSEADDR behaviour is the default on WIN32. */
102 /* relaxed binding */
105 errpos = "setsockopt";
106 res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
113 /* avoid ipv6 socket's attempt to takeover ipv4 port */
114 if (af == AF_INET6) {
116 errpos = "setsockopt/IPV6_V6ONLY";
117 res = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
125 res = bind(sock, sa, salen);
129 /* set common options */
130 errpos = "tune_socket";
131 if (!tune_socket(sock, (af == AF_UNIX)))
134 /* finally, accept connections */
136 res = listen(sock, cf_listen_backlog);
/* bookkeeping entry for the new socket; calloc() leaves it zero-filled */
141 ls = calloc(1, sizeof(*ls));
145 list_init(&ls->node);
/* unix sockets are recorded via pga_set() with the listen port; otherwise the bound sockaddr is copied */
147 if (sa->sa_family == AF_UNIX) {
148 pga_set(&ls->addr, AF_UNIX, cf_listen_port);
150 pga_copy(&ls->addr, sa);
/* pass the configured socket mode and group to change_file_mode() for the socket path */
/* NOTE(review): the conditions guarding the next three statements are not visible in this excerpt */
154 struct sockaddr_un *un = (struct sockaddr_un *)sa;
155 change_file_mode(un->sun_path, cf_unix_socket_mode, NULL, cf_unix_socket_group);
157 tune_accept(sock, cf_tcp_defer_accept);
160 log_info("listening on %s", sa2str(sa, buf, sizeof(buf)));
161 statlist_append(&sock_list, &ls->node);
/* failure path: report the address, the failing step (errpos) and errno */
165 log_warning("cannot listen on %s: %s(): %s",
166 sa2str(sa, buf, sizeof(buf)),
167 errpos, strerror(errno));
/*
 * Set up the unix-domain listening socket at <socket_dir>/.s.PGSQL.<listen_port>.
 *
 * Checks for a <path>.lock file first, then hands the address to add_listen().
 * The result of add_listen() is not checked here.
 */
173 static void create_unix_socket(const char *socket_dir, int listen_port)
175 struct sockaddr_un un;
/* room for the socket path plus the .lock suffix */
177 char lockfile[sizeof(struct sockaddr_un) + 10];
180 /* fill sockaddr struct */
181 memset(&un, 0, sizeof(un));
182 un.sun_family = AF_UNIX;
183 snprintf(un.sun_path, sizeof(un.sun_path),
184 "%s/.s.PGSQL.%d", socket_dir, listen_port);
186 /* check for lockfile */
187 snprintf(lockfile, sizeof(lockfile), "%s.lock", un.sun_path);
188 res = lstat(lockfile, &st);
/* NOTE(review): the condition on res that leads to fatal() is not visible in this excerpt */
190 fatal("unix port %d is in use", listen_port);
192 /* expect old bouncer gone */
195 add_listen(AF_UNIX, (const struct sockaddr *)&un, sizeof(un));
199 * Notify pooler only when data has also arrived.
201 * optval specifies how long after connection attempt to wait for data.
203 * Related to tcp_synack_retries sysctl, default 5 (corresponds to 180 secs).
205 * SO_ACCEPTFILTER needs to be set after listen(), maybe TCP_DEFER_ACCEPT too.
/*
 * Install or uninstall the deferred-accept mechanism on one listening socket.
 *
 * sock - listening socket
 * on   - true to install, false to uninstall (only used for log text and
 *        the SO_ACCEPTFILTER argument in the lines visible here)
 *
 * Uses TCP_DEFER_ACCEPT or SO_ACCEPTFILTER, whichever macro is defined.
 * A failure is only logged as a warning.
 */
207 static void tune_accept(int sock, bool on)
209 const char *act = on ? "install" : "uninstall";
211 #ifdef TCP_DEFER_ACCEPT
212 int val = 45; /* FIXME: proper value */
213 socklen_t vlen = sizeof(val);
/* read and log the current setting before changing it */
214 res = getsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, &vlen);
215 log_noise("old TCP_DEFER_ACCEPT on %d = %d", sock, val);
/* NOTE(review): the line between the two log calls is not visible in this excerpt; val may be reassigned there */
217 log_noise("%s TCP_DEFER_ACCEPT on %d", act, sock);
218 res = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val));
221 #ifdef SO_ACCEPTFILTER
/* when uninstalling, a NULL argument with zero length is passed to setsockopt() */
222 struct accept_filter_arg af, *afp = on ? &af : NULL;
223 socklen_t af_len = on ? sizeof(af) : 0;
224 memset(&af, 0, sizeof(af));
225 strcpy(af.af_name, "dataready");
226 log_noise("%s SO_ACCEPTFILTER on %d", act, sock);
227 res = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, afp, af_len);
232 log_warning("tune_accept: %s TCP_DEFER_ACCEPT/SO_ACCEPTFILTER: %s",
233 act, strerror(errno));
/* Apply tune_accept(fd, on) to every listening socket in sock_list that is not a unix socket. */
236 void pooler_tune_accept(bool on)
239 struct ListenSocket *ls;
240 statlist_for_each(el, &sock_list) {
241 ls = container_of(el, struct ListenSocket, node);
/* unix sockets are skipped */
242 if (!pga_is_unix(&ls->addr))
243 tune_accept(ls->fd, on);
/*
 * Timer callback for ev_err, installed by pool_accept() after accept() fails.
 * NOTE(review): the action taken when not in P_SUSPEND mode is not visible
 * in this excerpt.
 */
247 static void err_wait_func(int sock, short flags, void *arg)
249 if (cf_pause_mode != P_SUSPEND)
/*
 * Format an address pair as src-ip:src-port -> dst-ip:dst-port for logging.
 *
 * src - address shown on the left of the arrow
 * dst - address shown on the right of the arrow
 *
 * The text is formatted into static buffers, so it is overwritten by the
 * next call and the function is not reentrant.
 * NOTE(review): the unix-socket result and the return statements are on
 * lines not visible in this excerpt.
 */
253 static const char *addrpair(const PgAddr *src, const PgAddr *dst)
255 static char ip1buf[PGADDR_BUF], ip2buf[PGADDR_BUF],
256 buf[2*PGADDR_BUF + 16];
257 const char *ip1, *ip2;
258 if (pga_is_unix(src))
261 ip1 = pga_ntop(src, ip1buf, sizeof(ip1buf));
/* the right-hand side must be formatted from dst, matching pga_port(dst) below */
262 ip2 = pga_ntop(dst, ip2buf, sizeof(ip2buf));
263 snprintf(buf, sizeof(buf), "%s:%d -> %s:%d",
264 ip1, pga_port(src), ip2, pga_port(dst));
/*
 * Describe the two endpoints of a socket via addrpair().
 * Server sockets are shown as local -> remote, all other sockets as
 * remote -> local.
 */
268 static const char *conninfo(const PgSocket *sk)
270 if (is_server_socket(sk)) {
271 return addrpair(&sk->local_addr, &sk->remote_addr);
273 return addrpair(&sk->remote_addr, &sk->local_addr);
277 /* got new connection, associate it with client struct */
/*
 * Event callback that resume_pooler() registers on each listening socket.
 *
 * sock  - listening socket that became readable
 * flags - event flags; EV_READ is expected
 * arg   - the ListenSocket the event was registered for
 */
278 static void pool_accept(int sock, short flags, void *arg)
280 struct ListenSocket *ls = arg;
/* peer address storage with IPv4, IPv6 and unix variants */
284 struct sockaddr_in in;
285 struct sockaddr_in6 in6;
286 struct sockaddr_un un;
289 socklen_t len = sizeof(raddr);
290 bool is_unix = pga_is_unix(&ls->addr);
292 if(!(flags & EV_READ)) {
293 log_warning("no EV_READ in pool_accept");
298 fd = safe_accept(sock, &raddr.sa, &len);
/* NOTE(review): the handling for ECONNABORTED and the preceding branch are not visible in this excerpt */
302 else if (errno == ECONNABORTED)
306 * probably fd limit, pointless to try often
307 * wait a bit, hope that admin resolves somehow
309 log_error("accept() failed: %s", strerror(errno));
/* arm the ev_err timer: err_wait_func() is called after err_timeout */
310 evtimer_set(&ev_err, err_wait_func, NULL);
311 safe_evtimer_add(&ev_err, &err_timeout);
316 log_noise("new fd from accept=%d", fd);
/* NOTE(review): the condition choosing between the two calls is not visible; presumably is_unix -- confirm */
318 client = accept_client(fd, true);
320 client = accept_client(fd, false);
324 slog_debug(client, "P: got connection: %s", conninfo(client));
327 * there may be several clients waiting,
328 * avoid context switch by looping
/*
 * Register an already-open socket as a pooler listening socket.
 *
 * sock    - the socket descriptor to register
 * is_unix - whether sock is a unix-domain socket
 *
 * The socket is passed through tune_socket(), a ListenSocket entry is
 * allocated for it and appended to sock_list.
 * NOTE(review): the return statements are not visible in this excerpt.
 */
333 bool use_pooler_socket(int sock, bool is_unix)
335 struct ListenSocket *ls;
337 char buf[PGADDR_BUF];
339 if (!tune_socket(sock, is_unix))
342 ls = calloc(1, sizeof(*ls));
/* NOTE(review): the branch condition is not visible; presumably is_unix selects pga_set(), otherwise getsockname() supplies the address */
347 pga_set(&ls->addr, AF_UNIX, cf_listen_port);
349 struct sockaddr_storage ss;
350 socklen_t len = sizeof(ss);
351 res = getsockname(sock, (struct sockaddr *)&ss, &len);
353 log_error("getsockname failed");
357 pga_copy(&ls->addr, (struct sockaddr *)&ss);
359 log_info("got pooler socket: %s", pga_str(&ls->addr, buf, sizeof(buf)));
360 statlist_append(&sock_list, &ls->node);
/*
 * Stop accepting connections: remove the accept event of every listening
 * socket in sock_list, then mark the pooler as inactive.
 * NOTE(review): what follows the warning on event_del() failure is not
 * visible in this excerpt.
 */
364 void suspend_pooler(void)
367 struct ListenSocket *ls;
370 statlist_for_each(el, &sock_list) {
371 ls = container_of(el, struct ListenSocket, node);
374 if (event_del(&ls->ev) < 0) {
375 log_warning("suspend_pooler, event_del: %s", strerror(errno));
380 pooler_active = false;
/*
 * Start accepting connections: register pool_accept() as a persistent
 * read-event callback on every listening socket in sock_list, then mark
 * the pooler as active.
 * NOTE(review): what follows the warning on event_add() failure is not
 * visible in this excerpt.
 */
383 void resume_pooler(void)
386 struct ListenSocket *ls;
389 statlist_for_each(el, &sock_list) {
390 ls = container_of(el, struct ListenSocket, node);
/* the ListenSocket itself is passed as the callback argument */
393 event_set(&ls->ev, ls->fd, EV_READ | EV_PERSIST, pool_accept, ls);
394 if (event_add(&ls->ev, NULL) < 0) {
395 log_warning("event_add failed: %s", strerror(errno));
400 pooler_active = true;
403 /* retry previously failed suspend_pooler() / resume_pooler() */
/*
 * Compares the wanted state (need_active) with the actual state (pooler_active).
 * NOTE(review): the calls made in each branch are not visible in this
 * excerpt; presumably resume_pooler() and suspend_pooler() respectively.
 */
404 void per_loop_pooler_maint(void)
406 if (need_active && !pooler_active)
408 else if (!need_active && pooler_active)
/*
 * Callback passed to parse_word_list() by pooler_setup(): resolve one
 * listen address and try to listen on the results.
 *
 * arg  - callback argument; pooler_setup() passes NULL
 * addr - one address from the configured list
 *
 * NOTE(review): the return statements are not visible in this excerpt.
 */
412 static bool parse_addr(void *arg, const char *addr)
416 struct addrinfo *ai, *gaires = NULL;
/* NOTE(review): the action for the wildcard is not visible; the fatal() message below prints it when addr is NULL */
421 if (strcmp(addr, "*") == 0)
/* getaddrinfo() takes the port as a service string */
423 snprintf(service, sizeof(service), "%d", cf_listen_port);
425 res = getaddrinfo(addr, service, &hints, &gaires);
427 fatal("getaddrinfo('%s', '%d') = %s [%d]", addr ? addr : "*",
428 cf_listen_port, gai_strerror(res), res);
/* walk the result list and try to listen on each address */
431 for (ai = gaires; ai; ai = ai->ai_next) {
432 ok = add_listen(ai->ai_family, ai->ai_addr, ai->ai_addrlen);
433 /* it's unclear whether all or only first result should be used */
438 freeaddrinfo(gaires);
442 /* listen on socket - should happen after all other initializations */
/*
 * Registers cleanup_sockets() with atexit(), opens the sockets listed in
 * cf_listen_addr and, if cf_unix_socket_dir is non-empty, the unix socket.
 * Calls fatal() if the address list cannot be parsed or if no socket ends
 * up in sock_list.
 */
443 void pooler_setup(void)
/* NOTE(review): how init_done is tested and updated is not visible in this excerpt */
446 static int init_done = 0;
449 /* remove socket on shutdown */
450 atexit(cleanup_sockets);
/* parse_addr() is invoked by parse_word_list() with a NULL argument */
454 ok = parse_word_list(cf_listen_addr, parse_addr, NULL);
456 fatal("failed to parse listen_addr list: %s", cf_listen_addr);
458 if (cf_unix_socket_dir && *cf_unix_socket_dir)
459 create_unix_socket(cf_unix_socket_dir, cf_listen_port);
461 if (!statlist_count(&sock_list))
462 fatal("nowhere to listen on");
/*
 * Call cbfunc(arg, fd, addr) for each listening socket in sock_list.
 *
 * cbfunc - callback receiving arg, the socket fd and a pointer to its address
 * arg    - passed through to cbfunc unchanged
 *
 * NOTE(review): how the callback result ok affects the loop and the return
 * value is not visible in this excerpt.
 */
467 bool for_each_pooler_fd(pooler_cb cbfunc, void *arg)
470 struct ListenSocket *ls;
473 statlist_for_each(el, &sock_list) {
474 ls = container_of(el, struct ListenSocket, node);
475 ok = cbfunc(arg, ls->fd, &ls->addr);