]> granicus.if.org Git - apache/commitdiff
Work in Progress.
authorPaul Querna <pquerna@apache.org>
Sat, 28 Mar 2009 13:24:18 +0000 (13:24 +0000)
committerPaul Querna <pquerna@apache.org>
Sat, 28 Mar 2009 13:24:18 +0000 (13:24 +0000)
Convert the 100ms timed callback to a single cleanup callback that is added
when the request is ready to finish. Basically works, though it has
some issues with flushing and closing the connection.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@759460 13f79535-47bb-0310-9956-ffa450edef68

modules/proxy/mod_serf.c

index 667b07f862041132c024eb664331d54917b05482..49e4cb5cd270f2e9f83704875b4c6f706177a727 100644 (file)
@@ -20,6 +20,7 @@
 #include "http_core.h"
 #include "http_config.h"
 #include "http_protocol.h"
+#include "http_request.h"
 #include "http_log.h"
 
 #include "serf.h"
@@ -53,12 +54,41 @@ typedef struct {
     int done_headers;
     int keep_reading;
     request_rec *r;
+    apr_pool_t *serf_pool;
+    apr_bucket_brigade *tmpbb;
     serf_config_t *conf;
     serf_ssl_context_t *ssl_ctx;
     serf_bucket_alloc_t *bkt_alloc;
 } s_baton_t;
 
 
+/**
+ * This works right now because all timers are invoked in the single listener
+ * thread in the Event MPM -- the same thread that serf callbacks are made
+ * from, so we don't technically need a mutex yet, but with the Simple MPM,
+ * invocations are made from worker threads, and we need to figure out locking
+ */
+static void timed_cleanup_callback(void *baton)
+{
+    s_baton_t *ctx = baton;
+    
+    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: timed_cleanup_callback");
+
+    /* Causes all serf connections to unregister from the event mpm */
+    apr_pool_destroy(ctx->serf_pool);
+    if (ctx->rstatus) {
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r,
+                      "serf: request returned: %d", ctx->rstatus);
+        ctx->r->status = HTTP_OK;
+        ap_die(ctx->rstatus, ctx->r);
+    }
+    else {
+        ap_finalize_request_protocol(ctx->r);
+        ap_process_request_after_handler(ctx->r);
+        return;
+    }
+}
+
 static void closed_connection(serf_connection_t *conn,
                               void *closed_baton,
                               apr_status_t why,
@@ -66,12 +96,18 @@ static void closed_connection(serf_connection_t *conn,
 {
     s_baton_t *ctx = closed_baton;
 
+    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: closed_connection");
+
     if (why) {
         /* justin says that error handling isn't done yet. hah. */
         /* XXXXXX: review */
         ap_log_rerror(APLOG_MARK, APLOG_ERR, why, ctx->r, "Closed Connection Error");
         ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
-        return;
+    }
+
+    if (mpm_supprts_serf) {
+        ap_mpm_register_timed_callback(apr_time_from_msec(1),
+                                       timed_cleanup_callback, ctx);
     }
 }
 
@@ -82,6 +118,8 @@ static serf_bucket_t* conn_setup(apr_socket_t *sock,
     serf_bucket_t *c;
     s_baton_t *ctx = setup_baton;
 
+    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: conn_setup ");
+
     c = serf_bucket_socket_create(sock, ctx->bkt_alloc);
     if (ctx->want_ssl) {
         c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc);
@@ -196,9 +234,12 @@ static serf_bucket_t* accept_response(serf_request_t *request,
                                       void *acceptor_baton,
                                       apr_pool_t *pool)
 {
+    s_baton_t *ctx = acceptor_baton;
     serf_bucket_t *c;
     serf_bucket_alloc_t *bkt_alloc;
 
+    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: accept_response");
+
     /* get the per-request bucket allocator */
     bkt_alloc = serf_request_get_alloc(request);
 
@@ -219,6 +260,8 @@ static apr_status_t handle_response(serf_request_t *request,
     apr_size_t len;
     serf_status_line sl;
 
+    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: handle_response");
+
     /* XXXXXXX: Create better error message. */
     rv = serf_bucket_response_status(response, &sl);
     if (rv) {
@@ -230,6 +273,11 @@ static apr_status_t handle_response(serf_request_t *request,
 
         ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
 
+        if (mpm_supprts_serf) {
+            ap_mpm_register_timed_callback(apr_time_from_msec(1),
+                                           timed_cleanup_callback, ctx);
+        }
+
         return rv;
     }
     
@@ -239,6 +287,8 @@ static apr_status_t handle_response(serf_request_t *request,
      **/
 
     do {
+        apr_bucket *e;
+        apr_brigade_cleanup(ctx->tmpbb);
         rv = serf_bucket_read(response, AP_IOBUFSIZE, &data, &len);
 
         if (SERF_BUCKET_READ_ERROR(rv)) {
@@ -252,15 +302,32 @@ static apr_status_t handle_response(serf_request_t *request,
             serf_bucket_headers_do(hdrs, copy_headers_out, ctx);
             ctx->done_headers = 1;
         }
-        
-        /* XXXX: write to brigades and stuff. meh */
-        ap_rwrite(data, len, ctx->r);
+
+
+        if (len > 0) {
+            /* TODO: make APR bucket <-> serf bucket stuff more magical. */
+            e = apr_bucket_immortal_create(data, len, ctx->r->connection->bucket_alloc);
+            APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
+        }
+
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: writing %"APR_SIZE_T_FMT" bytes", len);
 
         if (APR_STATUS_IS_EOF(rv)) {
             ctx->keep_reading = 0;
+            e = apr_bucket_flush_create(ctx->r->connection->bucket_alloc);
+            APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
+
+            ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
+
+            if (mpm_supprts_serf) {
+                ap_mpm_register_timed_callback(apr_time_from_msec(1),
+                                               timed_cleanup_callback, ctx);
+            }
             return APR_EOF;
         }
 
+        ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
+
         /* XXXX: Should we send a flush now? */
         if (APR_STATUS_IS_EAGAIN(rv)) {
             return APR_SUCCESS;
@@ -283,6 +350,7 @@ static apr_status_t setup_request(serf_request_t *request,
     serf_bucket_t *hdrs_bkt;
     serf_bucket_t *body_bkt = NULL;
 
+    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: setup_request");
 
     /* XXXXX: handle incoming request bodies */
     *req_bkt = serf_bucket_request_create(ctx->r->method, ctx->r->unparsed_uri, body_bkt,
@@ -326,28 +394,6 @@ static apr_status_t setup_request(serf_request_t *request,
     return APR_SUCCESS;
 }
 
-static void 
-timed_callback(void *baton)
-{
-    s_baton_t *ctx = baton;
-
-    if (ctx->keep_reading) {
-        ap_mpm_register_timed_callback(apr_time_from_msec(100), timed_callback, baton);
-    }
-    else if (ctx->rstatus) {
-        ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r,
-                      "serf: request returned: %d", ctx->rstatus);
-        ctx->r->status = HTTP_OK;
-        ap_die(ctx->rstatus, ctx->r);
-    }
-    else {
-        ap_finalize_request_protocol(ctx->r);
-        ap_process_request_after_handler(ctx->r);
-        return;
-    }
-}
-
-
 #ifndef apr_time_from_msec
 #define apr_time_from_msec(x) (x * 1000)
 #endif
@@ -356,7 +402,7 @@ timed_callback(void *baton)
 static int drive_serf(request_rec *r, serf_config_t *conf)
 {
     apr_status_t rv;
-    apr_pool_t *pool = r->pool;
+    apr_pool_t *pool;
     apr_sockaddr_t *address;
     s_baton_t *baton = apr_palloc(r->pool, sizeof(s_baton_t));
     /* XXXXX: make persistent/per-process or something.*/
@@ -366,7 +412,12 @@ static int drive_serf(request_rec *r, serf_config_t *conf)
     serf_server_config_t *ctx = 
         (serf_server_config_t *)ap_get_module_config(r->server->module_config,
                                                      &serf_module);
-    
+
+    /* Allocate everything out of a subpool, with a shorter lifetime than
+     * the main request, so that we can cleanup safely and remove our events
+     * from the main serf context in the async mpm mode.
+     */
+    apr_pool_create(&pool, r->pool);
     if (strcmp(conf->url.scheme, "cluster") == 0) {
         int rc;
         ap_serf_cluster_provider_t *cp;
@@ -449,10 +500,12 @@ static int drive_serf(request_rec *r, serf_config_t *conf)
 
     baton->r = r;
     baton->conf = conf;
+    baton->serf_pool = pool;
     baton->bkt_alloc = serf_bucket_allocator_create(pool, NULL, NULL);
     baton->ssl_ctx = NULL;
     baton->rstatus = OK;
 
+    baton->tmpbb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
     baton->done_headers = 0;
     baton->keep_reading = 1;
 
@@ -472,14 +525,7 @@ static int drive_serf(request_rec *r, serf_config_t *conf)
                                               baton);
 
     if (mpm_supprts_serf) {
-
-        rv = ap_mpm_register_timed_callback(apr_time_from_msec(100), timed_callback, baton);
-        
-        if (rv != APR_SUCCESS) {
-            ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "ap_mpm_register_timed_callback failed.");
-            return HTTP_INTERNAL_SERVER_ERROR;       
-        }
-
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, baton->r, "handing off serf request to mpm");
         return SUSPENDED;
     }
     else {