1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_t_peer_group_mgr.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Master process for a number of peers.
7 %%% Created : 18 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
11 -module(etorrent_t_peer_group_mgr
).
13 -behaviour(gen_server
).
15 -include("rate_mgr.hrl").
16 -include("peer_state.hrl").
18 -include("etorrent_mnesia_table.hrl").
21 -export([start_link
/5, add_peers
/2, broadcast_have
/2, new_incoming_peer
/4,
22 broadcast_got_chunk
/2, perform_rechoke
/1,
23 broadcast_queue_pieces
/1]).
25 %% gen_server callbacks
26 -export([init
/1, handle_call
/3, handle_cast
/2, handle_info
/2,
27 terminate
/2, code_change
/3]).
29 -record(state
, {available_peers
= [],
38 optimistic_unchoke_pid
= none
,
39 opt_unchoke_chain
= [],
41 file_system_pid
= none
,
42 peer_group_sup
= none
,
%% Cap used when computing free peer slots: callers evaluate
%% ?MAX_PEER_PROCESSES - num_peers before starting further peers.
45 -define(MAX_PEER_PROCESSES
, 40).
%% Interval, in milliseconds, passed to timer:send_interval/3 in init/1
%% for the recurring round_tick message.
46 -define(ROUND_TIME
, 10000).
%% Baseline value returned by calculate_num_downloaders/1.
47 -define(DEFAULT_NUM_DOWNLOADERS
, 4).
49 %%====================================================================
51 %%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(OurPeerId, PeerGroup, InfoHash,
%%                      FileSystemPid, TorrentHandle) -> Result
%% Description: Starts the peer group manager as a linked gen_server.
%%   The five arguments are forwarded, in this order, as the argument
%%   list of init/1 in this module. init/1 is guarded on its last
%%   element being an integer, so TorrentHandle has to be an integer.
%%   Result is whatever gen_server:start_link/3 returns.
%%--------------------------------------------------------------------
start_link(OurPeerId, PeerGroup, InfoHash, FileSystemPid, TorrentHandle) ->
    InitArgs = [OurPeerId, PeerGroup, InfoHash, FileSystemPid, TorrentHandle],
    gen_server:start_link(?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Function: add_peers(Pid, IPList) -> ok
%% Description: Asynchronously hands IPList to the peer group manager
%%   Pid as an {add_peers, IPList} cast. The server passes the list on
%%   to start_new_peers/2.
%%--------------------------------------------------------------------
add_peers(Pid, IPList) ->
    Msg = {add_peers, IPList},
    gen_server:cast(Pid, Msg).
%%--------------------------------------------------------------------
%% Function: broadcast_have(Pid, Index) -> ok
%% Description: Asynchronously sends {broadcast_have, Index} to the
%%   peer group manager Pid, which forwards Index to
%%   broadcast_have_message/2.
%%--------------------------------------------------------------------
broadcast_have(Pid, Index) ->
    Msg = {broadcast_have, Index},
    gen_server:cast(Pid, Msg).
%%--------------------------------------------------------------------
%% Function: broadcast_got_chunk(Pid, Chunk) -> ok
%% Description: Asynchronously sends {broadcast_got_chunk, Chunk} to
%%   the peer group manager Pid, which forwards Chunk to
%%   bcast_got_chunk/2.
%%--------------------------------------------------------------------
broadcast_got_chunk(Pid, Chunk) ->
    Msg = {broadcast_got_chunk, Chunk},
    gen_server:cast(Pid, Msg).
%%--------------------------------------------------------------------
%% Function: broadcast_queue_pieces(Pid) -> ok
%% Description: Asynchronously sends the broadcast_queue_pieces cast to
%%   the peer group manager Pid, which reacts by calling
%%   bcast_queue_pieces/1.
%%--------------------------------------------------------------------
broadcast_queue_pieces(Pid) ->
    Msg = broadcast_queue_pieces,
    gen_server:cast(Pid, Msg).
%%--------------------------------------------------------------------
%% Function: perform_rechoke(Pid) -> ok
%% Description: Asynchronously sends the rechoke cast to the peer
%%   group manager Pid.
%%--------------------------------------------------------------------
perform_rechoke(Pid) ->
    Msg = rechoke,
    gen_server:cast(Pid, Msg).
%%--------------------------------------------------------------------
%% Function: new_incoming_peer(Pid, IP, Port, PeerId) -> Reply
%% Description: Synchronously asks the peer group manager Pid to deal
%%   with an inbound connection from IP/Port that announced PeerId.
%%   Reply is whatever the server answers; among the replies are
%%   connect_to_ourselves (PeerId equals our own peer id) and
%%   already_enough_connections.
%%--------------------------------------------------------------------
new_incoming_peer(Pid, IP, Port, PeerId) ->
    %% Use a generous timeout: the peer group can be heavily loaded at
    %% times, and there are 5 acceptors by default anyway.
    Request = {new_incoming_peer, IP, Port, PeerId},
    gen_server:call(Pid, Request, 15000).
77 %%====================================================================
78 %% gen_server callbacks
79 %%====================================================================
81 init([OurPeerId
, PeerGroup
, InfoHash
,
82 FileSystemPid
, TorrentId
]) when is_integer(TorrentId
) ->
83 process_flag(trap_exit
, true
),
84 {ok
, Tref
} = timer:send_interval(?ROUND_TIME
, self(), round_tick
),
85 {ok
, #state
{ our_peer_id
= OurPeerId
,
86 peer_group_sup
= PeerGroup
,
87 bad_peers
= dict:new(),
90 torrent_id
= TorrentId
,
91 file_system_pid
= FileSystemPid
}}.
93 handle_call({new_incoming_peer
, _IP
, _Port
, PeerId
}, _From
, S
)
94 when S#state
.our_peer_id
=:= PeerId
->
95 {reply
, connect_to_ourselves
, S
};
96 handle_call({new_incoming_peer
, IP
, Port
, _PeerId
}, _From
, S
) ->
97 case etorrent_bad_peer_mgr:is_bad_peer(IP
, Port
, S#state
.torrent_id
) of
101 start_new_incoming_peer(IP
, Port
, S
)
103 handle_call(Request
, _From
, State
) ->
104 error_logger:error_report([unknown_peer_group_call
, Request
]),
106 {reply
, Reply
, State
}.
108 handle_cast({add_peers
, IPList
}, S
) ->
109 {ok
, NS
} = start_new_peers(IPList
, S
),
111 handle_cast({broadcast_have
, Index
}, S
) ->
112 broadcast_have_message(Index
, S
),
114 handle_cast({broadcast_got_chunk
, Chunk
}, S
) ->
115 bcast_got_chunk(Chunk
, S
),
117 handle_cast(rechoke
, S
) ->
120 handle_cast(broadcast_queue_pieces
, S
) ->
121 bcast_queue_pieces(S
),
123 handle_cast(_Msg
, State
) ->
126 handle_info(round_tick
, S
) ->
127 case S#state
.round of
129 {ok
, NS
} = advance_optimistic_unchoke(S
),
131 {noreply
, NS#state
{ round = 2}};
132 N
when is_integer(N
) ->
134 {noreply
, S#state
{round = S#state
.round - 1}}
136 handle_info({'DOWN', _Ref
, process, Pid
, Reason
}, S
)
137 when (Reason
=:= normal
) or (Reason
=:= shutdown
) ->
138 % The peer shut down normally. Hence we just remove him and start up
139 % other peers. Eventually the tracker will re-add him to the peer list
141 % XXX: We might have to do something else
144 NewChain
= lists:delete(Pid
, S#state
.opt_unchoke_chain
),
145 {ok
, NS
} = start_new_peers([], S#state
{ num_peers
= S#state
.num_peers
-1,
146 opt_unchoke_chain
= NewChain
}),
148 handle_info({'DOWN', _Ref
, process, Pid
, _Reason
}, S
) ->
149 % The peer shut down unexpectedly; re-add him at the *back* of the queue
150 NS
= case etorrent_peer:select(Pid
) of
152 {IP
, Port
} = {Peer#peer
.ip
, Peer#peer
.port
},
154 % XXX: We might have to check that remote is interested and we were choking
156 S#state
{ available_peers
= S#state
.available_peers
++ [{IP
, Port
}]};
160 NewChain
= lists:delete(Pid
, NS#state
.opt_unchoke_chain
),
161 {noreply
, NS#state
{num_peers
= NS#state
.num_peers
-1,
162 opt_unchoke_chain
= NewChain
}};
163 handle_info(Info
, State
) ->
164 error_logger:error_report([unknown_info_peer_group
, Info
]),
167 terminate(Reason
, _S
) ->
168 error_logger:info_report([peer_group_mgr_term
, Reason
]),
171 code_change(_OldVsn
, State
, _Extra
) ->
174 %%--------------------------------------------------------------------
175 %%% Internal functions
176 %%--------------------------------------------------------------------
178 start_new_incoming_peer(IP
, Port
, S
) ->
179 case ?MAX_PEER_PROCESSES
- S#state
.num_peers
of
181 {reply
, already_enough_connections
, S
};
182 N
when is_integer(N
), N
> 0 ->
183 {ok
, Pid
} = etorrent_t_peer_pool_sup:add_peer(
184 S#state
.peer_group_sup
,
187 S#state
.file_system_pid
,
191 erlang:monitor(process, Pid
),
192 NewChain
= insert_new_peer_into_chain(Pid
, S#state
.opt_unchoke_chain
),
195 S#state
{ num_peers
= S#state
.num_peers
+1,
196 opt_unchoke_chain
= NewChain
}}
200 %% Apply F to each peer record registered for this torrent
202 Peers
= etorrent_peer:all(S#state
.torrent_id
),
203 lists:foreach(F
, Peers
),
206 bcast_got_chunk(Chunk
, S
) ->
207 foreach_pid(fun (Peer
) ->
208 etorrent_t_peer_recv:endgame_got_chunk(Peer#peer
.pid, Chunk
)
212 bcast_queue_pieces(S
) ->
213 foreach_pid(fun (P
) ->
214 etorrent_t_peer_recv:queue_pieces(P#peer
.pid)
218 broadcast_have_message(Index
, S
) ->
219 foreach_pid(fun (Peer
) ->
220 etorrent_t_peer_recv:send_have_piece(Peer#peer
.pid, Index
)
%%--------------------------------------------------------------------
%% Function: start_new_peers(IPList, State) -> Result
%% Description: Merges IPList into the available peers kept in State
%%   and then hands over to fill_peers/2 together with the number of
%%   free slots (?MAX_PEER_PROCESSES minus num_peers). Result is what
%%   fill_peers/2 returns; callers in this module match it against
%%   {ok, NewState}.
%%--------------------------------------------------------------------
start_new_peers(IPList, State) ->
    Known = State#state.available_peers,
    %% usort drops duplicates and leaves the candidates in term order,
    %% so any earlier ordering of available_peers is not kept.
    Candidates = lists:usort(IPList ++ Known),
    FreeSlots = ?MAX_PEER_PROCESSES - State#state.num_peers,
    %% Replenish the connected peers.
    fill_peers(FreeSlots, State#state{available_peers = Candidates}).
232 %%% NOTE: fill_peers/2 and spawn_new_peer/4 tail-call each other.
236 case S#state
.available_peers
of
238 % No peers available, just stop trying to fill peers
241 % Possible peer. Check it.
242 case etorrent_bad_peer_mgr:is_bad_peer(IP
, Port
, S#state
.torrent_id
) of
244 fill_peers(N
, S#state
{available_peers
= R
});
246 spawn_new_peer(IP
, Port
, N
, S#state
{available_peers
= R
})
250 %%--------------------------------------------------------------------
251 %% Function: spawn_new_peer(IP, Port, N, S) -> {ok, State}
252 %% Description: Attempt to spawn the peer at IP/Port. N is the number of
253 %% peers we still need to spawn and S is the current state. Returns
254 %% a new state to be put into the process.
255 %%--------------------------------------------------------------------
256 spawn_new_peer(IP
, Port
, N
, S
) ->
257 case etorrent_peer:connected(IP
, Port
, S#state
.torrent_id
) of
261 {ok
, Pid
} = etorrent_t_peer_pool_sup:add_peer(
262 S#state
.peer_group_sup
,
265 S#state
.file_system_pid
,
269 erlang:monitor(process, Pid
),
270 etorrent_t_peer_recv:connect(Pid
, IP
, Port
),
271 NewChain
= insert_new_peer_into_chain(Pid
, S#state
.opt_unchoke_chain
),
273 fill_peers(N
-1, S#state
{ num_peers
= S#state
.num_peers
+1,
274 opt_unchoke_chain
= NewChain
})
277 %%--------------------------------------------------------------------
278 %% Function: rechoke(State) -> ok
279 %% Description: Recalculate the choke/unchoke state of peers
280 %%--------------------------------------------------------------------
282 Table
= case etorrent_torrent:mode(S#state
.torrent_id
) of
283 seeding
-> etorrent_send_state
;
284 leeching
-> etorrent_recv_state
;
285 endgame
-> etorrent_recv_state
287 Peers
= select_fastest(S#state
.torrent_id
, Table
),
288 rechoke(Peers
, calculate_num_downloaders(S
), S
).
290 rechoke(Peers
, 0, S
) ->
291 lists:foreach(fun(P
) -> optimistic_unchoke_handler(P
, S
) end, Peers
),
293 rechoke([], _N
, _S
) ->
295 rechoke([Peer
| Rest
], N
, S
) when is_record(Peer
, rate_mgr
) ->
296 case ets:lookup(etorrent_peer_state
, Peer#rate_mgr
.pid) of
298 {_Id
, Pid
} = Peer#rate_mgr
.pid,
299 etorrent_t_peer_recv:unchoke(Pid
),
301 [#peer_state
{ interest_state
= I
, pid = {_Id
, Pid
}}] ->
304 etorrent_t_peer_recv:unchoke(Pid
),
305 rechoke(Rest
, N
-1, S
);
307 etorrent_t_peer_recv:unchoke(Pid
),
312 optimistic_unchoke_handler(#rate_mgr
{ pid = {_Id
, Pid
} }, S
) ->
313 case Pid
=:= S#state
.optimistic_unchoke_pid
of
315 ok
; % Handled elsewhere
317 etorrent_t_peer_recv:choke(Pid
)
320 %% TODO: Make number of downloaders depend on current rate!
321 calculate_num_downloaders(S
) ->
322 case ets:lookup(etorrent_peer_state
, {S#state
.torrent_id
,
323 S#state
.optimistic_unchoke_pid
}) of
325 ?DEFAULT_NUM_DOWNLOADERS
;
327 case P#peer_state
.interest_state
of
329 ?DEFAULT_NUM_DOWNLOADERS
- 1;
331 ?DEFAULT_NUM_DOWNLOADERS
335 advance_optimistic_unchoke(S
) ->
336 NewChain
= move_cyclic_chain(S#state
.opt_unchoke_chain
, S
),
339 {ok
, S
}; %% No peers yet
341 etorrent_t_peer_recv:unchoke(H
),
342 {ok
, S#state
{ opt_unchoke_chain
= NewChain
,
343 optimistic_unchoke_pid
= H
}}
346 move_cyclic_chain([], _S
) -> [];
347 move_cyclic_chain(Chain
, S
) ->
348 F
= fun (P
) -> local_unchoked(P
, S#state
.torrent_id
) end,
349 {Front
, Back
} = lists:splitwith(F
, Chain
),
353 local_unchoked(Pid
, TorrentId
) ->
354 case ets:lookup(etorrent_peer_state
, {TorrentId
, Pid
}) of
356 [P
] -> P#peer_state
.local_choke
=:= true
%%--------------------------------------------------------------------
%% Function: insert_new_peer_into_chain(Pid, Chain) -> NewChain
%% Description: Returns Chain with Pid inserted at a uniformly random
%%   position. Each of the length(Chain) + 1 slots (front, between two
%%   elements, end) is equally likely; an empty Chain yields [Pid].
%%--------------------------------------------------------------------
insert_new_peer_into_chain(Pid, Chain) ->
    %% rand:uniform(N) yields 1..N, so subtract one to get a 0-based
    %% split index in 0..length(Chain). rand is used instead of
    %% crypto:rand_uniform/2, which is deprecated and not available in
    %% current OTP releases.
    Index = rand:uniform(length(Chain) + 1) - 1,
    {Front, Back} = lists:split(Index, Chain),
    Front ++ [Pid | Back].
%%--------------------------------------------------------------------
%% Function: select_fastest(Id, Table) -> [Row]
%% Description: Selects every {rate_mgr, {Id, _}, _} row from the ETS
%%   table Table and returns the rows ordered by their rate field,
%%   highest rate first.
%%--------------------------------------------------------------------
select_fastest(Id, Table) ->
    %% '$_' makes the match specification return the whole stored object.
    MatchSpec = [{{rate_mgr, {Id, '_'}, '_'}, [], ['$_']}],
    Ascending = lists:keysort(#rate_mgr.rate, ets:select(Table, MatchSpec)),
    lists:reverse(Ascending).