*** REQUEST MANAGEMENT
**/
-enum
-{
- AWAITING_BT_LENGTH,
- AWAITING_BT_ID,
- AWAITING_BT_MESSAGE,
- AWAITING_BT_PIECE
-};
-
struct peer_request
{
uint32_t index;
* the current message that it's sending us. */
struct tr_incoming
{
- uint8_t id;
- uint32_t length; /* includes the +1 for id length */
- struct peer_request blockReq; /* metadata for incoming blocks */
- struct evbuffer * block; /* piece data for incoming blocks */
+ uint32_t length; /* includes the +1 for id length */
+ struct evbuffer * block; /* piece data for incoming blocks */
};
/**
tr_bool peerSentLtepHandshake;
tr_bool haveFastSet;
- uint8_t state;
uint8_t ut_pex_id;
uint16_t pexCount;
uint16_t pexCount6;
return READ_LATER;
tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );
+ msgs->incoming.length = len;
if( len == 0 ) /* peer sent us a keepalive message */
dbgmsg( msgs, "got KeepAlive" );
- else
- {
- msgs->incoming.length = len;
- msgs->state = AWAITING_BT_ID;
- }
return READ_NOW;
}
-static int readBtMessage( tr_peermsgs * msgs,
- struct evbuffer * inbuf,
- size_t inlen );
-
-static int
-readBtId( tr_peermsgs * msgs,
- struct evbuffer * inbuf,
- size_t inlen )
-{
- uint8_t id;
-
- if( inlen < sizeof( uint8_t ) )
- return READ_LATER;
-
- tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
- msgs->incoming.id = id;
-
- if( id == BT_PIECE )
- {
- msgs->state = AWAITING_BT_PIECE;
- return READ_NOW;
- }
- else if( msgs->incoming.length != 1 )
- {
- msgs->state = AWAITING_BT_MESSAGE;
- return READ_NOW;
- }
- else return readBtMessage( msgs, inbuf, inlen - 1 );
-}
-
static void
updatePeerProgress( tr_peermsgs * msgs )
{
size_t inlen,
size_t * setme_piece_bytes_read )
{
- struct peer_request * req = &msgs->incoming.blockReq;
+ struct peer_request req;
assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
dbgmsg( msgs, "In readBtPiece" );
- if( !req->length )
- {
- if( inlen < 8 )
- return READ_LATER;
-
- tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
- tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
- req->length = msgs->incoming.length - 9;
- dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
- return READ_NOW;
- }
- else
- {
- int err;
+ tr_peerIoReadUint32( msgs->peer->io, inbuf, &req.index );
+ tr_peerIoReadUint32( msgs->peer->io, inbuf, &req.offset );
+ req.length = msgs->incoming.length - 9;
+ dbgmsg( msgs, "got incoming block header %u:%u->%u", req.index, req.offset, req.length );
- /* read in another chunk of data */
- const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
- size_t n = MIN( nLeft, inlen );
- size_t i = n;
+ {
+ int err;
- while( i > 0 )
- {
- uint8_t buf[MAX_STACK_ARRAY_SIZE];
- const size_t thisPass = MIN( i, sizeof( buf ) );
- tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
- evbuffer_add( msgs->incoming.block, buf, thisPass );
- i -= thisPass;
- }
+ /* decrypt the whole block in one go */
+ evbuffer_expand( msgs->incoming.block, req.length );
+ tr_peerIoReadBytes( msgs->peer->io, inbuf, EVBUFFER_DATA( msgs->incoming.block ), req.length );
+ EVBUFFER_LENGTH( msgs->incoming.block ) += req.length;
- fireClientGotData( msgs, n, TRUE );
- *setme_piece_bytes_read += n;
- dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
- n, req->index, req->offset, req->length,
- (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
- if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
- return READ_LATER;
+ fireClientGotData( msgs, req.length, TRUE );
+ *setme_piece_bytes_read += req.length;
+ dbgmsg( msgs, "got block %u:%u->%u", req.index, req.offset, req.length );
+ assert( EVBUFFER_LENGTH( msgs->incoming.block ) == req.length );
/* we've got the whole block ... process it */
- err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );
+ err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), &req );
/* cleanup */
evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH( msgs->incoming.block ) );
- req->length = 0;
- msgs->state = AWAITING_BT_LENGTH;
if( !err )
return READ_NOW;
else {
}
static int
-readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
+readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen, size_t * piece )
{
+ int ret = READ_NOW;
+ uint8_t id;
uint32_t ui32;
uint32_t msglen = msgs->incoming.length;
- const uint8_t id = msgs->incoming.id;
const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
- --msglen; /* id length */
-
if( inlen < msglen )
return READ_LATER;
+ tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
+
dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
- if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
+ if( !messageLengthIsCorrect( msgs, id, msglen ) )
{
dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
fireError( msgs, EMSGSIZE );
return READ_ERR;
}
+ --msglen;
+
switch( id )
{
case BT_CHOKE:
}
case BT_PIECE:
- assert( 0 ); /* handled elsewhere! */
+ ret = readBtPiece( msgs, inbuf, msglen, piece );
break;
case BT_PORT:
break;
}
- assert( msglen + 1 == msgs->incoming.length );
- assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
+ assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen - 1 );
- msgs->state = AWAITING_BT_LENGTH;
- return READ_NOW;
+ msgs->incoming.length = 0;
+ return ret;
}
static void
const size_t inlen = EVBUFFER_LENGTH( in );
if( !inlen )
- {
- ret = READ_LATER;
- }
- else if( msgs->state == AWAITING_BT_PIECE )
- {
- ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
- }
- else switch( msgs->state )
- {
- case AWAITING_BT_LENGTH:
- ret = readBtLength ( msgs, in, inlen ); break;
-
- case AWAITING_BT_ID:
- ret = readBtId ( msgs, in, inlen ); break;
-
- case AWAITING_BT_MESSAGE:
- ret = readBtMessage( msgs, in, inlen ); break;
+ return READ_LATER;
- default:
- assert( 0 );
- }
+ /* Incoming data is processed in two stages. First the length is read
+ * and then readBtMessage() waits until all the data has arrived in
+ * the input buffer before starting to parse it */
+ if( msgs->incoming.length == 0 )
+ ret = readBtLength ( msgs, in, inlen );
+ else
+ ret = readBtMessage( msgs, in, inlen, piece );
/* log the raw data that was read */
if( ( ret != READ_ERR ) && ( EVBUFFER_LENGTH( in ) != inlen ) )
m->peer->clientIsInterested = 0;
m->peer->peerIsInterested = 0;
m->peer->have = tr_bitfieldNew( torrent->info.pieceCount );
- m->state = AWAITING_BT_LENGTH;
m->pexTimer = tr_timerNew( m->session, pexPulse, m, PEX_INTERVAL );
m->outMessages = evbuffer_new( );
m->outMessagesBatchedAt = 0;