From: Charles Kerr Date: Fri, 2 Jan 2009 23:28:57 +0000 (+0000) Subject: (trunk libT) revert r7548, which broke very low speed download limits.. the simplifie... X-Git-Tag: 1.60~588 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=49f46dcddbddd589217565e71eff22db68247b47;p=transmission (trunk libT) revert r7548, which broke very low speed download limits.. the simplified peer-msgs parsing didn't distinguish between piece & raw data until the piece was done downloading. --- diff --git a/libtransmission/peer-msgs.c b/libtransmission/peer-msgs.c index c9e8fe310..0a4b7442f 100644 --- a/libtransmission/peer-msgs.c +++ b/libtransmission/peer-msgs.c @@ -106,6 +106,14 @@ enum *** REQUEST MANAGEMENT **/ +enum +{ + AWAITING_BT_LENGTH, + AWAITING_BT_ID, + AWAITING_BT_MESSAGE, + AWAITING_BT_PIECE +}; + struct peer_request { uint32_t index; @@ -244,8 +252,10 @@ reqListRemove( struct request_list * list, * the current message that it's sending us. */ struct tr_incoming { - uint32_t length; /* includes the +1 for id length */ - struct evbuffer * block; /* piece data for incoming blocks */ + uint8_t id; + uint32_t length; /* includes the +1 for id length */ + struct peer_request blockReq; /* metadata for incoming blocks */ + struct evbuffer * block; /* piece data for incoming blocks */ }; /** @@ -269,6 +279,7 @@ struct tr_peermsgs tr_bool peerSentLtepHandshake; tr_bool haveFastSet; + uint8_t state; uint8_t ut_pex_id; uint16_t pexCount; uint16_t pexCount6; @@ -1267,14 +1278,48 @@ readBtLength( tr_peermsgs * msgs, return READ_LATER; tr_peerIoReadUint32( msgs->peer->io, inbuf, &len ); - msgs->incoming.length = len; if( len == 0 ) /* peer sent us a keepalive message */ dbgmsg( msgs, "got KeepAlive" ); + else + { + msgs->incoming.length = len; + msgs->state = AWAITING_BT_ID; + } return READ_NOW; } +static int readBtMessage( tr_peermsgs * msgs, + struct evbuffer * inbuf, + size_t inlen ); + +static int +readBtId( tr_peermsgs * msgs, + struct evbuffer * inbuf, + size_t inlen ) +{ + uint8_t id; + + if( inlen < sizeof( uint8_t ) ) + return READ_LATER; + + tr_peerIoReadUint8( msgs->peer->io, inbuf, &id ); + msgs->incoming.id = id; + + if( id == BT_PIECE ) + { + msgs->state = AWAITING_BT_PIECE; + return READ_NOW; + } + else if( msgs->incoming.length != 1 ) + { + msgs->state = AWAITING_BT_MESSAGE; + return READ_NOW; + } + else return readBtMessage( msgs, inbuf, inlen - 1 ); +} + static void updatePeerProgress( tr_peermsgs * msgs ) { @@ -1361,34 +1406,55 @@ readBtPiece( tr_peermsgs * msgs, size_t inlen, size_t * setme_piece_bytes_read ) { - struct peer_request req; + struct peer_request * req = &msgs->incoming.blockReq; assert( EVBUFFER_LENGTH( inbuf ) >= inlen ); dbgmsg( msgs, "In readBtPiece" ); - tr_peerIoReadUint32( msgs->peer->io, inbuf, &req.index ); - tr_peerIoReadUint32( msgs->peer->io, inbuf, &req.offset ); - req.length = msgs->incoming.length - 9; - dbgmsg( msgs, "got incoming block header %u:%u->%u", req.index, req.offset, req.length ); - - { + if( !req->length ) + { + if( inlen < 8 ) + return READ_LATER; + + tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index ); + tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset ); + req->length = msgs->incoming.length - 9; + dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length ); + return READ_NOW; + } + else + { int err; - /* decrypt the whole block in one go */ - evbuffer_expand( msgs->incoming.block, req.length ); - tr_peerIoReadBytes( msgs->peer->io, inbuf, EVBUFFER_DATA( msgs->incoming.block ), req.length ); - EVBUFFER_LENGTH( msgs->incoming.block ) += req.length; + /* read in another chunk of data */ + const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block ); + size_t n = MIN( nLeft, inlen ); + size_t i = n; + + while( i > 0 ) + { + uint8_t buf[MAX_STACK_ARRAY_SIZE]; + const size_t thisPass = MIN( i, sizeof( buf ) ); + tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass ); + evbuffer_add( msgs->incoming.block, buf, thisPass ); + i -= thisPass; + } - fireClientGotData( msgs, req.length, TRUE ); - *setme_piece_bytes_read += req.length; - dbgmsg( msgs, "got block %u:%u->%u", req.index, req.offset, req.length ); - assert( EVBUFFER_LENGTH( msgs->incoming.block ) == req.length ); + fireClientGotData( msgs, n, TRUE ); + *setme_piece_bytes_read += n; + dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain", + n, req->index, req->offset, req->length, + (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) ); + if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length ) + return READ_LATER; /* we've got the whole block ... process it */ - err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), &req ); + err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req ); /* cleanup */ evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) ); + req->length = 0; + msgs->state = AWAITING_BT_LENGTH; if( !err ) return READ_NOW; else { @@ -1399,31 +1465,28 @@ readBtPiece( tr_peermsgs * msgs, } static int -readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen, size_t * piece ) +readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen ) { - int ret = READ_NOW; - uint8_t id; uint32_t ui32; uint32_t msglen = msgs->incoming.length; + const uint8_t id = msgs->incoming.id; const size_t startBufLen = EVBUFFER_LENGTH( inbuf ); const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io ); + --msglen; /* id length */ + if( inlen < msglen ) return READ_LATER; - tr_peerIoReadUint8( msgs->peer->io, inbuf, &id ); - dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen ); - if( !messageLengthIsCorrect( msgs, id, msglen ) ) + if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) ) { dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen ); fireError( msgs, EMSGSIZE ); return READ_ERR; } - --msglen; - switch( id ) { case BT_CHOKE: @@ -1494,7 +1557,7 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen, size_t } case BT_PIECE: - ret = readBtPiece( msgs, inbuf, msglen, piece ); + assert( 0 ); /* handled elsewhere! */ break; case BT_PORT: @@ -1573,10 +1636,11 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen, size_t break; } - assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen - 1 ); + assert( msglen + 1 == msgs->incoming.length ); + assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen ); - msgs->incoming.length = 0; - return ret; + msgs->state = AWAITING_BT_LENGTH; + return READ_NOW; } static inline void @@ -1677,15 +1741,27 @@ canRead( tr_peerIo * io, void * vmsgs, size_t * piece ) const size_t inlen = EVBUFFER_LENGTH( in ); if( !inlen ) - return READ_LATER; + { + ret = READ_LATER; + } + else if( msgs->state == AWAITING_BT_PIECE ) + { + ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER; + } + else switch( msgs->state ) + { + case AWAITING_BT_LENGTH: + ret = readBtLength ( msgs, in, inlen ); break; - /* Incoming data is processed in two stages. First the length is read - * and then readBtMessage() waits until all the data has arrived in - * the input buffer before starting to parse it */ - if( msgs->incoming.length == 0 ) - ret = readBtLength ( msgs, in, inlen ); - else - ret = readBtMessage( msgs, in, inlen, piece ); + case AWAITING_BT_ID: + ret = readBtId ( msgs, in, inlen ); break; + + case AWAITING_BT_MESSAGE: + ret = readBtMessage( msgs, in, inlen ); break; + + default: + assert( 0 ); + } /* log the raw data that was read */ if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) ) @@ -2165,6 +2241,7 @@ tr_peerMsgsNew( struct tr_torrent * torrent, m->peer->clientIsInterested = 0; m->peer->peerIsInterested = 0; m->peer->have = tr_bitfieldNew( torrent->info.pieceCount ); + m->state = AWAITING_BT_LENGTH; m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL ); m->outMessages = evbuffer_new( ); m->outMessagesBatchedAt = 0;