Handle CHOKE correctly in the fast extension.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_t_peer_recv.erl
blob98e6a1e1c6d02a78de591fab1e2302b2bc363043
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_t_peer_recv.erl
3 %%% Author : Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Represents a peers receiving of data
6 %%%
7 %%% Created : 19 Jul 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_t_peer_recv).
12 -behaviour(gen_server).
14 -include("etorrent_mnesia_table.hrl").
15 -include("etorrent_rate.hrl").
17 %% API
18 -export([start_link/6, connect/3, choke/1, unchoke/1, interested/1,
19 have/2, complete_handshake/4, endgame_got_chunk/2,
20 queue_pieces/1, stop/1]).
22 %% gen_server callbacks
23 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
24 terminate/2, code_change/3]).
26 -record(state, { remote_peer_id = none,
27 local_peer_id = none,
28 info_hash = none,
30 fast_extension = false, % Peer uses fast extension
32 pieces_left,
33 seeder = false,
34 tcp_socket = none,
36 remote_choked = true,
38 local_interested = false,
40 remote_request_set = none,
42 piece_set = unknown,
43 piece_request = [],
45 packet_left = none,
46 packet_iolist = [],
48 endgame = false, % Are we in endgame mode?
50 parent = none,
52 file_system_pid = none,
53 send_pid = none,
55 rate = none,
56 rate_timer = none,
57 torrent_id = none}).
59 -define(DEFAULT_CONNECT_TIMEOUT, 120000). % Default timeout in ms
60 -define(DEFAULT_CHUNK_SIZE, 16384). % Default size for a chunk. All clients use this.
61 -define(HIGH_WATERMARK, 15). % How many chunks to queue up to
62 -define(LOW_WATERMARK, 5). % Requeue when there are less than this number of pieces in queue
63 %%====================================================================
64 %% API
65 %%====================================================================
66 %%--------------------------------------------------------------------
67 %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
68 %% Description: Starts the server
69 %%--------------------------------------------------------------------
70 start_link(LocalPeerId, InfoHash, FilesystemPid, Id, Parent,
71 {IP, Port}) ->
72 gen_server:start_link(?MODULE, [LocalPeerId, InfoHash,
73 FilesystemPid, Id, Parent,
74 {IP, Port}], []).
76 %%--------------------------------------------------------------------
77 %% Function: stop/1
78 %% Args: Pid ::= pid()
79 %% Description: Gracefully ask the server to stop.
80 %%--------------------------------------------------------------------
81 stop(Pid) ->
82 gen_server:cast(Pid, stop).
84 %%--------------------------------------------------------------------
85 %% Function: connect(Pid, IP, Port)
86 %% Description: Connect to the IP and Portnumber for communication with
87 %% the peer. Note we don't handle the connect in the init phase. This is
88 %% due to the fact that a connect may take a considerable amount of time.
89 %% With this scheme, we spawn off processes, and then make them all attempt
90 %% connects in parallel, which is much easier.
91 %%--------------------------------------------------------------------
92 connect(Pid, IP, Port) ->
93 gen_server:cast(Pid, {connect, IP, Port}).
95 %%--------------------------------------------------------------------
96 %% Function: choke(Pid)
97 %% Description: Choke the peer.
98 %%--------------------------------------------------------------------
99 choke(Pid) ->
100 gen_server:cast(Pid, choke).
102 %%--------------------------------------------------------------------
103 %% Function: unchoke(Pid)
104 %% Description: Unchoke the peer.
105 %%--------------------------------------------------------------------
106 unchoke(Pid) ->
107 gen_server:cast(Pid, unchoke).
109 %%--------------------------------------------------------------------
110 %% Function: interested(Pid)
111 %% Description: Tell the peer we are interested.
112 %%--------------------------------------------------------------------
113 interested(Pid) ->
114 gen_server:cast(Pid, interested).
116 %%--------------------------------------------------------------------
117 %% Function: have(Pid, PieceNumber)
118 %% Description: Tell the peer we have just received piece PieceNumber.
119 %%--------------------------------------------------------------------
120 have(Pid, PieceNumber) ->
121 gen_server:cast(Pid, {have, PieceNumber}).
123 %%--------------------------------------------------------------------
124 %% Function: endgame_got_chunk(Pid, Index, Offset) -> ok
125 %% Description: We got the chunk {Index, Offset}, handle it.
126 %%--------------------------------------------------------------------
127 endgame_got_chunk(Pid, Chunk) ->
128 gen_server:cast(Pid, {endgame_got_chunk, Chunk}).
130 %%--------------------------------------------------------------------
131 %% Function: complete_handshake(Pid, Capabilities, Socket, PeerId)
132 %% Description: Complete the handshake initiated by another client.
133 %%--------------------------------------------------------------------
134 complete_handshake(Pid, Capabilities, Socket, PeerId) ->
135 gen_server:cast(Pid, {complete_handshake, Capabilities, Socket, PeerId}).
137 queue_pieces(Pid) ->
138 gen_server:cast(Pid, queue_pieces).
140 %%====================================================================
141 %% gen_server callbacks
142 %%====================================================================
144 %%--------------------------------------------------------------------
145 %% Function: init(Args) -> {ok, State} |
146 %% {ok, State, Timeout} |
147 %% ignore |
148 %% {stop, Reason}
149 %% Description: Initiates the server
150 %%--------------------------------------------------------------------
151 init([LocalPeerId, InfoHash, FilesystemPid, Id, Parent, {IP, Port}]) ->
152 process_flag(trap_exit, true),
153 {ok, TRef} = timer:send_interval(?RATE_UPDATE, self(), rate_update),
154 %% TODO: Update the leeching state to seeding when peer finished torrent.
155 ok = etorrent_peer:new(IP, Port, Id, self(), leeching),
156 ok = etorrent_choker:monitor(self()),
157 [T] = etorrent_torrent:select(Id),
158 {ok, #state{
159 pieces_left = T#torrent.pieces,
160 parent = Parent,
161 local_peer_id = LocalPeerId,
162 remote_request_set = gb_trees:empty(),
163 info_hash = InfoHash,
164 torrent_id = Id,
165 rate = etorrent_rate:init(?RATE_FUDGE),
166 rate_timer = TRef,
167 file_system_pid = FilesystemPid}}.
169 %%--------------------------------------------------------------------
170 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
171 %% {reply, Reply, State, Timeout} |
172 %% {noreply, State} |
173 %% {noreply, State, Timeout} |
174 %% {stop, Reason, Reply, State} |
175 %% {stop, Reason, State}
176 %% Description: Handling call messages
177 %%--------------------------------------------------------------------
178 handle_call(_Request, _From, State) ->
179 Reply = ok,
180 {reply, Reply, State, 0}.
182 %%--------------------------------------------------------------------
183 %% Function: handle_cast(Msg, State) -> {noreply, State} |
184 %% {noreply, State, Timeout} |
185 %% {stop, Reason, State}
186 %% Description: Handling cast messages
187 %%--------------------------------------------------------------------
188 handle_cast({connect, IP, Port}, S) ->
189 case gen_tcp:connect(IP, Port, [binary, {active, false}],
190 ?DEFAULT_CONNECT_TIMEOUT) of
191 {ok, Socket} ->
192 case etorrent_peer_communication:initiate_handshake(
193 Socket,
194 S#state.local_peer_id,
195 S#state.info_hash) of
196 {ok, _Capabilities, PeerId}
197 when PeerId == S#state.local_peer_id ->
198 {stop, normal, S};
199 {ok, Capabilities, PeerId} ->
200 FastExtension = lists:member(fast_extension, Capabilities),
201 complete_connection_setup(S#state { tcp_socket = Socket,
202 remote_peer_id = PeerId,
203 fast_extension = FastExtension});
204 {error, _} ->
205 {stop, normal, S}
206 end;
207 {error, _Reason} ->
208 {stop, normal, S}
209 end;
210 handle_cast({complete_handshake, Capabilities, Socket, RemotePeerId}, S) ->
211 FastExtension = lists:member(fast_extension, Capabilities),
212 case etorrent_peer_communication:complete_handshake(Socket,
213 S#state.info_hash,
214 S#state.local_peer_id) of
215 ok -> complete_connection_setup(S#state { tcp_socket = Socket,
216 remote_peer_id = RemotePeerId,
217 fast_extension = FastExtension});
218 {error, stop} -> {stop, normal, S}
219 end;
220 handle_cast(choke, S) ->
221 etorrent_t_peer_send:choke(S#state.send_pid),
222 {noreply, S, 0};
223 handle_cast(unchoke, S) ->
224 etorrent_t_peer_send:unchoke(S#state.send_pid),
225 {noreply, S, 0};
226 handle_cast(interested, S) ->
227 {noreply, statechange_interested(S, true), 0};
228 handle_cast({have, PN}, S) when S#state.piece_set =:= unknown ->
229 etorrent_t_peer_send:have(S#state.send_pid, PN),
230 {noreply, S, 0};
231 handle_cast({have, PN}, S) ->
232 case gb_sets:is_element(PN, S#state.piece_set) of
233 true -> ok;
234 false -> ok = etorrent_t_peer_send:have(S#state.send_pid, PN)
235 end,
236 {noreply, S, 0};
237 handle_cast({endgame_got_chunk, Chunk}, S) ->
238 NS = handle_endgame_got_chunk(Chunk, S),
239 {noreply, NS, 0};
240 handle_cast(queue_pieces, S) ->
241 {ok, NS} = try_to_queue_up_pieces(S),
242 {noreply, NS, 0};
243 handle_cast(stop, S) ->
244 {stop, normal, S};
245 handle_cast(_Msg, State) ->
246 {noreply, State, 0}.
248 %%--------------------------------------------------------------------
249 %% Function: handle_info(Info, State) -> {noreply, State} |
250 %% {noreply, State, Timeout} |
251 %% {stop, Reason, State}
252 %% Description: Handling all non call/cast messages
253 %%--------------------------------------------------------------------
254 handle_info(timeout, S) when S#state.tcp_socket =:= none ->
255 %% Haven't started up yet
256 {noreply, S, 3000};
257 handle_info(timeout, S) ->
258 case gen_tcp:recv(S#state.tcp_socket, 0, 3000) of
259 {ok, Packet} ->
260 handle_read_from_socket(S, Packet);
261 {error, closed} ->
262 {stop, normal, S};
263 {error, ebadf} ->
264 {stop, normal, S};
265 {error, ehostunreach} ->
266 {stop, normal, S};
267 {error, etimedout} ->
268 {noreply, S, 0};
269 {error, timeout} when S#state.remote_choked =:= true ->
270 {noreply, S, 0};
271 {error, timeout} when S#state.remote_choked =:= false ->
272 {ok, NS} = try_to_queue_up_pieces(S),
273 {noreply, NS, 0}
274 end;
275 handle_info(rate_update, S) ->
276 Rate = etorrent_rate:update(S#state.rate, 0),
277 ok = etorrent_rate_mgr:recv_rate(S#state.torrent_id, self(), Rate#peer_rate.rate, 0),
278 {noreply, S#state { rate = Rate}, 0};
279 handle_info(_Info, State) ->
280 {noreply, State, 0}.
282 %%--------------------------------------------------------------------
283 %% Function: terminate(Reason, State) -> void()
284 %% Description: This function is called by a gen_server when it is about to
285 %% terminate. It should be the opposite of Module:init/1 and do any necessary
286 %% cleaning up. When it returns, the gen_server terminates with Reason.
287 %% The return value is ignored.
288 %%--------------------------------------------------------------------
289 terminate(Reason, S) ->
290 etorrent_peer:delete(self()),
291 etorrent_counters:release_peer_slot(),
292 _NS = unqueue_all_pieces(S),
293 case S#state.tcp_socket of
294 none ->
296 Sock ->
297 gen_tcp:close(Sock)
298 end,
299 case Reason of
300 normal -> ok;
301 shutdown -> ok;
302 _ -> error_logger:info_report([reason_for_termination, Reason])
303 end,
306 %%--------------------------------------------------------------------
307 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
308 %% Description: Convert process state when code is changed
309 %%--------------------------------------------------------------------
310 code_change(_OldVsn, State, _Extra) ->
311 {ok, State}.
313 %%--------------------------------------------------------------------
314 %%% Internal functions
315 %%--------------------------------------------------------------------
317 %%--------------------------------------------------------------------
318 %% Func: handle_message(Msg, State) -> {ok, NewState} | {stop, Reason, NewState}
319 %% Description: Process an incoming message Msg from the wire. Return either
320 %% {ok, S} if the processing was ok, or {stop, Reason, S} in case of an error.
321 %%--------------------------------------------------------------------
322 handle_message(keep_alive, S) ->
323 {ok, S};
324 handle_message(choke, S) ->
325 ok = etorrent_rate_mgr:choke(S#state.torrent_id, self()),
326 NS = case S#state.fast_extension of
327 true -> S;
328 false -> unqueue_all_pieces(S)
329 end,
330 {ok, NS#state { remote_choked = true }};
331 handle_message(unchoke, S) ->
332 ok = etorrent_rate_mgr:unchoke(S#state.torrent_id, self()),
333 try_to_queue_up_pieces(S#state{remote_choked = false});
334 handle_message(interested, S) ->
335 ok = etorrent_rate_mgr:interested(S#state.torrent_id, self()),
336 ok = etorrent_t_peer_send:check_choke(S#state.send_pid),
337 {ok, S};
338 handle_message(not_interested, S) ->
339 ok = etorrent_rate_mgr:not_interested(S#state.torrent_id, self()),
340 ok = etorrent_t_peer_send:check_choke(S#state.send_pid),
341 {ok, S};
342 handle_message({request, Index, Offset, Len}, S) ->
343 etorrent_t_peer_send:remote_request(S#state.send_pid, Index, Offset, Len),
344 {ok, S};
345 handle_message({cancel, Index, Offset, Len}, S) ->
346 etorrent_t_peer_send:cancel(S#state.send_pid, Index, Offset, Len),
347 {ok, S};
348 handle_message({have, PieceNum}, S) ->
349 peer_have(PieceNum, S);
350 handle_message(have_none, S) when S#state.piece_set =/= unknown ->
351 {stop, normal, S};
352 handle_message(have_none, S) when S#state.fast_extension =:= true ->
353 Size = etorrent_torrent:num_pieces(S#state.torrent_id),
354 {ok, S#state { piece_set = gb_sets:new(),
355 pieces_left = Size,
356 seeder = false}};
357 handle_message(have_all, S) when S#state.piece_set =/= unknown ->
358 {stop, normal, S};
359 handle_message(have_all, S) when S#state.fast_extension =:= true ->
360 Size = etorrent_torrent:num_pieces(S#state.torrent_id),
361 FullSet = gb_sets:from_list(lists:seq(0, Size - 1)),
362 {ok, S#state { piece_set = FullSet,
363 pieces_left = 0,
364 seeder = true }};
365 handle_message({bitfield, _BF}, S) when S#state.piece_set =/= unknown ->
366 {ok, {IP, Port}} = inet:peername(S#state.tcp_socket),
367 etorrent_peer_mgr:enter_bad_peer(IP, Port, S#state.remote_peer_id),
368 {stop, normal, S};
369 handle_message({bitfield, BitField}, S) ->
370 Size = etorrent_torrent:num_pieces(S#state.torrent_id),
371 {ok, PieceSet} =
372 etorrent_peer_communication:destruct_bitfield(Size, BitField),
373 Left = S#state.pieces_left - gb_sets:size(PieceSet),
374 case Left of
375 0 -> ok = etorrent_peer:statechange(self(), seeder);
376 _N -> ok
377 end,
378 case etorrent_piece_mgr:check_interest(S#state.torrent_id, PieceSet) of
379 interested ->
380 {ok, statechange_interested(S#state {piece_set = PieceSet,
381 pieces_left = Left,
382 seeder = Left == 0},
383 true)};
384 not_interested ->
385 {ok, S#state{piece_set = PieceSet, pieces_left = Left,
386 seeder = Left == 0}};
387 invalid_piece ->
388 {stop, {invalid_piece_2, S#state.remote_peer_id}, S}
389 end;
390 handle_message({piece, Index, Offset, Data}, S) ->
391 case handle_got_chunk(Index, Offset, Data, size(Data), S) of
392 {ok, NS} ->
393 try_to_queue_up_pieces(NS)
394 end;
395 handle_message(Unknown, S) ->
396 error_logger:info_report([{unknown_message, Unknown}]),
397 {stop, normal, S}.
399 %%--------------------------------------------------------------------
400 %% Func: handle_endgame_got_chunk(Index, Offset, S) -> State
401 %% Description: Some other peer just downloaded {Index, Offset, Len} so try
402 %% not to download it here if we can avoid it.
403 %%--------------------------------------------------------------------
404 handle_endgame_got_chunk({Index, Offset, Len}, S) ->
405 case gb_trees:is_defined({Index, Offset, Len}, S#state.remote_request_set) of
406 true ->
407 %% Delete the element from the request set.
408 RS = gb_trees:delete({Index, Offset, Len}, S#state.remote_request_set),
409 etorrent_t_peer_send:cancel(S#state.send_pid,
410 Index,
411 Offset,
412 Len),
413 etorrent_chunk_mgr:endgame_remove_chunk(S#state.send_pid,
414 S#state.torrent_id,
415 {Index, Offset, Len}),
416 S#state { remote_request_set = RS };
417 false ->
418 %% Not an element in the request queue, ignore
419 etorrent_chunk_mgr:endgame_remove_chunk(S#state.send_pid,
420 S#state.torrent_id,
421 {Index, Offset, Len}),
423 end.
425 %%--------------------------------------------------------------------
426 %% Func: handle_got_chunk(Index, Offset, Data, Len, S) -> {ok, State}
427 %% Description: We just got some chunk data. Store it in the mnesia DB
428 %%--------------------------------------------------------------------
429 handle_got_chunk(Index, Offset, Data, Len, S) ->
430 case gb_trees:lookup({Index, Offset, Len},
431 S#state.remote_request_set) of
432 {value, Ops} ->
433 ok = etorrent_fs:write_chunk(S#state.file_system_pid,
434 {Index, Data, Ops}),
435 case etorrent_chunk_mgr:store_chunk(S#state.torrent_id,
436 Index,
437 {Offset, Len},
438 self()) of
439 full ->
440 etorrent_fs:check_piece(S#state.file_system_pid, Index);
441 ok ->
443 end,
444 %% Tell other peers we got the chunk if in endgame
445 case S#state.endgame of
446 true ->
447 case etorrent_chunk_mgr:mark_fetched(S#state.torrent_id,
448 {Index, Offset, Len}) of
449 found ->
451 assigned ->
452 broadcast_got_chunk({Index, Offset, Len}, S#state.torrent_id)
453 end;
454 false ->
456 end,
457 RS = gb_trees:delete_any({Index, Offset, Len}, S#state.remote_request_set),
458 {ok, S#state { remote_request_set = RS }};
459 none ->
460 %% Stray piece, we could try to get hold of it but for now we just
461 %% throw it on the floor.
462 {ok, S}
463 end.
465 %%--------------------------------------------------------------------
466 %% Function: unqueue_all_pieces/1
467 %% Description: Unqueue all queued pieces at the other end. We place
468 %% the earlier queued items at the end to compensate for quick
469 %% choke/unchoke problems and live data.
470 %%--------------------------------------------------------------------
471 unqueue_all_pieces(S) ->
472 %% Put chunks back
473 ok = etorrent_chunk_mgr:putback_chunks(self()),
474 %% Tell other peers that there is 0xf00d!
475 broadcast_queue_pieces(S#state.torrent_id),
476 %% Clean up the request set.
477 S#state{remote_request_set = gb_trees:empty()}.
479 %%--------------------------------------------------------------------
480 %% Function: try_to_queue_up_requests(state()) -> {ok, state()}
481 %% Description: Try to queue up requests at the other end.
482 %%--------------------------------------------------------------------
483 try_to_queue_up_pieces(S) when S#state.remote_choked == true ->
484 {ok, S};
485 try_to_queue_up_pieces(S) ->
486 case gb_trees:size(S#state.remote_request_set) of
487 N when N > ?LOW_WATERMARK ->
488 {ok, S};
489 %% Optimization: Only replenish pieces modulo some N
490 N when is_integer(N) ->
491 PiecesToQueue = ?HIGH_WATERMARK - N,
492 case etorrent_chunk_mgr:pick_chunks(self(),
493 S#state.torrent_id,
494 S#state.piece_set,
495 PiecesToQueue) of
496 not_interested ->
497 {ok, statechange_interested(S, false)};
498 none_eligible ->
499 {ok, S};
500 {ok, Items} ->
501 queue_items(Items, S);
502 {endgame, Items} ->
503 queue_items(Items, S#state { endgame = true })
505 end.
507 %%--------------------------------------------------------------------
508 %% Function: queue_items/2
509 %% Args: ChunkList ::= [CompactChunk | ExplicitChunk]
510 %% S ::= #state
511 %% CompactChunk ::= {PieceNumber, ChunkList}
512 %% ExplicitChunk ::= {PieceNumber, Offset, Size, Ops}
513 %% ChunkList ::= [{Offset, Size, Ops}]
514 %% PieceNumber, Offset, Size ::= integer()
515 %% Ops ::= file_operations - described elsewhere.
516 %% Description: Send chunk messages for each chunk we decided to queue.
517 %% also add these chunks to the piece request set.
518 %%--------------------------------------------------------------------
519 queue_items(ChunkList, S) ->
520 RSet = queue_items(ChunkList, S#state.send_pid, S#state.remote_request_set),
521 {ok, S#state { remote_request_set = RSet }}.
523 queue_items([], _SendPid, Tree) -> Tree;
524 queue_items([{Pn, Chunks} | Rest], SendPid, Tree) ->
525 NT = lists:foldl(
526 fun ({Offset, Size, Ops}, T) ->
527 case gb_trees:is_defined({Pn, Offset, Size}, T) of
528 true ->
529 Tree;
530 false ->
531 etorrent_t_peer_send:local_request(SendPid,
532 {Pn, Offset, Size}),
533 gb_trees:enter({Pn, Offset, Size}, Ops, T)
535 end,
536 Tree,
537 Chunks),
538 queue_items(Rest, SendPid, NT);
539 queue_items([{Pn, Offset, Size, Ops} | Rest], SendPid, Tree) ->
540 NT = case gb_trees:is_defined({Pn, Offset, Size}, Tree) of
541 true ->
542 Tree;
543 false ->
544 etorrent_t_peer_send:local_request(SendPid,
545 {Pn, Offset, Size}),
546 gb_trees:enter({Pn, Offset, Size}, Ops, Tree)
547 end,
548 queue_items(Rest, SendPid, NT).
550 %%--------------------------------------------------------------------
551 %% Function: complete_connection_setup() -> gen_server_reply()}
552 %% Description: Do the bookkeeping needed to set up the peer:
553 %% * enable passive messaging mode on the socket.
554 %% * Start the send pid
555 %% * Send off the bitfield
556 %%--------------------------------------------------------------------
557 complete_connection_setup(S) ->
558 {ok, SendPid} = etorrent_t_peer_sup:add_sender(S#state.parent,
559 S#state.tcp_socket,
560 S#state.file_system_pid,
561 S#state.torrent_id,
562 self()),
563 BF = etorrent_piece_mgr:bitfield(S#state.torrent_id),
564 etorrent_t_peer_send:bitfield(SendPid, BF),
566 {noreply, S#state{send_pid = SendPid}, 0}.
568 statechange_interested(S, What) ->
569 etorrent_t_peer_send:interested(S#state.send_pid),
570 S#state{local_interested = What}.
573 %%--------------------------------------------------------------------
574 %% Function: handle_read_from_socket(State, Packet) -> NewState
575 %% Packet ::= binary()
576 %% State ::= #state()
577 %% Description: Packet came in. Handle it.
578 %%--------------------------------------------------------------------
579 handle_read_from_socket(S, <<>>) ->
580 {noreply, S, 0};
581 handle_read_from_socket(S, <<0:32/big-integer, Rest/binary>>) when S#state.packet_left =:= none ->
582 handle_read_from_socket(S, Rest);
583 handle_read_from_socket(S, <<Left:32/big-integer, Rest/binary>>) when S#state.packet_left =:= none ->
584 handle_read_from_socket(S#state { packet_left = Left,
585 packet_iolist = []}, Rest);
586 handle_read_from_socket(S, Packet) when is_binary(S#state.packet_left) ->
587 H = S#state.packet_left,
588 handle_read_from_socket(S#state { packet_left = none },
589 <<H/binary, Packet/binary>>);
590 handle_read_from_socket(S, Packet) when size(Packet) < 4, S#state.packet_left =:= none ->
591 {noreply, S#state { packet_left = Packet }};
592 handle_read_from_socket(S, Packet)
593 when size(Packet) >= S#state.packet_left, is_integer(S#state.packet_left) ->
594 Left = S#state.packet_left,
595 <<Data:Left/binary, Rest/binary>> = Packet,
596 Left = size(Data),
597 P = iolist_to_binary(lists:reverse([Data | S#state.packet_iolist])),
598 {Msg, Rate, Amount} = etorrent_peer_communication:recv_message(S#state.rate, P),
599 case Msg of
600 {piece, _, _ ,_} ->
601 ok = etorrent_rate_mgr:recv_rate(S#state.torrent_id,
602 self(), Rate#peer_rate.rate,
603 Amount, last_update);
604 _Msg ->
605 ok = etorrent_rate_mgr:recv_rate(S#state.torrent_id,
606 self(), Rate#peer_rate.rate,
607 Amount, normal)
608 end,
609 case handle_message(Msg, S#state {rate = Rate}) of
610 {ok, NS} ->
611 handle_read_from_socket(NS#state { packet_left = none,
612 packet_iolist = []},
613 Rest);
614 {stop, Err, NS} ->
615 {stop, Err, NS}
616 end;
617 handle_read_from_socket(S, Packet)
618 when size(Packet) < S#state.packet_left, is_integer(S#state.packet_left) ->
619 {noreply, S#state { packet_iolist = [Packet | S#state.packet_iolist],
620 packet_left = S#state.packet_left - size(Packet) }, 0}.
622 broadcast_queue_pieces(TorrentId) ->
623 Peers = etorrent_peer:all(TorrentId),
624 lists:foreach(fun (P) ->
625 etorrent_t_peer_recv:queue_pieces(P#peer.pid)
626 end,
627 Peers).
630 broadcast_got_chunk(Chunk, TorrentId) ->
631 Peers = etorrent_peer:all(TorrentId),
632 lists:foreach(fun (Peer) ->
633 etorrent_t_peer_recv:endgame_got_chunk(Peer#peer.pid, Chunk)
634 end,
635 Peers).
637 peer_have(PN, S) when S#state.piece_set =:= unknown ->
638 peer_have(PN, S#state {piece_set = gb_sets:new()});
639 peer_have(PN, S) ->
640 case etorrent_piece_mgr:valid(S#state.torrent_id, PN) of
641 true ->
642 Left = S#state.pieces_left - 1,
643 case peer_seeds(S#state.torrent_id, Left) of
644 ok ->
645 PieceSet = gb_sets:add_element(PN, S#state.piece_set),
646 NS = S#state{piece_set = PieceSet,
647 pieces_left = Left,
648 seeder = Left == 0},
649 case etorrent_piece_mgr:interesting(S#state.torrent_id, PN) of
650 true when S#state.local_interested =:= true ->
651 try_to_queue_up_pieces(S);
652 true when S#state.local_interested =:= false ->
653 try_to_queue_up_pieces(statechange_interested(S, true));
654 false ->
655 {ok, NS}
656 end;
657 {stop, R} -> {stop, R, S}
658 end;
659 false ->
660 {ok, {IP, Port}} = inet:peername(S#state.tcp_socket),
661 etorrent_peer_mgr:enter_bad_peer(IP, Port, S#state.remote_peer_id),
662 {stop, normal, S}
663 end.
665 peer_seeds(Id, 0) ->
666 ok = etorrent_peer:statechange(self(), seeder),
667 [T] = etorrent_torrent:select(Id),
668 case T#torrent.state of
669 seeding -> {stop, normal};
670 _ -> ok
671 end;
672 peer_seeds(_Id, _N) -> ok.