fix some "deprecated" warnings
[mldonkey.git] / src / daemon / common / commonSources.ml
blobc6d3777e3ce371f5be572d881e099b55a3be5742
1 (* Copyright 2001, 2002 Simon, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 open Queues
21 open Printf2
22 open Md4
23 open Options
24 open BasicSocket
26 open TcpBufferedSocket
27 open CommonFile
28 open CommonGlobals
29 open CommonOptions
30 open CommonTypes
32 (* BUGS:
34 * From mlnet.log: not all sources in [new_sources] are ready. More
35 sources for a file than total sources in [sources_by_uid]. Probably the
36 same source entered several times inside the same queue. After a connection
37 failure, we should not only change the request_time but also the
38 request_score in case it is initial_new_source_score.
42 (* TODO:
44 * Limit total number of indirect connections
45 * Implement need_new_sources
46 * The concept of 'source' should replace the concept of 'client' for the
47 interface. The old version of source management used to keep good clients
48 so that useful information about them would not be lost.
52 (* A source might be in the following states:
54 (1) Not Connected: the source appears in the queues between
55 'new_sources_queue' and 'old_sources_queue2'.
57 (2) Connecting: 'source_last_attempt' is not zero. The source appears in the
58 queue 'connecting_sources_queue' for all files.
60 (3) Connected: The source appears either in the queue
61 'connected_sources_queue' or 'busy_sources_queue' for every files.
62 If 'r.request_time' is 0, it means we can query the source whenever we
63 want, but we are already too busy, so the source is in
64 'busy_sources_queue'. Otherwise, we are not ready to query, and the source
65 has to be in 'connected_sources_queue'.
70 type request_result =
71 | File_possible (* we asked, but didn't know *)
72 | File_not_found (* we asked, the file is not there *)
73 (* | File_expected we asked, because it was announced *)
74 | File_new_source (* we never asked, but we should *)
75 | File_found (* the file was found *)
76 | File_chunk (* the file has chunks we want *)
77 | File_upload (* we uploaded from this client *)
78 (* | File_unknown We don't know anything *)
80 let not_found_score = -5
81 let possible_score = -3
82 let new_source_score = 1 (* after the first request *)
83 let found_score = 3
84 let chunk_score = 5
85 let upload_score = 7
86 let initial_new_source_score = 10 (* before the first request *)
88 let outside_queue = -1
89 let new_sources_queue = 0
90 let good_sources_queue = 1
91 let ready_saved_sources_queue = 2
92 let waiting_saved_sources_queue = 3
93 let old_sources1_queue = 4
94 let old_sources2_queue = 5
95 let old_sources3_queue = 6
96 let do_not_try_queue = 7
97 let connected_sources_queue = 8
98 let connecting_sources_queue = 9
99 let busy_sources_queue = 10
101 let queue_name = [|
102 "new_sources";
103 "good_sources";
104 "ready_saved_sources";
105 "waiting_saved_sources";
106 "old_sources1";
107 "old_sources2";
108 "old_sources3";
109 "do_not_try_queue";
110 "connected_sources";
111 "connecting_sources";
112 "busy_sources";
116 let nqueues = Array.length queue_name
118 let queue_period = Array.make nqueues 600
120 let () =
121 queue_period.(new_sources_queue) <- 0;
122 queue_period.(connected_sources_queue) <- 0;
123 queue_period.(connecting_sources_queue) <- 0;
124 queue_period.(busy_sources_queue) <- 0;
125 queue_period.(good_sources_queue) <- 0;
126 queue_period.(ready_saved_sources_queue) <- 0;
127 queue_period.(waiting_saved_sources_queue) <- 0;
128 queue_period.(old_sources1_queue) <- 0;
129 queue_period.(old_sources2_queue) <- 0;
130 queue_period.(old_sources3_queue) <- 0
132 module Make(M:
135 (*************************************************************************)
136 (*************************************************************************)
137 (*************************************************************************)
138 (* *)
139 (* FUNCTOR Argument *)
140 (* *)
141 (*************************************************************************)
142 (*************************************************************************)
143 (*************************************************************************)
147 val module_name : string
149 type source_uid
150 val dummy_source_uid : source_uid
151 val source_uid_to_value: source_uid -> Options.option_value
152 val value_to_source_uid: Options.option_value -> source_uid
154 type source_brand
155 val dummy_source_brand : source_brand
156 val source_brand_to_value: source_brand -> Options.option_value
157 val value_to_source_brand: Options.option_value -> source_brand
159 val direct_source : source_uid -> bool
160 val indirect_source : source_uid -> bool
161 end) =
162 (struct
165 (*************************************************************************)
166 (*************************************************************************)
167 (*************************************************************************)
168 (* *)
169 (* FUNCTOR Body *)
170 (* *)
171 (*************************************************************************)
172 (*************************************************************************)
173 (*************************************************************************)
175 (*************************************************************************)
176 (* *)
177 (* Types *)
178 (* *)
179 (*************************************************************************)
181 type source = {
182 source_uid : M.source_uid;
183 mutable source_files : file_request list;
185 (* the 'source_score' increases with failures in connections *)
186 mutable source_score : int;
188 (* the 'source_num' that should be used to create the client corresponding to
189 this source *)
190 mutable source_num : int;
192 (* the 'source_age' is the time of the last successful connection *)
193 mutable source_age : int;
195 (* the 'source_connecting' indicates that this source is currently in the
196 process of being connected. *)
197 mutable source_last_attempt : int;
198 mutable source_sock : tcp_connection;
200 mutable source_brand : M.source_brand;
201 mutable source_country_code : int option;
204 and file_request = {
205 request_file : file_sources_manager;
206 mutable request_queue : int;
207 mutable request_time : int;
208 mutable request_score : int;
211 and file_sources_manager = {
212 manager_uid : string;
213 mutable manager_sources : source Queues.Queue.t array;
214 mutable manager_active_sources : int;
215 mutable manager_all_sources : int;
216 mutable manager_file : (unit -> file);
219 and functions = {
220 mutable function_connect: (M.source_uid -> int option -> unit);
221 mutable function_query: (M.source_uid -> string -> unit);
223 mutable function_string_to_manager: (string -> file_sources_manager);
225 mutable function_max_connections_per_second : (unit -> int);
226 mutable function_max_sources_per_file : (unit -> int);
228 mutable function_add_location :
229 (M.source_uid -> string -> int option -> unit);
230 mutable function_remove_location :
231 (M.source_uid -> string -> unit);
234 (*************************************************************************)
235 (* *)
236 (* Modules *)
237 (* *)
238 (*************************************************************************)
240 module HS = Weak.Make(struct
241 type t = source
242 let hash s = Hashtbl.hash s.source_uid
243 let equal x y = x.source_uid = y.source_uid
244 end)
246 module H = Weak.Make(struct
247 type t = source
248 let hash s = Hashtbl.hash s.source_num
249 let equal x y = x.source_num = y.source_num
250 end)
252 module SourcesQueueCreate = Queues.Make(struct
253 type t = source
254 let compare s1 s2 = compare s1.source_uid s2.source_uid
255 end)
257 (*************************************************************************)
258 (* *)
259 (* Global variables *)
260 (* *)
261 (*************************************************************************)
263 let dummy_source = {
264 source_uid = M.dummy_source_uid;
265 source_files = [];
267 source_num = 0;
268 source_score = 0;
269 source_age = 0;
270 source_last_attempt = 0;
271 source_sock = NoConnection;
273 source_brand = M.dummy_source_brand;
274 source_country_code = None;
277 let last_refill = ref 0
279 let not_implemented s _ =
280 failwith (Printf.sprintf "CommonSources.%s not implemented" s)
282 let functions = {
283 function_connect = not_implemented "function_connect";
284 function_query = not_implemented "function_query";
285 function_string_to_manager = not_implemented
286 "function_string_to_manager";
288 function_max_connections_per_second = (fun _ ->
289 !!max_connections_per_second);
290 function_max_sources_per_file = (fun _ -> 10);
292 function_add_location = not_implemented "function_add_location";
293 function_remove_location = not_implemented "function_remove_location";
296 let indirect_connections = ref 0
298 (*************************************************************************)
299 (* *)
300 (* Global tables *)
301 (* *)
302 (*************************************************************************)
304 let sources_by_uid = HS.create 13557
305 let sources_by_num = H.create 13557
307 let file_sources_managers = ref []
309 let connecting_sources = Fifo.create ()
311 let next_direct_sources = Fifo.create ()
312 let next_indirect_sources = ref []
315 let active_queue q =
316 q >= connected_sources_queue && q <= busy_sources_queue
318 (*************************************************************************)
319 (* *)
320 (* request_score *)
321 (* *)
322 (*************************************************************************)
324 let request_score r = r.request_score
326 let set_score_part r score =
327 r.request_score <- score
330 (*************************************************************************)
331 (* *)
332 (* other helper functions *)
333 (* *)
334 (*************************************************************************)
336 let rec find_throttled_queue queue =
337 if queue_period.(queue) > 0 || queue = old_sources3_queue then
338 queue
339 else
340 find_throttled_queue (queue + 1)
342 let get_throttle_delay m q throttled =
343 if throttled then
344 (max 0
345 (queue_period.(q)
346 - (file_priority (m.manager_file ()))
347 + Queue.length m.manager_sources.(connected_sources_queue))
349 else 0
352 * determine the number of (throttled) ready sources for a manager queue
355 (* I know it's evil to break out of an X.iter using an exception...
356 But that function really needs to be fast.
357 Also, this works because Queues are based on Sets, and that Set.iter
358 gives elements in increasing keys order *)
359 exception BreakOutOfLoop
361 let count_file_ready_sources m q throttled =
362 let ready_count = ref 0 in
363 let throttle_delay = get_throttle_delay m q throttled in
364 let ready_threshold =
365 last_time () - !!min_reask_delay - throttle_delay in
366 (try
367 Queue.iter
368 (fun (time, s) ->
369 if time >= ready_threshold then raise BreakOutOfLoop;
370 incr ready_count
371 ) m.manager_sources.(q)
372 with BreakOutOfLoop -> ());
373 !ready_count
376 * determine the total number of ready sources for all downloading files per queue
378 let count_ready_sources queue throttled =
379 List.fold_left (fun ready_count m ->
380 let f = m.manager_file () in
381 if file_state f = FileDownloading then
382 ready_count + count_file_ready_sources m queue throttled
383 else ready_count
384 ) 0 !file_sources_managers
387 let rec find_max_overloaded q managers =
388 let _, remaining_managers =
389 List.fold_left (fun ((current_max, remaining_managers) as acc) m ->
390 let ready_sources = count_file_ready_sources m q true in
391 if ready_sources > current_max then
392 (ready_sources, [m])
393 else if ready_sources = current_max then
394 (current_max, m :: remaining_managers)
395 else acc
396 ) (-1, []) managers in
397 remaining_managers
400 (*************************************************************************)
401 (* *)
402 (* print_source *)
403 (* *)
404 (*************************************************************************)
406 let print_source buf s =
407 Printf.bprintf buf "Source %d:\n" s.source_num;
408 Printf.bprintf buf " score: %d\n" s.source_score;
409 if s.source_age <> 0 then
410 Printf.bprintf buf " age: %d\n" s.source_age;
411 if s.source_last_attempt <> 0 then
412 Printf.bprintf buf " last_attemps: %d" s.source_last_attempt;
413 List.iter (fun r ->
414 Printf.bprintf buf " File %s\n"
415 (file_best_name (r.request_file.manager_file ()));
416 Printf.bprintf buf " Score: %d\n" r.request_score;
417 if r.request_time <> 0 then
418 Printf.bprintf buf " Time: %d\n" r.request_time;
419 ) s.source_files
424 * need_new_sources
428 let need_new_sources file =
429 let ready_threshold = last_time () - !!min_reask_delay in
430 let ready_count = ref 0 in
431 for i = good_sources_queue to old_sources1_queue do
432 let lookin = file.manager_sources.(i) in
434 Queue.iter (fun (time, s) ->
435 if time >= ready_threshold then raise BreakOutOfLoop;
436 incr ready_count
437 ) lookin
438 with BreakOutOfLoop -> ()
439 done;
440 (* let work_count = !ready_count +
441 (Queue.length ( file.manager_sources.( new_sources_queue ) )) +
442 (Queue.length ( file.manager_sources.( connected_sources_queue ) ))
443 in *)
444 let f = file.manager_file () in
445 (* lprintf "commonSources: need_new_source: ready= %d new= %d con= %d prio= %d %s\n"
446 !readyCount
447 (Queue.length ( file.manager_sources.( new_sources_queue ) ) )
448 (Queue.length ( file.manager_sources.( connected_sources_queue ) ) )
449 (file_priority f)
450 (if (file_priority f) + 20 > workCount then "we need" else "have enough");
452 (* (file_priority f) + 20 > work_count *)
453 (* let max_s = functions.function_max_sources_per_file () in
454 (file_priority f)*(max_s/20) + max_s > !all_ready_s + new_s *)
455 (file_priority f) + 20 > !ready_count
458 (*************************************************************************)
459 (* *)
460 (* print *)
461 (* *)
462 (*************************************************************************)
465 let print buf output_type =
466 let pos_to_string v =
467 if v > 0 then string_of_int v else "-" in
469 html_mods_cntr_init ();
470 let mycntr = ref 1 in
472 let html_tr () =
473 mycntr := html_mods_cntr ();
474 Printf.bprintf buf "\\<tr class=\\\"dl-%d\\\"\\>" !mycntr in
476 let html_tr_same () =
477 Printf.bprintf buf "\\<tr class=\\\"dl-%d\\\"\\>" !mycntr in
479 (* Header *)
480 if output_type = HTML then
481 let header = Printf.sprintf "File sources per manager queue (%d)"
482 (List.length !file_sources_managers) in
484 Printf.bprintf buf "\\<div class=results\\>";
485 html_mods_table_header buf "sourcesTable" "sources" [];
486 Printf.bprintf buf "\\<tr\\>";
487 html_mods_td buf [
488 ("", "srh", "Statistics on sources ");
489 ("", "srh", "@ " ^ log_time ());
490 ("", "srh", header); ];
491 Printf.bprintf buf "\\</tr\\>\\</table\\>\\</div\\>\n";
493 html_mods_table_header buf "sourcesTable" "sources" [
494 ( Str, "srh br", "New sources",
495 Printf.sprintf "New(%d)" new_sources_queue );
496 ( Str, "srh br", "Good sources",
497 Printf.sprintf "Good(%d)" good_sources_queue );
498 ( Str, "srh br", "Ready saved sources",
499 Printf.sprintf "Ready(%d)" ready_saved_sources_queue);
500 ( Str, "srh br", "Waiting saved sources",
501 Printf.sprintf "Wait(%d)" waiting_saved_sources_queue);
502 ( Str, "srh br", "Old sources 1",
503 Printf.sprintf "Old1(%d)" old_sources1_queue );
504 ( Str, "srh br", "Old sources 2",
505 Printf.sprintf "Old2(%d)" old_sources2_queue );
506 ( Str, "srh br", "Old sources 3",
507 Printf.sprintf "Old3(%d)" old_sources3_queue );
508 ( Str, "srh br", "Do not try sources",
509 Printf.sprintf "nTry(%d)" do_not_try_queue );
510 ( Str, "srh br", "Connected sources",
511 Printf.sprintf "Conn(%d)" connected_sources_queue );
512 ( Str, "srh br", "Connecting sources",
513 Printf.sprintf "Cing(%d)" connecting_sources_queue );
514 ( Str, "srh br", "Busy sources",
515 Printf.sprintf "Busy(%d)" busy_sources_queue );
516 ( Str, "srh br", "Total sources", "All" );
517 ( Str, "srh br", "Filename", "Name" ); ];
518 else begin
519 Printf.bprintf buf "Statistics on sources: time %d\n" (last_time ());
520 Printf.bprintf buf "File sources per manager queue(%d):\n"
521 (List.length !file_sources_managers);
522 Printf.bprintf buf "new good redy wait old1 old2 old3 ntry conn cing busy all\n";
523 (* "9999 9999 9999 9999 9999 9999 9999 9999 9999 9999 9999 9999"
524 11*5 chars
525 one row each: all,indirect,ready *)
526 end;
528 let list_sum = List.fold_left (+) 0 in
530 let nsources_per_queue = Array.make nqueues 0 in
531 let nready_per_queue = Array.make nqueues 0 in
532 let nindirect_per_queue = Array.make nqueues 0 in
533 let ninvalid_per_queue = Array.make nqueues 0 in
534 let nall = ref 0 in
535 let naact = ref 0 in
536 let naneed = ref 0 in
537 let downloading_managers =
538 List.filter (fun m ->
539 file_state (m.manager_file ()) = FileDownloading
540 ) !file_sources_managers in
541 let my_file_sources_managers =
542 List.sort (fun f1 f2 ->
543 let best_name1 = file_best_name (f1.manager_file ()) in
544 let best_name2 = file_best_name (f2.manager_file ()) in
545 String.compare best_name1 best_name2
546 ) downloading_managers in
547 (* Files *)
548 let ready_threshold = last_time () - !!min_reask_delay in
549 List.iter (fun m ->
550 let name = file_best_name (m.manager_file ()) in
551 let need_sources = need_new_sources m in
552 if need_sources then incr naneed;
554 if m.manager_all_sources <> 0 then begin
555 let slist = ref [] in
556 let sreadylist = ref [] in
557 let streadylist = ref [] in
558 let sindirectlist = ref [] in
559 let sinvalidlist = ref [] in
560 (* Queues *)
561 Array.iteri (fun i q ->
562 let nready = ref 0 in
563 let ntready = count_file_ready_sources m i true in
564 let nindirect = ref 0 in
565 let ninvalid = ref 0 in
566 let nsources = ref 0 in
567 (* Sources *)
568 Queue.iter (fun (time, s) ->
569 incr nsources;
570 if M.indirect_source s.source_uid then incr nindirect
571 else if not (M.direct_source s.source_uid) then incr ninvalid;
572 if time < ready_threshold then incr nready
573 else if i = new_sources_queue then begin
574 Printf.bprintf buf "ERROR: Source is not ready in new_sources_queue !\n";
575 print_source buf s
577 ) q;
579 slist := Queue.length q :: !slist;
580 sreadylist := !nready :: !sreadylist;
581 streadylist := ntready :: !streadylist;
582 sindirectlist := !nindirect :: !sindirectlist;
583 sinvalidlist := !ninvalid :: !sinvalidlist;
585 nready_per_queue.(i) <- nready_per_queue.(i) + !nready;
586 nindirect_per_queue.(i) <- nindirect_per_queue.(i) + !nindirect;
587 ninvalid_per_queue.(i) <- ninvalid_per_queue.(i) + !ninvalid;
588 nsources_per_queue.(i) <- nsources_per_queue.(i) + !nsources;
589 ) m.manager_sources; (* end Queues *)
591 let slist = List.rev !slist in
592 let sreadylist = List.rev !sreadylist in
593 let streadylist = List.rev !streadylist in
594 let sindirectlist = List.rev !sindirectlist in
595 let sinvalidlist = List.rev !sinvalidlist in
597 if output_type = HTML then begin
598 html_tr ();
599 html_mods_td buf (
600 (List.map (fun qlength ->
601 ("", "sr ar br", pos_to_string qlength)) slist) @
602 [ ("", "sr ar br", string_of_int m.manager_all_sources);
603 ("Filename", "sr", shorten name !!max_name_len); ] );
605 Printf.bprintf buf "\\</tr\\>\n";
607 html_tr_same ();
608 html_mods_td buf (
609 (List.map (fun sready ->
610 ("", "sr ar br", pos_to_string sready)) sreadylist) @
611 [ ("", "sr ar br", Printf.sprintf "%d" (list_sum sreadylist));
612 ("", "sr", Printf.sprintf "ready with %d active%s"
613 m.manager_active_sources
614 (if need_sources then " and needs sources"
615 else "")) ] );
616 Printf.bprintf buf "\\</tr\\>\n";
618 html_tr_same ();
619 html_mods_td buf (
620 (List.map (fun sready ->
621 ("", "sr ar br", pos_to_string sready)) streadylist) @
622 [ ("", "sr ar br", string_of_int (list_sum streadylist));
623 ("", "sr", "throttled ready"); ] );
624 Printf.bprintf buf "\\</tr\\>\n";
626 let anindirect = list_sum sindirectlist in
627 if anindirect <> 0 then begin
628 html_tr_same ();
629 html_mods_td buf (
630 (List.map (fun sready ->
631 ("", "sr ar br", pos_to_string sready)) sindirectlist) @
632 [ ("", "sr ar br", string_of_int anindirect);
633 ("", "sr", "indirect"); ] );
634 Printf.bprintf buf "\\</tr\\>\n";
635 end;
637 let aninvalid = list_sum sinvalidlist in
638 if aninvalid <> 0 then begin
639 html_tr_same ();
640 html_mods_td buf (
641 (List.map (fun sready ->
642 ("", "sr ar br", pos_to_string sready)) sinvalidlist) @
643 [ ("", "sr ar br", string_of_int aninvalid);
644 ("", "sr", "invalid"); ] );
645 Printf.bprintf buf "\\</tr\\>\n";
646 end;
648 else begin
649 List.iter (Printf.bprintf buf "%4d ") slist;
650 Printf.bprintf buf "%4d %s\n" m.manager_all_sources name;
651 List.iter (Printf.bprintf buf "%4d ") sreadylist;
652 Printf.bprintf buf "%4d ready %d active%s\n"
653 (list_sum sreadylist) m.manager_active_sources
654 (if need_sources then " needs sources"
655 else "");
656 List.iter (Printf.bprintf buf "%4d ") streadylist;
657 Printf.bprintf buf "%4d throttled ready\n"
658 (list_sum streadylist);
659 let anindirect = list_sum sindirectlist in
660 if anindirect <> 0 then begin
661 List.iter (Printf.bprintf buf "%4d ") sindirectlist;
662 Printf.bprintf buf "%4d indirect\n" anindirect;
663 end;
664 let aninvalid = list_sum sinvalidlist in
665 if aninvalid <> 0 then begin
666 List.iter (Printf.bprintf buf "%4d ") sinvalidlist;
667 Printf.bprintf buf "%4d invalid\n" aninvalid;
669 end;
671 nall := !nall + m.manager_all_sources;
672 naact := !naact + m.manager_active_sources;
674 else begin (* m.manager_all_sources = 0 *)
675 if output_type = HTML then begin
676 html_tr ();
678 html_mods_td buf [
679 ("", "sr ar br", "-"); ("", "sr ar br", "");
680 ("", "sr ar br", ""); ("", "sr ar br", "");
681 ("", "sr ar br", ""); ("", "sr ar br", "");
682 ("", "sr ar br", ""); ("", "sr ar br", "");
683 ("", "sr ar br", ""); ("", "sr ar br", "");
684 ("", "sr ar br", ""); ("", "sr ar br", "");
685 ("", "sr br", shorten name !!max_name_len); ];
686 Printf.bprintf buf "\\</tr\\>\n";
688 else Printf.bprintf buf "None %55s%s\n" "" name;
690 ) my_file_sources_managers; (* end Files *)
692 (* next Header *)
693 if output_type = HTML then begin
694 Printf.bprintf buf "\\</table\\>\\</div\\>\n";
696 html_mods_table_header buf "sourcesTable" "sources" [
697 ( Str, "srh", "New sources", "New" );
698 ( Str, "srh", "Good sources", "Good" );
699 ( Str, "srh", "Ready sources", "Ready" );
700 ( Str, "srh", "Waiting sources", "Wait" );
701 ( Str, "srh", "Old sources 1", "Old1" );
702 ( Str, "srh", "Old sources 2", "Old2" );
703 ( Str, "srh", "Old sources 3", "Old3" );
704 ( Str, "srh", "Do not try", "nTry" );
705 ( Str, "srh", "Connected sources", "Conn" );
706 ( Str, "srh", "Connecting sources", "Cing" );
707 ( Str, "srh", "Busy sources", "Busy" );
708 ( Str, "srh", "Total sources", "All" );
709 ( Str, "srh", "Type", "Type" ); ];
712 else
713 Printf.bprintf buf "new good redy wait old1 old2 old3 ntry conn cing busy all\n";
715 let slist = ref [] in
716 let sreadylist = ref [] in
717 let streadylist = ref [] in
718 let sindirectlist = ref [] in
719 let sinvalidlist = ref [] in
720 let speriodlist = ref [] in
721 (* Queues *)
722 for i = 0 to nqueues - 1 do
723 slist := nsources_per_queue.(i) :: !slist;
724 sreadylist := nready_per_queue.(i) :: !sreadylist;
725 streadylist := count_ready_sources i true :: !streadylist;
726 sindirectlist := nindirect_per_queue.(i) :: !sindirectlist;
727 sinvalidlist := ninvalid_per_queue.(i) :: !sinvalidlist;
728 speriodlist := queue_period.(i) :: !speriodlist;
729 done; (* end Queues *)
731 let nsources = ref 0 in
732 let nroq = ref 0 in
733 HS.iter (fun s ->
734 incr nsources;
735 List.iter (fun r ->
736 if r.request_queue = outside_queue then
737 incr nroq;
738 ) s.source_files;
739 ) sources_by_uid;
741 let slist = List.rev !slist in
742 let sreadylist = List.rev !sreadylist in
743 let streadylist = List.rev !streadylist in
744 let sindirectlist = List.rev !sindirectlist in
745 let sinvalidlist = List.rev !sinvalidlist in
746 let speriodlist = List.rev !speriodlist in
748 if output_type = HTML then begin
749 html_tr ();
750 html_mods_td buf (
751 (List.map (fun q ->
752 ("", "sr ar", pos_to_string q)) slist) @
753 [ ("", "sr ar", Printf.sprintf "%d" !nall);
754 ("", "sr",
755 Printf.sprintf "all source managers (%d by UID) (%d ROQ)"
756 !nsources !nroq);] );
757 Printf.bprintf buf "\\</tr\\>\n";
759 html_tr ();
760 html_mods_td buf (
761 (List.map (fun sready ->
762 ("", "sr ar", pos_to_string sready)) sreadylist) @
763 [ ("", "sr ar", Printf.sprintf "%d" (list_sum sreadylist));
764 ("", "sr",
765 Printf.sprintf "ready with %d active and %i need sources"
766 !naact !naneed); ] );
767 Printf.bprintf buf "\\</tr\\>\n";
769 html_tr ();
770 html_mods_td buf (
771 (List.map (fun sready ->
772 ("", "sr ar", pos_to_string sready)) streadylist) @
773 [ ("", "sr ar", Printf.sprintf "%d" (list_sum streadylist));
774 ("", "sr", "throttled ready"); ] );
775 Printf.bprintf buf "\\</tr\\>\n";
777 let anindirect = list_sum sindirectlist in
778 if anindirect <> 0 then begin
779 html_tr ();
780 html_mods_td buf (
781 (List.map (fun sready ->
782 ("", "sr ar", pos_to_string sready)) sindirectlist) @
783 [ ("", "sr ar", Printf.sprintf "%d" anindirect);
784 ("", "sr", "indirect"); ] );
785 Printf.bprintf buf "\\</tr\\>\n";
786 end;
788 let aninvalid = list_sum sinvalidlist in
789 if aninvalid <> 0 then begin
790 html_tr ();
791 html_mods_td buf (
792 (List.map (fun sready ->
793 ("", "sr ar", pos_to_string sready)) sinvalidlist) @
794 [ ("", "sr ar", Printf.sprintf "%d" aninvalid);
795 ("", "sr", "invalid"); ] );
796 Printf.bprintf buf "\\</tr\\>\n";
797 end;
799 html_tr ();
800 html_mods_td buf (
801 (List.map (fun sready ->
802 ("", "sr ar", pos_to_string sready)) speriodlist) @
803 [ ("", "sr", "");
804 ("", "sr", "period"); ] );
805 Printf.bprintf buf "\\</tr\\>\n";
807 Printf.bprintf buf "\\</table\\>\\</div\\>\n";
809 else begin
810 List.iter (Printf.bprintf buf "%4d ") slist;
811 Printf.bprintf buf "%4d all source managers (%d by UID) (%d ROQ)\n"
812 !nall !nsources !nroq;
813 List.iter (Printf.bprintf buf "%4d ") sreadylist;
814 Printf.bprintf buf "%4d ready %d active %i need sources\n"
815 (list_sum sreadylist) !naact !naneed;
816 List.iter (Printf.bprintf buf "%4d ") streadylist;
817 Printf.bprintf buf "%4d throttled ready\n" (list_sum streadylist);
818 let anindirect = list_sum sindirectlist in
819 if anindirect <> 0 then begin
820 List.iter (Printf.bprintf buf "%4d ") sindirectlist;
821 Printf.bprintf buf "%4d indirect\n" anindirect;
822 end;
823 let aninvalid = list_sum sinvalidlist in
824 if aninvalid <> 0 then begin
825 List.iter (Printf.bprintf buf "%4d ") sinvalidlist;
826 Printf.bprintf buf "%4d invalid\n" aninvalid;
827 end;
828 List.iter (Printf.bprintf buf "%4d ") speriodlist;
829 Printf.bprintf buf " period\n";
830 end;
832 let nconnected = ref 0 in
833 Fifo.iter (fun (_, s) ->
834 if s.source_last_attempt = 0 then incr nconnected;
835 ) connecting_sources;
836 if output_type = HTML then begin
837 html_mods_table_header buf "sourcesTable" "sources" [
838 ( Str, "srh", "Connecting sources", "Connecting sources" );
839 ( Str, "srh", "Next direct sources", "Next direct sources" );
840 ( Str, "srh", "Next indirect sources", "Next indirect sources" ); ];
841 Printf.bprintf buf "\\<tr class=\\\"dl-1\\\"\\>";
842 html_mods_td buf [
843 ("", "sr", (Printf.sprintf "%d entries"
844 (Fifo.length connecting_sources)) ^
845 (if !nconnected > 0 then
846 Printf.sprintf " (connected: %d)" !nconnected else ""));
847 ("", "sr", Printf.sprintf "%d entries"
848 (Fifo.length next_direct_sources));
849 ("", "sr", Printf.sprintf "%d entries"
850 (List.length !next_indirect_sources)); ];
851 Printf.bprintf buf "\\</tr\\>\\</table\\>\\</div\\>\n\\</div\\>"
853 else begin
854 Printf.bprintf buf "Connecting Sources: %d entries"
855 (Fifo.length connecting_sources);
856 if !nconnected > 0 then
857 Printf.bprintf buf " (connected: %d)" !nconnected;
858 Printf.bprintf buf "\n";
859 Printf.bprintf buf "Next Direct Sources: %d entries\n"
860 (Fifo.length next_direct_sources);
861 Printf.bprintf buf "Next Indirect Sources: %d entries\n"
862 (List.length !next_indirect_sources)
866 (*************************************************************************)
867 (* *)
868 (* reschedule_source_for_file *)
869 (* *)
870 (*************************************************************************)
872 let reschedule_source_for_file saved s r =
873 if r.request_queue = outside_queue then
874 let queue =
875 if r.request_score = not_found_score then do_not_try_queue
876 else if s.source_last_attempt <> 0 then connecting_sources_queue
877 else
878 match s.source_sock with
879 | (NoConnection | ConnectionWaiting _) ->
880 (* State (1) *)
881 (* Two things matter: the global score and the local score *)
882 if s.source_score < 1 then
883 (* 2.5.25, replaced expected_score by
884 found_score, so that sources which
885 only have the file are not put in
886 good_sources_queue, unless they have
887 an interesting chunk AND not a bad
888 rank. *)
889 if r.request_score > found_score then
890 if saved then
891 if r.request_time + !!min_reask_delay < last_time () then
892 ready_saved_sources_queue
893 else waiting_saved_sources_queue
894 else if r.request_score = initial_new_source_score then
895 new_sources_queue
896 else good_sources_queue
897 else if r.request_score >= new_source_score then
898 old_sources1_queue
899 else old_sources2_queue
900 else if s.source_score < 5 then old_sources3_queue
901 else do_not_try_queue
903 | Connection _ ->
904 (* State (3) *)
905 if r.request_time = 0 then busy_sources_queue
906 else connected_sources_queue
908 let m = r.request_file in
909 if !verbose_sources > 1 then
910 lprintf_nl "[cSrc] Put source %d in queue %s"
911 s.source_num queue_name.(queue);
912 Queue.put m.manager_sources.(queue) (r.request_time, s);
913 if active_queue queue then
914 m.manager_active_sources <- m.manager_active_sources + 1;
915 m.manager_all_sources <- m.manager_all_sources + 1;
916 r.request_queue <- queue
918 (*************************************************************************)
919 (* *)
920 (* iter_all_sources *)
921 (* *)
922 (*************************************************************************)
924 let iter_all_sources f m =
925 Array.iter (fun q ->
926 Queue.iter (fun (_, s) -> f s) q
927 ) m.manager_sources
929 (*************************************************************************)
930 (* iter_qualified_sources *)
931 (* Only these sources should be used in sourceexchage *)
932 (*************************************************************************)
933 let iter_qualified_sources f m =
934 let q = m.manager_sources.(good_sources_queue) in
935 Queue.iter (fun (_, s) -> f s) q
937 (*************************************************************************)
938 (* *)
939 (* iter_active_sources *)
940 (* *)
941 (*************************************************************************)
943 let iter_active_sources f m =
944 for i = connected_sources_queue to busy_sources_queue do
945 let q = m.manager_sources.(i) in
946 Queue.iter (fun (_, s) -> f s) q
947 done
949 (*************************************************************************)
950 (* *)
951 (* iter_relevant_sources *)
952 (* *)
953 (*************************************************************************)
954 let iter_relevant_sources f m =
955 List.iter (fun i ->
956 if i < nqueues then
957 let q = m.manager_sources.(i) in
958 Queue.iter (fun (_, s) -> f s) q
959 ) !!relevant_queues
961 (*************************************************************************)
962 (* *)
963 (* set_source_brand *)
964 (* *)
965 (*************************************************************************)
967 let set_source_brand s brand =
968 s.source_brand <- brand
970 (*************************************************************************)
971 (* *)
972 (* source_brand *)
973 (* *)
974 (*************************************************************************)
976 let source_brand s = s.source_brand
978 (*************************************************************************)
979 (* *)
980 (* remove_from_queue *)
981 (* *)
982 (*************************************************************************)
984 let remove_from_queue s r =
985 if r.request_queue <> outside_queue then begin
986 if !verbose_sources > 1 then
987 lprintf_nl "[cSrc] Remove source %d from queue %s" s.source_num
988 queue_name.(r.request_queue);
990 let m = r.request_file in
991 if active_queue r.request_queue then
992 m.manager_active_sources <- m.manager_active_sources - 1;
993 Queue.remove r.request_file.manager_sources.(r.request_queue)
994 (r.request_time, s);
995 r.request_queue <- outside_queue;
996 m.manager_all_sources <- m.manager_all_sources - 1
999 (*************************************************************************)
1000 (* *)
1001 (* source_connecting *)
1002 (* *)
1003 (*************************************************************************)
1005 (* From state (1) to state (2) *)
1006 let source_connecting s =
1007 s.source_last_attempt <- last_time ();
1008 Fifo.put connecting_sources (s.source_last_attempt, s);
1009 List.iter (fun r ->
1010 if r.request_queue <> outside_queue then begin
1011 remove_from_queue s r;
1012 reschedule_source_for_file false s r;
1014 ) s.source_files
1017 (*************************************************************************)
1018 (* *)
1019 (* source_query *)
1020 (* *)
1021 (*************************************************************************)
1023 let source_query s r =
1024 remove_from_queue s r;
1025 if r.request_score > not_found_score then
1026 (* query_files will query all files for a source, check that we are
1027 really downloading! example source s has file f1 and file f2,
1028 file f2 is paused we connect because of f1 and then query both
1029 files f1 and f2 ... and yes, we do a cleanup ... but a timed one,
1030 so we can't be sure *)
1031 if r.request_score > not_found_score &&
1032 file_state (r.request_file.manager_file ()) = FileDownloading
1033 then begin
1034 r.request_time <- 0; (* The source is ready for this request *)
1035 reschedule_source_for_file false s r; (* put it in busy_sources_queue *)
1036 (try
1037 functions.function_query s.source_uid r.request_file.manager_uid
1038 with e ->
1039 lprintf_nl "[cSrc] Exception %s in functions.function_query" (Printexc2.to_string e)
1043 (*************************************************************************)
1044 (* *)
1045 (* source_connected *)
1046 (* *)
1047 (*************************************************************************)
1049 (* From state (2) to state (3) *)
1050 let source_connected s =
1051 s.source_score <- 0;
1052 s.source_age <- last_time ();
1053 s.source_last_attempt <- 0;
1054 List.iter (fun r ->
1055 (* lprintf "SOURCE> request: "; *)
1056 if r.request_queue <> outside_queue then begin
1057 (* lprintf "score %d/%d last query %s\n"
1058 r.request_score possible_score
1059 (if r.request_time = 0 then "never" else
1060 Printf.sprintf "%d secs"
1061 (last_time () - r.request_time)); *)
1062 remove_from_queue s r;
1063 if r.request_score > possible_score &&
1064 r.request_time + !!min_reask_delay < last_time () then
1065 source_query s r;
1066 (try
1067 let m = r.request_file in
1068 functions.function_add_location s.source_uid
1069 m.manager_uid s.source_country_code with _ -> ());
1070 reschedule_source_for_file false s r
1071 end
1072 (* else lprintf "outside queue\n" *)
1073 ) s.source_files
1075 (*************************************************************************)
1076 (* *)
1077 (* source_disconnected *)
1078 (* *)
1079 (*************************************************************************)
1081 (* From states (1) or (2) to state (3) *)
1082 let source_disconnected s =
1083 (match s.source_sock with
1084 | NoConnection -> ()
1085 | ConnectionWaiting token ->
1086 cancel_token token;
1087 s.source_sock <- NoConnection
1088 | Connection sock ->
1089 close sock Closed_for_timeout
1091 let connecting = s.source_last_attempt <> 0 in
1092 (* source_last_attempt set to time, on connect_reply set
1093 to zero. if we never reached connect_reply, the ip is
1094 dead. Then we think we were *not* trying to connect
1095 later on ...
1097 s.source_last_attempt <- 0;
1098 List.iter (fun r ->
1099 if r.request_queue <> outside_queue then begin
1100 remove_from_queue s r;
1101 if connecting then begin
1102 r.request_time <- last_time ();
1103 if r.request_score = initial_new_source_score then
1104 set_score_part r new_source_score
1106 else begin
1107 if r.request_time = 0 then
1108 (* we think we were not connecting,
1109 but in some cases we were! and
1110 now we imidiately reconnect for
1111 that file, on a dead IP??
1112 r.request_time <- last_time () - 600;
1113 try this instead:
1115 r.request_time <- last_time ();
1116 (try
1117 let m = r.request_file in
1118 functions.function_remove_location s.source_uid
1119 m.manager_uid
1120 with _ -> ())
1121 end;
1122 reschedule_source_for_file false s r;
1123 end;
1124 ) s.source_files
1126 (*************************************************************************)
1127 (* *)
1128 (* connect_source *)
1129 (* *)
1130 (*************************************************************************)
1132 let connect_source s =
1133 if !verbose_sources > 1 then
1134 lprintf_nl "[cSrc] connect_source";
1135 s.source_score <- s.source_score + 1;
1136 functions.function_connect s.source_uid s.source_country_code
1138 (*************************************************************************)
1139 (* *)
1140 (* create_queues *)
1141 (* *)
1142 (*************************************************************************)
1144 let create_queues () =
1145 let queues = [|
1146 (* New sources *)
1147 (* We should change this to 'oldest_last' to improve Queue.remove *)
1148 (* instead of lifo *)
1149 SourcesQueueCreate.oldest_last ();
1150 (* Good sources *)
1151 (* We should change this to 'oldest_first' to improve Queue.remove *)
1152 (* instead of fifo *)
1153 SourcesQueueCreate.oldest_first ();
1154 (* Ready saved sources *)
1155 SourcesQueueCreate.oldest_last ();
1156 (* Waiting saved sources *)
1157 SourcesQueueCreate.oldest_first ();
1158 (* Old sources *)
1159 (* We should change this to 'oldest_first' to improve Queue.remove *)
1160 (* instead of fifo *)
1161 SourcesQueueCreate.oldest_first ();
1162 SourcesQueueCreate.oldest_first ();
1163 SourcesQueueCreate.oldest_first ();
1164 (* do_not_try *)
1165 SourcesQueueCreate.oldest_first ();
1166 (* Connected Sources *)
1167 SourcesQueueCreate.oldest_first ();
1168 (* Connecting Sources *)
1169 SourcesQueueCreate.oldest_first ();
1170 (* Busy Sources *)
1171 SourcesQueueCreate.oldest_first ();
1172 |] in
1173 if Array.length queues <> Array.length queue_name then begin
1174 lprintf_nl "[cSrc] Fatal error in CommonSources.create_queues";
1175 exit 2;
1176 end;
1177 queues
1179 (*************************************************************************)
1180 (* *)
1181 (* create_file_sources_manager *)
1182 (* *)
1183 (*************************************************************************)
1185 let create_file_sources_manager file_uid =
1186 let m = {
1187 manager_uid = file_uid;
1188 manager_file = not_implemented "manager_file";
1189 manager_all_sources = 0;
1190 manager_active_sources = 0;
1191 manager_sources = create_queues ();
1192 } in
1193 file_sources_managers := m :: !file_sources_managers;
1196 (*************************************************************************)
1197 (* *)
1198 (* remove_file_sources_manager *)
1199 (* *)
1200 (*************************************************************************)
1202 let remove_file_sources_manager m =
1203 iter_all_sources (fun s ->
1204 s.source_files <-
1205 List.filter (fun r -> r.request_file != m) s.source_files;
1206 ) m;
1207 m.manager_sources <- create_queues ();
1208 file_sources_managers := List2.removeq m !file_sources_managers
1211 (*************************************************************************)
1212 (* *)
1213 (* number_of_sources *)
1214 (* *)
1215 (*************************************************************************)
1216 (* get number of sources for a file*)
1217 let number_of_sources f =
1218 f.manager_all_sources
1220 (*************************************************************************)
1221 (* *)
1222 (* create_source_by_uid *)
1223 (* *)
1224 (*************************************************************************)
1226 let create_source_by_uid uid cc =
1228 let finder = { dummy_source with source_uid = uid } in
1229 HS.find sources_by_uid finder
1231 with Not_found ->
1232 if !verbose_sources > 1 then
1233 lprintf_nl "[cSrc] Creating new source";
1234 let n = CommonClient.book_client_num () in
1235 let s = { dummy_source with
1236 source_uid = uid;
1237 source_age = 0;
1238 source_num = n;
1239 source_files = [];
1240 source_country_code = cc;
1241 } in
1242 HS.add sources_by_uid s;
1243 H.add sources_by_num s;
1246 (*************************************************************************)
1247 (* *)
1248 (* find_source_by_uid *)
1249 (* *)
1250 (*************************************************************************)
1252 let find_source_by_uid uid =
1253 let finder = { dummy_source with source_uid = uid } in
1254 HS.find sources_by_uid finder
1256 (*************************************************************************)
1257 (* *)
1258 (* find_source_by_num *)
1259 (* *)
1260 (*************************************************************************)
1262 let find_source_by_num num =
1263 let finder = { dummy_source with source_num = num } in
1264 H.find sources_by_num finder
1266 (*************************************************************************)
1267 (* *)
1268 (* find_request *)
1269 (* *)
1270 (*************************************************************************)
1272 let rec iter_has_request rs file =
1273 match rs with
1274 | [] -> raise Not_found
1275 | r :: tail ->
1276 if r.request_file == file then r
1277 else iter_has_request tail file
1279 let find_request s file =
1280 iter_has_request s.source_files file
1282 (*************************************************************************)
1283 (* *)
1284 (* find_request_result *)
1285 (* *)
1286 (*************************************************************************)
1288 let find_request_result s file =
1289 let r = find_request s file in
1290 let score = r.request_score in
1291 if score <= not_found_score then File_not_found
1292 else if score <= possible_score then File_possible
1293 else if score <= found_score then File_found
1294 else if score <= chunk_score then File_chunk
1295 else if score <= initial_new_source_score then File_new_source
1296 else assert false
1298 (*************************************************************************)
1299 (* *)
1300 (* add_request *)
1301 (* *)
1302 (*************************************************************************)
1304 let check_time time =
1305 if time = 0 then last_time () - 650
1306 else time (* changed 2.5.24 *)
1308 let add_request s file time =
1309 let r =
1311 let r = find_request s file in
1312 remove_from_queue s r;
1313 set_score_part r (if r.request_score = initial_new_source_score then
1314 new_source_score
1315 else r.request_score - 1);
1316 r.request_time <- check_time time;
1318 with Not_found ->
1319 let r = {
1320 request_file = file;
1321 request_time = check_time time;
1322 request_score = possible_score;
1323 request_queue = outside_queue;
1324 } in
1325 s.source_files <- r :: s.source_files;
1326 r in
1327 reschedule_source_for_file false s r;
1330 (*************************************************************************)
1331 (* *)
1332 (* set_request_score *)
1333 (* *)
1334 (*************************************************************************)
1336 let rec set_request_score s file score =
1338 let r = find_request s file in
1339 if (not (
1340 (* If a request has been done in the last half-hour, and the source is
1341 announced as new, just forget it. : why half-hour? - trying min_reask_delay *)
1342 score = initial_new_source_score &&
1343 r.request_time + !!min_reask_delay > last_time ()
1344 )) ||
1345 (* If a file has been paused, and resumed, it is flagged outside_queue / not_found_score in
1346 clean_sources, but really should be re-added to the queues as soon as possible (while retaining
1347 its request_time) or it is skipped for far too long (if it is even found again) - reschedule
1348 now puts new_source_score in old1 *)
1349 (score = initial_new_source_score &&
1350 r.request_queue = outside_queue) then
1351 let score =
1352 if score = initial_new_source_score
1353 then new_source_score
1354 else score in
1355 if r.request_queue < connected_sources_queue then
1356 remove_from_queue s r;
1357 set_score_part r score;
1358 reschedule_source_for_file false s r;
1359 with Not_found ->
1360 let r = {
1361 request_file = file;
1362 request_time = check_time 0;
1363 request_score = possible_score;
1364 request_queue = outside_queue;
1365 } in
1366 set_score_part r score;
1367 s.source_files <- r :: s.source_files;
1368 reschedule_source_for_file false s r
1370 (*************************************************************************)
1371 (* *)
1372 (* set_request_result *)
1373 (* *)
1374 (*************************************************************************)
1376 let set_request_result s file result =
1377 set_request_score s file
1378 (match result with
1379 | File_not_found -> not_found_score
1380 | File_found -> found_score
1381 | File_chunk -> chunk_score
1382 | File_upload -> upload_score
1383 | File_new_source -> initial_new_source_score
1384 | File_possible -> possible_score + 1)
1386 (*************************************************************************)
1387 (* *)
1388 (* source_to_value *)
1389 (* *)
1390 (*************************************************************************)
1392 let source_to_value s assocs =
1393 let requests = ref [] in
1394 List.iter (fun r ->
1395 if r.request_score > possible_score then
1396 requests :=
1397 (SmallList
1398 [once_value (string_to_value r.request_file.manager_uid);
1399 int_to_value r.request_score;
1400 int_to_value r.request_time]
1401 ) :: !requests
1402 ) s.source_files;
1403 if !requests = [] then raise Exit;
1405 ("sscore", int_to_value s.source_score ) ::
1406 ("addr", M.source_uid_to_value s.source_uid ) ::
1407 ("brand", M.source_brand_to_value s.source_brand ) ::
1408 ("files", smalllist_to_value (fun s -> s)
1409 !requests) ::
1410 ("age", int_to_value s.source_age ) ::
1411 assocs
1415 (*************************************************************************)
1416 (* *)
1417 (* query_file *)
1418 (* *)
1419 (*************************************************************************)
1421 let query_file s file =
1422 if file_state (file.manager_file ()) = FileDownloading then
1423 let r = find_request s file in
1424 if r.request_time + !!min_reask_delay <= last_time () then
1426 (* There is really no need to query a not found source again
1427 for the file ... not even after an hour! *)
1428 if r.request_score > not_found_score then
1429 source_query s r
1432 (*************************************************************************)
1433 (* *)
1434 (* query_files *)
1435 (* *)
1436 (*************************************************************************)
1437 (* Query a source for all of its known files*)
1438 let query_files s =
1439 List.iter (fun f ->
1440 query_file s f.request_file;
1441 ) s.source_files
1444 (*************************************************************************)
1445 (* *)
1446 (* add_saved_source_request *)
1447 (* *)
1448 (*************************************************************************)
1450 let add_saved_source_request s uid score time =
1451 if !verbose_sources > 1 then
1452 lprintf_nl "[cSrc] Request %s %d %d" uid score time;
1453 let file =
1455 functions.function_string_to_manager uid
1456 with e ->
1457 if !verbose_sources > 0 then
1458 lprintf_nl "[cSrc] CommonSources: add_saved_source_request -> %s not found" uid;
1459 raise e
1461 let r = add_request s file time in
1462 set_score_part r score;
1463 reschedule_source_for_file true s r;
1464 if !verbose_sources > 1 then
1465 lprintf_nl "[cSrc] Put saved source %d in queue %s" s.source_num
1466 queue_name.(r.request_queue)
1468 (*************************************************************************)
1469 (* *)
1470 (* value_to_source *)
1471 (* *)
1472 (*************************************************************************)
1474 let value_to_source assocs =
1475 (* lprintf "(1) value_to_source\n"; *)
1476 let get_value name conv = conv (List.assoc name assocs) in
1478 let addr = get_value "addr" M.value_to_source_uid in
1479 let files = get_value "files"
1480 (value_to_list (fun s -> s)) in
1482 let last_conn =
1483 try get_value "age" value_to_int with _ -> 0 in
1485 let score = try get_value "sscore" value_to_int with _ -> 0 in
1486 let brand = try get_value "brand" M.value_to_source_brand with _ ->
1487 M.dummy_source_brand in
1489 if !verbose_sources > 1 then
1490 lprintf_nl "[cSrc] New source from value";
1491 let s = create_source_by_uid addr None in
1492 s.source_score <- score;
1493 s.source_age <- last_conn;
1494 s.source_brand <- brand;
1496 (* lprintf "(2) value_to_source \n"; *)
1498 let rec iter v =
1499 match v with
1500 | OnceValue v -> iter v
1501 | List [uid; score; time] | SmallList [uid; score; time] ->
1502 (try
1503 let uid = value_to_string uid in
1504 let score = value_to_int score in
1505 let time = value_to_int time in
1507 (* added in 2.5.27 to fix a bug introduced in 2.5.25 *)
1508 let score =
1509 if score land 0xffff = 0 then score asr 16 else score in
1511 (* lprintf "(3) value_to_source \n"; *)
1513 add_saved_source_request s uid score time
1515 with e ->
1516 if !verbose_sources > 1 then
1517 lprintf_nl "[cSrc] CommonSources.value_to_source: exception %s in iter request"
1518 (Printexc2.to_string e))
1520 | (StringValue _) as uid ->
1521 (try
1522 let uid = value_to_string uid in
1523 (* lprintf "(4) value_to_source \n"; *)
1525 let score = 0 in
1526 let time = 0 in
1527 add_saved_source_request s uid score time
1529 with e ->
1530 if !verbose_sources > 1 then
1531 lprintf_nl "[cSrc] CommonSources.value_to_source: exception %s in iter request"
1532 (Printexc2.to_string e))
1534 | _ -> assert false
1537 (* lprintf "(5) value_to_source \n"; *)
1538 List.iter iter files;
1539 (* lprintf "(6) value_to_source \n"; *)
1540 raise SideEffectOption
1542 (*************************************************************************)
1543 (* *)
1544 (* refill_sources *)
1545 (* *)
1546 (*************************************************************************)
1548 let refill_sources () =
1550 (* Wait for 9 seconds before refilling, since we put at least 10 seconds
1551 of clients in the previous bucket.
1553 wrong assumption for me :
1554 we may have failed to fill the queue with what was available
1555 if !last_refill + 8 < last_time () then
1558 last_refill := last_time ();
1559 if !verbose_sources > 0 then begin
1560 lprintf_nl "[cSrc] CommonSources.refill_sources BEFORE:";
1561 let buf = Buffer.create 100 in
1562 print buf TEXT;
1563 lprintf "%s\n" (Buffer.contents buf);
1564 end;
1567 how much consecutive sources in the queue a file can have
1568 source_f1|source_f1|source_f1|source_f2...
1569 <- - - - - - - 3 - - - - - ->
1570 10 for finer priority scaling
1572 let max_consecutive = 10 in
1575 get at most nsources direct sources from a file
1576 return number of sources found,new queue position
1578 let rec get_sources nsource m queue took =
1579 (* do_not_try == avoid source bounceback, i.e. a dustbin *)
1580 if queue >= do_not_try_queue || nsource <= 0 then
1581 (* we tried all queue or found enough sources, good bye!*)
1582 took
1583 else
1584 let q = m.manager_sources.(queue) in
1585 if Queue.length q > 0 then
1586 let (request_time, s) = Queue.head q in
1587 let throttled = queue_period.(queue) > 0 && nsource > 1 in
1588 let throttle_delay = get_throttle_delay m queue throttled in
1589 if request_time + !!min_reask_delay + throttle_delay < last_time () then begin
1590 if !verbose_sources > 1 then
1591 lprintf_nl "[cSrc] Sources: take source from Queue[%s] for %s"
1592 queue_name.(queue)
1593 (file_best_name (m.manager_file ()));
1594 (* put in the connecting queue*)
1595 source_connecting s;
1596 if M.direct_source s.source_uid then begin
1597 Fifo.put next_direct_sources s;
1598 (* we found a direct source try again in the _same_ queue *)
1599 get_sources (nsource-1) m queue (took+1)
1601 else begin
1602 next_indirect_sources := s :: !next_indirect_sources;
1603 (* we found an indirect source try again in the _same_
1604 queue. indirect sources are "for free". *)
1605 get_sources nsource m queue took
1608 else begin
1609 if !verbose_sources > 1 then
1610 lprintf_nl "[cSrc] Source of queue %s is not ready for %s"
1611 queue_name.(queue) (file_best_name (m.manager_file ()));
1612 (* too early to take sources in this queue try again in the _next_ queue*)
1613 if queue_period.(queue) = 0 then
1614 (* queue not throttled, try next queue *)
1615 let to_take =
1616 (* a maximum of just one source from old3 queue *)
1617 if queue+1 >= old_sources3_queue then min 1 nsource
1618 else nsource in
1619 get_sources to_take m (queue+1) took
1620 else
1621 (* throttled queue, and no ready sources ... *)
1622 if nsource = 1 then
1623 (* nsource = 1 not even a ready source without throttle-delay *)
1624 get_sources 0 m (queue) took
1625 (* exit here *)
1626 else
1627 (* finaly try to take at least one source, regardless of throttles *)
1628 get_sources 1 m (queue) took
1630 else begin
1631 if !verbose_sources > 1 then
1632 lprintf_nl "[cSrc] Queue %s is empty for %s"
1633 queue_name.(queue) (file_best_name (m.manager_file ()));
1634 (* no sources in this queue try again in the _next_ queue *)
1635 let to_take =
1636 (* a maximum of just one source from old3 queue *)
1637 if queue+1 >= old_sources3_queue then min 1 nsource
1638 else nsource in
1639 get_sources to_take m (queue+1) took
1640 end in
1642 (* recalc list if there's no new file*)
1643 (* Fill only with sources from files being downloaded *)
1645 let nfiles = ref 0 in
1646 let files = ref [] in
1647 let min_priority = ref 0 in
1648 let sum_priority = ref 0 in
1649 List.iter (fun m ->
1650 match file_state (m.manager_file ()) with
1651 | FileDownloading ->
1652 let priority = file_priority (m.manager_file ()) in
1653 min_priority := min !min_priority priority;
1654 sum_priority := !sum_priority + priority;
1655 files := (priority, m ) :: !files;
1656 incr nfiles
1657 | _ -> () ) !file_sources_managers;
1659 if !files <> [] then begin
1661 (* 'normalize' to 0 priorities*)
1662 sum_priority := !sum_priority + (!nfiles * (-(!min_priority)));
1663 (* update priorities to be > 0 *)
1664 files := List.map (fun (p, f) ->
1665 let np = p - (!min_priority) in
1666 if np = 0 then begin
1667 incr sum_priority;
1668 (1, f)
1670 else (np, f) ) !files;
1672 (*sort by highest priority*)
1673 files := List.sort (fun (p1,_) (p2,_) -> compare p2 p1) !files;
1675 (* calc sources queue size
1676 at least 3 sources per file*)
1677 let nsources = max (!nfiles * 3)
1678 (functions.function_max_connections_per_second () * 10) in
1680 (* calc how much sources a file can get according to its priority*)
1681 let sources_per_prio =
1682 (float_of_int nsources) /. (float_of_int !sum_priority) in
1686 iter through files to queue sources
1687 flist_todo : next files to test
1688 assigned : number of sources already queued
1689 looped : number of times we allow to loop try to fill queue of sources
1690 (how hard we try to fill queue)
1692 let rec iter_files assigned looped =
1694 (* throw in new sources at high pace and do not care
1695 about them in get_sources, this avoids "locking" a
1696 file's queue sources with thousands of new sources
1697 from SE *)
1698 let try_some_new_sources () =
1699 let extr = ref 0 in
1700 List.iter (fun m ->
1701 let f = m.manager_file () in
1702 let q = m.manager_sources.(new_sources_queue) in
1703 if file_state f = FileDownloading && Queue.length q > 0 then
1704 let (request_time, s) = Queue.head q in
1705 source_connecting s;
1706 if M.direct_source s.source_uid then begin
1707 incr extr;
1708 Fifo.put next_direct_sources s
1710 else
1711 next_indirect_sources := s :: !next_indirect_sources
1712 ) !file_sources_managers;
1713 !extr in
1715 let cleanup_some_old_sources () =
1716 (* Cleanup some sources *)
1717 List.iter (fun m ->
1718 let f = m.manager_file () in
1719 if file_state f = FileDownloading then
1720 let remove_old q t =
1721 if Queue.length q > 0 then
1722 let (request_time, s) = Queue.head q in
1723 if request_time + t < last_time () then
1724 remove_from_queue s (find_request s m) in
1726 remove_old m.manager_sources.(do_not_try_queue) 14400;
1727 remove_old m.manager_sources.(old_sources3_queue) 2400;
1728 remove_old m.manager_sources.(old_sources2_queue) 1200
1729 ) !file_sources_managers in
1731 let rec aux flist_todo assigned =
1732 if assigned >= nsources then cleanup_some_old_sources ()
1733 else
1734 match flist_todo with
1735 | (prio, file) :: t ->
1736 let tt =
1737 min (truncate (sources_per_prio *. (float_of_int prio)))
1738 max_consecutive in
1739 let to_take = max tt 1 in
1740 (* allow at least one source per file :
1741 we will overflow a bit the expected next_direct_sources length
1742 but it's for the good cause : not 'starving' some files
1744 let took = get_sources to_take file good_sources_queue 0 in
1745 aux t (assigned + took)
1747 | [] ->
1748 cleanup_some_old_sources ();
1750 (* more power to the "runaway" (most overloaded) file, pick extra sources *)
1751 let em =
1752 let q = find_throttled_queue good_sources_queue in
1753 if queue_period.(q) > 0 then
1754 let max_overloaded =
1755 List.hd (find_max_overloaded q !file_sources_managers) in
1756 let overhead =
1757 count_file_ready_sources max_overloaded q true in
1758 if overhead > 0 then
1759 get_sources max_consecutive max_overloaded good_sources_queue 0
1760 else 0
1761 else 0 in
1763 if looped > 0 then
1764 (* allow at most looped re-iter of list to not
1765 loop endlessly *)
1766 iter_files (assigned + em) (looped - 1)
1768 let extr = try_some_new_sources () in
1769 aux !files (assigned + extr)
1772 iter_files 0 3;
1774 (* adjust queue throttling *)
1775 let all_ready = ref 0 in
1776 List.iter (fun q ->
1777 let queue_throttled_ready = count_ready_sources q true in
1778 let queue_ready = count_ready_sources q false in
1779 all_ready := !all_ready + queue_throttled_ready;
1780 if !all_ready > nsources && queue_throttled_ready > 0 then begin
1781 (* no need, to increase period on a queue without ready sources *)
1782 (* lprintf "commonSources: increasing queue throttling for (ar=%d rc=%d qr=%d) %s\n" !allReady nsources queueReady queue_name.(q); *)
1783 queue_period.( q ) <- queue_period.( q ) + 1
1785 else begin
1786 if queue_ready = 0 then begin
1787 (* lprintf "commonSources: resetting queue throttling to 0 (ar=%d rc=%d qr=%d) %s\n" !allReady nsources queueReady queue_name.(q); *)
1788 queue_period.( q ) <- 0
1790 else begin
1791 (* lprintf "commonSources: decreasing queue throttling for (ar=%d rc=%d qr=%d) %s\n" !allReady nsources queueReady queue_name.(q); *)
1792 queue_period.( q ) <- max 0 (queue_period.( q ) - 1)
1795 ) [ good_sources_queue; old_sources1_queue; old_sources2_queue; old_sources3_queue ];
1797 end;
1799 if !verbose_sources > 0 then begin
1800 lprintf_nl "[cSrc] CommonSources.refill_sources AFTER:";
1801 let buf = Buffer.create 100 in
1802 print buf TEXT;
1803 lprintf "%s\n" (Buffer.contents buf);
1804 end;
1805 with e ->
1806 lprintf_nl "[cSrc] Exception %s in refill_sources"
1807 (Printexc2.to_string e)
1810 (*************************************************************************)
1811 (* *)
1812 (* clean_sources helper *)
1813 (* *)
1814 (*************************************************************************)
1815 let put_all_outside_queue m q queue =
1816 let _, s = Queue.take q in
1817 m.manager_all_sources <- m.manager_all_sources - 1;
1818 if active_queue queue then
1819 m.manager_active_sources <- m.manager_active_sources - 1;
1820 List.iter (fun r ->
1821 if r.request_file == m then begin
1822 r.request_queue <- outside_queue;
1823 set_score_part r not_found_score
1825 ) s.source_files
1827 (*************************************************************************)
1828 (* *)
1829 (* clean_sources *)
1830 (* *)
1831 (*************************************************************************)
1833 let clean_sources () =
1834 (* Maybe this should be dependant on the file (priority, state,...) ? *)
1835 let max_sources_per_file = functions.function_max_sources_per_file () in
1836 List.iter (fun m ->
1837 match file_state (m.manager_file ()) with
1838 | FileDownloading ->
1839 let nsources = m.manager_all_sources in
1840 if nsources > max_sources_per_file then
1841 let rec iter nsources q queue =
1842 if nsources > 0 then
1843 if Queue.length q > 0 &&
1844 queue <> good_sources_queue then begin
1845 put_all_outside_queue m q queue;
1846 iter (nsources-1) q queue
1848 else
1849 let do_iter q = iter nsources m.manager_sources.(q) q in
1851 if queue = old_sources1_queue then do_iter do_not_try_queue
1852 else if queue = do_not_try_queue then do_iter new_sources_queue
1853 else if queue = new_sources_queue then do_iter waiting_saved_sources_queue
1854 else if queue > good_sources_queue then do_iter (queue-1)
1857 iter (nsources - max_sources_per_file) (m.manager_sources.(old_sources3_queue)) old_sources3_queue
1859 | _ ->
1860 let rec iter q queue =
1861 if Queue.length q > 0 then begin
1862 put_all_outside_queue m q queue;
1863 iter q queue
1865 else
1866 if queue > 0 then
1867 iter m.manager_sources.(queue-1) (queue-1)
1869 iter (m.manager_sources.(do_not_try_queue)) do_not_try_queue
1870 ) !file_sources_managers
1872 (*************************************************************************)
1873 (* *)
1874 (* connect_sources *)
1875 (* *)
1876 (*************************************************************************)
1878 let connect_sources connection_manager =
1880 if !verbose_sources > 1 then
1881 lprintf_nl "[cSrc] connect_sources";
1882 (* After 2 minutes, consider that connections attempted should be revoked. *)
1884 if !verbose_sources > 1 then
1885 lprintf_nl "[cSrc] revoke connecting sources...";
1886 let rec iter () =
1887 if not (Fifo.empty connecting_sources) then
1888 let (time, s) = Fifo.head connecting_sources in
1889 if time <> s.source_last_attempt then begin
1890 ignore (Fifo.take connecting_sources);
1891 iter ()
1892 end
1893 else if time + 120 < last_time () then begin
1894 ignore (Fifo.take connecting_sources);
1895 if s.source_last_attempt <> 0 then source_disconnected s;
1896 iter ()
1899 iter ();
1901 (* First, require !!max_connections_per_second sources to connect to us.
1902 The probability is very high they won't be able to connect to us. *)
1904 if !verbose_sources > 1 then
1905 lprintf_nl "[cSrc] connect indirect sources...";
1906 let (first_sources, last_sources) =
1907 List2.cut !!max_connections_per_second !next_indirect_sources in
1908 next_indirect_sources := last_sources;
1909 List.iter (fun s ->
1910 ignore (connect_source s)) first_sources;
1912 (* Second, for every file being downloaded, query sources that are already
1913 connected if needed *)
1914 if !verbose_sources > 1 then
1915 lprintf_nl "[cSrc] query connected sources...";
1916 List.iter (fun m ->
1917 match file_state (m.manager_file ()) with
1918 | FileDownloading ->
1919 let q = m.manager_sources.(connected_sources_queue) in
1920 let rec iter () =
1921 if Queue.length q > 0 then
1922 let (time, s) = Queue.head q in
1923 if time + !!min_reask_delay < last_time () then begin
1925 let r = find_request s m in
1926 (* lprintf "commonSources: connect_sources: second place for source_query !?\n"; *)
1927 (* isn't that here pretty useless? *)
1928 source_query s r;
1929 (* After this step, the source is
1930 either in 'busy_sources_queue',
1931 if for some reason, the request
1932 could not be sent, or in
1933 'connected_sources_queue' at the
1934 tail if the request could be sent.
1935 This seems thus safe.
1937 iter ()
1938 end in
1939 iter ()
1940 | _ -> ()
1941 ) !file_sources_managers;
1943 if !verbose_sources > 1 then
1944 lprintf_nl "[cSrc] connect to sources...";
1945 (* Finally, connect to available sources *)
1947 let max_sources = functions.function_max_connections_per_second () in
1948 if !verbose_sources > 1 then
1949 lprintf_nl "[cSrc] max_sources: %d" max_sources;
1950 let rec iter nsources refilled =
1951 if nsources > 0 && can_open_connection connection_manager then
1952 if Fifo.length next_direct_sources > 0 then
1953 let s = Fifo.take next_direct_sources in
1954 connect_source s;
1955 let nsources =
1956 match s.source_sock with
1957 | NoConnection ->
1958 if !verbose_sources > 1 then
1959 lprintf_nl "[cSrc] not connected"; nsources
1960 | _ -> nsources - 1 in
1961 iter nsources refilled
1962 else if not refilled then begin
1963 refill_sources ();
1964 iter nsources true
1965 end in
1966 iter max_sources false;
1967 if !verbose_sources > 1 then
1968 lprintf_nl "[cSrc] done connect_sources";
1969 with Exit -> ()
1973 (*************************************************************************)
1974 (* *)
1975 (* attach_sources_to_file *)
1976 (* *)
1977 (*************************************************************************)
1979 let value_to_module f v =
1980 match v with
1981 | Module list -> f list
1982 | _ -> failwith "Option should be a module"
1984 let option = define_option_class "Source"
1985 (fun v ->
1986 (* lprintf "(n) source !!\n"; *)
1987 value_to_module value_to_source v)
1988 (fun s -> Module (source_to_value s []))
1990 let file_sources_option = ref None
1992 let attach_sources_to_file section =
1993 (* lprintf "attach_sources_to_file\n"; *)
1994 let sources = match !file_sources_option with
1995 | None ->
1996 (* lprintf "attaching sources this time\n"; *)
1997 let sources = define_option section
1998 ["sources"] "" (listiter_option option) [] in
1999 (* lprintf "done\n"; *)
2000 file_sources_option := Some sources;
2001 sources
2002 | Some sources -> sources in
2003 sources =:= [];
2004 HS.iter (fun s -> sources =:= s :: !!sources) sources_by_uid;
2006 (fun _ -> sources =:= [])
2009 (*************************************************************************)
2010 (* *)
2011 (* MAIN *)
2012 (* *)
2013 (*************************************************************************)
2015 let () =
2016 Heap.add_memstat M.module_name (fun level buf ->
2018 let nsources_per_queue = Array.make nqueues 0 in
2019 let nready_per_queue = Array.make nqueues 0 in
2020 List.iter (fun m ->
2021 for i = 0 to nqueues -1 do
2022 let q = m.manager_sources.(i) in
2023 let nready = ref 0 in
2024 let nsources = ref 0 in
2025 let ready_threshold = last_time () - !!min_reask_delay in
2026 Queue.iter (fun (time, s) ->
2027 incr nsources;
2028 if time < ready_threshold then incr nready
2029 else if i = new_sources_queue then begin
2030 Printf.bprintf buf "ERROR: Source is not ready in new_sources_queue !\n";
2031 print_source buf s
2033 ) q;
2034 nsources_per_queue.(i) <- nsources_per_queue.(i) + !nsources;
2035 nready_per_queue.(i) <- nready_per_queue.(i) + !nready;
2036 done
2037 ) !file_sources_managers;
2039 Printf.bprintf buf "\nFor all managers (%d):\n" (List.length !file_sources_managers);
2040 for i = 0 to nqueues - 1 do
2041 Printf.bprintf buf " Queue[%s]: %d entries (%d ready)\n"
2042 queue_name.(i) nsources_per_queue.(i) nready_per_queue.(i);
2043 done;
2045 let nsources = ref 0 in
2046 HS.iter (fun _ -> incr nsources) sources_by_uid;
2047 Printf.bprintf buf "Sources by UID table: %d entries\n" !nsources;
2048 let a1, a2, a3, a4, a5, a6 = HS.stats sources_by_uid in
2049 Printf.bprintf buf "Sources by UID table stats: %d %d %d %d %d %d\n"
2050 a1 a2 a3 a4 a5 a6;
2052 nsources := 0;
2053 H.iter (fun _ -> incr nsources) sources_by_num;
2054 Printf.bprintf buf "Sources by NUM table: %d entries\n" !nsources;
2055 let a1, a2, a3, a4, a5, a6 = H.stats sources_by_num in
2056 Printf.bprintf buf "Sources by NUM table stats: %d %d %d %d %d %d\n"
2057 a1 a2 a3 a4 a5 a6;
2059 Printf.bprintf buf "Used indirect connections: %d\n"
2060 !indirect_connections;
2062 let nconnected = ref 0 in
2063 Fifo.iter (fun (_, s) ->
2064 if s.source_last_attempt = 0 then incr nconnected;
2065 ) connecting_sources;
2066 Printf.bprintf buf "Connecting Sources: %d entries"
2067 (Fifo.length connecting_sources);
2068 if !nconnected > 0 then
2069 Printf.bprintf buf " (connected: %d)" !nconnected;
2070 Printf.bprintf buf "\n";
2072 Printf.bprintf buf "Next Direct Sources: %d entries\n"
2073 (Fifo.length next_direct_sources);
2075 Printf.bprintf buf "Next Indirect Sources: %d entries\n"
2076 (List.length !next_indirect_sources)
2079 end)