patch #7318
[mldonkey.git] / src / networks / fasttrack / fasttrackServers.ml
blob8ac85d25355fd485ebdd3778e117b817fa505182
1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 open Int64ops
21 open Options
22 open Queues
23 open Printf2
24 open Md4
25 open BasicSocket
26 open TcpBufferedSocket
28 open AnyEndian
30 open CommonHosts
31 open CommonOptions
32 open CommonSearch
33 open CommonServer
34 open CommonComplexOptions
35 open CommonFile
36 open CommonDownloads
37 open CommonTypes
38 open CommonGlobals
40 open FasttrackNetwork
41 open FasttrackTypes
42 open FasttrackGlobals
43 open FasttrackOptions
44 open FasttrackProtocol
45 open FasttrackComplexOptions
46 open FasttrackProto
(* [load_nodes_file filename] reads [filename] line by line and registers
   every line of the form "<a.b.c.d> <port> <anything>" as an Ultrapeer host
   through [H.new_host]. Lines that do not match the pattern are skipped
   silently; any exception raised while handling a matching line is reported
   as a syntax error and reading continues with the next line. *)
let load_nodes_file filename =
  let node_line =
    Str.regexp "^\\([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+\\) \\([0-9]+\\) .*$" in
  (* Handle a single line; never lets an exception escape. *)
  let register_line line =
    try
      if Str.string_match node_line line 0 then begin
        let ip = Ip.addr_of_string (Str.matched_group 1 line) in
        let port = int_of_string (Str.matched_group 2 line) in
        (* [Not_found] from [H.new_host] is ignored on purpose. *)
        try ignore (H.new_host ip port Ultrapeer) with Not_found -> ()
      end
    with _ ->
      lprintf_nl "Syntax error in %s" filename
  in
  Unix2.tryopen_read filename (fun cin ->
    (* Read until [End_of_file]; other I/O exceptions propagate. *)
    let rec read_all () =
      match (try Some (input_line cin) with End_of_file -> None) with
      | None -> ()
      | Some line ->
          register_line line;
          read_all ()
    in
    read_all ())
(* [unpack_nodes_gzip filename url] returns the name of the file to load.
   When the (lowercased) extension of [filename] is ".gzip", the archive is
   extracted with [Misc.archive_extract filename "gz"] and that call's result
   is returned; otherwise [filename] itself is returned.
   [url] is only used in the error message.
   Raises [Not_found] when extraction fails (the cause is logged first). *)
let unpack_nodes_gzip filename url =
  let lowered extension_of = String.lowercase (extension_of filename) in
  let ext = lowered Filename2.extension in
  let last_ext = lowered Filename2.last_extension in
  (* A trailing ".zip" takes precedence over the full extension. *)
  let effective_ext = if last_ext = ".zip" then last_ext else ext in
  if effective_ext = ".gzip" then begin
    try
      Misc.archive_extract filename "gz"
    with e ->
      lprintf_nl "Exception %s while extracting from %s"
        (Printexc2.to_string e) url;
      raise Not_found
  end else
    filename
(* Registers the "nodes.gzip" web kind: a downloaded file is unpacked if
   needed and its nodes are loaded. When unpacking produced a separate file,
   that file is removed afterwards, including when loading raises (the
   exception is then re-raised unchanged). *)
let _ =
  CommonWeb.add_web_kind "nodes.gzip" "List of fasttrack nodes"
    (fun url filename ->
      lprintf_nl "nodes.gzip loaded from %s" url;
      let f = unpack_nodes_gzip filename url in
      (* Only delete [f] when it is not the downloaded file itself. *)
      let remove_extracted () = if f <> filename then Sys.remove f in
      begin
        try load_nodes_file f
        with e ->
          (* Best-effort cleanup; the original error is the one reported. *)
          (try remove_extracted () with _ -> ());
          raise e
      end;
      remove_extracted ())
(* [server_parse_after s gconn sock] consumes every complete packet that is
   currently buffered on [sock]: each one is cut out of the buffer, parsed
   with the server's ciphers and handed to
   [FasttrackHandler.server_msg_handler]. It stops when the buffer is empty,
   when the packet size cannot be determined yet, or when the next packet is
   not fully received. Any exception is logged and closes the socket with
   "Reply not understood". [gconn] is not used. *)
let server_parse_after s gconn sock =
  try
    let ciphers =
      match s.server_ciphers with
      | None -> assert false
      | Some ciphers -> ciphers
    in
    let b = buf sock in
(*    if !verbose_msg_raw then
      lprintf "server_parse_after: %d bytes\n" b.len; *)
    let rec drain () =
      let available = b.len in
      if available > 0 then
        match TcpMessages.packet_size ciphers b.buf b.pos b.len with
        | Some size when available >= size ->
            let msg = String.sub b.buf b.pos size in
            buf_used b size;
            let addr, t = TcpMessages.parse ciphers msg in
            FasttrackHandler.server_msg_handler sock s addr t;
            drain ()
        | Some _ | None -> ()
    in
    drain ()
  with e ->
    lprintf "Exception %s in server_parse_after\n"
      (Printexc2.to_string e);
    close sock (Closed_for_error "Reply not understood")
(* Optional replacement for the default greeting sent by [greet_supernode];
   [None] selects the default. *)
let server_connection_hook : (server -> unit) option ref = ref None
(* [greet_supernode s] greets server [s]: if [server_connection_hook] is set,
   the hook is called with [s]; otherwise a direct [NodeInfoReq] packet is
   sent carrying our IP, port, default bandwidth and client name. *)
let greet_supernode s =
  match !server_connection_hook with
  | Some hook -> hook s
  | None ->
      server_send s TcpMessages.DirectPacket
        (TcpMessages.NodeInfoReq
           (client_ip s.server_sock,
            !!client_port,
            default_bandwidth,
            client_name ()))
(*  ; server_send_ping s *)
(* [server_parse_netname s gconn sock] looks for a NUL-terminated network
   name in the bytes buffered on [sock]. If no NUL byte is present yet,
   nothing is consumed. Otherwise the name and its terminator are consumed,
   the connection handler is switched to a [CipherReader] running
   [server_parse_after], and the server is greeted. *)
let server_parse_netname s gconn sock =
  let b = TcpBufferedSocket.buf sock in
  let len = b.len in
  let first = b.pos in
  let limit = first + len in
  let data = b.buf in
  let net = String.sub data first len in
  if !verbose_msg_raw then
    lprintf "net:[%s]\n" (String.escaped net);
  (* Position of the first NUL byte in [first, limit), if any. *)
  let rec find_nul pos =
    if pos >= limit then None
    else if data.[pos] = '\000' then Some pos
    else find_nul (pos + 1)
  in
  match find_nul first with
  | None -> ()
  | Some nul ->
      let netname = String.sub data first (nul - first) in
      if !verbose_msg_raw then
        lprintf "netname: [%s]\n" (String.escaped netname);
      (* Drop the name together with its terminating NUL. *)
      buf_used b (nul - first + 1);
      (match s.server_ciphers with
       | None -> assert false
       | Some ciphers ->
           gconn.gconn_handler <-
             CipherReader (ciphers.in_cipher, server_parse_after s);
           greet_supernode s)
(* [server_parse_cipher s gconn sock] marks the host as connected and, once
   at least 8 bytes are buffered, reads the server's cipher from them,
   initialises the incoming cipher, derives the outgoing cipher by xor-ing it
   with the incoming one, sends our network name (NUL-terminated) encrypted,
   and switches the handler to wait for the server's network name. *)
let server_parse_cipher s gconn sock =
  H.connected s.server_host;
  let b = TcpBufferedSocket.buf sock in
  if b.len < 8 then ()
  else begin
    let ciphers =
      match s.server_ciphers with
      | None -> assert false
      | Some ciphers -> ciphers
    in
    if !verbose_msg_raw then
      lprintf "Cipher received from server\n";
    get_cipher_from_packet b.buf b.pos ciphers.in_cipher;
    init_cipher ciphers.in_cipher;

    xor_ciphers ciphers.out_cipher ciphers.in_cipher;
    init_cipher ciphers.out_cipher;

    (* The 8 bytes holding the cipher are now consumed. *)
    buf_used b 8;
    server_crypt_and_send s ciphers.out_cipher (network_name ^ "\000");
    gconn.gconn_handler <-
      CipherReader (ciphers.in_cipher, server_parse_netname s);
    if !verbose_msg_raw then
      lprintf "waiting for netname\n"
  end
(* Seed given to [set_cipher] for the outgoing cipher. It is a fixed
   constant; the random alternative is kept below for reference. *)
let client_cipher_seed () : int32 =
(*  Int32.of_int (Random.int max_int) *)
  0x0fACB1238l
(* Optional function that fills in the connection header buffer built by
   [connect_server]; [None] selects the default header bytes. *)
let connection_header_hook = ref (None : (string -> unit) option)
(* [connect_server h] starts a connection to host [h] unless its server
   record already has a connection or a pending one.

   The server record is created on first use and stored in [h.host_server].
   A pending connection is registered with [connection_manager]; when it is
   granted, the callback opens the TCP socket, installs the handlers,
   creates the ciphers and writes the 12-byte connection header. Any
   exception raised in the callback disconnects the server with
   [Closed_connect_failed].

   [nservers] is incremented when the pending connection is queued,
   decremented when the callback starts and incremented again once the
   socket exists. *)
let connect_server h =
  let s =
    match h.host_server with
    | Some existing -> existing
    | None ->
        let created = new_server h.host_addr h.host_port in
        h.host_server <- Some created;
        created
  in
  match s.server_sock with
  | NoConnection ->
      incr nservers;
      (* Runs once the connection manager grants [token]. *)
      let start_connection token =
        decr nservers;
        try
          let ip = Ip.ip_of_addr h.host_addr in
          if not (Ip.valid ip) then
            failwith "Invalid IP for server\n";
          let port = s.server_host.host_port in
          if !verbose_msg_servers then
            lprintf "CONNECT TO %s:%d\n"
              (Ip.string_of_addr h.host_addr) port;
          H.set_request h Tcp_Connect;
          H.try_connect h;
(* Standard Kazaa clients send a ping first, and only connect if they
  receive a Supernode Pong. We send the ping only to get the latency. *)
          udp_send ip port true (
            let module M = UdpMessages in
            M.PingReq (169, "\128", "KaZaA"));

          let inet_ip = Ip.to_inet_addr ip in
          let sock =
            connect token "fasttrack to server" inet_ip port
              (fun sock event ->
                match event with
                | BASIC_EVENT (RTIMEOUT | LTIMEOUT) ->
(*                  lprintf "RTIMEOUT\n"; *)
                    disconnect_from_server nservers s Closed_for_timeout
                | _ -> ())
          in
          TcpBufferedSocket.set_read_controler sock download_control;
          TcpBufferedSocket.set_write_controler sock upload_control;

          set_server_state s Connecting;
          s.server_sock <- Connection sock;
          incr nservers;
          set_fasttrack_sock sock !verbose_msg_servers
            (Reader (server_parse_cipher s));
          set_closer sock (fun _ error ->
(*            lprintf "CLOSER %s\n" error; *)
            disconnect_from_server nservers s error);
          set_rtimeout sock !!server_connection_timeout;

          let in_cipher = create_cipher () in
          let out_cipher = create_cipher () in
          s.server_ciphers <- Some {
            in_cipher = in_cipher;
            out_cipher = out_cipher;
            in_xinu = 0x51L;
            out_xinu = 0x51L;
          };
          set_cipher out_cipher (client_cipher_seed ()) 0x29;

          (* First 4 bytes: header (default or hook-provided); the cipher
             packet is then written at offset 4. *)
          let header = String.create 12 in
          (match !connection_header_hook with
           | Some fill -> fill header
           | None ->
               header.[0] <- '\250';
               header.[1] <- '\000';
               header.[2] <- '\182';
               header.[3] <- '\043');

          cipher_packet_set out_cipher header 4;

          if !verbose_msg_raw then begin
            lprintf "SENDING %s\n" (String.escaped header);
            AnyEndian.dump header;
          end;
          write_string sock header
        with _ ->
          disconnect_from_server nservers s Closed_connect_failed
      in
      let token = add_pending_connection connection_manager start_connection in
      s.server_sock <- ConnectionWaiting token
  | _ -> ()
(* [get_file_from_source c file] is called for client [c] and download [file].
   It only acts when [connection_can_try] accepts the client's connection
   control, and then records the attempt with [connection_try].
   - For an [Indirect_location] whose first field is the empty string, the
     visible lines mark the connection as failed and refer to a push request
     sent through every connected server.
   - For any other kind of user, [(1, c)] is queued on
     [file.file_clients_queue] unless [file] is already listed in
     [c.client_in_queues].
   The handler at the end logs any exception instead of propagating it.
   NOTE(review): this listing has lost its short lines (the [try], the
   closing [end]s, and possibly comment delimiters around the push code);
   confirm against the repository which push statements are actually live. *)
276 let get_file_from_source c file =
278 if connection_can_try c.client_connection_control then begin
279 connection_try c.client_connection_control;
280 match c.client_user.user_kind with
281 Indirect_location ("", uid, _, _) ->
283 lprintf "++++++ ASKING FOR PUSH +++++++++\n";
285 (* do as if connection failed. If it connects, connection will be set to OK *)
286 connection_failed c.client_connection_control;
288 let uri = (find_download file c.client_downloads).download_uri in
289 List.iter (fun s ->
290 FasttrackProto.server_send_push s uid uri
291 ) !connected_servers;
293 lprintf "PUSH NOT IMPLEMENTED\n"
294 | _ ->
295 if not (List.memq file c.client_in_queues) then begin
296 Queue.put file.file_clients_queue (1,c);
297 c.client_in_queues <- file :: c.client_in_queues
300 with e ->
301 lprintf "get_file_from_source: exception %s\n" (Printexc2.to_string e)
(* The [Exit] exception bound as a value; from here on the name [exit]
   refers to this exception value. *)
let exit : exn = Exit
(* [disconnect_server s r] tears down the connection of server [s]:
   an established socket is closed with reason [r]; a connection still
   waiting for its token has the token cancelled, the socket state reset to
   [NoConnection] and the ciphers freed. Any other state is left untouched. *)
let disconnect_server s r =
  match s.server_sock with
  | ConnectionWaiting token ->
      begin
        cancel_token token;
        s.server_sock <- NoConnection;
        free_ciphers s
      end
  | Connection sock -> close sock r
  | _ -> ()
(* [really_recover_file file] makes sure every search attached to [file] is
   queued on every connected server, skipping searches that the server's
   queue already contains. *)
let really_recover_file file =
  let enqueue_missing s =
    List.iter
      (fun ss ->
        if not (Fifo.mem s.server_searches ss) then
          Fifo.put s.server_searches ss)
      file.file_searches
  in
  List.iter enqueue_missing !connected_servers
(* [really_download_file r user group] starts downloading search result [r].

   The first uid of [r] that is an [Md5Ext] hash identifies the file; if
   there is none, [IgnoreNetwork] is raised. The file is created with the
   first name of the result, added to [current_files] if absent, and every
   known source of the result is attached to it and asked for the file.
   Returns the file.

   As written, [List.hd] and [Hashtbl.find] are not guarded, so a result
   with no name or with no entry in [result_sources] raises. *)
let really_download_file (r : CommonTypes.result_info) user group =
  let rec first_md5 = function
    | [] -> raise IgnoreNetwork
    | uid :: rest ->
        (match Uid.to_uid uid with
         | Md5Ext hash -> (hash, Uid.to_file_string uid)
         | _ -> first_md5 rest)
  in
  let hash, file_temp = first_md5 r.result_uids in

  let file =
    new_file file_temp (List.hd r.result_names)
      r.result_size [Uid.create (Md5Ext hash)] user group
  in
  if !verbose then
    lprintf "DOWNLOAD FILE %s\n" file.file_name;
  if not (List.memq file !current_files) then
    current_files := file :: !current_files;
  let sources = Hashtbl.find result_sources r.result_num in
  (* Attach one source to the download and try to fetch from it. *)
  let attach_source (source_user, _) =
    let c = new_client source_user.user_kind in
    add_download file c ();
    get_file_from_source c file
  in
  List.iter attach_source !sources;
  file
(* [ask_for_files ()] (called every minute) first asks every client of every
   current file for that file, then takes one pending search from each
   connected server and sends it:
   - a file-location query, only while the file is still downloading;
   - a user search, with the realm name translated to its numeric code.
   Any exception for a server (including an empty search queue) is ignored
   and the next server is processed. *)
let ask_for_files () = (* called every minute *)
  let ask_clients file =
    List.iter (fun c -> get_file_from_source c file) file.file_clients
  in
  List.iter ask_clients !current_files;
  let module M = TcpMessages in
  (* Numeric realm code used in file queries; unknown realms map to 0x3f. *)
  let realm_code = function
    | "audio" -> 0x21
    | "video" -> 0x22
    | "image" -> 0x23
    | "text" -> 0x24
    | "application" -> 0x25
    | _ -> 0x3f
  in
  let send_next_search s =
    try
      let ss = Fifo.take s.server_searches in
      let query =
        match ss.search_search with
        | FileUidSearch (file, file_hash) ->
            if file_state file = FileDownloading
            then Some (M.QueryLocationReq file_hash)
            else None
        | UserSearch (_, words, (realm, tags)) ->
            Some (M.QueryFilesReq (words, realm_code realm, tags))
      in
      match query with
      | None -> ()
      | Some q ->
          server_send s M.DirectPacket
            (M.SearchReq (32, ss.search_uid, q))
    with _ -> ()
  in
  List.iter send_next_search !connected_servers
(* Install the server operations: both "disconnect" and "remove" drop the
   connection with reason [Closed_by_user]. *)
let () =
  let disconnect_by_user s = disconnect_server s Closed_by_user in
  server_ops.op_server_disconnect <- disconnect_by_user;
  server_ops.op_server_remove <- disconnect_by_user
(* [nranges file] is the size of [file] divided by [min_range_size]
   (64-bit division), converted to [int], plus 5. *)
let nranges file =
  let whole_ranges = file_size file // min_range_size in
  Int64.to_int whole_ranges + 5
(* [manage_hosts ()] runs the generic host management, then, for every file
   that is currently downloading, takes clients out of the file's queue and
   connects to them until the file has [nranges file] connected clients.
   Any exception while doing so for a file (for instance when its queue runs
   empty) stops the work on that file only. *)
let manage_hosts () =
  H.manage_hosts ();
  let connect_queued_clients file =
    if file_state file = FileDownloading then
      try
(* For each file, we allow only [nranges file] simultaneous communications
  ([nranges] already includes its +5 margin), to prevent too many clients
  from saturating the line for only one file. *)
        let max_nconnected_clients = nranges file in
        let rec fill () =
          if file.file_nconnected_clients < max_nconnected_clients then begin
            let (_, c) = Queue.take file.file_clients_queue in
            c.client_in_queues <- List2.removeq file c.client_in_queues;
            FasttrackClients.connect_client c;
            fill ()
          end
        in
        fill ()
      with _ -> ()
  in
  List.iter connect_queued_clients !current_files
(* [find_ultrapeer queue] returns the host at the head of [queue] and removes
   it from the queue.

   Raises [Not_found] when the head entry is not due yet (its time is later
   than [last_time ()]). The readiness check is done outside the [try] so
   that this [Not_found] reaches the caller instead of being swallowed by the
   retry handler, which would otherwise re-examine the same head entry.
   Whatever [Queue.head] raises (e.g. on an empty queue) also propagates.
   If taking the host out of the queue fails, the search is retried. *)
let rec find_ultrapeer queue =
  let (next, h) = Queue.head queue in
  if next > last_time () then begin
(*        lprintf "not ready: %d s\n" (next - last_time ()); *)
    raise Not_found
  end;
  try
    ignore (H.host_queue_take queue);
    h
  with _ -> find_ultrapeer queue
(* [try_connect_ultrapeer connect] picks the next host from
   [ultrapeers_waiting_queue] and applies [connect] to it.
   Raises [Not_found] when no host can be obtained, whatever the underlying
   exception was. Exceptions raised by [connect] itself are not altered. *)
let try_connect_ultrapeer connect =
(*  lprintf "try_connect_ultrapeer....\n"; *)
  let next_host () =
    try find_ultrapeer ultrapeers_waiting_queue
    with _ ->
(*        lprintf "not in ultrapeers_waiting_queue\n"; *)
      raise Not_found
  in
  let h = next_host () in
(*  lprintf "contacting..\n"; *)
  connect h
(* [connect_servers connect] tries to open new server connections while
   fewer than [max_ultrapeers] servers are connected. It makes up to
   3 * (max_ultrapeers - nservers) attempts and stops silently at the first
   exception (for example when no more hosts are available). *)
let connect_servers connect =
(*  lprintf "connect_servers %d %d\n" !nservers !!max_ultrapeers; *)
  let connected = List.length !connected_servers in
  if !!max_ultrapeers > connected then begin
    try
      let attempts = 3 * (!!max_ultrapeers - !nservers) in
      for _attempt = 1 to attempts do
(*        lprintf "try_connect_ultrapeer...\n"; *)
        try_connect_ultrapeer connect
      done
    with _ -> ()
  end
(* Looks like there is no ping to send in Fasttrack, so this does nothing. *)
let send_pings : unit -> unit = fun () -> ()
(* [send_query ss] sends search [ss] to every connected server as a direct
   [SearchReq] packet: a file query (with the realm name translated to its
   numeric code) for a user search, a location query for a file-uid search. *)
let send_query ss =
  let module M = TcpMessages in
  (* Numeric realm code used in file queries; unknown realms map to 0x3f. *)
  let realm_code = function
    | "audio" -> 0x21
    | "video" -> 0x22
    | "image" -> 0x23
    | "text" -> 0x24
    | "application" -> 0x25
    | _ -> 0x3f
  in
  let s_uid = ss.search_uid in
  let t =
    match ss.search_search with
    | FileUidSearch (_, file_hash) ->
        M.QueryLocationReq file_hash
    | UserSearch (_, words, (realm, tags)) ->
        M.QueryFilesReq (words, realm_code realm, tags)
  in
  let send_to s =
    FasttrackProto.server_send s M.DirectPacket (M.SearchReq (32, s_uid, t))
  in
  List.iter send_to !connected_servers
(*
TODO:

push request: we send a push to the server when we cannot connect to
a particular client. The client responds by connecting to us with a
"GIVE <push_id>\r\n" request, to which we can reply with a "GET ...." *)