#undef APLOG_MODULE_INDEX
#define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
-int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
- ap_input_mode_t mode, apr_read_type_e block,
- apr_off_t readbytes)
+struct core_output_filter_ctx { /* State kept by the core output filter; allocated zeroed from the connection pool and stored in net->out_ctx. */
+ apr_bucket_brigade *buffered_bb; /* Output not yet written; the remainder after a nonblocking write is saved here. */
+ apr_bucket_brigade *tmp_flush_bb; /* NOTE(review): presumably the temporary brigade handed to apr_brigade_split_ex() when flushing -- confirm. */
+ apr_pool_t *deferred_write_pool; /* NOTE(review): usage not visible in this patch; presumably the pool buckets are set aside into -- confirm. */
+ apr_size_t bytes_written; /* Passed by address to the network write helpers; presumably a running count of bytes sent -- confirm. */
+};
+
+struct core_filter_ctx { /* State for the core input filter; NOTE(review): presumably the struct behind core_ctx_t / net->in_ctx -- confirm. */
+ apr_bucket_brigade *b; /* Input brigade, seeded on first use via ap_run_insert_network_bucket() with the client socket; once empty the filter returns APR_EOF. */
+ apr_bucket_brigade *tmpbb; /* Second brigade created alongside b from the connection pool; its use is not visible in this patch. */
+};
+
+
+apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
+ ap_input_mode_t mode, apr_read_type_e block,
+ apr_off_t readbytes)
{
- apr_bucket *e;
apr_status_t rv;
core_net_rec *net = f->ctx;
core_ctx_t *ctx = net->in_ctx;
if (!ctx)
{
- /*
- * Note that this code is never executed on Windows because the winnt
- * MPM does the setup of net->in_ctx.
- * XXX: This should be fixed.
- */
- ctx = apr_pcalloc(f->c->pool, sizeof(*ctx));
+ net->in_ctx = ctx = apr_palloc(f->c->pool, sizeof(*ctx));
ctx->b = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
- ctx->tmpbb = apr_brigade_create(ctx->b->p, ctx->b->bucket_alloc);
+ ctx->tmpbb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
/* seed the brigade with the client socket. */
- e = apr_bucket_socket_create(net->client_socket, f->c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(ctx->b, e);
- net->in_ctx = ctx;
+ rv = ap_run_insert_network_bucket(f->c, ctx->b, net->client_socket);
+ if (rv != APR_SUCCESS)
+ return rv;
}
else if (APR_BRIGADE_EMPTY(ctx->b)) {
return APR_EOF;
* empty). We do this by returning whatever we have read. This may
* or may not be bogus, but is consistent (for now) with EOF logic.
*/
- if (APR_STATUS_IS_EAGAIN(rv)) {
+ if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
rv = APR_SUCCESS;
}
return rv;
e = APR_BRIGADE_FIRST(ctx->b);
rv = apr_bucket_read(e, &str, &len, block);
- if (APR_STATUS_IS_EAGAIN(rv)) {
+ if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
+ /* getting EAGAIN for a blocking read is an error; for a
+ * non-blocking read, return an empty brigade. */
return APR_SUCCESS;
}
else if (rv != APR_SUCCESS) {
apr_bucket_brigade *bb = NULL;
apr_bucket *bucket, *next, *flush_upto = NULL;
apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
- int eor_buckets_in_brigade;
+ int eor_buckets_in_brigade, morphing_bucket_in_brigade;
apr_status_t rv;
/* Fail quickly if the connection has already been aborted. */
if (ctx == NULL) {
ctx = apr_pcalloc(c->pool, sizeof(*ctx));
net->out_ctx = (core_output_filter_ctx_t *)ctx;
- rv = apr_socket_opt_set(net->client_socket, APR_SO_NONBLOCK, 1);
- if (rv != APR_SUCCESS) {
- return rv;
- }
/*
* Need to create tmp brigade with correct lifetime. Passing
* NULL to apr_brigade_split_ex would result in a brigade
}
/* Scan through the brigade and decide whether to attempt a write,
- * based on the following rules:
+ * and how much to write, based on the following rules:
*
* 1) The new_bb is null: Do a nonblocking write of as much as
* possible: do a nonblocking write of as much data as possible,
* completion and has just determined that this connection
* is writable.)
*
- * 2) The brigade contains a flush bucket: Do a blocking write
+ * 2) Determine if and up to which bucket we need to do a blocking
+ * write:
+ *
+ * a) The brigade contains a flush bucket: Do a blocking write
* of everything up that point.
*
- * 3) The request is in CONN_STATE_HANDLER state, and the brigade
+ * b) The request is in CONN_STATE_HANDLER state, and the brigade
* contains at least THRESHOLD_MAX_BUFFER bytes in non-file
* buckets: Do blocking writes until the amount of data in the
* buffer is less than THRESHOLD_MAX_BUFFER. (The point of this
* streaming out lots of data faster than the data can be
* sent to the client.)
*
- * 4) The request is in CONN_STATE_HANDLER state, and the brigade
+ * c) The request is in CONN_STATE_HANDLER state, and the brigade
* contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
* Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
* buckets are left. (The point of this rule is to prevent too many
* FDs being kept open by pipelined requests, possibly allowing a
* DoS).
*
- * 5) The brigade contains at least THRESHOLD_MIN_WRITE
+ * d) The brigade contains a morphing bucket: If there was no other
+ * reason to do a blocking write yet, try reading the bucket. If its
+ * contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
+ * everything is fine. Otherwise we need to do a blocking write
+ * up to and including the morphing bucket, because ap_save_brigade()
+ * would read the whole bucket into memory later on.
+ *
+ * 3) Actually do the blocking write up to the last bucket determined
+ * by rules 2a-d. The point of doing only one flush is to make as
+ * few calls to writev() as possible.
+ *
+ * 4) If the brigade contains at least THRESHOLD_MIN_WRITE
* bytes: Do a nonblocking write of as much data as possible,
* then save the rest in ctx->buffered_bb.
*/
}
else if (rv != APR_SUCCESS) {
/* The client has aborted the connection */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+ "core_output_filter: writing data to the network");
c->aborted = 1;
}
setaside_remaining_output(f, ctx, bb, c);
bytes_in_brigade = 0;
non_file_bytes_in_brigade = 0;
eor_buckets_in_brigade = 0;
+ morphing_bucket_in_brigade = 0;
+
for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
bucket = next) {
next = APR_BUCKET_NEXT(bucket);
if (!APR_BUCKET_IS_METADATA(bucket)) {
if (bucket->length == (apr_size_t)-1) {
- const char *data;
- apr_size_t length;
- /* XXX support nonblocking read here? */
- rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
- if (rv != APR_SUCCESS) {
- return rv;
- }
- /* reading may have split the bucket, so recompute next: */
- next = APR_BUCKET_NEXT(bucket);
+ /*
+ * A setaside of morphing buckets would read everything into
+ * memory. Instead, we will flush everything up to and
+ * including this bucket.
+ */
+ morphing_bucket_in_brigade = 1;
}
- bytes_in_brigade += bucket->length;
- if (!APR_BUCKET_IS_FILE(bucket)) {
- non_file_bytes_in_brigade += bucket->length;
+ else {
+ bytes_in_brigade += bucket->length;
+ if (!APR_BUCKET_IS_FILE(bucket))
+ non_file_bytes_in_brigade += bucket->length;
}
}
else if (AP_BUCKET_IS_EOR(bucket)) {
eor_buckets_in_brigade++;
}
- if (APR_BUCKET_IS_FLUSH(bucket) ||
- (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ||
- (eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) )
- {
+ if (APR_BUCKET_IS_FLUSH(bucket)
+ || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
+ || morphing_bucket_in_brigade
+ || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
+ /* this segment of the brigade MUST be sent before returning. */
+
if (APLOGctrace6(c)) {
char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
"FLUSH bucket" :
(non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
"THRESHOLD_MAX_BUFFER" :
+ morphing_bucket_in_brigade ? "morphing bucket" :
"MAX_REQUESTS_IN_PIPELINE";
ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
"core_output_filter: flushing because of %s",
bytes_in_brigade = 0;
non_file_bytes_in_brigade = 0;
eor_buckets_in_brigade = 0;
+ morphing_bucket_in_brigade = 0;
}
}
&(ctx->bytes_written), c);
if (rv != APR_SUCCESS) {
/* The client has aborted the connection */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+ "core_output_filter: writing data to the network");
c->aborted = 1;
return rv;
}
&(ctx->bytes_written), c);
if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
/* The client has aborted the connection */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+ "core_output_filter: writing data to the network");
c->aborted = 1;
return rv;
}
if (!APR_BUCKET_IS_METADATA(bucket)) {
const char *data;
apr_size_t length;
- rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+
+ /* Non-blocking read first, in case this is a morphing
+ * bucket type. */
+ rv = apr_bucket_read(bucket, &data, &length, APR_NONBLOCK_READ);
+ if (APR_STATUS_IS_EAGAIN(rv)) {
+ /* Read would block; flush any pending data and retry. */
+ if (nvec) {
+ rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
+ if (rv) {
+ return rv;
+ }
+ nvec = 0;
+ }
+
+ rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+ }
if (rv != APR_SUCCESS) {
return rv;
}
+
/* reading may have split the bucket, so recompute next: */
next = APR_BUCKET_NEXT(bucket);
vec[nvec].iov_base = (char *)data;
pollset.reqevents = APR_POLLOUT;
pollset.desc.s = s;
apr_socket_timeout_get(s, &timeout);
- rv = apr_poll(&pollset, 1, &nsds, timeout);
+ do {
+ rv = apr_poll(&pollset, 1, &nsds, timeout);
+ } while (APR_STATUS_IS_EINTR(rv));
if (rv != APR_SUCCESS) {
break;
}