Only perform rechokes when we need to from peer_recv.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_t_peer_send.erl
blob208eeb279ec99b2746e9fc0672abecf38caebba3
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_t_peer_send.erl
3 %%% Author : Jesper Louis Andersen
4 %%% License : See COPYING
5 %%% Description : Send out events to a foreign socket.
6 %%%
7 %%% Created : 27 Jan 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_t_peer_send).
12 -include("etorrent_mnesia_table.hrl").
13 -include("etorrent_rate.hrl").
15 -behaviour(gen_server).
17 %% API
18 -export([start_link/4,
19 stop/1,
20 check_choke/1,
22 local_request/2, remote_request/4, cancel/4,
23 choke/1, unchoke/1,
25 not_interested/1, interested/1,
26 send_have_piece/2,
27 bitfield/2]).
29 %% gen_server callbacks
30 -export([init/1, handle_info/2, terminate/2, code_change/3,
31 handle_call/3, handle_cast/2]).
%% State carried by one send process (one per peer connection).
-record(state, {
          socket          = none,  % socket handed to etorrent_peer_communication:send_message/3
          request_queue   = none,  % queue of {Index, Offset, Len} requested by the peer
          rate            = none,  % rate structure created by etorrent_rate:init/1
          choke           = true,  % Are we choking the peer?
          interested      = false, % Are we interested in the peer?
          timer           = none,  % interval timer delivering keep_alive_tick
          rate_timer      = none,  % interval timer delivering rate_update
          parent          = none,  % RecvPid argument of start_link/4
          piece_cache     = none,  % none | {Index, PieceBinary}, see load_piece/2
          torrent_id      = none,  % TorrentId argument of start_link/4
          file_system_pid = none   % FilesystemPid argument of start_link/4
         }).

-define(DEFAULT_KEEP_ALIVE_INTERVAL, 120*1000). % From proto. spec.
-define(MAX_REQUESTS, 1024). % Maximal number of requests a peer may make.
48 %%====================================================================
49 %% API
50 %%====================================================================
%%--------------------------------------------------------------------
%% Func: start_link(Socket, FilesystemPid, TorrentId, RecvPid)
%% Description: Start a send process linked to the caller. The four
%%   arguments are passed, in this order, as the argument list of init/1.
%%--------------------------------------------------------------------
start_link(Socket, FilesystemPid, TorrentId, RecvPid) ->
    InitArgs = [Socket, FilesystemPid, TorrentId, RecvPid],
    gen_server:start_link(?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Func: remote_request(Pid, Index, Offset, Len)
%% Description: The remote end (i.e. the peer) requested the chunk
%%   {Index, Offset, Len}. Delivered to Pid as an asynchronous cast.
%%--------------------------------------------------------------------
remote_request(Pid, Index, Offset, Len) ->
    Request = {remote_request, Index, Offset, Len},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Func: local_request(Pid, {Index, Offset, Size})
%% Description: We request a chunk from the peer. The chunk is given
%%   as a single {Index, Offset, Size} tuple and is forwarded to Pid
%%   as an asynchronous cast.
%%--------------------------------------------------------------------
local_request(Pid, {Index, Offset, Size}) ->
    Chunk = {Index, Offset, Size},
    gen_server:cast(Pid, {local_request, Chunk}).
%%--------------------------------------------------------------------
%% Func: cancel(Pid, Index, Offset, Len)
%% Description: Cancel the chunk {Index, Offset, Len}. Pid receives a
%%   cancel_piece cast, which removes the chunk from its request queue.
%%--------------------------------------------------------------------
cancel(Pid, Index, Offset, Len) ->
    Cancellation = {cancel_piece, Index, Offset, Len},
    gen_server:cast(Pid, Cancellation).
%%--------------------------------------------------------------------
%% Func: choke(Pid)
%% Description: Ask the send process Pid to choke the peer
%%   (asynchronous cast; a no-op if the peer is already choked).
%%--------------------------------------------------------------------
choke(Pid) ->
    Event = choke,
    gen_server:cast(Pid, Event).
%%--------------------------------------------------------------------
%% Func: unchoke(Pid)
%% Description: Ask the send process Pid to unchoke the peer
%%   (asynchronous cast; a no-op if the peer is already unchoked).
%%--------------------------------------------------------------------
unchoke(Pid) ->
    Event = unchoke,
    gen_server:cast(Pid, Event).
%%--------------------------------------------------------------------
%% Func: check_choke(Pid)
%% Description: Ask the send process Pid to check its choke state. If
%%   the peer is currently unchoked, the process calls
%%   etorrent_choker:perform_rechoke/0; otherwise nothing happens.
%%--------------------------------------------------------------------
check_choke(Pid) ->
    gen_server:cast(Pid, check_choke).
%%--------------------------------------------------------------------
%% Func: not_interested(Pid)
%% Description: Tell the peer we are no longer interested in it
%%   (asynchronous cast; a no-op if we were not interested already).
%%--------------------------------------------------------------------
not_interested(Pid) ->
    Event = not_interested,
    gen_server:cast(Pid, Event).
%%--------------------------------------------------------------------
%% Func: interested(Pid)
%% Description: Tell the peer we are interested in it
%%   (asynchronous cast; a no-op if we were interested already).
%%--------------------------------------------------------------------
interested(Pid) ->
    Event = interested,
    gen_server:cast(Pid, Event).
%%--------------------------------------------------------------------
%% Func: send_have_piece(Pid, PieceNumber)
%% Description: Tell the peer that we have the piece PieceNumber. Pid
%%   receives a {have, PieceNumber} cast and sends it on.
%%--------------------------------------------------------------------
send_have_piece(Pid, PieceNumber) ->
    Have = {have, PieceNumber},
    gen_server:cast(Pid, Have).
%%--------------------------------------------------------------------
%% Func: bitfield(Pid, BitField)
%% Description: Ask the send process Pid to send BitField to the peer
%%   as a {bitfield, BitField} message (asynchronous cast).
%%--------------------------------------------------------------------
bitfield(Pid, BitField) ->
    Announcement = {bitfield, BitField},
    gen_server:cast(Pid, Announcement).
%%--------------------------------------------------------------------
%% Func: stop(Pid)
%% Description: Tell the send process to stop the communication. The
%%   process terminates with reason normal when it handles the cast.
%%--------------------------------------------------------------------
stop(Pid) ->
    Event = stop,
    gen_server:cast(Pid, Event).
121 %%====================================================================
122 %% gen_server callbacks
123 %%====================================================================
%%--------------------------------------------------------------------
%% Func: init([Socket, FilesystemPid, TorrentId, Parent]) ->
%%           {ok, State, Timeout}
%% Description: Initialise the send process. Traps exits, starts the
%%   keep-alive and rate-update interval timers (both deliver their
%%   tick as a plain message to this process) and builds the initial
%%   state with an empty request queue. The choke and interested
%%   fields keep their record defaults (true and false).
%%--------------------------------------------------------------------
init([Socket, FilesystemPid, TorrentId, Parent]) ->
    process_flag(trap_exit, true),
    {ok, KeepAliveTimer} = timer:send_interval(?DEFAULT_KEEP_ALIVE_INTERVAL,
                                               self(), keep_alive_tick),
    {ok, RateTimer} = timer:send_interval(?RATE_UPDATE, self(), rate_update),
    State = #state{socket = Socket,
                   timer = KeepAliveTimer,
                   rate_timer = RateTimer,
                   request_queue = queue:new(),
                   rate = etorrent_rate:init(?RATE_FUDGE),
                   parent = Parent,
                   torrent_id = TorrentId,
                   file_system_pid = FilesystemPid},
    %% NOTE(review): the last element of this tuple was not visible in
    %% the reviewed text. A timeout of 0 is assumed because every other
    %% callback in this module re-arms an immediate timeout -- confirm.
    {ok, State, 0}.
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State}
%% Description: Handling call messages. This process has no
%%   synchronous API: every call is answered with ok and the state is
%%   left unchanged.
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. No
%%   conversion is needed, so the state is returned as it is.
%%--------------------------------------------------------------------
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
%%--------------------------------------------------------------------
%% Func: handle_info(Info, State) -> {noreply, State} |
%%                                   {noreply, State, Timeout} |
%%                                   {stop, Reason, State}
%% Description: Handling all non call/cast messages.
%%   keep_alive_tick - send a keep_alive message to the peer.
%%   rate_update     - update the rate and report it to etorrent_rate_mgr.
%%   timeout         - while the peer is unchoked, serve the next queued
%%                     request; while it is choked do nothing, except for
%%                     a garbage collection when no piece is cached.
%%   anything else   - log the message and stop.
%%--------------------------------------------------------------------
handle_info(keep_alive_tick, S) ->
    send_message(keep_alive, S, 0);
handle_info(rate_update, S) ->
    Rate = etorrent_rate:update(S#state.rate, 0),
    %% NOTE(review): the final argument of this call was not visible in
    %% the reviewed text. 0 is assumed: nothing was sent on this tick,
    %% matching the 0 passed to etorrent_rate:update/2 above -- confirm.
    ok = etorrent_rate_mgr:send_rate(S#state.torrent_id,
                                     S#state.parent,
                                     Rate#peer_rate.rate,
                                     0),
    {noreply, S#state { rate = Rate }};
handle_info(timeout, S)
  when S#state.choke =:= true andalso S#state.piece_cache =:= none ->
    %% Idle and choked with nothing cached: take the chance to collect.
    garbage_collect(),
    {noreply, S};
handle_info(timeout, S) when S#state.choke =:= true ->
    {noreply, S};
handle_info(timeout, S) when S#state.choke =:= false ->
    case queue:out(S#state.request_queue) of
        {empty, _} ->
            {noreply, S};
        {{value, {Index, Offset, Len}}, NewQ} ->
            send_piece(Index, Offset, Len, S#state { request_queue = NewQ })
    end;
handle_info(Msg, S) ->
    error_logger:info_report([got_unknown_message, Msg, S]),
    %% gen_server requires the state in a stop tuple; a two-element
    %% {stop, Reason} would crash the process with bad_return_value.
    {stop, {unknown_msg, Msg}, S}.
%%--------------------------------------------------------------------
%% Func: handle_cast(Msg, State) -> {noreply, State, Timeout} |
%%                                  {stop, Reason, State}
%% Description: Handling cast messages. Every clause that does not
%%   stop the process returns a timeout of 0, so that a timeout message
%%   follows as soon as the mailbox is empty and handle_info/2 can
%%   serve the next queued request.
%%
%%   choke / unchoke / interested / not_interested only send a message
%%   when they actually change the current state. remote_request and
%%   cancel_piece maintain the queue of chunks the peer asked for;
%%   requests arriving while the peer is choked are dropped.
%%--------------------------------------------------------------------
handle_cast(choke, S) when S#state.choke =:= true ->
    {noreply, S, 0};
handle_cast(choke, S) when S#state.choke =:= false ->
    ok = etorrent_rate_mgr:local_choke(S#state.torrent_id, S#state.parent),
    %% The cached piece is dropped together with the choke.
    send_message(choke, S#state{choke = true, piece_cache = none});
handle_cast(unchoke, S) when S#state.choke =:= false ->
    {noreply, S, 0};
handle_cast(unchoke, S) when S#state.choke =:= true ->
    ok = etorrent_rate_mgr:local_unchoke(S#state.torrent_id, S#state.parent),
    %% Start from an empty request queue after every unchoke.
    send_message(unchoke, S#state{choke = false,
                                  request_queue = queue:new()});
handle_cast(check_choke, S) when S#state.choke =:= true ->
    {noreply, S, 0};
handle_cast(check_choke, S) when S#state.choke =:= false ->
    ok = etorrent_choker:perform_rechoke(),
    {noreply, S, 0};
handle_cast({bitfield, BF}, S) ->
    send_message({bitfield, BF}, S);
handle_cast(not_interested, S) when S#state.interested =:= false ->
    {noreply, S, 0};
handle_cast(not_interested, S) when S#state.interested =:= true ->
    send_message(not_interested, S#state { interested = false });
handle_cast(interested, S) when S#state.interested =:= true ->
    {noreply, S, 0};
handle_cast(interested, S) when S#state.interested =:= false ->
    send_message(interested, S#state { interested = true });
handle_cast({have, Pn}, S) ->
    send_message({have, Pn}, S);
handle_cast({local_request, {Index, Offset, Size}}, S) ->
    send_message({request, Index, Offset, Size}, S);
handle_cast({remote_request, _Index, _Offset, _Len}, S)
  when S#state.choke =:= true ->
    {noreply, S, 0};
handle_cast({remote_request, Index, Offset, Len}, S)
  when S#state.choke =:= false ->
    %% Refuse the request once the queue already holds ?MAX_REQUESTS
    %% entries, so the queue never grows beyond the documented maximum.
    case queue:len(S#state.request_queue) >= ?MAX_REQUESTS of
        true ->
            {stop, max_queue_len_exceeded, S};
        false ->
            NQ = queue:in({Index, Offset, Len}, S#state.request_queue),
            {noreply, S#state{request_queue = NQ}, 0}
    end;
handle_cast({cancel_piece, Index, OffSet, Len}, S) ->
    NQ = etorrent_utils:queue_remove({Index, OffSet, Len},
                                     S#state.request_queue),
    {noreply, S#state{request_queue = NQ}, 0};
handle_cast(stop, S) ->
    {stop, normal, S}.
%%--------------------------------------------------------------------
%% Func: terminate(Reason, State) -> ok
%% Description: Cancel the keep-alive and rate-update interval timers
%%   that init/1 started, so no further ticks are sent to this process.
%%   NOTE(review): an earlier comment here said the recv pair should be
%%   informed on normal termination; no such notification is visible
%%   in this function -- confirm whether it happens elsewhere.
%%--------------------------------------------------------------------
terminate(_Reason, S) ->
    timer:cancel(S#state.timer),
    timer:cancel(S#state.rate_timer),
    ok.
241 %%--------------------------------------------------------------------
242 %%% Internal functions
243 %%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Function: send_piece_message/3
%% Description: Send the piece message Msg and handle an eventual
%%   connection close gracefully. Delegates to send_message/3 so that
%%   piece messages get the same error handling as every other
%%   message, including {error, ebadf}, which this function previously
%%   did not match.
%% Returns: {noreply, State, Timeout} | {stop, normal, State}
%%--------------------------------------------------------------------
send_piece_message(Msg, S, Timeout) ->
    send_message(Msg, S, Timeout).
%%--------------------------------------------------------------------
%% Function: send_piece/4
%% Description: Send the chunk of Len bytes at Offset in piece Index to
%%   the peer. The chunk is cut out of the cached piece; if the cache
%%   is empty or holds another piece, the piece is loaded first and the
%%   send is retried. A chunk that does not fit inside the piece makes
%%   the binary match fail.
%%--------------------------------------------------------------------
send_piece(Index, Offset, Len, S) ->
    %% Reload the cache with the wanted piece and try again.
    Reload = fun() -> send_piece(Index, Offset, Len, load_piece(Index, S)) end,
    case S#state.piece_cache of
        {Cached, Piece} when Cached == Index ->
            <<_:Offset/binary, Data:Len/binary, _/binary>> = Piece,
            %% Track uploaded size for torrent (for the tracker)
            ok = etorrent_torrent:statechange(S#state.torrent_id,
                                              {add_upload, Len}),
            send_piece_message({piece, Index, Offset, Data}, S, 0);
        {_Other, _Stale} ->
            Reload();
        none ->
            Reload()
    end.
%%--------------------------------------------------------------------
%% Function: load_piece/2
%% Description: Read piece Index through the file system process and
%%   return the state with that piece as the new cache entry. A failed
%%   read fails the {ok, Piece} match.
%%--------------------------------------------------------------------
load_piece(Index, S) ->
    FsPid = S#state.file_system_pid,
    {ok, Piece} = etorrent_fs:read_piece(FsPid, Index),
    S#state{piece_cache = {Index, Piece}}.
%%--------------------------------------------------------------------
%% Function: send_message/2
%% Description: Send Msg to the peer and ask for an immediate timeout
%%   afterwards. Same as send_message(Msg, S, 0).
%%--------------------------------------------------------------------
send_message(Msg, S) ->
    ImmediateTimeout = 0,
    send_message(Msg, S, ImmediateTimeout).
%%--------------------------------------------------------------------
%% Function: send_message/3
%% Description: Send Msg on the socket and report the resulting rate
%%   and the amount sent to etorrent_rate_mgr. A closed or invalid
%%   (ebadf) socket stops the process with reason normal.
%% Returns: {noreply, State, Timeout} | {stop, normal, State}
%%--------------------------------------------------------------------
send_message(Msg, S, Timeout) ->
    Socket = S#state.socket,
    Result = etorrent_peer_communication:send_message(S#state.rate, Socket, Msg),
    case Result of
        {ok, NewRate, Amount} ->
            ok = etorrent_rate_mgr:send_rate(S#state.torrent_id,
                                             S#state.parent,
                                             NewRate#peer_rate.rate,
                                             Amount),
            {noreply, S#state{rate = NewRate}, Timeout};
        {{error, ebadf}, NewRate, _Amount} ->
            error_logger:info_report([caught_ebadf, Socket]),
            {stop, normal, S#state{rate = NewRate}};
        {{error, closed}, NewRate, _Amount} ->
            {stop, normal, S#state{rate = NewRate}}
    end.