(**
 * Copyright (c) 2015, Facebook, Inc.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the "hack" directory of this source tree.
 *)
(**
 * This tool allows for marshaling directly over file descriptors (instead of
 * OCaml "channels") to avoid buffering, so that we can safely use marshaling
 * and libancillary together.
 *
 * OCaml's marshaling is done over channels, which have their own internal
 * buffer. This means that after reading a marshaled object from a channel,
 * the FD's position is not guaranteed to point to the beginning of the
 * next marshaled object (it instead points to the position after the
 * buffered read). So another process that receives this FD (over
 * libancillary) cannot start reading the next object from it.
 *
 * Our approach: start each message with a fixed-size preamble that describes
 * the size of the payload to read, then read precisely that many bytes
 * directly from the FD, avoiding OCaml channels entirely.
 *)
(* Errors raised by the preamble/payload framing in this module. *)

(** NOTE(review): not raised by any code visible in this file; kept for
 * callers that may reference it. *)
exception Invalid_Int_Size_Exception

(** The marshaled payload is too large to be described by the preamble. *)
exception Payload_Size_Too_Large_Exception

(** A preamble has the wrong length or does not start with the sentinel. *)
exception Malformed_Preamble_Exception

(** Fewer preamble bytes were written than the preamble's size. *)
exception Writing_Preamble_Exception

(** Fewer payload bytes were written than the marshaled payload's size. *)
exception Writing_Payload_Exception

(** Some, but not all, of the preamble bytes could be read. *)
exception Reading_Preamble_Exception

(** Fewer payload bytes were read than the preamble announced. *)
exception Reading_Payload_Exception
(* We want to marshal exceptions (or at least their message+stacktrace) over *)
(* the wire. This type ensures that no one will attempt to pattern-match on *)
(* the thing we marshal: 'Values of extensible variant types, for example *)
(* exceptions (of extensible type exn), returned by the unmarshaller should *)
(* not be pattern-matched over, because unmarshalling does not preserve the *)
(* information required for matching their constructors.' *)
(* https://caml.inria.fr/pub/docs/manual-ocaml/libref/Marshal.html *)
44 type remote_exception_data
= {
(** Backend abstraction for moving raw bytes over a file descriptor, so the
 * framing logic below can run on different I/O implementations.
 *
 * ['a result] is the backend's computation type. [return], [fail] and
 * [>>=] give it a monad-like interface, and [fd] is the backend's
 * descriptor type. *)
module type WRITER_READER = sig
  type 'a result
  type fd

  val return : 'a -> 'a result
  val fail : exn -> 'a result
  val ( >>= ) : 'a result -> ('a -> 'b result) -> 'b result

  (** Writes up to [size] bytes of [buffer] starting at [offset] and yields
   * the number of bytes actually written (possibly fewer than [size]). *)
  val write :
    ?timeout:Timeout.t ->
    fd ->
    buffer:bytes ->
    offset:int ->
    size:int ->
    int result

  (** Reads up to [size] bytes into [buffer] starting at [offset] and yields
   * the number of bytes actually read. The framing code treats 0 as
   * "no more data". *)
  val read :
    ?timeout:Timeout.t ->
    fd ->
    buffer:bytes ->
    offset:int ->
    size:int ->
    int result

  (** Emits a diagnostic message. *)
  val log : string -> unit
end
(** A [WRITER_READER] whose computations are plain values over Unix file
 * descriptors: ordinary direct-style I/O with no wrapper type. *)
module type REGULAR_WRITER_READER =
  WRITER_READER
    with type 'a result = 'a
    with type fd = Unix.file_descr
(** [WRITER_READER] over Unix file descriptors in direct style: results are
 * plain values and failures are raised as ordinary exceptions. *)
module RegularWriterReader : REGULAR_WRITER_READER = struct
  type 'a result = 'a
  type fd = Unix.file_descr

  (* With [result = 'a], the "monad" is the identity: binding is plain
   * application and failure is [raise]. *)
  let return x = x
  let fail exn = raise exn
  let ( >>= ) x f = f x

  (** Waits for [fd] to be writable, then writes up to [size] bytes of
   * [buffer] from [offset]. Returns the number of bytes written. *)
  let rec write ?timeout fd ~buffer ~offset ~size =
    match Timeout.select ?timeout [] [fd] [] ~-.1.0 with
    | _, [], _ ->
      (* NOTE(review): [fd] was not reported writable. Returning 0 makes the
       * framing code's write loop stop and report a short write. Confirm
       * this matches Timeout.select's contract. *)
      0
    | _ ->
      (* Timeout.select handles EINTR, but the Unix.write call can also be
       * interrupted. If the write is interrupted before any bytes are
       * written, the call fails with EINTR. Otherwise, the call succeeds and
       * returns the number of bytes written. *)
      (try Unix.write fd buffer offset size
       with Unix.Unix_error (Unix.EINTR, _, _) ->
         write ?timeout fd ~buffer ~offset ~size)

  (* Marshal_tools reads from file descriptors. These file descriptors might
   * be for some non-blocking socket. Normally if you try to read from an fd,
   * it will block until some data is ready. But if you try to read from a
   * non-blocking socket and it's not ready, you get an error
   * (EAGAIN/EWOULDBLOCK) instead.
   *
   * People using Marshal_tools probably are calling Unix.select first.
   * However that only guarantees that the first read won't block.
   * Marshal_tools will always do at least 2 reads (one for the preamble and
   * one or more for the data). Any read after the first might block, so
   * every read here is preceded by its own Timeout.select. *)
  let rec read ?timeout fd ~buffer ~offset ~size =
    match Timeout.select ?timeout [fd] [] [] ~-.1.0 with
    | [], _, _ ->
      (* NOTE(review): [fd] was not reported readable. Returning 0 is what the
       * framing code treats as "no more data". Confirm this matches
       * Timeout.select's contract. *)
      0
    | _ ->
      (* Timeout.select handles EINTR, but the Unix.read call can also be
       * interrupted. If the read is interrupted before any bytes are read,
       * the call fails with EINTR. Otherwise, the call succeeds and returns
       * the number of bytes read. *)
      (try Unix.read fd buffer offset size
       with Unix.Unix_error (Unix.EINTR, _, _) ->
         read ?timeout fd ~buffer ~offset ~size)

  let log str = Printf.eprintf "%s\n%!" str
end
(** Length-prefixed marshaling over any [WRITER_READER] backend.
 *
 * Wire format: one sentinel byte ['\142'], then the payload size encoded
 * big-endian in [preamble_core_size] bytes, then the [Marshal]-encoded
 * payload itself. *)
module MarshalToolsFunctor (WriterReader : WRITER_READER) : sig
  (** Size in bytes of the preamble that precedes every payload. *)
  val expected_preamble_size : int

  (** Marshals [obj] with [flags] and writes preamble + payload to the fd.
   * Yields the payload size (excluding the preamble).
   * Fails with [Writing_Preamble_Exception] / [Writing_Payload_Exception]
   * on a short write; raises [Payload_Size_Too_Large_Exception] if the
   * payload does not fit the preamble's size field. *)
  val to_fd_with_preamble :
    ?timeout:Timeout.t ->
    ?flags:Marshal.extern_flags list ->
    WriterReader.fd ->
    'a ->
    int WriterReader.result

  (** Reads one preamble + payload from the fd and unmarshals it.
   * Fails with [End_of_file] if no preamble byte could be read,
   * [Reading_Preamble_Exception] on a partial preamble, and
   * [Reading_Payload_Exception] on a short payload; raises
   * [Malformed_Preamble_Exception] on a bad sentinel. *)
  val from_fd_with_preamble :
    ?timeout:Timeout.t -> WriterReader.fd -> 'a WriterReader.result
end = struct
  let ( >>= ) = WriterReader.( >>= )

  let preamble_start_sentinel = '\142'

  (** Size in bytes of the length field that follows the sentinel. *)
  let preamble_core_size = 4

  (** Sentinel byte plus length field. *)
  let expected_preamble_size = preamble_core_size + 1

  (** Largest payload size, in bytes, representable in the length field:
   * 2^32 - 1 for a 4-byte field. Assumes a 64-bit OCaml [int]; on 32-bit
   * platforms [1 lsl 32] is unspecified. *)
  let maximum_payload_size = (1 lsl (preamble_core_size * 8)) - 1

  (* Encodes [size] big-endian into [preamble_core_size] bytes.
   * Raises [Payload_Size_Too_Large_Exception] if it does not fit. *)
  let get_preamble_core (size : int) =
    (* [>] rather than [>=]: [maximum_payload_size] itself is encodable. *)
    if size > maximum_payload_size then raise Payload_Size_Too_Large_Exception;
    let core = Bytes.create preamble_core_size in
    (* Fill from the last (least significant) byte towards the first. *)
    let rec loop i (remainder : int) =
      if i >= 0 then begin
        Bytes.set core i (Char.chr (remainder mod 256));
        loop (i - 1) (remainder / 256)
      end
    in
    loop (preamble_core_size - 1) size;
    core

  (* Builds the full preamble: sentinel byte followed by the encoded size. *)
  let make_preamble (size : int) =
    let preamble_core = get_preamble_core size in
    let preamble = Bytes.create expected_preamble_size in
    Bytes.set preamble 0 preamble_start_sentinel;
    Bytes.blit preamble_core 0 preamble 1 preamble_core_size;
    preamble

  (* Decodes the payload size from a preamble built by [make_preamble].
   * Raises [Malformed_Preamble_Exception] on a wrong length or sentinel. *)
  let parse_preamble preamble =
    if Bytes.length preamble <> expected_preamble_size
       || Bytes.get preamble 0 <> preamble_start_sentinel
    then raise Malformed_Preamble_Exception;
    (* Byte 0 is the sentinel; bytes 1.. hold the big-endian size. *)
    let rec loop i acc =
      if i >= expected_preamble_size then acc
      else loop (i + 1) ((acc * 256) + int_of_char (Bytes.get preamble i))
    in
    loop 1 0

  (* Writes [to_write] bytes of [buffer] from [offset], looping over partial
   * writes until done or the backend reports writing 0 bytes. Yields the
   * final offset. *)
  let rec write_payload ?timeout fd buffer offset to_write =
    if to_write = 0 then WriterReader.return offset
    else begin
      WriterReader.write ?timeout fd ~buffer ~offset ~size:to_write
      >>= (fun bytes_written ->
        if bytes_written = 0 then WriterReader.return offset
        else
          write_payload ?timeout fd buffer
            (offset + bytes_written) (to_write - bytes_written))
    end

  (* Returns the size of the marshaled payload *)
  let to_fd_with_preamble ?timeout ?(flags = []) fd obj =
    let payload = Marshal.to_bytes obj flags in
    let size = Bytes.length payload in
    let preamble = make_preamble size in
    write_payload ?timeout fd preamble 0 expected_preamble_size
    >>= (fun preamble_bytes_written ->
      if preamble_bytes_written <> expected_preamble_size
      then WriterReader.fail Writing_Preamble_Exception
      else WriterReader.return ())
    >>= (fun () -> write_payload ?timeout fd payload 0 size)
    >>= (fun bytes_written ->
      if bytes_written <> size
      then WriterReader.fail Writing_Payload_Exception
      else WriterReader.return size)

  (* Reads [to_read] bytes into [buffer] at [offset], looping over partial
   * reads until done or the backend reports reading 0 bytes. Yields the
   * final offset. *)
  let rec read_payload ?timeout fd buffer offset to_read =
    if to_read = 0 then WriterReader.return offset
    else begin
      WriterReader.read ?timeout fd ~buffer ~offset ~size:to_read
      >>= (fun bytes_read ->
        if bytes_read = 0 then WriterReader.return offset
        else
          read_payload ?timeout fd buffer
            (offset + bytes_read) (to_read - bytes_read))
    end

  let from_fd_with_preamble ?timeout fd =
    let preamble = Bytes.create expected_preamble_size in
    (* Loop like the payload read does: a single read may legitimately return
     * fewer bytes than requested (e.g. on sockets or pipes), which must not
     * be mistaken for a malformed stream. *)
    read_payload ?timeout fd preamble 0 expected_preamble_size
    >>= (fun bytes_read ->
      if bytes_read = 0 then
        (* Unix manpage for read says 0 bytes read indicates end of file. *)
        WriterReader.fail End_of_file
      else if bytes_read <> expected_preamble_size then begin
        WriterReader.log
          (Printf.sprintf "Error, only read %d bytes for preamble." bytes_read);
        WriterReader.fail Reading_Preamble_Exception
      end
      else WriterReader.return ())
    >>= (fun () ->
      let payload_size = parse_preamble preamble in
      let payload = Bytes.create payload_size in
      read_payload ?timeout fd payload 0 payload_size
      >>= (fun payload_size_read ->
        if payload_size_read <> payload_size
        then WriterReader.fail Reading_Payload_Exception
        (* SECURITY: Marshal.from_bytes is not type-safe and trusts its
         * input; only use this with trusted peers. *)
        else WriterReader.return (Marshal.from_bytes payload 0)))
end
(* Marshal tools specialised to direct-style Unix file descriptors.
 * Its values are also re-exported at the top level of this module. *)
module RegularMarshalTools = MarshalToolsFunctor (RegularWriterReader)

include RegularMarshalTools