From deec628182123e313cf43170cb78e4fe10fd9a43 Mon Sep 17 00:00:00 2001 From: Lucian Wischik Date: Wed, 11 Dec 2019 09:17:39 -0800 Subject: [PATCH] avoid race in Lwt.pick Summary: There's a race in clientIdeService.serve... ``` Lwt.pick [ send_queued_up_messages t.out_fd ...; emit_messages_from_daemon t.in_fd ...; ] ``` Both forks of the pick involve `marshal_tools_lwt`. In other words, they each involve a sequence of async writes, or a sequence of async reads. Imagine if both forks start, and both perform part of their sequence of reads, but then one of the forks wins. Lwt will "attempt to cancel" the other fork. * Imagine if it cancels the write fork after only some of the async writes -- the next time we write a message, then what the consumer unmarshals will be a weird amalgam of some bytes from the start of one message and some bytes from the start of the next, which will lead to a marshal error. * Imagine if it cancels the read fork after only some of the async reads -- the next time we read a message, what we unmarshal will be an amalgam of some bytes from the end of one message and some bytes from the start of the next, which will lead to a marshal error. We have been observing marshal errors in the wild. This seems the likeliest explannation. WHAT THIS DIFF DOES: I rewrote the "serve" loop to avoid the race. I copied the style from ClientLsp.get_message_source, which also uses `Lwt_unix.wait_read` to discover whether there's an incoming message available. The call to `wait_read` is easily cancellable with no harm done. Another option would have been to make Marshal_tools atomic. We still probably should do that. But I picked the avenue in this diff because it was straightforward, and because I think it makes the code look cleaner. I think that multiple async loops, while possibile, make the code harder to reason about. I took the opportunity to factor out the "shutdown" action and document it a bit better. Differential Revision: D18927910 fbshipit-source-id: 8a8694eb95ba221e084cd117e8cf5debe09c2107 --- .../src/client/ide_service/clientIdeService.ml | 131 +++++++++++---------- 1 file changed, 69 insertions(+), 62 deletions(-) diff --git a/hphp/hack/src/client/ide_service/clientIdeService.ml b/hphp/hack/src/client/ide_service/clientIdeService.ml index 73df3c1aec8..f60bd236140 100644 --- a/hphp/hack/src/client/ide_service/clientIdeService.ml +++ b/hphp/hack/src/client/ide_service/clientIdeService.ml @@ -333,74 +333,81 @@ let stop (t : t) ~(tracking_id : string) ~(reason : Stop_reason.t) : unit Lwt.t set_state t (Stopped reason); Lwt.return_unit +let cleanup_upon_shutdown_or_exn (t : t) ~(e : Exception.t option) : unit Lwt.t + = + (* We are invoked with e=None when one of the message-queues has said that + it's closed. This indicates an orderly shutdown has been performed by 'stop'. + We are invoked with e=Some when we had an exception in our main serve loop. *) + let (message, stack) = + match e with + | None -> ("message-queue closed", "") + | Some e -> (Exception.get_ctor_string e, Exception.get_backtrace_string e) + in + HackEventLogger.serverless_ide_crash ~message ~stack; + log "Shutdown triggered by %s\n%s" message stack; + (* We might as well call 'stop' in both cases; there'll be no harm. *) + match t.state with + | Stopped _ -> Lwt.return_unit + | _ -> + log "Shutting down..."; + let%lwt () = stop t ~tracking_id:"exception" ~reason:Stop_reason.Crashed in + Lwt.return_unit + let rec serve (t : t) : unit Lwt.t = - let send_queued_up_messages ~out_fd messages_to_send : bool Lwt.t = - let%lwt next_message = Lwt_message_queue.pop messages_to_send in - match next_message with - | None -> Lwt.return false - | Some (Message_wrapper next_message) -> + (* Behavior of 'serve' is (1) take items from `messages_to_send` and send + then over the wire to the daemon, (2) take items from the wire from the + daemon and put them onto `notification_emitter` or `response_emitter` queues, + (3) keep doing this until we discover that a queue has been closed, which + is the "cancellation" signal for us to stop our loop. + The code looks a bit funny because the only way to tell if a queue is closed + is when we attemept to awaitingly-read or synchronously-write to it. *) + try%lwt + (* We mutate the data in `t` which is why we don't return a new `t` here. *) + let%lwt next_action = + (* Care! we only put things in a Pick which are safe to cancel. *) + Lwt.pick + [ + (let%lwt outgoing_opt = Lwt_message_queue.pop t.messages_to_send in + match outgoing_opt with + | None -> Lwt.return `Close + | Some outgoing -> Lwt.return (`Outgoing outgoing)); + (let%lwt () = Lwt_unix.wait_read t.in_fd in + Lwt.return `Incoming); + ] + in + match next_action with + | `Close -> + let%lwt () = cleanup_upon_shutdown_or_exn t ~e:None in + Lwt.return_unit + | `Outgoing (Message_wrapper next_message) -> log_debug "-> %s" (ClientIdeMessage.tracked_t_to_string next_message); let%lwt (_ : int) = - Marshal_tools_lwt.to_fd_with_preamble out_fd next_message + Marshal_tools_lwt.to_fd_with_preamble t.out_fd next_message in - Lwt.return true - in - let emit_messages_from_daemon ~in_fd response_emitter notification_emitter : - bool Lwt.t = - let%lwt (message : ClientIdeMessage.message_from_daemon) = - Marshal_tools_lwt.from_fd_with_preamble in_fd - in - log_debug "<- %s" (ClientIdeMessage.message_from_daemon_to_string message); - match message with - | ClientIdeMessage.Notification notification -> - process_status_notification t notification; - Lwt.return (Lwt_message_queue.push notification_emitter notification) - | ClientIdeMessage.Response response -> - Lwt.return - (Lwt_message_queue.push response_emitter (Response_wrapper response)) - in - let%lwt should_continue = - try%lwt - (* We mutate the data in `t`, which is why we don't return a new `t` here. - *) - let%lwt should_continue = - Lwt.pick - [ - send_queued_up_messages ~out_fd:t.out_fd t.messages_to_send; - emit_messages_from_daemon - ~in_fd:t.in_fd - t.response_emitter - t.notification_emitter; - ] + serve t + | `Incoming -> + let%lwt (message : ClientIdeMessage.message_from_daemon) = + Marshal_tools_lwt.from_fd_with_preamble t.in_fd in - if should_continue = false then - HackEventLogger.serverless_ide_crash - ~message: - "No crash, but should_continue set to false by message queue." - ~stack:"No stack"; - Lwt.return should_continue - with e -> - let e = Exception.wrap e in - let stack = Exception.get_backtrace_string e in - let message = Exception.to_string e in - HackEventLogger.serverless_ide_crash ~message ~stack; - log "Exception occurred in ClientIdeService.serve: %s" message; - Lwt.return false - in - if should_continue then - serve t - else ( - log "Shutting down"; - match t.state with - | Stopped _ -> - (* Already stopped, don't do anything. *) - Lwt.return_unit - | _ -> - let%lwt () = - stop t ~tracking_id:"exception" ~reason:Stop_reason.Crashed + log_debug "<- %s" (ClientIdeMessage.message_from_daemon_to_string message); + let queue_is_open = + match message with + | ClientIdeMessage.Notification notification -> + process_status_notification t notification; + Lwt_message_queue.push t.notification_emitter notification + | ClientIdeMessage.Response response -> + Lwt_message_queue.push t.response_emitter (Response_wrapper response) in - Lwt.return_unit - ) + if queue_is_open then + serve t + else + let%lwt () = cleanup_upon_shutdown_or_exn t ~e:None in + Lwt.return_unit + with e -> + let e = Exception.wrap e in + (* cleanup function below will log the exception *) + let%lwt () = cleanup_upon_shutdown_or_exn t ~e:(Some e) in + Lwt.return_unit let push_message (t : t) (message : message_wrapper) : unit = match t.state with -- 2.11.4.GIT