patch 7144
[mldonkey.git] / src / networks / openFT / openFTServers.ml
blob797e6514d80a74e5837044a2fc683b3354f55f7e
1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 open Md4
21 open CommonOptions
22 open CommonSearch
23 open CommonServer
24 open CommonComplexOptions
25 open CommonFile
26 open BasicSocket
27 open TcpBufferedSocket
29 open CommonTypes
30 open CommonGlobals
31 open Options
32 open OpenFTTypes
33 open OpenFTGlobals
34 open OpenFTOptions
35 open OpenFTProtocol
36 open OpenFTComplexOptions
38 module DG = CommonGlobals
39 module DO = CommonOptions
41 let disconnect_from_server s =
42 match s.server_sock with
43 None -> ()
44 | Some sock ->
46 lprintf "DISCONNECT FROM SERVER %s:%d"
47 (Ip.to_string s.server_ip) s.server_port;
48 lprint_newline ();
50 close sock "disconnect";
51 s.server_sock <- None;
52 set_server_state s NotConnected;
53 decr nservers;
54 connection_failed s.server_connection_control;
55 connected_servers := List2.removeq s !connected_servers;
56 if s.server_type = User_node then
57 Hashtbl.remove servers_by_key (s.server_ip, s.server_port)
59 let ask_for_files _ =
60 lprintf "OpenFTServers.ask_for_files not implemented"; lprint_newline ()
62 let send_pings _ =
63 lprintf "OpenFTServers.send_pings not implemented"; lprint_newline ()
65 let recover_files _ =
66 lprintf "recover_files"; lprint_newline ();
67 let module Q = Search in
68 Hashtbl.iter (fun _ file ->
69 if file_state file = FileDownloading then begin
70 incr nsearches;
71 let t = SearchReq {
72 Q.id = !nsearches;
73 Q.search_type = Q.Search_md5;
74 Q.words = (String.lowercase (Md4.to_string file.file_md5)) ;
75 Q.exclude = "";
76 Q.realm = "";
77 Q.size_min = Int64.zero;
78 Q.size_max = Int64.zero;
79 Q.kbps_min = Int64.zero;
80 Q.kbps_max = Int64.zero;
81 } in
82 List.iter (fun s ->
83 match s.server_sock with
84 None -> ()
85 | Some sock -> server_send sock t
86 ) !connected_servers
87 end
88 ) files_by_md5
90 let recover_files_from_server sock =
91 lprintf "recover_files_from_server"; lprint_newline ();
92 let module Q = Search in
93 Hashtbl.iter (fun _ file ->
94 if file_state file = FileDownloading then begin
95 incr nsearches;
96 let t = SearchReq {
97 Q.id = !nsearches;
98 Q.search_type = Q.Search_md5;
99 Q.words = (String.lowercase (Md4.to_string file.file_md5)) ;
100 Q.exclude = "";
101 Q.realm = "";
102 Q.size_min = Int64.zero;
103 Q.size_max = Int64.zero;
104 Q.kbps_min = Int64.zero;
105 Q.kbps_max = Int64.zero;
106 } in
107 server_send sock t
108 end
109 ) files_by_md5
113 let send_query keywords =
114 let module Q = Search in
115 let words = String2.unsplit keywords ' ' in
116 incr nsearches;
117 let t = SearchReq {
118 Q.id = !nsearches;
119 Q.search_type = Q.Search_filename;
120 Q.words = words;
121 Q.exclude = "";
122 Q.realm = "";
123 Q.size_min = Int64.zero;
124 Q.size_max = Int64.zero;
125 Q.kbps_min = Int64.zero;
126 Q.kbps_max = Int64.zero;
127 } in
128 List.iter (fun s ->
129 match s.server_sock with
130 None -> ()
131 | Some sock -> server_send sock t
132 ) !connected_servers;
133 !nsearches
136 let update_user t =
137 let module Q = QueryReply in
138 let user = new_user t.Q.guid (match t.Q.dont_connect with
139 Some true -> Indirect_location ("", t.Q.guid)
140 | _ -> Known_location(t.Q.ip, t.Q.port))
142 user.user_speed <- t.Q.speed;
143 user
146 (* NOT IMPLEMENTED YET
148 let send_query min_speed keywords xml_query =
149 let module Q = Query in
150 let words = String2.unsplit keywords ' ' in
151 let t = QueryReq {
152 Q.min_speed = 0;
153 Q.keywords = words;
154 Q.xml_query = "" } in
155 let p = new_packet t in
156 if !!verbose_servers > 0 then begin
157 lprintf "sending query for <%s>" words; lprint_newline ();
158 end;
159 List.iter (fun s ->
160 match s.server_sock with
161 None -> ()
162 | Some sock -> server_send sock p
163 ) !connected_servers;
166 let extension_list = [
167 "mp3" ; "avi" ; "jpg" ; "jpeg" ; "txt" ; "mov" ; "mpg"
170 let rec remove_short list list2 =
171 match list with
172 [] -> List.rev list2
173 | s :: list ->
174 if List.mem s extension_list then
175 remove_short list (s :: list2) else
177 if String.length s < 5 then (* keywords should had list be 5 bytes *)
178 remove_short list list2
179 else
180 remove_short list (s :: list2)
182 let stem s =
183 let s = String.lowercase (String.copy s) in
184 for i = 0 to String.length s - 1 do
185 let c = s.[i] in
186 match c with
187 'a'..'z' | '0' .. '9' -> ()
188 | _ -> s.[i] <- ' ';
189 done;
190 remove_short (String2.split s ' ') []
192 let get_name_keywords file_name =
193 match stem file_name with
194 [] | [_] ->
195 lprintf "Not enough keywords to recover %s" file_name;
196 lprint_newline ();
197 [file_name]
198 | l -> l
200 let recover_files () =
201 List.iter (fun file ->
202 let keywords = get_name_keywords file.file_name
204 ignore (send_query 0 keywords "")
205 ) !current_files;
208 let recover_files_from_server sock =
209 if !!verbose_servers > 0 then begin
210 lprintf "trying to recover files from server"; lprint_newline ();
211 end;
212 List.iter (fun file ->
213 if !!verbose_servers > 0 then begin
214 lprintf "FOR FILE %s" file.file_name; lprint_newline ();
215 end;
216 let keywords = get_name_keywords file.file_name in
217 let words = String2.unsplit keywords ' ' in
218 if !!verbose_servers > 0 then begin
219 lprintf "sending query for <%s>" words; lprint_newline ();
220 end;
221 let module Q = Query in
222 let t = QueryReq {
223 Q.min_speed = 0;
224 Q.keywords = words;
225 Q.xml_query = "" } in
226 let p = new_packet t in
227 server_send sock p
228 ) !current_files;
232 let redirector_to_client p sock =
233 (* lprintf "redirector_to_client"; lprint_newline (); *)
234 match p.pkt_payload with
235 PongReq t ->
236 let module P = Pong in
237 (* lprintf "ADDING PEER %s:%d" (Ip.to_string t.P.ip) t.P.port; *)
238 Fifo.put peers_queue (t.P.ip, t.P.port);
239 | _ -> ()
241 let redirector_parse_header sock header =
242 (* lprintf "redirector_parse_header"; lprint_newline ();*)
243 if String2.starts_with header gnutella_ok then begin
244 (* lprintf "GOOD HEADER FROM REDIRECTOR:waiting for pongs";*)
245 server_send_new sock (
246 let module P = Ping in
247 PingReq (P.ComplexPing {
248 P.ip = DO.client_ip (Some sock);
249 P.port = !!client_port;
250 P.nfiles = Int64.zero;
251 P.nkb = Int64.zero;
252 P.s = "none:128:false";
254 end else begin
255 if !!verbose_servers>10 then begin
256 lprintf "BAD HEADER FROM REDIRECTOR: "; lprint_newline ();
257 LittleEndian.dump header;
258 end;
259 close sock "bad header";
260 redirector_connected := false;
261 raise Not_found
264 let connect_to_redirector () =
265 match !redirectors_to_try with
266 [] ->
267 redirectors_to_try := !redirectors_ips
268 | ip :: tail ->
269 redirectors_to_try := tail;
270 (* lprintf "connect to redirector"; lprint_newline (); *)
272 let sock = connect "openft to redirector"
273 (Ip.to_inet_addr ip) 6346
274 (fun sock event ->
275 match event with
276 BASIC_EVENT RTIMEOUT ->
277 close sock "timeout";
278 redirector_connected := false;
279 (* lprintf "TIMEOUT FROM REDIRECTOR"; lprint_newline ()*)
280 | _ -> ()
281 ) in
282 TcpBufferedSocket.set_read_controler sock download_control;
283 TcpBufferedSocket.set_write_controler sock upload_control;
286 redirector_connected := true;
287 set_reader sock (handler !!verbose_servers redirector_parse_header
288 (gnutella_handler parse redirector_to_client)
290 set_closer sock (fun _ _ ->
291 (* lprintf "redirector disconnected"; lprint_newline (); *)
292 redirector_connected := false);
293 set_rtimeout sock 10.;
294 set_lifetime (TcpBufferedSocket.sock sock) 120.;
295 write_string sock "GNUTELLA CONNECT/0.4\n\n";
296 with e ->
297 lprintf "Exception in connect_to_redirector: %s"
298 (Printexc2.to_string e); lprint_newline ();
299 redirector_connected := false
301 let add_peers headers =
302 (try
303 let up = List.assoc "x-try-ultrapeers" headers in
304 List.iter (fun s ->
306 let len = String.length s in
307 (* lprintf "NEW ULTRAPEER %s" s; lprint_newline ();*)
308 let pos = String.index s ':' in
309 let ip = String.sub s 0 pos in
310 let port = String.sub s (pos+1) (len - pos - 1) in
311 let ip = Ip.of_string ip in
312 let port = int_of_string port in
313 (* lprintf "ADDING UP %s:%d" (Ip.to_string ip) port;
314 lprint_newline ();*)
315 Fifo.put ultrapeers_queue (ip,port ) ;
316 while Fifo.length ultrapeers_queue > !!max_known_ultrapeers do
317 ignore (Fifo.take ultrapeers_queue)
318 done
320 with _ -> ()
321 ) (String2.split up ',');
322 with e ->
323 lprintf "add_ulta_peers : %s" (Printexc2.to_string e);
324 lprint_newline () );
325 (try
326 let up = List.assoc "x-try" headers in
327 List.iter (fun s ->
329 let len = String.length s in
330 (* lprintf "NEW PEER %s" s; lprint_newline (); *)
331 let pos = String.index s ':' in
332 let ip = String.sub s 0 pos in
333 let port = String.sub s (pos+1) (len - pos - 1) in
334 let ip = Ip.of_string ip in
335 let port = int_of_string port in
336 (* lprintf "ADDING PEER %s:%d" (Ip.to_string ip) port;
337 lprint_newline ();*)
338 Fifo.put peers_queue (ip,port);
339 while Fifo.length peers_queue > !!max_known_peers do
340 ignore (Fifo.take peers_queue)
341 done
343 with _ -> ()
344 ) (String2.split up ',')
345 with _ -> ())
348 ascii: [ G N U T E L L A / 0 . 6 2 0 0 O K(13)(10) U s e r - A g e n t : G n u c l e u s 1 . 8 . 2 . 0(13)(10) R e m o t e - I P : 2 1 2 . 1 9 8 . 2 3 5 . 1 2 3(13)(10) X - Q u e r y - R o u t i n g : 0 . 1(13)(10) X - U l t r a p e e r : T r u e(13)(10) X - L e a f - M a x : 4 0 0(13)(10) U p t i m e : 0 D 0 3 H 3 0 M(13)(10)(13)]
353 let update_user t =
354 let module Q = QueryReply in
355 let user = new_user t.Q.guid (match t.Q.dont_connect with
356 Some true -> Indirect_location ("", t.Q.guid, _, _)
357 | _ -> Known_location(t.Q.ip, t.Q.port))
359 user.user_speed <- t.Q.speed;
360 user
362 let update_client t =
363 let module Q = QueryReply in
364 let c = new_client t.Q.guid (match t.Q.dont_connect with
365 Some true -> Indirect_location ("", t.Q.guid, _, _)
366 | _ -> Known_location(t.Q.ip, t.Q.port))
369 c.client_user.user_speed <- t.Q.speed;
372 let server_parse_header s sock header =
373 if !!verbose_servers> 10 then LittleEndian.dump_ascii header;
375 if String2.starts_with header gnutella_200_ok then begin
376 (* lprintf "GOOD HEADER FROM ULTRAPEER";
377 lprint_newline (); *)
378 set_rtimeout sock Date.half_day_in_secs;
379 (* lprintf "SPLIT HEADER..."; lprint_newline ();*)
380 let lines = Http_client.split_header header in
381 match lines with
382 [] -> raise Not_found
383 | _ :: headers ->
384 (* lprintf "CUT HEADER"; lprint_newline ();*)
385 let headers = Http_client.cut_headers headers in
386 let agent = List.assoc "user-agent" headers in
387 (* lprintf "USER AGENT: %s" agent; lprint_newline ();*)
388 if String2.starts_with agent "LimeWire" ||
389 String2.starts_with agent "Gnucleus" ||
390 String2.starts_with agent "BearShare"
391 then
392 begin
393 s.server_agent <- agent;
394 (* lprintf "LIMEWIRE Detected"; lprint_newline ();*)
395 add_peers headers;
396 if List.assoc "x-ultrapeer" headers <> "True" then begin
397 (* lprintf "NOT AN ULTRAPEER ???"; lprint_newline (); *)
398 raise Not_found;
399 end;
401 (* lprintf "******** ULTRA PEER %s:%d *******"
402 (Ip.to_string s.server_ip) s.server_port;
403 lprint_newline (); *)
404 write_string sock "GNUTELLA/0.6 200 OK\r\n\r\n";
405 set_server_state s Connected_idle;
406 connected_servers := s :: !connected_servers;
407 recover_files_from_server sock
409 else raise Not_found
410 end else
411 if String2.starts_with header gnutella_503_shielded then begin
412 (* lprintf "GOOD HEADER FROM SIMPLE PEER";
413 lprint_newline ();*)
414 let lines = Http_client.split_header header in
415 match lines with
416 [] -> raise Not_found
417 | _ :: headers ->
418 let headers = Http_client.cut_headers headers in
419 let agent = List.assoc "user-agent" headers in
420 if String2.starts_with agent "LimeWire" ||
421 String2.starts_with agent "Gnucleus" ||
422 String2.starts_with agent "BearShare"
423 then
424 begin
425 (* lprintf "LIMEWIRE Detected"; lprint_newline ();*)
426 add_peers headers;
427 raise Not_found
429 else raise Not_found
430 end else begin
431 (* lprintf "BAD HEADER FROM SERVER: [%s]" header; lprint_newline (); *)
432 raise Not_found
434 with
435 | Not_found ->
436 (* lprintf "DISCONNECTION"; lprint_newline (); *)
437 disconnect_from_server s
438 | e ->
440 lprintf "DISCONNECT WITH EXCEPTION %s" (Printexc2.to_string e);
441 lprint_newline ();
443 disconnect_from_server s
447 let get_file_from_source c file =
448 if connection_can_try c.client_connection_control then begin
449 connection_try c.client_connection_control;
450 let u = c.client_user in
451 let s = u.user_server in
452 lprintf "******* DOWNLOAD FROM %s %d %d *******"
453 (Ip.to_string s.server_ip) s.server_port s.server_http_port;
454 lprint_newline ();
455 if s.server_http_port <> 0 then
457 match c.client_user.user_kind with
458 Indirect_location ("", uid) ->
459 lprintf "++++++ ASKING FOR PUSH +++++++++"; lprint_newline ();
461 (* do as if connection failed. If it connects, connection will be set to OK *)
462 connection_failed c.client_connection_control;
463 let module P = Push in
464 let t = PushReq {
465 P.guid = uid;
466 P.ip = DO.client_ip None;
467 P.port = !!client_port;
468 P.index = List.assq file c.client_downloads;
469 } in
470 let p = new_packet t in
471 List.iter (fun s ->
472 match s.server_sock with
473 None -> ()
474 | Some sock -> server_send sock p
475 ) !connected_servers
476 | _ ->
478 OpenFTClients.connect_client c
481 let download_file (r : result) =
482 let file = new_file r.result_md5 r.result_name r.result_size in
483 lprintf "DOWNLOAD FILE %s" file.file_name; lprint_newline ();
484 if not (List.memq file !current_files) then begin
485 current_files := file :: !current_files;
486 end;
487 List.iter (fun (user, index) ->
488 let s = user.user_server in
489 let c = new_client s.server_ip s.server_port s.server_http_port in
490 add_download file c index;
491 get_file_from_source c file;
492 ) r.result_sources;
495 (* these two functions are also in dcGlobals.ml *)
496 let exit_exn = Exit
497 let basename filename =
498 let s =
499 let len = String.length filename in
501 let pos = String.rindex_from filename (len-1) '\\' in
502 String.sub filename (pos+1) (len-pos-1)
503 with _ ->
505 if len > 2 then
506 let c1 = Char.lowercase filename.[0] in
507 let c2 = filename.[1] in
508 match c1,c2 with
509 'a'..'z', ':' ->
510 String.sub filename 2 (len -2 )
511 | _ -> raise exit_exn
512 else raise exit_exn
513 with _ -> Filename.basename filename
515 String.lowercase s
518 let server_to_client s t sock =
520 if !!verbose_servers> 200 then begin
521 lprintf "From server:"; lprint_newline ();
522 print t;
523 end;
524 match t with
525 VersionReq ->
526 set_rtimeout sock 60.;
527 server_send sock (
528 let module V = VersionReply in
529 VersionReplyReq {
530 V.major_num = 0;
531 V.minor_num = 0;
532 V.micro_num = 5;
535 | VersionReplyReq t ->
536 let module V = VersionReply in
537 s.server_version <- Printf.sprintf "%d.%d.%d"
538 t.V.major_num t.V.minor_num t.V.micro_num;
539 set_server_state s Connected_idle;
540 connection_ok s.server_connection_control;
543 | NodeInfoReq ->
544 server_send sock (let module N = NodeInfoReply in
545 NodeInfoReplyReq { N.ip = client_ip (Some sock);
546 N.port = !!port;
547 N.http_port = !!http_port;
550 | NodeInfoReplyReq t ->
551 let module N = NodeInfoReply in
552 s.server_http_port <- t.N.http_port;
553 assert (s.server_port = t.N.port)
554 (* we should already have this information, no ? *)
556 | ClassReq ->
557 server_send sock (ClassReplyReq User_node)
559 | ClassReplyReq t ->
560 s.server_type <- t;
561 begin
562 match s.server_type with
563 Search_node ->
564 server_send sock (ChildReq None);
565 server_send sock (StatsReq Stats.Retrieve_info);
566 | _ ->
567 (* don't stay connected more than one minute to a user node *)
568 set_lifetime sock 60.
571 | NodeListReq ->
572 List.iter (fun s ->
573 server_send sock (let module N = NodeListReply in
574 (NodeListReplyReq (Some {
575 N.ip = s.server_ip;
576 N.port = s.server_port;
577 N.node_type = s.server_type;
578 })))
579 ) !connected_servers;
580 server_send sock (NodeListReplyReq None)
582 | NodeListReplyReq None -> ()
583 | NodeListReplyReq (Some t) ->
584 begin
585 let module N = NodeListReply in
586 if t.N.port <> 0 then
587 let s = new_server t.N.ip t.N.port in
588 match s.server_type with
589 User_node -> Fifo.put peers_queue (t.N.ip, t.N.port)
590 | _ -> s.server_type <- t.N.node_type
593 | NodeCapReq ->
594 server_send sock (NodeCapReplyReq ["MD5-FULL"]) (* not "ZLIB" yet *)
596 | NodeCapReplyReq t ->
597 s.server_caps <- t
599 | ChildReplyReq t ->
600 if t then begin
602 lprintf "************ CONNECTED AND CHILD *************";
603 lprint_newline ();
604 set_rtimeout sock 3600.;
605 server_send sock (ChildReq (Some true));
606 connected_servers := s :: !connected_servers;
607 recover_files_from_server sock
609 end else
610 disconnect_from_server s
612 | PingReq -> server_send sock PingReplyReq
615 | SearchReplyReq t ->
616 lprintf "REPLY TO QUERY"; lprint_newline ();
617 let module Q = SearchReply in
618 begin
620 let ss = Hashtbl.find searches_by_uid t.Q.id in
622 let ip = if t.Q.ip = Ip.null then s.server_ip else t.Q.ip in
623 let user = new_user ip t.Q.port t.Q.http_port in
625 lprintf "NEW RESULT %s" t.Q.filename; lprint_newline ();
626 let result = new_result t.Q.md5 (basename t.Q.filename) t.Q.size in
627 lprintf "ADDING SOURCE FOR RESULT NOT IMPLEMENTED";
628 lprint_newline ();
629 add_source result user t.Q.filename;
631 CommonInteractive.search_add_result ss.search_search result.result_result;
632 with Not_found ->
633 lprintf "NO SUCH SEARCH !!!!"; lprint_newline ();
636 let file = Hashtbl.find files_by_md5 t.Q.md5 in
637 let ip = if t.Q.ip = Ip.null then s.server_ip else t.Q.ip in
638 let user = new_user ip t.Q.port t.Q.http_port in
640 let result = new_result t.Q.md5 (basename t.Q.filename) t.Q.size in
641 lprintf "ADDING SOURCE FOR RESULT NOT IMPLEMENTED";
642 lprint_newline ();
643 add_source result user t.Q.filename;
645 let s = user.user_server in
646 let c = new_client s.server_ip s.server_port s.server_http_port in
647 add_download file c t.Q.filename;
648 get_file_from_source c file;
650 with _ ->
651 lprintf "NO SUCH SEARCH for no file !!!!";
652 lprint_newline ();
656 | _ ->
657 lprintf "UNUSED MESSAGE";
658 print t
662 lprintf "server_to_client"; lprint_newline ();
663 print p;
665 match p.pkt_payload with
666 | PingReq t ->
667 if p.pkt_hops <= 3 then
668 server_send sock {
669 p with
670 pkt_hops = p.pkt_hops + 1;
671 pkt_type = PONG;
672 pkt_payload = (
673 let module P = Pong in
674 PongReq {
675 P.ip = (DO.client_ip (Some sock));
676 P.port = !!client_port;
677 P.nfiles = 10;
678 P.nkb = 10;
681 | PongReq t ->
683 let module P = Pong in
684 (* lprintf "FROM %s:%d" (Ip.to_string t.P.ip) t.P.port; *)
685 if p.pkt_uid = s.server_ping_last then begin
686 s.server_nfiles_last <- s.server_nfiles_last + t.P.nfiles;
687 s.server_nkb_last <- s.server_nkb_last + t.P.nkb
690 | QueryReq _ ->
691 (* lprintf "REPLY TO QUERY NOT IMPLEMENTED YET :("; lprint_newline ();*)
694 | QueryReplyReq t ->
695 (* lprintf "REPLY TO QUERY"; lprint_newline ();*)
696 let module Q = QueryReply in
697 begin
699 let s = Hashtbl.find searches_by_uid p.pkt_uid in
701 let user = update_user t in
703 (* lprintf "ADDING RESULTS"; lprint_newline ();*)
704 List.iter (fun f ->
705 (* lprintf "NEW RESULT %s" f.Q.name; lprint_newline ();*)
706 let result = new_result f.Q.name f.Q.size in
707 add_source result user f.Q.index;
709 search_add_result s.search_search result.result_result;
710 ) t.Q.files
711 with Not_found ->
712 lprintf "NO SUCH SEARCH !!!!"; lprint_newline ();
713 List.iter (fun ff ->
714 List.iter (fun file ->
715 if file.file_name = ff.Q.name &&
716 file_size file = ff.Q.size then
717 begin
718 lprintf "++++++++++++++ RECOVER FILE %s +++++++++++++" file.file_name; lprint_newline ();
719 let c = update_client t in
720 add_download file c ff.Q.index;
722 ) !current_files;
723 ) t.Q.files
725 | _ -> ()
727 let send_pings () =
728 let pl =
729 let module P = Ping in
730 PingReq P.SimplePing
732 List.iter (fun s ->
733 match s.server_sock with
734 None -> ()
735 | Some sock ->
736 let p = { (new_packet pl) with pkt_ttl = 1; } in
737 s.server_nfiles <- s.server_nfiles_last;
738 s.server_nkb <- s.server_nkb_last;
739 s.server_ping_last <- p.pkt_uid;
740 s.server_nfiles_last <- 0;
741 s.server_nkb_last <- 0;
742 server_send sock p
743 ) !connected_servers
746 let connect_server (ip,port) =
747 if !!verbose_servers > 5 then begin
748 lprintf "SHOULD CONNECT TO %s:%d" (Ip.to_string ip) port;
749 lprint_newline ();
750 end;
751 let s = new_server ip port in
752 match s.server_sock with
753 Some _ -> ()
754 | None ->
756 let sock = connect "openft to server"
757 (Ip.to_inet_addr ip) port
758 (fun sock event ->
759 match event with
760 BASIC_EVENT (RTIMEOUT|LTIMEOUT) ->
761 (* lprintf "RTIMEOUT"; lprint_newline (); *)
762 disconnect_from_server s
763 | _ -> ()
764 ) in
765 TcpBufferedSocket.set_read_controler sock download_control;
766 TcpBufferedSocket.set_write_controler sock upload_control;
768 connection_try s.server_connection_control;
769 set_server_state s Connecting;
770 s.server_sock <- Some sock;
771 incr nservers;
772 set_reader sock (cut_messages OpenFTProtocol.parse
773 (server_to_client s)
775 set_closer sock (fun _ error ->
776 (* lprintf "CLOSER %s" error; lprint_newline ();*)
777 disconnect_from_server s);
778 set_rtimeout sock !!server_connection_timeout;
779 server_send sock VersionReq;
780 server_send sock ClassReq;
781 server_send sock NodeInfoReq;
782 server_send sock NodeListReq;
783 server_send sock NodeCapReq
784 with _ ->
785 disconnect_from_server s
788 let try_connect_ultrapeer () =
789 (* lprintf "try_connect_ultrapeer"; lprint_newline (); *)
790 let s = try
791 Fifo.take ultrapeers_queue
792 with _ ->
793 try
794 Fifo.take peers_queue
795 with _ ->
796 Hashtbl.iter (fun key s ->
797 match s.server_type with
798 User_node -> ()
799 | _ -> Fifo.put ultrapeers_queue key
800 ) servers_by_key;
801 raise Not_found
803 connect_server s;
806 let connect_servers () =
807 if !nservers < !!max_ultrapeers then begin
808 for i = !nservers to !!max_ultrapeers - 1 do
809 try_connect_ultrapeer ()
810 done
815 let ask_for_files () =
816 List.iter (fun file ->
817 List.iter (fun c ->
819 get_file_from_source c file
820 ) file.file_clients
821 ) !current_files;
828 let _ =
829 server_ops.op_server_connect <- (fun s ->
830 connect_server (s.server_ip, s.server_port));
831 server_ops.op_server_disconnect <- disconnect_from_server;
833 (* server_ops.op_server_query_users <- (fun s -> *)
834 match s.server_sock with
835 None -> ()
836 | Some sock ->
837 server_send sock (GetNickListReq)
839 (* server_ops.op_server_users <- (fun s -> *)
840 List2.tail_map (fun u -> as_user u.user_user) s.server_users
843 server_ops.op_server_remove <- (fun s ->
844 disconnect_from_server s;
845 Hashtbl.remove servers_by_key (s.server_ip, s.server_port);
846 server_remove (as_server s.server_server);
848 server_ops.op_server_sort <- (fun s ->
849 connection_last_conn s.server_connection_control