Handle initial recv timeouts gracefully.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_t_peer_recv.erl
blob8945a45bf89400dffe28071e0157b2b6b1e80a4f
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_t_peer_recv.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Represents a peers receiving of data
6 %%%
7 %%% Created : 19 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_t_peer_recv).
12 -behaviour(gen_server).
14 -include("etorrent_mnesia_table.hrl").
15 -include("etorrent_rate.hrl").
17 %% API
18 -export([start_link/6, connect/3, choke/1, unchoke/1, interested/1,
19 send_have_piece/2, complete_handshake/4, endgame_got_chunk/2,
20 queue_pieces/1, stop/1]).
22 %% gen_server callbacks
23 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
24 terminate/2, code_change/3]).
%% Per-connection state of the receiving half of a peer.
-record(state, {
          %% Peer id reported by the remote side during the handshake.
          remote_peer_id = none,
          %% Our own peer id, handed in through start_link/6.
          local_peer_id = none,
          %% Info hash of the torrent, used in the handshake.
          info_hash = none,

          %% Number of pieces the remote peer is still missing; starts
          %% at the torrent's piece count and is decremented as the
          %% peer announces pieces via have/bitfield messages.
          pieces_left,
          %% True once pieces_left has reached zero.
          seeder = false,
          %% Socket to the remote peer, or none before the connection
          %% setup has completed.
          tcp_socket = none,

          %% Does the remote peer currently choke us?
          remote_choked = true,

          %% Have we told the remote peer that we are interested?
          local_interested = false,

          %% gb_tree mapping {Index, Offset, Len} -> Ops for every
          %% chunk we have requested from the remote peer.
          remote_request_set = none,

          %% gb_set of the piece numbers the remote peer has.
          piece_set = none,
          %% NOTE(review): not referenced by any code visible in this
          %% file.
          piece_request = [],

          %% Wire decoding state: none when between messages, a
          %% binary holding a partial 4-byte length header, or the
          %% integer count of payload bytes still missing.
          packet_left = none,
          %% Payload fragments of the current message, newest first.
          packet_iolist = [],

          %% Are we in endgame mode?
          endgame = false,

          %% Handle passed to etorrent_t_peer_sup:add_sender/5.
          parent = none,

          %% Process used for writing chunks and checking pieces.
          file_system_pid = none,
          %% The sender process paired with this receiver.
          send_pid = none,

          %% Receive rate bookkeeping (see etorrent_rate).
          rate = none,
          %% Timer reference of the periodic rate_update message.
          rate_timer = none,
          %% Id of the torrent this peer belongs to.
          torrent_id = none
         }).
%% Timeout, in milliseconds, for establishing the outgoing TCP connection.
-define(DEFAULT_CONNECT_TIMEOUT, 120000).
%% Default size of a chunk in bytes. All clients use this.
-define(DEFAULT_CHUNK_SIZE, 16384).
%% Number of outstanding chunk requests to fill the queue up to.
-define(HIGH_WATERMARK, 15).
%% Refill the request queue once it holds no more than this many entries.
-define(LOW_WATERMARK, 5).
61 %%====================================================================
62 %% API
63 %%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(LocalPeerId, InfoHash, FilesystemPid, Id,
%%                      Parent, {IP, Port}) ->
%%               {ok,Pid} | ignore | {error,Error}
%% Description: Starts the receiver process for the peer at {IP, Port}.
%%   The arguments are forwarded unchanged, as a list, to init/1.
%%--------------------------------------------------------------------
-spec start_link(term(), term(), term(), term(), term(),
                 {term(), term()}) ->
          {ok, pid()} | ignore | {error, term()}.
start_link(LocalPeerId, InfoHash, FilesystemPid, Id, Parent,
           {IP, Port}) ->
    InitArgs = [LocalPeerId, InfoHash, FilesystemPid, Id, Parent,
                {IP, Port}],
    gen_server:start_link(?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Function: stop(Pid) -> ok
%% Args: Pid ::= pid()
%% Description: Asynchronously ask the receiver process to shut down
%%   with reason normal.
%%--------------------------------------------------------------------
-spec stop(pid()) -> ok.
stop(Pid) ->
    Request = stop,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: connect(Pid, IP, Port) -> ok
%% Description: Ask the receiver to open a connection to IP:Port and
%%   handshake with the peer there. The connect is deliberately not
%%   performed in init/1: it may take a considerable amount of time,
%%   and casting it lets many freshly spawned peer processes attempt
%%   their connects in parallel.
%%--------------------------------------------------------------------
-spec connect(pid(), term(), term()) -> ok.
connect(Pid, IP, Port) ->
    Request = {connect, IP, Port},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: choke(Pid) -> ok
%% Description: Ask the receiver to choke its peer; the request is
%%   relayed to the associated sender process.
%%--------------------------------------------------------------------
-spec choke(pid()) -> ok.
choke(Pid) ->
    Request = choke,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: unchoke(Pid) -> ok
%% Description: Ask the receiver to unchoke its peer; the request is
%%   relayed to the associated sender process.
%%--------------------------------------------------------------------
-spec unchoke(pid()) -> ok.
unchoke(Pid) ->
    Request = unchoke,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: interested(Pid) -> ok
%% Description: Ask the receiver to tell its peer that we are
%%   interested in its pieces.
%%--------------------------------------------------------------------
-spec interested(pid()) -> ok.
interested(Pid) ->
    Request = interested,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: send_have_piece(Pid, PieceNumber) -> ok
%% Description: Ask the receiver to announce to its peer that we have
%%   just received piece PieceNumber.
%%--------------------------------------------------------------------
-spec send_have_piece(pid(), term()) -> ok.
send_have_piece(Pid, PieceNumber) ->
    Request = {send_have_piece, PieceNumber},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: endgame_got_chunk(Pid, Chunk) -> ok
%% Args: Chunk ::= {Index, Offset, Len}
%% Description: Tell the receiver that Chunk has been downloaded, so
%%   it can drop its own request for that chunk.
%%--------------------------------------------------------------------
-spec endgame_got_chunk(pid(), term()) -> ok.
endgame_got_chunk(Pid, Chunk) ->
    Request = {endgame_got_chunk, Chunk},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: complete_handshake(Pid, ReservedBytes, Socket, PeerId) -> ok
%% Description: Hand an incoming connection over to the receiver so it
%%   can complete the handshake that the remote client initiated.
%%--------------------------------------------------------------------
-spec complete_handshake(pid(), term(), term(), term()) -> ok.
complete_handshake(Pid, ReservedBytes, Socket, PeerId) ->
    Request = {complete_handshake, ReservedBytes, Socket, PeerId},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: queue_pieces(Pid) -> ok
%% Description: Ask the receiver to try to queue up more chunk
%%   requests at its peer.
%%--------------------------------------------------------------------
-spec queue_pieces(pid()) -> ok.
queue_pieces(Pid) ->
    Request = queue_pieces,
    gen_server:cast(Pid, Request).
138 %%====================================================================
139 %% gen_server callbacks
140 %%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State}
%% Args: [LocalPeerId, InfoHash, FilesystemPid, Id, Parent, {IP, Port}]
%% Description: Initiates the server. Traps exits, starts the periodic
%%   rate_update timer, registers the peer through etorrent_peer:new/5
%%   and builds the initial state. No socket is opened here; that
%%   happens on a later connect or complete_handshake cast.
%%--------------------------------------------------------------------
init([LocalPeerId, InfoHash, FilesystemPid, Id, Parent, {IP, Port}]) ->
    process_flag(trap_exit, true),
    {ok, RateTimer} = timer:send_interval(?RATE_UPDATE, self(), rate_update),
    %% TODO: Update the leeching state to seeding when peer finished torrent.
    ok = etorrent_peer:new(IP, Port, Id, self(), leeching),
    [Torrent] = etorrent_torrent:select(Id),
    InitialRate = etorrent_rate:init(?RATE_FUDGE),
    State = #state{local_peer_id      = LocalPeerId,
                   info_hash          = InfoHash,
                   torrent_id         = Id,
                   parent             = Parent,
                   file_system_pid    = FilesystemPid,
                   pieces_left        = Torrent#torrent.pieces,
                   piece_set          = gb_sets:new(),
                   remote_request_set = gb_trees:empty(),
                   rate               = InitialRate,
                   rate_timer         = RateTimer},
    {ok, State}.
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, ok, State, 0}
%% Description: Handling call messages. No synchronous requests are
%%   defined; every call is answered with ok. The zero timeout makes
%%   the gen_server deliver a timeout message right away.
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
    {reply, ok, State, 0}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State, 0} |
%%                                      {stop, normal, State}
%% Description: Handling cast messages. Every non-stopping clause
%%   returns a zero timeout so that the socket polling in
%%   handle_info(timeout, ...) resumes right afterwards.
%%--------------------------------------------------------------------
handle_cast({connect, IP, Port}, S) ->
    SocketOpts = [binary, {active, false}],
    case gen_tcp:connect(IP, Port, SocketOpts, ?DEFAULT_CONNECT_TIMEOUT) of
        {error, _Reason} ->
            {stop, normal, S};
        {ok, Socket} ->
            outgoing_handshake(Socket, S)
    end;
handle_cast({complete_handshake, _ReservedBytes, Socket, RemotePeerId}, S) ->
    InfoHash = S#state.info_hash,
    LocalPeerId = S#state.local_peer_id,
    case etorrent_peer_communication:complete_handshake(Socket,
                                                        InfoHash,
                                                        LocalPeerId) of
        {error, stop} ->
            {stop, normal, S};
        ok ->
            complete_connection_setup(
              S#state{tcp_socket = Socket,
                      remote_peer_id = RemotePeerId})
    end;
handle_cast(choke, S) ->
    etorrent_t_peer_send:choke(S#state.send_pid),
    {noreply, S, 0};
handle_cast(unchoke, S) ->
    etorrent_t_peer_send:unchoke(S#state.send_pid),
    {noreply, S, 0};
handle_cast(interested, S) ->
    Interested = statechange_interested(S, true),
    {noreply, Interested, 0};
handle_cast({send_have_piece, PieceNumber}, S) ->
    etorrent_t_peer_send:send_have_piece(S#state.send_pid, PieceNumber),
    {noreply, S, 0};
handle_cast({endgame_got_chunk, Chunk}, S) ->
    {noreply, handle_endgame_got_chunk(Chunk, S), 0};
handle_cast(queue_pieces, S) ->
    {ok, Queued} = try_to_queue_up_pieces(S),
    {noreply, Queued, 0};
handle_cast(stop, S) ->
    {stop, normal, S};
handle_cast(_Msg, State) ->
    {noreply, State, 0}.

%% Run our side of the handshake on a freshly connected Socket. The
%% process stops normally if the handshake fails or if the remote peer
%% id equals our own; otherwise the connection setup is completed.
outgoing_handshake(Socket, S) ->
    LocalPeerId = S#state.local_peer_id,
    case etorrent_peer_communication:initiate_handshake(Socket,
                                                        LocalPeerId,
                                                        S#state.info_hash) of
        {ok, _ReservedBytes, PeerId} when PeerId == LocalPeerId ->
            {stop, normal, S};
        {ok, _ReservedBytes, PeerId} ->
            complete_connection_setup(S#state{tcp_socket = Socket,
                                              remote_peer_id = PeerId});
        {error, _} ->
            {stop, normal, S}
    end.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State, Timeout} |
%%                                       {stop, normal, State}
%% Description: Handling all non call/cast messages. The timeout
%%   message drives reading: each one performs a single passive recv
%%   that waits at most three seconds. rate_update refreshes the
%%   receive rate and reports it to etorrent_rate_mgr.
%%--------------------------------------------------------------------
handle_info(timeout, #state{tcp_socket = none} = S) ->
    %% No socket yet, so there is nothing to read. Look again later.
    {noreply, S, 3000};
handle_info(timeout, S) ->
    Socket = S#state.tcp_socket,
    case gen_tcp:recv(Socket, 0, 3000) of
        {ok, Packet} ->
            handle_read_from_socket(S, Packet);
        {error, Gone} when Gone =:= closed; Gone =:= ebadf ->
            {stop, normal, S};
        {error, etimedout} ->
            {noreply, S, 0};
        {error, timeout} when S#state.remote_choked =:= true ->
            {noreply, S, 0};
        {error, timeout} when S#state.remote_choked =:= false ->
            %% Nothing arrived although we are unchoked: try to top up
            %% the request queue.
            {ok, Queued} = try_to_queue_up_pieces(S),
            {noreply, Queued, 0}
    end;
handle_info(rate_update, S) ->
    NewRate = etorrent_rate:update(S#state.rate, 0),
    ok = etorrent_rate_mgr:recv_rate(S#state.torrent_id,
                                     self(),
                                     NewRate#peer_rate.rate,
                                     0),
    {noreply, S#state{rate = NewRate}, 0};
handle_info(_Info, State) ->
    {noreply, State, 0}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: This function is called by a gen_server when it is about to
%% terminate. It removes the peer via etorrent_peer:delete/1, puts
%% outstanding chunks back, closes the socket if one was opened and
%% reports any reason other than normal or shutdown.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(Reason, S) ->
    etorrent_peer:delete(self()),
    _NS = unqueue_all_pieces(S),
    case S#state.tcp_socket of
        none ->
            %% The connection was never set up; nothing to close.
            ok;
        Sock ->
            gen_tcp:close(Sock)
    end,
    case Reason of
        normal -> ok;
        shutdown -> ok;
        _ -> error_logger:info_report([reason_for_termination, Reason])
    end,
    ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. The state
%%   needs no conversion and is returned as it is.
%%--------------------------------------------------------------------
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
298 %%--------------------------------------------------------------------
299 %%% Internal functions
300 %%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Func: handle_message(Msg, State) -> {ok, NewState} | {stop, Reason, NewState}
%% Description: Process an incoming message Msg from the wire. Return either
%%   {ok, S} if the processing was ok, or {stop, Reason, S} in case of an error.
%%
%%   A peer that announces an invalid piece, or sends a bitfield after
%%   we already know some of its pieces, is handed to
%%   etorrent_bad_peer_mgr and the connection is stopped.
%%--------------------------------------------------------------------
handle_message(keep_alive, S) ->
    {ok, S};
handle_message(choke, S) ->
    ok = etorrent_rate_mgr:choke(S#state.torrent_id, self()),
    %% Put the chunks we requested back so that they can be picked again.
    NS = unqueue_all_pieces(S),
    {ok, NS#state{remote_choked = true}};
handle_message(unchoke, S) ->
    ok = etorrent_rate_mgr:unchoke(S#state.torrent_id, self()),
    try_to_queue_up_pieces(S#state{remote_choked = false});
handle_message(interested, S) ->
    ok = etorrent_rate_mgr:interested(S#state.torrent_id, self()),
    ok = etorrent_t_peer_send:check_choke(S#state.send_pid),
    {ok, S};
handle_message(not_interested, S) ->
    ok = etorrent_rate_mgr:not_interested(S#state.torrent_id, self()),
    ok = etorrent_t_peer_send:check_choke(S#state.send_pid),
    {ok, S};
handle_message({request, Index, Offset, Len}, S) ->
    etorrent_t_peer_send:remote_request(S#state.send_pid, Index, Offset, Len),
    {ok, S};
handle_message({cancel, Index, Offset, Len}, S) ->
    etorrent_t_peer_send:cancel(S#state.send_pid, Index, Offset, Len),
    {ok, S};
handle_message({have, PieceNum}, S) ->
    case etorrent_piece_mgr:valid(S#state.torrent_id, PieceNum) of
        true ->
            PieceSet = gb_sets:add_element(PieceNum, S#state.piece_set),
            Left = S#state.pieces_left - 1,
            ok = note_pieces_left(Left),
            NS = S#state{piece_set = PieceSet,
                         pieces_left = Left,
                         seeder = Left == 0},
            %% Continue with NS, not S: the updated piece set must be
            %% kept, and it is what the chunk picking has to look at.
            case etorrent_piece_mgr:interesting(NS#state.torrent_id,
                                                PieceNum) of
                true when NS#state.local_interested =:= true ->
                    try_to_queue_up_pieces(NS);
                true when NS#state.local_interested =:= false ->
                    try_to_queue_up_pieces(statechange_interested(NS, true));
                false ->
                    {ok, NS}
            end;
        false ->
            drop_bad_peer(S)
    end;
handle_message({bitfield, BitField}, S) ->
    case gb_sets:size(S#state.piece_set) of
        0 ->
            Size = etorrent_torrent:num_pieces(S#state.torrent_id),
            {ok, PieceSet} =
                etorrent_peer_communication:destruct_bitfield(Size, BitField),
            Left = S#state.pieces_left - gb_sets:size(PieceSet),
            ok = note_pieces_left(Left),
            NS = S#state{piece_set = PieceSet,
                         pieces_left = Left,
                         seeder = Left == 0},
            case etorrent_piece_mgr:check_interest(S#state.torrent_id,
                                                   PieceSet) of
                interested ->
                    {ok, statechange_interested(NS, true)};
                not_interested ->
                    {ok, NS};
                invalid_piece ->
                    {stop, {invalid_piece_2, S#state.remote_peer_id}, S}
            end;
        N when is_integer(N) ->
            %% A bitfield although we already know pieces of this peer.
            %% This is a bad peer. Kill him!
            drop_bad_peer(S)
    end;
handle_message({piece, Index, Offset, Data}, S) ->
    {ok, NS} = handle_got_chunk(Index, Offset, Data, byte_size(Data), S),
    try_to_queue_up_pieces(NS);
handle_message(Unknown, S) ->
    {stop, {unknown_message, Unknown}, S}.

%% Report the peer as a seeder once it is missing no pieces.
note_pieces_left(0) ->
    ok = etorrent_peer:statechange(self(), seeder);
note_pieces_left(_Left) ->
    ok.

%% Register the remote peer with the bad peer manager and stop the
%% connection with reason normal.
drop_bad_peer(S) ->
    {ok, {IP, Port}} = inet:peername(S#state.tcp_socket),
    etorrent_bad_peer_mgr:enter_peer(IP, Port, S#state.remote_peer_id),
    {stop, normal, S}.
%%--------------------------------------------------------------------
%% Func: handle_endgame_got_chunk({Index, Offset, Len}, S) -> State
%% Description: Some other peer just downloaded {Index, Offset, Len} so try
%%   not to download it here if we can avoid it. If the chunk is in
%%   our request set, the request is cancelled and the entry removed.
%%   In both cases the chunk manager is told to remove the chunk for
%%   this peer's sender.
%%--------------------------------------------------------------------
handle_endgame_got_chunk({Index, Offset, Len}, S) ->
    case gb_trees:is_defined({Index, Offset, Len}, S#state.remote_request_set) of
        true ->
            %% Delete the element from the request set.
            RS = gb_trees:delete({Index, Offset, Len}, S#state.remote_request_set),
            etorrent_t_peer_send:cancel(S#state.send_pid,
                                        Index,
                                        Offset,
                                        Len),
            etorrent_chunk_mgr:endgame_remove_chunk(S#state.send_pid,
                                                    S#state.torrent_id,
                                                    {Index, Offset, Len}),
            S#state{remote_request_set = RS};
        false ->
            %% Not an element in the request queue, ignore
            etorrent_chunk_mgr:endgame_remove_chunk(S#state.send_pid,
                                                    S#state.torrent_id,
                                                    {Index, Offset, Len}),
            S
    end.
%%--------------------------------------------------------------------
%% Func: handle_got_chunk(Index, Offset, Data, Len, S) -> {ok, State}
%% Description: We just got some chunk data. If the chunk is one we
%%   requested, write it through the file system process, record it
%%   with the chunk manager (asking for a piece check once the piece
%%   is full) and remove it from the request set. In endgame mode the
%%   other peers are told about the chunk. Chunks we did not request
%%   are ignored.
%%--------------------------------------------------------------------
handle_got_chunk(Index, Offset, Data, Len, S) ->
    case gb_trees:lookup({Index, Offset, Len},
                         S#state.remote_request_set) of
        {value, Ops} ->
            ok = etorrent_fs:write_chunk(S#state.file_system_pid,
                                         {Index, Data, Ops}),
            case etorrent_chunk_mgr:store_chunk(S#state.torrent_id,
                                                Index,
                                                {Offset, Len},
                                                self()) of
                full ->
                    etorrent_fs:check_piece(S#state.file_system_pid, Index);
                ok ->
                    ok
            end,
            %% Tell other peers we got the chunk if in endgame
            case S#state.endgame of
                true ->
                    case etorrent_chunk_mgr:mark_fetched(S#state.torrent_id,
                                                         {Index, Offset, Len}) of
                        found ->
                            ok;
                        assigned ->
                            broadcast_got_chunk({Index, Offset, Len},
                                                S#state.torrent_id)
                    end;
                false ->
                    ok
            end,
            RS = gb_trees:delete_any({Index, Offset, Len},
                                     S#state.remote_request_set),
            {ok, S#state{remote_request_set = RS}};
        none ->
            %% Stray piece, we could try to get hold of it but for now we just
            %% throw it on the floor.
            {ok, S}
    end.
%%--------------------------------------------------------------------
%% Function: unqueue_all_pieces(State) -> State
%% Description: Give up every chunk we have requested from this peer:
%%   the chunks are put back through the chunk manager, all peers of
%%   the torrent are asked to queue up pieces again, and the request
%%   set in the returned state is empty.
%%--------------------------------------------------------------------
unqueue_all_pieces(S) ->
    %% Hand our chunks back first, so the peers notified below can
    %% pick them up.
    ok = etorrent_chunk_mgr:putback_chunks(self()),
    TorrentId = S#state.torrent_id,
    broadcast_queue_pieces(TorrentId),
    NoRequests = gb_trees:empty(),
    S#state{remote_request_set = NoRequests}.
%%--------------------------------------------------------------------
%% Function: try_to_queue_up_pieces(state()) -> {ok, state()}
%% Description: Try to queue up requests at the other end. Nothing is
%%   done while the remote peer chokes us, or while more than
%%   ?LOW_WATERMARK requests are outstanding. Otherwise the chunk
%%   manager is asked for enough chunks to reach ?HIGH_WATERMARK.
%%--------------------------------------------------------------------
try_to_queue_up_pieces(S) when S#state.remote_choked == true ->
    {ok, S};
try_to_queue_up_pieces(S) ->
    case gb_trees:size(S#state.remote_request_set) of
        N when N > ?LOW_WATERMARK ->
            {ok, S};
        %% Optimization: Only replenish pieces modulo some N
        N when is_integer(N) ->
            PiecesToQueue = ?HIGH_WATERMARK - N,
            case etorrent_chunk_mgr:pick_chunks(self(),
                                                S#state.torrent_id,
                                                S#state.piece_set,
                                                PiecesToQueue) of
                not_interested ->
                    {ok, statechange_interested(S, false)};
                none_eligible ->
                    {ok, S};
                {ok, Items} ->
                    queue_items(Items, S);
                {endgame, Items} ->
                    queue_items(Items, S#state{endgame = true})
            end
    end.
%%--------------------------------------------------------------------
%% Function: queue_items/2
%% Args:     ChunkList ::= [CompactChunk | ExplicitChunk]
%%           S         ::= #state
%%           CompactChunk ::= {PieceNumber, ChunkList}
%%           ExplicitChunk ::= {PieceNumber, Offset, Size, Ops}
%%           ChunkList   ::= [{Offset, Size, Ops}]
%%           PieceNumber, Offset, Size ::= integer()
%%           Ops ::= file_operations - described elsewhere.
%% Description: Send chunk messages for each chunk we decided to queue.
%%   also add these chunks to the piece request set. Chunks already
%%   present in the request set are not requested again.
%%--------------------------------------------------------------------
queue_items(ChunkList, S) ->
    RSet = queue_items(ChunkList, S#state.send_pid, S#state.remote_request_set),
    {ok, S#state{remote_request_set = RSet}}.

queue_items([], _SendPid, Tree) ->
    Tree;
queue_items([{Pn, Chunks} | Rest], SendPid, Tree) ->
    %% The fold must thread its accumulator through every chunk, so
    %% that a chunk which is already queued does not discard the
    %% requests recorded earlier in the same fold.
    NT = lists:foldl(
           fun({Offset, Size, Ops}, T) ->
                   request_chunk({Pn, Offset, Size}, Ops, SendPid, T)
           end,
           Tree,
           Chunks),
    queue_items(Rest, SendPid, NT);
queue_items([{Pn, Offset, Size, Ops} | Rest], SendPid, Tree) ->
    NT = request_chunk({Pn, Offset, Size}, Ops, SendPid, Tree),
    queue_items(Rest, SendPid, NT).

%% Request Chunk from the remote peer and record it, together with its
%% file operations Ops, in the request tree. A chunk that is already
%% in the tree is left alone and the tree is returned unchanged.
request_chunk(Chunk, Ops, SendPid, Tree) ->
    case gb_trees:is_defined(Chunk, Tree) of
        true ->
            Tree;
        false ->
            etorrent_t_peer_send:local_request(SendPid, Chunk),
            gb_trees:enter(Chunk, Ops, Tree)
    end.
%%--------------------------------------------------------------------
%% Function: complete_connection_setup(State) -> gen_server_reply()
%% Description: Do the bookkeeping needed once the handshake is done:
%%   * Start the send pid through the peer supervisor
%%   * Send off our bitfield
%%   The reply carries a zero timeout, which starts the read loop.
%%--------------------------------------------------------------------
complete_connection_setup(S) ->
    Parent = S#state.parent,
    Socket = S#state.tcp_socket,
    FsPid = S#state.file_system_pid,
    TorrentId = S#state.torrent_id,
    {ok, Sender} = etorrent_t_peer_sup:add_sender(Parent, Socket, FsPid,
                                                  TorrentId, self()),
    OurBitfield = etorrent_piece_mgr:bitfield(TorrentId),
    etorrent_t_peer_send:bitfield(Sender, OurBitfield),
    {noreply, S#state{send_pid = Sender}, 0}.
%%--------------------------------------------------------------------
%% Function: statechange_interested(State, What) -> State
%% Args: What ::= boolean()
%% Description: Record What as our interest in the remote peer and
%%   notify the sender process.
%% NOTE(review): an `interested' message is sent whatever What is,
%%   also when What is false - confirm that this is intended.
%%--------------------------------------------------------------------
statechange_interested(S, What) ->
    Sender = S#state.send_pid,
    etorrent_t_peer_send:interested(Sender),
    S#state{local_interested = What}.
%%--------------------------------------------------------------------
%% Function: handle_read_from_socket(State, Packet) -> gen_server_reply()
%%             Packet ::= binary()
%%             State  ::= #state()
%% Description: Packet came in. Handle it. Messages on the wire are a
%%   4-byte big-endian length followed by that many payload bytes.
%%   Decoding progress is kept in packet_left:
%%     none      - at a message boundary
%%     binary()  - the first bytes of a length header that was split
%%                 between two reads
%%     integer() - number of payload bytes still missing; the parts
%%                 read so far are in packet_iolist, newest first
%%   Every non-stopping reply carries a zero timeout so that the next
%%   read follows immediately.
%%--------------------------------------------------------------------
handle_read_from_socket(S, <<>>) ->
    {noreply, S, 0};
handle_read_from_socket(S, <<0:32/big-integer, Rest/binary>>)
  when S#state.packet_left =:= none ->
    %% A message of length zero carries no payload; skip it.
    handle_read_from_socket(S, Rest);
handle_read_from_socket(S, <<Left:32/big-integer, Rest/binary>>)
  when S#state.packet_left =:= none ->
    handle_read_from_socket(S#state{packet_left = Left,
                                    packet_iolist = []}, Rest);
handle_read_from_socket(S, Packet) when is_binary(S#state.packet_left) ->
    %% Glue the stashed start of the length header onto the new data
    %% and decode again from the message boundary.
    H = S#state.packet_left,
    handle_read_from_socket(S#state{packet_left = none},
                            <<H/binary, Packet/binary>>);
handle_read_from_socket(S, Packet)
  when byte_size(Packet) < 4, S#state.packet_left =:= none ->
    %% Only part of a length header has arrived. Stash it, and keep
    %% the read loop going with a zero timeout like every other path.
    {noreply, S#state{packet_left = Packet}, 0};
handle_read_from_socket(S, Packet)
  when is_integer(S#state.packet_left),
       byte_size(Packet) >= S#state.packet_left ->
    Left = S#state.packet_left,
    <<Data:Left/binary, Rest/binary>> = Packet,
    P = iolist_to_binary(lists:reverse([Data | S#state.packet_iolist])),
    {Msg, Rate, Amount} =
        etorrent_peer_communication:recv_message(S#state.rate, P),
    case Msg of
        {piece, _, _, _} ->
            ok = etorrent_rate_mgr:recv_rate(S#state.torrent_id,
                                             self(), Rate#peer_rate.rate,
                                             Amount, last_update);
        _Msg ->
            ok = etorrent_rate_mgr:recv_rate(S#state.torrent_id,
                                             self(), Rate#peer_rate.rate,
                                             Amount, normal)
    end,
    case handle_message(Msg, S#state{rate = Rate}) of
        {ok, NS} ->
            %% Back at a message boundary; carry on with what is left.
            handle_read_from_socket(NS#state{packet_left = none,
                                             packet_iolist = []},
                                    Rest);
        {stop, Err, NS} ->
            {stop, Err, NS}
    end;
handle_read_from_socket(S, Packet)
  when is_integer(S#state.packet_left),
       byte_size(Packet) < S#state.packet_left ->
    %% The message is still incomplete: keep the fragment and wait.
    {noreply, S#state{packet_iolist = [Packet | S#state.packet_iolist],
                      packet_left = S#state.packet_left - byte_size(Packet)},
     0}.
%% Ask every peer registered for TorrentId to try to queue up more
%% chunk requests.
broadcast_queue_pieces(TorrentId) ->
    _ = [etorrent_t_peer_recv:queue_pieces(Peer#peer.pid)
         || Peer <- etorrent_peer:all(TorrentId)],
    ok.
%% Tell every peer registered for TorrentId that Chunk has been
%% downloaded.
broadcast_got_chunk(Chunk, TorrentId) ->
    _ = [etorrent_t_peer_recv:endgame_got_chunk(Receiver#peer.pid, Chunk)
         || Receiver <- etorrent_peer:all(TorrentId)],
    ok.