patch 7144
[mldonkey.git] / src / networks / bittorrent / bTClients.ml
blobb627daf7b7ae8bde43e066c56a3c56b5bd7b1f0a
1 (* Copyright 2001, 2002 b52_simon :), b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
(** Functions used in client<->client communication
    and also in client<->tracker communication.
*)

(** A peer (or client) is always a remote peer in this file.
    A Piece is a portion of the file associated with a hash (sha1).
    In mldonkey a piece is referred to as a block inside the swarming system.
    A SubPiece is a portion of a piece (without hash) which can be
    sent to or downloaded from a peer.
    In mldonkey a SubPiece is referred to as a range inside the swarming system.
    @see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for some
    unofficial (but more detailed) specs.
*)
35 open Int64ops
36 open AnyEndian
37 open BigEndian
38 open Printf2
39 open Md4
40 open Options
41 open BasicSocket
42 open TcpBufferedSocket
43 open Ip_set
45 open CommonShared
46 open CommonUploads
47 open CommonOptions
48 open CommonDownloads
49 open CommonInteractive
50 open CommonClient
51 open CommonComplexOptions
52 open CommonTypes
53 open CommonFile
54 open CommonSwarming
55 open CommonGlobals
56 open CommonDownloads
58 open BTRate
59 open BTTypes
60 open BTProtocol
61 open BTOptions
62 open BTGlobals
63 open BTComplexOptions
64 open BTChooser
65 open BTStats
66 open TcpMessages
68 module VB = VerificationBitmap
(** Success status line written without an HTTP version number. *)
let http_ok : string = "HTTP 200 OK"

(** Success status line in its HTTP/1.1 form. *)
let http11_ok : string = "HTTP/1.1 200 OK"
(** Result of the latest uploader selection (filled by [choose_uploaders]). *)
let next_uploaders : BTTypes.client list ref = ref []

(** Uploader list that was in effect after the previous selection. *)
let current_uploaders : BTTypes.client list ref = ref []
(** Check that an announced peer is valid and, if so, record it.

    The peer is registered through [new_client] only when all of the
    following hold: [id] differs from our own [client_uid], [ip] differs
    from [Ip.null], [port] is not 0, and [!Ip.banned] reports no ban
    reason for [(ip, country code)].  A ban reason is logged when
    [verbose_connect] is set.

    @param file the file the peer was announced for
    @param id   peer id of the remote peer
    @param ip   address of the remote peer
    @param port port of the remote peer *)
let maybe_new_client file id ip port =
  let cc = Geoip.get_country_code_option ip in
  (* Evaluated last, and only when the cheaper checks passed. *)
  let passes_blocklist () =
    match !Ip.banned (ip, cc) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_file_nl (as_file file) "%s:%d blocked: %s" (Ip.to_string ip) port reason;
        false
  in
  if id <> !!client_uid
    (* Structural comparison: the physical [!=] is true for any freshly
       built boxed address, so it never filtered out null addresses. *)
    && ip <> Ip.null
    && port <> 0
    && passes_blocklist ()
  then
    ignore (new_client file id (ip,port) cc);
  (* Logged for every announced peer, whether it was recorded or not. *)
  if !verbose_sources > 1 then
    lprintf_file_nl (as_file file) "Received %s:%d" (Ip.to_string ip) port
(** Hook called with a file once a UDP tracker announce that asked for
    sources has been processed.  The initial value is a placeholder that
    fails an assertion; it has to be replaced before the first call. *)
let resume_clients_hook = ref (fun _file -> assert false)
97 include struct
99 (* open modules locally *)
100 open BTUdpTracker
101 open UdpSocket
(** Textual tag for a UDP socket event (used by debug traces). *)
let string_of_event event =
  match event with
  | READ_DONE -> "READ_DONE"
  | WRITE_DONE -> "WRITE_DONE"
  | CAN_REFILL -> "CAN_REFILL"
  | BASIC_EVENT (CLOSED reason) -> "CLOSED " ^ (string_of_reason reason)
  | BASIC_EVENT RTIMEOUT -> "RTIMEOUT"
  | BASIC_EVENT WTIMEOUT -> "WTIMEOUT"
  | BASIC_EVENT LTIMEOUT -> "LTIMEOUT"
  | BASIC_EVENT CAN_READ -> "CAN_READ"
  | BASIC_EVENT CAN_WRITE -> "CAN_WRITE"
(** Announce to a UDP tracker and process its reply.

    [host] is resolved asynchronously; then a fresh UDP socket performs
    connect request -> connect response -> announce request ->
    announce response.  When the announce response arrives, the tracker
    record [t] and [file] are updated, the returned peers are passed to
    [maybe_new_client] (only if [need_sources]) and the socket is closed.
    Exceptions are logged, never propagated to the caller.

    FIXME (original authors): apart from parsing, this should perform
    everything that talk_to_tracker's inner function does; refactor both.
    FIXME? (original authors): better create a single global UDP socket,
    use it for all tracker requests and distinguish trackers by txn.

    @param host tracker host name
    @param port tracker port
    @param args announce parameters as (key, value) strings; the keys
           "info_hash", "peer_id", "downloaded", "left", "uploaded" and
           "port" are looked up unconditionally
    @param file file being announced
    @param t tracker record updated when the announce reply arrives
    @param need_sources whether peers are requested and recorded *)
let talk_to_udp_tracker host port args file t need_sources =
  let interact ip =
    let on_socket_event s event =
      (* lprintf_nl "udpt got event %s for %s" (string_of_event event) host; *)
      match event with
      | WRITE_DONE | CAN_REFILL -> ()
      | READ_DONE -> assert false (* set_reader prevents this *)
      | BASIC_EVENT (CLOSED _) -> ()
      | BASIC_EVENT (CAN_READ | CAN_WRITE) ->
          assert false (* udpSocket implementation prevents this *)
      | BASIC_EVENT (LTIMEOUT | WTIMEOUT | RTIMEOUT) ->
          close s (Closed_for_error "udpt timeout")
    in
    let socket = create (Ip.to_inet_addr !!client_bind_addr) 0 on_socket_event in
    (* Install [f] as the reader; any exception it raises is logged and
       closes the socket. *)
    let on_datagram f =
      set_reader socket (fun _ ->
        try f () with exn ->
          lprintf_nl "udpt interact exn %s" (Printexc2.to_string exn);
          close socket (Closed_for_exception exn))
    in
    BasicSocket.set_wtimeout (sock socket) 60.;
    BasicSocket.set_rtimeout (sock socket) 60.;
    let connect_txn = Random.int32 Int32.max_int in
    (* lprintf_nl "udpt txn %ld for %s" connect_txn host; *)
    write socket false (connect_request connect_txn) ip port;
    on_datagram (fun () ->
      let connect_packet = read socket in
      let conn = connect_response connect_packet.udp_content connect_txn in
      (* lprintf_nl "udpt connection_id %Ld for %s" conn host; *)
      let announce_txn = Random.int32 Int32.max_int in
      (* lprintf_nl "udpt txn' %ld for host %s" announce_txn host; *)
      let int64_arg key = Int64.of_string (List.assoc key args) in
      (* Numeric event code; a missing "event" argument maps to 0l. *)
      let event_code () =
        match (try List.assoc "event" args with Not_found -> "") with
        | "completed" -> 1l
        | "started" -> 2l
        | "stopped" -> 3l
        | "" -> 0l
        | s -> lprintf_nl "udpt event %s? for %s" s host; 0l
      in
      let announced_ip () =
        if !!force_client_ip then (Int64.to_int32 (Ip.to_int64 !!set_client_ip)) else 0l
      in
      (* -1l when "numwant" is absent or unparsable, 0l when no peers are wanted *)
      let wanted_peers () =
        if need_sources then
          (try Int32.of_string (List.assoc "numwant" args) with _ -> -1l)
        else 0l
      in
      let req = announce_request conn announce_txn
        ~info_hash:(List.assoc "info_hash" args)
        ~peer_id:(List.assoc "peer_id" args)
        (int64_arg "downloaded",int64_arg "left",int64_arg "uploaded")
        (event_code ())
        ~ip:(announced_ip ())
        ~numwant:(wanted_peers ())
        (int_of_string (List.assoc "port" args))
      in
      write socket false req ip port;
      on_datagram (fun () ->
        let announce_packet = read socket in

        (* These fields are set before the reply is parsed. *)
        t.tracker_last_conn <- last_time ();
        file.file_tracker_connected <- true;
        t.tracker_interval <- 600;
        t.tracker_min_interval <- 600;
        if need_sources then t.tracker_last_clients_num <- 0;

        let (interval,clients) = announce_response announce_packet.udp_content announce_txn in
        if !verbose_msg_servers then
          lprintf_nl "udpt got interval %ld clients %d for host %s" interval (List.length clients) host;
        if interval > 0l then
          begin
            t.tracker_interval <- Int32.to_int interval;
            if t.tracker_min_interval > t.tracker_interval then
              t.tracker_min_interval <- t.tracker_interval
          end;
        if need_sources then
          List.iter (fun (raw_ip,peer_port) ->
            (* mask to 32 bits so a negative int32 does not sign-extend *)
            let peer_ip = Ip.of_int64 (Int64.logand 0xFFFFFFFFL (Int64.of_int32 raw_ip)) in
            (* lprintf_nl "udpt got %s:%d" (Ip.to_string peer_ip) peer_port; *)
            t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
            maybe_new_client file Sha1.null peer_ip peer_port
          ) clients;
        close socket Closed_by_user;
        if !verbose_msg_servers then
          lprintf_nl "udpt interact done for %s" host;
        if need_sources then !resume_clients_hook file
      ))
  in
  try
    if !verbose_msg_servers then
      lprintf_nl "udpt start with %s:%d" host port;
    Ip.async_ip host
      (fun ip ->
        (* lprintf_nl "udpt resolved %s to ip %s" host (Ip.to_string ip); *)
        try interact ip with exn -> lprintf_nl "udpt interact exn %s" (Printexc2.to_string exn))
      (fun n ->
        if !verbose_msg_servers then
          lprintf_nl "udpt failed to resolve %s (%d)" host n)
  with
  | exn ->
      lprintf_nl "udpt start exn %s" (Printexc2.to_string exn)
212 end (* include *)
(** Announce [file] to each of its enabled trackers.

    @param file The file concerned by the request
    @param event Event (as a string) to send to the tracker:
      'completed' if the file is complete, 'started' for the first
      connection to this tracker or 'stopped' for a clean stop of the file.
      Any other string sends no event argument (regular re-announce).
      Be careful with the spelling of this event.
    @param need_sources when false, "numwant=0" is sent so that no peers
      are requested
    @param f Callback for HTTP trackers, called as [f t fileres] with the
      tracker and the value handed over by [Http_client.wget].  UDP
      trackers are handled by [talk_to_udp_tracker] and do not use [f].

    A tracker is only contacted when the re-ask delay does not apply
    ('completed' / 'stopped'), when no tracker was reached yet, when its
    interval has elapsed, or when the file is downloading with fewer than
    [!!ask_tracker_threshold] clients and the minimum re-ask interval has
    elapsed.

    If no tracker is enabled, trackers in state [Disabled _] are
    re-enabled; if still none is usable and the file is not paused, the
    file is paused. *)
let connect_trackers file event need_sources f =

  (* reset session statistics when sending 'started' event *)
  if event = "started" then
    begin
      file.file_session_uploaded <- Int64.zero;
      file.file_session_downloaded <- Int64.zero;
    end;

  (* event argument, whether the re-ask delay must be honoured, bytes left *)
  let args,must_check_delay, left =
    match file.file_swarmer with
    | None ->
        begin
          match event with
          | "started" -> [("event", "started")],true,zero
          | "stopped" -> [("event", "stopped")],false,zero
          | _ -> [],true,zero
        end
    | Some swarmer ->
        let local_downloaded = CommonSwarming.downloaded swarmer in
        let left = file_size file -- local_downloaded in
        begin
          match event with
          | "completed" -> [("event", "completed")],false,zero
          | "started" -> [("event", "started")],true,left
          | "stopped" -> [("event", "stopped")],false,left
          | _ -> [],true,left
        end
  in

  let args = ("no_peer_id", "1") :: ("compact", "1") :: args in
  let args =
    if not need_sources then
      ("numwant", "0") :: args
    else if !!numwant > -1 then
      ("numwant", string_of_int !!numwant) :: args
    else
      args
  in
  let args =
    if !!send_key then ("key", Sha1.to_hexa !!client_uid) :: args else args
  in
  let args =
    if !!force_client_ip then ("ip", Ip.to_string !!set_client_ip) :: args else args
  in
  let args =
    ("info_hash", Sha1.direct_to_string file.file_id) ::
    ("peer_id", Sha1.direct_to_string !!client_uid) ::
    ("port", string_of_int !!client_port) ::
    ("uploaded", Int64.to_string file.file_session_uploaded) ::
    ("downloaded", Int64.to_string file.file_session_downloaded) ::
    ("left", Int64.to_string left) ::
    args
  in

  let enabled_trackers =
    let usable () = List.filter tracker_is_enabled file.file_trackers in
    match usable () with
    | _ :: _ as trackers -> trackers
    | [] ->
        (* if there is no tracker left, do something ? *)
        if !verbose_msg_servers then
          lprintf_nl "No trackers left for %s, reenabling all of them..." (file_best_name (as_file file));
        List.iter (fun t ->
          match t.tracker_status with
          (* only re-enable after normal error *)
          | Disabled _ -> t.tracker_status <- Enabled
          | _ -> ()) file.file_trackers;
        let trackers = usable () in
        begin
          match trackers with
          | [] when (file_state file) <> FilePaused ->
              file_pause (as_file file) (CommonUserDb.admin_user ());
              lprintf_file_nl (as_file file) "Paused %s, no usable trackers left" (file_best_name (as_file file))
          | _ -> ()
        end;
        trackers
  in

  (* if we have too few sources we may ask the tracker before the interval *)
  let request_is_due t =
    not must_check_delay
    || not file.file_tracker_connected
    || t.tracker_last_conn + t.tracker_interval < last_time()
    || ( file.file_clients_num < !!ask_tracker_threshold
         (* structural equality; [==] is physical equality *)
         && (file_state file) = FileDownloading
         && (if t.tracker_min_interval > !!min_tracker_reask_interval then
               t.tracker_last_conn + t.tracker_min_interval < last_time()
             else
               t.tracker_last_conn + !!min_tracker_reask_interval < last_time() ))
  in

  List.iter (fun t ->
    if request_is_due t then
      begin
        (* if we already tried to connect but failed, disable tracker, but allow re-enabling *)
        (* FIXME t.tracker_last_conn < 1 only at first connect, so later failures will stay undetected! *)
        if file.file_tracker_connected && t.tracker_last_clients_num = 0 && t.tracker_last_conn < 1 then
          begin
            if !verbose_msg_servers then
              lprintf_nl "Request error from tracker: disabling %s" (show_tracker_url t.tracker_url);
            t.tracker_status <- Disabled (intern "MLDonkey: Request error from tracker")
          end
        (* Send request to tracker *)
        else
          let args =
            if String.length t.tracker_id > 0 then ("trackerid", t.tracker_id) :: args else args
          in
          let args =
            if String.length t.tracker_key > 0 then ("key", t.tracker_key) :: args else args
          in
          if !verbose_msg_servers then
            lprintf_nl "connect_trackers: connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i numwant:%s file: %s"
              (string_of_bool file.file_tracker_connected)
              t.tracker_id t.tracker_key t.tracker_last_clients_num
              (t.tracker_last_conn - last_time())
              (* [List.assoc] only signals absence with [Not_found] *)
              (try List.assoc "numwant" args with Not_found -> "_") file.file_name;

          match t.tracker_url with
          | `Http url ->
              let module H = Http_client in
              let r = {
                H.basic_request with
                H.req_url = Url.of_string ~args: args url;
                H.req_proxy = !CommonOptions.http_proxy;
                H.req_user_agent = get_user_agent ();
                (* #4541 [egs] supports redirect *)
                H.req_max_retry = !!max_tracker_redirect;
              } in

              if !verbose_msg_servers then
                lprintf_nl "Request sent to tracker %s for file: %s"
                  url file.file_name;
              H.wget r
                (fun fileres ->
                  t.tracker_last_conn <- last_time ();
                  file.file_tracker_connected <- true;
                  f t fileres)
          | `Other _ -> assert false (* should have been disabled *)
          | `Udp (host,port) -> talk_to_udp_tracker host port args file t need_sources
      end

    else
      if !verbose_msg_servers then
        lprintf_nl "Request NOT sent to tracker %s - next request in %ds for file: %s"
          (show_tracker_url t.tracker_url) (t.tracker_interval - (last_time () - t.tracker_last_conn)) file.file_name
  ) enabled_trackers
(** Give client [c] a normal upload slot for its file, refresh both of its
    rate meters, stamp [client_last_optimist] with the current time, enter
    it into the upload queue and send it [Unchoke]. *)
let start_upload c =
  let generic = as_client c in
  set_client_upload generic (as_file c.client_file);
  set_client_has_a_slot generic NormalSlot;
  Rate.update_no_change c.client_downloaded_rate;
  Rate.update_no_change c.client_upload_rate;
  c.client_last_optimist <- last_time();
  client_enter_upload_queue generic;
  send_client c Unchoke
(** Decide which peers will be uploaders.

    The next uploader list comes from [choose_uploaders current_files].
    Current uploaders missing from it lose their slot ([NoSlot]); no Choke
    is sent here, they finish their download and are choked on their next
    request.  Clients that are new in the list go through [start_upload]
    (which sends Unchoke).  Finally the next list becomes the current one. *)
let recompute_uploaders () =
  if !verbose_upload then lprintf_nl "recompute_uploaders";
  next_uploaders := choose_uploaders current_files;
  (* a current uploader that was not re-elected loses its slot *)
  let revoke_slot_if_dropped c =
    if not (List.mem c !next_uploaders) then
      set_client_has_a_slot (as_client c) NoSlot
  in
  (* clients that are already uploaders are left untouched *)
  let start_if_new c =
    if not (List.mem c !current_uploaders) then start_upload c
  in
  List.iter revoke_slot_if_dropped !current_uploaders;
  List.iter start_if_new !next_uploaders;
  current_uploaders := !next_uploaders
405 (****** Fabrice: why are clients which are disconnected removed ???
406 These clients might still be useful to reconnect to, no ? *)
(** This function is called when a client is disconnected
    (be it by our side or its side).

    - [NoConnection]: nothing to do.
    - [ConnectionWaiting token]: the pending connection is cancelled.
    - [Connection sock]: the socket is closed, the client is marked
      disconnected, its session counters are reset, its uploader (if a
      bitfield was registered) is unregistered from the swarmer, and the
      client is discarded with [remove_client].  If it was one of the
      current uploaders, uploaders are recomputed immediately
      (see [recompute_uploaders]).

    Cleanup is best effort: exceptions raised after the socket was closed
    are not propagated; they are logged when [verbose_msg_clients] is set.

    @param c The client to disconnect
    @param reason The reason for the disconnection (see in BasicSocket.ml) *)
let disconnect_client c reason =
  if !verbose_msg_clients then
    lprintf_nl "Client %d: disconnected: %s" (client_num c) (string_of_reason reason);
  (* Report an exception that is deliberately not propagated. *)
  let log_ignored step exn =
    if !verbose_msg_clients then
      lprintf_nl "Client %d: ignored exception in %s: %s"
        (client_num c) step (Printexc2.to_string exn)
  in
  match c.client_sock with
  | NoConnection -> ()
  | ConnectionWaiting token ->
      cancel_token token;
      c.client_sock <- NoConnection
  | Connection sock ->
      close sock reason;
      begin
        try
          (* List.iter (fun r -> CommonSwarming.free_range r) c.client_ranges; *)
          set_client_disconnected c reason;
          c.client_session_downloaded <- 0L;
          c.client_session_uploaded <- 0L;
          (try if c.client_good then count_seen c
           with exn -> log_ignored "count_seen" exn);
          (* The original authors left further field resets (ranges, block,
             chunks, interest flags, ...) disabled here as "not needed ??". *)
          if c.client_registered_bitfield then
            begin
              match c.client_uploader with
              | None -> ()
              | Some up ->
                  c.client_uploader <- None;
                  (* If the client registered a bitfield then
                     we must unregister him to update the swarmer
                     (Useful for availability) *)
                  CommonSwarming.unregister_uploader up
            end;
          (* Don't test whether the client has an upload slot: it was
             already taken away by set_client_disconnected above. *)
          let was_uploader = List.mem c !current_uploaders in
          (*BTGlobals.remove_client*)
          remove_client c;
          if was_uploader then recompute_uploaders ()
        with exn -> log_ignored "disconnect cleanup" exn
      end
(** Disconnect the clients of a file.

    When the file is paused or cancelled every client is disconnected.
    Otherwise clients that hold an upload slot or are interested in our
    data are kept and only the others are disconnected.

    @param file The file whose clients must be disconnected *)
let disconnect_clients file =
  let must_keep =
    match file_state file with
    | FilePaused | FileCancelled -> false
    | _ -> true
  in
  (* Iterate over a snapshot taken in table order: [disconnect_client] may
     call [remove_client], which presumably edits [file.file_clients]
     (TODO confirm), and the result of changing a [Hashtbl] from inside
     [Hashtbl.iter] is unspecified. *)
  let clients =
    List.rev (Hashtbl.fold (fun _ c acc -> c :: acc) file.file_clients [])
  in
  List.iter (fun c ->
    if not ( must_keep && (client_has_a_slot (as_client c) || c.client_interested)) then
      begin
        if !verbose_msg_clients then
          lprintf_file_nl (as_file file) "disconnect since download is finished";
        disconnect_client c Closed_by_user
      end
  ) clients
(** What to do when a file is finished.  Does nothing unless [file] is
    (physically) one of [!current_files].
    @param file the finished file *)
let download_finished file =
  let on_tracker_reply _ _ =
    lprintf_file_nl (as_file file) "Tracker return: completed %s" file.file_name;
    ()
  in
  if List.memq file !current_files then
    begin
      (* must be called before the swarmer gets removed from the file *)
      connect_trackers file "completed" false on_tracker_reply;
      (*CommonComplexOptions.file_completed*)
      file_completed (as_file file);
      (* The swarmer is of no use anymore for this file: drop it. *)
      CommonSwarming.remove_swarmer file.file_swarmer;
      file.file_swarmer <- None
      (* At this point, the file state is FileDownloaded. We should not remove
         the file, because we continue to upload. *)
    end
(** Run [download_finished file] when [CommonSwarming.check_finished]
    reports the swarmer as finished (a file is finished once all blocks
    are verified).
    @param swarmer the swarmer of the file
    @param file The file to check status *)
let check_finished swarmer file =
  match CommonSwarming.check_finished swarmer with
  | true -> download_finished file
  | false -> ()
(** Masks of the 8 bits of a byte, most significant bit first. *)
let bits = [| 0x80; 0x40; 0x20; 0x10; 0x08; 0x04; 0x02; 0x01 |]

(* Check/set bits in strings (bittorrent format) *)

(** [is_bit_set s n] tells whether bit [n] of [s] is set; bit 0 is the
    most significant bit of the first byte. *)
let is_bit_set s n =
  let byte = Char.code s.[n lsr 3] in
  let mask = bits.(n land 7) in
  byte land mask <> 0
(** [set_bit s n] sets bit [n] of [s] in place; bit 0 is the most
    significant bit of the first byte. *)
let set_bit s n =
  let byte_index = n lsr 3 in
  let updated = Char.code s.[byte_index] lor bits.(n land 7) in
  s.[byte_index] <- Char.unsafe_chr updated
(* The official client seems to use max_range_request 5 and max_range_len 2^14 *)
(** How many requests may sit in the 'pipeline' of one client. *)
let max_range_requests : int = 5
541 (* How much bytes we can request in one Piece *)
(** Send an Interested message to a client, but only if the client is
    interesting and the message was not already sent; the sent flag is
    set before sending.
    @param c The client to send Interested *)
let send_interested c =
  match c.client_interesting, c.client_alrd_sent_interested with
  | true, false ->
      c.client_alrd_sent_interested <- true;
      send_client c Interested
  | _ -> ()
(** Send a Bitfield message to a client.

    Without a swarmer (this must be a seeded file) every chunk is marked
    as present; otherwise exactly the chunks the swarmer reports as
    verified are marked.
    @param c The client to send the Bitfield message *)
let send_bitfield c =
  (* one bit per chunk, rounded up to whole bytes *)
  let empty_field nbits = String.make ((nbits + 7) / 8) '\000' in
  let payload =
    match c.client_file.file_swarmer with
    | None ->
        (* This must be a seeded file... *)
        if !verbose_download then
          lprintf_nl "Sending completed verified bitmap";
        let nchunks = Array.length c.client_file.file_chunks in
        let field = empty_field nchunks in
        for i = 0 to nchunks - 1 do
          set_bit field i
        done;
        field
    | Some swarmer ->
        let verified = CommonSwarming.chunks_verified_bitmap swarmer in
        if !verbose_download then
          lprintf_nl "Sending verified bitmap: [%s]" (VB.to_string verified);
        let field = empty_field (VB.length verified) in
        VB.iteri (fun i state ->
          if state = VB.State_verified then set_bit field i) verified;
        field
  in
  send_client c (BitField payload)
(** Mutable integer counter, initially 0. *)
let counter : int ref = ref 0
(** Decode the reserved bytes [rbits] of a handshake into the extension
    flags of client [c].  Bytes 0, 5 and 7 are read. *)
let parse_reserved rbits c =
  let flag ~byte ~mask = (Char.code rbits.[byte]) land mask <> 0 in

  c.client_dht <- flag ~byte:7 ~mask:0x01;
  c.client_cache_extension <- flag ~byte:7 ~mask:0x02;
  c.client_fast_extension <- flag ~byte:7 ~mask:0x04;

  c.client_utorrent_extension <- flag ~byte:5 ~mask:0x10;

  c.client_azureus_messaging_protocol <- flag ~byte:0 ~mask:0x80
(** Parse the first message (handshake) that a client sends.

    Looks the file up by [file_id], creates or reuses the client, binds
    [sock] to it, answers with our own handshake when it was not sent yet,
    sends our bitfield and installs [client_to_client] as the handler of
    all further messages.

    @param counter client num (only used for logging)
    @param cc reference to a pair (expected client if any, country code);
      updated when a new client is created
    @param init_sent whether we already sent this client the handshake message
    @param gconn connection record whose [gconn_handler] gets replaced
    @param sock The socket we use for this client
    @param proto Unused
    @param rbits reserved bytes of the handshake (see [parse_reserved])
    @param file_id The file hash (sha1) of the file involved in this exchange

    When [file_id] is unknown ([Not_found]) the event is only logged; any
    other exception is logged, closes [sock] and is raised again.
    (removed: @param peer_id The hash (sha1) of the client. Should be checked) *)
let rec client_parse_header counter cc init_sent gconn sock
    (proto, rbits, file_id) =
  try
    set_lifetime sock 600.;
    if !verbose_msg_clients then
      lprintf_nl "client_parse_header %d" counter;

    let file = Hashtbl.find files_by_uid file_id in
    if !verbose_msg_clients then
      lprintf_file_nl (as_file file) "file found";

    let expected, country = !cc in
    let c =
      match expected with
      | Some known ->
          (* Does it happen that this c was already used to connect successfully?
             If yes then this must happen: *)
          known.client_received_peer_id <- false;
          if country <> None && known.client_country_code = None then
            known.client_country_code <- country;
          (* The client could have had Sha1.null as peer_id/uid; comparing
             and recovering clients by peer id is left "to be done, later"
             by the original authors. *)
          known
      | None ->
          let fresh = new_client file Sha1.null (TcpBufferedSocket.peer_addr sock) country in
          if !verbose_connect then lprintf_file_nl (as_file file) "Client %d: incoming connection" (client_num fresh);
          cc := ((Some fresh), country);
          fresh
    in

    if !verbose_msg_clients then begin
      let (ip,port) = c.client_host in
      lprintf_nl "Client %d: Connected from %s:%d" (client_num c)
        (Ip.to_string ip) port;
    end;

    parse_reserved rbits c;

    (* bind [sock] to the client, replacing whatever it had before *)
    (match c.client_sock with
     | NoConnection ->
         if !verbose_msg_clients then begin
           let (ip,port) = c.client_host in
           lprintf_nl "No connection to client (%s:%d)!!!" (Ip.to_string ip) port;
         end;
         c.client_sock <- Connection sock
     | ConnectionWaiting token ->
         cancel_token token;
         if !verbose_msg_clients then
           lprintf_nl "Waiting for connection to client !!!";
         c.client_sock <- Connection sock
     | Connection previous ->
         (* physical comparison: is it a different socket object? *)
         if previous != sock then begin
           if !verbose_msg_clients then
             lprintf_nl "CLIENT %d: IMMEDIATE RECONNECTION" (client_num c);
           disconnect_client c (Closed_for_error "Reconnected");
           c.client_sock <- Connection sock
         end
    );

    set_client_state (c) (Connected (-1));
    if not init_sent then
      begin
        c.client_incoming <- true;
        send_init !!client_uid file_id sock;
      end;
    connection_ok c.client_connection_control;
    if !verbose_msg_clients then
      lprintf_nl "file and client found";
    (* if not c.client_incoming then *)
    send_bitfield c;
    c.client_blocks_sent <- file.file_blocks_downloaded;
    (* TODO !!! : send interested if and only if we are interested
       -> we must receive at least the other peer's bitfield.
       in common swarmer -> compare : partition -> partition -> bool *)

    set_rtimeout sock !!client_timeout;
    (* Once the header is parsed successfully, client_to_client becomes
       the function used whenever a message is read *)
    let handle_messages s =
      bt_handler TcpMessages.parsing (client_to_client c) c s
    in
    gconn.gconn_handler <- Reader (fun _gconn s -> handle_messages s);

    let b = TcpBufferedSocket.buf sock in
    (* The receive buffer is normally not empty now, lets parse the rest, most likely PeerID *)
    if b.len <> 0 then
      ignore (handle_messages sock);

    (* Some newer clients send more opcodes in their handshake packet, lets parse them now.
       Using "while b.len <> 0 do ... done" is not possible here because libtorrent clients
       send unparsable five extra bytes after their PeerID which would result into a loop *)
    if b.len <> 0 then
      ignore (handle_messages sock)

  with
  | Not_found ->
      let (ip,port) = (TcpBufferedSocket.peer_addr sock) in
      if !verbose_unexpected_messages then
        lprintf_nl "Client %s:%d requested a file that is not shared [%s]"
          (Ip.to_string ip) port (Sha1.to_hexa file_id)
  | e ->
      lprintf_nl "Exception %s in client_parse_header" (Printexc2.to_string e);
      close sock (Closed_for_exception e);
      raise e
(** Update the bitmap of a client. Unclear if it is still useful.

    Makes sure the client has an uploader registered with the swarmer of
    its file and a chunk bitmap; then, if new chunks were announced, marks
    them in the bitmap and hands the bitmap to the swarmer.  The file must
    have a swarmer (asserted).
    @param c The client which we want to update. *)
and update_client_bitmap c =
  let swarmer =
    match c.client_file.file_swarmer with
    | Some sw -> sw
    | None -> assert false
  in

  (* register the client as an uploader on first use *)
  let uploader =
    match c.client_uploader with
    | Some registered -> registered
    | None ->
        let registered =
          CommonSwarming.register_uploader swarmer (as_client c)
            (AvailableIntervals []) in
        c.client_uploader <- Some registered;
        registered
  in

  (* create an all-false bitmap, one bit per partition entry, on first use *)
  let have =
    match c.client_bitmap with
    | Some existing -> existing
    | None ->
        let size = CommonSwarming.partition_size swarmer in
        let created = Bitv.create size false in
        c.client_bitmap <- Some created;
        created
  in

  if c.client_new_chunks <> [] then begin
    let announced = c.client_new_chunks in
    c.client_new_chunks <- [];
    List.iter (fun n -> Bitv.set have n true) announced;
    CommonSwarming.update_uploader_intervals uploader (AvailableBitv have)
  end
(** In this function we decide which piece we must request from the client.
    @param sock Socket of the client
    @param c The client
*)
(* get_from_client sock c: request one more range (sub-piece) from client c.

   Nothing is done unless fewer than max_range_requests ranges are
   outstanding in c.client_ranges_sent, the file is in state
   FileDownloading and the client has not choked us.  c.client_uploader
   must be set (asserted).  sock is not referenced in this function.

   The chunk to work on is c.client_chunk; when it is unset the client
   bitmap is refreshed, one chunk verification is attempted and
   CommonSwarming.find_blocks chooses a new chunk.  The range is
   c.client_range_waiting if one is pending, otherwise the result of
   CommonSwarming.find_range.  A range longer than max_range_len is split:
   the first max_range_len bytes are requested now and the remainder is
   stored in c.client_range_waiting.  The range is appended to
   c.client_ranges_sent and sent as Request (chunk, offset inside the
   piece, length).

   Not_found from the range search drops the current chunk and retries
   with another one; Not_found from the chunk search recomputes the
   swarmer bitmap, runs check_finished and ends the function after an
   optional error log (only when the swarmer is not finished and
   verbose_download is set).

   NOTE(review): this text comes from a listing that lost every line of
   three characters or fewer (in / end / try / parentheses), so statement
   grouping inside the verbose_swarming dump cannot be confirmed from
   here. *)
783 and get_from_client sock (c: client) =
784 let file = c.client_file in
785 (* Check if there's not enough requests in the 'pipeline'
786 and if a request can be send (not choked and file is downloading) *)
787 if List.length c.client_ranges_sent < max_range_requests
788 && file_state file = FileDownloading
789 && (c.client_choked == false)
790 then
791 (* num is the number of the piece, x and y are the position
792 of the subpiece in the piece(!), r is a (CommonSwarmer) range *)
794 let up = match c.client_uploader with
795 None -> assert false
796 | Some up -> up in
797 let swarmer = CommonSwarming.uploader_swarmer up in
801 let num, x,y, r =
803 if !verbose_msg_clients then
804 lprintf_file_nl (as_file file) "CLIENT %d: Finding new range to send" (client_num c);
806 if !verbose_swarming then begin
807 lprintf_n "Current download:\n Current chunks: ";
810 List.iter (fun (x,y) -> lprintf "%Ld-%Ld " x y) c.client_chunks
811 with _ -> lprintf "No Chunks";
813 lprint_newline ();
815 lprintf_n "Current ranges: ";
817 List.iter (fun (p1,p2, r) ->
818 let (x,y) = CommonSwarming.range_range r in
819 lprintf "%Ld-%Ld[%Ld-%Ld] " p1 p2 x y
820 ) c.client_ranges_sent;
822 match c.client_range_waiting with
823 | None -> ()
824 | Some (x,y,r) -> lprintf "Waiting %Ld-%Ld" x y;
826 lprint_newline ();
828 lprintf_n "Current blocks: ";
830 match c.client_chunk with
831 | None -> lprintf "none"
832 | Some (chunk, blocks) -> List.iter (fun b ->
833 CommonSwarming.print_block b.up_block) blocks;
835 lprint_newline ();
837 lprintf_file_nl (as_file file) "Finding Range:";
838 end;
842 (*We must find a block to request first, and then
843 some range inside this block
846 let rec iter () =
848 match c.client_chunk with
850 | None ->
852 if !verbose_swarming then lprintf_file_nl (as_file file) "No block";
853 update_client_bitmap c;
854 (try CommonSwarming.verify_one_chunk swarmer with _ -> ());
855 (*Find a free block in the swarmer*)
856 let chunk, blocks = CommonSwarming.find_blocks up in
857 if !verbose_swarming then begin
858 lprintf_n "Blocks Found: "; List.iter (fun b ->
859 CommonSwarming.print_block b.up_block) blocks;
860 lprint_newline ()
861 end;
862 c.client_chunk <- Some (chunk, blocks);
864 (*We put the found block in client_block to
865 request range in this block. (Useful for
866 not searching each time a new block)
869 iter ()
871 | Some (chunk, blocks) ->
873 if !verbose_swarming then begin
874 lprintf_n "Current Blocks: "; List.iter (fun b ->
875 CommonSwarming.print_block b.up_block) blocks;
876 lprint_newline ()
877 end;
880 (*Given a block find a range inside*)
881 let (x,y,r) =
882 match c.client_range_waiting with
883 | Some (x,y,r) ->
884 c.client_range_waiting <- None;
885 (x,y,r)
886 | None ->
887 CommonSwarming.find_range up (min max_range_len file.file_piece_size)
890 let (x,y,r) =
892 if y -- x > max_range_len then begin
893 c.client_range_waiting <- Some (x ++ max_range_len, y, r);
894 (x, x ++ max_range_len, r)
895 end else
896 (x,y,r)
(* record the range as outstanding, after the ranges already requested *)
899 c.client_ranges_sent <- c.client_ranges_sent @ [x,y, r];
900 (* CommonSwarming.alloc_range r; *)
902 (* naughty, naughty, was computing a block number instead of a chunk
903 number. Only matters with merged downloads, and even then other
904 clients didn't seem to care (?), so the bug remained hidden *)
905 if !verbose_swarming then
906 lprintf_file_nl (as_file file) "Asking %d For Range %Ld-%Ld" chunk x y;
(* result: chunk number, start offset inside that piece, length, range *)
908 chunk, x -- file.file_piece_size ** Int64.of_int chunk, y -- x, r
910 with Not_found ->
912 (*If we don't find a range to request inside the block,
913 iter to choose another block*)
914 if !verbose_swarming then
915 lprintf_nl "Could not find range in current block";
916 (* c.client_blocks <- List2.removeq b c.client_blocks; *)
918 c.client_chunk <- None;
920 iter ()
923 iter ()
925 with Not_found ->
926 (*If we don't find a block to request we can check if the
927 file is finished (if there's missing pieces we can't decide
928 that the file is finished because we didn't found
929 a block to ask)
931 if !verbose_swarming then
932 lprintf_nl "Unable to get a block !!";
933 CommonSwarming.compute_bitmap swarmer;
934 check_finished swarmer file;
935 raise Not_found
938 send_client c (Request (num,x,y));
940 if !verbose_msg_clients then
941 lprintf_file_nl (as_file file) "CLIENT %d: Asking %s For Range %Ld-%Ld"
942 (client_num c) (Sha1.to_string c.client_uid) x y
944 with Not_found ->
945 if not (CommonSwarming.check_finished swarmer) && !verbose_download then
946 lprintf_file_nl (as_file file) "BTClient.get_from_client ERROR: can't find a block to download and file is not yet finished for file : %s..." file.file_name
(* client_to_client: react to one decoded peer message [msg] received from
   peer [c] on socket [sock].  The body is a single match on the message
   constructor; the "with e" clause at the very end logs any exception
   together with the message that triggered it. *)
949 (** In this function we match a message sent by a client
950 and react according to this message.
951 @param c The client which sent us a message
952 @param sock The socket used for this client
953 @param msg The message sent by the client
955 and client_to_client c sock msg =
(* Optional trace: peer address, current time, socket read-timeout values
   and the printable form of the message. *)
956 if !verbose_msg_clients then begin
957 let (ip,port) = (TcpBufferedSocket.peer_addr sock) in
958 let (timeout, next) = get_rtimeout sock in
959 lprintf_nl "CLIENT %d(%s:%d): (%d, %d,%d) Received %s"
960 (client_num c) (Ip.to_string ip) port
961 (last_time ())
962 (int_of_float timeout)
963 (int_of_float next)
964 (TcpMessages.to_string msg);
965 end;
967 let file = c.client_file in
969 (* Sending the "Have" message was moved to bTGlobals so this is useless *)
970 (* if c.client_blocks_sent != file.file_blocks_downloaded then begin
971 let rec iter list =
972 match list with
973 [] -> ()
974 | b :: tail when tail == c.client_blocks_sent ->
975 c.client_blocks_sent <- list;
976 let (num,_,_) = CommonSwarming.block_block b in
977 send_client c (Have (Int64.of_int num))
978 | _ :: tail -> iter tail
980 iter file.file_blocks_downloaded
981 end;*)
984 match msg with
985 Piece (num, offset, s, pos, len) ->
986 (*A Piece message contains the data*)
987 set_client_state c (Connected_downloading (file_num file));
988 (*flag it as a good client *)
989 c.client_good <- true;
(* The payload is only stored while the file is in state FileDownloading;
   the bookkeeping after the matching "end;" runs in every state. *)
990 if file_state file = FileDownloading then begin
(* absolute position in the file = offset inside the piece
   + piece index * piece size *)
991 let position = offset ++ file.file_piece_size *.. num in
(* a Piece from a client that has no uploader trips the assertion *)
992 let up = match c.client_uploader with
993 None -> assert false
994 | Some up -> up in
995 let swarmer = CommonSwarming.uploader_swarmer up in
997 if !verbose_msg_clients then
998 (match c.client_ranges_sent with
999 [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
1000 | (p1,p2,r) :: _ ->
1001 let (x,y) = CommonSwarming.range_range r in
1002 lprintf_file_nl (as_file file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])"
1003 (brand_to_string c.client_brand) position len
1004 p1 p2 x y
(* The amount credited to this client is the growth of the swarmer's
   downloaded counter across the call to CommonSwarming.received. *)
1007 let old_downloaded =
1008 CommonSwarming.downloaded swarmer in
1009 (* List.iter CommonSwarming.free_range c.client_ranges; *)
1010 CommonSwarming.received up
1011 position s pos len;
1012 (* List.iter CommonSwarming.alloc_range c.client_ranges; *)
1013 let new_downloaded =
1014 CommonSwarming.downloaded swarmer in
1016 (*Update rate and amount of data received from client*)
1017 count_download c (new_downloaded -- old_downloaded);
1018 (* use len here with max_dr quickfix *)
1019 Rate.update c.client_downloaded_rate ~amount:len;
1020 (* count bytes downloaded from network for this file *)
1021 file.file_session_downloaded <- file.file_session_downloaded ++ (Int64.of_int len);
1022 if !verbose_msg_clients then
1023 (match c.client_ranges_sent with
1024 [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
1025 | (p1,p2,r) :: _ ->
1026 let (x,y) = CommonSwarming.range_range r in
1027 lprintf_file_nl (as_file file) "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
1028 position len
1029 p1 p2 x y
1030 (new_downloaded -- old_downloaded)
1033 (* changed 2.5.28 should have been done before !
1034 if new_downloaded <> old_downloaded then
1035 add_file_downloaded (as_file file)
1036 (new_downloaded -- old_downloaded); *)
1037 end;
(* Drop the head of client_ranges_sent, then ask the peer for another
   range and re-evaluate whether the peer is still interesting. *)
1038 begin
1039 match c.client_ranges_sent with
1040 [] -> ()
1041 | r :: tail ->
1042 (* CommonSwarming.free_range r; *)
1043 c.client_ranges_sent <- tail;
1044 end;
1045 get_from_client sock c;
1047 (* Check if the client is still interesting for us... *)
1048 check_if_interesting file c
(* PeerID: the uid is stored before the self-connection test, so
   c.client_uid is overwritten even when we disconnect right after.
   Any other peer gets its brand/release parsed and is sent a Choke. *)
1050 | PeerID p ->
1051 (* Disconnect if that is ourselves. *)
1052 c.client_uid <- Sha1.direct_of_string p;
1053 if not (c.client_uid = !!client_uid) then
1054 begin
1055 let brand, release = parse_software p in
1056 c.client_brand <- brand;
1057 c.client_release <- release;
1058 send_client c Choke;
1059 c.client_sent_choke <- true;
1061 else
1062 disconnect_client c Closed_by_user
(* BitField: ignored when the file has no swarmer.  Otherwise the peer is
   disconnected if the string carries fewer bits than the swarmer has
   pieces; else every set bit is recorded in client_new_chunks and the
   peer becomes interesting as soon as one of those pieces is missing or
   partial on our side. *)
1065 | BitField p ->
1066 (*A bitfield is a summary of what a client have*)
1067 begin
1068 match c.client_file.file_swarmer with
1069 None -> ()
1070 | Some swarmer ->
1071 c.client_new_chunks <- [];
1073 let npieces = CommonSwarming.partition_size swarmer in
(* 8 bits per byte of the received string *)
1074 let nbits = String.length p * 8 in
1076 if nbits < npieces then begin
1077 lprintf_file_nl (as_file file) "Error: expected bitfield of atleast %d but got %d" npieces nbits;
1078 disconnect_client c (Closed_for_error "Wrong bitfield length")
1079 end else begin
1081 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
1083 for i = 0 to npieces - 1 do
1084 if is_bit_set p i then begin
1085 c.client_new_chunks <- i :: c.client_new_chunks;
1086 match VB.get bitmap i with
1087 | VB.State_missing | VB.State_partial ->
1088 c.client_interesting <- true
1089 | VB.State_complete | VB.State_verified -> ()
1090 end
1091 done;
1093 update_client_bitmap c;
1094 c.client_registered_bitfield <- true;
1096 if c.client_interesting then
1097 send_interested c;
1099 if !verbose_msg_clients then
1100 lprintf_file_nl (as_file file) "New BitField Registered";
1102 (* for i = 1 to max_range_requests - List.length c.client_ranges do
1103 (try get_from_client sock c with _ -> ())
1104 done
1107 end;
1108 end;
1109 (* Note: a bitfield must only be sent after the handshake and before everything else: NOT here *)
(* Have: only acted upon when we do not have piece n complete/verified.
   NOTE(review): n comes from the peer and is not range-checked here
   before VB.get -- confirm how VB.get treats out-of-range indices. *)
1111 | Have n ->
1112 (* A client can send a "Have" without sending a Bitfield *)
1113 begin
1114 match c.client_file.file_swarmer with
1115 None -> ()
1116 | Some swarmer ->
1117 let n = Int64.to_int n in
1118 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
1119 (* lprintf_nl "verified: %c;" (VB.state_to_char (VB.get bitmap n)); *)
1120 (* if the peer has a chunk we don't, tell him we're interested and update his bitmap *)
1121 match VB.get bitmap n with
1122 | VB.State_missing | VB.State_partial ->
1123 c.client_interesting <- true;
1124 send_interested c;
1125 c.client_new_chunks <- n :: c.client_new_chunks;
1126 update_client_bitmap c;
1127 | VB.State_complete | VB.State_verified -> ()
1129 (* begin
1130 match c.client_bitmap, c.client_uploader with
1131 Some bitmap, Some up ->
1132 let swarmer = CommonSwarming.uploader_swarmer up in
1133 let n = Int64.to_int n in
1134 if bitmap.[n] <> '1' then
1136 let verified = CommonSwarming.verified_bitmap swarmer in
1137 if verified.[n] < '2' then begin
1138 c.client_interesting <- true;
1139 send_interested c;
1140 c.client_new_chunks <- n :: c.client_new_chunks;
1141 if c.client_block = None then begin
1142 update_client_bitmap c;
1143 (* for i = 1 to max_range_requests -
1144 List.length c.client_ranges do
1145 (try get_from_client sock c with _ -> ())
1146 done*)
1149 | None, Some _ -> lprintf_nl "no bitmap but client_uploader";
1150 | Some _ , None ->lprintf_nl "bitmap but no client_uploader";
1151 | None, None -> lprintf_nl "no bitmap no client_uploader";
(* Interested / NotInterested only toggle c.client_interested. *)
1156 | Interested ->
1157 c.client_interested <- true;
(* Choke: forget every outstanding request (uploader intervals,
   client_ranges_sent, client_range_waiting) and mark us as choked. *)
1159 | Choke ->
1160 begin
1161 set_client_state (c) (Connected (-1));
1162 (* remote peer will clear the list of range we sent *)
1163 begin
1164 match c.client_uploader with
1165 None ->
1166 (* Afaik this is no protocol violation and happens if the client
1167 didn't send a client bitmap after the handshake. *)
1168 let (ip,port) = c.client_host in
1169 if !verbose_msg_clients then lprintf_file_nl (as_file file) "%s:%d with software %s : Choke send, but no client bitmap"
1170 (Ip.to_string ip) port (brand_to_string c.client_brand)
1171 | Some up ->
1172 CommonSwarming.clear_uploader_intervals up
1173 end;
1174 c.client_ranges_sent <- [];
1175 c.client_range_waiting <- None;
1176 c.client_choked <- true;
1179 | NotInterested ->
1180 c.client_interested <- false;
(* Unchoke: issue requests until max_range_requests are outstanding;
   an exception from an individual get_from_client call is ignored. *)
1182 | Unchoke ->
1183 begin
1184 c.client_choked <- false;
1185 (* remote peer cleared our request : re-request *)
1186 for i = 1 to max_range_requests -
1187 List.length c.client_ranges_sent do
1188 (try get_from_client sock c with _ -> ())
1189 done
(* Request: an over-long request closes the socket and raises Exit.
   Requests are only considered while !CommonGlobals.has_upload = 0:
   a client holding a slot gets the request appended to its queue (and
   is registered with CommonUploads when the queue was empty), a client
   without a slot is choked and its queue is emptied. *)
1192 | Request (n, pos, len) ->
1193 if len > max_request_len then begin
1194 close sock (Closed_for_error "Request longer than 1<<16");
1195 raise Exit
1196 end;
1198 if !CommonGlobals.has_upload = 0 then
1199 begin
1200 if client_has_a_slot (as_client c) then
1201 begin
1202 (* lprintf "Received request for upload\n"; *)
1203 (match c.client_upload_requests with
1204 [] ->
1205 CommonUploads.ready_for_upload (as_client c);
1206 | _ -> ());
1207 c.client_upload_requests <- c.client_upload_requests @ [n,pos,len];
1208 let file = c.client_file in
1209 match file.file_shared with
1210 None -> ()
1211 | Some s ->
1212 begin
1213 s.impl_shared_requests <- s.impl_shared_requests + 1;
1214 shared_must_update (as_shared s)
1217 else
1218 begin
1219 send_client c Choke;
1220 c.client_sent_choke <- true;
1221 c.client_upload_requests <- [];
1223 end;
1225 | Ping -> ()
1226 (* We don't 'generate' a Ping message on a Ping. *)
(* Cancel: removes the first queued request equal to (n, pos, len), but
   only for a client that currently holds an upload slot. *)
1228 | Cancel (n, pos, len) ->
1229 (* if we receive a cancel message from a peer, remove request *)
1230 if client_has_a_slot (as_client c) then
1231 c.client_upload_requests <- List2.remove_first (n, pos, len) c.client_upload_requests
1232 else
1233 if !verbose_msg_clients then
1234 lprintf_file_nl (as_file file) "Error: received cancel request but client has no slot"
(* Catch-all: every exception is logged unconditionally with the
   offending message; nothing is re-raised. *)
1236 with e ->
1237 lprintf_file_nl (as_file file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e) (TcpMessages.to_string msg)
(* connect_client: schedule an outgoing connection to peer [c].
   Nothing happens unless can_open_connection holds, the peer's
   (ip, country code) pair is not reported by !Ip.banned, and
   c.client_sock is NoConnection.  The real connect runs later, inside
   the callback handed to add_pending_connection; until then
   c.client_sock is set to ConnectionWaiting with the returned token. *)
1240 (** The function used to connect to a client.
1241 The connection is not immediately initiated. It will
1242 be put in a fifo and dequeued according to
1243 !!max_connections_per_second. (@see commonGlobals.ml)
1244 @param c The client we must connect
1246 let connect_client c =
1247 if can_open_connection connection_manager &&
1248 (let (ip,port) = c.client_host in
1249 match !Ip.banned (ip, c.client_country_code) with
1250 None -> true
1251 | Some reason ->
1252 if !verbose_connect then
1253 lprintf_nl "%s:%d (%s), blocked: %s"
1254 (Ip.to_string ip) port
1255 (fst (Geoip.get_country_code_name c.client_country_code))
1256 reason;
1257 false)
1258 then
1259 match c.client_sock with
1260 NoConnection ->
1262 let token =
1263 add_pending_connection connection_manager (fun token ->
1265 if !verbose_msg_clients then
1266 lprintf_nl "CLIENT %d: connect_client" (client_num c);
1267 let (ip,port) = c.client_host in
1268 if !verbose_msg_clients then
1269 lprintf_nl "connecting %s:%d" (Ip.to_string ip) port;
1270 connection_try c.client_connection_control;
1271 begin
(* Socket event handler: both timeouts close the socket; a CLOSED event
   disconnects the client only if this socket is still the one stored
   in c.client_sock (physical equality). *)
1272 let sock = connect token "bittorrent download"
1273 (Ip.to_inet_addr ip) port
1274 (fun sock event ->
1275 match event with
1276 BASIC_EVENT LTIMEOUT ->
1277 if !verbose_msg_clients then
1278 lprintf_nl "CLIENT %d: LIFETIME" (client_num c);
1279 close sock Closed_for_timeout
1280 | BASIC_EVENT RTIMEOUT ->
1281 if !verbose_msg_clients then
1282 lprintf_nl "CLIENT %d: RTIMEOUT (%d)" (client_num c)
1283 (last_time ())
1285 close sock Closed_for_timeout
1286 | BASIC_EVENT (CLOSED r) ->
1287 begin
1288 match c.client_sock with
1289 | Connection s when s == sock ->
1290 disconnect_client c r
1291 | _ -> ()
1292 end;
1293 | _ -> ()
(* Socket setup: lifetime 600., read timeout 30. (presumably seconds --
   confirm), plus the shared download/upload bandwidth controllers. *)
1296 c.client_sock <- Connection sock;
1297 set_lifetime sock 600.;
1298 TcpBufferedSocket.set_read_controler sock download_control;
1299 TcpBufferedSocket.set_write_controler sock upload_control;
1300 TcpBufferedSocket.set_rtimeout sock 30.;
1301 let file = c.client_file in
1303 if !verbose_msg_clients then
1304 lprintf_file_nl (as_file file) "READY TO DOWNLOAD FILE";
(* We are the initiating side: send our handshake first. *)
1306 send_init !!client_uid file.file_id sock;
1307 (* Fabrice: Initialize the client bitmap and uploader fields to <> None *)
1308 update_client_bitmap c;
1309 (* (try get_from_client sock c with _ -> ());*)
1310 incr counter;
1311 (*We 'hook' the client_parse_header function to the socket
1312 This function will then be called when the first message will
1313 be parsed
(* The header parser receives a ref already holding this client and its
   country code, and the flag true (listen passes an empty ref and
   false for incoming connections). *)
1315 set_bt_sock sock !verbose_msg_clients
1316 (BTHeader (client_parse_header !counter (ref ((Some c), c.client_country_code)) true))
(* Any exception during the connection attempt is logged unconditionally
   and the client is disconnected with Closed_for_exception. *)
1318 with e ->
1319 lprintf_nl "Exception %s while connecting to client"
1320 (Printexc2.to_string e);
1321 disconnect_client c (Closed_for_exception e)
1323 (*Since this is a pending connection put ConnectionWaiting
1324 in client_sock
1327 c.client_sock <- ConnectionWaiting token
1328 | _ -> ()
(* listen: create the TCP server socket bound to !!client_bind_addr and
   !!client_port and store it in listen_sock.  Each accepted connection
   is either wrapped in a buffered socket whose first message is parsed
   by client_parse_header, or closed straight away when the connection
   limit is reached or the address is banned.  A failure to create the
   server socket is only logged when !verbose_connect is set. *)
1331 (** The Listen function (very much like in C : TCP Socket Server).
1332 Monitors client connection to us.
1334 let listen () =
1336 let s = TcpServerSocket.create "bittorrent client server"
1337 (Ip.to_inet_addr !!client_bind_addr)
1338 !!client_port
1339 (fun sock event ->
1340 match event with
1341 TcpServerSocket.CONNECTION (s,
1342 Unix.ADDR_INET(from_ip, from_port)) ->
1343 (*Receiving an event TcpServerSocket.CONNECTION from
1344 the TcpServerSocket means that a new client try
1345 to connect to us
1347 let ip = (Ip.of_inet_addr from_ip) in
1348 let cc = Geoip.get_country_code_option ip in
1349 if !verbose_sources > 1 then lprintf_nl "CONNECTION RECEIVED FROM %s"
1350 (Ip.to_string (Ip.of_inet_addr from_ip))
1352 (*Reject this connection if we don't want
1353 to bypass the max_connection parameter
1355 if can_open_connection connection_manager &&
1356 (match !Ip.banned (ip, cc) with
1357 None -> true
1358 | Some reason ->
1359 if !verbose_connect then
1360 lprintf_nl "%s:%d (%s) blocked: %s"
1361 (Ip.to_string ip) from_port
1362 (fst (Geoip.get_country_code_name cc))
1363 reason;
1364 false)
1365 then
1366 begin
1367 let token = create_token connection_manager in
1368 let sock = TcpBufferedSocket.create token
1369 "bittorrent client connection" s
1370 (fun sock event ->
1371 match event with
1372 BASIC_EVENT (RTIMEOUT|LTIMEOUT) ->
1373 (*monitor read and life timeout on client
1374 sockets
1376 close sock Closed_for_timeout
1377 | _ -> ()
1380 TcpBufferedSocket.set_read_controler sock download_control;
1381 TcpBufferedSocket.set_write_controler sock upload_control;
(* c starts as (None, cc): no client is known yet for this socket.  The
   same ref is handed to client_parse_header below and is read again by
   the closer, which disconnects the client only if the ref holds one
   and this socket is still that client's current socket. *)
1383 let c = ref (None, cc) in
1384 TcpBufferedSocket.set_closer sock (fun _ r ->
1385 match fst !c with
1386 Some c -> begin
1387 match c.client_sock with
1388 | Connection s when s == sock ->
1389 disconnect_client c r
1390 | _ -> ()
1392 | None -> ()
1394 set_rtimeout sock 30.;
1395 incr counter;
1396 (*Again : 'hook' client_parse_header to the socket*)
1397 set_bt_sock sock !verbose_msg_clients
1398 (BTHeader (client_parse_header !counter c false));
1400 else
1401 (*don't forget to close the incoming sock if we can't
1402 open a new connection
1404 Unix.close s
1405 | _ -> ()
1406 ) in
1407 listen_sock := Some s;
1409 with e ->
1410 if !verbose_connect then
1411 lprintf_nl "Exception %s while init bittorrent server"
1412 (Printexc2.to_string e)
(** Keep-alive pass over every client of every current file.
    A client whose socket is in the [Connection] state is sent a [Ping]
    and has its socket lifetime set to 130.; all other clients are left
    untouched.
*)
let send_pings () =
  let ping_if_connected _ c =
    match c.client_sock with
    | Connection sock ->
        send_client c Ping;
        set_lifetime sock 130.
    | _ -> ()
  in
  List.iter
    (fun file -> Hashtbl.iter ping_if_connected file.file_clients)
    !current_files
1429 open Bencode
(** Walk over the clients of [file] and try to (re)connect the ones that
    are not currently connected.
    A connection is only attempted when the client's connection control
    allows a new try; otherwise the control state is printed.
    Exceptions raised by that attempt are ignored; any other exception
    is logged when [verbose_connect] is set.
    @param file the file whose clients are examined
*)
let resume_clients file =
  let try_resume _ c =
    try
      match c.client_sock with
      | Connection _ -> ()
      | _ ->
          (try
             if connection_can_try c.client_connection_control then
               connect_client c
             else
               print_control c.client_connection_control
           with _ -> ())
    with e ->
      if !verbose_connect then
        lprintf_file_nl (as_file file) "Exception %s in resume_clients" (Printexc2.to_string e)
  in
  Hashtbl.iter try_resume file.file_clients

(* Publish resume_clients through its hook reference. *)
let () =
  resume_clients_hook := resume_clients
(* chk_keyval: convert the Int64 value [n] that a tracker sent for [key]
   to an int and validate it.  Values greater than -1 are returned
   unchanged; for anything else a message is printed unconditionally.
   NOTE(review): the value returned in the invalid case is not visible
   in this excerpt -- confirm against the full file. *)
1461 (** Check if the value replied by the tracker is correct.
1462 @param key the name of the key
1463 @param n the value to check
1464 @param url Url of the tracker
1465 @param name the name of the file
1467 let chk_keyval key n url name =
1468 let int_n = (Int64.to_int n) in
(* trace of every received value, only when verbose_msg_clients is set *)
1469 if !verbose_msg_clients then
1470 lprintf_nl "Reply from %s in file: %s has %s: %d" (show_tracker_url url) name key int_n;
1471 if int_n > -1 then
1472 int_n
1473 else begin
1474 lprintf_nl "Reply from %s in file: %s has an invalid %s value: %d" (show_tracker_url url) name key int_n;
(** [exn_catch f x] applies [f] to [x] and reifies the outcome:
    [`Ok v] when [f x] returns [v], [`Exn e] when it raises [e]. *)
let exn_catch f x =
  try
    let value = f x in
    `Ok value
  with exn -> `Exn exn
(* talk_to_tracker: announce [file] to its trackers through
   connect_trackers and process each reply with the local callback [f].
   The event sent is "started" until file.file_tracker_connected is set,
   and the empty string afterwards.  Peers found in a reply are only
   registered (and connected to) when [need_sources] is true. *)
1480 (** In this function we interact with the tracker
1481 @param file The file for which we want some sources
1482 @param need_sources whether we need any sources
1484 let talk_to_tracker file need_sources =
1485 (* This is the function which will be called by the http client for parsing the response *)
(* f t filename: [t] is the tracker record, [filename] the file holding
   the reply body (it is read with File.to_string below). *)
1486 let f t filename =
1487 let tracker_url = show_tracker_url t.tracker_url in
1488 let tracker_failed reason =
1489 (* On failure, disable the tracker and count attempts (@see is_tracker_enabled) *)
1490 let num = match t.tracker_status with | Disabled_failure (i,_) -> i + 1 | _ -> 1 in
1491 t.tracker_status <- Disabled_failure (num, intern reason);
1492 lprintf_file_nl (as_file file) "Failure no. %d%s from Tracker %s for file: %s Reason: %s"
1494 (if !!tracker_retries = 0 then "" else Printf.sprintf "/%d" !!tracker_retries)
1495 tracker_url file.file_name (Charset.Locale.to_utf8 reason)
(* An unreadable or empty reply file and a reply that fails to decode
   are both reported through tracker_failed; only a bencoded dictionary
   is processed further. *)
1497 match exn_catch File.to_string filename with
1498 | `Exn _ | `Ok "" -> tracker_failed "empty reply"
1499 | `Ok s ->
1500 match exn_catch Bencode.decode s with
1501 | `Exn exn -> tracker_failed (Printf.sprintf "wrong reply (%s)" (Printexc2.to_string exn))
1502 | `Ok (Dictionary list) ->
(* Both intervals are reset to 600 before the reply's own values, if
   any, overwrite them in the loop below. *)
1503 t.tracker_interval <- 600;
1504 t.tracker_min_interval <- 600;
1505 if need_sources then t.tracker_last_clients_num <- 0;
1506 let chk_keyval key n = chk_keyval key n t.tracker_url file.file_name in
1507 if not (List.mem_assoc "failure reason" list) then
1508 begin
1509 begin match t.tracker_status with
1510 | Disabled_failure (i, _) ->
1511 lprintf_file_nl (as_file file) "Received good message from Tracker %s after %d bad attempts"
1512 tracker_url i
1513 | _ -> () end;
1514 (* Received good message from tracker after failures, re-enable tracker *)
1515 t.tracker_status <- Enabled;
1516 end;
(* Entries are handled in the order they appear in the dictionary, so
   the derived counters below depend on which keys were seen earlier. *)
1517 List.iter (fun (key,value) ->
1518 match (key,value) with
1519 | "failure reason", String failure -> tracker_failed failure
1520 | "warning message", String warning ->
1521 lprintf_file_nl (as_file file) "Warning from Tracker %s in file: %s Reason: %s"
1522 tracker_url file.file_name warning
1523 | "interval", Int n ->
1524 t.tracker_interval <- chk_keyval key n;
1525 (* in case we don't receive "min interval" *)
1526 if t.tracker_min_interval > t.tracker_interval then
1527 t.tracker_min_interval <- t.tracker_interval
1528 | "min interval", Int n ->
1529 t.tracker_min_interval <- chk_keyval key n;
1530 (* make sure "min interval" is always < or equal to "interval" *)
1531 if t.tracker_min_interval > t.tracker_interval then
1532 t.tracker_min_interval <- t.tracker_interval
1533 | "downloaded", Int n ->
1534 t.tracker_torrent_downloaded <- chk_keyval key n
1535 | "complete", Int n
1536 | "done peers", Int n ->
1537 t.tracker_torrent_complete <- chk_keyval key n
1538 | "incomplete", Int n ->
1539 t.tracker_torrent_incomplete <- chk_keyval key n;
1540 (* if complete > 0 and we receive incomplete we probably won't receive num_peers so we simulate it below *)
1541 if t.tracker_torrent_complete > 0 then
1542 t.tracker_torrent_total_clients_count <- (t.tracker_torrent_complete + t.tracker_torrent_incomplete);
1543 | "num peers", Int n ->
1544 t.tracker_torrent_total_clients_count <- chk_keyval key n;
1545 (* if complete > 0 and we receive num_peers we probably won't receive incomplete so we simulate it below *)
1546 if t.tracker_torrent_complete > 0 then
1547 t.tracker_torrent_incomplete <- (t.tracker_torrent_total_clients_count - t.tracker_torrent_complete);
1548 | "last", Int n ->
1549 t.tracker_torrent_last_dl_req <- chk_keyval key n
1550 | "key", String n ->
1551 t.tracker_key <- n;
1552 if !verbose_msg_clients then
1553 lprintf_file_nl (as_file file) "%s in file: %s has key: %s" tracker_url file.file_name n
1554 | "tracker id", String n ->
1555 t.tracker_id <- n;
1556 if !verbose_msg_clients then
1557 lprintf_file_nl (as_file file) "%s in file: %s has tracker id %s" tracker_url file.file_name n
(* Peer list given as a list of dictionaries.  Keys missing from an
   entry leave the defaults Sha1.null / Ip.null / 0 in place.  An entry
   that is not a dictionary hits "assert false".
   NOTE(review): that assertion is reachable from tracker-supplied data;
   confirm how the caller handles the resulting exception. *)
1559 | "peers", List list ->
1560 if need_sources then
1561 List.iter (fun v ->
1562 match v with
1563 | Dictionary list ->
1564 let peer_id = ref Sha1.null in
1565 let peer_ip = ref Ip.null in
1566 let port = ref 0 in
1568 List.iter (fun v ->
1569 match v with
1570 "peer id", String id ->
1571 peer_id := Sha1.direct_of_string id;
1572 | "ip", String ip ->
1573 peer_ip := Ip.of_string ip
1574 | "port", Int p ->
1575 port := Int64.to_int p
1576 | _ -> ()
1577 ) list;
1579 t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
1580 maybe_new_client file !peer_id !peer_ip !port
1582 | _ -> assert false
1583 ) list
(* Peer list given as one string: each entry is read as 4 address bytes
   followed by a 16-bit port at offset 4, and the cursor advances by 6.
   NOTE(review): the loop test is only "pos < l", so a string whose
   length is not a multiple of 6 makes the last reads run past the end
   -- confirm get_uint8/get_int16 raise in that case. *)
1584 | "peers", String p ->
1585 let rec iter_comp s pos l =
1586 if pos < l then
1587 let ip = Ip.of_ints (get_uint8 s pos,get_uint8 s (pos+1),
1588 get_uint8 s (pos+2),get_uint8 s (pos+3))
1589 and port = get_int16 s (pos+4)
1591 t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
1592 maybe_new_client file Sha1.null ip port;
1594 iter_comp s (pos+6) l
1596 if need_sources then
1597 iter_comp p 0 (String.length p)
1598 | "private", Int n -> ()
1599 (* TODO: if set to 1, disable peer exchange *)
(* Every other key/value combination is logged unconditionally. *)
1601 | key, _ -> lprintf_file_nl (as_file file) "received unknown entry in answer from tracker: %s : %s" key (Bencode.print value)
1602 ) list;
1603 (*Now, that we have added new clients to a file, it's time
1604 to connect to them*)
1605 if !verbose_sources > 0 then
1606 lprintf_file_nl (as_file file) "talk_to_tracker: got %i source(s) for file %s"
1607 t.tracker_last_clients_num file.file_name;
1608 if need_sources then resume_clients file
1610 | _ -> tracker_failed "wrong reply (value)"
1612 let event =
1613 if file.file_tracker_connected then ""
1614 else "started"
1616 connect_trackers file event need_sources f
(** Check to see if each current file is finished; if not, try to get
    sources for it.
    Files without a swarmer are skipped.  For the others the finished
    check runs first, then the tracker is contacted: asking for sources
    when the file is downloading, without asking for sources when it is
    shared.  Paused and queued files are left alone.
    Every step stays best-effort: an exception never aborts the
    iteration over the files, but it is now reported when [verbose] is
    set instead of being dropped silently.
*)
let recover_files () =
  if !verbose_share then
    lprintf_nl "recover_files";
  List.iter (fun file ->
    (* Report a swallowed exception without interrupting the recovery. *)
    let log_failure what e =
      if !verbose then
        lprintf_file_nl (as_file file) "recover: exception %s in %s"
          (Printexc2.to_string e) what
    in
    match file.file_swarmer with
    | None -> ()
    | Some swarmer ->
        (try check_finished swarmer file
         with e -> log_failure "check_finished" e);
        match file_state file with
        | FileDownloading ->
            if !verbose_share then
              lprintf_file_nl (as_file file) "recover downloading";
            (try talk_to_tracker file true
             with e -> log_failure "talk_to_tracker" e)
        | FileShared ->
            if !verbose_share then
              lprintf_file_nl (as_file file) "recover shared";
            (try talk_to_tracker file false
             with e -> log_failure "talk_to_tracker" e)
        | FilePaused -> () (*when we are paused we do nothing, not even logging this vvvv*)
        | FileQueued -> ()
        | s -> if !verbose then lprintf_file_nl (as_file file) "recover: Other state %s!!" (string_of_state s)
  ) !current_files
(* Scratch buffer shared by all uploads: the data of a request is read
   from disk into it before being handed to send_client. *)
let upload_buffer = String.create 100000

(**
  Send a Piece message
  for one of the request of client.
  Zero-length requests are dropped.  While the client's write allowance
  is not negative, the first pending request is removed from the queue,
  read from the file and sent, and the function recurses on the rest of
  the queue; otherwise the client is put back in the upload queue.
  An exception while serving a request stops the recursion and is
  logged when [verbose] is set.
  @param sock The socket of the client
  @param c The client
*)
let rec iter_upload sock c =
  match c.client_upload_requests with
  | [] -> ()
  | (num, pos, len) :: rest ->
      if len = zero then begin
        c.client_upload_requests <- rest;
        iter_upload sock c
      end
      else if c.client_allowed_to_write < 0L then
        (* lprintf "client is waiting for another piece\n"; *)
        ready_for_upload (as_client c)
      else begin
        try
          c.client_upload_requests <- rest;
          let file = c.client_file in
          let offset = pos ++ file.file_piece_size *.. num in
          c.client_allowed_to_write <- c.client_allowed_to_write -- len;
          count_upload c len;
          let nbytes = Int64.to_int len in
          (* lprintf "Unix32.read: offset %Ld len %d\n" offset len; *)
          Unix32.read (file_fd file) offset upload_buffer 0 nbytes;
          (* update upload rate from len bytes *)
          Rate.update c.client_upload_rate ~amount:nbytes;
          Rate.update c.client_downloaded_rate;
          let sent = Int64.of_int nbytes in
          file.file_uploaded <- file.file_uploaded ++ sent;
          file.file_session_uploaded <- file.file_session_uploaded ++ sent;
          (* update stats *)
          count_filerequest c;
          (match file.file_shared with
           | None -> ()
           | Some s ->
               s.impl_shared_uploaded <- file.file_uploaded;
               shared_must_update (as_shared s));
          (* lprintf "sending piece\n"; *)
          send_client c (Piece (num, pos, upload_buffer, 0, nbytes));
          iter_upload sock c
        with e ->
          if !verbose then
            lprintf_nl "Exception %s in iter_upload" (Printexc2.to_string e)
      end
(**
  Bandwidth-control callback: when the client is connected and has
  pending upload requests, credit it with [allowed] more bytes (if
  [allowed] is positive and the socket can take the new total), then
  call iter_upload to send Piece messages.
  @param c the client to which we can send some bytes
  @param allowed the amount of bytes we can send to client
*)
let client_can_upload c allowed =
  (* lprintf "allowed to upload %d\n" allowed; *)
  do_if_connected c.client_sock (fun sock ->
    match c.client_upload_requests with
    | [] -> ()
    | _ :: _ ->
        let budget = c.client_allowed_to_write ++ (Int64.of_int allowed) in
        let accepted =
          allowed > 0 && can_write_len sock (Int64.to_int budget) in
        if accepted then begin
          CommonUploads.consume_bandwidth allowed;
          c.client_allowed_to_write <- budget
        end;
        iter_upload sock c)
(** Resume [file]: re-enable every tracker that is in the
    [Disabled_failure] or [Disabled] state (trackers that are [Enabled]
    or [Disabled_mld] are left as they are), then contact the trackers
    asking for sources.
    The tracker contact stays best-effort: an exception is not
    propagated, but it is now reported when [verbose] is set instead of
    being dropped silently.
    @param file the file to resume
*)
let file_resume file =
  List.iter (fun t ->
    match t.tracker_status with
    | Enabled | Disabled_mld _ -> ()
    | Disabled_failure _ | Disabled _ -> t.tracker_status <- Enabled
  ) file.file_trackers;
  try talk_to_tracker file true
  with e ->
    if !verbose then
      lprintf_file_nl (as_file file) "Exception %s in file_resume"
        (Printexc2.to_string e)
(**
  Send info to tracker when stopping a file.
  Nothing is sent unless the file is marked as connected to its
  trackers; the mark is cleared when the tracker reply callback runs.
  @param file the file we want to stop
*)
let file_stop file =
  let on_reply _ _ =
    lprintf_file_nl (as_file file) "Tracker return: stopped %s" file.file_name;
    file.file_tracker_connected <- false
  in
  if file.file_tracker_connected then
    connect_trackers file "stopped" false on_reply
1748 Create the 'hooks'
1750 let _ =
1751 client_ops.op_client_can_upload <- client_can_upload;
1752 file_ops.op_file_resume <- file_resume;
1753 file_ops.op_file_recover <- file_resume;
1754 file_ops.op_file_pause <- (fun file ->
1755 Hashtbl.iter (fun _ c ->
1756 match c.client_sock with
1757 Connection sock -> close sock Closed_by_user
1758 | _ -> ()
1759 ) file.file_clients;
1760 (*When a file is paused we consider it is stopped*)
1761 file_stop file
1763 file_ops.op_file_queue <- file_ops.op_file_pause;
1764 client_ops.op_client_enter_upload_queue <- (fun c ->
1765 if !verbose_msg_clients then
1766 lprintf_nl "Client %d: client_enter_upload_queue" (client_num c);
1767 ready_for_upload (as_client c));
1768 network.op_network_connected_servers <- (fun _ -> []);