patch #8326
[mldonkey.git] / src / networks / bittorrent / bTClients.ml
blob01ddd521e12eb3cee0d7f4ca42d80e8e149f0495
1 (* Copyright 2001, 2002 b52_simon :), b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
(** Functions used in client<->client communication
    and also client<->tracker
*)
(** A peer (or client) is always a remote peer in this file.
    A Piece is a portion of the file associated with a hash (sha1).
    In mldonkey a piece is referred to as a block inside the swarming system.
    A SubPiece is a portion of a piece (without hash) which can be
    sent/downloaded to/from a peer.
    In mldonkey a SubPiece is referred to as a range inside the swarming system.
    @see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for some
    unofficial (but more detailed) specs.
*)
35 open Int64ops
36 open AnyEndian
37 open BigEndian
38 open Printf2
39 open Md4
40 open Options
41 open BasicSocket
42 open TcpBufferedSocket
43 open Ip_set
45 open CommonShared
46 open CommonUploads
47 open CommonOptions
48 open CommonDownloads
49 open CommonInteractive
50 open CommonClient
51 open CommonComplexOptions
52 open CommonTypes
53 open CommonFile
54 open CommonSwarming
55 open CommonGlobals
56 open CommonDownloads
58 open BTRate
59 open BTTypes
60 open BTProtocol
61 open BTOptions
62 open BTGlobals
63 open BTComplexOptions
64 open BTChooser
65 open BTStats
66 open TcpMessages
(* Short alias for the verification bitmap module. *)
module VB = VerificationBitmap

(* HTTP status-line literals. *)
let http_ok = "HTTP 200 OK"
let http11_ok = "HTTP/1.1 200 OK"

(* Peers selected for upload by the latest call to recompute_uploaders,
   and the peers that held that role before it. *)
let next_uploaders : BTTypes.client list ref = ref []
let current_uploaders : BTTypes.client list ref = ref []
(** Check that a peer announced for [file] is valid and record it.
    The peer is registered through [new_client] only when all of these hold:
    - [id] differs from our own [client_uid];
    - [ip] is not [Ip.null] (compared by value with [Ip.equal]);
    - [port] is non-zero;
    - [!Ip.banned] returns no ban reason for [(ip, country code)].
    When [verbose_sources > 1] the trace line is printed for every announced
    peer, whether or not it was recorded.
    @param file the file the peer was announced for
    @param id   peer id of the announced peer
    @param ip   address of the announced peer
    @param port TCP port of the announced peer *)
let maybe_new_client file id ip port =
  let cc = Geoip.get_country_code_option ip in
  (* Evaluated last, and only when the cheaper checks passed, so that the
     "blocked" trace is printed for otherwise acceptable peers only. *)
  let not_banned () =
    match !Ip.banned (ip, cc) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_file_nl (as_file file) "%s:%d blocked: %s" (Ip.to_string ip) port reason;
        false
  in
  if id <> !!client_uid
    && not (Ip.equal ip Ip.null)
    && port <> 0
    && not_banned ()
  then
    ignore (new_client file id (ip,port) cc);
  if !verbose_sources > 1 then
    lprintf_file_nl (as_file file) "Received %s:%d" (Ip.to_string ip) port
(* Forward reference invoked as [!resume_clients_hook file] after a UDP tracker
   answer has been processed.  The initial value fails an assertion, so a real
   implementation is presumably installed elsewhere before the first call. *)
let resume_clients_hook = ref (fun _file -> assert false)
97 include struct
99 (* open modules locally *)
100 open BTUdpTracker
101 open UdpSocket
(* Short textual tag for a socket event, for trace output. *)
let string_of_event event =
  match event with
  | READ_DONE -> "READ_DONE"
  | WRITE_DONE -> "WRITE_DONE"
  | CAN_REFILL -> "CAN_REFILL"
  | BASIC_EVENT (CLOSED reason) -> "CLOSED " ^ (string_of_reason reason)
  | BASIC_EVENT RTIMEOUT -> "RTIMEOUT"
  | BASIC_EVENT WTIMEOUT -> "WTIMEOUT"
  | BASIC_EVENT LTIMEOUT -> "LTIMEOUT"
  | BASIC_EVENT CAN_READ -> "CAN_READ"
  | BASIC_EVENT CAN_WRITE -> "CAN_WRITE"
(** Talk to a udp tracker and parse its response.

    The exchange has two round trips on a freshly created UDP socket:
    a connect request/response that yields a connection id, then an announce
    request/response that yields the reask interval and the peer list.
    Every returned peer is passed to [maybe_new_client] when [need_sources]
    is set, and [!resume_clients_hook file] is called at the end.

    FIXME: apart from parsing, this should do everything that
    talk_to_tracker's inner function does; refactor both.
    FIXME? Better create a single global udp socket and use it for all
    tracker requests, distinguishing trackers by txn?

    @param host tracker host name, resolved asynchronously
    @param port tracker UDP port
    @param args association list of announce arguments; the keys info_hash,
           peer_id, downloaded, left, uploaded and port are required, event and
           numwant are optional
    @param file file being announced
    @param t tracker record updated with the connection time and intervals
    @param need_sources whether peers are wanted from this announce *)
let talk_to_udp_tracker host port args file t need_sources =
  let interact ip =
    let socket = create (Ip.to_inet_addr !!client_bind_addr) 0 (fun sock event ->
      (* lprintf_nl "udpt got event %s for %s" (string_of_event event) host; *)
      match event with
      | WRITE_DONE | CAN_REFILL -> ()
      | READ_DONE -> assert false (* set_reader prevents this *)
      | BASIC_EVENT x ->
          (match x with
           | CLOSED _ -> ()
           | CAN_READ | CAN_WRITE -> assert false (* udpSocket implementation prevents this *)
           | LTIMEOUT | WTIMEOUT | RTIMEOUT -> close sock (Closed_for_error "udpt timeout")))
    in
    (* Install [f] as the datagram reader; any exception it raises is logged
       and closes the socket. *)
    let on_read f =
      set_reader socket begin fun _ ->
        try f () with exn ->
          lprintf_nl "udpt interact exn %s" (Printexc2.to_string exn);
          close socket (Closed_for_exception exn)
      end
    in
    BasicSocket.set_wtimeout (sock socket) 60.;
    BasicSocket.set_rtimeout (sock socket) 60.;
    let txn = Random.int32 Int32.max_int in
    (* lprintf_nl "udpt txn %ld for %s" txn host; *)
    write socket false (connect_request txn) ip port;
    on_read begin fun () ->
      let p = read socket in
      let conn = connect_response p.udp_content txn in
      (* lprintf_nl "udpt connection_id %Ld for %s" conn host; *)
      (* a fresh transaction id is used for the announce round trip *)
      let txn = Random.int32 Int32.max_int in
      (* lprintf_nl "udpt txn' %ld for host %s" txn host; *)
      let int s = Int64.of_string (List.assoc s args) in
      let event_code =
        match (try List.assoc "event" args with Not_found -> "") with
        | "completed" -> 1l
        | "started" -> 2l
        | "stopped" -> 3l
        | "" -> 0l
        | s -> lprintf_nl "udpt event %s? for %s" s host; 0l
      in
      let numwant =
        if need_sources then
          (try Int32.of_string (List.assoc "numwant" args)
           with Not_found | Failure _ -> -1l)
        else 0l
      in
      let req = announce_request conn txn
        ~info_hash:(List.assoc "info_hash" args)
        ~peer_id:(List.assoc "peer_id" args)
        (int "downloaded",int "left",int "uploaded")
        event_code
        ~ip:(if !!force_client_ip then (Int64.to_int32 (Ip.to_int64 !!set_client_ip)) else 0l)
        ~numwant
        (int_of_string (List.assoc "port" args))
      in
      write socket false req ip port;
      on_read (fun () ->
        let p = read socket in

        t.tracker_last_conn <- last_time ();
        file.file_tracker_connected <- true;
        (* defaults, refined below when the tracker supplies an interval *)
        t.tracker_interval <- 600;
        t.tracker_min_interval <- 600;
        if need_sources then t.tracker_last_clients_num <- 0;

        let (interval,clients) = announce_response p.udp_content txn in
        if !verbose_msg_servers then
          lprintf_nl "udpt got interval %ld clients %d for host %s" interval (List.length clients) host;
        if interval > 0l then
          begin
            t.tracker_interval <- Int32.to_int interval;
            if t.tracker_min_interval > t.tracker_interval then
              t.tracker_min_interval <- t.tracker_interval
          end;
        if need_sources then
          List.iter (fun (ip',port) ->
            (* mask to 32 bits so that the conversion from int32 is unsigned *)
            let ip = Ip.of_int64 (Int64.logand 0xFFFFFFFFL (Int64.of_int32 ip')) in
            (* lprintf_nl "udpt got %s:%d" (Ip.to_string ip) port; *)
            t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
            maybe_new_client file Sha1.null ip port
          ) clients;
        close socket Closed_by_user;
        if !verbose_msg_servers then
          lprintf_nl "udpt interact done for %s" host;
        if need_sources then !resume_clients_hook file
      ) end
  in
  try
    if !verbose_msg_servers then
      lprintf_nl "udpt start with %s:%d" host port;
    Ip.async_ip host (fun ip ->
      (* lprintf_nl "udpt resolved %s to ip %s" host (Ip.to_string ip); *)
      if not (Ip.equal Ip.localhost ip) then
        try interact ip with exn -> lprintf_nl "udpt interact exn %s" (Printexc2.to_string exn)
      else if !verbose_msg_servers then
        lprintf_nl "udpt ignoring tracker %s (resolves to localhost)" host)
      (fun () ->
        if !verbose_msg_servers then
          lprintf_nl "udpt failed to resolve %s" host)
  with
    exn ->
      lprintf_nl "udpt start exn %s" (Printexc2.to_string exn)
215 end (* include *)
(**
  In this function we connect to the trackers of a file.
  @param file The file concerned by the request
  @param event Event (as a string) to send to the tracker :
   can be 'completed' if the file is complete, 'started' for the first
   connection to this tracker or 'stopped' for a clean stop of the file.
   Everything else will be ok for a second connection to the tracker.
   Be careful to the spelling of this event
  @param need_sources Whether peers are wanted; when false, numwant=0 is sent
  @param f The function used to parse the result of an HTTP announce.
   It is called with the tracker and the downloaded result file
   (@see talk_to_tracker for an example)

  A request is really sent to a tracker when the event does not require
  checking the delay, or the file never reached a tracker, or the tracker
  interval has elapsed, or we have less than !!ask_tracker_threshold sources
  while downloading and the minimum reask interval has elapsed.
*)
let connect_trackers file event need_sources f =

  (* reset session statistics when sending 'started' event *)
  if event = "started" then
    begin
      file.file_session_uploaded <- Int64.zero;
      file.file_session_downloaded <- Int64.zero;
    end;

  let args, must_check_delay, left =
    match file.file_swarmer with
    | None ->
        begin
          match event with
          | "started" -> [("event", "started")],true,zero
          | "stopped" -> [("event", "stopped")],false,zero
          | _ -> [],true,zero
        end
    | Some swarmer ->
        let local_downloaded = CommonSwarming.downloaded swarmer in
        let left = file_size file -- local_downloaded in
        begin
          match event with
          | "completed" -> [("event", "completed")],false,zero
          | "started" -> [("event", "started")],true,left
          | "stopped" -> [("event", "stopped")],false,left
          | _ -> [],true,left
        end
  in

  let args = ("no_peer_id", "1") :: ("compact", "1") :: args in
  let args =
    if not need_sources then
      ("numwant", "0") :: args
    else if !!numwant > -1 then
      ("numwant", string_of_int !!numwant) :: args
    else
      args
  in
  let args = if !!send_key then
      ("key", Sha1.to_hexa !!client_uid) :: args else args
  in
  let args = if !!force_client_ip then
      ("ip", Ip.to_string !!set_client_ip) :: args else args
  in
  let args =
    ("info_hash", Sha1.direct_to_string file.file_id) ::
    ("peer_id", Sha1.direct_to_string !!client_uid) ::
    ("port", string_of_int !!client_port) ::
    ("uploaded", Int64.to_string file.file_session_uploaded) ::
    ("downloaded", Int64.to_string file.file_session_downloaded) ::
    ("left", Int64.to_string left) ::
    args
  in

  let enabled_trackers =
    let enabled_trackers = List.filter (fun t -> tracker_is_enabled t) file.file_trackers in
    if enabled_trackers <> [] then enabled_trackers
    else begin
      (* if there is no tracker left, do something ? *)
      if !verbose_msg_servers then
        lprintf_nl "No trackers left for %s, reenabling all of them..." (file_best_name (as_file file));
      List.iter (fun t ->
        match t.tracker_status with
        (* only re-enable after normal error *)
        | Disabled _ -> t.tracker_status <- Enabled
        | _ -> ()) file.file_trackers;
      List.filter (fun t -> tracker_is_enabled t) file.file_trackers
    end in

  List.iter (fun t ->

    (* if we have too few sources we may ask the tracker before the interval *)
    if not must_check_delay
      || not file.file_tracker_connected
      || t.tracker_last_conn + t.tracker_interval < last_time()
      || ( file.file_clients_num < !!ask_tracker_threshold
           && (file_state file) = FileDownloading
           && (if t.tracker_min_interval > !!min_tracker_reask_interval then
                 t.tracker_last_conn + t.tracker_min_interval < last_time()
               else
                 t.tracker_last_conn + !!min_tracker_reask_interval < last_time() ))
    then
      begin
        (* if we already tried to connect but failed, disable tracker, but allow re-enabling *)
        (* FIXME t.tracker_last_conn < 1 only at first connect, so later failures will stay undetected! *)
        if file.file_tracker_connected && t.tracker_last_clients_num = 0 && t.tracker_last_conn < 1 then
          begin
            if !verbose_msg_servers then
              lprintf_nl "Request error from tracker: disabling %s" (show_tracker_url t.tracker_url);
            t.tracker_status <- Disabled (intern "MLDonkey: Request error from tracker")
          end
        (* Send request to tracker *)
        else
          let args = if String.length t.tracker_id > 0 then
              ("trackerid", t.tracker_id) :: args else args
          in
          (* a tracker-supplied key is consed in front of the global one *)
          let args = if String.length t.tracker_key > 0 then
              ("key", t.tracker_key) :: args else args
          in
          if !verbose_msg_servers then
            lprintf_nl "connect_trackers: connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i numwant:%s file: %s"
              (string_of_bool file.file_tracker_connected)
              t.tracker_id t.tracker_key t.tracker_last_clients_num
              (t.tracker_last_conn - last_time()) (try List.assoc "numwant" args with Not_found -> "_") file.file_name;

          match t.tracker_url with
          | `Http url ->
              let module H = Http_client in
              let r = {
                  H.basic_request with
                  H.req_url = Url.of_string ~args url;
                  H.req_proxy = !CommonOptions.http_proxy;
                  H.req_user_agent = get_user_agent ();
                  (* #4541 [egs] supports redirect *)
                  H.req_max_retry = !!max_tracker_redirect;
                  H.req_filter_ip = (fun ip -> not (Ip.equal Ip.localhost ip));
                } in

              if !verbose_msg_servers then
                lprintf_nl "Request sent to tracker %s for file: %s"
                  url file.file_name;
              H.wget r
                (fun fileres ->
                  t.tracker_last_conn <- last_time ();
                  file.file_tracker_connected <- true;
                  f t fileres)
          | `Other _ -> assert false (* should have been disabled *)
          | `Udp (host,port) -> talk_to_udp_tracker host port args file t need_sources
      end

    else
      if !verbose_msg_servers then
        lprintf_nl "Request NOT sent to tracker %s - next request in %ds for file: %s"
          (show_tracker_url t.tracker_url) (t.tracker_interval - (last_time () - t.tracker_last_conn)) file.file_name
  ) enabled_trackers
(* Public entry point: shadows the definition above so that no tracker is
   contacted at all when the [use_trackers] option is off. *)
let connect_trackers file event need_sources f =
  match !!use_trackers with
  | true -> connect_trackers file event need_sources f
  | false -> ()
(* Give peer [c] a normal upload slot for its file: register the upload,
   refresh both rate counters, stamp the optimistic-unchoke time, enter the
   upload queue and finally tell the peer it is unchoked. *)
let start_upload c =
  let peer = as_client c in
  set_client_upload peer (as_file c.client_file);
  set_client_has_a_slot peer NormalSlot;
  Rate.update_no_change c.client_downloaded_rate;
  Rate.update_no_change c.client_upload_rate;
  c.client_last_optimist <- last_time();
  client_enter_upload_queue peer;
  send_client c Unchoke
(** In this function we decide which peers will be uploaders.
    Current uploaders that are not in the next uploaders list lose
    their slot; no Choke message is sent here (see the comment in the body).
    Peers that are in the next list and not in the current one
    are started through [start_upload], which sends Unchoke.
*)
let recompute_uploaders () =
  if !verbose_upload then lprintf_nl "recompute_uploaders";
  next_uploaders := choose_uploaders current_files;
  (* Take the slot away if a current uploader is not in next_uploaders;
     we will let him finish his download and choke him on next_request *)
  List.iter (fun c ->
    if not (List.mem c !next_uploaders) then
      set_client_has_a_slot (as_client c) NoSlot
  ) !current_uploaders;

  (* don't restart the upload if the new uploader is already an uploader *)
  List.iter (fun c ->
    if not (List.mem c !current_uploaders) then start_upload c
  ) !next_uploaders;
  current_uploaders := !next_uploaders
406 (****** Fabrice: why are clients which are disconnected removed ???
407 These clients might still be useful to reconnect to, no ? *)
(** This function is called when a client is disconnected
    (be it by our side or its side).
    A client which disconnects (even only one time) is discarded.
    If it's an uploader which disconnects we recompute uploaders
    (see recompute_uploaders) immediately.
    Nothing but the trace is done when there is no connection; a pending
    connection attempt is only cancelled.
    @param c The client to disconnect
    @param reason The reason for the disconnection (see in BasicSocket.ml)
*)
let disconnect_client c reason =
  if !verbose_msg_clients then
    lprintf_nl "Client %d: disconnected: %s" (client_num c) (string_of_reason reason);
  begin
    match c.client_sock with
    | NoConnection -> ()
    | ConnectionWaiting token ->
        cancel_token token;
        c.client_sock <- NoConnection
    | Connection sock ->
        close sock reason;
        try
          (* List.iter (fun r -> CommonSwarming.free_range r) c.client_ranges; *)
          set_client_disconnected c reason;
          c.client_session_downloaded <- 0L;
          c.client_session_uploaded <- 0L;
          (try if c.client_good then count_seen c with _ -> ());
          (* this is not useful already done in the match
             (try close sock reason with _ -> ()); *)
          (*---------not needed ?? VvvvvV---------------
            c.client_ranges <- [];
            c.client_block <- None;
            if not c.client_good then
              connection_failed c.client_connection_control;
            c.client_good <- false;
            c.client_sock <- NoConnection;
            c.client_chunks <- [];
            c.client_allowed_to_write <- zero;
            c.client_new_chunks <- [];
            c.client_interesting <- false;
            c.client_alrd_sent_interested <- false;
            -------------------^^^^^--------------------*)
          if (c.client_registered_bitfield) then
            begin
              match c.client_uploader with
              | None -> ()
              | Some up ->
                  c.client_uploader <- None;
                  (* If the client registered a bitfield then
                     we must unregister him to update the swarmer
                     (Useful for availability)
                  *)
                  CommonSwarming.unregister_uploader up
                  (* c.client_registered_bitfield <- false;
                     for i = 0 to String.length c.client_bitmap - 1 do
                       c.client_bitmap.[0] <- '0';
                     done*)
            end;
          (* Don't test if a client has an upload slot because
             it doesn't have one (removed earlier in
             set_client_disconnected c reason)
          *)
          let was_uploader = List.mem c !current_uploaders in
          (*BTGlobals.remove_client*)
          remove_client c;
          if was_uploader then recompute_uploaders ()
        with e ->
          (* Cleanup stays best-effort, but the failure is no longer invisible. *)
          if !verbose_msg_clients then
            lprintf_nl "Client %d: exception %s while disconnecting"
              (client_num c) (Printexc2.to_string e)
  end
(** Disconnect clients of a file.
    When the file is paused or cancelled every client is disconnected;
    otherwise clients that hold an upload slot or are interested are kept.
    @param file The file whose clients must be disconnected
*)
let disconnect_clients file =
  let must_keep =
    match file_state file with
    | FilePaused | FileCancelled -> false
    | _ -> true
  in
  Hashtbl.iter (fun _ c ->
    if not (must_keep && (client_has_a_slot (as_client c) || c.client_interested)) then
      begin
        if !verbose_msg_clients then
          lprintf_file_nl (as_file file) "disconnect since download is finished";
        disconnect_client c Closed_by_user
      end
  ) file.file_clients
(** What to do when a file is finished.
    Does nothing unless the file is in [!current_files].
    @param file the finished file
*)
let download_finished file =
  if List.memq file !current_files then
    begin
      connect_trackers file "completed" false (fun _ _ ->
        lprintf_file_nl (as_file file) "Tracker return: completed %s" file.file_name;
        ()); (*must be called before swarmer gets removed from file*)
      (*CommonComplexOptions.file_completed*)
      file_completed (as_file file);
      (* Remove the swarmer for this file as it is not useful anymore... *)
      CommonSwarming.remove_swarmer file.file_swarmer;
      file.file_swarmer <- None;
      (* At this point, the file state is FileDownloaded. We should not remove
         the file, because we continue to upload. *)
    end
(** Check if a file is finished or not, and finish it if so.
    The decision is delegated to [CommonSwarming.check_finished]
    (a file is finished if all blocks are verified).
    @param swarmer The swarmer of the file
    @param file The file to check status
*)
let check_finished swarmer file =
  if CommonSwarming.check_finished swarmer then
    download_finished file
(* Masks for the eight bits of a byte, most significant bit first:
   128, 64, 32, 16, 8, 4, 2, 1. *)
let bits = Array.init 8 (fun i -> 0x80 lsr i)

(* Check/set bits in strings (bittorrent format) *)

(* [is_bit_set s n] tells whether bit [n] of [s] is set, bit 0 being the most
   significant bit of the first byte. *)
let is_bit_set s n =
  let byte = Char.code (String.get s (n lsr 3)) in
  byte land bits.(n land 7) <> 0
(* [set_bit s n] sets bit [n] of [s] in place, bit 0 being the most
   significant bit of the first byte. *)
let set_bit s n =
  let byte_index = n lsr 3 in
  let mask = bits.(n land 7) in
  s.[byte_index] <- Char.unsafe_chr (Char.code s.[byte_index] lor mask)
(* The official client seems to use max_range_request 5 and max_range_len 2^14. *)
(* How many requests may be kept in the 'pipeline'. *)
let max_range_requests : int = 5
(* How many bytes we can request in one Piece: see max_range_len, which is
   not defined at this point of the file. *)
(* Build the 8 reserved bytes of the handshake.
   Byte 5 is always 0x10; byte 7 is 0x01 when [!bt_dht] is set and 0x00
   otherwise; every other byte is zero.  The string is built without
   mutating it in place. *)
let reserved () =
  let dht_flag = match !bt_dht with None -> '\x00' | Some _ -> '\x01' in
  (* TODO bep9, bep10, notify clients about extended *)
  (* bytes 0-4 are zero, byte 5 is 0x10, byte 6 is zero, byte 7 is the dht flag *)
  "\x00\x00\x00\x00\x00\x10\x00" ^ String.make 1 dht_flag
(** Handshake: write the protocol name, the reserved bytes, the file hash
    and our peer id, in that order, on [sock]. *)
let send_init client_uid file_id sock =
  let handshake = Buffer.create 100 in
  buf_string8 handshake "BitTorrent protocol";
  let reserved_bytes = reserved () in
  let info_hash = Sha1.direct_to_string file_id in
  let peer_id = Sha1.direct_to_string client_uid in
  List.iter (Buffer.add_string handshake) [reserved_bytes; info_hash; peer_id];
  write_string sock (Buffer.contents handshake)
(** A wrapper to send the Interested message to a client.
    The message is sent only if the client is interesting and Interested
    was not already sent; the latter is recorded before sending.
    @param c The client to send Interested
*)
let send_interested c =
  if c.client_interesting && (not c.client_alrd_sent_interested) then
    begin
      c.client_alrd_sent_interested <- true;
      send_client c Interested
    end
(** Send a Bitfield message to a client.
    Nothing is sent while the metadata of the file is still downloading.
    Without a swarmer every chunk is advertised; with a swarmer only the
    chunks whose state is [VB.State_verified] are.
    @param c The client to send the Bitfield message
*)
let send_bitfield c =
  if not c.client_file.file_metadata_downloading then
    send_client c (BitField
      (
        match c.client_file.file_swarmer with
        | None ->
            (* This must be a seeded file... *)
            if !verbose_download then
              lprintf_nl "Sending completed verified bitmap";
            let nchunks = Array.length c.client_file.file_chunks in
            (* one bit per chunk, rounded up to whole bytes *)
            let len = (nchunks+7)/8 in
            let s = String.make len '\000' in
            for i = 0 to nchunks - 1 do
              set_bit s i
            done;
            s
        | Some swarmer ->
            let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
            if !verbose_download then
              lprintf_nl "Sending verified bitmap: [%s]" (VB.to_string bitmap);
            let len = (VB.length bitmap + 7)/8 in
            let s = String.make len '\000' in
            VB.iteri (fun i state ->
              if state = VB.State_verified then set_bit s i) bitmap;
            s
      ))
(* Module-level integer counter, starting at zero. *)
let counter : int ref = ref 0
(* Decode the reserved bytes [rbits] of a peer handshake into the capability
   flags of client [c]:
   byte 7: 0x01 dht, 0x02 cache extension, 0x04 fast extension;
   byte 5: 0x10 utorrent extension;
   byte 0: 0x80 azureus messaging protocol. *)
let parse_reserved rbits c =
  let flag index mask = Char.code rbits.[index] land mask <> 0 in

  c.client_dht <- flag 7 0x01;
  c.client_cache_extension <- flag 7 0x02;
  c.client_fast_extension <- flag 7 0x04;

  c.client_utorrent_extension <- flag 5 0x10;

  c.client_azureus_messaging_protocol <- flag 0 0x80
(* Send the extended handshake (extended message id 0) to client [c].
   The bencoded payload is a dictionary whose "m" entry maps ut_metadata to 1.
   [file] is unused; it is kept so that callers need not change. *)
let send_extended_handshake c file =
  let module B = Bencode in
  let msg = (B.encode (B.Dictionary [(* "e",B.Int 0L; *)
                         "m", (B.Dictionary ["ut_metadata", B.Int 1L]);
                         (* "metadata_size", B.Int (-1L) *)])) in
  send_client c (Extended (0, msg))
(* Ask client [c] for metadata piece [piece] through the extended protocol.
   The message id is the one stored in [c.client_ut_metadata_msg].
   [file] is unused; it is kept so that callers need not change. *)
let send_extended_piece_request c piece file =
  let module B = Bencode in
  let msg = (B.encode (B.Dictionary ["msg_type", B.Int 0L; (* 0 is request subtype*)
                         "piece", B.Int piece; ])) in
  send_client c (Extended (Int64.to_int c.client_ut_metadata_msg, msg))
(* Describe client [c] as address:port followed by its quoted brand name. *)
let show_client c =
  let host, port = c.client_host in
  let brand = brand_to_string c.client_brand in
  Printf.sprintf "%s:%d %S" (Ip.to_string host) port brand
(** This function is called to parse the first message that
    a client sends.
    @param counter client num
    @param cc Expected client (probably useless now that we don't save any client),
      a reference to a pair of an optional client and a country code
    @param init_sent A boolean to know if we sent this client the handshake message
    @param gconn Connection record whose handler is replaced once the header is parsed
    @param sock The socket we use for this client
    @param proto Unused (required by tuple type?)
    @param rbits The reserved bytes of the handshake, see parse_reserved
    @param file_id The file hash (sha1) of the file involved in this exchange
*)
(* removed: @param peer_id The hash (sha1) of the client. (Should be checked)
*)
let rec client_parse_header counter cc init_sent gconn sock
    (proto, rbits, file_id) =
  try
    set_lifetime sock 600.;
    if !verbose_msg_clients then
      lprintf_nl "client_parse_header %d" counter;

    (* raises Not_found for an unknown file, handled at the end *)
    let file = Hashtbl.find files_by_uid file_id in
    if !verbose_msg_clients then
      lprintf_file_nl (as_file file) "file found";
    let ccc, cc_country_code = !cc in
    let c =
      match ccc with
      | None ->
          let c = new_client file Sha1.null (TcpBufferedSocket.peer_addr sock) cc_country_code in
          if !verbose_connect then lprintf_file_nl (as_file file) "Client %d: incoming connection" (client_num c);
          cc := (Some c), cc_country_code;
          c
      | Some c ->
          (* Does it happen that this c was already used to connect successfully?
             If yes then this must happen: *)
          c.client_received_peer_id <- false;
          if cc_country_code <> None && c.client_country_code = None then
            c.client_country_code <- cc_country_code;
          c
          (* client could have had Sha1.null as peer_id/uid *)
          (* this is to be done, later
             if c.client_uid <> peer_id then
               c.client_software <- (parse_software (Sha1.direct_to_string peer_id));
          *)
    in
    (* if c.client_uid <> peer_id then begin
         lprintf "Unexpected client by UID\n";
         let ccc = new_client file peer_id (TcpBufferedSocket.host sock) in
         lprintf "CLIENT %d: testing instead of %d\n"
           (client_num ccc) (client_num c);
         (match ccc.client_sock with
            Connection _ ->
              lprintf_nl "[BT]: This client is already connected";
              close sock (Closed_for_error "Already connected");
              remove_client ccc;
              c
          | _ ->
              lprintf_nl "[BT]: Client %d: recovered by UID" (client_num ccc);
              remove_client c;
              cc := Some ccc;
              ccc)
       end else
         c *)

    if !verbose_msg_clients then
      lprintf_nl "Client %d: Connected from %s" (client_num c) (show_client c);

    parse_reserved rbits c;

    (match c.client_sock with
     | NoConnection ->
         if !verbose_msg_clients then begin
           let (ip,port) = c.client_host in
           lprintf_nl "No connection to client (%s:%d)!!!" (Ip.to_string ip) port;
         end;
         c.client_sock <- Connection sock
     | ConnectionWaiting token ->
         cancel_token token;
         if !verbose_msg_clients then
           lprintf_nl "Waiting for connection to client !!!";
         c.client_sock <- Connection sock
     | Connection s when s != sock ->
         (* a different socket (physical comparison) is already attached *)
         if !verbose_msg_clients then
           lprintf_nl "CLIENT %d: IMMEDIATE RECONNECTION" (client_num c);
         disconnect_client c (Closed_for_error "Reconnected");
         c.client_sock <- Connection sock;
     | Connection _ -> ()
    );

    set_client_state (c) (Connected (-1));
    if not init_sent then
      begin
        c.client_incoming <- true;
        send_init !!client_uid file_id sock;
        send_extended_handshake c file;
      end;
    connection_ok c.client_connection_control;
    if !verbose_msg_clients then
      lprintf_nl "file and client found";
    (* if not c.client_incoming then *)
    send_bitfield c; (* BitField is always the first message *)
    begin match c.client_dht, !bt_dht with
      | true, Some dht -> send_client c (DHT_Port dht.BT_DHT.M.dht_port)
      | _ -> ()
    end;
    c.client_blocks_sent <- file.file_blocks_downloaded;
    (*
       TODO !!! : send interested if and only if we are interested
       -> we must receive at least other peer bitfield.
       in common swarmer -> compare : partition -> partition -> bool
    *)

    set_rtimeout sock !!client_timeout;
    (* Once parsed successfully we define the function client_to_client
       to be the function used when a message is read *)
    gconn.gconn_handler <- Reader (fun gconn sock ->
      bt_handler TcpMessages.parsing (client_to_client c) c sock
    );

    let b = TcpBufferedSocket.buf sock in
    (* The receive buffer is normally not empty now, lets parse the rest, most likely PeerID *)
    if b.len <> 0 then
      ignore (bt_handler TcpMessages.parsing (client_to_client c) c sock);

    (* Some newer clients send more opcodes in their handshake packet, lets parse them now.
       Using "while b.len <> 0 do ... done" is not possible here because libtorrent clients
       send unparsable five extra bytes after their PeerID which would result into a loop *)
    if b.len <> 0 then
      ignore (bt_handler TcpMessages.parsing (client_to_client c) c sock);

  with
  | Not_found ->
      let (ip,port) = (TcpBufferedSocket.peer_addr sock) in
      if !verbose_unexpected_messages then
        lprintf_nl "Client %s:%d requested a file that is not shared [%s]"
          (Ip.to_string ip) port (Sha1.to_hexa file_id)
  | e ->
      lprintf_nl "Exception %s in client_parse_header" (Printexc2.to_string e);
      close sock (Closed_for_exception e);
      raise e
(** Update the bitmap of a client. Unclear if it is still useful.
    Registers the client as an uploader of the swarmer when it is not yet
    one, allocates its bitmap when missing, then marks every chunk listed
    in [c.client_new_chunks] and hands the bitmap to the swarmer.
    The file of the client must have a swarmer (asserted).
    @param c The client which we want to update.
*)
and update_client_bitmap c =
  let file = c.client_file in

  let swarmer = match file.file_swarmer with
    | None -> assert false
    | Some swarmer -> swarmer
  in

  let up =
    match c.client_uploader with
    | None ->
        let up = CommonSwarming.register_uploader swarmer (as_client c)
            (AvailableIntervals []) in
        c.client_uploader <- Some up;
        up
    | Some up ->
        up
  in

  let bitmap = match c.client_bitmap with
    | None ->
        let len = CommonSwarming.partition_size swarmer in
        let bitmap = Bitv.create len false in
        c.client_bitmap <- Some bitmap;
        bitmap
    | Some bitmap -> bitmap
  in

  if c.client_new_chunks <> [] then begin
    let chunks = c.client_new_chunks in
    (* cleared before use, so the pending list is consumed exactly once *)
    c.client_new_chunks <- [];
    List.iter (fun n -> Bitv.set bitmap n true) chunks;
    CommonSwarming.update_uploader_intervals up (AvailableBitv bitmap);
  end
(** In this function we decide which piece we must request from client.
    At most one Request message is sent per call, and only when fewer than
    [max_range_requests] requests are pending, the file is downloading and
    the client does not choke us.  When no block can be found the finished
    state of the file is checked.
    The client must be registered as an uploader (asserted).
    @param sock Socket of the client (unused)
    @param c The client
*)
and get_from_client sock (c: client) =
  let file = c.client_file in
  (* Check if there's not enough requests in the 'pipeline'
     and if a request can be send (not choked and file is downloading) *)
  if List.length c.client_ranges_sent < max_range_requests
    && file_state file = FileDownloading
    && not c.client_choked
  then
    (* num is the number of the piece, x and y are the position
       of the subpiece in the piece(!), r is a (CommonSwarmer) range *)

    let up = match c.client_uploader with
      | None -> assert false
      | Some up -> up in
    let swarmer = CommonSwarming.uploader_swarmer up in

    try

      let num, x, y, r =

        if !verbose_msg_clients then
          lprintf_file_nl (as_file file) "CLIENT %d: Finding new range to send" (client_num c);

        (* Each try/match of this dump is parenthesized: without parentheses
           the handler or the last arm absorbs every following statement and
           the rest of the dump is not printed. *)
        if !verbose_swarming then begin
          lprintf_n "Current download:\n Current chunks: ";

          (try
             List.iter (fun (x,y) -> lprintf "%Ld-%Ld " x y) c.client_chunks
           with _ -> lprintf "No Chunks");

          lprint_newline ();

          lprintf_n "Current ranges: ";

          List.iter (fun (p1,p2, r) ->
            let (x,y) = CommonSwarming.range_range r in
            lprintf "%Ld-%Ld[%Ld-%Ld] " p1 p2 x y
          ) c.client_ranges_sent;

          (match c.client_range_waiting with
           | None -> ()
           | Some (x,y,_) -> lprintf "Waiting %Ld-%Ld" x y);

          lprint_newline ();

          lprintf_n "Current blocks: ";

          (match c.client_chunk with
           | None -> lprintf "none"
           | Some (_, blocks) -> List.iter (fun b ->
               CommonSwarming.print_block b.up_block) blocks);

          lprint_newline ();

          lprintf_file_nl (as_file file) "Finding Range:";
        end;

        try

          (*We must find a block to request first, and then
            some range inside this block
          *)

          let rec iter () =

            match c.client_chunk with

            | None ->

                if !verbose_swarming then lprintf_file_nl (as_file file) "No block";
                update_client_bitmap c;
                (try CommonSwarming.verify_one_chunk swarmer with _ -> ());
                (*Find a free block in the swarmer*)
                let chunk, blocks = CommonSwarming.find_blocks up in
                if !verbose_swarming then begin
                  lprintf_n "Blocks Found: "; List.iter (fun b ->
                    CommonSwarming.print_block b.up_block) blocks;
                  lprint_newline ()
                end;
                c.client_chunk <- Some (chunk, blocks);

                (*We put the found block in client_block to
                  request range in this block. (Useful for
                  not searching each time a new block)
                *)

                iter ()

            | Some (chunk, blocks) ->

                if !verbose_swarming then begin
                  lprintf_n "Current Blocks: "; List.iter (fun b ->
                    CommonSwarming.print_block b.up_block) blocks;
                  lprint_newline ()
                end;

                try
                  (*Given a block find a range inside*)
                  let (x,y,r) =
                    match c.client_range_waiting with
                    | Some (x,y,r) ->
                        c.client_range_waiting <- None;
                        (x,y,r)
                    | None ->
                        CommonSwarming.find_range up (min max_range_len file.file_piece_size)
                  in

                  (* a range longer than max_range_len is split; the tail is
                     kept in client_range_waiting for the next call *)
                  let (x,y,r) =

                    if y -- x > max_range_len then begin
                      c.client_range_waiting <- Some (x ++ max_range_len, y, r);
                      (x, x ++ max_range_len, r)
                    end else
                      (x,y,r)
                  in

                  c.client_ranges_sent <- c.client_ranges_sent @ [x,y, r];
                  (* CommonSwarming.alloc_range r; *)

                  (* naughty, naughty, was computing a block number instead of a chunk
                     number. Only matters with merged downloads, and even then other
                     clients didn't seem to care (?), so the bug remained hidden *)
                  if !verbose_swarming then
                    lprintf_file_nl (as_file file) "Asking %d For Range %Ld-%Ld" chunk x y;

                  (* piece number, offset inside the piece, length, range *)
                  chunk, x -- file.file_piece_size ** Int64.of_int chunk, y -- x, r

                with Not_found ->

                  (*If we don't find a range to request inside the block,
                    iter to choose another block*)
                  if !verbose_swarming then
                    lprintf_nl "Could not find range in current block";
                  (* c.client_blocks <- List2.removeq b c.client_blocks; *)

                  c.client_chunk <- None;

                  iter ()
          in

          iter ()

        with Not_found ->
          (*If we don't find a block to request we can check if the
            file is finished (if there's missing pieces we can't decide
            that the file is finished because we didn't found
            a block to ask)
          *)
          if !verbose_swarming then
            lprintf_nl "Unable to get a block !!";
          CommonSwarming.compute_bitmap swarmer;
          check_finished swarmer file;
          raise Not_found
      in

      send_client c (Request (num,x,y));

      if !verbose_msg_clients then
        lprintf_file_nl (as_file file) "CLIENT %d: Asking %s For Range %Ld-%Ld"
          (client_num c) (Sha1.to_string c.client_uid) x y

    with Not_found ->
      if not (CommonSwarming.check_finished swarmer) && !verbose_download then
        lprintf_file_nl (as_file file) "BTClient.get_from_client ERROR: can't find a block to download and file is not yet finished for file : %s..." file.file_name
986 (** In this function we match a message sent by a client
987 and react according to this message.
988 @param c The client which sent us a message
989 @param sock The socket used for this client
990 @param msg The message sent by the client
992 and client_to_client c sock msg =
993 if !verbose_msg_clients then begin
994 let (ip,port) = (TcpBufferedSocket.peer_addr sock) in
995 let (timeout, next) = get_rtimeout sock in
996 lprintf_nl "CLIENT %d(%s:%d): (%d, %d,%d) Received %s"
997 (client_num c) (Ip.to_string ip) port
998 (last_time ())
999 (int_of_float timeout)
1000 (int_of_float next)
1001 (TcpMessages.to_string msg);
1002 end;
1004 let file = c.client_file in
1006 (* Sending the "Have" message was moved to bTGlobals so this is useless *)
1007 (* if c.client_blocks_sent != file.file_blocks_downloaded then begin
1008 let rec iter list =
1009 match list with
1010 [] -> ()
1011 | b :: tail when tail == c.client_blocks_sent ->
1012 c.client_blocks_sent <- list;
1013 let (num,_,_) = CommonSwarming.block_block b in
1014 send_client c (Have (Int64.of_int num))
1015 | _ :: tail -> iter tail
1017 iter file.file_blocks_downloaded
1018 end;*)
1021 match msg with
1022 | Piece (num, offset, s, pos, len) ->
1023 (*A Piece message contains the data*)
1024 set_client_state c (Connected_downloading (file_num file));
1025 (*flag it as a good client *)
1026 c.client_good <- true;
1027 if file_state file = FileDownloading then begin
1028 let position = offset ++ file.file_piece_size *.. num in
1029 let up = match c.client_uploader with
1030 None -> assert false
1031 | Some up -> up in
1032 let swarmer = CommonSwarming.uploader_swarmer up in
1034 if !verbose_msg_clients then
1035 (match c.client_ranges_sent with
1036 [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
1037 | (p1,p2,r) :: _ ->
1038 let (x,y) = CommonSwarming.range_range r in
1039 lprintf_file_nl (as_file file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])"
1040 (show_client c) position len
1041 p1 p2 x y
1044 let old_downloaded =
1045 CommonSwarming.downloaded swarmer in
1046 (* List.iter CommonSwarming.free_range c.client_ranges; *)
1047 CommonSwarming.received up
1048 position s pos len;
1049 (* List.iter CommonSwarming.alloc_range c.client_ranges; *)
1050 let new_downloaded =
1051 CommonSwarming.downloaded swarmer in
1053 (*Update rate and amount of data received from client*)
1054 count_download c (new_downloaded -- old_downloaded);
1055 (* use len here with max_dr quickfix *)
1056 Rate.update c.client_downloaded_rate ~amount:len;
1057 (* count bytes downloaded from network for this file *)
1058 file.file_session_downloaded <- file.file_session_downloaded ++ (Int64.of_int len);
1059 if !verbose_msg_clients then
1060 (match c.client_ranges_sent with
1061 [] -> lprintf_file_nl (as_file file) "EMPTY Ranges !!!"
1062 | (p1,p2,r) :: _ ->
1063 let (x,y) = CommonSwarming.range_range r in
1064 lprintf_file_nl (as_file file) "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
1065 position len
1066 p1 p2 x y
1067 (new_downloaded -- old_downloaded)
1070 (* changed 2.5.28 should have been done before !
1071 if new_downloaded <> old_downloaded then
1072 add_file_downloaded (as_file file)
1073 (new_downloaded -- old_downloaded); *)
1074 end;
1075 begin
1076 match c.client_ranges_sent with
1077 [] -> ()
1078 | r :: tail ->
1079 (* CommonSwarming.free_range r; *)
1080 c.client_ranges_sent <- tail;
1081 end;
1082 get_from_client sock c;
1084 (* Check if the client is still interesting for us... *)
1085 check_if_interesting file c
1087 | PeerID p ->
1088 (* Disconnect if that is ourselves. *)
1089 c.client_uid <- Sha1.direct_of_string p;
1090 if not (c.client_uid = !!client_uid) then
1091 begin
1092 let brand, release = parse_software p in
1093 c.client_brand <- brand;
1094 c.client_release <- release;
1095 send_client c Choke;
1096 c.client_sent_choke <- true;
1098 else
1099 disconnect_client c Closed_by_user
1102 | BitField p ->
1103 (*A bitfield is a summary of what a client have*)
1104 if !verbose_msg_clients then
1105 lprintf_file_nl (as_file file) "Bitfield message, metadata state %B" c.client_file.file_metadata_downloading;
1106 if not c.client_file.file_metadata_downloading then
1107 begin
1108 match c.client_file.file_swarmer with
1109 None -> ()
1110 | Some swarmer ->
1111 c.client_new_chunks <- [];
1113 let npieces = CommonSwarming.partition_size swarmer in
1114 let nbits = String.length p * 8 in
1116 if nbits < npieces then begin
1117 lprintf_file_nl (as_file file) "Error: expected bitfield of atleast %d but got %d" npieces nbits;
1118 disconnect_client c (Closed_for_error "Wrong bitfield length")
1119 end else begin
1121 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
1123 for i = 0 to npieces - 1 do
1124 if is_bit_set p i then begin
1125 c.client_new_chunks <- i :: c.client_new_chunks;
1126 match VB.get bitmap i with
1127 | VB.State_missing | VB.State_partial ->
1128 c.client_interesting <- true
1129 | VB.State_complete | VB.State_verified -> ()
1130 end
1131 done;
1133 update_client_bitmap c;
1134 c.client_registered_bitfield <- true;
1136 if c.client_interesting then
1137 send_interested c;
1139 if !verbose_msg_clients then
1140 lprintf_file_nl (as_file file) "New BitField Registered";
1142 (* for i = 1 to max_range_requests - List.length c.client_ranges do
1143 (try get_from_client sock c with _ -> ())
1144 done
1147 end;
1148 end;
1149 (* Note: a bitfield must only be sent after the handshake and before everything else: NOT here *)
1151 | Have n ->
1152 (* A client can send a "Have" without sending a Bitfield *)
1153 if not c.client_file.file_metadata_downloading then
1154 begin
1155 match c.client_file.file_swarmer with
1156 None -> ()
1157 | Some swarmer ->
1158 let n = Int64.to_int n in
1159 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer in
1160 (* lprintf_nl "verified: %c;" (VB.state_to_char (VB.get bitmap n)); *)
1161 (* if the peer has a chunk we don't, tell him we're interested and update his bitmap *)
1162 match VB.get bitmap n with
1163 | VB.State_missing | VB.State_partial ->
1164 c.client_interesting <- true;
1165 send_interested c;
1166 c.client_new_chunks <- n :: c.client_new_chunks;
1167 update_client_bitmap c;
1168 | VB.State_complete | VB.State_verified -> ()
1170 (* begin
1171 match c.client_bitmap, c.client_uploader with
1172 Some bitmap, Some up ->
1173 let swarmer = CommonSwarming.uploader_swarmer up in
1174 let n = Int64.to_int n in
1175 if bitmap.[n] <> '1' then
1177 let verified = CommonSwarming.verified_bitmap swarmer in
1178 if verified.[n] < '2' then begin
1179 c.client_interesting <- true;
1180 send_interested c;
1181 c.client_new_chunks <- n :: c.client_new_chunks;
1182 if c.client_block = None then begin
1183 update_client_bitmap c;
1184 (* for i = 1 to max_range_requests -
1185 List.length c.client_ranges do
1186 (try get_from_client sock c with _ -> ())
1187 done*)
1190 | None, Some _ -> lprintf_nl "no bitmap but client_uploader";
1191 | Some _ , None ->lprintf_nl "bitmap but no client_uploader";
1192 | None, None -> lprintf_nl "no bitmap no client_uploader";
1197 | Interested ->
1198 c.client_interested <- true;
1200 | Choke ->
1201 begin
1202 set_client_state (c) (Connected (-1));
1203 (* remote peer will clear the list of range we sent *)
1204 begin
1205 match c.client_uploader with
1206 None ->
1207 (* Afaik this is no protocol violation and happens if the client
1208 didn't send a client bitmap after the handshake. *)
1209 if !verbose_msg_clients then lprintf_file_nl (as_file file) "%s : Choke send, but no client bitmap"
1210 (show_client c)
1211 | Some up ->
1212 CommonSwarming.clear_uploader_intervals up
1213 end;
1214 c.client_ranges_sent <- [];
1215 c.client_range_waiting <- None;
1216 c.client_choked <- true;
1219 | NotInterested ->
1220 c.client_interested <- false;
1222 | Unchoke ->
1223 begin
1224 c.client_choked <- false;
1225 (* remote peer cleared our request : re-request *)
1226 for i = 1 to max_range_requests -
1227 List.length c.client_ranges_sent do
1228 (try get_from_client sock c with _ -> ())
1229 done
1232 | Request (n, pos, len) ->
1233 if len > max_request_len then begin
1234 close sock (Closed_for_error "Request longer than 1<<16");
1235 raise Exit
1236 end;
1238 if !CommonGlobals.has_upload = 0 then
1239 begin
1240 if client_has_a_slot (as_client c) then
1241 begin
1242 (* lprintf "Received request for upload\n"; *)
1243 (match c.client_upload_requests with
1244 [] ->
1245 CommonUploads.ready_for_upload (as_client c);
1246 | _ -> ());
1247 c.client_upload_requests <- c.client_upload_requests @ [n,pos,len];
1248 let file = c.client_file in
1249 match file.file_shared with
1250 None -> ()
1251 | Some s ->
1252 begin
1253 s.impl_shared_requests <- s.impl_shared_requests + 1;
1254 shared_must_update (as_shared s)
1257 else
1258 begin
1259 send_client c Choke;
1260 c.client_sent_choke <- true;
1261 c.client_upload_requests <- [];
1263 end;
1265 | Ping -> ()
1266 (* We don't 'generate' a Ping message on a Ping. *)
1268 | Cancel (n, pos, len) ->
1269 (* if we receive a cancel message from a peer, remove request *)
1270 if client_has_a_slot (as_client c) then
1271 c.client_upload_requests <- List2.remove_first (n, pos, len) c.client_upload_requests
1272 else
1273 if !verbose_msg_clients then
1274 lprintf_file_nl (as_file file) "Error: received cancel request but client has no slot"
1276 | Extended (extmsg, payload) ->
1277 (* extmsg: 0 handshake, N other message previously declared in handshake.
1278 atm ignore extended messages if were not currently in metadata state.
1279 TODO when were not in metadata state we should be friendly and answer metadata requests
1281 let module B = Bencode in
1282 if file.file_metadata_downloading then begin
1283 (* since we got at least one extended handshake from the peer, it should be okay to
1284 send a handshake back now. we need to send it so the remote client knows how
1285 to send us messages back.
1286 this should of course be moved but I dont know where yet.
1287 also we shouldnt send more than one handshake of course...
1289 if !verbose_msg_clients then
1290 lprintf_file_nl (as_file file) "Got extended msg: %d %s" extmsg (String.escaped payload);
1292 match extmsg with
1293 0x0 ->
1294 if !verbose_msg_clients then
1295 lprintf_file_nl (as_file file) "Got extended handshake";
1296 let dict = Bencode.decode payload in begin
1297 match dict with
1298 B.Dictionary list ->
1299 List.iter (fun (key,value) ->
1300 match key, value with
1301 | "metadata_size", B.Int n ->
1302 if !verbose_msg_clients then
1303 lprintf_file_nl (as_file file) "Got metadata size %Ld" n;
1304 c.client_file.file_metadata_size <- n;
1305 | "m", B.Dictionary mdict ->
1306 if !verbose_msg_clients then
1307 lprintf_file_nl (as_file file) "Got meta dict";
1308 List.iter (fun (key,value) ->
1309 match key, value with
1310 "ut_metadata", B.Int n ->
1311 if !verbose_msg_clients then
1312 lprintf_file_nl (as_file file) "ut_metadata is %Ld " n;
1313 c.client_ut_metadata_msg <- n;
1314 | _ -> ();
1315 ) mdict;
1317 | _ -> () ;
1318 ) list;
1319 (* okay so now we know what to ask for, so ask for metadata now
1320 since metadata can be larger than 16k which is the limit, the transfer needs to be chunked, so
1321 it is not really right to make the query here. but its a start.
1322 also im just asking for piece 0.
1323 (we should also check that we actually got the metadata info before proceeding)
1325 send_extended_handshake c file;
1326 send_extended_piece_request c c.client_file.file_metadata_piece file;
1327 |_ -> () ;
1328 end;
1329 | 0x01 -> (* ut_metadata is 1 because we asked it to be 1 in the handshake
1330 the msg_type is probably
1331 1 for data,
1332 but could be 0 for request(unlikely since we didnt advertise we had the meta)
1333 2 for reject, also unlikely since peers shouldnt advertise if they dont have(but will need handling in the end)
1335 {'msg_type': 1, 'piece': 0, 'total_size': 3425}
1336 after the dict comes the actual piece
1338 if !verbose_msg_clients then
1339 lprintf_file_nl (as_file file) "Got extended ut_metadata message";
1340 let msgtype = ref 0L in begin
1341 begin
1342 match B.decode payload with
1343 B.Dictionary list ->
1344 List.iter (fun (key,value) ->
1345 match key, value with
1346 "msg_type", B.Int n ->
1347 if !verbose_msg_clients then
1348 lprintf_file_nl (as_file file) "msg_type %Ld" n;
1349 msgtype := n;
1350 | "piece", B.Int n ->
1351 if !verbose_msg_clients then
1352 lprintf_file_nl (as_file file) "piece %Ld" n;
1353 file.file_metadata_piece <- n;
1354 | "total_size", B.Int n ->
1355 if !verbose_msg_clients then
1356 lprintf_file_nl (as_file file) "total_size %Ld" n; (* should always be the same as received in the initial handshake i suppose *)
1357 |_ -> () ;
1358 ) list;
1359 |_ -> () ;
1360 end;
1361 match !msgtype with
1362 1L ->
1363 let last_piece_index = (Int64.div file.file_metadata_size 16384L) in
1364 if !verbose_msg_clients then
1365 lprintf_file_nl (as_file file) "handling metadata piece %Ld of %Ld"
1366 file.file_metadata_piece
1367 last_piece_index;
1368 (* store the metadata piece in memory *)
1369 file.file_metadata_chunks.(1 + (Int64.to_int file.file_metadata_piece)) <- payload;
1370 (* possibly write metadata to disk *)
1371 if file.file_metadata_piece >=
1372 (Int64.div file.file_metadata_size 16384L) then begin
1373 if !verbose_msg_clients then
1374 lprintf_file_nl (as_file file) "this was the last piece";
1375 (* here we should simply delete the current download, and wait for mld to pick up the new torrent file *)
1376 (* the entire payload is currently in the array, TODO *)
1377 let newtorrentfile = (Printf.sprintf "%s/BT-%s.torrent"
1378 (Filename2.temp_dir_name ())
1379 (Sha1.to_string file.file_id)) in
1380 let fd = Unix32.create_rw newtorrentfile in
1381 let fileindex = ref 0L in
1382 begin
1383 (* the ee is so we can use the same method to find the
1384 start of the payload for the real payloads as well as the synthetic ones
1386 file.file_metadata_chunks.(0) <- "eed4:info";
1387 file.file_metadata_chunks.(2 + Int64.to_int last_piece_index) <- "eee";
1389 Array.iteri (fun index chunk ->
1390 (* regexp ee is a fugly way to find the end of the 1st dict before the real payload *)
1391 let metaindex = (2 + (Str.search_forward (Str.regexp_string "ee") chunk 0 )) in
1392 let chunklength = ((String.length chunk) - metaindex) in
1393 Unix32.write fd !fileindex chunk
1394 metaindex
1395 chunklength;
1396 fileindex := Int64.add !fileindex (Int64.of_int chunklength);
1398 ) file.file_metadata_chunks;
1399 with e -> begin
1400 (* TODO ignoring errors for now, the array isnt really set up right anyway yet *)
1402 lprintf_file_nl (as_file file) "Error %s saving metadata"
1403 (Printexc2.to_string e)
1404 *) ()
1405 end;
1406 (* Yay, now the new torrent is on disk! amazing! However, now we need to kill the dummy torrent
1407 and restart it with the fresh real torrent *)
1409 (* it seems we need to use the dynamic interface... *)
1410 if !verbose then
1411 lprintf_file_nl (as_file file) "cancelling metadata download ";
1412 let owner = file.file_file.impl_file_owner in
1413 let group = file.file_file.impl_file_group in begin
1414 CommonInteractive.file_cancel (as_file file) owner ;
1415 (* hack_op_file_cancel c.client_file; *)
1416 if !verbose then
1417 lprintf_file_nl (as_file file) "starting download from metadata torrent %s" newtorrentfile ;
1418 ignore(CommonNetwork.network_parse_url BTGlobals.network newtorrentfile owner group);
1419 end;
1420 (try Sys.remove newtorrentfile with _ -> ())
1421 end;
1424 else begin
1425 (* now ask for the next metadata piece, if any *)
1426 let nextpiece = (Int64.succ file.file_metadata_piece) in begin
1427 if !verbose_msg_clients then
1428 lprintf_file_nl (as_file file) "asking for the next piece %Ld" nextpiece;
1429 send_extended_piece_request c nextpiece file;
1430 end;
1431 end;
1432 |_ ->
1433 if !verbose_msg_clients then
1434 lprintf_file_nl (as_file file) "unmatched extended subtype" ;
1435 end;
1437 | _ ->
1438 if !verbose_msg_clients then
1439 lprintf_file_nl (as_file file) "Got extended other msg ";
1440 end;
1442 | DHT_Port port ->
1443 match !bt_dht with
1444 | None ->
1445 if !verbose_msg_clients then
1446 lprintf_file_nl (as_file file) "Received DHT PORT when DHT is disabled. From %s" (show_client c)
1447 | Some dht ->
1448 BT_DHT.M.ping dht (fst c.client_host, port) begin function
1449 | None ->
1450 if !verbose then
1451 lprintf_file_nl (as_file file) "Peer %s didn't reply to DHT ping on port %d" (show_client c) port
1452 | Some (id,addr) ->
1453 BT_DHT.update dht Kademlia.Good id addr
1456 with e ->
1457 lprintf_file_nl (as_file file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e) (TcpMessages.to_string msg)
(** Schedule an outgoing connection to a peer.
    The connection is not opened immediately: the start routine is handed to
    [add_pending_connection] and dequeued later according to
    [!!max_connections_per_second] (see commonGlobals.ml). Until it runs,
    [c.client_sock] holds [ConnectionWaiting token].
    Nothing is done when no connection may be opened, when the peer address
    is banned, or when the client is not in the [NoConnection] state.
    @param c The client we must connect *)
let connect_client c =
  (* Ban check on the peer address; logs the reason when it is blocked. *)
  let peer_allowed () =
    let (ip, port) = c.client_host in
    match !Ip.banned (ip, c.client_country_code) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_nl "%s:%d (%s), blocked: %s"
            (Ip.to_string ip) port
            (fst (Geoip.get_country_code_name c.client_country_code))
            reason;
        false
  in
  (* Socket event handler: timeouts close the socket, and a close event
     disconnects the client only if this socket is still its current one. *)
  let on_event sock event =
    match event with
    | BASIC_EVENT LTIMEOUT ->
        if !verbose_msg_clients then
          lprintf_nl "CLIENT %d: LIFETIME" (client_num c);
        close sock Closed_for_timeout
    | BASIC_EVENT RTIMEOUT ->
        if !verbose_msg_clients then
          lprintf_nl "CLIENT %d: RTIMEOUT (%d)" (client_num c)
            (last_time ());
        close sock Closed_for_timeout
    | BASIC_EVENT (CLOSED r) ->
        (match c.client_sock with
         | Connection s when s == sock -> disconnect_client c r
         | _ -> ())
    | _ -> ()
  in
  (* Runs when the pending connection is dequeued: opens the socket, sends
     the handshake and installs the header parser. *)
  let start token =
    try
      if !verbose_msg_clients then
        lprintf_nl "CLIENT %d: connect_client" (client_num c);
      let (ip, port) = c.client_host in
      if !verbose_msg_clients then
        lprintf_nl "connecting %s:%d" (Ip.to_string ip) port;
      connection_try c.client_connection_control;
      let sock =
        connect token "bittorrent download" (Ip.to_inet_addr ip) port on_event
      in
      c.client_sock <- Connection sock;
      set_lifetime sock 600.;
      TcpBufferedSocket.set_read_controler sock download_control;
      TcpBufferedSocket.set_write_controler sock upload_control;
      TcpBufferedSocket.set_rtimeout sock 30.;
      let file = c.client_file in
      if !verbose_msg_clients then
        lprintf_file_nl (as_file file) "READY TO DOWNLOAD FILE";
      send_init !!client_uid file.file_id sock;
      send_extended_handshake c file;
      (* Fabrice: Initialize the client bitmap and uploader fields to <> None *)
      update_client_bitmap c;
      incr counter;
      (* Hook client_parse_header to the socket: it is called when the
         first message is parsed. *)
      set_bt_sock sock !verbose_msg_clients
        (BTHeader (client_parse_header !counter
                     (ref ((Some c), c.client_country_code)) true))
    with e ->
      lprintf_nl "Exception %s while connecting to client"
        (Printexc2.to_string e);
      disconnect_client c (Closed_for_exception e)
  in
  if can_open_connection connection_manager && peer_allowed () then
    match c.client_sock with
    | NoConnection ->
        let token = add_pending_connection connection_manager start in
        (* Since this is a pending connection, record ConnectionWaiting
           in client_sock. *)
        c.client_sock <- ConnectionWaiting token
    | _ -> ()
(** Open the TCP server socket on which remote peers connect to us
    (much like a C TCP socket server) and store it in [listen_sock].
    Each accepted connection is wrapped in a buffered socket whose first
    message is handled by [client_parse_header]. Connections are refused
    (the raw descriptor is closed) when the connection manager has no room
    or the peer address is banned.
    An exception while creating the server is only logged, and only when
    [verbose_connect] is set. *)
let listen () =
  (* Ban check on the remote address; logs the reason when it is blocked. *)
  let peer_allowed ip from_port cc =
    match !Ip.banned (ip, cc) with
    | None -> true
    | Some reason ->
        if !verbose_connect then
          lprintf_nl "%s:%d (%s) blocked: %s"
            (Ip.to_string ip) from_port
            (fst (Geoip.get_country_code_name cc))
            reason;
        false
  in
  (* Wrap the accepted descriptor [fd] and install the protocol hooks. *)
  let accept_peer fd cc =
    let token = create_token connection_manager in
    let sock =
      TcpBufferedSocket.create token "bittorrent client connection" fd
        (fun sock event ->
           match event with
           | BASIC_EVENT (RTIMEOUT | LTIMEOUT) ->
               (* monitor read and life timeout on client sockets *)
               close sock Closed_for_timeout
           | _ -> ())
    in
    TcpBufferedSocket.set_read_controler sock download_control;
    TcpBufferedSocket.set_write_controler sock upload_control;
    (* The client is not known yet; this cell is handed to
       client_parse_header and read back by the closer below. *)
    let peer = ref (None, cc) in
    TcpBufferedSocket.set_closer sock (fun _ r ->
      match fst !peer with
      | None -> ()
      | Some c ->
          (match c.client_sock with
           | Connection s when s == sock -> disconnect_client c r
           | _ -> ()));
    set_rtimeout sock 30.;
    incr counter;
    (* Again: 'hook' client_parse_header to the socket *)
    set_bt_sock sock !verbose_msg_clients
      (BTHeader (client_parse_header !counter peer false))
  in
  let on_server_event _ event =
    match event with
    | TcpServerSocket.CONNECTION (fd, Unix.ADDR_INET (from_ip, from_port)) ->
        (* A CONNECTION event means that a new client tries to connect
           to us. *)
        let ip = Ip.of_inet_addr from_ip in
        let cc = Geoip.get_country_code_option ip in
        if !verbose_sources > 1 then
          lprintf_nl "CONNECTION RECEIVED FROM %s" (Ip.to_string ip);
        (* Reject this connection if we don't want to bypass the
           max_connection parameter. *)
        if can_open_connection connection_manager
           && peer_allowed ip from_port cc
        then accept_peer fd cc
        else
          (* don't forget to close the incoming sock if we can't open a
             new connection *)
          Unix.close fd
    | _ -> ()
  in
  try
    let server =
      TcpServerSocket.create "bittorrent client server"
        (Ip.to_inet_addr !!client_bind_addr)
        !!client_port
        on_server_event
    in
    listen_sock := Some server
  with e ->
    if !verbose_connect then
      lprintf_nl "Exception %s while init bittorrent server"
        (Printexc2.to_string e)
(** Send a keepalive ([Ping]) message to every client that currently has an
    open connection, for all files in [!current_files], and reset the
    lifetime of its socket to 130 seconds. *)
let send_pings () =
  let ping_client _ c =
    match c.client_sock with
    | Connection sock ->
        send_client c Ping;
        set_lifetime sock 130.
    | _ -> ()
  in
  List.iter
    (fun file -> Hashtbl.iter ping_client file.file_clients)
    !current_files
1651 open Bencode
(** Go through the clients of [file] and try to connect those that have no
    open connection.
    A client is only (re)connected when its connection control allows a new
    attempt; otherwise the state of the control is printed.
    An exception raised while handling one client never aborts the
    iteration: it is reported when [verbose_connect] is set and the next
    client is processed.
    @param file the file whose clients are resumed *)
let resume_clients file =
  Hashtbl.iter (fun _ c ->
    try
      match c.client_sock with
      | Connection _ -> ()
      | _ ->
          (* Test whether the client may be connected according to its
             connection_control (the original note says the delay between
             two tries is 120 seconds - TODO confirm). *)
          if connection_can_try c.client_connection_control then
            connect_client c
          else
            print_control c.client_connection_control
    with e ->
      (* No inner catch-all here: a blanket "with _ -> ()" around the
         connection attempt would keep every failure from reaching this
         handler and make this diagnostic unreachable. *)
      if !verbose_connect then
        lprintf_file_nl (as_file file) "Exception %s in resume_clients" (Printexc2.to_string e)
  ) file.file_clients

(* Register resume_clients in the forward-reference hook. *)
let () =
  resume_clients_hook := resume_clients
(** Validate an integer value found in a tracker reply.
    The value is logged when [verbose_msg_clients] is set. A negative value
    is reported as invalid and replaced by a fallback.
    @param key the name of the key
    @param n the value to check
    @param url Url of the tracker
    @param name the name of the file
    @return [n] converted to [int] when it is not negative *)
let chk_keyval key n url name =
  let value = Int64.to_int n in
  if !verbose_msg_clients then
    lprintf_nl "Reply from %s in file: %s has %s: %d" (show_tracker_url url) name key value;
  if value < 0 then begin
    lprintf_nl "Reply from %s in file: %s has an invalid %s value: %d" (show_tracker_url url) name key value;
    (* NOTE(review): the fallback line is missing from this extract and is
       reconstructed as 0 - confirm against the upstream file. *)
    0
  end else
    value
(** [exn_catch f x] applies [f] to [x] and reifies the outcome: [`Ok v]
    when [f x] returns [v], [`Exn e] when it raises [e]. *)
let exn_catch f x =
  try
    let result = f x in
    `Ok result
  with exn -> `Exn exn
(** Announce [file] to its trackers and process their replies.
    The reply handler [f] reads the downloaded reply file, decodes it as a
    bencoded dictionary and updates the tracker record (intervals,
    statistics, key, tracker id). When [need_sources] is set, every peer
    listed in the reply is registered with [maybe_new_client] and
    [resume_clients] is called afterwards.
    The reply comes from the network and is treated as untrusted: an
    unreadable, empty or non-dictionary reply marks the tracker as failed,
    malformed peer entries are skipped, and a truncated compact peer list
    is read only up to its last complete 6-byte record.
    @param file The file for which we want some sources
    @param need_sources whether we need any sources *)
let talk_to_tracker file need_sources =
  (* This is the function which will be called by the http client for
     parsing the response *)
  let f t filename =
    let tracker_url = show_tracker_url t.tracker_url in
    let tracker_failed reason =
      (* On failure, disable the tracker and count attempts
         (@see is_tracker_enabled) *)
      let num =
        match t.tracker_status with
        | Disabled_failure (i, _) -> i + 1
        | _ -> 1
      in
      t.tracker_status <- Disabled_failure (num, intern reason);
      lprintf_file_nl (as_file file) "Failure no. %d%s from Tracker %s for file: %s Reason: %s"
        num
        (if !!tracker_retries = 0 then "" else Printf.sprintf "/%d" !!tracker_retries)
        tracker_url file.file_name (Charset.Locale.to_utf8 reason)
    in
    (* Count one more peer for this reply and hand it to the client table. *)
    let add_peer peer_id ip port =
      t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
      maybe_new_client file peer_id ip port
    in
    (* Compact peer list: consecutive 6-byte records, 4 bytes of IPv4
       address followed by a 2-byte port. Only complete records are read,
       so a string whose length is not a multiple of 6 cannot cause an
       out-of-bounds access. *)
    let rec add_compact_peers s pos len =
      if pos + 6 <= len then begin
        let ip = Ip.of_ints (get_uint8 s pos, get_uint8 s (pos+1),
                             get_uint8 s (pos+2), get_uint8 s (pos+3))
        and port = get_int16 s (pos+4) in
        add_peer Sha1.null ip port;
        add_compact_peers s (pos+6) len
      end
    in
    (* Non-compact peer entry: a dictionary with "peer id", "ip", "port". *)
    let add_dictionary_peer fields =
      let peer_id = ref Sha1.null in
      let peer_ip = ref Ip.null in
      let port = ref 0 in
      List.iter (function
        | "peer id", String id -> peer_id := Sha1.direct_of_string id
        | "ip", String ip -> peer_ip := Ip.of_string ip
        | "port", Int p -> port := Int64.to_int p
        | _ -> ()
      ) fields;
      add_peer !peer_id !peer_ip !port
    in
    let handle_reply list =
      t.tracker_interval <- 600;
      t.tracker_min_interval <- 600;
      if need_sources then t.tracker_last_clients_num <- 0;
      let chk_keyval key n = chk_keyval key n t.tracker_url file.file_name in
      if not (List.mem_assoc "failure reason" list) then begin
        (match t.tracker_status with
         | Disabled_failure (i, _) ->
             lprintf_file_nl (as_file file) "Received good message from Tracker %s after %d bad attempts"
               tracker_url i
         | _ -> ());
        (* Received good message from tracker after failures, re-enable
           tracker *)
        t.tracker_status <- Enabled
      end;
      List.iter (fun (key, value) ->
        match (key, value) with
        | "failure reason", String failure -> tracker_failed failure
        | "warning message", String warning ->
            lprintf_file_nl (as_file file) "Warning from Tracker %s in file: %s Reason: %s"
              tracker_url file.file_name warning
        | "interval", Int n ->
            t.tracker_interval <- chk_keyval key n;
            (* in case we don't receive "min interval" *)
            if t.tracker_min_interval > t.tracker_interval then
              t.tracker_min_interval <- t.tracker_interval
        | "min interval", Int n ->
            t.tracker_min_interval <- chk_keyval key n;
            (* make sure "min interval" is always < or equal to "interval" *)
            if t.tracker_min_interval > t.tracker_interval then
              t.tracker_min_interval <- t.tracker_interval
        | "downloaded", Int n ->
            t.tracker_torrent_downloaded <- chk_keyval key n
        | "complete", Int n
        | "done peers", Int n ->
            t.tracker_torrent_complete <- chk_keyval key n
        | "incomplete", Int n ->
            t.tracker_torrent_incomplete <- chk_keyval key n;
            (* if complete > 0 and we receive incomplete we probably won't
               receive num_peers so we simulate it below *)
            if t.tracker_torrent_complete > 0 then
              t.tracker_torrent_total_clients_count <-
                (t.tracker_torrent_complete + t.tracker_torrent_incomplete)
        | "num peers", Int n ->
            t.tracker_torrent_total_clients_count <- chk_keyval key n;
            (* if complete > 0 and we receive num_peers we probably won't
               receive incomplete so we simulate it below *)
            if t.tracker_torrent_complete > 0 then
              t.tracker_torrent_incomplete <-
                (t.tracker_torrent_total_clients_count - t.tracker_torrent_complete)
        | "last", Int n ->
            t.tracker_torrent_last_dl_req <- chk_keyval key n
        | "key", String n ->
            t.tracker_key <- n;
            if !verbose_msg_clients then
              lprintf_file_nl (as_file file) "%s in file: %s has key: %s" tracker_url file.file_name n
        | "tracker id", String n ->
            t.tracker_id <- n;
            if !verbose_msg_clients then
              lprintf_file_nl (as_file file) "%s in file: %s has tracker id %s" tracker_url file.file_name n
        | "peers", List peers ->
            if need_sources then
              List.iter (function
                | Dictionary fields -> add_dictionary_peer fields
                | _ ->
                    (* The reply is untrusted: skip an entry that is not a
                       dictionary instead of failing an assertion. *)
                    if !verbose_msg_clients then
                      lprintf_file_nl (as_file file) "%s in file: %s sent a malformed peer entry, ignored"
                        tracker_url file.file_name
              ) peers
        | "peers", String p ->
            if need_sources then
              add_compact_peers p 0 (String.length p)
        | "private", Int _ -> ()
          (* TODO: if set to 1, disable peer exchange *)
        | "peers6", _ -> ()
          (* TODO IPv6 support required *)
        | key, _ ->
            lprintf_file_nl (as_file file) "received unknown entry in answer from tracker: %s : %s" key (Bencode.print value)
      ) list;
      (* Now that we have added new clients to a file, it's time to
         connect to them *)
      if !verbose_sources > 0 then
        lprintf_file_nl (as_file file) "talk_to_tracker: got %i source(s) for file %s"
          t.tracker_last_clients_num file.file_name;
      if need_sources then resume_clients file
    in
    match exn_catch File.to_string filename with
    | `Exn _ | `Ok "" -> tracker_failed "empty reply"
    | `Ok s ->
        match exn_catch Bencode.decode s with
        | `Exn exn -> tracker_failed (Printf.sprintf "wrong reply (%s)" (Printexc2.to_string exn))
        | `Ok (Dictionary list) -> handle_reply list
        | _ -> tracker_failed "wrong reply (value)"
  in
  (* The "started" event is sent until the tracker has been contacted. *)
  let event =
    if file.file_tracker_connected then ""
    else "started"
  in
  connect_trackers file event need_sources f
(** Announce [file] on the DHT, if a DHT instance is running.
    Records the announce time in [file.file_last_dht_announce], queries the
    DHT for peers of [file.file_id] and announces our [!!client_port] to
    each node that answers. When [need_sources] is set, the returned peers
    are registered and [resume_clients] is called.
    @param file the file to announce
    @param need_sources whether returned peers should be added as clients *)
let talk_to_dht file need_sources =
  match !bt_dht with
  | None -> ()
  | Some dht ->
      if !verbose then lprintf_file_nl (as_file file) "DHT announce";
      file.file_last_dht_announce <- last_time ();
      (* Called for each node answering the peer query. *)
      let on_peers ((_, addr) as node) token peers =
        let announce_failed () =
          if !verbose then
            lprintf_file_nl (as_file file) "DHT announce to %s failed" (BT_DHT.show_node node)
        in
        BT_DHT.M.announce dht addr !!client_port token file.file_id (fun _ -> ()) ~kerr:announce_failed;
        if need_sources then begin
          List.iter (fun (ip, port) -> maybe_new_client file Sha1.null ip port) peers;
          resume_clients file
        end
      in
      BT_DHT.query_peers dht file.file_id on_peers
(** DHT-aware wrapper that shadows the tracker-only [talk_to_tracker].
    When the last DHT announce is more than 14 minutes old and the file is
    not private, the file is announced on the DHT first; the trackers are
    then contacted in every case. *)
let talk_to_tracker file need_sources =
  let dht_announce_due =
    file.file_last_dht_announce + 14 * 60 < last_time ()
  in
  if dht_announce_due && not file.file_private then
    talk_to_dht file need_sources;
  talk_to_tracker file need_sources
(** For every file in [!current_files] that has a swarmer: check whether it
    is finished, then contact the trackers according to its state.
    Downloading files ask for sources, shared files only announce, paused
    and queued files are left alone. Exceptions raised by the finish check
    or by the tracker exchange are deliberately ignored (best effort). *)
let recover_files () =
  if !verbose_share then
    lprintf_nl "recover_files";
  let contact_tracker file need_sources =
    try talk_to_tracker file need_sources with _ -> ()
  in
  let recover file swarmer =
    (try check_finished swarmer file with _ -> ());
    match file_state file with
    | FileDownloading ->
        if !verbose_share then
          lprintf_file_nl (as_file file) "recover downloading";
        contact_tracker file true
    | FileShared ->
        if !verbose_share then
          lprintf_file_nl (as_file file) "recover shared";
        contact_tracker file false
    | FilePaused | FileQueued ->
        (* when we are paused or queued we do nothing, not even logging *)
        ()
    | s ->
        if !verbose then
          lprintf_file_nl (as_file file) "recover: Other state %s!!" (string_of_state s)
  in
  List.iter (fun file ->
    match file.file_swarmer with
    | None -> ()
    | Some swarmer -> recover file swarmer
  ) !current_files
(** Scratch buffer of 100000 bytes shared by all uploads: [iter_upload]
    reads file data into it and then passes it to the [Piece] message. *)
let upload_buffer = String.create 100_000
(**
  Send Piece messages for the pending upload requests of a client.
  Requests are taken from the head of [c.client_upload_requests]:
  zero-length requests are dropped, and a request is served only while
  [c.client_allowed_to_write] is not negative; otherwise the client is
  passed to [ready_for_upload].
  @param sock The socket of the client
  @param c The client
*)
let rec iter_upload sock c =
  match c.client_upload_requests with
  | [] -> ()
  | (_, _, len) :: rest when len = zero ->
      (* nothing to send for an empty request: discard it and carry on *)
      c.client_upload_requests <- rest;
      iter_upload sock c
  | _ :: _ when c.client_allowed_to_write < 0L ->
      (* lprintf "client is waiting for another piece\n"; *)
      ready_for_upload (as_client c)
  | (num, pos, len) :: rest ->
      (try
         (* the request is dequeued before any I/O, so it is not retried
            when the read or the send below raises *)
         c.client_upload_requests <- rest;
         let file = c.client_file in
         let offset = pos ++ file.file_piece_size *.. num in
         c.client_allowed_to_write <- c.client_allowed_to_write -- len;
         count_upload c len;
         let nbytes = Int64.to_int len in
         (* lprintf "Unix32.read: offset %Ld len %d\n" offset nbytes; *)
         Unix32.read (file_fd file) offset upload_buffer 0 nbytes;
         (* update upload rate from nbytes bytes *)
         Rate.update c.client_upload_rate ~amount:nbytes;
         Rate.update c.client_downloaded_rate;
         let uploaded = Int64.of_int nbytes in
         file.file_uploaded <- file.file_uploaded ++ uploaded;
         file.file_session_uploaded <- file.file_session_uploaded ++ uploaded;
         (* update stats *)
         count_filerequest c;
         (match file.file_shared with
          | None -> ()
          | Some s ->
              s.impl_shared_uploaded <- file.file_uploaded;
              shared_must_update (as_shared s));
         (* lprintf "sending piece\n"; *)
         send_client c (Piece (num, pos, upload_buffer, 0, nbytes));
         iter_upload sock c
       with e ->
         if !verbose then
           lprintf_nl "Exception %s in iter_upload" (Printexc2.to_string e))
(**
  Bandwidth-control entry point: when the client is connected and has
  pending upload requests, optionally credit it with [allowed] more bytes
  and then call [iter_upload] to send Piece messages.
  The credit is granted (and [allowed] consumed from the global upload
  bandwidth) only when [allowed] is positive and the socket can accept
  the resulting amount; [iter_upload] is called in either case.
  @param c the client to which we can send some bytes
  @param allowed the amount of bytes we can send to client
*)
let client_can_upload c allowed =
  (* lprintf "allowed to upload %d\n" allowed; *)
  do_if_connected c.client_sock (fun sock ->
    match c.client_upload_requests with
    | [] -> ()
    | _ :: _ ->
        let budget = c.client_allowed_to_write ++ Int64.of_int allowed in
        let granted =
          allowed > 0 && can_write_len sock (Int64.to_int budget) in
        if granted then begin
          CommonUploads.consume_bandwidth allowed;
          c.client_allowed_to_write <- budget
        end;
        iter_upload sock c)
(** Resume a file: trackers whose status is [Disabled_failure] or
    [Disabled] are set back to [Enabled] ([Enabled] and [Disabled_mld]
    trackers are left untouched), then the tracker is contacted asking for
    sources.
    Contacting the tracker is best-effort: an exception does not propagate
    to the caller, but it is reported when [!verbose] is set instead of
    being silently dropped.
    @param file the file to resume
*)
let file_resume file =
  List.iter (fun t ->
    match t.tracker_status with
    | Enabled | Disabled_mld _ -> ()
    | Disabled_failure _ | Disabled _ -> t.tracker_status <- Enabled
  ) file.file_trackers;
  try talk_to_tracker file true
  with e ->
    if !verbose then
      lprintf_file_nl (as_file file) "Exception %s in file_resume"
        (Printexc2.to_string e)
(**
  Send the "stopped" event to the trackers when stopping a file.
  Nothing is sent unless [file.file_tracker_connected] is set; the flag is
  cleared from the callback passed to [connect_trackers].
  @param file the file we want to stop
*)
let file_stop file =
  if not file.file_tracker_connected then ()
  else begin
    let on_tracker_reply _ _ =
      lprintf_file_nl (as_file file) "Tracker return: stopped %s" file.file_name;
      file.file_tracker_connected <- false
    in
    connect_trackers file "stopped" false on_tracker_reply
  end
1989 Create the 'hooks'
1991 let _ =
1992 client_ops.op_client_can_upload <- client_can_upload;
1993 file_ops.op_file_resume <- file_resume;
1994 file_ops.op_file_recover <- file_resume;
1995 file_ops.op_file_pause <- (fun file ->
1996 Hashtbl.iter (fun _ c ->
1997 match c.client_sock with
1998 Connection sock -> close sock Closed_by_user
1999 | _ -> ()
2000 ) file.file_clients;
2001 (*When a file is paused we consider it is stopped*)
2002 file_stop file
2004 file_ops.op_file_queue <- file_ops.op_file_pause;
2005 client_ops.op_client_enter_upload_queue <- (fun c ->
2006 if !verbose_msg_clients then
2007 lprintf_nl "Client %d: client_enter_upload_queue" (client_num c);
2008 ready_for_upload (as_client c));
2009 network.op_network_connected_servers <- (fun _ -> []);