patch #7210
[mldonkey.git] / src / daemon / common / commonUploads.ml
blob49cd387d281f69da7b0ab92c2633d513e6c23a63
1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
(*
    This file is part of mldonkey.

    mldonkey is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    mldonkey is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with mldonkey; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*)
20 open Int64ops
21 open Md4
22 open CommonShared
23 open Printf2
24 open CommonInteractive
25 open CommonClient
26 open CommonComplexOptions
27 open CommonTypes
28 open CommonFile
29 open Options
30 open BasicSocket
31 open TcpBufferedSocket
33 open CommonGlobals
34 open CommonOptions
(* Tag handed to the logging primitives for every message of this module. *)
let log_prefix = "[cUp]"

(* Logging shortcuts: same format-string interface as the underlying
   lprintf_nl2 / lprintf2, with [log_prefix] already supplied. *)
let lprintf_nl fmt = lprintf_nl2 log_prefix fmt

let lprintf_n fmt = lprintf2 log_prefix fmt
(*
PROBLEMS: most of the time, users won't share their files on all networks.
We should provide a directory other than incoming/ where files
would be shared, perhaps one per network?

We should move to a per-network approach. This module would be a functor,
and each network would be configured to share or not share different
directories. Moreover, we should have a different sharing strategy.

The default would be: share all files greater than 1 MB in incoming/ on Edonkey.
*)
57 (*******************************************************************
59 TYPES
61 *******************************************************************)
(* Chunk length, in bytes, used when hashing a file for the ED2K uid. *)
let ed2k_block_size = 9_728_000L

(* Chunk length, in bytes, used when hashing a file for the TIGER uid
   (1024 * 1024). *)
let tiger_block_size = Int64.of_int (1024 * 1024)
(* A file currently shared by the core.  [shared_info] is the index of the
   persistent record (type [shared_info] below) inside the indexed store. *)
type shared_file = {
    shared_codedname : string;
    shared_info : Store.index;
    shared_fd : Unix32.t;
    shared_format : CommonTypes.format;
    shared_impl : shared_file shared_impl;
    (* uid kinds still to be produced, each with the callback to run once
       the uid is available *)
    mutable shared_uids_wanted :
      (file_uid_id * (shared_file -> Uid.t -> unit)) list;
  }

(* Persistent description of a shared file: name, size, modification time
   and the hashes computed so far. *)
and shared_info = {
    shared_fullname : string;
    shared_size : int64;
    mutable shared_md4s : Md4.t array;
    mutable shared_tiger : TigerTree.t array;
    mutable shared_bitprint : Uid.t option;
    mutable shared_mtime : float;
    mutable shared_uids : Uid.t list;
    mutable shared_id : int;
  }

(* Directory tree of shared files, one node per path component. *)
and shared_tree =
  {
    shared_dirname : string;
    mutable shared_files : shared_file list;
    mutable shared_dirs : (string * shared_tree) list;
  }
(* State of one search over the local shares: the query and the matches
   accumulated so far. *)
type local_search = {
    mutable local_search_results : (shared_file * shared_info) list;
    mutable local_search_query : query;
  }
(* Argument module for CommonIndexing.Make: describes how a [shared_info]
   is indexed and which search type is used against the index. *)
module IndexingSharedFiles = struct

    let store_name = "shared_store"

    let search_query s = s.local_search_query

    type search = local_search
    type result = shared_info

    let result_names sh = [sh.shared_fullname]
    let result_size sh = sh.shared_size
    (* No uids and no tags are indexed for local shares. *)
    let result_uids _ = []
    let result_tags _ = []

    (* We should probably directly use the Store.index here so that all
       shared_infos are stored on disk. *)
    type stored_result = Store.index
    let result_index r = r

  end

module IndexedSharedFiles = CommonIndexing.Make(IndexingSharedFiles)
122 (*************************************************************************)
123 (* *)
124 (* SAVED SHARED FILES *)
125 (* *)
126 (*************************************************************************)
(* Option class used to save and reload [shared_info] records in the
   shared files configuration. *)
module SharedFileOption = struct

    (* [get_value assocs name conv] looks [name] up in [assocs] and converts
       it with [conv]; any failure is reported as a Failure naming the
       field. *)
    let get_value assocs name conv =
      try conv (List.assoc name assocs)
      with _ -> failwith (Printf.sprintf "Bad shared file %s" name)

    (* Decodes a saved option value into a [shared_info].  The record is
       created with [shared_id] = 0.
       Raises Failure when the value is not a module or a field is bad. *)
    let value_to_info v =
      match v with
        Options.Module assocs ->
          let sh_md4s = get_value assocs "md4s"
              (value_to_array (fun v ->
                    Md4.of_string (value_to_string v)))
          in
          let sh_ttr = get_value assocs "ttr"
              (value_to_array (fun v ->
                    TigerTree.of_string (value_to_string v)))
          in
          let sh_uids = get_value assocs "uids"
              (value_to_list (fun v ->
                    Uid.of_string (value_to_string v)))
          in
          let sh_size = get_value assocs "size" value_to_int64 in
          let sh_name = get_value assocs "name" value_to_filename in
          let sh_mtime = get_value assocs "mtime" value_to_float in

          (* Remember the last bitprint uid found in the saved uid list. *)
          let sh_bitprint = ref None in
          List.iter (fun uid ->
              match Uid.to_uid uid with
                Bitprint _ -> sh_bitprint := Some uid
              | _ -> ()
          ) sh_uids;

          { shared_fullname = sh_name;
            shared_mtime = sh_mtime;
            shared_size = sh_size;
            shared_md4s = sh_md4s;
            shared_tiger = sh_ttr;
            shared_bitprint = !sh_bitprint;
            shared_uids = sh_uids;
            shared_id = 0;
          }

      | _ -> failwith "Options: not a shared file info option"

    (* Encodes a [shared_info] with the field names read back by
       [value_to_info]. *)
    let info_to_value info =
      Options.Module [
        "name", filename_to_value info.shared_fullname;
        "md4s", array_to_value Md4.hash_to_value info.shared_md4s;
        "mtime", float_to_value info.shared_mtime;
        "size", int64_to_value info.shared_size;
        "ttr", array_to_value TigerTree.hash_to_value info.shared_tiger;
        "uids", list_to_value (fun v ->
            string_to_value (Uid.to_string v)) info.shared_uids;
      ]

    let t = define_option_class "SharedFile" value_to_info info_to_value

  end
(* Options file holding the saved shared file descriptions. *)
let shared_ini = create_options_file "shared_files.ini"

let shared_section = file_section shared_ini [] ""

(* On-disk list of shared file infos.  It is only populated around a load
   or a save; the hooks registered on [shared_ini] empty it afterwards. *)
let old_shared_files =
  define_option shared_section ["shared_files"] ""
    (list_option SharedFileOption.t) []

(* Full file name -> index of its record in IndexedSharedFiles. *)
let infos_by_name = Hashtbl.create 113
(* Hooks that keep [old_shared_files] empty except around a load or save:
   - after a load, every loaded info is added to the index and to
     [infos_by_name], then the option is cleared;
   - before a save, the option is rebuilt from [infos_by_name];
   - after a save, the option is cleared again. *)
let _ =
  set_after_load_hook shared_ini (fun _ ->
      List.iter (fun info ->
          let index = IndexedSharedFiles.add info in
          Hashtbl.add infos_by_name info.shared_fullname index
      ) !!old_shared_files;
      old_shared_files =:= []
  );
  set_before_save_hook shared_ini (fun _ ->
      Hashtbl.iter (fun _ index ->
          old_shared_files =:= (IndexedSharedFiles.get_result index)
          :: !!old_shared_files
      ) infos_by_name
  );
  set_after_save_hook shared_ini (fun _ -> old_shared_files =:= [])
(* Loads the shared files options; any exception raised while loading is
   ignored. *)
let load () =
  try Options.load shared_ini
  with _ -> ()

(* Saves the shared files options. *)
let save () =
  Options.save shared_ini
216 (*************************************************************************)
217 (* *)
218 (* NETWORK *)
219 (* *)
220 (*************************************************************************)
(* Network record registered under the name Global Shares (prefix GS) and
   flagged as a virtual network. *)
let network =
  CommonNetwork.new_network "GS" "Global Shares" [ VirtualNetwork ]
(* Installs the callbacks of the virtual network: it is never connected,
   raises IgnoreNetwork when asked whether it is enabled or to update its
   options, reports zeroed statistics, lists the core ports and has no
   connected server. *)
let _ =
  network.op_network_connected <- (fun _ -> false);
  network.op_network_is_enabled <- (fun _ -> raise IgnoreNetwork);
  network.op_network_update_options <- (fun _ -> raise IgnoreNetwork);
  network.op_network_info <- (fun n ->
      {
        network_netnum = network.network_num;
        network_config_filename = (match network.network_config_file with
            [] -> "" | opfile :: _ -> options_file_name opfile);
        network_netname = network.network_name;
        network_netflags = network.network_flags;
        network_enabled = true;
        network_uploaded = Int64.zero;
        network_downloaded = Int64.zero;
        network_connected_servers = 0;
      });
  network.op_network_ports <- (fun _ ->
      [
        !!http_port, "http_port";
        !!telnet_port, "telnet_port";
        !!gui_port, "gui_port";
        !!gift_port, "gift_port GUI";
      ]);
  network.op_network_connected_servers <- (fun _ -> [])
let shared_ops : shared_file CommonShared.shared_ops =
  CommonShared.new_shared_ops network

(* Shared files that still have uids to compute; consumed by
   shared_files_timer. *)
let waiting_shared_files = ref []

(* Lookup tables for shared files, keyed by uid and by [shared_id]. *)
let shareds_by_uid = Hashtbl.create 13
let shareds_by_id = Hashtbl.create 13

(* Registers [sh] under [uid]; an existing binding is kept underneath, as
   with any Hashtbl.add. *)
let add_by_uid uid sh = Hashtbl.add shareds_by_uid uid sh

(* Most recent shared file registered under [uid].
   Raises Not_found when there is none. *)
let find_by_uid uid = Hashtbl.find shareds_by_uid uid
(* Search index over shared file infos.  A hit is pushed on the search
   result list together with the shared file registered under the same
   [shared_id]; Hashtbl.find raises Not_found if there is none. *)
module SharedFilesIndex = IndexedSharedFiles.MakeIndex (struct
      let add_search_result s sh =
        let shared = Hashtbl.find shareds_by_id sh.shared_id in
        s.local_search_results <- (shared, sh) :: s.local_search_results
    end)
(* Shared file whose uid is currently being computed, if any.  While it is
   [Some _], shared_files_timer does not start another job. *)
let current_job = ref None
271 (*******************************************************************
273 DATA STRUCTURES
275 *******************************************************************)
(* Source of [shared_id] values; incremented by new_info before use. *)
let shareds_counter = ref 1
(* Sum of the sizes passed to add_shared for newly added files. *)
let shared_counter = ref (Int64.zero)
(* Coded name -> shared file. *)
let shared_files = Hashtbl.create 13

(* Fresh directory node named [dirname], with no file and no
   sub-directory. *)
let new_shared_dir dirname = {
    shared_dirname = dirname;
    shared_files = [];
    shared_dirs = [];
  }

(* Root of the shared directory tree. *)
let shared_tree = new_shared_dir ""
289 (*******************************************************************
291 HASHES COMPUTATION
293 *******************************************************************)
(* [md4_of_list md4s] concatenates the 16-byte raw form of every hash of
   [md4s], in list order, and returns the MD4 of that string. *)
let md4_of_list md4s =
  let len = List.length md4s in
  let s = String.create (len * 16) in
  let rec iter list i =
    match list with
      [] -> ()
    | md4 :: tail ->
        let md4 = Md4.direct_to_string md4 in
        String.blit md4 0 s i 16;
        iter tail (i+16)
  in
  iter md4s 0;
  Md4.string s
(* [tiger_of_array array pos block] hashes the leaves covered by a window
   of [block] slots starting at [pos].  A window of one slot is the leaf
   itself.  When the second half of the window starts at or past the end
   of [array], only the first half is hashed.  Otherwise the result is the
   Tiger hash of the byte 0x01 followed by the hashes of both halves.
   [block] is halved at each level, so it is expected to be a power of
   two. *)
let rec tiger_of_array array pos block =
  if block = 1 then
    array.(pos)
  else
    let len = Array.length array in
    if pos + block / 2 >= len then
      tiger_of_array array pos (block/2)
    else
      let d1 = tiger_of_array array pos (block/2) in
      let d2 = tiger_of_array array (pos+block/2) (block/2) in
      let s = String.create (1 + Tiger.length * 2) in
      s.[0] <- '\001';
      String.blit (TigerTree.direct_to_string d1) 0 s 1 Tiger.length;
      String.blit (TigerTree.direct_to_string d2) 0 s (1+Tiger.length) Tiger.length;
      let t = Tiger.string s in
      let t = TigerTree.direct_of_string (Tiger.direct_to_string t) in
      t
(* [tiger_max_block_size block len] doubles [block] until it is at least
   [len] and returns it.  [block] must be positive, otherwise doubling
   never reaches a positive [len]. *)
let rec tiger_max_block_size block len =
  if block < len then tiger_max_block_size (2 * block) len
  else block
(* Root hash of the leaf array: calls the three-argument version above on
   the whole array, with a window that is the smallest power of two at
   least as large as the array.  An empty array is not handled (the
   underlying array access raises Invalid_argument). *)
let tiger_of_array array =
  let top_block = tiger_max_block_size 1 (Array.length array) in
  tiger_of_array array 0 top_block
(* [tiger_pos nblocks] sums the sizes of the successive levels obtained by
   repeatedly replacing [nblocks] with half of it, rounded up, until a
   single block is left.  The result is 0 when [nblocks] is below 2. *)
let rec tiger_pos nblocks =
  if nblocks < 2 then 0
  else
    let upper = nblocks / 2 + nblocks mod 2 in
    upper + tiger_pos upper
(* Same walk as tiger_pos, but also returns, for every level from the
   widest to the narrowest, the pair (number of blocks of that level,
   position of the level above it).  The first component is the position
   of the widest level. *)
let rec tiger_pos2 nblocks =
  if nblocks < 2 then (0, [])
  else
    let upper = nblocks / 2 + nblocks mod 2 in
    let upper_pos, levels = tiger_pos2 upper in
    (upper + upper_pos, (nblocks, upper_pos) :: levels)
(* [tiger_node d1 d2] is the hash of an inner node with children [d1] and
   [d2]: the Tiger hash of the byte 0x01 followed by the raw forms of both
   children. *)
let tiger_node d1 d2 =
  let s = String.create (1 + Tiger.length * 2) in
  s.[0] <- '\001';
  String.blit (TigerTree.direct_to_string d1) 0 s 1 Tiger.length;
  String.blit (TigerTree.direct_to_string d2) 0 s (1+Tiger.length) Tiger.length;
  let t = Tiger.string s in
  let t = TigerTree.direct_of_string (Tiger.direct_to_string t) in
  t
(* [tiger_tree s array pos block] hashes the window of [block] leaves of
   [array] starting at [pos], combining halves with tiger_node.  When the
   second half starts at or past the end of [array], only the first half
   is hashed.  The argument [s] is only passed along to the recursive
   calls. *)
let rec tiger_tree s array pos block =
  if block = 1 then array.(pos)
  else
    let half = block / 2 in
    if pos + half >= Array.length array then
      tiger_tree s array pos half
    else
      let left = tiger_tree s array pos half in
      let right = tiger_tree s array (pos + half) half in
      tiger_node left right
(* [fill_tiger_tree s list] fills the inner levels of the flat tree [s],
   one (nblocks, pos) entry of [list] at a time, in list order.  For each
   entry the [nblocks] hashes read start at [pos] plus the rounded-up half
   of [nblocks]; they are combined two by two with tiger_node and written
   from [pos] on.  A trailing unpaired hash is copied unchanged. *)
let rec fill_tiger_tree s list =
  match list with
    [] -> ()
  | (nblocks, pos) :: tail ->
      (* nblocks: the number of blocks in the next level
         pos: the position of the blocks to be created *)
      let half = nblocks / 2 in
      let acc = nblocks - 2 * half in
      let next_pos = pos + half + acc in
      for i = 0 to half - 1 do
        let d1 = s.(next_pos+2*i) in
        let d2 = s.(next_pos+2*i+1) in
        s.(pos+i) <- tiger_node d1 d2
      done;
      if acc = 1 then s.(pos+half) <- s.(next_pos+2*half);
      fill_tiger_tree s tail
(* [flatten_tiger_array array] returns the raw forms of all the hashes of
   [array], concatenated in order, each taking TigerTree.length bytes. *)
let flatten_tiger_array array =
  let len = Array.length array in
  let s = String.create (len * TigerTree.length) in
  for i = 0 to len - 1 do
    String.blit (TigerTree.direct_to_string array.(i)) 0
      s (i * TigerTree.length) TigerTree.length
  done;
  s
(* [unflatten_tiger_array s] cuts [s] into consecutive pieces of
   TigerTree.length bytes and converts each one back to a hash.  Trailing
   bytes that do not fill a whole piece are ignored. *)
let unflatten_tiger_array s =
  let count = String.length s / TigerTree.length in
  Array.init count (fun i ->
      TigerTree.direct_of_string
        (String.sub s (i * TigerTree.length) TigerTree.length))
(* [make_tiger_tree array] builds the flat tree whose widest level is
   [array]: the leaves are copied to the end of a new array, the inner
   levels are filled by fill_tiger_tree, and the whole array is returned
   flattened into a string. *)
let make_tiger_tree array =
  let nleaves = Array.length array in
  let leaves_pos, levels = tiger_pos2 nleaves in
  let tree = Array.make (leaves_pos + nleaves) TigerTree.null in
  Array.blit array 0 tree leaves_pos nleaves;
  fill_tiger_tree tree levels;
  flatten_tiger_array tree
(* [build_tiger_tree_file uid ttr] writes the flattened tree built from
   [ttr] to a file named after [uid] in the ttr directory, which is
   created and checked for write access first. *)
let build_tiger_tree_file uid ttr =
  let contents = make_tiger_tree ttr in
  let dir = "ttr" in
  Unix2.safe_mkdir dir;
  Unix2.can_write_to_directory dir;
  let path = Filename.concat dir (Uid.to_file_string uid) in
  File.from_string path contents
(* [start_job_for sh (wanted_id, handler)] makes sure [sh] has a uid of
   kind [wanted_id] and hands it to [handler].

   If such a uid is already recorded, [handler sh uid] is called
   (exceptions raised by the handler are ignored) and the job slot
   [current_job] is released.  Otherwise the uid is computed, recorded in
   the file info, the index and the uid table, and the function re-enters
   itself so that the lookup succeeds.  SHA1, ED2K and TIGER go through
   CommonHasher callbacks; MD5EXT is computed directly from at most the
   first 307200 bytes of the file.

   [current_job] is released on every path that does not hand over to a
   hashing callback: hashing error, any exception (which is re-raised),
   unsupported uid kind, BITPRINT requested while SHA1 or TIGER is still
   missing, and TIGER requested while TigerTree is disabled.  The last two
   used to return without releasing the slot, after which
   shared_files_timer would not start any further job. *)
let rec start_job_for sh (wanted_id, handler) =
  let info = IndexedSharedFiles.get_result sh.shared_info in
  (* Records a freshly computed uid, then re-enters so that the lookup
     below finds it, calls the handler and releases the job slot. *)
  let publish uid =
    info.shared_uids <- uid :: info.shared_uids;
    IndexedSharedFiles.update_result sh.shared_info info;
    add_by_uid uid sh;
    start_job_for sh (wanted_id, handler)
  in
  let hashing_failed () =
    lprintf_nl "Error during hashing of %s" info.shared_fullname;
    current_job := None
  in
  try
    List.iter (fun id ->
        match wanted_id, Uid.to_uid id with
          BITPRINT, Bitprint _
        | SHA1, Sha1 _
        | ED2K, Ed2k _
        | MD5, Md5 _
        | MD5EXT, Md5Ext _
        | TIGER, TigerTree _ ->
            (try handler sh id with _ -> ());
            raise Exit
        | _ -> ()
    ) info.shared_uids;

    begin match wanted_id with
      SHA1 ->
        begin
          try
            CommonHasher.compute_sha1 (Unix32.filename sh.shared_fd)
              zero info.shared_size (fun job ->
                if job.CommonHasher.job_error then
                  hashing_failed ()
                else
                  let sha1 = job.CommonHasher.job_result in
                  publish (Uid.create (Sha1 sha1)))
          with e ->
            current_job := None;
            raise e
        end

    | BITPRINT ->
        let sha1 = ref None in
        let tiger = ref None in
        List.iter (fun id ->
            match Uid.to_uid id with
            | Sha1 (s) -> sha1 := Some s
            | TigerTree (s) -> tiger := Some s
            | _ -> ()
        ) info.shared_uids;
        begin
          match !sha1, !tiger with
            Some sha1, Some tiger ->
              let uid = Uid.create (Bitprint (sha1, tiger)) in
              info.shared_uids <- uid :: info.shared_uids;
              info.shared_bitprint <- Some uid;
              IndexedSharedFiles.update_result sh.shared_info info;
              add_by_uid uid sh;
              build_tiger_tree_file uid info.shared_tiger;
              start_job_for sh (wanted_id, handler)

          | _ ->
              (* The bitprint cannot be built yet.  Release the job slot
                 through the Exit handler below instead of returning with
                 [current_job] still set. *)
              raise Exit
(*
(* Not enough information to compute the bitprint. Ask for the corresponding
   information. What happens if there is an error during SHA1 or TIGER
   computation ??? *)
              ask_for_uid sh BITPRINT handler;
              (match !sha1 with
                  None -> ask_for_uid sh SHA1 (fun _ _ -> ())
                | _ -> ());
              (match !tiger with
                  None -> ask_for_uid sh TIGER (fun _ _ -> ())
                | _ -> ());
*)
        end

    | MD5EXT ->
        let md5ext =
          try
            let fd = Unix32.create_ro info.shared_fullname in
            let file_size = Unix32.getsize64 fd in
            let len64 = min 307200L file_size in
            let len = Int64.to_int len64 in
            let s = String.create len in
            Unix32.read fd zero s 0 len;
            Md5Ext.string s
          with e ->
            current_job := None;
            raise e
        in
        publish (Uid.create (Md5Ext (md5ext)))

    | ED2K ->
        let size = info.shared_size in
        let chunk_size = ed2k_block_size in
        (* Hashes one chunk per hasher job; [hashes] is in reverse order. *)
        let rec iter pos hashes =
          if pos < size then begin
            try
              CommonHasher.compute_md4 info.shared_fullname
                pos (min (size -- pos) chunk_size)
                (fun job ->
                  if job.CommonHasher.job_error then
                    hashing_failed ()
                  else
                    iter (pos ++ chunk_size)
                      (job.CommonHasher.job_result :: hashes))
            with e ->
              current_job := None;
              raise e
          end else begin
            let list = List.rev hashes in
            let ed2k = md4_of_list list in
            let uid = Uid.create (Ed2k (ed2k)) in
            info.shared_md4s <- Array.of_list list;
            publish uid
          end
        in
        iter zero []

    | TIGER ->
        if TigerTree.enabled then begin
          let size = info.shared_size in
          let chunk_size = tiger_block_size in
          let rec iter pos hashes =
            if pos < size then
              CommonHasher.compute_tiger info.shared_fullname
                pos (min (size -- pos) chunk_size)
                (fun job ->
                  if job.CommonHasher.job_error then
                    hashing_failed ()
                  else
                    iter (pos ++ chunk_size)
                      (job.CommonHasher.job_result :: hashes))
            else begin
              let array = Array.of_list (List.rev hashes) in
              let tiger = tiger_of_array array in
              let uid = Uid.create (TigerTree (tiger)) in
              info.shared_tiger <- array;
              publish uid
            end
          in
          iter zero []
        end else
          (* Nothing can be computed: release the job slot. *)
          raise Exit

    | _ -> raise Exit
    end

  with
    Exit ->
      current_job := None
  | e -> current_job := None; raise e
(* Timer callback driving uid computation.  Does nothing while a job is
   running or when no file is waiting.  Otherwise it looks at the first
   waiting file: if it has no wanted uid left, the file is dropped from
   the waiting list; else its first wanted uid is removed from its list
   and a job is started for it. *)
let shared_files_timer _ =
  match !current_job, !waiting_shared_files with
  | Some _, _
  | None, [] -> ()
  | None, sh :: others ->
      begin match sh.shared_uids_wanted with
      | [] -> waiting_shared_files := others
      | request :: remaining ->
          if !verbose_share then
            lprintf_nl "shared_files_timer: starting job";
          sh.shared_uids_wanted <- remaining;
          current_job := Some sh;
          start_job_for sh request
      end
(* [ask_for_uid sh uid f] requests a uid of kind [uid] for [sh]: the
   request is put in front of the wanted uids of [sh], and [sh] is put in
   front of the waiting files.  [f] is the handler later given to
   start_job_for. *)
let ask_for_uid sh uid f =
  let request = (uid, f) in
  sh.shared_uids_wanted <- request :: sh.shared_uids_wanted;
  waiting_shared_files := sh :: !waiting_shared_files
615 (*******************************************************************
617 FUNCTIONS
619 *******************************************************************)
(* [add_shared_file node sh dir_list] files [sh] in the directory tree
   rooted at [node].  [dir_list] is the path split into components; the
   last component is the file name, the others are directories, created on
   the way when missing.  [dir_list] must not be empty. *)
let rec add_shared_file node sh dir_list =
  match dir_list with
    [] -> assert false
  | [_filename] ->
      node.shared_files <- sh :: node.shared_files
  | dirname :: dir_tail ->
      let node =
        try
          List.assoc dirname node.shared_dirs
        with Not_found ->
            let new_node = new_shared_dir dirname in
            node.shared_dirs <- (dirname, new_node) :: node.shared_dirs;
            new_node
      in
      add_shared_file node sh dir_tail
(* [new_info full_name size] returns the info record of [full_name] and
   its index in IndexedSharedFiles, after assigning it a fresh
   [shared_id].

   A record already known under that name is reused when its saved
   modification time equals the current one.  A stale record is removed
   from both the table and the index.  In that case, or when no record is
   known (Not_found), a new record without any hash is created and
   registered. *)
let new_info full_name size =
  incr shareds_counter;
  let fd = Unix32.create_ro full_name in
  let mtime = Unix32.mtime64 fd in
  try
    let index = Hashtbl.find infos_by_name full_name in
    let info = IndexedSharedFiles.get_result index in
    if info.shared_mtime <> mtime then begin
        (* The file changed since it was hashed: forget the old record
           and fall through to the creation of a new one. *)
        Hashtbl.remove infos_by_name full_name;
        IndexedSharedFiles.remove_result index;
        raise Not_found
      end;
    info.shared_id <- !shareds_counter;
    IndexedSharedFiles.update_result index info;
    info, index
  with Not_found ->
      let info = {
          shared_fullname = full_name;
          shared_uids = [];
          shared_mtime = mtime;
          shared_md4s = [||];
          shared_tiger = [||];
          shared_bitprint = None;
          shared_size = size;
          shared_id = !shareds_counter;
        } in
      let index = IndexedSharedFiles.add info in
      Hashtbl.add infos_by_name full_name index;
      info, index
(* [add_shared full_name codedname size] returns the shared file
   registered under [codedname], creating it when there is none.

   A new shared file gets an info record from new_info and is registered
   by coded name, by id and by each already known uid, added to the search
   index and to the directory tree (the coded name is split on '/').
   [size] is added to [shared_counter]. *)
let add_shared full_name codedname size =
  try
    Hashtbl.find shared_files codedname
  with Not_found ->
      let fd = Unix32.create_ro full_name in
      let info, index = new_info full_name size in

      (* [impl] and [sh] refer to each other. *)
      let rec impl = {
          impl_shared_update = 1;
          impl_shared_fullname = full_name;
          impl_shared_codedname = codedname;
          impl_shared_size = size;
          impl_shared_id = Md4.random ();
          impl_shared_num = 0;
          impl_shared_uploaded = Int64.zero;
          impl_shared_ops = shared_ops;
          impl_shared_val = sh;
          impl_shared_requests = 0;
          impl_shared_file = None;
          impl_shared_servers = [];
        }
      and sh = {
          shared_info = index;
          shared_codedname = codedname;
          shared_fd = fd;
          shared_format = CommonMultimedia.get_info full_name;
          shared_impl = impl;
          shared_uids_wanted = [];
        } in

      update_shared_num impl;

      (* lprintf "FILE ADDED: %s\n" codedname; *)
      Hashtbl.add shared_files codedname sh;
      Hashtbl.add shareds_by_id info.shared_id sh;

      List.iter (fun uid -> add_by_uid uid sh) info.shared_uids;

      SharedFilesIndex.add sh.shared_info;
      add_shared_file shared_tree sh (String2.split codedname '/');
      shared_counter := !shared_counter ++ size;
      (* lprintf "Total shared : %Ld\n" !shared_counter; *)
      sh
(* [iter f] applies [f] to every shared file of the coded-name table. *)
let iter f =
  Hashtbl.iter (fun _codedname sh -> f sh) shared_files

(* [query q] runs [q] against the index of shared files and returns the
   matches as (shared file, info) pairs. *)
let query q =
  let search = {
      local_search_query = q;
      local_search_results = [];
    } in
  SharedFilesIndex.find search;
  search.local_search_results

(* Shared file registered under the coded name [name].
   Raises Not_found when there is none. *)
let find_by_name name =
  Hashtbl.find shared_files name
729 (*let find_by_num num = Hashtbl.find table num *)
731 (**********************************************************************
733 UPLOAD SCHEDULER
735 ***********************************************************************)
let client_is_connected c =
  is_connected (client_state c)

(* Clients ready to receive data, served in turn by next_uploads. *)
let upload_clients : client Fifo.t = Fifo.create ()

(* Clients waiting for an upload slot, keyed by client number. *)
let pending_slots_map : client Intmap.t ref = ref Intmap.empty
(* let (pending_slots_fifo : int Fifo.t) = Fifo.create () *)

(* Amount offered to a client each time it is served by next_uploads. *)
let packet_size = 10240

(* two seconds max of streaming ahead *)
let streaming_amount () =
  int_of_float (!CommonGlobals.payload_bandwidth *. 2.0)

(* Remaining upload budget; refilled and capped by next_uploads, decreased
   by consume_bandwidth. *)
let streaming_left = ref (streaming_amount ())

(* Time of the previous call of next_uploads, if any. *)
let streaming_time : float option ref = ref None
(* [next_uploads ()] refills the upload budget [streaming_left] according
   to the time elapsed since the previous call, then serves the clients of
   [upload_clients] round after round.

   The elapsed time is clamped to the range 0 to 10 seconds, and a clamped
   value is logged as a clock jump.  The budget never exceeds
   streaming_amount ().  A round offers [packet_size] to each client
   queued at the start of the round, while the budget is positive.  Rounds
   stop as soon as one of them leaves the budget untouched. *)
let next_uploads () =
  let rec next_uploads_aux () =
    let rec next_uploads_round n =
      let upload_to_one_client max_amount =
        let c = Fifo.take upload_clients in
        client_can_upload c max_amount
        (* it's up to client_can_upload to put the client back into the Fifo *)
      in
(*      lprintf "next_uploads %d %d\n"
        (Fifo.length upload_clients) !streaming_left;
      Fifo.iter (fun c ->
        lprintf "   client %d\n" (client_num c)
      ) upload_clients; *)
      if n>0 &&
        not (Fifo.empty upload_clients) &&
        !streaming_left > 0 then begin
          upload_to_one_client packet_size;
          next_uploads_round (n-1)
        end in

    (* stop if no uploader could take anything during the last round *)
    let old_streaming_left = !streaming_left in
    next_uploads_round (Fifo.length upload_clients);
    if !streaming_left < old_streaming_left then
      next_uploads_aux () in

(*
  if !verbose_upload then begin
    lprintf "streaming_left %d\n" !streaming_left;
  end; *)
  (* buffer empties with time... *)
  (* FIXME wall-clock time is not needed here and causes problems when clock jumps *)
  let new_streaming_time = BasicSocket.current_time () in
  let deltat' = (match !streaming_time with
    | None -> 0.
    | Some t -> new_streaming_time -. t) in
  (* stay sane no matter what *)
  let deltat = min 10. (max 0. deltat') in
  if abs_float (deltat -. deltat') > epsilon_float then
    lprintf_nl "Detected clock jump. deltat %f adjusted to %f" deltat' deltat;
  (* do not overflow *)
  let deltab = !CommonGlobals.payload_bandwidth *. deltat in
  if deltab > float max_int then
    lprintf_nl "OVERFLOW deltab %f, ignored" deltab
  else
    streaming_left := !streaming_left + (int_of_float deltab);
  (* lprintf_nl "next_uploads %f %f %d %d %d" new_streaming_time deltat !streaming_left (streaming_amount()) (Fifo.length upload_clients); *)
  streaming_left := min !streaming_left (streaming_amount ());
  streaming_time := Some new_streaming_time;
  next_uploads_aux ()
(* Timer callback: counts [has_upload] down while it is non-zero; once it
   is zero, raises [upload_credit] by one per call, up to 300. *)
let upload_credit_timer _ =
  if !has_upload <> 0 then
    decr has_upload
  else if !upload_credit < 300 then
    incr upload_credit

(* Queues [c] at the end of [upload_clients]. *)
let ready_for_upload c =
  Fifo.put upload_clients c
(* [add_pending_slot c] puts [c] among the clients waiting for an upload
   slot, keyed by client number, unless it is already there.  A client
   that already has a slot is not inserted (only logged when
   verbose_upload is set). *)
let add_pending_slot c =
  if client_has_a_slot c then begin
      if !verbose_upload then lprintf_nl "Avoided inserting an uploader in pending slots!"
    end
  else
  if not (Intmap.mem (client_num c) !pending_slots_map) then
    pending_slots_map := Intmap.add (client_num c) c !pending_slots_map
(* [remove_pending_slot c] takes [c] out of the pending slots map when it
   is present; otherwise the map is left untouched. *)
let remove_pending_slot c =
  let num = client_num c in
  if Intmap.mem num !pending_slots_map then
    pending_slots_map := Intmap.remove num !pending_slots_map
(* [give_a_slot c] removes [c] from the pending slots and, if it is still
   connected, gives it a normal slot and enters it in the upload queue.
   A disconnected client is skipped and another pending client is
   tried. *)
let rec give_a_slot c =
  remove_pending_slot c;
  if not (client_is_connected c) then
    find_pending_slot ()
  else
    begin
      set_client_has_a_slot c NormalSlot;
      client_enter_upload_queue c
    end

(* [find_pending_slot ()] gives a slot to a pending client chosen at
   random.  Any exception is ignored; this covers the empty map, for which
   Random.int is called with 0 and raises Invalid_argument. *)
and find_pending_slot () =
  try
    let rec iter () =
      let c = Intmap.nth !pending_slots_map
          (Random.int (Intmap.length !pending_slots_map)) in
      give_a_slot c
    in
    iter ()
  with _ -> ()
(* [add_pending_slot c] (shadows the definition above) first checks
   whether [c] is entitled to a special slot, granted at once.

   The entitlements start from the file [c] uploads: release slots allowed
   by the options, one friend slot if [c] is a friend and
   friends_upload_slot is set, one small-file slot if the shared file is
   within small_files_slot_limit, and the priority of the shared
   directory.  Each is decreased for every current uploader already using
   it.  The first kind still available wins, in the order release, friend,
   small file, priority.  With none left, [c] goes to the pending slots
   through the previous definition.

   Raises Not_found when [c], or a current uploader, has no upload
   file. *)
let add_pending_slot c =
  let client_upload c =
    match client_upload c with
      None -> raise Not_found
    | Some file -> file
  in
  let csh = file_shared (client_upload c) in
  let cdir = shared_dir csh in
  let cprio = ref (shared_prio csh) in
  let cfriend = ref (if is_friend c && !!friends_upload_slot then 1 else 0) in
  let csmallfiles = ref (match csh with
      | None -> 0
      | Some sh -> if shared_size sh <= !!small_files_slot_limit then 1 else 0) in
  let allowed_release_slots =
    ref (Misc.percentage_of_ints !!max_upload_slots !!max_release_slots) in

  (* check current upload slots for already used special slots *)
  Intmap.iter (fun _ c ->
      if shared_dir (file_shared (client_upload c)) = cdir then
        decr cprio;
      match client_slot c with
        ReleaseSlot -> decr allowed_release_slots
      | FriendSlot -> decr cfriend
      | SmallFileSlot -> decr csmallfiles
      | _ -> ()) !CommonClient.uploaders;

  let slot_type =
    if file_release (client_upload c) && !allowed_release_slots > 0 then Some ReleaseSlot else
    if !cfriend > 0 then Some FriendSlot else
    if !csmallfiles > 0 then Some SmallFileSlot else
    if !cprio > 0 then Some (PrioSlot cdir) else
      None
  in
  match slot_type with
    Some slot ->
      remove_pending_slot c;
      if client_is_connected c then
        begin
          set_client_has_a_slot c slot;
          client_enter_upload_queue c
        end
  | None -> add_pending_slot c
(* Gives one slot to a pending client when fewer than max_upload_slots
   uploaders are active. *)
let static_refill_upload_slots () =
  let active = Intmap.length !CommonClient.uploaders in
  if active < !!max_upload_slots then
    find_pending_slot ()
(* Since dynamic slot allocation is based on feedback, it should not
 * allocate new slots too fast, because connections need some time to reach
 * a stable state.
 * To compensate for that slow pace, slots are allocated quadratically
 * as long as the link is not saturated.
 *)
(* Number of consecutive calls that found the uplink not saturated. *)
let not_saturated_count = ref 0
(* Number of slots to open at once the next time; grows by one per
   allocation and is reset to 1 with the counter above. *)
let allocation_cluster = ref 1

(* [dynamic_refill_upload_slots ()] opens upload slots according to the
   measured upload usage, while fewer than max_upload_slots are in use.

   The capacity is the detected uplink capacity, capped by
   max_hard_upload_rate * 1024 when that option is not 0.  When the
   short-delay usage plus [slot_bw] fits below the capacity, the call
   counts as not saturated; otherwise the state is reset.  Below
   [min_upload_slots] uploaders, the missing slots are opened at once and
   the state is reset.  Otherwise, after two not-saturated calls in a row,
   [allocation_cluster] slots are opened (without exceeding
   max_upload_slots) and the cluster size grows by one. *)
let dynamic_refill_upload_slots () =
  let reset_state () =
    not_saturated_count := 0;
    allocation_cluster := 1 in

  let open_slots n =
    let i = ref n in
    if !verbose_upload then
      lprintf_nl "try to allocate %d more slots" n;
    while !i > 0 do
      find_pending_slot ();
      decr i
    done in

  let slot_bw = 3072 in
  let min_upload_slots = 3 in
  let estimated_capacity = detected_uplink_capacity () in
  let estimated_capacity = if !!max_hard_upload_rate = 0 then
      estimated_capacity
    else
      (* max_hard_upload_rate lowered manually,... *)
      min estimated_capacity (!!max_hard_upload_rate * 1024) in
  if !verbose_upload then
    lprintf_nl "usage: %d(%d) capacity: %d"
      (short_delay_upload_usage ())
      (upload_usage ())
      estimated_capacity;
  let len = Intmap.length !CommonClient.uploaders in
  if len < !!max_upload_slots then begin

    (* enough free bw for another slot *)
    if short_delay_upload_usage () + slot_bw < estimated_capacity then begin
      if !verbose_upload then
        lprintf_nl "uplink not fully used";
      incr not_saturated_count
    end else reset_state ();

    if len < min_upload_slots then begin
      if !verbose_upload then
        lprintf_nl "too few upload slots";
      open_slots (min_upload_slots - len);
      reset_state ()
    end else if !not_saturated_count >= 2 then begin
      open_slots (min !allocation_cluster (!!max_upload_slots - len));
      incr allocation_cluster
    end
  end
(* Call counter of refill_upload_slots, cycling through 0..4. *)
let turn = ref (-1)

(* [refill_upload_slots ()] does nothing while [has_upload] is non-zero.
   Otherwise it advances [turn] and refills the slots: with dynamic_slots,
   the dynamic allocator runs on one call out of five; without it, the
   static allocator runs on every call. *)
let refill_upload_slots () =
  if !CommonGlobals.has_upload = 0 then begin
    incr turn;
    if !turn = 5 then
      turn := 0;
    if !!dynamic_slots then begin
      if !turn = 0 then
        (* call every 5s *)
        dynamic_refill_upload_slots ()
    end else
      (* call every 1s *)
      static_refill_upload_slots ()
  end
(* [consume_bandwidth len] takes [len] off the upload budget
   [streaming_left]. *)
let consume_bandwidth len =
  let remaining = !streaming_left - len in
  streaming_left := remaining
970 (**********************************************************************
972 DOWNLOAD SCHEDULER
974 ***********************************************************************)
(* Download budget spent by download_engine. *)
let download_credit = ref 0
(* Queued download requests: (function to run, length). *)
let download_fifo = Fifo.create ()

(* [download_engine ()] does nothing when no request is queued.  Otherwise
   it adds twice max_hard_download_rate to the credit (10000 is used in
   place of a rate of 0) and runs queued requests while the credit is
   positive.  Each request costs its length / 1000 + 1.  An exception
   raised while taking or running a request is ignored. *)
let download_engine () =
  if not (Fifo.empty download_fifo) then begin
      let credit = !!max_hard_download_rate in
      let credit = 2 * (if credit = 0 then 10000 else credit) in
      download_credit := !download_credit + credit;
      let rec iter () =
        if !download_credit > 0 && not (Fifo.empty download_fifo) then
          begin
            (try
                let (f, len) = Fifo.take download_fifo in
                download_credit := !download_credit - (len / 1000 + 1);
                f ()
              with _ -> ());
            iter ()
          end
      in
      iter ()
    end
(* [queue_download_request f len] runs [f] at once when
   max_hard_download_rate is 0; otherwise the request is queued, with its
   length, for download_engine. *)
let queue_download_request f len =
  match !!max_hard_download_rate with
  | 0 -> f ()
  | _ -> Fifo.put download_fifo (f, len)
(* timer started every 1/10 seconds *)
(* Runs the download engine, then the upload scheduler.  An exception in
   either one is logged and does not prevent the other from running. *)
let upload_download_timer () =
  (try download_engine ()
    with e ->
        lprintf_nl "Exception %s in download_engine" (Printexc2.to_string e)
  );
  (try next_uploads ()
    with e -> lprintf_nl "exc %s in upload" (Printexc2.to_string e))
(* [words_of_filename file_name] returns the keywords of [file_name]: the
   words given by String2.stem, keeping the listed file extensions and the
   words of at least 5 bytes, in their original order.  When fewer than
   two keywords remain, this is logged and the result is [file_name]
   alone. *)
let words_of_filename =
  let extension_list = [
      "mp3" ; "avi" ; "jpg" ; "jpeg" ; "txt" ; "mov" ; "mpg" ; "ogm"
    ]
  in
  let rec remove_short list list2 =
    match list with
      [] -> List.rev list2
    | s :: list ->
        if List.mem s extension_list then
          remove_short list (s :: list2) else

        if String.length s < 5 then (* keywords should be at least 5 bytes *)
          remove_short list list2
        else
          remove_short list (s :: list2)
  in
  let get_name_keywords file_name =
    match remove_short (String2.stem file_name) [] with
      [] | [_] ->
        lprintf_nl "Not enough keywords to recover %s" file_name;
        [file_name]
    | l -> l
  in
  get_name_keywords
1040 let _ =
1041 Heap.add_memstat "CommonUploads" (fun level buf ->
1042 Printf.bprintf buf " infos_by_name: %d\n" (Hashtbl.length infos_by_name);
1043 Printf.bprintf buf " shareds_by_uid: %d\n" (Hashtbl.length shareds_by_uid);
1044 Printf.bprintf buf " shareds_by_id: %d\n" (Hashtbl.length shareds_by_id);
1045 Printf.bprintf buf " shared_files: %d\n" (Hashtbl.length shared_files);
1046 Printf.bprintf buf " pending_slots: %d\n" (Intmap.length !pending_slots_map);