tr_direction dir,
int period_msec )
{
- int n;
+ int i, n, peerCount;
tr_ptrArray * tmp;
struct tr_peerIo ** peers;
+ /* allocateBandwidth() is a helper function with two purposes:
+ * 1. allocate bandwidth to b and its subtree
+ * 2. accumulate an array of all the peerIos from b and its subtree. */
tmp = tr_ptrArrayNew( );
allocateBandwidth( b, dir, period_msec, tmp );
- peers = (struct tr_peerIo**) tr_ptrArrayPeek( tmp, &n );
-
- /* loop through all the peers, reading and writing in small chunks,
- * until we run out of bandwidth or peers. we do it this way to
- * prevent one peer from using up all the bandwidth */
-#if 0
-fprintf( stderr, "%s - %d peers\n", (dir==TR_UP)?"up":"down", n );
-#endif
- while( n > 0 )
+ peers = (struct tr_peerIo**) tr_ptrArrayPeek( tmp, &peerCount );
+
+ /* Stop all peers from listening for the socket to be ready for IO.
+ * See "Second phase of IO" lower in this function for more info. */
+ for( i=0; i<peerCount; ++i )
+ tr_peerIoSetEnabled( peers[i], dir, FALSE );
+
+ /* First phase of IO. Tries to distribute bandwidth in a fair/even manner
+ * to avoid "greedy peers" from starving out the other peers: loop through
+ * peers in a round-robin fashion, giving each one of them small chunks
+ * of bandwidth to use. (It's small to conserve some of the bandwidth
+ * until the end of the loop). Keep looping until we run out of bandwidth
+ * or peers that are ready to use it. */
+ n = peerCount;
+ i = n ? tr_cryptoWeakRandInt( n ) : 0; /* pick a random starting point */
+ for( ; n>0; )
{
- int i;
- for( i=0; i<n; )
- {
- const int increment = n==1 ? 4096 : 1024;
- const int byteCount = tr_peerIoFlush( peers[i], dir, increment);
-
-#if 0
- if( byteCount )
- fprintf( stderr, "peer %p: %d bytes\n", peers[i], byteCount );
-#endif
-
- if( byteCount == increment )
- ++i;
- else
- peers[i] = peers[--n];
+ const int increment = n==1 ? 4096 : 1024;
+ const int byteCount = tr_peerIoFlush( peers[i], dir, increment);
+
+ if( byteCount == increment )
+ ++i;
+ else {
+ /* peer is done writing for now; move it to the end of the list */
+ tr_peerIo * tmp = peers[i];
+ peers[i] = peers[n-1];
+ peers[n-1] = tmp;
+ --n;
}
+
+ assert( i <= n );
+ if( i == n )
+ i = 0;
}
+ /* Second phase of IO. To help us scale well in high bandwidth situations
+ * such as LANs, enable on-demand IO for peers with bandwidth left to burn.
+ * This on-demand IO for a peer is enabled until either (1) the peer runs
+ * out of bandwidth, or (2) the next tr_bandwidthAllocate() call, when we
+ * start all over again. */
+ for( i=0; i<peerCount; ++i )
+ if( tr_peerIoHasBandwidthLeft( peers[i], dir ) )
+ tr_peerIoSetEnabled( peers[i], dir, TRUE );
+
/* cleanup */
tr_ptrArrayFree( tmp, NULL );
}
#include "utils.h"
#define MAGIC_NUMBER 206745
-#define IO_TIMEOUT_SECS 8
static size_t
getPacketOverhead( size_t d )
int magicNumber;
uint8_t encryptionMode;
- uint8_t timeout;
tr_port port;
int socket;
struct evbuffer * inbuf;
struct evbuffer * outbuf;
+
+ struct event event_read;
+ struct event event_write;
};
/***
***/
static void
-didWriteWrapper( void * unused UNUSED,
- size_t bytes_transferred,
- void * vio )
+didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
{
- tr_peerIo * io = vio;
-
while( bytes_transferred )
{
struct tr_datatype * next = io->output_datatypes->data;
}
static void
-canReadWrapper( void * unused UNUSED,
- size_t bytes_transferred UNUSED,
- void * vio )
+canReadWrapper( tr_peerIo * io )
{
- int done = 0;
- int err = 0;
- tr_peerIo * io = vio;
+ tr_bool done = 0;
+ tr_bool err = 0;
tr_session * session = io->session;
dbgmsg( io, "canRead" );
const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
const int ret = io->canRead( io, io->userData, &piece );
- if( ret != READ_ERR )
- {
- const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
- if( piece )
- tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
- if( used != piece )
- tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
- }
+ const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
+
+ if( piece )
+ tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
+
+ if( used != piece )
+ tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
switch( ret )
{
}
}
-#if 0
+#define _isBool(b) (((b)==0 || (b)==1))
+
+static int
+isPeerIo( const tr_peerIo * io )
+{
+ return ( io != NULL )
+ && ( io->magicNumber == MAGIC_NUMBER )
+ && ( tr_isAddress( &io->addr ) )
+ && ( _isBool( io->isEncrypted ) )
+ && ( _isBool( io->isIncoming ) )
+ && ( _isBool( io->peerIdIsSet ) )
+ && ( _isBool( io->extendedProtocolSupported ) )
+ && ( _isBool( io->fastExtensionSupported ) );
+}
+
static void
-gotErrorWrapper( struct tr_iobuf * iobuf,
- short what,
- void * userData )
+event_read_cb( int fd, short event UNUSED, void * vio )
{
- tr_peerIo * c = userData;
+ int res;
+ short what = EVBUFFER_READ;
+ tr_peerIo * io = vio;
+ const size_t howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, io->session->so_rcvbuf );
+ const tr_direction dir = TR_DOWN;
- if( c->gotError )
- c->gotError( iobuf, what, c->userData );
+ assert( isPeerIo( io ) );
+
+ dbgmsg( io, "libevent says this peer is ready to read" );
+
+ /* if we don't have any bandwidth left, stop reading */
+ if( howmuch < 1 ) {
+ tr_peerIoSetEnabled( io, dir, FALSE );
+ return;
+ }
+
+ res = evbuffer_read( io->inbuf, fd, howmuch );
+ if( res == -1 ) {
+ if( errno == EAGAIN || errno == EINTR )
+ goto reschedule;
+ /* error case */
+ what |= EVBUFFER_ERROR;
+ } else if( res == 0 ) {
+ /* eof case */
+ what |= EVBUFFER_EOF;
+ }
+
+ if( res <= 0 )
+ goto error;
+
+ tr_peerIoSetEnabled( io, dir, TRUE );
+
+ /* Invoke the user callback - must always be called last */
+ canReadWrapper( io );
+
+ return;
+
+ reschedule:
+ tr_peerIoSetEnabled( io, dir, TRUE );
+ return;
+
+ error:
+ if( io->gotError != NULL )
+ io->gotError( io, what, io->userData );
}
+
+static int
+tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
+{
+ struct evbuffer * buffer = io->outbuf;
+ int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
+
+#ifdef WIN32
+ n = send(fd, buffer->buffer, n, 0 );
+#else
+ n = write(fd, buffer->buffer, n );
#endif
+ dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
-/**
-***
-**/
+ if( n == -1 )
+ return -1;
+ if (n == 0)
+ return 0;
+ evbuffer_drain( buffer, n );
+
+ return n;
+}
-#if 0
static void
-bufevNew( tr_peerIo * io )
+event_write_cb( int fd, short event UNUSED, void * vio )
{
- io->iobuf = tr_iobuf_new( io->session,
- io->bandwidth,
- io->socket,
- EV_READ | EV_WRITE,
- canReadWrapper,
- didWriteWrapper,
- gotErrorWrapper,
- io );
+ int res = 0;
+ short what = EVBUFFER_WRITE;
+ tr_peerIo * io = vio;
+ size_t howmuch;
+ const tr_direction dir = TR_UP;
- tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
-}
+ assert( isPeerIo( io ) );
+
+ dbgmsg( io, "libevent says this peer is ready to write" );
+
+ howmuch = MIN( (size_t)io->session->so_sndbuf, EVBUFFER_LENGTH( io->outbuf ) );
+ howmuch = tr_bandwidthClamp( io->bandwidth, dir, howmuch );
+
+ /* if we don't have any bandwidth left, stop writing */
+ if( howmuch < 1 ) {
+ tr_peerIoSetEnabled( io, dir, FALSE );
+ return;
+ }
+
+ res = tr_evbuffer_write( io, fd, howmuch );
+ if (res == -1) {
+#ifndef WIN32
+/* TODO: evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
+ * set errno, thus this error checking is not portable */
+ if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
+ goto reschedule;
+ /* error case */
+ what |= EVBUFFER_ERROR;
+
+#else
+ goto reschedule;
#endif
-static int
-isPeerIo( const tr_peerIo * io )
-{
- return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
+ } else if (res == 0) {
+ /* eof case */
+ what |= EVBUFFER_EOF;
+ }
+ if (res <= 0)
+ goto error;
+
+ if( EVBUFFER_LENGTH( io->outbuf ) )
+ tr_peerIoSetEnabled( io, dir, TRUE );
+
+ didWriteWrapper( io, res );
+ return;
+
+ reschedule:
+ if( EVBUFFER_LENGTH( io->outbuf ) )
+ tr_peerIoSetEnabled( io, dir, TRUE );
+ return;
+
+ error:
+ io->gotError( io, what, io->userData );
}
+/**
+***
+**/
+
+
static int
isFlag( int flag )
{
io->port = port;
io->socket = socket;
io->isIncoming = isIncoming != 0;
- io->timeout = IO_TIMEOUT_SECS;
io->timeCreated = time( NULL );
io->inbuf = evbuffer_new( );
io->outbuf = evbuffer_new( );
+ event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
+ event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
#if 0
bufevNew( io );
#endif
{
tr_peerIo * io = vio;
+ event_del( &io->event_read );
+ event_del( &io->event_write );
tr_peerIoSetBandwidth( io, NULL );
evbuffer_free( io->outbuf );
evbuffer_free( io->inbuf );
return tr_peerIoAddrStr( &io->addr, io->port );
}
-#if 0
-static void
-tr_peerIoTryRead( tr_peerIo * io )
-{
- if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
- (*canReadWrapper)( io->iobuf, ~0, io );
-}
-#endif
-
void
-tr_peerIoSetIOFuncs( tr_peerIo * io,
- tr_can_read_cb readcb,
- tr_did_write_cb writecb,
- tr_net_error_cb errcb,
- void * userData )
+tr_peerIoSetIOFuncs( tr_peerIo * io,
+ tr_can_read_cb readcb,
+ tr_did_write_cb writecb,
+ tr_net_error_cb errcb,
+ void * userData )
{
io->canRead = readcb;
io->didWrite = writecb;
io->gotError = errcb;
io->userData = userData;
-
-#if 0
- tr_peerIoTryRead( io );
-#endif
}
tr_bool
return -1;
}
-#if 0
-void
-tr_peerIoSetTimeoutSecs( tr_peerIo * io,
- int secs )
-{
- io->timeout = secs;
- tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
- tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
-}
-#endif
-
/**
***
**/
const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
const double period = 20; /* arbitrary */
- return MAX( maxBlockSize*20.5, currentSpeed*1024*period );
+ const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
+ return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
}
size_t
dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
- if( res > 0 )
- canReadWrapper( io, res, io );
+ if( EVBUFFER_LENGTH( io->inbuf ) )
+ canReadWrapper( io );
if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
{
assert( isPeerIo( io ) );
howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
- howmuch = MIN( howmuch, EVBUFFER_LENGTH( io->outbuf ) );
- n = (int) howmuch;
-#ifdef WIN32
- n = send( io->socket, EVBUFFER_DATA( io->outbuf ), n, 0 );
-#else
- n = write( io->socket, EVBUFFER_DATA( io->outbuf ), n );
-#endif
- dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
+ n = tr_evbuffer_write( io, io->socket, (int)howmuch );
if( n > 0 )
- {
- evbuffer_drain( io->outbuf, n );
+ didWriteWrapper( io, n );
- didWriteWrapper( NULL, n, io );
- }
-
- if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
- {
+ if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
io->gotError( io, what, io->userData );
}
return io->inbuf;
}
+
+tr_bool
+tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
+{
+ assert( isPeerIo( io ) );
+ assert( dir==TR_UP || dir==TR_DOWN );
+
+ return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
+}
+
+/***
+****
+****/
+
+static void
+event_enable( tr_peerIo * io, short event )
+{
+ assert( isPeerIo( io ) );
+
+ if( event & EV_READ )
+ event_add( &io->event_read, NULL );
+
+ if( event & EV_WRITE )
+ event_add( &io->event_write, NULL );
+}
+
+static void
+event_disable( struct tr_peerIo * io, short event )
+{
+ assert( isPeerIo( io ) );
+
+ if( event & EV_READ )
+ event_del( &io->event_read );
+
+ if( event & EV_WRITE )
+ event_del( &io->event_write );
+}
+
+
+void
+tr_peerIoSetEnabled( tr_peerIo * io,
+ tr_direction dir,
+ tr_bool isEnabled )
+{
+ const short event = dir == TR_UP ? EV_WRITE : EV_READ;
+
+ if( isEnabled )
+ event_enable( io, event );
+ else
+ event_disable( io, event );
+}
RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
/* how frequently to reallocate bandwidth */
- BANDWIDTH_PERIOD_MSEC = 100,
+ BANDWIDTH_PERIOD_MSEC = 500,
/* max # of peers to ask fer per torrent per reconnect pulse */
MAX_RECONNECTIONS_PER_PULSE = 4,
tr_session * session;
tr_ptrArray * torrents; /* Torrent */
tr_ptrArray * incomingHandshakes; /* tr_handshake */
+ tr_ptrArray * finishedHandshakes; /* tr_handshake */
tr_timer * bandwidthTimer;
};
m->session = session;
m->torrents = tr_ptrArrayNew( );
m->incomingHandshakes = tr_ptrArrayNew( );
+ m->finishedHandshakes = tr_ptrArrayNew( );
m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
return m;
}
void
tr_peerMgrFree( tr_peerMgr * manager )
{
+ tr_handshake * handshake;
+
managerLock( manager );
tr_timerFree( &manager->bandwidthTimer );
tr_ptrArrayFree( manager->incomingHandshakes, NULL );
+ while(( handshake = tr_ptrArrayPop( manager->finishedHandshakes )))
+ tr_handshakeFree( handshake );
+
+ tr_ptrArrayFree( manager->finishedHandshakes, NULL );
+
/* free the torrents. */
tr_ptrArrayFree( manager->torrents, torrentDestructor );
/* FIXME: this is kind of a mess. */
static tr_bool
-myHandshakeDoneCB( tr_handshake * handshake,
- tr_peerIo * io,
+myHandshakeDoneCB( tr_handshake * handshake,
+ tr_peerIo * io,
int isConnected,
const uint8_t * peer_id,
- void * vmanager )
+ void * vmanager )
{
tr_bool ok = isConnected;
tr_bool success = FALSE;
if( atom )
++atom->numFails;
}
-
- tr_peerIoFree( io );
}
else /* looking good */
{
{
tordbg( t, "banned peer %s tried to reconnect",
tr_peerIoAddrStr( &atom->addr, atom->port ) );
- tr_peerIoFree( io );
}
else if( tr_peerIoIsIncoming( io )
&& ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
{
- tr_peerIoFree( io );
}
else
{
if( peer ) /* we already have this peer */
{
- tr_peerIoFree( io );
}
else
{
}
peer->port = port;
- peer->io = io;
+ peer->io = tr_handshakeStealIO( handshake );
tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
tr_peerIoSetBandwidth( io, peer->bandwidth );
}
}
+ if( !success )
+ tr_ptrArrayAppend( manager->finishedHandshakes, handshake );
+
if( t )
torrentUnlock( t );
}
}
-#warning this for loop can be removed when we are sure the bug is fixed
- for( i=0; i<peersReturning; ++i )
- assert( tr_isAddress( &pex[i].addr ) );
-
assert( ( walk - pex ) == peersReturning );
qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
-#warning this for loop can be removed when we are sure the bug is fixed
- for( i=0; i<peersReturning; ++i )
- assert( tr_isAddress( &pex[i].addr ) );
-
*setme_pex = pex;
}
static int
bandwidthPulse( void * vmgr )
{
+ tr_handshake * handshake;
tr_peerMgr * mgr = vmgr;
managerLock( mgr );
+ /* FIXME: this next line probably isn't necessary... */
pumpAllPeers( mgr );
+
+ /* allocate bandwidth to the peers */
tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
- pumpAllPeers( mgr );
+
+ /* free all the finished handshakes */
+ while(( handshake = tr_ptrArrayPop( mgr->finishedHandshakes )))
+ tr_handshakeFree( handshake );
managerUnlock( mgr );
return TRUE;