}
apr_pool_create_ex(&pool, parent, NULL, allocator);
apr_pool_tag(pool, "h2_slave_conn");
- apr_allocator_owner_set(allocator, parent);
+ apr_allocator_owner_set(allocator, pool);
c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec));
if (c == NULL) {
void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator)
{
+ apr_pool_t *parent;
apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
"h2_slave_conn(%ld): destroy (task=%s)", slave->id,
apr_table_get(slave->notes, H2_TASK_ID_NOTE));
- apr_pool_destroy(slave->pool);
- if (pallocator) {
+ /* Attache the allocator to the parent pool and return it for
+ * reuse, otherwise the own is still the slave pool and it will
+ * get destroyed with it. */
+ parent = apr_pool_parent_get(slave->pool);
+ if (pallocator && parent) {
+ apr_allocator_owner_set(allocator, parent);
*pallocator = allocator;
}
- else {
- apr_allocator_destroy(allocator);
- }
+ apr_pool_destroy(slave->pool);
}
#include <http_core.h>
#include <http_log.h>
#include <http_connection.h>
+#include <http_request.h>
#include "h2_private.h"
#include "h2_h2.h"
#include "h2_task.h"
#include "h2_util.h"
-h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request)
+h2_io *h2_io_create(int id, apr_pool_t *pool,
+ apr_bucket_alloc_t *bucket_alloc,
+ const h2_request *request)
{
h2_io *io = apr_pcalloc(pool, sizeof(*io));
if (io) {
io->id = id;
io->pool = pool;
- io->bucket_alloc = apr_bucket_alloc_create(pool);
+ io->bucket_alloc = bucket_alloc;
io->request = h2_request_clone(pool, request);
}
return io;
apr_size_t *pfile_buckets_allowed)
{
apr_status_t status;
+ apr_bucket *b;
int start_allowed;
if (io->rst_error) {
return APR_ECONNABORTED;
}
+ if (!io->eor) {
+ /* Filter the EOR bucket and set it aside. We prefer to tear down
+ * the request when the whole h2 stream is done */
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb);
+ b = APR_BUCKET_NEXT(b))
+ {
+ if (AP_BUCKET_IS_EOR(b)) {
+ APR_BUCKET_REMOVE(b);
+ io->eor = b;
+ break;
+ }
+ }
+ }
+
if (io->eos_out) {
- apr_off_t len;
+ apr_off_t len = 0;
/* We have already delivered an EOS bucket to a reader, no
* sense in storing anything more here.
*/
- status = apr_brigade_length(bb, 1, &len);
- if (status == APR_SUCCESS) {
- if (len > 0) {
- /* someone tries to write real data after EOS, that
- * does not look right. */
- status = APR_EOF;
- }
- /* cleanup, as if we had moved the data */
- apr_brigade_cleanup(bb);
- }
- return status;
+ apr_brigade_length(bb, 0, &len);
+ apr_brigade_cleanup(bb);
+ return (len > 0)? APR_EOF : APR_SUCCESS;
}
process_trailers(io, trailers);
struct h2_response *response; /* response to request */
int rst_error; /* h2 related stream abort error */
+ apr_bucket *eor; /* the EOR bucket, set aside */
+ struct h2_task *task; /* the task once started */
+
apr_bucket_brigade *bbin; /* input data for stream */
apr_bucket_brigade *bbout; /* output data from stream */
apr_bucket_brigade *bbtmp; /* temporary data for chunking */
/**
* Creates a new h2_io for the given stream id.
*/
-h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request);
+h2_io *h2_io_create(int id, apr_pool_t *pool,
+ apr_bucket_alloc_t *bucket_alloc,
+ const struct h2_request *request);
/**
* Set the response of this stream.
return NULL;
}
+ m->bucket_alloc = apr_bucket_alloc_create(m->pool);
m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->q = h2_iq_create(m->pool, m->max_streams);
static void io_destroy(h2_mplx *m, h2_io *io, int events)
{
- apr_pool_t *pool = io->pool;
+ apr_pool_t *pool;
/* cleanup any buffered input */
h2_io_in_shutdown(io);
io_process_events(m, io);
}
- io->pool = NULL;
/* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our
* file handle pool. */
if (m->redo_ios) {
h2_io_set_remove(m->redo_ios, io);
}
-
+
+ if (io->task) {
+ if (m->spare_allocator) {
+ apr_allocator_destroy(m->spare_allocator);
+ m->spare_allocator = NULL;
+ }
+
+ h2_slave_destroy(io->task->c, &m->spare_allocator);
+ io->task = NULL;
+ }
+
+ pool = io->pool;
+ io->pool = NULL;
if (pool) {
apr_pool_clear(pool);
if (m->spare_pool) {
"h2_mplx(%ld-%d): close, no response, no rst",
m->id, io->id);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%ld-%d): close with trailers=%s",
- m->id, io->id, trailers? "yes" : "no");
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+ "h2_mplx(%ld-%d): close with eor=%s, trailers=%s",
+ m->id, io->id, io->eor? "yes" : "no",
+ trailers? "yes" : "no");
status = h2_io_out_close(io, trailers);
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
+ if (io->eor) {
+ apr_bucket_delete(io->eor);
+ io->eor = NULL;
+ }
have_out_data_for(m, stream_id);
}
else {
m->spare_pool = NULL;
}
- io = h2_io_create(stream_id, io_pool, request);
+ io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request);
h2_io_set_add(m->stream_ios, io);
return io;
}
}
else if (io) {
- conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
+ conn_rec *slave = h2_slave_create(m->c, io->pool, m->spare_allocator);
m->spare_allocator = NULL;
- task = h2_task_create(m->id, io->request, slave, m);
+ io->task = task = h2_task_create(m->id, io->request, slave, m);
apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
io->worker_started = 1;
io->started_at = apr_time_now();
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
}
- if (m->spare_allocator) {
- apr_allocator_destroy(m->spare_allocator);
- m->spare_allocator = NULL;
- }
-
- h2_slave_destroy(task->c, &m->spare_allocator);
- task = NULL;
-
if (io) {
apr_time_t now = apr_time_now();
if (!io->orphaned && m->redo_ios
}
}
else {
- /* hang around until the stream deregisteres */
+ /* hang around until the stream deregisters */
}
}
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): task %s without corresp. h2_io",
+ m->id, task->id);
+ }
}
}
}
/* cannot report that as done until engine returns */
}
else {
- h2_task_output_close(task->output);
task_done(m, task);
}
leave_mutex(m, acquired);
volatile int refs;
conn_rec *c;
apr_pool_t *pool;
+ apr_bucket_alloc_t *bucket_alloc;
unsigned int aborted : 1;
unsigned int need_registration : 1;
}
static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn,
- h2_task *task, int waslive, int aborted,
- int close)
+ h2_task *task, int waslive, int aborted)
{
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
"h2_ngn_shed(%ld): task %s %s by %s",
if (waslive) ngn->no_live--;
ngn->no_assigned--;
- if (close) {
- h2_task_output_close(task->output);
- }
return APR_SUCCESS;
}
apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
struct h2_req_engine *ngn, h2_task *task)
{
- return ngn_done_task(shed, ngn, task, 1, 0, 0);
+ return ngn_done_task(shed, ngn, task, 1, 0);
}
void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
"h2_ngn_shed(%ld): engine %s has queued task %s, "
"frozen=%d, aborting",
shed->c->id, ngn->id, task->id, task->frozen);
- ngn_done_task(shed, ngn, task, 0, 1, 1);
+ ngn_done_task(shed, ngn, task, 0, 1);
}
}
if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
output->task = task;
output->state = H2_TASK_OUT_INIT;
output->from_h1 = h2_from_h1_create(task->stream_id, c->pool);
- if (!output->from_h1) {
- return NULL;
- }
}
return output;
}
return NULL;
}
-static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
- apr_bucket_brigade *bb, const char *caller)
+static apr_status_t open_response(h2_task_output *output, ap_filter_t *f,
+ apr_bucket_brigade *bb, const char *caller)
{
- if (output->state == H2_TASK_OUT_INIT) {
- h2_response *response;
- output->state = H2_TASK_OUT_STARTED;
- response = h2_from_h1_get_response(output->from_h1);
- if (!response) {
- if (f) {
- /* This happens currently when ap_die(status, r) is invoked
- * by a read request filter. */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
- "h2_task_output(%s): write without response by %s "
- "for %s %s %s",
- output->task->id, caller,
- output->task->request->method,
- output->task->request->authority,
- output->task->request->path);
- output->c->aborted = 1;
- }
- if (output->task->io) {
- apr_thread_cond_broadcast(output->task->io);
- }
- return APR_ECONNABORTED;
+ h2_response *response;
+ response = h2_from_h1_get_response(output->from_h1);
+ if (!response) {
+ if (f) {
+ /* This happens currently when ap_die(status, r) is invoked
+ * by a read request filter. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
+ "h2_task_output(%s): write without response by %s "
+ "for %s %s %s",
+ output->task->id, caller,
+ output->task->request->method,
+ output->task->request->authority,
+ output->task->request->path);
+ output->c->aborted = 1;
}
-
- if (h2_task_logio_add_bytes_out) {
- /* counter headers as if we'd do a HTTP/1.1 serialization */
- output->written = h2_util_table_bytes(response->headers, 3)+1;
- h2_task_logio_add_bytes_out(output->c, output->written);
+ if (output->task->io) {
+ apr_thread_cond_broadcast(output->task->io);
}
- get_trailers(output);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
- "h2_task(%s): open response to %s %s %s",
- output->task->id, output->task->request->method,
- output->task->request->authority,
- output->task->request->path);
- return h2_mplx_out_open(output->task->mplx, output->task->stream_id,
- response, f, bb, output->task->io);
+ return APR_ECONNABORTED;
+ }
+
+ if (h2_task_logio_add_bytes_out) {
+ /* count headers as if we'd do a HTTP/1.1 serialization */
+ output->written = h2_util_table_bytes(response->headers, 3)+1;
+ h2_task_logio_add_bytes_out(output->c, output->written);
}
- return APR_SUCCESS;
+ get_trailers(output);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
+ "h2_task(%s): open response to %s %s %s",
+ output->task->id, output->task->request->method,
+ output->task->request->authority,
+ output->task->request->path);
+ return h2_mplx_out_open(output->task->mplx, output->task->stream_id,
+ response, f, bb, output->task->io);
}
static apr_status_t write_brigade_raw(h2_task_output *output,
apr_status_t h2_task_output_write(h2_task_output *output,
ap_filter_t* f, apr_bucket_brigade* bb)
{
- apr_status_t status;
+ apr_status_t status = APR_SUCCESS;
if (APR_BRIGADE_EMPTY(bb)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c,
return APR_SUCCESS;
}
- status = open_if_needed(output, f, bb, "write");
+ if (output->state == H2_TASK_OUT_INIT) {
+ status = open_response(output, f, bb, "write");
+ output->state = H2_TASK_OUT_STARTED;
+ }
/* Attempt to write saved brigade first */
if (status == APR_SUCCESS && output->bb
return status;
}
-void h2_task_output_close(h2_task_output *output)
-{
- open_if_needed(output, NULL, NULL, "close");
- if (output->state != H2_TASK_OUT_DONE) {
- h2_mplx_out_close(output->task->mplx, output->task->stream_id,
- get_trailers(output));
- output->state = H2_TASK_OUT_DONE;
- }
-}
-
ap_filter_t* filter,
apr_bucket_brigade* brigade);
-void h2_task_output_close(h2_task_output *output);
-
apr_status_t h2_task_output_freeze(h2_task_output *output);
apr_status_t h2_task_output_thaw(h2_task_output *output);
else {
const char *data;
apr_size_t len;
+
status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (status == APR_SUCCESS && len > 0) {
status = apr_brigade_write(to, NULL, NULL, data, len);