patch #7442
[mldonkey.git] / src / networks / bittorrent / bTClients.ml
blob929a95cc01a7a56727223199a8c885e0a44b4a09
1 (* Copyright 2001, 2002 b52_simon :), b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 (** Functions used in client<->client communication
22 and also client<->tracker
25 (** A peer (or client) is always a remote peer in this file.
26 A Piece is a portion of the file associated with a hash (sha1).
27 In mldonkey a piece is referred to as a block inside the swarming system.
28 A SubPiece is a portion of a piece (without hash) which can be
29 sent/downloaded to/from a peer.
30 In mldonkey a SubPiece is referred to as a range inside the swarming system.
31 @see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for some
32 unofficial (but more detailed) specs.
35 open Int64ops
36 open AnyEndian
37 open BigEndian
38 open Printf2
39 open Md4
40 open Options
41 open BasicSocket
42 open TcpBufferedSocket
43 open Ip_set
45 open CommonShared
46 open CommonUploads
47 open CommonOptions
48 open CommonDownloads
49 open CommonInteractive
50 open CommonClient
51 open CommonComplexOptions
52 open CommonTypes
53 open CommonFile
54 open CommonSwarming
55 open CommonGlobals
56 open CommonDownloads
58 open BTRate
59 open BTTypes
60 open BTProtocol
61 open BTOptions
62 open BTGlobals
63 open BTComplexOptions
64 open BTChooser
65 open BTStats
66 open TcpMessages
68 module VB = VerificationBitmap
(* Two literal HTTP status lines.  Nothing in the visible part of this file
   reads them; presumably they are compared against replies elsewhere
   (to be confirmed). *)
let http_ok : string = "HTTP 200 OK"
let http11_ok : string = "HTTP/1.1 200 OK"
(* Upload-slot bookkeeping shared with recompute_uploaders:
   [next_uploaders] receives the freshly chosen set of uploaders and
   [current_uploaders] holds the set that is currently in effect. *)
let next_uploaders : BTTypes.client list ref = ref []
let current_uploaders : BTTypes.client list ref = ref []
(** Check that a peer announced for [file] is acceptable and, if so, record it.

    The peer is registered through [new_client] only when all of the
    following hold:
    - [id] differs from our own [!!client_uid];
    - [ip] is not [Ip.null];
    - [port] is not 0;
    - [!Ip.banned (ip, cc)] gives no ban reason (a ban is logged when
      [!verbose_connect] is set).

    The "Received" trace is printed on every call when
    [!verbose_sources > 1], whether or not the peer was accepted.

    @param file the file the peer was announced for
    @param id   peer id of the remote peer
    @param ip   address of the remote peer
    @param port TCP port of the remote peer *)
let maybe_new_client file id ip port =
  let cc = Geoip.get_country_code_option ip in
  (* Evaluated last, like the original chained condition, so that the
     "blocked" trace is only printed for otherwise valid peers. *)
  let not_banned () =
    match !Ip.banned (ip, cc) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_file_nl (as_file file) "%s:%d blocked: %s" (Ip.to_string ip) port reason;
        false
  in
  (* Structural comparison against Ip.null: the former physical test
     (!=) only compares addresses of values, so a null IP built
     independently of the Ip.null constant was never rejected. *)
  if id <> !!client_uid && ip <> Ip.null && port <> 0 && not_banned () then
    ignore (new_client file id (ip, port) cc);
  if !verbose_sources > 1 then
    lprintf_file_nl (as_file file) "Received %s:%d" (Ip.to_string ip) port
(* Forward reference called as [!resume_clients_hook file] once new peers
   have been recorded.  The placeholder stored here fails on [assert false];
   the real implementation is expected to be installed elsewhere (the
   assignment is not visible in this part of the file). *)
let resume_clients_hook = ref (function _ -> assert false)
97 include struct
99 (* open modules locally *)
100 open BTUdpTracker
101 open UdpSocket
(* Printable name of a UDP socket event, used by the (commented out) debug
   trace in the socket callback.  A CLOSED event also carries the textual
   close reason. *)
let string_of_event event =
  match event with
  | READ_DONE -> "READ_DONE"
  | WRITE_DONE -> "WRITE_DONE"
  | CAN_REFILL -> "CAN_REFILL"
  | BASIC_EVENT (CLOSED reason) -> "CLOSED " ^ string_of_reason reason
  | BASIC_EVENT RTIMEOUT -> "RTIMEOUT"
  | BASIC_EVENT WTIMEOUT -> "WTIMEOUT"
  | BASIC_EVENT LTIMEOUT -> "LTIMEOUT"
  | BASIC_EVENT CAN_READ -> "CAN_READ"
  | BASIC_EVENT CAN_WRITE -> "CAN_WRITE"
115 (** talk to udp tracker and parse response
116 except of parsing should perform everything that
117 talk_to_tracker's inner function does FIXME refactor both
119 Better create single global udp socket and use it for all
120 tracker requests and distinguish trackers by txn? FIXME?
(* talk_to_udp_tracker host port args file t need_sources

   Announce [file] to the UDP tracker [t] reachable at [host]:[port].

   [args] is the association list of announce parameters.  The code reads
   the keys "info_hash", "peer_id", "downloaded", "left", "uploaded" and
   "port" unconditionally, and "event" and "numwant" when present.

   Sequence of operations:
   - [host] is resolved with Ip.async_ip; a resolution failure is only
     logged (when verbose_msg_servers is set);
   - [interact] opens a fresh UDP socket, sends a connect request and
     installs a reader for the reply;
   - the first reply yields a connection id, which is used to send the
     announce request;
   - the second reply updates the tracker record [t] and the file, registers
     the returned peers with maybe_new_client and closes the socket.

   When [need_sources] is false, numwant 0 is requested and the returned
   peer list is ignored.  Exceptions are logged rather than propagated. *)
122 let talk_to_udp_tracker host port args file t need_sources =
(* Whole exchange against one resolved address [ip]. *)
123 let interact ip =
(* Socket event callback: any of the three timeouts closes the socket with
   the error "udpt timeout"; the other events need no action here. *)
124 let socket = create (Ip.to_inet_addr !!client_bind_addr) 0 (fun sock event ->
125 (* lprintf_nl "udpt got event %s for %s" (string_of_event event) host; *)
126 match event with
127 | WRITE_DONE | CAN_REFILL -> ()
128 | READ_DONE -> assert false (* set_reader prevents this *)
129 | BASIC_EVENT x -> match x with
130 | CLOSED _ -> ()
131 | CAN_READ | CAN_WRITE -> assert false (* udpSocket implementation prevents this *)
132 | LTIMEOUT | WTIMEOUT | RTIMEOUT -> close sock (Closed_for_error "udpt timeout"))
(* Local wrapper that shadows set_reader: it installs [f] as the reader of
   [socket] and turns any exception raised by [f] into a log line followed
   by closing the socket with that exception as the reason. *)
134 let set_reader f =
135 set_reader socket begin fun _ ->
136 try f () with exn ->
137 lprintf_nl "udpt interact exn %s" (Printexc2.to_string exn);
138 close socket (Closed_for_exception exn)
(* Both the write and the read timeout are set to 60 (seconds, presumably). *)
141 BasicSocket.set_wtimeout (sock socket) 60.;
142 BasicSocket.set_rtimeout (sock socket) 60.;
(* Step 1: connect request, identified by a random transaction id. *)
143 let txn = Random.int32 Int32.max_int in
144 (* lprintf_nl "udpt txn %ld for %s" txn host; *)
145 write socket false (connect_request txn) ip port;
(* Step 2: the connect reply is checked against [txn] and gives the
   connection id [conn]; a new transaction id is drawn for the announce. *)
146 set_reader begin fun () ->
147 let p = read socket in
148 let conn = connect_response p.udp_content txn in
149 (* lprintf_nl "udpt connection_id %Ld for %s" conn host; *)
150 let txn = Random.int32 Int32.max_int in
151 (* lprintf_nl "udpt txn' %ld for host %s" txn host; *)
(* [int key] reads a numeric parameter from [args]; a missing key raises
   Not_found, which the set_reader wrapper logs before closing the socket. *)
152 let int s = Int64.of_string (List.assoc s args) in
(* Announce request.  The event is encoded as 1 = completed, 2 = started,
   3 = stopped and 0 for no event; an unknown event name is logged and sent
   as 0.  The ip field is only filled in when force_client_ip is set.
   numwant is 0 when no sources are wanted, otherwise the "numwant"
   parameter, falling back to -1 when it is absent or not a number. *)
153 let req = announce_request conn txn
154 ~info_hash:(List.assoc "info_hash" args)
155 ~peer_id:(List.assoc "peer_id" args)
156 (int "downloaded",int "left",int "uploaded")
157 (match try List.assoc "event" args with Not_found -> "" with
158 | "completed" -> 1l
159 | "started" -> 2l
160 | "stopped" -> 3l
161 | "" -> 0l
162 | s -> lprintf_nl "udpt event %s? for %s" s host; 0l)
163 ~ip:(if !!force_client_ip then (Int64.to_int32 (Ip.to_int64 !!set_client_ip)) else 0l)
164 ~numwant:(if need_sources then try Int32.of_string (List.assoc "numwant" args) with _ -> -1l else 0l)
165 (int_of_string (List.assoc "port" args))
167 write socket false req ip port;
(* Step 3: announce reply. *)
168 set_reader (fun () ->
169 let p = read socket in
(* The tracker is marked as contacted and both intervals are reset to 600
   BEFORE the reply is parsed, so these assignments also take effect when
   announce_response raises on a malformed reply. *)
171 t.tracker_last_conn <- last_time ();
172 file.file_tracker_connected <- true;
173 t.tracker_interval <- 600;
174 t.tracker_min_interval <- 600;
175 if need_sources then t.tracker_last_clients_num <- 0;
177 let (interval,clients) = announce_response p.udp_content txn in
178 if !verbose_msg_servers then
179 lprintf_nl "udpt got interval %ld clients %d for host %s" interval (List.length clients) host;
(* A positive interval from the tracker replaces the default, and the
   minimum interval is lowered to it when necessary. *)
180 if interval > 0l then
181 begin
182 t.tracker_interval <- Int32.to_int interval;
183 if t.tracker_min_interval > t.tracker_interval then
184 t.tracker_min_interval <- t.tracker_interval
185 end;
(* Register every returned peer.  The address arrives as an int32; masking
   with 0xFFFFFFFF after widening keeps it non-negative.  The peer id is
   not known at this point, hence Sha1.null. *)
186 if need_sources then
187 List.iter (fun (ip',port) ->
188 let ip = Ip.of_int64 (Int64.logand 0xFFFFFFFFL (Int64.of_int32 ip')) in
189 (* lprintf_nl "udpt got %s:%d" (Ip.to_string ip) port; *)
190 t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
191 maybe_new_client file Sha1.null ip port
192 ) clients;
193 close socket Closed_by_user;
194 if !verbose_msg_servers then
195 lprintf_nl "udpt interact done for %s" host;
196 if need_sources then !resume_clients_hook file
197 ) end
(* Entry point proper: resolve [host] asynchronously, then run [interact].
   An exception raised synchronously by [interact] is logged. *)
200 if !verbose_msg_servers then
201 lprintf_nl "udpt start with %s:%d" host port;
202 Ip.async_ip host (fun ip ->
203 (* lprintf_nl "udpt resolved %s to ip %s" host (Ip.to_string ip); *)
204 try interact ip with exn -> lprintf_nl "udpt interact exn %s" (Printexc2.to_string exn))
205 (fun n ->
206 if !verbose_msg_servers then
207 lprintf_nl "udpt failed to resolve %s (%d)" host n)
208 with
209 exn ->
210 lprintf_nl "udpt start exn %s" (Printexc2.to_string exn)
212 end (* include *)
215 In this function we connect to a tracker.
216 @param file The file concerned by the request
217 @param url Url of the tracker to connect
218 @param event Event (as a string) to send to the tracker :
219 can be 'completed' if the file is complete, 'started' for the first
220 connection to this tracker or 'stopped' for a clean stop of the file.
221 Everything else will be ok for a second connection to the tracker.
222 Be careful with the spelling of this event
223 @param f The function used to parse the result of the connection.
224 The function will get a file as an argument (@see talk_to_tracker
225 for an example)
227 If we have fewer than !!ask_tracker_threshold sources
228 and the file_tracker_interval is respected, then
229 we actually ask the tracker for sources
(* connect_trackers file event need_sources f

   Send an announce for [file] to each of its enabled trackers.

   [event] is compared against the strings "started", "stopped" and
   "completed"; any other value sends no event parameter.
   [need_sources] false adds numwant=0 to the request.
   [f] is only used for HTTP trackers, where it is called as
   [f t fileres] from the Http_client.wget callback.  UDP trackers are
   handled entirely by talk_to_udp_tracker and never call [f].

   A tracker is contacted when no delay check is required for this event,
   when the file has not yet reached any tracker, when the tracker interval
   has elapsed, or, for a downloading file with fewer clients than
   ask_tracker_threshold, when the minimum reask interval has elapsed. *)
231 let connect_trackers file event need_sources f =
233 (* reset session statistics when sending 'started' event *)
234 if event = "started" then
235 begin
236 file.file_session_uploaded <- Int64.zero;
237 file.file_session_downloaded <- Int64.zero;
238 end;
(* Event-dependent parts of the request: the event parameter itself, whether
   the reask delay must be honoured, and the number of bytes left.  Without
   a swarmer "left" is zero and "completed" sends no event parameter; with a
   swarmer "completed" reports zero bytes left and the other events report
   file size minus what the swarmer has downloaded. *)
240 let args,must_check_delay, left =
242 match file.file_swarmer with
243 None ->
244 begin
245 match event with
246 | "started" -> [("event", "started")],true,zero
247 | "stopped" -> [("event", "stopped")],false,zero
248 | _ -> [],true,zero
251 | Some swarmer ->
252 let local_downloaded = CommonSwarming.downloaded swarmer in
253 let left = file_size file -- local_downloaded in
254 match event with
255 | "completed" -> [("event", "completed")],false,zero
256 | "started" -> [("event", "started")],true,left
257 | "stopped" -> [("event", "stopped")],false,left
258 | _ -> [],true,left
(* Parameters common to all trackers.  numwant is 0 when no sources are
   wanted, the configured value when it is above -1, and omitted otherwise. *)
261 let args = ("no_peer_id", "1") :: ("compact", "1") :: args in
262 let args =
263 if not need_sources then
264 ("numwant", "0") :: args
265 else if !!numwant > -1 then
266 ("numwant", string_of_int !!numwant) :: args
267 else
268 args
270 let args = if !!send_key then
271 ("key", Sha1.to_hexa !!client_uid) :: args else args
273 let args = if !!force_client_ip then
274 ("ip", Ip.to_string !!set_client_ip) :: args else args
276 let args =
277 ("info_hash", Sha1.direct_to_string file.file_id) ::
278 ("peer_id", Sha1.direct_to_string !!client_uid) ::
279 ("port", string_of_int !!client_port) ::
280 ("uploaded", Int64.to_string file.file_session_uploaded) ::
281 ("downloaded", Int64.to_string file.file_session_downloaded) ::
282 ("left", Int64.to_string left) ::
283 args
(* Trackers to contact.  When none is enabled, every tracker whose status is
   Disabled is switched back to Enabled and the list is filtered again. *)
286 let enabled_trackers =
287 let enabled_trackers = List.filter (fun t -> tracker_is_enabled t) file.file_trackers in
288 if enabled_trackers <> [] then enabled_trackers
289 else begin
290 (* if there is no tracker left, do something ? *)
291 if !verbose_msg_servers then
292 lprintf_nl "No trackers left for %s, reenabling all of them..." (file_best_name (as_file file));
293 List.iter (fun t ->
294 match t.tracker_status with
295 (* only re-enable after normal error *)
296 | Disabled _ -> t.tracker_status <- Enabled
297 | _ -> ()) file.file_trackers;
298 List.filter (fun t -> tracker_is_enabled t) file.file_trackers
299 end in
(* Per-tracker decision and request. *)
301 List.iter (fun t ->
303 (* if we have too few sources we may ask the tracker before the interval *)
304 if not must_check_delay
305 || not file.file_tracker_connected
306 || t.tracker_last_conn + t.tracker_interval < last_time()
307 || ( file.file_clients_num < !!ask_tracker_threshold
308 && (file_state file) == FileDownloading
309 && (if t.tracker_min_interval > !!min_tracker_reask_interval then
310 t.tracker_last_conn + t.tracker_min_interval < last_time()
311 else
312 t.tracker_last_conn + !!min_tracker_reask_interval < last_time() ))
313 then
314 begin
315 (* if we already tried to connect but failed, disable tracker, but allow re-enabling *)
316 (* FIXME t.tracker_last_conn < 1 only at first connect, so later failures will stay undetected! *)
317 if file.file_tracker_connected && t.tracker_last_clients_num = 0 && t.tracker_last_conn < 1 then
318 begin
319 if !verbose_msg_servers then
320 lprintf_nl "Request error from tracker: disabling %s" (show_tracker_url t.tracker_url);
321 t.tracker_status <- Disabled (intern "MLDonkey: Request error from tracker")
323 (* Send request to tracker *)
324 else
(* Tracker-specific parameters are prepended last.  When send_key is set and
   the tracker also supplied its own key, "key" occurs twice in [args], the
   tracker-supplied one first. *)
325 let args = if String.length t.tracker_id > 0 then
326 ("trackerid", t.tracker_id) :: args else args
328 let args = if String.length t.tracker_key > 0 then
329 ("key", t.tracker_key) :: args else args
331 if !verbose_msg_servers then
332 lprintf_nl "connect_trackers: connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i numwant:%s file: %s"
333 (string_of_bool file.file_tracker_connected)
334 t.tracker_id t.tracker_key t.tracker_last_clients_num
335 (t.tracker_last_conn - last_time()) (try List.assoc "numwant" args with _ -> "_") file.file_name;
(* Dispatch on the tracker URL scheme.  For HTTP, the time of last contact
   and the connected flag are only updated inside the wget callback, that
   is once the request has produced a result file. *)
337 match t.tracker_url with
338 | `Http url ->
339 let module H = Http_client in
340 let r = {
341 H.basic_request with
342 H.req_url = Url.of_string ~args: args url;
343 H.req_proxy = !CommonOptions.http_proxy;
344 H.req_user_agent = get_user_agent ();
345 (* #4541 [egs] supports redirect *)
346 H.req_max_retry = !!max_tracker_redirect;
347 } in
349 if !verbose_msg_servers then
350 lprintf_nl "Request sent to tracker %s for file: %s"
351 url file.file_name;
352 H.wget r
353 (fun fileres ->
354 t.tracker_last_conn <- last_time ();
355 file.file_tracker_connected <- true;
356 f t fileres)
357 | `Other url -> assert false (* should have been disabled *)
358 | `Udp (host,port) -> talk_to_udp_tracker host port args file t need_sources
(* Too early to ask this tracker again: only report the remaining delay. *)
361 else
362 if !verbose_msg_servers then
363 lprintf_nl "Request NOT sent to tracker %s - next request in %ds for file: %s"
364 (show_tracker_url t.tracker_url) (t.tracker_interval - (last_time () - t.tracker_last_conn)) file.file_name
365 ) enabled_trackers
(* Public entry point.  It shadows the implementation defined just before
   it and does nothing at all when the [use_trackers] option is off. *)
let connect_trackers file event need_sources f =
  match !!use_trackers with
  | true -> connect_trackers file event need_sources f
  | false -> ()
(* Give client [c] an upload slot: bind it to its file for upload, mark the
   slot as NormalSlot, refresh both rate counters, remember the time in
   client_last_optimist, put the client in the upload queue and finally
   send it an Unchoke message. *)
let start_upload c =
  let generic_client = as_client c in
  set_client_upload generic_client (as_file c.client_file);
  set_client_has_a_slot generic_client NormalSlot;
  Rate.update_no_change c.client_downloaded_rate;
  Rate.update_no_change c.client_upload_rate;
  c.client_last_optimist <- last_time ();
  client_enter_upload_queue generic_client;
  send_client c Unchoke
(** Decide which peers get an upload slot.

    A new set is obtained from [choose_uploaders current_files].  Peers of
    the current set that are missing from the new one lose their slot
    (NoSlot); no Choke message is sent here, so they can finish the request
    in progress and are choked on their next request.  Peers that are new
    in the set are started with [start_upload], which sends Unchoke.
    The new set then becomes the current one. *)
let recompute_uploaders () =
  if !verbose_upload then lprintf_nl "recompute_uploaders";
  next_uploaders := choose_uploaders current_files;
  (* Former uploader that was not selected again. *)
  let revoke_if_dropped c =
    if not (List.mem c !next_uploaders) then
      set_client_has_a_slot (as_client c) NoSlot
  in
  (* Selected peer that is not already uploading. *)
  let start_if_new c =
    if not (List.mem c !current_uploaders) then start_upload c
  in
  List.iter revoke_if_dropped !current_uploaders;
  List.iter start_if_new !next_uploaders;
  current_uploaders := !next_uploaders
402 (****** Fabrice: why are clients which are disconnected removed ???
403 These clients might still be useful to reconnect to, no ? *)
(** Called when the connection to a client goes away, whichever side
    closed it.

    What happens depends on the state of [c.client_sock]:
    - NoConnection: nothing;
    - ConnectionWaiting: the pending connection token is cancelled and the
      client goes back to NoConnection;
    - Connection: the socket is closed with [reason], the client is marked
      disconnected, its session counters are reset, it is counted as seen
      when it was a good client, its uploader is unregistered from the
      swarmer when a bitfield had been registered, and it is discarded with
      [remove_client].  When it was one of the current uploaders the
      uploaders are recomputed immediately.

    Every exception raised during the clean-up of an established
    connection is swallowed.  Freeing the client ranges and resetting the
    remaining per-client fields were left disabled in the original code and
    are still not done here.

    @param c The client to disconnect
    @param reason The reason for the disconnection (see in BasicSocket.ml) *)
let disconnect_client c reason =
  if !verbose_msg_clients then
    lprintf_nl "Client %d: disconnected: %s" (client_num c) (string_of_reason reason);
  match c.client_sock with
  | NoConnection -> ()
  | ConnectionWaiting token ->
      cancel_token token;
      c.client_sock <- NoConnection
  | Connection sock ->
      close sock reason;
      let cleanup () =
        set_client_disconnected c reason;
        c.client_session_downloaded <- 0L;
        c.client_session_uploaded <- 0L;
        (try if c.client_good then count_seen c with _ -> ());
        if c.client_registered_bitfield then begin
          match c.client_uploader with
          | None -> ()
          | Some up ->
              c.client_uploader <- None;
              (* The swarmer must forget this uploader so that chunk
                 availability stays accurate. *)
              CommonSwarming.unregister_uploader up
        end;
        (* The upload slot itself was already taken away by
           set_client_disconnected, hence membership in current_uploaders
           is tested rather than the slot. *)
        let was_uploader = List.mem c !current_uploaders in
        remove_client c;
        if was_uploader then recompute_uploaders ()
      in
      (try cleanup () with _ -> ())
(** Disconnect the clients of a file.

    When the file is paused or cancelled every client is disconnected.
    Otherwise clients that hold an upload slot or that are interested in
    our data are kept, and only the others are disconnected.

    @param file The file whose clients must be disconnected *)
let disconnect_clients file =
  let must_keep =
    match file_state file with
    | FilePaused | FileCancelled -> false
    | _ -> true
  in
  (* Work on a snapshot of the table: disconnect_client discards clients
     (through remove_client, which presumably removes them from
     file.file_clients), and the behaviour of Hashtbl.iter is unspecified
     when the table is modified during the traversal.  List.rev restores
     the order in which Hashtbl.iter would have visited the clients. *)
  let clients =
    List.rev (Hashtbl.fold (fun _ c acc -> c :: acc) file.file_clients [])
  in
  List.iter (fun c ->
    (* Evaluated at visit time, as before: an earlier disconnection may have
       triggered recompute_uploaders and changed who holds a slot. *)
    let keep =
      must_keep && (client_has_a_slot (as_client c) || c.client_interested)
    in
    if not keep then begin
      if !verbose_msg_clients then
        lprintf_file_nl (as_file file) "disconnect since download is finished";
      disconnect_client c Closed_by_user
    end
  ) clients
(** What to do when a file is finished.  Nothing happens unless [file] is
    still in [current_files].

    The "completed" event is announced to the trackers first, because the
    announce must happen before the swarmer is removed from the file.  The
    file is then marked completed and its swarmer is dropped.  The file
    itself is kept since we continue to upload it.

    @param file the finished file *)
let download_finished file =
  match List.memq file !current_files with
  | false -> ()
  | true ->
      let on_tracker_reply _ _ =
        lprintf_file_nl (as_file file) "Tracker return: completed %s" file.file_name
      in
      connect_trackers file "completed" false on_tracker_reply;
      file_completed (as_file file);
      (* The swarmer is of no use once everything is downloaded. *)
      CommonSwarming.remove_swarmer file.file_swarmer;
      file.file_swarmer <- None
(** Run [download_finished] on [file] when [CommonSwarming.check_finished]
    reports that [swarmer] is finished; do nothing otherwise.
    @param swarmer The swarmer of the file
    @param file The file to check status *)
let check_finished swarmer file =
  match CommonSwarming.check_finished swarmer with
  | true -> download_finished file
  | false -> ()
(* Masks for the eight bits of a byte, most significant bit first:
   bits.(0) selects 0x80 and bits.(7) selects 0x01. *)
let bits = [| 128; 64; 32; 16; 8; 4; 2; 1 |]

(* Check/set bits in strings (bittorrent format) *)

(* [is_bit_set s n] tells whether bit number [n] of [s] is set.  Bit 0 is
   the most significant bit of the first byte. *)
let is_bit_set s n =
  let byte = Char.code s.[n lsr 3] in
  let mask = bits.(n land 7) in
  byte land mask <> 0
(* [set_bit s n] sets bit number [n] of [s] in place, using the same
   numbering as is_bit_set (bit 0 is the most significant bit of the first
   byte).  The other bits of that byte are left untouched. *)
let set_bit s n =
  let byte_index = n lsr 3 in
  let mask = bits.(n land 7) in
  let updated = Char.code s.[byte_index] lor mask in
  s.[byte_index] <- Char.unsafe_chr updated
(* The official client seems to use max_range_request 5 and
   max_range_len 2^14. *)
(* Number of requests that may sit in the 'pipeline' at the same time. *)
let max_range_requests : int = 5
538 (* How much bytes we can request in one Piece *)
(* The 8 reserved bytes of our handshake: seven zero bytes followed by a
   last byte that is 0x01 when [bt_dht] holds a value and 0x00 otherwise.
   A fresh string is returned on every call. *)
let reserved () =
  let dht_flag =
    match !bt_dht with
    | None -> '\x00'
    | Some _ -> '\x01'
  in
  String.make 7 '\x00' ^ String.make 1 dht_flag
(** Send the handshake on [sock]: the protocol name "BitTorrent protocol"
    written with [buf_string8], the reserved bytes, the hash of the file
    and finally our own peer id. *)
let send_init client_uid file_id sock =
  let handshake = Buffer.create 100 in
  buf_string8 handshake "BitTorrent protocol";
  let append = Buffer.add_string handshake in
  append (reserved ());
  append (Sha1.direct_to_string file_id);
  append (Sha1.direct_to_string client_uid);
  write_string sock (Buffer.contents handshake)
(** Send an Interested message to a client, but only when the client is
    interesting to us and the message has not been sent to it already.
    The fact that it was sent is recorded before sending.
    @param c The client to send Interested *)
let send_interested c =
  match c.client_interesting, c.client_alrd_sent_interested with
  | true, false ->
      c.client_alrd_sent_interested <- true;
      send_client c Interested
  | _, _ -> ()
(** Send a Bitfield message to a client.

    Without a swarmer the file is taken to be seeded and every chunk bit is
    set.  With a swarmer only the chunks whose state is
    [VB.State_verified] in the verified bitmap are set.  In both cases the
    payload is rounded up to a whole number of bytes and the padding bits
    stay at zero.

    @param c The client to send the Bitfield message *)
let send_bitfield c =
  let packed_bits =
    match c.client_file.file_swarmer with
    | None ->
        (* This must be a seeded file... *)
        if !verbose_download then
          lprintf_nl "Sending completed verified bitmap";
        let nchunks = Array.length c.client_file.file_chunks in
        let packed = String.make ((nchunks + 7) / 8) '\000' in
        for chunk = 0 to nchunks - 1 do
          set_bit packed chunk
        done;
        packed
    | Some swarmer ->
        let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
        if !verbose_download then
          lprintf_nl "Sending verified bitmap: [%s]" (VB.to_string bitmap);
        let packed = String.make ((VB.length bitmap + 7) / 8) '\000' in
        let mark_verified chunk state =
          if state = VB.State_verified then set_bit packed chunk
        in
        VB.iteri mark_verified bitmap;
        packed
  in
  send_client c (BitField packed_bits)
(* Integer counter starting at 0.  Its readers and writers are not in the
   visible part of this file; presumably it numbers incoming connections
   (to be confirmed). *)
let counter : int ref = ref 0
(* Decode the reserved bytes [rbits] of a peer handshake into the extension
   flags of client [c]:
   - byte 7, mask 0x01: DHT
   - byte 7, mask 0x02: cache extension
   - byte 7, mask 0x04: fast extension
   - byte 5, mask 0x10: utorrent extension
   - byte 0, mask 0x80: azureus messaging protocol *)
let parse_reserved rbits c =
  let flag byte_index mask = Char.code rbits.[byte_index] land mask <> 0 in
  c.client_dht <- flag 7 0x01;
  c.client_cache_extension <- flag 7 0x02;
  c.client_fast_extension <- flag 7 0x04;
  c.client_utorrent_extension <- flag 5 0x10;
  c.client_azureus_messaging_protocol <- flag 0 0x80
(* One-line description of a client for log messages: "ip:port" followed by
   the quoted brand name of its software. *)
let show_client c =
  let ip, port = c.client_host in
  let brand = brand_to_string c.client_brand in
  Printf.sprintf "%s:%d %S" (Ip.to_string ip) port brand
614 (** This function is called to parse the first message that
615 a client send.
616 @param counter client num
617 @param cc Expected client (probably useless now that we don't save any client)
618 @param init_sent A boolean to know if we sent this client the handshake message
619 @param gconn Don't know
620 @param sock The socket we use for this client
621 @param proto Unused (required by tuple type?)
622 @param file_id The file hash (sha1) of the file involved in this exchange
624 (* removed: @param peer_id The hash (sha1) of the client. (Should be checked)
(* client_parse_header counter cc init_sent gconn sock (proto, rbits, file_id)

   Handle the handshake header received from a peer on [sock].

   - [counter] is only used in the trace message.
   - [cc] is a reference to a pair (client option, country code); when it
     holds no client a new one is created for the peer address of [sock]
     and stored back into [cc].
   - [init_sent] tells whether our own handshake was already sent; when it
     was not, the connection is flagged as incoming and the handshake is
     sent from here.
   - [rbits] are the reserved bytes of the peer, decoded by parse_reserved.
   - [file_id] is looked up in files_by_uid.

   On success the socket is attached to the client, our bitfield (and the
   DHT port when both sides support DHT) is sent, and the handler of
   [gconn] is replaced by one that feeds parsed messages to
   client_to_client.

   Errors: when [file_id] is unknown the Not_found handler only logs (if
   verbose_unexpected_messages is set) and does not close the socket.  Any
   other exception is logged, closes the socket and is raised again. *)
626 let rec client_parse_header counter cc init_sent gconn sock
627 (proto, rbits, file_id) =
629 set_lifetime sock 600.;
630 if !verbose_msg_clients then
631 lprintf_nl "client_parse_header %d" counter;
(* Hashtbl.find raises Not_found for a hash we do not know. *)
633 let file = Hashtbl.find files_by_uid file_id in
634 if !verbose_msg_clients then
635 lprintf_file_nl (as_file file) "file found";
636 let ccc, cc_country_code = !cc in
(* Find or create the client record.  A new record gets Sha1.null as peer id
   since the id is not known yet. *)
637 let c =
638 match ccc with
639 None ->
640 let c = new_client file Sha1.null (TcpBufferedSocket.peer_addr sock) cc_country_code in
641 if !verbose_connect then lprintf_file_nl (as_file file) "Client %d: incoming connection" (client_num c);
642 cc := (Some c), cc_country_code;
644 | Some c ->
645 (* Does it happen that this c was already used to connect successfully?
646 If yes then this must happen: *)
647 c.client_received_peer_id <- false;
648 if cc_country_code <> None && c.client_country_code = None then
649 c.client_country_code <- cc_country_code;
651 (* client could have had Sha1.null as peer_id/uid *)
652 (* this is to be done, later
653 if c.client_uid <> peer_id then
654 c.client_software <- (parse_software (Sha1.direct_to_string peer_id));
658 (* if c.client_uid <> peer_id then begin
659 lprintf "Unexpected client by UID\n";
660 let ccc = new_client file peer_id (TcpBufferedSocket.host sock) in
661 lprintf "CLIENT %d: testing instead of %d\n"
662 (client_num ccc) (client_num c);
663 (match ccc.client_sock with
664 Connection _ ->
665 lprintf_nl "[BT]: This client is already connected";
666 close sock (Closed_for_error "Already connected");
667 remove_client ccc;
669 | _ ->
670 lprintf_nl "[BT]: Client %d: recovered by UID" (client_num ccc);
671 remove_client c;
672 cc := Some ccc;
673 ccc)
674 end else
675 c *)
678 if !verbose_msg_clients then
679 lprintf_nl "Client %d: Connected from %s" (client_num c) (show_client c);
681 parse_reserved rbits c;
(* Attach [sock] to the client.  A pending outgoing attempt is cancelled,
   and a connection that is already established on a different socket is
   disconnected first (comparison by physical identity of the sockets). *)
683 (match c.client_sock with
684 NoConnection ->
685 if !verbose_msg_clients then begin
686 let (ip,port) = c.client_host in
687 lprintf_nl "No connection to client (%s:%d)!!!" (Ip.to_string ip) port;
688 end;
689 c.client_sock <- Connection sock
690 | ConnectionWaiting token ->
691 cancel_token token;
692 if !verbose_msg_clients then
693 lprintf_nl "Waiting for connection to client !!!";
694 c.client_sock <- Connection sock
695 | Connection s when s != sock ->
696 if !verbose_msg_clients then
697 lprintf_nl "CLIENT %d: IMMEDIATE RECONNECTION" (client_num c);
698 disconnect_client c (Closed_for_error "Reconnected");
699 c.client_sock <- Connection sock;
700 | Connection _ -> ()
703 set_client_state (c) (Connected (-1));
(* Our handshake has not been sent yet, which marks this as an incoming
   connection: answer with our own handshake. *)
704 if not init_sent then
705 begin
706 c.client_incoming <- true;
707 send_init !!client_uid file_id sock;
708 end;
709 connection_ok c.client_connection_control;
710 if !verbose_msg_clients then
711 lprintf_nl "file and client found";
712 (* if not c.client_incoming then *)
713 send_bitfield c; (* BitField is always the first message *)
(* Advertise our DHT port only when the peer announced DHT support and our
   own DHT is running. *)
714 begin match c.client_dht, !bt_dht with
715 | true, Some dht -> send_client c (DHT_Port dht.BT_DHT.M.dht_port)
716 | _ -> ()
717 end;
718 c.client_blocks_sent <- file.file_blocks_downloaded;
720 TODO !!! : send interested if and only if we are interested
721 -> we must recieve at least other peer bitfield.
722 in common swarmer -> compare : partition -> partition -> bool
725 set_rtimeout sock !!client_timeout;
726 (* Once parsed successfully we define the function client_to_client
727 to be the function used when a message is read *)
728 gconn.gconn_handler <- Reader (fun gconn sock ->
729 bt_handler TcpMessages.parsing (client_to_client c) c sock
732 let b = TcpBufferedSocket.buf sock in
733 (* The receive buffer is normally not empty now, lets parse the rest, most likely PeerID *)
734 if b.len <> 0 then
735 ignore (bt_handler TcpMessages.parsing (client_to_client c) c sock);
737 (* Some newer clients send more opcodes in their handshake packet, lets parse them now.
738 Using "while b.len <> 0 do ... done" is not possible here because libtorrent clients
739 send unparsable five extra bytes after their PeerID which would result into a loop *)
740 if b.len <> 0 then
741 ignore (bt_handler TcpMessages.parsing (client_to_client c) c sock);
(* Error handling for the whole function body. *)
743 with
744 | Not_found ->
745 let (ip,port) = (TcpBufferedSocket.peer_addr sock) in
746 if !verbose_unexpected_messages then
747 lprintf_nl "Client %s:%d requested a file that is not shared [%s]"
748 (Ip.to_string ip) port (Sha1.to_hexa file_id)
749 | e ->
750 lprintf_nl "Exception %s in client_parse_header" (Printexc2.to_string e);
751 close sock (Closed_for_exception e);
752 raise e
(** Bring the swarmer's view of what client [c] can provide up to date.

    The client is registered as an uploader of the file's swarmer when it
    is not registered yet, and gets an all-false bitmap of the swarmer's
    partition size when it has none.  The chunks queued in
    [c.client_new_chunks] are then set in that bitmap, the queue is emptied
    and the swarmer is given the updated bitmap.  Nothing is sent to the
    swarmer when the queue is empty.

    The file must still have a swarmer, otherwise [assert false] fails.

    @param c The client which we want to update. *)
and update_client_bitmap c =
  let swarmer =
    match c.client_file.file_swarmer with
    | Some swarmer -> swarmer
    | None -> assert false
  in
  let uploader =
    match c.client_uploader with
    | Some registered -> registered
    | None ->
        let registered =
          CommonSwarming.register_uploader swarmer (as_client c)
            (AvailableIntervals [])
        in
        c.client_uploader <- Some registered;
        registered
  in
  let bitmap =
    match c.client_bitmap with
    | Some existing -> existing
    | None ->
        let fresh = Bitv.create (CommonSwarming.partition_size swarmer) false in
        c.client_bitmap <- Some fresh;
        fresh
  in
  match c.client_new_chunks with
  | [] -> ()
  | announced ->
      c.client_new_chunks <- [];
      List.iter (fun chunk -> Bitv.set bitmap chunk true) announced;
      CommonSwarming.update_uploader_intervals uploader (AvailableBitv bitmap)
794 (** In this function we decide which piece we must request from client.
795 @param sock Socket of the client
796 @param c The client
(* get_from_client sock c

   Pick the next range to download from client [c] and send the
   corresponding Request message.

   A request is only attempted when fewer than max_range_requests ranges
   are outstanding for this client, the file is in state FileDownloading
   and the client has not choked us.

   Selection is done by the local function [iter]:
   - without a current chunk, the client bitmap is refreshed, one chunk
     verification is attempted (its errors are ignored), blocks are
     obtained from the swarmer and stored in c.client_chunk, and the search
     starts again;
   - with a current chunk, the range left waiting by a previous split is
     used if there is one, otherwise a range is asked from the swarmer.
     A range longer than max_range_len is split: the first max_range_len
     bytes are requested now and the rest is kept in
     c.client_range_waiting.  The requested range is appended to
     c.client_ranges_sent;
   - Not_found while looking for a range drops the current chunk and
     searches again; Not_found while looking for blocks recomputes the
     swarmer bitmap, checks whether the file is finished and is raised
     again, which ends the function with at most a log message.

   NOTE(review): [iter] returns the chunk number, the offset of the range
   inside that chunk and the LENGTH of the range.  These are rebound to
   num, x and y, so the Request carries (chunk, offset, length), and the
   final "Asking ... For Range" trace prints offset and length rather than
   absolute file positions. *)
798 and get_from_client sock (c: client) =
799 let file = c.client_file in
800 (* Check if there's not enough requests in the 'pipeline'
801 and if a request can be send (not choked and file is downloading) *)
802 if List.length c.client_ranges_sent < max_range_requests
803 && file_state file = FileDownloading
804 && (c.client_choked == false)
805 then
806 (* num is the number of the piece, x and y are the position
807 of the subpiece in the piece(!), r is a (CommonSwarmer) range *)
(* The client must already be registered as an uploader at this point. *)
809 let up = match c.client_uploader with
810 None -> assert false
811 | Some up -> up in
812 let swarmer = CommonSwarming.uploader_swarmer up in
816 let num, x,y, r =
818 if !verbose_msg_clients then
819 lprintf_file_nl (as_file file) "CLIENT %d: Finding new range to send" (client_num c);
(* Debug dump of the download state of this client: known chunks, ranges
   already requested, waiting range and current blocks. *)
821 if !verbose_swarming then begin
822 lprintf_n "Current download:\n Current chunks: ";
825 List.iter (fun (x,y) -> lprintf "%Ld-%Ld " x y) c.client_chunks
826 with _ -> lprintf "No Chunks";
828 lprint_newline ();
830 lprintf_n "Current ranges: ";
832 List.iter (fun (p1,p2, r) ->
833 let (x,y) = CommonSwarming.range_range r in
834 lprintf "%Ld-%Ld[%Ld-%Ld] " p1 p2 x y
835 ) c.client_ranges_sent;
837 match c.client_range_waiting with
838 | None -> ()
839 | Some (x,y,r) -> lprintf "Waiting %Ld-%Ld" x y;
841 lprint_newline ();
843 lprintf_n "Current blocks: ";
845 match c.client_chunk with
846 | None -> lprintf "none"
847 | Some (chunk, blocks) -> List.iter (fun b ->
848 CommonSwarming.print_block b.up_block) blocks;
850 lprint_newline ();
852 lprintf_file_nl (as_file file) "Finding Range:";
853 end;
857 (*We must find a block to request first, and then
858 some range inside this block
861 let rec iter () =
863 match c.client_chunk with
865 | None ->
867 if !verbose_swarming then lprintf_file_nl (as_file file) "No block";
868 update_client_bitmap c;
869 (try CommonSwarming.verify_one_chunk swarmer with _ -> ());
870 (*Find a free block in the swarmer*)
871 let chunk, blocks = CommonSwarming.find_blocks up in
872 if !verbose_swarming then begin
873 lprintf_n "Blocks Found: "; List.iter (fun b ->
874 CommonSwarming.print_block b.up_block) blocks;
875 lprint_newline ()
876 end;
877 c.client_chunk <- Some (chunk, blocks);
879 (*We put the found block in client_block to
880 request range in this block. (Useful for
881 not searching each time a new block)
884 iter ()
886 | Some (chunk, blocks) ->
888 if !verbose_swarming then begin
889 lprintf_n "Current Blocks: "; List.iter (fun b ->
890 CommonSwarming.print_block b.up_block) blocks;
891 lprint_newline ()
892 end;
895 (*Given a block find a range inside*)
(* The remainder of a previously split range takes precedence over asking
   the swarmer for a new range. *)
896 let (x,y,r) =
897 match c.client_range_waiting with
898 | Some (x,y,r) ->
899 c.client_range_waiting <- None;
900 (x,y,r)
901 | None ->
902 CommonSwarming.find_range up (min max_range_len file.file_piece_size)
(* Never request more than max_range_len bytes at once: keep the tail of a
   longer range for the next call. *)
905 let (x,y,r) =
907 if y -- x > max_range_len then begin
908 c.client_range_waiting <- Some (x ++ max_range_len, y, r);
909 (x, x ++ max_range_len, r)
910 end else
911 (x,y,r)
914 c.client_ranges_sent <- c.client_ranges_sent @ [x,y, r];
915 (* CommonSwarming.alloc_range r; *)
917 (* naughty, naughty, was computing a block number instead of a chunk
918 number. Only matters with merged downloads, and even then other
919 clients didn't seem to care (?), so the bug remained hidden *)
920 if !verbose_swarming then
921 lprintf_file_nl (as_file file) "Asking %d For Range %Ld-%Ld" chunk x y;
(* Result: chunk number, offset of the range from the start of that chunk,
   length of the range, and the swarmer range itself. *)
923 chunk, x -- file.file_piece_size ** Int64.of_int chunk, y -- x, r
925 with Not_found ->
927 (*If we don't find a range to request inside the block,
928 iter to choose another block*)
929 if !verbose_swarming then
930 lprintf_nl "Could not find range in current block";
931 (* c.client_blocks <- List2.removeq b c.client_blocks; *)
933 c.client_chunk <- None;
935 iter ()
938 iter ()
940 with Not_found ->
941 (*If we don't find a block to request we can check if the
942 file is finished (if there's missing pieces we can't decide
943 that the file is finished because we didn't found
944 a block to ask)
946 if !verbose_swarming then
947 lprintf_nl "Unable to get a block !!";
948 CommonSwarming.compute_bitmap swarmer;
949 check_finished swarmer file;
950 raise Not_found
953 send_client c (Request (num,x,y));
955 if !verbose_msg_clients then
956 lprintf_file_nl (as_file file) "CLIENT %d: Asking %s For Range %Ld-%Ld"
957 (client_num c) (Sha1.to_string c.client_uid) x y
959 with Not_found ->
960 if not (CommonSwarming.check_finished swarmer) && !verbose_download then
961 lprintf_file_nl (as_file file) "BTClient.get_from_client ERROR: can't find a block to download and file is not yet finished for file : %s..." file.file_name
(* client_to_client: logs the incoming message when verbose_msg_clients is set,
   then reacts to it with one match arm per message kind. *)
964 (** In this function we match a message sent by a client
965 and react according to this message.
966 @param c The client which sent us a message
967 @param sock The socket used for this client
968 @param msg The message sent by the client
970 and client_to_client c sock msg =
971 if !verbose_msg_clients then begin
972 let (ip,port) = (TcpBufferedSocket.peer_addr sock) in
973 let (timeout, next) = get_rtimeout sock in
974 lprintf_nl "CLIENT %d(%s:%d): (%d, %d,%d) Received %s"
975 (client_num c) (Ip.to_string ip) port
976 (last_time ())
977 (int_of_float timeout)
978 (int_of_float next)
979 (TcpMessages.to_string msg);
980 end;
982 let file = c.client_file in
984 (* Sending the "Have" message was moved to bTGlobals so this is useless *)
985 (* if c.client_blocks_sent != file.file_blocks_downloaded then begin
986 let rec iter list =
987 match list with
988 [] -> ()
989 | b :: tail when tail == c.client_blocks_sent ->
990 c.client_blocks_sent <- list;
991 let (num,_,_) = CommonSwarming.block_block b in
992 send_client c (Have (Int64.of_int num))
993 | _ :: tail -> iter tail
995 iter file.file_blocks_downloaded
996 end;*)
999 match msg with
(* Piece: while the file is downloading, hand the received bytes to the
   swarmer and update the download statistics; in every case drop the oldest
   entry of client_ranges_sent, request the next range and re-check interest. *)
1000 | Piece (num, offset, s, pos, len) ->
1001 (*A Piece message contains the data*)
1002 set_client_state c (Connected_downloading (file_num file));
1003 (*flag it as a good client *)
1004 c.client_good <- true;
1005 if file_state file = FileDownloading then begin
(* position combines offset with num pieces of file_piece_size; presumably the
   absolute offset in the file (operators come from Int64ops, TODO confirm) *)
1006 let position = offset ++ file.file_piece_size *.. num in
1007 let up = match c.client_uploader with
1008 None -> assert false
1009 | Some up -> up in
1010 let swarmer = CommonSwarming.uploader_swarmer up in
1012 if !verbose_msg_clients then
1013 (match c.client_ranges_sent with
1014 [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
1015 | (p1,p2,r) :: _ ->
1016 let (x,y) = CommonSwarming.range_range r in
1017 lprintf_file_nl (as_file file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])"
1018 (show_client c) position len
1019 p1 p2 x y
(* the swarmer total is sampled before and after CommonSwarming.received so
   that only the difference is credited to this client *)
1022 let old_downloaded =
1023 CommonSwarming.downloaded swarmer in
1024 (* List.iter CommonSwarming.free_range c.client_ranges; *)
1025 CommonSwarming.received up
1026 position s pos len;
1027 (* List.iter CommonSwarming.alloc_range c.client_ranges; *)
1028 let new_downloaded =
1029 CommonSwarming.downloaded swarmer in
1031 (*Update rate and amount of data received from client*)
1032 count_download c (new_downloaded -- old_downloaded);
1033 (* use len here with max_dr quickfix *)
1034 Rate.update c.client_downloaded_rate ~amount:len;
1035 (* count bytes downloaded from network for this file *)
1036 file.file_session_downloaded <- file.file_session_downloaded ++ (Int64.of_int len);
1037 if !verbose_msg_clients then
1038 (match c.client_ranges_sent with
1039 [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
1040 | (p1,p2,r) :: _ ->
1041 let (x,y) = CommonSwarming.range_range r in
1042 lprintf_file_nl (as_file file) "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
1043 position len
1044 p1 p2 x y
1045 (new_downloaded -- old_downloaded)
1048 (* changed 2.5.28 should have been done before !
1049 if new_downloaded <> old_downloaded then
1050 add_file_downloaded (as_file file)
1051 (new_downloaded -- old_downloaded); *)
1052 end;
1053 begin
1054 match c.client_ranges_sent with
1055 [] -> ()
1056 | r :: tail ->
1057 (* CommonSwarming.free_range r; *)
1058 c.client_ranges_sent <- tail;
1059 end;
1060 get_from_client sock c;
1062 (* Check if the client is still interesting for us... *)
1063 check_if_interesting file c
(* PeerID: record the peer id; a peer presenting our own client_uid is
   disconnected, any other peer has its brand and release parsed and is sent
   Choke. *)
1065 | PeerID p ->
1066 (* Disconnect if that is ourselves. *)
1067 c.client_uid <- Sha1.direct_of_string p;
1068 if not (c.client_uid = !!client_uid) then
1069 begin
1070 let brand, release = parse_software p in
1071 c.client_brand <- brand;
1072 c.client_release <- release;
1073 send_client c Choke;
1074 c.client_sent_choke <- true;
1076 else
1077 disconnect_client c Closed_by_user
(* BitField: a bitfield with fewer bits than the swarmer has partitions gets
   the peer disconnected; otherwise every advertised piece is recorded in
   client_new_chunks and the peer is flagged interesting when one of them is
   still missing or partial locally. *)
1080 | BitField p ->
1081 (*A bitfield is a summary of what a client have*)
1082 begin
1083 match c.client_file.file_swarmer with
1084 None -> ()
1085 | Some swarmer ->
1086 c.client_new_chunks <- [];
1088 let npieces = CommonSwarming.partition_size swarmer in
1089 let nbits = String.length p * 8 in
1091 if nbits < npieces then begin
1092 lprintf_file_nl (as_file file) "Error: expected bitfield of atleast %d but got %d" npieces nbits;
1093 disconnect_client c (Closed_for_error "Wrong bitfield length")
1094 end else begin
1096 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
1098 for i = 0 to npieces - 1 do
1099 if is_bit_set p i then begin
1100 c.client_new_chunks <- i :: c.client_new_chunks;
1101 match VB.get bitmap i with
1102 | VB.State_missing | VB.State_partial ->
1103 c.client_interesting <- true
1104 | VB.State_complete | VB.State_verified -> ()
1105 end
1106 done;
1108 update_client_bitmap c;
1109 c.client_registered_bitfield <- true;
1111 if c.client_interesting then
1112 send_interested c;
1114 if !verbose_msg_clients then
1115 lprintf_file_nl (as_file file) "New BitField Registered";
1117 (* for i = 1 to max_range_requests - List.length c.client_ranges do
1118 (try get_from_client sock c with _ -> ())
1119 done
1122 end;
1123 end;
1124 (* Note: a bitfield must only be sent after the handshake and before everything else: NOT here *)
(* Have: when piece n is still missing or partial locally, flag the peer as
   interesting, send Interested and add n to its chunk list; pieces already
   complete or verified are ignored. *)
1126 | Have n ->
1127 (* A client can send a "Have" without sending a Bitfield *)
1128 begin
1129 match c.client_file.file_swarmer with
1130 None -> ()
1131 | Some swarmer ->
1132 let n = Int64.to_int n in
1133 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
1134 (* lprintf_nl "verified: %c;" (VB.state_to_char (VB.get bitmap n)); *)
1135 (* if the peer has a chunk we don't, tell him we're interested and update his bitmap *)
1136 match VB.get bitmap n with
1137 | VB.State_missing | VB.State_partial ->
1138 c.client_interesting <- true;
1139 send_interested c;
1140 c.client_new_chunks <- n :: c.client_new_chunks;
1141 update_client_bitmap c;
1142 | VB.State_complete | VB.State_verified -> ()
1144 (* begin
1145 match c.client_bitmap, c.client_uploader with
1146 Some bitmap, Some up ->
1147 let swarmer = CommonSwarming.uploader_swarmer up in
1148 let n = Int64.to_int n in
1149 if bitmap.[n] <> '1' then
1151 let verified = CommonSwarming.verified_bitmap swarmer in
1152 if verified.[n] < '2' then begin
1153 c.client_interesting <- true;
1154 send_interested c;
1155 c.client_new_chunks <- n :: c.client_new_chunks;
1156 if c.client_block = None then begin
1157 update_client_bitmap c;
1158 (* for i = 1 to max_range_requests -
1159 List.length c.client_ranges do
1160 (try get_from_client sock c with _ -> ())
1161 done*)
1164 | None, Some _ -> lprintf_nl "no bitmap but client_uploader";
1165 | Some _ , None ->lprintf_nl "bitmap but no client_uploader";
1166 | None, None -> lprintf_nl "no bitmap no client_uploader";
(* Interested and NotInterested only toggle client_interested. *)
1171 | Interested ->
1172 c.client_interested <- true;
(* Choke: reset the state to Connected (-1), clear the uploader intervals when
   an uploader exists, and forget all sent and waiting ranges. *)
1174 | Choke ->
1175 begin
1176 set_client_state (c) (Connected (-1));
1177 (* remote peer will clear the list of range we sent *)
1178 begin
1179 match c.client_uploader with
1180 None ->
1181 (* Afaik this is no protocol violation and happens if the client
1182 didn't send a client bitmap after the handshake. *)
1183 if !verbose_msg_clients then lprintf_file_nl (as_file file) "%s : Choke send, but no client bitmap"
1184 (show_client c)
1185 | Some up ->
1186 CommonSwarming.clear_uploader_intervals up
1187 end;
1188 c.client_ranges_sent <- [];
1189 c.client_range_waiting <- None;
1190 c.client_choked <- true;
1193 | NotInterested ->
1194 c.client_interested <- false;
(* Unchoke: call get_from_client once per free request slot, that is
   max_range_requests minus the ranges still outstanding; exceptions raised by
   it are ignored. *)
1196 | Unchoke ->
1197 begin
1198 c.client_choked <- false;
1199 (* remote peer cleared our request : re-request *)
1200 for i = 1 to max_range_requests -
1201 List.length c.client_ranges_sent do
1202 (try get_from_client sock c with _ -> ())
1203 done
(* Request: a length above max_request_len closes the socket and raises Exit.
   Otherwise, when has_upload is 0, the request is queued if the client has a
   slot; without a slot the peer is sent Choke and its queue is emptied. *)
1206 | Request (n, pos, len) ->
1207 if len > max_request_len then begin
1208 close sock (Closed_for_error "Request longer than 1<<16");
1209 raise Exit
1210 end;
1212 if !CommonGlobals.has_upload = 0 then
1213 begin
1214 if client_has_a_slot (as_client c) then
1215 begin
1216 (* lprintf "Received request for upload\n"; *)
(* ready_for_upload is only called when the queue was empty before this
   request was appended *)
1217 (match c.client_upload_requests with
1218 [] ->
1219 CommonUploads.ready_for_upload (as_client c);
1220 | _ -> ());
1221 c.client_upload_requests <- c.client_upload_requests @ [n,pos,len];
1222 let file = c.client_file in
1223 match file.file_shared with
1224 None -> ()
1225 | Some s ->
1226 begin
1227 s.impl_shared_requests <- s.impl_shared_requests + 1;
1228 shared_must_update (as_shared s)
1231 else
1232 begin
1233 send_client c Choke;
1234 c.client_sent_choke <- true;
1235 c.client_upload_requests <- [];
1237 end;
1239 | Ping -> ()
1240 (* We don't 'generate' a Ping message on a Ping. *)
(* Cancel: remove the first matching queued request, but only if the client
   has a slot; otherwise the event is only logged when verbose. *)
1242 | Cancel (n, pos, len) ->
1243 (* if we receive a cancel message from a peer, remove request *)
1244 if client_has_a_slot (as_client c) then
1245 c.client_upload_requests <- List2.remove_first (n, pos, len) c.client_upload_requests
1246 else
1247 if !verbose_msg_clients then
1248 lprintf_file_nl (as_file file) "Error: received cancel request but client has no slot"
(* DHT_Port: when DHT is enabled, ping the peer host on the announced port and
   mark the node Good if it answers. *)
1250 | DHT_Port port ->
1251 match !bt_dht with
1252 | None ->
1253 if !verbose_msg_clients then
1254 lprintf_file_nl (as_file file) "Received DHT PORT when DHT is disabled. From %s" (show_client c)
1255 | Some dht ->
1256 BT_DHT.M.ping dht (fst c.client_host, port) begin function
1257 | None ->
1258 if !verbose then
1259 lprintf_file_nl (as_file file) "Peer %s didn't reply to DHT ping on port %d" (show_client c) port
1260 | Some (id,addr) ->
1261 BT_DHT.update dht Kademlia.Good id addr
(* Any exception raised while handling the message is logged here.
   NOTE(review): the matching try is not visible in this listing; presumably it
   encloses the match on msg. TODO confirm against the full source. *)
1264 with e ->
1265 lprintf_file_nl (as_file file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e) (TcpMessages.to_string msg)
(** Schedule an outgoing connection to a remote peer.
    Nothing happens unless the connection manager accepts a new connection,
    the peer is not matched by [!Ip.banned] and the client currently has no
    socket ([NoConnection]). The connection itself is opened inside the
    callback registered with [add_pending_connection]; until that callback
    runs the client is marked [ConnectionWaiting].
    (Original note: pending connections are dequeued according to
    !!max_connections_per_second, see commonGlobals.ml.)
    @param c The client we must connect *)
let connect_client c =
  (* Ban check; logs the reason when verbose_connect is set. *)
  let peer_allowed () =
    let (ip, port) = c.client_host in
    match !Ip.banned (ip, c.client_country_code) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_nl "%s:%d (%s), blocked: %s"
            (Ip.to_string ip) port
            (fst (Geoip.get_country_code_name c.client_country_code))
            reason;
        false
  in
  (* Event handler of the outgoing socket: timeouts close it, a close event
     disconnects the client if this socket is still the one it uses. *)
  let on_socket_event sock event =
    match event with
    | BASIC_EVENT LTIMEOUT ->
        if !verbose_msg_clients then
          lprintf_nl "CLIENT %d: LIFETIME" (client_num c);
        close sock Closed_for_timeout
    | BASIC_EVENT RTIMEOUT ->
        if !verbose_msg_clients then
          lprintf_nl "CLIENT %d: RTIMEOUT (%d)" (client_num c)
            (last_time ());
        close sock Closed_for_timeout
    | BASIC_EVENT (CLOSED r) ->
        (match c.client_sock with
         | Connection s when s == sock -> disconnect_client c r
         | _ -> ())
    | _ -> ()
  in
  (* Runs once the pending connection is granted its token. *)
  let open_connection token =
    try
      if !verbose_msg_clients then
        lprintf_nl "CLIENT %d: connect_client" (client_num c);
      let (ip, port) = c.client_host in
      if !verbose_msg_clients then
        lprintf_nl "connecting %s:%d" (Ip.to_string ip) port;
      connection_try c.client_connection_control;
      let sock =
        connect token "bittorrent download"
          (Ip.to_inet_addr ip) port on_socket_event
      in
      c.client_sock <- Connection sock;
      set_lifetime sock 600.;
      TcpBufferedSocket.set_read_controler sock download_control;
      TcpBufferedSocket.set_write_controler sock upload_control;
      TcpBufferedSocket.set_rtimeout sock 30.;
      let file = c.client_file in
      if !verbose_msg_clients then
        lprintf_file_nl (as_file file) "READY TO DOWNLOAD FILE";
      send_init !!client_uid file.file_id sock;
      (* Fabrice: Initialize the client bitmap and uploader fields to <> None *)
      update_client_bitmap c;
      incr counter;
      (* Hook client_parse_header to the socket so that it handles the first
         message parsed on it. *)
      set_bt_sock sock !verbose_msg_clients
        (BTHeader (client_parse_header !counter
                     (ref ((Some c), c.client_country_code)) true))
    with e ->
      lprintf_nl "Exception %s while connecting to client"
        (Printexc2.to_string e);
      disconnect_client c (Closed_for_exception e)
  in
  if can_open_connection connection_manager && peer_allowed () then
    match c.client_sock with
    | NoConnection ->
        let token =
          add_pending_connection connection_manager open_connection in
        (* Since this is a pending connection put ConnectionWaiting
           in client_sock *)
        c.client_sock <- ConnectionWaiting token
    | _ -> ()
(** Start the TCP server socket that accepts incoming peer connections.
    The server is bound to [!!client_bind_addr]:[!!client_port] and stored in
    [listen_sock]. Each accepted connection is either wrapped in a buffered
    socket hooked to [client_parse_header], or closed right away when the
    connection manager refuses it or the peer is matched by [!Ip.banned].
    Any exception raised while creating the server is only logged, and only
    when [verbose_connect] is set. *)
let listen () =
  (* Monitor read and life timeouts on client sockets. *)
  let on_peer_event sock event =
    match event with
    | BASIC_EVENT (RTIMEOUT|LTIMEOUT) -> close sock Closed_for_timeout
    | _ -> ()
  in
  (* Wrap an accepted descriptor and wait for the peer handshake. *)
  let accept_peer fd cc =
    let token = create_token connection_manager in
    let sock =
      TcpBufferedSocket.create token
        "bittorrent client connection" fd on_peer_event
    in
    TcpBufferedSocket.set_read_controler sock download_control;
    TcpBufferedSocket.set_write_controler sock upload_control;
    (* The client is not known yet: the cell starts at None and is handed to
       client_parse_header below. *)
    let c = ref (None, cc) in
    TcpBufferedSocket.set_closer sock (fun _ r ->
        match fst !c with
        | None -> ()
        | Some c ->
            (match c.client_sock with
             | Connection s when s == sock -> disconnect_client c r
             | _ -> ()));
    set_rtimeout sock 30.;
    incr counter;
    (* Again : 'hook' client_parse_header to the socket *)
    set_bt_sock sock !verbose_msg_clients
      (BTHeader (client_parse_header !counter c false))
  in
  let on_server_event _ event =
    match event with
    | TcpServerSocket.CONNECTION (fd, Unix.ADDR_INET (from_ip, from_port)) ->
        (* A CONNECTION event from the server socket means that a new client
           tries to connect to us. *)
        let ip = Ip.of_inet_addr from_ip in
        let cc = Geoip.get_country_code_option ip in
        if !verbose_sources > 1 then
          lprintf_nl "CONNECTION RECEIVED FROM %s"
            (Ip.to_string (Ip.of_inet_addr from_ip));
        let not_banned () =
          match !Ip.banned (ip, cc) with
          | None -> true
          | Some reason ->
              if !verbose_connect then
                lprintf_nl "%s:%d (%s) blocked: %s"
                  (Ip.to_string ip) from_port
                  (fst (Geoip.get_country_code_name cc))
                  reason;
              false
        in
        (* Reject this connection if we do not want to bypass the
           max_connection parameter. *)
        if can_open_connection connection_manager && not_banned () then
          accept_peer fd cc
        else
          (* Close the incoming descriptor if we cannot open a new
             connection. *)
          Unix.close fd
    | _ -> ()
  in
  try
    let server =
      TcpServerSocket.create "bittorrent client server"
        (Ip.to_inet_addr !!client_bind_addr)
        !!client_port
        on_server_event
    in
    listen_sock := Some server
  with e ->
    if !verbose_connect then
      lprintf_nl "Exception %s while init bittorrent server"
        (Printexc2.to_string e)
(** Send a keepalive [Ping] to every client that currently has a connected
    socket, for every file in [!current_files], and set the lifetime of that
    socket to 130 seconds. Clients without a [Connection] are left alone. *)
let send_pings () =
  let ping_client _ c =
    match c.client_sock with
    | Connection sock ->
        send_client c Ping;
        set_lifetime sock 130.
    | _ -> ()
  in
  let ping_file file = Hashtbl.iter ping_client file.file_clients in
  List.iter ping_file !current_files
1457 open Bencode
(** Check each client of a given file and try to connect those that are not
    connected yet.
    A client whose socket is already a [Connection] is skipped. For the others
    a connection is attempted only when [connection_can_try] allows it for the
    client's connection control; otherwise [print_control] is called on that
    control. (Original note: the delay between two tries is currently 120
    seconds.)
    Failures never escape: an exception raised while handling one client is
    logged when [verbose_connect] is set and the iteration moves on to the
    next client. The previous version wrapped the attempt in an inner
    catch-all that discarded the exception, which left this log unreachable.
    @param file the file whose clients should be resumed *)
let resume_clients file =
  Hashtbl.iter (fun _ c ->
      match c.client_sock with
      | Connection _ -> ()
      | _ ->
          try
            if connection_can_try c.client_connection_control then
              connect_client c
            else
              print_control c.client_connection_control
          with e ->
            if !verbose_connect then
              lprintf_file_nl (as_file file) "Exception %s in resume_clients" (Printexc2.to_string e)
    ) file.file_clients

(* Publish the function through resume_clients_hook. *)
let () =
  resume_clients_hook := resume_clients
(* chk_keyval: converts the Int64 value n to a native int, logs it when
   verbose_msg_clients is set, and returns it when it is greater than -1.
   A negative value is always logged as invalid, whatever the verbosity. *)
1489 (** Check if the value replied by the tracker is correct.
1490 @param key the name of the key
1491 @param n the value to check
1492 @param url Url of the tracker
1493 @param name the name of the file
1495 let chk_keyval key n url name =
1496 let int_n = (Int64.to_int n) in
1497 if !verbose_msg_clients then
1498 lprintf_nl "Reply from %s in file: %s has %s: %d" (show_tracker_url url) name key int_n;
1499 if int_n > -1 then
1500 int_n
1501 else begin
1502 lprintf_nl "Reply from %s in file: %s has an invalid %s value: %d" (show_tracker_url url) name key int_n;
(* NOTE(review): the fallback value returned after this log line is not
   visible in this listing; confirm it against the full source. *)
(** [exn_catch f x] applies [f] to [x] and reifies the outcome: [`Ok v] when
    the call returns [v], [`Exn e] when it raises [e]. It never raises
    itself. *)
let exn_catch f x =
  try
    let result = f x in
    `Ok result
  with exn -> `Exn exn
(** In this function we interact with the tracker.
    The announce itself is delegated to [connect_trackers], which receives the
    event ("started" until [file.file_tracker_connected] is set, "" after) and
    the reply handler [f] defined below. [f] reads the reply from a file,
    decodes it as a bencoded dictionary, updates the tracker's intervals,
    statistics and status, and registers announced peers through
    [maybe_new_client].
    @param file The file for which we want some sources
    @param need_sources whether we need any sources; when false, announced
    peers are ignored and [resume_clients] is not called *)
let talk_to_tracker file need_sources =
  (* This is the function which will be called by the http client for parsing the response *)
  let f t filename =
    let tracker_url = show_tracker_url t.tracker_url in
    let tracker_failed reason =
      (* On failure, disable the tracker and count attempts (@see is_tracker_enabled) *)
      let num =
        match t.tracker_status with
        | Disabled_failure (i, _) -> i + 1
        | _ -> 1
      in
      t.tracker_status <- Disabled_failure (num, intern reason);
      lprintf_file_nl (as_file file) "Failure no. %d%s from Tracker %s for file: %s Reason: %s"
        num
        (if !!tracker_retries = 0 then "" else Printf.sprintf "/%d" !!tracker_retries)
        tracker_url file.file_name (Charset.Locale.to_utf8 reason)
    in
    (* Register one peer given as a dictionary (non-compact "peers" list).
       Fields that are absent keep their null / zero default. *)
    let add_dictionary_peer entry =
      match entry with
      | Dictionary fields ->
          let peer_id = ref Sha1.null in
          let peer_ip = ref Ip.null in
          let port = ref 0 in
          List.iter (fun field ->
              match field with
              | "peer id", String id -> peer_id := Sha1.direct_of_string id
              | "ip", String ip -> peer_ip := Ip.of_string ip
              | "port", Int p -> port := Int64.to_int p
              | _ -> ()
            ) fields;
          t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
          maybe_new_client file !peer_id !peer_ip !port
      | other ->
          (* The entry comes from the tracker, so a wrong shape is bad input
             rather than a programming error: report it and keep going
             instead of failing an assertion. *)
          lprintf_file_nl (as_file file) "ignoring malformed peer entry from Tracker %s: %s"
            tracker_url (Bencode.print other)
    in
    (* Register the peers of a compact "peers" string, read as consecutive
       6-byte entries: 4 address bytes followed by a 2-byte port. *)
    let add_compact_peers s =
      let l = String.length s in
      let rec iter_comp pos =
        (* Only decode complete entries. Testing [pos < l] alone would read up
           to 5 bytes past the end of a truncated string. *)
        if pos + 6 <= l then begin
          let ip = Ip.of_ints (get_uint8 s pos, get_uint8 s (pos+1),
                               get_uint8 s (pos+2), get_uint8 s (pos+3))
          and port = get_int16 s (pos+4) in
          t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
          maybe_new_client file Sha1.null ip port;
          iter_comp (pos+6)
        end
      in
      iter_comp 0
    in
    match exn_catch File.to_string filename with
    | `Exn _ | `Ok "" -> tracker_failed "empty reply"
    | `Ok s ->
        (match exn_catch Bencode.decode s with
         | `Exn exn ->
             tracker_failed (Printf.sprintf "wrong reply (%s)" (Printexc2.to_string exn))
         | `Ok (Dictionary list) ->
             (* Defaults, overwritten below when the reply carries the keys. *)
             t.tracker_interval <- 600;
             t.tracker_min_interval <- 600;
             if need_sources then t.tracker_last_clients_num <- 0;
             let chk_keyval key n = chk_keyval key n t.tracker_url file.file_name in
             if not (List.mem_assoc "failure reason" list) then
               begin
                 begin match t.tracker_status with
                   | Disabled_failure (i, _) ->
                       lprintf_file_nl (as_file file) "Received good message from Tracker %s after %d bad attempts"
                         tracker_url i
                   | _ -> ()
                 end;
                 (* Received good message from tracker after failures, re-enable tracker *)
                 t.tracker_status <- Enabled
               end;
             List.iter (fun (key, value) ->
                 match (key, value) with
                 | "failure reason", String failure -> tracker_failed failure
                 | "warning message", String warning ->
                     lprintf_file_nl (as_file file) "Warning from Tracker %s in file: %s Reason: %s"
                       tracker_url file.file_name warning
                 | "interval", Int n ->
                     t.tracker_interval <- chk_keyval key n;
                     (* in case we don't receive "min interval" *)
                     if t.tracker_min_interval > t.tracker_interval then
                       t.tracker_min_interval <- t.tracker_interval
                 | "min interval", Int n ->
                     t.tracker_min_interval <- chk_keyval key n;
                     (* make sure "min interval" is always < or equal to "interval" *)
                     if t.tracker_min_interval > t.tracker_interval then
                       t.tracker_min_interval <- t.tracker_interval
                 | "downloaded", Int n ->
                     t.tracker_torrent_downloaded <- chk_keyval key n
                 | "complete", Int n
                 | "done peers", Int n ->
                     t.tracker_torrent_complete <- chk_keyval key n
                 | "incomplete", Int n ->
                     t.tracker_torrent_incomplete <- chk_keyval key n;
                     (* if complete > 0 and we receive incomplete we probably won't receive num_peers so we simulate it below *)
                     if t.tracker_torrent_complete > 0 then
                       t.tracker_torrent_total_clients_count <-
                         (t.tracker_torrent_complete + t.tracker_torrent_incomplete)
                 | "num peers", Int n ->
                     t.tracker_torrent_total_clients_count <- chk_keyval key n;
                     (* if complete > 0 and we receive num_peers we probably won't receive incomplete so we simulate it below *)
                     if t.tracker_torrent_complete > 0 then
                       t.tracker_torrent_incomplete <-
                         (t.tracker_torrent_total_clients_count - t.tracker_torrent_complete)
                 | "last", Int n ->
                     t.tracker_torrent_last_dl_req <- chk_keyval key n
                 | "key", String n ->
                     t.tracker_key <- n;
                     if !verbose_msg_clients then
                       lprintf_file_nl (as_file file) "%s in file: %s has key: %s" tracker_url file.file_name n
                 | "tracker id", String n ->
                     t.tracker_id <- n;
                     if !verbose_msg_clients then
                       lprintf_file_nl (as_file file) "%s in file: %s has tracker id %s" tracker_url file.file_name n
                 | "peers", List peers ->
                     if need_sources then List.iter add_dictionary_peer peers
                 | "peers", String p ->
                     if need_sources then add_compact_peers p
                 | "private", Int _ -> ()
                     (* TODO: if set to 1, disable peer exchange *)
                 | key, _ ->
                     lprintf_file_nl (as_file file) "received unknown entry in answer from tracker: %s : %s" key (Bencode.print value)
               ) list;
             (* Now, that we have added new clients to a file, it's time
                to connect to them *)
             if !verbose_sources > 0 then
               lprintf_file_nl (as_file file) "talk_to_tracker: got %i source(s) for file %s"
                 t.tracker_last_clients_num file.file_name;
             if need_sources then resume_clients file
         | _ -> tracker_failed "wrong reply (value)")
  in
  let event =
    if file.file_tracker_connected then ""
    else "started"
  in
  connect_trackers file event need_sources f
(** Announce [file] on the DHT and optionally collect peers from it.
    Does nothing when [!bt_dht] is [None]. Otherwise the announce time is
    stored in [file.file_last_dht_announce] and peers are queried for
    [file.file_id]; for each answering node we announce ourselves with
    [!!client_port] and, when [need_sources] is set, register the returned
    peers and call [resume_clients].
    @param file the file to announce
    @param need_sources whether returned peers should be added as clients *)
let talk_to_dht file need_sources =
  match !bt_dht with
  | None -> ()
  | Some dht ->
      if !verbose then lprintf_file_nl (as_file file) "DHT announce";
      file.file_last_dht_announce <- last_time ();
      let register_peer (ip, port) = maybe_new_client file Sha1.null ip port in
      (* Called with each node that answered the peer query. *)
      let on_node_reply ((_, addr) as node) token peers =
        let announce_failed () =
          if !verbose then
            lprintf_file_nl (as_file file) "DHT announce to %s failed" (BT_DHT.show_node node)
        in
        BT_DHT.M.announce dht addr !!client_port token file.file_id
          (fun _ -> ()) ~kerr:announce_failed;
        if need_sources then begin
          List.iter register_peer peers;
          resume_clients file
        end
      in
      BT_DHT.query_peers dht file.file_id on_node_reply
(** Shadows the tracker-only [talk_to_tracker] defined earlier: for a file
    that is not private, a DHT announce is made first when the last one is
    more than 14*60 time units old (as measured by [last_time]); the tracker
    announce then always follows.
    @param file the file to announce
    @param need_sources whether new sources are wanted *)
let talk_to_tracker file need_sources =
  let dht_announce_due =
    file.file_last_dht_announce + 14*60 < last_time () in
  if dht_announce_due && not file.file_private then
    talk_to_dht file need_sources;
  (* Not recursive: this is the previous definition of talk_to_tracker. *)
  talk_to_tracker file need_sources
(** Check to see if each file is finished; if not, try to get sources for it.
    Files without a swarmer are skipped. For the others [check_finished] is
    called first (its exceptions are ignored), then the tracker is contacted:
    asking for sources while downloading, without asking for sources while
    shared. Paused and queued files are left alone; any other state is only
    logged when [verbose] is set. Exceptions from [talk_to_tracker] are
    ignored. *)
let recover_files () =
  if !verbose_share then
    lprintf_nl "recover_files";
  let recover_file file =
    match file.file_swarmer with
    | None -> ()
    | Some swarmer ->
        (try check_finished swarmer file with _ -> ());
        begin match file_state file with
          | FileDownloading ->
              if !verbose_share then
                lprintf_file_nl (as_file file) "recover downloading";
              (try talk_to_tracker file true with _ -> ())
          | FileShared ->
              if !verbose_share then
                lprintf_file_nl (as_file file) "recover shared";
              (try talk_to_tracker file false with _ -> ())
          (* when we are paused or queued we do nothing, not even logging *)
          | FilePaused | FileQueued -> ()
          | s ->
              if !verbose then
                lprintf_file_nl (as_file file) "recover: Other state %s!!" (string_of_state s)
        end
  in
  List.iter recover_file !current_files
(** Scratch buffer of 100000 bytes, allocated once at module initialisation.
    [iter_upload] reads file data into it before sending a Piece message. *)
let upload_buffer = String.create 100_000
(** Send Piece messages for the queued upload requests of a client.
    Requests are served from the head of [c.client_upload_requests]:
    zero-length requests are dropped; while [c.client_allowed_to_write] is not
    negative, the requested bytes are read from the file into
    [upload_buffer], accounted for and sent, and the next request is handled.
    Once the allowance is negative the client is handed back to
    [ready_for_upload]. An exception while serving a request is logged when
    [verbose] is set and stops the loop; the request has already been removed
    from the queue at that point.
    @param sock The socket of the client
    @param c The client *)
let rec iter_upload sock c =
  match c.client_upload_requests with
  | [] -> ()
  | (_, _, len) :: rest when len = zero ->
      (* nothing to send for an empty request *)
      c.client_upload_requests <- rest;
      iter_upload sock c
  | _ :: _ when c.client_allowed_to_write < 0L ->
      (* lprintf "client is waiting for another piece\n"; *)
      ready_for_upload (as_client c)
  | (num, pos, len) :: rest ->
      begin try
        c.client_upload_requests <- rest;
        let file = c.client_file in
        let offset = pos ++ file.file_piece_size *.. num in
        c.client_allowed_to_write <- c.client_allowed_to_write -- len;
        count_upload c len;
        let nbytes = Int64.to_int len in
        (* lprintf "Unix32.read: offset %Ld len %d\n" offset len; *)
        Unix32.read (file_fd file) offset upload_buffer 0 nbytes;
        (* update upload rate from nbytes bytes *)
        Rate.update c.client_upload_rate ~amount:nbytes;
        Rate.update c.client_downloaded_rate;
        let sent = Int64.of_int nbytes in
        file.file_uploaded <- file.file_uploaded ++ sent;
        file.file_session_uploaded <- file.file_session_uploaded ++ sent;
        (* update stats *)
        count_filerequest c;
        (match file.file_shared with
         | None -> ()
         | Some s ->
             s.impl_shared_uploaded <- file.file_uploaded;
             shared_must_update (as_shared s));
        (* lprintf "sending piece\n"; *)
        send_client c (Piece (num, pos, upload_buffer, 0, nbytes));
        iter_upload sock c
      with e ->
        if !verbose then
          lprintf_nl "Exception %s in iter_upload" (Printexc2.to_string e)
      end
(** Bandwidth-control entry point for uploads to one client.
    Runs only when the client is connected and has pending upload requests.
    If [allowed] is positive and [can_write_len] accepts the enlarged
    allowance, the bandwidth is consumed and added to
    [c.client_allowed_to_write]. [iter_upload] is then called in either case.
    @param c the client to which we can send some bytes
    @param allowed the amount of bytes we can send to client *)
let client_can_upload c allowed =
  (* lprintf "allowed to upload %d\n" allowed; *)
  let serve_requests sock =
    match c.client_upload_requests with
    | [] -> ()
    | _ :: _ ->
        let budget = c.client_allowed_to_write ++ (Int64.of_int allowed) in
        let granted =
          allowed > 0 && can_write_len sock (Int64.to_int budget) in
        if granted then begin
          CommonUploads.consume_bandwidth allowed;
          c.client_allowed_to_write <- budget
        end;
        iter_upload sock c
  in
  do_if_connected c.client_sock serve_requests
(** Resume a file: trackers whose status is [Disabled_failure] or [Disabled]
    are set back to [Enabled] ([Enabled] and [Disabled_mld] are left as they
    are), then the tracker is contacted asking for sources. Exceptions raised
    by that announce are ignored.
    @param file the file to resume *)
let file_resume file =
  let reenable t =
    match t.tracker_status with
    | Disabled_failure _ | Disabled _ -> t.tracker_status <- Enabled
    | Enabled | Disabled_mld _ -> ()
  in
  List.iter reenable file.file_trackers;
  (try talk_to_tracker file true with _ -> ())
(** Send info to tracker when stopping a file.
    Only done when [file.file_tracker_connected] is set: the "stopped" event
    is passed to [connect_trackers] without asking for sources, and the reply
    callback logs the answer and clears [file.file_tracker_connected].
    @param file the file we want to stop *)
let file_stop file =
  let on_tracker_reply _ _ =
    lprintf_file_nl (as_file file) "Tracker return: stopped %s" file.file_name;
    file.file_tracker_connected <- false
  in
  if file.file_tracker_connected then
    connect_trackers file "stopped" false on_tracker_reply
1794 Create the 'hooks'
1796 let _ =
1797 client_ops.op_client_can_upload <- client_can_upload;
1798 file_ops.op_file_resume <- file_resume;
1799 file_ops.op_file_recover <- file_resume;
1800 file_ops.op_file_pause <- (fun file ->
1801 Hashtbl.iter (fun _ c ->
1802 match c.client_sock with
1803 Connection sock -> close sock Closed_by_user
1804 | _ -> ()
1805 ) file.file_clients;
1806 (*When a file is paused we consider it is stopped*)
1807 file_stop file
1809 file_ops.op_file_queue <- file_ops.op_file_pause;
1810 client_ops.op_client_enter_upload_queue <- (fun c ->
1811 if !verbose_msg_clients then
1812 lprintf_nl "Client %d: client_enter_upload_queue" (client_num c);
1813 ready_for_upload (as_client c));
1814 network.op_network_connected_servers <- (fun _ -> []);