2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 * Handling of pooler listening sockets
/*
 * Listening socket descriptors.  pooler_setup() treats a value of 0 as
 * "socket not created yet" and fills these in from create_net_socket()
 * and create_unix_socket().
 */
25 static int fd_net = 0;
26 static int fd_unix = 0;
/* NOTE(review): fd_net_v6 is not referenced by any other line visible in this listing */
28 static int fd_net_v6 = 0;
/* libevent handles for the two listening sockets; configured in resume_pooler() */
32 static struct event ev_net;
33 static struct event ev_unix;
35 /* if sockets are registered in libevent */
36 static bool reg_net = false;
37 static bool reg_unix = false;
39 /* should listening sockets be active or suspended? */
40 static bool pooler_active = false;
42 /* on accept() failure sleep 5 seconds */
/* ev_err is armed by pool_accept() with err_wait_func as callback and err_timeout as delay */
43 static struct event ev_err;
44 static struct timeval err_timeout = {5, 0};
/*
 * Registered with atexit() by create_unix_socket().
 *
 * The visible lines format the unix socket path
 * "<cf_unix_socket_dir>/.s.PGSQL.<cf_listen_port>" into fn.
 *
 * NOTE(review): the declaration of fn, the check behind the "suspended"
 * comment and whatever is done with the path afterwards are not shown in
 * this listing - presumably the socket file is removed; confirm against
 * the full file.
 */
46 /* atexit() cleanup func */
47 static void cleanup_unix_socket(void)
51 /* avoid cleanup if exit() while suspended */
55 snprintf(fn, sizeof(fn), "%s/.s.PGSQL.%d",
56 cf_unix_socket_dir, cf_listen_port);
/*
 * Takes two int out-pointers, p_net and p_unix.
 *
 * NOTE(review): the body is not visible in this listing; presumably it
 * stores fd_net and fd_unix through the pointers - confirm.
 */
60 void get_pooler_fds(int *p_net, int *p_unix)
/*
 * Create the unix-domain listening socket at
 * "<socket_dir>/.s.PGSQL.<listen_port>".
 *
 * Steps visible in this listing, in order: build the socket path, lstat()
 * the "<path>.lock" file, create a PF_UNIX stream socket, bind() it,
 * register cleanup_unix_socket() with atexit(), apply tune_socket(),
 * listen() with cf_listen_backlog, chmod() the path and log the address.
 * Problems are reported through fatal() / fatal_perror().
 *
 * NOTE(review): the local declarations, the conditions guarding each
 * fatal()/fatal_perror() call, the statement under "expect old bouncer
 * gone" and the return statement are not shown here.  pooler_setup()
 * stores the return value in fd_unix.
 */
66 static int create_unix_socket(const char *socket_dir, int listen_port)
68 struct sockaddr_un un;
73 /* fill sockaddr struct */
74 memset(&un, 0, sizeof(un));
75 un.sun_family = AF_UNIX;
/* snprintf() bounds the path to sizeof(un.sun_path); truncation is not checked in the visible lines */
76 snprintf(un.sun_path, sizeof(un.sun_path),
77 "%s/.s.PGSQL.%d", socket_dir, listen_port);
79 /* check for lockfile */
80 snprintf(lockfile, sizeof(lockfile), "%s.lock", un.sun_path);
81 res = lstat(lockfile, &st);
83 fatal("unix port %d is in use", listen_port);
85 /* expect old bouncer gone */
89 sock = socket(PF_UNIX, SOCK_STREAM, 0);
91 fatal_perror("socket");
94 res = bind(sock, (const struct sockaddr *)&un, sizeof(un));
98 /* remove socket on shutdown */
99 atexit(cleanup_unix_socket);
101 /* set common options */
102 if (!tune_socket(sock, true))
103 fatal_perror("tune_socket");
105 /* finally, accept connections */
106 res = listen(sock, cf_listen_backlog);
108 fatal_perror("listen");
/* mode 0777 opens the socket file to every local user */
110 res = chmod(un.sun_path, 0777);
112 fatal_perror("chmod");
114 log_info("listening on unix:%s", un.sun_path);
120 * Notify pooler only when data has also arrived.
122 * optval specifies how long after connection attempt to wait for data.
124 * Related to tcp_synack_retries sysctl, default 5 (corresponds to 180 secs).
126 * SO_ACCEPTFILTER needs to be set after listen(), maybe TCP_DEFER_ACCEPT too.
/*
 * Switch deferred-accept behaviour on or off for a listening socket.
 *
 * sock - listening socket descriptor
 * on   - true to install, false to uninstall (only used for the log text
 *        and the SO_ACCEPTFILTER argument in the lines visible here)
 *
 * Uses TCP_DEFER_ACCEPT when that macro is defined; an SO_ACCEPTFILTER
 * variant with the "dataready" filter is also present.  A failure is only
 * logged as a warning.
 *
 * NOTE(review): the declaration of res, the #else/#endif lines, any
 * statement between the two TCP_DEFER_ACCEPT log calls and the condition
 * guarding the final warning are not shown in this listing.
 */
128 static void tune_accept(int sock, bool on)
130 const char *act = on ? "install" : "uninstall";
132 #ifdef TCP_DEFER_ACCEPT
133 int val = 45; /* fixme: proper value */
134 socklen_t vlen = sizeof(val);
/* reads the current setting into val, replacing the initial 45 when the call succeeds */
135 res = getsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, &vlen);
136 log_noise("old TCP_DEFER_ACCEPT on %d = %d", sock, val);
138 log_noise("%s TCP_DEFER_ACCEPT on %d", act, sock);
139 res = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val));
142 #ifdef SO_ACCEPTFILTER
/* a NULL argument with zero length is passed when uninstalling */
143 struct accept_filter_arg af, *afp = on ? &af : NULL;
144 socklen_t af_len = on ? sizeof(af) : 0;
145 memset(&af, 0, sizeof(af));
146 strcpy(af.af_name, "dataready");
147 log_noise("%s SO_ACCEPTFILTER on %d", act, sock);
148 res = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, afp, af_len);
153 log_warning("tune_accept: %s TCP_DEFER_ACCEPT/SO_ACCEPTFILTER: %s",
154 act, strerror(errno));
/*
 * Public wrapper: applies tune_accept() with the given on/off flag to the
 * TCP listening socket fd_net.
 *
 * NOTE(review): lines between the signature and the call are not shown;
 * there may be a guard for fd_net not being set - confirm.
 */
157 void pooler_tune_accept(bool on)
160 tune_accept(fd_net, on);
163 static int create_net_socket(const char *listen_addr, int listen_port)
166 struct sockaddr_in sa4;
167 struct sockaddr_in6 sa6;
168 struct sockaddr * sa;
174 /* parse address as IPv4 */
175 memset(&sa4, 0, sizeof(sa));
176 sa = (struct sockaddr *)&sa4;
177 sa_size = sizeof(sa4);
178 sa4.sin_family = AF_INET;
179 sa4.sin_port = htons(cf_listen_port);
180 if (strcmp(listen_addr, "*") == 0) {
181 sa4.sin_addr.s_addr = htonl(INADDR_ANY);
183 sa4.sin_addr.s_addr = inet_addr(listen_addr);
185 if (sa4.sin_addr.s_addr == INADDR_NONE) {
186 /* IPv4 addr not foundm re-parse address as IPv6 */
187 log_info("trying to parse %s as IPv6 address", listen_addr);
188 sa = (struct sockaddr *)&sa6;
189 sa_size = sizeof(sa6);
190 memset(&sa6, 0, sizeof(sa6));
191 sa6.sin6_family = AF_INET6;
192 sa6.sin6_port = htons(cf_listen_port);
194 if (inet_pton(AF_INET6, listen_addr, (void *) sa6.sin6_addr.s6_addr) <= 0)
195 fatal("cannot parse addr: '%s'", listen_addr);
198 /* create socket of right type */ //NB! turn if() into x?y:z
199 if (sa6.sin6_family == AF_INET6){
200 sock = socket(AF_INET6, SOCK_STREAM, 0);
201 log_info("created AF_INET6 socket");
203 sock = socket(AF_INET, SOCK_STREAM, 0);
204 log_info("created AF_INET socket");
207 fatal_perror("socket");
209 /* relaxed binding */
211 res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
213 fatal_perror("setsockopt");
215 /* bind to address */
216 res = bind(sock, sa, sa_size);
218 fatal_perror("bind");
220 /* set common options */
221 if (!tune_socket(sock, false))
222 fatal_perror("tune_socket");
224 /* make it accept connections */
225 res = listen(sock, cf_listen_backlog);
227 fatal_perror("listen");
229 tune_accept(sock, cf_tcp_defer_accept);
231 log_info("listening on %s:%d", cf_listen_addr, cf_listen_port);
/*
 * Timer callback: pool_accept() arms ev_err with this function after an
 * accept() failure.  The arguments follow the libevent callback shape.
 *
 * The only visible statement tests that cf_pause_mode is not P_SUSPEND.
 * NOTE(review): the action taken under that condition is not shown in
 * this listing - presumably the pooler is resumed; confirm.
 */
236 static void err_wait_func(int sock, short flags, void *arg)
238 if (cf_pause_mode != P_SUSPEND)
242 static const char *addrpair(const PgAddr *src, const PgAddr *dst)
244 static char ip1buf[INET6_ADDRSTRLEN], ip2buf[INET6_ADDRSTRLEN],
245 buf[INET6_ADDRSTRLEN+INET6_ADDRSTRLEN+128];
246 const char *ip1, *ip2;
247 if (pga_is_unix(src))
250 ip1 = pga_ntop(src, ip1buf, sizeof(ip1buf));
252 ip1 = strerror(errno);
253 ip2 = pga_ntop(src, ip2buf, sizeof(ip2buf));
255 ip2 = strerror(errno);
256 snprintf(buf, sizeof(buf), "%s:%d -> %s:%d",
257 ip1, pga_port(src), ip2, pga_port(dst));
261 static const char *conninfo(const PgSocket *sk)
263 if (is_server_socket(sk))
264 return addrpair(&sk->local_addr, &sk->remote_addr);
266 return addrpair(&sk->remote_addr, &sk->local_addr);
/*
 * libevent callback for the listening sockets (installed by
 * resume_pooler()).
 *
 * sock    - listening socket that became readable
 * flags   - libevent event flags; a call without EV_READ is logged as a
 *           warning
 * is_unix - non-NULL for the unix socket, NULL for the TCP socket (see the
 *           arguments passed in resume_pooler())
 *
 * Visible behaviour: accept a connection with safe_accept(); on a failure
 * that reaches the log_error() call, arm the ev_err timer (err_wait_func
 * after err_timeout); otherwise hand the new fd to accept_client() and log
 * the connection.
 *
 * NOTE(review): the address union declaration, the loop construct, the
 * errno branches around ECONNABORTED, and the statements following the
 * timer setup are not shown in this listing.
 */
269 /* got new connection, associate it with client struct */
270 static void pool_accept(int sock, short flags, void *is_unix)
275 struct sockaddr_in in;
276 struct sockaddr_un un;
279 socklen_t len = sizeof(addr);
281 if(!(flags & EV_READ)) {
282 log_warning("No EV_READ in pool_accept");
287 fd = safe_accept(sock, &addr.sa, &len);
291 else if (errno == ECONNABORTED)
295 * probably fd limit, pointless to try often
296 * wait a bit, hope that admin resolves somehow
298 log_error("accept() failed: %s", strerror(errno));
299 evtimer_set(&ev_err, err_wait_func, NULL);
300 safe_evtimer_add(&ev_err, &err_timeout);
305 log_noise("new fd from accept=%d", fd);
/* diagnostics only: log our own uid and the peer uid of the unix connection */
310 log_noise("getuid(): %d", (int)getuid());
311 if (getpeereid(fd, &uid, &gid) >= 0)
312 log_noise("unix peer uid: %d", (int)uid);
314 log_warning("unix peer uid failed: %s", strerror(errno));
/* unix clients are registered without an address, TCP clients with the accepted sockaddr_in */
316 client = accept_client(fd, NULL, true);
318 client = accept_client(fd, &addr.in, false);
322 slog_debug(client, "P: got connection: %s", conninfo(client));
325 * there may be several clients waiting,
326 * avoid context switch by looping
/*
 * Takes an already-open socket descriptor and a flag telling whether it is
 * a unix socket; the visible code first applies tune_socket() to it.
 *
 * NOTE(review): the rest of the body is not shown in this listing.
 * Presumably the descriptor is adopted as fd_unix or fd_net and false is
 * returned when tune_socket() fails - confirm against the full file.
 */
331 bool use_pooler_socket(int sock, bool is_unix)
333 if (!tune_socket(sock, is_unix))
/*
 * Stop accepting new connections: clears pooler_active and, for each
 * listening socket that exists and is currently registered, removes its
 * event with event_del().  A failing event_del() is logged as a warning.
 *
 * NOTE(review): the statements that follow each event_del() (failure
 * handling and the update of reg_net / reg_unix) are not shown in this
 * listing.
 */
343 void suspend_pooler(void)
345 pooler_active = false;
347 if (fd_net && reg_net) {
348 if (event_del(&ev_net) < 0) {
349 log_warning("suspend_pooler, event_del: %s", strerror(errno));
354 if (fd_unix && reg_unix) {
355 if (event_del(&ev_unix) < 0) {
356 log_warning("suspend_pooler, event_del: %s", strerror(errno));
/*
 * Start (or restart) accepting connections: sets pooler_active and, for
 * each listening socket that exists but is not registered, installs
 * pool_accept() as a persistent read-event callback.  A failing
 * event_add() is logged as a warning.
 *
 * NOTE(review): the statements that follow each event_add() (failure
 * handling and the update of reg_unix / reg_net) are not shown in this
 * listing.
 */
363 void resume_pooler(void)
365 pooler_active = true;
367 if (fd_unix && !reg_unix) {
/* the non-NULL "1" becomes pool_accept()'s is_unix argument */
368 event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1");
369 if (event_add(&ev_unix, NULL) < 0) {
370 log_warning("event_add failed: %s", strerror(errno));
376 if (fd_net && !reg_net) {
/* NULL marks the TCP socket for pool_accept() */
377 event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL);
378 if (event_add(&ev_net, NULL) < 0) {
379 log_warning("event_add failed: %s", strerror(errno));
/*
 * Creates whichever listening sockets are configured and not yet present:
 * the TCP socket when cf_listen_addr is set, the unix socket when
 * cf_unix_socket_dir is a non-empty string.  Having neither socket
 * afterwards is reported through fatal().
 *
 * NOTE(review): the lines after the fatal() check are not shown in this
 * listing.
 */
385 /* listen on socket - should happen after all other initializations */
386 void pooler_setup(void)
/* a descriptor that is already non-zero is left as it is */
388 if (cf_listen_addr && !fd_net)
389 fd_net = create_net_socket(cf_listen_addr, cf_listen_port);
391 if (cf_unix_socket_dir && *cf_unix_socket_dir && !fd_unix)
392 fd_unix = create_unix_socket(cf_unix_socket_dir, cf_listen_port);
394 if (!fd_net && !fd_unix)
395 fatal("nowhere to listen on");
/*
 * Periodic maintenance hook.  The two visible conditions detect a socket
 * that exists but is not registered, and a socket that exists and is
 * registered.
 *
 * NOTE(review): the enclosing checks (presumably on pooler_active) and the
 * calls made under each condition (presumably resume_pooler() and
 * suspend_pooler() respectively) are not shown in this listing - confirm.
 */
400 /* retry previously failed suspend_pooler() / resume_pooler() */
401 void per_loop_pooler_maint(void)
404 if ((fd_unix && !reg_unix) || (fd_net && !reg_net))
407 if ((fd_unix && reg_unix) || (fd_net && reg_net))