return status;
}
+apr_status_t h2_conn_io_append(h2_conn_io *io, apr_bucket *b)
+{
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
+{
+ return h2_util_move(io->output, bb, 0, NULL, "h2_conn_io_pass");
+}
apr_status_t h2_conn_io_flush(h2_conn_io *io)
{
apr_status_t h2_conn_io_write(h2_conn_io *io,
const char *buf,
size_t length);
+
+apr_status_t h2_conn_io_append(h2_conn_io *io, apr_bucket *b);
+apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb);
apr_status_t h2_conn_io_flush(h2_conn_io *io);
apr_brigade_length(bb, 1, &start_len);
last = APR_BRIGADE_LAST(bb);
- status = h2_util_move(bb, io->bbin, maxlen, 0,
- "h2_io_in_read");
+ status = h2_util_move(bb, io->bbin, maxlen, NULL, "h2_io_in_read");
if (status == APR_SUCCESS) {
apr_bucket *nlast = APR_BRIGADE_LAST(bb);
apr_off_t end_len = 0;
io->bbin = apr_brigade_create(io->bbout->p,
io->bbout->bucket_alloc);
}
- return h2_util_move(io->bbin, bb, 0, 0, "h2_io_in_write");
+ return h2_util_move(io->bbin, bb, 0, NULL, "h2_io_in_write");
}
return APR_SUCCESS;
}
return status;
}
+apr_status_t h2_io_out_read_to(h2_io *io, apr_bucket_brigade *bb,
+ apr_size_t *plen, int *peos)
+{
+ if (io->rst_error) {
+ return APR_ECONNABORTED;
+ }
+
+ if (io->eos_out) {
+ *plen = 0;
+ *peos = 1;
+ return APR_SUCCESS;
+ }
+
+
+ io->eos_out = *peos = h2_util_has_eos(io->bbout, *plen);
+ return h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to");
+}
+
apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
apr_size_t maxlen, int *pfile_handles_allowed)
{
h2_io_data_cb *cb, void *ctx,
apr_size_t *plen, int *peos);
+apr_status_t h2_io_out_read_to(h2_io *io,
+ apr_bucket_brigade *bb,
+ apr_size_t *plen, int *peos);
+
apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
apr_size_t maxlen, int *pfile_buckets_allowed);
return status;
}
+apr_status_t h2_mplx_out_read_to(h2_mplx *m, int stream_id,
+ apr_bucket_brigade *bb,
+ apr_size_t *plen, int *peos)
+{
+ apr_status_t status;
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return APR_ECONNABORTED;
+ }
+ status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io) {
+ H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre");
+
+ status = h2_io_out_read_to(io, bb, plen, peos);
+
+ H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_post");
+ if (status == APR_SUCCESS && io->output_drained) {
+ apr_thread_cond_signal(io->output_drained);
+ }
+ }
+ else {
+ status = APR_ECONNABORTED;
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return status;
+}
+
h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
{
apr_status_t status;
h2_io_data_cb *cb, void *ctx,
apr_size_t *plen, int *peos);
+/**
+ * Reads output data into the given brigade. Will never block, but
+ * return APR_EAGAIN until data arrives or the stream is closed.
+ */
+apr_status_t h2_mplx_out_read_to(h2_mplx *mplx, int stream_id,
+ apr_bucket_brigade *bb,
+ apr_size_t *plen, int *peos);
+
/**
* Opens the output for the given stream with the specified response.
*/
return h2_stream_schedule(stream, eos, stream_pri_cmp, session);
}
-static apr_status_t send_data(h2_session *session, const char *data,
- apr_size_t length);
-
/*
* Callback when nghttp2 wants to send bytes back to the client.
*/
int flags, void *userp)
{
h2_session *session = (h2_session *)userp;
- apr_status_t status = send_data(session, (const char *)data, length);
+ apr_status_t status;
(void)ngh2;
(void)flags;
+ status = h2_conn_io_write(&session->io, (const char *)data, length);
if (status == APR_SUCCESS) {
return length;
}
return 0;
}
-static apr_status_t send_data(h2_session *session, const char *data,
- apr_size_t length)
-{
- return h2_conn_io_write(&session->io, data, length);
-}
-
static apr_status_t pass_data(void *ctx,
const char *data, apr_size_t length)
{
- return send_data((h2_session*)ctx, data, length);
+ return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
}
+static char immortal_zeros[256];
+
static int on_send_data_cb(nghttp2_session *ngh2,
nghttp2_frame *frame,
const uint8_t *framehd,
"h2_stream(%ld-%d): send_data_cb for %ld bytes",
session->id, (int)stream_id, (long)length);
- status = send_data(session, (const char *)framehd, 9);
- if (status == APR_SUCCESS) {
+ if (h2_conn_io_is_buffered(&session->io)) {
+ status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
+ if (status == APR_SUCCESS) {
+ if (padlen) {
+ status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
+ }
+
+ if (status == APR_SUCCESS) {
+ apr_size_t len = length;
+ status = h2_stream_readx(stream, pass_data, session,
+ &len, &eos);
+ if (status == APR_SUCCESS && len != length) {
+ status = APR_EINVAL;
+ }
+ }
+
+ if (status == APR_SUCCESS && padlen) {
+ if (padlen) {
+ char pad[256];
+ memset(pad, 0, padlen);
+ status = h2_conn_io_write(&session->io, pad, padlen);
+ }
+ }
+ }
+ }
+ else {
+ apr_bucket *b;
+ char *header = apr_pcalloc(stream->pool, 10);
+ memcpy(header, (const char *)framehd, 9);
if (padlen) {
- status = send_data(session, (const char *)&padlen, 1);
+ header[9] = (char)padlen;
}
-
+ b = apr_bucket_pool_create(header, padlen? 10 : 9,
+ stream->pool, session->c->bucket_alloc);
+ status = h2_conn_io_append(&session->io, b);
+
if (status == APR_SUCCESS) {
apr_size_t len = length;
- status = h2_stream_readx(stream, pass_data, session,
- &len, &eos);
+ status = h2_stream_read_to(stream, session->io.output, &len, &eos);
+ session->io.unflushed = 1;
if (status == APR_SUCCESS && len != length) {
status = APR_EINVAL;
}
}
-
+
if (status == APR_SUCCESS && padlen) {
- if (padlen) {
- char pad[256];
- memset(pad, 0, padlen);
- status = send_data(session, pad, padlen);
- }
+ b = apr_bucket_immortal_create(immortal_zeros, padlen,
+ session->c->bucket_alloc);
+ status = h2_conn_io_append(&session->io, b);
}
}
+
if (status == APR_SUCCESS) {
return 0;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c->session->c,
"h2_stream(%ld-%d): copy %ld bytes for DATA #%ld",
c->session->id, c->stream->id,
- (long)c->stream->data_frames_sent, (long)len);
+ (long)len, (long)c->stream->data_frames_sent);
memcpy(c->buf + c->offset, data, len);
c->offset += len;
return APR_SUCCESS;
AP_DEBUG_ASSERT(!h2_stream_is_suspended(stream));
- if (h2_conn_io_is_buffered(&session->io)) {
+ if (1 || h2_conn_io_is_buffered(&session->io)) {
status = h2_stream_prep_read(stream, &nread, &eos);
if (nread) {
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
stream->response = response;
if (bb && !APR_BRIGADE_EMPTY(bb)) {
+ int move_all = INT_MAX;
if (!stream->bbout) {
stream->bbout = apr_brigade_create(stream->pool,
stream->m->c->bucket_alloc);
}
- /* TODO: this does not move complete file buckets.*/
- status = h2_util_move(stream->bbout, bb, 16 * 1024, NULL,
+ /* we can move file handles from h2_mplx into this h2_stream as many
+ * as we want, since the lifetimes are the same and we are not freeing
+ * the ones in h2_mplx->io before this stream is done. */
+ status = h2_util_move(stream->bbout, bb, 16 * 1024, &move_all,
"h2_stream_set_response");
}
if (APLOGctrace1(stream->m->c)) {
return status;
}
+apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
+ apr_size_t *plen, int *peos)
+{
+ apr_status_t status = APR_SUCCESS;
+
+ if (stream->rst_error) {
+ return APR_ECONNRESET;
+ }
+ if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) {
+ status = h2_transfer_brigade(bb, stream->bbout, bb->p, plen, peos);
+ }
+ else {
+ status = h2_mplx_out_read_to(stream->m, stream->id, bb, plen, peos);
+ }
+ if (status == APR_SUCCESS && !*peos && !*plen) {
+ status = APR_EAGAIN;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->m->c,
+ "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
+ stream->m->id, stream->id, (long)*plen, *peos);
+ return status;
+}
void h2_stream_set_suspended(h2_stream *stream, int suspended)
{
apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb,
void *ctx, apr_size_t *plen, int *peos);
+apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
+ apr_size_t *plen, int *peos);
+
+
void h2_stream_set_suspended(h2_stream *stream, int suspended);
int h2_stream_is_suspended(h2_stream *stream);
if (!APR_BRIGADE_EMPTY(input->bb)) {
if (mode == AP_MODE_EXHAUSTIVE) {
/* return all we have */
- return h2_util_move(bb, input->bb, readbytes, 0,
+ return h2_util_move(bb, input->bb, readbytes, NULL,
"task_input_read(exhaustive)");
}
else if (mode == AP_MODE_READBYTES) {
- return h2_util_move(bb, input->bb, readbytes, 0,
+ return h2_util_move(bb, input->bb, readbytes, NULL,
"task_input_read(readbytes)");
}
else if (mode == AP_MODE_SPECULATIVE) {
c->id, stream_id, tag, line);
}
+
+AP_DECLARE(apr_status_t) h2_transfer_brigade(apr_bucket_brigade *to,
+ apr_bucket_brigade *from,
+ apr_pool_t *p,
+ apr_size_t *plen,
+ int *peos)
+{
+ apr_bucket *e;
+ apr_size_t len = 0, remain = *plen;
+ apr_status_t rv;
+
+ *peos = 0;
+
+ while (!APR_BRIGADE_EMPTY(from)) {
+ e = APR_BRIGADE_FIRST(from);
+
+ if (APR_BUCKET_IS_METADATA(e)) {
+ if (APR_BUCKET_IS_EOS(e)) {
+ *peos = 1;
+ }
+ }
+ else {
+ if (remain > 0 && e->length == ((apr_size_t)-1)) {
+ const char *ign;
+ apr_size_t ilen;
+ rv = apr_bucket_read(e, &ign, &ilen, APR_BLOCK_READ);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+ }
+
+ if (remain < e->length) {
+ if (remain <= 0) {
+ return APR_SUCCESS;
+ }
+ apr_bucket_split(e, remain);
+ }
+ }
+
+ rv = apr_bucket_setaside(e, p);
+
+ /* If the bucket type does not implement setaside, then
+ * (hopefully) morph it into a bucket type which does, and set
+ * *that* aside... */
+ if (rv == APR_ENOTIMPL) {
+ const char *s;
+ apr_size_t n;
+
+ rv = apr_bucket_read(e, &s, &n, APR_BLOCK_READ);
+ if (rv == APR_SUCCESS) {
+ rv = apr_bucket_setaside(e, p);
+ }
+ }
+
+ if (rv != APR_SUCCESS) {
+ /* Return an error but still save the brigade if
+ * ->setaside() is really not implemented. */
+ if (rv != APR_ENOTIMPL) {
+ return rv;
+ }
+ }
+
+ APR_BUCKET_REMOVE(e);
+ APR_BRIGADE_INSERT_TAIL(to, e);
+ len += e->length;
+ remain -= e->length;
+ }
+
+ *plen = len;
+ return APR_SUCCESS;
+}
+
void h2_util_bb_log(conn_rec *c, int stream_id, int level,
const char *tag, apr_bucket_brigade *bb);
+/**
+ * Transfer buckets from one brigade to another with a limit on the
+ * maximum amount of bytes transfered.
+ * @param to brigade to transfer buckets to
+ * @param from brigades to remove buckets from
+ * @param p pool that buckets should be setaside to
+ * @param plen maximum bytes to transfer, actual bytes transferred
+ * @param peos if an EOS bucket was transferred
+ */
+AP_DECLARE(apr_status_t) h2_transfer_brigade(apr_bucket_brigade *to,
+ apr_bucket_brigade *from,
+ apr_pool_t *p,
+ apr_size_t *plen,
+ int *peos);
+
#endif /* defined(__mod_h2__h2_util__) */