debug udp
[mldonkey.git] / src / networks / bittorrent / bTClients.ml
blobe404ff05fdd9abb7520a47749808877008a72409
1 (* Copyright 2001, 2002 b52_simon :), b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
(** Functions used in client<->client communication
    and also in client<->tracker communication.
*)

(** A peer (or client) is always a remote peer in this file.
    A Piece is a portion of the file associated with a hash (sha1).
    In mldonkey a piece is referred to as a block inside the swarming system.
    A SubPiece is a portion of a piece (without hash) which can be
    sent/downloaded to/from a peer.
    In mldonkey a SubPiece is referred to as a range inside the swarming system.
    @see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for some
    unofficial (but more detailed) specs.
*)
35 open Int64ops
36 open AnyEndian
37 open BigEndian
38 open Printf2
39 open Md4
40 open Options
41 open BasicSocket
42 open TcpBufferedSocket
43 open Ip_set
45 open CommonShared
46 open CommonUploads
47 open CommonOptions
48 open CommonDownloads
49 open CommonInteractive
50 open CommonClient
51 open CommonComplexOptions
52 open CommonTypes
53 open CommonFile
54 open CommonSwarming
55 open CommonGlobals
56 open CommonDownloads
58 open BTRate
59 open BTTypes
60 open BTProtocol
61 open BTOptions
62 open BTGlobals
63 open BTComplexOptions
64 open BTChooser
65 open BTStats
66 open TcpMessages
68 module VB = VerificationBitmap
(* Literal status-line strings of a successful HTTP reply, without and with
   the protocol version. Neither is referenced in the part of the file
   visible here. *)
let http_ok, http11_ok = "HTTP 200 OK", "HTTP/1.1 200 OK"
(* Peers selected by the last [recompute_uploaders] run. *)
let next_uploaders : BTTypes.client list ref = ref []

(* Peers that currently hold an upload slot; replaced by [!next_uploaders]
   at the end of [recompute_uploaders]. *)
let current_uploaders : BTTypes.client list ref = ref []
(** Check that a peer announced for [file] is usable and, if so, record it.

    A peer is recorded (through [new_client]) only when all of these hold:
    - its [id] is not our own [!!client_uid];
    - its [ip] is not [Ip.null];
    - its [port] is not 0;
    - [!Ip.banned] does not report a ban reason for [(ip, country code)].

    @param file the download the peer was announced for
    @param id   peer id (sha1) of the remote peer
    @param ip   address of the remote peer
    @param port TCP port of the remote peer *)
let maybe_new_client file id ip port =
  let cc = Geoip.get_country_code_option ip in
  (* Evaluated last in the conjunction below, so the ban list is only
     consulted (and the "blocked" line only logged) for otherwise valid peers. *)
  let not_banned () =
    match !Ip.banned (ip, cc) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_file_nl (as_file file) "%s:%d blocked: %s" (Ip.to_string ip) port reason;
        false
  in
  (* Structural comparison with [Ip.null]: the previous physical test
     ([!=]) compares addresses of boxed values, so a freshly built null
     address was never recognised as null. *)
  if id <> !!client_uid && ip <> Ip.null && port <> 0 && not_banned () then
    ignore (new_client file id (ip, port) cc);
  (* This trace is emitted for every announced peer, accepted or not
     (it is sequenced after the [if], not part of its [then] branch). *)
  if !verbose_sources > 1 then
    lprintf_file_nl (as_file file) "Received %s:%d" (Ip.to_string ip) port
(* Forward reference: called as [!resume_clients_hook file] once a tracker
   answer has been processed. The placeholder fails an assertion if it is
   invoked before a real implementation has been stored in the reference. *)
let resume_clients_hook = ref (fun _file -> assert false)
97 include struct
99 (* open modules locally *)
100 open BTUdpTracker
101 open UdpSocket
(* Printable name of a UDP socket event, for debug traces. A close event
   also carries its reason, rendered with [string_of_reason]. *)
let string_of_event event =
  match event with
  | READ_DONE -> "READ_DONE"
  | WRITE_DONE -> "WRITE_DONE"
  | CAN_REFILL -> "CAN_REFILL"
  | BASIC_EVENT (CLOSED reason) -> "CLOSED " ^ (string_of_reason reason)
  | BASIC_EVENT RTIMEOUT -> "RTIMEOUT"
  | BASIC_EVENT WTIMEOUT -> "WTIMEOUT"
  | BASIC_EVENT LTIMEOUT -> "LTIMEOUT"
  | BASIC_EVENT CAN_READ -> "CAN_READ"
  | BASIC_EVENT CAN_WRITE -> "CAN_WRITE"
(** Talk to a UDP tracker and parse its response.

    Apart from the parsing, this should perform everything that
    talk_to_tracker's inner function does. FIXME refactor both.

    Better create a single global udp socket, use it for all tracker
    requests and distinguish trackers by txn? FIXME?

    The exchange has two steps: a connect request whose reply yields a
    connection id, then an announce request whose reply yields the re-ask
    interval and a list of peers. Each peer is handed to [maybe_new_client].

    @param host tracker host name (resolved with [Unix.gethostbyname])
    @param port tracker UDP port
    @param args announce parameters as an association list; "info_hash",
           "peer_id", "downloaded", "left", "uploaded" and "port" are
           required, "event" and "numwant" are optional
    @param file the download being announced
    @param t the tracker record whose statistics are updated
    @param need_sources when true, the per-tracker client counter is reset
           before counting and [!resume_clients_hook file] is called at the end

    Exceptions raised while setting up the exchange are logged and swallowed.
    NOTE(review): the reader callbacks are closures handed to [set_reader];
    presumably they run later from the socket layer, outside this [try] -
    confirm how exceptions raised there are reported. *)
let talk_to_udp_tracker host port args file t need_sources =
  try
    lprintf_nl "udpt start with %s:%d" host port;
    let addr =
      (* [gethostbyname] raises [Not_found]; an empty address list makes the
         array access raise [Invalid_argument]. *)
      try (Unix.gethostbyname host).Unix.h_addr_list.(0)
      with Not_found | Invalid_argument _ -> failwith ("failed to resolve " ^ host)
    in
    let ip = Ip.of_inet_addr addr in
    lprintf_nl "udpt resolved to ip %s" (Ip.to_string ip);
    let socket = create Unix.inet_addr_any 0 (fun sock event ->
      (* The trailing [;] was missing here, which glued the trace call to
         the [match] below. *)
      lprintf_nl "udpt got event %s for %s" (string_of_event event) host;
      match event with
      | WRITE_DONE | CAN_REFILL -> ()
      | READ_DONE -> assert false (* set_reader prevents this *)
      | BASIC_EVENT x ->
          (match x with
           | CLOSED _ -> ()
           | CAN_READ | CAN_WRITE -> assert false (* udpSocket implementation prevents this *)
           | LTIMEOUT | WTIMEOUT | RTIMEOUT -> close sock (Closed_for_error "udpt timeout")))
    in
    BasicSocket.set_wtimeout (sock socket) 5.;
    BasicSocket.set_rtimeout (sock socket) 5.;
    let txn = Random.int32 Int32.max_int in
    lprintf_nl "udpt txn %ld for %s" txn host;
    write socket false (connect_request txn) ip port;
    (* Step 1: wait for the connect response. *)
    set_reader socket (fun _ ->
      let p = read socket in
      let conn = connect_response p.udp_content txn in
      lprintf_nl "udpt connection_id %Ld for %s" conn host;
      let txn = Random.int32 Int32.max_int in
      lprintf_nl "udpt txn' %ld for host %s" txn host;
      let int s = Int64.of_string (List.assoc s args) in
      (* [connect_trackers] passes no "event" key for a regular re-announce;
         treat that as the empty event instead of raising [Not_found]. *)
      let event = try List.assoc "event" args with Not_found -> "" in
      let event_code =
        match event with
        | "completed" -> 1l
        | "started" -> 2l
        | "stopped" -> 3l
        | "" -> 0l
        | s -> lprintf_nl "udpt event %s? for %s" s host; 0l
      in
      let numwant =
        (* -1 is sent when "numwant" is absent or not a number. *)
        try Int32.of_string (List.assoc "numwant" args)
        with Not_found | Failure _ -> -1l
      in
      let req = announce_request conn txn
        ~info_hash:(List.assoc "info_hash" args)
        ~peer_id:(List.assoc "peer_id" args)
        (int "downloaded",int "left",int "uploaded")
        event_code
        ~numwant
        (int_of_string (List.assoc "port" args))
      in
      write socket false req ip port;
      (* Step 2: wait for the announce response. *)
      set_reader socket (fun _ ->
        let p = read socket in

        t.tracker_last_conn <- last_time ();
        file.file_tracker_connected <- true;
        (* Defaults, overridden below when the tracker gives an interval. *)
        t.tracker_interval <- 600;
        t.tracker_min_interval <- 600;
        if need_sources then t.tracker_last_clients_num <- 0;

        let (interval,clients) = announce_response p.udp_content txn in
        lprintf_nl "udpt got interval %ld clients %d for host %s" interval (List.length clients) host;
        if interval > 0l then
          begin
            t.tracker_interval <- Int32.to_int interval;
            if t.tracker_min_interval > t.tracker_interval then
              t.tracker_min_interval <- t.tracker_interval
          end;
        List.iter (fun (ip',port) ->
          (* Mask to 32 bits so a negative int32 does not sign-extend. *)
          let ip = Ip.of_int64 (Int64.logand 0xFFFFFFFFL (Int64.of_int32 ip')) in
          lprintf_nl "udpt got %s:%d" (Ip.to_string ip) port;
          t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
          maybe_new_client file Sha1.null ip port
        ) clients;
        close socket Closed_by_user;
        lprintf_nl "udpt interact done for %s" host;
        if need_sources then !resume_clients_hook file
      ))
  with
    exn ->
      lprintf_nl "udpt interact exn %s" (Printexc2.to_string exn)
195 end (* include *)
(**
  In this function we connect to a tracker.
  @param file The file concerned by the request
  @param url Url of the tracker to connect to
  @param event Event (as a string) to send to the tracker:
   it can be 'completed' if the file is complete, 'started' for the first
   connection to this tracker or 'stopped' for a clean stop of the file.
   Anything else is fine for a subsequent connection to the tracker.
   Be careful with the spelling of this event.
  @param f The function used to parse the result of the connection.
   The function will get a file as an argument (@see talk_to_tracker
   for an example)

  If we have fewer than !!ask_tracker_threshold sources
  and if we respect the file_tracker_interval, then
  we really ask the tracker for sources.
*)
214 let connect_trackers file event need_sources f =
(* Announce [file] to each of its enabled trackers.
   [event] selects the "event" announce argument ("started", "stopped",
   "completed"; any other value sends no event).
   [need_sources] = false forces "numwant" to "0"; it is also forwarded to
   [talk_to_udp_tracker].
   [f] is called as [f t fileres] when an HTTP tracker request completes;
   it is not used for UDP trackers. *)
216 (* reset session statistics when sending 'started' event *)
217 if event = "started" then
218 begin
219 file.file_session_uploaded <- Int64.zero;
220 file.file_session_downloaded <- Int64.zero;
221 end;
(* args: event argument (possibly none); must_check_delay: false for
   "stopped"/"completed", which are therefore always sent; left: bytes
   still missing, zero when there is no swarmer or on "completed". *)
223 let args,must_check_delay, left =
225 match file.file_swarmer with
226 None ->
227 begin
228 match event with
229 | "started" -> [("event", "started")],true,zero
230 | "stopped" -> [("event", "stopped")],false,zero
231 | _ -> [],true,zero
234 | Some swarmer ->
235 let local_downloaded = CommonSwarming.downloaded swarmer in
236 let left = file_size file -- local_downloaded in
237 match event with
238 | "completed" -> [("event", "completed")],false,zero
239 | "started" -> [("event", "started")],true,left
240 | "stopped" -> [("event", "stopped")],false,left
241 | _ -> [],true,left
(* The remaining announce arguments are prepended step by step. *)
244 let args = ("no_peer_id", "1") :: ("compact", "1") :: args in
245 let args =
246 if not need_sources then
247 ("numwant", "0") :: args
248 else if !!numwant > -1 then
249 ("numwant", string_of_int !!numwant) :: args
250 else
251 args
253 let args = if !!send_key then
254 ("key", Sha1.to_hexa !!client_uid) :: args else args
256 let args = if !!force_client_ip then
257 ("ip", Ip.to_string !!set_client_ip) :: args else args
259 let args =
260 ("info_hash", Sha1.direct_to_string file.file_id) ::
261 ("peer_id", Sha1.direct_to_string !!client_uid) ::
262 ("port", string_of_int !!client_port) ::
263 ("uploaded", Int64.to_string file.file_session_uploaded) ::
264 ("downloaded", Int64.to_string file.file_session_downloaded) ::
265 ("left", Int64.to_string left) ::
266 args
(* Trackers to contact. When none is enabled, those in [Disabled _] state
   are switched back to [Enabled]; if there is still none and the file is
   not already paused, the file is paused. *)
269 let enabled_trackers =
270 let enabled_trackers = List.filter (fun t -> tracker_is_enabled t) file.file_trackers in
271 if enabled_trackers <> [] then enabled_trackers
272 else begin
273 (* if there is no tracker left, do something ? *)
274 if !verbose_msg_servers then
275 lprintf_nl "No trackers left for %s, reenabling all of them..." (file_best_name (as_file file));
276 List.iter (fun t ->
277 match t.tracker_status with
278 (* only re-enable after normal error *)
279 | Disabled _ -> t.tracker_status <- Enabled
280 | _ -> ()) file.file_trackers;
281 let enabled_trackers = List.filter (fun t -> tracker_is_enabled t) file.file_trackers in
282 if enabled_trackers = [] && (file_state file) <> FilePaused then
283 begin
284 file_pause (as_file file) (CommonUserDb.admin_user ());
285 lprintf_file_nl (as_file file) "Paused %s, no usable trackers left" (file_best_name (as_file file))
286 end;
287 enabled_trackers
288 end in
290 List.iter (fun t ->
(* A request is sent when any of these holds: the delay need not be
   checked, no tracker has answered for this file yet, the tracker interval
   has elapsed, or the file is downloading with fewer than
   !!ask_tracker_threshold clients and the larger of the tracker minimum
   interval and !!min_tracker_reask_interval has elapsed. *)
292 (* if we have too few sources we may ask the tracker before the interval *)
293 if not must_check_delay
294 || not file.file_tracker_connected
295 || t.tracker_last_conn + t.tracker_interval < last_time()
296 || ( file.file_clients_num < !!ask_tracker_threshold
297 && (file_state file) == FileDownloading
298 && (if t.tracker_min_interval > !!min_tracker_reask_interval then
299 t.tracker_last_conn + t.tracker_min_interval < last_time()
300 else
301 t.tracker_last_conn + !!min_tracker_reask_interval < last_time() ))
302 then
303 begin
304 (* if we already tried to connect but failed, disable tracker, but allow re-enabling *)
305 (* FIXME t.tracker_last_conn < 1 only at first connect, so later failures will stay undetected! *)
306 if file.file_tracker_connected && t.tracker_last_clients_num = 0 && t.tracker_last_conn < 1 then
307 begin
308 if !verbose_msg_servers then
309 lprintf_nl "Request error from tracker: disabling %s" (show_tracker_url t.tracker_url);
310 t.tracker_status <- Disabled (intern "MLDonkey: Request error from tracker")
312 (* Send request to tracker *)
313 else
(* Per-tracker arguments. NOTE(review): when !!send_key is set and
   t.tracker_key is non-empty, "key" ends up in args twice - confirm this
   is intended. *)
314 let args = if String.length t.tracker_id > 0 then
315 ("trackerid", t.tracker_id) :: args else args
317 let args = if String.length t.tracker_key > 0 then
318 ("key", t.tracker_key) :: args else args
320 if !verbose_msg_servers then
321 lprintf_nl "connect_trackers: connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i numwant:%s file: %s"
322 (string_of_bool file.file_tracker_connected)
323 t.tracker_id t.tracker_key t.tracker_last_clients_num
324 (t.tracker_last_conn - last_time()) (try List.assoc "numwant" args with _ -> "_") file.file_name;
(* Dispatch on the tracker URL kind: HTTP goes through Http_client.wget,
   UDP through talk_to_udp_tracker; any other kind fails an assertion. *)
326 match t.tracker_url with
327 | `Http url ->
328 let module H = Http_client in
329 let r = {
330 H.basic_request with
331 H.req_url = Url.of_string ~args: args url;
332 H.req_proxy = !CommonOptions.http_proxy;
333 H.req_user_agent = get_user_agent ();
334 (* #4541 [egs] supports redirect *)
335 H.req_max_retry = !!max_tracker_redirect;
336 } in
338 if !verbose_msg_servers then
339 lprintf_nl "Request sent to tracker %s for file: %s"
340 url file.file_name;
341 H.wget r
342 (fun fileres ->
343 t.tracker_last_conn <- last_time ();
344 file.file_tracker_connected <- true;
345 f t fileres)
346 | `Other url -> assert false (* should have been disabled *)
347 | `Udp (host,port) -> talk_to_udp_tracker host port args file t need_sources
350 else
351 if !verbose_msg_servers then
352 lprintf_nl "Request NOT sent to tracker %s - next request in %ds for file: %s"
353 (show_tracker_url t.tracker_url) (t.tracker_interval - (last_time () - t.tracker_last_conn)) file.file_name
354 ) enabled_trackers
(* Give peer [c] a normal upload slot on its file: refresh both rate
   meters, stamp the optimistic-unchoke time, enter the upload queue and
   finally send the Unchoke message. *)
let start_upload c =
  let peer = as_client c in
  let shared_file = as_file c.client_file in
  set_client_upload peer shared_file;
  set_client_has_a_slot peer NormalSlot;
  Rate.update_no_change c.client_downloaded_rate;
  Rate.update_no_change c.client_upload_rate;
  c.client_last_optimist <- last_time();
  client_enter_upload_queue peer;
  send_client c Unchoke
(** Decide which peers will be uploaders.
    Current uploaders that are absent from the new selection lose their
    slot (the Choke itself is deferred to their next request); newly
    selected peers that were not uploaders yet are started with
    [start_upload], which sends Unchoke.
*)
let recompute_uploaders () =
  if !verbose_upload then lprintf_nl "recompute_uploaders";
  next_uploaders := choose_uploaders current_files;
  (* Revoke the slot of peers that did not make it into the new selection;
     they may finish the running transfer and get choked on next request. *)
  let revoke_if_dropped peer =
    if not (List.mem peer !next_uploaders) then
      set_client_has_a_slot (as_client peer) NoSlot
  in
  List.iter revoke_if_dropped !current_uploaders;
  (* Peers that already upload keep going untouched. *)
  let start_if_new peer =
    if not (List.mem peer !current_uploaders) then start_upload peer
  in
  List.iter start_if_new !next_uploaders;
  current_uploaders := !next_uploaders
388 (****** Fabrice: why are clients which are disconnected removed ???
389 These clients might still be useful to reconnect to, no ? *)
(** This function is called when a client is disconnected
    (either from our side or from its side).
    A client which disconnects (even only once) is discarded.
    If it is an uploader which disconnects, we recompute uploaders
    (see recompute_uploaders) immediately.
    @param c The client to disconnect
    @param reason The reason for the disconnection (see BasicSocket.ml)
*)
400 let disconnect_client c reason =
(* Tear down the connection state of peer [c].
   - NoConnection: nothing to do.
   - ConnectionWaiting: the pending connection token is cancelled and the
     peer goes back to NoConnection.
   - Connection: the socket is closed with [reason]; then the peer is
     marked disconnected, its session counters are reset, it is counted as
     seen if it was flagged good, its uploader registration is dropped from
     the swarmer if it had registered a bitfield, and it is removed with
     [remove_client]. If it was one of [!current_uploaders], uploaders are
     recomputed right away.
   The trailing [with _ -> ()] discards any exception raised by the
   Connection clean-up steps. *)
401 if !verbose_msg_clients then
402 lprintf_nl "Client %d: disconnected: %s" (client_num c) (string_of_reason reason);
403 begin
404 match c.client_sock with
405 NoConnection -> ()
406 | ConnectionWaiting token ->
407 cancel_token token;
408 c.client_sock <- NoConnection
409 | Connection sock ->
410 close sock reason;
412 (* List.iter (fun r -> CommonSwarming.free_range r) c.client_ranges; *)
413 set_client_disconnected c reason;
414 c.client_session_downloaded <- 0L;
415 c.client_session_uploaded <- 0L;
416 (try if c.client_good then count_seen c with _ -> ());
417 (* this is not useful already done in the match
418 (try close sock reason with _ -> ()); *)
419 (*---------not needed ?? VvvvvV---------------
420 c.client_ranges <- [];
421 c.client_block <- None;
422 if not c.client_good then
423 connection_failed c.client_connection_control;
424 c.client_good <- false;
425 c.client_sock <- NoConnection;
426 c.client_chunks <- [];
427 c.client_allowed_to_write <- zero;
428 c.client_new_chunks <- [];
429 c.client_interesting <- false;
430 c.client_alrd_sent_interested <- false;
431 -------------------^^^^^--------------------*)
432 if (c.client_registered_bitfield) then
433 begin
434 match c.client_uploader with
435 None -> ()
436 | Some up ->
437 c.client_uploader <- None;
438 (* If the client registered a bitfield then
439 we must unregister him to update the swarmer
440 (Useful for availability)
442 CommonSwarming.unregister_uploader up
443 (* c.client_registered_bitfield <- false;
444 for i = 0 to String.length c.client_bitmap - 1 do
445 c.client_bitmap.[0] <- '0';
446 done*)
447 end;
448 (* Don't test if a client have an upload slot because
449 it don't have one (removed during earlier in
450 set_client_disconnected c reason)
452 if (List.mem c !current_uploaders) then
453 begin
454 (*BTGlobals.remove_client*)
455 remove_client c;
456 recompute_uploaders ();
458 else
459 remove_client c;
460 with _ -> ()
(** Disconnect the clients of a file.
    When the file is paused or cancelled every client is disconnected;
    otherwise clients that hold an upload slot or are interested are kept.
    @param file The file whose clients must be disconnected
*)
let disconnect_clients file =
  let keep_active_peers =
    match file_state file with
    | FilePaused | FileCancelled -> false
    | _ -> true
  in
  let disconnect_unless_kept _ peer =
    let kept =
      keep_active_peers
      && (client_has_a_slot (as_client peer) || peer.client_interested)
    in
    if not kept then
      begin
        if !verbose_msg_clients then
          lprintf_file_nl (as_file file) "disconnect since download is finished";
        disconnect_client peer Closed_by_user
      end
  in
  Hashtbl.iter disconnect_unless_kept file.file_clients
(** What to do when a file is finished. Nothing happens unless the file is
    one of [!current_files].
    @param file the finished file
*)
let download_finished file =
  let on_tracker_reply _ _ =
    lprintf_file_nl (as_file file) "Tracker return: completed %s" file.file_name
  in
  match List.memq file !current_files with
  | false -> ()
  | true ->
      (* The announce must happen before the swarmer is detached below. *)
      connect_trackers file "completed" false on_tracker_reply;
      (*CommonComplexOptions.file_completed*)
      file_completed (as_file file);
      (* The swarmer is of no use any more for this file. *)
      CommonSwarming.remove_swarmer file.file_swarmer;
      file.file_swarmer <- None
      (* At this point, the file state is FileDownloaded. The file is not
         removed, because we continue to upload. *)
(** Check whether a file is finished, and run [download_finished] if so.
    A file is finished if all blocks are verified.
    @param swarmer The swarmer asked for completion
    @param file The file to check
*)
let check_finished swarmer file =
  match CommonSwarming.check_finished swarmer with
  | true -> download_finished file
  | false -> ()
(* Masks selecting bit 0..7 of a byte, most significant bit first:
   128, 64, 32, 16, 8, 4, 2, 1. *)
let bits = Array.init 8 (fun shift -> 0x80 lsr shift)

(* Check/set bits in strings (bittorrent format) *)

(* [is_bit_set s n] tells whether bit [n] of [s] is set, bit 0 being the
   most significant bit of the first byte.
   Raises [Invalid_argument] when the byte holding bit [n] is outside [s]. *)
let is_bit_set s n =
  let byte = Char.code (String.get s (n lsr 3)) in
  let mask = bits.(n land 7) in
  byte land mask <> 0
(* [set_bit s n] sets bit [n] of [s] in place, bit 0 being the most
   significant bit of the first byte. Other bits are left untouched. *)
let set_bit s n =
  let byte_index = n lsr 3 in
  let updated = Char.code s.[byte_index] lor bits.(n land 7) in
  s.[byte_index] <- Char.unsafe_chr updated
(* Official client seems to use max_range_request 5 and max_range_len 2^14 *)
(* Maximum number of requests kept in the 'pipeline' of one peer;
   [get_from_client] stops asking once that many ranges are outstanding. *)
let max_range_requests = 5
524 (* How much bytes we can request in one Piece *)
(** A wrapper to send the Interested message to a client.
    The message goes out only when the client is interesting and the
    message has not been sent to it already.
    @param c The client to send Interested to
*)
let send_interested c =
  let must_send = c.client_interesting && not c.client_alrd_sent_interested in
  match must_send with
  | false -> ()
  | true ->
      (* Remember it first, so the message is sent only once. *)
      c.client_alrd_sent_interested <- true;
      send_client c Interested
(** Send a Bitfield message to a client.
    The bitfield holds one bit per chunk, packed eight to a byte: every bit
    is set when the file has no swarmer, otherwise only the bits of chunks
    whose state is [VB.State_verified].
    @param c The client to send the Bitfield message to
*)
let send_bitfield c =
  (* A zero-filled string large enough for [nbits] bits. *)
  let blank nbits = String.make ((nbits + 7) / 8) '\000' in
  let packed =
    match c.client_file.file_swarmer with
    | None ->
        (* This must be a seeded file... *)
        if !verbose_download then
          lprintf_nl "Sending completed verified bitmap";
        let nchunks = Array.length c.client_file.file_chunks in
        let s = blank nchunks in
        for i = 0 to nchunks - 1 do
          set_bit s i
        done;
        s
    | Some swarmer ->
        let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
        if !verbose_download then
          lprintf_nl "Sending verified bitmap: [%s]" (VB.to_string bitmap);
        let s = blank (VB.length bitmap) in
        VB.iteri (fun i state ->
          if state = VB.State_verified then set_bit s i) bitmap;
        s
  in
  send_client c (BitField packed)
(* Module-level integer counter, initially 0. It is not updated in the part
   of the file visible here. *)
let counter : int ref = ref 0
(* Decode the reserved bytes [rbits] of a handshake into the extension
   flags of client [c]. Each flag is one bit, tested as (byte index, mask). *)
let parse_reserved rbits c =
  let flag ~byte ~mask = (Char.code rbits.[byte]) land mask <> 0 in

  c.client_dht <- flag ~byte:7 ~mask:0x01;
  c.client_cache_extension <- flag ~byte:7 ~mask:0x02;
  c.client_fast_extension <- flag ~byte:7 ~mask:0x04;

  c.client_utorrent_extension <- flag ~byte:5 ~mask:0x10;

  c.client_azureus_messaging_protocol <- flag ~byte:0 ~mask:0x80
(** This function is called to parse the first message that
    a client sends.
    @param counter client num
    @param cc Expected client (probably useless now that we don't save any client)
    @param init_sent A boolean telling whether we have sent this client the handshake message
    @param gconn Don't know
    @param sock The socket we use for this client
    @param proto Unused (required by tuple type?)
    @param file_id The file hash (sha1) of the file involved in this exchange
*)
(* removed: @param peer_id The hash (sha1) of the client. (Should be checked) *)
595 let rec client_parse_header counter cc init_sent gconn sock
596 (proto, rbits, file_id) =
(* Handle the handshake header received on [sock].
   The file is looked up by [file_id] in [files_by_uid]; an unknown id
   raises Not_found, which the handler at the end only logs (when
   verbose_unexpected_messages is set). Any other exception is logged, the
   socket is closed with it, and it is re-raised.
   When [!cc] holds no client yet, a new one is created from the socket's
   peer address and stored back into [cc].
   Then: the reserved bits are decoded with [parse_reserved], [sock]
   becomes the client's connection (an existing different connection is
   disconnected first), our own handshake is sent if [init_sent] is false,
   our bitfield is sent, and [gconn]'s handler is switched to the regular
   message reader built on [client_to_client]. Whatever is still in the
   receive buffer is parsed at most twice more. *)
598 set_lifetime sock 600.;
599 if !verbose_msg_clients then
600 lprintf_nl "client_parse_header %d" counter;
602 let file = Hashtbl.find files_by_uid file_id in
603 if !verbose_msg_clients then
604 lprintf_file_nl (as_file file) "file found";
605 let ccc, cc_country_code = !cc in
606 let c =
607 match ccc with
608 None ->
609 let c = new_client file Sha1.null (TcpBufferedSocket.peer_addr sock) cc_country_code in
610 if !verbose_connect then lprintf_file_nl (as_file file) "Client %d: incoming connection" (client_num c);
611 cc := (Some c), cc_country_code;
613 | Some c ->
614 (* Does it happen that this c was already used to connect successfully?
615 If yes then this must happen: *)
616 c.client_received_peer_id <- false;
617 if cc_country_code <> None && c.client_country_code = None then
618 c.client_country_code <- cc_country_code;
620 (* client could have had Sha1.null as peer_id/uid *)
621 (* this is to be done, later
622 if c.client_uid <> peer_id then
623 c.client_software <- (parse_software (Sha1.direct_to_string peer_id));
627 (* if c.client_uid <> peer_id then begin
628 lprintf "Unexpected client by UID\n";
629 let ccc = new_client file peer_id (TcpBufferedSocket.host sock) in
630 lprintf "CLIENT %d: testing instead of %d\n"
631 (client_num ccc) (client_num c);
632 (match ccc.client_sock with
633 Connection _ ->
634 lprintf_nl "[BT]: This client is already connected";
635 close sock (Closed_for_error "Already connected");
636 remove_client ccc;
638 | _ ->
639 lprintf_nl "[BT]: Client %d: recovered by UID" (client_num ccc);
640 remove_client c;
641 cc := Some ccc;
642 ccc)
643 end else
644 c *)
647 if !verbose_msg_clients then begin
648 let (ip,port) = c.client_host in
649 lprintf_nl "Client %d: Connected from %s:%d" (client_num c)
650 (Ip.to_string ip) port;
651 end;
653 parse_reserved rbits c;
655 (match c.client_sock with
656 NoConnection ->
657 if !verbose_msg_clients then begin
658 let (ip,port) = c.client_host in
659 lprintf_nl "No connection to client (%s:%d)!!!" (Ip.to_string ip) port;
660 end;
661 c.client_sock <- Connection sock
662 | ConnectionWaiting token ->
663 cancel_token token;
664 if !verbose_msg_clients then
665 lprintf_nl "Waiting for connection to client !!!";
666 c.client_sock <- Connection sock
667 | Connection s when s != sock ->
668 if !verbose_msg_clients then
669 lprintf_nl "CLIENT %d: IMMEDIATE RECONNECTION" (client_num c);
670 disconnect_client c (Closed_for_error "Reconnected");
671 c.client_sock <- Connection sock;
672 | Connection _ -> ()
675 set_client_state (c) (Connected (-1));
676 if not init_sent then
677 begin
678 c.client_incoming <- true;
679 send_init !!client_uid file_id sock;
680 end;
681 connection_ok c.client_connection_control;
682 if !verbose_msg_clients then
683 lprintf_nl "file and client found";
684 (* if not c.client_incoming then *)
685 send_bitfield c;
686 c.client_blocks_sent <- file.file_blocks_downloaded;
688 TODO !!! : send interested if and only if we are interested
689 -> we must recieve at least other peer bitfield.
690 in common swarmer -> compare : partition -> partition -> bool
693 set_rtimeout sock !!client_timeout;
694 (* Once parsed succesfully we define the function client_to_client
695 to be the function used when a message is read *)
696 gconn.gconn_handler <- Reader (fun gconn sock ->
697 bt_handler TcpMessages.parsing (client_to_client c) c sock
700 let b = TcpBufferedSocket.buf sock in
701 (* The receive buffer is normally not empty now, lets parse the rest, most likely PeerID *)
702 if b.len <> 0 then
703 ignore (bt_handler TcpMessages.parsing (client_to_client c) c sock);
705 (* Some newer clients send more opcodes in their handshake packet, lets parse them now.
706 Using "while b.len <> 0 do ... done" is not possible here because libtorrent clients
707 send unparsable five extra bytes after their PeerID which would result into a loop *)
708 if b.len <> 0 then
709 ignore (bt_handler TcpMessages.parsing (client_to_client c) c sock);
711 with
712 | Not_found ->
713 let (ip,port) = (TcpBufferedSocket.peer_addr sock) in
714 if !verbose_unexpected_messages then
715 lprintf_nl "Client %s:%d requested a file that is not shared [%s]"
716 (Ip.to_string ip) port (Sha1.to_hexa file_id)
717 | e ->
718 lprintf_nl "Exception %s in client_parse_header" (Printexc2.to_string e);
719 close sock (Closed_for_exception e);
720 raise e
(** Update the bitmap of a client. Unclear if it is still useful.
    Registers the client as an uploader of the file's swarmer if it is not
    registered yet, creates its chunk bitmap on first use, and merges the
    chunks queued in [client_new_chunks] into that bitmap before handing
    it to the swarmer.
    Fails an assertion when the file has no swarmer.
    @param c The client which we want to update.
*)
and update_client_bitmap c =
  let file = c.client_file in

  let swarmer =
    match file.file_swarmer with
    | Some swarmer -> swarmer
    | None -> assert false
  in

  let up =
    match c.client_uploader with
    | Some up -> up
    | None ->
        let up =
          CommonSwarming.register_uploader swarmer (as_client c)
            (AvailableIntervals []) in
        c.client_uploader <- Some up;
        up
  in

  let bitmap =
    match c.client_bitmap with
    | Some bitmap -> bitmap
    | None ->
        let len = CommonSwarming.partition_size swarmer in
        let bitmap = Bitv.create len false in
        c.client_bitmap <- Some bitmap;
        bitmap
  in

  match c.client_new_chunks with
  | [] -> ()
  | chunks ->
      (* Empty the queue first, then publish the merged availability. *)
      c.client_new_chunks <- [];
      List.iter (fun n -> Bitv.set bitmap n true) chunks;
      CommonSwarming.update_uploader_intervals up (AvailableBitv bitmap)
762 (** In this function we decide which piece we must request from client.
763 @param sock Socket of the client
764 @param c The client
766 and get_from_client sock (c: client) =
(* Send one more Request to peer [c] if allowed.
   A request is issued only when fewer than [max_range_requests] ranges
   are outstanding, the file is downloading and the peer has not choked us.
   The inner [iter] loop first makes sure a chunk is selected
   ([CommonSwarming.find_blocks], after refreshing the peer's bitmap), then
   takes a range in it: either the remainder kept in [client_range_waiting]
   or a new one from [CommonSwarming.find_range]. A range longer than
   [max_range_len] is cut, the rest being kept in [client_range_waiting].
   The range is appended to [client_ranges_sent] and requested as
   [Request (num, x, y)], where [x] is the offset inside the piece and [y]
   the length.
   When the current chunk has no range left ([Not_found]), the chunk is
   dropped and another one is tried. When no chunk can be found at all, the
   swarmer bitmap is recomputed and [check_finished] is run; the final
   handler logs an error if the file is not finished and verbose_download
   is set. *)
767 let file = c.client_file in
768 (* Check if there's not enough requests in the 'pipeline'
769 and if a request can be send (not choked and file is downloading) *)
770 if List.length c.client_ranges_sent < max_range_requests
771 && file_state file = FileDownloading
772 && (c.client_choked == false)
773 then
774 (* num is the number of the piece, x and y are the position
775 of the subpiece in the piece(!), r is a (CommonSwarmer) range *)
777 let up = match c.client_uploader with
778 None -> assert false
779 | Some up -> up in
780 let swarmer = CommonSwarming.uploader_swarmer up in
784 let num, x,y, r =
786 if !verbose_msg_clients then
787 lprintf_file_nl (as_file file) "CLIENT %d: Finding new range to send" (client_num c);
789 if !verbose_swarming then begin
790 lprintf_n "Current download:\n Current chunks: ";
793 List.iter (fun (x,y) -> lprintf "%Ld-%Ld " x y) c.client_chunks
794 with _ -> lprintf "No Chunks";
796 lprint_newline ();
798 lprintf_n "Current ranges: ";
800 List.iter (fun (p1,p2, r) ->
801 let (x,y) = CommonSwarming.range_range r in
802 lprintf "%Ld-%Ld[%Ld-%Ld] " p1 p2 x y
803 ) c.client_ranges_sent;
805 match c.client_range_waiting with
806 | None -> ()
807 | Some (x,y,r) -> lprintf "Waiting %Ld-%Ld" x y;
809 lprint_newline ();
811 lprintf_n "Current blocks: ";
813 match c.client_chunk with
814 | None -> lprintf "none"
815 | Some (chunk, blocks) -> List.iter (fun b ->
816 CommonSwarming.print_block b.up_block) blocks;
818 lprint_newline ();
820 lprintf_file_nl (as_file file) "Finding Range:";
821 end;
825 (*We must find a block to request first, and then
826 some range inside this block
829 let rec iter () =
831 match c.client_chunk with
833 | None ->
835 if !verbose_swarming then lprintf_file_nl (as_file file) "No block";
836 update_client_bitmap c;
837 (try CommonSwarming.verify_one_chunk swarmer with _ -> ());
838 (*Find a free block in the swarmer*)
839 let chunk, blocks = CommonSwarming.find_blocks up in
840 if !verbose_swarming then begin
841 lprintf_n "Blocks Found: "; List.iter (fun b ->
842 CommonSwarming.print_block b.up_block) blocks;
843 lprint_newline ()
844 end;
845 c.client_chunk <- Some (chunk, blocks);
847 (*We put the found block in client_block to
848 request range in this block. (Useful for
849 not searching each time a new block)
852 iter ()
854 | Some (chunk, blocks) ->
856 if !verbose_swarming then begin
857 lprintf_n "Current Blocks: "; List.iter (fun b ->
858 CommonSwarming.print_block b.up_block) blocks;
859 lprint_newline ()
860 end;
863 (*Given a block find a range inside*)
864 let (x,y,r) =
865 match c.client_range_waiting with
866 | Some (x,y,r) ->
867 c.client_range_waiting <- None;
868 (x,y,r)
869 | None ->
870 CommonSwarming.find_range up (min max_range_len file.file_piece_size)
873 let (x,y,r) =
875 if y -- x > max_range_len then begin
876 c.client_range_waiting <- Some (x ++ max_range_len, y, r);
877 (x, x ++ max_range_len, r)
878 end else
879 (x,y,r)
882 c.client_ranges_sent <- c.client_ranges_sent @ [x,y, r];
883 (* CommonSwarming.alloc_range r; *)
885 (* naughty, naughty, was computing a block number instead of a chunk
886 number. Only matters with merged downloads, and even then other
887 clients didn't seem to care (?), so the bug remained hidden *)
888 if !verbose_swarming then
889 lprintf_file_nl (as_file file) "Asking %d For Range %Ld-%Ld" chunk x y;
891 chunk, x -- file.file_piece_size ** Int64.of_int chunk, y -- x, r
893 with Not_found ->
895 (*If we don't find a range to request inside the block,
896 iter to choose another block*)
897 if !verbose_swarming then
898 lprintf_nl "Could not find range in current block";
899 (* c.client_blocks <- List2.removeq b c.client_blocks; *)
901 c.client_chunk <- None;
903 iter ()
906 iter ()
908 with Not_found ->
909 (*If we don't find a block to request we can check if the
910 file is finished (if there's missing pieces we can't decide
911 that the file is finished because we didn't found
912 a block to ask)
914 if !verbose_swarming then
915 lprintf_nl "Unable to get a block !!";
916 CommonSwarming.compute_bitmap swarmer;
917 check_finished swarmer file;
918 raise Not_found
921 send_client c (Request (num,x,y));
923 if !verbose_msg_clients then
924 lprintf_file_nl (as_file file) "CLIENT %d: Asking %s For Range %Ld-%Ld"
925 (client_num c) (Sha1.to_string c.client_uid) x y
927 with Not_found ->
928 if not (CommonSwarming.check_finished swarmer) && !verbose_download then
929 lprintf_file_nl (as_file file) "BTClient.get_from_client ERROR: can't find a block to download and file is not yet finished for file : %s..." file.file_name
(** Dispatch on a message received from a remote peer and react to it.
    Every exception raised while handling the message is caught at the
    bottom of this function and logged; it is not re-raised.
    @param c The client which sent us a message
    @param sock The socket used for this client
    @param msg The message sent by the client
*)
and client_to_client c sock msg =
  if !verbose_msg_clients then begin
    let (ip, port) = TcpBufferedSocket.peer_addr sock in
    let (timeout, next) = get_rtimeout sock in
    lprintf_nl "CLIENT %d(%s:%d): (%d, %d,%d) Received %s"
      (client_num c) (Ip.to_string ip) port
      (last_time ())
      (int_of_float timeout)
      (int_of_float next)
      (TcpMessages.to_string msg)
  end;

  let file = c.client_file in

  (* Sending the "Have" message was moved to bTGlobals, so nothing is
     announced from here any more. *)

  try
    match msg with
    | Piece (num, offset, s, pos, len) ->
        (* A Piece message carries the data itself. *)
        set_client_state c (Connected_downloading (file_num file));
        (* Flag the peer as a good client. *)
        c.client_good <- true;
        if file_state file = FileDownloading then begin
          let position = offset ++ file.file_piece_size *.. num in
          let up =
            match c.client_uploader with
            | Some up -> up
            | None -> assert false
          in
          let swarmer = CommonSwarming.uploader_swarmer up in

          if !verbose_msg_clients then begin
            match c.client_ranges_sent with
            | [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
            | (p1, p2, r) :: _ ->
                let (x, y) = CommonSwarming.range_range r in
                lprintf_file_nl (as_file file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])"
                  (brand_to_string c.client_brand) position len
                  p1 p2 x y
          end;

          (* Measure what the swarmer accepted by comparing its
             downloaded counter before and after handing over the data. *)
          let old_downloaded = CommonSwarming.downloaded swarmer in
          CommonSwarming.received up position s pos len;
          let new_downloaded = CommonSwarming.downloaded swarmer in

          (* Update rate and amount of data received from client. *)
          count_download c (new_downloaded -- old_downloaded);
          (* use len here with max_dr quickfix *)
          Rate.update c.client_downloaded_rate ~amount:len;
          (* Count bytes downloaded from network for this file. *)
          file.file_session_downloaded <-
            file.file_session_downloaded ++ (Int64.of_int len);

          if !verbose_msg_clients then begin
            match c.client_ranges_sent with
            | [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
            | (p1, p2, r) :: _ ->
                let (x, y) = CommonSwarming.range_range r in
                lprintf_file_nl (as_file file) "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
                  position len
                  p1 p2 x y
                  (new_downloaded -- old_downloaded)
          end
        end;
        (* The oldest outstanding range request has now been answered. *)
        (match c.client_ranges_sent with
         | [] -> ()
         | _ :: rest -> c.client_ranges_sent <- rest);
        get_from_client sock c;

        (* Check if the client is still interesting for us... *)
        check_if_interesting file c

    | PeerID p ->
        (* Disconnect if that is ourselves. *)
        c.client_uid <- Sha1.direct_of_string p;
        if c.client_uid = !!client_uid then
          disconnect_client c Closed_by_user
        else begin
          let brand, release = parse_software p in
          c.client_brand <- brand;
          c.client_release <- release;
          send_client c Choke;
          c.client_sent_choke <- true
        end

    | BitField p ->
        (* A bitfield is a summary of what a client has. *)
        begin
          match c.client_file.file_swarmer with
          | None -> ()
          | Some swarmer ->
              c.client_new_chunks <- [];

              let npieces = CommonSwarming.partition_size swarmer in
              let nbits = String.length p * 8 in

              if nbits < npieces then begin
                lprintf_file_nl (as_file file) "Error: expected bitfield of atleast %d but got %d" npieces nbits;
                disconnect_client c (Closed_for_error "Wrong bitfield length")
              end else begin
                let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in

                for i = 0 to npieces - 1 do
                  if is_bit_set p i then begin
                    c.client_new_chunks <- i :: c.client_new_chunks;
                    (* The peer is interesting as soon as it owns one
                       piece we have not completed yet. *)
                    match VB.get bitmap i with
                    | VB.State_missing | VB.State_partial ->
                        c.client_interesting <- true
                    | VB.State_complete | VB.State_verified -> ()
                  end
                done;

                update_client_bitmap c;
                c.client_registered_bitfield <- true;

                if c.client_interesting then
                  send_interested c;

                if !verbose_msg_clients then
                  lprintf_file_nl (as_file file) "New BitField Registered"
              end
        end
        (* Note: a bitfield must only be sent after the handshake and
           before everything else: NOT here *)

    | Have n ->
        (* A client can send a "Have" without sending a Bitfield. *)
        begin
          match c.client_file.file_swarmer with
          | None -> ()
          | Some swarmer ->
              let n = Int64.to_int n in
              let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
              (* If the peer has a chunk we don't, tell him we're
                 interested and update his bitmap. *)
              match VB.get bitmap n with
              | VB.State_missing | VB.State_partial ->
                  c.client_interesting <- true;
                  send_interested c;
                  c.client_new_chunks <- n :: c.client_new_chunks;
                  update_client_bitmap c
              | VB.State_complete | VB.State_verified -> ()
        end

    | Interested ->
        c.client_interested <- true

    | Choke ->
        set_client_state c (Connected (-1));
        (* Remote peer will clear the list of ranges we sent. *)
        (match c.client_uploader with
         | None ->
             (* Afaik this is no protocol violation and happens if the
                client didn't send a client bitmap after the handshake. *)
             let (ip, port) = c.client_host in
             if !verbose_msg_clients then
               lprintf_file_nl (as_file file) "%s:%d with software %s : Choke send, but no client bitmap"
                 (Ip.to_string ip) port (brand_to_string c.client_brand)
         | Some up ->
             CommonSwarming.clear_uploader_intervals up);
        c.client_ranges_sent <- [];
        c.client_range_waiting <- None;
        c.client_choked <- true

    | NotInterested ->
        c.client_interested <- false

    | Unchoke ->
        c.client_choked <- false;
        (* Remote peer cleared our requests: re-request until the
           pipeline holds max_range_requests again. *)
        for _i = 1 to max_range_requests -
                      List.length c.client_ranges_sent do
          (try get_from_client sock c with _ -> ())
        done

    | Request (n, pos, len) ->
        if len > max_request_len then begin
          close sock (Closed_for_error "Request longer than 1<<16");
          (* Exit is caught by the handler at the bottom and logged. *)
          raise Exit
        end;

        if !CommonGlobals.has_upload = 0 then begin
          if client_has_a_slot (as_client c) then begin
            (* First pending request: announce that this client is
               ready to be served. *)
            (match c.client_upload_requests with
             | [] -> CommonUploads.ready_for_upload (as_client c)
             | _ -> ());
            c.client_upload_requests <- c.client_upload_requests @ [n, pos, len];
            match c.client_file.file_shared with
            | None -> ()
            | Some s ->
                s.impl_shared_requests <- s.impl_shared_requests + 1;
                shared_must_update (as_shared s)
          end else begin
            (* No upload slot: choke the peer and drop its requests. *)
            send_client c Choke;
            c.client_sent_choke <- true;
            c.client_upload_requests <- []
          end
        end

    | Ping -> ()
        (* We don't 'generate' a Ping message on a Ping. *)

    | Cancel (n, pos, len) ->
        (* If we receive a cancel message from a peer, remove request. *)
        if client_has_a_slot (as_client c) then
          c.client_upload_requests <-
            List2.remove_first (n, pos, len) c.client_upload_requests
        else if !verbose_msg_clients then
          lprintf_file_nl (as_file file) "Error: received cancel request but client has no slot"

  with e ->
    lprintf_file_nl (as_file file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e) (TcpMessages.to_string msg)
(** The function used to connect to a client.
    The connection is not immediately initiated. It will
    be put in a fifo and dequeued according to
    !!max_connections_per_second. (@see commonGlobals.ml)
    Nothing happens when no connection may be opened, when the peer
    address is banned, or when the client is not in the NoConnection
    state.
    @param c The client we must connect
*)
let connect_client c =
  (* True when the ban filter lets this peer through; logs the reason
     otherwise (only with verbose_connect). *)
  let not_banned () =
    let (ip, port) = c.client_host in
    match !Ip.banned (ip, c.client_country_code) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_nl "%s:%d (%s), blocked: %s"
            (Ip.to_string ip) port
            (fst (Geoip.get_country_code_name c.client_country_code))
            reason;
        false
  in
  (* Socket event handler: close on timeouts, and on close notify the
     client only if this socket is still the one recorded for it. *)
  let on_event sock event =
    match event with
    | BASIC_EVENT LTIMEOUT ->
        if !verbose_msg_clients then
          lprintf_nl "CLIENT %d: LIFETIME" (client_num c);
        close sock Closed_for_timeout
    | BASIC_EVENT RTIMEOUT ->
        if !verbose_msg_clients then
          lprintf_nl "CLIENT %d: RTIMEOUT (%d)" (client_num c)
            (last_time ());
        close sock Closed_for_timeout
    | BASIC_EVENT (CLOSED r) ->
        (match c.client_sock with
         | Connection s when s == sock -> disconnect_client c r
         | _ -> ())
    | _ -> ()
  in
  if can_open_connection connection_manager && not_banned () then
    match c.client_sock with
    | NoConnection ->
        let token =
          add_pending_connection connection_manager (fun token ->
            try
              if !verbose_msg_clients then
                lprintf_nl "CLIENT %d: connect_client" (client_num c);
              let (ip, port) = c.client_host in
              if !verbose_msg_clients then
                lprintf_nl "connecting %s:%d" (Ip.to_string ip) port;
              connection_try c.client_connection_control;
              let sock =
                connect token "bittorrent download"
                  (Ip.to_inet_addr ip) port on_event
              in
              c.client_sock <- Connection sock;
              set_lifetime sock 600.;
              TcpBufferedSocket.set_read_controler sock download_control;
              TcpBufferedSocket.set_write_controler sock upload_control;
              TcpBufferedSocket.set_rtimeout sock 30.;
              let file = c.client_file in

              if !verbose_msg_clients then
                lprintf_file_nl (as_file file) "READY TO DOWNLOAD FILE";

              send_init !!client_uid file.file_id sock;
              (* Fabrice: Initialize the client bitmap and uploader
                 fields to <> None *)
              update_client_bitmap c;
              incr counter;
              (* We 'hook' the client_parse_header function to the socket.
                 This function will then be called when the first message
                 will be parsed. *)
              set_bt_sock sock !verbose_msg_clients
                (BTHeader (client_parse_header !counter
                             (ref ((Some c), c.client_country_code)) true))
            with e ->
              lprintf_nl "Exception %s while connecting to client"
                (Printexc2.to_string e);
              disconnect_client c (Closed_for_exception e))
        in
        (* Since this is a pending connection put ConnectionWaiting
           in client_sock. *)
        c.client_sock <- ConnectionWaiting token
    | _ -> ()
(** The Listen function (very much like in C : TCP Socket Server).
    Monitors client connection to us: creates the server socket on
    !!client_bind_addr / !!client_port and stores it in listen_sock.
    An exception raised while setting up the server is caught and only
    logged when verbose_connect is set.
*)
let listen () =
  (* Handler for the per-peer buffered socket: close it on read or
     lifetime timeout. *)
  let on_peer_event sock event =
    match event with
    | BASIC_EVENT (RTIMEOUT|LTIMEOUT) -> close sock Closed_for_timeout
    | _ -> ()
  in
  (* Handler for the server socket. *)
  let on_server_event _server event =
    match event with
    | TcpServerSocket.CONNECTION (s, Unix.ADDR_INET (from_ip, from_port)) ->
        (* Receiving an event TcpServerSocket.CONNECTION from
           the TcpServerSocket means that a new client tries
           to connect to us. *)
        let ip = Ip.of_inet_addr from_ip in
        let cc = Geoip.get_country_code_option ip in
        if !verbose_sources > 1 then
          lprintf_nl "CONNECTION RECEIVED FROM %s"
            (Ip.to_string (Ip.of_inet_addr from_ip));
        (* Reject this connection if we don't want
           to bypass the max_connection parameter. *)
        let accepted =
          can_open_connection connection_manager &&
          (match !Ip.banned (ip, cc) with
           | None -> true
           | Some reason ->
               if !verbose_connect then
                 lprintf_nl "%s:%d (%s) blocked: %s"
                   (Ip.to_string ip) from_port
                   (fst (Geoip.get_country_code_name cc))
                   reason;
               false)
        in
        if accepted then begin
          let token = create_token connection_manager in
          let sock =
            TcpBufferedSocket.create token
              "bittorrent client connection" s on_peer_event
          in
          TcpBufferedSocket.set_read_controler sock download_control;
          TcpBufferedSocket.set_write_controler sock upload_control;

          (* No client is known yet for this socket; the same reference
             is handed to client_parse_header below. *)
          let c = ref (None, cc) in
          TcpBufferedSocket.set_closer sock (fun _ r ->
            match fst !c with
            | None -> ()
            | Some c ->
                (match c.client_sock with
                 | Connection s when s == sock -> disconnect_client c r
                 | _ -> ()));
          set_rtimeout sock 30.;
          incr counter;
          (* Again : 'hook' client_parse_header to the socket. *)
          set_bt_sock sock !verbose_msg_clients
            (BTHeader (client_parse_header !counter c false))
        end else
          (* Don't forget to close the incoming sock if we can't
             open a new connection. *)
          Unix.close s
    | _ -> ()
  in
  try
    let s =
      TcpServerSocket.create "bittorrent client server"
        (Ip.to_inet_addr !!client_bind_addr)
        !!client_port
        on_server_event
    in
    listen_sock := Some s
  with e ->
    if !verbose_connect then
      lprintf_nl "Exception %s while init bittorrent server"
        (Printexc2.to_string e)
(** Send a keepalive (Ping) message to every connected client of every
    current file, and reset the lifetime of its socket to 130 seconds.
*)
let send_pings () =
  let ping_client _ c =
    match c.client_sock with
    | Connection sock ->
        send_client c Ping;
        set_lifetime sock 130.
    | _ -> ()
  in
  List.iter
    (fun file -> Hashtbl.iter ping_client file.file_clients)
    !current_files
1412 open Bencode
(** Check each client of a given file and try to connect the ones
    that are not connected.

    A connection attempt is only made when the client's
    connection_control allows it (currently the delay between two tries
    is 120 seconds); otherwise the control state is printed.

    An exception raised for one client never escapes: it is logged when
    verbose_connect is set and iteration goes on with the next client.
    (An inner catch-all used to swallow these exceptions silently, which
    left the logging handler unreachable.)
    @param file the file whose clients are examined
*)
let resume_clients file =
  Hashtbl.iter (fun _ c ->
    try
      match c.client_sock with
      | Connection _ -> ()
      | _ ->
          (* Test if we can connect the client according to its
             connection_control. *)
          if connection_can_try c.client_connection_control then
            connect_client c
          else
            print_control c.client_connection_control
    with e ->
      if !verbose_connect then
        lprintf_file_nl (as_file file) "Exception %s in resume_clients" (Printexc2.to_string e)
  ) file.file_clients
(* Install resume_clients as the function stored in resume_clients_hook. *)
let () =
  resume_clients_hook := resume_clients
(** Check if the value replied by the tracker is correct.
    The value is converted to a native int; a negative value is
    reported in the log and replaced by 0.
    @param key the name of the key
    @param n the value to check
    @param url Url of the tracker
    @param name the name of the file
    @return the value as an int, or 0 when it is negative
*)
let chk_keyval key n url name =
  let value = Int64.to_int n in
  if !verbose_msg_clients then
    lprintf_nl "Reply from %s in file: %s has %s: %d" (show_tracker_url url) name key value;
  if value < 0 then begin
    lprintf_nl "Reply from %s in file: %s has an invalid %s value: %d" (show_tracker_url url) name key value;
    0
  end else
    value
(** Check that client is valid and record it.

    A peer is recorded with new_client only when its id is not our own
    client_uid, its address is not Ip.null, its port is not 0 and the
    address is not banned. The address is compared with structural
    inequality: physical inequality ([!=]) would let a freshly built
    null address through, since it is a different value in memory.

    Independently of the outcome, the received address is logged when
    verbose_sources > 1.
    @param file the file the peer was announced for
    @param id the peer id
    @param ip the peer address
    @param port the peer port
*)
let maybe_new_client file id ip port =
  let cc = Geoip.get_country_code_option ip in
  (* Evaluated last, so the "blocked" log only shows for otherwise
     valid peers, as before. *)
  let not_banned () =
    match !Ip.banned (ip, cc) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_file_nl (as_file file) "%s:%d blocked: %s" (Ip.to_string ip) port reason;
        false
  in
  if id <> !!client_uid
     && ip <> Ip.null
     && port <> 0
     && not_banned ()
  then
    ignore (new_client file id (ip,port) cc);
  if !verbose_sources > 1 then
    lprintf_file_nl (as_file file) "Received %s:%d" (Ip.to_string ip) port
(** [exn_catch f x] applies [f] to [x] and returns [`Ok result], or
    [`Exn e] when the application raises exception [e]. *)
let exn_catch f x =
  try
    let result = f x in
    `Ok result
  with exn -> `Exn exn
(** In this function we interact with the tracker.

    The request is issued through connect_trackers; the reply is parsed
    by the local callback [f]. A reply that cannot be read, is empty,
    cannot be bdecoded or is not a dictionary marks the tracker as
    failed (Disabled_failure with an attempt counter). A dictionary
    without "failure reason" re-enables the tracker.

    Peers are only recorded when [need_sources] is true. A malformed
    peer list does not abort the parsing: non-dictionary entries are
    logged and skipped, and trailing bytes of a compact peer string that
    do not form a complete 6-byte entry are ignored.
    @param file The file for which we want some sources
    @param need_sources whether we need any sources
*)
let talk_to_tracker file need_sources =
  (* This is the function which will be called by the http client for
     parsing the response. *)
  let f t filename =
    let tracker_url = show_tracker_url t.tracker_url in
    let tracker_failed reason =
      (* On failure, disable the tracker and count attempts
         (@see is_tracker_enabled) *)
      let num =
        match t.tracker_status with
        | Disabled_failure (i, _) -> i + 1
        | _ -> 1
      in
      t.tracker_status <- Disabled_failure (num, intern reason);
      lprintf_file_nl (as_file file) "Failure no. %d%s from Tracker %s for file: %s Reason: %s"
        num
        (if !!tracker_retries = 0 then "" else Printf.sprintf "/%d" !!tracker_retries)
        tracker_url file.file_name (Charset.Locale.to_utf8 reason)
    in
    match exn_catch File.to_string filename with
    | `Exn _ | `Ok "" -> tracker_failed "empty reply"
    | `Ok s ->
      begin match exn_catch Bencode.decode s with
      | `Exn exn ->
          tracker_failed (Printf.sprintf "wrong reply (%s)" (Printexc2.to_string exn))
      | `Ok (Dictionary list) ->
          (* Defaults, overridden by the keys of the reply below. *)
          t.tracker_interval <- 600;
          t.tracker_min_interval <- 600;
          t.tracker_last_clients_num <- 0;
          let chk_keyval key n = chk_keyval key n t.tracker_url file.file_name in
          if not (List.mem_assoc "failure reason" list) then begin
            begin match t.tracker_status with
            | Disabled_failure (i, _) ->
                (* Log through show_tracker_url like every other
                   message of this function. *)
                lprintf_file_nl (as_file file) "Received good message from Tracker %s after %d bad attempts"
                  tracker_url i
            | _ -> ()
            end;
            (* Received good message from tracker after failures,
               re-enable tracker. *)
            t.tracker_status <- Enabled
          end;
          List.iter (fun (key, value) ->
            match (key, value) with
            | "failure reason", String failure -> tracker_failed failure
            | "warning message", String warning ->
                lprintf_file_nl (as_file file) "Warning from Tracker %s in file: %s Reason: %s"
                  tracker_url file.file_name warning
            | "interval", Int n ->
                t.tracker_interval <- chk_keyval key n;
                (* in case we don't receive "min interval" *)
                if t.tracker_min_interval > t.tracker_interval then
                  t.tracker_min_interval <- t.tracker_interval
            | "min interval", Int n ->
                t.tracker_min_interval <- chk_keyval key n;
                (* make sure "min interval" is always < or equal to "interval" *)
                if t.tracker_min_interval > t.tracker_interval then
                  t.tracker_min_interval <- t.tracker_interval
            | "downloaded", Int n ->
                t.tracker_torrent_downloaded <- chk_keyval key n
            | "complete", Int n
            | "done peers", Int n ->
                t.tracker_torrent_complete <- chk_keyval key n
            | "incomplete", Int n ->
                t.tracker_torrent_incomplete <- chk_keyval key n;
                (* if complete > 0 and we receive incomplete we probably
                   won't receive num_peers so we simulate it below *)
                if t.tracker_torrent_complete > 0 then
                  t.tracker_torrent_total_clients_count <-
                    (t.tracker_torrent_complete + t.tracker_torrent_incomplete)
            | "num peers", Int n ->
                t.tracker_torrent_total_clients_count <- chk_keyval key n;
                (* if complete > 0 and we receive num_peers we probably
                   won't receive incomplete so we simulate it below *)
                if t.tracker_torrent_complete > 0 then
                  t.tracker_torrent_incomplete <-
                    (t.tracker_torrent_total_clients_count - t.tracker_torrent_complete)
            | "last", Int n ->
                t.tracker_torrent_last_dl_req <- chk_keyval key n
            | "key", String n ->
                t.tracker_key <- n;
                if !verbose_msg_clients then
                  lprintf_file_nl (as_file file) "%s in file: %s has key: %s" tracker_url file.file_name n
            | "tracker id", String n ->
                t.tracker_id <- n;
                if !verbose_msg_clients then
                  lprintf_file_nl (as_file file) "%s in file: %s has tracker id %s" tracker_url file.file_name n

            | "peers", List list ->
                (* Peer list given as a list of dictionaries. *)
                if need_sources then
                  List.iter (fun v ->
                    match v with
                    | Dictionary list ->
                        let peer_id = ref Sha1.null in
                        let peer_ip = ref Ip.null in
                        let port = ref 0 in

                        List.iter (fun v ->
                          match v with
                          | "peer id", String id ->
                              peer_id := Sha1.direct_of_string id
                          | "ip", String ip ->
                              peer_ip := Ip.of_string ip
                          | "port", Int p ->
                              port := Int64.to_int p
                          | _ -> ()
                        ) list;

                        t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
                        maybe_new_client file !peer_id !peer_ip !port
                    | _ ->
                        (* The reply comes from the network: skip a
                           malformed entry instead of asserting. *)
                        lprintf_file_nl (as_file file) "ignoring malformed peer entry in answer from tracker %s"
                          tracker_url
                  ) list
            | "peers", String p ->
                (* Peer list given as a string of 6-byte entries:
                   4 address bytes followed by a 2-byte port. *)
                let rec iter_comp s pos l =
                  (* Only read an entry when all of its 6 bytes are
                     present; a truncated tail is ignored. *)
                  if pos + 6 <= l then begin
                    let ip = Ip.of_ints (get_uint8 s pos, get_uint8 s (pos+1),
                                         get_uint8 s (pos+2), get_uint8 s (pos+3))
                    and port = get_int16 s (pos+4)
                    in
                    t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
                    maybe_new_client file Sha1.null ip port;

                    iter_comp s (pos+6) l
                  end
                in
                if need_sources then
                  iter_comp p 0 (String.length p)
            | "private", Int _ -> ()
              (* TODO: if set to 1, disable peer exchange *)

            | key, _ ->
                lprintf_file_nl (as_file file) "received unknown entry in answer from tracker: %s : %s" key (Bencode.print value)
          ) list;
          (* Now, that we have added new clients to a file, it's time
             to connect to them. *)
          if !verbose_sources > 0 then
            lprintf_file_nl (as_file file) "talk_to_tracker: got %i source(s) for file %s"
              t.tracker_last_clients_num file.file_name;
          if need_sources then resume_clients file

      | _ -> tracker_failed "wrong reply (value)"
      end
  in
  (* The first announce of a file carries the "started" event. *)
  let event =
    if file.file_tracker_connected then ""
    else "started"
  in
  connect_trackers file event need_sources f
(** Check to see if each current file is finished; if not,
    try to get sources for it.

    Files without a swarmer are skipped. Downloading files contact the
    tracker asking for sources, shared files contact it without asking
    for sources, paused and queued files are left alone. Exceptions
    raised by check_finished or talk_to_tracker are ignored.
*)
let recover_files () =
  if !verbose_share then
    lprintf_nl "recover_files";
  let recover file swarmer =
    (try check_finished swarmer file with _ -> ());
    match file_state file with
    | FileDownloading ->
        if !verbose_share then
          lprintf_file_nl (as_file file) "recover downloading";
        (try talk_to_tracker file true with _ -> ())
    | FileShared ->
        if !verbose_share then
          lprintf_file_nl (as_file file) "recover shared";
        (try talk_to_tracker file false with _ -> ())
    | FilePaused | FileQueued ->
        (* When we are paused or queued we do nothing, not even logging. *)
        ()
    | s ->
        if !verbose then
          lprintf_file_nl (as_file file) "recover: Other state %s!!" (string_of_state s)
  in
  List.iter (fun file ->
    match file.file_swarmer with
    | None -> ()
    | Some swarmer -> recover file swarmer
  ) !current_files
(* Scratch buffer of 100000 bytes shared by all uploads: iter_upload
   reads the requested file data into it and sends it from there. *)
let upload_buffer = String.create 100000
(**
  Send a Piece message
  for one of the requests of the client, then go on with the next
  queued request.

  Requests of length zero are dropped. When the client's write
  allowance is negative nothing is sent and the client is put back
  in the ready-for-upload state. An exception raised while serving a
  request is caught and logged when verbose is set.
  @param sock The socket of the client
  @param c The client
*)
let rec iter_upload sock c =
  match c.client_upload_requests with
  | [] -> ()
  | (num, pos, len) :: pending ->
      if len = zero then begin
        c.client_upload_requests <- pending;
        iter_upload sock c
      end else if c.client_allowed_to_write >= 0L then begin
        try
          c.client_upload_requests <- pending;

          let file = c.client_file in
          let offset = pos ++ file.file_piece_size *.. num in
          c.client_allowed_to_write <- c.client_allowed_to_write -- len;
          count_upload c len;
          let len = Int64.to_int len in
          Unix32.read (file_fd file) offset upload_buffer 0 len;
          (* update upload rate from len bytes *)
          Rate.update c.client_upload_rate ~amount:len;
          Rate.update c.client_downloaded_rate;
          let sent = Int64.of_int len in
          file.file_uploaded <- file.file_uploaded ++ sent;
          file.file_session_uploaded <- file.file_session_uploaded ++ sent;
          (* update stats *)
          count_filerequest c;
          (match file.file_shared with
           | None -> ()
           | Some s ->
               s.impl_shared_uploaded <- file.file_uploaded;
               shared_must_update (as_shared s));
          send_client c (Piece (num, pos, upload_buffer, 0, len));
          (* The recursive call stays inside the try on purpose, so an
             exception escaping from a later request is handled here. *)
          iter_upload sock c
        with e ->
          if !verbose then
            lprintf_nl "Exception %s in iter_upload" (Printexc2.to_string e)
      end else
        (* The client is waiting for another piece. *)
        ready_for_upload (as_client c)
(**
  In this function we check if we can send bytes (according
  to bandwidth control); if we can, the allowance of the client is
  increased. iter_upload is then called to send a Piece message,
  provided the client is connected and has pending requests.
  @param c the client to which we can send some bytes
  @param allowed the amount of bytes we can send to client
*)
let client_can_upload c allowed =
  do_if_connected c.client_sock (fun sock ->
    match c.client_upload_requests with
    | [] -> ()
    | _ :: _ ->
        let budget = c.client_allowed_to_write ++ (Int64.of_int allowed) in
        (* Only consume bandwidth when the socket can take the whole
           new allowance. *)
        if allowed > 0 && can_write_len sock (Int64.to_int budget) then begin
          CommonUploads.consume_bandwidth allowed;
          c.client_allowed_to_write <- budget
        end;
        iter_upload sock c)
(** Resume a file: re-enable its trackers that are in the
    Disabled_failure or Disabled state (Enabled and Disabled_mld ones
    are left as they are), then contact the tracker asking for sources.
    Exceptions raised by talk_to_tracker are ignored.
    @param file the file to resume
*)
let file_resume file =
  let reenable t =
    match t.tracker_status with
    | Disabled_failure _ | Disabled _ -> t.tracker_status <- Enabled
    | Enabled | Disabled_mld _ -> ()
  in
  List.iter reenable file.file_trackers;
  try talk_to_tracker file true with _ -> ()
(**
  Send info to tracker when stopping a file. Nothing is sent when the
  file is not marked as connected to its tracker; the mark is cleared
  once the tracker has answered.
  @param file the file we want to stop
*)
let file_stop file =
  if file.file_tracker_connected then
    let on_reply _ _ =
      lprintf_file_nl (as_file file) "Tracker return: stopped %s" file.file_name;
      file.file_tracker_connected <- false
    in
    connect_trackers file "stopped" false on_reply
1750 Create the 'hooks'
1752 let _ =
1753 client_ops.op_client_can_upload <- client_can_upload;
1754 file_ops.op_file_resume <- file_resume;
1755 file_ops.op_file_recover <- file_resume;
1756 file_ops.op_file_pause <- (fun file ->
1757 Hashtbl.iter (fun _ c ->
1758 match c.client_sock with
1759 Connection sock -> close sock Closed_by_user
1760 | _ -> ()
1761 ) file.file_clients;
1762 (*When a file is paused we consider it is stopped*)
1763 file_stop file
1765 file_ops.op_file_queue <- file_ops.op_file_pause;
1766 client_ops.op_client_enter_upload_queue <- (fun c ->
1767 if !verbose_msg_clients then
1768 lprintf_nl "Client %d: client_enter_upload_queue" (client_num c);
1769 ready_for_upload (as_client c));
1770 network.op_network_connected_servers <- (fun _ -> []);