Changes with Apache 2.4.21
+ *) mod_http2: slave connections now have the conn_rec->aborted flag set
+ when a stream has been reset by the client. [Stefan Eissing]
+
+ *) mod_http2: merge of some 2.4.x adaptations regarding filters on slave
+ connections. Small fixes in bucket beams when forwarding file buckets.
+ Output handling on the master connection uses fewer FLUSH buckets and
+ passes data on automatically once more than half of H2StreamMaxMemSize
+ bytes have accumulated.
+ Workaround in the HTTP output handling when forwarding partial file
+ buckets, to keep the output filter from closing these too early.
+ [Stefan Eissing]
+
+ *) mod_http2: elimination of the fixed master connection buffer for TLS
+ connections. New scratch bucket handling optimized for TLS write sizes.
+ File bucket data read directly into scratch buffers, avoiding one
+ copy. Non-TLS connections continue to pass buckets unchanged to the core
+ filters to allow sendfile() usage. [Stefan Eissing]
+
+ *) mod_http2/mod_proxy_http2: h2_request.c is no longer shared between these
+ modules. This simplifies building on platforms such as Windows, as the
+ module reference used in logging is now unambiguous. [Stefan Eissing]
+
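
The accumulate-then-pass behavior described in the second entry is what the
h2_conn_io changes below implement (pass_threshold, h2_conn_io_pass()). A
minimal standalone sketch of the idea, using toy names rather than the
module's APR-based types:

    #include <stddef.h>
    #include <string.h>

    /* Toy model: buffer writes and hand them to the network layer only
     * once pass_threshold bytes (half of H2StreamMaxMemSize in the real
     * code) have accumulated, instead of flushing eagerly. */
    typedef struct {
        char   buf[64 * 1024];
        size_t len;
        size_t pass_threshold;
    } toy_io;

    static void toy_pass(toy_io *io)
    {
        io->len = 0;   /* stand-in for ap_pass_brigade() on the master */
    }

    static void toy_write(toy_io *io, const char *data, size_t n)
    {
        while (n > 0) {
            size_t room  = sizeof(io->buf) - io->len;
            size_t chunk = (n < room)? n : room;
            memcpy(io->buf + io->len, data, chunk);
            io->len += chunk;
            data    += chunk;
            n       -= chunk;
            if (io->len >= io->pass_threshold || io->len == sizeof(io->buf)) {
                toy_pass(io);   /* pass on without an explicit FLUSH */
            }
        }
    }
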
*) Scoreboard: Fix a regression in 2.4.20 that causes wrong request data
to be displayed on the status page. PR 59333. [Yann Ylavic, William Rowe]
@echo $(DL) h2_iq_remove,$(DL) >> $@
@echo $(DL) h2_log2,$(DL) >> $@
@echo $(DL) h2_proxy_res_ignore_header,$(DL) >> $@
- @echo $(DL) h2_request_create,$(DL) >> $@
- @echo $(DL) h2_request_make,$(DL) >> $@
+ @echo $(DL) h2_headers_add_h1,$(DL) >> $@
+ @echo $(DL) h2_req_create,$(DL) >> $@
+ @echo $(DL) h2_req_createn,$(DL) >> $@
+ @echo $(DL) h2_req_make,$(DL) >> $@
@echo $(DL) h2_util_camel_case_header,$(DL) >> $@
@echo $(DL) h2_util_frame_print,$(DL) >> $@
@echo $(DL) h2_util_ngheader_make_req,$(DL) >> $@
static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
- if (beam->m_enter) {
- return beam->m_enter(beam->m_ctx, pbl);
+ h2_beam_mutex_enter *enter = beam->m_enter;
+ if (enter) {
+ void *ctx = beam->m_ctx;
+ if (ctx) {
+ return enter(ctx, pbl);
+ }
}
pbl->mutex = NULL;
pbl->leave = NULL;
status = APR_EAGAIN;
break;
}
+ if (beam->m_cond) {
+ apr_thread_cond_broadcast(beam->m_cond);
+ }
status = wait_cond(beam, bl.mutex);
}
leave_yellow(beam, &bl);
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
transfer:
if (beam->aborted) {
+ if (!APR_BRIGADE_EMPTY(beam->green)) {
+ apr_brigade_cleanup(beam->green);
+ }
status = APR_ECONNABORTED;
goto leave;
}
#endif
remain -= bred->length;
++transferred;
+ APR_BUCKET_REMOVE(bred);
+ H2_BLIST_INSERT_TAIL(&beam->hold, bred);
+ continue;
}
else {
/* create a "green" standin bucket. we took care about the
* which seems to create less TCP packets overall
*/
#define WRITE_SIZE_MAX (TLS_DATA_MAX - 100)
-#define WRITE_BUFFER_SIZE (5*WRITE_SIZE_MAX)
static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level,
}
apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
- const h2_config *cfg,
- apr_pool_t *pool)
+ const h2_config *cfg)
{
io->c = c;
- io->output = apr_brigade_create(pool, c->bucket_alloc);
- io->buflen = 0;
+ io->output = apr_brigade_create(c->pool, c->bucket_alloc);
io->is_tls = h2_h2_is_tls(c);
io->buffer_output = io->is_tls;
-
- if (io->buffer_output) {
- io->bufsize = WRITE_BUFFER_SIZE;
- io->buffer = apr_pcalloc(pool, io->bufsize);
- }
- else {
- io->bufsize = 0;
- }
+ io->pass_threshold = h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM) / 2;
if (io->is_tls) {
/* This is what we start with,
io->warmup_size = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE);
io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS)
* APR_USEC_PER_SEC);
- io->write_size = WRITE_SIZE_INITIAL;
+ io->write_size = (io->cooldown_usecs > 0?
+ WRITE_SIZE_INITIAL : WRITE_SIZE_MAX);
}
else {
io->warmup_size = 0;
io->cooldown_usecs = 0;
- io->write_size = io->bufsize;
+ io->write_size = 0;
}
if (APLOGctrace1(c)) {
return APR_SUCCESS;
}
-int h2_conn_io_is_buffered(h2_conn_io *io)
+#define LOG_SCRATCH 0
+
+static void append_scratch(h2_conn_io *io)
{
- return io->bufsize > 0;
+ if (io->scratch && io->slen > 0) {
+ apr_bucket *b = apr_bucket_heap_create(io->scratch, io->slen,
+ apr_bucket_free,
+ io->c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+ "h2_conn_io(%ld): append_scratch(%ld)",
+ io->c->id, (long)io->slen);
+#endif
+ io->scratch = NULL;
+ io->slen = io->ssize = 0;
+ }
}
-typedef struct {
- conn_rec *c;
- h2_conn_io *io;
-} pass_out_ctx;
-
-static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx)
+static apr_size_t assure_scratch_space(h2_conn_io *io)
+{
+ apr_size_t remain = io->ssize - io->slen;
+ if (io->scratch && remain == 0) {
+ append_scratch(io);
+ }
+ if (!io->scratch) {
+ /* we control the size and it is larger than what buckets usually
+ * allocate. */
+ io->scratch = apr_bucket_alloc(io->write_size, io->c->bucket_alloc);
+ io->ssize = io->write_size;
+ io->slen = 0;
+ remain = io->ssize;
+ }
+ return remain;
+}
+
+static apr_status_t read_to_scratch(h2_conn_io *io, apr_bucket *b)
{
- pass_out_ctx *pctx = ctx;
- conn_rec *c = pctx->c;
apr_status_t status;
- apr_off_t bblen;
+ const char *data;
+ apr_size_t len;
- if (APR_BRIGADE_EMPTY(bb)) {
+ if (!b->length) {
return APR_SUCCESS;
}
- ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, NULL);
- apr_brigade_length(bb, 0, &bblen);
- h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb);
- status = ap_pass_brigade(c->output_filters, bb);
- if (status == APR_SUCCESS && pctx->io) {
- pctx->io->bytes_written += (apr_size_t)bblen;
- pctx->io->last_write = apr_time_now();
+ AP_DEBUG_ASSERT(b->length <= (io->ssize - io->slen));
+ if (APR_BUCKET_IS_FILE(b)) {
+ apr_bucket_file *f = (apr_bucket_file *)b->data;
+ apr_file_t *fd = f->fd;
+ apr_off_t offset = b->start;
+ apr_size_t len = b->length;
+
+ /* file buckets will either mmap (which we do not want) or
+     * read 8000 byte chunks and split themselves. However, we do
+ * know *exactly* how many bytes we need where.
+ */
+ status = apr_file_seek(fd, APR_SET, &offset);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+ status = apr_file_read(fd, io->scratch + io->slen, &len);
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, status, io->c,
+ "h2_conn_io(%ld): FILE_to_scratch(%ld)",
+ io->c->id, (long)len);
+#endif
+ if (status != APR_SUCCESS && status != APR_EOF) {
+ return status;
+ }
+ io->slen += len;
}
- if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
- "h2_conn_io(%ld): pass_out brigade %ld bytes",
- c->id, (long)bblen);
+ else {
+ status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
+ if (status == APR_SUCCESS) {
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+ "h2_conn_io(%ld): read_to_scratch(%ld)",
+ io->c->id, (long)b->length);
+#endif
+ memcpy(io->scratch+io->slen, data, len);
+ io->slen += len;
+ }
}
- apr_brigade_cleanup(bb);
return status;
}
-/* Bring the current buffer content into the output brigade, appropriately
- * chunked.
- */
-static apr_status_t bucketeer_buffer(h2_conn_io *io)
+static void check_write_size(h2_conn_io *io)
{
- const char *data = io->buffer;
- apr_size_t remaining = io->buflen;
- apr_bucket *b;
- int bcount, i;
-
if (io->write_size > WRITE_SIZE_INITIAL
&& (io->cooldown_usecs > 0)
&& (apr_time_now() - io->last_write) >= io->cooldown_usecs) {
"h2_conn_io(%ld): threshold reached, write size now %ld",
(long)io->c->id, (long)io->write_size);
}
-
- bcount = (int)(remaining / io->write_size);
- for (i = 0; i < bcount; ++i) {
- b = apr_bucket_transient_create(data, io->write_size,
- io->output->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- data += io->write_size;
- remaining -= io->write_size;
- }
-
- if (remaining > 0) {
- b = apr_bucket_transient_create(data, remaining,
- io->output->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- }
- return APR_SUCCESS;
}
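
check_write_size() above implements the TLS write-size warm-up: write_size
starts small, is raised once enough traffic has been written, and falls back
to the initial size after cooldown_usecs without a write. A compact sketch of
that policy (the constants and the helper name are assumptions for
illustration, not the module's values):

    #include <stddef.h>
    #include <stdint.h>

    #define INITIAL_SIZE  1300   /* assumed: ~1 packet worth of TLS data */
    #define MAX_SIZE     16000   /* assumed: close to a full TLS record */

    static size_t next_write_size(size_t cur, uint64_t bytes_written,
                                  uint64_t warmup_size, int64_t idle_us,
                                  int64_t cooldown_us)
    {
        if (cur > INITIAL_SIZE && cooldown_us > 0 && idle_us >= cooldown_us) {
            return INITIAL_SIZE;   /* connection went idle: cool down */
        }
        if (cur < MAX_SIZE && bytes_written >= warmup_size) {
            return MAX_SIZE;       /* warmed up: use large records */
        }
        return cur;
    }
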
-apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush)
+static apr_status_t pass_output(h2_conn_io *io, int flush, int eoc)
{
- APR_BRIGADE_INSERT_TAIL(io->output, b);
+ conn_rec *c = io->c;
+ apr_bucket *b;
+ apr_off_t bblen;
+ apr_status_t status;
+
+ append_scratch(io);
if (flush) {
- b = apr_bucket_flush_create(io->c->bucket_alloc);
+ b = apr_bucket_flush_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(io->output, b);
}
- return APR_SUCCESS;
-}
-
-static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
-{
- pass_out_ctx ctx;
- apr_bucket *b;
- if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) {
+ if (APR_BRIGADE_EMPTY(io->output)) {
return APR_SUCCESS;
}
-
- if (io->buflen > 0) {
- /* something in the buffer, put it in the output brigade */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
- "h2_conn_io: flush, flushing %ld bytes",
- (long)io->buflen);
- bucketeer_buffer(io);
- }
-
- if (flush) {
- b = apr_bucket_flush_create(io->c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
- io->buflen = 0;
- ctx.c = io->c;
- ctx.io = eoc? NULL : io;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, c, "h2_conn_io: pass_output");
+ ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, NULL);
+ apr_brigade_length(io->output, 0, &bblen);
- return pass_out(io->output, &ctx);
- /* no more access after this, as we might have flushed an EOC bucket
+ h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", io->output);
+ status = ap_pass_brigade(c->output_filters, io->output);
+
+ /* careful with access after this, as we might have flushed an EOC bucket
* that de-allocated us all. */
+ if (!eoc) {
+ apr_brigade_cleanup(io->output);
+ if (status == APR_SUCCESS) {
+ io->bytes_written += (apr_size_t)bblen;
+ io->last_write = apr_time_now();
+ }
+ }
+
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
+ "h2_conn_io(%ld): pass_out brigade %ld bytes",
+ c->id, (long)bblen);
+ }
+ return status;
}
apr_status_t h2_conn_io_flush(h2_conn_io *io)
{
- return h2_conn_io_flush_int(io, 1, 0);
-}
-
-apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
-{
- apr_off_t len = 0;
-
- if (!APR_BRIGADE_EMPTY(io->output)) {
- len = h2_brigade_mem_size(io->output);
- }
- len += io->buflen;
- if (len >= WRITE_BUFFER_SIZE) {
- return h2_conn_io_flush_int(io, 1, 0);
- }
- return APR_SUCCESS;
+ return pass_output(io, 1, 0);
}
apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session)
{
apr_bucket *b = h2_bucket_eoc_create(io->c->bucket_alloc, session);
APR_BRIGADE_INSERT_TAIL(io->output, b);
- return h2_conn_io_flush_int(io, 1, 1);
+ return pass_output(io, 1, 1);
}
-apr_status_t h2_conn_io_write(h2_conn_io *io,
- const char *buf, size_t length)
+apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length)
{
apr_status_t status = APR_SUCCESS;
- pass_out_ctx ctx;
+ apr_size_t remain;
- ctx.c = io->c;
- ctx.io = io;
- if (io->bufsize > 0) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
- "h2_conn_io: buffering %ld bytes", (long)length);
-
- if (!APR_BRIGADE_EMPTY(io->output)) {
- status = h2_conn_io_flush_int(io, 0, 0);
- }
-
- while (length > 0 && (status == APR_SUCCESS)) {
- apr_size_t avail = io->bufsize - io->buflen;
- if (avail <= 0) {
- status = h2_conn_io_flush_int(io, 0, 0);
- }
- else if (length > avail) {
- memcpy(io->buffer + io->buflen, buf, avail);
- io->buflen += avail;
- length -= avail;
- buf += avail;
+ if (io->buffer_output) {
+ while (length > 0) {
+ remain = assure_scratch_space(io);
+ if (remain >= length) {
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+ "h2_conn_io(%ld): write_to_scratch(%ld)",
+ io->c->id, (long)length);
+#endif
+ memcpy(io->scratch + io->slen, data, length);
+ io->slen += length;
+ length = 0;
}
else {
- memcpy(io->buffer + io->buflen, buf, length);
- io->buflen += length;
- length = 0;
- break;
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+ "h2_conn_io(%ld): write_to_scratch(%ld)",
+ io->c->id, (long)remain);
+#endif
+ memcpy(io->scratch + io->slen, data, remain);
+ io->slen += remain;
+ data += remain;
+ length -= remain;
}
}
-
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->c,
- "h2_conn_io: writing %ld bytes to brigade", (long)length);
- status = apr_brigade_write(io->output, pass_out, &ctx, buf, length);
+ status = apr_brigade_write(io->output, NULL, NULL, data, length);
+ }
+ return status;
+}
+
+apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
+{
+ apr_bucket *b;
+ apr_status_t status = APR_SUCCESS;
+
+ check_write_size(io);
+ while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
+ b = APR_BRIGADE_FIRST(bb);
+
+ if (APR_BUCKET_IS_METADATA(b)) {
+            /* need to finish any open scratch bucket, as meta data
+             * needs to be forwarded "in order". */
+ append_scratch(io);
+ APR_BUCKET_REMOVE(b);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+ }
+ else if (io->buffer_output) {
+ apr_size_t remain = assure_scratch_space(io);
+ if (b->length > remain) {
+ apr_bucket_split(b, remain);
+ if (io->slen == 0) {
+ /* complete write_size bucket, append unchanged */
+ APR_BUCKET_REMOVE(b);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+ "h2_conn_io(%ld): pass bucket(%ld)",
+ io->c->id, (long)b->length);
+#endif
+ continue;
+ }
+ }
+ else {
+ /* bucket fits in remain, copy to scratch */
+                status = read_to_scratch(io, b);
+ apr_bucket_delete(b);
+ continue;
+ }
+ }
+ else {
+            /* no buffering: forward buckets, setting transient ones
+             * aside so they survive until the next flush */
+ if (APR_BUCKET_IS_TRANSIENT(b)) {
+ apr_bucket_setaside(b, io->c->pool);
+ }
+ APR_BUCKET_REMOVE(b);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+ }
}
+ if (status == APR_SUCCESS) {
+ if (!APR_BRIGADE_EMPTY(io->output)) {
+ apr_off_t len = h2_brigade_mem_size(io->output);
+ if (len >= io->pass_threshold) {
+ return pass_output(io, 0, 0);
+ }
+ }
+ }
return status;
}
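
A typical caller of the new h2_conn_io_pass() can be seen in
on_send_data_cb() further below: frame data is collected in the session's
temporary brigade, passed on, and the brigade is cleaned up afterwards. In
abridged form (flush_needed stands in for the caller's own condition):

    APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
    status = h2_conn_io_pass(&session->io, session->bbtmp);
    apr_brigade_cleanup(session->bbtmp);
    if (status == APR_SUCCESS && flush_needed) {
        status = h2_conn_io_flush(&session->io); /* appends a FLUSH bucket */
    }
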
apr_int64_t bytes_written;
int buffer_output;
- char *buffer;
- apr_size_t buflen;
- apr_size_t bufsize;
+ apr_size_t pass_threshold;
+
+ char *scratch;
+ apr_size_t ssize;
+ apr_size_t slen;
} h2_conn_io;
apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
- const struct h2_config *cfg,
- apr_pool_t *pool);
-
-int h2_conn_io_is_buffered(h2_conn_io *io);
+ const struct h2_config *cfg);
/**
* Append data to the buffered output.
const char *buf,
size_t length);
-/**
- * Append a bucket to the buffered output.
- * @param io the connection io
- * @param b the bucket to append
- */
-apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush);
+apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb);
/**
* Append an End-Of-Connection bucket to the output that, once destroyed,
*/
apr_status_t h2_conn_io_flush(h2_conn_io *io);
-/**
- * Check the amount of buffered output and pass it on if enough has accumulated.
- * @param io the connection io
- * @param flush if a flush bucket should be appended to any output
- */
-apr_status_t h2_conn_io_consider_pass(h2_conn_io *io);
-
#endif /* defined(__mod_h2__h2_conn_io__) */
#include "h2_session.h"
#include "h2_task.h"
#include "h2_ctx.h"
-#include "h2_private.h"
static h2_ctx *h2_ctx_create(const conn_rec *c)
{
 * This allows recursive entering of the mutex from the same thread,
* which is what we need in certain situations involving callbacks
*/
+ AP_DEBUG_ASSERT(m);
apr_threadkey_private_get(&mutex, thread_lock);
if (mutex == m->lock) {
*pacquired = 0;
}
static void have_out_data_for(h2_mplx *m, int stream_id);
+static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
static void check_tx_reservation(h2_mplx *m)
{
}
}
+static int purge_stream(void *ctx, void *val)
+{
+ h2_mplx *m = ctx;
+ h2_stream *stream = val;
+ h2_task *task = h2_ihash_get(m->tasks, stream->id);
+ h2_ihash_remove(m->spurge, stream->id);
+ h2_stream_destroy(stream);
+ if (task) {
+ task_destroy(m, task, 1);
+ }
+ return 0;
+}
+
+static void purge_streams(h2_mplx *m)
+{
+ if (!h2_ihash_empty(m->spurge)) {
+        while (!h2_ihash_iter(m->spurge, purge_stream, m)) {
+            /* repeat until empty: purge_stream() removes one entry and
+             * stops the iteration, since modifying the hash while
+             * iterating over it is not safe */
+ }
+ h2_ihash_clear(m->spurge);
+ }
+}
+
static void h2_mplx_destroy(h2_mplx *m)
{
AP_DEBUG_ASSERT(m);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
return max_stream_started;
}
-static void input_consumed_signal(h2_mplx *m, h2_task *task)
+static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
{
- if (task->input.beam && task->worker_started) {
- h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */
+ if (stream->input) {
+ h2_beam_send(stream->input, NULL, 0); /* trigger updates */
}
}
}
-static void task_destroy(h2_mplx *m, h2_task *task, int events)
+static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
{
conn_rec *slave = NULL;
int reuse_slave = 0;
apr_status_t status;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_task(%s): destroy", task->id);
/* cleanup any buffered input */
status = h2_task_shutdown(task, 0);
    if (status != APR_SUCCESS) {
"h2_task(%s): shutdown", task->id);
}
- if (events) {
+ if (called_from_master) {
/* Process outstanding events before destruction */
- input_consumed_signal(m, task);
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+ if (stream) {
+ input_consumed_signal(m, stream);
+ }
}
/* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our
* file handle pool. */
- if (task->input.beam) {
- m->tx_handles_reserved +=
- h2_beam_get_files_beamed(task->input.beam);
- }
if (task->output.beam) {
m->tx_handles_reserved +=
h2_beam_get_files_beamed(task->output.beam);
{
h2_task *task;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_stream(%ld-%d): done", m->c->id, stream->id);
+ /* Situation: we are, on the master connection, done with processing
+ * the stream. Either we have handled it successfully, or the stream
+ * was reset by the client or the connection is gone and we are
+ * shutting down the whole session.
+ *
+ * We possibly have created a task for this stream to be processed
+ * on a slave connection. The processing might actually be ongoing
+ * right now or has already finished. A finished task waits for its
+ * stream to be done. This is the common case.
+ *
+ * If the stream had input (e.g. the request had a body), a task
+ * may have read, or is still reading buckets from the input beam.
+ * This means that the task is referencing memory from the stream's
+ * pool (or the master connection bucket alloc). Before we can free
+ * the stream pool, we need to make sure that those references are
+ * gone. This is what h2_beam_shutdown() on the input waits for.
+ *
+     * With the input handled, we can tear down that beam and turn to
+     * the output beam. The stream might still have buffered some
+     * buckets read from the output, so we need to get rid of those. That
+     * is done by h2_stream_cleanup().
+     *
+     * Now it is safe to destroy the task (if it exists and is finished).
+ *
+ * FIXME: we currently destroy the stream, even if the task is still
+ * ongoing. This is not ok, since task->request is coming from stream
+ * memory. We should either copy it on task creation or wait with the
+ * stream destruction until the task is done.
+ */
+ h2_iq_remove(m->q, stream->id);
+ h2_ihash_remove(m->ready_tasks, stream->id);
h2_ihash_remove(m->streams, stream->id);
if (stream->input) {
- apr_status_t status;
- status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
- if (status == APR_EAGAIN) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_stream(%ld-%d): wait on input shutdown",
- m->id, stream->id);
- status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_stream(%ld-%d): input shutdown returned",
- m->id, stream->id);
- }
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
}
+ h2_stream_cleanup(stream);
task = h2_ihash_get(m->tasks, stream->id);
if (task) {
- /* Remove task from ready set, we will never submit it */
- h2_ihash_remove(m->ready_tasks, stream->id);
-
- if (task->worker_done) {
- /* already finished or not even started yet */
- h2_iq_remove(m->q, task->stream_id);
- task_destroy(m, task, 0);
- }
- else {
+ if (!task->worker_done) {
/* task still running, cleanup once it is done */
- task->orphaned = 1;
- task->input.beam = NULL;
if (rst_error) {
h2_task_rst(task, rst_error);
}
+ /* FIXME: this should work, but does not
+ h2_ihash_add(m->shold, stream);
+ return;*/
+ task->input.beam = NULL;
+ }
+ else {
+ /* already finished */
+ task_destroy(m, task, 0);
}
}
+ h2_stream_destroy(stream);
}
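
For quick reference, the teardown order that the long comment in
stream_done() spells out:

    /* 1. remove the stream from q, ready_tasks and the streams map
     * 2. reclaim file handles still beamed through the input
     * 3. h2_stream_cleanup()  - shuts down the input beam (waiting for
     *                           task references into stream memory to go
     *                           away) and drops buffered output buckets
     * 4. task finished? task_destroy() - else keep it (see FIXME above)
     * 5. h2_stream_destroy() - frees the stream pool
     */
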
static int stream_done_iter(void *ctx, void *val)
{
- h2_stream *stream = val;
stream_done((h2_mplx*)ctx, val, 0);
- h2_stream_destroy(stream);
return 0;
}
{
h2_mplx *m = ctx;
h2_task *task = val;
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
if (task->request) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%s): %s %s %s -> %s %d"
task->request->authority, task->request->path,
task->response? "http" : (task->rst_error? "reset" : "?"),
task->response? task->response->http_status : task->rst_error,
- task->orphaned, task->worker_started,
+ (stream? 0 : 1), task->worker_started,
task->worker_done);
}
else if (task) {
/* disable WINDOW_UPDATE callbacks */
h2_mplx_set_consumed_cb(m, NULL, NULL);
+ if (!h2_ihash_empty(m->shold)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): start release_join with %d streams in hold",
+ m->id, (int)h2_ihash_count(m->shold));
+ }
+ if (!h2_ihash_empty(m->spurge)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): start release_join with %d streams to purge",
+ m->id, (int)h2_ihash_count(m->spurge));
+ }
+
h2_iq_clear(m->q);
apr_thread_cond_broadcast(m->task_thawed);
while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
}
AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
+ if (!h2_ihash_empty(m->shold)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): 2. release_join with %d streams in hold",
+ m->id, (int)h2_ihash_count(m->shold));
+ }
+ if (!h2_ihash_empty(m->spurge)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): 2. release_join with %d streams to purge",
+ m->id, (int)h2_ihash_count(m->spurge));
+ }
+
/* If we still have busy workers, we cannot release our memory
- * pool yet, as slave connections have child pools of their respective
- * h2_io's.
- * Any remaining ios are processed in these workers. Any operation
- * they do on their input/outputs will be errored ECONNRESET/ABORTED,
- * so processing them should fail and workers *should* return.
+ * pool yet, as tasks have references to us.
+ * Any operation on the task slave connection will from now on
+ * be errored ECONNRESET/ABORTED, so processing them should fail
+ * and workers *should* return in a timely fashion.
*/
for (i = 0; m->workers_busy > 0; ++i) {
m->join_wait = wait;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): release_join, waiting on %d tasks to report back",
- m->id, (int)h2_ihash_count(m->tasks));
-
status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
if (APR_STATUS_IS_TIMEUP(status)) {
}
}
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
- "h2_mplx(%ld): release_join (%d tasks left) -> destroy",
- m->id, (int)h2_ihash_count(m->tasks));
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
+ if (!h2_ihash_empty(m->spurge)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): release_join %d streams to purge",
+ m->id, (int)h2_ihash_count(m->spurge));
+ purge_streams(m);
+ }
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
+
+ if (!h2_ihash_empty(m->tasks)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
+ "h2_mplx(%ld): release_join -> destroy, "
+ "%d tasks still present",
+ m->id, (int)h2_ihash_count(m->tasks));
+ }
leave_mutex(m, acquired);
h2_mplx_destroy(m);
/* all gone */
}
}
-apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
+apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
{
apr_status_t status = APR_SUCCESS;
int acquired;
- /* This maybe called from inside callbacks that already hold the lock.
- * E.g. when we are streaming out DATA and the EOF triggers the stream
- * release.
- */
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_stream *stream = h2_ihash_get(m->streams, stream_id);
- if (stream) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld-%d): marking stream as done.",
- m->id, stream_id);
- stream_done(m, stream, rst_error);
- }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld-%d): marking stream as done.",
+ m->id, stream->id);
+ stream_done(m, stream, stream->rst_error);
leave_mutex(m, acquired);
}
return status;
static int update_window(void *ctx, void *val)
{
- h2_mplx *m = ctx;
- input_consumed_signal(m, val);
+ input_consumed_signal(ctx, val);
return 1;
}
return APR_ECONNABORTED;
}
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_ihash_iter(m->tasks, update_window, m);
+ h2_ihash_iter(m->streams, update_window, m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_session(%ld): windows updated", m->id);
return 0;
}
-h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
+h2_stream *h2_mplx_next_submit(h2_mplx *m)
{
apr_status_t status;
h2_stream *stream = NULL;
h2_task *task = ctx.task;
h2_ihash_remove(m->ready_tasks, task->stream_id);
- stream = h2_ihash_get(streams, task->stream_id);
+ stream = h2_ihash_get(m->streams, task->stream_id);
if (stream && task) {
task->submitted = 1;
if (task->rst_error) {
"h2_mplx(%s): stream for response closed, "
"resetting io to close request processing",
task->id);
- task->orphaned = 1;
h2_task_rst(task, H2_ERR_STREAM_CLOSED);
if (!task->worker_started || task->worker_done) {
task_destroy(m, task, 1);
}
else {
/* hang around until the h2_task is done, but
- * shutdown input/output and send out any events asap. */
+             * shut down the output. */
h2_task_shutdown(task, 0);
- input_consumed_signal(m, task);
}
}
}
{
apr_status_t status = APR_SUCCESS;
h2_task *task = h2_ihash_get(m->tasks, stream_id);
+ h2_stream *stream = h2_ihash_get(m->streams, stream_id);
- if (!task || task->orphaned) {
+ if (!task || !stream) {
return APR_ECONNABORTED;
}
static apr_status_t out_close(h2_mplx *m, h2_task *task)
{
apr_status_t status = APR_SUCCESS;
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
- if (!task || task->orphaned) {
+ if (!task || !stream) {
return APR_ECONNABORTED;
}
}
slave->sbh = m->c->sbh;
+ slave->aborted = 0;
task = h2_task_create(slave, stream->request, stream->input, m);
h2_ihash_add(m->tasks, task);
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
- if (task) {
- if (task->frozen) {
- /* this task was handed over to an engine for processing
- * and the original worker has finished. That means the
- * engine may start processing now. */
- h2_task_thaw(task);
- /* we do not want the task to block on writing response
- * bodies into the mplx. */
- /* FIXME: this implementation is incomplete. */
- h2_task_set_io_blocking(task, 0);
- apr_thread_cond_broadcast(m->task_thawed);
- return;
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): task(%s) done", m->id, task->id);
- out_close(m, task);
-
- if (ngn) {
- apr_off_t bytes = 0;
- if (task->output.beam) {
- h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
- bytes += h2_beam_get_buffered(task->output.beam);
- }
- if (bytes > 0) {
- /* we need to report consumed and current buffered output
- * to the engine. The request will be streamed out or cancelled,
- * no more data is coming from it and the engine should update
- * its calculations before we destroy this information. */
- h2_req_engine_out_consumed(ngn, task->c, bytes);
- }
+ if (task->frozen) {
+ /* this task was handed over to an engine for processing
+ * and the original worker has finished. That means the
+ * engine may start processing now. */
+ h2_task_thaw(task);
+ /* we do not want the task to block on writing response
+ * bodies into the mplx. */
+ h2_task_set_io_blocking(task, 0);
+ apr_thread_cond_broadcast(m->task_thawed);
+ return;
+ }
+ else {
+ h2_stream *stream;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): task(%s) done", m->id, task->id);
+ out_close(m, task);
+ stream = h2_ihash_get(m->streams, task->stream_id);
+
+ if (ngn) {
+ apr_off_t bytes = 0;
+ if (task->output.beam) {
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+ bytes += h2_beam_get_buffered(task->output.beam);
}
-
- if (task->engine) {
- if (!h2_req_engine_is_shutdown(task->engine)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- "h2_mplx(%ld): task(%s) has not-shutdown "
- "engine(%s)", m->id, task->id,
- h2_req_engine_get_id(task->engine));
- }
- h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+ if (bytes > 0) {
+ /* we need to report consumed and current buffered output
+ * to the engine. The request will be streamed out or cancelled,
+ * no more data is coming from it and the engine should update
+ * its calculations before we destroy this information. */
+ h2_req_engine_out_consumed(ngn, task->c, bytes);
}
-
- if (!m->aborted && !task->orphaned && m->redo_tasks
- && h2_ihash_get(m->redo_tasks, task->stream_id)) {
- /* reset and schedule again */
- h2_task_redo(task);
- h2_ihash_remove(m->redo_tasks, task->stream_id);
- h2_iq_add(m->q, task->stream_id, NULL, NULL);
- return;
+ }
+
+ if (task->engine) {
+ if (!h2_req_engine_is_shutdown(task->engine)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): task(%s) has not-shutdown "
+ "engine(%s)", m->id, task->id,
+ h2_req_engine_get_id(task->engine));
}
-
- task->worker_done = 1;
- task->done_at = apr_time_now();
- if (task->output.beam) {
- h2_beam_on_consumed(task->output.beam, NULL, NULL);
- h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+ h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+ }
+
+ if (!m->aborted && stream && m->redo_tasks
+ && h2_ihash_get(m->redo_tasks, task->stream_id)) {
+ /* reset and schedule again */
+ h2_task_redo(task);
+ h2_ihash_remove(m->redo_tasks, task->stream_id);
+ h2_iq_add(m->q, task->stream_id, NULL, NULL);
+ return;
+ }
+
+ task->worker_done = 1;
+ task->done_at = apr_time_now();
+ if (task->output.beam) {
+ h2_beam_on_consumed(task->output.beam, NULL, NULL);
+ h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): request done, %f ms elapsed", task->id,
+ (task->done_at - task->started_at) / 1000.0);
+ if (task->started_at > m->last_idle_block) {
+ /* this task finished without causing an 'idle block', e.g.
+ * a block by flow control.
+ */
+        if (task->done_at - m->last_limit_change >= m->limit_change_interval
+ && m->workers_limit < m->workers_max) {
+ /* Well behaving stream, allow it more workers */
+ m->workers_limit = H2MIN(m->workers_limit * 2,
+ m->workers_max);
+ m->last_limit_change = task->done_at;
+ m->need_registration = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): increase worker limit to %d",
+ m->id, m->workers_limit);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%s): request done, %f ms"
- " elapsed", task->id,
- (task->done_at - task->started_at) / 1000.0);
- if (task->started_at > m->last_idle_block) {
- /* this task finished without causing an 'idle block', e.g.
- * a block by flow control.
- */
- if (task->done_at- m->last_limit_change >= m->limit_change_interval
- && m->workers_limit < m->workers_max) {
- /* Well behaving stream, allow it more workers */
- m->workers_limit = H2MIN(m->workers_limit * 2,
- m->workers_max);
- m->last_limit_change = task->done_at;
- m->need_registration = 1;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): increase worker limit to %d",
- m->id, m->workers_limit);
- }
+ }
+
+ if (stream) {
+ /* hang around until the stream deregisters */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): task_done, stream still open",
+ task->id);
+ }
+ else {
+ /* stream done, was it placed in hold? */
+ stream = h2_ihash_get(m->shold, task->stream_id);
+ if (stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): task_done, stream in hold",
+ task->id);
+ stream->response = NULL; /* ref from task memory */
+                /* We cannot destroy the stream here, since this is
+                 * called from a worker thread; freeing memory pools
+                 * is only safe from the single thread using them
+                 * (and their parent pool / allocator) */
+ h2_ihash_remove(m->shold, stream->id);
+ h2_ihash_add(m->spurge, stream);
}
-
- if (task->orphaned) {
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): task_done, stream not found",
+ task->id);
task_destroy(m, task, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
}
- else {
- /* hang around until the stream deregisters */
+
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
}
}
}
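
The hold/purge split visible here works as follows: a stream that is done on
the master connection while its task is still running would be parked in
m->shold (currently disabled by the FIXME in stream_done() above); when the
worker finishes, task_done() moves it from shold to spurge without freeing
anything, and the master thread later destroys it in purge_streams(), since
pool destruction is only safe on the thread owning the parent pool. As a
sketch:

    /* master: stream_done()   -> h2_ihash_add(m->shold, stream)
     * worker: task_done()     -> shold -> spurge (no pool destroy here)
     * master: purge_streams() -> h2_stream_destroy(stream)  (safe)
     */
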
task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (task->orphaned) {
- status = APR_ECONNABORTED;
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+
+ if (stream) {
+ status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
}
else {
- status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
+ status = APR_ECONNABORTED;
}
leave_mutex(m, acquired);
}
unsigned int need_registration : 1;
struct h2_ihash_t *streams; /* all streams currently processing */
+ struct h2_ihash_t *shold; /* all streams done with task ongoing */
+ struct h2_ihash_t *spurge; /* all streams done, ready for destroy */
struct h2_iqueue *q; /* all stream ids that need to be started */
struct h2_ihash_t *tasks; /* all tasks started and not destroyed */
- * @param rst_error if != 0, the stream was reset with the error given
+ * @param stream the stream to mark as done
*
*/
-apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
+apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream);
/**
* Waits on output data from any stream in this session to become available.
* @param m the mplxer to get a response from
 * @param bb the brigade to place any existing response body data into
*/
-struct h2_stream *h2_mplx_next_submit(h2_mplx *m,
- struct h2_ihash_t *streams);
+struct h2_stream *h2_mplx_next_submit(h2_mplx *m);
/**
* Opens the output for the given stream with the specified response.
}
headers = apr_table_make(ctx->pool, 5);
apr_table_do(set_push_header, headers, ctx->req->headers, NULL);
- req = h2_request_createn(0, ctx->pool, method, ctx->req->scheme,
- ctx->req->authority, path, headers,
- ctx->req->serialize);
+ req = h2_req_createn(0, ctx->pool, method, ctx->req->scheme,
+ ctx->req->authority, path, headers,
+ ctx->req->serialize);
/* atm, we do not push on pushes */
h2_request_end_headers(req, ctx->pool, 1, 0);
push->req = req;
#include "h2_util.h"
-h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize)
-{
- return h2_request_createn(id, pool, NULL, NULL, NULL, NULL, NULL,
- serialize);
-}
-
-h2_request *h2_request_createn(int id, apr_pool_t *pool,
- const char *method, const char *scheme,
- const char *authority, const char *path,
- apr_table_t *header, int serialize)
-{
- h2_request *req = apr_pcalloc(pool, sizeof(h2_request));
-
- req->id = id;
- req->method = method;
- req->scheme = scheme;
- req->authority = authority;
- req->path = path;
- req->headers = header? header : apr_table_make(pool, 10);
- req->request_time = apr_time_now();
- req->serialize = serialize;
-
- return req;
-}
-
static apr_status_t inspect_clen(h2_request *req, const char *s)
{
char *end;
return (s == end)? APR_EINVAL : APR_SUCCESS;
}
-static apr_status_t add_h1_header(h2_request *req, apr_pool_t *pool,
- const char *name, size_t nlen,
- const char *value, size_t vlen)
-{
- char *hname, *hvalue;
-
- if (h2_req_ignore_header(name, nlen)) {
- return APR_SUCCESS;
- }
- else if (H2_HD_MATCH_LIT("cookie", name, nlen)) {
- const char *existing = apr_table_get(req->headers, "cookie");
- if (existing) {
- char *nval;
-
- /* Cookie header come separately in HTTP/2, but need
- * to be merged by "; " (instead of default ", ")
- */
- hvalue = apr_pstrndup(pool, value, vlen);
- nval = apr_psprintf(pool, "%s; %s", existing, hvalue);
- apr_table_setn(req->headers, "Cookie", nval);
- return APR_SUCCESS;
- }
- }
- else if (H2_HD_MATCH_LIT("host", name, nlen)) {
- if (apr_table_get(req->headers, "Host")) {
- return APR_SUCCESS; /* ignore duplicate */
- }
- }
-
- hname = apr_pstrndup(pool, name, nlen);
- hvalue = apr_pstrndup(pool, value, vlen);
- h2_util_camel_case_header(hname, nlen);
- apr_table_mergen(req->headers, hname, hvalue);
-
- return APR_SUCCESS;
-}
-
-typedef struct {
- h2_request *req;
- apr_pool_t *pool;
-} h1_ctx;
-
-static int set_h1_header(void *ctx, const char *key, const char *value)
-{
- h1_ctx *x = ctx;
- size_t klen = strlen(key);
- if (!h2_req_ignore_header(key, klen)) {
- add_h1_header(x->req, x->pool, key, klen, value, strlen(value));
- }
- return 1;
-}
-
-static apr_status_t add_all_h1_header(h2_request *req, apr_pool_t *pool,
- apr_table_t *header)
-{
- h1_ctx x;
- x.req = req;
- x.pool = pool;
- apr_table_do(set_h1_header, &x, header, NULL);
- return APR_SUCCESS;
-}
-
-
-apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool,
- const char *method, const char *scheme,
- const char *authority, const char *path,
- apr_table_t *headers)
-{
- req->method = method;
- req->scheme = scheme;
- req->authority = authority;
- req->path = path;
-
- AP_DEBUG_ASSERT(req->scheme);
- AP_DEBUG_ASSERT(req->authority);
- AP_DEBUG_ASSERT(req->path);
- AP_DEBUG_ASSERT(req->method);
-
- return add_all_h1_header(req, pool, headers);
-}
-
-apr_status_t h2_request_rwrite(h2_request *req, request_rec *r)
+apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool,
+ request_rec *r)
{
apr_status_t status;
const char *scheme, *authority;
- scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme
+ scheme = apr_pstrdup(pool, r->parsed_uri.scheme? r->parsed_uri.scheme
: ap_http_scheme(r));
- authority = r->hostname;
+ authority = apr_pstrdup(pool, r->hostname);
if (!ap_strchr_c(authority, ':') && r->server && r->server->port) {
apr_port_t defport = apr_uri_port_of_scheme(scheme);
if (defport != r->server->port) {
/* port info missing and port is not default for scheme: append */
- authority = apr_psprintf(r->pool, "%s:%d", authority,
+ authority = apr_psprintf(pool, "%s:%d", authority,
(int)r->server->port);
}
}
- status = h2_request_make(req, r->pool, r->method, scheme, authority,
- apr_uri_unparse(r->pool, &r->parsed_uri,
- APR_URI_UNP_OMITSITEPART),
- r->headers_in);
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
- "h2_request(%d): rwrite %s host=%s://%s%s",
- req->id, req->method, req->scheme, req->authority, req->path);
+ status = h2_req_make(req, pool, apr_pstrdup(pool, r->method), scheme,
+ authority, apr_uri_unparse(pool, &r->parsed_uri,
+ APR_URI_UNP_OMITSITEPART),
+ r->headers_in);
return status;
}
}
else {
/* non-pseudo header, append to work bucket of stream */
- status = add_h1_header(req, pool, name, nlen, value, vlen);
+ status = h2_headers_add_h1(req->headers, pool, name, nlen, value, vlen);
}
return status;
#include "h2.h"
-h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize);
-
-h2_request *h2_request_createn(int id, apr_pool_t *pool,
- const char *method, const char *scheme,
- const char *authority, const char *path,
- apr_table_t *headers, int serialize);
-
-apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool,
- const char *method, const char *scheme,
- const char *authority, const char *path,
- apr_table_t *headers);
-
-apr_status_t h2_request_rwrite(h2_request *req, request_rec *r);
+apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool,
+ request_rec *r);
apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
const char *name, size_t nlen,
h2_stream * stream;
apr_pool_t *stream_pool;
- if (session->spare) {
- stream_pool = session->spare;
- session->spare = NULL;
- }
- else {
- apr_pool_create(&stream_pool, session->pool);
- apr_pool_tag(stream_pool, "h2_stream");
- }
+ apr_pool_create(&stream_pool, session->pool);
+ apr_pool_tag(stream_pool, "h2_stream");
stream = h2_stream_open(stream_id, stream_pool, session,
initiated_on, req);
-
+ ++session->open_streams;
+ ++session->unanswered_streams;
+ nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
h2_ihash_add(session->streams, stream);
+
if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
if (stream_id > session->remote.emitted_max) {
++session->remote.emitted_count;
return 0;
}
+static h2_stream *get_stream(h2_session *session, int stream_id)
+{
+ return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+}
+
static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
int32_t stream_id,
const uint8_t *data, size_t len, void *userp)
return 0;
}
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
"h2_stream(%ld-%d): on_data_chunk for unknown stream",
uint32_t error_code)
{
conn_rec *c = session->c;
+ apr_bucket *b;
+ apr_status_t status;
+
if (!error_code) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): handled, closing",
h2_stream_rst(stream, error_code);
}
- return h2_conn_io_writeb(&session->io,
- h2_bucket_eos_create(c->bucket_alloc, stream), 0);
+ b = h2_bucket_eos_create(c->bucket_alloc, stream);
+ APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+ status = h2_conn_io_pass(&session->io, session->bbtmp);
+ apr_brigade_cleanup(session->bbtmp);
+ return status;
}
static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
h2_stream *stream;
(void)ngh2;
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (stream) {
stream_release(session, stream, error_code);
}
/* We may see HEADERs at the start of a stream or after all DATA
* streams to carry trailers. */
(void)ngh2;
- s = h2_session_get_stream(session, frame->hd.stream_id);
+ s = get_stream(session, frame->hd.stream_id);
if (s) {
/* nop */
}
return 0;
}
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02920)
/* This can be HEADERS for a new stream, defining the request,
* or HEADER may come after DATA at the end of a stream as in
* trailers */
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
}
break;
case NGHTTP2_DATA:
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld-%d): RST_STREAM by client, errror=%d",
session->id, (int)frame->hd.stream_id,
(int)frame->rst_stream.error_code);
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream && stream->request && stream->request->initiated_on) {
++session->pushes_reset;
}
return 0;
}
-static apr_status_t pass_data(void *ctx,
- const char *data, apr_off_t length)
-{
- return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
-}
-
-
static char immortal_zeros[H2_MAX_PADLEN];
static int on_send_data_cb(nghttp2_session *ngh2,
}
padlen = (unsigned char)frame->data.padlen;
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
APLOGNO(02924)
"h2_stream(%ld-%d): send_data_cb for %ld bytes",
session->id, (int)stream_id, (long)length);
- if (h2_conn_io_is_buffered(&session->io)) {
- status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
- if (status == APR_SUCCESS) {
- if (padlen) {
- status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
- }
-
- if (status == APR_SUCCESS) {
- apr_off_t len = length;
- status = h2_stream_readx(stream, pass_data, session, &len, &eos);
- if (status == APR_SUCCESS && len != length) {
- status = APR_EINVAL;
- }
- }
-
- if (status == APR_SUCCESS && padlen) {
- if (padlen) {
- status = h2_conn_io_write(&session->io, immortal_zeros, padlen);
- }
- }
- }
+ status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
+ if (padlen && status == APR_SUCCESS) {
+ status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
}
- else {
- status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
- if (padlen && status == APR_SUCCESS) {
- status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
- }
- if (status == APR_SUCCESS) {
- apr_off_t len = length;
- status = h2_stream_read_to(stream, session->io.output, &len, &eos);
- if (status == APR_SUCCESS && len != length) {
- status = APR_EINVAL;
- }
- }
-
- if (status == APR_SUCCESS && padlen) {
- b = apr_bucket_immortal_create(immortal_zeros, padlen,
- session->c->bucket_alloc);
- status = h2_conn_io_writeb(&session->io, b, 0);
+
+ if (status == APR_SUCCESS) {
+ apr_off_t len = length;
+ status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
+ if (status == APR_SUCCESS && len != length) {
+ status = APR_EINVAL;
}
}
+ if (status == APR_SUCCESS && padlen) {
+ b = apr_bucket_immortal_create(immortal_zeros, padlen,
+ session->c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+ }
+ if (status == APR_SUCCESS) {
+ status = h2_conn_io_pass(&session->io, session->bbtmp);
+ }
+
+ apr_brigade_cleanup(session->bbtmp);
if (status == APR_SUCCESS) {
stream->data_frames_sent++;
- h2_conn_io_consider_pass(&session->io);
return 0;
}
else {
return APR_SUCCESS;
}
-static void h2_session_cleanup(h2_session *session)
+static void h2_session_destroy(h2_session *session)
{
- AP_DEBUG_ASSERT(session);
- /* This is an early cleanup of the session that may
- * discard what is no longer necessary for *new* streams
- * and general HTTP/2 processing.
- * At this point, all frames are in transit or somehwere in
- * our buffers or passed down output filters.
- * h2 streams might still being written out.
- */
- if (session->c) {
- h2_ctx_clear(session->c);
+ AP_DEBUG_ASSERT(session);
+
+ h2_ihash_clear(session->streams);
+ if (session->mplx) {
+ h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+ h2_mplx_release_and_join(session->mplx, session->iowait);
+ session->mplx = NULL;
}
+
+ ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
+ session->c->input_filters), "H2_IN");
if (session->ngh2) {
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
}
- if (session->spare) {
- apr_pool_destroy(session->spare);
- session->spare = NULL;
+ if (session->c) {
+ h2_ctx_clear(session->c);
}
-}
-static void h2_session_destroy(h2_session *session)
-{
- AP_DEBUG_ASSERT(session);
-
- h2_session_cleanup(session);
- h2_ihash_clear(session->streams);
-
if (APLOGctrace1(session->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld): destroy", session->id);
}
- if (session->mplx) {
- h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
- h2_mplx_release_and_join(session->mplx, session->iowait);
- session->mplx = NULL;
- }
if (session->pool) {
apr_pool_destroy(session->pool);
}
h2_session_receive, session);
ap_add_input_filter("H2_IN", session->cin, r, c);
- h2_conn_io_init(&session->io, c, session->config, session->pool);
+ h2_conn_io_init(&session->io, c, session->config);
session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
status = init_callbacks(c, &callbacks);
static int h2_session_resume_streams_with_data(h2_session *session)
{
AP_DEBUG_ASSERT(session);
- if (!h2_ihash_empty(session->streams)
- && session->mplx && !session->mplx->aborted) {
+ if (session->open_streams && !session->mplx->aborted) {
resume_ctx ctx;
-
ctx.session = session;
ctx.resume_count = 0;
return 0;
}
-h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
-{
- return h2_ihash_get(session->streams, stream_id);
-}
-
static ssize_t stream_data_cb(nghttp2_session *ng2s,
int32_t stream_id,
uint8_t *buf,
(void)ng2s;
(void)buf;
(void)source;
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02937)
stream->id, err);
}
- stream->submitted = 1;
+ --session->unanswered_streams;
if (stream->request && stream->request->initiated_on) {
++session->pushes_submitted;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): scheduling push stream",
session->id, stream->id);
- h2_stream_cleanup(stream);
stream = NULL;
}
++session->unsent_promises;
apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
{
- apr_pool_t *pool = h2_stream_detach_pool(stream);
- int stream_id = stream->id;
- int rst_error = stream->rst_error;
-
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): cleanup by EOS bucket destroy",
- session->id, stream_id);
- if (session->streams) {
- h2_ihash_remove(session->streams, stream_id);
- }
+ "h2_stream(%ld-%d): EOS bucket cleanup -> done",
+ session->id, stream->id);
+ h2_ihash_remove(session->streams, stream->id);
+ --session->open_streams;
+ --session->unanswered_streams;
+ h2_mplx_stream_done(session->mplx, stream);
- h2_stream_cleanup(stream);
- h2_mplx_stream_done(session->mplx, stream_id, rst_error);
- h2_stream_destroy(stream);
-
- if (pool) {
- apr_pool_clear(pool);
- if (session->spare) {
- apr_pool_destroy(session->spare);
- }
- session->spare = pool;
- }
-
return APR_SUCCESS;
}
if (has_unsubmitted_streams(session)) {
/* If we have responses ready, submit them now. */
- while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
+ while ((stream = h2_mplx_next_submit(session->mplx))) {
status = submit_response(session, stream);
++session->unsent_submits;
apr_snprintf(session->status, sizeof(session->status),
"%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
msg? msg : "-",
- (int)h2_ihash_count(session->streams),
+ (int)session->open_streams,
(int)session->remote.emitted_count,
(int)session->responses_submitted,
(int)session->pushes_submitted,
session->state = nstate;
switch (session->state) {
case H2_SESSION_ST_IDLE:
- update_child_status(session, (h2_ihash_empty(session->streams)?
+ update_child_status(session, (session->open_streams == 0?
SERVER_BUSY_KEEPALIVE
: SERVER_BUSY_READ), "idle");
break;
* CPU cycles. Ideally, we'd like to do a blocking read, but that
* is not possible if we have scheduled tasks and wait
* for them to produce something. */
- if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
- }
- if (h2_ihash_empty(session->streams)) {
+ if (!session->open_streams) {
if (!is_accepting_streams(session)) {
/* We are no longer accepting new streams and have
* finished processing existing ones. Time to leave. */
* new output data from task processing,
* switch to blocking reads. We are probably waiting on
* window updates. */
+ if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ return;
+ }
transit(session, "no io", H2_SESSION_ST_IDLE);
session->idle_until = apr_time_now() + session->s->timeout;
session->keep_sync_until = session->idle_until;
break;
case H2_SESSION_ST_IDLE:
- /* make certain, the client receives everything before we idle */
- if (!session->keep_sync_until
- && async && h2_ihash_empty(session->streams)
+        /* make certain we send everything before we idle */
+ if (!session->keep_sync_until && async && !session->open_streams
&& !session->r && session->remote.emitted_count) {
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): async idle, nonblock read", session->id);
}
}
- if (!h2_ihash_empty(session->streams)) {
- /* resume any streams for which data is available again */
+ if (session->open_streams) {
+ /* resume any streams with output data */
h2_session_resume_streams_with_data(session);
/* Submit any responses/push_promises that are ready */
status = h2_session_submit(session);
session->start_wait = apr_time_now();
if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ break;
}
}
else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
session->wait_us = 0;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
- else if (status == APR_TIMEUP) {
+ else if (APR_STATUS_IS_TIMEUP(status)) {
/* go back to checking all inputs again */
transit(session, "wait cycle", session->local.accepting?
H2_SESSION_ST_BUSY : H2_SESSION_ST_LOCAL_SHUTDOWN);
}
+ else if (APR_STATUS_IS_ECONNRESET(status)
+ || APR_STATUS_IS_ECONNABORTED(status)) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
"h2_session(%ld): waiting on conditional",
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): [%s] process returns",
session->id, state_name(session->state));
-
+
if ((session->state != H2_SESSION_ST_DONE)
&& (APR_STATUS_IS_EOF(status)
|| APR_STATUS_IS_ECONNRESET(status)
struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
+ int open_streams; /* number of streams open */
+ int unanswered_streams; /* number of streams waiting for response */
int unsent_submits; /* number of submitted, but not yet written responses. */
int unsent_promises; /* number of submitted, but not yet written push promised */
apr_bucket_brigade *bbtmp; /* brigade for keeping temporary data */
struct apr_thread_cond_t *iowait; /* our cond when trywaiting for data */
- apr_pool_t *spare; /* spare stream pool */
-
char status[64]; /* status message for scoreboard */
int last_status_code; /* the one already reported */
const char *last_status_msg; /* the one already reported */
apr_status_t h2_session_handle_response(h2_session *session,
struct h2_stream *stream);
-/* Get the h2_stream for the given stream idenrtifier. */
-struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id);
-
/**
* Create and register a new stream under the given id.
*
/*CL*/{ 1, 1, 0, 0, 1, 1, 1 },
};
-#define H2_STREAM_OUT_LOG(lvl,s,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
- h2_util_bb_log((s)->session->c,(s)->session->id,lvl,msg,(s)->buffer); \
- } while(0)
-
+static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, char *tag)
+{
+ if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
+ conn_rec *c = s->session->c;
+ char buffer[4 * 1024];
+ const char *line = "(null)";
+ apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
+
+ len = h2_util_bb_print(buffer, bmax, tag, "", s->buffer);
+ ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%ld-%d): %s",
+ c->id, s->id, len? buffer : line);
+ }
+}
static int set_state(h2_stream *stream, h2_stream_state_t state)
{
}
}
+static apr_status_t stream_pool_cleanup(void *ctx)
+{
+ h2_stream *stream = ctx;
+ apr_status_t status;
+
+ if (stream->input) {
+ h2_beam_destroy(stream->input);
+ stream->input = NULL;
+ }
+ if (stream->files) {
+ apr_file_t *file;
+ int i;
+ for (i = 0; i < stream->files->nelts; ++i) {
+ file = APR_ARRAY_IDX(stream->files, i, apr_file_t*);
+ status = apr_file_close(file);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, stream->session->c,
+ "h2_stream(%ld-%d): destroy, closed file %d",
+ stream->session->id, stream->id, i);
+ }
+ stream->files = NULL;
+ }
+ return APR_SUCCESS;
+}
+
h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
int initiated_on, const h2_request *creq)
{
req->initiated_on = initiated_on;
}
else {
- req = h2_request_create(id, pool,
+ req = h2_req_create(id, pool,
h2_config_geti(session->config, H2_CONF_SER_HEADERS));
}
stream->request = req;
+ apr_pool_cleanup_register(pool, stream, stream_pool_cleanup,
+ apr_pool_cleanup_null);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
"h2_stream(%ld-%d): opened", session->id, stream->id);
return stream;
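The cleanup registered above is tied to the stream pool, so beams and
collected files are released even when the stream goes away through plain
pool destruction. A minimal, self-contained sketch of that APR idiom
(demo names are illustrative, not part of the patch):

    #include <apr_pools.h>

    /* hypothetical cleanup, mirroring stream_pool_cleanup above */
    static apr_status_t demo_cleanup(void *ctx)
    {
        /* release resources owned by ctx here */
        return APR_SUCCESS;
    }

    static void demo(apr_pool_t *parent)
    {
        apr_pool_t *pool;
        apr_pool_create(&pool, parent);
        apr_pool_cleanup_register(pool, NULL, demo_cleanup,
                                  apr_pool_cleanup_null);
        apr_pool_destroy(pool); /* demo_cleanup runs here */
    }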
void h2_stream_cleanup(h2_stream *stream)
{
AP_DEBUG_ASSERT(stream);
- if (stream->input) {
- h2_beam_destroy(stream->input);
- stream->input = NULL;
- }
if (stream->buffer) {
apr_brigade_cleanup(stream->buffer);
}
+ if (stream->input) {
+ apr_status_t status;
+ status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
+ if (status == APR_EAGAIN) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ "h2_stream(%ld-%d): wait on input shutdown",
+ stream->session->id, stream->id);
+ status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
+ "h2_stream(%ld-%d): input shutdown returned",
+ stream->session->id, stream->id);
+ }
+ }
}
void h2_stream_destroy(h2_stream *stream)
{
AP_DEBUG_ASSERT(stream);
- h2_stream_cleanup(stream);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
+ "h2_stream(%ld-%d): destroy",
+ stream->session->id, stream->id);
if (stream->pool) {
apr_pool_destroy(stream->pool);
}
return APR_ECONNRESET;
}
set_state(stream, H2_STREAM_ST_OPEN);
- status = h2_request_rwrite(stream->request, r);
+ status = h2_request_rwrite(stream->request, stream->pool, r);
stream->request->serialize = h2_config_geti(h2_config_rget(r),
H2_CONF_SER_HEADERS);
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
+ "h2_request(%d): rwrite %s host=%s://%s%s",
+ stream->request->id, stream->request->method,
+ stream->request->scheme, stream->request->authority,
+ stream->request->path);
return status;
}
static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
{
+ conn_rec *c = stream->session->c;
+ apr_bucket *b;
+ apr_status_t status;
+
if (!stream->output) {
return APR_EOF;
}
- return h2_beam_receive(stream->output, stream->buffer,
- APR_NONBLOCK_READ, amount);
+ status = h2_beam_receive(stream->output, stream->buffer,
+ APR_NONBLOCK_READ, amount);
+ /* The buckets we receive use the stream->buffer pool for their
+ * lifetime, which is exactly what we want, since that is stream->pool.
+ *
+ * However: when we send these buckets down the core output filters, a
+ * filter might decide to set them aside into a pool of its own. And it
+ * might decide, after having sent the buckets, to clear its pool.
+ *
+ * This is problematic for file buckets, because that closes the
+ * contained file. Any split-off buckets we send afterwards will then
+ * fail with APR_EBADF.
+ */
+ for (b = APR_BRIGADE_FIRST(stream->buffer);
+ b != APR_BRIGADE_SENTINEL(stream->buffer);
+ b = APR_BUCKET_NEXT(b)) {
+ if (APR_BUCKET_IS_FILE(b)) {
+ apr_bucket_file *f = (apr_bucket_file *)b->data;
+ apr_pool_t *fpool = apr_file_pool_get(f->fd);
+ if (fpool != c->pool) {
+ apr_bucket_setaside(b, c->pool);
+ if (!stream->files) {
+ stream->files = apr_array_make(stream->pool,
+ 5, sizeof(apr_file_t*));
+ }
+ APR_ARRAY_PUSH(stream->files, apr_file_t*) = f->fd;
+ }
+ }
+ }
+ return status;
}
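The guard in fill_buffer generalizes: a file bucket that may outlive its
originating pool can be rebound to a longer-lived pool before it is passed
on. A condensed sketch of the idiom using only public APR bucket calls
(function name hypothetical):

    #include <apr_buckets.h>

    /* keep the file open past the brigade pool's lifetime */
    static void keep_file_alive(apr_bucket *b, apr_pool_t *lasting)
    {
        if (APR_BUCKET_IS_FILE(b)) {
            apr_bucket_file *f = (apr_bucket_file *)b->data;
            if (apr_file_pool_get(f->fd) != lasting) {
                /* rebinds the bucket, and with it the file, to 'lasting' */
                apr_bucket_setaside(b, lasting);
            }
        }
    }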
apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
return status;
}
+/* DATA chunk size: 16 KB minus the 9 byte HTTP/2 frame header and
+ * ~100 bytes of headroom for TLS record overhead */
+static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9);
+
apr_status_t h2_stream_out_prepare(h2_stream *stream,
apr_off_t *plen, int *peos)
{
conn_rec *c = stream->session->c;
apr_status_t status = APR_SUCCESS;
- apr_off_t requested = (*plen > 0)? *plen : 32*1024;
+ apr_off_t requested;
if (stream->rst_error) {
*plen = 0;
return APR_ECONNRESET;
}
+ if (*plen > 0) {
+ requested = H2MIN(*plen, DATA_CHUNK_SIZE);
+ }
+ else {
+ requested = DATA_CHUNK_SIZE;
+ }
+ *plen = requested;
+
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
h2_util_bb_avail(stream->buffer, plen, peos);
- if (!*peos && !*plen) {
+ if (!*peos && *plen < requested) {
/* try to get more data */
- status = fill_buffer(stream, H2MIN(requested, 32*1024));
+ status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE);
if (APR_STATUS_IS_EOF(status)) {
apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
}
-apr_status_t h2_stream_readx(h2_stream *stream,
- h2_io_data_cb *cb, void *ctx,
- apr_off_t *plen, int *peos)
-{
- conn_rec *c = stream->session->c;
- apr_status_t status = APR_SUCCESS;
-
- if (stream->rst_error) {
- return APR_ECONNRESET;
- }
- status = h2_util_bb_readx(stream->buffer, cb, ctx, plen, peos);
- if (status == APR_SUCCESS && !*peos && !*plen) {
- status = APR_EAGAIN;
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
- "h2_stream(%ld-%d): readx, len=%ld eos=%d",
- c->id, stream->id, (long)*plen, *peos);
- return status;
-}
-
-
apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
apr_off_t *plen, int *peos)
{
struct h2_bucket_beam *output;
apr_bucket_brigade *buffer;
apr_bucket_brigade *tmp;
+ apr_array_header_t *files; /* apr_file_t* we collected during I/O */
int rst_error; /* stream error for RST_STREAM */
unsigned int aborted : 1; /* was aborted */
unsigned int submitted : 1; /* response HEADER has been sent */
apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */
-
apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
};
apr_status_t h2_stream_out_prepare(h2_stream *stream,
apr_off_t *plen, int *peos);
-/**
- * Read data from the stream output.
- *
- * @param stream the stream to read from
- * @param cb callback to invoke for byte chunks read. Might be invoked
- * multiple times (with different values) for one read operation.
- * @param ctx context data for callback
- * @param plen (in-/out) max. number of bytes to read and on return actual
- * number of bytes read
- * @param peos (out) != 0 iff end of stream has been reached while reading
- * @return APR_SUCCESS if out information was computed successfully.
- * APR_EAGAIN if not data is available and end of stream has not been
- * reached yet.
- */
-apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb,
- void *ctx, apr_off_t *plen, int *peos);
-
/**
* Read a maximum number of bytes into the bucket brigade.
*
if (task->output.beam) {
h2_beam_abort(task->output.beam);
}
+ if (task->c) {
+ task->c->aborted = 1;
+ }
}
apr_status_t h2_task_shutdown(h2_task *task, int block)
unsigned int frozen : 1;
unsigned int blocking : 1;
unsigned int detached : 1;
- unsigned int orphaned : 1; /* h2_stream is gone for this task */
unsigned int submitted : 1; /* response has been submitted to client */
unsigned int worker_started : 1; /* h2_worker started processing for this io */
unsigned int worker_done : 1; /* h2_worker finished for this io */
#include <nghttp2/nghttp2.h>
-#include "h2_private.h"
-#include "h2_request.h"
+#include "h2.h"
#include "h2_util.h"
/* h2_log2(n) iff n is a power of 2 */
return off;
}
-void h2_util_bb_log(conn_rec *c, int stream_id, int level,
- const char *tag, apr_bucket_brigade *bb)
-{
- char buffer[4 * 1024];
- const char *line = "(null)";
- apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
-
- len = h2_util_bb_print(buffer, bmax, tag, "", bb);
- /* Intentional no APLOGNO */
- ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d): %s",
- c->id, stream_id, len? buffer : line);
-}
-
apr_status_t h2_append_brigade(apr_bucket_brigade *to,
apr_bucket_brigade *from,
apr_off_t *plen,
if (APR_BUCKET_IS_METADATA(e)) {
if (APR_BUCKET_IS_EOS(e)) {
*peos = 1;
+ apr_bucket_delete(e);
+ continue;
}
}
else {
|| ignore_header(H2_LIT_ARGS(IgnoredProxyRespHds), name, len));
}
+apr_status_t h2_headers_add_h1(apr_table_t *headers, apr_pool_t *pool,
+ const char *name, size_t nlen,
+ const char *value, size_t vlen)
+{
+ char *hname, *hvalue;
+
+ if (h2_req_ignore_header(name, nlen)) {
+ return APR_SUCCESS;
+ }
+ else if (H2_HD_MATCH_LIT("cookie", name, nlen)) {
+ const char *existing = apr_table_get(headers, "cookie");
+ if (existing) {
+ char *nval;
+
+ /* Cookie headers arrive separately in HTTP/2, but need
+ * to be merged by "; " (instead of the default ", ")
+ */
+ hvalue = apr_pstrndup(pool, value, vlen);
+ nval = apr_psprintf(pool, "%s; %s", existing, hvalue);
+ apr_table_setn(headers, "Cookie", nval);
+ return APR_SUCCESS;
+ }
+ }
+ else if (H2_HD_MATCH_LIT("host", name, nlen)) {
+ if (apr_table_get(headers, "Host")) {
+ return APR_SUCCESS; /* ignore duplicate */
+ }
+ }
+
+ hname = apr_pstrndup(pool, name, nlen);
+ hvalue = apr_pstrndup(pool, value, vlen);
+ h2_util_camel_case_header(hname, nlen);
+ apr_table_mergen(headers, hname, hvalue);
+
+ return APR_SUCCESS;
+}
+
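Taken together, the cookie branch collapses the multiple cookie header
fields that HTTP/2 permits into a single HTTP/1-style field joined by
"; ". A small usage sketch, assuming an initialized pool (demo function
is illustrative only):

    #include <apr_pools.h>
    #include <apr_tables.h>
    #include "h2_util.h"   /* declares h2_headers_add_h1 */

    static void demo(apr_pool_t *pool)
    {
        apr_table_t *headers = apr_table_make(pool, 5);
        h2_headers_add_h1(headers, pool, "cookie", 6, "a=1", 3);
        h2_headers_add_h1(headers, pool, "cookie", 6, "b=2", 3);
        /* apr_table_get(headers, "Cookie") now yields "a=1; b=2" */
    }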
+/*******************************************************************************
+ * h2 request handling
+ ******************************************************************************/
+
+h2_request *h2_req_createn(int id, apr_pool_t *pool, const char *method,
+ const char *scheme, const char *authority,
+ const char *path, apr_table_t *header, int serialize)
+{
+ h2_request *req = apr_pcalloc(pool, sizeof(h2_request));
+
+ req->id = id;
+ req->method = method;
+ req->scheme = scheme;
+ req->authority = authority;
+ req->path = path;
+ req->headers = header? header : apr_table_make(pool, 10);
+ req->request_time = apr_time_now();
+ req->serialize = serialize;
+
+ return req;
+}
+
+h2_request *h2_req_create(int id, apr_pool_t *pool, int serialize)
+{
+ return h2_req_createn(id, pool, NULL, NULL, NULL, NULL, NULL, serialize);
+}
+
+typedef struct {
+ apr_table_t *headers;
+ apr_pool_t *pool;
+} h1_ctx;
+
+static int set_h1_header(void *ctx, const char *key, const char *value)
+{
+ h1_ctx *x = ctx;
+ size_t klen = strlen(key);
+ if (!h2_req_ignore_header(key, klen)) {
+ h2_headers_add_h1(x->headers, x->pool, key, klen, value, strlen(value));
+ }
+ return 1;
+}
+
+apr_status_t h2_req_make(h2_request *req, apr_pool_t *pool,
+ const char *method, const char *scheme,
+ const char *authority, const char *path,
+ apr_table_t *headers)
+{
+ h1_ctx x;
+
+ req->method = method;
+ req->scheme = scheme;
+ req->authority = authority;
+ req->path = path;
+
+ AP_DEBUG_ASSERT(req->scheme);
+ AP_DEBUG_ASSERT(req->authority);
+ AP_DEBUG_ASSERT(req->path);
+ AP_DEBUG_ASSERT(req->method);
+
+ x.pool = pool;
+ x.headers = req->headers;
+ apr_table_do(set_h1_header, &x, headers, NULL);
+ return APR_SUCCESS;
+}
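With these helpers, callers either build a request in one call or create
an empty shell and fill it from an existing header table. A minimal
sketch, assuming a live apr_pool_t *pool (values illustrative only):

    /* request shell for GET https://example.org/ */
    h2_request *req = h2_req_createn(1, pool, "GET", "https",
                                     "example.org", "/", NULL, 0);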
/*******************************************************************************
* frame logging
h2_ngheader *h2_util_ngheader_make_req(apr_pool_t *p,
const struct h2_request *req);
+apr_status_t h2_headers_add_h1(apr_table_t *headers, apr_pool_t *pool,
+ const char *name, size_t nlen,
+ const char *value, size_t vlen);
+
+/*******************************************************************************
+ * h2_request helpers
+ ******************************************************************************/
+
+struct h2_request *h2_req_createn(int id, apr_pool_t *pool, const char *method,
+ const char *scheme, const char *authority,
+ const char *path, apr_table_t *header,
+ int serialize);
+struct h2_request *h2_req_create(int id, apr_pool_t *pool, int serialize);
+
+apr_status_t h2_req_make(struct h2_request *req, apr_pool_t *pool,
+ const char *method, const char *scheme,
+ const char *authority, const char *path,
+ apr_table_t *headers);
+
/*******************************************************************************
* apr brigade helpers
******************************************************************************/
* @param tag a short message text about the context
* @param bb the brigade to log
*/
-void h2_util_bb_log(conn_rec *c, int stream_id, int level,
- const char *tag, apr_bucket_brigade *bb);
+#define h2_util_bb_log(c, i, level, tag, bb) \
+do { \
+ char buffer[4 * 1024]; \
+ const char *line = "(null)"; \
+ apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \
+ len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \
+ ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%ld-%d): %s", \
+ (c)->id, (int)(i), (len? buffer : line)); \
+} while(0)
+
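As a macro, the logger now expands at each call site; usage stays the
same as with the removed h2_util_bb_log function, e.g. (variable names
assumed):

    h2_util_bb_log(c, stream->id, APLOG_TRACE2, "h2_stream send", bb);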
/**
* Transfer buckets from one brigade to another with a limit on the
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.5.2"
+#define MOD_HTTP2_VERSION "1.5.3"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010502
+#define MOD_HTTP2_VERSION_NUM 0x010503
#endif /* mod_h2_h2_version_h */