2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
4 * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22 * The task is to copy data from one socket to another
23 * efficiently, while allowing callbacks to look
29 /* sbuf_main_loop() skip_recv values */
/* SKIP_RECV: process data already in the buffer without calling recv() first */
31 #define SKIP_RECV true
/*
 * Debug assertions. AssertSanity checks the sbuf's IOBuf with iobuf_sane();
 * AssertActive requires an attached socket (sock > 0; 0 means "no socket"
 * in this file, see the sbuf->sock == 0 checks in sbuf_accept/sbuf_connect).
 */
39 #define AssertSanity(sbuf) do { \
40 Assert(iobuf_sane((sbuf)->io)); \
43 #define AssertActive(sbuf) do { \
44 Assert((sbuf)->sock > 0); \
48 /* declare static stuff */
/*
 * Forward declarations of the file-local helpers. _MUSTCHECK presumably
 * expands to a warn-unused-result attribute (TODO confirm). It is commented
 * out on sbuf_call_proto, whose result several callers in this file ignore
 * (e.g. the SBUF_EV_*_FAILED notifications).
 */
49 static bool sbuf_queue_send(SBuf *sbuf) _MUSTCHECK;
50 static bool sbuf_send_pending(SBuf *sbuf) _MUSTCHECK;
51 static bool sbuf_process_pending(SBuf *sbuf) _MUSTCHECK;
52 static void sbuf_connect_cb(int sock, short flags, void *arg);
53 static void sbuf_recv_cb(int sock, short flags, void *arg);
54 static void sbuf_send_cb(int sock, short flags, void *arg);
55 static void sbuf_try_resync(SBuf *sbuf, bool release);
56 static bool sbuf_wait_for_data(SBuf *sbuf) _MUSTCHECK;
57 static void sbuf_main_loop(SBuf *sbuf, bool skip_recv);
58 static bool sbuf_call_proto(SBuf *sbuf, int event) /* _MUSTCHECK */;
59 static bool sbuf_actual_recv(SBuf *sbuf, unsigned len) _MUSTCHECK;
60 static bool sbuf_after_connect_check(SBuf *sbuf) _MUSTCHECK;
/*
 * Accessor for the sbuf's IOBuf. May be NULL: sbuf_init() zeroes the struct
 * and allocate_iobuf() fills it in lazily.
 */
62 static inline IOBuf *get_iobuf(SBuf *sbuf) { return sbuf->io; }
64 /*********************************
66 *********************************/
68 /* initialize SBuf with proto handler */
/*
 * Zeroes every field (no socket, no IOBuf, no pending packet state) and
 * installs proto_fn as the protocol callback that sbuf_call_proto() invokes.
 */
69 void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn)
71 memset(sbuf, 0, sizeof(SBuf));
72 sbuf->proto_cb = proto_fn;
75 /* got new socket from accept() */
/*
 * Tunes the socket, records whether it is a unix socket, and starts waiting
 * for read events. If cf_tcp_defer_accept is set and the socket is TCP,
 * data is expected to be available already, so the main loop is run right
 * away instead of waiting for the first EV_READ.
 */
76 bool sbuf_accept(SBuf *sbuf, int sock, bool is_unix)
/* the sbuf must be fresh: no buffered data and no socket attached */
80 Assert(iobuf_empty(sbuf->io) && sbuf->sock == 0);
83 tune_socket(sock, is_unix);
85 sbuf->is_unix = is_unix;
88 res = sbuf_wait_for_data(sbuf);
/* could not register for read events: report it as a receive failure */
90 sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
93 /* socket should already have some data (linux only) */
94 if (cf_tcp_defer_accept && !is_unix) {
95 sbuf_main_loop(sbuf, DO_RECV);
103 /* need to connect() to get a socket */
/*
 * Creates a socket for addr and starts connecting. Unix addresses use the
 * socket "<unix_dir>/.s.PGSQL.<port>"; all others use IPv4 TCP. The result
 * is delivered through sbuf_connect_cb():
 *  - called directly when connect() completes at once;
 *  - or from an EV_WRITE event (bounded by timeout_sec) when connect()
 *    reports EINPROGRESS.
 * Other failures are logged and reported as SBUF_EV_CONNECT_FAILED.
 * The EINPROGRESS handling implies a non-blocking socket (presumably set
 * by tune_socket — confirm).
 */
104 bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec)
106 int res, sock, domain;
107 struct sockaddr_in sa_in;
108 struct sockaddr_un sa_un;
111 struct timeval timeout;
/* the sbuf must be fresh: no buffered data and no socket attached */
113 Assert(iobuf_empty(sbuf->io) && sbuf->sock == 0);
116 /* prepare sockaddr */
121 sa_un.sun_family = AF_UNIX;
/*
 * NOTE(review): the snprintf() result is not checked, so an overlong
 * unix_dir is silently truncated to fit sun_path.
 */
122 snprintf(sa_un.sun_path, sizeof(sa_un.sun_path),
123 "%s/.s.PGSQL.%d", unix_dir, addr->port);
129 sa_in.sin_family = AF_INET;
130 sa_in.sin_addr = addr->ip_addr;
131 sa_in.sin_port = htons(addr->port);
138 sock = socket(domain, SOCK_STREAM, 0);
140 /* probably fd limit */
143 tune_socket(sock, addr->is_unix);
145 sbuf->is_unix = addr->is_unix;
/* bounds how long the EINPROGRESS path below waits for EV_WRITE */
148 timeout.tv_sec = timeout_sec;
151 /* launch connection */
152 res = safe_connect(sock, sa, len);
154 /* unix socket gives connection immediately */
155 sbuf_connect_cb(sock, EV_WRITE, sbuf);
157 } else if (errno == EINPROGRESS) {
158 /* tcp socket needs waiting */
159 event_set(&sbuf->ev, sock, EV_WRITE, sbuf_connect_cb, sbuf);
160 res = event_add(&sbuf->ev, &timeout);
166 log_warning("sbuf_connect failed: %s", strerror(errno));
171 sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
175 /* don't wait for data on this socket */
/*
 * Removes sbuf->ev from libevent, so no further callbacks fire for this sbuf
 * until sbuf_continue() or sbuf_continue_with_callback() registers it again.
 * Must not be called while a send is queued (wait_send set).
 * A failing event_del() is logged.
 */
176 bool sbuf_pause(SBuf *sbuf)
179 Assert(sbuf->wait_send == 0);
181 if (event_del(&sbuf->ev) < 0) {
182 log_warning("event_del: %s", strerror(errno));
188 /* resume from pause, start waiting for data */
/*
 * Re-registers the persistent read event. It then runs the main loop once
 * with do_recv (DO_RECV, i.e. recv() is attempted), so data that arrived
 * while paused is processed without waiting for the next EV_READ.
 * A registration failure is reported as SBUF_EV_RECV_FAILED.
 */
189 void sbuf_continue(SBuf *sbuf)
191 bool do_recv = DO_RECV;
195 res = sbuf_wait_for_data(sbuf);
197 /* drop if problems */
198 sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
203 * It's tempting to try to avoid the recv() but that would
204 * only work if no code wants to see full packet.
206 * This is not true in ServerParameter case.
209 * if (sbuf->recv_pos - sbuf->pkt_pos >= SBUF_SMALL_PKT)
213 sbuf_main_loop(sbuf, do_recv);
217 * Resume from pause and give socket over to external
220 * The callback will be called with arg given to sbuf_init.
 * The registration is a persistent EV_READ event on sbuf->sock with no
 * timeout (presumably with user_cb as its handler). A failing event_add()
 * is logged.
 * NOTE(review): sbuf_init() as defined in this file takes no callback
 * argument, so "arg given to sbuf_init" looks stale. Confirm which arg
 * user_cb actually receives.
222 bool sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb)
228 event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST,
231 err = event_add(&sbuf->ev, NULL);
233 log_warning("sbuf_continue_with_callback: %s", strerror(errno));
239 /* socket cleanup & close */
/*
 * If a socket is attached, removes its libevent registration (a failing
 * event_del() is logged) and closes it. It then resets the packet state
 * (pkt_remain, pkt_action, wait_send) and returns the IOBuf to iobuf_cache.
 * proto_cb is left untouched.
 * NOTE(review): sbuf->io can be NULL here because it is allocated lazily,
 * so obj_free() must accept NULL — confirm. Also confirm that sbuf->io and
 * sbuf->sock are reset afterwards; sbuf_accept/sbuf_connect assert
 * sock == 0 before reuse.
 */
240 bool sbuf_close(SBuf *sbuf)
242 /* keep handler & arg values */
243 if (sbuf->sock > 0) {
244 if (event_del(&sbuf->ev) < 0) {
245 log_warning("event_del: %s", strerror(errno));
248 safe_close(sbuf->sock);
252 sbuf->pkt_remain = 0;
253 sbuf->pkt_action = sbuf->wait_send = 0;
255 obj_free(iobuf_cache, sbuf->io);
261 /* proto_fn tells to send some bytes to socket */
/*
 * Called from the proto handler at a packet boundary (pkt_remain == 0).
 * The next `amount` bytes are tagged for forwarding (ACT_SEND) to dst,
 * presumably stored in sbuf->dst, which sbuf_send_pending() writes to.
 * NOTE(review): the commented-out Assert below passes &sbuf->io, but
 * sbuf->io is already an IOBuf pointer; drop the '&' if re-enabling it.
 */
262 void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, unsigned amount)
265 Assert(sbuf->pkt_remain == 0);
266 //Assert(sbuf->pkt_action == ACT_UNSET || sbuf->pkt_action == ACT_SEND || iobuf_amount_pending(&sbuf->io));
269 sbuf->pkt_action = ACT_SEND;
270 sbuf->pkt_remain = amount;
274 /* proto_fn tells to skip some amount of bytes */
/*
 * Called from the proto handler at a packet boundary (pkt_remain == 0).
 * The next `amount` bytes are discarded (ACT_SKIP). sbuf_process_pending()
 * sends any already-tagged data first.
 * NOTE(review): the commented-out Assert below passes &sbuf->io, but
 * sbuf->io is already an IOBuf pointer; drop the '&' if re-enabling it.
 */
275 void sbuf_prepare_skip(SBuf *sbuf, unsigned amount)
278 Assert(sbuf->pkt_remain == 0);
279 //Assert(sbuf->pkt_action == ACT_UNSET || iobuf_send_pending_avail(&sbuf->io));
282 sbuf->pkt_action = ACT_SKIP;
283 sbuf->pkt_remain = amount;
286 /* proto_fn asks to receive the next amount of bytes via callback */
/*
 * Called from the proto handler at a packet boundary (pkt_remain == 0).
 * With ACT_CALL, sbuf_process_pending() hands the next `amount` bytes to
 * the proto handler as SBUF_EV_PKT_CALLBACK. The MBuf is limited to
 * pkt_remain, and the bytes are skipped afterwards.
 * NOTE(review): the commented-out Assert below passes &sbuf->io, but
 * sbuf->io is already an IOBuf pointer; drop the '&' if re-enabling it.
 */
287 void sbuf_prepare_fetch(SBuf *sbuf, unsigned amount)
290 Assert(sbuf->pkt_remain == 0);
291 //Assert(sbuf->pkt_action == ACT_UNSET || iobuf_send_pending_avail(&sbuf->io));
294 sbuf->pkt_action = ACT_CALL;
295 sbuf->pkt_remain = amount;
296 /* sbuf->dst = NULL; // fixme ?? */
299 /*************************
301 *************************/
304 * Call proto callback with proper MBuf.
306 * If callback returns true it used one of sbuf_prepare_* on sbuf,
307 * and processing can continue.
309 * If it returned false it used sbuf_pause(), sbuf_close() or simply
310 * wants to wait for next event loop (e.g. too little data available).
311 * Caller should not touch sbuf in that case and just return to libevent.
 * MBuf contents by event:
 *  - SBUF_EV_PKT_CALLBACK: parse view limited to the current packet
 *    (pkt_remain).
 *  - SBUF_EV_READ: all unparsed data.
 *  - any other event: an empty (zeroed) MBuf.
313 static bool sbuf_call_proto(SBuf *sbuf, int event)
316 IOBuf *io = sbuf->io;
/* a READ event is only raised when there is unparsed data */
320 Assert(event != SBUF_EV_READ || iobuf_amount_parse(io) > 0);
322 /* if pkt callback, limit only with current packet */
323 if (event == SBUF_EV_PKT_CALLBACK)
324 iobuf_parse_limit(io, &mbuf, sbuf->pkt_remain);
325 else if (event == SBUF_EV_READ)
326 iobuf_parse_all(io, &mbuf);
/* other events carry no data */
328 memset(&mbuf, 0, sizeof(mbuf));
330 res = sbuf->proto_cb(sbuf, event, &mbuf);
/* a handler that accepted a READ event must not have closed the socket */
333 Assert(event != SBUF_EV_READ || !res || sbuf->sock > 0);
338 /* let's wait for new data */
/*
 * (Re)registers sbuf->ev as a persistent EV_READ event on sbuf->sock, with
 * sbuf_recv_cb as handler and no timeout. A failing event_add() is logged.
 * Callers treat a false result as a failure (e.g. SBUF_EV_RECV_FAILED in
 * sbuf_accept and sbuf_continue).
 */
339 static bool sbuf_wait_for_data(SBuf *sbuf)
343 event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, sbuf_recv_cb, sbuf);
344 err = event_add(&sbuf->ev, NULL);
346 log_warning("sbuf_wait_for_data: event_add: %s", strerror(errno));
352 /* libevent EV_WRITE: called when dest socket is writable again */
/*
 * Handler registered by sbuf_queue_send(). It switches the sbuf back to
 * waiting for reads, then runs the main loop with SKIP_RECV so the data
 * already buffered is flushed before anything new is read. If
 * re-registering the read event fails, SBUF_EV_SEND_FAILED is reported.
 */
353 static void sbuf_send_cb(int sock, short flags, void *arg)
358 /* sbuf was closed before in this loop */
/* only reachable while a send is queued */
363 Assert(sbuf->wait_send);
365 /* prepare normal situation for sbuf_main_loop */
/*
 * sbuf_main_loop() asserts wait_send == 0, so the queued-send state must be
 * cleared before it is called.
 */
367 res = sbuf_wait_for_data(sbuf);
369 /* here we should certainly skip recv() */
370 sbuf_main_loop(sbuf, SKIP_RECV);
372 /* drop if problems */
373 sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
376 /* socket is full, wait until it's writable again */
/*
 * Called when sending to sbuf->dst hits EAGAIN. It stops watching the
 * sbuf's own socket for reads and registers a one-shot (no EV_PERSIST)
 * EV_WRITE event on the destination socket, handled by sbuf_send_cb(),
 * with no timeout. event_del()/event_add() failures are logged.
 */
377 static bool sbuf_queue_send(SBuf *sbuf)
382 /* if false is returned, the socket will be closed later */
384 /* stop waiting for read events */
385 err = event_del(&sbuf->ev);
387 log_warning("sbuf_queue_send: event_del failed: %s", strerror(errno));
391 /* instead wait for EV_WRITE on destination socket */
392 event_set(&sbuf->ev, sbuf->dst->sock, EV_WRITE, sbuf_send_cb, sbuf);
393 err = event_add(&sbuf->ev, NULL);
395 log_warning("sbuf_queue_send: event_add failed: %s", strerror(errno));
404 * There's data in buffer to be sent. Returns whether processing can continue.
406 * Does not look at pkt_pos/remain fields, expects them to be merged to send_*
 * Data tagged for sending is written to sbuf->dst->sock:
 *  - EAGAIN: the send is queued via sbuf_queue_send();
 *  - other errors, or a failed queueing: SBUF_EV_SEND_FAILED is reported.
 * NOTE(review): pkt_pos/send_* above may describe an older buffer layout;
 * the code below only uses the iobuf_* helpers.
408 static bool sbuf_send_pending(SBuf *sbuf)
411 IOBuf *io = sbuf->io;
/* pending data always needs a destination */
414 Assert(sbuf->dst || iobuf_amount_pending(io) == 0);
417 /* how much data is available for sending */
418 avail = iobuf_amount_pending(io);
/* destination has no socket attached: nowhere to send */
422 if (sbuf->dst->sock == 0) {
423 log_error("sbuf_send_pending: no dst sock?");
427 /* actually send it */
428 res = iobuf_send_pending(io, sbuf->dst->sock);
430 if (errno == EAGAIN) {
431 if (!sbuf_queue_send(sbuf))
432 /* drop if queue failed */
433 sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
/* any other send error */
435 sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
442 * Should do sbuf_queue_send() immediately?
444 * To be sure, let's run into EAGAIN.
449 /* process as much data as possible */
/*
 * Works through the unparsed data in the IOBuf, packet by packet:
 *  - At the start of a packet (pkt_remain == 0) the proto handler gets
 *    SBUF_EV_READ and sets pkt_action/pkt_remain via sbuf_prepare_*().
 *  - Up to pkt_remain bytes are then handled by action:
 *      ACT_SEND: tagged for sending;
 *      ACT_CALL: passed to the handler (SBUF_EV_PKT_CALLBACK), then skipped;
 *      ACT_SKIP: skipped.
 *  - Finally sbuf_send_pending() flushes the tagged data, and its result
 *    is returned.
 */
450 static bool sbuf_process_pending(SBuf *sbuf)
453 IOBuf *io = sbuf->io;
/* no room left for recv(): the buffer is full */
454 bool full = iobuf_amount_recv(io) <= 0;
463 * The (avail <= SBUF_SMALL_PKT) check is to avoid partial pkts.
464 * As SBuf should not assume knowledge about packets,
465 * the check is not done in !full case. Packet handler can
466 * then still notify about partial packet by returning false.
468 avail = iobuf_amount_parse(io);
469 if (avail == 0 || (full && avail <= SBUF_SMALL_PKT))
473 * If start of packet, process packet header.
475 if (sbuf->pkt_remain == 0) {
476 res = sbuf_call_proto(sbuf, SBUF_EV_READ);
/* a handler that returned true has set up a non-empty packet */
479 Assert(sbuf->pkt_remain > 0);
482 if (sbuf->pkt_action == ACT_SKIP || sbuf->pkt_action == ACT_CALL) {
483 /* send any pending data before skipping */
484 if (iobuf_amount_pending(io) > 0) {
485 res = sbuf_send_pending(sbuf);
/* handle at most the remainder of the current packet */
491 if (avail > sbuf->pkt_remain)
492 avail = sbuf->pkt_remain;
494 switch (sbuf->pkt_action) {
496 iobuf_tag_send(io, avail);
499 res = sbuf_call_proto(sbuf, SBUF_EV_PKT_CALLBACK);
502 /* after callback, skip pkt */
504 iobuf_tag_skip(io, avail);
/* account for the bytes of this packet handled in this pass */
507 sbuf->pkt_remain -= avail;
510 return sbuf_send_pending(sbuf);
513 /* reposition at buffer start again */
/*
 * With release == true and an empty buffer, the IOBuf is returned to
 * iobuf_cache. Otherwise iobuf_try_resync() is asked to move the data back
 * toward the buffer start, making room for recv(). SBUF_SMALL_PKT is passed
 * as its threshold; its exact meaning lives in the iobuf code (TODO confirm).
 * NOTE(review): after obj_free(), sbuf->io must be reset to NULL, otherwise
 * allocate_iobuf() would keep using a freed object — confirm this happens.
 */
514 static void sbuf_try_resync(SBuf *sbuf, bool release)
516 IOBuf *io = sbuf->io;
519 log_noise("resync: done=%d, parse=%d, recv=%d",
520 io->done_pos, io->parse_pos, io->recv_pos);
526 if (release && iobuf_empty(io)) {
527 obj_free(iobuf_cache, io);
530 iobuf_try_resync(io, SBUF_SMALL_PKT);
533 /* actually ask kernel for more data */
/*
 * Reads up to len bytes from sbuf->sock into the IOBuf via
 * iobuf_recv_limit(); len must not exceed the free space. EOF and any
 * error other than EAGAIN are reported as SBUF_EV_RECV_FAILED. EAGAIN is
 * not treated as a failure.
 */
534 static bool sbuf_actual_recv(SBuf *sbuf, unsigned len)
537 IOBuf *io = sbuf->io;
541 Assert(iobuf_amount_recv(io) >= len);
543 got = iobuf_recv_limit(io, sbuf->sock, len);
545 /* eof from socket */
546 sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
548 } else if (got < 0 && errno != EAGAIN) {
549 /* some error occurred */
550 sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
556 /* callback for libevent EV_READ */
/*
 * Persistent read handler installed by sbuf_wait_for_data(). It reads and
 * processes whatever is available.
 */
557 static void sbuf_recv_cb(int sock, short flags, void *arg)
560 sbuf_main_loop(sbuf, DO_RECV);
/*
 * Ensures sbuf->io exists. When it is NULL, one is taken from iobuf_cache
 * and reset. An allocation failure is reported to the proto handler as
 * SBUF_EV_RECV_FAILED. sbuf_main_loop() stops when this returns false.
 */
563 static bool allocate_iobuf(SBuf *sbuf)
565 if (sbuf->io == NULL) {
566 sbuf->io = obj_alloc(iobuf_cache);
567 if (sbuf->io == NULL) {
568 sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
571 iobuf_reset(sbuf->io);
577 * Main recv-parse-send-repeat loop.
579 * Reason for skip_recv is to avoid extra recv(). The problem with it
580 * is EOF from socket. Currently that means that the pending data is
581 * dropped. Fortunately server sockets are not paused and dropping
582 * data from client is no problem. So the only place where skip_recv is
583 * important is sbuf_send_cb().
 * Each round:
 *  1. compact the buffer;
 *  2. recv() into the free space, limited to the rest of the current
 *     packet in P_SUSPEND pause mode (recv is skipped if skip_recv);
 *  3. parse and forward via sbuf_process_pending().
 * The loop repeats while the buffer gets completely filled, since more
 * data may be waiting; cf_sbuf_loopcnt (when > 0) caps the rounds.
 * Afterwards an empty IOBuf is released, and SBUF_EV_FLUSH is sent if
 * nothing is left.
 * NOTE(review): the local variable `free` shadows free(3) in this function.
585 static void sbuf_main_loop(SBuf *sbuf, bool skip_recv)
590 /* sbuf was closed before in this event loop */
594 /* reading should be disabled when waiting */
595 Assert(sbuf->wait_send == 0);
598 if (!allocate_iobuf(sbuf))
601 /* avoid recv() if asked */
606 /* avoid spending too much time on single socket */
607 if (cf_sbuf_loopcnt > 0 && loopcnt >= cf_sbuf_loopcnt) {
608 log_debug("loopcnt full");
613 /* make room in buffer */
614 sbuf_try_resync(sbuf, false);
617 * here used to be if (free > SBUF_SMALL_PKT) check
618 * but with skip_recv switch it should not be needed anymore.
620 free = iobuf_amount_recv(sbuf->io);
623 * When suspending, try to hit packet boundary ASAP.
625 if (cf_pause_mode == P_SUSPEND
626 && sbuf->pkt_remain > 0
627 && sbuf->pkt_remain < free)
629 free = sbuf->pkt_remain;
632 /* now fetch the data */
633 ok = sbuf_actual_recv(sbuf, free);
640 ok = sbuf_process_pending(sbuf);
644 /* if the buffer is full, there can be more data available */
645 if (iobuf_amount_recv(sbuf->io) <= 0)
/* release the IOBuf back to the cache if it is now empty */
649 sbuf_try_resync(sbuf, true);
651 /* notify proto that all is sent */
652 if (sbuf_is_empty(sbuf))
653 sbuf_call_proto(sbuf, SBUF_EV_FLUSH);
656 /* check if there is any error pending on socket */
/*
 * Used once a non-blocking connect() signals writability. Reads the
 * socket's pending error via getsockopt(SO_ERROR). A getsockopt() failure
 * and a pending error are both logged at debug level. sbuf_connect_cb
 * treats a false result as a failed connect.
 */
657 static bool sbuf_after_connect_check(SBuf *sbuf)
660 socklen_t optlen = sizeof(optval);
662 err = getsockopt(sbuf->sock, SOL_SOCKET, SO_ERROR, (void*)&optval, &optlen);
664 log_debug("sbuf_after_connect_check: getsockopt: %s",
669 log_debug("sbuf_after_connect_check: pending error: %s",
676 /* callback for libevent EV_WRITE when connecting */
/*
 * Completion handler for sbuf_connect(). When flags contain EV_WRITE:
 *  - the connect result is verified;
 *  - SBUF_EV_CONNECT_OK is reported;
 *  - if the proto handler accepts it, the sbuf starts waiting for data.
 * Otherwise SBUF_EV_CONNECT_FAILED is reported. That presumably covers a
 * failed check, a failed sbuf_wait_for_data(), or an event without
 * EV_WRITE such as a connect timeout — confirm.
 */
677 static void sbuf_connect_cb(int sock, short flags, void *arg)
681 if (flags & EV_WRITE) {
682 if (!sbuf_after_connect_check(sbuf))
684 if (!sbuf_call_proto(sbuf, SBUF_EV_CONNECT_OK))
686 if (!sbuf_wait_for_data(sbuf))
691 sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
694 /* send some data directly to this sbuf's socket */
/*
 * Writes buf to sbuf->sock with one safe_send() call, bypassing the IOBuf.
 * Returns true only if all len bytes were sent. Errors and short writes
 * are logged at debug level and not retried here.
 */
695 bool sbuf_answer(SBuf *sbuf, const void *buf, unsigned len)
700 res = safe_send(sbuf->sock, buf, len, 0);
702 log_debug("sbuf_answer: error sending: %s", strerror(errno));
703 } else if ((unsigned)res != len)
/* len is unsigned: print it with %u so the conversion matches the argument */
704 log_debug("sbuf_answer: partial send: len=%u sent=%d", len, res);
/*
 * Test res >= 0 first so a failed send (-1) can never compare equal to len
 * after the unsigned conversion.
 */
705 return res >= 0 && (unsigned)res == len;