hphp/hack/src/typing/typing_check_service.ml
1 (*
2 * Copyright (c) Facebook, Inc. and its affiliates.
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the "hack" directory of this source tree.
7 *)
9 module Hack_bucket = Bucket
10 open Hh_prelude
11 module Bucket = Hack_bucket
12 open Typing_service_types
15 ####
17 The type checking service receives a list of files and their symbols as input and
18 distributes the work to worker processes.
20 A worker process's input is a subset of files. At the end of its work,
21 it returns the following progress report back to the master process:
22 - completed: a list of computations it completed
23 - remaining: a list of computations which it didn't end up performing. For example,
24 it may not have been able to get through the whole list of computations because
25 it exceeded its worker memory cap
26 - deferred: a list of computations that were discovered to be prerequisites of
27 some of the computations in the input subset
29 The deferred list needs further explanation, so read below.
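As a rough shape (a minimal sketch only; the real record is [computation_progress] in
Typing_service_types, and the exact field types are assumed here for illustration):

    type progress_report = {
      completed : file_computation list;  (* computations this worker finished *)
      remaining : file_computation list;  (* computations handed back, e.g. after hitting the memory cap *)
      deferred : file_computation list;   (* prerequisite computations discovered along the way *)
    }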
31 ####
33 Here's how we actually plumb the work items and their individual results through MultiWorker:
34 * We have mutable state [ref files_to_process], a list of filenames still to be processed
35 for the entire typecheck.
36 * Datatype [progress] represents an input batch of work for a worker to do,
37 and also, on output, an indication of how the worker dispatched that batch.
38 * Datatype [typing_result] represents the accumulation of telemetry, errors, and dep edges from workers.
39 Each individual worker is given an empty [typing_result] as input,
40 and merges its own results into that empty input to give a per-worker output;
41 we then merge each per-worker output
42 into a separate accumulator.
43 * next : () -> progress
44 This mutates [files_to_process] by removing a bucket of filenames,
45 and returns a degenerate progress {remaining=bucket; completed=[]; deferred=[]}
46 which is basically just the bucket of work to be done by the job.
47 * neutral : typing_result
48 This value is just the empty typing_result {errors=Empty; deps=Empty; telemetry=Empty}
49 * job : typing_result -> progress -> (typing_result', progress')
50 MultiWorker will invoke this job. For input,
51 it provides a copy of the empty [typing_result] that it got from [neutral], and
52 it provides the degenerate [progress], i.e. just the bucket of work, that it got from [next].
53 The behavior of our job is to take items out of progress.remaining and typecheck them.
54 It repeats this process until it either runs out of items or its heap grows too big.
55 It returns a new progress' {remaining'; completed; deferred} to show the items still
56 remaining, the ones it completed, and the ones it had to defer (see below).
57 It returns a new typing_result' {errors; deps; telemetry} by merging its own
58 newly discovered errors, deps, telemetry onto the (degenerate i.e. empty)
59 typing_result it was given as input.
60 * merge : (typing_result * progress) -> (accumulator : typing_result) -> typing_result
61 The initial value of accumulator is the same [neutral] that was given to each job.
62 After each job, MultiWorker calls [merge] to merge the results of that job
63 into the accumulator.
64 Our merge function looks at the progress {remaining;completed;deferred} that
65 came out of the job, and mutates [files_to_process] by sticking back "remaining+deferred" into it.
66 It then merges the typing_result {errors;deps;telemetry} that came out of the job
67 with those in its accumulator.
69 The type signatures for MultiWorker look like they'd allow a variety of implementations,
70 e.g. having just a single accumulator that starts at "neutral" and feeds one by one into
71 each job. But we don't do that.
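To make that shape concrete, here is a minimal single-threaded sketch of the protocol
(illustrative only: MultiWorker actually runs jobs in parallel worker processes, and its
[next] returns Bucket.Job/Wait/Done rather than an option):

    let run_sequentially ~neutral ~next ~job ~merge =
      (* The accumulator starts at the same [neutral] value that every job receives. *)
      let rec loop acc =
        match next () with
        | None -> acc (* [files_to_process] is empty: nothing left to hand out *)
        | Some progress ->
          (* The job gets an empty typing_result plus its bucket of work, and returns
             its merged results together with what it completed/deferred/left remaining. *)
          let (typing_result', progress') = job neutral progress in
          (* [merge] re-queues progress'.remaining and progress'.deferred into
             [files_to_process] (by mutation) and folds typing_result' into the accumulator. *)
          loop (merge (typing_result', progress') acc)
      in
      loop neutral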
73 ####
75 The normal computation kind for the type checking service is to type check all symbols in a file.
76 The type checker would always be able to find a given symbol it needs to perform this job
77 if all the declarations were computed for all the files in the repo before type checking begins.
79 However, such eager declaration does not occur in the following scenarios:
80 - When the server initialization strategy is Lazy: not having a declaration phase was
81 the original meaning of lazy init
82 - If the server initialized from a saved state: most decls would not be available because
83 saved states only consist of one or more of the following: naming table, dependency
84 graph, errors, and hot decls (a small subset of all possible decls)
86 The decl heap module, when asked for a decl that is not yet in memory, can declare the decl's
87 file lazily. Declaring a file means parsing the file and extracting the symbols declared in it,
88 hence the term 'decl'.
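In pseudo-OCaml (every name below is hypothetical and passed in as a parameter, just to
illustrate the shape of lazy declaration; the real logic lives in the decl provider and
decl heap modules):

    let get_decl_lazily ~find_in_heap ~file_of_symbol ~declare_file symbol =
      match find_in_heap symbol with
      | Some decl -> decl
      | None ->
        (* Not declared yet: parse the symbol's file, extract its declarations into the
           decl heap as a side effect, then look the symbol up again. *)
        declare_file (file_of_symbol symbol);
        (match find_in_heap symbol with
         | Some decl -> decl
         | None -> failwith "symbol missing even after declaring its file")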
90 This lazy declaration strategy works reasonably well for most cases, but there's a case where
91 it works poorly: if a particular file that is getting type checked requires a large number
92 of symbols which have not yet been declared, the unlucky worker ends up serially declaring
93 all the files that contain those symbols. In some cases, we observe type checking times on
94 the order of minutes for a single file.
96 Therefore, to account for the fact that we don't have the overall eager declaration phase
97 in some initialization scenarios, the decl heap module is now capable of refusing to declare
98 a file and instead adding the file to a list of deferments.
100 The type checker worker can then return that list, along with the file that was being type
101 checked when the deferments occurred, back to the master process. The master process can then
102 distribute the declaration of those files and the (re)checking of the original
103 file across all of its workers, thus achieving parallelism and better type checking times overall.
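Concretely, each deferment comes back to master as a [Declare] work item, and the original
file is re-queued as a [Check] with an incremented deferred_count. This mirrors the handling
in [process_files] further down; the helper below is only a sketch of that step:

    let requeue_deferred ~file ~deferred_decls =
      (* One Declare item per deferment, then re-check the file that triggered them. *)
      List.map deferred_decls ~f:(fun deferment -> Declare deferment)
      @ [Check { file with deferred_count = file.deferred_count + 1 }]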
105 The deferment of declarations is adjustable: it works by using a threshold value that dictates
106 the maximum number of lazy declarations allowed per file. If the threshold is not set, then
107 there is no limit on lazy declarations and none are deferred. If the threshold is set to 1,
108 then every declaration is deferred.
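As a sketch of that rule (illustrative only; the real bookkeeping lives in Deferred_decl
and is configured through tco_defer_class_declaration_threshold):

    let should_defer ~threshold_opt ~lazy_decls_so_far =
      match threshold_opt with
      | None -> false (* no threshold: declare lazily without limit, defer nothing *)
      | Some threshold ->
        (* a threshold of 1 means even the first lazy declaration is deferred *)
        lazy_decls_so_far + 1 >= threshold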
110 The idea behind deferring declarations is similar to the 2nd of the 4 build systems considered in
111 the following paper (Make, Excel's calc engine, Shake, and Bazel):
112 https://www.microsoft.com/en-us/research/uploads/prod/2018/03/build-systems-final.pdf
114 The paper refers to this approach as "restarting", and further suggests that recording and reusing
115 the chain of jobs could be used to minimize the number of restarts.
118 module Delegate = Typing_service_delegate
120 type progress = job_progress
122 let neutral : unit -> typing_result =
123 fun () ->
125 errors = Errors.empty;
126 dep_edges = Typing_deps.dep_edges_make ();
127 telemetry = Telemetry.create ();
128 jobs_finished_to_end = Measure.create ();
129 jobs_finished_early = Measure.create ();
132 (*****************************************************************************)
133 (* The job that will be run on the workers *)
134 (*****************************************************************************)
136 let handle_exn_as_error : type res. Pos.t -> (unit -> res option) -> res option
138 fun pos f ->
139 try f () with
140 | WorkerCancel.Worker_should_exit as e ->
141 (* Cancellation requests must be re-raised *)
142 raise e
143 | e ->
144 Errors.exception_occurred pos (Exception.wrap e);
145 None
147 let type_fun (ctx : Provider_context.t) (fn : Relative_path.t) (x : string) :
148 (Tast.def * Typing_inference_env.t_global_with_pos) option =
149 match Ast_provider.find_fun_in_file ~full:true ctx fn x with
150 | Some f ->
151 handle_exn_as_error f.Aast.f_span (fun () ->
152 let fun_ = Naming.fun_ ctx f in
153 Nast_check.def ctx (Aast.Fun fun_);
154 let def_opt =
155 Typing_toplevel.fun_def ctx fun_
156 |> Option.map ~f:(fun (f, global_tvenv) -> (Aast.Fun f, global_tvenv))
158 Option.iter def_opt (fun (f, _) -> Tast_check.def ctx f);
159 def_opt)
160 | None -> None
162 let type_class (ctx : Provider_context.t) (fn : Relative_path.t) (x : string) :
163 (Tast.def * Typing_inference_env.t_global_with_pos list) option =
164 match Ast_provider.find_class_in_file ~full:true ctx fn x with
165 | Some cls ->
166 handle_exn_as_error cls.Aast.c_span (fun () ->
167 let class_ = Naming.class_ ctx cls in
168 Nast_check.def ctx (Aast.Class class_);
169 let def_opt =
170 Typing_toplevel.class_def ctx class_
171 |> Option.map ~f:(fun (c, global_tvenv) ->
172 (Aast.Class c, global_tvenv))
174 Option.iter def_opt (fun (f, _) -> Tast_check.def ctx f);
175 def_opt)
176 | None -> None
178 let type_record_def
179 (ctx : Provider_context.t) (fn : Relative_path.t) (x : string) :
180 Tast.def option =
181 match Ast_provider.find_record_def_in_file ~full:true ctx fn x with
182 | Some rd ->
183 handle_exn_as_error rd.Aast.rd_span (fun () ->
184 let rd = Naming.record_def ctx rd in
185 Nast_check.def ctx (Aast.RecordDef rd);
187 let def = Aast.RecordDef (Typing_toplevel.record_def_def ctx rd) in
188 Tast_check.def ctx def;
189 Some def)
190 | None -> None
192 let check_typedef (ctx : Provider_context.t) (fn : Relative_path.t) (x : string)
193 : Tast.def option =
194 match Ast_provider.find_typedef_in_file ~full:true ctx fn x with
195 | Some t ->
196 handle_exn_as_error Pos.none (fun () ->
197 let typedef = Naming.typedef ctx t in
198 Nast_check.def ctx (Aast.Typedef typedef);
199 let ret = Typing.typedef_def ctx typedef in
200 let def = Aast.Typedef ret in
201 Tast_check.def ctx def;
202 Some def)
203 | None -> None
205 let check_const (ctx : Provider_context.t) (fn : Relative_path.t) (x : string) :
206 Tast.def option =
207 match Ast_provider.find_gconst_in_file ~full:true ctx fn x with
208 | None -> None
209 | Some cst ->
210 handle_exn_as_error cst.Aast.cst_span (fun () ->
211 let cst = Naming.global_const ctx cst in
212 Nast_check.def ctx (Aast.Constant cst);
213 let def = Aast.Constant (Typing_toplevel.gconst_def ctx cst) in
214 Tast_check.def ctx def;
215 Some def)
217 let should_enable_deferring
218 (opts : GlobalOptions.t) (file : check_file_computation) =
219 match GlobalOptions.tco_max_times_to_defer_type_checking opts with
220 | Some max_times when file.deferred_count >= max_times -> false
221 | _ -> true
223 type process_file_results = {
224 errors: Errors.t;
225 deferred_decls: Deferred_decl.deferment list;
228 let process_file
229 (dynamic_view_files : Relative_path.Set.t)
230 (ctx : Provider_context.t)
231 (errors : Errors.t)
232 (file : check_file_computation) : process_file_results =
233 let fn = file.path in
234 let (errors', ast) = Ast_provider.get_ast_with_error ~full:true ctx fn in
235 if not (Errors.is_empty errors') then
236 { errors = Errors.merge errors' errors; deferred_decls = [] }
237 else
238 let opts =
240 (Provider_context.get_tcopt ctx) with
241 GlobalOptions.tco_dynamic_view =
242 Relative_path.Set.mem dynamic_view_files fn;
245 let (funs, classes, record_defs, typedefs, gconsts) = Nast.get_defs ast in
246 let ctx = Provider_context.map_tcopt ctx ~f:(fun _tcopt -> opts) in
247 let ignore_type_record_def opts fn name =
248 ignore (type_record_def opts fn name)
250 let ignore_check_typedef opts fn name =
251 ignore (check_typedef opts fn name)
253 let ignore_check_const opts fn name = ignore (check_const opts fn name) in
255 let result =
256 Deferred_decl.with_deferred_decls
257 ~enable:(should_enable_deferring opts file)
258 ~declaration_threshold_opt:
259 (GlobalOptions.tco_defer_class_declaration_threshold opts)
260 ~memory_mb_threshold_opt:
261 (GlobalOptions.tco_defer_class_memory_mb_threshold opts)
262 @@ fun () ->
263 Errors.do_with_context fn Errors.Typing @@ fun () ->
264 let (fun_tasts, fun_global_tvenvs) =
265 List.map funs ~f:snd
266 |> List.filter_map ~f:(type_fun ctx fn)
267 |> List.unzip
269 let (class_tasts, class_global_tvenvs) =
270 List.map classes ~f:snd
271 |> List.filter_map ~f:(type_class ctx fn)
272 |> List.unzip
274 let class_global_tvenvs = List.concat class_global_tvenvs in
275 List.map record_defs ~f:snd
276 |> List.iter ~f:(ignore_type_record_def ctx fn);
277 List.map typedefs ~f:snd |> List.iter ~f:(ignore_check_typedef ctx fn);
278 List.map gconsts ~f:snd |> List.iter ~f:(ignore_check_const ctx fn);
279 (fun_tasts @ class_tasts, fun_global_tvenvs @ class_global_tvenvs)
281 match result with
282 | Ok (errors', (tasts, global_tvenvs)) ->
283 if GlobalOptions.tco_global_inference opts then
284 Typing_global_inference.StateSubConstraintGraphs.build_and_save
286 tasts
287 global_tvenvs;
288 { errors = Errors.merge errors' errors; deferred_decls = [] }
289 | Error deferred_decls -> { errors; deferred_decls }
290 with
291 | WorkerCancel.Worker_should_exit as e ->
292 (* Cancellation requests must be re-raised *)
293 raise e
294 | e ->
295 let stack = Caml.Printexc.get_raw_backtrace () in
296 let () =
297 prerr_endline ("Exception on file " ^ Relative_path.S.to_string fn)
299 Caml.Printexc.raise_with_backtrace e stack
301 let get_mem_telemetry () : Telemetry.t option =
302 if SharedMem.hh_log_level () > 0 then
303 Some
304 ( Telemetry.create ()
305 |> Telemetry.object_ ~key:"gc" ~value:(Telemetry.quick_gc_stat ())
306 |> Telemetry.object_ ~key:"shmem" ~value:(SharedMem.get_telemetry ()) )
307 else
308 None
310 let profile_log
311 ~(check_info : check_info)
312 ~(start_counters : Counters.time_in_sec * Telemetry.t)
313 ~(end_counters : Counters.time_in_sec * Telemetry.t)
314 ~(second_run_end_counters : (Counters.time_in_sec * Telemetry.t) option)
315 ~(file : check_file_computation)
316 ~(result : process_file_results) : unit =
317 let (start_time, start_counters) = start_counters in
318 let (end_time, end_counters) = end_counters in
319 let duration = end_time -. start_time in
320 let duration_second_run =
321 Option.map second_run_end_counters ~f:(fun (time, _) -> time -. end_time)
323 let deciding_time = Option.value duration_second_run ~default:duration in
324 (* "deciding_time" is what we compare against the threshold, to see if we should log. *)
325 (* We'll also log if it had been previously deferred, or if it's being deferred right now. *)
326 let should_log =
327 Float.(deciding_time >= check_info.profile_type_check_duration_threshold)
328 || file.deferred_count > 0
329 || not (List.is_empty result.deferred_decls)
331 if should_log then begin
332 let profile = Telemetry.diff ~all:false ~prev:start_counters end_counters in
333 let profile_second_run =
334 Option.map second_run_end_counters ~f:(fun (_, counters) ->
335 Telemetry.diff ~all:false ~prev:end_counters counters)
337 let filesize_opt =
338 try Some (Relative_path.to_absolute file.path |> Unix.stat).Unix.st_size
339 with Unix.Unix_error _ -> None
341 let deferment_telemetry =
342 Telemetry.create ()
343 |> Telemetry.int_ ~key:"times_checked" ~value:(file.deferred_count + 1)
344 |> Telemetry.int_
345 ~key:"files_to_declare"
346 ~value:(List.length result.deferred_decls)
348 let telemetry =
349 Telemetry.create ()
350 |> Telemetry.int_opt ~key:"filesize" ~value:filesize_opt
351 |> Telemetry.object_ ~key:"deferment" ~value:deferment_telemetry
352 |> Telemetry.object_ ~key:"profile" ~value:profile
354 let telemetry =
355 Option.fold
356 ~init:telemetry
357 profile_second_run
358 ~f:(fun telemetry profile ->
359 Telemetry.object_ telemetry ~key:"profile_second_run" ~value:profile)
361 HackEventLogger.ProfileTypeCheck.process_file
362 ~recheck_id:check_info.recheck_id
363 ~path:file.path
364 ~telemetry;
365 Hh_logger.log
366 "%s [%s] %fs%s"
367 (Relative_path.suffix file.path)
368 ( if List.is_empty result.deferred_decls then
369 "type-check"
370 else
371 "discover-decl-deps" )
372 (Option.value duration_second_run ~default:duration)
373 ( if SharedMem.hh_log_level () > 0 then
374 "\n" ^ Telemetry.to_string telemetry
375 else
376 "" )
379 let read_counters () : Counters.time_in_sec * Telemetry.t =
380 let typecheck_time = Counters.read_time Counters.Category.Typecheck in
381 let mem_telemetry = get_mem_telemetry () in
382 let operations_counters = Counters.get_counters () in
383 ( typecheck_time,
384 Telemetry.create ()
385 |> Telemetry.object_opt ~key:"memory" ~value:mem_telemetry
386 |> Telemetry.object_ ~key:"operations" ~value:operations_counters )
388 module ProcessFilesTally = struct
389 (** Counters for the [file_computation] of each sort being processed *)
390 type t = {
391 decls: int; (** how many [Declare] items we performed *)
392 prefetches: int; (** how many [Prefetch] items we performed *)
393 checks_done: int; (** how many [Check] items we typechecked *)
394 checks_deferred: int; (** how many [Check] items we deferred to later *)
395 decls_deferred: int; (** how many [Declare] items we added for later *)
396 exceeded_cap_count: int; (** how many times we exceeded the memory cap *)
399 let empty =
401 decls = 0;
402 prefetches = 0;
403 checks_done = 0;
404 checks_deferred = 0;
405 decls_deferred = 0;
406 exceeded_cap_count = 0;
409 let incr_decls tally = { tally with decls = tally.decls + 1 }
411 let incr_caps tally =
412 { tally with exceeded_cap_count = tally.exceeded_cap_count + 1 }
414 let incr_prefetches tally = { tally with prefetches = tally.prefetches + 1 }
416 let incr_checks tally deferred_decls =
417 if List.is_empty deferred_decls then
418 { tally with checks_done = tally.checks_done + 1 }
419 else
421 tally with
422 checks_deferred = tally.checks_deferred + 1;
423 decls_deferred = tally.decls_deferred + List.length deferred_decls;
427 let get_heap_size () = Gc.((quick_stat ()).Stat.heap_words) * 8 / 1024 / 1024
429 let process_files
430 (dynamic_view_files : Relative_path.Set.t)
431 (ctx : Provider_context.t)
432 ({ errors; dep_edges; telemetry; jobs_finished_early; jobs_finished_to_end } :
433 typing_result)
434 (progress : computation_progress)
435 ~(memory_cap : int option)
436 ~(check_info : check_info) : typing_result * computation_progress =
437 SharedMem.invalidate_caches ();
438 File_provider.local_changes_push_sharedmem_stack ();
439 Ast_provider.local_changes_push_sharedmem_stack ();
441 Decl_counters.set_mode check_info.profile_decling;
442 let _prev_counters_state = Counters.reset () in
443 let (_start_counter_time, start_counters) = read_counters () in
444 let tally = ProcessFilesTally.empty in
445 let start_file_count = List.length progress.remaining in
446 let start_time = Unix.gettimeofday () in
448 let rec process_or_exit errors progress tally max_heap_mb =
449 (* If the major heap has exceeded the bounds, we decline to typecheck the remaining files.
450 We use [quick_stat] instead of [stat] in get_heap_size in order to avoid walking the major heap,
451 and we don't change the minor heap because it's small and fixed-size.
452 The start-remaining test is to make sure we make at least one file of progress
453 even in case of a crazy low memory cap. *)
454 let heap_mb = get_heap_size () in
455 let max_heap_mb = Int.max heap_mb max_heap_mb in
456 let cap = Option.value memory_cap ~default:Int.max_value in
457 let over_cap =
458 heap_mb > cap && start_file_count > List.length progress.remaining
460 let (exit_now, tally, heap_mb) =
461 if over_cap then
462 (true, ProcessFilesTally.incr_caps tally, heap_mb)
463 else
464 (false, tally, heap_mb)
466 match progress.remaining with
467 | [] -> (errors, progress, tally, heap_mb, max_heap_mb)
468 | _ when exit_now ->
469 let cgroup_stats = CGroup.get_stats () in
470 (match cgroup_stats with
471 | Error _ -> ()
472 | Ok { CGroup.total; _ } ->
473 Measure.sample "worker_cgroup_total" (float_of_int total));
474 (errors, progress, tally, heap_mb, max_heap_mb)
475 | fn :: fns ->
476 let (errors, deferred, tally) =
477 match fn with
478 | Check file ->
479 let process_file () =
480 process_file dynamic_view_files ctx errors file
482 let result =
483 if check_info.profile_log then (
484 let start_counters = read_counters () in
485 let result = process_file () in
486 let end_counters = read_counters () in
487 let second_run_end_counters =
488 if check_info.profile_type_check_twice then
489 (* we're running this routine solely for the side effect *)
490 (* of seeing how long it takes to run. *)
491 let _ignored = process_file () in
492 Some (read_counters ())
493 else
494 None
496 profile_log
497 ~check_info
498 ~start_counters
499 ~end_counters
500 ~second_run_end_counters
501 ~file
502 ~result;
503 result
504 ) else
505 process_file ()
507 let tally =
508 ProcessFilesTally.incr_checks tally result.deferred_decls
510 let deferred =
511 if List.is_empty result.deferred_decls then
513 else
514 List.map result.deferred_decls ~f:(fun fn -> Declare fn)
515 @ [Check { file with deferred_count = file.deferred_count + 1 }]
517 (result.errors, deferred, tally)
518 | Declare (_path, class_name) ->
519 let (_ : Decl_provider.class_decl option) =
520 Decl_provider.get_class ctx class_name
522 (errors, [], ProcessFilesTally.incr_decls tally)
523 | Prefetch paths ->
524 Vfs.prefetch paths;
525 (errors, [], ProcessFilesTally.incr_prefetches tally)
527 let progress =
529 completed = fn :: progress.completed;
530 remaining = fns;
531 deferred = List.concat [deferred; progress.deferred];
534 process_or_exit errors progress tally max_heap_mb
537 (* Process as many files as we can, and merge in their errors *)
538 let (errors, progress, tally, final_heap_mb, max_heap_mb) =
539 process_or_exit errors progress tally 0
542 (* Update edges *)
543 let new_dep_edges =
544 Typing_deps.flush_ideps_batch (Provider_context.get_deps_mode ctx)
546 let dep_edges = Typing_deps.merge_dep_edges dep_edges new_dep_edges in
548 (* Gather up our various forms of telemetry... *)
549 let (_end_counter_time, end_counters) = read_counters () in
550 (* Note: the 'add' operation (performed here, and also later in case of
551 MultiWorker.merge) will strip all non-numbers from telemetry. *)
552 let telemetry =
553 Telemetry.add
554 telemetry
555 (Telemetry.diff
556 ~all:false
557 ~suffix_keys:false
558 end_counters
559 ~prev:start_counters)
561 let processed_file_count =
562 start_file_count - List.length progress.remaining
564 let processed_file_fraction =
565 float_of_int processed_file_count /. float_of_int start_file_count
567 let record =
568 if List.is_empty progress.remaining then
569 jobs_finished_to_end
570 else
571 jobs_finished_early
573 let open ProcessFilesTally in
574 Measure.sample ~record "seconds" (Unix.gettimeofday () -. start_time);
575 Measure.sample ~record "final_heap_mb" (float_of_int final_heap_mb);
576 Measure.sample ~record "files" (float_of_int processed_file_count);
577 Measure.sample ~record "files_fraction" processed_file_fraction;
578 Measure.sample ~record "decls" (float_of_int tally.decls);
579 Measure.sample ~record "prefetches" (float_of_int tally.prefetches);
580 Measure.sample ~record "checks_done" (float_of_int tally.checks_done);
581 Measure.sample ~record "checks_deferred" (float_of_int tally.checks_deferred);
582 Measure.sample ~record "decls_deferred" (float_of_int tally.decls_deferred);
583 Measure.sample
584 ~record
585 "exceeded_cap_count"
586 (float_of_int tally.exceeded_cap_count);
587 Measure.sample ~record "max_heap_mb" (float_of_int max_heap_mb);
589 TypingLogger.flush_buffers ();
590 Ast_provider.local_changes_pop_sharedmem_stack ();
591 File_provider.local_changes_pop_sharedmem_stack ();
592 ( { errors; dep_edges; telemetry; jobs_finished_early; jobs_finished_to_end },
593 progress )
595 let load_and_process_files
596 (ctx : Provider_context.t)
597 (dynamic_view_files : Relative_path.Set.t)
598 (typing_result : typing_result)
599 (progress : computation_progress)
600 ~(memory_cap : int option)
601 ~(check_info : check_info) : typing_result * computation_progress =
602 (* When the type-checking worker receives SIGUSR1, display a position which
603 corresponds approximately with the function/expression being checked. *)
604 Sys_utils.set_signal
605 Sys.sigusr1
606 (Sys.Signal_handle Typing.debug_print_last_pos);
607 process_files
608 dynamic_view_files
610 typing_result
611 progress
612 ~memory_cap
613 ~check_info
615 (*****************************************************************************)
616 (* Let's go! That's where the action is *)
617 (*****************************************************************************)
619 (** Merge the results from multiple workers.
621 We don't really care about which files are left unchecked since we use
622 (gasp) mutation to track that, so combine the errors but always return an
623 empty list for the list of unchecked files. *)
624 let merge
625 ~(should_prefetch_deferred_files : bool)
626 (delegate_state : Delegate.state ref)
627 (files_to_process : file_computation BigList.t ref)
628 (files_initial_count : int)
629 (files_in_progress : file_computation Hash_set.t)
630 (files_checked_count : int ref)
631 ((produced_by_job : typing_result), (progress : progress))
632 (acc : typing_result) : typing_result =
633 let () =
634 match progress.kind with
635 | Progress -> ()
636 | DelegateProgress _ ->
637 delegate_state :=
638 Delegate.merge !delegate_state produced_by_job.errors progress.progress
640 let progress = progress.progress in
642 files_to_process := BigList.append progress.remaining !files_to_process;
644 (* Let's also prepend the deferred files! *)
645 files_to_process := BigList.append progress.deferred !files_to_process;
647 (* Prefetch the deferred files, if necessary *)
648 files_to_process :=
649 if should_prefetch_deferred_files && List.length progress.deferred > 10 then
650 let files_to_prefetch =
651 List.fold progress.deferred ~init:[] ~f:(fun acc computation ->
652 match computation with
653 | Declare (path, _) -> path :: acc
654 | _ -> acc)
656 BigList.cons (Prefetch files_to_prefetch) !files_to_process
657 else
658 !files_to_process;
660 (* If workers can steal work from each other, then it's possible that
661 some of the files that the current worker completed checking have already
662 been removed from the in-progress set. Thus, we should keep track of
663 how many type check computations we actually remove from the in-progress
664 set. Note that we also skip counting Declare and Prefetch computations,
665 since they are not relevant for computing how many files we've type
666 checked. *)
667 let completed_check_count =
668 List.fold
669 ~init:0
670 ~f:(fun acc computation ->
671 match Hash_set.Poly.strict_remove files_in_progress computation with
672 | Ok () ->
673 begin
674 match computation with
675 | Check _ -> acc + 1
676 | _ -> acc
678 | _ -> acc)
679 progress.completed
682 (* Deferred type check computations should be subtracted from completed
683 in order to produce an accurate count, because we requeued them, yet
684 they were also included in the completed list.
686 let is_check file =
687 match file with
688 | Check _ -> true
689 | _ -> false
691 let deferred_check_count = List.count ~f:is_check progress.deferred in
692 let completed_check_count = completed_check_count - deferred_check_count in
694 files_checked_count := !files_checked_count + completed_check_count;
695 let delegate_progress =
696 Typing_service_delegate.get_progress !delegate_state
698 ServerProgress.send_percentage_progress_to_monitor
699 ~operation:"typechecking"
700 ~done_count:!files_checked_count
701 ~total_count:files_initial_count
702 ~unit:"files"
703 ~extra:delegate_progress;
704 accumulate_job_output produced_by_job acc
706 let next
707 (workers : MultiWorker.worker list option)
708 (delegate_state : Delegate.state ref)
709 (files_to_process : file_computation BigList.t ref)
710 (files_in_progress : file_computation Hash_set.Poly.t)
711 (record : Measure.record) =
712 let max_size = Bucket.max_size () in
713 let num_workers =
714 match workers with
715 | Some w -> List.length w
716 | None -> 1
718 let return_bucket_job kind ~current_bucket ~remaining_jobs =
719 (* Update our shared mutable state, because hey: it's not like we're
720 writing OCaml or anything. *)
721 files_to_process := remaining_jobs;
722 List.iter ~f:(Hash_set.Poly.add files_in_progress) current_bucket;
723 Bucket.Job
725 kind;
726 progress = { completed = []; remaining = current_bucket; deferred = [] };
729 fun () ->
730 Measure.time ~record "time" @@ fun () ->
731 let (state, delegate_job) =
732 Typing_service_delegate.next
733 !files_to_process
734 files_in_progress
735 !delegate_state
737 delegate_state := state;
739 let (stolen, state) = Typing_service_delegate.steal state max_size in
740 (* If a delegate job is returned, then that means that it should be done
741 by the next MultiWorker worker (the one for whom we're creating a job
742 in this function). If the delegate job is None, then the regular (local
743 type checking) logic applies. *)
744 match delegate_job with
745 | Some { current_bucket; remaining_jobs; job } ->
746 return_bucket_job (DelegateProgress job) current_bucket remaining_jobs
747 | None ->
748 (* WARNING: the following List.length is costly - for a full init, files_to_process starts
749 out as the size of the entire repo, and we're traversing the entire list. *)
750 let files_to_process_length = BigList.length !files_to_process in
751 (match (files_to_process_length, stolen) with
752 | (0, []) when Hash_set.Poly.is_empty files_in_progress -> Bucket.Done
753 | (0, []) -> Bucket.Wait
754 | (_, stolen_jobs) ->
755 let jobs =
756 if files_to_process_length > List.length stolen_jobs then
757 !files_to_process
758 else begin
759 Hh_logger.log
760 "Steal payload from local workers: %d jobs"
761 (List.length stolen_jobs);
762 delegate_state := state;
763 let stolen_jobs =
764 List.map stolen_jobs ~f:(fun job ->
765 Hash_set.Poly.remove files_in_progress job;
766 match job with
767 | Check { path; deferred_count } ->
768 Check { path; deferred_count = deferred_count + 1 }
769 | _ -> failwith "unexpected state")
771 BigList.rev_append stolen_jobs !files_to_process
774 begin
775 match num_workers with
776 (* When num_workers is zero, the execution mode is delegate-only, so we give an empty bucket to MultiWorker for execution. *)
777 | 0 -> return_bucket_job Progress [] jobs
778 | _ ->
779 let bucket_size =
780 Bucket.calculate_bucket_size
781 ~num_jobs:files_to_process_length
782 ~num_workers
783 ~max_size
785 let (current_bucket, remaining_jobs) =
786 BigList.split_n jobs bucket_size
788 return_bucket_job Progress current_bucket remaining_jobs
789 end)
791 let on_cancelled
792 (next : unit -> 'a Bucket.bucket)
793 (files_to_process : 'b Hash_set.Poly.elt BigList.t ref)
794 (files_in_progress : 'b Hash_set.Poly.t) : unit -> 'a list =
795 fun () ->
796 (* The size of [files_to_process] is bounded only by repo size, but
797 [files_in_progress] is capped at [(worker count) * (max bucket size)]. *)
798 files_to_process :=
799 BigList.append (Hash_set.Poly.to_list files_in_progress) !files_to_process;
800 let rec add_next acc =
801 match next () with
802 | Bucket.Job j -> add_next (j :: acc)
803 | Bucket.Wait
804 | Bucket.Done ->
807 add_next []
810 `next` and `merge` both run in the master process and update mutable
811 state in order to track work in progress and work remaining.
812 `job` runs in each worker and does not have access to this mutable state.
814 let process_in_parallel
815 (ctx : Provider_context.t)
816 (dynamic_view_files : Relative_path.Set.t)
817 (workers : MultiWorker.worker list option)
818 (delegate_state : Delegate.state)
819 (telemetry : Telemetry.t)
820 (fnl : file_computation BigList.t)
821 ~(interrupt : 'a MultiWorker.interrupt_config)
822 ~(memory_cap : int option)
823 ~(check_info : check_info) :
824 typing_result * Delegate.state * Telemetry.t * 'a * Relative_path.t list =
825 let record = Measure.create () in
826 (* [record] is used by [next] *)
827 let delegate_state = ref delegate_state in
828 let files_to_process = ref fnl in
829 let files_in_progress = Hash_set.Poly.create () in
830 let files_processed_count = ref 0 in
831 let files_initial_count = BigList.length fnl in
832 let delegate_progress =
833 Typing_service_delegate.get_progress !delegate_state
835 ServerProgress.send_percentage_progress_to_monitor
836 ~operation:"typechecking"
837 ~done_count:0
838 ~total_count:files_initial_count
839 ~unit:"files"
840 ~extra:delegate_progress;
842 let next =
843 next workers delegate_state files_to_process files_in_progress record
845 let should_prefetch_deferred_files =
846 Vfs.is_vfs ()
847 && TypecheckerOptions.prefetch_deferred_files
848 (Provider_context.get_tcopt ctx)
850 let job =
851 load_and_process_files ctx dynamic_view_files ~memory_cap ~check_info
853 let job (typing_result : typing_result) (progress : progress) =
854 let (typing_result, computation_progress) =
855 match progress.kind with
856 | Progress -> job typing_result progress.progress
857 | DelegateProgress job -> Delegate.process job
859 (typing_result, { progress with progress = computation_progress })
861 let (typing_result, env, cancelled_results) =
862 MultiWorker.call_with_interrupt
863 workers
864 ~job
865 ~neutral:(neutral ())
866 ~merge:
867 (merge
868 ~should_prefetch_deferred_files
869 delegate_state
870 files_to_process
871 files_initial_count
872 files_in_progress
873 files_processed_count)
874 ~next
875 ~on_cancelled:(on_cancelled next files_to_process files_in_progress)
876 ~interrupt
878 let telemetry =
879 Typing_service_delegate.add_telemetry !delegate_state telemetry
880 |> Telemetry.object_
881 ~key:"next"
882 ~value:(Measure.stats_to_telemetry ~record ())
884 let paths_of (cancelled_results : progress list) : Relative_path.t list =
885 let paths_of (cancelled_progress : progress) =
886 let cancelled_computations = cancelled_progress.progress.remaining in
887 let paths_of paths (cancelled_computation : file_computation) =
888 match cancelled_computation with
889 | Check { path; _ } -> path :: paths
890 | _ -> paths
892 List.fold cancelled_computations ~init:[] ~f:paths_of
894 List.concat (List.map cancelled_results ~f:paths_of)
896 (typing_result, !delegate_state, telemetry, env, paths_of cancelled_results)
898 type ('a, 'b, 'c, 'd) job_result = 'a * 'b * 'c * 'd * Relative_path.t list
900 module type Mocking_sig = sig
901 val with_test_mocking :
902 (* real job payload, that we can modify... *)
903 file_computation BigList.t ->
904 ((* ... before passing it to the real job executor... *)
905 file_computation BigList.t ->
906 ('a, 'b, 'c, 'd) job_result) ->
907 (* ... whose output we can also modify. *)
908 ('a, 'b, 'c, 'd) job_result
911 module NoMocking = struct
912 let with_test_mocking fnl f = f fnl
915 module TestMocking = struct
916 let cancelled = ref Relative_path.Set.empty
918 let set_is_cancelled x = cancelled := Relative_path.Set.add !cancelled x
920 let is_cancelled x = Relative_path.Set.mem !cancelled x
922 let with_test_mocking fnl f =
923 let (mock_cancelled, fnl) =
924 List.partition_map (BigList.as_list fnl) ~f:(fun computation ->
925 match computation with
926 | Check { path; _ } ->
927 if is_cancelled path then
928 `Fst path
929 else
930 `Snd computation
931 | _ -> `Snd computation)
933 (* Only cancel once to avoid infinite loops *)
934 cancelled := Relative_path.Set.empty;
935 let (res, delegate_state, telemetry, env, cancelled) =
936 f (BigList.create fnl)
938 (res, delegate_state, telemetry, env, mock_cancelled @ cancelled)
941 module Mocking =
942 ( val if Injector_config.use_test_stubbing then
943 (module TestMocking : Mocking_sig)
944 else
945 (module NoMocking : Mocking_sig) )
947 let should_process_sequentially
948 (opts : TypecheckerOptions.t) (fnl : file_computation BigList.t) : bool =
949 (* If decls can be deferred, then we should process in parallel, since
950 we are likely to have more computations than there are files to type check. *)
951 let defer_threshold =
952 TypecheckerOptions.defer_class_declaration_threshold opts
954 let parallel_threshold =
955 TypecheckerOptions.parallel_type_checking_threshold opts
957 match (defer_threshold, BigList.length fnl) with
958 | (None, file_count) when file_count < parallel_threshold -> true
959 | _ -> false
961 let go_with_interrupt
962 (ctx : Provider_context.t)
963 (workers : MultiWorker.worker list option)
964 (delegate_state : Delegate.state)
965 (telemetry : Telemetry.t)
966 (dynamic_view_files : Relative_path.Set.t)
967 (fnl : Relative_path.t list)
968 ~(interrupt : 'a MultiWorker.interrupt_config)
969 ~(memory_cap : int option)
970 ~(check_info : check_info)
971 ~(profiling : CgroupProfiler.Profiling.t) :
972 (Errors.t, Delegate.state, Telemetry.t, 'a) job_result =
973 let opts = Provider_context.get_tcopt ctx in
974 let sample_rate = GlobalOptions.tco_typecheck_sample_rate opts in
975 let fnl = BigList.create fnl in
976 let fnl =
977 if sample_rate >= 1.0 then
979 else
980 let result =
981 BigList.filter
982 ~f:(fun x ->
983 float (Base.String.hash (Relative_path.suffix x) mod 1000000)
984 <= sample_rate *. 1000000.0)
987 Hh_logger.log
988 "Sampling %f percent of files: %d out of %d"
989 sample_rate
990 (BigList.length result)
991 (BigList.length fnl);
992 result
994 let fnl =
995 BigList.map fnl ~f:(fun path -> Check { path; deferred_count = 0 })
997 Mocking.with_test_mocking fnl @@ fun fnl ->
998 let (typing_result, delegate_state, telemetry, env, cancelled_fnl) =
999 if should_process_sequentially opts fnl then begin
1000 Hh_logger.log "Type checking service will process files sequentially";
1001 let progress =
1002 { completed = []; remaining = BigList.as_list fnl; deferred = [] }
1004 let (typing_result, _progress) =
1005 process_files
1006 dynamic_view_files
1008 (neutral ())
1009 progress
1010 ~memory_cap:None
1011 ~check_info
1013 ( typing_result,
1014 delegate_state,
1015 telemetry,
1016 interrupt.MultiThreadedCall.env,
1017 [] )
1018 end else begin
1019 Hh_logger.log "Type checking service will process files in parallel";
1020 let workers =
1021 match (workers, TypecheckerOptions.num_local_workers opts) with
1022 | (Some workers, Some num_local_workers) ->
1023 let (workers, _) = List.split_n workers num_local_workers in
1024 Some workers
1025 | (None, _)
1026 | (_, None) ->
1027 workers
1029 process_in_parallel
1031 dynamic_view_files
1032 workers
1033 delegate_state
1034 telemetry
1036 ~interrupt
1037 ~memory_cap
1038 ~check_info
1041 Typing_deps.register_discovered_dep_edges typing_result.dep_edges;
1042 let cgroup_total_max =
1043 Base.Option.value ~default:0.0 (Measure.get_max "worker_cgroup_total")
1045 CgroupProfiler.Profiling.record_stats
1046 ~profiling
1047 ~stage:"type check"
1048 ~metric:"cgroup_total"
1049 ~value:cgroup_total_max;
1051 if check_info.profile_log then
1052 Hh_logger.log
1053 "Typecheck perf: %s"
1054 (HackEventLogger.ProfileTypeCheck.get_telemetry_url
1055 ~init_id:check_info.init_id
1056 ~recheck_id:check_info.recheck_id);
1057 let job_size_telemetry =
1058 Telemetry.create ()
1059 |> Telemetry.object_
1060 ~key:"finished_to_end"
1061 ~value:
1062 (Measure.stats_to_telemetry
1063 ~record:typing_result.jobs_finished_to_end
1065 |> Telemetry.object_
1066 ~key:"finished_early"
1067 ~value:
1068 (Measure.stats_to_telemetry
1069 ~record:typing_result.jobs_finished_early
1072 let telemetry =
1073 telemetry
1074 |> Telemetry.object_ ~key:"profiling_info" ~value:typing_result.telemetry
1075 |> Telemetry.object_ ~key:"job_sizes" ~value:job_size_telemetry
1077 (typing_result.errors, delegate_state, telemetry, env, cancelled_fnl)
1079 let go
1080 ?(profiling : CgroupProfiler.Profiling.t = CgroupProfiler.Profiling.empty)
1081 (ctx : Provider_context.t)
1082 (workers : MultiWorker.worker list option)
1083 (delegate_state : Delegate.state)
1084 (telemetry : Telemetry.t)
1085 (dynamic_view_files : Relative_path.Set.t)
1086 (fnl : Relative_path.t list)
1087 ~(memory_cap : int option)
1088 ~(check_info : check_info) : Errors.t * Delegate.state * Telemetry.t =
1089 let interrupt = MultiThreadedCall.no_interrupt () in
1090 let (res, delegate_state, telemetry, (), cancelled) =
1091 go_with_interrupt
1093 workers
1094 delegate_state
1095 telemetry
1096 dynamic_view_files
1098 ~interrupt
1099 ~memory_cap
1100 ~check_info
1101 ~profiling
1103 assert (List.is_empty cancelled);
1104 (res, delegate_state, telemetry)