return 0;
}
+apr_status_t h2_io_in_shutdown(h2_io *io)
+{
+ if (io->bbin) {
+ apr_off_t end_len = 0;
+ apr_brigade_length(io->bbin, 1, &end_len);
+ io->input_consumed += end_len;
+ apr_brigade_cleanup(io->bbin);
+ }
+ return h2_io_in_close(io);
+}
+
apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
apr_size_t maxlen)
{
*/
apr_status_t h2_io_in_close(h2_io *io);
+/**
+ * Shuts all input down. Will close input and mark any data buffered
+ * as consumed.
+ */
+apr_status_t h2_io_in_shutdown(h2_io *io);
+
/*******************************************************************************
* Output handling of streams.
******************************************************************************/
h2_workers_unregister(m->workers, m);
}
-static void io_destroy(h2_mplx *m, h2_io *io)
+static int io_process_events(h2_mplx *m, h2_io *io) {
+ if (io->input_consumed && m->input_consumed) {
+ m->input_consumed(m->input_consumed_ctx,
+ io->id, io->input_consumed);
+ io->input_consumed = 0;
+ return 1;
+ }
+ return 0;
+}
+
+
+static void io_destroy(h2_mplx *m, h2_io *io, int events)
{
apr_pool_t *pool = io->pool;
+ /* cleanup any buffered input */
+ h2_io_in_shutdown(io);
+ if (events) {
+ /* Process outstanding events before destruction */
+ io_process_events(m, io);
+ }
+
io->pool = NULL;
/* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our
h2_io_set_remove(m->ready_ios, io);
if (io->task_done || h2_tq_remove(m->q, io->id)) {
/* already finished or not even started yet */
- io_destroy(m, io);
+ io_destroy(m, io, 1);
return 0;
}
else {
if (io) {
io->task_done = 1;
if (io->orphaned) {
- io_destroy(m, io);
+ io_destroy(m, io, 0);
}
else {
/* hang around until the stream deregisteres */
if (io->input_arrived) {
apr_thread_cond_signal(io->input_arrived);
}
+ io_process_events(m, io);
}
else {
status = APR_EOF;
if (io->input_arrived) {
apr_thread_cond_signal(io->input_arrived);
}
+ io_process_events(m, io);
}
else {
status = APR_ECONNABORTED;
}
typedef struct {
- h2_mplx_consumed_cb *cb;
- void *cb_ctx;
+ h2_mplx * m;
int streams_updated;
} update_ctx;
static int update_window(void *ctx, h2_io *io)
{
- if (io->input_consumed) {
- update_ctx *uctx = (update_ctx*)ctx;
- uctx->cb(uctx->cb_ctx, io->id, io->input_consumed);
- io->input_consumed = 0;
+ update_ctx *uctx = (update_ctx*)ctx;
+ if (io_process_events(uctx->m, io)) {
++uctx->streams_updated;
}
return 1;
}
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m,
- h2_mplx_consumed_cb *cb, void *cb_ctx)
+void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
+{
+ m->input_consumed = cb;
+ m->input_consumed_ctx = ctx;
+}
+
+apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (APR_SUCCESS == status) {
update_ctx ctx;
- ctx.cb = cb;
- ctx.cb_ctx = cb_ctx;
+ ctx.m = m;
ctx.streams_updated = 0;
status = APR_EAGAIN;
m->id, io->id);
io->orphaned = 1;
if (io->task_done) {
- io_destroy(m, io);
+ io_destroy(m, io, 1);
}
else {
- /* hang around until the h2_task is done */
+ /* hang around until the h2_task is done, but
+ * shutdown input and send out any events (e.g. window
+ * updates) asap. */
+ h2_io_in_shutdown(io);
h2_io_rst(io, H2_ERR_STREAM_CLOSED);
+ io_process_events(m, io);
}
}
typedef struct h2_mplx h2_mplx;
+/**
+ * Callback invoked for every stream that had input data read since
+ * the last invocation.
+ */
+typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed);
+
struct h2_mplx {
long id;
APR_RING_ENTRY(h2_mplx) link;
apr_pool_t *spare_pool; /* spare pool, ready for next io */
struct h2_workers *workers;
int file_handles_allowed;
+
+ h2_mplx_consumed_cb *input_consumed;
+ void *input_consumed_ctx;
};
struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, struct h2_worker *w, int *has_more);
+/**
+ * Register a callback for the amount of input data consumed per stream. The
+ * will only ever be invoked from the thread creating this h2_mplx, e.g. when
+ * calls from that thread into this h2_mplx are made.
+ *
+ * @param m the multiplexer to register the callback at
+ * @param cb the function to invoke
+ * @param ctx user supplied argument to invocation.
+ */
+void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);
+
/*******************************************************************************
* Input handling of streams.
******************************************************************************/
int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id);
/**
- * Callback invoked for every stream that had input data read since
- * the last invocation.
- */
-typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed);
-
-/**
- * Invoke the callback for all streams that had bytes read since the last
- * call to this function. If no stream had input data consumed, the callback
- * is not invoked.
+ * Invoke the consumed callback for all streams that had bytes read since the
+ * last call to this function. If no stream had input data consumed, the
+ * callback is not invoked.
+ * The consumed callback may also be invoked at other times whenever
+ * the need arises.
* Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update
* happened.
*/
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m,
- h2_mplx_consumed_cb *cb, void *ctx);
+apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
/*******************************************************************************
* Output handling of streams.
return NGHTTP2_ERR_PROTO;
}
+static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
+{
+ h2_session *session = (h2_session*)ctx;
+ nghttp2_session_consume(session->ngh2, stream_id, bytes_read);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_session(%ld-%d): consumed %ld bytes",
+ session->id, stream_id, (long)bytes_read);
+}
+
+
h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
{
h2_stream * stream;
"h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
session->id, stream_id, (long)len);
if (status != APR_SUCCESS) {
+ update_window(session, stream_id, len);
rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
if (nghttp2_is_fatal(rv)) {
void *user_data)
{
h2_session *session = user_data;
- if (APLOGctrace1(session->c)) {
+ if (APLOGcdebug(session->c)) {
char buffer[256];
frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_session(%ld): frame_send %s",
session->id, buffer);
}
session->workers = workers;
session->mplx = h2_mplx_create(c, session->pool, config, workers);
+ h2_mplx_set_consumed_cb(session->mplx, update_window, session);
+
h2_conn_io_init(&session->io, c, config, session->pool);
session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
return 0;
}
-static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
-{
- h2_session *session = (h2_session*)ctx;
- nghttp2_session_consume(session->ngh2, stream_id, bytes_read);
-}
-
h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
{
if (!session->last_stream || stream_id != session->last_stream->id) {
}
if (wait_micros > 0) {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE3, 0, session->c,
- "h2_session: wait for data, %ld micros", (long)(wait_micros));
- h2_conn_io_pass(&session->io);
+ if (APLOGcdebug(session->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_session: wait for data, %ld micros",
+ (long)wait_micros);
+ }
+ nghttp2_session_send(session->ngh2);
+ h2_conn_io_flush(&session->io);
status = h2_mplx_out_trywait(session->mplx, wait_micros, session->iowait);
if (status == APR_TIMEUP) {
}
}
- if (h2_stream_set_has_open_input(session->streams)) {
- /* Check that any pending window updates are sent. */
- status = h2_mplx_in_update_windows(session->mplx, update_window, session);
- if (APR_STATUS_IS_EAGAIN(status)) {
- status = APR_SUCCESS;
- }
- else if (status == APR_SUCCESS) {
- /* need to flush window updates onto the connection asap */
- h2_conn_io_flush(&session->io);
- }
+ /* Check that any pending window updates are sent. */
+ status = h2_mplx_in_update_windows(session->mplx);
+ if (APR_STATUS_IS_EAGAIN(status)) {
+ status = APR_SUCCESS;
+ }
+ else if (status == APR_SUCCESS) {
+ /* need to flush window updates onto the connection asap */
+ h2_conn_io_flush(&session->io);
}
}