1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25 let latencies = Hashtbl.create
2131
27 let max_opened_connections = ref (fun () -> 20)
28 let max_connections_per_second = ref (fun () -> 50)
30 let opened_connections = ref 0
31 let opened_connections_this_second = ref 0
33 let max_buffer_size = ref 50000
35 let bind_address = ref Unix.inet_addr_any
36 let ip_packet_size = ref 40
37 let mtu_packet_size = ref 1500
38 let minimal_packet_size = ref 600
39 let packet_frame_size = 1
41 let proc_net_fs = ref true
43 let tcp_uploaded_bytes = ref Int64.zero
44 let tcp_downloaded_bytes = ref Int64.zero
48 (*************************************************************************)
52 (*************************************************************************)
60 | BASIC_EVENT
of BasicSocket.event
62 let string_of_event = function
63 | CONNECTED
-> "CONNECTED"
64 | WRITE_DONE
-> "WRITE_DONE"
65 | CAN_REFILL
-> "CAN_REFILL"
66 | BUFFER_OVERFLOW
-> "BUFFER_OVERFLOW"
67 | READ_DONE n
-> Printf.sprintf
"READ_DONE %d" n
68 | BASIC_EVENT e
-> string_of_basic_event e
71 mutable token_used
: bool;
72 connection_manager
: connection_manager
;
75 and connection_manager
= {
77 mutable nestablished_connections
: int;
78 mutable nwaiting_connections
: int;
79 mutable nconnections_last_second
: int;
80 waiting_connections
: (token
* (token
-> unit)) Fifo.t
;
87 mutable max_buf_size
: int;
88 mutable min_buf_size
: int
93 mutable closing
: bool;
94 mutable sock_in
: BasicSocket.t
;
95 mutable sock_out
: BasicSocket.t
;
98 mutable event_handler
: handler
;
99 mutable error
: close_reason
;
101 mutable ndown_packets
: int;
102 mutable nwrite
: int;
103 mutable nup_packets
: int;
104 mutable monitored
: bool;
106 mutable read_control
: bandwidth_controler
option;
107 mutable write_control
: bandwidth_controler
option;
108 mutable write_power
: int;
109 mutable read_power
: int;
111 mutable peer_addr
: (Ip.t
* int) option;
112 mutable my_ip
: Ip.t
;
114 mutable noproxy
: bool;
115 mutable connecting
: bool;
117 mutable connect_time
: float;
119 mutable token
: token
;
121 mutable compression
: (
124 buf
* (* read buffer after decompression *)
125 buf
(* write buffer before decompression *)
130 and handler
= t
-> event
-> unit
132 and bandwidth_controler
= {
134 mutable remaining_bytes
: int;
135 mutable total_bytes
: int;
136 mutable nconnections
: int;
137 mutable connections
: t list
;
139 mutable remaining_bytes_user
: ((* total *) int -> (* remaining *) int -> unit);
140 mutable moved_bytes
: int64
;
141 mutable lost_bytes
: int array
; (* 3600 samples*)
142 mutable forecast_bytes
: int;
144 mutable ndone_last_second
: int;
148 (*************************************************************************)
150 (* Connections Managers *)
152 (*************************************************************************)
154 let connection_managers = ref []
156 let create_connection_manager name
=
159 nestablished_connections
= 0;
160 nwaiting_connections
= 0;
161 waiting_connections
= Fifo.create
();
162 nconnections_last_second
= 0;
165 connection_managers := manager :: !connection_managers;
168 let create_token manager = {
170 connection_manager
= manager;
173 let add_pending_connection manager f
=
174 let token = create_token manager in
175 Fifo.put
manager.waiting_connections
(token, f
);
176 manager.nwaiting_connections
<- manager.nwaiting_connections
+ 1;
179 let can_open_connection manager =
180 manager.nestablished_connections
+
181 manager.nwaiting_connections
< !max_opened_connections ()
185 This scheduler does not use the already established connections.
188 let schedule_connections () =
189 let max_wanted = !max_opened_connections () in
190 let max_connections_per_second = !max_connections_per_second () in
192 let rec iter todo_managers done_managers
=
195 lprintf "todo_managers %d done_managers %d\n"
196 (List.length todo_managers) (List.length done_managers);
198 match todo_managers
with
200 match done_managers
with
202 | _
-> iter done_managers
[]
206 lprintf "!opened_connections_this_second %d < max_connections_per_second %d\n" !opened_connections_this_second max_connections_per_second;
207 lprintf "&& !opened_connections %d < max_wanted %d\n"
208 !opened_connections max_wanted;
210 if !opened_connections_this_second < max_connections_per_second
211 && !opened_connections < max_wanted then
215 Fifo.take
manager.waiting_connections
in
216 manager.nwaiting_connections
<-
217 manager.nwaiting_connections
- 1;
218 if not
token.token_used
then begin
221 (* prevent in any case the token from being used later *)
222 token.token_used
<- true;
226 iter tail
(manager :: done_managers
)
228 iter tail done_managers
230 iter !connection_managers []
232 let cancel_token token =
233 token.token_used
<- true
235 let used_token token = token.token_used
237 let unlimited_connection_manager = create_connection_manager "Unlimited"
239 let reset_connection_scheduler _
=
240 if !verbose_bandwidth
> 0 then
241 lprintf_nl
"[BW1 %6d] Connections opened this second : %d/%d total %d/%d"
243 !opened_connections_this_second
244 (!max_connections_per_second ())
246 (!max_opened_connections ());
249 if !verbose_bandwidth
> 0 then begin
250 if cm
.nconnections_last_second
> 0 then
251 lprintf_nl
"[BW1 %6d] %s opened %d connections last second"
252 (last_time
()) cm
.cm_name cm
.nconnections_last_second
;
253 if cm
.nwaiting_connections
> 0 then
254 lprintf_nl
"[BW1 %6d] %s still waits for %d connections"
255 (last_time
()) cm
.cm_name cm
.nwaiting_connections
;
257 cm
.nconnections_last_second
<- 0;
258 ) !connection_managers;
260 opened_connections_this_second := 0
262 let use_token token fd
=
263 if token.token_used
then begin
264 (try Unix.close fd
with _
-> ());
265 failwith
"Token already used";
267 token.token_used
<- true;
268 token.connection_manager
.nestablished_connections
<-
269 token.connection_manager
.nestablished_connections
+ 1;
270 incr
opened_connections;
271 incr
opened_connections_this_second
273 (*************************************************************************)
275 (* Bandwidth Consumers *)
277 (*************************************************************************)
279 let add_connect_latency ip time
=
280 if ip
<> Ip.null
&& time
> 1. then
281 let delay = current_time
() -. time
in
282 let delayf = 1000. *. delay in
283 let delay = int_of_float
delayf in
284 let delay = if delay > 65000 then 65000 else delay in
286 lprintf "add_connect_latency %s -> %d (%f)\n"
287 (Ip.to_string ip) delay delayf;
290 let latency, samples
= Hashtbl.find
latencies ip
in
292 if !latency > delay then latency := delay
294 Hashtbl.add
latencies ip
(ref delay, ref 1)
296 let forecast_bytes t nbytes
=
300 let nip_packets = 1 + nbytes
/ !mtu_packet_size in
301 let nbytes = nbytes + nip_packets * !ip_packet_size in
302 let nframes = 1 + nbytes / packet_frame_size in
303 bc
.forecast_bytes <- bc
.forecast_bytes +
304 (nframes * packet_frame_size)
306 let register_bytes t
nbytes =
310 let nip_packets = 1 + nbytes / !mtu_packet_size in
311 let nbytes = nbytes + nip_packets * !ip_packet_size in
312 let nframes = 1 + nbytes / packet_frame_size in
313 bc
.remaining_bytes
<- bc
.remaining_bytes
-
314 (nframes * packet_frame_size)
316 let forecast_download t n
=
317 forecast_bytes t
.read_control n
319 let forecast_upload t n
=
320 forecast_bytes t
.write_control n
322 let register_download t n
=
323 register_bytes t
.read_control n
325 let register_upload t n
=
326 register_bytes t
.write_control n
328 let accept_connection_bandwidth t
=
329 register_download t
0;
331 forecast_download t
0
333 let best_packet_size nbytes =
334 let nbytes = max
nbytes !minimal_packet_size in
335 let nip_packets = 1 + nbytes / !mtu_packet_size in
336 let headers = nip_packets * !ip_packet_size in
337 let nframes = 1 + (nbytes + headers) / packet_frame_size in
338 nframes * packet_frame_size - headers
340 (*************************************************************************)
342 (* Buffers management *)
344 (*************************************************************************)
346 let copy_read_buffer = ref true
348 let big_buffer_len = 65536
349 let big_buffer = String.create
big_buffer_len
351 let min_buffer_read = 2000
352 let min_read_size = min_buffer_read - 100
354 let old_strings_size = 20
355 let old_strings = Array.create
old_strings_size ""
356 let old_strings_len = ref 0
359 if !old_strings_len > 0 then begin
360 decr
old_strings_len;
361 let s = old_strings.(!old_strings_len) in
362 old_strings.(!old_strings_len) <- "";
365 String.create
min_buffer_read
367 let delete_string s =
368 if !old_strings_len < old_strings_size &&
369 String.length
s = min_buffer_read then begin
370 old_strings.(!old_strings_len) <- s;
371 incr
old_strings_len;
380 min_buf_size
= min_read_size;
384 let buf_used b nused
=
385 if nused
= b
.len
then
392 (b
.len
<- b
.len
- nused
; b
.pos
<- b
.pos
+ nused
)
395 (String.length t
.rbuf
.buf
),
396 (String.length t
.wbuf
.buf
)
398 (*************************************************************************)
402 (*************************************************************************)
404 let buf_add t b
s pos1 len
=
405 let curpos = b
.pos
+ b
.len
in
409 b
.buf
<- new_string ();
412 String.length b
.buf
in
413 if max_len - curpos < len
then (* resize before blit *)
414 if b
.len
+ len
< max_len then (* just move to 0 *)
416 String.blit b
.buf b
.pos b
.buf
0 b
.len
;
417 String.blit
s pos1 b
.buf b
.len len
;
418 b
.len
<- b
.len
+ len
;
422 if b
.len
+ len
> b
.max_buf_size
then begin
423 lprintf
"[TCP_BS]: BUFFER OVERFLOW %d+%d> %d " b
.len len b
.max_buf_size
;
425 lprintf
"MESSAGE: [";
426 for i
= pos1
to pos1
+ (min len
20) - 1 do
427 lprintf
"(%d)" (int_of_char
s.[i
]);
429 if len
> 20 then lprintf
"...";
432 t
.event_handler t BUFFER_OVERFLOW
;
433 (* TODO: why do we have this ??? in case of BUFFER_OVERFLOW, just close the
437 let new_len = min
(max
(2 * max_len) (b
.len
+ len
)) b
.max_buf_size
in
438 (* if t.monitored then
439 (lprintf "Allocate new for %d\n" len; ); *)
440 let new_buf = String.create
new_len in
441 String.blit b
.buf b
.pos
new_buf 0 b
.len
;
442 String.blit
s pos1
new_buf b
.len len
;
443 b
.len
<- b
.len
+ len
;
445 if max_len = min_buffer_read then delete_string b
.buf
;
446 (* if t.monitored then
447 (lprintf "new buffer allocated\n"; ); *)
450 String.blit
s pos1 b
.buf
curpos len
;
454 (*************************************************************************)
456 (* Sockets management *)
458 (*************************************************************************)
461 let setsock_iptos_throughput t
=
462 ignore
(setsock_iptos_throughput (fd
(t
.sock_in
)));
463 ignore
(setsock_iptos_throughput (fd
(t
.sock_out
)))
465 let closed t
= closed t
.sock_out
466 let error t
= t
.error
467 let sock_used t nused
= buf_used t
.rbuf nused
468 let remaining_to_write t
= t
.wbuf
.len
469 let nread t
= t
.nread
470 let nwritten t
= t
.nwrite
471 let can_write t
= t
.wbuf
.len
= 0
472 let can_write_len t len
=
473 let b = t
.wbuf
.max_buf_size
> t
.wbuf
.len
+ len
in
475 lprintf "can_write_len failed: %d < %d + %d\n"
476 t.wbuf.max_buf_size t.wbuf.len len; *)
478 let not_buffer_more t max
= t
.wbuf
.len
< max
479 let get_rtimeout t
= get_rtimeout t
.sock_in
480 let max_refill t
= t
.wbuf
.max_buf_size
- t
.wbuf
.len
484 if t.monitored then begin
485 lprintf "close with %s %s\n" t.error s;
488 if not t
.closing
then
491 t
.token.connection_manager
.nestablished_connections
<-
492 t
.token.connection_manager
.nestablished_connections
- 1;
493 decr
opened_connections;
495 delete_string t
.rbuf
.buf;
496 delete_string t
.wbuf
.buf;
499 if t
.nread > 0 then begin
501 forecast_download t
0;
504 if t
.sock_in
!= t
.sock_out
then
506 (* (Printf.sprintf "%s after %d/%d" s t.nread t.nwrite) *)
508 lprintf
"Exception %s in TcpBufferedSocket.close\n"
509 (Printexc2.to_string e
);
515 if t.monitored then begin
516 lprintf "shutdown\n";
520 BasicSocket.shutdown t
.sock_out
s;
521 if t
.sock_in
!= t
.sock_out
then
522 BasicSocket.shutdown t
.sock_in
s;
524 lprintf
"exception %s in shutdown\n" (Printexc2.to_string e
);
526 (try close t
s with e
->
527 lprintf
"exception %s in shutdown\n" (Printexc2.to_string e
);
530 (*************************************************************************)
534 (*************************************************************************)
536 let write t
s pos1 len
=
537 (* lprintf "want_write %d\n" len; *)
538 if len
> 0 && not
(closed t
) then
539 let pos2 = pos1
+ len
in
542 if b.len
= 0 && not t
.connecting
&& (match t
.write_control
with
544 (* lprintf "NO CONTROL\n"; *)
547 (* lprintf "LIMIT %d\n" bc.total_bytes; *)
551 let fd = fd t
.sock_out
in
552 (* lprintf "WRITE [%s]\n" (String.escaped (String.sub s pos1 len)); *)
553 let nw = MlUnix.write fd s pos1 len
in
554 if !verbose_bandwidth
> 1 then
555 lprintf_nl
"[BW2 %6d] immediate write %d/%d on %s:%d"
556 (last_time
()) nw len t
.name
(sock_num t
.sock_out
);
558 register_upload t len
;
559 forecast_download t
0;
561 (* if t.monitored then begin
562 lprintf "write: direct written %d\n" nw;
564 tcp_uploaded_bytes := !tcp_uploaded_bytes ++ (Int64.of_int
nw);
565 (match t
.write_control
with
568 bc
.moved_bytes
<- bc
.moved_bytes
++ (Int64.of_int
nw));
569 if t
.nwrite
= 0 then begin
570 (* if t.connecting then
571 lprintf "WRITE BEFORE CONNECTION.......\n";
572 lprintf "add_connect_latency at %f\n" (current_time ()); *)
573 add_connect_latency t
.host t
.connect_time
;
576 t
.nwrite
<- t
.nwrite
+ nw;
577 if nw = 0 then (close t Closed_by_peer
; pos2) else
580 Unix.Unix_error
((Unix.EWOULDBLOCK
| Unix.EAGAIN
| Unix.ENOTCONN
), _
, _
) -> pos1
582 t
.error <- Closed_for_error
(Printf.sprintf
"Write Error: %s" (Printexc2.to_string e
));
585 (* lprintf "exce %s in read\n" (Printexc2.to_string e); *)
591 let sock = t
.sock_out
in
592 must_write
sock true;
593 buf_add t
b s pos1 (pos2 - pos1)
595 (*************************************************************************)
597 (* can_read_handler *)
599 (*************************************************************************)
601 let dummy_sock = Obj.magic
0
604 (* max_len is the maximal length we authorized to read, min_read_size
605 is the minimal size we authorize to read *)
607 let can_read_handler t
sock max_len =
608 (* let max_len = 100000 in (* REMOVE THIS: don't care about bw *) *)
610 let curpos = b.pos
+ b.len
in
611 (* lprintf "curpos %d/%d\n" curpos b.len; *)
612 let buffer, buffer_pos
, buffer_len
=
613 if !copy_read_buffer then
614 big_buffer, 0, big_buffer_len
616 let can_write_in_buffer =
618 if b.min_buf_size
<= min_buffer_read then begin
619 b.buf <- new_string ();
622 b.buf <- String.create
b.min_buf_size
;
626 let buf_len = String.length
b.buf in
627 if buf_len - curpos < min_read_size then
628 if b.len
+ min_read_size > b.max_buf_size
then
630 t
.event_handler t BUFFER_OVERFLOW
;
631 lprintf
"[OVERFLOW] in %s" (info
sock);
632 close t Closed_for_overflow
;
636 if b.len
+ min_read_size < buf_len then
638 String.blit
b.buf b.pos
b.buf 0 b.len
;
645 (2 * buf_len) (b.len
+ min_read_size)) b.max_buf_size
647 let new_buf = String.create
new_len in
648 String.blit
b.buf b.pos
new_buf 0 b.len
;
655 b.buf, b.pos
+b.len
, can_write_in_buffer
657 let can_read = min
max_len buffer_len
in
659 let old_len = b.len
in
661 (* lprintf "{can read %d} --> " can_read; *)
662 (* lprintf "Unix.read %d/%d/%d\n" (String.length b.buf) (b.pos + b.len) can_read; *)
663 Unix.read
(fd sock) buffer buffer_pos
can_read;
667 Unix.Unix_error
((Unix.EWOULDBLOCK
| Unix.EAGAIN
), _
,_
) as e
-> raise e
669 t
.error <- Closed_for_error
(Printf.sprintf
"Can Read Error: %s" (Printexc2.to_string e
));
672 (* lprintf "exce %s in read\n" (Printexc2.to_string e); *)
678 if nread = can_read then begin
679 lprintf "READ LIMITED BY BW CONTROL: %d\n" nread;
683 if !verbose_bandwidth
> 1 then
684 lprintf_nl
"[BW2 %6d] %sread %d/%d/%d on %s:%d" (last_time
())
685 (if old_len > 0 then "completing " else "") nread can_read max_len
686 t
.name
(sock_num t
.sock_in
);
689 if !copy_read_buffer then
690 buf_add t
b big_buffer 0 nread
692 b.len
<- b.len
+ nread;
693 (* lprintf " %d\n" nread; *)
694 b.min_buf_size
<- min
b.max_buf_size
(
695 max
(nread + nread / 2) min_read_size);
698 if nread = can_read then begin
699 lprintf "Unix.read: read limited: %d\n" nread;
700 lprintf " given to handler: %d\n" max_len;
701 lprintf " given by buffer: %d\n" buffer_len;
705 tcp_downloaded_bytes := !tcp_downloaded_bytes ++ (Int64.of_int
nread);
706 (match t
.read_control
with
707 None
-> () | Some bc
->
708 bc
.moved_bytes
<- bc
.moved_bytes
++ (Int64.of_int
nread));
710 t
.nread <- t
.nread + nread;
711 if nread > 0 then begin
712 register_download t
nread;
715 if nread = 0 then begin
716 close t Closed_by_peer
;
720 (* if t.monitored then
721 (lprintf "event handler READ DONE\n"; ); *)
722 t
.event_handler t
(READ_DONE
nread);
725 (* if t.monitored then
726 (lprintf "Exception in READ DONE\n"; ); *)
727 t
.error <- Closed_for_error
(Printf.sprintf
"READ_DONE Error: %s" (Printexc2.to_string e
));
730 (* lprintf "exce %s in read\n" (Printexc2.to_string e); *)
734 (*************************************************************************)
736 (* can_write_handler *)
738 (*************************************************************************)
740 let can_write_handler t
sock max_len =
741 (* if t.monitored then (
742 lprintf "CAN_WRITE (%d)\n" t.wbuf.len;
745 if not t
.connecting
then (
749 (* lprintf "try write %d/%d\n" max_len t.wbuf.len; *)
751 (* lprintf "WRITE [%s]\n" (String.escaped
752 (String.sub b.buf b.pos max_len)); *)
753 let nw = MlUnix.write fd b.buf b.pos
max_len in
754 if !verbose_bandwidth
> 1 then
755 lprintf_nl
"[BW2 %6d] postponed %swrite %d/%d/%d on %s:%d"
756 (last_time
()) (if max_len < b.len
then "partial " else "")
757 nw max_len b.len t
.name
(sock_num t
.sock_out
);
759 (* if t.monitored then
760 (lprintf "written %d\n" nw; ); *)
761 tcp_uploaded_bytes := !tcp_uploaded_bytes ++ (Int64.of_int
nw);
762 (match t
.write_control
with
765 bc
.moved_bytes
<- bc
.moved_bytes
++ (Int64.of_int
nw));
766 if t
.nwrite
= 0 then begin
767 (* lprintf "add_connect_latency at %f\n" (current_time ()); *)
768 add_connect_latency t
.host t
.connect_time
;
770 t
.nwrite
<- t
.nwrite
+ nw;
773 if nw = 0 then close t Closed_by_peer
else
774 if b.len
= 0 then begin
780 Unix.Unix_error
((Unix.EWOULDBLOCK
| Unix.EAGAIN
), _
,_
) as e
-> raise e
782 t
.error <- Closed_for_error
(Printf.sprintf
"Can Write Error: %s" (Printexc2.to_string e
));
785 (* lprintf "exce %s in read\n" (Printexc2.to_string e); *)
789 if not
(closed t
) then begin
790 t
.event_handler t CAN_REFILL
;
791 if b.len
= 0 then begin
794 must_write t
.sock_out
false;
795 t
.event_handler t WRITE_DONE
800 (*************************************************************************)
804 (*************************************************************************)
806 let tcp_handler_write t
sock =
808 match t
.write_control
with
810 can_write_handler t
sock t
.wbuf
.len
812 if bc
.total_bytes
= 0 then
813 can_write_handler t
sock t
.wbuf
.len
815 bc
.connections
<- t
:: bc
.connections
;
816 bc
.nconnections
<- t
.write_power
+ bc
.nconnections
820 let get_latencies verbose
=
821 let b = Buffer.create
300 in
822 let counter = ref 0 in
823 Hashtbl.iter (fun ip
(latency, samples
) ->
826 LittleEndian.buf_int
b !counter;
827 Hashtbl.iter (fun ip
(latency, samples
) ->
828 if !verbose
then lprintf
" Latency TCP: %s -> %d (%d samples)\n" (Ip.to_string ip
) !latency !samples
;
829 LittleEndian.buf_ip
b ip
;
830 LittleEndian.buf_int16
b !latency;
831 LittleEndian.buf_int16
b !samples
;
833 Hashtbl.clear
latencies;
836 let tcp_handler t event
=
839 (* lprintf "CAN_READ\n"; *)
841 match t
.read_control
with
843 can_read_handler t t
.sock_in
1000000
845 if bc
.total_bytes
= 0 then
846 can_read_handler t t
.sock_in
1000000
848 (* lprintf "DELAYED\n"; *)
849 if bc
.remaining_bytes
> 0 then
851 bc
.connections
<- t
:: bc
.connections
;
852 bc
.nconnections
<- t
.read_power
+ bc
.nconnections
857 (* lprintf "CAN_WRITE\n"; *)
859 if t
.nwrite
= 0 && t
.noproxy
&& t
.connecting
then begin
860 t
.connecting
<- false;
861 t
.event_handler t CONNECTED
;
865 if can_write then tcp_handler_write t t
.sock_out
866 | _
-> t
.event_handler t
(BASIC_EVENT event
)
869 (*************************************************************************)
871 (* Bandwidth Controlers *)
873 (*************************************************************************)
875 let read_bandwidth_controlers = ref []
876 let write_bandwidth_controlers = ref []
878 let create_read_bandwidth_controler name rate
=
881 remaining_bytes
= rate
;
886 remaining_bytes_user
= (fun _ _
-> ());
887 moved_bytes
= Int64.zero
;
888 lost_bytes
= Array.create
3600 0;
890 ndone_last_second
= 0;
892 read_bandwidth_controlers := bc :: !read_bandwidth_controlers;
895 let create_write_bandwidth_controler name rate
=
898 remaining_bytes
= rate
;
903 remaining_bytes_user
= (fun _ _
-> ());
904 moved_bytes
= Int64.zero
;
905 lost_bytes
= Array.create
3600 0;
907 ndone_last_second
= 0;
909 write_bandwidth_controlers := bc :: !write_bandwidth_controlers;
912 let change_rate bc rate
=
913 bc.total_bytes
<- rate
915 let bandwidth_controler t
sock =
916 (match t
.read_control
with
919 (* must_read sock (bc.total_bytes = 0 || bc.remaining_bytes > 0)); *)
920 (* bandwidth_controler is called before socket is engaged into data transfer.
921 This can be either upload or download connection, but when download speed
922 is capped and fully saturated the above condition will be false,
923 consequently the socket will never get [want_read] property and
924 will never get considered by the event loop, until finally being
925 closed on timeout. This bug manifests itself with no _new_ BT upload clients
926 or inability to connect to servers (DC) when download is running.
927 This is a temporary fix, bandwidth limiting logic needs some global refactoring *)
928 must_read
sock true);
929 (match t
.write_control
with
932 must_write
sock ((bc.total_bytes
= 0 || bc.remaining_bytes
> 0)
935 let reset_bandwidth_controlers _
=
937 bc.remaining_bytes_user
bc.total_bytes
bc.remaining_bytes
;
938 bc.remaining_bytes
<- bc.total_bytes
- bc.forecast_bytes;
939 if !verbose_bandwidth
> 0 && bc.ndone_last_second
> 0 then
940 lprintf_nl
"[BW1 %6d] %s read %d/%d last second" (last_time
())
941 bc.bc_name
bc.ndone_last_second
bc.total_bytes
;
942 if !verbose_bandwidth
> 0 && bc.forecast_bytes > 0 then
943 lprintf_nl
"[BW1 %6d] %s forecast read %d bytes for next second" (last_time
())
944 bc.bc_name
bc.forecast_bytes;
945 bc.forecast_bytes <- 0;
946 bc.ndone_last_second
<- 0;
947 if bc.remaining_bytes
> 0 then bc.allow_io
:= true
948 (* lprintf "READ remaining_bytes: %d" bc.remaining_bytes; *)
949 ) !read_bandwidth_controlers;
951 bc.remaining_bytes_user
bc.total_bytes
bc.remaining_bytes
;
952 bc.remaining_bytes
<- bc.total_bytes
- bc.forecast_bytes;
953 if !verbose_bandwidth
> 0 && bc.ndone_last_second
> 0 then
954 lprintf_nl
"[BW1 %6d] %s wrote %d/%d last second" (last_time
())
955 bc.bc_name
bc.ndone_last_second
bc.total_bytes
;
956 if !verbose_bandwidth
> 0 && bc.forecast_bytes > 0 then
957 lprintf_nl
"[BW1 %6d] %s forecast write %d bytes for next second" (last_time
())
958 bc.bc_name
bc.forecast_bytes;
959 bc.forecast_bytes <- 0;
960 bc.ndone_last_second
<- 0;
961 if bc.remaining_bytes
> 0 then bc.allow_io
:= true;
963 lprintf "WRITE remaining_bytes: %d\n" bc.remaining_bytes;
965 ) !write_bandwidth_controlers
967 let compute_lost_byte bc =
968 if bc.total_bytes
= 0 then -1 else
969 let sum = ref Int64.zero
in
970 for i
= 0 to 3600-1 do
971 sum := !sum ++ (Int64.of_int
bc.lost_bytes
.(i
));
973 Int64.to_int
(!sum // 3600L)
975 let moved_bytes bc = bc.moved_bytes
977 let set_remaining_bytes_user bc f
=
978 bc.remaining_bytes_user
<- f
980 let set_lost_bytes bc lost sec
=
981 bc.lost_bytes
.(sec
mod 3600) <- lost
983 (*************************************************************************)
985 (* Setting handlers *)
987 (*************************************************************************)
990 let old_handler = t
.event_handler
in
992 (* if t.monitored then (lprintf "set_closer handler\n"); *)
994 BASIC_EVENT
(CLOSED
s) ->
995 (* lprintf "READ_DONE %d\n" nread; *)
997 |_
-> old_handler t ev
999 t
.event_handler
<- handler
1001 let set_rtimer t f
=
1002 let old_handler = t
.event_handler
in
1004 (* if t.monitored then (lprintf "set_closer handler\n"); *)
1006 BASIC_EVENT
(RTIMEOUT
| LTIMEOUT
) ->
1008 |_
-> old_handler t ev
1010 t
.event_handler
<- handler
1012 let set_handler t event
handler =
1013 let old_handler = t
.event_handler
in
1015 (* if t.monitored then (lprintf "set_handler handler\n"; ); *)
1021 t
.event_handler
<- handler
1023 let set_refill t f
=
1024 set_handler t CAN_REFILL f
;
1025 (try f t
with _
-> ())
1027 let close_after_write t
=
1028 if t
.wbuf
.len
= 0 then begin
1029 (* lprintf "close_after_write: CLOSE\n"; - log output removed *)
1030 shutdown t Closed_by_user
1033 set_handler t WRITE_DONE
(fun t
->
1034 (* lprintf "close_after_write: CLOSE\n"; - log output removed *)
1035 shutdown t Closed_by_user
)
1037 let http_proxy = ref None
1039 let set_reader t f
=
1040 (* lprintf "set_reader for %s\n" t.host; *)
1041 let old_handler = t
.event_handler
in
1046 match !http_proxy with
1050 (* HTTP/1.0 200 OK\n\n *)
1052 let rcode, rstr
, rstr_end
=
1054 let rcode_pos = 8 (*String.index_from b.buf b.pos ' '*) in
1055 let rcode = String.sub
b.buf (rcode_pos+1) 3 in
1056 let rstr_pos = 12 (*String.index_from b.buf (rcode_pos+1) ' '*) in
1057 let rstr_end = String.index_from
b.buf (rstr_pos+1) '
\n'
in
1058 let rstr = String.sub
b.buf (rstr_pos+1) (rstr_end-rstr_pos-1) in
1059 lprintf
"From proxy for %s: %s %s\n"
1060 (Ip.to_string
sock.host
) rcode rstr;
1061 rcode, rstr, rstr_end
1066 "200" -> (*lprintf "Connect to client via proxy ok\n";*)
1067 let pos = String.index_from
b.buf (rstr_end+1) '
\n'
in
1068 let used = pos + 1 - b.pos in
1069 sock_used sock used;
1070 if nread != used then
1071 f
sock (nread - used)
1073 (* fall back to user handler *)
1077 READ_DONE
nread -> f t
nread
1078 |_
-> old_handler t ev
1080 t
.event_handler
<- handler;
1081 t
.connecting
<- false;
1082 t
.event_handler t CONNECTED
;
1083 if t
.nwrite
= 0 then tcp_handler t CAN_WRITE
;
1084 lprintf
"old handler set\n"
1088 READ_DONE
nread -> ff t
nread
1089 | _
-> old_handler t ev
1091 t
.event_handler
<- handler
1093 let set_handler t event
handler =
1094 let old_handler = t
.event_handler
in
1096 (* if t.monitored then (lprintf "set_handler handler\n"; ); *)
1102 t
.event_handler
<- handler
1104 let set_refill t f
=
1105 set_handler t CAN_REFILL f
;
1106 if t
.wbuf
.len
= 0 then (try f t
with _
-> ())
1108 let set_connected t f
=
1109 set_handler t CONNECTED f
1111 (*************************************************************************)
1113 (* Socket Configuration *)
1115 (*************************************************************************)
1117 let set_read_controler t
bc =
1118 t
.read_control
<- Some
bc;
1119 (* set_before_select t.sock (bandwidth_controler t); *)
1120 set_allow_read t
.sock_in
bc.allow_io
;
1121 bandwidth_controler t t
.sock_in
1123 let set_write_controler t
bc =
1124 t
.write_control
<- Some
bc;
1125 (* set_before_select t.sock (bandwidth_controler t); *)
1126 set_allow_write t
.sock_out
bc.allow_io
;
1127 bandwidth_controler t t
.sock_out
1129 let set_monitored t
b = t
.monitored
<- b
1130 let monitored t
= t
.monitored
1132 let set_rtimeout s t
= set_rtimeout s.sock_in t
1133 let set_wtimeout s t
= set_wtimeout s.sock_out t
1135 let set_write_power t p
= (* t.write_power <- p *) ()
1136 let set_read_power t p
= (* t.read_power <- p *) ()
1138 let set_lifetime s = set_lifetime s.sock_in
1140 (*************************************************************************)
1142 (* Printing Information *)
1144 (*************************************************************************)
1146 let dump_socket t
buf =
1147 print_socket
buf t
.sock_in
;
1148 print_socket
buf t
.sock_out
;
1149 Printf.bprintf
buf "rbuf: %d/%d wbuf: %d/%d\n" t
.rbuf
.len
1150 (String.length t
.rbuf
.buf) t
.wbuf
.len
(String.length t
.wbuf
.buf)
1153 BasicSocket.stats buf t
.sock_in
;
1154 BasicSocket.stats buf t
.sock_out
;
1155 Printf.bprintf
buf " rbuf size: %d/%d\n" (String.length t
.rbuf
.buf)
1156 t
.rbuf
.max_buf_size
;
1157 Printf.bprintf
buf " wbuf size: %d/%d\n" (String.length t
.wbuf
.buf)
1160 (*************************************************************************)
1164 (*************************************************************************)
1166 let create token name
fd handler =
1168 MlUnix.set_close_on_exec
fd;
1173 sock_in
= dummy_sock;
1174 sock_out
= dummy_sock;
1175 rbuf
= buf_create !max_buffer_size;
1176 wbuf
= buf_create !max_buffer_size;
1177 event_handler
= handler;
1178 error = Closed_by_peer
;
1184 read_control
= None
;
1185 write_control
= None
;
1196 let sock = BasicSocket.create name
fd (fun _ event
->
1197 tcp_handler t event
) in
1198 set_printer
sock (fun sock ->
1199 Printf.sprintf
"%s (nread: %d nwritten: %d) [U %s,D %s]" name
t.nread t.nwrite
1200 (string_of_bool
(t.read_control
<> None
)) (string_of_bool
(t.write_control
<> None
));
1203 set_dump_info
sock (dump_socket t);
1205 lprintf_nl
"[tBS] fd %d %s" (sock_num
sock) name
;
1210 (*************************************************************************)
1214 (*************************************************************************)
1216 let create_pipe token name fd_in fd_out
handler =
1217 use_token token fd_in
;
1218 MlUnix.set_close_on_exec fd_in
;
1219 MlUnix.set_close_on_exec fd_out
;
1224 sock_in
= dummy_sock;
1225 sock_out
= dummy_sock;
1226 rbuf
= buf_create !max_buffer_size;
1227 wbuf
= buf_create !max_buffer_size;
1228 event_handler
= handler;
1229 error = Closed_by_peer
;
1235 read_control
= None
;
1236 write_control
= None
;
1248 let fname = (fun _
->
1249 Printf.sprintf
"%s (nread: %d nwritten: %d) [U %s,D %s]" name
1250 t.nread t.nwrite
(string_of_bool
(t.read_control
<> None
))
1251 (string_of_bool
(t.write_control
<> None
));
1256 let sock_in = BasicSocket.create name fd_in
(fun _ event
->
1257 tcp_handler t event
) in
1259 lprintf_nl
"[tBS] fd %d %s" (sock_num
sock_in) name
;
1260 set_printer
sock_in fname;
1261 set_dump_info
sock_in (dump_socket t);
1263 let sock_out = BasicSocket.create name fd_out
(fun _ event
->
1264 tcp_handler t event
) in
1265 set_printer
sock_out fname;
1266 set_dump_info
sock_out (dump_socket t);
1268 t.sock_in <- sock_in;
1269 t.sock_out <- sock_out;
1272 (*************************************************************************)
1274 (* create_blocking *)
1276 (*************************************************************************)
1278 let create_blocking token name
fd handler =
1280 MlUnix.set_close_on_exec
fd;
1285 sock_in = dummy_sock;
1286 sock_out = dummy_sock;
1287 rbuf
= buf_create !max_buffer_size;
1288 wbuf
= buf_create !max_buffer_size;
1289 event_handler
= handler;
1290 error = Closed_by_peer
;
1296 read_control
= None
;
1297 write_control
= None
;
1308 let sock = create_blocking name
fd (fun sock event
->
1309 tcp_handler t event
) in
1312 set_dump_info
sock (dump_socket t);
1315 let create_simple token name
fd =
1316 create token name
fd (fun _ _
-> ())
1318 (*************************************************************************)
1322 (*************************************************************************)
1324 let connect token name host port
handler =
1325 if token.token_used
then failwith
"Token already used";
1327 (* lprintf "CONNECT %s:%d\n" (Unix.string_of_inet_addr host) port; *)
1328 let s = Unix.socket
Unix.PF_INET
Unix.SOCK_STREAM
0 in
1329 if !bind_address <> Unix.inet_addr_any
then
1330 Unix.bind
s (Unix.ADDR_INET
(!bind_address, 0));
1331 let proxy_ip, proxy_port
, proxy_auth
=
1332 match !http_proxy with
1333 | None
-> Ip.null
, 0, None
1334 | Some
(h
, p
, auth
) -> Ip.from_name h
, p
, auth
1336 let ip = Ip.of_inet_addr host
in
1337 let use_proxy = proxy_ip <> Ip.null
&& proxy_ip <> ip in
1338 if use_proxy then begin
1339 (* connect to proxy in blocking mode, so we sure, connections established when we send CONNECT *)
1340 lprintf
"via proxy\n";
1341 Unix.connect s (Unix.ADDR_INET
(Ip.to_inet_addr
proxy_ip, proxy_port
));
1343 let buf = Buffer.create 200 in
1344 let dotted_host = Unix.string_of_inet_addr host
in
1345 Printf.bprintf
buf "CONNECT %s:%d HTTP/1.1\n" dotted_host port
;
1346 Printf.bprintf
buf "Pragma: no-cache\n";
1347 Printf.bprintf
buf "Cache-Control: no-cache\n";
1348 Printf.bprintf
buf "Connection: Keep-Alive\n";
1349 Printf.bprintf
buf "Proxy-Connection: Keep-Alive\n";
1350 begin match proxy_auth
with
1351 | Some
(login
,password
) ->
1352 Printf.bprintf
buf "Proxy-Authorization: Basic %s\n" (Base64.encode
(login ^
":" ^ password
))
1355 Printf.bprintf
buf "User-Agent: MLdonkey/%s\n" Autoconf.current_version
;
1356 Printf.bprintf
buf "\n";
1357 ignore
(MlUnix.write s (Buffer.contents
buf) 0 (Buffer.length
buf))
1359 let t = create token name
s handler in
1361 if !verbose_bandwidth
> 1 then
1362 lprintf_nl
"[BW2 %6d] connect on %s:%d" (last_time
()) t.name
(sock_num
t.sock_out);
1364 token.connection_manager
.nconnections_last_second
<-
1365 token.connection_manager
.nconnections_last_second
+ 1;
1367 must_write
t.sock_out true;
1370 (* lprintf "add_connect at %f\n" (current_time ()); *)
1371 t.connect_time
<- current_time
();
1372 if use_proxy then begin
1375 Unix.connect s (Unix.ADDR_INET
(host
,port
));
1377 t.connecting
<- true;
1378 register_upload t 0; (* The TCP SYN packet *)
1379 forecast_download t 0; (* The TCP ACK packet *)
1380 forecast_upload t 0; (* The TCP ACK packet *)
1381 (* if use_proxy then begin
1385 Unix.Unix_error
((Unix.EINPROGRESS
|Unix.EINTR
|Unix.EWOULDBLOCK
),_
,_
) ->
1386 t.connecting
<- true;
1387 register_upload t 0; (* The TCP SYN packet *)
1388 forecast_download t 0; (* The TCP ACK packet *)
1389 forecast_upload t 0; (* The TCP ACK packet *)
1391 | Unix.Unix_error
(Unix.ENETUNREACH
,_
,_
) as e
->
1392 (* log nothing here, but later in donkeyClient.ml *)
1393 close t Closed_connect_failed
;
1396 close t Closed_connect_failed
;
1399 Unix.Unix_error
(Unix.ENETUNREACH
,_
,_
) as e
-> raise e
(* avoid logging *)
1400 | Unix.Unix_error
(Unix.ENOBUFS
,_
,_
) as e
->
1401 if Autoconf.windows
then lprintf_nl
1402 "No more free buffers, read http://support.microsoft.com/kb/q196271/ to fix this problem";
1405 lprintf_nl
"Exception (%s) before connect to host %s:%d"
1406 (Printexc2.to_string e
) (Unix.string_of_inet_addr host
) port
;
1411 (*************************************************************************)
1415 (*************************************************************************)
1419 if t.my_ip = Ip.null
then
1420 let fd = fd t.sock_in in
1421 match Unix.getsockname
fd with
1422 Unix.ADDR_INET
(ip, port
) ->
1423 let ip = Ip.of_inet_addr
ip in
1426 | _
-> raise Not_found
1430 match t.peer_addr with
1431 Some
(ip, port
) -> (ip,port
)
1433 let fd = fd t.sock_out in
1434 match Unix.getpeername
fd with
1435 Unix.ADDR_INET
(ip, port
) ->
1436 let ip = Ip.of_inet_addr
ip in
1437 t.peer_addr <- Some
(ip, port
);
1439 | _
-> raise Not_found
1441 let peer_ip t = fst
(peer_addr t)
1442 let peer_port t = snd
(peer_addr t)
1446 let fd = fd t.sock_out in
1447 match Unix.getpeername fd with
1448 Unix.ADDR_INET (ip, port) -> Ip.of_inet_addr ip, port
1449 | _ -> raise Not_found
1452 (*************************************************************************)
1454 (* Sending Marshalled Values *)
1456 (*************************************************************************)
1461 let internal_buf = Buffer.create 17000
1463 let simple_send_buf buf sock =
1464 let s = Buffer.contents
buf in
1467 let len = String.length
s in
1469 write sock (Buffer.contents
buf) 0 5;
1472 let value_send sock m
=
1473 (* Buffer.reset internal_buf; *)
1474 Buffer.reset
internal_buf;
1475 Buffer.add_string
internal_buf (Marshal.to_string m
[]);
1476 simple_send_buf internal_buf sock
1478 let value_handler f
sock nread =
1482 let msg_len = get_int
b.buf (b.pos+1) in
1483 if b.len >= 5 + msg_len then
1485 let s = String.sub
b.buf (b.pos+5) msg_len in
1486 let t = Marshal.from_string
s 0 in
1487 buf_used b (msg_len + 5);
1491 else raise Not_found
1493 with Not_found
-> ()
1495 (*************************************************************************)
1499 (*************************************************************************)
1501 let exec_command token cmd args
handler =
1502 MlUnix.execvp_command cmd args
(fun in_read out_write
->
1503 let t = create_pipe token "pipe" in_read out_write
1505 must_read
t.sock_in false;
1510 (*************************************************************************)
1512 (* Setting configuration *)
1514 (*************************************************************************)
1516 let set_max_connections_per_second f
=
1517 max_connections_per_second := f
1519 let set_max_opened_connections f
=
1520 max_opened_connections := f
1522 let set_max_input_buffer t len =
1523 t.rbuf
.max_buf_size
<- len
1525 let set_max_output_buffer t len =
1526 t.wbuf
.max_buf_size
<- len
1528 (*************************************************************************)
1532 (*************************************************************************)
1534 let to_deflate = ref []
1535 let to_deflate_len = ref 0
1537 let compression_buffer_len = !max_buffer_size / 10
1538 let compression_buffer = String.create compression_buffer_len
1540 let deflate_connection sock =
1541 (* lprintf "Creating deflate connection\n"; *)
1542 let comp = Some
(Zlib.inflate_init
true, Zlib.deflate_init
6 true,
1543 buf_create !max_buffer_size, buf_create !max_buffer_size) in
1544 sock.compression
<- comp
1546 let rec iter_deflate sock zs wbuf
=
1547 if wbuf
.len > 0 then begin
1548 (* lprintf "iter_deflate\n"; *)
1549 let (_
, used_in
, used_out
) = Zlib.deflate zs
1550 wbuf
.buf wbuf
.pos wbuf
.len
1551 compression_buffer 0 compression_buffer_len
1552 Zlib.Z_SYNC_FLUSH
in
1554 lprintf "deflated %d/%d -> %d\n" used_in wbuf.len used_out;
1555 lprintf "[%s]\n" (String.escaped (String.sub compression_buffer 0 used_out));
1557 write sock compression_buffer 0 used_out
;
1558 buf_used wbuf used_in
;
1559 if used_in
> 0 || used_out
> 0 then
1560 iter_deflate sock zs wbuf
1563 let deflate_timer _
=
1564 List.iter (fun sock ->
1566 match sock.compression
with
1567 Some
(_
, zs
, _
, wbuf
) ->
1568 if closed sock then raise Exit
;
1569 iter_deflate sock zs wbuf
1572 lprintf
"[ERROR] Exception %s in CanBeCompressed.deflate_timer\n"
1573 (Printexc2.to_string e
)
1578 let _to_deflate conn
=
1579 if not
(List.memq conn
!to_deflate) then
1580 to_deflate := conn
:: !to_deflate;
1581 if !to_deflate_len > 1000000 then
1584 let rec iter_inflate zs
sock b rbuf
=
1585 if b.len > 0 then begin
1587 lprintf "iter_inflate %d\n" b.len;
1588 lprintf "[%s]\n" (String.escaped (String.sub b.buf b.pos b.len));
1590 let (_
, used_in
, used_out
) = Zlib.inflate zs
b.buf b.pos b.len
1591 compression_buffer 0 compression_buffer_len
1592 Zlib.Z_SYNC_FLUSH
in
1594 lprintf "inflated %d/%d -> %d\n" used_in b.len used_out;
1595 lprintf "[%s]\n" (String.escaped (String.sub compression_buffer 0 used_out));
1597 buf_add sock rbuf
compression_buffer 0 used_out
;
1599 if used_in
> 0 || used_out
> 0 then
1600 iter_inflate zs
sock b rbuf
1604 match t.compression
with
1606 | Some
(zs
, _
, rbuf
, _
) ->
1607 (* lprintf "CanBeCompressed.buf\n"; *)
1609 if b.len > 0 then iter_inflate zs
t b rbuf
;
1612 let write t s pos len =
1613 match t.compression
with
1614 None
-> write t s pos len
1615 | Some
(_
,_
, _
,wbuf
) ->
1617 to_deflate_len := !to_deflate_len + len;
1619 buf_add t wbuf
s pos len
1621 (*************************************************************************)
1625 (*************************************************************************)
1627 let write_string t s = write t s 0 (String.length
s)
1630 add_bandwidth_second_timer
(fun _ ->
1631 reset_bandwidth_controlers ();
1632 reset_connection_scheduler ();
1636 set_before_select_hook
(fun _ ->
1637 schedule_connections ();
1638 List.iter (fun bc ->
1639 let old_value = !(bc.allow_io
) in
1640 bc.allow_io
:= (bc.total_bytes
= 0 || bc.remaining_bytes
> 0);
1641 if !verbose_bandwidth
> 2 && (old_value <> !(bc.allow_io
)) then
1642 lprintf_nl
"[BW3 %6d] %20s: stop reading" (last_time
()) bc.bc_name
1643 ) !read_bandwidth_controlers;
1644 List.iter (fun bc ->
1645 let old_value = !(bc.allow_io
) in
1646 bc.allow_io
:= (bc.total_bytes
= 0 || bc.remaining_bytes
> 0);
1647 if !verbose_bandwidth
> 2 && (old_value <> !(bc.allow_io
)) then
1648 lprintf_nl
"[BW3 %6d] %20s: stop writing" (last_time
()) bc.bc_name
1649 ) !write_bandwidth_controlers;
1655 * sort the connections so that we try to read and write as
1656 much as possible to/from connections on which we already have read/written
1658 * postpone reads and writes for a few seconds so that we have more to
1661 dhcppc3:~# ping computer.domain.org -f -s <packet_size> -c 1000
1662 where computer.domain.org is a nearby computer on a 100 Mbs link.
1664 On my cable link, making <packet_size> vary from 1 to 2000 shows:
1665 * packets > 1450 are 100% lost
1666 * 300 > packets > 1 are 5% lost but exactly the same time (14.5 s)
1667 * packets > 300 are 30% lost and higher time (17s)
1669 So, we can suppose that sending 250 bytes is exactly the same as sending one
1670 byte, and the contrary, so [TODO]: round up all packets sent and received
1671 to an option 'packet_unit' (default 250). Ask Simon if it makes sense.
1673 1000 packets transmitted, 947 received, 5% packet loss, time 14191ms
1674 , pipe 6, ipg/ewma 14.205/0.000 ms
1679 set_after_select_hook
(fun _ ->
1680 List.iter (fun bc ->
1681 if bc.remaining_bytes
> 0 then begin
1683 bc.connections
<- List.sort
(fun t1 t2
->
1684 let w1 = t1
.nread in
1685 let w2 = t2
.nread in
1689 if !verbose_bandwidth
> 2 && bc.nconnections
> 0 then begin
1690 lprintf_nl
"[BW3 %6d] %d read-waiting connections for %d allowed" (last_time
()) bc.nconnections
bc.remaining_bytes
;
1692 lprintf_nl
"[BW3 %6d] %s:%d [buffered %d]" (last_time
()) t.name
(sock_num
t.sock_in) ((buf t).len);
1698 if bc.remaining_bytes
> 0 then
1699 let nconnections = max
bc.nconnections 1 in
1700 let can_read = max
1 (bc.remaining_bytes
/ nconnections) in
1701 let can_read = max
!ip_packet_size (can_read * t.read_power
) in
1703 (* lprintf "allow to read %d\n" can_read; *)
1704 can_read_handler t t.sock_in can_read
1706 (* lprintf "Exception %s in can_read_handler %s:%d\n"
1707 (Printexc2.to_string e)
1708 t.name (sock_num t.sock_in) *) ()
1710 bc.nconnections <- bc.nconnections - t.read_power
1712 if !verbose_bandwidth
> 2 then
1713 lprintf_nl
"[BW3 %6d] %s:%d could not read" (last_time
())
1714 t.name
(sock_num
t.sock_out)
1716 (* if bc.remaining_bytes > 0 then bc.allow_io := false; *)
1719 if !verbose_bandwidth
> 2 then begin
1721 if bc.remaining_bytes
> 0 then
1722 lprintf_nl
"[BW3 %6d] still %d bytes to read" (last_time
()) bc.remaining_bytes
;
1724 if bc.nconnections > 0 then
1725 lprintf_nl
"[BW3 %6d] still %d read-waiting connections after loop"
1726 (last_time
()) bc.nconnections;
1730 bc.connections
<- [];
1731 bc.nconnections <- 0;
1732 ) !read_bandwidth_controlers;
1733 List.iter (fun bc ->
1734 if bc.remaining_bytes
> 0 then begin
1736 bc.connections
<- List.sort
(fun t1 t2
->
1737 let w1 = remaining_to_write t1
in
1738 let w2 = remaining_to_write t2
in
1742 if !verbose_bandwidth
> 2 && bc.nconnections > 0 then begin
1743 lprintf_nl
"[BW3 %6d] %d write-waiting connections for %d allowed"
1744 (last_time
()) bc.nconnections bc.remaining_bytes
;
1746 lprintf_nl
"[BW3 %6d] %s:%d [buffered %d]"
1747 (last_time
()) t.name
(sock_num
t.sock_out) (remaining_to_write t);
1751 if bc.remaining_bytes
> 0 then
1752 let nconnections = max
bc.nconnections 1 in
1753 let can_write = max
1 (bc.remaining_bytes
/ nconnections) in
1754 let can_write = best_packet_size (can_write * t.write_power
)
1756 let old_nwrite = t.nwrite
in
1758 (* lprintf "WRITE\n"; *)
1759 can_write_handler t t.sock_out (min
can_write t.wbuf
.len)
1761 bc.remaining_bytes
<- bc.remaining_bytes
-
1762 t.nwrite
+ old_nwrite;
1763 bc.nconnections <- bc.nconnections - t.write_power
;
1765 if !verbose_bandwidth
> 2 then
1766 lprintf_nl
"[BW3 %6d] %s:%d could not write buffered %d bytes"
1767 (last_time
()) t.name
(sock_num
t.sock_out) (remaining_to_write t)
1769 (* if bc.remaining_bytes > 0 then bc.allow_io := false; *)
1771 if !verbose_bandwidth
> 2 then begin
1773 if bc.remaining_bytes
> 0 then begin
1774 lprintf_nl
"[BW3 %6d] still %d bytes to write"
1775 (last_time
()) bc.remaining_bytes
;
1777 let len = remaining_to_write t in
1779 lprintf_nl
"[BW3 %6d] %s:%d could write %d" (last_time
())
1780 t.name
(sock_num
t.sock_out) len
1784 if bc.nconnections > 0 then
1785 lprintf_nl
"[BW3 %6d] still %d write-waiting connections after loop"
1786 (last_time
()) bc.nconnections;
1788 bc.connections
<- [];
1789 bc.nconnections <- 0;
1790 ) !write_bandwidth_controlers
1793 let prevent_close t =
1794 prevent_close t.sock_in;
1795 prevent_close t.sock_out
1797 let must_write t bool = must_write t.sock_out bool
1798 let output_buffered t = t.wbuf
.len
1801 Heap.add_memstat
"tcpBufferedSocket" (fun level
buf ->
1802 Printf.bprintf
buf " %d latencies\n" (Hashtbl.length
latencies);
1803 Printf.bprintf
buf " String.length big_buffer: %d\n" (String.length
big_buffer);
1804 Printf.bprintf
buf " connection_managers: %d\n" (List.length
!connection_managers);
1805 Printf.bprintf
buf " read_bandwidth_controlers: %d\n" (List.length
!read_bandwidth_controlers);
1806 Printf.bprintf
buf " write_bandwidth_controlers: %d\n" (List.length
!write_bandwidth_controlers);
1807 Printf.bprintf
buf " to_deflate: %d\n" (List.length
!to_deflate);
1808 Printf.bprintf
buf " max_opened_connections: %d\n" (!max_opened_connections ());
1809 Printf.bprintf
buf " max_connections_per_second: %d\n" (!max_connections_per_second ());
1810 Printf.bprintf
buf " max_buffer_size: %d\n" (!max_buffer_size);