1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_t_peer_group.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Master process for a number of peers.
7 %%% Created : 18 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_choker
).
12 -behaviour(gen_server
).
14 -include("rate_mgr.hrl").
15 -include("peer_state.hrl").
17 -include("etorrent_mnesia_table.hrl").
20 -export([start_link
/1, add_peers
/2, new_incoming_peer
/4, perform_rechoke
/0]).
22 %% gen_server callbacks
23 -export([init
/1, handle_call
/3, handle_cast
/2, handle_info
/2,
24 terminate
/2, code_change
/3]).
26 -record(state
, {available_peers
= [],
35 optimistic_unchoke_pid
= none
,
36 opt_unchoke_chain
= []}).
38 -define(SERVER
, ?MODULE
).
39 -define(MAX_PEER_PROCESSES
, 40).
40 -define(ROUND_TIME
, 10000).
41 -define(DEFAULT_NUM_DOWNLOADERS
, 4).
43 %%====================================================================
45 %%====================================================================
46 start_link(OurPeerId
) ->
47 gen_server:start_link({local
, ?SERVER
}, ?MODULE
, [OurPeerId
], []).
49 add_peers(TorrentId
, IPList
) ->
50 gen_server:cast(?SERVER
, {add_peers
, [{TorrentId
, IPP
} || IPP
<- IPList
]}).
53 gen_server:cast(?SERVER
, rechoke
).
55 new_incoming_peer(IP
, Port
, PeerId
, InfoHash
) ->
56 %% Set a pretty graceful timeout here as the peer_group can be pretty heavily
57 %% loaded at times. We have 5 acceptors by default anyway.
58 gen_server:call(?SERVER
, {new_incoming_peer
, IP
, Port
, PeerId
, InfoHash
}, 15000).
60 %%====================================================================
61 %% gen_server callbacks
62 %%====================================================================
65 process_flag(trap_exit
, true
),
66 {ok
, Tref
} = timer:send_interval(?ROUND_TIME
, self(), round_tick
),
67 {ok
, #state
{ our_peer_id
= OurPeerId
,
68 bad_peers
= dict:new(),
72 handle_call({new_incoming_peer
, _IP
, _Port
, PeerId
, _InfoHash
}, _From
, S
)
73 when S#state
.our_peer_id
=:= PeerId
->
74 {reply
, connect_to_ourselves
, S
};
75 handle_call({new_incoming_peer
, IP
, Port
, _PeerId
, InfoHash
}, _From
, S
) ->
76 {atomic
, [TM
]} = etorrent_tracking_map:select({infohash
, InfoHash
}),
77 case etorrent_bad_peer_mgr:is_bad_peer(IP
, Port
, TM#tracking_map
.id
) of
81 start_new_incoming_peer(IP
, Port
, InfoHash
, S
)
83 handle_call(Request
, _From
, State
) ->
84 error_logger:error_report([unknown_peer_group_call
, Request
]),
86 {reply
, Reply
, State
}.
88 handle_cast({add_peers
, IPList
}, S
) ->
89 {ok
, NS
} = start_new_peers(IPList
, S
),
91 handle_cast(rechoke
, S
) ->
94 handle_cast(_Msg
, State
) ->
97 handle_info(round_tick
, S
) ->
100 {ok
, NS
} = advance_optimistic_unchoke(S
),
102 {noreply
, NS#state
{ round = 2}};
103 N
when is_integer(N
) ->
105 {noreply
, S#state
{round = S#state
.round - 1}}
107 handle_info({'DOWN', _Ref
, process, Pid
, Reason
}, S
)
108 when (Reason
=:= normal
) or (Reason
=:= shutdown
) ->
109 % The peer shut down normally. Hence we just remove him and start up
110 % other peers. Eventually the tracker will re-add him to the peer list
112 % XXX: We might have to do something else
115 NewChain
= lists:delete(Pid
, S#state
.opt_unchoke_chain
),
116 {ok
, NS
} = start_new_peers([], S#state
{ num_peers
= S#state
.num_peers
-1,
117 opt_unchoke_chain
= NewChain
}),
119 handle_info({'DOWN', _Ref
, process, Pid
, _Reason
}, S
) ->
120 % The peer shut down unexpectedly re-add him to the queue in the *back*
121 NS
= case etorrent_peer:select(Pid
) of
123 {IP
, Port
} = {Peer#peer
.ip
, Peer#peer
.port
},
125 % XXX: We might have to check that remote is intersted and we were choking
127 S#state
{ available_peers
= S#state
.available_peers
++ [{IP
, Port
}]};
131 NewChain
= lists:delete(Pid
, NS#state
.opt_unchoke_chain
),
132 {noreply
, NS#state
{num_peers
= NS#state
.num_peers
-1,
133 opt_unchoke_chain
= NewChain
}};
134 handle_info(Info
, State
) ->
135 error_logger:error_report([unknown_info_peer_group
, Info
]),
138 terminate(Reason
, _S
) ->
139 error_logger:info_report([peer_group_mgr_term
, Reason
]),
142 code_change(_OldVsn
, State
, _Extra
) ->
145 %%--------------------------------------------------------------------
146 %%% Internal functions
147 %%--------------------------------------------------------------------
149 start_new_incoming_peer(IP
, Port
, InfoHash
, S
) ->
150 case ?MAX_PEER_PROCESSES
- S#state
.num_peers
of
152 {reply
, already_enough_connections
, S
};
153 N
when is_integer(N
), N
> 0 ->
154 {atomic
, [T
]} = etorrent_tracking_map:select({infohash
, InfoHash
}),
155 {ok
, Pid
} = etorrent_t_sup:add_peer(
156 T#tracking_map
.supervisor_pid
,
161 erlang:monitor(process, Pid
),
162 NewChain
= insert_new_peer_into_chain(Pid
, S#state
.opt_unchoke_chain
),
165 S#state
{ num_peers
= S#state
.num_peers
+1,
166 opt_unchoke_chain
= NewChain
}}
170 start_new_peers(IPList
, State
) ->
171 %% Update the PeerList with the new incoming peers
172 PeerList
= lists:usort(IPList
++ State#state
.available_peers
),
173 S
= State#state
{ available_peers
= PeerList
},
175 %% Replenish the connected peers.
176 fill_peers(?MAX_PEER_PROCESSES
- S#state
.num_peers
, S
).
178 %%% NOTE: fill_peers/2 and spawn_new_peer/5 tail calls each other.
182 case S#state
.available_peers
of
184 % No peers available, just stop trying to fill peers
186 [{TorrentId
, {IP
, Port
}} | R
] ->
187 % Possible peer. Check it.
188 case etorrent_bad_peer_mgr:is_bad_peer(IP
, Port
, TorrentId
) of
190 fill_peers(N
, S#state
{available_peers
= R
});
192 spawn_new_peer(IP
, Port
, TorrentId
, N
, S#state
{available_peers
= R
})
196 %%--------------------------------------------------------------------
197 %% Function: spawn_new_peer(IP, Port, N, S) -> {ok, State}
198 %% Description: Attempt to spawn the peer at IP/Port. N is the number of
199 %% peers we still need to spawn and S is the current state. Returns
200 %% a new state to be put into the process.
201 %%--------------------------------------------------------------------
202 spawn_new_peer(IP
, Port
, TorrentId
, N
, S
) ->
203 case etorrent_peer:connected(IP
, Port
, TorrentId
) of
207 {atomic
, [TM
]} = etorrent_tracking_map:select(TorrentId
),
208 {ok
, Pid
} = etorrent_t_sup:add_peer(
209 TM#tracking_map
.supervisor_pid
,
211 TM#tracking_map
.info_hash
,
214 erlang:monitor(process, Pid
),
215 etorrent_t_peer_recv:connect(Pid
, IP
, Port
),
216 NewChain
= insert_new_peer_into_chain(Pid
, S#state
.opt_unchoke_chain
),
218 fill_peers(N
-1, S#state
{ num_peers
= S#state
.num_peers
+1,
219 opt_unchoke_chain
= NewChain
})
222 %%--------------------------------------------------------------------
223 %% Function: rechoke(State) -> ok
224 %% Description: Recalculate the choke/unchoke state of peers
225 %%--------------------------------------------------------------------
227 Table
= etorrent_recv_state
,
228 Peers
= select_fastest(todo_rewrite_choking_algo
, Table
),
229 rechoke(Peers
, calculate_num_downloaders(S
), S
).
231 rechoke(Peers
, 0, S
) ->
232 lists:foreach(fun(P
) -> optimistic_unchoke_handler(P
, S
) end, Peers
),
234 rechoke([], _N
, _S
) ->
236 rechoke([Peer
| Rest
], N
, S
) when is_record(Peer
, rate_mgr
) ->
237 case ets:lookup(etorrent_peer_state
, Peer#rate_mgr
.pid) of
239 {_Id
, Pid
} = Peer#rate_mgr
.pid,
240 etorrent_t_peer_recv:unchoke(Pid
),
242 [#peer_state
{ interest_state
= I
, pid = {_Id
, Pid
}}] ->
245 etorrent_t_peer_recv:unchoke(Pid
),
246 rechoke(Rest
, N
-1, S
);
248 etorrent_t_peer_recv:unchoke(Pid
),
253 optimistic_unchoke_handler(#rate_mgr
{ pid = {_Id
, Pid
} }, S
) ->
254 case Pid
=:= S#state
.optimistic_unchoke_pid
of
256 ok
; % Handled elsewhere
258 etorrent_t_peer_recv:choke(Pid
)
261 %% TODO: Make number of downloaders depend on current rate!
262 calculate_num_downloaders(S
) ->
263 case ets:lookup(etorrent_peer_state
, {todo_redefine_optimistics
,
264 S#state
.optimistic_unchoke_pid
}) of
266 ?DEFAULT_NUM_DOWNLOADERS
;
268 case P#peer_state
.interest_state
of
270 ?DEFAULT_NUM_DOWNLOADERS
- 1;
272 ?DEFAULT_NUM_DOWNLOADERS
276 advance_optimistic_unchoke(S
) ->
277 NewChain
= move_cyclic_chain(S#state
.opt_unchoke_chain
),
280 {ok
, S
}; %% No peers yet
282 etorrent_t_peer_recv:unchoke(H
),
283 {ok
, S#state
{ opt_unchoke_chain
= NewChain
,
284 optimistic_unchoke_pid
= H
}}
287 move_cyclic_chain([]) -> [];
288 move_cyclic_chain(Chain
) ->
289 F
= fun (P
) -> local_unchoked(P
, todo_move_cyclic_chain_all
) end,
290 {Front
, Back
} = lists:splitwith(F
, Chain
),
294 local_unchoked(Pid
, TorrentId
) ->
295 case ets:lookup(etorrent_peer_state
, {TorrentId
, Pid
}) of
297 [P
] -> P#peer_state
.local_choke
=:= true
300 insert_new_peer_into_chain(Pid
, Chain
) ->
301 Length
= length(Chain
),
302 Index
= lists:max([0, crypto:rand_uniform(0, Length
)]),
303 {Front
, Back
} = lists:split(Index
, Chain
),
304 Front
++ [Pid
| Back
].
306 select_fastest(Id
, Table
) ->
307 Rows
= ets:select(Table
, [{{rate_mgr
,{Id
,'_'},'_'},[],['$_']}]),
308 lists:reverse(lists:keysort(#rate_mgr
.rate
, Rows
)).