From ae9ba9ba893118647bbd2a57197c6899ae76cfec Mon Sep 17 00:00:00 2001 From: Jesper Louis Andersen Date: Fri, 18 Jul 2008 01:25:10 +0200 Subject: [PATCH] Link the ets tables into the choking/unchoking algorithm. --- lib/etorrent-1.0/include/etorrent_mnesia_table.hrl | 7 +- lib/etorrent-1.0/include/peer_state.hrl | 1 + lib/etorrent-1.0/include/rate_mgr.hrl | 2 + lib/etorrent-1.0/src/etorrent_peer.erl | 49 +--------- lib/etorrent-1.0/src/etorrent_rate_mgr.erl | 106 +++++++++++---------- lib/etorrent-1.0/src/etorrent_t_peer_group_mgr.erl | 88 ++++++++--------- lib/etorrent-1.0/src/etorrent_t_peer_recv.erl | 13 +-- lib/etorrent-1.0/src/etorrent_t_peer_send.erl | 12 ++- 8 files changed, 122 insertions(+), 156 deletions(-) create mode 100644 lib/etorrent-1.0/include/peer_state.hrl create mode 100644 lib/etorrent-1.0/include/rate_mgr.hrl diff --git a/lib/etorrent-1.0/include/etorrent_mnesia_table.hrl b/lib/etorrent-1.0/include/etorrent_mnesia_table.hrl index 769b902..b2cd5c2 100644 --- a/lib/etorrent-1.0/include/etorrent_mnesia_table.hrl +++ b/lib/etorrent-1.0/include/etorrent_mnesia_table.hrl @@ -34,12 +34,7 @@ ip, % Ip of peer in question port, % Port of peer in question torrent_id, % (IDX) Torrent Id this peer belongs to - upload_rate = 0.0, % Amount of uploaded bytes this round - download_rate = 0.0, % Amount of downloaded bytes this round - remote_i_state = not_interested, % Is this peer interested in us? - local_c_state = choked, % true if we are choking remote - remote_c_state = choked, % true if the remote is choking us. - optimistic_c_state = not_opt_unchoke}). % true if we have selected this peer for opt. unchoke + local_c_state = choked}). % true if we are choking remote %% Individual pieces are represented via the piece record -record(piece, {idpn, % {Id, PieceNumber} pair identifying the piece diff --git a/lib/etorrent-1.0/include/peer_state.hrl b/lib/etorrent-1.0/include/peer_state.hrl new file mode 100644 index 0000000..ebfb269 --- /dev/null +++ b/lib/etorrent-1.0/include/peer_state.hrl @@ -0,0 +1 @@ +-record(peer_state, {pid, choke_state, interest_state }). diff --git a/lib/etorrent-1.0/include/rate_mgr.hrl b/lib/etorrent-1.0/include/rate_mgr.hrl new file mode 100644 index 0000000..728786f --- /dev/null +++ b/lib/etorrent-1.0/include/rate_mgr.hrl @@ -0,0 +1,2 @@ +-record(rate_mgr, {pid, % Pid of receiver + rate}). % Rate diff --git a/lib/etorrent-1.0/src/etorrent_peer.erl b/lib/etorrent-1.0/src/etorrent_peer.erl index 1c64017..0fd99ff 100644 --- a/lib/etorrent-1.0/src/etorrent_peer.erl +++ b/lib/etorrent-1.0/src/etorrent_peer.erl @@ -12,8 +12,7 @@ %% API -export([new/4, all/1, delete/1, statechange/2, connected/3, - ip_port/1, select_fastest/2, interested/1, local_unchoked/1, - select/1]). + ip_port/1, local_unchoked/1, select/1]). %%==================================================================== %% API @@ -110,20 +109,6 @@ select(Pid) when is_pid(Pid) -> mnesia:dirty_read(peer, Pid). %%-------------------------------------------------------------------- -%% Function: select_fastest(Id, Key) -> [#peer] -%% Interest ::= interested | not_interested -%% Key ::= integer() -%% Description: Select the fastest peers matching the query -%%-------------------------------------------------------------------- -select_fastest(TorrentId, Key) -> - mnesia:transaction( - fun () -> - QH = qlc:q([P || P <- mnesia:table(peer), - P#peer.torrent_id =:= TorrentId]), - qlc:e(qlc:keysort(Key, QH, {order, descending})) - end). - -%%-------------------------------------------------------------------- %% Function: local_unchoked(P) -> bool() | none %% P ::= pid() %% Description: Predicate: P is unchoked locally. If the peer can't be @@ -138,20 +123,6 @@ local_unchoked(P) -> end end. -%%-------------------------------------------------------------------- -%% Function: interested(P) -> bool() -%% P ::= none | pid() -%% Description: Query the remote interest state on P -%%-------------------------------------------------------------------- -interested(none) -> false; -interested(P) when is_pid(P) -> - case mnesia:dirty_read(peer, P) of - [] -> - false; - [_] -> - true - end. - %%==================================================================== %% Internal functions %%==================================================================== @@ -163,24 +134,8 @@ interested(P) when is_pid(P) -> %%-------------------------------------------------------------------- alter_state(Peer, What) -> case What of - optimistic_unchoke -> - Peer#peer{ optimistic_c_state = opt_unchoke }; - remove_optimistic_unchoke -> - Peer#peer{ optimistic_c_state = not_opt_unchoke }; - remote_choking -> - Peer#peer{ remote_c_state = choked}; - remote_unchoking -> - Peer#peer{ remote_c_state = unchoked}; local_choking -> Peer#peer { local_c_state = choked }; local_unchoking -> - Peer#peer { local_c_state = unchoked }; - interested -> - Peer#peer{ remote_i_state = interested}; - not_interested -> - Peer#peer{ remote_i_state = not_interested}; - {download_rate, Rate} -> - Peer#peer { download_rate = Rate }; - {upload_rate, Rate} -> - Peer#peer { upload_rate = Rate } + Peer#peer { local_c_state = unchoked } end. diff --git a/lib/etorrent-1.0/src/etorrent_rate_mgr.erl b/lib/etorrent-1.0/src/etorrent_rate_mgr.erl index fbdcd46..ea3d699 100644 --- a/lib/etorrent-1.0/src/etorrent_rate_mgr.erl +++ b/lib/etorrent-1.0/src/etorrent_rate_mgr.erl @@ -7,25 +7,25 @@ %%%------------------------------------------------------------------- -module(etorrent_rate_mgr). +-include("peer_state.hrl"). +-include("rate_mgr.hrl"). + -behaviour(gen_server). + %% API -export([start_link/0, - choke/1, unchoke/1, interested/1, not_interested/1, - recv_rate/2, send_rate/2]). + choke/2, unchoke/2, interested/2, not_interested/2, + recv_rate/3, send_rate/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { recv, - send}). - --record(rate, {pid, % Pid of receiver - rate, % Rate - choke_state, - interest_state }). + send, + state}). -define(SERVER, ?MODULE). @@ -40,16 +40,16 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% Send state information -choke(Pid) -> gen_server:cast(?SERVER, {choke, Pid}). -unchoke(Pid) -> gen_server:cast(?SERVER, {unchoke, Pid}). -interested(Pid) -> gen_server:cast(?SERVER, {interested, Pid}). -not_interested(Pid) -> gen_server:cast(?SERVER, {not_interested, Pid}). +choke(Id, Pid) -> gen_server:cast(?SERVER, {choke, Id, Pid}). +unchoke(Id, Pid) -> gen_server:cast(?SERVER, {unchoke, Id, Pid}). +interested(Id, Pid) -> gen_server:cast(?SERVER, {interested, Id, Pid}). +not_interested(Id, Pid) -> gen_server:cast(?SERVER, {not_interested, Id, Pid}). -recv_rate(Pid, Rate) -> - gen_server:cast(?SERVER, {recv_rate, Pid, Rate}). +recv_rate(Id, Pid, Rate) -> + gen_server:cast(?SERVER, {recv_rate, Id, Pid, Rate}). -send_rate(Pid, Rate) -> - gen_server:cast(?SERVER, {send_rate, Pid, Rate}). +send_rate(Id, Pid, Rate) -> + gen_server:cast(?SERVER, {send_rate, Id, Pid, Rate}). %%==================================================================== %% gen_server callbacks @@ -64,9 +64,13 @@ send_rate(Pid, Rate) -> %%-------------------------------------------------------------------- init([]) -> process_flag(trap_exit, true), - RTid = ets:new(etorrent_recv_state, [set, protected, named_table]), - STid = ets:new(etorrent_send_state, [set, protected, named_table]), - {ok, #state{ recv = RTid, send = STid}}. + RTid = ets:new(etorrent_recv_state, [set, protected, named_table, + {keypos, #rate_mgr.pid}]), + STid = ets:new(etorrent_send_state, [set, protected, named_table, + {keypos, #rate_mgr.pid}]), + StTid = ets:new(etorrent_peer_state, [set, protected, named_table, + {keypos, #peer_state.pid}]), + {ok, #state{ recv = RTid, send = STid, state = StTid}}. %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | @@ -87,11 +91,11 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% Description: Handling cast messages %%-------------------------------------------------------------------- -handle_cast({What, Pid}, S) -> - alter_state(What, Pid), +handle_cast({What, Id, Pid}, S) -> + ok = alter_state(What, Id, Pid), {noreply, S}; -handle_cast({What, Who, Rate}, S) -> - alter_state(What, Who, Rate), +handle_cast({What, Id, Who, Rate}, S) -> + ok = alter_state(What, Id, Who, Rate), {noreply, S}; handle_cast(_Msg, State) -> {noreply, State}. @@ -103,8 +107,9 @@ handle_cast(_Msg, State) -> %% Description: Handling all non call/cast messages %%-------------------------------------------------------------------- handle_info({'DOWN', _Ref, process, Pid, _Reason}, S) -> - true = ets:delete(etorrent_recv_state, Pid), - true = ets:delete(etorrent_send_state, Pid), + true = ets:match_delete(etorrent_recv_state, #rate_mgr { pid = {'_', Pid}, _='_'}), + true = ets:match_delete(etorrent_send_state, #rate_mgr { pid = {'_', Pid}, _='_'}), + true = ets:match_delete(etorrent_peer_state, #peer_state { pid = {'_', Pid}, _='_'}), {noreply, S}; handle_info(_Info, State) -> {noreply, State}. @@ -119,6 +124,7 @@ handle_info(_Info, State) -> terminate(_Reason, S) -> true = ets:delete(S#state.recv), true = ets:delete(S#state.send), + true = ets:delete(S#state.state), ok. @@ -133,46 +139,46 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%-------------------------------------------------------------------- -alter_state(What, Pid) -> - case ets:lookup(etorrent_recv_state, Pid) of +alter_state(What, Id, Pid) -> + _R = case ets:lookup(etorrent_peer_state, {Id, Pid}) of [] -> - ets:insert( + ets:insert(etorrent_peer_state, alter_record(What, - #rate { pid = Pid, - rate = 0.0, - choke_state = choked, - interest_state = not_intersted})), - erlang:monitor(Pid); + #peer_state { pid = {Id, Pid}, + choke_state = choked, + interest_state = not_intersted})), + erlang:monitor(process, Pid); [R] -> - ets:insert(alter_record(What, R)) - end. + ets:insert(etorrent_peer_state, + alter_record(What, R)) + end, + ok. alter_record(What, R) -> case What of choked -> - R#rate { choke_state = choked }; - unchoked -> - R#rate { choke_state = unchoked }; + R#peer_state { choke_state = choked }; + unchok -> + R#peer_state { choke_state = unchoked }; interested -> - R#rate { interest_state = interested }; + R#peer_state { interest_state = interested }; not_interested -> - R#rate { interest_state = not_intersted } + R#peer_state { interest_state = not_intersted } end. -alter_state(What, Who, Rate) -> +alter_state(What, Id, Who, Rate) -> T = case What of recv_rate -> etorrent_recv_state; send_rate -> etorrent_send_state end, - case ets:lookup(T, Who) of + _R = case ets:lookup(T, {Id, Who}) of [] -> - ets:insert( - #rate { pid = Who, - rate = Rate, - choke_state = choked, - interest_state = not_intersted}), - erlang:monitor(Who); + ets:insert(T, + #rate_mgr { pid = {Id, Who}, + rate = Rate }), + erlang:monitor(process, Who); [R] -> - ets:insert(R#rate { rate = Rate }) - end. + ets:insert(T, R#rate_mgr { rate = Rate }) + end, + ok. diff --git a/lib/etorrent-1.0/src/etorrent_t_peer_group_mgr.erl b/lib/etorrent-1.0/src/etorrent_t_peer_group_mgr.erl index b3d28f9..ce25495 100644 --- a/lib/etorrent-1.0/src/etorrent_t_peer_group_mgr.erl +++ b/lib/etorrent-1.0/src/etorrent_t_peer_group_mgr.erl @@ -12,6 +12,9 @@ -behaviour(gen_server). +-include("rate_mgr.hrl"). +-include("peer_state.hrl"). + -include("etorrent_mnesia_table.hrl"). %% API @@ -131,18 +134,9 @@ handle_info({'DOWN', _Ref, process, Pid, Reason}, S) when (Reason =:= normal) or (Reason =:= shutdown) -> % The peer shut down normally. Hence we just remove him and start up % other peers. Eventually the tracker will re-add him to the peer list - case etorrent_peer:select(Pid) of - [Peer] -> - case {Peer#peer.remote_i_state, Peer#peer.local_c_state} of - {interested, choked} -> - rechoke(S); - _ -> - ok - end, - ok; - [] -> - ok - end, + + % XXX: We might have to do something else + rechoke(S), NewChain = lists:delete(Pid, S#state.opt_unchoke_chain), {ok, NS} = start_new_peers([], S#state { num_peers = S#state.num_peers -1, @@ -153,12 +147,9 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, S) -> NS = case etorrent_peer:select(Pid) of [Peer] -> {IP, Port} = {Peer#peer.ip, Peer#peer.port}, - case {Peer#peer.remote_i_state, Peer#peer.local_c_state} of - {interested, choked} -> - rechoke(S); - _ -> - ok - end, + + % XXX: We might have to check that remote is intersted and we were choking + rechoke(S), S#state { available_peers = S#state.available_peers ++ [{IP, Port}]}; [] -> S end, @@ -290,12 +281,12 @@ is_bad_peer(IP, Port, S) -> %% Description: Recalculate the choke/unchoke state of peers %%-------------------------------------------------------------------- rechoke(S) -> - Key = case etorrent_torrent:mode(S#state.torrent_id) of - seeding -> #peer.upload_rate; - leeching -> #peer.download_rate; - endgame -> #peer.download_rate + Table = case etorrent_torrent:mode(S#state.torrent_id) of + seeding -> etorrent_send_state; + leeching -> etorrent_recv_state; + endgame -> etorrent_recv_state end, - {atomic, Peers} = etorrent_peer:select_fastest(S#state.torrent_id, Key), + Peers = select_fastest(S#state.torrent_id, Table), rechoke(Peers, calculate_num_downloaders(S), S). rechoke(Peers, 0, S) -> @@ -303,31 +294,42 @@ rechoke(Peers, 0, S) -> ok; rechoke([], _N, _S) -> ok; -rechoke([Peer | Rest], N, S) when is_record(Peer, peer) -> - case Peer#peer.remote_i_state of - interested -> - etorrent_t_peer_recv:unchoke(Peer#peer.pid), - rechoke(Rest, N-1, S); - not_interested -> - etorrent_t_peer_recv:unchoke(Peer#peer.pid), - rechoke(Rest, N, S) +rechoke([Peer | Rest], N, S) when is_record(Peer, rate_mgr) -> + case ets:lookup(etorrent_peer_state, Peer#rate_mgr.pid) of + [] -> + rechoke(Rest, N, S); + [#peer_state { interest_state = I, pid = {_Id, Pid}}] -> + case I of + interested -> + etorrent_t_peer_recv:unchoke(Pid), + rechoke(Rest, N-1, S); + not_interested -> + etorrent_t_peer_recv:unchoke(Pid), + rechoke(Rest, N, S) + end end. -optimistic_unchoke_handler(P, S) -> - case P#peer.pid =:= S#state.optimistic_unchoke_pid of +optimistic_unchoke_handler(#rate_mgr { pid = {_Id, Pid} }, S) -> + case Pid =:= S#state.optimistic_unchoke_pid of true -> ok; % Handled elsewhere false -> - etorrent_t_peer_recv:choke(P#peer.pid) + etorrent_t_peer_recv:choke(Pid) end. %% TODO: Make number of downloaders depend on current rate! calculate_num_downloaders(S) -> - case etorrent_peer:interested(S#state.optimistic_unchoke_pid) of - true -> - ?DEFAULT_NUM_DOWNLOADERS - 1; - false -> - ?DEFAULT_NUM_DOWNLOADERS + case ets:lookup(etorrent_peer_state, {S#state.torrent_id, + S#state.optimistic_unchoke_pid}) of + [] -> + ?DEFAULT_NUM_DOWNLOADERS; + [P] -> + case P#peer_state.interest_state of + interested -> + ?DEFAULT_NUM_DOWNLOADERS - 1; + not_interested -> + ?DEFAULT_NUM_DOWNLOADERS + end end. advance_optimistic_unchoke(S) -> @@ -336,10 +338,6 @@ advance_optimistic_unchoke(S) -> [] -> {ok, S}; %% No peers yet [H | _T] -> - _R1 = etorrent_peer:statechange(S#state.optimistic_unchoke_pid, - remove_optimistic_unchoke), - %% Do not choke here, a later call to rechoke will do it. - _R2 = etorrent_peer:statechange(H, optimistic_unchoke), etorrent_t_peer_recv:unchoke(H), {ok, S#state { opt_unchoke_chain = NewChain, optimistic_unchoke_pid = H }} @@ -357,6 +355,10 @@ insert_new_peer_into_chain(Pid, Chain) -> {Front, Back} = lists:split(Index, Chain), Front ++ [Pid | Back]. +select_fastest(Id, Table) -> + Rows = ets:select(Table, [{{rate_mgr,{Id,'_'},'_'},[],['$_']}]), + lists:reverse(lists:keysort(#rate_mgr.rate, Rows)). + diff --git a/lib/etorrent-1.0/src/etorrent_t_peer_recv.erl b/lib/etorrent-1.0/src/etorrent_t_peer_recv.erl index 3c8af7c..1289b77 100644 --- a/lib/etorrent-1.0/src/etorrent_t_peer_recv.erl +++ b/lib/etorrent-1.0/src/etorrent_t_peer_recv.erl @@ -260,7 +260,7 @@ handle_info(timeout, S) -> end; handle_info(rate_update, S) -> Rate = etorrent_rate:update(S#state.rate, 0), - ok = etorrent_rate_manager:recv_rate(self, Rate#peer_rate.rate), + ok = etorrent_rate_mgr:recv_rate(S#state.torrent_id, self(), Rate#peer_rate.rate), {noreply, S#state { rate = Rate}, 0}; handle_info(_Info, State) -> {noreply, State, 0}. @@ -307,18 +307,18 @@ code_change(_OldVsn, State, _Extra) -> handle_message(keep_alive, S) -> {ok, S}; handle_message(choke, S) -> - ok = etorrent_rate_mgr:choke(self()), + ok = etorrent_rate_mgr:choke(S#state.torrent_id, self()), NS = unqueue_all_pieces(S), {ok, NS#state { remote_choked = true }}; handle_message(unchoke, S) -> - ok = etorrent_rate_mgr:unchoke(self()), + ok = etorrent_rate_mgr:unchoke(S#state.torrent_id, self()), try_to_queue_up_pieces(S#state{remote_choked = false}); handle_message(interested, S) -> - ok = etorrent_rate_mgr:interested(self()), + ok = etorrent_rate_mgr:interested(S#state.torrent_id, self()), etorrent_t_peer_group_mgr:perform_rechoke(S#state.peer_group_pid), {ok, S}; handle_message(not_interested, S) -> - ok = etorrent_rate_mgr:not_interested(self()), + ok = etorrent_rate_mgr:not_interested(S#state.torrent_id, self()), etorrent_t_peer_group_mgr:perform_rechoke(S#state.peer_group_pid), {ok, S}; handle_message({request, Index, Offset, Len}, S) -> @@ -577,7 +577,8 @@ handle_read_from_socket(S, Packet) Left = size(Data), P = iolist_to_binary(lists:reverse([Data | S#state.packet_iolist])), {Msg, Rate} = etorrent_peer_communication:recv_message(S#state.rate, P), - ok = etorrent_rate_mgr:recv_rate(self(), Rate#peer_rate.rate), + ok = etorrent_rate_mgr:recv_rate(S#state.torrent_id, + self(), Rate#peer_rate.rate), case handle_message(Msg, S#state {rate = Rate}) of {ok, NS} -> handle_read_from_socket(NS#state { packet_left = none, diff --git a/lib/etorrent-1.0/src/etorrent_t_peer_send.erl b/lib/etorrent-1.0/src/etorrent_t_peer_send.erl index e684c15..39fb1b7 100644 --- a/lib/etorrent-1.0/src/etorrent_t_peer_send.erl +++ b/lib/etorrent-1.0/src/etorrent_t_peer_send.erl @@ -153,7 +153,8 @@ handle_info(keep_alive_tick, S) -> send_message(keep_alive, S, 0); handle_info(rate_update, S) -> Rate = etorrent_rate:update(S#state.rate, 0), - ok = etorrent_rate_mgr:send_rate(S#state.parent, + ok = etorrent_rate_mgr:send_rate(S#state.torrent_id, + S#state.parent, Rate#peer_rate.rate), {noreply, S#state { rate = Rate }}; handle_info(timeout, S) @@ -234,7 +235,8 @@ terminate(_Reason, S) -> send_piece_message(Msg, S, Timeout) -> case etorrent_peer_communication:send_message(S#state.rate, S#state.socket, Msg) of {ok, R} -> - ok = etorrent_rate_mgr:send_rate(S#state.parent, + ok = etorrent_rate_mgr:send_rate(S#state.torrent_id, + S#state.parent, R#peer_rate.rate), {noreply, S#state { rate = R }, Timeout}; {{error, closed}, R} -> @@ -271,8 +273,10 @@ send_message(Msg, S) -> send_message(Msg, S, Timeout) -> case etorrent_peer_communication:send_message(S#state.rate, S#state.socket, Msg) of {ok, Rate} -> - ok = etorrent_rate_mgr:send_rate(S#state.parent, - Rate#peer_rate.rate), + ok = etorrent_rate_mgr:send_rate( + S#state.torrent_id, + S#state.parent, + Rate#peer_rate.rate), {noreply, S#state { rate = Rate}, Timeout}; {{error, ebadf}, R} -> error_logger:info_report([caught_ebadf, S#state.socket]), -- 2.11.4.GIT