if (b != H2_BLIST_SENTINEL(&beam->hold_list)) {
/* bucket is in hold as it should be, mark this one
* and all before it for purging. We might have placed meta
- * buckets without a green proxy into the hold before it
+ * buckets without a receiver proxy into the hold before it
* and schedule them for purging now */
for (b = H2_BLIST_FIRST(&beam->hold_list);
b != H2_BLIST_SENTINEL(&beam->hold_list);
static apr_status_t beam_send_cleanup(void *data)
{
h2_bucket_beam *beam = data;
- /* sender has gone away, clear up all references to its memory */
+ /* sender is going away, clear up all references to its memory */
r_purge_sent(beam);
h2_blist_cleanup(&beam->send_list);
report_consumption(beam);
/* The fundamental problem is that reading a sender bucket from
- * a green thread is a total NO GO, because the bucket might use
+ * a receiver thread is a total NO GO, because the bucket might use
* its pool/bucket_alloc from a foreign thread and that will
* corrupt. */
status = APR_ENOTIMPL;
status = apr_bucket_setaside(b, beam->send_pool);
}
else if (APR_BUCKET_IS_HEAP(b)) {
- /* For heap buckets read from a green thread is fine. The
+ /* For heap buckets read from a receiver thread is fine. The
* data will be there and live until the bucket itself is
* destroyed. */
status = APR_SUCCESS;
/* pool buckets are bastards that register at pool cleanup
* to morph themselves into heap buckets. That may happen anytime,
* even after the bucket data pointer has been read. So at
- * any time inside the green thread, the pool bucket memory
+ * any time inside the receiver thread, the pool bucket memory
* may disappear. yikes. */
status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (status == APR_SUCCESS) {
apr_off_t readbytes)
{
h2_beam_lock bl;
- apr_bucket *bsender, *bgreen, *ng;
+ apr_bucket *bsender, *brecv, *ng;
int transferred = 0;
apr_status_t status = APR_SUCCESS;
apr_off_t remain = readbytes;
int transferred_buckets = 0;
- /* Called from the green thread to take buckets from the beam */
+ /* Called from the receiver thread to take buckets from the beam */
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
transfer:
if (beam->aborted) {
goto leave;
}
- /* transfer enough buckets from our green brigade, if we have one */
+ /* transfer enough buckets from our receiver brigade, if we have one */
beam_set_recv_pool(beam, bb->p);
while (beam->recv_buffer
&& !APR_BRIGADE_EMPTY(beam->recv_buffer)
&& (readbytes <= 0 || remain >= 0)) {
- bgreen = APR_BRIGADE_FIRST(beam->recv_buffer);
- if (readbytes > 0 && bgreen->length > 0 && remain <= 0) {
+ brecv = APR_BRIGADE_FIRST(beam->recv_buffer);
+ if (readbytes > 0 && brecv->length > 0 && remain <= 0) {
break;
}
- APR_BUCKET_REMOVE(bgreen);
- APR_BRIGADE_INSERT_TAIL(bb, bgreen);
- remain -= bgreen->length;
+ APR_BUCKET_REMOVE(brecv);
+ APR_BRIGADE_INSERT_TAIL(bb, brecv);
+ remain -= brecv->length;
++transferred;
}
/* transfer from our sender brigade, transforming sender buckets to
- * green ones until we have enough */
+ * receiver ones until we have enough */
while (!H2_BLIST_EMPTY(&beam->send_list) && (readbytes <= 0 || remain >= 0)) {
bsender = H2_BLIST_FIRST(&beam->send_list);
- bgreen = NULL;
+ brecv = NULL;
if (readbytes > 0 && bsender->length > 0 && remain <= 0) {
break;
if (APR_BUCKET_IS_METADATA(bsender)) {
if (APR_BUCKET_IS_EOS(bsender)) {
- bgreen = apr_bucket_eos_create(bb->bucket_alloc);
+ brecv = apr_bucket_eos_create(bb->bucket_alloc);
beam->close_sent = 1;
}
else if (APR_BUCKET_IS_FLUSH(bsender)) {
- bgreen = apr_bucket_flush_create(bb->bucket_alloc);
+ brecv = apr_bucket_flush_create(bb->bucket_alloc);
}
else if (AP_BUCKET_IS_ERROR(bsender)) {
ap_bucket_error *eb = (ap_bucket_error *)bsender;
- bgreen = ap_bucket_error_create(eb->status, eb->data,
+ brecv = ap_bucket_error_create(eb->status, eb->data,
bb->p, bb->bucket_alloc);
}
}
continue;
}
else {
- /* create a "green" standin bucket. we took care about the
+ /* create a "receiver" standin bucket. we took care about the
* underlying sender bucket and its data when we placed it into
* the sender brigade.
* the beam bucket will notify us on destruction that bsender is
* no longer needed. */
- bgreen = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
+ brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
beam->buckets_sent++);
}
/* Place the sender bucket into our hold, to be destroyed when no
- * green bucket references it any more. */
+ * receiver bucket references it any more. */
APR_BUCKET_REMOVE(bsender);
H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
beam->received_bytes += bsender->length;
++transferred_buckets;
- if (bgreen) {
- APR_BRIGADE_INSERT_TAIL(bb, bgreen);
- remain -= bgreen->length;
+ if (brecv) {
+ APR_BRIGADE_INSERT_TAIL(bb, brecv);
+ remain -= brecv->length;
++transferred;
}
else {
- bgreen = h2_beam_bucket(beam, bb, bsender);
- while (bgreen && bgreen != APR_BRIGADE_SENTINEL(bb)) {
+ brecv = h2_beam_bucket(beam, bb, bsender);
+ while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
++transferred;
- remain -= bgreen->length;
- bgreen = APR_BUCKET_NEXT(bgreen);
+ remain -= brecv->length;
+ brecv = APR_BUCKET_NEXT(brecv);
}
}
}
if (readbytes > 0 && remain < 0) {
/* too much, put some back */
remain = readbytes;
- for (bgreen = APR_BRIGADE_FIRST(bb);
- bgreen != APR_BRIGADE_SENTINEL(bb);
- bgreen = APR_BUCKET_NEXT(bgreen)) {
- remain -= bgreen->length;
+ for (brecv = APR_BRIGADE_FIRST(bb);
+ brecv != APR_BRIGADE_SENTINEL(bb);
+ brecv = APR_BUCKET_NEXT(brecv)) {
+ remain -= brecv->length;
if (remain < 0) {
- apr_bucket_split(bgreen, bgreen->length+remain);
+ apr_bucket_split(brecv, brecv->length+remain);
beam->recv_buffer = apr_brigade_split_ex(bb,
- APR_BUCKET_NEXT(bgreen),
+ APR_BUCKET_NEXT(brecv),
beam->recv_buffer);
break;
}
h2_stream *candidate;
} stream_sel_ctx;
-static int find_cleanup_stream(h2_stream *stream, void *ictx)
+static int find_unprocessed_stream(h2_stream *stream, void *ictx)
{
stream_sel_ctx *ctx = ictx;
if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
return 1;
}
-static void cleanup_streams(h2_session *session)
+static void cleanup_unprocessed_streams(h2_session *session)
{
stream_sel_ctx ctx;
ctx.session = session;
- ctx.candidate = NULL;
while (1) {
- h2_mplx_stream_do(session->mplx, find_cleanup_stream, &ctx);
- if (ctx.candidate) {
- h2_session_stream_done(session, ctx.candidate);
- ctx.candidate = NULL;
- }
- else {
+ ctx.candidate = NULL;
+ h2_mplx_stream_do(session->mplx, find_unprocessed_stream, &ctx);
+ if (!ctx.candidate) {
break;
}
+ h2_session_stream_done(session, ctx.candidate);
}
}
return 0;
}
-static apr_status_t stream_release(h2_session *session,
- h2_stream *stream,
- uint32_t error_code)
+static apr_status_t stream_closed(h2_session *session,
+ h2_stream *stream,
+ uint32_t error_code)
{
conn_rec *c = session->c;
apr_bucket *b;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): handled, closing",
session->id, (int)stream->id);
- if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
- if (stream->id > session->local.completed_max) {
- session->local.completed_max = stream->id;
- }
+ if (H2_STREAM_CLIENT_INITIATED(stream->id)
+ && stream->id > session->local.completed_max) {
+ session->local.completed_max = stream->id;
}
}
else {
h2_h2_err_description(error_code));
h2_stream_rst(stream, error_code);
}
-
+ /* The stream might have data in the buffers of the main connection.
+ * We can only free the allocated resources once all had been written.
+ * Send a special buckets on the connection that gets destroyed when
+ * all preceding data has been handled. On its destruction, it is safe
+ * to purge all resources of the stream. */
b = h2_bucket_eos_create(c->bucket_alloc, stream);
APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
status = h2_conn_io_pass(&session->io, session->bbtmp);
(void)ngh2;
stream = get_stream(session, stream_id);
if (stream) {
- stream_release(session, stream, error_code);
+ stream_closed(session, stream, error_code);
}
return 0;
}
return APR_SUCCESS;
}
-static void h2_session_cleanup(h2_session *session)
-{
- ap_assert(session);
-
- if (session->mplx) {
- h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
- h2_mplx_release_and_join(session->mplx, session->iowait);
- session->mplx = NULL;
- }
-
- ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
- session->c->input_filters), "H2_IN");
- if (session->ngh2) {
- nghttp2_session_del(session->ngh2);
- session->ngh2 = NULL;
- }
- if (session->c) {
- h2_ctx_clear(session->c);
- }
-
- if (APLOGctrace1(session->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- "h2_session(%ld): cleanup", session->id);
- }
-}
-
static apr_status_t h2_session_shutdown_notice(h2_session *session)
{
apr_status_t status;
static apr_status_t session_pool_cleanup(void *data)
{
h2_session *session = data;
- /* On a controlled connection shutdown, this gets never
- * called as we deregister and destroy our pool manually.
- * However when we have an async mpm, and handed it our idle
- * connection, it will just cleanup once the connection is closed
- * from the other side (and sometimes even from out side) and
- * here we arrive then.
- */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"session(%ld): pool_cleanup", session->id);
- if (session->state != H2_SESSION_ST_DONE) {
+ if (session->state != H2_SESSION_ST_DONE
+ && session->state != H2_SESSION_ST_INIT) {
/* Not good. The connection is being torn down and we have
* not sent a goaway. This is considered a protocol error and
* the client has to assume that any streams "in flight" may have
"goodbye, clients will be confused, should not happen",
session->id);
}
- h2_session_cleanup(session);
- session->pool = NULL;
+
+ if (session->mplx) {
+ h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+ h2_mplx_release_and_join(session->mplx, session->iowait);
+ session->mplx = NULL;
+ }
+ if (session->ngh2) {
+ nghttp2_session_del(session->ngh2);
+ session->ngh2 = NULL;
+ }
return APR_SUCCESS;
}
static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
{
- cleanup_streams(session);
+ cleanup_unprocessed_streams(session);
if (!session->remote.shutdown) {
update_child_status(session, SERVER_CLOSING, "local goaway");
}
session->remote.error = arg;
session->remote.accepting = 0;
session->remote.shutdown = 1;
- cleanup_streams(session);
+ cleanup_unprocessed_streams(session);
update_child_status(session, SERVER_CLOSING, "remote goaway");
transit(session, "remote goaway", H2_SESSION_ST_DONE);
}