-*- coding: utf-8 -*-
Changes with Apache 2.5.1
+ *) mod_http2: internal code cleanups and simplifications. Common output code for
+ h2 and h2c protocols, using nested mutex locks for simplified calls. [Stefan Eissing]
+
*) mod_proxy/ssl: Proxy SSL client certificate configuration and other proxy
SSL configurations broken inside <Proxy> context. PR 63430.
[Ruediger Pluem, Yann Ylavic]
[int main(int argc, const char *const *argv) {]
[[$2]]
[ return 0; }]
- )], [CFLAGS=$apr_save_CFLAGS
-$3], [CFLAGS=$apr_save_CFLAGS
-$4])
+ )],
+ [$3], [$4])
+ CFLAGS=$apr_save_CFLAGS
])
dnl
AC_SUBST(MKDEP)
])
-dnl
-dnl APR_CHECK_TYPES_FMT_COMPATIBLE(TYPE-1, TYPE-2, FMT-TAG,
-dnl [ACTION-IF-TRUE], [ACTION-IF-FALSE])
-dnl
-dnl Try to determine whether two types are the same and accept the given
-dnl printf formatter (bare token, e.g. literal d, ld, etc).
-dnl
-AC_DEFUN([APR_CHECK_TYPES_FMT_COMPATIBLE], [
-define([apr_cvname], apr_cv_typematch_[]translit([$1], [ ], [_])_[]translit([$2], [ ], [_])_[][$3])
-AC_CACHE_CHECK([whether $1 and $2 use fmt %$3], apr_cvname, [
-APR_TRY_COMPILE_NO_WARNING([#include <sys/types.h>
-#include <stdio.h>
-#ifdef HAVE_STDINT_H
-#include <stdint.h>
-#endif
-], [
- $1 chk1, *ptr1;
- $2 chk2, *ptr2 = &chk1;
- ptr1 = &chk2;
- *ptr1 = *ptr2 = 0;
- printf("%$3 %$3", chk1, chk2);
-], [apr_cvname=yes], [apr_cvname=no])])
-if test "$apr_cvname" = "yes"; then
- :
- $4
-else
- :
- $5
-fi
-])
-
dnl
dnl APR_CHECK_TYPES_COMPATIBLE(TYPE-1, TYPE-2, [ACTION-IF-TRUE])
dnl
dnl Try to determine whether two types are the same. Only works
dnl for gcc and icc.
dnl
-dnl @deprecated @see APR_CHECK_TYPES_FMT_COMPATIBLE
-dnl
AC_DEFUN([APR_CHECK_TYPES_COMPATIBLE], [
define([apr_cvname], apr_cv_typematch_[]translit([$1], [ ], [_])_[]translit([$2], [ ], [_]))
AC_CACHE_CHECK([whether $1 and $2 are the same], apr_cvname, [
H2_SS_CLOSED_L,
H2_SS_CLOSED,
H2_SS_CLEANUP,
+ H2_SS_DESTROYED,
H2_SS_MAX
} h2_stream_state_t;
H2_SEV_IN_DATA_PENDING,
} h2_stream_event_t;
+typedef enum {
+ H2_PS_NONE,
+ H2_PS_QUEUED,
+ H2_PS_RUNNING,
+ H2_PS_FINISHED,
+} h2_processing_state_t;
+
+#define H2_PS_IS_RUNNING(s) ((s) == H2_PS_RUNNING)
+#define H2_PS_IS_NOT_RUNNING(s) ((s) != H2_PS_RUNNING)
+#define H2_PS_IS_WAS_STARTED(s) ((s) >= H2_PS_RUNNING)
+#define H2_PS_IS_HAS_FINISHED(s) ((s) == H2_PS_FINISHED)
/* h2_request is the transformer of HTTP2 streams into HTTP/1.1 internal
* format that will be fed to various httpd input filters to finally
#include <httpd.h>
#include <http_protocol.h>
+#include <http_request.h>
#include <http_log.h>
#include "h2_private.h"
* h2_blist, a brigade without allocations
******************************************************************************/
+static void h2_blist_cleanup(h2_blist *bl)
+{
+ apr_bucket *e;
+
+ while (!H2_BLIST_EMPTY(bl)) {
+ e = H2_BLIST_FIRST(bl);
+ apr_bucket_delete(e);
+ }
+}
+
+static void brigade_move_to_blist(apr_bucket_brigade *bb, h2_blist *list)
+{
+ apr_bucket *b;
+ while (bb && !APR_BRIGADE_EMPTY(bb)) {
+ b = APR_BRIGADE_FIRST(bb);
+ APR_BUCKET_REMOVE(b);
+ H2_BLIST_INSERT_TAIL(list, b);
+ }
+}
+
+/*******************************************************************************
+ * bucket beamer registration
+ ******************************************************************************/
+
static apr_array_header_t *beamers;
static apr_status_t cleanup_beamers(void *dummy)
return len;
}
-static void r_purge_sent(h2_bucket_beam *beam)
-{
- apr_bucket *b;
- /* delete all sender buckets in purge brigade, needs to be called
- * from sender thread only */
- while (!H2_BLIST_EMPTY(&beam->purge_list)) {
- b = H2_BLIST_FIRST(&beam->purge_list);
- apr_bucket_delete(b);
- }
-}
-
static apr_size_t calc_space_left(h2_bucket_beam *beam)
{
if (beam->max_buf_size > 0) {
}
else {
/* it should be there unless we screwed up */
- ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool,
+ ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->pool,
APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
"in hold, n=%d", beam->id, beam->tag,
(int)proxy->n);
}
/* notify anyone waiting on space to become available */
if (!bl.mutex) {
- r_purge_sent(beam);
+ h2_blist_cleanup(&beam->purge_list);
}
else {
apr_thread_cond_broadcast(beam->change);
}
}
-static void h2_blist_cleanup(h2_blist *bl)
-{
- apr_bucket *e;
-
- while (!H2_BLIST_EMPTY(bl)) {
- e = H2_BLIST_FIRST(bl);
- apr_bucket_delete(e);
- }
-}
-
static apr_status_t beam_close(h2_bucket_beam *beam)
{
if (!beam->closed) {
return beam->closed;
}
-static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool,
- apr_status_t (*cleanup)(void *))
-{
- if (pool && pool != beam->pool) {
- apr_pool_pre_cleanup_register(pool, beam, cleanup);
- return 1;
- }
- return 0;
-}
-
-static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
- apr_status_t (*cleanup)(void *)) {
- if (pool && pool != beam->pool) {
- apr_pool_cleanup_kill(pool, beam, cleanup);
- return 1;
- }
- return 0;
-}
-
-static apr_status_t beam_recv_cleanup(void *data)
-{
- h2_bucket_beam *beam = data;
- /* receiver pool has gone away, clear references */
- beam->recv_buffer = NULL;
- beam->recv_pool = NULL;
- return APR_SUCCESS;
-}
-
static apr_status_t beam_send_cleanup(void *data)
{
h2_bucket_beam *beam = data;
/* sender is going away, clear up all references to its memory */
- r_purge_sent(beam);
- h2_blist_cleanup(&beam->send_list);
report_consumption(beam, NULL);
while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
}
h2_blist_cleanup(&beam->purge_list);
h2_blist_cleanup(&beam->hold_list);
- beam->send_pool = NULL;
+ h2_blist_cleanup(&beam->send_list);
return APR_SUCCESS;
}
-static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool)
-{
- if (beam->send_pool != pool) {
- if (beam->send_pool && beam->send_pool != beam->pool) {
- pool_kill(beam, beam->send_pool, beam_send_cleanup);
- beam_send_cleanup(beam);
- }
- beam->send_pool = pool;
- pool_register(beam, beam->send_pool, beam_send_cleanup);
- }
-}
-
static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
{
if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
}
}
-static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool)
-{
- apr_status_t status = APR_SUCCESS;
- int safe_send = (beam->owner == H2_BEAM_OWNER_SEND);
- int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV);
-
- /*
- * Owner of the beam is going away, depending on which side it owns,
- * cleanup strategies will differ.
- *
- * In general, receiver holds references to memory from sender.
- * Clean up receiver first, if safe, then cleanup sender, if safe.
- */
-
- /* When called from pool destroy, io callbacks are disabled */
- if (from_pool) {
- beam->cons_io_cb = NULL;
- }
-
- /* When modify send is not safe, this means we still have multi-thread
- * protection and the owner is receiving the buckets. If the sending
- * side has not gone away, this means we could have dangling buckets
- * in our lists that never get destroyed. This should not happen. */
- ap_assert(safe_send || !beam->send_pool);
- if (!H2_BLIST_EMPTY(&beam->send_list)) {
- ap_assert(beam->send_pool);
- }
-
- if (safe_recv) {
- if (beam->recv_pool) {
- pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
- beam->recv_pool = NULL;
- }
- recv_buffer_cleanup(beam, NULL);
- }
- else {
- beam->recv_buffer = NULL;
- beam->recv_pool = NULL;
- }
-
- if (safe_send && beam->send_pool) {
- pool_kill(beam, beam->send_pool, beam_send_cleanup);
- status = beam_send_cleanup(beam);
- }
-
- if (safe_recv) {
- ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
- ap_assert(H2_BLIST_EMPTY(&beam->send_list));
- ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
- ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
- }
- return status;
-}
-
-static apr_status_t beam_pool_cleanup(void *data)
-{
- return beam_cleanup(data, 1);
-}
-
apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
{
- apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup);
- return beam_cleanup(beam, 0);
+ /* no more io callbacks */
+ beam->cons_io_cb = NULL;
+ beam->recv_buffer = NULL;
+ beam->recv_pool = NULL;
+
+ return beam_send_cleanup(beam);
}
apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
int id, const char *tag,
- h2_beam_owner_t owner,
apr_size_t max_buf_size,
apr_interval_time_t timeout)
{
beam->id = id;
beam->tag = tag;
beam->pool = pool;
- beam->owner = owner;
H2_BLIST_INIT(&beam->send_list);
H2_BLIST_INIT(&beam->hold_list);
H2_BLIST_INIT(&beam->purge_list);
beam->max_buf_size = max_buf_size;
beam->timeout = timeout;
- rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
+ rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_NESTED, pool);
if (APR_SUCCESS == rv) {
rv = apr_thread_cond_create(&beam->change, pool);
- if (APR_SUCCESS == rv) {
- apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup);
- *pbeam = beam;
- }
}
+ *pbeam = (APR_SUCCESS == rv)? beam : NULL;
return rv;
}
if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
beam->aborted = 1;
- r_purge_sent(beam);
+ h2_blist_cleanup(&beam->purge_list);
h2_blist_cleanup(&beam->send_list);
report_consumption(beam, &bl);
apr_thread_cond_broadcast(beam->change);
h2_beam_lock bl;
if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
- r_purge_sent(beam);
+ h2_blist_cleanup(&beam->purge_list);
beam_close(beam);
report_consumption(beam, &bl);
leave_yellow(beam, &bl);
return status;
}
-static void move_to_hold(h2_bucket_beam *beam,
- apr_bucket_brigade *sender_bb)
-{
- apr_bucket *b;
- while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) {
- b = APR_BRIGADE_FIRST(sender_bb);
- APR_BUCKET_REMOVE(b);
- H2_BLIST_INSERT_TAIL(&beam->send_list, b);
- }
-}
-
static apr_status_t append_bucket(h2_bucket_beam *beam,
apr_bucket *b,
apr_read_type_e block,
if (APR_BUCKET_IS_EOS(b)) {
beam->closed = 1;
}
+ if (AP_BUCKET_IS_EOR(b)) {
+ /* The problem with EOR buckets:
+ * - we cannot delete it now, as it will destroy the request pool
+ * and free data that we are still holding in the beam.
+ * - if we add it to the send_list, as all other buckets,
+ * it will most likely not be read, as an EOS came before.
+ * This means we still juggle it when the beam is destroyed,
+ * and rarely this seems to cause the pool to be freed twice...
+ * if asan stack traces are to be believed...
+ * - since we
+ */
+ beam->closed = 1;
+ }
APR_BUCKET_REMOVE(b);
H2_BLIST_INSERT_TAIL(&beam->send_list, b);
return APR_SUCCESS;
/* this takes care of transient buckets and converts them
* into heap ones. Other bucket types might or might not be
* affected by this. */
- status = apr_bucket_setaside(b, beam->send_pool);
+ status = apr_bucket_setaside(b, beam->pool);
}
else if (APR_BUCKET_IS_HEAP(b)) {
/* For heap buckets read from a receiver thread is fine. The
}
}
else if (APR_BUCKET_IS_FILE(b) && can_beam) {
- status = apr_bucket_setaside(b, beam->send_pool);
+ status = apr_bucket_setaside(b, beam->pool);
}
if (status == APR_ENOTIMPL) {
* use pools/allocators safely. */
status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (status == APR_SUCCESS) {
- status = apr_bucket_setaside(b, beam->send_pool);
+ status = apr_bucket_setaside(b, beam->pool);
}
}
return APR_SUCCESS;
}
-void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p)
-{
- h2_beam_lock bl;
- /* Called from the sender thread to add buckets to the beam */
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- r_purge_sent(beam);
- beam_set_send_pool(beam, p);
- leave_yellow(beam, &bl);
- }
-}
-
apr_status_t h2_beam_send(h2_bucket_beam *beam,
apr_bucket_brigade *sender_bb,
apr_read_type_e block)
/* Called from the sender thread to add buckets to the beam */
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- ap_assert(beam->send_pool);
- r_purge_sent(beam);
+ ap_assert(beam->pool);
+ h2_blist_cleanup(&beam->purge_list);
if (beam->aborted) {
- move_to_hold(beam, sender_bb);
+ brigade_move_to_blist(sender_bb, &beam->send_list);
rv = APR_ECONNABORTED;
}
else if (sender_bb) {
while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
if (space_left <= 0) {
report_prod_io(beam, force_report, &bl);
- r_purge_sent(beam);
+ h2_blist_cleanup(&beam->purge_list);
rv = wait_not_full(beam, block, &space_left, &bl);
if (APR_SUCCESS != rv) {
break;
typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam,
apr_file_t *file);
-typedef enum {
- H2_BEAM_OWNER_SEND,
- H2_BEAM_OWNER_RECV
-} h2_beam_owner_t;
-
/**
* Will deny all transfer of apr_file_t across the beam and force
* a data copy instead.
int id;
const char *tag;
apr_pool_t *pool;
- h2_beam_owner_t owner;
h2_blist send_list;
h2_blist hold_list;
h2_blist purge_list;
apr_bucket_brigade *recv_buffer;
h2_bproxy_list proxies;
- apr_pool_t *send_pool;
apr_pool_t *recv_pool;
apr_size_t max_buf_size;
* @param pool pool owning the beam, beam will cleanup when pool released
* @param id identifier of the beam
* @param tag tag identifying beam for logging
- * @param owner if the beam is owned by the sender or receiver, e.g. if
- * the pool owner is using this beam for sending or receiving
* @param buffer_size maximum memory footprint of buckets buffered in beam, or
* 0 for no limitation
* @param timeout timeout for blocking operations
apr_status_t h2_beam_create(h2_bucket_beam **pbeam,
apr_pool_t *pool,
int id, const char *tag,
- h2_beam_owner_t owner,
apr_size_t buffer_size,
apr_interval_time_t timeout);
apr_bucket_brigade *bb,
apr_read_type_e block);
-/**
- * Register the pool from which future buckets are send. This defines
- * the lifetime of the buckets, e.g. the pool should not be cleared/destroyed
- * until the data is no longer needed (or has been received).
- */
-void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p);
-
/**
* Receive buckets from the beam into the given brigade. Will return APR_EOF
* when reading past an EOS bucket. Reads can be blocking until data is
#include "h2_private.h"
#include "h2.h"
+#include "h2_ctx.h"
#include "h2_mplx.h"
-#include "h2_stream.h"
+#include "h2_session.h"
#include "h2_bucket_eos.h"
typedef struct {
apr_bucket_refcount refcount;
- h2_stream *stream;
+ conn_rec *c;
+ int stream_id;
} h2_bucket_eos;
-static apr_status_t bucket_cleanup(void *data)
-{
- h2_stream **pstream = data;
-
- if (*pstream) {
- /* If bucket_destroy is called after us, this prevents
- * bucket_destroy from trying to destroy the stream again. */
- *pstream = NULL;
- }
- return APR_SUCCESS;
-}
-
static apr_status_t bucket_read(apr_bucket *b, const char **str,
apr_size_t *len, apr_read_type_e block)
{
return APR_SUCCESS;
}
-apr_bucket *h2_bucket_eos_make(apr_bucket *b, h2_stream *stream)
+apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id)
{
h2_bucket_eos *h;
h = apr_bucket_alloc(sizeof(*h), b->list);
- h->stream = stream;
+ h->c = c;
+ h->stream_id = stream_id;
b = apr_bucket_shared_make(b, h, 0, 0);
b->type = &h2_bucket_type_eos;
return b;
}
-apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list,
- h2_stream *stream)
+apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, conn_rec *c, int stream_id)
{
apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
APR_BUCKET_INIT(b);
b->free = apr_bucket_free;
b->list = list;
- b = h2_bucket_eos_make(b, stream);
- if (stream) {
- h2_bucket_eos *h = b->data;
- apr_pool_pre_cleanup_register(stream->pool, &h->stream, bucket_cleanup);
- }
+ b = h2_bucket_eos_make(b, c, stream_id);
return b;
}
static void bucket_destroy(void *data)
{
h2_bucket_eos *h = data;
-
+ h2_session *session;
+
if (apr_bucket_shared_destroy(h)) {
- h2_stream *stream = h->stream;
- if (stream && stream->pool) {
- apr_pool_cleanup_kill(stream->pool, &h->stream, bucket_cleanup);
+ if ((session = h2_ctx_get_session(h->c))) {
+ h2_session_eos_sent(session, h->stream_id);
}
apr_bucket_free(h);
- if (stream) {
- h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
- }
}
}
#ifndef mod_http2_h2_bucket_stream_eos_h
#define mod_http2_h2_bucket_stream_eos_h
-struct h2_stream;
-
/** End Of HTTP/2 STREAM (H2EOS) bucket */
extern const apr_bucket_type_t h2_bucket_type_eos;
#define H2_BUCKET_IS_H2EOS(e) (e->type == &h2_bucket_type_eos)
-apr_bucket *h2_bucket_eos_make(apr_bucket *b, struct h2_stream *stream);
+apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id);
-apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list,
- struct h2_stream *stream);
+apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, conn_rec *c, int stream_id);
#endif /* mod_http2_h2_bucket_stream_eos_h */
#include "h2_filter.h"
#include "h2_mplx.h"
#include "h2_session.h"
-#include "h2_stream.h"
#include "h2_h2.h"
#include "h2_task.h"
#include "h2_workers.h"
void h2_slave_destroy(conn_rec *slave)
{
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, slave,
- "h2_slave(%s): destroy", slave->log_id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, slave, "h2_slave(%s): destroy", slave->log_id);
slave->sbh = NULL;
apr_pool_destroy(slave->pool);
}
io->c = c;
io->output = apr_brigade_create(c->pool, c->bucket_alloc);
io->is_tls = h2_h2_is_tls(c);
- io->buffer_output = io->is_tls;
+ /* we used to buffer only on TLS connections, but to eliminate code paths
+ * and force more predictable behaviour, we do it on all now. Less test cases. */
io->flush_threshold = (apr_size_t)h2_config_sgeti64(s, H2_CONF_STREAM_MAX_MEM);
if (io->is_tls) {
else {
io->warmup_size = 0;
io->cooldown_usecs = 0;
- io->write_size = 0;
+ io->write_size = WRITE_SIZE_MAX;
}
if (APLOGctrace1(c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
- "h2_conn_io(%ld): init, buffering=%d, warmup_size=%ld, "
- "cd_secs=%f", io->c->id, io->buffer_output,
- (long)io->warmup_size,
+ "h2_conn_io(%ld): init, warmup_size=%ld, cd_secs=%f",
+ io->c->id, (long)io->warmup_size,
((double)io->cooldown_usecs/APR_USEC_PER_SEC));
}
io->is_flushed = 0;
}
- if (io->buffer_output) {
- while (length > 0) {
- remain = assure_scratch_space(io);
- if (remain >= length) {
- memcpy(io->scratch + io->slen, data, length);
- io->slen += length;
- length = 0;
- }
- else {
- memcpy(io->scratch + io->slen, data, remain);
- io->slen += remain;
- data += remain;
- length -= remain;
- }
+ while (length > 0) {
+ remain = assure_scratch_space(io);
+ if (remain >= length) {
+ memcpy(io->scratch + io->slen, data, length);
+ io->slen += length;
+ length = 0;
+ }
+ else {
+ memcpy(io->scratch + io->slen, data, remain);
+ io->slen += remain;
+ data += remain;
+ length -= remain;
}
- }
- else {
- status = apr_brigade_write(io->output, NULL, NULL, data, length);
}
return status;
}
b = APR_BRIGADE_FIRST(bb);
if (APR_BUCKET_IS_METADATA(b)) {
- /* need to finish any open scratch bucket, as meta data
- * needs to be forward "in order". */
- append_scratch(io);
- APR_BUCKET_REMOVE(b);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- }
- else if (io->buffer_output) {
- apr_size_t remain = assure_scratch_space(io);
- if (b->length > remain) {
- apr_bucket_split(b, remain);
- if (io->slen == 0) {
- /* complete write_size bucket, append unchanged */
- APR_BUCKET_REMOVE(b);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- continue;
- }
+ if (APR_BUCKET_IS_FLUSH(b)) {
+ /* need to finish any open scratch bucket, as meta data
+ * needs to be forward "in order". */
+ append_scratch(io);
+ APR_BUCKET_REMOVE(b);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
}
else {
- /* bucket fits in remain, copy to scratch */
- status = read_to_scratch(io, b);
apr_bucket_delete(b);
- continue;
}
}
else {
- /* no buffering, forward buckets setaside on flush */
- if (APR_BUCKET_IS_TRANSIENT(b)) {
- apr_bucket_setaside(b, io->c->pool);
+ apr_size_t remain = assure_scratch_space(io);
+ if (b->length > remain) {
+ apr_bucket_split(b, remain);
}
- APR_BUCKET_REMOVE(b);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
+ /* bucket now fits in remain, copy to scratch */
+ status = read_to_scratch(io, b);
+ apr_bucket_delete(b);
+ continue;
}
}
return status;
apr_int64_t bytes_read;
apr_int64_t bytes_written;
- int buffer_output;
apr_size_t flush_threshold;
unsigned int is_flushed : 1;
struct h2_bucket_beam;
struct h2_headers;
-struct h2_stream;
struct h2_session;
typedef struct h2_filter_cin {
#include "h2_private.h"
#include "h2_bucket_beam.h"
-#include "h2_stream.h"
#include "h2_task.h"
#include "h2_config.h"
#include "h2_ctx.h"
}
check_push(r, "late_fixup");
}
+ /* enforce that we will close this slave connection after
+ * the task is done. This will keep request processing from
+ * trying to clean up dangling input data, for example. */
+ r->connection->keepalive = AP_CONN_CLOSE;
}
return DECLINED;
}
}
#define H2_MPLX_ENTER(m) \
- do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
- return rv;\
+ do { apr_status_t lrv; if ((lrv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
+ return lrv;\
} } while(0)
#define H2_MPLX_LEAVE(m) \
h2_ihash_add(m->spurge, stream);
}
-static void stream_cleanup(h2_mplx *m, h2_stream *stream)
+static void stream_discard(h2_mplx *m, h2_stream *stream)
{
ap_assert(stream->state == H2_SS_CLEANUP);
}
apr_pool_tag(m->pool, "h2_mplx");
apr_allocator_owner_set(allocator, m->pool);
- status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT,
+ status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED,
m->pool);
if (status != APR_SUCCESS) {
apr_pool_destroy(m->pool);
}
apr_allocator_mutex_set(allocator, mutex);
- status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
+ status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_NESTED,
m->pool);
if (status != APR_SUCCESS) {
apr_pool_destroy(m->pool);
h2_mplx *m = ctx;
h2_stream *stream = val;
+ /* Make dead certain we are called for a stream
+ to purge and that we have not already done so */
+ ap_assert(h2_ihash_get(m->spurge, stream->id) == stream);
+
h2_ihash_remove(m->spurge, stream->id);
ap_assert(stream->state == H2_SS_CLEANUP);
+ stream->state = H2_SS_DESTROYED;
if (stream->input) {
/* Process outstanding events before destruction */
&& !task->rst_error);
}
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+ APLOGNO(03385) "h2_task_destroy, reuse slave=%d", reuse_slave);
+ task->c = NULL;
+ h2_task_destroy(task);
+
if (reuse_slave) {
- h2_beam_log(task->output.beam, m->c, APLOG_DEBUG,
- APLOGNO(03385) "h2_task_destroy, reuse slave");
- h2_task_destroy(task);
APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
}
else {
- h2_beam_log(task->output.beam, m->c, APLOG_TRACE1,
- "h2_task_destroy, destroy slave");
h2_slave_destroy(slave);
}
}
return 0;
}
-static void purge_streams(h2_mplx *m, int lock)
+static void purge_streams(h2_mplx *m)
{
+ H2_MPLX_ENTER_ALWAYS(m);
if (!h2_ihash_empty(m->spurge)) {
- H2_MPLX_ENTER_MAYBE(m, lock);
while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
/* repeat until empty */
}
- H2_MPLX_LEAVE_MAYBE(m, lock);
}
+ H2_MPLX_LEAVE(m);
}
typedef struct {
return 1;
}
-static int stream_cancel_iter(void *ctx, void *val) {
+static int stream_cancel_and_discard_iter(void *ctx, void *val) {
h2_mplx *m = ctx;
h2_stream *stream = val;
h2_stream_rst(stream, H2_ERR_NO_ERROR);
/* All connection data has been sent, simulate cleanup */
h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
- stream_cleanup(m, stream);
+ stream_discard(m, stream);
return 0;
}
/* How to shut down a h2 connection:
* 1. cancel all streams still active */
- while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) {
+ while (!h2_ihash_iter(m->streams, stream_cancel_and_discard_iter, m)) {
/* until empty */
}
h2_ihash_iter(m->shold, unexpected_stream_iter, m);
}
+ purge_streams(m);
m->c->aborted = old_aborted;
H2_MPLX_LEAVE(m);
"h2_mplx(%ld): released", m->id);
}
-apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
+static h2_stream *mplx_stream_get(h2_mplx *m, int id)
{
- H2_MPLX_ENTER(m);
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- H2_STRM_MSG(stream, "cleanup"));
- stream_cleanup(m, stream);
-
- H2_MPLX_LEAVE(m);
- return APR_SUCCESS;
+ return h2_ihash_get(m->streams, id);
}
h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
h2_stream *s = NULL;
H2_MPLX_ENTER_ALWAYS(m);
-
s = h2_ihash_get(m->streams, id);
-
H2_MPLX_LEAVE(m);
return s;
}
+apr_status_t h2_mplx_stream_discard(h2_mplx *m, int stream_id)
+{
+ h2_stream *stream;
+
+ H2_MPLX_ENTER(m);
+ stream = mplx_stream_get(m, stream_id);
+ if (stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ H2_STRM_MSG(stream, "cleanup"));
+ stream_discard(m, stream);
+ }
+ H2_MPLX_LEAVE(m);
+ return APR_SUCCESS;
+}
+
static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
h2_stream *stream = ctx;
status = APR_SUCCESS;
}
else {
- purge_streams(m, 0);
+ purge_streams(m);
h2_ihash_iter(m->streams, report_consumption_iter, m);
m->added_output = iowait;
status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
}
}
-apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
- h2_stream_pri_cmp *cmp, void *ctx)
+void h2_mplx_stream_register(h2_mplx *m, h2_stream *stream)
{
- apr_status_t status;
+ H2_MPLX_ENTER_ALWAYS(m);
+ AP_DEBUG_ASSERT(h2_ihash_get(m->streams, stream->id) == NULL);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, H2_STRM_MSG(stream, "registered"));
+ h2_ihash_add(m->streams, stream);
+ H2_MPLX_LEAVE(m);
+}
+
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, h2_stream_pri_cmp *cmp, void *ctx)
+{
+ h2_stream *stream;
+ apr_status_t rv = APR_SUCCESS;
H2_MPLX_ENTER(m);
if (m->aborted) {
- status = APR_ECONNABORTED;
+ rv = APR_ECONNABORTED;
}
else {
- status = APR_SUCCESS;
- h2_ihash_add(m->streams, stream);
+ stream = mplx_stream_get(m, stream_id);
+ if (!stream) goto leave;
+ ap_assert(!stream->scheduled);
+ stream->scheduled = 1;
+
if (h2_stream_is_ready(stream)) {
/* already have a response */
check_data_for(m, stream, 0);
H2_STRM_MSG(stream, "process, added to q"));
}
}
-
+leave:
H2_MPLX_LEAVE(m);
- return status;
+ return rv;
}
static h2_task *next_stream_task(h2_mplx *m)
", out has %ld bytes buffered"),
h2_beam_is_closed(stream->output),
(long)h2_beam_get_buffered(stream->output));
- h2_ihash_add(m->streams, stream);
check_data_for(m, stream, 0);
stream->out_checked = 1;
status = APR_EAGAIN;
/* update input windows for streams */
h2_ihash_iter(m->streams, report_consumption_iter, m);
- purge_streams(m, 1);
+ purge_streams(m);
n = h2_ififo_count(m->readyq);
while (n > 0
* IO lifetime of streams.
******************************************************************************/
-struct h2_stream *h2_mplx_stream_get(h2_mplx *m, int id);
+/**
+ * Register a stream with the multiplexer. This transfers responisibility
+ * for lifetime and final destruction to mplx.
+
+ * @param mplx the multiplexer
+ * @param stream the h2 stream instance
+ */
+void h2_mplx_stream_register(h2_mplx *mplx, struct h2_stream *stream);
+
+/**
+ * Lookup a stream by its id. Will only return active streams, not discarded ones.
+ * @param mplx the multiplexer
+ * @param id the stream identifier
+ * @return the stream or NULL
+ */
+struct h2_stream *h2_mplx_stream_get(h2_mplx *mplx, int id);
/**
* Notifies mplx that a stream has been completely handled on the main
* @param m the mplx itself
* @param stream the stream ready for cleanup
*/
-apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
+apr_status_t h2_mplx_stream_discard(h2_mplx *m, int stream_id);
/**
* Waits on output data from any stream in this session to become available.
* Process a stream request.
*
* @param m the multiplexer
- * @param stream the identifier of the stream
+ * @param stream_id the identifier of the stream
* @param r the request to be processed
* @param cmp the stream priority compare function
* @param ctx context data for the compare function
*/
-apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
- h2_stream_pri_cmp *cmp, void *ctx);
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, h2_stream_pri_cmp *cmp, void *ctx);
/**
* Stream priorities have changed, reschedule pending requests.
{
h2_push_diary_entry *entries = (h2_push_diary_entry*)diary->entries->elts;
h2_push_diary_entry e;
- apr_size_t lastidx = (apr_size_t)diary->entries->nelts;
- /* move entry[idx] to the end */
- if (idx+1 < lastidx) {
- e = entries[idx];
- memmove(entries+idx, entries+idx+1, sizeof(e) * (lastidx - idx));
- entries[lastidx] = e;
+ if (diary->entries->nelts > 0) {
+ int lastidx = diary->entries->nelts - 1;
+
+ /* move entry[idx] to the end */
+ if (idx < lastidx) {
+ e = entries[idx];
+ memmove(entries+idx, entries+idx+1, sizeof(e) * (lastidx - idx));
+ entries[lastidx] = e;
+ return &entries[lastidx];
+ }
}
- return &entries[lastidx];
+ return &entries[idx];
}
static void h2_push_diary_append(h2_push_diary *diary, h2_push_diary_entry *e)
}
}
pushes = h2_push_collect(stream->pool, req, stream->push_policy, res);
- return h2_push_diary_update(stream->session, pushes);
+ return h2_push_diary_update(session, pushes);
}
static apr_int32_t h2_log2inv(unsigned char log2)
static h2_stream *get_stream(h2_session *session, int stream_id)
{
- return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+ h2_stream *stream;
+
+ if (stream_id <= 0) return NULL;
+ stream = h2_mplx_stream_get(session->mplx, stream_id);
+ if (!stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "session_stream_get(%d) == NULL", stream_id);
+ }
+ return stream;
}
static void dispatch_event(h2_session *session, h2_session_event_t ev,
h2_mplx_stream_do(session->mplx, rst_unprocessed_stream, session);
}
-static h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
- int initiated_on)
+static apr_pool_t *session_stream_pool_create(h2_session *session)
{
- h2_stream * stream;
- apr_pool_t *stream_pool;
-
- apr_pool_create(&stream_pool, session->pool);
- apr_pool_tag(stream_pool, "h2_stream");
+ apr_pool_t *pool;
- stream = h2_stream_create(stream_id, stream_pool, session,
- session->monitor, initiated_on);
- if (stream) {
- nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
- }
- return stream;
+ apr_pool_create(&pool, session->pool);
+ apr_pool_tag(pool, "h2_stream");
+ return pool;
+}
+
+static h2_stream *session_stream_pcreate(h2_session *session, int stream_id,
+ apr_pool_t *pool, int initiated_on)
+{
+ return h2_stream_create(stream_id, pool, session, session->monitor, initiated_on);
+}
+
+static h2_stream *session_stream_create(h2_session *session, int stream_id)
+{
+ return session_stream_pcreate(session, stream_id, session_stream_pool_create(session), 0);
}
/**
const nghttp2_frame *frame, void *userp)
{
h2_session *session = (h2_session *)userp;
- h2_stream *s;
+ h2_stream *stream;
/* We may see HEADERs at the start of a stream or after all DATA
* streams to carry trailers. */
(void)ngh2;
- s = get_stream(session, frame->hd.stream_id);
- if (s) {
- /* nop */
- }
- else {
- s = h2_session_open_stream(userp, frame->hd.stream_id, 0);
+ stream = get_stream(session, frame->hd.stream_id);
+ if (!stream) {
+ stream = session_stream_create(session, frame->hd.stream_id);
+ if (!stream) return NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
+ h2_mplx_stream_register(session->mplx, stream);
}
- return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
+ return 0;
}
static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
break;
case NGHTTP2_PRIORITY:
session->reprioritize = 1;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): PRIORITY frame "
- " weight=%d, dependsOn=%d, exclusive=%d",
- session->id, (int)frame->hd.stream_id,
- frame->priority.pri_spec.weight,
- frame->priority.pri_spec.stream_id,
- frame->priority.pri_spec.exclusive);
+ if (APLOGctrace2(session->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): PRIORITY frame "
+ " weight=%d, dependsOn=%d, exclusive=%d",
+ session->id, (int)frame->hd.stream_id,
+ frame->priority.pri_spec.weight,
+ frame->priority.pri_spec.stream_id,
+ frame->priority.pri_spec.exclusive);
+ }
break;
case NGHTTP2_WINDOW_UPDATE:
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
}
break;
case NGHTTP2_RST_STREAM:
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03067)
- "h2_stream(%ld-%d): RST_STREAM by client, errror=%d",
- session->id, (int)frame->hd.stream_id,
- (int)frame->rst_stream.error_code);
- stream = get_stream(session, frame->hd.stream_id);
- if (stream && stream->initiated_on) {
- ++session->pushes_reset;
+ if (APLOGcdebug(session->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03067)
+ "h2_stream(%ld-%d): RST_STREAM by client, errror=%d",
+ session->id, (int)frame->hd.stream_id,
+ (int)frame->rst_stream.error_code);
}
- else {
- ++session->streams_reset;
+ stream = get_stream(session, frame->hd.stream_id);
+ if (stream) {
+ stream->initiated_on? ++session->pushes_reset : ++session->streams_reset;
}
break;
case NGHTTP2_GOAWAY:
}
}
- if (APR_SUCCESS != rv) return NGHTTP2_ERR_PROTO;
- return 0;
-}
-
-static int h2_session_continue_data(h2_session *session) {
- if (h2_mplx_has_master_events(session->mplx)) {
- return 0;
- }
- if (h2_conn_io_needs_flush(&session->io)) {
- return 0;
- }
- return 1;
+ return (APR_SUCCESS != rv)? NGHTTP2_ERR_PROTO : 0;
}
static char immortal_zeros[H2_MAX_PADLEN];
(void)ngh2;
(void)source;
- if (!h2_session_continue_data(session)) {
+ /* Be nimble, react to events from your tasks and do not buffer more than we need */
+ if (h2_mplx_has_master_events(session->mplx) ||h2_conn_io_needs_flush(&session->io)) {
return NGHTTP2_ERR_WOULDBLOCK;
}
}
apr_pool_tag(pool, "h2_session");
apr_allocator_owner_set(allocator, pool);
- status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, pool);
+ status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool);
if (status != APR_SUCCESS) {
- apr_pool_destroy(pool);
- return APR_ENOMEM;
+ goto fail;
}
apr_allocator_mutex_set(allocator, mutex);
status = apr_thread_cond_create(&session->iowait, session->pool);
if (status != APR_SUCCESS) {
- apr_pool_destroy(pool);
- return status;
+ goto fail;
}
session->in_pending = h2_iq_create(session->pool, (int)session->max_stream_count);
if (session->in_pending == NULL) {
- apr_pool_destroy(pool);
- return APR_ENOMEM;
+ status = APR_ENOMEM;
+ goto fail;
}
session->in_process = h2_iq_create(session->pool, (int)session->max_stream_count);
if (session->in_process == NULL) {
- apr_pool_destroy(pool);
- return APR_ENOMEM;
+ status = APR_ENOMEM;
+ goto fail;
}
session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor));
if (session->monitor == NULL) {
- apr_pool_destroy(pool);
- return APR_ENOMEM;
+ status = APR_ENOMEM;
+ goto fail;
}
session->monitor->ctx = session;
session->monitor->on_state_enter = on_stream_state_enter;
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, status, c, APLOGNO(02927)
"nghttp2: error in init_callbacks");
- apr_pool_destroy(pool);
- return status;
+ status = APR_ENOMEM;
+ goto fail;
}
rv = nghttp2_option_new(&options);
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
APLOGNO(02928) "nghttp2_option_new: %s",
nghttp2_strerror(rv));
- apr_pool_destroy(pool);
- return status;
+ status = APR_ENOMEM;
+ goto fail;
}
nghttp2_option_set_peer_max_concurrent_streams(options, (uint32_t)session->max_stream_count);
/* We need to handle window updates ourself, otherwise we
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
APLOGNO(02929) "nghttp2_session_server_new: %s",
nghttp2_strerror(rv));
- apr_pool_destroy(pool);
- return APR_ENOMEM;
+ status = APR_ENOMEM;
+ goto fail;
}
n = h2_config_sgeti(s, H2_CONF_PUSH_DIARY_SIZE);
apr_pool_pre_cleanup_register(pool, c, session_pool_cleanup);
return APR_SUCCESS;
+fail:
+ apr_pool_destroy(pool);
+ return status;
}
static apr_status_t h2_session_start(h2_session *session, int *rv)
}
/* Now we need to auto-open stream 1 for the request we got. */
- stream = h2_session_open_stream(session, 1, 0);
+ stream = session_stream_create(session, 1);
if (!stream) {
status = APR_EGENERAL;
ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
nghttp2_strerror(*rv));
return status;
}
-
status = h2_stream_set_request_rec(stream, session->r, 1);
if (status != APR_SUCCESS) {
return status;
}
+ h2_mplx_stream_register(session->mplx, stream);
}
slen = 0;
return (ssize_t)nread;
}
-struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
- h2_push *push)
-{
- h2_stream *stream;
- h2_ngheader *ngh;
- apr_status_t status;
- int nid = 0;
-
- status = h2_req_create_ngheader(&ngh, is->pool, push->req);
- if (status == APR_SUCCESS) {
- nid = nghttp2_submit_push_promise(session->ngh2, 0, is->id,
- ngh->nv, ngh->nvlen, NULL);
- }
- if (status != APR_SUCCESS || nid <= 0) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
- H2_STRM_LOG(APLOGNO(03075), is,
- "submitting push promise fail: %s"), nghttp2_strerror(nid));
- return NULL;
- }
- ++session->pushes_promised;
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- H2_STRM_LOG(APLOGNO(03076), is, "SERVER_PUSH %d for %s %s on %d"),
- nid, push->req->method, push->req->path, is->id);
-
- stream = h2_session_open_stream(session, nid, is->id);
- if (!stream) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- H2_STRM_LOG(APLOGNO(03077), stream,
- "failed to create stream obj %d"), nid);
- /* kill the push_promise */
- nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid,
- NGHTTP2_INTERNAL_ERROR);
- return NULL;
- }
-
- h2_session_set_prio(session, stream, push->priority);
- h2_stream_set_request(stream, push->req);
- ++session->unsent_promises;
- return stream;
-}
-
static int valid_weight(float f)
{
int w = (int)f;
(w > NGHTTP2_MAX_WEIGHT)? NGHTTP2_MAX_WEIGHT : w);
}
-apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
- const h2_priority *prio)
+static apr_status_t session_stream_priority_set(h2_session *session, h2_stream *stream,
+ const h2_priority *prio)
{
apr_status_t status = APR_SUCCESS;
#ifdef H2_NG2_CHANGE_PRIO
return status;
}
+apr_status_t h2_session_push(h2_session *session, int initiating_stream_id, h2_push *push)
+{
+ h2_stream *stream;
+ apr_pool_t *pool;
+ h2_ngheader *ngh;
+ int nid = 0;
+
+ pool = session_stream_pool_create(session);
+ if (APR_SUCCESS != h2_req_create_ngheader(&ngh, pool, push->req)) goto fail;
+
+ nid = nghttp2_submit_push_promise(session->ngh2, 0, initiating_stream_id,
+ ngh->nv, ngh->nvlen, NULL);
+ if (nid <= 0) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ APLOGNO(03075) "submitting push promise fail: %s", nghttp2_strerror(nid));
+ goto fail;
+ }
+
+ ++session->pushes_promised;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ APLOGNO(03076) "SERVER_PUSH %d for %s %s on %d",
+ nid, push->req->method, push->req->path, initiating_stream_id);
+
+ stream = session_stream_pcreate(session, nid, pool, initiating_stream_id);
+ if (!stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ H2_STRM_LOG(APLOGNO(03077), stream,
+ "failed to create stream obj %d"), nid);
+ goto fail;
+ }
+
+ session_stream_priority_set(session, stream, push->priority);
+ h2_stream_request_set(stream, push->req);
+ ++session->unsent_promises;
+ h2_mplx_stream_register(session->mplx, stream);
+ return APR_SUCCESS;
+
+fail:
+ if (nid > 0) {
+ nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid, NGHTTP2_INTERNAL_ERROR);
+ }
+ if (pool) apr_pool_destroy(pool);
+ return APR_EINVAL;
+}
+
int h2_session_push_enabled(h2_session *session)
{
/* iff we can and they can and want */
if (!stream->pref_priority) {
stream->pref_priority = h2_stream_get_priority(stream, headers);
}
- h2_session_set_prio(session, stream, stream->pref_priority);
+ session_stream_priority_set(session, stream, stream->pref_priority);
note = apr_table_get(headers->notes, H2_FILTER_DEBUG_NOTE);
if (note && !strcmp("on", note)) {
while ((id = h2_iq_shift(session->in_process)) > 0) {
h2_stream *stream = get_stream(session, id);
if (stream) {
- ap_assert(!stream->scheduled);
if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
- h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
+ h2_mplx_process(session->mplx, id, stream_pri_cmp, session);
}
else {
h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
h2_iq_append(session->in_process, stream->id);
}
+void h2_session_eos_sent(h2_session *session, int stream_id)
+{
+ /* stream may no longer be known by nghttp2, but still kept in mplx */
+ h2_stream *stream = h2_mplx_stream_get(session->mplx, stream_id);
+ if (stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ H2_STRM_MSG(stream, "eos sent"));
+ h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "eos sent for unknown stream %d", stream_id);
+ }
+}
+
static void ev_stream_closed(h2_session *session, h2_stream *stream)
{
apr_bucket *b;
break;
}
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ H2_STRM_MSG(stream, "sending eos"));
/* The stream might have data in the buffers of the main connection.
* We can only free the allocated resources once all had been written.
* Send a special buckets on the connection that gets destroyed when
* all preceding data has been handled. On its destruction, it is safe
* to purge all resources of the stream. */
- b = h2_bucket_eos_create(session->c->bucket_alloc, stream);
+ b = h2_bucket_eos_create(session->c->bucket_alloc, session->c, stream->id);
APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
h2_conn_io_pass(&session->io, session->bbtmp);
apr_brigade_cleanup(session->bbtmp);
ev_stream_closed(session, stream);
break;
case H2_SS_CLEANUP:
- h2_mplx_stream_cleanup(session->mplx, stream);
+ h2_mplx_stream_discard(session->mplx, stream->id);
break;
default:
break;
struct h2_push;
struct h2_push_diary;
struct h2_session;
-struct h2_stream;
struct h2_stream_monitor;
struct h2_task;
struct h2_workers;
* processing..
*
* @param session the session to work in
- * @param is the stream initiating the push
+ * @param initiating_stream_id id of the stream initiating this push
* @param push the push to promise
- * @return the new promised stream or NULL
*/
-struct h2_stream *h2_session_push(h2_session *session,
- struct h2_stream *is, struct h2_push *push);
+apr_status_t h2_session_push(h2_session *session,
+ int initiating_stream_id, struct h2_push *push);
-apr_status_t h2_session_set_prio(h2_session *session,
- struct h2_stream *stream,
- const struct h2_priority *prio);
+/**
+ * Notifies the session that the EOS for a stream has been sent.
+ * See h2_bucket_eos for usage.
+ */
+void h2_session_eos_sent(h2_session *session, int stream_id);
#define H2_SSSN_MSG(s, msg) \
"h2_session(%ld,%s,%d): "msg, s->id, h2_session_state_str(s->state), \
/* state transisitions when certain frame types are sent */
static int trans_on_send[][H2_SS_MAX] = {
-/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
-{ S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* DATA */
-{ S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* HEADERS */
-{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */
-{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */
-{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */
-{ S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */
-{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */
-{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */
-{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */
-{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */
+/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, S_DSTR */
+{ S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* DATA */
+{ S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* HEADERS */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */
+{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, S_NOP, },/* RST_STREAM */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* SETTINGS */
+{ S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* PUSH_PROMISE */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* PING */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* GOAWAY */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */
};
/* state transisitions when certain frame types are received */
static int trans_on_recv[][H2_SS_MAX] = {
-/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
-{ S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* DATA */
-{ S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* HEADERS */
-{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */
-{ S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */
-{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */
-{ S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */
-{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */
-{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */
-{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */
-{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */
+/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, S_DSTR */
+{ S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, S_NOP, },/* DATA */
+{ S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, S_NOP, },/* HEADERS */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */
+{ S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, S_NOP, },/* RST_STREAM */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* SETTINGS */
+{ S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* PUSH_PROMISE */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* PING */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* GOAWAY */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */
};
/* state transisitions when certain events happen */
static int trans_on_event[][H2_SS_MAX] = {
-/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
-{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },/* EV_CLOSED_L*/
-{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },/* EV_CLOSED_R*/
-{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* EV_CANCELLED*/
-{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },/* EV_EOS_SENT*/
+/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, S_DSTR */
+{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, S_NOP, },/* EV_CLOSED_L*/
+{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, S_NOP, },/* EV_CLOSED_R*/
+{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, S_NOP, },/* EV_CANCELLED*/
+{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, S_NOP, },/* EV_EOS_SENT*/
};
static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
|| APR_BRIGADE_EMPTY(stream->in_buffer)));
if (!empty) {
h2_beam_create(&stream->input, stream->pool, stream->id,
- "input", H2_BEAM_OWNER_SEND, 0,
- stream->session->s->timeout);
- h2_beam_send_from(stream->input, stream->pool);
+ "input", 0, stream->session->s->timeout);
}
}
return APR_SUCCESS;
H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
r->method, r->scheme, r->authority, r->path, r->chunked);
setup_input(stream);
- stream->scheduled = 1;
return APR_SUCCESS;
}
return APR_EINVAL;
return status;
}
-void h2_stream_set_request(h2_stream *stream, const h2_request *r)
+void h2_stream_request_set(h2_stream *stream, const h2_request *r)
{
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp == NULL);
apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
{
- apr_status_t status = APR_SUCCESS;
+ apr_status_t rv = APR_SUCCESS;
apr_array_header_t *pushes;
int i;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STRM_MSG(stream, "found %d push candidates"),
pushes->nelts);
- for (i = 0; i < pushes->nelts; ++i) {
+ for (i = 0; i < pushes->nelts && (APR_SUCCESS == rv); ++i) {
h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
- h2_stream *s = h2_session_push(stream->session, stream, push);
- if (!s) {
- status = APR_ECONNRESET;
- break;
- }
+ rv = h2_session_push(stream->session, stream->id, push);
}
}
- return status;
+ return rv;
}
apr_table_t *h2_stream_get_trailers(h2_stream *stream)
apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount);
/**
- * Set complete stream headers from given h2_request.
+ * Set complete stream headers from given h2_request, creates a deep copy.
+ * Only to be called once to initialize.
*
* @param stream stream to write request to
* @param r the request with all the meta data
* @param eos != 0 iff stream input is closed
*/
-void h2_stream_set_request(h2_stream *stream, const h2_request *r);
+void h2_stream_request_set(h2_stream *stream, const h2_request *r);
/**
* Set complete stream header from given request_rec.
#include "h2_request.h"
#include "h2_headers.h"
#include "h2_session.h"
-#include "h2_stream.h"
#include "h2_task.h"
#include "h2_util.h"
return OK;
}
+static apr_status_t task_pool_cleanup(void *data)
+{
+ h2_task *task = data;
+
+ ap_assert(task->destroyed);
+ return APR_SUCCESS;
+}
+
h2_task *h2_task_create(conn_rec *slave, int stream_id,
const h2_request *req, h2_mplx *m,
h2_bucket_beam *input,
task->input.beam = input;
task->output.max_buffer = output_max_mem;
+ apr_pool_cleanup_register(pool, task, task_pool_cleanup, apr_pool_cleanup_null);
+
return task;
}
void h2_task_destroy(h2_task *task)
{
+ task->destroyed = 1;
if (task->output.beam) {
- h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "task_destroy");
h2_beam_destroy(task->output.beam);
task->output.beam = NULL;
}
}
h2_beam_create(&task->output.beam, c->pool, task->stream_id, "output",
- H2_BEAM_OWNER_SEND, 0, task->timeout);
+ 0, task->timeout);
if (!task->output.beam) {
return APR_ENOMEM;
}
h2_beam_buffer_size_set(task->output.beam, task->output.max_buffer);
- h2_beam_send_from(task->output.beam, task->pool);
h2_ctx_create_for(c, task);
apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id);
return DECLINED;
}
+
apr_time_t started_at; /* when processing started */
apr_time_t done_at; /* when processing was done */
apr_bucket *eor;
+ int destroyed;
};
h2_task *h2_task_create(conn_rec *slave, int stream_id,
- const h2_request *req, struct h2_mplx *m,
+ const struct h2_request *req, struct h2_mplx *m,
struct h2_bucket_beam *input,
apr_interval_time_t timeout,
apr_size_t output_max_mem);
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.14.1-git"
+#define MOD_HTTP2_VERSION "1.15.0-git"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010e01
+#define MOD_HTTP2_VERSION_NUM 0x010f00
#endif /* mod_h2_h2_version_h */
if (!slot->lock) {
status = apr_thread_mutex_create(&slot->lock,
- APR_THREAD_MUTEX_DEFAULT,
+ APR_THREAD_MUTEX_NESTED,
workers->pool);
if (status != APR_SUCCESS) {
push_slot(&workers->free, slot);
}
status = apr_thread_mutex_create(&workers->lock,
- APR_THREAD_MUTEX_DEFAULT,
+ APR_THREAD_MUTEX_NESTED,
workers->pool);
if (status == APR_SUCCESS) {
n = workers->nslots = workers->max_workers;
*/
apr_table_setn(ctx->p_conn->connection->notes,
"proxy-request-alpn-protos", "h2");
+ if (ctx->p_conn->ssl_hostname) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
+ "set SNI to %s for (%s)",
+ ctx->p_conn->ssl_hostname,
+ ctx->p_conn->hostname);
+ apr_table_setn(ctx->p_conn->connection->notes,
+ "proxy-request-hostname", ctx->p_conn->ssl_hostname);
+ }
}
if (ctx->master->aborted) goto cleanup;