Globalize the etorrent_t_peer_group_mgr and rename it to etorrent_choker while here.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_choker.erl
blob004e60a27ffc51d64cceb9609db35859ad10ac92
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_choker.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Master process for a number of peers.
6 %%%
7 %%% Created : 18 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_choker).
12 -behaviour(gen_server).
14 -include("rate_mgr.hrl").
15 -include("peer_state.hrl").
17 -include("etorrent_mnesia_table.hrl").
19 %% API
20 -export([start_link/1, add_peers/2, new_incoming_peer/4, perform_rechoke/0]).
22 %% gen_server callbacks
23 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
24 terminate/2, code_change/3]).
%% Server state of the choker process.
-record(state, {
          %% Peers known but not yet connected, as {TorrentId, {IP, Port}}
          %% entries (the shape add_peers/2 produces and fill_peers/2 consumes).
          available_peers = [],
          %% Replaced by an empty dict in init/1.
          bad_peers = none,
          %% Our own peer id, taken from the start_link/1 argument.
          our_peer_id = none,
          info_hash = none,

          %% Number of peer processes currently counted as running.
          num_peers = 0,
          %% Reference returned by timer:send_interval/3 in init/1.
          timer_ref = none,
          %% Countdown of round_tick messages; the optimistic unchoke is
          %% advanced when it reaches 0 and the counter is reset to 2.
          round = 0,

          %% Pid chosen by the last advance_optimistic_unchoke/1.
          optimistic_unchoke_pid = none,
          %% Peer pids in optimistic-unchoke rotation order.
          opt_unchoke_chain = []
         }).

%% Locally registered name of the server.
-define(SERVER, ?MODULE).
%% Upper bound on the number of simultaneously running peer processes.
-define(MAX_PEER_PROCESSES, 40).
%% Interval passed to timer:send_interval/3 for round_tick messages.
-define(ROUND_TIME, 10000).
%% Number of download slots handed out by rechoke/1 by default.
-define(DEFAULT_NUM_DOWNLOADERS, 4).
43 %%====================================================================
44 %% API
45 %%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(OurPeerId) -> result of gen_server:start_link/4
%% Description: Start the choker and register it locally as ?SERVER.
%%   OurPeerId is handed to init/1 and kept in the server state.
%%--------------------------------------------------------------------
start_link(OurPeerId) ->
    InitArgs = [OurPeerId],
    gen_server:start_link({local, ?SERVER}, ?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Function: add_peers(TorrentId, IPList) -> ok
%% Description: Asynchronously hand a list of peer addresses to the
%%   choker. Every element is tagged with TorrentId before it is sent.
%%--------------------------------------------------------------------
add_peers(TorrentId, IPList) ->
    Tagged = lists:map(fun(IPP) -> {TorrentId, IPP} end, IPList),
    gen_server:cast(?SERVER, {add_peers, Tagged}).
%%--------------------------------------------------------------------
%% Function: perform_rechoke() -> ok
%% Description: Asynchronously ask the choker to recalculate which
%%   peers are choked and unchoked.
%%--------------------------------------------------------------------
perform_rechoke() ->
    Request = rechoke,
    gen_server:cast(?SERVER, Request).
%%--------------------------------------------------------------------
%% Function: new_incoming_peer(IP, Port, PeerId, InfoHash) -> Reply
%% Description: Synchronously ask the choker whether an incoming
%%   connection should be accepted. Reply is whatever handle_call/3
%%   answers for the new_incoming_peer request.
%%--------------------------------------------------------------------
new_incoming_peer(IP, Port, PeerId, InfoHash) ->
    Request = {new_incoming_peer, IP, Port, PeerId, InfoHash},
    %% Use a generous 15 second timeout: the server can be heavily loaded
    %% at times, and there are 5 acceptors by default anyway.
    gen_server:call(?SERVER, Request, 15000).
60 %%====================================================================
61 %% gen_server callbacks
62 %%====================================================================
%%--------------------------------------------------------------------
%% Function: init([OurPeerId]) -> {ok, State}
%% Description: gen_server callback. Traps exits, starts the periodic
%%   round_tick timer and builds the initial state.
%%--------------------------------------------------------------------
init([OurPeerId]) ->
    process_flag(trap_exit, true),
    %% A round_tick message arrives every ?ROUND_TIME and drives rechoking.
    {ok, TimerRef} = timer:send_interval(?ROUND_TIME, self(), round_tick),
    InitialState = #state{our_peer_id = OurPeerId,
                          bad_peers = dict:new(),
                          timer_ref = TimerRef},
    {ok, InitialState}.
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State}
%% Description: gen_server callback. For a new_incoming_peer request the
%%   reply is one of
%%     connect_to_ourselves - the remote peer id equals our own,
%%     bad_peer             - etorrent_bad_peer_mgr rejects the address,
%%     or whatever start_new_incoming_peer/4 replies.
%%   Any other request is logged and answered with ok.
%%--------------------------------------------------------------------
handle_call({new_incoming_peer, _IP, _Port, PeerId, _InfoHash}, _From,
            #state{our_peer_id = PeerId} = S) ->
    {reply, connect_to_ourselves, S};
handle_call({new_incoming_peer, IP, Port, _PeerId, InfoHash}, _From, S) ->
    %% Exactly one tracking map entry is expected for the info hash.
    {atomic, [TM]} = etorrent_tracking_map:select({infohash, InfoHash}),
    TorrentId = TM#tracking_map.id,
    case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, TorrentId) of
        false ->
            start_new_incoming_peer(IP, Port, InfoHash, S);
        true ->
            {reply, bad_peer, S}
    end;
handle_call(Request, _From, State) ->
    error_logger:error_report([unknown_peer_group_call, Request]),
    {reply, ok, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State}
%% Description: gen_server callback.
%%   {add_peers, IPList} - merge the peers into the pool and try to
%%                         connect to more of them.
%%   rechoke             - recalculate the choke state of the peers.
%%   Anything else is silently ignored.
%%--------------------------------------------------------------------
handle_cast({add_peers, IPList}, State) ->
    {ok, Filled} = start_new_peers(IPList, State),
    {noreply, Filled};
handle_cast(rechoke, State) ->
    rechoke(State),
    {noreply, State};
handle_cast(_Msg, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State}
%% Description: gen_server callback.
%%   round_tick - periodic timer message. When the round counter is 0 the
%%                optimistic unchoke is advanced and the counter is reset
%%                to 2; otherwise the counter is decremented. Peers are
%%                rechoked on every tick.
%%   'DOWN'     - a monitored peer process terminated.
%%   Any other message is logged and ignored.
%%--------------------------------------------------------------------
handle_info(round_tick, #state{round = 0} = S) ->
    {ok, NS} = advance_optimistic_unchoke(S),
    rechoke(NS),
    {noreply, NS#state{round = 2}};
handle_info(round_tick, #state{round = Round} = S) when is_integer(Round) ->
    rechoke(S),
    {noreply, S#state{round = Round - 1}};
handle_info({'DOWN', _Ref, process, Pid, Reason}, S)
  when Reason =:= normal; Reason =:= shutdown ->
    % The peer shut down normally. Hence we just remove him and start up
    % other peers. Eventually the tracker will re-add him to the peer list.
    % XXX: We might have to do something else
    rechoke(S),
    NewChain = lists:delete(Pid, S#state.opt_unchoke_chain),
    {ok, NS} = start_new_peers([], S#state{num_peers = S#state.num_peers - 1,
                                           opt_unchoke_chain = NewChain}),
    {noreply, NS};
handle_info({'DOWN', _Ref, process, Pid, _Reason}, S) ->
    % The peer shut down unexpectedly; re-add him at the *back* of the queue.
    NS = case etorrent_peer:select(Pid) of
             [Peer] ->
                 % XXX: We might have to check that remote is interested and
                 % we were choking
                 rechoke(S),
                 % Entries in available_peers must have the shape
                 % {TorrentId, {IP, Port}}: that is what add_peers/2 inserts
                 % and what fill_peers/2 matches on. A bare {IP, Port} would
                 % crash the next fill with a case_clause.
                 % NOTE(review): assumes #peer{} carries a torrent_id field
                 % -- confirm against etorrent_mnesia_table.hrl.
                 Entry = {Peer#peer.torrent_id, {Peer#peer.ip, Peer#peer.port}},
                 S#state{available_peers = S#state.available_peers ++ [Entry]};
             [] ->
                 S
         end,
    NewChain = lists:delete(Pid, NS#state.opt_unchoke_chain),
    {noreply, NS#state{num_peers = NS#state.num_peers - 1,
                       opt_unchoke_chain = NewChain}};
handle_info(Info, State) ->
    error_logger:error_report([unknown_info_peer_group, Info]),
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: gen_server callback. Logs the termination reason.
%%--------------------------------------------------------------------
terminate(Reason, _S) ->
    error_logger:info_report([peer_group_mgr_term, Reason]),
    ok.
%%--------------------------------------------------------------------
%% Function: code_change(OldVsn, State, Extra) -> {ok, State}
%% Description: gen_server callback. The state is carried over
%%   unchanged on a code upgrade.
%%--------------------------------------------------------------------
code_change(_FromVsn, ServerState, _Extra) ->
    Unchanged = ServerState,
    {ok, Unchanged}.
145 %%--------------------------------------------------------------------
146 %%% Internal functions
147 %%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Function: start_new_incoming_peer(IP, Port, InfoHash, S)
%%             -> {reply, Reply, State}
%% Description: Accept an incoming peer if fewer than
%%   ?MAX_PEER_PROCESSES peers are running. Reply is
%%   already_enough_connections when there is no room, otherwise
%%   {ok, Pid} of the freshly started and monitored peer process.
%%--------------------------------------------------------------------
start_new_incoming_peer(IP, Port, InfoHash, S) ->
    FreeSlots = ?MAX_PEER_PROCESSES - S#state.num_peers,
    if
        FreeSlots =< 0 ->
            {reply, already_enough_connections, S};
        is_integer(FreeSlots), FreeSlots > 0 ->
            {atomic, [Torrent]} =
                etorrent_tracking_map:select({infohash, InfoHash}),
            {ok, PeerPid} = etorrent_t_sup:add_peer(
                              Torrent#tracking_map.supervisor_pid,
                              S#state.our_peer_id,
                              InfoHash,
                              Torrent#tracking_map.id,
                              {IP, Port}),
            erlang:monitor(process, PeerPid),
            Chain = insert_new_peer_into_chain(PeerPid,
                                               S#state.opt_unchoke_chain),
            % Rechoking is done against the state as it was before this peer
            % was counted and added to the chain.
            rechoke(S),
            Updated = S#state{num_peers = S#state.num_peers + 1,
                              opt_unchoke_chain = Chain},
            {reply, {ok, PeerPid}, Updated}
    end.
%%--------------------------------------------------------------------
%% Function: start_new_peers(IPList, State) -> {ok, State}
%% Description: Merge IPList into the pool of available peers (sorted,
%%   duplicates removed) and then connect to as many of them as there
%%   are free peer slots.
%%--------------------------------------------------------------------
start_new_peers(IPList, State) ->
    Known = State#state.available_peers,
    Merged = lists:usort(IPList ++ Known),
    Pooled = State#state{available_peers = Merged},
    %% Replenish the connected peers.
    FreeSlots = ?MAX_PEER_PROCESSES - Pooled#state.num_peers,
    fill_peers(FreeSlots, Pooled).
%%--------------------------------------------------------------------
%% Function: fill_peers(N, S) -> {ok, State}
%% Description: Try to start up to N new peer connections, taking
%%   candidates from the front of available_peers. Candidates flagged by
%%   etorrent_bad_peer_mgr are dropped from the pool without using up a
%%   slot. Stops when N reaches zero or the pool is empty.
%%
%% NOTE: fill_peers/2 and spawn_new_peer/5 tail call each other.
%%--------------------------------------------------------------------
fill_peers(N, S) when N =< 0 ->
    % Also covers a negative N, which would otherwise never hit the base
    % case and keep connecting until the pool ran dry.
    {ok, S};
fill_peers(N, S) ->
    case S#state.available_peers of
        [] ->
            % No peers available, just stop trying to fill peers
            {ok, S};
        [{TorrentId, {IP, Port}} | R] ->
            % Possible peer. Check it.
            case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, TorrentId) of
                true ->
                    fill_peers(N, S#state{available_peers = R});
                false ->
                    spawn_new_peer(IP, Port, TorrentId, N,
                                   S#state{available_peers = R})
            end
    end.
%%--------------------------------------------------------------------
%% Function: spawn_new_peer(IP, Port, TorrentId, N, S) -> {ok, State}
%% Description: Attempt to spawn a peer process for IP/Port on the
%%   torrent TorrentId. N is the number of peers we still need to spawn
%%   and S is the current state. If we are already connected to the peer
%%   nothing is spawned and N is left untouched. Continues with
%%   fill_peers/2 and returns the state it produces.
%%--------------------------------------------------------------------
spawn_new_peer(IP, Port, TorrentId, N, S) ->
    case etorrent_peer:connected(IP, Port, TorrentId) of
        false ->
            {atomic, [Torrent]} = etorrent_tracking_map:select(TorrentId),
            {ok, PeerPid} = etorrent_t_sup:add_peer(
                              Torrent#tracking_map.supervisor_pid,
                              S#state.our_peer_id,
                              Torrent#tracking_map.info_hash,
                              TorrentId,
                              {IP, Port}),
            erlang:monitor(process, PeerPid),
            etorrent_t_peer_recv:connect(PeerPid, IP, Port),
            Chain = insert_new_peer_into_chain(PeerPid,
                                               S#state.opt_unchoke_chain),
            % Rechoking is done against the state as it was before this peer
            % was counted and added to the chain.
            rechoke(S),
            Updated = S#state{num_peers = S#state.num_peers + 1,
                              opt_unchoke_chain = Chain},
            fill_peers(N - 1, Updated);
        true ->
            fill_peers(N, S)
    end.
%%--------------------------------------------------------------------
%% Function: rechoke(State) -> ok
%% Description: Recalculate the choke/unchoke state of peers. Peers are
%%   taken from the etorrent_recv_state table ordered by descending
%%   rate, and the number of download slots comes from
%%   calculate_num_downloaders/1.
%%--------------------------------------------------------------------
rechoke(S) ->
    Fastest = select_fastest(todo_rewrite_choking_algo, etorrent_recv_state),
    Slots = calculate_num_downloaders(S),
    rechoke(Fastest, Slots, S).
%%--------------------------------------------------------------------
%% Function: rechoke(Peers, N, State) -> ok
%% Description: Walk Peers (#rate_mgr{} rows, fastest first) with N
%%   download slots left. Every peer visited while slots remain is
%%   unchoked; only an interested peer uses up a slot. Once the slots
%%   are exhausted, each remaining peer is handed to
%%   optimistic_unchoke_handler/2.
%%--------------------------------------------------------------------
rechoke(Peers, 0, S) ->
    lists:foreach(fun(P) -> optimistic_unchoke_handler(P, S) end, Peers),
    ok;
rechoke([], _N, _S) ->
    ok;
rechoke([Peer | Rest], N, S) when is_record(Peer, rate_mgr) ->
    case ets:lookup(etorrent_peer_state, Peer#rate_mgr.pid) of
        [] ->
            % No peer_state row for this peer: unchoke it without
            % spending a slot.
            {_Id, Pid} = Peer#rate_mgr.pid,
            etorrent_t_peer_recv:unchoke(Pid),
            rechoke(Rest, N, S);
        [#peer_state{interest_state = interested, pid = {_Id, Pid}}] ->
            etorrent_t_peer_recv:unchoke(Pid),
            rechoke(Rest, N - 1, S);
        [#peer_state{interest_state = not_interested, pid = {_Id, Pid}}] ->
            etorrent_t_peer_recv:unchoke(Pid),
            rechoke(Rest, N, S)
    end.
%%--------------------------------------------------------------------
%% Function: optimistic_unchoke_handler(RateRow, State) -> term()
%% Description: Choke the peer of the given #rate_mgr{} row, unless it
%%   is the current optimistic unchoke pid, which is handled elsewhere.
%%--------------------------------------------------------------------
optimistic_unchoke_handler(#rate_mgr{pid = {_Id, Pid}},
                           #state{optimistic_unchoke_pid = Pid}) ->
    ok; % Handled elsewhere
optimistic_unchoke_handler(#rate_mgr{pid = {_Id, Pid}}, #state{}) ->
    etorrent_t_peer_recv:choke(Pid).
%%--------------------------------------------------------------------
%% Function: calculate_num_downloaders(State) -> integer()
%% Description: Number of download slots for the next rechoke. One slot
%%   is held back when the optimistically unchoked peer has a
%%   peer_state row and is interested.
%%
%% TODO: Make number of downloaders depend on current rate!
%%--------------------------------------------------------------------
calculate_num_downloaders(S) ->
    Key = {todo_redefine_optimistics, S#state.optimistic_unchoke_pid},
    case ets:lookup(etorrent_peer_state, Key) of
        [] ->
            ?DEFAULT_NUM_DOWNLOADERS;
        [#peer_state{interest_state = interested}] ->
            ?DEFAULT_NUM_DOWNLOADERS - 1;
        [#peer_state{interest_state = not_interested}] ->
            ?DEFAULT_NUM_DOWNLOADERS
    end.
%%--------------------------------------------------------------------
%% Function: advance_optimistic_unchoke(State) -> {ok, State}
%% Description: Rotate the optimistic unchoke chain and unchoke the
%%   peer that ends up at its head, recording it as the current
%%   optimistic unchoke pid. With an empty chain the state is returned
%%   unchanged.
%%--------------------------------------------------------------------
advance_optimistic_unchoke(S) ->
    case move_cyclic_chain(S#state.opt_unchoke_chain) of
        [] ->
            %% No peers yet
            {ok, S};
        [Next | _] = Rotated ->
            etorrent_t_peer_recv:unchoke(Next),
            {ok, S#state{opt_unchoke_chain = Rotated,
                         optimistic_unchoke_pid = Next}}
    end.
%%--------------------------------------------------------------------
%% Function: move_cyclic_chain(Chain) -> Chain
%% Description: Rotate the chain: the longest prefix of pids for which
%%   local_unchoked/2 holds is moved to the back. An empty chain stays
%%   empty.
%%--------------------------------------------------------------------
move_cyclic_chain(Chain) ->
    Skip = fun(Pid) -> local_unchoked(Pid, todo_move_cyclic_chain_all) end,
    %% lists:splitwith/2 on [] yields {[], []}, so the empty chain needs no
    %% clause of its own.
    {Skipped, Remaining} = lists:splitwith(Skip, Chain),
    %% Advance chain
    Remaining ++ Skipped.
%%--------------------------------------------------------------------
%% Function: local_unchoked(Pid, TorrentId) -> boolean()
%% Description: Look up {TorrentId, Pid} in the etorrent_peer_state
%%   table. Returns true when there is no row, otherwise whether the
%%   row's local_choke field is exactly true.
%% NOTE(review): despite the name, a row with local_choke =:= true
%%   yields true -- confirm that this is the intended polarity.
%%--------------------------------------------------------------------
local_unchoked(Pid, TorrentId) ->
    Key = {TorrentId, Pid},
    case ets:lookup(etorrent_peer_state, Key) of
        [Entry] ->
            Entry#peer_state.local_choke =:= true;
        [] ->
            true
    end.
%%--------------------------------------------------------------------
%% Function: insert_new_peer_into_chain(Pid, Chain) -> Chain
%% Description: Insert Pid at a uniformly random position of the
%%   optimistic unchoke chain. Every one of the length(Chain) + 1
%%   positions, including the very end, is equally likely; for an empty
%%   chain the result is [Pid].
%%--------------------------------------------------------------------
insert_new_peer_into_chain(Pid, Chain) ->
    % rand:uniform(K) yields an integer in 1..K, so Index is in
    % 0..length(Chain). This also works for the empty chain, where the
    % former crypto:rand_uniform(0, 0) call had an empty range to pick from.
    Index = rand:uniform(length(Chain) + 1) - 1,
    {Front, Back} = lists:split(Index, Chain),
    Front ++ [Pid | Back].
%%--------------------------------------------------------------------
%% Function: select_fastest(Id, Table) -> [#rate_mgr{}]
%% Description: Fetch every rate_mgr row of Table whose key has Id as
%%   its first element, and return the rows ordered by descending rate.
%%--------------------------------------------------------------------
select_fastest(Id, Table) ->
    MatchSpec = [{{rate_mgr, {Id, '_'}, '_'}, [], ['$_']}],
    Ascending = lists:keysort(#rate_mgr.rate, ets:select(Table, MatchSpec)),
    lists:reverse(Ascending).