return io;
}
+static void check_bbin(h2_io *io)
+{
+ if (!io->bbin) {
+ io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
+ io->tmp = apr_brigade_create(io->pool, io->bucket_alloc);
+ }
+}
+
void h2_io_redo(h2_io *io)
{
io->worker_started = 0;
}
}
-
void h2_io_rst(h2_io *io, int error)
{
io->rst_error = error;
io->eos_in = 1;
}
-int h2_io_in_has_eos_for(h2_io *io)
-{
- return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1));
-}
-
-int h2_io_in_has_data(h2_io *io)
-{
- return io->bbin && h2_util_bb_has_data_or_eos(io->bbin);
-}
-
int h2_io_out_has_data(h2_io *io)
{
return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
return status;
}
-apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb)
+apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos)
{
if (io->rst_error) {
return APR_ECONNABORTED;
if (io->eos_in) {
return APR_EOF;
}
- io->eos_in = h2_util_has_eos(bb, -1);
- if (!APR_BRIGADE_EMPTY(bb)) {
- if (!io->bbin) {
- io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
- io->tmp = apr_brigade_create(io->pool, io->bucket_alloc);
- }
- return h2_util_move(io->bbin, bb, -1, NULL, "h2_io_in_write");
+ if (eos) {
+ io->eos_in = 1;
+ }
+ if (len > 0) {
+ check_bbin(io);
+ return apr_brigade_write(io->bbin, NULL, NULL, d, len);
}
return APR_SUCCESS;
}
int h2_io_is_repeatable(h2_io *io);
void h2_io_redo(h2_io *io);
-/**
- * The input data is completely queued. Blocked reads will return immediately
- * and give either data or EOF.
- */
-int h2_io_in_has_eos_for(h2_io *io);
/**
* Output data is available.
*/
int h2_io_out_has_data(h2_io *io);
-/**
- * Input data is available.
- */
-int h2_io_in_has_data(h2_io *io);
void h2_io_signal(h2_io *io, h2_io_op op);
void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout,
/**
* Appends given bucket to the input.
*/
-apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb);
+apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos);
/**
* Closes the input. After existing data has been read, APR_EOF will
}
apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
- apr_bucket_brigade *bb)
+ const char *data, apr_size_t len, int eos)
{
apr_status_t status;
int acquired;
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
- status = h2_io_in_write(io, bb);
+ status = h2_io_in_write(io, data, len, eos);
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
h2_io_signal(io, H2_IO_READ);
io_process_events(m, io);
return status;
}
-int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
-{
- int has_eos = 0;
- int acquired;
-
- apr_status_t status;
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- has_eos = h2_io_in_has_eos_for(io);
- }
- else {
- has_eos = 1;
- }
- leave_mutex(m, acquired);
- }
- return has_eos;
-}
-
-int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id)
-{
- apr_status_t status;
- int has_data = 0;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- has_data = h2_io_in_has_data(io);
- }
- else {
- has_data = 0;
- }
- leave_mutex(m, acquired);
- }
- return has_data;
-}
-
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
{
apr_status_t status;
*/
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
-/* Return != 0 iff the multiplexer has input data for the given stream.
- */
-int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id);
-
/**
* Waits on output data from any stream in this session to become available.
* Returns APR_TIMEUP if no data arrived in the given time.
* Appends data to the input of the given stream. Storage of input data is
* not subject to flow control.
*/
-apr_status_t h2_mplx_in_write(h2_mplx *mplx, int stream_id,
- apr_bucket_brigade *bb);
+apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
+ const char *data, apr_size_t len, int eos);
/**
* Closes the input for the given stream_id.
*/
apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
-/**
- * Returns != 0 iff the input for the given stream has been closed. There
- * could still be data queued, but it can be read without blocking.
- */
-int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id);
-
/**
* Invoke the consumed callback for all streams that had bytes read since the
* last call to this function. If no stream had input data consumed, the
}
return 0;
}
-
- status = h2_stream_write_data(stream, (const char *)data, len);
+
+ /* FIXME: enabling setting EOS this way seems to break input handling
+ * in mod_proxy_http2. why? */
+ status = h2_stream_write_data(stream, (const char *)data, len,
+ 0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
session->id, stream_id, (long)len);
#include "h2_util.h"
-#define H2_STREAM_IN(lvl,s,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
- h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \
- } while(0)
-
-
static int state_transition[][7] = {
/* ID OP RL RR CI CO CL */
/*ID*/{ 1, 0, 0, 0, 0, 0, 0 },
static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response);
-h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
+h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
{
h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
stream->id = id;
stream->state = H2_STREAM_ST_IDLE;
stream->pool = pool;
stream->session = session;
- return stream;
-}
-
-h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
-{
- h2_stream *stream = h2_stream_create(id, pool, session);
set_state(stream, H2_STREAM_ST_OPEN);
stream->request = h2_request_create(id, pool,
h2_config_geti(session->config, H2_CONF_SER_HEADERS));
if (status == APR_SUCCESS) {
if (!eos) {
stream->request->body = 1;
- stream->bbin = apr_brigade_create(stream->pool,
- stream->session->c->bucket_alloc);
}
stream->input_remaining = stream->request->content_length;
return stream->scheduled;
}
-static apr_status_t h2_stream_input_flush(h2_stream *stream)
-{
- apr_status_t status = APR_SUCCESS;
- if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) {
-
- status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin);
- if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->mplx->c,
- "h2_stream(%ld-%d): flushing input data",
- stream->session->id, stream->id);
- }
- }
- return status;
-}
-
-static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx)
-{
- (void)bb;
- return h2_stream_input_flush(ctx);
-}
-
-static apr_status_t input_add_data(h2_stream *stream,
- const char *data, size_t len)
-{
- return apr_brigade_write(stream->bbin, input_flush, stream, data, len);
-}
-
apr_status_t h2_stream_close_input(h2_stream *stream)
{
apr_status_t status = APR_SUCCESS;
return APR_ECONNRESET;
}
- H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre");
- if (close_input(stream) && stream->bbin) {
- status = h2_stream_input_flush(stream);
- if (status == APR_SUCCESS) {
- status = h2_mplx_in_close(stream->session->mplx, stream->id);
- }
+ if (close_input(stream)) {
+ status = h2_mplx_in_close(stream->session->mplx, stream->id);
}
- H2_STREAM_IN(APLOG_TRACE2, stream, "close_post");
return status;
}
apr_status_t h2_stream_write_data(h2_stream *stream,
- const char *data, size_t len)
+ const char *data, size_t len, int eos)
{
apr_status_t status = APR_SUCCESS;
AP_DEBUG_ASSERT(stream);
- if (input_closed(stream) || !stream->request->eoh || !stream->bbin) {
+ if (input_closed(stream) || !stream->request->eoh) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d",
+ "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d",
stream->session->id, stream->id, input_closed(stream),
- stream->request->eoh, !!stream->bbin);
+ stream->request->eoh);
return APR_EINVAL;
}
"h2_stream(%ld-%d): add %ld input bytes",
stream->session->id, stream->id, (long)len);
- H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre");
if (!stream->request->chunked) {
stream->input_remaining -= len;
if (stream->input_remaining < 0) {
}
}
- status = input_add_data(stream, data, len);
- if (status == APR_SUCCESS) {
- status = h2_stream_input_flush(stream);
+ status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos);
+ if (eos) {
+ close_input(stream);
}
- H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post");
return status;
}
unsigned int submitted : 1; /* response HEADER has been sent */
apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */
- apr_bucket_brigade *bbin; /* input DATA */
struct h2_sos *sos; /* stream output source, e.g. to read output from */
apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
#define H2_STREAM_RST(s, def) (s->rst_error? s->rst_error : (def))
-/**
- * Create a stream in IDLE state.
- * @param id the stream identifier
- * @param pool the memory pool to use for this stream
- * @param session the session this stream belongs to
- * @return the newly created IDLE stream
- */
-h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session);
-
/**
* Create a stream in OPEN state.
* @param id the stream identifier
* @param len the number of bytes to write
*/
apr_status_t h2_stream_write_data(h2_stream *stream,
- const char *data, size_t len);
+ const char *data, size_t len, int eos);
/**
* Reset the stream. Stream write/reads will return errors afterwards.