4 http://www.bittorrent.org/beps/bep_0005.html
10 let dht_query_timeout = 20
11 let store_peer_timeout = minutes
30
12 let secret_timeout = minutes
10
15 let log_prefix = "dht"
16 let lprintf_nl fmt
= Printf2.lprintf_nl2
log_prefix fmt
18 let log = new logger
log_prefix
20 let catch f x
= try `Ok
(f x
) with e
-> `Exn e
25 let module A
= Autoconf
in
26 let n = int_of_string
A.major_version
* 100 + int_of_string
A.minor_version
* 10 + int_of_string
A.sub_version
- 300 in
27 assert (n > 0 && n < 256);
28 sprintf
"ML%c%c" (if A.scm_version
= "" then '
='
else '
+'
) (Char.chr
n)
30 (* 2-level association *)
34 val create
: unit -> ('a
,'b
,'c
) t
35 val add
: ('a
,'b
,'c
) t
-> 'a
-> 'b
-> 'c
-> unit
36 val find_all
: ('a
,'b
,'c
) t
-> 'a
-> ('b
,'c
) Hashtbl.t
37 val find
: ('a
,'b
,'c
) t
-> 'a
-> 'b
-> 'c
option
38 val remove
: ('a
,'b
,'c
) t
-> 'a
-> 'b
-> unit
39 val iter
: ('a
,'b
,'c
) t
-> ('a
-> 'b
-> 'c
-> unit) -> unit
40 val clear
: ('a
,'b
,'c
) t
-> unit
44 type ('a
,'b
,'c
) t
= ('a
, ('b
, 'c
) Hashtbl.t
) Hashtbl.t
46 let create () = Hashtbl.create 13
48 let hh = try Hashtbl.find h a
with Not_found
-> Hashtbl.create 3 in
49 Hashtbl.replace
hh b c
;
50 Hashtbl.replace h a
hh
51 let find_all h a
= try Hashtbl.find h a
with Not_found
-> Hashtbl.create 3
52 let find h a b
= try Some
(Hashtbl.find (Hashtbl.find h a
) b
) with Not_found
-> None
53 let remove h a b
= try let ha = Hashtbl.find h a
in Hashtbl.remove ha b
; if Hashtbl.length
ha = 0 then Hashtbl.remove h a
with Not_found
-> ()
54 let iter h f
= Hashtbl.iter (fun a h
-> Hashtbl.iter (fun b c
-> f a b c
) h
) h
55 let clear h
= Hashtbl.clear h
61 type dict
= (string * Bencode.value) list
62 let show_dict d
= String.concat
"," & List.map fst d
65 | Query
of string * dict
67 | Error
of int64
* string
69 let show_msg = function
70 | Query
(name
,args
) -> sprintf
"query %s(%s)" name
(show_dict args
)
71 | Response d
-> sprintf
"response (%s)" (show_dict d
)
72 | Error
(e
,s
) -> sprintf
"error (%Ld,%S)" e s
74 let encode (txn
,msg
) =
75 let module B
= Bencode
in
76 let x = match msg
with
77 | Query
(name
,args
) -> ["y", B.String
"q"; "q", B.String name
; "a", B.Dictionary args
]
78 | Response dict
-> ["y", B.String
"r"; "r", B.Dictionary dict
]
79 | Error
(code
,text
) -> ["y", B.String
"e"; "e", B.List
[B.Int code
; B.String text
] ]
81 let x = ("t", B.String txn
) :: ("v", B.String
self_version):: x in
82 B.encode (B.Dictionary
x)
84 let str = function Bencode.String s
-> s
| _
-> failwith
"str"
85 let int = function Bencode.Int s
-> s
| _
-> failwith
"int"
86 let dict = function Bencode.Dictionary s
-> s
| _
-> failwith
"dict"
87 let list = function Bencode.List l
-> l
| _
-> failwith
"list"
89 exception Protocol_error
of string * string
90 exception Malformed_packet
of string
91 exception Method_unknown
of string
94 let module B
= Bencode
in
95 let module Array
= struct let get x k
= match x with B.Dictionary l
-> List.assoc k l
| _
-> failwith
"decode get" end in
96 let x = try B.decode s
with _
-> raise
(Malformed_packet
"decode") in
97 let txn = try str x.("t") with _
-> raise
(Malformed_packet
"txn") in
98 let ver = try Some
(str x.("v")) with _
-> None
in
100 let msg = match str x.("y") with
101 | "q" -> Query
(str x.("q"), dict x.("a"))
102 | "r" -> Response
(dict x.("r"))
103 | "e" -> begin match list x.("e") with B.Int
n :: B.String s
:: _
-> Error
(n, s
) | _
-> failwith
"decode e" end
104 | _
-> failwith
"type"
107 exn
-> log #warn ~exn
"err"; raise
(Protocol_error
(txn,"Invalid argument"))
112 let udp_set_reader socket f
=
113 set_reader socket
begin fun _
->
114 try read_packets socket f
with exn
->
115 log #warn ~exn
"udp reader";
116 close socket
(Closed_for_exception exn
)
121 let send sock
(ip
,port
as addr
) txnmsg
=
122 let s = encode txnmsg
in
123 log #debug
"KRPC to %s : %S" (show_addr addr
) s;
124 write sock
false s ip port
126 type t
= UdpSocket.t
* (addr
, string, (addr
-> dict -> unit) * (unit -> unit) * int) A.t
128 let create port enabler bw_control answer
: t
=
129 let socket = create Unix.inet_addr_any port
(fun sock event
->
131 | WRITE_DONE
| CAN_REFILL
-> ()
132 | READ_DONE
-> assert false (* set_reader prevents this *)
133 | BASIC_EVENT
x -> match x with
135 | CAN_READ
| CAN_WRITE
-> assert false (* udpSocket implementation prevents this *)
136 | LTIMEOUT
| WTIMEOUT
| RTIMEOUT
-> () (*close sock (Closed_for_error "KRPC timeout")*))
138 set_write_controler
socket bw_control
;
139 set_wtimeout
(sock
socket) 5.;
140 set_rtimeout
(sock
socket) 5.;
141 let h = A.create () in
143 let now = last_time
() in
146 A.iter h (fun addr
txn (_
,kerr
,t
) -> incr
total; if t
< now then bad := (addr
,txn,kerr
) :: !bad);
147 log #info
"timeouted %d of %d DHT queries" (List.length
!bad) !total;
148 List.iter (fun (addr
,txn,kerr
) ->
150 try kerr
() with exn
-> log #info ~exn
"timeout for %s" (show_addr addr
)) !bad;
152 BasicSocket.add_session_timer enabler
5. (fun () -> timeout h);
153 let handle addr
(txn,ver,msg) =
154 let version = match ver with Some
s -> sprintf
"client %S " s | None
-> "" in
155 log #debug
"KRPC from %s %stxn %S : %s" (show_addr addr
) version txn (show_msg msg);
158 begin match A.find h addr
txn with
159 | None
-> log #warn
"no txn %S for %s %s (error received)" txn (show_addr addr
) version
160 | Some
(_
, kerr
, _
) -> A.remove h addr
txn; kerr
()
162 | Query
(name
,args
) ->
163 let ret = answer addr name args
in
164 send socket addr
(txn, ret)
166 match A.find h addr
txn with
167 | None
-> log #warn
"no txn %S for %s %s" txn (show_addr addr
) version
168 | Some
(k
,_
,_
) -> A.remove h addr
txn; k addr
ret
171 match p
.udp_addr
with
172 | Unix.ADDR_UNIX _
-> assert false
173 | Unix.ADDR_INET
(inet_addr
,port
) ->
174 let addr = (Ip.of_inet_addr inet_addr
, port
) in
175 let ret = ref None
in
177 (* log #debug "recv %S" p.udp_content; *)
178 let r = decode_exn p
.udp_content
in
182 let version = match !ret with Some
(_
,Some
s,_
) -> sprintf
" client %S" s | _
-> "" in
183 log #warn ~exn
"dht handle packet from %s%s : %S" (show_addr
addr) version p
.udp_content
;
184 let error txn code
str = send socket addr (txn,(Error
(Int64.of_int code
,str))) in
186 | Malformed_packet
x, Some
(txn, _
, _
)
187 | Protocol_error
("",x), Some
(txn, _
, _
) | Protocol_error
(txn,x), _
-> error txn 203 x
188 | Method_unknown
x, Some
(txn, _
, _
) -> error txn 204 x
189 | _
, Some
(txn, _
, Query _
) -> error txn 202 ""
192 udp_set_reader socket handle;
195 let shutdown (socket,h) =
196 close
socket Closed_by_user
;
197 A.iter h (fun addr _
(_
,kerr
,_
) ->
198 try kerr
() with exn
-> log #warn ~exn
"shutdown for %s" (show_addr
addr));
201 let write (socket,h) msg addr k ~kerr
=
202 let tt = Assoc2.find_all h addr in
203 let rec loop () = (* choose txn FIXME *)
204 let txn = string_of_int
(Random.int 1_000_000) in
205 match Hashtbl.mem
tt txn with
210 Assoc2.add h addr txn (k
,kerr
,last_time
() + dht_query_timeout);
211 send socket addr (txn,msg)
219 | Announce
of H.t
* int * string
221 let show_query = function
223 | FindNode id
-> sprintf
"find_node %s" (show_id id
)
224 | GetPeers
h -> sprintf
"get_peers %s" (show_id
h)
225 | Announce
(h,port
,token
) -> sprintf
"announce %s port=%d token=%S" (show_id
h) port token
229 | Nodes
of (id
* addr) list
230 | Peers
of string * addr list * (id
* addr) list
232 let strl f l
= "[" ^
(String.concat
" " & List.map f l
) ^
"]"
234 let show_node (id
,addr) = sprintf
"%s (%s)" (show_addr
addr) (show_id id
)
236 let show_response = function
238 | Nodes l
-> sprintf
"nodes %s" (strl show_node l
)
239 | Peers
(token
,peers
,nodes
) -> sprintf
"peers token=%S %s %s" token
(strl show_addr peers
) (strl show_node nodes
)
241 let parse_query_exn name args
=
242 let get k
= List.assoc k args
in
243 let sha1 k
= H.direct_of_string
& KRPC.str & get k
in
244 let p = match name
with
246 | "find_node" -> FindNode
(sha1 "target")
247 | "get_peers" -> GetPeers
(sha1 "info_hash")
248 | "announce_peer" -> Announce
(sha1 "info_hash", Int64.to_int
& KRPC.int & get "port", KRPC.str & get "token")
249 | s -> failwith
(sprintf
"parse_query name=%s" name
)
253 let make_query id
x =
254 let sha1 x = Bencode.String
(H.direct_to_string
x) in
255 let self = ("id", sha1 id
) in
257 | Ping
-> KRPC.Query
("ping", [self])
258 | FindNode t
-> KRPC.Query
("find_node", ["target", sha1 t
; self])
259 | GetPeers
h -> KRPC.Query
("get_peers", ["info_hash", sha1 h; self])
260 | Announce
(h, port
, token
) -> KRPC.Query
("announce_peer",
261 ["info_hash", sha1 h;
262 "port", Bencode.Int
(Int64.of_int port
);
263 "token", Bencode.String token
;
267 if String.length
s <> 6 then failwith
"parse_peer" else
268 let c i
= int_of_char
& s.[i
] in
269 Ip.of_ints
(c 0,c 1,c 2,c 3), (c 4 lsl 8 + c 5)
272 assert (String.length
s mod 26 = 0);
274 let nodes = ref [] in
275 while !i < String.length
s do
276 nodes := (H.direct_of_string
(String.sub
s !i 20), parse_peer (String.sub
s (!i+20) 6)) :: !nodes;
281 let make_peer (ip
,port
) =
282 assert (port
<= 0xffff);
283 let (a
,b
,c,d
) = Ip.to_ints ip
in
284 let e = port
lsr 8 and f
= port
land 0xff in
285 let s = String.create 6 in
286 let set i c = s.[i] <- char_of_int
c in
287 set 0 a
; set 1 b
; set 2 c; set 3 d
; set 4 e; set 5 f
;
290 let make_nodes nodes =
291 let s = String.create (26 * List.length
nodes) in
293 List.iter (fun (id
,addr) ->
294 String.blit
(H.direct_to_string id
) 0 s (!i*26) 20;
295 String.blit
(make_peer addr) 0 s (!i*26+20) 6;
300 let parse_response_exn q
dict =
301 let get k
= List.assoc k
dict in
302 let sha1 k
= H.direct_of_string
& KRPC.str & get k
in
306 let s = KRPC.str & get "nodes" in
307 Nodes
(parse_nodes s)
309 let token = KRPC.str & get "token" in
310 let nodes = try parse_nodes (KRPC.str & get "nodes") with Not_found
-> [] in
311 let peers = try List.map
(fun x -> parse_peer & KRPC.str x) & (KRPC.list & get "values") with Not_found
-> [] in
312 Peers
(token, peers, nodes)
317 let make_response id
x =
318 let sha1 x = Bencode.String
(H.direct_to_string
x) in
319 let self = ("id", sha1 id
) in
320 let str s = Bencode.String
s in
322 | Ack
-> KRPC.Response
[self]
323 | Nodes
nodes -> KRPC.Response
[self;"nodes",str (make_nodes nodes)]
324 | Peers
(token,peers,nodes) -> KRPC.Response
327 "nodes",str (make_nodes nodes);
328 "values",Bencode.List
(List.map
(fun addr -> str (make_peer addr)) peers);
335 let e = Dictionary
["t",String
"aa"; "v", String
self_version; "y", String
"e"; "e", List
[Int
201L; String
"A Generic Error Occurred"] ]
336 let s = sprintf
"d1:eli201e24:A Generic Error Occurrede1:t2:aa1:v4:%s1:y1:ee" self_version
337 let v = "aa", KRPC.Error
(201L, "A Generic Error Occurred")
340 assert (encode e = s);
341 assert (KRPC.decode_exn s = (fst
v, Some
self_version, snd
v));
342 assert (KRPC.encode v = s);
347 module Peers
= Map.Make
(struct type t
= addr let compare = compare end)
352 rt
: Kademlia.table
; (* routing table *)
353 rpc
: KRPC.t
; (* KRPC protocol socket *)
354 dht_port
: int; (* port *)
355 torrents
: (H.t
, int Peers.t
) Hashtbl.t
; (* torrents announced by other peers *)
356 enabler
: bool ref; (* timers' enabler *)
359 let dht_query t
addr q k ~kerr
=
360 log #info
"DHT query to %s : %s" (show_addr
addr) (show_query q
);
361 KRPC.write t
.rpc
(make_query t
.rt
.self q
) addr begin fun addr dict ->
362 let (id
,r) = try parse_response_exn q
dict with exn
-> kerr
(); raise exn
in
363 log #info
"DHT response from %s (%s) : %s" (show_addr
addr) (show_id id
) (show_response r);
367 let ping t
addr k
= dht_query t
addr Ping
begin fun node
r ->
368 match r with Ack
-> k
(Some node
)
369 | _
-> k None
; failwith
"dht_query ping" end ~kerr
:(fun () -> k None
)
371 let find_node t
addr h k ~kerr
= dht_query t
addr (FindNode
h) begin fun node
r ->
372 match r with Nodes l
-> k node l
373 | _
-> kerr
(); failwith
"dht_query find_node" end ~kerr
375 let get_peers t
addr h k ~kerr
= dht_query t
addr (GetPeers
h) begin fun node
r ->
376 match r with Peers
(token,peers,nodes) -> k node
token peers nodes
377 | _
-> kerr
(); failwith
"dht_query get_peers" end ~kerr
379 let announce t
addr port
token h k ~kerr
= dht_query t
addr (Announce
(h,port
,token)) begin fun node
r ->
380 match r with Ack
-> k node
381 | _
-> kerr
(); failwith
"dht_query announce" end ~kerr
383 let store t info_hash
addr =
384 let peers = try Hashtbl.find t
.torrents info_hash
with Not_found
-> Peers.empty
in
385 Hashtbl.replace t
.torrents info_hash
(Peers.add addr (BasicSocket.last_time
() + store_peer_timeout) peers)
387 let manage_timeouts enabler
h =
388 BasicSocket.add_session_timer enabler
60. begin fun () ->
389 let now = BasicSocket.last_time
() in
390 let torrents = Hashtbl.fold
(fun k
peers l
-> (k
,peers)::l
) h [] in
393 List.iter (fun (id
,peers) ->
394 let m = Peers.fold
(* removing is rare *)
395 (fun peer expire
m -> incr
total; if expire
< now then (incr
rm; Peers.remove peer
m) else m)
398 if Peers.is_empty
m then Hashtbl.remove h id
else Hashtbl.replace
h id
m
400 log #info
"Removed %d of %d peers for announced torrents" !rm !total
403 let create rt dht_port bw_control answer
=
404 let enabler = ref true in
405 let rpc = KRPC.create dht_port
enabler bw_control answer
in
406 let torrents = Hashtbl.create 8 in
407 manage_timeouts enabler torrents;
408 { rt
= rt
; rpc = rpc; torrents = torrents; dht_port
= dht_port
; enabler = enabler; }
411 dht
.enabler := false;
412 KRPC.shutdown dht
.rpc
414 let peers_list f
m = Peers.fold
(fun peer tm l
-> (f peer tm
)::l
) m []
415 let self_get_peers t
h =
416 let peers = peers_list (fun a _
-> a
) (try Hashtbl.find t
.torrents h with Not_found
-> Peers.empty
) in
417 if List.length
peers <= 100 then
420 let a = Array.of_list
peers in
422 Array.to_list
(Array.sub
a 0 100)
424 let self_find_node t
h = List.map
(fun node
-> node
.id
, node
.addr) & Kademlia.find_node t
.rt
h
431 val create : time
-> t
432 val get : t
-> string
433 val valid
: t
-> string -> bool
434 val get_prev
: t
-> string
438 type t
= { mutable cur
: string; mutable prev
: string; timeout : time
; mutable next
: time
; }
439 let make () = string_of_int
(Random.int 1_000_000)
443 { cur
= s; prev
= s; timeout = tm
; next
= now () + tm
; }
445 if now () > t
.next
then
449 t
.next
<- now () + t
.timeout;
454 let get_prev t
= t
.prev
457 s = t
.cur
|| s = t
.prev
461 let make_token addr h secret
= string_of_int
(Hashtbl.hash
[show_addr
addr; H.direct_to_string
h; secret
])
463 let valid_token addr h secret
token =
464 token = make_token addr h (Secret.get secret
) ||
465 token = make_token addr h (Secret.get_prev secret
)
467 module LimitedSet
= struct
473 val create : int -> t
474 (** @return whether the element was really added *)
475 val insert
: t
-> elt
-> bool
476 val elements
: t
-> elt
list
477 val iter : t
-> (elt
-> unit) -> unit
478 val min_elt
: t
-> elt
482 module Make
(Ord
:Set.OrderedType
) : S
with type elt
= Ord.t
=
485 module S
= Set.Make
(Ord
)
488 type t
= int ref * S.t
ref
490 let create n = ref n, ref S.empty
492 let insert (left
,set) elem
=
493 match S.mem elem
!set with
498 let max = S.max_elt
!set in
499 if Ord.compare elem
max < 0 then
500 begin set := S.add elem
(S.remove max !set); true end
504 set := S.add elem
!set;
508 let iter (_
,set) f
= S.iter f
!set
510 let elements (_
,set) = S.elements !set
512 let min_elt (_
,set) = S.min_elt !set
518 let update dht st id
addr = update (M.ping dht
) dht
.M.rt st id
addr
522 (** @param nodes nodes to start search from, will not be inserted into routing table *)
523 let lookup_node dht ?
nodes target k
=
524 log #info
"lookup %s" (show_id target
);
525 let start = BasicSocket.last_time
() in
526 let module S
= LimitedSet.Make
(struct
528 let compare n1 n2
= Big_int.compare_big_int
(distance target
(fst n1
)) (distance target
(fst n2
))
530 let found = S.create Kademlia.bucket_nodes
in
531 let queried = Hashtbl.create 13 in
532 let active = ref 0 in
536 let result = S.elements found in
537 log #info
"lookup_node %s done, queried %d, found %d, elapsed %ds"
538 (show_id target
) (Hashtbl.length
queried) (List.length
result) (BasicSocket.last_time
() - start);
542 let rec round nodes =
543 let inserted = List.fold_left
(fun acc node
-> if S.insert found node
then acc
+ 1 else acc
) 0 nodes in
546 S.iter found (fun node
->
547 if alpha = !n then raise Break
;
548 if not
(Hashtbl.mem
queried node
) then begin incr
n; query
true node
end)
549 with Break
-> () end;
551 and query
store (id
,addr as node
) =
553 Hashtbl.add queried node
true;
554 log #info
"will query node %s" (show_node node
);
555 M.find_node dht
addr target
begin fun (id
,addr as node
) nodes ->
556 if store then update dht Good id
addr;
558 let inserted = round nodes in
559 let s = try sprintf
", best %s" (show_id
(fst
(S.min_elt found))) with _
-> "" in
560 log #info
"got %d nodes from %s, useful %d%s" (List.length
nodes) (show_node node
) inserted s;
562 end ~kerr
:(fun () -> decr
active; log #info
"timeout from %s" (show_node node
); check_ready ())
564 begin match nodes with
565 | None
-> let (_
:int) = round (M.self_find_node dht target
) in ()
566 | Some l
-> List.iter (query
false) l
570 let show_torrents torrents =
571 let now = BasicSocket.last_time
() in
572 Hashtbl.iter (fun h peers ->
573 let l = M.peers_list (fun addr tm
-> sprintf
"%s (exp. %ds)" (show_addr
addr) (tm
- now)) peers in
574 lprintf_nl "torrent %s : %s" (H.to_hexa
h) (String.concat
" " l))
577 let show dht
= show_table dht
.M.rt
; show_torrents dht
.M.torrents
579 let bootstrap dht host
addr k
=
580 M.ping dht
addr begin function
582 log #info
"bootstrap node %s (%s) is up" (show_node node
) host
;
583 lookup_node dht ~
nodes:[node
] dht
.M.rt
.self (fun l ->
584 log #info
"bootstrap via %s (%s) : found %s" (show_addr
addr) host
(strl show_node l);
585 k
(List.length
l >= Kademlia.bucket_nodes
))
587 log #warn
"bootstrap node %s (%s) is down" (show_addr
addr) host
;
591 let bootstrap dht
(host
,port
) k
=
593 (fun ip
-> bootstrap dht host
(ip
,port
) k
)
594 (fun n -> log #warn
"boostrap node %s cannot be resolved (%d)" host
n; k
false)
596 let bootstrap ?
(routers
=[]) dht
=
597 lookup_node dht dht
.M.rt
.self begin fun l ->
598 log #info
"auto bootstrap : found %s" (strl show_node l);
601 | true,_
-> log #user
"bootstrap ok, total nodes : %d" (size dht
.M.rt
)
602 | false,[] -> log #warn
"boostrap failed, total nodes : %d" (size dht
.M.rt
)
603 | false,(node
::nodes) -> bootstrap dht node
(loop nodes)
605 loop routers
(List.length
l >= Kademlia.bucket_nodes
)
608 let query_peers dht id k
=
609 log #info
"query_peers: start %s" (H.to_hexa id
);
610 lookup_node dht id
(fun nodes ->
611 log #info
"query_peers: found nodes %s" (strl show_node nodes);
613 let found = ref Peers.empty in
615 let left = ref (List.length nodes + 1) (* one immediate check *) in
616 fun () -> decr
left; if 0 = !left then k
(Peers.fold
(fun peer
() l -> peer
:: l) !found [])
619 List.iter begin fun node
->
620 M.get_peers dht
(snd node
) id
begin fun node
token peers nodes ->
621 log #info
"query_peers: got %d peers and %d nodes from %s with token %S"
622 (List.length
peers) (List.length
nodes) (show_node node
) token;
625 found := List.fold_left (fun acc peer -> Peers.add peer () acc) !found peers;
629 ~kerr
:(fun () -> log #info
"query_peers: get_peers error from %s" (show_node node
)(*; check ()*));
633 let start rt port bw_control
=
634 let secret = Secret.create secret_timeout in
635 let rec dht = lazy (M.create rt port bw_control answer
)
636 and answer
addr name args
=
638 let (id
,q
) = parse_query_exn name args
in
639 let node = (id
,addr) in
640 log #info
"DHT query from %s : %s" (show_node node) (show_query q
);
641 update !!dht Good id
addr;
645 | FindNode
h -> Nodes
(M.self_find_node !!dht h)
647 let token = make_token addr h (Secret.get secret) in
648 let peers = M.self_get_peers !!dht h in
649 let nodes = M.self_find_node !!dht h in
650 log #info
"answer with %d peers and %d nodes" (List.length
peers) (List.length
nodes);
651 Peers
(token,peers,nodes)
652 | Announce
(h,port
,token) ->
653 if not
(valid_token addr h secret token) then failwith
"bad token in announce";
654 M.store !!dht h (fst
addr, port
);
657 log #info
"DHT response to %s : %s" (show_node node) (show_response response);
658 make_response (!!dht).M.rt
.self response
660 exn
-> log #warn ~exn
"query %s from %s" name
(show_addr
addr); raise exn
663 let ids = Kademlia.refresh (!!dht).M.rt
in
664 log #info
"will refresh %d buckets" (List.length
ids);
665 let cb prev_id
(id
,addr as node) l =
666 update !!dht Good id
addr; (* replied *)
667 if prev_id
<> id
then
669 log #info
"refresh: node %s changed id (was %s)" (show_node node) (show_id prev_id
);
670 update !!dht Bad prev_id
addr;
672 log #info
"refresh: got %d nodes from %s" (List.length
l) (show_node node);
673 List.iter (fun (id
,addr) -> update !!dht Unknown id
addr) l
675 List.iter (fun (target
, nodes) ->
676 List.iter (fun (id
,addr) -> M.find_node !!dht addr target
(cb id
) ~kerr
:(fun () -> ())) nodes)
679 log #info
"DHT size : %d self : %s" (size
(!!dht).M.rt
) (show_id
(!!dht).M.rt
.self);
680 BasicSocket.add_session_timer
(!!dht).M.enabler 60. refresh;
683 let stop dht = M.shutdown dht