patch #7372
[mldonkey.git] / src / networks / gnutella2 / g2Supernode.ml
blob107f0d31cea07fb8f027d4a5ec588f639e607b33
1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 open Queues
21 open Printf2
22 open Md4
23 open Options
25 open BasicSocket
26 open TcpBufferedSocket
28 open CommonDownloads
29 open CommonOptions
30 open CommonSearch
31 open CommonServer
32 open CommonComplexOptions
33 open CommonFile
34 open CommonTypes
35 open CommonGlobals
36 open CommonHosts
38 open G2Network
39 open G2Types
40 open G2Globals
41 open G2Options
42 open G2Protocol
43 open G2ComplexOptions
44 open G2Proto
45 open G2Servers
48 This is a simple ultrapeer implementation without Bloom filters. It can
49 be used only to test publication of files and upload between MLdonkey clients.
50 Maybe if Bloom filters are correctly implemented, it could be used as a normal
51 G2 ultrapeer...
54 (*************************************************************************)
55 (* *)
56 (* Types *)
57 (* *)
58 (*************************************************************************)
60 type node = {
61 mutable node_sock : tcp_connection;
62 mutable node_packets : Md4.t list;
63 mutable node_former_packets : Md4.t list Fifo.t;
64 mutable node_guids : Md4.t list;
65 mutable node_host : host;
69 (*************************************************************************)
70 (* *)
71 (* Constants *)
72 (* *)
73 (*************************************************************************)
75 let gnutella_connect_proto = "GNUTELLA CONNECT/"
77 (*************************************************************************)
78 (* *)
79 (* Global values *)
80 (* *)
81 (*************************************************************************)
83 let gnutella_enabled = ref None
84 let supernode_started = ref false
85 let supernode_sock = ref None
86 let nodes = ref []
87 let sent_packets = Hashtbl.create 1111
88 let node_guids = Hashtbl.create 1111
90 (*************************************************************************)
91 (* *)
92 (* Options *)
93 (* *)
94 (*************************************************************************)
96 let supernode_enabled = define_option gnutella_section
97 ["supernode_enabled"]
98 "(only for development tests)"
99 bool_option false
101 let supernode_port = define_option gnutella_section
102 ["supernode_port"]
103 "(only for development tests)"
104 port_option 6348
106 let supernode_degree = define_option gnutella_section
107 ["supernode_degree"]
108 "(only for development tests)"
109 int_option 20
111 (* Maximal number of packets received from one client per minute *)
112 let supernode_max_activity = define_option gnutella_section
113 ["supernode_max_activity"]
114 "(only for development tests)"
115 int_option 20
117 (*************************************************************************)
118 (* *)
119 (* remove_packets *)
120 (* *)
121 (*************************************************************************)
123 let remove_packets list =
124 List.iter (fun uid -> Hashtbl.remove sent_packets uid) list
126 (*************************************************************************)
127 (* *)
128 (* disconnect_node *)
129 (* *)
130 (*************************************************************************)
132 let disconnect_node node s =
133 match node.node_sock with
134 Connection sock ->
135 lprintf "DISCONNECTED FROM NODE %s\n" (string_of_reason s);
136 close sock s;
137 node.node_sock <- NoConnection;
138 nodes := List2.removeq node !nodes;
139 remove_packets node.node_packets;
140 Fifo.iter remove_packets node.node_former_packets;
141 List.iter (fun guid ->
142 Hashtbl.remove node_guids guid
143 ) node.node_guids
144 | ConnectionWaiting token -> assert false
145 | NoConnection -> ()
147 (*************************************************************************)
148 (* *)
149 (* node_send *)
150 (* *)
151 (*************************************************************************)
153 let node_send node t =
156 match node.node_sock with
157 NoConnection | ConnectionWaiting _ -> ()
158 | Connection sock ->
159 let m = server_msg_to_string t in
160 write_string sock m
162 host_send node.node_sock node.node_host t
164 (*************************************************************************)
165 (* *)
166 (* update_route *)
167 (* *)
168 (*************************************************************************)
170 let update_route puid node =
171 if not (Hashtbl.mem sent_packets puid) then begin
172 Hashtbl.add sent_packets puid node;
173 node.node_packets <- puid :: node.node_packets;
174 true
175 end else false
177 (*************************************************************************)
178 (* *)
179 (* node_send_ping *)
180 (* *)
181 (*************************************************************************)
183 let node_send_ping node =
184 node_send node
185 (packet PI [
186 packet (PI_UDP (client_ip NoConnection, !!client_port))[]])
188 (*************************************************************************)
189 (* *)
190 (* supernode_to_node *)
191 (* *)
192 (*************************************************************************)
194 let supernode_to_node node sock gconn p =
195 set_lifetime sock 600.;
196 if !verbose_msg_servers then begin
197 lprintf "RECEIVED supernode_to_node: %s\n"
198 (Print.print p)
199 end;
200 match p.g2_payload with
201 | PI ->
202 node_send node (packet PO []);
204 | Q2 quid ->
205 if update_route quid node then
206 List.iter (fun n -> if n != node then node_send n p) !nodes
208 (* Normally, the client immediatly replies with QA to ack the query...
209 forget it... *)
210 | QH2 (n, quid) ->
212 begin
213 if not (Hashtbl.mem node_guids quid) then begin
214 node.node_guids <- quid :: node.node_guids;
215 Hashtbl.add node_guids quid node
216 end;
220 let n = Hashtbl.find sent_packets quid in
221 node_send n p
222 with Not_found ->
223 lprintf "Unable to forward QueryReplyReq\n"
228 | PushReq t ->
229 if p.pkt_hops < 2 then
230 begin
232 let n = Hashtbl.find node_guids t.Push.guid in
233 node_send n { p with pkt_hops = p.pkt_hops + 1 }
234 with Not_found ->
235 lprintf "Unable to forward PushReq\n"
238 (* TODO: We don't care about all these ones currently *)
239 | QrtPatchReq _ -> ()
240 | QrtResetReq _ -> ()
241 (* We don't care about all these ones currently *)
242 | PongReq t -> ()
243 | ByeReq _ -> ()
244 | UnknownReq _ -> ()
245 | VendorReq _ -> ()
247 | _ -> ()
249 (*************************************************************************)
250 (* *)
251 (* supernode_handler2 *)
252 (* *)
253 (*************************************************************************)
255 let supernode_handler2 node_ref gconn sock (first_line, headers) =
256 lprintf "Entering supernode_handler2\n";
257 if List.length !nodes >= !!supernode_degree then begin
258 (* TODO somebody arrived before the final ack... close and forget this client *)
259 close sock Closed_by_user;
260 raise Exit
261 end;
263 (* The reply should be "GNUTELLA/0.6 200 OK" *)
264 let space_pos = String.index first_line ' ' in
265 let slash_pos = String.index first_line '/' in
266 let proto = String.sub first_line (slash_pos+1) (space_pos - slash_pos -1) in
267 let code = String.sub first_line (space_pos+1) 3 in
269 let h = server_parse_headers first_line headers in
271 if proto <> "0.6" then
272 failwith (Printf.sprintf "Bad protocol [%s]" proto);
273 if code <> "200" then
274 failwith (Printf.sprintf "Bad return code [%s]" code);
276 let (ip, port) = peer_addr sock in
277 let host = H.new_host (Ip.AddrIp ip) port Peer in
278 let node = {
279 node_sock = Connection sock;
280 node_packets = [];
281 node_former_packets = Fifo.create ();
282 node_guids = [];
283 node_host = host;
284 } in
285 (* No packets were sent during the last 3 minutes *)
286 Fifo.put node.node_former_packets [];
287 Fifo.put node.node_former_packets [];
288 Fifo.put node.node_former_packets [];
290 node_ref := Some node;
291 nodes := node :: !nodes;
294 TcpBufferedSocket.set_rtimeout sock 300.;
295 if h.hsrpl_content_deflate then deflate_connection sock;
296 node_send_ping node;
297 gconn.gconn_handler <- Reader
298 (g2_handler (supernode_to_node node sock))
301 (*************************************************************************)
302 (* *)
303 (* supernode_handler1 *)
304 (* *)
305 (*************************************************************************)
307 let gnutella_proto = "GNUTELLA/"
308 let gnutella_proto_len = String.length gnutella_proto
310 let supernode_handler1 node gconn sock (first_line, headers) =
311 lprintf "Entering supernode_handler1\n";
312 if List.length !nodes >= !!supernode_degree then begin
313 (* TODO hum... we should not close the socket like that, but send an error
314 reply... *)
315 close sock Closed_by_user;
316 raise Exit
317 end;
320 (* The request should be "GNUTELLA CONNECT/0.6" *)
321 if first_line <> "GNUTELLA CONNECT/0.6" then
322 failwith "Bad connection protocol";
324 let h = server_parse_headers first_line headers in
326 let req = {
327 hsreq_accept_deflate = false;
328 hsreq_content_deflate = !!deflate_connections && h.hsrpl_accept_deflate;
329 hsreq_ultrapeer_needed = false;
330 hsreq_ultrapeer = true;
331 hsreq_remote_address =
332 Ip.to_string (fst (peer_addr sock));
333 hsreq_local_address = Printf.sprintf "%s:%d"
334 (Ip.to_string (client_ip (Connection sock)))
335 !!client_port
338 let msg = make_http_header
339 "GNUTELLA/0.6 200 OK"
340 (make_handshake_request_headers req)
342 if !verbose_msg_servers then
343 lprintf_nl "SENDING %s\n" (String.escaped msg);
345 write_string sock msg;
346 set_gnutella_sock sock !verbose_msg_servers
347 (HttpReader (gnutella_proto_len,
348 [gnutella_proto, supernode_handler2 node],
349 G2Functions.default_handler));
351 with e ->
352 if !verbose_msg_servers then
353 lprintf "DISCONNECT WITH EXCEPTION %s\n" (Printexc2.to_string e);
354 close sock (Closed_for_exception e)
356 (*************************************************************************)
357 (* *)
358 (* start_supernode *)
359 (* *)
360 (*************************************************************************)
362 let gnutella_connect = "GNUTELLA CONNECT/"
363 let gnutella_connect_len = String.length gnutella_connect
365 let start_supernode enabler =
366 if not !supernode_started then begin
367 supernode_started := true;
370 let sock = TcpServerSocket.create "gnutella supernode"
371 Unix.inet_addr_any
372 !!supernode_port
373 (fun sock event ->
374 match event with
375 TcpServerSocket.CONNECTION (s,
376 Unix.ADDR_INET(from_ip, from_port)) ->
377 lprintf "CONNECTION RECEIVED FROM %s FOR SUPERNODE\n"
378 (Ip.to_string (Ip.of_inet_addr from_ip))
381 lprintf "*********** CONNECTION ***********\n";
383 let token = create_token connection_manager in
384 let sock = TcpBufferedSocket.create token
385 "gnutella client connection" s
386 (fun sock event ->
387 match event with
388 BASIC_EVENT (RTIMEOUT|LTIMEOUT) -> close sock Closed_for_timeout
389 | _ -> ()
392 TcpBufferedSocket.set_read_controler sock download_control;
393 TcpBufferedSocket.set_write_controler sock upload_control;
395 let node = ref None in
396 TcpBufferedSocket.set_closer sock (fun _ s ->
397 match !node with
398 Some node -> disconnect_node node s
399 | None -> ()
401 TcpBufferedSocket.set_rtimeout sock 30.;
402 set_gnutella_sock sock !verbose_msg_servers
403 (HttpReader (gnutella_connect_len,
404 [gnutella_connect, supernode_handler1 node],
405 G2Functions.default_handler));
406 | _ -> ()
407 ) in
408 add_session_timer enabler 60. (fun _ ->
409 List.iter (fun node ->
411 if List.length node.node_packets > !!supernode_max_activity then
412 disconnect_node node (Closed_for_error "Too active")
413 else begin
415 node_send_ping node;
416 (* Forget packets older than 3 minutes *)
417 remove_packets (Fifo.take node.node_former_packets);
418 Fifo.put node.node_former_packets node.node_packets;
419 node.node_packets <- [];
421 ) !nodes;
423 supernode_sock := Some sock;
425 with e ->
426 lprintf "Exception %s while initializing gnutella supernode\n"
427 (Printexc2.to_string e)
431 (*************************************************************************)
432 (* *)
433 (* stop_supernode *)
434 (* *)
435 (*************************************************************************)
437 let stop_supernode () =
438 if !supernode_started then begin
439 supernode_started := false;
441 (match !supernode_sock with None -> ()
442 | Some sock ->
443 supernode_sock := None;
444 TcpServerSocket.close sock Closed_by_user);
448 (*************************************************************************)
449 (* *)
450 (* enable *)
451 (* *)
452 (*************************************************************************)
454 let enable enabler =
455 gnutella_enabled := Some enabler;
456 if !!supernode_enabled then start_supernode enabler
458 (*************************************************************************)
459 (* *)
460 (* disable *)
461 (* *)
462 (*************************************************************************)
464 let disable () =
465 gnutella_enabled := None;
466 stop_supernode ()
468 (*************************************************************************)
469 (* *)
470 (* MAIN *)
471 (* *)
472 (*************************************************************************)
474 let _ =
475 option_hook supernode_enabled (fun _ ->
476 match !gnutella_enabled with
477 None -> ()
478 | Some enabler ->
479 if !!supernode_enabled then start_supernode enabler
480 else stop_supernode ()
482 plugin_enable_hooks := enable :: !plugin_enable_hooks;
483 plugin_disable_hooks := disable :: !plugin_disable_hooks