Changes with Apache 2.4.19
+ *) mod_http2: fixes problem with wrong lifetime of file buckets on main
+ connection. [Stefan Eissing]
+
*) mod_http2: fixes incorrect denial of requests without :authority header.
[Stefan Eissing]
/** End Of HTTP/2 SESSION (H2EOC) bucket */
extern const apr_bucket_type_t h2_bucket_type_eoc;
+#define H2_BUCKET_IS_H2EOC(e) (e->type == &h2_bucket_type_eoc)
apr_bucket * h2_bucket_eoc_make(apr_bucket *b,
struct h2_session *session);
/** End Of HTTP/2 STREAM (H2EOS) bucket */
extern const apr_bucket_type_t h2_bucket_type_eos;
+#define H2_BUCKET_IS_H2EOS(e) (e->type == &h2_bucket_type_eos)
apr_bucket *h2_bucket_eos_make(apr_bucket *b, struct h2_stream *stream);
}
apr_pool_create_ex(&pool, parent, NULL, allocator);
apr_pool_tag(pool, "h2_slave_conn");
- apr_allocator_owner_set(allocator, parent);
+ apr_allocator_owner_set(allocator, pool);
c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec));
if (c == NULL) {
void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator)
{
+ apr_pool_t *parent;
apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
"h2_slave_conn(%ld): destroy (task=%s)", slave->id,
apr_table_get(slave->notes, H2_TASK_ID_NOTE));
- apr_pool_destroy(slave->pool);
- if (pallocator) {
+ /* Attache the allocator to the parent pool and return it for
+ * reuse, otherwise the own is still the slave pool and it will
+ * get destroyed with it. */
+ parent = apr_pool_parent_get(slave->pool);
+ if (pallocator && parent) {
+ apr_allocator_owner_set(allocator, parent);
*pallocator = allocator;
}
- else {
- apr_allocator_destroy(allocator);
- }
+ apr_pool_destroy(slave->pool);
}
*/
#include <assert.h>
-
+#include <apr_strings.h>
#include <ap_mpm.h>
#include <httpd.h>
#include <http_core.h>
#include <http_log.h>
#include <http_connection.h>
+#include <http_request.h>
#include "h2_private.h"
#include "h2_bucket_eoc.h"
+#include "h2_bucket_eos.h"
#include "h2_config.h"
#include "h2_conn_io.h"
#include "h2_h2.h"
* which seems to create less TCP packets overall
*/
#define WRITE_SIZE_MAX (TLS_DATA_MAX - 100)
-
#define WRITE_BUFFER_SIZE (5*WRITE_SIZE_MAX)
+
+static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level,
+ const char *tag, apr_bucket_brigade *bb)
+{
+ char buffer[16 * 1024];
+ const char *line = "(null)";
+ apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]);
+ int off = 0;
+ apr_bucket *b;
+
+ if (bb) {
+ memset(buffer, 0, bmax--);
+ for (b = APR_BRIGADE_FIRST(bb);
+ bmax && (b != APR_BRIGADE_SENTINEL(bb));
+ b = APR_BUCKET_NEXT(b)) {
+
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (APR_BUCKET_IS_EOS(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "eos ");
+ }
+ else if (APR_BUCKET_IS_FLUSH(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "flush ");
+ }
+ else if (AP_BUCKET_IS_EOR(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "eor ");
+ }
+ else if (H2_BUCKET_IS_H2EOC(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "h2eoc ");
+ }
+ else if (H2_BUCKET_IS_H2EOS(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "h2eos ");
+ }
+ else {
+ off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) ");
+ }
+ }
+ else {
+ const char *btype = "data";
+ if (APR_BUCKET_IS_FILE(b)) {
+ btype = "file";
+ }
+ else if (APR_BUCKET_IS_PIPE(b)) {
+ btype = "pipe";
+ }
+ else if (APR_BUCKET_IS_SOCKET(b)) {
+ btype = "socket";
+ }
+ else if (APR_BUCKET_IS_HEAP(b)) {
+ btype = "heap";
+ }
+ else if (APR_BUCKET_IS_TRANSIENT(b)) {
+ btype = "transient";
+ }
+ else if (APR_BUCKET_IS_IMMORTAL(b)) {
+ btype = "immortal";
+ }
+#if APR_HAS_MMAP
+ else if (APR_BUCKET_IS_MMAP(b)) {
+ btype = "mmap";
+ }
+#endif
+ else if (APR_BUCKET_IS_POOL(b)) {
+ btype = "pool";
+ }
+
+ off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ",
+ btype,
+ (long)(b->length == ((apr_size_t)-1)?
+ -1 : b->length));
+ }
+ }
+ line = *buffer? buffer : "(empty)";
+ }
+ /* Intentional no APLOGNO */
+ ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s",
+ c->id, stream_id, tag, line);
+
+}
+
apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
const h2_config *cfg,
apr_pool_t *pool)
{
- io->connection = c;
- io->output = apr_brigade_create(pool, c->bucket_alloc);
- io->buflen = 0;
- io->is_tls = h2_h2_is_tls(c);
- io->buffer_output = io->is_tls;
+ io->c = c;
+ io->output = apr_brigade_create(pool, c->bucket_alloc);
+ io->buflen = 0;
+ io->is_tls = h2_h2_is_tls(c);
+ io->buffer_output = io->is_tls;
if (io->buffer_output) {
io->bufsize = WRITE_BUFFER_SIZE;
}
if (io->is_tls) {
- /* That is where we start with,
- * see https://issues.apache.org/jira/browse/TS-2503 */
+ /* This is what we start with,
+ * see https://issues.apache.org/jira/browse/TS-2503
+ */
io->warmup_size = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE);
io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS)
* APR_USEC_PER_SEC);
}
if (APLOGctrace1(c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
- "h2_conn_io(%ld): init, buffering=%d, warmup_size=%ld, cd_secs=%f",
- io->connection->id, io->buffer_output, (long)io->warmup_size,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
+ "h2_conn_io(%ld): init, buffering=%d, warmup_size=%ld, "
+ "cd_secs=%f", io->c->id, io->buffer_output,
+ (long)io->warmup_size,
((float)io->cooldown_usecs/APR_USEC_PER_SEC));
}
}
ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_WRITE, c);
- status = apr_brigade_length(bb, 0, &bblen);
- if (status == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03044)
+ apr_brigade_length(bb, 0, &bblen);
+ h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb);
+ status = ap_pass_brigade(c->output_filters, bb);
+ if (status == APR_SUCCESS && pctx->io) {
+ pctx->io->bytes_written += (apr_size_t)bblen;
+ pctx->io->last_write = apr_time_now();
+ }
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
"h2_conn_io(%ld): pass_out brigade %ld bytes",
c->id, (long)bblen);
- status = ap_pass_brigade(c->output_filters, bb);
- if (status == APR_SUCCESS && pctx->io) {
- pctx->io->bytes_written += (apr_size_t)bblen;
- pctx->io->last_write = apr_time_now();
- }
}
apr_brigade_cleanup(bb);
return status;
/* long time not written, reset write size */
io->write_size = WRITE_SIZE_INITIAL;
io->bytes_written = 0;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
"h2_conn_io(%ld): timeout write size reset to %ld",
- (long)io->connection->id, (long)io->write_size);
+ (long)io->c->id, (long)io->write_size);
}
else if (io->write_size < WRITE_SIZE_MAX
&& io->bytes_written >= io->warmup_size) {
/* connection is hot, use max size */
io->write_size = WRITE_SIZE_MAX;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
"h2_conn_io(%ld): threshold reached, write size now %ld",
- (long)io->connection->id, (long)io->write_size);
+ (long)io->c->id, (long)io->write_size);
}
bcount = (int)(remaining / io->write_size);
return APR_SUCCESS;
}
-static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int force, int eoc)
+static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
{
- if (io->buflen > 0 || !APR_BRIGADE_EMPTY(io->output)) {
- pass_out_ctx ctx;
-
- if (io->buflen > 0) {
- /* something in the buffer, put it in the output brigade */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
- "h2_conn_io: flush, flushing %ld bytes", (long)io->buflen);
- bucketeer_buffer(io);
- }
-
- if (force) {
- APR_BRIGADE_INSERT_TAIL(io->output,
- apr_bucket_flush_create(io->output->bucket_alloc));
- }
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
- "h2_conn_io: flush");
- /* Send it out */
- io->buflen = 0;
- ctx.c = io->connection;
- ctx.io = eoc? NULL : io;
+ pass_out_ctx ctx;
+ apr_bucket *b;
+
+ if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) {
+ return APR_SUCCESS;
+ }
- return pass_out(io->output, &ctx);
- /* no more access after this, as we might have flushed an EOC bucket
- * that de-allocated us all. */
+ if (io->buflen > 0) {
+ /* something in the buffer, put it in the output brigade */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
+ "h2_conn_io: flush, flushing %ld bytes",
+ (long)io->buflen);
+ bucketeer_buffer(io);
}
- return APR_SUCCESS;
-}
-
-apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush)
-{
- return h2_conn_io_flush_int(io, flush, 0);
+
+ if (flush) {
+ b = apr_bucket_flush_create(io->c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+ }
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
+ io->buflen = 0;
+ ctx.c = io->c;
+ ctx.io = eoc? NULL : io;
+
+ return pass_out(io->output, &ctx);
+ /* no more access after this, as we might have flushed an EOC bucket
+ * that de-allocated us all. */
}
apr_status_t h2_conn_io_flush(h2_conn_io *io)
{
- /* make sure we always write a flush, even if our buffers are empty.
- * We want to flush not only our buffers, but alse ones further down
- * the connection filters. */
- apr_bucket *b = apr_bucket_flush_create(io->connection->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- return h2_conn_io_flush_int(io, 0, 0);
+ return h2_conn_io_flush_int(io, 1, 0);
}
apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
apr_off_t len = 0;
if (!APR_BRIGADE_EMPTY(io->output)) {
- apr_brigade_length(io->output, 0, &len);
+ len = h2_brigade_mem_size(io->output);
}
len += io->buflen;
if (len >= WRITE_BUFFER_SIZE) {
- return h2_conn_io_pass(io, 0);
+ return h2_conn_io_flush_int(io, 1, 0);
}
return APR_SUCCESS;
}
apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session)
{
- apr_bucket *b = h2_bucket_eoc_create(io->connection->bucket_alloc, session);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- b = apr_bucket_flush_create(io->connection->bucket_alloc);
+ apr_bucket *b = h2_bucket_eoc_create(io->c->bucket_alloc, session);
APR_BRIGADE_INSERT_TAIL(io->output, b);
return h2_conn_io_flush_int(io, 0, 1);
}
apr_status_t status = APR_SUCCESS;
pass_out_ctx ctx;
- ctx.c = io->connection;
+ ctx.c = io->c;
ctx.io = io;
if (io->bufsize > 0) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
"h2_conn_io: buffering %ld bytes", (long)length);
if (!APR_BRIGADE_EMPTY(io->output)) {
- status = h2_conn_io_pass(io, 0);
+ status = h2_conn_io_flush_int(io, 0, 0);
}
while (length > 0 && (status == APR_SUCCESS)) {
apr_size_t avail = io->bufsize - io->buflen;
if (avail <= 0) {
- h2_conn_io_pass(io, 0);
+ status = h2_conn_io_flush_int(io, 0, 0);
}
else if (length > avail) {
memcpy(io->buffer + io->buflen, buf, avail);
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->connection,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->c,
"h2_conn_io: writing %ld bytes to brigade", (long)length);
status = apr_brigade_write(io->output, pass_out, &ctx, buf, length);
}
* directly without copying.
*/
typedef struct {
- conn_rec *connection;
+ conn_rec *c;
apr_bucket_brigade *output;
int is_tls;
* @param io the connection io
* @param flush if a flush bucket should be appended to any output
*/
-apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush);
apr_status_t h2_conn_io_flush(h2_conn_io *io);
/**
#include <http_core.h>
#include <http_log.h>
#include <http_connection.h>
+#include <http_request.h>
#include "h2_private.h"
#include "h2_h2.h"
#include "h2_task.h"
#include "h2_util.h"
-h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request)
+h2_io *h2_io_create(int id, apr_pool_t *pool,
+ apr_bucket_alloc_t *bucket_alloc,
+ const h2_request *request)
{
h2_io *io = apr_pcalloc(pool, sizeof(*io));
if (io) {
io->id = id;
io->pool = pool;
- io->bucket_alloc = apr_bucket_alloc_create(pool);
+ io->bucket_alloc = bucket_alloc;
io->request = h2_request_clone(pool, request);
}
return io;
}
+static void check_bbin(h2_io *io)
+{
+ if (!io->bbin) {
+ io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
+ }
+}
+
+static void check_bbout(h2_io *io)
+{
+ if (!io->bbout) {
+ io->bbout = apr_brigade_create(io->pool, io->bucket_alloc);
+ }
+}
+
+static void check_bbtmp(h2_io *io)
+{
+ if (!io->bbtmp) {
+ io->bbtmp = apr_brigade_create(io->pool, io->bucket_alloc);
+ }
+}
+
+static void append_eos(h2_io *io, apr_bucket_brigade *bb)
+{
+ APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(io->bucket_alloc));
+}
+
void h2_io_redo(h2_io *io)
{
io->worker_started = 0;
if (io->bbout) {
apr_brigade_cleanup(io->bbout);
}
- if (io->tmp) {
- apr_brigade_cleanup(io->tmp);
+ if (io->bbtmp) {
+ apr_brigade_cleanup(io->bbtmp);
}
io->started_at = io->done_at = 0;
}
}
}
-
void h2_io_rst(h2_io *io, int error)
{
io->rst_error = error;
io->eos_in = 1;
}
-int h2_io_in_has_eos_for(h2_io *io)
-{
- return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1));
-}
-
-int h2_io_in_has_data(h2_io *io)
-{
- return io->bbin && h2_util_bb_has_data_or_eos(io->bbin);
-}
-
int h2_io_out_has_data(h2_io *io)
{
return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
return (status == APR_SUCCESS);
}
-static apr_status_t append_eos(h2_io *io, apr_bucket_brigade *bb,
- apr_table_t *trailers)
+static apr_status_t in_append_eos(h2_io *io, apr_bucket_brigade *bb,
+ apr_table_t *trailers)
{
apr_status_t status = APR_SUCCESS;
apr_table_t *t = io->request->trailers;
status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
}
}
- APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(io->bucket_alloc));
+ append_eos(io, bb);
return status;
}
if (!io->bbin || APR_BRIGADE_EMPTY(io->bbin)) {
if (io->eos_in) {
if (!io->eos_in_written) {
- status = append_eos(io, bb, trailers);
+ status = in_append_eos(io, bb, trailers);
io->eos_in_written = 1;
return status;
}
if (io->request->chunked) {
/* the reader expects HTTP/1.1 chunked encoding */
- status = h2_util_move(io->tmp, io->bbin, maxlen, NULL, "h2_io_in_read_chunk");
+ check_bbtmp(io);
+ status = h2_util_move(io->bbtmp, io->bbin, maxlen, NULL, "h2_io_in_read_chunk");
if (status == APR_SUCCESS) {
apr_off_t tmp_len = 0;
- apr_brigade_length(io->tmp, 1, &tmp_len);
+ apr_brigade_length(io->bbtmp, 1, &tmp_len);
if (tmp_len > 0) {
io->input_consumed += tmp_len;
status = apr_brigade_printf(bb, NULL, NULL, "%lx\r\n",
(unsigned long)tmp_len);
if (status == APR_SUCCESS) {
- status = h2_util_move(bb, io->tmp, -1, NULL, "h2_io_in_read_tmp1");
+ status = h2_util_move(bb, io->bbtmp, -1, NULL, "h2_io_in_read_tmp1");
if (status == APR_SUCCESS) {
status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
}
}
}
else {
- status = h2_util_move(bb, io->tmp, -1, NULL, "h2_io_in_read_tmp2");
+ status = h2_util_move(bb, io->bbtmp, -1, NULL, "h2_io_in_read_tmp2");
}
- apr_brigade_cleanup(io->tmp);
+ apr_brigade_cleanup(io->bbtmp);
}
}
else {
if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) {
if (io->eos_in) {
if (!io->eos_in_written) {
- status = append_eos(io, bb, trailers);
+ status = in_append_eos(io, bb, trailers);
io->eos_in_written = 1;
}
}
return status;
}
-apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb)
+apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos)
{
if (io->rst_error) {
return APR_ECONNABORTED;
if (io->eos_in) {
return APR_EOF;
}
- io->eos_in = h2_util_has_eos(bb, -1);
- if (!APR_BRIGADE_EMPTY(bb)) {
- if (!io->bbin) {
- io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
- io->tmp = apr_brigade_create(io->pool, io->bucket_alloc);
- }
- return h2_util_move(io->bbin, bb, -1, NULL, "h2_io_in_write");
+ if (eos) {
+ io->eos_in = 1;
+ }
+ if (len > 0) {
+ check_bbin(io);
+ return apr_brigade_write(io->bbin, NULL, NULL, d, len);
}
return APR_SUCCESS;
}
return APR_SUCCESS;
}
-apr_status_t h2_io_out_readx(h2_io *io,
- h2_io_data_cb *cb, void *ctx,
- apr_off_t *plen, int *peos)
+static int is_out_readable(h2_io *io, apr_off_t *plen, int *peos,
+ apr_status_t *ps)
{
- apr_status_t status;
-
if (io->rst_error) {
- return APR_ECONNABORTED;
+ *ps = APR_ECONNABORTED;
+ return 0;
}
-
if (io->eos_out_read) {
*plen = 0;
*peos = 1;
- return APR_SUCCESS;
+ *ps = APR_SUCCESS;
+ return 0;
}
else if (!io->bbout) {
*plen = 0;
*peos = 0;
- return APR_EAGAIN;
+ *ps = APR_EAGAIN;
+ return 0;
+ }
+ return 1;
+}
+
+apr_status_t h2_io_out_readx(h2_io *io,
+ h2_io_data_cb *cb, void *ctx,
+ apr_off_t *plen, int *peos)
+{
+ apr_status_t status;
+ if (!is_out_readable(io, plen, peos, &status)) {
+ return status;
}
-
if (cb == NULL) {
/* just checking length available */
status = h2_util_bb_avail(io->bbout, plen, peos);
io->output_consumed += *plen;
}
}
-
return status;
}
apr_off_t *plen, int *peos)
{
apr_status_t status;
-
- if (io->rst_error) {
- return APR_ECONNABORTED;
- }
-
- if (io->eos_out_read) {
- *plen = 0;
- *peos = 1;
- return APR_SUCCESS;
- }
- else if (!io->bbout) {
- *plen = 0;
- *peos = 0;
- return APR_EAGAIN;
+ if (!is_out_readable(io, plen, peos, &status)) {
+ return status;
}
-
- io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen);
status = h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to");
+ if (status == APR_SUCCESS && io->eos_out && APR_BRIGADE_EMPTY(io->bbout)) {
+ io->eos_out_read = *peos = 1;
+ }
io->output_consumed += *plen;
return status;
}
apr_size_t *pfile_buckets_allowed)
{
apr_status_t status;
+ apr_bucket *b;
int start_allowed;
if (io->rst_error) {
}
if (io->eos_out) {
- apr_off_t len;
+ apr_off_t len = 0;
/* We have already delivered an EOS bucket to a reader, no
* sense in storing anything more here.
*/
- status = apr_brigade_length(bb, 1, &len);
- if (status == APR_SUCCESS) {
- if (len > 0) {
- /* someone tries to write real data after EOS, that
- * does not look right. */
- status = APR_EOF;
- }
- /* cleanup, as if we had moved the data */
- apr_brigade_cleanup(bb);
+ apr_brigade_length(bb, 0, &len);
+ apr_brigade_cleanup(bb);
+ return (len > 0)? APR_EOF : APR_SUCCESS;
+ }
+
+ /* Filter the EOR bucket and set it aside. We prefer to tear down
+ * the request when the whole h2 stream is done */
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb);
+ b = APR_BUCKET_NEXT(b))
+ {
+ if (AP_BUCKET_IS_EOR(b)) {
+ APR_BUCKET_REMOVE(b);
+ io->eor = b;
+ break;
}
- return status;
- }
-
+ else if (APR_BUCKET_IS_EOS(b)) {
+ io->eos_out = 1;
+ break;
+ }
+ }
+
process_trailers(io, trailers);
- if (!io->bbout) {
- io->bbout = apr_brigade_create(io->pool, io->bucket_alloc);
- }
/* Let's move the buckets from the request processing in here, so
* that the main thread can read them when it has time/capacity.
* many open files already buffered. Otherwise we will run out of
* file handles.
*/
+ check_bbout(io);
start_allowed = *pfile_buckets_allowed;
status = h2_util_move(io->bbout, bb, maxlen, pfile_buckets_allowed,
"h2_io_out_write");
}
if (!io->eos_out_read) { /* EOS has not been read yet */
process_trailers(io, trailers);
- if (!io->bbout) {
- io->bbout = apr_brigade_create(io->pool, io->bucket_alloc);
- }
if (!io->eos_out) {
+ check_bbout(io);
io->eos_out = 1;
if (!h2_util_has_eos(io->bbout, -1)) {
- APR_BRIGADE_INSERT_TAIL(io->bbout,
- apr_bucket_eos_create(io->bucket_alloc));
+ append_eos(io, io->bbout);
}
}
}
struct h2_response *response; /* response to request */
int rst_error; /* h2 related stream abort error */
+ apr_bucket *eor; /* the EOR bucket, set aside */
+ struct h2_task *task; /* the task once started */
+
apr_bucket_brigade *bbin; /* input data for stream */
apr_bucket_brigade *bbout; /* output data from stream */
- apr_bucket_brigade *tmp; /* temporary data for chunking */
+ apr_bucket_brigade *bbtmp; /* temporary data for chunking */
unsigned int orphaned : 1; /* h2_stream is gone for this io */
unsigned int worker_started : 1; /* h2_worker started processing for this io */
/**
* Creates a new h2_io for the given stream id.
*/
-h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request);
+h2_io *h2_io_create(int id, apr_pool_t *pool,
+ apr_bucket_alloc_t *bucket_alloc,
+ const struct h2_request *request);
/**
* Set the response of this stream.
int h2_io_is_repeatable(h2_io *io);
void h2_io_redo(h2_io *io);
-/**
- * The input data is completely queued. Blocked reads will return immediately
- * and give either data or EOF.
- */
-int h2_io_in_has_eos_for(h2_io *io);
/**
* Output data is available.
*/
int h2_io_out_has_data(h2_io *io);
-/**
- * Input data is available.
- */
-int h2_io_in_has_data(h2_io *io);
void h2_io_signal(h2_io *io, h2_io_op op);
void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout,
/**
* Appends given bucket to the input.
*/
-apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb);
+apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos);
/**
* Closes the input. After existing data has been read, APR_EOF will
return NULL;
}
- status = apr_thread_cond_create(&m->req_added, m->pool);
+ status = apr_thread_cond_create(&m->task_thawed, m->pool);
if (status != APR_SUCCESS) {
h2_mplx_destroy(m);
return NULL;
}
+ m->bucket_alloc = apr_bucket_alloc_create(m->pool);
m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->q = h2_iq_create(m->pool, m->max_streams);
h2_workers_register(m->workers, m);
}
-static int io_process_events(h2_mplx *m, h2_io *io)
+static int io_in_consumed_signal(h2_mplx *m, h2_io *io)
{
if (io->input_consumed && m->input_consumed) {
m->input_consumed(m->input_consumed_ctx,
return 0;
}
+static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
+{
+ if (io->output_consumed && io->task && io->task->assigned) {
+ h2_req_engine_out_consumed(io->task->assigned, io->task->c,
+ io->output_consumed);
+ io->output_consumed = 0;
+ return 1;
+ }
+ return 0;
+}
+
static void io_destroy(h2_mplx *m, h2_io *io, int events)
{
- apr_pool_t *pool = io->pool;
+ apr_pool_t *pool;
/* cleanup any buffered input */
h2_io_in_shutdown(io);
if (events) {
/* Process outstanding events before destruction */
- io_process_events(m, io);
+ io_in_consumed_signal(m, io);
}
- io->pool = NULL;
/* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our
* file handle pool. */
if (m->redo_ios) {
h2_io_set_remove(m->redo_ios, io);
}
-
- if (pool) {
+
+ if (io->task) {
+ if (m->spare_allocator) {
+ apr_allocator_destroy(m->spare_allocator);
+ m->spare_allocator = NULL;
+ }
+
+ h2_slave_destroy(io->task->c, &m->spare_allocator);
+ io->task = NULL;
+ }
+
+ pool = io->pool;
+ io->pool = NULL;
+ if (0 && pool) {
apr_pool_clear(pool);
if (m->spare_pool) {
apr_pool_destroy(m->spare_pool);
h2_mplx_set_consumed_cb(m, NULL, NULL);
h2_iq_clear(m->q);
- apr_thread_cond_broadcast(m->req_added);
+ apr_thread_cond_broadcast(m->task_thawed);
while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
}
}
h2_mplx_abort(m);
- apr_thread_cond_broadcast(m->req_added);
+ apr_thread_cond_broadcast(m->task_thawed);
}
}
* for processing, e.g. when we received all HEADERs. But when
* a stream is cancelled very early, it will not exist. */
if (io) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld-%d): marking stream as done.",
m->id, stream_id);
io_stream_done(m, io, rst_error);
}
apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
- apr_bucket_brigade *bb)
+ const char *data, apr_size_t len, int eos)
{
apr_status_t status;
int acquired;
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
- status = h2_io_in_write(io, bb);
+ status = h2_io_in_write(io, data, len, eos);
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
h2_io_signal(io, H2_IO_READ);
- io_process_events(m, io);
+ io_in_consumed_signal(m, io);
}
else {
status = APR_ECONNABORTED;
status = h2_io_in_close(io);
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
h2_io_signal(io, H2_IO_READ);
- io_process_events(m, io);
+ io_in_consumed_signal(m, io);
}
else {
status = APR_ECONNABORTED;
return status;
}
+void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
+{
+ m->input_consumed = cb;
+ m->input_consumed_ctx = ctx;
+}
+
typedef struct {
h2_mplx * m;
int streams_updated;
static int update_window(void *ctx, h2_io *io)
{
update_ctx *uctx = (update_ctx*)ctx;
- if (io_process_events(uctx->m, io)) {
+ if (io_in_consumed_signal(uctx->m, io)) {
++uctx->streams_updated;
}
return 1;
}
-void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
-{
- m->input_consumed = cb;
- m->input_consumed_ctx = ctx;
-}
-
apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
{
apr_status_t status;
* shutdown input and send out any events (e.g. window
* updates) asap. */
h2_io_in_shutdown(io);
- io_process_events(m, io);
+ io_in_consumed_signal(m, io);
}
}
&& !APR_BRIGADE_EMPTY(bb)
&& !is_aborted(m, &status)) {
- status = h2_io_out_write(io, bb, m->stream_max_mem, trailers,
- &m->tx_handles_reserved);
+ status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX,
+ trailers, &m->tx_handles_reserved);
+ io_out_consumed_signal(m, io);
+
/* Wait for data to drain until there is room again or
* stream timeout expires */
h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
&& (m->stream_max_mem <= h2_io_out_length(io))
&& !is_aborted(m, &status)) {
if (!blocking) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_mplx(%ld-%d): incomplete write",
+ m->id, io->id);
return APR_INCOMPLETE;
}
trailers = NULL;
"h2_mplx(%ld-%d): close, no response, no rst",
m->id, io->id);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%ld-%d): close with trailers=%s",
- m->id, io->id, trailers? "yes" : "no");
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+ "h2_mplx(%ld-%d): close with eor=%s, trailers=%s",
+ m->id, io->id, io->eor? "yes" : "no",
+ trailers? "yes" : "no");
status = h2_io_out_close(io, trailers);
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
+ io_out_consumed_signal(m, io);
have_out_data_for(m, stream_id);
}
return status;
}
-int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
-{
- int has_eos = 0;
- int acquired;
-
- apr_status_t status;
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- has_eos = h2_io_in_has_eos_for(io);
- }
- else {
- has_eos = 1;
- }
- leave_mutex(m, acquired);
- }
- return has_eos;
-}
-
-int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id)
-{
- apr_status_t status;
- int has_data = 0;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- has_data = h2_io_in_has_data(io);
- }
- else {
- has_data = 0;
- }
- leave_mutex(m, acquired);
- }
- return has_data;
-}
-
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
{
apr_status_t status;
m->spare_pool = NULL;
}
- io = h2_io_create(stream_id, io_pool, request);
+ io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request);
h2_io_set_add(m->stream_ios, io);
return io;
else if (io) {
conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
m->spare_allocator = NULL;
- task = h2_task_create(m->id, io->request, slave, m);
+ io->task = task = h2_task_create(m->id, io->request, slave, m);
apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
io->worker_started = 1;
io->started_at = apr_time_now();
return task;
}
-static void task_done(h2_mplx *m, h2_task *task)
+static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
if (task) {
if (task->frozen) {
* bodies into the mplx. */
/* FIXME: this implementation is incomplete. */
h2_task_set_io_blocking(task, 0);
- apr_thread_cond_broadcast(m->req_added);
+ apr_thread_cond_broadcast(m->task_thawed);
}
else {
h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
* other mplx's. Perhaps leave after n requests? */
h2_mplx_out_close(m, task->stream_id, NULL);
+ if (ngn && io) {
+ apr_off_t bytes = io->output_consumed + h2_io_out_length(io);
+ if (bytes > 0) {
+ /* we need to report consumed and current buffered output
+ * to the engine. The request will be streamed out or cancelled,
+ * no more data is coming from it and the engine should update
+ * its calculations before we destroy this information. */
+ h2_req_engine_out_consumed(ngn, task->c, bytes);
+ io->output_consumed = 0;
+ }
+ }
+
if (task->engine) {
if (!h2_req_engine_is_shutdown(task->engine)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
}
- if (m->spare_allocator) {
- apr_allocator_destroy(m->spare_allocator);
- m->spare_allocator = NULL;
- }
-
- h2_slave_destroy(task->c, &m->spare_allocator);
- task = NULL;
-
if (io) {
apr_time_t now = apr_time_now();
if (!io->orphaned && m->redo_ios
}
}
else {
- /* hang around until the stream deregisteres */
+ /* hang around until the stream deregisters */
}
}
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): task %s without corresp. h2_io",
+ m->id, task->id);
+ }
}
}
}
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- task_done(m, task);
+ task_done(m, task, NULL);
--m->workers_busy;
if (ptask) {
/* caller wants another task */
* HTTP/2 request engines
******************************************************************************/
+typedef struct {
+ h2_mplx * m;
+ h2_req_engine *ngn;
+ int streams_updated;
+} ngn_update_ctx;
+
+static int ngn_update_window(void *ctx, h2_io *io)
+{
+ ngn_update_ctx *uctx = ctx;
+ if (io && io->task && io->task->assigned == uctx->ngn
+ && io_out_consumed_signal(uctx->m, io)) {
+ ++uctx->streams_updated;
+ }
+ return 1;
+}
+
+static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
+{
+ ngn_update_ctx ctx;
+
+ ctx.m = m;
+ ctx.ngn = ngn;
+ ctx.streams_updated = 0;
+ h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx);
+
+ return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
+}
+
apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
- request_rec *r, h2_req_engine_init *einit)
+ request_rec *r,
+ http2_req_engine_init *einit)
{
apr_status_t status;
h2_mplx *m;
return APR_ECONNABORTED;
}
m = task->mplx;
+ task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
status = APR_ECONNABORTED;
}
else {
- status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type,
- task, r, einit);
+ status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
}
leave_mutex(m, acquired);
}
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
apr_status_t status;
+ h2_task *task = NULL;
int acquired;
- *pr = NULL;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
int want_shutdown = (block == APR_BLOCK_READ);
+
+ /* Take this opportunity to update output consummation
+ * for this engine */
+ ngn_out_update_windows(m, ngn);
+
if (want_shutdown && !h2_iq_empty(m->q)) {
/* For a blocking read, check first if requests are to be
* had and, if not, wait a short while before doing the
* blocking, and if unsuccessful, terminating read.
*/
- status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+ status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
if (APR_STATUS_IS_EAGAIN(status)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): start block engine pull", m->id);
- apr_thread_cond_timedwait(m->req_added, m->lock,
+ apr_thread_cond_timedwait(m->task_thawed, m->lock,
apr_time_from_msec(20));
- status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+ status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
}
}
else {
- status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
+ status = h2_ngn_shed_pull_task(shed, ngn, capacity,
+ want_shutdown, &task);
}
leave_mutex(m, acquired);
}
+ *pr = task? task->r : NULL;
return status;
}
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ ngn_out_update_windows(m, ngn);
h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
if (task->engine) {
/* cannot report that as done until engine returns */
}
else {
- h2_task_output_close(task->output);
- task_done(m, task);
+ task_done(m, task, ngn);
}
+ /* Take this opportunity to update output consummation
+ * for this engine */
leave_mutex(m, acquired);
}
}
volatile int refs;
conn_rec *c;
apr_pool_t *pool;
+ apr_bucket_alloc_t *bucket_alloc;
unsigned int aborted : 1;
unsigned int need_registration : 1;
apr_thread_mutex_t *lock;
struct apr_thread_cond_t *added_output;
- struct apr_thread_cond_t *req_added;
+ struct apr_thread_cond_t *task_thawed;
struct apr_thread_cond_t *join_wait;
apr_size_t stream_max_mem;
*/
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
-/* Return != 0 iff the multiplexer has input data for the given stream.
- */
-int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id);
-
/**
* Waits on output data from any stream in this session to become available.
* Returns APR_TIMEUP if no data arrived in the given time.
* Appends data to the input of the given stream. Storage of input data is
* not subject to flow control.
*/
-apr_status_t h2_mplx_in_write(h2_mplx *mplx, int stream_id,
- apr_bucket_brigade *bb);
+apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
+ const char *data, apr_size_t len, int eos);
/**
* Closes the input for the given stream_id.
*/
apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
-/**
- * Returns != 0 iff the input for the given stream has been closed. There
- * could still be data queued, but it can be read without blocking.
- */
-int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id);
-
/**
* Invoke the consumed callback for all streams that had bytes read since the
* last call to this function. If no stream had input data consumed, the
* h2_req_engine handling
******************************************************************************/
+typedef void h2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed);
typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_uint32_t req_buffer_size,
- request_rec *r);
+ request_rec *r,
+ h2_output_consumed **pconsumed,
+ void **pbaton);
apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
request_rec *r,
#include "h2_ctx.h"
#include "h2_h2.h"
#include "h2_int_queue.h"
+#include "h2_mplx.h"
#include "h2_response.h"
#include "h2_request.h"
#include "h2_task.h"
struct h2_ngn_entry {
APR_RING_ENTRY(h2_ngn_entry) link;
h2_task *task;
- request_rec *r;
};
#define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link)
apr_uint32_t no_assigned; /* # of assigned requests */
apr_uint32_t no_live; /* # of live */
apr_uint32_t no_finished; /* # of finished */
+
+ h2_output_consumed *out_consumed;
+ void *out_consumed_ctx;
};
const char *h2_req_engine_get_id(h2_req_engine *engine)
return engine->shutdown;
}
+void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c,
+ apr_off_t bytes)
+{
+ if (engine->out_consumed) {
+ engine->out_consumed(engine->out_consumed_ctx, c, bytes);
+ }
+}
+
h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
apr_uint32_t default_capacity,
apr_uint32_t req_buffer_size)
shed->aborted = 1;
}
-static void ngn_add_req(h2_req_engine *ngn, h2_task *task, request_rec *r)
+static void ngn_add_task(h2_req_engine *ngn, h2_task *task)
{
h2_ngn_entry *entry = apr_pcalloc(task->c->pool, sizeof(*entry));
APR_RING_ELEM_INIT(entry, link);
entry->task = task;
- entry->r = r;
H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
}
-apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
- h2_task *task, request_rec *r,
- h2_req_engine_init *einit){
+apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type,
+ h2_task *task, http2_req_engine_init *einit)
+{
h2_req_engine *ngn;
AP_DEBUG_ASSERT(shed);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id,
- apr_table_get(r->connection->notes, H2_TASK_ID_NOTE));
+ task->id);
if (task->ser_headers) {
/* Max compatibility, deny processing of this */
return APR_EOF;
"h2_ngn_shed(%ld): pushing request %s to %s",
shed->c->id, task->id, ngn->id);
if (!h2_task_is_detached(task)) {
- h2_task_freeze(task, r);
+ h2_task_freeze(task);
}
/* FIXME: sometimes ngn is garbage, probly alread freed */
- ngn_add_req(ngn, task, r);
+ ngn_add_task(ngn, task);
ngn->no_assigned++;
return APR_SUCCESS;
}
APR_RING_INIT(&newngn->entries, h2_ngn_entry, link);
status = einit(newngn, newngn->id, newngn->type, newngn->pool,
- shed->req_buffer_size, r);
+ shed->req_buffer_size, task->r,
+ &newngn->out_consumed, &newngn->out_consumed_ctx);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
"h2_ngn_shed(%ld): create engine %s (%s)",
shed->c->id, newngn->id, newngn->type);
AP_DEBUG_ASSERT(task->engine == NULL);
newngn->task = task;
task->engine = newngn;
+ task->assigned = newngn;
apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn);
}
return status;
return APR_EOF;
}
-static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn)
+static h2_ngn_entry *pop_detached(h2_req_engine *ngn)
{
h2_ngn_entry *entry;
for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
entry = H2_NGN_ENTRY_NEXT(entry)) {
- if (!entry->task->frozen) {
+ if (h2_task_is_detached(entry->task)
+ || (entry->task->engine == ngn)) {
+ /* The task hosting this engine can always be pulled by it.
+ * For other task, they need to become detached, e.g. no longer
+ * assigned to another worker. */
H2_NGN_ENTRY_REMOVE(entry);
return entry;
}
return NULL;
}
-apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed,
- h2_req_engine *ngn,
- apr_uint32_t capacity,
- int want_shutdown,
- request_rec **pr)
+apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed,
+ h2_req_engine *ngn,
+ apr_uint32_t capacity,
+ int want_shutdown,
+ h2_task **ptask)
{
h2_ngn_entry *entry;
AP_DEBUG_ASSERT(ngn);
- *pr = NULL;
+ *ptask = NULL;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
+ "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d",
+ shed->c->id, ngn->id, want_shutdown);
if (shed->aborted) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c,
"h2_ngn_shed(%ld): abort while pulling requests %s",
return ngn->shutdown? APR_EOF : APR_EAGAIN;
}
- if ((entry = pop_non_frozen(ngn))) {
+ if ((entry = pop_detached(ngn))) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c,
"h2_ngn_shed(%ld): pulled request %s for engine %s",
shed->c->id, entry->task->id, ngn->id);
ngn->no_live++;
- *pr = entry->r;
+ *ptask = entry->task;
+ entry->task->assigned = ngn;
return APR_SUCCESS;
}
+
+ if (1) {
+ h2_ngn_entry *entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
+ "h2_ngn_shed(%ld): pull task, nothing, first task %s",
+ shed->c->id, entry->task->id);
+ }
return APR_EAGAIN;
}
static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn,
- h2_task *task, int waslive, int aborted,
- int close)
+ h2_task *task, int waslive, int aborted)
{
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
"h2_ngn_shed(%ld): task %s %s by %s",
if (waslive) ngn->no_live--;
ngn->no_assigned--;
- if (close) {
- h2_task_output_close(task->output);
- }
return APR_SUCCESS;
}
apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
struct h2_req_engine *ngn, h2_task *task)
{
- return ngn_done_task(shed, ngn, task, 1, 0, 0);
+ return ngn_done_task(shed, ngn, task, 1, 0);
}
void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
entry = H2_NGN_ENTRY_NEXT(entry)) {
- request_rec *r = entry->r;
- h2_task *task = h2_ctx_rget_task(r);
+ h2_task *task = entry->task;
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
"h2_ngn_shed(%ld): engine %s has queued task %s, "
"frozen=%d, aborting",
shed->c->id, ngn->id, task->id, task->frozen);
- ngn_done_task(shed, ngn, task, 0, 1, 1);
+ ngn_done_task(shed, ngn, task, 0, 1);
}
}
if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
const char *h2_req_engine_get_id(h2_req_engine *engine);
int h2_req_engine_is_shutdown(h2_req_engine *engine);
+void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c,
+ apr_off_t bytes);
+
typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_uint32_t req_buffer_size,
- request_rec *r);
+ request_rec *r,
+ h2_output_consumed **pconsumed,
+ void **pbaton);
h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
apr_uint32_t default_capactiy,
void h2_ngn_shed_abort(h2_ngn_shed *shed);
-apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
- struct h2_task *task, request_rec *r,
+apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type,
+ struct h2_task *task,
h2_shed_ngn_init *init_cb);
-apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
- apr_uint32_t capacity,
- int want_shutdown, request_rec **pr);
+apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
+ apr_uint32_t capacity,
+ int want_shutdown, struct h2_task **ptask);
apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
struct h2_req_engine *ngn,
}
return 0;
}
-
- status = h2_stream_write_data(stream, (const char *)data, len);
+
+ /* FIXME: enabling setting EOS this way seems to break input handling
+ * in mod_proxy_http2. why? */
+ status = h2_stream_write_data(stream, (const char *)data, len,
+ 0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
session->id, stream_id, (long)len);
h2_mplx_get_max_stream_started(session->mplx),
reason, (uint8_t*)err, err? strlen(err):0);
status = nghttp2_session_send(session->ngh2);
- h2_conn_io_pass(&session->io, 1);
+ if (status == APR_SUCCESS) {
+ status = h2_conn_io_flush(&session->io);
+ }
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03069)
"session(%ld): sent GOAWAY, err=%d, msg=%s",
session->id, reason, err? err : "");
}
}
- h2_conn_io_pass(&session->io, 1);
return status;
}
{
apr_pool_t *pool = h2_stream_detach_pool(stream);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): cleanup by EOS bucket destroy",
+ session->id, stream->id);
/* this may be called while the session has already freed
* some internal structures or even when the mplx is locked. */
if (session->mplx) {
static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
{
+ session->local_shutdown = 1;
switch (session->state) {
case H2_SESSION_ST_LOCAL_SHUTDOWN:
/* already did that? */
static void update_child_status(h2_session *session, int status, const char *msg)
{
- apr_snprintf(session->status, sizeof(session->status),
- "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
- msg? msg : "-",
- (int)h2_ihash_count(session->streams),
- (int)session->requests_received,
- (int)session->responses_submitted,
- (int)session->pushes_submitted,
- (int)session->pushes_reset + session->streams_reset);
- ap_update_child_status_descr(session->c->sbh, status, session->status);
+ /* Assume that we also change code/msg when something really happened and
+ * avoid updating the scoreboard in between */
+ if (session->last_status_code != status
+ || session->last_status_msg != msg) {
+ apr_snprintf(session->status, sizeof(session->status),
+ "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
+ msg? msg : "-",
+ (int)h2_ihash_count(session->streams),
+ (int)session->requests_received,
+ (int)session->responses_submitted,
+ (int)session->pushes_submitted,
+ (int)session->pushes_reset + session->streams_reset);
+ ap_update_child_status_descr(session->c->sbh, status, session->status);
+ }
}
apr_status_t h2_session_process(h2_session *session, int async)
update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE
: SERVER_BUSY_READ), "idle");
/* make certain, the client receives everything before we idle */
- h2_conn_io_flush(&session->io);
if (!session->keep_sync_until
&& async && no_streams && !session->r && session->requests_received) {
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
case H2_SESSION_ST_LOCAL_SHUTDOWN:
case H2_SESSION_ST_REMOTE_SHUTDOWN:
if (nghttp2_session_want_read(session->ngh2)) {
+ ap_update_child_status(session->c->sbh, SERVER_BUSY_READ, NULL);
h2_filter_cin_timeout_set(session->cin, session->s->timeout);
status = h2_session_read(session, 0);
if (status == APR_SUCCESS) {
have_read = 1;
+ update_child_status(session, SERVER_BUSY_READ, "busy");
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (status == APR_EAGAIN) {
}
}
- while (nghttp2_session_want_write(session->ngh2)) {
+ if (nghttp2_session_want_write(session->ngh2)) {
ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
status = h2_session_send(session);
if (status == APR_SUCCESS) {
if (have_read || have_written) {
if (session->wait_us) {
session->wait_us = 0;
- update_child_status(session, SERVER_BUSY_READ, "busy");
}
}
else if (!nghttp2_session_want_write(session->ngh2)) {
"h2_session: wait for data, %ld micros",
(long)session->wait_us);
}
- /* make certain, the client receives everything before we idle */
- h2_conn_io_flush(&session->io);
status = h2_mplx_out_trywait(session->mplx, session->wait_us,
session->iowait);
if (status == APR_SUCCESS) {
}
else if (status == APR_TIMEUP) {
/* go back to checking all inputs again */
- transit(session, "wait cycle", H2_SESSION_ST_BUSY);
+ transit(session, "wait cycle", session->local_shutdown?
+ H2_SESSION_ST_LOCAL_SHUTDOWN : H2_SESSION_ST_BUSY);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
break;
}
- h2_conn_io_pass(&session->io, 1);
+ status = h2_conn_io_flush(&session->io);
+ if (status != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
if (!nghttp2_session_want_read(session->ngh2)
&& !nghttp2_session_want_write(session->ngh2)) {
dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL);
unsigned int reprioritize : 1; /* scheduled streams priority changed */
unsigned int eoc_written : 1; /* h2 eoc bucket written */
unsigned int flush : 1; /* flushing output necessary */
+ unsigned int local_shutdown: 1; /* GOAWAY has been sent by us */
apr_interval_time_t wait_us; /* timout during BUSY_WAIT state, micro secs */
int unsent_submits; /* number of submitted, but not yet written responses. */
struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
char status[64]; /* status message for scoreboard */
+ int last_status_code; /* the one already reported */
+ const char *last_status_msg; /* the one already reported */
} h2_session;
#include "h2_util.h"
-#define H2_STREAM_IN(lvl,s,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
- h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \
- } while(0)
-
-
static int state_transition[][7] = {
/* ID OP RL RR CI CO CL */
/*ID*/{ 1, 0, 0, 0, 0, 0, 0 },
static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response);
-h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
+h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
{
h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
stream->id = id;
stream->state = H2_STREAM_ST_IDLE;
stream->pool = pool;
stream->session = session;
- return stream;
-}
-
-h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
-{
- h2_stream *stream = h2_stream_create(id, pool, session);
set_state(stream, H2_STREAM_ST_OPEN);
stream->request = h2_request_create(id, pool,
h2_config_geti(session->config, H2_CONF_SER_HEADERS));
if (status == APR_SUCCESS) {
if (!eos) {
stream->request->body = 1;
- stream->bbin = apr_brigade_create(stream->pool,
- stream->session->c->bucket_alloc);
}
stream->input_remaining = stream->request->content_length;
return stream->scheduled;
}
-static apr_status_t h2_stream_input_flush(h2_stream *stream)
-{
- apr_status_t status = APR_SUCCESS;
- if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) {
-
- status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin);
- if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->mplx->c,
- "h2_stream(%ld-%d): flushing input data",
- stream->session->id, stream->id);
- }
- }
- return status;
-}
-
-static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx)
-{
- (void)bb;
- return h2_stream_input_flush(ctx);
-}
-
-static apr_status_t input_add_data(h2_stream *stream,
- const char *data, size_t len)
-{
- return apr_brigade_write(stream->bbin, input_flush, stream, data, len);
-}
-
apr_status_t h2_stream_close_input(h2_stream *stream)
{
apr_status_t status = APR_SUCCESS;
return APR_ECONNRESET;
}
- H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre");
- if (close_input(stream) && stream->bbin) {
- status = h2_stream_input_flush(stream);
- if (status == APR_SUCCESS) {
- status = h2_mplx_in_close(stream->session->mplx, stream->id);
- }
+ if (close_input(stream)) {
+ status = h2_mplx_in_close(stream->session->mplx, stream->id);
}
- H2_STREAM_IN(APLOG_TRACE2, stream, "close_post");
return status;
}
apr_status_t h2_stream_write_data(h2_stream *stream,
- const char *data, size_t len)
+ const char *data, size_t len, int eos)
{
apr_status_t status = APR_SUCCESS;
AP_DEBUG_ASSERT(stream);
- if (input_closed(stream) || !stream->request->eoh || !stream->bbin) {
+ if (input_closed(stream) || !stream->request->eoh) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d",
+ "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d",
stream->session->id, stream->id, input_closed(stream),
- stream->request->eoh, !!stream->bbin);
+ stream->request->eoh);
return APR_EINVAL;
}
"h2_stream(%ld-%d): add %ld input bytes",
stream->session->id, stream->id, (long)len);
- H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre");
if (!stream->request->chunked) {
stream->input_remaining -= len;
if (stream->input_remaining < 0) {
}
}
- status = input_add_data(stream, data, len);
- if (status == APR_SUCCESS) {
- status = h2_stream_input_flush(stream);
+ status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos);
+ if (eos) {
+ close_input(stream);
}
- H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post");
return status;
}
unsigned int submitted : 1; /* response HEADER has been sent */
apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */
- apr_bucket_brigade *bbin; /* input DATA */
struct h2_sos *sos; /* stream output source, e.g. to read output from */
apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
#define H2_STREAM_RST(s, def) (s->rst_error? s->rst_error : (def))
-/**
- * Create a stream in IDLE state.
- * @param id the stream identifier
- * @param pool the memory pool to use for this stream
- * @param session the session this stream belongs to
- * @return the newly created IDLE stream
- */
-h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session);
-
/**
* Create a stream in OPEN state.
* @param id the stream identifier
* @param len the number of bytes to write
*/
apr_status_t h2_stream_write_data(h2_stream *stream,
- const char *data, size_t len);
+ const char *data, size_t len, int eos);
/**
* Reset the stream. Stream write/reads will return errors afterwards.
return h2_from_h1_read_response(task->output->from_h1, f, bb);
}
-static apr_status_t h2_response_freeze_filter(ap_filter_t* f,
- apr_bucket_brigade* bb)
-{
- h2_task *task = f->ctx;
- AP_DEBUG_ASSERT(task);
-
- if (task->frozen) {
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
- "h2_response_freeze_filter, saving");
- return ap_save_brigade(f, &task->output->frozen_bb, &bb, task->c->pool);
- }
-
- if (APR_BRIGADE_EMPTY(bb)) {
- return APR_SUCCESS;
- }
-
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
- "h2_response_freeze_filter, passing");
- return ap_pass_brigade(f->next, bb);
-}
-
/*******************************************************************************
* Register various hooks
*/
NULL, AP_FTYPE_PROTOCOL);
ap_register_output_filter("H2_TRAILERS", h2_response_trailers_filter,
NULL, AP_FTYPE_PROTOCOL);
- ap_register_output_filter("H2_RESPONSE_FREEZE", h2_response_freeze_filter,
- NULL, AP_FTYPE_RESOURCE);
}
/* post config init */
return DECLINED;
}
-apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
+apr_status_t h2_task_freeze(h2_task *task)
{
if (!task->frozen) {
- conn_rec *c = task->c;
-
task->frozen = 1;
- task->output->frozen_bb = apr_brigade_create(c->pool, c->bucket_alloc);
- ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
"h2_task(%s), frozen", task->id);
}
return APR_SUCCESS;
struct h2_task_output *output;
struct apr_thread_cond_t *io; /* used to wait for events on */
- struct h2_req_engine *engine;
+ struct h2_req_engine *engine; /* engine hosted by this task */
+ struct h2_req_engine *assigned; /* engine that task has been assigned to */
+ request_rec *r; /* request being processed in this task */
};
h2_task *h2_task_create(long session_id, const struct h2_request *req,
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in;
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out;
-apr_status_t h2_task_freeze(h2_task *task, request_rec *r);
+apr_status_t h2_task_freeze(h2_task *task);
apr_status_t h2_task_thaw(h2_task *task);
int h2_task_is_detached(h2_task *task);
output->task = task;
output->state = H2_TASK_OUT_INIT;
output->from_h1 = h2_from_h1_create(task->stream_id, c->pool);
- if (!output->from_h1) {
- return NULL;
- }
}
return output;
}
return NULL;
}
-static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
- apr_bucket_brigade *bb, const char *caller)
+static apr_status_t open_response(h2_task_output *output, ap_filter_t *f,
+ apr_bucket_brigade *bb, const char *caller)
{
- if (output->state == H2_TASK_OUT_INIT) {
- h2_response *response;
- output->state = H2_TASK_OUT_STARTED;
- response = h2_from_h1_get_response(output->from_h1);
- if (!response) {
- if (f) {
- /* This happens currently when ap_die(status, r) is invoked
- * by a read request filter. */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
- "h2_task_output(%s): write without response by %s "
- "for %s %s %s",
- output->task->id, caller,
- output->task->request->method,
- output->task->request->authority,
- output->task->request->path);
- output->c->aborted = 1;
- }
- if (output->task->io) {
- apr_thread_cond_broadcast(output->task->io);
- }
- return APR_ECONNABORTED;
+ h2_response *response;
+ response = h2_from_h1_get_response(output->from_h1);
+ if (!response) {
+ if (f) {
+ /* This happens currently when ap_die(status, r) is invoked
+ * by a read request filter. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
+ "h2_task_output(%s): write without response by %s "
+ "for %s %s %s",
+ output->task->id, caller,
+ output->task->request->method,
+ output->task->request->authority,
+ output->task->request->path);
+ output->c->aborted = 1;
}
-
- if (h2_task_logio_add_bytes_out) {
- /* counter headers as if we'd do a HTTP/1.1 serialization */
- output->written = h2_util_table_bytes(response->headers, 3)+1;
- h2_task_logio_add_bytes_out(output->c, output->written);
+ if (output->task->io) {
+ apr_thread_cond_broadcast(output->task->io);
}
- get_trailers(output);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
- "h2_task(%s): open response to %s %s %s",
- output->task->id, output->task->request->method,
- output->task->request->authority,
- output->task->request->path);
- return h2_mplx_out_open(output->task->mplx, output->task->stream_id,
- response, f, bb, output->task->io);
+ return APR_ECONNABORTED;
+ }
+
+ if (h2_task_logio_add_bytes_out) {
+ /* count headers as if we'd do a HTTP/1.1 serialization */
+ output->written = h2_util_table_bytes(response->headers, 3)+1;
+ h2_task_logio_add_bytes_out(output->c, output->written);
}
- return APR_SUCCESS;
+ get_trailers(output);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
+ "h2_task(%s): open response to %s %s %s",
+ output->task->id, output->task->request->method,
+ output->task->request->authority,
+ output->task->request->path);
+ return h2_mplx_out_open(output->task->mplx, output->task->stream_id,
+ response, f, bb, output->task->io);
}
static apr_status_t write_brigade_raw(h2_task_output *output,
apr_status_t h2_task_output_write(h2_task_output *output,
ap_filter_t* f, apr_bucket_brigade* bb)
{
- apr_status_t status;
+ apr_status_t status = APR_SUCCESS;
if (APR_BRIGADE_EMPTY(bb)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c,
if (output->task->frozen) {
h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2,
- "frozen task output write", bb);
- return ap_save_brigade(f, &output->frozen_bb, &bb, output->c->pool);
+ "frozen task output write, ignored", bb);
+ return APR_SUCCESS;
}
- status = open_if_needed(output, f, bb, "write");
+ if (output->state == H2_TASK_OUT_INIT) {
+ status = open_response(output, f, bb, "write");
+ output->state = H2_TASK_OUT_STARTED;
+ }
/* Attempt to write saved brigade first */
- if (status == APR_SUCCESS && output->bb
- && !APR_BRIGADE_EMPTY(output->bb)) {
+ if (status == APR_SUCCESS && output->bb && !APR_BRIGADE_EMPTY(output->bb)) {
status = write_brigade_raw(output, f, output->bb);
}
return status;
}
-void h2_task_output_close(h2_task_output *output)
-{
- if (output->task->frozen) {
- return;
- }
- open_if_needed(output, NULL, NULL, "close");
- if (output->state != H2_TASK_OUT_DONE) {
- if (output->frozen_bb && !APR_BRIGADE_EMPTY(output->frozen_bb)) {
- h2_mplx_out_write(output->task->mplx, output->task->stream_id,
- NULL, 1, output->frozen_bb, NULL, NULL);
- }
- output->state = H2_TASK_OUT_DONE;
- h2_mplx_out_close(output->task->mplx, output->task->stream_id,
- get_trailers(output));
- }
-}
-
apr_off_t written;
apr_bucket_brigade *bb;
- apr_bucket_brigade *frozen_bb;
};
h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c);
ap_filter_t* filter,
apr_bucket_brigade* brigade);
-void h2_task_output_close(h2_task_output *output);
-
apr_status_t h2_task_output_freeze(h2_task_output *output);
apr_status_t h2_task_output_thaw(h2_task_output *output);
*/
#include <assert.h>
-
#include <apr_strings.h>
#include <httpd.h>
else {
const char *data;
apr_size_t len;
+
status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (status == APR_SUCCESS && len > 0) {
status = apr_brigade_write(to, NULL, NULL, data, len);
return status;
}
-int h2_util_has_flush_or_eos(apr_bucket_brigade *bb)
-{
- apr_bucket *b;
- for (b = APR_BRIGADE_FIRST(bb);
- b != APR_BRIGADE_SENTINEL(bb);
- b = APR_BUCKET_NEXT(b))
- {
- if (APR_BUCKET_IS_EOS(b) || APR_BUCKET_IS_FLUSH(b)) {
- return 1;
- }
- }
- return 0;
-}
-
int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len)
{
apr_bucket *b, *end;
return APR_SUCCESS;
}
+apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb)
+{
+ apr_bucket *b;
+ apr_off_t total = 0;
+
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb);
+ b = APR_BUCKET_NEXT(b))
+ {
+ total += sizeof(*b);
+ if (b->length > 0) {
+ if (APR_BUCKET_IS_HEAP(b)
+ || APR_BUCKET_IS_POOL(b)) {
+ total += b->length;
+ }
+ }
+ }
+ return total;
+}
+
+
/*******************************************************************************
* h2_ngheader
******************************************************************************/
* @param bb the brigade to check on
* @return != 0 iff brigade holds FLUSH or EOS bucket (or both)
*/
-int h2_util_has_flush_or_eos(apr_bucket_brigade *bb);
int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len);
int h2_util_bb_has_data(apr_bucket_brigade *bb);
int h2_util_bb_has_data_or_eos(apr_bucket_brigade *bb);
apr_off_t *plen,
int *peos);
+/**
+ * Get an approximnation of the memory footprint of the given
+ * brigade. This varies from apr_brigade_length as
+ * - no buckets are ever read
+ * - only buckets known to allocate memory (HEAP+POOL) are counted
+ * - the bucket struct itself is counted
+ */
+apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb);
+
#endif /* defined(__mod_h2__h2_util__) */
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.4.1"
+#define MOD_HTTP2_VERSION "1.4.2"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010401
+#define MOD_HTTP2_VERSION_NUM 0x010402
#endif /* mod_h2_h2_version_h */
static apr_status_t http2_req_engine_push(const char *ngn_type,
request_rec *r,
- h2_req_engine_init *einit)
+ http2_req_engine_init *einit)
{
return h2_mplx_req_engine_push(ngn_type, r, einit);
}
typedef struct h2_req_engine h2_req_engine;
+typedef void http2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed);
+
/**
* Initialize a h2_req_engine. The structure will be passed in but
* only the name and master are set. The function should initialize
* @param engine the allocated, partially filled structure
* @param r the first request to process, or NULL
*/
-typedef apr_status_t h2_req_engine_init(h2_req_engine *engine,
- const char *id,
- const char *type,
- apr_pool_t *pool,
- apr_uint32_t req_buffer_size,
- request_rec *r);
+typedef apr_status_t http2_req_engine_init(h2_req_engine *engine,
+ const char *id,
+ const char *type,
+ apr_pool_t *pool,
+ apr_uint32_t req_buffer_size,
+ request_rec *r,
+ http2_output_consumed **pconsumed,
+ void **pbaton);
/**
* Push a request to an engine with the specified name for further processing.
APR_DECLARE_OPTIONAL_FN(apr_status_t,
http2_req_engine_push, (const char *engine_type,
request_rec *r,
- h2_req_engine_init *einit));
+ http2_req_engine_init *einit));
/**
* Get a new request for processing in this engine.