separate udp trackers code to bTUdpTracker.mlp
[mldonkey.git] / src / networks / bittorrent / bTClients.ml
blobdda523363ea5399d0ee9e20c1cb88cc63822b72f
1 (* Copyright 2001, 2002 b52_simon :), b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
(** Functions used in client<->client communication
    and also in client<->tracker communication.
*)

(** A peer (or client) is always a remote peer in this file.
    A Piece is a portion of the file associated with a hash (sha1).
    In mldonkey a piece is referred to as a block inside the swarming system.
    A SubPiece is a portion of a piece (without hash) which can be
    sent/downloaded to/from a peer.
    In mldonkey a SubPiece is referred to as a range inside the swarming system.
    @see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for some
    unofficial (but more detailed) specs.
*)
35 open Int64ops
36 open AnyEndian
37 open BigEndian
38 open Printf2
39 open Md4
40 open Options
41 open BasicSocket
42 open TcpBufferedSocket
43 open Ip_set
45 open CommonShared
46 open CommonUploads
47 open CommonOptions
48 open CommonDownloads
49 open CommonInteractive
50 open CommonClient
51 open CommonComplexOptions
52 open CommonTypes
53 open CommonFile
54 open CommonSwarming
55 open CommonGlobals
56 open CommonDownloads
58 open BTRate
59 open BTTypes
60 open BTProtocol
61 open BTOptions
62 open BTGlobals
63 open BTComplexOptions
64 open BTChooser
65 open BTStats
66 open TcpMessages
68 module VB = VerificationBitmap
(* HTTP status-line literals.
   NOTE(review): neither value is referenced in the visible part of this
   file - confirm they are still used elsewhere. *)
70 let http_ok = "HTTP 200 OK"
71 let http11_ok = "HTTP/1.1 200 OK"
(* Uploader bookkeeping used by recompute_uploaders: next_uploaders receives
   the result of choose_uploaders, current_uploaders is the list clients are
   compared against and is overwritten with next_uploaders at the end of
   each recomputation. *)
74 let next_uploaders = ref ([] : BTTypes.client list)
75 let current_uploaders = ref ([] : BTTypes.client list)
(** Check that a peer is usable and record it.

    The peer is registered with [new_client] only when all of the following
    hold:
    - its [id] differs from our own [!!client_uid];
    - its [ip] is not [Ip.null];
    - its [port] is not 0;
    - the [!Ip.banned] filter returns [None] for [(ip, country code)].

    When the filter rejects the peer and [!verbose_connect] is set, the
    reason is logged. The "Received" trace is printed whenever
    [!verbose_sources > 1], whether or not the peer was accepted.

    @param file the file the peer belongs to
    @param id   the peer id (the udp tracker code passes [Sha1.null])
    @param ip   the peer address
    @param port the peer port *)
let maybe_new_client file id ip port =
  let cc = Geoip.get_country_code_option ip in
  (* Evaluated last, so the "blocked" trace is only printed for peers that
     passed the cheaper checks. *)
  let allowed_by_filter () =
    match !Ip.banned (ip, cc) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_file_nl (as_file file) "%s:%d blocked: %s" (Ip.to_string ip) port reason;
        false
  in
  (* Structural comparison on purpose: [!=] is physical inequality and does
     not reliably detect an address that merely equals [Ip.null]. *)
  if id <> !!client_uid
     && ip <> Ip.null
     && port <> 0
     && allowed_by_filter ()
  then
    ignore (new_client file id (ip,port) cc);
  if !verbose_sources > 1 then
    lprintf_file_nl (as_file file) "Received %s:%d" (Ip.to_string ip) port
(* Forward hook: the udp tracker code calls it as [!resume_clients_hook file]
   after new peers were recorded. The default aborts with [assert false], so
   it has to be assigned before first use - presumably by code later in the
   project; TODO confirm where. *)
95 let resume_clients_hook = ref (fun _ -> assert false)
97 include struct
99 (* open modules locally *)
100 open BTUdpTracker
101 open UdpSocket
(** Textual name of a udp socket event.
    A [CLOSED] event is followed by the printed close reason. *)
let string_of_event event =
  match event with
  | READ_DONE -> "READ_DONE"
  | WRITE_DONE -> "WRITE_DONE"
  | CAN_REFILL -> "CAN_REFILL"
  | BASIC_EVENT (CLOSED why) -> "CLOSED " ^ string_of_reason why
  | BASIC_EVENT RTIMEOUT -> "RTIMEOUT"
  | BASIC_EVENT WTIMEOUT -> "WTIMEOUT"
  | BASIC_EVENT LTIMEOUT -> "LTIMEOUT"
  | BASIC_EVENT CAN_READ -> "CAN_READ"
  | BASIC_EVENT CAN_WRITE -> "CAN_WRITE"
(** Talk to a udp tracker and parse its response.

    Apart from the parsing itself this should do everything that
    talk_to_tracker's inner function does. FIXME refactor both.

    The exchange is driven by socket readers: a connect request is sent, the
    first datagram received is decoded with [connect_response], an announce
    request built from [args] is sent, and the second datagram is decoded
    with [announce_response]. Every returned peer is handed to
    [maybe_new_client] and the socket is closed afterwards.

    @param host tracker host name, resolved with [Unix.gethostbyname]
    @param port tracker port
    @param args announce parameters as an association list; "info_hash",
           "peer_id", "downloaded", "left", "uploaded" and "port" are
           required, "event" and "numwant" are optional
    @param file file being announced
    @param t tracker record whose connection time and intervals are updated
    @param need_sources when true, [t.tracker_last_clients_num] is reset and
           [!resume_clients_hook] is called once the peers were recorded

    Exceptions raised before the readers run (resolution failure, socket
    creation, first write) are logged and not propagated. *)
let interact host port args file t need_sources =
  try
    lprintf_nl "udpt start with %s:%d" host port;
    let addr =
      try (Unix.gethostbyname host).Unix.h_addr_list.(0)
      with _ -> failwith ("failed to resolve " ^ host)
    in
    let ip = Ip.of_inet_addr addr in
    lprintf_nl "udpt resolved to ip %s" (Ip.to_string ip);
    let sock = create Unix.inet_addr_any 0 (fun sock event ->
      (* lprintf_nl "udpt got event %s for %s" (string_of_event event) host *)
      match event with
      | WRITE_DONE | CAN_REFILL -> ()
      | READ_DONE -> assert false (* set_reader prevents this *)
      | BASIC_EVENT x ->
          match x with
          | CLOSED _ -> ()
          | CAN_READ | CAN_WRITE -> assert false (* udpSocket implementation prevents this *)
          | LTIMEOUT | WTIMEOUT | RTIMEOUT -> close sock (Closed_for_error "udpt timeout"))
    in
    let txn = Random.int32 Int32.max_int in
    lprintf_nl "udpt txn %ld for %s" txn host;
    write sock false (connect_request txn) ip port;
    set_reader sock (fun _ ->
      let p = read sock in
      let conn = connect_response p.udp_content txn in
      lprintf_nl "udpt connection_id %Ld for %s" conn host;
      (* a fresh transaction id for the announce; shadows the connect one *)
      let txn = Random.int32 Int32.max_int in
      lprintf_nl "udpt txn' %ld for host %s" txn host;
      let int s = Int64.of_string (List.assoc s args) in
      (* connect_trackers only adds "event" for started/stopped/completed;
         a regular re-announce has no such key and must map to "none" (0)
         instead of failing with Not_found. *)
      let event = try List.assoc "event" args with Not_found -> "" in
      let req = announce_request conn txn
        ~info_hash:(List.assoc "info_hash" args)
        ~peer_id:(List.assoc "peer_id" args)
        (int "downloaded",int "left",int "uploaded")
        (match event with
         | "completed" -> 1l
         | "started" -> 2l
         | "stopped" -> 3l
         | "" -> 0l
         | s -> lprintf_nl "udpt event %s? for %s" s host; 0l)
        ~numwant:(try Int32.of_string (List.assoc "numwant" args) with _ -> -1l)
        (int_of_string (List.assoc "port" args))
      in
      write sock false req ip port;
      set_reader sock (fun _ ->
        let p = read sock in

        (* bookkeeping is done before decoding, with default intervals *)
        t.tracker_last_conn <- last_time ();
        file.file_tracker_connected <- true;
        t.tracker_interval <- 600;
        t.tracker_min_interval <- 600;
        if need_sources then t.tracker_last_clients_num <- 0;

        let (interval,clients) = announce_response p.udp_content txn in
        lprintf_nl "udpt got interval %ld clients %d for host %s" interval (List.length clients) host;
        if interval > 0l then
          begin
            t.tracker_interval <- Int32.to_int interval;
            if t.tracker_min_interval > t.tracker_interval then
              t.tracker_min_interval <- t.tracker_interval
          end;
        List.iter (fun (ip',port) ->
          (* the address arrives as an int32; mask it to an unsigned value *)
          let ip = Ip.of_int64 (Int64.logand 0xFFFFFFFFL (Int64.of_int32 ip')) in
          lprintf_nl "udpt got %s:%d" (Ip.to_string ip) port;
          maybe_new_client file Sha1.null ip port
        ) clients;
        close sock Closed_by_user;
        lprintf_nl "udpt interact done for %s" host;
        if need_sources then !resume_clients_hook file
      ))
  with
  exn ->
    lprintf_nl "udpt interact exn %s" (Printexc2.to_string exn)
188 end (* include *)
(**
   In this function we connect to a tracker.
   @param file The file concerned by the request
   @param url Url of the tracker to connect
   @param event Event (as a string) to send to the tracker:
    can be 'completed' if the file is complete, 'started' for the first
    connection to this tracker or 'stopped' for a clean stop of the file.
    Everything else will be ok for a second connection to the tracker.
    Be careful with the spelling of this event
   @param f The function used to parse the result of the connection.
    The function will get a file as an argument (@see talk_to_tracker
    for an example)

   If we have fewer than !!ask_tracker_threshold sources
   and if we respect the file_tracker_interval then
   we really ask the tracker for sources
*)
(* connect_trackers file event need_sources f
   Announce [file] to each of its enabled trackers.
   - Resets the per-session upload/download counters when [event] is
     "started".
   - Builds the announce argument list (event, no_peer_id/compact, numwant,
     optional key and ip, info_hash, peer_id, port, uploaded, downloaded,
     left). [need_sources = false] adds numwant=0.
   - If no tracker is enabled, re-enables those whose status is
     [Disabled _]; if there is still none and the file is not paused, the
     file is paused.
   - For each enabled tracker the request is sent when the delay conditions
     allow it: `Http urls go through Http_client.wget and [f t fileres] is
     called on completion, `Udp trackers are handled by [interact], `Other
     is asserted unreachable. *)
207 let connect_trackers file event need_sources f =
209 (* reset session statistics when sending 'started' event *)
210 if event = "started" then
211 begin
212 file.file_session_uploaded <- Int64.zero;
213 file.file_session_downloaded <- Int64.zero;
214 end;
(* args: the event pair, if any. must_check_delay: false for the events that
   are sent regardless of the re-ask interval. left: bytes still missing;
   zero without a swarmer and for "completed". *)
216 let args,must_check_delay, left =
218 match file.file_swarmer with
219 None ->
220 begin
221 match event with
222 | "started" -> [("event", "started")],true,zero
223 | "stopped" -> [("event", "stopped")],false,zero
224 | _ -> [],true,zero
227 | Some swarmer ->
228 let local_downloaded = CommonSwarming.downloaded swarmer in
229 let left = file_size file -- local_downloaded in
230 match event with
231 | "completed" -> [("event", "completed")],false,zero
232 | "started" -> [("event", "started")],true,left
233 | "stopped" -> [("event", "stopped")],false,left
234 | _ -> [],true,left
237 let args = ("no_peer_id", "1") :: ("compact", "1") :: args in
(* numwant: 0 when no sources are needed, otherwise the configured value
   unless that is negative, in which case the key is omitted *)
238 let args =
239 if not need_sources then
240 ("numwant", "0") :: args
241 else if !!numwant > -1 then
242 ("numwant", string_of_int !!numwant) :: args
243 else
244 args
246 let args = if !!send_key then
247 ("key", Sha1.to_hexa !!client_uid) :: args else args
249 let args = if !!force_client_ip then
250 ("ip", Ip.to_string !!set_client_ip) :: args else args
252 let args =
253 ("info_hash", Sha1.direct_to_string file.file_id) ::
254 ("peer_id", Sha1.direct_to_string !!client_uid) ::
255 ("port", string_of_int !!client_port) ::
256 ("uploaded", Int64.to_string file.file_session_uploaded) ::
257 ("downloaded", Int64.to_string file.file_session_downloaded) ::
258 ("left", Int64.to_string left) ::
259 args
(* Trackers to contact; falls back to re-enabling disabled ones when the
   list of enabled trackers is empty. *)
262 let enabled_trackers =
263 let enabled_trackers = List.filter (fun t -> tracker_is_enabled t) file.file_trackers in
264 if enabled_trackers <> [] then enabled_trackers
265 else begin
266 (* if there is no tracker left, do something ? *)
267 if !verbose_msg_servers then
268 lprintf_nl "No trackers left for %s, reenabling all of them..." (file_best_name (as_file file));
269 List.iter (fun t ->
270 match t.tracker_status with
271 (* only re-enable after normal error *)
272 | Disabled _ -> t.tracker_status <- Enabled
273 | _ -> ()) file.file_trackers;
274 let enabled_trackers = List.filter (fun t -> tracker_is_enabled t) file.file_trackers in
275 if enabled_trackers = [] && (file_state file) <> FilePaused then
276 begin
277 file_pause (as_file file) (CommonUserDb.admin_user ());
278 lprintf_file_nl (as_file file) "Paused %s, no usable trackers left" (file_best_name (as_file file))
279 end;
280 enabled_trackers
281 end in
283 List.iter (fun t ->
(* A request is sent when the delay need not be checked, when no tracker
   connection was made yet, when the tracker interval has elapsed, or when
   a downloading file has fewer clients than the threshold and the larger
   of tracker_min_interval and min_tracker_reask_interval has elapsed. *)
285 (* if we have too few sources we may ask the tracker before the interval *)
286 if not must_check_delay
287 || not file.file_tracker_connected
288 || t.tracker_last_conn + t.tracker_interval < last_time()
289 || ( file.file_clients_num < !!ask_tracker_threshold
290 && (file_state file) == FileDownloading
291 && (if t.tracker_min_interval > !!min_tracker_reask_interval then
292 t.tracker_last_conn + t.tracker_min_interval < last_time()
293 else
294 t.tracker_last_conn + !!min_tracker_reask_interval < last_time() ))
295 then
296 begin
297 (* if we already tried to connect but failed, disable tracker, but allow re-enabling *)
298 if file.file_tracker_connected && t.tracker_last_clients_num = 0 && t.tracker_last_conn < 1 then
299 begin
300 if !verbose_msg_servers then
301 lprintf_nl "Request error from tracker: disabling %s" (show_tracker_url t.tracker_url);
302 t.tracker_status <- Disabled (intern "MLDonkey: Request error from tracker")
304 (* Send request to tracker *)
305 else
(* per-tracker values are prepended, so they come first in the list *)
306 let args = if String.length t.tracker_id > 0 then
307 ("trackerid", t.tracker_id) :: args else args
309 let args = if String.length t.tracker_key > 0 then
310 ("key", t.tracker_key) :: args else args
312 if !verbose_msg_servers then
313 lprintf_nl "connect_trackers: connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i numwant:%s file: %s"
314 (string_of_bool file.file_tracker_connected)
315 t.tracker_id t.tracker_key t.tracker_last_clients_num
316 (t.tracker_last_conn - last_time()) (try List.assoc "numwant" args with _ -> "_") file.file_name;
318 match t.tracker_url with
319 | `Http url ->
320 let module H = Http_client in
321 let r = {
322 H.basic_request with
323 H.req_url = Url.of_string ~args: args url;
324 H.req_proxy = !CommonOptions.http_proxy;
325 H.req_user_agent = get_user_agent ();
326 (* #4541 [egs] supports redirect *)
327 H.req_max_retry = !!max_tracker_redirect;
328 } in
330 if !verbose_msg_servers then
331 lprintf_nl "Request sent to tracker %s for file: %s"
332 url file.file_name;
(* the callback records the connection time before handing the reply to f *)
333 H.wget r
334 (fun fileres ->
335 t.tracker_last_conn <- last_time ();
336 file.file_tracker_connected <- true;
337 f t fileres)
338 | `Other url -> assert false (* should have been disabled *)
339 | `Udp (host,port) -> interact host port args file t need_sources
342 else
343 if !verbose_msg_servers then
344 lprintf_nl "Request NOT sent to tracker %s - next request in %ds for file: %s"
345 (show_tracker_url t.tracker_url) (t.tracker_interval - (last_time () - t.tracker_last_conn)) file.file_name
346 ) enabled_trackers
(** Give client [c] a normal upload slot: register the upload for its file,
    refresh both of its rate meters without adding traffic, stamp the time
    of the last optimistic unchoke, queue it for upload and send Unchoke. *)
let start_upload c =
  let peer = as_client c in
  set_client_upload peer (as_file c.client_file);
  set_client_has_a_slot peer NormalSlot;
  List.iter Rate.update_no_change
    [ c.client_downloaded_rate; c.client_upload_rate ];
  c.client_last_optimist <- last_time();
  client_enter_upload_queue peer;
  send_client c Unchoke
(** Decide which peers will be uploaders.
    Current uploaders that are absent from the new selection lose their
    slot (they are left to finish and get choked on their next request).
    Peers of the new selection that were not uploaders before are started
    with [start_upload], which sends them Unchoke.
*)
let recompute_uploaders () =
  if !verbose_upload then lprintf_nl "recompute_uploaders";
  next_uploaders := choose_uploaders current_files;
  (* take the slot away from a current uploader that was not chosen again;
     we will let him finish his download and choke him on next_request *)
  let drop_if_unselected c =
    if not (List.mem c !next_uploaders) then
      set_client_has_a_slot (as_client c) NoSlot
  in
  (* don't start an upload for a peer that already is an uploader *)
  let start_if_new c =
    if not (List.mem c !current_uploaders) then start_upload c
  in
  List.iter drop_if_unselected !current_uploaders;
  List.iter start_if_new !next_uploaders;
  current_uploaders := !next_uploaders
380 (****** Fabrice: why are clients which are disconnected removed ???
381 These clients might still be useful to reconnect to, no ? *)
384 (** This function is called when a client is disconnected
385 (be it by our side or its side).
386 A client which disconnects (even only one time) is discarded.
387 If it's an uploader which disconnects we recompute uploaders
388 (see recompute_uploaders) immediately.
389 @param c The client to disconnect
390 @param reason The reason for the disconnection (see in BasicSocket.ml)
(* disconnect_client c reason
   Logs the disconnection (if verbose_msg_clients), then acts on
   c.client_sock:
   - NoConnection: nothing else is done;
   - ConnectionWaiting: the pending token is cancelled and client_sock is
     reset to NoConnection;
   - Connection: the socket is closed, the client is marked disconnected,
     its per-session counters are zeroed, its uploader is unregistered from
     the swarmer if a bitfield was registered, the client is removed, and
     the uploaders are recomputed if it was one of the current uploaders.
   NOTE(review): the trailing `with _ -> ()` silently discards any exception
   raised during this cleanup. *)
392 let disconnect_client c reason =
393 if !verbose_msg_clients then
394 lprintf_nl "Client %d: disconnected: %s" (client_num c) (string_of_reason reason);
395 begin
396 match c.client_sock with
397 NoConnection -> ()
398 | ConnectionWaiting token ->
399 cancel_token token;
400 c.client_sock <- NoConnection
401 | Connection sock ->
402 close sock reason;
404 (* List.iter (fun r -> CommonSwarming.free_range r) c.client_ranges; *)
405 set_client_disconnected c reason;
406 c.client_session_downloaded <- 0L;
407 c.client_session_uploaded <- 0L;
(* count_seen is only called for clients flagged as good; failures are
   ignored *)
408 (try if c.client_good then count_seen c with _ -> ());
409 (* this is not useful already done in the match
410 (try close sock reason with _ -> ()); *)
411 (*---------not needed ?? VvvvvV---------------
412 c.client_ranges <- [];
413 c.client_block <- None;
414 if not c.client_good then
415 connection_failed c.client_connection_control;
416 c.client_good <- false;
417 c.client_sock <- NoConnection;
418 c.client_chunks <- [];
419 c.client_allowed_to_write <- zero;
420 c.client_new_chunks <- [];
421 c.client_interesting <- false;
422 c.client_alrd_sent_interested <- false;
423 -------------------^^^^^--------------------*)
424 if (c.client_registered_bitfield) then
425 begin
426 match c.client_uploader with
427 None -> ()
428 | Some up ->
429 c.client_uploader <- None;
430 (* If the client registered a bitfield then
431 we must unregister him to update the swarmer
432 (Useful for availability)
434 CommonSwarming.unregister_uploader up
435 (* c.client_registered_bitfield <- false;
436 for i = 0 to String.length c.client_bitmap - 1 do
437 c.client_bitmap.[0] <- '0';
438 done*)
439 end;
440 (* Don't test if a client have an upload slot because
441 it don't have one (removed during earlier in
442 set_client_disconnected c reason)
444 if (List.mem c !current_uploaders) then
445 begin
446 (*BTGlobals.remove_client*)
447 remove_client c;
448 recompute_uploaders ();
450 else
451 remove_client c;
452 with _ -> ()
(** Disconnect the clients of a file.

    When the file is paused or cancelled every client is disconnected.
    Otherwise clients that hold an upload slot or are interested in our data
    are kept and only the remaining ones are disconnected.

    @param file The file whose clients must be disconnected
*)
let disconnect_clients file =
  let must_keep =
    match file_state file with
    | FilePaused | FileCancelled -> false
    | _ -> true
  in
  (* Work on a snapshot of the table: [disconnect_client] ends in
     [remove_client], which presumably removes the entry from
     [file.file_clients] (TODO confirm in BTGlobals), and the behaviour of
     Hashtbl.iter is unspecified when the table is modified while it is
     being traversed. The fold conses in traversal order, hence the
     List.rev to keep the original visiting order. *)
  let clients =
    List.rev (Hashtbl.fold (fun _ c acc -> c :: acc) file.file_clients [])
  in
  List.iter (fun c ->
    (* evaluated per client at disconnect time, as before: an earlier
       disconnection may have recomputed the uploaders in the meantime *)
    if not (must_keep && (client_has_a_slot (as_client c) || c.client_interested)) then
      begin
        if !verbose_msg_clients then
          lprintf_file_nl (as_file file) "disconnect since download is finished";
        disconnect_client c Closed_by_user
      end
  ) clients
(** What to do when a file is finished: nothing unless the file is still in
    [!current_files]; otherwise announce "completed" to the trackers, mark
    the file completed and drop its swarmer.
    @param file the finished file
*)
let download_finished file =
  let is_current = List.memq file !current_files in
  if is_current then begin
    (* must be called before swarmer gets removed from file *)
    let ignore_reply _ _ = () in
    connect_trackers file "completed" false ignore_reply;
    (* CommonComplexOptions.file_completed *)
    file_completed (as_file file);
    (* Remove the swarmer for this file as it is not useful anymore... *)
    CommonSwarming.remove_swarmer file.file_swarmer;
    file.file_swarmer <- None
    (* At this point, the file state is FileDownloaded. We should not remove
       the file, because we continue to upload. *)
  end
(** Check if a file is finished or not and, if it is, run
    [download_finished] on it. Whether it is finished is decided by
    [CommonSwarming.check_finished] on the swarmer.
    @param swarmer The swarmer of the file
    @param file The file to check status
*)
let check_finished swarmer file =
  let all_done = CommonSwarming.check_finished swarmer in
  if all_done then download_finished file
(* Masks for the bits of one byte, most significant bit first:
   index 0 selects 0x80, index 7 selects 0x01. *)
let bits = [| 128; 64; 32;16;8;4;2;1 |]

(* Check/set bits in strings (bittorrent format) *)

(* [is_bit_set s n] tells whether bit [n] of [s] is set, bit 0 being the
   most significant bit of the first byte. *)
let is_bit_set s n =
  let byte = Char.code s.[n lsr 3] in
  let mask = bits.(n land 7) in
  byte land mask <> 0
(* [set_bit s n] sets bit [n] of [s] in place, bit 0 being the most
   significant bit of the first byte. Other bits are left untouched. *)
let set_bit s n =
  let idx = n lsr 3 in
  let previous = Char.code s.[idx] in
  let mask = bits.(n land 7) in
  s.[idx] <- Char.unsafe_chr (previous lor mask)
511 (* The official client seems to use max_range_request 5 and max_range_len 2^14 *)
512 (* How many requests may be pending in the 'pipeline' *)
513 let max_range_requests = 5
514 (* How many bytes we can request in one Piece *)
(** A wrapper to send the Interested message to a client.
    The message is sent only when the client is interesting and it was not
    sent already; the "already sent" flag is set before sending.
    @param c The client to send Interested
*)
let send_interested c =
  let already_sent = c.client_alrd_sent_interested in
  if c.client_interesting && not already_sent then begin
    c.client_alrd_sent_interested <- true;
    send_client c Interested
  end
(** Send a Bitfield message to a client.
    Without a swarmer every chunk of the file is advertised; with a swarmer
    only the chunks whose state is [VB.State_verified] are.
    @param c The client to send the Bitfield message
*)
let send_bitfield c =
  (* zero-filled string with room for [nbits] bits *)
  let blank_field nbits = String.make ((nbits + 7) / 8) '\000' in
  let payload =
    match c.client_file.file_swarmer with
    | None ->
        (* This must be a seeded file... *)
        if !verbose_download then
          lprintf_nl "Sending completed verified bitmap";
        let nchunks = Array.length c.client_file.file_chunks in
        let field = blank_field nchunks in
        for i = 0 to nchunks - 1 do
          set_bit field i
        done;
        field
    | Some swarmer ->
        let verified = CommonSwarming.chunks_verified_bitmap swarmer in
        if !verbose_download then
          lprintf_nl "Sending verified bitmap: [%s]" (VB.to_string verified);
        let field = blank_field (VB.length verified) in
        VB.iteri (fun i state ->
          if state = VB.State_verified then set_bit field i) verified;
        field
  in
  send_client c (BitField payload)
(* Global mutable counter, initially 0. It is not modified in the visible
   part of this file; note that client_parse_header takes a parameter that
   is also named counter and shadows this one inside that function. *)
559 let counter = ref 0
(* [parse_reserved rbits c] decodes the reserved bits received with a
   client's first message and stores the advertised extensions in [c]:
   byte 7 carries dht (0x01), cache extension (0x02) and fast extension
   (0x04), byte 5 the utorrent extension (0x10) and byte 0 the azureus
   messaging protocol (0x80). *)
let parse_reserved rbits c =
  let byte_at pos = Char.code rbits.[pos] in
  let is_on byte mask = byte land mask <> 0 in
  let last = byte_at 7 in
  let sixth = byte_at 5 in
  let first = byte_at 0 in

  c.client_dht <- is_on last 0x01;
  c.client_cache_extension <- is_on last 0x02;
  c.client_fast_extension <- is_on last 0x04;

  c.client_utorrent_extension <- is_on sixth 0x10;

  c.client_azureus_messaging_protocol <- is_on first 0x80
573 (** This function is called to parse the first message that
574 a client send.
575 @param counter client num
576 @param cc Expected client (probably useless now that we don't save any client)
577 @param init_sent A boolean to know if we sent this client the handshake message
578 @param gconn Don't know
579 @param sock The socket we use for this client
580 @param proto Unused (required by tuple type?)
581 @param file_id The file hash (sha1) of the file involved in this exchange
583 (* removed: @param peer_id The hash (sha1) of the client. (Should be checked)
(* client_parse_header counter cc init_sent gconn sock (proto, rbits, file_id)
   Handles the first message of a peer connection:
   - looks the file up in files_by_uid by file_id; when it is unknown the
     Not_found handler only logs (if verbose_unexpected_messages);
   - takes the client from !cc, or creates one with new_client for an
     incoming connection and stores it back into cc;
   - decodes rbits with parse_reserved and attaches sock to the client,
     disconnecting it first when it was connected through another socket;
   - sends our own init message when init_sent is false, then the bitfield;
   - installs client_to_client as the message handler on gconn and parses
     up to two more messages that are already in the receive buffer.
   Any other exception is logged, the socket is closed and the exception is
   raised again. *)
585 let rec client_parse_header counter cc init_sent gconn sock
586 (proto, rbits, file_id) =
588 set_lifetime sock 600.;
589 if !verbose_msg_clients then
590 lprintf_nl "client_parse_header %d" counter;
592 let file = Hashtbl.find files_by_uid file_id in
593 if !verbose_msg_clients then
594 lprintf_file_nl (as_file file) "file found";
(* cc holds an optional expected client together with a country code *)
595 let ccc, cc_country_code = !cc in
596 let c =
597 match ccc with
598 None ->
599 let c = new_client file Sha1.null (TcpBufferedSocket.peer_addr sock) cc_country_code in
600 if !verbose_connect then lprintf_file_nl (as_file file) "Client %d: incoming connection" (client_num c);
601 cc := (Some c), cc_country_code;
603 | Some c ->
604 (* Does it happen that this c was already used to connect successfully?
605 If yes then this must happen: *)
606 c.client_received_peer_id <- false;
607 if cc_country_code <> None && c.client_country_code = None then
608 c.client_country_code <- cc_country_code;
610 (* client could have had Sha1.null as peer_id/uid *)
611 (* this is to be done, later
612 if c.client_uid <> peer_id then
613 c.client_software <- (parse_software (Sha1.direct_to_string peer_id));
617 (* if c.client_uid <> peer_id then begin
618 lprintf "Unexpected client by UID\n";
619 let ccc = new_client file peer_id (TcpBufferedSocket.host sock) in
620 lprintf "CLIENT %d: testing instead of %d\n"
621 (client_num ccc) (client_num c);
622 (match ccc.client_sock with
623 Connection _ ->
624 lprintf_nl "[BT]: This client is already connected";
625 close sock (Closed_for_error "Already connected");
626 remove_client ccc;
628 | _ ->
629 lprintf_nl "[BT]: Client %d: recovered by UID" (client_num ccc);
630 remove_client c;
631 cc := Some ccc;
632 ccc)
633 end else
634 c *)
637 if !verbose_msg_clients then begin
638 let (ip,port) = c.client_host in
639 lprintf_nl "Client %d: Connected from %s:%d" (client_num c)
640 (Ip.to_string ip) port;
641 end;
643 parse_reserved rbits c;
645 (match c.client_sock with
646 NoConnection ->
647 if !verbose_msg_clients then begin
648 let (ip,port) = c.client_host in
649 lprintf_nl "No connection to client (%s:%d)!!!" (Ip.to_string ip) port;
650 end;
651 c.client_sock <- Connection sock
652 | ConnectionWaiting token ->
653 cancel_token token;
654 if !verbose_msg_clients then
655 lprintf_nl "Waiting for connection to client !!!";
656 c.client_sock <- Connection sock
657 | Connection s when s != sock ->
658 if !verbose_msg_clients then
659 lprintf_nl "CLIENT %d: IMMEDIATE RECONNECTION" (client_num c);
660 disconnect_client c (Closed_for_error "Reconnected");
661 c.client_sock <- Connection sock;
662 | Connection _ -> ()
665 set_client_state (c) (Connected (-1));
666 if not init_sent then
667 begin
668 c.client_incoming <- true;
669 send_init !!client_uid file_id sock;
670 end;
671 connection_ok c.client_connection_control;
672 if !verbose_msg_clients then
673 lprintf_nl "file and client found";
674 (* if not c.client_incoming then *)
675 send_bitfield c;
676 c.client_blocks_sent <- file.file_blocks_downloaded;
678 TODO !!! : send interested if and only if we are interested
679 -> we must recieve at least other peer bitfield.
680 in common swarmer -> compare : partition -> partition -> bool
683 set_rtimeout sock !!client_timeout;
684 (* Once parsed succesfully we define the function client_to_client
685 to be the function used when a message is read *)
686 gconn.gconn_handler <- Reader (fun gconn sock ->
687 bt_handler TcpMessages.parsing (client_to_client c) c sock
690 let b = TcpBufferedSocket.buf sock in
691 (* The receive buffer is normally not empty now, lets parse the rest, most likely PeerID *)
692 if b.len <> 0 then
693 ignore (bt_handler TcpMessages.parsing (client_to_client c) c sock);
695 (* Some newer clients send more opcodes in their handshake packet, lets parse them now.
696 Using "while b.len <> 0 do ... done" is not possible here because libtorrent clients
697 send unparsable five extra bytes after their PeerID which would result into a loop *)
698 if b.len <> 0 then
699 ignore (bt_handler TcpMessages.parsing (client_to_client c) c sock);
701 with
702 | Not_found ->
703 let (ip,port) = (TcpBufferedSocket.peer_addr sock) in
704 if !verbose_unexpected_messages then
705 lprintf_nl "Client %s:%d requested a file that is not shared [%s]"
706 (Ip.to_string ip) port (Sha1.to_hexa file_id)
707 | e ->
708 lprintf_nl "Exception %s in client_parse_header" (Printexc2.to_string e);
709 close sock (Closed_for_exception e);
710 raise e
713 (** Update the bitmap of a client. Unclear if it is still useful.
714 @param c The client which we want to update.
(* update_client_bitmap c
   Makes sure c has a registered uploader (CommonSwarming.register_uploader
   with an empty interval list) and a bitmap (a Bitv of partition_size bits,
   all false). If chunks are queued in c.client_new_chunks they are marked
   in the bitmap, the queue is cleared and the bitmap is pushed to the
   swarmer with update_uploader_intervals.
   Asserts that the file still has a swarmer. *)
716 and update_client_bitmap c =
717 let file = c.client_file in
719 let swarmer = match file.file_swarmer with
720 None -> assert false
721 | Some swarmer -> swarmer
724 let up =
725 match c.client_uploader with
726 None ->
727 let up = CommonSwarming.register_uploader swarmer (as_client c)
728 (AvailableIntervals []) in
729 c.client_uploader <- Some up;
731 | Some up ->
735 let bitmap = match c.client_bitmap with
736 None ->
737 let len = CommonSwarming.partition_size swarmer in
738 let bitmap = Bitv.create len false in
739 c.client_bitmap <- Some bitmap;
740 bitmap
741 | Some bitmap -> bitmap
(* the queue is emptied before the bitmap is updated from its old content *)
744 if c.client_new_chunks <> [] then begin
745 let chunks = c.client_new_chunks in
746 c.client_new_chunks <- [];
747 List.iter (fun n -> Bitv.set bitmap n true) chunks;
748 CommonSwarming.update_uploader_intervals up (AvailableBitv bitmap);
752 (** In this function we decide which piece we must request from client.
753 @param sock Socket of the client
754 @param c The client
(* get_from_client sock c
   Sends one Request to c when fewer than max_range_requests requests are
   pending, the file is downloading and the client has not choked us.
   The inner iter first needs a chunk: when c.client_chunk is None it
   refreshes the client bitmap, asks CommonSwarming.find_blocks and stores
   the result. It then takes the range parked in c.client_range_waiting or
   asks find_range; a range longer than max_range_len is split and the
   remainder parked in client_range_waiting. The chosen range is appended
   to c.client_ranges_sent.
   Per the comments below: when no range is found in the current chunk,
   client_chunk is reset and iter retries; when no block is found at all,
   the bitmap is recomputed, check_finished is run and Not_found reaches the
   last handler, which only logs when the file is not finished. *)
756 and get_from_client sock (c: client) =
757 let file = c.client_file in
758 (* Check if there's not enough requests in the 'pipeline'
759 and if a request can be send (not choked and file is downloading) *)
760 if List.length c.client_ranges_sent < max_range_requests
761 && file_state file = FileDownloading
762 && (c.client_choked == false)
763 then
764 (* num is the number of the piece, x and y are the position
765 of the subpiece in the piece(!), r is a (CommonSwarmer) range *)
767 let up = match c.client_uploader with
768 None -> assert false
769 | Some up -> up in
770 let swarmer = CommonSwarming.uploader_swarmer up in
774 let num, x,y, r =
776 if !verbose_msg_clients then
777 lprintf_file_nl (as_file file) "CLIENT %d: Finding new range to send" (client_num c);
779 if !verbose_swarming then begin
780 lprintf_n "Current download:\n Current chunks: ";
783 List.iter (fun (x,y) -> lprintf "%Ld-%Ld " x y) c.client_chunks
784 with _ -> lprintf "No Chunks";
786 lprint_newline ();
788 lprintf_n "Current ranges: ";
790 List.iter (fun (p1,p2, r) ->
791 let (x,y) = CommonSwarming.range_range r in
792 lprintf "%Ld-%Ld[%Ld-%Ld] " p1 p2 x y
793 ) c.client_ranges_sent;
795 match c.client_range_waiting with
796 | None -> ()
797 | Some (x,y,r) -> lprintf "Waiting %Ld-%Ld" x y;
799 lprint_newline ();
801 lprintf_n "Current blocks: ";
803 match c.client_chunk with
804 | None -> lprintf "none"
805 | Some (chunk, blocks) -> List.iter (fun b ->
806 CommonSwarming.print_block b.up_block) blocks;
808 lprint_newline ();
810 lprintf_file_nl (as_file file) "Finding Range:";
811 end;
815 (*We must find a block to request first, and then
816 some range inside this block
819 let rec iter () =
821 match c.client_chunk with
823 | None ->
825 if !verbose_swarming then lprintf_file_nl (as_file file) "No block";
826 update_client_bitmap c;
827 (try CommonSwarming.verify_one_chunk swarmer with _ -> ());
828 (*Find a free block in the swarmer*)
829 let chunk, blocks = CommonSwarming.find_blocks up in
830 if !verbose_swarming then begin
831 lprintf_n "Blocks Found: "; List.iter (fun b ->
832 CommonSwarming.print_block b.up_block) blocks;
833 lprint_newline ()
834 end;
835 c.client_chunk <- Some (chunk, blocks);
837 (*We put the found block in client_block to
838 request range in this block. (Useful for
839 not searching each time a new block)
842 iter ()
844 | Some (chunk, blocks) ->
846 if !verbose_swarming then begin
847 lprintf_n "Current Blocks: "; List.iter (fun b ->
848 CommonSwarming.print_block b.up_block) blocks;
849 lprint_newline ()
850 end;
853 (*Given a block find a range inside*)
854 let (x,y,r) =
855 match c.client_range_waiting with
856 | Some (x,y,r) ->
857 c.client_range_waiting <- None;
858 (x,y,r)
859 | None ->
860 CommonSwarming.find_range up (min max_range_len file.file_piece_size)
863 let (x,y,r) =
865 if y -- x > max_range_len then begin
866 c.client_range_waiting <- Some (x ++ max_range_len, y, r);
867 (x, x ++ max_range_len, r)
868 end else
869 (x,y,r)
872 c.client_ranges_sent <- c.client_ranges_sent @ [x,y, r];
873 (* CommonSwarming.alloc_range r; *)
875 (* naughty, naughty, was computing a block number instead of a chunk
876 number. Only matters with merged downloads, and even then other
877 clients didn't seem to care (?), so the bug remained hidden *)
878 if !verbose_swarming then
879 lprintf_file_nl (as_file file) "Asking %d For Range %Ld-%Ld" chunk x y;
(* result: chunk number, offset of the range inside that chunk, length of
   the range, and the range itself *)
881 chunk, x -- file.file_piece_size ** Int64.of_int chunk, y -- x, r
883 with Not_found ->
885 (*If we don't find a range to request inside the block,
886 iter to choose another block*)
887 if !verbose_swarming then
888 lprintf_nl "Could not find range in current block";
889 (* c.client_blocks <- List2.removeq b c.client_blocks; *)
891 c.client_chunk <- None;
893 iter ()
896 iter ()
898 with Not_found ->
899 (*If we don't find a block to request we can check if the
900 file is finished (if there's missing pieces we can't decide
901 that the file is finished because we didn't found
902 a block to ask)
904 if !verbose_swarming then
905 lprintf_nl "Unable to get a block !!";
906 CommonSwarming.compute_bitmap swarmer;
907 check_finished swarmer file;
908 raise Not_found
911 send_client c (Request (num,x,y));
913 if !verbose_msg_clients then
914 lprintf_file_nl (as_file file) "CLIENT %d: Asking %s For Range %Ld-%Ld"
915 (client_num c) (Sha1.to_string c.client_uid) x y
917 with Not_found ->
918 if not (CommonSwarming.check_finished swarmer) && !verbose_download then
919 lprintf_file_nl (as_file file) "BTClient.get_from_client ERROR: can't find a block to download and file is not yet finished for file : %s..." file.file_name
(** React to one message received from a remote peer.
    Any exception raised while the message is handled (including the [Exit]
    raised for an oversized request) is caught at the end of the function
    and logged together with the message.
    @param c The client which sent us the message
    @param sock The socket used for this client
    @param msg The message sent by the client
*)
and client_to_client c sock msg =
  if !verbose_msg_clients then begin
    let (peer_ip, peer_port) = TcpBufferedSocket.peer_addr sock in
    let (rtimeout, next_rtimeout) = get_rtimeout sock in
    lprintf_nl "CLIENT %d(%s:%d): (%d, %d,%d) Received %s"
      (client_num c) (Ip.to_string peer_ip) peer_port
      (last_time ())
      (int_of_float rtimeout)
      (int_of_float next_rtimeout)
      (TcpMessages.to_string msg)
  end;

  let file = c.client_file in

  (* Sending the Have messages was moved to bTGlobals, so nothing has to be
     announced from here. *)

  (* Verbose-only tracing: hand the head of the list of sent ranges to
     [describe], or report that the list is empty. *)
  let trace_first_range describe =
    if !verbose_msg_clients then
      match c.client_ranges_sent with
      | [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
      | (p1, p2, r) :: _ ->
          let (x, y) = CommonSwarming.range_range r in
          describe p1 p2 x y
  in

  try
    match msg with
    | Piece (num, offset, s, pos, len) ->
        (* A Piece message contains the data *)
        set_client_state c (Connected_downloading (file_num file));
        (* flag it as a good client *)
        c.client_good <- true;
        if file_state file = FileDownloading then begin
          let position = offset ++ file.file_piece_size *.. num in
          let up =
            match c.client_uploader with
            | Some up -> up
            | None -> assert false
          in
          let swarmer = CommonSwarming.uploader_swarmer up in

          trace_first_range (fun p1 p2 x y ->
            lprintf_file_nl (as_file file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])"
              (brand_to_string c.client_brand) position len
              p1 p2 x y);

          (* measure what the swarmer really accepted from this data *)
          let before = CommonSwarming.downloaded swarmer in
          CommonSwarming.received up position s pos len;
          let after = CommonSwarming.downloaded swarmer in
          let gained = after -- before in

          (* update rate and amount of data received from client *)
          count_download c gained;
          (* use len here with max_dr quickfix *)
          Rate.update c.client_downloaded_rate ~amount:len;
          (* count bytes downloaded from network for this file *)
          file.file_session_downloaded <- file.file_session_downloaded ++ (Int64.of_int len);

          trace_first_range (fun p1 p2 x y ->
            lprintf_file_nl (as_file file) "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
              position len
              p1 p2 x y
              gained)
        end;
        (* drop the head of the list of sent ranges *)
        (match c.client_ranges_sent with
         | [] -> ()
         | _ :: remaining -> c.client_ranges_sent <- remaining);
        get_from_client sock c;
        (* Check if the client is still interesting for us... *)
        check_if_interesting file c

    | PeerID p ->
        c.client_uid <- Sha1.direct_of_string p;
        if c.client_uid = !!client_uid then
          (* Disconnect if that is ourselves. *)
          disconnect_client c Closed_by_user
        else begin
          let (brand, release) = parse_software p in
          c.client_brand <- brand;
          c.client_release <- release;
          send_client c Choke;
          c.client_sent_choke <- true
        end

    | BitField p ->
        (* A bitfield is a summary of what a client has.
           Note: a bitfield must only be sent after the handshake and before
           everything else: NOT here *)
        (match c.client_file.file_swarmer with
         | None -> ()
         | Some swarmer ->
             c.client_new_chunks <- [];
             let npieces = CommonSwarming.partition_size swarmer in
             let nbits = String.length p * 8 in
             if nbits >= npieces then begin
               let verified = CommonSwarming.chunks_verified_bitmap swarmer in
               for i = 0 to npieces - 1 do
                 if is_bit_set p i then begin
                   c.client_new_chunks <- i :: c.client_new_chunks;
                   (* the peer has a chunk we still miss *)
                   match VB.get verified i with
                   | VB.State_missing | VB.State_partial ->
                       c.client_interesting <- true
                   | VB.State_complete | VB.State_verified -> ()
                 end
               done;

               update_client_bitmap c;
               c.client_registered_bitfield <- true;

               if c.client_interesting then
                 send_interested c;

               if !verbose_msg_clients then
                 lprintf_file_nl (as_file file) "New BitField Registered"
             end else begin
               lprintf_file_nl (as_file file) "Error: expected bitfield of atleast %d but got %d" npieces nbits;
               disconnect_client c (Closed_for_error "Wrong bitfield length")
             end)

    | Have n ->
        (* A client can send a Have without sending a Bitfield *)
        (match c.client_file.file_swarmer with
         | None -> ()
         | Some swarmer ->
             let piece = Int64.to_int n in
             let verified = CommonSwarming.chunks_verified_bitmap swarmer in
             (* if the peer has a chunk we don't, tell him we're interested
                and update his bitmap *)
             (match VB.get verified piece with
              | VB.State_missing | VB.State_partial ->
                  c.client_interesting <- true;
                  send_interested c;
                  c.client_new_chunks <- piece :: c.client_new_chunks;
                  update_client_bitmap c
              | VB.State_complete | VB.State_verified -> ()))

    | Interested ->
        c.client_interested <- true

    | Choke ->
        set_client_state (c) (Connected (-1));
        (* remote peer will clear the list of range we sent *)
        (match c.client_uploader with
         | Some up ->
             CommonSwarming.clear_uploader_intervals up
         | None ->
             (* Afaik this is no protocol violation and happens if the client
                didn't send a client bitmap after the handshake. *)
             let (ip, port) = c.client_host in
             if !verbose_msg_clients then
               lprintf_file_nl (as_file file) "%s:%d with software %s : Choke send, but no client bitmap"
                 (Ip.to_string ip) port (brand_to_string c.client_brand));
        c.client_ranges_sent <- [];
        c.client_range_waiting <- None;
        c.client_choked <- true

    | NotInterested ->
        c.client_interested <- false

    | Unchoke ->
        c.client_choked <- false;
        (* remote peer cleared our request : re-request *)
        for _i = 1 to max_range_requests - List.length c.client_ranges_sent do
          (try get_from_client sock c with _ -> ())
        done

    | Request (n, pos, len) ->
        if len > max_request_len then begin
          close sock (Closed_for_error "Request longer than 1<<16");
          raise Exit
        end;

        if !CommonGlobals.has_upload = 0 then begin
          if client_has_a_slot (as_client c) then begin
            (* the request queue was empty: signal ready_for_upload *)
            (match c.client_upload_requests with
             | [] -> CommonUploads.ready_for_upload (as_client c)
             | _ :: _ -> ());
            c.client_upload_requests <- c.client_upload_requests @ [n,pos,len];
            match c.client_file.file_shared with
            | None -> ()
            | Some s ->
                s.impl_shared_requests <- s.impl_shared_requests + 1;
                shared_must_update (as_shared s)
          end else begin
            send_client c Choke;
            c.client_sent_choke <- true;
            c.client_upload_requests <- []
          end
        end

    | Ping -> ()
        (* We don't 'generate' a Ping message on a Ping. *)

    | Cancel (n, pos, len) ->
        (* if we receive a cancel message from a peer, remove request *)
        if client_has_a_slot (as_client c) then
          c.client_upload_requests <- List2.remove_first (n, pos, len) c.client_upload_requests
        else if !verbose_msg_clients then
          lprintf_file_nl (as_file file) "Error: received cancel request but client has no slot"

  with e ->
    lprintf_file_nl (as_file file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e) (TcpMessages.to_string msg)
(** The function used to connect to a client.
    The connection is not immediately initiated. It will
    be put in a fifo and dequeued according to
    !!max_connections_per_second. (@see commonGlobals.ml)
    Nothing is done when no connection may be opened, when the address of
    the client is banned, or when the client is not in the NoConnection state.
    @param c The client we must connect
*)
let connect_client c =
  (* true unless the address of the client is banned (logged when verbose) *)
  let not_banned () =
    let (ip, port) = c.client_host in
    match !Ip.banned (ip, c.client_country_code) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_nl "%s:%d (%s), blocked: %s"
            (Ip.to_string ip) port
            (fst (Geoip.get_country_code_name c.client_country_code))
            reason;
        false
  in
  (* handler of the basic events of the outgoing socket *)
  let on_event sock event =
    match event with
    | BASIC_EVENT LTIMEOUT ->
        if !verbose_msg_clients then
          lprintf_nl "CLIENT %d: LIFETIME" (client_num c);
        close sock Closed_for_timeout
    | BASIC_EVENT RTIMEOUT ->
        if !verbose_msg_clients then
          lprintf_nl "CLIENT %d: RTIMEOUT (%d)" (client_num c)
            (last_time ());
        close sock Closed_for_timeout
    | BASIC_EVENT (CLOSED r) ->
        (* only react if this socket is still the one attached to the client *)
        (match c.client_sock with
         | Connection s when s == sock -> disconnect_client c r
         | _ -> ())
    | _ -> ()
  in
  (* run when the pending connection is dequeued *)
  let start_connection token =
    try
      if !verbose_msg_clients then
        lprintf_nl "CLIENT %d: connect_client" (client_num c);
      let (ip, port) = c.client_host in
      if !verbose_msg_clients then
        lprintf_nl "connecting %s:%d" (Ip.to_string ip) port;
      connection_try c.client_connection_control;
      let sock =
        connect token "bittorrent download" (Ip.to_inet_addr ip) port on_event
      in
      c.client_sock <- Connection sock;
      set_lifetime sock 600.;
      TcpBufferedSocket.set_read_controler sock download_control;
      TcpBufferedSocket.set_write_controler sock upload_control;
      TcpBufferedSocket.set_rtimeout sock 30.;
      let file = c.client_file in

      if !verbose_msg_clients then
        lprintf_file_nl (as_file file) "READY TO DOWNLOAD FILE";

      send_init !!client_uid file.file_id sock;
      (* Fabrice: Initialize the client bitmap and uploader fields to <> None *)
      update_client_bitmap c;
      incr counter;
      (* We 'hook' the client_parse_header function to the socket.
         This function will then be called when the first message will
         be parsed *)
      set_bt_sock sock !verbose_msg_clients
        (BTHeader (client_parse_header !counter (ref ((Some c), c.client_country_code)) true))
    with e ->
      lprintf_nl "Exception %s while connecting to client"
        (Printexc2.to_string e);
      disconnect_client c (Closed_for_exception e)
  in
  if can_open_connection connection_manager && not_banned () then
    match c.client_sock with
    | NoConnection ->
        let token =
          add_pending_connection connection_manager start_connection
        in
        (* Since this is a pending connection put ConnectionWaiting
           in client_sock *)
        c.client_sock <- ConnectionWaiting token
    | _ -> ()
(** The Listen function (very much like in C : TCP Socket Server).
    Monitors client connection to us: creates the server socket on
    !!client_bind_addr / !!client_port and stores it in listen_sock.
    An exception raised while setting up is only logged (when
    verbose_connect is set).
*)
let listen () =
  (* events of an accepted socket: monitor read and life timeouts *)
  let on_peer_event sock event =
    match event with
    | BASIC_EVENT (RTIMEOUT | LTIMEOUT) -> close sock Closed_for_timeout
    | _ -> ()
  in
  (* A TcpServerSocket.CONNECTION event means that a new client tries
     to connect to us: [s] is the accepted descriptor *)
  let accept s from_ip from_port =
    let ip = Ip.of_inet_addr from_ip in
    let cc = Geoip.get_country_code_option ip in
    if !verbose_sources > 1 then
      lprintf_nl "CONNECTION RECEIVED FROM %s"
        (Ip.to_string (Ip.of_inet_addr from_ip));
    let not_banned () =
      match !Ip.banned (ip, cc) with
      | None -> true
      | Some reason ->
          if !verbose_connect then
            lprintf_nl "%s:%d (%s) blocked: %s"
              (Ip.to_string ip) from_port
              (fst (Geoip.get_country_code_name cc))
              reason;
          false
    in
    (* Reject this connection if we don't want
       to bypass the max_connection parameter *)
    if can_open_connection connection_manager && not_banned () then begin
      let token = create_token connection_manager in
      let sock =
        TcpBufferedSocket.create token
          "bittorrent client connection" s on_peer_event
      in
      TcpBufferedSocket.set_read_controler sock download_control;
      TcpBufferedSocket.set_write_controler sock upload_control;

      (* shared with client_parse_header; the closer reads it to find the
         client attached to this socket, if any *)
      let c = ref (None, cc) in
      TcpBufferedSocket.set_closer sock (fun _ r ->
        match fst !c with
        | None -> ()
        | Some c ->
            (match c.client_sock with
             | Connection s when s == sock -> disconnect_client c r
             | _ -> ()));
      set_rtimeout sock 30.;
      incr counter;
      (* Again : 'hook' client_parse_header to the socket *)
      set_bt_sock sock !verbose_msg_clients
        (BTHeader (client_parse_header !counter c false))
    end else
      (* don't forget to close the incoming sock if we can't
         open a new connection *)
      Unix.close s
  in
  try
    let server =
      TcpServerSocket.create "bittorrent client server"
        (Ip.to_inet_addr !!client_bind_addr)
        !!client_port
        (fun _ event ->
          match event with
          | TcpServerSocket.CONNECTION (s, Unix.ADDR_INET (from_ip, from_port)) ->
              accept s from_ip from_port
          | _ -> ())
    in
    listen_sock := Some server
  with e ->
    if !verbose_connect then
      lprintf_nl "Exception %s while init bittorrent server"
        (Printexc2.to_string e)
(** This function sends keepalive (Ping) messages to all connected clients
    of every current file, and sets the lifetime of their socket to 130.
*)
let send_pings () =
  let ping _ c =
    match c.client_sock with
    | Connection sock ->
        send_client c Ping;
        set_lifetime sock 130.
    | _ -> ()
  in
  List.iter (fun file -> Hashtbl.iter ping file.file_clients) !current_files
1402 open Bencode
(** Check each client of a given file: if it is not connected, try to
    connect it, provided its connection_control allows a new attempt
    (currently the delay between two tries is 120 seconds).
    A failure on one client is logged when verbose_connect is set and never
    prevents the remaining clients from being processed.
    @param file The file whose clients must be resumed
*)
let resume_clients file =
  Hashtbl.iter (fun _ c ->
    try
      match c.client_sock with
      | Connection _ -> ()
      | _ ->
          (* No inner catch-all here: it used to swallow every exception
             silently, which made the logging handler below unreachable. *)
          if connection_can_try c.client_connection_control then
            connect_client c
          else
            print_control c.client_connection_control
    with e ->
      if !verbose_connect then
        lprintf_file_nl (as_file file) "Exception %s in resume_clients" (Printexc2.to_string e)
  ) file.file_clients

(* register resume_clients as the resume_clients_hook *)
let () =
  resume_clients_hook := resume_clients
(** Check if the value replied by the tracker is correct.
    @param key the name of the key
    @param n the value to check
    @param url Url of the tracker
    @param name the name of the file
    @return the value as an int, or 0 (after logging) when it is negative
*)
let chk_keyval key n url name =
  let value = Int64.to_int n in
  if !verbose_msg_clients then
    lprintf_nl "Reply from %s in file: %s has %s: %d" (show_tracker_url url) name key value;
  if value < 0 then begin
    lprintf_nl "Reply from %s in file: %s has an invalid %s value: %d" (show_tracker_url url) name key value;
    0
  end else
    value
(** Check that client is valid and record it.
    A peer is recorded with new_client only when its id is not our own
    client_uid, its address is not Ip.null, its port is not 0 and its
    address is not banned. The received address is logged when
    verbose_sources > 1, whether or not the peer was recorded.
    @param file the file the peer was announced for
    @param id the peer id
    @param ip the address of the peer
    @param port the port of the peer
*)
let maybe_new_client file id ip port =
  let cc = Geoip.get_country_code_option ip in
  if id <> !!client_uid
    (* structural comparison: physical inequality ([!=]) does not reject
       a null address that was freshly built by the caller *)
    && ip <> Ip.null
    && port <> 0
    && (match !Ip.banned (ip, cc) with
        | None -> true
        | Some reason ->
            if !verbose_connect then
              lprintf_file_nl (as_file file) "%s:%d blocked: %s" (Ip.to_string ip) port reason;
            false)
  then
    ignore (new_client file id (ip,port) cc);
  if !verbose_sources > 1 then
    lprintf_file_nl (as_file file) "Received %s:%d" (Ip.to_string ip) port
(** [exn_catch f x] applies [f] to [x] and reifies the outcome:
    [`Ok v] when [f x] returns [v], [`Exn e] when it raises [e]. *)
let exn_catch f x =
  try
    let result = f x in
    `Ok result
  with exn -> `Exn exn
(** In this function we interact with the tracker: the trackers of the file
    are contacted through connect_trackers (with the "started" event if the
    file is not yet marked as connected to a tracker) and every reply is
    parsed to update the tracker statistics and, when requested, to record
    the announced peers and connect to them.
    @param file The file for which we want some sources
    @param need_sources whether we need any sources
*)
let talk_to_tracker file need_sources =
  (* This is the function which will be called by the http client for parsing the response *)
  let f t filename =
    let tracker_url = show_tracker_url t.tracker_url in
    let tracker_failed reason =
      (* On failure, disable the tracker and count attempts (@see is_tracker_enabled) *)
      let num = match t.tracker_status with | Disabled_failure (i,_) -> i + 1 | _ -> 1 in
      t.tracker_status <- Disabled_failure (num, intern reason);
      lprintf_file_nl (as_file file) "Failure no. %d%s from Tracker %s for file: %s Reason: %s"
        num
        (if !!tracker_retries = 0 then "" else Printf.sprintf "/%d" !!tracker_retries)
        tracker_url file.file_name (Charset.Locale.to_utf8 reason)
    in
    match exn_catch File.to_string filename with
    | `Exn _ | `Ok "" -> tracker_failed "empty reply"
    | `Ok s ->
    match exn_catch Bencode.decode s with
    | `Exn exn -> tracker_failed (Printf.sprintf "wrong reply (%s)" (Printexc2.to_string exn))
    | `Ok (Dictionary list) ->
        (* defaults, overridden below by the values found in the reply *)
        t.tracker_interval <- 600;
        t.tracker_min_interval <- 600;
        t.tracker_last_clients_num <- 0;
        let chk_keyval key n = chk_keyval key n t.tracker_url file.file_name in
        if not (List.mem_assoc "failure reason" list) then
          begin
            begin match t.tracker_status with
            | Disabled_failure (i, _) ->
                (* log the displayable form of the url, as every other
                   message of this function does *)
                lprintf_file_nl (as_file file) "Received good message from Tracker %s after %d bad attempts"
                  tracker_url i
            | _ -> () end;
            (* Received good message from tracker after failures, re-enable tracker *)
            t.tracker_status <- Enabled;
          end;
        List.iter (fun (key,value) ->
          match (key,value) with
          | "failure reason", String failure -> tracker_failed failure
          | "warning message", String warning ->
              lprintf_file_nl (as_file file) "Warning from Tracker %s in file: %s Reason: %s"
                tracker_url file.file_name warning
          | "interval", Int n ->
              t.tracker_interval <- chk_keyval key n;
              (* in case we don't receive "min interval" *)
              if t.tracker_min_interval > t.tracker_interval then
                t.tracker_min_interval <- t.tracker_interval
          | "min interval", Int n ->
              t.tracker_min_interval <- chk_keyval key n;
              (* make sure "min interval" is always < or equal to "interval" *)
              if t.tracker_min_interval > t.tracker_interval then
                t.tracker_min_interval <- t.tracker_interval
          | "downloaded", Int n ->
              t.tracker_torrent_downloaded <- chk_keyval key n
          | "complete", Int n
          | "done peers", Int n ->
              t.tracker_torrent_complete <- chk_keyval key n
          | "incomplete", Int n ->
              t.tracker_torrent_incomplete <- chk_keyval key n;
              (* if complete > 0 and we receive incomplete we probably won't receive num_peers so we simulate it below *)
              if t.tracker_torrent_complete > 0 then
                t.tracker_torrent_total_clients_count <- (t.tracker_torrent_complete + t.tracker_torrent_incomplete);
          | "num peers", Int n ->
              t.tracker_torrent_total_clients_count <- chk_keyval key n;
              (* if complete > 0 and we receive num_peers we probably won't receive incomplete so we simulate it below *)
              if t.tracker_torrent_complete > 0 then
                t.tracker_torrent_incomplete <- (t.tracker_torrent_total_clients_count - t.tracker_torrent_complete);
          | "last", Int n ->
              t.tracker_torrent_last_dl_req <- chk_keyval key n
          | "key", String n ->
              t.tracker_key <- n;
              if !verbose_msg_clients then
                lprintf_file_nl (as_file file) "%s in file: %s has key: %s" tracker_url file.file_name n
          | "tracker id", String n ->
              t.tracker_id <- n;
              if !verbose_msg_clients then
                lprintf_file_nl (as_file file) "%s in file: %s has tracker id %s" tracker_url file.file_name n

          | "peers", List list ->
              (* peers given as a list of dictionaries *)
              if need_sources then
                List.iter (fun v ->
                  match v with
                  | Dictionary list ->
                      let peer_id = ref Sha1.null in
                      let peer_ip = ref Ip.null in
                      let port = ref 0 in

                      List.iter (fun v ->
                        match v with
                          "peer id", String id ->
                            peer_id := Sha1.direct_of_string id;
                        | "ip", String ip ->
                            peer_ip := Ip.of_string ip
                        | "port", Int p ->
                            port := Int64.to_int p
                        | _ -> ()
                      ) list;

                      t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
                      maybe_new_client file !peer_id !peer_ip !port

                  | _ ->
                      (* the reply comes from the network: skip an entry of
                         the wrong shape instead of failing an assertion *)
                      if !verbose_msg_clients then
                        lprintf_file_nl (as_file file) "Ignoring malformed peer entry from Tracker %s" tracker_url
                ) list
          | "peers", String p ->
              (* peers given as a packed string: 6 bytes per peer,
                 4 for the address followed by 2 for the port *)
              let rec iter_comp s pos l =
                (* only decode complete entries, a truncated tail is ignored
                   instead of raising an out-of-bounds exception *)
                if pos + 6 <= l then
                  let ip = Ip.of_ints (get_uint8 s pos,get_uint8 s (pos+1),
                      get_uint8 s (pos+2),get_uint8 s (pos+3))
                  and port = get_int16 s (pos+4)
                  in
                  t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
                  maybe_new_client file Sha1.null ip port;

                  iter_comp s (pos+6) l
              in
              if need_sources then
                iter_comp p 0 (String.length p)
          | "private", Int n -> ()
            (* TODO: if set to 1, disable peer exchange *)

          | key, _ -> lprintf_file_nl (as_file file) "received unknown entry in answer from tracker: %s : %s" key (Bencode.print value)
        ) list;
        (* Now, that we have added new clients to a file, it's time
           to connect to them *)
        if !verbose_sources > 0 then
          lprintf_file_nl (as_file file) "talk_to_tracker: got %i source(s) for file %s"
            t.tracker_last_clients_num file.file_name;
        if need_sources then resume_clients file

    | _ -> tracker_failed "wrong reply (value)"
  in
  let event =
    if file.file_tracker_connected then ""
    else "started"
  in
  connect_trackers file event need_sources f
(** Check to see if each current file is finished, if not
    try to get sources for it. Files without a swarmer are skipped; paused
    and queued files are left alone. Exceptions raised by the finish check
    or by the tracker announce are ignored.
*)
let recover_files () =
  if !verbose_share then
    lprintf_nl "recover_files";
  let recover file =
    match file.file_swarmer with
    | None -> ()
    | Some swarmer ->
        (try check_finished swarmer file with _ -> ());
        (match file_state file with
         | FileDownloading ->
             if !verbose_share then
               lprintf_file_nl (as_file file) "recover downloading";
             (try talk_to_tracker file true with _ -> ())
         | FileShared ->
             if !verbose_share then
               lprintf_file_nl (as_file file) "recover shared";
             (try talk_to_tracker file false with _ -> ())
         (* when we are paused or queued we do nothing, not even logging *)
         | FilePaused | FileQueued -> ()
         | s ->
             if !verbose then
               lprintf_file_nl (as_file file) "recover: Other state %s!!" (string_of_state s))
  in
  List.iter recover !current_files
(* buffer the file data is read into before it is sent in a Piece message *)
let upload_buffer = String.create 100000

(** Send a Piece message for the queued requests of a client, one request
    after the other, as long as client_allowed_to_write is not negative.
    Requests of length zero are dropped. When the allowance is negative,
    the client is handed to ready_for_upload instead.
    An exception raised while serving a request stops the iteration and is
    logged when verbose is set.
    @param sock The socket of the client
    @param c The client
*)
let rec iter_upload sock c =
  match c.client_upload_requests with
  | [] -> ()
  | (_, _, len) :: rest when len = zero ->
      (* nothing to send for an empty request *)
      c.client_upload_requests <- rest;
      iter_upload sock c
  | (num, pos, len) :: rest ->
      if c.client_allowed_to_write >= 0L then begin
        try
          c.client_upload_requests <- rest;

          let file = c.client_file in
          let offset = pos ++ file.file_piece_size *.. num in
          c.client_allowed_to_write <- c.client_allowed_to_write -- len;
          count_upload c len;
          let nbytes = Int64.to_int len in
          Unix32.read (file_fd file) offset upload_buffer 0 nbytes;
          (* update upload rate from nbytes bytes *)
          Rate.update c.client_upload_rate ~amount:nbytes;
          Rate.update c.client_downloaded_rate;
          let sent = Int64.of_int nbytes in
          file.file_uploaded <- file.file_uploaded ++ sent;
          file.file_session_uploaded <- file.file_session_uploaded ++ sent;
          (* update stats *)
          count_filerequest c;
          (match file.file_shared with
           | None -> ()
           | Some s ->
               s.impl_shared_uploaded <- file.file_uploaded;
               shared_must_update (as_shared s));
          send_client c (Piece (num, pos, upload_buffer, 0, nbytes));
          iter_upload sock c
        with e ->
          if !verbose then
            lprintf_nl "Exception %s in iter_upload" (Printexc2.to_string e)
      end else
        (* client is waiting for another piece *)
        ready_for_upload (as_client c)
(** In this function we check if we can send bytes (according
    to bandwidth control), if we can, call iter_upload to
    send a Piece message. Nothing happens when the client is not connected
    or has no queued upload request.
    @param c the client to which we can send some bytes
    @param allowed the amount of bytes we can send to client
*)
let client_can_upload c allowed =
  do_if_connected c.client_sock (fun sock ->
    match c.client_upload_requests with
    | [] -> ()
    | _ :: _ ->
        let budget = c.client_allowed_to_write ++ (Int64.of_int allowed) in
        (* only take the bandwidth if the socket can absorb the new budget *)
        if allowed > 0 && can_write_len sock (Int64.to_int budget) then begin
          CommonUploads.consume_bandwidth allowed;
          c.client_allowed_to_write <- budget
        end;
        iter_upload sock c)
(** Resume a file: trackers whose status is Disabled_failure or Disabled are
    set back to Enabled (Enabled and Disabled_mld trackers are untouched),
    then the trackers are asked for sources. Exceptions raised by
    talk_to_tracker are ignored.
    @param file the file we want to resume
*)
let file_resume file =
  let reenable t =
    match t.tracker_status with
    | Disabled_failure _ | Disabled _ -> t.tracker_status <- Enabled
    | Enabled | Disabled_mld _ -> ()
  in
  List.iter reenable file.file_trackers;
  (try talk_to_tracker file true with _ -> ())
(** Send info to tracker when stopping a file: the "stopped" event is sent
    only if the file is marked as connected to a tracker, and the mark is
    cleared once a tracker has answered.
    @param file the file we want to stop
*)
let file_stop file =
  if file.file_tracker_connected then begin
    let on_reply _ _ =
      lprintf_file_nl (as_file file) "Tracker return: stopped %s" file.file_name;
      file.file_tracker_connected <- false
    in
    connect_trackers file "stopped" false on_reply
  end
1740 Create the 'hooks'
1742 let _ =
1743 client_ops.op_client_can_upload <- client_can_upload;
1744 file_ops.op_file_resume <- file_resume;
1745 file_ops.op_file_recover <- file_resume;
1746 file_ops.op_file_pause <- (fun file ->
1747 Hashtbl.iter (fun _ c ->
1748 match c.client_sock with
1749 Connection sock -> close sock Closed_by_user
1750 | _ -> ()
1751 ) file.file_clients;
1752 (*When a file is paused we consider it is stopped*)
1753 file_stop file
1755 file_ops.op_file_queue <- file_ops.op_file_pause;
1756 client_ops.op_client_enter_upload_queue <- (fun c ->
1757 if !verbose_msg_clients then
1758 lprintf_nl "Client %d: client_enter_upload_queue" (client_num c);
1759 ready_for_upload (as_client c));
1760 network.op_network_connected_servers <- (fun _ -> []);