]> granicus.if.org Git - curl/commitdiff
http2: force "drainage" of streams
authorDaniel Stenberg <daniel@haxx.se>
Thu, 30 Apr 2015 06:20:49 +0000 (08:20 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Mon, 18 May 2015 06:57:17 +0000 (08:57 +0200)
... which is necessary since the socket won't be readable but there is
data waiting in the buffer.

lib/http.c
lib/http.h
lib/http2.c
lib/multi.c
lib/transfer.c
lib/url.c
lib/urldata.h

index 7aa258d40c8c105f8958d3e8e66d9ac9d9eb9f31..cf2ee1c373bd69b8075f53b5ac38b2ea8a424ec0 100644 (file)
@@ -174,6 +174,7 @@ CURLcode Curl_http_setup_conn(struct connectdata *conn)
   /* where to store incoming data for this stream and how big the buffer is */
   http->mem = conn->data->state.buffer;
   http->len = BUFSIZE;
+  http->memlen = 0;
 
   return CURLE_OK;
 }
index 50db486d28ad8bfa755ecbe46f02b57bc2cbcc68..a2f702d024da948e614a1cac23ca0f47e17329ca 100644 (file)
@@ -169,6 +169,7 @@ struct HTTP {
 
   char *mem;     /* points to a buffer in memory to store received data */
   size_t len;    /* size of the buffer 'mem' points to */
+  size_t memlen; /* size of data copied to mem */
 };
 
 typedef int (*sending)(void); /* Curl_send */
index ca8299fd2485ad46cb704d02ae95705c293dbccc..42f31290f2b30a30d360f123273d9d9c7d593fd6 100644 (file)
@@ -195,8 +195,8 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
 
   (void)session;
   (void)frame;
-  DEBUGF(infof(conn->data, "on_frame_recv() was called with header %x\n",
-               frame->hd.type));
+  DEBUGF(infof(conn->data, "on_frame_recv() header %x stream %x\n",
+               frame->hd.type, stream_id));
 
   if(stream_id) {
     /* get the stream from the hash based on Stream ID, stream ID zero is for
@@ -267,14 +267,25 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
     left = stream->header_recvbuf->size_used - stream->nread_header_recvbuf;
     ncopy = MIN(stream->len, left);
 
-    memcpy(stream->mem, stream->header_recvbuf->buffer +
-           stream->nread_header_recvbuf, ncopy);
+    memcpy(&stream->mem[stream->memlen],
+           stream->header_recvbuf->buffer + stream->nread_header_recvbuf,
+           ncopy);
     stream->nread_header_recvbuf += ncopy;
 
-    stream->mem += ncopy;
+    DEBUGF(infof(data_s, "Store %zu bytes headers from stream %x at %p\n",
+                 ncopy, stream_id, stream->mem));
+
     stream->len -= ncopy;
+    stream->memlen += ncopy;
+
+    stream->mem[stream->memlen] = 0; /* DEBUG, remove this */
+
+    DEBUGF(infof(data_s, "BUF: %s", stream->mem));
+
+    data_s->state.drain++;
     break;
   case NGHTTP2_PUSH_PROMISE:
+    DEBUGF(infof(data_s, "Got PUSH_PROMISE, RST_STREAM it!\n"));
     rv = nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE,
                                    frame->push_promise.promised_stream_id,
                                    NGHTTP2_CANCEL);
@@ -328,14 +339,19 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
   stream = data_s->req.protop;
 
   nread = MIN(stream->len, len);
-  memcpy(stream->mem, data, nread);
+  memcpy(&stream->mem[stream->memlen], data, nread);
 
-  stream->mem += nread;
   stream->len -= nread;
+  stream->memlen += nread;
+
+  data_s->state.drain++;
+  /* TODO: this may need to set expire for the multi_socket to work for this
+     stream */
 
   DEBUGF(infof(conn->data, "%zu data received for stream %x "
-               "(%zu left in buffer)\n",
-               nread, stream_id, stream->len));
+               "(%zu left in buffer %p)\n",
+               nread, stream_id,
+               stream->len, stream->mem));
 
   if(nread < len) {
     stream->data = data + nread;
@@ -528,7 +544,10 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
     Curl_add_buffer(stream->header_recvbuf, "HTTP/2.0 ", 9);
     Curl_add_buffer(stream->header_recvbuf, value, valuelen);
     Curl_add_buffer(stream->header_recvbuf, "\r\n", 2);
+    data_s->state.drain++;
 
+    DEBUGF(infof(data_s, "h2 status: HTTP/2 %03d\n",
+                 stream->status_code));
     return 0;
   }
   else {
@@ -550,6 +569,7 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
     Curl_add_buffer(stream->header_recvbuf, ":", 1);
     Curl_add_buffer(stream->header_recvbuf, value, valuelen);
     Curl_add_buffer(stream->header_recvbuf, "\r\n", 2);
+    data_s->state.drain++;
 
     DEBUGF(infof(data_s, "h2 header: %.*s: %.*s\n",
                  namelen, name, valuelen, value));
@@ -737,7 +757,8 @@ static ssize_t http2_recv(struct connectdata *conn, int sockindex,
   ssize_t rv;
   ssize_t nread;
   struct http_conn *httpc = &conn->proto.httpc;
-  struct HTTP *stream = conn->data->req.protop;
+  struct SessionHandle *data = conn->data;
+  struct HTTP *stream = data->req.protop;
 
   (void)sockindex; /* we always do HTTP2 on sockindex 0 */
 
@@ -768,7 +789,7 @@ static ssize_t http2_recv(struct connectdata *conn, int sockindex,
            ncopy);
     stream->nread_header_recvbuf += ncopy;
 
-    infof(conn->data, "http2_recv: Got %d bytes from header_recvbuf\n",
+    infof(data, "http2_recv: Got %d bytes from header_recvbuf\n",
           (int)ncopy);
     return ncopy;
   }
@@ -780,66 +801,83 @@ static ssize_t http2_recv(struct connectdata *conn, int sockindex,
     stream->data += nread;
     stream->datalen -= nread;
 
-    infof(conn->data, "%zu data bytes written\n", nread);
+    infof(data, "%zu data bytes written\n", nread);
     if(stream->datalen == 0) {
       stream->data = NULL;
       stream->datalen = 0;
     }
-    infof(conn->data, "http2_recv: Got %d bytes from stream->data\n",
+    infof(data, "http2_recv: Got %d bytes from stream->data\n",
           (int)nread);
     return nread;
   }
 
-  /* remember where to store incoming data for this stream and how big the
-     buffer is */
-  stream->mem = mem;
-  stream->len = len;
-
-  infof(conn->data, "http2_recv: %d bytes buffer\n", stream->len);
-
-  nread = ((Curl_recv*)httpc->recv_underlying)(conn, FIRSTSOCKET,
-                                               httpc->inbuf, H2_BUFSIZE,
-                                               &result);
-  if(result == CURLE_AGAIN) {
-    *err = result;
-    return -1;
+  if(data->state.drain) {
+    DEBUGF(infof(data, "http2_recv: DRAIN %zu bytes stream %x!! (%p => %p)\n",
+                 stream->memlen, stream->stream_id,
+                 stream->mem, mem));
+    if(mem != stream->mem) {
+      /* if we didn't get the same buffer this time, we must move the data to
+         the beginning */
+      memmove(mem, stream->mem, stream->memlen);
+      stream->len = len - stream->memlen;
+      stream->mem = mem;
+    }
   }
+  else {
+    /* remember where to store incoming data for this stream and how big the
+       buffer is */
+    stream->mem = mem;
+    stream->len = len;
+    stream->memlen = 0;
+
+    infof(data, "http2_recv: %d bytes buffer (stream %x)\n",
+          stream->len, stream->stream_id);
+
+    nread = ((Curl_recv*)httpc->recv_underlying)(conn, FIRSTSOCKET,
+                                                 httpc->inbuf, H2_BUFSIZE,
+                                                 &result);
+    if(result == CURLE_AGAIN) {
+      *err = result;
+      return -1;
+    }
 
-  if(nread == -1) {
-    failf(conn->data, "Failed receiving HTTP2 data");
-    *err = result;
-    return 0;
-  }
+    if(nread == -1) {
+      failf(data, "Failed receiving HTTP2 data");
+      *err = result;
+      return 0;
+    }
 
-  if(nread == 0) {
-    failf(conn->data, "Unexpected EOF");
-    *err = CURLE_RECV_ERROR;
-    return -1;
-  }
+    if(nread == 0) {
+      failf(data, "Unexpected EOF");
+      *err = CURLE_RECV_ERROR;
+      return -1;
+    }
 
-  DEBUGF(infof(conn->data, "nread=%zd\n", nread));
+    DEBUGF(infof(data, "nread=%zd\n", nread));
 
-  rv = nghttp2_session_mem_recv(httpc->h2,
-                                (const uint8_t *)httpc->inbuf, nread);
+    rv = nghttp2_session_mem_recv(httpc->h2,
+                                  (const uint8_t *)httpc->inbuf, nread);
 
-  if(nghttp2_is_fatal((int)rv)) {
-    failf(conn->data, "nghttp2_session_mem_recv() returned %d:%s\n",
-          rv, nghttp2_strerror((int)rv));
-    *err = CURLE_RECV_ERROR;
-    return 0;
-  }
-  DEBUGF(infof(conn->data, "nghttp2_session_mem_recv() returns %zd\n", rv));
-  /* Always send pending frames in nghttp2 session, because
-     nghttp2_session_mem_recv() may queue new frame */
-  rv = nghttp2_session_send(httpc->h2);
-  if(rv != 0) {
-    *err = CURLE_SEND_ERROR;
-    return 0;
+    if(nghttp2_is_fatal((int)rv)) {
+      failf(data, "nghttp2_session_mem_recv() returned %d:%s\n",
+            rv, nghttp2_strerror((int)rv));
+      *err = CURLE_RECV_ERROR;
+      return 0;
+    }
+    DEBUGF(infof(data, "nghttp2_session_mem_recv() returns %zd\n", rv));
+    /* Always send pending frames in nghttp2 session, because
+       nghttp2_session_mem_recv() may queue new frame */
+    rv = nghttp2_session_send(httpc->h2);
+    if(rv != 0) {
+      *err = CURLE_SEND_ERROR;
+      return 0;
+    }
   }
   if(len != stream->len) {
-    infof(conn->data, "http2_recv: returns %d for stream %x (%zu/%zu)\n",
+    infof(data, "http2_recv: returns %d for stream %x (%zu/%zu)\n",
           len - stream->len, stream->stream_id,
           len, stream->len);
+    data->state.drain = 0; /* this stream is hereby drained */
     return len - stream->len;
   }
   /* If stream is closed, return 0 to signal the http routine to close
@@ -849,17 +887,17 @@ static ssize_t http2_recv(struct connectdata *conn, int sockindex,
        function. */
     stream->closed = FALSE;
     if(stream->error_code != NGHTTP2_NO_ERROR) {
-      failf(conn->data,
+      failf(data,
             "HTTP/2 stream = %x was not closed cleanly: error_code = %d",
             stream->stream_id, stream->error_code);
       *err = CURLE_HTTP2;
       return -1;
     }
-    DEBUGF(infof(conn->data, "http2_recv returns 0\n"));
+    DEBUGF(infof(data, "http2_recv returns 0\n"));
     return 0;
   }
   *err = CURLE_AGAIN;
-  DEBUGF(infof(conn->data, "http2_recv returns -1, AGAIN\n"));
+  DEBUGF(infof(data, "http2_recv returns -1, AGAIN\n"));
   return -1;
 }
 
index 39d40a9287bfa2c484070d99ee9a864a5464bfa5..a095238d59b9f6742fd92ec28911ddeb49d07d41 100644 (file)
@@ -1500,6 +1500,8 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
         break;
       }
 
+      DEBUGF(infof(data, "%s:%d call Curl_readwrite\n", __func__, __LINE__));
+
       /* read/write data if it is ready to do so */
       result = Curl_readwrite(data->easy_conn, &done);
 
index ee7c3721600dc94c8f508c4838789c89823eef5c..8ac91664c52e1f285cedd873c0f2d6dd440bf71d 100644 (file)
@@ -1046,6 +1046,11 @@ CURLcode Curl_readwrite(struct connectdata *conn,
   else
     fd_write = CURL_SOCKET_BAD;
 
+  if(conn->data->state.drain) {
+    select_res |= CURL_CSELECT_IN;
+    DEBUGF(infof(data, "%s: forcibly told to drain data\n", __func__));
+  }
+
   if(!select_res) /* Call for select()/poll() only, if read/write/error
                      status is not known. */
     select_res = Curl_socket_ready(fd_read, fd_write, 0);
@@ -1055,6 +1060,9 @@ CURLcode Curl_readwrite(struct connectdata *conn,
     return CURLE_SEND_ERROR;
   }
 
+  DEBUGF(infof(data, "%s: keepon: %x select_res %x\n", __func__, k->keepon,
+               select_res));
+
   /* We go ahead and do a read if we have a readable socket or if
      the stream was rewound (in which case we have data in a
      buffer) */
index 49b55d89d16a958fef05f6454b7b056c1d55f928..688dd9ea0f8066045546fe44323c8d784d50da95 100644 (file)
--- a/lib/url.c
+++ b/lib/url.c
@@ -2892,6 +2892,16 @@ void Curl_getoff_all_pipelines(struct SessionHandle *data,
     conn->readchannel_inuse = FALSE;
   if(Curl_removeHandleFromPipeline(data, conn->send_pipe) && send_head)
     conn->writechannel_inuse = FALSE;
+
+  if(conn->httpversion == 20) {
+    /* delete this handle from the stream hash */
+    struct HTTP *stream = data->req.protop;
+    if(stream && Curl_hash_delete(&conn->proto.httpc.streamsh,
+                                  &stream->stream_id,
+                                  sizeof(stream->stream_id))) {
+      infof(conn->data, "Failed to remove handle from h2 stream hash!!\n");
+    }
+  }
 }
 
 static void signalPipeClose(struct curl_llist *pipeline, bool pipe_broke)
@@ -5955,10 +5965,12 @@ CURLcode Curl_done(struct connectdata **connp,
 
   if((conn->send_pipe->size + conn->recv_pipe->size != 0 &&
       !data->set.reuse_forbid &&
-      !conn->bits.close))
+      !conn->bits.close)) {
     /* Stop if pipeline is not empty and we do not have to close
        connection. */
+    DEBUGF(infof(data, "Connection still in use, no more Curl_done now!\n"));
     return CURLE_OK;
+  }
 
   conn->bits.done = TRUE; /* called just now! */
 
index db8b1e7325233de988982632487044b8b2e0fcc0..25fb98fe5f6d7a452fe5a9d1e0164f2d91e534a7 100644 (file)
@@ -1309,6 +1309,10 @@ struct UrlState {
 
   curl_off_t infilesize; /* size of file to upload, -1 means unknown.
                             Copied from set.filesize at start of operation */
+
+  int drain; /* Increased when this stream has data to read, even if its
+                socket not necessarily is readable. Decreased when
+                checked. */
 };