Timeout.select
[hiphop-php.git] / hphp / hack / src / utils / sys / timeout.ml
blob867ccbe9b68fdd83db444116c1277675756409ce
1 (**
2 * Copyright (c) 2015, Facebook, Inc.
3 * All rights reserved.
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the "hack" directory of this source tree. An additional grant
7 * of patent rights can be found in the PATENTS file in the same directory.
9 *)
(* Carries the id of the timeout that fired. *)
exception Timeout of int

(* The ids let a handler tell timeout A firing apart from timeout B firing, so
 * they only need to be unique among the timeouts that are active at the same
 * time in one process. *)
let id_counter = ref 0

(* Hand out the next id: the counter is bumped first, so the first id returned
 * is 1. *)
let mk_id () =
  let next = !id_counter + 1 in
  id_counter := next;
  next
module Alarm_timeout = struct

  (** Timeout *)

  (* A timeout is represented by nothing more than its id. *)
  type t = int

  (* Run [do_ id] while a timer set through Timer.set_timer is armed for
   * [timeout] seconds. The timer callback raises [Timeout id]; when that
   * exception escapes, [on_timeout ()] supplies the result instead. The timer
   * is cancelled whether [do_] returns normally or raises. A [Timeout] carrying
   * a different id is not handled here and propagates to the caller. *)
  let with_timeout ~timeout ~on_timeout ~do_ =
    let id = mk_id () in
    let fire () = raise (Timeout id) in
    let run () =
      let timer =
        Timer.set_timer ~interval:(float_of_int timeout) ~callback:fire in
      match do_ id with
      | result ->
        Timer.cancel_timer timer;
        result
      | exception exn ->
        (* Any uncaught exception will cancel the timeout *)
        Timer.cancel_timer timer;
        raise exn
    in
    try run ()
    with Timeout exn_id when exn_id = id -> on_timeout ()

  (* Nothing to check in this implementation: the timer callback is what
   * raises. *)
  let check_timeout _ = ()

  (* The optional timeout is ignored; this is Sys_utils.select_non_intr. *)
  let select ?timeout:_ = Sys_utils.select_non_intr

  (** Channel *)

  (* A regular input channel paired with the pid of the process feeding it,
   * if it was opened by [open_process] / [open_process_in]. *)
  type in_channel = Pervasives.in_channel * int option

  (* Adapt a plain channel reader [f] to the timeout-aware calling convention:
   * the optional timeout and the pid are both discarded. *)
  let ignore_timeout f ?timeout:_ (ic, _pid) = f ic

  let input = ignore_timeout Pervasives.input
  let really_input = ignore_timeout Pervasives.really_input
  let input_char = ignore_timeout Pervasives.input_char
  let input_line = ignore_timeout Pervasives.input_line

  let input_value_with_workaround ic =
    (* OCaml 4.03.0 changed the behavior of input_value to no longer
     * throw End_of_file when the pipe has closed. We can simulate that
     * behavior, however, by trying to read a byte afterwards, which WILL
     * raise End_of_file if the pipe has closed
     * http://caml.inria.fr/mantis/view.php?id=7142 *)
    match Pervasives.input_value ic with
    | value -> value
    | exception (Failure msg as e) ->
      if msg = "input_value: truncated object" then
        ignore (Pervasives.input_char ic);
      raise e

  let input_value = ignore_timeout input_value_with_workaround

  let open_in name = (Pervasives.open_in name, None)
  let close_in (ic, _) = Pervasives.close_in ic
  let close_in_noerr (ic, _) = Pervasives.close_in_noerr ic
  let in_channel_of_descr fd = (Unix.in_channel_of_descr fd, None)
  let descr_of_in_channel (ic, _) = Unix.descr_of_in_channel ic

  (* Spawn [cmd args] with its stdin and stdout connected to two fresh pipes
   * (stderr is shared with this process). Returns the channel reading the
   * child's stdout, tagged with the child's pid, and the channel writing to
   * the child's stdin. *)
  let open_process cmd args =
    let (child_stdin, to_child) = Unix.pipe () in
    let (from_child, child_stdout) = Unix.pipe () in
    (* Our ends of the pipes are marked close-on-exec. *)
    Unix.set_close_on_exec from_child;
    Unix.set_close_on_exec to_child;
    let pid =
      Unix.create_process cmd args child_stdin child_stdout Unix.stderr in
    (* The child's ends are closed in this process once it has been created. *)
    Unix.close child_stdout;
    Unix.close child_stdin;
    let ic = Unix.in_channel_of_descr from_child in
    let oc = Unix.out_channel_of_descr to_child in
    ((ic, Some pid), oc)

  (* Same as [open_process], except that the write end of the child's stdin
   * pipe is closed before the child is created and only the input channel is
   * returned. *)
  let open_process_in cmd args =
    let (child_stdin, to_child) = Unix.pipe () in
    let (from_child, child_stdout) = Unix.pipe () in
    Unix.set_close_on_exec from_child;
    Unix.set_close_on_exec to_child;
    Unix.close to_child;
    let pid =
      Unix.create_process cmd args child_stdin child_stdout Unix.stderr in
    Unix.close child_stdout;
    Unix.close child_stdin;
    (Unix.in_channel_of_descr from_child, Some pid)

  (* Close a channel obtained from [open_process] / [open_process_in] and
   * return the status reported by Sys_utils.waitpid_non_intr for its pid.
   * Raises Invalid_argument when the channel has no pid attached. *)
  let close_process_in = function
    | (_, None) -> invalid_arg "Timeout.close_process_in"
    | (ic, Some pid) ->
      Pervasives.close_in ic;
      let (_, status) = Sys_utils.waitpid_non_intr [] pid in
      status

  (* Spawn [cmd args] and run [reader id ic oc] under [with_timeout]. If the
   * reader raises (a [Timeout] included), both channels are closed before the
   * exception is re-raised. On success the channels are left open. *)
  let read_process ~timeout ~on_timeout ~reader cmd args =
    let (ic, oc) = open_process cmd args in
    let guarded id =
      match reader id ic oc with
      | result -> result
      | exception exn ->
        close_in ic;
        close_out oc;
        raise exn
    in
    with_timeout ~timeout ~on_timeout ~do_:guarded

  (* The optional timeout is ignored; this is Unix.open_connection with the
   * input channel tagged as having no pid. *)
  let open_connection ?timeout:_ sockaddr =
    let (ic, oc) = Unix.open_connection sockaddr in
    ((ic, None), oc)

  let shutdown_connection (ic, _) =
    Unix.shutdown_connection ic

  (* True only for a [Timeout] exception whose id equals [id]. *)
  let is_timeout_exn id = function
    | Timeout exn_id -> exn_id = id
    | _ -> false

end
module Select_timeout = struct

  (** Timeout *)

  (* [timeout] is an absolute deadline on the Unix.gettimeofday clock; [id]
   * identifies this timeout in the [Timeout] exception. *)
  type t = {
    timeout: float;
    id: int;
  }

  (* Build a timeout whose deadline is [timeout] seconds from now. *)
  let create timeout =
    { timeout = Unix.gettimeofday () +. timeout; id = mk_id () }

  (* Run [do_ t] with a deadline [timeout] seconds away. Nothing interrupts
   * [do_]: the deadline is only enforced where [t] is consulted
   * ([check_timeout], [select] and the readers built on it). If [Timeout]
   * with this timeout's id escapes, [on_timeout ()] supplies the result. *)
  let with_timeout ~timeout ~on_timeout ~do_ =
    let t = create (float timeout) in
    try do_ t
    with Timeout exn_id when exn_id = t.id -> on_timeout ()

  (* Raise [Timeout] if the deadline has passed. *)
  let check_timeout t =
    if Unix.gettimeofday () > t.timeout then raise (Timeout t.id)

  (** Channel *)

  (* A hand-rolled buffered input channel over a file descriptor.
   * Bytes [curr, max) of [buf] are buffered but not yet consumed.
   * [pid] is set for channels created by [open_process] /
   * [open_process_in]. *)
  type channel = {
    fd: Unix.file_descr;
    buf: Bytes.t;
    mutable curr: int;
    mutable max: int;
    mutable pid: int option;
  }

  type in_channel = channel

  let buffer_size = 65536-9 (* From ocaml/byterun/io.h *)

  let in_channel_of_descr fd =
    let buf = Bytes.create buffer_size in
    { fd; buf; curr = 0; max = 0; pid = None }

  let descr_of_in_channel { fd; _ } = fd

  let open_in name =
    let fd = Unix.openfile name [Unix.O_RDONLY; Unix.O_NONBLOCK] 0o640 in
    in_channel_of_descr fd

  let close_in tic = Unix.close tic.fd

  (* Best-effort close. Only errors reported by Unix.close are ignored, so
   * unrelated exceptions are no longer swallowed by a catch-all handler. *)
  let close_in_noerr tic =
    try Unix.close tic.fd
    with Unix.Unix_error _ -> ()

  (* Close the channel and return the status reported by
   * Sys_utils.waitpid_non_intr for the attached pid. Raises Invalid_argument
   * when the channel has no pid. *)
  let close_process_in tic =
    match tic.pid with
    | None -> invalid_arg "Timeout.close_process_in"
    | Some pid ->
      close_in tic;
      snd (Sys_utils.waitpid_non_intr [] pid)

  (* A negative timeout for select means block until a fd is ready *)
  let no_select_timeout = ~-.1.0

  (* A wrapper around Sys_utils.select_non_intr. If timeout would fire before
   * the select's timeout, then change the select's timeout and throw an
   * exception when it fires *)
  let select ?timeout rfds wfds xfds select_timeout =
    match timeout with
    (* No timeout set, fallback to Sys_utils.select_non_intr *)
    | None -> Sys_utils.select_non_intr rfds wfds xfds select_timeout
    | Some { timeout; id } ->
      let timeout = timeout -. Unix.gettimeofday () in
      (* Whoops, timeout already fired, throw right away! *)
      if timeout < 0. then raise (Timeout id);
      (* A negative select_timeout would mean wait forever *)
      if select_timeout >= 0.0 && select_timeout < timeout
      (* The select's timeout is smaller than our timeout, so leave it alone *)
      then Sys_utils.select_non_intr rfds wfds xfds select_timeout
      else
        (* Our timeout is smaller, so use that *)
        match Sys_utils.select_non_intr rfds wfds xfds timeout with
        (* Timeout hit! Throw an exception! *)
        | [], [], [] -> raise (Timeout id)
        (* Got a result before the timeout fired, so just return that *)
        | ret -> ret

  (* Wait until [tic.fd] is readable (or the timeout fires), then append
   * whatever one Unix.read call delivers after the bytes already buffered.
   * Returns the number of bytes read; EPIPE is reported as End_of_file. *)
  let do_read ?timeout tic =
    match select ?timeout [ tic.fd ] [] [] no_select_timeout with
    | [], _, _ ->
      failwith
        "This should be unreachable. How did select return with no fd when there is no timeout?"
    | [_], _, _ ->
      let read = try
          Unix.read tic.fd tic.buf tic.max (buffer_size - tic.max)
        with Unix.Unix_error (Unix.EPIPE, _, _) ->
          raise End_of_file in
      tic.max <- tic.max + read;
      read
    | _ :: _, _, _-> assert false (* Should never happen *)

  (* Discard the buffer contents and read fresh data into it. Raises
   * End_of_file when nothing could be read. *)
  let refill ?timeout tic =
    tic.curr <- 0;
    tic.max <- 0;
    let nread = do_read ?timeout tic in
    if nread = 0 then raise End_of_file;
    nread

  (* Copy up to [len] bytes into [s] at [ofs] without validating the range,
   * and return how many were copied. Buffered bytes are served first; the
   * underlying descriptor is only read when the buffer is empty. The former
   * clamp of [len] to [max_int] was dropped: an [int] can never exceed
   * [max_int], so it had no effect. *)
  let unsafe_input ?timeout tic s ofs len =
    let avail = tic.max - tic.curr in
    if len <= avail then begin (* There is enough to read in the buffer. *)
      Bytes.blit tic.buf tic.curr s ofs len;
      tic.curr <- tic.curr + len;
      len
    end else if avail > 0 then begin (* Read the rest of the buffer. *)
      Bytes.blit tic.buf tic.curr s ofs avail;
      tic.curr <- tic.curr + avail;
      avail
    end else begin (* No input to read, refill buffer. *)
      let nread = refill ?timeout tic in
      let n = min nread len in
      Bytes.blit tic.buf tic.curr s ofs n;
      tic.curr <- tic.curr + n;
      n
    end

  (* Range-checked version of [unsafe_input]; raises Invalid_argument "input"
   * when [ofs] / [len] do not describe a valid range of [s]. *)
  let input ?timeout tic s ofs len =
    if ofs < 0 || len < 0 || ofs > Bytes.length s - len then
      invalid_arg "input"
    else
      unsafe_input ?timeout tic s ofs len

  (* Return the next byte, refilling the buffer first when it is empty. *)
  let input_char ?timeout tic =
    if tic.curr = tic.max then ignore (refill ?timeout tic);
    tic.curr <- tic.curr + 1;
    Bytes.get tic.buf (tic.curr - 1)

  (* Read in channel until we discover a '\n'.
   * The result [n] tells the caller how much buffered data to consume:
   *   n > 0: a line of [n] bytes, including the '\n', is buffered;
   *   n < 0: no '\n' was found and [-n] bytes are buffered, either because
   *          the buffer is full or because the read returned nothing;
   *   n = 0: the read returned nothing and the buffer is empty. *)
  let input_scan_line ?timeout tic =
    let rec scan_line tic pos =
      if pos < tic.max then
        if Bytes.get tic.buf pos = '\n' then
          pos - tic.curr + 1
        else
          scan_line tic (pos+1)
      else begin
        let pos =
          if tic.curr <> 0 then begin
            (* Slide the unread bytes to the start of the buffer to make room
             * for more input; scanning resumes right after them. *)
            tic.max <- tic.max - tic.curr;
            Bytes.blit tic.buf tic.curr tic.buf 0 tic.max;
            tic.curr <- 0;
            tic.max
          end else
            pos
        in
        if tic.max = buffer_size then
          - (tic.max - tic.curr)
        else
          let nread = do_read ?timeout tic in
          if nread = 0 then
            - (tic.max - tic.curr)
          else begin
            scan_line tic pos
          end
      end in
    scan_line tic tic.curr

  (* Read one line, without its trailing '\n'. A line longer than the buffer
   * is collected as a list of chunks (most recent first) and stitched
   * together at the end. Raises End_of_file when no data is left at all. *)
  let input_line ?timeout tic =

    (* Fill [buf] from the back with the chunks of the reversed list. *)
    let rec build_result buf pos = function
      | [] -> buf
      | hd :: tl ->
        let len = Bytes.length hd in
        Bytes.blit hd 0 buf (pos - len) len;
        build_result buf (pos - len) tl in

    let rec scan accu len =

      let n = input_scan_line ?timeout tic in

      (* End of file, if accu is not empty, return the last line. *)
      if n = 0 then begin
        match accu with
        | [] -> raise End_of_file
        | _ -> build_result (Bytes.create len) len accu

      (* New line found in the buffer. *)
      end else if n > 0 then begin
        let result = Bytes.create (n - 1) in (* No need to keep '\n' *)
        (* Both reads below are served from bytes that are already buffered. *)
        ignore (unsafe_input tic result 0 (n - 1));
        ignore (input_char tic); (* Skip newline *)
        match accu with
        | [] -> result
        | _ ->
          let len = len + n - 1 in
          build_result (Bytes.create len) len (result :: accu)

      (* New line not found in the buffer *)
      end else begin
        let ofs = Bytes.create (-n) in
        ignore (unsafe_input tic ofs 0 (-n));
        scan (ofs :: accu) (len - n)
      end in

    Bytes.unsafe_to_string (scan [] 0)

  (* Keep reading until exactly [len] bytes have been stored in [buf] at
   * [ofs]; raises End_of_file if the input ends first. No range check. *)
  let rec unsafe_really_input ?timeout tic buf ofs len =
    if len = 0 then
      ()
    else
      let r = unsafe_input ?timeout tic buf ofs len in
      if r = 0
      then raise End_of_file
      else unsafe_really_input ?timeout tic buf (ofs + r) (len - r)

  (* Range-checked version of [unsafe_really_input]; raises Invalid_argument
   * "really_input" on an invalid range. *)
  let really_input ?timeout tic buf ofs len =
    if ofs < 0 || len < 0 || ofs > Bytes.length buf - len then
      invalid_arg "really_input"
    else
      unsafe_really_input ?timeout tic buf ofs len

  (** Marshal *)

  let marshal_magic = Bytes.of_string "\x84\x95\xA6\xBE"

  (* Read one marshalled value. The first 8 bytes (magic number and a
   * big-endian 32-bit size) are read byte by byte; the remaining 12 header
   * bytes plus the payload are then read in bulk, and the reassembled bytes
   * are handed to Marshal.from_bytes.
   * Bytes are stored with Bytes.set rather than the [s.[i] <- c] sugar, which
   * is the deprecated String.set and is rejected on [bytes] by recent
   * compilers.
   * Raises Failure on a bad magic number or a truncated payload, and
   * End_of_file if the input ends inside the first 8 bytes. *)
  let input_value ?timeout tic =
    let magic = Bytes.create 4 in
    Bytes.set magic 0 (input_char ?timeout tic);
    Bytes.set magic 1 (input_char ?timeout tic);
    Bytes.set magic 2 (input_char ?timeout tic);
    Bytes.set magic 3 (input_char ?timeout tic);
    if magic <> marshal_magic then
      failwith "Select.input_value: bad object.";
    let b1 = int_of_char (input_char ?timeout tic) in
    let b2 = int_of_char (input_char ?timeout tic) in
    let b3 = int_of_char (input_char ?timeout tic) in
    let b4 = int_of_char (input_char ?timeout tic) in
    (* Bytes still to read: the rest of the header (12) plus the payload. *)
    let len = ((b1 lsl 24) lor (b2 lsl 16) lor (b3 lsl 8) lor b4) + 12 in
    let data = Bytes.create (len + 8) in
    Bytes.blit magic 0 data 0 4;
    Bytes.set data 4 (char_of_int b1);
    Bytes.set data 5 (char_of_int b2);
    Bytes.set data 6 (char_of_int b3);
    Bytes.set data 7 (char_of_int b4);
    begin
      try unsafe_really_input ?timeout tic data 8 len
      with End_of_file ->
        failwith "Select.input_value: truncated object."
    end;
    Marshal.from_bytes data 0

  (** Process *)

  (* Spawn [cmd args] with its stdin and stdout connected to two fresh pipes
   * (stderr is shared with this process). Returns the buffered channel
   * reading the child's stdout, with the child's pid recorded in it, and the
   * channel writing to the child's stdin. *)
  let open_process cmd args =
    let child_in_fd, out_fd = Unix.pipe () in
    let in_fd, child_out_fd = Unix.pipe () in
    Unix.set_close_on_exec in_fd;
    Unix.set_close_on_exec out_fd;
    let pid =
      Unix.(create_process cmd args child_in_fd child_out_fd stderr) in
    Unix.close child_out_fd;
    Unix.close child_in_fd;
    let tic = in_channel_of_descr in_fd in
    tic.pid <- Some pid;
    let oc = Unix.out_channel_of_descr out_fd in
    (tic, oc)

  (* Same as [open_process], except that the write end of the child's stdin
   * pipe is closed before the child is created and only the input channel is
   * returned. *)
  let open_process_in cmd args =
    let child_in_fd, out_fd = Unix.pipe () in
    let in_fd, child_out_fd = Unix.pipe () in
    Unix.set_close_on_exec in_fd;
    Unix.set_close_on_exec out_fd;
    Unix.close out_fd;
    let pid =
      Unix.(create_process cmd args child_in_fd child_out_fd stderr) in
    Unix.close child_out_fd;
    Unix.close child_in_fd;
    let tic = in_channel_of_descr in_fd in
    tic.pid <- Some pid;
    tic

  (* Spawn [cmd args] and run [reader t tic oc] under [with_timeout]. If the
   * reader raises (a [Timeout] included), the process is terminated and both
   * channels are closed before the exception is re-raised. [on_timeout] is
   * wrapped so that a process whose pid is still recorded is terminated
   * before the caller's handler runs. *)
  let read_process ~timeout ~on_timeout ~reader cmd args =
    let (tic, oc) = open_process cmd args in
    let on_timeout () =
      Option.iter ~f:Sys_utils.terminate_process tic.pid;
      tic.pid <- None;
      on_timeout ()
    in
    with_timeout ~timeout ~on_timeout
      ~do_:(fun timeout ->
        try reader timeout tic oc
        with exn ->
          Option.iter ~f:Sys_utils.terminate_process tic.pid;
          tic.pid <- None;
          close_in tic;
          close_out oc;
          raise exn)

  (** Socket *)

  (* Connect a stream socket to [sockaddr], giving up with [Timeout] if the
   * optional timeout fires first. Returns a buffered input channel and an
   * output channel that share the socket.
   * The socket is closed on every failure path, including a [Timeout] or a
   * Failure raised while waiting for the connect to complete. Previously
   * only errors raised by Unix.connect itself closed it, so those paths
   * leaked the descriptor. *)
  let open_connection ?timeout sockaddr =
    let connect sock sockaddr =
      try
        (* connect binds the fd sock to the socket at sockaddr. If sock is
         * nonblocking, and the connect call would block, it errors. You can
         * then use select to wait for the connect to finish.
         *
         * On Windows, if the connect succeeds, sock will be returned in the
         * writable fd set. If the connect fails, the sock will be returned in
         * the exception fd set.
         * https://msdn.microsoft.com/en-us/library/windows/desktop/ms737625(v=vs.85).aspx
         *
         * On Linux, the sock will always be returned in the writable fd set,
         * and you're supposed to use getsockopt to read the SO_ERROR option at
         * level SOL_SOCKET to figure out if the connect worked. However, this
         * code is only used on Windows, so that's fine *)
        Unix.connect sock sockaddr
      with
      | Unix.Unix_error ((Unix.EINPROGRESS | Unix.EWOULDBLOCK), _, _) ->
        begin
          (* sock must also be passed in the exception set: a set that is not
           * passed to select is never reported, which made the failure case
           * below unreachable. *)
          match select ?timeout [] [sock] [sock] no_select_timeout with
          | _, _, _ :: _ ->
            failwith "Failed to connect to socket"
          | _, [], [] ->
            failwith
              "This should be unreachable. How did select return with no fd when there is no timeout?"
          | _, [_], [] -> ()
          | _, _ :: _ :: _, [] -> assert false
        end
    in
    let sock =
      Unix.socket (Unix.domain_of_sockaddr sockaddr) Unix.SOCK_STREAM 0 in
    begin
      try
        Unix.set_nonblock sock;
        connect sock sockaddr;
        Unix.clear_nonblock sock;
        Unix.set_close_on_exec sock
      with exn ->
        (* Release the descriptor, then report the original failure; an error
         * from close itself is ignored in its favour. *)
        (try Unix.close sock with Unix.Unix_error _ -> ());
        raise exn
    end;
    let tic = in_channel_of_descr sock in
    let oc = Unix.out_channel_of_descr sock in
    (tic, oc)

  let shutdown_connection { fd; _ } =
    Unix.(shutdown fd SHUTDOWN_SEND)

  (* True only for a [Timeout] exception carrying this timeout's id. *)
  let is_timeout_exn {id; timeout=_;} = function
    | Timeout exn_id -> exn_id = id
    | _ -> false

end
(* The interface shared by the two timeout implementations in this file. *)
module type S = sig

  (** Timeout *)

  (* Handle for one running timeout. *)
  type t

  (* [with_timeout ~timeout ~on_timeout ~do_] runs [do_] with a fresh timeout
   * of [timeout] seconds. If that timeout fires, the result is
   * [on_timeout ()]. *)
  val with_timeout :
    timeout:int -> on_timeout:(unit -> 'a) -> do_:(t -> 'a) -> 'a

  val check_timeout : t -> unit

  (** Channel *)

  type in_channel

  val in_channel_of_descr : Unix.file_descr -> in_channel
  val descr_of_in_channel : in_channel -> Unix.file_descr
  val open_in : string -> in_channel
  val close_in : in_channel -> unit
  val close_in_noerr : in_channel -> unit

  (* Takes read, write and exception descriptor lists followed by a timeout
   * in seconds, and returns the three lists of ready descriptors. *)
  val select :
    ?timeout:t ->
    Unix.file_descr list ->
    Unix.file_descr list ->
    Unix.file_descr list ->
    float ->
    Unix.file_descr list * Unix.file_descr list * Unix.file_descr list

  val input : ?timeout:t -> in_channel -> bytes -> int -> int -> int
  val really_input : ?timeout:t -> in_channel -> bytes -> int -> int -> unit
  val input_char : ?timeout:t -> in_channel -> char
  val input_line : ?timeout:t -> in_channel -> string
  val input_value : ?timeout:t -> in_channel -> 'a

  (** Process *)

  val open_process : string -> string array -> in_channel * out_channel
  val open_process_in : string -> string array -> in_channel

  (* Only valid on channels returned by [open_process] or
   * [open_process_in]. *)
  val close_process_in : in_channel -> Unix.process_status

  (* Spawn a process and run [reader] on its channels under a timeout. *)
  val read_process :
    timeout:int ->
    on_timeout:(unit -> 'a) ->
    reader:(t -> in_channel -> out_channel -> 'a) ->
    string -> string array -> 'a

  (** Socket *)

  val open_connection :
    ?timeout:t -> Unix.sockaddr -> in_channel * out_channel
  val shutdown_connection : in_channel -> unit

  (* Whether the exception is the [Timeout] belonging to the given timeout. *)
  val is_timeout_exn : t -> exn -> bool

end
(* Both implementations packed as first-class modules of type S. *)
let select : (module S) = (module Select_timeout)
let alarm : (module S) = (module Alarm_timeout)

(* Expose the select-based implementation on Windows and the alarm-based one
 * everywhere else. *)
include (val (if Sys.win32 then select else alarm) : S)
(* Open a connection to [sockaddr] and run [reader t tic oc] on it, all under
 * one timeout of [timeout] seconds; [on_timeout ()] supplies the result if
 * that timeout fires. The output channel is closed when [reader] raises; on
 * success the channels are left open. *)
let read_connection ~timeout ~on_timeout ~reader sockaddr =
  let run t =
    let (tic, oc) = open_connection ~timeout:t sockaddr in
    match reader t tic oc with
    | result -> result
    | exception exn ->
      close_out oc;
      raise exn
  in
  with_timeout ~timeout ~on_timeout ~do_:run