1 (* Copyright 2001, 2002 b52_simon :), b8_bavard, b8_fee_carabine, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 (** Functions used in client<->client communication
22 and also client<->tracker
25 (** A peer (or client) is always a remote peer in this file.
26 A Piece is a portion of the file associated with a hash (sha1).
27 In mldonkey a piece is referred to as a block inside the swarming system.
28 A SubPiece is a portion of a piece (without hash) which can be
29 sent/downloaded to/from a peer.
30 In mldonkey a SubPiece is referred to as a range inside the swarming system.
31 @see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for some
32 unofficial (but more detailed) specs.
42 open TcpBufferedSocket
49 open CommonInteractive
51 open CommonComplexOptions
68 module VB
= VerificationBitmap
(* Literal "200 OK" status strings, one without and one with the HTTP/1.1
   version token.
   NOTE(review): neither constant is referenced in the visible part of this
   file -- presumably they are compared against tracker replies; confirm. *)
70 let http_ok = "HTTP 200 OK"
71 let http11_ok = "HTTP/1.1 200 OK"
(* Clients picked by the latest recompute_uploaders pass: that function
   stores the result of choose_uploaders here. *)
74 let next_uploaders = ref ([] : BTTypes.client list
)
(* Clients currently considered uploaders. recompute_uploaders compares it
   with !next_uploaders and then overwrites it with that list;
   disconnect_client checks membership in it. *)
75 let current_uploaders = ref ([] : BTTypes.client list
)
77 (** Check that client is valid and record it *)
78 let maybe_new_client file id ip port
=
79 let cc = Geoip.get_country_code_option ip
in
83 && (match !Ip.banned
(ip
, cc) with
86 if !verbose_connect
then
87 lprintf_file_nl
(as_file file
) "%s:%d blocked: %s" (Ip.to_string ip
) port reason
;
90 ignore
(new_client file id
(ip
,port
) cc);
91 if !verbose_sources
> 1 then
92 lprintf_file_nl
(as_file file
) "Received %s:%d" (Ip.to_string ip
) port
(* Callback reference invoked as [!resume_clients_hook file] by the UDP
   tracker code (interact) when need_sources is set.
   The initial closure is [assert false], so calling the hook before another
   function has been stored in it raises Assert_failure.
   NOTE(review): the place where the real callback is installed is not in the
   visible part of this file. *)
95 let resume_clients_hook = ref (fun _
-> assert false)
99 (* open modules locally *)
(** Name of a socket event as a plain string (debugging aid).
    A close event additionally carries the textual close reason. *)
let string_of_event event =
  (* Names of the low-level events wrapped by BASIC_EVENT. *)
  let basic_event_name = function
    | CLOSED reason -> "CLOSED " ^ (string_of_reason reason)
    | RTIMEOUT -> "RTIMEOUT"
    | WTIMEOUT -> "WTIMEOUT"
    | LTIMEOUT -> "LTIMEOUT"
    | CAN_READ -> "CAN_READ"
    | CAN_WRITE -> "CAN_WRITE"
  in
  match event with
  | READ_DONE -> "READ_DONE"
  | WRITE_DONE -> "WRITE_DONE"
  | CAN_REFILL -> "CAN_REFILL"
  | BASIC_EVENT basic -> basic_event_name basic
115 (** Talk to a UDP tracker and parse its response.
116 Apart from the parsing, this should perform everything that
117 talk_to_tracker's inner function does. FIXME: refactor both *)
118 let interact host port args file t need_sources
=
120 lprintf_nl
"udpt start with %s:%d" host port
;
121 let addr = try (Unix.gethostbyname host
).Unix.h_addr_list
.(0) with exn
-> failwith
("failed to resolve " ^ host
) in
122 let ip = Ip.of_inet_addr
addr in
123 lprintf_nl
"udpt resolved to ip %s" (Ip.to_string
ip);
124 let sock = create
Unix.inet_addr_any
0 (fun sock event
->
125 (* lprintf_nl "udpt got event %s for %s" (string_of_event event) host *)
127 | WRITE_DONE
| CAN_REFILL
-> ()
128 | READ_DONE
-> assert false (* set_reader prevents this *)
129 | BASIC_EVENT x
-> match x
with
131 | CAN_READ
| CAN_WRITE
-> assert false (* udpSocket implementation prevents this *)
132 | LTIMEOUT
| WTIMEOUT
| RTIMEOUT
-> close
sock (Closed_for_error
"udpt timeout"))
134 let txn = Random.int32
Int32.max_int
in
135 lprintf_nl
"udpt txn %ld for %s" txn host
;
136 write
sock false (connect_request
txn) ip port
;
137 set_reader
sock (fun _
->
139 let conn = connect_response
p.udp_content
txn in
140 lprintf_nl
"udpt connection_id %Ld for %s" conn host
;
141 let txn = Random.int32
Int32.max_int
in
142 lprintf_nl
"udpt txn' %ld for host %s" txn host
;
143 let int s
= Int64.of_string
(List.assoc s args
) in
144 let req = announce_request
conn txn
145 ~info_hash
:(List.assoc
"info_hash" args
)
146 ~peer_id
:(List.assoc
"peer_id" args
)
147 (int "downloaded",int "left",int "uploaded")
148 (match List.assoc
"event" args
with
153 | s
-> lprintf_nl
"udpt event %s? for %s" s host
; 0l)
154 ~numwant
:(try Int32.of_string
(List.assoc
"numwant" args
) with _
-> -1l)
155 (int_of_string
(List.assoc
"port" args
))
157 write
sock false req ip port
;
158 set_reader
sock (fun _
->
161 t
.tracker_last_conn
<- last_time
();
162 file
.file_tracker_connected
<- true;
163 t
.tracker_interval
<- 600;
164 t
.tracker_min_interval
<- 600;
165 if need_sources
then t
.tracker_last_clients_num
<- 0;
167 let (interval
,clients
) = announce_response
p.udp_content
txn in
168 lprintf_nl
"udpt got interval %ld clients %d for host %s" interval
(List.length clients
) host
;
169 if interval
> 0l then
171 t
.tracker_interval
<- Int32.to_int interval
;
172 if t
.tracker_min_interval
> t
.tracker_interval
then
173 t
.tracker_min_interval
<- t
.tracker_interval
175 List.iter
(fun (ip'
,port
) ->
176 let ip = Ip.of_int64
(Int64.logand
0xFFFFFFFFL
(Int64.of_int32
ip'
)) in
177 lprintf_nl
"udpt got %s:%d" (Ip.to_string
ip) port
;
178 maybe_new_client file
Sha1.null
ip port
180 close
sock Closed_by_user
;
181 lprintf_nl
"udpt interact done for %s" host
;
182 if need_sources
then !resume_clients_hook file
186 lprintf_nl
"udpt interact exn %s" (Printexc2.to_string exn
)
191 In this function we connect to a tracker.
192 @param file The file concerned by the request
193 @param url Url of the tracker to connect
194 @param event Event (as a string) to send to the tracker :
195 can be 'completed' if the file is complete, 'started' for the first
196 connection to this tracker or 'stopped' for a clean stop of the file.
197 Everything else will be ok for a second connection to the tracker.
198 Be careful with the spelling of this event
199 @param f The function used to parse the result of the connection.
200 The function will get a file as an argument (@see talk_to_tracker
203 If we have fewer than !!ask_tracker_threshold sources
204 and if we respect the file_tracker_interval, then
205 we actually ask the tracker for sources
207 let connect_trackers file event need_sources f
=
209 (* reset session statistics when sending 'started' event *)
210 if event
= "started" then
212 file
.file_session_uploaded
<- Int64.zero
;
213 file
.file_session_downloaded
<- Int64.zero
;
216 let args,must_check_delay
, left
=
218 match file
.file_swarmer
with
222 | "started" -> [("event", "started")],true,zero
223 | "stopped" -> [("event", "stopped")],false,zero
228 let local_downloaded = CommonSwarming.downloaded swarmer
in
229 let left = file_size file
-- local_downloaded in
231 | "completed" -> [("event", "completed")],false,zero
232 | "started" -> [("event", "started")],true,left
233 | "stopped" -> [("event", "stopped")],false,left
237 let args = ("no_peer_id", "1") :: ("compact", "1") :: args in
239 if not need_sources
then
240 ("numwant", "0") :: args
241 else if !!numwant
> -1 then
242 ("numwant", string_of_int
!!numwant
) :: args
246 let args = if !!send_key
then
247 ("key", Sha1.to_hexa
!!client_uid
) :: args else args
249 let args = if !!force_client_ip
then
250 ("ip", Ip.to_string
!!set_client_ip
) :: args else args
253 ("info_hash", Sha1.direct_to_string file
.file_id
) ::
254 ("peer_id", Sha1.direct_to_string
!!client_uid
) ::
255 ("port", string_of_int
!!client_port
) ::
256 ("uploaded", Int64.to_string file
.file_session_uploaded
) ::
257 ("downloaded", Int64.to_string file
.file_session_downloaded
) ::
258 ("left", Int64.to_string
left) ::
262 let enabled_trackers =
263 let enabled_trackers = List.filter
(fun t
-> tracker_is_enabled t
) file
.file_trackers
in
264 if enabled_trackers <> [] then enabled_trackers
266 (* if there is no tracker left, do something ? *)
267 if !verbose_msg_servers
then
268 lprintf_nl
"No trackers left for %s, reenabling all of them..." (file_best_name
(as_file file
));
270 match t
.tracker_status
with
271 (* only re-enable after normal error *)
272 | Disabled _
-> t
.tracker_status
<- Enabled
273 | _
-> ()) file
.file_trackers
;
274 let enabled_trackers = List.filter
(fun t
-> tracker_is_enabled t
) file
.file_trackers
in
275 if enabled_trackers = [] && (file_state file
) <> FilePaused
then
277 file_pause
(as_file file
) (CommonUserDb.admin_user
());
278 lprintf_file_nl
(as_file file
) "Paused %s, no usable trackers left" (file_best_name
(as_file file
))
285 (* if we have too few sources we may ask the tracker before the interval *)
286 if not must_check_delay
287 || not file
.file_tracker_connected
288 || t
.tracker_last_conn
+ t
.tracker_interval
< last_time
()
289 || ( file
.file_clients_num
< !!ask_tracker_threshold
290 && (file_state file
) == FileDownloading
291 && (if t
.tracker_min_interval
> !!min_tracker_reask_interval
then
292 t
.tracker_last_conn
+ t
.tracker_min_interval
< last_time
()
294 t
.tracker_last_conn
+ !!min_tracker_reask_interval
< last_time
() ))
297 (* if we already tried to connect but failed, disable tracker, but allow re-enabling *)
298 if file
.file_tracker_connected
&& t
.tracker_last_clients_num
= 0 && t
.tracker_last_conn
< 1 then
300 if !verbose_msg_servers
then
301 lprintf_nl
"Request error from tracker: disabling %s" (show_tracker_url t
.tracker_url
);
302 t
.tracker_status
<- Disabled
(intern
"MLDonkey: Request error from tracker")
304 (* Send request to tracker *)
306 let args = if String.length t
.tracker_id
> 0 then
307 ("trackerid", t
.tracker_id
) :: args else args
309 let args = if String.length t
.tracker_key
> 0 then
310 ("key", t
.tracker_key
) :: args else args
312 if !verbose_msg_servers
then
313 lprintf_nl
"connect_trackers: connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i numwant:%s file: %s"
314 (string_of_bool file
.file_tracker_connected
)
315 t
.tracker_id t
.tracker_key t
.tracker_last_clients_num
316 (t
.tracker_last_conn
- last_time
()) (try List.assoc
"numwant" args with _
-> "_") file
.file_name
;
318 match t
.tracker_url
with
320 let module H
= Http_client
in
323 H.req_url
= Url.of_string ~
args: args url
;
324 H.req_proxy
= !CommonOptions.http_proxy
;
325 H.req_user_agent
= get_user_agent
();
326 (* #4541 [egs] supports redirect *)
327 H.req_max_retry
= !!max_tracker_redirect
;
330 if !verbose_msg_servers
then
331 lprintf_nl
"Request sent to tracker %s for file: %s"
335 t
.tracker_last_conn
<- last_time
();
336 file
.file_tracker_connected
<- true;
338 | `Other url
-> assert false (* should have been disabled *)
339 | `Udp
(host
,port
) -> interact host port
args file t need_sources
343 if !verbose_msg_servers
then
344 lprintf_nl
"Request NOT sent to tracker %s - next request in %ds for file: %s"
345 (show_tracker_url t
.tracker_url
) (t
.tracker_interval
- (last_time
() - t
.tracker_last_conn
)) file
.file_name
349 set_client_upload
(as_client c
) (as_file c
.client_file
);
350 set_client_has_a_slot
(as_client c
) NormalSlot
;
351 Rate.update_no_change c
.client_downloaded_rate
;
352 Rate.update_no_change c
.client_upload_rate
;
353 c
.client_last_optimist
<- last_time
();
354 client_enter_upload_queue
(as_client c
);
355 send_client c Unchoke
357 (** In this function we decide which peers will be
358 uploaders. We send a choke message to current uploaders
359 that are not in the next uploaders list. We send Unchoke
360 for clients that are in next list (and not in current)
362 let recompute_uploaders () =
363 if !verbose_upload
then lprintf_nl
"recompute_uploaders";
364 next_uploaders := choose_uploaders current_files
;
365 (*Send choke if a current_uploader is not in next_uploaders*)
366 List.iter
( fun c
-> if ((List.mem c
!next_uploaders)==false) then
368 set_client_has_a_slot
(as_client c
) NoSlot
;
369 (*we will let him finish his download and choke him on next_request*)
371 ) !current_uploaders;
373 (* don't send Choke if the new uploader is already an uploader *)
375 if not
(List.mem c
!current_uploaders) then start_upload c
377 current_uploaders := !next_uploaders
380 (****** Fabrice: why are clients which are disconnected removed ???
381 These clients might still be useful to reconnect to, no ? *)
384 (** This function is called when a client is disconnected
385 (be it by our side or its side).
386 A client which disconnects (even only one time) is discarded.
387 If it's an uploader which disconnects we recompute uploaders
388 (see recompute_uploaders) immediately.
389 @param c The client to disconnect
390 @param reason The reason for the disconnection (see in BasicSocket.ml)
392 let disconnect_client c reason
=
393 if !verbose_msg_clients
then
394 lprintf_nl
"Client %d: disconnected: %s" (client_num c
) (string_of_reason reason
);
396 match c
.client_sock
with
398 | ConnectionWaiting token
->
400 c
.client_sock
<- NoConnection
404 (* List.iter (fun r -> CommonSwarming.free_range r) c.client_ranges; *)
405 set_client_disconnected c reason
;
406 c
.client_session_downloaded
<- 0L;
407 c
.client_session_uploaded
<- 0L;
408 (try if c
.client_good
then count_seen c
with _
-> ());
409 (* this is not useful already done in the match
410 (try close sock reason with _ -> ()); *)
411 (*---------not needed ?? VvvvvV---------------
412 c.client_ranges <- [];
413 c.client_block <- None;
414 if not c.client_good then
415 connection_failed c.client_connection_control;
416 c.client_good <- false;
417 c.client_sock <- NoConnection;
418 c.client_chunks <- [];
419 c.client_allowed_to_write <- zero;
420 c.client_new_chunks <- [];
421 c.client_interesting <- false;
422 c.client_alrd_sent_interested <- false;
423 -------------------^^^^^--------------------*)
424 if (c
.client_registered_bitfield
) then
426 match c
.client_uploader
with
429 c
.client_uploader
<- None
;
430 (* If the client registered a bitfield then
431 we must unregister him to update the swarmer
432 (Useful for availability)
434 CommonSwarming.unregister_uploader up
435 (* c.client_registered_bitfield <- false;
436 for i = 0 to String.length c.client_bitmap - 1 do
437 c.client_bitmap.[0] <- '0';
440 (* Don't test whether the client has an upload slot, because
441 it no longer has one (it was removed earlier, in
442 set_client_disconnected c reason)
444 if (List.mem c
!current_uploaders) then
446 (*BTGlobals.remove_client*)
448 recompute_uploaders ();
456 (** Disconnect all clients of a file
457 @param file The file whose clients must all be disconnected
459 let disconnect_clients file
=
460 let must_keep = ref true in
461 (match file_state file
with
462 | FilePaused
| FileCancelled
-> must_keep:=false
465 Hashtbl.iter
(fun _ c
->
466 if not
( !must_keep && (client_has_a_slot
(as_client c
) || c
.client_interested
)) then
468 if !verbose_msg_clients
then
469 lprintf_file_nl
(as_file file
) "disconnect since download is finished";
470 disconnect_client c Closed_by_user
475 (** What to do when a file is finished
476 @param file the finished file
478 let download_finished file
=
479 if List.memq file
!current_files
then
481 connect_trackers file
"completed" false (fun _ _
-> ()); (*must be called before swarmer gets removed from file*)
482 (*CommonComplexOptions.file_completed*)
483 file_completed
(as_file file
);
484 (* Remove the swarmer for this file as it is not useful anymore... *)
485 CommonSwarming.remove_swarmer file
.file_swarmer
;
486 file
.file_swarmer
<- None
;
487 (* At this point, the file state is FileDownloaded. We should not remove
488 the file, because we continue to upload. *)
492 (** Check if a file is finished or not.
493 A file is finished if all blocks are verified.
494 @param file The file to check status
let check_finished swarmer file =
  (* The swarmer decides whether the download is complete; only a positive
     answer triggers the end-of-download handling for [file]. *)
  match CommonSwarming.check_finished swarmer with
  | true -> download_finished file
  | false -> ()
(* Single-bit masks for the eight positions of a byte, most significant bit
   first: bits.(0) = 128, ..., bits.(7) = 1. *)
let bits = Array.init 8 (fun shift -> 0x80 lsr shift)
502 (* Check/set bits in strings (bittorrent format) *)
505 (Char.code s
.[n
lsr 3]) land bits.(n
land 7) <> 0
509 s
.[i] <- Char.unsafe_chr
(Char.code s
.[i] lor bits.(n
land 7))
511 (* Official client seems to use max_range_request 5 and max_range_len 2^14 *)
512 (* How many requests in the 'pipeline' *)
513 let max_range_requests = 5
514 (* How many bytes we can request in one Piece *)
517 (** A wrapper to send Interested message to a client.
518 (Send interested only if needed)
519 @param c The client to send Interested
521 let send_interested c
=
522 if c
.client_interesting
&& (not c
.client_alrd_sent_interested
) then
524 c
.client_alrd_sent_interested
<- true;
525 send_client c Interested
529 (** Send a Bitfield message to a client.
530 @param c The client to send the Bitfield message
533 let send_bitfield c
=
534 send_client c
(BitField
536 match c
.client_file
.file_swarmer
with
538 (* This must be a seeded file... *)
539 if !verbose_download
then
540 lprintf_nl
"Sending completed verified bitmap";
541 let nchunks = Array.length c
.client_file
.file_chunks
in
542 let len = (nchunks+7)/8 in
543 let s = String.make
len '
\000'
in
544 for i = 0 to nchunks - 1 do
549 let bitmap = CommonSwarming.chunks_verified_bitmap swarmer
in
550 if !verbose_download
then
551 lprintf_nl
"Sending verified bitmap: [%s]" (VB.to_string
bitmap);
552 let len = (VB.length
bitmap + 7)/8 in
553 let s = String.make
len '
\000'
in
555 if c
= VB.State_verified
then set_bit s i) bitmap;
(** Read the extension flags out of the reserved-bits field and store them
    on the client.
    @param rbits the reserved-bits string from the header handled by
           client_parse_header; bytes 0, 5 and 7 are inspected
    @param c the client whose extension flags are overwritten *)
let parse_reserved rbits c =
  (* [flag_set index mask]: does byte [index] of [rbits] have a bit of
     [mask] set? *)
  let flag_set index mask =
    let byte = Char.code rbits.[index] in
    byte land mask <> 0
  in
  (* Byte 7 holds three independent flags. *)
  let in_last_byte = flag_set 7 in
  c.client_dht <- in_last_byte 0x01;
  c.client_cache_extension <- in_last_byte 0x02;
  c.client_fast_extension <- in_last_byte 0x04;
  c.client_utorrent_extension <- flag_set 5 0x10;
  c.client_azureus_messaging_protocol <- flag_set 0 0x80
573 (** This function is called to parse the first message that
575 @param counter client num
576 @param cc Expected client (probably useless now that we don't save any client)
577 @param init_sent A boolean to know if we sent this client the handshake message
578 @param gconn Don't know
579 @param sock The socket we use for this client
580 @param proto Unused (required by tuple type?)
581 @param file_id The file hash (sha1) of the file involved in this exchange
583 (* removed: @param peer_id The hash (sha1) of the client. (Should be checked)
585 let rec client_parse_header counter cc init_sent gconn
sock
586 (proto
, rbits
, file_id
) =
588 set_lifetime
sock 600.;
589 if !verbose_msg_clients
then
590 lprintf_nl
"client_parse_header %d" counter;
592 let file = Hashtbl.find files_by_uid file_id
in
593 if !verbose_msg_clients
then
594 lprintf_file_nl
(as_file
file) "file found";
595 let ccc, cc_country_code
= !cc in
599 let c = new_client
file Sha1.null
(TcpBufferedSocket.peer_addr
sock) cc_country_code
in
600 if !verbose_connect
then lprintf_file_nl
(as_file
file) "Client %d: incoming connection" (client_num
c);
601 cc := (Some
c), cc_country_code
;
604 (* Does it happen that this c was already used to connect successfully?
605 If yes then this must happen: *)
606 c.client_received_peer_id
<- false;
607 if cc_country_code
<> None
&& c.client_country_code
= None
then
608 c.client_country_code
<- cc_country_code
;
610 (* client could have had Sha1.null as peer_id/uid *)
611 (* this is to be done, later
612 if c.client_uid <> peer_id then
613 c.client_software <- (parse_software (Sha1.direct_to_string peer_id));
617 (* if c.client_uid <> peer_id then begin
618 lprintf "Unexpected client by UID\n";
619 let ccc = new_client file peer_id (TcpBufferedSocket.host sock) in
620 lprintf "CLIENT %d: testing instead of %d\n"
621 (client_num ccc) (client_num c);
622 (match ccc.client_sock with
624 lprintf_nl "[BT]: This client is already connected";
625 close sock (Closed_for_error "Already connected");
629 lprintf_nl "[BT]: Client %d: recovered by UID" (client_num ccc);
637 if !verbose_msg_clients
then begin
638 let (ip,port
) = c.client_host
in
639 lprintf_nl
"Client %d: Connected from %s:%d" (client_num
c)
640 (Ip.to_string
ip) port
;
643 parse_reserved rbits
c;
645 (match c.client_sock
with
647 if !verbose_msg_clients
then begin
648 let (ip,port
) = c.client_host
in
649 lprintf_nl
"No connection to client (%s:%d)!!!" (Ip.to_string
ip) port
;
651 c.client_sock
<- Connection
sock
652 | ConnectionWaiting token
->
654 if !verbose_msg_clients
then
655 lprintf_nl
"Waiting for connection to client !!!";
656 c.client_sock
<- Connection
sock
657 | Connection
s when s != sock ->
658 if !verbose_msg_clients
then
659 lprintf_nl
"CLIENT %d: IMMEDIATE RECONNECTION" (client_num
c);
660 disconnect_client c (Closed_for_error
"Reconnected");
661 c.client_sock
<- Connection
sock;
665 set_client_state
(c) (Connected
(-1));
666 if not init_sent
then
668 c.client_incoming
<- true;
669 send_init
!!client_uid file_id
sock;
671 connection_ok
c.client_connection_control
;
672 if !verbose_msg_clients
then
673 lprintf_nl
"file and client found";
674 (* if not c.client_incoming then *)
676 c.client_blocks_sent
<- file.file_blocks_downloaded
;
678 TODO !!! : send interested if and only if we are interested
679 -> we must receive at least the other peer's bitfield.
680 in common swarmer -> compare : partition -> partition -> bool
683 set_rtimeout
sock !!client_timeout
;
684 (* Once parsed successfully we define the function client_to_client
685 to be the function used when a message is read *)
686 gconn
.gconn_handler
<- Reader
(fun gconn
sock ->
687 bt_handler
TcpMessages.parsing
(client_to_client
c) c sock
690 let b = TcpBufferedSocket.buf
sock in
691 (* The receive buffer is normally not empty now, lets parse the rest, most likely PeerID *)
693 ignore
(bt_handler
TcpMessages.parsing
(client_to_client
c) c sock);
695 (* Some newer clients send more opcodes in their handshake packet, let's parse them now.
696 Using "while b.len <> 0 do ... done" is not possible here because libtorrent clients
697 send five unparsable extra bytes after their PeerID, which would result in a loop *)
699 ignore
(bt_handler
TcpMessages.parsing
(client_to_client
c) c sock);
703 let (ip,port
) = (TcpBufferedSocket.peer_addr
sock) in
704 if !verbose_unexpected_messages
then
705 lprintf_nl
"Client %s:%d requested a file that is not shared [%s]"
706 (Ip.to_string
ip) port
(Sha1.to_hexa file_id
)
708 lprintf_nl
"Exception %s in client_parse_header" (Printexc2.to_string e
);
709 close
sock (Closed_for_exception e
);
713 (** Update the bitmap of a client. Unclear if it is still useful.
714 @param c The client which we want to update.
716 and update_client_bitmap
c =
717 let file = c.client_file
in
719 let swarmer = match file.file_swarmer
with
721 | Some
swarmer -> swarmer
725 match c.client_uploader
with
727 let up = CommonSwarming.register_uploader
swarmer (as_client
c)
728 (AvailableIntervals
[]) in
729 c.client_uploader
<- Some
up;
735 let bitmap = match c.client_bitmap
with
737 let len = CommonSwarming.partition_size
swarmer in
738 let bitmap = Bitv.create
len false in
739 c.client_bitmap
<- Some
bitmap;
741 | Some
bitmap -> bitmap
744 if c.client_new_chunks
<> [] then begin
745 let chunks = c.client_new_chunks
in
746 c.client_new_chunks
<- [];
747 List.iter
(fun n
-> Bitv.set
bitmap n
true) chunks;
748 CommonSwarming.update_uploader_intervals
up (AvailableBitv
bitmap);
752 (** In this function we decide which piece we must request from client.
753 @param sock Socket of the client
756 and get_from_client
sock (c: client
) =
757 let file = c.client_file
in
758 (* Check if there's not enough requests in the 'pipeline'
759 and if a request can be send (not choked and file is downloading) *)
760 if List.length
c.client_ranges_sent
< max_range_requests
761 && file_state
file = FileDownloading
762 && (c.client_choked
== false)
764 (* num is the number of the piece, x and y are the position
765 of the subpiece in the piece(!), r is a (CommonSwarmer) range *)
767 let up = match c.client_uploader
with
770 let swarmer = CommonSwarming.uploader_swarmer
up in
776 if !verbose_msg_clients
then
777 lprintf_file_nl
(as_file
file) "CLIENT %d: Finding new range to send" (client_num
c);
779 if !verbose_swarming
then begin
780 lprintf_n
"Current download:\n Current chunks: ";
783 List.iter
(fun (x
,y
) -> lprintf
"%Ld-%Ld " x y
) c.client_chunks
784 with _
-> lprintf
"No Chunks";
788 lprintf_n
"Current ranges: ";
790 List.iter
(fun (p1
,p2
, r) ->
791 let (x
,y
) = CommonSwarming.range_range
r in
792 lprintf
"%Ld-%Ld[%Ld-%Ld] " p1 p2 x y
793 ) c.client_ranges_sent
;
795 match c.client_range_waiting
with
797 | Some
(x
,y
,r) -> lprintf
"Waiting %Ld-%Ld" x y
;
801 lprintf_n
"Current blocks: ";
803 match c.client_chunk
with
804 | None
-> lprintf
"none"
805 | Some
(chunk
, blocks
) -> List.iter
(fun b ->
806 CommonSwarming.print_block
b.up_block
) blocks
;
810 lprintf_file_nl
(as_file
file) "Finding Range:";
815 (*We must find a block to request first, and then
816 some range inside this block
821 match c.client_chunk
with
825 if !verbose_swarming
then lprintf_file_nl
(as_file
file) "No block";
826 update_client_bitmap
c;
827 (try CommonSwarming.verify_one_chunk
swarmer with _
-> ());
828 (*Find a free block in the swarmer*)
829 let chunk, blocks
= CommonSwarming.find_blocks
up in
830 if !verbose_swarming
then begin
831 lprintf_n
"Blocks Found: "; List.iter (fun b ->
832 CommonSwarming.print_block
b.up_block
) blocks
;
835 c.client_chunk
<- Some
(chunk, blocks
);
837 (*We put the found block in client_block to
838 request range in this block. (Useful for
839 not searching each time a new block)
844 | Some
(chunk, blocks
) ->
846 if !verbose_swarming
then begin
847 lprintf_n
"Current Blocks: "; List.iter (fun b ->
848 CommonSwarming.print_block
b.up_block
) blocks
;
853 (*Given a block find a range inside*)
855 match c.client_range_waiting
with
857 c.client_range_waiting
<- None
;
860 CommonSwarming.find_range
up (min max_range_len
file.file_piece_size
)
865 if y
-- x
> max_range_len
then begin
866 c.client_range_waiting
<- Some
(x
++ max_range_len
, y
, r);
867 (x
, x
++ max_range_len
, r)
872 c.client_ranges_sent
<- c.client_ranges_sent
@ [x
,y
, r];
873 (* CommonSwarming.alloc_range r; *)
875 (* naughty, naughty, was computing a block number instead of a chunk
876 number. Only matters with merged downloads, and even then other
877 clients didn't seem to care (?), so the bug remained hidden *)
878 if !verbose_swarming
then
879 lprintf_file_nl
(as_file
file) "Asking %d For Range %Ld-%Ld" chunk x y
;
881 chunk, x
-- file.file_piece_size
** Int64.of_int
chunk, y
-- x
, r
885 (*If we don't find a range to request inside the block,
886 iter to choose another block*)
887 if !verbose_swarming
then
888 lprintf_nl
"Could not find range in current block";
889 (* c.client_blocks <- List2.removeq b c.client_blocks; *)
891 c.client_chunk
<- None
;
899 (*If we don't find a block to request we can check if the
900 file is finished (if there are missing pieces we can't decide
901 that the file is finished because we didn't find
904 if !verbose_swarming
then
905 lprintf_nl
"Unable to get a block !!";
906 CommonSwarming.compute_bitmap
swarmer;
907 check_finished swarmer file;
911 send_client
c (Request
(num,x
,y
));
913 if !verbose_msg_clients
then
914 lprintf_file_nl
(as_file
file) "CLIENT %d: Asking %s For Range %Ld-%Ld"
915 (client_num
c) (Sha1.to_string
c.client_uid
) x y
918 if not
(CommonSwarming.check_finished swarmer) && !verbose_download
then
919 lprintf_file_nl
(as_file
file) "BTClient.get_from_client ERROR: can't find a block to download and file is not yet finished for file : %s..." file.file_name
922 (** In this function we match a message sent by a client
923 and react according to this message.
924 @param c The client which sent us a message
925 @param sock The socket used for this client
926 @param msg The message sent by the client
928 and client_to_client
c sock msg
=
929 if !verbose_msg_clients
then begin
930 let (ip,port
) = (TcpBufferedSocket.peer_addr
sock) in
931 let (timeout
, next
) = get_rtimeout
sock in
932 lprintf_nl
"CLIENT %d(%s:%d): (%d, %d,%d) Received %s"
933 (client_num
c) (Ip.to_string
ip) port
935 (int_of_float timeout
)
937 (TcpMessages.to_string msg
);
940 let file = c.client_file
in
942 (* Sending the "Have" message was moved to bTGlobals so this is useless *)
943 (* if c.client_blocks_sent != file.file_blocks_downloaded then begin
947 | b :: tail when tail == c.client_blocks_sent ->
948 c.client_blocks_sent <- list;
949 let (num,_,_) = CommonSwarming.block_block b in
950 send_client c (Have (Int64.of_int num))
951 | _ :: tail -> iter tail
953 iter file.file_blocks_downloaded
958 Piece
(num, offset
, s, pos
, len) ->
959 (*A Piece message contains the data*)
960 set_client_state
c (Connected_downloading
(file_num
file));
961 (*flag it as a good client *)
962 c.client_good
<- true;
963 if file_state
file = FileDownloading
then begin
964 let position = offset
++ file.file_piece_size
*.. num in
965 let up = match c.client_uploader
with
968 let swarmer = CommonSwarming.uploader_swarmer
up in
970 if !verbose_msg_clients
then
971 (match c.client_ranges_sent
with
972 [] -> lprintf_file_nl
(as_file
file) "EMPTY Ranges !!!"
974 let (x
,y
) = CommonSwarming.range_range
r in
975 lprintf_file_nl
(as_file
file) "Current range from %s : %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])"
976 (brand_to_string
c.client_brand
) position len
981 CommonSwarming.downloaded
swarmer in
982 (* List.iter CommonSwarming.free_range c.client_ranges; *)
983 CommonSwarming.received
up
985 (* List.iter CommonSwarming.alloc_range c.client_ranges; *)
987 CommonSwarming.downloaded
swarmer in
989 (*Update rate and amount of data received from client*)
990 count_download
c (new_downloaded -- old_downloaded);
991 (* use len here with max_dr quickfix *)
992 Rate.update
c.client_downloaded_rate ~amount
:len;
993 (* count bytes downloaded from network for this file *)
994 file.file_session_downloaded
<- file.file_session_downloaded
++ (Int64.of_int
len);
995 if !verbose_msg_clients
then
996 (match c.client_ranges_sent
with
997 [] -> lprintf_file_nl
(as_file
file) "EMPTY Ranges !!!"
999 let (x
,y
) = CommonSwarming.range_range
r in
1000 lprintf_file_nl
(as_file
file) "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
1003 (new_downloaded -- old_downloaded)
1006 (* changed 2.5.28 should have been done before !
1007 if new_downloaded <> old_downloaded then
1008 add_file_downloaded (as_file file)
1009 (new_downloaded -- old_downloaded); *)
1012 match c.client_ranges_sent
with
1015 (* CommonSwarming.free_range r; *)
1016 c.client_ranges_sent
<- tail
;
1018 get_from_client
sock c;
1020 (* Check if the client is still interesting for us... *)
1021 check_if_interesting
file c
1024 (* Disconnect if that is ourselves. *)
1025 c.client_uid
<- Sha1.direct_of_string
p;
1026 if not
(c.client_uid
= !!client_uid
) then
1028 let brand, release
= parse_software
p in
1029 c.client_brand
<- brand;
1030 c.client_release
<- release
;
1031 send_client
c Choke
;
1032 c.client_sent_choke
<- true;
1035 disconnect_client c Closed_by_user
1039 (*A bitfield is a summary of what a client has*)
1041 match c.client_file
.file_swarmer
with
1044 c.client_new_chunks
<- [];
1046 let npieces = CommonSwarming.partition_size
swarmer in
1047 let nbits = String.length
p * 8 in
1049 if nbits < npieces then begin
1050 lprintf_file_nl
(as_file
file) "Error: expected bitfield of atleast %d but got %d" npieces nbits;
1051 disconnect_client c (Closed_for_error
"Wrong bitfield length")
1054 let bitmap = CommonSwarming.chunks_verified_bitmap
swarmer in
1056 for i = 0 to npieces - 1 do
1057 if is_bit_set p i then begin
1058 c.client_new_chunks
<- i :: c.client_new_chunks
;
1059 match VB.get
bitmap i with
1060 | VB.State_missing
| VB.State_partial
->
1061 c.client_interesting
<- true
1062 | VB.State_complete
| VB.State_verified
-> ()
1066 update_client_bitmap
c;
1067 c.client_registered_bitfield
<- true;
1069 if c.client_interesting
then
1072 if !verbose_msg_clients
then
1073 lprintf_file_nl
(as_file
file) "New BitField Registered";
1075 (* for i = 1 to max_range_requests - List.length c.client_ranges do
1076 (try get_from_client sock c with _ -> ())
1082 (* Note: a bitfield must only be sent after the handshake and before everything else: NOT here *)
1085 (* A client can send a "Have" without sending a Bitfield *)
1087 match c.client_file
.file_swarmer
with
1090 let n = Int64.to_int
n in
1091 let bitmap = CommonSwarming.chunks_verified_bitmap
swarmer in
1092 (* lprintf_nl "verified: %c;" (VB.state_to_char (VB.get bitmap n)); *)
1093 (* if the peer has a chunk we don't, tell him we're interested and update his bitmap *)
1094 match VB.get
bitmap n with
1095 | VB.State_missing
| VB.State_partial
->
1096 c.client_interesting
<- true;
1098 c.client_new_chunks
<- n :: c.client_new_chunks
;
1099 update_client_bitmap
c;
1100 | VB.State_complete
| VB.State_verified
-> ()
1103 match c.client_bitmap, c.client_uploader with
1104 Some bitmap, Some up ->
1105 let swarmer = CommonSwarming.uploader_swarmer up in
1106 let n = Int64.to_int n in
1107 if bitmap.[n] <> '1' then
1109 let verified = CommonSwarming.verified_bitmap swarmer in
1110 if verified.[n] < '2' then begin
1111 c.client_interesting <- true;
1113 c.client_new_chunks <- n :: c.client_new_chunks;
1114 if c.client_block = None then begin
1115 update_client_bitmap c;
1116 (* for i = 1 to max_range_requests -
1117 List.length c.client_ranges do
1118 (try get_from_client sock c with _ -> ())
1122 | None
, Some _
-> lprintf_nl
"no bitmap but client_uploader";
1123 | Some _
, None
->lprintf_nl
"bitmap but no client_uploader";
1124 | None
, None
-> lprintf_nl
"no bitmap no client_uploader";
1130 c.client_interested
<- true;
1134 set_client_state
(c) (Connected
(-1));
1135 (* remote peer will clear the list of range we sent *)
1137 match c.client_uploader
with
1139 (* Afaik this is no protocol violation and happens if the client
1140 didn't send a client bitmap after the handshake. *)
1141 let (ip,port
) = c.client_host
in
1142 if !verbose_msg_clients
then lprintf_file_nl
(as_file
file) "%s:%d with software %s : Choke send, but no client bitmap"
1143 (Ip.to_string
ip) port
(brand_to_string
c.client_brand
)
1145 CommonSwarming.clear_uploader_intervals
up
1147 c.client_ranges_sent
<- [];
1148 c.client_range_waiting
<- None
;
1149 c.client_choked
<- true;
1153 c.client_interested
<- false;
1157 c.client_choked
<- false;
1158 (* remote peer cleared our request : re-request *)
1159 for i = 1 to max_range_requests -
1160 List.length
c.client_ranges_sent
do
1161 (try get_from_client
sock c with _
-> ())
1165 | Request
(n, pos
, len) ->
1166 if len > max_request_len
then begin
1167 close
sock (Closed_for_error
"Request longer than 1<<16");
1171 if !CommonGlobals.has_upload
= 0 then
1173 if client_has_a_slot
(as_client
c) then
1175 (* lprintf "Received request for upload\n"; *)
1176 (match c.client_upload_requests
with
1178 CommonUploads.ready_for_upload
(as_client
c);
1180 c.client_upload_requests
<- c.client_upload_requests
@ [n,pos
,len];
1181 let file = c.client_file
in
1182 match file.file_shared
with
1186 s.impl_shared_requests
<- s.impl_shared_requests
+ 1;
1187 shared_must_update
(as_shared
s)
1192 send_client
c Choke
;
1193 c.client_sent_choke
<- true;
1194 c.client_upload_requests
<- [];
1199 (* We don't 'generate' a Ping message on a Ping. *)
1201 | Cancel
(n, pos
, len) ->
1202 (* if we receive a cancel message from a peer, remove request *)
1203 if client_has_a_slot
(as_client
c) then
1204 c.client_upload_requests
<- List2.remove_first
(n, pos
, len) c.client_upload_requests
1206 if !verbose_msg_clients
then
1207 lprintf_file_nl
(as_file
file) "Error: received cancel request but client has no slot"
1210 lprintf_file_nl
(as_file
file) "Error %s while handling MESSAGE: %s" (Printexc2.to_string e
) (TcpMessages.to_string msg
)
1213 (** The function used to connect to a client.
1214 The connection is not immediately initiated. It will
1215 be put in a fifo and dequeued according to
1216 !!max_connections_per_second. (@see commonGlobals.ml)
1217 @param c The client we must connect to
(* [connect_client c] schedules an outgoing connection to peer [c].
   What the code below does:
   - the attempt is guarded by [can_open_connection connection_manager] and
     by a lookup of the peer address and country code in [!Ip.banned];
     a blocked peer is reported through [lprintf_nl] when [verbose_connect]
     is set;
   - the connection itself is deferred: a callback is registered with
     [add_pending_connection], and [c.client_sock] is set to
     [ConnectionWaiting token] for the pending connection;
   - the callback opens the socket with [connect] and installs an event
     handler: both timeouts close the socket with [Closed_for_timeout], and
     a close event calls [disconnect_client] only if [c.client_sock] still
     refers to this very socket (physical equality);
   - it then records the socket in [c.client_sock], sets a 600 s lifetime,
     the download/upload bandwidth controllers and a 30 s read timeout,
     calls [send_init] with our client uid and the file id, refreshes the
     client bitmap and hands the socket to [client_parse_header];
   - an exception is logged and turned into
     [disconnect_client c (Closed_for_exception e)]. *)
1219 let connect_client c =
1220 if can_open_connection connection_manager
&&
1221 (let (ip,port
) = c.client_host
in
1222 match !Ip.banned
(ip, c.client_country_code
) with
1225 if !verbose_connect
then
1226 lprintf_nl
"%s:%d (%s), blocked: %s"
1227 (Ip.to_string
ip) port
1228 (fst
(Geoip.get_country_code_name
c.client_country_code
))
1232 match c.client_sock
with
(* The real work is deferred to a callback that receives a connection token. *)
1236 add_pending_connection connection_manager
(fun token ->
1238 if !verbose_msg_clients
then
1239 lprintf_nl
"CLIENT %d: connect_client" (client_num
c);
1240 let (ip,port
) = c.client_host
in
1241 if !verbose_msg_clients
then
1242 lprintf_nl
"connecting %s:%d" (Ip.to_string
ip) port
;
(* Record this attempt in the connection control of the client. *)
1243 connection_try
c.client_connection_control
;
1245 let sock = connect
token "bittorrent download"
1246 (Ip.to_inet_addr
ip) port
1249 BASIC_EVENT LTIMEOUT
->
1250 if !verbose_msg_clients
then
1251 lprintf_nl
"CLIENT %d: LIFETIME" (client_num
c);
1252 close
sock Closed_for_timeout
1253 | BASIC_EVENT RTIMEOUT
->
1254 if !verbose_msg_clients
then
1255 lprintf_nl
"CLIENT %d: RTIMEOUT (%d)" (client_num
c)
1258 close
sock Closed_for_timeout
1259 | BASIC_EVENT
(CLOSED
r) ->
(* Only disconnect when the closed socket is still the one attached to c. *)
1261 match c.client_sock
with
1262 | Connection
s when s == sock ->
1263 disconnect_client c r
1269 c.client_sock
<- Connection
sock;
1270 set_lifetime
sock 600.;
1271 TcpBufferedSocket.set_read_controler
sock download_control
;
1272 TcpBufferedSocket.set_write_controler
sock upload_control
;
1273 TcpBufferedSocket.set_rtimeout
sock 30.;
1274 let file = c.client_file
in
1276 if !verbose_msg_clients
then
1277 lprintf_file_nl
(as_file
file) "READY TO DOWNLOAD FILE";
1279 send_init
!!client_uid
file.file_id
sock;
1280 (* Fabrice: Initialize the client bitmap and uploader fields to <> None *)
1281 update_client_bitmap
c;
1282 (* (try get_from_client sock c with _ -> ());*)
1284 (*We 'hook' the client_parse_header function to the socket
1285 This function will then be called when the first message will
1288 set_bt_sock
sock !verbose_msg_clients
1289 (BTHeader
(client_parse_header !counter (ref ((Some
c), c.client_country_code
)) true))
1292 lprintf_nl
"Exception %s while connecting to client"
1293 (Printexc2.to_string e
);
1294 disconnect_client c (Closed_for_exception e
)
1296 (*Since this is a pending connection put ConnectionWaiting
1300 c.client_sock
<- ConnectionWaiting
token
1304 (** The Listen function (very much like in C : TCP Socket Server).
1305 Monitors incoming client connections to us.
1309 let s = TcpServerSocket.create
"bittorrent client server"
1310 (Ip.to_inet_addr
!!client_bind_addr
)
1314 TcpServerSocket.CONNECTION
(s,
1315 Unix.ADDR_INET
(from_ip
, from_port
)) ->
1316 (*Receiving an event TcpServerSocket.CONNECTION from
1317 the TcpServerSocket means that a new client try
1320 let ip = (Ip.of_inet_addr from_ip
) in
1321 let cc = Geoip.get_country_code_option
ip in
1322 if !verbose_sources
> 1 then lprintf_nl
"CONNECTION RECEIVED FROM %s"
1323 (Ip.to_string
(Ip.of_inet_addr from_ip
))
1325 (*Reject this connection if we don't want
1326 to bypass the max_connection parameter
1328 if can_open_connection connection_manager
&&
1329 (match !Ip.banned
(ip, cc) with
1332 if !verbose_connect
then
1333 lprintf_nl
"%s:%d (%s) blocked: %s"
1334 (Ip.to_string
ip) from_port
1335 (fst
(Geoip.get_country_code_name
cc))
1340 let token = create_token connection_manager
in
1341 let sock = TcpBufferedSocket.create
token
1342 "bittorrent client connection" s
1345 BASIC_EVENT
(RTIMEOUT
|LTIMEOUT
) ->
1346 (*monitor read and life timeout on client
1349 close
sock Closed_for_timeout
1353 TcpBufferedSocket.set_read_controler
sock download_control
;
1354 TcpBufferedSocket.set_write_controler
sock upload_control
;
1356 let c = ref (None
, cc) in
1357 TcpBufferedSocket.set_closer
sock (fun _
r ->
1360 match c.client_sock
with
1361 | Connection
s when s == sock ->
1362 disconnect_client c r
1367 set_rtimeout
sock 30.;
1369 (*Again : 'hook' client_parse_header to the socket*)
1370 set_bt_sock
sock !verbose_msg_clients
1371 (BTHeader
(client_parse_header !counter c false));
1374 (*don't forget to close the incoming sock if we can't
1375 open a new connection
1380 listen_sock
:= Some
s;
1383 if !verbose_connect
then
1384 lprintf_nl
"Exception %s while init bittorrent server"
1385 (Printexc2.to_string e
)
1388 (** This function sends keepalive messages to all connected clients
1389 (and updates the socket lifetime)
1392 List.iter (fun file ->
1393 Hashtbl.iter (fun _
c ->
1394 match c.client_sock
with
1395 | Connection
sock ->
1397 set_lifetime
sock 130.;
1405 (** For each client of a given file, check whether it is connected.
1406 If it is not, try to connect to it
(* [resume_clients file] walks a client table with [Hashtbl.iter] and, for
   each client, looks at [c.client_sock]: a client that already has a
   [Connection] is left alone; otherwise the retry policy held in
   [c.client_connection_control] is consulted through [connection_can_try].
   An exception is reported via [lprintf_file_nl] when [verbose_connect] is
   set.
   NOTE(review): presumably the table is [file.file_clients] and the retry
   branch ends up calling [connect_client] - confirm against the full
   definition. *)
1408 let resume_clients file =
1409 Hashtbl.iter (fun _
c ->
1411 match c.client_sock
with
1412 | Connection
sock -> ()
1413 (*i think this one is not really useful for debugging
1414 lprintf_nl "[BT]: RESUME: Client is already connected"; *)
1417 (*test if we can connect client according to the its
1419 Currently the delay between two try is 120 seconds.
1421 if connection_can_try
c.client_connection_control
then
1424 print_control
c.client_connection_control
1427 if !verbose_connect
then
1428 lprintf_file_nl
(as_file
file) "Exception %s in resume_clients" (Printexc2.to_string e
)
1432 resume_clients_hook := resume_clients
1434 (** Check if the value replied by the tracker is correct.
1435 @param key the name of the key
1436 @param n the value to check
1437 @param url Url of the tracker
1438 @param name the name of the file
(* [chk_keyval key n url name] converts the Int64 value [n] found under
   [key] in a tracker reply to a native int. With [verbose_msg_clients] the
   value is logged together with the tracker [url] and the file [name]; a
   second message reports an invalid value.
   NOTE(review): callers assign the result to int fields, so an int is
   returned; the validity test and the value returned for an invalid entry
   should be confirmed against the full definition.
   NOTE(review): [Int64.to_int] wraps on platforms where int is narrower
   than 64 bits, so a huge tracker value may come out negative or small. *)
1440 let chk_keyval key
n url name
=
1441 let int_n = (Int64.to_int
n) in
1442 if !verbose_msg_clients
then
1443 lprintf_nl
"Reply from %s in file: %s has %s: %d" (show_tracker_url url
) name key
int_n;
1447 lprintf_nl
"Reply from %s in file: %s has an invalid %s value: %d" (show_tracker_url url
) name key
int_n;
1451 (** Check that client is valid and record it *)
(* [maybe_new_client file id ip port] registers a peer announced for [file].
   The country code of [ip] is looked up with Geoip first. Among the
   conditions tested: [id] must differ from our own [!!client_uid], and the
   address/country pair is checked against [!Ip.banned]; a blocked address
   is logged with its reason when [verbose_connect] is set.
   An accepted peer is recorded with [new_client] (its result is ignored)
   and, when [verbose_sources > 1], logged as received. *)
1452 let maybe_new_client file id
ip port
=
1453 let cc = Geoip.get_country_code_option
ip in
1454 if id
<> !!client_uid
1457 && (match !Ip.banned
(ip, cc) with
1460 if !verbose_connect
then
1461 lprintf_file_nl
(as_file
file) "%s:%d blocked: %s" (Ip.to_string
ip) port reason
;
1464 ignore
(new_client
file id
(ip,port
) cc);
1465 if !verbose_sources
> 1 then
1466 lprintf_file_nl
(as_file
file) "Received %s:%d" (Ip.to_string
ip) port
;
(** [exn_catch f x] applies [f] to [x] and reifies the outcome as a value:
    [`Ok v] when the call returns [v], [`Exn e] when it raises [e].
    Every exception is captured; none is re-raised. *)
let exn_catch f x =
  try
    let value = f x in
    `Ok value
  with caught -> `Exn caught
1471 (** In this function we interact with the tracker
1472 @param file The file for which we want some sources
1473 @param need_sources whether we need any sources
(* [talk_to_tracker file need_sources] contacts the trackers of [file]
   through [connect_trackers] and processes the bencoded reply.
   The reply handler:
   - reads the reply with [File.to_string] and decodes it with
     [Bencode.decode], both wrapped in [exn_catch]; an unreadable, empty or
     undecodable reply, or one that is not a dictionary, goes through
     [tracker_failed];
   - [tracker_failed] marks the tracker [Disabled_failure] with an
     incremented attempt counter and logs the reason;
   - resets [tracker_interval] and [tracker_min_interval] to 600 and the
     client counter to 0, then walks the dictionary entries, storing the
     numeric statistics through [chk_keyval];
   - peers arrive either as a list of dictionaries (peer id, ip, port) or as
     a compact string of 6-byte records (4 address bytes, then a 16-bit
     port); when [need_sources] is set each one is passed to
     [maybe_new_client] and counted in [tracker_last_clients_num];
   - when [need_sources] is set, [resume_clients file] is called at the end.
   @param file the file for which sources are wanted
   @param need_sources whether peers from the reply should be used *)
1475 let talk_to_tracker file need_sources
=
1476 (* This is the function which will be called by the http client for parsing the response *)
1478 let tracker_url = show_tracker_url t
.tracker_url in
1479 let tracker_failed reason
=
1480 (* On failure, disable the tracker and count attempts (@see is_tracker_enabled) *)
1481 let num = match t
.tracker_status
with | Disabled_failure
(i,_
) -> i + 1 | _
-> 1 in
1482 t
.tracker_status
<- Disabled_failure
(num, intern reason
);
1483 lprintf_file_nl
(as_file
file) "Failure no. %d%s from Tracker %s for file: %s Reason: %s"
1485 (if !!tracker_retries
= 0 then "" else Printf.sprintf
"/%d" !!tracker_retries
)
1486 tracker_url file.file_name
(Charset.Locale.to_utf8 reason
)
(* Both reading and decoding are wrapped so that an exception becomes a tracker failure. *)
1488 match exn_catch File.to_string filename
with
1489 | `Exn _
| `Ok
"" -> tracker_failed "empty reply"
1491 match exn_catch Bencode.decode
s with
1492 | `Exn exn
-> tracker_failed (Printf.sprintf
"wrong reply (%s)" (Printexc2.to_string exn
))
1493 | `Ok
(Dictionary list
) ->
(* Defaults applied before the entries of the reply are read. *)
1494 t
.tracker_interval
<- 600;
1495 t
.tracker_min_interval
<- 600;
1496 t
.tracker_last_clients_num
<- 0;
(* NOTE(review): the counter was already reset unconditionally by the
   previous statement, so this conditional reset has no further effect. *)
1497 if need_sources
then t
.tracker_last_clients_num
<- 0;
(* Local shadowing: bind the tracker url and file name once for all entries. *)
1498 let chk_keyval key
n = chk_keyval key
n t
.tracker_url file.file_name
in
1499 if not
(List.mem_assoc
"failure reason" list
) then
1501 begin match t
.tracker_status
with
1502 | Disabled_failure
(i, _
) ->
1503 lprintf_file_nl
(as_file
file) "Received good message from Tracker %s after %d bad attempts"
1506 (* Received good message from tracker after failures, re-enable tracker *)
1507 t
.tracker_status
<- Enabled
;
1509 List.iter (fun (key
,value) ->
1510 match (key
,value) with
1511 | "failure reason", String failure
-> tracker_failed failure
1512 | "warning message", String warning
->
1513 lprintf_file_nl
(as_file
file) "Warning from Tracker %s in file: %s Reason: %s"
1514 tracker_url file.file_name warning
1515 | "interval", Int
n ->
1516 t
.tracker_interval
<- chk_keyval key
n;
1517 (* in case we don't receive "min interval" *)
1518 if t
.tracker_min_interval
> t
.tracker_interval
then
1519 t
.tracker_min_interval
<- t
.tracker_interval
1520 | "min interval", Int
n ->
1521 t
.tracker_min_interval
<- chk_keyval key
n;
1522 (* make sure "min interval" is always < or equal to "interval" *)
1523 if t
.tracker_min_interval
> t
.tracker_interval
then
1524 t
.tracker_min_interval
<- t
.tracker_interval
1525 | "downloaded", Int
n ->
1526 t
.tracker_torrent_downloaded
<- chk_keyval key
n
1528 | "done peers", Int
n ->
1529 t
.tracker_torrent_complete
<- chk_keyval key
n
1530 | "incomplete", Int
n ->
1531 t
.tracker_torrent_incomplete
<- chk_keyval key
n;
1532 (* if complete > 0 and we receive incomplete we probably won't receive num_peers so we simulate it below *)
1533 if t
.tracker_torrent_complete
> 0 then
1534 t
.tracker_torrent_total_clients_count
<- (t
.tracker_torrent_complete
+ t
.tracker_torrent_incomplete
);
1535 | "num peers", Int
n ->
1536 t
.tracker_torrent_total_clients_count
<- chk_keyval key
n;
1537 (* if complete > 0 and we receive num_peers we probably won't receive incomplete so we simulate it below *)
1538 if t
.tracker_torrent_complete
> 0 then
1539 t
.tracker_torrent_incomplete
<- (t
.tracker_torrent_total_clients_count
- t
.tracker_torrent_complete
);
1541 t
.tracker_torrent_last_dl_req
<- chk_keyval key
n
1542 | "key", String
n ->
1544 if !verbose_msg_clients
then
1545 lprintf_file_nl
(as_file
file) "%s in file: %s has key: %s" tracker_url file.file_name
n
1546 | "tracker id", String
n ->
1548 if !verbose_msg_clients
then
1549 lprintf_file_nl
(as_file
file) "%s in file: %s has tracker id %s" tracker_url file.file_name
n
(* Peer list given as a list of dictionaries, one per peer. *)
1551 | "peers", List list
->
1552 if need_sources
then
1555 | Dictionary list
->
1556 let peer_id = ref Sha1.null
in
1557 let peer_ip = ref Ip.null
in
1562 "peer id", String id
->
1563 peer_id := Sha1.direct_of_string id
;
1564 | "ip", String
ip ->
1565 peer_ip := Ip.of_string
ip
1567 port := Int64.to_int
p
1571 t
.tracker_last_clients_num
<- t
.tracker_last_clients_num
+ 1;
1572 maybe_new_client file !peer_id !peer_ip !port
(* Compact peer list: a string read 6 bytes at a time, four address bytes
   followed by a 16-bit port; no peer id is available, so Sha1.null is used. *)
1576 | "peers", String
p ->
1577 let rec iter_comp s pos l
=
1579 let ip = Ip.of_ints
(get_uint8
s pos
,get_uint8
s (pos
+1),
1580 get_uint8
s (pos
+2),get_uint8
s (pos
+3))
1581 and port = get_int16
s (pos
+4)
1583 t
.tracker_last_clients_num
<- t
.tracker_last_clients_num
+ 1;
1584 maybe_new_client file Sha1.null
ip port;
1586 iter_comp s (pos
+6) l
1588 if need_sources
then
1589 iter_comp p 0 (String.length
p)
1590 | "private", Int
n -> ()
1591 (* TODO: if set to 1, disable peer exchange *)
1593 | key
, _
-> lprintf_file_nl
(as_file
file) "received unknown entry in answer from tracker: %s : %s" key
(Bencode.print
value)
1595 (*Now, that we have added new clients to a file, it's time
1596 to connect to them*)
1597 if !verbose_sources
> 0 then
1598 lprintf_file_nl
(as_file
file) "talk_to_tracker: got %i source(s) for file %s"
1599 t
.tracker_last_clients_num
file.file_name
;
1600 if need_sources
then resume_clients file
1602 | _
-> tracker_failed "wrong reply (value)"
1605 if file.file_tracker_connected
then ""
(* Hand the reply handler [f] to the tracker connection code. *)
1608 connect_trackers file event need_sources
f
1611 (** Check to see if file is finished, if not
1612 try to get sources for it
(* [recover_files ()] iterates over a list of files and, for each one that
   has a swarmer, first runs [check_finished] and then dispatches on
   [file_state file]:
   - on the branch logged as recover downloading, [talk_to_tracker file true]
     is called (sources requested);
   - on the branch logged as recover shared, [talk_to_tracker file false]
     is called (no sources requested);
   - [FilePaused] does nothing;
   - any other state is only logged when [verbose] is set.
   Exceptions raised by [check_finished] and [talk_to_tracker] are
   deliberately discarded so that one failing file does not stop the loop. *)
1614 let recover_files () =
1615 if !verbose_share
then
1616 lprintf_nl
"recover_files";
1617 List.iter (fun file ->
1618 match file.file_swarmer
with
1621 (try check_finished swarmer file with e
-> ());
1622 match file_state
file with
1624 if !verbose_share
then
1625 lprintf_file_nl
(as_file
file) "recover downloading";
1626 (try talk_to_tracker file true with _
-> ())
1628 if !verbose_share
then
1629 lprintf_file_nl
(as_file
file) "recover shared";
1630 (try talk_to_tracker file false with _
-> ())
1631 | FilePaused
-> () (*when we are paused we do nothing, not even logging this vvvv*)
1633 | s -> if !verbose
then lprintf_file_nl
(as_file
file) "recover: Other state %s!!" (string_of_state
s)
(** Scratch buffer shared by all uploads: [iter_upload] reads the requested
    data from disk into it and sends it from there.
    Its capacity is 100000 bytes. *)
let upload_buffer =
  let capacity = 100000 in
  String.create capacity
1640 Send a Piece message
1641 for one of the pending requests of the client
1642 @param sock The socket of the client
(* [iter_upload sock c] serves the request at the head of
   [c.client_upload_requests], a triple (piece number, offset inside the
   piece, length).
   - A request whose length is [zero] is dropped from the queue.
   - Otherwise, provided [c.client_allowed_to_write] is [>= 0L], the request
     is dequeued, [len] is subtracted from the write allowance, the bytes at
     file offset [pos ++ file.file_piece_size *.. num] are read into the
     shared [upload_buffer], the rate and per-file upload counters are
     updated and a [Piece] message carrying the buffer is sent with
     [send_client].
   - An exception is logged with [lprintf_nl].
   - [ready_for_upload (as_client c)] is called on the branch annotated as
     client is waiting for another piece.
   NOTE(review): [upload_buffer] has a fixed size and the read starts at
   index 0, so [len] must never exceed that size - confirm the bound that
   is enforced on incoming requests. *)
1645 let rec iter_upload sock c =
1646 match c.client_upload_requests
with
1648 | (num, pos
, len) :: tail
->
1649 if len = zero
then begin
1650 c.client_upload_requests
<- tail
;
1653 if c.client_allowed_to_write
>= 0L then begin
1655 c.client_upload_requests
<- tail
;
1657 let file = c.client_file
in
(* Absolute position in the file: piece index times piece size, plus the offset in the piece. *)
1658 let offset = pos
++ file.file_piece_size
*.. num in
1659 c.client_allowed_to_write
<- c.client_allowed_to_write
-- len;
1661 let len = Int64.to_int
len in
1662 (* lprintf "Unix32.read: offset %Ld len %d\n" offset len; *)
1663 Unix32.read
(file_fd
file) offset upload_buffer 0 len;
1664 (* update upload rate from len bytes *)
1665 Rate.update
c.client_upload_rate ~amount
:len;
1666 Rate.update
c.client_downloaded_rate
;
1667 file.file_uploaded
<- file.file_uploaded
++ (Int64.of_int
len);
1668 file.file_session_uploaded
<- file.file_session_uploaded
++ (Int64.of_int
len);
1671 count_filerequest
c;
1672 match file.file_shared
with
1676 s.impl_shared_uploaded
<- file.file_uploaded
;
1677 shared_must_update
(as_shared
s)
1680 (* lprintf "sending piece\n"; *)
1681 send_client
c (Piece
(num, pos
, upload_buffer, 0, len));
1685 lprintf_nl
"Exception %s in iter_upload" (Printexc2.to_string e
)
1688 (* lprintf "client is waiting for another piece\n"; *)
1689 ready_for_upload
(as_client
c)
1694 In this function we check if we can send bytes (according
1695 to bandwidth control), if we can, call iter_upload to
1696 send a Piece message
1697 @param c the client to which we can send some bytes
1698 @param allowed the amount of bytes we can send to client
(* [client_can_upload c allowed] is the callback installed as
   [client_ops.op_client_can_upload]. It only acts when [c.client_sock] is
   connected ([do_if_connected]). It inspects [c.client_upload_requests],
   computes the new write allowance as [c.client_allowed_to_write] plus
   [allowed] and, when [allowed > 0] and [can_write_len sock] accepts that
   total, charges [allowed] through [CommonUploads.consume_bandwidth] and
   stores the new allowance in [c.client_allowed_to_write].
   NOTE(review): presumably [iter_upload sock c] is then called to send the
   data - confirm against the full definition.
   @param c the client to which bytes may be sent
   @param allowed the number of bytes granted to this client *)
1700 let client_can_upload c allowed
=
1701 (* lprintf "allowed to upload %d\n" allowed; *)
1702 do_if_connected
c.client_sock
(fun sock ->
1703 match c.client_upload_requests
with
1706 let new_allowed_to_write =
1707 c.client_allowed_to_write
++ (Int64.of_int allowed
) in
1708 if allowed
> 0 && can_write_len
sock
1709 (Int64.to_int
new_allowed_to_write)
1711 CommonUploads.consume_bandwidth allowed
;
1712 c.client_allowed_to_write
<- new_allowed_to_write;
(* [file_resume file] re-enables the trackers of [file] and asks them for
   sources. For every entry of [file.file_trackers], a status of
   [Disabled_failure] or [Disabled] is reset to [Enabled], while [Enabled]
   and [Disabled_mld] are left untouched. [talk_to_tracker file true] is
   then called; any exception it raises is discarded.
   Installed below as both [op_file_resume] and [op_file_recover]. *)
1717 let file_resume file =
1719 match t
.tracker_status
with
1720 | Enabled
| Disabled_mld
_ -> ()
1721 | Disabled_failure
_ | Disabled
_ -> t
.tracker_status
<- Enabled
1722 ) file.file_trackers
;
1723 (try talk_to_tracker file true with _ -> ())
1728 Send info to tracker when stopping a file.
1729 @param file the file we want to stop
(* [file_stop file] notifies the trackers that [file] is being stopped.
   Nothing is sent unless [file.file_tracker_connected] is set. Otherwise
   [connect_trackers] is called with the stopped event and without asking
   for sources; its callback logs the tracker answer and clears
   [file.file_tracker_connected]. *)
1731 let file_stop file =
1732 if file.file_tracker_connected
then
1734 connect_trackers file "stopped" false (fun _ _ ->
1735 lprintf_file_nl
(as_file
file) "Tracker return: stopped %s" file.file_name
;
1736 file.file_tracker_connected
<- false)
1743 client_ops
.op_client_can_upload
<- client_can_upload;
1744 file_ops
.op_file_resume
<- file_resume;
1745 file_ops
.op_file_recover
<- file_resume;
1746 file_ops
.op_file_pause
<- (fun file ->
1747 Hashtbl.iter (fun _ c ->
1748 match c.client_sock
with
1749 Connection
sock -> close
sock Closed_by_user
1751 ) file.file_clients
;
1752 (*When a file is paused we consider it is stopped*)
1755 file_ops
.op_file_queue
<- file_ops
.op_file_pause
;
1756 client_ops
.op_client_enter_upload_queue
<- (fun c ->
1757 if !verbose_msg_clients
then
1758 lprintf_nl
"Client %d: client_enter_upload_queue" (client_num
c);
1759 ready_for_upload
(as_client
c));
1760 network
.op_network_connected_servers
<- (fun _ -> []);