2 * Copyright (c) 2015, Facebook, Inc.
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the "hack" directory of this source tree. An additional grant
7 * of patent rights can be found in the PATENTS file in the same directory.
(* Raised when a timeout fires. The int payload is the id of the timeout that
 * fired; handlers compare it against their own id so that a timeout is only
 * handled by the scope that created it. *)
11 exception Timeout
of int
13 (* The IDs are used to tell the difference between timeout A timing out and timeout B timing out.
14 * So they only really need to be unique between any two active timeouts in the same process. *)
(* Process-wide counter backing [mk_id]. *)
let id_counter = ref 0

(* [mk_id ()] bumps the counter and returns its new value, so the first id
 * handed out is 1 and every later call returns the previous id plus one. *)
let mk_id () =
  let next = !id_counter + 1 in
  id_counter := next;
  next
18 module Alarm_timeout
= struct
23 let with_timeout ~timeout ~on_timeout ~do_
=
25 let callback () = raise
(Timeout
id) in
27 let timer = Timer.set_timer ~interval
:(float_of_int timeout
) ~
callback in
31 (* Any uncaught exception will cancel the timeout *)
32 Timer.cancel_timer
timer;
35 Timer.cancel_timer
timer;
37 with Timeout exn_id
when exn_id
= id ->
(* Alarm-based timeouts need no polling: this accepts any argument and does
 * nothing. *)
let check_timeout _timeout = ()
(* Alarm-mode [select]: the optional [?timeout] argument is accepted and
 * discarded, and every remaining argument is forwarded as-is to
 * [Sys_utils.select_non_intr]. *)
42 let select ?timeout
:_
= Sys_utils.select_non_intr
(* An input channel paired with an optional int. Channels built by
 * [open_process] / [open_process_in] carry [Some pid]; those built by
 * [open_in] and [in_channel_of_descr] carry [None]. *)
46 type in_channel
= Pervasives.in_channel
* int option
(* [ignore_timeout f] adapts a reader [f] that works on the first component
 * of a pair so that it takes the whole pair plus an optional [?timeout].
 * Both the timeout and the second component of the pair are discarded. *)
let ignore_timeout f ?timeout:_ chan = f (fst chan)
(* Alarm-mode readers: the standard channel readers, lifted through
 * [ignore_timeout] so that they accept (and ignore) an optional timeout and
 * operate on the channel half of the pair. *)
let input = Pervasives.input |> ignore_timeout
let really_input = Pervasives.really_input |> ignore_timeout
let input_char = Pervasives.input_char |> ignore_timeout
let input_line = Pervasives.input_line |> ignore_timeout
53 let input_value_with_workaround ic
=
54 (* OCaml 4.03.0 changed the behavior of input_value to no longer
55 * throw End_of_file when the pipe has closed. We can simulate that
56 * behavior, however, by trying to read a byte afterwards, which WILL
57 * raise End_of_file if the pipe has closed
58 * http://caml.inria.fr/mantis/view.php?id=7142 *)
59 try Pervasives.input_value ic
60 with Failure msg
as e
->
61 if msg
= "input_value: truncated object"
62 then Pervasives.input_char ic
|> ignore
;
(* Alarm-mode [input_value]: [input_value_with_workaround] lifted through
 * [ignore_timeout], so the optional timeout is accepted and ignored. *)
65 let input_value = ignore_timeout input_value_with_workaround
(* Opens the named file for reading; the resulting channel has no pid. *)
let open_in name = (Pervasives.open_in name, None)

(* Closes the underlying standard channel. *)
let close_in chan = Pervasives.close_in (fst chan)

(* Same as [close_in], but through [Pervasives.close_in_noerr]. *)
let close_in_noerr chan = Pervasives.close_in_noerr (fst chan)

(* Wraps a file descriptor in a channel; the resulting channel has no pid. *)
let in_channel_of_descr fd = (Unix.in_channel_of_descr fd, None)

(* Returns the file descriptor behind the channel. *)
let descr_of_in_channel chan = Unix.descr_of_in_channel (fst chan)
71 let open_process cmd args
=
72 let child_in_fd, out_fd
= Unix.pipe
() in
73 let in_fd, child_out_fd
= Unix.pipe
() in
74 Unix.set_close_on_exec
in_fd;
75 Unix.set_close_on_exec out_fd
;
77 Unix.(create_process cmd args
child_in_fd child_out_fd stderr
) in
78 Unix.close child_out_fd
;
79 Unix.close
child_in_fd;
80 let ic = (Unix.in_channel_of_descr in_fd, Some
pid) in
81 let oc = Unix.out_channel_of_descr out_fd
in
84 let open_process_in cmd args
=
85 let child_in_fd, out_fd
= Unix.pipe
() in
86 let in_fd, child_out_fd
= Unix.pipe
() in
87 Unix.set_close_on_exec
in_fd;
88 Unix.set_close_on_exec out_fd
;
91 Unix.(create_process cmd args
child_in_fd child_out_fd stderr
) in
92 Unix.close child_out_fd
;
93 Unix.close
child_in_fd;
94 let ic = (Unix.in_channel_of_descr in_fd, Some
pid) in
97 let close_process_in (ic, pid) =
99 | None
-> invalid_arg
"Timeout.close_process_in"
101 Pervasives.close_in ic;
102 snd
(Sys_utils.waitpid_non_intr
[] pid)
104 let read_process ~timeout ~on_timeout ~reader cmd args
=
105 let (ic, oc) = open_process cmd args
in
106 with_timeout ~timeout ~on_timeout
108 try reader timeout
ic oc
109 with exn
-> close_in ic; close_out
oc; raise exn
)
111 let open_connection ?timeout sockaddr
=
112 let (ic, oc) = Unix.open_connection sockaddr
in
(* Shuts down the connection behind the channel half of the pair by
 * delegating to [Unix.shutdown_connection]; the second half is ignored. *)
let shutdown_connection chan = Unix.shutdown_connection (fst chan)
118 let is_timeout_exn id = function
119 | Timeout exn_id
-> exn_id
= id
124 module Select_timeout
= struct
133 { timeout
= Unix.gettimeofday
() +. timeout
; id = mk_id () }
134 let with_timeout ~timeout ~on_timeout ~do_
=
135 let t = create (float timeout
) in
137 with Timeout exn_id
when exn_id
= t.id -> on_timeout
()
(* Raises [Timeout t.id] when the current wall-clock time is strictly past
 * the deadline stored in [t.timeout]; otherwise returns unit. *)
let check_timeout t =
  let now = Unix.gettimeofday () in
  if now > t.timeout then raise (Timeout t.id)
149 mutable pid: int option;
type in_channel = channel

(* Size of the read buffer attached to every channel. *)
let buffer_size = 65536 - 9 (* From ocaml/byterun/io.h *)

(* Builds a channel around [fd] with a fresh, empty buffer of [buffer_size]
 * bytes ([curr] and [max] both start at 0) and no associated pid. *)
let in_channel_of_descr fd =
  { fd; buf = Bytes.create buffer_size; curr = 0; max = 0; pid = None }

(* Returns the file descriptor the channel reads from. *)
let descr_of_in_channel tic = tic.fd
163 let fd = Unix.openfile name
[Unix.O_RDONLY
; Unix.O_NONBLOCK
] 0o640
in
164 in_channel_of_descr fd
(* Closes the file descriptor behind the channel. *)
let close_in { fd; _ } = Unix.close fd
168 let close_in_noerr tic
=
169 try Unix.close tic
.fd
172 let close_process_in tic
=
174 | None
-> invalid_arg
"Timeout.close_process_in"
177 snd
(Sys_utils.waitpid_non_intr
[] pid)
(* Passing a negative timeout to select makes it block until a fd is ready,
 * so this constant stands for "no select timeout". *)
let no_select_timeout = -1.0
182 (* A wrapper around Sys_utils.select_non_intr. If timeout would fire before the select's timeout,
183 * then change the select's timeout and throw an exception when it fires *)
184 let select ?timeout rfds wfds xfds select_timeout
=
186 (* No timeout set, fallback to Sys_utils.select_non_intr *)
187 | None
-> Sys_utils.select_non_intr rfds wfds xfds select_timeout
188 | Some
{ timeout
; id } ->
189 let timeout = timeout -. Unix.gettimeofday
() in
190 (* Whoops, timeout already fired, throw right away! *)
191 if timeout < 0. then raise
(Timeout
id);
192 (* A negative select_timeout would mean wait forever *)
193 if select_timeout
>= 0.0 && select_timeout
< timeout
194 (* The select's timeout is smaller than our timeout, so leave it alone *)
195 then Sys_utils.select_non_intr rfds wfds xfds select_timeout
197 (* Our timeout is smaller, so use that *)
198 match Sys_utils.select_non_intr rfds wfds xfds
timeout with
199 (* Timeout hit! Throw an exception! *)
200 | [], [], [] -> raise
(Timeout
id)
201 (* Got a result before the timeout fired, so just return that *)
204 let do_read ?
timeout tic
=
205 match select ?
timeout [ tic
.fd ] [] [] no_select_timeout with
208 "This should be unreachable. How did select return with no fd when there is no timeout?"
211 Unix.read tic
.fd tic
.buf tic
.max
(buffer_size - tic
.max
)
212 with Unix.Unix_error
(Unix.EPIPE
, _
, _
) ->
214 tic
.max
<- tic
.max
+ read;
216 | _
:: _
, _
, _
-> assert false (* Should never happen *)
218 let refill ?
timeout tic
=
221 let nread = do_read ?
timeout tic
in
222 if nread = 0 then raise End_of_file
;
225 let unsafe_input ?
timeout tic s ofs len
=
226 let n = if len
> max_int
then max_int
else len
in
227 let avail = tic
.max
- tic
.curr
in
228 if n <= avail then begin (* There is enough to read in the buffer. *)
229 Bytes.blit tic
.buf tic
.curr s ofs
n;
230 tic
.curr
<- tic
.curr
+ n;
232 end else if avail > 0 then begin (* Read the rest of the buffer. *)
233 Bytes.blit tic
.buf tic
.curr s ofs
avail;
234 tic
.curr
<- tic
.curr
+ avail;
236 end else begin (* No input to read, refill buffer. *)
237 let nread = refill ?
timeout tic
in
238 let n = min
nread n in
239 Bytes.blit tic
.buf tic
.curr s ofs
n;
240 tic
.curr
<- tic
.curr
+ n;
244 let input ?
timeout tic s ofs len
=
245 if ofs
< 0 || len
< 0 || ofs
> Bytes.length s
- len
then
248 unsafe_input ?
timeout tic s ofs len
(* Reads one character from the channel. When the buffer is exhausted
 * ([curr = max]) it is refilled first, honouring the optional timeout; the
 * read position is then advanced by one and the character that was at the
 * old position is returned. *)
let input_char ?timeout tic =
  if tic.curr = tic.max then ignore (refill ?timeout tic);
  let pos = tic.curr in
  tic.curr <- pos + 1;
  Bytes.get tic.buf pos
255 (* Read in channel until we discover a '\n' *)
256 let input_scan_line ?
timeout tic
=
257 let rec scan_line tic pos
=
258 if pos
< tic
.max
then
259 if Bytes.get tic
.buf pos
= '
\n'
then
262 scan_line tic
(pos
+1)
265 if tic
.curr
<> 0 then begin
266 tic
.max
<- tic
.max
- tic
.curr
;
267 Bytes.blit tic
.buf tic
.curr tic
.buf 0 tic
.max
;
273 if tic
.max
= buffer_size then
274 - (tic
.max
- tic
.curr
)
276 let nread = do_read ?
timeout tic
in
278 - (tic
.max
- tic
.curr
)
283 scan_line tic tic
.curr
285 let input_line ?
timeout tic
=
287 let rec build_result buf pos = function
290 let len = Bytes.length hd
in
291 Bytes.blit hd
0 buf (pos - len) len;
292 build_result buf (pos - len) tl
in
294 let rec scan accu
len =
296 let n = input_scan_line ?
timeout tic
in
298 (* End of file, if accu is not empty, return the last line. *)
301 | [] -> raise End_of_file
302 | _
-> build_result (Bytes.create len) len accu
304 (* New line found in the buffer. *)
305 end else if n > 0 then begin
306 let result = Bytes.create (n - 1) in (* No need to keep '\n' *)
307 ignore
(unsafe_input tic
result 0 (n - 1));
308 ignore
(input_char tic
); (* Skip newline *)
312 let len = len + n - 1 in
313 build_result (Bytes.create len) len (result :: accu
)
315 (* New line not found in the buffer *)
317 let ofs = Bytes.create (-n) in
318 ignore
(unsafe_input tic
ofs 0 (-n));
319 scan (ofs :: accu
) (len - n)
322 Bytes.unsafe_to_string
(scan [] 0)
324 let rec unsafe_really_input ?
timeout tic
buf ofs len =
328 let r = unsafe_input ?
timeout tic
buf ofs len in
330 then raise End_of_file
331 else unsafe_really_input ?
timeout tic
buf (ofs + r) (len - r)
333 let really_input ?
timeout tic
buf ofs len =
334 if ofs < 0 || len < 0 || ofs > Bytes.length
buf - len then
335 invalid_arg
"really_input"
337 unsafe_really_input ?
timeout tic
buf ofs len
(* The four-byte magic number expected at the start of a marshaled value.
 * [input_value] compares the first four bytes it reads against this and
 * fails when they differ. *)
341 let marshal_magic = Bytes.of_string
"\x84\x95\xA6\xBE"
342 let input_value ?
timeout tic
=
343 let magic = Bytes.create 4 in
344 magic.[0] <- input_char ?
timeout tic
;
345 magic.[1] <- input_char ?
timeout tic
;
346 magic.[2] <- input_char ?
timeout tic
;
347 magic.[3] <- input_char ?
timeout tic
;
348 if magic <> marshal_magic then
349 failwith
"Select.input_value: bad object.";
350 let b1 = int_of_char
(input_char ?
timeout tic
) in
351 let b2 = int_of_char
(input_char ?
timeout tic
) in
352 let b3 = int_of_char
(input_char ?
timeout tic
) in
353 let b4 = int_of_char
(input_char ?
timeout tic
) in
354 let len = ((b1 lsl 24) lor (b2 lsl 16) lor (b3 lsl 8) lor b4) + 12 in
355 let data = Bytes.create (len + 8) in
356 Bytes.blit
magic 0 data 0 4;
357 data.[4] <- char_of_int
b1;
358 data.[5] <- char_of_int
b2;
359 data.[6] <- char_of_int
b3;
360 data.[7] <- char_of_int
b4;
362 try unsafe_really_input ?
timeout tic
data 8 len
364 failwith
"Select.input_value: truncated object."
366 Marshal.from_bytes
data 0
370 let open_process cmd args
=
371 let child_in_fd, out_fd
= Unix.pipe
() in
372 let in_fd, child_out_fd
= Unix.pipe
() in
373 Unix.set_close_on_exec
in_fd;
374 Unix.set_close_on_exec out_fd
;
376 Unix.(create_process cmd args
child_in_fd child_out_fd stderr
) in
377 Unix.close child_out_fd
;
378 Unix.close
child_in_fd;
379 let tic = in_channel_of_descr in_fd in
381 let oc = Unix.out_channel_of_descr out_fd
in
384 let open_process_in cmd args
=
385 let child_in_fd, out_fd
= Unix.pipe
() in
386 let in_fd, child_out_fd
= Unix.pipe
() in
387 Unix.set_close_on_exec
in_fd;
388 Unix.set_close_on_exec out_fd
;
391 Unix.(create_process cmd args
child_in_fd child_out_fd stderr
) in
392 Unix.close child_out_fd
;
393 Unix.close
child_in_fd;
394 let tic = in_channel_of_descr in_fd in
398 let read_process ~
timeout ~on_timeout ~reader cmd args
=
399 let (tic, oc) = open_process cmd args
in
401 Option.iter ~f
:Sys_utils.terminate_process
tic.pid;
405 with_timeout ~
timeout ~
on_timeout
407 try reader
timeout tic oc
409 Option.iter ~f
:Sys_utils.terminate_process
tic.pid;
417 let open_connection ?
timeout sockaddr
=
418 let connect sock sockaddr
=
420 (* connect binds the fd sock to the socket at sockaddr. If sock is nonblocking, and the
421 * connect call would block, it errors. You can then use select to wait for the connect
424 * On Windows, if the connect succeeds, sock will be returned in the writable fd set.
425 * If the connect fails, the sock will be returned in the exception fd set.
426 * https://msdn.microsoft.com/en-us/library/windows/desktop/ms737625(v=vs.85).aspx
428 * On Linux, the sock will always be returned in the writable fd set, and you're supposed
429 * to use getsockopt to read the SO_ERROR option at level SOL_SOCKET to figure out if the
430 * connect worked. However, this code is only used on Windows, so that's fine *)
431 Unix.connect sock sockaddr
;
433 | Unix.Unix_error
((Unix.EINPROGRESS
| Unix.EWOULDBLOCK
), _
, _
) -> begin
434 match select ?
timeout [] [sock
] [] no_select_timeout with
435 | _
, [], [exn_sock
] when exn_sock
= sock
->
436 failwith
"Failed to connect to socket"
439 "This should be unreachable. How did select return with no fd when there is no timeout?"
441 | _
, _
, _
-> assert false
443 | exn
-> Unix.close sock
; raise exn
in
445 Unix.socket
(Unix.domain_of_sockaddr sockaddr
) Unix.SOCK_STREAM
0 in
446 Unix.set_nonblock
sock;
447 connect sock sockaddr
;
448 Unix.clear_nonblock
sock;
449 Unix.set_close_on_exec
sock;
450 let tic = in_channel_of_descr sock in
451 let oc = Unix.out_channel_of_descr
sock in
(* Shuts down the sending side of the socket behind the channel. *)
let shutdown_connection tic = Unix.shutdown tic.fd Unix.SHUTDOWN_SEND
457 let is_timeout_exn {id; timeout=_
;} = function
458 | Timeout exn_id
-> exn_id
= id
468 on_timeout:(unit -> 'a
) ->
(** Checks whether the given timeout has expired. The select-based
    implementation raises [Timeout] once the deadline has passed; the
    alarm-based implementation does nothing. *)
470 val check_timeout: t -> unit
(** Wraps a file descriptor in a timeout-aware input channel. *)
473 val in_channel_of_descr: Unix.file_descr
-> in_channel
(** Returns the file descriptor underlying the channel. *)
474 val descr_of_in_channel: in_channel
-> Unix.file_descr
(** Opens the named file for reading. *)
475 val open_in: string -> in_channel
(** Closes the channel. *)
476 val close_in: in_channel
-> unit
(** Closes the channel; presumably errors are ignored, as with
    [Pervasives.close_in_noerr], which the alarm-based implementation calls. *)
477 val close_in_noerr: in_channel
-> unit
480 Unix.file_descr list
->
481 Unix.file_descr list
->
482 Unix.file_descr list
->
484 Unix.file_descr list
* Unix.file_descr list
* Unix.file_descr list
(** [input ?timeout ic buf ofs len] reads into [buf] starting at [ofs], for at
    most [len] bytes; the result is presumably the number of bytes read. The
    alarm-based implementation ignores [?timeout]. *)
485 val input: ?
timeout:t -> in_channel
-> bytes
-> int -> int -> int
(** [really_input ?timeout ic buf ofs len] reads exactly [len] bytes into
    [buf] at [ofs]. The select-based implementation raises [End_of_file] if
    the input ends first and [Invalid_argument] on an invalid range. *)
486 val really_input: ?
timeout:t -> in_channel
-> bytes
-> int -> int -> unit
(** Reads a single character from the channel. *)
487 val input_char: ?
timeout:t -> in_channel
-> char
(** Reads one line from the channel; the select-based implementation drops
    the trailing newline and raises [End_of_file] when no input is left. *)
488 val input_line: ?
timeout:t -> in_channel
-> string
(** Reads one marshaled value from the channel. *)
489 val input_value: ?
timeout:t -> in_channel
-> 'a
(** [open_process cmd args] starts [cmd] with [args], wired to two fresh
    pipes, and returns the channel reading from the process together with
    the channel writing to it. *)
490 val open_process: string -> string array
-> in_channel
* out_channel
(** Like [open_process], but only the input channel is returned. *)
491 val open_process_in: string -> string array
-> in_channel
(** Waits for the process attached to the channel and returns its status.
    Raises [Invalid_argument] when the channel has no associated process. *)
492 val close_process_in: in_channel
-> Unix.process_status
495 on_timeout:(unit -> 'a
) ->
496 reader
:(t -> in_channel
-> out_channel
-> 'a
) ->
497 string -> string array
-> 'a
499 ?
timeout:t -> Unix.sockaddr
-> in_channel
* out_channel
(** Shuts down the connection behind the channel. The select-based
    implementation shuts down only the sending side of the socket. *)
500 val shutdown_connection: in_channel
-> unit
(** [is_timeout_exn t exn] is true when [exn] is the [Timeout] exception
    carrying the id of timeout [t]. *)
502 val is_timeout_exn: t -> exn
-> bool
(* The two implementations, packed as first-class modules of signature [S]. *)
let select : (module S) = (module Select_timeout)
let alarm : (module S) = (module Alarm_timeout)

(* Expose the select-based implementation on Windows and the alarm-based one
 * everywhere else. *)
include (val (match Sys.win32 with
  | true -> select
  | false -> alarm))
510 let read_connection ~
timeout ~
on_timeout ~reader sockaddr
=
511 with_timeout ~
timeout ~
on_timeout
513 let (tic, oc) = open_connection ~
timeout sockaddr
in
514 try reader
timeout tic oc