Merge branch 'jlouis/types'
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_t_peer_group_mgr.erl
blobee021c44d04c7a4b778b99d18a70871a36c28937
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_t_peer_group.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Master process for a number of peers.
6 %%%
7 %%% Created : 18 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
11 -module(etorrent_t_peer_group_mgr).
13 -behaviour(gen_server).
15 -include("rate_mgr.hrl").
16 -include("peer_state.hrl").
18 -include("etorrent_mnesia_table.hrl").
20 %% API
21 -export([start_link/5, add_peers/2, broadcast_have/2, new_incoming_peer/4,
22 broadcast_got_chunk/2, perform_rechoke/1,
23 broadcast_queue_pieces/1]).
25 %% gen_server callbacks
26 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
27 terminate/2, code_change/3]).
29 -record(state, {available_peers = [],
30 bad_peers = none,
31 our_peer_id = none,
32 info_hash = none,
34 num_peers = 0,
35 timer_ref = none,
36 round = 0,
38 optimistic_unchoke_pid = none,
39 opt_unchoke_chain = [],
41 file_system_pid = none,
42 peer_group_sup = none,
43 torrent_id = none}).
45 -define(MAX_PEER_PROCESSES, 40).
46 -define(ROUND_TIME, 10000).
47 -define(DEFAULT_NUM_DOWNLOADERS, 4).
49 %%====================================================================
50 %% API
51 %%====================================================================
52 start_link(OurPeerId, PeerGroup, InfoHash,
53 FileSystemPid, TorrentHandle) ->
54 gen_server:start_link(?MODULE, [OurPeerId, PeerGroup, InfoHash,
55 FileSystemPid, TorrentHandle], []).
57 add_peers(Pid, IPList) ->
58 gen_server:cast(Pid, {add_peers, IPList}).
60 broadcast_have(Pid, Index) ->
61 gen_server:cast(Pid, {broadcast_have, Index}).
63 broadcast_got_chunk(Pid, Chunk) ->
64 gen_server:cast(Pid, {broadcast_got_chunk, Chunk}).
66 broadcast_queue_pieces(Pid) ->
67 gen_server:cast(Pid, broadcast_queue_pieces).
69 perform_rechoke(Pid) ->
70 gen_server:cast(Pid, rechoke).
72 new_incoming_peer(Pid, IP, Port, PeerId) ->
73 %% Set a pretty graceful timeout here as the peer_group can be pretty heavily
74 %% loaded at times. We have 5 acceptors by default anyway.
75 gen_server:call(Pid, {new_incoming_peer, IP, Port, PeerId}, 15000).
77 %%====================================================================
78 %% gen_server callbacks
79 %%====================================================================
81 init([OurPeerId, PeerGroup, InfoHash,
82 FileSystemPid, TorrentId]) when is_integer(TorrentId) ->
83 process_flag(trap_exit, true),
84 {ok, Tref} = timer:send_interval(?ROUND_TIME, self(), round_tick),
85 {ok, #state{ our_peer_id = OurPeerId,
86 peer_group_sup = PeerGroup,
87 bad_peers = dict:new(),
88 info_hash = InfoHash,
89 timer_ref = Tref,
90 torrent_id = TorrentId,
91 file_system_pid = FileSystemPid}}.
93 handle_call({new_incoming_peer, _IP, _Port, PeerId}, _From, S)
94 when S#state.our_peer_id =:= PeerId ->
95 {reply, connect_to_ourselves, S};
96 handle_call({new_incoming_peer, IP, Port, _PeerId}, _From, S) ->
97 case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, S#state.torrent_id) of
98 true ->
99 {reply, bad_peer, S};
100 false ->
101 start_new_incoming_peer(IP, Port, S)
102 end;
103 handle_call(Request, _From, State) ->
104 error_logger:error_report([unknown_peer_group_call, Request]),
105 Reply = ok,
106 {reply, Reply, State}.
108 handle_cast({add_peers, IPList}, S) ->
109 {ok, NS} = start_new_peers(IPList, S),
110 {noreply, NS};
111 handle_cast({broadcast_have, Index}, S) ->
112 broadcast_have_message(Index, S),
113 {noreply, S};
114 handle_cast({broadcast_got_chunk, Chunk}, S) ->
115 bcast_got_chunk(Chunk, S),
116 {noreply, S};
117 handle_cast(rechoke, S) ->
118 rechoke(S),
119 {noreply, S};
120 handle_cast(broadcast_queue_pieces, S) ->
121 bcast_queue_pieces(S),
122 {noreply, S};
123 handle_cast(_Msg, State) ->
124 {noreply, State}.
126 handle_info(round_tick, S) ->
127 case S#state.round of
128 0 ->
129 {ok, NS} = advance_optimistic_unchoke(S),
130 rechoke(NS),
131 {noreply, NS#state { round = 2}};
132 N when is_integer(N) ->
133 rechoke(S),
134 {noreply, S#state{round = S#state.round - 1}}
135 end;
136 handle_info({'DOWN', _Ref, process, Pid, Reason}, S)
137 when (Reason =:= normal) or (Reason =:= shutdown) ->
138 % The peer shut down normally. Hence we just remove him and start up
139 % other peers. Eventually the tracker will re-add him to the peer list
141 % XXX: We might have to do something else
142 rechoke(S),
144 NewChain = lists:delete(Pid, S#state.opt_unchoke_chain),
145 {ok, NS} = start_new_peers([], S#state { num_peers = S#state.num_peers -1,
146 opt_unchoke_chain = NewChain }),
147 {noreply, NS};
148 handle_info({'DOWN', _Ref, process, Pid, _Reason}, S) ->
149 % The peer shut down unexpectedly re-add him to the queue in the *back*
150 NS = case etorrent_peer:select(Pid) of
151 [Peer] ->
152 {IP, Port} = {Peer#peer.ip, Peer#peer.port},
154 % XXX: We might have to check that remote is intersted and we were choking
155 rechoke(S),
156 S#state { available_peers = S#state.available_peers ++ [{IP, Port}]};
157 [] -> S
158 end,
160 NewChain = lists:delete(Pid, NS#state.opt_unchoke_chain),
161 {noreply, NS#state{num_peers = NS#state.num_peers -1,
162 opt_unchoke_chain = NewChain}};
163 handle_info(Info, State) ->
164 error_logger:error_report([unknown_info_peer_group, Info]),
165 {noreply, State}.
167 terminate(Reason, _S) ->
168 error_logger:info_report([peer_group_mgr_term, Reason]),
171 code_change(_OldVsn, State, _Extra) ->
172 {ok, State}.
174 %%--------------------------------------------------------------------
175 %%% Internal functions
176 %%--------------------------------------------------------------------
178 start_new_incoming_peer(IP, Port, S) ->
179 case ?MAX_PEER_PROCESSES - S#state.num_peers of
180 N when N =< 0 ->
181 {reply, already_enough_connections, S};
182 N when is_integer(N), N > 0 ->
183 {ok, Pid} = etorrent_t_peer_pool_sup:add_peer(
184 S#state.peer_group_sup,
185 S#state.our_peer_id,
186 S#state.info_hash,
187 S#state.file_system_pid,
188 self(),
189 S#state.torrent_id,
190 {IP, Port}),
191 erlang:monitor(process, Pid),
192 NewChain = insert_new_peer_into_chain(Pid, S#state.opt_unchoke_chain),
193 rechoke(S),
194 {reply, {ok, Pid},
195 S#state { num_peers = S#state.num_peers+1,
196 opt_unchoke_chain = NewChain}}
197 end.
200 %% Apply F to each Peer Pid
201 foreach_pid(F, S) ->
202 Peers = etorrent_peer:all(S#state.torrent_id),
203 lists:foreach(F, Peers),
206 bcast_got_chunk(Chunk, S) ->
207 foreach_pid(fun (Peer) ->
208 etorrent_t_peer_recv:endgame_got_chunk(Peer#peer.pid, Chunk)
209 end,
212 bcast_queue_pieces(S) ->
213 foreach_pid(fun (P) ->
214 etorrent_t_peer_recv:queue_pieces(P#peer.pid)
215 end,
218 broadcast_have_message(Index, S) ->
219 foreach_pid(fun (Peer) ->
220 etorrent_t_peer_recv:send_have_piece(Peer#peer.pid, Index)
221 end,
224 start_new_peers(IPList, State) ->
225 %% Update the PeerList with the new incoming peers
226 PeerList = lists:usort(IPList ++ State#state.available_peers),
227 S = State#state { available_peers = PeerList},
229 %% Replenish the connected peers.
230 fill_peers(?MAX_PEER_PROCESSES - S#state.num_peers, S).
232 %%% NOTE: fill_peers/2 and spawn_new_peer/5 tail calls each other.
233 fill_peers(0, S) ->
234 {ok, S};
235 fill_peers(N, S) ->
236 case S#state.available_peers of
237 [] ->
238 % No peers available, just stop trying to fill peers
239 {ok, S};
240 [{IP, Port} | R] ->
241 % Possible peer. Check it.
242 case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, S#state.torrent_id) of
243 true ->
244 fill_peers(N, S#state{available_peers = R});
245 false ->
246 spawn_new_peer(IP, Port, N, S#state{available_peers = R})
248 end.
250 %%--------------------------------------------------------------------
251 %% Function: spawn_new_peer(IP, Port, N, S) -> {ok, State}
252 %% Description: Attempt to spawn the peer at IP/Port. N is the number of
253 %% peers we still need to spawn and S is the current state. Returns
254 %% a new state to be put into the process.
255 %%--------------------------------------------------------------------
256 spawn_new_peer(IP, Port, N, S) ->
257 case etorrent_peer:connected(IP, Port, S#state.torrent_id) of
258 true ->
259 fill_peers(N, S);
260 false ->
261 {ok, Pid} = etorrent_t_peer_pool_sup:add_peer(
262 S#state.peer_group_sup,
263 S#state.our_peer_id,
264 S#state.info_hash,
265 S#state.file_system_pid,
266 self(),
267 S#state.torrent_id,
268 {IP, Port}),
269 erlang:monitor(process, Pid),
270 etorrent_t_peer_recv:connect(Pid, IP, Port),
271 NewChain = insert_new_peer_into_chain(Pid, S#state.opt_unchoke_chain),
272 rechoke(S),
273 fill_peers(N-1, S#state { num_peers = S#state.num_peers +1,
274 opt_unchoke_chain = NewChain})
275 end.
277 %%--------------------------------------------------------------------
278 %% Function: rechoke(State) -> ok
279 %% Description: Recalculate the choke/unchoke state of peers
280 %%--------------------------------------------------------------------
281 rechoke(S) ->
282 Table = case etorrent_torrent:mode(S#state.torrent_id) of
283 seeding -> etorrent_send_state;
284 leeching -> etorrent_recv_state;
285 endgame -> etorrent_recv_state
286 end,
287 Peers = select_fastest(S#state.torrent_id, Table),
288 rechoke(Peers, calculate_num_downloaders(S), S).
290 rechoke(Peers, 0, S) ->
291 lists:foreach(fun(P) -> optimistic_unchoke_handler(P, S) end, Peers),
293 rechoke([], _N, _S) ->
295 rechoke([Peer | Rest], N, S) when is_record(Peer, rate_mgr) ->
296 case ets:lookup(etorrent_peer_state, Peer#rate_mgr.pid) of
297 [] ->
298 {_Id, Pid} = Peer#rate_mgr.pid,
299 etorrent_t_peer_recv:unchoke(Pid),
300 rechoke(Rest, N, S);
301 [#peer_state { interest_state = I, pid = {_Id, Pid}}] ->
302 case I of
303 interested ->
304 etorrent_t_peer_recv:unchoke(Pid),
305 rechoke(Rest, N-1, S);
306 not_interested ->
307 etorrent_t_peer_recv:unchoke(Pid),
308 rechoke(Rest, N, S)
310 end.
312 optimistic_unchoke_handler(#rate_mgr { pid = {_Id, Pid} }, S) ->
313 case Pid =:= S#state.optimistic_unchoke_pid of
314 true ->
315 ok; % Handled elsewhere
316 false ->
317 etorrent_t_peer_recv:choke(Pid)
318 end.
320 %% TODO: Make number of downloaders depend on current rate!
321 calculate_num_downloaders(S) ->
322 case ets:lookup(etorrent_peer_state, {S#state.torrent_id,
323 S#state.optimistic_unchoke_pid}) of
324 [] ->
325 ?DEFAULT_NUM_DOWNLOADERS;
326 [P] ->
327 case P#peer_state.interest_state of
328 interested ->
329 ?DEFAULT_NUM_DOWNLOADERS - 1;
330 not_interested ->
331 ?DEFAULT_NUM_DOWNLOADERS
333 end.
335 advance_optimistic_unchoke(S) ->
336 NewChain = move_cyclic_chain(S#state.opt_unchoke_chain, S),
337 case NewChain of
338 [] ->
339 {ok, S}; %% No peers yet
340 [H | _T] ->
341 etorrent_t_peer_recv:unchoke(H),
342 {ok, S#state { opt_unchoke_chain = NewChain,
343 optimistic_unchoke_pid = H }}
344 end.
346 move_cyclic_chain([], _S) -> [];
347 move_cyclic_chain(Chain, S) ->
348 F = fun (P) -> local_unchoked(P, S#state.torrent_id) end,
349 {Front, Back} = lists:splitwith(F, Chain),
350 %% Advance chain
351 Back ++ Front.
353 local_unchoked(Pid, TorrentId) ->
354 case ets:lookup(etorrent_peer_state, {TorrentId, Pid}) of
355 [] -> true;
356 [P] -> P#peer_state.local_choke =:= true
357 end.
359 insert_new_peer_into_chain(Pid, Chain) ->
360 Length = length(Chain),
361 Index = lists:max([0, crypto:rand_uniform(0, Length)]),
362 {Front, Back} = lists:split(Index, Chain),
363 Front ++ [Pid | Back].
365 select_fastest(Id, Table) ->
366 Rows = ets:select(Table, [{{rate_mgr,{Id,'_'},'_'},[],['$_']}]),
367 lists:reverse(lists:keysort(#rate_mgr.rate, Rows)).