4 http://www.bittorrent.org/beps/bep_0005.html
(* Lifetime of an outstanding KRPC query: this value is added to
   [last_time ()] when a transaction is registered.
   NOTE(review): presumably seconds -- confirm against [last_time]. *)
let dht_query_timeout = 20

(* Lifetime of a peer stored for an announced torrent, built with [minutes];
   it is added to [BasicSocket.last_time ()] when the peer is stored. *)
let store_peer_timeout = minutes 30

(* Interval handed to [Secret.create] for the token secret. *)
let secret_timeout = minutes 10
(* Prefix handed to [Printf2.lprintf_nl2] for every message of this module. *)
let log_prefix = "[dht]"

(** [lprintf_nl ?exn fmt ...] formats its arguments printf-style and passes
    the result to [Printf2.lprintf_nl2] together with [log_prefix].
    When [~exn] is supplied, [" : exn "] followed by
    [Printexc.to_string exn] is appended to the message. *)
let lprintf_nl ?exn fmt =
  let emit msg =
    let suffix =
      match exn with
      | None -> ""
      | Some e -> " : exn " ^ Printexc.to_string e
    in
    Printf2.lprintf_nl2 log_prefix "%s%s" msg suffix
  in
  ksprintf emit fmt
(** [catch f x] applies [f] to [x] and reifies the outcome:
    [`Ok v] when [f x] returns [v], [`Exn e] when it raises [e].
    Every exception is captured. *)
let catch f x =
  try
    let result = f x in
    `Ok result
  with e -> `Exn e
25 let module A
= Autoconf
in
26 let n = int_of_string
A.major_version
* 100 + int_of_string
A.minor_version
* 10 + int_of_string
A.sub_version
- 300 in
27 assert (n > 0 && n < 256);
28 sprintf
"ML%c%c" (if A.scm_version
= "" then '
='
else '
+'
) (Char.chr
n)
30 (* 2-level association *)
(** [create ()] returns a new, empty two-level table. *)
val create : unit -> ('a, 'b, 'c) t

(** [add tbl a b c] binds the key pair [(a, b)] to [c], replacing any
    previous binding of that pair. *)
val add : ('a, 'b, 'c) t -> 'a -> 'b -> 'c -> unit

(** [find_all tbl a] returns the inner table holding every binding whose
    first key is [a]; when [a] is unbound a fresh empty table is returned
    (it is not stored in [tbl]). *)
val find_all : ('a, 'b, 'c) t -> 'a -> ('b, 'c) Hashtbl.t

(** [find tbl a b] is [Some c] when [(a, b)] is bound to [c],
    [None] otherwise. *)
val find : ('a, 'b, 'c) t -> 'a -> 'b -> 'c option

(** [remove tbl a b] drops the binding of [(a, b)]; the entry for [a] is
    dropped as well once its inner table becomes empty. Does nothing when
    [a] is unbound. *)
val remove : ('a, 'b, 'c) t -> 'a -> 'b -> unit

(** [iter tbl f] calls [f a b c] for every binding [(a, b) -> c]. *)
val iter : ('a, 'b, 'c) t -> ('a -> 'b -> 'c -> unit) -> unit

(** [clear tbl] removes every binding. *)
val clear : ('a, 'b, 'c) t -> unit
(* The outer table maps the first key to an inner table, which maps the
   second key to the stored value. *)
type ('a, 'b, 'c) t = ('a, ('b, 'c) Hashtbl.t) Hashtbl.t

(* Fresh empty outer table, created with an initial size of 13. *)
let create () = Hashtbl.create 13
48 let hh = try Hashtbl.find h a
with Not_found
-> Hashtbl.create 3 in
49 Hashtbl.replace
hh b c
;
50 Hashtbl.replace h a
hh
(* Inner table registered under [a]; when there is none, a brand new empty
   table is returned (and NOT inserted into [h]). *)
let find_all h a =
  if Hashtbl.mem h a then Hashtbl.find h a else Hashtbl.create 3
(* Two-step lookup: [Some c] when the pair [(a, b)] is bound to [c],
   [None] when either level misses. *)
let find h a b =
  try
    let inner = Hashtbl.find h a in
    Some (Hashtbl.find inner b)
  with Not_found -> None
(* Drops the binding of [(a, b)]. The entry for [a] is removed from the
   outer table as soon as its inner table is empty. Nothing happens when
   [a] is unbound. *)
let remove h a b =
  if Hashtbl.mem h a then begin
    let inner = Hashtbl.find h a in
    Hashtbl.remove inner b;
    if Hashtbl.length inner = 0 then Hashtbl.remove h a
  end
(* Visits every binding of the two-level table, calling [f a b c] for each
   pair [(a, b)] bound to [c]. *)
let iter h f =
  let visit_inner a inner = Hashtbl.iter (fun b c -> f a b c) inner in
  Hashtbl.iter visit_inner h
(* Empties the outer table, and with it every binding. *)
let clear tbl = Hashtbl.clear tbl
(** [stats_add h k n] adds [n] to the counter stored under [k] in [h];
    a missing counter counts as 0. *)
let stats_add h k n =
  let current = try Hashtbl.find h k with Not_found -> 0 in
  Hashtbl.replace h k (n + current)
(* Association list of keys and bencoded values, as carried inside a
   [Bencode.Dictionary]. *)
type dict = (string * Bencode.value) list

(* Comma-separated list of the keys of [d]; the values are not printed. *)
let show_dict d =
  let keys = List.map fst d in
  String.concat "," keys
67 | Query
of string * dict
69 | Error
of int64
* string
(* One-line description of a KRPC message for log output. Dictionaries are
   rendered by [show_dict], i.e. by their keys only. *)
let show_msg msg =
  match msg with
  | Error (code, text) -> sprintf "error (%Ld,%S)" code text
  | Response reply -> sprintf "response (%s)" (show_dict reply)
  | Query (name, args) -> sprintf "query %s(%s)" name (show_dict args)
76 let encode (txn
,msg
) =
77 let module B
= Bencode
in
78 let x = match msg
with
79 | Query
(name
,args
) -> ["y", B.String
"q"; "q", B.String name
; "a", B.Dictionary args
]
80 | Response dict
-> ["y", B.String
"r"; "r", B.Dictionary dict
]
81 | Error
(code
,text
) -> ["y", B.String
"e"; "e", B.List
[B.Int code
; B.String text
] ]
83 let x = ("t", B.String txn
) :: ("v", B.String
self_version):: x in
84 B.encode (B.Dictionary
x)
(* Accessors for bencoded values. Each one returns the payload of the
   expected constructor and raises [Failure] (carrying the accessor's own
   name) for any other constructor. *)

let str v =
  match v with
  | Bencode.String s -> s
  | _ -> failwith "str"

let int v =
  match v with
  | Bencode.Int i -> i
  | _ -> failwith "int"

let dict v =
  match v with
  | Bencode.Dictionary d -> d
  | _ -> failwith "dict"

let list v =
  match v with
  | Bencode.List l -> l
  | _ -> failwith "list"
(* Errors reported for incoming KRPC packets. *)

(* Transaction id and message; the packet handler replies with error 203. *)
exception Protocol_error of string * string

(* Description of what could not be decoded; also answered with error 203. *)
exception Malformed_packet of string

(* Payload is a string; the packet handler replies with error 204. *)
exception Method_unknown of string
96 let module B
= Bencode
in
97 let module Array
= struct let get x k
= match x with B.Dictionary l
-> List.assoc k l
| _
-> failwith
"decode get" end in
98 let x = try B.decode s
with _
-> raise
(Malformed_packet
"decode") in
99 let txn = try str x.("t") with _
-> raise
(Malformed_packet
"txn") in
100 let ver = try Some
(str x.("v")) with _
-> None
in
102 let msg = match str x.("y") with
103 | "q" -> Query
(str x.("q"), dict x.("a"))
104 | "r" -> Response
(dict x.("r"))
105 | "e" -> begin match list x.("e") with B.Int
n :: B.String s
:: _
-> Error
(n, s
) | _
-> failwith
"decode e" end
106 | _
-> failwith
"type"
110 if !verb
then lprintf_nl ~
exn "err";
111 raise
(Protocol_error
(txn,"Invalid argument"))
116 let udp_set_reader socket f
=
117 set_reader socket
begin fun _
->
118 try read_packets socket f
with exn ->
119 if !verb
then lprintf_nl ~
exn "udp reader";
120 close socket
(Closed_for_exception
exn)
(** [send sock stats addr txnmsg] encodes [txnmsg] with [encode] and writes
    it to [sock] for the [(ip, port)] pair [addr].
    The packet is logged when [debug] is set; the [`Sent] counter is
    incremented by one and [`SentBytes] by the length of the encoded
    packet. *)
let send sock stats addr txnmsg =
  let (ip, port) = addr in
  let packet = encode txnmsg in
  if !debug then lprintf_nl "KRPC to %s : %S" (show_addr addr) packet;
  stats_add stats `Sent 1;
  stats_add stats `SentBytes (String.length packet);
  write sock false packet ip port
(* Keys of the KRPC statistics table. *)
type stats_key =
  [ `Timeout    (* queries found expired by the timeout sweep *)
  | `Sent       (* packets sent *)
  | `SentBytes  (* bytes sent *)
  | `Recv       (* packets received *)
  | `RecvBytes  (* bytes received *)
  | `Decoded    (* packets successfully decoded *)
  | `Handled    (* packets counted after handling *)
  | `NoTxn      (* replies whose transaction id was not found *)
  ]
135 (stats_key
, int) Hashtbl.t
*
136 (addr
, string, (addr
-> dict -> unit) * ([`Error
|`Timeout
]-> unit) * int) A.t
138 let get k
= try Hashtbl.find t k
with Not_found
-> 0 in
140 sprintf
"rpc recv %d pkts (%d bytes)" (get `Recv
) (get `RecvBytes
);
141 sprintf
"rpc sent %d pkts (%d bytes)" (get `Sent
) (get `SentBytes
);
142 sprintf
"rpc decoded %d, handled %d" (get `Decoded
) (get `Handled
);
143 sprintf
"rpc timeouted %d, orphan %d" (get `Timeout
) (get `NoTxn
);
146 let create port enabler bw_control answer
: t
=
147 let socket = create Unix.inet_addr_any port
(fun sock event
->
149 | WRITE_DONE
| CAN_REFILL
-> ()
150 | READ_DONE
-> assert false (* set_reader prevents this *)
151 | BASIC_EVENT
x -> match x with
153 | CAN_READ
| CAN_WRITE
-> assert false (* udpSocket implementation prevents this *)
154 | LTIMEOUT
| WTIMEOUT
| RTIMEOUT
-> () (*close sock (Closed_for_error "KRPC timeout")*))
156 set_write_controler
socket bw_control
;
157 set_wtimeout
(sock
socket) 5.;
158 set_rtimeout
(sock
socket) 5.;
159 let h = A.create () in
160 let stats = Hashtbl.create 10 in
162 let now = last_time
() in
165 A.iter h (fun addr
txn (_
,kerr
,t
) -> incr
total; if t
< now then bad := (addr
,txn,kerr
) :: !bad);
166 if !debug
then lprintf_nl "timeouted %d of %d DHT queries" (List.length
!bad) !total;
167 stats_add stats `Timeout
(List.length
!bad);
168 List.iter (fun (addr
,txn,kerr
) ->
170 try kerr `Timeout
with exn -> if !debug
then lprintf_nl ~
exn "timeout for %s" (show_addr addr
)) !bad;
172 BasicSocket.add_session_timer enabler
5. (fun () -> timeout h);
173 let handle addr
(txn,ver,msg) =
174 let version = lazy (match ver with Some
s -> sprintf
" client %S" s | None
-> "") in
175 if !debug
then lprintf_nl "KRPC from %s %stxn %S : %s" (show_addr addr
) !!version txn (show_msg msg);
177 | Error
(code
,msg) ->
178 if !verb
then lprintf_nl "error received from %s%s : %Ld %S" (show_addr addr
) !!version code
msg;
179 begin match A.find h addr
txn with
181 stats_add stats `NoTxn
1;
182 if !verb
then lprintf_nl "no txn %S for %s%s" txn (show_addr addr
) !!version
183 | Some
(_
, kerr
, _
) -> A.remove h addr
txn; kerr `Error
185 | Query
(name
,args
) ->
186 let ret = answer addr name args
in
187 send socket stats addr
(txn, ret)
189 match A.find h addr
txn with
191 stats_add stats `NoTxn
1;
192 if !verb
then lprintf_nl "no txn %S for %s%s" txn (show_addr addr
) !!version
193 | Some
(k
,_
,_
) -> A.remove h addr
txn; k addr
ret
196 match p
.udp_addr
with
197 | Unix.ADDR_UNIX _
-> assert false
198 | Unix.ADDR_INET
(inet_addr
,port
) ->
199 let addr = (Ip.of_inet_addr inet_addr
, port
) in
200 let ret = ref None
in
202 stats_add stats `RecvBytes
(String.length p
.udp_content
);
203 stats_add stats `Recv
1;
204 let r = decode_exn p
.udp_content
in
205 stats_add stats `Decoded
1;
208 stats_add stats `Handled
1;
210 let version = match !ret with Some
(_
,Some
s,_
) -> sprintf
" client %S" s | _
-> "" in
211 if !verb
then lprintf_nl ~
exn "handle packet from %s%s : %S" (show_addr
addr) version p
.udp_content
;
212 let error txn code
str = send socket stats addr (txn,(Error
(Int64.of_int code
,str))) in
214 | Malformed_packet
x, Some
(txn, _
, _
)
215 | Protocol_error
("",x), Some
(txn, _
, _
) | Protocol_error
(txn,x), _
-> error txn 203 x
216 | Method_unknown
x, Some
(txn, _
, _
) -> error txn 204 x
217 | _
, Some
(txn, _
, Query _
) -> error txn 202 ""
220 udp_set_reader socket handle;
223 let shutdown (socket,_
,h) =
224 close
socket Closed_by_user
;
225 A.iter h (fun addr _
(_
,kerr
,_
) ->
226 try kerr `Timeout
with exn -> if !verb
then lprintf_nl ~
exn "shutdown for %s" (show_addr
addr));
229 let write (socket,stats,h) msg addr k ~kerr
=
230 let tt = Assoc2.find_all h addr in
231 let rec loop () = (* choose txn FIXME *)
232 let txn = string_of_int
(Random.int 1_000_000) in
233 match Hashtbl.mem
tt txn with
238 Assoc2.add h addr txn (k
,kerr
,last_time
() + dht_query_timeout);
239 send socket stats addr (txn,msg)
247 | Announce
of H.t
* int * string
249 let show_query = function
251 | FindNode id
-> sprintf
"find_node %s" (show_id id
)
252 | GetPeers
h -> sprintf
"get_peers %s" (show_id
h)
253 | Announce
(h,port
,token
) -> sprintf
"announce %s port=%d token=%S" (show_id
h) port token
257 | Nodes
of (id
* addr) list
258 | Peers
of string * addr list * (id
* addr) list
(* Renders a list as "[e1 e2 ...]", each element printed with [f]. *)
let strl f l =
  let body = String.concat " " (List.map f l) in
  "[" ^ body ^ "]"
(* Printable form of a node: its address followed by its id in
   parentheses. *)
let show_node node =
  let (id, addr) = node in
  sprintf "%s (%s)" (show_addr addr) (show_id id)
264 let show_response = function
266 | Nodes l
-> sprintf
"nodes %s" (strl show_node l
)
267 | Peers
(token
,peers
,nodes
) -> sprintf
"peers token=%S %s %s" token
(strl show_addr peers
) (strl show_node nodes
)
269 let parse_query_exn name args
=
270 let get k
= List.assoc k args
in
271 let sha1 k
= H.direct_of_string
& KRPC.str & get k
in
272 let p = match name
with
274 | "find_node" -> FindNode
(sha1 "target")
275 | "get_peers" -> GetPeers
(sha1 "info_hash")
276 | "announce_peer" -> Announce
(sha1 "info_hash", Int64.to_int
& KRPC.int & get "port", KRPC.str & get "token")
277 | s -> failwith
(sprintf
"parse_query name=%s" name
)
281 let make_query id
x =
282 let sha1 x = Bencode.String
(H.direct_to_string
x) in
283 let self = ("id", sha1 id
) in
285 | Ping
-> KRPC.Query
("ping", [self])
286 | FindNode t
-> KRPC.Query
("find_node", ["target", sha1 t
; self])
287 | GetPeers
h -> KRPC.Query
("get_peers", ["info_hash", sha1 h; self])
288 | Announce
(h, port
, token
) -> KRPC.Query
("announce_peer",
289 ["info_hash", sha1 h;
290 "port", Bencode.Int
(Int64.of_int port
);
291 "token", Bencode.String token
;
295 if String.length
s <> 6 then failwith
"parse_peer" else
296 let c i
= int_of_char
& s.[i
] in
297 Ip.of_ints
(c 0,c 1,c 2,c 3), (c 4 lsl 8 + c 5)
300 assert (String.length
s mod 26 = 0);
302 let nodes = ref [] in
303 while !i < String.length
s do
304 nodes := (H.direct_of_string
(String.sub
s !i 20), parse_peer (String.sub
s (!i+20) 6)) :: !nodes;
309 let make_peer (ip
,port
) =
310 assert (port
<= 0xffff);
311 let (a
,b
,c,d
) = Ip.to_ints ip
in
312 let e = port
lsr 8 and f
= port
land 0xff in
313 let s = String.create 6 in
314 let set i c = s.[i] <- char_of_int
c in
315 set 0 a
; set 1 b
; set 2 c; set 3 d
; set 4 e; set 5 f
;
318 let make_nodes nodes =
319 let s = String.create (26 * List.length
nodes) in
321 List.iter (fun (id
,addr) ->
322 String.blit
(H.direct_to_string id
) 0 s (!i*26) 20;
323 String.blit
(make_peer addr) 0 s (!i*26+20) 6;
328 let parse_response_exn q
dict =
329 let get k
= List.assoc k
dict in
330 let sha1 k
= H.direct_of_string
& KRPC.str & get k
in
334 let s = KRPC.str & get "nodes" in
335 Nodes
(parse_nodes s)
337 let token = KRPC.str & get "token" in
338 let nodes = try parse_nodes (KRPC.str & get "nodes") with Not_found
-> [] in
339 let peers = try List.map
(fun x -> parse_peer & KRPC.str x) & (KRPC.list & get "values") with Not_found
-> [] in
340 Peers
(token, peers, nodes)
345 let make_response id
x =
346 let sha1 x = Bencode.String
(H.direct_to_string
x) in
347 let self = ("id", sha1 id
) in
348 let str s = Bencode.String
s in
350 | Ack
-> KRPC.Response
[self]
351 | Nodes
nodes -> KRPC.Response
[self;"nodes",str (make_nodes nodes)]
352 | Peers
(token,peers,nodes) -> KRPC.Response
355 "nodes",str (make_nodes nodes);
356 "values",Bencode.List
(List.map
(fun addr -> str (make_peer addr)) peers);
363 let e = Dictionary
["t",String
"aa"; "v", String
self_version; "y", String
"e"; "e", List
[Int
201L; String
"A Generic Error Occurred"] ]
364 let s = sprintf
"d1:eli201e24:A Generic Error Occurrede1:t2:aa1:v4:%s1:y1:ee" self_version
365 let v = "aa", KRPC.Error
(201L, "A Generic Error Occurred")
368 assert (encode e = s);
369 assert (KRPC.decode_exn s = (fst
v, Some
self_version, snd
v));
370 assert (KRPC.encode v = s);
(* Maps keyed by peer address, ordered with the [compare] in scope. *)
module Peers = Map.Make (struct
  type t = addr
  let compare = compare
end)
(* Kind of a DHT query, used as part of the statistics key. *)
type query_type =
  [ `Ping
  | `FindNode
  | `GetPeers
  | `Announce
  ]

(* Outcome recorded for an outgoing query. *)
type answer_type =
  [ `Answer
  | `Error
  | `Timeout
  ]
383 rt
: Kademlia.table
; (* routing table *)
384 rpc
: KRPC.t
; (* KRPC protocol socket *)
385 dht_port
: int; (* port *)
386 torrents
: (H.t
, int Peers.t
) Hashtbl.t
; (* torrents announced by other peers *)
387 enabler
: bool ref; (* timers' enabler *)
388 stats : (query_type
* [ `In
| `Out
of answer_type
], int) Hashtbl.t
; (* statistics *)
391 let query_type_of_query = function
393 | FindNode _
-> `FindNode
394 | GetPeers _
-> `GetPeers
395 | Announce _
-> `Announce
397 let dht_query t
addr q k ~kerr
=
398 if !debug
then lprintf_nl "DHT query to %s : %s" (show_addr
addr) (show_query q
);
399 let qt = query_type_of_query q
in
400 KRPC.write t
.rpc
(make_query t
.rt
.self q
) addr begin fun addr dict ->
401 let (id
,r) = try parse_response_exn q
dict with exn -> stats_add t
.stats (qt, `Out `Error
) 1; kerr
(); raise
exn in
402 if !debug
then lprintf_nl "DHT response from %s (%s) : %s" (show_addr
addr) (show_id id
) (show_response r);
403 stats_add t
.stats (qt, `Out `Answer
) 1;
405 end ~kerr
:(fun reason
-> stats_add t
.stats (qt, `Out
(reason
:>answer_type
)) 1; kerr
())
(** [ping t addr k] sends a [Ping] query to [addr] through [dht_query].
    [k (Some node)] is called on an [Ack] reply. Any other reply calls
    [k None] and then raises [Failure "dht_query ping"]. The error
    callback passed to [dht_query] calls [k None]. *)
let ping t addr k =
  let on_reply node = function
    | Ack -> k (Some node)
    | _ ->
      k None;
      failwith "dht_query ping"
  in
  let on_error () = k None in
  dht_query t addr Ping on_reply ~kerr:on_error
(** [find_node t addr h k ~kerr] sends a [FindNode h] query to [addr].
    A [Nodes l] reply is passed on as [k node l]. Any other reply calls
    [kerr ()] and then raises [Failure "dht_query find_node"]. [kerr] is
    also handed to [dht_query] as its error callback. *)
let find_node t addr h k ~kerr =
  let on_reply node = function
    | Nodes l -> k node l
    | _ ->
      kerr ();
      failwith "dht_query find_node"
  in
  dht_query t addr (FindNode h) on_reply ~kerr
(** [get_peers t addr h k ~kerr] sends a [GetPeers h] query to [addr].
    A [Peers (token, peers, nodes)] reply is passed on as
    [k node token peers nodes]. Any other reply calls [kerr ()] and then
    raises [Failure "dht_query get_peers"]. [kerr] is also handed to
    [dht_query] as its error callback. *)
let get_peers t addr h k ~kerr =
  let on_reply node = function
    | Peers (token, peers, nodes) -> k node token peers nodes
    | _ ->
      kerr ();
      failwith "dht_query get_peers"
  in
  dht_query t addr (GetPeers h) on_reply ~kerr
(** [announce t addr port token h k ~kerr] sends an
    [Announce (h, port, token)] query to [addr]. An [Ack] reply is passed
    on as [k node]. Any other reply calls [kerr ()] and then raises
    [Failure "dht_query announce"]. [kerr] is also handed to [dht_query]
    as its error callback. *)
let announce t addr port token h k ~kerr =
  let on_reply node = function
    | Ack -> k node
    | _ ->
      kerr ();
      failwith "dht_query announce"
  in
  dht_query t addr (Announce (h, port, token)) on_reply ~kerr
(** [store t info_hash addr] records [addr] as a peer of [info_hash] in
    [t.torrents]. The value stored with the peer is
    [BasicSocket.last_time () + store_peer_timeout]; an existing entry for
    the same address is replaced. *)
let store t info_hash addr =
  let known =
    try Hashtbl.find t.torrents info_hash with Not_found -> Peers.empty
  in
  let expires = BasicSocket.last_time () + store_peer_timeout in
  Hashtbl.replace t.torrents info_hash (Peers.add addr expires known)
427 let manage_timeouts enabler
h =
428 BasicSocket.add_session_timer enabler
60. begin fun () ->
429 let now = BasicSocket.last_time
() in
430 let torrents = Hashtbl.fold
(fun k
peers l
-> (k
,peers)::l
) h [] in
433 List.iter (fun (id
,peers) ->
434 let m = Peers.fold
(* removing is rare *)
435 (fun peer expire
m -> incr
total; if expire
< now then (incr
rm; Peers.remove peer
m) else m)
438 if Peers.is_empty
m then Hashtbl.remove h id
else Hashtbl.replace
h id
m
440 if !debug
then lprintf_nl "Removed %d of %d peers for announced torrents" !rm !total
(** [create rt dht_port bw_control answer] builds the DHT state record.
    It creates the timers' enabler (initially [true]), the KRPC socket via
    [KRPC.create], and the table of announced torrents, on which
    [manage_timeouts] is started. The statistics table starts empty. *)
let create rt dht_port bw_control answer =
  let enabler = ref true in
  let rpc = KRPC.create dht_port enabler bw_control answer in
  let torrents = Hashtbl.create 8 in
  manage_timeouts enabler torrents;
  let stats = Hashtbl.create 10 in
  { rt; rpc; torrents; dht_port; enabler; stats }
452 dht
.enabler := false;
453 KRPC.shutdown dht
.rpc
(* Folds the peer map [m] into a list, converting each binding with
   [f peer tm]. Elements are consed in fold order, so the resulting list
   is in the reverse of that order. *)
let peers_list f m =
  let collect peer tm acc = f peer tm :: acc in
  Peers.fold collect m []
456 let self_get_peers t
h =
457 let peers = peers_list (fun a _
-> a
) (try Hashtbl.find t
.torrents h with Not_found
-> Peers.empty
) in
458 if List.length
peers <= 100 then
461 let a = Array.of_list
peers in
463 Array.to_list
(Array.sub
a 0 100)
(* Asks the local routing table ([Kademlia.find_node] on [t.rt]) for [h]
   and returns the resulting nodes as (id, address) pairs. *)
let self_find_node t h =
  let found = Kademlia.find_node t.rt h in
  List.map (fun node -> (node.id, node.addr)) found
(** [create tm] makes a new secret holder.
    NOTE(review): [tm] looks like the rotation interval stored in the
    [timeout] field -- confirm against the implementation. *)
val create : time -> t

(** [get t] returns a secret string (presumably the current one, rotated
    when due -- the implementation is only partly visible; confirm). *)
val get : t -> string

(** [valid t s] tells whether [s] equals the current or the previous
    secret. *)
val valid : t -> string -> bool

(** [get_prev t] returns the previous secret. *)
val get_prev : t -> string
(* State of a rotating secret. *)
type t = {
  mutable cur : string;   (* current secret *)
  mutable prev : string;  (* previous secret *)
  timeout : time;         (* interval added to [now ()] to compute [next] *)
  mutable next : time;    (* compared with [now ()] to decide on an update *)
}
(* New secret value: the decimal form of a random integer drawn from
   [0, 1_000_000). *)
let make () =
  let n = Random.int 1_000_000 in
  string_of_int n
484 { cur
= s; prev
= s; timeout = tm
; next
= now () + tm
; }
486 if now () > t
.next
then
490 t
.next
<- now () + t
.timeout;
(* Previous secret, returned as stored (no rotation is triggered here). *)
let get_prev secret = secret.prev
498 s = t
.cur
|| s = t
.prev
(* Token derived from the peer's IP, the info-hash and a secret.
   The port is deliberately left out of the hash, because some broken
   implementations change it all the time. *)
let make_token (ip, _) h secret =
  let material = (Ip.to_string ip, H.direct_to_string h, secret) in
  string_of_int (Hashtbl.hash material)
(** [valid_token addr h secret token] is [true] when [token] equals the
    token derived from [addr] and [h] with either the current secret
    ([Secret.get]) or the previous one ([Secret.get_prev]). The current
    secret is tried first. *)
let valid_token addr h secret token =
  let cur = Secret.get secret in
  let prev = Secret.get_prev secret in
  List.exists (fun s -> token = make_token addr h s) [cur; prev]
510 module LimitedSet
= struct
(** [create n] makes an empty set.
    NOTE(review): [n] is presumably the maximum number of elements kept --
    confirm against [insert]. *)
val create : int -> t

(** @return whether the element was really added *)
val insert : t -> elt -> bool

(** [elements t] lists the elements currently held. *)
val elements : t -> elt list

(** [iter t f] applies [f] to every element currently held. *)
val iter : t -> (elt -> unit) -> unit

(** [min_elt t] returns the smallest element currently held. *)
val min_elt : t -> elt
525 module Make
(Ord
:Set.OrderedType
) : S
with type elt
= Ord.t
=
(* Underlying ordered set. *)
module S = Set.Make (Ord)

(* An integer counter paired with the current set, both mutable. *)
type t = int ref * S.t ref

(* Starts with the counter at [n] and an empty set. *)
let create n = (ref n, ref S.empty)
535 let insert (left
,set) elem
=
536 match S.mem elem
!set with
541 let max = S.max_elt
!set in
542 if Ord.compare elem
max < 0 then
543 begin set := S.add elem
(S.remove max !set); true end
547 set := S.add elem
!set;
(* Read-only views: each one ignores the counter and delegates to the
   matching [S] operation on the current set. *)

let iter t f =
  let (_, set) = t in
  S.iter f !set

let elements t =
  let (_, set) = t in
  S.elements !set

let min_elt t =
  let (_, set) = t in
  S.min_elt !set
(* Shadows the [update] already in scope with a version specialised to a
   DHT instance: it supplies [M.ping dht] and the instance's routing table
   [dht.M.rt], then forwards [st], [id] and [addr] unchanged. *)
let update dht st id addr =
  let ping = M.ping dht in
  let table = dht.M.rt in
  update ping table st id addr
565 (** @param nodes nodes to start search from, will not be inserted into routing table *)
566 let lookup_node dht ?
nodes target k
=
567 if !debug
then lprintf_nl "lookup %s" (show_id target
);
568 let start = BasicSocket.last_time
() in
569 let module S
= LimitedSet.Make
(struct
571 let compare n1 n2
= Big_int.compare_big_int
(distance target
(fst n1
)) (distance target
(fst n2
))
573 let found = S.create Kademlia.bucket_nodes
in
574 let queried = Hashtbl.create 13 in
575 let active = ref 0 in
579 let result = S.elements found in
580 if !debug
then lprintf_nl "lookup_node %s done, queried %d, found %d, elapsed %ds"
581 (show_id target
) (Hashtbl.length
queried) (List.length
result) (BasicSocket.last_time
() - start);
585 let rec round nodes =
586 let inserted = List.fold_left
(fun acc node
-> if S.insert found node
then acc
+ 1 else acc
) 0 nodes in
589 S.iter found (fun node
->
590 if alpha = !n then raise Break
;
591 if not
(Hashtbl.mem
queried node
) then begin incr
n; query
true node
end)
592 with Break
-> () end;
594 and query
store (id
,addr as node
) =
596 Hashtbl.add queried node
true;
597 if !debug
then lprintf_nl "will query node %s" (show_node node
);
598 M.find_node dht
addr target
begin fun (id
,addr as node
) nodes ->
599 if store then update dht Good id
addr;
601 let inserted = round nodes in
602 let s = try sprintf
", best %s" (show_id
(fst
(S.min_elt found))) with _
-> "" in
603 if !debug
then lprintf_nl "got %d nodes from %s, useful %d%s" (List.length
nodes) (show_node node
) inserted s;
605 end ~kerr
:(fun () -> decr
active; if !debug
then lprintf_nl "timeout from %s" (show_node node
); check_ready ())
607 begin match nodes with
608 | None
-> let (_
:int) = round (M.self_find_node dht target
) in ()
609 | Some l
-> List.iter (query
false) l
613 let show_torrents torrents =
614 let now = BasicSocket.last_time
() in
615 Hashtbl.iter (fun h peers ->
616 let l = M.peers_list (fun addr tm
-> sprintf
"%s (exp. %ds)" (show_addr
addr) (tm
- now)) peers in
617 lprintf_nl "torrent %s : %s" (H.to_hexa
h) (String.concat
" " l))
(* Dumps the routing table, then the announced torrents, of [dht]. *)
let show dht =
  show_table dht.M.rt;
  show_torrents dht.M.torrents
624 Hashtbl.length dht
.M.torrents,
625 Hashtbl.fold
(fun _
peers acc
-> acc
+ Peers.fold
(fun _ _ acc
-> acc
+ 1) peers 0) dht
.M.torrents 0
(* Passes the statistics table of the KRPC layer (the middle component of
   [dht.M.rpc]) to [KRPC.show_stats] and returns its result. *)
let rpc_stats dht =
  match dht.M.rpc with
  | (_, st, _) -> KRPC.show_stats st
628 let bootstrap dht host
addr k
=
629 M.ping dht
addr begin function
631 if !verb
then lprintf_nl "bootstrap node %s (%s) is up" (show_node node
) host
;
632 lookup_node dht ~
nodes:[node
] dht
.M.rt
.self (fun l ->
633 if !debug
then lprintf_nl "bootstrap via %s (%s) : found %s" (show_addr
addr) host
(strl show_node l);
634 k
(List.length
l >= Kademlia.bucket_nodes
))
636 if !verb
then lprintf_nl "bootstrap node %s (%s) is down" (show_addr
addr) host
;
640 let bootstrap dht
(host
,port
) k
=
642 (fun ip
-> bootstrap dht host
(ip
,port
) k
)
643 (fun () -> if !verb
then lprintf_nl "boostrap node %s cannot be resolved" host
; k
false)
645 let bootstrap ?
(routers
=[]) dht
=
646 lookup_node dht dht
.M.rt
.self begin fun l ->
647 if !debug
then lprintf_nl "auto bootstrap : found %s" (strl show_node l);
650 | true,_
-> if !verb
then lprintf_nl "bootstrap ok, total nodes : %d" (size dht
.M.rt
)
651 | false,[] -> if !verb
then lprintf_nl "boostrap failed, total nodes : %d" (size dht
.M.rt
)
652 | false,(node
::nodes) -> bootstrap dht node
(loop nodes)
654 loop routers
(List.length
l >= Kademlia.bucket_nodes
)
657 let query_peers dht id k
=
658 if !debug
then lprintf_nl "query_peers: start %s" (H.to_hexa id
);
659 lookup_node dht id
(fun nodes ->
660 if !debug
then lprintf_nl "query_peers: found nodes %s" (strl show_node nodes);
662 let found = ref Peers.empty in
664 let left = ref (List.length nodes + 1) (* one immediate check *) in
665 fun () -> decr
left; if 0 = !left then k
(Peers.fold
(fun peer
() l -> peer
:: l) !found [])
668 List.iter begin fun node
->
669 M.get_peers dht
(snd node
) id
begin fun node
token peers nodes ->
670 if !debug
then lprintf_nl "query_peers: got %d peers and %d nodes from %s with token %S"
671 (List.length
peers) (List.length
nodes) (show_node node
) token;
674 found := List.fold_left (fun acc peer -> Peers.add peer () acc) !found peers;
678 ~kerr
:(fun () -> if !debug
then lprintf_nl "query_peers: get_peers error from %s" (show_node node
)(*; check ()*));
682 let start rt port bw_control
=
683 let secret = Secret.create secret_timeout in
684 let rec dht = lazy (M.create rt port bw_control answer
)
685 and answer
addr name args
=
686 let (id
,q
) = parse_query_exn name args
in
687 let node = (id
,addr) in
688 if !debug
then lprintf_nl "DHT query from %s : %s" (show_node node) (show_query q
);
689 update !!dht Good id
addr;
690 stats_add (!!dht).M.stats (M.query_type_of_query q
, `In
) 1;
694 | FindNode
h -> Nodes
(M.self_find_node !!dht h)
696 let token = make_token addr h (Secret.get secret) in
697 let peers = M.self_get_peers !!dht h in
698 let nodes = M.self_find_node !!dht h in
699 if !debug
then lprintf_nl "answer with %d peers and %d nodes" (List.length
peers) (List.length
nodes);
700 Peers
(token,peers,nodes)
701 | Announce
(h,port
,token) ->
702 if not
(valid_token addr h secret token) then failwith
("invalid token " ^
token);
703 M.store !!dht h (fst
addr, port
);
706 if !debug
then lprintf_nl "DHT response to %s : %s" (show_node node) (show_response response);
707 make_response (!!dht).M.rt
.self response
710 let ids = Kademlia.refresh (!!dht).M.rt
in
711 if !debug
then lprintf_nl "will refresh %d buckets" (List.length
ids);
712 let cb prev_id
(id
,addr as node) l =
713 update !!dht Good id
addr; (* replied *)
714 if prev_id
<> id
then
716 if !debug
then lprintf_nl "refresh: node %s changed id (was %s)" (show_node node) (show_id prev_id
);
717 update !!dht Bad prev_id
addr;
719 if !debug
then lprintf_nl "refresh: got %d nodes from %s" (List.length
l) (show_node node);
720 List.iter (fun (id
,addr) -> update !!dht Unknown id
addr) l
722 List.iter (fun (target
, nodes) ->
723 List.iter (fun (id
,addr) -> M.find_node !!dht addr target
(cb id
) ~kerr
:(fun () -> ())) nodes)
726 if !debug
then lprintf_nl "DHT size : %d self : %s" (size
(!!dht).M.rt
) (show_id
(!!dht).M.rt
.self);
727 BasicSocket.add_session_timer
(!!dht).M.enabler 60. refresh;
(* Stops a DHT instance by delegating to [M.shutdown]. *)
let stop dht =
  M.shutdown dht