Changes with Apache 2.4.24
+ *) mod_http2: unannounced and multiple interim responses (status code < 200)
+ are parsed and forwarded to the client until a final response arrives.
+ [Stefan Eissing]
+
+ *) mod_proxy_http2: improved robustness when the main connection is closed early
+ by resetting all ongoing streams against the backend.
+ [Stefan Eissing]
+
+ *) mod_http2: allocators from slave connections are released earlier, resulting
+ in less overall memory use on busy, long-lived connections.
+ [Stefan Eissing]
+
*) mod_remoteip: Pick up where we left off during a subrequest rather
than running with the modified XFF but original TCP address.
PR 49839/PR 60251
@echo $(DL)GEN $@$(DL)
@echo $(DL) (HTTP2)$(DL) > $@
@echo $(DL) http2_module,$(DL) >> $@
- @echo $(DL) h2_ihash_add,$(DL) >> $@
- @echo $(DL) h2_ihash_clear,$(DL) >> $@
- @echo $(DL) h2_ihash_count,$(DL) >> $@
- @echo $(DL) h2_ihash_create,$(DL) >> $@
- @echo $(DL) h2_ihash_empty,$(DL) >> $@
- @echo $(DL) h2_ihash_iter,$(DL) >> $@
- @echo $(DL) h2_ihash_remove,$(DL) >> $@
- @echo $(DL) h2_iq_add,$(DL) >> $@
- @echo $(DL) h2_iq_create,$(DL) >> $@
- @echo $(DL) h2_iq_remove,$(DL) >> $@
- @echo $(DL) h2_log2,$(DL) >> $@
- @echo $(DL) h2_headers_add_h1,$(DL) >> $@
@echo $(DL) nghttp2_is_fatal,$(DL) >> $@
@echo $(DL) nghttp2_option_del,$(DL) >> $@
@echo $(DL) nghttp2_option_new,$(DL) >> $@
const char *scheme;
const char *authority;
const char *path;
-
apr_table_t *headers;
apr_time_t request_time;
-
unsigned int chunked : 1; /* iff request body needs to be forwarded as chunked */
unsigned int serialize : 1; /* iff this request is written in HTTP/1.1 serialization */
- unsigned int expect_100 : 1; /* iff we need a 100-continue response */
- unsigned int expect_failed : 1; /* iff we are unable to fullfill expects */
};
typedef struct h2_headers h2_headers;
return status;
}
-conn_rec *h2_slave_create(conn_rec *master, int slave_id,
- apr_pool_t *parent, apr_allocator_t *allocator)
+conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent)
{
+ apr_allocator_t *allocator;
apr_pool_t *pool;
conn_rec *c;
void *cfg;
* independent of its parent pool in the sense that it can work in
* another thread.
*/
- if (!allocator) {
- apr_allocator_create(&allocator);
- }
+ apr_allocator_create(&allocator);
apr_pool_create_ex(&pool, parent, NULL, allocator);
apr_pool_tag(pool, "h2_slave_conn");
apr_allocator_owner_set(allocator, pool);
return c;
}
-void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator)
+void h2_slave_destroy(conn_rec *slave)
{
- apr_pool_t *parent;
- apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
"h2_slave_conn(%ld): destroy (task=%s)", slave->id,
apr_table_get(slave->notes, H2_TASK_ID_NOTE));
- /* Attache the allocator to the parent pool and return it for
- * reuse, otherwise the own is still the slave pool and it will
- * get destroyed with it. */
- parent = apr_pool_parent_get(slave->pool);
- if (pallocator && parent) {
- apr_allocator_owner_set(allocator, parent);
- *pallocator = allocator;
- }
apr_pool_destroy(slave->pool);
}
h2_mpm_type_t h2_conn_mpm_type(void);
-conn_rec *h2_slave_create(conn_rec *master, int slave_id,
- apr_pool_t *parent, apr_allocator_t *allocator);
-void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator);
+conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent);
+void h2_slave_destroy(conn_rec *slave);
apr_status_t h2_slave_run_pre_connection(conn_rec *slave, apr_socket_t *csd);
void h2_slave_run_connection(conn_rec *slave);
status = ap_pass_brigade(f->next, parser->tmp);
apr_brigade_cleanup(parser->tmp);
- parser->state = H2_RP_DONE;
- task->output.parse_response = 0;
+ /* reset parser for possible next response */
+ parser->state = H2_RP_STATUS_LINE;
+ apr_array_clear(parser->hlines);
+
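+ /* a final response (status >= 200) ends parsing; interim (1xx) responses may be followed by more */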
+ if (response->status >= 200) {
+ task->output.sent_response = 1;
+ }
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
APLOGNO(03197) "h2_task(%s): passed response %d",
task->id, response->status);
}
else if (line[0] == '\0') {
/* end of headers, pass response onward */
+
return pass_response(task, f, parser);
}
else {
static void h2_mplx_destroy(h2_mplx *m)
{
+ conn_rec **pslave;
ap_assert(m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): destroy, tasks=%d",
m->id, (int)h2_ihash_count(m->tasks));
check_tx_free(m);
+
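+ /* destroy any slave connections still kept spare for reuse */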
+ while (m->spare_slaves->nelts > 0) {
+ pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
+ h2_slave_destroy(*pslave);
+ }
if (m->pool) {
apr_pool_destroy(m->pool);
}
m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
- m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->readyq = h2_iq_create(m->pool, m->max_streams);
m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+ m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
m->stream_timeout = stream_timeout;
m->workers = workers;
m->workers_max = workers->max_workers;
- m->workers_def_limit = 4;
- m->workers_limit = m->workers_def_limit;
+ m->workers_limit = 6; /* the original h1 max parallel connections */
m->last_limit_change = m->last_idle_block = apr_time_now();
m->limit_change_interval = apr_time_from_msec(200);
}
}
- if (task->output.beam) {
- h2_beam_on_produced(task->output.beam, NULL, NULL);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
- APLOGNO(03385) "h2_task(%s): destroy "
- "output beam empty=%d, holds proxies=%d",
- task->id,
- h2_beam_empty(task->output.beam),
- h2_beam_holds_proxies(task->output.beam));
- }
+ h2_beam_on_produced(task->output.beam, NULL, NULL);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+ APLOGNO(03385) "h2_task(%s): destroy "
+ "output beam empty=%d, holds proxies=%d",
+ task->id,
+ h2_beam_empty(task->output.beam),
+ h2_beam_holds_proxies(task->output.beam));
slave = task->c;
reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
&& !task->rst_error);
h2_ihash_remove(m->tasks, task->stream_id);
- if (m->redo_tasks) {
- h2_ihash_remove(m->redo_tasks, task->stream_id);
- }
+ h2_ihash_remove(m->redo_tasks, task->stream_id);
h2_task_destroy(task);
if (slave) {
}
else {
slave->sbh = NULL;
- h2_slave_destroy(slave, NULL);
+ h2_slave_destroy(slave);
}
}
* stream destruction until the task is done.
*/
h2_iq_remove(m->q, stream->id);
- h2_ihash_remove(m->sready, stream->id);
h2_ihash_remove(m->streams, stream->id);
- if (stream->input) {
- m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
- h2_beam_on_consumed(stream->input, NULL, NULL);
- /* Let anyone blocked reading know that there is no more to come */
- h2_beam_abort(stream->input);
- /* Remove mutex after, so that abort still finds cond to signal */
- h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
- }
- if (stream->output) {
- m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
- }
+
h2_stream_cleanup(stream);
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+ h2_beam_on_consumed(stream->input, NULL, NULL);
+ /* Let anyone blocked reading know that there is no more to come */
+ h2_beam_abort(stream->input);
+ /* Remove mutex after, so that abort still finds cond to signal */
+ h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
task = h2_ihash_get(m->tasks, stream->id);
if (task) {
}
else {
/* already finished */
- task_destroy(m, task, 0);
+ task_destroy(m, task, 1);
}
}
h2_stream_destroy(stream);
if (task->c) {
task->c->aborted = 1;
}
- if (task->input.beam) {
- h2_beam_abort(task->input.beam);
- }
- if (task->output.beam) {
- h2_beam_abort(task->output.beam);
- }
+ h2_beam_abort(task->input.beam);
+ h2_beam_abort(task->output.beam);
}
return 1;
}
h2_iq_clear(m->q);
purge_streams(m);
- /* 3. mark all slave connections as aborted and wakeup all sleeping
- * tasks. Mark all still active streams as 'done'. m->streams has to
- * be empty afterwards with streams either in
+ /* 3. wakeup all sleeping tasks. Mark all still active streams as 'done'.
+ * m->streams has to be empty afterwards with streams either in
* a) m->shold because a task is still active
* b) m->spurge because task is done, or was not started */
h2_ihash_iter(m->tasks, task_abort_connection, m);
if (!h2_ihash_empty(m->tasks)) {
/* when we are here, we lost track of the tasks still present.
* this currently happens with mod_proxy_http2 when we shut
- * down a h2_req_engine with tasks assigned... */
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
+ * down an h2_req_engine with tasks assigned. Since no parallel
+ * processing is going on any more, we just clean them up. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): 3. release_join with %d tasks",
m->id, (int)h2_ihash_count(m->tasks));
h2_ihash_iter(m->tasks, task_print, m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
"h2_mplx(%s): close", task->id);
- if (task->output.beam) {
- status = h2_beam_close(task->output.beam);
- h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c,
- APLOG_TRACE2);
- }
+ status = h2_beam_close(task->output.beam);
+ h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c,
+ APLOG_TRACE2);
output_consumed_signal(m, task);
have_out_data_for(m, stream, 0);
return status;
if (m->aborted) {
status = APR_ECONNABORTED;
}
- else if (!h2_ihash_empty(m->sready)) {
+ else if (!h2_iq_empty(m->readyq)) {
status = APR_SUCCESS;
}
else {
{
ap_assert(m);
ap_assert(stream);
- if (!h2_ihash_get(m->sready, stream->id)) {
- h2_ihash_add(m->sready, stream);
- if (m->added_output) {
- apr_thread_cond_signal(m->added_output);
- }
+ h2_iq_append(m->readyq, stream->id);
+ if (m->added_output) {
+ apr_thread_cond_signal(m->added_output);
}
}
else {
h2_ihash_add(m->streams, stream);
if (h2_stream_is_ready(stream)) {
- h2_ihash_add(m->sready, stream);
+ h2_iq_append(m->readyq, stream->id);
}
else {
if (!m->need_registration) {
slave = *pslave;
}
else {
- slave = h2_slave_create(m->c, stream->id, m->pool, NULL);
+ slave = h2_slave_create(m->c, stream->id, m->pool);
new_conn = 1;
}
m->max_stream_started = sid;
}
- if (stream->input) {
- h2_beam_timeout_set(stream->input, m->stream_timeout);
- h2_beam_on_consumed(stream->input, stream_input_consumed, m);
- h2_beam_on_file_beam(stream->input, can_beam_file, m);
- h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
- }
- if (stream->output) {
- h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
- h2_beam_timeout_set(stream->output, m->stream_timeout);
- }
+ h2_beam_timeout_set(stream->input, m->stream_timeout);
+ h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+ h2_beam_on_file_beam(stream->input, can_beam_file, m);
+ h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
+
+ h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
+ h2_beam_timeout_set(stream->output, m->stream_timeout);
++m->workers_busy;
}
}
if (ngn) {
apr_off_t bytes = 0;
- if (task->output.beam) {
- h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
- bytes += h2_beam_get_buffered(task->output.beam);
- }
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+ bytes += h2_beam_get_buffered(task->output.beam);
if (bytes > 0) {
/* we need to report consumed and current buffered output
* to the engine. The request will be streamed out or cancelled,
}
if (task->engine) {
- if (!h2_req_engine_is_shutdown(task->engine)) {
+ if (!m->aborted && !task->c->aborted
+ && !h2_req_engine_is_shutdown(task->engine)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): task(%s) has not-shutdown "
"engine(%s)", m->id, task->id,
}
stream = h2_ihash_get(m->streams, task->stream_id);
- if (!m->aborted && stream && m->redo_tasks
+ if (!m->aborted && stream
&& h2_ihash_get(m->redo_tasks, task->stream_id)) {
/* reset and schedule again */
h2_task_redo(task);
h2_task *task;
int n;
- if (!m->redo_tasks) {
- m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
- }
/* Try to get rid of streams that occupy workers. Look for safe requests
* that are repeatable. If none found, fail the connection.
*/
return status;
}
-void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
+void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
+ apr_status_t status)
{
h2_task *task = h2_ctx_cget_task(r_conn);
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
ngn_out_update_windows(m, ngn);
h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
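+ /* if processing failed and the task is repeatable, remember it for a retry */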
+ if (status != APR_SUCCESS && h2_task_can_redo(task)
+ && !h2_ihash_get(m->redo_tasks, task->stream_id)) {
+ h2_ihash_add(m->redo_tasks, task);
+ }
if (task->engine) {
/* cannot report that as done until engine returns */
}
{
apr_status_t status;
int acquired;
- int streams[32];
+ int ids[100];
h2_stream *stream;
size_t i, n;
/* update input windows for streams */
h2_ihash_iter(m->streams, update_window, m);
- if (on_resume && !h2_ihash_empty(m->sready)) {
- n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
+ if (on_resume && !h2_iq_empty(m->readyq)) {
+ n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
for (i = 0; i < n; ++i) {
- stream = h2_ihash_get(m->streams, streams[i]);
- if (!stream) {
- continue;
+ stream = h2_ihash_get(m->streams, ids[i]);
+ if (stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): on_resume",
+ m->id, stream->id);
+ on_resume(on_ctx, stream);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
- "h2_mplx(%ld-%d): on_resume",
- m->id, stream->id);
- on_resume(on_ctx, stream);
}
}
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_stream *s = h2_ihash_get(m->streams, stream_id);
if (s) {
- h2_ihash_add(m->sready, s);
+ h2_iq_append(m->readyq, stream_id);
}
leave_mutex(m, acquired);
}
struct h2_ihash_t *spurge; /* all streams done, ready for destroy */
struct h2_iqueue *q; /* all stream ids that need to be started */
- struct h2_ihash_t *sready; /* all streams ready for output */
+ struct h2_iqueue *readyq; /* all stream ids ready for output */
struct h2_ihash_t *tasks; /* all tasks started and not destroyed */
struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */
int max_stream_started; /* highest stream id that started processing */
int workers_busy; /* # of workers processing on this mplx */
int workers_limit; /* current # of workers limit, dynamic */
- int workers_def_limit; /* default # of workers limit */
int workers_max; /* max, hard limit # of workers in a process */
apr_time_t last_idle_block; /* last time, this mplx entered IDLE while
* streams were ready */
apr_read_type_e block,
int capacity,
request_rec **pr);
-void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn);
+void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn,
+ apr_status_t status);
#endif /* defined(__mod_h2__h2_mplx__) */
const char *type; /* name of the engine type */
apr_pool_t *pool; /* pool for engine specific allocations */
conn_rec *c; /* connection this engine is assigned to */
- h2_task *task; /* the task this engine is base on, running in */
+ h2_task *task; /* the task this engine is based on, running in */
h2_ngn_shed *shed;
unsigned int shutdown : 1; /* engine is being shut down */
if (!shed->aborted && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
h2_ngn_entry *entry;
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): exit engine %s (%s), "
"has still requests queued, shutdown=%d,"
"assigned=%ld, live=%ld, finished=%ld",
entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
entry = H2_NGN_ENTRY_NEXT(entry)) {
h2_task *task = entry->task;
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): engine %s has queued task %s, "
"frozen=%d, aborting",
shed->c->id, ngn->id, task->id, task->frozen);
}
}
if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): exit engine %s (%s), "
"assigned=%ld, live=%ld, finished=%ld",
shed->c->id, ngn->id, ngn->type,
if (stream) {
int touched = (stream->data_sent ||
stream_id <= session->last_stream_id);
- int complete = (stream->error_code == 0);
+ apr_status_t status = (stream->error_code == 0)? APR_SUCCESS : APR_EINVAL;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364)
"h2_proxy_sesssion(%s): stream(%d) closed "
"(touched=%d, error=%d)",
session->id, stream_id, touched, stream->error_code);
- if (!complete) {
+ if (status != APR_SUCCESS) {
stream->r->status = 500;
}
else if (!stream->data_received) {
h2_proxy_ihash_remove(session->streams, stream_id);
h2_proxy_iq_remove(session->suspended, stream_id);
if (session->done) {
- session->done(session, stream->r, complete, touched);
+ session->done(session, stream->r, status, touched);
}
}
}
}
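+/* send pending frames to the backend; return != 0 if a write was attempted without fatal error */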
+static int send_loop(h2_proxy_session *session)
+{
+ while (nghttp2_session_want_write(session->ngh2)) {
+ int rv = nghttp2_session_send(session->ngh2);
+ if (rv < 0 && nghttp2_is_fatal(rv)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_proxy_session(%s): write, rv=%d", session->id, rv);
+ dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
+ break;
+ }
+ return 1;
+ }
+ return 0;
+}
+
apr_status_t h2_proxy_session_process(h2_proxy_session *session)
{
apr_status_t status;
case H2_PROXYS_ST_BUSY:
case H2_PROXYS_ST_LOCAL_SHUTDOWN:
case H2_PROXYS_ST_REMOTE_SHUTDOWN:
- while (nghttp2_session_want_write(session->ngh2)) {
- int rv = nghttp2_session_send(session->ngh2);
- if (rv < 0 && nghttp2_is_fatal(rv)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_proxy_session(%s): write, rv=%d", session->id, rv);
- dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
- break;
- }
- have_written = 1;
- }
+ have_written = send_loop(session);
if (nghttp2_session_want_read(session->ngh2)) {
status = h2_proxy_session_read(session, 0, 0);
h2_proxy_request_done *done;
} cleanup_iter_ctx;
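+/* stream iterator: reset (RST_STREAM) a still-ongoing stream against the backend */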
+static int cancel_iter(void *udata, void *val)
+{
+ cleanup_iter_ctx *ctx = udata;
+ h2_proxy_stream *stream = val;
+ nghttp2_submit_rst_stream(ctx->session->ngh2, NGHTTP2_FLAG_NONE,
+ stream->id, 0);
+ return 1;
+}
+
+void h2_proxy_session_cancel_all(h2_proxy_session *session)
+{
+ if (!h2_proxy_ihash_empty(session->streams)) {
+ cleanup_iter_ctx ctx;
+ ctx.session = session;
+ ctx.done = session->done;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03366)
+ "h2_proxy_session(%s): cancel %d streams",
+ session->id, (int)h2_proxy_ihash_count(session->streams));
+ h2_proxy_ihash_iter(session->streams, cancel_iter, &ctx);
+ session_shutdown(session, 0, NULL);
+ }
+}
+
static int done_iter(void *udata, void *val)
{
cleanup_iter_ctx *ctx = udata;
h2_proxy_stream *stream = val;
int touched = (stream->data_sent ||
stream->id <= ctx->session->last_stream_id);
- ctx->done(ctx->session, stream->r, 0, touched);
+ ctx->done(ctx->session, stream->r, APR_ECONNABORTED, touched);
return 1;
}
typedef struct h2_proxy_session h2_proxy_session;
typedef void h2_proxy_request_done(h2_proxy_session *s, request_rec *r,
- int complete, int touched);
+ apr_status_t status, int touched);
struct h2_proxy_session {
const char *id;
*/
apr_status_t h2_proxy_session_process(h2_proxy_session *s);
+void h2_proxy_session_cancel_all(h2_proxy_session *s);
+
void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done);
void h2_proxy_session_update_window(h2_proxy_session *s,
}
}
- s = apr_table_get(req->headers, "Expect");
- if (s && s[0]) {
- if (ap_cstr_casecmp(s, "100-continue") == 0) {
- req->expect_100 = 1;
- }
- else {
- req->expect_failed = 1;
- }
- }
-
return APR_SUCCESS;
}
const char *rpath;
apr_pool_t *p;
request_rec *r;
+ const char *s;
apr_pool_create(&p, c->pool);
apr_pool_tag(p, "request");
/* we may have switched to another server */
r->per_dir_config = r->server->lookup_defaults;
- if (req->expect_100) {
- r->expecting_100 = 1;
- }
- else if (req->expect_failed) {
- r->status = HTTP_EXPECTATION_FAILED;
- ap_send_error_response(r, 0);
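+ /* evaluate the Expect header: either mark the request as expecting 100-continue or fail it with a 417 */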
+ s = apr_table_get(r->headers_in, "Expect");
+ if (s && s[0]) {
+ if (ap_cstr_casecmp(s, "100-continue") == 0) {
+ r->expecting_100 = 1;
+ }
+ else {
+ r->status = HTTP_EXPECTATION_FAILED;
+ ap_send_error_response(r, 0);
+ }
}
/*
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): EOS bucket cleanup -> done",
session->id, stream->id);
- h2_ihash_remove(session->streams, stream->id);
h2_mplx_stream_done(session->mplx, stream);
dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
h2_stream *candidate;
} stream_sel_ctx;
-static int find_cleanup_stream(void *udata, void *sdata)
+static int find_cleanup_stream(h2_stream *stream, void *ictx)
{
- stream_sel_ctx *ctx = udata;
- h2_stream *stream = sdata;
+ stream_sel_ctx *ctx = ictx;
if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
if (!ctx->session->local.accepting
&& stream->id > ctx->session->local.accepted_max) {
ctx.session = session;
ctx.candidate = NULL;
while (1) {
- h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
+ h2_mplx_stream_do(session->mplx, find_cleanup_stream, &ctx);
if (ctx.candidate) {
h2_session_stream_done(session, ctx.candidate);
ctx.candidate = NULL;
stream = h2_stream_open(stream_id, stream_pool, session,
initiated_on);
nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
- h2_ihash_add(session->streams, stream);
if (req) {
h2_stream_set_request(stream, req);
{
ap_assert(session);
- h2_ihash_clear(session->streams);
if (session->mplx) {
h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
h2_mplx_release_and_join(session->mplx, session->iowait);
return NULL;
}
- session->streams = h2_ihash_create(session->pool,
- offsetof(h2_stream, id));
session->mplx = h2_mplx_create(c, session->pool, session->config,
session->s->timeout, workers);
struct h2_workers *workers; /* for executing stream tasks */
struct h2_filter_cin *cin; /* connection input filter context */
h2_conn_io io; /* io on httpd conn filters */
- struct h2_ihash_t *streams; /* streams handled by this session */
struct nghttp2_session *ngh2; /* the nghttp2 session (internal use) */
h2_session_state state; /* state session is in */
return h2_mplx_out_open(task->mplx, task->stream_id, task->output.beam);
}
-static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb)
+static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb, int block)
{
apr_off_t written, left;
apr_status_t status;
H2_TASK_OUT_LOG(APLOG_TRACE2, task, bb, "h2_task send_out");
/* engines send unblocking */
status = h2_beam_send(task->output.beam, bb,
- task->assigned? APR_NONBLOCK_READ
- : APR_BLOCK_READ);
+ block? APR_BLOCK_READ : APR_NONBLOCK_READ);
if (APR_STATUS_IS_EAGAIN(status)) {
apr_brigade_length(bb, 0, &left);
written -= left;
{
apr_bucket *b;
apr_status_t status = APR_SUCCESS;
- int flush = 0;
-
- if (APR_BRIGADE_EMPTY(bb)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
- "h2_slave_out(%s): empty write", task->id);
- return APR_SUCCESS;
- }
+ int flush = 0, blocking;
if (task->frozen) {
h2_util_bb_log(task->c, task->stream_id, APLOG_TRACE2,
}
return APR_SUCCESS;
}
-
- /* Attempt to write saved brigade first */
- if (task->output.bb && !APR_BRIGADE_EMPTY(task->output.bb)) {
- status = send_out(task, task->output.bb);
- if (status != APR_SUCCESS) {
- return status;
- }
- }
-
- /* If there is nothing saved (anymore), try to write the brigade passed */
- if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb))
- && !APR_BRIGADE_EMPTY(bb)) {
- /* check if we have a flush before the end-of-request */
- if (!task->output.opened) {
- for (b = APR_BRIGADE_FIRST(bb);
- b != APR_BRIGADE_SENTINEL(bb);
- b = APR_BUCKET_NEXT(b)) {
- if (AP_BUCKET_IS_EOR(b)) {
- break;
- }
- else if (APR_BUCKET_IS_FLUSH(b)) {
- flush = 1;
- }
- }
- }
- status = send_out(task, bb);
- if (status != APR_SUCCESS) {
- return status;
+ /* we send blocking once we have opened the output, so someone is there
+ * reading it, *and* the task is not assigned to an h2_req_engine */
+ blocking = (!task->assigned && task->output.opened);
+ if (!task->output.opened) {
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb);
+ b = APR_BUCKET_NEXT(b)) {
+ if (APR_BUCKET_IS_FLUSH(b)) {
+ flush = 1;
+ break;
+ }
}
}
- /* If the passed brigade is not empty, save it before return */
- if (!APR_BRIGADE_EMPTY(bb)) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03405)
- "h2_slave_out(%s): could not write all, saving brigade",
- task->id);
- if (!task->output.bb) {
- task->output.bb = apr_brigade_create(task->pool,
- task->c->bucket_alloc);
+ if (task->output.bb && !APR_BRIGADE_EMPTY(task->output.bb)) {
+ /* still have data buffered from previous attempt.
+ * setaside and append new data and try to pass the complete data */
+ if (!APR_BRIGADE_EMPTY(bb)) {
+ status = ap_save_brigade(f, &task->output.bb, &bb, task->pool);
}
- status = ap_save_brigade(f, &task->output.bb, &bb, task->pool);
- if (status != APR_SUCCESS) {
- return status;
+ if (status == APR_SUCCESS) {
+ status = send_out(task, task->output.bb, blocking);
+ }
+ }
+ else {
+ /* no data buffered here, try to pass the brigade directly */
+ status = send_out(task, bb, blocking);
+ if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
+ /* could not write all, buffer the rest */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03405)
+ "h2_slave_out(%s): saving brigade",
+ task->id);
+ status = ap_save_brigade(f, &task->output.bb, &bb, task->pool);
+ flush = 1;
}
}
- if (!task->output.opened
- && (flush || h2_beam_get_mem_used(task->output.beam) > (32*1024))) {
- /* if we have enough buffered or we got a flush bucket, open
- * the response now. */
+ if (status == APR_SUCCESS && !task->output.opened && flush) {
+ /* got a flush or could not write all, time to tell someone to read */
status = open_output(task);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, task->c,
/* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not
* to support it. Seems to work. */
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c,
- APLOGNO(02942)
+ APLOGNO(03472)
"h2_slave_in(%s), unsupported READ mode %d",
task->id, mode);
status = APR_ENOTIMPL;
/* There are cases where we need to parse a serialized http/1.1
* response. One example is a 100-continue answer in serialized mode
* or via a mod_proxy setup */
- while (task->output.parse_response) {
+ while (!task->output.sent_response) {
status = h2_from_h1_parse_response(task, f, bb);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
"h2_task(%s): parsed response", task->id);
******************************************************************************/
int h2_task_can_redo(h2_task *task) {
- if (task->input.beam && h2_beam_was_received(task->input.beam)) {
+ if (h2_beam_was_received(task->input.beam)) {
/* cannot repeat that. */
return 0;
}
void h2_task_rst(h2_task *task, int error)
{
task->rst_error = error;
- if (task->input.beam) {
- h2_beam_abort(task->input.beam);
- }
- if (!task->worker_done && task->output.beam) {
+ h2_beam_abort(task->input.beam);
+ if (!task->worker_done) {
h2_beam_abort(task->output.beam);
}
if (task->c) {
task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc);
if (task->request->serialize) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
- "h2_task(%s): serialize request %s %s, expect-100=%d",
- task->id, task->request->method, task->request->path,
- task->request->expect_100);
+ "h2_task(%s): serialize request %s %s",
+ task->id, task->request->method, task->request->path);
apr_brigade_printf(task->input.bb, NULL,
NULL, "%s %s HTTP/1.1\r\n",
task->request->method, task->request->path);
apr_table_do(input_ser_header, task, task->request->headers, NULL);
apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
- if (task->request->expect_100) {
- /* we are unable to suppress the serialization of the
- * intermediate response and need to parse it */
- task->output.parse_response = 1;
- }
- }
-
- if (task->request->expect_100) {
- task->output.parse_response = 1;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
unsigned int opened : 1;
unsigned int sent_response : 1;
unsigned int copy_files : 1;
- unsigned int parse_response : 1;
struct h2_response_parser *rparser;
apr_bucket_brigade *bb;
} output;
{
int i;
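+ /* do not add the same id twice */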
+ if (h2_iq_contains(q, sid)) {
+ return;
+ }
if (q->nelts >= q->nalloc) {
iq_grow(q, q->nalloc * 2);
}
-
i = (q->head + q->nelts) % q->nalloc;
q->elts[i] = sid;
++q->nelts;
}
}
+void h2_iq_append(h2_iqueue *q, int sid)
+{
+ h2_iq_add(q, sid, NULL, NULL);
+}
+
int h2_iq_remove(h2_iqueue *q, int sid)
{
int i;
return sid;
}
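+/* shift up to max ids from the head of the queue into pint, return the number shifted */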
+size_t h2_iq_mshift(h2_iqueue *q, int *pint, size_t max)
+{
+ int i;
+ for (i = 0; i < max; ++i) {
+ pint[i] = h2_iq_shift(q);
+ if (pint[i] == 0) {
+ break;
+ }
+ }
+ return i;
+}
+
static void iq_grow(h2_iqueue *q, int nlen)
{
if (nlen > q->nalloc) {
return i;
}
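+/* return != 0 iff sid is already present in the queue */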
+int h2_iq_contains(h2_iqueue *q, int sid)
+{
+ int i;
+ for (i = 0; i < q->nelts; ++i) {
+ if (sid == q->elts[(q->head + i) % q->nalloc]) {
+ return 1;
+ }
+ }
+ return 0;
+}
+
/*******************************************************************************
* h2_util for apt_table_t
******************************************************************************/
/**
* Add a stream id to the queue.
*
- * @param q the queue to append the task to
+ * @param q the queue to append the id to
* @param sid the stream id to add
* @param cmp the comparator for sorting
* @param ctx user data for comparator
*/
void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
+/**
+ * Append the id to the queue if not already present.
+ *
+ * @param q the queue to append the id to
+ * @param sid the id to append
+ */
+void h2_iq_append(h2_iqueue *q, int sid);
+
/**
* Remove the stream id from the queue. Return != 0 iff task
* was found in queue.
void h2_iq_sort(h2_iqueue *q, h2_iq_cmp *cmp, void *ctx);
/**
- * Get the first stream id from the queue or NULL if the queue is empty.
- * The task will be removed.
+ * Get the first id from the queue or 0 if the queue is empty.
+ * The returned id is removed from the queue.
*
- * @param q the queue to get the first task from
- * @return the first stream id of the queue, 0 if empty
+ * @param q the queue to get the first id from
+ * @return the first id of the queue, 0 if empty
*/
int h2_iq_shift(h2_iqueue *q);
+/**
+ * Shift up to max ids from the head of the queue. All shifted ids are removed.
+ *
+ * @param q the queue to take the ids from
+ * @param pint the int array to receive the values
+ * @param max the maximum number of ids to shift
+ * @return the actual number of ids shifted
+ */
+size_t h2_iq_mshift(h2_iqueue *q, int *pint, size_t max);
+
+/**
+ * Determine whether an id is already contained in the queue.
+ *
+ * @param q the queue
+ * @param sid the integer id to check for
+ * @return != 0 iff sid is already in the queue
+ */
+int h2_iq_contains(h2_iqueue *q, int sid);
+
/*******************************************************************************
* common helpers
******************************************************************************/
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.7.7"
+#define MOD_HTTP2_VERSION "1.7.8"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010707
+#define MOD_HTTP2_VERSION_NUM 0x010708
#endif /* mod_h2_h2_version_h */
return h2_mplx_req_engine_pull(ngn, block, capacity, pr);
}
-static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
+static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
+ apr_status_t status)
{
- h2_mplx_req_engine_done(ngn, r_conn);
+ h2_mplx_req_engine_done(ngn, r_conn, status);
}
/* Runs once per created child process. Perform any process
request_rec **pr));
APR_DECLARE_OPTIONAL_FN(void,
http2_req_engine_done, (h2_req_engine *engine,
- conn_rec *rconn));
+ conn_rec *rconn,
+ apr_status_t status));
#endif
apr_read_type_e block,
int capacity,
request_rec **pr);
-static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
+static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn,
+ apr_status_t status);
typedef struct h2_proxy_ctx {
conn_rec *owner;
}
static void request_done(h2_proxy_session *session, request_rec *r,
- int complete, int touched)
+ apr_status_t status, int touched)
{
h2_proxy_ctx *ctx = session->user_data;
const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE);
- if (!complete) {
+ if (status != APR_SUCCESS) {
if (!touched) {
/* untouched request, need rescheduling */
if (req_engine_push && is_h2 && is_h2(ctx->owner)) {
else {
const char *uri;
uri = apr_uri_unparse(r->pool, &r->parsed_uri, 0);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
APLOGNO(03471) "h2_proxy_session(%s): request %s -> %s "
"not complete, was touched",
ctx->engine_id, task_id, uri);
}
if (r == ctx->rbase) {
- ctx->r_status = complete? APR_SUCCESS : HTTP_GATEWAY_TIME_OUT;
+ ctx->r_status = (status == APR_SUCCESS)? APR_SUCCESS : HTTP_SERVICE_UNAVAILABLE;
}
if (req_engine_done && ctx->engine) {
- if (complete) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
- APLOGNO(03370)
- "h2_proxy_session(%s): finished request %s",
- ctx->engine_id, task_id);
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, r->connection,
- APLOGNO(03371)
- "h2_proxy_session(%s): failed request %s",
- ctx->engine_id, task_id);
- }
- req_engine_done(ctx->engine, r->connection);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
+ APLOGNO(03370)
+ "h2_proxy_session(%s): finished request %s",
+ ctx->engine_id, task_id);
+ req_engine_done(ctx->engine, r->connection, status);
}
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, s2, ctx->owner,
APLOGNO(03374) "eng(%s): pull request",
ctx->engine_id);
- status = s2;
+ /* give notice that we're leaving and cancel all ongoing
+ * streams. */
+ next_request(ctx, 1);
+ h2_proxy_session_cancel_all(ctx->session);
+ h2_proxy_session_process(ctx->session);
+ status = ctx->r_status = APR_SUCCESS;
break;
}
if (!ctx->next && h2_proxy_ihash_empty(ctx->session->streams)) {