]> granicus.if.org Git - transmission/commitdiff
(trunk libT) revert r7548, which broke very low speed download limits.. the simplifie...
authorCharles Kerr <charles@transmissionbt.com>
Fri, 2 Jan 2009 23:28:57 +0000 (23:28 +0000)
committerCharles Kerr <charles@transmissionbt.com>
Fri, 2 Jan 2009 23:28:57 +0000 (23:28 +0000)
libtransmission/peer-msgs.c

index c9e8fe3105bb40d427ca0b5b5100f2991bb37840..0a4b7442fc9cd8d2ea1279fc58c33828b6651243 100644 (file)
@@ -106,6 +106,14 @@ enum
 ***  REQUEST MANAGEMENT
 **/
 
+enum
+{
+    AWAITING_BT_LENGTH,
+    AWAITING_BT_ID,
+    AWAITING_BT_MESSAGE,
+    AWAITING_BT_PIECE
+};
+
 struct peer_request
 {
     uint32_t    index;
@@ -244,8 +252,10 @@ reqListRemove( struct request_list *       list,
  * the current message that it's sending us. */
 struct tr_incoming
 {
-    uint32_t          length; /* includes the +1 for id length */
-    struct evbuffer * block; /* piece data for incoming blocks */
+    uint8_t                id;
+    uint32_t               length; /* includes the +1 for id length */
+    struct peer_request    blockReq; /* metadata for incoming blocks */
+    struct evbuffer *      block; /* piece data for incoming blocks */
 };
 
 /**
@@ -269,6 +279,7 @@ struct tr_peermsgs
     tr_bool         peerSentLtepHandshake;
     tr_bool         haveFastSet;
 
+    uint8_t         state;
     uint8_t         ut_pex_id;
     uint16_t        pexCount;
     uint16_t        pexCount6;
@@ -1267,14 +1278,48 @@ readBtLength( tr_peermsgs *     msgs,
         return READ_LATER;
 
     tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
-    msgs->incoming.length = len;
 
     if( len == 0 ) /* peer sent us a keepalive message */
         dbgmsg( msgs, "got KeepAlive" );
+    else
+    {
+        msgs->incoming.length = len;
+        msgs->state = AWAITING_BT_ID;
+    }
 
     return READ_NOW;
 }
 
+static int readBtMessage( tr_peermsgs *     msgs,
+                          struct evbuffer * inbuf,
+                          size_t            inlen );
+
+static int
+readBtId( tr_peermsgs *     msgs,
+          struct evbuffer * inbuf,
+          size_t            inlen )
+{
+    uint8_t id;
+
+    if( inlen < sizeof( uint8_t ) )
+        return READ_LATER;
+
+    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
+    msgs->incoming.id = id;
+
+    if( id == BT_PIECE )
+    {
+        msgs->state = AWAITING_BT_PIECE;
+        return READ_NOW;
+    }
+    else if( msgs->incoming.length != 1 )
+    {
+        msgs->state = AWAITING_BT_MESSAGE;
+        return READ_NOW;
+    }
+    else return readBtMessage( msgs, inbuf, inlen - 1 );
+}
+
 static void
 updatePeerProgress( tr_peermsgs * msgs )
 {
@@ -1361,34 +1406,55 @@ readBtPiece( tr_peermsgs      * msgs,
              size_t             inlen,
              size_t           * setme_piece_bytes_read )
 {
-    struct peer_request req;
+    struct peer_request * req = &msgs->incoming.blockReq;
 
     assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
     dbgmsg( msgs, "In readBtPiece" );
 
-    tr_peerIoReadUint32( msgs->peer->io, inbuf, &req.index );
-    tr_peerIoReadUint32( msgs->peer->io, inbuf, &req.offset );
-    req.length = msgs->incoming.length - 9;
-    dbgmsg( msgs, "got incoming block header %u:%u->%u", req.index, req.offset, req.length );
-
-    { 
+    if( !req->length )
+    {
+        if( inlen < 8 )
+            return READ_LATER;
+
+        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
+        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
+        req->length = msgs->incoming.length - 9;
+        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
+        return READ_NOW;
+    }
+    else
+    {
         int err;
 
-        /* decrypt the whole block in one go */
-        evbuffer_expand( msgs->incoming.block, req.length );
-        tr_peerIoReadBytes( msgs->peer->io, inbuf, EVBUFFER_DATA( msgs->incoming.block ), req.length );
-        EVBUFFER_LENGTH( msgs->incoming.block ) += req.length;
+        /* read in another chunk of data */
+        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
+        size_t n = MIN( nLeft, inlen );
+        size_t i = n;
+
+        while( i > 0 )
+        {
+            uint8_t buf[MAX_STACK_ARRAY_SIZE];
+            const size_t thisPass = MIN( i, sizeof( buf ) );
+            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
+            evbuffer_add( msgs->incoming.block, buf, thisPass );
+            i -= thisPass;
+        }
 
-        fireClientGotData( msgs, req.length, TRUE );
-        *setme_piece_bytes_read += req.length;
-        dbgmsg( msgs, "got block %u:%u->%u", req.index, req.offset, req.length );
-        assert( EVBUFFER_LENGTH( msgs->incoming.block ) == req.length );
+        fireClientGotData( msgs, n, TRUE );
+        *setme_piece_bytes_read += n;
+        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
+               n, req->index, req->offset, req->length,
+               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
+        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
+            return READ_LATER;
 
         /* we've got the whole block ... process it */
-        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), &req );
+        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
 
         /* cleanup */
         evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
+        req->length = 0;
+        msgs->state = AWAITING_BT_LENGTH;
         if( !err )
             return READ_NOW;
         else {
@@ -1399,31 +1465,28 @@ readBtPiece( tr_peermsgs      * msgs,
 }
 
 static int
-readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen, size_t * piece )
+readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
 {
-    int           ret = READ_NOW;
-    uint8_t       id;
     uint32_t      ui32;
     uint32_t      msglen = msgs->incoming.length;
+    const uint8_t id = msgs->incoming.id;
     const size_t  startBufLen = EVBUFFER_LENGTH( inbuf );
     const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
 
+    --msglen; /* id length */
+
     if( inlen < msglen )
         return READ_LATER;
 
-    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
-
     dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
 
-    if( !messageLengthIsCorrect( msgs, id, msglen ) )
+    if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
     {
         dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
         fireError( msgs, EMSGSIZE );
         return READ_ERR;
     }
 
-    --msglen;
-
     switch( id )
     {
         case BT_CHOKE:
@@ -1494,7 +1557,7 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen, size_t
         }
 
         case BT_PIECE:
-            ret = readBtPiece( msgs, inbuf, msglen, piece );
+            assert( 0 ); /* handled elsewhere! */
             break;
 
         case BT_PORT:
@@ -1573,10 +1636,11 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen, size_t
             break;
     }
 
-    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen - 1 );
+    assert( msglen + 1 == msgs->incoming.length );
+    assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
 
-    msgs->incoming.length = 0;
-    return ret;
+    msgs->state = AWAITING_BT_LENGTH;
+    return READ_NOW;
 }
 
 static inline void
@@ -1677,15 +1741,27 @@ canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
     const size_t      inlen = EVBUFFER_LENGTH( in );
 
     if( !inlen )
-        return READ_LATER;
+    {
+        ret = READ_LATER;
+    }
+    else if( msgs->state == AWAITING_BT_PIECE )
+    {
+        ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
+    }
+    else switch( msgs->state )
+    {
+        case AWAITING_BT_LENGTH:
+            ret = readBtLength ( msgs, in, inlen ); break;
 
-    /* Incoming data is processed in two stages. First the length is read
-     * and then readBtMessage() waits until all the data has arrived in
-     * the input buffer before starting to parse it */
-    if( msgs->incoming.length == 0 )
-        ret = readBtLength ( msgs, in, inlen );
-    else
-        ret = readBtMessage( msgs, in, inlen, piece );
+        case AWAITING_BT_ID:
+            ret = readBtId     ( msgs, in, inlen ); break;
+
+        case AWAITING_BT_MESSAGE:
+            ret = readBtMessage( msgs, in, inlen ); break;
+
+        default:
+            assert( 0 );
+    }
 
     /* log the raw data that was read */
     if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
@@ -2165,6 +2241,7 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
     m->peer->clientIsInterested = 0;
     m->peer->peerIsInterested = 0;
     m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
+    m->state = AWAITING_BT_LENGTH;
     m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
     m->outMessages = evbuffer_new( );
     m->outMessagesBatchedAt = 0;