1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 open TcpBufferedSocket
32 open CommonComplexOptions
48 This is a simple ultrapeer implementation without Bloom filters. It can
49 be used only to test publication of files and upload between MLdonkey clients.
50 Maybe if Bloom filters are correctly implemented, it could be used as a normal
54 (*************************************************************************)
58 (*************************************************************************)
(* NOTE(review): the record header that owns these fields is not visible in
   this chunk; the field notes below are based only on how the rest of this
   file uses them. *)
(* Connection state of the node; this file compares it against NoConnection,
   ConnectionWaiting and Connection. *)
61 mutable node_sock
: tcp_connection
;
(* Packet ids recorded for this node by update_route since the last run of
   the 60. session timer, which empties the list again. *)
62 mutable node_packets
: Md4.t list
;
(* Older batches of packet ids, one list per timer run; the timer takes the
   oldest batch out and removes its ids from sent_packets. *)
63 mutable node_former_packets
: Md4.t list
Fifo.t
;
(* Ids (quid in supernode_to_node) that this node registered in the global
   node_guids table. *)
64 mutable node_guids
: Md4.t list
;
(* Host value passed to host_send together with node_sock. *)
65 mutable node_host
: host
;
69 (*************************************************************************)
73 (*************************************************************************)
(* Fixed prefix of the request line that opens a handshake; the version
   number follows the slash. *)
let gnutella_connect_proto : string = "GNUTELLA CONNECT/"
77 (*************************************************************************)
81 (*************************************************************************)
(* Enabler stored by the plugin enable hook; None while the plugin is
   disabled. *)
let gnutella_enabled = ref None

(* Set by start_supernode and cleared by stop_supernode so that each of them
   runs its body at most once per start/stop cycle. *)
let supernode_started = ref false

(* Server socket stored by start_supernode, if any. *)
let supernode_sock = ref None

(* Packet id -> node recorded by update_route for that id. *)
let sent_packets = Hashtbl.create 1111

(* Registered id -> node that registered it. *)
let node_guids = Hashtbl.create 1111
90 (*************************************************************************)
94 (*************************************************************************)
(* Options of the experimental supernode, all declared in gnutella_section.
   NOTE(review): the option-name and default-value lines of most of these
   definitions are not visible in this chunk. *)
(* Switch read by the enable hook and by the option hook to decide whether
   start_supernode or stop_supernode is called. *)
96 let supernode_enabled = define_option gnutella_section
98 "(only for development tests)"
(* Presumably the TCP port the supernode listens on - TODO confirm, its use
   is not visible in this chunk. *)
101 let supernode_port = define_option gnutella_section
103 "(only for development tests)"
(* Limit on connected nodes: both handshake handlers close the socket once
   List.length !nodes has reached this value. *)
106 let supernode_degree = define_option gnutella_section
108 "(only for development tests)"
111 (* Maximal number of packets received from one client per minute *)
112 let supernode_max_activity = define_option gnutella_section
113 ["supernode_max_activity"]
114 "(only for development tests)"
117 (*************************************************************************)
121 (*************************************************************************)
(* [remove_packets list] deletes the sent_packets binding of every packet id
   found in [list], so those ids are no longer routed. *)
let remove_packets list =
  List.iter (Hashtbl.remove sent_packets) list
126 (*************************************************************************)
128 (* disconnect_node *)
130 (*************************************************************************)
(* [disconnect_node node s] cleans up the state kept for [node] after its
   connection ended with reason [s].
   On the visible path it logs the reason, marks the node as NoConnection,
   removes it from [nodes] and deletes the sent_packets entries of its
   current and former packet ids; it then removes guids from node_guids.
   NOTE(review): several lines of this match are not visible in this chunk,
   including at least one pattern arm and the list given to the final
   List.iter (presumably node.node_guids - TODO confirm). *)
132 let disconnect_node node s
=
133 match node
.node_sock
with
135 lprintf
"DISCONNECTED FROM NODE %s\n" (string_of_reason s
);
137 node
.node_sock
<- NoConnection
;
(* Forget the node itself, then every route that still points to it. *)
138 nodes := List2.removeq node
!nodes;
139 remove_packets node
.node_packets
;
(* node_former_packets stores lists of ids, so remove_packets is applied to
   each stored list in turn. *)
140 Fifo.iter
remove_packets node
.node_former_packets
;
141 List.iter
(fun guid
->
142 Hashtbl.remove
node_guids guid
(* This state is treated as impossible for a node reaching this function. *)
144 | ConnectionWaiting token
-> assert false
147 (*************************************************************************)
151 (*************************************************************************)
(* [node_send node t] sends message [t] to [node].
   Nothing happens while the node is in the NoConnection or ConnectionWaiting
   state. Otherwise the visible code converts [t] with server_msg_to_string
   and passes the node socket, the node host and [t] to host_send.
   NOTE(review): the pattern of the connected case and whatever uses [m] are
   on lines not visible in this chunk. *)
153 let node_send node t
=
156 match node.node_sock with
157 NoConnection | ConnectionWaiting _ -> ()
159 let m = server_msg_to_string t in
162 host_send node
.node_sock node
.node_host t
164 (*************************************************************************)
168 (*************************************************************************)
(* [update_route puid node] records [node] in sent_packets as the route for
   packet id [puid], unless a binding for [puid] already exists. The id is
   also prepended to node.node_packets, the list whose length the session
   timer compares with supernode_max_activity.
   NOTE(review): the end of the definition is not visible in this chunk.
   supernode_to_node uses the result as a condition, so it is presumably
   true when the route was newly added - TODO confirm. *)
170 let update_route puid node
=
171 if not
(Hashtbl.mem
sent_packets puid
) then begin
172 Hashtbl.add
sent_packets puid node
;
173 node
.node_packets
<- puid
:: node
.node_packets
;
177 (*************************************************************************)
181 (*************************************************************************)
(* [node_send_ping node] - NOTE(review): only a fragment of the body is
   visible in this chunk. The visible part builds a packet from a PI_UDP
   value holding client_ip NoConnection and !!client_port, with an empty
   child list; presumably the enclosing packet is sent to [node] - TODO
   confirm against the full definition. *)
183 let node_send_ping node
=
186 packet
(PI_UDP
(client_ip NoConnection
, !!client_port
))[]])
188 (*************************************************************************)
190 (* supernode_to_node *)
192 (*************************************************************************)
(* [supernode_to_node node sock gconn p] handles packet [p] received from
   [node] on [sock]; it is the reader installed by supernode_handler2.
   It first extends the socket lifetime and optionally logs the packet, then
   dispatches on p.g2_payload.
   NOTE(review): most pattern arms, the bindings of [quid] and [t], and the
   exception handlers around the Hashtbl.find calls are on lines not visible
   in this chunk; the notes below describe only the visible statements. *)
194 let supernode_to_node node sock gconn p
=
(* 600. is presumably a duration in seconds - TODO confirm. *)
195 set_lifetime sock
600.;
196 if !verbose_msg_servers
then begin
197 lprintf
"RECEIVED supernode_to_node: %s\n"
200 match p
.g2_payload
with
(* Reply to the sender with an empty PO packet. *)
202 node_send node
(packet PO
[]);
(* When update_route accepts the id, relay the packet to every known node
   except the sender (compared by physical inequality). *)
205 if update_route quid node
then
206 List.iter
(fun n
-> if n
!= node
then node_send n p
) !nodes
208 (* Normally, the client immediately replies with QA to ack the query...
(* Register quid for this node unless some node already registered it. *)
213 if not
(Hashtbl.mem
node_guids quid
) then begin
214 node
.node_guids <- quid
:: node
.node_guids;
215 Hashtbl.add
node_guids quid node
(* Look up the node recorded as the route for quid; Hashtbl.find raises
   Not_found when there is none. *)
220 let n = Hashtbl.find
sent_packets quid
in
223 lprintf
"Unable to forward QueryReplyReq\n"
(* Relay only packets with fewer than 2 hops, to the node that registered
   the guid, with the hop count increased by one. *)
229 if p.pkt_hops < 2 then
232 let n = Hashtbl.find node_guids t.Push.guid in
233 node_send n { p with pkt_hops = p.pkt_hops + 1 }
235 lprintf "Unable to forward PushReq\n"
238 (* TODO: We don't care about all these ones currently *)
239 | QrtPatchReq _
-> ()
240 | QrtResetReq _
-> ()
241 (* We don't care about all these ones currently *)
249 (*************************************************************************)
251 (* supernode_handler2 *)
253 (*************************************************************************)
(* [supernode_handler2 node_ref gconn sock (first_line, headers)] handles the
   second handshake message received on [sock].
   It closes the socket when the node limit is already reached. Otherwise it
   checks the protocol version and status code found in [first_line], builds
   a node value for the peer, stores it in [node_ref] and in [nodes], and
   switches [gconn] to a g2_handler reader that calls supernode_to_node.
   Raises Failure when the protocol is not 0.6 or the code is not 200.
   NOTE(review): several lines, including the start of the node record
   literal and any exception handling, are not visible in this chunk. *)
255 let supernode_handler2 node_ref gconn sock
(first_line
, headers
) =
256 lprintf
"Entering supernode_handler2\n";
257 if List.length
!nodes >= !!supernode_degree then begin
258 (* TODO somebody arrived before the final ack... close and forget this client *)
259 close sock Closed_by_user
;
263 (* The reply should be "GNUTELLA/0.6 200 OK" *)
(* proto is the text between the first slash and the first space; code is
   the three characters that follow that space. String.index raises
   Not_found when the character is absent. *)
264 let space_pos = String.index first_line ' '
in
265 let slash_pos = String.index first_line '
/'
in
266 let proto = String.sub first_line
(slash_pos+1) (space_pos - slash_pos -1) in
267 let code = String.sub first_line
(space_pos+1) 3 in
269 let h = server_parse_headers first_line headers
in
271 if proto <> "0.6" then
272 failwith
(Printf.sprintf
"Bad protocol [%s]" proto);
273 if code <> "200" then
274 failwith
(Printf.sprintf
"Bad return code [%s]" code);
(* Describe the remote peer as a host built from the socket peer address. *)
276 let (ip
, port
) = peer_addr sock
in
277 let host = H.new_host
(Ip.AddrIp ip
) port Peer
in
279 node_sock
= Connection sock
;
281 node_former_packets
= Fifo.create
();
285 (* No packets were sent during the last 3 minutes *)
(* Three empty batches are queued so that the session timer, which takes one
   batch per run, always finds one to take. *)
286 Fifo.put
node.node_former_packets
[];
287 Fifo.put
node.node_former_packets
[];
288 Fifo.put
node.node_former_packets
[];
(* Publish the node: the socket closer reads node_ref, the routing code
   reads nodes. *)
290 node_ref
:= Some
node;
291 nodes := node :: !nodes;
294 TcpBufferedSocket.set_rtimeout sock
300.;
(* Turn on deflate for the connection when the parsed headers ask for it. *)
295 if h.hsrpl_content_deflate
then deflate_connection sock
;
297 gconn
.gconn_handler
<- Reader
298 (g2_handler
(supernode_to_node node sock
))
301 (*************************************************************************)
303 (* supernode_handler1 *)
305 (*************************************************************************)
(* Prefix expected at the start of the second handshake message, and its
   length as passed to HttpReader by supernode_handler1. *)
let gnutella_proto : string = "GNUTELLA/"
let gnutella_proto_len : int = String.length gnutella_proto
(* [supernode_handler1 node gconn sock (first_line, headers)] handles the
   first handshake message received on [sock].
   It closes the socket when the node limit is already reached. Otherwise it
   requires [first_line] to be exactly the 0.6 connect request, parses the
   headers, writes a 200 OK answer with the supernode handshake headers, and
   installs an HttpReader that passes the next message to
   supernode_handler2. The last visible statements log an exception [e] and
   close the socket with Closed_for_exception.
   Raises Failure when [first_line] is not the expected request line.
   NOTE(review): the start of the [req] record literal and the try/with
   keywords are on lines not visible in this chunk. *)
310 let supernode_handler1 node gconn sock
(first_line
, headers
) =
311 lprintf
"Entering supernode_handler1\n";
312 if List.length
!nodes >= !!supernode_degree then begin
313 (* TODO hum... we should not close the socket like that, but send an error
315 close sock Closed_by_user
;
320 (* The request should be "GNUTELLA CONNECT/0.6" *)
321 if first_line
<> "GNUTELLA CONNECT/0.6" then
322 failwith
"Bad connection protocol";
324 let h = server_parse_headers first_line headers
in
327 hsreq_accept_deflate
= false;
(* Compressed content is offered only when the deflate_connections option is
   set and the parsed headers accept deflate. *)
328 hsreq_content_deflate
= !!deflate_connections
&& h.hsrpl_accept_deflate
;
329 hsreq_ultrapeer_needed
= false;
330 hsreq_ultrapeer
= true;
(* Tell the peer which address we see it connecting from. *)
331 hsreq_remote_address
=
332 Ip.to_string
(fst
(peer_addr sock
));
333 hsreq_local_address
= Printf.sprintf
"%s:%d"
334 (Ip.to_string
(client_ip
(Connection sock
)))
338 let msg = make_http_header
339 "GNUTELLA/0.6 200 OK"
340 (make_handshake_request_headers
req)
342 if !verbose_msg_servers
then
343 lprintf_nl
"SENDING %s\n" (String.escaped
msg);
345 write_string sock
msg;
(* Wait for a message starting with gnutella_proto and hand it to
   supernode_handler2; other input goes to the default handler. *)
346 set_gnutella_sock sock
!verbose_msg_servers
347 (HttpReader
(gnutella_proto_len,
348 [gnutella_proto, supernode_handler2 node],
349 G2Functions.default_handler
));
352 if !verbose_msg_servers
then
353 lprintf
"DISCONNECT WITH EXCEPTION %s\n" (Printexc2.to_string e
);
354 close sock
(Closed_for_exception e
)
356 (*************************************************************************)
358 (* start_supernode *)
360 (*************************************************************************)
(* Prefix expected at the start of the first handshake message, and its
   length as passed to HttpReader by start_supernode. *)
let gnutella_connect : string = "GNUTELLA CONNECT/"
let gnutella_connect_len : int = String.length gnutella_connect
(* [start_supernode enabler] starts the supernode unless supernode_started is
   already set.
   Visible steps: create the server socket; for every incoming connection
   create a buffered socket, set its controllers, closer and timeout, and
   install an HttpReader that passes the first message to
   supernode_handler1; register a session timer on [enabler] that runs every
   60. and polices and ages the per-node packet lists; store the server
   socket in supernode_sock. The last visible statement logs an exception
   raised during initialisation.
   NOTE(review): many lines (socket creation arguments, event-handler
   headers, the list walked by the timer, the try/with keywords) are not
   visible in this chunk. *)
365 let start_supernode enabler
=
366 if not
!supernode_started then begin
367 supernode_started := true;
370 let sock = TcpServerSocket.create
"gnutella supernode"
375 TcpServerSocket.CONNECTION
(s
,
376 Unix.ADDR_INET
(from_ip
, from_port
)) ->
377 lprintf
"CONNECTION RECEIVED FROM %s FOR SUPERNODE\n"
378 (Ip.to_string
(Ip.of_inet_addr from_ip
))
381 lprintf
"*********** CONNECTION ***********\n";
383 let token = create_token connection_manager
in
384 let sock = TcpBufferedSocket.create
token
385 "gnutella client connection" s
(* Read or lifetime timeouts close the client socket. *)
388 BASIC_EVENT
(RTIMEOUT
|LTIMEOUT
) -> close
sock Closed_for_timeout
392 TcpBufferedSocket.set_read_controler
sock download_control
;
393 TcpBufferedSocket.set_write_controler
sock upload_control
;
(* The node value is created later by supernode_handler2; until then the
   reference stays None. *)
395 let node = ref None
in
396 TcpBufferedSocket.set_closer
sock (fun _ s
->
398 Some
node -> disconnect_node node s
401 TcpBufferedSocket.set_rtimeout
sock 30.;
402 set_gnutella_sock
sock !verbose_msg_servers
403 (HttpReader
(gnutella_connect_len,
404 [gnutella_connect, supernode_handler1 node],
405 G2Functions.default_handler
));
408 add_session_timer enabler
60. (fun _
->
409 List.iter
(fun node ->
(* A node with more recorded packets than supernode_max_activity since the
   previous run is disconnected. *)
411 if List.length
node.node_packets
> !!supernode_max_activity then
412 disconnect_node node (Closed_for_error
"Too active")
416 (* Forget packets older than 3 minutes *)
(* Drop the oldest batch, queue the current one, and start a new empty
   batch. *)
417 remove_packets (Fifo.take
node.node_former_packets
);
418 Fifo.put
node.node_former_packets
node.node_packets
;
419 node.node_packets
<- [];
423 supernode_sock := Some
sock;
426 lprintf
"Exception %s while initializing gnutella supernode\n"
427 (Printexc2.to_string e
)
431 (*************************************************************************)
435 (*************************************************************************)
(* [stop_supernode ()] does nothing unless supernode_started is set. It then
   clears the flag and, when a server socket is stored in supernode_sock,
   resets that reference to None and closes the socket with Closed_by_user.
   NOTE(review): the pattern that binds [sock] and the end of the definition
   are on lines not visible in this chunk. *)
437 let stop_supernode () =
438 if !supernode_started then begin
439 supernode_started := false;
441 (match !supernode_sock with None
-> ()
443 supernode_sock := None
;
444 TcpServerSocket.close
sock Closed_by_user
);
448 (*************************************************************************)
452 (*************************************************************************)
(* NOTE(review): the header of this definition is not visible in this chunk;
   it is presumably the enable function registered in plugin_enable_hooks -
   TODO confirm. The visible body remembers the enabler and starts the
   supernode when the supernode_enabled option is set. *)
455 gnutella_enabled := Some enabler
;
456 if !!supernode_enabled then start_supernode enabler
458 (*************************************************************************)
462 (*************************************************************************)
(* NOTE(review): the header and the rest of this definition are not visible
   in this chunk; it is presumably the disable function registered in
   plugin_disable_hooks - TODO confirm. The visible statement forgets the
   stored enabler. *)
465 gnutella_enabled := None
;
468 (*************************************************************************)
472 (*************************************************************************)
(* Module initialisation: installs a hook on the supernode_enabled option and
   adds enable and disable to the plugin hook lists.
   The hook inspects gnutella_enabled; on the visible branch it starts the
   supernode when the option is set and stops it otherwise.
   NOTE(review): the enclosing binding and the match patterns (including the
   one that binds [enabler]) are on lines not visible in this chunk. *)
475 option_hook
supernode_enabled (fun _ ->
476 match !gnutella_enabled with
479 if !!supernode_enabled then start_supernode enabler
480 else stop_supernode ()
482 plugin_enable_hooks
:= enable :: !plugin_enable_hooks
;
483 plugin_disable_hooks
:= disable :: !plugin_disable_hooks