-Changes in current version:
+Changes in 2.0.2-alpha:
+ o Add a new flag to bufferevents to make all callbacks automatically deferred.
+
+Changes in 2.0.1-alpha:
o free minheap on event_base_free(); from Christopher Layne
o debug cleanups in signal.c; from Christopher Layne
o provide event_base_new() that does not set the current_base global
/** If set, read is suspended until evbuffer some. */
unsigned read_suspended : 1;
+ /** If set, we should free the lock when we free the bufferevent. */
unsigned own_lock : 1;
+ unsigned readcb_pending : 1;
+ unsigned writecb_pending : 1;
+ short errorcb_pending;
+ int errno_pending;
+ struct deferred_cb deferred;
+
enum bufferevent_options options;
int refcnt;
void bufferevent_incref(struct bufferevent *bufev);
void _bufferevent_decref_and_unlock(struct bufferevent *bufev);
+void _bufferevent_run_readcb(struct bufferevent *bufev);
+void _bufferevent_run_writecb(struct bufferevent *bufev);
+void _bufferevent_run_errorcb(struct bufferevent *bufev, short what);
+
#define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev)
#define BEV_LOCK(b) do { \
#ifdef WIN32
#include <winsock2.h>
#endif
+#include <errno.h>
#include "event2/util.h"
#include "event2/bufferevent.h"
}
}
+static void
+bufferevent_run_deferred_callbacks(struct deferred_cb *_, void *arg)
+{
+ struct bufferevent_private *bufev_private = arg;
+ struct bufferevent *bufev = &bufev_private->bev;
+
+ BEV_LOCK(bufev);
+ if (bufev_private->readcb_pending && bufev->readcb) {
+ bufev_private->readcb_pending = 0;
+ bufev->readcb(bufev, bufev->cbarg);
+ }
+ if (bufev_private->writecb_pending && bufev->writecb) {
+ bufev_private->writecb_pending = 0;
+ bufev->writecb(bufev, bufev->cbarg);
+ }
+ if (bufev_private->errorcb_pending && bufev->errorcb) {
+ short what = bufev_private->errorcb_pending;
+ int err = bufev_private->errno_pending;
+ bufev_private->errorcb_pending = 0;
+ bufev_private->errno_pending = 0;
+ EVUTIL_SET_SOCKET_ERROR(err);
+ bufev->errorcb(bufev, what, bufev->cbarg);
+ }
+ _bufferevent_decref_and_unlock(bufev);
+}
+
+void
+_bufferevent_run_readcb(struct bufferevent *bufev)
+{
+ /* Requires lock. */
+ struct bufferevent_private *p =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+ p->readcb_pending = 1;
+ if (!p->deferred.queued) {
+ bufferevent_incref(bufev);
+ event_deferred_cb_schedule(
+ bufev->ev_base, &p->deferred);
+ }
+ } else {
+ bufev->readcb(bufev, bufev->cbarg);
+ }
+}
+
+void
+_bufferevent_run_writecb(struct bufferevent *bufev)
+{
+ /* Requires lock. */
+ struct bufferevent_private *p =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+ p->writecb_pending = 1;
+ if (!p->deferred.queued) {
+ bufferevent_incref(bufev);
+ event_deferred_cb_schedule(
+ bufev->ev_base, &p->deferred);
+ }
+ } else {
+ bufev->writecb(bufev, bufev->cbarg);
+ }
+}
+
+void
+_bufferevent_run_errorcb(struct bufferevent *bufev, short what)
+{
+ /* Requires lock. */
+ struct bufferevent_private *p =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+ p->errorcb_pending |= what;
+ p->errno_pending = EVUTIL_SOCKET_ERROR();
+ if (!p->deferred.queued) {
+ bufferevent_incref(bufev);
+ event_deferred_cb_schedule(
+ bufev->ev_base, &p->deferred);
+ }
+ } else {
+ bufev->errorcb(bufev, what, bufev->cbarg);
+ }
+}
+
int
bufferevent_init_common(struct bufferevent_private *bufev_private,
struct event_base *base,
}
}
#endif
+ if (options & BEV_OPT_DEFER_CALLBACKS) {
+ event_deferred_cb_init(&bufev_private->deferred,
+ bufferevent_run_deferred_callbacks,
+ bufev_private);
+ }
bufev_private->options = options;
if (processed && bufev->writecb &&
evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
/* call the write callback.*/
- (*bufev->writecb)(bufev, bufev->cbarg);
+ _bufferevent_run_writecb(bufev);
if (res == BEV_OK &&
(bufev->enabled & EV_WRITE) &&
if (processed_any &&
evbuffer_get_length(bufev->input) >= bufev->wm_read.low &&
bufev->readcb != NULL)
- (*bufev->readcb)(bufev, bufev->cbarg);
+ _bufferevent_run_readcb(bufev);
}
/* Called when the underlying socket has drained enough that we can write to
/* All we can really to is tell our own errorcb. */
if (bev->errorcb)
- bev->errorcb(bev, what, bev->cbarg);
+ _bufferevent_run_errorcb(bev, what);
}
static int
struct bufferevent_pair {
struct bufferevent_private bev;
struct bufferevent_pair *partner;
- struct deferred_cb deferred_write_cb;
- struct deferred_cb deferred_read_cb;
};
static void be_pair_outbuf_cb(struct evbuffer *,
const struct evbuffer_cb_info *, void *);
-static void
-run_callback(struct deferred_cb *cb, void *arg)
-{
- struct bufferevent_pair *bufev = arg;
- struct bufferevent *bev = downcast(bufev);
-
- BEV_LOCK(bev);
- if (cb == &bufev->deferred_read_cb) {
- if (bev->readcb) {
- bev->readcb(bev, bev->cbarg);
- }
- } else {
- if (bev->writecb) {
- bev->writecb(bev, bev->cbarg);
- }
- }
- BEV_UNLOCK(bev);
-}
-
static struct bufferevent_pair *
bufferevent_pair_elt_new(struct event_base *base,
enum bufferevent_options options)
bufferevent_free(downcast(bufev));
return NULL;
}
- event_deferred_cb_init(&bufev->deferred_read_cb, run_callback, bufev);
- event_deferred_cb_init(&bufev->deferred_write_cb, run_callback, bufev);
return bufev;
}
struct bufferevent *pair[2])
{
struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
- enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;
+ enum bufferevent_options tmp_options;
+
+ options |= BEV_OPT_DEFER_CALLBACKS;
+ tmp_options = options & ~BEV_OPT_THREADSAFE;
bufev1 = bufferevent_pair_elt_new(base, options);
if (!bufev1)
dst_size = evbuffer_get_length(dst->input);
if (dst_size >= dst->wm_read.low && dst->readcb) {
- event_deferred_cb_schedule(dst->ev_base,
- &(upcast(dst)->deferred_read_cb));
+ _bufferevent_run_readcb(dst);
}
if (src_size <= src->wm_write.low && src->writecb) {
- event_deferred_cb_schedule(src->ev_base,
- &(upcast(src)->deferred_write_cb));
+ _bufferevent_run_writecb(src);
}
done:
evbuffer_freeze(src->output, 1);
bev_p->partner->partner = NULL;
bev_p->partner = NULL;
}
- event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_write_cb);
- event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_read_cb);
}
static void
/* Invoke the user callback - must always be called last */
if (evbuffer_get_length(input) >= bufev->wm_read.low &&
bufev->readcb != NULL)
- (*bufev->readcb)(bufev, bufev->cbarg);
+ _bufferevent_run_readcb(bufev);
return;
error:
event_del(&bufev->ev_read);
- (*bufev->errorcb)(bufev, what, bufev->cbarg);
-
+ _bufferevent_run_errorcb(bufev, what);
}
static void
*/
if (bufev->writecb != NULL &&
evbuffer_get_length(bufev->output) <= bufev->wm_write.low)
- (*bufev->writecb)(bufev, bufev->cbarg);
+ _bufferevent_run_writecb(bufev);
return;
error:
event_del(&bufev->ev_write);
- (*bufev->errorcb)(bufev, what, bufev->cbarg);
+ _bufferevent_run_errorcb(bufev, what);
}
struct bufferevent *
/** If set, and threading is enabled, operations on this bufferevent
* are protected by a lock */
BEV_OPT_THREADSAFE = (1<<1),
+
+ /** If set, callbacks are run deferred in the event loop. */
+ BEV_OPT_DEFER_CALLBACKS = (1<<2),
};
/**