sync zlibstubs.c with upstream
[mldonkey.git] / src / networks / bittorrent / bT_DHT.ml
blob71569ed2d5775bcc23007cc4745ce9a4c4934121
1 (**
2 DHT
4 http://www.bittorrent.org/beps/bep_0005.html
5 *)
7 open Kademlia
8 open Printf
10 let dht_query_timeout = 20
11 let store_peer_timeout = minutes 30
12 let secret_timeout = minutes 10
13 let alpha = 3
15 let log_prefix = "[dht]"
16 let lprintf_nl ?exn fmt = Printf2.lprintf_nl2 ?exn log_prefix fmt
18 let catch f x = try `Ok (f x) with e -> `Exn e
19 let (&) f x = f x
20 let (!!) = Lazy.force
22 let self_version =
23 let module A = Autoconf in
24 let n = int_of_string A.major_version * 100 + int_of_string A.minor_version * 10 + int_of_string A.sub_version - 300 in
25 assert (n > 0 && n < 256);
26 sprintf "ML%c%c" (if A.scm_version = "" then '=' else '+') (Char.chr n)
28 (* 2-level association *)
29 module Assoc2 : sig
31 type ('a,'b,'c) t
32 val create : unit -> ('a,'b,'c) t
33 val add : ('a,'b,'c) t -> 'a -> 'b -> 'c -> unit
34 val find_all : ('a,'b,'c) t -> 'a -> ('b,'c) Hashtbl.t
35 val find : ('a,'b,'c) t -> 'a -> 'b -> 'c option
36 val remove : ('a,'b,'c) t -> 'a -> 'b -> unit
37 val iter : ('a,'b,'c) t -> ('a -> 'b -> 'c -> unit) -> unit
38 val clear : ('a,'b,'c) t -> unit
40 end = struct
42 type ('a,'b,'c) t = ('a, ('b, 'c) Hashtbl.t) Hashtbl.t
44 let create () = Hashtbl.create 13
45 let add h a b c =
46 let hh = try Hashtbl.find h a with Not_found -> Hashtbl.create 3 in
47 Hashtbl.replace hh b c;
48 Hashtbl.replace h a hh
49 let find_all h a = try Hashtbl.find h a with Not_found -> Hashtbl.create 3
50 let find h a b = try Some (Hashtbl.find (Hashtbl.find h a) b) with Not_found -> None
51 let remove h a b = try let ha = Hashtbl.find h a in Hashtbl.remove ha b; if Hashtbl.length ha = 0 then Hashtbl.remove h a with Not_found -> ()
52 let iter h f = Hashtbl.iter (fun a h -> Hashtbl.iter (fun b c -> f a b c) h) h
53 let clear h = Hashtbl.clear h
55 end
57 let stats_add h k n = Hashtbl.replace h k (n + try Hashtbl.find h k with Not_found -> 0)
59 module KRPC = struct
61 type dict = (string * Bencode.value) list
62 let show_dict d = String.concat "," & List.map fst d
64 type msg =
65 | Query of string * dict
66 | Response of dict
67 | Error of int64 * string
69 let show_msg = function
70 | Query (name,args) -> sprintf "query %s(%s)" name (show_dict args)
71 | Response d -> sprintf "response (%s)" (show_dict d)
72 | Error (e,s) -> sprintf "error (%Ld,%S)" e s
74 let encode (txn,msg) =
75 let module B = Bencode in
76 let x = match msg with
77 | Query (name,args) -> ["y", B.String "q"; "q", B.String name; "a", B.Dictionary args]
78 | Response dict -> ["y", B.String "r"; "r", B.Dictionary dict]
79 | Error (code,text) -> ["y", B.String "e"; "e", B.List [B.Int code; B.String text] ]
81 let x = ("t", B.String txn) :: ("v", B.String self_version):: x in
82 B.encode (B.Dictionary x)
84 let str = function Bencode.String s -> s | _ -> failwith "str"
85 let int = function Bencode.Int s -> s | _ -> failwith "int"
86 let dict = function Bencode.Dictionary s -> s | _ -> failwith "dict"
87 let list = function Bencode.List l -> l | _ -> failwith "list"
89 exception Protocol_error of string * string
90 exception Malformed_packet of string
91 exception Method_unknown of string
93 let decode_exn s =
94 let module B = Bencode in
95 let module Array = struct let get x k = match x with B.Dictionary l -> List.assoc k l | _ -> failwith "decode get" end in
96 let x = try B.decode s with _ -> raise (Malformed_packet "decode") in
97 let txn = try str x.("t") with _ -> raise (Malformed_packet "txn") in
98 let ver = try Some (str x.("v")) with _ -> None in
99 try
100 let msg = match str x.("y") with
101 | "q" -> Query (str x.("q"), dict x.("a"))
102 | "r" -> Response (dict x.("r"))
103 | "e" -> begin match list x.("e") with B.Int n :: B.String s :: _ -> Error (n, s) | _ -> failwith "decode e" end
104 | _ -> failwith "type"
105 in (txn, ver, msg)
106 with
107 exn ->
108 if !verb then lprintf_nl ~exn "err";
109 raise (Protocol_error (txn,"Invalid argument"))
111 open BasicSocket
112 open UdpSocket
114 let udp_set_reader socket f =
115 set_reader socket begin fun _ ->
116 try read_packets socket f with exn ->
117 if !verb then lprintf_nl ~exn "udp reader";
118 close socket (Closed_for_exception exn)
121 module A = Assoc2
123 let send sock stats (ip,port as addr) txnmsg =
124 let s = encode txnmsg in
125 if !debug then lprintf_nl "KRPC to %s : %S" (show_addr addr) s;
126 stats_add stats `Sent 1;
127 stats_add stats `SentBytes (String.length s);
128 write sock false s ip port
130 type stats_key = [ `Timeout | `Sent | `SentBytes | `Recv | `RecvBytes | `Decoded | `Handled | `NoTxn ]
131 type t =
132 UdpSocket.t *
133 (stats_key, int) Hashtbl.t *
134 (addr, string, (addr -> dict -> unit) * ([`Error|`Timeout]-> unit) * int) A.t
135 let show_stats t =
136 let get k = try Hashtbl.find t k with Not_found -> 0 in
138 sprintf "rpc recv %d pkts (%d bytes)" (get `Recv) (get `RecvBytes);
139 sprintf "rpc sent %d pkts (%d bytes)" (get `Sent) (get `SentBytes);
140 sprintf "rpc decoded %d, handled %d" (get `Decoded) (get `Handled);
141 sprintf "rpc timeouted %d, orphan %d" (get `Timeout) (get `NoTxn);
144 let create port enabler bw_control answer : t =
145 let socket = create Unix.inet_addr_any port (fun sock event ->
146 match event with
147 | WRITE_DONE | CAN_REFILL -> ()
148 | READ_DONE -> assert false (* set_reader prevents this *)
149 | BASIC_EVENT x -> match x with
150 | CLOSED _ -> ()
151 | CAN_READ | CAN_WRITE -> assert false (* udpSocket implementation prevents this *)
152 | LTIMEOUT | WTIMEOUT | RTIMEOUT -> () (*close sock (Closed_for_error "KRPC timeout")*))
154 set_write_controler socket bw_control;
155 set_wtimeout (sock socket) 5.;
156 set_rtimeout (sock socket) 5.;
157 let h = A.create () in
158 let stats = Hashtbl.create 10 in
159 let timeout h =
160 let now = last_time () in
161 let bad = ref [] in
162 let total = ref 0 in
163 A.iter h (fun addr txn (_,kerr,t) -> incr total; if t < now then bad := (addr,txn,kerr) :: !bad);
164 if !debug then lprintf_nl "timeouted %d of %d DHT queries" (List.length !bad) !total;
165 stats_add stats `Timeout (List.length !bad);
166 List.iter (fun (addr,txn,kerr) ->
167 A.remove h addr txn;
168 try kerr `Timeout with exn -> if !debug then lprintf_nl ~exn "timeout for %s" (show_addr addr)) !bad;
170 BasicSocket.add_session_timer enabler 5. (fun () -> timeout h);
171 let handle addr (txn,ver,msg) =
172 let version = lazy (match ver with Some s -> sprintf " client %S" s | None -> "") in
173 if !debug then lprintf_nl "KRPC from %s %stxn %S : %s" (show_addr addr) !!version txn (show_msg msg);
174 match msg with
175 | Error (code,msg) ->
176 if !verb then lprintf_nl "error received from %s%s : %Ld %S" (show_addr addr) !!version code msg;
177 begin match A.find h addr txn with
178 | None ->
179 stats_add stats `NoTxn 1;
180 if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr addr) !!version
181 | Some (_, kerr, _) -> A.remove h addr txn; kerr `Error
183 | Query (name,args) ->
184 let ret = answer addr name args in
185 send socket stats addr (txn, ret)
186 | Response ret ->
187 match A.find h addr txn with
188 | None ->
189 stats_add stats `NoTxn 1;
190 if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr addr) !!version
191 | Some (k,_,_) -> A.remove h addr txn; k addr ret
193 let handle p =
194 match p.udp_addr with
195 | Unix.ADDR_UNIX _ -> assert false
196 | Unix.ADDR_INET (inet_addr,port) ->
197 let addr = (Ip.of_inet_addr inet_addr, port) in
198 let ret = ref None in
200 stats_add stats `RecvBytes (String.length p.udp_content);
201 stats_add stats `Recv 1;
202 let r = decode_exn p.udp_content in
203 stats_add stats `Decoded 1;
204 ret := Some r;
205 handle addr r;
206 stats_add stats `Handled 1;
207 with exn ->
208 let version = match !ret with Some (_,Some s,_) -> sprintf " client %S" s | _ -> "" in
209 if !verb then lprintf_nl ~exn "handle packet from %s%s : %S" (show_addr addr) version p.udp_content;
210 let error txn code str = send socket stats addr (txn,(Error (Int64.of_int code,str))) in
211 match exn,!ret with
212 | Malformed_packet x, Some (txn, _, _)
213 | Protocol_error ("",x), Some(txn, _, _) | Protocol_error (txn,x), _ -> error txn 203 x
214 | Method_unknown x, Some (txn, _, _) -> error txn 204 x
215 | _, Some (txn, _, Query _) -> error txn 202 ""
216 | _ -> ()
218 udp_set_reader socket handle;
219 (socket,stats,h)
221 let shutdown (socket,_,h) =
222 close socket Closed_by_user;
223 A.iter h (fun addr _ (_,kerr,_) ->
224 try kerr `Timeout with exn -> if !verb then lprintf_nl ~exn "shutdown for %s" (show_addr addr));
225 A.clear h
227 let write (socket,stats,h) msg addr k ~kerr =
228 let tt = Assoc2.find_all h addr in
229 let rec loop () = (* choose txn FIXME *)
230 let txn = string_of_int (Random.int 1_000_000) in
231 match Hashtbl.mem tt txn with
232 | true -> loop ()
233 | false -> txn
235 let txn = loop () in
236 Assoc2.add h addr txn (k,kerr,last_time () + dht_query_timeout);
237 send socket stats addr (txn,msg)
239 end (* KRPC *)
241 type query =
242 | Ping
243 | FindNode of id
244 | GetPeers of H.t
245 | Announce of H.t * int * string
247 let show_query = function
248 | Ping -> "ping"
249 | FindNode id -> sprintf "find_node %s" (show_id id)
250 | GetPeers h -> sprintf "get_peers %s" (show_id h)
251 | Announce (h,port,token) -> sprintf "announce %s port=%d token=%S" (show_id h) port token
253 type response =
254 | Ack
255 | Nodes of (id * addr) list
256 | Peers of string * addr list * (id * addr) list
258 let strl f l = "[" ^ (String.concat " " & List.map f l) ^ "]"
260 let show_node (id,addr) = sprintf "%s (%s)" (show_addr addr) (show_id id)
262 let show_response = function
263 | Ack -> "ack"
264 | Nodes l -> sprintf "nodes %s" (strl show_node l)
265 | Peers (token,peers,nodes) -> sprintf "peers token=%S %s %s" token (strl show_addr peers) (strl show_node nodes)
267 let parse_query_exn name args =
268 let get k = List.assoc k args in
269 let sha1 k = H.direct_of_string & KRPC.str & get k in
270 let p = match name with
271 | "ping" -> Ping
272 | "find_node" -> FindNode (sha1 "target")
273 | "get_peers" -> GetPeers (sha1 "info_hash")
274 | "announce_peer" -> Announce (sha1 "info_hash", Int64.to_int & KRPC.int & get "port", KRPC.str & get "token")
275 | s -> failwith (sprintf "parse_query name=%s" name)
277 sha1 "id", p
279 let make_query id x =
280 let sha1 x = Bencode.String (H.direct_to_string x) in
281 let self = ("id", sha1 id) in
282 match x with
283 | Ping -> KRPC.Query ("ping", [self])
284 | FindNode t -> KRPC.Query ("find_node", ["target", sha1 t; self])
285 | GetPeers h -> KRPC.Query ("get_peers", ["info_hash", sha1 h; self])
286 | Announce (h, port, token) -> KRPC.Query ("announce_peer",
287 ["info_hash", sha1 h;
288 "port", Bencode.Int (Int64.of_int port);
289 "token", Bencode.String token;
290 self])
292 let parse_peer s =
293 if String.length s <> 6 then failwith "parse_peer" else
294 let c i = int_of_char & s.[i] in
295 Ip.of_ints (c 0,c 1,c 2,c 3), (c 4 lsl 8 + c 5)
297 let parse_nodes s =
298 assert (String.length s mod 26 = 0);
299 let i = ref 0 in
300 let nodes = ref [] in
301 while !i < String.length s do
302 nodes := (H.direct_of_string (String.sub s !i 20), parse_peer (String.sub s (!i+20) 6)) :: !nodes;
303 i := !i + 26;
304 done;
305 !nodes
307 let make_peer (ip,port) =
308 assert (port <= 0xffff);
309 let (a,b,c,d) = Ip.to_ints ip in
310 let e = port lsr 8 and f = port land 0xff in
311 let s = String.create 6 in
312 let set i c = s.[i] <- char_of_int c in
313 set 0 a; set 1 b; set 2 c; set 3 d; set 4 e; set 5 f;
316 let make_nodes nodes =
317 let s = String.create (26 * List.length nodes) in
318 let i = ref 0 in
319 List.iter (fun (id,addr) ->
320 String.blit (H.direct_to_string id) 0 s (!i*26) 20;
321 String.blit (make_peer addr) 0 s (!i*26+20) 6;
322 incr i
323 ) nodes;
326 let parse_response_exn q dict =
327 let get k = List.assoc k dict in
328 let sha1 k = H.direct_of_string & KRPC.str & get k in
329 let p = match q with
330 | Ping -> Ack
331 | FindNode _ ->
332 let s = KRPC.str & get "nodes" in
333 Nodes (parse_nodes s)
334 | GetPeers _ ->
335 let token = KRPC.str & get "token" in
336 let nodes = try parse_nodes (KRPC.str & get "nodes") with Not_found -> [] in
337 let peers = try List.map (fun x -> parse_peer & KRPC.str x) & (KRPC.list & get "values") with Not_found -> [] in
338 Peers (token, peers, nodes)
339 | Announce _ -> Ack
341 sha1 "id", p
343 let make_response id x =
344 let sha1 x = Bencode.String (H.direct_to_string x) in
345 let self = ("id", sha1 id) in
346 let str s = Bencode.String s in
347 match x with
348 | Ack -> KRPC.Response [self]
349 | Nodes nodes -> KRPC.Response [self;"nodes",str (make_nodes nodes)]
350 | Peers (token,peers,nodes) -> KRPC.Response
351 [self;
352 "token",str token;
353 "nodes",str (make_nodes nodes);
354 "values",Bencode.List (List.map (fun addr -> str (make_peer addr)) peers);
357 module Test = struct
359 open Bencode
361 let e = Dictionary ["t",String "aa"; "v", String self_version; "y", String "e"; "e", List [Int 201L; String "A Generic Error Occurred"] ]
362 let s = sprintf "d1:eli201e24:A Generic Error Occurrede1:t2:aa1:v4:%s1:y1:ee" self_version
363 let v = "aa", KRPC.Error (201L, "A Generic Error Occurred")
365 let () =
366 assert (encode e = s);
367 assert (KRPC.decode_exn s = (fst v, Some self_version, snd v));
368 assert (KRPC.encode v = s);
373 module Peers = Map.Make(struct type t = addr let compare = compare end)
375 module M = struct
377 type query_type = [ `Ping | `FindNode | `GetPeers | `Announce ]
378 type answer_type = [ `Answer | `Error | `Timeout ]
380 type t = {
381 rt : Kademlia.table; (* routing table *)
382 rpc : KRPC.t; (* KRPC protocol socket *)
383 dht_port : int; (* port *)
384 torrents : (H.t, int Peers.t) Hashtbl.t; (* torrents announced by other peers *)
385 enabler : bool ref; (* timers' enabler *)
386 stats : (query_type * [ `In | `Out of answer_type ], int) Hashtbl.t; (* statistics *)
389 let query_type_of_query = function
390 | Ping -> `Ping
391 | FindNode _ -> `FindNode
392 | GetPeers _ -> `GetPeers
393 | Announce _ -> `Announce
395 let dht_query t addr q k ~kerr =
396 if !debug then lprintf_nl "DHT query to %s : %s" (show_addr addr) (show_query q);
397 let qt = query_type_of_query q in
398 KRPC.write t.rpc (make_query t.rt.self q) addr begin fun addr dict ->
399 let (id,r) = try parse_response_exn q dict with exn -> stats_add t.stats (qt, `Out `Error) 1; kerr (); raise exn in
400 if !debug then lprintf_nl "DHT response from %s (%s) : %s" (show_addr addr) (show_id id) (show_response r);
401 stats_add t.stats (qt, `Out `Answer) 1;
402 k (id,addr) r
403 end ~kerr:(fun reason -> stats_add t.stats (qt, `Out (reason:>answer_type)) 1; kerr ())
405 let ping t addr k = dht_query t addr Ping begin fun node r ->
406 match r with Ack -> k (Some node)
407 | _ -> k None; failwith "dht_query ping" end ~kerr:(fun () -> k None)
409 let find_node t addr h k ~kerr = dht_query t addr (FindNode h) begin fun node r ->
410 match r with Nodes l -> k node l
411 | _ -> kerr (); failwith "dht_query find_node" end ~kerr
413 let get_peers t addr h k ~kerr = dht_query t addr (GetPeers h) begin fun node r ->
414 match r with Peers (token,peers,nodes) -> k node token peers nodes
415 | _ -> kerr (); failwith "dht_query get_peers" end ~kerr
417 let announce t addr port token h k ~kerr = dht_query t addr (Announce (h,port,token)) begin fun node r ->
418 match r with Ack -> k node
419 | _ -> kerr (); failwith "dht_query announce" end ~kerr
421 let store t info_hash addr =
422 let peers = try Hashtbl.find t.torrents info_hash with Not_found -> Peers.empty in
423 Hashtbl.replace t.torrents info_hash (Peers.add addr (BasicSocket.last_time () + store_peer_timeout) peers)
425 let manage_timeouts enabler h =
426 BasicSocket.add_session_timer enabler 60. begin fun () ->
427 let now = BasicSocket.last_time () in
428 let torrents = Hashtbl.fold (fun k peers l -> (k,peers)::l) h [] in
429 let rm = ref 0 in
430 let total = ref 0 in
431 List.iter (fun (id,peers) ->
432 let m = Peers.fold (* removing is rare *)
433 (fun peer expire m -> incr total; if expire < now then (incr rm; Peers.remove peer m) else m)
434 peers peers
436 if Peers.is_empty m then Hashtbl.remove h id else Hashtbl.replace h id m
437 ) torrents;
438 if !debug then lprintf_nl "Removed %d of %d peers for announced torrents" !rm !total
441 let create rt dht_port bw_control answer =
442 let enabler = ref true in
443 let rpc = KRPC.create dht_port enabler bw_control answer in
444 let torrents = Hashtbl.create 8 in
445 manage_timeouts enabler torrents;
446 { rt = rt; rpc = rpc; torrents = torrents; dht_port = dht_port; enabler = enabler;
447 stats = Hashtbl.create 10; }
449 let shutdown dht =
450 dht.enabler := false;
451 KRPC.shutdown dht.rpc
453 let peers_list f m = Peers.fold (fun peer tm l -> (f peer tm)::l) m []
454 let self_get_peers t h =
455 let peers = peers_list (fun a _ -> a) (try Hashtbl.find t.torrents h with Not_found -> Peers.empty) in
456 if List.length peers <= 100 then
457 peers
458 else
459 let a = Array.of_list peers in
460 Array2.shuffle a;
461 Array.to_list (Array.sub a 0 100)
463 let self_find_node t h = List.map (fun node -> node.id, node.addr) & Kademlia.find_node t.rt h
465 end (* module M *)
467 module Secret : sig
469 type t
470 val create : time -> t
471 val get : t -> string
472 val valid : t -> string -> bool
473 val get_prev : t -> string
475 end = struct
477 type t = { mutable cur : string; mutable prev : string; timeout : time; mutable next : time; }
478 let make () = string_of_int (Random.int 1_000_000)
479 let create tm =
480 assert (tm > 0);
481 let s = make () in
482 { cur = s; prev = s; timeout = tm; next = now () + tm; }
483 let invalidate t =
484 if now () > t.next then
485 begin
486 t.prev <- t.cur;
487 t.cur <- make ();
488 t.next <- now () + t.timeout;
490 let get t =
491 invalidate t;
492 t.cur
493 let get_prev t = t.prev
494 let valid t s =
495 invalidate t;
496 s = t.cur || s = t.prev
500 (* do not hash port cause some broken implementations change it all the time *)
501 let make_token (ip,_) h secret = string_of_int (Hashtbl.hash (Ip.to_string ip, H.direct_to_string h, secret))
503 let valid_token addr h secret token =
504 let cur = Secret.get secret in
505 let prev = Secret.get_prev secret in
506 token = make_token addr h cur || token = make_token addr h prev
508 module LimitedSet = struct
510 module type S = sig
512 type elt
513 type t
514 val create : int -> t
516 (** @return whether the element was really added *)
517 val insert : t -> elt -> bool
518 val elements : t -> elt list
519 val iter : t -> (elt -> unit) -> unit
520 val min_elt : t -> elt
522 end
524 module Make(Ord:Set.OrderedType) : S with type elt = Ord.t =
525 struct
527 module S = Set.Make(Ord)
529 type elt = Ord.t
530 type t = int ref * S.t ref
532 let create n = ref n, ref S.empty
534 let insert (left,set) elem =
535 match S.mem elem !set with
536 | true -> false
537 | false ->
538 match !left with
539 | 0 ->
540 let max = S.max_elt !set in
541 if Ord.compare elem max < 0 then
542 begin set := S.add elem (S.remove max !set); true end
543 else
544 false
545 | n ->
546 set := S.add elem !set;
547 decr left;
548 true
550 let iter (_,set) f = S.iter f !set
552 let elements (_,set) = S.elements !set
554 let min_elt (_,set) = S.min_elt !set
556 end (* Make *)
558 end (* LimitedSet *)
560 let update dht st id addr = update (M.ping dht) dht.M.rt st id addr
562 exception Break
564 (** @param nodes nodes to start search from, will not be inserted into routing table *)
565 let lookup_node dht ?nodes target k =
566 if !debug then lprintf_nl "lookup %s" (show_id target);
567 let start = BasicSocket.last_time () in
568 let module S = LimitedSet.Make(struct
569 type t = id * addr
570 let compare n1 n2 = Big_int.compare_big_int (distance target (fst n1)) (distance target (fst n2))
571 end) in
572 let found = S.create Kademlia.bucket_nodes in
573 let queried = Hashtbl.create 13 in
574 let active = ref 0 in
575 let check_ready () =
576 if 0 = !active then
577 begin
578 let result = S.elements found in
579 if !debug then lprintf_nl "lookup_node %s done, queried %d, found %d, elapsed %ds"
580 (show_id target) (Hashtbl.length queried) (List.length result) (BasicSocket.last_time () - start);
581 k result
584 let rec round nodes =
585 let inserted = List.fold_left (fun acc node -> if S.insert found node then acc + 1 else acc) 0 nodes in
586 begin try
587 let n = ref 0 in
588 S.iter found (fun node ->
589 if alpha = !n then raise Break;
590 if not (Hashtbl.mem queried node) then begin incr n; query true node end)
591 with Break -> () end;
592 inserted
593 and query store (id,addr as node) =
594 incr active;
595 Hashtbl.add queried node true;
596 if !debug then lprintf_nl "will query node %s" (show_node node);
597 M.find_node dht addr target begin fun (id,addr as node) nodes ->
598 if store then update dht Good id addr;
599 decr active;
600 let inserted = round nodes in
601 let s = try sprintf ", best %s" (show_id (fst (S.min_elt found))) with _ -> "" in
602 if !debug then lprintf_nl "got %d nodes from %s, useful %d%s" (List.length nodes) (show_node node) inserted s;
603 check_ready ()
604 end ~kerr:(fun () -> decr active; if !debug then lprintf_nl "timeout from %s" (show_node node); check_ready ())
606 begin match nodes with
607 | None -> let (_:int) = round (M.self_find_node dht target) in ()
608 | Some l -> List.iter (query false) l
609 end;
610 check_ready ()
612 let show_torrents torrents =
613 let now = BasicSocket.last_time () in
614 Hashtbl.iter (fun h peers ->
615 let l = M.peers_list (fun addr tm -> sprintf "%s (exp. %ds)" (show_addr addr) (tm - now)) peers in
616 lprintf_nl "torrent %s : %s" (H.to_hexa h) (String.concat " " l))
617 torrents
619 let show dht = show_table dht.M.rt; show_torrents dht.M.torrents
620 let stat dht =
621 buckets dht.M.rt,
622 size dht.M.rt,
623 Hashtbl.length dht.M.torrents,
624 Hashtbl.fold (fun _ peers acc -> acc + Peers.fold (fun _ _ acc -> acc + 1) peers 0) dht.M.torrents 0
625 let rpc_stats dht = let (_,st,_) = dht.M.rpc in KRPC.show_stats st
627 let bootstrap dht host addr k =
628 M.ping dht addr begin function
629 | Some node ->
630 if !verb then lprintf_nl "bootstrap node %s (%s) is up" (show_node node) host;
631 lookup_node dht ~nodes:[node] dht.M.rt.self (fun l ->
632 if !debug then lprintf_nl "bootstrap via %s (%s) : found %s" (show_addr addr) host (strl show_node l);
633 k (List.length l >= Kademlia.bucket_nodes))
634 | None ->
635 if !verb then lprintf_nl "bootstrap node %s (%s) is down" (show_addr addr) host;
636 k false
639 let bootstrap dht (host,port) k =
640 Ip.async_ip host
641 (fun ip -> bootstrap dht host (ip,port) k)
642 (fun () -> if !verb then lprintf_nl "boostrap node %s cannot be resolved" host; k false)
644 let bootstrap ?(routers=[]) dht =
645 lookup_node dht dht.M.rt.self begin fun l ->
646 if !debug then lprintf_nl "auto bootstrap : found %s" (strl show_node l);
647 let rec loop l ok =
648 match ok,l with
649 | true,_ -> if !verb then lprintf_nl "bootstrap ok, total nodes : %d" (size dht.M.rt)
650 | false,[] -> if !verb then lprintf_nl "boostrap failed, total nodes : %d" (size dht.M.rt)
651 | false,(node::nodes) -> bootstrap dht node (loop nodes)
653 loop routers (List.length l >= Kademlia.bucket_nodes)
656 let query_peers dht id k =
657 if !debug then lprintf_nl "query_peers: start %s" (H.to_hexa id);
658 lookup_node dht id (fun nodes ->
659 if !debug then lprintf_nl "query_peers: found nodes %s" (strl show_node nodes);
661 let found = ref Peers.empty in
662 let check =
663 let left = ref (List.length nodes + 1) (* one immediate check *) in
664 fun () -> decr left; if 0 = !left then k (Peers.fold (fun peer () l -> peer :: l) !found [])
667 List.iter begin fun node ->
668 M.get_peers dht (snd node) id begin fun node token peers nodes ->
669 if !debug then lprintf_nl "query_peers: got %d peers and %d nodes from %s with token %S"
670 (List.length peers) (List.length nodes) (show_node node) token;
671 k node token peers;
673 found := List.fold_left (fun acc peer -> Peers.add peer () acc) !found peers;
674 check ()
677 ~kerr:(fun () -> if !debug then lprintf_nl "query_peers: get_peers error from %s" (show_node node)(*; check ()*));
678 (* check () *)
679 end nodes)
681 let start rt port bw_control =
682 let secret = Secret.create secret_timeout in
683 let rec dht = lazy (M.create rt port bw_control answer)
684 and answer addr name args =
685 let (id,q) = parse_query_exn name args in
686 let node = (id,addr) in
687 if !debug then lprintf_nl "DHT query from %s : %s" (show_node node) (show_query q);
688 update !!dht Good id addr;
689 stats_add (!!dht).M.stats (M.query_type_of_query q, `In) 1;
690 let response =
691 match q with
692 | Ping -> Ack
693 | FindNode h -> Nodes (M.self_find_node !!dht h)
694 | GetPeers h ->
695 let token = make_token addr h (Secret.get secret) in
696 let peers = M.self_get_peers !!dht h in
697 let nodes = M.self_find_node !!dht h in
698 if !debug then lprintf_nl "answer with %d peers and %d nodes" (List.length peers) (List.length nodes);
699 Peers (token,peers,nodes)
700 | Announce (h,port,token) ->
701 if not (valid_token addr h secret token) then failwith ("invalid token " ^ token);
702 M.store !!dht h (fst addr, port);
705 if !debug then lprintf_nl "DHT response to %s : %s" (show_node node) (show_response response);
706 make_response (!!dht).M.rt.self response
708 let refresh () =
709 let ids = Kademlia.refresh (!!dht).M.rt in
710 if !debug then lprintf_nl "will refresh %d buckets" (List.length ids);
711 let cb prev_id (id,addr as node) l =
712 update !!dht Good id addr; (* replied *)
713 if prev_id <> id then
714 begin
715 if !debug then lprintf_nl "refresh: node %s changed id (was %s)" (show_node node) (show_id prev_id);
716 update !!dht Bad prev_id addr;
717 end;
718 if !debug then lprintf_nl "refresh: got %d nodes from %s" (List.length l) (show_node node);
719 List.iter (fun (id,addr) -> update !!dht Unknown id addr) l
721 List.iter (fun (target, nodes) ->
722 List.iter (fun (id,addr) -> M.find_node !!dht addr target (cb id) ~kerr:(fun () -> ())) nodes)
725 if !debug then lprintf_nl "DHT size : %d self : %s" (size (!!dht).M.rt) (show_id (!!dht).M.rt.self);
726 BasicSocket.add_session_timer (!!dht).M.enabler 60. refresh;
727 !!dht
729 let stop dht = M.shutdown dht