Curl_lwt: handle Lwt.cancel
[ocurl.git] / curl_lwt.ml
blobc02ac041b654922b6585b08f34d2f3f25a884d3c
1 (** Lwt support for Curl *)
3 module M = Curl.Multi
(* Global switch for diagnostic tracing; tracing is off until [set_debug true]. *)
let debug = ref false

(* printf-style helper: formats the message and writes it as one line to stderr. *)
let log fmt = Printf.ksprintf (fun line -> prerr_endline line) fmt

(** [set_debug enabled] turns diagnostic tracing on or off. *)
let set_debug enabled = debug := enabled
(* Reinterprets a file descriptor as an integer; used only to print it in debug
   traces. NOTE(review): this relies on [Unix.file_descr] being represented as
   an [int], which is presumably only true on Unix-like systems - confirm
   before using it on other platforms. *)
let int_of_fd (fd : Unix.file_descr) : int = Obj.magic fd
(** Bookkeeping attached to one curl multi handle. *)
type multi = {
  (* the underlying curl multi handle *)
  mt : M.mt;
  (* engine events currently registered for each socket *)
  all_events : (Unix.file_descr, Lwt_engine.event list) Hashtbl.t;
  (* resolver of the pending [perform] promise for each easy handle *)
  wakeners : (Curl.t, Curl.curlCode Lwt.u) Hashtbl.t;
}
(** [create ()] makes a new curl multi handle and installs timer and socket
    callbacks on it that forward to [Lwt_engine]. Whenever the engine reports
    socket readiness or a timeout, the multi handle is advanced and every
    finished easy handle has its wakener (looked up in [wakeners]) woken with
    the transfer's result code. *)
let create () =
  let mt = M.create () in
  let all_events = Hashtbl.create 32 in
  let wakeners = Hashtbl.create 32 in
  (* At most one engine timer is kept alive: every request from libcurl
     replaces the previous one instead of piling up next to it. *)
  let timer_event = ref None in
  let stop_timer () =
    match !timer_event with
    | None -> ()
    | Some ev ->
      timer_event := None;
      Lwt_engine.stop_event ev
  in
  (* Collect every finished transfer and wake the promise waiting on it.
     [s] only names the caller in debug traces. *)
  let finished s =
    if !debug then log "finished %s" s;
    let rec loop n =
      match M.remove_finished mt with
      | None -> if n > 0 && !debug then log "removed %u handles via %s" n s
      | Some (h,code) ->
        if !debug then log "wakeup";
        (* Guard only the lookup, so that a [Not_found] escaping from the
           wakeup itself is not mistaken for a missing wakener. *)
        let wakener = try Some (Hashtbl.find wakeners h) with Not_found -> None in
        begin match wakener with
        | Some w ->
          Hashtbl.remove wakeners h;
          Lwt.wakeup w code
        | None ->
          prerr_endline "curl_lwt: orphan handle, how come?"
        end;
        loop (n+1)
    in
    loop 0
  in
  let on_readable fd _ =
    if !debug then log "on_readable fd %d" (int_of_fd fd);
    let (_:int) = M.action mt fd M.EV_IN in
    finished "on_readable"
  in
  let on_writable fd _ =
    if !debug then log "on_writable fd %d" (int_of_fd fd);
    let (_:int) = M.action mt fd M.EV_OUT in
    finished "on_writable"
  in
  let on_timer _ =
    if !debug then log "on_timer";
    (* Forget the timer that just fired before calling into libcurl, which
       may request a new one from inside [M.action_timeout]. *)
    stop_timer ();
    M.action_timeout mt;
    finished "on_timer"
  in
  M.set_timer_function mt begin fun timeout ->
    if !debug then log "set timeout %d" timeout;
    stop_timer ();
    (* A negative timeout is libcurl's request to delete the timer, so
       nothing is scheduled in that case. *)
    if timeout >= 0 then
      timer_event :=
        Some (Lwt_engine.on_timer (float_of_int timeout /. 1000.) false on_timer)
  end;
  M.set_socket_function mt begin fun fd what ->
    if !debug then log "set socket fd %d %s" (int_of_fd fd)
      (match what with
      | M.POLL_NONE -> "none"
      | M.POLL_REMOVE -> "remove"
      | M.POLL_IN -> "in"
      | M.POLL_OUT -> "out"
      | M.POLL_INOUT -> "inout");
    (* Drop whatever was registered for this socket before installing the
       newly requested set of events. *)
    begin
      try
        List.iter Lwt_engine.stop_event (Hashtbl.find all_events fd);
        Hashtbl.remove all_events fd;
        if !debug then log "removed handlers for %d" (int_of_fd fd);
      with
        Not_found -> () (* first event for the socket - no association *)
    end;
    let events = match what with
      | M.POLL_REMOVE | M.POLL_NONE -> []
      | M.POLL_IN -> [Lwt_engine.on_readable fd (on_readable fd)]
      | M.POLL_OUT -> [Lwt_engine.on_writable fd (on_writable fd)]
      | M.POLL_INOUT -> [Lwt_engine.on_readable fd (on_readable fd); Lwt_engine.on_writable fd (on_writable fd)]
    in
    match events with
    | [] -> ()
    | _ -> Hashtbl.add all_events fd events;
  end;
  { mt; all_events; wakeners; }
(* Single multi handle shared by every [perform] call, created on first use.
   Lwt threads do not run in parallel, so sharing one global is OK'ish. *)
let global : multi Lazy.t = lazy (create ())
(** [perform h] adds the easy handle [h] to the global multi handle and returns
    a promise that is resolved with the transfer's result code once the
    transfer finishes. Cancelling the returned promise removes [h] from the
    multi handle and drops its wakener. Any exception raised while adding [h]
    to the multi handle is re-raised to the caller. *)
let perform h =
  let t = Lazy.force global in
  let (waiter,wakener) = Lwt.wait () in
  (* [Lwt.wait] promises are not cancelable; the protected wrapper is what the
     caller gets and what can be cancelled. *)
  let waiter = Lwt.protected waiter in
  Lwt.on_cancel waiter (fun () ->
    Curl.Multi.remove t.mt h;
    Hashtbl.remove t.wakeners h;
  );
  Hashtbl.add t.wakeners h wakener;
  (* If the handle cannot be added, undo the registration above: a leftover
     binding would never be woken and would shadow the wakener of any transfer
     already in flight for the same handle. *)
  begin try M.add t.mt h with exn ->
    Hashtbl.remove t.wakeners h;
    raise exn
  end;
  waiter