BT: stop asking users to submit unknown client ids
[mldonkey.git] / src / networks / bittorrent / bT_DHT.ml
bloba85db3be0af57f5ca2d26072a0109fe70dbaf995
1 (**
2 DHT
4 http://www.bittorrent.org/beps/bep_0005.html
5 *)
7 open Kademlia
8 open Printf
10 let dht_query_timeout = 20
11 let store_peer_timeout = minutes 30
12 let secret_timeout = minutes 10
13 let alpha = 3
15 let log_prefix = "dht"
16 let lprintf_nl fmt = Printf2.lprintf_nl2 log_prefix fmt
18 let log = new logger log_prefix
20 let catch f x = try `Ok (f x) with e -> `Exn e
21 let (&) f x = f x
22 let (!!) = Lazy.force
24 let self_version =
25 let module A = Autoconf in
26 let n = int_of_string A.major_version * 100 + int_of_string A.minor_version * 10 + int_of_string A.sub_version - 300 in
27 assert (n > 0 && n < 256);
28 sprintf "ML%c%c" (if A.scm_version = "" then '=' else '+') (Char.chr n)
30 (* 2-level association *)
31 module Assoc2 : sig
33 type ('a,'b,'c) t
34 val create : unit -> ('a,'b,'c) t
35 val add : ('a,'b,'c) t -> 'a -> 'b -> 'c -> unit
36 val find_all : ('a,'b,'c) t -> 'a -> ('b,'c) Hashtbl.t
37 val find : ('a,'b,'c) t -> 'a -> 'b -> 'c option
38 val remove : ('a,'b,'c) t -> 'a -> 'b -> unit
39 val iter : ('a,'b,'c) t -> ('a -> 'b -> 'c -> unit) -> unit
40 val clear : ('a,'b,'c) t -> unit
42 end = struct
44 type ('a,'b,'c) t = ('a, ('b, 'c) Hashtbl.t) Hashtbl.t
46 let create () = Hashtbl.create 13
47 let add h a b c =
48 let hh = try Hashtbl.find h a with Not_found -> Hashtbl.create 3 in
49 Hashtbl.replace hh b c;
50 Hashtbl.replace h a hh
51 let find_all h a = try Hashtbl.find h a with Not_found -> Hashtbl.create 3
52 let find h a b = try Some (Hashtbl.find (Hashtbl.find h a) b) with Not_found -> None
53 let remove h a b = try let ha = Hashtbl.find h a in Hashtbl.remove ha b; if Hashtbl.length ha = 0 then Hashtbl.remove h a with Not_found -> ()
54 let iter h f = Hashtbl.iter (fun a h -> Hashtbl.iter (fun b c -> f a b c) h) h
55 let clear h = Hashtbl.clear h
57 end
59 module KRPC = struct
61 type dict = (string * Bencode.value) list
62 let show_dict d = String.concat "," & List.map fst d
64 type msg =
65 | Query of string * dict
66 | Response of dict
67 | Error of int64 * string
69 let show_msg = function
70 | Query (name,args) -> sprintf "query %s(%s)" name (show_dict args)
71 | Response d -> sprintf "response (%s)" (show_dict d)
72 | Error (e,s) -> sprintf "error (%Ld,%S)" e s
74 let encode (txn,msg) =
75 let module B = Bencode in
76 let x = match msg with
77 | Query (name,args) -> ["y", B.String "q"; "q", B.String name; "a", B.Dictionary args]
78 | Response dict -> ["y", B.String "r"; "r", B.Dictionary dict]
79 | Error (code,text) -> ["y", B.String "e"; "e", B.List [B.Int code; B.String text] ]
81 let x = ("t", B.String txn) :: ("v", B.String self_version):: x in
82 B.encode (B.Dictionary x)
84 let str = function Bencode.String s -> s | _ -> failwith "str"
85 let int = function Bencode.Int s -> s | _ -> failwith "int"
86 let dict = function Bencode.Dictionary s -> s | _ -> failwith "dict"
87 let list = function Bencode.List l -> l | _ -> failwith "list"
89 exception Protocol_error of string * string
90 exception Malformed_packet of string
91 exception Method_unknown of string
93 let decode_exn s =
94 let module B = Bencode in
95 let module Array = struct let get x k = match x with B.Dictionary l -> List.assoc k l | _ -> failwith "decode get" end in
96 let x = try B.decode s with _ -> raise (Malformed_packet "decode") in
97 let txn = try str x.("t") with _ -> raise (Malformed_packet "txn") in
98 let ver = try Some (str x.("v")) with _ -> None in
99 try
100 let msg = match str x.("y") with
101 | "q" -> Query (str x.("q"), dict x.("a"))
102 | "r" -> Response (dict x.("r"))
103 | "e" -> begin match list x.("e") with B.Int n :: B.String s :: _ -> Error (n, s) | _ -> failwith "decode e" end
104 | _ -> failwith "type"
105 in (txn, ver, msg)
106 with
107 exn -> log #warn ~exn "err"; raise (Protocol_error (txn,"Invalid argument"))
109 open BasicSocket
110 open UdpSocket
112 let udp_set_reader socket f =
113 set_reader socket begin fun _ ->
114 try read_packets socket f with exn ->
115 log #warn ~exn "udp reader";
116 close socket (Closed_for_exception exn)
119 module A = Assoc2
121 let send sock (ip,port as addr) txnmsg =
122 let s = encode txnmsg in
123 log #debug "KRPC to %s : %S" (show_addr addr) s;
124 write sock false s ip port
126 type t = UdpSocket.t * (addr, string, (addr -> dict -> unit) * (unit -> unit) * int) A.t
128 let create port enabler bw_control answer : t =
129 let socket = create Unix.inet_addr_any port (fun sock event ->
130 match event with
131 | WRITE_DONE | CAN_REFILL -> ()
132 | READ_DONE -> assert false (* set_reader prevents this *)
133 | BASIC_EVENT x -> match x with
134 | CLOSED _ -> ()
135 | CAN_READ | CAN_WRITE -> assert false (* udpSocket implementation prevents this *)
136 | LTIMEOUT | WTIMEOUT | RTIMEOUT -> () (*close sock (Closed_for_error "KRPC timeout")*))
138 set_write_controler socket bw_control;
139 set_wtimeout (sock socket) 5.;
140 set_rtimeout (sock socket) 5.;
141 let h = A.create () in
142 let timeout h =
143 let now = last_time () in
144 let bad = ref [] in
145 let total = ref 0 in
146 A.iter h (fun addr txn (_,kerr,t) -> incr total; if t < now then bad := (addr,txn,kerr) :: !bad);
147 log #info "timeouted %d of %d DHT queries" (List.length !bad) !total;
148 List.iter (fun (addr,txn,kerr) ->
149 A.remove h addr txn;
150 try kerr () with exn -> log #info ~exn "timeout for %s" (show_addr addr)) !bad;
152 BasicSocket.add_session_timer enabler 5. (fun () -> timeout h);
153 let handle addr (txn,ver,msg) =
154 let version = match ver with Some s -> sprintf "client %S " s | None -> "" in
155 log #debug "KRPC from %s %stxn %S : %s" (show_addr addr) version txn (show_msg msg);
156 match msg with
157 | Error _ ->
158 begin match A.find h addr txn with
159 | None -> log #warn "no txn %S for %s %s (error received)" txn (show_addr addr) version
160 | Some (_, kerr, _) -> A.remove h addr txn; kerr ()
162 | Query (name,args) ->
163 let ret = answer addr name args in
164 send socket addr (txn, ret)
165 | Response ret ->
166 match A.find h addr txn with
167 | None -> log #warn "no txn %S for %s %s" txn (show_addr addr) version
168 | Some (k,_,_) -> A.remove h addr txn; k addr ret
170 let handle p =
171 match p.udp_addr with
172 | Unix.ADDR_UNIX _ -> assert false
173 | Unix.ADDR_INET (inet_addr,port) ->
174 let addr = (Ip.of_inet_addr inet_addr, port) in
175 let ret = ref None in
177 (* log #debug "recv %S" p.udp_content; *)
178 let r = decode_exn p.udp_content in
179 ret := Some r;
180 handle addr r
181 with exn ->
182 let version = match !ret with Some (_,Some s,_) -> sprintf " client %S" s | _ -> "" in
183 log #warn ~exn "dht handle packet from %s%s : %S" (show_addr addr) version p.udp_content;
184 let error txn code str = send socket addr (txn,(Error (Int64.of_int code,str))) in
185 match exn,!ret with
186 | Malformed_packet x, Some (txn, _, _)
187 | Protocol_error ("",x), Some(txn, _, _) | Protocol_error (txn,x), _ -> error txn 203 x
188 | Method_unknown x, Some (txn, _, _) -> error txn 204 x
189 | _, Some (txn, _, Query _) -> error txn 202 ""
190 | _ -> ()
192 udp_set_reader socket handle;
193 (socket,h)
195 let shutdown (socket,h) =
196 close socket Closed_by_user;
197 A.iter h (fun addr _ (_,kerr,_) ->
198 try kerr () with exn -> log #warn ~exn "shutdown for %s" (show_addr addr));
199 A.clear h
201 let write (socket,h) msg addr k ~kerr =
202 let tt = Assoc2.find_all h addr in
203 let rec loop () = (* choose txn FIXME *)
204 let txn = string_of_int (Random.int 1_000_000) in
205 match Hashtbl.mem tt txn with
206 | true -> loop ()
207 | false -> txn
209 let txn = loop () in
210 Assoc2.add h addr txn (k,kerr,last_time () + dht_query_timeout);
211 send socket addr (txn,msg)
213 end (* KRPC *)
215 type query =
216 | Ping
217 | FindNode of id
218 | GetPeers of H.t
219 | Announce of H.t * int * string
221 let show_query = function
222 | Ping -> "ping"
223 | FindNode id -> sprintf "find_node %s" (show_id id)
224 | GetPeers h -> sprintf "get_peers %s" (show_id h)
225 | Announce (h,port,token) -> sprintf "announce %s port=%d token=%S" (show_id h) port token
227 type response =
228 | Ack
229 | Nodes of (id * addr) list
230 | Peers of string * addr list * (id * addr) list
232 let strl f l = "[" ^ (String.concat " " & List.map f l) ^ "]"
234 let show_node (id,addr) = sprintf "%s (%s)" (show_addr addr) (show_id id)
236 let show_response = function
237 | Ack -> "ack"
238 | Nodes l -> sprintf "nodes %s" (strl show_node l)
239 | Peers (token,peers,nodes) -> sprintf "peers token=%S %s %s" token (strl show_addr peers) (strl show_node nodes)
241 let parse_query_exn name args =
242 let get k = List.assoc k args in
243 let sha1 k = H.direct_of_string & KRPC.str & get k in
244 let p = match name with
245 | "ping" -> Ping
246 | "find_node" -> FindNode (sha1 "target")
247 | "get_peers" -> GetPeers (sha1 "info_hash")
248 | "announce_peer" -> Announce (sha1 "info_hash", Int64.to_int & KRPC.int & get "port", KRPC.str & get "token")
249 | s -> failwith (sprintf "parse_query name=%s" name)
251 sha1 "id", p
253 let make_query id x =
254 let sha1 x = Bencode.String (H.direct_to_string x) in
255 let self = ("id", sha1 id) in
256 match x with
257 | Ping -> KRPC.Query ("ping", [self])
258 | FindNode t -> KRPC.Query ("find_node", ["target", sha1 t; self])
259 | GetPeers h -> KRPC.Query ("get_peers", ["info_hash", sha1 h; self])
260 | Announce (h, port, token) -> KRPC.Query ("announce_peer",
261 ["info_hash", sha1 h;
262 "port", Bencode.Int (Int64.of_int port);
263 "token", Bencode.String token;
264 self])
266 let parse_peer s =
267 if String.length s <> 6 then failwith "parse_peer" else
268 let c i = int_of_char & s.[i] in
269 Ip.of_ints (c 0,c 1,c 2,c 3), (c 4 lsl 8 + c 5)
271 let parse_nodes s =
272 assert (String.length s mod 26 = 0);
273 let i = ref 0 in
274 let nodes = ref [] in
275 while !i < String.length s do
276 nodes := (H.direct_of_string (String.sub s !i 20), parse_peer (String.sub s (!i+20) 6)) :: !nodes;
277 i := !i + 26;
278 done;
279 !nodes
281 let make_peer (ip,port) =
282 assert (port <= 0xffff);
283 let (a,b,c,d) = Ip.to_ints ip in
284 let e = port lsr 8 and f = port land 0xff in
285 let s = String.create 6 in
286 let set i c = s.[i] <- char_of_int c in
287 set 0 a; set 1 b; set 2 c; set 3 d; set 4 e; set 5 f;
290 let make_nodes nodes =
291 let s = String.create (26 * List.length nodes) in
292 let i = ref 0 in
293 List.iter (fun (id,addr) ->
294 String.blit (H.direct_to_string id) 0 s (!i*26) 20;
295 String.blit (make_peer addr) 0 s (!i*26+20) 6;
296 incr i
297 ) nodes;
300 let parse_response_exn q dict =
301 let get k = List.assoc k dict in
302 let sha1 k = H.direct_of_string & KRPC.str & get k in
303 let p = match q with
304 | Ping -> Ack
305 | FindNode _ ->
306 let s = KRPC.str & get "nodes" in
307 Nodes (parse_nodes s)
308 | GetPeers _ ->
309 let token = KRPC.str & get "token" in
310 let nodes = try parse_nodes (KRPC.str & get "nodes") with Not_found -> [] in
311 let peers = try List.map (fun x -> parse_peer & KRPC.str x) & (KRPC.list & get "values") with Not_found -> [] in
312 Peers (token, peers, nodes)
313 | Announce _ -> Ack
315 sha1 "id", p
317 let make_response id x =
318 let sha1 x = Bencode.String (H.direct_to_string x) in
319 let self = ("id", sha1 id) in
320 let str s = Bencode.String s in
321 match x with
322 | Ack -> KRPC.Response [self]
323 | Nodes nodes -> KRPC.Response [self;"nodes",str (make_nodes nodes)]
324 | Peers (token,peers,nodes) -> KRPC.Response
325 [self;
326 "token",str token;
327 "nodes",str (make_nodes nodes);
328 "values",Bencode.List (List.map (fun addr -> str (make_peer addr)) peers);
331 module Test = struct
333 open Bencode
335 let e = Dictionary ["t",String "aa"; "v", String self_version; "y", String "e"; "e", List [Int 201L; String "A Generic Error Occurred"] ]
336 let s = sprintf "d1:eli201e24:A Generic Error Occurrede1:t2:aa1:v4:%s1:y1:ee" self_version
337 let v = "aa", KRPC.Error (201L, "A Generic Error Occurred")
339 let () =
340 assert (encode e = s);
341 assert (KRPC.decode_exn s = (fst v, Some self_version, snd v));
342 assert (KRPC.encode v = s);
347 module Peers = Map.Make(struct type t = addr let compare = compare end)
349 module M = struct
351 type t = {
352 rt : Kademlia.table; (* routing table *)
353 rpc : KRPC.t; (* KRPC protocol socket *)
354 dht_port : int; (* port *)
355 torrents : (H.t, int Peers.t) Hashtbl.t; (* torrents announced by other peers *)
356 enabler : bool ref; (* timers' enabler *)
359 let dht_query t addr q k ~kerr =
360 log #info "DHT query to %s : %s" (show_addr addr) (show_query q);
361 KRPC.write t.rpc (make_query t.rt.self q) addr begin fun addr dict ->
362 let (id,r) = try parse_response_exn q dict with exn -> kerr (); raise exn in
363 log #info "DHT response from %s (%s) : %s" (show_addr addr) (show_id id) (show_response r);
364 k (id,addr) r
365 end ~kerr
367 let ping t addr k = dht_query t addr Ping begin fun node r ->
368 match r with Ack -> k (Some node)
369 | _ -> k None; failwith "dht_query ping" end ~kerr:(fun () -> k None)
371 let find_node t addr h k ~kerr = dht_query t addr (FindNode h) begin fun node r ->
372 match r with Nodes l -> k node l
373 | _ -> kerr (); failwith "dht_query find_node" end ~kerr
375 let get_peers t addr h k ~kerr = dht_query t addr (GetPeers h) begin fun node r ->
376 match r with Peers (token,peers,nodes) -> k node token peers nodes
377 | _ -> kerr (); failwith "dht_query get_peers" end ~kerr
379 let announce t addr port token h k ~kerr = dht_query t addr (Announce (h,port,token)) begin fun node r ->
380 match r with Ack -> k node
381 | _ -> kerr (); failwith "dht_query announce" end ~kerr
383 let store t info_hash addr =
384 let peers = try Hashtbl.find t.torrents info_hash with Not_found -> Peers.empty in
385 Hashtbl.replace t.torrents info_hash (Peers.add addr (BasicSocket.last_time () + store_peer_timeout) peers)
387 let manage_timeouts enabler h =
388 BasicSocket.add_session_timer enabler 60. begin fun () ->
389 let now = BasicSocket.last_time () in
390 let torrents = Hashtbl.fold (fun k peers l -> (k,peers)::l) h [] in
391 let rm = ref 0 in
392 let total = ref 0 in
393 List.iter (fun (id,peers) ->
394 let m = Peers.fold (* removing is rare *)
395 (fun peer expire m -> incr total; if expire < now then (incr rm; Peers.remove peer m) else m)
396 peers peers
398 if Peers.is_empty m then Hashtbl.remove h id else Hashtbl.replace h id m
399 ) torrents;
400 log #info "Removed %d of %d peers for announced torrents" !rm !total
403 let create rt dht_port bw_control answer =
404 let enabler = ref true in
405 let rpc = KRPC.create dht_port enabler bw_control answer in
406 let torrents = Hashtbl.create 8 in
407 manage_timeouts enabler torrents;
408 { rt = rt; rpc = rpc; torrents = torrents; dht_port = dht_port; enabler = enabler; }
410 let shutdown dht =
411 dht.enabler := false;
412 KRPC.shutdown dht.rpc
414 let peers_list f m = Peers.fold (fun peer tm l -> (f peer tm)::l) m []
415 let self_get_peers t h =
416 let peers = peers_list (fun a _ -> a) (try Hashtbl.find t.torrents h with Not_found -> Peers.empty) in
417 if List.length peers <= 100 then
418 peers
419 else
420 let a = Array.of_list peers in
421 Array2.shuffle a;
422 Array.to_list (Array.sub a 0 100)
424 let self_find_node t h = List.map (fun node -> node.id, node.addr) & Kademlia.find_node t.rt h
426 end (* module M *)
428 module Secret : sig
430 type t
431 val create : time -> t
432 val get : t -> string
433 val valid : t -> string -> bool
434 val get_prev : t -> string
436 end = struct
438 type t = { mutable cur : string; mutable prev : string; timeout : time; mutable next : time; }
439 let make () = string_of_int (Random.int 1_000_000)
440 let create tm =
441 assert (tm > 0);
442 let s = make () in
443 { cur = s; prev = s; timeout = tm; next = now () + tm; }
444 let invalidate t =
445 if now () > t.next then
446 begin
447 t.prev <- t.cur;
448 t.cur <- make ();
449 t.next <- now () + t.timeout;
451 let get t =
452 invalidate t;
453 t.cur
454 let get_prev t = t.prev
455 let valid t s =
456 invalidate t;
457 s = t.cur || s = t.prev
461 let make_token addr h secret = string_of_int (Hashtbl.hash [show_addr addr; H.direct_to_string h; secret])
463 let valid_token addr h secret token =
464 token = make_token addr h (Secret.get secret) ||
465 token = make_token addr h (Secret.get_prev secret)
467 module LimitedSet = struct
469 module type S = sig
471 type elt
472 type t
473 val create : int -> t
474 (** @return whether the element was really added *)
475 val insert : t -> elt -> bool
476 val elements : t -> elt list
477 val iter : t -> (elt -> unit) -> unit
478 val min_elt : t -> elt
480 end
482 module Make(Ord:Set.OrderedType) : S with type elt = Ord.t =
483 struct
485 module S = Set.Make(Ord)
487 type elt = Ord.t
488 type t = int ref * S.t ref
490 let create n = ref n, ref S.empty
492 let insert (left,set) elem =
493 match S.mem elem !set with
494 | true -> false
495 | false ->
496 match !left with
497 | 0 ->
498 let max = S.max_elt !set in
499 if Ord.compare elem max < 0 then
500 begin set := S.add elem (S.remove max !set); true end
501 else
502 false
503 | n ->
504 set := S.add elem !set;
505 decr left;
506 true
508 let iter (_,set) f = S.iter f !set
510 let elements (_,set) = S.elements !set
512 let min_elt (_,set) = S.min_elt !set
514 end (* Make *)
516 end (* LimitedSet *)
518 let update dht st id addr = update (M.ping dht) dht.M.rt st id addr
520 exception Break
522 (** @param nodes nodes to start search from, will not be inserted into routing table *)
523 let lookup_node dht ?nodes target k =
524 log #info "lookup %s" (show_id target);
525 let start = BasicSocket.last_time () in
526 let module S = LimitedSet.Make(struct
527 type t = id * addr
528 let compare n1 n2 = Big_int.compare_big_int (distance target (fst n1)) (distance target (fst n2))
529 end) in
530 let found = S.create Kademlia.bucket_nodes in
531 let queried = Hashtbl.create 13 in
532 let active = ref 0 in
533 let check_ready () =
534 if 0 = !active then
535 begin
536 let result = S.elements found in
537 log #info "lookup_node %s done, queried %d, found %d, elapsed %ds"
538 (show_id target) (Hashtbl.length queried) (List.length result) (BasicSocket.last_time () - start);
539 k result
542 let rec round nodes =
543 let inserted = List.fold_left (fun acc node -> if S.insert found node then acc + 1 else acc) 0 nodes in
544 begin try
545 let n = ref 0 in
546 S.iter found (fun node ->
547 if alpha = !n then raise Break;
548 if not (Hashtbl.mem queried node) then begin incr n; query true node end)
549 with Break -> () end;
550 inserted
551 and query store (id,addr as node) =
552 incr active;
553 Hashtbl.add queried node true;
554 log #info "will query node %s" (show_node node);
555 M.find_node dht addr target begin fun (id,addr as node) nodes ->
556 if store then update dht Good id addr;
557 decr active;
558 let inserted = round nodes in
559 let s = try sprintf ", best %s" (show_id (fst (S.min_elt found))) with _ -> "" in
560 log #info "got %d nodes from %s, useful %d%s" (List.length nodes) (show_node node) inserted s;
561 check_ready ()
562 end ~kerr:(fun () -> decr active; log #info "timeout from %s" (show_node node); check_ready ())
564 begin match nodes with
565 | None -> let (_:int) = round (M.self_find_node dht target) in ()
566 | Some l -> List.iter (query false) l
567 end;
568 check_ready ()
570 let show_torrents torrents =
571 let now = BasicSocket.last_time () in
572 Hashtbl.iter (fun h peers ->
573 let l = M.peers_list (fun addr tm -> sprintf "%s (exp. %ds)" (show_addr addr) (tm - now)) peers in
574 lprintf_nl "torrent %s : %s" (H.to_hexa h) (String.concat " " l))
575 torrents
577 let show dht = show_table dht.M.rt; show_torrents dht.M.torrents
579 let bootstrap dht host addr k =
580 M.ping dht addr begin function
581 | Some node ->
582 log #info "bootstrap node %s (%s) is up" (show_node node) host;
583 lookup_node dht ~nodes:[node] dht.M.rt.self (fun l ->
584 log #info "bootstrap via %s (%s) : found %s" (show_addr addr) host (strl show_node l);
585 k (List.length l >= Kademlia.bucket_nodes))
586 | None ->
587 log #warn "bootstrap node %s (%s) is down" (show_addr addr) host;
588 k false
591 let bootstrap dht (host,port) k =
592 Ip.async_ip host
593 (fun ip -> bootstrap dht host (ip,port) k)
594 (fun n -> log #warn "boostrap node %s cannot be resolved (%d)" host n; k false)
596 let bootstrap ?(routers=[]) dht =
597 lookup_node dht dht.M.rt.self begin fun l ->
598 log #info "auto bootstrap : found %s" (strl show_node l);
599 let rec loop l ok =
600 match ok,l with
601 | true,_ -> log #user "bootstrap ok, total nodes : %d" (size dht.M.rt)
602 | false,[] -> log #warn "boostrap failed, total nodes : %d" (size dht.M.rt)
603 | false,(node::nodes) -> bootstrap dht node (loop nodes)
605 loop routers (List.length l >= Kademlia.bucket_nodes)
608 let query_peers dht id k =
609 log #info "query_peers: start %s" (H.to_hexa id);
610 lookup_node dht id (fun nodes ->
611 log #info "query_peers: found nodes %s" (strl show_node nodes);
613 let found = ref Peers.empty in
614 let check =
615 let left = ref (List.length nodes + 1) (* one immediate check *) in
616 fun () -> decr left; if 0 = !left then k (Peers.fold (fun peer () l -> peer :: l) !found [])
619 List.iter begin fun node ->
620 M.get_peers dht (snd node) id begin fun node token peers nodes ->
621 log #info "query_peers: got %d peers and %d nodes from %s with token %S"
622 (List.length peers) (List.length nodes) (show_node node) token;
623 k node token peers;
625 found := List.fold_left (fun acc peer -> Peers.add peer () acc) !found peers;
626 check ()
629 ~kerr:(fun () -> log #info "query_peers: get_peers error from %s" (show_node node)(*; check ()*));
630 (* check () *)
631 end nodes)
633 let start rt port bw_control =
634 let secret = Secret.create secret_timeout in
635 let rec dht = lazy (M.create rt port bw_control answer)
636 and answer addr name args =
638 let (id,q) = parse_query_exn name args in
639 let node = (id,addr) in
640 log #info "DHT query from %s : %s" (show_node node) (show_query q);
641 update !!dht Good id addr;
642 let response =
643 match q with
644 | Ping -> Ack
645 | FindNode h -> Nodes (M.self_find_node !!dht h)
646 | GetPeers h ->
647 let token = make_token addr h (Secret.get secret) in
648 let peers = M.self_get_peers !!dht h in
649 let nodes = M.self_find_node !!dht h in
650 log #info "answer with %d peers and %d nodes" (List.length peers) (List.length nodes);
651 Peers (token,peers,nodes)
652 | Announce (h,port,token) ->
653 if not (valid_token addr h secret token) then failwith "bad token in announce";
654 M.store !!dht h (fst addr, port);
657 log #info "DHT response to %s : %s" (show_node node) (show_response response);
658 make_response (!!dht).M.rt.self response
659 with
660 exn -> log #warn ~exn "query %s from %s" name (show_addr addr); raise exn
662 let refresh () =
663 let ids = Kademlia.refresh (!!dht).M.rt in
664 log #info "will refresh %d buckets" (List.length ids);
665 let cb prev_id (id,addr as node) l =
666 update !!dht Good id addr; (* replied *)
667 if prev_id <> id then
668 begin
669 log #info "refresh: node %s changed id (was %s)" (show_node node) (show_id prev_id);
670 update !!dht Bad prev_id addr;
671 end;
672 log #info "refresh: got %d nodes from %s" (List.length l) (show_node node);
673 List.iter (fun (id,addr) -> update !!dht Unknown id addr) l
675 List.iter (fun (target, nodes) ->
676 List.iter (fun (id,addr) -> M.find_node !!dht addr target (cb id) ~kerr:(fun () -> ())) nodes)
679 log #info "DHT size : %d self : %s" (size (!!dht).M.rt) (show_id (!!dht).M.rt.self);
680 BasicSocket.add_session_timer (!!dht).M.enabler 60. refresh;
681 !!dht
683 let stop dht = M.shutdown dht