4 http://www.bittorrent.org/beps/bep_0005.html
10 let dht_query_timeout = 20
11 let store_peer_timeout = minutes
30
12 let secret_timeout = minutes
10
15 let log_prefix = "[dht]"
16 let lprintf_nl ?exn fmt
= Printf2.lprintf_nl2 ?exn
log_prefix fmt
18 let catch f x
= try `Ok
(f x
) with e
-> `Exn e
23 let module A
= Autoconf
in
24 let n = int_of_string
A.major_version
* 100 + int_of_string
A.minor_version
* 10 + int_of_string
A.sub_version
- 300 in
25 assert (n > 0 && n < 256);
26 sprintf
"ML%c%c" (if A.scm_version
= "" then '
='
else '
+'
) (Char.chr
n)
28 (* 2-level association *)
32 val create
: unit -> ('a
,'b
,'c
) t
33 val add
: ('a
,'b
,'c
) t
-> 'a
-> 'b
-> 'c
-> unit
34 val find_all
: ('a
,'b
,'c
) t
-> 'a
-> ('b
,'c
) Hashtbl.t
35 val find
: ('a
,'b
,'c
) t
-> 'a
-> 'b
-> 'c
option
36 val remove
: ('a
,'b
,'c
) t
-> 'a
-> 'b
-> unit
37 val iter
: ('a
,'b
,'c
) t
-> ('a
-> 'b
-> 'c
-> unit) -> unit
38 val clear
: ('a
,'b
,'c
) t
-> unit
42 type ('a
,'b
,'c
) t
= ('a
, ('b
, 'c
) Hashtbl.t
) Hashtbl.t
44 let create () = Hashtbl.create 13
46 let hh = try Hashtbl.find h a
with Not_found
-> Hashtbl.create 3 in
47 Hashtbl.replace
hh b c
;
48 Hashtbl.replace h a
hh
49 let find_all h a
= try Hashtbl.find h a
with Not_found
-> Hashtbl.create 3
50 let find h a b
= try Some
(Hashtbl.find (Hashtbl.find h a
) b
) with Not_found
-> None
51 let remove h a b
= try let ha = Hashtbl.find h a
in Hashtbl.remove ha b
; if Hashtbl.length
ha = 0 then Hashtbl.remove h a
with Not_found
-> ()
52 let iter h f
= Hashtbl.iter (fun a h
-> Hashtbl.iter (fun b c
-> f a b c
) h
) h
53 let clear h
= Hashtbl.clear h
57 let stats_add h k
n = Hashtbl.replace h k
(n + try Hashtbl.find h k
with Not_found
-> 0)
61 type dict
= (string * Bencode.value) list
62 let show_dict d
= String.concat
"," & List.map fst d
65 | Query
of string * dict
67 | Error
of int64
* string
69 let show_msg = function
70 | Query
(name
,args
) -> sprintf
"query %s(%s)" name
(show_dict args
)
71 | Response d
-> sprintf
"response (%s)" (show_dict d
)
72 | Error
(e
,s
) -> sprintf
"error (%Ld,%S)" e s
74 let encode (txn
,msg
) =
75 let module B
= Bencode
in
76 let x = match msg
with
77 | Query
(name
,args
) -> ["y", B.String
"q"; "q", B.String name
; "a", B.Dictionary args
]
78 | Response dict
-> ["y", B.String
"r"; "r", B.Dictionary dict
]
79 | Error
(code
,text
) -> ["y", B.String
"e"; "e", B.List
[B.Int code
; B.String text
] ]
81 let x = ("t", B.String txn
) :: ("v", B.String
self_version):: x in
82 B.encode (B.Dictionary
x)
84 let str = function Bencode.String s
-> s
| _
-> failwith
"str"
85 let int = function Bencode.Int s
-> s
| _
-> failwith
"int"
86 let dict = function Bencode.Dictionary s
-> s
| _
-> failwith
"dict"
87 let list = function Bencode.List l
-> l
| _
-> failwith
"list"
89 exception Protocol_error
of string * string
90 exception Malformed_packet
of string
91 exception Method_unknown
of string
94 let module B
= Bencode
in
95 let module Array
= struct let get x k
= match x with B.Dictionary l
-> List.assoc k l
| _
-> failwith
"decode get" end in
96 let x = try B.decode s
with _
-> raise
(Malformed_packet
"decode") in
97 let txn = try str x.("t") with _
-> raise
(Malformed_packet
"txn") in
98 let ver = try Some
(str x.("v")) with _
-> None
in
100 let msg = match str x.("y") with
101 | "q" -> Query
(str x.("q"), dict x.("a"))
102 | "r" -> Response
(dict x.("r"))
103 | "e" -> begin match list x.("e") with B.Int
n :: B.String s
:: _
-> Error
(n, s
) | _
-> failwith
"decode e" end
104 | _
-> failwith
"type"
108 if !verb
then lprintf_nl ~exn
"err";
109 raise
(Protocol_error
(txn,"Invalid argument"))
114 let udp_set_reader socket f
=
115 set_reader socket
begin fun _
->
116 try read_packets socket f
with exn
->
117 if !verb
then lprintf_nl ~exn
"udp reader";
118 close socket
(Closed_for_exception exn
)
123 let send sock stats
(ip
,port
as addr
) txnmsg
=
124 let s = encode txnmsg
in
125 if !debug
then lprintf_nl "KRPC to %s : %S" (show_addr addr
) s;
126 stats_add stats `Sent
1;
127 stats_add stats `SentBytes
(String.length
s);
128 write sock
false s ip port
130 type stats_key
= [ `Timeout
| `Sent
| `SentBytes
| `Recv
| `RecvBytes
| `Decoded
| `Handled
| `NoTxn
]
133 (stats_key
, int) Hashtbl.t
*
134 (addr
, string, (addr
-> dict -> unit) * ([`Error
|`Timeout
]-> unit) * int) A.t
136 let get k
= try Hashtbl.find t k
with Not_found
-> 0 in
138 sprintf
"rpc recv %d pkts (%d bytes)" (get `Recv
) (get `RecvBytes
);
139 sprintf
"rpc sent %d pkts (%d bytes)" (get `Sent
) (get `SentBytes
);
140 sprintf
"rpc decoded %d, handled %d" (get `Decoded
) (get `Handled
);
141 sprintf
"rpc timeouted %d, orphan %d" (get `Timeout
) (get `NoTxn
);
144 let create port enabler bw_control answer
: t
=
145 let socket = create Unix.inet_addr_any port
(fun sock event
->
147 | WRITE_DONE
| CAN_REFILL
-> ()
148 | READ_DONE
-> assert false (* set_reader prevents this *)
149 | BASIC_EVENT
x -> match x with
151 | CAN_READ
| CAN_WRITE
-> assert false (* udpSocket implementation prevents this *)
152 | LTIMEOUT
| WTIMEOUT
| RTIMEOUT
-> () (*close sock (Closed_for_error "KRPC timeout")*))
154 set_write_controler
socket bw_control
;
155 set_wtimeout
(sock
socket) 5.;
156 set_rtimeout
(sock
socket) 5.;
157 let h = A.create () in
158 let stats = Hashtbl.create 10 in
160 let now = last_time
() in
163 A.iter h (fun addr
txn (_
,kerr
,t
) -> incr
total; if t
< now then bad := (addr
,txn,kerr
) :: !bad);
164 if !debug
then lprintf_nl "timeouted %d of %d DHT queries" (List.length
!bad) !total;
165 stats_add stats `Timeout
(List.length
!bad);
166 List.iter (fun (addr
,txn,kerr
) ->
168 try kerr `Timeout
with exn
-> if !debug
then lprintf_nl ~exn
"timeout for %s" (show_addr addr
)) !bad;
170 BasicSocket.add_session_timer enabler
5. (fun () -> timeout h);
171 let handle addr
(txn,ver,msg) =
172 let version = lazy (match ver with Some
s -> sprintf
" client %S" s | None
-> "") in
173 if !debug
then lprintf_nl "KRPC from %s %stxn %S : %s" (show_addr addr
) !!version txn (show_msg msg);
175 | Error
(code
,msg) ->
176 if !verb
then lprintf_nl "error received from %s%s : %Ld %S" (show_addr addr
) !!version code
msg;
177 begin match A.find h addr
txn with
179 stats_add stats `NoTxn
1;
180 if !verb
then lprintf_nl "no txn %S for %s%s" txn (show_addr addr
) !!version
181 | Some
(_
, kerr
, _
) -> A.remove h addr
txn; kerr `Error
183 | Query
(name
,args
) ->
184 let ret = answer addr name args
in
185 send socket stats addr
(txn, ret)
187 match A.find h addr
txn with
189 stats_add stats `NoTxn
1;
190 if !verb
then lprintf_nl "no txn %S for %s%s" txn (show_addr addr
) !!version
191 | Some
(k
,_
,_
) -> A.remove h addr
txn; k addr
ret
194 match p
.udp_addr
with
195 | Unix.ADDR_UNIX _
-> assert false
196 | Unix.ADDR_INET
(inet_addr
,port
) ->
197 let addr = (Ip.of_inet_addr inet_addr
, port
) in
198 let ret = ref None
in
200 stats_add stats `RecvBytes
(String.length p
.udp_content
);
201 stats_add stats `Recv
1;
202 let r = decode_exn p
.udp_content
in
203 stats_add stats `Decoded
1;
206 stats_add stats `Handled
1;
208 let version = match !ret with Some
(_
,Some
s,_
) -> sprintf
" client %S" s | _
-> "" in
209 if !verb
then lprintf_nl ~exn
"handle packet from %s%s : %S" (show_addr
addr) version p
.udp_content
;
210 let error txn code
str = send socket stats addr (txn,(Error
(Int64.of_int code
,str))) in
212 | Malformed_packet
x, Some
(txn, _
, _
)
213 | Protocol_error
("",x), Some
(txn, _
, _
) | Protocol_error
(txn,x), _
-> error txn 203 x
214 | Method_unknown
x, Some
(txn, _
, _
) -> error txn 204 x
215 | _
, Some
(txn, _
, Query _
) -> error txn 202 ""
218 udp_set_reader socket handle;
221 let shutdown (socket,_
,h) =
222 close
socket Closed_by_user
;
223 A.iter h (fun addr _
(_
,kerr
,_
) ->
224 try kerr `Timeout
with exn
-> if !verb
then lprintf_nl ~exn
"shutdown for %s" (show_addr
addr));
227 let write (socket,stats,h) msg addr k ~kerr
=
228 let tt = Assoc2.find_all h addr in
229 let rec loop () = (* choose txn FIXME *)
230 let txn = string_of_int
(Random.int 1_000_000) in
231 match Hashtbl.mem
tt txn with
236 Assoc2.add h addr txn (k
,kerr
,last_time
() + dht_query_timeout);
237 send socket stats addr (txn,msg)
245 | Announce
of H.t
* int * string
247 let show_query = function
249 | FindNode id
-> sprintf
"find_node %s" (show_id id
)
250 | GetPeers
h -> sprintf
"get_peers %s" (show_id
h)
251 | Announce
(h,port
,token
) -> sprintf
"announce %s port=%d token=%S" (show_id
h) port token
255 | Nodes
of (id
* addr) list
256 | Peers
of string * addr list * (id
* addr) list
258 let strl f l
= "[" ^
(String.concat
" " & List.map f l
) ^
"]"
260 let show_node (id
,addr) = sprintf
"%s (%s)" (show_addr
addr) (show_id id
)
262 let show_response = function
264 | Nodes l
-> sprintf
"nodes %s" (strl show_node l
)
265 | Peers
(token
,peers
,nodes
) -> sprintf
"peers token=%S %s %s" token
(strl show_addr peers
) (strl show_node nodes
)
267 let parse_query_exn name args
=
268 let get k
= List.assoc k args
in
269 let sha1 k
= H.direct_of_string
& KRPC.str & get k
in
270 let p = match name
with
272 | "find_node" -> FindNode
(sha1 "target")
273 | "get_peers" -> GetPeers
(sha1 "info_hash")
274 | "announce_peer" -> Announce
(sha1 "info_hash", Int64.to_int
& KRPC.int & get "port", KRPC.str & get "token")
275 | s -> failwith
(sprintf
"parse_query name=%s" name
)
279 let make_query id
x =
280 let sha1 x = Bencode.String
(H.direct_to_string
x) in
281 let self = ("id", sha1 id
) in
283 | Ping
-> KRPC.Query
("ping", [self])
284 | FindNode t
-> KRPC.Query
("find_node", ["target", sha1 t
; self])
285 | GetPeers
h -> KRPC.Query
("get_peers", ["info_hash", sha1 h; self])
286 | Announce
(h, port
, token
) -> KRPC.Query
("announce_peer",
287 ["info_hash", sha1 h;
288 "port", Bencode.Int
(Int64.of_int port
);
289 "token", Bencode.String token
;
293 if String.length
s <> 6 then failwith
"parse_peer" else
294 let c i
= int_of_char
& s.[i
] in
295 Ip.of_ints
(c 0,c 1,c 2,c 3), (c 4 lsl 8 + c 5)
298 assert (String.length
s mod 26 = 0);
300 let nodes = ref [] in
301 while !i < String.length
s do
302 nodes := (H.direct_of_string
(String.sub
s !i 20), parse_peer (String.sub
s (!i+20) 6)) :: !nodes;
307 let make_peer (ip
,port
) =
308 assert (port
<= 0xffff);
309 let (a
,b
,c,d
) = Ip.to_ints ip
in
310 let e = port
lsr 8 and f
= port
land 0xff in
311 let s = String.create 6 in
312 let set i c = s.[i] <- char_of_int
c in
313 set 0 a
; set 1 b
; set 2 c; set 3 d
; set 4 e; set 5 f
;
316 let make_nodes nodes =
317 let s = String.create (26 * List.length
nodes) in
319 List.iter (fun (id
,addr) ->
320 String.blit
(H.direct_to_string id
) 0 s (!i*26) 20;
321 String.blit
(make_peer addr) 0 s (!i*26+20) 6;
326 let parse_response_exn q
dict =
327 let get k
= List.assoc k
dict in
328 let sha1 k
= H.direct_of_string
& KRPC.str & get k
in
332 let s = KRPC.str & get "nodes" in
333 Nodes
(parse_nodes s)
335 let token = KRPC.str & get "token" in
336 let nodes = try parse_nodes (KRPC.str & get "nodes") with Not_found
-> [] in
337 let peers = try List.map
(fun x -> parse_peer & KRPC.str x) & (KRPC.list & get "values") with Not_found
-> [] in
338 Peers
(token, peers, nodes)
343 let make_response id
x =
344 let sha1 x = Bencode.String
(H.direct_to_string
x) in
345 let self = ("id", sha1 id
) in
346 let str s = Bencode.String
s in
348 | Ack
-> KRPC.Response
[self]
349 | Nodes
nodes -> KRPC.Response
[self;"nodes",str (make_nodes nodes)]
350 | Peers
(token,peers,nodes) -> KRPC.Response
353 "nodes",str (make_nodes nodes);
354 "values",Bencode.List
(List.map
(fun addr -> str (make_peer addr)) peers);
361 let e = Dictionary
["t",String
"aa"; "v", String
self_version; "y", String
"e"; "e", List
[Int
201L; String
"A Generic Error Occurred"] ]
362 let s = sprintf
"d1:eli201e24:A Generic Error Occurrede1:t2:aa1:v4:%s1:y1:ee" self_version
363 let v = "aa", KRPC.Error
(201L, "A Generic Error Occurred")
366 assert (encode e = s);
367 assert (KRPC.decode_exn s = (fst
v, Some
self_version, snd
v));
368 assert (KRPC.encode v = s);
373 module Peers
= Map.Make
(struct type t
= addr let compare = compare end)
377 type query_type
= [ `Ping
| `FindNode
| `GetPeers
| `Announce
]
378 type answer_type
= [ `Answer
| `Error
| `Timeout
]
381 rt
: Kademlia.table
; (* routing table *)
382 rpc
: KRPC.t
; (* KRPC protocol socket *)
383 dht_port
: int; (* port *)
384 torrents
: (H.t
, int Peers.t
) Hashtbl.t
; (* torrents announced by other peers *)
385 enabler
: bool ref; (* timers' enabler *)
386 stats : (query_type
* [ `In
| `Out
of answer_type
], int) Hashtbl.t
; (* statistics *)
389 let query_type_of_query = function
391 | FindNode _
-> `FindNode
392 | GetPeers _
-> `GetPeers
393 | Announce _
-> `Announce
395 let dht_query t
addr q k ~kerr
=
396 if !debug
then lprintf_nl "DHT query to %s : %s" (show_addr
addr) (show_query q
);
397 let qt = query_type_of_query q
in
398 KRPC.write t
.rpc
(make_query t
.rt
.self q
) addr begin fun addr dict ->
399 let (id
,r) = try parse_response_exn q
dict with exn
-> stats_add t
.stats (qt, `Out `Error
) 1; kerr
(); raise exn
in
400 if !debug
then lprintf_nl "DHT response from %s (%s) : %s" (show_addr
addr) (show_id id
) (show_response r);
401 stats_add t
.stats (qt, `Out `Answer
) 1;
403 end ~kerr
:(fun reason
-> stats_add t
.stats (qt, `Out
(reason
:>answer_type
)) 1; kerr
())
405 let ping t
addr k
= dht_query t
addr Ping
begin fun node
r ->
406 match r with Ack
-> k
(Some node
)
407 | _
-> k None
; failwith
"dht_query ping" end ~kerr
:(fun () -> k None
)
409 let find_node t
addr h k ~kerr
= dht_query t
addr (FindNode
h) begin fun node
r ->
410 match r with Nodes l
-> k node l
411 | _
-> kerr
(); failwith
"dht_query find_node" end ~kerr
413 let get_peers t
addr h k ~kerr
= dht_query t
addr (GetPeers
h) begin fun node
r ->
414 match r with Peers
(token,peers,nodes) -> k node
token peers nodes
415 | _
-> kerr
(); failwith
"dht_query get_peers" end ~kerr
417 let announce t
addr port
token h k ~kerr
= dht_query t
addr (Announce
(h,port
,token)) begin fun node
r ->
418 match r with Ack
-> k node
419 | _
-> kerr
(); failwith
"dht_query announce" end ~kerr
421 let store t info_hash
addr =
422 let peers = try Hashtbl.find t
.torrents info_hash
with Not_found
-> Peers.empty
in
423 Hashtbl.replace t
.torrents info_hash
(Peers.add addr (BasicSocket.last_time
() + store_peer_timeout) peers)
425 let manage_timeouts enabler
h =
426 BasicSocket.add_session_timer enabler
60. begin fun () ->
427 let now = BasicSocket.last_time
() in
428 let torrents = Hashtbl.fold
(fun k
peers l
-> (k
,peers)::l
) h [] in
431 List.iter (fun (id
,peers) ->
432 let m = Peers.fold
(* removing is rare *)
433 (fun peer expire
m -> incr
total; if expire
< now then (incr
rm; Peers.remove peer
m) else m)
436 if Peers.is_empty
m then Hashtbl.remove h id
else Hashtbl.replace
h id
m
438 if !debug
then lprintf_nl "Removed %d of %d peers for announced torrents" !rm !total
441 let create rt dht_port bw_control answer
=
442 let enabler = ref true in
443 let rpc = KRPC.create dht_port
enabler bw_control answer
in
444 let torrents = Hashtbl.create 8 in
445 manage_timeouts enabler torrents;
446 { rt
= rt
; rpc = rpc; torrents = torrents; dht_port
= dht_port
; enabler = enabler;
447 stats = Hashtbl.create 10; }
450 dht
.enabler := false;
451 KRPC.shutdown dht
.rpc
453 let peers_list f
m = Peers.fold
(fun peer tm l
-> (f peer tm
)::l
) m []
454 let self_get_peers t
h =
455 let peers = peers_list (fun a _
-> a
) (try Hashtbl.find t
.torrents h with Not_found
-> Peers.empty
) in
456 if List.length
peers <= 100 then
459 let a = Array.of_list
peers in
461 Array.to_list
(Array.sub
a 0 100)
463 let self_find_node t
h = List.map
(fun node
-> node
.id
, node
.addr) & Kademlia.find_node t
.rt
h
470 val create : time
-> t
471 val get : t
-> string
472 val valid
: t
-> string -> bool
473 val get_prev
: t
-> string
477 type t
= { mutable cur
: string; mutable prev
: string; timeout : time
; mutable next
: time
; }
478 let make () = string_of_int
(Random.int 1_000_000)
482 { cur
= s; prev
= s; timeout = tm
; next
= now () + tm
; }
484 if now () > t
.next
then
488 t
.next
<- now () + t
.timeout;
493 let get_prev t
= t
.prev
496 s = t
.cur
|| s = t
.prev
500 (* do not hash port cause some broken implementations change it all the time *)
501 let make_token (ip
,_
) h secret
= string_of_int
(Hashtbl.hash
(Ip.to_string ip
, H.direct_to_string
h, secret
))
503 let valid_token addr h secret
token =
504 let cur = Secret.get secret
in
505 let prev = Secret.get_prev secret
in
506 token = make_token addr h cur || token = make_token addr h prev
508 module LimitedSet
= struct
514 val create : int -> t
516 (** @return whether the element was really added *)
517 val insert
: t
-> elt
-> bool
518 val elements
: t
-> elt
list
519 val iter : t
-> (elt
-> unit) -> unit
520 val min_elt
: t
-> elt
524 module Make
(Ord
:Set.OrderedType
) : S
with type elt
= Ord.t
=
527 module S
= Set.Make
(Ord
)
530 type t
= int ref * S.t
ref
532 let create n = ref n, ref S.empty
534 let insert (left
,set) elem
=
535 match S.mem elem
!set with
540 let max = S.max_elt
!set in
541 if Ord.compare elem
max < 0 then
542 begin set := S.add elem
(S.remove max !set); true end
546 set := S.add elem
!set;
550 let iter (_
,set) f
= S.iter f
!set
552 let elements (_
,set) = S.elements !set
554 let min_elt (_
,set) = S.min_elt !set
560 let update dht st id
addr = update (M.ping dht
) dht
.M.rt st id
addr
564 (** @param nodes nodes to start search from, will not be inserted into routing table *)
565 let lookup_node dht ?
nodes target k
=
566 if !debug
then lprintf_nl "lookup %s" (show_id target
);
567 let start = BasicSocket.last_time
() in
568 let module S
= LimitedSet.Make
(struct
570 let compare n1 n2
= Big_int.compare_big_int
(distance target
(fst n1
)) (distance target
(fst n2
))
572 let found = S.create Kademlia.bucket_nodes
in
573 let queried = Hashtbl.create 13 in
574 let active = ref 0 in
578 let result = S.elements found in
579 if !debug
then lprintf_nl "lookup_node %s done, queried %d, found %d, elapsed %ds"
580 (show_id target
) (Hashtbl.length
queried) (List.length
result) (BasicSocket.last_time
() - start);
584 let rec round nodes =
585 let inserted = List.fold_left
(fun acc node
-> if S.insert found node
then acc
+ 1 else acc
) 0 nodes in
588 S.iter found (fun node
->
589 if alpha = !n then raise Break
;
590 if not
(Hashtbl.mem
queried node
) then begin incr
n; query
true node
end)
591 with Break
-> () end;
593 and query
store (id
,addr as node
) =
595 Hashtbl.add queried node
true;
596 if !debug
then lprintf_nl "will query node %s" (show_node node
);
597 M.find_node dht
addr target
begin fun (id
,addr as node
) nodes ->
598 if store then update dht Good id
addr;
600 let inserted = round nodes in
601 let s = try sprintf
", best %s" (show_id
(fst
(S.min_elt found))) with _
-> "" in
602 if !debug
then lprintf_nl "got %d nodes from %s, useful %d%s" (List.length
nodes) (show_node node
) inserted s;
604 end ~kerr
:(fun () -> decr
active; if !debug
then lprintf_nl "timeout from %s" (show_node node
); check_ready ())
606 begin match nodes with
607 | None
-> let (_
:int) = round (M.self_find_node dht target
) in ()
608 | Some l
-> List.iter (query
false) l
612 let show_torrents torrents =
613 let now = BasicSocket.last_time
() in
614 Hashtbl.iter (fun h peers ->
615 let l = M.peers_list (fun addr tm
-> sprintf
"%s (exp. %ds)" (show_addr
addr) (tm
- now)) peers in
616 lprintf_nl "torrent %s : %s" (H.to_hexa
h) (String.concat
" " l))
619 let show dht
= show_table dht
.M.rt
; show_torrents dht
.M.torrents
623 Hashtbl.length dht
.M.torrents,
624 Hashtbl.fold
(fun _
peers acc
-> acc
+ Peers.fold
(fun _ _ acc
-> acc
+ 1) peers 0) dht
.M.torrents 0
625 let rpc_stats dht
= let (_
,st
,_
) = dht
.M.rpc in KRPC.show_stats st
627 let bootstrap dht host
addr k
=
628 M.ping dht
addr begin function
630 if !verb
then lprintf_nl "bootstrap node %s (%s) is up" (show_node node
) host
;
631 lookup_node dht ~
nodes:[node
] dht
.M.rt
.self (fun l ->
632 if !debug
then lprintf_nl "bootstrap via %s (%s) : found %s" (show_addr
addr) host
(strl show_node l);
633 k
(List.length
l >= Kademlia.bucket_nodes
))
635 if !verb
then lprintf_nl "bootstrap node %s (%s) is down" (show_addr
addr) host
;
639 let bootstrap dht
(host
,port
) k
=
641 (fun ip
-> bootstrap dht host
(ip
,port
) k
)
642 (fun () -> if !verb
then lprintf_nl "boostrap node %s cannot be resolved" host
; k
false)
644 let bootstrap ?
(routers
=[]) dht
=
645 lookup_node dht dht
.M.rt
.self begin fun l ->
646 if !debug
then lprintf_nl "auto bootstrap : found %s" (strl show_node l);
649 | true,_
-> if !verb
then lprintf_nl "bootstrap ok, total nodes : %d" (size dht
.M.rt
)
650 | false,[] -> if !verb
then lprintf_nl "boostrap failed, total nodes : %d" (size dht
.M.rt
)
651 | false,(node
::nodes) -> bootstrap dht node
(loop nodes)
653 loop routers
(List.length
l >= Kademlia.bucket_nodes
)
656 let query_peers dht id k
=
657 if !debug
then lprintf_nl "query_peers: start %s" (H.to_hexa id
);
658 lookup_node dht id
(fun nodes ->
659 if !debug
then lprintf_nl "query_peers: found nodes %s" (strl show_node nodes);
661 let found = ref Peers.empty in
663 let left = ref (List.length nodes + 1) (* one immediate check *) in
664 fun () -> decr
left; if 0 = !left then k
(Peers.fold
(fun peer
() l -> peer
:: l) !found [])
667 List.iter begin fun node
->
668 M.get_peers dht
(snd node
) id
begin fun node
token peers nodes ->
669 if !debug
then lprintf_nl "query_peers: got %d peers and %d nodes from %s with token %S"
670 (List.length
peers) (List.length
nodes) (show_node node
) token;
673 found := List.fold_left (fun acc peer -> Peers.add peer () acc) !found peers;
677 ~kerr
:(fun () -> if !debug
then lprintf_nl "query_peers: get_peers error from %s" (show_node node
)(*; check ()*));
681 let start rt port bw_control
=
682 let secret = Secret.create secret_timeout in
683 let rec dht = lazy (M.create rt port bw_control answer
)
684 and answer
addr name args
=
685 let (id
,q
) = parse_query_exn name args
in
686 let node = (id
,addr) in
687 if !debug
then lprintf_nl "DHT query from %s : %s" (show_node node) (show_query q
);
688 update !!dht Good id
addr;
689 stats_add (!!dht).M.stats (M.query_type_of_query q
, `In
) 1;
693 | FindNode
h -> Nodes
(M.self_find_node !!dht h)
695 let token = make_token addr h (Secret.get secret) in
696 let peers = M.self_get_peers !!dht h in
697 let nodes = M.self_find_node !!dht h in
698 if !debug
then lprintf_nl "answer with %d peers and %d nodes" (List.length
peers) (List.length
nodes);
699 Peers
(token,peers,nodes)
700 | Announce
(h,port
,token) ->
701 if not
(valid_token addr h secret token) then failwith
("invalid token " ^
token);
702 M.store !!dht h (fst
addr, port
);
705 if !debug
then lprintf_nl "DHT response to %s : %s" (show_node node) (show_response response);
706 make_response (!!dht).M.rt
.self response
709 let ids = Kademlia.refresh (!!dht).M.rt
in
710 if !debug
then lprintf_nl "will refresh %d buckets" (List.length
ids);
711 let cb prev_id
(id
,addr as node) l =
712 update !!dht Good id
addr; (* replied *)
713 if prev_id
<> id
then
715 if !debug
then lprintf_nl "refresh: node %s changed id (was %s)" (show_node node) (show_id prev_id
);
716 update !!dht Bad prev_id
addr;
718 if !debug
then lprintf_nl "refresh: got %d nodes from %s" (List.length
l) (show_node node);
719 List.iter (fun (id
,addr) -> update !!dht Unknown id
addr) l
721 List.iter (fun (target
, nodes) ->
722 List.iter (fun (id
,addr) -> M.find_node !!dht addr target
(cb id
) ~kerr
:(fun () -> ())) nodes)
725 if !debug
then lprintf_nl "DHT size : %d self : %s" (size
(!!dht).M.rt
) (show_id
(!!dht).M.rt
.self);
726 BasicSocket.add_session_timer
(!!dht).M.enabler 60. refresh;
729 let stop dht = M.shutdown dht