1 %%%-------------------------------------------------------------------
%%% File : etorrent_choker.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Master process for a number of peers.
7 %%% Created : 18 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_choker
).
12 -behaviour(gen_server
).
14 -include("rate_mgr.hrl").
15 -include("peer_state.hrl").
17 -include("etorrent_mnesia_table.hrl").
20 -export([start_link
/1, add_peers
/2, new_incoming_peer
/4, perform_rechoke
/0]).
22 %% gen_server callbacks
23 -export([init
/1, handle_call
/3, handle_cast
/2, handle_info
/2,
24 terminate
/2, code_change
/3]).
26 -record(state
, {available_peers
= [],
35 optimistic_unchoke_pid
= none
,
36 opt_unchoke_chain
= []}).
38 -record(rechoke_info
, {pid :: pid(),
39 kind
:: 'seeding' | 'leeching',
40 state
:: 'seeding' | 'leeching' ,
42 r_interest_state
:: 'interested' | 'not_interested',
43 r_choke_state
:: 'choked' | 'unchoked' ,
%% Name under which the gen_server is registered locally (the module name).
-define(SERVER, ?MODULE).
%% Upper bound on the number of peer processes.
%% NOTE(review): presumably the fallback used when no limit is configured --
%% confirm against max_peer_processes/0.
-define(MAX_PEER_PROCESSES, 40).
%% Interval, in milliseconds, between the round_tick messages the server
%% schedules for itself with timer:send_interval/3.
-define(ROUND_TIME, 10000).
51 %%====================================================================
53 %%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(OurPeerId) -> {ok, Pid} | ignore | {error, Error}
%% Description: Start the server linked to the caller and register it
%%   locally as ?SERVER. OurPeerId is passed on to init/1 as the single
%%   element of the argument list.
%%--------------------------------------------------------------------
start_link(OurPeerId) ->
    ServerName = {local, ?SERVER},
    InitArgs = [OurPeerId],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Function: add_peers(TorrentId, IPList) -> ok
%% Description: Asynchronously hand a list of candidate peers to the
%%   server. Every element of IPList is paired with TorrentId before the
%%   list is sent in an {add_peers, List} cast.
%%--------------------------------------------------------------------
add_peers(TorrentId, IPList) ->
    Tagged = [{TorrentId, Peer} || Peer <- IPList],
    gen_server:cast(?SERVER, {add_peers, Tagged}).
61 gen_server:cast(?SERVER
, rechoke
).
%%--------------------------------------------------------------------
%% Function: new_incoming_peer(IP, Port, PeerId, InfoHash) -> Reply
%% Description: Synchronously announce an incoming connection to the
%%   server and return whatever the server replies.
%%--------------------------------------------------------------------
new_incoming_peer(IP, Port, PeerId, InfoHash) ->
    %% Be generous with the timeout: the server can be pretty heavily
    %% loaded at times, and we have 5 acceptors by default anyway.
    Timeout = 15000,
    Request = {new_incoming_peer, IP, Port, PeerId, InfoHash},
    gen_server:call(?SERVER, Request, Timeout).
68 %%====================================================================
69 %% gen_server callbacks
70 %%====================================================================
73 process_flag(trap_exit
, true
),
74 {ok
, Tref
} = timer:send_interval(?ROUND_TIME
, self(), round_tick
),
75 {ok
, #state
{ our_peer_id
= OurPeerId
,
76 bad_peers
= dict:new(),
80 handle_call({new_incoming_peer
, _IP
, _Port
, PeerId
, _InfoHash
}, _From
, S
)
81 when S#state
.our_peer_id
=:= PeerId
->
82 {reply
, connect_to_ourselves
, S
};
83 handle_call({new_incoming_peer
, IP
, Port
, _PeerId
, InfoHash
}, _From
, S
) ->
84 {atomic
, [TM
]} = etorrent_tracking_map:select({infohash
, InfoHash
}),
85 case etorrent_bad_peer_mgr:is_bad_peer(IP
, Port
, TM#tracking_map
.id
) of
89 start_new_incoming_peer(IP
, Port
, InfoHash
, S
)
91 handle_call(Request
, _From
, State
) ->
92 error_logger:error_report([unknown_peer_group_call
, Request
]),
94 {reply
, Reply
, State
}.
96 handle_cast({add_peers
, IPList
}, S
) ->
97 {ok
, NS
} = start_new_peers(IPList
, S
),
99 handle_cast(rechoke
, S
) ->
102 handle_cast(_Msg
, State
) ->
105 handle_info(round_tick
, S
) ->
106 case S#state
.round of
108 {ok
, NS
} = advance_optimistic_unchoke(S
),
110 {noreply
, NS#state
{ round = 2}};
111 N
when is_integer(N
) ->
113 {noreply
, S#state
{round = S#state
.round - 1}}
115 handle_info({'DOWN', _Ref
, process, Pid
, Reason
}, S
)
116 when (Reason
=:= normal
) or (Reason
=:= shutdown
) ->
117 % The peer shut down normally. Hence we just remove him and start up
118 % other peers. Eventually the tracker will re-add him to the peer list
120 % XXX: We might have to do something else
123 NewChain
= lists:delete(Pid
, S#state
.opt_unchoke_chain
),
124 {ok
, NS
} = start_new_peers([], S#state
{ num_peers
= S#state
.num_peers
-1,
125 opt_unchoke_chain
= NewChain
}),
127 handle_info({'DOWN', _Ref
, process, Pid
, _Reason
}, S
) ->
% The peer shut down unexpectedly; re-add him at the *back* of the queue.
129 NS
= case etorrent_peer:select(Pid
) of
131 {IP
, Port
} = {Peer#peer
.ip
, Peer#peer
.port
},
% XXX: We might have to check that remote is interested and we were choking
135 S#state
{ available_peers
= S#state
.available_peers
++ [{IP
, Port
}]};
139 NewChain
= lists:delete(Pid
, NS#state
.opt_unchoke_chain
),
140 {noreply
, NS#state
{num_peers
= NS#state
.num_peers
-1,
141 opt_unchoke_chain
= NewChain
}};
142 handle_info(Info
, State
) ->
143 error_logger:error_report([unknown_info_peer_group
, Info
]),
146 terminate(Reason
, _S
) ->
147 error_logger:info_report([peer_group_mgr_term
, Reason
]),
150 code_change(_OldVsn
, State
, _Extra
) ->
153 %%--------------------------------------------------------------------
154 %%% Internal functions
155 %%--------------------------------------------------------------------
157 start_new_incoming_peer(IP
, Port
, InfoHash
, S
) ->
158 case max_peer_processes() - S#state
.num_peers
of
160 {reply
, already_enough_connections
, S
};
161 N
when is_integer(N
), N
> 0 ->
162 {atomic
, [T
]} = etorrent_tracking_map:select({infohash
, InfoHash
}),
163 {ok
, Pid
} = etorrent_t_sup:add_peer(
164 T#tracking_map
.supervisor_pid
,
169 erlang:monitor(process, Pid
),
170 NewChain
= insert_new_peer_into_chain(Pid
, S#state
.opt_unchoke_chain
),
173 S#state
{ num_peers
= S#state
.num_peers
+1,
174 opt_unchoke_chain
= NewChain
}}
%%--------------------------------------------------------------------
%% Function: start_new_peers(IPList, State) -> result of fill_peers/2
%% Description: Merge IPList into the available peers (sorted, with
%%   duplicates removed) and then try to fill the free peer slots.
%%--------------------------------------------------------------------
start_new_peers(IPList, State) ->
    %% Fold the newly announced peers into the ones we already know of.
    Known = State#state.available_peers,
    Merged = State#state{available_peers = lists:usort(IPList ++ Known)},
    %% Replenish the connected peers up to the allowed maximum.
    FreeSlots = max_peer_processes() - Merged#state.num_peers,
    fill_peers(FreeSlots, Merged).
%%% NOTE: fill_peers/2 and spawn_new_peer/5 tail-call each other.
190 case S#state
.available_peers
of
192 % No peers available, just stop trying to fill peers
194 [{TorrentId
, {IP
, Port
}} | R
] ->
195 % Possible peer. Check it.
196 case etorrent_bad_peer_mgr:is_bad_peer(IP
, Port
, TorrentId
) of
198 fill_peers(N
, S#state
{available_peers
= R
});
200 spawn_new_peer(IP
, Port
, TorrentId
, N
, S#state
{available_peers
= R
})
204 %%--------------------------------------------------------------------
%% Function: spawn_new_peer(IP, Port, TorrentId, N, S) -> {ok, State}
%% Description: Attempt to spawn the peer at IP/Port for the torrent
%%   TorrentId. N is the number of peers we still need to spawn and S is
%%   the current state. Returns a new state to be put into the process.
209 %%--------------------------------------------------------------------
210 spawn_new_peer(IP
, Port
, TorrentId
, N
, S
) ->
211 case etorrent_peer:connected(IP
, Port
, TorrentId
) of
215 {atomic
, [TM
]} = etorrent_tracking_map:select(TorrentId
),
216 {ok
, Pid
} = etorrent_t_sup:add_peer(
217 TM#tracking_map
.supervisor_pid
,
219 TM#tracking_map
.info_hash
,
222 erlang:monitor(process, Pid
),
223 etorrent_t_peer_recv:connect(Pid
, IP
, Port
),
224 NewChain
= insert_new_peer_into_chain(Pid
, S#state
.opt_unchoke_chain
),
226 fill_peers(N
-1, S#state
{ num_peers
= S#state
.num_peers
+1,
227 opt_unchoke_chain
= NewChain
})
230 %%--------------------------------------------------------------------
231 %% Function: rechoke(State) -> ok
232 %% Description: Recalculate the choke/unchoke state of peers
233 %%--------------------------------------------------------------------
235 Peers
= build_rechoke_info(S#state
.opt_unchoke_chain
),
236 {PreferredDown
, PreferredSeed
} = split_preferred(Peers
),
237 PreferredSet
= prune_preferred_peers(PreferredDown
, PreferredSeed
),
238 {N
, ToChoke
} = rechoke_unchoke(Peers
, PreferredSet
, 0, []),
239 rechoke_choke(ToChoke
, N
, optimistics(PreferredSet
)).
%%--------------------------------------------------------------------
%% Function: build_rechoke_info(Peers) -> result of build_rechoke_info/2
%% Description: Collect rechoke information for Peers. The set of
%%   seeding torrents is computed once here and handed to the
%%   two-argument worker.
%%--------------------------------------------------------------------
build_rechoke_info(Peers) ->
    build_rechoke_info(sets:from_list(seeding_torrents()), Peers).
245 build_rechoke_info(_Seeding
, []) ->
247 build_rechoke_info(Seeding
, [Pid
| Next
]) ->
248 case etorrent_peer:select(Pid
) of
249 [] -> build_rechoke_info(Seeding
, Next
);
251 Kind
= Peer#peer
.state
,
252 Snubbed
= etorrent_rate_mgr:snubbed(Peer#peer
.torrent_id
, Pid
),
253 PeerState
= etorrent_rate_mgr:select_state(Peer#peer
.torrent_id
, Pid
),
254 case sets:is_element(Peer#peer
.torrent_id
, Seeding
) of
256 case etorrent_rate_mgr:fetch_send_rate(
257 Peer#peer
.torrent_id
,
259 none
-> build_rechoke_info(Seeding
, Next
);
261 [#rechoke_info
{ pid = Pid
,
266 PeerState#peer_state
.interest_state
,
268 PeerState#peer_state
.choke_state
,
270 PeerState#peer_state
.local_choke
,
271 snubbed
= Snubbed
} |
272 build_rechoke_info(Seeding
, Next
)]
275 case etorrent_rate_mgr:fetch_recv_rate(
276 Peer#peer
.torrent_id
,
278 none
-> build_rechoke_info(Seeding
, Next
);
280 [#rechoke_info
{ pid = Pid
,
283 rate
= -Rate
, % Inverted for later sorting!
285 PeerState#peer_state
.interest_state
,
287 PeerState#peer_state
.choke_state
,
289 PeerState#peer_state
.local_choke
,
290 snubbed
= Snubbed
} |
291 build_rechoke_info(Seeding
, Next
)]
296 advance_optimistic_unchoke(S
) ->
297 NewChain
= move_cyclic_chain(S#state
.opt_unchoke_chain
),
300 {ok
, S
}; %% No peers yet
302 etorrent_t_peer_recv:unchoke(H
),
303 {ok
, S#state
{ opt_unchoke_chain
= NewChain
,
304 optimistic_unchoke_pid
= H
}}
307 move_cyclic_chain([]) -> [];
308 move_cyclic_chain(Chain
) ->
310 case etorrent_peer:select(Pid
) of
312 [P
] -> T
= etorrent_rate_mgr:select_state(
315 not (T#peer_state
.interest_state
=:= interested
316 andalso T#peer_state
.choke_state
=:= choked
)
319 {Front
, Back
} = lists:splitwith(F
, Chain
),
%%--------------------------------------------------------------------
%% Function: insert_new_peer_into_chain(Pid, Chain) -> NewChain
%% Description: Insert Pid at a random position of the optimistic
%%   unchoke chain. The relative order of the existing elements is kept.
%%
%%   An empty chain is handled explicitly: the previous implementation
%%   asked crypto:rand_uniform/2 for a number in the empty range [0, 0),
%%   which raises badarg, so the very first peer could never be inserted.
%%   crypto:rand_uniform/2 is also deprecated (and gone from later OTP
%%   releases), so rand:uniform/1 is used instead; cryptographic strength
%%   is not needed for picking a slot.
%%--------------------------------------------------------------------
insert_new_peer_into_chain(Pid, []) ->
    [Pid];
insert_new_peer_into_chain(Pid, Chain) ->
    %% rand:uniform(N) yields 1..N, so Index is in 0..length(Chain)-1,
    %% the same range the original drew from for a non-empty chain.
    Index = rand:uniform(length(Chain)) - 1,
    {Front, Back} = lists:split(Index, Chain),
    Front ++ [Pid | Back].
329 max_peer_processes() ->
330 case application:get_env(etorrent
, max_peers
) of
331 {ok
, N
} when is_integer(N
) ->
338 case application:get_env(etorrent
, max_upload_slots
) of
340 {ok
, Rate
} = application:get_env(etorrent
, max_upload_rate
),
342 N
when N
=< 0 -> 7; %% Educated guess
347 round(math:sqrt(N
* 0.6))
349 {ok
, N
} when is_integer(N
) ->
353 split_preferred(Peers
) ->
354 {Downs
, Leechs
} = split_preferred_peers(Peers
, [], []),
355 {lists:keysort(#rechoke_info
.rate
, Downs
),
356 lists:keysort(#rechoke_info
.rate
, Leechs
)}.
358 prune_preferred_peers(SDowns
, SLeechs
) ->
359 MaxUploads
= upload_slots(),
360 DUploads
= lists:max([1, round(MaxUploads
* 0.7)]),
361 SUploads
= lists:max([1, round(MaxUploads
* 0.3)]),
363 case lists:max([0, DUploads
- length(SDowns
)]) of
364 0 -> {SUploads
, DUploads
};
365 N
-> {SUploads
+ N
, DUploads
- N
}
368 case lists:max([0, SUP2
- length(SLeechs
)]) of
371 {SUP2
- K
, lists:min([DUP2
+ K
, length(SDowns
)])}
373 {TSDowns
, TSLeechs
} = {lists:sublist(SDowns
, DUP3
),
374 lists:sublist(SLeechs
, SUP3
)},
375 sets:union(sets:from_list(TSDowns
), sets:from_list(TSLeechs
)).
377 rechoke_unchoke([], _PS
, Count
, ToChoke
) ->
379 rechoke_unchoke([P
| Next
], PSet
, Count
, ToChoke
) ->
380 case sets:is_element(P
, PSet
) of
382 etorrent_t_peer_recv:unchoke(P#rechoke_info
.pid),
383 rechoke_unchoke(Next
, PSet
, Count
+1, ToChoke
);
385 rechoke_unchoke(Next
, PSet
, Count
+1, [P
| ToChoke
])
389 MinUp
= case application:get_env(etorrent
, min_uploads
) of
393 lists:max([MinUp
, upload_slots() - sets:size(PSet
)]).
395 rechoke_choke([], _Count
, _Optimistics
) ->
397 rechoke_choke([P
| Next
], Count
, Optimistics
) when Count
>= Optimistics
->
398 etorrent_t_peer_recv:choke(P#rechoke_info
.pid),
399 rechoke_choke(Next
, Count
, Optimistics
);
400 rechoke_choke([P
| Next
], Count
, Optimistics
) ->
401 case P#rechoke_info
.kind
=:= seeding
of
403 etorrent_t_peer_recv:choke(P#rechoke_info
.pid),
404 rechoke_choke(Next
, Count
, Optimistics
);
406 etorrent_t_peer_recv:unchoke(P#rechoke_info
.pid),
407 case P#rechoke_info
.r_interest_state
=:= interested
of
409 rechoke_choke(Next
, Count
+1, Optimistics
);
411 rechoke_choke(Next
, Count
, Optimistics
)
415 split_preferred_peers([], Downs
, Leechs
) ->
417 split_preferred_peers([P
| Next
], Downs
, Leechs
) ->
418 case P#rechoke_info
.state
=:= seeding
419 orelse P#rechoke_info
.r_interest_state
=:= not_interested
of
421 split_preferred_peers(Next
, Downs
, Leechs
);
422 false
when P#rechoke_info
.state
=:= seeding
->
423 split_preferred_peers(Next
, Downs
, [P
| Leechs
]);
424 false
when P#rechoke_info
.snubbed
=:= true
->
425 split_preferred_peers(Next
, Downs
, Leechs
);
427 split_preferred_peers(Next
, [P
| Downs
], Leechs
)
430 seeding_torrents() ->
431 {atomic
, Torrents
} = etorrent_torrent:all(),
433 T#torrent
.state
=:= seeding
].