1 (* Copyright 2001, 2002 Simon, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 open TcpBufferedSocket
34 * From mlnet.log: not all sources in [new_sources] are ready. More
35 sources for a file than total sources in [sources_by_uid]. Probably the
36 same source entered several times inside the same queue. After a connection
37 failure, we should not only change the request_time but also the
38 request_score in case it is initial_new_source_score.
44 * Limit total number of indirect connections
45 * Implement need_new_sources
46 * The concept of 'source' should replace the concept of 'client' for the
47 interface. The old version of source management used to keep good clients
48 so that useful information about them would not be lost.
52 (* A source might be in the following states:
54 (1) Not Connected: the source appears in the queues between
55 'new_sources_queue' and 'old_sources_queue2'.
57 (2) Connecting: 'source_last_attempt' is not zero. The source appears in the
58 queue 'connecting_sources_queue' for all files.
60 (3) Connected: The source appears either in the queue
61 'connected_sources_queue' or 'busy_sources_queue' for every files.
62 If 'r.request_time' is 0, it means we can query the source whenever we
63 want, but we are already too busy, so the source is in
64 'busy_sources_queue'. Otherwise, we are not ready to query, and the source
65 has to be in 'connected_sources_queue'.
71 | File_possible
(* we asked, but didn't know *)
72 | File_not_found
(* we asked, the file is not there *)
73 (* | File_expected we asked, because it was announced *)
74 | File_new_source
(* we never asked, but we should *)
75 | File_found
(* the file was found *)
76 | File_chunk
(* the file has chunks we want *)
77 | File_upload
(* we uploaded from this client *)
78 (* | File_unknown We don't know anything *)
80 let not_found_score = -5
81 let possible_score = -3
82 let new_source_score = 1 (* after the first request *)
86 let initial_new_source_score = 10 (* before the first request *)
88 let outside_queue = -1
89 let new_sources_queue = 0
90 let good_sources_queue = 1
91 let ready_saved_sources_queue = 2
92 let waiting_saved_sources_queue = 3
93 let old_sources1_queue = 4
94 let old_sources2_queue = 5
95 let old_sources3_queue = 6
96 let do_not_try_queue = 7
97 let connected_sources_queue = 8
98 let connecting_sources_queue = 9
99 let busy_sources_queue = 10
104 "ready_saved_sources";
105 "waiting_saved_sources";
111 "connecting_sources";
116 let nqueues = Array.length
queue_name
118 let queue_period = Array.make
nqueues 600
121 queue_period.(new_sources_queue) <- 0;
122 queue_period.(connected_sources_queue) <- 0;
123 queue_period.(connecting_sources_queue) <- 0;
124 queue_period.(busy_sources_queue) <- 0;
125 queue_period.(good_sources_queue) <- 0;
126 queue_period.(ready_saved_sources_queue) <- 0;
127 queue_period.(waiting_saved_sources_queue) <- 0;
128 queue_period.(old_sources1_queue) <- 0;
129 queue_period.(old_sources2_queue) <- 0;
130 queue_period.(old_sources3_queue) <- 0
135 (*************************************************************************)
136 (*************************************************************************)
137 (*************************************************************************)
139 (* FUNCTOR Argument *)
141 (*************************************************************************)
142 (*************************************************************************)
143 (*************************************************************************)
147 val module_name
: string
150 val dummy_source_uid
: source_uid
151 val source_uid_to_value
: source_uid
-> Options.option_value
152 val value_to_source_uid
: Options.option_value
-> source_uid
155 val dummy_source_brand
: source_brand
156 val source_brand_to_value
: source_brand
-> Options.option_value
157 val value_to_source_brand
: Options.option_value
-> source_brand
159 val direct_source
: source_uid
-> bool
160 val indirect_source
: source_uid
-> bool
165 (*************************************************************************)
166 (*************************************************************************)
167 (*************************************************************************)
171 (*************************************************************************)
172 (*************************************************************************)
173 (*************************************************************************)
175 (*************************************************************************)
179 (*************************************************************************)
182 source_uid
: M.source_uid
;
183 mutable source_files
: file_request list
;
185 (* the 'source_score' increases with failures in connections *)
186 mutable source_score
: int;
188 (* the 'source_num' that should be used to create the client corresponding to
190 mutable source_num
: int;
192 (* the 'source_age' is the time of the last successful connection *)
193 mutable source_age
: int;
195 (* the 'source_connecting' indicates that this source is currently in the
196 process of being connected. *)
197 mutable source_last_attempt
: int;
198 mutable source_sock
: tcp_connection
;
200 mutable source_brand
: M.source_brand
;
201 mutable source_country_code
: int option;
205 request_file
: file_sources_manager
;
206 mutable request_queue
: int;
207 mutable request_time
: int;
208 mutable request_score
: int;
211 and file_sources_manager
= {
212 manager_uid
: string;
213 mutable manager_sources
: source
Queues.Queue.t array
;
214 mutable manager_active_sources
: int;
215 mutable manager_all_sources
: int;
216 mutable manager_file
: (unit -> file
);
220 mutable function_connect
: (M.source_uid
-> int option -> unit);
221 mutable function_query
: (M.source_uid
-> string -> unit);
223 mutable function_string_to_manager
: (string -> file_sources_manager
);
225 mutable function_max_connections_per_second
: (unit -> int);
226 mutable function_max_sources_per_file
: (unit -> int);
228 mutable function_add_location
:
229 (M.source_uid
-> string -> int option -> unit);
230 mutable function_remove_location
:
231 (M.source_uid
-> string -> unit);
234 (*************************************************************************)
238 (*************************************************************************)
240 module HS
= Weak.Make
(struct
242 let hash s
= Hashtbl.hash s
.source_uid
243 let equal x y
= x
.source_uid
= y
.source_uid
246 module H
= Weak.Make
(struct
248 let hash s
= Hashtbl.hash s
.source_num
249 let equal x y
= x
.source_num
= y
.source_num
252 module SourcesQueueCreate
= Queues.Make
(struct
254 let compare s1 s2
= compare s1
.source_uid s2
.source_uid
257 (*************************************************************************)
259 (* Global variables *)
261 (*************************************************************************)
264 source_uid
= M.dummy_source_uid
;
270 source_last_attempt
= 0;
271 source_sock
= NoConnection
;
273 source_brand
= M.dummy_source_brand
;
274 source_country_code
= None
;
277 let last_refill = ref 0
279 let not_implemented s _
=
280 failwith
(Printf.sprintf
"CommonSources.%s not implemented" s
)
283 function_connect
= not_implemented "function_connect";
284 function_query
= not_implemented "function_query";
285 function_string_to_manager
= not_implemented
286 "function_string_to_manager";
288 function_max_connections_per_second
= (fun _
->
289 !!max_connections_per_second
);
290 function_max_sources_per_file
= (fun _
-> 10);
292 function_add_location
= not_implemented "function_add_location";
293 function_remove_location
= not_implemented "function_remove_location";
296 let indirect_connections = ref 0
298 (*************************************************************************)
302 (*************************************************************************)
304 let sources_by_uid = HS.create
13557
305 let sources_by_num = H.create
13557
307 let file_sources_managers = ref []
309 let connecting_sources = Fifo.create
()
311 let next_direct_sources = Fifo.create
()
312 let next_indirect_sources = ref []
316 q
>= connected_sources_queue && q
<= busy_sources_queue
318 (*************************************************************************)
322 (*************************************************************************)
324 let request_score r
= r
.request_score
326 let set_score_part r score
=
327 r
.request_score <- score
330 (*************************************************************************)
332 (* other helper functions *)
334 (*************************************************************************)
336 let rec find_throttled_queue queue
=
337 if queue_period.(queue
) > 0 || queue
= old_sources3_queue then
340 find_throttled_queue (queue
+ 1)
342 let get_throttle_delay m q throttled
=
346 - (file_priority
(m
.manager_file
()))
347 + Queue.length m
.manager_sources
.(connected_sources_queue))
352 * determine the number of (throttled) ready sources for a manager queue
355 (* I know it's evil to break out of an X.iter using an exception...
356 But that function really needs to be fast.
357 Also, this works because Queues are based on Sets, and that Set.iter
358 gives elements in increasing keys order *)
359 exception BreakOutOfLoop
361 let count_file_ready_sources m q throttled
=
362 let ready_count = ref 0 in
363 let throttle_delay = get_throttle_delay m q throttled
in
364 let ready_threshold =
365 last_time
() - !!min_reask_delay
- throttle_delay in
369 if time
>= ready_threshold then raise BreakOutOfLoop
;
371 ) m
.manager_sources
.(q
)
372 with BreakOutOfLoop
-> ());
376 * determine the total number of ready sources for all downloading files per queue
378 let count_ready_sources queue throttled
=
379 List.fold_left
(fun ready_count m
->
380 let f = m
.manager_file
() in
381 if file_state
f = FileDownloading
then
382 ready_count + count_file_ready_sources m queue throttled
384 ) 0 !file_sources_managers
387 let rec find_max_overloaded q managers
=
388 let _, remaining_managers
=
389 List.fold_left
(fun ((current_max
, remaining_managers
) as acc
) m
->
390 let ready_sources = count_file_ready_sources m q
true in
391 if ready_sources > current_max
then
393 else if ready_sources = current_max
then
394 (current_max
, m
:: remaining_managers
)
396 ) (-1, []) managers
in
400 (*************************************************************************)
404 (*************************************************************************)
406 let print_source buf s
=
407 Printf.bprintf buf
"Source %d:\n" s
.source_num
;
408 Printf.bprintf buf
" score: %d\n" s
.source_score
;
409 if s
.source_age
<> 0 then
410 Printf.bprintf buf
" age: %d\n" s
.source_age
;
411 if s
.source_last_attempt
<> 0 then
412 Printf.bprintf buf
" last_attemps: %d" s
.source_last_attempt
;
414 Printf.bprintf buf
" File %s\n"
415 (file_best_name
(r
.request_file
.manager_file
()));
416 Printf.bprintf buf
" Score: %d\n" r
.request_score;
417 if r
.request_time
<> 0 then
418 Printf.bprintf buf
" Time: %d\n" r
.request_time
;
428 let need_new_sources file
=
429 let ready_threshold = last_time
() - !!min_reask_delay
in
430 let ready_count = ref 0 in
431 for i
= good_sources_queue to old_sources1_queue do
432 let lookin = file
.manager_sources
.(i
) in
434 Queue.iter
(fun (time
, s
) ->
435 if time
>= ready_threshold then raise BreakOutOfLoop
;
438 with BreakOutOfLoop
-> ()
440 (* let work_count = !ready_count +
441 (Queue.length ( file.manager_sources.( new_sources_queue ) )) +
442 (Queue.length ( file.manager_sources.( connected_sources_queue ) ))
444 let f = file
.manager_file
() in
445 (* lprintf "commonSources: need_new_source: ready= %d new= %d con= %d prio= %d %s\n"
447 (Queue.length ( file.manager_sources.( new_sources_queue ) ) )
448 (Queue.length ( file.manager_sources.( connected_sources_queue ) ) )
450 (if (file_priority f) + 20 > workCount then "we need" else "have enough");
452 (* (file_priority f) + 20 > work_count *)
453 (* let max_s = functions.function_max_sources_per_file () in
454 (file_priority f)*(max_s/20) + max_s > !all_ready_s + new_s *)
455 (file_priority
f) + 20 > !ready_count
458 (*************************************************************************)
462 (*************************************************************************)
465 let print buf output_type
=
466 let pos_to_string v
=
467 if v
> 0 then string_of_int v
else "-" in
469 html_mods_cntr_init
();
470 let mycntr = ref 1 in
473 mycntr := html_mods_cntr
();
474 Printf.bprintf buf
"\\<tr class=\\\"dl-%d\\\"\\>" !mycntr in
476 let html_tr_same () =
477 Printf.bprintf buf
"\\<tr class=\\\"dl-%d\\\"\\>" !mycntr in
480 if output_type
= HTML
then
481 let header = Printf.sprintf
"File sources per manager queue (%d)"
482 (List.length
!file_sources_managers) in
484 Printf.bprintf buf
"\\<div class=results\\>";
485 html_mods_table_header buf
"sourcesTable" "sources" [];
486 Printf.bprintf buf
"\\<tr\\>";
488 ("", "srh", "Statistics on sources ");
489 ("", "srh", "@ " ^ log_time
());
490 ("", "srh", header); ];
491 Printf.bprintf buf
"\\</tr\\>\\</table\\>\\</div\\>\n";
493 html_mods_table_header buf
"sourcesTable" "sources" [
494 ( Str
, "srh br", "New sources",
495 Printf.sprintf
"New(%d)" new_sources_queue );
496 ( Str
, "srh br", "Good sources",
497 Printf.sprintf
"Good(%d)" good_sources_queue );
498 ( Str
, "srh br", "Ready saved sources",
499 Printf.sprintf
"Ready(%d)" ready_saved_sources_queue);
500 ( Str
, "srh br", "Waiting saved sources",
501 Printf.sprintf
"Wait(%d)" waiting_saved_sources_queue);
502 ( Str
, "srh br", "Old sources 1",
503 Printf.sprintf
"Old1(%d)" old_sources1_queue );
504 ( Str
, "srh br", "Old sources 2",
505 Printf.sprintf
"Old2(%d)" old_sources2_queue );
506 ( Str
, "srh br", "Old sources 3",
507 Printf.sprintf
"Old3(%d)" old_sources3_queue );
508 ( Str
, "srh br", "Do not try sources",
509 Printf.sprintf
"nTry(%d)" do_not_try_queue );
510 ( Str
, "srh br", "Connected sources",
511 Printf.sprintf
"Conn(%d)" connected_sources_queue );
512 ( Str
, "srh br", "Connecting sources",
513 Printf.sprintf
"Cing(%d)" connecting_sources_queue );
514 ( Str
, "srh br", "Busy sources",
515 Printf.sprintf
"Busy(%d)" busy_sources_queue );
516 ( Str
, "srh br", "Total sources", "All" );
517 ( Str
, "srh br", "Filename", "Name" ); ];
519 Printf.bprintf buf
"Statistics on sources: time %d\n" (last_time
());
520 Printf.bprintf buf
"File sources per manager queue(%d):\n"
521 (List.length
!file_sources_managers);
522 Printf.bprintf buf
"new good redy wait old1 old2 old3 ntry conn cing busy all\n";
523 (* "9999 9999 9999 9999 9999 9999 9999 9999 9999 9999 9999 9999"
525 one row each: all,indirect,ready *)
528 let list_sum = List.fold_left
(+) 0 in
530 let nsources_per_queue = Array.make
nqueues 0 in
531 let nready_per_queue = Array.make
nqueues 0 in
532 let nindirect_per_queue = Array.make
nqueues 0 in
533 let ninvalid_per_queue = Array.make
nqueues 0 in
536 let naneed = ref 0 in
537 let downloading_managers =
538 List.filter
(fun m
->
539 file_state
(m
.manager_file
()) = FileDownloading
540 ) !file_sources_managers in
541 let my_file_sources_managers =
542 List.sort
(fun f1 f2
->
543 let best_name1 = file_best_name
(f1
.manager_file
()) in
544 let best_name2 = file_best_name
(f2
.manager_file
()) in
545 String.compare best_name1 best_name2
546 ) downloading_managers in
548 let ready_threshold = last_time
() - !!min_reask_delay
in
550 let name = file_best_name
(m
.manager_file
()) in
551 let need_sources = need_new_sources m
in
552 if need_sources then incr
naneed;
554 if m
.manager_all_sources
<> 0 then begin
555 let slist = ref [] in
556 let sreadylist = ref [] in
557 let streadylist = ref [] in
558 let sindirectlist = ref [] in
559 let sinvalidlist = ref [] in
561 Array.iteri
(fun i q
->
562 let nready = ref 0 in
563 let ntready = count_file_ready_sources m i
true in
564 let nindirect = ref 0 in
565 let ninvalid = ref 0 in
566 let nsources = ref 0 in
568 Queue.iter
(fun (time
, s
) ->
570 if M.indirect_source s
.source_uid
then incr
nindirect
571 else if not
(M.direct_source s
.source_uid
) then incr
ninvalid;
572 if time
< ready_threshold then incr
nready
573 else if i
= new_sources_queue then begin
574 Printf.bprintf buf
"ERROR: Source is not ready in new_sources_queue !\n";
579 slist := Queue.length q
:: !slist;
580 sreadylist := !nready :: !sreadylist;
581 streadylist := ntready :: !streadylist;
582 sindirectlist := !nindirect :: !sindirectlist;
583 sinvalidlist := !ninvalid :: !sinvalidlist;
585 nready_per_queue.(i
) <- nready_per_queue.(i
) + !nready;
586 nindirect_per_queue.(i
) <- nindirect_per_queue.(i
) + !nindirect;
587 ninvalid_per_queue.(i
) <- ninvalid_per_queue.(i
) + !ninvalid;
588 nsources_per_queue.(i
) <- nsources_per_queue.(i
) + !nsources;
589 ) m
.manager_sources
; (* end Queues *)
591 let slist = List.rev
!slist in
592 let sreadylist = List.rev
!sreadylist in
593 let streadylist = List.rev
!streadylist in
594 let sindirectlist = List.rev
!sindirectlist in
595 let sinvalidlist = List.rev
!sinvalidlist in
597 if output_type
= HTML
then begin
600 (List.map
(fun qlength
->
601 ("", "sr ar br", pos_to_string qlength
)) slist) @
602 [ ("", "sr ar br", string_of_int m
.manager_all_sources
);
603 ("Filename", "sr", shorten
name !!max_name_len
); ] );
605 Printf.bprintf buf
"\\</tr\\>\n";
609 (List.map
(fun sready
->
610 ("", "sr ar br", pos_to_string sready
)) sreadylist) @
611 [ ("", "sr ar br", Printf.sprintf
"%d" (list_sum sreadylist));
612 ("", "sr", Printf.sprintf
"ready with %d active%s"
613 m
.manager_active_sources
614 (if need_sources then " and needs sources"
616 Printf.bprintf buf
"\\</tr\\>\n";
620 (List.map
(fun sready
->
621 ("", "sr ar br", pos_to_string sready
)) streadylist) @
622 [ ("", "sr ar br", string_of_int
(list_sum streadylist));
623 ("", "sr", "throttled ready"); ] );
624 Printf.bprintf buf
"\\</tr\\>\n";
626 let anindirect = list_sum sindirectlist in
627 if anindirect <> 0 then begin
630 (List.map
(fun sready
->
631 ("", "sr ar br", pos_to_string sready
)) sindirectlist) @
632 [ ("", "sr ar br", string_of_int
anindirect);
633 ("", "sr", "indirect"); ] );
634 Printf.bprintf buf
"\\</tr\\>\n";
637 let aninvalid = list_sum sinvalidlist in
638 if aninvalid <> 0 then begin
641 (List.map
(fun sready
->
642 ("", "sr ar br", pos_to_string sready
)) sinvalidlist) @
643 [ ("", "sr ar br", string_of_int
aninvalid);
644 ("", "sr", "invalid"); ] );
645 Printf.bprintf buf
"\\</tr\\>\n";
649 List.iter
(Printf.bprintf buf
"%4d ") slist;
650 Printf.bprintf buf
"%4d %s\n" m
.manager_all_sources
name;
651 List.iter
(Printf.bprintf buf
"%4d ") sreadylist;
652 Printf.bprintf buf
"%4d ready %d active%s\n"
653 (list_sum sreadylist) m
.manager_active_sources
654 (if need_sources then " needs sources"
656 List.iter
(Printf.bprintf buf
"%4d ") streadylist;
657 Printf.bprintf buf
"%4d throttled ready\n"
658 (list_sum streadylist);
659 let anindirect = list_sum sindirectlist in
660 if anindirect <> 0 then begin
661 List.iter
(Printf.bprintf buf
"%4d ") sindirectlist;
662 Printf.bprintf buf
"%4d indirect\n" anindirect;
664 let aninvalid = list_sum sinvalidlist in
665 if aninvalid <> 0 then begin
666 List.iter
(Printf.bprintf buf
"%4d ") sinvalidlist;
667 Printf.bprintf buf
"%4d invalid\n" aninvalid;
671 nall := !nall + m
.manager_all_sources
;
672 naact := !naact + m
.manager_active_sources
;
674 else begin (* m.manager_all_sources = 0 *)
675 if output_type
= HTML
then begin
679 ("", "sr ar br", "-"); ("", "sr ar br", "");
680 ("", "sr ar br", ""); ("", "sr ar br", "");
681 ("", "sr ar br", ""); ("", "sr ar br", "");
682 ("", "sr ar br", ""); ("", "sr ar br", "");
683 ("", "sr ar br", ""); ("", "sr ar br", "");
684 ("", "sr ar br", ""); ("", "sr ar br", "");
685 ("", "sr br", shorten
name !!max_name_len
); ];
686 Printf.bprintf buf
"\\</tr\\>\n";
688 else Printf.bprintf buf
"None %55s%s\n" "" name;
690 ) my_file_sources_managers; (* end Files *)
693 if output_type
= HTML
then begin
694 Printf.bprintf buf
"\\</table\\>\\</div\\>\n";
696 html_mods_table_header buf
"sourcesTable" "sources" [
697 ( Str
, "srh", "New sources", "New" );
698 ( Str
, "srh", "Good sources", "Good" );
699 ( Str
, "srh", "Ready sources", "Ready" );
700 ( Str
, "srh", "Waiting sources", "Wait" );
701 ( Str
, "srh", "Old sources 1", "Old1" );
702 ( Str
, "srh", "Old sources 2", "Old2" );
703 ( Str
, "srh", "Old sources 3", "Old3" );
704 ( Str
, "srh", "Do not try", "nTry" );
705 ( Str
, "srh", "Connected sources", "Conn" );
706 ( Str
, "srh", "Connecting sources", "Cing" );
707 ( Str
, "srh", "Busy sources", "Busy" );
708 ( Str
, "srh", "Total sources", "All" );
709 ( Str
, "srh", "Type", "Type" ); ];
713 Printf.bprintf buf
"new good redy wait old1 old2 old3 ntry conn cing busy all\n";
715 let slist = ref [] in
716 let sreadylist = ref [] in
717 let streadylist = ref [] in
718 let sindirectlist = ref [] in
719 let sinvalidlist = ref [] in
720 let speriodlist = ref [] in
722 for i
= 0 to nqueues - 1 do
723 slist := nsources_per_queue.(i
) :: !slist;
724 sreadylist := nready_per_queue.(i
) :: !sreadylist;
725 streadylist := count_ready_sources i
true :: !streadylist;
726 sindirectlist := nindirect_per_queue.(i
) :: !sindirectlist;
727 sinvalidlist := ninvalid_per_queue.(i
) :: !sinvalidlist;
728 speriodlist := queue_period.(i
) :: !speriodlist;
729 done; (* end Queues *)
731 let nsources = ref 0 in
736 if r
.request_queue
= outside_queue then
741 let slist = List.rev
!slist in
742 let sreadylist = List.rev
!sreadylist in
743 let streadylist = List.rev
!streadylist in
744 let sindirectlist = List.rev
!sindirectlist in
745 let sinvalidlist = List.rev
!sinvalidlist in
746 let speriodlist = List.rev
!speriodlist in
748 if output_type
= HTML
then begin
752 ("", "sr ar", pos_to_string q
)) slist) @
753 [ ("", "sr ar", Printf.sprintf
"%d" !nall);
755 Printf.sprintf
"all source managers (%d by UID) (%d ROQ)"
756 !nsources !nroq);] );
757 Printf.bprintf buf
"\\</tr\\>\n";
761 (List.map
(fun sready
->
762 ("", "sr ar", pos_to_string sready
)) sreadylist) @
763 [ ("", "sr ar", Printf.sprintf
"%d" (list_sum sreadylist));
765 Printf.sprintf
"ready with %d active and %i need sources"
766 !naact !naneed); ] );
767 Printf.bprintf buf
"\\</tr\\>\n";
771 (List.map
(fun sready
->
772 ("", "sr ar", pos_to_string sready
)) streadylist) @
773 [ ("", "sr ar", Printf.sprintf
"%d" (list_sum streadylist));
774 ("", "sr", "throttled ready"); ] );
775 Printf.bprintf buf
"\\</tr\\>\n";
777 let anindirect = list_sum sindirectlist in
778 if anindirect <> 0 then begin
781 (List.map
(fun sready
->
782 ("", "sr ar", pos_to_string sready
)) sindirectlist) @
783 [ ("", "sr ar", Printf.sprintf
"%d" anindirect);
784 ("", "sr", "indirect"); ] );
785 Printf.bprintf buf
"\\</tr\\>\n";
788 let aninvalid = list_sum sinvalidlist in
789 if aninvalid <> 0 then begin
792 (List.map
(fun sready
->
793 ("", "sr ar", pos_to_string sready
)) sinvalidlist) @
794 [ ("", "sr ar", Printf.sprintf
"%d" aninvalid);
795 ("", "sr", "invalid"); ] );
796 Printf.bprintf buf
"\\</tr\\>\n";
801 (List.map
(fun sready
->
802 ("", "sr ar", pos_to_string sready
)) speriodlist) @
804 ("", "sr", "period"); ] );
805 Printf.bprintf buf
"\\</tr\\>\n";
807 Printf.bprintf buf
"\\</table\\>\\</div\\>\n";
810 List.iter
(Printf.bprintf buf
"%4d ") slist;
811 Printf.bprintf buf
"%4d all source managers (%d by UID) (%d ROQ)\n"
812 !nall !nsources !nroq;
813 List.iter
(Printf.bprintf buf
"%4d ") sreadylist;
814 Printf.bprintf buf
"%4d ready %d active %i need sources\n"
815 (list_sum sreadylist) !naact !naneed;
816 List.iter
(Printf.bprintf buf
"%4d ") streadylist;
817 Printf.bprintf buf
"%4d throttled ready\n" (list_sum streadylist);
818 let anindirect = list_sum sindirectlist in
819 if anindirect <> 0 then begin
820 List.iter
(Printf.bprintf buf
"%4d ") sindirectlist;
821 Printf.bprintf buf
"%4d indirect\n" anindirect;
823 let aninvalid = list_sum sinvalidlist in
824 if aninvalid <> 0 then begin
825 List.iter
(Printf.bprintf buf
"%4d ") sinvalidlist;
826 Printf.bprintf buf
"%4d invalid\n" aninvalid;
828 List.iter
(Printf.bprintf buf
"%4d ") speriodlist;
829 Printf.bprintf buf
" period\n";
832 let nconnected = ref 0 in
833 Fifo.iter
(fun (_, s
) ->
834 if s
.source_last_attempt
= 0 then incr
nconnected;
835 ) connecting_sources;
836 if output_type
= HTML
then begin
837 html_mods_table_header buf
"sourcesTable" "sources" [
838 ( Str
, "srh", "Connecting sources", "Connecting sources" );
839 ( Str
, "srh", "Next direct sources", "Next direct sources" );
840 ( Str
, "srh", "Next indirect sources", "Next indirect sources" ); ];
841 Printf.bprintf buf
"\\<tr class=\\\"dl-1\\\"\\>";
843 ("", "sr", (Printf.sprintf
"%d entries"
844 (Fifo.length
connecting_sources)) ^
845 (if !nconnected > 0 then
846 Printf.sprintf
" (connected: %d)" !nconnected else ""));
847 ("", "sr", Printf.sprintf
"%d entries"
848 (Fifo.length
next_direct_sources));
849 ("", "sr", Printf.sprintf
"%d entries"
850 (List.length
!next_indirect_sources)); ];
851 Printf.bprintf buf
"\\</tr\\>\\</table\\>\\</div\\>\n\\</div\\>"
854 Printf.bprintf buf
"Connecting Sources: %d entries"
855 (Fifo.length
connecting_sources);
856 if !nconnected > 0 then
857 Printf.bprintf buf
" (connected: %d)" !nconnected;
858 Printf.bprintf buf
"\n";
859 Printf.bprintf buf
"Next Direct Sources: %d entries\n"
860 (Fifo.length
next_direct_sources);
861 Printf.bprintf buf
"Next Indirect Sources: %d entries\n"
862 (List.length
!next_indirect_sources)
866 (*************************************************************************)
868 (* reschedule_source_for_file *)
870 (*************************************************************************)
872 let reschedule_source_for_file saved s r
=
873 if r
.request_queue
= outside_queue then
875 if r
.request_score = not_found_score then do_not_try_queue
876 else if s
.source_last_attempt
<> 0 then connecting_sources_queue
878 match s
.source_sock
with
879 | (NoConnection
| ConnectionWaiting
_) ->
881 (* Two things matter: the global score and the local score *)
882 if s
.source_score
< 1 then
883 (* 2.5.25, replaced expected_score by
884 found_score, so that sources which
885 only have the file are not put in
886 good_sources_queue, unless they have
887 an interesting chunk AND not a bad
889 if r
.request_score > found_score then
891 if r
.request_time
+ !!min_reask_delay
< last_time
() then
892 ready_saved_sources_queue
893 else waiting_saved_sources_queue
894 else if r
.request_score = initial_new_source_score then
896 else good_sources_queue
897 else if r
.request_score >= new_source_score then
899 else old_sources2_queue
900 else if s
.source_score
< 5 then old_sources3_queue
901 else do_not_try_queue
905 if r
.request_time
= 0 then busy_sources_queue
906 else connected_sources_queue
908 let m = r
.request_file
in
909 if !verbose_sources
> 1 then
910 lprintf_nl
"[cSrc] Put source %d in queue %s"
911 s
.source_num
queue_name.(queue);
912 Queue.put
m.manager_sources
.(queue) (r
.request_time
, s
);
913 if active_queue queue then
914 m.manager_active_sources
<- m.manager_active_sources
+ 1;
915 m.manager_all_sources
<- m.manager_all_sources
+ 1;
916 r
.request_queue
<- queue
918 (*************************************************************************)
920 (* iter_all_sources *)
922 (*************************************************************************)
924 let iter_all_sources f m =
926 Queue.iter
(fun (_, s
) -> f s
) q
929 (*************************************************************************)
930 (* iter_qualified_sources *)
931 (* Only these sources should be used in sourceexchage *)
932 (*************************************************************************)
933 let iter_qualified_sources f m =
934 let q = m.manager_sources
.(good_sources_queue) in
935 Queue.iter
(fun (_, s
) -> f s
) q
937 (*************************************************************************)
939 (* iter_active_sources *)
941 (*************************************************************************)
943 let iter_active_sources f m =
944 for i
= connected_sources_queue to busy_sources_queue do
945 let q = m.manager_sources
.(i
) in
946 Queue.iter
(fun (_, s
) -> f s
) q
949 (*************************************************************************)
951 (* iter_relevant_sources *)
953 (*************************************************************************)
954 let iter_relevant_sources f m =
957 let q = m.manager_sources
.(i
) in
958 Queue.iter
(fun (_, s
) -> f s
) q
961 (*************************************************************************)
963 (* set_source_brand *)
965 (*************************************************************************)
967 let set_source_brand s brand
=
968 s
.source_brand
<- brand
970 (*************************************************************************)
974 (*************************************************************************)
976 let source_brand s
= s
.source_brand
978 (*************************************************************************)
980 (* remove_from_queue *)
982 (*************************************************************************)
984 let remove_from_queue s r
=
985 if r
.request_queue
<> outside_queue then begin
986 if !verbose_sources
> 1 then
987 lprintf_nl
"[cSrc] Remove source %d from queue %s" s
.source_num
988 queue_name.(r
.request_queue
);
990 let m = r
.request_file
in
991 if active_queue r
.request_queue
then
992 m.manager_active_sources
<- m.manager_active_sources
- 1;
993 Queue.remove r
.request_file
.manager_sources
.(r
.request_queue
)
995 r
.request_queue
<- outside_queue;
996 m.manager_all_sources
<- m.manager_all_sources
- 1
999 (*************************************************************************)
1001 (* source_connecting *)
1003 (*************************************************************************)
1005 (* From state (1) to state (2) *)
1006 let source_connecting s
=
1007 s
.source_last_attempt
<- last_time
();
1008 Fifo.put
connecting_sources (s
.source_last_attempt
, s
);
1010 if r
.request_queue
<> outside_queue then begin
1011 remove_from_queue s r
;
1012 reschedule_source_for_file false s r
;
1017 (*************************************************************************)
1021 (*************************************************************************)
1023 let source_query s r
=
1024 remove_from_queue s r
;
1025 if r
.request_score > not_found_score then
1026 (* query_files will query all files for a source, check that we are
1027 really downloading! example source s has file f1 and file f2,
1028 file f2 is paused we connect because of f1 and then query both
1029 files f1 and f2 ... and yes, we do a cleanup ... but a timed one,
1030 so we can't be sure *)
1031 if r
.request_score > not_found_score &&
1032 file_state
(r
.request_file
.manager_file
()) = FileDownloading
1034 r
.request_time
<- 0; (* The source is ready for this request *)
1035 reschedule_source_for_file false s r
; (* put it in busy_sources_queue *)
1037 functions.function_query s
.source_uid r
.request_file
.manager_uid
1039 lprintf_nl
"[cSrc] Exception %s in functions.function_query" (Printexc2.to_string e
)
1043 (*************************************************************************)
1045 (* source_connected *)
1047 (*************************************************************************)
1049 (* From state (2) to state (3) *)
1050 let source_connected s
=
1051 s
.source_score
<- 0;
1052 s
.source_age
<- last_time
();
1053 s
.source_last_attempt
<- 0;
1055 (* lprintf "SOURCE> request: "; *)
1056 if r
.request_queue
<> outside_queue then begin
1057 (* lprintf "score %d/%d last query %s\n"
1058 r.request_score possible_score
1059 (if r.request_time = 0 then "never" else
1060 Printf.sprintf "%d secs"
1061 (last_time () - r.request_time)); *)
1062 remove_from_queue s r
;
1063 if r
.request_score > possible_score &&
1064 r
.request_time
+ !!min_reask_delay
< last_time
() then
1067 let m = r
.request_file
in
1068 functions.function_add_location s
.source_uid
1069 m.manager_uid s
.source_country_code
with _ -> ());
1070 reschedule_source_for_file false s r
1072 (* else lprintf "outside queue\n" *)
1075 (*************************************************************************)
1077 (* source_disconnected *)
1079 (*************************************************************************)
1081 (* From states (1) or (2) to state (3) *)
1082 let source_disconnected s
=
1083 (match s
.source_sock
with
1084 | NoConnection
-> ()
1085 | ConnectionWaiting token
->
1087 s
.source_sock
<- NoConnection
1088 | Connection sock
->
1089 close sock Closed_for_timeout
1091 let connecting = s
.source_last_attempt
<> 0 in
1092 (* source_last_attempt set to time, on connect_reply set
1093 to zero. if we never reached connect_reply, the ip is
1094 dead. Then we think we were *not* trying to connect
1097 s
.source_last_attempt
<- 0;
1099 if r
.request_queue
<> outside_queue then begin
1100 remove_from_queue s r
;
1101 if connecting then begin
1102 r
.request_time
<- last_time
();
1103 if r
.request_score = initial_new_source_score then
1104 set_score_part r
new_source_score
1107 if r
.request_time
= 0 then
1108 (* we think we were not connecting,
1109 but in some cases we were! and
1110 now we imidiately reconnect for
1111 that file, on a dead IP??
1112 r.request_time <- last_time () - 600;
1115 r
.request_time
<- last_time
();
1117 let m = r
.request_file
in
1118 functions.function_remove_location s
.source_uid
1122 reschedule_source_for_file false s r
;
1126 (*************************************************************************)
1128 (* connect_source *)
1130 (*************************************************************************)
1132 let connect_source s
=
1133 if !verbose_sources
> 1 then
1134 lprintf_nl
"[cSrc] connect_source";
1135 s
.source_score
<- s
.source_score
+ 1;
1136 functions.function_connect s
.source_uid s
.source_country_code
1138 (*************************************************************************)
1142 (*************************************************************************)
1144 let create_queues () =
1147 (* We should change this to 'oldest_last' to improve Queue.remove *)
1148 (* instead of lifo *)
1149 SourcesQueueCreate.oldest_last
();
1151 (* We should change this to 'oldest_first' to improve Queue.remove *)
1152 (* instead of fifo *)
1153 SourcesQueueCreate.oldest_first
();
1154 (* Ready saved sources *)
1155 SourcesQueueCreate.oldest_last
();
1156 (* Waiting saved sources *)
1157 SourcesQueueCreate.oldest_first
();
1159 (* We should change this to 'oldest_first' to improve Queue.remove *)
1160 (* instead of fifo *)
1161 SourcesQueueCreate.oldest_first
();
1162 SourcesQueueCreate.oldest_first
();
1163 SourcesQueueCreate.oldest_first
();
1165 SourcesQueueCreate.oldest_first
();
1166 (* Connected Sources *)
1167 SourcesQueueCreate.oldest_first
();
1168 (* Connecting Sources *)
1169 SourcesQueueCreate.oldest_first
();
1171 SourcesQueueCreate.oldest_first
();
1173 if Array.length
queues <> Array.length
queue_name then begin
1174 lprintf_nl
"[cSrc] Fatal error in CommonSources.create_queues";
1179 (*************************************************************************)
1181 (* create_file_sources_manager *)
1183 (*************************************************************************)
1185 let create_file_sources_manager file_uid
=
1187 manager_uid
= file_uid
;
1188 manager_file
= not_implemented "manager_file";
1189 manager_all_sources
= 0;
1190 manager_active_sources
= 0;
1191 manager_sources
= create_queues ();
1193 file_sources_managers := m :: !file_sources_managers;
1196 (*************************************************************************)
1198 (* remove_file_sources_manager *)
1200 (*************************************************************************)
1202 let remove_file_sources_manager m =
1203 iter_all_sources (fun s
->
1205 List.filter
(fun r
-> r
.request_file
!= m) s
.source_files
;
1207 m.manager_sources
<- create_queues ();
1208 file_sources_managers := List2.removeq
m !file_sources_managers
1211 (*************************************************************************)
1213 (* number_of_sources *)
1215 (*************************************************************************)
1216 (* get number of sources for a file*)
1217 let number_of_sources f =
1218 f.manager_all_sources
1220 (*************************************************************************)
1222 (* create_source_by_uid *)
1224 (*************************************************************************)
1226 let create_source_by_uid uid cc
=
1228 let finder = { dummy_source with source_uid
= uid
} in
1229 HS.find
sources_by_uid finder
1232 if !verbose_sources
> 1 then
1233 lprintf_nl
"[cSrc] Creating new source";
1234 let n = CommonClient.book_client_num
() in
1235 let s = { dummy_source with
1240 source_country_code
= cc
;
1242 HS.add
sources_by_uid s;
1243 H.add
sources_by_num s;
1246 (*************************************************************************)
1248 (* find_source_by_uid *)
1250 (*************************************************************************)
1252 let find_source_by_uid uid
=
1253 let finder = { dummy_source with source_uid
= uid
} in
1254 HS.find
sources_by_uid finder
1256 (*************************************************************************)
1258 (* find_source_by_num *)
1260 (*************************************************************************)
1262 let find_source_by_num num
=
1263 let finder = { dummy_source with source_num
= num
} in
1264 H.find
sources_by_num finder
1266 (*************************************************************************)
1270 (*************************************************************************)
1272 let rec iter_has_request rs file
=
1274 | [] -> raise Not_found
1276 if r
.request_file
== file
then r
1277 else iter_has_request tail file
1279 let find_request s file
=
1280 iter_has_request s.source_files file
1282 (*************************************************************************)
1284 (* find_request_result *)
1286 (*************************************************************************)
1288 let find_request_result s file
=
1289 let r = find_request s file
in
1290 let score = r.request_score in
1291 if score <= not_found_score then File_not_found
1292 else if score <= possible_score then File_possible
1293 else if score <= found_score then File_found
1294 else if score <= chunk_score then File_chunk
1295 else if score <= initial_new_source_score then File_new_source
1298 (*************************************************************************)
1302 (*************************************************************************)
1304 let check_time time
=
1305 if time
= 0 then last_time
() - 650
1306 else time
(* changed 2.5.24 *)
1308 let add_request s file time
=
1311 let r = find_request s file
in
1312 remove_from_queue s r;
1313 set_score_part r (if r.request_score = initial_new_source_score then
1315 else r.request_score - 1);
1316 r.request_time
<- check_time time
;
1320 request_file
= file
;
1321 request_time
= check_time time
;
1322 request_score = possible_score;
1323 request_queue
= outside_queue;
1325 s.source_files
<- r :: s.source_files
;
1327 reschedule_source_for_file false s r;
1330 (*************************************************************************)
1332 (* set_request_score *)
1334 (*************************************************************************)
1336 let rec set_request_score s file
score =
1338 let r = find_request s file
in
1340 (* If a request has been done in the last half-hour, and the source is
1341 announced as new, just forget it. : why half-hour? - trying min_reask_delay *)
1342 score = initial_new_source_score &&
1343 r.request_time
+ !!min_reask_delay
> last_time
()
1345 (* If a file has been paused, and resumed, it is flagged outside_queue / not_found_score in
1346 clean_sources, but really should be re-added to the queues as soon as possible (while retaining
1347 its request_time) or it is skipped for far too long (if it is even found again) - reschedule
1348 now puts new_source_score in old1 *)
1349 (score = initial_new_source_score &&
1350 r.request_queue
= outside_queue) then
1352 if score = initial_new_source_score
1353 then new_source_score
1355 if r.request_queue
< connected_sources_queue then
1356 remove_from_queue s r;
1357 set_score_part r score;
1358 reschedule_source_for_file false s r;
1361 request_file
= file
;
1362 request_time
= check_time 0;
1363 request_score = possible_score;
1364 request_queue
= outside_queue;
1366 set_score_part r score;
1367 s.source_files
<- r :: s.source_files
;
1368 reschedule_source_for_file false s r
1370 (*************************************************************************)
1372 (* set_request_result *)
1374 (*************************************************************************)
1376 let set_request_result s file result
=
1377 set_request_score s file
1379 | File_not_found
-> not_found_score
1380 | File_found
-> found_score
1381 | File_chunk
-> chunk_score
1382 | File_upload
-> upload_score
1383 | File_new_source
-> initial_new_source_score
1384 | File_possible
-> possible_score + 1)
1386 (*************************************************************************)
1388 (* source_to_value *)
1390 (*************************************************************************)
1392 let source_to_value s assocs
=
1393 let requests = ref [] in
1395 if r.request_score > possible_score then
1398 [once_value
(string_to_value
r.request_file
.manager_uid
);
1399 int_to_value
r.request_score;
1400 int_to_value
r.request_time
]
1403 if !requests = [] then raise Exit
;
1405 ("sscore", int_to_value
s.source_score
) ::
1406 ("addr", M.source_uid_to_value
s.source_uid
) ::
1407 ("brand", M.source_brand_to_value
s.source_brand ) ::
1408 ("files", smalllist_to_value
(fun s -> s)
1410 ("age", int_to_value
s.source_age
) ::
1415 (*************************************************************************)
1419 (*************************************************************************)
1421 let query_file s file
=
1422 if file_state
(file
.manager_file
()) = FileDownloading
then
1423 let r = find_request s file
in
1424 if r.request_time
+ !!min_reask_delay
<= last_time
() then
1426 (* There is really no need to query a not found source again
1427 for the file ... not even after an hour! *)
1428 if r.request_score > not_found_score then
1432 (*************************************************************************)
1436 (*************************************************************************)
1437 (* Query a source for all of its known files*)
1440 query_file s f.request_file
;
1444 (*************************************************************************)
1446 (* add_saved_source_request *)
1448 (*************************************************************************)
1450 let add_saved_source_request s uid
score time
=
1451 if !verbose_sources
> 1 then
1452 lprintf_nl
"[cSrc] Request %s %d %d" uid
score time
;
1455 functions.function_string_to_manager uid
1457 if !verbose_sources
> 0 then
1458 lprintf_nl
"[cSrc] CommonSources: add_saved_source_request -> %s not found" uid
;
1461 let r = add_request s file time
in
1462 set_score_part r score;
1463 reschedule_source_for_file true s r;
1464 if !verbose_sources
> 1 then
1465 lprintf_nl
"[cSrc] Put saved source %d in queue %s" s.source_num
1466 queue_name.(r.request_queue
)
1468 (*************************************************************************)
1470 (* value_to_source *)
1472 (*************************************************************************)
1474 let value_to_source assocs
=
1475 (* lprintf "(1) value_to_source\n"; *)
1476 let get_value name conv
= conv
(List.assoc
name assocs
) in
1478 let addr = get_value "addr" M.value_to_source_uid
in
1479 let files = get_value "files"
1480 (value_to_list
(fun s -> s)) in
1483 try get_value "age" value_to_int
with _ -> 0 in
1485 let score = try get_value "sscore" value_to_int
with _ -> 0 in
1486 let brand = try get_value "brand" M.value_to_source_brand
with _ ->
1487 M.dummy_source_brand
in
1489 if !verbose_sources
> 1 then
1490 lprintf_nl
"[cSrc] New source from value";
1491 let s = create_source_by_uid addr None
in
1492 s.source_score
<- score;
1493 s.source_age
<- last_conn;
1494 s.source_brand <- brand;
1496 (* lprintf "(2) value_to_source \n"; *)
1500 | OnceValue v
-> iter v
1501 | List
[uid
; score; time
] | SmallList
[uid
; score; time
] ->
1503 let uid = value_to_string
uid in
1504 let score = value_to_int
score in
1505 let time = value_to_int
time in
1507 (* added in 2.5.27 to fix a bug introduced in 2.5.25 *)
1509 if score land 0xffff = 0 then score asr 16 else score in
1511 (* lprintf "(3) value_to_source \n"; *)
1513 add_saved_source_request s uid score time
1516 if !verbose_sources
> 1 then
1517 lprintf_nl
"[cSrc] CommonSources.value_to_source: exception %s in iter request"
1518 (Printexc2.to_string e
))
1520 | (StringValue
_) as uid ->
1522 let uid = value_to_string
uid in
1523 (* lprintf "(4) value_to_source \n"; *)
1527 add_saved_source_request s uid score time
1530 if !verbose_sources
> 1 then
1531 lprintf_nl
"[cSrc] CommonSources.value_to_source: exception %s in iter request"
1532 (Printexc2.to_string e
))
1537 (* lprintf "(5) value_to_source \n"; *)
1538 List.iter iter files;
1539 (* lprintf "(6) value_to_source \n"; *)
1540 raise SideEffectOption
1542 (*************************************************************************)
1544 (* refill_sources *)
1546 (*************************************************************************)
1548 let refill_sources () =
1550 (* Wait for 9 seconds before refilling, since we put at least 10 seconds
1551 of clients in the previous bucket.
1553 wrong assumption for me :
1554 we may have failed to fill the queue with what was available
1555 if !last_refill + 8 < last_time () then
1558 last_refill := last_time
();
1559 if !verbose_sources
> 0 then begin
1560 lprintf_nl
"[cSrc] CommonSources.refill_sources BEFORE:";
1561 let buf = Buffer.create
100 in
1563 lprintf
"%s\n" (Buffer.contents
buf);
1567 how much consecutive sources in the queue a file can have
1568 source_f1|source_f1|source_f1|source_f2...
1569 <- - - - - - - 3 - - - - - ->
1570 10 for finer priority scaling
1572 let max_consecutive = 10 in
1575 get at most nsources direct sources from a file
1576 return number of sources found,new queue position
1578 let rec get_sources nsource
m queue took
=
1579 (* do_not_try == avoid source bounceback, i.e. a dustbin *)
1580 if queue >= do_not_try_queue || nsource
<= 0 then
1581 (* we tried all queue or found enough sources, good bye!*)
1584 let q = m.manager_sources
.(queue) in
1585 if Queue.length
q > 0 then
1586 let (request_time
, s) = Queue.head
q in
1587 let throttled = queue_period.(queue) > 0 && nsource
> 1 in
1588 let throttle_delay = get_throttle_delay m queue throttled in
1589 if request_time
+ !!min_reask_delay
+ throttle_delay < last_time
() then begin
1590 if !verbose_sources
> 1 then
1591 lprintf_nl
"[cSrc] Sources: take source from Queue[%s] for %s"
1593 (file_best_name
(m.manager_file
()));
1594 (* put in the connecting queue*)
1595 source_connecting s;
1596 if M.direct_source
s.source_uid
then begin
1597 Fifo.put
next_direct_sources s;
1598 (* we found a direct source try again in the _same_ queue *)
1599 get_sources (nsource
-1) m queue (took
+1)
1602 next_indirect_sources := s :: !next_indirect_sources;
1603 (* we found an indirect source try again in the _same_
1604 queue. indirect sources are "for free". *)
1605 get_sources nsource
m queue took
1609 if !verbose_sources
> 1 then
1610 lprintf_nl
"[cSrc] Source of queue %s is not ready for %s"
1611 queue_name.(queue) (file_best_name
(m.manager_file
()));
1612 (* too early to take sources in this queue try again in the _next_ queue*)
1613 if queue_period.(queue) = 0 then
1614 (* queue not throttled, try next queue *)
1616 (* a maximum of just one source from old3 queue *)
1617 if queue+1 >= old_sources3_queue then min
1 nsource
1619 get_sources to_take m (queue+1) took
1621 (* throttled queue, and no ready sources ... *)
1623 (* nsource = 1 not even a ready source without throttle-delay *)
1624 get_sources 0 m (queue) took
1627 (* finaly try to take at least one source, regardless of throttles *)
1628 get_sources 1 m (queue) took
1631 if !verbose_sources
> 1 then
1632 lprintf_nl
"[cSrc] Queue %s is empty for %s"
1633 queue_name.(queue) (file_best_name
(m.manager_file
()));
1634 (* no sources in this queue try again in the _next_ queue *)
1636 (* a maximum of just one source from old3 queue *)
1637 if queue+1 >= old_sources3_queue then min
1 nsource
1639 get_sources to_take m (queue+1) took
1642 (* recalc list if there's no new file*)
1643 (* Fill only with sources from files being downloaded *)
1645 let nfiles = ref 0 in
1646 let files = ref [] in
1647 let min_priority = ref 0 in
1648 let sum_priority = ref 0 in
1650 match file_state
(m.manager_file
()) with
1651 | FileDownloading
->
1652 let priority = file_priority
(m.manager_file
()) in
1653 min_priority := min
!min_priority priority;
1654 sum_priority := !sum_priority + priority;
1655 files := (priority, m ) :: !files;
1657 | _ -> () ) !file_sources_managers;
1659 if !files <> [] then begin
1661 (* 'normalize' to 0 priorities*)
1662 sum_priority := !sum_priority + (!nfiles * (-(!min_priority)));
1663 (* update priorities to be > 0 *)
1664 files := List.map
(fun (p
, f) ->
1665 let np = p
- (!min_priority) in
1666 if np = 0 then begin
1670 else (np, f) ) !files;
1672 (*sort by highest priority*)
1673 files := List.sort
(fun (p1
,_) (p2
,_) -> compare p2 p1
) !files;
1675 (* calc sources queue size
1676 at least 3 sources per file*)
1677 let nsources = max
(!nfiles * 3)
1678 (functions.function_max_connections_per_second
() * 10) in
1680 (* calc how much sources a file can get according to its priority*)
1681 let sources_per_prio =
1682 (float_of_int
nsources) /. (float_of_int
!sum_priority) in
1686 iter through files to queue sources
1687 flist_todo : next files to test
1688 assigned : number of sources already queued
1689 looped : number of times we allow to loop try to fill queue of sources
1690 (how hard we try to fill queue)
1692 let rec iter_files assigned looped
=
1694 (* throw in new sources at high pace and do not care
1695 about them in get_sources, this avoids "locking" a
1696 file's queue sources with thousands of new sources
1698 let try_some_new_sources () =
1701 let f = m.manager_file
() in
1702 let q = m.manager_sources
.(new_sources_queue) in
1703 if file_state
f = FileDownloading
&& Queue.length
q > 0 then
1704 let (request_time
, s) = Queue.head
q in
1705 source_connecting s;
1706 if M.direct_source
s.source_uid
then begin
1708 Fifo.put
next_direct_sources s
1711 next_indirect_sources := s :: !next_indirect_sources
1712 ) !file_sources_managers;
1715 let cleanup_some_old_sources () =
1716 (* Cleanup some sources *)
1718 let f = m.manager_file
() in
1719 if file_state
f = FileDownloading
then
1720 let remove_old q t
=
1721 if Queue.length
q > 0 then
1722 let (request_time
, s) = Queue.head
q in
1723 if request_time
+ t
< last_time
() then
1724 remove_from_queue s (find_request s m) in
1726 remove_old m.manager_sources
.(do_not_try_queue) 14400;
1727 remove_old m.manager_sources
.(old_sources3_queue) 2400;
1728 remove_old m.manager_sources
.(old_sources2_queue) 1200
1729 ) !file_sources_managers in
1731 let rec aux flist_todo assigned
=
1732 if assigned
>= nsources then cleanup_some_old_sources ()
1734 match flist_todo
with
1735 | (prio
, file) :: t
->
1737 min
(truncate
(sources_per_prio *. (float_of_int prio
)))
1739 let to_take = max
tt 1 in
1740 (* allow at least one source per file :
1741 we will overflow a bit the expected next_direct_sources length
1742 but it's for the good cause : not 'starving' some files
1744 let took = get_sources to_take file good_sources_queue 0 in
1745 aux t
(assigned
+ took)
1748 cleanup_some_old_sources ();
1750 (* more power to the "runaway" (most overloaded) file, pick extra sources *)
1752 let q = find_throttled_queue good_sources_queue in
1753 if queue_period.(q) > 0 then
1754 let max_overloaded =
1755 List.hd
(find_max_overloaded q !file_sources_managers) in
1757 count_file_ready_sources max_overloaded q true in
1758 if overhead > 0 then
1759 get_sources max_consecutive max_overloaded good_sources_queue 0
1764 (* allow at most looped re-iter of list to not
1766 iter_files (assigned
+ em) (looped
- 1)
1768 let extr = try_some_new_sources () in
1769 aux !files (assigned
+ extr)
1774 (* adjust queue throttling *)
1775 let all_ready = ref 0 in
1777 let queue_throttled_ready = count_ready_sources q true in
1778 let queue_ready = count_ready_sources q false in
1779 all_ready := !all_ready + queue_throttled_ready;
1780 if !all_ready > nsources && queue_throttled_ready > 0 then begin
1781 (* no need, to increase period on a queue without ready sources *)
1782 (* lprintf "commonSources: increasing queue throttling for (ar=%d rc=%d qr=%d) %s\n" !allReady nsources queueReady queue_name.(q); *)
1783 queue_period.( q ) <- queue_period.( q ) + 1
1786 if queue_ready = 0 then begin
1787 (* lprintf "commonSources: resetting queue throttling to 0 (ar=%d rc=%d qr=%d) %s\n" !allReady nsources queueReady queue_name.(q); *)
1788 queue_period.( q ) <- 0
1791 (* lprintf "commonSources: decreasing queue throttling for (ar=%d rc=%d qr=%d) %s\n" !allReady nsources queueReady queue_name.(q); *)
1792 queue_period.( q ) <- max
0 (queue_period.( q ) - 1)
1795 ) [ good_sources_queue; old_sources1_queue; old_sources2_queue; old_sources3_queue ];
1799 if !verbose_sources
> 0 then begin
1800 lprintf_nl
"[cSrc] CommonSources.refill_sources AFTER:";
1801 let buf = Buffer.create
100 in
1803 lprintf
"%s\n" (Buffer.contents
buf);
1806 lprintf_nl
"[cSrc] Exception %s in refill_sources"
1807 (Printexc2.to_string e
)
1810 (*************************************************************************)
1812 (* clean_sources helper *)
1814 (*************************************************************************)
1815 let put_all_outside_queue m q queue =
1816 let _, s = Queue.take
q in
1817 m.manager_all_sources
<- m.manager_all_sources
- 1;
1818 if active_queue queue then
1819 m.manager_active_sources
<- m.manager_active_sources
- 1;
1821 if r.request_file
== m then begin
1822 r.request_queue
<- outside_queue;
1823 set_score_part r not_found_score
1827 (*************************************************************************)
1831 (*************************************************************************)
1833 let clean_sources () =
1834 (* Maybe this should be dependant on the file (priority, state,...) ? *)
1835 let max_sources_per_file = functions.function_max_sources_per_file
() in
1837 match file_state
(m.manager_file
()) with
1838 | FileDownloading
->
1839 let nsources = m.manager_all_sources
in
1840 if nsources > max_sources_per_file then
1841 let rec iter nsources q queue =
1842 if nsources > 0 then
1843 if Queue.length
q > 0 &&
1844 queue <> good_sources_queue then begin
1845 put_all_outside_queue m q queue;
1846 iter (nsources-1) q queue
1849 let do_iter q = iter nsources m.manager_sources
.(q) q in
1851 if queue = old_sources1_queue then do_iter do_not_try_queue
1852 else if queue = do_not_try_queue then do_iter new_sources_queue
1853 else if queue = new_sources_queue then do_iter waiting_saved_sources_queue
1854 else if queue > good_sources_queue then do_iter (queue-1)
1857 iter (nsources - max_sources_per_file) (m.manager_sources
.(old_sources3_queue)) old_sources3_queue
1860 let rec iter q queue =
1861 if Queue.length
q > 0 then begin
1862 put_all_outside_queue m q queue;
1867 iter m.manager_sources
.(queue-1) (queue-1)
1869 iter (m.manager_sources
.(do_not_try_queue)) do_not_try_queue
1870 ) !file_sources_managers
1872 (*************************************************************************)
1874 (* connect_sources *)
1876 (*************************************************************************)
1878 let connect_sources connection_manager
=
1880 if !verbose_sources
> 1 then
1881 lprintf_nl
"[cSrc] connect_sources";
1882 (* After 2 minutes, consider that connections attempted should be revoked. *)
1884 if !verbose_sources
> 1 then
1885 lprintf_nl
"[cSrc] revoke connecting sources...";
1887 if not
(Fifo.empty
connecting_sources) then
1888 let (time, s) = Fifo.head
connecting_sources in
1889 if time <> s.source_last_attempt
then begin
1890 ignore
(Fifo.take
connecting_sources);
1893 else if time + 120 < last_time
() then begin
1894 ignore
(Fifo.take
connecting_sources);
1895 if s.source_last_attempt
<> 0 then source_disconnected s;
1901 (* First, require !!max_connections_per_second sources to connect to us.
1902 The probability is very high they won't be able to connect to us. *)
1904 if !verbose_sources
> 1 then
1905 lprintf_nl
"[cSrc] connect indirect sources...";
1906 let (first_sources
, last_sources
) =
1907 List2.cut
!!max_connections_per_second
!next_indirect_sources in
1908 next_indirect_sources := last_sources
;
1910 ignore
(connect_source s)) first_sources
;
1912 (* Second, for every file being downloaded, query sources that are already
1913 connected if needed *)
1914 if !verbose_sources
> 1 then
1915 lprintf_nl
"[cSrc] query connected sources...";
1917 match file_state
(m.manager_file
()) with
1918 | FileDownloading
->
1919 let q = m.manager_sources
.(connected_sources_queue) in
1921 if Queue.length
q > 0 then
1922 let (time, s) = Queue.head
q in
1923 if time + !!min_reask_delay
< last_time
() then begin
1925 let r = find_request s m in
1926 (* lprintf "commonSources: connect_sources: second place for source_query !?\n"; *)
1927 (* isn't that here pretty useless? *)
1929 (* After this step, the source is
1930 either in 'busy_sources_queue',
1931 if for some reason, the request
1932 could not be sent, or in
1933 'connected_sources_queue' at the
1934 tail if the request could be sent.
1935 This seems thus safe.
1941 ) !file_sources_managers;
1943 if !verbose_sources
> 1 then
1944 lprintf_nl
"[cSrc] connect to sources...";
1945 (* Finally, connect to available sources *)
1947 let max_sources = functions.function_max_connections_per_second
() in
1948 if !verbose_sources
> 1 then
1949 lprintf_nl
"[cSrc] max_sources: %d" max_sources;
1950 let rec iter nsources refilled
=
1951 if nsources > 0 && can_open_connection connection_manager
then
1952 if Fifo.length
next_direct_sources > 0 then
1953 let s = Fifo.take
next_direct_sources in
1956 match s.source_sock
with
1958 if !verbose_sources
> 1 then
1959 lprintf_nl
"[cSrc] not connected"; nsources
1960 | _ -> nsources - 1 in
1961 iter nsources refilled
1962 else if not refilled
then begin
1966 iter max_sources false;
1967 if !verbose_sources
> 1 then
1968 lprintf_nl
"[cSrc] done connect_sources";
1973 (*************************************************************************)
1975 (* attach_sources_to_file *)
1977 (*************************************************************************)
1979 let value_to_module f v
=
1981 | Module list
-> f list
1982 | _ -> failwith
"Option should be a module"
1984 let option = define_option_class
"Source"
1986 (* lprintf "(n) source !!\n"; *)
1987 value_to_module value_to_source v
)
1988 (fun s -> Module
(source_to_value s []))
1990 let file_sources_option = ref None
1992 let attach_sources_to_file section
=
1993 (* lprintf "attach_sources_to_file\n"; *)
1994 let sources = match !file_sources_option with
1996 (* lprintf "attaching sources this time\n"; *)
1997 let sources = define_option section
1998 ["sources"] "" (listiter_option
option) [] in
1999 (* lprintf "done\n"; *)
2000 file_sources_option := Some
sources;
2002 | Some
sources -> sources in
2004 HS.iter (fun s -> sources =:= s :: !!sources) sources_by_uid;
2006 (fun _ -> sources =:= [])
2009 (*************************************************************************)
2013 (*************************************************************************)
2016 Heap.add_memstat
M.module_name
(fun level
buf ->
2018 let nsources_per_queue = Array.make
nqueues 0 in
2019 let nready_per_queue = Array.make
nqueues 0 in
2021 for i
= 0 to nqueues -1 do
2022 let q = m.manager_sources
.(i
) in
2023 let nready = ref 0 in
2024 let nsources = ref 0 in
2025 let ready_threshold = last_time
() - !!min_reask_delay
in
2026 Queue.iter (fun (time, s) ->
2028 if time < ready_threshold then incr
nready
2029 else if i
= new_sources_queue then begin
2030 Printf.bprintf
buf "ERROR: Source is not ready in new_sources_queue !\n";
2034 nsources_per_queue.(i
) <- nsources_per_queue.(i
) + !nsources;
2035 nready_per_queue.(i
) <- nready_per_queue.(i
) + !nready;
2037 ) !file_sources_managers;
2039 Printf.bprintf
buf "\nFor all managers (%d):\n" (List.length
!file_sources_managers);
2040 for i
= 0 to nqueues - 1 do
2041 Printf.bprintf
buf " Queue[%s]: %d entries (%d ready)\n"
2042 queue_name.(i
) nsources_per_queue.(i
) nready_per_queue.(i
);
2045 let nsources = ref 0 in
2046 HS.iter (fun _ -> incr
nsources) sources_by_uid;
2047 Printf.bprintf
buf "Sources by UID table: %d entries\n" !nsources;
2048 let a1, a2
, a3
, a4
, a5
, a6
= HS.stats
sources_by_uid in
2049 Printf.bprintf
buf "Sources by UID table stats: %d %d %d %d %d %d\n"
2053 H.iter (fun _ -> incr
nsources) sources_by_num;
2054 Printf.bprintf
buf "Sources by NUM table: %d entries\n" !nsources;
2055 let a1, a2
, a3
, a4
, a5
, a6
= H.stats
sources_by_num in
2056 Printf.bprintf
buf "Sources by NUM table stats: %d %d %d %d %d %d\n"
2059 Printf.bprintf
buf "Used indirect connections: %d\n"
2060 !indirect_connections;
2062 let nconnected = ref 0 in
2063 Fifo.iter (fun (_, s) ->
2064 if s.source_last_attempt
= 0 then incr
nconnected;
2065 ) connecting_sources;
2066 Printf.bprintf
buf "Connecting Sources: %d entries"
2067 (Fifo.length
connecting_sources);
2068 if !nconnected > 0 then
2069 Printf.bprintf
buf " (connected: %d)" !nconnected;
2070 Printf.bprintf
buf "\n";
2072 Printf.bprintf
buf "Next Direct Sources: %d entries\n"
2073 (Fifo.length
next_direct_sources);
2075 Printf.bprintf
buf "Next Indirect Sources: %d entries\n"
2076 (List.length
!next_indirect_sources)