#include "apr_fnmatch.h"
#include "apr_hash.h"
#include "apr_thread_proc.h" /* for RLIMIT stuff */
-#include "apr_hooks.h"
#define APR_WANT_IOVEC
#define APR_WANT_STRFUNC
} while (!APR_BRIGADE_EMPTY(b) && (e != APR_BRIGADE_SENTINEL(b))); \
} while (0)
+/* we know core's module_index is 0 */
+#undef APLOG_MODULE_INDEX
+#define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
-/**
- * Split the contents of a brigade after bucket 'e' to an existing brigade
- *
- * XXXX: Should this function be added to APR-Util?
- */
-static void brigade_move(apr_bucket_brigade *b, apr_bucket_brigade *a,
- apr_bucket *e)
-{
- apr_bucket *f;
+struct core_output_filter_ctx { /* core output filter state, kept in net->out_ctx; presumably typedef'd as core_output_filter_ctx_t -- confirm */
+ apr_bucket_brigade *buffered_bb; /* output set aside for a later write */
+ apr_bucket_brigade *tmp_flush_bb; /* scratch brigade handed to apr_brigade_split_ex() */
+ apr_pool_t *deferred_write_pool; /* subpool of c->pool passed to ap_save_brigade() */
+ apr_size_t bytes_written; /* passed by address to the send_brigade_*() helpers */
+};
- if (e != APR_BRIGADE_SENTINEL(b)) {
- f = APR_RING_LAST(&b->list);
- APR_RING_UNSPLICE(e, f, link);
- APR_RING_SPLICE_HEAD(&a->list, e, f, apr_bucket, link);
- }
+struct core_filter_ctx { /* NOTE(review): presumably the input filter's core_ctx_t stored in net->in_ctx -- confirm typedef */
+ apr_bucket_brigade *b; /* input brigade, seeded from the client socket */
+ apr_bucket_brigade *tmpbb; /* receives the remainder from apr_brigade_split_ex() */
+};
- APR_BRIGADE_CHECK_CONSISTENCY(a);
- APR_BRIGADE_CHECK_CONSISTENCY(b);
-}
-int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
- ap_input_mode_t mode, apr_read_type_e block,
- apr_off_t readbytes)
+apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
+ ap_input_mode_t mode, apr_read_type_e block,
+ apr_off_t readbytes)
{
- apr_bucket *e;
apr_status_t rv;
core_net_rec *net = f->ctx;
core_ctx_t *ctx = net->in_ctx;
if (!ctx)
{
- ctx = apr_pcalloc(f->c->pool, sizeof(*ctx));
+ net->in_ctx = ctx = apr_palloc(f->c->pool, sizeof(*ctx));
ctx->b = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
- ctx->tmpbb = apr_brigade_create(ctx->b->p, ctx->b->bucket_alloc);
+ ctx->tmpbb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
/* seed the brigade with the client socket. */
- e = apr_bucket_socket_create(net->client_socket, f->c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(ctx->b, e);
- net->in_ctx = ctx;
+ rv = ap_run_insert_network_bucket(f->c, ctx->b, net->client_socket);
+ if (rv != APR_SUCCESS)
+ return rv;
}
else if (APR_BRIGADE_EMPTY(ctx->b)) {
return APR_EOF;
* empty). We do this by returning whatever we have read. This may
* or may not be bogus, but is consistent (for now) with EOF logic.
*/
- if (APR_STATUS_IS_EAGAIN(rv)) {
+ if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
rv = APR_SUCCESS;
}
return rv;
* the brigade that was passed down, and send that brigade back.
*
* NOTE: This is VERY dangerous to use, and should only be done with
- * extreme caution. However, the Perchild MPM needs this feature
- * if it is ever going to work correctly again. With this, the Perchild
- * MPM can easily request the socket and all data that has been read,
- * which means that it can pass it to the correct child process.
+ * extreme caution. For what little it is worth, this would be needed by
+ * an MPM like Perchild; such an MPM can easily request the socket and all
+ * data that has been read, which means it can pass it to the correct child process.
*/
if (mode == AP_MODE_EXHAUSTIVE) {
apr_bucket *e;
e = APR_BRIGADE_FIRST(ctx->b);
rv = apr_bucket_read(e, &str, &len, block);
- if (APR_STATUS_IS_EAGAIN(rv)) {
+ if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
+ /* getting EAGAIN for a blocking read is an error; for a
+ * non-blocking read, return an empty brigade. */
return APR_SUCCESS;
}
else if (rv != APR_SUCCESS) {
}
/* Must do move before CONCAT */
- brigade_move(ctx->b, ctx->tmpbb, e);
+ ctx->tmpbb = apr_brigade_split_ex(ctx->b, e, ctx->tmpbb);
if (mode == AP_MODE_READBYTES) {
APR_BRIGADE_CONCAT(b, ctx->b);
static void setaside_remaining_output(ap_filter_t *f,
core_output_filter_ctx_t *ctx,
apr_bucket_brigade *bb,
- int make_a_copy, conn_rec *c);
+ conn_rec *c);
static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
apr_bucket_brigade *bb,
conn_rec *c);
#endif
+/* XXX: Should these be configurable parameters? */
#define THRESHOLD_MIN_WRITE 4096
#define THRESHOLD_MAX_BUFFER 65536
+#define MAX_REQUESTS_IN_PIPELINE 5
/* Optional function coming from mod_logio, used for logging of output
* traffic
conn_rec *c = f->c;
core_net_rec *net = f->ctx;
core_output_filter_ctx_t *ctx = net->out_ctx;
- apr_bucket_brigade *bb;
- apr_bucket *bucket, *next;
+ apr_bucket_brigade *bb = NULL;
+ apr_bucket *bucket, *next, *flush_upto = NULL;
apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
+ int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+ apr_status_t rv;
/* Fail quickly if the connection has already been aborted. */
if (c->aborted) {
}
if (ctx == NULL) {
- apr_status_t rv;
ctx = apr_pcalloc(c->pool, sizeof(*ctx));
net->out_ctx = (core_output_filter_ctx_t *)ctx;
- rv = apr_socket_opt_set(net->client_socket, APR_SO_NONBLOCK, 1);
- if (rv != APR_SUCCESS) {
- return rv;
- }
+ /*
+ * Need to create tmp brigade with correct lifetime. Passing
+ * NULL to apr_brigade_split_ex would result in a brigade
+ * allocated from bb->pool which might be wrong.
+ */
+ ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
+ /* same for buffered_bb and ap_save_brigade */
+ ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
}
- if (new_bb != NULL) {
- for (bucket = APR_BRIGADE_FIRST(new_bb); bucket != APR_BRIGADE_SENTINEL(new_bb); bucket = APR_BUCKET_NEXT(bucket)) {
- if (bucket->length > 0) {
- ctx->bytes_in += bucket->length;
- }
- }
- }
+ if (new_bb != NULL)
+ bb = new_bb;
if ((ctx->buffered_bb != NULL) &&
!APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
- bb = ctx->buffered_bb;
- ctx->buffered_bb = NULL;
if (new_bb != NULL) {
- APR_BRIGADE_CONCAT(bb, new_bb);
+ APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
+ }
+ else {
+ bb = ctx->buffered_bb;
}
c->data_in_output_filters = 0;
}
- else if (new_bb != NULL) {
- bb = new_bb;
- }
- else {
+ else if (new_bb == NULL) {
return APR_SUCCESS;
}
/* Scan through the brigade and decide whether to attempt a write,
- * based on the following rules:
+ * and how much to write, based on the following rules:
*
* 1) The new_bb is null: Do a nonblocking write of as much as
* possible: do a nonblocking write of as much data as possible,
* completion and has just determined that this connection
* is writable.)
*
- * 2) The brigade contains a flush bucket: Do a blocking write
+ * 2) Determine if and up to which bucket we need to do a blocking
+ * write:
+ *
+ * a) The brigade contains a flush bucket: Do a blocking write
* of everything up that point.
*
- * 3) The request is in CONN_STATE_HANDLER state, and the brigade
+ * b) The request is in CONN_STATE_HANDLER state, and the brigade
* contains at least THRESHOLD_MAX_BUFFER bytes in non-file
* buckets: Do blocking writes until the amount of data in the
* buffer is less than THRESHOLD_MAX_BUFFER. (The point of this
* streaming out lots of data faster than the data can be
* sent to the client.)
*
- * 4) The brigade contains at least THRESHOLD_MIN_WRITE
+ * c) The request is in CONN_STATE_HANDLER state, and the brigade
+ * contains more than MAX_REQUESTS_IN_PIPELINE EOR buckets:
+ * Do blocking writes until no more than MAX_REQUESTS_IN_PIPELINE EOR
+ * buckets are left. (The point of this rule is to prevent too many
+ * FDs from being kept open by pipelined requests, possibly allowing a
+ * DoS.)
+ *
+ * d) The brigade contains a morphing bucket: If there was no other
+ * reason to do a blocking write yet, try reading the bucket. If its
+ * contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
+ * everything is fine. Otherwise we need to do a blocking write
+ * up to and including the morphing bucket, because ap_save_brigade()
+ * would read the whole bucket into memory later on.
+ *
+ * 3) Actually do the blocking write up to the last bucket determined
+ * by rules 2a-d. The point of doing only one flush is to make as
+ * few calls to writev() as possible.
+ *
+ * 4) If the brigade contains at least THRESHOLD_MIN_WRITE
* bytes: Do a nonblocking write of as much data as possible,
* then save the rest in ctx->buffered_bb.
*/
if (new_bb == NULL) {
- apr_status_t rv = send_brigade_nonblocking(net->client_socket, bb,
- &(ctx->bytes_written), c);
+ rv = send_brigade_nonblocking(net->client_socket, bb,
+ &(ctx->bytes_written), c);
if (APR_STATUS_IS_EAGAIN(rv)) {
rv = APR_SUCCESS;
}
else if (rv != APR_SUCCESS) {
/* The client has aborted the connection */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+ "core_output_filter: writing data to the network");
c->aborted = 1;
}
- setaside_remaining_output(f, ctx, bb, 0, c);
+ setaside_remaining_output(f, ctx, bb, c);
return rv;
}
bytes_in_brigade = 0;
non_file_bytes_in_brigade = 0;
+ eor_buckets_in_brigade = 0;
+ morphing_bucket_in_brigade = 0;
+
for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
bucket = next) {
next = APR_BUCKET_NEXT(bucket);
- if (APR_BUCKET_IS_FLUSH(bucket)) {
- apr_bucket_brigade *remainder = apr_brigade_split(bb, next);
- apr_status_t rv = send_brigade_blocking(net->client_socket, bb,
- &(ctx->bytes_written), c);
- if (rv != APR_SUCCESS) {
- /* The client has aborted the connection */
- c->aborted = 1;
- return rv;
+
+ if (!APR_BUCKET_IS_METADATA(bucket)) {
+ if (bucket->length == (apr_size_t)-1) {
+ /*
+ * A setaside of morphing buckets would read everything into
+ * memory. Instead, we will flush everything up to and
+ * including this bucket.
+ */
+ morphing_bucket_in_brigade = 1;
}
- bb = remainder;
- next = APR_BRIGADE_FIRST(bb);
- bytes_in_brigade = 0;
- non_file_bytes_in_brigade = 0;
- }
- else if (!APR_BUCKET_IS_METADATA(bucket)) {
- if (bucket->length < 0) {
- const char *data;
- apr_size_t length;
- /* XXX support nonblocking read here? */
- apr_status_t rv =
- apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
- if (rv != APR_SUCCESS) {
- return rv;
- }
- /* reading may have split the bucket, so recompute next: */
- next = APR_BUCKET_NEXT(bucket);
+ else {
+ bytes_in_brigade += bucket->length;
+ if (!APR_BUCKET_IS_FILE(bucket))
+ non_file_bytes_in_brigade += bucket->length;
}
- bytes_in_brigade += bucket->length;
- if (!APR_BUCKET_IS_FILE(bucket)) {
- non_file_bytes_in_brigade += bucket->length;
+ }
+ else if (AP_BUCKET_IS_EOR(bucket)) {
+ eor_buckets_in_brigade++;
+ }
+
+ if (APR_BUCKET_IS_FLUSH(bucket)
+ || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
+ || morphing_bucket_in_brigade
+ || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
+ /* this segment of the brigade MUST be sent before returning. */
+
+ if (APLOGctrace6(c)) {
+ char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
+ "FLUSH bucket" :
+ (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
+ "THRESHOLD_MAX_BUFFER" :
+ morphing_bucket_in_brigade ? "morphing bucket" :
+ "MAX_REQUESTS_IN_PIPELINE";
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
+ "core_output_filter: flushing because of %s",
+ reason);
}
+ /*
+ * Defer the actual blocking write to avoid doing many writes.
+ */
+ flush_upto = next;
+
+ bytes_in_brigade = 0;
+ non_file_bytes_in_brigade = 0;
+ eor_buckets_in_brigade = 0;
+ morphing_bucket_in_brigade = 0;
}
}
- if (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) {
- /* ### Writing the entire brigade may be excessive; we really just
- * ### need to send enough data to be under THRESHOLD_MAX_BUFFER.
- */
- apr_status_t rv = send_brigade_blocking(net->client_socket, bb,
- &(ctx->bytes_written), c);
+ if (flush_upto != NULL) {
+ ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
+ ctx->tmp_flush_bb);
+ rv = send_brigade_blocking(net->client_socket, bb,
+ &(ctx->bytes_written), c);
if (rv != APR_SUCCESS) {
/* The client has aborted the connection */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+ "core_output_filter: writing data to the network");
c->aborted = 1;
return rv;
}
+ APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
}
- else if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
- apr_status_t rv = send_brigade_nonblocking(net->client_socket, bb,
- &(ctx->bytes_written), c);
+
+ if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
+ rv = send_brigade_nonblocking(net->client_socket, bb,
+ &(ctx->bytes_written), c);
if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
/* The client has aborted the connection */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+ "core_output_filter: writing data to the network");
c->aborted = 1;
return rv;
}
}
- setaside_remaining_output(f, ctx, bb, 1, c);
+ setaside_remaining_output(f, ctx, bb, c);
return APR_SUCCESS;
}
+/*
+ * This function assumes that either ctx->buffered_bb == NULL, or
+ * ctx->buffered_bb is empty, or ctx->buffered_bb == bb
+ */
static void setaside_remaining_output(ap_filter_t *f,
core_output_filter_ctx_t *ctx,
apr_bucket_brigade *bb,
- int make_a_copy, conn_rec *c)
+ conn_rec *c)
{
if (bb == NULL) {
return;
remove_empty_buckets(bb);
if (!APR_BRIGADE_EMPTY(bb)) {
c->data_in_output_filters = 1;
- if (make_a_copy) {
- /* XXX should this use a separate deferred write pool, like
- * the original ap_core_output_filter?
- */
- ap_save_brigade(f, &(ctx->buffered_bb), &bb, c->pool);
- apr_brigade_destroy(bb);
- }
- else {
- ctx->buffered_bb = bb;
+ if (bb != ctx->buffered_bb) {
+ if (!ctx->deferred_write_pool) {
+ apr_pool_create(&ctx->deferred_write_pool, c->pool);
+ apr_pool_tag(ctx->deferred_write_pool, "deferred_write");
+ }
+ ap_save_brigade(f, &(ctx->buffered_bb), &bb,
+ ctx->deferred_write_pool);
+ apr_brigade_cleanup(bb);
}
}
- else {
- apr_brigade_destroy(bb);
+ else if (ctx->deferred_write_pool) {
+ /*
+ * There are no more requests in the pipeline. We can just clear the
+ * pool.
+ */
+ apr_pool_clear(ctx->deferred_write_pool);
}
}
for (bucket = APR_BRIGADE_FIRST(bb);
bucket != APR_BRIGADE_SENTINEL(bb);
bucket = next) {
- int did_sendfile = 0;
next = APR_BUCKET_NEXT(bucket);
#if APR_HAS_SENDFILE
if (APR_BUCKET_IS_FILE(bucket)) {
if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) &&
(bucket->length >= AP_MIN_SENDFILE_BYTES)) {
- did_sendfile = 1;
if (nvec > 0) {
(void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
}
}
#endif /* APR_HAS_SENDFILE */
- if (!did_sendfile && !APR_BUCKET_IS_METADATA(bucket)) {
+ /* didn't sendfile */
+ if (!APR_BUCKET_IS_METADATA(bucket)) {
const char *data;
apr_size_t length;
- rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+
+ /* Non-blocking read first, in case this is a morphing
+ * bucket type. */
+ rv = apr_bucket_read(bucket, &data, &length, APR_NONBLOCK_READ);
+ if (APR_STATUS_IS_EAGAIN(rv)) {
+ /* Read would block; flush any pending data and retry. */
+ if (nvec) {
+ rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
+ if (rv) {
+ return rv;
+ }
+ nvec = 0;
+ }
+
+ rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+ }
if (rv != APR_SUCCESS) {
return rv;
}
+
/* reading may have split the bucket, so recompute next: */
next = APR_BUCKET_NEXT(bucket);
vec[nvec].iov_base = (char *)data;
pollset.reqevents = APR_POLLOUT;
pollset.desc.s = s;
apr_socket_timeout_get(s, &timeout);
- rv = apr_poll(&pollset, 1, &nsds, timeout);
+ do {
+ rv = apr_poll(&pollset, 1, &nsds, timeout);
+ } while (APR_STATUS_IS_EINTR(rv));
if (rv != APR_SUCCESS) {
break;
}
apr_size_t bytes_written = 0;
if (!APR_BUCKET_IS_FILE(bucket)) {
- ap_log_error(APLOG_MARK, APLOG_ERR, rv, c->base_server,
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, c->base_server, APLOGNO(00006)
"core_filter: sendfile_nonblocking: "
"this should never happen");
return APR_EGENERAL;