patch #7247
[mldonkey.git] / src / networks / fasttrack / fasttrackGlobals.ml
blobaf790e8bf9a6aca3c467f163877cbb540ae06939
1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 *)
20 open Int64ops
21 open Queues
22 open Printf2
23 open Md4
24 open BasicSocket
25 open Options
26 open TcpBufferedSocket
28 open CommonInteractive
29 open CommonSwarming
30 open CommonHosts
31 open CommonOptions
32 open CommonClient
33 open CommonUser
34 open CommonTypes
35 open CommonComplexOptions
36 open CommonServer
37 open CommonResult
38 open CommonFile
39 open CommonGlobals
40 open CommonDownloads
41 open CommonNetwork
43 open FasttrackNetwork
44 open FasttrackTypes
45 open FasttrackOptions
(* Tag handed to the two printers below, so that every message logged
   through them is attributed to this network module. *)
let log_prefix = "[Fasttrack]"

(* [lprintf_nl fmt ...] forwards the format and its arguments to
   [lprintf_nl2] together with [log_prefix].  The [fmt] parameter is kept
   explicit on purpose: dropping it would turn the binding into a partial
   application, which the type checker cannot generalise. *)
let lprintf_nl fmt = lprintf_nl2 log_prefix fmt

(* Same forwarding as [lprintf_nl], but through [lprintf2]. *)
let lprintf_n fmt = lprintf2 log_prefix fmt

(* Counter used by [new_file] to give each search it creates a distinct
   [search_uid]; it is incremented after every use. *)
let search_num = ref 0

(* Mutable flag, initially false.  It is not read or written anywhere else
   in this part of the file. *)
let should_update_shared_files = ref false
(* Network descriptor for Fasttrack, built by [new_network] from the two
   name strings and the capability flags listed below.
   NOTE(review): listing lines 60 and 65-66 are absent from this view;
   presumably they hold the brackets of the flag list -- confirm against
   the repository before editing this definition. *)
59 let network = new_network "FT" "Fasttrack"
61 NetworkHasSupernodes;
62 NetworkHasRooms;
63 NetworkHasChat;
64 NetworkHasSearch;
(* Connection manager stored in the network descriptor. *)
let connection_manager = network.network_connection_manager

(* Operation tables, one per kind of object this module hands to the core.
   Each table is created for [network] by the matching Common* module; the
   type annotations pin the Fasttrack record each table operates on. *)
let server_ops : server CommonServer.server_ops =
  CommonServer.new_server_ops network

(* Note that the room table is typed over [server], as in the original. *)
let room_ops : server CommonRoom.room_ops =
  CommonRoom.new_room_ops network

let user_ops : user CommonUser.user_ops =
  CommonUser.new_user_ops network

let file_ops : file CommonFile.file_ops =
  CommonFile.new_file_ops network

let client_ops : client CommonClient.client_ops =
  CommonClient.new_client_ops network
(* Convenience wrappers over the Fasttrack records (client, file, server).
   Each one unwraps the generic implementation record and then delegates
   to a function of the same name that is already in scope.
   Every definition shadows the name it calls, so the order of these
   bindings matters: the later wrappers use the [as_client] and [as_file]
   defined at the top of this group, not the ones brought in by the opens. *)
84 let as_client c = as_client c.client_client
85 let as_file file = as_file file.file_file
(* Size and age are read straight from the implementation record. *)
86 let file_size file = file.file_file.impl_file_size
87 let file_downloaded file = file_downloaded (as_file file)
88 let file_age file = file.file_file.impl_file_age
89 let file_fd file = file_fd (as_file file)
90 let file_disk_name file = file_disk_name (as_file file)
91 let file_state file =
92 file_state (as_file file)
93 let file_num file =
94 file_num (as_file file)
95 let file_must_update file =
96 file_must_update (as_file file)
(* Server wrappers go through [as_server] on the embedded [server_server]. *)
97 let server_num s =
98 server_num (as_server s.server_server)
99 let server_must_update s =
100 server_must_update (as_server s.server_server)
101 let server_state s =
102 server_state (as_server s.server_server)
103 let set_server_state s state =
104 set_server_state (as_server s.server_server) state
105 let client_type c = client_type (as_client c)
(* The two state setters name CommonClient explicitly instead of relying
   on the unqualified name. *)
106 let set_client_state client state =
107 CommonClient.set_client_state (as_client client) state
108 let set_client_disconnected client =
109 CommonClient.set_client_disconnected (as_client client)
110 let client_must_update client = client_must_update (as_client client)
112 (*************************************************************************)
113 (* *)
114 (* Global values *)
115 (* *)
116 (*************************************************************************)
(* Server counter, initially zero.  Nothing in this part of the file
   updates it directly: [disconnect_from_server] decrements the counter it
   receives as an argument. *)
let nservers = ref 0

(* Predicate that answers false for any argument; it is the predicate
   passed to [Queues.workflow] for the waiting queues. *)
let ready = fun _ -> false

(* Counter, initially zero; not used elsewhere in this part of the file. *)
let hosts_counter = ref 0

(* Cache used by [client_name]: the last login seen and the name derived
   from it. *)
let old_client_name = ref ""
let ft_client_name = ref ""

(* 307200, written as 300 * 1024.  [new_file] binds a local value of the
   same name and does not read this one. *)
let file_chunk_size = 300 * 1024
125 (*************************************************************************)
126 (* *)
127 (* Global tables *)
128 (* *)
129 (*************************************************************************)
(* Files being downloaded: [new_file] pushes onto this list and
   [remove_file] takes entries off it. *)
let current_files : FasttrackTypes.file list ref = ref []

(* Listening TCP socket and UDP socket, both unset at start-up. *)
let listen_sock : TcpServerSocket.t option ref = ref None
let udp_sock : UdpSocket.t option ref = ref None

(* Sources of each result, keyed by [stored_result_num] (see [add_source]). *)
let result_sources = Hashtbl.create 1011
(* let hosts_by_key = Hashtbl.create 103 *)

(* Searches keyed by their [search_uid]. *)
let searches_by_uid : (int, local_search) Hashtbl.t = Hashtbl.create 11

(* Downloads keyed by their Md5Ext hash. *)
let files_by_uid = Hashtbl.create 13

(* Users and clients, both keyed by the [kind] given to [new_user] and
   [new_client]. *)
let users_by_uid = Hashtbl.create 127
let clients_by_uid = Hashtbl.create 127

(* Results keyed by the hash given to [new_result]. *)
let results_by_uid = Hashtbl.create 127

(* Servers currently listed as connected; [disconnect_from_server] removes
   entries from it. *)
let connected_servers : server list ref = ref []
(* Main host queue, built by [Queues.workflow].  Its predicate holds while
   a timestamp is less than 120 units older than [last_time ()]
   (presumably seconds -- confirm against Queues). *)
let workflow : host Queue.t =
  let max_age = 120 in
  Queues.workflow (fun stamp -> stamp + max_age > last_time ())

(* Hosts leave the main workflow for the queues below once they are ready
   to be connected. *)
let ultrapeers_waiting_queue : host Queue.t = Queues.workflow ready

(* Plain peers are only tried when no ultrapeer is available. *)
let peers_waiting_queue : host Queue.t = Queues.workflow ready

(* Peers that we should try to contact over UDP. *)
let waiting_udp_queue : host Queue.t = Queues.workflow ready

(* Peers that have answered our UDP requests. *)
let active_udp_queue : host Queue.t = Queues.fifo ()
161 (*************************************************************************)
162 (* *)
163 (* Global functions *)
164 (* *)
165 (*************************************************************************)
(* Host manager for this network: the CommonHosts.Make functor applied to
   the Fasttrack types plus the settings defined below.
   NOTE(review): listing lines 172, 178-179 and 183-184 are absent from
   this view; presumably they carry the list and tuple brackets of
   [requests] -- confirm against the repository before editing. *)
167 module H = CommonHosts.Make(struct
168 include FasttrackTypes
169 type ip = Ip.addr
(* Each request kind is paired with the constant 600 and a function from
   the host kind to the queues that should receive the host.  The meaning
   of 600 is not visible here (presumably a delay -- confirm). *)
171 let requests =
173 Tcp_Connect,
174 (600, (fun kind ->
(* Ultrapeers go to their own waiting queue, every other kind to the
   peers queue. *)
175 [ match kind with
176 | Ultrapeer -> ultrapeers_waiting_queue
177 | (_) -> peers_waiting_queue
(* UDP requests always use the UDP waiting queue, whatever the kind. *)
180 Udp_Connect,
181 (600, (fun kind ->
182 [waiting_udp_queue]
(* Both request kinds start with the value 0, regardless of [kind]. *)
185 let default_requests kind = [Tcp_Connect,0; Udp_Connect,0]
(* Limits forwarded from names already in scope. *)
187 let max_ultrapeers = max_known_ultrapeers
188 let max_peers = max_known_peers
189 end)
(* Fills in [s.server_country_code] from the address of the server host.
   Nothing happens when Geoip is inactive or when a country code is
   already recorded. *)
let check_server_country_code s =
  let geoip_enabled = Geoip.active () in
  match s.server_country_code with
  | None when geoip_enabled ->
      let ip = Ip.ip_of_addr s.server_host.host_addr in
      s.server_country_code <- Geoip.get_country_code_option ip
  | _ -> ()
(* [new_server ip port] obtains the host for (ip, port) from [H.new_host],
   asking for the Ultrapeer kind, and returns the server record attached
   to that host.  When the host has none yet, a fresh record is built,
   registered with [server_add], stored in [h.host_server] and passed to
   [check_server_country_code].
   NOTE(review): listing line 237 is absent from this view; presumably it
   is the final expression returning [s] -- confirm. *)
199 let new_server ip port =
200 let h = H.new_host ip port Ultrapeer in
201 match h.host_server with
202 Some s -> s
203 | None ->
(* [s] and [server_impl] refer to each other, hence the recursive
   definition. *)
204 let rec s = {
205 server_server = server_impl;
206 server_host = h;
207 server_country_code = None;
208 server_sock = NoConnection;
209 server_ciphers = None;
210 server_agent = "<unknown>";
211 server_description = "";
212 server_nfiles = Int64.zero;
213 server_nusers = Int64.zero;
214 server_maxnusers = 0L;
215 server_nkb = 0;
217 server_need_qrt = true;
218 server_ping_last = Md4.random ();
219 server_nfiles_last = zero;
220 server_nkb_last = 0;
221 server_vendor = "";
222 server_last_lni = 0;
224 server_connected = zero;
225 server_query_key = ();
226 server_searches = Fifo.create ();
227 server_shared = Intset.empty;
228 } and
(* Generic implementation record: a copy of [dummy_server_impl] pointing
   back at [s] and at this network operation table. *)
229 server_impl = {
230 dummy_server_impl with
231 impl_server_val = s;
232 impl_server_ops = server_ops;
233 } in
234 server_add server_impl;
235 h.host_server <- Some s;
236 check_server_country_code s;
(* [add_source r user] records [user] as a source of result [r], stamped
   with [last_time ()].  [result_sources] maps [r.stored_result_num] to a
   mutable list of (user, time) pairs; an empty list is created and stored
   on the first lookup failure.  A user already present (checked by
   physical equality through [List.mem_assq]) is not added again.
   NOTE(review): listing lines 241, 246-247 and 250 are absent from this
   view; presumably the try keyword, the value of the handler, the in
   keyword and the closing end -- confirm against the repository.
   NOTE(review): the handler catches every exception, not only a failed
   lookup. *)
239 let add_source r (user : user) =
240 let ss =
242 Hashtbl.find result_sources r.stored_result_num
243 with _ ->
244 let ss = ref [] in
245 Hashtbl.add result_sources r.stored_result_num ss;
248 if not (List.mem_assq user !ss) then begin
249 ss := (user, last_time ()) :: !ss
(* [new_result file_name file_size tags hashes _] handles a search hit
   identified by exactly one hash.  A hash already present in
   [results_by_uid] has its entry passed to [increment_avail]; otherwise a
   result is built from [dummy_result], numbered by [update_result_num]
   and stored in the table.  The last parameter is ignored.
   Any [hashes] list that is not a singleton reaches [assert false].
   NOTE(review): listing lines 253, 257, 268-269 and 272-274 are absent
   from this view (presumably the try keyword, the end of the record and
   the expressions yielding [r]) -- confirm against the repository. *)
252 let new_result file_name file_size tags hashes _ =
254 match hashes with
255 | [ hash ] ->
256 let r =
258 let r = Hashtbl.find results_by_uid hash in
259 increment_avail r
(* Lookup failed: create and register a fresh result. *)
260 with _ ->
261 let tags = update_or_create_avail tags in
262 let r = { dummy_result with
263 result_names = [file_name];
264 result_size = file_size;
265 result_tags = tags;
266 result_uids = [Uid.create (Md5Ext hash)];
267 result_source_network = network.network_num;
270 let r = update_result_num r in
271 Hashtbl.add results_by_uid hash r;
275 | _ -> assert false
(* Alias of [megabyte]; not referenced elsewhere in this part of the file. *)
277 let min_range_size = megabyte
(* [new_file file_temporary file_name file_size file_hash user group]
   creates the download record for one Md5Ext hash: it opens the temporary
   file under [temp_directory], builds the file, its generic
   implementation record and a uid search for it, attaches a swarmer,
   registers the search in [searches_by_uid], pushes the file onto
   [current_files], adds it in the FileDownloading state and returns it.
   A later definition with the same name shadows this one and calls it.
   NOTE(review): many listing lines are absent from this view (286-287,
   316-317, 328-329, 331-332, 335-336); presumably closing brackets, in
   keywords and comment delimiters -- confirm against the repository. *)
279 let new_file file_temporary file_name file_size file_hash user group =
280 let file_temp = Filename.concat !!temp_directory file_temporary in
281 (* (Printf.sprintf "FT-%s" (Md4.to_string file_id)) in *)
282 let t = Unix32.create_rw file_temp in
(* Local chunk size, never below one megabyte.  It shadows the top-level
   constant of the same name. *)
283 let file_chunk_size =
284 max megabyte (
285 1L ++ file_size // (max 5L (1L ++ file_size // (megabytes 5)))
288 let uid = Uid.create (Md5Ext file_hash) in
(* [file], [file_impl] and [search] refer to each other, hence the
   recursive definition. *)
289 let rec file = {
290 file_file = file_impl;
291 file_temp = file_temporary;
292 file_name = file_name;
293 file_clients = [];
294 file_swarmer = None;
295 file_searches = [search];
296 file_uids = [uid];
297 file_clients_queue = Queues.workflow (fun _ -> false);
298 file_nconnected_clients = 0;
299 file_ttr = None;
300 } and file_impl = {
301 dummy_file_impl with
302 impl_file_fd = Some t;
303 impl_file_size = file_size;
304 impl_file_downloaded = Int64.zero;
305 impl_file_owner = user;
306 impl_file_group = group;
307 impl_file_val = file;
308 impl_file_ops = file_ops;
309 impl_file_age = last_time ();
310 impl_file_best_name = file_name;
311 impl_file_filenames = [file_name];
312 } and search = {
313 search_search = FileUidSearch (file, file_hash);
314 search_uid = !search_num;
315 search_hosts = Intset.empty;
(* The search took the current counter value; advance it for the next
   search. *)
318 incr search_num;
319 let kernel = CommonSwarming.create_swarmer file_temp file_size in
320 let swarmer = CommonSwarming.create kernel (as_file file)
321 file_chunk_size in
322 file.file_swarmer <- Some swarmer;
323 Hashtbl.add searches_by_uid search.search_uid search;
324 (* lprintf "SET SIZE : %Ld\n" file_size;*)
325 CommonSwarming.set_verifier swarmer ForceVerification;
(* Callback given to the swarmer: it flags the file as needing an update. *)
326 CommonSwarming.set_verified swarmer (fun _ _ ->
327 file_must_update file;
(* NOTE(review): the comment terminator on the closing line of the
   set_writer call below suggests that the whole call is commented out in
   the repository; the opening delimiter is not visible in this view. *)
330 CommonSwarming.set_writer swarmer (fun offset s pos len ->
333 lprintf "DOWNLOADED: %d/%d/%d\n" pos len (String.length s);
334 AnyEndian.dump_sub s pos len;
337 if !!CommonOptions.buffer_writes then
338 Unix32.buffered_write_copy t offset s pos len
339 else
340 Unix32.write t offset s pos len
341 ); *)
342 current_files := file :: !current_files;
343 file_add file_impl FileDownloading;
344 (* lprintf "ADD FILE TO DOWNLOAD LIST\n"; *)
345 file
(* Exception carrying a file.  It is neither raised nor caught in this
   part of the file. *)
347 exception FileFound of file
(* [new_file file_id file_name file_size file_uids user group] returns the
   download associated with the Md5Ext uid found in [file_uids].

   For each Md5Ext uid, the hash is looked up in [files_by_uid]; when it
   is unknown, a download is created with the previous definition of
   [new_file] (shadowed by this one) and registered under that hash.
   Uids of any other kind are skipped.  When several Md5Ext uids are
   present, the file of the last one is returned.

   Reaches [assert false] when [file_uids] holds no Md5Ext uid.

   Only [Not_found] is treated as an unknown hash; the original handler
   caught every exception raised by the lookup. *)
let new_file file_id file_name file_size file_uids user group =
  let found = ref None in
  let consider uid =
    match Uid.to_uid uid with
    | Md5Ext file_hash ->
        let file =
          try Hashtbl.find files_by_uid file_hash
          with Not_found ->
            let created =
              new_file file_id file_name file_size file_hash user group in
            Hashtbl.add files_by_uid file_hash created;
            created
        in
        found := Some file
    | _ -> ()
  in
  List.iter consider file_uids;
  match !found with
  | Some file -> file
  | None -> assert false
(* [new_user kind] returns the user registered under [kind] in
   [users_by_uid].  A user that is found has its [user_kind] refreshed
   with [kind]; otherwise a new user is built, registered with [user_add],
   stored in the table and returned.
   NOTE(review): listing lines 367 and 370 are absent from this view;
   presumably the try keyword and the expression returning [s] -- confirm.
   NOTE(review): the handler catches every exception, not only a failed
   lookup. *)
366 let new_user kind =
368 let s = Hashtbl.find users_by_uid kind in
369 s.user_kind <- kind;
371 with _ ->
(* [user] and [user_impl] refer to each other, hence the recursive
   definition. *)
372 let rec user = {
373 user_user = user_impl;
(* A known location carries no uid, so the null Md4 is used; an indirect
   location supplies its own uid. *)
374 user_uid = (match kind with
375 Known_location _ -> Md4.null
376 | Indirect_location (_, uid, _, _) -> uid);
377 user_kind = kind;
378 (* user_files = []; *)
379 user_speed = 0;
380 user_vendor = "";
381 user_software = "";
382 user_nick = "";
383 } and user_impl = {
384 dummy_user_impl with
385 impl_user_ops = user_ops;
386 impl_user_val = user;
387 } in
388 user_add user_impl;
389 Hashtbl.add users_by_uid kind user;
390 user
(* Fills in [c.client_country_code] from the IP stored in [c.client_host].
   Nothing happens when Geoip is inactive, when a country code is already
   recorded, or when the client has no known host.  The port stored with
   the host is not used. *)
let check_client_country_code c =
  let geoip_enabled = Geoip.active () in
  match c.client_country_code, c.client_host with
  | None, Some (ip, _port) when geoip_enabled ->
      c.client_country_code <- Geoip.get_country_code_option ip
  | _ -> ()
(* [new_client kind] returns the client registered under [kind] in
   [clients_by_uid], creating it when the lookup fails.  A new client gets
   its user from [new_user kind], is registered through the [new_client]
   that was in scope before this definition (this binding is not
   recursive) and is then stored in the table.
   NOTE(review): listing lines 402, 412, 415-416 and 435 are absent from
   this view; presumably the try keyword, blank or commented lines and
   the final expression returning [c] -- confirm.
   NOTE(review): the handler catches every exception, not only a failed
   lookup. *)
401 let new_client kind =
403 Hashtbl.find clients_by_uid kind
404 with _ ->
405 let user = new_user kind in
(* [c] and [impl] refer to each other, hence the recursive definition. *)
406 let rec c = {
407 client_client = impl;
408 client_sock = NoConnection;
409 (* client_name = name;
410 client_kind = None; *)
411 client_requests = [];
413 client_pos = Int32.zero;
414 client_error = false;
417 client_all_files = None;
418 client_user = user;
419 client_connection_control = new_connection_control (());
420 client_downloads = [];
421 client_host = None;
422 client_country_code = None;
423 client_reconnect = false;
424 client_in_queues = [];
425 client_connected_for = None;
426 client_support_head_request = true;
427 } and impl = {
428 dummy_client_impl with
429 impl_client_val = c;
430 impl_client_ops = client_ops;
431 impl_client_upload = None;
432 } in
(* This call resolves to the earlier [new_client], not to this function. *)
433 new_client impl;
434 Hashtbl.add clients_by_uid kind c;
(* [add_download file c ()] attaches client [c] to [file] as a source,
   unless [c] is already in [file.file_clients] (physical equality).
   It appends a download record to [c.client_downloads], adds [c] to the
   clients of the file, calls [file_add_source] and, when the file is not
   yet in [c.client_in_queues], queues the client on the file.
   NOTE(review): listing lines 466-467 are absent from this view;
   presumably the end closing the outer begin -- confirm. *)
437 let add_download file c () =
438 (* let r = new_result file.file_name (file_size file) in *)
439 (* add_source r c.client_user index; *)
440 if !verbose then lprintf "Adding file to client\n";
441 if not (List.memq c file.file_clients) then begin
(* A single chunk covering the whole file. *)
442 let chunks = [ Int64.zero, file_size file ] in
443 let d = {
444 download_file = file;
445 (* download_uri = index; *)
446 download_chunks = chunks;
447 download_uploader = None;
448 download_ranges = [];
449 download_blocks = [];
450 download_uri = "";
451 download_head_requested = false;
452 download_ttr_requested = false;
453 } in
454 c.client_downloads <- c.client_downloads @ [d];
(* The uri becomes the hexadecimal form of the Md5Ext hash of the file;
   with several Md5Ext uids the last one wins. *)
455 List.iter (fun uid ->
456 match Uid.to_uid uid with
457 Md5Ext hash -> d.download_uri <- Md5Ext.to_hexa_case false hash
458 | _ -> ()
459 ) file.file_uids;
460 file.file_clients <- c :: file.file_clients;
461 file_add_source (as_file file) (as_client c);
(* Queue the client on this file only once. *)
462 if not (List.memq file c.client_in_queues) then begin
463 Queue.put file.file_clients_queue (0,c);
464 c.client_in_queues <- file :: c.client_in_queues
465 end;
(* [find_download file list] returns the first download in [list] whose
   [download_file] is physically equal to [file].
   Raises [Not_found] when there is none. *)
let find_download file list =
  List.find (fun d -> d.download_file == file) list
(* [remove_download file list] returns [list] without the downloads whose
   [download_file] is physically equal to [file].  The remaining downloads
   keep their relative order. *)
let remove_download file list =
  List.filter (fun d -> d.download_file != file) list
(* [remove_file file] forgets a download: every Md5Ext hash among its
   uids is removed from [files_by_uid], then the file itself is taken out
   of [current_files]. *)
let remove_file file =
  let forget_hash uid =
    match Uid.to_uid uid with
    | Md5Ext hash -> Hashtbl.remove files_by_uid hash
    | _ -> ()
  in
  List.iter forget_hash file.file_uids;
  current_files := List2.removeq file !current_files
(* [client_ip sock] calls [CommonOptions.client_ip] with the socket of an
   established connection, or with [None] for any other socket state. *)
let client_ip sock =
  let tcp_sock =
    match sock with
    | Connection sock -> Some sock
    | _ -> None
  in
  CommonOptions.client_ip tcp_sock
(* [free_ciphers s] releases the ciphers held by server [s], if any:
   the incoming one first, then the outgoing one, after which the field is
   cleared so that a second call does nothing. *)
let free_ciphers s =
  match s.server_ciphers with
  | Some ciphers ->
      let incoming = ciphers.in_cipher
      and outgoing = ciphers.out_cipher in
      cipher_free incoming;
      cipher_free outgoing;
      s.server_ciphers <- None
  | None -> ()
(* [disconnect_from_server nservers s reason] tears down the connection
   to server [s].  It only acts when [s.server_sock] is a Connection:
   the socket is closed, the ciphers are freed, the state becomes
   NotConnected, [server_need_qrt] is set again, the counter is
   decremented and [s] leaves [connected_servers].
   The [nservers] parameter is a counter reference supplied by the caller;
   it shadows the top-level value of the same name.
   NOTE(review): listing lines 517 and 519 are absent from this view;
   presumably separators closing the inner match -- confirm. *)
505 let disconnect_from_server nservers s reason =
506 match s.server_sock with
507 | Connection sock ->
508 let h = s.server_host in
(* Trace the length of the session, but only for a server that was in the
   Connected state and only in verbose mode. *)
509 (match server_state s with
510 Connected _ ->
511 let connection_time = Int64.to_int (
512 (int64_time ()) -- s.server_connected) in
513 if !verbose then lprintf "DISCONNECT FROM SERVER %s:%d after %d seconds [%s]\n"
514 (Ip.string_of_addr h.host_addr) h.host_port
515 connection_time
516 (string_of_reason reason)
518 | _ -> ()
(* Best effort: any exception raised while closing is ignored. *)
520 (try close sock reason with _ -> ());
521 s.server_sock <- NoConnection;
522 free_ciphers s;
523 set_server_state s (NotConnected (reason, -1));
524 s.server_need_qrt <- true;
525 decr nservers;
526 if List.memq s !connected_servers then
527 connected_servers := List2.removeq s !connected_servers
(* Any other socket state: nothing to disconnect. *)
528 | _ -> ()
(* [client_name ()] returns the name announced on this network: the global
   login cut to at most 32 characters, with spaces replaced by
   underscores.

   The result is cached in [ft_client_name] and recomputed only when the
   login differs from the one seen on the previous call
   ([old_client_name]).  The comparison is structural: the original used
   physical inequality, which treats two distinct strings with the same
   contents as different and made the cache depend on the option layer
   always handing back the very same string. *)
let client_name () =
  let name = !!global_login in
  if name <> !old_client_name then begin
    let len = String.length name in
    (* [String.sub] yields a fresh string, so the replacement below never
       touches the login itself. *)
    ft_client_name := String.sub name 0 (min 32 len);
    old_client_name := name;
    String2.replace_char !ft_client_name ' ' '_';
  end;
  !ft_client_name
541 (*************************************************************
542 Define a function to be called when the "mem_stats" command
543 is used to display information on structure footprint.
544 **************************************************************)
let _ =
  (* Report printed for this module by the mem_stats command; the level
     argument is not used. *)
  let report _level buf =
    Printf.bprintf buf "Number of old files: %d\n" (List.length !!old_files)
  in
  Heap.add_memstat "FasttrackGlobals" report