-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_http2: HTTP/2 streams now implement state handling/transitions as
+ defined in RFC 7540. Stream cleanup/connection shutdown reworked to be
+ easier to understand/maintain/debug. Added many asserts on state and
+ cleanup transitions. [Stefan Eissing]
+
*) mod_proxy_fcgi: Add ProxyFCGISetEnvIf to fixup CGI environment
variables just before invoking the FastCGI. [Eric Covener,
Jacob Champion]
H2_PUSH_FAST_LOAD,
} h2_push_policy;
-typedef enum {
- H2_STREAM_ST_IDLE,
- H2_STREAM_ST_OPEN,
- H2_STREAM_ST_RESV_LOCAL,
- H2_STREAM_ST_RESV_REMOTE,
- H2_STREAM_ST_CLOSED_INPUT,
- H2_STREAM_ST_CLOSED_OUTPUT,
- H2_STREAM_ST_CLOSED,
-} h2_stream_state_t;
-
typedef enum {
H2_SESSION_ST_INIT, /* send initial SETTINGS, etc. */
H2_SESSION_ST_DONE, /* finished, connection close */
H2_SESSION_ST_IDLE, /* nothing to write, expecting data inc */
H2_SESSION_ST_BUSY, /* read/write without stop */
H2_SESSION_ST_WAIT, /* waiting for tasks reporting back */
+ H2_SESSION_ST_CLEANUP, /* pool is being cleaned up */
} h2_session_state;
typedef struct h2_session_props {
unsigned int shutdown : 1; /* if the final GOAWAY has been sent */
} h2_session_props;
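+/* Stream states, following the stream lifecycle of RFC 7540 (section 5.1):
+ * idle, reserved (remote/local), open, half-closed (remote/local), closed.
+ * H2_SS_CLEANUP is an internal addition, entered once a closed stream is
+ * handed over for resource cleanup; H2_SS_MAX just counts the states. */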
+typedef enum h2_stream_state_t {
+ H2_SS_IDLE,
+ H2_SS_RSVD_R,
+ H2_SS_RSVD_L,
+ H2_SS_OPEN,
+ H2_SS_CLOSED_R,
+ H2_SS_CLOSED_L,
+ H2_SS_CLOSED,
+ H2_SS_CLEANUP,
+ H2_SS_MAX
+} h2_stream_state_t;
+
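+/* Stream events dispatched in addition to state transitions, e.g. via
+ * h2_stream_dispatch(): local/remote side closed, stream cancelled, and
+ * the final EOS bucket sent out on the connection. */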
+typedef enum {
+ H2_SEV_CLOSED_L,
+ H2_SEV_CLOSED_R,
+ H2_SEV_CANCELLED,
+ H2_SEV_EOS_SENT
+} h2_stream_event_t;
+
/* h2_request is the transformer of HTTP2 streams into HTTP/1.1 internal
* format that will be fed to various httpd input filters to finally
}
-apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
- const char *tag, const char *sep,
- h2_blist *bl)
+static apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
+ const char *tag, const char *sep,
+ h2_blist *bl)
{
apr_size_t off = 0;
const char *sp = "";
memset(buffer, 0, bmax--);
off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
for (b = H2_BLIST_FIRST(bl);
- bmax && (b != H2_BLIST_SENTINEL(bl));
+ (bmax > off) && (b != H2_BLIST_SENTINEL(bl));
b = APR_BUCKET_NEXT(b)) {
off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
sp = " ";
}
- off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
+ if (bmax > off) {
+ off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
+ }
}
else {
off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
return APR_SUCCESS;
}
-static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool);
-static void beam_set_recv_pool(h2_bucket_beam *beam, apr_pool_t *pool);
+int h2_beam_is_closed(h2_bucket_beam *beam)
+{
+ return beam->closed;
+}
static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool,
apr_status_t (*cleanup)(void *))
return APR_SUCCESS;
}
-static void beam_set_recv_pool(h2_bucket_beam *beam, apr_pool_t *pool)
-{
- if (beam->recv_pool == pool ||
- (beam->recv_pool && pool
- && apr_pool_is_ancestor(beam->recv_pool, pool))) {
- /* when receiver same or sub-pool of existing, stick
- * to the the pool we already have. */
- return;
- }
- pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
- beam->recv_pool = pool;
- pool_register(beam, beam->recv_pool, beam_recv_cleanup);
-}
-
static apr_status_t beam_send_cleanup(void *data)
{
h2_bucket_beam *beam = data;
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
}
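+/* The receiver gives up on the beam: discard anything already received,
+ * mark the beam aborted and close it, so that senders blocked in or
+ * attempting writes fail instead of waiting for a reader that is gone. */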
+apr_status_t h2_beam_leave(h2_bucket_beam *beam)
+{
+ h2_beam_lock bl;
+
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
+ apr_brigade_cleanup(beam->recv_buffer);
+ }
+ beam->aborted = 1;
+ beam_close(beam);
+ leave_yellow(beam, &bl);
+ }
+ return APR_SUCCESS;
+}
+
apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
{
apr_status_t status;
}
/* transfer enough buckets from our receiver brigade, if we have one */
- beam_set_recv_pool(beam, bb->p);
while (beam->recv_buffer
&& !APR_BRIGADE_EMPTY(beam->recv_buffer)
&& (readbytes <= 0 || remain >= 0)) {
return 0;
}
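+/* Debug helper: print the beam's bucket lists (send, receive, hold and
+ * purge) into a single log line at the given level. */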
+void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
+{
+    if (beam && APLOG_C_IS_LEVEL(c, level)) {
+ char buffer[2048];
+ apr_size_t blen = sizeof(buffer)/sizeof(buffer[0]) - 1;
+ apr_size_t off = 0;
+
+ buffer[0] = 0;
+ off += apr_snprintf(buffer+off, blen-off, "cl=%d, ", beam->closed);
+ off += h2_util_bl_print(buffer+off, blen-off, "to_send", ", ", &beam->send_list);
+ if (blen > off) {
+ off += h2_util_bb_print(buffer+off, blen-off, "recv_buffer", ", ", beam->recv_buffer);
+ if (blen > off) {
+ off += h2_util_bl_print(buffer+off, blen-off, "hold", ", ", &beam->hold_list);
+ if (blen > off) {
+ off += h2_util_bl_print(buffer+off, blen-off, "purge", "", &beam->purge_list);
+ }
+ }
+ }
+ buffer[blen-1] = 0;
+ ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d,%s): %s %s",
+ c->id, beam->id, beam->tag, msg, buffer);
+ }
+}
+
+
APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link); \
} while (0)
-/**
- * Print the buckets in the list into the buffer (type and lengths).
- * @param buffer the buffer to print into
- * @param bmax max number of characters to place in buffer, incl. trailing 0
- * @param tag tag string for this bucket list
- * @param sep separator to use
- * @param bl the bucket list to print
- * @return number of characters printed
- */
-apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
- const char *tag, const char *sep,
- h2_blist *bl);
-
/*******************************************************************************
* h2_bucket_beam
******************************************************************************/
*/
apr_status_t h2_beam_close(h2_bucket_beam *beam);
+/**
+ * Receiver leaves the beam, i.e. will no longer read from it. This will
+ * interrupt any sender blocked on a write and make future sends fail.
+ *
+ * Call from the receiver side only.
+ */
+apr_status_t h2_beam_leave(h2_bucket_beam *beam);
+
+int h2_beam_is_closed(h2_bucket_beam *beam);
+
/**
* Return APR_SUCCESS when all buckets in transit have been handled.
* When called with APR_BLOCK_READ and a mutex set, will wait until the green
void h2_register_bucket_beamer(h2_bucket_beamer *beamer);
+void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg);
+
#endif /* h2_bucket_beam_h */
}
apr_bucket_free(h);
if (stream) {
- h2_stream_eos_destroy(stream);
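+        /* the EOS bucket of this stream is being destroyed, i.e. all data
+         * queued before it on the main connection has been sent out;
+         * notify the stream's state machine */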
+ h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
}
}
}
#include "h2_ngn_shed.h"
#include "h2_request.h"
#include "h2_stream.h"
+#include "h2_session.h"
#include "h2_task.h"
#include "h2_worker.h"
#include "h2_workers.h"
#include "h2_util.h"
-static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg,
- conn_rec *c, int level)
-{
- if (beam && APLOG_C_IS_LEVEL(c,level)) {
- char buffer[2048];
- apr_size_t off = 0;
-
- off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
- off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "to_send", ", ", &beam->send_list);
- off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "recv_buffer", ", ", beam->recv_buffer);
- off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold_list);
- off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge_list);
-
- ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s",
- c->id, id, msg, buffer);
- }
-}
-
-/* utility for iterating over ihash task sets */
+/* utility for iterating over ihash stream sets */
typedef struct {
h2_mplx *m;
- h2_task *task;
+ h2_stream *stream;
apr_time_t now;
-} task_iter_ctx;
+} stream_iter_ctx;
/* NULL or the mutex hold by this thread, used for recursive calls
*/
return 0;
}
-static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response);
-static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
+static void have_out_data_for(h2_mplx *m, h2_stream *stream);
static void check_tx_reservation(h2_mplx *m)
{
if (m->tx_handles_reserved <= 0) {
m->tx_handles_reserved += h2_workers_tx_reserve(m->workers,
- H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks)));
+ H2MIN(m->tx_chunk_size, h2_ihash_count(m->streams)));
}
}
m->tx_handles_reserved = m->tx_chunk_size;
h2_workers_tx_free(m->workers, count);
}
- else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) {
+ else if (m->tx_handles_reserved && h2_ihash_empty(m->streams)) {
h2_workers_tx_free(m->workers, m->tx_handles_reserved);
m->tx_handles_reserved = 0;
}
}
-typedef struct {
- h2_mplx *m;
-} purge_ctx;
-
-static int purge_stream(void *ctx, void *val)
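+/* A stream is 'joined' once the main connection is done with it and its
+ * task worker (if any) has reported back: move it from the hold to the
+ * purge set and reclaim the file handles its beams had in transit. */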
+static void stream_joined(h2_mplx *m, h2_stream *stream)
{
- purge_ctx *pctx = ctx;
- h2_stream *stream = val;
- int stream_id = stream->id;
- h2_task *task;
-
- /* stream_cleanup clears all buffers and destroys any buckets
- * that might hold references into task space. Needs to be done
- * before task destruction, otherwise it will complain. */
- h2_stream_cleanup(stream);
+ ap_assert(!stream->task || stream->task->worker_done);
- task = h2_ihash_get(pctx->m->tasks, stream_id);
- if (task) {
- task_destroy(pctx->m, task, 1);
- }
-
- h2_stream_destroy(stream);
- h2_ihash_remove(pctx->m->spurge, stream_id);
- return 0;
+ h2_ihash_remove(m->shold, stream->id);
+ h2_ihash_add(m->spurge, stream);
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
}
-static void purge_streams(h2_mplx *m)
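+/* Cleanup of a stream the main connection is done with: detach the beam
+ * callbacks, shut down the input/output beams and move the stream from the
+ * active set into the hold. Streams without a busy task worker are joined
+ * right away; otherwise the slave connection is flagged as aborted and the
+ * stream is joined once its worker reports done. */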
+static void stream_cleanup(h2_mplx *m, h2_stream *stream)
{
- if (!h2_ihash_empty(m->spurge)) {
- purge_ctx ctx;
- ctx.m = m;
- while(!h2_ihash_iter(m->spurge, purge_stream, &ctx)) {
- /* repeat until empty */
- }
- h2_ihash_clear(m->spurge);
- }
-}
+ ap_assert(stream->state == H2_SS_CLEANUP);
+
+ h2_beam_on_produced(stream->output, NULL, NULL);
+ h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
+ h2_beam_abort(stream->input);
+ h2_beam_leave(stream->output);
+
+ h2_stream_cleanup(stream);
-static void h2_mplx_destroy(h2_mplx *m)
-{
- ap_assert(m);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): destroy, tasks=%d",
- m->id, (int)h2_ihash_count(m->tasks));
- check_tx_free(m);
- /* pool will be destroyed as child of h2_session->pool,
- slave connection pools are children of m->pool */
+ h2_iq_remove(m->q, stream->id);
+ h2_ihash_remove(m->streams, stream->id);
+ h2_ihash_add(m->shold, stream);
+
+ if (!stream->task || stream->task->worker_done) {
+ stream_joined(m, stream);
+ }
+ else if (stream->task) {
+ stream->task->c->aborted = 1;
+ apr_thread_cond_broadcast(m->task_thawed);
+ }
}
/**
status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
m->pool);
if (status != APR_SUCCESS) {
- h2_mplx_destroy(m);
return NULL;
}
status = apr_thread_cond_create(&m->task_thawed, m->pool);
if (status != APR_SUCCESS) {
- h2_mplx_destroy(m);
return NULL;
}
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->sredo = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
m->readyq = h2_iq_create(m->pool, m->max_streams);
- m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
- m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
m->stream_timeout = stream_timeout;
m->workers = workers;
return 0;
}
-
-static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
+static void task_destroy(h2_mplx *m, h2_task *task)
{
conn_rec *slave = NULL;
int reuse_slave = 0;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
- "h2_task(%s): destroy", task->id);
- if (called_from_master) {
- /* Process outstanding events before destruction */
- h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
- if (stream) {
- input_consumed_signal(m, stream);
- }
- }
-
- h2_beam_on_produced(task->output.beam, NULL, NULL);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
- APLOGNO(03385) "h2_task(%s): destroy "
- "output beam empty=%d, holds proxies=%d",
- task->id,
- h2_beam_empty(task->output.beam),
- h2_beam_holds_proxies(task->output.beam));
+ h2_beam_log(task->output.beam, m->c, APLOG_DEBUG,
+ APLOGNO(03385) "h2_task_destroy");
slave = task->c;
reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
&& !task->rst_error);
- h2_ihash_remove(m->tasks, task->stream_id);
- h2_ihash_remove(m->redo_tasks, task->stream_id);
- h2_task_destroy(task);
-
if (slave) {
if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
+ h2_task_destroy(task);
APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
}
else {
h2_slave_destroy(slave);
}
}
-
- check_tx_free(m);
}
-static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
-{
- h2_task *task;
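+/* Iterator over the purge set: tear down a stream's beams and its task
+ * (with the slave connection) and finally destroy the stream itself. */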
+static int stream_destroy_iter(void *ctx, void *val)
+{
+ h2_mplx *m = ctx;
+ h2_stream *stream = val;
+
+ h2_ihash_remove(m->spurge, stream->id);
+ ap_assert(stream->state == H2_SS_CLEANUP);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
- "h2_stream(%ld-%d): done", m->c->id, stream->id);
- /* Situation: we are, on the master connection, done with processing
- * the stream. Either we have handled it successfully, or the stream
- * was reset by the client or the connection is gone and we are
- * shutting down the whole session.
- *
- * We possibly have created a task for this stream to be processed
- * on a slave connection. The processing might actually be ongoing
- * right now or has already finished. A finished task waits for its
- * stream to be done. This is the common case.
- *
- * If the stream had input (e.g. the request had a body), a task
- * may have read, or is still reading buckets from the input beam.
- * This means that the task is referencing memory from the stream's
- * pool (or the master connection bucket alloc). Before we can free
- * the stream pool, we need to make sure that those references are
- * gone. This is what h2_beam_shutdown() on the input waits for.
- *
- * With the input handled, we can tear down that beam and care
- * about the output beam. The stream might still have buffered some
- * buckets read from the output, so we need to get rid of those. That
- * is done by h2_stream_cleanup().
- *
- * Now it is save to destroy the task (if it exists and is finished).
- *
- * FIXME: we currently destroy the stream, even if the task is still
- * ongoing. This is not ok, since task->request is coming from stream
- * memory. We should either copy it on task creation or wait with the
- * stream destruction until the task is done.
- */
- h2_iq_remove(m->q, stream->id);
- h2_ihash_remove(m->streams, stream->id);
- h2_ihash_remove(m->shold, stream->id);
+ if (stream->input == NULL || stream->output == NULL) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c,
+ "h2_stream(%ld-%d,%s): already with beams==NULL",
+ m->id, stream->id, h2_stream_state_str(stream));
+ return 0;
+ }
+ /* Process outstanding events before destruction */
+ input_consumed_signal(m, stream);
- h2_stream_cleanup(stream);
- m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
- h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
- /* Let anyone blocked reading know that there is no more to come */
- h2_beam_abort(stream->input);
- /* Remove mutex after, so that abort still finds cond to signal */
- h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
- m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
-
- task = h2_ihash_get(m->tasks, stream->id);
- if (task) {
- if (!task->worker_done) {
- /* task still running, cleanup once it is done */
- if (rst_error) {
- h2_task_rst(task, rst_error);
- }
- h2_ihash_add(m->shold, stream);
- return;
- }
- else {
- /* already finished */
- task_destroy(m, task, 1);
- }
+ h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy");
+ h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy");
+ h2_beam_destroy(stream->input);
+ stream->input = NULL;
+ h2_beam_destroy(stream->output);
+ stream->output = NULL;
+ if (stream->task) {
+ task_destroy(m, stream->task);
+ stream->task = NULL;
}
- h2_ihash_add(m->spurge, stream);
+ h2_stream_destroy(stream);
+ return 0;
}
-static int stream_done_iter(void *ctx, void *val)
+static void purge_streams(h2_mplx *m)
{
- stream_done((h2_mplx*)ctx, val, 0);
- return 0;
+ if (!h2_ihash_empty(m->spurge)) {
+ while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
+ /* repeat until empty */
+ }
+ check_tx_free(m);
+ }
}
typedef struct {
return status;
}
-static int task_print(void *ctx, void *val)
-{
+static int report_stream_iter(void *ctx, void *val) {
h2_mplx *m = ctx;
- h2_task *task = val;
-
+ h2_stream *stream = val;
+ h2_task *task = stream->task;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ H2_STREAM_MSG(stream, "started=%d, scheduled=%d, ready=%d"),
+ !!stream->task, stream->scheduled, h2_stream_is_ready(stream));
if (task) {
- h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
- "->03198: h2_stream(%s): %s %s %s"
- "[orph=%d/started=%d/done=%d/frozen=%d]",
- task->id, task->request->method,
- task->request->authority, task->request->path,
- (stream? 0 : 1), task->worker_started,
+ H2_STREAM_MSG(stream, "->03198: %s %s %s"
+ "[started=%d/done=%d/frozen=%d]"),
+ task->request->method, task->request->authority,
+ task->request->path, task->worker_started,
task->worker_done, task->frozen);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
- "->03198: h2_stream(%ld-NULL): NULL", m->id);
+ H2_STREAM_MSG(stream, "->03198: no task"));
}
return 1;
}
-static int task_abort_connection(void *ctx, void *val)
-{
- h2_task *task = val;
- if (!task->worker_done) {
- if (task->c) {
- task->c->aborted = 1;
- }
- h2_beam_abort(task->input.beam);
- h2_beam_abort(task->output.beam);
- }
+static int unexpected_stream_iter(void *ctx, void *val) {
+ h2_mplx *m = ctx;
+ h2_stream *stream = val;
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ H2_STREAM_MSG(stream,
+ "unexpected, started=%d, scheduled=%d, ready=%d"),
+ !!stream->task, stream->scheduled, h2_stream_is_ready(stream));
return 1;
}
-static int report_stream_iter(void *ctx, void *val) {
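+/* Iterator used on connection shutdown: reset each remaining stream,
+ * simulate that its EOS has been sent and run the usual cleanup on it. */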
+static int stream_cancel_iter(void *ctx, void *val) {
h2_mplx *m = ctx;
h2_stream *stream = val;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, ready=%d",
- m->id, stream->id, stream->started, stream->scheduled,
- h2_stream_is_ready(stream));
- return 1;
-}
-static int task_done_iter(void *ctx, void *val);
+ /* take over event monitoring */
+ h2_stream_set_monitor(stream, NULL);
+ /* Reset, should transit to CLOSED state */
+ h2_stream_rst(stream, H2_ERR_NO_ERROR);
+ /* All connection data has been sent, simulate cleanup */
+ h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
+ stream_cleanup(m, stream);
+ return 0;
+}
-apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
+void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
+ int i, wait_secs = 60;
int acquired;
/* How to shut down a h2 connection:
- * 1. tell the workers that no more tasks will come from us */
+ * 0. tell the workers that no more tasks will come from us */
h2_workers_unregister(m->workers, m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- int i, wait_secs = 60;
+ enter_mutex(m, &acquired);
- /* 2. disable WINDOW_UPDATEs and set the mplx to aborted, clear
- * our TODO list and purge any streams we have collected */
- h2_mplx_set_consumed_cb(m, NULL, NULL);
- h2_mplx_abort(m);
- h2_iq_clear(m->q);
- h2_ihash_clear(m->spurge);
-
- /* 3. wakeup all sleeping tasks. Mark all still active streams as 'done'.
- * m->streams has to be empty afterwards with streams either in
- * a) m->shold because a task is still active
- * b) m->spurge because task is done, or was not started */
- h2_ihash_iter(m->tasks, task_abort_connection, m);
- apr_thread_cond_broadcast(m->task_thawed);
- while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
- /* iterate until all streams have been removed */
- }
- ap_assert(h2_ihash_empty(m->streams));
-
- /* 4. while workers are busy on this connection, meaning they
- * are processing tasks from this connection, wait on them finishing
- * to wake us and check again. Eventually, this has to succeed. */
- m->join_wait = wait;
- for (i = 0; m->workers_busy > 0; ++i) {
- status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
-
- if (APR_STATUS_IS_TIMEUP(status)) {
- /* This can happen if we have very long running requests
- * that do not time out on IO. */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
- "h2_mplx(%ld): release, waiting for %d seconds now for "
- "%d h2_workers to return, have still %d tasks outstanding",
- m->id, i*wait_secs, m->workers_busy,
- (int)h2_ihash_count(m->tasks));
- h2_ihash_iter(m->shold, report_stream_iter, m);
- h2_ihash_iter(m->tasks, task_print, m);
- }
- }
- m->join_wait = NULL;
+    /* 1. set the aborted flag and cancel all streams still active */
+ m->aborted = 1;
+ while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) {
+ /* until empty */
+ }
+
+ /* 2. terminate ngn_shed, no more streams
+ * should be scheduled or in the active set */
+ h2_ngn_shed_abort(m->ngn_shed);
+ ap_assert(h2_ihash_empty(m->streams));
+ ap_assert(h2_iq_empty(m->q));
+
+    /* 3. while workers are busy on this connection, meaning they are
+     * processing tasks from this connection, wait for them to finish
+     * and wake us, then check again. Eventually, this has to succeed. */
+ m->join_wait = wait;
+ for (i = 0; h2_ihash_count(m->shold) > 0; ++i) {
+ status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
- /* 5. All workers for this connection are done, we are in
- * single-threaded processing now effectively. */
- leave_mutex(m, acquired);
-
- if (!h2_ihash_empty(m->tasks)) {
- /* when we are here, we lost track of the tasks still present.
- * this currently happens with mod_proxy_http2 when we shut
- * down a h2_req_engine with tasks assigned. Since no parallel
- * processing is going on any more, we just clean them up. */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
- "h2_mplx(%ld): 3. release_join with %d tasks",
- m->id, (int)h2_ihash_count(m->tasks));
- h2_ihash_iter(m->tasks, task_print, m);
-
- while (!h2_ihash_iter(m->tasks, task_done_iter, m)) {
- /* iterate until all tasks have been removed */
- }
+ if (APR_STATUS_IS_TIMEUP(status)) {
+ /* This can happen if we have very long running requests
+ * that do not time out on IO. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
+ "h2_mplx(%ld): release, waiting for %d seconds now for "
+ "%d outstanding tasks",
+ m->id, i*wait_secs, (int)h2_ihash_count(m->shold));
+ h2_ihash_iter(m->shold, report_stream_iter, m);
}
-
- /* 6. With all tasks done, the stream hold should be empty now. */
- ap_assert(h2_ihash_empty(m->shold));
-
- /* 7. close the h2_req_enginge shed and self destruct */
- h2_ngn_shed_destroy(m->ngn_shed);
- m->ngn_shed = NULL;
- h2_mplx_destroy(m);
}
- return status;
-}
-
-void h2_mplx_abort(h2_mplx *m)
-{
- int acquired;
+ m->join_wait = NULL;
- if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
- m->aborted = 1;
- h2_ngn_shed_abort(m->ngn_shed);
- leave_mutex(m, acquired);
+    /* 4. close the h2_req_engine shed */
+ h2_ngn_shed_destroy(m->ngn_shed);
+ m->ngn_shed = NULL;
+
+    /* 5. With all workers done, all streams should be in spurge */
+ if (!h2_ihash_empty(m->shold)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO()
+ "h2_mplx(%ld): unexpected %d streams in hold",
+ m->id, (int)h2_ihash_count(m->shold));
+ h2_ihash_iter(m->shold, unexpected_stream_iter, m);
}
+ /*ap_assert(h2_ihash_empty(m->shold));*/
+
+    /* 6. return any file resources allocated */
+ check_tx_free(m);
+
+ leave_mutex(m, acquired);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): released", m->id);
}
-apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
+apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
apr_status_t status = APR_SUCCESS;
int acquired;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld-%d): marking stream as done.",
- m->id, stream->id);
- stream_done(m, stream, stream->rst_error);
- purge_streams(m);
+ "h2_mplx(%ld-%d): cleanup stream.", m->id, stream->id);
+ stream_cleanup(m, stream);
leave_mutex(m, acquired);
}
return status;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
stream = h2_ihash_get(m->streams, beam->id);
if (stream) {
- have_out_data_for(m, stream, 0);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+ "h2_mplx(%s): output_produced", stream->task->id);
+ have_out_data_for(m, stream);
}
leave_mutex(m, acquired);
}
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
apr_status_t status = APR_SUCCESS;
- h2_task *task = h2_ihash_get(m->tasks, stream_id);
h2_stream *stream = h2_ihash_get(m->streams, stream_id);
apr_size_t beamed_count;
- if (!task || !stream) {
+ if (!stream || !stream->task) {
return APR_ECONNABORTED;
}
if (APLOGctrace2(m->c)) {
- h2_beam_log(beam, stream_id, "out_open", m->c, APLOG_TRACE2);
+ h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open");
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%s): out open", task->id);
+ "h2_mplx(%s): out open", stream->task->id);
}
- h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, task);
+ h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream->task);
h2_beam_on_produced(stream->output, output_produced, m);
beamed_count = h2_beam_get_files_beamed(stream->output);
if (m->tx_handles_reserved >= beamed_count) {
else {
m->tx_handles_reserved = 0;
}
- if (!task->output.copy_files) {
+ if (!stream->task->output.copy_files) {
h2_beam_on_file_beam(stream->output, can_beam_file, m);
}
/* time to protect the beam against multi-threaded use */
- h2_beam_mutex_set(stream->output, beam_enter, task->cond, m);
+ h2_beam_mutex_set(stream->output, beam_enter, stream->task->cond, m);
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
check_tx_reservation(m);
- have_out_data_for(m, stream, 0);
+ have_out_data_for(m, stream);
return status;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
"h2_mplx(%s): close", task->id);
status = h2_beam_close(task->output.beam);
- h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c,
- APLOG_TRACE2);
+ h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
output_consumed_signal(m, task);
- have_out_data_for(m, stream, 0);
+ have_out_data_for(m, stream);
return status;
}
return status;
}
-static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
+static void have_out_data_for(h2_mplx *m, h2_stream *stream)
{
ap_assert(m);
ap_assert(stream);
else {
h2_ihash_add(m->streams, stream);
if (h2_stream_is_ready(stream)) {
+ /* already have a response */
apr_atomic_set32(&m->event_pending, 1);
h2_iq_append(m->readyq, stream->id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ "h2_mplx(%ld-%d): process, add to readyq",
+ m->c->id, stream->id);
}
else {
if (!m->need_registration) {
do_registration = m->need_registration;
}
h2_iq_add(m->q, stream->id, cmp, ctx);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ "h2_mplx(%ld-%d): process, add to q",
+ m->c->id, stream->id);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%ld-%d): process", m->c->id, stream->id);
}
leave_mutex(m, acquired);
}
static h2_task *next_stream_task(h2_mplx *m)
{
- h2_task *task = NULL;
h2_stream *stream;
int sid;
- while (!m->aborted && !task && (m->workers_busy < m->workers_limit)
+ while (!m->aborted && (m->workers_busy < m->workers_limit)
&& (sid = h2_iq_shift(m->q)) > 0) {
stream = h2_ihash_get(m->streams, sid);
slave->sbh = m->c->sbh;
slave->aborted = 0;
- task = h2_task_create(slave, stream->id, stream->request,
- stream->input, stream->output, m);
- h2_ihash_add(m->tasks, task);
-
- m->c->keepalives++;
- apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
- if (new_conn) {
- h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
- }
- stream->started = 1;
- task->worker_started = 1;
- task->started_at = apr_time_now();
- if (sid > m->max_stream_started) {
- m->max_stream_started = sid;
+ if (!stream->task) {
+ stream->task = h2_task_create(slave, stream->id, stream->request,
+ stream->input, stream->output, m);
+
+ m->c->keepalives++;
+ apr_table_setn(slave->notes, H2_TASK_ID_NOTE, stream->task->id);
+ if (new_conn) {
+ h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
+ }
+ if (sid > m->max_stream_started) {
+ m->max_stream_started = sid;
+ }
+
+ h2_beam_timeout_set(stream->input, m->stream_timeout);
+ h2_beam_on_consumed(stream->input, stream_input_ev,
+ stream_input_consumed, m);
+ h2_beam_on_file_beam(stream->input, can_beam_file, m);
+ h2_beam_mutex_set(stream->input, beam_enter, stream->task->cond, m);
+
+ h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
+ h2_beam_timeout_set(stream->output, m->stream_timeout);
}
-
- h2_beam_timeout_set(stream->input, m->stream_timeout);
- h2_beam_on_consumed(stream->input, stream_input_ev,
- stream_input_consumed, m);
- h2_beam_on_file_beam(stream->input, can_beam_file, m);
- h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
-
- h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
- h2_beam_timeout_set(stream->output, m->stream_timeout);
+ stream->task->worker_started = 1;
+ stream->task->started_at = apr_time_now();
++m->workers_busy;
+ return stream->task;
}
}
- return task;
+ return NULL;
}
h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
+ h2_stream *stream;
+
if (task->frozen) {
/* this task was handed over to an engine for processing
* and the original worker has finished. That means the
apr_thread_cond_broadcast(m->task_thawed);
return;
}
- else {
- h2_stream *stream;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): task(%s) done", m->id, task->id);
- out_close(m, task);
-
- if (ngn) {
- apr_off_t bytes = 0;
- h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
- bytes += h2_beam_get_buffered(task->output.beam);
- if (bytes > 0) {
- /* we need to report consumed and current buffered output
- * to the engine. The request will be streamed out or cancelled,
- * no more data is coming from it and the engine should update
- * its calculations before we destroy this information. */
- h2_req_engine_out_consumed(ngn, task->c, bytes);
- }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): task(%s) done", m->id, task->id);
+ out_close(m, task);
+
+ if (ngn) {
+ apr_off_t bytes = 0;
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+ bytes += h2_beam_get_buffered(task->output.beam);
+ if (bytes > 0) {
+ /* we need to report consumed and current buffered output
+ * to the engine. The request will be streamed out or cancelled,
+ * no more data is coming from it and the engine should update
+ * its calculations before we destroy this information. */
+ h2_req_engine_out_consumed(ngn, task->c, bytes);
}
-
- if (task->engine) {
- if (!m->aborted && !task->c->aborted
- && !h2_req_engine_is_shutdown(task->engine)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- "h2_mplx(%ld): task(%s) has not-shutdown "
- "engine(%s)", m->id, task->id,
- h2_req_engine_get_id(task->engine));
- }
- h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+ }
+
+ if (task->engine) {
+ if (!m->aborted && !task->c->aborted
+ && !h2_req_engine_is_shutdown(task->engine)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): task(%s) has not-shutdown "
+ "engine(%s)", m->id, task->id,
+ h2_req_engine_get_id(task->engine));
}
-
- stream = h2_ihash_get(m->streams, task->stream_id);
- if (!m->aborted && stream
- && h2_ihash_get(m->redo_tasks, task->stream_id)) {
- /* reset and schedule again */
- h2_task_redo(task);
- h2_ihash_remove(m->redo_tasks, task->stream_id);
- h2_iq_add(m->q, task->stream_id, NULL, NULL);
- return;
+ h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+ }
+
+ task->worker_done = 1;
+ task->done_at = apr_time_now();
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): request done, %f ms elapsed", task->id,
+ (task->done_at - task->started_at) / 1000.0);
+
+ if (task->started_at > m->last_idle_block) {
+ /* this task finished without causing an 'idle block', e.g.
+ * a block by flow control.
+ */
+        if (task->done_at - m->last_limit_change >= m->limit_change_interval
+ && m->workers_limit < m->workers_max) {
+ /* Well behaving stream, allow it more workers */
+ m->workers_limit = H2MIN(m->workers_limit * 2,
+ m->workers_max);
+ m->last_limit_change = task->done_at;
+ m->need_registration = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): increase worker limit to %d",
+ m->id, m->workers_limit);
}
-
- task->worker_done = 1;
- task->done_at = apr_time_now();
+ }
+
+ stream = h2_ihash_get(m->streams, task->stream_id);
+ if (stream && !m->aborted && h2_ihash_get(m->sredo, stream->id)) {
+ /* reset and schedule again */
+ h2_task_redo(task);
+ h2_ihash_remove(m->sredo, stream->id);
+ h2_iq_add(m->q, stream->id, NULL, NULL);
+ return;
+ }
+
+ if (stream) {
+ /* stream not cleaned up, stay around */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%s): request done, %f ms elapsed", task->id,
- (task->done_at - task->started_at) / 1000.0);
- if (task->started_at > m->last_idle_block) {
- /* this task finished without causing an 'idle block', e.g.
- * a block by flow control.
- */
- if (task->done_at- m->last_limit_change >= m->limit_change_interval
- && m->workers_limit < m->workers_max) {
- /* Well behaving stream, allow it more workers */
- m->workers_limit = H2MIN(m->workers_limit * 2,
- m->workers_max);
- m->last_limit_change = task->done_at;
- m->need_registration = 1;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): increase worker limit to %d",
- m->id, m->workers_limit);
- }
- }
-
- if (stream) {
- /* hang around until the stream deregisters */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%s): task_done, stream still open",
- task->id);
- /* more data will not arrive, resume the stream */
- have_out_data_for(m, stream, 0);
- h2_beam_on_consumed(stream->output, NULL, NULL, NULL);
- h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
- }
- else {
- /* stream no longer active, was it placed in hold? */
- stream = h2_ihash_get(m->shold, task->stream_id);
- if (stream) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%s): task_done, stream %d in hold",
- task->id, stream->id);
- /* We cannot destroy the stream here since this is
- * called from a worker thread and freeing memory pools
- * is only safe in the only thread using it (and its
- * parent pool / allocator) */
- h2_beam_on_consumed(stream->output, NULL, NULL, NULL);
- h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
- h2_ihash_remove(m->shold, stream->id);
- h2_ihash_add(m->spurge, stream);
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%s): task_done, stream not found",
- task->id);
- task_destroy(m, task, 0);
- }
- }
+ H2_STREAM_MSG(stream, "task_done, stream open"));
+ /* more data will not arrive, resume the stream */
+ h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
+ h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
+ h2_beam_leave(stream->input);
+ have_out_data_for(m, stream);
+ }
+ else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ H2_STREAM_MSG(stream, "task_done, in hold"));
+ /* stream was just waiting for us. */
+ h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
+ h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
+ h2_beam_leave(stream->input);
+ stream_joined(m, stream);
+ }
+ else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO()
+ "h2_mplx(%s): task_done, stream already in spurge",
+ task->id);
+ ap_assert("stream should not be in spurge" == NULL);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO()
+ "h2_mplx(%s): task_done, stream not found",
+ task->id);
+ ap_assert("stream should still be available" == NULL);
}
-}
-
-static int task_done_iter(void *ctx, void *val)
-{
- task_done((h2_mplx*)ctx, val, 0);
- return 0;
}
void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
static int latest_repeatable_unsubmitted_iter(void *data, void *val)
{
- task_iter_ctx *ctx = data;
- h2_stream *stream;
- h2_task *task = val;
- if (!task->worker_done && h2_task_can_redo(task)
- && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
- stream = h2_ihash_get(ctx->m->streams, task->stream_id);
- if (stream && !h2_stream_is_ready(stream)) {
+ stream_iter_ctx *ctx = data;
+ h2_stream *stream = val;
+
+ if (stream->task && !stream->task->worker_done
+ && h2_task_can_redo(stream->task)
+ && !h2_ihash_get(ctx->m->sredo, stream->id)) {
+ if (!h2_stream_is_ready(stream)) {
/* this task occupies a worker, the response has not been submitted
* yet, not been cancelled and it is a repeatable request
* -> it can be re-scheduled later */
- if (!ctx->task || ctx->task->started_at < task->started_at) {
+ if (!ctx->stream
+ || (ctx->stream->task->started_at < stream->task->started_at)) {
/* we did not have one or this one was started later */
- ctx->task = task;
+ ctx->stream = stream;
}
}
}
return 1;
}
-static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m)
+static h2_stream *get_latest_repeatable_unsubmitted_stream(h2_mplx *m)
{
- task_iter_ctx ctx;
+ stream_iter_ctx ctx;
ctx.m = m;
- ctx.task = NULL;
- h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx);
- return ctx.task;
+ ctx.stream = NULL;
+ h2_ihash_iter(m->streams, latest_repeatable_unsubmitted_iter, &ctx);
+ return ctx.stream;
}
static int timed_out_busy_iter(void *data, void *val)
{
- task_iter_ctx *ctx = data;
- h2_task *task = val;
- if (!task->worker_done
- && (ctx->now - task->started_at) > ctx->m->stream_timeout) {
+ stream_iter_ctx *ctx = data;
+ h2_stream *stream = val;
+ if (stream->task && !stream->task->worker_done
+ && (ctx->now - stream->task->started_at) > ctx->m->stream_timeout) {
/* timed out stream occupying a worker, found */
- ctx->task = task;
+ ctx->stream = stream;
return 0;
}
return 1;
}
-static h2_task *get_timed_out_busy_task(h2_mplx *m)
+static h2_stream *get_timed_out_busy_stream(h2_mplx *m)
{
- task_iter_ctx ctx;
+ stream_iter_ctx ctx;
ctx.m = m;
- ctx.task = NULL;
+ ctx.stream = NULL;
ctx.now = apr_time_now();
- h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx);
- return ctx.task;
+ h2_ihash_iter(m->streams, timed_out_busy_iter, &ctx);
+ return ctx.stream;
}
static apr_status_t unschedule_slow_tasks(h2_mplx *m)
{
- h2_task *task;
+ h2_stream *stream;
int n;
/* Try to get rid of streams that occupy workers. Look for safe requests
* that are repeatable. If none found, fail the connection.
*/
- n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->redo_tasks));
- while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
- h2_task_rst(task, H2_ERR_CANCEL);
- h2_ihash_add(m->redo_tasks, task);
+ n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->sredo));
+ while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) {
+ h2_task_rst(stream->task, H2_ERR_CANCEL);
+ h2_ihash_add(m->sredo, stream);
--n;
}
- if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) {
- task = get_timed_out_busy_task(m);
- if (task) {
+ if ((m->workers_busy - h2_ihash_count(m->sredo)) > m->workers_limit) {
+ h2_stream *stream = get_timed_out_busy_stream(m);
+ if (stream) {
/* Too many busy workers, unable to cancel enough streams
* and with a busy, timed out stream, we tell the client
* to go away... */
static int ngn_update_window(void *ctx, void *val)
{
ngn_update_ctx *uctx = ctx;
- h2_task *task = val;
- if (task && task->assigned == uctx->ngn
- && output_consumed_signal(uctx->m, task)) {
+ h2_stream *stream = val;
+ if (stream->task && stream->task->assigned == uctx->ngn
+ && output_consumed_signal(uctx->m, stream->task)) {
++uctx->streams_updated;
}
return 1;
ctx.m = m;
ctx.ngn = ngn;
ctx.streams_updated = 0;
- h2_ihash_iter(m->tasks, ngn_update_window, &ctx);
+ h2_ihash_iter(m->streams, ngn_update_window, &ctx);
return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
}
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+
ngn_out_update_windows(m, ngn);
h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
- if (status != APR_SUCCESS && h2_task_can_redo(task)
- && !h2_ihash_get(m->redo_tasks, task->stream_id)) {
- h2_ihash_add(m->redo_tasks, task);
+
+ if (status != APR_SUCCESS && stream
+ && h2_task_can_redo(task)
+ && !h2_ihash_get(m->sredo, stream->id)) {
+ h2_ihash_add(m->sredo, stream);
}
if (task->engine) {
/* cannot report that as done until engine returns */
}
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): dispatch events", m->id);
apr_atomic_set32(&m->event_pending, 0);
/* update input windows for streams */
if (h2_ihash_empty(m->streams)) {
waiting = 0;
}
- if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) {
+ if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->workers_busy) {
waiting = 0;
}
leave_mutex(m, acquired);
unsigned int need_registration : 1;
struct h2_ihash_t *streams; /* all streams currently processing */
+ struct h2_ihash_t *sredo; /* all streams that need to be re-started */
struct h2_ihash_t *shold; /* all streams done with task ongoing */
struct h2_ihash_t *spurge; /* all streams done, ready for destroy */
-
+
struct h2_iqueue *q; /* all stream ids that need to be started */
struct h2_iqueue *readyq; /* all stream ids ready for output */
- struct h2_ihash_t *tasks; /* all tasks started and not destroyed */
struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */
int max_streams; /* max # of concurrent streams */
* @param m the mplx to be released and destroyed
* @param wait condition var to wait on for ref counter == 0
*/
-apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait);
-
-/**
- * Aborts the multiplexer. It will answer all future invocation with
- * APR_ECONNABORTED, leading to early termination of ongoing streams.
- */
-void h2_mplx_abort(h2_mplx *mplx);
+void h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait);
struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
struct h2_stream *h2_mplx_stream_get(h2_mplx *m, int id);
/**
- * Notifies mplx that a stream has finished processing.
+ * Notifies mplx that a stream has been completely handled on the main
+ * connection and is ready for cleanup.
*
* @param m the mplx itself
- * @param stream the id of the stream being done
- * @param rst_error if != 0, the stream was reset with the error given
- *
+ * @param stream the stream ready for cleanup
*/
-apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream);
+apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
/**
* Waits on output data from any stream in this session to become available.
const char *p_server_uri;
int standalone;
- h2_stream_state_t state;
+ h2_proxy_stream_state_t state;
unsigned int suspended : 1;
unsigned int waiting_on_100 : 1;
unsigned int waiting_on_ping : 1;
struct h2_proxy_iqueue;
struct h2_proxy_ihash_t;
+typedef enum {
+ H2_STREAM_ST_IDLE,
+ H2_STREAM_ST_OPEN,
+ H2_STREAM_ST_RESV_LOCAL,
+ H2_STREAM_ST_RESV_REMOTE,
+ H2_STREAM_ST_CLOSED_INPUT,
+ H2_STREAM_ST_CLOSED_OUTPUT,
+ H2_STREAM_ST_CLOSED,
+} h2_proxy_stream_state_t;
+
typedef enum {
H2_PROXYS_ST_INIT, /* send initial SETTINGS, etc. */
H2_PROXYS_ST_DONE, /* finished, connection close */
static apr_status_t dispatch_master(h2_session *session);
static apr_status_t h2_session_read(h2_session *session, int block);
+static void transit(h2_session *session, const char *action,
+ h2_session_state nstate);
+
+static void on_stream_state_enter(void *ctx, h2_stream *stream);
+static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev);
static int h2_session_status_from_apr_status(apr_status_t rv)
{
static void dispatch_event(h2_session *session, h2_session_event_t ev,
int err, const char *msg);
-apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
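+/* Reset any stream that has not been processed: it is not closed yet and
+ * its id lies beyond what the respective side still accepts after GOAWAY. */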
+static int rst_unprocessed_stream(h2_stream *stream, void *ctx)
{
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): EOS bucket cleanup -> done",
- session->id, stream->id);
- h2_mplx_stream_done(session->mplx, stream);
-
- dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
- return APR_SUCCESS;
-}
-
-typedef struct stream_sel_ctx {
- h2_session *session;
- h2_stream *candidate;
-} stream_sel_ctx;
-
-static int find_unprocessed_stream(h2_stream *stream, void *ictx)
-{
- stream_sel_ctx *ctx = ictx;
- if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
- if (!ctx->session->local.accepting
- && stream->id > ctx->session->local.accepted_max) {
- ctx->candidate = stream;
- return 0;
- }
- }
- else {
- if (!ctx->session->remote.accepting
- && stream->id > ctx->session->remote.accepted_max) {
- ctx->candidate = stream;
- return 0;
- }
+ int unprocessed = (!h2_stream_was_closed(stream)
+ && (H2_STREAM_CLIENT_INITIATED(stream->id)?
+ (!stream->session->local.accepting
+ && stream->id > stream->session->local.accepted_max)
+ :
+ (!stream->session->remote.accepting
+ && stream->id > stream->session->remote.accepted_max))
+ );
+ if (unprocessed) {
+ h2_stream_rst(stream, H2_ERR_NO_ERROR);
+ return 0;
}
return 1;
}
static void cleanup_unprocessed_streams(h2_session *session)
{
- stream_sel_ctx ctx;
- ctx.session = session;
- while (1) {
- ctx.candidate = NULL;
- h2_mplx_stream_do(session->mplx, find_unprocessed_stream, &ctx);
- if (!ctx.candidate) {
- break;
- }
- h2_session_stream_done(session, ctx.candidate);
- }
+ h2_mplx_stream_do(session->mplx, rst_unprocessed_stream, session);
}
-h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
- int initiated_on, const h2_request *req)
+static h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
+ int initiated_on)
{
h2_stream * stream;
apr_pool_t *stream_pool;
apr_pool_create(&stream_pool, session->pool);
apr_pool_tag(stream_pool, "h2_stream");
- stream = h2_stream_open(stream_id, stream_pool, session,
- initiated_on);
- nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
-
- if (req) {
- h2_stream_set_request(stream, req);
- }
-
- if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
- if (stream_id > session->remote.emitted_max) {
- ++session->remote.emitted_count;
- session->remote.emitted_max = stream->id;
- session->local.accepted_max = stream->id;
- }
- }
- else {
- if (stream_id > session->local.emitted_max) {
- ++session->local.emitted_count;
- session->remote.emitted_max = stream->id;
- }
+ stream = h2_stream_create(stream_id, stream_pool, session,
+ session->monitor, initiated_on);
+ if (stream) {
+ nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
}
- dispatch_event(session, H2_SESSION_EV_STREAM_OPEN, 0, NULL);
-
return stream;
}
return spri_cmp(sid1, s1, sid2, s2, session);
}
-static apr_status_t stream_schedule(h2_session *session,
- h2_stream *stream, int eos)
-{
- (void)session;
- return h2_stream_schedule(stream, eos, h2_session_push_enabled(session),
- stream_pri_cmp, session);
-}
-
/*
* Callback when nghttp2 wants to send bytes back to the client.
*/
{
h2_session *session = (h2_session *)userp;
apr_status_t status;
-
(void)ngh2;
(void)flags;
+
status = h2_conn_io_write(&session->io, (const char *)data, length);
if (status == APR_SUCCESS) {
return length;
const uint8_t *data, size_t len, void *userp)
{
h2_session *session = (h2_session *)userp;
- apr_status_t status = APR_SUCCESS;
+ apr_status_t status = APR_EINVAL;
h2_stream * stream;
- int rv;
+ int rv = 0;
- (void)flags;
stream = get_stream(session, stream_id);
- if (!stream) {
+ if (stream) {
+ status = h2_stream_recv_DATA(stream, flags, data, len);
+ }
+ else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
"h2_stream(%ld-%d): on_data_chunk for unknown stream",
session->id, (int)stream_id);
- rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
- NGHTTP2_INTERNAL_ERROR);
- if (nghttp2_is_fatal(rv)) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
- return 0;
- }
-
- /* FIXME: enabling setting EOS this way seems to break input handling
- * in mod_proxy_http2. why? */
- status = h2_stream_write_data(stream, (const char *)data, len,
- 0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
- "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
- session->id, stream_id, (long)len);
- if (status != APR_SUCCESS) {
- update_window(session, stream_id, len);
- rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
- H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
- if (nghttp2_is_fatal(rv)) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
+ rv = NGHTTP2_ERR_CALLBACK_FAILURE;
}
- return 0;
-}
-
-static apr_status_t stream_closed(h2_session *session,
- h2_stream *stream,
- uint32_t error_code)
-{
- conn_rec *c = session->c;
- apr_bucket *b;
- apr_status_t status;
- if (!error_code) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- "h2_stream(%ld-%d): handled, closing",
- session->id, (int)stream->id);
- if (H2_STREAM_CLIENT_INITIATED(stream->id)
- && stream->id > session->local.completed_max) {
- session->local.completed_max = stream->id;
- }
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
- "h2_stream(%ld-%d): closing with err=%d %s",
- session->id, (int)stream->id, (int)error_code,
- h2_h2_err_description(error_code));
- h2_stream_rst(stream, error_code);
+ if (status != APR_SUCCESS) {
+        /* explicitly count this as consumed, since no one will read it */
+ nghttp2_session_consume(session->ngh2, stream_id, len);
}
- /* The stream might have data in the buffers of the main connection.
- * We can only free the allocated resources once all had been written.
- * Send a special buckets on the connection that gets destroyed when
- * all preceding data has been handled. On its destruction, it is safe
- * to purge all resources of the stream. */
- b = h2_bucket_eos_create(c->bucket_alloc, stream);
- APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
- status = h2_conn_io_pass(&session->io, session->bbtmp);
- apr_brigade_cleanup(session->bbtmp);
- return status;
+ return rv;
}
static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
(void)ngh2;
stream = get_stream(session, stream_id);
if (stream) {
- stream_closed(session, stream, error_code);
+ if (error_code) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03065)
+ H2_STREAM_MSG(stream, "closing with err=%d %s"),
+ (int)error_code, h2_h2_err_description(error_code));
+ h2_stream_rst(stream, error_code);
+ }
}
return 0;
}
/* nop */
}
else {
- s = h2_session_open_stream(userp, frame->hd.stream_id, 0, NULL);
+ s = h2_session_open_stream(userp, frame->hd.stream_id, 0);
}
return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
}
status = h2_stream_add_header(stream, (const char *)name, namelen,
(const char *)value, valuelen);
- if (status == APR_ECONNRESET) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
- "h2-stream(%ld-%d): on_header, reset stream",
- session->id, stream->id);
- nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream->id,
- NGHTTP2_INTERNAL_ERROR);
- }
- else if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) {
+ if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
return 0;
void *userp)
{
h2_session *session = (h2_session *)userp;
- apr_status_t status = APR_SUCCESS;
h2_stream *stream;
if (APLOGcdebug(session->c)) {
* trailers */
stream = get_stream(session, frame->hd.stream_id);
if (stream) {
- int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
-
- if (h2_stream_is_scheduled(stream)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- "h2_stream(%ld-%d): TRAILER, eos=%d",
- session->id, frame->hd.stream_id, eos);
- if (eos) {
- status = h2_stream_close_input(stream);
- }
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- "h2_stream(%ld-%d): HEADER, eos=%d",
- session->id, frame->hd.stream_id, eos);
- status = stream_schedule(session, stream, eos);
- }
- }
- else {
- status = APR_EINVAL;
+ h2_stream_recv_frame(stream, NGHTTP2_HEADERS, frame->hd.flags);
}
break;
case NGHTTP2_DATA:
stream = get_stream(session, frame->hd.stream_id);
if (stream) {
- int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- "h2_stream(%ld-%d): DATA, len=%ld, eos=%d",
- session->id, frame->hd.stream_id,
- (long)frame->hd.length, eos);
- if (eos) {
- status = h2_stream_close_input(stream);
- }
- }
- else {
- status = APR_EINVAL;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(02923)
+ H2_STREAM_MSG(stream, "DATA, len=%ld, flags=%d"),
+ (long)frame->hd.length, frame->hd.flags);
+ h2_stream_recv_frame(stream, NGHTTP2_DATA, frame->hd.flags);
}
break;
case NGHTTP2_PRIORITY:
}
break;
}
-
- if (status != APR_SUCCESS) {
- int rv;
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
- APLOGNO(02923)
- "h2_session: stream(%ld-%d): error handling frame",
- session->id, (int)frame->hd.stream_id);
- rv = nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
- frame->hd.stream_id,
- NGHTTP2_INTERNAL_ERROR);
- if (nghttp2_is_fatal(rv)) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
- }
-
return 0;
}
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
- "h2_stream(%ld-%d): writing frame header",
- session->id, (int)stream_id);
+ H2_STREAM_MSG(stream, "writing frame header"));
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
- "h2_stream(%ld-%d): send_data_cb, reading stream",
- session->id, (int)stream_id);
+ H2_STREAM_MSG(stream, "send_data_cb, reading stream"));
apr_brigade_cleanup(session->bbtmp);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
else if (len != length) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
- "h2_stream(%ld-%d): send_data_cb, wanted %ld bytes, "
- "got %ld from stream",
- session->id, (int)stream_id, (long)length, (long)len);
+ H2_STREAM_MSG(stream, "send_data_cb, wanted %ld bytes, "
+ "got %ld from stream"), (long)length, (long)len);
apr_brigade_cleanup(session->bbtmp);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return 0;
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
- APLOGNO(02925)
- "h2_stream(%ld-%d): failed send_data_cb",
- session->id, (int)stream_id);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(02925)
+ H2_STREAM_MSG(stream, "failed send_data_cb"));
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
}
void *user_data)
{
h2_session *session = user_data;
+ h2_stream *stream;
+ int stream_id = frame->hd.stream_id;
+
+ ++session->frames_sent;
+ switch (frame->hd.type) {
+ case NGHTTP2_PUSH_PROMISE:
+        /* for PUSH_PROMISE, report on the promised stream */
+ stream_id = frame->push_promise.promised_stream_id;
+ break;
+ default:
+ break;
+ }
+
if (APLOGcdebug(session->c)) {
char buffer[256];
session->id, buffer, (long)session->frames_received,
(long)session->frames_sent);
}
- ++session->frames_sent;
+
+ stream = get_stream(session, stream_id);
+ if (stream) {
+ h2_stream_send_frame(stream, frame->hd.type, frame->hd.flags);
+ }
return 0;
}
uint8_t flags, void *user_data)
{
h2_session *session = user_data;
+ h2_stream *stream;
+
if (APLOGcdebug(session->c)) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03456)
"h2_session(%ld-%d): denying stream with invalid header "
apr_pstrndup(session->pool, (const char *)name, namelen),
apr_pstrndup(session->pool, (const char *)value, valuelen));
}
- return nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
- frame->hd.stream_id,
- NGHTTP2_PROTOCOL_ERROR);
+ stream = get_stream(session, frame->hd.stream_id);
+ if (stream) {
+ h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
+ }
+ return 0;
}
#endif
"session(%ld): sent GOAWAY, err=%d, msg=%s",
session->id, error, msg? msg : "");
dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, error, msg);
-
- if (force_close) {
- apr_brigade_cleanup(session->bbtmp);
- h2_mplx_abort(session->mplx);
- }
-
return status;
}
session->id);
}
- if (session->mplx) {
- h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
- h2_mplx_release_and_join(session->mplx, session->iowait);
- session->mplx = NULL;
- }
- if (session->ngh2) {
- nghttp2_session_del(session->ngh2);
- session->ngh2 = NULL;
- }
+ transit(session, "pool cleanup", H2_SESSION_ST_CLEANUP);
+ h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+ h2_mplx_release_and_join(session->mplx, session->iowait);
+ session->mplx = NULL;
+
+ ap_assert(session->ngh2);
+ nghttp2_session_del(session->ngh2);
+ session->ngh2 = NULL;
+
return APR_SUCCESS;
}
/* get h2_session a lifetime beyond its pool and everything
* connected to it. */
- session = apr_pcalloc(c->pool, sizeof(h2_session));
+ session = apr_pcalloc(pool, sizeof(h2_session));
if (session) {
int rv;
nghttp2_mem *mem;
return NULL;
}
+ session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor));
+ if (session->monitor == NULL) {
+ return NULL;
+ }
+ session->monitor->ctx = session;
+ session->monitor->on_state_enter = on_stream_state_enter;
+ session->monitor->on_state_event = on_stream_state_event;
+
session->mplx = h2_mplx_create(c, session->pool, session->config,
session->s->timeout, workers);
}
/* Now we need to auto-open stream 1 for the request we got. */
- stream = h2_session_open_stream(session, 1, 0, NULL);
+ stream = h2_session_open_stream(session, 1, 0);
if (!stream) {
status = APR_EGENERAL;
ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
return status;
}
- status = h2_stream_set_request_rec(stream, session->r);
- if (status != APR_SUCCESS) {
- return status;
- }
- status = stream_schedule(session, stream, 1);
+ status = h2_stream_set_request_rec(stream, session->r, 1);
if (status != APR_SUCCESS) {
return status;
}
status = h2_stream_out_prepare(stream, &nread, &eos, NULL);
if (nread) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ H2_STREAM_MSG(stream, "prepared no_copy, len=%ld, eos=%d"),
+ (long)nread, eos);
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
}
break;
case APR_ECONNRESET:
- return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
- stream->id, stream->rst_error);
+ return 0;
case APR_EAGAIN:
/* If there is no data available, our session will automatically
*/
nread = 0;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
- "h2_stream(%ld-%d): suspending",
- session->id, (int)stream_id);
+ H2_STREAM_MSG(stream, "suspending"));
return NGHTTP2_ERR_DEFERRED;
default:
nread = 0;
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
- APLOGNO(02938) "h2_stream(%ld-%d): reading data",
- session->id, (int)stream_id);
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c, APLOGNO(02938)
+ H2_STREAM_MSG(stream, "reading data"));
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
h2_push *push)
{
- apr_status_t status;
h2_stream *stream;
h2_ngheader *ngh;
int nid;
ngh->nv, ngh->nvlen, NULL);
if (nid <= 0) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03075)
- "h2_stream(%ld-%d): submitting push promise fail: %s",
- session->id, is->id, nghttp2_strerror(nid));
+ H2_STREAM_MSG(is, "submitting push promise fail: %s"),
+ nghttp2_strerror(nid));
return NULL;
}
++session->pushes_promised;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03076)
- "h2_stream(%ld-%d): SERVER_PUSH %d for %s %s on %d",
- session->id, is->id, nid,
- push->req->method, push->req->path, is->id);
+ H2_STREAM_MSG(is, "SERVER_PUSH %d for %s %s on %d"),
+ nid, push->req->method, push->req->path, is->id);
- stream = h2_session_open_stream(session, nid, is->id, push->req);
- if (stream) {
- h2_session_set_prio(session, stream, push->priority);
- status = stream_schedule(session, stream, 1);
- if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
- "h2_stream(%ld-%d): scheduling push stream",
- session->id, stream->id);
- stream = NULL;
- }
- ++session->unsent_promises;
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03077)
- "h2_stream(%ld-%d): failed to create stream obj %d",
- session->id, is->id, nid);
- }
-
+ stream = h2_session_open_stream(session, nid, is->id);
if (!stream) {
- /* try to tell the client that it should not wait. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03077)
+ H2_STREAM_MSG(stream, "failed to create stream obj %d"),
+ nid);
+ /* kill the push_promise */
nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid,
NGHTTP2_INTERNAL_ERROR);
+ return NULL;
}
+ h2_session_set_prio(session, stream, push->priority);
+ h2_stream_set_request(stream, push->req);
+ ++session->unsent_promises;
return stream;
}
s = nghttp2_session_find_stream(session->ngh2, stream->id);
if (!s) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- "h2_stream(%ld-%d): lookup of nghttp2_stream failed",
- session->id, stream->id);
+ H2_STREAM_MSG(stream, "lookup of nghttp2_stream failed"));
return APR_EINVAL;
}
rv = nghttp2_session_change_stream_priority(session->ngh2, stream->id, &ps);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03203)
- "h2_stream(%ld-%d): PUSH %s, weight=%d, "
- "depends=%d, returned=%d",
- session->id, stream->id, ptype,
- ps.weight, ps.stream_id, rv);
+ H2_STREAM_MSG(stream, "PUSH %s, weight=%d, depends=%d, returned=%d"),
+ ptype, ps.weight, ps.stream_id, rv);
status = (rv < 0)? APR_EGENERAL : APR_SUCCESS;
}
#else
ap_assert(session);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): on_headers", session->id, stream->id);
+ H2_STREAM_MSG(stream, "on_headers"));
if (headers->status < 100) {
- int err = H2_STREAM_RST(stream, headers->status);
- rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
- stream->id, err);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_stream(%ld-%d): unpexected header status %d, stream rst",
- session->id, stream->id, headers->status);
+ h2_stream_rst(stream, headers->status);
goto leave;
}
else if (stream->has_response) {
nh = h2_util_ngheader_make(stream->pool, headers->headers);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072)
- "h2_stream(%ld-%d): submit %d trailers",
- session->id, (int)stream->id,(int) nh->nvlen);
+ H2_STREAM_MSG(stream, "submit %d trailers"), (int)nh->nvlen);
rv = nghttp2_submit_trailer(session->ngh2, stream->id, nh->nv, nh->nvlen);
goto leave;
}
const char *note;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
- "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
- session->id, stream->id, headers->status,
+ H2_STREAM_MSG(stream, "submit response %d, REMOTE_WINDOW_SIZE=%u"),
+ headers->status,
(unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
if (!eos || len > 0) {
ap_assert(stream);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): on_resume", session->id, stream->id);
+ H2_STREAM_MSG(stream, "on_resume"));
send_headers:
headers = NULL;
status = h2_stream_out_prepare(stream, &len, &eos, &headers);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
- "h2_stream(%ld-%d): prepared len=%ld, eos=%d",
- session->id, stream->id, (long)len, eos);
+ H2_STREAM_MSG(stream, "prepared len=%ld, eos=%d"),
+ (long)len, eos);
if (headers) {
status = on_stream_headers(session, stream, headers, len, eos);
if (status != APR_SUCCESS || stream->rst_error) {
goto send_headers;
}
else if (status != APR_EAGAIN) {
+ /* we have DATA to send */
if (!stream->has_response) {
- int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+ /* but no response */
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466)
- "h2_stream(%ld-%d): no response, RST_STREAM, err=%d",
- session->id, stream->id, err);
- nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
- stream->id, err);
+ H2_STREAM_MSG(stream, "no response, RST_STREAM"));
+ h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
return APR_SUCCESS;
}
rv = nghttp2_session_resume_data(session->ngh2, stream->id);
session->have_written = 1;
ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
- APLOG_ERR : APLOG_DEBUG, 0, session->c,
- APLOGNO(02936)
- "h2_stream(%ld-%d): resuming %s",
- session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+ APLOG_ERR : APLOG_DEBUG, 0, session->c, APLOGNO(02936)
+ H2_STREAM_MSG(stream, "resumed"));
}
return status;
}
"IDLE", /* H2_SESSION_ST_IDLE */
"BUSY", /* H2_SESSION_ST_BUSY */
"WAIT", /* H2_SESSION_ST_WAIT */
+ "CLEANUP", /* H2_SESSION_ST_CLEANUP */
};
static const char *state_name(h2_session_state state)
}
}
-static void h2_session_ev_stream_ready(h2_session *session, int arg, const char *msg)
-{
- switch (session->state) {
- case H2_SESSION_ST_WAIT:
- transit(session, "stream ready", H2_SESSION_ST_BUSY);
- break;
- default:
- /* nop */
- break;
- }
-}
-
static void h2_session_ev_data_read(h2_session *session, int arg, const char *msg)
{
switch (session->state) {
h2_session_shutdown(session, arg, msg, 1);
}
-static void h2_session_ev_stream_open(h2_session *session, int arg, const char *msg)
+static void ev_stream_open(h2_session *session, h2_stream *stream)
{
- ++session->open_streams;
switch (session->state) {
case H2_SESSION_ST_IDLE:
if (session->open_streams == 1) {
default:
break;
}
+
+ ap_assert(!stream->scheduled);
+ if (stream->request) {
+ const h2_request *r = stream->request;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ H2_STREAM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
+ r->method, r->scheme, r->authority, r->path, r->chunked);
+ stream->scheduled = 1;
+ h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
+ }
+ else {
+ h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+ }
}
-static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
+static void ev_stream_closed(h2_session *session, h2_stream *stream)
{
- --session->open_streams;
+ apr_bucket *b;
+
+ if (H2_STREAM_CLIENT_INITIATED(stream->id)
+ && (stream->id > session->local.completed_max)) {
+ session->local.completed_max = stream->id;
+ }
switch (session->state) {
case H2_SESSION_ST_IDLE:
if (session->open_streams == 0) {
default:
break;
}
+
+ /* The stream might still have data in the buffers of the main connection.
+ * We can only free the allocated resources once all of it has been written.
+ * Send a special bucket on the connection that gets destroyed when
+ * all preceding data has been handled. On its destruction, it is safe
+ * to purge all resources of the stream. */
+ b = h2_bucket_eos_create(session->c->bucket_alloc, stream);
+ APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+ h2_conn_io_pass(&session->io, session->bbtmp);
+ apr_brigade_cleanup(session->bbtmp);
+}
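
The comment above describes the deferred cleanup: stream data may still sit in the main connection's output, so an EOS bucket rides at the tail of the brigade and the stream is only purged once that bucket is destroyed. A minimal sketch of the pattern behind h2_bucket_eos_create(); the names eos_bucket_destroy and stream_cleanup are illustrative, not the module's actual implementation:

    static void eos_bucket_destroy(void *data)
    {
        h2_stream *stream = data;
        /* Everything queued ahead of this bucket has been written to the
         * client connection; only now may the stream's pool, beams and
         * buffers be released. */
        stream_cleanup(stream);
    }
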
+
+static void on_stream_state_enter(void *ctx, h2_stream *stream)
+{
+ h2_session *session = ctx;
+ /* stream entered a new state */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ H2_STREAM_MSG(stream, "entered state"));
+ switch (stream->state) {
+ case H2_SS_IDLE: /* stream was created */
+ ++session->open_streams;
+ if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
+ ++session->remote.emitted_count;
+ if (stream->id > session->remote.emitted_max) {
+ session->remote.emitted_max = stream->id;
+ session->local.accepted_max = stream->id;
+ }
+ }
+ else {
+ if (stream->id > session->local.emitted_max) {
+ ++session->local.emitted_count;
+ session->remote.emitted_max = stream->id;
+ }
+ }
+ break;
+ case H2_SS_OPEN: /* stream has request headers */
+ case H2_SS_RSVD_L: /* stream has request headers */
+ ev_stream_open(session, stream);
+ break;
+ case H2_SS_CLOSED_L: /* stream output was closed */
+ break;
+ case H2_SS_CLOSED_R: /* stream input was closed */
+ break;
+ case H2_SS_CLOSED: /* stream in+out were closed */
+ --session->open_streams;
+ ev_stream_closed(session, stream);
+ break;
+ case H2_SS_CLEANUP:
+ h2_mplx_stream_cleanup(session->mplx, stream);
+ break;
+ default:
+ break;
+ }
+}
+
+static void on_stream_state_event(void *ctx, h2_stream *stream,
+ h2_stream_event_t ev)
+{
+ h2_session *session = ctx;
+ switch (ev) {
+ case H2_SEV_CANCELLED:
+ if (session->state != H2_SESSION_ST_DONE) {
+ nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+ stream->id, stream->rst_error);
+ }
+ break;
+ default:
+ /* NOP */
+ break;
+ }
}
static void dispatch_event(h2_session *session, h2_session_event_t ev,
case H2_SESSION_EV_NO_IO:
h2_session_ev_no_io(session, arg, msg);
break;
- case H2_SESSION_EV_STREAM_READY:
- h2_session_ev_stream_ready(session, arg, msg);
- break;
case H2_SESSION_EV_DATA_READ:
h2_session_ev_data_read(session, arg, msg);
break;
case H2_SESSION_EV_PRE_CLOSE:
h2_session_ev_pre_close(session, arg, msg);
break;
- case H2_SESSION_EV_STREAM_OPEN:
- h2_session_ev_stream_open(session, arg, msg);
- break;
- case H2_SESSION_EV_STREAM_DONE:
- h2_session_ev_stream_done(session, arg, msg);
- break;
default:
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld): unknown event %d",
session->id, ev);
break;
}
-
- if (session->state == H2_SESSION_ST_DONE) {
- apr_brigade_cleanup(session->bbtmp);
- h2_mplx_abort(session->mplx);
- }
}
/* trigger window updates, stream resumes and submits */
struct h2_push_diary;
struct h2_session;
struct h2_stream;
+struct h2_stream_monitor;
struct h2_task;
struct h2_workers;
H2_SESSION_EV_PROTO_ERROR, /* protocol error */
H2_SESSION_EV_CONN_TIMEOUT, /* connection timeout */
H2_SESSION_EV_NO_IO, /* nothing has been read or written */
- H2_SESSION_EV_STREAM_READY, /* stream signalled availability of headers/data */
H2_SESSION_EV_DATA_READ, /* connection data has been read */
H2_SESSION_EV_NGH2_DONE, /* nghttp2 wants neither read nor write anything */
H2_SESSION_EV_MPM_STOPPING, /* the process is stopping */
H2_SESSION_EV_PRE_CLOSE, /* connection will close after this */
- H2_SESSION_EV_STREAM_OPEN, /* stream has been opened */
- H2_SESSION_EV_STREAM_DONE, /* stream has been handled completely */
} h2_session_event_t;
typedef struct h2_session {
struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
+ struct h2_stream_monitor *monitor;/* monitor callbacks for streams */
int open_streams; /* number of client streams open */
int unsent_submits; /* number of submitted, but not yet written responses. */
int unsent_promises; /* number of submitted, but not yet written push promises */
*/
void h2_session_close(h2_session *session);
-/**
- * Create and register a new stream under the given id.
- *
- * @param session the session to register in
- * @param stream_id the new stream identifier
- * @param initiated_on the stream id this one is initiated on or 0
- * @param req the request for this stream or NULL if not known yet
- * @return the new stream
- */
-struct h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
- int initiated_on,
- const h2_request *req);
-
-
/**
* Returns if client settings have push enabled.
* @param != 0 iff push is enabled in client settings
*/
int h2_session_push_enabled(h2_session *session);
-/**
- * Destroy the stream and release it everywhere. Reclaim all resources.
- * @param session the session to which the stream belongs
- * @param stream the stream to destroy
- */
-apr_status_t h2_session_stream_done(h2_session *session,
- struct h2_stream *stream);
-
/**
* Submit a push promise on the stream and schedule the new stream for
* processing.
struct h2_stream *stream,
const struct h2_priority *prio);
-
#endif /* defined(__mod_h2__h2_session__) */
#include "h2_util.h"
-static int state_transition[][7] = {
- /* ID OP RL RR CI CO CL */
-/*ID*/{ 1, 0, 0, 0, 0, 0, 0 },
-/*OP*/{ 1, 1, 0, 0, 0, 0, 0 },
-/*RL*/{ 0, 0, 1, 0, 0, 0, 0 },
-/*RR*/{ 0, 0, 0, 1, 0, 0, 0 },
-/*CI*/{ 1, 1, 0, 0, 1, 0, 0 },
-/*CO*/{ 1, 1, 0, 0, 0, 1, 0 },
-/*CL*/{ 1, 1, 0, 0, 1, 1, 1 },
+#define S_XXX (-2)
+#define S_ERR (-1)
+#define S_NOP (0)
+#define S_IDL (H2_SS_IDLE + 1)
+#define S_RS_L (H2_SS_RSVD_L + 1)
+#define S_RS_R (H2_SS_RSVD_R + 1)
+#define S_OPEN (H2_SS_OPEN + 1)
+#define S_CL_L (H2_SS_CLOSED_L + 1)
+#define S_CL_R (H2_SS_CLOSED_R + 1)
+#define S_CLS (H2_SS_CLOSED + 1)
+#define S_CLN (H2_SS_CLEANUP + 1)
+
+static const char *h2_ss_str(h2_stream_state_t state)
+{
+ switch (state) {
+ case H2_SS_IDLE:
+ return "IDLE";
+ case H2_SS_RSVD_L:
+ return "RESERVED_LOCAL";
+ case H2_SS_RSVD_R:
+ return "RESERVED_REMOTE";
+ case H2_SS_OPEN:
+ return "OPEN";
+ case H2_SS_CLOSED_L:
+ return "HALF_CLOSED_LOCAL";
+ case H2_SS_CLOSED_R:
+ return "HALF_CLOSED_REMOTE";
+ case H2_SS_CLOSED:
+ return "CLOSED";
+ case H2_SS_CLEANUP:
+ return "CLEANUP";
+ default:
+ return "UNKNOWN";
+ }
+}
+
+const char *h2_stream_state_str(h2_stream *stream)
+{
+ return h2_ss_str(stream->state);
+}
+
+static int trans_on_send[][H2_SS_MAX] = {
+/* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
+/* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },
+/* HEADERS, */ { S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },
+/* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
+/* RST_STREAM, */ { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
+/* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
+/* PUSH_PROMISE, */ { S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
+/* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
+/* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
+/* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
+/* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
};
+static int trans_on_recv[][H2_SS_MAX] = {
+/* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
+/* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },
+/* HEADERS, */ { S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },
+/* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
+/* RST_STREAM, */ { S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
+/* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
+/* PUSH_PROMISE, */ { S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
+/* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
+/* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
+/* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
+/* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
+};
+static int trans_on_event[][H2_SS_MAX] = {
+/* H2_SEV_CLOSED_L*/{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },
+/* H2_SEV_CLOSED_R*/{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },
+/* H2_SEV_CANCELLED*/{S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
+/* H2_SEV_EOS_SENT*/{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },
+};
+
+static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
+{
+ int op = map[state];
+ switch (op) {
+ case S_XXX:
+ case S_ERR:
+ return op;
+ case S_NOP:
+ return state;
+ default:
+ return op-1;
+ }
+}
+
+static int on_frame(h2_stream_state_t state, int frame_type,
+ int frame_map[][H2_SS_MAX], apr_size_t maxlen)
+{
+ ap_assert(frame_type >= 0);
+ ap_assert(state >= 0);
+ if (frame_type >= maxlen) {
+ return state; /* NOP */
+ }
+ return on_map(state, frame_map[frame_type]);
+}
+
+static int on_frame_send(h2_stream_state_t state, int frame_type)
+{
+ return on_frame(state, frame_type, trans_on_send, H2_ALEN(trans_on_send));
+}
+
+static int on_frame_recv(h2_stream_state_t state, int frame_type)
+{
+ return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv));
+}
+
+static int on_event(h2_stream_state_t state, h2_stream_event_t ev)
+{
+ return on_map(state, trans_on_event[ev]);
+}
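
A worked example of how the tables drive a stream, assuming only the definitions above: a client request that arrives as a single HEADERS frame with END_STREAM set. h2_stream_recv_frame() below performs these same two lookups and calls transit() on each result.

    int s = on_frame_recv(H2_SS_IDLE, NGHTTP2_HEADERS); /* -> H2_SS_OPEN */
    s = on_event(s, H2_SEV_CLOSED_R);                   /* -> H2_SS_CLOSED_R */
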
static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
{
}
}
-static int set_state(h2_stream *stream, h2_stream_state_t state)
+static apr_status_t close_input(h2_stream *stream)
{
- int allowed = state_transition[state][stream->state];
- if (allowed) {
- stream->state = state;
- return 1;
+ conn_rec *c = stream->session->c;
+ apr_status_t status;
+ apr_bucket_brigade *tmp;
+ apr_bucket *b;
+
+ if (h2_beam_is_closed(stream->input)) {
+ return APR_SUCCESS;
}
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
- "h2_stream(%ld-%d): invalid state transition from %d to %d",
- stream->session->id, stream->id, stream->state, state);
- return 0;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ H2_STREAM_MSG(stream, "closing input"));
+ if (stream->rst_error) {
+ return APR_ECONNRESET;
+ }
+
+ tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
+ if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
+ h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers,
+ NULL, stream->pool);
+ b = h2_bucket_headers_create(c->bucket_alloc, r);
+ APR_BRIGADE_INSERT_TAIL(tmp, b);
+ stream->trailers = NULL;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ H2_STREAM_MSG(stream, "added trailers"));
+ }
+
+ b = apr_bucket_eos_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(tmp, b);
+ status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
+ apr_brigade_destroy(tmp);
+ h2_beam_close(stream->input);
+ return status;
+}
+
+static apr_status_t close_output(h2_stream *stream)
+{
+ if (h2_beam_is_closed(stream->output)) {
+ return APR_SUCCESS;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ H2_STREAM_MSG(stream, "closing output"));
+ return h2_beam_leave(stream->output);
}
-static int close_input(h2_stream *stream)
+static void on_state_enter(h2_stream *stream)
{
+ if (stream->monitor && stream->monitor->on_state_enter) {
+ stream->monitor->on_state_enter(stream->monitor->ctx, stream);
+ }
+}
+
+static void on_state_event(h2_stream *stream, h2_stream_event_t ev)
+{
+ if (stream->monitor && stream->monitor->on_state_event) {
+ stream->monitor->on_state_event(stream->monitor->ctx, stream, ev);
+ }
+}
+
+static void on_state_invalid(h2_stream *stream)
+{
+ if (stream->monitor && stream->monitor->on_state_invalid) {
+ stream->monitor->on_state_invalid(stream->monitor->ctx, stream);
+ }
+ /* stream got an event/frame invalid in its state */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ H2_STREAM_MSG(stream, "invalid state event"));
switch (stream->state) {
- case H2_STREAM_ST_CLOSED_INPUT:
- case H2_STREAM_ST_CLOSED:
- return 0; /* ignore, idempotent */
- case H2_STREAM_ST_CLOSED_OUTPUT:
- /* both closed now */
- set_state(stream, H2_STREAM_ST_CLOSED);
+ case H2_SS_OPEN:
+ case H2_SS_RSVD_L:
+ case H2_SS_RSVD_R:
+ case H2_SS_CLOSED_L:
+ case H2_SS_CLOSED_R:
+ h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
break;
default:
- /* everything else we jump to here */
- set_state(stream, H2_STREAM_ST_CLOSED_INPUT);
break;
}
- return 1;
}
-static int input_closed(h2_stream *stream)
+static apr_status_t transit(h2_stream *stream, int new_state)
{
- switch (stream->state) {
- case H2_STREAM_ST_OPEN:
- case H2_STREAM_ST_CLOSED_OUTPUT:
- return 0;
- default:
- return 1;
+ if (new_state == stream->state) {
+ return APR_SUCCESS;
}
+ else if (new_state < 0) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
+ H2_STREAM_MSG(stream, "invalid transition"));
+ on_state_invalid(stream);
+ return APR_EINVAL;
+ }
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ H2_STREAM_MSG(stream, "transit to [%s]"), h2_ss_str(new_state));
+ stream->state = new_state;
+ switch (new_state) {
+ case H2_SS_IDLE:
+ break;
+ case H2_SS_RSVD_L:
+ close_input(stream);
+ break;
+ case H2_SS_RSVD_R:
+ break;
+ case H2_SS_OPEN:
+ break;
+ case H2_SS_CLOSED_L:
+ close_output(stream);
+ break;
+ case H2_SS_CLOSED_R:
+ close_input(stream);
+ break;
+ case H2_SS_CLOSED:
+ close_input(stream);
+ close_output(stream);
+ if (stream->out_buffer) {
+ apr_brigade_cleanup(stream->out_buffer);
+ }
+ break;
+ case H2_SS_CLEANUP:
+ break;
+ }
+ on_state_enter(stream);
+ return APR_SUCCESS;
}
-static int close_output(h2_stream *stream)
+void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor)
{
- switch (stream->state) {
- case H2_STREAM_ST_CLOSED_OUTPUT:
- case H2_STREAM_ST_CLOSED:
- return 0; /* ignore, idempotent */
- case H2_STREAM_ST_CLOSED_INPUT:
- /* both closed now */
- set_state(stream, H2_STREAM_ST_CLOSED);
+ stream->monitor = monitor;
+}
+
+void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
+{
+ int new_state;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ H2_STREAM_MSG(stream, "dispatch event %d"), ev);
+ new_state = on_event(stream->state, ev);
+ if (new_state < 0) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
+ H2_STREAM_MSG(stream, "invalid event %d"), ev);
+ on_state_invalid(stream);
+ AP_DEBUG_ASSERT(new_state > S_XXX);
+ return;
+ }
+ else if (new_state == stream->state) {
+ /* nop */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ H2_STREAM_MSG(stream, "ignored event %d"), ev);
+ return;
+ }
+ else {
+ on_state_event(stream, ev);
+ transit(stream, new_state);
+ }
+}
+
+static void set_policy_for(h2_stream *stream, h2_request *r)
+{
+ int enabled = h2_session_push_enabled(stream->session);
+ stream->push_policy = h2_push_policy_determine(r->headers, stream->pool,
+ enabled);
+ r->serialize = h2_config_geti(stream->session->config, H2_CONF_SER_HEADERS);
+}
+
+apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags)
+{
+ apr_status_t status = APR_SUCCESS;
+ int new_state, eos = 0;
+
+ new_state = on_frame_send(stream->state, ftype);
+ if (new_state < 0) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ H2_STREAM_MSG(stream, "invalid frame %d send"), ftype);
+ AP_DEBUG_ASSERT(new_state > S_XXX);
+ return transit(stream, new_state);
+ }
+
+ switch (ftype) {
+ case NGHTTP2_DATA:
+ eos = (flags & NGHTTP2_FLAG_END_STREAM);
+ break;
+
+ case NGHTTP2_HEADERS:
+ eos = (flags & NGHTTP2_FLAG_END_STREAM);
+ break;
+
+ case NGHTTP2_PUSH_PROMISE:
+ /* start pushed stream */
+ ap_assert(stream->request == NULL);
+ ap_assert(stream->rtmp != NULL);
+ status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+ set_policy_for(stream, stream->rtmp);
+ stream->request = stream->rtmp;
+ stream->rtmp = NULL;
break;
+
default:
- /* everything else we jump to here */
- set_state(stream, H2_STREAM_ST_CLOSED_OUTPUT);
break;
}
- return 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ H2_STREAM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
+ status = transit(stream, new_state);
+ if (status == APR_SUCCESS && eos) {
+ status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_L));
+ }
+ return status;
}
-static int input_open(const h2_stream *stream)
+apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags)
{
- switch (stream->state) {
- case H2_STREAM_ST_OPEN:
- case H2_STREAM_ST_CLOSED_OUTPUT:
- return 1;
+ apr_status_t status = APR_SUCCESS;
+ int new_state, eos = 0;
+
+ new_state = on_frame_recv(stream->state, ftype);
+ if (new_state < 0) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ H2_STREAM_MSG(stream, "invalid frame %d recv"), ftype);
+ AP_DEBUG_ASSERT(new_state > S_XXX);
+ return transit(stream, new_state);
+ }
+
+ switch (ftype) {
+ case NGHTTP2_DATA:
+ eos = (flags & NGHTTP2_FLAG_END_STREAM);
+ break;
+
+ case NGHTTP2_HEADERS:
+ eos = (flags & NGHTTP2_FLAG_END_STREAM);
+ if (stream->state == H2_SS_OPEN) {
+ /* trailer HEADER */
+ if (!eos) {
+ h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
+ }
+ }
+ else {
+ /* request HEADER */
+ ap_assert(stream->request == NULL);
+ ap_assert(stream->rtmp != NULL);
+ status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+ set_policy_for(stream, stream->rtmp);
+ stream->request = stream->rtmp;
+ stream->rtmp = NULL;
+ }
+ break;
+
default:
- return 0;
+ break;
}
+ status = transit(stream, new_state);
+ if (status == APR_SUCCESS && eos) {
+ status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_R));
+ }
+ return status;
}
-static int output_open(h2_stream *stream)
+apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
+ const uint8_t *data, size_t len)
{
- switch (stream->state) {
- case H2_STREAM_ST_OPEN:
- case H2_STREAM_ST_CLOSED_INPUT:
- return 1;
- default:
- return 0;
+ h2_session *session = stream->session;
+ apr_status_t status = APR_SUCCESS;
+ apr_bucket_brigade *tmp;
+
+ ap_assert(stream);
+ if (!stream->input) {
+ return APR_EOF;
}
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
+ H2_STREAM_MSG(stream, "recv DATA, len=%d"), (int)len);
+
+ tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+ apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
+ status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
+ apr_brigade_destroy(tmp);
+
+ stream->in_data_frames++;
+ stream->in_data_octets += len;
+ return status;
}
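
For orientation, a hedged sketch of a typical caller: an nghttp2 on_data_chunk_recv callback handing the received payload to the stream. get_stream() stands for the session-internal lookup used elsewhere in this patch; error handling is reduced to the minimum.

    static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
                                  int32_t stream_id, const uint8_t *data,
                                  size_t len, void *user_data)
    {
        h2_session *session = user_data;
        h2_stream *stream = get_stream(session, stream_id);
        (void)ngh2;
        if (!stream) {
            return NGHTTP2_ERR_CALLBACK_FAILURE;
        }
        return (h2_stream_recv_DATA(stream, flags, data, len) == APR_SUCCESS)?
                0 : NGHTTP2_ERR_CALLBACK_FAILURE;
    }
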
static void prep_output(h2_stream *stream) {
}
}
-static void prepend_response(h2_stream *stream, h2_headers *response)
-{
- conn_rec *c = stream->session->c;
- apr_bucket *b;
-
- prep_output(stream);
- b = h2_bucket_headers_create(c->bucket_alloc, response);
- APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
-}
-
-h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
- int initiated_on)
+h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
+ h2_stream_monitor *monitor, int initiated_on)
{
h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
stream->id = id;
stream->initiated_on = initiated_on;
stream->created = apr_time_now();
- stream->state = H2_STREAM_ST_IDLE;
+ stream->state = H2_SS_IDLE;
stream->pool = pool;
stream->session = session;
+ stream->monitor = monitor;
h2_beam_create(&stream->input, pool, id, "input", H2_BEAM_OWNER_SEND, 0);
h2_beam_send_from(stream->input, stream->pool);
h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0);
- set_state(stream, H2_STREAM_ST_OPEN);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
- "h2_stream(%ld-%d): opened", session->id, stream->id);
+ H2_STREAM_MSG(stream, "created"));
+ on_state_enter(stream);
return stream;
}
status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
if (status == APR_EAGAIN) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
- "h2_stream(%ld-%d): wait on input drain",
- stream->session->id, stream->id);
+ H2_STREAM_MSG(stream, "wait on input drain"));
status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
- "h2_stream(%ld-%d): input drain returned",
- stream->session->id, stream->id);
+ H2_STREAM_MSG(stream, "input drain returned"));
}
}
{
ap_assert(stream);
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
- "h2_stream(%ld-%d): destroy",
- stream->session->id, stream->id);
+ H2_STREAM_MSG(stream, "destroy"));
if (stream->pool) {
apr_pool_destroy(stream->pool);
stream->pool = NULL;
}
}
-void h2_stream_eos_destroy(h2_stream *stream)
-{
- h2_session_stream_done(stream->session, stream);
- /* stream possibly destroyed */
-}
-
apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
{
apr_pool_t *pool = stream->pool;
void h2_stream_rst(h2_stream *stream, int error_code)
{
stream->rst_error = error_code;
- close_input(stream);
- close_output(stream);
- if (stream->out_buffer) {
- apr_brigade_cleanup(stream->out_buffer);
- }
+ h2_beam_abort(stream->input);
+ h2_beam_leave(stream->output);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): reset, error=%d",
- stream->session->id, stream->id, error_code);
+ H2_STREAM_MSG(stream, "reset, error=%d"), error_code);
+ h2_stream_dispatch(stream, H2_SEV_CANCELLED);
}
-apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r)
+apr_status_t h2_stream_set_request_rec(h2_stream *stream,
+ request_rec *r, int eos)
{
h2_request *req;
apr_status_t status;
status = h2_request_rcreate(&req, stream->pool, r);
if (status == APR_SUCCESS) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
- "h2_request(%d): set_request_rec %s host=%s://%s%s",
- stream->id, req->method, req->scheme, req->authority,
- req->path);
+ H2_STREAM_MSG(stream, "set_request_rec %s host=%s://%s%s"),
+ req->method, req->scheme, req->authority, req->path);
stream->rtmp = req;
+ /* simulate the frames that led to this */
+ return h2_stream_recv_frame(stream, NGHTTP2_HEADERS,
+ eos? NGHTTP2_FLAG_END_STREAM : 0);
}
return status;
}
-apr_status_t h2_stream_set_request(h2_stream *stream, const h2_request *r)
+void h2_stream_set_request(h2_stream *stream, const h2_request *r)
{
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp == NULL);
stream->rtmp = h2_request_clone(stream->pool, r);
- return APR_SUCCESS;
+}
+
+static void set_error_response(h2_stream *stream, int http_status)
+{
+ if (!h2_stream_is_ready(stream)) {
+ conn_rec *c = stream->session->c;
+ apr_bucket *b;
+ h2_headers *response;
+
+ response = h2_headers_die(http_status, stream->request, stream->pool);
+ prep_output(stream);
+ b = apr_bucket_eos_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
+ b = h2_bucket_headers_create(c->bucket_alloc, response);
+ APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
+ }
}
static apr_status_t add_trailer(h2_stream *stream,
if (nlen == 0 || name[0] == ':') {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c, APLOGNO(03060)
- "h2_request(%ld-%d): pseudo header in trailer",
- c->id, stream->id);
+ H2_STREAM_MSG(stream, "pseudo header in trailer"));
return APR_EINVAL;
}
if (h2_req_ignore_trailer(name, nlen)) {
const char *name, size_t nlen,
const char *value, size_t vlen)
{
+ h2_session *session = stream->session;
int error = 0;
- ap_assert(stream);
+ apr_status_t status;
if (stream->has_response) {
return APR_EINVAL;
}
++stream->request_headers_added;
if (name[0] == ':') {
- if ((vlen) > stream->session->s->limit_req_line) {
+ if ((vlen) > session->s->limit_req_line) {
/* pseudo header: approximation of request line size check */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): pseudo header %s too long",
- stream->session->id, stream->id, name);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ H2_STREAM_MSG(stream, "pseudo %s too long"), name);
error = HTTP_REQUEST_URI_TOO_LARGE;
}
}
- else if ((nlen + 2 + vlen) > stream->session->s->limit_req_fieldsize) {
+ else if ((nlen + 2 + vlen) > session->s->limit_req_fieldsize) {
/* header too long */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): header %s too long",
- stream->session->id, stream->id, name);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ H2_STREAM_MSG(stream, "header %s too long"), name);
error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
}
- if (stream->request_headers_added
- > stream->session->s->limit_req_fields + 4) {
+ if (stream->request_headers_added > session->s->limit_req_fields + 4) {
/* too many header lines, include 4 pseudo headers */
if (stream->request_headers_added
- > stream->session->s->limit_req_fields + 4 + 100) {
+ > session->s->limit_req_fields + 4 + 100) {
/* yeah, right */
+ h2_stream_rst(stream, H2_ERR_ENHANCE_YOUR_CALM);
return APR_ECONNRESET;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): too many header lines",
- stream->session->id, stream->id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ H2_STREAM_MSG(stream, "too many header lines"));
error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
}
- if (h2_stream_is_scheduled(stream)) {
- return add_trailer(stream, name, nlen, value, vlen);
+ if (error) {
+ set_error_response(stream, error);
+ return APR_EINVAL;
}
- else if (error) {
- return h2_stream_set_error(stream, error);
- }
- else {
+ else if (H2_SS_IDLE == stream->state) {
if (!stream->rtmp) {
stream->rtmp = h2_req_create(stream->id, stream->pool,
NULL, NULL, NULL, NULL, NULL, 0);
}
- if (stream->state != H2_STREAM_ST_OPEN) {
- return APR_ECONNRESET;
- }
- return h2_request_add_header(stream->rtmp, stream->pool,
- name, nlen, value, vlen);
+ status = h2_request_add_header(stream->rtmp, stream->pool,
+ name, nlen, value, vlen);
}
-}
-
-apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
- h2_stream_pri_cmp *cmp, void *ctx)
-{
- apr_status_t status = APR_EINVAL;
- ap_assert(stream);
- ap_assert(stream->session);
- ap_assert(stream->session->mplx);
-
- if (!stream->scheduled) {
- if (eos) {
- close_input(stream);
- }
-
- if (h2_stream_is_ready(stream)) {
- /* already have a resonse, probably a HTTP error code */
- return h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
- }
- else if (!stream->request && stream->rtmp) {
- /* This is the common case: a h2_request was being assembled, now
- * it gets finalized and checked for completness */
- status = h2_request_end_headers(stream->rtmp, stream->pool, eos);
- if (status == APR_SUCCESS) {
- stream->rtmp->serialize = h2_config_geti(stream->session->config,
- H2_CONF_SER_HEADERS);
-
- stream->request = stream->rtmp;
- stream->rtmp = NULL;
- stream->scheduled = 1;
-
- stream->push_policy = h2_push_policy_determine(stream->request->headers,
- stream->pool, push_enabled);
-
-
- status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): scheduled %s %s://%s%s "
- "chunked=%d",
- stream->session->id, stream->id,
- stream->request->method, stream->request->scheme,
- stream->request->authority, stream->request->path,
- stream->request->chunked);
- return status;
- }
- }
- else {
- status = APR_ECONNRESET;
- }
+ else {
+ status = add_trailer(stream, name, nlen, value, vlen);
}
-
- h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
- "h2_stream(%ld-%d): RST=2 (internal err) %s %s://%s%s",
- stream->session->id, stream->id,
- stream->request->method, stream->request->scheme,
- stream->request->authority, stream->request->path);
- return status;
-}
-
-int h2_stream_is_scheduled(const h2_stream *stream)
-{
- return stream->scheduled;
-}
-
-apr_status_t h2_stream_close_input(h2_stream *stream)
-{
- conn_rec *c = stream->session->c;
- apr_status_t status;
- apr_bucket_brigade *tmp;
- apr_bucket *b;
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): closing input",
- stream->session->id, stream->id);
- if (stream->rst_error) {
- return APR_ECONNRESET;
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ H2_STREAM_MSG(stream, "header %s not accepted"), name);
+ h2_stream_dispatch(stream, H2_SEV_CANCELLED);
}
-
- tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
- if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
- h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers,
- NULL, stream->pool);
- b = h2_bucket_headers_create(c->bucket_alloc, r);
- APR_BRIGADE_INSERT_TAIL(tmp, b);
- stream->trailers = NULL;
- }
-
- b = apr_bucket_eos_create(c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(tmp, b);
- status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
- apr_brigade_destroy(tmp);
- h2_beam_close(stream->input);
- return status;
-}
-
-apr_status_t h2_stream_write_data(h2_stream *stream,
- const char *data, size_t len, int eos)
-{
- conn_rec *c = stream->session->c;
- apr_status_t status = APR_SUCCESS;
- apr_bucket_brigade *tmp;
-
- ap_assert(stream);
- if (!stream->input) {
- return APR_EOF;
- }
- if (input_closed(stream) || !stream->request) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d",
- stream->session->id, stream->id, input_closed(stream),
- stream->request != NULL);
- return APR_EINVAL;
- }
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- "h2_stream(%ld-%d): add %ld input bytes",
- stream->session->id, stream->id, (long)len);
-
- tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
- apr_brigade_write(tmp, NULL, NULL, data, len);
- status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
- apr_brigade_destroy(tmp);
-
- stream->in_data_frames++;
- stream->in_data_octets += len;
-
- if (eos) {
- return h2_stream_close_input(stream);
- }
-
return status;
}
status = h2_beam_receive(stream->output, stream->out_buffer,
APR_NONBLOCK_READ, amount);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
- "h2_stream(%ld-%d): beam_received",
- stream->session->id, stream->id);
+ H2_STREAM_MSG(stream, "beam_received"));
return status;
}
-apr_status_t h2_stream_set_error(h2_stream *stream, int http_status)
-{
- h2_headers *response;
-
- if (h2_stream_is_ready(stream)) {
- return APR_EINVAL;
- }
- if (stream->rtmp) {
- stream->request = stream->rtmp;
- stream->rtmp = NULL;
- }
- response = h2_headers_die(http_status, stream->request, stream->pool);
- prepend_response(stream, response);
- h2_beam_close(stream->output);
- return APR_SUCCESS;
-}
-
static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
{
if (bb) {
*peos = 1;
return APR_ECONNRESET;
}
- else if (!output_open(stream)) {
- return APR_ECONNRESET;
- }
c = stream->session->c;
prep_output(stream);
}
}
}
-
- if (!*peos && !*plen && status == APR_SUCCESS
- && (!presponse || !*presponse)) {
- status = APR_EAGAIN;
+
+ if (status == APR_SUCCESS) {
+ if (presponse && *presponse) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ H2_STREAM_MSG(stream, "prepare, response %d"),
+ (*presponse)->status);
+ }
+ else if (*peos || *plen) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ H2_STREAM_MSG(stream, "prepare, len=%ld eos=%d"),
+ (long)*plen, *peos);
+ }
+ else {
+ status = APR_EAGAIN;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ H2_STREAM_MSG(stream, "prepare, no data"));
+ }
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_stream(%ld-%d): prepare, len=%ld eos=%d",
- c->id, stream->id, (long)*plen, *peos);
return status;
}
status = APR_EAGAIN;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
- "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
- c->id, stream->id, (long)*plen, *peos);
+ H2_STREAM_MSG(stream, "read_to, len=%ld eos=%d"),
+ (long)*plen, *peos);
return status;
}
-int h2_stream_input_is_open(const h2_stream *stream)
-{
- return input_open(stream);
-}
-
apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
{
apr_status_t status = APR_SUCCESS;
pushes = h2_push_collect_update(stream, stream->request, response);
if (pushes && !apr_is_empty_array(pushes)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): found %d push candidates",
- stream->session->id, stream->id, pushes->nelts);
+ H2_STREAM_MSG(stream, "found %d push candidates"),
+ pushes->nelts);
for (i = 0; i < pushes->nelts; ++i) {
h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
h2_stream *s = h2_session_push(stream->session, stream, push);
return NULL;
}
-const char *h2_stream_state_str(h2_stream *stream)
-{
- switch (stream->state) {
- case H2_STREAM_ST_IDLE:
- return "IDLE";
- case H2_STREAM_ST_OPEN:
- return "OPEN";
- case H2_STREAM_ST_RESV_LOCAL:
- return "RESERVED_LOCAL";
- case H2_STREAM_ST_RESV_REMOTE:
- return "RESERVED_REMOTE";
- case H2_STREAM_ST_CLOSED_INPUT:
- return "HALF_CLOSED_REMOTE";
- case H2_STREAM_ST_CLOSED_OUTPUT:
- return "HALF_CLOSED_LOCAL";
- case H2_STREAM_ST_CLOSED:
- return "CLOSED";
- default:
- return "UNKNOWN";
-
- }
-}
-
int h2_stream_is_ready(h2_stream *stream)
{
if (stream->has_response) {
return 0;
}
+int h2_stream_was_closed(const h2_stream *stream)
+{
+ switch (stream->state) {
+ case H2_SS_CLOSED:
+ case H2_SS_CLEANUP:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
struct h2_request;
struct h2_headers;
struct h2_session;
-struct h2_sos;
+struct h2_task;
struct h2_bucket_beam;
typedef struct h2_stream h2_stream;
+typedef void h2_stream_state_cb(void *ctx, h2_stream *stream);
+typedef void h2_stream_event_cb(void *ctx, h2_stream *stream,
+ h2_stream_event_t ev);
+
+typedef struct h2_stream_monitor {
+ void *ctx;
+ h2_stream_state_cb *on_state_enter; /* called when a state is entered */
+ h2_stream_state_cb *on_state_invalid; /* called when an invalid state change
+ was detected */
+ h2_stream_event_cb *on_state_event; /* called right before the given event
+ results in a new stream state */
+} h2_stream_monitor;
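
A minimal sketch of installing a monitor; h2_session sets one up for the streams it creates, but any owner can. The logging callback and the conn_rec context below are illustrative only.

    static void log_state_enter(void *ctx, h2_stream *stream)
    {
        conn_rec *c = ctx;
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
                      "h2_stream(%d): entered state %s",
                      stream->id, h2_stream_state_str(stream));
    }

    /* h2_stream_monitor trace = { c, log_state_enter, NULL, NULL };
     * h2_stream_set_monitor(stream, &trace);                        */
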
+
struct h2_stream {
- int id; /* http2 stream id */
- int initiated_on; /* initiating stream id (PUSH) or 0 */
- apr_time_t created; /* when stream was created */
- h2_stream_state_t state; /* http/2 state of this stream */
+ int id; /* http2 stream identifier */
+ int initiated_on; /* initiating stream id (PUSH) or 0 */
+ apr_pool_t *pool; /* the memory pool for this stream */
struct h2_session *session; /* the session this stream belongs to */
+ h2_stream_state_t state; /* state of this stream */
+
+ apr_time_t created; /* when stream was created */
- apr_pool_t *pool; /* the memory pool for this stream */
const struct h2_request *request; /* the request made in this stream */
struct h2_request *rtmp; /* request being assembled */
apr_table_t *trailers; /* optional incoming trailers */
int rst_error; /* stream error for RST_STREAM */
unsigned int aborted : 1; /* was aborted */
unsigned int scheduled : 1; /* stream has been scheduled */
- unsigned int started : 1; /* stream has started processing */
unsigned int has_response : 1; /* response headers are known */
unsigned int push_policy; /* which push policy to use for this request */
+ struct h2_task *task; /* assigned task to fulfill the request */
+
const h2_priority *pref_priority; /* preferred priority for this stream */
apr_off_t out_data_frames; /* # of DATA frames sent */
apr_off_t out_data_octets; /* # of DATA octets (payload) sent */
apr_off_t in_data_frames; /* # of DATA frames received */
apr_off_t in_data_octets; /* # of DATA octets (payload) received */
- const char *sos_filter;
+ h2_stream_monitor *monitor; /* optional monitor for stream states */
};
#define H2_STREAM_RST(s, def) (s->rst_error? s->rst_error : (def))
/**
- * Create a stream in OPEN state.
+ * Create a stream in H2_SS_IDLE state.
* @param id the stream identifier
* @param pool the memory pool to use for this stream
* @param session the session this stream belongs to
* @return the newly opened stream
*/
-h2_stream *h2_stream_open(int id, apr_pool_t *pool, struct h2_session *session,
- int initiated_on);
+h2_stream *h2_stream_create(int id, apr_pool_t *pool,
+ struct h2_session *session,
+ h2_stream_monitor *monitor,
+ int initiated_on);
/**
- * Cleanup any resources still held by the stream, called by last bucket.
+ * Destroy memory pool if still owned by the stream.
*/
-void h2_stream_eos_destroy(h2_stream *stream);
+void h2_stream_destroy(h2_stream *stream);
+
+/*
+ * Set a new monitor for this stream, replacing any existing one. Can
+ * be called with NULL to have no monitor installed.
+ */
+void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor);
/**
- * Destroy memory pool if still owned by the stream.
+ * Dispatch (handle) an event on the given stream.
+ * @param stream the stream the event happened on
+ * @param ev the type of event
*/
-void h2_stream_destroy(h2_stream *stream);
+void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev);
/**
* Cleanup references into request processing.
apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
/**
- * Initialize stream->request with the given h2_request.
+ * Set the complete stream headers from the given h2_request.
*
* @param stream stream to write request to
* @param r the request with all the meta data
*/
-apr_status_t h2_stream_set_request(h2_stream *stream, const h2_request *r);
+void h2_stream_set_request(h2_stream *stream, const h2_request *r);
/**
- * Initialize stream->request with the given request_rec.
+ * Set the complete stream headers from the given request_rec.
*
* @param stream stream to write request to
* @param r the request with all the meta data
+ * @param eos != 0 iff stream input is closed
*/
-apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r);
+apr_status_t h2_stream_set_request_rec(h2_stream *stream,
+ request_rec *r, int eos);
/*
* Add a HTTP/2 header (including pseudo headers) or trailer
const char *name, size_t nlen,
const char *value, size_t vlen);
-/**
- * Closes the stream's input.
- *
- * @param stream stream to close intput of
- */
-apr_status_t h2_stream_close_input(h2_stream *stream);
+apr_status_t h2_stream_send_frame(h2_stream *stream, int frame_type, int flags);
+apr_status_t h2_stream_recv_frame(h2_stream *stream, int frame_type, int flags);
/*
- * Write a chunk of DATA to the stream.
+ * Process a frame of received DATA.
*
* @param stream stream to write the data to
+ * @param flags the frame flags
* @param data the beginning of the bytes to write
* @param len the number of bytes to write
*/
-apr_status_t h2_stream_write_data(h2_stream *stream,
- const char *data, size_t len, int eos);
+apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
+ const uint8_t *data, size_t len);
/**
* Reset the stream. Stream write/reads will return errors afterwards.
void h2_stream_rst(h2_stream *stream, int error_code);
/**
- * Schedule the stream for execution. All header information must be
- * present. Use the given priority comparison callback to determine
- * order in queued streams.
- *
- * @param stream the stream to schedule
- * @param eos != 0 iff no more input will arrive
- * @param cmp priority comparison
- * @param ctx context for comparison
- */
-apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
- h2_stream_pri_cmp *cmp, void *ctx);
-
-/**
- * Determine if stream has been scheduled already.
+ * Determine if the stream was already closed. This is true for
+ * the states H2_SS_CLOSED and H2_SS_CLEANUP, but not for
+ * H2_SS_CLOSED_L or H2_SS_CLOSED_R.
+ *
* @param stream the stream to check on
- * @return != 0 iff stream has been scheduled
- */
-int h2_stream_is_scheduled(const h2_stream *stream);
-
-/**
- * Set the HTTP error status as response.
+ * @return != 0 iff stream has been closed
*/
-apr_status_t h2_stream_set_error(h2_stream *stream, int http_status);
+int h2_stream_was_closed(const h2_stream *stream);
/**
* Do a speculative read on the stream output to determine the
*/
apr_table_t *h2_stream_get_trailers(h2_stream *stream);
-/**
- * Check if the stream has open input.
- * @param stream the stream to check
- * @return != 0 iff stream has open input.
- */
-int h2_stream_input_is_open(const h2_stream *stream);
-
/**
* Submit any server push promises on this stream and schedule
* the tasks connection with these.
*/
int h2_stream_is_ready(h2_stream *stream);
+
+#define H2_STREAM_MSG(s, msg) \
+ "h2_stream(%ld-%d,%s): "msg, s->session->id, s->id, h2_stream_state_str(s)
+
+
#endif /* defined(__mod_h2__h2_stream__) */
}
}
-static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg,
- conn_rec *c, int level)
-{
- if (beam && APLOG_C_IS_LEVEL(c,level)) {
- char buffer[2048];
- apr_size_t off = 0;
-
- off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
- off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->send_list);
- off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->recv_buffer);
- off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold_list);
- off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge_list);
-
- ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s",
- c->id, id, msg, buffer);
- }
-}
-
/*******************************************************************************
* task input handling
******************************************************************************/
apr_brigade_length(bb, 0, &written);
H2_TASK_OUT_LOG(APLOG_TRACE2, task, bb, "h2_task send_out");
- h2_beam_log(task->output.beam, task->stream_id, "send_out(before)", task->c, APLOG_TRACE2);
+ h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "send_out(before)");
/* engines send unblocking */
status = h2_beam_send(task->output.beam, bb,
block? APR_BLOCK_READ : APR_NONBLOCK_READ);
- h2_beam_log(task->output.beam, task->stream_id, "send_out(after)", task->c, APLOG_TRACE2);
+ h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "send_out(after)");
if (APR_STATUS_IS_EAGAIN(status)) {
apr_brigade_length(bb, 0, &left);
void h2_task_rst(h2_task *task, int error)
{
task->rst_error = error;
- h2_beam_abort(task->input.beam);
+ h2_beam_leave(task->input.beam);
if (!task->worker_done) {
h2_beam_abort(task->output.beam);
}
}
/* After the call to ap_process_request, the
- * request pool will have been deleted. We set
+ * request pool may have been deleted. We set
* r=NULL here to ensure that any dereference
* of r that might be added later in this function
* will result in a segfault immediately instead
off += apr_snprintf(buffer+off, bmax-off, "%s", sep);
}
+ if (bmax <= off) {
+ return off;
+ }
if (APR_BUCKET_IS_METADATA(b)) {
if (APR_BUCKET_IS_EOS(b)) {
off += apr_snprintf(buffer+off, bmax-off, "eos");
btype = "pool";
}
- off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]",
- btype,
- (long)(b->length == ((apr_size_t)-1)?
- -1 : b->length));
+ if (bmax > off) {
+ off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]",
+ btype,
+ (long)(b->length == ((apr_size_t)-1)?
+ -1 : b->length));
+ }
}
return off;
}
const char *sp = "";
apr_bucket *b;
- if (bb) {
- memset(buffer, 0, bmax--);
- off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
- for (b = APR_BRIGADE_FIRST(bb);
- bmax && (b != APR_BRIGADE_SENTINEL(bb));
- b = APR_BUCKET_NEXT(b)) {
-
- off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
- sp = " ";
+ if (bmax > 1) {
+ if (bb) {
+ memset(buffer, 0, bmax--);
+ off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
+ for (b = APR_BRIGADE_FIRST(bb);
+ (bmax > off) && (b != APR_BRIGADE_SENTINEL(bb));
+ b = APR_BUCKET_NEXT(b)) {
+
+ off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
+ sp = " ";
+ }
+ if (bmax > off) {
+ off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
+ }
+ }
+ else {
+ off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
}
- off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
- }
- else {
- off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
}
return off;
}
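
A hedged usage sketch of the bounded printers above: dumping a brigade into a fixed-size buffer for trace logging. The added bmax/off checks keep all writes inside the buffer even for long brigades; bb and c stand for whatever brigade and connection are at hand.

    char buffer[2048];
    apr_size_t len = h2_util_bb_print(buffer, sizeof(buffer), "out", "", bb);
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
                  "brigade: %s (%ld chars)", buffer, (long)len);
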
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.8.12-DEV"
+#define MOD_HTTP2_VERSION "1.9.0-DEV"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x01080c
+#define MOD_HTTP2_VERSION_NUM 0x010900
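
A short illustration of the packing described above; the macro is not part of the module, it only shows how the number is composed:

    #define H2_MAKE_VERSION_NUM(major, minor, patch) \
        (((major) << 16) | ((minor) << 8) | (patch))
    /* H2_MAKE_VERSION_NUM(1, 9, 0) == 0x010900 == MOD_HTTP2_VERSION_NUM */
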
#endif /* mod_h2_h2_version_h */