From: Daniel Stenberg Date: Tue, 8 Jan 2019 13:24:15 +0000 (+0100) Subject: multi: multiplexing improvements X-Git-Tag: curl-7_64_0~60 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=4c35574bb785ce;p=curl multi: multiplexing improvements Fixes #3436 Closes #3448 Problem 1 After LOTS of scratching my head, I eventually realized that even when doing 10 uploads in parallel, sometimes the socket callback to the application that tells it what to wait for on the socket, looked like it would reflect the status of just the single transfer that just changed state. Digging into the code revealed that this was indeed the truth. When multiple transfers are using the same connection, the application did not correctly get the *combined* flags for all transfers which then could make it switch to READ (only) when in fact most transfers wanted to get told when the socket was WRITEABLE. Problem 1b A separate but related regression had also been introduced by me when I cleared connection/transfer association better a while ago, as now the logic couldn't find the connection and see if that was marked as used by more transfers and then it would also prematurely remove the socket from the socket hash table even in times other transfers were still using it! Fix 1 Make sure that each socket stored in the socket hash has a "combined" action field of what to ask the application to wait for, that is potentially the ORed action of multiple parallel transfers. And remove that socket hash entry only if there are no transfers left using it. Problem 2 The socket hash entry stored an association to a single transfer using that socket - and when curl_multi_socket_action() was called to tell libcurl about activities on that specific socket only that transfer was "handled". This was WRONG, as a single socket/connection can be used by numerous parallel transfers and not necessarily a single one. Fix 2 We now store a list of handles in the socket hashtable entry and when libcurl is told there's traffic for a particular socket, it now iterates over all known transfers using that single socket. --- diff --git a/lib/multi.c b/lib/multi.c index 249280f2b..21a6cab6e 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -189,14 +189,17 @@ static void mstate(struct Curl_easy *data, CURLMstate state #endif /* - * We add one of these structs to the sockhash for a particular socket + * We add one of these structs to the sockhash for each socket */ struct Curl_sh_entry { - struct Curl_easy *easy; - int action; /* what action READ/WRITE this socket waits for */ - curl_socket_t socket; /* mainly to ease debugging */ + struct curl_llist list; /* list of easy handles using this socket */ + unsigned int action; /* what combined action READ/WRITE this socket waits + for */ void *socketp; /* settable by users with curl_multi_assign() */ + unsigned int users; /* number of transfers using this */ + unsigned int readers; /* this many transfers want to read */ + unsigned int writers; /* this many transfers want to write */ }; /* bits for 'action' having no bits means this socket is not expecting any action */ @@ -215,8 +218,7 @@ static struct Curl_sh_entry *sh_getentry(struct curl_hash *sh, /* make sure this socket is present in the hash for this handle */ static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh, - curl_socket_t s, - struct Curl_easy *data) + curl_socket_t s) { struct Curl_sh_entry *there = sh_getentry(sh, s); struct Curl_sh_entry *check; @@ -230,8 +232,7 @@ static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh, if(!check) return NULL; /* major failure */ - check->easy = data; - check->socket = s; + Curl_llist_init(&check->list, NULL); /* make/add new hash entry */ if(!Curl_hash_add(sh, (char *)&s, sizeof(curl_socket_t), check)) { @@ -2354,6 +2355,9 @@ static CURLMcode singlesocket(struct Curl_multi *multi, curl_socket_t s; int num; unsigned int curraction; + int actions[MAX_SOCKSPEREASYHANDLE]; + unsigned int comboaction; + bool sincebefore = FALSE; for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) socks[i] = CURL_SOCKET_BAD; @@ -2370,7 +2374,8 @@ static CURLMcode singlesocket(struct Curl_multi *multi, for(i = 0; (i< MAX_SOCKSPEREASYHANDLE) && (curraction & (GETSOCK_READSOCK(i) | GETSOCK_WRITESOCK(i))); i++) { - int action = CURL_POLL_NONE; + unsigned int action = CURL_POLL_NONE; + unsigned int prevaction = 0; s = socks[i]; @@ -2382,29 +2387,72 @@ static CURLMcode singlesocket(struct Curl_multi *multi, if(curraction & GETSOCK_WRITESOCK(i)) action |= CURL_POLL_OUT; + actions[i] = action; + + comboaction = action; if(entry) { - /* yeps, already present so check if it has the same action set */ - if(entry->action == action) - /* same, continue */ - continue; + /* check if new for this transfer */ + for(i = 0; i< data->numsocks; i++) { + if(s == data->sockets[i]) { + prevaction = data->actions[i]; + sincebefore = TRUE; + break; + } + } + } else { - /* this is a socket we didn't have before, add it! */ - entry = sh_addentry(&multi->sockhash, s, data); + /* this is a socket we didn't have before, add it to the hash! */ + entry = sh_addentry(&multi->sockhash, s); if(!entry) /* fatal */ return CURLM_OUT_OF_MEMORY; } + if(sincebefore && (prevaction != action)) { + /* Socket was used already, but different action now */ + if(prevaction & CURL_POLL_IN) + entry->readers--; + if(prevaction & CURL_POLL_OUT) + entry->writers--; + if(action & CURL_POLL_IN) + entry->readers++; + if(action & CURL_POLL_OUT) + entry->writers++; + } + else if(!sincebefore) { + /* a new user */ + entry->users++; + if(action & CURL_POLL_IN) + entry->readers++; + if(action & CURL_POLL_OUT) + entry->writers++; + + /* add 'data' to the list of handles using this socket! */ + Curl_llist_insert_next(&entry->list, entry->list.tail, + data, &data->sh_queue); + } + + comboaction = (entry->writers? CURL_POLL_OUT : 0) | + (entry->readers ? CURL_POLL_IN : 0); + +#if 0 + infof(data, "--- Comboaction: %u readers %u writers\n", + entry->readers, entry->writers); +#endif + /* check if it has the same action set */ + if(entry->action == comboaction) + /* same, continue */ + continue; /* we know (entry != NULL) at this point, see the logic above */ if(multi->socket_cb) multi->socket_cb(data, s, - action, + comboaction, multi->socket_userp, entry->socketp); - entry->action = action; /* store the current action state */ + entry->action = comboaction; /* store the current action state */ } num = i; /* number of sockets */ @@ -2413,73 +2461,45 @@ static CURLMcode singlesocket(struct Curl_multi *multi, make sure to detect sockets that are removed */ for(i = 0; i< data->numsocks; i++) { int j; + bool stillused = FALSE; s = data->sockets[i]; - for(j = 0; jsockhash, s); + /* if this is NULL here, the socket has been closed and notified so + already by Curl_multi_closed() */ if(entry) { - /* this socket has been removed. Tell the app to remove it */ - bool remove_sock_from_hash = TRUE; - - /* check if the socket to be removed serves a connection which has - other easy-s in a pipeline. In this case the socket should not be - removed. */ - struct connectdata *easy_conn = data->easy_conn; - if(easy_conn) { - if(easy_conn->recv_pipe.size > 1) { - /* the handle should not be removed from the pipe yet */ - remove_sock_from_hash = FALSE; - - /* Update the sockhash entry to instead point to the next in line - for the recv_pipe, or the first (in case this particular easy - isn't already) */ - if(entry->easy == data) { - if(Curl_recvpipe_head(data, easy_conn)) - entry->easy = easy_conn->recv_pipe.head->next->ptr; - else - entry->easy = easy_conn->recv_pipe.head->ptr; - } - } - if(easy_conn->send_pipe.size > 1) { - /* the handle should not be removed from the pipe yet */ - remove_sock_from_hash = FALSE; - - /* Update the sockhash entry to instead point to the next in line - for the send_pipe, or the first (in case this particular easy - isn't already) */ - if(entry->easy == data) { - if(Curl_sendpipe_head(data, easy_conn)) - entry->easy = easy_conn->send_pipe.head->next->ptr; - else - entry->easy = easy_conn->send_pipe.head->ptr; - } - } - /* Don't worry about overwriting recv_pipe head with send_pipe_head, - when action will be asked on the socket (see multi_socket()), the - head of the correct pipe will be taken according to the - action. */ - } - - if(remove_sock_from_hash) { - /* in this case 'entry' is always non-NULL */ + int oldactions = data->actions[i]; + /* this socket has been removed. Decrease user count */ + entry->users--; + if(oldactions & CURL_POLL_OUT) + entry->writers--; + if(oldactions & CURL_POLL_IN) + entry->readers--; + if(!entry->users) { if(multi->socket_cb) - multi->socket_cb(data, - s, - CURL_POLL_REMOVE, + multi->socket_cb(data, s, CURL_POLL_REMOVE, multi->socket_userp, entry->socketp); sh_delentry(&multi->sockhash, s); } - } /* if sockhash entry existed */ + else { + /* remove this transfer as a user of this socket */ + Curl_llist_remove(&entry->list, &data->sh_queue, NULL); + } + } } /* for loop over numsocks */ memcpy(data->sockets, socks, num*sizeof(curl_socket_t)); + memcpy(data->actions, actions, num*sizeof(int)); data->numsocks = num; return CURLM_OK; } @@ -2619,46 +2639,52 @@ static CURLMcode multi_socket(struct Curl_multi *multi, and just move on. */ ; else { + struct curl_llist *list = &entry->list; + struct curl_llist_element *e; SIGPIPE_VARIABLE(pipe_st); - data = entry->easy; + /* the socket can be shared by many transfers, iterate */ + for(e = list->head; e; e = e->next) { + data = (struct Curl_easy *)e->ptr; - if(data->magic != CURLEASY_MAGIC_NUMBER) - /* bad bad bad bad bad bad bad */ - return CURLM_INTERNAL_ERROR; - - /* If the pipeline is enabled, take the handle which is in the head of - the pipeline. If we should write into the socket, take the send_pipe - head. If we should read from the socket, take the recv_pipe head. */ - if(data->easy_conn) { - if((ev_bitmask & CURL_POLL_OUT) && - data->easy_conn->send_pipe.head) - data = data->easy_conn->send_pipe.head->ptr; - else if((ev_bitmask & CURL_POLL_IN) && - data->easy_conn->recv_pipe.head) - data = data->easy_conn->recv_pipe.head->ptr; - } - - if(data->easy_conn && - !(data->easy_conn->handler->flags & PROTOPT_DIRLOCK)) - /* set socket event bitmask if they're not locked */ - data->easy_conn->cselect_bits = ev_bitmask; - - sigpipe_ignore(data, &pipe_st); - result = multi_runsingle(multi, now, data); - sigpipe_restore(&pipe_st); + if(data->magic != CURLEASY_MAGIC_NUMBER) + /* bad bad bad bad bad bad bad */ + return CURLM_INTERNAL_ERROR; - if(data->easy_conn && - !(data->easy_conn->handler->flags & PROTOPT_DIRLOCK)) - /* clear the bitmask only if not locked */ - data->easy_conn->cselect_bits = 0; + /* If the pipeline is enabled, take the handle which is in the head of + the pipeline. If we should write into the socket, take the + send_pipe head. If we should read from the socket, take the + recv_pipe head. */ + if(data->easy_conn) { + if((ev_bitmask & CURL_POLL_OUT) && + data->easy_conn->send_pipe.head) + data = data->easy_conn->send_pipe.head->ptr; + else if((ev_bitmask & CURL_POLL_IN) && + data->easy_conn->recv_pipe.head) + data = data->easy_conn->recv_pipe.head->ptr; + } - if(CURLM_OK >= result) { - /* get the socket(s) and check if the state has been changed since - last */ - result = singlesocket(multi, data); - if(result) - return result; + if(data->easy_conn && + !(data->easy_conn->handler->flags & PROTOPT_DIRLOCK)) + /* set socket event bitmask if they're not locked */ + data->easy_conn->cselect_bits = ev_bitmask; + + sigpipe_ignore(data, &pipe_st); + result = multi_runsingle(multi, now, data); + sigpipe_restore(&pipe_st); + + if(data->easy_conn && + !(data->easy_conn->handler->flags & PROTOPT_DIRLOCK)) + /* clear the bitmask only if not locked */ + data->easy_conn->cselect_bits = 0; + + if(CURLM_OK >= result) { + /* get the socket(s) and check if the state has been changed since + last */ + result = singlesocket(multi, data); + if(result) + return result; + } } /* Now we fall-through and do the timer-based stuff, since we don't want @@ -3002,6 +3028,9 @@ void Curl_expire(struct Curl_easy *data, time_t milli, expire_id id) DEBUGASSERT(id < EXPIRE_LAST); + infof(data, "Expire in %ld ms for %x (transfer %p)\n", + (long)milli, id, data); + set = Curl_now(); set.tv_sec += milli/1000; set.tv_usec += (unsigned int)(milli%1000)*1000; @@ -3093,7 +3122,7 @@ void Curl_expire_clear(struct Curl_easy *data) } #ifdef DEBUGBUILD - infof(data, "Expire cleared\n"); + infof(data, "Expire cleared (transfer %p)\n", data); #endif nowp->tv_sec = 0; nowp->tv_usec = 0; diff --git a/lib/urldata.h b/lib/urldata.h index 11bbbc03e..177c39850 100644 --- a/lib/urldata.h +++ b/lib/urldata.h @@ -7,7 +7,7 @@ * | (__| |_| | _ <| |___ * \___|\___/|_| \_\_____| * - * Copyright (C) 1998 - 2018, Daniel Stenberg, , et al. + * Copyright (C) 1998 - 2019, Daniel Stenberg, , et al. * * This software is licensed as described in the file COPYING, which * you should have received as part of this distribution. The terms @@ -1780,6 +1780,7 @@ struct Curl_easy { struct connectdata *easy_conn; /* the "unit's" connection */ struct curl_llist_element connect_queue; struct curl_llist_element pipeline_queue; + struct curl_llist_element sh_queue; /* list per Curl_sh_entry */ CURLMstate mstate; /* the handle's state */ CURLcode result; /* previous result */ @@ -1791,6 +1792,8 @@ struct Curl_easy { the state etc are also kept. This array is mostly used to detect when a socket is to be removed from the hash. See singlesocket(). */ curl_socket_t sockets[MAX_SOCKSPEREASYHANDLE]; + int actions[MAX_SOCKSPEREASYHANDLE]; /* action for each socket in + sockets[] */ int numsocks; struct Names dns;