From d426c20c0a71df7db7a8bae18c91034df502070a Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 28 Sep 2007 14:28:14 +0000 Subject: [PATCH] Three fixes in one commit (sorry): a) Take care of the tcpbuf if it ends while queued for transmission, note broken servers and close them in the main loop, and store TCP socket generation number in order not to send the same query twice over the same socket. --- ares/ares__close_sockets.c | 6 ++ ares/ares_cancel.c | 2 +- ares/ares_destroy.c | 4 +- ares/ares_init.c | 3 + ares/ares_private.h | 28 ++++++++- ares/ares_process.c | 113 ++++++++++++++++++++++++++++++++++--- ares/ares_send.c | 10 +++- 7 files changed, 150 insertions(+), 16 deletions(-) diff --git a/ares/ares__close_sockets.c b/ares/ares__close_sockets.c index 64eccdf76..c2588663d 100644 --- a/ares/ares__close_sockets.c +++ b/ares/ares__close_sockets.c @@ -35,6 +35,8 @@ void ares__close_sockets(ares_channel channel, struct server_state *server) /* Advance server->qhead; pull out query as we go. */ sendreq = server->qhead; server->qhead = sendreq->next; + if (sendreq->data_storage != NULL) + free(sendreq->data_storage); free(sendreq); } server->qtail = NULL; @@ -45,12 +47,16 @@ void ares__close_sockets(ares_channel channel, struct server_state *server) server->tcp_buffer = NULL; server->tcp_lenbuf_pos = 0; + /* Reset brokenness */ + server->is_broken = 0; + /* Close the TCP and UDP sockets. */ if (server->tcp_socket != ARES_SOCKET_BAD) { SOCK_STATE_CALLBACK(channel, server->tcp_socket, 0, 0); closesocket(server->tcp_socket); server->tcp_socket = ARES_SOCKET_BAD; + server->tcp_connection_generation = ++channel->tcp_connection_generation; } if (server->udp_socket != ARES_SOCKET_BAD) { diff --git a/ares/ares_cancel.c b/ares/ares_cancel.c index 9641dab20..65f86b966 100644 --- a/ares/ares_cancel.c +++ b/ares/ares_cancel.c @@ -33,7 +33,7 @@ void ares_cancel(ares_channel channel) next = query->next; query->callback(query->arg, ARES_ETIMEOUT, NULL, 0); free(query->tcpbuf); - free(query->skip_server); + free(query->server_info); free(query); } channel->queries = NULL; diff --git a/ares/ares_destroy.c b/ares/ares_destroy.c index 8d9bdbc15..e844ea628 100644 --- a/ares/ares_destroy.c +++ b/ares/ares_destroy.c @@ -65,8 +65,8 @@ void ares_destroy(ares_channel channel) query->callback(query->arg, ARES_EDESTRUCTION, NULL, 0); if (query->tcpbuf) free(query->tcpbuf); - if (query->skip_server) - free(query->skip_server); + if (query->server_info) + free(query->server_info); free(query); } diff --git a/ares/ares_init.c b/ares/ares_init.c index add3ffe0d..298dd7b90 100644 --- a/ares/ares_init.c +++ b/ares/ares_init.c @@ -136,6 +136,7 @@ int ares_init_options(ares_channel *channelptr, struct ares_options *options, channel->nservers = -1; channel->ndomains = -1; channel->nsort = -1; + channel->tcp_connection_generation = 0; channel->lookups = NULL; channel->queries = NULL; channel->domains = NULL; @@ -201,10 +202,12 @@ int ares_init_options(ares_channel *channelptr, struct ares_options *options, server = &channel->servers[i]; server->udp_socket = ARES_SOCKET_BAD; server->tcp_socket = ARES_SOCKET_BAD; + server->tcp_connection_generation = ++channel->tcp_connection_generation; server->tcp_lenbuf_pos = 0; server->tcp_buffer = NULL; server->qhead = NULL; server->qtail = NULL; + server->is_broken = 0; } init_id_key(&channel->id_key, ARES_ID_KEY_LEN); diff --git a/ares/ares_private.h b/ares/ares_private.h index f0314515c..dd9070aa7 100644 --- a/ares/ares_private.h +++ b/ares/ares_private.h @@ -89,6 +89,11 @@ struct send_request { const unsigned char *data; size_t len; + /* The query for which we're sending this data */ + struct query* owner_query; + /* The buffer we're using, if we have our own copy of the packet */ + unsigned char *data_storage; + /* Next request in queue */ struct send_request *next; }; @@ -110,6 +115,17 @@ struct server_state { /* TCP output queue */ struct send_request *qhead; struct send_request *qtail; + + /* Which incarnation of this connection is this? We don't want to + * retransmit requests into the very same socket, but if the server + * closes on us and we re-open the connection, then we do want to + * re-send. */ + int tcp_connection_generation; + + /* Is this server broken? We mark connections as broken when a + * request that is queued for sending times out. + */ + int is_broken; }; struct query { @@ -130,7 +146,7 @@ struct query { /* Query status */ int try; int server; - int *skip_server; + struct query_server_info *server_info; /* per-server state */ int using_tcp; int error_status; @@ -138,6 +154,12 @@ struct query { struct query *next; }; +/* Per-server state for a query */ +struct query_server_info { + int skip_server; /* should we skip server, due to errors, etc? */ + int tcp_connection_generation; /* into which TCP connection did we send? */ +}; + /* An IP address pattern; matches an IP address X if X & mask == addr */ #define PATTERN_MASK 0x1 #define PATTERN_CIDR 0x2 @@ -188,6 +210,9 @@ struct ares_channeldata { /* key to use when generating new ids */ rc4_key id_key; + /* Generation number to use for the next TCP socket open/close */ + int tcp_connection_generation; + /* Active queries */ struct query *queries; @@ -220,4 +245,3 @@ short ares__generate_new_id(rc4_key* key); #endif #endif /* __ARES_PRIVATE_H */ - diff --git a/ares/ares_process.c b/ares/ares_process.c index 1b5802994..15aa06e1c 100644 --- a/ares/ares_process.c +++ b/ares/ares_process.c @@ -62,6 +62,7 @@ static void read_tcp_data(ares_channel channel, fd_set *read_fds, static void read_udp_packets(ares_channel channel, fd_set *read_fds, ares_socket_t read_fd, time_t now); static void process_timeouts(ares_channel channel, time_t now); +static void process_broken_connections(ares_channel channel, time_t now); static void process_answer(ares_channel channel, unsigned char *abuf, int alen, int whichserver, int tcp, time_t now); static void handle_error(ares_channel channel, int whichserver, time_t now); @@ -87,6 +88,7 @@ void ares_process(ares_channel channel, fd_set *read_fds, fd_set *write_fds) read_tcp_data(channel, read_fds, ARES_SOCKET_BAD, now); read_udp_packets(channel, read_fds, ARES_SOCKET_BAD, now); process_timeouts(channel, now); + process_broken_connections(channel, now); } /* Something interesting happened on the wire, or there was a timeout. @@ -157,7 +159,7 @@ static void write_tcp_data(ares_channel channel, /* Make sure server has data to send and is selected in write_fds or write_fd. */ server = &channel->servers[i]; - if (!server->qhead || server->tcp_socket == ARES_SOCKET_BAD) + if (!server->qhead || server->tcp_socket == ARES_SOCKET_BAD || server->is_broken) continue; if(write_fds) { @@ -216,6 +218,8 @@ static void write_tcp_data(ares_channel channel, SOCK_STATE_CALLBACK(channel, server->tcp_socket, 1, 0); server->qtail = NULL; } + if (sendreq->data_storage != NULL) + free(sendreq->data_storage); free(sendreq); } else @@ -248,6 +252,8 @@ static void write_tcp_data(ares_channel channel, SOCK_STATE_CALLBACK(channel, server->tcp_socket, 1, 0); server->qtail = NULL; } + if (sendreq->data_storage != NULL) + free(sendreq->data_storage); free(sendreq); } else @@ -278,7 +284,7 @@ static void read_tcp_data(ares_channel channel, fd_set *read_fds, { /* Make sure the server has a socket and is selected in read_fds. */ server = &channel->servers[i]; - if (server->tcp_socket == ARES_SOCKET_BAD) + if (server->tcp_socket == ARES_SOCKET_BAD || server->is_broken) continue; if(read_fds) { @@ -376,7 +382,7 @@ static void read_udp_packets(ares_channel channel, fd_set *read_fds, /* Make sure the server has a socket and is selected in read_fds. */ server = &channel->servers[i]; - if (server->udp_socket == ARES_SOCKET_BAD) + if (server->udp_socket == ARES_SOCKET_BAD || server->is_broken) continue; if(read_fds) { @@ -492,6 +498,20 @@ static void process_answer(ares_channel channel, unsigned char *abuf, end_query(channel, query, ARES_SUCCESS, abuf, alen); } +/* Close all the connections that are no longer usable. */ +static void process_broken_connections(ares_channel channel, time_t now) +{ + int i; + for (i = 0; i < channel->nservers; i++) + { + struct server_state *server = &channel->servers[i]; + if (server->is_broken) + { + handle_error(channel, i, now); + } + } +} + static void handle_error(ares_channel channel, int whichserver, time_t now) { struct query *query, *next; @@ -526,7 +546,7 @@ static void skip_server(ares_channel channel, struct query *query, */ if (channel->nservers > 1) { - query->skip_server[whichserver] = 1; + query->server_info[whichserver].skip_server = 1; } } @@ -538,10 +558,21 @@ static struct query *next_server(ares_channel channel, struct query *query, time { for (; query->server < channel->nservers; query->server++) { - if (!query->skip_server[query->server]) + struct server_state *server = &channel->servers[query->server]; + /* We don't want to use this server if (1) we decided this + * connection is broken, and thus about to be closed, (2) + * we've decided to skip this server because of earlier + * errors we encountered, or (3) we already sent this query + * over this exact connection. + */ + if (!server->is_broken && + !query->server_info[query->server].skip_server && + !(query->using_tcp && + (query->server_info[query->server].tcp_connection_generation == + server->tcp_connection_generation))) { - ares__send_query(channel, query, now); - return (query->next); + ares__send_query(channel, query, now); + return (query->next); } } query->server = 0; @@ -582,8 +613,16 @@ void ares__send_query(ares_channel channel, struct query *query, time_t now) end_query(channel, query, ARES_ENOMEM, NULL, 0); return; } + /* To make the common case fast, we avoid copies by using the + * query's tcpbuf for as long as the query is alive. In the rare + * case where the query ends while it's queued for transmission, + * then we give the sendreq its own copy of the request packet + * and put it in sendreq->data_storage. + */ + sendreq->data_storage = NULL; sendreq->data = query->tcpbuf; sendreq->len = query->tcplen; + sendreq->owner_query = query; sendreq->next = NULL; if (server->qtail) server->qtail->next = sendreq; @@ -594,6 +633,8 @@ void ares__send_query(ares_channel channel, struct query *query, time_t now) } server->qtail = sendreq; query->timeout = 0; + query->server_info[query->server].tcp_connection_generation = + server->tcp_connection_generation; } else { @@ -721,6 +762,7 @@ static int open_tcp_socket(ares_channel channel, struct server_state *server) SOCK_STATE_CALLBACK(channel, s, 1, 0); server->tcp_buffer_pos = 0; server->tcp_socket = s; + server->tcp_connection_generation = ++channel->tcp_connection_generation; return 0; } @@ -839,6 +881,61 @@ static struct query *end_query (ares_channel channel, struct query *query, int s struct query **q, *next; int i; + /* First we check to see if this query ended while one of our send + * queues still has pointers to it. + */ + for (i = 0; i < channel->nservers; i++) + { + struct server_state *server = &channel->servers[i]; + struct send_request *sendreq; + for (sendreq = server->qhead; sendreq; sendreq = sendreq->next) + if (sendreq->owner_query == query) + { + sendreq->owner_query = NULL; + assert(sendreq->data_storage == NULL); + if (status == ARES_SUCCESS) + { + /* We got a reply for this query, but this queued + * sendreq points into this soon-to-be-gone query's + * tcpbuf. Probably this means we timed out and queued + * the query for retransmission, then received a + * response before actually retransmitting. This is + * perfectly fine, so we want to keep the connection + * running smoothly if we can. But in the worst case + * we may have sent only some prefix of the query, + * with some suffix of the query left to send. Also, + * the buffer may be queued on multiple queues. To + * prevent dangling pointers to the query's tcpbuf and + * handle these cases, we just give such sendreqs + * their own copy of the query packet. + */ + sendreq->data_storage = malloc(sendreq->len); + if (sendreq->data_storage != NULL) + { + memcpy(sendreq->data_storage, sendreq->data, sendreq->len); + sendreq->data = sendreq->data_storage; + } + } + if ((status != ARES_SUCCESS) || (sendreq->data_storage == NULL)) + { + /* We encountered an error (probably a timeout, + * suggesting the DNS server we're talking to is + * probably unreachable, wedged, or severely + * overloaded) or we couldn't copy the request, so + * mark the connection as broken. When we get to + * process_broken_connections() we'll close the + * connection and try to re-send requests to another + * server. + */ + server->is_broken = 1; + /* Just to be paranoid, zero out this sendreq... */ + sendreq->data = NULL; + sendreq->len = 0; + } + } + } + + /* Invoke the callback */ query->callback(query->arg, status, abuf, alen); for (q = &channel->queries; *q; q = &(*q)->next) { @@ -851,7 +948,7 @@ static struct query *end_query (ares_channel channel, struct query *query, int s else next = NULL; free(query->tcpbuf); - free(query->skip_server); + free(query->server_info); free(query); /* Simple cleanup policy: if no queries are remaining, close all diff --git a/ares/ares_send.c b/ares/ares_send.c index 7f4362c7c..fd1450b4a 100644 --- a/ares/ares_send.c +++ b/ares/ares_send.c @@ -62,8 +62,9 @@ void ares_send(ares_channel channel, const unsigned char *qbuf, int qlen, callback(arg, ARES_ENOMEM, NULL, 0); return; } - query->skip_server = malloc(channel->nservers * sizeof(int)); - if (!query->skip_server) + query->server_info = malloc(channel->nservers * + sizeof(query->server_info[0])); + if (!query->server_info) { free(query->tcpbuf); free(query); @@ -93,7 +94,10 @@ void ares_send(ares_channel channel, const unsigned char *qbuf, int qlen, query->try = 0; query->server = 0; for (i = 0; i < channel->nservers; i++) - query->skip_server[i] = 0; + { + query->server_info[i].skip_server = 0; + query->server_info[i].tcp_connection_generation = 0; + } query->using_tcp = (channel->flags & ARES_FLAG_USEVC) || qlen > PACKETSZ; query->error_status = ARES_ECONNREFUSED; -- 2.40.0