patch #7573
[mldonkey.git] / src / networks / bittorrent / bT_DHT.ml
blob6b77b0b18b18d53e47f4074a1226327f95d9624e
1 (**
2 DHT
4 http://www.bittorrent.org/beps/bep_0005.html
5 *)
7 open Kademlia
8 open Printf
10 let dht_query_timeout = 20
11 let store_peer_timeout = minutes 30
12 let secret_timeout = minutes 10
13 let alpha = 3
15 let log_prefix = "[dht]"
16 let lprintf_nl ?exn fmt = ksprintf (fun s ->
17 let exn = match exn with Some exn -> " : exn "^Printexc.to_string exn | None -> "" in
18 Printf2.lprintf_nl2 log_prefix "%s%s" s exn) fmt
20 let catch f x = try `Ok (f x) with e -> `Exn e
21 let (&) f x = f x
22 let (!!) = Lazy.force
24 let self_version =
25 let module A = Autoconf in
26 let n = int_of_string A.major_version * 100 + int_of_string A.minor_version * 10 + int_of_string A.sub_version - 300 in
27 assert (n > 0 && n < 256);
28 sprintf "ML%c%c" (if A.scm_version = "" then '=' else '+') (Char.chr n)
30 (* 2-level association *)
31 module Assoc2 : sig
33 type ('a,'b,'c) t
34 val create : unit -> ('a,'b,'c) t
35 val add : ('a,'b,'c) t -> 'a -> 'b -> 'c -> unit
36 val find_all : ('a,'b,'c) t -> 'a -> ('b,'c) Hashtbl.t
37 val find : ('a,'b,'c) t -> 'a -> 'b -> 'c option
38 val remove : ('a,'b,'c) t -> 'a -> 'b -> unit
39 val iter : ('a,'b,'c) t -> ('a -> 'b -> 'c -> unit) -> unit
40 val clear : ('a,'b,'c) t -> unit
42 end = struct
44 type ('a,'b,'c) t = ('a, ('b, 'c) Hashtbl.t) Hashtbl.t
46 let create () = Hashtbl.create 13
47 let add h a b c =
48 let hh = try Hashtbl.find h a with Not_found -> Hashtbl.create 3 in
49 Hashtbl.replace hh b c;
50 Hashtbl.replace h a hh
51 let find_all h a = try Hashtbl.find h a with Not_found -> Hashtbl.create 3
52 let find h a b = try Some (Hashtbl.find (Hashtbl.find h a) b) with Not_found -> None
53 let remove h a b = try let ha = Hashtbl.find h a in Hashtbl.remove ha b; if Hashtbl.length ha = 0 then Hashtbl.remove h a with Not_found -> ()
54 let iter h f = Hashtbl.iter (fun a h -> Hashtbl.iter (fun b c -> f a b c) h) h
55 let clear h = Hashtbl.clear h
57 end
59 let stats_add h k n = Hashtbl.replace h k (n + try Hashtbl.find h k with Not_found -> 0)
61 module KRPC = struct
63 type dict = (string * Bencode.value) list
64 let show_dict d = String.concat "," & List.map fst d
66 type msg =
67 | Query of string * dict
68 | Response of dict
69 | Error of int64 * string
71 let show_msg = function
72 | Query (name,args) -> sprintf "query %s(%s)" name (show_dict args)
73 | Response d -> sprintf "response (%s)" (show_dict d)
74 | Error (e,s) -> sprintf "error (%Ld,%S)" e s
76 let encode (txn,msg) =
77 let module B = Bencode in
78 let x = match msg with
79 | Query (name,args) -> ["y", B.String "q"; "q", B.String name; "a", B.Dictionary args]
80 | Response dict -> ["y", B.String "r"; "r", B.Dictionary dict]
81 | Error (code,text) -> ["y", B.String "e"; "e", B.List [B.Int code; B.String text] ]
83 let x = ("t", B.String txn) :: ("v", B.String self_version):: x in
84 B.encode (B.Dictionary x)
86 let str = function Bencode.String s -> s | _ -> failwith "str"
87 let int = function Bencode.Int s -> s | _ -> failwith "int"
88 let dict = function Bencode.Dictionary s -> s | _ -> failwith "dict"
89 let list = function Bencode.List l -> l | _ -> failwith "list"
91 exception Protocol_error of string * string
92 exception Malformed_packet of string
93 exception Method_unknown of string
95 let decode_exn s =
96 let module B = Bencode in
97 let module Array = struct let get x k = match x with B.Dictionary l -> List.assoc k l | _ -> failwith "decode get" end in
98 let x = try B.decode s with _ -> raise (Malformed_packet "decode") in
99 let txn = try str x.("t") with _ -> raise (Malformed_packet "txn") in
100 let ver = try Some (str x.("v")) with _ -> None in
102 let msg = match str x.("y") with
103 | "q" -> Query (str x.("q"), dict x.("a"))
104 | "r" -> Response (dict x.("r"))
105 | "e" -> begin match list x.("e") with B.Int n :: B.String s :: _ -> Error (n, s) | _ -> failwith "decode e" end
106 | _ -> failwith "type"
107 in (txn, ver, msg)
108 with
109 exn ->
110 if !verb then lprintf_nl ~exn "err";
111 raise (Protocol_error (txn,"Invalid argument"))
113 open BasicSocket
114 open UdpSocket
116 let udp_set_reader socket f =
117 set_reader socket begin fun _ ->
118 try read_packets socket f with exn ->
119 if !verb then lprintf_nl ~exn "udp reader";
120 close socket (Closed_for_exception exn)
123 module A = Assoc2
125 let send sock stats (ip,port as addr) txnmsg =
126 let s = encode txnmsg in
127 if !debug then lprintf_nl "KRPC to %s : %S" (show_addr addr) s;
128 stats_add stats `Sent 1;
129 stats_add stats `SentBytes (String.length s);
130 write sock false s ip port
132 type stats_key = [ `Timeout | `Sent | `SentBytes | `Recv | `RecvBytes | `Decoded | `Handled | `NoTxn ]
133 type t =
134 UdpSocket.t *
135 (stats_key, int) Hashtbl.t *
136 (addr, string, (addr -> dict -> unit) * ([`Error|`Timeout]-> unit) * int) A.t
137 let show_stats t =
138 let get k = try Hashtbl.find t k with Not_found -> 0 in
140 sprintf "rpc recv %d pkts (%d bytes)" (get `Recv) (get `RecvBytes);
141 sprintf "rpc sent %d pkts (%d bytes)" (get `Sent) (get `SentBytes);
142 sprintf "rpc decoded %d, handled %d" (get `Decoded) (get `Handled);
143 sprintf "rpc timeouted %d, orphan %d" (get `Timeout) (get `NoTxn);
146 let create port enabler bw_control answer : t =
147 let socket = create Unix.inet_addr_any port (fun sock event ->
148 match event with
149 | WRITE_DONE | CAN_REFILL -> ()
150 | READ_DONE -> assert false (* set_reader prevents this *)
151 | BASIC_EVENT x -> match x with
152 | CLOSED _ -> ()
153 | CAN_READ | CAN_WRITE -> assert false (* udpSocket implementation prevents this *)
154 | LTIMEOUT | WTIMEOUT | RTIMEOUT -> () (*close sock (Closed_for_error "KRPC timeout")*))
156 set_write_controler socket bw_control;
157 set_wtimeout (sock socket) 5.;
158 set_rtimeout (sock socket) 5.;
159 let h = A.create () in
160 let stats = Hashtbl.create 10 in
161 let timeout h =
162 let now = last_time () in
163 let bad = ref [] in
164 let total = ref 0 in
165 A.iter h (fun addr txn (_,kerr,t) -> incr total; if t < now then bad := (addr,txn,kerr) :: !bad);
166 if !debug then lprintf_nl "timeouted %d of %d DHT queries" (List.length !bad) !total;
167 stats_add stats `Timeout (List.length !bad);
168 List.iter (fun (addr,txn,kerr) ->
169 A.remove h addr txn;
170 try kerr `Timeout with exn -> if !debug then lprintf_nl ~exn "timeout for %s" (show_addr addr)) !bad;
172 BasicSocket.add_session_timer enabler 5. (fun () -> timeout h);
173 let handle addr (txn,ver,msg) =
174 let version = lazy (match ver with Some s -> sprintf " client %S" s | None -> "") in
175 if !debug then lprintf_nl "KRPC from %s %stxn %S : %s" (show_addr addr) !!version txn (show_msg msg);
176 match msg with
177 | Error (code,msg) ->
178 if !verb then lprintf_nl "error received from %s%s : %Ld %S" (show_addr addr) !!version code msg;
179 begin match A.find h addr txn with
180 | None ->
181 stats_add stats `NoTxn 1;
182 if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr addr) !!version
183 | Some (_, kerr, _) -> A.remove h addr txn; kerr `Error
185 | Query (name,args) ->
186 let ret = answer addr name args in
187 send socket stats addr (txn, ret)
188 | Response ret ->
189 match A.find h addr txn with
190 | None ->
191 stats_add stats `NoTxn 1;
192 if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr addr) !!version
193 | Some (k,_,_) -> A.remove h addr txn; k addr ret
195 let handle p =
196 match p.udp_addr with
197 | Unix.ADDR_UNIX _ -> assert false
198 | Unix.ADDR_INET (inet_addr,port) ->
199 let addr = (Ip.of_inet_addr inet_addr, port) in
200 let ret = ref None in
202 stats_add stats `RecvBytes (String.length p.udp_content);
203 stats_add stats `Recv 1;
204 let r = decode_exn p.udp_content in
205 stats_add stats `Decoded 1;
206 ret := Some r;
207 handle addr r;
208 stats_add stats `Handled 1;
209 with exn ->
210 let version = match !ret with Some (_,Some s,_) -> sprintf " client %S" s | _ -> "" in
211 if !verb then lprintf_nl ~exn "handle packet from %s%s : %S" (show_addr addr) version p.udp_content;
212 let error txn code str = send socket stats addr (txn,(Error (Int64.of_int code,str))) in
213 match exn,!ret with
214 | Malformed_packet x, Some (txn, _, _)
215 | Protocol_error ("",x), Some(txn, _, _) | Protocol_error (txn,x), _ -> error txn 203 x
216 | Method_unknown x, Some (txn, _, _) -> error txn 204 x
217 | _, Some (txn, _, Query _) -> error txn 202 ""
218 | _ -> ()
220 udp_set_reader socket handle;
221 (socket,stats,h)
223 let shutdown (socket,_,h) =
224 close socket Closed_by_user;
225 A.iter h (fun addr _ (_,kerr,_) ->
226 try kerr `Timeout with exn -> if !verb then lprintf_nl ~exn "shutdown for %s" (show_addr addr));
227 A.clear h
229 let write (socket,stats,h) msg addr k ~kerr =
230 let tt = Assoc2.find_all h addr in
231 let rec loop () = (* choose txn FIXME *)
232 let txn = string_of_int (Random.int 1_000_000) in
233 match Hashtbl.mem tt txn with
234 | true -> loop ()
235 | false -> txn
237 let txn = loop () in
238 Assoc2.add h addr txn (k,kerr,last_time () + dht_query_timeout);
239 send socket stats addr (txn,msg)
241 end (* KRPC *)
243 type query =
244 | Ping
245 | FindNode of id
246 | GetPeers of H.t
247 | Announce of H.t * int * string
249 let show_query = function
250 | Ping -> "ping"
251 | FindNode id -> sprintf "find_node %s" (show_id id)
252 | GetPeers h -> sprintf "get_peers %s" (show_id h)
253 | Announce (h,port,token) -> sprintf "announce %s port=%d token=%S" (show_id h) port token
255 type response =
256 | Ack
257 | Nodes of (id * addr) list
258 | Peers of string * addr list * (id * addr) list
260 let strl f l = "[" ^ (String.concat " " & List.map f l) ^ "]"
262 let show_node (id,addr) = sprintf "%s (%s)" (show_addr addr) (show_id id)
264 let show_response = function
265 | Ack -> "ack"
266 | Nodes l -> sprintf "nodes %s" (strl show_node l)
267 | Peers (token,peers,nodes) -> sprintf "peers token=%S %s %s" token (strl show_addr peers) (strl show_node nodes)
269 let parse_query_exn name args =
270 let get k = List.assoc k args in
271 let sha1 k = H.direct_of_string & KRPC.str & get k in
272 let p = match name with
273 | "ping" -> Ping
274 | "find_node" -> FindNode (sha1 "target")
275 | "get_peers" -> GetPeers (sha1 "info_hash")
276 | "announce_peer" -> Announce (sha1 "info_hash", Int64.to_int & KRPC.int & get "port", KRPC.str & get "token")
277 | s -> failwith (sprintf "parse_query name=%s" name)
279 sha1 "id", p
281 let make_query id x =
282 let sha1 x = Bencode.String (H.direct_to_string x) in
283 let self = ("id", sha1 id) in
284 match x with
285 | Ping -> KRPC.Query ("ping", [self])
286 | FindNode t -> KRPC.Query ("find_node", ["target", sha1 t; self])
287 | GetPeers h -> KRPC.Query ("get_peers", ["info_hash", sha1 h; self])
288 | Announce (h, port, token) -> KRPC.Query ("announce_peer",
289 ["info_hash", sha1 h;
290 "port", Bencode.Int (Int64.of_int port);
291 "token", Bencode.String token;
292 self])
294 let parse_peer s =
295 if String.length s <> 6 then failwith "parse_peer" else
296 let c i = int_of_char & s.[i] in
297 Ip.of_ints (c 0,c 1,c 2,c 3), (c 4 lsl 8 + c 5)
299 let parse_nodes s =
300 assert (String.length s mod 26 = 0);
301 let i = ref 0 in
302 let nodes = ref [] in
303 while !i < String.length s do
304 nodes := (H.direct_of_string (String.sub s !i 20), parse_peer (String.sub s (!i+20) 6)) :: !nodes;
305 i := !i + 26;
306 done;
307 !nodes
309 let make_peer (ip,port) =
310 assert (port <= 0xffff);
311 let (a,b,c,d) = Ip.to_ints ip in
312 let e = port lsr 8 and f = port land 0xff in
313 let s = String.create 6 in
314 let set i c = s.[i] <- char_of_int c in
315 set 0 a; set 1 b; set 2 c; set 3 d; set 4 e; set 5 f;
318 let make_nodes nodes =
319 let s = String.create (26 * List.length nodes) in
320 let i = ref 0 in
321 List.iter (fun (id,addr) ->
322 String.blit (H.direct_to_string id) 0 s (!i*26) 20;
323 String.blit (make_peer addr) 0 s (!i*26+20) 6;
324 incr i
325 ) nodes;
328 let parse_response_exn q dict =
329 let get k = List.assoc k dict in
330 let sha1 k = H.direct_of_string & KRPC.str & get k in
331 let p = match q with
332 | Ping -> Ack
333 | FindNode _ ->
334 let s = KRPC.str & get "nodes" in
335 Nodes (parse_nodes s)
336 | GetPeers _ ->
337 let token = KRPC.str & get "token" in
338 let nodes = try parse_nodes (KRPC.str & get "nodes") with Not_found -> [] in
339 let peers = try List.map (fun x -> parse_peer & KRPC.str x) & (KRPC.list & get "values") with Not_found -> [] in
340 Peers (token, peers, nodes)
341 | Announce _ -> Ack
343 sha1 "id", p
345 let make_response id x =
346 let sha1 x = Bencode.String (H.direct_to_string x) in
347 let self = ("id", sha1 id) in
348 let str s = Bencode.String s in
349 match x with
350 | Ack -> KRPC.Response [self]
351 | Nodes nodes -> KRPC.Response [self;"nodes",str (make_nodes nodes)]
352 | Peers (token,peers,nodes) -> KRPC.Response
353 [self;
354 "token",str token;
355 "nodes",str (make_nodes nodes);
356 "values",Bencode.List (List.map (fun addr -> str (make_peer addr)) peers);
359 module Test = struct
361 open Bencode
363 let e = Dictionary ["t",String "aa"; "v", String self_version; "y", String "e"; "e", List [Int 201L; String "A Generic Error Occurred"] ]
364 let s = sprintf "d1:eli201e24:A Generic Error Occurrede1:t2:aa1:v4:%s1:y1:ee" self_version
365 let v = "aa", KRPC.Error (201L, "A Generic Error Occurred")
367 let () =
368 assert (encode e = s);
369 assert (KRPC.decode_exn s = (fst v, Some self_version, snd v));
370 assert (KRPC.encode v = s);
375 module Peers = Map.Make(struct type t = addr let compare = compare end)
377 module M = struct
379 type query_type = [ `Ping | `FindNode | `GetPeers | `Announce ]
380 type answer_type = [ `Answer | `Error | `Timeout ]
382 type t = {
383 rt : Kademlia.table; (* routing table *)
384 rpc : KRPC.t; (* KRPC protocol socket *)
385 dht_port : int; (* port *)
386 torrents : (H.t, int Peers.t) Hashtbl.t; (* torrents announced by other peers *)
387 enabler : bool ref; (* timers' enabler *)
388 stats : (query_type * [ `In | `Out of answer_type ], int) Hashtbl.t; (* statistics *)
391 let query_type_of_query = function
392 | Ping -> `Ping
393 | FindNode _ -> `FindNode
394 | GetPeers _ -> `GetPeers
395 | Announce _ -> `Announce
397 let dht_query t addr q k ~kerr =
398 if !debug then lprintf_nl "DHT query to %s : %s" (show_addr addr) (show_query q);
399 let qt = query_type_of_query q in
400 KRPC.write t.rpc (make_query t.rt.self q) addr begin fun addr dict ->
401 let (id,r) = try parse_response_exn q dict with exn -> stats_add t.stats (qt, `Out `Error) 1; kerr (); raise exn in
402 if !debug then lprintf_nl "DHT response from %s (%s) : %s" (show_addr addr) (show_id id) (show_response r);
403 stats_add t.stats (qt, `Out `Answer) 1;
404 k (id,addr) r
405 end ~kerr:(fun reason -> stats_add t.stats (qt, `Out (reason:>answer_type)) 1; kerr ())
407 let ping t addr k = dht_query t addr Ping begin fun node r ->
408 match r with Ack -> k (Some node)
409 | _ -> k None; failwith "dht_query ping" end ~kerr:(fun () -> k None)
411 let find_node t addr h k ~kerr = dht_query t addr (FindNode h) begin fun node r ->
412 match r with Nodes l -> k node l
413 | _ -> kerr (); failwith "dht_query find_node" end ~kerr
415 let get_peers t addr h k ~kerr = dht_query t addr (GetPeers h) begin fun node r ->
416 match r with Peers (token,peers,nodes) -> k node token peers nodes
417 | _ -> kerr (); failwith "dht_query get_peers" end ~kerr
419 let announce t addr port token h k ~kerr = dht_query t addr (Announce (h,port,token)) begin fun node r ->
420 match r with Ack -> k node
421 | _ -> kerr (); failwith "dht_query announce" end ~kerr
423 let store t info_hash addr =
424 let peers = try Hashtbl.find t.torrents info_hash with Not_found -> Peers.empty in
425 Hashtbl.replace t.torrents info_hash (Peers.add addr (BasicSocket.last_time () + store_peer_timeout) peers)
427 let manage_timeouts enabler h =
428 BasicSocket.add_session_timer enabler 60. begin fun () ->
429 let now = BasicSocket.last_time () in
430 let torrents = Hashtbl.fold (fun k peers l -> (k,peers)::l) h [] in
431 let rm = ref 0 in
432 let total = ref 0 in
433 List.iter (fun (id,peers) ->
434 let m = Peers.fold (* removing is rare *)
435 (fun peer expire m -> incr total; if expire < now then (incr rm; Peers.remove peer m) else m)
436 peers peers
438 if Peers.is_empty m then Hashtbl.remove h id else Hashtbl.replace h id m
439 ) torrents;
440 if !debug then lprintf_nl "Removed %d of %d peers for announced torrents" !rm !total
443 let create rt dht_port bw_control answer =
444 let enabler = ref true in
445 let rpc = KRPC.create dht_port enabler bw_control answer in
446 let torrents = Hashtbl.create 8 in
447 manage_timeouts enabler torrents;
448 { rt = rt; rpc = rpc; torrents = torrents; dht_port = dht_port; enabler = enabler;
449 stats = Hashtbl.create 10; }
451 let shutdown dht =
452 dht.enabler := false;
453 KRPC.shutdown dht.rpc
455 let peers_list f m = Peers.fold (fun peer tm l -> (f peer tm)::l) m []
456 let self_get_peers t h =
457 let peers = peers_list (fun a _ -> a) (try Hashtbl.find t.torrents h with Not_found -> Peers.empty) in
458 if List.length peers <= 100 then
459 peers
460 else
461 let a = Array.of_list peers in
462 Array2.shuffle a;
463 Array.to_list (Array.sub a 0 100)
465 let self_find_node t h = List.map (fun node -> node.id, node.addr) & Kademlia.find_node t.rt h
467 end (* module M *)
469 module Secret : sig
471 type t
472 val create : time -> t
473 val get : t -> string
474 val valid : t -> string -> bool
475 val get_prev : t -> string
477 end = struct
479 type t = { mutable cur : string; mutable prev : string; timeout : time; mutable next : time; }
480 let make () = string_of_int (Random.int 1_000_000)
481 let create tm =
482 assert (tm > 0);
483 let s = make () in
484 { cur = s; prev = s; timeout = tm; next = now () + tm; }
485 let invalidate t =
486 if now () > t.next then
487 begin
488 t.prev <- t.cur;
489 t.cur <- make ();
490 t.next <- now () + t.timeout;
492 let get t =
493 invalidate t;
494 t.cur
495 let get_prev t = t.prev
496 let valid t s =
497 invalidate t;
498 s = t.cur || s = t.prev
502 (* do not hash port cause some broken implementations change it all the time *)
503 let make_token (ip,_) h secret = string_of_int (Hashtbl.hash (Ip.to_string ip, H.direct_to_string h, secret))
505 let valid_token addr h secret token =
506 let cur = Secret.get secret in
507 let prev = Secret.get_prev secret in
508 token = make_token addr h cur || token = make_token addr h prev
510 module LimitedSet = struct
512 module type S = sig
514 type elt
515 type t
516 val create : int -> t
517 (** @return whether the element was really added *)
518 val insert : t -> elt -> bool
519 val elements : t -> elt list
520 val iter : t -> (elt -> unit) -> unit
521 val min_elt : t -> elt
523 end
525 module Make(Ord:Set.OrderedType) : S with type elt = Ord.t =
526 struct
528 module S = Set.Make(Ord)
530 type elt = Ord.t
531 type t = int ref * S.t ref
533 let create n = ref n, ref S.empty
535 let insert (left,set) elem =
536 match S.mem elem !set with
537 | true -> false
538 | false ->
539 match !left with
540 | 0 ->
541 let max = S.max_elt !set in
542 if Ord.compare elem max < 0 then
543 begin set := S.add elem (S.remove max !set); true end
544 else
545 false
546 | n ->
547 set := S.add elem !set;
548 decr left;
549 true
551 let iter (_,set) f = S.iter f !set
553 let elements (_,set) = S.elements !set
555 let min_elt (_,set) = S.min_elt !set
557 end (* Make *)
559 end (* LimitedSet *)
561 let update dht st id addr = update (M.ping dht) dht.M.rt st id addr
563 exception Break
565 (** @param nodes nodes to start search from, will not be inserted into routing table *)
566 let lookup_node dht ?nodes target k =
567 if !debug then lprintf_nl "lookup %s" (show_id target);
568 let start = BasicSocket.last_time () in
569 let module S = LimitedSet.Make(struct
570 type t = id * addr
571 let compare n1 n2 = Big_int.compare_big_int (distance target (fst n1)) (distance target (fst n2))
572 end) in
573 let found = S.create Kademlia.bucket_nodes in
574 let queried = Hashtbl.create 13 in
575 let active = ref 0 in
576 let check_ready () =
577 if 0 = !active then
578 begin
579 let result = S.elements found in
580 if !debug then lprintf_nl "lookup_node %s done, queried %d, found %d, elapsed %ds"
581 (show_id target) (Hashtbl.length queried) (List.length result) (BasicSocket.last_time () - start);
582 k result
585 let rec round nodes =
586 let inserted = List.fold_left (fun acc node -> if S.insert found node then acc + 1 else acc) 0 nodes in
587 begin try
588 let n = ref 0 in
589 S.iter found (fun node ->
590 if alpha = !n then raise Break;
591 if not (Hashtbl.mem queried node) then begin incr n; query true node end)
592 with Break -> () end;
593 inserted
594 and query store (id,addr as node) =
595 incr active;
596 Hashtbl.add queried node true;
597 if !debug then lprintf_nl "will query node %s" (show_node node);
598 M.find_node dht addr target begin fun (id,addr as node) nodes ->
599 if store then update dht Good id addr;
600 decr active;
601 let inserted = round nodes in
602 let s = try sprintf ", best %s" (show_id (fst (S.min_elt found))) with _ -> "" in
603 if !debug then lprintf_nl "got %d nodes from %s, useful %d%s" (List.length nodes) (show_node node) inserted s;
604 check_ready ()
605 end ~kerr:(fun () -> decr active; if !debug then lprintf_nl "timeout from %s" (show_node node); check_ready ())
607 begin match nodes with
608 | None -> let (_:int) = round (M.self_find_node dht target) in ()
609 | Some l -> List.iter (query false) l
610 end;
611 check_ready ()
613 let show_torrents torrents =
614 let now = BasicSocket.last_time () in
615 Hashtbl.iter (fun h peers ->
616 let l = M.peers_list (fun addr tm -> sprintf "%s (exp. %ds)" (show_addr addr) (tm - now)) peers in
617 lprintf_nl "torrent %s : %s" (H.to_hexa h) (String.concat " " l))
618 torrents
620 let show dht = show_table dht.M.rt; show_torrents dht.M.torrents
621 let stat dht =
622 buckets dht.M.rt,
623 size dht.M.rt,
624 Hashtbl.length dht.M.torrents,
625 Hashtbl.fold (fun _ peers acc -> acc + Peers.fold (fun _ _ acc -> acc + 1) peers 0) dht.M.torrents 0
626 let rpc_stats dht = let (_,st,_) = dht.M.rpc in KRPC.show_stats st
628 let bootstrap dht host addr k =
629 M.ping dht addr begin function
630 | Some node ->
631 if !verb then lprintf_nl "bootstrap node %s (%s) is up" (show_node node) host;
632 lookup_node dht ~nodes:[node] dht.M.rt.self (fun l ->
633 if !debug then lprintf_nl "bootstrap via %s (%s) : found %s" (show_addr addr) host (strl show_node l);
634 k (List.length l >= Kademlia.bucket_nodes))
635 | None ->
636 if !verb then lprintf_nl "bootstrap node %s (%s) is down" (show_addr addr) host;
637 k false
640 let bootstrap dht (host,port) k =
641 Ip.async_ip host
642 (fun ip -> bootstrap dht host (ip,port) k)
643 (fun () -> if !verb then lprintf_nl "boostrap node %s cannot be resolved" host; k false)
645 let bootstrap ?(routers=[]) dht =
646 lookup_node dht dht.M.rt.self begin fun l ->
647 if !debug then lprintf_nl "auto bootstrap : found %s" (strl show_node l);
648 let rec loop l ok =
649 match ok,l with
650 | true,_ -> if !verb then lprintf_nl "bootstrap ok, total nodes : %d" (size dht.M.rt)
651 | false,[] -> if !verb then lprintf_nl "boostrap failed, total nodes : %d" (size dht.M.rt)
652 | false,(node::nodes) -> bootstrap dht node (loop nodes)
654 loop routers (List.length l >= Kademlia.bucket_nodes)
657 let query_peers dht id k =
658 if !debug then lprintf_nl "query_peers: start %s" (H.to_hexa id);
659 lookup_node dht id (fun nodes ->
660 if !debug then lprintf_nl "query_peers: found nodes %s" (strl show_node nodes);
662 let found = ref Peers.empty in
663 let check =
664 let left = ref (List.length nodes + 1) (* one immediate check *) in
665 fun () -> decr left; if 0 = !left then k (Peers.fold (fun peer () l -> peer :: l) !found [])
668 List.iter begin fun node ->
669 M.get_peers dht (snd node) id begin fun node token peers nodes ->
670 if !debug then lprintf_nl "query_peers: got %d peers and %d nodes from %s with token %S"
671 (List.length peers) (List.length nodes) (show_node node) token;
672 k node token peers;
674 found := List.fold_left (fun acc peer -> Peers.add peer () acc) !found peers;
675 check ()
678 ~kerr:(fun () -> if !debug then lprintf_nl "query_peers: get_peers error from %s" (show_node node)(*; check ()*));
679 (* check () *)
680 end nodes)
682 let start rt port bw_control =
683 let secret = Secret.create secret_timeout in
684 let rec dht = lazy (M.create rt port bw_control answer)
685 and answer addr name args =
686 let (id,q) = parse_query_exn name args in
687 let node = (id,addr) in
688 if !debug then lprintf_nl "DHT query from %s : %s" (show_node node) (show_query q);
689 update !!dht Good id addr;
690 stats_add (!!dht).M.stats (M.query_type_of_query q, `In) 1;
691 let response =
692 match q with
693 | Ping -> Ack
694 | FindNode h -> Nodes (M.self_find_node !!dht h)
695 | GetPeers h ->
696 let token = make_token addr h (Secret.get secret) in
697 let peers = M.self_get_peers !!dht h in
698 let nodes = M.self_find_node !!dht h in
699 if !debug then lprintf_nl "answer with %d peers and %d nodes" (List.length peers) (List.length nodes);
700 Peers (token,peers,nodes)
701 | Announce (h,port,token) ->
702 if not (valid_token addr h secret token) then failwith ("invalid token " ^ token);
703 M.store !!dht h (fst addr, port);
706 if !debug then lprintf_nl "DHT response to %s : %s" (show_node node) (show_response response);
707 make_response (!!dht).M.rt.self response
709 let refresh () =
710 let ids = Kademlia.refresh (!!dht).M.rt in
711 if !debug then lprintf_nl "will refresh %d buckets" (List.length ids);
712 let cb prev_id (id,addr as node) l =
713 update !!dht Good id addr; (* replied *)
714 if prev_id <> id then
715 begin
716 if !debug then lprintf_nl "refresh: node %s changed id (was %s)" (show_node node) (show_id prev_id);
717 update !!dht Bad prev_id addr;
718 end;
719 if !debug then lprintf_nl "refresh: got %d nodes from %s" (List.length l) (show_node node);
720 List.iter (fun (id,addr) -> update !!dht Unknown id addr) l
722 List.iter (fun (target, nodes) ->
723 List.iter (fun (id,addr) -> M.find_node !!dht addr target (cb id) ~kerr:(fun () -> ())) nodes)
726 if !debug then lprintf_nl "DHT size : %d self : %s" (size (!!dht).M.rt) (show_id (!!dht).M.rt.self);
727 BasicSocket.add_session_timer (!!dht).M.enabler 60. refresh;
728 !!dht
730 let stop dht = M.shutdown dht