Implement the new choking algorithm. Still some loose ends, but client works.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_choker.erl
blob09c861db32d58ef5e55a2d3cb3767a02325fd2a2
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_choker.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Master process for a number of peers.
6 %%%
7 %%% Created : 18 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_choker).
12 -behaviour(gen_server).
14 -include("rate_mgr.hrl").
15 -include("peer_state.hrl").
17 -include("etorrent_mnesia_table.hrl").
19 %% API
20 -export([start_link/1, add_peers/2, new_incoming_peer/4, perform_rechoke/0]).
22 %% gen_server callbacks
23 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
24 terminate/2, code_change/3]).
%% Server state of the choker process.
-record(state, {
          %% Peers we know about but have not connected to yet, stored as
          %% {TorrentId, {IP, Port}} entries (see add_peers/2, fill_peers/2).
          available_peers = [],
          %% Replaced by dict:new() in init/1.
          bad_peers = none,
          %% Our own peer id; used to refuse connections to ourselves.
          our_peer_id = none,
          info_hash = none,
          %% Number of peer processes currently started and monitored.
          num_peers = 0,
          %% Reference returned by timer:send_interval/3 in init/1.
          timer_ref = none,
          %% Countdown of round_tick messages; at 0 the optimistic unchoke
          %% advances and the counter is reset to 2.
          round = 0,
          %% Pid most recently chosen by advance_optimistic_unchoke/1.
          optimistic_unchoke_pid = none,
          %% Peer pids in the order used for optimistic unchoking.
          opt_unchoke_chain = []
         }).
%% Per-peer snapshot assembled by build_rechoke_info/2 for one rechoke pass.
%%   pid              - the peer process
%%   kind             - copied from the peer's #peer.state
%%   state            - whether the peer's torrent is in our seeding set
%%   snubbed          - result of etorrent_rate_mgr:snubbed/2
%%   r_interest_state - interest_state from the peer's #peer_state
%%   r_choke_state    - choke_state from the peer's #peer_state
%%   rate             - send rate when seeding, negated receive rate when
%%                      leeching (negated so an ascending sort puts the
%%                      fastest peers first)
-record(rechoke_info, {pid              :: pid(),
                       kind             :: 'seeding' | 'leeching',
                       state            :: 'seeding' | 'leeching',
                       snubbed          :: boolean(),
                       r_interest_state :: 'interested' | 'not_interested',
                       r_choke_state    :: 'choked' | 'unchoked',
                       rate             :: float()}).
%% Locally registered name of the choker process.
-define(SERVER, ?MODULE).

%% Fallback peer limit used when the `max_peers' application
%% environment variable is not set (see max_peer_processes/0).
-define(MAX_PEER_PROCESSES, 40).

%% Interval between round_tick messages, in milliseconds.
-define(ROUND_TIME, 10000).
50 %%====================================================================
51 %% API
52 %%====================================================================
%% Start the choker as a locally registered gen_server.
%% OurPeerId is stored in the server state by init/1.
start_link(OurPeerId) ->
    ServerName = {local, ?SERVER},
    InitArgs = [OurPeerId],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
%% Asynchronously hand a list of peer addresses for TorrentId to the
%% choker. Each address is tagged with the torrent id before being sent.
add_peers(TorrentId, IPList) ->
    Tagged = lists:map(fun(IPP) -> {TorrentId, IPP} end, IPList),
    gen_server:cast(?SERVER, {add_peers, Tagged}).
%% Ask the choker to run a rechoke pass. Asynchronous; returns immediately.
perform_rechoke() ->
    Request = rechoke,
    gen_server:cast(?SERVER, Request).
%% Synchronously announce an incoming connection to the choker and return
%% its reply.
new_incoming_peer(IP, Port, PeerId, InfoHash) ->
    %% Use a generous timeout: the choker can be heavily loaded at times,
    %% and there are 5 acceptors by default anyway.
    Timeout = 15000,
    Request = {new_incoming_peer, IP, Port, PeerId, InfoHash},
    gen_server:call(?SERVER, Request, Timeout).
67 %%====================================================================
68 %% gen_server callbacks
69 %%====================================================================
%% gen_server init callback: trap exits, start the periodic round_tick
%% timer and build the initial state.
init([OurPeerId]) ->
    process_flag(trap_exit, true),
    {ok, TimerRef} = timer:send_interval(?ROUND_TIME, self(), round_tick),
    InitialState = #state{timer_ref = TimerRef,
                          bad_peers = dict:new(),
                          our_peer_id = OurPeerId},
    {ok, InitialState}.
%% Synchronous requests.
%%  {new_incoming_peer, IP, Port, PeerId, InfoHash}:
%%     - replies connect_to_ourselves when PeerId is our own id,
%%     - replies bad_peer when the bad-peer manager rejects the address,
%%     - otherwise delegates to start_new_incoming_peer/4.
%%  Anything else is logged and answered with ok.
handle_call({new_incoming_peer, _IP, _Port, PeerId, _InfoHash}, _From,
            #state{our_peer_id = PeerId} = S) ->
    {reply, connect_to_ourselves, S};
handle_call({new_incoming_peer, IP, Port, _PeerId, InfoHash}, _From, S) ->
    {atomic, [TM]} = etorrent_tracking_map:select({infohash, InfoHash}),
    TorrentId = TM#tracking_map.id,
    case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, TorrentId) of
        false ->
            start_new_incoming_peer(IP, Port, InfoHash, S);
        true ->
            {reply, bad_peer, S}
    end;
handle_call(Request, _From, State) ->
    error_logger:error_report([unknown_peer_group_call, Request]),
    {reply, ok, State}.
%% Asynchronous requests.
%%  rechoke              - run a rechoke pass on the current state.
%%  {add_peers, IPList}  - merge new candidates and try to connect to them.
%%  Anything else is silently ignored.
handle_cast(rechoke, S) ->
    rechoke(S),
    {noreply, S};
handle_cast({add_peers, IPList}, S) ->
    {ok, NewState} = start_new_peers(IPList, S),
    {noreply, NewState};
handle_cast(_Msg, State) ->
    {noreply, State}.
%% Out-of-band messages.
%%
%%  round_tick
%%     Sent by the interval timer started in init/1. When the round
%%     counter is 0 the optimistic unchoke is advanced and the counter is
%%     reset to 2; otherwise the counter is decremented. A rechoke pass
%%     runs on every tick.
%%
%%  {'DOWN', ...} with reason normal or shutdown
%%     The peer stopped cleanly: forget it, rechoke, and try to start
%%     replacement peers.
%%
%%  {'DOWN', ...} with any other reason
%%     The peer stopped unexpectedly: forget it and, if it is still known
%%     to etorrent_peer, rechoke and put it at the *back* of the candidate
%%     queue so it can be retried later.
%%
%%  Anything else is logged and ignored.
handle_info(round_tick, S) ->
    case S#state.round of
        0 ->
            {ok, NS} = advance_optimistic_unchoke(S),
            rechoke(NS),
            {noreply, NS#state{round = 2}};
        N when is_integer(N) ->
            rechoke(S),
            {noreply, S#state{round = N - 1}}
    end;
handle_info({'DOWN', _Ref, process, Pid, Reason}, S)
  when Reason =:= normal; Reason =:= shutdown ->
    %% Eventually the tracker will re-add this peer to the peer list.
    %% XXX: We might have to do something else
    Pruned = forget_peer(Pid, S),
    %% Rechoke on the pruned state so the dead pid cannot occupy a slot.
    rechoke(Pruned),
    {ok, NS} = start_new_peers([], Pruned),
    {noreply, NS};
handle_info({'DOWN', _Ref, process, Pid, _Reason}, S) ->
    Pruned = forget_peer(Pid, S),
    NS = case etorrent_peer:select(Pid) of
             [Peer] ->
                 %% XXX: We might have to check that remote is interested
                 %% and we were choking
                 rechoke(Pruned),
                 %% Candidates must be {TorrentId, {IP, Port}}: that is the
                 %% only shape fill_peers/2 accepts.
                 Candidate = {Peer#peer.torrent_id,
                              {Peer#peer.ip, Peer#peer.port}},
                 Pruned#state{available_peers =
                                  Pruned#state.available_peers ++ [Candidate]};
             [] ->
                 Pruned
         end,
    {noreply, NS};
handle_info(Info, State) ->
    error_logger:error_report([unknown_info_peer_group, Info]),
    {noreply, State}.

%% Remove a terminated peer from the bookkeeping: one connection less and
%% no longer part of the optimistic unchoke chain.
forget_peer(Pid, S) ->
    S#state{num_peers = S#state.num_peers - 1,
            opt_unchoke_chain = lists:delete(Pid, S#state.opt_unchoke_chain)}.
%% gen_server terminate callback: log the termination reason.
%% Always returns ok.
terminate(Reason, _S) ->
    error_logger:info_report([peer_group_mgr_term, Reason]),
    ok.
%% gen_server code_change callback: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
152 %%--------------------------------------------------------------------
153 %%% Internal functions
154 %%--------------------------------------------------------------------
%% Accept an incoming peer if there is room for it.
%% Returns a gen_server reply tuple:
%%   {reply, already_enough_connections, S} when the peer limit is reached,
%%   {reply, {ok, Pid}, NewState} when a peer process was started; the
%%   process is monitored and inserted into the optimistic unchoke chain.
start_new_incoming_peer(IP, Port, InfoHash, S) ->
    Connected = S#state.num_peers,
    case max_peer_processes() - Connected of
        Free when is_integer(Free), Free > 0 ->
            {atomic, [T]} = etorrent_tracking_map:select({infohash, InfoHash}),
            Supervisor = T#tracking_map.supervisor_pid,
            TorrentId = T#tracking_map.id,
            {ok, Pid} = etorrent_t_sup:add_peer(Supervisor,
                                                S#state.our_peer_id,
                                                InfoHash,
                                                TorrentId,
                                                {IP, Port}),
            erlang:monitor(process, Pid),
            Chain = insert_new_peer_into_chain(Pid, S#state.opt_unchoke_chain),
            rechoke(S),
            NewState = S#state{num_peers = Connected + 1,
                               opt_unchoke_chain = Chain},
            {reply, {ok, Pid}, NewState};
        Free when Free =< 0 ->
            {reply, already_enough_connections, S}
    end.
%% Merge IPList into the candidate list (sorted, duplicates removed) and
%% then try to connect to as many candidates as the peer limit allows.
%% Returns {ok, NewState}.
start_new_peers(IPList, State) ->
    Known = State#state.available_peers,
    Merged = lists:usort(IPList ++ Known),
    Updated = State#state{available_peers = Merged},
    %% Replenish the connected peers.
    Missing = max_peer_processes() - Updated#state.num_peers,
    fill_peers(Missing, Updated).
%%% NOTE: fill_peers/2 and spawn_new_peer/5 tail call each other.
%%
%% Try to start up to N new peers from the candidate list in S.
%% Candidates flagged by the bad-peer manager are dropped without using up
%% a slot. Returns {ok, NewState}.
fill_peers(N, S) when N =< 0 ->
    %% Nothing (more) to fill. Also covers a negative N, which happens when
    %% more peers are connected than the current limit allows; without this
    %% the recursion would only stop once every candidate was consumed.
    {ok, S};
fill_peers(N, S) ->
    case S#state.available_peers of
        [] ->
            %% No peers available, just stop trying to fill peers
            {ok, S};
        [{TorrentId, {IP, Port}} | R] ->
            %% Possible peer. Check it.
            case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, TorrentId) of
                true ->
                    fill_peers(N, S#state{available_peers = R});
                false ->
                    spawn_new_peer(IP, Port, TorrentId, N,
                                   S#state{available_peers = R})
            end
    end.
%%--------------------------------------------------------------------
%% Function: spawn_new_peer(IP, Port, TorrentId, N, S) -> {ok, State}
%% Description: Attempt to spawn the peer at IP/Port for TorrentId. N is
%%   the number of peers we still need to spawn and S is the current
%%   state. A peer that is already connected is skipped without using up
%%   a slot. Continues with fill_peers/2, which returns the new state to
%%   be put into the process.
%%--------------------------------------------------------------------
spawn_new_peer(IP, Port, TorrentId, N, S) ->
    case etorrent_peer:connected(IP, Port, TorrentId) of
        false ->
            {atomic, [TM]} = etorrent_tracking_map:select(TorrentId),
            Supervisor = TM#tracking_map.supervisor_pid,
            InfoHash = TM#tracking_map.info_hash,
            {ok, Pid} = etorrent_t_sup:add_peer(Supervisor,
                                                S#state.our_peer_id,
                                                InfoHash,
                                                TorrentId,
                                                {IP, Port}),
            erlang:monitor(process, Pid),
            etorrent_t_peer_recv:connect(Pid, IP, Port),
            Chain = insert_new_peer_into_chain(Pid, S#state.opt_unchoke_chain),
            rechoke(S),
            Grown = S#state{num_peers = S#state.num_peers + 1,
                            opt_unchoke_chain = Chain},
            fill_peers(N - 1, Grown);
        true ->
            fill_peers(N, S)
    end.
%%--------------------------------------------------------------------
%% Function: rechoke(State) -> result of rechoke_choke/3
%% Description: Recalculate the choke/unchoke state of peers. Builds a
%%   snapshot of every peer in the optimistic unchoke chain, selects the
%%   preferred set, unchokes it, and lets rechoke_choke/3 decide about the
%%   remaining peers.
%%--------------------------------------------------------------------
rechoke(S) ->
    Chain = S#state.opt_unchoke_chain,
    Candidates = build_rechoke_info(Chain),
    {Downloaders, Seeders} = split_preferred(Candidates),
    Preferred = prune_preferred_peers(Downloaders, Seeders),
    {Unchoked, Remaining} = rechoke_unchoke(Candidates, Preferred, 0, []),
    rechoke_choke(Remaining, Unchoked, optimistics(Preferred)).
%% Build the list of #rechoke_info{} snapshots for the given peer pids,
%% using the set of currently seeding torrents to classify each peer.
build_rechoke_info(Peers) ->
    Seeding = seeding_torrents(),
    build_rechoke_info(sets:from_list(Seeding), Peers).
%% build_rechoke_info(Seeding, Pids) -> [#rechoke_info{}]
%%
%% Seeding is the set of torrent ids treated as seeding; Pids is the list
%% of peer processes to describe. Peers unknown to etorrent_peer, and
%% peers for which the rate manager has no rate (`none'), are left out.
%%
%% For a seeding torrent the send rate is recorded as-is; for a leeching
%% torrent the receive rate is negated so that an ascending sort puts the
%% fastest peers first. The remote interest/choke state is recorded for
%% both kinds, since split_preferred_peers/3 and rechoke_choke/3 compare
%% r_interest_state regardless of the torrent state.
build_rechoke_info(_Seeding, []) ->
    [];
build_rechoke_info(Seeding, [Pid | Next]) ->
    case etorrent_peer:select(Pid) of
        [] ->
            build_rechoke_info(Seeding, Next);
        [Peer] ->
            TorrentId = Peer#peer.torrent_id,
            Kind = Peer#peer.state,
            Snubbed = etorrent_rate_mgr:snubbed(TorrentId, Pid),
            PeerState = etorrent_rate_mgr:select_state(TorrentId, Pid),
            {State, Fetched} =
                case sets:is_element(TorrentId, Seeding) of
                    true ->
                        {seeding,
                         etorrent_rate_mgr:fetch_send_rate(TorrentId, Pid)};
                    false ->
                        {leeching,
                         etorrent_rate_mgr:fetch_recv_rate(TorrentId, Pid)}
                end,
            case Fetched of
                none ->
                    build_rechoke_info(Seeding, Next);
                Rate ->
                    SortRate = case State of
                                   seeding -> Rate;
                                   leeching -> -Rate % Inverted for later sorting!
                               end,
                    Info = #rechoke_info{
                              pid = Pid,
                              kind = Kind,
                              state = State,
                              rate = SortRate,
                              r_interest_state =
                                  PeerState#peer_state.interest_state,
                              r_choke_state =
                                  PeerState#peer_state.choke_state,
                              snubbed = Snubbed},
                    [Info | build_rechoke_info(Seeding, Next)]
            end
    end.
%% Rotate the optimistic unchoke chain and unchoke the peer that ends up
%% at its head. Returns {ok, NewState}; the state is unchanged when there
%% are no peers yet.
advance_optimistic_unchoke(S) ->
    case move_cyclic_chain(S#state.opt_unchoke_chain) of
        [] ->
            %% No peers yet
            {ok, S};
        [Head | _] = Rotated ->
            etorrent_t_peer_recv:unchoke(Head),
            {ok, S#state{optimistic_unchoke_pid = Head,
                         opt_unchoke_chain = Rotated}}
    end.
%%TODO: Fix cyclic chain move!
%%
%% Rotate the chain: the leading run of pids for which local_unchoked/2
%% returns true is moved to the back. The lookup currently uses the
%% placeholder torrent id `todo_move_cyclic_chain_all'.
move_cyclic_chain([]) ->
    [];
move_cyclic_chain(Chain) ->
    IsUnchoked = fun(Pid) ->
                         local_unchoked(Pid, todo_move_cyclic_chain_all)
                 end,
    {Leading, Remaining} = lists:splitwith(IsUnchoked, Chain),
    %% Advance chain
    lists:append(Remaining, Leading).
%% Look up {TorrentId, Pid} in the etorrent_peer_state table.
%% Returns true when there is no entry, or when the entry's local_choke
%% field is exactly `true'; false otherwise.
%% NOTE(review): the name says "unchoked" but the test is for
%% local_choke =:= true -- confirm the intended polarity.
local_unchoked(Pid, TorrentId) ->
    Key = {TorrentId, Pid},
    case ets:lookup(etorrent_peer_state, Key) of
        [Entry] ->
            Entry#peer_state.local_choke =:= true;
        [] ->
            true
    end.
%% Insert Pid at a uniformly random position of Chain and return the new
%% chain. Every position from the head (0) up to and including the tail
%% (length(Chain)) is possible, so an empty chain simply becomes [Pid].
insert_new_peer_into_chain(Pid, Chain) ->
    %% rand:uniform(K) yields 1..K; shift to a 0-based split index.
    Index = rand:uniform(length(Chain) + 1) - 1,
    {Front, Back} = lists:split(Index, Chain),
    Front ++ [Pid | Back].
%% Maximum number of simultaneously connected peers: the integer value of
%% the `max_peers' application environment variable, or
%% ?MAX_PEER_PROCESSES when it is not set.
max_peer_processes() ->
    case application:get_env(etorrent, max_peers) of
        {ok, N} when is_integer(N) ->
            N;
        undefined ->
            ?MAX_PEER_PROCESSES
    end.
%% Number of upload slots to hand out.
%% Reads the `max_upload_slots' application environment variable:
%%   - an integer is used as-is,
%%   - `auto' derives the slot count from `max_upload_rate'
%%     (see slots_for_rate/1).
%% Crashes if the variable is unset or has any other value, or if
%% `max_upload_rate' is unset in `auto' mode.
upload_slots() ->
    case application:get_env(etorrent, max_upload_slots) of
        {ok, auto} ->
            {ok, Rate} = application:get_env(etorrent, max_upload_rate),
            slots_for_rate(Rate);
        {ok, N} when is_integer(N) ->
            N
    end.

%% Map a configured upload rate to a slot count.
slots_for_rate(Rate) when Rate =< 0 -> 7; %% Educated guess
slots_for_rate(Rate) when Rate < 9 -> 2;
slots_for_rate(Rate) when Rate < 15 -> 3;
slots_for_rate(Rate) when Rate < 42 -> 4;
slots_for_rate(Rate) -> round(math:sqrt(Rate * 0.6)).
%% Partition the peers with split_preferred_peers/3 and return both
%% lists sorted ascending by their rate field.
split_preferred(Peers) ->
    ByRate = fun(Infos) -> lists:keysort(#rechoke_info.rate, Infos) end,
    {Downs, Leechs} = split_preferred_peers(Peers, [], []),
    {ByRate(Downs), ByRate(Leechs)}.
%% Pick the preferred peers from the two rate-sorted candidate lists.
%% The upload slots are split roughly 70/30 between SDowns and SLeechs
%% (at least one each). Slots that one list cannot fill are handed to the
%% other. Returns the chosen peers as a set.
prune_preferred_peers(SDowns, SLeechs) ->
    MaxUploads = upload_slots(),
    DownQuota = erlang:max(1, round(MaxUploads * 0.7)),
    SeedQuota = erlang:max(1, round(MaxUploads * 0.3)),
    DownCount = length(SDowns),
    %% Slots the download side cannot use move to the seed side
    %% (a shortfall of 0 leaves both quotas untouched).
    DownShortfall = erlang:max(0, DownQuota - DownCount),
    SeedQuota2 = SeedQuota + DownShortfall,
    DownQuota2 = DownQuota - DownShortfall,
    %% Slots the seed side cannot use move back, capped by the number of
    %% download candidates.
    {SeedQuota3, DownQuota3} =
        case erlang:max(0, SeedQuota2 - length(SLeechs)) of
            0 ->
                {SeedQuota2, DownQuota2};
            Spare ->
                {SeedQuota2 - Spare,
                 erlang:min(DownQuota2 + Spare, DownCount)}
        end,
    ChosenDowns = lists:sublist(SDowns, DownQuota3),
    ChosenLeechs = lists:sublist(SLeechs, SeedQuota3),
    sets:union(sets:from_list(ChosenDowns), sets:from_list(ChosenLeechs)).
%% rechoke_unchoke(Peers, PreferredSet, Count, ToChoke) -> {Count, ToChoke}
%%
%% Unchoke every peer that is a member of PreferredSet. Returns the number
%% of peers unchoked here (added to the incoming Count) together with the
%% peers that were not preferred, for rechoke_choke/3 to decide on.
%% Only peers that are actually unchoked are counted; counting the
%% non-preferred ones as well would make Count equal the total number of
%% peers.
rechoke_unchoke([], _PS, Count, ToChoke) ->
    {Count, ToChoke};
rechoke_unchoke([P | Next], PSet, Count, ToChoke) ->
    case sets:is_element(P, PSet) of
        true ->
            etorrent_t_peer_recv:unchoke(P#rechoke_info.pid),
            rechoke_unchoke(Next, PSet, Count + 1, ToChoke);
        false ->
            rechoke_unchoke(Next, PSet, Count, [P | ToChoke])
    end.
%% Threshold passed to rechoke_choke/3: the larger of the `min_uploads'
%% application environment variable (default 1) and the number of upload
%% slots not taken by the preferred set.
optimistics(PSet) ->
    MinUp = case application:get_env(etorrent, min_uploads) of
                undefined -> 1;
                {ok, N} -> N
            end,
    FreeSlots = upload_slots() - sets:size(PSet),
    erlang:max(MinUp, FreeSlots).
%% rechoke_choke(Peers, Count, Optimistics) -> ok
%%
%% Decide about the peers that were not in the preferred set.
%%   - Once Count has reached Optimistics every remaining peer is choked.
%%   - Below that, a peer whose kind is `seeding' is choked; any other
%%     peer is unchoked, and Count grows only if that peer is interested.
rechoke_choke([], _Count, _Optimistics) ->
    ok;
rechoke_choke([P | Next], Count, Optimistics) when Count >= Optimistics ->
    etorrent_t_peer_recv:choke(P#rechoke_info.pid),
    rechoke_choke(Next, Count, Optimistics);
rechoke_choke([P | Next], Count, Optimistics) ->
    case P#rechoke_info.kind =:= seeding of
        true ->
            etorrent_t_peer_recv:choke(P#rechoke_info.pid),
            rechoke_choke(Next, Count, Optimistics);
        false ->
            etorrent_t_peer_recv:unchoke(P#rechoke_info.pid),
            case P#rechoke_info.r_interest_state =:= interested of
                true ->
                    rechoke_choke(Next, Count + 1, Optimistics);
                false ->
                    rechoke_choke(Next, Count, Optimistics)
            end
    end.
%% split_preferred_peers(Peers, Downs, Leechs) -> {Downs, Leechs}
%%
%% Partition candidate peers into two accumulators:
%%   Leechs - peers on torrents we are seeding,
%%   Downs  - peers on torrents we are leeching that have not snubbed us.
%% Peers that are seeders themselves (kind =:= seeding) or that are not
%% interested are never preferred. The exclusion must test `kind': testing
%% `state' here would drop every peer on a seeding torrent and make the
%% Leechs branch below unreachable.
split_preferred_peers([], Downs, Leechs) ->
    {Downs, Leechs};
split_preferred_peers([P | Next], Downs, Leechs) ->
    case P#rechoke_info.kind =:= seeding
        orelse P#rechoke_info.r_interest_state =:= not_interested of
        true ->
            split_preferred_peers(Next, Downs, Leechs);
        false when P#rechoke_info.state =:= seeding ->
            split_preferred_peers(Next, Downs, [P | Leechs]);
        false when P#rechoke_info.snubbed =:= true ->
            split_preferred_peers(Next, Downs, Leechs);
        false ->
            split_preferred_peers(Next, [P | Downs], Leechs)
    end.
%% Ids of all torrents whose state is `seeding'.
%% build_rechoke_info/1 turns this list into a set and tests peers'
%% torrent ids for membership, so ids (not whole #torrent{} records) must
%% be returned.
%% NOTE(review): assumes the #torrent{} record from
%% etorrent_mnesia_table.hrl names its key field `id' -- confirm.
seeding_torrents() ->
    {atomic, Torrents} = etorrent_torrent:all(),
    [T#torrent.id || T <- Torrents,
                     T#torrent.state =:= seeding].