return h2_beam_bucket_make(b, beam, bred, n);
}
-/*static apr_status_t beam_bucket_setaside(apr_bucket *b, apr_pool_t *pool)
-{
- apr_status_t status = APR_SUCCESS;
- h2_beam_proxy *d = b->data;
- if (d->bred) {
- const char *data;
- apr_size_t len;
-
- status = apr_bucket_read(d->bred, &data, &len, APR_BLOCK_READ);
- if (status == APR_SUCCESS) {
- b = apr_bucket_heap_make(b, (char *)data + b->start, b->length, NULL);
- if (b == NULL) {
- return APR_ENOMEM;
- }
- }
- }
- return status;
-}*/
-
const apr_bucket_type_t h2_bucket_type_beam = {
"BEAM", 5, APR_BUCKET_DATA,
beam_bucket_destroy,
return APR_SUCCESS;
}
-static apr_status_t beam_cleanup(void *data)
+static void beam_set_red_pool(h2_bucket_beam *beam, apr_pool_t *pool);
+
+static apr_status_t beam_red_cleanup(void *data)
{
h2_bucket_beam *beam = data;
- beam_close(beam);
r_purge_reds(beam);
h2_blist_cleanup(&beam->red);
report_consumption(beam, 0);
}
h2_blist_cleanup(&beam->purge);
h2_blist_cleanup(&beam->hold);
+ beam_set_red_pool(beam, NULL);
return APR_SUCCESS;
}
+static void beam_set_red_pool(h2_bucket_beam *beam, apr_pool_t *pool)
+{
+ if (beam->red_pool != pool) {
+ if (beam->red_pool) {
+ apr_pool_cleanup_kill(beam->red_pool, beam, beam_red_cleanup);
+ }
+ beam->red_pool = pool;
+ if (beam->red_pool) {
+ apr_pool_pre_cleanup_register(beam->red_pool, beam, beam_red_cleanup);
+ }
+ }
+}
+
+static apr_status_t beam_cleanup(void *data)
+{
+ h2_bucket_beam *beam = data;
+ apr_status_t status;
+
+ beam_close(beam);
+ if (beam->red_pool) {
+ status = beam_red_cleanup(beam);
+ }
+ return APR_SUCCESS;
+}
+
apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
{
- apr_pool_cleanup_kill(beam->red_pool, beam, beam_cleanup);
+ apr_pool_cleanup_kill(beam->pool, beam, beam_cleanup);
return beam_cleanup(beam);
}
-apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *red_pool,
+apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
int id, const char *tag,
apr_size_t max_buf_size)
{
h2_bucket_beam *beam;
apr_status_t status = APR_SUCCESS;
- beam = apr_pcalloc(red_pool, sizeof(*beam));
+ beam = apr_pcalloc(pool, sizeof(*beam));
if (!beam) {
return APR_ENOMEM;
}
beam->id = id;
beam->tag = tag;
+ beam->pool = pool;
H2_BLIST_INIT(&beam->red);
H2_BLIST_INIT(&beam->hold);
H2_BLIST_INIT(&beam->purge);
H2_BPROXY_LIST_INIT(&beam->proxies);
- beam->red_pool = red_pool;
beam->max_buf_size = max_buf_size;
+ apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
- apr_pool_pre_cleanup_register(red_pool, beam, beam_cleanup);
*pbeam = beam;
return status;
return status;
}
+static void move_to_hold(h2_bucket_beam *beam,
+ apr_bucket_brigade *red_brigade)
+{
+ apr_bucket *b;
+ while (red_brigade && !APR_BRIGADE_EMPTY(red_brigade)) {
+ b = APR_BRIGADE_FIRST(red_brigade);
+ APR_BUCKET_REMOVE(b);
+ H2_BLIST_INSERT_TAIL(&beam->red, b);
+ }
+}
+
static apr_status_t append_bucket(h2_bucket_beam *beam,
apr_bucket *bred,
apr_read_type_e block,
- apr_pool_t *pool,
h2_beam_lock *pbl)
{
const char *data;
* its pool/bucket_alloc from a foreign thread and that will
* corrupt. */
status = APR_ENOTIMPL;
- if (beam->closed && bred->length > 0) {
- status = APR_EOF;
- }
- else if (APR_BUCKET_IS_TRANSIENT(bred)) {
+ if (APR_BUCKET_IS_TRANSIENT(bred)) {
/* this takes care of transient buckets and converts them
* into heap ones. Other bucket types might or might not be
* affected by this. */
- status = apr_bucket_setaside(bred, pool);
+ status = apr_bucket_setaside(bred, beam->red_pool);
}
else if (APR_BUCKET_IS_HEAP(bred)) {
/* For heap buckets read from a green thread is fine. The
}
if (can_beam) {
beam->last_beamed = fd;
- status = apr_bucket_setaside(bred, pool);
+ status = apr_bucket_setaside(bred, beam->red_pool);
}
/* else: enter ENOTIMPL case below */
}
}
status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
if (status == APR_SUCCESS) {
- status = apr_bucket_setaside(bred, pool);
+ status = apr_bucket_setaside(bred, beam->red_pool);
}
}
r_purge_reds(beam);
if (beam->aborted) {
+ move_to_hold(beam, red_brigade);
status = APR_ECONNABORTED;
}
else if (red_brigade) {
while (!APR_BRIGADE_EMPTY(red_brigade)
&& status == APR_SUCCESS) {
bred = APR_BRIGADE_FIRST(red_brigade);
- status = append_bucket(beam, bred, block, beam->red_pool, &bl);
+ beam_set_red_pool(beam, red_brigade->p);
+ status = append_bucket(beam, bred, block, &bl);
}
report_production(beam, force_report);
if (beam->m_cond) {
struct h2_bucket_beam {
int id;
const char *tag;
+ apr_pool_t *pool;
h2_blist red;
h2_blist hold;
h2_blist purge;
apr_pool_t *pool;
conn_rec *c;
void *cfg;
- unsigned int free_bits;
- unsigned long l, lor;
AP_DEBUG_ASSERT(master);
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, master,
memcpy(c, master, sizeof(conn_rec));
- /* Each conn_rec->id is supposed to be unique at a point in time. Since
- * some modules (and maybe external code) uses this id as an identifier
- * for the request_rec they handle, it needs to be unique for slave
- * connections also.
- * The connection id is generated by the MPM and most MPMs use the formula
- * id := (child_num * max_threads) + thread_num
- * which means that there is a maximum id of about
- * idmax := max_child_count * max_threads
- * If we assume 2024 child processes with 2048 threads max, we get
- * idmax ~= 2024 * 2048 = 2 ** 22
- * On 32 bit systems, we have not much space left, but on 64 bit systems
- * (and higher?) we can use the upper 32 bits without fear of collision.
- * 32 bits is just what we need, since a connection can only handle so
- * many streams.
- */
- l = master->id;
- lor = 0;
- if (sizeof(unsigned long) >= 8 && l < APR_UINT32_MAX) {
- free_bits = 32;
- }
- else {
- /* Assume that we never encounter ranges stream ids where this
- * leads to many collisions. With 32 bit longs, we have a hard time
- * to make server wide unique ids. */
- free_bits = 16;
- lor= (1 << 31);
- }
- c->id = (l^((unsigned long)slave_id << free_bits))|lor;
c->master = master;
c->pool = pool;
c->conn_config = ap_create_conn_config(pool);
/* check if we copy vs. setaside files in this location */
task->output.copy_files = h2_config_geti(h2_config_rget(r),
H2_CONF_COPY_FILES);
+ if (task->output.copy_files) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ "h2_slave_out(%s): copy_files on", task->id);
+ h2_beam_on_file_beam(task->output.beam, h2_beam_no_files, NULL);
+ }
}
}
return DECLINED;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
"h2_mplx(%s): out open", task->id);
- if (!stream->output) {
- h2_beam_buffer_size_set(beam, m->stream_max_mem);
- h2_beam_timeout_set(beam, m->stream_timeout);
- h2_beam_on_consumed(beam, stream_output_consumed, task);
- h2_beam_on_produced(beam, output_produced, m);
- beamed_count = h2_beam_get_files_beamed(beam);
- if (m->tx_handles_reserved >= beamed_count) {
- m->tx_handles_reserved -= beamed_count;
- }
- else {
- m->tx_handles_reserved = 0;
- }
- if (!task->output.copy_files) {
- h2_beam_on_file_beam(beam, can_beam_file, m);
- }
- h2_beam_mutex_set(beam, beam_enter, task->cond, m);
- stream->output = beam;
+ h2_beam_on_consumed(stream->output, stream_output_consumed, task);
+ h2_beam_on_produced(stream->output, output_produced, m);
+ beamed_count = h2_beam_get_files_beamed(stream->output);
+ if (m->tx_handles_reserved >= beamed_count) {
+ m->tx_handles_reserved -= beamed_count;
+ }
+ else {
+ m->tx_handles_reserved = 0;
}
+ if (!task->output.copy_files) {
+ h2_beam_on_file_beam(stream->output, can_beam_file, m);
+ }
+
+ /* time to protect the beam against multi-threaded use */
+ h2_beam_mutex_set(stream->output, beam_enter, task->cond, m);
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
slave->sbh = m->c->sbh;
slave->aborted = 0;
- task = h2_task_create(slave, stream->id, stream->request, stream->input, m);
+ task = h2_task_create(slave, stream->id, stream->request,
+ stream->input, stream->output, m);
h2_ihash_add(m->tasks, task);
m->c->keepalives++;
h2_beam_on_file_beam(stream->input, can_beam_file, m);
h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
}
-
+ if (stream->output) {
+ h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
+ h2_beam_timeout_set(stream->output, m->stream_timeout);
+ }
++m->workers_busy;
}
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
out_close(m, task);
- stream = h2_ihash_get(m->streams, task->stream_id);
if (ngn) {
apr_off_t bytes = 0;
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
}
+ stream = h2_ihash_get(m->streams, task->stream_id);
if (!m->aborted && stream && m->redo_tasks
&& h2_ihash_get(m->redo_tasks, task->stream_id)) {
/* reset and schedule again */
task->worker_done = 1;
task->done_at = apr_time_now();
- if (task->output.beam) {
- h2_beam_on_consumed(task->output.beam, NULL, NULL);
- h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
- }
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): request done, %f ms elapsed", task->id,
(task->done_at - task->started_at) / 1000.0);
task->id);
/* more data will not arrive, resume the stream */
have_out_data_for(m, stream, 0);
+ h2_beam_on_consumed(stream->output, NULL, NULL);
+ h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
}
else {
/* stream no longer active, was it placed in hold? */
const char *line = "(null)";
apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
- len = h2_util_bb_print(buffer, bmax, tag, "", s->buffer);
+ len = h2_util_bb_print(buffer, bmax, tag, "", s->out_buffer);
ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%s): %s",
c->log_id, len? buffer : line);
}
static void prep_output(h2_stream *stream) {
conn_rec *c = stream->session->c;
- if (!stream->buffer) {
- stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+ if (!stream->out_buffer) {
+ stream->out_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
}
}
prep_output(stream);
b = h2_bucket_headers_create(c->bucket_alloc, response);
- APR_BRIGADE_INSERT_HEAD(stream->buffer, b);
+ APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
}
static apr_status_t stream_pool_cleanup(void *ctx)
h2_stream *stream = ctx;
apr_status_t status;
- if (stream->input) {
- h2_beam_destroy(stream->input);
- stream->input = NULL;
- }
if (stream->files) {
apr_file_t *file;
int i;
stream->state = H2_STREAM_ST_IDLE;
stream->pool = pool;
stream->session = session;
+
+ h2_beam_create(&stream->input, pool, id, "input", 0);
+ h2_beam_create(&stream->output, pool, id, "output", 0);
set_state(stream, H2_STREAM_ST_OPEN);
apr_pool_cleanup_register(pool, stream, stream_pool_cleanup,
void h2_stream_cleanup(h2_stream *stream)
{
AP_DEBUG_ASSERT(stream);
- if (stream->buffer) {
- apr_brigade_cleanup(stream->buffer);
+ if (stream->out_buffer) {
+ apr_brigade_cleanup(stream->out_buffer);
}
if (stream->input) {
apr_status_t status;
stream->rst_error = error_code;
close_input(stream);
close_output(stream);
- if (stream->buffer) {
- apr_brigade_cleanup(stream->buffer);
+ if (stream->out_buffer) {
+ apr_brigade_cleanup(stream->out_buffer);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): reset, error=%d",
close_input(stream);
}
- if (!stream->input) {
- h2_beam_create(&stream->input, stream->pool, stream->id, "input", 0);
- }
-
if (h2_stream_is_ready(stream)) {
/* already have a resonse, probably a HTTP error code */
return h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
if (!stream->output) {
return APR_EOF;
}
- status = h2_beam_receive(stream->output, stream->buffer,
+ status = h2_beam_receive(stream->output, stream->out_buffer,
APR_NONBLOCK_READ, amount);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
"h2_stream(%ld-%d): beam_received",
stream->session->id, stream->id);
- /* The buckets we reveive are using the stream->buffer pool as
+ /* The buckets we reveive are using the stream->out_buffer pool as
* lifetime which is exactly what we want since this is stream->pool.
*
* However: when we send these buckets down the core output filters, the
* file. Any split off buckets we sent afterwards will result in a
* APR_EBADF.
*/
- for (b = APR_BRIGADE_FIRST(stream->buffer);
- b != APR_BRIGADE_SENTINEL(stream->buffer);
+ for (b = APR_BRIGADE_FIRST(stream->out_buffer);
+ b != APR_BRIGADE_SENTINEL(stream->out_buffer);
b = APR_BUCKET_NEXT(b)) {
if (APR_BUCKET_IS_FILE(b)) {
apr_bucket_file *f = (apr_bucket_file *)b->data;
}
response = h2_headers_die(http_status, stream->request, stream->pool);
prepend_response(stream, response);
+ h2_beam_close(stream->output);
return APR_SUCCESS;
}
*plen = requested;
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
- h2_util_bb_avail(stream->buffer, plen, peos);
+ h2_util_bb_avail(stream->out_buffer, plen, peos);
if (!*peos && *plen < requested) {
/* try to get more data */
status = fill_buffer(stream, (requested - *plen) + H2_DATA_CHUNK_SIZE);
if (APR_STATUS_IS_EOF(status)) {
apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
+ APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
status = APR_SUCCESS;
}
else if (status == APR_EAGAIN) {
status = APR_SUCCESS;
}
*plen = requested;
- h2_util_bb_avail(stream->buffer, plen, peos);
+ h2_util_bb_avail(stream->out_buffer, plen, peos);
}
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post");
- b = APR_BRIGADE_FIRST(stream->buffer);
- while (b != APR_BRIGADE_SENTINEL(stream->buffer)) {
+ b = APR_BRIGADE_FIRST(stream->out_buffer);
+ while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
e = APR_BUCKET_NEXT(b);
if (APR_BUCKET_IS_FLUSH(b)
|| (!APR_BUCKET_IS_METADATA(b) && b->length == 0)) {
b = e;
}
- b = get_first_headers_bucket(stream->buffer);
+ b = get_first_headers_bucket(stream->out_buffer);
if (b) {
/* there are HEADERS to submit */
*peos = 0;
*plen = 0;
- if (b == APR_BRIGADE_FIRST(stream->buffer)) {
+ if (b == APR_BRIGADE_FIRST(stream->out_buffer)) {
if (presponse) {
*presponse = h2_bucket_headers_get(b);
APR_BUCKET_REMOVE(b);
}
}
else {
- apr_bucket *e = APR_BRIGADE_FIRST(stream->buffer);
- while (e != APR_BRIGADE_SENTINEL(stream->buffer)) {
+ apr_bucket *e = APR_BRIGADE_FIRST(stream->out_buffer);
+ while (e != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
if (e == b) {
break;
}
if (stream->rst_error) {
return APR_ECONNRESET;
}
- status = h2_append_brigade(bb, stream->buffer, plen, peos, is_not_headers);
+ status = h2_append_brigade(bb, stream->out_buffer, plen, peos, is_not_headers);
if (status == APR_SUCCESS && !*peos && !*plen) {
status = APR_EAGAIN;
}
if (stream->has_response) {
return 1;
}
- else if (stream->buffer && get_first_headers_bucket(stream->buffer)) {
+ else if (stream->out_buffer && get_first_headers_bucket(stream->out_buffer)) {
return 1;
}
return 0;
const struct h2_request *request; /* the request made in this stream */
struct h2_request *rtmp; /* request being assembled */
apr_table_t *trailers; /* optional incoming trailers */
- struct h2_bucket_beam *input;
int request_headers_added; /* number of request headers added */
- unsigned int push_policy; /* which push policy to use for this request */
+ struct h2_bucket_beam *input;
struct h2_bucket_beam *output;
- apr_bucket_brigade *buffer;
+ apr_bucket_brigade *out_buffer;
apr_array_header_t *files; /* apr_file_t* we collected during I/O */
int rst_error; /* stream error for RST_STREAM */
unsigned int scheduled : 1; /* stream has been scheduled */
unsigned int started : 1; /* stream has started processing */
unsigned int has_response : 1; /* response headers are known */
+ unsigned int push_policy; /* which push policy to use for this request */
apr_off_t out_data_frames; /* # of DATA frames sent */
apr_off_t out_data_octets; /* # of DATA octets (payload) sent */
* task output handling
******************************************************************************/
-static void prep_output(h2_task *task)
-{
- if (!task->output.beam) {
- h2_beam_create(&task->output.beam, task->pool,
- task->stream_id, "output", 0);
- if (task->output.copy_files) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
- "h2_slave_out(%s): copy_files on", task->id);
- h2_beam_on_file_beam(task->output.beam, h2_beam_no_files, NULL);
- }
- }
-}
-
static apr_status_t open_output(h2_task *task)
{
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03348)
task->id, task->request->method,
task->request->authority,
task->request->path);
- prep_output(task);
task->output.opened = 1;
return h2_mplx_out_open(task->mplx, task->stream_id, task->output.beam);
}
return APR_SUCCESS;
}
- prep_output(task);
-
/* Attempt to write saved brigade first */
if (task->output.bb && !APR_BRIGADE_EMPTY(task->output.bb)) {
status = send_out(task, task->output.bb);
return OK;
}
-h2_task *h2_task_create(conn_rec *c, int stream_id,
- const h2_request *req, h2_bucket_beam *input,
+h2_task *h2_task_create(conn_rec *c, int stream_id, const h2_request *req,
+ h2_bucket_beam *input, h2_bucket_beam *output,
h2_mplx *mplx)
{
apr_pool_t *pool;
task->pool = pool;
task->request = req;
task->input.beam = input;
+ task->output.beam = output;
apr_thread_cond_create(&task->cond, pool);
void h2_task_destroy(h2_task *task)
{
- if (task->output.beam) {
- h2_beam_destroy(task->output.beam);
- task->output.beam = NULL;
- }
if (task->eor) {
apr_bucket_destroy(task->eor);
}
}
}
-apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread)
+apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id)
{
AP_DEBUG_ASSERT(task);
+
+ if (task->c->master) {
+ /* Each conn_rec->id is supposed to be unique at a point in time. Since
+ * some modules (and maybe external code) uses this id as an identifier
+ * for the request_rec they handle, it needs to be unique for slave
+ * connections also.
+ * The connection id is generated by the MPM and most MPMs use the formula
+ * id := (child_num * max_threads) + thread_num
+ * which means that there is a maximum id of about
+ * idmax := max_child_count * max_threads
+ * If we assume 2024 child processes with 2048 threads max, we get
+ * idmax ~= 2024 * 2048 = 2 ** 22
+ * On 32 bit systems, we have not much space left, but on 64 bit systems
+ * (and higher?) we can use the upper 32 bits without fear of collision.
+ * 32 bits is just what we need, since a connection can only handle so
+ * many streams.
+ */
+ int slave_id, free_bits;
+
+ if (sizeof(unsigned long) >= 8) {
+ free_bits = 32;
+ slave_id = task->stream_id;
+ }
+ else {
+ /* Assume we have a more limited number of threads/processes
+ * and h2 workers on a 32-bit system. Use the worker instead
+ * of the stream id. */
+ free_bits = 8;
+ slave_id = worker_id;
+ }
+ task->c->id = (task->c->master->id << free_bits)^slave_id;
+ }
task->input.chunked = task->request->chunked;
task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc);
h2_task *h2_task_create(conn_rec *c, int stream_id,
const struct h2_request *req,
- struct h2_bucket_beam *input, struct h2_mplx *mplx);
+ struct h2_bucket_beam *input,
+ struct h2_bucket_beam *output,
+ struct h2_mplx *mplx);
void h2_task_destroy(h2_task *task);
-apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread);
+apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id);
void h2_task_redo(h2_task *task);
int h2_task_can_redo(h2_task *task);
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.7.5"
+#define MOD_HTTP2_VERSION "1.7.6"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010705
+#define MOD_HTTP2_VERSION_NUM 0x010706
#endif /* mod_h2_h2_version_h */
worker->get_next(worker, worker->ctx, &task, &sticky);
while (task) {
- h2_task_do(task, thread);
+ h2_task_do(task, thread, worker->id);
/* report the task done and maybe get another one from the same
* mplx (= master connection), if we can be sticky.
*/