1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 open CommonComplexOptions
27 open TcpBufferedSocket
36 open OpenFTComplexOptions
38 module DG
= CommonGlobals
39 module DO
= CommonOptions
41 let disconnect_from_server s
=
42 match s
.server_sock
with
46 lprintf "DISCONNECT FROM SERVER %s:%d"
47 (Ip.to_string s.server_ip) s.server_port;
50 close sock
"disconnect";
51 s
.server_sock
<- None
;
52 set_server_state s NotConnected
;
54 connection_failed s
.server_connection_control
;
55 connected_servers
:= List2.removeq s
!connected_servers
;
56 if s
.server_type
= User_node
then
57 Hashtbl.remove servers_by_key
(s
.server_ip
, s
.server_port
)
60 lprintf
"OpenFTServers.ask_for_files not implemented"; lprint_newline
()
63 lprintf
"OpenFTServers.send_pings not implemented"; lprint_newline
()
66 lprintf
"recover_files"; lprint_newline
();
67 let module Q
= Search
in
68 Hashtbl.iter
(fun _ file
->
69 if file_state file
= FileDownloading
then begin
73 Q.search_type
= Q.Search_md5
;
74 Q.words
= (String.lowercase
(Md4.to_string file
.file_md5
)) ;
77 Q.size_min
= Int64.zero
;
78 Q.size_max
= Int64.zero
;
79 Q.kbps_min
= Int64.zero
;
80 Q.kbps_max
= Int64.zero
;
83 match s
.server_sock
with
85 | Some sock
-> server_send sock
t
90 let recover_files_from_server sock
=
91 lprintf
"recover_files_from_server"; lprint_newline
();
92 let module Q
= Search
in
93 Hashtbl.iter
(fun _ file
->
94 if file_state file
= FileDownloading
then begin
98 Q.search_type
= Q.Search_md5
;
99 Q.words
= (String.lowercase
(Md4.to_string file
.file_md5
)) ;
102 Q.size_min
= Int64.zero
;
103 Q.size_max
= Int64.zero
;
104 Q.kbps_min
= Int64.zero
;
105 Q.kbps_max
= Int64.zero
;
113 let send_query keywords
=
114 let module Q
= Search
in
115 let words = String2.unsplit keywords ' '
in
119 Q.search_type
= Q.Search_filename
;
123 Q.size_min
= Int64.zero
;
124 Q.size_max
= Int64.zero
;
125 Q.kbps_min
= Int64.zero
;
126 Q.kbps_max
= Int64.zero
;
129 match s
.server_sock
with
131 | Some sock
-> server_send sock
t
132 ) !connected_servers
;
137 let module Q = QueryReply in
138 let user = new_user t.Q.guid (match t.Q.dont_connect with
139 Some true -> Indirect_location ("", t.Q.guid)
140 | _ -> Known_location(t.Q.ip, t.Q.port))
142 user.user_speed <- t.Q.speed;
146 (* NOT IMPLEMENTED YET
148 let send_query min_speed keywords xml_query =
149 let module Q = Query in
150 let words = String2.unsplit keywords ' ' in
154 Q.xml_query = "" } in
155 let p = new_packet t in
156 if !!verbose_servers > 0 then begin
157 lprintf "sending query for <%s>" words; lprint_newline ();
160 match s.server_sock with
162 | Some sock -> server_send sock p
163 ) !connected_servers;
166 let extension_list = [
167 "mp3" ; "avi" ; "jpg" ; "jpeg" ; "txt" ; "mov" ; "mpg"
170 let rec remove_short list list2 =
174 if List.mem s extension_list then
175 remove_short list (s :: list2) else
177 if String.length s < 5 then (* keywords should be at least 5 bytes *)
178 remove_short list list2
180 remove_short list
(s
:: list2
)
183 let s = String.lowercase
(String.copy
s) in
184 for i
= 0 to String.length
s - 1 do
187 'a'
..'z'
| '
0'
.. '
9'
-> ()
190 remove_short (String2.split
s ' '
) []
192 let get_name_keywords file_name
=
193 match stem file_name
with
195 lprintf
"Not enough keywords to recover %s" file_name
;
200 let recover_files () =
201 List.iter
(fun file
->
202 let keywords = get_name_keywords file
.file_name
204 ignore
(send_query 0 keywords "")
208 let recover_files_from_server sock
=
209 if !!verbose_servers
> 0 then begin
210 lprintf
"trying to recover files from server"; lprint_newline
();
212 List.iter
(fun file
->
213 if !!verbose_servers
> 0 then begin
214 lprintf
"FOR FILE %s" file
.file_name
; lprint_newline
();
216 let keywords = get_name_keywords file
.file_name
in
217 let words = String2.unsplit
keywords ' '
in
218 if !!verbose_servers
> 0 then begin
219 lprintf
"sending query for <%s>" words; lprint_newline
();
221 let module Q
= Query
in
225 Q.xml_query
= "" } in
226 let p = new_packet
t in
232 let redirector_to_client p sock
=
233 (* lprintf "redirector_to_client"; lprint_newline (); *)
234 match p.pkt_payload
with
236 let module P
= Pong
in
237 (* lprintf "ADDING PEER %s:%d" (Ip.to_string t.P.ip) t.P.port; *)
238 Fifo.put peers_queue
(t.P.ip
, t.P.port
);
241 let redirector_parse_header sock header
=
242 (* lprintf "redirector_parse_header"; lprint_newline ();*)
243 if String2.starts_with header gnutella_ok
then begin
244 (* lprintf "GOOD HEADER FROM REDIRECTOR:waiting for pongs";*)
245 server_send_new sock
(
246 let module P
= Ping
in
247 PingReq
(P.ComplexPing
{
248 P.ip
= DO.client_ip
(Some sock
);
249 P.port
= !!client_port
;
250 P.nfiles
= Int64.zero
;
252 P.s = "none:128:false";
255 if !!verbose_servers
>10 then begin
256 lprintf
"BAD HEADER FROM REDIRECTOR: "; lprint_newline
();
257 LittleEndian.dump header
;
259 close sock
"bad header";
260 redirector_connected
:= false;
264 let connect_to_redirector () =
265 match !redirectors_to_try
with
267 redirectors_to_try
:= !redirectors_ips
269 redirectors_to_try
:= tail
;
270 (* lprintf "connect to redirector"; lprint_newline (); *)
272 let sock = connect
"openft to redirector"
273 (Ip.to_inet_addr ip
) 6346
276 BASIC_EVENT RTIMEOUT
->
277 close
sock "timeout";
278 redirector_connected
:= false;
279 (* lprintf "TIMEOUT FROM REDIRECTOR"; lprint_newline ()*)
282 TcpBufferedSocket.set_read_controler
sock download_control
;
283 TcpBufferedSocket.set_write_controler
sock upload_control
;
286 redirector_connected
:= true;
287 set_reader
sock (handler
!!verbose_servers
redirector_parse_header
288 (gnutella_handler parse
redirector_to_client)
290 set_closer
sock (fun _ _
->
291 (* lprintf "redirector disconnected"; lprint_newline (); *)
292 redirector_connected
:= false);
293 set_rtimeout
sock 10.;
294 set_lifetime
(TcpBufferedSocket.sock sock) 120.;
295 write_string
sock "GNUTELLA CONNECT/0.4\n\n";
297 lprintf
"Exception in connect_to_redirector: %s"
298 (Printexc2.to_string e
); lprint_newline
();
299 redirector_connected
:= false
301 let add_peers headers
=
303 let up = List.assoc
"x-try-ultrapeers" headers
in
306 let len = String.length
s in
307 (* lprintf "NEW ULTRAPEER %s" s; lprint_newline ();*)
308 let pos = String.index
s '
:'
in
309 let ip = String.sub
s 0 pos in
310 let port = String.sub
s (pos+1) (len - pos - 1) in
311 let ip = Ip.of_string
ip in
312 let port = int_of_string
port in
313 (* lprintf "ADDING UP %s:%d" (Ip.to_string ip) port;
315 Fifo.put ultrapeers_queue
(ip,port ) ;
316 while Fifo.length ultrapeers_queue
> !!max_known_ultrapeers
do
317 ignore
(Fifo.take ultrapeers_queue
)
321 ) (String2.split
up '
,'
);
323 lprintf
"add_ulta_peers : %s" (Printexc2.to_string e
);
326 let up = List.assoc
"x-try" headers
in
329 let len = String.length
s in
330 (* lprintf "NEW PEER %s" s; lprint_newline (); *)
331 let pos = String.index
s '
:'
in
332 let ip = String.sub
s 0 pos in
333 let port = String.sub
s (pos+1) (len - pos - 1) in
334 let ip = Ip.of_string
ip in
335 let port = int_of_string
port in
336 (* lprintf "ADDING PEER %s:%d" (Ip.to_string ip) port;
338 Fifo.put peers_queue
(ip,port);
339 while Fifo.length peers_queue
> !!max_known_peers
do
340 ignore
(Fifo.take peers_queue
)
344 ) (String2.split
up '
,'
)
348 ascii: [ G N U T E L L A / 0 . 6 2 0 0 O K(13)(10) U s e r - A g e n t : G n u c l e u s 1 . 8 . 2 . 0(13)(10) R e m o t e - I P : 2 1 2 . 1 9 8 . 2 3 5 . 1 2 3(13)(10) X - Q u e r y - R o u t i n g : 0 . 1(13)(10) X - U l t r a p e e r : T r u e(13)(10) X - L e a f - M a x : 4 0 0(13)(10) U p t i m e : 0 D 0 3 H 3 0 M(13)(10)(13)]
354 let module Q
= QueryReply
in
355 let user = new_user
t.Q.guid
(match t.Q.dont_connect
with
356 Some
true -> Indirect_location
("", t.Q.guid
, _
, _
)
357 | _
-> Known_location
(t.Q.ip, t.Q.port))
359 user.user_speed
<- t.Q.speed
;
362 let update_client t =
363 let module Q
= QueryReply
in
364 let c = new_client
t.Q.guid
(match t.Q.dont_connect
with
365 Some
true -> Indirect_location
("", t.Q.guid
, _
, _
)
366 | _
-> Known_location
(t.Q.ip, t.Q.port))
369 c.client_user
.user_speed
<- t.Q.speed
;
372 let server_parse_header s sock header
=
373 if !!verbose_servers
> 10 then LittleEndian.dump_ascii header
;
375 if String2.starts_with header gnutella_200_ok
then begin
376 (* lprintf "GOOD HEADER FROM ULTRAPEER";
377 lprint_newline (); *)
378 set_rtimeout
sock Date.half_day_in_secs
;
379 (* lprintf "SPLIT HEADER..."; lprint_newline ();*)
380 let lines = Http_client.split_header header
in
382 [] -> raise Not_found
384 (* lprintf "CUT HEADER"; lprint_newline ();*)
385 let headers = Http_client.cut_headers
headers in
386 let agent = List.assoc
"user-agent" headers in
387 (* lprintf "USER AGENT: %s" agent; lprint_newline ();*)
388 if String2.starts_with
agent "LimeWire" ||
389 String2.starts_with
agent "Gnucleus" ||
390 String2.starts_with
agent "BearShare"
393 s.server_agent
<- agent;
394 (* lprintf "LIMEWIRE Detected"; lprint_newline ();*)
396 if List.assoc
"x-ultrapeer" headers <> "True" then begin
397 (* lprintf "NOT AN ULTRAPEER ???"; lprint_newline (); *)
401 (* lprintf "******** ULTRA PEER %s:%d *******"
402 (Ip.to_string s.server_ip) s.server_port;
403 lprint_newline (); *)
404 write_string
sock "GNUTELLA/0.6 200 OK\r\n\r\n";
405 set_server_state
s Connected_idle
;
406 connected_servers
:= s :: !connected_servers
;
407 recover_files_from_server sock
411 if String2.starts_with header gnutella_503_shielded
then begin
412 (* lprintf "GOOD HEADER FROM SIMPLE PEER";
414 let lines = Http_client.split_header header
in
416 [] -> raise Not_found
418 let headers = Http_client.cut_headers
headers in
419 let agent = List.assoc
"user-agent" headers in
420 if String2.starts_with
agent "LimeWire" ||
421 String2.starts_with
agent "Gnucleus" ||
422 String2.starts_with
agent "BearShare"
425 (* lprintf "LIMEWIRE Detected"; lprint_newline ();*)
431 (* lprintf "BAD HEADER FROM SERVER: [%s]" header; lprint_newline (); *)
436 (* lprintf "DISCONNECTION"; lprint_newline (); *)
437 disconnect_from_server s
440 lprintf "DISCONNECT WITH EXCEPTION %s" (Printexc2.to_string e);
443 disconnect_from_server s
447 let get_file_from_source c file
=
448 if connection_can_try
c.client_connection_control
then begin
449 connection_try
c.client_connection_control
;
450 let u = c.client_user
in
451 let s = u.user_server
in
452 lprintf
"******* DOWNLOAD FROM %s %d %d *******"
453 (Ip.to_string
s.server_ip
) s.server_port
s.server_http_port
;
455 if s.server_http_port
<> 0 then
457 match c.client_user.user_kind with
458 Indirect_location ("", uid) ->
459 lprintf "++++++ ASKING FOR PUSH +++++++++"; lprint_newline ();
461 (* do as if connection failed. If it connects, connection will be set to OK *)
462 connection_failed
c.client_connection_control
;
463 let module P
= Push
in
466 P.ip = DO.client_ip None
;
467 P.port = !!client_port
;
468 P.index
= List.assq file
c.client_downloads
;
470 let p = new_packet
t in
472 match s.server_sock
with
474 | Some
sock -> server_send
sock p
478 OpenFTClients.connect_client
c
481 let download_file (r
: result
) =
482 let file = new_file r
.result_md5 r
.result_name r
.result_size
in
483 lprintf
"DOWNLOAD FILE %s" file.file_name
; lprint_newline
();
484 if not
(List.memq
file !current_files
) then begin
485 current_files
:= file :: !current_files
;
487 List.iter
(fun (user, index
) ->
488 let s = user.user_server
in
489 let c = new_client
s.server_ip
s.server_port
s.server_http_port
in
490 add_download
file c index
;
491 get_file_from_source c file;
495 (* these two functions are also in dcGlobals.ml *)
497 let basename filename
=
499 let len = String.length filename
in
501 let pos = String.rindex_from filename
(len-1) '
\\'
in
502 String.sub filename
(pos+1) (len-pos-1)
506 let c1 = Char.lowercase filename
.[0] in
507 let c2 = filename
.[1] in
510 String.sub filename
2 (len -2 )
511 | _
-> raise
exit_exn
513 with _
-> Filename.basename filename
518 let server_to_client s t sock =
520 if !!verbose_servers
> 200 then begin
521 lprintf
"From server:"; lprint_newline
();
526 set_rtimeout
sock 60.;
528 let module V
= VersionReply
in
535 | VersionReplyReq
t ->
536 let module V
= VersionReply
in
537 s.server_version
<- Printf.sprintf
"%d.%d.%d"
538 t.V.major_num
t.V.minor_num
t.V.micro_num
;
539 set_server_state
s Connected_idle
;
540 connection_ok
s.server_connection_control
;
544 server_send
sock (let module N
= NodeInfoReply
in
545 NodeInfoReplyReq
{ N.ip = client_ip
(Some
sock);
547 N.http_port
= !!http_port
;
550 | NodeInfoReplyReq
t ->
551 let module N
= NodeInfoReply
in
552 s.server_http_port
<- t.N.http_port
;
553 assert (s.server_port
= t.N.port)
554 (* we should already have this information, no ? *)
557 server_send
sock (ClassReplyReq User_node
)
562 match s.server_type
with
564 server_send
sock (ChildReq None
);
565 server_send
sock (StatsReq
Stats.Retrieve_info
);
567 (* don't stay connected more than one minute to a user node *)
568 set_lifetime
sock 60.
573 server_send
sock (let module N
= NodeListReply
in
574 (NodeListReplyReq
(Some
{
576 N.port = s.server_port
;
577 N.node_type
= s.server_type
;
579 ) !connected_servers
;
580 server_send
sock (NodeListReplyReq None
)
582 | NodeListReplyReq None
-> ()
583 | NodeListReplyReq
(Some
t) ->
585 let module N
= NodeListReply
in
586 if t.N.port <> 0 then
587 let s = new_server
t.N.ip t.N.port in
588 match s.server_type
with
589 User_node
-> Fifo.put peers_queue
(t.N.ip, t.N.port)
590 | _
-> s.server_type
<- t.N.node_type
594 server_send
sock (NodeCapReplyReq
["MD5-FULL"]) (* not "ZLIB" yet *)
596 | NodeCapReplyReq
t ->
602 lprintf
"************ CONNECTED AND CHILD *************";
604 set_rtimeout
sock 3600.;
605 server_send
sock (ChildReq
(Some
true));
606 connected_servers
:= s :: !connected_servers
;
607 recover_files_from_server sock
610 disconnect_from_server s
612 | PingReq
-> server_send
sock PingReplyReq
615 | SearchReplyReq
t ->
616 lprintf
"REPLY TO QUERY"; lprint_newline
();
617 let module Q
= SearchReply
in
620 let ss = Hashtbl.find searches_by_uid
t.Q.id
in
622 let ip = if t.Q.ip = Ip.null
then s.server_ip
else t.Q.ip in
623 let user = new_user
ip t.Q.port t.Q.http_port
in
625 lprintf
"NEW RESULT %s" t.Q.filename
; lprint_newline
();
626 let result = new_result
t.Q.md5
(basename t.Q.filename
) t.Q.size
in
627 lprintf
"ADDING SOURCE FOR RESULT NOT IMPLEMENTED";
629 add_source
result user t.Q.filename
;
631 CommonInteractive.search_add_result
ss.search_search
result.result_result
;
633 lprintf
"NO SUCH SEARCH !!!!"; lprint_newline
();
636 let file = Hashtbl.find files_by_md5
t.Q.md5
in
637 let ip = if t.Q.ip = Ip.null
then s.server_ip
else t.Q.ip in
638 let user = new_user
ip t.Q.port t.Q.http_port
in
640 let result = new_result
t.Q.md5
(basename t.Q.filename
) t.Q.size
in
641 lprintf
"ADDING SOURCE FOR RESULT NOT IMPLEMENTED";
643 add_source
result user t.Q.filename
;
645 let s = user.user_server
in
646 let c = new_client
s.server_ip
s.server_port
s.server_http_port
in
647 add_download
file c t.Q.filename
;
648 get_file_from_source c file;
651 lprintf
"NO SUCH SEARCH for no file !!!!";
657 lprintf
"UNUSED MESSAGE";
662 lprintf "server_to_client"; lprint_newline ();
665 match p.pkt_payload
with
667 if p.pkt_hops
<= 3 then
670 pkt_hops
= p.pkt_hops
+ 1;
673 let module P
= Pong
in
675 P.ip = (DO.client_ip
(Some
sock));
676 P.port = !!client_port
;
683 let module P
= Pong
in
684 (* lprintf "FROM %s:%d" (Ip.to_string t.P.ip) t.P.port; *)
685 if p.pkt_uid
= s.server_ping_last
then begin
686 s.server_nfiles_last
<- s.server_nfiles_last
+ t.P.nfiles
;
687 s.server_nkb_last
<- s.server_nkb_last
+ t.P.nkb
691 (* lprintf "REPLY TO QUERY NOT IMPLEMENTED YET :("; lprint_newline ();*)
695 (* lprintf "REPLY TO QUERY"; lprint_newline ();*)
696 let module Q
= QueryReply
in
699 let s = Hashtbl.find searches_by_uid
p.pkt_uid
in
701 let user = update_user t in
703 (* lprintf "ADDING RESULTS"; lprint_newline ();*)
705 (* lprintf "NEW RESULT %s" f.Q.name; lprint_newline ();*)
706 let result = new_result f
.Q.name f
.Q.size
in
707 add_source
result user f
.Q.index
;
709 search_add_result
s.search_search
result.result_result
;
712 lprintf
"NO SUCH SEARCH !!!!"; lprint_newline
();
714 List.iter
(fun file ->
715 if file.file_name
= ff
.Q.name
&&
716 file_size
file = ff
.Q.size
then
718 lprintf
"++++++++++++++ RECOVER FILE %s +++++++++++++" file.file_name
; lprint_newline
();
719 let c = update_client t in
720 add_download
file c ff
.Q.index
;
729 let module P
= Ping
in
733 match s.server_sock
with
736 let p = { (new_packet
pl) with pkt_ttl
= 1; } in
737 s.server_nfiles
<- s.server_nfiles_last
;
738 s.server_nkb
<- s.server_nkb_last
;
739 s.server_ping_last
<- p.pkt_uid
;
740 s.server_nfiles_last
<- 0;
741 s.server_nkb_last
<- 0;
746 let connect_server (ip,port) =
747 if !!verbose_servers
> 5 then begin
748 lprintf
"SHOULD CONNECT TO %s:%d" (Ip.to_string
ip) port;
751 let s = new_server
ip port in
752 match s.server_sock
with
756 let sock = connect
"openft to server"
757 (Ip.to_inet_addr
ip) port
760 BASIC_EVENT
(RTIMEOUT
|LTIMEOUT
) ->
761 (* lprintf "RTIMEOUT"; lprint_newline (); *)
762 disconnect_from_server s
765 TcpBufferedSocket.set_read_controler
sock download_control
;
766 TcpBufferedSocket.set_write_controler
sock upload_control
;
768 connection_try
s.server_connection_control
;
769 set_server_state
s Connecting
;
770 s.server_sock
<- Some
sock;
772 set_reader
sock (cut_messages
OpenFTProtocol.parse
775 set_closer
sock (fun _ error
->
776 (* lprintf "CLOSER %s" error; lprint_newline ();*)
777 disconnect_from_server s);
778 set_rtimeout
sock !!server_connection_timeout
;
779 server_send
sock VersionReq
;
780 server_send
sock ClassReq
;
781 server_send
sock NodeInfoReq
;
782 server_send
sock NodeListReq
;
783 server_send
sock NodeCapReq
785 disconnect_from_server s
788 let try_connect_ultrapeer () =
789 (* lprintf "try_connect_ultrapeer"; lprint_newline (); *)
791 Fifo.take ultrapeers_queue
794 Fifo.take peers_queue
796 Hashtbl.iter
(fun key
s ->
797 match s.server_type
with
799 | _
-> Fifo.put ultrapeers_queue key
806 let connect_servers () =
807 if !nservers
< !!max_ultrapeers
then begin
808 for i
= !nservers
to !!max_ultrapeers
- 1 do
809 try_connect_ultrapeer ()
815 let ask_for_files () =
816 List.iter (fun file ->
819 get_file_from_source c file
829 server_ops
.op_server_connect
<- (fun s ->
830 connect_server (s.server_ip
, s.server_port
));
831 server_ops
.op_server_disconnect
<- disconnect_from_server;
833 (* server_ops.op_server_query_users <- (fun s -> *)
834 match s.server_sock
with
837 server_send
sock (GetNickListReq
)
839 (* server_ops.op_server_users <- (fun s -> *)
840 List2.tail_map
(fun u -> as_user
u.user_user
) s.server_users
843 server_ops
.op_server_remove
<- (fun s ->
844 disconnect_from_server s;
845 Hashtbl.remove servers_by_key
(s.server_ip
, s.server_port
);
846 server_remove
(as_server
s.server_server
);
848 server_ops
.op_server_sort
<- (fun s ->
849 connection_last_conn
s.server_connection_control