Handle HAVE_ALL and HAVE_NONE. Cleanup the BITFIELD message.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_t_peer_send.erl
blob29e8229c35f4ea7b019ce1fb63a8380788dbcb52
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_t_peer_send.erl
3 %%% Author : Jesper Louis Andersen
4 %%% License : See COPYING
5 %%% Description : Send out events to a foreign socket.
6 %%%
7 %%% Created : 27 Jan 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_t_peer_send).
12 -include("etorrent_mnesia_table.hrl").
13 -include("etorrent_rate.hrl").
15 -behaviour(gen_server).
17 %% API
18 -export([start_link/4,
19 stop/1,
20 check_choke/1,
22 local_request/2, remote_request/4, cancel/4,
23 choke/1, unchoke/1, have/2,
25 not_interested/1, interested/1,
26 bitfield/2]).
28 %% gen_server callbacks
29 -export([init/1, handle_info/2, terminate/2, code_change/3,
30 handle_call/3, handle_cast/2]).
32 -record(state, {socket = none,
33 request_queue = none,
35 rate = none,
36 choke = true,
37 interested = false, % Are we interested in the peer?
38 timer = none,
39 rate_timer = none,
40 parent = none,
41 piece_cache = none,
42 torrent_id = none,
43 file_system_pid = none}).
45 -define(DEFAULT_KEEP_ALIVE_INTERVAL, 120*1000). % From proto. spec.
46 -define(MAX_REQUESTS, 1024). % Maximal number of requests a peer may make.
47 %%====================================================================
48 %% API
49 %%====================================================================
50 start_link(Socket, FilesystemPid, TorrentId, RecvPid) ->
51 gen_server:start_link(?MODULE,
52 [Socket, FilesystemPid, TorrentId, RecvPid], []).
54 %%--------------------------------------------------------------------
55 %% Func: remote_request(Pid, Index, Offset, Len)
56 %% Description: The remote end (ie, the peer) requested a chunk
57 %% {Index, Offset, Len}
58 %%--------------------------------------------------------------------
59 remote_request(Pid, Index, Offset, Len) ->
60 gen_server:cast(Pid, {remote_request, Index, Offset, Len}).
62 %%--------------------------------------------------------------------
63 %% Func: local_request(Pid, Index, Offset, Len)
64 %% Description: We request a piece from the peer: {Index, Offset, Len}
65 %%--------------------------------------------------------------------
66 local_request(Pid, {Index, Offset, Size}) ->
67 gen_server:cast(Pid, {local_request, {Index, Offset, Size}}).
69 %%--------------------------------------------------------------------
70 %% Func: cancel(Pid, Index, Offset, Len)
71 %% Description: Cancel the {Index, Offset, Len} at the peer.
72 %%--------------------------------------------------------------------
73 cancel(Pid, Index, Offset, Len) ->
74 gen_server:cast(Pid, {cancel_piece, Index, Offset, Len}).
76 %%--------------------------------------------------------------------
77 %% Func: choke(Pid)
78 %% Description: Choke the peer.
79 %%--------------------------------------------------------------------
80 choke(Pid) ->
81 gen_server:cast(Pid, choke).
83 %%--------------------------------------------------------------------
84 %% Func: unchoke(Pid)
85 %% Description: Unchoke the peer.
86 %%--------------------------------------------------------------------
87 unchoke(Pid) ->
88 gen_server:cast(Pid, unchoke).
90 check_choke(Pid) -> gen_server:cast(Pid, check_choke).
92 %%--------------------------------------------------------------------
93 %% Func: not_interested(Pid)
94 %% Description: Tell the peer we are not interested in him anymore
95 %%--------------------------------------------------------------------
96 not_interested(Pid) ->
97 gen_server:cast(Pid, not_interested).
99 interested(Pid) ->
100 gen_server:cast(Pid, interested).
102 %%--------------------------------------------------------------------
103 %% Func: have(Pid, PieceNumber)
104 %% Description: Tell the peer we have the piece PieceNumber
105 %%--------------------------------------------------------------------
106 have(Pid, PieceNumber) ->
107 gen_server:cast(Pid, {have, PieceNumber}).
109 bitfield(Pid, BitField) ->
110 gen_server:cast(Pid, {bitfield, BitField}).
113 %%--------------------------------------------------------------------
114 %% Func: stop(Pid)
115 %% Description: Tell the send process to stop the communication.
116 %%--------------------------------------------------------------------
117 stop(Pid) ->
118 gen_server:cast(Pid, stop).
120 %%====================================================================
121 %% gen_server callbacks
122 %%====================================================================
123 init([Socket, FilesystemPid, TorrentId, Parent]) ->
124 process_flag(trap_exit, true),
125 {ok, TRef} = timer:send_interval(?DEFAULT_KEEP_ALIVE_INTERVAL, self(), keep_alive_tick),
126 {ok, Tref2} = timer:send_interval(?RATE_UPDATE, self(), rate_update),
127 {ok,
128 #state{socket = Socket,
129 timer = TRef,
130 rate_timer = Tref2,
131 request_queue = queue:new(),
132 rate = etorrent_rate:init(?RATE_FUDGE),
133 parent = Parent,
134 torrent_id = TorrentId,
135 file_system_pid = FilesystemPid},
138 %%--------------------------------------------------------------------
139 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
140 %% {reply, Reply, State, Timeout} |
141 %% {noreply, State} |
142 %% {noreply, State, Timeout} |
143 %% {stop, Reason, Reply, State} |
144 %% {stop, Reason, State}
145 %% Description: Handling call messages
146 %%--------------------------------------------------------------------
147 handle_call(_Request, _From, State) ->
148 Reply = ok,
149 {reply, Reply, State}.
151 %%--------------------------------------------------------------------
152 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
153 %% Description: Convert process state when code is changed
154 %%--------------------------------------------------------------------
155 code_change(_OldVsn, State, _Extra) ->
156 {ok, State}.
158 handle_info(keep_alive_tick, S) ->
159 send_message(keep_alive, S, 0);
160 handle_info(rate_update, S) ->
161 Rate = etorrent_rate:update(S#state.rate, 0),
162 ok = etorrent_rate_mgr:send_rate(S#state.torrent_id,
163 S#state.parent,
164 Rate#peer_rate.rate,
166 {noreply, S#state { rate = Rate }};
167 handle_info(timeout, S)
168 when S#state.choke =:= true andalso S#state.piece_cache =:= none ->
169 garbage_collect(),
170 {noreply, S};
171 handle_info(timeout, S) when S#state.choke =:= true ->
172 {noreply, S};
173 handle_info(timeout, S) when S#state.choke =:= false ->
174 case queue:out(S#state.request_queue) of
175 {empty, _} ->
176 {noreply, S};
177 {{value, {Index, Offset, Len}}, NewQ} ->
178 send_piece(Index, Offset, Len, S#state { request_queue = NewQ } )
179 end;
180 handle_info(Msg, S) ->
181 error_logger:info_report([got_unknown_message, Msg, S]),
182 {stop, {unknown_msg, Msg}}.
184 handle_cast(choke, S) when S#state.choke == true ->
185 {noreply, S, 0};
186 handle_cast(choke, S) when S#state.choke == false ->
187 ok = etorrent_rate_mgr:local_choke(S#state.torrent_id, S#state.parent),
188 send_message(choke, S#state{choke = true, piece_cache = none});
189 handle_cast(unchoke, S) when S#state.choke == false ->
190 {noreply, S, 0};
191 handle_cast(unchoke, S) when S#state.choke == true ->
192 ok = etorrent_rate_mgr:local_unchoke(S#state.torrent_id, S#state.parent),
193 send_message(unchoke, S#state{choke = false,
194 request_queue = queue:new()});
195 handle_cast(check_choke, S) when S#state.choke =:= true ->
196 {noreply, S, 0};
197 handle_cast(check_choke, S) when S#state.choke =:= false ->
198 ok = etorrent_choker:perform_rechoke(),
199 {noreply, S, 0};
200 handle_cast({bitfield, BF}, S) ->
201 send_message({bitfield, BF}, S);
202 handle_cast(not_interested, S) when S#state.interested =:= false ->
203 {noreply, S, 0};
204 handle_cast(not_interested, S) when S#state.interested =:= true ->
205 send_message(not_interested, S#state { interested = false });
206 handle_cast(interested, S) when S#state.interested =:= true ->
207 {noreply, S, 0};
208 handle_cast(interested, S) when S#state.interested =:= false ->
209 send_message(interested, S#state { interested = true });
210 handle_cast({have, Pn}, S) ->
211 send_message({have, Pn}, S);
212 handle_cast({local_request, {Index, Offset, Size}}, S) ->
213 send_message({request, Index, Offset, Size}, S);
214 handle_cast({remote_request, _Index, _Offset, _Len}, S)
215 when S#state.choke == true ->
216 {noreply, S, 0};
217 handle_cast({remote_request, Index, Offset, Len}, S)
218 when S#state.choke == false ->
219 Requests = queue:len(S#state.request_queue),
220 case Requests > ?MAX_REQUESTS of
221 true ->
222 {stop, max_queue_len_exceeded, S};
223 false ->
224 NQ = queue:in({Index, Offset, Len}, S#state.request_queue),
225 {noreply, S#state{request_queue = NQ}, 0}
226 end;
227 handle_cast({cancel_piece, Index, OffSet, Len}, S) ->
228 NQ = etorrent_utils:queue_remove({Index, OffSet, Len}, S#state.request_queue),
229 {noreply, S#state{request_queue = NQ}, 0};
230 handle_cast(stop, S) ->
231 {stop, normal, S}.
234 %% Terminating normally means we should inform our recv pair
235 terminate(_Reason, S) ->
236 timer:cancel(S#state.timer),
237 timer:cancel(S#state.rate_timer),
240 %%--------------------------------------------------------------------
241 %%% Internal functions
242 %%--------------------------------------------------------------------
244 %%--------------------------------------------------------------------
245 %% Function: send_piece_message/2
246 %% Description: Send the message Msg and handle an eventual connection
247 %% close gracefully.
248 %%--------------------------------------------------------------------
249 send_piece_message(Msg, S, Timeout) ->
250 case etorrent_peer_communication:send_message(S#state.rate, S#state.socket, Msg) of
251 {ok, R, Amount} ->
252 ok = etorrent_rate_mgr:send_rate(S#state.torrent_id,
253 S#state.parent,
254 R#peer_rate.rate,
255 Amount),
256 {noreply, S#state { rate = R }, Timeout};
257 {{error, closed}, R, _Amount} ->
258 {stop, normal, S#state { rate = R}}
259 end.
261 send_piece(Index, Offset, Len, S) ->
262 case S#state.piece_cache of
263 {I, Binary} when I == Index ->
264 <<_Skip:Offset/binary, Data:Len/binary, _R/binary>> = Binary,
265 Msg = {piece, Index, Offset, Data},
266 %% Track uploaded size for torrent (for the tracker)
267 ok = etorrent_torrent:statechange(S#state.torrent_id,
268 {add_upload, Len}),
269 %% Track the amount uploaded by this peer.
271 send_piece_message(Msg, S, 0);
272 %% Update cache and try again...
273 {I, _Binary} when I /= Index ->
274 NS = load_piece(Index, S),
275 send_piece(Index, Offset, Len, NS);
276 none ->
277 NS = load_piece(Index, S),
278 send_piece(Index, Offset, Len, NS)
279 end.
281 load_piece(Index, S) ->
282 {ok, Piece} = etorrent_fs:read_piece(S#state.file_system_pid, Index),
283 S#state{piece_cache = {Index, Piece}}.
285 send_message(Msg, S) ->
286 send_message(Msg, S, 0).
288 send_message(Msg, S, Timeout) ->
289 case etorrent_peer_communication:send_message(S#state.rate, S#state.socket, Msg) of
290 {ok, Rate, Amount} ->
291 ok = etorrent_rate_mgr:send_rate(
292 S#state.torrent_id,
293 S#state.parent,
294 Rate#peer_rate.rate,
295 Amount),
296 {noreply, S#state { rate = Rate}, Timeout};
297 {{error, ebadf}, R, _Amount} ->
298 error_logger:info_report([caught_ebadf, S#state.socket]),
299 {stop, normal, S#state { rate = R}};
300 {{error, closed}, R, _Amount} ->
301 {stop, normal, S#state { rate = R}}
302 end.