From 904a91ce76c9103169b0884f89bfd9ee21373f6d Mon Sep 17 00:00:00 2001 From: Jesper Louis Andersen Date: Sun, 3 Aug 2008 21:19:15 +0200 Subject: [PATCH] Cleanup and handle more fast extension specialities in etorrent_t_peer_send. --- lib/etorrent-1.0/src/etorrent_t_peer_send.erl | 63 ++++++++++++++++----------- lib/etorrent-1.0/src/etorrent_utils.erl | 17 +++----- 2 files changed, 43 insertions(+), 37 deletions(-) diff --git a/lib/etorrent-1.0/src/etorrent_t_peer_send.erl b/lib/etorrent-1.0/src/etorrent_t_peer_send.erl index 6ae649b..d5ae90f 100644 --- a/lib/etorrent-1.0/src/etorrent_t_peer_send.erl +++ b/lib/etorrent-1.0/src/etorrent_t_peer_send.erl @@ -30,7 +30,7 @@ handle_call/3, handle_cast/2]). -record(state, {socket = none, - request_queue = none, + requests = none, fast_extension = false, @@ -74,7 +74,7 @@ local_request(Pid, {Index, Offset, Size}) -> %% Description: Cancel the {Index, Offset, Len} at the peer. %%-------------------------------------------------------------------- cancel(Pid, Index, Offset, Len) -> - gen_server:cast(Pid, {cancel_piece, Index, Offset, Len}). + gen_server:cast(Pid, {cancel, Index, Offset, Len}). %%-------------------------------------------------------------------- %% Func: choke(Pid) @@ -125,14 +125,14 @@ stop(Pid) -> %%==================================================================== init([Socket, FilesystemPid, TorrentId, FastExtension, Parent]) -> process_flag(trap_exit, true), - {ok, TRef} = timer:send_interval(?DEFAULT_KEEP_ALIVE_INTERVAL, self(), keep_alive_tick), + {ok, TRef} = timer:send_interval(?DEFAULT_KEEP_ALIVE_INTERVAL, self(), tick), {ok, Tref2} = timer:send_interval(?RATE_UPDATE, self(), rate_update), {ok, #state{socket = Socket, timer = TRef, rate_timer = Tref2, - request_queue = queue:new(), - rate = etorrent_rate:init(?RATE_FUDGE), + requests = queue:new(), + rate = etorrent_rate:init(), parent = Parent, torrent_id = TorrentId, fast_extension = FastExtension, @@ -159,7 +159,7 @@ handle_call(_Request, _From, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -handle_info(keep_alive_tick, S) -> +handle_info(tick, S) -> send_message(keep_alive, S, 0); handle_info(rate_update, S) -> Rate = etorrent_rate:update(S#state.rate, 0), @@ -175,11 +175,11 @@ handle_info(timeout, S) handle_info(timeout, S) when S#state.choke =:= true -> {noreply, S}; handle_info(timeout, S) when S#state.choke =:= false -> - case queue:out(S#state.request_queue) of + case queue:out(S#state.requests) of {empty, _} -> {noreply, S}; {{value, {Index, Offset, Len}}, NewQ} -> - send_piece(Index, Offset, Len, S#state { request_queue = NewQ } ) + send_piece(Index, Offset, Len, S#state { requests = NewQ } ) end; handle_info(Msg, S) -> error_logger:info_report([got_unknown_message, Msg, S]), @@ -191,8 +191,7 @@ handle_cast(unchoke, S) when S#state.choke == false -> {noreply, S, 0}; handle_cast(unchoke, S) when S#state.choke == true -> ok = etorrent_rate_mgr:local_unchoke(S#state.torrent_id, S#state.parent), - send_message(unchoke, S#state{choke = false, - request_queue = queue:new()}); + send_message(unchoke, S#state{choke = false}); handle_cast(check_choke, S) when S#state.choke =:= true -> {noreply, S, 0}; handle_cast(check_choke, S) when S#state.choke =:= false -> @@ -212,22 +211,34 @@ handle_cast({have, Pn}, S) -> send_message({have, Pn}, S); handle_cast({local_request, {Index, Offset, Size}}, S) -> send_message({request, Index, Offset, Size}, S); +handle_cast({remote_request, Idx, Offset, Len}, S) + when S#state.fast_extension =:= true, S#state.choke == true -> + send_message({reject_request, Idx, Offset, Len}, S, 0); handle_cast({remote_request, _Index, _Offset, _Len}, S) when S#state.choke == true -> {noreply, S, 0}; handle_cast({remote_request, Index, Offset, Len}, S) when S#state.choke == false -> - Requests = queue:len(S#state.request_queue), - case Requests > ?MAX_REQUESTS of + case queue:len(S#state.requests) > ?MAX_REQUESTS of + true when S#state.fast_extension =:= true -> + send_message({reject_request, Index, Offset, Len}, S, 0); true -> {stop, max_queue_len_exceeded, S}; false -> - NQ = queue:in({Index, Offset, Len}, S#state.request_queue), - {noreply, S#state{request_queue = NQ}, 0} + NQ = queue:in({Index, Offset, Len}, S#state.requests), + {noreply, S#state{requests = NQ}, 0} end; -handle_cast({cancel_piece, Index, OffSet, Len}, S) -> - NQ = etorrent_utils:queue_remove({Index, OffSet, Len}, S#state.request_queue), - {noreply, S#state{request_queue = NQ}, 0}; +handle_cast({cancel, Idx, Offset, Len}, S) when S#state.fast_extension =:= true -> + try + NQ = etorrent_utils:queue_remove_check({Idx, Offset, Len}, + S#state.requests), + {noreply, S#state { requests = NQ}, 0} + catch + exit:badmatch -> {stop, normal, S} + end; +handle_cast({cancel, Index, OffSet, Len}, S) -> + NQ = etorrent_utils:queue_remove({Index, OffSet, Len}, S#state.requests), + {noreply, S#state{requests = NQ}, 0}; handle_cast(stop, S) -> {stop, normal, S}. @@ -312,24 +323,24 @@ perform_choke(S = #state { fast_extension = FX, choke = C}) -> {false, true} -> {noreply, S, 0}; {false, false} -> ok = local_choke(S), - send_message(choke, S#state{choke = true, piece_cache = none}); + send_message(choke, S#state{choke = true, requests = queue:new(), + piece_cache = none}); {true, true} -> {noreply, S, 0}; {true, false} -> local_choke(S), {ok, NS} = send(choke, S), - FS = empty_piece_cache(NS), + FS = empty_requests(NS), {noreply, FS, 0} end. +empty_requests(S) -> + empty_requests(queue:out(S#state.requests), S). -empty_piece_cache(S) -> - empty_piece_cache(S#state.piece_cache, S). - -empty_piece_cache([], S) -> - S#state { piece_cache = [] }; -empty_piece_cache([{Index, Offset, Len} | Next], S) -> +empty_requests({empty, Q}, S) -> + S#state { requests = Q , piece_cache = none}; +empty_requests({{value, {Index, Offset, Len}}, Next}, S) -> {ok, NS} = send({reject_request, Index, Offset, Len}, S), - empty_piece_cache(Next, NS). + empty_requests(queue:out(Next), NS). local_choke(S) -> etorrent_rate_mgr:local_choke(S#state.torrent_id, diff --git a/lib/etorrent-1.0/src/etorrent_utils.erl b/lib/etorrent-1.0/src/etorrent_utils.erl index fa5ce2c..b60e142 100644 --- a/lib/etorrent-1.0/src/etorrent_utils.erl +++ b/lib/etorrent-1.0/src/etorrent_utils.erl @@ -9,9 +9,8 @@ -module(etorrent_utils). %% API --export([queue_remove/2, queue_remove_with_check/2, - build_encoded_form_rfc1738/1, - shuffle/1, gsplit/2]). +-export([queue_remove/2, queue_remove_check/2, + build_encoded_form_rfc1738/1, shuffle/1, gsplit/2]). %%==================================================================== %% API @@ -43,15 +42,11 @@ gsplit(N, [H|T], Rest) -> %% return false. %% Note: Inefficient implementation. Converts to/from lists. %%-------------------------------------------------------------------- -queue_remove_with_check(Item, Q) -> +queue_remove_check(Item, Q) -> QList = queue:to_list(Q), - case lists:member(Item, QList) of - true -> - List = lists:delete(Item, QList), - {ok, queue:from_list(List)}; - false -> - false - end. + true = lists:member(Item, QList), + List = lists:delete(Item, QList), + queue:from_list(List). %%-------------------------------------------------------------------- %% Function: queue_remove(Item, queue()) -> queue() -- 2.11.4.GIT