#include <ws2tcpip.h>
#endif
+#include <sys/queue.h>
+
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
+#include "event2/util.h"
+#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
unsigned ok : 1;
+ unsigned read_added : 1;
+ unsigned write_added : 1;
};
const struct bufferevent_ops bufferevent_ops_async = {
}
static void
-bev_async_consider_writing(struct bufferevent_async *b)
+bev_async_del_write(struct bufferevent_async *beva)
+{
+ struct bufferevent *bev = &beva->bev.bev;
+
+ if (beva->write_added) {
+ beva->write_added = 0;
+ event_base_del_virtual(bev->ev_base);
+ }
+}
+
+static void
+bev_async_del_read(struct bufferevent_async *beva)
+{
+ struct bufferevent *bev = &beva->bev.bev;
+
+ if (beva->read_added) {
+ beva->read_added = 0;
+ event_base_del_virtual(bev->ev_base);
+ }
+}
+
+static void
+bev_async_add_write(struct bufferevent_async *beva)
+{
+ struct bufferevent *bev = &beva->bev.bev;
+
+ if (!beva->write_added) {
+ beva->write_added = 1;
+ event_base_add_virtual(bev->ev_base);
+ }
+}
+
+static void
+bev_async_add_read(struct bufferevent_async *beva)
+{
+ struct bufferevent *bev = &beva->bev.bev;
+
+ if (!beva->read_added) {
+ beva->read_added = 1;
+ event_base_add_virtual(bev->ev_base);
+ }
+}
+
+static void
+bev_async_consider_writing(struct bufferevent_async *beva)
{
size_t at_most;
int limit;
+ struct bufferevent *bev = &beva->bev.bev;
+
/* Don't write if there's a write in progress, or we do not
- * want to write. */
- if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
+ * want to write, or when there's nothing left to write. */
+ if (beva->write_in_progress)
return;
- /* Don't write if there's nothing to write */
- if (!evbuffer_get_length(b->bev.bev.output))
+ if (!beva->ok || !(bev->enabled&EV_WRITE) ||
+ !evbuffer_get_length(bev->output)) {
+ bev_async_del_write(beva);
return;
+ }
- at_most = evbuffer_get_length(b->bev.bev.output);
+ at_most = evbuffer_get_length(bev->output);
/* XXXX This over-commits. */
- limit = _bufferevent_get_write_max(&b->bev);
+ limit = _bufferevent_get_write_max(&beva->bev);
if (at_most >= limit)
at_most = limit;
- if (b->bev.write_suspended)
+ if (beva->bev.write_suspended) {
+ bev_async_del_write(beva);
return;
+ }
/* XXXX doesn't respect low-water mark very well. */
- if (evbuffer_launch_write(b->bev.bev.output, at_most,
- &b->write_overlapped)) {
- EVUTIL_ASSERT(0);/* XXX act sensibly. */
+ bufferevent_incref(bev);
+ if (evbuffer_launch_write(bev->output, at_most,
+ &beva->write_overlapped)) {
+ bufferevent_decref(bev);
+ beva->ok = 0;
+ _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
} else {
- b->write_in_progress = 1;
+ beva->write_in_progress = 1;
+ bev_async_add_write(beva);
}
}
static void
-bev_async_consider_reading(struct bufferevent_async *b)
+bev_async_consider_reading(struct bufferevent_async *beva)
{
size_t cur_size;
size_t read_high;
size_t at_most;
int limit;
+ struct bufferevent *bev = &beva->bev.bev;
+
/* Don't read if there is a read in progress, or we do not
* want to read. */
- if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
+ if (beva->read_in_progress)
+ return;
+ if (!beva->ok || !(bev->enabled&EV_READ)) {
+ bev_async_del_read(beva);
return;
+ }
/* Don't read if we're full */
- cur_size = evbuffer_get_length(b->bev.bev.input);
- read_high = b->bev.bev.wm_read.high;
+ cur_size = evbuffer_get_length(bev->input);
+ read_high = bev->wm_read.high;
if (read_high) {
- if (cur_size >= read_high)
+ if (cur_size >= read_high) {
+ bev_async_del_read(beva);
return;
+ }
at_most = read_high - cur_size;
} else {
at_most = 16384; /* FIXME totally magic. */
}
/* XXXX This over-commits. */
- limit = _bufferevent_get_read_max(&b->bev);
+ limit = _bufferevent_get_read_max(&beva->bev);
if (at_most >= limit)
at_most = limit;
- if (b->bev.read_suspended)
+ if (beva->bev.read_suspended) {
+ bev_async_del_read(beva);
return;
+ }
- if (evbuffer_launch_read(b->bev.bev.input, at_most,
- &b->read_overlapped)) {
- EVUTIL_ASSERT(0);
+ bufferevent_incref(bev);
+ if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
+ beva->ok = 0;
+ bufferevent_decref(bev);
+ _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
} else {
- b->read_in_progress = 1;
+ beva->read_in_progress = 1;
+ bev_async_add_read(beva);
}
+
+ return;
}
static void
static int
be_async_disable(struct bufferevent *bev, short what)
{
+ struct bufferevent_async *bev_async = upcast(bev);
/* XXXX If we disable reading or writing, we may want to consider
* canceling any in-progress read or write operation, though it might
* not work. */
- if (what & EV_READ)
+ if (what & EV_READ) {
BEV_DEL_GENERIC_READ_TIMEOUT(bev);
- if (what & EV_WRITE)
+ bev_async_del_read(bev_async);
+ }
+ if (what & EV_WRITE) {
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
+ bev_async_del_write(bev_async);
+ }
return 0;
}
static void
be_async_destruct(struct bufferevent *bev)
{
+ struct bufferevent_async *bev_async = upcast(bev);
struct bufferevent_private *bev_p = BEV_UPCAST(bev);
evutil_socket_t fd;
- EVUTIL_ASSERT(!upcast(bev)->write_in_progress && !upcast(bev)->read_in_progress);
+ EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
+ !upcast(bev)->read_in_progress);
+
+ bev_async_del_read(bev_async);
+ bev_async_del_write(bev_async);
- /* XXX cancel any outstanding I/O operations */
fd = _evbuffer_overlapped_get_fd(bev->input);
- /* delete this in case non-blocking connect was used */
- event_del(&bev->ev_write);
if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
evutil_closesocket(fd);
- _bufferevent_del_generic_timeout_cbs(bev);
+ /* delete this in case non-blocking connect was used */
+ if (event_initialized(&bev->ev_write)) {
+ event_del(&bev->ev_write);
+ _bufferevent_del_generic_timeout_cbs(bev);
+ }
+}
+
+/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
+ * we use WSAGetOverlappedResult to translate. */
+static void
+bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
+{
+ DWORD bytes, flags;
+ evutil_socket_t fd;
+
+ fd = _evbuffer_overlapped_get_fd(bev->input);
+ WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}
static int
struct bufferevent_async *bev_a = upcast_connect(eo);
struct bufferevent *bev = &bev_a->bev.bev;
- _bufferevent_incref_and_lock(bev);
+ BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->bev.connecting);
bev_a->bev.connecting = 0;
+ event_base_del_virtual(bev->ev_base);
+
+ if (ok)
+ bufferevent_async_set_connected(bev);
+ else
+ bev_async_set_wsa_error(bev, eo);
- bufferevent_async_set_connected(bev);
_bufferevent_run_eventcb(bev,
ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
+ event_base_del_virtual(bev->ev_base);
+
_bufferevent_decref_and_unlock(bev);
}
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_READING;
- _bufferevent_incref_and_lock(bev);
- EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress);
+ BEV_LOCK(bev);
+ EVUTIL_ASSERT(bev_a->read_in_progress);
evbuffer_commit_read(bev->input, nbytes);
bev_a->read_in_progress = 0;
- if (ok && nbytes) {
- BEV_RESET_GENERIC_READ_TIMEOUT(bev);
- _bufferevent_decrement_read_buckets(&bev_a->bev, nbytes);
- if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
- _bufferevent_run_readcb(bev);
- bev_async_consider_reading(bev_a);
- } else if (!ok) {
- what |= BEV_EVENT_ERROR;
- bev_a->ok = 0;
- _bufferevent_run_eventcb(bev, what);
- } else if (!nbytes) {
- what |= BEV_EVENT_EOF;
- bev_a->ok = 0;
- _bufferevent_run_eventcb(bev, what);
+ if (!ok)
+ bev_async_set_wsa_error(bev, eo);
+
+ if (bev_a->ok) {
+ if (ok && nbytes) {
+ BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+ _bufferevent_decrement_read_buckets(&bev_a->bev,
+ nbytes);
+ if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
+ _bufferevent_run_readcb(bev);
+ bev_async_consider_reading(bev_a);
+ } else if (!ok) {
+ what |= BEV_EVENT_ERROR;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ } else if (!nbytes) {
+ what |= BEV_EVENT_EOF;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ }
}
_bufferevent_decref_and_unlock(bev);
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_WRITING;
- _bufferevent_incref_and_lock(bev);
- EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress);
-
+ BEV_LOCK(bev);
+ EVUTIL_ASSERT(bev_a->write_in_progress);
evbuffer_commit_write(bev->output, nbytes);
bev_a->write_in_progress = 0;
- if (ok && nbytes) {
- BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
- _bufferevent_decrement_write_buckets(&bev_a->bev, nbytes);
- if (evbuffer_get_length(bev->output) <= bev->wm_write.low)
- _bufferevent_run_writecb(bev);
- bev_async_consider_writing(bev_a);
- } else if (!ok) {
- what |= BEV_EVENT_ERROR;
- bev_a->ok = 0;
- _bufferevent_run_eventcb(bev, what);
- } else if (!nbytes) {
- what |= BEV_EVENT_EOF;
- bev_a->ok = 0;
- _bufferevent_run_eventcb(bev, what);
+ if (!ok)
+ bev_async_set_wsa_error(bev, eo);
+
+ if (bev_a->ok) {
+ if (ok && nbytes) {
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
+ _bufferevent_decrement_write_buckets(&bev_a->bev,
+ nbytes);
+ if (evbuffer_get_length(bev->output) <=
+ bev->wm_write.low)
+ _bufferevent_run_writecb(bev);
+ bev_async_consider_writing(bev_a);
+ } else if (!ok) {
+ what |= BEV_EVENT_ERROR;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ } else if (!nbytes) {
+ what |= BEV_EVENT_EOF;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ }
}
_bufferevent_decref_and_unlock(bev);
evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
- evbuffer_defer_callbacks(bev->input, base);
- evbuffer_defer_callbacks(bev->output, base);
event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
event_overlapped_init(&bev_a->read_overlapped, read_complete);
WSAGetLastError() != WSAEINVAL)
return -1;
+ event_base_add_virtual(bev->ev_base);
+ bufferevent_incref(bev);
rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
&bev_async->connect_overlapped.overlapped);
if (rc || WSAGetLastError() == ERROR_IO_PENDING)
return 0;
+ event_base_del_virtual(bev->ev_base);
+ bufferevent_decref(bev);
+
return -1;
}