enable proxy authentication for http client too
[mldonkey.git] / src / utils / net / tcpBufferedSocket.ml
blob4fd8f41bd7b389925359ae3aaac44b61a8fbcbd5
1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 open Int64ops
21 open Printf2
22 open BasicSocket
25 let latencies = Hashtbl.create 2131
27 let max_opened_connections = ref (fun () -> 20)
28 let max_connections_per_second = ref (fun () -> 50)
30 let opened_connections = ref 0
31 let opened_connections_this_second = ref 0
33 let max_buffer_size = ref 50000
35 let bind_address = ref Unix.inet_addr_any
36 let ip_packet_size = ref 40
37 let mtu_packet_size = ref 1500
38 let minimal_packet_size = ref 600
39 let packet_frame_size = 1
41 let proc_net_fs = ref true
43 let tcp_uploaded_bytes = ref Int64.zero
44 let tcp_downloaded_bytes = ref Int64.zero
46 let exn_exit = Exit
48 (*************************************************************************)
49 (* *)
50 (* TYPES *)
51 (* *)
52 (*************************************************************************)
54 type event =
55 WRITE_DONE
56 | CAN_REFILL
57 | CONNECTED
58 | BUFFER_OVERFLOW
59 | READ_DONE of int
60 | BASIC_EVENT of BasicSocket.event
62 let string_of_event = function
63 | CONNECTED -> "CONNECTED"
64 | WRITE_DONE -> "WRITE_DONE"
65 | CAN_REFILL -> "CAN_REFILL"
66 | BUFFER_OVERFLOW -> "BUFFER_OVERFLOW"
67 | READ_DONE n -> Printf.sprintf "READ_DONE %d" n
68 | BASIC_EVENT e -> string_of_basic_event e
70 type token = {
71 mutable token_used : bool;
72 connection_manager : connection_manager;
75 and connection_manager = {
76 cm_name : string;
77 mutable nestablished_connections : int;
78 mutable nwaiting_connections : int;
79 mutable nconnections_last_second : int;
80 waiting_connections : (token * (token -> unit)) Fifo.t;
83 type buf = {
84 mutable buf : string;
85 mutable pos : int;
86 mutable len : int;
87 mutable max_buf_size : int;
88 mutable min_buf_size : int
91 type t = {
92 name : string;
93 mutable closing : bool;
94 mutable sock_in : BasicSocket.t;
95 mutable sock_out : BasicSocket.t;
96 mutable rbuf : buf;
97 mutable wbuf : buf;
98 mutable event_handler : handler;
99 mutable error : close_reason;
100 mutable nread : int;
101 mutable ndown_packets : int;
102 mutable nwrite : int;
103 mutable nup_packets : int;
104 mutable monitored : bool;
106 mutable read_control : bandwidth_controler option;
107 mutable write_control : bandwidth_controler option;
108 mutable write_power : int;
109 mutable read_power : int;
111 mutable peer_addr : (Ip.t * int) option;
112 mutable my_ip : Ip.t;
114 mutable noproxy : bool;
115 mutable connecting : bool;
116 mutable host : Ip.t;
117 mutable connect_time : float;
119 mutable token : token;
121 mutable compression : (
122 Zlib.stream *
123 Zlib.stream *
124 buf * (* read buffer after decompression *)
125 buf (* write buffer before decompression *)
126 ) option;
130 and handler = t -> event -> unit
132 and bandwidth_controler = {
133 bc_name : string;
134 mutable remaining_bytes : int;
135 mutable total_bytes : int;
136 mutable nconnections : int;
137 mutable connections : t list;
138 allow_io : bool ref;
139 mutable remaining_bytes_user : ((* total *) int -> (* remaining *) int -> unit);
140 mutable moved_bytes : int64;
141 mutable lost_bytes : int array; (* 3600 samples*)
142 mutable forecast_bytes : int;
144 mutable ndone_last_second : int;
148 (*************************************************************************)
149 (* *)
150 (* Connections Managers *)
151 (* *)
152 (*************************************************************************)
154 let connection_managers = ref []
156 let create_connection_manager name =
157 let manager = {
158 cm_name = name;
159 nestablished_connections = 0;
160 nwaiting_connections = 0;
161 waiting_connections = Fifo.create ();
162 nconnections_last_second = 0;
165 connection_managers := manager :: !connection_managers;
166 manager
168 let create_token manager = {
169 token_used = false;
170 connection_manager = manager;
173 let add_pending_connection manager f =
174 let token = create_token manager in
175 Fifo.put manager.waiting_connections (token, f);
176 manager.nwaiting_connections <- manager.nwaiting_connections + 1;
177 token
179 let can_open_connection manager =
180 manager.nestablished_connections +
181 manager.nwaiting_connections < !max_opened_connections ()
184 (******
185 This scheduler does not use the already established connections.
186 *****)
188 let schedule_connections () =
189 let max_wanted = !max_opened_connections () in
190 let max_connections_per_second = !max_connections_per_second () in
192 let rec iter todo_managers done_managers =
195 lprintf "todo_managers %d done_managers %d\n"
196 (List.length todo_managers) (List.length done_managers);
198 match todo_managers with
199 [] -> begin
200 match done_managers with
201 [] -> ()
202 | _ -> iter done_managers []
204 | manager :: tail ->
206 lprintf "!opened_connections_this_second %d < max_connections_per_second %d\n" !opened_connections_this_second max_connections_per_second;
207 lprintf "&& !opened_connections %d < max_wanted %d\n"
208 !opened_connections max_wanted;
210 if !opened_connections_this_second < max_connections_per_second
211 && !opened_connections < max_wanted then
213 (try
214 let (token,f) =
215 Fifo.take manager.waiting_connections in
216 manager.nwaiting_connections <-
217 manager.nwaiting_connections - 1;
218 if not token.token_used then begin
219 f token;
220 end;
221 (* prevent in any case the token from being used later *)
222 token.token_used <- true;
223 true
224 with _ -> false)
225 then
226 iter tail (manager :: done_managers)
227 else
228 iter tail done_managers
230 iter !connection_managers []
232 let cancel_token token =
233 token.token_used <- true
235 let used_token token = token.token_used
237 let unlimited_connection_manager = create_connection_manager "Unlimited"
239 let reset_connection_scheduler _ =
240 if !verbose_bandwidth > 0 then
241 lprintf_nl "[BW1 %6d] Connections opened this second : %d/%d total %d/%d"
242 (last_time ())
243 !opened_connections_this_second
244 (!max_connections_per_second ())
245 !opened_connections
246 (!max_opened_connections ());
248 List.iter (fun cm ->
249 if !verbose_bandwidth > 0 then begin
250 if cm.nconnections_last_second > 0 then
251 lprintf_nl "[BW1 %6d] %s opened %d connections last second"
252 (last_time ()) cm.cm_name cm.nconnections_last_second;
253 if cm.nwaiting_connections > 0 then
254 lprintf_nl "[BW1 %6d] %s still waits for %d connections"
255 (last_time ()) cm.cm_name cm.nwaiting_connections;
256 end;
257 cm.nconnections_last_second <- 0;
258 ) !connection_managers;
260 opened_connections_this_second := 0
262 let use_token token fd =
263 if token.token_used then begin
264 (try Unix.close fd with _ -> ());
265 failwith "Token already used";
266 end;
267 token.token_used <- true;
268 token.connection_manager.nestablished_connections <-
269 token.connection_manager.nestablished_connections + 1;
270 incr opened_connections;
271 incr opened_connections_this_second
273 (*************************************************************************)
274 (* *)
275 (* Bandwidth Consumers *)
276 (* *)
277 (*************************************************************************)
279 let add_connect_latency ip time =
280 if ip <> Ip.null && time > 1. then
281 let delay = current_time () -. time in
282 let delayf = 1000. *. delay in
283 let delay = int_of_float delayf in
284 let delay = if delay > 65000 then 65000 else delay in
286 lprintf "add_connect_latency %s -> %d (%f)\n"
287 (Ip.to_string ip) delay delayf;
290 let latency, samples = Hashtbl.find latencies ip in
291 incr samples;
292 if !latency > delay then latency := delay
293 with _ ->
294 Hashtbl.add latencies ip (ref delay, ref 1)
296 let forecast_bytes t nbytes =
297 match t with
298 None -> ()
299 | Some bc ->
300 let nip_packets = 1 + nbytes / !mtu_packet_size in
301 let nbytes = nbytes + nip_packets * !ip_packet_size in
302 let nframes = 1 + nbytes / packet_frame_size in
303 bc.forecast_bytes <- bc.forecast_bytes +
304 (nframes * packet_frame_size)
306 let register_bytes t nbytes =
307 match t with
308 None -> ()
309 | Some bc ->
310 let nip_packets = 1 + nbytes / !mtu_packet_size in
311 let nbytes = nbytes + nip_packets * !ip_packet_size in
312 let nframes = 1 + nbytes / packet_frame_size in
313 bc.remaining_bytes <- bc.remaining_bytes -
314 (nframes * packet_frame_size)
316 let forecast_download t n =
317 forecast_bytes t.read_control n
319 let forecast_upload t n =
320 forecast_bytes t.write_control n
322 let register_download t n =
323 register_bytes t.read_control n
325 let register_upload t n =
326 register_bytes t.write_control n
328 let accept_connection_bandwidth t =
329 register_download t 0;
330 forecast_upload t 0;
331 forecast_download t 0
333 let best_packet_size nbytes =
334 let nbytes = max nbytes !minimal_packet_size in
335 let nip_packets = 1 + nbytes / !mtu_packet_size in
336 let headers = nip_packets * !ip_packet_size in
337 let nframes = 1 + (nbytes + headers) / packet_frame_size in
338 nframes * packet_frame_size - headers
340 (*************************************************************************)
341 (* *)
342 (* Buffers management *)
343 (* *)
344 (*************************************************************************)
346 let copy_read_buffer = ref true
348 let big_buffer_len = 65536
349 let big_buffer = String.create big_buffer_len
351 let min_buffer_read = 2000
352 let min_read_size = min_buffer_read - 100
354 let old_strings_size = 20
355 let old_strings = Array.create old_strings_size ""
356 let old_strings_len = ref 0
358 let new_string () =
359 if !old_strings_len > 0 then begin
360 decr old_strings_len;
361 let s = old_strings.(!old_strings_len) in
362 old_strings.(!old_strings_len) <- "";
364 end else
365 String.create min_buffer_read
367 let delete_string s =
368 if !old_strings_len < old_strings_size &&
369 String.length s = min_buffer_read then begin
370 old_strings.(!old_strings_len) <- s;
371 incr old_strings_len;
374 let buf_create max =
376 buf = "";
377 pos = 0;
378 len = 0;
379 max_buf_size = max;
380 min_buf_size = min_read_size;
384 let buf_used b nused =
385 if nused = b.len then
386 ( b.len <- 0;
387 b.pos <- 0;
388 delete_string b.buf;
389 b.buf <- "";
391 else
392 (b.len <- b.len - nused; b.pos <- b.pos + nused)
394 let buf_size t =
395 (String.length t.rbuf.buf),
396 (String.length t.wbuf.buf)
398 (*************************************************************************)
399 (* *)
400 (* buf_add *)
401 (* *)
402 (*************************************************************************)
404 let buf_add t b s pos1 len =
405 let curpos = b.pos + b.len in
406 let max_len =
407 if b.buf = "" then
408 begin
409 b.buf <- new_string ();
410 min_buffer_read
411 end else
412 String.length b.buf in
413 if max_len - curpos < len then (* resize before blit *)
414 if b.len + len < max_len then (* just move to 0 *)
415 begin
416 String.blit b.buf b.pos b.buf 0 b.len;
417 String.blit s pos1 b.buf b.len len;
418 b.len <- b.len + len;
419 b.pos <- 0;
421 else
422 if b.len + len > b.max_buf_size then begin
423 lprintf "[TCP_BS]: BUFFER OVERFLOW %d+%d> %d " b.len len b.max_buf_size ;
425 lprintf "MESSAGE: [";
426 for i = pos1 to pos1 + (min len 20) - 1 do
427 lprintf "(%d)" (int_of_char s.[i]);
428 done;
429 if len > 20 then lprintf "...";
430 lprintf "]\n";
432 t.event_handler t BUFFER_OVERFLOW;
433 (* TODO: why do we have this ??? in case of BUFFER_OVERFLOW, just close the
434 socket !!! *)
436 else
437 let new_len = min (max (2 * max_len) (b.len + len)) b.max_buf_size in
438 (* if t.monitored then
439 (lprintf "Allocate new for %d\n" len; ); *)
440 let new_buf = String.create new_len in
441 String.blit b.buf b.pos new_buf 0 b.len;
442 String.blit s pos1 new_buf b.len len;
443 b.len <- b.len + len;
444 b.pos <- 0;
445 if max_len = min_buffer_read then delete_string b.buf;
446 (* if t.monitored then
447 (lprintf "new buffer allocated\n"; ); *)
448 b.buf <- new_buf
449 else begin
450 String.blit s pos1 b.buf curpos len;
451 b.len <- b.len + len
454 (*************************************************************************)
455 (* *)
456 (* Sockets management *)
457 (* *)
458 (*************************************************************************)
460 let buf t = t.rbuf
461 let setsock_iptos_throughput t =
462 ignore (setsock_iptos_throughput (fd (t.sock_in)));
463 ignore (setsock_iptos_throughput (fd (t.sock_out)))
465 let closed t = closed t.sock_out
466 let error t = t.error
467 let sock_used t nused = buf_used t.rbuf nused
468 let remaining_to_write t = t.wbuf.len
469 let nread t = t.nread
470 let nwritten t = t.nwrite
471 let can_write t = t.wbuf.len = 0
472 let can_write_len t len =
473 let b = t.wbuf.max_buf_size > t.wbuf.len + len in
474 (* if not b then
475 lprintf "can_write_len failed: %d < %d + %d\n"
476 t.wbuf.max_buf_size t.wbuf.len len; *)
478 let not_buffer_more t max = t.wbuf.len < max
479 let get_rtimeout t = get_rtimeout t.sock_in
480 let max_refill t = t.wbuf.max_buf_size - t.wbuf.len
482 let close t s =
484 if t.monitored then begin
485 lprintf "close with %s %s\n" t.error s;
486 end;
488 if not t.closing then
489 begin
491 t.token.connection_manager.nestablished_connections <-
492 t.token.connection_manager.nestablished_connections - 1;
493 decr opened_connections;
494 t.closing <- true;
495 delete_string t.rbuf.buf;
496 delete_string t.wbuf.buf;
497 t.rbuf.buf <- "";
498 t.wbuf.buf <- "";
499 if t.nread > 0 then begin
500 register_upload t 0;
501 forecast_download t 0;
502 end;
503 close t.sock_in s;
504 if t.sock_in != t.sock_out then
505 close t.sock_out s
506 (* (Printf.sprintf "%s after %d/%d" s t.nread t.nwrite) *)
507 with e ->
508 lprintf "Exception %s in TcpBufferedSocket.close\n"
509 (Printexc2.to_string e);
510 raise e
513 let shutdown t s =
515 if t.monitored then begin
516 lprintf "shutdown\n";
517 end;
519 (try
520 BasicSocket.shutdown t.sock_out s;
521 if t.sock_in != t.sock_out then
522 BasicSocket.shutdown t.sock_in s;
523 with e ->
524 lprintf "exception %s in shutdown\n" (Printexc2.to_string e);
526 (try close t s with e ->
527 lprintf "exception %s in shutdown\n" (Printexc2.to_string e);
530 (*************************************************************************)
531 (* *)
532 (* write *)
533 (* *)
534 (*************************************************************************)
536 let write t s pos1 len =
537 (* lprintf "want_write %d\n" len; *)
538 if len > 0 && not (closed t) then
539 let pos2 = pos1 + len in
540 let b = t.wbuf in
541 let pos1 =
542 if b.len = 0 && not t.connecting && (match t.write_control with
543 None ->
544 (* lprintf "NO CONTROL\n"; *)
545 true
546 | Some bc ->
547 (* lprintf "LIMIT %d\n" bc.total_bytes; *)
548 bc.total_bytes = 0)
549 then
551 let fd = fd t.sock_out in
552 (* lprintf "WRITE [%s]\n" (String.escaped (String.sub s pos1 len)); *)
553 let nw = MlUnix.write fd s pos1 len in
554 if !verbose_bandwidth > 1 then
555 lprintf_nl "[BW2 %6d] immediate write %d/%d on %s:%d"
556 (last_time ()) nw len t.name (sock_num t.sock_out);
558 register_upload t len;
559 forecast_download t 0;
561 (* if t.monitored then begin
562 lprintf "write: direct written %d\n" nw;
563 end; *)
564 tcp_uploaded_bytes := !tcp_uploaded_bytes ++ (Int64.of_int nw);
565 (match t.write_control with
566 None -> ()
567 | Some bc ->
568 bc.moved_bytes <- bc.moved_bytes ++ (Int64.of_int nw));
569 if t.nwrite = 0 then begin
570 (* if t.connecting then
571 lprintf "WRITE BEFORE CONNECTION.......\n";
572 lprintf "add_connect_latency at %f\n" (current_time ()); *)
573 add_connect_latency t.host t.connect_time;
574 end;
576 t.nwrite <- t.nwrite + nw;
577 if nw = 0 then (close t Closed_by_peer; pos2) else
578 pos1 + nw
579 with
580 Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EAGAIN | Unix.ENOTCONN), _, _) -> pos1
581 | e ->
582 t.error <- Closed_for_error (Printf.sprintf "Write Error: %s" (Printexc2.to_string e));
583 close t t.error;
585 (* lprintf "exce %s in read\n" (Printexc2.to_string e); *)
586 raise e
588 else pos1
590 if pos2 > pos1 then
591 let sock = t.sock_out in
592 must_write sock true;
593 buf_add t b s pos1 (pos2 - pos1)
595 (*************************************************************************)
596 (* *)
597 (* can_read_handler *)
598 (* *)
599 (*************************************************************************)
601 let dummy_sock = Obj.magic 0
604 (* max_len is the maximal length we authorized to read, min_read_size
605 is the minimal size we authorize to read *)
607 let can_read_handler t sock max_len =
608 (* let max_len = 100000 in (* REMOVE THIS: don't care about bw *) *)
609 let b = t.rbuf in
610 let curpos = b.pos + b.len in
611 (* lprintf "curpos %d/%d\n" curpos b.len; *)
612 let buffer, buffer_pos, buffer_len =
613 if !copy_read_buffer then
614 big_buffer, 0, big_buffer_len
615 else
616 let can_write_in_buffer =
617 if b.buf = "" then
618 if b.min_buf_size <= min_buffer_read then begin
619 b.buf <- new_string ();
620 min_buffer_read
621 end else begin
622 b.buf <- String.create b.min_buf_size;
623 b.min_buf_size
625 else
626 let buf_len = String.length b.buf in
627 if buf_len - curpos < min_read_size then
628 if b.len + min_read_size > b.max_buf_size then
630 t.event_handler t BUFFER_OVERFLOW;
631 lprintf "[OVERFLOW] in %s" (info sock);
632 close t Closed_for_overflow;
633 raise exn_exit
635 else
636 if b.len + min_read_size < buf_len then
638 String.blit b.buf b.pos b.buf 0 b.len;
639 b.pos <- 0;
640 buf_len - b.len
642 else
643 let new_len = min
644 (max
645 (2 * buf_len) (b.len + min_read_size)) b.max_buf_size
647 let new_buf = String.create new_len in
648 String.blit b.buf b.pos new_buf 0 b.len;
649 b.pos <- 0;
650 b.buf <- new_buf;
651 new_len - b.len
652 else
653 buf_len - curpos
655 b.buf, b.pos+b.len, can_write_in_buffer
657 let can_read = min max_len buffer_len in
658 if can_read > 0 then
659 let old_len = b.len in
660 let nread = try
661 (* lprintf "{can read %d} --> " can_read; *)
662 (* lprintf "Unix.read %d/%d/%d\n" (String.length b.buf) (b.pos + b.len) can_read; *)
663 Unix.read (fd sock) buffer buffer_pos can_read;
666 with
667 Unix.Unix_error((Unix.EWOULDBLOCK | Unix.EAGAIN), _,_) as e -> raise e
668 | e ->
669 t.error <- Closed_for_error (Printf.sprintf "Can Read Error: %s" (Printexc2.to_string e));
670 close t t.error;
672 (* lprintf "exce %s in read\n" (Printexc2.to_string e); *)
673 raise e
678 if nread = can_read then begin
679 lprintf "READ LIMITED BY BW CONTROL: %d\n" nread;
680 end;
683 if !verbose_bandwidth > 1 then
684 lprintf_nl "[BW2 %6d] %sread %d/%d/%d on %s:%d" (last_time ())
685 (if old_len > 0 then "completing " else "") nread can_read max_len
686 t.name (sock_num t.sock_in);
689 if !copy_read_buffer then
690 buf_add t b big_buffer 0 nread
691 else
692 b.len <- b.len + nread;
693 (* lprintf " %d\n" nread; *)
694 b.min_buf_size <- min b.max_buf_size (
695 max (nread + nread / 2) min_read_size);
698 if nread = can_read then begin
699 lprintf "Unix.read: read limited: %d\n" nread;
700 lprintf " given to handler: %d\n" max_len;
701 lprintf " given by buffer: %d\n" buffer_len;
702 end;
705 tcp_downloaded_bytes := !tcp_downloaded_bytes ++ (Int64.of_int nread);
706 (match t.read_control with
707 None -> () | Some bc ->
708 bc.moved_bytes <- bc.moved_bytes ++ (Int64.of_int nread));
710 t.nread <- t.nread + nread;
711 if nread > 0 then begin
712 register_download t nread;
713 register_upload t 0;
714 end;
715 if nread = 0 then begin
716 close t Closed_by_peer;
717 end else begin
720 (* if t.monitored then
721 (lprintf "event handler READ DONE\n"; ); *)
722 t.event_handler t (READ_DONE nread);
723 with
724 | e ->
725 (* if t.monitored then
726 (lprintf "Exception in READ DONE\n"; ); *)
727 t.error <- Closed_for_error (Printf.sprintf "READ_DONE Error: %s" (Printexc2.to_string e));
728 close t t.error;
730 (* lprintf "exce %s in read\n" (Printexc2.to_string e); *)
731 raise e
734 (*************************************************************************)
735 (* *)
736 (* can_write_handler *)
737 (* *)
738 (*************************************************************************)
740 let can_write_handler t sock max_len =
741 (* if t.monitored then (
742 lprintf "CAN_WRITE (%d)\n" t.wbuf.len;
743 ); *)
744 let b = t.wbuf in
745 if not t.connecting then (
746 if b.len > 0 then
747 begin
749 (* lprintf "try write %d/%d\n" max_len t.wbuf.len; *)
750 let fd = fd sock in
751 (* lprintf "WRITE [%s]\n" (String.escaped
752 (String.sub b.buf b.pos max_len)); *)
753 let nw = MlUnix.write fd b.buf b.pos max_len in
754 if !verbose_bandwidth > 1 then
755 lprintf_nl "[BW2 %6d] postponed %swrite %d/%d/%d on %s:%d"
756 (last_time ()) (if max_len < b.len then "partial " else "")
757 nw max_len b.len t.name (sock_num t.sock_out);
759 (* if t.monitored then
760 (lprintf "written %d\n" nw; ); *)
761 tcp_uploaded_bytes := !tcp_uploaded_bytes ++ (Int64.of_int nw);
762 (match t.write_control with
763 None -> ()
764 | Some bc ->
765 bc.moved_bytes <- bc.moved_bytes ++ (Int64.of_int nw));
766 if t.nwrite = 0 then begin
767 (* lprintf "add_connect_latency at %f\n" (current_time ()); *)
768 add_connect_latency t.host t.connect_time;
769 end;
770 t.nwrite <- t.nwrite + nw;
771 b.len <- b.len - nw;
772 b.pos <- b.pos + nw;
773 if nw = 0 then close t Closed_by_peer else
774 if b.len = 0 then begin
775 b.pos <- 0;
776 delete_string b.buf;
777 b.buf <- "";
779 with
780 Unix.Unix_error((Unix.EWOULDBLOCK | Unix.EAGAIN ), _,_) as e -> raise e
781 | e ->
782 t.error <- Closed_for_error (Printf.sprintf "Can Write Error: %s" (Printexc2.to_string e));
783 close t t.error;
785 (* lprintf "exce %s in read\n" (Printexc2.to_string e); *)
786 raise e
788 end;
789 if not (closed t) then begin
790 t.event_handler t CAN_REFILL;
791 if b.len = 0 then begin
792 delete_string b.buf;
793 b.pos <- 0;
794 must_write t.sock_out false;
795 t.event_handler t WRITE_DONE
800 (*************************************************************************)
801 (* *)
802 (* tcp_handler *)
803 (* *)
804 (*************************************************************************)
806 let tcp_handler_write t sock =
807 begin
808 match t.write_control with
809 None ->
810 can_write_handler t sock t.wbuf.len
811 | Some bc ->
812 if bc.total_bytes = 0 then
813 can_write_handler t sock t.wbuf.len
814 else begin
815 bc.connections <- t :: bc.connections;
816 bc.nconnections <- t.write_power + bc.nconnections
820 let get_latencies verbose =
821 let b = Buffer.create 300 in
822 let counter = ref 0 in
823 Hashtbl.iter (fun ip (latency, samples) ->
824 incr counter;
825 ) latencies;
826 LittleEndian.buf_int b !counter;
827 Hashtbl.iter (fun ip (latency, samples) ->
828 if !verbose then lprintf " Latency TCP: %s -> %d (%d samples)\n" (Ip.to_string ip) !latency !samples;
829 LittleEndian.buf_ip b ip;
830 LittleEndian.buf_int16 b !latency;
831 LittleEndian.buf_int16 b !samples;
832 ) latencies;
833 Hashtbl.clear latencies;
834 Buffer.contents b
836 let tcp_handler t event =
837 match event with
838 | CAN_READ ->
839 (* lprintf "CAN_READ\n"; *)
840 begin
841 match t.read_control with
842 None ->
843 can_read_handler t t.sock_in 1000000
844 | Some bc ->
845 if bc.total_bytes = 0 then
846 can_read_handler t t.sock_in 1000000
847 else begin
848 (* lprintf "DELAYED\n"; *)
849 if bc.remaining_bytes > 0 then
850 begin
851 bc.connections <- t :: bc.connections;
852 bc.nconnections <- t.read_power + bc.nconnections
856 | CAN_WRITE ->
857 (* lprintf "CAN_WRITE\n"; *)
858 let can_write =
859 if t.nwrite = 0 && t.noproxy && t.connecting then begin
860 t.connecting <- false;
861 t.event_handler t CONNECTED;
862 t.nwrite = 0
863 end else true
865 if can_write then tcp_handler_write t t.sock_out
866 | _ -> t.event_handler t (BASIC_EVENT event)
869 (*************************************************************************)
870 (* *)
871 (* Bandwidth Controlers *)
872 (* *)
873 (*************************************************************************)
875 let read_bandwidth_controlers = ref []
876 let write_bandwidth_controlers = ref []
878 let create_read_bandwidth_controler name rate =
879 let bc = {
880 bc_name = name;
881 remaining_bytes = rate;
882 total_bytes = rate;
883 nconnections = 0;
884 connections = [];
885 allow_io = ref true;
886 remaining_bytes_user = (fun _ _ -> ());
887 moved_bytes = Int64.zero;
888 lost_bytes = Array.create 3600 0;
889 forecast_bytes = 0;
890 ndone_last_second = 0;
891 } in
892 read_bandwidth_controlers := bc :: !read_bandwidth_controlers;
895 let create_write_bandwidth_controler name rate =
896 let bc = {
897 bc_name = name;
898 remaining_bytes = rate;
899 total_bytes = rate;
900 nconnections = 0;
901 connections = [];
902 allow_io = ref true;
903 remaining_bytes_user = (fun _ _ -> ());
904 moved_bytes = Int64.zero;
905 lost_bytes = Array.create 3600 0;
906 forecast_bytes = 0;
907 ndone_last_second = 0;
908 } in
909 write_bandwidth_controlers := bc :: !write_bandwidth_controlers;
912 let change_rate bc rate =
913 bc.total_bytes <- rate
915 let bandwidth_controler t sock =
916 (match t.read_control with
917 None -> ()
918 | Some bc ->
919 (* must_read sock (bc.total_bytes = 0 || bc.remaining_bytes > 0)); *)
920 (* bandwidth_controler is called before socket is engaged into data transfer.
921 This can be either upload or download connection, but when download speed
922 is capped and fully saturated the above condition will be false,
923 consequently the socket will never get [want_read] property and
924 will never get considered by the event loop, until finally being
925 closed on timeout. This bug manifests itself with no _new_ BT upload clients
926 or inability to connect to servers (DC) when download is running.
927 This is a temporary fix, bandwidth limiting logic needs some global refactoring *)
928 must_read sock true);
929 (match t.write_control with
930 None -> ()
931 | Some bc ->
932 must_write sock ((bc.total_bytes = 0 || bc.remaining_bytes > 0)
933 && t.wbuf.len > 0))
935 let reset_bandwidth_controlers _ =
936 List.iter (fun bc ->
937 bc.remaining_bytes_user bc.total_bytes bc.remaining_bytes;
938 bc.remaining_bytes <- bc.total_bytes - bc.forecast_bytes;
939 if !verbose_bandwidth > 0 && bc.ndone_last_second > 0 then
940 lprintf_nl "[BW1 %6d] %s read %d/%d last second" (last_time ())
941 bc.bc_name bc.ndone_last_second bc.total_bytes;
942 if !verbose_bandwidth > 0 && bc.forecast_bytes > 0 then
943 lprintf_nl "[BW1 %6d] %s forecast read %d bytes for next second" (last_time ())
944 bc.bc_name bc.forecast_bytes;
945 bc.forecast_bytes <- 0;
946 bc.ndone_last_second <- 0;
947 if bc.remaining_bytes > 0 then bc.allow_io := true
948 (* lprintf "READ remaining_bytes: %d" bc.remaining_bytes; *)
949 ) !read_bandwidth_controlers;
950 List.iter (fun bc ->
951 bc.remaining_bytes_user bc.total_bytes bc.remaining_bytes;
952 bc.remaining_bytes <- bc.total_bytes - bc.forecast_bytes;
953 if !verbose_bandwidth > 0 && bc.ndone_last_second > 0 then
954 lprintf_nl "[BW1 %6d] %s wrote %d/%d last second" (last_time ())
955 bc.bc_name bc.ndone_last_second bc.total_bytes;
956 if !verbose_bandwidth > 0 && bc.forecast_bytes > 0 then
957 lprintf_nl "[BW1 %6d] %s forecast write %d bytes for next second" (last_time ())
958 bc.bc_name bc.forecast_bytes;
959 bc.forecast_bytes <- 0;
960 bc.ndone_last_second <- 0;
961 if bc.remaining_bytes > 0 then bc.allow_io := true;
963 lprintf "WRITE remaining_bytes: %d\n" bc.remaining_bytes;
965 ) !write_bandwidth_controlers
967 let compute_lost_byte bc =
968 if bc.total_bytes = 0 then -1 else
969 let sum = ref Int64.zero in
970 for i = 0 to 3600-1 do
971 sum := !sum ++ (Int64.of_int bc.lost_bytes.(i));
972 done;
973 Int64.to_int (!sum // 3600L)
975 let moved_bytes bc = bc.moved_bytes
977 let set_remaining_bytes_user bc f =
978 bc.remaining_bytes_user <- f
980 let set_lost_bytes bc lost sec =
981 bc.lost_bytes.(sec mod 3600) <- lost
983 (*************************************************************************)
984 (* *)
985 (* Setting handlers *)
986 (* *)
987 (*************************************************************************)
989 let set_closer t f =
990 let old_handler = t.event_handler in
991 let handler t ev =
992 (* if t.monitored then (lprintf "set_closer handler\n"); *)
993 match ev with
994 BASIC_EVENT (CLOSED s) ->
995 (* lprintf "READ_DONE %d\n" nread; *)
996 f t s
997 |_ -> old_handler t ev
999 t.event_handler <- handler
1001 let set_rtimer t f =
1002 let old_handler = t.event_handler in
1003 let handler t ev =
1004 (* if t.monitored then (lprintf "set_closer handler\n"); *)
1005 match ev with
1006 BASIC_EVENT (RTIMEOUT | LTIMEOUT) ->
1008 |_ -> old_handler t ev
1010 t.event_handler <- handler
1012 let set_handler t event handler =
1013 let old_handler = t.event_handler in
1014 let handler t ev =
1015 (* if t.monitored then (lprintf "set_handler handler\n"; ); *)
1016 if ev = event then
1017 handler t
1018 else
1019 old_handler t ev
1021 t.event_handler <- handler
1023 let set_refill t f =
1024 set_handler t CAN_REFILL f;
1025 (try f t with _ -> ())
1027 let close_after_write t =
1028 if t.wbuf.len = 0 then begin
1029 (* lprintf "close_after_write: CLOSE\n"; - log output removed *)
1030 shutdown t Closed_by_user
1032 else
1033 set_handler t WRITE_DONE (fun t ->
1034 (* lprintf "close_after_write: CLOSE\n"; - log output removed *)
1035 shutdown t Closed_by_user)
1037 let http_proxy = ref None
1039 let set_reader t f =
1040 (* lprintf "set_reader for %s\n" t.host; *)
1041 let old_handler = t.event_handler in
1042 let ff =
1043 if t.noproxy then
1045 else
1046 match !http_proxy with
1047 | None -> f
1048 | Some _ ->
1049 fun sock nread ->
1050 (* HTTP/1.0 200 OK\n\n *)
1051 let b = buf sock in
1052 let rcode, rstr, rstr_end =
1054 let rcode_pos = 8 (*String.index_from b.buf b.pos ' '*) in
1055 let rcode = String.sub b.buf (rcode_pos+1) 3 in
1056 let rstr_pos = 12 (*String.index_from b.buf (rcode_pos+1) ' '*) in
1057 let rstr_end = String.index_from b.buf (rstr_pos+1) '\n' in
1058 let rstr = String.sub b.buf (rstr_pos+1) (rstr_end-rstr_pos-1) in
1059 lprintf "From proxy for %s: %s %s\n"
1060 (Ip.to_string sock.host) rcode rstr;
1061 rcode, rstr, rstr_end
1062 with _ ->
1063 "", "", 0
1065 (match rcode with
1066 "200" -> (*lprintf "Connect to client via proxy ok\n";*)
1067 let pos = String.index_from b.buf (rstr_end+1) '\n' in
1068 let used = pos + 1 - b.pos in
1069 sock_used sock used;
1070 if nread != used then
1071 f sock (nread - used)
1072 | _ ->
1073 (* fall back to user handler *)
1074 f sock nread);
1075 let handler t ev =
1076 match ev with
1077 READ_DONE nread -> f t nread
1078 |_ -> old_handler t ev
1080 t.event_handler <- handler;
1081 t.connecting <- false;
1082 t.event_handler t CONNECTED;
1083 if t.nwrite = 0 then tcp_handler t CAN_WRITE;
1084 lprintf "old handler set\n"
1086 let handler t ev =
1087 match ev with
1088 READ_DONE nread -> ff t nread
1089 | _ -> old_handler t ev
1091 t.event_handler <- handler
1093 let set_handler t event handler =
1094 let old_handler = t.event_handler in
1095 let handler t ev =
1096 (* if t.monitored then (lprintf "set_handler handler\n"; ); *)
1097 if ev = event then
1098 handler t
1099 else
1100 old_handler t ev
1102 t.event_handler <- handler
1104 let set_refill t f =
1105 set_handler t CAN_REFILL f;
1106 if t.wbuf.len = 0 then (try f t with _ -> ())
1108 let set_connected t f =
1109 set_handler t CONNECTED f
1111 (*************************************************************************)
1112 (* *)
1113 (* Socket Configuration *)
1114 (* *)
1115 (*************************************************************************)
1117 let set_read_controler t bc =
1118 t.read_control <- Some bc;
1119 (* set_before_select t.sock (bandwidth_controler t); *)
1120 set_allow_read t.sock_in bc.allow_io;
1121 bandwidth_controler t t.sock_in
1123 let set_write_controler t bc =
1124 t.write_control <- Some bc;
1125 (* set_before_select t.sock (bandwidth_controler t); *)
1126 set_allow_write t.sock_out bc.allow_io;
1127 bandwidth_controler t t.sock_out
1129 let set_monitored t b = t.monitored <- b
1130 let monitored t = t.monitored
1132 let set_rtimeout s t = set_rtimeout s.sock_in t
1133 let set_wtimeout s t = set_wtimeout s.sock_out t
1135 let set_write_power t p = (* t.write_power <- p *) ()
1136 let set_read_power t p = (* t.read_power <- p *) ()
1138 let set_lifetime s = set_lifetime s.sock_in
1140 (*************************************************************************)
1141 (* *)
1142 (* Printing Information *)
1143 (* *)
1144 (*************************************************************************)
1146 let dump_socket t buf =
1147 print_socket buf t.sock_in;
1148 print_socket buf t.sock_out;
1149 Printf.bprintf buf "rbuf: %d/%d wbuf: %d/%d\n" t.rbuf.len
1150 (String.length t.rbuf.buf) t.wbuf.len (String.length t.wbuf.buf)
1152 let stats buf t =
1153 BasicSocket.stats buf t.sock_in;
1154 BasicSocket.stats buf t.sock_out;
1155 Printf.bprintf buf " rbuf size: %d/%d\n" (String.length t.rbuf.buf)
1156 t.rbuf.max_buf_size;
1157 Printf.bprintf buf " wbuf size: %d/%d\n" (String.length t.wbuf.buf)
1158 t.wbuf.max_buf_size
1160 (*************************************************************************)
1161 (* *)
1162 (* create *)
1163 (* *)
1164 (*************************************************************************)
1166 let create token name fd handler =
1167 use_token token fd;
1168 MlUnix.set_close_on_exec fd;
1169 let t = {
1170 name = name;
1171 token = token;
1172 closing = false;
1173 sock_in = dummy_sock;
1174 sock_out = dummy_sock;
1175 rbuf = buf_create !max_buffer_size;
1176 wbuf = buf_create !max_buffer_size;
1177 event_handler = handler;
1178 error = Closed_by_peer;
1179 nread = 0;
1180 ndown_packets = 0;
1181 nwrite = 0;
1182 nup_packets = 0;
1183 monitored = false;
1184 read_control = None;
1185 write_control = None;
1186 write_power = 1;
1187 read_power = 1;
1188 connect_time = 0.;
1189 peer_addr = None;
1190 my_ip = Ip.null;
1191 noproxy = true;
1192 connecting = false;
1193 host = Ip.null;
1194 compression = None;
1195 } in
1196 let sock = BasicSocket.create name fd (fun _ event ->
1197 tcp_handler t event) in
1198 set_printer sock (fun sock ->
1199 Printf.sprintf "%s (nread: %d nwritten: %d) [U %s,D %s]" name t.nread t.nwrite
1200 (string_of_bool (t.read_control <> None)) (string_of_bool (t.write_control <> None));
1203 set_dump_info sock (dump_socket t);
1204 if !debug then
1205 lprintf_nl "[tBS] fd %d %s" (sock_num sock) name;
1206 t.sock_in <- sock;
1207 t.sock_out <- sock;
1210 (*************************************************************************)
1211 (* *)
1212 (* create_pipe *)
1213 (* *)
1214 (*************************************************************************)
1216 let create_pipe token name fd_in fd_out handler =
1217 use_token token fd_in;
1218 MlUnix.set_close_on_exec fd_in;
1219 MlUnix.set_close_on_exec fd_out;
1220 let t = {
1221 name = name;
1222 token = token;
1223 closing = false;
1224 sock_in = dummy_sock;
1225 sock_out = dummy_sock;
1226 rbuf = buf_create !max_buffer_size;
1227 wbuf = buf_create !max_buffer_size;
1228 event_handler = handler;
1229 error = Closed_by_peer;
1230 nread = 0;
1231 ndown_packets = 0;
1232 nwrite = 0;
1233 nup_packets = 0;
1234 monitored = false;
1235 read_control = None;
1236 write_control = None;
1237 write_power = 1;
1238 read_power = 1;
1239 peer_addr = None;
1240 my_ip = Ip.null;
1241 noproxy = true;
1242 connecting = false;
1243 host = Ip.null;
1244 connect_time = 0.;
1245 compression = None;
1246 } in
1248 let fname = (fun _ ->
1249 Printf.sprintf "%s (nread: %d nwritten: %d) [U %s,D %s]" name
1250 t.nread t.nwrite (string_of_bool (t.read_control <> None))
1251 (string_of_bool (t.write_control <> None));
1253 ) in
1256 let sock_in = BasicSocket.create name fd_in (fun _ event ->
1257 tcp_handler t event) in
1258 if !debug then
1259 lprintf_nl "[tBS] fd %d %s" (sock_num sock_in) name;
1260 set_printer sock_in fname;
1261 set_dump_info sock_in (dump_socket t);
1263 let sock_out = BasicSocket.create name fd_out (fun _ event ->
1264 tcp_handler t event) in
1265 set_printer sock_out fname;
1266 set_dump_info sock_out (dump_socket t);
1268 t.sock_in <- sock_in;
1269 t.sock_out <- sock_out;
1272 (*************************************************************************)
1273 (* *)
1274 (* create_blocking *)
1275 (* *)
1276 (*************************************************************************)
1278 let create_blocking token name fd handler =
1279 use_token token fd;
1280 MlUnix.set_close_on_exec fd;
1281 let t = {
1282 name = name;
1283 token = token;
1284 closing = false;
1285 sock_in = dummy_sock;
1286 sock_out = dummy_sock;
1287 rbuf = buf_create !max_buffer_size;
1288 wbuf = buf_create !max_buffer_size;
1289 event_handler = handler;
1290 error = Closed_by_peer;
1291 nread = 0;
1292 ndown_packets = 0;
1293 nwrite = 0;
1294 nup_packets = 0;
1295 monitored = false;
1296 read_control = None;
1297 write_control = None;
1298 write_power = 1;
1299 read_power = 1;
1300 peer_addr = None;
1301 my_ip = Ip.null;
1302 noproxy = true;
1303 connecting = false;
1304 host = Ip.null;
1305 connect_time = 0.;
1306 compression = None;
1307 } in
1308 let sock = create_blocking name fd (fun sock event ->
1309 tcp_handler t event) in
1310 t.sock_in <- sock;
1311 t.sock_out <- sock;
1312 set_dump_info sock (dump_socket t);
1315 let create_simple token name fd =
1316 create token name fd (fun _ _ -> ())
1318 (*************************************************************************)
1319 (* *)
1320 (* connect *)
1321 (* *)
1322 (*************************************************************************)
1324 let connect token name host port handler =
1325 if token.token_used then failwith "Token already used";
1327 (* lprintf "CONNECT %s:%d\n" (Unix.string_of_inet_addr host) port; *)
1328 let s = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
1329 if !bind_address <> Unix.inet_addr_any then
1330 Unix.bind s (Unix.ADDR_INET (!bind_address, 0));
1331 let proxy_ip, proxy_port, proxy_auth =
1332 match !http_proxy with
1333 | None -> Ip.null, 0, None
1334 | Some (h, p, auth) -> Ip.from_name h, p, auth
1336 let ip = Ip.of_inet_addr host in
1337 let use_proxy = proxy_ip <> Ip.null && proxy_ip <> ip in
1338 if use_proxy then begin
1339 (* connect to proxy in blocking mode, so we sure, connections established when we send CONNECT *)
1340 lprintf "via proxy\n";
1341 Unix.connect s (Unix.ADDR_INET(Ip.to_inet_addr proxy_ip, proxy_port));
1343 let buf = Buffer.create 200 in
1344 let dotted_host = Unix.string_of_inet_addr host in
1345 Printf.bprintf buf "CONNECT %s:%d HTTP/1.1\n" dotted_host port;
1346 Printf.bprintf buf "Pragma: no-cache\n";
1347 Printf.bprintf buf "Cache-Control: no-cache\n";
1348 Printf.bprintf buf "Connection: Keep-Alive\n";
1349 Printf.bprintf buf "Proxy-Connection: Keep-Alive\n";
1350 begin match proxy_auth with
1351 | Some (login,password) ->
1352 Printf.bprintf buf "Proxy-Authorization: Basic %s\n" (Base64.encode (login ^ ":" ^ password))
1353 | None -> ()
1354 end;
1355 Printf.bprintf buf "User-Agent: MLdonkey/%s\n" Autoconf.current_version;
1356 Printf.bprintf buf "\n";
1357 ignore (MlUnix.write s (Buffer.contents buf) 0 (Buffer.length buf))
1358 end;
1359 let t = create token name s handler in
1361 if !verbose_bandwidth > 1 then
1362 lprintf_nl "[BW2 %6d] connect on %s:%d" (last_time ()) t.name (sock_num t.sock_out);
1364 token.connection_manager.nconnections_last_second <-
1365 token.connection_manager.nconnections_last_second + 1;
1367 must_write t.sock_out true;
1369 t.host <- ip;
1370 (* lprintf "add_connect at %f\n" (current_time ()); *)
1371 t.connect_time <- current_time ();
1372 if use_proxy then begin
1373 t.noproxy <- false;
1374 end else
1375 Unix.connect s (Unix.ADDR_INET(host,port));
1377 t.connecting <- true;
1378 register_upload t 0; (* The TCP SYN packet *)
1379 forecast_download t 0; (* The TCP ACK packet *)
1380 forecast_upload t 0; (* The TCP ACK packet *)
1381 (* if use_proxy then begin
1382 end; *)
1384 with
1385 Unix.Unix_error((Unix.EINPROGRESS|Unix.EINTR|Unix.EWOULDBLOCK),_,_) ->
1386 t.connecting <- true;
1387 register_upload t 0; (* The TCP SYN packet *)
1388 forecast_download t 0; (* The TCP ACK packet *)
1389 forecast_upload t 0; (* The TCP ACK packet *)
1391 | Unix.Unix_error (Unix.ENETUNREACH,_,_) as e ->
1392 (* log nothing here, but later in donkeyClient.ml *)
1393 close t Closed_connect_failed;
1394 raise e
1395 | e ->
1396 close t Closed_connect_failed;
1397 raise e
1398 with
1399 Unix.Unix_error (Unix.ENETUNREACH,_,_) as e -> raise e (* avoid logging *)
1400 | Unix.Unix_error (Unix.ENOBUFS,_,_) as e ->
1401 if Autoconf.windows then lprintf_nl
1402 "No more free buffers, read http://support.microsoft.com/kb/q196271/ to fix this problem";
1403 raise e
1404 | e ->
1405 lprintf_nl "Exception (%s) before connect to host %s:%d"
1406 (Printexc2.to_string e) (Unix.string_of_inet_addr host) port;
1407 raise e
1411 (*************************************************************************)
1412 (* *)
1413 (* IP addresses *)
1414 (* *)
1415 (*************************************************************************)
1418 let my_ip t =
1419 if t.my_ip = Ip.null then
1420 let fd = fd t.sock_in in
1421 match Unix.getsockname fd with
1422 Unix.ADDR_INET (ip, port) ->
1423 let ip = Ip.of_inet_addr ip in
1424 t.my_ip <- ip;
1426 | _ -> raise Not_found
1427 else t.my_ip
1429 let peer_addr t =
1430 match t.peer_addr with
1431 Some (ip, port) -> (ip,port)
1432 | None ->
1433 let fd = fd t.sock_out in
1434 match Unix.getpeername fd with
1435 Unix.ADDR_INET (ip, port) ->
1436 let ip = Ip.of_inet_addr ip in
1437 t.peer_addr <- Some (ip, port);
1438 ip, port
1439 | _ -> raise Not_found
1441 let peer_ip t = fst (peer_addr t)
1442 let peer_port t = snd (peer_addr t)
1445 let host t =
1446 let fd = fd t.sock_out in
1447 match Unix.getpeername fd with
1448 Unix.ADDR_INET (ip, port) -> Ip.of_inet_addr ip, port
1449 | _ -> raise Not_found
1452 (*************************************************************************)
1453 (* *)
1454 (* Sending Marshalled Values *)
1455 (* *)
1456 (*************************************************************************)
1458 open AnyEndian
1459 open LittleEndian
1461 let internal_buf = Buffer.create 17000
1463 let simple_send_buf buf sock =
1464 let s = Buffer.contents buf in
1465 Buffer.reset buf;
1466 buf_int8 buf 228;
1467 let len = String.length s in
1468 buf_int buf len;
1469 write sock (Buffer.contents buf) 0 5;
1470 write sock s 0 len
1472 let value_send sock m =
1473 (* Buffer.reset internal_buf; *)
1474 Buffer.reset internal_buf;
1475 Buffer.add_string internal_buf (Marshal.to_string m []);
1476 simple_send_buf internal_buf sock
1478 let value_handler f sock nread =
1479 let b = buf sock in
1481 while b.len >= 5 do
1482 let msg_len = get_int b.buf (b.pos+1) in
1483 if b.len >= 5 + msg_len then
1484 begin
1485 let s = String.sub b.buf (b.pos+5) msg_len in
1486 let t = Marshal.from_string s 0 in
1487 buf_used b (msg_len + 5);
1488 f t sock;
1491 else raise Not_found
1492 done
1493 with Not_found -> ()
1495 (*************************************************************************)
1496 (* *)
1497 (* exec_command *)
1498 (* *)
1499 (*************************************************************************)
1501 let exec_command token cmd args handler =
1502 MlUnix.execvp_command cmd args (fun in_read out_write ->
1503 let t = create_pipe token "pipe" in_read out_write
1504 handler in
1505 must_read t.sock_in false;
1510 (*************************************************************************)
1511 (* *)
1512 (* Setting configuration *)
1513 (* *)
1514 (*************************************************************************)
1516 let set_max_connections_per_second f =
1517 max_connections_per_second := f
1519 let set_max_opened_connections f =
1520 max_opened_connections := f
1522 let set_max_input_buffer t len =
1523 t.rbuf.max_buf_size <- len
1525 let set_max_output_buffer t len =
1526 t.wbuf.max_buf_size <- len
1528 (*************************************************************************)
1529 (* *)
1530 (* Compression *)
1531 (* *)
1532 (*************************************************************************)
1534 let to_deflate = ref []
1535 let to_deflate_len = ref 0
1537 let compression_buffer_len = !max_buffer_size / 10
1538 let compression_buffer = String.create compression_buffer_len
1540 let deflate_connection sock =
1541 (* lprintf "Creating deflate connection\n"; *)
1542 let comp = Some (Zlib.inflate_init true, Zlib.deflate_init 6 true,
1543 buf_create !max_buffer_size, buf_create !max_buffer_size) in
1544 sock.compression <- comp
1546 let rec iter_deflate sock zs wbuf =
1547 if wbuf.len > 0 then begin
1548 (* lprintf "iter_deflate\n"; *)
1549 let (_, used_in, used_out) = Zlib.deflate zs
1550 wbuf.buf wbuf.pos wbuf.len
1551 compression_buffer 0 compression_buffer_len
1552 Zlib.Z_SYNC_FLUSH in
1554 lprintf "deflated %d/%d -> %d\n" used_in wbuf.len used_out;
1555 lprintf "[%s]\n" (String.escaped (String.sub compression_buffer 0 used_out));
1557 write sock compression_buffer 0 used_out;
1558 buf_used wbuf used_in;
1559 if used_in > 0 || used_out > 0 then
1560 iter_deflate sock zs wbuf
1563 let deflate_timer _ =
1564 List.iter (fun sock ->
1566 match sock.compression with
1567 Some (_, zs, _, wbuf) ->
1568 if closed sock then raise Exit;
1569 iter_deflate sock zs wbuf
1570 | _ -> ()
1571 with e ->
1572 lprintf "[ERROR] Exception %s in CanBeCompressed.deflate_timer\n"
1573 (Printexc2.to_string e)
1574 ) !to_deflate;
1575 to_deflate := [];
1576 to_deflate_len := 0
1578 let _to_deflate conn =
1579 if not (List.memq conn !to_deflate) then
1580 to_deflate := conn :: !to_deflate;
1581 if !to_deflate_len > 1000000 then
1582 deflate_timer ()
1584 let rec iter_inflate zs sock b rbuf =
1585 if b.len > 0 then begin
1587 lprintf "iter_inflate %d\n" b.len;
1588 lprintf "[%s]\n" (String.escaped (String.sub b.buf b.pos b.len));
1590 let (_, used_in, used_out) = Zlib.inflate zs b.buf b.pos b.len
1591 compression_buffer 0 compression_buffer_len
1592 Zlib.Z_SYNC_FLUSH in
1594 lprintf "inflated %d/%d -> %d\n" used_in b.len used_out;
1595 lprintf "[%s]\n" (String.escaped (String.sub compression_buffer 0 used_out));
1597 buf_add sock rbuf compression_buffer 0 used_out;
1598 buf_used b used_in;
1599 if used_in > 0 || used_out > 0 then
1600 iter_inflate zs sock b rbuf
1603 let buf t =
1604 match t.compression with
1605 None -> t.rbuf
1606 | Some (zs, _, rbuf, _) ->
1607 (* lprintf "CanBeCompressed.buf\n"; *)
1608 let b = buf t in
1609 if b.len > 0 then iter_inflate zs t b rbuf;
1610 rbuf
1612 let write t s pos len =
1613 match t.compression with
1614 None -> write t s pos len
1615 | Some (_,_, _,wbuf) ->
1617 to_deflate_len := !to_deflate_len + len;
1618 _to_deflate t;
1619 buf_add t wbuf s pos len
1621 (*************************************************************************)
1622 (* *)
1623 (* MAIN *)
1624 (* *)
1625 (*************************************************************************)
1627 let write_string t s = write t s 0 (String.length s)
1629 let _ =
1630 add_bandwidth_second_timer (fun _ ->
1631 reset_bandwidth_controlers ();
1632 reset_connection_scheduler ();
1633 deflate_timer ();
1636 set_before_select_hook (fun _ ->
1637 schedule_connections ();
1638 List.iter (fun bc ->
1639 let old_value = !(bc.allow_io) in
1640 bc.allow_io := (bc.total_bytes = 0 || bc.remaining_bytes > 0);
1641 if !verbose_bandwidth > 2 && (old_value <> !(bc.allow_io)) then
1642 lprintf_nl "[BW3 %6d] %20s: stop reading" (last_time ()) bc.bc_name
1643 ) !read_bandwidth_controlers;
1644 List.iter (fun bc ->
1645 let old_value = !(bc.allow_io) in
1646 bc.allow_io := (bc.total_bytes = 0 || bc.remaining_bytes > 0);
1647 if !verbose_bandwidth > 2 && (old_value <> !(bc.allow_io)) then
1648 lprintf_nl "[BW3 %6d] %20s: stop writing" (last_time ()) bc.bc_name
1649 ) !write_bandwidth_controlers;
1653 Some ideas:
1655 * sort the connections so that we try to read and write as
1656 much as possible to/from connections on which we already have read/written
1657 a lot.
1658 * postpone reads and writes for a few seconds so that we have more to
1659 read/write.
1661 dhcppc3:~# ping computer.domain.org -f -s <packet_size> -c 1000
1662 where computer.domain.org is a nearby computer on a 100 Mbs link.
1664 On my cable link, making <packet_size> vary from 1 to 2000 shows:
1665 * packets > 1450 are 100% lost
1666 * 300 > packets > 1 are 5% lost but exactly the same time (14.5 s)
1667 * packets > 300 are 30% lost and higher time (17s)
1669 So, we can suppose that sending 250 bytes is exactly the same as sending one
1670 byte, and the contrary, so [TODO]: round up all packets sent and received
1671 to an option 'packet_unit' (default 250). Ask Simon if it makes sense.
1673 1000 packets transmitted, 947 received, 5% packet loss, time 14191ms
1674 , pipe 6, ipg/ewma 14.205/0.000 ms
1679 set_after_select_hook (fun _ ->
1680 List.iter (fun bc ->
1681 if bc.remaining_bytes > 0 then begin
1683 bc.connections <- List.sort (fun t1 t2 ->
1684 let w1 = t1.nread in
1685 let w2 = t2.nread in
1686 compare w1 w2
1687 ) bc.connections;
1689 if !verbose_bandwidth > 2 && bc.nconnections > 0 then begin
1690 lprintf_nl "[BW3 %6d] %d read-waiting connections for %d allowed" (last_time ()) bc.nconnections bc.remaining_bytes;
1691 List.iter (fun t ->
1692 lprintf_nl "[BW3 %6d] %s:%d [buffered %d]" (last_time ()) t.name (sock_num t.sock_in) ((buf t).len);
1693 ) bc.connections;
1694 end;
1697 List.iter (fun t ->
1698 if bc.remaining_bytes > 0 then
1699 let nconnections = max bc.nconnections 1 in
1700 let can_read = max 1 (bc.remaining_bytes / nconnections) in
1701 let can_read = max !ip_packet_size (can_read * t.read_power) in
1702 (try
1703 (* lprintf "allow to read %d\n" can_read; *)
1704 can_read_handler t t.sock_in can_read
1705 with e ->
1706 (* lprintf "Exception %s in can_read_handler %s:%d\n"
1707 (Printexc2.to_string e)
1708 t.name (sock_num t.sock_in) *) ()
1710 bc.nconnections <- bc.nconnections - t.read_power
1711 else
1712 if !verbose_bandwidth > 2 then
1713 lprintf_nl "[BW3 %6d] %s:%d could not read" (last_time ())
1714 t.name (sock_num t.sock_out)
1715 ) bc.connections;
1716 (* if bc.remaining_bytes > 0 then bc.allow_io := false; *)
1717 end;
1719 if !verbose_bandwidth > 2 then begin
1721 if bc.remaining_bytes > 0 then
1722 lprintf_nl "[BW3 %6d] still %d bytes to read" (last_time ()) bc.remaining_bytes;
1724 if bc.nconnections > 0 then
1725 lprintf_nl "[BW3 %6d] still %d read-waiting connections after loop"
1726 (last_time ()) bc.nconnections;
1727 end;
1730 bc.connections <- [];
1731 bc.nconnections <- 0;
1732 ) !read_bandwidth_controlers;
1733 List.iter (fun bc ->
1734 if bc.remaining_bytes > 0 then begin
1736 bc.connections <- List.sort (fun t1 t2 ->
1737 let w1 = remaining_to_write t1 in
1738 let w2 = remaining_to_write t2 in
1739 compare w1 w2
1740 ) bc.connections;
1742 if !verbose_bandwidth > 2 && bc.nconnections > 0 then begin
1743 lprintf_nl "[BW3 %6d] %d write-waiting connections for %d allowed"
1744 (last_time ()) bc.nconnections bc.remaining_bytes;
1745 List.iter (fun t ->
1746 lprintf_nl "[BW3 %6d] %s:%d [buffered %d]"
1747 (last_time ()) t.name (sock_num t.sock_out) (remaining_to_write t);
1748 ) bc.connections;
1749 end;
1750 List.iter (fun t ->
1751 if bc.remaining_bytes > 0 then
1752 let nconnections = max bc.nconnections 1 in
1753 let can_write = max 1 (bc.remaining_bytes / nconnections) in
1754 let can_write = best_packet_size (can_write * t.write_power)
1756 let old_nwrite = t.nwrite in
1757 (try
1758 (* lprintf "WRITE\n"; *)
1759 can_write_handler t t.sock_out (min can_write t.wbuf.len)
1760 with _ -> ());
1761 bc.remaining_bytes <- bc.remaining_bytes -
1762 t.nwrite + old_nwrite;
1763 bc.nconnections <- bc.nconnections - t.write_power;
1764 else
1765 if !verbose_bandwidth > 2 then
1766 lprintf_nl "[BW3 %6d] %s:%d could not write buffered %d bytes"
1767 (last_time ()) t.name (sock_num t.sock_out) (remaining_to_write t)
1768 ) bc.connections;
1769 (* if bc.remaining_bytes > 0 then bc.allow_io := false; *)
1770 end;
1771 if !verbose_bandwidth > 2 then begin
1773 if bc.remaining_bytes > 0 then begin
1774 lprintf_nl "[BW3 %6d] still %d bytes to write"
1775 (last_time ()) bc.remaining_bytes;
1776 List.iter (fun t ->
1777 let len = remaining_to_write t in
1778 if len > 0 then
1779 lprintf_nl "[BW3 %6d] %s:%d could write %d" (last_time ())
1780 t.name (sock_num t.sock_out) len
1781 ) bc.connections
1782 end;
1784 if bc.nconnections > 0 then
1785 lprintf_nl "[BW3 %6d] still %d write-waiting connections after loop"
1786 (last_time ()) bc.nconnections;
1787 end;
1788 bc.connections <- [];
1789 bc.nconnections <- 0;
1790 ) !write_bandwidth_controlers
1793 let prevent_close t =
1794 prevent_close t.sock_in;
1795 prevent_close t.sock_out
1797 let must_write t bool = must_write t.sock_out bool
1798 let output_buffered t = t.wbuf.len
1800 let _ =
1801 Heap.add_memstat "tcpBufferedSocket" (fun level buf ->
1802 Printf.bprintf buf " %d latencies\n" (Hashtbl.length latencies);
1803 Printf.bprintf buf " String.length big_buffer: %d\n" (String.length big_buffer);
1804 Printf.bprintf buf " connection_managers: %d\n" (List.length !connection_managers);
1805 Printf.bprintf buf " read_bandwidth_controlers: %d\n" (List.length !read_bandwidth_controlers);
1806 Printf.bprintf buf " write_bandwidth_controlers: %d\n" (List.length !write_bandwidth_controlers);
1807 Printf.bprintf buf " to_deflate: %d\n" (List.length !to_deflate);
1808 Printf.bprintf buf " max_opened_connections: %d\n" (!max_opened_connections ());
1809 Printf.bprintf buf " max_connections_per_second: %d\n" (!max_connections_per_second ());
1810 Printf.bprintf buf " max_buffer_size: %d\n" (!max_buffer_size);