debug udp
[mldonkey.git] / src / utils / net / udpSocket.ml
blobb30282e8c6ee5dbf3385672d61751a1db2287d8f
(* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
(*
    This file is part of mldonkey.

    mldonkey is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    mldonkey is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with mldonkey; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*)
20 open Int64ops
21 open Printf2
22 open BasicSocket
23 open AnyEndian
24 open LittleEndian
(* Tag handed to the Printf2 logging functions for every message of this
   module. *)
let log_prefix = "[udpSock]"

(* [lprintf_nl fmt ...] forwards to [lprintf_nl2] with [log_prefix].
   The [fmt] parameter is kept explicit so the function stays polymorphic
   in the format type. *)
let lprintf_nl fmt =
  lprintf_nl2 log_prefix fmt

(* [lprintf_n fmt ...] forwards to [lprintf2] with [log_prefix]. *)
let lprintf_n fmt =
  lprintf2 log_prefix fmt
(* Events delivered to a UDP socket handler. *)
type event =
  | WRITE_DONE   (* sent by udp_handler once the write queue is empty *)
  | CAN_REFILL   (* sent by udp_handler after a CAN_WRITE pass on an open socket *)
  | READ_DONE    (* sent by udp_handler after a packet was pushed on rlist *)
  | BASIC_EVENT of BasicSocket.event
                 (* any other BasicSocket event, forwarded unchanged *)
(* Bookkeeping for one outstanding ping, keyed by peer address. *)
type ping = {
  ping_ip : Ip.t;
  (* Time after which the cleanup timer may drop the entry. *)
  mutable ping_die_time : int;
  (* A pong is only counted while this is still in the future;
     0 marks the ping as void. *)
  mutable ping_obsolete_time : int;
  (* Value of [current_time ()] when the ping was declared. *)
  mutable ping_time : float;
}
(* ip -> (lowest latency seen in ms, number of samples), both as refs. *)
let latencies = Hashtbl.create 2131

(* Queue of (die_time, ping) pairs scanned by the periodic cleanup timer. *)
let pings_fifo = Fifo.create ()

(* ip -> ping record for every ping that has not been cleaned up yet. *)
let pings_hashtbl = Hashtbl.create 2131
(* [declare_ping ip] records that a ping packet is being sent to [ip].

   If a ping to [ip] is already tracked, the latency of a future pong could
   not be attributed to one of the two pings, so the record is marked void
   (obsolete time 0) and only its die time is pushed back.  Otherwise a new
   record is created, indexed in [pings_hashtbl] and queued in [pings_fifo]
   for the cleanup timer.

   Only [Not_found] from the lookup is treated as the unknown-peer case;
   any other exception is left to propagate instead of being mistaken for
   a missing entry. *)
let declare_ping ip =
  let known =
    try Some (Hashtbl.find pings_hashtbl ip) with Not_found -> None
  in
  match known with
  | Some ping ->
      ping.ping_obsolete_time <- 0; (* ping is void *)
      ping.ping_die_time <- last_time () + 50
  | None ->
      let ping = {
        ping_ip = ip;
        ping_obsolete_time = last_time () + 50;
        ping_die_time = last_time () + 50;
        ping_time = current_time ();
      } in
      Hashtbl.add pings_hashtbl ip ping;
      Fifo.put pings_fifo (ping.ping_die_time, ping)
(* [declare_pong ip] records that an answer was received from [ip].

   Nothing happens when no ping to [ip] is tracked or when the tracked ping
   is obsolete.  Otherwise the ping is marked for removal (die time 0), the
   round-trip time is computed in milliseconds, capped at 65000, and merged
   into [latencies]: the sample counter is incremented and the stored
   latency is lowered when the new measurement is smaller.

   Both lookups only treat [Not_found] as the missing-entry case, so an
   unrelated failure is no longer silently swallowed. *)
let declare_pong ip =
  let known =
    try Some (Hashtbl.find pings_hashtbl ip) with Not_found -> None
  in
  match known with
  | None -> ()
  | Some ping ->
      if ping.ping_obsolete_time > last_time () then begin
        ping.ping_die_time <- 0;
        let ip = ping.ping_ip in
        let elapsed_ms =
          int_of_float (1000. *. (current_time () -. ping.ping_time))
        in
        let delay = if elapsed_ms > 65000 then 65000 else elapsed_ms in
        try
          let latency, samples = Hashtbl.find latencies ip in
          incr samples;
          if !latency > delay then latency := delay
        with Not_found ->
          Hashtbl.add latencies ip (ref delay, ref 1)
      end
(* One datagram, either received or waiting to be sent. *)
type udp_packet = {
  udp_ping : bool;            (* when true, local_sendto declares a ping *)
  udp_content : string;       (* payload bytes *)
  udp_addr : Unix.sockaddr;   (* peer address: destination or sender *)
}
(*
val sendto : Unix.file_descr -> string -> int -> int ->
  Unix.msg_flag (* MSG_DONTWAIT *) list -> Unix.sockaddr -> int
*)
(* Limit on [wlist_size] (sum of queued payload lengths): [write] drops a
   packet when the socket queue has reached this value. *)
let max_wlist_size = ref 100000
(* [local_sendto sock p] sends packet [p] on descriptor [sock] with
   [Unix.sendto] and returns its result.  When [p] is flagged as a ping and
   its address is an inet address, the ping is declared first so that the
   latency can be measured when the answer arrives. *)
let local_sendto sock p =
  let payload = p.udp_content in
  (match p.udp_ping, p.udp_addr with
   | true, Unix.ADDR_INET (inet_addr, _) ->
       declare_ping (Ip.of_inet_addr inet_addr)
   | _ -> ());
  Unix.sendto sock payload 0 (String.length payload) [] p.udp_addr
(* Ordered set of (timestamp, packet) pairs used as the write queue.
   Elements are ordered by timestamp, then payload length, then by
   structural comparison of the packet itself, so [min_elt] yields the
   oldest and, for equal timestamps, shortest packet. *)
module PacketSet = Set.Make (struct
  type t = int * udp_packet

  let compare (time_a, pkt_a) (time_b, pkt_b) =
    let key_a = (time_a, String.length pkt_a.udp_content, pkt_a) in
    let key_b = (time_b, String.length pkt_b.udp_content, pkt_b) in
    compare key_a key_b
end)
(* Connection parameters of a SOCKS proxy, as consumed by set_socks_proxy.
   An empty user and an empty password together mean no authentication. *)
type socks_proxy = {
  socks_proxy_address : string;   (* resolved with Ip.from_name *)
  socks_proxy_port : int;
  socks_proxy_user : string;
  socks_proxy_password : string;
}
(* State attached to one UDP socket. *)
type t = {
  (* Local port, as reported by getsockname when the socket is created. *)
  mutable port : int;
  mutable sock : BasicSocket.t;
  (* Received packets; udp_handler pushes new packets at the head. *)
  mutable rlist : udp_packet list;
  (* Packets waiting to be sent. *)
  mutable wlist : PacketSet.t;
  (* Running sum of the payload lengths added to and removed from wlist. *)
  mutable wlist_size : int;
  mutable event_handler : handler;
  (* Optional bandwidth controller throttling the writes. *)
  mutable write_controler : bandwidth_controler option;
  (* Proxy configuration, set by set_socks_proxy. *)
  mutable socks_proxy : socks_proxy option;
  (* Address and port returned by the proxy at the end of the handshake. *)
  mutable socks_local : (Ip.t * int) option;
}

(* Shared upload budget for a group of UDP sockets. *)
and bandwidth_controler = {
  mutable sockets : t list;
  mutable remaining_bytes : int;
  (* 0 is treated as unlimited by iter_write and remaining_bytes. *)
  mutable total_bytes : int;
  mutable allow_io : bool ref;
  mutable count : int;
  (* Incremented on every budget refresh; used to timestamp and expire
     queued packets. *)
  mutable base_time : int;
  mutable tcp_bc : TcpBufferedSocket.bandwidth_controler;
}

(* Callback invoked with the socket and the event that occurred. *)
and handler = t -> event -> unit
(* Global traffic counters, in payload bytes. *)
let udp_uploaded_bytes = ref Int64.zero
let udp_downloaded_bytes = ref Int64.zero

(* Scratch buffer shared by write and set_socks_proxy to build messages. *)
let buf = Buffer.create 2000

(* Enables the messages about dropped packets and send errors. *)
let debug = ref false
(* [read t] removes and returns the packet at the head of [t.rlist].
   Raises [Not_found] when no packet is pending. *)
let read t =
  match t.rlist with
  | packet :: rest ->
      t.rlist <- rest;
      packet
  | [] -> raise Not_found
(* [set_handler t event handler] installs [handler] for [event] on [t].
   The previously installed event handler keeps receiving every event that
   is not structurally equal to [event]. *)
let set_handler t event handler =
  let previous = t.event_handler in
  let dispatch sock ev =
    if ev = event then handler sock else previous sock ev
  in
  t.event_handler <- dispatch
(* [set_refill t f] registers [f] as the CAN_REFILL handler of [t].  When
   the write queue is already empty, [f] is called once immediately; any
   exception it raises during that call is ignored. *)
let set_refill t f =
  set_handler t CAN_REFILL f;
  let queue_is_empty = PacketSet.is_empty t.wlist in
  if queue_is_empty then begin
    try f t with _ -> ()
  end
(* [set_reader t f] registers [f] as the READ_DONE handler of [t].  When
   packets are already pending, [f] is called once immediately; any
   exception it raises during that call is ignored. *)
let set_reader t f =
  set_handler t READ_DONE f;
  match t.rlist with
  | _ :: _ -> (try f t with _ -> ())
  | [] -> ()
(* [sock t] is the underlying BasicSocket of [t]. *)
let sock t = t.sock

(* [closed t] applies the previously visible [closed] to the underlying
   socket; from here on the name refers to this UDP-level version. *)
let closed t =
  let basic = t.sock in
  closed basic
(* [close t] detaches [t] from its bandwidth controller, if any, then
   applies the previously visible [close] to the underlying socket.  The
   result is whatever that application yields, so any further argument it
   expects is supplied by the caller. *)
let close t =
  (match t.write_controler with
   | Some bc -> bc.sockets <- List2.removeq t bc.sockets
   | None -> ());
  close t.sock
(* [print_addr addr] logs a readable form of the socket address [addr]. *)
let print_addr = function
  | Unix.ADDR_UNIX path ->
      lprintf_nl "ADDR_UNIX (%s)" path
  | Unix.ADDR_INET (inet_addr, port) ->
      lprintf_nl "ADDR_INET (%s, %d)" (Unix.string_of_inet_addr inet_addr) port
(* Added to the controller base_time to timestamp packets queued by write;
   iter_write drops a packet once its timestamp is below base_time. *)
let max_delayed_send = 1
(* [write t ping s ip port] sends or queues the datagram [s] for [ip]:[port]
   on socket [t]; [ping] tells whether the packet counts as a ping.

   As far as the visible lines show:
   - the packet is dropped (with a message only when [!debug] is set) if the
     socket is closed or [t.wlist_size] has reached [!max_wlist_size];
   - when [t.socks_local] is set, a header made of the bytes 0 0 0 1, an
     address and a port is put in front of the payload and the packet is
     addressed to the socks_local endpoint;
   - without a write controller, the packet is sent at once with
     [Unix.sendto] when the queue is not empty; EWOULDBLOCK or ENOBUFS makes
     it fall back to queueing with timestamp 0, any other exception is
     logged and re-raised.  When the queue is empty the packet is queued
     with timestamp 0.  Queueing always calls [must_write] with true;
   - with a write controller, the packet is queued with timestamp
     [bc.base_time + max_delayed_send].

   NOTE(review): in the [Some (ip, port)] case the pattern variables shadow
   the destination parameters, so the header and the target address both
   use the socks_local values - confirm whether the destination was meant
   to go in the header.
   NOTE(review): the direct send happens when the queue is NOT empty and
   queueing when it is empty; this looks inverted - confirm intent.
   NOTE(review): [wlist_size] grows on every queueing even if PacketSet.add
   finds an equal element already present, so the counter may drift from
   the real queue content - confirm.
   NOTE(review): several short lines of this definition are missing from
   the text below; compare with the repository before relying on it. *)
204 let write t ping s ip port =
205 lprintf_nl "UDP write %d bytes to %s:%d" (String.length s) (Ip.to_string ip) port;
206 if not (closed t) && t.wlist_size < !max_wlist_size then
207 let s, addr = match t.socks_local with
208 None -> s, Unix.ADDR_INET(Ip.to_inet_addr ip, port)
209 | Some (ip, port) ->
210 Buffer.reset buf;
211 buf_int8 buf 0;
212 buf_int8 buf 0;
213 buf_int8 buf 0;
214 buf_int8 buf 1;
215 buf_ip buf ip;
216 buf_int16 buf port;
217 Buffer.add_string buf s;
218 Buffer.contents buf, Unix.ADDR_INET(Ip.to_inet_addr ip, port)
220 match t.write_controler with
221 None ->
222 lprintf_nl "UDP no write controller";
223 if not (PacketSet.is_empty t.wlist) then
224 begin
225 let sock = sock t in
227 let len = String.length s in
229 let () =
231 if ping then declare_ping ip;
232 ignore(Unix.sendto (fd sock) s 0 len [] addr);
233 (* if !verbose_bandwidth > 1 then *)
234 begin
235 lprintf_nl "[BW2] direct send udp %d bytes (write)" len;
236 end;
237 with e ->
238 lprintf_nl "Exception in sendto %s:%d" (Ip.to_string ip) port;
239 raise e
241 udp_uploaded_bytes := !udp_uploaded_bytes ++ (Int64.of_int len);
244 lprintf_nl "UDP sent [%s]" (String.escaped
245 (String.sub s pos len));
247 with
248 Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.ENOBUFS), _, _) ->
249 lprintf_nl "UDP err, queue";
250 t.wlist <- PacketSet.add (0, {
251 udp_ping = ping;
252 udp_content = s ;
253 udp_addr = addr;
254 }) t.wlist;
255 t.wlist_size <- t.wlist_size + String.length s;
256 must_write sock true;
257 | e ->
258 lprintf_nl "Exception %s in sendto"
259 (Printexc2.to_string e);
260 print_addr addr;
261 raise e
263 else begin
264 lprintf_nl "UDP queue (wlist_size %d)" t.wlist_size;
265 t.wlist <- PacketSet.add (0, {
266 udp_ping = ping;
267 udp_content = s ;
268 udp_addr = addr;
269 }) t.wlist;
270 t.wlist_size <- t.wlist_size + String.length s;
271 must_write t.sock true;
273 | Some bc ->
275 begin
276 lprintf_nl "UDP with bontroller, queue (wlist_size %d)" t.wlist_size;
277 t.wlist <- PacketSet.add (bc.base_time + max_delayed_send, {
278 udp_ping = ping;
279 udp_content = s;
280 udp_addr = addr;
281 }) t.wlist;
282 t.wlist_size <- t.wlist_size + String.length s;
283 must_write t.sock true;
285 else
286 if !debug then begin
287 lprintf_nl "UDP DROPPED in write";
(* Placeholder stored in the sock field by create until the real
   BasicSocket exists.  It must never be used as a socket. *)
let dummy_sock = Obj.magic 0

(* Receive buffer shared by udp_handler and set_socks_proxy. *)
let read_buf = String.create 66000
(* [iter_write_no_bc t sock] sends the queued packets of [t] one after the
   other, smallest element of the queue first, without bandwidth control.

   Each packet is taken out of the queue before it is sent.  EWOULDBLOCK
   and ENOBUFS are logged and re-raised to the caller; any other send error
   is logged only when [!debug] is set and the loop goes on.  The recursion
   has no explicit stop condition: it ends when [PacketSet.min_elt] raises
   on the empty queue or when a send error is re-raised. *)
let rec iter_write_no_bc t sock =
  lprintf_nl "UDP iter_write_no_bc (wlist_size %d)" t.wlist_size;
  let ((_, p) as entry) = PacketSet.min_elt t.wlist in
  let len = String.length p.udp_content in
  t.wlist <- PacketSet.remove entry t.wlist;
  t.wlist_size <- t.wlist_size - len;
  (try
     ignore (local_sendto (fd sock) p);
     udp_uploaded_bytes := !udp_uploaded_bytes ++ (Int64.of_int len);
     (* if !verbose_bandwidth > 1 then *)
     lprintf_nl "[BW2] direct send udp %d bytes (iter_write_no_bc)" len
   with
   | Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.ENOBUFS), _, _) as e ->
       lprintf_nl "Exception %s in sendto next" (Printexc2.to_string e);
       raise e
   | e ->
       if !debug then
         lprintf_nl "Exception %s in sendto next"
           (Printexc2.to_string e));
  iter_write_no_bc t sock
(* Wrapper shadowing the recursive [iter_write_no_bc]: when the send would
   block or the system has no buffer space, the socket is asked to report
   writability again.  Other exceptions, including the one that ends the
   loop on an empty queue, pass through. *)
let iter_write_no_bc t sock =
  try iter_write_no_bc t sock
  with Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.ENOBUFS), _, _) ->
    lprintf_nl "UDP iter_write_no_bc err, must_write";
    must_write t.sock true
(* [iter_write t sock bc] sends the queued packets of [t] under the
   bandwidth controller [bc].

   While the controller is unlimited ([total_bytes] = 0) or has budget
   left, the smallest queue element is removed and either dropped, when its
   timestamp is older than [bc.base_time], or sent.  A successful send is
   charged to the budget (payload plus [!TcpBufferedSocket.ip_packet_size])
   and reported through [TcpBufferedSocket.register_bytes].  EWOULDBLOCK
   and ENOBUFS are logged and re-raised; other send errors are logged only
   when [!debug] is set.  When the budget is exhausted, [bc.allow_io] is
   set to false.  An empty queue ends the loop through the exception raised
   by [PacketSet.min_elt]. *)
let rec iter_write t sock bc =
  lprintf_nl "UDP iter_write (wlist_size %d)" t.wlist_size;
  let budget_left = bc.total_bytes = 0 || bc.remaining_bytes > 0 in
  if not budget_left then
    bc.allow_io := false
  else begin
    let ((stamp, p) as entry) = PacketSet.min_elt t.wlist in
    t.wlist <- PacketSet.remove entry t.wlist;
    t.wlist_size <- t.wlist_size - String.length p.udp_content;
    if stamp < bc.base_time then begin
      (* The packet waited too long: forget it. *)
      if !debug then
        lprintf_nl "UDP DROPPED in iter_write"
    end else begin
      let len = String.length p.udp_content in
      try
        ignore (local_sendto (fd sock) p);
        udp_uploaded_bytes := !udp_uploaded_bytes ++ (Int64.of_int len);
        bc.remaining_bytes <-
          bc.remaining_bytes - (len + !TcpBufferedSocket.ip_packet_size);
        TcpBufferedSocket.register_bytes (Some bc.tcp_bc) len;
        (* if !verbose_bandwidth > 1 then *)
        lprintf_nl "[BW2] bc send udp %d bytes" len
      with
      | Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.ENOBUFS), _, _) as e ->
          lprintf_nl "Exception %s in sendto next" (Printexc2.to_string e);
          raise e
      | e ->
          if !debug then
            lprintf_nl "Exception %s in sendto next"
              (Printexc2.to_string e)
    end;
    iter_write t sock bc
  end
(* Wrapper shadowing the recursive [iter_write]: when the send would block
   or the system has no buffer space, [sock] is asked to report writability
   again.  Other exceptions pass through. *)
let iter_write t sock bc =
  try iter_write t sock bc
  with Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.ENOBUFS), _, _) ->
    must_write sock true
(* [udp_handler t sock event] is the BasicSocket callback of a UDP socket.

   CAN_READ: one datagram of at most 66000 bytes is received into
   [read_buf].  Without a SOCKS proxy the payload is the first [len] bytes
   and the address is the one returned by recvfrom; with a proxy the first
   10 bytes are skipped and the sender is rebuilt from the address at
   offset 4 and the port at offset 8.  [udp_downloaded_bytes] grows by
   [len], the packet is pushed at the head of [t.rlist] and the event
   handler receives READ_DONE.

   CAN_WRITE: the queue is flushed with [iter_write_no_bc] or, when a write
   controller is set, [iter_write]; Not_found turns the write notification
   off.  Then, if the socket is not closed, the event handler receives
   CAN_REFILL and, when the queue is empty afterwards, the write
   notification is turned off and WRITE_DONE is delivered.

   Any other event is forwarded wrapped in BASIC_EVENT.

   NOTE(review): with a proxy and a datagram shorter than 10 bytes the
   String.sub call would raise Invalid_argument - confirm this is handled
   by the caller.
   NOTE(review): a few short lines of this definition are missing from the
   text below; compare with the repository before relying on it. *)
370 let udp_handler t sock event =
371 match event with
372 | CAN_READ ->
373 let (len, addr) = Unix.recvfrom (fd sock) read_buf 0 66000 [] in
374 let s, addr = match t.socks_proxy with
375 None -> String.sub read_buf 0 len, addr
376 | Some _ ->
377 String.sub read_buf 10 (len-10),
378 Unix.ADDR_INET(Ip.to_inet_addr (get_ip read_buf 4), get_int16 read_buf 8)
380 udp_downloaded_bytes := !udp_downloaded_bytes ++ (Int64.of_int len);
381 t.rlist <- {
382 udp_content = s;
383 udp_ping = false;
384 udp_addr = addr;
385 } :: t.rlist;
386 t.event_handler t READ_DONE
388 | CAN_WRITE ->
389 begin
390 lprintf_nl "UDP CAN_WRITE";
392 match t.write_controler with
393 None ->
394 iter_write_no_bc t sock
395 | Some bc ->
396 iter_write t sock bc
397 with Not_found ->
398 lprintf_nl "UDP CAN_WRITE Not_found must_write false";
399 must_write t.sock false
400 end;
401 lprintf_nl "UDP CAN_WRITE done";
402 if not (closed t) then begin
403 lprintf_nl "UDP run CAN_REFILL";
404 t.event_handler t CAN_REFILL;
405 if PacketSet.is_empty t.wlist then begin
406 lprintf_nl "UDP packetset empty";
407 must_write t.sock false;
408 t.event_handler t WRITE_DONE
410 end
412 | _ -> t.event_handler t (BASIC_EVENT event)
(* [create addr port handler] opens a UDP socket bound to [addr]:[port] and
   returns its state record, with [handler] as the initial event handler.

   SO_REUSEADDR is set before binding.  The port stored in the record is
   the one reported by getsockname, so passing 0 yields the port picked by
   the system.  The socket is registered with BasicSocket under the name
   udp_socket and protected with [prevent_close].

   If configuring or binding the descriptor fails, the descriptor is closed
   before the exception is re-raised, so a failed bind no longer leaks it.

   Raises whatever [Unix.socket], [Unix.setsockopt], [Unix.bind] or
   [Unix.getsockname] raise, typically [Unix.Unix_error]. *)
let create addr port handler =
  let fd = Unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in
  let port =
    try
      Unix.setsockopt fd Unix.SO_REUSEADDR true;
      Unix.bind fd (Unix.ADDR_INET ( (*Unix.inet_addr_any*) addr, port));
      (match Unix.getsockname fd with
       | Unix.ADDR_INET (_, bound_port) -> bound_port
       | Unix.ADDR_UNIX _ -> port)
    with e ->
      (* Do not leak the descriptor; a failure of close itself must not
         hide the original error. *)
      (try Unix.close fd with Unix.Unix_error _ -> ());
      raise e
  in
  let t = {
    rlist = [];
    wlist = PacketSet.empty;
    wlist_size = 0;
    sock = dummy_sock; (* replaced below once the BasicSocket exists *)
    event_handler = handler;
    write_controler = None;
    socks_proxy = None;
    socks_local = None;
    port = port;
  } in
  let sock = BasicSocket.create "udp_socket" fd (udp_handler t) in
  prevent_close sock;
  t.sock <- sock;
  t
(* [create_sendonly ()] creates a socket bound to any local address on a
   port chosen by the system, with a handler that ignores every event. *)
let create_sendonly () =
  let ignore_events _ _ = () in
  create Unix.inet_addr_any 0 ignore_events
(* [can_write t] is true exactly when no packet is waiting in the write
   queue of [t]. *)
let can_write t =
  let pending = t.wlist in
  PacketSet.is_empty pending
(* [read_packets t f] applies [f] to every pending packet of [t], in the
   order of [t.rlist], then empties the list.  An exception raised by [f]
   on one packet is ignored and does not stop the others from being
   processed. *)
let read_packets t f =
  let deliver packet =
    try f packet with _ -> ()
  in
  List.iter deliver t.rlist;
  t.rlist <- []
(* [set_write_controler s c] puts socket [s] under bandwidth controller
   [c]: the controller is recorded on the socket, the socket is added to
   the controller list, and the allow_io flag of the controller is passed
   to [set_allow_write] for the underlying socket. *)
let set_write_controler s c =
  s.write_controler <- Some c;
  let members = s :: c.sockets in
  c.sockets <- members;
  set_allow_write s.sock c.allow_io
(* [new_bandwidth_controler tcp_bc] creates a UDP bandwidth controller tied
   to the TCP controller [tcp_bc] and returns it.

   A callback is registered with
   [TcpBufferedSocket.set_remaining_bytes_user]; each time it is called
   with [total] and [n] it:
   - advances [base_time] by one;
   - every tenth call (when [count] reaches 0) reports the previous
     [remaining_bytes] with [TcpBufferedSocket.set_lost_bytes];
   - resets the budget to half of [total], capped at [total];
   - allows I/O when [total] is 0 or some budget is left. *)
let new_bandwidth_controler tcp_bc =
  let udp_bc = {
    sockets = [];
    remaining_bytes = 0;
    total_bytes = 0;
    allow_io = ref false;
    count = 0;
    base_time = 0;
    tcp_bc = tcp_bc;
  } in
  let udp_user total n =
    if !verbose_bandwidth > 0 then
      lprintf_nl "udp_user %d/%d" n total;
    (* let n = if total = 0 then 100000 else n in *)
    udp_bc.base_time <- udp_bc.base_time + 1;
    if udp_bc.count = 0 then begin
      udp_bc.count <- 10;
      (* remaining_bytes still holds the value left from the last period *)
      TcpBufferedSocket.set_lost_bytes tcp_bc udp_bc.remaining_bytes
        udp_bc.base_time
    end;
    udp_bc.count <- udp_bc.count - 1;
    udp_bc.total_bytes <- total;
    let half = total / 2 in
    (* udp_bc.remaining_bytes <- udp_bc.remaining_bytes + n; *)
    udp_bc.remaining_bytes <-
      (if total <> 0 && half > total then total else half);
    udp_bc.allow_io := (total = 0 || udp_bc.remaining_bytes > 0);
    if !verbose_bandwidth > 0 then
      lprintf_nl "udp_bc count:%d total_bytes:%d remaining_bytes:%d"
        udp_bc.count udp_bc.total_bytes udp_bc.remaining_bytes
  in
  TcpBufferedSocket.set_remaining_bytes_user tcp_bc udp_user;
  udp_bc
(* [remaining_bytes bc] is the budget left in [bc]; a controller whose
   [total_bytes] is 0 always reports 1000000. *)
let remaining_bytes bc =
  match bc.total_bytes with
  | 0 -> 1000000
  | _ -> bc.remaining_bytes
(* [use_remaining_bytes bc n] charges [n] bytes to the budget of [bc].
   The budget is not clamped and may become negative. *)
let use_remaining_bytes bc n =
  let left = bc.remaining_bytes - n in
  bc.remaining_bytes <- left
(* [set_socks_proxy t ss] configures socket [t] to go through the SOCKS
   proxy described by [ss].

   Visible steps: [ss] is stored in [t.socks_proxy]; the descriptor is made
   blocking; the proxy name is resolved with [Ip.from_name]; a greeting is
   sent (version 5, one method: 2 when a user or password is given, else
   0); each message is followed by a select of at most 30 seconds that
   fails with the [SOCKS] timeout message; when authentication is used the
   user and password are sent next; then a request made of the bytes
   5 3 0 1, a zero address and [t.port] is sent.  The address at offset 4
   and the port at offset 8 of the 10-byte answer are stored in
   [t.socks_local], and the descriptor is made non-blocking again.
   On any exception the error is logged, [t] is closed and the exception is
   re-raised.

   NOTE(review): [local_sendto] is defined earlier in this file with two
   parameters (descriptor and packet) but is applied here to six
   arguments in the style of Unix.sendto - confirm which function is meant.
   NOTE(review): the sends and receives are written inside [assert]; if
   the program is built with assertions disabled they would not be
   evaluated - confirm the build flags or move the calls out.
   NOTE(review): several short lines of this definition (including the
   try keywords) are missing from the text below; compare with the
   repository before relying on it. *)
494 let set_socks_proxy t ss =
497 Buffer.reset buf;
498 let s = read_buf in
500 t.socks_proxy <- Some ss;
502 let fd = fd t.sock in
503 Unix.clear_nonblock fd;
504 let socks_ip = Ip.from_name ss.socks_proxy_address in
505 let proxy_addr = Unix.ADDR_INET(Ip.to_inet_addr socks_ip, ss.socks_proxy_port) in
507 buf_int8 buf 5;
508 buf_int8 buf 1;
509 let auth = ss.socks_proxy_user <> "" || ss.socks_proxy_password <> "" in
510 buf_int8 buf (if auth then 2 else 0);
512 let send_and_wait () =
513 let s = Buffer.contents buf in
514 Buffer.reset buf;
515 assert (local_sendto fd s 0 (String.length s) [] proxy_addr > 0);
517 match Unix.select [fd] [] [] 30. with
518 [],_,_ -> failwith "[SOCKS] timeout"
519 | _ -> ()
521 send_and_wait ();
522 assert (fst (Unix.recvfrom fd s 0 2 []) = 2);
523 assert (s.[0] = '\005');
524 if auth then begin
525 assert (s.[1] = '\002');
527 buf_int8 buf 1;
528 buf_string8 buf ss.socks_proxy_user;
529 buf_string8 buf ss.socks_proxy_password;
530 send_and_wait ();
532 assert (fst (Unix.recvfrom fd s 0 2 []) = 2);
533 assert (s.[0] = '\005');
534 end;
535 assert (s.[1] = '\000');
537 buf_int8 buf 5;
538 buf_int8 buf 3;
539 buf_int8 buf 0;
540 buf_int8 buf 1;
541 buf_int buf 0;
542 buf_int16 buf t.port;
544 send_and_wait ();
545 assert (fst (Unix.recvfrom fd s 0 10 []) = 10);
546 assert (s.[0] = '\005');
547 assert (s.[1] = '\000');
549 let ip = get_ip s 4 in
550 let port = get_int16 s 8 in
551 t.socks_local <- Some (ip, port);
553 MlUnix.set_nonblock fd;
554 with e ->
555 lprintf_nl "[SOCKS] proxy error prevent creation of UDP socket: %s"
556 (Printexc2.to_string e);
557 close t "socks proxy error"; raise e
(* [get_latencies verbose] serializes the collected latencies and resets
   the table.

   The result starts with the number of entries written by
   [LittleEndian.buf_int], followed for every entry by the address
   ([buf_ip]), the latency and the sample count (both [buf_int16]).  When
   [!verbose] is set each entry is also logged.  [Hashtbl.length] counts
   exactly the bindings that [Hashtbl.iter] visits, so the header matches
   the number of records that follow. *)
let get_latencies verbose =
  let b = Buffer.create 300 in
  LittleEndian.buf_int b (Hashtbl.length latencies);
  let dump ip (latency, samples) =
    if !verbose then lprintf_nl " Latency UDP: %s -> %d (%d samples)" (Ip.to_string ip) !latency !samples;
    LittleEndian.buf_ip b ip;
    LittleEndian.buf_int16 b !latency;
    LittleEndian.buf_int16 b !samples
  in
  Hashtbl.iter dump latencies;
  Hashtbl.clear latencies;
  Buffer.contents b
577 let _ =
578 Heap.add_memstat "udpSocket" (fun level buf ->
579 Printf.bprintf buf " %d latencies\n" (Hashtbl.length latencies);
580 Printf.bprintf buf " %d entries in pings_fifo\n" (Fifo.length pings_fifo);
581 Printf.bprintf buf " %d entries in pings_hashtbl\n" (Hashtbl.length pings_hashtbl);
584 add_infinite_timer 5. (fun _ ->
586 while true do
587 let (die_time, ping) = Fifo.head pings_fifo in
588 if die_time < last_time () then begin
589 ignore (Fifo.take pings_fifo);
590 if ping.ping_die_time > last_time () then
591 Fifo.put pings_fifo (ping.ping_die_time, ping)
592 else
593 Hashtbl.remove pings_hashtbl ping.ping_ip
594 end else raise Exit
595 done
596 with _ -> ()