2 * Copyright (c) Facebook, Inc. and its affiliates.
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the "hack" directory of this source tree.
9 module Hack_bucket
= Bucket
11 module Bucket
= Hack_bucket
12 open Typing_service_types
17 The type checking service receives a list of files and their symbols as input and
18 distributes the work to worker processes.
20 A worker process's input is a subset of files. At the end of its work,
21 it returns back to master the following progress report:
22 - completed: a list of computations it completed
23 - remaining: a list of computations which it didn't end up performing. For example,
24 it may not have been able to get through the whole list of computations because
25 it exceeded its worker memory cap
26 - deferred: a list of computations which were discovered to be necessary to be
27 performed before some of the computations in the input subset
29 The deferred list needs further explanation, so read below.
33 Here's how we actually plumb through the workitems and their individual results, via MultiWorker:
34 * We have mutable state [ref files_to_process], a list of filenames still to be processed
35 for the entire typecheck.
36 * Datatype [progress] represents an input batch of work for a worker to do,
37 and also an output indication of how it dispatched that batch
38 * Datatype [typing_result] represents the accumulation of telemetry, errors, deps results from workers.
39 Each individual worker is given an empty [typing_result] as input,
40 and merges its own results into that empty input to give a per-worker output,
41 and then we have a separate accumulator and we merge each per-worker's output
42 into that accumulator.
43 * next : () -> progress
44 This mutates [files_to_process] by removing a bucket of filenames,
45 and returns a degenerate progress {remaining=bucket; completed=[]; deferred=[]}
46 which is basically just the bucket of work to be done by the job.
47 * neutral : typing_result
48 This value is just the empty typing_result {errors=Empty; deps=Empty; telemetry=Empty}
49 * job : typing_result -> progress -> (typing_result', progress')
50 MultiWorker will invoke this job. For input,
51 it provides a copy of the degenerate [typing_result] that it got from [neutral], and
52 it provides the degenerate [progress] i.e. just the bucket of work that it got from [next]
53 The behavior of our job is to take items out of progress.remaining and typecheck them.
54 It repeats this process until it either runs out of items or its heap grows too big.
55 It returns a new progress' {remaining'; completed; deferred} to show the items still
56 remaining, the ones it completed, and the ones it had to defer (see below).
57 It returns a new typing_result' {errors; deps; telemetry} by merging its own
58 newly discovered errors, deps, telemetry onto the (degenerate i.e. empty)
59 typing_result it was given as input.
60 * merge : (typing_result * progress) (accumulator : typing_result) -> typing_result
61 The initial value of accumulator is the same [neutral] that was given to each job.
62 After each job, MultiWorker calls [merge] to merge the results of that job
64 Our merge function looks at the progress {remaining;completed;deferred} that
65 came out of the job, and mutates [files_to_process] by sticking back "remaining+deferred" into it.
66 It then merges the typing_result {errors;deps;telemetry} that came out of the job
67 with those in its accumulator.
69 The type signatures for MultiWorker look like they'd allow a variety of implementations,
70 e.g. having just a single accumulator that starts at "neutral" and feeds one by one into
71 each job. But we don't do that.
75 The normal computation kind for the type checking service is to type check all symbols in a file.
76 The type checker would always be able to find a given symbol it needs to perform this job
77 if all the declarations were computed for all the files in the repo before type checking begins.
79 However, such eager declaration does not occur in the following scenarios:
80 - When the server initialization strategy is Lazy: not having a declaration phase was
81 the original meaning of lazy init
82 - If the server initialized from a saved state: most decls would not be available because
83 saved states only consist of one or more of the following: naming table, dependency
84 graph, errors, and hot decls (a small subset of all possible decls)
86 The decl heap module, when asked for a decl that is not yet in memory, can declare the decl's
87 file lazily. Declaring a file means parsing the file and extracting the symbols declared in it,
88 hence the term 'decl'.
90 This lazy declaration strategy works reasonably well for most cases, but there's a case where
91 it works poorly: if a particular file that is getting type checked requires a large number
92 of symbols which have not yet been declared, the unlucky worker ends up serially declaring
93 all the files that contain those symbols. In some cases, we observe type checking times on
94 the order of minutes for a single file.
96 Therefore, to account for the fact that we don't have the overall eager declaration phase
97 in some initialization scenarios, the decl heap module is now capable of refusing to declare
98 a file and instead adding the file to a list of deferments.
100 The type checker worker is then able to return that list and the file which was being type
101 checked when deferments occurred back to the master process. The master process is then
102 able to distribute the declaration of the files and the (re)checking of the original
103 file to all of its workers, thus achieving parallelism and better type checking times overall.
105 The deferment of declarations is adjustable: it works by using a threshold value that dictates
106 the maximum number of lazy declarations allowed per file. If the threshold is not set, then
107 there is no limit, and no declarations would be deferred. If the threshold is set at 1,
108 then all declarations would be deferred.
110 The idea behind deferring declarations is similar to the 2nd of the 4 build systems considered in
111 the following paper (Make, Excel's calc engine, Shake, and Bazel):
112 https://www.microsoft.com/en-us/research/uploads/prod/2018/03/build-systems-final.pdf
114 The paper refers to this approach as "restarting", and further suggests that recording and reusing
115 the chain of jobs could be used to minimize the number of restarts.
118 module Delegate
= Typing_service_delegate
120 type progress
= job_progress
122 let neutral : unit -> typing_result
=
125 errors
= Errors.empty
;
126 dep_edges
= Typing_deps.dep_edges_make
();
127 telemetry
= Telemetry.create
();
128 jobs_finished_to_end
= Measure.create
();
129 jobs_finished_early
= Measure.create
();
132 (*****************************************************************************)
133 (* The job that will be run on the workers *)
134 (*****************************************************************************)
136 let handle_exn_as_error : type res
. Pos.t
-> (unit -> res
option) -> res
option
140 | WorkerCancel.Worker_should_exit
as e
->
141 (* Cancellation requests must be re-raised *)
144 Errors.exception_occurred pos
(Exception.wrap e
);
147 let type_fun (ctx
: Provider_context.t
) (fn
: Relative_path.t
) (x
: string) :
148 (Tast.def
* Typing_inference_env.t_global_with_pos
) option =
149 match Ast_provider.find_fun_in_file ~full
:true ctx fn x
with
151 handle_exn_as_error f
.Aast.f_span
(fun () ->
152 let fun_ = Naming.fun_ ctx f
in
153 Nast_check.def ctx
(Aast.Fun
fun_);
155 Typing_toplevel.fun_def ctx
fun_
156 |> Option.map ~f
:(fun (f
, global_tvenv
) -> (Aast.Fun f
, global_tvenv
))
158 Option.iter
def_opt (fun (f
, _
) -> Tast_check.def ctx f
);
162 let type_class (ctx
: Provider_context.t
) (fn
: Relative_path.t
) (x
: string) :
163 (Tast.def
* Typing_inference_env.t_global_with_pos list
) option =
164 match Ast_provider.find_class_in_file ~full
:true ctx fn x
with
166 handle_exn_as_error cls
.Aast.c_span
(fun () ->
167 let class_ = Naming.class_ ctx cls
in
168 Nast_check.def ctx
(Aast.Class
class_);
170 Typing_toplevel.class_def ctx
class_
171 |> Option.map ~f
:(fun (c
, global_tvenv
) ->
172 (Aast.Class c
, global_tvenv
))
174 Option.iter
def_opt (fun (f
, _
) -> Tast_check.def ctx f
);
179 (ctx
: Provider_context.t
) (fn
: Relative_path.t
) (x
: string) :
181 match Ast_provider.find_record_def_in_file ~full
:true ctx fn x
with
183 handle_exn_as_error rd
.Aast.rd_span
(fun () ->
184 let rd = Naming.record_def ctx
rd in
185 Nast_check.def ctx
(Aast.RecordDef
rd);
187 let def = Aast.RecordDef
(Typing_toplevel.record_def_def ctx
rd) in
188 Tast_check.def ctx
def;
192 let check_typedef (ctx
: Provider_context.t
) (fn
: Relative_path.t
) (x
: string)
194 match Ast_provider.find_typedef_in_file ~full
:true ctx fn x
with
196 handle_exn_as_error Pos.none
(fun () ->
197 let typedef = Naming.typedef ctx t
in
198 Nast_check.def ctx
(Aast.Typedef
typedef);
199 let ret = Typing.typedef_def ctx
typedef in
200 let def = Aast.Typedef
ret in
201 Tast_check.def ctx
def;
205 let check_const (ctx
: Provider_context.t
) (fn
: Relative_path.t
) (x
: string) :
207 match Ast_provider.find_gconst_in_file ~full
:true ctx fn x
with
210 handle_exn_as_error cst
.Aast.cst_span
(fun () ->
211 let cst = Naming.global_const ctx
cst in
212 Nast_check.def ctx
(Aast.Constant
cst);
213 let def = Aast.Constant
(Typing_toplevel.gconst_def ctx
cst) in
214 Tast_check.def ctx
def;
217 let should_enable_deferring
218 (opts
: GlobalOptions.t
) (file
: check_file_computation
) =
219 match GlobalOptions.tco_max_times_to_defer_type_checking opts
with
220 | Some max_times
when file
.deferred_count
>= max_times
-> false
223 type process_file_results
= {
225 deferred_decls
: Deferred_decl.deferment list
;
229 (dynamic_view_files
: Relative_path.Set.t
)
230 (ctx
: Provider_context.t
)
232 (file
: check_file_computation
) : process_file_results
=
233 let fn = file
.path
in
234 let (errors'
, ast
) = Ast_provider.get_ast_with_error ~full
:true ctx
fn in
235 if not
(Errors.is_empty errors'
) then
236 { errors
= Errors.merge errors' errors
; deferred_decls
= [] }
240 (Provider_context.get_tcopt ctx
) with
241 GlobalOptions.tco_dynamic_view
=
242 Relative_path.Set.mem dynamic_view_files
fn;
245 let (funs
, classes
, record_defs
, typedefs
, gconsts
) = Nast.get_defs ast
in
246 let ctx = Provider_context.map_tcopt
ctx ~f
:(fun _tcopt
-> opts) in
247 let ignore_type_record_def opts fn name
=
248 ignore
(type_record_def opts fn name
)
250 let ignore_check_typedef opts fn name
=
251 ignore
(check_typedef opts fn name
)
253 let ignore_check_const opts fn name
= ignore
(check_const opts fn name
) in
256 Deferred_decl.with_deferred_decls
257 ~enable
:(should_enable_deferring opts file
)
258 ~declaration_threshold_opt
:
259 (GlobalOptions.tco_defer_class_declaration_threshold
opts)
260 ~memory_mb_threshold_opt
:
261 (GlobalOptions.tco_defer_class_memory_mb_threshold
opts)
263 Errors.do_with_context
fn Errors.Typing
@@ fun () ->
264 let (fun_tasts
, fun_global_tvenvs
) =
266 |> List.filter_map ~f
:(type_fun ctx fn)
269 let (class_tasts
, class_global_tvenvs
) =
270 List.map classes ~f
:snd
271 |> List.filter_map ~f
:(type_class ctx fn)
274 let class_global_tvenvs = List.concat
class_global_tvenvs in
275 List.map record_defs ~f
:snd
276 |> List.iter ~f
:(ignore_type_record_def ctx fn);
277 List.map typedefs ~f
:snd
|> List.iter ~f
:(ignore_check_typedef ctx fn);
278 List.map gconsts ~f
:snd
|> List.iter ~f
:(ignore_check_const ctx fn);
279 (fun_tasts
@ class_tasts
, fun_global_tvenvs
@ class_global_tvenvs)
282 | Ok
(errors'
, (tasts
, global_tvenvs
)) ->
283 if GlobalOptions.tco_global_inference
opts then
284 Typing_global_inference.StateSubConstraintGraphs.build_and_save
288 { errors
= Errors.merge errors' errors
; deferred_decls
= [] }
289 | Error deferred_decls
-> { errors
; deferred_decls
}
291 | WorkerCancel.Worker_should_exit
as e
->
292 (* Cancellation requests must be re-raised *)
295 let stack = Caml.Printexc.get_raw_backtrace
() in
297 prerr_endline
("Exception on file " ^
Relative_path.S.to_string
fn)
299 Caml.Printexc.raise_with_backtrace e
stack
301 let get_mem_telemetry () : Telemetry.t
option =
302 if SharedMem.hh_log_level
() > 0 then
304 ( Telemetry.create
()
305 |> Telemetry.object_ ~key
:"gc" ~
value:(Telemetry.quick_gc_stat
())
306 |> Telemetry.object_ ~key
:"shmem" ~
value:(SharedMem.get_telemetry
()) )
311 ~
(check_info
: check_info
)
312 ~
(start_counters
: Counters.time_in_sec
* Telemetry.t
)
313 ~
(end_counters
: Counters.time_in_sec
* Telemetry.t
)
314 ~
(second_run_end_counters
: (Counters.time_in_sec
* Telemetry.t
) option)
315 ~
(file
: check_file_computation
)
316 ~
(result : process_file_results
) : unit =
317 let (start_time
, start_counters
) = start_counters
in
318 let (end_time
, end_counters
) = end_counters
in
319 let duration = end_time
-. start_time
in
320 let duration_second_run =
321 Option.map second_run_end_counters ~f
:(fun (time
, _
) -> time
-. end_time
)
323 let deciding_time = Option.value duration_second_run ~default
:duration in
324 (* "deciding_time" is what we compare against the threshold, to see if we should log. *)
325 (* We'll also log if it had been previously deferred, or if it's being deferred right now. *)
327 Float.(deciding_time >= check_info
.profile_type_check_duration_threshold
)
328 || file
.deferred_count
> 0
329 || not
(List.is_empty
result.deferred_decls
)
331 if should_log then begin
332 let profile = Telemetry.diff ~all
:false ~prev
:start_counters end_counters
in
333 let profile_second_run =
334 Option.map second_run_end_counters ~f
:(fun (_
, counters
) ->
335 Telemetry.diff ~all
:false ~prev
:end_counters counters
)
338 try Some
(Relative_path.to_absolute file
.path
|> Unix.stat
).Unix.st_size
339 with Unix.Unix_error _
-> None
341 let deferment_telemetry =
343 |> Telemetry.int_ ~key
:"times_checked" ~
value:(file
.deferred_count
+ 1)
345 ~key
:"files_to_declare"
346 ~
value:(List.length
result.deferred_decls
)
350 |> Telemetry.int_opt ~key
:"filesize" ~
value:filesize_opt
351 |> Telemetry.object_ ~key
:"deferment" ~
value:deferment_telemetry
352 |> Telemetry.object_ ~key
:"profile" ~
value:profile
358 ~f
:(fun telemetry profile ->
359 Telemetry.object_
telemetry ~key
:"profile_second_run" ~
value:profile)
361 HackEventLogger.ProfileTypeCheck.process_file
362 ~recheck_id
:check_info
.recheck_id
367 (Relative_path.suffix file
.path
)
368 ( if List.is_empty
result.deferred_decls
then
371 "discover-decl-deps" )
372 (Option.value duration_second_run ~default
:duration)
373 ( if SharedMem.hh_log_level
() > 0 then
374 "\n" ^
Telemetry.to_string
telemetry
379 let read_counters () : Counters.time_in_sec
* Telemetry.t
=
380 let typecheck_time = Counters.read_time
Counters.Category.Typecheck
in
381 let mem_telemetry = get_mem_telemetry () in
382 let operations_counters = Counters.get_counters
() in
385 |> Telemetry.object_opt ~key
:"memory" ~
value:mem_telemetry
386 |> Telemetry.object_ ~key
:"operations" ~
value:operations_counters )
388 module ProcessFilesTally
= struct
389 (** Counters for the [file_computation] of each sort being processed *)
391 decls
: int; (** how many [Declare] items we performed *)
392 prefetches
: int; (** how many [Prefetch] items we performed *)
393 checks_done
: int; (** how many [Check] items we typechecked *)
394 checks_deferred
: int; (** how many [Check] items we deferred to later *)
395 decls_deferred
: int; (** how many [Declare] items we added for later *)
396 exceeded_cap_count
: int; (** how many times we exceeded the memory cap *)
406 exceeded_cap_count
= 0;
(** Returns a copy of [tally] whose [decls] counter (the number of [Declare]
    items performed) is one higher. *)
let incr_decls ({ decls; _ } as tally) = { tally with decls = decls + 1 }
(** Returns a copy of [tally] whose [exceeded_cap_count] counter (the number
    of times the memory cap was exceeded) is one higher. *)
let incr_caps ({ exceeded_cap_count; _ } as tally) =
  let exceeded_cap_count = exceeded_cap_count + 1 in
  { tally with exceeded_cap_count }
(** Returns a copy of [tally] whose [prefetches] counter (the number of
    [Prefetch] items performed) is one higher. *)
let incr_prefetches ({ prefetches; _ } as tally) =
  { tally with prefetches = prefetches + 1 }
416 let incr_checks tally deferred_decls
=
417 if List.is_empty deferred_decls
then
418 { tally
with checks_done
= tally
.checks_done
+ 1 }
422 checks_deferred
= tally
.checks_deferred
+ 1;
423 decls_deferred
= tally
.decls_deferred
+ List.length deferred_decls
;
(** Current major-heap size, in megabytes, as reported by [Gc.quick_stat].

    The word count is multiplied by 8 before being scaled down to megabytes;
    NOTE(review): this presumably assumes 8-byte (64-bit) words -- confirm
    if the code is ever built for another word size. *)
let get_heap_size () =
  let stat = Gc.quick_stat () in
  let heap_bytes = stat.Gc.Stat.heap_words * 8 in
  heap_bytes / 1024 / 1024
430 (dynamic_view_files
: Relative_path.Set.t
)
431 (ctx : Provider_context.t
)
432 ({ errors
; dep_edges
; telemetry; jobs_finished_early
; jobs_finished_to_end
} :
434 (progress
: computation_progress
)
435 ~
(memory_cap
: int option)
436 ~
(check_info
: check_info
) : typing_result
* computation_progress
=
437 SharedMem.invalidate_caches
();
438 File_provider.local_changes_push_sharedmem_stack
();
439 Ast_provider.local_changes_push_sharedmem_stack
();
441 Decl_counters.set_mode check_info
.profile_decling
;
442 let _prev_counters_state = Counters.reset
() in
443 let (_start_counter_time
, start_counters
) = read_counters () in
444 let tally = ProcessFilesTally.empty in
445 let start_file_count = List.length progress
.remaining
in
446 let start_time = Unix.gettimeofday
() in
448 let rec process_or_exit errors progress
tally max_heap_mb
=
449 (* If the major heap has exceeded the bounds, we decline to typecheck the remaining files.
450 We use [quick_stat] instead of [stat] in get_heap_size in order to avoid walking the major heap,
451 and we don't change the minor heap because it's small and fixed-size.
452 The start-remaining test is to make sure we make at least one file of progress
453 even in case of a crazy low memory cap. *)
454 let heap_mb = get_heap_size () in
455 let max_heap_mb = Int.max
heap_mb max_heap_mb in
456 let cap = Option.value memory_cap ~default
:Int.max_value
in
458 heap_mb > cap && start_file_count > List.length progress
.remaining
460 let (exit_now
, tally, heap_mb) =
462 (true, ProcessFilesTally.incr_caps tally, heap_mb)
464 (false, tally, heap_mb)
466 match progress
.remaining
with
467 | [] -> (errors
, progress
, tally, heap_mb, max_heap_mb)
469 let cgroup_stats = CGroup.get_stats
() in
470 (match cgroup_stats with
472 | Ok
{ CGroup.total
; _
} ->
473 Measure.sample
"worker_cgroup_total" (float_of_int total
));
474 (errors
, progress
, tally, heap_mb, max_heap_mb)
476 let (errors
, deferred
, tally) =
479 let process_file () =
480 process_file dynamic_view_files
ctx errors file
483 if check_info
.profile_log then (
484 let start_counters = read_counters () in
485 let result = process_file () in
486 let end_counters = read_counters () in
487 let second_run_end_counters =
488 if check_info
.profile_type_check_twice
then
489 (* we're running this routine solely for the side effect *)
490 (* of seeing how long it takes to run. *)
491 let _ignored = process_file () in
492 Some
(read_counters ())
500 ~
second_run_end_counters
508 ProcessFilesTally.incr_checks tally result.deferred_decls
511 if List.is_empty
result.deferred_decls
then
514 List.map
result.deferred_decls ~f
:(fun fn -> Declare
fn)
515 @ [Check
{ file
with deferred_count
= file
.deferred_count
+ 1 }]
517 (result.errors
, deferred, tally)
518 | Declare
(_path
, class_name
) ->
519 let (_
: Decl_provider.class_decl
option) =
520 Decl_provider.get_class
ctx class_name
522 (errors
, [], ProcessFilesTally.incr_decls tally)
525 (errors
, [], ProcessFilesTally.incr_prefetches tally)
529 completed
= fn :: progress.completed
;
531 deferred = List.concat
[deferred; progress.deferred];
534 process_or_exit errors
progress tally max_heap_mb
537 (* Process as many files as we can, and merge in their errors *)
538 let (errors
, progress, tally, final_heap_mb
, max_heap_mb) =
539 process_or_exit errors
progress tally 0
544 Typing_deps.flush_ideps_batch
(Provider_context.get_deps_mode
ctx)
546 let dep_edges = Typing_deps.merge_dep_edges
dep_edges new_dep_edges in
548 (* Gather up our various forms of telemetry... *)
549 let (_end_counter_time
, end_counters) = read_counters () in
550 (* Note: the 'add' operation (performed here, and also later in case of
551 MultiWorker.merge) will strip all non-numbers from telemetry. *)
559 ~prev
:start_counters)
561 let processed_file_count =
562 start_file_count - List.length
progress.remaining
564 let processed_file_fraction =
565 float_of_int
processed_file_count /. float_of_int
start_file_count
568 if List.is_empty
progress.remaining
then
573 let open ProcessFilesTally
in
574 Measure.sample ~
record "seconds" (Unix.gettimeofday
() -. start_time);
575 Measure.sample ~
record "final_heap_mb" (float_of_int final_heap_mb
);
576 Measure.sample ~
record "files" (float_of_int
processed_file_count);
577 Measure.sample ~
record "files_fraction" processed_file_fraction;
578 Measure.sample ~
record "decls" (float_of_int
tally.decls
);
579 Measure.sample ~
record "prefetches" (float_of_int
tally.prefetches
);
580 Measure.sample ~
record "checks_done" (float_of_int
tally.checks_done
);
581 Measure.sample ~
record "checks_deferred" (float_of_int
tally.checks_deferred
);
582 Measure.sample ~
record "decls_deferred" (float_of_int
tally.decls_deferred
);
586 (float_of_int
tally.exceeded_cap_count
);
587 Measure.sample ~
record "max_heap_mb" (float_of_int
max_heap_mb);
589 TypingLogger.flush_buffers
();
590 Ast_provider.local_changes_pop_sharedmem_stack
();
591 File_provider.local_changes_pop_sharedmem_stack
();
592 ( { errors
; dep_edges; telemetry; jobs_finished_early
; jobs_finished_to_end
},
595 let load_and_process_files
596 (ctx : Provider_context.t
)
597 (dynamic_view_files
: Relative_path.Set.t
)
598 (typing_result
: typing_result
)
599 (progress : computation_progress
)
600 ~
(memory_cap
: int option)
601 ~
(check_info
: check_info
) : typing_result
* computation_progress
=
602 (* When the type-checking worker receives SIGUSR1, display a position which
603 corresponds approximately with the function/expression being checked. *)
606 (Sys.Signal_handle
Typing.debug_print_last_pos
);
615 (*****************************************************************************)
616 (* Let's go! That's where the action is *)
617 (*****************************************************************************)
619 (** Merge the results from multiple workers.
621 We don't really care about which files are left unchecked since we use
622 (gasp) mutation to track that, so combine the errors but always return an
623 empty list for the list of unchecked files. *)
625 ~
(should_prefetch_deferred_files
: bool)
626 (delegate_state
: Delegate.state
ref)
627 (files_to_process
: file_computation
BigList.t
ref)
628 (files_initial_count
: int)
629 (files_in_progress
: file_computation
Hash_set.t
)
630 (files_checked_count
: int ref)
631 ((produced_by_job
: typing_result
), (progress : progress))
632 (acc
: typing_result
) : typing_result
=
634 match progress.kind
with
636 | DelegateProgress _
->
638 Delegate.merge !delegate_state produced_by_job
.errors
progress.progress
640 let progress = progress.progress in
642 files_to_process
:= BigList.append
progress.remaining
!files_to_process
;
644 (* Let's also prepend the deferred files! *)
645 files_to_process
:= BigList.append
progress.deferred !files_to_process
;
647 (* Prefetch the deferred files, if necessary *)
649 if should_prefetch_deferred_files
&& List.length
progress.deferred > 10 then
650 let files_to_prefetch =
651 List.fold
progress.deferred ~init
:[] ~f
:(fun acc computation
->
652 match computation
with
653 | Declare
(path
, _
) -> path
:: acc
656 BigList.cons
(Prefetch
files_to_prefetch) !files_to_process
660 (* If workers can steal work from each other, then it's possible that
661 some of the files that the current worker completed checking have already
662 been removed from the in-progress set. Thus, we should keep track of
663 how many type check computations we actually remove from the in-progress
664 set. Note that we also skip counting Declare and Prefetch computations,
665 since they are not relevant for computing how many files we've type
667 let completed_check_count =
670 ~f
:(fun acc computation
->
671 match Hash_set.Poly.strict_remove files_in_progress computation
with
674 match computation
with
682 (* Deferred type check computations should be subtracted from completed
683 in order to produce an accurate count because we requeued them, yet
684 they were also included in the completed list.
691 let deferred_check_count = List.count ~f
:is_check progress.deferred in
692 let completed_check_count = completed_check_count - deferred_check_count in
694 files_checked_count
:= !files_checked_count
+ completed_check_count;
695 let delegate_progress =
696 Typing_service_delegate.get_progress
!delegate_state
698 ServerProgress.send_percentage_progress_to_monitor
699 ~operation
:"typechecking"
700 ~done_count
:!files_checked_count
701 ~total_count
:files_initial_count
703 ~extra
:delegate_progress;
704 accumulate_job_output produced_by_job acc
707 (workers
: MultiWorker.worker list
option)
708 (delegate_state
: Delegate.state
ref)
709 (files_to_process
: file_computation
BigList.t
ref)
710 (files_in_progress
: file_computation
Hash_set.Poly.t
)
711 (record : Measure.record) =
712 let max_size = Bucket.max_size () in
715 | Some w
-> List.length w
718 let return_bucket_job kind ~current_bucket ~remaining_jobs
=
719 (* Update our shared mutable state, because hey: it's not like we're
720 writing OCaml or anything. *)
721 files_to_process
:= remaining_jobs
;
722 List.iter ~f
:(Hash_set.Poly.add files_in_progress
) current_bucket
;
726 progress = { completed
= []; remaining
= current_bucket
; deferred = [] };
730 Measure.time ~
record "time" @@ fun () ->
731 let (state
, delegate_job
) =
732 Typing_service_delegate.next
737 delegate_state
:= state
;
739 let (stolen
, state
) = Typing_service_delegate.steal state
max_size in
740 (* If a delegate job is returned, then that means that it should be done
741 by the next MultiWorker worker (the one for whom we're creating a job
742 in this function). If delegate job is None, then the regular (local
743 type checking) logic applies. *)
744 match delegate_job
with
745 | Some
{ current_bucket
; remaining_jobs
; job
} ->
746 return_bucket_job (DelegateProgress job
) current_bucket remaining_jobs
748 (* WARNING: the following List.length is costly - for a full init, files_to_process starts
749 out as the size of the entire repo, and we're traversing the entire list. *)
750 let files_to_process_length = BigList.length
!files_to_process
in
751 (match (files_to_process_length, stolen
) with
752 | (0, []) when Hash_set.Poly.is_empty files_in_progress
-> Bucket.Done
753 | (0, []) -> Bucket.Wait
754 | (_
, stolen_jobs
) ->
756 if files_to_process_length > List.length stolen_jobs
then
760 "Steal payload from local workers: %d jobs"
761 (List.length stolen_jobs
);
762 delegate_state
:= state
;
764 List.map
stolen_jobs ~f
:(fun job
->
765 Hash_set.Poly.remove files_in_progress job
;
767 | Check
{ path
; deferred_count
} ->
768 Check
{ path
; deferred_count
= deferred_count
+ 1 }
769 | _
-> failwith
"unexpected state")
771 BigList.rev_append
stolen_jobs !files_to_process
775 match num_workers with
776 (* When num_workers is zero, the execution mode is delegate-only, so we give an empty bucket to MultiWorker for execution. *)
777 | 0 -> return_bucket_job Progress
[] jobs
780 Bucket.calculate_bucket_size
781 ~num_jobs
:files_to_process_length
785 let (current_bucket
, remaining_jobs
) =
786 BigList.split_n
jobs bucket_size
788 return_bucket_job Progress current_bucket remaining_jobs
792 (next : unit -> 'a
Bucket.bucket
)
793 (files_to_process
: 'b
Hash_set.Poly.elt
BigList.t
ref)
794 (files_in_progress
: 'b
Hash_set.Poly.t
) : unit -> 'a list
=
796 (* The size of [files_to_process] is bounded only by repo size, but
797 [files_in_progress] is capped at [(worker count) * (max bucket size)]. *)
799 BigList.append
(Hash_set.Poly.to_list files_in_progress
) !files_to_process
;
800 let rec add_next acc
=
802 | Bucket.Job j
-> add_next (j
:: acc
)
810 `next` and `merge` both run in the master process and update mutable
811 state in order to track work in progress and work remaining.
812 `job` runs in each worker and does not have access to this mutable state.
814 let process_in_parallel
815 (ctx : Provider_context.t
)
816 (dynamic_view_files
: Relative_path.Set.t
)
817 (workers
: MultiWorker.worker list
option)
818 (delegate_state
: Delegate.state
)
819 (telemetry : Telemetry.t
)
820 (fnl
: file_computation
BigList.t
)
821 ~
(interrupt
: 'a
MultiWorker.interrupt_config
)
822 ~
(memory_cap
: int option)
823 ~
(check_info
: check_info
) :
824 typing_result
* Delegate.state
* Telemetry.t
* 'a
* Relative_path.t list
=
825 let record = Measure.create
() in
826 (* [record] is used by [next] *)
827 let delegate_state = ref delegate_state in
828 let files_to_process = ref fnl
in
829 let files_in_progress = Hash_set.Poly.create
() in
830 let files_processed_count = ref 0 in
831 let files_initial_count = BigList.length fnl
in
832 let delegate_progress =
833 Typing_service_delegate.get_progress
!delegate_state
835 ServerProgress.send_percentage_progress_to_monitor
836 ~operation
:"typechecking"
838 ~total_count
:files_initial_count
840 ~extra
:delegate_progress;
843 next workers
delegate_state files_to_process files_in_progress record
845 let should_prefetch_deferred_files =
847 && TypecheckerOptions.prefetch_deferred_files
848 (Provider_context.get_tcopt
ctx)
851 load_and_process_files ctx dynamic_view_files ~memory_cap ~check_info
853 let job (typing_result
: typing_result
) (progress : progress) =
854 let (typing_result
, computation_progress
) =
855 match progress.kind
with
856 | Progress
-> job typing_result
progress.progress
857 | DelegateProgress
job -> Delegate.process
job
859 (typing_result
, { progress with progress = computation_progress
})
861 let (typing_result
, env
, cancelled_results
) =
862 MultiWorker.call_with_interrupt
865 ~
neutral:(neutral ())
868 ~
should_prefetch_deferred_files
873 files_processed_count)
875 ~
on_cancelled:(on_cancelled next files_to_process files_in_progress)
879 Typing_service_delegate.add_telemetry
!delegate_state telemetry
882 ~
value:(Measure.stats_to_telemetry ~
record ())
884 let paths_of (cancelled_results
: progress list
) : Relative_path.t list
=
885 let paths_of (cancelled_progress
: progress) =
886 let cancelled_computations = cancelled_progress
.progress.remaining
in
887 let paths_of paths
(cancelled_computation
: file_computation
) =
888 match cancelled_computation
with
889 | Check
{ path
; _
} -> path
:: paths
892 List.fold
cancelled_computations ~init
:[] ~f
:paths_of
894 List.concat
(List.map cancelled_results ~f
:paths_of)
896 (typing_result
, !delegate_state, telemetry, env
, paths_of cancelled_results
)
(** Five-component result of a type-checking run.

    The first four components are left generic; [go_with_interrupt]
    instantiates them as the accumulated errors, the delegate state, the
    telemetry and the interrupt environment. The last component is the list
    of paths whose checks were cancelled. *)
type ('a, 'b, 'c, 'd) job_result =
  'a * 'b * 'c * 'd * Relative_path.t list
900 module type Mocking_sig
= sig
901 val with_test_mocking
:
902 (* real job payload, that we can modify... *)
903 file_computation
BigList.t
->
904 ((* ... before passing it to the real job executor... *)
905 file_computation
BigList.t
->
906 ('a
, 'b
, 'c
, 'd
) job_result
) ->
907 (* ... whose output we can also modify. *)
908 ('a
, 'b
, 'c
, 'd
) job_result
911 module NoMocking
= struct
(** No-op mocking hook: hands the job payload [fnl] to the executor [f]
    unchanged and returns whatever [f] returns. *)
let with_test_mocking fnl f = fnl |> f
915 module TestMocking
= struct
(** Set of paths that the test mock treats as cancelled. *)
let cancelled : Relative_path.Set.t ref = ref Relative_path.Set.empty

(** Adds [x] to the set of paths treated as cancelled. *)
let set_is_cancelled x =
  let updated = Relative_path.Set.add !cancelled x in
  cancelled := updated

(** Tells whether [x] is currently in the set of cancelled paths. *)
let is_cancelled x =
  let current = !cancelled in
  Relative_path.Set.mem current x
922 let with_test_mocking fnl f
=
923 let (mock_cancelled
, fnl
) =
924 List.partition_map
(BigList.as_list fnl
) ~f
:(fun computation
->
925 match computation
with
926 | Check
{ path
; _
} ->
927 if is_cancelled path
then
931 | _
-> `Snd computation
)
933 (* Only cancel once to avoid infinite loops *)
934 cancelled := Relative_path.Set.empty;
935 let (res
, delegate_state, telemetry, env
, cancelled) =
936 f
(BigList.create fnl
)
938 (res
, delegate_state, telemetry, env
, mock_cancelled
@ cancelled)
942 ( val if Injector_config.use_test_stubbing
then
943 (module TestMocking
: Mocking_sig
)
945 (module NoMocking
: Mocking_sig
) )
947 let should_process_sequentially
948 (opts : TypecheckerOptions.t
) (fnl
: file_computation
BigList.t
) : bool =
949 (* If decls can be deferred, then we should process in parallel, since
950 we are likely to have more computations than there are files to type check. *)
951 let defer_threshold =
952 TypecheckerOptions.defer_class_declaration_threshold
opts
954 let parallel_threshold =
955 TypecheckerOptions.parallel_type_checking_threshold
opts
957 match (defer_threshold, BigList.length fnl
) with
958 | (None
, file_count
) when file_count
< parallel_threshold -> true
961 let go_with_interrupt
962 (ctx : Provider_context.t
)
963 (workers
: MultiWorker.worker list
option)
964 (delegate_state : Delegate.state
)
965 (telemetry : Telemetry.t
)
966 (dynamic_view_files
: Relative_path.Set.t
)
967 (fnl
: Relative_path.t list
)
968 ~
(interrupt
: 'a
MultiWorker.interrupt_config
)
969 ~
(memory_cap
: int option)
970 ~
(check_info
: check_info
)
971 ~
(profiling
: CgroupProfiler.Profiling.t
) :
972 (Errors.t
, Delegate.state
, Telemetry.t
, 'a
) job_result
=
973 let opts = Provider_context.get_tcopt
ctx in
974 let sample_rate = GlobalOptions.tco_typecheck_sample_rate
opts in
975 let fnl = BigList.create
fnl in
977 if sample_rate >= 1.0 then
983 float (Base.String.hash
(Relative_path.suffix x
) mod 1000000)
984 <= sample_rate *. 1000000.0)
988 "Sampling %f percent of files: %d out of %d"
990 (BigList.length
result)
991 (BigList.length
fnl);
995 BigList.map
fnl ~f
:(fun path
-> Check
{ path
; deferred_count
= 0 })
997 Mocking.with_test_mocking fnl @@ fun fnl ->
998 let (typing_result
, delegate_state, telemetry, env
, cancelled_fnl
) =
999 if should_process_sequentially opts fnl then begin
1000 Hh_logger.log
"Type checking service will process files sequentially";
1002 { completed
= []; remaining
= BigList.as_list
fnl; deferred = [] }
1004 let (typing_result
, _progress
) =
1016 interrupt
.MultiThreadedCall.env
,
1019 Hh_logger.log
"Type checking service will process files in parallel";
1021 match (workers, TypecheckerOptions.num_local_workers
opts) with
1022 | (Some
workers, Some num_local_workers
) ->
1023 let (workers, _
) = List.split_n
workers num_local_workers
in
1041 Typing_deps.register_discovered_dep_edges typing_result
.dep_edges;
1042 let cgroup_total_max =
1043 Base.Option.value ~default
:0.0 (Measure.get_max
"worker_cgroup_total")
1045 CgroupProfiler.Profiling.record_stats
1048 ~metric
:"cgroup_total"
1049 ~
value:cgroup_total_max;
1051 if check_info
.profile_log then
1053 "Typecheck perf: %s"
1054 (HackEventLogger.ProfileTypeCheck.get_telemetry_url
1055 ~init_id
:check_info
.init_id
1056 ~recheck_id
:check_info
.recheck_id
);
1057 let job_size_telemetry =
1059 |> Telemetry.object_
1060 ~key
:"finished_to_end"
1062 (Measure.stats_to_telemetry
1063 ~
record:typing_result
.jobs_finished_to_end
1065 |> Telemetry.object_
1066 ~key
:"finished_early"
1068 (Measure.stats_to_telemetry
1069 ~
record:typing_result
.jobs_finished_early
1074 |> Telemetry.object_ ~key
:"profiling_info" ~
value:typing_result
.telemetry
1075 |> Telemetry.object_ ~key
:"job_sizes" ~
value:job_size_telemetry
1077 (typing_result
.errors
, delegate_state, telemetry, env
, cancelled_fnl
)
1080 ?
(profiling
: CgroupProfiler.Profiling.t
= CgroupProfiler.Profiling.empty)
1081 (ctx : Provider_context.t
)
1082 (workers : MultiWorker.worker list
option)
1083 (delegate_state : Delegate.state
)
1084 (telemetry : Telemetry.t
)
1085 (dynamic_view_files
: Relative_path.Set.t
)
1086 (fnl : Relative_path.t list
)
1087 ~
(memory_cap
: int option)
1088 ~
(check_info
: check_info
) : Errors.t
* Delegate.state
* Telemetry.t
=
1089 let interrupt = MultiThreadedCall.no_interrupt
() in
1090 let (res
, delegate_state, telemetry, (), cancelled) =
1103 assert (List.is_empty
cancelled);
1104 (res
, delegate_state, telemetry)