#define AssertSanity(sbuf) do { \
- Assert(iobuf_sane(&(sbuf)->io)); \
+ Assert(iobuf_sane((sbuf)->io)); \
Assert((sbuf)->pkt_remain >= 0); \
} while (0)
static void sbuf_connect_cb(int sock, short flags, void *arg);
static void sbuf_recv_cb(int sock, short flags, void *arg);
static void sbuf_send_cb(int sock, short flags, void *arg);
-static void sbuf_try_resync(SBuf *sbuf);
+static void sbuf_try_resync(SBuf *sbuf, bool release);
static bool sbuf_wait_for_data(SBuf *sbuf) _MUSTCHECK;
static void sbuf_main_loop(SBuf *sbuf, bool skip_recv);
static bool sbuf_call_proto(SBuf *sbuf, int event) /* _MUSTCHECK */;
static bool sbuf_actual_recv(SBuf *sbuf, int len) _MUSTCHECK;
static bool sbuf_after_connect_check(SBuf *sbuf) _MUSTCHECK;
-static inline IOBuf *get_iobuf(SBuf *sbuf) { return &sbuf->io; }
+static inline IOBuf *get_iobuf(SBuf *sbuf) { return sbuf->io; }
/*********************************
* Public functions
/* initialize SBuf with proto handler */
void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg)
{
- memset(sbuf, 0, RAW_SBUF_SIZE);
+ memset(sbuf, 0, sizeof(SBuf));
sbuf->proto_cb_arg = arg;
sbuf->proto_cb = proto_fn;
}
{
bool res;
- Assert(iobuf_empty(&sbuf->io) && sbuf->sock == 0);
+ Assert(iobuf_empty(sbuf->io) && sbuf->sock == 0);
AssertSanity(sbuf);
tune_socket(sock, is_unix);
socklen_t len;
struct timeval timeout;
- Assert(iobuf_empty(&sbuf->io) && sbuf->sock == 0);
+ Assert(iobuf_empty(sbuf->io) && sbuf->sock == 0);
AssertSanity(sbuf);
/* prepare sockaddr */
sbuf->sock = 0;
sbuf->pkt_remain = 0;
sbuf->pkt_action = sbuf->wait_send = 0;
- iobuf_reset(get_iobuf(sbuf));
+ if (sbuf->io) {
+ obj_free(iobuf_cache, sbuf->io);
+ sbuf->io = NULL;
+ }
return true;
}
static bool sbuf_call_proto(SBuf *sbuf, int event)
{
MBuf mbuf;
- IOBuf *io = get_iobuf(sbuf);
+ IOBuf *io = sbuf->io;
bool res;
AssertSanity(sbuf);
/* if pkt callback, limit only with current packet */
if (event == SBUF_EV_PKT_CALLBACK)
iobuf_parse_limit(io, &mbuf, sbuf->pkt_remain);
- else
+ else if (event == SBUF_EV_READ)
iobuf_parse_all(io, &mbuf);
+ else
+ memset(&mbuf, 0, sizeof(mbuf));
res = sbuf->proto_cb(sbuf, event, &mbuf, sbuf->proto_cb_arg);
static bool sbuf_send_pending(SBuf *sbuf)
{
int res, avail;
- IOBuf *io = get_iobuf(sbuf);
+ IOBuf *io = sbuf->io;
AssertActive(sbuf);
- Assert(sbuf->dst || iobuf_amount_pending(&sbuf->io) == 0);
+ Assert(sbuf->dst || iobuf_amount_pending(io) == 0);
try_more:
/* how much data is available for sending */
static bool sbuf_process_pending(SBuf *sbuf)
{
int avail;
- IOBuf *io = get_iobuf(sbuf);
+ IOBuf *io = sbuf->io;
bool full = iobuf_amount_recv(io) <= 0;
bool res;
}
/* reposition at buffer start again */
-static void sbuf_try_resync(SBuf *sbuf)
+static void sbuf_try_resync(SBuf *sbuf, bool release)
{
- IOBuf *io = get_iobuf(sbuf);
+ IOBuf *io = sbuf->io;
+ if (io)
+ log_debug("reync: done=%d, parse=%d, recv=%d",
+ io->done_pos, io->parse_pos, io->recv_pos);
AssertActive(sbuf);
- iobuf_try_resync(io, SBUF_SMALL_PKT);
+ if (!io)
+ return;
+
+ if (release && iobuf_empty(io)) {
+ obj_free(iobuf_cache, io);
+ sbuf->io = NULL;
+ } else
+ iobuf_try_resync(io, SBUF_SMALL_PKT);
}
/* actually ask kernel for more data */
static bool sbuf_actual_recv(SBuf *sbuf, int len)
{
int got;
- IOBuf *io = get_iobuf(sbuf);
+ IOBuf *io = sbuf->io;
AssertActive(sbuf);
Assert(len > 0);
sbuf_main_loop(sbuf, DO_RECV);
}
+static bool allocate_iobuf(SBuf *sbuf)
+{
+ if (sbuf->io == NULL) {
+ sbuf->io = obj_alloc(iobuf_cache);
+ if (sbuf->io == NULL) {
+ sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
+ return false;
+ }
+ iobuf_reset(sbuf->io);
+ }
+ return true;
+}
+
/*
* Main recv-parse-send-repeat loop.
*
static void sbuf_main_loop(SBuf *sbuf, bool skip_recv)
{
int free, ok;
- IOBuf *io = get_iobuf(sbuf);
/* sbuf was closed before in this event loop */
if (!sbuf->sock)
Assert(sbuf->wait_send == 0);
AssertSanity(sbuf);
+ if (!allocate_iobuf(sbuf))
+ return;
+
/* avoid recv() if asked */
if (skip_recv)
goto skip_recv;
try_more:
/* make room in buffer */
- sbuf_try_resync(sbuf);
+ sbuf_try_resync(sbuf, false);
/*
* here used to be if (free > SBUF_SMALL_PKT) check
* but with skip_recv switch its should not be needed anymore.
*/
- free = iobuf_amount_recv(io);
+ free = iobuf_amount_recv(sbuf->io);
if (free > 0) {
/*
* When suspending, try to hit packet boundary ASAP.
return;
/* if the buffer is full, there can be more data available */
- if (iobuf_amount_recv(io) <= 0)
+ if (iobuf_amount_recv(sbuf->io) <= 0)
goto try_more;
/* clean buffer */
- sbuf_try_resync(sbuf);
+ sbuf_try_resync(sbuf, true);
/* notify proto that all is sent */
if (sbuf_is_empty(sbuf))
return res == len;
}
-bool sbuf_rewrite_header(SBuf *sbuf, int old_len,
- const uint8_t *new_hdr, int new_len)
-{
-#if 0
- int avail = sbuf->recv_pos - sbuf->pkt_pos;
- int diff = new_len - old_len;
- uint8_t *pkt_pos = sbuf->buf + sbuf->pkt_pos;
- uint8_t *old_pos = pkt_pos + old_len;
- uint8_t *new_pos = pkt_pos + new_len;
-
- AssertActive(sbuf);
- Assert(old_len >= 0 && new_len >= 0);
- Assert(diff <= SBUF_MAX_REWRITE);
-
- /* overflow can be triggered by user by sending multiple Parse pkts */
- if (sbuf->recv_pos + diff > cf_sbuf_len + SBUF_MAX_REWRITE)
- return false;
-
- memmove(new_pos, old_pos, avail - old_len);
- memcpy(pkt_pos, new_hdr, new_len);
- sbuf->recv_pos += diff;
-#endif
- return false;
-}
-