From 3f0f22a497536d4e94a4b1b90a84152b78b6c9ca Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Thu, 6 Dec 2007 09:11:07 +0000 Subject: [PATCH] new features to SBuf - struct field reorder to get aligned buffer - support rewriting pkt header - support listening for full pkt data - clean some comments --- include/bouncer.h | 2 + include/sbuf.h | 81 +++++++++++++++++++++----------- src/client.c | 3 ++ src/objects.c | 4 +- src/sbuf.c | 116 ++++++++++++++++++++++++++++++++-------------- src/server.c | 3 ++ 6 files changed, 143 insertions(+), 66 deletions(-) diff --git a/include/bouncer.h b/include/bouncer.h index 022098c..4753b0e 100644 --- a/include/bouncer.h +++ b/include/bouncer.h @@ -257,6 +257,8 @@ struct PgSocket { SBuf sbuf; /* stream buffer, must be last */ }; +#define PG_SOCKET_SIZE (sizeof(PgSocket) + cf_sbuf_len + SBUF_MAX_REWRITE) + /* where to store old fd info during SHOW FDS result processing */ #define tmp_sk_oldfd request_time #define tmp_sk_linkfd query_start diff --git a/include/sbuf.h b/include/sbuf.h index 86d49fb..ded4c9d 100644 --- a/include/sbuf.h +++ b/include/sbuf.h @@ -16,59 +16,81 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ +/* + * event types for protocol handler + */ typedef enum { - SBUF_EV_READ, - SBUF_EV_RECV_FAILED, - SBUF_EV_SEND_FAILED, - SBUF_EV_CONNECT_FAILED, - SBUF_EV_CONNECT_OK, - SBUF_EV_FLUSH + SBUF_EV_READ, /* got new packet */ + SBUF_EV_RECV_FAILED, /* error */ + SBUF_EV_SEND_FAILED, /* error */ + SBUF_EV_CONNECT_FAILED, /* error */ + SBUF_EV_CONNECT_OK, /* got connection */ + SBUF_EV_FLUSH, /* data is sent, buffer empty */ + SBUF_EV_PKT_CALLBACK, /* next part of pkt data */ } SBufEvent; +/* + * If less that this amount of data is pending, then + * prefer to merge it with next recv(). + * + * It needs to be larger than data handler wants + * to see completely. Generally just header, + * but currently also ServerParam pkt. + */ +#define SBUF_SMALL_PKT 64 + +/* + * How much proto handler may want to enlarge the packet. + */ +#define SBUF_MAX_REWRITE 16 + +/* fwd def */ typedef struct SBuf SBuf; /* callback should return true if it used one of sbuf_prepare_* on sbuf, false if it used sbuf_pause(), sbuf_close() or simply wants to wait for next event loop (eg. too few data available). */ -typedef bool (*sbuf_proto_cb_t)(SBuf *sbuf, - SBufEvent evtype, - MBuf *mbuf, - void *arg); +typedef bool (*sbuf_cb_t)(SBuf *sbuf, + SBufEvent evtype, + MBuf *mbuf, + void *arg); /* for some reason, libevent has no typedef for callback */ typedef void (*sbuf_libevent_cb)(int, short, void *); +/* + * Stream Buffer. + * + * Stream is divided to packets. On each packet start + * protocol handler is called that decides what to do. + */ struct SBuf { - /* libevent handle */ - struct event ev; - - /* protocol callback function */ - sbuf_proto_cb_t proto_handler; - void *arg; + struct event ev; /* libevent handle */ - /* fd for this socket */ - int sock; + bool is_unix; /* is it unix socket */ + bool wait_send; /* debug var, otherwise useless */ + uint8_t pkt_action; /* method for handling current pkt */ - /* dest SBuf for current packet */ - SBuf *dst; + int sock; /* fd for this socket */ - int recv_pos; - int pkt_pos; - int send_pos; + int recv_pos; /* end of received data */ + int pkt_pos; /* packet processing pos */ + int send_pos; /* how far is data sent */ int pkt_remain; /* total packet length remaining */ int send_remain; /* total data to be sent remaining */ - unsigned pkt_skip:1; /* if current packet should be skipped */ - unsigned is_unix:1; /* is it unix socket */ - unsigned wait_send:1; /* debug var, otherwise useless */ + sbuf_cb_t proto_cb; /* protocol callback */ + void *proto_cb_arg; /* extra arg to callback */ + + SBuf *dst; /* target SBuf for current packet */ - uint8_t buf[0]; + uint8_t buf[0]; /* data buffer follows (cf_sbuf_len + SBUF_MAX_REWRITE) */ }; #define sbuf_socket(sbuf) ((sbuf)->sock) -void sbuf_init(SBuf *sbuf, sbuf_proto_cb_t proto_fn, void *arg); +void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg); void sbuf_accept(SBuf *sbuf, int read_sock, bool is_unix); void sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec); @@ -79,6 +101,7 @@ void sbuf_close(SBuf *sbuf); /* proto_fn can use those functions to order behaviour */ void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount); void sbuf_prepare_skip(SBuf *sbuf, int amount); +void sbuf_prepare_fetch(SBuf *sbuf, int amount); bool sbuf_answer(SBuf *sbuf, const void *buf, int len); @@ -94,3 +117,5 @@ static inline bool sbuf_is_empty(SBuf *sbuf) && sbuf->pkt_remain == 0; } +bool sbuf_rewrite_header(SBuf *sbuf, int old_len, + const uint8_t *new_hdr, int new_len); diff --git a/src/client.c b/src/client.c index 44adc1c..499b412 100644 --- a/src/client.c +++ b/src/client.c @@ -392,6 +392,9 @@ bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg) case SBUF_EV_FLUSH: /* client is not interested in it */ break; + case SBUF_EV_PKT_CALLBACK: + /* unused ATM */ + break; } return res; } diff --git a/src/objects.c b/src/objects.c index de2bb51..5b83b64 100644 --- a/src/objects.c +++ b/src/objects.c @@ -117,9 +117,9 @@ void init_objects(void) void init_caches(void) { - server_cache = objcache_create("server_cache", sizeof(PgSocket) + cf_sbuf_len, 8, + server_cache = objcache_create("server_cache", PG_SOCKET_SIZE, 8, construct_server, clean_socket); - client_cache = objcache_create("client_cache", sizeof(PgSocket) + cf_sbuf_len, 8, + client_cache = objcache_create("client_cache", PG_SOCKET_SIZE, 8, construct_client, clean_socket); } diff --git a/src/sbuf.c b/src/sbuf.c index 9670609..7d3f5c4 100644 --- a/src/sbuf.c +++ b/src/sbuf.c @@ -30,21 +30,17 @@ #define DO_RECV false #define SKIP_RECV true -/* - * If less that this amount of data is pending, then - * prefer to merge it with next recv(). - * - * It needs to be larger than data handler wants - * to see completely. Generally just header, - * but currently also ServerParam pkt. - */ -#define SMALL_PKT 64 +#define ACT_UNSET 0 +#define ACT_SEND 1 +#define ACT_SKIP 2 +#define ACT_CALL 3 + #define AssertSanity(sbuf) do { \ Assert((sbuf)->send_pos >= 0); \ Assert((sbuf)->send_pos <= (sbuf)->pkt_pos); \ Assert((sbuf)->pkt_pos <= (sbuf)->recv_pos); \ - Assert((sbuf)->recv_pos <= cf_sbuf_len); \ + Assert((sbuf)->recv_pos <= cf_sbuf_len + SBUF_MAX_REWRITE); \ Assert((sbuf)->pkt_remain >= 0); \ Assert((sbuf)->send_remain >= 0); \ } while (0) @@ -71,11 +67,11 @@ static bool sbuf_call_proto(SBuf *sbuf, int event); *********************************/ /* initialize SBuf with proto handler */ -void sbuf_init(SBuf *sbuf, sbuf_proto_cb_t proto_fn, void *arg) +void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg) { memset(sbuf, 0, sizeof(*sbuf)); - sbuf->arg = arg; - sbuf->proto_handler = proto_fn; + sbuf->proto_cb_arg = arg; + sbuf->proto_cb = proto_fn; } /* got new socket from accept() */ @@ -191,7 +187,7 @@ void sbuf_continue(SBuf *sbuf) * This is not true in ServerParameter case. */ /* - * if (sbuf->recv_pos - sbuf->pkt_pos >= SMALL_PKT) + * if (sbuf->recv_pos - sbuf->pkt_pos >= SBUF_SMALL_PKT) * do_recv = false; */ @@ -209,7 +205,7 @@ void sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb) AssertActive(sbuf); event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, - user_cb, sbuf->arg); + user_cb, sbuf->proto_cb_arg); event_add(&sbuf->ev, NULL); } @@ -224,7 +220,7 @@ void sbuf_close(SBuf *sbuf) sbuf->dst = NULL; sbuf->sock = 0; sbuf->pkt_pos = sbuf->pkt_remain = sbuf->recv_pos = 0; - sbuf->pkt_skip = sbuf->wait_send = 0; + sbuf->pkt_action = sbuf->wait_send = 0; sbuf->send_pos = sbuf->send_remain = 0; } @@ -233,10 +229,10 @@ void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount) { AssertActive(sbuf); Assert(sbuf->pkt_remain == 0); - Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0); + Assert(sbuf->pkt_action == ACT_UNSET || sbuf->send_remain == 0); Assert(amount > 0); - sbuf->pkt_skip = 0; + sbuf->pkt_action = ACT_SEND; sbuf->pkt_remain = amount; sbuf->dst = dst; } @@ -246,12 +242,25 @@ void sbuf_prepare_skip(SBuf *sbuf, int amount) { AssertActive(sbuf); Assert(sbuf->pkt_remain == 0); - Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0); + Assert(sbuf->pkt_action == ACT_UNSET || sbuf->send_remain == 0); Assert(amount > 0); - sbuf->pkt_skip = 1; + sbuf->pkt_action = ACT_SKIP; sbuf->pkt_remain = amount; - sbuf->dst = NULL; + /* sbuf->dst = NULL; // fixme ?? */ +} + +/* proto_fn tells to skip some amount of bytes */ +void sbuf_prepare_fetch(SBuf *sbuf, int amount) +{ + AssertActive(sbuf); + Assert(sbuf->pkt_remain == 0); + Assert(sbuf->pkt_action == ACT_UNSET || sbuf->send_remain == 0); + Assert(amount > 0); + + sbuf->pkt_action = ACT_CALL; + sbuf->pkt_remain = amount; + /* sbuf->dst = NULL; // fixme ?? */ } /************************* @@ -278,8 +287,14 @@ static bool sbuf_call_proto(SBuf *sbuf, int event) AssertSanity(sbuf); Assert(event != SBUF_EV_READ || avail > 0); + /* if pkt callback, limit only with current packet */ + if (event == SBUF_EV_PKT_CALLBACK) { + if (avail > sbuf->pkt_remain) + avail = sbuf->pkt_remain; + } + mbuf_init(&mbuf, pos, avail); - res = sbuf->proto_handler(sbuf, event, &mbuf, sbuf->arg); + res = sbuf->proto_cb(sbuf, event, &mbuf, sbuf->proto_cb_arg); AssertSanity(sbuf); Assert(event != SBUF_EV_READ || !res || sbuf->sock > 0); @@ -388,7 +403,7 @@ all_sent: static bool sbuf_process_pending(SBuf *sbuf) { int avail; - bool full = sbuf->recv_pos == cf_sbuf_len; + bool full = sbuf->recv_pos >= cf_sbuf_len; bool res; while (1) { @@ -397,13 +412,13 @@ static bool sbuf_process_pending(SBuf *sbuf) /* * Enough for now? * - * The (avail <= SMALL_PKT) check is to avoid partial pkts. + * The (avail <= SBUF_SMALL_PKT) check is to avoid partial pkts. * As SBuf should not assume knowledge about packets, * the check is not done in !full case. Packet handler can * then still notify about partial packet by returning false. */ avail = sbuf->recv_pos - sbuf->pkt_pos; - if (avail == 0 || (full && avail <= SMALL_PKT)) + if (avail == 0 || (full && avail <= SBUF_SMALL_PKT)) break; /* @@ -419,20 +434,26 @@ static bool sbuf_process_pending(SBuf *sbuf) /* walk pkt, merge sends */ if (avail > sbuf->pkt_remain) avail = sbuf->pkt_remain; - if (!sbuf->pkt_skip) { + + switch (sbuf->pkt_action) { + case ACT_SEND: if (sbuf->send_remain == 0) sbuf->send_pos = sbuf->pkt_pos; sbuf->send_remain += avail; - } - sbuf->pkt_remain -= avail; - sbuf->pkt_pos += avail; - - /* send data */ - if (sbuf->pkt_skip) { - res = sbuf_send_pending(sbuf); + break; + case ACT_CALL: + res = sbuf_call_proto(sbuf, SBUF_EV_PKT_CALLBACK); if (!res) return false; + /* after callback, skip pkt */ + case ACT_SKIP: + res = sbuf_send_pending(sbuf); + if (!res) + return res; + break; } + sbuf->pkt_remain -= avail; + sbuf->pkt_pos += avail; } return sbuf_send_pending(sbuf); @@ -452,7 +473,7 @@ static void sbuf_try_resync(SBuf *sbuf) if (avail == 0) { sbuf->recv_pos = sbuf->pkt_pos = sbuf->send_pos = 0; - } else if (avail <= SMALL_PKT) { + } else if (avail <= SBUF_SMALL_PKT) { memmove(sbuf->buf, sbuf->buf + sbuf->send_pos, avail); sbuf->pkt_pos -= sbuf->send_pos; sbuf->send_pos = 0; @@ -529,7 +550,7 @@ try_more: sbuf_try_resync(sbuf); /* - * here used to be if (free > SMALL_PKT) check + * here used to be if (free > SBUF_SMALL_PKT) check * but with skip_recv switch its should not be needed anymore. */ free = cf_sbuf_len - sbuf->recv_pos; @@ -557,7 +578,7 @@ skip_recv: return; /* if the buffer is full, there can be more data available */ - if (sbuf->recv_pos == cf_sbuf_len) + if (sbuf->recv_pos >= cf_sbuf_len) goto try_more; /* clean buffer */ @@ -619,3 +640,26 @@ bool sbuf_answer(SBuf *sbuf, const void *buf, int len) return res == len; } +bool sbuf_rewrite_header(SBuf *sbuf, int old_len, + const uint8_t *new_hdr, int new_len) +{ + int avail = sbuf->recv_pos - sbuf->pkt_pos; + int diff = new_len - old_len; + uint8_t *pkt_pos = sbuf->buf + sbuf->pkt_pos; + uint8_t *old_pos = pkt_pos + old_len; + uint8_t *new_pos = pkt_pos + new_len; + + AssertActive(sbuf); + Assert(old_len >= 0 && new_len >= 0); + Assert(diff <= SBUF_MAX_REWRITE); + + /* overflow can be triggered by user by sending multiple Parse pkts */ + if (sbuf->recv_pos + diff > cf_sbuf_len + SBUF_MAX_REWRITE) + return false; + + memmove(new_pos, old_pos, avail - old_len); + memcpy(pkt_pos, new_hdr, new_len); + sbuf->recv_pos += diff; + return true; +} + diff --git a/src/server.c b/src/server.c index b06d3e0..9c9e92c 100644 --- a/src/server.c +++ b/src/server.c @@ -357,6 +357,9 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg) } } break; + case SBUF_EV_PKT_CALLBACK: + slog_warning(server, "SBUF_EV_PKT_CALLBACK with state=%d", server->state); + break; } return res; } -- 2.40.0