#include "event-config.h"
#include "evutil.h"
#include "defer-internal.h"
+#include "evthread-internal.h"
+#include "event2/thread.h"
struct bufferevent_private {
struct bufferevent bev;
/** If set, read is suspended until evbuffer some. */
unsigned read_suspended : 1;
+ unsigned own_lock : 1;
enum bufferevent_options options;
* read buffer is too full. */
void bufferevent_wm_unsuspend_read(struct bufferevent *bufev);
+int bufferevent_enable_locking(struct bufferevent *bufev, void *lock);
+
+#define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev)
+
+#define BEV_LOCK(b) do { \
+ struct bufferevent_private *locking = BEV_UPCAST(b); \
+ if (locking->lock) \
+ EVLOCK_LOCK(locking->lock, EVTHREAD_WRITE); \
+ } while(0)
+
+#define BEV_UNLOCK(b) do { \
+ struct bufferevent_private *locking = BEV_UPCAST(b); \
+ if (locking->lock) \
+ EVLOCK_UNLOCK(locking->lock, EVTHREAD_WRITE); \
+ } while(0)
+
#ifdef __cplusplus
}
#endif
#include "bufferevent-internal.h"
#include "util-internal.h"
-
void
bufferevent_wm_suspend_read(struct bufferevent *bufev)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ BEV_LOCK(bufev);
if (!bufev_private->read_suspended) {
bufev->be_ops->disable(bufev, EV_READ);
bufev_private->read_suspended = 1;
}
+ BEV_LOCK(bufev);
}
void
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+
+ BEV_LOCK(bufev);
if (bufev_private->read_suspended) {
bufev_private->read_suspended = 0;
if (bufev->enabled & EV_READ)
bufev->be_ops->enable(bufev, EV_READ);
}
+ BEV_LOCK(bufev);
}
/* Callback to implement watermarks on the input buffer. Only enabled
void *arg)
{
struct bufferevent *bufev = arg;
- size_t size = evbuffer_get_length(buf);
+ size_t size;
+
+ size = evbuffer_get_length(buf);
if (cbinfo->n_added > cbinfo->n_deleted) {
/* Data got added. If it put us over the watermark, stop
*/
bufev->enabled = EV_WRITE;
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
+ if (options & BEV_OPT_THREADSAFE) {
+ if (bufferevent_enable_locking(bufev, NULL) < 0) {
+ /* cleanup */
+ return -1;
+ }
+ }
+#endif
+
bufev_private->options = options;
return 0;
bufferevent_setcb(struct bufferevent *bufev,
evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
{
+ BEV_LOCK(bufev);
+
bufev->readcb = readcb;
bufev->writecb = writecb;
bufev->errorcb = errorcb;
bufev->cbarg = cbarg;
+ BEV_UNLOCK(bufev);
}
struct evbuffer *
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
short impl_events = event;
+ int r = 0;
+
+ BEV_LOCK(bufev);
if (bufev_private->read_suspended)
impl_events &= ~EV_READ;
bufev->enabled |= event;
if (bufev->be_ops->enable(bufev, impl_events) < 0)
- return -1;
+ r = -1;
- return (0);
+ BEV_UNLOCK(bufev);
+ return r;
}
void
const struct timeval *tv_read,
const struct timeval *tv_write)
{
+ BEV_LOCK(bufev);
if (tv_read) {
bufev->timeout_read = *tv_read;
} else {
if (bufev->be_ops->adj_timeouts)
bufev->be_ops->adj_timeouts(bufev);
+ BEV_UNLOCK(bufev);
}
int
bufferevent_disable(struct bufferevent *bufev, short event)
{
+ int r = 0;
+
+ BEV_LOCK(bufev);
bufev->enabled &= ~event;
if (bufev->be_ops->disable(bufev, event) < 0)
- return (-1);
+ r = -1;
- return (0);
+ BEV_UNLOCK(bufev);
+ return r;
}
/*
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ BEV_LOCK(bufev);
if (events & EV_WRITE) {
bufev->wm_write.low = lowmark;
bufev->wm_write.high = highmark;
bufferevent_wm_unsuspend_read(bufev);
}
}
+ BEV_UNLOCK(bufev);
}
int
short iotype,
enum bufferevent_flush_mode mode)
{
+ int r = -1;
+ BEV_LOCK(bufev);
if (bufev->be_ops->flush)
- return bufev->be_ops->flush(bufev, iotype, mode);
- else
- return -1;
+ r = bufev->be_ops->flush(bufev, iotype, mode);
+ BEV_UNLOCK(bufev);
+ return r;
}
void
/* Free the actual allocated memory. */
mm_free(bufev - bufev->be_ops->mem_offset);
+ /* Free lock XXX */
+}
+
+int
+bufferevent_enable_locking(struct bufferevent *bufev, void *lock)
+{
+#ifdef _EVENT_DISABLE_THREAD_SUPPORT
+ return -1;
+#else
+ if (BEV_UPCAST(bufev)->lock)
+ return -1;
+
+ if (!lock) {
+ EVTHREAD_ALLOC_LOCK(lock);
+ if (!lock)
+ return -1;
+ BEV_UPCAST(bufev)->lock = lock;
+ BEV_UPCAST(bufev)->own_lock = 1;
+ } else {
+ BEV_UPCAST(bufev)->lock = lock;
+ BEV_UPCAST(bufev)->own_lock = 0;
+ }
+ evbuffer_enable_locking(bufev->input, lock);
+ evbuffer_enable_locking(bufev->output, lock);
+
+ return 0;
+#endif
}
void *ctx)
{
struct bufferevent_filtered *bufev_f;
+ enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;
if (!input_filter)
input_filter = be_null_filter;
return NULL;
if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base,
- &bufferevent_ops_filter, options) < 0) {
+ &bufferevent_ops_filter, tmp_options) < 0) {
mm_free(bufev_f);
return NULL;
}
+ if (options & BEV_OPT_THREADSAFE) {
+ void *lock = BEV_UPCAST(underlying)->lock;
+ if (!lock) {
+ bufferevent_enable_locking(underlying, NULL);
+ lock = BEV_UPCAST(underlying)->lock;
+ }
+ bufferevent_enable_locking(downcast(bufev_f), lock);
+ }
bufev_f->underlying = underlying;
bufev_f->process_in = input_filter;
struct bufferevent_pair *bufev = arg;
struct bufferevent *bev = downcast(bufev);
+ BEV_LOCK(bev);
if (cb == &bufev->deferred_read_cb) {
if (bev->readcb) {
bev->readcb(bev, bev->cbarg);
bev->writecb(bev, bev->cbarg);
}
}
+ BEV_UNLOCK(bev);
}
static struct bufferevent_pair *
struct bufferevent *pair[2])
{
struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
+ enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;
bufev1 = bufferevent_pair_elt_new(base, options);
if (!bufev1)
return -1;
- bufev2 = bufferevent_pair_elt_new(base, options);
+ bufev2 = bufferevent_pair_elt_new(base, tmp_options);
if (!bufev2) {
bufferevent_free(downcast(bufev1));
return -1;
}
+ if (options & BEV_OPT_THREADSAFE) {
+ /*XXXX check return */
+ bufferevent_enable_locking(downcast(bufev2), bufev1->bev.lock);
+ }
+
bufev1->partner = bufev2;
bufev2->partner = bufev1;
void
bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
+ BEV_LOCK(bufev);
assert(bufev->be_ops == &bufferevent_ops_socket);
event_del(&bufev->ev_read);
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
+ BEV_UNLOCK(bufev);
}
/* XXXX Should non-socket buffferevents support this? */
int
bufferevent_priority_set(struct bufferevent *bufev, int priority)
{
+ int r = -1;
+
+ BEV_LOCK(bufev);
if (bufev->be_ops != &bufferevent_ops_socket)
- return -1;
+ goto done;
if (event_priority_set(&bufev->ev_read, priority) == -1)
- return (-1);
+ goto done;
if (event_priority_set(&bufev->ev_write, priority) == -1)
- return (-1);
+ goto done;
- return (0);
+ r = 0;
+done:
+ BEV_UNLOCK(bufev);
+ return r;
}
/* XXXX Should non-socket buffferevents support this? */
int
bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
{
- int res;
+ int res = -1;
+
+ BEV_LOCK(bufev);
if (bufev->be_ops != &bufferevent_ops_socket)
- return -1;
+ goto done;
bufev->ev_base = base;
res = event_base_set(base, &bufev->ev_read);
if (res == -1)
- return (res);
+ goto done;
res = event_base_set(base, &bufev->ev_write);
- return (res);
+done:
+ BEV_UNLOCK(bufev);
+ return res;
}
enum bufferevent_options {
BEV_OPT_CLOSE_ON_FREE = (1<<0),
+ BEV_OPT_THREADSAFE = (1<<1),
};
/**