#include "http_core.h"
#include "http_config.h"
#include "http_protocol.h"
+#include "http_request.h"
#include "http_log.h"
#include "serf.h"
int done_headers;
int keep_reading;
request_rec *r;
+ apr_pool_t *serf_pool;
+ apr_bucket_brigade *tmpbb;
serf_config_t *conf;
serf_ssl_context_t *ssl_ctx;
serf_bucket_alloc_t *bkt_alloc;
} s_baton_t;
+/**
+ * This works right now because all timers are invoked in the single listener
+ * thread in the Event MPM -- the same thread that serf callbacks are made
+ * from, so we don't technically need a mutex yet, but with the Simple MPM,
+ * invocations are made from worker threads, and we need to figure out locking
+ */
+static void timed_cleanup_callback(void *baton)
+{
+ s_baton_t *ctx = baton;
+
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: timed_cleanup_callback");
+
+ /* Causes all serf connections to unregister from the event mpm */
+ apr_pool_destroy(ctx->serf_pool);
+ if (ctx->rstatus) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r,
+ "serf: request returned: %d", ctx->rstatus);
+ ctx->r->status = HTTP_OK;
+ ap_die(ctx->rstatus, ctx->r);
+ }
+ else {
+ ap_finalize_request_protocol(ctx->r);
+ ap_process_request_after_handler(ctx->r);
+ return;
+ }
+}
+
static void closed_connection(serf_connection_t *conn,
void *closed_baton,
apr_status_t why,
{
s_baton_t *ctx = closed_baton;
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: closed_connection");
+
if (why) {
/* justin says that error handling isn't done yet. hah. */
/* XXXXXX: review */
ap_log_rerror(APLOG_MARK, APLOG_ERR, why, ctx->r, "Closed Connection Error");
ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
- return;
+ }
+
+ if (mpm_supprts_serf) {
+ ap_mpm_register_timed_callback(apr_time_from_msec(1),
+ timed_cleanup_callback, ctx);
}
}
serf_bucket_t *c;
s_baton_t *ctx = setup_baton;
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: conn_setup ");
+
c = serf_bucket_socket_create(sock, ctx->bkt_alloc);
if (ctx->want_ssl) {
c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc);
void *acceptor_baton,
apr_pool_t *pool)
{
+ s_baton_t *ctx = acceptor_baton;
serf_bucket_t *c;
serf_bucket_alloc_t *bkt_alloc;
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: accept_response");
+
/* get the per-request bucket allocator */
bkt_alloc = serf_request_get_alloc(request);
apr_size_t len;
serf_status_line sl;
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: handle_response");
+
/* XXXXXXX: Create better error message. */
rv = serf_bucket_response_status(response, &sl);
if (rv) {
ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
+ if (mpm_supprts_serf) {
+ ap_mpm_register_timed_callback(apr_time_from_msec(1),
+ timed_cleanup_callback, ctx);
+ }
+
return rv;
}
**/
do {
+ apr_bucket *e;
+ apr_brigade_cleanup(ctx->tmpbb);
rv = serf_bucket_read(response, AP_IOBUFSIZE, &data, &len);
if (SERF_BUCKET_READ_ERROR(rv)) {
serf_bucket_headers_do(hdrs, copy_headers_out, ctx);
ctx->done_headers = 1;
}
-
- /* XXXX: write to brigades and stuff. meh */
- ap_rwrite(data, len, ctx->r);
+
+
+ if (len > 0) {
+ /* TODO: make APR bucket <-> serf bucket stuff more magical. */
+ e = apr_bucket_immortal_create(data, len, ctx->r->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
+ }
+
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: writing %"APR_SIZE_T_FMT" bytes", len);
if (APR_STATUS_IS_EOF(rv)) {
ctx->keep_reading = 0;
+ e = apr_bucket_flush_create(ctx->r->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
+
+ ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
+
+ if (mpm_supprts_serf) {
+ ap_mpm_register_timed_callback(apr_time_from_msec(1),
+ timed_cleanup_callback, ctx);
+ }
return APR_EOF;
}
+ ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
+
/* XXXX: Should we send a flush now? */
if (APR_STATUS_IS_EAGAIN(rv)) {
return APR_SUCCESS;
serf_bucket_t *hdrs_bkt;
serf_bucket_t *body_bkt = NULL;
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: setup_request");
/* XXXXX: handle incoming request bodies */
*req_bkt = serf_bucket_request_create(ctx->r->method, ctx->r->unparsed_uri, body_bkt,
return APR_SUCCESS;
}
-static void
-timed_callback(void *baton)
-{
- s_baton_t *ctx = baton;
-
- if (ctx->keep_reading) {
- ap_mpm_register_timed_callback(apr_time_from_msec(100), timed_callback, baton);
- }
- else if (ctx->rstatus) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r,
- "serf: request returned: %d", ctx->rstatus);
- ctx->r->status = HTTP_OK;
- ap_die(ctx->rstatus, ctx->r);
- }
- else {
- ap_finalize_request_protocol(ctx->r);
- ap_process_request_after_handler(ctx->r);
- return;
- }
-}
-
-
#ifndef apr_time_from_msec
#define apr_time_from_msec(x) (x * 1000)
#endif
static int drive_serf(request_rec *r, serf_config_t *conf)
{
apr_status_t rv;
- apr_pool_t *pool = r->pool;
+ apr_pool_t *pool;
apr_sockaddr_t *address;
s_baton_t *baton = apr_palloc(r->pool, sizeof(s_baton_t));
/* XXXXX: make persistent/per-process or something.*/
serf_server_config_t *ctx =
(serf_server_config_t *)ap_get_module_config(r->server->module_config,
&serf_module);
-
+
+ /* Allocate everything out of a subpool, with a shorter lifetime than
+ * the main request, so that we can cleanup safely and remove our events
+ * from the main serf context in the async mpm mode.
+ */
+ apr_pool_create(&pool, r->pool);
if (strcmp(conf->url.scheme, "cluster") == 0) {
int rc;
ap_serf_cluster_provider_t *cp;
baton->r = r;
baton->conf = conf;
+ baton->serf_pool = pool;
baton->bkt_alloc = serf_bucket_allocator_create(pool, NULL, NULL);
baton->ssl_ctx = NULL;
baton->rstatus = OK;
+ baton->tmpbb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
baton->done_headers = 0;
baton->keep_reading = 1;
baton);
if (mpm_supprts_serf) {
-
- rv = ap_mpm_register_timed_callback(apr_time_from_msec(100), timed_callback, baton);
-
- if (rv != APR_SUCCESS) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "ap_mpm_register_timed_callback failed.");
- return HTTP_INTERNAL_SERVER_ERROR;
- }
-
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, baton->r, "handing off serf request to mpm");
return SUSPENDED;
}
else {