From 0d744aa1738f6f16324a07f6185107d28c4f7fe4 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 23 Nov 2009 18:34:32 -0500 Subject: [PATCH] Refactor our 'suspend operation' logic on bufferevents. There are lots of things we do internally in bufferevents to indicate "the user would like this operation to happen, but we aren't going to try until some other condition goes away." Our logic here has gotten entirely too complicated. This patch tries to fix that by adding the idea of 'suspend flags' for read and write. To say "don't bother reading or writing until condition X no longer holds," bufferevent_suspend_read/write(bev, BEV_SUSPEND_X). When X no longer holds, call bufferevent_unsuspend_read/write(bev, BEV_SUSPEND_X). Right now, only the read-watermark logic uses this. --- bufferevent-internal.h | 60 ++++++++++++++++++++++++++++++++++++------ bufferevent.c | 45 +++++++++++++++++++++++-------- 2 files changed, 86 insertions(+), 19 deletions(-) diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 937086ea..a5ce7cb7 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -36,6 +36,28 @@ extern "C" { #include "evthread-internal.h" #include "event2/thread.h" +/* These flags are reasons that we might be declining to actually enable + reading or writing on a bufferevent. + */ + +/* On a all bufferevents, for reading: used when we have read up to the + watermark value. + + On a filtering bufferxevent, for writing: used when the underlying + bufferevent's write buffer has been filled up to its watermark + value. +*/ +#define BEV_SUSPEND_WM 0x01 +/* On a base bufferevent: when we have used up our bandwidth buckets. */ +#define BEV_SUSPEND_BW 0x02 + +struct token_bucket { + uint32_t limit; + uint32_t rate; + uint32_t burst; + unsigned last_updated; +}; + /** Parts of the bufferevent structure that are shared among all bufferevent * types, but not exposed in bufferevent_struct.h. */ struct bufferevent_private { @@ -45,8 +67,6 @@ struct bufferevent_private { /** Evbuffer callback to enforce watermarks on input. */ struct evbuffer_cb_entry *read_watermarks_cb; - /** If set, read is suspended until evbuffer some. */ - unsigned read_suspended : 1; /** If set, we should free the lock when we free the bufferevent. */ unsigned own_lock : 1; @@ -61,9 +81,21 @@ struct bufferevent_private { /** Set to the events pending if we have deferred callbacks and * an events callback is pending. */ short eventcb_pending; + + /** If set, read is suspended until one or more conditions are over. + * The actual value here is a bitfield of those conditions; see the + * BEV_SUSPEND_* flags above. */ + short read_suspended; + + /** If set, writing is suspended until one or more conditions are over. + * The actual value here is a bitfield of those conditions; see the + * BEV_SUSPEND_* flags above. */ + short write_suspended; + /** Set to the current socket errno if we have deferred callbacks and * an events callback is pending. */ int errno_pending; + /** Used to implement deferred callbacks */ struct deferred_cb deferred; @@ -155,12 +187,24 @@ extern const struct bufferevent_ops bufferevent_ops_async; /** Initialize the shared parts of a bufferevent. */ int bufferevent_init_common(struct bufferevent_private *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options); -/** For internal use: temporarily stop all reads on bufev, because its - * read buffer is too full. */ -void bufferevent_wm_suspend_read(struct bufferevent *bufev); -/** For internal use: temporarily stop all reads on bufev, because its - * read buffer is too full. */ -void bufferevent_wm_unsuspend_read(struct bufferevent *bufev); +/** For internal use: temporarily stop all reads on bufev, until the conditions + * in 'what' are over. */ +void bufferevent_suspend_read(struct bufferevent *bufev, short what); +/** For internal use: clear the conditions 'what' on bufev, and re-enable + * reading if there are no conditions left. */ +void bufferevent_unsuspend_read(struct bufferevent *bufev, short what); + +/** For internal use: temporarily stop all writes on bufev, until the conditions + * in 'what' are over. */ +void bufferevent_suspend_write(struct bufferevent *bufev, short what); +/** For internal use: clear the conditions 'what' on bufev, and re-enable + * writing if there are no conditions left. */ +void bufferevent_unsuspend_write(struct bufferevent *bufev, short what); + +#define bufferevent_wm_suspend_read(b) \ + bufferevent_suspend_read((b), BEV_SUSPEND_WM) +#define bufferevent_wm_unsuspend_read(b) \ + bufferevent_unsuspend_read((b), BEV_SUSPEND_WM) /** Internal: Set up locking on a bufferevent. If lock is set, use it. * Otherwise, use a new lock. */ diff --git a/bufferevent.c b/bufferevent.c index 2794cff2..012950b2 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -60,33 +60,54 @@ #include "util-internal.h" void -bufferevent_wm_suspend_read(struct bufferevent *bufev) +bufferevent_suspend_read(struct bufferevent *bufev, short what) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); BEV_LOCK(bufev); - if (!bufev_private->read_suspended) { + if (!bufev_private->read_suspended) bufev->be_ops->disable(bufev, EV_READ); - bufev_private->read_suspended = 1; - } + bufev_private->read_suspended |= what; BEV_UNLOCK(bufev); } void -bufferevent_wm_unsuspend_read(struct bufferevent *bufev) +bufferevent_unsuspend_read(struct bufferevent *bufev, short what) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + BEV_LOCK(bufev); + bufev_private->read_suspended &= ~what; + if (!bufev_private->read_suspended) + bufev->be_ops->enable(bufev, EV_READ); + BEV_UNLOCK(bufev); +} +void +bufferevent_suspend_write(struct bufferevent *bufev, short what) +{ + struct bufferevent_private *bufev_private = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); BEV_LOCK(bufev); - if (bufev_private->read_suspended) { - bufev_private->read_suspended = 0; - if (bufev->enabled & EV_READ) - bufev->be_ops->enable(bufev, EV_READ); - } + if (!bufev_private->write_suspended) + bufev->be_ops->disable(bufev, EV_WRITE); + bufev_private->write_suspended |= what; BEV_UNLOCK(bufev); } +void +bufferevent_unsuspend_write(struct bufferevent *bufev, short what) +{ + struct bufferevent_private *bufev_private = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + BEV_LOCK(bufev); + bufev_private->write_suspended &= ~what; + if (!bufev_private->write_suspended) + bufev->be_ops->enable(bufev, EV_WRITE); + BEV_UNLOCK(bufev); +} + + /* Callback to implement watermarks on the input buffer. Only enabled * if the watermark is set. */ static void @@ -338,10 +359,12 @@ bufferevent_enable(struct bufferevent *bufev, short event) _bufferevent_incref_and_lock(bufev); if (bufev_private->read_suspended) impl_events &= ~EV_READ; + if (bufev_private->write_suspended) + impl_events &= ~EV_WRITE; bufev->enabled |= event; - if (bufev->be_ops->enable(bufev, impl_events) < 0) + if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0) r = -1; _bufferevent_decref_and_unlock(bufev); -- 2.40.0