Changes with Apache 2.4.19
+ *) mod_http2: give control to async mpm for keepalive timeouts only when
+ no streams are open and even if only after 1 sec delay. Under load, event
+ mpm discards connections otherwise too quickly. [Stefan Eissing]
+
*) mod_ssl: Don't lose track of the SSL context if the ssl_run_pre_handshake()
hook returns an error. [Graham Leggett]
modules/http2/h2_mplx.c modules/http2/h2_push.c
modules/http2/h2_request.c modules/http2/h2_response.c
modules/http2/h2_session.c modules/http2/h2_stream.c
- modules/http2/h2_switch.c
+ modules/http2/h2_switch.c modules/http2/h2_ngn_shed.c
modules/http2/h2_task.c modules/http2/h2_task_input.c
modules/http2/h2_task_output.c modules/http2/h2_int_queue.c
modules/http2/h2_util.c modules/http2/h2_worker.c
# If there is an NLM target, put it here
#
TARGET_nlm = \
- $(OBJDIR)/mod_http2.nlm \
$(OBJDIR)/mod_http2.nlm \
$(EOLIST)
$(OBJDIR)/h2_io.o \
$(OBJDIR)/h2_io_set.o \
$(OBJDIR)/h2_mplx.o \
+ $(OBJDIR)/h2_ngn_shed.o \
$(OBJDIR)/h2_push.o \
$(OBJDIR)/h2_request.o \
$(OBJDIR)/h2_response.o \
# Any symbols exported to here
#
FILES_nlm_exports = \
- http2_module \
+ @$(OBJDIR)/mod_http2.imp \
$(EOLIST)
#
libs :: $(OBJDIR) $(NGH2SRC)/lib/config.h $(TARGET_lib)
-nlms :: libs $(TARGET_nlm)
+nlms :: libs $(OBJDIR)/mod_http2.imp $(TARGET_nlm)
#
# Updated this target to create necessary directories and copy files to the
#
vpath %.c $(NGH2SRC)/lib
-$(NGH2SRC)/lib/config.h : NWGNUmakefile
+$(NGH2SRC)/lib/config.h : NWGNUmod_http2
+ @-$(RM) $@
@echo $(DL)GEN $@$(DL)
@echo $(DL)/* For NetWare target.$(DL) > $@
@echo $(DL)** Do not edit - created by Make!$(DL) >> $@
@echo $(DL)#endif /* NGH2_CONFIG_H */$(DL) >> $@
#
+# Exports from mod_http2 for mod_proxy_http2
+$(OBJDIR)/mod_http2.imp : NWGNUmod_http2
+ @-$(RM) $@
+ @echo $(DL)GEN $@$(DL)
+ @echo $(DL) (HTTP2)$(DL) > $@
+ @echo $(DL) http2_module,$(DL) >> $@
+ @echo $(DL) h2_ihash_add,$(DL) >> $@
+ @echo $(DL) h2_ihash_clear,$(DL) >> $@
+ @echo $(DL) h2_ihash_count,$(DL) >> $@
+ @echo $(DL) h2_ihash_create,$(DL) >> $@
+ @echo $(DL) h2_ihash_is_empty,$(DL) >> $@
+ @echo $(DL) h2_ihash_iter,$(DL) >> $@
+ @echo $(DL) h2_ihash_remove,$(DL) >> $@
+ @echo $(DL) h2_iq_add,$(DL) >> $@
+ @echo $(DL) h2_iq_create,$(DL) >> $@
+ @echo $(DL) h2_iq_remove,$(DL) >> $@
+ @echo $(DL) h2_log2,$(DL) >> $@
+ @echo $(DL) h2_proxy_res_ignore_header,$(DL) >> $@
+ @echo $(DL) h2_request_create,$(DL) >> $@
+ @echo $(DL) h2_request_make,$(DL) >> $@
+ @echo $(DL) h2_util_camel_case_header,$(DL) >> $@
+ @echo $(DL) h2_util_frame_print,$(DL) >> $@
+ @echo $(DL) h2_util_ngheader_make_req,$(DL) >> $@
+ @echo $(DL) nghttp2_is_fatal,$(DL) >> $@
+ @echo $(DL) nghttp2_option_del,$(DL) >> $@
+ @echo $(DL) nghttp2_option_new,$(DL) >> $@
+ @echo $(DL) nghttp2_option_set_no_auto_window_update,$(DL) >> $@
+ @echo $(DL) nghttp2_option_set_peer_max_concurrent_streams,$(DL) >> $@
+ @echo $(DL) nghttp2_session_callbacks_del,$(DL) >> $@
+ @echo $(DL) nghttp2_session_callbacks_new,$(DL) >> $@
+ @echo $(DL) nghttp2_session_callbacks_set_before_frame_send_callback,$(DL) >> $@
+ @echo $(DL) nghttp2_session_callbacks_set_on_data_chunk_recv_callback,$(DL) >> $@
+ @echo $(DL) nghttp2_session_callbacks_set_on_frame_recv_callback,$(DL) >> $@
+ @echo $(DL) nghttp2_session_callbacks_set_on_header_callback,$(DL) >> $@
+ @echo $(DL) nghttp2_session_callbacks_set_on_stream_close_callback,$(DL) >> $@
+ @echo $(DL) nghttp2_session_callbacks_set_send_callback,$(DL) >> $@
+ @echo $(DL) nghttp2_session_client_new2,$(DL) >> $@
+ @echo $(DL) nghttp2_session_consume,$(DL) >> $@
+ @echo $(DL) nghttp2_session_del,$(DL) >> $@
+ @echo $(DL) nghttp2_session_get_remote_settings,$(DL) >> $@
+ @echo $(DL) nghttp2_session_get_stream_user_data,$(DL) >> $@
+ @echo $(DL) nghttp2_session_mem_recv,$(DL) >> $@
+ @echo $(DL) nghttp2_session_resume_data,$(DL) >> $@
+ @echo $(DL) nghttp2_session_send,$(DL) >> $@
+ @echo $(DL) nghttp2_session_want_read,$(DL) >> $@
+ @echo $(DL) nghttp2_session_want_write,$(DL) >> $@
+ @echo $(DL) nghttp2_strerror,$(DL) >> $@
+ @echo $(DL) nghttp2_submit_goaway,$(DL) >> $@
+ @echo $(DL) nghttp2_submit_request,$(DL) >> $@
+ @echo $(DL) nghttp2_submit_rst_stream,$(DL) >> $@
+ @echo $(DL) nghttp2_submit_settings,$(DL) >> $@
+ @echo $(DL) nghttp2_submit_window_update,$(DL) >> $@
+ @echo $(DL) nghttp2_version$(DL) >> $@
+
# Include the 'tail' makefile that has targets that depend on variables defined
# in this makefile
#
h2_io.lo dnl
h2_io_set.lo dnl
h2_mplx.lo dnl
+h2_ngn_shed.lo dnl
h2_push.lo dnl
h2_request.lo dnl
h2_response.lo dnl
])
# Ensure that other modules can pick up mod_http2.h
-# APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current])
+# icing: hold back for now until it is more stable
+#APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current])
+
+
dnl # end of module specific part
APACHE_MODPATH_FINISH
};
+/* Note key to attach connection task id to conn_rec/request_rec instances */
+
+#define H2_TASK_ID_NOTE "http2-task-id"
+
+
#endif /* defined(__mod_h2__h2__) */
return status;
}
-/* This is an internal mpm event.c struct which is disguised
- * as a conn_state_t so that mpm_event can have special connection
- * state information without changing the struct seen on the outside.
- *
- * For our task connections we need to create a new beast of this type
- * and fill it with enough meaningful things that mpm_event reads and
- * starts processing out task request.
- */
-typedef struct event_conn_state_t event_conn_state_t;
-struct event_conn_state_t {
- /** APR_RING of expiration timeouts */
- APR_RING_ENTRY(event_conn_state_t) timeout_list;
- /** the expiration time of the next keepalive timeout */
- apr_time_t expiration_time;
- /** connection record this struct refers to */
- conn_rec *c;
- /** request record (if any) this struct refers to */
- request_rec *r;
- /** is the current conn_rec suspended? (disassociated with
- * a particular MPM thread; for suspend_/resume_connection
- * hooks)
- */
- int suspended;
- /** memory pool to allocate from */
- apr_pool_t *p;
- /** bucket allocator */
- apr_bucket_alloc_t *bucket_alloc;
- /** poll file descriptor information */
- apr_pollfd_t pfd;
- /** public parts of the connection state */
- conn_state_t pub;
-};
-APR_RING_HEAD(timeout_head_t, event_conn_state_t);
-
-static void fix_event_conn(conn_rec *c, conn_rec *master)
-{
- event_conn_state_t *master_cs = ap_get_module_config(master->conn_config,
- h2_conn_mpm_module());
- event_conn_state_t *cs = apr_pcalloc(c->pool, sizeof(event_conn_state_t));
- cs->bucket_alloc = apr_bucket_alloc_create(c->pool);
-
- ap_set_module_config(c->conn_config, h2_conn_mpm_module(), cs);
-
- cs->c = c;
- cs->r = NULL;
- cs->p = master_cs->p;
- cs->pfd = master_cs->pfd;
- cs->pub = master_cs->pub;
- cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
-
- c->cs = &(cs->pub);
-}
conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
apr_allocator_t *allocator)
ap_set_module_config(c->conn_config, h2_conn_mpm_module(), cfg);
}
- switch (h2_conn_mpm_type()) {
- case H2_MPM_EVENT:
- fix_event_conn(c, master);
- break;
- default:
- break;
- }
-
return c;
}
void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator)
{
apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
+ "h2_slave_conn(%ld): destroy (task=%s)", slave->id,
+ apr_table_get(slave->notes, H2_TASK_ID_NOTE));
apr_pool_destroy(slave->pool);
if (pallocator) {
*pallocator = allocator;
return h2_conn_io_flush_int(io, flush, 0);
}
+apr_status_t h2_conn_io_flush(h2_conn_io *io)
+{
+ /* make sure we always write a flush, even if our buffers are empty.
+ * We want to flush not only our buffers, but alse ones further down
+ * the connection filters. */
+ apr_bucket *b = apr_bucket_flush_create(io->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+ return h2_conn_io_flush_int(io, 0, 0);
+}
+
apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
{
apr_off_t len = 0;
* @param flush if a flush bucket should be appended to any output
*/
apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush);
+apr_status_t h2_conn_io_flush(h2_conn_io *io);
/**
* Check the amount of buffered output and pass it on if enough has accumulated.
status = h2_util_bb_readx(io->bbout, cb, ctx, plen, peos);
if (status == APR_SUCCESS) {
io->eos_out_read = *peos;
+ io->output_consumed += *plen;
}
}
apr_status_t h2_io_out_read_to(h2_io *io, apr_bucket_brigade *bb,
apr_off_t *plen, int *peos)
{
+ apr_status_t status;
+
if (io->rst_error) {
return APR_ECONNABORTED;
}
}
io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen);
- return h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to");
+ status = h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to");
+ io->output_consumed += *plen;
+ return status;
}
static void process_trailers(h2_io *io, apr_table_t *trailers)
apr_time_t started_at; /* when processing started */
apr_time_t done_at; /* when processing was done */
apr_size_t input_consumed; /* how many bytes have been read */
+ apr_size_t output_consumed; /* how many bytes have been written out */
int files_handles_owned;
};
#include "h2_io_set.h"
#include "h2_response.h"
#include "h2_mplx.h"
+#include "h2_ngn_shed.h"
#include "h2_request.h"
#include "h2_stream.h"
#include "h2_task.h"
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): destroy, ios=%d",
m->id, (int)h2_io_set_size(m->stream_ios));
- m->aborted = 1;
-
check_tx_free(m);
-
if (m->pool) {
apr_pool_destroy(m->pool);
}
return NULL;
}
- status = apr_thread_cond_create(&m->task_done, m->pool);
+ status = apr_thread_cond_create(&m->req_added, m->pool);
if (status != APR_SUCCESS) {
h2_mplx_destroy(m);
return NULL;
}
- m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
+ m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
+ m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
+ m->q = h2_iq_create(m->pool, m->max_streams);
m->stream_ios = h2_io_set_create(m->pool);
m->ready_ios = h2_io_set_create(m->pool);
- m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->stream_timeout = stream_timeout;
m->workers = workers;
m->workers_max = workers->max_workers;
m->tx_handles_reserved = 0;
m->tx_chunk_size = 4;
+
+ m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams,
+ m->stream_max_mem);
+ h2_ngn_shed_set_ctx(m->ngn_shed , m);
}
return m;
}
h2_mplx_set_consumed_cb(m, NULL, NULL);
h2_iq_clear(m->q);
- apr_thread_cond_broadcast(m->task_done);
+ apr_thread_cond_broadcast(m->req_added);
while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
h2_io_set_iter(m->stream_ios, stream_print, m);
}
}
- m->aborted = 1;
- apr_thread_cond_broadcast(m->task_done);
+ h2_mplx_abort(m);
+ apr_thread_cond_broadcast(m->req_added);
}
}
+
+ if (!h2_io_set_is_empty(m->stream_ios)) {
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
+ "h2_mplx(%ld): release_join, %d streams still open",
+ m->id, (int)h2_io_set_size(m->stream_ios));
+ }
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join -> destroy", m->id);
leave_mutex(m, acquired);
void h2_mplx_abort(h2_mplx *m)
{
- apr_status_t status;
int acquired;
AP_DEBUG_ASSERT(m);
- if (!m->aborted) {
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- m->aborted = 1;
- leave_mutex(m, acquired);
- }
+ if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
+ m->aborted = 1;
+ h2_ngn_shed_abort(m->ngn_shed);
+ leave_mutex(m, acquired);
}
}
}
static apr_status_t out_write(h2_mplx *m, h2_io *io,
- ap_filter_t* f, apr_bucket_brigade *bb,
+ ap_filter_t* f, int blocking,
+ apr_bucket_brigade *bb,
apr_table_t *trailers,
struct apr_thread_cond_t *iowait)
{
&& iowait
&& (m->stream_max_mem <= h2_io_out_length(io))
&& !is_aborted(m, &status)) {
+ if (!blocking) {
+ return APR_INCOMPLETE;
+ }
trailers = NULL;
if (f) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
check_tx_reservation(m);
}
if (bb) {
- status = out_write(m, io, f, bb, response->trailers, iowait);
+ status = out_write(m, io, f, 0, bb, response->trailers, iowait);
+ if (status == APR_INCOMPLETE) {
+ /* write will have transferred as much data as possible.
+ caller has to deal with non-empty brigade */
+ status = APR_SUCCESS;
+ }
}
have_out_data_for(m, stream_id);
}
}
apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
- ap_filter_t* f, apr_bucket_brigade *bb,
+ ap_filter_t* f, int blocking,
+ apr_bucket_brigade *bb,
apr_table_t *trailers,
struct apr_thread_cond_t *iowait)
{
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
- status = out_write(m, io, f, bb, trailers, iowait);
+ status = out_write(m, io, f, blocking, bb, trailers, iowait);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
"h2_mplx(%ld-%d): write with trailers=%s",
m->id, io->id, trailers? "yes" : "no");
h2_response *r = h2_response_die(stream_id, APR_EGENERAL,
io->request, m->pool);
status = out_open(m, stream_id, r, NULL, NULL, NULL);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
"h2_mplx(%ld-%d): close, no response, no rst",
m->id, io->id);
}
conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
m->spare_allocator = NULL;
task = h2_task_create(m->id, io->request, slave, m);
+ apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
io->worker_started = 1;
io->started_at = apr_time_now();
if (sid > m->max_stream_started) {
{
if (task) {
if (task->frozen) {
- /* this task was handed over to an engine for processing */
+ /* this task was handed over to an engine for processing
+ * and the original worker has finished. That means the
+ * engine may start processing now. */
h2_task_thaw(task);
- /* TODO: can we signal an engine that it can now start on this? */
+ /* we do not want the task to block on writing response
+ * bodies into the mplx. */
+ /* FIXME: this implementation is incomplete. */
+ h2_task_set_io_blocking(task, 0);
+ apr_thread_cond_broadcast(m->req_added);
}
else {
h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
/* clean our references and report request as done. Signal
* that we want another unless we have been aborted */
* long as it has requests to handle. Might no be fair to
* other mplx's. Perhaps leave after n requests? */
h2_mplx_out_close(m, task->stream_id, NULL);
+
+ if (task->engine) {
+ if (!h2_req_engine_is_shutdown(task->engine)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): task(%s) has not-shutdown "
+ "engine(%s)", m->id, task->id,
+ h2_req_engine_get_id(task->engine));
+ }
+ h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+ }
+
if (m->spare_allocator) {
apr_allocator_destroy(m->spare_allocator);
m->spare_allocator = NULL;
}
+
h2_slave_destroy(task->c, &m->spare_allocator);
task = NULL;
+
if (io) {
apr_time_t now = apr_time_now();
if (!io->orphaned && m->redo_ios
/* hang around until the stream deregisteres */
}
}
- apr_thread_cond_broadcast(m->task_done);
}
}
}
* HTTP/2 request engines
******************************************************************************/
-typedef struct h2_req_entry h2_req_entry;
-struct h2_req_entry {
- APR_RING_ENTRY(h2_req_entry) link;
- request_rec *r;
-};
-
-#define H2_REQ_ENTRY_NEXT(e) APR_RING_NEXT((e), link)
-#define H2_REQ_ENTRY_PREV(e) APR_RING_PREV((e), link)
-#define H2_REQ_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link)
-
-typedef struct h2_req_engine_i h2_req_engine_i;
-struct h2_req_engine_i {
- h2_req_engine pub;
- conn_rec *c; /* connection this engine is assigned to */
- h2_mplx *m;
- unsigned int shutdown : 1; /* engine is being shut down */
- apr_thread_cond_t *io; /* condition var for waiting on data */
- APR_RING_HEAD(h2_req_entries, h2_req_entry) entries;
- apr_size_t no_assigned; /* # of assigned requests */
- apr_size_t no_live; /* # of live */
- apr_size_t no_finished; /* # of finished */
-};
-
-#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_req_entry, link)
-#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_req_entry, link)
-#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b)
-#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b)
-
-#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \
-h2_req_entry *ap__b = (e); \
-APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link); \
-} while (0)
-
-#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \
-h2_req_entry *ap__b = (e); \
-APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link); \
-} while (0)
-
-static apr_status_t h2_mplx_engine_schedule(h2_mplx *m,
- h2_req_engine_i *engine,
- request_rec *r)
-{
- h2_req_entry *entry = apr_pcalloc(r->pool, sizeof(*entry));
-
- APR_RING_ELEM_INIT(entry, link);
- entry->r = r;
- H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, entry);
- return APR_SUCCESS;
-}
-
-
-apr_status_t h2_mplx_engine_push(const char *engine_type,
- request_rec *r, h2_mplx_engine_init *einit)
+apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
+ request_rec *r, h2_req_engine_init *einit)
{
apr_status_t status;
h2_mplx *m;
return APR_ECONNABORTED;
}
m = task->mplx;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
status = APR_ECONNABORTED;
}
else {
- h2_req_engine_i *engine = (h2_req_engine_i*)m->engine;
-
- apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
- status = APR_EOF;
-
- if (task->ser_headers) {
- /* Max compatibility, deny processing of this */
- }
- else if (engine && !strcmp(engine->pub.type, engine_type)) {
- if (engine->shutdown
- || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) {
- ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
- "h2_mplx(%ld): engine shutdown or over %s",
- m->c->id, engine->pub.id);
- engine = NULL;
- }
- else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) {
- /* this task will be processed in another thread,
- * freeze any I/O for the time being. */
- h2_task_freeze(task, r);
- engine->no_assigned++;
- status = APR_SUCCESS;
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
- "h2_mplx(%ld): push request %s",
- m->c->id, r->the_request);
- }
- else {
- ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
- "h2_mplx(%ld): engine error adding req %s",
- m->c->id, engine->pub.id);
- engine = NULL;
- }
- }
-
- if (!engine && einit) {
- engine = apr_pcalloc(task->c->pool, sizeof(*engine));
- engine->pub.id = apr_psprintf(task->c->pool, "eng-%ld-%d",
- m->id, m->next_eng_id++);
- engine->pub.pool = task->c->pool;
- engine->pub.type = apr_pstrdup(task->c->pool, engine_type);
- engine->pub.window_bits = 30;
- engine->pub.req_window_bits = h2_log2(m->stream_max_mem);
- engine->c = r->connection;
- APR_RING_INIT(&engine->entries, h2_req_entry, link);
- engine->m = m;
- engine->io = task->io;
- engine->no_assigned = 1;
- engine->no_live = 1;
-
- status = einit(&engine->pub, r);
- ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
- "h2_mplx(%ld): init engine %s (%s)",
- m->c->id, engine->pub.id, engine->pub.type);
- if (status == APR_SUCCESS) {
- m->engine = &engine->pub;
- }
- }
+ status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type,
+ task, r, einit);
}
-
leave_mutex(m, acquired);
}
return status;
}
-static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine)
-{
- h2_req_entry *entry;
- h2_task *task;
-
- for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
- entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
- entry = H2_REQ_ENTRY_NEXT(entry)) {
- task = h2_ctx_rget_task(entry->r);
- AP_DEBUG_ASSERT(task);
- if (!task->frozen) {
- H2_REQ_ENTRY_REMOVE(entry);
- return entry;
- }
- }
- return NULL;
-}
-
-static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine,
- apr_read_type_e block, request_rec **pr)
+apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
+ apr_read_type_e block,
+ apr_uint32_t capacity,
+ request_rec **pr)
{
- h2_req_entry *entry;
-
- AP_DEBUG_ASSERT(m);
- AP_DEBUG_ASSERT(engine);
- while (1) {
- if (m->aborted) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): mplx abort while pulling requests %s",
- m->id, engine->pub.id);
- *pr = NULL;
- return APR_EOF;
- }
-
- if (!H2_REQ_ENTRIES_EMPTY(&engine->entries)
- && (entry = pop_non_frozen(engine))) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
- "h2_mplx(%ld): request %s pulled by engine %s",
- m->c->id, entry->r->the_request, engine->pub.id);
- engine->no_live++;
- entry->r->connection->current_thread = engine->c->current_thread;
- *pr = entry->r;
- return APR_SUCCESS;
- }
- else if (APR_NONBLOCK_READ == block) {
- *pr = NULL;
- return APR_EAGAIN;
- }
- else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
- engine->shutdown = 1;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): emtpy queue, shutdown engine %s",
- m->id, engine->pub.id);
- *pr = NULL;
- return APR_EOF;
- }
- apr_thread_cond_timedwait(m->task_done, m->lock,
- apr_time_from_msec(100));
- }
-}
-
-apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine,
- apr_read_type_e block, request_rec **pr)
-{
- h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
- h2_mplx *m = engine->m;
+ h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
+ h2_mplx *m = h2_ngn_shed_get_ctx(shed);
apr_status_t status;
int acquired;
*pr = NULL;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- status = engine_pull(m, engine, block, pr);
+ int want_shutdown = (block == APR_BLOCK_READ);
+ if (want_shutdown && !h2_iq_empty(m->q)) {
+ /* For a blocking read, check first if requests are to be
+ * had and, if not, wait a short while before doing the
+ * blocking, and if unsuccessful, terminating read.
+ */
+ status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+ if (APR_STATUS_IS_EAGAIN(status)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): start block engine pull", m->id);
+ apr_thread_cond_timedwait(m->req_added, m->lock,
+ apr_time_from_msec(20));
+ status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+ }
+ }
+ else {
+ status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
+ }
leave_mutex(m, acquired);
}
return status;
}
-static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task,
- int waslive, int aborted)
+void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
{
- int acquired;
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
- "h2_mplx(%ld): task %s %s by %s",
- m->id, task->id, aborted? "aborted":"done",
- engine->pub.id);
- h2_task_output_close(task->output);
- engine->no_finished++;
- if (waslive) engine->no_live--;
- engine->no_assigned--;
- if (task->c != engine->c) { /* do not release what the engine runs on */
+ h2_task *task = h2_ctx_cget_task(r_conn);
+
+ if (task) {
+ h2_mplx *m = task->mplx;
+ int acquired;
+
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- task_done(m, task);
+ h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
+ if (task->engine) {
+ /* cannot report that as done until engine returns */
+ }
+ else {
+ h2_task_output_close(task->output);
+ task_done(m, task);
+ }
leave_mutex(m, acquired);
}
}
}
-void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn)
-{
- h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
- h2_mplx *m = engine->m;
- h2_task *task;
- int acquired;
-
- task = h2_ctx_cget_task(r_conn);
- if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) {
- engine_done(m, engine, task, 1, 0);
- leave_mutex(m, acquired);
- }
-}
-
-void h2_mplx_engine_exit(h2_req_engine *pub_engine)
-{
- h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
- h2_mplx *m = engine->m;
- int acquired;
-
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- if (!m->aborted
- && !H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
- h2_req_entry *entry;
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- "h2_mplx(%ld): exit engine %s (%s), "
- "has still requests queued, shutdown=%d,"
- "assigned=%ld, live=%ld, finished=%ld",
- m->c->id, engine->pub.id, engine->pub.type,
- engine->shutdown,
- (long)engine->no_assigned, (long)engine->no_live,
- (long)engine->no_finished);
- for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
- entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
- entry = H2_REQ_ENTRY_NEXT(entry)) {
- request_rec *r = entry->r;
- h2_task *task = h2_ctx_rget_task(r);
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- "h2_mplx(%ld): engine %s has queued task %s, "
- "frozen=%d, aborting",
- m->c->id, engine->pub.id, task->id, task->frozen);
- engine_done(m, engine, task, 0, 1);
- }
- }
- if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- "h2_mplx(%ld): exit engine %s (%s), "
- "assigned=%ld, live=%ld, finished=%ld",
- m->c->id, engine->pub.id, engine->pub.type,
- (long)engine->no_assigned, (long)engine->no_live,
- (long)engine->no_finished);
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): exit engine %s (%s)",
- m->c->id, engine->pub.id, engine->pub.type);
- }
- if (m->engine == &engine->pub) {
- m->engine = NULL; /* TODO */
- }
- leave_mutex(m, acquired);
- }
-}
struct apr_thread_cond_t;
struct h2_workers;
struct h2_int_queue;
+struct h2_ngn_shed;
struct h2_req_engine;
#include <apr_queue.h>
struct h2_io_set *ready_ios;
struct h2_io_set *redo_ios;
- int max_stream_started; /* highest stream id that started processing */
- int workers_busy; /* # of workers processing on this mplx */
- int workers_limit; /* current # of workers limit, dynamic */
- int workers_def_limit; /* default # of workers limit */
- int workers_max; /* max, hard limit # of workers in a process */
- apr_time_t last_idle_block; /* last time, this mplx entered IDLE while
- * streams were ready */
- apr_time_t last_limit_change;/* last time, worker limit changed */
+ apr_uint32_t max_streams; /* max # of concurrent streams */
+ apr_uint32_t max_stream_started; /* highest stream id that started processing */
+ apr_uint32_t workers_busy; /* # of workers processing on this mplx */
+ apr_uint32_t workers_limit; /* current # of workers limit, dynamic */
+ apr_uint32_t workers_def_limit; /* default # of workers limit */
+ apr_uint32_t workers_max; /* max, hard limit # of workers in a process */
+ apr_time_t last_idle_block; /* last time, this mplx entered IDLE while
+ * streams were ready */
+ apr_time_t last_limit_change; /* last time, worker limit changed */
apr_interval_time_t limit_change_interval;
apr_thread_mutex_t *lock;
struct apr_thread_cond_t *added_output;
- struct apr_thread_cond_t *task_done;
+ struct apr_thread_cond_t *req_added;
struct apr_thread_cond_t *join_wait;
apr_size_t stream_max_mem;
h2_mplx_consumed_cb *input_consumed;
void *input_consumed_ctx;
-
- struct h2_req_engine *engine;
- /* TODO: signal for waiting tasks*/
- apr_queue_t *engine_queue;
- int next_eng_id;
+
+ struct h2_ngn_shed *ngn_shed;
};
* of bytes buffered reaches configured max.
* @param stream_id the stream identifier
* @param filter the apache filter context of the data
+ * @param blocking == 0 iff call should return with APR_INCOMPLETE if
+ * the full brigade cannot be written at once
* @param bb the bucket brigade to append
* @param trailers optional trailers for response, maybe NULL
* @param iowait a conditional used for block/signalling in h2_mplx
*/
apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id,
- ap_filter_t* filter, apr_bucket_brigade *bb,
+ ap_filter_t* filter,
+ int blocking,
+ apr_bucket_brigade *bb,
apr_table_t *trailers,
struct apr_thread_cond_t *iowait);
apr_status_t h2_mplx_idle(h2_mplx *m);
/*******************************************************************************
- * h2_mplx h2_req_engine handling.
+ * h2_req_engine handling
******************************************************************************/
-
-typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine,
- request_rec *r);
-
-apr_status_t h2_mplx_engine_push(const char *engine_type,
- request_rec *r, h2_mplx_engine_init *einit);
-
-apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine,
- apr_read_type_e block, request_rec **pr);
-
-void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn);
-
-void h2_mplx_engine_exit(struct h2_req_engine *engine);
+
+typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine,
+ const char *id,
+ const char *type,
+ apr_pool_t *pool,
+ apr_uint32_t req_buffer_size,
+ request_rec *r);
+
+apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
+ request_rec *r,
+ h2_mplx_req_engine_init *einit);
+apr_status_t h2_mplx_req_engine_pull(struct h2_req_engine *ngn,
+ apr_read_type_e block,
+ apr_uint32_t capacity,
+ request_rec **pr);
+void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn);
#endif /* defined(__mod_h2__h2_mplx__) */
--- /dev/null
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdlib.h>
+
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
+#include <apr_strings.h>
+#include <apr_time.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_log.h>
+
+#include "mod_http2.h"
+
+#include "h2_private.h"
+#include "h2_config.h"
+#include "h2_conn.h"
+#include "h2_ctx.h"
+#include "h2_h2.h"
+#include "h2_int_queue.h"
+#include "h2_response.h"
+#include "h2_request.h"
+#include "h2_task.h"
+#include "h2_task_output.h"
+#include "h2_util.h"
+#include "h2_ngn_shed.h"
+
+
+typedef struct h2_ngn_entry h2_ngn_entry;
+struct h2_ngn_entry {
+ APR_RING_ENTRY(h2_ngn_entry) link;
+ h2_task *task;
+ request_rec *r;
+};
+
+#define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link)
+#define H2_NGN_ENTRY_PREV(e) APR_RING_PREV((e), link)
+#define H2_NGN_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link)
+
+#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_ngn_entry, link)
+#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_ngn_entry, link)
+#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b)
+#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b)
+
+#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \
+h2_ngn_entry *ap__b = (e); \
+APR_RING_INSERT_HEAD((b), ap__b, h2_ngn_entry, link); \
+} while (0)
+
+#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \
+h2_ngn_entry *ap__b = (e); \
+APR_RING_INSERT_TAIL((b), ap__b, h2_ngn_entry, link); \
+} while (0)
+
+struct h2_req_engine {
+ const char *id; /* identifier */
+ const char *type; /* name of the engine type */
+ apr_pool_t *pool; /* pool for engine specific allocations */
+ conn_rec *c; /* connection this engine is assigned to */
+ h2_task *task; /* the task this engine is base on, running in */
+ h2_ngn_shed *shed;
+
+ unsigned int shutdown : 1; /* engine is being shut down */
+ unsigned int done : 1; /* engine has finished */
+
+ APR_RING_HEAD(h2_req_entries, h2_ngn_entry) entries;
+ apr_uint32_t capacity; /* maximum concurrent requests */
+ apr_uint32_t no_assigned; /* # of assigned requests */
+ apr_uint32_t no_live; /* # of live */
+ apr_uint32_t no_finished; /* # of finished */
+};
+
+const char *h2_req_engine_get_id(h2_req_engine *engine)
+{
+ return engine->id;
+}
+
+int h2_req_engine_is_shutdown(h2_req_engine *engine)
+{
+ return engine->shutdown;
+}
+
+h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
+ apr_uint32_t default_capacity,
+ apr_uint32_t req_buffer_size)
+{
+ h2_ngn_shed *shed;
+
+ shed = apr_pcalloc(pool, sizeof(*shed));
+ shed->c = c;
+ shed->pool = pool;
+ shed->default_capacity = default_capacity;
+ shed->req_buffer_size = req_buffer_size;
+ shed->ngns = apr_hash_make(pool);
+
+ return shed;
+}
+
+void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx)
+{
+ shed->user_ctx = user_ctx;
+}
+
+void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed)
+{
+ return shed->user_ctx;
+}
+
+h2_ngn_shed *h2_ngn_shed_get_shed(h2_req_engine *ngn)
+{
+ return ngn->shed;
+}
+
+void h2_ngn_shed_abort(h2_ngn_shed *shed)
+{
+ shed->aborted = 1;
+}
+
+static void ngn_add_req(h2_req_engine *ngn, h2_task *task, request_rec *r)
+{
+ h2_ngn_entry *entry = apr_pcalloc(task->c->pool, sizeof(*entry));
+ APR_RING_ELEM_INIT(entry, link);
+ entry->task = task;
+ entry->r = r;
+ H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
+}
+
+
+apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
+ h2_task *task, request_rec *r,
+ h2_req_engine_init *einit){
+ h2_req_engine *ngn;
+
+ AP_DEBUG_ASSERT(shed);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
+ "h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id,
+ apr_table_get(r->connection->notes, H2_TASK_ID_NOTE));
+ if (task->ser_headers) {
+ /* Max compatibility, deny processing of this */
+ return APR_EOF;
+ }
+
+ ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
+ if (ngn && !ngn->shutdown) {
+ /* this task will be processed in another thread,
+ * freeze any I/O for the time being. */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ "h2_ngn_shed(%ld): pushing request %s to %s",
+ shed->c->id, task->id, ngn->id);
+ if (!h2_task_is_detached(task)) {
+ h2_task_freeze(task, r);
+ }
+ /* FIXME: sometimes ngn is garbage, probly alread freed */
+ ngn_add_req(ngn, task, r);
+ ngn->no_assigned++;
+ return APR_SUCCESS;
+ }
+
+ /* no existing engine or being shut down, start a new one */
+ if (einit) {
+ apr_status_t status;
+ apr_pool_t *pool = task->c->pool;
+ h2_req_engine *newngn;
+
+ newngn = apr_pcalloc(pool, sizeof(*ngn));
+ newngn->pool = pool;
+ newngn->id = apr_psprintf(pool, "ngn-%s", task->id);
+ newngn->type = apr_pstrdup(pool, ngn_type);
+ newngn->c = task->c;
+ newngn->shed = shed;
+ newngn->capacity = shed->default_capacity;
+ newngn->no_assigned = 1;
+ newngn->no_live = 1;
+ APR_RING_INIT(&newngn->entries, h2_ngn_entry, link);
+
+ status = einit(newngn, newngn->id, newngn->type, newngn->pool,
+ shed->req_buffer_size, r);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
+ "h2_ngn_shed(%ld): create engine %s (%s)",
+ shed->c->id, newngn->id, newngn->type);
+ if (status == APR_SUCCESS) {
+ AP_DEBUG_ASSERT(task->engine == NULL);
+ newngn->task = task;
+ task->engine = newngn;
+ apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn);
+ }
+ return status;
+ }
+ return APR_EOF;
+}
+
+static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn)
+{
+ h2_ngn_entry *entry;
+ for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
+ entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
+ entry = H2_NGN_ENTRY_NEXT(entry)) {
+ if (!entry->task->frozen) {
+ H2_NGN_ENTRY_REMOVE(entry);
+ return entry;
+ }
+ }
+ return NULL;
+}
+
+apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed,
+ h2_req_engine *ngn,
+ apr_uint32_t capacity,
+ int want_shutdown,
+ request_rec **pr)
+{
+ h2_ngn_entry *entry;
+
+ AP_DEBUG_ASSERT(ngn);
+ *pr = NULL;
+ if (shed->aborted) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c,
+ "h2_ngn_shed(%ld): abort while pulling requests %s",
+ shed->c->id, ngn->id);
+ ngn->shutdown = 1;
+ return APR_ECONNABORTED;
+ }
+
+ ngn->capacity = capacity;
+ if (H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
+ if (want_shutdown) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
+ "h2_ngn_shed(%ld): emtpy queue, shutdown engine %s",
+ shed->c->id, ngn->id);
+ ngn->shutdown = 1;
+ }
+ return ngn->shutdown? APR_EOF : APR_EAGAIN;
+ }
+
+ if ((entry = pop_non_frozen(ngn))) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c,
+ "h2_ngn_shed(%ld): pulled request %s for engine %s",
+ shed->c->id, entry->task->id, ngn->id);
+ ngn->no_live++;
+ *pr = entry->r;
+ return APR_SUCCESS;
+ }
+ return APR_EAGAIN;
+}
+
+static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn,
+ h2_task *task, int waslive, int aborted,
+ int close)
+{
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
+ "h2_ngn_shed(%ld): task %s %s by %s",
+ shed->c->id, task->id, aborted? "aborted":"done", ngn->id);
+ ngn->no_finished++;
+ if (waslive) ngn->no_live--;
+ ngn->no_assigned--;
+
+ if (close) {
+ h2_task_output_close(task->output);
+ }
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
+ struct h2_req_engine *ngn, h2_task *task)
+{
+ return ngn_done_task(shed, ngn, task, 1, 0, 0);
+}
+
+void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
+{
+ if (ngn->done) {
+ return;
+ }
+
+ if (!shed->aborted && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
+ h2_ngn_entry *entry;
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+ "h2_ngn_shed(%ld): exit engine %s (%s), "
+ "has still requests queued, shutdown=%d,"
+ "assigned=%ld, live=%ld, finished=%ld",
+ shed->c->id, ngn->id, ngn->type,
+ ngn->shutdown,
+ (long)ngn->no_assigned, (long)ngn->no_live,
+ (long)ngn->no_finished);
+ for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
+ entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
+ entry = H2_NGN_ENTRY_NEXT(entry)) {
+ request_rec *r = entry->r;
+ h2_task *task = h2_ctx_rget_task(r);
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+ "h2_ngn_shed(%ld): engine %s has queued task %s, "
+ "frozen=%d, aborting",
+ shed->c->id, ngn->id, task->id, task->frozen);
+ ngn_done_task(shed, ngn, task, 0, 1, 1);
+ }
+ }
+ if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+ "h2_ngn_shed(%ld): exit engine %s (%s), "
+ "assigned=%ld, live=%ld, finished=%ld",
+ shed->c->id, ngn->id, ngn->type,
+ (long)ngn->no_assigned, (long)ngn->no_live,
+ (long)ngn->no_finished);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
+ "h2_ngn_shed(%ld): exit engine %s",
+ shed->c->id, ngn->id);
+ }
+
+ apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, NULL);
+ ngn->done = 1;
+}
--- /dev/null
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef h2_req_shed_h
+#define h2_req_shed_h
+
+struct h2_req_engine;
+struct h2_task;
+
+typedef struct h2_ngn_shed h2_ngn_shed;
+struct h2_ngn_shed {
+ conn_rec *c;
+ apr_pool_t *pool;
+ apr_hash_t *ngns;
+ void *user_ctx;
+
+ unsigned int aborted : 1;
+
+ apr_uint32_t default_capacity;
+ apr_uint32_t req_buffer_size; /* preferred buffer size for responses */
+};
+
+const char *h2_req_engine_get_id(h2_req_engine *engine);
+int h2_req_engine_is_shutdown(h2_req_engine *engine);
+
+typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine,
+ const char *id,
+ const char *type,
+ apr_pool_t *pool,
+ apr_uint32_t req_buffer_size,
+ request_rec *r);
+
+h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
+ apr_uint32_t default_capactiy,
+ apr_uint32_t req_buffer_size);
+
+void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx);
+void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed);
+
+h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn);
+
+void h2_ngn_shed_abort(h2_ngn_shed *shed);
+
+apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
+ struct h2_task *task, request_rec *r,
+ h2_shed_ngn_init *init_cb);
+
+apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
+ apr_uint32_t capacity,
+ int want_shutdown, request_rec **pr);
+
+apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
+ struct h2_req_engine *ngn,
+ struct h2_task *task);
+
+void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn);
+
+
+#endif /* h2_req_shed_h */
/* Request check post hooks failed. An example of this would be a
* request for a vhost where h2 is disabled --> 421.
*/
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, conn, APLOGNO()
+ "h2_request(%d): access_status=%d, request_create failed",
+ req->id, access_status);
ap_die(access_status, r);
ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r);
ap_run_log_transaction(r);
(long)session->frames_sent);
}
++session->frames_sent;
- switch (frame->hd.type) {
- case NGHTTP2_HEADERS:
- case NGHTTP2_DATA:
- /* no explicit flushing necessary */
- break;
- default:
- session->flush = 1;
- break;
- }
return 0;
}
}
else {
/* uncommon status, log on INFO so that we see this */
- ap_log_cerror( APLOG_MARK, APLOG_INFO, status, c,
+ ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, c,
APLOGNO(02950)
"h2_session(%ld): error reading, terminating",
session->id);
break;
default:
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_session(%ld): conn error -> shutdown", session->id);
h2_session_shutdown(session, arg, msg, 0);
break;
break;
default:
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_session(%ld): proto error -> shutdown", session->id);
h2_session_shutdown(session, arg, msg, 0);
break;
transit(session, "no io", H2_SESSION_ST_DONE);
}
else {
+ apr_time_t now = apr_time_now();
/* When we have no streams, no task event are possible,
* switch to blocking reads */
transit(session, "no io", H2_SESSION_ST_IDLE);
session->idle_until = (session->requests_received?
session->s->keep_alive_timeout :
- session->s->timeout) + apr_time_now();
+ session->s->timeout) + now;
+ session->keep_sync_until = now + apr_time_from_sec(1);
}
}
else if (!has_unsubmitted_streams(session)
* window updates. */
transit(session, "no io", H2_SESSION_ST_IDLE);
session->idle_until = apr_time_now() + session->s->timeout;
+ session->keep_sync_until = session->idle_until;
}
else {
/* Unable to do blocking reads, as we wait on events from
no_streams = h2_ihash_is_empty(session->streams);
update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE
: SERVER_BUSY_READ), "idle");
- if (async && no_streams && !session->r && session->requests_received) {
+ /* make certain, the client receives everything before we idle */
+ h2_conn_io_flush(&session->io);
+ if (!session->keep_sync_until
+ && async && no_streams && !session->r && session->requests_received) {
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): async idle, nonblock read", session->id);
/* We do not return to the async mpm immediately, since under
/* nothing to read */
}
else if (APR_STATUS_IS_TIMEUP(status)) {
- if (apr_time_now() > session->idle_until) {
+ apr_time_t now = apr_time_now();
+ if (now > session->keep_sync_until) {
+ /* if we are on an async mpm, now is the time that
+ * we may dare to pass control to it. */
+ session->keep_sync_until = 0;
+ }
+ if (now > session->idle_until) {
dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
}
/* continue reading handling */
/* waited long enough */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_TIMEUP, c,
"h2_session: wait for data");
- dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
+ dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
+ break;
}
else {
/* repeating, increase timer for graceful backoff */
"h2_session: wait for data, %ld micros",
(long)session->wait_us);
}
+ /* make certain, the client receives everything before we idle */
+ h2_conn_io_flush(&session->io);
status = h2_mplx_out_trywait(session->mplx, session->wait_us,
session->iowait);
if (status == APR_SUCCESS) {
transit(session, "wait cycle", H2_SESSION_ST_BUSY);
}
else {
- h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR, "cond wait error", 0);
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
+ "h2_session(%ld): waiting on conditional",
+ session->id);
+ h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR,
+ "cond wait error", 0);
}
break;
}
out:
- h2_conn_io_pass(&session->io, session->flush);
- session->flush = 0;
+ h2_conn_io_flush(&session->io);
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): [%s] process returns",
apr_time_t start_wait; /* Time we started waiting for sth. to happen */
apr_time_t idle_until; /* Time we shut down due to sheer boredom */
+ apr_time_t keep_sync_until; /* Time we sync wait until passing to async mpm */
apr_pool_t *pool; /* pool to use in session handling */
apr_bucket_brigade *bbtmp; /* brigade for keeping temporary data */
AP_DEBUG_ASSERT(task);
if (task->frozen) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
"h2_response_freeze_filter, saving");
- return ap_save_brigade(f, &task->frozen_out, &bb, task->c->pool);
+ return ap_save_brigade(f, &task->output->frozen_bb, &bb, task->c->pool);
}
if (APR_BRIGADE_EMPTY(bb)) {
return APR_SUCCESS;
}
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
"h2_response_freeze_filter, passing");
return ap_pass_brigade(f->next, bb);
}
task->request = req;
task->input_eos = !req->body;
task->ser_headers = req->serialize;
+ task->blocking = 1;
h2_ctx_create_for(c, task);
return task;
}
+void h2_task_set_io_blocking(h2_task *task, int blocking)
+{
+ task->blocking = blocking;
+}
+
apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond)
{
apr_status_t status;
task->input = h2_task_input_create(task, task->c);
task->output = h2_task_output_create(task, task->c);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ "h2_task(%s): process connection", task->id);
ap_process_connection(task->c, ap_get_conn_socket(task->c));
if (task->frozen) {
conn_state_t *cs = c->cs;
request_rec *r;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_task(%s): create request_rec", task->id);
r = h2_request_create_rec(req, c);
if (r && (r->status == HTTP_OK)) {
ap_update_child_status(c->sbh, SERVER_BUSY_READ, r);
cs->state = CONN_STATE_WRITE_COMPLETION;
r = NULL;
}
+ else if (!r) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_task(%s): create request_rec failed, r=NULL", task->id);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_task(%s): create request_rec failed, r->status=%d",
+ task->id, r->status);
+ }
c->sbh = NULL;
return APR_SUCCESS;
ctx = h2_ctx_get(c, 0);
if (h2_ctx_is_task(ctx)) {
if (!ctx->task->ser_headers) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_h2, processing request directly");
h2_task_process_request(ctx->task, c);
return DONE;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s), serialized handling", ctx->task->id);
}
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "slave_conn(%ld): has no task", c->id);
+ }
return DECLINED;
}
conn_rec *c = task->c;
task->frozen = 1;
- task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc);
+ task->output->frozen_bb = apr_brigade_create(c->pool, c->bucket_alloc);
ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
"h2_task(%s), frozen", task->id);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
"h2_task(%s), thawed", task->id);
}
+ task->detached = 1;
return APR_SUCCESS;
}
+int h2_task_is_detached(h2_task *task)
+{
+ return task->detached;
+}
struct h2_conn;
struct h2_mplx;
struct h2_task;
+struct h2_req_engine;
struct h2_request;
struct h2_resp_head;
struct h2_worker;
unsigned int input_eos : 1;
unsigned int ser_headers : 1;
unsigned int frozen : 1;
+ unsigned int blocking : 1;
+ unsigned int detached : 1;
struct h2_task_input *input;
struct h2_task_output *output;
struct apr_thread_cond_t *io; /* used to wait for events on */
-
- apr_bucket_brigade *frozen_out;
+
+ struct h2_req_engine *engine;
};
h2_task *h2_task_create(long session_id, const struct h2_request *req,
apr_status_t h2_task_freeze(h2_task *task, request_rec *r);
apr_status_t h2_task_thaw(h2_task *task);
+int h2_task_is_detached(h2_task *task);
+
+void h2_task_set_io_blocking(h2_task *task, int blocking);
#endif /* defined(__mod_h2__h2_task__) */
if (f) {
/* This happens currently when ap_die(status, r) is invoked
* by a read request filter. */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204)
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
"h2_task_output(%s): write without response by %s "
"for %s %s %s",
output->task->id, caller,
output->task->request->method,
output->task->request->authority,
output->task->request->path);
- f->c->aborted = 1;
+ output->c->aborted = 1;
}
if (output->task->io) {
apr_thread_cond_broadcast(output->task->io);
if (h2_task_logio_add_bytes_out) {
/* counter headers as if we'd do a HTTP/1.1 serialization */
- /* TODO: counter a virtual status line? */
- apr_off_t bytes_written;
- apr_brigade_length(bb, 0, &bytes_written);
- bytes_written += h2_util_table_bytes(response->headers, 3)+1;
- h2_task_logio_add_bytes_out(f->c, bytes_written);
+ output->written = h2_util_table_bytes(response->headers, 3)+1;
+ h2_task_logio_add_bytes_out(output->c, output->written);
}
get_trailers(output);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03348)
- "h2_task_output(%s): open as needed %s %s %s",
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
+ "h2_task(%s): open response to %s %s %s",
output->task->id, output->task->request->method,
output->task->request->authority,
output->task->request->path);
return h2_mplx_out_open(output->task->mplx, output->task->stream_id,
response, f, bb, output->task->io);
}
- return APR_EOF;
+ return APR_SUCCESS;
}
-void h2_task_output_close(h2_task_output *output)
+static apr_status_t write_brigade_raw(h2_task_output *output,
+ ap_filter_t* f, apr_bucket_brigade* bb)
{
- open_if_needed(output, NULL, NULL, "close");
- if (output->state != H2_TASK_OUT_DONE) {
- if (output->task->frozen_out
- && !APR_BRIGADE_EMPTY(output->task->frozen_out)) {
- h2_mplx_out_write(output->task->mplx, output->task->stream_id,
- NULL, output->task->frozen_out, NULL, NULL);
+ apr_off_t written, left;
+ apr_status_t status;
+
+ apr_brigade_length(bb, 0, &written);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c,
+ "h2_task(%s): write response body (%ld bytes)",
+ output->task->id, (long)written);
+
+ status = h2_mplx_out_write(output->task->mplx, output->task->stream_id,
+ f, output->task->blocking, bb,
+ get_trailers(output), output->task->io);
+ if (status == APR_INCOMPLETE) {
+ apr_brigade_length(bb, 0, &left);
+ written -= left;
+ status = APR_SUCCESS;
+ }
+
+ if (status == APR_SUCCESS) {
+ output->written += written;
+ if (h2_task_logio_add_bytes_out) {
+ h2_task_logio_add_bytes_out(output->c, written);
}
- h2_mplx_out_close(output->task->mplx, output->task->stream_id,
- get_trailers(output));
- output->state = H2_TASK_OUT_DONE;
}
+ return status;
}
/* Bring the data from the brigade (which represents the result of the
apr_status_t status;
if (APR_BRIGADE_EMPTY(bb)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_task_output(%s): empty write", output->task->id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c,
+ "h2_task(%s): empty write", output->task->id);
return APR_SUCCESS;
}
if (output->task->frozen) {
h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2,
"frozen task output write", bb);
- return ap_save_brigade(f, &output->task->frozen_out, &bb,
- output->c->pool);
+ return ap_save_brigade(f, &output->frozen_bb, &bb, output->c->pool);
}
status = open_if_needed(output, f, bb, "write");
- if (status != APR_EOF) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_task_output(%s): opened and passed brigade",
+
+ /* Attempt to write saved brigade first */
+ if (status == APR_SUCCESS && output->bb
+ && !APR_BRIGADE_EMPTY(output->bb)) {
+ status = write_brigade_raw(output, f, output->bb);
+ }
+
+ /* If there is nothing saved (anymore), try to write the brigade passed */
+ if (status == APR_SUCCESS
+ && (!output->bb || APR_BRIGADE_EMPTY(output->bb))
+ && !APR_BRIGADE_EMPTY(bb)) {
+ status = write_brigade_raw(output, f, bb);
+ }
+
+ /* If the passed brigade is not empty, save it before return */
+ if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, output->c,
+ "h2_task(%s): could not write all, saving brigade",
output->task->id);
- return status;
+ if (!output->bb) {
+ output->bb = apr_brigade_create(output->c->pool, output->c->bucket_alloc);
+ }
+ return ap_save_brigade(f, &output->bb, &bb, output->c->pool);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_task_output(%s): write brigade", output->task->id);
- if (h2_task_logio_add_bytes_out) {
- apr_off_t bytes_written;
- apr_brigade_length(bb, 0, &bytes_written);
- h2_task_logio_add_bytes_out(f->c, bytes_written);
+ return status;
+}
+
+void h2_task_output_close(h2_task_output *output)
+{
+ if (output->task->frozen) {
+ return;
+ }
+ open_if_needed(output, NULL, NULL, "close");
+ if (output->state != H2_TASK_OUT_DONE) {
+ if (output->frozen_bb && !APR_BRIGADE_EMPTY(output->frozen_bb)) {
+ h2_mplx_out_write(output->task->mplx, output->task->stream_id,
+ NULL, 1, output->frozen_bb, NULL, NULL);
+ }
+ output->state = H2_TASK_OUT_DONE;
+ h2_mplx_out_close(output->task->mplx, output->task->stream_id,
+ get_trailers(output));
}
- return h2_mplx_out_write(output->task->mplx, output->task->stream_id,
- f, bb, get_trailers(output), output->task->io);
}
H2_TASK_OUT_INIT,
H2_TASK_OUT_STARTED,
H2_TASK_OUT_DONE,
-} h2_task_output_state_t;
+} h2_task_out_state_t;
typedef struct h2_task_output h2_task_output;
struct h2_task_output {
conn_rec *c;
struct h2_task *task;
- h2_task_output_state_t state;
+ h2_task_out_state_t state;
struct h2_from_h1 *from_h1;
+
unsigned int trailers_passed : 1;
+
+ apr_off_t written;
+ apr_bucket_brigade *bb;
+ apr_bucket_brigade *frozen_bb;
};
h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c);
return ictx->iter(ictx->ctx, (void*)val); /* why is this passed const?*/
}
-void h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx)
+int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx)
{
iter_ctx ictx;
ictx.iter = fn;
ictx.ctx = ctx;
- apr_hash_do(ihash_iter, &ictx, ih->hash);
+ return apr_hash_do(ihash_iter, &ictx, ih->hash);
}
void h2_ihash_add(h2_ihash_t *ih, void *val)
frame->goaway.opaque_data_len : s_len-1;
memcpy(scratch, frame->goaway.opaque_data, len);
scratch[len] = '\0';
- return apr_snprintf(buffer, maxlen, "GOAWAY[error=%d, reason='%s']",
- frame->goaway.error_code, scratch);
+ return apr_snprintf(buffer, maxlen, "GOAWAY[error=%d, reason='%s', "
+ "last_stream=%d]", frame->goaway.error_code,
+ scratch, frame->goaway.last_stream_id);
}
case NGHTTP2_WINDOW_UPDATE: {
return apr_snprintf(buffer, maxlen,
* @param ih the hash to iterate over
* @param fn the function to invoke on each member
* @param ctx user supplied data passed into each iteration call
+ * @param 0 if one iteration returned 0, otherwise != 0
*/
-void h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx);
+int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx);
void h2_ihash_add(h2_ihash_t *ih, void *val);
void h2_ihash_remove(h2_ihash_t *ih, int id);
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.3.3"
+#define MOD_HTTP2_VERSION "1.4.0"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010303
+#define MOD_HTTP2_VERSION_NUM 0x010400
#endif /* mod_h2_h2_version_h */
conn_rec *, request_rec *, char *name);
static int http2_is_h2(conn_rec *);
-static apr_status_t http2_req_engine_push(const char *engine_type,
+static apr_status_t http2_req_engine_push(const char *ngn_type,
request_rec *r,
h2_req_engine_init *einit)
{
- return h2_mplx_engine_push(engine_type, r, einit);
+ return h2_mplx_req_engine_push(ngn_type, r, einit);
}
-static apr_status_t http2_req_engine_pull(h2_req_engine *engine,
+static apr_status_t http2_req_engine_pull(h2_req_engine *ngn,
apr_read_type_e block,
+ apr_uint32_t capacity,
request_rec **pr)
{
- return h2_mplx_engine_pull(engine, block, pr);
+ return h2_mplx_req_engine_pull(ngn, block, capacity, pr);
}
-static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn)
+static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
{
- h2_mplx_engine_done(engine, r_conn);
+ h2_mplx_req_engine_done(ngn, r_conn);
}
-static void http2_req_engine_exit(h2_req_engine *engine)
-{
- h2_mplx_engine_exit(engine);
-}
-
-
/* Runs once per created child process. Perform any process
* related initionalization here.
*/
APR_REGISTER_OPTIONAL_FN(http2_req_engine_push);
APR_REGISTER_OPTIONAL_FN(http2_req_engine_pull);
APR_REGISTER_OPTIONAL_FN(http2_req_engine_done);
- APR_REGISTER_OPTIONAL_FN(http2_req_engine_exit);
ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, "installing hooks");
* @param engine the allocated, partially filled structure
* @param r the first request to process, or NULL
*/
-typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r);
-
-/**
- * The public structure of a h2_req_engine. It gets allocated by the http2
- * infrastructure, assigned id, type, pool, io and connection and passed to the
- * h2_req_engine_init() callback to complete initialization.
- * This happens whenever a new request gets "push"ed for an engine type and
- * no instance, or no free instance, for the type is available.
- */
-struct h2_req_engine {
- const char *id; /* identifier */
- apr_pool_t *pool; /* pool for engine specific allocations */
- const char *type; /* name of the engine type */
- unsigned char window_bits;/* preferred size of overall response data
- * mod_http2 is willing to buffer as log2 */
- unsigned char req_window_bits;/* preferred size of response body data
- * mod_http2 is willing to buffer per request,
- * as log2 */
- apr_size_t capacity; /* maximum concurrent requests */
- void *user_data; /* user specific data */
-};
+typedef apr_status_t h2_req_engine_init(h2_req_engine *engine,
+ const char *id,
+ const char *type,
+ apr_pool_t *pool,
+ apr_uint32_t req_buffer_size,
+ request_rec *r);
/**
* Push a request to an engine with the specified name for further processing.
* @param timeout wait a maximum amount of time for a new slave, 0 will not wait
* @param pslave the slave connection that needs processing or NULL
* @return APR_SUCCESS if new request was assigned
- * APR_EAGAIN/APR_TIMEUP if no new request is available
- * APR_ECONNABORTED if the engine needs to shut down
+ * APR_EAGAIN if no new request is available
+ * APR_EOF if engine may shut down, as no more request will be scheduled
+ * APR_ECONNABORTED if the engine needs to shut down immediately
*/
APR_DECLARE_OPTIONAL_FN(apr_status_t,
http2_req_engine_pull, (h2_req_engine *engine,
apr_read_type_e block,
+ apr_uint32_t capacity,
request_rec **pr));
APR_DECLARE_OPTIONAL_FN(void,
http2_req_engine_done, (h2_req_engine *engine,
conn_rec *rconn));
-/**
- * The given request engine is done processing and needs to be excluded
- * from further handling.
- * @param engine the engine to exit
- */
-APR_DECLARE_OPTIONAL_FN(void,
- http2_req_engine_exit, (h2_req_engine *engine));
-
-
-#define H2_TASK_ID_NOTE "http2-task-id"
-
#endif