From 0a4d8602e0d1e687c4ca939706effaa3172ab7a6 Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Sun, 1 Nov 2009 02:10:47 +0000 Subject: [PATCH] (trunk libT) #2548: T's request queue can send out too many duplicate requests --- libtransmission/peer-mgr.c | 369 +++++++++++++++--------------------- libtransmission/peer-mgr.h | 2 + libtransmission/peer-msgs.c | 5 + libtransmission/torrent.c | 1 + 4 files changed, 165 insertions(+), 212 deletions(-) diff --git a/libtransmission/peer-mgr.c b/libtransmission/peer-mgr.c index 853ff47ab..a507b0f35 100644 --- a/libtransmission/peer-mgr.c +++ b/libtransmission/peer-mgr.c @@ -85,9 +85,16 @@ enum MYFLAG_UNREACHABLE = 2, /* the minimum we'll wait before attempting to reconnect to a peer */ - MINIMUM_RECONNECT_INTERVAL_SECS = 5 -}; + MINIMUM_RECONNECT_INTERVAL_SECS = 5, + + /* this is how many blocks we'll try to queue up + * for the iterator to walk through */ + ITERATOR_BLOCK_BUFFER_SIZE = 2048, + /* if the number of blocks in the iterator queue drops + * below this number, fill it up with more */ + ITERATOR_LOW_MARK = 256 +}; /** *** @@ -136,12 +143,21 @@ tr_atomAddrStr( const struct peer_atom * atom ) return tr_peerIoAddrStr( &atom->addr, atom->port ); } +struct tr_blockIteratorItem +{ + tr_block_index_t block; + uint8_t requestCount; +}; + struct tr_blockIterator { - time_t expirationDate; + tr_bool didLoop; + int pos; + int size; + int begin; + struct tr_blockIteratorItem items[ITERATOR_BLOCK_BUFFER_SIZE]; struct tr_torrent_peers * t; - tr_block_index_t blockIndex, blockCount, *blocks; - tr_piece_index_t pieceIndex, pieceCount, *pieces; + tr_priority_t priority; }; typedef struct tr_torrent_peers @@ -157,7 +173,6 @@ typedef struct tr_torrent_peers tr_peer * optimistic; /* the optimistic peer, or NULL if none */ struct tr_blockIterator * refillQueue; /* used in refillPulse() */ struct tr_peerMgr * manager; - int * pendingRequestCount; tr_bool isRunning; } @@ -170,7 +185,6 @@ struct tr_peerMgr tr_timer * bandwidthTimer; tr_timer * rechokeTimer; tr_timer * reconnectTimer; - tr_timer * refillUpkeepTimer; }; #define tordbg( t, ... ) \ @@ -416,7 +430,6 @@ torrentDestructor( void * vt ) tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL ); tr_ptrArrayDestruct( &t->peers, NULL ); - tr_free( t->pendingRequestCount ); tr_free( t ); } @@ -458,7 +471,6 @@ torrentConstructor( tr_peerMgr * manager, static int bandwidthPulse ( void * vmgr ); static int rechokePulse ( void * vmgr ); static int reconnectPulse ( void * vmgr ); -static int refillUpkeep ( void * vmgr ); tr_peerMgr* tr_peerMgrNew( tr_session * session ) @@ -480,9 +492,6 @@ deleteTimers( struct tr_peerMgr * m ) if( m->reconnectTimer ) tr_timerFree( &m->reconnectTimer ); - - if( m->refillUpkeepTimer ) - tr_timerFree( &m->refillUpkeepTimer ); } void @@ -539,213 +548,178 @@ tr_peerMgrPeerIsSeed( const tr_torrent * tor, ***** ****/ -static void -assertValidPiece( Torrent * t, tr_piece_index_t piece ) -{ - assert( t ); - assert( t->tor ); - assert( piece < t->tor->info.pieceCount ); -} - -static int -getPieceRequests( Torrent * t, tr_piece_index_t piece ) -{ - assertValidPiece( t, piece ); - - return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0; -} - -static void -incrementPieceRequests( Torrent * t, tr_piece_index_t piece ) -{ - assertValidPiece( t, piece ); - - if( t->pendingRequestCount == NULL ) - t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount ); - t->pendingRequestCount[piece]++; -} - -static void -decrementPieceRequests( Torrent * t, tr_piece_index_t piece ) +static struct tr_blockIterator* +blockIteratorNew( Torrent * t ) { - assertValidPiece( t, piece ); - - if( t->pendingRequestCount ) - t->pendingRequestCount[piece]--; + struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 ); + i->pos = 0; + i->size = 0; + i->priority = TR_PRI_HIGH; + i->t = t; + return i; } -struct tr_refill_piece -{ - tr_priority_t priority; - uint32_t piece; - uint32_t peerCount; - int random; - int pendingRequestCount; - int missingBlockCount; -}; - static int -compareRefillPiece( const void * aIn, const void * bIn ) +compareIteratorItems( const void * va, const void * vb ) { - const struct tr_refill_piece * a = aIn; - const struct tr_refill_piece * b = bIn; - - /* if one piece has a higher priority, it goes first */ - if( a->priority != b->priority ) - return a->priority > b->priority ? -1 : 1; - - /* have a per-priority endgame */ - if( a->pendingRequestCount != b->pendingRequestCount ) - return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1; - - /* fewer missing pieces goes first */ - if( a->missingBlockCount != b->missingBlockCount ) - return a->missingBlockCount < b->missingBlockCount ? -1 : 1; - - /* otherwise if one has fewer peers, it goes first */ - if( a->peerCount != b->peerCount ) - return a->peerCount < b->peerCount ? -1 : 1; - - /* otherwise go with our random seed */ - if( a->random != b->random ) - return a->random < b->random ? -1 : 1; - + const struct tr_blockIteratorItem * a = va; + const struct tr_blockIteratorItem * b = vb; + if( a->block < b->block ) return -1; + if( a->block > b->block ) return 1; return 0; } -static tr_piece_index_t * -getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount ) +static void +blockIteratorRefill( struct tr_blockIterator * it ) { - const tr_torrent * tor = t->tor; - const tr_info * inf = &tor->info; - tr_piece_index_t i; - tr_piece_index_t poolSize = 0; - tr_piece_index_t * pool = tr_new( tr_piece_index_t , inf->pieceCount ); - int peerCount; - const tr_peer ** peers; - - assert( torrentIsLocked( t ) ); + int pieceCount; + tr_piece_index_t * pieces; + const tr_torrent * tor = it->t->tor; + const tr_info * inf = tr_torrentInfo( tor ); - peers = (const tr_peer**) tr_ptrArrayBase( &t->peers ); - peerCount = tr_ptrArraySize( &t->peers ); - - /* make a list of the pieces that we want but don't have */ - for( i = 0; i < inf->pieceCount; ++i ) - if( !tor->info.pieces[i].dnd - && !tr_cpPieceIsComplete( &tor->completion, i ) ) - pool[poolSize++] = i; - - /* sort the pool by which to request next */ - if( poolSize > 1 ) + /* build a pool of the pieces we might request blocks from */ + pieces = tr_new( tr_piece_index_t, inf->pieceCount ); + pieceCount = 0; { - tr_piece_index_t j; - struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize ); + tr_piece_index_t i; + for( i=0; ipieceCount; ++i ) + if( !inf->pieces[i].dnd ) + if( inf->pieces[i].priority == it->priority ) + if( !tr_cpPieceIsComplete( &tor->completion, i ) ) + pieces[pieceCount++] = i; + } - for( j = 0; j < poolSize; ++j ) - { - int k; - const tr_piece_index_t piece = pool[j]; - struct tr_refill_piece * setme = p + j; - - setme->piece = piece; - setme->priority = inf->pieces[piece].priority; - setme->peerCount = 0; - setme->random = tr_cryptoWeakRandInt( INT_MAX ); - setme->pendingRequestCount = getPieceRequests( t, piece ); - setme->missingBlockCount - = tr_cpMissingBlocksInPiece( &tor->completion, piece ); - - for( k = 0; k < peerCount; ++k ) - { - const tr_peer * peer = peers[k]; - if( peer->peerIsInterested - && !peer->clientIsChoked - && tr_bitfieldHas( peer->have, piece ) ) - ++setme->peerCount; + /* while we're short on blocks and there are still pieces left... */ + while( ( it->size < ITERATOR_BLOCK_BUFFER_SIZE ) && ( pieceCount > 0 ) ) + { + tr_block_index_t i; + tr_block_index_t b; + tr_block_index_t e; + + /* pull a random piece out of the pool */ + const int poolIndex = tr_cryptoRandInt( pieceCount ); + const tr_piece_index_t piece = pieces[poolIndex]; + pieces[poolIndex] = pieces[--pieceCount]; + + /* add the piece's blocks that we don't have to our iterator */ + b = tr_torPieceFirstBlock( tor, piece ); + e = b + tr_torPieceCountBlocks( tor, piece ); + for( i=b; (i!=e) && (it->size < ITERATOR_BLOCK_BUFFER_SIZE); ++i ) { + if( !tr_cpBlockIsCompleteFast( &tor->completion, i ) ) { + int pos; + struct tr_blockIteratorItem tmp; + tr_bool match; + tmp.block = i; + tmp.requestCount = 0; + pos = tr_lowerBound( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems, &match ); + if( match ) + continue; + memmove( it->items+pos+1, it->items+pos, sizeof(struct tr_blockIteratorItem)*(it->size-pos) ); + it->items[pos] = tmp; + ++it->size; } } + } - qsort( p, poolSize, sizeof( struct tr_refill_piece ), - compareRefillPiece ); + if( it->pos >= it->size ) + it->pos = 0; - for( j = 0; j < poolSize; ++j ) - pool[j] = p[j].piece; + /* cleanup */ + tr_free( pieces ); +} - tr_free( p ); +static void +blockIteratorRewind( struct tr_blockIterator * it ) +{ + /* if we don't have any blocks in the iterator, + * try to regenerate a list of blocks in the current priority. + * if that fails, go to the next lower priority and retry. */ + for( ;; ) { + if( it->size <= ITERATOR_LOW_MARK ) + blockIteratorRefill( it ); + if( it->size > 0 ) + break; + if( it->priority == TR_PRI_LOW ) + return; + --it->priority; /* try a lower priority */ } - *pieceCount = poolSize; - return pool; -} - -static struct tr_blockIterator* -blockIteratorNew( Torrent * t ) -{ - struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 ); - i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS; - i->t = t; - i->pieces = getPreferredPieces( t, &i->pieceCount ); - i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece ); - tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount ); - return i; + it->begin = it->pos; + it->didLoop = FALSE; } static tr_bool -blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme ) +blockIteratorNext( struct tr_blockIterator * it, tr_block_index_t * setme ) { - tr_bool found; - Torrent * t = i->t; - tr_torrent * tor = t->tor; + static const int single_req_threshold = 100; + static const int double_req_threshold = 50; - while( ( i->blockIndex == i->blockCount ) - && ( i->pieceIndex < i->pieceCount ) ) + while( !it->didLoop ) { - const tr_piece_index_t index = i->pieces[i->pieceIndex++]; - const tr_block_index_t b = tr_torPieceFirstBlock( tor, index ); - const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index ); - tr_block_index_t block; + ++it->pos; + it->pos %= it->size; + it->didLoop |= it->pos == it->begin; - assert( index < tor->info.pieceCount ); + if( ( it->items[it->pos].requestCount >= 1 ) && ( it->size >= single_req_threshold ) ) + continue; + if( ( it->items[it->pos].requestCount >= 2 ) && ( it->size >= double_req_threshold ) ) + continue; - i->blockCount = 0; - i->blockIndex = 0; - for( block=b; block!=e; ++block ) - if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) ) - i->blocks[i->blockCount++] = block; + *setme = it->items[it->pos].block; + return TRUE; } - assert( i->blockCount <= tor->blockCountInPiece ); - - if(( found = ( i->blockIndex < i->blockCount ))) - *setme = i->blocks[i->blockIndex++]; - - return found; + return FALSE; } - static void -blockIteratorSkipCurrentPiece( struct tr_blockIterator * i ) +blockIteratorInvalidate( struct tr_blockIterator * it ) { - i->blockIndex = i->blockCount; + it->size = 0; + it->priority = TR_PRI_HIGH; } -static void -blockIteratorFree( struct tr_blockIterator ** inout ) +void +tr_peerMgrFilePrioritiesChanged( tr_torrent * tor ) { - struct tr_blockIterator * it = *inout; + if( ( tor != NULL ) && ( tor->torrentPeers != NULL ) && ( tor->torrentPeers->refillQueue != NULL ) ) + blockIteratorInvalidate( tor->torrentPeers->refillQueue ); +} +static void +blockIteratorRemoveBlock( struct tr_blockIterator * it, tr_block_index_t block ) +{ if( it != NULL ) { - tr_free( it->blocks ); - tr_free( it->pieces ); - tr_free( it ); + struct tr_blockIteratorItem tmp, *pos; + tmp.block = block; + pos = bsearch( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems ); + if( pos != NULL ) + { + const int i = pos - it->items; + + assert( pos->block == block ); + assert( i >= 0 ); + assert( i < it->size ); + + --it->size; + + memmove( it->items+i, it->items+i+1, sizeof(struct tr_blockIteratorItem) * (it->size-i) ); + + if( it->pos > i ) + --it->pos; + } } +} +static void +blockIteratorFree( struct tr_blockIterator ** inout ) +{ + tr_free( *inout ); *inout = NULL; } static tr_peer** -getPeersUploadingToClient( Torrent * t, - int * setmeCount ) +getPeersUploadingToClient( Torrent * t, int * setmeCount ) { int j; int peerCount = 0; @@ -782,27 +756,6 @@ getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b ) return (uint32_t)( blockPos - piecePos ); } -static int -refillUpkeep( void * vmgr ) -{ - tr_torrent * tor = NULL; - tr_peerMgr * mgr = vmgr; - time_t now; - managerLock( mgr ); - - now = time( NULL ); - while(( tor = tr_torrentNext( mgr->session, tor ))) { - Torrent * t = tor->torrentPeers; - if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) { - tordbg( t, "refill queue is past its shelf date; discarding." ); - blockIteratorFree( &t->refillQueue ); - } - } - - managerUnlock( mgr ); - return TRUE; -} - static void sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now ); @@ -820,8 +773,10 @@ refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent ) if( !t->isRunning ) return; - if( tr_torrentIsSeed( t->tor ) ) + if( tr_torrentIsSeed( t->tor ) ) { + blockIteratorFree( &t->refillQueue ); return; + } torrentLock( t ); tordbg( t, "Refilling Request Buffers..." ); @@ -835,6 +790,8 @@ refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent ) webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ), webseedCount * sizeof( tr_webseed* ) ); + blockIteratorRewind( t->refillQueue ); + while( ( webseedCount || peerCount ) && (( hasNext = blockIteratorNext( t->refillQueue, &block ))) ) { @@ -864,7 +821,7 @@ refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent ) break; case TR_ADDREQ_OK: - incrementPieceRequests( t, index ); + ++t->refillQueue->items[t->refillQueue->pos].requestCount; handled = TRUE; break; @@ -885,7 +842,6 @@ refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent ) break; case TR_ADDREQ_OK: - incrementPieceRequests( t, index ); handled = TRUE; break; @@ -894,20 +850,12 @@ refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent ) break; } } - - if( !handled ) - blockIteratorSkipCurrentPiece( t->refillQueue ); } /* cleanup */ tr_free( webseeds ); tr_free( peers ); - if( !hasNext ) { - tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount ); - blockIteratorFree( &t->refillQueue ); - } - torrentUnlock( t ); } @@ -1039,7 +987,7 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt ) break; case TR_PEER_CANCEL: - decrementPieceRequests( t, e->pieceIndex ); + blockIteratorRemoveBlock( t->refillQueue, _tr_block( t->tor, e->pieceIndex, e->offset ) ); break; case TR_PEER_PEER_GOT_DATA: @@ -1133,7 +1081,7 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt ) tr_cpBlockAdd( &tor->completion, block ); tr_torrentSetDirty( tor ); - decrementPieceRequests( t, e->pieceIndex ); + blockIteratorRemoveBlock( t->refillQueue, block ); broadcastGotBlock( t, e->pieceIndex, e->offset, e->length ); @@ -1620,9 +1568,6 @@ ensureMgrTimersExist( struct tr_peerMgr * m ) if( m->reconnectTimer == NULL ) m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC ); - - if( m->refillUpkeepTimer == NULL ) - m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC ); } void diff --git a/libtransmission/peer-mgr.h b/libtransmission/peer-mgr.h index 7e9c35463..4dc8c5899 100644 --- a/libtransmission/peer-mgr.h +++ b/libtransmission/peer-mgr.h @@ -118,6 +118,8 @@ void tr_peerMgrFree( tr_peerMgr * manager ); tr_bool tr_peerMgrPeerIsSeed( const tr_torrent * tor, const tr_address * addr ); +void tr_peerMgrFilePrioritiesChanged( tr_torrent * tor ); + void tr_peerMgrAddIncoming( tr_peerMgr * manager, tr_address * addr, tr_port port, diff --git a/libtransmission/peer-msgs.c b/libtransmission/peer-msgs.c index 7a1c5f43e..8f2518615 100644 --- a/libtransmission/peer-msgs.c +++ b/libtransmission/peer-msgs.c @@ -458,10 +458,13 @@ firePeerProgress( tr_peermsgs * msgs ) publish( msgs, &e ); } +static double blocksGotten = 0.0; + static void fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req ) { tr_peer_event e = blankEvent; +++blocksGotten; e.eventType = TR_PEER_CLIENT_GOT_BLOCK; e.pieceIndex = req->index; e.offset = req->offset; @@ -1558,6 +1561,8 @@ decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount ) static TR_INLINE void clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req ) { +static double unwantedGotten = 0.0; +fprintf( stderr, "dupe ratio: %f\n", ++unwantedGotten / blocksGotten ); decrementDownloadedCount( msgs, req->length ); } diff --git a/libtransmission/torrent.c b/libtransmission/torrent.c index 7d204d25c..08bfc78ea 100644 --- a/libtransmission/torrent.c +++ b/libtransmission/torrent.c @@ -1685,6 +1685,7 @@ tr_torrentSetFilePriorities( tr_torrent * tor, for( i = 0; i < fileCount; ++i ) tr_torrentInitFilePriority( tor, files[i], priority ); + tr_peerMgrFilePrioritiesChanged( tor ); tr_torrentSetDirty( tor ); tr_torrentUnlock( tor ); } -- 2.40.0