Create a peer manager, and a counter manager.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_choker.erl
blob64eb01a77eb9b383c5fa2fa310011c7ff1d8b5a8
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_choker.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Master process for a number of peers.
6 %%%
7 %%% Created : 18 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_choker).
12 -behaviour(gen_server).
14 -include("rate_mgr.hrl").
15 -include("peer_state.hrl").
17 -include("etorrent_mnesia_table.hrl").
19 %% API
20 -export([start_link/1, perform_rechoke/0, monitor/1]).
22 %% gen_server callbacks
23 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
24 terminate/2, code_change/3]).
%% Server state of the choker process.
-record(state, {
          %% Our own peer id, as handed to start_link/1.
          our_peer_id = none,
          %% Not written by any code in this module.
          info_hash = none,
          %% Reference returned by timer:send_interval/3 for round_tick.
          timer_ref = none,
          %% Round counter; the optimistic unchoke advances when it is 0.
          round = 0,
          %% Pid most recently picked by advance_optimistic_unchoke/1.
          optimistic_unchoke_pid = none,
          %% Pids of the monitored peers, in optimistic-unchoke order.
          opt_unchoke_chain = []
         }).
%% Per-peer snapshot used during one rechoke pass.
%%   pid              - pid of the peer process
%%   kind             - copied from the peer's #peer.state
%%   state            - whether the peer's torrent is seeding or leeching locally
%%   snubbed          - snub flag as reported by etorrent_rate_mgr
%%   r_interest_state - whether the remote peer is interested in us
%%   r_choke_state    - whether the remote peer is choking us
%%   l_choke          - local choke flag from the peer state
%%   rate             - sort key; the receive rate is stored negated for
%%                      leeching torrents (see build_rechoke_info/2)
%% Note: boolean() replaces the obsolete bool() alias, which current
%% compilers reject as an undefined type.
-record(rechoke_info, {pid :: pid(),
                       kind :: 'seeding' | 'leeching',
                       state :: 'seeding' | 'leeching',
                       snubbed :: boolean(),
                       r_interest_state :: 'interested' | 'not_interested',
                       r_choke_state :: 'choked' | 'unchoked',
                       l_choke :: boolean(),
                       rate :: float()}).
%% Locally registered name of the choker process.
-define(SERVER, ?MODULE).
%% Interval, in milliseconds, between two round_tick messages.
-define(ROUND_TIME, 10000).
48 %%====================================================================
49 %% API
50 %%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(OurPeerId) -> {ok, Pid} | ignore | {error, Error}
%% Description: Start the choker and register it locally as ?SERVER.
%%--------------------------------------------------------------------
start_link(OurPeerId) ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [OurPeerId], []).
%%--------------------------------------------------------------------
%% Function: perform_rechoke() -> ok
%% Description: Asynchronously ask the choker to run a rechoke pass.
%%--------------------------------------------------------------------
perform_rechoke() ->
    Request = rechoke,
    gen_server:cast(?SERVER, Request).
%%--------------------------------------------------------------------
%% Function: monitor(Pid) -> ok
%% Description: Synchronously register the peer process Pid with the
%%   choker, which monitors it and adds it to the unchoke chain.
%%--------------------------------------------------------------------
monitor(Pid) ->
    Request = {monitor, Pid},
    gen_server:call(?SERVER, Request).
60 %%====================================================================
61 %% gen_server callbacks
62 %%====================================================================
%%--------------------------------------------------------------------
%% Function: init([OurPeerId]) -> {ok, State}
%% Description: Trap exits, start the periodic round_tick timer and
%%   build the initial state.
%%--------------------------------------------------------------------
init([OurPeerId]) ->
    process_flag(trap_exit, true),
    %% send_interval/2 delivers the message to the calling process.
    {ok, TimerRef} = timer:send_interval(?ROUND_TIME, round_tick),
    InitialState = #state{timer_ref = TimerRef,
                          our_peer_id = OurPeerId},
    {ok, InitialState}.
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, ok, State}
%% Description: {monitor, Pid} starts monitoring a peer, inserts it
%%   into the optimistic unchoke chain and schedules a rechoke. Any
%%   other request is logged and answered with ok.
%%--------------------------------------------------------------------
handle_call({monitor, Pid}, _From, S) ->
    _MonitorRef = erlang:monitor(process, Pid),
    Chain = insert_new_peer_into_chain(Pid, S#state.opt_unchoke_chain),
    %% The rechoke is a cast to ourselves, so it runs after this reply.
    perform_rechoke(),
    {reply, ok, S#state{opt_unchoke_chain = Chain}};
handle_call(Request, _From, State) ->
    error_logger:error_report([unknown_peer_group_call, Request]),
    {reply, ok, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State}
%% Description: The rechoke message runs a rechoke pass; every other
%%   cast is silently ignored. The state is never modified.
%%--------------------------------------------------------------------
handle_cast(rechoke, State) ->
    rechoke(State),
    {noreply, State};
handle_cast(_Other, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State}
%% Description: Handles the periodic round_tick message and the 'DOWN'
%%   messages of monitored peers. Anything else is logged and ignored.
%%--------------------------------------------------------------------
handle_info(round_tick, S) ->
    case S#state.round of
        0 ->
            %% Every third tick the optimistic unchoke slot moves on.
            {ok, NS} = advance_optimistic_unchoke(S),
            rechoke(NS),
            {noreply, NS#state{round = 2}};
        N when is_integer(N) ->
            rechoke(S),
            {noreply, S#state{round = N - 1}}
    end;
handle_info({'DOWN', _Ref, process, Pid, Reason}, S)
  when Reason =:= normal; Reason =:= shutdown ->
    %% The peer shut down normally. Hence we just remove him and start up
    %% other peers. Eventually the tracker will re-add him to the peer list.
    %% XXX: We might have to do something else
    rechoke(S),
    NewChain = lists:delete(Pid, S#state.opt_unchoke_chain),
    {noreply, S#state{opt_unchoke_chain = NewChain}};
handle_info({'DOWN', _Ref, process, Pid, _Reason}, S) ->
    %% The peer shut down unexpectedly. rechoke/1 returns ok rather than a
    %% state record, so its result must not be used as the new state.
    case etorrent_peer:select(Pid) of
        [_Peer] ->
            %% XXX: We might have to check that remote is interested and
            %% we were choking
            rechoke(S);
        [] ->
            ok
    end,
    NewChain = lists:delete(Pid, S#state.opt_unchoke_chain),
    {noreply, S#state{opt_unchoke_chain = NewChain}};
handle_info(Info, State) ->
    error_logger:error_report([unknown_info_peer_group, Info]),
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: Log the termination reason. The clause previously ended
%%   in a dangling comma; the closing ok is restored.
%%--------------------------------------------------------------------
terminate(Reason, _S) ->
    error_logger:info_report([peer_group_mgr_term, Reason]),
    ok.
%%--------------------------------------------------------------------
%% Function: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: No state conversion is needed on code upgrade.
%%--------------------------------------------------------------------
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
127 %%--------------------------------------------------------------------
128 %%% Internal functions
129 %%--------------------------------------------------------------------
131 %%--------------------------------------------------------------------
132 %% Function: rechoke(State) -> ok
133 %% Description: Recalculate the choke/unchoke state of peers
134 %%--------------------------------------------------------------------
%% Runs one full rechoke pass over the peers in the unchoke chain:
%% gather per-peer info, pick the preferred set, unchoke it, then let
%% rechoke_choke/3 deal with the remaining peers. Returns whatever
%% rechoke_choke/3 returns.
rechoke(S) ->
    Infos = build_rechoke_info(S#state.opt_unchoke_chain),
    {SortedDowns, SortedSeeds} = split_preferred(Infos),
    Preferred = prune_preferred_peers(SortedDowns, SortedSeeds),
    Remaining = rechoke_unchoke(Infos, Preferred),
    OptimisticSlots = optimistics(Preferred),
    rechoke_choke(Remaining, 0, OptimisticSlots).
%% Builds the #rechoke_info{} list for the given peer pids, using the
%% set of ids of torrents that are currently seeding.
build_rechoke_info(Peers) ->
    build_rechoke_info(sets:from_list(seeding_torrents()), Peers).
%% build_rechoke_info(Seeding, Pids) -> [#rechoke_info{}]
%%   Seeding - set of ids of the torrents we are seeding
%%   Pids    - peer pids, in optimistic-unchoke chain order
%% Peers without a peer-table entry, or for which the rate manager
%% reports no rate (none), are left out of the result. The relative
%% order of the remaining peers is preserved.
build_rechoke_info(_Seeding, []) ->
    [];
build_rechoke_info(Seeding, [Pid | Next]) ->
    case etorrent_peer:select(Pid) of
        [] ->
            build_rechoke_info(Seeding, Next);
        [Peer] ->
            TorrentId = Peer#peer.torrent_id,
            Kind = Peer#peer.state,
            Snubbed = etorrent_rate_mgr:snubbed(TorrentId, Pid),
            PeerState = etorrent_rate_mgr:select_state(TorrentId, Pid),
            %% Seeding torrents rank peers by our send rate to them,
            %% leeching torrents by the rate we receive from them.
            StateAndRate =
                case sets:is_element(TorrentId, Seeding) of
                    true ->
                        case etorrent_rate_mgr:fetch_send_rate(TorrentId, Pid) of
                            none -> none;
                            SendRate -> {seeding, SendRate}
                        end;
                    false ->
                        case etorrent_rate_mgr:fetch_recv_rate(TorrentId, Pid) of
                            none -> none;
                            %% Inverted for later sorting!
                            RecvRate -> {leeching, -RecvRate}
                        end
                end,
            case StateAndRate of
                none ->
                    build_rechoke_info(Seeding, Next);
                {State, Rate} ->
                    [#rechoke_info{pid = Pid,
                                   kind = Kind,
                                   state = State,
                                   rate = Rate,
                                   r_interest_state =
                                       PeerState#peer_state.interest_state,
                                   r_choke_state =
                                       PeerState#peer_state.choke_state,
                                   l_choke =
                                       PeerState#peer_state.local_choke,
                                   snubbed = Snubbed}
                     | build_rechoke_info(Seeding, Next)]
            end
    end.
%% Rotates the optimistic unchoke chain and unchokes the peer that ends
%% up at its head. Returns {ok, State}; the state is unchanged while
%% there are no peers.
advance_optimistic_unchoke(S) ->
    case move_cyclic_chain(S#state.opt_unchoke_chain) of
        [] ->
            %% No peers yet
            {ok, S};
        [Head | _] = Rotated ->
            etorrent_t_peer_recv:unchoke(Head),
            {ok, S#state{optimistic_unchoke_pid = Head,
                         opt_unchoke_chain = Rotated}}
    end.
%% move_cyclic_chain(Chain) -> Chain'
%% Rotates the chain so that it starts at the first peer that is both
%% interested and choked. Peers in front of it (unknown peers, and
%% peers that are not interested or not choked) are moved to the back.
%% If no peer qualifies the chain is returned unchanged.
move_cyclic_chain([]) ->
    [];
move_cyclic_chain(Chain) ->
    %% true for every peer that should be skipped over
    Skip = fun(Pid) ->
                   case etorrent_peer:select(Pid) of
                       [] ->
                           true;
                       [P] ->
                           T = etorrent_rate_mgr:select_state(
                                 P#peer.torrent_id,
                                 Pid),
                           not (T#peer_state.interest_state =:= interested
                                andalso T#peer_state.choke_state =:= choked)
                   end
           end,
    {Front, Back} = lists:splitwith(Skip, Chain),
    %% Advance chain
    Back ++ Front.
%% insert_new_peer_into_chain(Pid, Chain) -> Chain'
%% Inserts Pid at a uniformly random position of the chain; every one
%% of the length(Chain) + 1 possible positions is equally likely.
%% rand:uniform/1 replaces crypto:rand_uniform/2, which is deprecated
%% (and gone from recent OTP releases) and is not defined for the empty
%% range produced by the first peer inserted into an empty chain.
insert_new_peer_into_chain(Pid, Chain) ->
    %% rand:uniform(N) yields 1..N, so Index is in 0..length(Chain).
    Index = rand:uniform(length(Chain) + 1) - 1,
    {Front, Back} = lists:split(Index, Chain),
    Front ++ [Pid | Back].
%% upload_slots() -> integer()
%% Number of upload slots to use. Reads the etorrent application
%% environment: an integer max_upload_slots is used as is, while the
%% atom auto derives the count from max_upload_rate.
%% Crashes if the configuration is missing or of another shape.
upload_slots() ->
    case application:get_env(etorrent, max_upload_slots) of
        {ok, auto} ->
            {ok, Rate} = application:get_env(etorrent, max_upload_rate),
            upload_slots_for_rate(Rate);
        {ok, N} when is_integer(N) ->
            N
    end.

%% Maps the configured max upload rate to a slot count.
upload_slots_for_rate(N) when N =< 0 -> 7; %% Educated guess
upload_slots_for_rate(N) when N < 9 -> 2;
upload_slots_for_rate(N) when N < 15 -> 3;
upload_slots_for_rate(N) when N < 42 -> 4;
upload_slots_for_rate(N) -> round(math:sqrt(N * 0.6)).
%% Splits the peers into the two candidate lists produced by
%% split_preferred_peers/3 and sorts each one ascending on the rate
%% field. Returns {SortedDowns, SortedLeechs}.
split_preferred(Peers) ->
    {Downs, Leechs} = split_preferred_peers(Peers, [], []),
    ByRate = fun(Candidates) ->
                     lists:keysort(#rechoke_info.rate, Candidates)
             end,
    {ByRate(Downs), ByRate(Leechs)}.
%% prune_preferred_peers(SDowns, SLeechs) -> set of #rechoke_info{}
%% Divides the upload slots roughly 70/30 between the two sorted
%% candidate lists (at least one slot each). Slots that one list cannot
%% fill are handed to the other. Returns the chosen peers as a set.
prune_preferred_peers(SDowns, SLeechs) ->
    Slots = upload_slots(),
    DownSlots0 = erlang:max(1, round(Slots * 0.7)),
    SeedSlots0 = erlang:max(1, round(Slots * 0.3)),
    DownCount = length(SDowns),
    %% Hand unused download slots over to the seed side.
    {SeedSlots1, DownSlots1} =
        case DownSlots0 - DownCount of
            Spare when Spare > 0 ->
                {SeedSlots0 + Spare, DownSlots0 - Spare};
            _ ->
                {SeedSlots0, DownSlots0}
        end,
    %% Hand unused seed slots back, capped by the number of candidates.
    {SeedSlots2, DownSlots2} =
        case SeedSlots1 - length(SLeechs) of
            Unused when Unused > 0 ->
                {SeedSlots1 - Unused,
                 erlang:min(DownSlots1 + Unused, DownCount)};
            _ ->
                {SeedSlots1, DownSlots1}
        end,
    ChosenDowns = sets:from_list(lists:sublist(SDowns, DownSlots2)),
    ChosenSeeds = sets:from_list(lists:sublist(SLeechs, SeedSlots2)),
    sets:union(ChosenDowns, ChosenSeeds).
%% rechoke_unchoke(Peers, PreferredSet) -> [#rechoke_info{}]
%% Unchokes every peer that is a member of PreferredSet and returns the
%% peers that are not, in their original order.
rechoke_unchoke([], _PS) ->
    [];
rechoke_unchoke([P | Next], PSet) ->
    case sets:is_element(P, PSet) of
        true ->
            etorrent_t_peer_recv:unchoke(P#rechoke_info.pid),
            rechoke_unchoke(Next, PSet);
        false ->
            [P | rechoke_unchoke(Next, PSet)]
    end.
%% Number of optimistic unchoke slots: the upload slots not taken by the
%% preferred set, but never fewer than the configured min_uploads
%% (1 when that key is not set).
optimistics(PSet) ->
    Floor = case application:get_env(etorrent, min_uploads) of
                undefined -> 1;
                {ok, Configured} -> Configured
            end,
    Remaining = upload_slots() - sets:size(PSet),
    erlang:max(Floor, Remaining).
%% rechoke_choke(Peers, Count, Optimistics) -> ok
%%   Peers       - peers that were not in the preferred set
%%   Count       - interested peers unchoked by this function so far
%%   Optimistics - number of optimistic slots available
%% Once Count reaches Optimistics, every remaining peer is choked.
%% Until then, peers whose kind is seeding are choked and all others are
%% unchoked; only interested peers consume a slot.
rechoke_choke([], _Count, _Optimistics) ->
    ok;
rechoke_choke([P | Next], Count, Optimistics) when Count >= Optimistics ->
    etorrent_t_peer_recv:choke(P#rechoke_info.pid),
    rechoke_choke(Next, Count, Optimistics);
rechoke_choke([P | Next], Count, Optimistics) ->
    case P#rechoke_info.kind =:= seeding of
        true ->
            etorrent_t_peer_recv:choke(P#rechoke_info.pid),
            rechoke_choke(Next, Count, Optimistics);
        false ->
            etorrent_t_peer_recv:unchoke(P#rechoke_info.pid),
            case P#rechoke_info.r_interest_state =:= interested of
                true ->
                    rechoke_choke(Next, Count + 1, Optimistics);
                false ->
                    rechoke_choke(Next, Count, Optimistics)
            end
    end.
%% split_preferred_peers(Peers, Downs, Leechs) -> {Downs, Leechs}
%% Sorts peers into two accumulators (both built in reverse order):
%%   - peers whose kind is seeding, or that are not interested, are dropped
%%   - peers on a torrent we are seeding go to Leechs
%%   - snubbed peers are dropped
%%   - everything else goes to Downs
split_preferred_peers([], Downs, Leechs) ->
    {Downs, Leechs};
split_preferred_peers([Peer | Rest], Downs, Leechs) ->
    Uninteresting =
        Peer#rechoke_info.kind =:= seeding
        orelse Peer#rechoke_info.r_interest_state =:= not_interested,
    if
        Uninteresting ->
            split_preferred_peers(Rest, Downs, Leechs);
        Peer#rechoke_info.state =:= seeding ->
            split_preferred_peers(Rest, Downs, [Peer | Leechs]);
        Peer#rechoke_info.snubbed =:= true ->
            split_preferred_peers(Rest, Downs, Leechs);
        true ->
            split_preferred_peers(Rest, [Peer | Downs], Leechs)
    end.
%% Returns the ids of all torrents whose state is seeding.
seeding_torrents() ->
    {atomic, AllTorrents} = etorrent_torrent:all(),
    %% The generator pattern silently skips entries that do not match.
    [Id || #torrent{id = Id, state = seeding} <- AllTorrents].