]> granicus.if.org Git - transmission/commitdiff
try to rework the bandwidth code yet again s.t. it satisfies all three: (1) fairly...
authorCharles Kerr <charles@transmissionbt.com>
Sat, 20 Dec 2008 22:19:34 +0000 (22:19 +0000)
committerCharles Kerr <charles@transmissionbt.com>
Sat, 20 Dec 2008 22:19:34 +0000 (22:19 +0000)
libtransmission/bandwidth.c
libtransmission/handshake.c
libtransmission/handshake.h
libtransmission/peer-io.c
libtransmission/peer-io.h
libtransmission/peer-mgr.c

index cf35d4d9459403663ff771fd3b203d426f49f87a..ac3570a52773a61a3862fed2f7423655b5c5ffaa 100644 (file)
@@ -294,40 +294,59 @@ tr_bandwidthAllocate( tr_bandwidth  * b,
                       tr_direction    dir,
                       int             period_msec )
 {
-    int n;
+    int i, n, peerCount;
     tr_ptrArray * tmp;
     struct tr_peerIo ** peers;
 
+    /* allocateBandwidth() is a helper function with two purposes:
+     * 1. allocate bandwidth to b and its subtree
+     * 2. accumulate an array of all the peerIos from b and its subtree. */
     tmp = tr_ptrArrayNew( );
     allocateBandwidth( b, dir, period_msec, tmp );
-    peers = (struct tr_peerIo**) tr_ptrArrayPeek( tmp, &n );
-
-    /* loop through all the peers, reading and writing in small chunks,
-     * until we run out of bandwidth or peers. we do it this way to 
-     * prevent one peer from using up all the bandwidth */
-#if 0
-fprintf( stderr, "%s - %d peers\n", (dir==TR_UP)?"up":"down", n );
-#endif
-    while( n > 0 )
+    peers = (struct tr_peerIo**) tr_ptrArrayPeek( tmp, &peerCount );
+
+    /* Stop all peers from listening for the socket to be ready for IO.
+     * See "Second phase of IO" lower in this function for more info. */
+    for( i=0; i<peerCount; ++i )
+        tr_peerIoSetEnabled( peers[i], dir, FALSE );
+
+    /* First phase of IO.  Tries to distribute bandwidth in a fair/even manner
+     * to avoid "greedy peers" from starving out the other peers: loop through
+     * peers in a round-robin fashion, giving each one of them them small chunks
+     * of bandwidth to use.  (It's small to conserve some of the bandwidth
+     * until the end of the loop).  Keep looping until we run out of bandwidth
+     * or peers that are ready to use it. */
+    n = peerCount;
+    i = n ? tr_cryptoWeakRandInt( n ) : 0; /* pick a random starting point */
+    for( ; n>0; )
     {
-        int i;
-        for( i=0; i<n; )
-        {
-            const int increment = n==1 ? 4096 : 1024;
-            const int byteCount = tr_peerIoFlush( peers[i], dir, increment);
-
-#if 0
-            if( byteCount )
-                fprintf( stderr, "peer %p: %d bytes\n", peers[i], byteCount );
-#endif
-
-            if( byteCount == increment )
-                ++i;
-            else
-                peers[i] = peers[--n];
+        const int increment = n==1 ? 4096 : 1024;
+        const int byteCount = tr_peerIoFlush( peers[i], dir, increment);
+
+        if( byteCount == increment )
+            ++i;
+        else {
+            /* peer is done writing for now; move it to the end of the list */
+            tr_peerIo * tmp = peers[i];
+            peers[i] = peers[n-1];
+            peers[n-1] = tmp;
+            --n;
         }
+
+        assert( i <= n );
+        if( i == n )
+            i = 0;
     }
 
+    /* Second phase of IO.  To help us scale well in high bandiwdth situations
+     * such as LANs, enable on-demand IO for peers with bandwidth left to burn.
+     * This on-demand IO for a peer is enabled until either (1) the peer runs
+     * out of bandwidth, or (2) the next tr_bandwidthAllocate() call, when we
+     * start all over again. */
+    for( i=0; i<peerCount; ++i )
+        if( tr_peerIoHasBandwidthLeft( peers[i], dir ) )
+            tr_peerIoSetEnabled( peers[i], dir, TRUE );
+
     /* cleanup */
     tr_ptrArrayFree( tmp, NULL );
 }
index 84679bcf8996b70825f05a986410a174ff118a4b..356a3681f647c5a49db8d413130cc059b2fe465c 100644 (file)
@@ -1090,19 +1090,26 @@ fireDoneFunc( tr_handshake * handshake,
     return success;
 }
 
+void
+tr_handshakeFree( tr_handshake * handshake )
+{
+    if( handshake->io )
+        tr_peerIoFree( handshake->io );
+
+    tr_free( handshake );
+}
+
 static int
 tr_handshakeDone( tr_handshake * handshake,
                   int            isOK )
 {
-    int success;
+    tr_bool success;
 
     dbgmsg( handshake, "handshakeDone: %s", isOK ? "connected" : "aborting" );
     tr_peerIoSetIOFuncs( handshake->io, NULL, NULL, NULL, NULL );
 
     success = fireDoneFunc( handshake, isOK );
 
-    tr_free( handshake );
-
     return success ? READ_LATER : READ_ERR;
 }
 
@@ -1192,6 +1199,19 @@ tr_handshakeGetIO( tr_handshake * handshake )
     return handshake->io;
 }
 
+struct tr_peerIo*
+tr_handshakeStealIO( tr_handshake * handshake )
+{
+    struct tr_peerIo * io;
+
+    assert( handshake );
+    assert( handshake->io );
+
+    io = handshake->io;
+    handshake->io = NULL;
+    return io;
+}
+
 const tr_address *
 tr_handshakeGetAddr( const struct tr_handshake * handshake,
                      tr_port                   * port )
index c653a1444f69f742f539a064ae5164575eb0eb36..4fe861363955f07693e40fadd23afb612a74d660 100644 (file)
@@ -39,8 +39,13 @@ const struct tr_address *
                       tr_handshakeGetAddr( const struct tr_handshake  * handshake,
                                             tr_port                    * port );
 
+void                   tr_handshakeFree( tr_handshake * handshake );
+
 void                   tr_handshakeAbort( tr_handshake * handshake );
 
 struct tr_peerIo*      tr_handshakeGetIO( tr_handshake * handshake );
 
+struct tr_peerIo*      tr_handshakeStealIO( tr_handshake * handshake );
+
+
 #endif
index 191c176e62118c87611084b0c7e7a07f8ec6978a..024d2c1015d23d3c1fd35c5fd869d0754c77cbf2 100644 (file)
@@ -34,7 +34,6 @@
 #include "utils.h"
 
 #define MAGIC_NUMBER 206745
-#define IO_TIMEOUT_SECS 8
 
 static size_t
 getPacketOverhead( size_t d )
@@ -87,7 +86,6 @@ struct tr_peerIo
     int                magicNumber;
 
     uint8_t            encryptionMode;
-    uint8_t            timeout;
     tr_port            port;
     int                socket;
 
@@ -111,6 +109,9 @@ struct tr_peerIo
 
     struct evbuffer  * inbuf;
     struct evbuffer  * outbuf;
+
+    struct event       event_read;
+    struct event       event_write;
 };
 
 /***
@@ -118,12 +119,8 @@ struct tr_peerIo
 ***/
 
 static void
-didWriteWrapper( void     * unused UNUSED,
-                 size_t     bytes_transferred,
-                 void     * vio )
+didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
 {
-    tr_peerIo *  io = vio;
-
     while( bytes_transferred )
     {
         struct tr_datatype * next = io->output_datatypes->data;
@@ -146,13 +143,10 @@ didWriteWrapper( void     * unused UNUSED,
 }
 
 static void
-canReadWrapper( void    * unused UNUSED,
-                size_t    bytes_transferred UNUSED,
-                void    * vio )
+canReadWrapper( tr_peerIo * io )
 {
-    int          done = 0;
-    int          err = 0;
-    tr_peerIo *  io = vio;
+    tr_bool done = 0;
+    tr_bool err = 0;
     tr_session * session = io->session;
 
     dbgmsg( io, "canRead" );
@@ -168,14 +162,13 @@ canReadWrapper( void    * unused UNUSED,
             const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
             const int ret = io->canRead( io, io->userData, &piece );
 
-            if( ret != READ_ERR )
-            {
-                const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
-                if( piece )
-                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
-                if( used != piece )
-                    tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
-            }
+            const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
+
+            if( piece )
+                tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
+
+            if( used != piece )
+                tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
 
             switch( ret )
             {
@@ -199,46 +192,155 @@ canReadWrapper( void    * unused UNUSED,
     }
 }
 
-#if 0
+#define _isBool(b) (((b)==0 || (b)==1))
+
+static int
+isPeerIo( const tr_peerIo * io )
+{
+    return ( io != NULL )
+        && ( io->magicNumber == MAGIC_NUMBER )
+        && ( tr_isAddress( &io->addr ) )
+        && ( _isBool( io->isEncrypted ) )
+        && ( _isBool( io->isIncoming ) )
+        && ( _isBool( io->peerIdIsSet ) )
+        && ( _isBool( io->extendedProtocolSupported ) )
+        && ( _isBool( io->fastExtensionSupported ) );
+}
+
 static void
-gotErrorWrapper( struct tr_iobuf  * iobuf,
-                 short              what,
-                 void             * userData )
+event_read_cb( int fd, short event UNUSED, void * vio )
 {
-    tr_peerIo * c = userData;
+    int res;
+    short what = EVBUFFER_READ;
+    tr_peerIo * io = vio;
+    const size_t howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, io->session->so_rcvbuf );
+    const tr_direction dir = TR_DOWN;
 
-    if( c->gotError )
-        c->gotError( iobuf, what, c->userData );
+    assert( isPeerIo( io ) );
+
+    dbgmsg( io, "libevent says this peer is ready to read" );
+
+    /* if we don't have any bandwidth left, stop reading */
+    if( howmuch < 1 ) {
+        tr_peerIoSetEnabled( io, dir, FALSE );
+        return;
+    }
+
+    res = evbuffer_read( io->inbuf, fd, howmuch );
+    if( res == -1 ) {
+        if( errno == EAGAIN || errno == EINTR )
+            goto reschedule;
+        /* error case */
+        what |= EVBUFFER_ERROR;
+    } else if( res == 0 ) {
+        /* eof case */
+        what |= EVBUFFER_EOF;
+    }
+
+    if( res <= 0 )
+        goto error;
+
+    tr_peerIoSetEnabled( io, dir, TRUE );
+
+    /* Invoke the user callback - must always be called last */
+    canReadWrapper( io );
+
+    return;
+
+ reschedule:
+    tr_peerIoSetEnabled( io, dir, TRUE );
+    return;
+
+ error:
+    if( io->gotError != NULL )
+        io->gotError( io, what, io->userData );
 }
+
+static int
+tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
+{
+    struct evbuffer * buffer = io->outbuf;
+    int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
+
+#ifdef WIN32
+    n = send(fd, buffer->buffer, n,  0 );
+#else
+    n = write(fd, buffer->buffer, n );
 #endif
+    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
 
-/**
-***
-**/
+    if( n == -1 )
+        return -1;
+    if (n == 0)
+        return 0;
+    evbuffer_drain( buffer, n );
+
+    return n;
+}
 
-#if 0
 static void
-bufevNew( tr_peerIo * io )
+event_write_cb( int fd, short event UNUSED, void * vio )
 {
-    io->iobuf = tr_iobuf_new( io->session,
-                              io->bandwidth,
-                              io->socket,
-                              EV_READ | EV_WRITE,
-                              canReadWrapper,
-                              didWriteWrapper,
-                              gotErrorWrapper,
-                              io );
+    int res = 0;
+    short what = EVBUFFER_WRITE;
+    tr_peerIo * io = vio;
+    size_t howmuch;
+    const tr_direction dir = TR_UP;
 
-    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
-}
+    assert( isPeerIo( io ) );
+
+    dbgmsg( io, "libevent says this peer is ready to write" );
+
+    howmuch = MIN( (size_t)io->session->so_sndbuf, EVBUFFER_LENGTH( io->outbuf ) );
+    howmuch = tr_bandwidthClamp( io->bandwidth, dir, howmuch );
+
+    /* if we don't have any bandwidth left, stop writing */
+    if( howmuch < 1 ) {
+        tr_peerIoSetEnabled( io, dir, FALSE );
+        return;
+    }
+
+    res = tr_evbuffer_write( io, fd, howmuch );
+    if (res == -1) {
+#ifndef WIN32
+/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
+ *  *set errno. thus this error checking is not portable*/
+        if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
+            goto reschedule;
+        /* error case */
+        what |= EVBUFFER_ERROR;
+
+#else
+        goto reschedule;
 #endif
 
-static int
-isPeerIo( const tr_peerIo * io )
-{
-    return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
+    } else if (res == 0) {
+        /* eof case */
+        what |= EVBUFFER_EOF;
+    }
+    if (res <= 0)
+        goto error;
+
+    if( EVBUFFER_LENGTH( io->outbuf ) )
+        tr_peerIoSetEnabled( io, dir, TRUE );
+
+    didWriteWrapper( io, res );
+    return;
+
+ reschedule:
+    if( EVBUFFER_LENGTH( io->outbuf ) )
+        tr_peerIoSetEnabled( io, dir, TRUE );
+    return;
+
+ error:
+    io->gotError( io, what, io->userData );
 }
 
+/**
+***
+**/
+
+
 static int
 isFlag( int flag )
 {
@@ -266,10 +368,11 @@ tr_peerIoNew( tr_session       * session,
     io->port = port;
     io->socket = socket;
     io->isIncoming = isIncoming != 0;
-    io->timeout = IO_TIMEOUT_SECS;
     io->timeCreated = time( NULL );
     io->inbuf = evbuffer_new( );
     io->outbuf = evbuffer_new( );
+    event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
+    event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
 #if 0
     bufevNew( io );
 #endif
@@ -314,6 +417,8 @@ io_dtor( void * vio )
 {
     tr_peerIo * io = vio;
 
+    event_del( &io->event_read );
+    event_del( &io->event_write );
     tr_peerIoSetBandwidth( io, NULL );
     evbuffer_free( io->outbuf );
     evbuffer_free( io->inbuf );
@@ -377,30 +482,17 @@ tr_peerIoGetAddrStr( const tr_peerIo * io )
     return tr_peerIoAddrStr( &io->addr, io->port );
 }
 
-#if 0
-static void
-tr_peerIoTryRead( tr_peerIo * io )
-{
-    if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
-        (*canReadWrapper)( io->iobuf, ~0, io );
-}
-#endif
-
 void
-tr_peerIoSetIOFuncs( tr_peerIo *     io,
-                     tr_can_read_cb  readcb,
-                     tr_did_write_cb writecb,
-                     tr_net_error_cb errcb,
-                     void *          userData )
+tr_peerIoSetIOFuncs( tr_peerIo        * io,
+                     tr_can_read_cb     readcb,
+                     tr_did_write_cb    writecb,
+                     tr_net_error_cb    errcb,
+                     void             * userData )
 {
     io->canRead = readcb;
     io->didWrite = writecb;
     io->gotError = errcb;
     io->userData = userData;
-
-#if 0
-    tr_peerIoTryRead( io );
-#endif
 }
 
 tr_bool
@@ -436,17 +528,6 @@ tr_peerIoReconnect( tr_peerIo * io )
     return -1;
 }
 
-#if 0
-void
-tr_peerIoSetTimeoutSecs( tr_peerIo * io,
-                         int         secs )
-{
-    io->timeout = secs;
-    tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
-    tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
-}
-#endif
-
 /**
 ***
 **/
@@ -563,7 +644,8 @@ getDesiredOutputBufferSize( const tr_peerIo * io )
     const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
     const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
     const double period = 20; /* arbitrary */
-    return MAX( maxBlockSize*20.5, currentSpeed*1024*period );
+    const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
+    return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
 }
 
 size_t
@@ -804,8 +886,8 @@ tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
 
     dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
 
-    if( res > 0 )
-        canReadWrapper( io, res, io );
+    if( EVBUFFER_LENGTH( io->inbuf ) )
+        canReadWrapper( io );
 
     if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
     {
@@ -826,25 +908,13 @@ tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
     assert( isPeerIo( io ) );
 
     howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
-    howmuch = MIN( howmuch, EVBUFFER_LENGTH( io->outbuf ) );
-    n = (int) howmuch;
 
-#ifdef WIN32
-    n = send( io->socket, EVBUFFER_DATA( io->outbuf ), n,  0 );
-#else
-    n = write( io->socket, EVBUFFER_DATA( io->outbuf ),  n );
-#endif
-    dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
+    n = tr_evbuffer_write( io, io->socket, (int)howmuch );
 
     if( n > 0 )
-    {
-        evbuffer_drain( io->outbuf, n );
+        didWriteWrapper( io, n );
 
-        didWriteWrapper( NULL, n, io );
-    }
-
-    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
-    {
+    if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
         short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
         io->gotError( io, what, io->userData );
     }
@@ -875,3 +945,54 @@ tr_peerIoGetReadBuffer( tr_peerIo * io )
 
     return io->inbuf;
 }
+
+tr_bool
+tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
+{
+    assert( isPeerIo( io ) );
+    assert( dir==TR_UP || dir==TR_DOWN );
+
+    return tr_bandwidthClamp( io->bandwidth, dir, 1024 ) > 0;
+}
+
+/***
+****
+****/
+
+static void
+event_enable( tr_peerIo * io, short event )
+{
+    assert( isPeerIo( io ) );
+
+    if( event & EV_READ )
+        event_add( &io->event_read, NULL );
+
+    if( event & EV_WRITE )
+        event_add( &io->event_write, NULL );
+}
+
+static void
+event_disable( struct tr_peerIo * io, short event )
+{
+    assert( isPeerIo( io ) );
+
+    if( event & EV_READ )
+        event_del( &io->event_read );
+
+    if( event & EV_WRITE )
+        event_del( &io->event_write );
+}
+
+
+void
+tr_peerIoSetEnabled( tr_peerIo    * io,
+                     tr_direction   dir,
+                     tr_bool        isEnabled )
+{
+    const short event = dir == TR_UP ? EV_WRITE : EV_READ;
+
+    if( isEnabled )
+        event_enable( io, event );
+    else
+        event_disable( io, event );
+}
index d55f4c24d18391f84bccebbeab5e291b6ed98220..4d6d317ce8bde76533957b73f13464326df546b3 100644 (file)
@@ -25,7 +25,6 @@ struct evbuffer;
 struct tr_address;
 struct tr_bandwidth;
 struct tr_crypto;
-struct tr_iobuf;
 typedef struct tr_peerIo tr_peerIo;
 
 /**
@@ -213,6 +212,13 @@ void      tr_peerIoBandwidthUsed( tr_peerIo           * io,
 ***
 **/
 
+tr_bool   tr_peerIoHasBandwidthLeft( const tr_peerIo  * io,
+                                     tr_direction       direction );
+
+void      tr_peerIoSetEnabled( tr_peerIo    * io,
+                               tr_direction   dir,
+                               tr_bool        isEnabled );
+                       
 int       tr_peerIoFlush( tr_peerIo     * io,
                           tr_direction    dir,
                           size_t          byteLimit );
index 9cbc7b2c22e2ac4ee52cd55869b3c16506227cc2..12f91acfcbda8285f977c12eede152f9c50dac96 100644 (file)
@@ -57,7 +57,7 @@ enum
     RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
    
     /* how frequently to reallocate bandwidth */
-    BANDWIDTH_PERIOD_MSEC = 100,
+    BANDWIDTH_PERIOD_MSEC = 500,
 
     /* max # of peers to ask fer per torrent per reconnect pulse */
     MAX_RECONNECTIONS_PER_PULSE = 4,
@@ -143,6 +143,7 @@ struct tr_peerMgr
     tr_session      * session;
     tr_ptrArray     * torrents; /* Torrent */
     tr_ptrArray     * incomingHandshakes; /* tr_handshake */
+    tr_ptrArray     * finishedHandshakes; /* tr_handshake */
     tr_timer        * bandwidthTimer;
 };
 
@@ -463,6 +464,7 @@ tr_peerMgrNew( tr_session * session )
     m->session = session;
     m->torrents = tr_ptrArrayNew( );
     m->incomingHandshakes = tr_ptrArrayNew( );
+    m->finishedHandshakes = tr_ptrArrayNew( );
     m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
     return m;
 }
@@ -470,6 +472,8 @@ tr_peerMgrNew( tr_session * session )
 void
 tr_peerMgrFree( tr_peerMgr * manager )
 {
+    tr_handshake * handshake;
+
     managerLock( manager );
 
     tr_timerFree( &manager->bandwidthTimer );
@@ -481,6 +485,11 @@ tr_peerMgrFree( tr_peerMgr * manager )
 
     tr_ptrArrayFree( manager->incomingHandshakes, NULL );
 
+    while(( handshake = tr_ptrArrayPop( manager->finishedHandshakes )))
+        tr_handshakeFree( handshake );
+
+    tr_ptrArrayFree( manager->finishedHandshakes, NULL );
+
     /* free the torrents. */
     tr_ptrArrayFree( manager->torrents, torrentDestructor );
 
@@ -1194,11 +1203,11 @@ getPeerCount( const Torrent * t )
 
 /* FIXME: this is kind of a mess. */
 static tr_bool
-myHandshakeDoneCB( tr_handshake *  handshake,
-                   tr_peerIo *     io,
+myHandshakeDoneCB( tr_handshake  *  handshake,
+                   tr_peerIo     * io,
                    int             isConnected,
                    const uint8_t * peer_id,
-                   void *          vmanager )
+                   void          * vmanager )
 {
     tr_bool            ok = isConnected;
     tr_bool            success = FALSE;
@@ -1240,8 +1249,6 @@ myHandshakeDoneCB( tr_handshake *  handshake,
             if( atom )
                 ++atom->numFails;
         }
-
-        tr_peerIoFree( io );
     }
     else /* looking good */
     {
@@ -1255,13 +1262,11 @@ myHandshakeDoneCB( tr_handshake *  handshake,
         {
             tordbg( t, "banned peer %s tried to reconnect",
                     tr_peerIoAddrStr( &atom->addr, atom->port ) );
-            tr_peerIoFree( io );
         }
         else if( tr_peerIoIsIncoming( io )
                && ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
 
         {
-            tr_peerIoFree( io );
         }
         else
         {
@@ -1269,7 +1274,6 @@ myHandshakeDoneCB( tr_handshake *  handshake,
 
             if( peer ) /* we already have this peer */
             {
-                tr_peerIoFree( io );
             }
             else
             {
@@ -1285,7 +1289,7 @@ myHandshakeDoneCB( tr_handshake *  handshake,
                 }
 
                 peer->port = port;
-                peer->io = io;
+                peer->io = tr_handshakeStealIO( handshake );
                 tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
                 tr_peerIoSetBandwidth( io, peer->bandwidth );
 
@@ -1294,6 +1298,9 @@ myHandshakeDoneCB( tr_handshake *  handshake,
         }
     }
 
+    if( !success )
+        tr_ptrArrayAppend( manager->finishedHandshakes, handshake );
+
     if( t )
         torrentUnlock( t );
 
@@ -1544,17 +1551,9 @@ tr_peerMgrGetPeers( tr_peerMgr      * manager,
             }
         }
 
-#warning this for loop can be removed when we are sure the bug is fixed
-        for( i=0; i<peersReturning; ++i )
-            assert( tr_isAddress( &pex[i].addr ) );
-
         assert( ( walk - pex ) == peersReturning );
         qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
 
-#warning this for loop can be removed when we are sure the bug is fixed
-        for( i=0; i<peersReturning; ++i )
-            assert( tr_isAddress( &pex[i].addr ) );
-
         *setme_pex = pex;
     }
 
@@ -2394,13 +2393,20 @@ pumpAllPeers( tr_peerMgr * mgr )
 static int
 bandwidthPulse( void * vmgr )
 {
+    tr_handshake * handshake;
     tr_peerMgr * mgr = vmgr;
     managerLock( mgr );
 
+    /* FIXME: this next line probably isn't necessary... */
     pumpAllPeers( mgr );
+
+    /* allocate bandwidth to the peers */
     tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
     tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
-    pumpAllPeers( mgr );
+
+    /* free all the finished handshakes */
+    while(( handshake = tr_ptrArrayPop( mgr->finishedHandshakes )))
+        tr_handshakeFree( handshake );
 
     managerUnlock( mgr );
     return TRUE;