From aa3868df064d3351b0a1f5c9bd3b88f4e62350cc Mon Sep 17 00:00:00 2001 From: Juliusz Chroboczek Date: Sun, 9 Jan 2011 21:48:46 +0000 Subject: [PATCH] Move handling of UDP I/O to tr-udp.c. --- libtransmission/session.h | 2 + libtransmission/tr-dht.c | 90 +++++++++++---------------------------- libtransmission/tr-dht.h | 3 ++ libtransmission/tr-udp.c | 78 ++++++++++++++++++++++++++++++++- 4 files changed, 107 insertions(+), 66 deletions(-) diff --git a/libtransmission/session.h b/libtransmission/session.h index 16acba4e9..4ddf5f1de 100644 --- a/libtransmission/session.h +++ b/libtransmission/session.h @@ -130,6 +130,8 @@ struct tr_session int udp_socket; int udp6_socket; unsigned char * udp6_bound; + struct event *udp_event; + struct event *udp6_event; /* The open port on the local machine for incoming peer requests */ tr_port private_peer_port; diff --git a/libtransmission/tr-dht.c b/libtransmission/tr-dht.c index c67480e5d..036a37d21 100644 --- a/libtransmission/tr-dht.c +++ b/libtransmission/tr-dht.c @@ -59,11 +59,11 @@ THE SOFTWARE. #include "trevent.h" /* tr_runInEventThread() */ #include "utils.h" -static struct event dht_event, dht6_event; +static struct event dht_timer; static unsigned char myid[20]; static tr_session *session = NULL; -static void event_callback(int s, short type, void *ignore); +static void timer_callback(int s, short type, void *ignore); struct bootstrap_closure { tr_session *session; @@ -315,14 +315,8 @@ tr_dhtInit(tr_session *ss) cl->len6 = len6; tr_threadNew( dht_bootstrap, cl ); - event_set( &dht_event, ss->udp_socket, EV_READ, event_callback, NULL ); - tr_timerAdd( &dht_event, 0, tr_cryptoWeakRandInt( 1000000 ) ); - - if( ss->udp6_socket >= 0 ) - { - event_set( &dht6_event, ss->udp6_socket, EV_READ, event_callback, NULL ); - tr_timerAdd( &dht6_event, 0, tr_cryptoWeakRandInt( 1000000 ) ); - } + evtimer_set( &dht_timer, timer_callback, session ); + tr_timerAdd( &dht_timer, 0, tr_cryptoWeakRandInt( 1000000 ) ); tr_ndbg( "DHT", "DHT initialized" ); @@ -342,11 +336,7 @@ tr_dhtUninit(tr_session *ss) tr_ndbg( "DHT", "Uninitializing DHT" ); - event_free( dht_event ); - dht_event = NULL; - - if( ss->udp6_socket >= 0 ) - event_del( &dht6_event ); + event_del( &dht_timer ); /* Since we only save known good nodes, avoid erasing older data if we don't know enough nodes. */ @@ -590,43 +580,25 @@ tr_dhtAnnounce(tr_torrent *tor, int af, tr_bool announce) return ret; } -static void -event_callback(int s, short type, void *ignore UNUSED ) +void +tr_dhtCallback(unsigned char *buf, int buflen, + struct sockaddr *from, socklen_t fromlen, + void *sv ) { - struct event *event; + tr_session *ss = (tr_session*)sv; time_t tosleep; - struct sockaddr_storage from; - socklen_t fromlen; - unsigned char *buf = NULL; int rc; - if (s == session->udp_socket) - event = &dht_event; - else if(s == session->udp6_socket) - event = &dht6_event; - else { - tr_nerr("DHT", "Event on unexpected socket"); - event = NULL; - } + assert(tr_isSession(ss)); - if( type == EV_READ ) { - buf = malloc(4096); - if(buf != NULL) { - fromlen = sizeof(from); - rc = recvfrom(s, buf, 4096 - 1, 0, - (struct sockaddr*)&from, &fromlen); - if(rc < 0) - rc = 0; - else - buf[rc] = 0; - } - } else { - rc = 0; - fromlen = 0; + if(sv != session) { + tr_nerr("DHT", "tr_dhtCallback called for unexpected session"); + return; } - if( dht_periodic( buf, rc, (struct sockaddr*)&from, fromlen, - &tosleep, callback, NULL) < 0 ) { + rc = dht_periodic( buf, buflen, from, fromlen, + &tosleep, callback, NULL); + if(rc < 0) { if(errno == EINTR) { tosleep = 0; } else { @@ -637,28 +609,18 @@ event_callback(int s, short type, void *ignore UNUSED ) } } - if(buf) { - free(buf); - buf = NULL; - } - -#ifdef NOTYET - /* Only do this once in a while. Counting rather than measuring time - avoids a system call. */ - count++; - if(count >= 20) { - rebind_ipv6(FALSE); - count = 0; - } -#endif + /* Being slightly late is fine, + and has the added benefit of adding some jitter. */ + tr_timerAdd( &dht_timer, tosleep, tr_cryptoWeakRandInt( 1000000 ) ); +} - if(event) { - /* Being slightly late is fine, - and has the added benefit of adding some jitter. */ - tr_timerAdd( event, tosleep, tr_cryptoWeakRandInt( 1000000 ) ); - } +static void +timer_callback(int s UNUSED, short type UNUSED, void *session ) +{ + tr_dhtCallback(NULL, 0, NULL, 0, session); } + void dht_hash(void *hash_return, int hash_size, const void *v1, int len1, diff --git a/libtransmission/tr-dht.h b/libtransmission/tr-dht.h index ff4729e47..1edac5da9 100644 --- a/libtransmission/tr-dht.h +++ b/libtransmission/tr-dht.h @@ -44,3 +44,6 @@ int tr_dhtStatus( tr_session *, int af, int * setme_nodeCount ); const char *tr_dhtPrintableStatus(int status); int tr_dhtAddNode( tr_session *, const tr_address *, tr_port, tr_bool bootstrap ); int tr_dhtAnnounce( tr_torrent *, int af, tr_bool announce ); +void tr_dhtCallback(unsigned char *buf, int buflen, + struct sockaddr *from, socklen_t fromlen, + void *sv); diff --git a/libtransmission/tr-udp.c b/libtransmission/tr-udp.c index b76590279..a39e79b64 100644 --- a/libtransmission/tr-udp.c +++ b/libtransmission/tr-udp.c @@ -24,6 +24,8 @@ THE SOFTWARE. #include #include +#include + #include "transmission.h" #include "net.h" #include "session.h" @@ -103,6 +105,42 @@ rebind_ipv6(tr_session *ss, tr_bool force) } } +static void +event_callback(int s, short type, void *sv) +{ + tr_session *ss = (tr_session*)sv; + unsigned char *buf; + struct sockaddr_storage from; + socklen_t fromlen; + int rc; + + assert(tr_isSession(ss)); + assert(type == EV_READ); + + buf = malloc(4096); + if(buf == NULL) { + tr_nerr("UDP", "Couldn't allocate buffer"); + return; + } + + fromlen = sizeof(from); + rc = recvfrom(s, buf, 4096 - 1, 0, + (struct sockaddr*)&from, &fromlen); + if(rc <= 0) + return; + + if(buf[0] == 'd') { + /* DHT packet. */ + buf[rc] = '\0'; + tr_dhtCallback(buf, rc, (struct sockaddr*)&from, fromlen, sv); + } else { + /* Probably a UTP packet. */ + /* Nothing yet. */ + } + + free(buf); +} + void tr_udpInit(tr_session *ss, const tr_address * addr) { @@ -122,6 +160,14 @@ tr_udpInit(tr_session *ss, const tr_address * addr) goto ipv6; } + ss->udp_event = tr_new0(struct event, 1); + if(ss->udp_event == NULL) { + tr_nerr("UDP", "Couldn't allocate IPv4 event"); + close(ss->udp_socket); + ss->udp_socket = -1; + goto ipv6; + } + memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; memcpy(&sin.sin_addr, &addr->addr.addr4, sizeof (struct in_addr)); @@ -134,12 +180,28 @@ tr_udpInit(tr_session *ss, const tr_address * addr) goto ipv6; } + event_set(ss->udp_event, ss->udp_socket, EV_READ | EV_PERSIST, + event_callback, ss); + ipv6: - if(tr_globalIPv6()) - rebind_ipv6(ss, TRUE); + ss->udp6_event = tr_new0(struct event, 1); + if(ss->udp6_event == NULL) { + tr_nerr("UDP", "Couldn't allocate IPv6 event"); + } else { + if(tr_globalIPv6()) + rebind_ipv6(ss, TRUE); + if(ss->udp6_socket >= 0) + event_set(ss->udp6_event, ss->udp6_socket, EV_READ | EV_PERSIST, + event_callback, ss); + } if(ss->isDHTEnabled) tr_dhtInit(ss); + + if(ss->udp_event) + event_add(ss->udp_event, NULL); + if(ss->udp6_event) + event_add(ss->udp6_event, NULL); } void @@ -152,11 +214,23 @@ tr_udpUninit(tr_session *ss) ss->udp_socket = -1; } + if(ss->udp_event) { + event_del(ss->udp_event); + free(ss->udp_event); + ss->udp_event = NULL; + } + if(ss->udp6_socket >= 0) { tr_netCloseSocket( ss->udp6_socket ); ss->udp6_socket = -1; } + if(ss->udp6_event) { + event_del(ss->udp6_event); + free(ss->udp6_event); + ss->udp6_event = NULL; + } + if(ss->udp6_bound) { free(ss->udp6_bound); ss->udp6_bound = NULL; -- 2.40.0