1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 let log_prefix = "[udpSock]"
29 lprintf_nl2
log_prefix fmt
32 lprintf2
log_prefix fmt
38 | BASIC_EVENT
of BasicSocket.event
42 mutable ping_die_time
: int;
43 mutable ping_obsolete_time
: int;
44 mutable ping_time
: float;
47 let latencies = Hashtbl.create
2131
48 let pings_fifo = Fifo.create
()
49 let pings_hashtbl = Hashtbl.create
2131
54 let ping = Hashtbl.find
pings_hashtbl ip
in
55 ping.ping_obsolete_time
<- 0; (* ping is void *)
56 ping.ping_die_time
<- last_time
() + 50
60 ping_obsolete_time
= last_time
() + 50;
61 ping_die_time
= last_time
() + 50;
62 ping_time
= current_time
();
64 Hashtbl.add
pings_hashtbl ip
ping;
65 Fifo.put
pings_fifo (ping.ping_die_time
, ping)
69 let ping = Hashtbl.find
pings_hashtbl ip
in
70 if ping.ping_obsolete_time
> last_time
() then begin
71 ping.ping_die_time
<- 0;
72 let time = ping.ping_time
in
73 let ip = ping.ping_ip
in
74 let delay = current_time
() -. time in
75 let delay = 1000. *. delay in
76 let delay = int_of_float
delay in
77 let delay = if delay > 65000 then 65000 else delay in
79 let latency, samples
= Hashtbl.find
latencies ip in
81 if !latency > delay then latency := delay
83 Hashtbl.add
latencies ip (ref delay, ref 1)
90 udp_addr
: Unix.sockaddr
;
92 val sendto : Unix.file_descr -> string -> int -> int ->
93 Unix.msg_flag MSG_DONTWAITlist -> Unix.sockaddr -> int
97 let max_wlist_size = ref 100000
99 let local_sendto sock p
=
100 if p
.udp_ping
then begin
101 match p
.udp_addr
with
102 Unix.ADDR_INET
(ip, _
) ->
103 let ip = Ip.of_inet_addr
ip in
107 Unix.sendto sock p
.udp_content
0 (String.length p
.udp_content
) [] p
.udp_addr
110 module PacketSet
= Set.Make
(struct
111 type t
= int * udp_packet
112 let compare (t1
,p1
) (t2
,p2
) =
113 compare (t1
, String.length p1
.udp_content
,p1
) (t2
, String.length p2
.udp_content
,p2
)
117 socks_proxy_address
: string;
118 socks_proxy_port
: int;
119 socks_proxy_user
: string;
120 socks_proxy_password
: string;
125 mutable sock
: BasicSocket.t
;
126 mutable rlist
: udp_packet list
;
127 mutable wlist
: PacketSet.t
;
128 mutable wlist_size
: int;
129 mutable event_handler
: handler
;
130 mutable write_controler
: bandwidth_controler
option;
131 mutable socks_proxy
: socks_proxy
option;
132 mutable socks_local
: (Ip.t
* int) option;
135 and bandwidth_controler
= {
136 mutable sockets
: t list
;
137 mutable remaining_bytes
: int;
138 mutable total_bytes
: int;
139 mutable allow_io
: bool ref;
141 mutable base_time
: int;
142 mutable tcp_bc
: TcpBufferedSocket.bandwidth_controler
;
145 and handler
= t
-> event
-> unit
148 let udp_uploaded_bytes = ref Int64.zero
149 let udp_downloaded_bytes = ref Int64.zero
151 let buf = Buffer.create
2000
152 let debug = ref false
156 [] -> raise Not_found
161 let set_handler t event handler
=
162 let old_handler = t
.event_handler
in
169 t
.event_handler
<- handler
172 set_handler t CAN_REFILL f
;
173 if PacketSet.is_empty t
.wlist
then
174 (try f t
with _
-> ())
177 set_handler t READ_DONE f
;
180 | _
-> (try f t
with _
-> ())
183 let closed t
= closed t
.sock
186 match t
.write_controler
with
189 bc
.sockets
<- List2.removeq t bc
.sockets
193 let print_addr addr
=
196 Unix.ADDR_INET
(ip, port
) ->
197 lprintf_nl "ADDR_INET (%s, %d)" (Unix.string_of_inet_addr
ip) port
198 | Unix.ADDR_UNIX s
->
199 lprintf_nl "ADDR_UNIX (%s)" s
;
202 let max_delayed_send = 1
204 let write t
ping s
ip port
=
205 lprintf_nl "UDP write %d bytes to %s:%d" (String.length s
) (Ip.to_string
ip) port
;
206 if not
(closed t
) && t
.wlist_size
< !max_wlist_size then
207 let s, addr
= match t
.socks_local
with
208 None
-> s, Unix.ADDR_INET
(Ip.to_inet_addr
ip, port
)
217 Buffer.add_string
buf s;
218 Buffer.contents
buf, Unix.ADDR_INET
(Ip.to_inet_addr
ip, port
)
220 match t
.write_controler
with
222 lprintf_nl "UDP no write controller";
223 if not
(PacketSet.is_empty t
.wlist
) then
227 let len = String.length
s in
231 if ping then declare_ping ip;
232 ignore
(Unix.sendto
(fd
sock) s 0 len [] addr
);
233 (* if !verbose_bandwidth > 1 then *)
235 lprintf_nl "[BW2] direct send udp %d bytes (write)" len;
238 lprintf_nl "Exception in sendto %s:%d" (Ip.to_string
ip) port
;
241 udp_uploaded_bytes := !udp_uploaded_bytes ++ (Int64.of_int
len);
244 lprintf_nl "UDP sent [%s]" (String.escaped
245 (String.sub s pos len));
248 Unix.Unix_error
((Unix.EWOULDBLOCK
| Unix.ENOBUFS
), _
, _
) ->
249 lprintf_nl "UDP err, queue";
250 t
.wlist
<- PacketSet.add
(0, {
255 t
.wlist_size
<- t
.wlist_size
+ String.length
s;
256 must_write
sock true;
258 lprintf_nl "Exception %s in sendto"
259 (Printexc2.to_string e
);
264 lprintf_nl "UDP queue (wlist_size %d)" t
.wlist_size
;
265 t
.wlist
<- PacketSet.add
(0, {
270 t
.wlist_size
<- t
.wlist_size
+ String.length
s;
271 must_write t
.sock true;
276 lprintf_nl "UDP with bontroller, queue (wlist_size %d)" t
.wlist_size
;
277 t
.wlist
<- PacketSet.add
(bc
.base_time
+ max_delayed_send, {
282 t
.wlist_size
<- t
.wlist_size
+ String.length
s;
283 must_write t
.sock true;
287 lprintf_nl "UDP DROPPED in write";
290 let dummy_sock = Obj.magic
0
292 let read_buf = String.create
66000
294 let rec iter_write_no_bc t
sock =
295 lprintf_nl "UDP iter_write_no_bc (wlist_size %d)" t
.wlist_size
;
296 let (time,p
) = PacketSet.min_elt t
.wlist
in
297 t
.wlist
<- PacketSet.remove
(time,p
) t
.wlist
;
298 t
.wlist_size
<- t
.wlist_size
- String.length p
.udp_content
;
299 let len = String.length p
.udp_content
in
301 ignore
(local_sendto (fd
sock) p
);
302 udp_uploaded_bytes := !udp_uploaded_bytes ++ (Int64.of_int
len);
303 (* if !verbose_bandwidth > 1 then *)
305 lprintf_nl "[BW2] direct send udp %d bytes (iter_write_no_bc)" len;
308 Unix.Unix_error
((Unix.EWOULDBLOCK
| Unix.ENOBUFS
), _
, _
) as e
->
309 lprintf_nl "Exception %s in sendto next" (Printexc2.to_string e
);
313 lprintf_nl "Exception %s in sendto next"
314 (Printexc2.to_string e
)
316 iter_write_no_bc t
sock
318 let iter_write_no_bc t
sock =
320 iter_write_no_bc t
sock
322 Unix.Unix_error
((Unix.EWOULDBLOCK
| Unix.ENOBUFS
), _
, _
) ->
323 lprintf_nl "UDP iter_write_no_bc err, must_write";
324 must_write t
.sock true
326 let rec iter_write t
sock bc
=
327 lprintf_nl "UDP iter_write (wlist_size %d)" t
.wlist_size
;
328 if bc
.total_bytes
= 0 || bc
.remaining_bytes
> 0 then
330 let (time,p
) = PacketSet.min_elt t
.wlist
in
331 t
.wlist
<- PacketSet.remove
(time,p
) t
.wlist
;
332 t
.wlist_size
<- t
.wlist_size
- String.length p
.udp_content
;
333 if time < bc
.base_time
then begin
335 lprintf_nl "UDP DROPPED in iter_write";
339 let len = String.length p
.udp_content
in
341 ignore
(local_sendto (fd
sock) p
);
342 udp_uploaded_bytes := !udp_uploaded_bytes ++ (Int64.of_int
len);
343 bc
.remaining_bytes
<- bc
.remaining_bytes
- (len +
344 !TcpBufferedSocket.ip_packet_size
) ;
345 TcpBufferedSocket.register_bytes
(Some bc
.tcp_bc
) len;
346 (* if !verbose_bandwidth > 1 then *)
348 lprintf_nl "[BW2] bc send udp %d bytes" len;
351 Unix.Unix_error
((Unix.EWOULDBLOCK
| Unix.ENOBUFS
), _, _) as e
->
352 lprintf_nl "Exception %s in sendto next" (Printexc2.to_string e
);
356 lprintf_nl "Exception %s in sendto next"
357 (Printexc2.to_string e
)
363 let iter_write t
sock bc
=
367 Unix.Unix_error
((Unix.EWOULDBLOCK
| Unix.ENOBUFS
), _, _) ->
370 let udp_handler t
sock event
=
373 let (len, addr
) = Unix.recvfrom
(fd
sock) read_buf 0 66000 [] in
374 let s, addr
= match t
.socks_proxy
with
375 None
-> String.sub
read_buf 0 len, addr
377 String.sub
read_buf 10 (len-10),
378 Unix.ADDR_INET
(Ip.to_inet_addr
(get_ip
read_buf 4), get_int16
read_buf 8)
380 udp_downloaded_bytes := !udp_downloaded_bytes ++ (Int64.of_int
len);
386 t
.event_handler t READ_DONE
390 lprintf_nl "UDP CAN_WRITE";
392 match t
.write_controler
with
394 iter_write_no_bc t
sock
398 lprintf_nl "UDP CAN_WRITE Not_found must_write false";
399 must_write t
.sock false
401 lprintf_nl "UDP CAN_WRITE done";
402 if not
(closed t
) then begin
403 lprintf_nl "UDP run CAN_REFILL";
404 t
.event_handler t CAN_REFILL
;
405 if PacketSet.is_empty t
.wlist
then begin
406 lprintf_nl "UDP packetset empty";
407 must_write t
.sock false;
408 t
.event_handler t WRITE_DONE
412 | _ -> t
.event_handler t
(BASIC_EVENT event
)
414 let create addr port
handler =
415 let fd = Unix.socket
Unix.PF_INET
Unix.SOCK_DGRAM
0 in
416 Unix.setsockopt
fd Unix.SO_REUSEADDR
true;
417 Unix.bind
fd (Unix.ADDR_INET
( (*Unix.inet_addr_any*) addr
, port
));
418 let port = match Unix.getsockname
fd with
419 Unix.ADDR_INET
(ip, port) -> port
423 wlist
= PacketSet.empty
;
426 event_handler
= handler;
427 write_controler
= None
;
432 let sock = BasicSocket.create "udp_socket" fd (udp_handler t) in
437 let create_sendonly () = create Unix.inet_addr_any
0 (fun _ _ -> ())
440 PacketSet.is_empty
t.wlist
442 let read_packets t f
=
448 let set_write_controler s c
=
449 s.write_controler
<- Some c
;
450 c
.sockets
<- s :: c
.sockets
;
451 set_allow_write
s.sock c
.allow_io
453 let new_bandwidth_controler tcp_bc
=
458 allow_io
= ref false;
463 let udp_user total n
=
464 if !verbose_bandwidth
> 0 then
465 lprintf_nl "udp_user %d/%d" n total
;
466 (* let n = if total = 0 then 100000 else n in *)
467 udp_bc.base_time
<- udp_bc.base_time
+ 1;
468 if udp_bc.count
= 0 then begin
470 TcpBufferedSocket.set_lost_bytes tcp_bc
udp_bc.remaining_bytes
473 udp_bc.count
<- udp_bc.count
- 1;
474 udp_bc.total_bytes
<- total
;
475 udp_bc.remaining_bytes
<- total
/ 2;
476 (* udp_bc.remaining_bytes <- udp_bc.remaining_bytes + n; *)
477 if total
<> 0 && udp_bc.remaining_bytes
> total
then
478 udp_bc.remaining_bytes
<- total
;
479 udp_bc.allow_io
:= total
= 0 || udp_bc.remaining_bytes
> 0;
480 if !verbose_bandwidth
> 0 then
481 lprintf_nl "udp_bc count:%d total_bytes:%d remaining_bytes:%d"
482 udp_bc.count
udp_bc.total_bytes
udp_bc.remaining_bytes
;
484 TcpBufferedSocket.set_remaining_bytes_user tcp_bc
udp_user;
487 let remaining_bytes bc
=
488 if bc
.total_bytes
= 0 then 1000000 else bc
.remaining_bytes
490 let use_remaining_bytes bc
n =
491 bc
.remaining_bytes <- bc
.remaining_bytes - n
494 let set_socks_proxy t ss =
500 t.socks_proxy <- Some ss;
502 let fd = fd t.sock in
503 Unix.clear_nonblock fd;
504 let socks_ip = Ip.from_name ss.socks_proxy_address in
505 let proxy_addr = Unix.ADDR_INET(Ip.to_inet_addr socks_ip, ss.socks_proxy_port) in
509 let auth = ss.socks_proxy_user <> "" || ss.socks_proxy_password <> "" in
510 buf_int8 buf (if auth then 2 else 0);
512 let send_and_wait () =
513 let s = Buffer.contents buf in
515 assert (local_sendto fd s 0 (String.length s) [] proxy_addr > 0);
517 match Unix.select [fd] [] [] 30. with
518 [],_,_ -> failwith "[SOCKS] timeout"
522 assert (fst (Unix.recvfrom fd s 0 2 []) = 2);
523 assert (s.[0] = '\005');
525 assert (s.[1] = '\002');
528 buf_string8 buf ss.socks_proxy_user;
529 buf_string8 buf ss.socks_proxy_password;
532 assert (fst (Unix.recvfrom fd s 0 2 []) = 2);
533 assert (s.[0] = '\005');
535 assert (s.[1] = '\000');
542 buf_int16 buf t.port;
545 assert (fst (Unix.recvfrom fd s 0 10 []) = 10);
546 assert (s.[0] = '\005');
547 assert (s.[1] = '\000');
549 let ip = get_ip s 4 in
550 let port = get_int16 s 8 in
551 t.socks_local <- Some (ip, port);
553 MlUnix.set_nonblock fd;
555 lprintf_nl "[SOCKS] proxy error prevent creation of UDP socket: %s"
556 (Printexc2.to_string e);
557 close t "socks proxy error"; raise e
561 let get_latencies verbose
=
562 let b = Buffer.create 300 in
563 let counter = ref 0 in
564 Hashtbl.iter
(fun ip (latency, samples
) ->
567 LittleEndian.buf_int
b !counter;
568 Hashtbl.iter
(fun ip (latency, samples
) ->
569 if !verbose
then lprintf_nl " Latency UDP: %s -> %d (%d samples)" (Ip.to_string
ip) !latency !samples
;
570 LittleEndian.buf_ip
b ip;
571 LittleEndian.buf_int16
b !latency;
572 LittleEndian.buf_int16
b !samples
;
574 Hashtbl.clear
latencies;
578 Heap.add_memstat
"udpSocket" (fun level
buf ->
579 Printf.bprintf
buf " %d latencies\n" (Hashtbl.length
latencies);
580 Printf.bprintf
buf " %d entries in pings_fifo\n" (Fifo.length
pings_fifo);
581 Printf.bprintf
buf " %d entries in pings_hashtbl\n" (Hashtbl.length
pings_hashtbl);
584 add_infinite_timer
5. (fun _ ->
587 let (die_time
, ping) = Fifo.head
pings_fifo in
588 if die_time
< last_time
() then begin
589 ignore
(Fifo.take
pings_fifo);
590 if ping.ping_die_time
> last_time
() then
591 Fifo.put
pings_fifo (ping.ping_die_time
, ping)
593 Hashtbl.remove
pings_hashtbl ping.ping_ip