#include "event-config.h"
#include "evutil.h"
+#include "defer-internal.h"
+
+struct bufferevent_private {
+ struct bufferevent bev;
+
+ /** Evbuffer callback to enforce watermarks on input. */
+ struct evbuffer_cb_entry *read_watermarks_cb;
+
+ /** If set, read is suspended until evbuffer some. */
+ unsigned read_suspended : 1;
+
+ enum bufferevent_options options;
+
+ int refcnt;
+ void *lock;
+};
/**
Implementation table for a bufferevent: holds function pointers and other
extern const struct bufferevent_ops bufferevent_ops_pair;
/** Initialize the shared parts of a bufferevent. */
-int bufferevent_init_common(struct bufferevent *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options);
+int bufferevent_init_common(struct bufferevent_private *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options);
/** For internal use: temporarily stop all reads on bufev, because its
* read buffer is too full. */
void
bufferevent_wm_suspend_read(struct bufferevent *bufev)
{
- if (!bufev->read_suspended) {
+ struct bufferevent_private *bufev_private =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ if (!bufev_private->read_suspended) {
bufev->be_ops->disable(bufev, EV_READ);
- bufev->read_suspended = 1;
+ bufev_private->read_suspended = 1;
}
}
void
bufferevent_wm_unsuspend_read(struct bufferevent *bufev)
{
- if (bufev->read_suspended) {
- bufev->read_suspended = 0;
+ struct bufferevent_private *bufev_private =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ if (bufev_private->read_suspended) {
+ bufev_private->read_suspended = 0;
if (bufev->enabled & EV_READ)
bufev->be_ops->enable(bufev, EV_READ);
}
}
int
-bufferevent_init_common(struct bufferevent *bufev, struct event_base *base,
- const struct bufferevent_ops *ops,
- enum bufferevent_options options)
+bufferevent_init_common(struct bufferevent_private *bufev_private,
+ struct event_base *base,
+ const struct bufferevent_ops *ops,
+ enum bufferevent_options options)
{
+ struct bufferevent *bufev = &bufev_private->bev;
+
if ((bufev->input = evbuffer_new()) == NULL)
return -1;
*/
bufev->enabled = EV_WRITE;
- bufev->options = options;
+ bufev_private->options = options;
return 0;
}
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
+ struct bufferevent_private *bufev_private =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
short impl_events = event;
- if (bufev->read_suspended)
+ if (bufev_private->read_suspended)
impl_events &= ~EV_READ;
bufev->enabled |= event;
bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark)
{
+ struct bufferevent_private *bufev_private =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (events & EV_WRITE) {
bufev->wm_write.low = lowmark;
enable the callback if needed, and see if we should
suspend/bufferevent_wm_unsuspend. */
- if (bufev->read_watermarks_cb == NULL) {
- bufev->read_watermarks_cb =
+ if (bufev_private->read_watermarks_cb == NULL) {
+ bufev_private->read_watermarks_cb =
evbuffer_add_cb(bufev->input,
bufferevent_inbuf_wm_cb,
bufev);
}
evbuffer_cb_set_flags(bufev->input,
- bufev->read_watermarks_cb,
+ bufev_private->read_watermarks_cb,
EVBUFFER_CB_ENABLED);
if (EVBUFFER_LENGTH(bufev->input) > highmark)
bufferevent_wm_unsuspend_read(bufev);
} else {
/* There is now no high-water mark for read. */
- if (bufev->read_watermarks_cb)
+ if (bufev_private->read_watermarks_cb)
evbuffer_cb_set_flags(bufev->input,
- bufev->read_watermarks_cb,
- EVBUFFER_CB_DISABLED);
+ bufev_private->read_watermarks_cb,
+ EVBUFFER_CB_DISABLED);
bufferevent_wm_unsuspend_read(bufev);
}
}
const struct evbuffer_cb_info *info, void *arg);
struct bufferevent_filtered {
- struct bufferevent bev;
+ struct bufferevent_private bev;
/** The bufferevent that we read/write filterd data from/to. */
struct bufferevent *underlying;
if (bev->be_ops != &bufferevent_ops_filter)
return NULL;
bev_f = (void*)( ((char*)bev) -
- evutil_offsetof(struct bufferevent_filtered, bev) );
- assert(bev_f->bev.be_ops == &bufferevent_ops_filter);
+ evutil_offsetof(struct bufferevent_filtered, bev.bev));
+ assert(bev_f->bev.bev.be_ops == &bufferevent_ops_filter);
return bev_f;
}
-#define downcast(bev_f) (&(bev_f)->bev)
+#define downcast(bev_f) (&(bev_f)->bev.bev)
/** Return 1 iff bevf's underlying bufferevent's output buffer is at or
* over its high watermark such that we should not write to it in a given
bufferevent_setcb(bufev_f->underlying,
be_filter_readcb, be_filter_writecb, be_filter_errorcb, bufev_f);
- bufev_f->outbuf_cb = evbuffer_add_cb(bufev_f->bev.output,
+ bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
bufferevent_filtered_outbuf_cb, bufev_f);
- return &bufev_f->bev;
+ return downcast(bufev_f);
}
static void
if (bevf->free_context)
bevf->free_context(bevf->context);
- if (bev->options & BEV_OPT_CLOSE_ON_FREE)
+ if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE)
bufferevent_free(bevf->underlying);
}
int *processed_out)
{
enum bufferevent_filter_result res;
+ struct bufferevent *bev = downcast(bevf);
if (state == BEV_NORMAL) {
/* If we're in 'normal' mode, don't urge data on the filter
* unless we're reading data and under our high-water mark.*/
- if (!(bevf->bev.enabled & EV_READ) ||
+ if (!(bev->enabled & EV_READ) ||
be_readbuf_full(bevf, state))
return BEV_OK;
}
do {
ssize_t limit = -1;
- if (state == BEV_NORMAL && bevf->bev.wm_read.high)
- limit = bevf->bev.wm_read.high -
- EVBUFFER_LENGTH(bevf->bev.input);
+ if (state == BEV_NORMAL && bev->wm_read.high)
+ limit = bev->wm_read.high -
+ EVBUFFER_LENGTH(bev->input);
res = bevf->process_in(bevf->underlying->input,
- bevf->bev.input, limit, state, bevf->context);
+ bev->input, limit, state, bevf->context);
if (res == BEV_OK)
*processed_out = 1;
} while (res == BEV_OK &&
- (bevf->bev.enabled & EV_READ) &&
+ (bev->enabled & EV_READ) &&
EVBUFFER_LENGTH(bevf->underlying->input) &&
!be_readbuf_full(bevf, state));
limit = bevf->underlying->wm_write.high -
EVBUFFER_LENGTH(bevf->underlying->output);
- res = bevf->process_out(bevf->bev.output,
+ res = bevf->process_out(downcast(bevf)->output,
bevf->underlying->output,
limit,
state,
be_filter_errorcb(struct bufferevent *underlying, short what, void *_me)
{
struct bufferevent_filtered *bevf = _me;
+ struct bufferevent *bev = downcast(bevf);
/* All we can really to is tell our own errorcb. */
- if (bevf->bev.errorcb)
- bevf->bev.errorcb(&bevf->bev, what, bevf->bev.cbarg);
+ if (bev->errorcb)
+ bev->errorcb(bev, what, bev->cbarg);
}
static int
#include "util-internal.h"
struct bufferevent_pair {
- struct bufferevent bev;
+ struct bufferevent_private bev;
struct bufferevent_pair *partner;
struct deferred_cb deferred_write_cb;
struct deferred_cb deferred_read_cb;
struct bufferevent_pair *bev_p;
if (bev->be_ops != &bufferevent_ops_pair)
return NULL;
- bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev);
- assert(bev_p->bev.be_ops == &bufferevent_ops_pair);
+ bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev);
+ assert(bev_p->bev.bev.be_ops == &bufferevent_ops_pair);
return bev_p;
}
-#define downcast(bev_pair) (&(bev_pair)->bev)
+#define downcast(bev_pair) (&(bev_pair)->bev.bev)
/* XXX Handle close */
}
/* XXX set read timeout event */
/* XXX set write timeout event */
- if (!evbuffer_add_cb(bufev->bev.output, be_pair_outbuf_cb, bufev)) {
+ if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) {
bufferevent_free(downcast(bufev));
return NULL;
}
bufev1->partner = bufev2;
bufev2->partner = bufev1;
- evbuffer_freeze(bufev1->bev.input, 0);
- evbuffer_freeze(bufev1->bev.output, 1);
- evbuffer_freeze(bufev2->bev.input, 0);
- evbuffer_freeze(bufev2->bev.output, 1);
+ evbuffer_freeze(downcast(bufev1)->input, 0);
+ evbuffer_freeze(downcast(bufev1)->output, 1);
+ evbuffer_freeze(downcast(bufev2)->input, 0);
+ evbuffer_freeze(downcast(bufev2)->output, 1);
pair[0] = downcast(bufev1);
pair[1] = downcast(bufev2);
}
static inline int
-be_pair_wants_to_talk(struct bufferevent *src, struct bufferevent *dst)
+be_pair_wants_to_talk(struct bufferevent_pair *src,
+ struct bufferevent_pair *dst)
{
- return (src->enabled & EV_WRITE) &&
- (dst->enabled & EV_READ) && !dst->read_suspended &&
- evbuffer_get_length(src->output);
+ return (downcast(src)->enabled & EV_WRITE) &&
+ (downcast(dst)->enabled & EV_READ) &&
+ !dst->bev.read_suspended &&
+ evbuffer_get_length(downcast(src)->output);
}
static void
if (info->n_added > info->n_deleted && partner) {
/* We got more data. If the other side's reading, then
hand it over. */
- if (be_pair_wants_to_talk(downcast(bev_pair),
- downcast(partner))) {
+ if (be_pair_wants_to_talk(bev_pair, partner)) {
be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
}
}
/* We're starting to read! Does the other side have anything to write?*/
if ((events & EV_READ) && partner &&
- be_pair_wants_to_talk(downcast(partner), bufev)) {
+ be_pair_wants_to_talk(partner, bev_p)) {
be_pair_transfer(downcast(partner), bufev, 0);
}
/* We're starting to write! Does the other side want to read? */
if ((events & EV_WRITE) && partner &&
- be_pair_wants_to_talk(bufev, downcast(partner))) {
+ be_pair_wants_to_talk(bev_p, partner)) {
be_pair_transfer(bufev, downcast(partner), 0);
}
return 0;
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
enum bufferevent_options options)
{
+ struct bufferevent_private *bufev_p;
struct bufferevent *bufev;
- if ((bufev = mm_calloc(1, sizeof(struct bufferevent))) == NULL)
+ if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
return NULL;
- if (bufferevent_init_common(bufev, base, &bufferevent_ops_socket,
+ if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
options) < 0) {
- mm_free(bufev);
+ mm_free(bufev_p);
return NULL;
}
+ bufev = &bufev_p->bev;
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
static void
be_socket_destruct(struct bufferevent *bufev)
{
+ struct bufferevent_private *bufev_p =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
evutil_socket_t fd;
assert(bufev->be_ops == &bufferevent_ops_socket);
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
- if (bufev->options & BEV_OPT_CLOSE_ON_FREE)
+ if (bufev_p->options & BEV_OPT_CLOSE_ON_FREE)
EVUTIL_CLOSESOCKET(fd);
}
struct timeval timeout_read;
struct timeval timeout_write;
- /** Evbuffer callback to enforce watermarks on input. */
- struct evbuffer_cb_entry *read_watermarks_cb;
-
/** Events that are currently enabled: currently EV_READ and EV_WRITE
are supported. */
short enabled;
- /** If set, read is suspended until evbuffer some. */
- unsigned read_suspended : 1; /* */
-
- enum bufferevent_options options;
};
#ifdef __cplusplus