1 (* Copyright 2001, 2002 b52_simon :), b8_bavard, b8_fee_carabine, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 (** Functions used in client<->client communication
22 and also client<->tracker
25 (** A peer (or client) is always a remote peer in this file.
26 A Piece is a portion of the file associated with a hash (sha1).
In mldonkey a piece is referred to as a block inside the swarming system.
28 A SubPiece is a portion of a piece (without hash) which can be
29 sent/downloaded to/from a peer.
In mldonkey a SubPiece is referred to as a range inside the swarming system.
31 @see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for some
32 unofficial (but more detailed) specs.
42 open TcpBufferedSocket
47 open CommonInteractive
63 module VB
= VerificationBitmap
(** Status-line strings for a successful HTTP reply: the bare form and the
    HTTP/1.1 form. NOTE(review): presumably compared against tracker
    replies elsewhere in this file -- confirm at the use sites. *)
let http_ok = "HTTP 200 OK"
let http11_ok = "HTTP/1.1 200 OK"

(** Upload-slot bookkeeping shared with [recompute_uploaders]:
    [next_uploaders] receives the freshly chosen set of uploaders and
    [current_uploaders] is then overwritten with it. Both start empty. *)
let next_uploaders : BTTypes.client list ref = ref []
let current_uploaders : BTTypes.client list ref = ref []
72 (** Check that client is valid and record it *)
73 let maybe_new_client file id ip port
=
74 let cc = Geoip.get_country_code_option ip
in
78 && (match !Ip.banned
(ip
, cc) with
81 if !verbose_connect
then
82 lprintf_file_nl
(as_file file
) "%s:%d blocked: %s" (Ip.to_string ip
) port reason
;
85 ignore
(new_client file id
(ip
,port
) cc);
86 if !verbose_sources
> 1 then
87 lprintf_file_nl
(as_file file
) "Received %s:%d" (Ip.to_string ip
) port
(** Hook called with a file (as [!resume_clients_hook file]) once a tracker
    exchange that asked for sources has completed.
    The initial value is a placeholder that fails with [Assert_failure] when
    called; NOTE(review): the real handler is presumably stored into this
    reference later in the file -- confirm. *)
let resume_clients_hook = ref (fun _file -> assert false)
94 (* open modules locally *)
(** Name of a socket [event] as a string, for log and debug output.
    A [BASIC_EVENT] is unwrapped and its payload named individually; for
    [CLOSED] the text of the close reason (from [string_of_reason]) is
    appended after a space. *)
let string_of_event event =
  match event with
  | READ_DONE -> "READ_DONE"
  | WRITE_DONE -> "WRITE_DONE"
  | CAN_REFILL -> "CAN_REFILL"
  | BASIC_EVENT basic ->
    (* Parenthesised so that an arm added after this one can never be
       captured by the inner match. *)
    (match basic with
     | CLOSED why -> "CLOSED " ^ string_of_reason why
     | RTIMEOUT -> "RTIMEOUT"
     | WTIMEOUT -> "WTIMEOUT"
     | LTIMEOUT -> "LTIMEOUT"
     | CAN_READ -> "CAN_READ"
     | CAN_WRITE -> "CAN_WRITE")
110 (** talk to udp tracker and parse response
apart from parsing, it should perform everything that
112 talk_to_tracker's inner function does FIXME refactor both
114 Better create single global udp socket and use it for all
115 tracker requests and distinguish trackers by txn? FIXME?
117 let talk_to_udp_tracker host port args file t need_sources
=
119 let socket = create
(Ip.to_inet_addr
!!client_bind_addr
) 0 (fun sock event
->
120 (* lprintf_nl "udpt got event %s for %s" (string_of_event event) host; *)
122 | WRITE_DONE
| CAN_REFILL
-> ()
123 | READ_DONE
-> assert false (* set_reader prevents this *)
124 | BASIC_EVENT x
-> match x
with
126 | CAN_READ
| CAN_WRITE
-> assert false (* udpSocket implementation prevents this *)
127 | LTIMEOUT
| WTIMEOUT
| RTIMEOUT
-> close sock
(Closed_for_error
"udpt timeout"))
130 set_reader socket begin fun _
->
132 lprintf_nl ~exn
"udpt interact with %s" host
;
133 close
socket (Closed_for_exception exn
)
136 BasicSocket.set_wtimeout
(sock
socket) 60.;
137 BasicSocket.set_rtimeout
(sock
socket) 60.;
138 let txn = Random.int32
Int32.max_int
in
139 (* lprintf_nl "udpt txn %ld for %s" txn host; *)
140 write
socket false (connect_request
txn) ip port
;
141 set_reader begin fun () ->
142 let p = read
socket in
143 let conn = connect_response
p.udp_content
txn in
144 (* lprintf_nl "udpt connection_id %Ld for %s" conn host; *)
145 let txn = Random.int32
Int32.max_int
in
146 (* lprintf_nl "udpt txn' %ld for host %s" txn host; *)
147 let int s
= Int64.of_string
(List.assoc s args
) in
148 let req = announce_request
conn txn
149 ~info_hash
:(List.assoc
"info_hash" args
)
150 ~peer_id
:(List.assoc
"peer_id" args
)
151 (int "downloaded",int "left",int "uploaded")
152 (match try List.assoc
"event" args
with Not_found
-> "" with
157 | s
-> lprintf_nl
"udpt event %s? for %s" s host
; 0l)
158 ~ip
:(if !!force_client_ip
then (Int64.to_int32
(Ip.to_int64
!!set_client_ip
)) else 0l)
159 ~numwant
:(if need_sources
then try Int32.of_string
(List.assoc
"numwant" args
) with _
-> -1l else 0l)
160 (int_of_string
(List.assoc
"port" args
))
162 write
socket false req ip port
;
163 set_reader (fun () ->
164 let p = read
socket in
166 t
.tracker_last_conn
<- last_time
();
167 file
.file_tracker_connected
<- true;
168 t
.tracker_interval
<- 600;
169 t
.tracker_min_interval
<- 600;
170 if need_sources
then t
.tracker_last_clients_num
<- 0;
172 let (interval
,clients
) = announce_response
p.udp_content
txn in
173 if !verbose_msg_servers
then
174 lprintf_nl
"udpt got interval %ld clients %d for host %s" interval
(List.length clients
) host
;
175 if interval
> 0l then
177 t
.tracker_interval
<- Int32.to_int interval
;
178 if t
.tracker_min_interval
> t
.tracker_interval
then
179 t
.tracker_min_interval
<- t
.tracker_interval
182 List.iter
(fun (ip'
,port
) ->
183 let ip = Ip.of_int64
(Int64.logand
0xFFFFFFFFL
(Int64.of_int32
ip'
)) in
184 (* lprintf_nl "udpt got %s:%d" (Ip.to_string ip) port; *)
185 t
.tracker_last_clients_num
<- t
.tracker_last_clients_num
+ 1;
186 maybe_new_client file
Sha1.null
ip port
188 close
socket Closed_by_user
;
189 if !verbose_msg_servers
then
190 lprintf_nl
"udpt interact done for %s" host
;
191 if need_sources
then !resume_clients_hook file
195 if !verbose_msg_servers
then
196 lprintf_nl
"udpt start with %s:%d" host port
;
197 Ip.async_ip host
(fun ip ->
198 (* lprintf_nl "udpt resolved %s to ip %s" host (Ip.to_string ip); *)
199 if not
(Ip.equal
Ip.localhost
ip) then
200 try interact ip with exn
-> lprintf_nl ~exn
"udpt interact with %s" host
201 else if !verbose_msg_servers
then
202 lprintf_nl
"udpt ignoring tracker %s (resolves to localhost)" host
)
204 if !verbose_msg_servers
then
205 lprintf_nl
"udpt failed to resolve %s" host
)
208 lprintf_nl ~exn
"udpt start"
213 In this function we connect to a tracker.
214 @param file The file concerned by the request
215 @param url Url of the tracker to connect
216 @param event Event (as a string) to send to the tracker :
217 can be 'completed' if the file is complete, 'started' for the first
218 connection to this tracker or 'stopped' for a clean stop of the file.
219 Everything else will be ok for a second connection to the tracker.
Be careful with the spelling of this event
221 @param f The function used to parse the result of the connection.
222 The function will get a file as an argument (@see talk_to_tracker
If we have fewer than !!ask_tracker_threshold sources
and if we respect the file_tracker_interval, then
we actually ask the tracker for sources
229 let connect_trackers file event need_sources f
=
231 (* reset session statistics when sending 'started' event *)
232 if event
= "started" then
234 file
.file_session_uploaded
<- Int64.zero
;
235 file
.file_session_downloaded
<- Int64.zero
;
238 let args,must_check_delay
, left
=
240 match file
.file_swarmer
with
244 | "started" -> [("event", "started")],true,zero
245 | "stopped" -> [("event", "stopped")],false,zero
250 let local_downloaded = CommonSwarming.downloaded swarmer
in
251 let left = file_size file
-- local_downloaded in
253 | "completed" -> [("event", "completed")],false,zero
254 | "started" -> [("event", "started")],true,left
255 | "stopped" -> [("event", "stopped")],false,left
259 let args = ("no_peer_id", "1") :: ("compact", "1") :: args in
261 if not need_sources
then
262 ("numwant", "0") :: args
263 else if !!numwant
> -1 then
264 ("numwant", string_of_int
!!numwant
) :: args
268 let args = if !!send_key
then
269 ("key", Sha1.to_hexa
!!client_uid
) :: args else args
271 let args = if !!force_client_ip
then
272 ("ip", Ip.to_string
!!set_client_ip
) :: args else args
275 ("info_hash", Sha1.direct_to_string file
.file_id
) ::
276 ("peer_id", Sha1.direct_to_string
!!client_uid
) ::
277 ("port", string_of_int
!!client_port
) ::
278 ("uploaded", Int64.to_string file
.file_session_uploaded
) ::
279 ("downloaded", Int64.to_string file
.file_session_downloaded
) ::
280 ("left", Int64.to_string
left) ::
284 let enabled_trackers =
285 let enabled_trackers = List.filter
(fun t
-> tracker_is_enabled t
) file
.file_trackers
in
286 if enabled_trackers <> [] then enabled_trackers
288 (* if there is no tracker left, do something ? *)
289 if !verbose_msg_servers
then
290 lprintf_nl
"No trackers left for %s, reenabling all of them..." (file_best_name
(as_file file
));
292 match t
.tracker_status
with
293 (* only re-enable after normal error *)
294 | Disabled _
-> t
.tracker_status
<- Enabled
295 | _
-> ()) file
.file_trackers
;
296 List.filter
(fun t
-> tracker_is_enabled t
) file
.file_trackers
301 (* if we have too few sources we may ask the tracker before the interval *)
302 if not must_check_delay
303 || not file
.file_tracker_connected
304 || t
.tracker_last_conn
+ t
.tracker_interval
< last_time
()
305 || ( file
.file_clients_num
< !!ask_tracker_threshold
306 && (file_state file
) == FileDownloading
307 && (if t
.tracker_min_interval
> !!min_tracker_reask_interval
then
308 t
.tracker_last_conn
+ t
.tracker_min_interval
< last_time
()
310 t
.tracker_last_conn
+ !!min_tracker_reask_interval
< last_time
() ))
313 (* if we already tried to connect but failed, disable tracker, but allow re-enabling *)
314 (* FIXME t.tracker_last_conn < 1 only at first connect, so later failures will stay undetected! *)
315 if file
.file_tracker_connected
&& t
.tracker_last_clients_num
= 0 && t
.tracker_last_conn
< 1 then
317 if !verbose_msg_servers
then
318 lprintf_nl
"Request error from tracker: disabling %s" (show_tracker_url t
.tracker_url
);
319 t
.tracker_status
<- Disabled
(intern
"MLDonkey: Request error from tracker")
321 (* Send request to tracker *)
323 let args = if String.length t
.tracker_id
> 0 then
324 ("trackerid", t
.tracker_id
) :: args else args
326 let args = if String.length t
.tracker_key
> 0 then
327 ("key", t
.tracker_key
) :: args else args
329 if !verbose_msg_servers
then
330 lprintf_nl
"connect_trackers: connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i numwant:%s file: %s"
331 (string_of_bool file
.file_tracker_connected
)
332 t
.tracker_id t
.tracker_key t
.tracker_last_clients_num
333 (t
.tracker_last_conn
- last_time
()) (try List.assoc
"numwant" args with _
-> "_") file
.file_name
;
335 match t
.tracker_url
with
337 let module H
= Http_client
in
340 H.req_url
= Url.of_string ~
args url
;
341 H.req_proxy
= !CommonOptions.http_proxy
;
342 H.req_user_agent
= get_user_agent
();
343 (* #4541 [egs] supports redirect *)
344 H.req_max_retry
= !!max_tracker_redirect
;
345 H.req_filter_ip
= (fun ip -> not
(Ip.equal
Ip.localhost
ip));
348 if !verbose_msg_servers
then
349 lprintf_nl
"Request sent to tracker %s for file: %s"
353 t
.tracker_last_conn
<- last_time
();
354 file
.file_tracker_connected
<- true;
356 | `Other url
-> assert false (* should have been disabled *)
357 | `Udp
(host
,port
) -> talk_to_udp_tracker host port
args file t need_sources
361 if !verbose_msg_servers
then
362 lprintf_nl
"Request NOT sent to tracker %s - next request in %ds for file: %s"
363 (show_tracker_url t
.tracker_url
) (t
.tracker_interval
- (last_time
() - t
.tracker_last_conn
)) file
.file_name
(** Guarded entry point for tracker announces.
    Shadows the [connect_trackers] defined just above: when the
    [use_trackers] option is set, the arguments are forwarded unchanged to
    that earlier definition; otherwise nothing is done.
    @param file the file the announce is about
    @param event tracker event name, passed through as is
    @param need_sources whether peers are wanted from the tracker
    @param f callback handed to the underlying [connect_trackers] *)
let connect_trackers file event need_sources f =
  match !!use_trackers with
  | true -> connect_trackers file event need_sources f
  | false -> ()
370 set_client_upload
(as_client c
) (as_file c
.client_file
);
371 set_client_has_a_slot
(as_client c
) NormalSlot
;
372 Rate.update_no_change c
.client_downloaded_rate
;
373 Rate.update_no_change c
.client_upload_rate
;
374 c
.client_last_optimist
<- last_time
();
375 client_enter_upload_queue
(as_client c
);
376 send_client c Unchoke
378 (** In this function we decide which peers will be
379 uploaders. We send a choke message to current uploaders
380 that are not in the next uploaders list. We send Unchoke
381 for clients that are in next list (and not in current)
383 let recompute_uploaders () =
384 if !verbose_upload
then lprintf_nl
"recompute_uploaders";
385 next_uploaders := choose_uploaders current_files
;
386 (*Send choke if a current_uploader is not in next_uploaders*)
387 List.iter
( fun c
-> if ((List.mem c
!next_uploaders)==false) then
389 set_client_has_a_slot
(as_client c
) NoSlot
;
390 (*we will let him finish his download and choke him on next_request*)
392 ) !current_uploaders;
394 (*don't send Choke if new uploader is already an uploaders *)
396 if not
(List.mem c
!current_uploaders) then start_upload c
398 current_uploaders := !next_uploaders
401 (****** Fabrice: why are clients which are disconnected removed ???
402 These clients might still be useful to reconnect to, no ? *)
405 (** This function is called when a client is disconnected
406 (be it by our side or its side).
407 A client which disconnects (even only one time) is discarded.
408 If it's an uploader which disconnects we recompute uploaders
409 (see recompute_uploaders) immediately.
410 @param c The client to disconnect
411 @param reason The reason for the disconnection (see in BasicSocket.ml)
413 let disconnect_client c reason
=
414 if !verbose_msg_clients
then
415 lprintf_nl
"Client %d: disconnected: %s" (client_num c
) (string_of_reason reason
);
417 match c
.client_sock
with
419 | ConnectionWaiting token
->
421 c
.client_sock
<- NoConnection
425 (* List.iter (fun r -> CommonSwarming.free_range r) c.client_ranges; *)
426 set_client_disconnected c reason
;
427 c
.client_session_downloaded
<- 0L;
428 c
.client_session_uploaded
<- 0L;
429 (try if c
.client_good
then count_seen c
with _
-> ());
430 (* this is not useful already done in the match
431 (try close sock reason with _ -> ()); *)
432 (*---------not needed ?? VvvvvV---------------
433 c.client_ranges <- [];
434 c.client_block <- None;
435 if not c.client_good then
436 connection_failed c.client_connection_control;
437 c.client_good <- false;
438 c.client_sock <- NoConnection;
439 c.client_chunks <- [];
440 c.client_allowed_to_write <- zero;
441 c.client_new_chunks <- [];
442 c.client_interesting <- false;
443 c.client_alrd_sent_interested <- false;
444 -------------------^^^^^--------------------*)
445 if (c
.client_registered_bitfield
) then
447 match c
.client_uploader
with
450 c
.client_uploader
<- None
;
451 (* If the client registered a bitfield then
452 we must unregister him to update the swarmer
453 (Useful for availability)
455 CommonSwarming.unregister_uploader up
456 (* c.client_registered_bitfield <- false;
457 for i = 0 to String.length c.client_bitmap - 1 do
458 c.client_bitmap.[0] <- '0';
(* Don't test whether the client has an upload slot: it no longer
has one (it was removed earlier, in
set_client_disconnected c reason)
465 if (List.mem c
!current_uploaders) then
467 (*BTGlobals.remove_client*)
469 recompute_uploaders ();
477 (** Disconnect all clients of a file
@param file The file whose clients must all be disconnected
480 let disconnect_clients file
=
481 let must_keep = ref true in
482 (match file_state file
with
483 | FilePaused
| FileCancelled
-> must_keep:=false
486 Hashtbl.iter
(fun _ c
->
487 if not
( !must_keep && (client_has_a_slot
(as_client c
) || c
.client_interested
)) then
489 if !verbose_msg_clients
then
490 lprintf_file_nl
(as_file file
) "disconnect since download is finished";
491 disconnect_client c Closed_by_user
496 (** What to do when a file is finished
497 @param file the finished file
499 let download_finished file
=
500 if List.memq file
!current_files
then
502 connect_trackers file
"completed" false (fun _ _
->
503 lprintf_file_nl
(as_file file
) "Tracker return: completed %s" file
.file_name
;
504 ()); (*must be called before swarmer gets removed from file*)
505 (*CommonComplexOptions.file_completed*)
506 file_completed
(as_file file
);
507 (* Remove the swarmer for this file as it is not useful anymore... *)
508 CommonSwarming.remove_swarmer file
.file_swarmer
;
509 file
.file_swarmer
<- None
;
510 (* At this point, the file state is FileDownloaded. We should not remove
511 the file, because we continue to upload. *)
515 (** Check if a file is finished or not.
516 A file is finished if all blocks are verified.
517 @param file The file to check status
(* [check_finished swarmer file] asks [CommonSwarming.check_finished]
   whether [swarmer] is done and, only if so, runs [download_finished]
   on [file]. It does nothing otherwise. *)
let check_finished swarmer file =
  let swarmer_done = CommonSwarming.check_finished swarmer in
  match swarmer_done with
  | true -> download_finished file
  | false -> ()
(** Single-bit masks, most significant bit first:
    [bits.(i)] is [0x80 lsr i], i.e. 128, 64, 32, 16, 8, 4, 2, 1.
    Indexed with [n land 7] by the bit helpers that follow. *)
let bits = Array.init 8 (fun shift -> 0x80 lsr shift)
525 (* Check/set bits in strings (bittorrent format) *)
528 (Char.code s
.[n
lsr 3]) land bits.(n
land 7) <> 0
532 s
.[i] <- Char.unsafe_chr
(Char.code s
.[i] lor bits.(n
land 7))
534 (* Official client seems to use max_range_request 5 and max_range_len 2^14 *)
(* Maximum number of range requests kept in flight ("in the pipeline")
   per client; [get_from_client] only issues a new request while
   [client_ranges_sent] holds fewer entries than this. *)
let max_range_requests = 5
(* How many bytes we can request in one Piece *)
540 let s = String.make
8 '
\x00'
in
541 s.[7] <- (match !bt_dht
with None
-> '
\x00'
| Some _
-> '
\x01'
);
542 s.[5] <- '
\x10'
; (* TODO bep9, bep10, notify clients about extended*)
546 let send_init client_uid file_id sock
=
547 let buf = Buffer.create
100 in
548 buf_string8
buf "BitTorrent protocol";
549 Buffer.add_string
buf (reserved ());
550 Buffer.add_string
buf (Sha1.direct_to_string file_id
);
551 Buffer.add_string
buf (Sha1.direct_to_string client_uid
);
552 let s = Buffer.contents
buf in
555 (** A wrapper to send Interested message to a client.
556 (Send interested only if needed)
557 @param c The client to send Interested
559 let send_interested c
=
560 if c
.client_interesting
&& (not c
.client_alrd_sent_interested
) then
562 c
.client_alrd_sent_interested
<- true;
563 send_client c Interested
567 (** Send a Bitfield message to a client.
568 @param c The client to send the Bitfield message
571 let send_bitfield c
=
572 if not c
.client_file
.file_metadata_downloading
then
573 send_client c
(BitField
575 match c
.client_file
.file_swarmer
with
577 (* This must be a seeded file... *)
578 if !verbose_download
then
579 lprintf_nl
"Sending completed verified bitmap";
580 let nchunks = Array.length c
.client_file
.file_chunks
in
581 let len = (nchunks+7)/8 in
582 let s = String.make
len '
\000'
in
583 for i = 0 to nchunks - 1 do
588 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer
in
589 if !verbose_download
then
590 lprintf_nl
"Sending verified bitmap: [%s]" (VB.to_string
bitmap);
591 let len = (VB.length
bitmap + 7)/8 in
592 let s = String.make
len '
\000'
in
594 if c
= VB.State_verified
then set_bit s i) bitmap;
(** Decode the reserved bytes [rbits] of a peer's handshake into the
    capability flags of client [c].
    Each flag is one bit of one byte of [rbits]:
    - byte 7, mask 0x01: [client_dht]
    - byte 7, mask 0x02: [client_cache_extension]
    - byte 7, mask 0x04: [client_fast_extension]
    - byte 5, mask 0x10: [client_utorrent_extension]
    - byte 0, mask 0x80: [client_azureus_messaging_protocol]
    @param rbits reserved bytes; indices 0 to 7 are read, so a shorter
           string makes the first access (index 7) raise
           [Invalid_argument] before any field is written
    @param c the client record whose flags are overwritten *)
let parse_reserved rbits c =
  (* [flag_set idx mask] tells whether any bit of [mask] is set in byte
     [idx] of [rbits]. *)
  let flag_set idx mask =
    let byte = Char.code rbits.[idx] in
    byte land mask <> 0
  in
  (* Last reserved byte. *)
  c.client_dht <- flag_set 7 0x01;
  c.client_cache_extension <- flag_set 7 0x02;
  c.client_fast_extension <- flag_set 7 0x04;
  (* Sixth reserved byte. *)
  c.client_utorrent_extension <- flag_set 5 0x10;
  (* First reserved byte. *)
  c.client_azureus_messaging_protocol <- flag_set 0 0x80
611 let send_extended_handshake c file
=
612 let module B
= Bencode
in
613 let msg = (B.encode
(B.Dictionary
[(* "e",B.Int 0L; *)
614 "m", (B.Dictionary
["ut_metadata", B.Int
1L]);
615 (* "metadata_size", B.Int (-1L) *)])) in begin
616 send_client c
(Extended
(Int64.to_int
0L, msg));
619 let send_extended_piece_request c piece file
=
620 let module B
= Bencode
in
621 let msg = (B.encode
(B.Dictionary
["msg_type", B.Int
0L; (* 0 is request subtype*)
622 "piece", B.Int piece
; ])) in begin
623 send_client c
(Extended
(Int64.to_int c
.client_ut_metadata_msg
, msg));
627 let (ip,port
) = c
.client_host
in
628 Printf.sprintf
"%s:%d %S" (Ip.to_string
ip) port
(brand_to_string c
.client_brand
)
630 (** This function is called to parse the first message that
632 @param counter client num
633 @param cc Expected client (probably useless now that we don't save any client)
634 @param init_sent A boolean to know if we sent this client the handshake message
635 @param gconn Don't know
636 @param sock The socket we use for this client
637 @param proto Unused (required by tuple type?)
638 @param file_id The file hash (sha1) of the file involved in this exchange
640 (* removed: @param peer_id The hash (sha1) of the client. (Should be checked)
642 let rec client_parse_header counter cc init_sent gconn sock
643 (proto
, rbits
, file_id
) =
645 set_lifetime sock
600.;
646 if !verbose_msg_clients
then
647 lprintf_nl
"client_parse_header %d" counter;
649 let file = Hashtbl.find files_by_uid file_id
in
650 if !verbose_msg_clients
then
651 lprintf_file_nl
(as_file
file) "file found";
652 let ccc, cc_country_code
= !cc in
656 let c = new_client
file Sha1.null
(TcpBufferedSocket.peer_addr sock
) cc_country_code
in
657 if !verbose_connect
then lprintf_file_nl
(as_file
file) "Client %d: incoming connection" (client_num
c);
658 cc := (Some
c), cc_country_code
;
661 (* Does it happen that this c was already used to connect successfully?
662 If yes then this must happen: *)
663 c.client_received_peer_id
<- false;
664 if cc_country_code
<> None
&& c.client_country_code
= None
then
665 c.client_country_code
<- cc_country_code
;
667 (* client could have had Sha1.null as peer_id/uid *)
668 (* this is to be done, later
669 if c.client_uid <> peer_id then
670 c.client_software <- (parse_software (Sha1.direct_to_string peer_id));
674 (* if c.client_uid <> peer_id then begin
675 lprintf "Unexpected client by UID\n";
676 let ccc = new_client file peer_id (TcpBufferedSocket.host sock) in
677 lprintf "CLIENT %d: testing instead of %d\n"
678 (client_num ccc) (client_num c);
679 (match ccc.client_sock with
681 lprintf_nl "[BT]: This client is already connected";
682 close sock (Closed_for_error "Already connected");
686 lprintf_nl "[BT]: Client %d: recovered by UID" (client_num ccc);
694 if !verbose_msg_clients
then
695 lprintf_nl
"Client %d: Connected from %s" (client_num
c) (show_client c);
697 parse_reserved rbits
c;
699 (match c.client_sock
with
701 if !verbose_msg_clients
then begin
702 let (ip,port
) = c.client_host
in
703 lprintf_nl
"No connection to client (%s:%d)!!!" (Ip.to_string
ip) port
;
705 c.client_sock
<- Connection sock
706 | ConnectionWaiting token
->
708 if !verbose_msg_clients
then
709 lprintf_nl
"Waiting for connection to client !!!";
710 c.client_sock
<- Connection sock
711 | Connection
s when s != sock
->
712 if !verbose_msg_clients
then
713 lprintf_nl
"CLIENT %d: IMMEDIATE RECONNECTION" (client_num
c);
714 disconnect_client c (Closed_for_error
"Reconnected");
715 c.client_sock
<- Connection sock
;
719 set_client_state
(c) (Connected
(-1));
720 if not init_sent
then
722 c.client_incoming
<- true;
723 send_init !!client_uid file_id sock
;
724 send_extended_handshake c file;
726 connection_ok
c.client_connection_control
;
727 if !verbose_msg_clients
then
728 lprintf_nl
"file and client found";
729 (* if not c.client_incoming then *)
730 send_bitfield c; (* BitField is always the first message *)
731 begin match c.client_dht
, !bt_dht
with
732 | true, Some dht
-> send_client
c (DHT_Port dht
.BT_DHT.M.dht_port
)
735 c.client_blocks_sent
<- file.file_blocks_downloaded
;
737 TODO !!! : send interested if and only if we are interested
-> we must receive at least the other peer's bitfield.
739 in common swarmer -> compare : partition -> partition -> bool
742 set_rtimeout sock
!!client_timeout
;
(* Once parsed successfully, we define the function client_to_client
744 to be the function used when a message is read *)
745 gconn
.gconn_handler
<- Reader
(fun gconn sock
->
746 bt_handler
TcpMessages.parsing
(client_to_client
c) c sock
749 let b = TcpBufferedSocket.buf sock
in
750 (* The receive buffer is normally not empty now, lets parse the rest, most likely PeerID *)
752 ignore
(bt_handler
TcpMessages.parsing
(client_to_client
c) c sock
);
754 (* Some newer clients send more opcodes in their handshake packet, lets parse them now.
755 Using "while b.len <> 0 do ... done" is not possible here because libtorrent clients
756 send unparsable five extra bytes after their PeerID which would result into a loop *)
758 ignore
(bt_handler
TcpMessages.parsing
(client_to_client
c) c sock
);
762 let (ip,port
) = (TcpBufferedSocket.peer_addr sock
) in
763 if !verbose_unexpected_messages
then
764 lprintf_nl
"Client %s:%d requested a file that is not shared [%s]"
765 (Ip.to_string
ip) port
(Sha1.to_hexa file_id
)
767 lprintf_nl ~exn
"client_parse_header";
768 close sock
(Closed_for_exception exn
);
772 (** Update the bitmap of a client. Unclear if it is still useful.
773 @param c The client which we want to update.
775 and update_client_bitmap
c =
776 let file = c.client_file
in
778 let swarmer = match file.file_swarmer
with
780 | Some
swarmer -> swarmer
784 match c.client_uploader
with
786 let up = CommonSwarming.register_uploader
swarmer (as_client
c)
787 (AvailableIntervals
[]) in
788 c.client_uploader
<- Some
up;
794 let bitmap = match c.client_bitmap
with
796 let len = CommonSwarming.partition_size
swarmer in
797 let bitmap = Bitv.create
len false in
798 c.client_bitmap
<- Some
bitmap;
800 | Some
bitmap -> bitmap
803 if c.client_new_chunks
<> [] then begin
804 let chunks = c.client_new_chunks
in
805 c.client_new_chunks
<- [];
806 List.iter
(fun n
-> Bitv.set
bitmap n
true) chunks;
807 CommonSwarming.update_uploader_intervals
up (AvailableBitv
bitmap);
811 (** In this function we decide which piece we must request from client.
812 @param sock Socket of the client
815 and get_from_client sock
(c: client
) =
816 let file = c.client_file
in
817 (* Check if there's not enough requests in the 'pipeline'
818 and if a request can be send (not choked and file is downloading) *)
819 if List.length
c.client_ranges_sent
< max_range_requests
820 && file_state
file = FileDownloading
821 && (c.client_choked
== false)
823 (* num is the number of the piece, x and y are the position
824 of the subpiece in the piece(!), r is a (CommonSwarmer) range *)
826 let up = match c.client_uploader
with
829 let swarmer = CommonSwarming.uploader_swarmer
up in
835 if !verbose_msg_clients
then
836 lprintf_file_nl
(as_file
file) "CLIENT %d: Finding new range to send" (client_num
c);
838 if !verbose_swarming
then begin
839 lprintf_n
"Current download:\n Current chunks: ";
842 List.iter
(fun (x
,y
) -> lprintf
"%Ld-%Ld " x y
) c.client_chunks
843 with _
-> lprintf
"No Chunks";
847 lprintf_n
"Current ranges: ";
849 List.iter
(fun (p1
,p2
, r) ->
850 let (x
,y
) = CommonSwarming.range_range
r in
851 lprintf
"%Ld-%Ld[%Ld-%Ld] " p1 p2 x y
852 ) c.client_ranges_sent
;
854 match c.client_range_waiting
with
856 | Some
(x
,y
,r) -> lprintf
"Waiting %Ld-%Ld" x y
;
860 lprintf_n
"Current blocks: ";
862 match c.client_chunk
with
863 | None
-> lprintf
"none"
864 | Some
(chunk
, blocks
) -> List.iter
(fun b ->
865 CommonSwarming.print_block
b.up_block
) blocks
;
869 lprintf_file_nl
(as_file
file) "Finding Range:";
874 (*We must find a block to request first, and then
875 some range inside this block
880 match c.client_chunk
with
884 if !verbose_swarming
then lprintf_file_nl
(as_file
file) "No block";
885 update_client_bitmap
c;
886 (try CommonSwarming.verify_one_chunk
swarmer with _
-> ());
887 (*Find a free block in the swarmer*)
888 let chunk, blocks
= CommonSwarming.find_blocks
up in
889 if !verbose_swarming
then begin
890 lprintf_n
"Blocks Found: "; List.iter (fun b ->
891 CommonSwarming.print_block
b.up_block
) blocks
;
894 c.client_chunk
<- Some
(chunk, blocks
);
896 (*We put the found block in client_block to
897 request range in this block. (Useful for
898 not searching each time a new block)
903 | Some
(chunk, blocks
) ->
905 if !verbose_swarming
then begin
906 lprintf_n
"Current Blocks: "; List.iter (fun b ->
907 CommonSwarming.print_block
b.up_block
) blocks
;
912 (*Given a block find a range inside*)
914 match c.client_range_waiting
with
916 c.client_range_waiting
<- None
;
919 CommonSwarming.find_range
up (min max_range_len
file.file_piece_size
)
924 if y
-- x
> max_range_len
then begin
925 c.client_range_waiting
<- Some
(x
++ max_range_len
, y
, r);
926 (x
, x
++ max_range_len
, r)
931 c.client_ranges_sent
<- c.client_ranges_sent
@ [x
,y
, r];
932 (* CommonSwarming.alloc_range r; *)
934 (* naughty, naughty, was computing a block number instead of a chunk
935 number. Only matters with merged downloads, and even then other
936 clients didn't seem to care (?), so the bug remained hidden *)
937 if !verbose_swarming
then
938 lprintf_file_nl
(as_file
file) "Asking %d For Range %Ld-%Ld" chunk x y
;
940 chunk, x
-- file.file_piece_size
** Int64.of_int
chunk, y
-- x
, r
944 (*If we don't find a range to request inside the block,
945 iter to choose another block*)
946 if !verbose_swarming
then
947 lprintf_nl
"Could not find range in current block";
948 (* c.client_blocks <- List2.removeq b c.client_blocks; *)
950 c.client_chunk
<- None
;
958 (*If we don't find a block to request we can check if the
959 file is finished (if there's missing pieces we can't decide
960 that the file is finished because we didn't found
963 if !verbose_swarming
then
964 lprintf_nl
"Unable to get a block !!";
965 CommonSwarming.compute_bitmap
swarmer;
966 check_finished swarmer file;
970 send_client
c (Request
(num,x
,y
));
972 if !verbose_msg_clients
then
973 lprintf_file_nl
(as_file
file) "CLIENT %d: Asking %s For Range %Ld-%Ld"
974 (client_num
c) (Sha1.to_string
c.client_uid
) x y
977 if not
(CommonSwarming.check_finished swarmer) && !verbose_download
then
978 lprintf_file_nl
(as_file
file) "BTClient.get_from_client ERROR: can't find a block to download and file is not yet finished for file : %s..." file.file_name
981 (** In this function we match a message sent by a client
982 and react according to this message.
983 @param c The client which sent us a message
984 @param sock The socket used for this client
985 @param msg The message sent by the client
(** Handle one protocol message [msg] received from peer [c] on [sock] and
    update the client and file state accordingly.

    Behaviour that can be read from the visible arms:
    - [Piece]: marks the peer as good, hands the data to the swarmer through
      [CommonSwarming.received], accounts the downloaded bytes, drops the
      head of [c.client_ranges_sent], asks for more with [get_from_client]
      and re-evaluates [check_if_interesting].
    - peer id handling: stores the id in [c.client_uid], derives brand and
      release with [parse_software], and calls [disconnect_client] in the
      branch for a peer whose id equals our own [!!client_uid].
    - bitfield and have handling (skipped while
      [file_metadata_downloading] is set): records advertised chunks in
      [c.client_new_chunks] and sets [c.client_interesting] for chunks whose
      verification state is missing or partial.
    - choke and interest flags: [c.client_choked] and [c.client_interested]
      are updated; when the flag is cleared the code re-requests ranges up
      to [max_range_requests].
    - [Request] and [Cancel]: maintain [c.client_upload_requests]; a request
      longer than [max_request_len] closes the socket.
    - [Extended]: extended handshake and ut_metadata pieces, only acted on
      while [file.file_metadata_downloading] is set.
    - DHT port message: pings the peer through [BT_DHT.M.ping] and records a
      good node on reply.

    The final logging line reports an exception together with the message
    being handled.

    NOTE(review): several match arm headers and block delimiters of this
    definition are not visible in this listing, so the list above is limited
    to what the remaining lines show - confirm against the full file.

    @param c The client which sent us a message
    @param sock The socket used for this client
    @param msg The message sent by the client *)
987 and client_to_client
c sock
msg =
(* Debug trace: peer address, read-timeout values from [get_rtimeout] and
   the message rendered by [TcpMessages.to_string]. *)
988 if !verbose_msg_clients
then begin
989 let (ip,port
) = (TcpBufferedSocket.peer_addr sock
) in
990 let (timeout
, next
) = get_rtimeout sock
in
991 lprintf_nl
"CLIENT %d(%s:%d): (%d, %d,%d) Received %s"
992 (client_num
c) (Ip.to_string
ip) port
994 (int_of_float timeout
)
996 (TcpMessages.to_string
msg);
999 let file = c.client_file
in
1001 (* Sending the "Have" message was moved to bTGlobals so this is useless *)
1002 (* if c.client_blocks_sent != file.file_blocks_downloaded then begin
1006 | b :: tail when tail == c.client_blocks_sent ->
1007 c.client_blocks_sent <- list;
1008 let (num,_,_) = CommonSwarming.block_block b in
1009 send_client c (Have (Int64.of_int num))
1010 | _ :: tail -> iter tail
1012 iter file.file_blocks_downloaded
1017 | Piece
(num, offset
, s, pos
, len) ->
1018 (*A Piece message contains the data*)
1019 set_client_state
c (Connected_downloading
(file_num
file));
1020 (*flag it as a good client *)
1021 c.client_good
<- true;
1022 if file_state
file = FileDownloading
then begin
1023 let position = offset
++ file.file_piece_size
*.. num in
1024 let up = match c.client_uploader
with
1025 None
-> assert false
1027 let swarmer = CommonSwarming.uploader_swarmer
up in
1029 if !verbose_msg_clients
then
1030 (match c.client_ranges_sent
with
1031 [] -> lprintf_file_nl
(as_file
file) "EMPTY Ranges !!!"
1033 let (x
,y
) = CommonSwarming.range_range
r in
1034 lprintf_file_nl
(as_file
file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])"
1035 (show_client c) position len
1039 let old_downloaded =
1040 CommonSwarming.downloaded
swarmer in
1041 (* List.iter CommonSwarming.free_range c.client_ranges; *)
1042 CommonSwarming.received
up
1044 (* List.iter CommonSwarming.alloc_range c.client_ranges; *)
1045 let new_downloaded =
1046 CommonSwarming.downloaded
swarmer in
1048 (*Update rate and amount of data received from client*)
1049 count_download
c (new_downloaded -- old_downloaded);
1050 (* use len here with max_dr quickfix *)
1051 Rate.update
c.client_downloaded_rate ~amount
:len;
1052 (* count bytes downloaded from network for this file *)
1053 file.file_session_downloaded
<- file.file_session_downloaded
++ (Int64.of_int
len);
1054 if !verbose_msg_clients
then
1055 (match c.client_ranges_sent
with
1056 [] -> lprintf_file_nl
(as_file
file) "EMPTY Ranges !!!"
1058 let (x
,y
) = CommonSwarming.range_range
r in
1059 lprintf_file_nl
(as_file
file) "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
1062 (new_downloaded -- old_downloaded)
1065 (* changed 2.5.28 should have been done before !
1066 if new_downloaded <> old_downloaded then
1067 add_file_downloaded (as_file file)
1068 (new_downloaded -- old_downloaded); *)
1071 match c.client_ranges_sent
with
1074 (* CommonSwarming.free_range r; *)
1075 c.client_ranges_sent
<- tail
;
1077 get_from_client sock
c;
1079 (* Check if the client is still interesting for us... *)
1080 check_if_interesting
file c
1083 (* Disconnect if that is ourselves. *)
1084 c.client_uid
<- Sha1.direct_of_string
p;
1085 if not
(c.client_uid
= !!client_uid
) then
1087 let brand, release
= parse_software
p in
1088 c.client_brand
<- brand;
1089 c.client_release
<- release
;
1090 send_client
c Choke
;
1091 c.client_sent_choke
<- true;
1094 disconnect_client c Closed_by_user
1098 (*A bitfield is a summary of what a client have*)
1099 if !verbose_msg_clients
then
1100 lprintf_file_nl
(as_file
file) "Bitfield message, metadata state %B" c.client_file
.file_metadata_downloading
;
1101 if not
c.client_file
.file_metadata_downloading
then
1103 match c.client_file
.file_swarmer
with
1106 c.client_new_chunks
<- [];
1108 let npieces = CommonSwarming.partition_size
swarmer in
1109 let nbits = String.length
p * 8 in
1111 if nbits < npieces then begin
1112 lprintf_file_nl
(as_file
file) "Error: expected bitfield of atleast %d but got %d" npieces nbits;
1113 disconnect_client c (Closed_for_error
"Wrong bitfield length")
1116 let bitmap = CommonSwarming.chunks_verified_bitmap
swarmer in
1118 for i = 0 to npieces - 1 do
1119 if is_bit_set p i then begin
1120 c.client_new_chunks
<- i :: c.client_new_chunks
;
1121 match VB.get
bitmap i with
1122 | VB.State_missing
| VB.State_partial
->
1123 c.client_interesting
<- true
1124 | VB.State_complete
| VB.State_verified
-> ()
1128 update_client_bitmap
c;
1129 c.client_registered_bitfield
<- true;
1131 if c.client_interesting
then
1134 if !verbose_msg_clients
then
1135 lprintf_file_nl
(as_file
file) "New BitField Registered";
1137 (* for i = 1 to max_range_requests - List.length c.client_ranges do
1138 (try get_from_client sock c with _ -> ())
1144 (* Note: a bitfield must only be sent after the handshake and before everything else: NOT here *)
1147 (* A client can send a "Have" without sending a Bitfield *)
1148 if not
c.client_file
.file_metadata_downloading
then
1150 match c.client_file
.file_swarmer
with
1153 let n = Int64.to_int
n in
1154 let bitmap = CommonSwarming.chunks_verified_bitmap
swarmer in
1155 (* lprintf_nl "verified: %c;" (VB.state_to_char (VB.get bitmap n)); *)
1156 (* if the peer has a chunk we don't, tell him we're interested and update his bitmap *)
1157 match VB.get
bitmap n with
1158 | VB.State_missing
| VB.State_partial
->
1159 c.client_interesting
<- true;
1161 c.client_new_chunks
<- n :: c.client_new_chunks
;
1162 update_client_bitmap
c;
1163 | VB.State_complete
| VB.State_verified
-> ()
1166 match c.client_bitmap, c.client_uploader with
1167 Some bitmap, Some up ->
1168 let swarmer = CommonSwarming.uploader_swarmer up in
1169 let n = Int64.to_int n in
1170 if bitmap.[n] <> '1' then
1172 let verified = CommonSwarming.verified_bitmap swarmer in
1173 if verified.[n] < '2' then begin
1174 c.client_interesting <- true;
1176 c.client_new_chunks <- n :: c.client_new_chunks;
1177 if c.client_block = None then begin
1178 update_client_bitmap c;
1179 (* for i = 1 to max_range_requests -
1180 List.length c.client_ranges do
1181 (try get_from_client sock c with _ -> ())
1185 | None
, Some _
-> lprintf_nl
"no bitmap but client_uploader";
1186 | Some _
, None
->lprintf_nl
"bitmap but no client_uploader";
1187 | None
, None
-> lprintf_nl
"no bitmap no client_uploader";
1193 c.client_interested
<- true;
1197 set_client_state
(c) (Connected
(-1));
1198 (* remote peer will clear the list of range we sent *)
1200 match c.client_uploader
with
1202 (* Afaik this is no protocol violation and happens if the client
1203 didn't send a client bitmap after the handshake. *)
1204 if !verbose_msg_clients
then lprintf_file_nl
(as_file
file) "%s : Choke send, but no client bitmap"
1207 CommonSwarming.clear_uploader_intervals
up
1209 c.client_ranges_sent
<- [];
1210 c.client_range_waiting
<- None
;
1211 c.client_choked
<- true;
1215 c.client_interested
<- false;
1219 c.client_choked
<- false;
1220 (* remote peer cleared our request : re-request *)
1221 for i = 1 to max_range_requests -
1222 List.length
c.client_ranges_sent
do
1223 (try get_from_client sock
c with _
-> ())
1227 | Request
(n, pos
, len) ->
1228 if len > max_request_len
then begin
1229 close sock
(Closed_for_error
"Request longer than 1<<16");
1233 if !CommonGlobals.has_upload
= 0 then
1235 if client_has_a_slot
(as_client
c) then
1237 (* lprintf "Received request for upload\n"; *)
1238 (match c.client_upload_requests
with
1240 CommonUploads.ready_for_upload
(as_client
c);
1242 c.client_upload_requests
<- c.client_upload_requests
@ [n,pos
,len];
1243 let file = c.client_file
in
1244 match file.file_shared
with
1248 s.impl_shared_requests
<- s.impl_shared_requests
+ 1;
1249 shared_must_update
(as_shared
s)
1254 send_client
c Choke
;
1255 c.client_sent_choke
<- true;
1256 c.client_upload_requests
<- [];
1261 (* We don't 'generate' a Ping message on a Ping. *)
1263 | Cancel
(n, pos
, len) ->
1264 (* if we receive a cancel message from a peer, remove request *)
1265 if client_has_a_slot
(as_client
c) then
1266 c.client_upload_requests
<- List2.remove_first
(n, pos
, len) c.client_upload_requests
1268 if !verbose_msg_clients
then
1269 lprintf_file_nl
(as_file
file) "Error: received cancel request but client has no slot"
1271 | Extended
(extmsg
, payload
) ->
1272 (* extmsg: 0 handshake, N other message previously declared in handshake.
1273 atm ignore extended messages if were not currently in metadata state.
1274 TODO when were not in metadata state we should be friendly and answer metadata requests
1276 let module B
= Bencode
in
1277 if file.file_metadata_downloading
then begin
1278 (* since we got at least one extended handshake from the peer, it should be okay to
1279 send a handshake back now. we need to send it so the remote client knows how
1280 to send us messages back.
1281 this should of course be moved but I dont know where yet.
1282 also we shouldnt send more than one handshake of course...
1284 if !verbose_msg_clients
then
1285 lprintf_file_nl
(as_file
file) "Got extended msg: %d %s" extmsg
(String.escaped payload
);
1289 if !verbose_msg_clients
then
1290 lprintf_file_nl
(as_file
file) "Got extended handshake";
1291 let dict = Bencode.decode payload
in begin
1293 B.Dictionary list
->
1294 List.iter (fun (key
,value) ->
1295 match key
, value with
1296 | "metadata_size", B.Int
n ->
1297 if !verbose_msg_clients
then
1298 lprintf_file_nl
(as_file
file) "Got metadata size %Ld" n;
1299 c.client_file
.file_metadata_size
<- n;
1300 | "m", B.Dictionary mdict
->
1301 if !verbose_msg_clients
then
1302 lprintf_file_nl
(as_file
file) "Got meta dict";
1303 List.iter (fun (key
,value) ->
1304 match key
, value with
1305 "ut_metadata", B.Int
n ->
1306 if !verbose_msg_clients
then
1307 lprintf_file_nl
(as_file
file) "ut_metadata is %Ld " n;
1308 c.client_ut_metadata_msg
<- n;
1314 (* okay so now we know what to ask for, so ask for metadata now
1315 since metadata can be larger than 16k which is the limit, the transfer needs to be chunked, so
1316 it is not really right to make the query here. but its a start.
1317 also im just asking for piece 0.
1318 (we should also check that we actually got the metadata info before proceeding)
1320 send_extended_handshake c file;
1321 send_extended_piece_request c c.client_file
.file_metadata_piece
file;
1324 | 0x01 -> (* ut_metadata is 1 because we asked it to be 1 in the handshake
1325 the msg_type is probably
1327 but could be 0 for request(unlikely since we didnt advertise we had the meta)
1328 2 for reject, also unlikely since peers shouldnt advertise if they dont have(but will need handling in the end)
1330 {'msg_type': 1, 'piece': 0, 'total_size': 3425}
1331 after the dict comes the actual piece
1333 if !verbose_msg_clients
then
1334 lprintf_file_nl
(as_file
file) "Got extended ut_metadata message";
1335 let msgtype = ref 0L in begin
1337 match B.decode payload
with
1338 B.Dictionary list
->
1339 List.iter (fun (key
,value) ->
1340 match key
, value with
1341 "msg_type", B.Int
n ->
1342 if !verbose_msg_clients
then
1343 lprintf_file_nl
(as_file
file) "msg_type %Ld" n;
1345 | "piece", B.Int
n ->
1346 if !verbose_msg_clients
then
1347 lprintf_file_nl
(as_file
file) "piece %Ld" n;
1348 file.file_metadata_piece
<- n;
1349 | "total_size", B.Int
n ->
1350 if !verbose_msg_clients
then
1351 lprintf_file_nl
(as_file
file) "total_size %Ld" n; (* should always be the same as received in the initial handshake i suppose *)
1358 let last_piece_index = (Int64.div
file.file_metadata_size
16384L) in
1359 if !verbose_msg_clients
then
1360 lprintf_file_nl
(as_file
file) "handling metadata piece %Ld of %Ld"
1361 file.file_metadata_piece
1363 (* store the metadata piece in memory *)
1364 file.file_metadata_chunks
.(1 + (Int64.to_int
file.file_metadata_piece
)) <- payload
;
1365 (* possibly write metadata to disk *)
1366 if file.file_metadata_piece
>=
1367 (Int64.div
file.file_metadata_size
16384L) then begin
1368 if !verbose_msg_clients
then
1369 lprintf_file_nl
(as_file
file) "this was the last piece";
1370 (* here we should simply delete the current download, and wait for mld to pick up the new torrent file *)
1371 (* the entire payload is currently in the array, TODO *)
1372 let newtorrentfile = (Printf.sprintf
"%s/BT-%s.torrent"
1373 (Filename2.temp_dir_name
())
1374 (Sha1.to_string
file.file_id
)) in
1375 let fd = Unix32.create_rw
newtorrentfile in
1376 let fileindex = ref 0L in
1378 (* the ee is so we can use the same method to find the
1379 start of the payload for the real payloads as well as the synthetic ones
1381 file.file_metadata_chunks
.(0) <- "eed4:info";
1382 file.file_metadata_chunks
.(2 + Int64.to_int
last_piece_index) <- "eee";
1384 Array.iteri
(fun index
chunk ->
1385 (* regexp ee is a fugly way to find the end of the 1st dict before the real payload *)
1386 let metaindex = (2 + (Str.search_forward
(Str.regexp_string
"ee") chunk 0 )) in
1387 let chunklength = ((String.length
chunk) - metaindex) in
1388 Unix32.write
fd !fileindex chunk
1391 fileindex := Int64.add
!fileindex (Int64.of_int
chunklength);
1393 ) file.file_metadata_chunks
;
1395 (* TODO ignoring errors for now, the array isnt really set up right anyway yet *)
1397 lprintf_file_nl (as_file file) "Error %s saving metadata"
1398 (Printexc2.to_string e)
1401 (* Yay, now the new torrent is on disk! amazing! However, now we need to kill the dummy torrent
1402 and restart it with the fresh real torrent *)
1404 (* it seems we need to use the dynamic interface... *)
1406 lprintf_file_nl
(as_file
file) "cancelling metadata download ";
1407 let owner = file.file_file
.impl_file_owner
in
1408 let group = file.file_file
.impl_file_group
in begin
1409 CommonInteractive.file_cancel
(as_file
file) owner ;
1410 (* hack_op_file_cancel c.client_file; *)
1412 lprintf_file_nl
(as_file
file) "starting download from metadata torrent %s" newtorrentfile ;
1413 ignore
(CommonNetwork.network_parse_url
BTGlobals.network
newtorrentfile owner group);
1415 (try Sys.remove
newtorrentfile with _
-> ())
1420 (* now ask for the next metadata piece, if any *)
1421 let nextpiece = (Int64.succ
file.file_metadata_piece
) in begin
1422 if !verbose_msg_clients
then
1423 lprintf_file_nl
(as_file
file) "asking for the next piece %Ld" nextpiece;
1424 send_extended_piece_request c nextpiece file;
1428 if !verbose_msg_clients
then
1429 lprintf_file_nl
(as_file
file) "unmatched extended subtype" ;
1433 if !verbose_msg_clients
then
1434 lprintf_file_nl
(as_file
file) "Got extended other msg ";
1440 if !verbose_msg_clients
then
1441 lprintf_file_nl
(as_file
file) "Received DHT PORT when DHT is disabled. From %s" (show_client c)
1443 BT_DHT.M.ping dht
(fst
c.client_host
, port
) begin function
1446 lprintf_file_nl
(as_file
file) "Peer %s didn't reply to DHT ping on port %d" (show_client c) port
1448 BT_DHT.update dht
Kademlia.Good id addr
1452 lprintf_file_nl
(as_file
file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e
) (TcpMessages.to_string
msg)
1455 (** The function used to connect to a client.
1456 The connection is not immediately initiated. It will
1457 be put in a fifo and dequeued according to
1458 !!max_connections_per_second. (@see commonGlobals.ml)
1459 @param c The client we must connect
(** Schedule an outgoing connection to peer [c].

    The work is attempted only when [can_open_connection connection_manager]
    holds and the address check through [!Ip.banned] passes; a blocked
    address is logged when [!verbose_connect] is set. The connection itself
    is made inside the callback handed to [add_pending_connection]: it
    calls [connect], installs handlers that close the socket on lifetime or
    read timeout and disconnect the client when the socket is closed, then
    sends the handshake with [send_init] followed by
    [send_extended_handshake], and finally installs [client_parse_header] as
    the reader through [set_bt_sock]. An exception raised while connecting
    is logged and the client is disconnected with [Closed_for_exception].
    The token returned for the pending connection is stored in
    [c.client_sock] as [ConnectionWaiting].

    NOTE(review): some match arms and block delimiters are not visible in
    this listing - confirm details against the full file.

    @param c the client to connect to *)
1461 let connect_client c =
1462 if can_open_connection connection_manager
&&
1463 (let (ip,port
) = c.client_host
in
1464 match !Ip.banned
(ip, c.client_country_code
) with
1467 if !verbose_connect
then
1468 lprintf_nl
"%s:%d (%s), blocked: %s"
1469 (Ip.to_string
ip) port
1470 (fst
(Geoip.get_country_code_name
c.client_country_code
))
1474 match c.client_sock
with
1478 add_pending_connection connection_manager
(fun token ->
1480 if !verbose_msg_clients
then
1481 lprintf_nl
"CLIENT %d: connect_client" (client_num
c);
1482 let (ip,port
) = c.client_host
in
1483 if !verbose_msg_clients
then
1484 lprintf_nl
"connecting %s:%d" (Ip.to_string
ip) port
;
1485 connection_try
c.client_connection_control
;
1487 let sock = connect
token "bittorrent download"
1488 (Ip.to_inet_addr
ip) port
1491 BASIC_EVENT LTIMEOUT
->
1492 if !verbose_msg_clients
then
1493 lprintf_nl
"CLIENT %d: LIFETIME" (client_num
c);
1494 close
sock Closed_for_timeout
1495 | BASIC_EVENT RTIMEOUT
->
1496 if !verbose_msg_clients
then
1497 lprintf_nl
"CLIENT %d: RTIMEOUT (%d)" (client_num
c)
1500 close
sock Closed_for_timeout
1501 | BASIC_EVENT
(CLOSED
r) ->
1503 match c.client_sock
with
1504 | Connection
s when s == sock ->
1505 disconnect_client c r
(* Record the live socket on the client, then configure it: lifetime,
   read/write bandwidth controllers and read timeout. *)
1511 c.client_sock
<- Connection
sock;
1512 set_lifetime
sock 600.;
1513 TcpBufferedSocket.set_read_controler
sock download_control
;
1514 TcpBufferedSocket.set_write_controler
sock upload_control
;
1515 TcpBufferedSocket.set_rtimeout
sock 30.;
1516 let file = c.client_file
in
1518 if !verbose_msg_clients
then
1519 lprintf_file_nl
(as_file
file) "READY TO DOWNLOAD FILE";
1521 send_init !!client_uid
file.file_id
sock;
1522 send_extended_handshake c file;
1524 (* Fabrice: Initialize the client bitmap and uploader fields to <> None *)
1525 update_client_bitmap
c;
1526 (* (try get_from_client sock c with _ -> ());*)
1528 (*We 'hook' the client_parse_header function to the socket
1529 This function will then be called when the first message will
1532 set_bt_sock
sock !verbose_msg_clients
1533 (BTHeader
(client_parse_header !counter (ref ((Some
c), c.client_country_code
)) true))
1536 lprintf_nl ~exn
"connecting to client";
1537 disconnect_client c (Closed_for_exception exn
)
1539 (*Since this is a pending connection put ConnectionWaiting
1543 c.client_sock
<- ConnectionWaiting
token
1547 (** The Listen function (very much like in C : TCP Socket Server).
1548 Monitors client connection to us.
(* Body of the listening-server setup; the line introducing the enclosing
   definition is not visible in this listing.

   Creates the TCP server socket bound to [!!client_bind_addr]. For each
   [TcpServerSocket.CONNECTION] event the peer address is checked with
   [can_open_connection] and [!Ip.banned] (a blocked address is logged when
   [!verbose_connect] is set). An accepted connection gets a buffered
   socket that is closed on read or lifetime timeout, bandwidth
   controllers, a closer that disconnects the associated client if its
   current socket is this one, a read timeout of 30., and
   [client_parse_header] installed through [set_bt_sock] with [false] as its
   last argument (the outgoing path in [connect_client] passes [true]).
   The server socket is stored in [listen_sock]; an exception during setup
   is logged when [!verbose_connect] is set. *)
1552 let s = TcpServerSocket.create
"bittorrent client server"
1553 (Ip.to_inet_addr
!!client_bind_addr
)
1557 TcpServerSocket.CONNECTION
(s,
1558 Unix.ADDR_INET
(from_ip
, from_port
)) ->
1559 (*Receiving an event TcpServerSocket.CONNECTION from
1560 the TcpServerSocket means that a new client try
1563 let ip = (Ip.of_inet_addr from_ip
) in
1564 let cc = Geoip.get_country_code_option
ip in
1565 if !verbose_sources
> 1 then lprintf_nl
"CONNECTION RECEIVED FROM %s"
1566 (Ip.to_string
(Ip.of_inet_addr from_ip
))
1568 (*Reject this connection if we don't want
1569 to bypass the max_connection parameter
1571 if can_open_connection connection_manager
&&
1572 (match !Ip.banned
(ip, cc) with
1575 if !verbose_connect
then
1576 lprintf_nl
"%s:%d (%s) blocked: %s"
1577 (Ip.to_string
ip) from_port
1578 (fst
(Geoip.get_country_code_name
cc))
1583 let token = create_token connection_manager
in
1584 let sock = TcpBufferedSocket.create
token
1585 "bittorrent client connection" s
1588 BASIC_EVENT
(RTIMEOUT
|LTIMEOUT
) ->
1589 (*monitor read and life timeout on client
1592 close
sock Closed_for_timeout
1596 TcpBufferedSocket.set_read_controler
sock download_control
;
1597 TcpBufferedSocket.set_write_controler
sock upload_control
;
1599 let c = ref (None
, cc) in
1600 TcpBufferedSocket.set_closer
sock (fun _
r ->
1603 match c.client_sock
with
1604 | Connection
s when s == sock ->
1605 disconnect_client c r
1610 set_rtimeout
sock 30.;
1612 (*Again : 'hook' client_parse_header to the socket*)
1613 set_bt_sock
sock !verbose_msg_clients
1614 (BTHeader
(client_parse_header !counter c false));
1617 (*don't forget to close the incoming sock if we can't
1618 open a new connection
1623 listen_sock
:= Some
s;
1626 if !verbose_connect
then
1627 lprintf_nl ~exn
"init bittorrent server"
1630 (** This function sends keepalive messages to all connected clients
1631 (and updates the socket lifetime)
(* Keepalive pass: walks files and, for each, a table of clients; for every
   client whose [client_sock] is a live [Connection] the socket lifetime is
   reset to 130.
   NOTE(review): the definition header, the statement that sends the
   keepalive message and the collections being iterated are not visible in
   this listing - confirm against the full file. *)
1634 List.iter (fun file ->
1635 Hashtbl.iter (fun _
c ->
1636 match c.client_sock
with
1637 | Connection
sock ->
1639 set_lifetime
sock 130.;
1647 (** For a given file, check whether each of its clients is connected.
1648 If a client is not connected, try to connect to it
(** Walk a table of clients for [file] and retry those without a live
    connection.

    A client whose [client_sock] is already a [Connection] is left alone.
    For the others the retry is gated by
    [connection_can_try c.client_connection_control]; the other branch calls
    [print_control]. An exception is logged when [!verbose_connect] is set.
    The last line registers this function in [resume_clients_hook].

    NOTE(review): the connect call itself and the table being iterated are
    not visible in this listing - presumably [connect_client c] over the
    clients of [file]; confirm against the full file. *)
1650 let resume_clients file =
1651 Hashtbl.iter (fun _
c ->
1653 match c.client_sock
with
1654 | Connection
sock -> ()
1655 (*i think this one is not really usefull for debugging
1656 lprintf_nl "[BT]: RESUME: Client is already connected"; *)
1659 (*test if we can connect client according to the its
1661 Currently the delay between two try is 120 seconds.
1663 if connection_can_try
c.client_connection_control
then
1666 print_control
c.client_connection_control
1669 if !verbose_connect
then
1670 lprintf_file_nl ~exn
(as_file
file) "resume_clients"
1674 resume_clients_hook := resume_clients
1676 (** Check if the value replied by the tracker is correct.
1677 @param key the name of the key
1678 @param n the value to check
1679 @param url Url of the tracker
1680 @param name the name of the file
(** [chk_keyval key n url name] converts the tracker reply value [n] (an
    [int64]) to an [int] and logs it when [!verbose_msg_clients] is set.
    A second log line reports an invalid value for [key].

    NOTE(review): the validity test and the returned value are not visible
    in this listing. Callers assign the result to [int] tracker fields, so an
    [int] is returned; which values count as invalid must be confirmed
    against the full file.

    @param key the name of the key
    @param n the value to check
    @param url Url of the tracker
    @param name the name of the file *)
1682 let chk_keyval key
n url name
=
(* [Int64.to_int] can lose high bits on platforms where [int] is narrower
   than 64 bits. *)
1683 let int_n = (Int64.to_int
n) in
1684 if !verbose_msg_clients
then
1685 lprintf_nl
"Reply from %s in file: %s has %s: %d" (show_tracker_url url
) name key
int_n;
1689 lprintf_nl
"Reply from %s in file: %s has an invalid %s value: %d" (show_tracker_url url
) name key
int_n;
(** [exn_catch f x] applies [f] to [x] and turns the outcome into a value:
    [`Ok r] when the call returns [r], [`Exn e] when it raises [e].
    Every exception is captured; none escapes this function. *)
let exn_catch f x =
  try
    let result = f x in
    `Ok result
  with exn -> `Exn exn
1695 (** In this function we interact with the tracker
1696 @param file The file for which we want some sources
1697 @param need_sources whether we need any sources
(** Contact the trackers of [file] through [connect_trackers], passing the
    reply parser defined in this body.

    The parser reads the reply with [File.to_string] and decodes it with
    [Bencode.decode]. An unreadable or empty reply, a decoding error, a
    reply that is not a dictionary, or a dictionary with a failure reason all
    go through [tracker_failed], which sets the tracker status to
    [Disabled_failure] with an incremented attempt count and logs it. For a
    good reply the intervals default to 600 and are then overridden by the
    reply; counters are stored on the tracker record; peers are registered
    with [maybe_new_client], but only when [need_sources] is true. After
    parsing, [resume_clients file] is called when [need_sources] is true.

    NOTE(review): the line introducing the parser function and a few match
    arms are not visible in this listing - confirm against the full file.

    @param file The file for which we want some sources
    @param need_sources whether we need any sources *)
1699 let talk_to_tracker file need_sources
=
1700 (* This is the function which will be called by the http client for parsing the response *)
1702 let tracker_url = show_tracker_url t
.tracker_url in
1703 let tracker_failed reason
=
1704 (* On failure, disable the tracker and count attempts (@see is_tracker_enabled) *)
1705 let num = match t
.tracker_status
with | Disabled_failure
(i,_
) -> i + 1 | _
-> 1 in
1706 t
.tracker_status
<- Disabled_failure
(num, intern reason
);
1707 lprintf_file_nl
(as_file
file) "Failure no. %d%s from Tracker %s for file: %s Reason: %s"
1709 (if !!tracker_retries
= 0 then "" else Printf.sprintf
"/%d" !!tracker_retries
)
1710 tracker_url file.file_name
(Charset.Locale.to_utf8 reason
)
1712 match exn_catch File.to_string filename
with
1713 | `Exn _
| `Ok
"" -> tracker_failed "empty reply"
1715 match exn_catch Bencode.decode
s with
1716 | `Exn exn
-> tracker_failed (Printf.sprintf
"wrong reply (%s)" (Printexc2.to_string exn
))
1717 | `Ok
(Dictionary list
) ->
(* Defaults, replaced further down when the reply carries the interval keys. *)
1718 t
.tracker_interval
<- 600;
1719 t
.tracker_min_interval
<- 600;
1720 if need_sources
then t
.tracker_last_clients_num
<- 0;
1721 let chk_keyval key
n = chk_keyval key
n t
.tracker_url file.file_name
in
1722 if not
(List.mem_assoc
"failure reason" list
) then
1724 begin match t
.tracker_status
with
1725 | Disabled_failure
(i, _
) ->
1726 lprintf_file_nl
(as_file
file) "Received good message from Tracker %s after %d bad attempts"
1729 (* Received good message from tracker after failures, re-enable tracker *)
1730 t
.tracker_status
<- Enabled
;
1732 List.iter (fun (key
,value) ->
1733 match (key
,value) with
1734 | "failure reason", String failure
-> tracker_failed failure
1735 | "warning message", String warning
->
1736 lprintf_file_nl
(as_file
file) "Warning from Tracker %s in file: %s Reason: %s"
1737 tracker_url file.file_name warning
1738 | "interval", Int
n ->
1739 t
.tracker_interval
<- chk_keyval key
n;
1740 (* in case we don't receive "min interval" *)
1741 if t
.tracker_min_interval
> t
.tracker_interval
then
1742 t
.tracker_min_interval
<- t
.tracker_interval
1743 | "min interval", Int
n ->
1744 t
.tracker_min_interval
<- chk_keyval key
n;
1745 (* make sure "min interval" is always < or equal to "interval" *)
1746 if t
.tracker_min_interval
> t
.tracker_interval
then
1747 t
.tracker_min_interval
<- t
.tracker_interval
1748 | "downloaded", Int
n ->
1749 t
.tracker_torrent_downloaded
<- chk_keyval key
n
1751 | "done peers", Int
n ->
1752 t
.tracker_torrent_complete
<- chk_keyval key
n
1753 | "incomplete", Int
n ->
1754 t
.tracker_torrent_incomplete
<- chk_keyval key
n;
1755 (* if complete > 0 and we receive incomplete we probably won't receive num_peers so we simulate it below *)
1756 if t
.tracker_torrent_complete
> 0 then
1757 t
.tracker_torrent_total_clients_count
<- (t
.tracker_torrent_complete
+ t
.tracker_torrent_incomplete
);
1758 | "num peers", Int
n ->
1759 t
.tracker_torrent_total_clients_count
<- chk_keyval key
n;
1760 (* if complete > 0 and we receive num_peers we probably won't receive incomplete so we simulate it below *)
1761 if t
.tracker_torrent_complete
> 0 then
1762 t
.tracker_torrent_incomplete
<- (t
.tracker_torrent_total_clients_count
- t
.tracker_torrent_complete
);
1764 t
.tracker_torrent_last_dl_req
<- chk_keyval key
n
1765 | "key", String
n ->
1767 if !verbose_msg_clients
then
1768 lprintf_file_nl
(as_file
file) "%s in file: %s has key: %s" tracker_url file.file_name
n
1769 | "tracker id", String
n ->
1771 if !verbose_msg_clients
then
1772 lprintf_file_nl
(as_file
file) "%s in file: %s has tracker id %s" tracker_url file.file_name
n
(* Peer list given as a list of dictionaries, one per peer. *)
1774 | "peers", List list
->
1775 if need_sources
then
1778 | Dictionary list
->
1779 let peer_id = ref Sha1.null
in
1780 let peer_ip = ref Ip.null
in
1785 "peer id", String id
->
1786 peer_id := Sha1.direct_of_string id
;
1787 | "ip", String
ip ->
1788 peer_ip := Ip.of_string
ip
1790 port := Int64.to_int
p
1794 t
.tracker_last_clients_num
<- t
.tracker_last_clients_num
+ 1;
1795 maybe_new_client file !peer_id !peer_ip !port
(* Peer list given as one string: entries are read 6 bytes at a time, four
   address bytes followed by a two-byte port; no peer id is available, so
   [Sha1.null] is used. *)
1799 | "peers", String
p ->
1800 let rec iter_comp s pos l
=
1802 let ip = Ip.of_ints
(get_uint8
s pos
,get_uint8
s (pos
+1),
1803 get_uint8
s (pos
+2),get_uint8
s (pos
+3))
1804 and port = get_int16
s (pos
+4)
1806 t
.tracker_last_clients_num
<- t
.tracker_last_clients_num
+ 1;
1807 maybe_new_client file Sha1.null
ip port;
1809 iter_comp s (pos
+6) l
1811 if need_sources
then
1812 iter_comp p 0 (String.length
p)
1813 | "private", Int
n -> ()
1814 (* TODO: if set to 1, disable peer exchange *)
1816 (* TODO IPv6 support required *)
1817 | key
, _
-> lprintf_file_nl
(as_file
file) "received unknown entry in answer from tracker: %s : %s" key
(Bencode.print
value)
1819 (*Now, that we have added new clients to a file, it's time
1820 to connect to them*)
1821 if !verbose_sources
> 0 then
1822 lprintf_file_nl
(as_file
file) "talk_to_tracker: got %i source(s) for file %s"
1823 t
.tracker_last_clients_num
file.file_name
;
1824 if need_sources
then resume_clients file
1826 | _
-> tracker_failed "wrong reply (value)"
(* The event sent to the tracker is the empty string once the file is
   already marked as connected to its tracker. *)
1829 if file.file_tracker_connected
then ""
1832 connect_trackers file event need_sources
f
(** Announce [file] on the DHT and optionally collect peers from it.

    Records the announce time in [file.file_last_dht_announce], then queries
    peers for [file.file_id] with [BT_DHT.query_peers]. For every answering
    node it announces our [!!client_port] using the token received from that
    node (a failed announce is logged when [!verbose] is set), and when
    [need_sources] is true each returned peer address is registered with
    [maybe_new_client] using [Sha1.null] as peer id.

    NOTE(review): the lines binding [dht] are not visible in this listing -
    presumably nothing is done when the DHT is disabled; confirm. *)
1834 let talk_to_dht file need_sources
=
1838 if !verbose
then lprintf_file_nl
(as_file
file) "DHT announce";
1839 file.file_last_dht_announce
<- last_time
();
1840 BT_DHT.query_peers dht
file.file_id
(fun (_
,addr
as node
) token peers
->
1841 BT_DHT.M.announce dht addr
!!client_port
token file.file_id
(fun _
-> ()) ~kerr
:(fun () ->
1842 if !verbose
then lprintf_file_nl
(as_file
file) "DHT announce to %s failed" (BT_DHT.show_node node
));
1843 if need_sources
then
1845 List.iter (fun (ip,port) -> maybe_new_client file Sha1.null
ip port) peers
;
(** Announce [file] to the DHT when an announce is due, then to its trackers.

    This definition is not recursive: the call in the last line refers to the
    [talk_to_tracker] defined earlier in the file, which this one shadows for
    all later code.

    A DHT announce is made when more than [14 * 60] time units, as counted by
    [last_time ()], have elapsed since [file.file_last_dht_announce] and the
    file is not private.

    @param file the file to announce
    @param need_sources whether new sources should be collected *)
let talk_to_tracker file need_sources =
  let dht_announce_period = 14 * 60 in
  let dht_announce_due =
    file.file_last_dht_announce + dht_announce_period < last_time ()
  in
  if dht_announce_due && not file.file_private then
    talk_to_dht file need_sources;
  talk_to_tracker file need_sources
1853 (** Check to see if file is finished, if not
1854 try to get sources for it
(** Periodic pass over the files of this network.

    For each file with a swarmer, [check_finished] is attempted (exceptions
    ignored) and the file state decides what happens next: one arm logs a
    recover-downloading message and calls [talk_to_tracker file true],
    another logs a recover-shared message and calls
    [talk_to_tracker file false]; a paused file is skipped silently and any
    other state is only logged when [!verbose] is set. Exceptions raised by
    the tracker calls are ignored.

    NOTE(review): the file list and some arm headers are not visible in this
    listing - confirm against the full file. *)
1856 let recover_files () =
1857 if !verbose_share
then
1858 lprintf_nl
"recover_files";
1859 List.iter (fun file ->
1860 match file.file_swarmer
with
1863 (try check_finished swarmer file with e
-> ());
1864 match file_state
file with
1866 if !verbose_share
then
1867 lprintf_file_nl
(as_file
file) "recover downloading";
1868 (try talk_to_tracker file true with _
-> ())
1870 if !verbose_share
then
1871 lprintf_file_nl
(as_file
file) "recover shared";
1872 (try talk_to_tracker file false with _
-> ())
1873 | FilePaused
-> () (*when we are paused we do nothing, not even logging this vvvv*)
1875 | s -> if !verbose
then lprintf_file_nl
(as_file
file) "recover: Other state %s!!" (string_of_state
s)
(** Scratch buffer of 100000 bytes, allocated once at module initialisation
    and reused by [iter_upload] for every piece read from disk and sent to a
    peer. Its contents are uninitialised until written. *)
let upload_buffer = String.create 100_000
1882 Send a Piece message
1883 for one of the pending requests of the client
1884 @param sock The socket of the client
(** Serve the pending upload requests of client [c] on [sock].

    The head request [(num, pos, len)] is examined: a request of length zero
    is dropped. Otherwise, while [c.client_allowed_to_write] is not negative,
    the request is removed from the queue, [len] is subtracted from the
    allowance, [len] bytes are read from the file at offset
    [pos + file_piece_size * num] into [upload_buffer], upload counters and
    rates are updated and a [Piece] message carrying the buffer is sent. An
    exception is logged under the name of this function. When the allowance
    is exhausted the client is put back in line with [ready_for_upload].

    NOTE(review): the recursive calls and some delimiters are not visible in
    this listing - confirm against the full file.

    @param sock The socket of the client
    @param c The client *)
1887 let rec iter_upload sock c =
1888 match c.client_upload_requests
with
1890 | (num, pos
, len) :: tail
->
1891 if len = zero
then begin
1892 c.client_upload_requests
<- tail
;
1895 if c.client_allowed_to_write
>= 0L then begin
1897 c.client_upload_requests
<- tail
;
1899 let file = c.client_file
in
(* Absolute file offset of the requested range. *)
1900 let offset = pos
++ file.file_piece_size
*.. num in
1901 c.client_allowed_to_write
<- c.client_allowed_to_write
-- len;
(* From here on [len] is an [int] and shadows the [int64] request length. *)
1903 let len = Int64.to_int
len in
1904 (* lprintf "Unix32.read: offset %Ld len %d\n" offset len; *)
1905 Unix32.read
(file_fd
file) offset upload_buffer 0 len;
1906 (* update upload rate from len bytes *)
1907 Rate.update
c.client_upload_rate ~amount
:len;
(* NOTE(review): the download rate is updated without an amount here -
   presumably to refresh it; confirm against [Rate.update]. *)
1908 Rate.update
c.client_downloaded_rate
;
1909 file.file_uploaded
<- file.file_uploaded
++ (Int64.of_int
len);
1910 file.file_session_uploaded
<- file.file_session_uploaded
++ (Int64.of_int
len);
1913 count_filerequest
c;
1914 match file.file_shared
with
1918 s.impl_shared_uploaded
<- file.file_uploaded
;
1919 shared_must_update
(as_shared
s)
1922 (* lprintf "sending piece\n"; *)
1923 send_client
c (Piece
(num, pos
, upload_buffer, 0, len));
1927 lprintf_nl ~exn
"iter_upload"
1930 (* lprintf "client is waiting for another piece\n"; *)
1931 ready_for_upload
(as_client
c)
1936 In this function we check if we can send bytes (according
1937 to bandwidth control), if we can, call iter_upload to
1938 send a Piece message
1939 @param c the client to which we can send some bytes
1940 @param allowed the amount of bytes we can send to client
(** Bandwidth callback: [allowed] more bytes may be sent to client [c].

    Runs only when [c] has a connected socket ([do_if_connected]). The new
    allowance is [c.client_allowed_to_write] plus [allowed]; when [allowed]
    is positive and [can_write_len] accepts that many bytes on the socket,
    the bandwidth is consumed through [CommonUploads.consume_bandwidth] and
    the allowance is stored on the client.

    NOTE(review): the match arms on [c.client_upload_requests] and the call
    that actually sends data are not visible in this listing - confirm
    against the full file.

    @param c the client to which we can send some bytes
    @param allowed the amount of bytes we can send to client *)
1942 let client_can_upload c allowed
=
1943 (* lprintf "allowed to upload %d\n" allowed; *)
1944 do_if_connected
c.client_sock
(fun sock ->
1945 match c.client_upload_requests
with
1948 let new_allowed_to_write =
1949 c.client_allowed_to_write
++ (Int64.of_int allowed
) in
1950 if allowed
> 0 && can_write_len
sock
1951 (Int64.to_int
new_allowed_to_write)
1953 CommonUploads.consume_bandwidth allowed
;
1954 c.client_allowed_to_write
<- new_allowed_to_write;
(** Resume [file]: re-enable its trackers and contact them.

    Over [file.file_trackers], a tracker in state [Disabled_failure] or
    [Disabled] is set back to [Enabled]; trackers that are [Enabled] or
    [Disabled_mld] are left unchanged. Then [talk_to_tracker file true] is
    called, ignoring any exception it raises.

    NOTE(review): the line opening the iteration over the trackers is not
    visible in this listing. *)
1959 let file_resume file =
1961 match t
.tracker_status
with
1962 | Enabled
| Disabled_mld
_ -> ()
1963 | Disabled_failure
_ | Disabled
_ -> t
.tracker_status
<- Enabled
1964 ) file.file_trackers
;
1965 (try talk_to_tracker file true with _ -> ())
1970 Send info to tracker when stopping a file.
1971 @param file the file we want to stop
(** Tell the trackers that [file] is being stopped.

    Only acts when [file.file_tracker_connected] is set: calls
    [connect_trackers] with the stopped event and [false] for the
    need-sources argument. The reply callback logs the reply and clears
    [file.file_tracker_connected].

    NOTE(review): any code following the [connect_trackers] call is not
    visible in this listing.

    @param file the file we want to stop *)
1973 let file_stop file =
1974 if file.file_tracker_connected
then
1976 connect_trackers file "stopped" false (fun _ _ ->
1977 lprintf_file_nl
(as_file
file) "Tracker return: stopped %s" file.file_name
;
1978 file.file_tracker_connected
<- false)
1985 client_ops
.op_client_can_upload
<- client_can_upload;
1986 file_ops
.op_file_resume
<- file_resume;
1987 file_ops
.op_file_recover
<- file_resume;
1988 file_ops
.op_file_pause
<- (fun file ->
1989 Hashtbl.iter (fun _ c ->
1990 match c.client_sock
with
1991 Connection
sock -> close
sock Closed_by_user
1993 ) file.file_clients
;
1994 (*When a file is paused we consider it is stopped*)
1997 file_ops
.op_file_queue
<- file_ops
.op_file_pause
;
1998 client_ops
.op_client_enter_upload_queue
<- (fun c ->
1999 if !verbose_msg_clients
then
2000 lprintf_nl
"Client %d: client_enter_upload_queue" (client_num
c);
2001 ready_for_upload
(as_client
c));
2002 network
.op_network_connected_servers
<- (fun _ -> []);