static void accepted_socket_cb(struct event_overlapped *o, uintptr_t key,
ev_ssize_t n, int ok);
+static void accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg);
static struct accepting_socket *
new_accepting_socket(struct evconnlistener_iocp *lev, int family)
res->buflen = buflen;
res->family = family;
+ event_deferred_cb_init(&res->deferred,
+ accepted_socket_invoke_user_cb, res);
+
InitializeCriticalSection(&res->lock);
return res;
}
done:
- LeaveCriticalSection(&as->lock);
return result;
}
static void
accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg)
{
- struct *as = arg;
- evconnlistener_cb cb;
+ struct accepting_socket *as = arg;
+
+ struct sockaddr *sa_local=NULL, *sa_remote=NULL;
+ int socklen_local=0, socklen_remote=0;
+ const struct win32_extension_fns *ext =
+ event_get_win32_extension_fns();
+
+ EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
EnterCriticalSection(&as->lock);
+ if (as->free_on_cb) {
+ free_and_unlock_accepting_socket(as);
+ return;
+ }
+
ext->GetAcceptExSockaddrs(as->addrbuf, 0,
as->buflen/2, as->buflen/2,
&sa_local, &socklen_local,
&sa_remote, &socklen_remote);
- /* XXXX should we/can we release the lock here? */
as->lev->base.cb(&as->lev->base, as->s, sa_remote,
socklen_remote, as->lev->base.user_data);
as->s = INVALID_SOCKET;
- if (as->free_on_cb) {
- free_and_unlock_accepting_socket(as);
- } else {
- start_accepting(as);/*XXX handle error */
- LeaveCriticalSection(&as->lock);
- }
+ start_accepting(as);/*XXX handle error */
+ LeaveCriticalSection(&as->lock);
}
static void
accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok)
{
- struct sockaddr *sa_local=NULL, *sa_remote=NULL;
- int socklen_local=0, socklen_remote=0;
struct accepting_socket *as =
EVUTIL_UPCAST(o, struct accepting_socket, overlapped);
- const struct win32_extension_fns *ext =
- event_get_win32_extension_fns();
- EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
EnterCriticalSection(&as->lock);
if (ok) {
event_base_get_deferred_cb_queue(as->lev->event_base),
&as->deferred);
LeaveCriticalSection(&as->lock);
- } else if (free_on_cb) {
+ } else if (as->free_on_cb) {
free_and_unlock_accepting_socket(as);
} else if (as->s == INVALID_SOCKET) {
/* This is okay; we were disabled by iocp_listener_disable. */
LeaveCriticalSection(&as->lock);
} else {
/* Some error on accept that we couldn't actually handle. */
- event_sock_warn(as->fd, "Unexpected error on AcceptEx");
+ event_sock_warn(as->s, "Unexpected error on AcceptEx");
LeaveCriticalSection(&as->lock);
/* XXXX recover better. */
}
struct evconnlistener_iocp *lev_iocp =
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
- EnterCriticalSection(&lev->lock);
- for (i = 0; i < n_accepting; ++i) {
+ EnterCriticalSection(&lev_iocp->lock);
+ for (i = 0; i < lev_iocp->n_accepting; ++i) {
struct accepting_socket *as = lev_iocp->accepting[i];
if (!as)
continue;
start_accepting(as); /* detect failure. */
LeaveCriticalSection(&as->lock);
}
- LeaveCriticalSection(&lev->lock);
+ LeaveCriticalSection(&lev_iocp->lock);
return 0;
}
struct evconnlistener_iocp *lev_iocp =
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
- EnterCriticalSection(&lev->lock);
- for (i = 0; i < n_accepting; ++i) {
+ EnterCriticalSection(&lev_iocp->lock);
+ for (i = 0; i < lev_iocp->n_accepting; ++i) {
struct accepting_socket *as = lev_iocp->accepting[i];
if (!as)
continue;
}
LeaveCriticalSection(&as->lock);
}
- LeaveCriticalSection(&lev->lock);
+ LeaveCriticalSection(&lev_iocp->lock);
return 0;
}
static int
-iocp_listener_disable_impl(struct evconnlistener *lev)
+iocp_listener_disable(struct evconnlistener *lev)
{
return iocp_listener_disable_impl(lev,0);
}
iocp_listener_getbase
};
+/* XXX define some way to override this. */
+#define N_SOCKETS_PER_LISTENER 4
+
struct evconnlistener *
evconnlistener_new_async(struct event_base *base,
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
struct sockaddr_storage ss;
int socklen = sizeof(ss);
struct evconnlistener_iocp *lev;
+ int i;
+
if (!event_base_get_iocp(base))
return NULL;
if (event_iocp_port_associate(lev->port, fd, 1) < 0)
return NULL;
- lev->n_accepting = 1;
- lev->accepting = mm_calloc(1, sizeof(struct accepting_socket *));
+ InitializeCriticalSection(&lev->lock);
+
+ lev->n_accepting = N_SOCKETS_PER_LISTENER;
+ lev->accepting = mm_calloc(lev->n_accepting,
+ sizeof(struct accepting_socket *));
if (!lev->accepting) {
event_warn("calloc");
mm_free(lev);
closesocket(fd);
return NULL;
}
- lev->accepting[0] = new_accepting_socket(lev, ss.ss_family);
- if (!lev->accepting[0]) {
- event_warnx("Couldn't create accepting socket");
- mm_free(lev->accepting);
- mm_free(lev);
- closesocket(fd);
- return NULL;
- }
-
- InitializeCriticalSection(&lev->lock);
+ for (i = 0; i < lev->n_accepting; ++i) {
+ lev->accepting[i] = new_accepting_socket(lev, ss.ss_family);
+ if (!lev->accepting[i]) {
+ event_warnx("Couldn't create accepting socket");
+ mm_free(lev->accepting);
+ /* DeleteCriticalSection on lev->lock XXXX */
+ /* XXXX free the other elements. */
+ mm_free(lev);
+ closesocket(fd);
+ return NULL;
+ }
- if (start_accepting(lev->accepting[0]) < 0) {
- event_warnx("Couldn't start accepting on socket");
- EnterCriticalSection(&lev->accepting[0]->lock);
- free_and_unlock_accepting_socket(lev->accepting[0]);
- mm_free(lev->accepting);
- mm_free(lev);
- closesocket(fd);
- DeleteCriticalSection(&lev->lock);
- return NULL;
+ if (start_accepting(lev->accepting[i]) < 0) {
+ event_warnx("Couldn't start accepting on socket");
+ EnterCriticalSection(&lev->accepting[i]->lock);
+ free_and_unlock_accepting_socket(lev->accepting[i]);
+ mm_free(lev->accepting);
+ mm_free(lev);
+ closesocket(fd);
+ DeleteCriticalSection(&lev->lock);
+ return NULL;
+ }
}
return &lev->base;