1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
(*
    This file is part of mldonkey.

    mldonkey is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    mldonkey is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with mldonkey; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*)
20 open CommonInteractive
27 open TcpBufferedSocket
34 open CommonComplexOptions
(* Tag identifying this plugin in log output; it is the first argument
   handed to the prefixed logging primitives. *)
let log_prefix : string = "[Gnutella]"
50 lprintf_nl2
log_prefix fmt
53 lprintf2
log_prefix fmt
(* Mutable flag, initially cleared.
   NOTE(review): presumably raised when the shared-file list has to be
   refreshed -- confirm against the code that sets and reads it. *)
let should_update_shared_files : bool ref = ref false
59 let network = new_network
"GNUT" "Gnutella"
(* Connection manager taken from the Gnutella network record. *)
let connection_manager = network.network_connection_manager

(* Operation tables registered for [network], one per kind of entity.
   Each table is built by the constructor of the matching Common* module;
   the annotations pin the plugin-side value type carried by the table. *)
let server_ops : server CommonServer.server_ops =
  CommonServer.new_server_ops network

let room_ops : server CommonRoom.room_ops =
  CommonRoom.new_room_ops network

let user_ops : user CommonUser.user_ops =
  CommonUser.new_user_ops network

let file_ops : file CommonFile.file_ops =
  CommonFile.new_file_ops network

let client_ops : client CommonClient.client_ops =
  CommonClient.new_client_ops network
(* Plugin-level wrappers around the generic accessors of the same name.
   Each one shadows the previously visible function and accepts the
   Gnutella record instead of the generic value stored inside it.
   The definition order matters: once [as_client] / [as_file] are
   redefined here, the wrappers below use the redefined versions. *)

(* Generic client value stored in a Gnutella client record. *)
let as_client gclient = as_client gclient.client_client

(* Generic file value stored in a Gnutella file record. *)
let as_file gfile = as_file gfile.file_file

(* Size recorded in the file implementation record. *)
let file_size gfile = gfile.file_file.impl_file_size

let file_downloaded gfile =
  let generic = as_file gfile in
  file_downloaded generic

(* Age recorded in the file implementation record. *)
let file_age gfile = gfile.file_file.impl_file_age

let file_fd gfile =
  let generic = as_file gfile in
  file_fd generic

let file_disk_name gfile =
  let generic = as_file gfile in
  file_disk_name generic

let file_state gfile =
  let generic = as_file gfile in
  file_state generic

let file_num gfile =
  let generic = as_file gfile in
  file_num generic

let file_must_update gfile =
  let generic = as_file gfile in
  file_must_update generic

let client_must_update gclient =
  let generic = as_client gclient in
  client_must_update generic
(* Files known to this plugin; starts empty.  Files are pushed on
   creation and taken out again on removal elsewhere in this module. *)
let current_files : GnutellaTypes.file list ref = ref []

(* TCP server socket cell; holds [None] until a socket is stored. *)
let listen_sock : TcpServerSocket.t option ref = ref None
100 (*let hosts_by_key = Hashtbl.create 103 *)
(* Table of local searches indexed by their Md4 identifier
   (initial size hint: 11 buckets). *)
let searches_by_uid : (Md4.t, local_search) Hashtbl.t =
  Hashtbl.create 11
(* Flag cell, initially [false].
   NOTE(review): presumably records whether a redirector connection is
   currently established -- confirm against the connection code. *)
let redirector_connected : bool ref = ref false

(* let redirectors_ips = ref ( [] : Ip.t list) *)

(* Redirectors, given as strings, still left to try; starts empty. *)
let redirectors_to_try : string list ref = ref []
111 (*let (shareds_by_uid : (uid_type, shared) Hashtbl.t) = Hashtbl.create 13 *)
(* Downloads indexed by uid: every uid of a file is bound to that file. *)
let files_by_uid = Hashtbl.create 13

(* let files_by_key = Hashtbl.create 13 *)

(* Users indexed by the location kind they were created from. *)
let users_by_uid = Hashtbl.create 127

(* Clients indexed by the location kind they were created from. *)
let clients_by_uid = Hashtbl.create 127
117 (* We don't want to support this feature anymore as it is too old.
119 let results_by_key = Hashtbl.create 127 *)
(* Search results indexed by uid (initial size hint: 127 buckets). *)
let results_by_uid : (uid_type, result) Hashtbl.t =
  Hashtbl.create 127
(* Size, in bytes, of the shared upload buffer (100 KiB). *)
let max_upload_buffer_len = 102400

(* Scratch buffer of [max_upload_buffer_len] bytes; its initial contents
   are unspecified.
   [Bytes.create] replaces [String.create], which has been deprecated
   since OCaml 4.02 and no longer exists in OCaml 5; on 4.02 and later
   both return the same type, so users of the buffer are unaffected. *)
let upload_buffer = Bytes.create max_upload_buffer_len
128 (***************************************************************
134 ****************************************************************)
(* From the main workflow, hosts are moved to these workflows when they
   are ready to be connected. They will only be connected once connections
   become available. We separate g1/g2, and g0 (unknown kind). *)
(* Hosts of kind Ultrapeer waiting for a connection. *)
let ultrapeers_waiting_queue : host Queue.t = Queues.workflow ready

(* peers are only tested when no ultrapeers are available... *)
let peers_waiting_queue : host Queue.t = Queues.workflow ready

(* These are the peers that we should try to contact by UDP *)
let waiting_udp_queue : host Queue.t = Queues.workflow ready

(* These are the peers that have replied to our UDP requests *)
let active_udp_queue : host Queue.t = Queues.fifo ()
(* Servers currently counted as connected; starts empty.  Servers are
   taken out of this list when they are disconnected or removed. *)
let connected_servers : server list ref = ref []
157 module H
= CommonHosts.Make
(struct
158 include GnutellaTypes
166 | Ultrapeer
-> ultrapeers_waiting_queue
167 | (_
) -> peers_waiting_queue
175 let default_requests kind
= [Tcp_Connect
,0; Udp_Connect
,0]
177 let max_ultrapeers = max_known_ultrapeers
178 let max_peers = max_known_peers
181 let find_server ip port
=
183 let h = Hashtbl.find
H.hosts_by_key (ip
,port
) in
187 let check_server_country_code s
=
188 if Geoip.active
() then
189 match s
.server_country_code
with
191 s
.server_country_code
<-
192 Geoip.get_country_code_option
(Ip.ip_of_addr s
.server_host
.host_addr
)
195 let new_server ip port
=
196 let h = H.new_host ip port Ultrapeer
in
197 match h.host_server
with
201 server_server
= server_impl
;
202 server_ciphers
= None
;
204 server_country_code
= None
;
205 server_sock
= NoConnection
;
206 server_agent
= "<unknown>";
207 server_description
= "";
208 server_nfiles
= Int64.zero
;
210 server_nusers
= Int64.zero
;
211 server_maxnusers
= 0L;
212 server_need_qrt
= true;
213 server_ping_last
= Md4.random
();
215 server_nfiles_last
= Int64.zero
;
219 server_connected
= zero
;
220 server_query_key
= NoUdpSupport
;
221 server_searches
= Fifo.create
();
222 server_shared
= Intset.empty
;
225 dummy_server_impl
with
227 impl_server_ops
= server_ops
;
229 server_add server_impl
;
230 h.host_server
<- Some
s;
231 h.host_on_remove
<- (fun _
-> server_remove
(as_server server_impl
));
232 check_server_country_code s;
(* Parse [arg] as a single uid with [Uid.of_string], then hand the
   one-element list to [Uid.expand] and return its result. *)
let extract_uids arg =
  let parsed = Uid.of_string arg in
  Uid.expand [ parsed ]
(* Sources known for each result, indexed by the stored result number.
   Each binding is a mutable list cell that source entries are consed
   onto (initial size hint: 1000 buckets). *)
let result_sources =
  Hashtbl.create 1000
239 let add_source r
(s : user
) (index
: file_uri
) =
242 Hashtbl.find
result_sources r
.stored_result_num
245 Hashtbl.add
result_sources r
.stored_result_num
ss;
248 let key = (s, index
) in
249 if not
(List.mem_assq
key !ss) then begin
250 ss := (key, last_time
()) :: !ss
253 let new_result file_name
file_size (tags
: CommonTypes.tag list
) (uids
: Uid.t list
) sources
=
256 lprintf "New result by key\n";
257 let key = (file_name, file_size) in
259 Hashtbl.find results_by_key key
261 let r = { dummy_result with
262 result_names = [file_name];
263 result_size = file_size;
265 (* TODO: result_netfid, result_network *)
269 let r = update_result_num
r in
270 Hashtbl.add
results_by_key key r;
272 failwith
"Result without UID dropped"
273 | uid
:: other_uids
->
275 lprintf
"New result by UID\n";
278 let r = Hashtbl.find results_by_uid
(Uid.to_uid uid
) in
282 let tags = update_or_create_avail
tags in
284 let r = { dummy_result
with
285 result_names
= [file_name
];
286 result_size
= file_size;
289 result_source_network
= network.network_num
;
292 let rs = update_result_num
r in
293 Hashtbl.add results_by_uid
(Uid.to_uid uid
) rs;
296 (* let r = IndexedResults.get_result rs in
297 let rec iter_uid uid =
298 if not (List.mem uid r.result_uids) then begin
299 r.result_uids <- uid :: r.result_uids;
301 let rrs = Hashtbl.find results_by_uid uid in
303 let result_uids = rr.result_uids in
304 rr.result_uids <- [];
305 List.iter (fun uid ->
306 Hashtbl.remove results_by_uid uid) result_uids;
307 List.iter (fun uid -> iter_uid uid) result_uids;
308 List.iter (fun ( (s: user) , (index: file_uri) ) ->
313 Hashtbl.add results_by_uid uid r;
316 List.iter iter_uid other_uids;
(* One mebibyte (1024 * 1024 bytes) as an [int64]. *)
let megabyte = Int64.mul 1024L 1024L

(* Ten mebibytes as an [int64]. *)
let megabytes10 = Int64.mul 10L (Int64.mul 1024L 1024L)
323 let new_file file_temporary file_name
file_size file_uids user group
=
324 let file_temp = Filename.concat
!!temp_directory file_temporary
in
325 let t = Unix32.create_rw
file_temp in
327 file_file
= file_impl
;
328 file_temp = file_temporary
;
329 file_name
= file_name
;
331 file_uids
= file_uids
;
334 file_clients_queue
= Queues.workflow
(fun _
-> false);
335 file_nconnected_clients
= 0;
338 (dummy_file_impl
()) with
339 impl_file_fd
= Some
t;
340 impl_file_size
= file_size;
341 impl_file_downloaded
= Int64.zero
;
342 impl_file_owner
= user
;
343 impl_file_group
= group
;
344 impl_file_val
= file;
345 impl_file_ops
= file_ops
;
346 impl_file_age
= last_time
();
347 impl_file_best_name
= file_name
;
348 impl_file_filenames
= [file_name
];
352 lprintf_nl "SET SIZE : %Ld" file_size;
353 let kernel = CommonSwarming.create_swarmer
file_temp file_size in
354 let swarmer = CommonSwarming.create
kernel (as_file file) megabyte in
355 CommonSwarming.set_verifier
swarmer ForceVerification
;
357 (* TODO: we could generalize this approach to any UID that is computed
358 on the complete file (md5, sha1,...) *)
359 if file_size < !!sha1_verification_threshold
then
360 List.iter
(fun uid
->
361 match Uid.to_uid uid
with
363 CommonSwarming.set_verifier
swarmer (Verification
[| uid
|])
365 file.file_swarmer
<- Some
swarmer;
366 current_files := file :: !current_files;
367 file_add file_impl FileDownloading
;
(* Control-flow exception carrying an already registered file.  It is
   raised from inside a uid lookup loop so that the enclosing handler
   can reuse that file instead of creating a new one. *)
exception FileFound of file
372 let new_file file_id file_name
file_size file_uids user group
=
373 (* if file_uids = [] then
374 try Hashtbl.find files_by_key (file_name, file_size) with
376 let file = new_file file_id file_name file_size in
377 Hashtbl.add files_by_key (file_name, file_size) file;
381 List.iter
(fun uid
->
382 try raise
(FileFound
(Hashtbl.find
files_by_uid uid
))
385 let file = new_file file_id file_name
file_size file_uids user group
in
386 List.iter
(fun uid
->
388 lprintf
"Adding file %s\n" (Uid.to_string uid
);
389 Hashtbl.add
files_by_uid uid
file) file_uids
;
391 with FileFound
file ->
392 List.iter
(fun uid
->
393 if not
(List.mem uid
file.file_uids
) then begin
394 file.file_uids
<- uid
:: file.file_uids
;
395 Hashtbl.add
files_by_uid uid
file;
402 let s = Hashtbl.find users_by_uid kind
in
407 user_user
= user_impl
;
408 user_uid
= (match kind
with
409 Known_location _
-> Md4.null
410 | Indirect_location
(_
, uid
, _
, _
) -> uid
);
412 (* user_files = []; *)
415 (* user_gnutella2 = false; *)
420 impl_user_ops
= user_ops
;
421 impl_user_val
= user;
424 Hashtbl.add users_by_uid kind
user;
427 let check_client_country_code c
=
428 if Geoip.active
() then
429 match c
.client_country_code
with
431 (match c
.client_host
with
433 c
.client_country_code
<- Geoip.get_country_code_option ip
437 let new_client kind
=
439 Hashtbl.find clients_by_uid kind
441 let user = new_user kind
in
443 client_client
= impl
;
444 client_sock
= NoConnection
;
445 (* client_name = name;
446 client_kind = None; *)
447 client_requests
= [];
449 client_pos = Int32.zero;
450 client_error = false;
453 client_all_files
= None
;
455 client_connection_control
= new_connection_control
(());
456 client_downloads
= [];
458 client_country_code
= None
;
459 client_reconnect
= false;
460 client_in_queues
= [];
461 client_connected_for
= None
;
462 client_support_head_request
= true;
465 dummy_client_impl
with
467 impl_client_ops
= client_ops
;
468 impl_client_upload
= None
;
471 Hashtbl.add clients_by_uid kind
c;
474 let add_download file c index
=
475 (* let r = new_result file.file_name (file_size file) in *)
476 (* add_source r c.client_user index; *)
478 lprintf
"Adding file to client\n";
479 if not
(List.memq
c file.file_clients
) then begin
480 let chunks = [ Int64.zero
, file_size file ] in
481 (* let up = CommonSwarming.register_uploader file.file_swarmer
482 (CommonSwarming.AvailableRanges chunks) in *)
483 c.client_downloads
<- c.client_downloads
@ [{
484 download_file
= file;
485 download_uri
= index
;
486 download_chunks
= chunks;
487 download_ranges
= [];
488 download_blocks
= [];
489 download_uploader
= None
;
490 download_head_requested
= false;
491 download_ttr_requested
= false;
493 file.file_clients
<- c :: file.file_clients
;
494 file_add_source
(as_file file) (as_client c)
497 let rec find_download file list
=
499 [] -> raise Not_found
501 if d
.download_file
== file then d
else find_download file tail
503 let rec find_download_by_index index list
=
505 [] -> raise Not_found
507 match d
.download_uri
with
508 FileByIndex
(i
,_
) when i
= index
-> d
509 | _
-> find_download_by_index index tail
511 let remove_download file list
=
512 let rec iter file list rev
=
516 if d
.download_file
== file then
517 iter file tail rev
else
518 iter file tail
(d
:: rev
)
523 server_num (as_server
s.server_server
)
526 server_state (as_server
s.server_server
)
(* Plugin-level wrapper: apply the previously visible [set_server_state]
   to the generic server stored in [s.server_server]. *)
let set_server_state s state =
  let generic = as_server s.server_server in
  set_server_state generic state
532 let server_remove s =
533 connected_servers := List2.removeq s !connected_servers;
534 (* Hashtbl.remove servers_by_key (s.server_ip, s.server_port)*)
(* Plugin-level wrappers: each converts the Gnutella client with
   [as_client] and forwards to the generic function. *)

let client_type c =
  let generic = as_client c in
  client_type generic

let set_client_state client state =
  let generic = as_client client in
  CommonClient.set_client_state generic state

let set_client_disconnected client =
  let generic = as_client client in
  CommonClient.set_client_disconnected generic
547 let remove_file file =
548 (* if file.file_uids = [] then
549 Hashtbl.remove files_by_key (file.file_name, file.file_file.impl_file_size)
551 List.iter (fun uid
->
553 lprintf
"******REMOVE %s\n" (Uid.to_string uid
);
554 Hashtbl.remove
files_by_uid uid
556 current_files := List2.removeq
file !current_files
(* UDP socket cell; holds [None] until a socket is stored. *)
let udp_sock : UdpSocket.t option ref = ref None
561 CommonOptions.client_ip
562 (match sock
with Connection sock
-> Some sock
| _
-> None
)
564 let disconnect_from_server s r =
566 lprintf_nl "disconnect_from_server %s" (string_of_reason
r);
567 match s.server_sock
with
569 let h = s.server_host
in
570 (match server_state s with
572 let connection_time = Int64.to_int
(
573 Int64.sub
(int64_time
()) s.server_connected
) in
575 lprintf_nl "disconnect_from_connected_server %s:%d after %d seconds (%s)\n"
576 (Ip.string_of_addr
h.host_addr
) h.host_port
577 connection_time (string_of_reason
r)
581 (try close sock
r with _
-> ());
582 s.server_sock
<- NoConnection
;
583 set_server_state s (NotConnected
(r, -1));
584 s.server_need_qrt
<- true;
586 if List.memq
s !connected_servers then
587 connected_servers := List2.removeq
s !connected_servers
592 let parse_magnet url =
593 let url = Url.of_string url in
594 if url.Url.file = "magnet:" then
597 List.iter (fun (value, arg) ->
598 if String2.starts_with value "xt" then
599 uids := (extract_uids arg) @ !uids
601 if String2.starts_with value "dn" then
602 name := Url.decode arg
605 (* This is an error in the magnet, where a & has been kept instead of being
607 name := Printf.sprintf
"%s&%s" !name value
609 lprintf
"MAGNET: unused field %s = %s\n"
617 String2.replace_char
s '
\r' '
\n'
;
618 String2.replace_char
s ' ' '
\n'
621 let name = !!global_login
in
622 let len = String.length
name in
623 if len < 32 then name else String.sub
name 0 32
625 (*************************************************************
627 Define a function to be called when the "mem_stats" command
628 is used to display information on structure footprint.
630 **************************************************************)
633 (* let network_info = CommonNetwork.network_info network in *)
634 let name = network.network_name ^
"Globals" in
635 Heap.add_memstat
name (fun level buf
->
636 Printf.bprintf buf
"Number of old files: %d\n" (List.length
!!old_files