#include "evthread-internal.h"
#include "event2/thread.h"
+/* These flags are reasons that we might be declining to actually enable
+ reading or writing on a bufferevent.
+ */
+
+/* On a all bufferevents, for reading: used when we have read up to the
+ watermark value.
+
+ On a filtering bufferxevent, for writing: used when the underlying
+ bufferevent's write buffer has been filled up to its watermark
+ value.
+*/
+#define BEV_SUSPEND_WM 0x01
+/* On a base bufferevent: when we have used up our bandwidth buckets. */
+#define BEV_SUSPEND_BW 0x02
+
+struct token_bucket {
+ uint32_t limit;
+ uint32_t rate;
+ uint32_t burst;
+ unsigned last_updated;
+};
+
/** Parts of the bufferevent structure that are shared among all bufferevent
* types, but not exposed in bufferevent_struct.h. */
struct bufferevent_private {
/** Evbuffer callback to enforce watermarks on input. */
struct evbuffer_cb_entry *read_watermarks_cb;
- /** If set, read is suspended until evbuffer some. */
- unsigned read_suspended : 1;
/** If set, we should free the lock when we free the bufferevent. */
unsigned own_lock : 1;
/** Set to the events pending if we have deferred callbacks and
* an events callback is pending. */
short eventcb_pending;
+
+ /** If set, read is suspended until one or more conditions are over.
+ * The actual value here is a bitfield of those conditions; see the
+ * BEV_SUSPEND_* flags above. */
+ short read_suspended;
+
+ /** If set, writing is suspended until one or more conditions are over.
+ * The actual value here is a bitfield of those conditions; see the
+ * BEV_SUSPEND_* flags above. */
+ short write_suspended;
+
/** Set to the current socket errno if we have deferred callbacks and
* an events callback is pending. */
int errno_pending;
+
/** Used to implement deferred callbacks */
struct deferred_cb deferred;
/** Initialize the shared parts of a bufferevent. */
int bufferevent_init_common(struct bufferevent_private *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options);
-/** For internal use: temporarily stop all reads on bufev, because its
- * read buffer is too full. */
-void bufferevent_wm_suspend_read(struct bufferevent *bufev);
-/** For internal use: temporarily stop all reads on bufev, because its
- * read buffer is too full. */
-void bufferevent_wm_unsuspend_read(struct bufferevent *bufev);
+/** For internal use: temporarily stop all reads on bufev, until the conditions
+ * in 'what' are over. */
+void bufferevent_suspend_read(struct bufferevent *bufev, short what);
+/** For internal use: clear the conditions 'what' on bufev, and re-enable
+ * reading if there are no conditions left. */
+void bufferevent_unsuspend_read(struct bufferevent *bufev, short what);
+
+/** For internal use: temporarily stop all writes on bufev, until the conditions
+ * in 'what' are over. */
+void bufferevent_suspend_write(struct bufferevent *bufev, short what);
+/** For internal use: clear the conditions 'what' on bufev, and re-enable
+ * writing if there are no conditions left. */
+void bufferevent_unsuspend_write(struct bufferevent *bufev, short what);
+
+#define bufferevent_wm_suspend_read(b) \
+ bufferevent_suspend_read((b), BEV_SUSPEND_WM)
+#define bufferevent_wm_unsuspend_read(b) \
+ bufferevent_unsuspend_read((b), BEV_SUSPEND_WM)
/** Internal: Set up locking on a bufferevent. If lock is set, use it.
* Otherwise, use a new lock. */
#include "util-internal.h"
void
-bufferevent_wm_suspend_read(struct bufferevent *bufev)
+bufferevent_suspend_read(struct bufferevent *bufev, short what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
- if (!bufev_private->read_suspended) {
+ if (!bufev_private->read_suspended)
bufev->be_ops->disable(bufev, EV_READ);
- bufev_private->read_suspended = 1;
- }
+ bufev_private->read_suspended |= what;
BEV_UNLOCK(bufev);
}
void
-bufferevent_wm_unsuspend_read(struct bufferevent *bufev)
+bufferevent_unsuspend_read(struct bufferevent *bufev, short what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ BEV_LOCK(bufev);
+ bufev_private->read_suspended &= ~what;
+ if (!bufev_private->read_suspended)
+ bufev->be_ops->enable(bufev, EV_READ);
+ BEV_UNLOCK(bufev);
+}
+void
+bufferevent_suspend_write(struct bufferevent *bufev, short what)
+{
+ struct bufferevent_private *bufev_private =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
- if (bufev_private->read_suspended) {
- bufev_private->read_suspended = 0;
- if (bufev->enabled & EV_READ)
- bufev->be_ops->enable(bufev, EV_READ);
- }
+ if (!bufev_private->write_suspended)
+ bufev->be_ops->disable(bufev, EV_WRITE);
+ bufev_private->write_suspended |= what;
BEV_UNLOCK(bufev);
}
+void
+bufferevent_unsuspend_write(struct bufferevent *bufev, short what)
+{
+ struct bufferevent_private *bufev_private =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+ BEV_LOCK(bufev);
+ bufev_private->write_suspended &= ~what;
+ if (!bufev_private->write_suspended)
+ bufev->be_ops->enable(bufev, EV_WRITE);
+ BEV_UNLOCK(bufev);
+}
+
+
/* Callback to implement watermarks on the input buffer. Only enabled
* if the watermark is set. */
static void
_bufferevent_incref_and_lock(bufev);
if (bufev_private->read_suspended)
impl_events &= ~EV_READ;
+ if (bufev_private->write_suspended)
+ impl_events &= ~EV_WRITE;
bufev->enabled |= event;
- if (bufev->be_ops->enable(bufev, impl_events) < 0)
+ if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
r = -1;
_bufferevent_decref_and_unlock(bufev);