(* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
(*
    This file is part of mldonkey.

    mldonkey is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    mldonkey is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with mldonkey; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*)
26 open TcpBufferedSocket
28 open CommonInteractive
35 open CommonComplexOptions
(* Tag identifying this plugin in log output: it is passed as the first
   argument to the two logging calls below. *)
47 let log_prefix = "[Fasttrack]"
(* NOTE(review): the binding headers of the next two expressions are not
   present in this view.  Each one applies a logging function
   ([lprintf_nl2], then [lprintf2]) to [log_prefix] and a format [fmt]. *)
50 lprintf_nl2
log_prefix fmt
53 lprintf2
log_prefix fmt
(* Mutable integer, initially 0.  Its current value is read as the
   [search_uid] of the search record built in [new_file]. *)
55 let search_num = ref 0
(* Mutable boolean flag, initially false.
   NOTE(review): no reader or writer of this flag is visible in this view. *)
57 let should_update_shared_files = ref false
(* Network descriptor of this plugin, created by [new_network] with the
   names FT and Fasttrack.
   NOTE(review): further arguments to [new_network], if any, are not
   present in this view. *)
59 let network = new_network
"FT" "Fasttrack"
(* Connection manager taken from the network descriptor. *)
67 let connection_manager = network.network_connection_manager
(* Operation tables, one per kind of object this plugin exposes.  Each
   table is created for [network] by the matching [new_*_ops]
   constructor; the annotations fix the plugin-side record type. *)

(* Operations on servers. *)
let server_ops : server CommonServer.server_ops =
  CommonServer.new_server_ops network

(* Operations on rooms; the plugin-side value of a room is a [server]. *)
let room_ops : server CommonRoom.room_ops =
  CommonRoom.new_room_ops network

(* Operations on users. *)
let user_ops : user CommonUser.user_ops =
  CommonUser.new_user_ops network

(* Operations on files. *)
let file_ops : file CommonFile.file_ops =
  CommonFile.new_file_ops network

(* Operations on clients. *)
let client_ops : client CommonClient.client_ops =
  CommonClient.new_client_ops network
(* Accessors from plugin records to their generic counterparts.  Every
   definition shadows a function of the same name that is already in
   scope; the right-hand side still refers to the previous binding, and
   later definitions in this group use the new [as_file]. *)

(* Generic client handle stored in [c.client_client]. *)
let as_client c =
  let impl = c.client_client in
  as_client impl

(* Generic file handle stored in [file.file_file]. *)
let as_file file =
  let impl = file.file_file in
  as_file impl

(* Size recorded in the file implementation record. *)
let file_size file =
  let impl = file.file_file in
  impl.impl_file_size

(* [file_downloaded] applied to the generic handle of [file]. *)
let file_downloaded file =
  let generic = as_file file in
  file_downloaded generic

(* Age recorded in the file implementation record. *)
let file_age file =
  let impl = file.file_file in
  impl.impl_file_age

(* [file_fd] applied to the generic handle of [file]. *)
let file_fd file =
  let generic = as_file file in
  file_fd generic

(* [file_disk_name] applied to the generic handle of [file]. *)
let file_disk_name file =
  let generic = as_file file in
  file_disk_name generic
(* NOTE(review): the binding headers of the next two wrappers are not
   present in this view.  Each body converts [file] with [as_file] and
   passes the result to [file_state], respectively [file_num]. *)
92 file_state (as_file file
)
94 file_num (as_file file
)
(* Applies the previously bound [file_must_update] to the generic handle
   of the plugin file [file]. *)
let file_must_update file =
  let generic = as_file file in
  file_must_update generic
(* NOTE(review): the binding header of this wrapper is not present in
   this view.  The body converts [s.server_server] with [as_server] and
   passes the result to [server_num]. *)
98 server_num (as_server s
.server_server
)
(* Applies the previously bound [server_must_update] to the generic
   handle built from [s.server_server]. *)
let server_must_update s =
  let generic = as_server s.server_server in
  server_must_update generic
(* NOTE(review): the binding header of this wrapper is not present in
   this view.  The body converts [s.server_server] with [as_server] and
   passes the result to [server_state]. *)
102 server_state (as_server s
.server_server
)
(* Sets [state] on the generic handle built from [s.server_server],
   using the previously bound [set_server_state]. *)
let set_server_state s state =
  let generic = as_server s.server_server in
  set_server_state generic state
(* Client wrappers: each converts the plugin client with [as_client] and
   forwards to the generic function of the same name. *)

(* [client_type] of the generic handle of [c]. *)
let client_type c =
  let generic = as_client c in
  client_type generic

(* Sets [state] on the generic handle of [client]. *)
let set_client_state client state =
  let generic = as_client client in
  CommonClient.set_client_state generic state

(* Marks the generic handle of [client] as disconnected. *)
let set_client_disconnected client =
  let generic = as_client client in
  CommonClient.set_client_disconnected generic

(* [client_must_update] on the generic handle of [client]. *)
let client_must_update client =
  let generic = as_client client in
  client_must_update generic
112 (*************************************************************************)
116 (*************************************************************************)
(* Mutable integer counter, initially 0. *)
let hosts_counter : int ref = ref 0

(* Last login name processed by the client-name refresh code further
   down in this file; initially empty. *)
let old_client_name : string ref = ref ""

(* Name derived from the login by that same code; initially empty. *)
let ft_client_name : string ref = ref ""

(* Chunk size constant: 300 * 1024 = 307200. *)
let file_chunk_size : int = 300 * 1024
125 (*************************************************************************)
129 (*************************************************************************)
(* Files registered by [new_file]; [remove_file] takes them out again. *)
let current_files : FasttrackTypes.file list ref = ref []

(* Optional TCP server socket, initially absent. *)
let listen_sock : TcpServerSocket.t option ref = ref None

(* Optional UDP socket, initially absent. *)
let udp_sock : UdpSocket.t option ref = ref None

(* Source lists of results, keyed by [stored_result_num]
   (see [add_source]). *)
let result_sources = Hashtbl.create 1011

(* let hosts_by_key = Hashtbl.create 103 *)

(* Searches keyed by their integer [search_uid]. *)
let searches_by_uid : (int, local_search) Hashtbl.t = Hashtbl.create 11

(* Files keyed by file hash (see the lookup-or-create [new_file]). *)
let files_by_uid = Hashtbl.create 13

(* Users keyed by the [kind] value handed to the user constructor. *)
let users_by_uid = Hashtbl.create 127

(* Clients keyed by the [kind] value handed to [new_client]. *)
let clients_by_uid = Hashtbl.create 127

(* Results keyed by hash (see [new_result]). *)
let results_by_uid = Hashtbl.create 127

(* Servers currently held as connected; [disconnect_from_server]
   removes entries from this list. *)
let connected_servers : server list ref = ref []
(* Main host workflow.  The predicate given to [Queues.workflow] holds
   for a timestamp [time] while [last_time ()] is less than 120 units
   past it. *)
let workflow : host Queue.t =
  let within_last_120 time = time + 120 > last_time () in
  Queues.workflow within_last_120
(* From the main workflow, hosts are moved to these workflows when they
   are ready to be connected. *)

(* Ultrapeers waiting to be connected. *)
let ultrapeers_waiting_queue : host Queue.t = Queues.workflow ready

(* Peers are only tested when no ultrapeers are available... *)
let peers_waiting_queue : host Queue.t = Queues.workflow ready

(* These are the peers that we should try to contact by UDP. *)
let waiting_udp_queue : host Queue.t = Queues.workflow ready

(* These are the peers that have replied to our UDP requests. *)
let active_udp_queue : host Queue.t = Queues.fifo ()
(*************************************************************************)
(*                         Global functions                              *)
(*************************************************************************)
(* Host bookkeeping module, obtained by applying the [CommonHosts.Make]
   functor to a structure that includes [FasttrackTypes].
   NOTE(review): several lines of the functor argument, and its closing
   tokens, are not present in this view. *)
167 module H
= CommonHosts.Make
(struct
168 include FasttrackTypes
(* Ultrapeer hosts map to [ultrapeers_waiting_queue]; every other case
   maps to [peers_waiting_queue].  The scrutinee of this match is not
   visible here. *)
176 | Ultrapeer
-> ultrapeers_waiting_queue
177 | (_
) -> peers_waiting_queue
(* Initial request list: a TCP connect and a UDP connect, each paired
   with 0.  The [kind] argument is not used. *)
185 let default_requests kind
= [Tcp_Connect
,0; Udp_Connect
,0]
(* Limits taken from [max_known_ultrapeers] and [max_known_peers]. *)
187 let max_ultrapeers = max_known_ultrapeers
188 let max_peers = max_known_peers
(* When [Geoip.active ()] holds, inspects [s.server_country_code] and
   assigns to it the result of [Geoip.get_country_code_option] applied
   to the IP of [s.server_host.host_addr].
   NOTE(review): the match arms are not present in this view, so the
   exact condition under which the field is overwritten cannot be
   stated from here. *)
191 let check_server_country_code s
=
192 if Geoip.active
() then
193 match s
.server_country_code
with
195 s
.server_country_code
<-
196 Geoip.get_country_code_option
(Ip.ip_of_addr s
.server_host
.host_addr
)
(* Server constructor for the host ([ip], [port]).
   The host is obtained from [H.new_host] with kind [Ultrapeer], and its
   [host_server] field is matched on.  The visible part builds a server
   record and a server implementation record (derived from
   [dummy_server_impl] with [server_ops]), registers the implementation
   with [server_add], stores [Some s] in [h.host_server] and runs
   [check_server_country_code] on the new server.
   NOTE(review): the match arms, the record openers and the returned
   expression are not present in this view. *)
199 let new_server ip port
=
200 let h = H.new_host ip port Ultrapeer
in
201 match h.host_server
with
(* Field initialisers of the new server record. *)
205 server_server
= server_impl
;
207 server_country_code
= None
;
208 server_sock
= NoConnection
;
209 server_ciphers
= None
;
210 server_agent
= "<unknown>";
211 server_description
= "";
212 server_nfiles
= Int64.zero
;
213 server_nusers
= Int64.zero
;
214 server_maxnusers
= 0L;
217 server_need_qrt
= true;
218 server_ping_last
= Md4.random
();
219 server_nfiles_last
= zero
;
224 server_connected
= zero
;
225 server_query_key
= ();
226 server_searches
= Fifo.create
();
227 server_shared
= Intset.empty
;
(* Implementation record: a copy of [dummy_server_impl] with the
   fields below overridden. *)
230 dummy_server_impl
with
232 impl_server_ops
= server_ops
;
(* Registration and back-link from the host to its server. *)
234 server_add server_impl
;
235 h.host_server
<- Some
s;
236 check_server_country_code s;
(* Records [user] as a source of the stored result [r].
   The source list is looked up in [result_sources] under
   [r.stored_result_num]; a [Hashtbl.add] of [ss] under the same key is
   also visible.  The pair ([user], [last_time ()]) is then prepended to
   [!ss] unless [user] is already a key of that list.  [List.mem_assq]
   compares keys by physical equality.
   NOTE(review): the lines joining the lookup and the add (presumably a
   not-found fallback) are not present in this view. *)
239 let add_source r
(user
: user
) =
242 Hashtbl.find
result_sources r
.stored_result_num
245 Hashtbl.add
result_sources r
.stored_result_num
ss;
248 if not
(List.mem_assq user
!ss) then begin
249 ss := (user
, last_time
()) :: !ss
(* Result constructor keyed by [hash] in [results_by_uid].
   A lookup of [hash] is visible first.  The creation path passes
   [tags] through [update_or_create_avail], builds a copy of
   [dummy_result] with the name, size, an Md5Ext uid and this network's
   number, numbers it with [update_result_num] and adds it to
   [results_by_uid].  The fifth argument is ignored.
   NOTE(review): the binding of [hash] (presumably derived from
   [hashes]), the handling of a failed lookup and the returned
   expression are not present in this view. *)
252 let new_result file_name
file_size tags hashes _
=
258 let r = Hashtbl.find
results_by_uid hash
in
261 let tags = update_or_create_avail
tags in
262 let r = { dummy_result
with
263 result_names
= [file_name
];
264 result_size
= file_size;
266 result_uids
= [Uid.create
(Md5Ext hash
)];
267 result_source_network
= network.network_num
;
270 let r = update_result_num
r in
271 Hashtbl.add
results_by_uid hash
r;
(* Alias of [megabyte].  NOTE(review): presumably the smallest range
   requested from a source; no use is visible in this view. *)
277 let min_range_size = megabyte
(* Internal file constructor.
   Visible steps, in order:
   - the temporary path is [file_temporary] inside [!!temp_directory],
     opened read/write with [Unix32.create_rw];
   - a local [file_chunk_size] is computed from [file_size];
   - the file record, its implementation record and a [FileUidSearch]
     search record are initialised;
   - a swarmer is created, stored in [file.file_swarmer], and given a
     verifier, a verified-callback and a writer;
   - the search is added to [searches_by_uid], the file is prepended to
     [current_files] and registered with [file_add] as [FileDownloading].
   NOTE(review): record openers, some field initialisers, the end of the
   callbacks and the returned expression are not present in this view. *)
279 let new_file file_temporary file_name
file_size file_hash user group
=
280 let file_temp = Filename.concat
!!temp_directory file_temporary
in
281 (* (Printf.sprintf "FT-%s" (Md4.to_string file_id)) in *)
282 let t = Unix32.create_rw
file_temp in
(* NOTE(review): the line that opens this expression is missing here, so
   the final chunk-size formula cannot be fully stated.  By operator
   precedence the visible part reads
   1L ++ (file_size // max 5L (1L ++ (file_size // megabytes 5))). *)
283 let file_chunk_size =
285 1L ++ file_size // (max
5L (1L ++ file_size // (megabytes
5)))
288 let uid = Uid.create
(Md5Ext file_hash
) in
(* Field initialisers of the plugin file record. *)
290 file_file
= file_impl
;
291 file_temp = file_temporary
;
292 file_name
= file_name
;
295 file_searches
= [search
];
297 file_clients_queue
= Queues.workflow
(fun _
-> false);
298 file_nconnected_clients
= 0;
(* Field initialisers of the file implementation record. *)
302 impl_file_fd
= Some
t;
303 impl_file_size
= file_size;
304 impl_file_downloaded
= Int64.zero
;
305 impl_file_owner
= user
;
306 impl_file_group
= group
;
307 impl_file_val
= file;
308 impl_file_ops
= file_ops
;
309 impl_file_age
= last_time
();
310 impl_file_best_name
= file_name
;
311 impl_file_filenames
= [file_name
];
(* Field initialisers of the search record tied to this file. *)
313 search_search
= FileUidSearch
(file, file_hash
);
314 search_uid
= !search_num;
315 search_hosts
= Intset.empty
;
319 let kernel = CommonSwarming.create_swarmer
file_temp file_size in
320 let swarmer = CommonSwarming.create
kernel (as_file file)
322 file.file_swarmer
<- Some
swarmer;
323 Hashtbl.add searches_by_uid search
.search_uid search
;
324 (* lprintf "SET SIZE : %Ld\n" file_size;*)
325 CommonSwarming.set_verifier
swarmer ForceVerification
;
(* Verified-callback: flags the file as needing an update. *)
326 CommonSwarming.set_verified
swarmer (fun _ _
->
327 file_must_update file;
(* Writer callback: writes [len] bytes of [s] starting at [pos] to [t]
   at [offset], through the buffered path when
   [!!CommonOptions.buffer_writes] is set.
   NOTE(review): the two logging lines are presumably guarded by a
   verbosity test that is not present in this view. *)
330 CommonSwarming.set_writer swarmer (fun offset s pos len ->
333 lprintf "DOWNLOADED: %d/%d/%d\n" pos len (String.length s);
334 AnyEndian.dump_sub s pos len;
337 if !!CommonOptions.buffer_writes
then
338 Unix32.buffered_write_copy
t offset
s pos len
340 Unix32.write
t offset
s pos len
342 current_files := file :: !current_files;
343 file_add file_impl FileDownloading
;
344 (* lprintf "ADD FILE TO DOWNLOAD LIST\n"; *)
(* Exception carrying a plugin file. *)
exception FileFound of file
(* Lookup-or-create file constructor; shadows the internal [new_file]
   defined above and calls it on the creation path.
   For each uid of [file_uids], [Uid.to_uid] is matched on.  The visible
   part looks [file_hash] up in [files_by_uid] and, on the creation
   path, builds the file with the previous [new_file] and adds it to
   [files_by_uid] under [file_hash].
   NOTE(review): the match arms, the use of the local [file] reference
   and the returned expression are not present in this view. *)
349 let new_file file_id file_name
file_size file_uids user group
=
350 let file = ref None
in
351 List.iter
(fun uid ->
352 match Uid.to_uid
uid with
355 Hashtbl.find
files_by_uid file_hash
357 let file = new_file file_id file_name
file_size file_hash user group
in
358 Hashtbl.add
files_by_uid file_hash
file;
(* NOTE(review): the binding header of this definition is not present in
   this view.  It is presumably the [new_user] that [new_client] calls
   with the same [kind] argument.
   The visible part looks [kind] up in [users_by_uid], initialises a
   user record and its implementation record, and adds the user to
   [users_by_uid] under [kind]. *)
368 let s = Hashtbl.find users_by_uid kind
in
373 user_user
= user_impl
;
(* The uid is [Md4.null] for a known location, and the uid component of
   the location otherwise. *)
374 user_uid
= (match kind
with
375 Known_location _
-> Md4.null
376 | Indirect_location
(_
, uid, _
, _
) -> uid);
378 (* user_files = []; *)
385 impl_user_ops
= user_ops
;
386 impl_user_val
= user;
389 Hashtbl.add users_by_uid kind
user;
(* When [Geoip.active ()] holds, inspects [c.client_country_code]; in
   the visible branch, a client whose [client_host] is [Some (ip, port)]
   gets the result of [Geoip.get_country_code_option ip] stored in that
   field.
   NOTE(review): the outer match arms and the remaining inner arm are
   not present in this view. *)
392 let check_client_country_code c
=
393 if Geoip.active
() then
394 match c
.client_country_code
with
396 (match c
.client_host
with
397 | Some
(ip
,port
) -> c
.client_country_code
<- Geoip.get_country_code_option ip
(* Client constructor keyed by [kind] in [clients_by_uid].
   A lookup of [kind] is visible first.  The creation path obtains the
   user with [new_user kind], initialises a client record and an
   implementation record (a copy of [dummy_client_impl] with
   [client_ops]), and adds the client to [clients_by_uid].
   NOTE(review): the handling of a failed lookup, the record openers,
   some field initialisers and the returned expression are not present
   in this view. *)
401 let new_client kind
=
403 Hashtbl.find clients_by_uid kind
405 let user = new_user kind
in
(* Field initialisers of the client record. *)
407 client_client
= impl
;
408 client_sock
= NoConnection
;
409 (* client_name = name;
410 client_kind = None; *)
411 client_requests
= [];
413 client_pos = Int32.zero;
414 client_error = false;
417 client_all_files
= None
;
419 client_connection_control
= new_connection_control
(());
420 client_downloads
= [];
422 client_country_code
= None
;
423 client_reconnect
= false;
424 client_in_queues
= [];
425 client_connected_for
= None
;
426 client_support_head_request
= true;
(* Implementation record: a copy of [dummy_client_impl] with the
   fields below overridden. *)
428 dummy_client_impl
with
430 impl_client_ops
= client_ops
;
431 impl_client_upload
= None
;
434 Hashtbl.add clients_by_uid kind
c;
(* Attaches client [c] as a source of [file].
   If [c] is not already in [file.file_clients] (physical membership via
   [List.memq]), a download record covering the single chunk from 0 to
   [file_size file] is appended to [c.client_downloads], its
   [download_uri] is set from an Md5Ext uid, [c] is prepended to
   [file.file_clients] and announced with [file_add_source].
   If [file] is not yet in [c.client_in_queues], the pair (0, [c]) is
   put on [file.file_clients_queue] and [file] is prepended to
   [c.client_in_queues].
   NOTE(review): the record opener, the list iterated for uids and the
   closing tokens of both conditionals are not present in this view. *)
437 let add_download file c () =
438 (* let r = new_result file.file_name (file_size file) in *)
439 (* add_source r c.client_user index; *)
440 if !verbose
then lprintf
"Adding file to client\n";
441 if not
(List.memq
c file.file_clients
) then begin
442 let chunks = [ Int64.zero
, file_size file ] in
(* Field initialisers of the download record [d]. *)
444 download_file
= file;
445 (* download_uri = index; *)
446 download_chunks
= chunks;
447 download_uploader
= None
;
448 download_ranges
= [];
449 download_blocks
= [];
451 download_head_requested
= false;
452 download_ttr_requested
= false;
(* Appends at the end of the list, so existing downloads stay first. *)
454 c.client_downloads
<- c.client_downloads
@ [d];
455 List.iter
(fun uid ->
456 match Uid.to_uid
uid with
457 Md5Ext hash
-> d.download_uri
<- Md5Ext.to_hexa_case
false hash
460 file.file_clients
<- c :: file.file_clients
;
461 file_add_source
(as_file file) (as_client c);
462 if not
(List.memq
file c.client_in_queues
) then begin
463 Queue.put
file.file_clients_queue
(0,c);
464 c.client_in_queues
<- file :: c.client_in_queues
(* Returns the first download [d] of [list] whose [download_file] is
   physically equal to [file]; raises [Not_found] on the empty list.
   NOTE(review): the match header and the cons pattern binding [d] and
   [tail] are not present in this view. *)
468 let rec find_download file list
=
470 [] -> raise Not_found
472 if d.download_file
== file then d else find_download file tail
(* Filters [list] with a local accumulator loop: downloads whose
   [download_file] is physically equal to [file] are dropped, the others
   are pushed onto [rev].
   NOTE(review): the match on [list], the base case and the initial call
   of [iter] are not present in this view, so the order of the returned
   list cannot be stated from here. *)
474 let remove_download file list
=
475 let rec iter file list rev
=
479 if d.download_file
== file then
480 iter file tail rev
else
481 iter file tail
(d :: rev
)
(* Unregisters [file]: for each uid that converts to an Md5Ext hash the
   binding of that hash is removed from [files_by_uid], then [file] is
   removed from [current_files] with [List2.removeq].
   NOTE(review): the remaining match arm and the list being iterated are
   not present in this view. *)
485 let remove_file file =
486 List.iter (fun uid ->
487 match Uid.to_uid
uid with
488 Md5Ext hash
-> Hashtbl.remove
files_by_uid hash
491 current_files := List2.removeq
file !current_files
(* NOTE(review): the binding header of this definition is not present in
   this view.  The body calls [CommonOptions.client_ip] with [Some sock]
   when [sock] is a [Connection], and with [None] otherwise. *)
494 CommonOptions.client_ip
495 (match sock
with Connection sock
-> Some sock
| _
-> None
)
(* NOTE(review): the binding header and the match arms of this
   definition are not present in this view.  The visible branch releases
   both the [in_cipher] and the [out_cipher] of the server with
   [cipher_free], then resets [s.server_ciphers] to [None]. *)
498 match s.server_ciphers
with
501 cipher_free ciphers
.in_cipher
;
502 cipher_free ciphers
.out_cipher
;
503 s.server_ciphers
<- None
(* Tears down the connection to server [s] for [reason].
   Visible steps: when verbose, logs host, port, the connection time
   ([int64_time ()] minus [s.server_connected]) and the reason; closes
   the socket; resets [s.server_sock] to [NoConnection]; sets the state
   to [NotConnected (reason, -1)]; sets [s.server_need_qrt] to true; and
   removes [s] from [connected_servers] if present.
   NOTE(review): the match arms on [s.server_sock] and on
   [server_state s], and any use of [nservers], are not present in this
   view. *)
505 let disconnect_from_server nservers s reason
=
506 match s.server_sock
with
508 let h = s.server_host
in
509 (match server_state s with
511 let connection_time = Int64.to_int
(
512 (int64_time
()) -- s.server_connected
) in
513 if !verbose
then lprintf
"DISCONNECT FROM SERVER %s:%d after %d seconds [%s]\n"
514 (Ip.string_of_addr
h.host_addr
) h.host_port
516 (string_of_reason reason
)
(* NOTE(review): every exception raised by [close] is discarded here;
   consider narrowing the handler. *)
520 (try close sock reason
with _
-> ());
521 s.server_sock
<- NoConnection
;
523 set_server_state s (NotConnected
(reason
, -1));
524 s.server_need_qrt
<- true;
526 if List.memq
s !connected_servers then
527 connected_servers := List2.removeq
s !connected_servers
(* NOTE(review): the binding header and the end of this definition are
   not present in this view.
   The visible part reads [!!global_login]; when it differs from
   [!old_client_name] it stores the first min(32, length) characters in
   [ft_client_name], remembers the login in [old_client_name] and calls
   [String2.replace_char] on [!ft_client_name] with a space and an
   underscore.
   NOTE(review): [!=] is physical inequality in OCaml, so two equal
   strings held in distinct values still count as different and the
   branch is taken again; structural [<>] is probably what is meant.
   NOTE(review): the result of [String2.replace_char] is sequenced away,
   so this relies on an in-place update of the string; confirm against
   the helper. *)
532 let name = !!global_login
in
533 if name != !old_client_name then begin
534 let len = String.length
name in
535 ft_client_name := String.sub
name 0 (min
32 len);
536 old_client_name := name;
537 String2.replace_char
!ft_client_name ' ' '_'
;
(*************************************************************
   Define a function to be called when the "mem_stats" command
   is used to display information on structure footprint.
**************************************************************)
(* Registers a memory-statistics printer under the name FasttrackGlobals.
   The callback appends the length of [!!old_files] to [buf]; its
   [level] argument is not used in the visible part.
   NOTE(review): the enclosing binding and any further output lines are
   not present in this view. *)
547 Heap.add_memstat
"FasttrackGlobals" (fun level buf
->
548 Printf.bprintf buf
"Number of old files: %d\n" (List.length
!!old_files
))