1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_choker.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Master process for a number of peers.
7 %%% Created : 18 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_choker
).
12 -behaviour(gen_server
).
14 -include("rate_mgr.hrl").
15 -include("peer_state.hrl").
17 -include("etorrent_mnesia_table.hrl").
20 -export([start_link
/1, add_peers
/2, new_incoming_peer
/4, perform_rechoke
/0]).
22 %% gen_server callbacks
23 -export([init
/1, handle_call
/3, handle_cast
/2, handle_info
/2,
24 terminate
/2, code_change
/3]).
26 -record(state
, {available_peers
= [],
35 optimistic_unchoke_pid
= none
,
36 opt_unchoke_chain
= []}).
38 -record(rechoke_info
, {pid :: pid(),
39 kind
:: 'seeding' | 'leeching',
40 state
:: 'seeding' | 'leeching' ,
42 r_interest_state
:: 'interested' | 'not_interested',
43 r_choke_state
:: 'choked' | 'unchoked' ,
%% Name under which the gen_server is registered locally (see start_link/1).
46 -define(SERVER
, ?MODULE
).
%% NOTE(review): presumably the fallback cap on peer processes when the
%% max_peers application variable is not set -- confirm against
%% max_peer_processes/0.
47 -define(MAX_PEER_PROCESSES
, 40).
%% Interval, in milliseconds, between the round_tick messages that the timer
%% started in init/1 sends to this process.
48 -define(ROUND_TIME
, 10000).
50 %%====================================================================
52 %%====================================================================
%% Start the choker as a gen_server registered locally under ?SERVER.
%% OurPeerId is passed on to init/1 as the only element of the argument list.
%% Returns whatever gen_server:start_link/4 returns.
start_link(OurPeerId) ->
    ServerName = {local, ?SERVER},
    InitArgs = [OurPeerId],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
%% Announce a list of candidate peers for a torrent to the choker.
%% Every element of IPList is paired with TorrentId before it is cast to the
%% server, so the server receives a list of {TorrentId, Peer} tuples.
%% The cast is asynchronous; the caller does not wait for the peers to be
%% processed.
add_peers(TorrentId, IPList) ->
    Tagged = lists:map(fun(IPP) -> {TorrentId, IPP} end, IPList),
    gen_server:cast(?SERVER, {add_peers, Tagged}).
%% Cast the rechoke message to the registered choker process. Being a cast,
%% this returns without waiting for the message to be handled.
60 gen_server:cast(?SERVER
, rechoke
).
%% Ask the choker whether an inbound connection should be accepted.
%% The request is a synchronous call; the reply of the server is returned
%% unchanged to the caller.
new_incoming_peer(IP, Port, PeerId, InfoHash) ->
    %% The server can be heavily loaded at times, so allow a generous
    %% 15 second timeout instead of the gen_server default.
    Request = {new_incoming_peer, IP, Port, PeerId, InfoHash},
    Timeout = 15000,
    gen_server:call(?SERVER, Request, Timeout).
67 %%====================================================================
68 %% gen_server callbacks
69 %%====================================================================
%% Server start-up: trap exits, then arm a repeating timer that delivers
%% round_tick to this process every ?ROUND_TIME milliseconds. The initial
%% state records our own peer id and starts with an empty dict of bad peers.
72 process_flag(trap_exit
, true
),
73 {ok
, Tref
} = timer:send_interval(?ROUND_TIME
, self(), round_tick
),
74 {ok
, #state
{ our_peer_id
= OurPeerId
,
75 bad_peers
= dict:new(),
%% gen_server callback for synchronous requests.
%%  - {new_incoming_peer, ...} carrying our own peer id is answered with
%%    connect_to_ourselves and leaves the state untouched.
%%  - Any other {new_incoming_peer, ...} resolves InfoHash to its tracking-map
%%    entry (exactly one entry is asserted by the match) and consults
%%    etorrent_bad_peer_mgr:is_bad_peer/3; one branch of that case hands over
%%    to start_new_incoming_peer/4.
%%  - Any other request is logged through error_logger:error_report/1 and
%%    still receives a reply.
79 handle_call({new_incoming_peer
, _IP
, _Port
, PeerId
, _InfoHash
}, _From
, S
)
80 when S#state
.our_peer_id
=:= PeerId
->
81 {reply
, connect_to_ourselves
, S
};
82 handle_call({new_incoming_peer
, IP
, Port
, _PeerId
, InfoHash
}, _From
, S
) ->
83 {atomic
, [TM
]} = etorrent_tracking_map:select({infohash
, InfoHash
}),
84 case etorrent_bad_peer_mgr:is_bad_peer(IP
, Port
, TM#tracking_map
.id
) of
88 start_new_incoming_peer(IP
, Port
, InfoHash
, S
)
90 handle_call(Request
, _From
, State
) ->
91 error_logger:error_report([unknown_peer_group_call
, Request
]),
93 {reply
, Reply
, State
}.
%% gen_server callback for asynchronous messages.
%%  - {add_peers, IPList}: the new candidates are fed through
%%    start_new_peers/2, which must return {ok, NewState}.
%%  - rechoke is matched by its own clause.
%%  - Every other message falls through to the final catch-all clause.
95 handle_cast({add_peers
, IPList
}, S
) ->
96 {ok
, NS
} = start_new_peers(IPList
, S
),
98 handle_cast(rechoke
, S
) ->
101 handle_cast(_Msg
, State
) ->
%% gen_server callback for out-of-band messages.
%%  - round_tick (delivered by the interval timer): dispatches on the round
%%    counter kept in the state. One branch advances the optimistic unchoke
%%    and resets the counter to 2; the integer branch counts it down by one.
%%  - {'DOWN', ...} with reason normal or shutdown: the pid is removed from
%%    the optimistic unchoke chain, num_peers is decremented and
%%    start_new_peers/2 is called with no new candidates to replenish
%%    connections.
%%  - {'DOWN', ...} with any other reason: the peer is looked up through
%%    etorrent_peer:select/1 so that it can be re-queued at the back of
%%    available_peers; afterwards the pid is removed from the chain and
%%    num_peers is decremented.
%%  - Anything else is reported through error_logger:error_report/1.
104 handle_info(round_tick
, S
) ->
105 case S#state
.round of
107 {ok
, NS
} = advance_optimistic_unchoke(S
),
109 {noreply
, NS#state
{ round = 2}};
110 N
when is_integer(N
) ->
112 {noreply
, S#state
{round = S#state
.round - 1}}
114 handle_info({'DOWN', _Ref
, process, Pid
, Reason
}, S
)
115 when (Reason
=:= normal
) or (Reason
=:= shutdown
) ->
116 % The peer shut down normally. Hence we just remove him and start up
117 % other peers. Eventually the tracker will re-add him to the peer list
119 % XXX: We might have to do something else
122 NewChain
= lists:delete(Pid
, S#state
.opt_unchoke_chain
),
123 {ok
, NS
} = start_new_peers([], S#state
{ num_peers
= S#state
.num_peers
-1,
124 opt_unchoke_chain
= NewChain
}),
126 handle_info({'DOWN', _Ref
, process, Pid
, _Reason
}, S
) ->
127 % The peer shut down unexpectedly; re-add him at the *back* of the queue
128 NS
= case etorrent_peer:select(Pid
) of
130 {IP
, Port
} = {Peer#peer
.ip
, Peer#peer
.port
},
132 % XXX: We might have to check that remote is interested and we were choking
% NOTE(review): the entry appended here is a bare {IP, Port}, whereas
% add_peers/2 queues {TorrentId, {IP, Port}} and fill_peers matches on that
% shape -- confirm that a re-queued peer can still be consumed.
134 S#state
{ available_peers
= S#state
.available_peers
++ [{IP
, Port
}]};
138 NewChain
= lists:delete(Pid
, NS#state
.opt_unchoke_chain
),
139 {noreply
, NS#state
{num_peers
= NS#state
.num_peers
-1,
140 opt_unchoke_chain
= NewChain
}};
141 handle_info(Info
, State
) ->
142 error_logger:error_report([unknown_info_peer_group
, Info
]),
%% gen_server callback invoked when the server terminates. The reason is
%% reported through error_logger:info_report/1; the state is ignored.
145 terminate(Reason
, _S
) ->
146 error_logger:info_report([peer_group_mgr_term
, Reason
]),
%% gen_server callback for hot code upgrades. The old version and the extra
%% argument are ignored.
149 code_change(_OldVsn
, State
, _Extra
) ->
152 %%--------------------------------------------------------------------
153 %%% Internal functions
154 %%--------------------------------------------------------------------
%% Try to accept an inbound connection for the torrent identified by InfoHash.
%% The number of free slots is max_peer_processes() minus the current
%% num_peers. One branch replies already_enough_connections with the state
%% unchanged. When at least one slot is free, the torrent's tracking-map entry
%% is looked up, a peer process is added under the torrent's supervisor and
%% monitored, the pid is spliced into the optimistic unchoke chain at a random
%% position, and num_peers is incremented.
156 start_new_incoming_peer(IP
, Port
, InfoHash
, S
) ->
157 case max_peer_processes() - S#state
.num_peers
of
159 {reply
, already_enough_connections
, S
};
160 N
when is_integer(N
), N
> 0 ->
161 {atomic
, [T
]} = etorrent_tracking_map:select({infohash
, InfoHash
}),
162 {ok
, Pid
} = etorrent_t_sup:add_peer(
163 T#tracking_map
.supervisor_pid
,
168 erlang:monitor(process, Pid
),
169 NewChain
= insert_new_peer_into_chain(Pid
, S#state
.opt_unchoke_chain
),
172 S#state
{ num_peers
= S#state
.num_peers
+1,
173 opt_unchoke_chain
= NewChain
}}
%% Merge freshly announced candidates into the pool of available peers and
%% top up the set of connected peers.
%%   IPList - new candidate entries to add to the pool.
%%   State  - current server state.
%% Returns the result of fill_peers/2; callers in this module match it
%% against {ok, NewState}.
start_new_peers(IPList, State) ->
    %% lists:usort/1 both orders the pool and drops duplicate candidates.
    Known = State#state.available_peers,
    Merged = lists:usort(IPList ++ Known),
    %% Number of peer processes we are still allowed to start.
    Missing = max_peer_processes() - State#state.num_peers,
    fill_peers(Missing, State#state{available_peers = Merged}).
185 %%% NOTE: fill_peers/2 and spawn_new_peer/5 tail call each other.
%% Take the next candidate off available_peers. Candidates have the shape
%% {TorrentId, {IP, Port}}. Each one is checked with
%% etorrent_bad_peer_mgr:is_bad_peer/3: one branch drops the candidate and
%% recurses on the remaining list, the other hands it to spawn_new_peer/5
%% together with the remaining list.
189 case S#state
.available_peers
of
191 % No peers available, just stop trying to fill peers
193 [{TorrentId
, {IP
, Port
}} | R
] ->
194 % Possible peer. Check it.
195 case etorrent_bad_peer_mgr:is_bad_peer(IP
, Port
, TorrentId
) of
197 fill_peers(N
, S#state
{available_peers
= R
});
199 spawn_new_peer(IP
, Port
, TorrentId
, N
, S#state
{available_peers
= R
})
203 %%--------------------------------------------------------------------
204 %% Function: spawn_new_peer(IP, Port, TorrentId, N, S) -> {ok, State}
205 %% Description: Attempt to spawn the peer at IP/Port. N is the number of
206 %% peers we still need to spawn and S is the current state. Returns
207 %% a new state to be put into the process.
208 %%--------------------------------------------------------------------
%% etorrent_peer:connected/3 is consulted first. The spawning branch looks up
%% the tracking-map entry of TorrentId, adds a peer process under the
%% torrent's supervisor, monitors it, tells it to connect to IP/Port, inserts
%% the pid into the optimistic unchoke chain and continues with
%% fill_peers(N-1, ...) on a state whose num_peers has been incremented.
209 spawn_new_peer(IP
, Port
, TorrentId
, N
, S
) ->
210 case etorrent_peer:connected(IP
, Port
, TorrentId
) of
214 {atomic
, [TM
]} = etorrent_tracking_map:select(TorrentId
),
215 {ok
, Pid
} = etorrent_t_sup:add_peer(
216 TM#tracking_map
.supervisor_pid
,
218 TM#tracking_map
.info_hash
,
221 erlang:monitor(process, Pid
),
222 etorrent_t_peer_recv:connect(Pid
, IP
, Port
),
223 NewChain
= insert_new_peer_into_chain(Pid
, S#state
.opt_unchoke_chain
),
225 fill_peers(N
-1, S#state
{ num_peers
= S#state
.num_peers
+1,
226 opt_unchoke_chain
= NewChain
})
229 %%--------------------------------------------------------------------
230 %% Function: rechoke(State) -> ok
231 %% Description: Recalculate the choke/unchoke state of peers
232 %%--------------------------------------------------------------------
%% Rechoke pipeline:
%%  1. collect rechoke info for every pid in the optimistic unchoke chain,
%%  2. split it into the two sorted preferred lists,
%%  3. prune those lists down to the preferred set,
%%  4. unchoke the members of the preferred set, collecting the others,
%%  5. let rechoke_choke/3 handle the others, given the number of peers
%%     counted so far and the number of optimistic slots.
234 Peers
= build_rechoke_info(S#state
.opt_unchoke_chain
),
235 {PreferredDown
, PreferredSeed
} = split_preferred(Peers
),
236 PreferredSet
= prune_preferred_peers(PreferredDown
, PreferredSeed
),
237 {N
, ToChoke
} = rechoke_unchoke(Peers
, PreferredSet
, 0, []),
238 rechoke_choke(ToChoke
, N
, optimistics(PreferredSet
)).
%% Entry point for collecting rechoke information about Peers.
%% The result of seeding_torrents/0 is turned into a set once, so that the
%% per-peer worker build_rechoke_info/2 can test membership for every peer.
build_rechoke_info(Peers) ->
    build_rechoke_info(sets:from_list(seeding_torrents()), Peers).
%% Walk the pid list and build one #rechoke_info{} per usable peer.
%% Pids without an etorrent_peer entry are skipped, and so are peers for which
%% the rate manager answers none. Depending on whether the peer's torrent is a
%% member of the Seeding set, either the send rate or the receive rate is
%% fetched. The receive rate is stored negated so that an ascending sort on
%% the rate field puts the fastest peers first.
%% The result list is built by body recursion, in the order of the input.
244 build_rechoke_info(_Seeding
, []) ->
246 build_rechoke_info(Seeding
, [Pid
| Next
]) ->
247 case etorrent_peer:select(Pid
) of
248 [] -> build_rechoke_info(Seeding
, Next
);
250 Kind
= Peer#peer
.state
,
251 Snubbed
= etorrent_rate_mgr:snubbed(Peer#peer
.torrent_id
, Pid
),
252 PeerState
= etorrent_rate_mgr:select_state(Peer#peer
.torrent_id
, Pid
),
253 case sets:is_element(Peer#peer
.torrent_id
, Seeding
) of
255 case etorrent_rate_mgr:fetch_send_rate(
256 Peer#peer
.torrent_id
,
258 none
-> build_rechoke_info(Seeding
, Next
);
260 [#rechoke_info
{ pid = Pid
,
265 PeerState#peer_state
.interest_state
,
267 PeerState#peer_state
.choke_state
,
268 snubbed
= Snubbed
} |
269 build_rechoke_info(Seeding
, Next
)]
272 case etorrent_rate_mgr:fetch_recv_rate(
273 Peer#peer
.torrent_id
,
275 none
-> build_rechoke_info(Seeding
, Next
);
277 [#rechoke_info
{ pid = Pid
,
280 rate
= -Rate
, % Inverted for later sorting!
281 snubbed
= Snubbed
} |
282 build_rechoke_info(Seeding
, Next
)]
%% Rotate the optimistic unchoke chain with move_cyclic_chain/1.
%% When there are no peers yet the state is returned untouched. Otherwise the
%% peer H is unchoked, and the rotated chain and H (as optimistic_unchoke_pid)
%% are stored in the state. Returns {ok, State} in both cases.
287 advance_optimistic_unchoke(S
) ->
288 NewChain
= move_cyclic_chain(S#state
.opt_unchoke_chain
),
291 {ok
, S
}; %% No peers yet
293 etorrent_t_peer_recv:unchoke(H
),
294 {ok
, S#state
{ opt_unchoke_chain
= NewChain
,
295 optimistic_unchoke_pid
= H
}}
298 %%TODO: Fix cyclic chain move!
%% An empty chain stays empty. Otherwise the chain is split into the longest
%% prefix of pids for which local_unchoked/2 returns true (Front) and the
%% remainder (Back).
%% NOTE(review): the torrent id given to local_unchoked/2 is the placeholder
%% atom todo_move_cyclic_chain_all, so the lookup key presumably never matches
%% a real table entry -- confirm when resolving the TODO.
299 move_cyclic_chain([]) -> [];
300 move_cyclic_chain(Chain
) ->
301 F
= fun (P
) -> local_unchoked(P
, todo_move_cyclic_chain_all
) end,
302 {Front
, Back
} = lists:splitwith(F
, Chain
),
%% Look up the entry keyed {TorrentId, Pid} in the etorrent_peer_state ETS
%% table. For a single hit the result is whether its local_choke field is
%% exactly true.
%% NOTE(review): the function name says "unchoked" while the test is
%% local_choke =:= true -- confirm the intended polarity.
306 local_unchoked(Pid
, TorrentId
) ->
307 case ets:lookup(etorrent_peer_state
, {TorrentId
, Pid
}) of
309 [P
] -> P#peer_state
.local_choke
=:= true
%% Insert Pid at a uniformly random position of the optimistic unchoke chain.
%%   Pid   - the peer process to add.
%%   Chain - the current chain (a list of pids), possibly empty.
%% Returns the new chain, which is Chain with Pid spliced in.
%%
%% An empty chain is handled by its own clause because there is no position
%% to draw. For a non-empty chain the index is drawn from 0..length(Chain)-1,
%% so Pid lands in front of an existing element.
%% rand:uniform/1 replaces crypto:rand_uniform/2, which has been removed from
%% OTP and raised badarg for the empty range (0, 0).
insert_new_peer_into_chain(Pid, []) ->
    [Pid];
insert_new_peer_into_chain(Pid, Chain) ->
    %% rand:uniform(N) yields 1..N; shift to a zero-based split index.
    Index = rand:uniform(length(Chain)) - 1,
    {Front, Back} = lists:split(Index, Chain),
    Front ++ [Pid | Back].
%% Upper bound on the number of peer processes. Reads the max_peers variable
%% of the etorrent application; the clause shown accepts an integer setting.
318 max_peer_processes() ->
319 case application:get_env(etorrent
, max_peers
) of
320 {ok
, N
} when is_integer(N
) ->
%% Number of upload slots. The max_upload_slots variable of the etorrent
%% application is read first. One branch derives the count from the
%% max_upload_rate variable instead (which must be set, the match asserts
%% {ok, Rate}): a non-positive value gives 7, and a later branch uses
%% round(sqrt(N * 0.6)). An integer max_upload_slots setting is matched by its
%% own clause.
327 case application:get_env(etorrent
, max_upload_slots
) of
329 {ok
, Rate
} = application:get_env(etorrent
, max_upload_rate
),
331 N
when N
=< 0 -> 7; %% Educated guess
336 round(math:sqrt(N
* 0.6))
338 {ok
, N
} when is_integer(N
) ->
%% Partition Peers with split_preferred_peers/3 and return both partitions
%% sorted ascending on the rate field of #rechoke_info{}.
%% Returns {SortedDowns, SortedLeechs}.
split_preferred(Peers) ->
    {Downs, Leechs} = split_preferred_peers(Peers, [], []),
    ByRate = fun(Infos) -> lists:keysort(#rechoke_info.rate, Infos) end,
    {ByRate(Downs), ByRate(Leechs)}.
%% Decide which peers make up the preferred set.
%% The upload slot budget from upload_slots/0 is split roughly 70/30 between
%% SDowns and SLeechs, with at least one slot each. Slots that SDowns cannot
%% fill are moved to the SLeechs share, and slots that SLeechs cannot fill are
%% moved back, capped at length(SDowns). Each list is then truncated to its
%% share and the two are merged into one set, which is returned.
347 prune_preferred_peers(SDowns
, SLeechs
) ->
348 MaxUploads
= upload_slots(),
349 DUploads
= lists:max([1, round(MaxUploads
* 0.7)]),
350 SUploads
= lists:max([1, round(MaxUploads
* 0.3)]),
352 case lists:max([0, DUploads
- length(SDowns
)]) of
353 0 -> {SUploads
, DUploads
};
354 N
-> {SUploads
+ N
, DUploads
- N
}
357 case lists:max([0, SUP2
- length(SLeechs
)]) of
360 {SUP2
- K
, lists:min([DUP2
+ K
, length(SDowns
)])}
362 {TSDowns
, TSLeechs
} = {lists:sublist(SDowns
, DUP3
),
363 lists:sublist(SLeechs
, SUP3
)},
364 sets:union(sets:from_list(TSDowns
), sets:from_list(TSLeechs
)).
%% Walk all peers: members of the preferred set PSet are unchoked right away,
%% the others are accumulated in ToChoke (in reverse order) for
%% rechoke_choke/3.
%% NOTE(review): Count is incremented both for peers that are unchoked and for
%% peers deferred to ToChoke -- confirm that counting the deferred peers is
%% intended.
366 rechoke_unchoke([], _PS
, Count
, ToChoke
) ->
368 rechoke_unchoke([P
| Next
], PSet
, Count
, ToChoke
) ->
369 case sets:is_element(P
, PSet
) of
371 etorrent_t_peer_recv:unchoke(P#rechoke_info
.pid),
372 rechoke_unchoke(Next
, PSet
, Count
+1, ToChoke
);
374 rechoke_unchoke(Next
, PSet
, Count
+1, [P
| ToChoke
])
%% Number of optimistic unchoke slots: the larger of MinUp (derived from the
%% min_uploads variable of the etorrent application) and the upload slots
%% left over once the preferred set PSet has been served.
378 MinUp
= case application:get_env(etorrent
, min_uploads
) of
382 lists:max([MinUp
, upload_slots() - sets:size(PSet
)]).
%% Walk the peers that did not make it into the preferred set.
%% Once Count has reached Optimistics every remaining peer is choked.
%% Below that limit the decision depends on whether the peer's kind is
%% seeding: one branch chokes the peer, the other unchokes it. After an
%% unchoke, Count is either incremented or left unchanged depending on
%% whether the remote side is interested.
384 rechoke_choke([], _Count
, _Optimistics
) ->
386 rechoke_choke([P
| Next
], Count
, Optimistics
) when Count
>= Optimistics
->
387 etorrent_t_peer_recv:choke(P#rechoke_info
.pid),
388 rechoke_choke(Next
, Count
, Optimistics
);
389 rechoke_choke([P
| Next
], Count
, Optimistics
) ->
390 case P#rechoke_info
.kind
=:= seeding
of
392 etorrent_t_peer_recv:choke(P#rechoke_info
.pid),
393 rechoke_choke(Next
, Count
, Optimistics
);
395 etorrent_t_peer_recv:unchoke(P#rechoke_info
.pid),
396 case P#rechoke_info
.r_interest_state
=:= interested
of
398 rechoke_choke(Next
, Count
+1, Optimistics
);
400 rechoke_choke(Next
, Count
, Optimistics
)
%% Partition the rechoke infos into the two accumulators Downs and Leechs.
%% The case expression tests whether the peer's state is seeding or the
%% remote side is not interested. Snubbed peers are dropped; the last branch
%% adds the peer to Downs.
%% NOTE(review): the guard `false when state =:= seeding` repeats a test that
%% is already part of the case expression, so on the false branch it looks
%% unreachable and Leechs would never receive an element -- confirm whether
%% the kind field was meant in one of the two places.
404 split_preferred_peers([], Downs
, Leechs
) ->
406 split_preferred_peers([P
| Next
], Downs
, Leechs
) ->
407 case P#rechoke_info
.state
=:= seeding
408 orelse P#rechoke_info
.r_interest_state
=:= not_interested
of
410 split_preferred_peers(Next
, Downs
, Leechs
);
411 false
when P#rechoke_info
.state
=:= seeding
->
412 split_preferred_peers(Next
, Downs
, [P
| Leechs
]);
413 false
when P#rechoke_info
.snubbed
=:= true
->
414 split_preferred_peers(Next
, Downs
, Leechs
);
416 split_preferred_peers(Next
, [P
| Downs
], Leechs
)
%% Fetch all torrents through etorrent_torrent:all/0 and keep those whose
%% state is seeding.
%% NOTE(review): presumably yields torrent ids, since build_rechoke_info/2
%% tests Peer#peer.torrent_id for membership -- confirm.
419 seeding_torrents() ->
420 {atomic
, Torrents
} = etorrent_torrent:all(),
422 T#torrent
.state
=:= seeding
].