]> granicus.if.org Git - pgbouncer/blob - src/pooler.c
IPv6 support
[pgbouncer] / src / pooler.c
1 /*
2  * PgBouncer - Lightweight connection pooler for PostgreSQL.
3  * 
4  * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
5  * 
6  * Permission to use, copy, modify, and/or distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  * 
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18
19 /*
20  * Handling of pooler listening sockets
21  */
22
23 #include "bouncer.h"
24
25 static int fd_net = 0;
26 static int fd_unix = 0;
27 #ifdef HAVE_IPV6
28 static int fd_net_v6 = 0;
29 #endif
30
31
32 static struct event ev_net;
33 static struct event ev_unix;
34
35 /* if sockets are registered in libevent */
36 static bool reg_net = false;
37 static bool reg_unix = false;
38
39 /* should listening sockets be active or suspended? */
40 static bool pooler_active = false;
41
42 /* on accept() failure sleep 5 seconds */
43 static struct event ev_err;
44 static struct timeval err_timeout = {5, 0};
45
46 /* atexit() cleanup func */
47 static void cleanup_unix_socket(void)
48 {
49         char fn[256];
50
51         /* avoid cleanup if exit() while suspended */
52         if (!reg_unix)
53                 return;
54
55         snprintf(fn, sizeof(fn), "%s/.s.PGSQL.%d",
56                         cf_unix_socket_dir, cf_listen_port);
57         unlink(fn);
58 }
59
60 void get_pooler_fds(int *p_net, int *p_unix)
61 {
62         *p_net = fd_net;
63         *p_unix = fd_unix;
64 }
65
66 static int create_unix_socket(const char *socket_dir, int listen_port)
67 {
68         struct sockaddr_un un;
69         int res, sock;
70         char lockfile[256];
71         struct stat st;
72
73         /* fill sockaddr struct */
74         memset(&un, 0, sizeof(un));
75         un.sun_family = AF_UNIX;
76         snprintf(un.sun_path, sizeof(un.sun_path),
77                 "%s/.s.PGSQL.%d", socket_dir, listen_port);
78
79         /* check for lockfile */
80         snprintf(lockfile, sizeof(lockfile), "%s.lock", un.sun_path);
81         res = lstat(lockfile, &st);
82         if (res == 0)
83                 fatal("unix port %d is in use", listen_port);
84
85         /* expect old bouncer gone */
86         unlink(un.sun_path);
87
88         /* create socket */
89         sock = socket(PF_UNIX, SOCK_STREAM, 0);
90         if (sock < 0)
91                 fatal_perror("socket");
92
93         /* bind it */
94         res = bind(sock, (const struct sockaddr *)&un, sizeof(un));
95         if (res < 0)
96                 fatal_perror("bind");
97
98         /* remove socket on shutdown */
99         atexit(cleanup_unix_socket);
100
101         /* set common options */
102         if (!tune_socket(sock, true))
103                 fatal_perror("tune_socket");
104
105         /* finally, accept connections */
106         res = listen(sock, cf_listen_backlog);
107         if (res < 0)
108                 fatal_perror("listen");
109
110         res = chmod(un.sun_path, 0777);
111         if (res < 0)
112                 fatal_perror("chmod");
113
114         log_info("listening on unix:%s", un.sun_path);
115
116         return sock;
117 }
118
119 /*
120  * Notify pooler only when also data is arrived.
121  *
122  * optval specifies how long after connection attempt to wait for data.
123  *
124  * Related to tcp_synack_retries sysctl, default 5 (corresponds 180 secs).
125  *
126  * SO_ACCEPTFILTER needs to be set after listern(), maybe TCP_DEFER_ACCEPT too.
127  */
128 static void tune_accept(int sock, bool on)
129 {
130         const char *act = on ? "install" : "uninstall";
131         int res = 0;
132 #ifdef TCP_DEFER_ACCEPT
133         int val = 45; /* fixme: proper value */
134         socklen_t vlen = sizeof(val);
135         res = getsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, &vlen);
136         log_noise("old TCP_DEFER_ACCEPT on %d = %d", sock, val);
137         val = on ? 1 : 0;
138         log_noise("%s TCP_DEFER_ACCEPT on %d", act, sock);
139         res = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val));
140 #else
141 #if 0
142 #ifdef SO_ACCEPTFILTER
143         struct accept_filter_arg af, *afp = on ? &af : NULL;
144         socklen_t af_len = on ? sizeof(af) : 0;
145         memset(&af, 0, sizeof(af));
146         strcpy(af.af_name, "dataready");
147         log_noise("%s SO_ACCEPTFILTER on %d", act, sock);
148         res = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, afp, af_len);
149 #endif
150 #endif
151 #endif
152         if (res < 0)
153                 log_warning("tune_accept: %s TCP_DEFER_ACCEPT/SO_ACCEPTFILTER: %s",
154                             act, strerror(errno));
155 }
156
157 void pooler_tune_accept(bool on)
158 {
159         if (fd_net > 0)
160                 tune_accept(fd_net, on);
161 }
162
163 static int create_net_socket(const char *listen_addr, int listen_port)
164 {
165         int sock;
166         struct sockaddr_in sa4;
167         struct sockaddr_in6 sa6;
168         struct sockaddr * sa;
169         int sa_size = 0;
170         int res;
171         int val;
172
173
174         /* parse address as IPv4 */
175         memset(&sa4, 0, sizeof(sa));
176         sa = (struct sockaddr *)&sa4;
177         sa_size = sizeof(sa4);
178         sa4.sin_family = AF_INET;
179         sa4.sin_port = htons(cf_listen_port);
180         if (strcmp(listen_addr, "*") == 0) {
181                 sa4.sin_addr.s_addr = htonl(INADDR_ANY);
182         } else {
183                 sa4.sin_addr.s_addr = inet_addr(listen_addr);
184         }
185         if (sa4.sin_addr.s_addr == INADDR_NONE) {
186                 /* IPv4 addr not foundm re-parse address as IPv6 */
187                 log_info("trying to parse %s as IPv6 address", listen_addr);
188                 sa = (struct sockaddr *)&sa6;
189                 sa_size = sizeof(sa6);
190                 memset(&sa6, 0, sizeof(sa6));
191                 sa6.sin6_family = AF_INET6;
192                 sa6.sin6_port = htons(cf_listen_port);
193
194                 if (inet_pton(AF_INET6, listen_addr, (void *) sa6.sin6_addr.s6_addr) <= 0)
195                         fatal("cannot parse addr: '%s'", listen_addr);
196         }
197
198         /* create socket of right type */ //NB! turn if() into x?y:z
199         if (sa6.sin6_family == AF_INET6){
200                 sock = socket(AF_INET6, SOCK_STREAM, 0);
201                 log_info("created AF_INET6 socket");
202         } else {
203                 sock = socket(AF_INET, SOCK_STREAM, 0);
204                 log_info("created AF_INET socket");
205         }
206         if (sock < 0)
207                 fatal_perror("socket");
208
209         /* relaxed binding */
210         val = 1;
211         res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
212         if (res < 0)
213                 fatal_perror("setsockopt");
214
215         /* bind to address */
216         res = bind(sock, sa, sa_size);
217         if (res < 0)
218                 fatal_perror("bind");
219
220         /* set common options */
221         if (!tune_socket(sock, false))
222                 fatal_perror("tune_socket");
223
224         /* make it accept connections */
225         res = listen(sock, cf_listen_backlog);
226         if (res < 0)
227                 fatal_perror("listen");
228
229         tune_accept(sock, cf_tcp_defer_accept);
230
231         log_info("listening on %s:%d", cf_listen_addr, cf_listen_port);
232
233         return sock;
234 }
235
236 static void err_wait_func(int sock, short flags, void *arg)
237 {
238         if (cf_pause_mode != P_SUSPEND)
239                 resume_pooler();
240 }
241
242 static const char *addrpair(const PgAddr *src, const PgAddr *dst)
243 {
244         static char ip1buf[INET6_ADDRSTRLEN], ip2buf[INET6_ADDRSTRLEN],
245                     buf[INET6_ADDRSTRLEN+INET6_ADDRSTRLEN+128];
246         const char *ip1, *ip2;
247         if (pga_is_unix(src))
248                 return "unix->unix";
249
250         ip1 = pga_ntop(src, ip1buf, sizeof(ip1buf));
251         if (!ip1)
252                 ip1 = strerror(errno);
253         ip2 = pga_ntop(src, ip2buf, sizeof(ip2buf));
254         if (!ip2)
255                 ip2 = strerror(errno);
256         snprintf(buf, sizeof(buf), "%s:%d -> %s:%d",
257                  ip1, pga_port(src), ip2, pga_port(dst));
258         return buf;
259 }
260
261 static const char *conninfo(const PgSocket *sk)
262 {
263         if (is_server_socket(sk))
264                 return addrpair(&sk->local_addr, &sk->remote_addr);
265         else
266                 return addrpair(&sk->remote_addr, &sk->local_addr);
267 }
268
269 /* got new connection, associate it with client struct */
270 static void pool_accept(int sock, short flags, void *is_unix)
271 {
272         int fd;
273         PgSocket *client;
274         union {
275                 struct sockaddr_in in;
276                 struct sockaddr_un un;
277                 struct sockaddr sa;
278         } addr;
279         socklen_t len = sizeof(addr);
280
281         if(!(flags & EV_READ)) {
282                 log_warning("No EV_READ in pool_accept");
283                 return;
284         }
285 loop:
286         /* get fd */
287         fd = safe_accept(sock, &addr.sa, &len);
288         if (fd < 0) {
289                 if (errno == EAGAIN)
290                         return;
291                 else if (errno == ECONNABORTED)
292                         return;
293
294                 /*
295                  * probably fd limit, pointless to try often
296                  * wait a bit, hope that admin resolves somehow
297                  */
298                 log_error("accept() failed: %s", strerror(errno));
299                 evtimer_set(&ev_err, err_wait_func, NULL);
300                 safe_evtimer_add(&ev_err, &err_timeout);
301                 suspend_pooler();
302                 return;
303         }
304
305         log_noise("new fd from accept=%d", fd);
306         if (is_unix) {
307                 {
308                         uid_t uid;
309                         gid_t gid;
310                         log_noise("getuid(): %d", (int)getuid());
311                         if (getpeereid(fd, &uid, &gid) >= 0)
312                                 log_noise("unix peer uid: %d", (int)uid);
313                         else
314                                 log_warning("unix peer uid failed: %s", strerror(errno));
315                 }
316                 client = accept_client(fd, NULL, true);
317         } else {
318                 client = accept_client(fd, &addr.in, false);
319         }
320
321         if (client)
322                 slog_debug(client, "P: got connection: %s", conninfo(client));
323
324         /*
325          * there may be several clients waiting,
326          * avoid context switch by looping
327          */
328         goto loop;
329 }
330
331 bool use_pooler_socket(int sock, bool is_unix)
332 {
333         if (!tune_socket(sock, is_unix))
334                 return false;
335
336         if (is_unix)
337                 fd_unix = sock;
338         else
339                 fd_net = sock;
340         return true;
341 }
342
343 void suspend_pooler(void)
344 {
345         pooler_active = false;
346
347         if (fd_net && reg_net) {
348                 if (event_del(&ev_net) < 0) {
349                         log_warning("suspend_pooler, event_del: %s", strerror(errno));
350                         return;
351                 }
352                 reg_net = false;
353         }
354         if (fd_unix && reg_unix) {
355                 if (event_del(&ev_unix) < 0) {
356                         log_warning("suspend_pooler, event_del: %s", strerror(errno));
357                         return;
358                 }
359                 reg_unix = false;
360         }
361 }
362
363 void resume_pooler(void)
364 {
365         pooler_active = true;
366
367         if (fd_unix && !reg_unix) {
368                 event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1");
369                 if (event_add(&ev_unix, NULL) < 0) {
370                         log_warning("event_add failed: %s", strerror(errno));
371                         return;
372                 }
373                 reg_unix = true;
374         }
375
376         if (fd_net && !reg_net) {
377                 event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL);
378                 if (event_add(&ev_net, NULL) < 0) {
379                         log_warning("event_add failed: %s", strerror(errno));
380                 }
381                 reg_net = true;
382         }
383 }
384
385 /* listen on socket - should happen after all other initializations */
386 void pooler_setup(void)
387 {
388         if (cf_listen_addr && !fd_net)
389                 fd_net = create_net_socket(cf_listen_addr, cf_listen_port);
390
391         if (cf_unix_socket_dir && *cf_unix_socket_dir && !fd_unix)
392                 fd_unix = create_unix_socket(cf_unix_socket_dir, cf_listen_port);
393
394         if (!fd_net && !fd_unix)
395                 fatal("nowhere to listen on");
396
397         resume_pooler();
398 }
399
400 /* retry previously failed suspend_pooler() / resume_pooler() */
401 void per_loop_pooler_maint(void)
402 {
403         if (pooler_active) {
404                 if ((fd_unix && !reg_unix) || (fd_net && !reg_net))
405                         resume_pooler();
406         } else {
407                 if ((fd_unix && reg_unix) || (fd_net && reg_net))
408                         suspend_pooler();
409         }
410 }
411