Fix choking state calculation for leechers.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_choker.erl
%%%-------------------------------------------------------------------
%%% File    : etorrent_choker.erl
%%% Author  : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
%%% License : See COPYING
%%% Description : Master process for a number of peers.
%%%
%%% Created : 18 Jul 2007 by
%%%           Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
%%%-------------------------------------------------------------------
-module(etorrent_choker).

-behaviour(gen_server).

-include("rate_mgr.hrl").
-include("peer_state.hrl").

-include("etorrent_mnesia_table.hrl").

%% API
-export([start_link/1, add_peers/2, new_incoming_peer/4, perform_rechoke/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {available_peers = [],
                bad_peers = none,
                our_peer_id = none,
                info_hash = none,

                num_peers = 0,
                timer_ref = none,
                round = 0,

                optimistic_unchoke_pid = none,
                opt_unchoke_chain = []}).

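%% A #rechoke_info record is a per-peer snapshot used during a rechoke round.
%% As the code below uses it: `kind' is the remote peer's role (taken from the
%% peer table), `state' is our own role for that torrent, `snubbed' and the
%% r_*/l_* fields mirror the rate_mgr/peer_state tables, and `rate' is the send
%% rate for torrents we seed or the negated receive rate for torrents we leech
%% (negated so a plain keysort puts the fastest downloaders first).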
-record(rechoke_info, {pid :: pid(),
                       kind :: 'seeding' | 'leeching',
                       state :: 'seeding' | 'leeching',
                       snubbed :: bool(),
                       r_interest_state :: 'interested' | 'not_interested',
                       r_choke_state :: 'choked' | 'unchoked',
                       l_choke :: bool(),
                       rate :: float()}).

-define(SERVER, ?MODULE).
-define(MAX_PEER_PROCESSES, 40).
-define(ROUND_TIME, 10000).

%%====================================================================
%% API
%%====================================================================
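%% start_link/1: start the choker as a locally registered gen_server,
%% parameterized on our own peer id.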
start_link(OurPeerId) ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [OurPeerId], []).

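%% add_peers/2: hand a list of {IP, Port} pairs for TorrentId to the choker;
%% they are queued as available peers and connected as slots free up.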
add_peers(TorrentId, IPList) ->
    gen_server:cast(?SERVER, {add_peers, [{TorrentId, IPP} || IPP <- IPList]}).

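%% perform_rechoke/0: ask the choker to recalculate choke/unchoke states now,
%% outside the normal round timer.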
perform_rechoke() ->
    gen_server:cast(?SERVER, rechoke).

new_incoming_peer(IP, Port, PeerId, InfoHash) ->
    %% Use a fairly generous timeout here, as the choker can be heavily
    %% loaded at times. We have 5 acceptors by default anyway.
    gen_server:call(?SERVER, {new_incoming_peer, IP, Port, PeerId, InfoHash}, 15000).

%%====================================================================
%% gen_server callbacks
%%====================================================================

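%% init/1: trap exits and start the periodic round timer (?ROUND_TIME ms)
%% that drives the choking algorithm via round_tick messages.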
init([OurPeerId]) ->
    process_flag(trap_exit, true),
    {ok, Tref} = timer:send_interval(?ROUND_TIME, self(), round_tick),
    {ok, #state{ our_peer_id = OurPeerId,
                 bad_peers = dict:new(),
                 timer_ref = Tref}}.

handle_call({new_incoming_peer, _IP, _Port, PeerId, _InfoHash}, _From, S)
  when S#state.our_peer_id =:= PeerId ->
    {reply, connect_to_ourselves, S};
handle_call({new_incoming_peer, IP, Port, _PeerId, InfoHash}, _From, S) ->
    {atomic, [TM]} = etorrent_tracking_map:select({infohash, InfoHash}),
    case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, TM#tracking_map.id) of
        true ->
            {reply, bad_peer, S};
        false ->
            start_new_incoming_peer(IP, Port, InfoHash, S)
    end;
handle_call(Request, _From, State) ->
    error_logger:error_report([unknown_peer_group_call, Request]),
    Reply = ok,
    {reply, Reply, State}.

handle_cast({add_peers, IPList}, S) ->
    {ok, NS} = start_new_peers(IPList, S),
    {noreply, NS};
handle_cast(rechoke, S) ->
    rechoke(S),
    {noreply, S};
handle_cast(_Msg, State) ->
    {noreply, State}.

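%% round_tick fires every ?ROUND_TIME ms. Every third round (round = 0) we
%% also advance the optimistic unchoke before rechoking; otherwise we just
%% rechoke and count the round down.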
handle_info(round_tick, S) ->
    case S#state.round of
        0 ->
            {ok, NS} = advance_optimistic_unchoke(S),
            rechoke(NS),
            {noreply, NS#state { round = 2}};
        N when is_integer(N) ->
            rechoke(S),
            {noreply, S#state{round = S#state.round - 1}}
    end;
handle_info({'DOWN', _Ref, process, Pid, Reason}, S)
  when (Reason =:= normal) or (Reason =:= shutdown) ->
    % The peer shut down normally. Hence we just remove him and start up
    % other peers. Eventually the tracker will re-add him to the peer list.

    % XXX: We might have to do something else
    rechoke(S),

    NewChain = lists:delete(Pid, S#state.opt_unchoke_chain),
    {ok, NS} = start_new_peers([], S#state { num_peers = S#state.num_peers - 1,
                                             opt_unchoke_chain = NewChain }),
    {noreply, NS};
handle_info({'DOWN', _Ref, process, Pid, _Reason}, S) ->
    % The peer shut down unexpectedly; re-add him to the *back* of the queue.
    NS = case etorrent_peer:select(Pid) of
             [Peer] ->
                 {IP, Port} = {Peer#peer.ip, Peer#peer.port},

                 % XXX: We might have to check that remote is interested and we were choking
                 rechoke(S),
                 S#state { available_peers = S#state.available_peers ++ [{IP, Port}]};
             [] -> S
         end,

    NewChain = lists:delete(Pid, NS#state.opt_unchoke_chain),
    {noreply, NS#state{num_peers = NS#state.num_peers - 1,
                       opt_unchoke_chain = NewChain}};
handle_info(Info, State) ->
    error_logger:error_report([unknown_info_peer_group, Info]),
    {noreply, State}.

terminate(Reason, _S) ->
    error_logger:info_report([peer_group_mgr_term, Reason]),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

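%% start_new_incoming_peer/4: accept an incoming connection if we still have
%% a free peer slot; otherwise reply already_enough_connections. On success
%% the peer is added under the torrent's supervisor, monitored, and inserted
%% into the optimistic-unchoke chain, after which we rechoke.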
start_new_incoming_peer(IP, Port, InfoHash, S) ->
    case max_peer_processes() - S#state.num_peers of
        N when N =< 0 ->
            {reply, already_enough_connections, S};
        N when is_integer(N), N > 0 ->
            {atomic, [T]} = etorrent_tracking_map:select({infohash, InfoHash}),
            {ok, Pid} = etorrent_t_sup:add_peer(
                          T#tracking_map.supervisor_pid,
                          S#state.our_peer_id,
                          InfoHash,
                          T#tracking_map.id,
                          {IP, Port}),
            erlang:monitor(process, Pid),
            NewChain = insert_new_peer_into_chain(Pid, S#state.opt_unchoke_chain),
            rechoke(S),
            {reply, {ok, Pid},
             S#state { num_peers = S#state.num_peers + 1,
                       opt_unchoke_chain = NewChain}}
    end.

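%% start_new_peers/2: merge the new peers into the pool of available peers
%% and then try to fill the remaining connection slots.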
start_new_peers(IPList, State) ->
    %% Update the PeerList with the new incoming peers
    PeerList = lists:usort(IPList ++ State#state.available_peers),
    S = State#state { available_peers = PeerList},

    %% Replenish the connected peers.
    fill_peers(max_peer_processes() - S#state.num_peers, S).

%%% NOTE: fill_peers/2 and spawn_new_peer/5 tail-call each other.
fill_peers(0, S) ->
    {ok, S};
fill_peers(N, S) ->
    case S#state.available_peers of
        [] ->
            % No peers available, just stop trying to fill peers
            {ok, S};
        [{TorrentId, {IP, Port}} | R] ->
            % Possible peer. Check it.
            case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, TorrentId) of
                true ->
                    fill_peers(N, S#state{available_peers = R});
                false ->
                    spawn_new_peer(IP, Port, TorrentId, N, S#state{available_peers = R})
            end
    end.

%%--------------------------------------------------------------------
%% Function: spawn_new_peer(IP, Port, TorrentId, N, S) -> {ok, State}
%% Description: Attempt to spawn the peer at IP/Port. N is the number of
%%   peers we still need to spawn and S is the current state. Returns
%%   a new state to be put into the process.
%%--------------------------------------------------------------------
spawn_new_peer(IP, Port, TorrentId, N, S) ->
    case etorrent_peer:connected(IP, Port, TorrentId) of
        true ->
            fill_peers(N, S);
        false ->
            {atomic, [TM]} = etorrent_tracking_map:select(TorrentId),
            {ok, Pid} = etorrent_t_sup:add_peer(
                          TM#tracking_map.supervisor_pid,
                          S#state.our_peer_id,
                          TM#tracking_map.info_hash,
                          TorrentId,
                          {IP, Port}),
            erlang:monitor(process, Pid),
            etorrent_t_peer_recv:connect(Pid, IP, Port),
            NewChain = insert_new_peer_into_chain(Pid, S#state.opt_unchoke_chain),
            rechoke(S),
            fill_peers(N-1, S#state { num_peers = S#state.num_peers + 1,
                                      opt_unchoke_chain = NewChain})
    end.

%%--------------------------------------------------------------------
%% Function: rechoke(State) -> ok
%% Description: Recalculate the choke/unchoke state of peers
%%--------------------------------------------------------------------
rechoke(S) ->
    Peers = build_rechoke_info(S#state.opt_unchoke_chain),
    {PreferredDown, PreferredSeed} = split_preferred(Peers),
    PreferredSet = prune_preferred_peers(PreferredDown, PreferredSeed),
    {N, ToChoke} = rechoke_unchoke(Peers, PreferredSet, 0, []),
    rechoke_choke(ToChoke, N, optimistics(PreferredSet)).

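%% build_rechoke_info/1,2: turn the optimistic-unchoke chain into a list of
%% #rechoke_info records. For torrents we are seeding, `rate' is the send rate
%% to the peer; for torrents we are leeching, it is the negated receive rate,
%% so sorting ascending on `rate' favours the peers we download from fastest.
%% Peers with no recorded rate or no peer-table entry are skipped.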
build_rechoke_info(Peers) ->
    SeederSet = sets:from_list(seeding_torrents()),
    build_rechoke_info(SeederSet, Peers).

build_rechoke_info(_Seeding, []) ->
    [];
build_rechoke_info(Seeding, [Pid | Next]) ->
    case etorrent_peer:select(Pid) of
        [] -> build_rechoke_info(Seeding, Next);
        [Peer] ->
            Kind = Peer#peer.state,
            Snubbed = etorrent_rate_mgr:snubbed(Peer#peer.torrent_id, Pid),
            PeerState = etorrent_rate_mgr:select_state(Peer#peer.torrent_id, Pid),
            case sets:is_element(Peer#peer.torrent_id, Seeding) of
                true ->
                    case etorrent_rate_mgr:fetch_send_rate(
                           Peer#peer.torrent_id,
                           Pid) of
                        none -> build_rechoke_info(Seeding, Next);
                        Rate ->
                            [#rechoke_info { pid = Pid,
                                             kind = Kind,
                                             state = seeding,
                                             rate = Rate,
                                             r_interest_state =
                                                 PeerState#peer_state.interest_state,
                                             r_choke_state =
                                                 PeerState#peer_state.choke_state,
                                             l_choke =
                                                 PeerState#peer_state.local_choke,
                                             snubbed = Snubbed } |
                             build_rechoke_info(Seeding, Next)]
                    end;
                false ->
                    case etorrent_rate_mgr:fetch_recv_rate(
                           Peer#peer.torrent_id,
                           Pid) of
                        none -> build_rechoke_info(Seeding, Next);
                        Rate ->
                            [#rechoke_info { pid = Pid,
                                             kind = Kind,
                                             state = leeching,
                                             rate = -Rate, % Inverted for later sorting!
                                             r_interest_state =
                                                 PeerState#peer_state.interest_state,
                                             r_choke_state =
                                                 PeerState#peer_state.choke_state,
                                             l_choke =
                                                 PeerState#peer_state.local_choke,
                                             snubbed = Snubbed } |
                             build_rechoke_info(Seeding, Next)]
                    end
            end
    end.

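%% advance_optimistic_unchoke/1: rotate the optimistic-unchoke chain and
%% unchoke the peer that ends up at the head, remembering it in the state.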
advance_optimistic_unchoke(S) ->
    NewChain = move_cyclic_chain(S#state.opt_unchoke_chain),
    case NewChain of
        [] ->
            {ok, S}; %% No peers yet
        [H | _T] ->
            etorrent_t_peer_recv:unchoke(H),
            {ok, S#state { opt_unchoke_chain = NewChain,
                           optimistic_unchoke_pid = H }}
    end.

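%% move_cyclic_chain/1: cyclically advance the chain so that the first peer
%% that is interested in us and currently choked ends up at the head; such a
%% peer is the natural candidate for the next optimistic unchoke.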
move_cyclic_chain([]) -> [];
move_cyclic_chain(Chain) ->
    F = fun (Pid) ->
                case etorrent_peer:select(Pid) of
                    [] -> true;
                    [P] -> T = etorrent_rate_mgr:select_state(
                                 P#peer.torrent_id,
                                 Pid),
                           not (T#peer_state.interest_state =:= interested
                                andalso T#peer_state.choke_state =:= choked)
                end
        end,
    {Front, Back} = lists:splitwith(F, Chain),
    %% Advance chain
    Back ++ Front.

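%% insert_new_peer_into_chain/2: place a new peer at a random position in the
%% optimistic-unchoke chain, so newcomers are spread over the chain rather
%% than always appended at the end.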
insert_new_peer_into_chain(Pid, Chain) ->
    Length = length(Chain),
    Index = lists:max([0, crypto:rand_uniform(0, Length)]),
    {Front, Back} = lists:split(Index, Chain),
    Front ++ [Pid | Back].

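%% max_peer_processes/0: maximum number of peer processes, taken from the
%% `max_peers' application environment variable, defaulting to
%% ?MAX_PEER_PROCESSES.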
max_peer_processes() ->
    case application:get_env(etorrent, max_peers) of
        {ok, N} when is_integer(N) ->
            N;
        undefined ->
            ?MAX_PEER_PROCESSES
    end.

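%% upload_slots/0: number of upload slots. If `max_upload_slots' is set to
%% auto, derive the slot count from `max_upload_rate' (fixed values for low
%% rates, otherwise roughly sqrt(0.6 * Rate)); otherwise use the configured
%% integer directly.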
upload_slots() ->
    case application:get_env(etorrent, max_upload_slots) of
        {ok, auto} ->
            {ok, Rate} = application:get_env(etorrent, max_upload_rate),
            case Rate of
                N when N =< 0 -> 7; %% Educated guess
                N when N < 9 -> 2;
                N when N < 15 -> 3;
                N when N < 42 -> 4;
                N ->
                    round(math:sqrt(N * 0.6))
            end;
        {ok, N} when is_integer(N) ->
            N
    end.

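%% split_preferred/1: split the peers into the download bucket (torrents we
%% leech) and the seed bucket (torrents we seed), dropping peers that are not
%% worth unchoking, and sort each bucket on the rate field.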
split_preferred(Peers) ->
    {Downs, Leechs} = split_preferred_peers(Peers, [], []),
    {lists:keysort(#rechoke_info.rate, Downs),
     lists:keysort(#rechoke_info.rate, Leechs)}.

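%% prune_preferred_peers/2: given the sorted download and seed candidates,
%% split the available upload slots roughly 70/30 between them, shift unused
%% slots from one side to the other, and return the chosen peers as a set.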
prune_preferred_peers(SDowns, SLeechs) ->
    MaxUploads = upload_slots(),
    DUploads = lists:max([1, round(MaxUploads * 0.7)]),
    SUploads = lists:max([1, round(MaxUploads * 0.3)]),
    {SUP2, DUP2} =
        case lists:max([0, DUploads - length(SDowns)]) of
            0 -> {SUploads, DUploads};
            N -> {SUploads + N, DUploads - N}
        end,
    {SUP3, DUP3} =
        case lists:max([0, SUP2 - length(SLeechs)]) of
            0 -> {SUP2, DUP2};
            K ->
                {SUP2 - K, lists:min([DUP2 + K, length(SDowns)])}
        end,
    {TSDowns, TSLeechs} = {lists:sublist(SDowns, DUP3),
                           lists:sublist(SLeechs, SUP3)},
    sets:union(sets:from_list(TSDowns), sets:from_list(TSLeechs)).

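%% rechoke_unchoke/4: unchoke every peer in the preferred set; all other
%% peers are collected for rechoke_choke/3 to decide on, and the running
%% count is threaded on to it.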
rechoke_unchoke([], _PS, Count, ToChoke) ->
    {Count, ToChoke};
rechoke_unchoke([P | Next], PSet, Count, ToChoke) ->
    case sets:is_element(P, PSet) of
        true ->
            etorrent_t_peer_recv:unchoke(P#rechoke_info.pid),
            rechoke_unchoke(Next, PSet, Count+1, ToChoke);
        false ->
            rechoke_unchoke(Next, PSet, Count+1, [P | ToChoke])
    end.

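%% optimistics/1: how many peers outside the preferred set we may keep
%% unchoked, i.e. the larger of `min_uploads' (default 1) and the upload
%% slots left over after the preferred set has been served.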
optimistics(PSet) ->
    MinUp = case application:get_env(etorrent, min_uploads) of
                {ok, N} -> N;
                undefined -> 1
            end,
    lists:max([MinUp, upload_slots() - sets:size(PSet)]).

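%% rechoke_choke/3: walk the non-preferred peers. Once the optimistic budget
%% is used up, or when the remote peer is itself a seeder, the peer is choked;
%% otherwise it is unchoked, and interested peers count against the budget.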
rechoke_choke([], _Count, _Optimistics) ->
    ok;
rechoke_choke([P | Next], Count, Optimistics) when Count >= Optimistics ->
    etorrent_t_peer_recv:choke(P#rechoke_info.pid),
    rechoke_choke(Next, Count, Optimistics);
rechoke_choke([P | Next], Count, Optimistics) ->
    case P#rechoke_info.kind =:= seeding of
        true ->
            etorrent_t_peer_recv:choke(P#rechoke_info.pid),
            rechoke_choke(Next, Count, Optimistics);
        false ->
            etorrent_t_peer_recv:unchoke(P#rechoke_info.pid),
            case P#rechoke_info.r_interest_state =:= interested of
                true ->
                    rechoke_choke(Next, Count+1, Optimistics);
                false ->
                    rechoke_choke(Next, Count, Optimistics)
            end
    end.

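%% split_preferred_peers/3: partition the peers into the download and seed
%% buckets. Peers that are not worth unchoking (remote seeders, peers not
%% interested in us) are dropped up front, as are snubbed peers; peers on
%% torrents we are seeding go into the seed bucket and the rest into the
%% download bucket.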
split_preferred_peers([], Downs, Leechs) ->
    {Downs, Leechs};
split_preferred_peers([P | Next], Downs, Leechs) ->
    case P#rechoke_info.kind =:= seeding
        orelse P#rechoke_info.r_interest_state =:= not_interested of
        true ->
            split_preferred_peers(Next, Downs, Leechs);
        false when P#rechoke_info.state =:= seeding ->
            split_preferred_peers(Next, Downs, [P | Leechs]);
        false when P#rechoke_info.snubbed =:= true ->
            split_preferred_peers(Next, Downs, Leechs);
        false ->
            split_preferred_peers(Next, [P | Downs], Leechs)
    end.

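%% seeding_torrents/0: look up the torrents we are currently seeding;
%% build_rechoke_info/2 uses this to build its seeder set.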
seeding_torrents() ->
    {atomic, Torrents} = etorrent_torrent:all(),
    [T#torrent.id || T <- Torrents,
                     T#torrent.state =:= seeding].