(* patch #7442
   [mldonkey.git] / src / networks / bittorrent / kademlia.ml
   blob cdafa483862b4533d4878ab4eb2fea50b98e129a *)
(** Kademlia

  Petar Maymounkov and David Mazières
  "Kademlia: A Peer-to-Peer Information System Based on the XOR Metric"
  http://infinite-source.de/az/whitepapers/kademlia_optimized.pdf
*)
(** Number of nodes a bucket holds before it is considered full. *)
let bucket_nodes = 8

(* do not use CommonOptions directly so that tools/bt_dht_node can be compiled separately *)
(** Verbosity switch consulted by the logger for info-level messages and by
    the routing-table (de)serialisation code. *)
let verbose = ref false

(** Hash module used for node identifiers. *)
module H = Md4.Sha1

(** Prefix handed to the logging helpers of this module. *)
let log_prefix = "btkad"

(** [lprintf_nl fmt ...] forwards to [Printf2.lprintf_nl2] with [log_prefix]. *)
let lprintf_nl fmt = Printf2.lprintf_nl2 log_prefix fmt

(** Type of a logging method: an optional exception followed by a format
    string and its arguments. *)
type 'a pr = ?exn:exn -> ('a, unit, string, unit) format4 -> 'a

(** Severity levels accepted by the [allow] method of [logger]. *)
type level = [ `Debug | `Info | `User | `Warn | `Error ]
(** Small logger object printing through [Printf2.lprintf_nl].

    [prefix] is printed in square brackets in front of every message.
    Each logging method takes an optional [?exn] whose textual form is
    appended to the message.

    NOTE: the [limit] value set by [allow] is passed to [print_log] but is not
    consulted there; filtering depends only on the numeric level and on
    [verbose] (debug is never printed, info only when [!verbose] is set,
    everything else always). *)
class logger prefix =
  (* numeric rank of a symbolic level *)
  let int_level = function
    | `Debug -> 0
    | `Info -> 1
    | `User -> 2
    | `Warn -> 3
    | `Error -> 4
  in
  (* format the message, then print it if the level passes the filter *)
  let print_log limit prefix level ?exn fmt =
    let put s =
      let b = match level with
        | 0 -> false
        | 1 -> !verbose
        | _ -> true
      in
      match b,exn with
      | false, _ -> ()
      | true, None -> Printf2.lprintf_nl "[%s] %s" prefix s
      | true, Some exn -> Printf2.lprintf_nl "[%s] %s : exn %s" prefix s (Printexc2.to_string exn)
    in
    Printf.ksprintf put fmt
  in
object
  val mutable limit = int_level `Info
  method debug : 'a. 'a pr = fun ?exn fmt -> print_log limit prefix 0 ?exn fmt
  method info : 'a. 'a pr = fun ?exn fmt -> print_log limit prefix 1 ?exn fmt
  method user : 'a. 'a pr = fun ?exn fmt -> print_log limit prefix 2 ?exn fmt
  method warn : 'a. 'a pr = fun ?exn fmt -> print_log limit prefix 3 ?exn fmt
  method error : 'a. 'a pr = fun ?exn fmt -> print_log limit prefix 4 ?exn fmt
  method allow (level:level) = limit <- int_level level
end

(** Logger instance used throughout this module. *)
let log = new logger log_prefix
(** node ID type *)
type id = H.t

(** [show_id h] abbreviates the hexadecimal form of [h]: its first 7
    characters, two dots, then the 3 characters starting at offset 17. *)
let show_id h =
  let hexa = H.to_hexa h in
  let head = String.sub hexa 0 7 in
  let tail = String.sub hexa 17 3 in
  head ^ ".." ^ tail

(** Network address of a node: IP and port. *)
type addr = Ip.t * int
(** Timestamps and durations, as integers (seconds, judging by [minutes]). *)
type time = int

(** [minutes n] is the length of [n] minutes expressed in seconds. *)
let minutes n = n * 60

(** Age after which a node or bucket is considered stale. *)
let node_period = minutes 15
(** Liveness state recorded for a node. *)
type status =
  | Good
  | Bad
  | Unknown
  | Pinged

(** A node known to the routing table. *)
type node = {
  id : id;
  addr : addr;
  mutable last : time;       (* time of creation or of the last status change *)
  mutable status : status;
}

(** A bucket covering the identifier range from [lo] to [hi], both inclusive. *)
type bucket = {
  lo : id;
  hi : id;
  mutable last_change : time;
  mutable nodes : node array;
}

(* FIXME better *)
(** Routing tree: a leaf holds a bucket; an inner node [N (l, mid, r)] sends
    identifiers lower than or equal to [mid] to [l] and the others to [r]. *)
type tree =
  | L of bucket
  | N of tree * id * tree

(** Routing table: the tree plus the identifier of the local node. *)
type table = {
  mutable root : tree;
  self : id;
}
(** Current time, as reported by [BasicSocket.last_time]. *)
let now = BasicSocket.last_time

(** [diff t] renders how long ago [t] was, relative to [now ()]. *)
let diff t =
  let elapsed = now () - t in
  Printf.sprintf "%d sec ago" elapsed

(** [show_addr (ip, port)] renders an address as ip:port. *)
let show_addr addr =
  let (ip, port) = addr in
  Printf.sprintf "%s:%u" (Ip.to_string ip) port
(** Lower-case name of a node status. *)
let show_status st =
  match st with
  | Good -> "good"
  | Bad -> "bad"
  | Unknown -> "unknown"
  | Pinged -> "pinged"
(** One-line description of a node: short id, address, status and age of
    its [last] timestamp. *)
let show_node n =
  let id = show_id n.id in
  let addr = show_addr n.addr in
  let status = show_status n.status in
  let age = diff n.last in
  Printf.sprintf "%s at %s was %s %s" id addr status age
(** Logs a summary line for bucket [b], followed by one line per node. *)
let show_bucket b =
  let count = Array.length b.nodes in
  lprintf_nl "count : %d lo : %s hi : %s changed : %s"
    count (H.to_hexa b.lo) (H.to_hexa b.hi) (diff b.last_change);
  for i = 0 to count - 1 do
    lprintf_nl " %s" (show_node b.nodes.(i))
  done
(** Logs every bucket of the tree, left subtree first. *)
let rec show_tree tree =
  match tree with
  | L bucket -> show_bucket bucket
  | N (left, _, right) ->
    show_tree left;
    show_tree right
(** [h2s h] is the raw string form of hash [h], as given by
    [H.direct_to_string]. Asserts that it is exactly [H.length] characters
    long. *)
let h2s h =
  let s = H.direct_to_string h in
  assert (String.length s = H.length);
  s
(** Result of a three-way comparison. *)
type cmp = LT | EQ | GT

(** [cmp id1 id2] compares two identifiers by the byte-wise order of their
    raw string forms.

    Only the sign of [String.compare] is relied upon, so the function is
    total whatever non-zero value the comparison returns. *)
let cmp id1 id2 =
  let c = String.compare (h2s id1) (h2s id2) in
  if c < 0 then LT
  else if c > 0 then GT
  else EQ
(* boundaries inclusive *)
(** [inside node hash] tells whether [hash] lies between [node.lo] and
    [node.hi]; despite its name, [node] is a bucket. *)
let inside node hash =
  cmp hash node.lo <> LT && cmp hash node.hi <> GT
(** Identifier made of the byte 0x7F followed by nineteen 0xFF bytes: the
    largest 20-byte value whose top bit is clear.

    Built by concatenation rather than by in-place mutation, so that it also
    compiles when strings are immutable. *)
let middle =
  H.direct_of_string (String.make 1 (Char.chr 0x7F) ^ String.make 19 (Char.chr 0xFF))
(** Identifier made of the byte 0x80 followed by nineteen 0x00 bytes: the
    smallest 20-byte value whose top bit is set.

    Built by concatenation rather than by in-place mutation, so that it also
    compiles when strings are immutable. *)
let middle' =
  H.direct_of_string (String.make 1 (Char.chr 0x80) ^ String.make 19 (Char.chr 0x00))
(** Identifier made of twenty 0xFF bytes; used as the upper bound of the
    root bucket. *)
let last =
  let all_ones = String.make 20 (Char.chr 0xFF) in
  H.direct_of_string all_ones
123 open Big_int
(** [big_int_of_hash h] reads the raw bytes of [h] as a big-endian unsigned
    number (first byte most significant). *)
let big_int_of_hash h =
  let s = h2s h in
  let n = ref zero_big_int in
  for i = 0 to String.length s - 1 do
    (* shift the accumulator by one byte and add the next one *)
    n := add_int_big_int (Char.code s.[i]) (mult_int_big_int 256 !n)
  done;
  !n
(** [hash_of_big_int n] writes [n] as [H.length] big-endian bytes and turns
    them into a hash. Asserts that [n] fits, i.e. that nothing is left after
    extracting [H.length] base-256 digits.

    The bytes are collected in a list and assembled through a [Buffer]
    instead of mutating a string in place, so the code also compiles when
    strings are immutable. *)
let hash_of_big_int n =
  let base = big_int_of_int 256 in
  (* peel off [k] base-256 digits, least significant first; consing puts the
     most significant digit at the head of the resulting list *)
  let rec digits k n acc =
    if k <= 0 then (n, acc)
    else
      let (q, r) = quomod_big_int n base in
      digits (k - 1) q (Char.chr (int_of_big_int r) :: acc)
  in
  let (rest, chars) = digits H.length n [] in
  assert (eq_big_int zero_big_int rest);
  let buf = Buffer.create H.length in
  List.iter (Buffer.add_char buf) chars;
  H.direct_of_string (Buffer.contents buf)
(** The constant 2 as a big integer. *)
let big_int_2 = big_int_of_int 2

(* hash <-> number *)
(** Short alias of [big_int_of_hash]. *)
let h2n h = big_int_of_hash h

(** Short alias of [hash_of_big_int]. *)
let n2h n = hash_of_big_int n
(** [choose_random lo hi] picks an identifier by repeatedly halving the
    interval between [lo] and [hi], keeping the lower or the upper half
    according to [Random.bool ()], until both ends coincide.

    Asserts that [lo] is strictly lower than [hi]. *)
let choose_random lo hi =
  assert (cmp lo hi = LT);
  let rec loop a b =
    if cmp a b = EQ then a else
    let mid = n2h (div_big_int (add_big_int (h2n a) (h2n b)) big_int_2) in
    if Random.bool () then loop a mid else loop mid b
  in
  loop lo hi
(** [split lo hi] is the identifier halfway between [lo] and [hi] (their sum
    divided by two). Asserts that [lo] is strictly lower than [hi]. *)
let split lo hi =
  assert (cmp lo hi = LT);
  let sum = add_big_int (h2n lo) (h2n hi) in
  n2h (div_big_int sum big_int_2)
(** [succ h] is the identifier immediately following [h]. Asserts that [h]
    is not [last]. *)
let succ h =
  assert (cmp h last <> EQ);
  let next = succ_big_int (h2n h) in
  n2h next
(** [distance h1 h2] is the XOR metric between two identifiers: their raw
    bytes are XOR-ed pairwise and the result is read as a big-endian
    number. *)
let distance h1 h2 =
  let s1 = h2s h1 and s2 = h2s h2 in
  let d = ref zero_big_int in
  for i = 0 to H.length - 1 do
    let x = Char.code s1.[i] lxor Char.code s2.[i] in
    d := add_int_big_int x (mult_int_big_int 256 !d)
  done;
  !d
(* Sanity checks of the ordering, conversion and distance helpers, run when
   the module is initialised. *)
let () =
  let roundtrips h = n2h (h2n h) = h in
  (* 2^160 - 1 : every bit of a 20-byte identifier set *)
  let all_ones = pred_big_int (power_int_positive_int 2 160) in
  (* strict ordering of the reference identifiers *)
  assert (cmp H.null middle = LT);
  assert (cmp H.null middle' = LT);
  assert (cmp H.null last = LT);
  assert (cmp middle' middle = GT);
  assert (cmp last middle' = GT);
  assert (cmp last middle = GT);
  (* an identifier is equal to itself *)
  assert (cmp H.null H.null = EQ);
  assert (cmp middle middle = EQ);
  assert (cmp last last = EQ);
  (* hash -> number -> hash is the identity *)
  assert (roundtrips middle);
  assert (roundtrips middle');
  assert (roundtrips last);
  assert (roundtrips H.null);
  assert (compare_big_int (h2n H.null) zero_big_int = 0);
  (* halving the whole space and measuring distances *)
  assert (cmp (split H.null last) middle = EQ);
  assert (eq_big_int (distance H.null last) all_ones);
  assert (eq_big_int (distance middle' middle) all_ones)
(** Signature of a network back-end able to ping a node.

    [ping net addr k] is given a callback [k] taking an identifier and a
    boolean; what the boolean stands for is not visible in this file
    (presumably whether the ping succeeded -- to be confirmed). *)
module type Network = sig
  type t
  val ping : t -> addr -> (id -> bool -> unit) -> unit
end
204 (* module Make(T : Network) = struct *)
(** Raised inside the tree-rewriting loops to stop the traversal once
    nothing more has to be rebuilt. *)
exception Nothing

(** [make_node id addr st] builds a node stamped with the current time. *)
let make_node id addr st =
  { id; addr; last = now (); status = st }
(** [mark n st] logs the transition, then gives node [n] the status [st]
    and stamps it with the current time. *)
let mark n st =
  (* log first, so that the message shows the previous state of the node *)
  log #info "mark [%s] as %s" (show_node n) (show_status st);
  n.status <- st;
  n.last <- now ()
(** [touch b] records the current time as the last change of bucket [b]. *)
let touch bucket = bucket.last_change <- now ()
(** [delete table id] removes every node whose identifier equals [id] from
    the bucket responsible for [id]. The shape of the tree is left
    untouched (buckets are never merged) and so is the bucket's
    [last_change].

    NOTE(review): the leaf case was left unfinished; removal of the matching
    nodes is the presumed intent -- confirm before relying on it. *)
let delete table id =
  let rec loop = function
    | N (l,mid,r) -> (match cmp id mid with LT | EQ -> N (loop l, mid, r) | GT -> N (l, mid, loop r))
    | L b as leaf ->
      let kept = List.filter (fun n -> cmp n.id id <> EQ) (Array.to_list b.nodes) in
      if List.length kept <> Array.length b.nodes then
        b.nodes <- Array.of_list kept;
      leaf
  in
  table.root <- loop table.root
(** [update ping table st id data] records that the node [id] at address
    [data] has status [st].

    Nothing happens when [id] is the table's own identifier. Otherwise the
    bucket responsible for [id] is located and, in this order:
    - a node with the same id and address is re-marked with [st];
    - a node sharing only the id or only the address is replaced;
    - if the bucket has room, the node is added;
    - a [Bad] node, or a [Pinged] node older than [node_period], is replaced
      ([Good] nodes older than [node_period] are downgraded to [Unknown] on
      the way);
    - if some nodes are [Unknown] they are pinged through
      [ping addr callback], and the update is retried once all callbacks
      have fired;
    - otherwise the bucket is split when it contains [table.self] and spans
      a distance greater than 256, else the new node is dropped.

    The callback given to [ping] receives [Some _] for a node that answered
    and [None] otherwise.

    Every terminal case leaves the traversal by raising [Nothing]; only a
    split returns a new subtree, after which the traversal is restarted. *)
let rec update ping table st id data =
(*   log #debug "insert %s" (show_id node.id); *)
  let rec loop = function
  | N (l,mid,r) -> (match cmp id mid with LT | EQ -> N (loop l, mid, r) | GT -> N (l, mid, loop r))
  | L b ->
    Array.iteri begin fun i n ->
      match cmp n.id id = EQ, n.addr = data with
      | true, true -> mark n st; touch b; raise Nothing
      | true, false | false, true ->
        log #warn "conflict [%s] with %s %s, replacing" (show_node n) (show_id id) (show_addr data);
        b.nodes.(i) <- make_node id data st; (* replace *)
        touch b;
        raise Nothing
      | _ -> ()
    end b.nodes;
    if Array.length b.nodes <> bucket_nodes then
    begin
      log #info "insert %s %s" (show_id id) (show_addr data);
      b.nodes <- Array.of_list (make_node id data st :: Array.to_list b.nodes);
      touch b;
      raise Nothing
    end;
    Array.iteri (fun i n ->
      if n.status = Good && now () - n.last > node_period then mark n Unknown;
      if n.status = Bad || (n.status = Pinged && now () - n.last > node_period) then
      begin
        log #info "replace [%s] with %s" (show_node b.nodes.(i)) (show_id id);
        b.nodes.(i) <- make_node id data st; (* replace *)
        touch b;
        raise Nothing
      end) b.nodes;
    match Array.fold_left (fun acc n -> if n.status = Unknown then n::acc else acc) [] b.nodes with
    | [] ->
      if inside b table.self && gt_big_int (distance b.lo b.hi) (big_int_of_int 256) then
      begin
        log #info "split %s %s" (H.to_hexa b.lo) (H.to_hexa b.hi);
        let mid = split b.lo b.hi in
        (* the lower bucket ends at [mid] inclusive and the tree routes EQ to
           the left, so a node whose id equals [mid] must stay on the left *)
        let (nodes1,nodes2) = List.partition (fun n -> cmp n.id mid <> GT) (Array.to_list b.nodes) in
        let new_node = N (
            L { lo = b.lo; hi = mid; last_change = b.last_change; nodes = Array.of_list nodes1; },
            mid,
            L { lo = succ mid; hi = b.hi; last_change = b.last_change; nodes = Array.of_list nodes2; } )
        in
        new_node
      end
      else
      begin
        log #info "bucket full (%s)" (show_id id);
        raise Nothing
      end
    | unk ->
      let count = ref (List.length unk) in
      log #info "ping %d unknown nodes" !count;
      let cb n = fun res ->
        decr count; mark n (match res with Some _ -> Good | None -> Bad);
        if !count = 0 then (* retry *)
        begin
          log #info "all %d pinged, retry %s" (List.length unk) (show_id id);
          touch b;
          update ping table st id data
        end
      in
      List.iter (fun n -> mark n Pinged; ping n.addr (cb n)) unk;
      raise Nothing
  in
  if id <> table.self then
  try while true do table.root <- loop table.root done with Nothing -> () (* loop until no new splits *)
(** [insert_node table node] adds an already built [node] to the table
    without pinging anything and without changing node statuses.

    The node is dropped with a warning when an entry with the same id or
    the same address already exists, or when its bucket is full and cannot
    be split. A bucket is split when it contains [table.self] and spans a
    distance greater than 256; the halves get as [last_change] the most
    recent [last] of their nodes (0 when empty).

    Every terminal case leaves the traversal by raising [Nothing]; only a
    split returns a new subtree, after which the traversal is restarted. *)
let insert_node table node =
(*   log #debug "insert %s" (show_id node.id); *)
  let rec loop = function
  | N (l,mid,r) -> (match cmp node.id mid with LT | EQ -> N (loop l, mid, r) | GT -> N (l, mid, loop r))
  | L b ->
    Array.iter begin fun n ->
      match cmp n.id node.id = EQ, n.addr = node.addr with
      | true, true -> log #warn "insert_node: duplicate entry %s" (show_node n); raise Nothing
      | true, false | false, true ->
        log #warn "insert_node: conflict [%s] with [%s]" (show_node n) (show_node node);
        raise Nothing
      | _ -> ()
    end b.nodes;
    if Array.length b.nodes <> bucket_nodes then
    begin
      b.nodes <- Array.of_list (node :: Array.to_list b.nodes);
      raise Nothing
    end;
    if inside b table.self && gt_big_int (distance b.lo b.hi) (big_int_of_int 256) then
    begin
      let mid = split b.lo b.hi in
      (* the lower bucket ends at [mid] inclusive and the tree routes EQ to
         the left, so a node whose id equals [mid] must stay on the left *)
      let (nodes1,nodes2) = List.partition (fun n -> cmp n.id mid <> GT) (Array.to_list b.nodes) in
      let last_change = List.fold_left (fun acc n -> max acc n.last) 0 in
      let new_node = N (
          L { lo = b.lo; hi = mid; last_change = last_change nodes1; nodes = Array.of_list nodes1; },
          mid,
          L { lo = succ mid; hi = b.hi; last_change = last_change nodes2; nodes = Array.of_list nodes2; } )
      in
      new_node
    end
    else
    begin
      log #warn "insert_node: bucket full [%s]" (show_node node);
      raise Nothing
    end
  in
  try while true do table.root <- loop table.root done with Nothing -> ()
(** [all_nodes t] lists every node stored in the buckets of [t]. *)
let all_nodes t =
  let rec loop acc = function
    | N (l,_,r) -> let acc = loop acc l in loop acc r
    | L b -> Array.to_list b.nodes @ acc
  in
  loop [] t.root
337 (* end *)
(** [refresh table] lists the buckets that need refreshing.

    A bucket qualifies when its [last_change] is older than [node_period]
    and at least one of its nodes is not [Bad]. For each such bucket the
    result holds a pair made of a random identifier drawn from the bucket's
    range and the (id, address) list of its nodes. *)
let refresh table =
  let expire = now () - node_period in
  let rec loop acc = function
    | N (l,_,r) -> let acc = loop acc l in loop acc r
    | L b when b.last_change < expire ->
      if Array2.exists (fun n -> n.status <> Bad) b.nodes then
        begin
          let nodes = Array.map (fun n -> n.id, n.addr) b.nodes in
          (choose_random b.lo b.hi, Array.to_list nodes) :: acc
        end
      else
        acc (* do not refresh buckets with all bad nodes *)
    | L _ -> acc
  in
  loop [] table.root
(** [find_node t h] returns the nodes of the bucket responsible for [h].

    FIXME: when that bucket holds fewer than [bucket_nodes] entries the
    result is not topped up from the neighbouring buckets; the list is
    returned as is whatever its length. *)
let find_node t h =
  let rec loop = function
    | N (l,mid,r) -> (match cmp h mid with LT | EQ -> loop l | GT -> loop r)
    | L b -> Array.to_list b.nodes
  in
  loop t.root
(** [create ()] builds an empty routing table: a single bucket covering the
    identifiers from [H.null] to [last], and a random identifier for the
    local node. *)
let create () =
  { root = L { lo = H.null; hi = last; last_change = now (); nodes = [||]; };
    self = H.random ();
  }
(** Logs the local identifier and the current time, then the whole tree. *)
let show_table t =
  let { self; root } = t in
  lprintf_nl "self : %s now : %d" (show_id self) (now ());
  show_tree root
(** [fold f acc tree] folds [f] over the buckets of [tree], left subtree
    before right subtree. *)
let rec fold f acc tree =
  match tree with
  | L bucket -> f acc bucket
  | N (left, _, right) ->
    let acc = fold f acc left in
    fold f acc right
(** [size t] is the total number of nodes stored in the table. *)
let size t =
  let add_bucket total bucket = total + Array.length bucket.nodes in
  fold add_bucket 0 t.root
(* Disabled: these definitions depend on the Make functor, which only
   appears inside a comment in this file, and NoNetwork has no [type t]
   and a [ping] of a different arity than the Network signature requires.

module NoNetwork : Network = struct
  let ping addr k = k H.null (Random.bool ())
end

module K = Make(NoNetwork)
*)
(** Stress test: fills a fresh table with one million random identifiers,
    all at the same local address, using a fake [ping] that answers at
    random. The table is dumped before and after. *)
let tt () =
  let table = create () in
  show_table table;
  let addr = (Ip.of_string "127.0.0.1", 9000) in
  (* fake network: a node answers, with the null id, one time out of two *)
  let ping target reply =
    let answer = if Random.bool () then Some (H.null, target) else None in
    reply answer
  in
  for _i = 1 to 1_000_000 do
    update ping table Good (H.random ()) addr
  done;
  show_table table
(** Conversion of a routing table to and from [Options] values, and the
    option class built from these conversions. *)
module RoutingTableOption = struct

open Options

(** Parses a status stored as a string.
    @raise Failure on any other value. *)
let value_to_status = function
  | StringValue "good" -> Good
  | StringValue "bad" -> Bad
  | StringValue "pinged" -> Pinged
  | StringValue "unknown" -> Unknown
  | _ -> failwith "RoutingTableOption.value_to_status"

(** Inverse of [value_to_status]. *)
let status_to_value = function
  | Good -> string_to_value "good"
  | Bad -> string_to_value "bad"
  | Pinged -> string_to_value "pinged"
  | Unknown -> string_to_value "unknown"

(** Reads a node from a module value with the fields id, ip, port, last and
    status.
    @raise Failure when the value is not a module.
    @raise Not_found when a field is missing (from [List.assoc]). *)
let value_to_node = function
  | Module props ->
    let get cls s = from_value cls (List.assoc s props) in
    {
      id = H.of_hexa (get string_option "id");
      addr = (get Ip.option "ip", get port_option "port");
      last = get int_option "last";
      status = value_to_status (List.assoc "status" props);
    }
  | _ -> failwith "RoutingTableOption.value_to_node"

(** Inverse of [value_to_node]. *)
let node_to_value n =
  Module [
    "id", string_to_value (H.to_hexa n.id);
    "ip", to_value Ip.option (fst n.addr);
    "port", to_value port_option (snd n.addr);
    "last", int_to_value n.last;
    "status", status_to_value n.status;
  ]

(** Rebuilds a table from a module value with the fields self and nodes:
    the stored nodes are inserted one by one into an empty table whose root
    bucket has a [last_change] of 0. The table is dumped when [!verbose].
    @raise Failure when the value is not a module.
    @raise Not_found when a field is missing (from [List.assoc]). *)
let value_to_table v =
  match v with
  | Module props ->
    let nodes = value_to_list value_to_node (List.assoc "nodes" props) in
    let self = H.of_hexa (value_to_string (List.assoc "self" props)) in
    let t = { root = L { lo = H.null; hi = last; last_change = 0; nodes = [||]; };
              self = self; }
    in
    List.iter (insert_node t) nodes;
    if !verbose then show_table t;
    t
  | _ -> failwith "RoutingTableOption.value_to_table"

(** Stores the local identifier and the flat list of all nodes; the tree
    structure itself is not saved. The table is dumped when [!verbose]. *)
let table_to_value t =
  if !verbose then show_table t;
  Module [
    "self", string_to_value (H.to_hexa t.self);
    "nodes", list_to_value node_to_value (all_nodes t)
  ]

(** Option class for routing tables. *)
let t = define_option_class "RoutingTable" value_to_table table_to_value

end