1 (* Copyright 2001, 2002 b52_simon :), b8_bavard, b8_fee_carabine, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 (** Functions used in client<->client communication
22 and also client<->tracker
25 (** A peer (or client) is always a remote peer in this file.
26 A Piece is a portion of the file associated with a hash (sha1).
27 In mldonkey a piece is referred to as a block inside the swarming system.
28 A SubPiece is a portion of a piece (without hash) which can be
29 sent/downloaded to/from a peer.
30 In mldonkey a SubPiece is referred to as a range inside the swarming system.
31 @see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for some
32 unofficial (but more detailed) specs.
42 open TcpBufferedSocket
49 open CommonInteractive
51 open CommonComplexOptions
(* Short local alias for the verification bitmap module. *)
module VB = VerificationBitmap

(* HTTP status lines for a successful answer, in the bare and the HTTP/1.1
   spelling. NOTE(review): their users are not visible in this part of the
   file -- presumably compared against tracker replies; confirm. *)
let http_ok = "HTTP 200 OK"
let http11_ok = "HTTP/1.1 200 OK"

(* Upload scheduling state, both initially empty.
   [next_uploaders] receives the freshly chosen uploader set and
   [current_uploaders] is then overwritten with it (see recompute_uploaders). *)
let next_uploaders : BTTypes.client list ref = ref []
let current_uploaders : BTTypes.client list ref = ref []
77 (** Check that client is valid and record it *)
78 let maybe_new_client file id ip port
=
79 let cc = Geoip.get_country_code_option ip
in
83 && (match !Ip.banned
(ip
, cc) with
86 if !verbose_connect
then
87 lprintf_file_nl
(as_file file
) "%s:%d blocked: %s" (Ip.to_string ip
) port reason
;
90 ignore
(new_client file id
(ip
,port
) cc);
91 if !verbose_sources
> 1 then
92 lprintf_file_nl
(as_file file
) "Received %s:%d" (Ip.to_string ip
) port
(* Hook invoked with a file once a tracker exchange has delivered new sources
   (see its use in talk_to_udp_tracker). The initial value is a placeholder
   that fails an assertion when called; the real implementation is presumably
   installed elsewhere before first use -- TODO confirm. *)
let resume_clients_hook = ref (fun _file -> assert false)
99 (* open modules locally *)
(** Render a socket event as the name of its constructor, for log messages.
    A [CLOSED] event is followed by the textual form of its close reason.
    @param event The event to describe
    @return The constructor name as a string *)
let string_of_event event =
  match event with
  | BASIC_EVENT basic ->
    (* The inner match is delimited explicitly so that the outer arms
       below cannot be captured by it. *)
    begin match basic with
      | CAN_READ -> "CAN_READ"
      | CAN_WRITE -> "CAN_WRITE"
      | RTIMEOUT -> "RTIMEOUT"
      | WTIMEOUT -> "WTIMEOUT"
      | LTIMEOUT -> "LTIMEOUT"
      | CLOSED why -> "CLOSED " ^ string_of_reason why
    end
  | READ_DONE -> "READ_DONE"
  | WRITE_DONE -> "WRITE_DONE"
  | CAN_REFILL -> "CAN_REFILL"
115 (** Talk to a UDP tracker and parse its response.
116 Apart from the parsing, this should perform everything that
117 talk_to_tracker's inner function does. FIXME: refactor both.
119 Would it be better to create a single global UDP socket, use it for all
120 tracker requests and distinguish trackers by txn? FIXME?
122 let talk_to_udp_tracker host port args file t need_sources
=
124 lprintf_nl
"udpt start with %s:%d" host port
;
125 let addr = try (Unix.gethostbyname host
).Unix.h_addr_list
.(0) with exn
-> failwith
("failed to resolve " ^ host
) in
126 let ip = Ip.of_inet_addr
addr in
127 lprintf_nl
"udpt resolved to ip %s" (Ip.to_string
ip);
128 let socket = create
Unix.inet_addr_any
0 (fun sock event
->
129 lprintf_nl
"udpt got event %s for %s" (string_of_event event
) host
131 | WRITE_DONE
| CAN_REFILL
-> ()
132 | READ_DONE
-> assert false (* set_reader prevents this *)
133 | BASIC_EVENT x
-> match x
with
135 | CAN_READ
| CAN_WRITE
-> assert false (* udpSocket implementation prevents this *)
136 | LTIMEOUT
| WTIMEOUT
| RTIMEOUT
-> close sock
(Closed_for_error
"udpt timeout"))
138 BasicSocket.set_wtimeout
(sock
socket) 5.;
139 BasicSocket.set_rtimeout
(sock
socket) 5.;
140 let txn = Random.int32
Int32.max_int
in
141 lprintf_nl
"udpt txn %ld for %s" txn host
;
142 write
socket false (connect_request
txn) ip port
;
143 set_reader
socket (fun _
->
144 let p = read
socket in
145 let conn = connect_response
p.udp_content
txn in
146 lprintf_nl
"udpt connection_id %Ld for %s" conn host
;
147 let txn = Random.int32
Int32.max_int
in
148 lprintf_nl
"udpt txn' %ld for host %s" txn host
;
149 let int s
= Int64.of_string
(List.assoc s args
) in
150 let req = announce_request
conn txn
151 ~info_hash
:(List.assoc
"info_hash" args
)
152 ~peer_id
:(List.assoc
"peer_id" args
)
153 (int "downloaded",int "left",int "uploaded")
154 (match List.assoc
"event" args
with
159 | s
-> lprintf_nl
"udpt event %s? for %s" s host
; 0l)
160 ~numwant
:(try Int32.of_string
(List.assoc
"numwant" args
) with _
-> -1l)
161 (int_of_string
(List.assoc
"port" args
))
163 write
socket false req ip port
;
164 set_reader
socket (fun _
->
165 let p = read
socket in
167 t
.tracker_last_conn
<- last_time
();
168 file
.file_tracker_connected
<- true;
169 t
.tracker_interval
<- 600;
170 t
.tracker_min_interval
<- 600;
171 if need_sources
then t
.tracker_last_clients_num
<- 0;
173 let (interval
,clients
) = announce_response
p.udp_content
txn in
174 lprintf_nl
"udpt got interval %ld clients %d for host %s" interval
(List.length clients
) host
;
175 if interval
> 0l then
177 t
.tracker_interval
<- Int32.to_int interval
;
178 if t
.tracker_min_interval
> t
.tracker_interval
then
179 t
.tracker_min_interval
<- t
.tracker_interval
181 List.iter
(fun (ip'
,port
) ->
182 let ip = Ip.of_int64
(Int64.logand
0xFFFFFFFFL
(Int64.of_int32
ip'
)) in
183 lprintf_nl
"udpt got %s:%d" (Ip.to_string
ip) port
;
184 t
.tracker_last_clients_num
<- t
.tracker_last_clients_num
+ 1;
185 maybe_new_client file
Sha1.null
ip port
187 close
socket Closed_by_user
;
188 lprintf_nl
"udpt interact done for %s" host
;
189 if need_sources
then !resume_clients_hook file
193 lprintf_nl
"udpt interact exn %s" (Printexc2.to_string exn
)
198 In this function we connect to a tracker.
199 @param file The file concerned by the request
200 @param url Url of the tracker to connect
201 @param event Event (as a string) to send to the tracker :
202 can be 'completed' if the file is complete, 'started' for the first
203 connection to this tracker or 'stopped' for a clean stop of the file.
204 Everything else will be ok for a second connection to the tracker.
205 Be careful with the spelling of this event
206 @param f The function used to parse the result of the connection.
207 The function will get a file as an argument (@see talk_to_tracker
210 If we have fewer than !!ask_tracker_threshold sources
211 and if we respect the file_tracker_interval then
212 we actually ask the tracker for sources
214 let connect_trackers file event need_sources f
=
216 (* reset session statistics when sending 'started' event *)
217 if event
= "started" then
219 file
.file_session_uploaded
<- Int64.zero
;
220 file
.file_session_downloaded
<- Int64.zero
;
223 let args,must_check_delay
, left
=
225 match file
.file_swarmer
with
229 | "started" -> [("event", "started")],true,zero
230 | "stopped" -> [("event", "stopped")],false,zero
235 let local_downloaded = CommonSwarming.downloaded swarmer
in
236 let left = file_size file
-- local_downloaded in
238 | "completed" -> [("event", "completed")],false,zero
239 | "started" -> [("event", "started")],true,left
240 | "stopped" -> [("event", "stopped")],false,left
244 let args = ("no_peer_id", "1") :: ("compact", "1") :: args in
246 if not need_sources
then
247 ("numwant", "0") :: args
248 else if !!numwant
> -1 then
249 ("numwant", string_of_int
!!numwant
) :: args
253 let args = if !!send_key
then
254 ("key", Sha1.to_hexa
!!client_uid
) :: args else args
256 let args = if !!force_client_ip
then
257 ("ip", Ip.to_string
!!set_client_ip
) :: args else args
260 ("info_hash", Sha1.direct_to_string file
.file_id
) ::
261 ("peer_id", Sha1.direct_to_string
!!client_uid
) ::
262 ("port", string_of_int
!!client_port
) ::
263 ("uploaded", Int64.to_string file
.file_session_uploaded
) ::
264 ("downloaded", Int64.to_string file
.file_session_downloaded
) ::
265 ("left", Int64.to_string
left) ::
269 let enabled_trackers =
270 let enabled_trackers = List.filter
(fun t
-> tracker_is_enabled t
) file
.file_trackers
in
271 if enabled_trackers <> [] then enabled_trackers
273 (* if there is no tracker left, do something ? *)
274 if !verbose_msg_servers
then
275 lprintf_nl
"No trackers left for %s, reenabling all of them..." (file_best_name
(as_file file
));
277 match t
.tracker_status
with
278 (* only re-enable after normal error *)
279 | Disabled _
-> t
.tracker_status
<- Enabled
280 | _
-> ()) file
.file_trackers
;
281 let enabled_trackers = List.filter
(fun t
-> tracker_is_enabled t
) file
.file_trackers
in
282 if enabled_trackers = [] && (file_state file
) <> FilePaused
then
284 file_pause
(as_file file
) (CommonUserDb.admin_user
());
285 lprintf_file_nl
(as_file file
) "Paused %s, no usable trackers left" (file_best_name
(as_file file
))
292 (* if we have too few sources we may ask the tracker before the interval *)
293 if not must_check_delay
294 || not file
.file_tracker_connected
295 || t
.tracker_last_conn
+ t
.tracker_interval
< last_time
()
296 || ( file
.file_clients_num
< !!ask_tracker_threshold
297 && (file_state file
) == FileDownloading
298 && (if t
.tracker_min_interval
> !!min_tracker_reask_interval
then
299 t
.tracker_last_conn
+ t
.tracker_min_interval
< last_time
()
301 t
.tracker_last_conn
+ !!min_tracker_reask_interval
< last_time
() ))
304 (* if we already tried to connect but failed, disable tracker, but allow re-enabling *)
305 (* FIXME t.tracker_last_conn < 1 only at first connect, so later failures will stay undetected! *)
306 if file
.file_tracker_connected
&& t
.tracker_last_clients_num
= 0 && t
.tracker_last_conn
< 1 then
308 if !verbose_msg_servers
then
309 lprintf_nl
"Request error from tracker: disabling %s" (show_tracker_url t
.tracker_url
);
310 t
.tracker_status
<- Disabled
(intern
"MLDonkey: Request error from tracker")
312 (* Send request to tracker *)
314 let args = if String.length t
.tracker_id
> 0 then
315 ("trackerid", t
.tracker_id
) :: args else args
317 let args = if String.length t
.tracker_key
> 0 then
318 ("key", t
.tracker_key
) :: args else args
320 if !verbose_msg_servers
then
321 lprintf_nl
"connect_trackers: connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i numwant:%s file: %s"
322 (string_of_bool file
.file_tracker_connected
)
323 t
.tracker_id t
.tracker_key t
.tracker_last_clients_num
324 (t
.tracker_last_conn
- last_time
()) (try List.assoc
"numwant" args with _
-> "_") file
.file_name
;
326 match t
.tracker_url
with
328 let module H
= Http_client
in
331 H.req_url
= Url.of_string ~
args: args url
;
332 H.req_proxy
= !CommonOptions.http_proxy
;
333 H.req_user_agent
= get_user_agent
();
334 (* #4541 [egs] supports redirect *)
335 H.req_max_retry
= !!max_tracker_redirect
;
338 if !verbose_msg_servers
then
339 lprintf_nl
"Request sent to tracker %s for file: %s"
343 t
.tracker_last_conn
<- last_time
();
344 file
.file_tracker_connected
<- true;
346 | `Other url
-> assert false (* should have been disabled *)
347 | `Udp
(host
,port
) -> talk_to_udp_tracker host port
args file t need_sources
351 if !verbose_msg_servers
then
352 lprintf_nl
"Request NOT sent to tracker %s - next request in %ds for file: %s"
353 (show_tracker_url t
.tracker_url
) (t
.tracker_interval
- (last_time
() - t
.tracker_last_conn
)) file
.file_name
357 set_client_upload
(as_client c
) (as_file c
.client_file
);
358 set_client_has_a_slot
(as_client c
) NormalSlot
;
359 Rate.update_no_change c
.client_downloaded_rate
;
360 Rate.update_no_change c
.client_upload_rate
;
361 c
.client_last_optimist
<- last_time
();
362 client_enter_upload_queue
(as_client c
);
363 send_client c Unchoke
365 (** In this function we decide which peers will be
366 uploaders. We send a choke message to current uploaders
367 that are not in the next uploaders list. We send Unchoke
368 for clients that are in next list (and not in current)
370 let recompute_uploaders () =
371 if !verbose_upload
then lprintf_nl
"recompute_uploaders";
372 next_uploaders := choose_uploaders current_files
;
373 (*Send choke if a current_uploader is not in next_uploaders*)
374 List.iter
( fun c
-> if ((List.mem c
!next_uploaders)==false) then
376 set_client_has_a_slot
(as_client c
) NoSlot
;
377 (*we will let him finish his download and choke him on next_request*)
379 ) !current_uploaders;
381 (*don't send Choke if new uploader is already an uploaders *)
383 if not
(List.mem c
!current_uploaders) then start_upload c
385 current_uploaders := !next_uploaders
388 (****** Fabrice: why are clients which are disconnected removed ???
389 These clients might still be useful to reconnect to, no ? *)
392 (** This function is called when a client is disconnected
393 (be it by our side or its side).
394 A client which disconnects (even only one time) is discarded.
395 If it's an uploader which disconnects we recompute uploaders
396 (see recompute_uploaders) immediately.
397 @param c The client to disconnect
398 @param reason The reason for the disconnection (see in BasicSocket.ml)
400 let disconnect_client c reason
=
401 if !verbose_msg_clients
then
402 lprintf_nl
"Client %d: disconnected: %s" (client_num c
) (string_of_reason reason
);
404 match c
.client_sock
with
406 | ConnectionWaiting token
->
408 c
.client_sock
<- NoConnection
412 (* List.iter (fun r -> CommonSwarming.free_range r) c.client_ranges; *)
413 set_client_disconnected c reason
;
414 c
.client_session_downloaded
<- 0L;
415 c
.client_session_uploaded
<- 0L;
416 (try if c
.client_good
then count_seen c
with _
-> ());
417 (* this is not useful already done in the match
418 (try close sock reason with _ -> ()); *)
419 (*---------not needed ?? VvvvvV---------------
420 c.client_ranges <- [];
421 c.client_block <- None;
422 if not c.client_good then
423 connection_failed c.client_connection_control;
424 c.client_good <- false;
425 c.client_sock <- NoConnection;
426 c.client_chunks <- [];
427 c.client_allowed_to_write <- zero;
428 c.client_new_chunks <- [];
429 c.client_interesting <- false;
430 c.client_alrd_sent_interested <- false;
431 -------------------^^^^^--------------------*)
432 if (c
.client_registered_bitfield
) then
434 match c
.client_uploader
with
437 c
.client_uploader
<- None
;
438 (* If the client registered a bitfield then
439 we must unregister him to update the swarmer
440 (Useful for availability)
442 CommonSwarming.unregister_uploader up
443 (* c.client_registered_bitfield <- false;
444 for i = 0 to String.length c.client_bitmap - 1 do
445 c.client_bitmap.[0] <- '0';
448 (* Don't test whether the client has an upload slot, because
449 it no longer has one (it was removed earlier, in
450 set_client_disconnected c reason)
452 if (List.mem c
!current_uploaders) then
454 (*BTGlobals.remove_client*)
456 recompute_uploaders ();
464 (** Disconnect all clients of a file
465 @param file The file whose clients must all be disconnected
467 let disconnect_clients file
=
468 let must_keep = ref true in
469 (match file_state file
with
470 | FilePaused
| FileCancelled
-> must_keep:=false
473 Hashtbl.iter
(fun _ c
->
474 if not
( !must_keep && (client_has_a_slot
(as_client c
) || c
.client_interested
)) then
476 if !verbose_msg_clients
then
477 lprintf_file_nl
(as_file file
) "disconnect since download is finished";
478 disconnect_client c Closed_by_user
483 (** What to do when a file is finished
484 @param file the finished file
486 let download_finished file
=
487 if List.memq file
!current_files
then
489 connect_trackers file
"completed" false (fun _ _
->
490 lprintf_file_nl
(as_file file
) "Tracker return: completed %s" file
.file_name
;
491 ); (*must be called before swarmer gets removed from file*)
492 (*CommonComplexOptions.file_completed*)
493 file_completed
(as_file file
);
494 (* Remove the swarmer for this file as it is not useful anymore... *)
495 CommonSwarming.remove_swarmer file
.file_swarmer
;
496 file
.file_swarmer
<- None
;
497 (* At this point, the file state is FileDownloaded. We should not remove
498 the file, because we continue to upload. *)
(** Run the completion handling for a file once its swarmer reports the
    download as finished; otherwise do nothing.
    (Per the original note, a file counts as finished when all of its
    blocks are verified -- that test lives in [CommonSwarming.check_finished].)
    @param swarmer The swarmer queried for completion
    @param file The file handed to [download_finished] on completion *)
let check_finished swarmer file =
  match CommonSwarming.check_finished swarmer with
  | true -> download_finished file
  | false -> ()
(* Single-bit masks, most significant bit first:
   [| 128; 64; 32; 16; 8; 4; 2; 1 |].
   The bit helpers below index this table with [n land 7]. *)
let bits = Array.init 8 (fun shift -> 0x80 lsr shift)
512 (* Check/set bits in strings (bittorrent format) *)
515 (Char.code s
.[n
lsr 3]) land bits.(n
land 7) <> 0
519 s
.[i] <- Char.unsafe_chr
(Char.code s
.[i] lor bits.(n
land 7))
(* The official client seems to use max_range_request 5 and
   max_range_len 2^14. *)
(* Upper bound on the number of requests kept in the 'pipeline' of a peer
   (compared against the length of client_ranges_sent). *)
let max_range_requests : int = 5
524 (* How many bytes we can request in one Piece *)
(** A wrapper to send an Interested message to a client.
    The message is sent only if needed: the client must be flagged as
    interesting and must not have been sent Interested already.
    @param c The client to send Interested to *)
let send_interested c =
  if c.client_interesting && not c.client_alrd_sent_interested then begin
    (* Both statements must sit inside the conditional: without the explicit
       grouping, [if ... then a; b] parses as [(if ... then a); b] and the
       message would be sent on every call. *)
    c.client_alrd_sent_interested <- true;
    send_client c Interested
  end
539 (** Send a Bitfield message to a client.
540 @param c The client to send the Bitfield message
543 let send_bitfield c
=
544 send_client c
(BitField
546 match c
.client_file
.file_swarmer
with
548 (* This must be a seeded file... *)
549 if !verbose_download
then
550 lprintf_nl
"Sending completed verified bitmap";
551 let nchunks = Array.length c
.client_file
.file_chunks
in
552 let len = (nchunks+7)/8 in
553 let s = String.make
len '
\000'
in
554 for i = 0 to nchunks - 1 do
559 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer
in
560 if !verbose_download
then
561 lprintf_nl
"Sending verified bitmap: [%s]" (VB.to_string
bitmap);
562 let len = (VB.length
bitmap + 7)/8 in
563 let s = String.make
len '
\000'
in
565 if c
= VB.State_verified
then set_bit s i) bitmap;
(** Decode the capability flags a peer announces in the reserved bytes of
    its handshake and store them on the client record.
    Bytes 7, 5 and 0 of [rbits] are read, so indexing raises if the string
    is shorter than 8 bytes.
    @param rbits The reserved bytes as received in the handshake
    @param c The client whose extension flags are updated *)
let parse_reserved rbits c =
  (* True when any bit of [mask] is set in byte number [byte] of [rbits]. *)
  let flag ~byte ~mask = (Char.code rbits.[byte]) land mask <> 0 in
  (* Last reserved byte: DHT, cache extension, fast extension. *)
  c.client_dht <- flag ~byte:7 ~mask:0x01;
  c.client_cache_extension <- flag ~byte:7 ~mask:0x02;
  c.client_fast_extension <- flag ~byte:7 ~mask:0x04;
  (* Sixth reserved byte: uTorrent extension. *)
  c.client_utorrent_extension <- flag ~byte:5 ~mask:0x10;
  (* First reserved byte: Azureus messaging protocol. *)
  c.client_azureus_messaging_protocol <- flag ~byte:0 ~mask:0x80
583 (** This function is called to parse the first message that
585 @param counter client num
586 @param cc Expected client (probably useless now that we don't save any client)
587 @param init_sent A boolean to know if we sent this client the handshake message
588 @param gconn Don't know
589 @param sock The socket we use for this client
590 @param proto Unused (required by tuple type?)
591 @param file_id The file hash (sha1) of the file involved in this exchange
593 (* removed: @param peer_id The hash (sha1) of the client. (Should be checked)
595 let rec client_parse_header counter cc init_sent gconn sock
596 (proto
, rbits
, file_id
) =
598 set_lifetime sock
600.;
599 if !verbose_msg_clients
then
600 lprintf_nl
"client_parse_header %d" counter;
602 let file = Hashtbl.find files_by_uid file_id
in
603 if !verbose_msg_clients
then
604 lprintf_file_nl
(as_file
file) "file found";
605 let ccc, cc_country_code
= !cc in
609 let c = new_client
file Sha1.null
(TcpBufferedSocket.peer_addr sock
) cc_country_code
in
610 if !verbose_connect
then lprintf_file_nl
(as_file
file) "Client %d: incoming connection" (client_num
c);
611 cc := (Some
c), cc_country_code
;
614 (* Does it happen that this c was already used to connect successfully?
615 If yes then this must happen: *)
616 c.client_received_peer_id
<- false;
617 if cc_country_code
<> None
&& c.client_country_code
= None
then
618 c.client_country_code
<- cc_country_code
;
620 (* client could have had Sha1.null as peer_id/uid *)
621 (* this is to be done, later
622 if c.client_uid <> peer_id then
623 c.client_software <- (parse_software (Sha1.direct_to_string peer_id));
627 (* if c.client_uid <> peer_id then begin
628 lprintf "Unexpected client by UID\n";
629 let ccc = new_client file peer_id (TcpBufferedSocket.host sock) in
630 lprintf "CLIENT %d: testing instead of %d\n"
631 (client_num ccc) (client_num c);
632 (match ccc.client_sock with
634 lprintf_nl "[BT]: This client is already connected";
635 close sock (Closed_for_error "Already connected");
639 lprintf_nl "[BT]: Client %d: recovered by UID" (client_num ccc);
647 if !verbose_msg_clients
then begin
648 let (ip,port
) = c.client_host
in
649 lprintf_nl
"Client %d: Connected from %s:%d" (client_num
c)
650 (Ip.to_string
ip) port
;
653 parse_reserved rbits
c;
655 (match c.client_sock
with
657 if !verbose_msg_clients
then begin
658 let (ip,port
) = c.client_host
in
659 lprintf_nl
"No connection to client (%s:%d)!!!" (Ip.to_string
ip) port
;
661 c.client_sock
<- Connection sock
662 | ConnectionWaiting token
->
664 if !verbose_msg_clients
then
665 lprintf_nl
"Waiting for connection to client !!!";
666 c.client_sock
<- Connection sock
667 | Connection
s when s != sock
->
668 if !verbose_msg_clients
then
669 lprintf_nl
"CLIENT %d: IMMEDIATE RECONNECTION" (client_num
c);
670 disconnect_client c (Closed_for_error
"Reconnected");
671 c.client_sock
<- Connection sock
;
675 set_client_state
(c) (Connected
(-1));
676 if not init_sent
then
678 c.client_incoming
<- true;
679 send_init
!!client_uid file_id sock
;
681 connection_ok
c.client_connection_control
;
682 if !verbose_msg_clients
then
683 lprintf_nl
"file and client found";
684 (* if not c.client_incoming then *)
686 c.client_blocks_sent
<- file.file_blocks_downloaded
;
688 TODO !!! : send interested if and only if we are interested
689 -> we must receive at least the other peer's bitfield.
690 in common swarmer -> compare : partition -> partition -> bool
693 set_rtimeout sock
!!client_timeout
;
694 (* Once parsed successfully we define the function client_to_client
695 to be the function used when a message is read *)
696 gconn
.gconn_handler
<- Reader
(fun gconn sock
->
697 bt_handler
TcpMessages.parsing
(client_to_client
c) c sock
700 let b = TcpBufferedSocket.buf sock
in
701 (* The receive buffer is normally not empty now, lets parse the rest, most likely PeerID *)
703 ignore
(bt_handler
TcpMessages.parsing
(client_to_client
c) c sock
);
705 (* Some newer clients send more opcodes in their handshake packet, lets parse them now.
706 Using "while b.len <> 0 do ... done" is not possible here because libtorrent clients
707 send unparsable five extra bytes after their PeerID which would result into a loop *)
709 ignore
(bt_handler
TcpMessages.parsing
(client_to_client
c) c sock
);
713 let (ip,port
) = (TcpBufferedSocket.peer_addr sock
) in
714 if !verbose_unexpected_messages
then
715 lprintf_nl
"Client %s:%d requested a file that is not shared [%s]"
716 (Ip.to_string
ip) port
(Sha1.to_hexa file_id
)
718 lprintf_nl
"Exception %s in client_parse_header" (Printexc2.to_string e
);
719 close sock
(Closed_for_exception e
);
723 (** Update the bitmap of a client. Unclear if it is still useful.
724 @param c The client which we want to update.
726 and update_client_bitmap
c =
727 let file = c.client_file
in
729 let swarmer = match file.file_swarmer
with
731 | Some
swarmer -> swarmer
735 match c.client_uploader
with
737 let up = CommonSwarming.register_uploader
swarmer (as_client
c)
738 (AvailableIntervals
[]) in
739 c.client_uploader
<- Some
up;
745 let bitmap = match c.client_bitmap
with
747 let len = CommonSwarming.partition_size
swarmer in
748 let bitmap = Bitv.create
len false in
749 c.client_bitmap
<- Some
bitmap;
751 | Some
bitmap -> bitmap
754 if c.client_new_chunks
<> [] then begin
755 let chunks = c.client_new_chunks
in
756 c.client_new_chunks
<- [];
757 List.iter
(fun n
-> Bitv.set
bitmap n
true) chunks;
758 CommonSwarming.update_uploader_intervals
up (AvailableBitv
bitmap);
762 (** In this function we decide which piece we must request from client.
763 @param sock Socket of the client
766 and get_from_client sock
(c: client
) =
767 let file = c.client_file
in
768 (* Check if there's not enough requests in the 'pipeline'
769 and if a request can be send (not choked and file is downloading) *)
770 if List.length
c.client_ranges_sent
< max_range_requests
771 && file_state
file = FileDownloading
772 && (c.client_choked
== false)
774 (* num is the number of the piece, x and y are the position
775 of the subpiece in the piece(!), r is a (CommonSwarmer) range *)
777 let up = match c.client_uploader
with
780 let swarmer = CommonSwarming.uploader_swarmer
up in
786 if !verbose_msg_clients
then
787 lprintf_file_nl
(as_file
file) "CLIENT %d: Finding new range to send" (client_num
c);
789 if !verbose_swarming
then begin
790 lprintf_n
"Current download:\n Current chunks: ";
793 List.iter
(fun (x
,y
) -> lprintf
"%Ld-%Ld " x y
) c.client_chunks
794 with _
-> lprintf
"No Chunks";
798 lprintf_n
"Current ranges: ";
800 List.iter
(fun (p1
,p2
, r) ->
801 let (x
,y
) = CommonSwarming.range_range
r in
802 lprintf
"%Ld-%Ld[%Ld-%Ld] " p1 p2 x y
803 ) c.client_ranges_sent
;
805 match c.client_range_waiting
with
807 | Some
(x
,y
,r) -> lprintf
"Waiting %Ld-%Ld" x y
;
811 lprintf_n
"Current blocks: ";
813 match c.client_chunk
with
814 | None
-> lprintf
"none"
815 | Some
(chunk
, blocks
) -> List.iter
(fun b ->
816 CommonSwarming.print_block
b.up_block
) blocks
;
820 lprintf_file_nl
(as_file
file) "Finding Range:";
825 (*We must find a block to request first, and then
826 some range inside this block
831 match c.client_chunk
with
835 if !verbose_swarming
then lprintf_file_nl
(as_file
file) "No block";
836 update_client_bitmap
c;
837 (try CommonSwarming.verify_one_chunk
swarmer with _
-> ());
838 (*Find a free block in the swarmer*)
839 let chunk, blocks
= CommonSwarming.find_blocks
up in
840 if !verbose_swarming
then begin
841 lprintf_n
"Blocks Found: "; List.iter (fun b ->
842 CommonSwarming.print_block
b.up_block
) blocks
;
845 c.client_chunk
<- Some
(chunk, blocks
);
847 (*We put the found block in client_block to
848 request range in this block. (Useful for
849 not searching each time a new block)
854 | Some
(chunk, blocks
) ->
856 if !verbose_swarming
then begin
857 lprintf_n
"Current Blocks: "; List.iter (fun b ->
858 CommonSwarming.print_block
b.up_block
) blocks
;
863 (*Given a block find a range inside*)
865 match c.client_range_waiting
with
867 c.client_range_waiting
<- None
;
870 CommonSwarming.find_range
up (min max_range_len
file.file_piece_size
)
875 if y
-- x
> max_range_len
then begin
876 c.client_range_waiting
<- Some
(x
++ max_range_len
, y
, r);
877 (x
, x
++ max_range_len
, r)
882 c.client_ranges_sent
<- c.client_ranges_sent
@ [x
,y
, r];
883 (* CommonSwarming.alloc_range r; *)
885 (* naughty, naughty, was computing a block number instead of a chunk
886 number. Only matters with merged downloads, and even then other
887 clients didn't seem to care (?), so the bug remained hidden *)
888 if !verbose_swarming
then
889 lprintf_file_nl
(as_file
file) "Asking %d For Range %Ld-%Ld" chunk x y
;
891 chunk, x
-- file.file_piece_size
** Int64.of_int
chunk, y
-- x
, r
895 (*If we don't find a range to request inside the block,
896 iter to choose another block*)
897 if !verbose_swarming
then
898 lprintf_nl
"Could not find range in current block";
899 (* c.client_blocks <- List2.removeq b c.client_blocks; *)
901 c.client_chunk
<- None
;
909 (*If we don't find a block to request we can check if the
910 file is finished (if there's missing pieces we can't decide
911 that the file is finished because we didn't found
914 if !verbose_swarming
then
915 lprintf_nl
"Unable to get a block !!";
916 CommonSwarming.compute_bitmap
swarmer;
917 check_finished swarmer file;
921 send_client
c (Request
(num,x
,y
));
923 if !verbose_msg_clients
then
924 lprintf_file_nl
(as_file
file) "CLIENT %d: Asking %s For Range %Ld-%Ld"
925 (client_num
c) (Sha1.to_string
c.client_uid
) x y
928 if not
(CommonSwarming.check_finished swarmer) && !verbose_download
then
929 lprintf_file_nl
(as_file
file) "BTClient.get_from_client ERROR: can't find a block to download and file is not yet finished for file : %s..." file.file_name
932 (** In this function we match a message sent by a client
933 and react according to this message.
934 @param c The client which sent us a message
935 @param sock The socket used for this client
936 @param msg The message sent by the client
938 and client_to_client
c sock msg
=
939 if !verbose_msg_clients
then begin
940 let (ip,port
) = (TcpBufferedSocket.peer_addr sock
) in
941 let (timeout
, next
) = get_rtimeout sock
in
942 lprintf_nl
"CLIENT %d(%s:%d): (%d, %d,%d) Received %s"
943 (client_num
c) (Ip.to_string
ip) port
945 (int_of_float timeout
)
947 (TcpMessages.to_string msg
);
950 let file = c.client_file
in
952 (* Sending the "Have" message was moved to bTGlobals so this is useless *)
953 (* if c.client_blocks_sent != file.file_blocks_downloaded then begin
957 | b :: tail when tail == c.client_blocks_sent ->
958 c.client_blocks_sent <- list;
959 let (num,_,_) = CommonSwarming.block_block b in
960 send_client c (Have (Int64.of_int num))
961 | _ :: tail -> iter tail
963 iter file.file_blocks_downloaded
968 Piece
(num, offset
, s, pos
, len) ->
969 (*A Piece message contains the data*)
970 set_client_state
c (Connected_downloading
(file_num
file));
971 (*flag it as a good client *)
972 c.client_good
<- true;
973 if file_state
file = FileDownloading
then begin
974 let position = offset
++ file.file_piece_size
*.. num in
975 let up = match c.client_uploader
with
978 let swarmer = CommonSwarming.uploader_swarmer
up in
980 if !verbose_msg_clients
then
981 (match c.client_ranges_sent
with
982 [] -> lprintf_file_nl
(as_file
file) "EMPTY Ranges !!!"
984 let (x
,y
) = CommonSwarming.range_range
r in
985 lprintf_file_nl
(as_file
file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])"
986 (brand_to_string
c.client_brand
) position len
991 CommonSwarming.downloaded
swarmer in
992 (* List.iter CommonSwarming.free_range c.client_ranges; *)
993 CommonSwarming.received
up
995 (* List.iter CommonSwarming.alloc_range c.client_ranges; *)
997 CommonSwarming.downloaded
swarmer in
999 (*Update rate and amount of data received from client*)
1000 count_download
c (new_downloaded -- old_downloaded);
1001 (* use len here with max_dr quickfix *)
1002 Rate.update
c.client_downloaded_rate ~amount
:len;
1003 (* count bytes downloaded from network for this file *)
1004 file.file_session_downloaded
<- file.file_session_downloaded
++ (Int64.of_int
len);
1005 if !verbose_msg_clients
then
1006 (match c.client_ranges_sent
with
1007 [] -> lprintf_file_nl
(as_file
file) "EMPTY Ranges !!!"
1009 let (x
,y
) = CommonSwarming.range_range
r in
1010 lprintf_file_nl
(as_file
file) "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
1013 (new_downloaded -- old_downloaded)
1016 (* changed 2.5.28 should have been done before !
1017 if new_downloaded <> old_downloaded then
1018 add_file_downloaded (as_file file)
1019 (new_downloaded -- old_downloaded); *)
1022 match c.client_ranges_sent
with
1025 (* CommonSwarming.free_range r; *)
1026 c.client_ranges_sent
<- tail
;
1028 get_from_client sock
c;
1030 (* Check if the client is still interesting for us... *)
1031 check_if_interesting
file c
1034 (* Disconnect if that is ourselves. *)
1035 c.client_uid
<- Sha1.direct_of_string
p;
1036 if not
(c.client_uid
= !!client_uid
) then
1038 let brand, release
= parse_software
p in
1039 c.client_brand
<- brand;
1040 c.client_release
<- release
;
1041 send_client
c Choke
;
1042 c.client_sent_choke
<- true;
1045 disconnect_client c Closed_by_user
1049 (*A bitfield is a summary of what a client has*)
1051 match c.client_file
.file_swarmer
with
1054 c.client_new_chunks
<- [];
1056 let npieces = CommonSwarming.partition_size
swarmer in
1057 let nbits = String.length
p * 8 in
1059 if nbits < npieces then begin
1060 lprintf_file_nl
(as_file
file) "Error: expected bitfield of atleast %d but got %d" npieces nbits;
1061 disconnect_client c (Closed_for_error
"Wrong bitfield length")
1064 let bitmap = CommonSwarming.chunks_verified_bitmap
swarmer in
1066 for i = 0 to npieces - 1 do
1067 if is_bit_set p i then begin
1068 c.client_new_chunks
<- i :: c.client_new_chunks
;
1069 match VB.get
bitmap i with
1070 | VB.State_missing
| VB.State_partial
->
1071 c.client_interesting
<- true
1072 | VB.State_complete
| VB.State_verified
-> ()
1076 update_client_bitmap
c;
1077 c.client_registered_bitfield
<- true;
1079 if c.client_interesting
then
1082 if !verbose_msg_clients
then
1083 lprintf_file_nl
(as_file
file) "New BitField Registered";
1085 (* for i = 1 to max_range_requests - List.length c.client_ranges do
1086 (try get_from_client sock c with _ -> ())
1092 (* Note: a bitfield must only be sent after the handshake and before everything else: NOT here *)
1095 (* A client can send a "Have" without sending a Bitfield *)
1097 match c.client_file
.file_swarmer
with
1100 let n = Int64.to_int
n in
1101 let bitmap = CommonSwarming.chunks_verified_bitmap
swarmer in
1102 (* lprintf_nl "verified: %c;" (VB.state_to_char (VB.get bitmap n)); *)
1103 (* if the peer has a chunk we don't, tell him we're interested and update his bitmap *)
1104 match VB.get
bitmap n with
1105 | VB.State_missing
| VB.State_partial
->
1106 c.client_interesting
<- true;
1108 c.client_new_chunks
<- n :: c.client_new_chunks
;
1109 update_client_bitmap
c;
1110 | VB.State_complete
| VB.State_verified
-> ()
1113 match c.client_bitmap, c.client_uploader with
1114 Some bitmap, Some up ->
1115 let swarmer = CommonSwarming.uploader_swarmer up in
1116 let n = Int64.to_int n in
1117 if bitmap.[n] <> '1' then
1119 let verified = CommonSwarming.verified_bitmap swarmer in
1120 if verified.[n] < '2' then begin
1121 c.client_interesting <- true;
1123 c.client_new_chunks <- n :: c.client_new_chunks;
1124 if c.client_block = None then begin
1125 update_client_bitmap c;
1126 (* for i = 1 to max_range_requests -
1127 List.length c.client_ranges do
1128 (try get_from_client sock c with _ -> ())
1132 | None
, Some _
-> lprintf_nl
"no bitmap but client_uploader";
1133 | Some _
, None
->lprintf_nl
"bitmap but no client_uploader";
1134 | None
, None
-> lprintf_nl
"no bitmap no client_uploader";
1140 c.client_interested
<- true;
1144 set_client_state
(c) (Connected
(-1));
1145 (* remote peer will clear the list of range we sent *)
1147 match c.client_uploader
with
1149 (* Afaik this is no protocol violation and happens if the client
1150 didn't send a client bitmap after the handshake. *)
1151 let (ip,port
) = c.client_host
in
1152 if !verbose_msg_clients
then lprintf_file_nl
(as_file
file) "%s:%d with software %s : Choke send, but no client bitmap"
1153 (Ip.to_string
ip) port
(brand_to_string
c.client_brand
)
1155 CommonSwarming.clear_uploader_intervals
up
1157 c.client_ranges_sent
<- [];
1158 c.client_range_waiting
<- None
;
1159 c.client_choked
<- true;
1163 c.client_interested
<- false;
1167 c.client_choked
<- false;
1168 (* remote peer cleared our request : re-request *)
1169 for i = 1 to max_range_requests -
1170 List.length
c.client_ranges_sent
do
1171 (try get_from_client sock
c with _
-> ())
1175 | Request
(n, pos
, len) ->
1176 if len > max_request_len
then begin
1177 close sock
(Closed_for_error
"Request longer than 1<<16");
1181 if !CommonGlobals.has_upload
= 0 then
1183 if client_has_a_slot
(as_client
c) then
1185 (* lprintf "Received request for upload\n"; *)
1186 (match c.client_upload_requests
with
1188 CommonUploads.ready_for_upload
(as_client
c);
1190 c.client_upload_requests
<- c.client_upload_requests
@ [n,pos
,len];
1191 let file = c.client_file
in
1192 match file.file_shared
with
1196 s.impl_shared_requests
<- s.impl_shared_requests
+ 1;
1197 shared_must_update
(as_shared
s)
1202 send_client
c Choke
;
1203 c.client_sent_choke
<- true;
1204 c.client_upload_requests
<- [];
1209 (* We don't 'generate' a Ping message on a Ping. *)
1211 | Cancel
(n, pos
, len) ->
1212 (* if we receive a cancel message from a peer, remove request *)
1213 if client_has_a_slot
(as_client
c) then
1214 c.client_upload_requests
<- List2.remove_first
(n, pos
, len) c.client_upload_requests
1216 if !verbose_msg_clients
then
1217 lprintf_file_nl
(as_file
file) "Error: received cancel request but client has no slot"
1220 lprintf_file_nl
(as_file
file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e
) (TcpMessages.to_string msg
)
1223 (** The function used to connect to a client.
1224 The connection is not immediately initiated. It will
1225 be put in a fifo and dequeued according to
1226 !!max_connections_per_second. (@see commonGlobals.ml)
1227 @param c The client we must connect
1229 let connect_client c =
1230 if can_open_connection connection_manager
&&
1231 (let (ip,port
) = c.client_host
in
1232 match !Ip.banned
(ip, c.client_country_code
) with
1235 if !verbose_connect
then
1236 lprintf_nl
"%s:%d (%s), blocked: %s"
1237 (Ip.to_string
ip) port
1238 (fst
(Geoip.get_country_code_name
c.client_country_code
))
1242 match c.client_sock
with
1246 add_pending_connection connection_manager
(fun token ->
1248 if !verbose_msg_clients
then
1249 lprintf_nl
"CLIENT %d: connect_client" (client_num
c);
1250 let (ip,port
) = c.client_host
in
1251 if !verbose_msg_clients
then
1252 lprintf_nl
"connecting %s:%d" (Ip.to_string
ip) port
;
1253 connection_try
c.client_connection_control
;
1255 let sock = connect
token "bittorrent download"
1256 (Ip.to_inet_addr
ip) port
1259 BASIC_EVENT LTIMEOUT
->
1260 if !verbose_msg_clients
then
1261 lprintf_nl
"CLIENT %d: LIFETIME" (client_num
c);
1262 close
sock Closed_for_timeout
1263 | BASIC_EVENT RTIMEOUT
->
1264 if !verbose_msg_clients
then
1265 lprintf_nl
"CLIENT %d: RTIMEOUT (%d)" (client_num
c)
1268 close
sock Closed_for_timeout
1269 | BASIC_EVENT
(CLOSED
r) ->
1271 match c.client_sock
with
1272 | Connection
s when s == sock ->
1273 disconnect_client c r
1279 c.client_sock
<- Connection
sock;
1280 set_lifetime
sock 600.;
1281 TcpBufferedSocket.set_read_controler
sock download_control
;
1282 TcpBufferedSocket.set_write_controler
sock upload_control
;
1283 TcpBufferedSocket.set_rtimeout
sock 30.;
1284 let file = c.client_file
in
1286 if !verbose_msg_clients
then
1287 lprintf_file_nl
(as_file
file) "READY TO DOWNLOAD FILE";
1289 send_init
!!client_uid
file.file_id
sock;
1290 (* Fabrice: Initialize the client bitmap and uploader fields to <> None *)
1291 update_client_bitmap
c;
1292 (* (try get_from_client sock c with _ -> ());*)
1294 (*We 'hook' the client_parse_header function to the socket
1295 This function will then be called when the first message will
1298 set_bt_sock
sock !verbose_msg_clients
1299 (BTHeader
(client_parse_header !counter (ref ((Some
c), c.client_country_code
)) true))
1302 lprintf_nl
"Exception %s while connecting to client"
1303 (Printexc2.to_string e
);
1304 disconnect_client c (Closed_for_exception e
)
1306 (*Since this is a pending connection put ConnectionWaiting
1310 c.client_sock
<- ConnectionWaiting
token
1314 (** The Listen function (very much like in C : TCP Socket Server).
1315 Monitors client connections to us.
1319 let s = TcpServerSocket.create
"bittorrent client server"
1320 (Ip.to_inet_addr
!!client_bind_addr
)
1324 TcpServerSocket.CONNECTION
(s,
1325 Unix.ADDR_INET
(from_ip
, from_port
)) ->
1326 (*Receiving an event TcpServerSocket.CONNECTION from
1327 the TcpServerSocket means that a new client tries
1330 let ip = (Ip.of_inet_addr from_ip
) in
1331 let cc = Geoip.get_country_code_option
ip in
1332 if !verbose_sources
> 1 then lprintf_nl
"CONNECTION RECEIVED FROM %s"
1333 (Ip.to_string
(Ip.of_inet_addr from_ip
))
1335 (*Reject this connection if we don't want
1336 to bypass the max_connection parameter
1338 if can_open_connection connection_manager
&&
1339 (match !Ip.banned
(ip, cc) with
1342 if !verbose_connect
then
1343 lprintf_nl
"%s:%d (%s) blocked: %s"
1344 (Ip.to_string
ip) from_port
1345 (fst
(Geoip.get_country_code_name
cc))
1350 let token = create_token connection_manager
in
1351 let sock = TcpBufferedSocket.create
token
1352 "bittorrent client connection" s
1355 BASIC_EVENT
(RTIMEOUT
|LTIMEOUT
) ->
1356 (*monitor read and life timeout on client
1359 close
sock Closed_for_timeout
1363 TcpBufferedSocket.set_read_controler
sock download_control
;
1364 TcpBufferedSocket.set_write_controler
sock upload_control
;
1366 let c = ref (None
, cc) in
1367 TcpBufferedSocket.set_closer
sock (fun _
r ->
1370 match c.client_sock
with
1371 | Connection
s when s == sock ->
1372 disconnect_client c r
1377 set_rtimeout
sock 30.;
1379 (*Again : 'hook' client_parse_header to the socket*)
1380 set_bt_sock
sock !verbose_msg_clients
1381 (BTHeader
(client_parse_header !counter c false));
1384 (*don't forget to close the incoming sock if we can't
1385 open a new connection
1390 listen_sock
:= Some
s;
1393 if !verbose_connect
then
1394 lprintf_nl
"Exception %s while init bittorrent server"
1395 (Printexc2.to_string e
)
1398 (** This function sends keepalive messages to all connected clients
1399 (and updates the socket lifetime)
1402 List.iter (fun file ->
1403 Hashtbl.iter (fun _
c ->
1404 match c.client_sock
with
1405 | Connection
sock ->
1407 set_lifetime
sock 130.;
1415 (** Check whether each client of a given file is connected.
1416 If it is not, try to connect to it
1418 let resume_clients file =
1419 Hashtbl.iter (fun _
c ->
1421 match c.client_sock
with
1422 | Connection
sock -> ()
1423 (* I think this one is not really useful for debugging
1424 lprintf_nl "[BT]: RESUME: Client is already connected"; *)
1427 (* Test whether we can connect to the client according to its
1429 Currently the delay between two tries is 120 seconds.
1431 if connection_can_try
c.client_connection_control
then
1434 print_control
c.client_connection_control
1437 if !verbose_connect
then
1438 lprintf_file_nl
(as_file
file) "Exception %s in resume_clients" (Printexc2.to_string e
)
1442 resume_clients_hook := resume_clients
1444 (** Check if the value replied by the tracker is correct.
1445 @param key the name of the key
1446 @param n the value to check
1447 @param url Url of the tracker
1448 @param name the name of the file
1450 let chk_keyval key
n url name
=
1451 let int_n = (Int64.to_int
n) in
1452 if !verbose_msg_clients
then
1453 lprintf_nl
"Reply from %s in file: %s has %s: %d" (show_tracker_url url
) name key
int_n;
1457 lprintf_nl
"Reply from %s in file: %s has an invalid %s value: %d" (show_tracker_url url
) name key
int_n;
1461 (** Check that client is valid and record it *)
1462 let maybe_new_client file id
ip port
=
1463 let cc = Geoip.get_country_code_option
ip in
1464 if id
<> !!client_uid
1467 && (match !Ip.banned
(ip, cc) with
1470 if !verbose_connect
then
1471 lprintf_file_nl
(as_file
file) "%s:%d blocked: %s" (Ip.to_string
ip) port reason
;
1474 ignore
(new_client
file id
(ip,port
) cc);
1475 if !verbose_sources
> 1 then
1476 lprintf_file_nl
(as_file
file) "Received %s:%d" (Ip.to_string
ip) port
;
(** [exn_catch f x] applies [f] to [x] and turns the outcome into a value
    instead of letting an exception propagate.
    @param f the function to run
    @param x the argument handed to [f]
    @return [`Ok v] when [f x] evaluates to [v], or [`Exn e] when it raises
    [e]; every exception is captured, none is re-raised. *)
let exn_catch f x =
  try
    let outcome = f x in
    `Ok outcome
  with caught -> `Exn caught
1481 (** In this function we interact with the tracker
1482 @param file The file for which we want some sources
1483 @param need_sources whether we need any sources
1485 let talk_to_tracker file need_sources
=
1486 (* This is the function which will be called by the http client for parsing the response *)
1488 let tracker_url = show_tracker_url t
.tracker_url in
1489 let tracker_failed reason
=
1490 (* On failure, disable the tracker and count attempts (@see is_tracker_enabled) *)
1491 let num = match t
.tracker_status
with | Disabled_failure
(i,_
) -> i + 1 | _
-> 1 in
1492 t
.tracker_status
<- Disabled_failure
(num, intern reason
);
1493 lprintf_file_nl
(as_file
file) "Failure no. %d%s from Tracker %s for file: %s Reason: %s"
1495 (if !!tracker_retries
= 0 then "" else Printf.sprintf
"/%d" !!tracker_retries
)
1496 tracker_url file.file_name
(Charset.Locale.to_utf8 reason
)
1498 match exn_catch File.to_string filename
with
1499 | `Exn _
| `Ok
"" -> tracker_failed "empty reply"
1501 match exn_catch Bencode.decode
s with
1502 | `Exn exn
-> tracker_failed (Printf.sprintf
"wrong reply (%s)" (Printexc2.to_string exn
))
1503 | `Ok
(Dictionary list
) ->
1504 t
.tracker_interval
<- 600;
1505 t
.tracker_min_interval
<- 600;
1506 t
.tracker_last_clients_num
<- 0;
1507 if need_sources
then t
.tracker_last_clients_num
<- 0;
1508 let chk_keyval key
n = chk_keyval key
n t
.tracker_url file.file_name
in
1509 if not
(List.mem_assoc
"failure reason" list
) then
1511 begin match t
.tracker_status
with
1512 | Disabled_failure
(i, _
) ->
1513 lprintf_file_nl
(as_file
file) "Received good message from Tracker %s after %d bad attempts"
1516 (* Received good message from tracker after failures, re-enable tracker *)
1517 t
.tracker_status
<- Enabled
;
1519 List.iter (fun (key
,value) ->
1520 match (key
,value) with
1521 | "failure reason", String failure
-> tracker_failed failure
1522 | "warning message", String warning
->
1523 lprintf_file_nl
(as_file
file) "Warning from Tracker %s in file: %s Reason: %s"
1524 tracker_url file.file_name warning
1525 | "interval", Int
n ->
1526 t
.tracker_interval
<- chk_keyval key
n;
1527 (* in case we don't receive "min interval" *)
1528 if t
.tracker_min_interval
> t
.tracker_interval
then
1529 t
.tracker_min_interval
<- t
.tracker_interval
1530 | "min interval", Int
n ->
1531 t
.tracker_min_interval
<- chk_keyval key
n;
1532 (* make sure "min interval" is always < or equal to "interval" *)
1533 if t
.tracker_min_interval
> t
.tracker_interval
then
1534 t
.tracker_min_interval
<- t
.tracker_interval
1535 | "downloaded", Int
n ->
1536 t
.tracker_torrent_downloaded
<- chk_keyval key
n
1538 | "done peers", Int
n ->
1539 t
.tracker_torrent_complete
<- chk_keyval key
n
1540 | "incomplete", Int
n ->
1541 t
.tracker_torrent_incomplete
<- chk_keyval key
n;
1542 (* if complete > 0 and we receive incomplete we probably won't receive num_peers so we simulate it below *)
1543 if t
.tracker_torrent_complete
> 0 then
1544 t
.tracker_torrent_total_clients_count
<- (t
.tracker_torrent_complete
+ t
.tracker_torrent_incomplete
);
1545 | "num peers", Int
n ->
1546 t
.tracker_torrent_total_clients_count
<- chk_keyval key
n;
1547 (* if complete > 0 and we receive num_peers we probably won't receive incomplete so we simulate it below *)
1548 if t
.tracker_torrent_complete
> 0 then
1549 t
.tracker_torrent_incomplete
<- (t
.tracker_torrent_total_clients_count
- t
.tracker_torrent_complete
);
1551 t
.tracker_torrent_last_dl_req
<- chk_keyval key
n
1552 | "key", String
n ->
1554 if !verbose_msg_clients
then
1555 lprintf_file_nl
(as_file
file) "%s in file: %s has key: %s" tracker_url file.file_name
n
1556 | "tracker id", String
n ->
1558 if !verbose_msg_clients
then
1559 lprintf_file_nl
(as_file
file) "%s in file: %s has tracker id %s" tracker_url file.file_name
n
1561 | "peers", List list
->
1562 if need_sources
then
1565 | Dictionary list
->
1566 let peer_id = ref Sha1.null
in
1567 let peer_ip = ref Ip.null
in
1572 "peer id", String id
->
1573 peer_id := Sha1.direct_of_string id
;
1574 | "ip", String
ip ->
1575 peer_ip := Ip.of_string
ip
1577 port := Int64.to_int
p
1581 t
.tracker_last_clients_num
<- t
.tracker_last_clients_num
+ 1;
1582 maybe_new_client file !peer_id !peer_ip !port
1586 | "peers", String
p ->
1587 let rec iter_comp s pos l
=
1589 let ip = Ip.of_ints
(get_uint8
s pos
,get_uint8
s (pos
+1),
1590 get_uint8
s (pos
+2),get_uint8
s (pos
+3))
1591 and port = get_int16
s (pos
+4)
1593 t
.tracker_last_clients_num
<- t
.tracker_last_clients_num
+ 1;
1594 maybe_new_client file Sha1.null
ip port;
1596 iter_comp s (pos
+6) l
1598 if need_sources
then
1599 iter_comp p 0 (String.length
p)
1600 | "private", Int
n -> ()
1601 (* TODO: if set to 1, disable peer exchange *)
1603 | key
, _
-> lprintf_file_nl
(as_file
file) "received unknown entry in answer from tracker: %s : %s" key
(Bencode.print
value)
1605 (* Now that we have added new clients to a file, it's time
1606 to connect to them*)
1607 if !verbose_sources
> 0 then
1608 lprintf_file_nl
(as_file
file) "talk_to_tracker: got %i source(s) for file %s"
1609 t
.tracker_last_clients_num
file.file_name
;
1610 if need_sources
then resume_clients file
1612 | _
-> tracker_failed "wrong reply (value)"
1615 if file.file_tracker_connected
then ""
1618 connect_trackers file event need_sources
f
1621 (** Check to see if file is finished, if not
1622 try to get sources for it
1624 let recover_files () =
1625 if !verbose_share
then
1626 lprintf_nl
"recover_files";
1627 List.iter (fun file ->
1628 match file.file_swarmer
with
1631 (try check_finished swarmer file with e
-> ());
1632 match file_state
file with
1634 if !verbose_share
then
1635 lprintf_file_nl
(as_file
file) "recover downloading";
1636 (try talk_to_tracker file true with _
-> ())
1638 if !verbose_share
then
1639 lprintf_file_nl
(as_file
file) "recover shared";
1640 (try talk_to_tracker file false with _
-> ())
1641 | FilePaused
-> () (*when we are paused we do nothing, not even logging this vvvv*)
1643 | s -> if !verbose
then lprintf_file_nl
(as_file
file) "recover: Other state %s!!" (string_of_state
s)
(** Scratch buffer reused for every outgoing piece: [iter_upload] reads the
    requested bytes of the file into it (starting at offset 0) and passes it
    to [send_client] as the payload of a [Piece] message.

    Allocated with [Bytes.create] because [String.create] has been
    deprecated since OCaml 4.02 and is no longer available in OCaml 5; where
    both exist they have the same type. The contents are arbitrary until the
    first read fills them.

    NOTE(review): requests longer than [max_request_len] are rejected, which
    presumably keeps every read within this size -- confirm. *)
let upload_buffer = Bytes.create 100000
1650 Send a Piece message
1651 for one of the requests of the client
1652 @param sock The socket of the client
1655 let rec iter_upload sock c =
1656 match c.client_upload_requests
with
1658 | (num, pos
, len) :: tail
->
1659 if len = zero
then begin
1660 c.client_upload_requests
<- tail
;
1663 if c.client_allowed_to_write
>= 0L then begin
1665 c.client_upload_requests
<- tail
;
1667 let file = c.client_file
in
1668 let offset = pos
++ file.file_piece_size
*.. num in
1669 c.client_allowed_to_write
<- c.client_allowed_to_write
-- len;
1671 let len = Int64.to_int
len in
1672 (* lprintf "Unix32.read: offset %Ld len %d\n" offset len; *)
1673 Unix32.read
(file_fd
file) offset upload_buffer 0 len;
1674 (* update upload rate from len bytes *)
1675 Rate.update
c.client_upload_rate ~amount
:len;
1676 Rate.update
c.client_downloaded_rate
;
1677 file.file_uploaded
<- file.file_uploaded
++ (Int64.of_int
len);
1678 file.file_session_uploaded
<- file.file_session_uploaded
++ (Int64.of_int
len);
1681 count_filerequest
c;
1682 match file.file_shared
with
1686 s.impl_shared_uploaded
<- file.file_uploaded
;
1687 shared_must_update
(as_shared
s)
1690 (* lprintf "sending piece\n"; *)
1691 send_client
c (Piece
(num, pos
, upload_buffer, 0, len));
1695 lprintf_nl
"Exception %s in iter_upload" (Printexc2.to_string e
)
1698 (* lprintf "client is waiting for another piece\n"; *)
1699 ready_for_upload
(as_client
c)
1704 In this function we check if we can send bytes (according
1705 to bandwidth control), if we can, call iter_upload to
1706 send a Piece message
1707 @param c the client to which we can send some bytes
1708 @param allowed the amount of bytes we can send to client
1710 let client_can_upload c allowed
=
1711 (* lprintf "allowed to upload %d\n" allowed; *)
1712 do_if_connected
c.client_sock
(fun sock ->
1713 match c.client_upload_requests
with
1716 let new_allowed_to_write =
1717 c.client_allowed_to_write
++ (Int64.of_int allowed
) in
1718 if allowed
> 0 && can_write_len
sock
1719 (Int64.to_int
new_allowed_to_write)
1721 CommonUploads.consume_bandwidth allowed
;
1722 c.client_allowed_to_write
<- new_allowed_to_write;
1727 let file_resume file =
1729 match t
.tracker_status
with
1730 | Enabled
| Disabled_mld
_ -> ()
1731 | Disabled_failure
_ | Disabled
_ -> t
.tracker_status
<- Enabled
1732 ) file.file_trackers
;
1733 (try talk_to_tracker file true with _ -> ())
1738 Send info to tracker when stopping a file.
1739 @param file the file we want to stop
1741 let file_stop file =
1742 if file.file_tracker_connected
then
1744 connect_trackers file "stopped" false (fun _ _ ->
1745 lprintf_file_nl
(as_file
file) "Tracker return: stopped %s" file.file_name
;
1746 file.file_tracker_connected
<- false)
1753 client_ops
.op_client_can_upload
<- client_can_upload;
1754 file_ops
.op_file_resume
<- file_resume;
1755 file_ops
.op_file_recover
<- file_resume;
1756 file_ops
.op_file_pause
<- (fun file ->
1757 Hashtbl.iter (fun _ c ->
1758 match c.client_sock
with
1759 Connection
sock -> close
sock Closed_by_user
1761 ) file.file_clients
;
1762 (*When a file is paused we consider it is stopped*)
1765 file_ops
.op_file_queue
<- file_ops
.op_file_pause
;
1766 client_ops
.op_client_enter_upload_queue
<- (fun c ->
1767 if !verbose_msg_clients
then
1768 lprintf_nl
"Client %d: client_enter_upload_queue" (client_num
c);
1769 ready_for_upload
(as_client
c));
1770 network
.op_network_connected_servers
<- (fun _ -> []);