patch #7139
[mldonkey.git] / src / networks / bittorrent / bTClients.ml
blobe95fe8c4c36f9952a4f845fc62983cb7c6725e1c
(* Copyright 2001, 2002 b52_simon :), b8_bavard, b8_fee_carabine, INRIA *)
(*
    This file is part of mldonkey.

    mldonkey is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    mldonkey is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with mldonkey; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*)
(** Functions used in client<->client communication.
*)

(** A peer (or client) is always a remote peer in this file.
  A Piece is a portion of the file associated with a hash (sha1).
  In mldonkey a piece is referred to as a block inside the swarming system.
  A SubPiece is a portion of a piece (without hash) which can be
  sent/downloaded to/from a peer.
  In mldonkey a SubPiece is referred to as a range inside the swarming system.
  @see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for some
  unofficial (but more detailed) specs.
*)
34 open Int64ops
35 open AnyEndian
36 open BigEndian
37 open Printf2
38 open Md4
39 open Options
40 open BasicSocket
41 open TcpBufferedSocket
42 open Ip_set
44 open CommonShared
45 open CommonUploads
46 open CommonOptions
47 open CommonDownloads
48 open CommonInteractive
49 open CommonClient
50 open CommonComplexOptions
51 open CommonTypes
52 open CommonFile
53 open CommonSwarming
54 open CommonGlobals
55 open CommonDownloads
57 open BTRate
58 open BTTypes
59 open BTProtocol
60 open BTOptions
61 open BTGlobals
62 open BTComplexOptions
63 open BTChooser
64 open BTStats
65 open TcpMessages
67 module VB = VerificationBitmap
(* Status lines recognised as a successful HTTP answer: the bare form and
   the HTTP/1.1 form. *)
let http_ok : string = "HTTP 200 OK"
let http11_ok : string = "HTTP/1.1 200 OK"
(* Peers selected by the last run of the chooser, and peers that currently
   hold an upload slot. Both start out empty. *)
let next_uploaders : BTTypes.client list ref = ref []
let current_uploaders : BTTypes.client list ref = ref []
(**
  Announce [file] to every enabled tracker of that file.

  @param file The file concerned by the request.
  @param event Event (as a string) to send to the tracker: ["started"] for
    the first connection, ["stopped"] for a clean stop of the file,
    ["completed"] when the file is complete (only honoured while the file
    still has a swarmer). Any other string sends no event at all, which is
    what a periodic re-announce wants. Be careful with the spelling: an
    unknown string is silently treated as "no event".
  @param need_sources When [false], [numwant=0] is sent so that the tracker
    returns no peers; otherwise the [numwant] option is sent if it is > -1.
  @param f Callback applied to the tracker and to the result handed back by
    [Http_client.wget] (see talk_to_tracker for an example).

  Session statistics of the file are reset when [event] is ["started"].

  A tracker is really contacted only when one of these holds:
  - the event does not require checking the delay (stopped/completed);
  - the file never connected to a tracker before;
  - the tracker interval has elapsed;
  - the file is downloading with fewer than [!!ask_tracker_threshold]
    clients and the (minimum) re-ask interval has elapsed.

  If no tracker is enabled, trackers in state [Disabled _] are re-enabled;
  if there is still none, the file is paused.
*)
let connect_trackers file event need_sources f =

  (* reset session statistics when sending 'started' event *)
  if event = "started" then begin
    file.file_session_uploaded <- Int64.zero;
    file.file_session_downloaded <- Int64.zero
  end;

  (* event argument, whether the announce interval must be respected, and
     the number of bytes still to download *)
  let args, must_check_delay, left =
    match file.file_swarmer with
    | None ->
        begin match event with
        | "started" -> [("event", "started")], true, zero
        | "stopped" -> [("event", "stopped")], false, zero
        | _ -> [], true, zero
        end
    | Some swarmer ->
        let local_downloaded = CommonSwarming.downloaded swarmer in
        let left = file_size file -- local_downloaded in
        begin match event with
        | "completed" -> [("event", "completed")], false, zero
        | "started" -> [("event", "started")], true, left
        | "stopped" -> [("event", "stopped")], false, left
        | _ -> [], true, left
        end
  in

  let args = ("no_peer_id", "1") :: ("compact", "1") :: args in
  let args =
    if not need_sources then
      ("numwant", "0") :: args
    else if !!numwant > -1 then
      ("numwant", string_of_int !!numwant) :: args
    else
      args
  in
  let args =
    if !!send_key then ("key", Sha1.to_hexa !!client_uid) :: args else args
  in
  let args =
    if !!force_client_ip then ("ip", Ip.to_string !!set_client_ip) :: args
    else args
  in
  let args =
    ("info_hash", Sha1.direct_to_string file.file_id) ::
    ("peer_id", Sha1.direct_to_string !!client_uid) ::
    ("port", string_of_int !!client_port) ::
    ("uploaded", Int64.to_string file.file_session_uploaded) ::
    ("downloaded", Int64.to_string file.file_session_downloaded) ::
    ("left", Int64.to_string left) ::
    args
  in

  let enabled_trackers =
    let enabled_trackers =
      List.filter (fun t -> tracker_is_enabled t) file.file_trackers in
    if enabled_trackers <> [] then enabled_trackers
    else begin
      if !verbose_msg_servers then
        lprintf_nl "No trackers left for %s, reenabling all of them..." (file_best_name (as_file file));
      List.iter (fun t ->
        match t.tracker_status with
        (* only re-enable after normal error *)
        | Disabled _ -> t.tracker_status <- Enabled
        | _ -> ()) file.file_trackers;
      let enabled_trackers =
        List.filter (fun t -> tracker_is_enabled t) file.file_trackers in
      if enabled_trackers = [] && (file_state file) <> FilePaused then begin
        file_pause (as_file file) (CommonUserDb.admin_user ());
        lprintf_file_nl (as_file file) "Paused %s, no usable trackers left" (file_best_name (as_file file))
      end;
      enabled_trackers
    end
  in

  List.iter (fun t ->

    (* if we have too few sources we may ask the tracker before the interval *)
    if not must_check_delay
      || not file.file_tracker_connected
      || t.tracker_last_conn + t.tracker_interval < last_time ()
      || ( file.file_clients_num < !!ask_tracker_threshold
          (* structural equality: [==] only worked here because
             FileDownloading is a constant constructor *)
          && file_state file = FileDownloading
          && (if t.tracker_min_interval > !!min_tracker_reask_interval then
                t.tracker_last_conn + t.tracker_min_interval < last_time ()
              else
                t.tracker_last_conn + !!min_tracker_reask_interval < last_time () ))
    then begin
      (* if we already tried to connect but failed, disable tracker, but allow re-enabling *)
      if file.file_tracker_connected
        && t.tracker_last_clients_num = 0
        && t.tracker_last_conn < 1
      then begin
        if !verbose_msg_servers then
          lprintf_nl "Request error from tracker: disabling %s" t.tracker_url;
        t.tracker_status <- Disabled (intern "MLDonkey: Request error from tracker")
      end
      (* Send request to tracker *)
      else begin
        let args =
          if String.length t.tracker_id > 0 then
            ("trackerid", t.tracker_id) :: args
          else args
        in
        let args =
          if String.length t.tracker_key > 0 then
            ("key", t.tracker_key) :: args
          else args
        in
        if !verbose_msg_servers then
          lprintf_nl "connect_trackers: tracker_connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i file: %s"
            (string_of_bool file.file_tracker_connected)
            t.tracker_id t.tracker_key t.tracker_last_clients_num
            (t.tracker_last_conn - last_time ()) file.file_name;

        let module H = Http_client in
        let url = t.tracker_url in
        let r = {
          H.basic_request with
          H.req_url = Url.of_string ~args: args url;
          H.req_proxy = !CommonOptions.http_proxy;
          H.req_user_agent = get_user_agent ();
          (* #4541 [egs] supports redirect *)
          H.req_max_retry = !!max_tracker_redirect;
        } in

        if !verbose_msg_servers then
          lprintf_nl "Request sent to tracker %s for file: %s"
            t.tracker_url file.file_name;
        H.wget r
          (fun fileres ->
            (* only a completed request counts as a tracker connection *)
            t.tracker_last_conn <- last_time ();
            file.file_tracker_connected <- true;
            f t fileres)
      end
    end
    else
      if !verbose_msg_servers then
        lprintf_nl "Request NOT sent to tracker %s - next request in %ds for file: %s"
          t.tracker_url (t.tracker_interval - (last_time () - t.tracker_last_conn)) file.file_name
  ) enabled_trackers
(** Give peer [c] a normal upload slot, put it in the upload queue and
  send it an Unchoke message. Both rate counters of the peer are refreshed
  and the time of the last optimistic unchoke is set to now.
  @param c The peer that becomes an uploader
*)
let start_upload c =
  let peer = as_client c in
  set_client_upload peer (as_file c.client_file);
  set_client_has_a_slot peer NormalSlot;
  Rate.update_no_change c.client_downloaded_rate;
  Rate.update_no_change c.client_upload_rate;
  c.client_last_optimist <- last_time ();
  client_enter_upload_queue peer;
  send_client c Unchoke
(** Decide which peers will be uploaders.
  The new selection comes from [choose_uploaders current_files].
  Current uploaders that are not selected any more only lose their slot
  here: no Choke message is sent by this function, the peer is allowed to
  finish its download and is choked on its next request.
  Newly selected peers that were not uploaders yet are started with
  [start_upload] (which sends Unchoke).
*)
let recompute_uploaders () =
  if !verbose_upload then lprintf_nl "recompute_uploaders";
  next_uploaders := choose_uploaders current_files;
  (* take the slot away from uploaders that were not selected again *)
  List.iter (fun c ->
    if not (List.mem c !next_uploaders) then
      set_client_has_a_slot (as_client c) NoSlot
  ) !current_uploaders;
  (* do not restart a peer that is already an uploader *)
  List.iter (fun c ->
    if not (List.mem c !current_uploaders) then start_upload c
  ) !next_uploaders;
  current_uploaders := !next_uploaders
265 (****** Fabrice: why are clients which are disconnected removed ???
266 These clients might still be useful to reconnect to, no ? *)
(** This function is called when a client is disconnected
  (be it by our side or its side).
  A client which disconnects (even only one time) is discarded.
  If it is an uploader which disconnects, uploaders are recomputed
  (see recompute_uploaders) immediately.

  Nothing but logging happens when there is no connection; a pending
  connection token is cancelled. For an established connection the socket
  is closed first and the rest of the cleanup is best effort: an exception
  raised there is not propagated, it is only logged when
  [verbose_msg_clients] is set.

  @param c The client to disconnect
  @param reason The reason for the disconnection (see in BasicSocket.ml)
*)
let disconnect_client c reason =
  if !verbose_msg_clients then
    lprintf_nl "Client %d: disconnected: %s" (client_num c) (string_of_reason reason);
  match c.client_sock with
  | NoConnection -> ()
  | ConnectionWaiting token ->
      cancel_token token;
      c.client_sock <- NoConnection
  | Connection sock ->
      close sock reason;
      begin try
        set_client_disconnected c reason;
        c.client_session_downloaded <- 0L;
        c.client_session_uploaded <- 0L;
        (try if c.client_good then count_seen c with _ -> ());
        if c.client_registered_bitfield then begin
          match c.client_uploader with
          | None -> ()
          | Some up ->
              c.client_uploader <- None;
              (* If the client registered a bitfield then
                 we must unregister him to update the swarmer
                 (Useful for availability) *)
              CommonSwarming.unregister_uploader up
        end;
        (* Membership in current_uploaders is tested rather than the upload
           slot: per the original note, the slot was already removed by
           set_client_disconnected above. *)
        let was_uploader = List.mem c !current_uploaders in
        (*BTGlobals.remove_client*)
        remove_client c;
        if was_uploader then recompute_uploaders ()
      with e ->
        (* cleanup stays best effort, but the failure is no longer silent *)
        if !verbose_msg_clients then
          lprintf_nl "Client %d: exception %s while disconnecting"
            (client_num c) (Printexc2.to_string e)
      end
(** Disconnect clients of a file.
  When the file is paused or cancelled every client is disconnected.
  Otherwise clients that hold an upload slot or that are interested in
  our data are kept, and only the others are disconnected.
  @param file The file whose clients must be disconnected
*)
let disconnect_clients file =
  let must_keep =
    match file_state file with
    | FilePaused | FileCancelled -> false
    | _ -> true
  in
  (* Snapshot the clients before disconnecting anybody: disconnect_client
     ends in remove_client, which presumably removes the client from
     file.file_clients (defined in BTGlobals, verify), and changing a
     Hashtbl while Hashtbl.iter runs over it is unspecified. *)
  let clients =
    List.rev (Hashtbl.fold (fun _ c acc -> c :: acc) file.file_clients [])
  in
  List.iter (fun c ->
    (* evaluated at visit time, as before: an earlier disconnection may
       have recomputed the upload slots *)
    let keep =
      must_keep && (client_has_a_slot (as_client c) || c.client_interested)
    in
    if not keep then begin
      if !verbose_msg_clients then
        lprintf_file_nl (as_file file) "disconnect since download is finished";
      disconnect_client c Closed_by_user
    end
  ) clients
(** What to do when a file is finished.
  Does nothing unless [file] is (physically) one of [!current_files].
  Otherwise the trackers are told ["completed"], the file is marked as
  completed and its swarmer is removed.
  @param file the finished file
*)
let download_finished file =
  if List.memq file !current_files then begin
    (* must be called before swarmer gets removed from file: without a
       swarmer connect_trackers does not send the "completed" event *)
    connect_trackers file "completed" false (fun _ _ -> ());
    (*CommonComplexOptions.file_completed*)
    file_completed (as_file file);
    (* Remove the swarmer for this file as it is not useful anymore... *)
    CommonSwarming.remove_swarmer file.file_swarmer;
    file.file_swarmer <- None
    (* At this point, the file state is FileDownloaded. We should not remove
       the file, because we continue to upload. *)
  end
(** Run [download_finished] on [file] when the swarmer reports that the
  download is finished (per the original note: all blocks verified).
  @param swarmer The swarmer to query
  @param file The file to finish
*)
let check_finished swarmer file =
  let finished = CommonSwarming.check_finished swarmer in
  if finished then download_finished file
(* Mask of bit [i] inside a byte, counting from the most significant bit:
   bits.(0) = 128 ... bits.(7) = 1. *)
let bits = Array.init 8 (fun i -> 0x80 lsr i)
(* Check/set bits in strings (bittorrent format): bit [n] lives in byte
   [n / 8], most significant bit first. *)

(* [is_bit_set s n] is true when bit [n] of [s] is 1.
   Raises Invalid_argument when byte [n / 8] is outside [s]. *)
let is_bit_set s n =
  let byte = Char.code (String.get s (n lsr 3)) in
  let mask = 0x80 lsr (n land 7) in
  byte land mask <> 0
(* [set_bit s n] sets bit [n] of [s] to 1 in place (byte [n / 8], most
   significant bit first). Raises Invalid_argument when that byte is
   outside [s]. *)
let set_bit s n =
  let byte_index = n lsr 3 in
  let mask = 0x80 lsr (n land 7) in
  let updated = Char.code s.[byte_index] lor mask in
  s.[byte_index] <- Char.unsafe_chr updated
(* Official client seems to use max_range_request 5 and max_range_len 2^14 *)
(* Maximum number of requests kept in the 'pipeline' of one peer *)
let max_range_requests : int = 5
(* How many bytes we can request in one Piece *)
(** A wrapper to send the Interested message to a client.
  The message is sent only if the client is interesting and Interested
  was not already sent to it; the client is then flagged so that the
  message is not sent twice.
  @param c The client to send Interested
*)
let send_interested c =
  if c.client_interesting && not c.client_alrd_sent_interested then begin
    c.client_alrd_sent_interested <- true;
    send_client c Interested
  end
(** Send a Bitfield message to a client.
  The bitfield has one bit per chunk (rounded up to whole bytes, most
  significant bit first). Without a swarmer every chunk bit is set;
  with a swarmer only the chunks in state [VB.State_verified] are set.
  @param c The client to send the Bitfield message
*)
let send_bitfield c =
  let payload =
    match c.client_file.file_swarmer with
    | None ->
        (* This must be a seeded file... *)
        if !verbose_download then
          lprintf_nl "Sending completed verified bitmap";
        let nchunks = Array.length c.client_file.file_chunks in
        let len = (nchunks + 7) / 8 in
        let s = String.make len '\000' in
        for i = 0 to nchunks - 1 do
          set_bit s i
        done;
        s
    | Some swarmer ->
        let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
        if !verbose_download then
          lprintf_nl "Sending verified bitmap: [%s]" (VB.to_string bitmap);
        let len = (VB.length bitmap + 7) / 8 in
        let s = String.make len '\000' in
        VB.iteri (fun i state ->
          if state = VB.State_verified then set_bit s i) bitmap;
        s
  in
  send_client c (BitField payload)
(* Mutable integer counter, initially 0. *)
let counter : int ref = ref 0
(* Decode the reserved bytes [rbits] of a handshake into the extension
   flags of client [c]. Bytes 7, 5 and 0 are read, so [rbits] must hold at
   least 8 bytes; otherwise Invalid_argument is raised before any flag is
   written. *)
let parse_reserved rbits c =
  let byte7 = Char.code rbits.[7] in
  let byte5 = Char.code rbits.[5] in
  let byte0 = Char.code rbits.[0] in
  let flag byte mask = byte land mask <> 0 in
  c.client_dht <- flag byte7 0x01;
  c.client_cache_extension <- flag byte7 0x02;
  c.client_fast_extension <- flag byte7 0x04;
  c.client_utorrent_extension <- flag byte5 0x10;
  c.client_azureus_messaging_protocol <- flag byte0 0x80
(** This function is called to parse the first message that
  a client sends (the handshake).
  @param counter client num (only used for logging)
  @param cc Reference to a pair (expected client option, country code);
    when no client is expected a new one is created and stored in it
  @param init_sent A boolean to know if we sent this client the handshake
    message; when [false] the connection is flagged as incoming and our
    handshake is sent
  @param gconn Its [gconn_handler] is replaced by a reader that feeds
    [client_to_client] once the header has been accepted
  @param sock The socket we use for this client
  @param proto Unused (required by tuple type?)
  @param rbits Reserved bytes of the handshake (see parse_reserved)
  @param file_id The file hash (sha1) of the file involved in this exchange

  If [file_id] is not in [files_by_uid] the request is only logged.
  Any other exception is logged, closes the socket and is re-raised.
*)
(* removed: @param peer_id The hash (sha1) of the client. (Should be checked)
*)
let rec client_parse_header counter cc init_sent gconn sock
    (proto, rbits, file_id) =
  try
    set_lifetime sock 600.;
    if !verbose_msg_clients then
      lprintf_nl "client_parse_header %d" counter;

    let file = Hashtbl.find files_by_uid file_id in
    if !verbose_msg_clients then
      lprintf_file_nl (as_file file) "file found";
    let ccc, cc_country_code = !cc in
    let c =
      match ccc with
      | None ->
          let c = new_client file Sha1.null (TcpBufferedSocket.peer_addr sock) cc_country_code in
          if !verbose_connect then lprintf_file_nl (as_file file) "Client %d: incoming connection" (client_num c);
          cc := (Some c), cc_country_code;
          c
      | Some c ->
          (* Does it happen that this c was already used to connect successfully?
             If yes then this must happen: *)
          c.client_received_peer_id <- false;
          if cc_country_code <> None && c.client_country_code = None then
            c.client_country_code <- cc_country_code;
          c
          (* client could have had Sha1.null as peer_id/uid; checking the
             peer id against c.client_uid is still to be done *)
    in

    if !verbose_msg_clients then begin
      let (ip,port) = c.client_host in
      lprintf_nl "Client %d: Connected from %s:%d" (client_num c)
        (Ip.to_string ip) port;
    end;

    parse_reserved rbits c;

    (match c.client_sock with
     | NoConnection ->
         if !verbose_msg_clients then begin
           let (ip,port) = c.client_host in
           lprintf_nl "No connection to client (%s:%d)!!!" (Ip.to_string ip) port;
         end;
         c.client_sock <- Connection sock
     | ConnectionWaiting token ->
         cancel_token token;
         if !verbose_msg_clients then
           lprintf_nl "Waiting for connection to client !!!";
         c.client_sock <- Connection sock
     (* physical comparison on purpose: is it the very same socket? *)
     | Connection s when s != sock ->
         if !verbose_msg_clients then
           lprintf_nl "CLIENT %d: IMMEDIATE RECONNECTION" (client_num c);
         disconnect_client c (Closed_for_error "Reconnected");
         c.client_sock <- Connection sock;
     | Connection _ -> ()
    );

    set_client_state (c) (Connected (-1));
    if not init_sent then begin
      c.client_incoming <- true;
      send_init !!client_uid file_id sock;
    end;
    connection_ok c.client_connection_control;
    if !verbose_msg_clients then
      lprintf_nl "file and client found";
    (* if not c.client_incoming then *)
    send_bitfield c;
    c.client_blocks_sent <- file.file_blocks_downloaded;
    (*
      TODO !!! : send interested if and only if we are interested
      -> we must receive at least other peer bitfield.
      in common swarmer -> compare : partition -> partition -> bool
    *)

    set_rtimeout sock !!client_timeout;
    (* Once parsed successfully we define the function client_to_client
       to be the function used when a message is read *)
    gconn.gconn_handler <- Reader (fun gconn sock ->
      bt_handler TcpMessages.parsing (client_to_client c) c sock
    );

    let b = TcpBufferedSocket.buf sock in
    (* The receive buffer is normally not empty now, lets parse the rest, most likely PeerID *)
    if b.len <> 0 then
      ignore (bt_handler TcpMessages.parsing (client_to_client c) c sock);

    (* Some newer clients send more opcodes in their handshake packet, lets parse them now.
       Using a while loop on b.len is not possible here because libtorrent clients
       send unparsable five extra bytes after their PeerID which would result into a loop *)
    if b.len <> 0 then
      ignore (bt_handler TcpMessages.parsing (client_to_client c) c sock);
    ()
  with
  (* NOTE(review): this handler also receives a Not_found escaping from any
     call above, not only from the files_by_uid lookup - confirm that the
     message below cannot be misleading in practice. *)
  | Not_found ->
      let (ip,port) = (TcpBufferedSocket.peer_addr sock) in
      if !verbose_unexpected_messages then
        lprintf_nl "Client %s:%d requested a file that is not shared [%s]"
          (Ip.to_string ip) port (Sha1.to_hexa file_id)
  | e ->
      lprintf_nl "Exception %s in client_parse_header" (Printexc2.to_string e);
      close sock (Closed_for_exception e);
      raise e
(** Update the bitmap of a client. Unclear if it is still useful.
  The client is registered as an uploader of the swarmer if it was not
  yet, its bitmap is created (all bits false) if it had none, and the
  chunks queued in [c.client_new_chunks] are set in the bitmap before the
  uploader intervals are refreshed. The queue is emptied.
  The file must still have a swarmer ([assert false] otherwise).
  @param c The client which we want to update.
*)
and update_client_bitmap c =
  let file = c.client_file in

  let swarmer =
    match file.file_swarmer with
    | None -> assert false
    | Some swarmer -> swarmer
  in

  let up =
    match c.client_uploader with
    | None ->
        let up = CommonSwarming.register_uploader swarmer (as_client c)
            (AvailableIntervals []) in
        c.client_uploader <- Some up;
        up
    | Some up ->
        up
  in

  let bitmap =
    match c.client_bitmap with
    | None ->
        let len = CommonSwarming.partition_size swarmer in
        let bitmap = Bitv.create len false in
        c.client_bitmap <- Some bitmap;
        bitmap
    | Some bitmap -> bitmap
  in

  if c.client_new_chunks <> [] then begin
    let chunks = c.client_new_chunks in
    c.client_new_chunks <- [];
    List.iter (fun n -> Bitv.set bitmap n true) chunks;
    CommonSwarming.update_uploader_intervals up (AvailableBitv bitmap);
  end
(** In this function we decide which piece we must request from client,
  and send the Request message.
  Nothing is done when the pipeline already holds [max_range_requests]
  requests, when the file is not downloading or when the client choked us.
  When no block can be found the swarmer bitmap is recomputed and the file
  is checked for completion.
  The client must have a registered uploader ([assert false] otherwise).
  @param sock Socket of the client
  @param c The client
*)
and get_from_client sock (c: client) =
  let file = c.client_file in
  (* Check if there's not enough requests in the 'pipeline'
     and if a request can be sent (not choked and file is downloading) *)
  if List.length c.client_ranges_sent < max_range_requests
    && file_state file = FileDownloading
    && not c.client_choked
  then begin
    let up =
      match c.client_uploader with
      | None -> assert false
      | Some up -> up
    in
    let swarmer = CommonSwarming.uploader_swarmer up in

    try
      (* num is the number of the piece, x is the position of the subpiece
         in the piece(!), y is its length, r is a (CommonSwarmer) range *)
      let num, x, y, r =

        if !verbose_msg_clients then
          lprintf_file_nl (as_file file) "CLIENT %d: Finding new range to send" (client_num c);

        (* Debug dump. Each try/match is parenthesised: without the
           parentheses the handler or the last match arm swallowed every
           following statement and most of the dump was never printed. *)
        if !verbose_swarming then begin
          lprintf_n "Current download:\n Current chunks: ";
          (try
             List.iter (fun (x,y) -> lprintf "%Ld-%Ld " x y) c.client_chunks
           with _ -> lprintf "No Chunks");
          lprint_newline ();

          lprintf_n "Current ranges: ";
          List.iter (fun (p1,p2, r) ->
            let (x,y) = CommonSwarming.range_range r in
            lprintf "%Ld-%Ld[%Ld-%Ld] " p1 p2 x y
          ) c.client_ranges_sent;
          (match c.client_range_waiting with
           | None -> ()
           | Some (x,y,_) -> lprintf "Waiting %Ld-%Ld" x y);
          lprint_newline ();

          lprintf_n "Current blocks: ";
          (match c.client_chunk with
           | None -> lprintf "none"
           | Some (_, blocks) ->
               List.iter (fun b ->
                 CommonSwarming.print_block b.up_block) blocks);
          lprint_newline ();

          lprintf_file_nl (as_file file) "Finding Range:";
        end;

        try
          (* We must find a block to request first, and then
             some range inside this block *)
          let rec iter () =
            match c.client_chunk with

            | None ->
                if !verbose_swarming then lprintf_file_nl (as_file file) "No block";
                update_client_bitmap c;
                (try CommonSwarming.verify_one_chunk swarmer with _ -> ());
                (* Find a free block in the swarmer; Not_found from here
                   reaches the outer handler below *)
                let chunk, blocks = CommonSwarming.find_blocks up in
                if !verbose_swarming then begin
                  lprintf_n "Blocks Found: ";
                  List.iter (fun b ->
                    CommonSwarming.print_block b.up_block) blocks;
                  lprint_newline ()
                end;
                (* We put the found block in client_chunk to
                   request range in this block. (Useful for
                   not searching each time a new block) *)
                c.client_chunk <- Some (chunk, blocks);
                iter ()

            | Some (chunk, blocks) ->
                if !verbose_swarming then begin
                  lprintf_n "Current Blocks: ";
                  List.iter (fun b ->
                    CommonSwarming.print_block b.up_block) blocks;
                  lprint_newline ()
                end;

                try
                  (* Given a block find a range inside; a leftover of a
                     previously split range is served first *)
                  let (x,y,r) =
                    match c.client_range_waiting with
                    | Some (x,y,r) ->
                        c.client_range_waiting <- None;
                        (x,y,r)
                    | None ->
                        CommonSwarming.find_range up (min max_range_len file.file_piece_size)
                  in

                  (* never ask more than max_range_len at once: keep the
                     remainder for the next request *)
                  let (x,y,r) =
                    if y -- x > max_range_len then begin
                      c.client_range_waiting <- Some (x ++ max_range_len, y, r);
                      (x, x ++ max_range_len, r)
                    end else
                      (x,y,r)
                  in

                  c.client_ranges_sent <- c.client_ranges_sent @ [x,y, r];
                  (* CommonSwarming.alloc_range r; *)

                  (* naughty, naughty, was computing a block number instead of a chunk
                     number. Only matters with merged downloads, and even then other
                     clients didn't seem to care (?), so the bug remained hidden *)
                  if !verbose_swarming then
                    lprintf_file_nl (as_file file) "Asking %d For Range %Ld-%Ld" chunk x y;

                  (* offset relative to the start of the piece, and length *)
                  chunk, x -- file.file_piece_size ** Int64.of_int chunk, y -- x, r

                with Not_found ->
                  (* If we don't find a range to request inside the block,
                     iter to choose another block *)
                  if !verbose_swarming then
                    lprintf_nl "Could not find range in current block";
                  (* c.client_blocks <- List2.removeq b c.client_blocks; *)
                  c.client_chunk <- None;
                  iter ()
          in

          iter ()

        with Not_found ->
          (* If we don't find a block to request we can check if the
             file is finished (if there's missing pieces we can't decide
             that the file is finished because we didn't find
             a block to ask) *)
          if !verbose_swarming then
            lprintf_nl "Unable to get a block !!";
          CommonSwarming.compute_bitmap swarmer;
          check_finished swarmer file;
          raise Not_found
      in

      send_client c (Request (num,x,y));

      if !verbose_msg_clients then
        lprintf_file_nl (as_file file) "CLIENT %d: Asking %s For Range %Ld-%Ld"
          (client_num c) (Sha1.to_string c.client_uid) x y

    with Not_found ->
      if not (CommonSwarming.check_finished swarmer) && !verbose_download then
        lprintf_file_nl (as_file file) "BTClient.get_from_client ERROR: can't find a block to download and file is not yet finished for file : %s..." file.file_name
  end
807 (** In this function we match a message sent by a client
808 and react according to this message.
809 @param c The client which sent us a message
810 @param sock The socket used for this client
811 @param msg The message sent by the client
813 and client_to_client c sock msg =
814 if !verbose_msg_clients then begin
815 let (ip,port) = (TcpBufferedSocket.peer_addr sock) in
816 let (timeout, next) = get_rtimeout sock in
817 lprintf_nl "CLIENT %d(%s:%d): (%d, %d,%d) Received %s"
818 (client_num c) (Ip.to_string ip) port
819 (last_time ())
820 (int_of_float timeout)
821 (int_of_float next)
822 (TcpMessages.to_string msg);
823 end;
825 let file = c.client_file in
827 (* Sending the "Have" message was moved to bTGlobals so this is useless *)
828 (* if c.client_blocks_sent != file.file_blocks_downloaded then begin
829 let rec iter list =
830 match list with
831 [] -> ()
832 | b :: tail when tail == c.client_blocks_sent ->
833 c.client_blocks_sent <- list;
834 let (num,_,_) = CommonSwarming.block_block b in
835 send_client c (Have (Int64.of_int num))
836 | _ :: tail -> iter tail
838 iter file.file_blocks_downloaded
839 end;*)
842 match msg with
843 Piece (num, offset, s, pos, len) ->
844 (*A Piece message contains the data*)
845 set_client_state c (Connected_downloading (file_num file));
846 (*flag it as a good client *)
847 c.client_good <- true;
848 if file_state file = FileDownloading then begin
849 let position = offset ++ file.file_piece_size *.. num in
850 let up = match c.client_uploader with
851 None -> assert false
852 | Some up -> up in
853 let swarmer = CommonSwarming.uploader_swarmer up in
855 if !verbose_msg_clients then
856 (match c.client_ranges_sent with
857 [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
858 | (p1,p2,r) :: _ ->
859 let (x,y) = CommonSwarming.range_range r in
860 lprintf_file_nl (as_file file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])"
861 (brand_to_string c.client_brand) position len
862 p1 p2 x y
865 let old_downloaded =
866 CommonSwarming.downloaded swarmer in
867 (* List.iter CommonSwarming.free_range c.client_ranges; *)
868 CommonSwarming.received up
869 position s pos len;
870 (* List.iter CommonSwarming.alloc_range c.client_ranges; *)
871 let new_downloaded =
872 CommonSwarming.downloaded swarmer in
874 (*Update rate and amount of data received from client*)
875 count_download c (new_downloaded -- old_downloaded);
876 (* use len here with max_dr quickfix *)
877 Rate.update c.client_downloaded_rate ~amount:len;
878 (* count bytes downloaded from network for this file *)
879 file.file_session_downloaded <- file.file_session_downloaded ++ (Int64.of_int len);
880 if !verbose_msg_clients then
881 (match c.client_ranges_sent with
882 [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
883 | (p1,p2,r) :: _ ->
884 let (x,y) = CommonSwarming.range_range r in
885 lprintf_file_nl (as_file file) "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
886 position len
887 p1 p2 x y
888 (new_downloaded -- old_downloaded)
891 (* changed 2.5.28 should have been done before !
892 if new_downloaded <> old_downloaded then
893 add_file_downloaded (as_file file)
894 (new_downloaded -- old_downloaded); *)
895 end;
896 begin
897 match c.client_ranges_sent with
898 [] -> ()
899 | r :: tail ->
900 (* CommonSwarming.free_range r; *)
901 c.client_ranges_sent <- tail;
902 end;
903 get_from_client sock c;
905 (* Check if the client is still interesting for us... *)
906 check_if_interesting file c
908 | PeerID p ->
909 (* Disconnect if that is ourselves. *)
910 c.client_uid <- Sha1.direct_of_string p;
911 if not (c.client_uid = !!client_uid) then
912 begin
913 let brand, release = parse_software p in
914 c.client_brand <- brand;
915 c.client_release <- release;
916 send_client c Choke;
917 c.client_sent_choke <- true;
919 else
920 disconnect_client c Closed_by_user
923 | BitField p ->
924 (*A bitfield is a summary of what a client have*)
925 begin
926 match c.client_file.file_swarmer with
927 None -> ()
928 | Some swarmer ->
929 c.client_new_chunks <- [];
931 let npieces = CommonSwarming.partition_size swarmer in
932 let nbits = String.length p * 8 in
934 if nbits < npieces then begin
935 lprintf_file_nl (as_file file) "Error: expected bitfield of atleast %d but got %d" npieces nbits;
936 disconnect_client c (Closed_for_error "Wrong bitfield length")
937 end else begin
939 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
941 for i = 0 to npieces - 1 do
942 if is_bit_set p i then begin
943 c.client_new_chunks <- i :: c.client_new_chunks;
944 match VB.get bitmap i with
945 | VB.State_missing | VB.State_partial ->
946 c.client_interesting <- true
947 | VB.State_complete | VB.State_verified -> ()
948 end
949 done;
951 update_client_bitmap c;
952 c.client_registered_bitfield <- true;
954 if c.client_interesting then
955 send_interested c;
957 if !verbose_msg_clients then
958 lprintf_file_nl (as_file file) "New BitField Registered";
960 (* for i = 1 to max_range_requests - List.length c.client_ranges do
961 (try get_from_client sock c with _ -> ())
962 done
965 end;
966 end;
967 (* Note: a bitfield must only be sent after the handshake and before everything else: NOT here *)
969 | Have n ->
970 (* A client can send a "Have" without sending a Bitfield *)
971 begin
972 match c.client_file.file_swarmer with
973 None -> ()
974 | Some swarmer ->
975 let n = Int64.to_int n in
976 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
977 (* lprintf_nl "verified: %c;" (VB.state_to_char (VB.get bitmap n)); *)
978 (* if the peer has a chunk we don't, tell him we're interested and update his bitmap *)
979 match VB.get bitmap n with
980 | VB.State_missing | VB.State_partial ->
981 c.client_interesting <- true;
982 send_interested c;
983 c.client_new_chunks <- n :: c.client_new_chunks;
984 update_client_bitmap c;
985 | VB.State_complete | VB.State_verified -> ()
987 (* begin
988 match c.client_bitmap, c.client_uploader with
989 Some bitmap, Some up ->
990 let swarmer = CommonSwarming.uploader_swarmer up in
991 let n = Int64.to_int n in
992 if bitmap.[n] <> '1' then
994 let verified = CommonSwarming.verified_bitmap swarmer in
995 if verified.[n] < '2' then begin
996 c.client_interesting <- true;
997 send_interested c;
998 c.client_new_chunks <- n :: c.client_new_chunks;
999 if c.client_block = None then begin
1000 update_client_bitmap c;
1001 (* for i = 1 to max_range_requests -
1002 List.length c.client_ranges do
1003 (try get_from_client sock c with _ -> ())
1004 done*)
1007 | None, Some _ -> lprintf_nl "no bitmap but client_uploader";
1008 | Some _ , None ->lprintf_nl "bitmap but no client_uploader";
1009 | None, None -> lprintf_nl "no bitmap no client_uploader";
1014 | Interested ->
1015 c.client_interested <- true;
1017 | Choke ->
1018 begin
1019 set_client_state (c) (Connected (-1));
1020 (* remote peer will clear the list of range we sent *)
1021 begin
1022 match c.client_uploader with
1023 None ->
1024 (* Afaik this is no protocol violation and happens if the client
1025 didn't send a client bitmap after the handshake. *)
1026 let (ip,port) = c.client_host in
1027 if !verbose_msg_clients then lprintf_file_nl (as_file file) "%s:%d with software %s : Choke send, but no client bitmap"
1028 (Ip.to_string ip) port (brand_to_string c.client_brand)
1029 | Some up ->
1030 CommonSwarming.clear_uploader_intervals up
1031 end;
1032 c.client_ranges_sent <- [];
1033 c.client_range_waiting <- None;
1034 c.client_choked <- true;
1037 | NotInterested ->
1038 c.client_interested <- false;
1040 | Unchoke ->
1041 begin
1042 c.client_choked <- false;
1043 (* remote peer cleared our request : re-request *)
1044 for i = 1 to max_range_requests -
1045 List.length c.client_ranges_sent do
1046 (try get_from_client sock c with _ -> ())
1047 done
1050 | Request (n, pos, len) ->
1051 if len > max_request_len then begin
1052 close sock (Closed_for_error "Request longer than 1<<16");
1053 raise Exit
1054 end;
1056 if !CommonGlobals.has_upload = 0 then
1057 begin
1058 if client_has_a_slot (as_client c) then
1059 begin
1060 (* lprintf "Received request for upload\n"; *)
1061 (match c.client_upload_requests with
1062 [] ->
1063 CommonUploads.ready_for_upload (as_client c);
1064 | _ -> ());
1065 c.client_upload_requests <- c.client_upload_requests @ [n,pos,len];
1066 let file = c.client_file in
1067 match file.file_shared with
1068 None -> ()
1069 | Some s ->
1070 begin
1071 s.impl_shared_requests <- s.impl_shared_requests + 1;
1072 shared_must_update (as_shared s)
1075 else
1076 begin
1077 send_client c Choke;
1078 c.client_sent_choke <- true;
1079 c.client_upload_requests <- [];
1081 end;
1083 | Ping -> ()
1084 (* We don't 'generate' a Ping message on a Ping. *)
1086 | Cancel (n, pos, len) ->
1087 (* if we receive a cancel message from a peer, remove request *)
1088 if client_has_a_slot (as_client c) then
1089 c.client_upload_requests <- List2.remove_first (n, pos, len) c.client_upload_requests
1090 else
1091 if !verbose_msg_clients then
1092 lprintf_file_nl (as_file file) "Error: received cancel request but client has no slot"
1094 with e ->
1095 lprintf_file_nl (as_file file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e) (TcpMessages.to_string msg)
(** The function used to connect to a client.
    The connection is not immediately initiated. It will
    be put in a fifo and dequeued according to
    !!max_connections_per_second. (@see commonGlobals.ml)
    Nothing is scheduled when the connection manager refuses new
    connections, when the peer is banned, or when [c.client_sock] is
    anything other than [NoConnection].
    @param c The client we must connect
*)
let connect_client c =
  (* Ban lookup; logs the reason when verbose_connect is set.  Only called
     once the connection manager has accepted a new connection. *)
  let not_banned () =
    let (ip, port) = c.client_host in
    match !Ip.banned (ip, c.client_country_code) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_nl "%s:%d (%s), blocked: %s"
            (Ip.to_string ip) port
            (fst (Geoip.get_country_code_name c.client_country_code))
            reason;
        false
  in
  (* Socket event handler: both timeouts close the socket; a close event
     disconnects the client only if this socket is still the client's
     current one. *)
  let on_event sock event =
    match event with
    | BASIC_EVENT LTIMEOUT ->
        if !verbose_msg_clients then
          lprintf_nl "CLIENT %d: LIFETIME" (client_num c);
        close sock Closed_for_timeout
    | BASIC_EVENT RTIMEOUT ->
        if !verbose_msg_clients then
          lprintf_nl "CLIENT %d: RTIMEOUT (%d)" (client_num c)
            (last_time ());
        close sock Closed_for_timeout
    | BASIC_EVENT (CLOSED r) ->
        (match c.client_sock with
         | Connection s when s == sock -> disconnect_client c r
         | _ -> ())
    | _ -> ()
  in
  (* Runs when the pending connection is dequeued. *)
  let open_connection token =
    try
      if !verbose_msg_clients then
        lprintf_nl "CLIENT %d: connect_client" (client_num c);
      let (ip, port) = c.client_host in
      if !verbose_msg_clients then
        lprintf_nl "connecting %s:%d" (Ip.to_string ip) port;
      connection_try c.client_connection_control;
      let sock =
        connect token "bittorrent download" (Ip.to_inet_addr ip) port on_event
      in
      c.client_sock <- Connection sock;
      set_lifetime sock 600.;
      TcpBufferedSocket.set_read_controler sock download_control;
      TcpBufferedSocket.set_write_controler sock upload_control;
      TcpBufferedSocket.set_rtimeout sock 30.;
      let file = c.client_file in
      if !verbose_msg_clients then
        lprintf_file_nl (as_file file) "READY TO DOWNLOAD FILE";
      send_init !!client_uid file.file_id sock;
      (* Fabrice: Initialize the client bitmap and uploader fields to <> None *)
      update_client_bitmap c;
      incr counter;
      (* We 'hook' the client_parse_header function to the socket.
         This function will then be called when the first message will
         be parsed. *)
      set_bt_sock sock !verbose_msg_clients
        (BTHeader
           (client_parse_header !counter
              (ref ((Some c), c.client_country_code)) true))
    with e ->
      lprintf_nl "Exception %s while connecting to client"
        (Printexc2.to_string e);
      disconnect_client c (Closed_for_exception e)
  in
  if can_open_connection connection_manager && not_banned () then
    match c.client_sock with
    | NoConnection ->
        let token =
          add_pending_connection connection_manager open_connection
        in
        (* Since this is a pending connection put ConnectionWaiting
           in client_sock *)
        c.client_sock <- ConnectionWaiting token
    | _ -> ()
(** The Listen function (very much like in C : TCP Socket Server).
    Monitors client connection to us.
    Creates the server socket on !!client_bind_addr / !!client_port and
    stores it in [listen_sock]. Any exception raised while setting it up is
    caught and only reported when verbose_connect is set.
*)
let listen () =
  try
    (* Handle one incoming peer connection on raw descriptor [s]. *)
    let accept_peer s from_ip from_port =
      let ip = Ip.of_inet_addr from_ip in
      let cc = Geoip.get_country_code_option ip in
      if !verbose_sources > 1 then
        lprintf_nl "CONNECTION RECEIVED FROM %s"
          (Ip.to_string (Ip.of_inet_addr from_ip));
      let not_banned () =
        match !Ip.banned (ip, cc) with
        | None -> true
        | Some reason ->
            if !verbose_connect then
              lprintf_nl "%s:%d (%s) blocked: %s"
                (Ip.to_string ip) from_port
                (fst (Geoip.get_country_code_name cc))
                reason;
            false
      in
      (* Reject this connection if we don't want
         to bypass the max_connection parameter *)
      if can_open_connection connection_manager && not_banned () then begin
        let token = create_token connection_manager in
        let sock =
          TcpBufferedSocket.create token "bittorrent client connection" s
            (fun sock event ->
               match event with
               | BASIC_EVENT (RTIMEOUT|LTIMEOUT) ->
                   (* monitor read and life timeout on client sockets *)
                   close sock Closed_for_timeout
               | _ -> ())
        in
        TcpBufferedSocket.set_read_controler sock download_control;
        TcpBufferedSocket.set_write_controler sock upload_control;
        (* No client is known yet: the reference starts at None and is the
           one handed to client_parse_header below. *)
        let peer = ref (None, cc) in
        TcpBufferedSocket.set_closer sock (fun _ r ->
            match fst !peer with
            | Some c ->
                (match c.client_sock with
                 | Connection s when s == sock -> disconnect_client c r
                 | _ -> ())
            | None -> ());
        set_rtimeout sock 30.;
        incr counter;
        (* Again : 'hook' client_parse_header to the socket *)
        set_bt_sock sock !verbose_msg_clients
          (BTHeader (client_parse_header !counter peer false))
      end else
        (* don't forget to close the incoming sock if we can't
           open a new connection *)
        Unix.close s
    in
    let s =
      TcpServerSocket.create "bittorrent client server"
        (Ip.to_inet_addr !!client_bind_addr)
        !!client_port
        (fun _ event ->
           match event with
           | TcpServerSocket.CONNECTION (s, Unix.ADDR_INET (from_ip, from_port)) ->
               (* Receiving an event TcpServerSocket.CONNECTION from
                  the TcpServerSocket means that a new client try
                  to connect to us *)
               accept_peer s from_ip from_port
           | _ -> ())
    in
    listen_sock := Some s
  with e ->
    if !verbose_connect then
      lprintf_nl "Exception %s while init bittorrent server"
        (Printexc2.to_string e)
(** This function send keepalive messages to all connected clients
    (and update socket lifetime).
    Only clients whose [client_sock] is [Connection _] are pinged; their
    socket lifetime is then set to 130.
*)
let send_pings () =
  let ping _ c =
    match c.client_sock with
    | Connection sock ->
        send_client c Ping;
        set_lifetime sock 130.
    | _ -> ()
  in
  List.iter (fun file -> Hashtbl.iter ping file.file_clients) !current_files
1287 open Bencode
(** Check each clients for a given file if they are connected.
    If they aren't, try to connect them.
    A client is only (re)connected when its connection_control allows a
    new attempt; otherwise its connection control is printed.
    An exception raised for one client is reported (when verbose_connect
    is set) and does not stop the iteration over the other clients.
    @param file the file whose clients are examined
*)
let resume_clients file =
  Hashtbl.iter (fun _ c ->
      (* A single handler per client: the previous nested
         [try ... with _ -> ()] swallowed every exception, so this
         logging handler could never run. *)
      try
        match c.client_sock with
        | Connection _ -> ()
        | _ ->
            (* test if we can connect client according to the its
               connection_control.
               Currently the delay between two try is 120 seconds. *)
            if connection_can_try c.client_connection_control then
              connect_client c
            else
              print_control c.client_connection_control
      with e ->
        if !verbose_connect then
          lprintf_file_nl (as_file file) "Exception %s in resume_clients" (Printexc2.to_string e)
  ) file.file_clients
(** Check if the value replied by the tracker is correct.
    @param key the name of the key
    @param n the value to check
    @param url Url of the tracker
    @param name the name of the file
    @return [n] converted to [int] when it is not negative
*)
let chk_keyval key n url name =
  let value = Int64.to_int n in
  if !verbose_msg_clients then
    lprintf_nl "Reply from %s in file: %s has %s: %d" url name key value;
  if value <= -1 then begin
    lprintf_nl "Reply from %s in file: %s has an invalid %s value: %d" url name key value;
    (* NOTE(review): the fallback value and the closing [end] are not
       visible in this view of the file; 0 is assumed - confirm against
       the repository. *)
    0
  end else
    value
(** Check that client is valid and record it.
    A peer is recorded with [new_client] only when its id differs from
    our own !!client_uid, its address is not [Ip.null], its port is not 0
    and the address is not banned. The "Received" line is logged (when
    verbose_sources > 1) whether or not the peer was recorded.
    @param file the file the peer was announced for
    @param id peer id
    @param ip peer address
    @param port peer port
*)
let maybe_new_client file id ip port =
  let cc = Geoip.get_country_code_option ip in
  let not_banned () =
    match !Ip.banned (ip, cc) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_file_nl (as_file file) "%s:%d blocked: %s" (Ip.to_string ip) port reason;
        false
  in
  (* Structural comparison with Ip.null: the physical test [!=] only
     rejects the very same value in memory, so a null address built
     separately (e.g. decoded from a compact peer list) slipped through. *)
  if id <> !!client_uid
     && ip <> Ip.null
     && port <> 0
     && not_banned ()
  then
    ignore (new_client file id (ip,port) cc);
  if !verbose_sources > 1 then
    lprintf_file_nl (as_file file) "Received %s:%d" (Ip.to_string ip) port
(** In this function we interact with the tracker.
    The reply is read from the file handed over by the http client,
    bencode-decoded, and each key of the top-level dictionary updates the
    tracker record [t]. Peers found in the reply are passed to
    [maybe_new_client] and, when [need_sources] is set, [resume_clients]
    is called afterwards.
    Malformed data (a reply that is not a dictionary, a peer entry that is
    not a dictionary, trailing bytes in a compact peer string) is logged or
    skipped instead of raising.
    @param file The file for which we want some sources
    @param need_sources whether we need any sources
*)
let talk_to_tracker file need_sources =
  let f t filename =
    (* This is the function which will be called by the http client
       for parsing the response *)
    let tracker_reply =
      try
        File.to_string filename
      with _ -> lprintf_file_nl (as_file file) "Empty reply from tracker"; ""
    in
    let v =
      match tracker_reply with
      | "" ->
          if !verbose_connect then
            lprintf_file_nl (as_file file) "Empty reply from tracker";
          Bencode.decode ""
      | _ -> Bencode.decode tracker_reply
    in
    (* Validate a numeric field of the reply against this tracker/file. *)
    let chk key n =
      chk_keyval (Bencode.print key) n t.tracker_url file.file_name in
    t.tracker_interval <- 600;
    t.tracker_min_interval <- 600;
    if need_sources then t.tracker_last_clients_num <- 0;
    match v with
    | Dictionary list ->
        List.iter (fun (key,value) ->
            (match (key, value) with
             | String "failure reason", _ -> ()
             | _ ->
                 (match t.tracker_status with
                  | Disabled_failure (i, _) ->
                      lprintf_file_nl (as_file file) "Received good message from Tracker %s in file: %s after %d bad attempts"
                        t.tracker_url file.file_name i
                  | _ -> ());
                 (* Received good message from tracker after failures, re-enable tracker *)
                 t.tracker_status <- Enabled);

            match (key, value) with
            | String "failure reason", String failure ->
                (* On failure, disable the tracker, count the attempts and forbid re-enabling *)
                let attempts =
                  match t.tracker_status with
                  | Disabled_failure (i,_) -> i + 1
                  | _ -> 1
                in
                t.tracker_status <- Disabled_failure (attempts, intern failure);
                lprintf_file_nl (as_file file) "Failure no. %d%s from Tracker %s in file: %s Reason: %s"
                  attempts
                  (if !!tracker_retries = 0 then "" else Printf.sprintf "/%d" !!tracker_retries)
                  t.tracker_url file.file_name (Charset.to_utf8 failure)
            | String "warning message", String warning ->
                lprintf_file_nl (as_file file) "Warning from Tracker %s in file: %s Reason: %s" t.tracker_url file.file_name warning
            | String "interval", Int n ->
                t.tracker_interval <- chk key n;
                (* in case we don't receive "min interval" *)
                if t.tracker_min_interval > t.tracker_interval then
                  t.tracker_min_interval <- t.tracker_interval
            | String "min interval", Int n ->
                t.tracker_min_interval <- chk key n;
                (* make sure "min interval" is always < or equal to "interval" *)
                if t.tracker_min_interval > t.tracker_interval then
                  t.tracker_min_interval <- t.tracker_interval
            | String "downloaded", Int n ->
                t.tracker_torrent_downloaded <- chk key n
            | String "complete", Int n
            | String "done peers", Int n ->
                t.tracker_torrent_complete <- chk key n
            | String "incomplete", Int n ->
                t.tracker_torrent_incomplete <- chk key n;
                (* if complete > 0 and we receive incomplete we probably won't receive num_peers so we simulate it below *)
                if t.tracker_torrent_complete > 0 then
                  t.tracker_torrent_total_clients_count <- (t.tracker_torrent_complete + t.tracker_torrent_incomplete)
            | String "num peers", Int n ->
                t.tracker_torrent_total_clients_count <- chk key n;
                (* if complete > 0 and we receive num_peers we probably won't receive incomplete so we simulate it below *)
                if t.tracker_torrent_complete > 0 then
                  t.tracker_torrent_incomplete <- (t.tracker_torrent_total_clients_count - t.tracker_torrent_complete)
            | String "last", Int n ->
                t.tracker_torrent_last_dl_req <- chk key n
            | String "key", String n ->
                t.tracker_key <- n;
                if !verbose_msg_clients then
                  lprintf_file_nl (as_file file) "%s in file: %s has key: %s" t.tracker_url file.file_name n
            | String "tracker id", String n ->
                t.tracker_id <- n;
                if !verbose_msg_clients then
                  lprintf_file_nl (as_file file) "%s in file: %s has tracker id %s" t.tracker_url file.file_name n

            | String "peers", List list ->
                (* Peer list given as one dictionary per peer *)
                if need_sources then
                  List.iter (fun v ->
                      match v with
                      | Dictionary list ->
                          let peer_id = ref Sha1.null in
                          let peer_ip = ref Ip.null in
                          let port = ref 0 in

                          List.iter (fun v ->
                              match v with
                              | String "peer id", String id ->
                                  peer_id := Sha1.direct_of_string id
                              | String "ip", String ip ->
                                  peer_ip := Ip.of_string ip
                              | String "port", Int p ->
                                  port := Int64.to_int p
                              | _ -> ()
                          ) list;

                          t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
                          maybe_new_client file !peer_id !peer_ip !port

                      | _ ->
                          (* The tracker is a remote party: skip a malformed
                             entry instead of failing an assertion. *)
                          lprintf_file_nl (as_file file) "ignoring malformed peer entry in answer from tracker: %s" (Bencode.print v)
                  ) list
            | String "peers", String p ->
                (* Compact peer list: 6 bytes per peer, 4 for the address
                   followed by 2 for the port. Only complete 6-byte records
                   are read, so a truncated string cannot cause an
                   out-of-bounds access. *)
                let rec iter_comp s pos l =
                  if pos + 6 <= l then begin
                    let ip = Ip.of_ints (get_uint8 s pos,get_uint8 s (pos+1),
                                         get_uint8 s (pos+2),get_uint8 s (pos+3))
                    and port = get_int16 s (pos+4)
                    in
                    t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
                    maybe_new_client file Sha1.null ip port;

                    iter_comp s (pos+6) l
                  end
                in
                if need_sources then
                  iter_comp p 0 (String.length p)
            | String "private", Int _ -> ()
              (* TODO: if set to 1, disable peer exchange *)

            | _ -> lprintf_file_nl (as_file file) "received unknown entry in answer from tracker: %s : %s" (Bencode.print key) (Bencode.print value)
        ) list;
        (* Now, that we have added new clients to a file, it's time
           to connect to them *)
        if !verbose_sources > 0 then
          lprintf_file_nl (as_file file) "talk_to_tracker: got %i source(s) for file %s"
            t.tracker_last_clients_num file.file_name;
        if need_sources then resume_clients file

    | _ ->
        (* A reply that is not a dictionary is reported, not asserted on. *)
        lprintf_file_nl (as_file file) "answer from tracker %s is not a dictionary, ignored" t.tracker_url
  in
  (* "started" is announced until the tracker connection is established. *)
  let event =
    if file.file_tracker_connected then ""
    else "started"
  in
  connect_trackers file event need_sources f
(** Check to see if file is finished, if not
    try to get sources for it.
    Files without a swarmer are ignored. Downloading files ask the tracker
    for sources, shared files contact it without asking for sources, and
    paused files are left alone. Exceptions from [check_finished] and
    [talk_to_tracker] are discarded.
*)
let recover_files () =
  if !verbose_share then
    lprintf_nl "recover_files";
  let recover file swarmer =
    (try check_finished swarmer file with _ -> ());
    match file_state file with
    | FileDownloading ->
        if !verbose_share then
          lprintf_file_nl (as_file file) "recover downloading";
        (try talk_to_tracker file true with _ -> ())
    | FileShared ->
        if !verbose_share then
          lprintf_file_nl (as_file file) "recover shared";
        (try talk_to_tracker file false with _ -> ())
    | FilePaused -> () (* when we are paused we do nothing, not even logging *)
    | s -> lprintf_file_nl (as_file file) "recover: Other state %s!!" (string_of_state s)
  in
  List.iter (fun file ->
      match file.file_swarmer with
      | None -> ()
      | Some swarmer -> recover file swarmer
  ) !current_files
(* Scratch buffer used for uploads: [iter_upload] fills it with
   Unix32.read and hands it to the Piece message.
   NOTE(review): a request must fit in these 100000 bytes; the Request
   handler rejects len > max_request_len - confirm that bound is smaller. *)
let upload_buffer = String.create 100000

(**
  Send a Piece message
  for one of the request of client.
  Requests of length zero are dropped. When the client has no write
  allowance left (client_allowed_to_write < 0) the client is put back in
  the upload queue with [ready_for_upload]; otherwise the head request is
  served and the function recurses on the remaining ones.
  @param sock The socket of the client
  @param c The client
*)
let rec iter_upload sock c =
  match c.client_upload_requests with
  | [] -> ()
  | (_, _, len) :: rest when len = zero ->
      c.client_upload_requests <- rest;
      iter_upload sock c
  | _ :: _ when c.client_allowed_to_write < 0L ->
      (* client is waiting for another piece *)
      ready_for_upload (as_client c)
  | (num, pos, len) :: rest ->
      (try
         c.client_upload_requests <- rest;
         let file = c.client_file in
         let offset = pos ++ file.file_piece_size *.. num in
         c.client_allowed_to_write <- c.client_allowed_to_write -- len;
         count_upload c len;
         let nbytes = Int64.to_int len in
         Unix32.read (file_fd file) offset upload_buffer 0 nbytes;
         (* update upload rate from nbytes bytes *)
         Rate.update c.client_upload_rate ~amount:nbytes;
         Rate.update c.client_downloaded_rate;
         file.file_uploaded <- file.file_uploaded ++ (Int64.of_int nbytes);
         file.file_session_uploaded <- file.file_session_uploaded ++ (Int64.of_int nbytes);
         (* update stats *)
         count_filerequest c;
         (match file.file_shared with
          | None -> ()
          | Some s ->
              s.impl_shared_uploaded <- file.file_uploaded;
              shared_must_update (as_shared s));
         send_client c (Piece (num, pos, upload_buffer, 0, nbytes));
         iter_upload sock c
       with e ->
         if !verbose then
           lprintf_nl "Exception %s in iter_upload" (Printexc2.to_string e))
(**
  In this function we check if we can send bytes (according
  to bandwidth control), if we can, call iter_upload to
  send a Piece message.
  Nothing is done when the client is not connected or has no pending
  upload request. The allowance is only credited (and the bandwidth
  consumed) when [allowed] is positive and the socket can take the new
  total; [iter_upload] is called in either case.
  @param c the client to which we can send some bytes
  @param allowed the amount of bytes we can send to client
*)
let client_can_upload c allowed =
  do_if_connected c.client_sock (fun sock ->
      match c.client_upload_requests with
      | [] -> ()
      | _ :: _ ->
          let credited =
            c.client_allowed_to_write ++ (Int64.of_int allowed) in
          if allowed > 0 && can_write_len sock (Int64.to_int credited) then begin
            CommonUploads.consume_bandwidth allowed;
            c.client_allowed_to_write <- credited
          end;
          iter_upload sock c
  )
(** Resume a file: trackers whose status is [Disabled_failure] or
    [Disabled] are set back to [Enabled] ([Enabled] and [Disabled_mld]
    ones are left untouched), then the tracker is asked for sources.
    Any exception raised by [talk_to_tracker] is discarded.
    @param file the file to resume
*)
let file_resume file =
  let reenable t =
    match t.tracker_status with
    | Disabled_failure _ | Disabled _ -> t.tracker_status <- Enabled
    | Enabled | Disabled_mld _ -> ()
  in
  List.iter reenable file.file_trackers;
  (try talk_to_tracker file true with _ -> ())
(**
  Send info to tracker when stopping a file.
  Nothing is sent unless the file is marked as connected to its tracker;
  the flag is cleared in the callback passed to [connect_trackers].
  @param file the file we want to stop
*)
let file_stop file =
  if file.file_tracker_connected then
    connect_trackers file "stopped" false (fun _ _ ->
        lprintf_file_nl (as_file file) "Tracker return: stopped %s" file.file_name;
        file.file_tracker_connected <- false)
1624 Create the 'hooks'
1626 let _ =
1627 client_ops.op_client_can_upload <- client_can_upload;
1628 file_ops.op_file_resume <- file_resume;
1629 file_ops.op_file_recover <- file_resume;
1630 file_ops.op_file_pause <- (fun file ->
1631 Hashtbl.iter (fun _ c ->
1632 match c.client_sock with
1633 Connection sock -> close sock Closed_by_user
1634 | _ -> ()
1635 ) file.file_clients;
1636 (*When a file is paused we consider it is stopped*)
1637 file_stop file
1639 file_ops.op_file_queue <- file_ops.op_file_pause;
1640 client_ops.op_client_enter_upload_queue <- (fun c ->
1641 if !verbose_msg_clients then
1642 lprintf_nl "Client %d: client_enter_upload_queue" (client_num c);
1643 ready_for_upload (as_client c));
1644 network.op_network_connected_servers <- (fun _ -> []);