Globalize the etorrent_t_peer_group_mgr and rename it to etorrent_choker while here.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_t_peer_recv.erl
blob95b9c7cb073d11b12fa5e93bd8264a9d2ab20bb3
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_t_peer_recv.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Represents a peer's receiving of data
6 %%%
7 %%% Created : 19 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_t_peer_recv).
12 -behaviour(gen_server).
14 -include("etorrent_mnesia_table.hrl").
15 -include("etorrent_rate.hrl").
17 %% API
18 -export([start_link/6, connect/3, choke/1, unchoke/1, interested/1,
19 send_have_piece/2, complete_handshake/4, endgame_got_chunk/2,
20 queue_pieces/1, stop/1]).
22 %% gen_server callbacks
23 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
24 terminate/2, code_change/3]).
%% Per-connection state of the receiving process.
-record(state,
        {
          %% Peer id reported by the remote side during the handshake.
          remote_peer_id = none,
          %% Our own peer id, used in the handshake.
          local_peer_id = none,
          %% Info hash handed to the handshake functions.
          info_hash = none,

          %% Socket to the remote peer; none until a connection is set up.
          tcp_socket = none,

          %% true until an unchoke message arrives from the wire, and
          %% again after a choke message.
          remote_choked = true,

          %% Last interest value recorded by statechange_interested/2.
          local_interested = false,

          %% gb_tree mapping {Index, Offset, Len} -> Ops for the chunks
          %% currently requested from the remote peer.
          remote_request_set = none,

          %% gb_set of piece numbers announced by the remote peer.
          piece_set = none,
          piece_request = [],

          %% none, a binary (incomplete 4 byte length header), or an
          %% integer (payload bytes still missing for the current packet).
          packet_left = none,
          %% Payload fragments of the current packet, newest first.
          packet_iolist = [],

          %% Are we in endgame mode?
          endgame = false,

          %% Passed to etorrent_t_peer_sup:add_sender/5.
          parent = none,

          file_system_pid = none,
          send_pid = none,

          %% Receive rate record and the interval timer driving rate_update.
          rate = none,
          rate_timer = none,
          torrent_id = none
        }).
%% Timeout in ms for the outgoing gen_tcp:connect/4 call.
-define(DEFAULT_CONNECT_TIMEOUT, 120000).
%% Default size for a chunk. All clients use this.
-define(DEFAULT_CHUNK_SIZE, 16384).
%% Number of outstanding chunk requests to fill the queue up to.
-define(HIGH_WATERMARK, 15).
%% The queue is refilled once it holds this many requests or fewer.
-define(LOW_WATERMARK, 5).
59 %%====================================================================
60 %% API
61 %%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(LocalPeerId, InfoHash, FilesystemPid, Id,
%%                      Parent, {IP, Port}) ->
%%               {ok, Pid} | ignore | {error, Error}
%% Description: Start a receiver process linked to the caller. The six
%%   arguments are handed to init/1 unchanged and in the same order.
%%--------------------------------------------------------------------
start_link(LocalPeerId, InfoHash, FilesystemPid, Id, Parent, {IP, Port}) ->
    InitArgs = [LocalPeerId, InfoHash, FilesystemPid, Id, Parent, {IP, Port}],
    gen_server:start_link(?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Function: stop(Pid) -> ok
%%   Pid ::= pid()
%% Description: Asynchronously ask the receiver process to terminate
%%   with reason normal.
%%--------------------------------------------------------------------
stop(Pid) ->
    Request = stop,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: connect(Pid, IP, Port) -> ok
%% Description: Ask the receiver process to open a connection to the
%%   peer at IP:Port. The connect is deliberately not performed in
%%   init/1: it may take a long time, and doing it from a cast lets
%%   many freshly spawned processes attempt their connects in parallel.
%%--------------------------------------------------------------------
connect(Pid, IP, Port) ->
    Request = {connect, IP, Port},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: choke(Pid) -> ok
%% Description: Ask the receiver process to choke its peer; the request
%%   is forwarded to the associated sender process.
%%--------------------------------------------------------------------
choke(Pid) ->
    Request = choke,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: unchoke(Pid) -> ok
%% Description: Ask the receiver process to unchoke its peer; the
%%   request is forwarded to the associated sender process.
%%--------------------------------------------------------------------
unchoke(Pid) ->
    Request = unchoke,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: interested(Pid) -> ok
%% Description: Ask the receiver process to tell its peer that we are
%%   interested.
%%--------------------------------------------------------------------
interested(Pid) ->
    Request = interested,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: send_have_piece(Pid, PieceNumber) -> ok
%% Description: Ask the receiver process to tell its peer that we have
%%   just received piece PieceNumber.
%%--------------------------------------------------------------------
send_have_piece(Pid, PieceNumber) ->
    Request = {send_have_piece, PieceNumber},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: endgame_got_chunk(Pid, Chunk) -> ok
%%   Chunk ::= {Index, Offset, Len}
%% Description: Inform the receiver process that Chunk has been
%%   obtained, so that it can drop its own request for it.
%%--------------------------------------------------------------------
endgame_got_chunk(Pid, Chunk) ->
    Request = {endgame_got_chunk, Chunk},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: complete_handshake(Pid, ReservedBytes, Socket, PeerId) -> ok
%% Description: Ask the receiver process to complete a handshake that
%%   was initiated by another client on Socket.
%%--------------------------------------------------------------------
complete_handshake(Pid, ReservedBytes, Socket, PeerId) ->
    Request = {complete_handshake, ReservedBytes, Socket, PeerId},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: queue_pieces(Pid) -> ok
%% Description: Ask the receiver process to try to queue up more chunk
%%   requests at its peer.
%%--------------------------------------------------------------------
queue_pieces(Pid) ->
    Request = queue_pieces,
    gen_server:cast(Pid, Request).
136 %%====================================================================
137 %% gen_server callbacks
138 %%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State}
%%   Args ::= [LocalPeerId, InfoHash, FilesystemPid, Id, Parent, {IP, Port}]
%% Description: Set up the receiver: trap exits, start the periodic
%%   rate_update message, register the peer with etorrent_peer and
%%   build the initial state. No connection is made here; see connect/3.
%%--------------------------------------------------------------------
init([LocalPeerId, InfoHash, FilesystemPid, Id, Parent, {IP, Port}]) ->
    process_flag(trap_exit, true),
    {ok, RateTimer} = timer:send_interval(?RATE_UPDATE, self(), rate_update),
    ok = etorrent_peer:new(IP, Port, Id, self()),
    InitialState =
        #state{parent = Parent,
               local_peer_id = LocalPeerId,
               piece_set = gb_sets:new(),
               remote_request_set = gb_trees:empty(),
               info_hash = InfoHash,
               torrent_id = Id,
               rate = etorrent_rate:init(?RATE_FUDGE),
               rate_timer = RateTimer,
               file_system_pid = FilesystemPid},
    {ok, InitialState}.
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, ok, State, 0}
%% Description: No synchronous requests are defined. Every call is
%%   answered with ok, and the 0 timeout makes the server go back to
%%   handle_info(timeout, ...) right away.
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
    {reply, ok, State, 0}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State, Timeout} |
%%                                      {stop, Reason, State}
%% Description: Handle the asynchronous requests issued through the API
%%   functions of this module. Every clause that keeps the server alive
%%   returns a timeout of 0, so handle_info(timeout, ...) runs as soon
%%   as no other message is waiting.
%%--------------------------------------------------------------------
handle_cast({connect, IP, Port}, State) ->
    ConnectResult = gen_tcp:connect(IP, Port, [binary, {active, false}],
                                    ?DEFAULT_CONNECT_TIMEOUT),
    case ConnectResult of
        {ok, Socket} ->
            outgoing_handshake(Socket, State);
        {error, _Reason} ->
            {stop, normal, State}
    end;
handle_cast({complete_handshake, _ReservedBytes, Socket, RemotePeerId}, State) ->
    HandshakeResult =
        etorrent_peer_communication:complete_handshake(
          Socket, State#state.info_hash, State#state.local_peer_id),
    case HandshakeResult of
        ok ->
            Connected = State#state{tcp_socket = Socket,
                                    remote_peer_id = RemotePeerId},
            complete_connection_setup(Connected);
        {error, stop} ->
            {stop, normal, State}
    end;
handle_cast(choke, State) ->
    etorrent_t_peer_send:choke(State#state.send_pid),
    {noreply, State, 0};
handle_cast(unchoke, State) ->
    etorrent_t_peer_send:unchoke(State#state.send_pid),
    {noreply, State, 0};
handle_cast(interested, State) ->
    Interested = statechange_interested(State, true),
    {noreply, Interested, 0};
handle_cast({send_have_piece, PieceNumber}, State) ->
    etorrent_t_peer_send:send_have_piece(State#state.send_pid, PieceNumber),
    {noreply, State, 0};
handle_cast({endgame_got_chunk, Chunk}, State) ->
    {noreply, handle_endgame_got_chunk(Chunk, State), 0};
handle_cast(queue_pieces, State) ->
    {ok, Refilled} = try_to_queue_up_pieces(State),
    {noreply, Refilled, 0};
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(_Msg, State) ->
    {noreply, State, 0}.

%% Run our side of the handshake on a freshly connected Socket. The
%% process stops normally when the handshake fails or when the remote
%% peer id equals our own (we connected to ourselves).
outgoing_handshake(Socket, State) ->
    HandshakeResult =
        etorrent_peer_communication:initiate_handshake(
          Socket, State#state.local_peer_id, State#state.info_hash),
    case HandshakeResult of
        {ok, _ReservedBytes, PeerId}
          when PeerId == State#state.local_peer_id ->
            {stop, normal, State};
        {ok, _ReservedBytes, PeerId} ->
            Connected = State#state{tcp_socket = Socket,
                                    remote_peer_id = PeerId},
            complete_connection_setup(Connected);
        {error, _} ->
            {stop, normal, State}
    end.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State, Timeout} |
%%                                       {stop, Reason, State}
%% Description: Handle non call/cast messages.
%%   timeout     - driven by the 0 timeouts returned from the other
%%                 callbacks: block for up to 3000 ms reading from the
%%                 socket and process whatever arrives. A recv error
%%                 not listed below is not matched and makes the
%%                 process crash with case_clause.
%%   rate_update - periodic tick (see init/1): refresh the receive
%%                 rate and report it to etorrent_rate_mgr.
%%   Anything else is ignored.
%%--------------------------------------------------------------------
handle_info(timeout, State) ->
    RecvResult = gen_tcp:recv(State#state.tcp_socket, 0, 3000),
    case RecvResult of
        {ok, Packet} ->
            handle_read_from_socket(State, Packet);
        {error, Gone} when Gone =:= closed; Gone =:= ebadf ->
            {stop, normal, State};
        {error, etimedout} ->
            {noreply, State, 0};
        {error, timeout} when State#state.remote_choked =:= true ->
            {noreply, State, 0};
        {error, timeout} when State#state.remote_choked =:= false ->
            %% Nothing arrived while unchoked: use the idle moment to
            %% top up the request queue.
            {ok, Refilled} = try_to_queue_up_pieces(State),
            {noreply, Refilled, 0}
    end;
handle_info(rate_update, State) ->
    NewRate = etorrent_rate:update(State#state.rate, 0),
    ok = etorrent_rate_mgr:recv_rate(State#state.torrent_id, self(),
                                     NewRate#peer_rate.rate, 0),
    {noreply, State#state{rate = NewRate}, 0};
handle_info(_Info, State) ->
    {noreply, State, 0}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: Called by gen_server when the process is about to
%%   terminate. Removes the peer entry, puts back all chunks queued at
%%   the remote end, closes the socket if one was opened, and logs the
%%   reason unless it is normal or shutdown. The return value is
%%   ignored by gen_server.
%%--------------------------------------------------------------------
terminate(Reason, S) ->
    etorrent_peer:delete(self()),
    _NS = unqueue_all_pieces(S),
    case S#state.tcp_socket of
        none ->
            %% Never got as far as a connection; nothing to close.
            ok;
        Sock ->
            gen_tcp:close(Sock)
    end,
    case Reason of
        normal -> ok;
        shutdown -> ok;
        _ -> error_logger:info_report([reason_for_termination, Reason])
    end,
    ok.
%%--------------------------------------------------------------------
%% Function: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. The state
%%   is carried over as is.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
290 %%--------------------------------------------------------------------
291 %%% Internal functions
292 %%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Function: handle_message(Msg, State) -> {ok, NewState} |
%%                                         {stop, Reason, NewState}
%% Description: Process a decoded message Msg from the wire. Returns
%%   {ok, S} when processing went fine and {stop, Reason, S} when the
%%   connection has to be torn down.
%%--------------------------------------------------------------------
handle_message(keep_alive, S) ->
    {ok, S};
handle_message(choke, S) ->
    ok = etorrent_rate_mgr:choke(S#state.torrent_id, self()),
    %% The remote end will not serve our requests any more; put them back.
    NS = unqueue_all_pieces(S),
    {ok, NS#state{remote_choked = true}};
handle_message(unchoke, S) ->
    ok = etorrent_rate_mgr:unchoke(S#state.torrent_id, self()),
    try_to_queue_up_pieces(S#state{remote_choked = false});
handle_message(interested, S) ->
    ok = etorrent_rate_mgr:interested(S#state.torrent_id, self()),
    %%TODO: Only call this if the guy is *not* choked by us
    etorrent_choker:perform_rechoke(),
    {ok, S};
handle_message(not_interested, S) ->
    ok = etorrent_rate_mgr:not_interested(S#state.torrent_id, self()),
    %%TODO: Only call this if the guy is *not* choked by us
    etorrent_choker:perform_rechoke(),
    {ok, S};
handle_message({request, Index, Offset, Len}, S) ->
    etorrent_t_peer_send:remote_request(S#state.send_pid, Index, Offset, Len),
    {ok, S};
handle_message({cancel, Index, Offset, Len}, S) ->
    etorrent_t_peer_send:cancel(S#state.send_pid, Index, Offset, Len),
    {ok, S};
handle_message({have, PieceNum}, S) ->
    case etorrent_piece_mgr:valid(S#state.torrent_id, PieceNum) of
        true ->
            PieceSet = gb_sets:add_element(PieceNum, S#state.piece_set),
            NS = S#state{piece_set = PieceSet},
            %% Continue with NS in every branch: continuing with S would
            %% drop the piece just added, so it could never be picked.
            case etorrent_piece_mgr:interesting(S#state.torrent_id, PieceNum) of
                true when NS#state.local_interested =:= true ->
                    try_to_queue_up_pieces(NS);
                true when NS#state.local_interested =:= false ->
                    try_to_queue_up_pieces(statechange_interested(NS, true));
                false ->
                    {ok, NS}
            end;
        false ->
            stop_bad_peer(S)
    end;
handle_message({bitfield, BitField}, S) ->
    case gb_sets:size(S#state.piece_set) of
        0 ->
            Size = etorrent_torrent:num_pieces(S#state.torrent_id),
            {ok, PieceSet} =
                etorrent_peer_communication:destruct_bitfield(Size, BitField),
            case etorrent_piece_mgr:check_interest(S#state.torrent_id, PieceSet) of
                interested ->
                    {ok, statechange_interested(S#state{piece_set = PieceSet},
                                                true)};
                not_interested ->
                    {ok, S#state{piece_set = PieceSet}};
                invalid_piece ->
                    {stop, {invalid_piece_2, S#state.remote_peer_id}, S}
            end;
        N when is_integer(N) ->
            %% A bitfield arriving when pieces are already known for
            %% this peer is treated as misbehaviour.
            stop_bad_peer(S)
    end;
handle_message({piece, Index, Offset, Data}, S) ->
    {ok, NS} = handle_got_chunk(Index, Offset, Data, size(Data), S),
    try_to_queue_up_pieces(NS);
handle_message(Unknown, S) ->
    {stop, {unknown_message, Unknown}, S}.

%% Register the remote end with the bad peer manager and stop normally.
stop_bad_peer(S) ->
    {ok, {IP, Port}} = inet:peername(S#state.tcp_socket),
    etorrent_bad_peer_mgr:enter_peer(IP, Port, S#state.remote_peer_id),
    {stop, normal, S}.
%%--------------------------------------------------------------------
%% Function: handle_endgame_got_chunk({Index, Offset, Len}, S) -> State
%% Description: Some other peer just downloaded {Index, Offset, Len},
%%   so try not to download it here if we can avoid it. When the chunk
%%   is in our request set it is removed and a cancel is sent to the
%%   remote end. In both cases the chunk manager is told to remove the
%%   chunk for our sender process. Always returns the (possibly
%%   updated) state.
%%--------------------------------------------------------------------
handle_endgame_got_chunk({Index, Offset, Len}, S) ->
    Chunk = {Index, Offset, Len},
    case gb_trees:is_defined(Chunk, S#state.remote_request_set) of
        true ->
            %% Delete the element from the request set.
            RS = gb_trees:delete(Chunk, S#state.remote_request_set),
            etorrent_t_peer_send:cancel(S#state.send_pid, Index, Offset, Len),
            etorrent_chunk_mgr:endgame_remove_chunk(S#state.send_pid,
                                                    S#state.torrent_id,
                                                    Chunk),
            S#state{remote_request_set = RS};
        false ->
            %% Not an element in the request queue, ignore
            etorrent_chunk_mgr:endgame_remove_chunk(S#state.send_pid,
                                                    S#state.torrent_id,
                                                    Chunk),
            S
    end.
%%--------------------------------------------------------------------
%% Function: handle_got_chunk(Index, Offset, Data, Len, S) -> {ok, State}
%% Description: We just got some chunk data. If we asked for this
%%   chunk, write it through the file system process, record it with
%%   the chunk manager (checking the piece once it is full), tell the
%%   other peers about it when in endgame mode, and drop it from the
%%   request set. Chunks we did not ask for are discarded.
%%--------------------------------------------------------------------
handle_got_chunk(Index, Offset, Data, Len, S) ->
    Chunk = {Index, Offset, Len},
    case gb_trees:lookup(Chunk, S#state.remote_request_set) of
        {value, Ops} ->
            ok = etorrent_fs:write_chunk(S#state.file_system_pid,
                                         {Index, Data, Ops}),
            case etorrent_chunk_mgr:store_chunk(S#state.torrent_id,
                                                Index,
                                                {Offset, Len},
                                                self()) of
                full ->
                    etorrent_fs:check_piece(S#state.file_system_pid, Index);
                ok ->
                    ok
            end,
            endgame_announce_chunk(Chunk, S),
            RS = gb_trees:delete_any(Chunk, S#state.remote_request_set),
            {ok, S#state{remote_request_set = RS}};
        none ->
            %% Stray piece, we could try to get hold of it but for now we just
            %% throw it on the floor.
            {ok, S}
    end.

%% Tell other peers we got the chunk if in endgame; the broadcast is
%% only made when the chunk manager answers assigned.
endgame_announce_chunk(_Chunk, #state{endgame = false}) ->
    ok;
endgame_announce_chunk(Chunk, #state{endgame = true} = S) ->
    case etorrent_chunk_mgr:mark_fetched(S#state.torrent_id, Chunk) of
        found ->
            ok;
        assigned ->
            broadcast_got_chunk(Chunk, S#state.torrent_id)
    end.
%%--------------------------------------------------------------------
%% Function: unqueue_all_pieces(State) -> State
%% Description: Give up every chunk queued at the other end: hand the
%%   chunks back to the chunk manager, nudge all peers of the torrent
%%   to queue up pieces (chunks are available again), and return the
%%   state with an empty request set.
%%--------------------------------------------------------------------
unqueue_all_pieces(State) ->
    ok = etorrent_chunk_mgr:putback_chunks(self()),
    broadcast_queue_pieces(State#state.torrent_id),
    EmptyRequests = gb_trees:empty(),
    State#state{remote_request_set = EmptyRequests}.
%%--------------------------------------------------------------------
%% Function: try_to_queue_up_pieces(state()) -> {ok, state()}
%% Description: Try to queue up requests at the other end. Nothing is
%%   done while the remote end chokes us, or while more than
%%   ?LOW_WATERMARK requests are outstanding. Otherwise the chunk
%%   manager is asked for enough chunks to reach ?HIGH_WATERMARK.
%%--------------------------------------------------------------------
try_to_queue_up_pieces(S) when S#state.remote_choked == true ->
    {ok, S};
try_to_queue_up_pieces(S) ->
    case gb_trees:size(S#state.remote_request_set) of
        N when N > ?LOW_WATERMARK ->
            %% Optimization: only replenish once the queue has drained
            %% to the low watermark.
            {ok, S};
        N when is_integer(N) ->
            PiecesToQueue = ?HIGH_WATERMARK - N,
            case etorrent_chunk_mgr:pick_chunks(self(),
                                                S#state.torrent_id,
                                                S#state.piece_set,
                                                PiecesToQueue) of
                not_interested ->
                    {ok, statechange_interested(S, false)};
                none_eligible ->
                    {ok, S};
                {ok, Items} ->
                    queue_items(Items, S);
                {endgame, Items} ->
                    queue_items(Items, S#state{endgame = true})
            end
    end.
%%--------------------------------------------------------------------
%% Function: queue_items(ChunkList, S) -> {ok, State}
%% Args:  ChunkList ::= [CompactChunk | ExplicitChunk]
%%        S ::= #state
%%        CompactChunk ::= {PieceNumber, ChunkList}
%%        ExplicitChunk ::= {PieceNumber, Offset, Size, Ops}
%%        ChunkList ::= [{Offset, Size, Ops}]
%%        PieceNumber, Offset, Size ::= integer()
%%        Ops ::= file_operations - described elsewhere.
%% Description: Send a request for each chunk we decided to queue and
%%   add these chunks to the request set. Chunks already present in the
%%   request set are skipped.
%%--------------------------------------------------------------------
queue_items(ChunkList, S) ->
    RSet = queue_items(ChunkList, S#state.send_pid, S#state.remote_request_set),
    {ok, S#state{remote_request_set = RSet}}.

queue_items([], _SendPid, Tree) ->
    Tree;
queue_items([{Pn, Chunks} | Rest], SendPid, Tree) ->
    %% The fold must thread its accumulator: returning the outer Tree
    %% for a duplicate would forget the chunks added earlier in this
    %% same fold, although their requests were already sent.
    NT = lists:foldl(
           fun({Offset, Size, Ops}, T) ->
                   queue_chunk({Pn, Offset, Size}, Ops, SendPid, T)
           end,
           Tree,
           Chunks),
    queue_items(Rest, SendPid, NT);
queue_items([{Pn, Offset, Size, Ops} | Rest], SendPid, Tree) ->
    NT = queue_chunk({Pn, Offset, Size}, Ops, SendPid, Tree),
    queue_items(Rest, SendPid, NT).

%% Request Chunk through the sender and remember its Ops, unless the
%% chunk is already in the request tree.
queue_chunk(Chunk, Ops, SendPid, Tree) ->
    case gb_trees:is_defined(Chunk, Tree) of
        true ->
            Tree;
        false ->
            etorrent_t_peer_send:local_request(SendPid, Chunk),
            gb_trees:enter(Chunk, Ops, Tree)
    end.
%%--------------------------------------------------------------------
%% Function: complete_connection_setup(State) -> {noreply, State, 0}
%% Description: Do the bookkeeping needed once the handshake is done:
%%   * start the sender process for this connection
%%   * send off our bitfield through it
%%   The returned state carries the sender pid.
%%--------------------------------------------------------------------
complete_connection_setup(State) ->
    #state{parent = Parent,
           tcp_socket = Socket,
           file_system_pid = FileSystemPid,
           torrent_id = TorrentId} = State,
    {ok, Sender} = etorrent_t_peer_sup:add_sender(Parent, Socket, FileSystemPid,
                                                  TorrentId, self()),
    OurBitfield = etorrent_piece_mgr:bitfield(TorrentId),
    etorrent_t_peer_send:bitfield(Sender, OurBitfield),
    {noreply, State#state{send_pid = Sender}, 0}.
%%--------------------------------------------------------------------
%% Function: statechange_interested(State, What) -> State
%%   What ::= true | false
%% Description: Record our interest in the remote peer. An interested
%%   message is sent through the sender process only when What is
%%   true; announcing interest while becoming uninterested would tell
%%   the peer the opposite of what we mean.
%%--------------------------------------------------------------------
statechange_interested(S, What) ->
    case What of
        true ->
            etorrent_t_peer_send:interested(S#state.send_pid);
        _ ->
            %% NOTE(review): the peer should be sent a not-interested
            %% message here; confirm which function etorrent_t_peer_send
            %% offers for that and call it.
            ok
    end,
    S#state{local_interested = What}.
%%--------------------------------------------------------------------
%% Function: handle_read_from_socket(State, Packet) ->
%%               {noreply, State, 0} | {stop, Reason, State}
%%   Packet ::= binary()
%%   State  ::= #state()
%% Description: Data came in from the socket. Packets on the wire are
%%   prefixed with a 32 bit big-endian length. packet_left tracks where
%%   we are:
%%     none      - waiting for a length header
%%     binary()  - the first 1..3 bytes of a length header
%%     integer() - payload bytes still missing for the current packet
%%   Complete packets are decoded and passed to handle_message/2; any
%%   remaining bytes are processed recursively. Every continuing return
%%   carries a 0 timeout so the read loop in handle_info/2 keeps going.
%%--------------------------------------------------------------------
handle_read_from_socket(S, <<>>) ->
    {noreply, S, 0};
handle_read_from_socket(S, <<0:32/big-integer, Rest/binary>>)
  when S#state.packet_left =:= none ->
    %% A zero length prefix carries no payload; skip it.
    handle_read_from_socket(S, Rest);
handle_read_from_socket(S, <<Left:32/big-integer, Rest/binary>>)
  when S#state.packet_left =:= none ->
    handle_read_from_socket(S#state{packet_left = Left,
                                    packet_iolist = []}, Rest);
handle_read_from_socket(S, Packet) when is_binary(S#state.packet_left) ->
    %% Glue the saved header fragment in front and start over.
    H = S#state.packet_left,
    handle_read_from_socket(S#state{packet_left = none},
                            <<H/binary, Packet/binary>>);
handle_read_from_socket(S, Packet)
  when byte_size(Packet) < 4, S#state.packet_left =:= none ->
    %% Incomplete length header: keep it for the next read. The 0
    %% timeout is required here as well, otherwise the read loop stalls
    %% until some unrelated message wakes the server up.
    {noreply, S#state{packet_left = Packet}, 0};
handle_read_from_socket(S, Packet)
  when is_integer(S#state.packet_left),
       byte_size(Packet) >= S#state.packet_left ->
    Left = S#state.packet_left,
    <<Data:Left/binary, Rest/binary>> = Packet,
    %% Fragments were collected newest first.
    P = iolist_to_binary(lists:reverse([Data | S#state.packet_iolist])),
    {Msg, Rate, Amount} =
        etorrent_peer_communication:recv_message(S#state.rate, P),
    UpdateKind = case Msg of
                     {piece, _, _, _} -> last_update;
                     _ -> normal
                 end,
    ok = etorrent_rate_mgr:recv_rate(S#state.torrent_id,
                                     self(), Rate#peer_rate.rate,
                                     Amount, UpdateKind),
    case handle_message(Msg, S#state{rate = Rate}) of
        {ok, NS} ->
            handle_read_from_socket(NS#state{packet_left = none,
                                             packet_iolist = []},
                                    Rest);
        {stop, Err, NS} ->
            {stop, Err, NS}
    end;
handle_read_from_socket(S, Packet)
  when is_integer(S#state.packet_left),
       byte_size(Packet) < S#state.packet_left ->
    %% Only part of the payload so far; stash it and wait for more.
    {noreply, S#state{packet_iolist = [Packet | S#state.packet_iolist],
                      packet_left = S#state.packet_left - byte_size(Packet)}, 0}.
%% Ask the receiver process of every peer registered for TorrentId to
%% try to queue up pieces. Returns ok.
broadcast_queue_pieces(TorrentId) ->
    _ = [etorrent_t_peer_recv:queue_pieces(Peer#peer.pid)
         || Peer <- etorrent_peer:all(TorrentId)],
    ok.
%% Tell the receiver process of every peer registered for TorrentId
%% that Chunk has been fetched. Returns ok.
broadcast_got_chunk(Chunk, TorrentId) ->
    _ = [etorrent_t_peer_recv:endgame_got_chunk(Peer#peer.pid, Chunk)
         || Peer <- etorrent_peer:all(TorrentId)],
    ok.