From 00e103753bd89d5e0ad83cb4a6967fac46092394 Mon Sep 17 00:00:00 2001 From: spiralvoice Date: Sun, 23 Jan 2011 15:20:25 +0000 Subject: [PATCH] patch #7442 --- config/Makefile.in | 44 +- distrib/ChangeLog | 5 + src/daemon/common/commonOptions.ml | 8 +- src/daemon/common/commonTypes.ml | 1 - src/daemon/driver/driverInterface.ml | 2 - src/networks/bittorrent/bTClients.ml | 86 +++- src/networks/bittorrent/bTComplexOptions.ml | 17 +- src/networks/bittorrent/bTGlobals.ml | 4 + src/networks/bittorrent/bTInteractive.ml | 240 ++++------ src/networks/bittorrent/bTMain.ml | 27 ++ src/networks/bittorrent/bTOptions.ml | 17 + src/networks/bittorrent/bTProtocol.ml | 50 +- src/networks/bittorrent/bTTorrent.ml | 10 +- src/networks/bittorrent/bTTypes.ml | 5 +- src/networks/bittorrent/bT_DHT.ml | 684 ++++++++++++++++++++++++++++ src/networks/bittorrent/kademlia.ml | 459 +++++++++++++++++++ src/utils/cdk/array2.ml | 8 + src/utils/cdk/list2.ml | 8 +- tools/bt_dht_node.ml | 62 +++ tools/make_torrent.ml | 34 +- tools/subconv.ml | 2 +- 21 files changed, 1496 insertions(+), 277 deletions(-) create mode 100644 src/networks/bittorrent/bT_DHT.ml create mode 100644 src/networks/bittorrent/kademlia.ml create mode 100644 tools/bt_dht_node.ml diff --git a/config/Makefile.in b/config/Makefile.in index 5edbf1c3..ef4e2433 100644 --- a/config/Makefile.in +++ b/config/Makefile.in @@ -28,8 +28,8 @@ NO_LIBS_opt= NO_STATIC_LIBS_opt= NO_CMXA= -LIBS_byte=-custom bigarray.cma unix.cma str.cma -LIBS_opt= bigarray.cmxa unix.cmxa str.cmxa +LIBS_byte=-custom bigarray.cma unix.cma str.cma nums.cma +LIBS_opt= bigarray.cmxa unix.cmxa str.cmxa nums.cmxa BIGARRAY_LIBS_opt=bigarray.cmxa BIGARRAY_LIBS_byte=bigarray.cma @@ -152,9 +152,9 @@ CDK_SRCS+= $(LIB)/fifo.ml $(CDK)/arg2.ml $(LIB)/syslog.ml $(CDK)/printf2.ml \ $(CDK)/heap.ml \ $(CDK)/printexc2.ml $(CDK)/genlex2.ml \ $(CDK)/filepath.ml $(CDK)/string2.ml \ - $(CDK)/filename2.ml $(CDK)/list2.ml $(CDK)/hashtbl2.ml \ + $(CDK)/filename2.ml $(CDK)/array2.ml $(CDK)/hashtbl2.ml \ $(CDK)/unix2.ml $(CDK)/file.ml \ - $(CDK)/heap_c.c $(CDK)/array2.ml + $(CDK)/heap_c.c $(CDK)/list2.ml EXTLIB_SRCS += $(EXTLIB)/IO.ml @@ -450,6 +450,8 @@ BITTORRENT_SRCS= \ $(SRC_BITTORRENT)/bTUdpTracker.ml \ $(SRC_BITTORRENT)/bTProtocol.ml \ $(SRC_BITTORRENT)/bTTorrent.ml \ + $(SRC_BITTORRENT)/kademlia.ml \ + $(SRC_BITTORRENT)/bT_DHT.ml \ $(SRC_BITTORRENT)/bTGlobals.ml \ $(SRC_BITTORRENT)/bTComplexOptions.ml \ $(SRC_BITTORRENT)/bTStats.ml \ @@ -536,6 +538,11 @@ MAKE_TORRENT_SRCS = \ $(COMMON_SRCS) $(COMMON_CLIENT_SRCS) $(BITSTRING_SRCS) $(BITTORRENT_SRCS) \ tools/make_torrent.ml +BT_DHT_NODE_SRCS = \ + $(EXTLIB_SRCS) $(CDK_SRCS) $(LIB_SRCS) $(NET_SRCS) \ + $(SRC_BITTORRENT)/bencode.ml $(SRC_BITTORRENT)/kademlia.ml $(SRC_BITTORRENT)/bT_DHT.ml \ + tools/bt_dht_node.ml + GET_RANGE_SRCS = \ $(EXTLIB_SRCS) $(CDK_SRCS) $(LIB_SRCS) $(NET_SRCS) $(MP3TAG_SRCS) \ tools/get_range.ml @@ -1359,6 +1366,7 @@ EXPAND(OBSERVER,observer) EXPAND(MLD_HASH,mld_hash) EXPAND(OCAMLPP,ocamlpp) EXPAND(MAKE_TORRENT,make_torrent,NO,NO,NO,NO,MAGIC,BITSTRING,UPNP_NATPMP) +EXPAND(BT_DHT_NODE,bt_dht_node,NO,NO,NO,NO,NO,BITSTRING,UPNP_NATPMP) EXPAND(SUBCONV,subconv) EXPAND(MLSPLIT,mlsplit) EXPAND(CONTESTER,contester,CRYPT) @@ -1462,8 +1470,8 @@ clean: rm -f mlfiletp mlfiletp+gui mlfiletp.exe rm -f mldc mldc+gui mldc.exe rm -f mlfasttrack mlfasttrack+gui mlfasttrack.exe - rm -f svg_converter svg_converter.byte mld_hash make_torrent copysources get_range subconv testrss - rm -f svg_converter.exe mld_hash.exe make_torrent.exe copysources.exe get_range.exe subconv.exe testrss.exe + rm -f svg_converter svg_converter.byte mld_hash make_torrent bt_dht_node copysources get_range subconv testrss + rm -f svg_converter.exe mld_hash.exe make_torrent.exe bt_dht_node.exe copysources.exe get_range.exe subconv.exe testrss.exe rm -f tests tests.exe (for i in $(SUBDIRS); do \ rm -f $$i/*.cm? $$i/*.o $$i/*.annot ; \ @@ -1573,16 +1581,16 @@ $(LOCAL)/ocamlopt-$(REQUIRED_OCAML)/ocamlopt: $(LOCAL)/ocamlopt-$(REQUIRED_OCAML cd $(LOCAL)/ocamlopt-$(REQUIRED_OCAML); $(MAKE) ifeq ("$(BITTORRENT)", "yes") -MAKE_TORRENT=make_torrent -MAKE_TORRENT_BYTE=$(MAKE_TORRENT).byte -MAKE_TORRENT_STATIC=$(MAKE_TORRENT).static -MAKE_TORRENT_BYTE_STATIC=$(MAKE_TORRENT_BYTE).static +BT_UTILS=make_torrent bt_dht_node +BT_UTILS_BYTE=$(foreach x, $(BT_UTILS), $(x).byte) +BT_UTILS_STATIC=$(foreach x, $(BT_UTILS), $(x).static) +BT_UTILS_BYTE_STATIC=$(foreach x, $(BT_UTILS), $(x).byte.static) endif -utils.byte: mld_hash.byte $(MAKE_TORRENT_BYTE) copysources.byte get_range.byte subconv.byte -utils.opt: svg_converter mld_hash $(MAKE_TORRENT) copysources get_range subconv -utils.opt.static: svg_converter mld_hash.static $(MAKE_TORRENT_STATIC) copysources.static get_range.static subconv.static -utils.byte.static: mld_hash.byte.static $(MAKE_TORRENT_BYTE_STATIC) copysources.byte.static get_range.byte.static subconv.byte.static +utils.byte: mld_hash.byte $(BT_UTILS_BYTE) copysources.byte get_range.byte subconv.byte +utils.opt: svg_converter mld_hash $(BT_UTILS) copysources get_range subconv +utils.opt.static: svg_converter mld_hash.static $(BT_UTILS_STATIC) copysources.static get_range.static subconv.static +utils.byte.static: mld_hash.byte.static $(BT_UTILS_BYTE_STATIC) copysources.byte.static get_range.byte.static subconv.byte.static utils.static: if test "$(TARGET_TYPE)" = "byte"; then \ $(MAKE) utils.byte.static; \ @@ -1675,10 +1683,10 @@ release.mlnet.distri: mlnet mlnet.static mv $(DISDIR).tar mldonkey-$(CURRENT_VERSION).static.$(MD4ARCH)-`uname -s | sed "s/\//_/"`$(GLIBC_VERSION_ARCH).tar $(COMPRESS) mldonkey-$(CURRENT_VERSION).static.$(MD4ARCH)-`uname -s | sed "s/\//_/"`$(GLIBC_VERSION_ARCH).tar -release.utils.shared: mld_hash $(MAKE_TORRENT) +release.utils.shared: mld_hash $(BT_UTILS) rm -rf mldonkey-* mkdir -p $(DISDIR) - for i in "mld_hash $(MAKE_TORRENT)"; do \ + for i in "mld_hash $(BT_UTILS)"; do \ cp -f $$i $(DISDIR)/$$i && \ if [ "$(SYSTEM)" != "macos" ]; then \ strip $(DISDIR)/$$i; \ @@ -1689,12 +1697,12 @@ release.utils.shared: mld_hash $(MAKE_TORRENT) mv $(DISDIR).tar mldonkey-tools-$(CURRENT_VERSION).shared.$(MD4ARCH)-`uname -s | sed "s/\//_/"`$(GLIBC_VERSION_ARCH).tar $(COMPRESS) mldonkey-tools-$(CURRENT_VERSION).shared.$(MD4ARCH)-`uname -s | sed "s/\//_/"`$(GLIBC_VERSION_ARCH).tar -release.utils.static: mld_hash.static $(MAKE_TORRENT_STATIC) +release.utils.static: mld_hash.static $(BT_UTILS_STATIC) rm -rf mldonkey-* mkdir -p $(DISDIR) cp -f mld_hash.static $(DISDIR)/mld_hash && strip $(DISDIR)/mld_hash ifeq ("$(BITTORRENT)", "yes") - cp -f make_torrent.static $(DISDIR)/make_torrent && strip $(DISDIR)/make_torrent + for i in "$(BT_UTILS_STATIC)"; do cp -f $$i $(DISDIR)/$$i && strip $(DISDIR)/$$i; done endif mv $(DISDIR) $(DISDIR)-$(CURRENT_VERSION) tar cf $(DISDIR).tar $(DISDIR)-$(CURRENT_VERSION) diff --git a/distrib/ChangeLog b/distrib/ChangeLog index 316f376d..58fb24c1 100644 --- a/distrib/ChangeLog +++ b/distrib/ChangeLog @@ -14,6 +14,11 @@ http://mldonkey.sourceforge.net/Windows#MinGW_Installation ChangeLog ========= +2011/01/23 +7442: BT: DHT tracker support (ygrek) +- new options BT-dht_port, BT-use_trackers, BT-dht_bootstrap_nodes +- new verbosity level "dht" +------------------------------------------------------------------------------- 2011/01/22: version 3.0.7 = tag release-3-0-7 2011/01/06 diff --git a/src/daemon/common/commonOptions.ml b/src/daemon/common/commonOptions.ml index d390ebbc..c1b60557 100644 --- a/src/daemon/common/commonOptions.ml +++ b/src/daemon/common/commonOptions.ml @@ -578,7 +578,8 @@ let verbosity = define_expert_option current_section ["verbosity"] act : debug activity bw : debug bandwidth geo : debug GeoIP - unexp : debug unexpected messages" + unexp : debug unexpected messages + dht : debug DHT" string_option "" @@ -1909,6 +1910,7 @@ let verbose_activity = ref false let verbose_user_commands = ref false let verbose_geoip = ref false let verbose_unexpected_messages = ref false +let verbose_dht = ref (ref false) let set_all v = verbose_msg_clients := v; @@ -1938,7 +1940,8 @@ let set_all v = verbose_activity := v; verbose_user_commands := v; Geoip.verbose := v; - verbose_unexpected_messages := v + verbose_unexpected_messages := v; + !verbose_dht := v let _ = option_hook verbosity (fun _ -> @@ -1975,6 +1978,7 @@ let _ = | "unexp" -> verbose_unexpected_messages := true | "com" -> verbose_user_commands := true | "geo" -> Geoip.verbose := true + | "dht" -> !verbose_dht := true | "all" -> diff --git a/src/daemon/common/commonTypes.ml b/src/daemon/common/commonTypes.ml index a6c0e838..3f4e6e20 100644 --- a/src/daemon/common/commonTypes.ml +++ b/src/daemon/common/commonTypes.ml @@ -87,7 +87,6 @@ let file_string_of_uid uid = exception Illegal_urn of string exception Torrent_started of string exception Torrent_already_exists of string -exception Torrent_can_not_be_used of string let uid_of_string s = let s = String.lowercase s in diff --git a/src/daemon/driver/driverInterface.ml b/src/daemon/driver/driverInterface.ml index 5f4373b3..741d02ae 100644 --- a/src/daemon/driver/driverInterface.ml +++ b/src/daemon/driver/driverInterface.ml @@ -1128,8 +1128,6 @@ let gui_reader (gui: gui_record) t _ = gui_send gui (Console (Printf.sprintf "Failure: %s\n" s)) | Torrent_started s -> gui_send gui (Console (Printf.sprintf "\nInfo: Torrent %s started\n" s)) - | Torrent_can_not_be_used s -> - gui_send gui (Console (Printf.sprintf "\nError: Torrent %s does not have valid tracker URLs\n" s)) | Torrent_already_exists s -> gui_send gui (Console (Printf.sprintf "\nError: Torrent %s is already in download queue\n" s)) | e -> diff --git a/src/networks/bittorrent/bTClients.ml b/src/networks/bittorrent/bTClients.ml index b627daf7..929a95cc 100644 --- a/src/networks/bittorrent/bTClients.ml +++ b/src/networks/bittorrent/bTClients.ml @@ -295,13 +295,7 @@ let connect_trackers file event need_sources f = (* only re-enable after normal error *) | Disabled _ -> t.tracker_status <- Enabled | _ -> ()) file.file_trackers; - let enabled_trackers = List.filter (fun t -> tracker_is_enabled t) file.file_trackers in - if enabled_trackers = [] && (file_state file) <> FilePaused then - begin - file_pause (as_file file) (CommonUserDb.admin_user ()); - lprintf_file_nl (as_file file) "Paused %s, no usable trackers left" (file_best_name (as_file file)) - end; - enabled_trackers + List.filter (fun t -> tracker_is_enabled t) file.file_trackers end in List.iter (fun t -> @@ -370,6 +364,9 @@ let connect_trackers file event need_sources f = (show_tracker_url t.tracker_url) (t.tracker_interval - (last_time () - t.tracker_last_conn)) file.file_name ) enabled_trackers +let connect_trackers file event need_sources f = + if !!use_trackers then connect_trackers file event need_sources f + let start_upload c = set_client_upload (as_client c) (as_file c.client_file); set_client_has_a_slot (as_client c) NormalSlot; @@ -540,6 +537,20 @@ let set_bit s n = let max_range_requests = 5 (* How much bytes we can request in one Piece *) +let reserved () = + let s = String.make 8 '\x00' in + s.[7] <- (match !bt_dht with None -> '\x00' | Some _ -> '\x01'); + s + +(** handshake *) +let send_init client_uid file_id sock = + let buf = Buffer.create 100 in + buf_string8 buf "BitTorrent protocol"; + Buffer.add_string buf (reserved ()); + Buffer.add_string buf (Sha1.direct_to_string file_id); + Buffer.add_string buf (Sha1.direct_to_string client_uid); + let s = Buffer.contents buf in + write_string sock s (** A wrapper to send Interested message to a client. (Send interested only if needed) @@ -596,6 +607,9 @@ let parse_reserved rbits c = c.client_azureus_messaging_protocol <- has_bit 0 0x80 +let show_client c = + let (ip,port) = c.client_host in + Printf.sprintf "%s:%d %S" (Ip.to_string ip) port (brand_to_string c.client_brand) (** This function is called to parse the first message that a client send. @@ -661,11 +675,8 @@ let rec client_parse_header counter cc init_sent gconn sock c *) in - if !verbose_msg_clients then begin - let (ip,port) = c.client_host in - lprintf_nl "Client %d: Connected from %s:%d" (client_num c) - (Ip.to_string ip) port; - end; + if !verbose_msg_clients then + lprintf_nl "Client %d: Connected from %s" (client_num c) (show_client c); parse_reserved rbits c; @@ -699,7 +710,11 @@ let rec client_parse_header counter cc init_sent gconn sock if !verbose_msg_clients then lprintf_nl "file and client found"; (* if not c.client_incoming then *) - send_bitfield c; + send_bitfield c; (* BitField is always the first message *) + begin match c.client_dht, !bt_dht with + | true, Some dht -> send_client c (DHT_Port dht.BT_DHT.M.dht_port) + | _ -> () + end; c.client_blocks_sent <- file.file_blocks_downloaded; (* TODO !!! : send interested if and only if we are interested @@ -733,8 +748,8 @@ let rec client_parse_header counter cc init_sent gconn sock (Ip.to_string ip) port (Sha1.to_hexa file_id) | e -> lprintf_nl "Exception %s in client_parse_header" (Printexc2.to_string e); - close sock (Closed_for_exception e); - raise e + close sock (Closed_for_exception e); + raise e (** Update the bitmap of a client. Unclear if it is still useful. @@ -982,7 +997,7 @@ and client_to_client c sock msg = try match msg with - Piece (num, offset, s, pos, len) -> + | Piece (num, offset, s, pos, len) -> (*A Piece message contains the data*) set_client_state c (Connected_downloading (file_num file)); (*flag it as a good client *) @@ -1000,7 +1015,7 @@ and client_to_client c sock msg = | (p1,p2,r) :: _ -> let (x,y) = CommonSwarming.range_range r in lprintf_file_nl (as_file file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])" - (brand_to_string c.client_brand) position len + (show_client c) position len p1 p2 x y ); @@ -1165,9 +1180,8 @@ and client_to_client c sock msg = None -> (* Afaik this is no protocol violation and happens if the client didn't send a client bitmap after the handshake. *) - let (ip,port) = c.client_host in - if !verbose_msg_clients then lprintf_file_nl (as_file file) "%s:%d with software %s : Choke send, but no client bitmap" - (Ip.to_string ip) port (brand_to_string c.client_brand) + if !verbose_msg_clients then lprintf_file_nl (as_file file) "%s : Choke send, but no client bitmap" + (show_client c) | Some up -> CommonSwarming.clear_uploader_intervals up end; @@ -1233,6 +1247,20 @@ and client_to_client c sock msg = if !verbose_msg_clients then lprintf_file_nl (as_file file) "Error: received cancel request but client has no slot" + | DHT_Port port -> + match !bt_dht with + | None -> + if !verbose_msg_clients then + lprintf_file_nl (as_file file) "Received DHT PORT when DHT is disabled. From %s" (show_client c) + | Some dht -> + BT_DHT.M.ping dht (fst c.client_host, port) begin function + | None -> + if !verbose then + lprintf_file_nl (as_file file) "Peer %s didn't reply to DHT ping on port %d" (show_client c) port + | Some (id,addr) -> + BT_DHT.update dht Kademlia.Good id addr + end + with e -> lprintf_file_nl (as_file file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e) (TcpMessages.to_string msg) @@ -1615,6 +1643,24 @@ let talk_to_tracker file need_sources = in connect_trackers file event need_sources f +let talk_to_dht file need_sources = + match !bt_dht with + | None -> () + | Some dht -> + if !verbose then lprintf_file_nl (as_file file) "DHT announce"; + file.file_last_dht_announce <- last_time (); + BT_DHT.query_peers dht file.file_id (fun (_,addr as node) token peers -> + BT_DHT.M.announce dht addr !!client_port token file.file_id (fun _ -> ()) ~kerr:(fun () -> + if !verbose then lprintf_file_nl (as_file file) "DHT announce to %s failed" (BT_DHT.show_node node)); + if need_sources then + begin + List.iter (fun (ip,port) -> maybe_new_client file Sha1.null ip port) peers; + resume_clients file + end) + +let talk_to_tracker file need_sources = + if file.file_last_dht_announce + 14*60 < last_time () && not file.file_private then talk_to_dht file need_sources; + talk_to_tracker file need_sources (** Check to see if file is finished, if not try to get sources for it diff --git a/src/networks/bittorrent/bTComplexOptions.ml b/src/networks/bittorrent/bTComplexOptions.ml index e0271bf1..0d8095da 100644 --- a/src/networks/bittorrent/bTComplexOptions.ml +++ b/src/networks/bittorrent/bTComplexOptions.ml @@ -32,6 +32,11 @@ open BTTypes open BTOptions open BTGlobals +let bt_dht_ini = create_options_file "bt_dht.ini" +let bt_dht_section = file_section bt_dht_ini [] "" + +let dht_routing_table = define_option bt_dht_section ["dht_routing_table"] "" + Kademlia.RoutingTableOption.t (Kademlia.create ()) let bt_stats_ini = create_options_file "stats_bt.ini" let bt_stats_section = file_section bt_stats_ini [] "" @@ -133,7 +138,11 @@ let value_to_file file_size file_state user group assocs = let file_creation_date = try get_value "file_creation_date" value_to_int64 with Not_found -> Int64.zero in let file_modified_by = try get_value "file_modified_by" value_to_string with Not_found -> "" in let file_encoding = try get_value "file_encoding" value_to_string with Not_found -> "" in - let file_is_private = try get_value "file_is_private" value_to_int64 with Not_found -> Int64.zero in + let file_is_private = + try get_value "file_is_private" value_to_bool with + | Not_found -> false + | _ -> try get_value "file_is_private" value_to_int64 <> 0L with _ -> false + in let file_files = try let file_files = (get_value "file_files" @@ -260,9 +269,8 @@ let save_config () = let config_files_loaded = ref false let load _ = - (try - Options.load bt_stats_ini; - with Sys_error _ -> ()); + begin try Options.load bt_stats_ini with Sys_error _ -> () end; + begin try Options.load bt_dht_ini with Sys_error _ -> () end; check_client_uid (); config_files_loaded := true @@ -300,6 +308,7 @@ let save _ = guptime =:= !!guptime + (last_time () - start_time) - !diff_time; diff_time := (last_time () - start_time); Options.save_with_help bt_stats_ini; + Options.save_with_help bt_dht_ini; end (* lprintf "SAVED\n"; *) diff --git a/src/networks/bittorrent/bTGlobals.ml b/src/networks/bittorrent/bTGlobals.ml index 36c7ab0c..ab24ae04 100644 --- a/src/networks/bittorrent/bTGlobals.ml +++ b/src/networks/bittorrent/bTGlobals.ml @@ -153,6 +153,8 @@ let current_files = ref ([] : BTTypes.file list) let listen_sock = ref (None : TcpServerSocket.t option) +let bt_dht = ref (None : BT_DHT.M.t option) + let files_by_uid = Hashtbl.create 13 let max_range_len = Int64.of_int (1 lsl 14) @@ -299,6 +301,8 @@ let new_file file_id t torrent_diskname file_temp file_state user group = file_shared = None; file_session_uploaded = Int64.zero; file_session_downloaded = Int64.zero; + file_last_dht_announce = 0; + file_private = t.torrent_private; } and file_impl = { dummy_file_impl with impl_file_owner = user; diff --git a/src/networks/bittorrent/bTInteractive.ml b/src/networks/bittorrent/bTInteractive.ml index eba8f634..3f8f7ddd 100644 --- a/src/networks/bittorrent/bTInteractive.ml +++ b/src/networks/bittorrent/bTInteractive.ml @@ -212,166 +212,102 @@ let auto_links = let op_file_print file o = let buf = o.conn_buf in - if use_html_mods o then begin - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Filename", "sr br", "Filename"); - ("", "sr", file.file_name) ]; - - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Torrent metadata hash", "sr", "Hash"); - ("", "sr", Sha1.to_hexa file.file_id) ]; - - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Search for other possible Torrent Files", "sr br", "Torrent Srch"); - ("", "sr", Printf.sprintf "\\IsoHunt\\" - (file.file_name) - ) - ]; + if use_html_mods o then + begin + let emit text ?(desc=text) value = + Printf.bprintf buf "\\\\" (html_mods_cntr ()); + html_mods_td buf [ + (desc, "sr br", text); + ("", "sr", value) + ] + in + + emit (_s"Filename") file.file_name; + emit (_s"Hash") ~desc:(_s"Torrent metadata hash") (Sha1.to_hexa file.file_id); + emit (_s"Torrent search") ~desc:(_s"Search for similar torrent files") (Printf.sprintf + "\\IsoHunt\\" file.file_name); - Printf.bprintf buf "\\\\" (html_mods_cntr ()); let tracker_header_printed = ref false in List.iter (fun tracker -> let tracker_url = show_tracker_url tracker.tracker_url in let tracker_text = + if not !!use_trackers then + Printf.sprintf "disabled: %s" tracker_url + else match tracker.tracker_status with - | Disabled s | Disabled_mld s -> - Printf.sprintf "\\disabled: %s\\\\--error: %s\\" tracker_url s - | Disabled_failure (i,s) -> - Printf.sprintf "\\disabled: %s\\\\--error: %s (try %d)\\" tracker_url s i - | _ -> - Printf.sprintf "enabled: %s" tracker_url - + | Disabled s | Disabled_mld s -> + Printf.sprintf "\\disabled: %s\\\\--error: %s\\" + tracker_url s + | Disabled_failure (i,s) -> + Printf.sprintf "\\disabled: %s\\\\--error: %s (try %d)\\" + tracker_url s i + | _ -> + Printf.sprintf "enabled: %s" tracker_url in - html_mods_td buf [ - (if not !tracker_header_printed then - ("Tracker(s)", "sr br", "Tracker(s)") - else - ("", "sr br", "") - ); - (tracker_url, "sr", tracker_text)]; - Printf.bprintf buf "\\\\" (html_mods_cntr ()); + let text = if not !tracker_header_printed then _s"Tracker(s)" else "" in + emit text tracker_text; tracker_header_printed := true; ) file.file_trackers; - html_mods_td buf [ - ("Torrent Filename", "sr br", "Torrent Fname"); - ("", "sr", file.file_torrent_diskname) ]; - - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - - html_mods_td buf [ - ("Comment", "sr br", "Comment"); - ("", "sr", match file.file_comment with - "" -> "-" - | s -> auto_links s) ]; - - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Created by", "sr br", "Created by"); - ("", "sr", match file.file_created_by with - "" -> "-" - | s -> auto_links s) ]; - - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Creation date", "sr br", "Creation date"); - ("", "sr", Date.to_string (Int64.to_float file.file_creation_date) ) ]; - - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Modified by", "sr br", "Modified by"); - ("", "sr", match file.file_modified_by with - "" -> "-" - | s -> auto_links s) ]; - - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Encoding", "sr br", "Encoding"); - ("", "sr", match file.file_encoding with - "" -> "-" - | _ -> file.file_encoding) ]; - - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Piece size", "sr br", "Piece size"); - ("", "sr", Int64.to_string file.file_piece_size) ]; + emit (_s"Torrent filename") file.file_torrent_diskname; + emit (_s"Comment") (match file.file_comment with "" -> "-" | s -> auto_links s); + emit (_s"Created by") (match file.file_created_by with "" -> "-" | s -> auto_links s); + emit (_s"Creation date") (Date.to_string (Int64.to_float file.file_creation_date)); + emit (_s"Modified by") (match file.file_modified_by with "" -> "-" | s -> auto_links s); + emit (_s"Encoding") (match file.file_encoding with "" -> "-" | s -> s); + emit (_s"Piece size") (Int64.to_string file.file_piece_size); + emit (_s"Private") ~desc:(_s"Private torrents get peers only via trackers") + (if file.file_private then _s "yes" else _s "no"); + if !bt_dht <> None then + emit (_s"Last DHT announce") ~desc:(_s"Last time this torrent was announced in DHT") + (string_of_date file.file_last_dht_announce); let rec print_first_tracker l = match l with | [] -> () | t :: q -> - if not (tracker_is_enabled t) then print_first_tracker q + if not (tracker_is_enabled t) then + print_first_tracker q else begin - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Last Tracker Announce", "sr br", "Last Announce"); - ("", "sr", string_of_date t.tracker_last_conn) ]; + emit (_s"Last announce") ~desc:(_s"Last time this torrent was announced to the tracker") + (string_of_date t.tracker_last_conn); if t.tracker_last_conn > 1 then - begin - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Next Tracker Announce (planned)", "sr br", "Next Announce"); - ("", "sr", string_of_date (t.tracker_last_conn + t.tracker_interval)) ]; - end; - - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Tracker Announce Interval", "sr br", "Announce Interval"); - ("", "sr", Printf.sprintf "%d seconds" t.tracker_interval) ]; - - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Minimum Tracker Announce Interval", "sr br", "Min Announce Interval"); - ("", "sr", Printf.sprintf "%d seconds" t.tracker_min_interval) ]; + emit (_s"Next announce") ~desc:(_s"Time of the next announce to the tracker (planned)") + (string_of_date (t.tracker_last_conn + t.tracker_interval)); + + emit (_s"Announce interval") ~desc:(_s"Tracker announce interval") + (Printf.sprintf "%d seconds" t.tracker_interval); + + emit (_s"Min announce interval") ~desc:(_s"Minimum tracker announce interval") + (Printf.sprintf "%d seconds" t.tracker_min_interval); (* show only interesting answers*) - if t.tracker_torrent_downloaded > 0 then begin - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Downloaded", "sr br", "Downloaded"); - ("", "sr", Printf.sprintf "%d" t.tracker_torrent_downloaded) ] - end; - if t.tracker_torrent_complete > 0 then begin - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Complete (seeds)", "sr br", "Complete"); - ("", "sr", Printf.sprintf "%d" t.tracker_torrent_complete) ] - end; - if t.tracker_torrent_incomplete > 0 then begin - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Incomplete (peers)", "sr br", "Incomplete"); - ("", "sr", Printf.sprintf "%d" t.tracker_torrent_incomplete) ] - end; - if t.tracker_torrent_total_clients_count > 0 then begin - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Total client count", "sr br", "All clients"); - ("", "sr", Printf.sprintf "%d" t.tracker_torrent_total_clients_count) ] - end; - if t.tracker_torrent_last_dl_req > 0 then begin - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Latest torrent request", "sr br", "Latest request"); - ("", "sr", Printf.sprintf "%ds" t.tracker_torrent_last_dl_req) ] - end; - if String.length t.tracker_id > 0 then begin - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Tracker id", "sr br", "Tracker id"); - ("", "sr", t.tracker_id) ] - end; - if String.length t.tracker_key > 0 then begin - Printf.bprintf buf "\\\\" (html_mods_cntr ()); - html_mods_td buf [ - ("Tracker key", "sr br", "Tracker key"); - ("", "sr", t.tracker_key) ] - end - end in + if t.tracker_torrent_downloaded > 0 then + emit (_s"Downloaded") (string_of_int t.tracker_torrent_downloaded); + + if t.tracker_torrent_complete > 0 then + emit (_s"Seeders") ~desc:(_s"Peers that have complete download") + (string_of_int t.tracker_torrent_complete); + + if t.tracker_torrent_incomplete > 0 then + emit (_s"Leechers") ~desc:(_s"Peers that have incomplete download") + (string_of_int t.tracker_torrent_incomplete); + + if t.tracker_torrent_total_clients_count > 0 then + emit (_s"Peers") ~desc:(_s"Total clients count") + (string_of_int t.tracker_torrent_total_clients_count); + + if t.tracker_torrent_last_dl_req > 0 then + emit (_s"Latest request") (Printf.sprintf "%ds" t.tracker_torrent_last_dl_req); + + if String.length t.tracker_id > 0 then + emit (_s"Tracker id") t.tracker_id; + + if String.length t.tracker_key > 0 then + emit (_s"Tracker key") t.tracker_key; + end + in print_first_tracker file.file_trackers; (* This is bad. Magic info should be automatically filled in when @@ -404,17 +340,13 @@ let op_file_print file o = let cntr = ref 0 in List.iter (fun (filename, size, magic) -> - Printf.bprintf buf "\\\\" (html_mods_cntr ()); let fs = Printf.sprintf "File %d" !cntr in let magic_string = match magic with - None -> "" + | None -> "" | Some m -> Printf.sprintf " / %s" m; in - html_mods_td buf [ - (fs, "sr br", fs); - ("", "sr", (Printf.sprintf "%s (%Ld bytes)%s" filename size magic_string)) - ]; + emit fs (Printf.sprintf "%s (%Ld bytes)%s" filename size magic_string); incr cntr; ) file.file_files end (* use_html_mods *) @@ -698,11 +630,6 @@ let load_torrent_string s user group = (* Save the torrent, because we later want to put it in the seeded directory. *) - let torrent_is_usable = ref false in - List.iter (fun url -> if can_handle_tracker (make_tracker_url url) then torrent_is_usable := true) - (if torrent.torrent_announce_list <> [] then torrent.torrent_announce_list else [torrent.torrent_announce]); - if not !torrent_is_usable then raise (Torrent_can_not_be_used torrent.torrent_name); - let torrent_diskname = CommonFile.concat_file downloads_directory (torrent.torrent_name ^ ".torrent") in if Sys.file_exists torrent_diskname then begin @@ -866,9 +793,6 @@ let scan_new_torrents_directory () = load_torrent_file file user user.user_default_group; (try Sys.remove file with _ -> ()) with - Torrent_can_not_be_used _ -> - Unix2.rename file (Filename.concat old_directory file_basename); - lprintf_nl "Torrent %s does not have valid tracker URLs, moved to torrents/old ..." file_basename | e -> Unix2.rename file (Filename.concat old_directory file_basename); lprintf_nl "Error %s in scan_new_torrents_directory for %s, moved to torrents/old ..." @@ -951,7 +875,6 @@ let op_network_parse_url url user group = "", true with Torrent_already_exists _ -> "A torrent with this name is already in the download queue", false - | Torrent_can_not_be_used _ -> "This torrent does not have valid tracker URLs", false with e -> lprintf_nl "Exception %s while 2nd loading" (Printexc2.to_string e); let s = Printf.sprintf "Can not load load torrent file: %s" @@ -1067,8 +990,8 @@ let compute_torrent filename announce comment = filename announce comment; let basename = Printf.sprintf "%s.torrent" (Filename.basename filename) in let torrent = Filename.concat seeded_directory basename in - let is_private = 0 in - let file_id = BTTorrent.generate_torrent announce torrent comment (Int64.of_int is_private) filename in + let is_private = false in + let file_id = BTTorrent.generate_torrent announce torrent comment is_private filename in match try_share_file torrent with | `Err msg -> failwith msg | `Ok torrent_path -> @@ -1332,7 +1255,6 @@ let op_gui_message s user = let file = load_torrent_string text user user.user_default_group in raise (Torrent_started file.file_name) with e -> (match e with - | Torrent_can_not_be_used s -> lprintf_nl "Loading torrent from GUI: torrent %s can not be used" s | Torrent_already_exists s -> lprintf_nl "Loading torrent from GUI: torrent %s is already in download queue" s | _ -> ()); raise e) @@ -1405,7 +1327,7 @@ let _ = [ !!client_port, "client_port TCP"; !!BTTracker.tracker_port, "tracker_port TCP"; - ]); + ] @ (match !bt_dht with None -> [] | Some dht -> [dht.BT_DHT.M.dht_port,"dht_port UDP"])); network.op_network_porttest_result <- (fun _ -> !porttest_result); network.op_network_porttest_start <- (fun _ -> azureus_porttest_random := (Random.int 100000); diff --git a/src/networks/bittorrent/bTMain.ml b/src/networks/bittorrent/bTMain.ml index 17a91eef..b1e073a7 100644 --- a/src/networks/bittorrent/bTMain.ml +++ b/src/networks/bittorrent/bTMain.ml @@ -39,6 +39,25 @@ let lprintf_nl fmt = let is_enabled = ref false +let stop_dht () = + match !bt_dht with + | None -> () + | Some dht -> + if !verbose then lprintf_nl "stopping DHT"; + BT_DHT.stop dht; + bt_dht := None + +let start_dht () = + let already = match !bt_dht with Some dht -> dht.BT_DHT.M.dht_port = !!dht_port | None -> false in + if not already && !!dht_port > 0 then + begin + stop_dht (); + lprintf_nl "starting DHT on port %d" !!dht_port; + let dht = BT_DHT.start !!dht_routing_table !!dht_port CommonGlobals.udp_write_controler in + BT_DHT.bootstrap dht ~routers:!!dht_bootstrap_nodes; + bt_dht := Some dht + end + let disable enabler () = if !enabler then begin is_enabled := false; @@ -51,6 +70,7 @@ let disable enabler () = listen_sock := None; TcpServerSocket.close sock Closed_by_user); BTTracker.stop_tracker (); + stop_dht (); if !!enable_bittorrent then enable_bittorrent =:= false end @@ -76,6 +96,7 @@ let enable () = with e -> lprintf "Exception in BTTracker.start_tracker: %s\n" (Printexc2.to_string e)); + start_dht (); if !!share_scan_interval <> 0 then add_session_timer enabler (float_of_int (!!share_scan_interval * 60)) (fun _ -> BTInteractive.share_files (); @@ -114,11 +135,17 @@ let enable () = () let _ = + CommonOptions.verbose_dht := Kademlia.verbose; network.op_network_is_enabled <- (fun _ -> !!CommonOptions.enable_bittorrent); option_hook enable_bittorrent (fun _ -> if !CommonOptions.start_running_plugins then if !!enable_bittorrent then network_enable network else network_disable network); + option_hook dht_port (fun _ -> + if !is_enabled then + begin + if !!dht_port = 0 then stop_dht () else start_dht () + end); (* network.op_network_save_simple_options <- BTComplexOptions.save_config; network.op_network_load_simple_options <- diff --git a/src/networks/bittorrent/bTOptions.ml b/src/networks/bittorrent/bTOptions.ml index bb2bd14b..1664fec3 100644 --- a/src/networks/bittorrent/bTOptions.ml +++ b/src/networks/bittorrent/bTOptions.ml @@ -164,3 +164,20 @@ let get_user_agent () = CommonOptions.get_user_agent () else !!user_agent +let dht_port = define_option bittorrent_section ["dht_port"] + "The UDP port to bind the DHT node to (0 to disable)" + port_option 12345 + +let use_trackers = define_option bittorrent_section ["use_trackers"] + "Send announces to trackers" + bool_option true + +let dht_bootstrap_nodes = define_option bittorrent_section ["dht_bootstrap_nodes"] + "Addresses of nodes used to bootstrap DHT network. Tried in order until enough nodes are found." + (list_option addr_option) + [ + "service.ygrek.org.ua",6881; + "router.utorrent.com",6881; + "router.transmission.com",6881; + ] + diff --git a/src/networks/bittorrent/bTProtocol.ml b/src/networks/bittorrent/bTProtocol.ml index b940242c..6b1aaac3 100644 --- a/src/networks/bittorrent/bTProtocol.ml +++ b/src/networks/bittorrent/bTProtocol.ml @@ -196,7 +196,7 @@ No payload: * 1 - unchoke: you have been unblocked * 2 - interested: I'm interested in downloading this file now * 3 - not interested: I'm not interested in downloading this file now -With bencoded payload: +With payload: * 4 - have int : index of new completed chunk * 5 - bitfield: @@ -210,10 +210,12 @@ With bencoded payload: int: index int: begin string: piece - * 8 - cancel: cancel a requesu + * 8 - cancel: cancel a request int: index int: begin int: length (power of 2, 2 ^ 15) + * 9 - DHT port announcement + int16: UDP port Choke/unchoke every 10 seconds *) @@ -263,6 +265,7 @@ module TcpMessages = struct | Cancel of int * int64 * int64 | Ping | PeerID of string + | DHT_Port of int let to_string msg = match msg with @@ -280,10 +283,11 @@ module TcpMessages = struct Printf.sprintf "Cancel %d %Ld[%Ld]" index offset len | Ping -> "Ping" | PeerID s -> Printf.sprintf "PeerID [%s]" (String.escaped s) + | DHT_Port n -> Printf.sprintf "DHT_Port %d" n let parsing opcode m = match opcode with - 0 -> Choke + | 0 -> Choke | 1 -> Unchoke | 2 -> Interested | 3 -> NotInterested @@ -292,6 +296,7 @@ module TcpMessages = struct | 6 -> Request (get_int m 0, get_uint64_32 m 4, get_uint64_32 m 8) | 7 -> Piece (get_int m 0, get_uint64_32 m 4, m, 8, String.length m - 8) | 8 -> Cancel (get_int m 0, get_uint64_32 m 4, get_uint64_32 m 8) + | 9 -> DHT_Port (get_int16 m 0) | -1 -> PeerID m | _ -> raise Not_found @@ -316,10 +321,10 @@ module TcpMessages = struct buf_int buf num; buf_int64_32 buf index; Buffer.add_substring buf s pos len - | Cancel _ -> () | PeerID _ -> () | Ping -> () + | DHT_Port n -> buf_int8 buf 9; buf_int16 buf n end; let s = Buffer.contents buf in str_int s 0 (String.length s - 4); @@ -496,6 +501,7 @@ let bt_handler parse_fun handler c sock = (* lprintf "Message complete: %d\n" msg_len; *) if msg_len > 0 then let opcode = get_int8 b.buf b.pos in + (* FIXME sub *) let payload = String.sub b.buf (b.pos+1) (msg_len-1) in buf_used b msg_len; (* lprintf "Opcode %d\n" opcode; *) @@ -587,32 +593,6 @@ let set_bt_sock sock info ghandler = (* TcpBufferedSocket.close sock "write done" *) | refill :: _ -> refill sock) -(* -No payload: - * 0 - choke: you have been blocked - * 1 - unchoke: you have been unblocked - * 2 - interested: I'm interested in downloading this file now - * 3 - not interested: I'm not interested in downloading this file now -With bencoded payload: - * 4 - have - int : index of new completed chunk - * 5 - bitfield: - string: a bitfield of bit 1 for downloaded chunks - byte: bits are inverted 0....7 ---> 7 .... 0 - * 6 - request - int: index - int: begin - int: length (power of 2, 2 ^ 15) - * 7 - piece - int: index - int: begin - string: piece - * 8 - cancel: cancel a requesu - int: index - int: begin - int: length (power of 2, 2 ^ 15) -*) - let send_client client_sock msg = do_if_connected client_sock (fun sock -> try @@ -627,13 +607,3 @@ let send_client client_sock msg = (Printexc2.to_string e) ) -let zero8 = String.make 8 '\000' - -let send_init client_uid file_id sock = - let buf = Buffer.create 100 in - buf_string8 buf "BitTorrent protocol"; - Buffer.add_string buf zero8; - Buffer.add_string buf (Sha1.direct_to_string file_id); - Buffer.add_string buf (Sha1.direct_to_string client_uid); - let s = Buffer.contents buf in - write_string sock s diff --git a/src/networks/bittorrent/bTTorrent.ml b/src/networks/bittorrent/bTTorrent.ml index c083dfb4..dd306658 100644 --- a/src/networks/bittorrent/bTTorrent.ml +++ b/src/networks/bittorrent/bTTorrent.ml @@ -80,7 +80,7 @@ let decode_torrent s = let file_encoding = ref "" in let file_codepage = ref zero in let file_ed2k_hash = ref "" in - let file_is_private = ref zero in + let file_is_private = ref false in let file_aps = ref (List []) in let file_dht_backup_enable = ref zero in let length = ref zero in @@ -211,10 +211,8 @@ let decode_torrent s = | "publisher-url.utf-8", String publisher_url_utf8 -> () | "private", Int n -> - (* TODO: if set to 1, only accept peers from tracker *) - file_is_private := n; - if !verbose_msg_servers && - Int64.to_int !file_is_private = 1 then + file_is_private := n <> 0L; + if !verbose_msg_servers && !file_is_private then lprintf_nl "[BT] torrent is private" | key, _ -> if !verbose_msg_servers then @@ -365,7 +363,7 @@ let encode_torrent torrent = "name.utf-8", String torrent.torrent_name_utf8; "piece length", Int torrent.torrent_piece_size; "pieces", String pieces; - "private", Int torrent.torrent_private; + "private", Int (if torrent.torrent_private then 1L else 0L); ] in diff --git a/src/networks/bittorrent/bTTypes.ml b/src/networks/bittorrent/bTTypes.ml index 88910821..db54b0e7 100644 --- a/src/networks/bittorrent/bTTypes.ml +++ b/src/networks/bittorrent/bTTypes.ml @@ -40,7 +40,7 @@ type torrent = { mutable torrent_creation_date : int64; mutable torrent_modified_by : string; mutable torrent_encoding : string; - mutable torrent_private : int64; + mutable torrent_private : bool; (* mutable torrent_nodes : string; *) @@ -333,6 +333,9 @@ and file = { (** session uploaded and downloaded bytes, for statistics reporting *) mutable file_session_uploaded : int64; mutable file_session_downloaded : int64; + (** DHT specific *) + mutable file_last_dht_announce : int; + file_private : bool; } and ft = { diff --git a/src/networks/bittorrent/bT_DHT.ml b/src/networks/bittorrent/bT_DHT.ml new file mode 100644 index 00000000..a85db3be --- /dev/null +++ b/src/networks/bittorrent/bT_DHT.ml @@ -0,0 +1,684 @@ +(** + DHT + + http://www.bittorrent.org/beps/bep_0005.html +*) + +open Kademlia +open Printf + +let dht_query_timeout = 20 +let store_peer_timeout = minutes 30 +let secret_timeout = minutes 10 +let alpha = 3 + +let log_prefix = "dht" +let lprintf_nl fmt = Printf2.lprintf_nl2 log_prefix fmt + +let log = new logger log_prefix + +let catch f x = try `Ok (f x) with e -> `Exn e +let (&) f x = f x +let (!!) = Lazy.force + +let self_version = + let module A = Autoconf in + let n = int_of_string A.major_version * 100 + int_of_string A.minor_version * 10 + int_of_string A.sub_version - 300 in + assert (n > 0 && n < 256); + sprintf "ML%c%c" (if A.scm_version = "" then '=' else '+') (Char.chr n) + +(* 2-level association *) +module Assoc2 : sig + + type ('a,'b,'c) t + val create : unit -> ('a,'b,'c) t + val add : ('a,'b,'c) t -> 'a -> 'b -> 'c -> unit + val find_all : ('a,'b,'c) t -> 'a -> ('b,'c) Hashtbl.t + val find : ('a,'b,'c) t -> 'a -> 'b -> 'c option + val remove : ('a,'b,'c) t -> 'a -> 'b -> unit + val iter : ('a,'b,'c) t -> ('a -> 'b -> 'c -> unit) -> unit + val clear : ('a,'b,'c) t -> unit + +end = struct + +type ('a,'b,'c) t = ('a, ('b, 'c) Hashtbl.t) Hashtbl.t + +let create () = Hashtbl.create 13 +let add h a b c = + let hh = try Hashtbl.find h a with Not_found -> Hashtbl.create 3 in + Hashtbl.replace hh b c; + Hashtbl.replace h a hh +let find_all h a = try Hashtbl.find h a with Not_found -> Hashtbl.create 3 +let find h a b = try Some (Hashtbl.find (Hashtbl.find h a) b) with Not_found -> None +let remove h a b = try let ha = Hashtbl.find h a in Hashtbl.remove ha b; if Hashtbl.length ha = 0 then Hashtbl.remove h a with Not_found -> () +let iter h f = Hashtbl.iter (fun a h -> Hashtbl.iter (fun b c -> f a b c) h) h +let clear h = Hashtbl.clear h + +end + +module KRPC = struct + +type dict = (string * Bencode.value) list +let show_dict d = String.concat "," & List.map fst d + +type msg = + | Query of string * dict + | Response of dict + | Error of int64 * string + +let show_msg = function + | Query (name,args) -> sprintf "query %s(%s)" name (show_dict args) + | Response d -> sprintf "response (%s)" (show_dict d) + | Error (e,s) -> sprintf "error (%Ld,%S)" e s + +let encode (txn,msg) = + let module B = Bencode in + let x = match msg with + | Query (name,args) -> ["y", B.String "q"; "q", B.String name; "a", B.Dictionary args] + | Response dict -> ["y", B.String "r"; "r", B.Dictionary dict] + | Error (code,text) -> ["y", B.String "e"; "e", B.List [B.Int code; B.String text] ] + in + let x = ("t", B.String txn) :: ("v", B.String self_version):: x in + B.encode (B.Dictionary x) + +let str = function Bencode.String s -> s | _ -> failwith "str" +let int = function Bencode.Int s -> s | _ -> failwith "int" +let dict = function Bencode.Dictionary s -> s | _ -> failwith "dict" +let list = function Bencode.List l -> l | _ -> failwith "list" + +exception Protocol_error of string * string +exception Malformed_packet of string +exception Method_unknown of string + +let decode_exn s = + let module B = Bencode in + let module Array = struct let get x k = match x with B.Dictionary l -> List.assoc k l | _ -> failwith "decode get" end in + let x = try B.decode s with _ -> raise (Malformed_packet "decode") in + let txn = try str x.("t") with _ -> raise (Malformed_packet "txn") in + let ver = try Some (str x.("v")) with _ -> None in + try + let msg = match str x.("y") with + | "q" -> Query (str x.("q"), dict x.("a")) + | "r" -> Response (dict x.("r")) + | "e" -> begin match list x.("e") with B.Int n :: B.String s :: _ -> Error (n, s) | _ -> failwith "decode e" end + | _ -> failwith "type" + in (txn, ver, msg) + with + exn -> log #warn ~exn "err"; raise (Protocol_error (txn,"Invalid argument")) + +open BasicSocket +open UdpSocket + +let udp_set_reader socket f = + set_reader socket begin fun _ -> + try read_packets socket f with exn -> + log #warn ~exn "udp reader"; + close socket (Closed_for_exception exn) + end + +module A = Assoc2 + +let send sock (ip,port as addr) txnmsg = + let s = encode txnmsg in + log #debug "KRPC to %s : %S" (show_addr addr) s; + write sock false s ip port + +type t = UdpSocket.t * (addr, string, (addr -> dict -> unit) * (unit -> unit) * int) A.t + +let create port enabler bw_control answer : t = + let socket = create Unix.inet_addr_any port (fun sock event -> + match event with + | WRITE_DONE | CAN_REFILL -> () + | READ_DONE -> assert false (* set_reader prevents this *) + | BASIC_EVENT x -> match x with + | CLOSED _ -> () + | CAN_READ | CAN_WRITE -> assert false (* udpSocket implementation prevents this *) + | LTIMEOUT | WTIMEOUT | RTIMEOUT -> () (*close sock (Closed_for_error "KRPC timeout")*)) + in + set_write_controler socket bw_control; + set_wtimeout (sock socket) 5.; + set_rtimeout (sock socket) 5.; + let h = A.create () in + let timeout h = + let now = last_time () in + let bad = ref [] in + let total = ref 0 in + A.iter h (fun addr txn (_,kerr,t) -> incr total; if t < now then bad := (addr,txn,kerr) :: !bad); + log #info "timeouted %d of %d DHT queries" (List.length !bad) !total; + List.iter (fun (addr,txn,kerr) -> + A.remove h addr txn; + try kerr () with exn -> log #info ~exn "timeout for %s" (show_addr addr)) !bad; + in + BasicSocket.add_session_timer enabler 5. (fun () -> timeout h); + let handle addr (txn,ver,msg) = + let version = match ver with Some s -> sprintf "client %S " s | None -> "" in + log #debug "KRPC from %s %stxn %S : %s" (show_addr addr) version txn (show_msg msg); + match msg with + | Error _ -> + begin match A.find h addr txn with + | None -> log #warn "no txn %S for %s %s (error received)" txn (show_addr addr) version + | Some (_, kerr, _) -> A.remove h addr txn; kerr () + end + | Query (name,args) -> + let ret = answer addr name args in + send socket addr (txn, ret) + | Response ret -> + match A.find h addr txn with + | None -> log #warn "no txn %S for %s %s" txn (show_addr addr) version + | Some (k,_,_) -> A.remove h addr txn; k addr ret + in + let handle p = + match p.udp_addr with + | Unix.ADDR_UNIX _ -> assert false + | Unix.ADDR_INET (inet_addr,port) -> + let addr = (Ip.of_inet_addr inet_addr, port) in + let ret = ref None in + try +(* log #debug "recv %S" p.udp_content; *) + let r = decode_exn p.udp_content in + ret := Some r; + handle addr r + with exn -> + let version = match !ret with Some (_,Some s,_) -> sprintf " client %S" s | _ -> "" in + log #warn ~exn "dht handle packet from %s%s : %S" (show_addr addr) version p.udp_content; + let error txn code str = send socket addr (txn,(Error (Int64.of_int code,str))) in + match exn,!ret with + | Malformed_packet x, Some (txn, _, _) + | Protocol_error ("",x), Some(txn, _, _) | Protocol_error (txn,x), _ -> error txn 203 x + | Method_unknown x, Some (txn, _, _) -> error txn 204 x + | _, Some (txn, _, Query _) -> error txn 202 "" + | _ -> () + in + udp_set_reader socket handle; + (socket,h) + +let shutdown (socket,h) = + close socket Closed_by_user; + A.iter h (fun addr _ (_,kerr,_) -> + try kerr () with exn -> log #warn ~exn "shutdown for %s" (show_addr addr)); + A.clear h + +let write (socket,h) msg addr k ~kerr = + let tt = Assoc2.find_all h addr in + let rec loop () = (* choose txn FIXME *) + let txn = string_of_int (Random.int 1_000_000) in + match Hashtbl.mem tt txn with + | true -> loop () + | false -> txn + in + let txn = loop () in + Assoc2.add h addr txn (k,kerr,last_time () + dht_query_timeout); + send socket addr (txn,msg) + +end (* KRPC *) + +type query = +| Ping +| FindNode of id +| GetPeers of H.t +| Announce of H.t * int * string + +let show_query = function +| Ping -> "ping" +| FindNode id -> sprintf "find_node %s" (show_id id) +| GetPeers h -> sprintf "get_peers %s" (show_id h) +| Announce (h,port,token) -> sprintf "announce %s port=%d token=%S" (show_id h) port token + +type response = +| Ack +| Nodes of (id * addr) list +| Peers of string * addr list * (id * addr) list + +let strl f l = "[" ^ (String.concat " " & List.map f l) ^ "]" + +let show_node (id,addr) = sprintf "%s (%s)" (show_addr addr) (show_id id) + +let show_response = function +| Ack -> "ack" +| Nodes l -> sprintf "nodes %s" (strl show_node l) +| Peers (token,peers,nodes) -> sprintf "peers token=%S %s %s" token (strl show_addr peers) (strl show_node nodes) + +let parse_query_exn name args = + let get k = List.assoc k args in + let sha1 k = H.direct_of_string & KRPC.str & get k in + let p = match name with + | "ping" -> Ping + | "find_node" -> FindNode (sha1 "target") + | "get_peers" -> GetPeers (sha1 "info_hash") + | "announce_peer" -> Announce (sha1 "info_hash", Int64.to_int & KRPC.int & get "port", KRPC.str & get "token") + | s -> failwith (sprintf "parse_query name=%s" name) + in + sha1 "id", p + +let make_query id x = + let sha1 x = Bencode.String (H.direct_to_string x) in + let self = ("id", sha1 id) in + match x with + | Ping -> KRPC.Query ("ping", [self]) + | FindNode t -> KRPC.Query ("find_node", ["target", sha1 t; self]) + | GetPeers h -> KRPC.Query ("get_peers", ["info_hash", sha1 h; self]) + | Announce (h, port, token) -> KRPC.Query ("announce_peer", + ["info_hash", sha1 h; + "port", Bencode.Int (Int64.of_int port); + "token", Bencode.String token; + self]) + +let parse_peer s = + if String.length s <> 6 then failwith "parse_peer" else + let c i = int_of_char & s.[i] in + Ip.of_ints (c 0,c 1,c 2,c 3), (c 4 lsl 8 + c 5) + +let parse_nodes s = + assert (String.length s mod 26 = 0); + let i = ref 0 in + let nodes = ref [] in + while !i < String.length s do + nodes := (H.direct_of_string (String.sub s !i 20), parse_peer (String.sub s (!i+20) 6)) :: !nodes; + i := !i + 26; + done; + !nodes + +let make_peer (ip,port) = + assert (port <= 0xffff); + let (a,b,c,d) = Ip.to_ints ip in + let e = port lsr 8 and f = port land 0xff in + let s = String.create 6 in + let set i c = s.[i] <- char_of_int c in + set 0 a; set 1 b; set 2 c; set 3 d; set 4 e; set 5 f; + s + +let make_nodes nodes = + let s = String.create (26 * List.length nodes) in + let i = ref 0 in + List.iter (fun (id,addr) -> + String.blit (H.direct_to_string id) 0 s (!i*26) 20; + String.blit (make_peer addr) 0 s (!i*26+20) 6; + incr i + ) nodes; + s + +let parse_response_exn q dict = + let get k = List.assoc k dict in + let sha1 k = H.direct_of_string & KRPC.str & get k in + let p = match q with + | Ping -> Ack + | FindNode _ -> + let s = KRPC.str & get "nodes" in + Nodes (parse_nodes s) + | GetPeers _ -> + let token = KRPC.str & get "token" in + let nodes = try parse_nodes (KRPC.str & get "nodes") with Not_found -> [] in + let peers = try List.map (fun x -> parse_peer & KRPC.str x) & (KRPC.list & get "values") with Not_found -> [] in + Peers (token, peers, nodes) + | Announce _ -> Ack + in + sha1 "id", p + +let make_response id x = + let sha1 x = Bencode.String (H.direct_to_string x) in + let self = ("id", sha1 id) in + let str s = Bencode.String s in + match x with + | Ack -> KRPC.Response [self] + | Nodes nodes -> KRPC.Response [self;"nodes",str (make_nodes nodes)] + | Peers (token,peers,nodes) -> KRPC.Response + [self; + "token",str token; + "nodes",str (make_nodes nodes); + "values",Bencode.List (List.map (fun addr -> str (make_peer addr)) peers); + ] + +module Test = struct + +open Bencode + +let e = Dictionary ["t",String "aa"; "v", String self_version; "y", String "e"; "e", List [Int 201L; String "A Generic Error Occurred"] ] +let s = sprintf "d1:eli201e24:A Generic Error Occurrede1:t2:aa1:v4:%s1:y1:ee" self_version +let v = "aa", KRPC.Error (201L, "A Generic Error Occurred") + +let () = + assert (encode e = s); + assert (KRPC.decode_exn s = (fst v, Some self_version, snd v)); + assert (KRPC.encode v = s); + () + +end + +module Peers = Map.Make(struct type t = addr let compare = compare end) + +module M = struct + +type t = { + rt : Kademlia.table; (* routing table *) + rpc : KRPC.t; (* KRPC protocol socket *) + dht_port : int; (* port *) + torrents : (H.t, int Peers.t) Hashtbl.t; (* torrents announced by other peers *) + enabler : bool ref; (* timers' enabler *) +} + +let dht_query t addr q k ~kerr = + log #info "DHT query to %s : %s" (show_addr addr) (show_query q); + KRPC.write t.rpc (make_query t.rt.self q) addr begin fun addr dict -> + let (id,r) = try parse_response_exn q dict with exn -> kerr (); raise exn in + log #info "DHT response from %s (%s) : %s" (show_addr addr) (show_id id) (show_response r); + k (id,addr) r + end ~kerr + +let ping t addr k = dht_query t addr Ping begin fun node r -> + match r with Ack -> k (Some node) + | _ -> k None; failwith "dht_query ping" end ~kerr:(fun () -> k None) + +let find_node t addr h k ~kerr = dht_query t addr (FindNode h) begin fun node r -> + match r with Nodes l -> k node l + | _ -> kerr (); failwith "dht_query find_node" end ~kerr + +let get_peers t addr h k ~kerr = dht_query t addr (GetPeers h) begin fun node r -> + match r with Peers (token,peers,nodes) -> k node token peers nodes + | _ -> kerr (); failwith "dht_query get_peers" end ~kerr + +let announce t addr port token h k ~kerr = dht_query t addr (Announce (h,port,token)) begin fun node r -> + match r with Ack -> k node + | _ -> kerr (); failwith "dht_query announce" end ~kerr + +let store t info_hash addr = + let peers = try Hashtbl.find t.torrents info_hash with Not_found -> Peers.empty in + Hashtbl.replace t.torrents info_hash (Peers.add addr (BasicSocket.last_time () + store_peer_timeout) peers) + +let manage_timeouts enabler h = + BasicSocket.add_session_timer enabler 60. begin fun () -> + let now = BasicSocket.last_time () in + let torrents = Hashtbl.fold (fun k peers l -> (k,peers)::l) h [] in + let rm = ref 0 in + let total = ref 0 in + List.iter (fun (id,peers) -> + let m = Peers.fold (* removing is rare *) + (fun peer expire m -> incr total; if expire < now then (incr rm; Peers.remove peer m) else m) + peers peers + in + if Peers.is_empty m then Hashtbl.remove h id else Hashtbl.replace h id m + ) torrents; + log #info "Removed %d of %d peers for announced torrents" !rm !total + end + +let create rt dht_port bw_control answer = + let enabler = ref true in + let rpc = KRPC.create dht_port enabler bw_control answer in + let torrents = Hashtbl.create 8 in + manage_timeouts enabler torrents; + { rt = rt; rpc = rpc; torrents = torrents; dht_port = dht_port; enabler = enabler; } + +let shutdown dht = + dht.enabler := false; + KRPC.shutdown dht.rpc + +let peers_list f m = Peers.fold (fun peer tm l -> (f peer tm)::l) m [] +let self_get_peers t h = + let peers = peers_list (fun a _ -> a) (try Hashtbl.find t.torrents h with Not_found -> Peers.empty) in + if List.length peers <= 100 then + peers + else + let a = Array.of_list peers in + Array2.shuffle a; + Array.to_list (Array.sub a 0 100) + +let self_find_node t h = List.map (fun node -> node.id, node.addr) & Kademlia.find_node t.rt h + +end (* module M *) + +module Secret : sig + +type t +val create : time -> t +val get : t -> string +val valid : t -> string -> bool +val get_prev : t -> string + +end = struct + +type t = { mutable cur : string; mutable prev : string; timeout : time; mutable next : time; } +let make () = string_of_int (Random.int 1_000_000) +let create tm = + assert (tm > 0); + let s = make () in + { cur = s; prev = s; timeout = tm; next = now () + tm; } +let invalidate t = + if now () > t.next then + begin + t.prev <- t.cur; + t.cur <- make (); + t.next <- now () + t.timeout; + end +let get t = + invalidate t; + t.cur +let get_prev t = t.prev +let valid t s = + invalidate t; + s = t.cur || s = t.prev + +end + +let make_token addr h secret = string_of_int (Hashtbl.hash [show_addr addr; H.direct_to_string h; secret]) + +let valid_token addr h secret token = + token = make_token addr h (Secret.get secret) || + token = make_token addr h (Secret.get_prev secret) + +module LimitedSet = struct + +module type S = sig + +type elt +type t +val create : int -> t +(** @return whether the element was really added *) +val insert : t -> elt -> bool +val elements : t -> elt list +val iter : t -> (elt -> unit) -> unit +val min_elt : t -> elt + +end + +module Make(Ord:Set.OrderedType) : S with type elt = Ord.t = +struct + +module S = Set.Make(Ord) + +type elt = Ord.t +type t = int ref * S.t ref + +let create n = ref n, ref S.empty + +let insert (left,set) elem = + match S.mem elem !set with + | true -> false + | false -> + match !left with + | 0 -> + let max = S.max_elt !set in + if Ord.compare elem max < 0 then + begin set := S.add elem (S.remove max !set); true end + else + false + | n -> + set := S.add elem !set; + decr left; + true + +let iter (_,set) f = S.iter f !set + +let elements (_,set) = S.elements !set + +let min_elt (_,set) = S.min_elt !set + +end (* Make *) + +end (* LimitedSet *) + +let update dht st id addr = update (M.ping dht) dht.M.rt st id addr + +exception Break + +(** @param nodes nodes to start search from, will not be inserted into routing table *) +let lookup_node dht ?nodes target k = + log #info "lookup %s" (show_id target); + let start = BasicSocket.last_time () in + let module S = LimitedSet.Make(struct + type t = id * addr + let compare n1 n2 = Big_int.compare_big_int (distance target (fst n1)) (distance target (fst n2)) + end) in + let found = S.create Kademlia.bucket_nodes in + let queried = Hashtbl.create 13 in + let active = ref 0 in + let check_ready () = + if 0 = !active then + begin + let result = S.elements found in + log #info "lookup_node %s done, queried %d, found %d, elapsed %ds" + (show_id target) (Hashtbl.length queried) (List.length result) (BasicSocket.last_time () - start); + k result + end + in + let rec round nodes = + let inserted = List.fold_left (fun acc node -> if S.insert found node then acc + 1 else acc) 0 nodes in + begin try + let n = ref 0 in + S.iter found (fun node -> + if alpha = !n then raise Break; + if not (Hashtbl.mem queried node) then begin incr n; query true node end) + with Break -> () end; + inserted + and query store (id,addr as node) = + incr active; + Hashtbl.add queried node true; + log #info "will query node %s" (show_node node); + M.find_node dht addr target begin fun (id,addr as node) nodes -> + if store then update dht Good id addr; + decr active; + let inserted = round nodes in + let s = try sprintf ", best %s" (show_id (fst (S.min_elt found))) with _ -> "" in + log #info "got %d nodes from %s, useful %d%s" (List.length nodes) (show_node node) inserted s; + check_ready () + end ~kerr:(fun () -> decr active; log #info "timeout from %s" (show_node node); check_ready ()) + in + begin match nodes with + | None -> let (_:int) = round (M.self_find_node dht target) in () + | Some l -> List.iter (query false) l + end; + check_ready () + +let show_torrents torrents = + let now = BasicSocket.last_time () in + Hashtbl.iter (fun h peers -> + let l = M.peers_list (fun addr tm -> sprintf "%s (exp. %ds)" (show_addr addr) (tm - now)) peers in + lprintf_nl "torrent %s : %s" (H.to_hexa h) (String.concat " " l)) + torrents + +let show dht = show_table dht.M.rt; show_torrents dht.M.torrents + +let bootstrap dht host addr k = + M.ping dht addr begin function + | Some node -> + log #info "bootstrap node %s (%s) is up" (show_node node) host; + lookup_node dht ~nodes:[node] dht.M.rt.self (fun l -> + log #info "bootstrap via %s (%s) : found %s" (show_addr addr) host (strl show_node l); + k (List.length l >= Kademlia.bucket_nodes)) + | None -> + log #warn "bootstrap node %s (%s) is down" (show_addr addr) host; + k false + end + +let bootstrap dht (host,port) k = + Ip.async_ip host + (fun ip -> bootstrap dht host (ip,port) k) + (fun n -> log #warn "boostrap node %s cannot be resolved (%d)" host n; k false) + +let bootstrap ?(routers=[]) dht = + lookup_node dht dht.M.rt.self begin fun l -> + log #info "auto bootstrap : found %s" (strl show_node l); + let rec loop l ok = + match ok,l with + | true,_ -> log #user "bootstrap ok, total nodes : %d" (size dht.M.rt) + | false,[] -> log #warn "boostrap failed, total nodes : %d" (size dht.M.rt) + | false,(node::nodes) -> bootstrap dht node (loop nodes) + in + loop routers (List.length l >= Kademlia.bucket_nodes) + end + +let query_peers dht id k = + log #info "query_peers: start %s" (H.to_hexa id); + lookup_node dht id (fun nodes -> + log #info "query_peers: found nodes %s" (strl show_node nodes); +(* + let found = ref Peers.empty in + let check = + let left = ref (List.length nodes + 1) (* one immediate check *) in + fun () -> decr left; if 0 = !left then k (Peers.fold (fun peer () l -> peer :: l) !found []) + in +*) + List.iter begin fun node -> + M.get_peers dht (snd node) id begin fun node token peers nodes -> + log #info "query_peers: got %d peers and %d nodes from %s with token %S" + (List.length peers) (List.length nodes) (show_node node) token; + k node token peers; +(* + found := List.fold_left (fun acc peer -> Peers.add peer () acc) !found peers; + check () +*) + end + ~kerr:(fun () -> log #info "query_peers: get_peers error from %s" (show_node node)(*; check ()*)); +(* check () *) + end nodes) + +let start rt port bw_control = + let secret = Secret.create secret_timeout in + let rec dht = lazy (M.create rt port bw_control answer) + and answer addr name args = + try + let (id,q) = parse_query_exn name args in + let node = (id,addr) in + log #info "DHT query from %s : %s" (show_node node) (show_query q); + update !!dht Good id addr; + let response = + match q with + | Ping -> Ack + | FindNode h -> Nodes (M.self_find_node !!dht h) + | GetPeers h -> + let token = make_token addr h (Secret.get secret) in + let peers = M.self_get_peers !!dht h in + let nodes = M.self_find_node !!dht h in + log #info "answer with %d peers and %d nodes" (List.length peers) (List.length nodes); + Peers (token,peers,nodes) + | Announce (h,port,token) -> + if not (valid_token addr h secret token) then failwith "bad token in announce"; + M.store !!dht h (fst addr, port); + Ack + in + log #info "DHT response to %s : %s" (show_node node) (show_response response); + make_response (!!dht).M.rt.self response + with + exn -> log #warn ~exn "query %s from %s" name (show_addr addr); raise exn + in + let refresh () = + let ids = Kademlia.refresh (!!dht).M.rt in + log #info "will refresh %d buckets" (List.length ids); + let cb prev_id (id,addr as node) l = + update !!dht Good id addr; (* replied *) + if prev_id <> id then + begin + log #info "refresh: node %s changed id (was %s)" (show_node node) (show_id prev_id); + update !!dht Bad prev_id addr; + end; + log #info "refresh: got %d nodes from %s" (List.length l) (show_node node); + List.iter (fun (id,addr) -> update !!dht Unknown id addr) l + in + List.iter (fun (target, nodes) -> + List.iter (fun (id,addr) -> M.find_node !!dht addr target (cb id) ~kerr:(fun () -> ())) nodes) + ids + in + log #info "DHT size : %d self : %s" (size (!!dht).M.rt) (show_id (!!dht).M.rt.self); + BasicSocket.add_session_timer (!!dht).M.enabler 60. refresh; + !!dht + +let stop dht = M.shutdown dht + diff --git a/src/networks/bittorrent/kademlia.ml b/src/networks/bittorrent/kademlia.ml new file mode 100644 index 00000000..cdafa483 --- /dev/null +++ b/src/networks/bittorrent/kademlia.ml @@ -0,0 +1,459 @@ +(** Kademlia + + Petar Maymounkov and David Mazières + "Kademlia: A Peer-to-Peer Information System Based on the XOR Metric" + http://infinite-source.de/az/whitepapers/kademlia_optimized.pdf +*) + +let bucket_nodes = 8 + +(* do not use CommonOptions directly so that tools/bt_dht_node can be compiled separately *) +let verbose = ref false + +module H = Md4.Sha1 + +let log_prefix = "btkad" +let lprintf_nl fmt = Printf2.lprintf_nl2 log_prefix fmt + +type 'a pr = ?exn:exn -> ('a, unit, string, unit) format4 -> 'a +type level = [ `Debug | `Info | `User | `Warn | `Error ] + +class logger prefix = + let int_level = function + | `Debug -> 0 + | `Info -> 1 + | `User -> 2 + | `Warn -> 3 + | `Error -> 4 + in + let print_log limit prefix level ?exn fmt = + let put s = + let b = match level with + | 0 -> false + | 1 -> !verbose + | _ -> true + in + match b,exn with + | false, _ -> () + | true, None -> Printf2.lprintf_nl "[%s] %s" prefix s + | true, Some exn -> Printf2.lprintf_nl "[%s] %s : exn %s" prefix s (Printexc2.to_string exn) + in + Printf.ksprintf put fmt +in +object +val mutable limit = int_level `Info +method debug : 'a. 'a pr = fun ?exn fmt -> print_log limit prefix 0 ?exn fmt +method info : 'a. 'a pr = fun ?exn fmt -> print_log limit prefix 1 ?exn fmt +method user : 'a. 'a pr = fun ?exn fmt -> print_log limit prefix 2 ?exn fmt +method warn : 'a. 'a pr = fun ?exn fmt -> print_log limit prefix 3 ?exn fmt +method error : 'a. 'a pr = fun ?exn fmt -> print_log limit prefix 4 ?exn fmt +method allow (level:level) = limit <- int_level level +end + +let log = new logger log_prefix + +(** node ID type *) +type id = H.t +let show_id h = let s = H.to_hexa h in (String.sub s 0 7 ^ ".." ^ String.sub s 17 3) +type addr = Ip.t * int + +type time = int +let minutes n = 60 * n +let node_period = minutes 15 +type status = | Good | Bad | Unknown | Pinged +type node = { id : id; addr : addr; mutable last : time; mutable status : status; } +type bucket = { lo : id; hi : id; mutable last_change : time; mutable nodes : node array; } +(* FIXME better *) +type tree = L of bucket | N of tree * id * tree +type table = { mutable root : tree; self : id; } + +let now = BasicSocket.last_time +let diff t = Printf.sprintf "%d sec ago" (now () - t) + +let show_addr (ip,port) = Printf.sprintf "%s:%u" (Ip.to_string ip) port + +let show_status = function + | Good -> "good" + | Bad -> "bad" + | Unknown -> "unknown" + | Pinged -> "pinged" + +let show_node n = + Printf.sprintf "%s at %s was %s %s" + (show_id n.id) (show_addr n.addr) (show_status n.status) (diff n.last) + +let show_bucket b = + lprintf_nl "count : %d lo : %s hi : %s changed : %s" (Array.length b.nodes) (H.to_hexa b.lo) (H.to_hexa b.hi) (diff b.last_change); + Array.iter (fun n -> lprintf_nl " %s" (show_node n)) b.nodes + +let rec show_tree = function + | N (l,_,r) -> show_tree l; show_tree r + | L b -> show_bucket b + +let h2s h = + let s = H.direct_to_string h in + assert (String.length s = H.length); + s + +type cmp = LT | EQ | GT + +let cmp id1 id2 = + match String.compare (h2s id1) (h2s id2) with + | -1 -> LT + | 0 -> EQ + | 1 -> GT + | _ -> assert false + +(* boundaries inclusive *) +let inside node hash = not (cmp hash node.lo = LT || cmp hash node.hi = GT) + +let middle = + let s = String.make 20 (Char.chr 0xFF) in + s.[0] <- Char.chr 0x7F; + H.direct_of_string s + +let middle' = + let s = String.make 20 (Char.chr 0x00) in + s.[0] <- Char.chr 0x80; + H.direct_of_string s + +let last = + H.direct_of_string (String.make 20 (Char.chr 0xFF)) + +open Big_int + +let big_int_of_hash h = + let s = h2s h in + let n = ref zero_big_int in + for i = 0 to String.length s - 1 do + n := add_int_big_int (Char.code s.[i]) (mult_int_big_int 256 !n) + done; + !n + +let hash_of_big_int n = + let s = String.create H.length in + let n = ref n in + let div = big_int_of_int 256 in + for i = String.length s - 1 downto 0 do + let (d,m) = quomod_big_int !n div in + s.[i] <- Char.chr (int_of_big_int m); + n := d + done; + assert (eq_big_int zero_big_int !n); + H.direct_of_string s + +let big_int_2 = big_int_of_int 2 +(* hash <-> number *) +let h2n = big_int_of_hash +let n2h = hash_of_big_int + +let choose_random lo hi = + assert (cmp lo hi = LT); + let rec loop a b = + if cmp a b = EQ then a else + let mid = n2h (div_big_int (add_big_int (h2n a) (h2n b)) big_int_2) in + if Random.bool () then loop a mid else loop mid b + in + loop lo hi + +let split lo hi = + assert (cmp lo hi = LT); + let mid = div_big_int (add_big_int (h2n lo) (h2n hi)) big_int_2 in + n2h mid + +let succ h = + assert (cmp h last <> EQ); + n2h (succ_big_int (h2n h)) + +let distance h1 h2 = + let s1 = h2s h1 and s2 = h2s h2 in + let d = ref zero_big_int in + for i = 0 to H.length - 1 do + let x = Char.code s1.[i] lxor Char.code s2.[i] in + d := add_int_big_int x (mult_int_big_int 256 !d) + done; + !d + +let () = + assert (LT = cmp H.null middle); + assert (LT = cmp H.null middle'); + assert (LT = cmp H.null last); + assert (GT = cmp middle' middle); + assert (GT = cmp last middle'); + assert (GT = cmp last middle); + assert (EQ = cmp H.null H.null); + assert (EQ = cmp middle middle); + assert (EQ = cmp last last); + assert (n2h (h2n middle) = middle); + assert (n2h (h2n middle') = middle'); + assert (n2h (h2n last) = last); + assert (n2h (h2n H.null) = H.null); + assert (compare_big_int (h2n H.null) zero_big_int = 0); + assert (cmp (split H.null last) middle = EQ); + assert (eq_big_int (distance H.null last) (pred_big_int (power_int_positive_int 2 160))); + assert (eq_big_int (distance middle' middle) (pred_big_int (power_int_positive_int 2 160))); + () + +(* +module type Network = sig + type t + val ping : t -> addr -> (id -> bool -> unit) -> unit +end +*) + +(* module Make(T : Network) = struct *) + +exception Nothing + +let make_node id addr st = { id = id; addr = addr; last = now (); status = st; } +let mark n st = + log #info "mark [%s] as %s" (show_node n) (show_status st); + n.last <- now (); + n.status <- st +let touch b = b.last_change <- now () + +(* +let rec delete table id = + let rec loop = function + | N (l,mid,r) -> (match cmp id mid with LT | EQ -> N (loop l, mid, r) | GT -> N (l, mid, loop r)) + | L b -> + Array.iter (fun n -> + if cmp n.id id = EQ then +*) + +let rec update ping table st id data = +(* log #debug "insert %s" (show_id node.id); *) + let rec loop = function + | N (l,mid,r) -> (match cmp id mid with LT | EQ -> N (loop l, mid, r) | GT -> N (l, mid, loop r)) + | L b -> + Array.iteri begin fun i n -> + match cmp n.id id = EQ, n.addr = data with + | true, true -> mark n st; touch b; raise Nothing + | true, false | false, true -> + log #warn "conflict [%s] with %s %s, replacing" (show_node n) (show_id id) (show_addr data); + b.nodes.(i) <- make_node id data st; (* replace *) + touch b; + raise Nothing + | _ -> () + end b.nodes; + if Array.length b.nodes <> bucket_nodes then + begin + log #info "insert %s %s" (show_id id) (show_addr data); + b.nodes <- Array.of_list (make_node id data st :: Array.to_list b.nodes); + touch b; + raise Nothing + end; + Array.iteri (fun i n -> + if n.status = Good && now () - n.last > node_period then mark n Unknown; + if n.status = Bad || (n.status = Pinged && now () - n.last > node_period) then + begin + log #info "replace [%s] with %s" (show_node b.nodes.(i)) (show_id id); + b.nodes.(i) <- make_node id data st; (* replace *) + touch b; + raise Nothing + end) b.nodes; + match Array.fold_left (fun acc n -> if n.status = Unknown then n::acc else acc) [] b.nodes with + | [] -> + if inside b table.self && gt_big_int (distance b.lo b.hi) (big_int_of_int 256) then + begin + log #info "split %s %s" (H.to_hexa b.lo) (H.to_hexa b.hi); + let mid = split b.lo b.hi in + let (nodes1,nodes2) = List.partition (fun n -> cmp n.id mid = LT) (Array.to_list b.nodes) in + let new_node = N ( + L { lo = b.lo; hi = mid; last_change = b.last_change; nodes = Array.of_list nodes1; }, + mid, + L { lo = succ mid; hi = b.hi; last_change = b.last_change; nodes = Array.of_list nodes2; } ) + in + new_node + end + else + begin + log #info "bucket full (%s)" (show_id id); + raise Nothing + end + | unk -> + let count = ref (List.length unk) in + log #info "ping %d unknown nodes" !count; + let cb n = fun res -> + decr count; mark n (match res with Some _ -> Good | None -> Bad); + if !count = 0 then (* retry *) + begin + log #info "all %d pinged, retry %s" (List.length unk) (show_id id); + touch b; + update ping table st id data + end + in + List.iter (fun n -> mark n Pinged; ping n.addr (cb n)) unk; + raise Nothing + in + if id <> table.self then + try while true do table.root <- loop table.root done with Nothing -> () (* loop until no new splits *) + +let insert_node table node = +(* log #debug "insert %s" (show_id node.id); *) + let rec loop = function + | N (l,mid,r) -> (match cmp node.id mid with LT | EQ -> N (loop l, mid, r) | GT -> N (l, mid, loop r)) + | L b -> + Array.iter begin fun n -> + match cmp n.id node.id = EQ, n.addr = node.addr with + | true, true -> log #warn "insert_node: duplicate entry %s" (show_node n); raise Nothing + | true, false | false, true -> + log #warn "insert_node: conflict [%s] with [%s]" (show_node n) (show_node node); + raise Nothing + | _ -> () + end b.nodes; + if Array.length b.nodes <> bucket_nodes then + begin + b.nodes <- Array.of_list (node :: Array.to_list b.nodes); + raise Nothing + end; + if inside b table.self && gt_big_int (distance b.lo b.hi) (big_int_of_int 256) then + begin + let mid = split b.lo b.hi in + let (nodes1,nodes2) = List.partition (fun n -> cmp n.id mid = LT) (Array.to_list b.nodes) in + let last_change = List.fold_left (fun acc n -> max acc n.last) 0 in + let new_node = N ( + L { lo = b.lo; hi = mid; last_change = last_change nodes1; nodes = Array.of_list nodes1; }, + mid, + L { lo = succ mid; hi = b.hi; last_change = last_change nodes2; nodes = Array.of_list nodes2; } ) + in + new_node + end + else + begin + log #warn "insert_node: bucket full [%s]" (show_node node); + raise Nothing + end + in + try while true do table.root <- loop table.root done with Nothing -> () + +let all_nodes t = + let rec loop acc = function + | N (l,_,r) -> let acc = loop acc l in loop acc r + | L b -> Array.to_list b.nodes @ acc + in + loop [] t.root + +(* end *) + +let refresh table = + let expire = now () - node_period in + let rec loop acc = function + | N (l,_,r) -> let acc = loop acc l in loop acc r + | L b when b.last_change < expire -> + if Array2.exists (fun n -> n.status <> Bad) b.nodes then + let nodes = Array.map (fun n -> n.id, n.addr) b.nodes in + (choose_random b.lo b.hi, Array.to_list nodes) :: acc + else + acc (* do not refresh buckets with all bad nodes *) + | L _ -> acc + in + loop [] table.root + +let find_node t h = + let rec loop alt = function + | N (l,mid,r) -> (match cmp h mid with LT | EQ -> loop (r::alt) l | GT -> loop (l::alt) r) + | L b -> + let found = Array.to_list b.nodes in + if Array.length b.nodes = bucket_nodes then found + else found +(* FIXME + List.iter (fun node -> fold (fun acc b -> + let acc = Array.to_list b.nodes @ acc in + if List.length acc >= bucket_nodes then raise Nothing +*) + in + loop [] t.root + +let create () = { root = L { lo = H.null; hi = last; last_change = now (); nodes = [||]; }; + self = H.random (); + } + +let show_table t = + lprintf_nl "self : %s now : %d" (show_id t.self) (now ()); + show_tree t.root + +let rec fold f acc = function + | N (l,_,r) -> fold f (fold f acc l) r + | L b -> f acc b + +let size t = fold (fun acc b -> acc + Array.length b.nodes) 0 t.root + +(* +module NoNetwork : Network = struct + let ping addr k = k H.null (Random.bool ()) +end +module K = Make(NoNetwork) +*) + +let tt () = + let table = create () in + show_table table; + let addr = Ip.of_string "127.0.0.1", 9000 in + let ping addr k = k (if Random.bool () then Some (H.null,addr) else None) in + for i = 1 to 1_000_000 do + update ping table Good (H.random ()) addr + done; + show_table table + +module RoutingTableOption = struct + +open Options + +let value_to_status = function + | StringValue "good" -> Good + | StringValue "bad" -> Bad + | StringValue "pinged" -> Pinged + | StringValue "unknown" -> Unknown + | _ -> failwith "RoutingTableOption.value_to_status" + +let status_to_value = function + | Good -> string_to_value "good" + | Bad -> string_to_value "bad" + | Pinged -> string_to_value "pinged" + | Unknown -> string_to_value "unknown" + +let value_to_node = function + | Module props -> + let get cls s = from_value cls (List.assoc s props) in + { + id = H.of_hexa (get string_option "id"); + addr = (get Ip.option "ip", get port_option "port"); + last = get int_option "last"; + status = value_to_status (List.assoc "status" props); + } + | _ -> failwith "RoutingTableOption.value_to_node" + +let node_to_value n = + Module [ + "id", string_to_value (H.to_hexa n.id); + "ip", to_value Ip.option (fst n.addr); + "port", to_value port_option (snd n.addr); + "last", int_to_value n.last; + "status", status_to_value n.status; + ] + +let value_to_table v = + match v with + | Module props -> + let nodes = value_to_list value_to_node (List.assoc "nodes" props) in + let self = H.of_hexa (value_to_string (List.assoc "self" props)) in + let t = { root = L { lo = H.null; hi = last; last_change = 0; nodes = [||]; }; + self = self; } + in + List.iter (insert_node t) nodes; + if !verbose then show_table t; + t + | _ -> failwith "RoutingTableOption.value_to_table" + +let table_to_value t = + if !verbose then show_table t; + Module [ + "self", string_to_value (H.to_hexa t.self); + "nodes", list_to_value node_to_value (all_nodes t) + ] + +let t = define_option_class "RoutingTable" value_to_table table_to_value + +end + diff --git a/src/utils/cdk/array2.ml b/src/utils/cdk/array2.ml index 719e02e1..0cf067a0 100644 --- a/src/utils/cdk/array2.ml +++ b/src/utils/cdk/array2.ml @@ -61,4 +61,12 @@ let subarray_fold_lefti f x a firstidx lastidx = done; !r +(** Fisher-Yates shuffle *) +let shuffle a = + for i = Array.length a - 1 downto 1 do + let j = Random.int (i+1) in + let tmp = a.(j) in + a.(j) <- a.(i); + a.(i) <- tmp + done diff --git a/src/utils/cdk/list2.ml b/src/utils/cdk/list2.ml index 0dc7a92e..36542a9e 100644 --- a/src/utils/cdk/list2.ml +++ b/src/utils/cdk/list2.ml @@ -108,13 +108,7 @@ let max list = let shuffle list = let a = Array.of_list list in - let len = Array.length a in - for i = 0 to len-1 do - let p = Random.int (len-1) in - let tmp = a.(i) in - a.(i) <- a.(p); - a.(p) <- tmp; - done; + Array2.shuffle a; Array.to_list a let filter_map f = diff --git a/tools/bt_dht_node.ml b/tools/bt_dht_node.ml new file mode 100644 index 00000000..51d1d89d --- /dev/null +++ b/tools/bt_dht_node.ml @@ -0,0 +1,62 @@ +(** standalone DHT node *) + +open BT_DHT + +let bracket res destroy k = + let x = try k res with exn -> destroy res; raise exn in + destroy res; + x + +let with_open_in_bin file = bracket (open_in_bin file) close_in_noerr +let with_open_out_bin file = bracket (open_out_bin file) close_out_noerr + +let load file : Kademlia.table = with_open_in_bin file Marshal.from_channel + +let store file (t:Kademlia.table) = + let temp = file ^ ".tmp" in + try + with_open_out_bin temp (fun ch -> Marshal.to_channel ch t []; Unix2.fsync (Unix.descr_of_out_channel ch)); + Sys.rename temp file + with exn -> + log #warn ~exn "write to %S failed" file; Sys.remove temp + +let init file = try load file with _ -> Kademlia.create () + +let run_queries = + let ids = [| + "FA959F240D5859CAC30F32ECD21BD89F576481F0"; + "BDE98D04AB6BD6E8EA7440F82870E5191E130A84"; + "857224361969AE12066166539538F07BD5EF48B4"; + "81F643A195BBE3BB1DE1AC9184B9F84D74A37EFF"; + "7CC9963D90B54DF1710469743C1B43E0E20489C0"; + "C2C65A1AA5537406183F4D815C77A2A578B00BFB"; + "72F5A608AFBDF6111E5A86B337E9FC27D6020663"; + "FE73D74660695208F3ACD221B7A9A128A3D36D47"; + |] in + fun dht -> + let id = Kademlia.H.of_hexa ids.(Random.int (Array.length ids)) in + query_peers dht id (fun node token peers -> + log #info "run_queries : %s returned %d peers : %s" + (show_node node) (List.length peers) (strl Kademlia.show_addr peers)) + +let () = + Random.self_init (); + try + match Sys.argv with + | [|_;file;port|] -> + let bw = UdpSocket.new_bandwidth_controler + (TcpBufferedSocket.create_write_bandwidth_controler "UNLIMIT" 0) in + let dht = start (init file) (int_of_string port) bw in + let finish () = store file dht.M.rt; stop dht; exit 0 in + Sys.set_signal Sys.sigint (Sys.Signal_handle (fun _ -> show dht; finish ())); + Sys.set_signal Sys.sigterm (Sys.Signal_handle (fun _ -> show dht; finish ())); + Sys.set_signal Sys.sighup (Sys.Signal_handle (fun _ -> show dht)); + BasicSocket.add_infinite_timer 1800. (fun () -> run_queries dht); + BasicSocket.add_infinite_timer 3600. (fun () -> store file dht.M.rt); + let routers = ["router.utorrent.com", 6881; "router.transmission.com",6881] in + bootstrap dht ~routers; + BasicSocket.loop () + | _ -> Printf.eprintf "Usage : %s \n" Sys.argv.(0) + with + exn -> log #error "main : %s" (Printexc.to_string exn) + diff --git a/tools/make_torrent.ml b/tools/make_torrent.ml index d48487c2..2d85ba6b 100644 --- a/tools/make_torrent.ml +++ b/tools/make_torrent.ml @@ -48,15 +48,15 @@ let check_torrent () = end let _ = - Arg.parse [ - "-tracker", Arg.String ((:=) announce), - " : set the tracker to put in the torrent file"; - "-torrent", Arg.String ((:=) torrent_filename), - " : the .torrent file to use"; - "-comment", Arg.String ((:=) torrent_comment), - "\"\" : some comments on the torrent"; - "-private", Arg.Int ((:=) torrent_private), - "<0|1> : set the private flag"; + let args = [ + "-tracker", Arg.Set_string announce, + " set the tracker to put in the torrent file"; + "-torrent", Arg.Set_string torrent_filename, + " the .torrent file to use"; + "-comment", Arg.Set_string torrent_comment, + "\"\" some comments on the torrent"; + "-private", Arg.Set_int torrent_private, + "<0|1> set the private flag"; "-change", Arg.Unit (fun _ -> check_tracker (); @@ -70,7 +70,7 @@ let _ = let s = Bencode.encode encoded in File.from_string !torrent_filename s; Printf.printf "Torrent file of %s modified\n" (Sha1.to_hexa torrent_id); - ), ": change the tracker inside a .torrent file"; + ), " change the tracker inside a .torrent file"; "-print", Arg.Unit (fun filename -> check_torrent (); @@ -85,7 +85,7 @@ let _ = Printf.printf " length: %Ld\n" torrent.torrent_length; Printf.printf " encoding: %s\n" torrent.torrent_encoding; Printf.printf " tracker: %s\n" torrent.torrent_announce; - Printf.printf " private: %s\n" (Int64.to_string torrent.torrent_private); + Printf.printf " private: %s\n" (if torrent.torrent_private then "yes" else "no"); Printf.printf " piece size: %Ld\n" torrent.torrent_piece_size; Printf.printf " Pieces: %d\n" (Array.length torrent.torrent_pieces); Array.iteri (fun i s -> @@ -97,19 +97,19 @@ let _ = Printf.printf " %10Ld : %s\n" len s ) torrent.torrent_files; end; - ), ": change the tracker inside a .torrent file"; + ), " print the contents of a .torrent file"; "-create", Arg.String (fun filename -> check_tracker (); check_torrent (); try let hash = BTTorrent.generate_torrent !announce !torrent_filename !torrent_comment - (Int64.of_int !torrent_private) filename + (!torrent_private<>0) filename in Printf.printf "Torrent file generated : %s\n" (Sha1.to_hexa hash); with exn -> Printf.printf "Cannot create torrent : %s\n" (Printexc2.to_string exn); exit 2 - )," : compute hashes of filename(s) (can be a directory) and generate a .torrent file"; + )," compute hashes of filename(s) (can be a directory) and generate a .torrent file"; "-split", Arg.String (fun filename -> check_torrent (); @@ -142,7 +142,7 @@ let _ = iter zero torrent.torrent_files; Unix32.close bt_fd; - ), " : split a file corresponding to a .torrent file"; + ), " split a file corresponding to a .torrent file"; "-check", Arg.String (fun filename -> check_torrent (); @@ -196,8 +196,10 @@ let _ = Printf.printf "Torrent file verified !!!\n"; - ), " : check that is well encoded by a .torrent"; + ), " check that is well encoded by a .torrent"; ] + in + Arg.parse (Arg.align args) (fun s -> Printf.printf "Don't know what to do with %s\n" s; Printf.printf "Use --help to get some help\n"; diff --git a/tools/subconv.ml b/tools/subconv.ml index 4ae1f127..016040fc 100644 --- a/tools/subconv.ml +++ b/tools/subconv.ml @@ -230,7 +230,7 @@ let read_frames file concat_frame = while true do let line = input_line ic in - let i = int_of_string line in + let _i = int_of_string line in let line = input_line ic in let frame1, frame2 = -- 2.11.4.GIT