#include "mpm_common.h"
#include "scoreboard.h"
#include "mod_core.h"
-#include "mod_proxy.h"
#include "ap_listen.h"
+#include "core.h"
#include "mod_so.h" /* for ap_find_loaded_module_symbol */
#define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
struct core_output_filter_ctx {
- apr_bucket_brigade *buffered_bb;
- apr_bucket_brigade *tmp_flush_bb;
- apr_pool_t *deferred_write_pool;
+ apr_bucket_brigade *empty_bb;
apr_size_t bytes_written;
+ struct iovec *vec;
+ apr_size_t nvec;
};
struct core_filter_ctx {
- apr_bucket_brigade *b;
+ apr_bucket_brigade *bb;
apr_bucket_brigade *tmpbb;
};
-AP_DECLARE(core_ctx_t *) ap_create_core_ctx(conn_rec *c)
-{
- core_ctx_t *ctx = apr_palloc(c->pool, sizeof(*ctx));
- ctx->b = apr_brigade_create(c->pool, c->bucket_alloc);
- ctx->tmpbb = apr_brigade_create(c->pool, c->bucket_alloc);
- return ctx;
-}
-
-AP_DECLARE(apr_bucket_brigade *) ap_core_ctx_get_bb(core_ctx_t *ctx)
-{
- return ctx->b;
-}
-
apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
ap_input_mode_t mode, apr_read_type_e block,
apr_off_t readbytes)
{
- apr_bucket *e;
- apr_status_t rv;
+ apr_status_t rv = APR_SUCCESS;
core_net_rec *net = f->ctx;
core_ctx_t *ctx = net->in_ctx;
const char *str;
if (!ctx)
{
- net->in_ctx = ctx = ap_create_core_ctx(f->c);
+ net->in_ctx = ctx = apr_palloc(f->c->pool, sizeof(*ctx));
+ ctx->bb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
+ ctx->tmpbb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
/* seed the brigade with the client socket. */
- e = apr_bucket_socket_create(net->client_socket, f->c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(ctx->b, e);
+ rv = ap_run_insert_network_bucket(f->c, ctx->bb, net->client_socket);
+ if (rv != APR_SUCCESS)
+ return rv;
}
- else if (APR_BRIGADE_EMPTY(ctx->b)) {
- return APR_EOF;
+ else {
+ ap_filter_reinstate_brigade(f, ctx->bb, NULL);
+ if (APR_BRIGADE_EMPTY(ctx->bb)) {
+ return APR_EOF;
+ }
}
/* ### This is bad. */
- BRIGADE_NORMALIZE(ctx->b);
+ BRIGADE_NORMALIZE(ctx->bb);
/* check for empty brigade again *AFTER* BRIGADE_NORMALIZE()
* If we have lost our socket bucket (see above), we are EOF.
* Ideally, this should be returning SUCCESS with EOS bucket, but
* some higher-up APIs (spec. read_request_line via ap_rgetline)
* want an error code. */
- if (APR_BRIGADE_EMPTY(ctx->b)) {
+ if (APR_BRIGADE_EMPTY(ctx->bb)) {
return APR_EOF;
}
if (mode == AP_MODE_GETLINE) {
/* we are reading a single LF line, e.g. the HTTP headers */
- rv = apr_brigade_split_line(b, ctx->b, block, HUGE_STRING_LEN);
+ rv = apr_brigade_split_line(b, ctx->bb, block, HUGE_STRING_LEN);
/* We should treat EAGAIN here the same as we do for EOF (brigade is
* empty). We do this by returning whatever we have read. This may
* or may not be bogus, but is consistent (for now) with EOF logic.
if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
rv = APR_SUCCESS;
}
- return rv;
+ goto cleanup;
}
/* ### AP_MODE_PEEK is a horrific name for this mode because we also
* mean that there is another request, just a blank line.
*/
while (1) {
- if (APR_BRIGADE_EMPTY(ctx->b))
- return APR_EOF;
-
- e = APR_BRIGADE_FIRST(ctx->b);
+ if (APR_BRIGADE_EMPTY(ctx->bb)) {
+ rv = APR_EOF;
+ goto cleanup;
+ }
+ e = APR_BRIGADE_FIRST(ctx->bb);
rv = apr_bucket_read(e, &str, &len, APR_NONBLOCK_READ);
-
- if (rv != APR_SUCCESS)
- return rv;
+ if (rv != APR_SUCCESS) {
+ goto cleanup;
+ }
c = str;
while (c < str + len) {
else if (*c == APR_ASCII_CR && *(c + 1) == APR_ASCII_LF)
c += 2;
else
- return APR_SUCCESS;
+ goto cleanup;
}
/* If we reach here, we were a bucket just full of CRLFs, so
/* FIXME: Is this the right thing to do in the core? */
apr_bucket_delete(e);
}
- return APR_SUCCESS;
+
+ /* UNREACHABLE */
+ ap_assert(0);
}
/* If mode is EXHAUSTIVE, we want to just read everything until the end
apr_bucket *e;
/* Tack on any buckets that were set aside. */
- APR_BRIGADE_CONCAT(b, ctx->b);
+ APR_BRIGADE_CONCAT(b, ctx->bb);
/* Since we've just added all potential buckets (which will most
* likely simply be the socket bucket) we know this is the end,
* must be EOS. */
e = apr_bucket_eos_create(f->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(b, e);
- return APR_SUCCESS;
+
+ rv = APR_SUCCESS;
+ goto cleanup;
}
/* read up to the amount they specified. */
AP_DEBUG_ASSERT(readbytes > 0);
- e = APR_BRIGADE_FIRST(ctx->b);
+ e = APR_BRIGADE_FIRST(ctx->bb);
rv = apr_bucket_read(e, &str, &len, block);
-
- if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
- /* getting EAGAIN for a blocking read is an error; for a
- * non-blocking read, return an empty brigade. */
- return APR_SUCCESS;
- }
- else if (rv != APR_SUCCESS) {
- return rv;
+ if (rv != APR_SUCCESS) {
+ if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
+ /* Getting EAGAIN for a blocking read is an error, but for a
+ * non-blocking read it is not: return an empty brigade. */
+ rv = APR_SUCCESS;
+ }
+ goto cleanup;
}
else if (block == APR_BLOCK_READ && len == 0) {
/* We wanted to read some bytes in blocking mode. We read
*
* When we are in normal mode, return an EOS bucket to the
* caller.
- * When we are in speculative mode, leave ctx->b empty, so
+ * When we are in speculative mode, leave ctx->bb empty, so
* that the next call returns an EOS bucket.
*/
apr_bucket_delete(e);
e = apr_bucket_eos_create(f->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(b, e);
}
- return APR_SUCCESS;
+ goto cleanup;
}
/* Have we read as much data as we wanted (be greedy)? */
if (len < readbytes) {
apr_size_t bucket_len;
- rv = APR_SUCCESS;
/* We already registered the data in e in len */
e = APR_BUCKET_NEXT(e);
while ((len < readbytes) && (rv == APR_SUCCESS)
- && (e != APR_BRIGADE_SENTINEL(ctx->b))) {
+ && (e != APR_BRIGADE_SENTINEL(ctx->bb))) {
/* Check for the availability of buckets with known length */
if (e->length != -1) {
len += e->length;
readbytes = len;
}
- rv = apr_brigade_partition(ctx->b, readbytes, &e);
+ rv = apr_brigade_partition(ctx->bb, readbytes, &e);
if (rv != APR_SUCCESS) {
- return rv;
+ goto cleanup;
}
/* Must do move before CONCAT */
- ctx->tmpbb = apr_brigade_split_ex(ctx->b, e, ctx->tmpbb);
+ ctx->tmpbb = apr_brigade_split_ex(ctx->bb, e, ctx->tmpbb);
if (mode == AP_MODE_READBYTES) {
- APR_BRIGADE_CONCAT(b, ctx->b);
+ APR_BRIGADE_CONCAT(b, ctx->bb);
}
else if (mode == AP_MODE_SPECULATIVE) {
apr_bucket *copy_bucket;
- for (e = APR_BRIGADE_FIRST(ctx->b);
- e != APR_BRIGADE_SENTINEL(ctx->b);
+ for (e = APR_BRIGADE_FIRST(ctx->bb);
+ e != APR_BRIGADE_SENTINEL(ctx->bb);
e = APR_BUCKET_NEXT(e))
{
rv = apr_bucket_copy(e, ©_bucket);
if (rv != APR_SUCCESS) {
- return rv;
+ goto cleanup;
}
APR_BRIGADE_INSERT_TAIL(b, copy_bucket);
}
}
- /* Take what was originally there and place it back on ctx->b */
- APR_BRIGADE_CONCAT(ctx->b, ctx->tmpbb);
+ /* Take what was originally there and place it back on ctx->bb */
+ APR_BRIGADE_CONCAT(ctx->bb, ctx->tmpbb);
}
- return APR_SUCCESS;
-}
-static void setaside_remaining_output(ap_filter_t *f,
- core_output_filter_ctx_t *ctx,
- apr_bucket_brigade *bb,
- conn_rec *c);
+cleanup:
+ ap_filter_adopt_brigade(f, ctx->bb);
+ return rv;
+}
static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
apr_bucket_brigade *bb,
- apr_size_t *bytes_written,
+ core_output_filter_ctx_t *ctx,
conn_rec *c);
-static void remove_empty_buckets(apr_bucket_brigade *bb);
-
-static apr_status_t send_brigade_blocking(apr_socket_t *s,
- apr_bucket_brigade *bb,
- apr_size_t *bytes_written,
- conn_rec *c);
-
static apr_status_t writev_nonblocking(apr_socket_t *s,
- struct iovec *vec, apr_size_t nvec,
apr_bucket_brigade *bb,
- apr_size_t *cumulative_bytes_written,
+ core_output_filter_ctx_t *ctx,
+ apr_size_t bytes_to_write,
+ apr_size_t nvec,
conn_rec *c);
#if APR_HAS_SENDFILE
static apr_status_t sendfile_nonblocking(apr_socket_t *s,
apr_bucket *bucket,
- apr_size_t *cumulative_bytes_written,
+ core_output_filter_ctx_t *ctx,
conn_rec *c);
#endif
-/* XXX: Should these be configurable parameters? */
-#define THRESHOLD_MIN_WRITE 4096
-#define THRESHOLD_MAX_BUFFER 65536
-#define MAX_REQUESTS_IN_PIPELINE 5
-
/* Optional function coming from mod_logio, used for logging of output
* traffic
*/
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;
-apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
+apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
{
conn_rec *c = f->c;
core_net_rec *net = f->ctx;
+ apr_socket_t *sock = net->client_socket;
core_output_filter_ctx_t *ctx = net->out_ctx;
- apr_bucket_brigade *bb = NULL;
- apr_bucket *bucket, *next, *flush_upto = NULL;
- apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
- int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+ apr_interval_time_t sock_timeout = 0;
apr_status_t rv;
/* Fail quickly if the connection has already been aborted. */
if (c->aborted) {
- if (new_bb != NULL) {
- apr_brigade_cleanup(new_bb);
- }
+ apr_brigade_cleanup(bb);
return APR_ECONNABORTED;
}
if (ctx == NULL) {
ctx = apr_pcalloc(c->pool, sizeof(*ctx));
net->out_ctx = (core_output_filter_ctx_t *)ctx;
- rv = apr_socket_opt_set(net->client_socket, APR_SO_NONBLOCK, 1);
- if (rv != APR_SUCCESS) {
- return rv;
- }
- /*
- * Need to create tmp brigade with correct lifetime. Passing
- * NULL to apr_brigade_split_ex would result in a brigade
- * allocated from bb->pool which might be wrong.
- */
- ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
- /* same for buffered_bb and ap_save_brigade */
- ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
}
- if (new_bb != NULL)
- bb = new_bb;
-
- if ((ctx->buffered_bb != NULL) &&
- !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
- if (new_bb != NULL) {
- APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
+ /* remain compatible with legacy MPMs that passed NULL to this filter */
+ if (bb == NULL) {
+ if (ctx->empty_bb == NULL) {
+ ctx->empty_bb = apr_brigade_create(c->pool, c->bucket_alloc);
}
else {
- bb = ctx->buffered_bb;
+ apr_brigade_cleanup(ctx->empty_bb);
}
- c->data_in_output_filters = 0;
+ bb = ctx->empty_bb;
}
- else if (new_bb == NULL) {
+
+ /* Prepend buckets set aside, if any. */
+ ap_filter_reinstate_brigade(f, bb, NULL);
+ if (APR_BRIGADE_EMPTY(bb)) {
return APR_SUCCESS;
}
- /* Scan through the brigade and decide whether to attempt a write,
- * and how much to write, based on the following rules:
- *
- * 1) The new_bb is null: Do a nonblocking write of as much as
- * possible: do a nonblocking write of as much data as possible,
- * then save the rest in ctx->buffered_bb. (If new_bb == NULL,
- * it probably means that the MPM is doing asynchronous write
- * completion and has just determined that this connection
- * is writable.)
- *
- * 2) Determine if and up to which bucket we need to do a blocking
- * write:
- *
- * a) The brigade contains a flush bucket: Do a blocking write
- * of everything up that point.
- *
- * b) The request is in CONN_STATE_HANDLER state, and the brigade
- * contains at least THRESHOLD_MAX_BUFFER bytes in non-file
- * buckets: Do blocking writes until the amount of data in the
- * buffer is less than THRESHOLD_MAX_BUFFER. (The point of this
- * rule is to provide flow control, in case a handler is
- * streaming out lots of data faster than the data can be
- * sent to the client.)
- *
- * c) The request is in CONN_STATE_HANDLER state, and the brigade
- * contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
- * Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
- * buckets are left. (The point of this rule is to prevent too many
- * FDs being kept open by pipelined requests, possibly allowing a
- * DoS).
- *
- * d) The brigade contains a morphing bucket: If there was no other
- * reason to do a blocking write yet, try reading the bucket. If its
- * contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
- * everything is fine. Otherwise we need to do a blocking write the
- * up to and including the morphing bucket, because ap_save_brigade()
- * would read the whole bucket into memory later on.
- *
- * 3) Actually do the blocking write up to the last bucket determined
- * by rules 2a-d. The point of doing only one flush is to make as
- * few calls to writev() as possible.
- *
- * 4) If the brigade contains at least THRESHOLD_MIN_WRITE
- * bytes: Do a nonblocking write of as much data as possible,
- * then save the rest in ctx->buffered_bb.
- */
+ /* Non-blocking writes on the socket in any case. */
+ apr_socket_timeout_get(sock, &sock_timeout);
+ apr_socket_timeout_set(sock, 0);
- if (new_bb == NULL) {
- rv = send_brigade_nonblocking(net->client_socket, bb,
- &(ctx->bytes_written), c);
+ do {
+ rv = send_brigade_nonblocking(sock, bb, ctx, c);
if (APR_STATUS_IS_EAGAIN(rv)) {
- rv = APR_SUCCESS;
- }
- else if (rv != APR_SUCCESS) {
- /* The client has aborted the connection */
- c->aborted = 1;
- }
- setaside_remaining_output(f, ctx, bb, c);
- return rv;
- }
-
- bytes_in_brigade = 0;
- non_file_bytes_in_brigade = 0;
- eor_buckets_in_brigade = 0;
- morphing_bucket_in_brigade = 0;
-
- for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
- bucket = next) {
- next = APR_BUCKET_NEXT(bucket);
-
- if (!APR_BUCKET_IS_METADATA(bucket)) {
- if (bucket->length == (apr_size_t)-1) {
- /*
- * A setaside of morphing buckets would read everything into
- * memory. Instead, we will flush everything up to and
- * including this bucket.
- */
- morphing_bucket_in_brigade = 1;
- }
- else {
- bytes_in_brigade += bucket->length;
- if (!APR_BUCKET_IS_FILE(bucket))
- non_file_bytes_in_brigade += bucket->length;
- }
- }
- else if (AP_BUCKET_IS_EOR(bucket)) {
- eor_buckets_in_brigade++;
- }
-
- if (APR_BUCKET_IS_FLUSH(bucket)
- || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
- || morphing_bucket_in_brigade
- || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
- /* this segment of the brigade MUST be sent before returning. */
-
- if (APLOGctrace6(c)) {
- char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
- "FLUSH bucket" :
- (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
- "THRESHOLD_MAX_BUFFER" :
- morphing_bucket_in_brigade ? "morphing bucket" :
- "MAX_REQUESTS_IN_PIPELINE";
- ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
- "core_output_filter: flushing because of %s",
- reason);
- }
- /*
- * Defer the actual blocking write to avoid doing many writes.
+ /* Scan through the brigade and decide whether we must absolutely
+ * flush the remaining data, based on ap_filter_reinstate_brigade()
+ * rules. If so, wait for writability and retry, otherwise we did
+ * our best already and can wait for the next call.
*/
- flush_upto = next;
-
- bytes_in_brigade = 0;
- non_file_bytes_in_brigade = 0;
- eor_buckets_in_brigade = 0;
- morphing_bucket_in_brigade = 0;
- }
- }
-
- if (flush_upto != NULL) {
- ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
- ctx->tmp_flush_bb);
- rv = send_brigade_blocking(net->client_socket, bb,
- &(ctx->bytes_written), c);
- if (rv != APR_SUCCESS) {
- /* The client has aborted the connection */
- c->aborted = 1;
- return rv;
- }
- APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
- }
-
- if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
- rv = send_brigade_nonblocking(net->client_socket, bb,
- &(ctx->bytes_written), c);
- if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
- /* The client has aborted the connection */
- c->aborted = 1;
- return rv;
+ apr_bucket *flush_upto;
+ ap_filter_reinstate_brigade(f, bb, &flush_upto);
+ if (flush_upto) {
+ apr_int32_t nfd;
+ apr_pollfd_t pfd;
+ memset(&pfd, 0, sizeof(pfd));
+ pfd.reqevents = APR_POLLOUT;
+ pfd.desc_type = APR_POLL_SOCKET;
+ pfd.desc.s = sock;
+ pfd.p = c->pool;
+ do {
+ rv = apr_poll(&pfd, 1, &nfd, sock_timeout);
+ } while (APR_STATUS_IS_EINTR(rv));
+ }
}
- }
+ } while (rv == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb));
- setaside_remaining_output(f, ctx, bb, c);
- return APR_SUCCESS;
-}
+ /* Restore original socket timeout before leaving. */
+ apr_socket_timeout_set(sock, sock_timeout);
-/*
- * This function assumes that either ctx->buffered_bb == NULL, or
- * ctx->buffered_bb is empty, or ctx->buffered_bb == bb
- */
-static void setaside_remaining_output(ap_filter_t *f,
- core_output_filter_ctx_t *ctx,
- apr_bucket_brigade *bb,
- conn_rec *c)
-{
- if (bb == NULL) {
- return;
- }
- remove_empty_buckets(bb);
- if (!APR_BRIGADE_EMPTY(bb)) {
- c->data_in_output_filters = 1;
- if (bb != ctx->buffered_bb) {
- if (!ctx->deferred_write_pool) {
- apr_pool_create(&ctx->deferred_write_pool, c->pool);
- apr_pool_tag(ctx->deferred_write_pool, "deferred_write");
- }
- ap_save_brigade(f, &(ctx->buffered_bb), &bb,
- ctx->deferred_write_pool);
- apr_brigade_cleanup(bb);
- }
- }
- else if (ctx->deferred_write_pool) {
+ if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) {
+ /* The client has aborted the connection */
+ ap_log_cerror(
+ APLOG_MARK, APLOG_TRACE1, rv, c,
+ "core_output_filter: writing data to the network");
/*
- * There are no more requests in the pipeline. We can just clear the
- * pool.
+ * Set c->aborted before apr_brigade_cleanup to have the correct status
+ * when logging the request as apr_brigade_cleanup triggers the logging
+ * of the request if it contains an EOR bucket.
*/
- apr_pool_clear(ctx->deferred_write_pool);
+ c->aborted = 1;
+ apr_brigade_cleanup(bb);
+ return rv;
}
+
+ return ap_filter_setaside_brigade(f, bb);
}
#ifndef APR_MAX_IOVEC_SIZE
-#define MAX_IOVEC_TO_WRITE 16
+#define NVEC_MIN 16
+#define NVEC_MAX NVEC_MIN
#else
#if APR_MAX_IOVEC_SIZE > 16
-#define MAX_IOVEC_TO_WRITE 16
+#define NVEC_MIN 16
#else
-#define MAX_IOVEC_TO_WRITE APR_MAX_IOVEC_SIZE
+#define NVEC_MIN APR_MAX_IOVEC_SIZE
#endif
+#define NVEC_MAX APR_MAX_IOVEC_SIZE
+#endif
+
+static APR_INLINE int is_in_memory_bucket(apr_bucket *b)
+{
+ /* These buckets' data are already in memory. */
+ return APR_BUCKET_IS_HEAP(b)
+ || APR_BUCKET_IS_POOL(b)
+ || APR_BUCKET_IS_TRANSIENT(b)
+ || APR_BUCKET_IS_IMMORTAL(b);
+}
+
+#if APR_HAS_SENDFILE
+static APR_INLINE int can_sendfile_bucket(apr_bucket *b)
+{
+ /* Use sendfile to send the bucket unless:
+ * - the bucket is not a file bucket, or
+ * - the file is too small for sendfile to be useful, or
+ * - sendfile is disabled in the httpd config via "EnableSendfile off".
+ */
+ if (APR_BUCKET_IS_FILE(b) && b->length >= AP_MIN_SENDFILE_BYTES) {
+ apr_file_t *file = ((apr_bucket_file *)b->data)->fd;
+ return apr_file_flags_get(file) & APR_SENDFILE_ENABLED;
+ }
+ else {
+ return 0;
+ }
+}
#endif
static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
apr_bucket_brigade *bb,
- apr_size_t *bytes_written,
+ core_output_filter_ctx_t *ctx,
conn_rec *c)
{
+ apr_status_t rv = APR_SUCCESS;
+ core_server_config *conf =
+ ap_get_core_module_config(c->base_server->module_config);
+ apr_size_t nvec = 0, nbytes = 0;
apr_bucket *bucket, *next;
- apr_status_t rv;
- struct iovec vec[MAX_IOVEC_TO_WRITE];
- apr_size_t nvec = 0;
-
- remove_empty_buckets(bb);
+ const char *data;
+ apr_size_t length;
for (bucket = APR_BRIGADE_FIRST(bb);
bucket != APR_BRIGADE_SENTINEL(bb);
bucket = next) {
next = APR_BUCKET_NEXT(bucket);
-#if APR_HAS_SENDFILE
- if (APR_BUCKET_IS_FILE(bucket)) {
- apr_bucket_file *file_bucket = (apr_bucket_file *)(bucket->data);
- apr_file_t *fd = file_bucket->fd;
- /* Use sendfile to send this file unless:
- * - the platform doesn't support sendfile,
- * - the file is too small for sendfile to be useful, or
- * - sendfile is disabled in the httpd config via "EnableSendfile off"
- */
- if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) &&
- (bucket->length >= AP_MIN_SENDFILE_BYTES)) {
- if (nvec > 0) {
- (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
- rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
- nvec = 0;
- if (rv != APR_SUCCESS) {
- (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
- return rv;
- }
- }
- rv = sendfile_nonblocking(s, bucket, bytes_written, c);
- if (nvec > 0) {
- (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
- }
+#if APR_HAS_SENDFILE
+ if (can_sendfile_bucket(bucket)) {
+ if (nvec > 0) {
+ (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
+ rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c);
if (rv != APR_SUCCESS) {
- return rv;
+ goto cleanup;
}
- break;
+ nbytes = 0;
+ nvec = 0;
}
+ rv = sendfile_nonblocking(s, bucket, ctx, c);
+ if (rv != APR_SUCCESS) {
+ goto cleanup;
+ }
+ continue;
}
#endif /* APR_HAS_SENDFILE */
- /* didn't sendfile */
- if (!APR_BUCKET_IS_METADATA(bucket)) {
- const char *data;
- apr_size_t length;
-
+
+ if (bucket->length) {
/* Non-blocking read first, in case this is a morphing
* bucket type. */
rv = apr_bucket_read(bucket, &data, &length, APR_NONBLOCK_READ);
if (APR_STATUS_IS_EAGAIN(rv)) {
/* Read would block; flush any pending data and retry. */
if (nvec) {
- rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
- if (rv) {
- return rv;
+ rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c);
+ if (rv != APR_SUCCESS) {
+ goto cleanup;
}
+ nbytes = 0;
nvec = 0;
}
-
+ (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
+
rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
}
if (rv != APR_SUCCESS) {
- return rv;
+ goto cleanup;
}
/* reading may have split the bucket, so recompute next: */
next = APR_BUCKET_NEXT(bucket);
- vec[nvec].iov_base = (char *)data;
- vec[nvec].iov_len = length;
- nvec++;
- if (nvec == MAX_IOVEC_TO_WRITE) {
- rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
- nvec = 0;
- if (rv != APR_SUCCESS) {
- return rv;
- }
- break;
- }
}
- }
- if (nvec > 0) {
- rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
- if (rv != APR_SUCCESS) {
- return rv;
+ if (!bucket->length) {
+ /* Don't delete empty buckets until all the previous ones have been
+ * sent (nvec == 0); this must happen in sequence since metabuckets
+ * like EOR could free the data still pointed to by the iovec. So
+ * unless the latter is empty, let writev_nonblocking() clean up the
+ * brigade in order.
+ */
+ if (!nvec) {
+ apr_bucket_delete(bucket);
+ }
+ continue;
}
- }
-
- remove_empty_buckets(bb);
- return APR_SUCCESS;
-}
-
-static void remove_empty_buckets(apr_bucket_brigade *bb)
-{
- apr_bucket *bucket;
- while (((bucket = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) &&
- (APR_BUCKET_IS_METADATA(bucket) || (bucket->length == 0))) {
- APR_BUCKET_REMOVE(bucket);
- apr_bucket_destroy(bucket);
- }
-}
-
-static apr_status_t send_brigade_blocking(apr_socket_t *s,
- apr_bucket_brigade *bb,
- apr_size_t *bytes_written,
- conn_rec *c)
-{
- apr_status_t rv;
-
- rv = APR_SUCCESS;
- while (!APR_BRIGADE_EMPTY(bb)) {
- rv = send_brigade_nonblocking(s, bb, bytes_written, c);
- if (rv != APR_SUCCESS) {
- if (APR_STATUS_IS_EAGAIN(rv)) {
- /* Wait until we can send more data */
- apr_int32_t nsds;
- apr_interval_time_t timeout;
- apr_pollfd_t pollset;
-
- pollset.p = c->pool;
- pollset.desc_type = APR_POLL_SOCKET;
- pollset.reqevents = APR_POLLOUT;
- pollset.desc.s = s;
- apr_socket_timeout_get(s, &timeout);
- rv = apr_poll(&pollset, 1, &nsds, timeout);
+ /* Make sure that these new data fit in our iovec. */
+ if (nvec == ctx->nvec) {
+ if (nvec == NVEC_MAX) {
+ (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
+ rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c);
if (rv != APR_SUCCESS) {
- break;
+ goto cleanup;
}
+ nbytes = 0;
+ nvec = 0;
}
else {
- break;
+ struct iovec *newvec;
+ apr_size_t newn = nvec * 2;
+ if (newn < NVEC_MIN) {
+ newn = NVEC_MIN;
+ }
+ else if (newn > NVEC_MAX) {
+ newn = NVEC_MAX;
+ }
+ newvec = apr_palloc(c->pool, newn * sizeof(struct iovec));
+ if (nvec) {
+ memcpy(newvec, ctx->vec, nvec * sizeof(struct iovec));
+ }
+ ctx->vec = newvec;
+ ctx->nvec = newn;
}
}
+ nbytes += length;
+ ctx->vec[nvec].iov_base = (void *)data;
+ ctx->vec[nvec].iov_len = length;
+ nvec++;
+
+ /* Flush above max threshold, unless the next bucket is an in-memory
+ * bucket which we want to try writing in the same pass (if
+ * we are at the end of the brigade, the write will happen outside
+ * the loop anyway).
+ */
+ if (nbytes >= conf->flush_max_threshold
+ && next != APR_BRIGADE_SENTINEL(bb)
+ && !is_in_memory_bucket(next)) {
+ (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
+ rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c);
+ if (rv != APR_SUCCESS) {
+ goto cleanup;
+ }
+ nbytes = 0;
+ nvec = 0;
+ }
+ }
+ if (nvec > 0) {
+ rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c);
}
+
+cleanup:
+ (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
return rv;
}
static apr_status_t writev_nonblocking(apr_socket_t *s,
- struct iovec *vec, apr_size_t nvec,
apr_bucket_brigade *bb,
- apr_size_t *cumulative_bytes_written,
+ core_output_filter_ctx_t *ctx,
+ apr_size_t bytes_to_write,
+ apr_size_t nvec,
conn_rec *c)
{
- apr_status_t rv = APR_SUCCESS, arv;
- apr_size_t bytes_written = 0, bytes_to_write = 0;
- apr_size_t i, offset;
- apr_interval_time_t old_timeout;
-
- arv = apr_socket_timeout_get(s, &old_timeout);
- if (arv != APR_SUCCESS) {
- return arv;
- }
- arv = apr_socket_timeout_set(s, 0);
- if (arv != APR_SUCCESS) {
- return arv;
- }
+ apr_status_t rv;
+ struct iovec *vec = ctx->vec;
+ apr_size_t bytes_written = 0;
+ apr_size_t i, offset = 0;
- for (i = 0; i < nvec; i++) {
- bytes_to_write += vec[i].iov_len;
- }
- offset = 0;
- while (bytes_written < bytes_to_write) {
+ do {
apr_size_t n = 0;
rv = apr_socket_sendv(s, vec + offset, nvec - offset, &n);
- if (n > 0) {
- bytes_written += n;
- for (i = offset; i < nvec; ) {
- apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
- if (APR_BUCKET_IS_METADATA(bucket)) {
- APR_BUCKET_REMOVE(bucket);
- apr_bucket_destroy(bucket);
- }
- else if (n >= vec[i].iov_len) {
- APR_BUCKET_REMOVE(bucket);
- apr_bucket_destroy(bucket);
- offset++;
- n -= vec[i++].iov_len;
- }
- else {
+ bytes_written += n;
+
+ for (i = offset; i < nvec; ) {
+ apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
+ if (!bucket->length) {
+ apr_bucket_delete(bucket);
+ }
+ else if (n >= vec[i].iov_len) {
+ apr_bucket_delete(bucket);
+ n -= vec[i++].iov_len;
+ offset++;
+ }
+ else {
+ if (n) {
apr_bucket_split(bucket, n);
- APR_BUCKET_REMOVE(bucket);
- apr_bucket_destroy(bucket);
+ apr_bucket_delete(bucket);
vec[i].iov_len -= n;
vec[i].iov_base = (char *) vec[i].iov_base + n;
- break;
}
+ break;
}
}
- if (rv != APR_SUCCESS) {
- break;
- }
- }
+ } while (rv == APR_SUCCESS && bytes_written < bytes_to_write);
+
if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) {
ap__logio_add_bytes_out(c, bytes_written);
}
- *cumulative_bytes_written += bytes_written;
+ ctx->bytes_written += bytes_written;
- arv = apr_socket_timeout_set(s, old_timeout);
- if ((arv != APR_SUCCESS) && (rv == APR_SUCCESS)) {
- return arv;
- }
- else {
- return rv;
- }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, rv, c,
+ "writev_nonblocking: %"APR_SIZE_T_FMT"/%"APR_SIZE_T_FMT,
+ bytes_written, bytes_to_write);
+ return rv;
}
#if APR_HAS_SENDFILE
static apr_status_t sendfile_nonblocking(apr_socket_t *s,
apr_bucket *bucket,
- apr_size_t *cumulative_bytes_written,
+ core_output_filter_ctx_t *ctx,
conn_rec *c)
{
- apr_status_t rv = APR_SUCCESS;
- apr_bucket_file *file_bucket;
- apr_file_t *fd;
- apr_size_t file_length;
- apr_off_t file_offset;
- apr_size_t bytes_written = 0;
+ apr_status_t rv;
+ apr_file_t *file = ((apr_bucket_file *)bucket->data)->fd;
+ apr_size_t bytes_written = bucket->length; /* bytes_to_write for now */
+ apr_off_t file_offset = bucket->start;
- if (!APR_BUCKET_IS_FILE(bucket)) {
- ap_log_error(APLOG_MARK, APLOG_ERR, rv, c->base_server, APLOGNO(00006)
- "core_filter: sendfile_nonblocking: "
- "this should never happen");
- return APR_EGENERAL;
- }
- file_bucket = (apr_bucket_file *)(bucket->data);
- fd = file_bucket->fd;
- file_length = bucket->length;
- file_offset = bucket->start;
-
- if (bytes_written < file_length) {
- apr_size_t n = file_length - bytes_written;
- apr_status_t arv;
- apr_interval_time_t old_timeout;
-
- arv = apr_socket_timeout_get(s, &old_timeout);
- if (arv != APR_SUCCESS) {
- return arv;
- }
- arv = apr_socket_timeout_set(s, 0);
- if (arv != APR_SUCCESS) {
- return arv;
- }
- rv = apr_socket_sendfile(s, fd, NULL, &file_offset, &n, 0);
- if (rv == APR_SUCCESS) {
- bytes_written += n;
- file_offset += n;
- }
- arv = apr_socket_timeout_set(s, old_timeout);
- if ((arv != APR_SUCCESS) && (rv == APR_SUCCESS)) {
- rv = arv;
- }
- }
+ rv = apr_socket_sendfile(s, file, NULL, &file_offset, &bytes_written, 0);
if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) {
ap__logio_add_bytes_out(c, bytes_written);
}
- *cumulative_bytes_written += bytes_written;
- if ((bytes_written < file_length) && (bytes_written > 0)) {
- apr_bucket_split(bucket, bytes_written);
- APR_BUCKET_REMOVE(bucket);
- apr_bucket_destroy(bucket);
+ ctx->bytes_written += bytes_written;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, rv, c,
+ "sendfile_nonblocking: %" APR_SIZE_T_FMT "/%" APR_SIZE_T_FMT,
+ bytes_written, bucket->length);
+ if (bytes_written >= bucket->length) {
+ apr_bucket_delete(bucket);
}
- else if (bytes_written == file_length) {
- APR_BUCKET_REMOVE(bucket);
- apr_bucket_destroy(bucket);
+ else if (bytes_written > 0) {
+ apr_bucket_split(bucket, bytes_written);
+ apr_bucket_delete(bucket);
+ if (rv == APR_SUCCESS) {
+ rv = APR_EAGAIN;
+ }
}
return rv;
}