#include "h2_util.h"
#include "h2_bucket_beam.h"
-static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred);
+static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy);
+
+#define H2_BPROXY_NEXT(e) APR_RING_NEXT((e), link)
+#define H2_BPROXY_PREV(e) APR_RING_PREV((e), link)
+#define H2_BPROXY_REMOVE(e) APR_RING_REMOVE((e), link)
+
+#define H2_BPROXY_LIST_INIT(b) APR_RING_INIT(&(b)->list, h2_beam_proxy, link);
+#define H2_BPROXY_LIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link)
+#define H2_BPROXY_LIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link)
+#define H2_BPROXY_LIST_FIRST(b) APR_RING_FIRST(&(b)->list)
+#define H2_BPROXY_LIST_LAST(b) APR_RING_LAST(&(b)->list)
+#define H2_PROXY_BLIST_INSERT_HEAD(b, e) do { \
+ h2_beam_proxy *ap__b = (e); \
+ APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link); \
+ } while (0)
+#define H2_BPROXY_LIST_INSERT_TAIL(b, e) do { \
+ h2_beam_proxy *ap__b = (e); \
+ APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link); \
+ } while (0)
+#define H2_BPROXY_LIST_CONCAT(a, b) do { \
+ APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link); \
+ } while (0)
+#define H2_BPROXY_LIST_PREPEND(a, b) do { \
+ APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link); \
+ } while (0)
+
/*******************************************************************************
* beam bucket with reference to beam and bucket it represents
#define H2_BUCKET_IS_BEAM(e) (e->type == &h2_bucket_type_beam)
-typedef struct {
+struct h2_beam_proxy {
apr_bucket_refcount refcount;
+ APR_RING_ENTRY(h2_beam_proxy) link;
h2_bucket_beam *beam;
apr_bucket *bred;
-} h2_beam_bucket;
+};
static const char Dummy = '\0';
static apr_status_t beam_bucket_read(apr_bucket *b, const char **str,
apr_size_t *len, apr_read_type_e block)
{
- h2_beam_bucket *d = b->data;
+ h2_beam_proxy *d = b->data;
if (d->bred) {
const char *data;
apr_status_t status = apr_bucket_read(d->bred, &data, len, block);
}
*str = &Dummy;
*len = 0;
- return APR_SUCCESS;
+ return APR_ECONNRESET;
}
static void beam_bucket_destroy(void *data)
{
- h2_beam_bucket *d = data;
+ h2_beam_proxy *d = data;
if (apr_bucket_shared_destroy(d)) {
- if (d->bred) {
- h2_beam_emitted(d->beam, d->bred);
+ /* When the beam gets destroyed before this bucket, it will
+ * NULLify its reference here. This is not protected by a mutex,
+ * so it will not help with race conditions.
+ * But it lets us shut down memory pool with circulare beam
+ * references. */
+ if (d->beam) {
+ h2_beam_emitted(d->beam, d);
}
apr_bucket_free(d);
}
h2_bucket_beam *beam,
apr_bucket *bred)
{
- h2_beam_bucket *d;
+ h2_beam_proxy *d;
d = apr_bucket_alloc(sizeof(*d), b->list);
+ H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
d->beam = beam;
d->bred = bred;
return h2_beam_bucket_make(b, beam, bred);
}
+/*static apr_status_t beam_bucket_setaside(apr_bucket *b, apr_pool_t *pool)
+{
+ apr_status_t status = APR_SUCCESS;
+ h2_beam_proxy *d = b->data;
+ if (d->bred) {
+ const char *data;
+ apr_size_t len;
+
+ status = apr_bucket_read(d->bred, &data, &len, APR_BLOCK_READ);
+ if (status == APR_SUCCESS) {
+ b = apr_bucket_heap_make(b, (char *)data + b->start, b->length, NULL);
+ if (b == NULL) {
+ return APR_ENOMEM;
+ }
+ }
+ }
+ return status;
+}*/
+
const apr_bucket_type_t h2_bucket_type_beam = {
"BEAM", 5, APR_BUCKET_DATA,
beam_bucket_destroy,
H2_BLIST_INSERT_TAIL(&beam->purge, bred);
}
-static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred)
+static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
{
apr_thread_mutex_t *lock;
int acquired;
if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
/* even when beam buckets are split, only the one where
* refcount drops to 0 will call us */
- --beam->live_beam_buckets;
+ H2_BPROXY_REMOVE(proxy);
/* invoked from green thread, the last beam bucket for the red
* bucket bred is about to be destroyed.
* remove it from the hold, where it should be now */
- h2_beam_prep_purge(beam, bred);
+ if (proxy->bred) {
+ h2_beam_prep_purge(beam, proxy->bred);
+ proxy->bred = NULL;
+ }
/* notify anyone waiting on space to become available */
if (!lock) {
r_purge_reds(beam);
}
}
-static apr_status_t beam_cleanup(void *data)
+static apr_status_t beam_close(h2_bucket_beam *beam)
{
- h2_bucket_beam *beam = data;
+ if (!beam->closed) {
+ beam->closed = 1;
+ if (beam->m_cond) {
+ apr_thread_cond_broadcast(beam->m_cond);
+ }
+ }
+ return APR_SUCCESS;
+}
- if (beam->live_beam_buckets) {
- ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->life_pool,
- "h2_beam(%d-%s) cleanup with live %d buckets",
- beam->id, beam->tag, (int)beam->live_beam_buckets);
+static void beam_shutdown(h2_bucket_beam *beam, int disconnect)
+{
+ if (disconnect && !H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
+ /* If we are called before all green buckets we put out
+ * there have been destroyed, we need to disentangle ourself.
+ * We NULLify the beam and red buckets in every proxy from us, so
+ * a) red memory is no longer read
+ * b) destruction of the proxy no longer calls back to this beam
+ * This does not protect against races when red and green thread are still
+ * running concurrently and it does not protect from passed out red
+ * memory to still being accessed.
+ */
+ while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
+ h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
+ H2_BPROXY_REMOVE(proxy);
+ proxy->beam = NULL;
+ if (proxy->bred) {
+ h2_beam_prep_purge(beam, proxy->bred);
+ proxy->bred = NULL;
+ }
+ }
}
- AP_DEBUG_ASSERT(beam->live_beam_buckets == 0);
+ r_purge_reds(beam);
h2_blist_cleanup(&beam->red);
+ beam_close(beam);
+ report_consumption(beam);
+}
+
+static apr_status_t beam_cleanup(void *data)
+{
+ h2_bucket_beam *beam = data;
+
+ if (beam->green) {
+ apr_brigade_destroy(beam->green);
+ beam->green = NULL;
+ }
+ beam_shutdown(beam, 0);
h2_blist_cleanup(&beam->purge);
h2_blist_cleanup(&beam->hold);
return APR_SUCCESS;
H2_BLIST_INIT(&beam->red);
H2_BLIST_INIT(&beam->hold);
H2_BLIST_INIT(&beam->purge);
+ H2_BPROXY_LIST_INIT(&beam->proxies);
beam->life_pool = life_pool;
beam->max_buf_size = max_buf_size;
- apr_pool_cleanup_register(life_pool, beam, beam_cleanup,
- apr_pool_cleanup_null);
+ apr_pool_pre_cleanup_register(life_pool, beam, beam_cleanup);
*pbeam = beam;
return status;
}
}
-static apr_status_t beam_close(h2_bucket_beam *beam)
-{
- if (!beam->closed) {
- beam->closed = 1;
- if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
- }
- }
- return APR_SUCCESS;
-}
-
apr_status_t h2_beam_close(h2_bucket_beam *beam)
{
apr_thread_mutex_t *lock;
int acquired;
if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
- r_purge_reds(beam);
- h2_blist_cleanup(&beam->red);
- beam_close(beam);
- report_consumption(beam);
- leave_yellow(beam, lock, acquired);
- }
-}
-
-void h2_beam_reset(h2_bucket_beam *beam)
-{
- apr_thread_mutex_t *lock;
- int acquired;
-
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
- beam_cleanup(beam);
- beam->closed = beam->close_sent = 0;
- beam->sent_bytes = beam->received_bytes = beam->reported_bytes = 0;
+ beam_shutdown(beam, 1);
leave_yellow(beam, lock, acquired);
}
}
* the beam bucket will notify us on destruction that bred is
* no longer needed. */
bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc);
- ++beam->live_beam_buckets;
}
/* Place the red bucket into our hold, to be destroyed when no
h2_stream *stream,
uint32_t error_code)
{
+ conn_rec *c = session->c;
if (!error_code) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): handled, closing",
session->id, (int)stream->id);
if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
}
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03065)
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
"h2_stream(%ld-%d): closing with err=%d %s",
session->id, (int)stream->id, (int)error_code,
h2_h2_err_description(error_code));
}
return h2_conn_io_writeb(&session->io,
- h2_bucket_eos_create(session->c->bucket_alloc,
- stream));
+ h2_bucket_eos_create(c->bucket_alloc, stream), 1);
}
static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
if (status == APR_SUCCESS && padlen) {
b = apr_bucket_immortal_create(immortal_zeros, padlen,
session->c->bucket_alloc);
- status = h2_conn_io_writeb(&session->io, b);
+ status = h2_conn_io_writeb(&session->io, b, 1);
}
}