From: Marko Kreen Date: Wed, 23 Jan 2008 09:43:34 +0000 (+0000) Subject: draft lazy iobuf handling X-Git-Tag: pgbouncer_1_2_rc2~46 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=4de7ac670c698d6528000ab9b9c51eb23a49e3e6;p=pgbouncer draft lazy iobuf handling --- diff --git a/include/bouncer.h b/include/bouncer.h index cac0655..42e7344 100644 --- a/include/bouncer.h +++ b/include/bouncer.h @@ -275,8 +275,8 @@ struct PgSocket { SBuf sbuf; /* stream buffer, must be last */ }; -#define RAW_SOCKET_SIZE offsetof(struct PgSocket, sbuf.io.buf) -#define PG_SOCKET_SIZE (RAW_SOCKET_SIZE + cf_sbuf_len + SBUF_MAX_REWRITE) +#define RAW_IOBUF_SIZE offsetof(IOBuf, buf) +#define IOBUF_SIZE (RAW_IOBUF_SIZE + cf_sbuf_len) /* where to store old fd info during SHOW FDS result processing */ #define tmp_sk_oldfd request_time diff --git a/include/iobuf.h b/include/iobuf.h index 3197c73..42574b3 100644 --- a/include/iobuf.h +++ b/include/iobuf.h @@ -56,15 +56,15 @@ typedef struct iobuf IOBuf; static inline bool iobuf_sane(const IOBuf *io) { - return io->done_pos >= 0 + return (io == NULL) || (io->done_pos >= 0 && io->parse_pos >= io->done_pos && io->recv_pos >= io->parse_pos - && cf_sbuf_len >= io->recv_pos; + && cf_sbuf_len >= io->recv_pos); } -static inline bool iobuf_empty(const IOBuf *buf) +static inline bool iobuf_empty(const IOBuf *io) { - return buf->done_pos == buf->recv_pos; + return io == NULL || io->done_pos == io->recv_pos; } /* unsent amount */ diff --git a/include/objects.h b/include/objects.h index 92f44a7..374e22b 100644 --- a/include/objects.h +++ b/include/objects.h @@ -26,6 +26,7 @@ extern ObjectCache *server_cache; extern ObjectCache *db_cache; extern ObjectCache *pool_cache; extern ObjectCache *user_cache; +extern ObjectCache *iobuf_cache; PgDatabase *find_database(const char *name); PgUser *find_user(const char *name); diff --git a/include/sbuf.h b/include/sbuf.h index d2b6956..eaa7e2c 100644 --- a/include/sbuf.h +++ b/include/sbuf.h @@ -39,11 +39,6 @@ typedef enum { */ #define SBUF_SMALL_PKT 64 -/* - * How much proto handler may want to enlarge the packet. - */ -#define SBUF_MAX_REWRITE 16 - /* fwd def */ typedef struct SBuf SBuf; @@ -80,11 +75,9 @@ struct SBuf { SBuf *dst; /* target SBuf for current packet */ - IOBuf io; + IOBuf *io; }; -#define RAW_SBUF_SIZE offsetof(struct SBuf, io.buf) - #define sbuf_socket(sbuf) ((sbuf)->sock) void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg); @@ -110,7 +103,7 @@ bool sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb cb) _MUSTCHECK; */ static inline bool sbuf_is_empty(SBuf *sbuf) { - return iobuf_empty(&sbuf->io) && sbuf->pkt_remain == 0; + return iobuf_empty(sbuf->io) && sbuf->pkt_remain == 0; } static inline bool sbuf_is_closed(SBuf *sbuf) @@ -118,6 +111,4 @@ static inline bool sbuf_is_closed(SBuf *sbuf) return sbuf->sock == 0; } -bool sbuf_rewrite_header(SBuf *sbuf, int old_len, - const uint8_t *new_hdr, int new_len) _MUSTCHECK; diff --git a/src/admin.c b/src/admin.c index 65e26bc..d9431ad 100644 --- a/src/admin.c +++ b/src/admin.c @@ -501,8 +501,8 @@ static void adr2txt(const PgAddr *adr, char *dst, int dstlen) static void socket_row(PktBuf *buf, PgSocket *sk, const char *state, bool debug) { - int pkt_avail = iobuf_amount_parse(&sk->sbuf.io); - int send_avail = iobuf_amount_pending(&sk->sbuf.io); + int pkt_avail = iobuf_amount_parse(sk->sbuf.io); + int send_avail = iobuf_amount_pending(sk->sbuf.io); char ptrbuf[128], linkbuf[128]; char l_addr[32], r_addr[32]; @@ -524,10 +524,10 @@ static void socket_row(PktBuf *buf, PgSocket *sk, const char *state, bool debug) sk->connect_time, sk->request_time, ptrbuf, linkbuf, - sk->sbuf.io.recv_pos, - sk->sbuf.io.parse_pos, + sk->sbuf.io->recv_pos, + sk->sbuf.io->parse_pos, sk->sbuf.pkt_remain, - sk->sbuf.io.done_pos, + sk->sbuf.io->done_pos, 0, pkt_avail, send_avail); } diff --git a/src/main.c b/src/main.c index 0dce3d9..aa9afe3 100644 --- a/src/main.c +++ b/src/main.c @@ -471,9 +471,9 @@ static void check_limits(void) List *item; PgDatabase *db; - log_noise("event: %d, SBuf: %d, PgSocket: %d, Full PgSocket: %d", - (int)sizeof(struct event), (int)RAW_SBUF_SIZE, - (int)RAW_SOCKET_SIZE, (int)PG_SOCKET_SIZE); + log_noise("event: %d, SBuf: %d, PgSocket: %d, IOBuf: %d", + (int)sizeof(struct event), (int)sizeof(SBuf), + (int)sizeof(PgSocket), (int)IOBUF_SIZE); /* load limits */ err = getrlimit(RLIMIT_NOFILE, &lim); diff --git a/src/objects.c b/src/objects.c index a170f8f..c99e5ef 100644 --- a/src/objects.c +++ b/src/objects.c @@ -41,6 +41,7 @@ ObjectCache *client_cache; ObjectCache *db_cache; ObjectCache *pool_cache; ObjectCache *user_cache; +ObjectCache *iobuf_cache; /* * libevent may still report events when event_del() @@ -125,13 +126,21 @@ void init_objects(void) fatal("cannot create initial caches"); } +static void do_iobuf_reset(void *arg) +{ + IOBuf *io = arg; + iobuf_reset(io); +} + /* initialization after config loading */ void init_caches(void) { - server_cache = objcache_create("server_cache", PG_SOCKET_SIZE, 8, + server_cache = objcache_create("server_cache", sizeof(PgSocket), 0, construct_server, clean_socket); - client_cache = objcache_create("client_cache", PG_SOCKET_SIZE, 8, + client_cache = objcache_create("client_cache", sizeof(PgSocket), 0, construct_client, clean_socket); + iobuf_cache = objcache_create("iobuf_cache", IOBUF_SIZE, 0, + do_iobuf_reset, do_iobuf_reset); } /* state change means moving between lists */ diff --git a/src/sbuf.c b/src/sbuf.c index 8e6c622..f97cffa 100644 --- a/src/sbuf.c +++ b/src/sbuf.c @@ -37,7 +37,7 @@ #define AssertSanity(sbuf) do { \ - Assert(iobuf_sane(&(sbuf)->io)); \ + Assert(iobuf_sane((sbuf)->io)); \ Assert((sbuf)->pkt_remain >= 0); \ } while (0) @@ -53,14 +53,14 @@ static bool sbuf_process_pending(SBuf *sbuf) _MUSTCHECK; static void sbuf_connect_cb(int sock, short flags, void *arg); static void sbuf_recv_cb(int sock, short flags, void *arg); static void sbuf_send_cb(int sock, short flags, void *arg); -static void sbuf_try_resync(SBuf *sbuf); +static void sbuf_try_resync(SBuf *sbuf, bool release); static bool sbuf_wait_for_data(SBuf *sbuf) _MUSTCHECK; static void sbuf_main_loop(SBuf *sbuf, bool skip_recv); static bool sbuf_call_proto(SBuf *sbuf, int event) /* _MUSTCHECK */; static bool sbuf_actual_recv(SBuf *sbuf, int len) _MUSTCHECK; static bool sbuf_after_connect_check(SBuf *sbuf) _MUSTCHECK; -static inline IOBuf *get_iobuf(SBuf *sbuf) { return &sbuf->io; } +static inline IOBuf *get_iobuf(SBuf *sbuf) { return sbuf->io; } /********************************* * Public functions @@ -69,7 +69,7 @@ static inline IOBuf *get_iobuf(SBuf *sbuf) { return &sbuf->io; } /* initialize SBuf with proto handler */ void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg) { - memset(sbuf, 0, RAW_SBUF_SIZE); + memset(sbuf, 0, sizeof(SBuf)); sbuf->proto_cb_arg = arg; sbuf->proto_cb = proto_fn; } @@ -79,7 +79,7 @@ bool sbuf_accept(SBuf *sbuf, int sock, bool is_unix) { bool res; - Assert(iobuf_empty(&sbuf->io) && sbuf->sock == 0); + Assert(iobuf_empty(sbuf->io) && sbuf->sock == 0); AssertSanity(sbuf); tune_socket(sock, is_unix); @@ -112,7 +112,7 @@ bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int time socklen_t len; struct timeval timeout; - Assert(iobuf_empty(&sbuf->io) && sbuf->sock == 0); + Assert(iobuf_empty(sbuf->io) && sbuf->sock == 0); AssertSanity(sbuf); /* prepare sockaddr */ @@ -253,7 +253,10 @@ bool sbuf_close(SBuf *sbuf) sbuf->sock = 0; sbuf->pkt_remain = 0; sbuf->pkt_action = sbuf->wait_send = 0; - iobuf_reset(get_iobuf(sbuf)); + if (sbuf->io) { + obj_free(iobuf_cache, sbuf->io); + sbuf->io = NULL; + } return true; } @@ -312,7 +315,7 @@ void sbuf_prepare_fetch(SBuf *sbuf, int amount) static bool sbuf_call_proto(SBuf *sbuf, int event) { MBuf mbuf; - IOBuf *io = get_iobuf(sbuf); + IOBuf *io = sbuf->io; bool res; AssertSanity(sbuf); @@ -321,8 +324,10 @@ static bool sbuf_call_proto(SBuf *sbuf, int event) /* if pkt callback, limit only with current packet */ if (event == SBUF_EV_PKT_CALLBACK) iobuf_parse_limit(io, &mbuf, sbuf->pkt_remain); - else + else if (event == SBUF_EV_READ) iobuf_parse_all(io, &mbuf); + else + memset(&mbuf, 0, sizeof(mbuf)); res = sbuf->proto_cb(sbuf, event, &mbuf, sbuf->proto_cb_arg); @@ -405,10 +410,10 @@ static bool sbuf_queue_send(SBuf *sbuf) static bool sbuf_send_pending(SBuf *sbuf) { int res, avail; - IOBuf *io = get_iobuf(sbuf); + IOBuf *io = sbuf->io; AssertActive(sbuf); - Assert(sbuf->dst || iobuf_amount_pending(&sbuf->io) == 0); + Assert(sbuf->dst || iobuf_amount_pending(io) == 0); try_more: /* how much data is available for sending */ @@ -447,7 +452,7 @@ try_more: static bool sbuf_process_pending(SBuf *sbuf) { int avail; - IOBuf *io = get_iobuf(sbuf); + IOBuf *io = sbuf->io; bool full = iobuf_amount_recv(io) <= 0; bool res; @@ -508,20 +513,30 @@ static bool sbuf_process_pending(SBuf *sbuf) } /* reposition at buffer start again */ -static void sbuf_try_resync(SBuf *sbuf) +static void sbuf_try_resync(SBuf *sbuf, bool release) { - IOBuf *io = get_iobuf(sbuf); + IOBuf *io = sbuf->io; + if (io) + log_debug("reync: done=%d, parse=%d, recv=%d", + io->done_pos, io->parse_pos, io->recv_pos); AssertActive(sbuf); - iobuf_try_resync(io, SBUF_SMALL_PKT); + if (!io) + return; + + if (release && iobuf_empty(io)) { + obj_free(iobuf_cache, io); + sbuf->io = NULL; + } else + iobuf_try_resync(io, SBUF_SMALL_PKT); } /* actually ask kernel for more data */ static bool sbuf_actual_recv(SBuf *sbuf, int len) { int got; - IOBuf *io = get_iobuf(sbuf); + IOBuf *io = sbuf->io; AssertActive(sbuf); Assert(len > 0); @@ -547,6 +562,19 @@ static void sbuf_recv_cb(int sock, short flags, void *arg) sbuf_main_loop(sbuf, DO_RECV); } +static bool allocate_iobuf(SBuf *sbuf) +{ + if (sbuf->io == NULL) { + sbuf->io = obj_alloc(iobuf_cache); + if (sbuf->io == NULL) { + sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED); + return false; + } + iobuf_reset(sbuf->io); + } + return true; +} + /* * Main recv-parse-send-repeat loop. * @@ -559,7 +587,6 @@ static void sbuf_recv_cb(int sock, short flags, void *arg) static void sbuf_main_loop(SBuf *sbuf, bool skip_recv) { int free, ok; - IOBuf *io = get_iobuf(sbuf); /* sbuf was closed before in this event loop */ if (!sbuf->sock) @@ -569,19 +596,22 @@ static void sbuf_main_loop(SBuf *sbuf, bool skip_recv) Assert(sbuf->wait_send == 0); AssertSanity(sbuf); + if (!allocate_iobuf(sbuf)) + return; + /* avoid recv() if asked */ if (skip_recv) goto skip_recv; try_more: /* make room in buffer */ - sbuf_try_resync(sbuf); + sbuf_try_resync(sbuf, false); /* * here used to be if (free > SBUF_SMALL_PKT) check * but with skip_recv switch its should not be needed anymore. */ - free = iobuf_amount_recv(io); + free = iobuf_amount_recv(sbuf->io); if (free > 0) { /* * When suspending, try to hit packet boundary ASAP. @@ -606,11 +636,11 @@ skip_recv: return; /* if the buffer is full, there can be more data available */ - if (iobuf_amount_recv(io) <= 0) + if (iobuf_amount_recv(sbuf->io) <= 0) goto try_more; /* clean buffer */ - sbuf_try_resync(sbuf); + sbuf_try_resync(sbuf, true); /* notify proto that all is sent */ if (sbuf_is_empty(sbuf)) @@ -669,28 +699,3 @@ bool sbuf_answer(SBuf *sbuf, const void *buf, int len) return res == len; } -bool sbuf_rewrite_header(SBuf *sbuf, int old_len, - const uint8_t *new_hdr, int new_len) -{ -#if 0 - int avail = sbuf->recv_pos - sbuf->pkt_pos; - int diff = new_len - old_len; - uint8_t *pkt_pos = sbuf->buf + sbuf->pkt_pos; - uint8_t *old_pos = pkt_pos + old_len; - uint8_t *new_pos = pkt_pos + new_len; - - AssertActive(sbuf); - Assert(old_len >= 0 && new_len >= 0); - Assert(diff <= SBUF_MAX_REWRITE); - - /* overflow can be triggered by user by sending multiple Parse pkts */ - if (sbuf->recv_pos + diff > cf_sbuf_len + SBUF_MAX_REWRITE) - return false; - - memmove(new_pos, old_pos, avail - old_len); - memcpy(pkt_pos, new_hdr, new_len); - sbuf->recv_pos += diff; -#endif - return false; -} -