1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 open TcpBufferedSocket
34 open CommonComplexOptions
44 open FasttrackProtocol
45 open FasttrackComplexOptions
48 let load_nodes_file filename
=
49 let regexp = Str.regexp "^\\([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+\\) \\([0-9]+\\) .*$" in
50 Unix2.tryopen_read filename
(fun cin
->
54 let line = input_line cin
in
56 if Str.string_match
regexp line 0 then
57 let ip = Ip.addr_of_string
(Str.matched_group
1 line) in
58 let port = int_of_string
(Str.matched_group
2 line) in
60 ignore
(H.new_host
ip port Ultrapeer
)
63 lprintf_nl
"Syntax error in %s" filename
;
66 with End_of_file
-> ()
69 let unpack_nodes_gzip filename url
=
70 let ext = String.lowercase
(Filename2.extension filename
) in
71 let last_ext = String.lowercase
(Filename2.last_extension filename
) in
72 let real_ext = if last_ext = ".zip" then last_ext else ext in
76 Misc.archive_extract filename
"gz"
78 lprintf_nl
"Exception %s while extracting from %s" (Printexc2.to_string e
) url
;
84 CommonWeb.add_web_kind
"nodes.gzip" "List of fasttrack nodes"
86 lprintf_nl
"nodes.gzip loaded from %s" url
;
87 let f = unpack_nodes_gzip filename url
in
89 if f <> filename
then Sys.remove
f
92 let server_parse_after s gconn sock
=
94 match s
.server_ciphers
with
98 (* if !verbose_msg_raw then
99 lprintf "server_parse_after: %d bytes\n" b.len; *)
103 let size = TcpMessages.packet_size ciphers
b.buf
b.pos
b.len in
108 let msg = String.sub
b.buf
b.pos
size in
110 let addr, t
= TcpMessages.parse ciphers
msg in
111 FasttrackHandler.server_msg_handler sock s
addr t
;
116 lprintf
"Exception %s in server_parse_after\n"
117 (Printexc2.to_string e
);
118 close sock
(Closed_for_error
"Reply not understood")
120 let server_connection_hook = ref (None
: (server
-> unit) option)
122 let greet_supernode s
=
123 (match !server_connection_hook with
125 server_send s
TcpMessages.DirectPacket
(
126 TcpMessages.NodeInfoReq
(
127 client_ip s
.server_sock
,
132 (* ; server_send_ping s *)
134 let server_parse_netname s gconn sock
=
135 let b = TcpBufferedSocket.buf sock
in
137 let start_pos = b.pos
in
138 let end_pos = start_pos + len in
140 let net = String.sub
buf start_pos len in
141 if !verbose_msg_raw
then
142 lprintf
"net:[%s]\n" (String.escaped
net);
144 if pos
< end_pos then
145 if buf.[pos
] = '
\000'
then begin
146 let netname = String.sub
buf start_pos (pos
-start_pos) in
147 if !verbose_msg_raw
then
148 lprintf
"netname: [%s]\n" (String.escaped
netname);
149 buf_used
b (pos
-start_pos+1);
150 match s
.server_ciphers
with
153 gconn
.gconn_handler
<-
154 CipherReader
(ciphers
.in_cipher
, server_parse_after s
);
161 let server_parse_cipher s gconn sock
=
162 H.connected s
.server_host
;
163 let b = TcpBufferedSocket.buf sock
in
165 match s
.server_ciphers
with
168 if !verbose_msg_raw
then
169 lprintf
"Cipher received from server\n";
170 get_cipher_from_packet
b.buf b.pos ciphers
.in_cipher
;
171 init_cipher ciphers
.in_cipher
;
173 xor_ciphers ciphers
.out_cipher ciphers
.in_cipher
;
174 init_cipher ciphers
.out_cipher
;
177 server_crypt_and_send s ciphers
.out_cipher
(network_name ^
"\000");
178 gconn
.gconn_handler
<- CipherReader
(ciphers
.in_cipher
, server_parse_netname s
);
179 if !verbose_msg_raw
then
180 lprintf
"waiting for netname\n"
182 let client_cipher_seed () =
183 (* Int32.of_int (Random.int max_int) *)
186 let connection_header_hook = ref None
188 let connect_server h
=
189 let s = match h
.host_server
with
191 let s = new_server h
.host_addr h
.host_port
in
192 h
.host_server
<- Some
s;
196 match s.server_sock
with
200 add_pending_connection connection_manager
(fun token ->
203 let ip = Ip.ip_of_addr h
.host_addr
in
204 if not
(Ip.valid
ip) then
205 failwith
"Invalid IP for server\n";
206 let port = s.server_host
.host_port
in
207 if !verbose_msg_servers
then
208 lprintf
"CONNECT TO %s:%d\n"
209 (Ip.string_of_addr h
.host_addr
) port;
210 H.set_request h Tcp_Connect
;
212 (* Standard Kazaa clients send a ping first, and only connect if they
213 receive a Supernode Pong. We send the ping only to get the latency. *)
214 udp_send
ip port true (
215 let module M
= UdpMessages
in
216 M.PingReq
(169, "\128", "KaZaA"));
218 let ip = Ip.to_inet_addr
ip in
219 let sock = connect
token "fasttrack to server"
223 BASIC_EVENT
(RTIMEOUT
|LTIMEOUT
) ->
224 (* lprintf "RTIMEOUT\n"; *)
225 disconnect_from_server nservers
s Closed_for_timeout
228 TcpBufferedSocket.set_read_controler
sock download_control
;
229 TcpBufferedSocket.set_write_controler
sock upload_control
;
231 set_server_state
s Connecting
;
232 s.server_sock
<- Connection
sock;
234 set_fasttrack_sock
sock !verbose_msg_servers
235 (Reader
(server_parse_cipher s)
237 set_closer
sock (fun _ error
->
238 (* lprintf "CLOSER %s\n" error; *)
239 disconnect_from_server nservers
s error
);
240 set_rtimeout
sock !!server_connection_timeout
;
242 let in_cipher = create_cipher
() in
243 let out_cipher = create_cipher
() in
244 s.server_ciphers
<- Some
{
245 in_cipher = in_cipher;
246 out_cipher = out_cipher;
250 set_cipher
out_cipher (client_cipher_seed ()) 0x29;
252 let s = String.create
12 in
254 (match !connection_header_hook with
262 cipher_packet_set
out_cipher s 4;
264 if !verbose_msg_raw
then begin
265 lprintf
"SENDING %s\n" (String.escaped
s);
270 disconnect_from_server nservers
s Closed_connect_failed
273 s.server_sock
<- ConnectionWaiting
token;
276 let get_file_from_source c file
=
278 if connection_can_try c
.client_connection_control
then begin
279 connection_try c
.client_connection_control
;
280 match c
.client_user
.user_kind
with
281 Indirect_location
("", uid
, _, _) ->
283 lprintf "++++++ ASKING FOR PUSH +++++++++\n";
285 (* do as if connection failed. If it connects, connection will be set to OK *)
286 connection_failed c
.client_connection_control
;
288 let uri = (find_download file c
.client_downloads
).download_uri
in
290 FasttrackProto.server_send_push
s uid
uri
291 ) !connected_servers
;
293 lprintf
"PUSH NOT IMPLEMENTED\n"
295 if not
(List.memq file c
.client_in_queues
) then begin
296 Queue.put file
.file_clients_queue
(1,c
);
297 c
.client_in_queues
<- file
:: c
.client_in_queues
301 lprintf
"get_file_from_source: exception %s\n" (Printexc2.to_string e
)
305 let disconnect_server s r
=
306 match s.server_sock
with
307 | Connection
sock -> close
sock r
308 | ConnectionWaiting
token ->
310 s.server_sock
<- NoConnection
;
315 let really_recover_file file
=
318 if not
(Fifo.mem
s.server_searches ss
) then
319 Fifo.put
s.server_searches ss
323 let really_download_file (r
: CommonTypes.result_info
) user group
=
327 (match Uid.to_uid uid
with
328 Md5Ext hash
-> hash
, Uid.to_file_string uid
330 | [] -> raise IgnoreNetwork
332 let hash,file_temp
= iter r
.result_uids
in
334 let file = new_file file_temp
(List.hd r
.result_names
)
335 r
.result_size
[Uid.create
(Md5Ext
hash)] user group
in
337 lprintf
"DOWNLOAD FILE %s\n" file.file_name
;
338 if not
(List.memq
file !current_files
) then begin
339 current_files
:= file :: !current_files
;
342 let sources = Hashtbl.find result_sources r
.result_num
in
343 List.iter (fun (user
, _) ->
344 let c = new_client user
.user_kind
in
345 add_download
file c ();
346 get_file_from_source c file;
351 let ask_for_files () = (* called every minute *)
352 List.iter (fun file ->
354 get_file_from_source c file
357 let module M
= TcpMessages
in
360 let ss = Fifo.take
s.server_searches
in
361 match ss.search_search
with
362 FileUidSearch
(file,file_hash
) ->
363 if file_state
file = FileDownloading
then
364 server_send
s M.DirectPacket
366 (32, ss.search_uid
, M.QueryLocationReq file_hash
))
367 | UserSearch
(_, words
, (realm
, tags
)) ->
375 | "application" -> 0x25
379 server_send
s M.DirectPacket
381 (32, ss.search_uid
, M.QueryFilesReq
382 (words
, realm, tags
)))
384 ) !connected_servers
;
388 server_ops
.op_server_disconnect
<- (fun s ->
389 disconnect_server s Closed_by_user
);
390 server_ops
.op_server_remove
<- (fun s ->
391 disconnect_server s Closed_by_user
395 Int64.to_int
((file_size
file) // min_range_size
) + 5
397 let manage_hosts () =
399 List.iter (fun file ->
400 if file_state
file = FileDownloading
then
402 (* For each file, we allow only (nranges+5) simultaneous communications,
403 to prevent too many clients from saturing the line for only one file. *)
404 let max_nconnected_clients = nranges file in
405 while file.file_nconnected_clients
< max_nconnected_clients do
406 let (_,c) = Queue.take
file.file_clients_queue
in
407 c.client_in_queues
<- List2.removeq
file c.client_in_queues
;
408 FasttrackClients.connect_client
c
413 let rec find_ultrapeer queue
=
414 let (next
,h
) = Queue.head queue
in
416 if next
> last_time
() then begin
417 (* lprintf "not ready: %d s\n" (next - last_time ()); *)
420 ignore
(H.host_queue_take queue
);
422 with _ -> find_ultrapeer queue
424 let try_connect_ultrapeer connect
=
425 (* lprintf "try_connect_ultrapeer....\n"; *)
428 find_ultrapeer ultrapeers_waiting_queue
430 (* lprintf "not in ultrapeers_waiting_queue\n"; *)
433 (* lprintf "contacting..\n"; *)
436 let connect_servers connect
=
437 (* lprintf "connect_servers %d %d\n" !nservers !!max_ultrapeers; *)
438 (if !!max_ultrapeers
> List.length
!connected_servers
then
440 let to_connect = 3 * (!!max_ultrapeers
- !nservers
) in
441 for i
= 1 to to_connect do
442 (* lprintf "try_connect_ultrapeer...\n"; *)
443 try_connect_ultrapeer connect
447 (* Looks like there is no ping to send in Fasttrack? *)
448 let send_pings () = ()
451 let s_uid = ss.search_uid
in
452 let module M
= TcpMessages
in
454 match ss.search_search
with
455 UserSearch
(_, words
, (realm, tags
)) ->
462 | "application" -> 0x25
465 M.QueryFilesReq
(words
, realm, tags
)
466 | FileUidSearch
(_, file_hash
) ->
467 M.QueryLocationReq file_hash
470 FasttrackProto.server_send
s M.DirectPacket
(
471 M.SearchReq
(32, s_uid, t))) !connected_servers
476 push request: we send a push to the server when we cannot connect to
477 a particular client. The client by connecting to us with a
478 "GIVE <push_id>\r\n" request, to which we can reply by a "GET ...." *)