#include "trevent.h" /* tr_runInEventThread() */
#include "utils.h"
-static struct event dht_event, dht6_event;
+static struct event dht_timer;
static unsigned char myid[20];
static tr_session *session = NULL;
-static void event_callback(int s, short type, void *ignore);
+static void timer_callback(int s, short type, void *ignore);
struct bootstrap_closure {
tr_session *session;
cl->len6 = len6;
tr_threadNew( dht_bootstrap, cl );
- event_set( &dht_event, ss->udp_socket, EV_READ, event_callback, NULL );
- tr_timerAdd( &dht_event, 0, tr_cryptoWeakRandInt( 1000000 ) );
-
- if( ss->udp6_socket >= 0 )
- {
- event_set( &dht6_event, ss->udp6_socket, EV_READ, event_callback, NULL );
- tr_timerAdd( &dht6_event, 0, tr_cryptoWeakRandInt( 1000000 ) );
- }
+ evtimer_set( &dht_timer, timer_callback, session );
+ tr_timerAdd( &dht_timer, 0, tr_cryptoWeakRandInt( 1000000 ) );
tr_ndbg( "DHT", "DHT initialized" );
tr_ndbg( "DHT", "Uninitializing DHT" );
- event_free( dht_event );
- dht_event = NULL;
-
- if( ss->udp6_socket >= 0 )
- event_del( &dht6_event );
+ event_del( &dht_timer );
/* Since we only save known good nodes, avoid erasing older data if we
don't know enough nodes. */
return ret;
}
-static void
-event_callback(int s, short type, void *ignore UNUSED )
+void
+tr_dhtCallback(unsigned char *buf, int buflen,
+ struct sockaddr *from, socklen_t fromlen,
+ void *sv )
{
- struct event *event;
+ tr_session *ss = (tr_session*)sv;
time_t tosleep;
- struct sockaddr_storage from;
- socklen_t fromlen;
- unsigned char *buf = NULL;
int rc;
- if (s == session->udp_socket)
- event = &dht_event;
- else if(s == session->udp6_socket)
- event = &dht6_event;
- else {
- tr_nerr("DHT", "Event on unexpected socket");
- event = NULL;
- }
+ assert(tr_isSession(ss));
- if( type == EV_READ ) {
- buf = malloc(4096);
- if(buf != NULL) {
- fromlen = sizeof(from);
- rc = recvfrom(s, buf, 4096 - 1, 0,
- (struct sockaddr*)&from, &fromlen);
- if(rc < 0)
- rc = 0;
- else
- buf[rc] = 0;
- }
- } else {
- rc = 0;
- fromlen = 0;
+ if(sv != session) {
+ tr_nerr("DHT", "tr_dhtCallback called for unexpected session");
+ return;
}
- if( dht_periodic( buf, rc, (struct sockaddr*)&from, fromlen,
- &tosleep, callback, NULL) < 0 ) {
+ rc = dht_periodic( buf, buflen, from, fromlen,
+ &tosleep, callback, NULL);
+ if(rc < 0) {
if(errno == EINTR) {
tosleep = 0;
} else {
}
}
- if(buf) {
- free(buf);
- buf = NULL;
- }
-
-#ifdef NOTYET
- /* Only do this once in a while. Counting rather than measuring time
- avoids a system call. */
- count++;
- if(count >= 20) {
- rebind_ipv6(FALSE);
- count = 0;
- }
-#endif
+ /* Being slightly late is fine,
+ and has the added benefit of adding some jitter. */
+ tr_timerAdd( &dht_timer, tosleep, tr_cryptoWeakRandInt( 1000000 ) );
+}
- if(event) {
- /* Being slightly late is fine,
- and has the added benefit of adding some jitter. */
- tr_timerAdd( event, tosleep, tr_cryptoWeakRandInt( 1000000 ) );
- }
+static void
+timer_callback(int s UNUSED, short type UNUSED, void *session )
+{
+ tr_dhtCallback(NULL, 0, NULL, 0, session);
}
+
void
dht_hash(void *hash_return, int hash_size,
const void *v1, int len1,
#include <unistd.h>
#include <assert.h>
+#include <event.h>
+
#include "transmission.h"
#include "net.h"
#include "session.h"
}
}
+static void
+event_callback(int s, short type, void *sv)
+{
+ tr_session *ss = (tr_session*)sv;
+ unsigned char *buf;
+ struct sockaddr_storage from;
+ socklen_t fromlen;
+ int rc;
+
+ assert(tr_isSession(ss));
+ assert(type == EV_READ);
+
+ buf = malloc(4096);
+ if(buf == NULL) {
+ tr_nerr("UDP", "Couldn't allocate buffer");
+ return;
+ }
+
+ fromlen = sizeof(from);
+ rc = recvfrom(s, buf, 4096 - 1, 0,
+ (struct sockaddr*)&from, &fromlen);
+ if(rc <= 0)
+ return;
+
+ if(buf[0] == 'd') {
+ /* DHT packet. */
+ buf[rc] = '\0';
+ tr_dhtCallback(buf, rc, (struct sockaddr*)&from, fromlen, sv);
+ } else {
+ /* Probably a UTP packet. */
+ /* Nothing yet. */
+ }
+
+ free(buf);
+}
+
void
tr_udpInit(tr_session *ss, const tr_address * addr)
{
goto ipv6;
}
+ ss->udp_event = tr_new0(struct event, 1);
+ if(ss->udp_event == NULL) {
+ tr_nerr("UDP", "Couldn't allocate IPv4 event");
+ close(ss->udp_socket);
+ ss->udp_socket = -1;
+ goto ipv6;
+ }
+
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
memcpy(&sin.sin_addr, &addr->addr.addr4, sizeof (struct in_addr));
goto ipv6;
}
+ event_set(ss->udp_event, ss->udp_socket, EV_READ | EV_PERSIST,
+ event_callback, ss);
+
ipv6:
- if(tr_globalIPv6())
- rebind_ipv6(ss, TRUE);
+ ss->udp6_event = tr_new0(struct event, 1);
+ if(ss->udp6_event == NULL) {
+ tr_nerr("UDP", "Couldn't allocate IPv6 event");
+ } else {
+ if(tr_globalIPv6())
+ rebind_ipv6(ss, TRUE);
+ if(ss->udp6_socket >= 0)
+ event_set(ss->udp6_event, ss->udp6_socket, EV_READ | EV_PERSIST,
+ event_callback, ss);
+ }
if(ss->isDHTEnabled)
tr_dhtInit(ss);
+
+ if(ss->udp_event)
+ event_add(ss->udp_event, NULL);
+ if(ss->udp6_event)
+ event_add(ss->udp6_event, NULL);
}
void
ss->udp_socket = -1;
}
+ if(ss->udp_event) {
+ event_del(ss->udp_event);
+ free(ss->udp_event);
+ ss->udp_event = NULL;
+ }
+
if(ss->udp6_socket >= 0) {
tr_netCloseSocket( ss->udp6_socket );
ss->udp6_socket = -1;
}
+ if(ss->udp6_event) {
+ event_del(ss->udp6_event);
+ free(ss->udp6_event);
+ ss->udp6_event = NULL;
+ }
+
if(ss->udp6_bound) {
free(ss->udp6_bound);
ss->udp6_bound = NULL;