1 (* Copyright 2001, 2002 b52_simon :), b8_bavard, b8_fee_carabine, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 (** Functions used in client<->client communication
24 (** A peer (or client) is always a remote peer in this file.
25 A Piece is a portion of the file associated with a hash (sha1).
26 In mldonkey a piece is referred to as a block inside the swarming system.
27 A SubPiece is a portion of a piece (without hash) which can be
28 sent/downloaded to/from a peer.
29 In mldonkey a SubPiece is referred to as a range inside the swarming system.
30 @see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for some
31 unofficial (but more detailed) specs.
41 open TcpBufferedSocket
48 open CommonInteractive
50 open CommonComplexOptions
67 module VB
= VerificationBitmap
(* Textual HTTP success status lines: the bare form and the HTTP/1.1 form. *)
let http_ok : string = "HTTP 200 OK"
let http11_ok : string = "HTTP/1.1 200 OK"
(* Mutable uploader lists, both initially empty.
   [next_uploaders] receives the result of [choose_uploaders] in
   [recompute_uploaders]; [current_uploaders] is then overwritten with it. *)
let next_uploaders : BTTypes.client list ref = ref []
let current_uploaders : BTTypes.client list ref = ref []
78 In this function we connect to a tracker.
79 @param file The file concerned by the request
80 @param url Url of the tracker to connect
81 @param event Event (as a string) to send to the tracker :
82 can be 'completed' if the file is complete, 'started' for the first
83 connection to this tracker or 'stopped' for a clean stop of the file.
84 Everything else will be ok for a second connection to the tracker.
85 Be careful with the spelling of this event.
86 @param f The function used to parse the result of the connection.
87 The function will get a file as an argument (@see talk_to_tracker
90 If we have fewer than !!ask_tracker_threshold sources
91 and if we respect the file_tracker_interval, then
92 we actually ask the tracker for sources.
94 let connect_trackers file event need_sources f
=
96 (* reset session statistics when sending 'started' event *)
97 if event
= "started" then
99 file
.file_session_uploaded
<- Int64.zero
;
100 file
.file_session_downloaded
<- Int64.zero
;
103 let args,must_check_delay
, left
=
105 match file
.file_swarmer
with
109 | "started" -> [("event", "started")],true,zero
110 | "stopped" -> [("event", "stopped")],false,zero
115 let local_downloaded = CommonSwarming.downloaded swarmer
in
116 let left = file_size file
-- local_downloaded in
118 | "completed" -> [("event", "completed")],false,zero
119 | "started" -> [("event", "started")],true,left
120 | "stopped" -> [("event", "stopped")],false,left
124 let args = ("no_peer_id", "1") :: ("compact", "1") :: args in
126 if not need_sources
then
127 ("numwant", "0") :: args
128 else if !!numwant
> -1 then
129 ("numwant", string_of_int
!!numwant
) :: args
133 let args = if !!send_key
then
134 ("key", Sha1.to_hexa
!!client_uid
) :: args else args
136 let args = if !!force_client_ip
then
137 ("ip", Ip.to_string
!!set_client_ip
) :: args else args
140 ("info_hash", Sha1.direct_to_string file
.file_id
) ::
141 ("peer_id", Sha1.direct_to_string
!!client_uid
) ::
142 ("port", string_of_int
!!client_port
) ::
143 ("uploaded", Int64.to_string file
.file_session_uploaded
) ::
144 ("downloaded", Int64.to_string file
.file_session_downloaded
) ::
145 ("left", Int64.to_string
left) ::
149 let enabled_trackers =
150 let enabled_trackers = List.filter
(fun t
-> tracker_is_enabled t
) file
.file_trackers
in
151 if enabled_trackers <> [] then enabled_trackers
153 (* if there is no tracker left, do something ? *)
154 if !verbose_msg_servers
then
155 lprintf_nl
"No trackers left for %s, reenabling all of them..." (file_best_name
(as_file file
));
157 match t
.tracker_status
with
158 (* only re-enable after normal error *)
159 | Disabled _
-> t
.tracker_status
<- Enabled
160 | _
-> ()) file
.file_trackers
;
161 let enabled_trackers = List.filter
(fun t
-> tracker_is_enabled t
) file
.file_trackers
in
162 if enabled_trackers = [] && (file_state file
) <> FilePaused
then
164 file_pause
(as_file file
) (CommonUserDb.admin_user
());
165 lprintf_file_nl
(as_file file
) "Paused %s, no usable trackers left" (file_best_name
(as_file file
))
172 (* if we have too few sources we may ask the tracker before the interval *)
173 if not must_check_delay
174 || not file
.file_tracker_connected
175 || t
.tracker_last_conn
+ t
.tracker_interval
< last_time
()
176 || ( file
.file_clients_num
< !!ask_tracker_threshold
177 && (file_state file
) == FileDownloading
178 && (if t
.tracker_min_interval
> !!min_tracker_reask_interval
then
179 t
.tracker_last_conn
+ t
.tracker_min_interval
< last_time
()
181 t
.tracker_last_conn
+ !!min_tracker_reask_interval
< last_time
() ))
184 (* if we already tried to connect but failed, disable tracker, but allow re-enabling *)
185 if file
.file_tracker_connected
&& t
.tracker_last_clients_num
= 0 && t
.tracker_last_conn
< 1 then
187 if !verbose_msg_servers
then
188 lprintf_nl
"Request error from tracker: disabling %s" t
.tracker_url
;
189 t
.tracker_status
<- Disabled
(intern
"MLDonkey: Request error from tracker")
191 (* Send request to tracker *)
193 let args = if String.length t
.tracker_id
> 0 then
194 ("trackerid", t
.tracker_id
) :: args else args
196 let args = if String.length t
.tracker_key
> 0 then
197 ("key", t
.tracker_key
) :: args else args
199 if !verbose_msg_servers
then
200 lprintf_nl
"connect_trackers: tracker_connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i file: %s"
201 (string_of_bool file
.file_tracker_connected
)
202 t
.tracker_id t
.tracker_key t
.tracker_last_clients_num
203 (t
.tracker_last_conn
- last_time
()) file
.file_name
;
205 let module H
= Http_client
in
206 let url = t
.tracker_url
in
209 H.req_url
= Url.of_string ~
args: args url;
210 H.req_proxy
= !CommonOptions.http_proxy
;
211 H.req_user_agent
= get_user_agent
();
212 (* #4541 [egs] supports redirect *)
213 H.req_max_retry
= !!max_tracker_redirect
;
216 if !verbose_msg_servers
then
217 lprintf_nl
"Request sent to tracker %s for file: %s"
218 t
.tracker_url file
.file_name
;
221 t
.tracker_last_conn
<- last_time
();
222 file
.file_tracker_connected
<- true;
228 if !verbose_msg_servers
then
229 lprintf_nl
"Request NOT sent to tracker %s - next request in %ds for file: %s"
230 t
.tracker_url
(t
.tracker_interval
- (last_time
() - t
.tracker_last_conn
)) file
.file_name
234 set_client_upload
(as_client c
) (as_file c
.client_file
);
235 set_client_has_a_slot
(as_client c
) NormalSlot
;
236 Rate.update_no_change c
.client_downloaded_rate
;
237 Rate.update_no_change c
.client_upload_rate
;
238 c
.client_last_optimist
<- last_time
();
239 client_enter_upload_queue
(as_client c
);
240 send_client c Unchoke
242 (** In this function we decide which peers will be
243 uploaders. We send a choke message to current uploaders
244 that are not in the next uploaders list. We send Unchoke
245 for clients that are in next list (and not in current)
247 let recompute_uploaders () =
248 if !verbose_upload
then lprintf_nl
"recompute_uploaders";
249 next_uploaders := choose_uploaders current_files
;
250 (*Send choke if a current_uploader is not in next_uploaders*)
251 List.iter
( fun c
-> if ((List.mem c
!next_uploaders)==false) then
253 set_client_has_a_slot
(as_client c
) NoSlot
;
254 (*we will let him finish his download and choke him on next_request*)
256 ) !current_uploaders;
258 (*don't send Choke if new uploader is already an uploaders *)
260 if not
(List.mem c
!current_uploaders) then start_upload c
262 current_uploaders := !next_uploaders
265 (****** Fabrice: why are clients which are disconnected removed ???
266 These clients might still be useful to reconnect to, no ? *)
269 (** This function is called when a client is disconnected
270 (be it by our side or its side).
271 A client which disconnects (even only one time) is discarded.
272 If it's an uploader which disconnects we recompute uploaders
273 (see recompute_uploaders) immediately.
274 @param c The client to disconnect
275 @param reason The reason for the disconnection (see in BasicSocket.ml)
277 let disconnect_client c reason
=
278 if !verbose_msg_clients
then
279 lprintf_nl
"Client %d: disconnected: %s" (client_num c
) (string_of_reason reason
);
281 match c
.client_sock
with
283 | ConnectionWaiting token
->
285 c
.client_sock
<- NoConnection
289 (* List.iter (fun r -> CommonSwarming.free_range r) c.client_ranges; *)
290 set_client_disconnected c reason
;
291 c
.client_session_downloaded
<- 0L;
292 c
.client_session_uploaded
<- 0L;
293 (try if c
.client_good
then count_seen c
with _
-> ());
294 (* this is not useful already done in the match
295 (try close sock reason with _ -> ()); *)
296 (*---------not needed ?? VvvvvV---------------
297 c.client_ranges <- [];
298 c.client_block <- None;
299 if not c.client_good then
300 connection_failed c.client_connection_control;
301 c.client_good <- false;
302 c.client_sock <- NoConnection;
303 c.client_chunks <- [];
304 c.client_allowed_to_write <- zero;
305 c.client_new_chunks <- [];
306 c.client_interesting <- false;
307 c.client_alrd_sent_interested <- false;
308 -------------------^^^^^--------------------*)
309 if (c
.client_registered_bitfield
) then
311 match c
.client_uploader
with
314 c
.client_uploader
<- None
;
315 (* If the client registered a bitfield then
316 we must unregister him to update the swarmer
317 (Useful for availability)
319 CommonSwarming.unregister_uploader up
320 (* c.client_registered_bitfield <- false;
321 for i = 0 to String.length c.client_bitmap - 1 do
322 c.client_bitmap.[0] <- '0';
325 (* Don't test if a client have an upload slot because
326 it don't have one (removed during earlier in
327 set_client_disconnected c reason)
329 if (List.mem c
!current_uploaders) then
331 (*BTGlobals.remove_client*)
333 recompute_uploaders ();
341 (** Disconnect all clients of a file
342 @param file The file whose clients must all be disconnected
344 let disconnect_clients file
=
345 let must_keep = ref true in
346 (match file_state file
with
347 | FilePaused
| FileCancelled
-> must_keep:=false
350 Hashtbl.iter
(fun _ c
->
351 if not
( !must_keep && (client_has_a_slot
(as_client c
) || c
.client_interested
)) then
353 if !verbose_msg_clients
then
354 lprintf_file_nl
(as_file file
) "disconnect since download is finished";
355 disconnect_client c Closed_by_user
360 (** What to do when a file is finished
361 @param file the finished file
363 let download_finished file
=
364 if List.memq file
!current_files
then
366 connect_trackers file
"completed" false (fun _ _
-> ()); (*must be called before swarmer gets removed from file*)
367 (*CommonComplexOptions.file_completed*)
368 file_completed
(as_file file
);
369 (* Remove the swarmer for this file as it is not useful anymore... *)
370 CommonSwarming.remove_swarmer file
.file_swarmer
;
371 file
.file_swarmer
<- None
;
372 (* At this point, the file state is FileDownloaded. We should not remove
373 the file, because we continue to upload. *)
(** Check whether a download is finished and, if so, finalize it.
    The decision is delegated to [CommonSwarming.check_finished]; when it
    answers [true], [download_finished] is run on [file]. Otherwise nothing
    happens. (Original note: a file is finished if all blocks are verified.)
    @param swarmer The swarmer queried for completion
    @param file The file to finalize when the swarmer is finished
*)
let check_finished swarmer file =
  match CommonSwarming.check_finished swarmer with
  | true -> download_finished file
  | false -> ()
(* Single-bit masks indexed from the most significant bit of a byte:
   index 0 is 0x80 (128) and index 7 is 0x01 (1). *)
let bits = Array.init 8 (fun i -> 0x80 lsr i)
387 (* Check/set bits in strings (bittorrent format) *)
390 (Char.code s
.[n
lsr 3]) land bits.(n
land 7) <> 0
394 s
.[i] <- Char.unsafe_chr
(Char.code s
.[i] lor bits.(n
land 7))
(* The official client seems to use max_range_requests = 5 and
   max_range_len = 2^14. *)
(* Maximum number of requests kept in the 'pipeline' at once. *)
let max_range_requests : int = 5
399 (* How many bytes we can request in one Piece *)
402 (** A wrapper to send Interested message to a client.
403 (Send interested only if needed)
404 @param c The client to send Interested
406 let send_interested c
=
407 if c
.client_interesting
&& (not c
.client_alrd_sent_interested
) then
409 c
.client_alrd_sent_interested
<- true;
410 send_client c Interested
414 (** Send a Bitfield message to a client.
415 @param c The client to send the Bitfield message
418 let send_bitfield c
=
419 send_client c
(BitField
421 match c
.client_file
.file_swarmer
with
423 (* This must be a seeded file... *)
424 if !verbose_download
then
425 lprintf_nl
"Sending completed verified bitmap";
426 let nchunks = Array.length c
.client_file
.file_chunks
in
427 let len = (nchunks+7)/8 in
428 let s = String.make
len '
\000'
in
429 for i = 0 to nchunks - 1 do
434 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer
in
435 if !verbose_download
then
436 lprintf_nl
"Sending verified bitmap: [%s]" (VB.to_string
bitmap);
437 let len = (VB.length
bitmap + 7)/8 in
438 let s = String.make
len '
\000'
in
440 if c
= VB.State_verified
then set_bit s i) bitmap;
(** Decode the reserved handshake bytes of a peer and store the capability
    flags they carry in the client record.
    @param rbits The reserved bytes; indices 0, 5 and 7 are read, so a
                 shorter value is an out-of-bounds access
    @param c The client whose capability fields are overwritten
*)
let parse_reserved rbits c =
  (* True when byte [byte] of [rbits] has at least one bit of [mask] set. *)
  let flag ~byte ~mask = Char.code rbits.[byte] land mask <> 0 in
  (* Byte 7 carries three independent flags. *)
  c.client_dht <- flag ~byte:7 ~mask:0x01;
  c.client_cache_extension <- flag ~byte:7 ~mask:0x02;
  c.client_fast_extension <- flag ~byte:7 ~mask:0x04;
  (* Byte 5, bit 0x10. *)
  c.client_utorrent_extension <- flag ~byte:5 ~mask:0x10;
  (* Byte 0, most significant bit. *)
  c.client_azureus_messaging_protocol <- flag ~byte:0 ~mask:0x80
458 (** This function is called to parse the first message that
460 @param counter client num
461 @param cc Expected client (probably useless now that we don't save any client)
462 @param init_sent A boolean to know if we sent this client the handshake message
463 @param gconn Don't know
464 @param sock The socket we use for this client
465 @param proto Unused (required by tuple type?)
466 @param file_id The file hash (sha1) of the file involved in this exchange
468 (* removed: @param peer_id The hash (sha1) of the client. (Should be checked)
470 let rec client_parse_header counter cc init_sent gconn sock
471 (proto
, rbits
, file_id
) =
473 set_lifetime sock
600.;
474 if !verbose_msg_clients
then
475 lprintf_nl
"client_parse_header %d" counter;
477 let file = Hashtbl.find files_by_uid file_id
in
478 if !verbose_msg_clients
then
479 lprintf_file_nl
(as_file
file) "file found";
480 let ccc, cc_country_code
= !cc
in
484 let c = new_client
file Sha1.null
(TcpBufferedSocket.peer_addr sock
) cc_country_code
in
485 if !verbose_connect
then lprintf_file_nl
(as_file
file) "Client %d: incoming connection" (client_num
c);
486 cc
:= (Some
c), cc_country_code
;
489 (* Does it happen that this c was already used to connect successfully?
490 If yes then this must happen: *)
491 c.client_received_peer_id
<- false;
492 if cc_country_code
<> None
&& c.client_country_code
= None
then
493 c.client_country_code
<- cc_country_code
;
495 (* client could have had Sha1.null as peer_id/uid *)
496 (* this is to be done, later
497 if c.client_uid <> peer_id then
498 c.client_software <- (parse_software (Sha1.direct_to_string peer_id));
502 (* if c.client_uid <> peer_id then begin
503 lprintf "Unexpected client by UID\n";
504 let ccc = new_client file peer_id (TcpBufferedSocket.host sock) in
505 lprintf "CLIENT %d: testing instead of %d\n"
506 (client_num ccc) (client_num c);
507 (match ccc.client_sock with
509 lprintf_nl "[BT]: This client is already connected";
510 close sock (Closed_for_error "Already connected");
514 lprintf_nl "[BT]: Client %d: recovered by UID" (client_num ccc);
522 if !verbose_msg_clients
then begin
523 let (ip
,port
) = c.client_host
in
524 lprintf_nl
"Client %d: Connected from %s:%d" (client_num
c)
525 (Ip.to_string ip
) port
;
528 parse_reserved rbits
c;
530 (match c.client_sock
with
532 if !verbose_msg_clients
then begin
533 let (ip
,port
) = c.client_host
in
534 lprintf_nl
"No connection to client (%s:%d)!!!" (Ip.to_string ip
) port
;
536 c.client_sock
<- Connection sock
537 | ConnectionWaiting token
->
539 if !verbose_msg_clients
then
540 lprintf_nl
"Waiting for connection to client !!!";
541 c.client_sock
<- Connection sock
542 | Connection
s when s != sock
->
543 if !verbose_msg_clients
then
544 lprintf_nl
"CLIENT %d: IMMEDIATE RECONNECTION" (client_num
c);
545 disconnect_client c (Closed_for_error
"Reconnected");
546 c.client_sock
<- Connection sock
;
550 set_client_state
(c) (Connected
(-1));
551 if not init_sent
then
553 c.client_incoming
<- true;
554 send_init
!!client_uid file_id sock
;
556 connection_ok
c.client_connection_control
;
557 if !verbose_msg_clients
then
558 lprintf_nl
"file and client found";
559 (* if not c.client_incoming then *)
561 c.client_blocks_sent
<- file.file_blocks_downloaded
;
563 TODO !!! : send interested if and only if we are interested
564 -> we must recieve at least other peer bitfield.
565 in common swarmer -> compare : partition -> partition -> bool
568 set_rtimeout sock
!!client_timeout
;
569 (* Once parsed succesfully we define the function client_to_client
570 to be the function used when a message is read *)
571 gconn
.gconn_handler
<- Reader
(fun gconn sock
->
572 bt_handler
TcpMessages.parsing
(client_to_client
c) c sock
575 let b = TcpBufferedSocket.buf sock
in
576 (* The receive buffer is normally not empty now, lets parse the rest, most likely PeerID *)
578 ignore
(bt_handler
TcpMessages.parsing
(client_to_client
c) c sock
);
580 (* Some newer clients send more opcodes in their handshake packet, lets parse them now.
581 Using "while b.len <> 0 do ... done" is not possible here because libtorrent clients
582 send unparsable five extra bytes after their PeerID which would result into a loop *)
584 ignore
(bt_handler
TcpMessages.parsing
(client_to_client
c) c sock
);
588 let (ip
,port
) = (TcpBufferedSocket.peer_addr sock
) in
589 if !verbose_unexpected_messages
then
590 lprintf_nl
"Client %s:%d requested a file that is not shared [%s]"
591 (Ip.to_string ip
) port
(Sha1.to_hexa file_id
)
593 lprintf_nl
"Exception %s in client_parse_header" (Printexc2.to_string e
);
594 close sock
(Closed_for_exception e
);
598 (** Update the bitmap of a client. Unclear if it is still useful.
599 @param c The client which we want to update.
601 and update_client_bitmap
c =
602 let file = c.client_file
in
604 let swarmer = match file.file_swarmer
with
606 | Some
swarmer -> swarmer
610 match c.client_uploader
with
612 let up = CommonSwarming.register_uploader
swarmer (as_client
c)
613 (AvailableIntervals
[]) in
614 c.client_uploader
<- Some
up;
620 let bitmap = match c.client_bitmap
with
622 let len = CommonSwarming.partition_size
swarmer in
623 let bitmap = Bitv.create
len false in
624 c.client_bitmap
<- Some
bitmap;
626 | Some
bitmap -> bitmap
629 if c.client_new_chunks
<> [] then begin
630 let chunks = c.client_new_chunks
in
631 c.client_new_chunks
<- [];
632 List.iter
(fun n
-> Bitv.set
bitmap n
true) chunks;
633 CommonSwarming.update_uploader_intervals
up (AvailableBitv
bitmap);
637 (** In this function we decide which piece we must request from client.
638 @param sock Socket of the client
641 and get_from_client sock
(c: client
) =
642 let file = c.client_file
in
643 (* Check if there's not enough requests in the 'pipeline'
644 and if a request can be send (not choked and file is downloading) *)
645 if List.length
c.client_ranges_sent
< max_range_requests
646 && file_state
file = FileDownloading
647 && (c.client_choked
== false)
649 (* num is the number of the piece, x and y are the position
650 of the subpiece in the piece(!), r is a (CommonSwarmer) range *)
652 let up = match c.client_uploader
with
655 let swarmer = CommonSwarming.uploader_swarmer
up in
661 if !verbose_msg_clients
then
662 lprintf_file_nl
(as_file
file) "CLIENT %d: Finding new range to send" (client_num
c);
664 if !verbose_swarming
then begin
665 lprintf_n
"Current download:\n Current chunks: ";
668 List.iter
(fun (x
,y
) -> lprintf
"%Ld-%Ld " x y
) c.client_chunks
669 with _
-> lprintf
"No Chunks";
673 lprintf_n
"Current ranges: ";
675 List.iter
(fun (p1
,p2
, r) ->
676 let (x
,y
) = CommonSwarming.range_range
r in
677 lprintf
"%Ld-%Ld[%Ld-%Ld] " p1 p2 x y
678 ) c.client_ranges_sent
;
680 match c.client_range_waiting
with
682 | Some
(x
,y
,r) -> lprintf
"Waiting %Ld-%Ld" x y
;
686 lprintf_n
"Current blocks: ";
688 match c.client_chunk
with
689 | None
-> lprintf
"none"
690 | Some
(chunk
, blocks
) -> List.iter
(fun b ->
691 CommonSwarming.print_block
b.up_block
) blocks
;
695 lprintf_file_nl
(as_file
file) "Finding Range:";
700 (*We must find a block to request first, and then
701 some range inside this block
706 match c.client_chunk
with
710 if !verbose_swarming
then lprintf_file_nl
(as_file
file) "No block";
711 update_client_bitmap
c;
712 (try CommonSwarming.verify_one_chunk
swarmer with _
-> ());
713 (*Find a free block in the swarmer*)
714 let chunk, blocks
= CommonSwarming.find_blocks
up in
715 if !verbose_swarming
then begin
716 lprintf_n
"Blocks Found: "; List.iter (fun b ->
717 CommonSwarming.print_block
b.up_block
) blocks
;
720 c.client_chunk
<- Some
(chunk, blocks
);
722 (*We put the found block in client_block to
723 request range in this block. (Useful for
724 not searching each time a new block)
729 | Some
(chunk, blocks
) ->
731 if !verbose_swarming
then begin
732 lprintf_n
"Current Blocks: "; List.iter (fun b ->
733 CommonSwarming.print_block
b.up_block
) blocks
;
738 (*Given a block find a range inside*)
740 match c.client_range_waiting
with
742 c.client_range_waiting
<- None
;
745 CommonSwarming.find_range
up (min max_range_len
file.file_piece_size
)
750 if y
-- x
> max_range_len
then begin
751 c.client_range_waiting
<- Some
(x
++ max_range_len
, y
, r);
752 (x
, x
++ max_range_len
, r)
757 c.client_ranges_sent
<- c.client_ranges_sent
@ [x
,y
, r];
758 (* CommonSwarming.alloc_range r; *)
760 (* naughty, naughty, was computing a block number instead of a chunk
761 number. Only matters with merged downloads, and even then other
762 clients didn't seem to care (?), so the bug remained hidden *)
763 if !verbose_swarming
then
764 lprintf_file_nl
(as_file
file) "Asking %d For Range %Ld-%Ld" chunk x y
;
766 chunk, x
-- file.file_piece_size
** Int64.of_int
chunk, y
-- x
, r
770 (*If we don't find a range to request inside the block,
771 iter to choose another block*)
772 if !verbose_swarming
then
773 lprintf_nl
"Could not find range in current block";
774 (* c.client_blocks <- List2.removeq b c.client_blocks; *)
776 c.client_chunk
<- None
;
784 (*If we don't find a block to request we can check if the
785 file is finished (if there's missing pieces we can't decide
786 that the file is finished because we didn't found
789 if !verbose_swarming
then
790 lprintf_nl
"Unable to get a block !!";
791 CommonSwarming.compute_bitmap
swarmer;
792 check_finished swarmer file;
796 send_client
c (Request
(num,x
,y
));
798 if !verbose_msg_clients
then
799 lprintf_file_nl
(as_file
file) "CLIENT %d: Asking %s For Range %Ld-%Ld"
800 (client_num
c) (Sha1.to_string
c.client_uid
) x y
803 if not
(CommonSwarming.check_finished swarmer) && !verbose_download
then
804 lprintf_file_nl
(as_file
file) "BTClient.get_from_client ERROR: can't find a block to download and file is not yet finished for file : %s..." file.file_name
807 (** In this function we match a message sent by a client
808 and react according to this message.
809 @param c The client which sent us a message
810 @param sock The socket used for this client
811 @param msg The message sent by the client
813 and client_to_client
c sock msg
=
814 if !verbose_msg_clients
then begin
815 let (ip
,port
) = (TcpBufferedSocket.peer_addr sock
) in
816 let (timeout
, next
) = get_rtimeout sock
in
817 lprintf_nl
"CLIENT %d(%s:%d): (%d, %d,%d) Received %s"
818 (client_num
c) (Ip.to_string ip
) port
820 (int_of_float timeout
)
822 (TcpMessages.to_string msg
);
825 let file = c.client_file
in
827 (* Sending the "Have" message was moved to bTGlobals so this is useless *)
828 (* if c.client_blocks_sent != file.file_blocks_downloaded then begin
832 | b :: tail when tail == c.client_blocks_sent ->
833 c.client_blocks_sent <- list;
834 let (num,_,_) = CommonSwarming.block_block b in
835 send_client c (Have (Int64.of_int num))
836 | _ :: tail -> iter tail
838 iter file.file_blocks_downloaded
843 Piece
(num, offset
, s, pos
, len) ->
844 (*A Piece message contains the data*)
845 set_client_state
c (Connected_downloading
(file_num
file));
846 (*flag it as a good client *)
847 c.client_good
<- true;
848 if file_state
file = FileDownloading
then begin
849 let position = offset
++ file.file_piece_size
*.. num in
850 let up = match c.client_uploader
with
853 let swarmer = CommonSwarming.uploader_swarmer
up in
855 if !verbose_msg_clients
then
856 (match c.client_ranges_sent
with
857 [] -> lprintf_file_nl
(as_file
file) "EMPTY Ranges !!!"
859 let (x
,y
) = CommonSwarming.range_range
r in
860 lprintf_file_nl
(as_file
file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])"
861 (brand_to_string
c.client_brand
) position len
866 CommonSwarming.downloaded
swarmer in
867 (* List.iter CommonSwarming.free_range c.client_ranges; *)
868 CommonSwarming.received
up
870 (* List.iter CommonSwarming.alloc_range c.client_ranges; *)
872 CommonSwarming.downloaded
swarmer in
874 (*Update rate and amount of data received from client*)
875 count_download
c (new_downloaded -- old_downloaded);
876 (* use len here with max_dr quickfix *)
877 Rate.update
c.client_downloaded_rate ~amount
:len;
878 (* count bytes downloaded from network for this file *)
879 file.file_session_downloaded
<- file.file_session_downloaded
++ (Int64.of_int
len);
880 if !verbose_msg_clients
then
881 (match c.client_ranges_sent
with
882 [] -> lprintf_file_nl
(as_file
file) "EMPTY Ranges !!!"
884 let (x
,y
) = CommonSwarming.range_range
r in
885 lprintf_file_nl
(as_file
file) "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
888 (new_downloaded -- old_downloaded)
891 (* changed 2.5.28 should have been done before !
892 if new_downloaded <> old_downloaded then
893 add_file_downloaded (as_file file)
894 (new_downloaded -- old_downloaded); *)
897 match c.client_ranges_sent
with
900 (* CommonSwarming.free_range r; *)
901 c.client_ranges_sent
<- tail
;
903 get_from_client sock
c;
905 (* Check if the client is still interesting for us... *)
906 check_if_interesting
file c
909 (* Disconnect if that is ourselves. *)
910 c.client_uid
<- Sha1.direct_of_string p
;
911 if not
(c.client_uid
= !!client_uid
) then
913 let brand, release
= parse_software p
in
914 c.client_brand
<- brand;
915 c.client_release
<- release
;
917 c.client_sent_choke
<- true;
920 disconnect_client c Closed_by_user
924 (*A bitfield is a summary of what a client have*)
926 match c.client_file
.file_swarmer
with
929 c.client_new_chunks
<- [];
931 let npieces = CommonSwarming.partition_size
swarmer in
932 let nbits = String.length p
* 8 in
934 if nbits < npieces then begin
935 lprintf_file_nl
(as_file
file) "Error: expected bitfield of atleast %d but got %d" npieces nbits;
936 disconnect_client c (Closed_for_error
"Wrong bitfield length")
939 let bitmap = CommonSwarming.chunks_verified_bitmap
swarmer in
941 for i = 0 to npieces - 1 do
942 if is_bit_set p
i then begin
943 c.client_new_chunks
<- i :: c.client_new_chunks
;
944 match VB.get
bitmap i with
945 | VB.State_missing
| VB.State_partial
->
946 c.client_interesting
<- true
947 | VB.State_complete
| VB.State_verified
-> ()
951 update_client_bitmap
c;
952 c.client_registered_bitfield
<- true;
954 if c.client_interesting
then
957 if !verbose_msg_clients
then
958 lprintf_file_nl
(as_file
file) "New BitField Registered";
960 (* for i = 1 to max_range_requests - List.length c.client_ranges do
961 (try get_from_client sock c with _ -> ())
967 (* Note: a bitfield must only be sent after the handshake and before everything else: NOT here *)
970 (* A client can send a "Have" without sending a Bitfield *)
972 match c.client_file
.file_swarmer
with
975 let n = Int64.to_int
n in
976 let bitmap = CommonSwarming.chunks_verified_bitmap
swarmer in
977 (* lprintf_nl "verified: %c;" (VB.state_to_char (VB.get bitmap n)); *)
978 (* if the peer has a chunk we don't, tell him we're interested and update his bitmap *)
979 match VB.get
bitmap n with
980 | VB.State_missing
| VB.State_partial
->
981 c.client_interesting
<- true;
983 c.client_new_chunks
<- n :: c.client_new_chunks
;
984 update_client_bitmap
c;
985 | VB.State_complete
| VB.State_verified
-> ()
988 match c.client_bitmap, c.client_uploader with
989 Some bitmap, Some up ->
990 let swarmer = CommonSwarming.uploader_swarmer up in
991 let n = Int64.to_int n in
992 if bitmap.[n] <> '1' then
994 let verified = CommonSwarming.verified_bitmap swarmer in
995 if verified.[n] < '2' then begin
996 c.client_interesting <- true;
998 c.client_new_chunks <- n :: c.client_new_chunks;
999 if c.client_block = None then begin
1000 update_client_bitmap c;
1001 (* for i = 1 to max_range_requests -
1002 List.length c.client_ranges do
1003 (try get_from_client sock c with _ -> ())
1007 | None
, Some _
-> lprintf_nl
"no bitmap but client_uploader";
1008 | Some _
, None
->lprintf_nl
"bitmap but no client_uploader";
1009 | None
, None
-> lprintf_nl
"no bitmap no client_uploader";
1015 c.client_interested
<- true;
1019 set_client_state
(c) (Connected
(-1));
1020 (* remote peer will clear the list of range we sent *)
1022 match c.client_uploader
with
1024 (* Afaik this is no protocol violation and happens if the client
1025 didn't send a client bitmap after the handshake. *)
1026 let (ip
,port
) = c.client_host
in
1027 if !verbose_msg_clients
then lprintf_file_nl
(as_file
file) "%s:%d with software %s : Choke send, but no client bitmap"
1028 (Ip.to_string ip
) port
(brand_to_string
c.client_brand
)
1030 CommonSwarming.clear_uploader_intervals
up
1032 c.client_ranges_sent
<- [];
1033 c.client_range_waiting
<- None
;
1034 c.client_choked
<- true;
1038 c.client_interested
<- false;
1042 c.client_choked
<- false;
1043 (* remote peer cleared our request : re-request *)
1044 for i = 1 to max_range_requests -
1045 List.length
c.client_ranges_sent
do
1046 (try get_from_client sock
c with _
-> ())
1050 | Request
(n, pos
, len) ->
1051 if len > max_request_len
then begin
1052 close sock
(Closed_for_error
"Request longer than 1<<16");
1056 if !CommonGlobals.has_upload
= 0 then
1058 if client_has_a_slot
(as_client
c) then
1060 (* lprintf "Received request for upload\n"; *)
1061 (match c.client_upload_requests
with
1063 CommonUploads.ready_for_upload
(as_client
c);
1065 c.client_upload_requests
<- c.client_upload_requests
@ [n,pos
,len];
1066 let file = c.client_file
in
1067 match file.file_shared
with
1071 s.impl_shared_requests
<- s.impl_shared_requests
+ 1;
1072 shared_must_update
(as_shared
s)
1077 send_client
c Choke
;
1078 c.client_sent_choke
<- true;
1079 c.client_upload_requests
<- [];
1084 (* We don't 'generate' a Ping message on a Ping. *)
1086 | Cancel
(n, pos
, len) ->
1087 (* if we receive a cancel message from a peer, remove request *)
1088 if client_has_a_slot
(as_client
c) then
1089 c.client_upload_requests
<- List2.remove_first
(n, pos
, len) c.client_upload_requests
1091 if !verbose_msg_clients
then
1092 lprintf_file_nl
(as_file
file) "Error: received cancel request but client has no slot"
1095 lprintf_file_nl
(as_file
file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e
) (TcpMessages.to_string msg
)
1098 (** The function used to connect to a client.
1099 The connection is not immediately initiated. It will
1100 be put in a fifo and dequeued according to
1101 !!max_connections_per_second. (@see commonGlobals.ml)
1102 @param c The client we must connect to
1104 let connect_client c =
1105 if can_open_connection connection_manager
&&
1106 (let (ip
,port
) = c.client_host
in
1107 match !Ip.banned
(ip
, c.client_country_code
) with
1110 if !verbose_connect
then
1111 lprintf_nl
"%s:%d (%s), blocked: %s"
1112 (Ip.to_string ip
) port
1113 (fst
(Geoip.get_country_code_name
c.client_country_code
))
1117 match c.client_sock
with
1121 add_pending_connection connection_manager
(fun token ->
1123 if !verbose_msg_clients
then
1124 lprintf_nl
"CLIENT %d: connect_client" (client_num
c);
1125 let (ip
,port
) = c.client_host
in
1126 if !verbose_msg_clients
then
1127 lprintf_nl
"connecting %s:%d" (Ip.to_string ip
) port
;
1128 connection_try
c.client_connection_control
;
1130 let sock = connect
token "bittorrent download"
1131 (Ip.to_inet_addr ip
) port
1134 BASIC_EVENT LTIMEOUT
->
1135 if !verbose_msg_clients
then
1136 lprintf_nl
"CLIENT %d: LIFETIME" (client_num
c);
1137 close
sock Closed_for_timeout
1138 | BASIC_EVENT RTIMEOUT
->
1139 if !verbose_msg_clients
then
1140 lprintf_nl
"CLIENT %d: RTIMEOUT (%d)" (client_num
c)
1143 close
sock Closed_for_timeout
1144 | BASIC_EVENT
(CLOSED
r) ->
1146 match c.client_sock
with
1147 | Connection
s when s == sock ->
1148 disconnect_client c r
1154 c.client_sock
<- Connection
sock;
1155 set_lifetime
sock 600.;
1156 TcpBufferedSocket.set_read_controler
sock download_control
;
1157 TcpBufferedSocket.set_write_controler
sock upload_control
;
1158 TcpBufferedSocket.set_rtimeout
sock 30.;
1159 let file = c.client_file
in
1161 if !verbose_msg_clients
then
1162 lprintf_file_nl
(as_file
file) "READY TO DOWNLOAD FILE";
1164 send_init
!!client_uid
file.file_id
sock;
1165 (* Fabrice: Initialize the client bitmap and uploader fields to <> None *)
1166 update_client_bitmap
c;
1167 (* (try get_from_client sock c with _ -> ());*)
1169 (*We 'hook' the client_parse_header function to the socket
1170 This function will then be called when the first message will
1173 set_bt_sock
sock !verbose_msg_clients
1174 (BTHeader
(client_parse_header !counter (ref ((Some
c), c.client_country_code
)) true))
1177 lprintf_nl
"Exception %s while connecting to client"
1178 (Printexc2.to_string e
);
1179 disconnect_client c (Closed_for_exception e
)
1181 (*Since this is a pending connection put ConnectionWaiting
1185 c.client_sock
<- ConnectionWaiting
token
1189 (** The Listen function (very much like in C : TCP Socket Server).
1190 Monitors client connections to us.
1194 let s = TcpServerSocket.create
"bittorrent client server"
1195 (Ip.to_inet_addr
!!client_bind_addr
)
1199 TcpServerSocket.CONNECTION
(s,
1200 Unix.ADDR_INET
(from_ip
, from_port
)) ->
1201 (*Receiving an event TcpServerSocket.CONNECTION from
1202 the TcpServerSocket means that a new client tries
1205 let ip = (Ip.of_inet_addr from_ip
) in
1206 let cc = Geoip.get_country_code_option
ip in
1207 if !verbose_sources
> 1 then lprintf_nl
"CONNECTION RECEIVED FROM %s"
1208 (Ip.to_string
(Ip.of_inet_addr from_ip
))
1210 (*Reject this connection if we don't want
1211 to bypass the max_connection parameter
1213 if can_open_connection connection_manager
&&
1214 (match !Ip.banned
(ip, cc) with
1217 if !verbose_connect
then
1218 lprintf_nl
"%s:%d (%s) blocked: %s"
1219 (Ip.to_string
ip) from_port
1220 (fst
(Geoip.get_country_code_name
cc))
1225 let token = create_token connection_manager
in
1226 let sock = TcpBufferedSocket.create
token
1227 "bittorrent client connection" s
1230 BASIC_EVENT
(RTIMEOUT
|LTIMEOUT
) ->
1231 (*monitor read and life timeout on client
1234 close
sock Closed_for_timeout
1238 TcpBufferedSocket.set_read_controler
sock download_control
;
1239 TcpBufferedSocket.set_write_controler
sock upload_control
;
1241 let c = ref (None
, cc) in
1242 TcpBufferedSocket.set_closer
sock (fun _
r ->
1245 match c.client_sock
with
1246 | Connection
s when s == sock ->
1247 disconnect_client c r
1252 set_rtimeout
sock 30.;
1254 (*Again : 'hook' client_parse_header to the socket*)
1255 set_bt_sock
sock !verbose_msg_clients
1256 (BTHeader
(client_parse_header !counter c false));
1259 (*don't forget to close the incoming sock if we can't
1260 open a new connection
1265 listen_sock
:= Some
s;
1268 if !verbose_connect
then
1269 lprintf_nl
"Exception %s while init bittorrent server"
1270 (Printexc2.to_string e
)
1273 (** This function sends keepalive messages to all connected clients
1274 (and updates socket lifetime)
1277 List.iter (fun file ->
1278 Hashtbl.iter (fun _
c ->
1279 match c.client_sock
with
1280 | Connection
sock ->
1282 set_lifetime
sock 130.;
1290 (** Check, for each client of a given file, whether it is connected.
1291 If it isn't, try to connect to it
1293 let resume_clients file =
1294 Hashtbl.iter (fun _
c ->
1296 match c.client_sock
with
1297 | Connection
sock -> ()
1298 (*I think this one is not really useful for debugging
1299 lprintf_nl "[BT]: RESUME: Client is already connected"; *)
1302 (*test if we can connect to the client according to its
1304 Currently the delay between two tries is 120 seconds.
1306 if connection_can_try
c.client_connection_control
then
1309 print_control
c.client_connection_control
1312 if !verbose_connect
then
1313 lprintf_file_nl
(as_file
file) "Exception %s in resume_clients" (Printexc2.to_string e
)
1316 (** Check if the value replied by the tracker is correct.
1317 @param key the name of the key
1318 @param n the value to check
1319 @param url Url of the tracker
1320 @param name the name of the file
1322 let chk_keyval key
n url name
=
1323 let int_n = (Int64.to_int
n) in
1324 if !verbose_msg_clients
then
1325 lprintf_nl
"Reply from %s in file: %s has %s: %d" url name key
int_n;
1329 lprintf_nl
"Reply from %s in file: %s has an invalid %s value: %d" url name key
int_n;
1333 (** Check that client is valid and record it *)
1334 let maybe_new_client file id
ip port
=
1335 let cc = Geoip.get_country_code_option
ip in
1336 if id
<> !!client_uid
1339 && (match !Ip.banned
(ip, cc) with
1342 if !verbose_connect
then
1343 lprintf_file_nl
(as_file
file) "%s:%d blocked: %s" (Ip.to_string
ip) port reason
;
1346 ignore
(new_client
file id
(ip,port
) cc);
1347 if !verbose_sources
> 1 then
1348 lprintf_file_nl
(as_file
file) "Received %s:%d" (Ip.to_string
ip) port
;
1351 (** In this function we interact with the tracker
1352 @param file The file for which we want some sources
1353 @param need_sources whether we need any sources
1355 let talk_to_tracker file need_sources
=
1357 (*This is the function which will be called by the http client
1358 for parsing the response
1362 File.to_string filename
1363 with e
-> lprintf_file_nl
(as_file
file) "Empty reply from tracker"; ""
1366 match tracker_reply with
1368 if !verbose_connect
then
1369 lprintf_file_nl
(as_file
file) "Empty reply from tracker";
1371 | _
-> Bencode.decode
tracker_reply
1373 t
.tracker_interval
<- 600;
1374 t
.tracker_min_interval
<- 600;
1375 if need_sources
then t
.tracker_last_clients_num
<- 0;
1378 List.iter (fun (key
,value) ->
1379 (match (key
, value) with
1380 | String
"failure reason", _
-> ()
1381 | _
-> (match t
.tracker_status
with
1382 | Disabled_failure
(i, _
) ->
1383 lprintf_file_nl
(as_file
file) "Received good message from Tracker %s in file: %s after %d bad attempts"
1384 t
.tracker_url
file.file_name
i
1386 (* Received good message from tracker after failures, re-enable tracker *)
1387 t
.tracker_status
<- Enabled
);
1389 match (key
, value) with
1390 | String
"failure reason", String failure
->
1391 (* On failure, disable the tracker, count the attempts and forbid re-enabling *)
1392 t
.tracker_status
<- (match t
.tracker_status
with
1393 | Disabled_failure
(i,_
) -> Disabled_failure
(i + 1, intern failure
)
1394 | _
-> Disabled_failure
(1, intern failure
));
1395 lprintf_file_nl
(as_file
file) "Failure no. %d%s from Tracker %s in file: %s Reason: %s"
1396 (match t
.tracker_status
with | Disabled_failure
(i,_
) -> i | _
-> 1)
1397 (if !!tracker_retries
= 0 then "" else Printf.sprintf
"/%d" !!tracker_retries
)
1398 t
.tracker_url
file.file_name
(Charset.to_utf8 failure
)
1399 | String
"warning message", String warning
->
1400 lprintf_file_nl
(as_file
file) "Warning from Tracker %s in file: %s Reason: %s" t
.tracker_url
file.file_name warning
1401 | String
"interval", Int
n ->
1402 t
.tracker_interval
<- chk_keyval (Bencode.print key
) n t
.tracker_url
file.file_name
;
1403 (* in case we don't receive "min interval" *)
1404 if t
.tracker_min_interval
> t
.tracker_interval
then
1405 t
.tracker_min_interval
<- t
.tracker_interval
1406 | String
"min interval", Int
n ->
1407 t
.tracker_min_interval
<- chk_keyval (Bencode.print key
) n t
.tracker_url
file.file_name
;
1408 (* make sure "min interval" is always < or equal to "interval" *)
1409 if t
.tracker_min_interval
> t
.tracker_interval
then
1410 t
.tracker_min_interval
<- t
.tracker_interval
1411 | String
"downloaded", Int
n ->
1412 t
.tracker_torrent_downloaded
<- chk_keyval (Bencode.print key
) n t
.tracker_url
file.file_name
1413 | String
"complete", Int
n
1414 | String
"done peers", Int
n ->
1415 t
.tracker_torrent_complete
<- chk_keyval (Bencode.print key
) n t
.tracker_url
file.file_name
1416 | String
"incomplete", Int
n ->
1417 t
.tracker_torrent_incomplete
<- chk_keyval (Bencode.print key
) n t
.tracker_url
file.file_name
;
1418 (* if complete > 0 and we receive incomplete we probably won't receive num_peers so we simulate it below *)
1419 if t
.tracker_torrent_complete
> 0 then
1420 t
.tracker_torrent_total_clients_count
<- (t
.tracker_torrent_complete
+ t
.tracker_torrent_incomplete
);
1421 | String
"num peers", Int
n ->
1422 t
.tracker_torrent_total_clients_count
<- chk_keyval (Bencode.print key
) n t
.tracker_url
file.file_name
;
1423 (* if complete > 0 and we receive num_peers we probably won't receive incomplete so we simulate it below *)
1424 if t
.tracker_torrent_complete
> 0 then
1425 t
.tracker_torrent_incomplete
<- (t
.tracker_torrent_total_clients_count
- t
.tracker_torrent_complete
);
1426 | String
"last", Int
n ->
1427 t
.tracker_torrent_last_dl_req
<- chk_keyval (Bencode.print key
) n t
.tracker_url
file.file_name
1428 | String
"key", String
n ->
1430 if !verbose_msg_clients
then
1431 lprintf_file_nl
(as_file
file) "%s in file: %s has key: %s" t
.tracker_url
file.file_name
n
1432 | String
"tracker id", String
n ->
1434 if !verbose_msg_clients
then
1435 lprintf_file_nl
(as_file
file) "%s in file: %s has tracker id %s" t
.tracker_url
file.file_name
n
1437 | String
"peers", List list
->
1438 if need_sources
then
1441 | Dictionary list
->
1442 let peer_id = ref Sha1.null
in
1443 let peer_ip = ref Ip.null
in
1448 String
"peer id", String id
->
1449 peer_id := Sha1.direct_of_string id
;
1450 | String
"ip", String
ip ->
1451 peer_ip := Ip.of_string
ip
1452 | String
"port", Int p
->
1453 port := Int64.to_int p
1457 t
.tracker_last_clients_num
<- t
.tracker_last_clients_num
+ 1;
1458 maybe_new_client file !peer_id !peer_ip !port
1462 | String
"peers", String p
->
1463 let rec iter_comp s pos l
=
1465 let ip = Ip.of_ints
(get_uint8
s pos
,get_uint8
s (pos
+1),
1466 get_uint8
s (pos
+2),get_uint8
s (pos
+3))
1467 and port = get_int16
s (pos
+4)
1469 t
.tracker_last_clients_num
<- t
.tracker_last_clients_num
+ 1;
1470 maybe_new_client file Sha1.null
ip port;
1472 iter_comp s (pos
+6) l
1474 if need_sources
then
1475 iter_comp p
0 (String.length p
)
1476 | String
"private", Int
n -> ()
1477 (* TODO: if set to 1, disable peer exchange *)
1479 | _
-> lprintf_file_nl
(as_file
file) "received unknown entry in answer from tracker: %s : %s" (Bencode.print key
) (Bencode.print
value)
1481 (*Now that we have added new clients to a file, it's time
1482 to connect to them*)
1483 if !verbose_sources
> 0 then
1484 lprintf_file_nl
(as_file
file) "talk_to_tracker: got %i source(s) for file %s"
1485 t
.tracker_last_clients_num
file.file_name
;
1486 if need_sources
then resume_clients file
1491 if file.file_tracker_connected
then ""
1494 connect_trackers file event need_sources
f
1497 (** Check to see if file is finished, if not
1498 try to get sources for it
1500 let recover_files () =
1501 if !verbose_share
then
1502 lprintf_nl
"recover_files";
1503 List.iter (fun file ->
1504 match file.file_swarmer
with
1507 (try check_finished swarmer file with e
-> ());
1508 match file_state
file with
1510 if !verbose_share
then
1511 lprintf_file_nl
(as_file
file) "recover downloading";
1512 (try talk_to_tracker file true with _
-> ())
1514 if !verbose_share
then
1515 lprintf_file_nl
(as_file
file) "recover shared";
1516 (try talk_to_tracker file false with _
-> ())
1517 | FilePaused
-> () (*when we are paused we do nothing, not even logging this vvvv*)
1518 | s -> lprintf_file_nl
(as_file
file) "recover: Other state %s!!" (string_of_state
s)
(* Module-level scratch buffer of 100000 bytes used for uploads:
   iter_upload fills it through Unix32.read and then passes it to the
   Piece message, so a single buffer is reused for every client.
   NOTE(review): the Request handler closes sockets whose request length
   exceeds max_request_len (its error text mentions 1<<16), which presumably
   keeps every read within this buffer -- confirm.
   NOTE(review): String.create is deprecated in recent OCaml releases in
   favour of Bytes.create; migrating would change the type seen by the
   users of this buffer. *)
1521 let upload_buffer = String.create
100000
1525 Send a Piece message
1526 for one of the requests of the client
1527 @param sock The socket of the client
1530 let rec iter_upload sock c =
1531 match c.client_upload_requests
with
1533 | (num, pos
, len) :: tail
->
1534 if len = zero
then begin
1535 c.client_upload_requests
<- tail
;
1538 if c.client_allowed_to_write
>= 0L then begin
1540 c.client_upload_requests
<- tail
;
1542 let file = c.client_file
in
1543 let offset = pos
++ file.file_piece_size
*.. num in
1544 c.client_allowed_to_write
<- c.client_allowed_to_write
-- len;
1546 let len = Int64.to_int
len in
1547 (* lprintf "Unix32.read: offset %Ld len %d\n" offset len; *)
1548 Unix32.read
(file_fd
file) offset upload_buffer 0 len;
1549 (* update upload rate from len bytes *)
1550 Rate.update
c.client_upload_rate ~amount
:len;
1551 Rate.update
c.client_downloaded_rate
;
1552 file.file_uploaded
<- file.file_uploaded
++ (Int64.of_int
len);
1553 file.file_session_uploaded
<- file.file_session_uploaded
++ (Int64.of_int
len);
1556 count_filerequest
c;
1557 match file.file_shared
with
1561 s.impl_shared_uploaded
<- file.file_uploaded
;
1562 shared_must_update
(as_shared
s)
1565 (* lprintf "sending piece\n"; *)
1566 send_client
c (Piece
(num, pos
, upload_buffer, 0, len));
1568 with e
-> if !verbose
then lprintf_nl
1569 "Exception %s in iter_upload" (Printexc2.to_string e
)
1572 (* lprintf "client is waiting for another piece\n"; *)
1573 ready_for_upload
(as_client
c)
1578 In this function we check if we can send bytes (according
1579 to bandwidth control), if we can, call iter_upload to
1580 send a Piece message
1581 @param c the client to which we can send some bytes
1582 @param allowed the amount of bytes we can send to client
1584 let client_can_upload c allowed
=
1585 (* lprintf "allowed to upload %d\n" allowed; *)
1586 do_if_connected
c.client_sock
(fun sock ->
1587 match c.client_upload_requests
with
1590 let new_allowed_to_write =
1591 c.client_allowed_to_write
++ (Int64.of_int allowed
) in
1592 if allowed
> 0 && can_write_len
sock
1593 (Int64.to_int
new_allowed_to_write)
1595 CommonUploads.consume_bandwidth allowed
;
1596 c.client_allowed_to_write
<- new_allowed_to_write;
1601 let file_resume file =
1603 match t
.tracker_status
with
1604 | Enabled
| Disabled_mld
_ -> ()
1605 | Disabled_failure
_ | Disabled
_ -> t
.tracker_status
<- Enabled
1606 ) file.file_trackers
;
1607 (try talk_to_tracker file true with _ -> ())
1612 Send info to tracker when stopping a file.
1613 @param file the file we want to stop
1615 let file_stop file =
1616 if file.file_tracker_connected
then
1618 connect_trackers file "stopped" false (fun _ _ ->
1619 lprintf_file_nl
(as_file
file) "Tracker return: stopped %s" file.file_name
;
1620 file.file_tracker_connected
<- false)
1627 client_ops
.op_client_can_upload
<- client_can_upload;
1628 file_ops
.op_file_resume
<- file_resume;
1629 file_ops
.op_file_recover
<- file_resume;
1630 file_ops
.op_file_pause
<- (fun file ->
1631 Hashtbl.iter (fun _ c ->
1632 match c.client_sock
with
1633 Connection
sock -> close
sock Closed_by_user
1635 ) file.file_clients
;
1636 (*When a file is paused we consider it is stopped*)
1639 file_ops
.op_file_queue
<- file_ops
.op_file_pause
;
1640 client_ops
.op_client_enter_upload_queue
<- (fun c ->
1641 if !verbose_msg_clients
then
1642 lprintf_nl
"Client %d: client_enter_upload_queue" (client_num
c);
1643 ready_for_upload
(as_client
c));
1644 network
.op_network_connected_servers
<- (fun _ -> []);