Watchman-based revision tracking in Hack Monitor Informant
[hiphop-php.git] / hphp / hack / src / server / hhMonitorInformant.ml
blob4bf1f16e380482fbd4bb7020b60eb7b7db90a76b
1 (**
2 * Copyright (c) 2016, Facebook, Inc.
3 * All rights reserved.
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the "hack" directory of this source tree. An additional grant
7 * of patent rights can be found in the PATENTS file in the same directory.
9 *)
11 include HhMonitorInformant_sig.Types
13 (**
14 * The Revision tracker tracks the latest known SVN Revision of the repo,
15 * the corresponding SVN revisions of Hg revisions, and the sequence of
16 * revision changes (from hg update). See record type "env" below.
18 * This machinery is necessary because Watchman state change events give
19 * us only the HG Revisions of hg updates, but we need to make decisions
20 * on their SVN Revision numbers.
22 * We want to be able to:
23 * 1) Determine when the base SVN revision (in trunk) has changed
24 * "significantly" so we can inform the server monitor to trigger
25 * a server restart (since incremental typechecks can be slower
26 * than a fresh server using a saved state).
27 * 2) Fulfill goal 1 without blocking, while remaining "highly responsive".
29 * The definition of "significant" can be adjusted according to how fast
30 * incremental type checking is compared to a fresh restart.
32 * Being "highly responsive" roughly means using a cache
33 * for SVN revisions (because a Mercurial request to get the SVN revision
34 * number of an HG Revision is very slow, on the order of seconds).
35 * Consider the following scenario:
37 * Server is running with Repo at Revision 100. User does hg update to
38 * Rev 500 (which triggers a server restart) and works on that for
39 * an hour. Then the user hg updates back to Rev 100.
41 * At this point, the server restart should be triggered because we
42 * are moving across hundreds of revisions. Without caching the Revision
43 * 100 result, the restart would be delayed by seconds. With a cache, the
44 * restart is triggered immediately.
46 module Revision_tracker = struct
(** Smallest SVN revision distance (between the tracked base revision and a
 * newly observed one) that can trigger a server restart. The value is a
 * heuristic; lowering it makes restarts more eager, which pays off as fresh
 * servers gain on incremental typechecking. *)
let restart_min_svn_distance : int = 100
(** Wall-clock seconds, as produced by [Unix.time]. *)
type timestamp = float
type env = {
  (** Current Watchman handle; overwritten with the handle returned by each
   * [Watchman.get_changes] call. *)
  watchman : Watchman.watchman_instance ref;
  (** Repository root, handed to the Hg queries. *)
  root : Path.t;
  (** SVN revision the repository is currently believed to be based on.
   * Moved forward as state exits are processed. *)
  current_base_revision : int ref;
  (** Cache of running and finished queries, keyed by HG revision. Each
   * query resolves to the SVN revision corresponding to that HG revision. *)
  queries : (Hg.hg_rev, (Hg.svn_rev Future.t)) Hashtbl.t;
  (** Arrival time and HG revision of every pending state-exit event.
   *
   * Why keep the whole sequence instead of consuming the Watchman
   * subscription one event at a time? Consider many back-to-back hg
   * updates where only the last one is far away (significant) and only
   * the last one has a cached query result.
   *
   * Handling one event at a time, we would wait seconds on every
   * intermediate hg query before reaching the final one, delaying the
   * server restart by all of that time.
   *
   * Instead, new exits coming from the subscription are "preprocessed"
   * before being appended here, so that final significant update can be
   * spotted early and a restart triggered proactively. *)
  state_exits : (timestamp * Hg.hg_rev) Queue.t;
}
(** Tracker lifecycle. [Initializing] holds the Watchman handle, the repo
 * root and the pending query for the working copy's base SVN revision;
 * once that query resolves, the tracker becomes [Tracking] with a full
 * [env]. *)
type instance =
  | Initializing of Watchman.watchman_instance * Path.t * (Hg.svn_rev Future.t)
  | Tracking of env
(** An "hg.update" Watchman state transition, carrying the HG revision taken
 * from the event payload. *)
type change =
  | Hg_update_enter of Hg.hg_rev
    (** The "hg.update" state was entered. *)
  | Hg_update_exit of Hg.hg_rev
    (** The "hg.update" state was left. *)
(** Handle given to callers. The tracker is stateful anyway, so it owns its
 * own lifecycle: the reference is advanced from [Initializing] to
 * [Tracking] by [make_report]. *)
type t = instance ref
(** Create a tracker in the [Initializing] state and start the query for the
 * SVN revision the working copy at [root] is based on. *)
let init watchman root =
  let root_str = Path.to_string root in
  let base_rev_query = Hg.current_working_copy_base_rev root_str in
  ref (Initializing (watchman, root, base_rev_query))
(** Wrap [Future.get], turning the query's textual output into an SVN
 * revision number.
 *
 * Returns 0 if the process exits abnormally ([Future_sig.Process_failure]),
 * or if its output, after trimming surrounding whitespace, is not an
 * integer. The parse failure case is logged together with the raw output.
 *
 * Callers in this module only invoke it once [Future.is_ready] holds. *)
let svn_rev_of_future future =
  let parse_svn_rev svn_rev =
    (* Catch every [Failure]: the message carried by the exception raised by
     * [int_of_string] is informational only, and matching on its exact text
     * is fragile (compiler warning 52). Trimming guards against stray
     * whitespace (e.g. a trailing newline) in process output. *)
    try int_of_string (String.trim svn_rev)
    with Failure _ ->
      Hh_logger.log "Revision_tracker failed to parse svn_rev: %s" svn_rev;
      0
  in
  try parse_svn_rev (Future.get future)
  with Future_sig.Process_failure _ -> 0
(** Fresh tracking state based at [base_svn_rev]: no cached queries and no
 * pending state exits. *)
let active_env watchman root base_svn_rev =
  let queries = Hashtbl.create 200 in
  let state_exits = Queue.create () in
  {
    watchman = ref watchman;
    root;
    current_base_revision = ref base_svn_rev;
    queries;
    state_exits;
  }
(** Absolute number of SVN revisions between [svn_rev] and the tracked base
 * revision. *)
let get_distance svn_rev env =
  let base = !(env.current_base_revision) in
  abs (svn_rev - base)
(** Worker for the [churn_exits] wrapper defined right after it. [acc]
 * records whether any exit processed so far calls for a restart.
 *
 * The emptiness check replaces a [try ... with Queue.Empty] that used to
 * wrap the recursive call. That handler made the recursion non-tail (one
 * handler frame per queued exit); now the recursive call is in tail
 * position. *)
let rec churn_exits env acc =
  if Queue.is_empty env.state_exits then acc
  else begin
    let timestamp, hg_rev = Queue.peek env.state_exits in
    (* A query is always added before its hg_rev is pushed onto
     * state_exits (see [handle_change]), so this lookup should succeed. *)
    let future = Hashtbl.find env.queries hg_rev in
    if not (Future.is_ready future) then acc
    else begin
      ignore (Queue.pop env.state_exits : timestamp * Hg.hg_rev);
      let svn_rev = svn_rev_of_future future in
      let distance = float_of_int (get_distance svn_rev env) in
      let elapsed = Unix.time () -. timestamp in
      (* Allow up to 2 revisions per second for incremental checking; a
       * faster rate beyond the minimum distance prefers a server restart. *)
      let should_restart =
        distance > float_of_int restart_min_svn_distance
        && distance /. elapsed > 2.0
      in
      (* The repo has moved to [svn_rev]. This must happen after [distance]
       * is computed against the previous base. *)
      env.current_base_revision := svn_rev;
      churn_exits env (acc || should_restart)
    end
  end

(** Pop state exits from the front of the queue for as long as their
 * SVN-revision queries are ready, moving the tracked base revision along
 * with each one. Stops at the first exit whose query is still running.
 *
 * Returns true if any processed exit should trigger a restart.
 *
 * Non-blocking: only ready futures are consumed. *)
let churn_exits env = churn_exits env false
(** Ensure a closest-SVN-ancestor query exists for [hg_rev]. An existing
 * entry, running or finished, is reused untouched. *)
let maybe_add_query hg_rev env =
  if not (Hashtbl.mem env.queries hg_rev) then begin
    let repo = Path.to_string env.root in
    Hashtbl.add env.queries hg_rev (Hg.get_closest_svn_ancestor hg_rev repo)
  end
(** Drop every pending state exit; the query cache is left untouched. *)
let purge_exits env =
  Queue.clear env.state_exits
(** Drain all ready state exits first. If any of them warrants a restart,
 * clear the queue and report [Restart_server]; [change] is not recorded in
 * that case.
 *
 * Otherwise record [change]: both enter and exit events make sure a query
 * exists for their HG revision, and exits are also queued with their
 * arrival time. The result is then [Move_along]. *)
let handle_change change env =
  if churn_exits env then begin
    purge_exits env;
    Informant_sig.Restart_server
  end else begin
    (match change with
     | None -> ()
     | Some (Hg_update_enter hg_rev) -> maybe_add_query hg_rev env
     | Some (Hg_update_exit hg_rev) ->
       maybe_add_query hg_rev env;
       Queue.push (Unix.time (), hg_rev) env.state_exits);
    Informant_sig.Move_along
  end
(** Read the "rev" string field of a Watchman state payload.
 * Returns [None] when there is no payload. If "rev" cannot be read as a
 * string, the payload is logged and [None] is returned. *)
let parse_json json =
  match json with
  | None -> None
  | Some json ->
    let open Hh_json.Access in
    match return json >>= get_string "rev" with
    | Result.Ok (v, _) -> Some v
    | Result.Error _ ->
      Hh_logger.log
        "Revision_tracker failed to get rev in json: %s"
        (Hh_json.json_to_string json);
      None
(** Poll Watchman once, storing the returned handle back into [env], and
 * translate the result into a [change]. Only pushed "hg.update" state
 * enter/leave events whose payload has a readable "rev" produce [Some _];
 * everything else yields [None]. *)
let get_change env =
  let watchman, response = Watchman.get_changes !(env.watchman) in
  env.watchman := watchman;
  let hg_update json make =
    match parse_json json with
    | None -> None
    | Some hg_rev -> Some (make hg_rev)
  in
  match response with
  | Watchman.Watchman_pushed (Watchman.State_enter (state, json))
    when state = "hg.update" ->
    hg_update json (fun rev -> Hg_update_enter rev)
  | Watchman.Watchman_pushed (Watchman.State_leave (state, json))
    when state = "hg.update" ->
    hg_update json (fun rev -> Hg_update_exit rev)
  | Watchman.Watchman_pushed _
  | Watchman.Watchman_unavailable
  | Watchman.Watchman_synchronous _ ->
    None
(** Inspect a freshly received [change] before it is queued, so a
 * significant hg update can trigger a restart immediately instead of
 * waiting behind earlier state exits (see the docs on the [state_exits]
 * field of [env]).
 *
 * Only an [Hg_update_exit] whose SVN-revision query is already cached and
 * ready is considered. In that case the tracked base revision is moved to
 * that SVN revision, and the result is true iff the jump from the previous
 * base exceeds [restart_min_svn_distance]. Every other case returns false
 * without modifying [env]. Non-blocking. *)
let preprocess change env =
  match change with
  | None
  | Some (Hg_update_enter _) -> false
  | Some (Hg_update_exit hg_rev) ->
    (* The query may not be cached yet: [process] calls this before
     * [handle_change] has had a chance to add one. *)
    begin try
      let future = Hashtbl.find env.queries hg_rev in
      if Future.is_ready future then begin
        let svn_rev = svn_rev_of_future future in
        (* Measure against the previous base before overwriting it. *)
        let distance = get_distance svn_rev env in
        env.current_base_revision := svn_rev;
        (* Plain distance threshold against [restart_min_svn_distance];
         * unlike [churn_exits], no revisions-per-second check is applied. *)
        distance > restart_min_svn_distance
      end else
        false
    with Not_found -> false
    end
(** Run one step of the tracker. Must not block: it only creates futures
 * and consumes futures that are already ready.
 *
 * Steps:
 * 1) Fetch one state change event from Watchman.
 * 2) Preprocess it, possibly triggering a server restart early.
 * 3) Otherwise, add a query for it if needed.
 * 4) Append a state exit to the state_exits queue.
 * 5) Drain the state_exits queue, processing every ready result.
 * An early restart in step 2 also clears the pending state exits. *)
let process env =
  let change = get_change env in
  if not (preprocess change env) then
    handle_change change env
  else begin
    purge_exits env;
    Informant_sig.Restart_server
  end
(** Produce the next report from tracker [t].
 *
 * While the base-revision query is still running, the tracker stays
 * [Initializing] and reports [Move_along]. Once the query is ready, [t]
 * switches to [Tracking] with a fresh env based at that revision, and a
 * [process] step runs immediately. *)
let make_report t =
  match !t with
  | Tracking env -> process env
  | Initializing (watchman, root, future) ->
    if not (Future.is_ready future) then Informant_sig.Move_along
    else begin
      let svn_rev = svn_rev_of_future future in
      Hh_logger.log "Initialized Revision_tracker to SVN rev: %d" svn_rev;
      let env = active_env watchman root svn_rev in
      t := Tracking env;
      process env
    end
type env = {
  (** An active informant produces its reports by polling this tracker. *)
  revision_tracker : Revision_tracker.t;
}
type t =
  | Active of env
    (** Informant is running; reports come from its revision tracker. *)
  | Resigned
    (** Informant is not running: either [allow_subscriptions] was false
     * when it was initialized, or Watchman failed to initialize. *)
(** Create the informant. It resigns unless subscriptions are allowed and
 * Watchman (in [Drop_changes] subscription mode) initializes successfully.
 * In that case it becomes [Active] with a revision tracker on [root]. *)
let init { root; allow_subscriptions; } =
  if allow_subscriptions then begin
    let watchman_opts = {
      Watchman.subscribe_mode = Some Watchman.Drop_changes;
      init_timeout = 30;
      sync_directory = "";
      root;
    } in
    match Watchman.init watchman_opts with
    | Some watchman_env ->
      Active {
        revision_tracker =
          Revision_tracker.init (Watchman.Watchman_alive watchman_env) root;
      }
    | None -> Resigned
  end else
    Resigned
(** Ask the informant for its next decision. A resigned informant always
 * answers [Move_along]. *)
let report = function
  | Active { revision_tracker } ->
    Revision_tracker.make_report revision_tracker
  | Resigned -> Informant_sig.Move_along