1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
(*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*)
24 open TcpBufferedSocket
35 open CommonInteractive
37 open CommonComplexOptions
47 open GnutellaComplexOptions
51 (*************************************************************************)
55 (*************************************************************************)
(* Size, in bytes, of the shared upload buffer; also used as the cap on the
   output buffer of an upload connection (100 kB). *)
let max_upload_buffer_len = 100 * 1024

(* Scratch buffer the upload path reads file data into before writing it to
   a socket. *)
let upload_buffer = String.create max_upload_buffer_len

(* Sockets tracked by the upload-slot accounting (presumably one per slot in
   use - confirm against the slot code). *)
let current_downloads : TcpBufferedSocket.t list ref = ref []
61 (*************************************************************************)
65 (*************************************************************************)
(* clean_sources () drops stale source entries.
   For every (key, listref) visited, only the (_, time) pairs whose time is
   strictly newer than one hour before last_time () are kept. Keys that still
   have at least one entry are collected into the accumulator named list,
   result_sources is cleared, and the collected bindings are added back.
   NOTE(review): the table being iterated, the definition of the accumulator
   named list and the closing of both iterations are not visible in this
   listing - presumably the table is result_sources; confirm. *)
67 let clean_sources () =
(* Anything stamped at or before this instant is considered obsolete. *)
69 let obsolete_time = last_time
() - 3600 in
70 Hashtbl.iter
(fun key listref
->
71 let newlist = ref [] in
72 List.iter
(fun ((_
, time
) as u
) ->
73 if time
> obsolete_time then newlist := u
:: !newlist
(* Keys left without any fresh entry are simply not re-registered. *)
75 if !newlist <> [] then
76 list := (key
, newlist) :: !list
(* Rebuild the table from the surviving bindings only. *)
78 Hashtbl.clear result_sources
;
79 List.iter
(fun (key
, listref
) ->
80 Hashtbl.add result_sources key listref
84 (*************************************************************************)
86 (* download_finished *)
88 (*************************************************************************)
(* download_finished file finalises a completed download.
   Nothing happens unless file is physically present (List.memq) in
   !current_files. Otherwise the file is reported through file_completed, its
   swarmer is removed and file.file_swarmer reset to None, the file is removed
   via GnutellaGlobals.remove_file, its (name, size) pair is prepended to the
   old_files option, and the file is removed from the client_downloads of
   some client c.
   NOTE(review): the binder of c (probably an iteration over the clients of
   the file) and the end of the begin block are not visible here. *)
90 let download_finished file
=
91 if List.memq file
!current_files
then begin
92 file_completed
(as_file file
);
93 CommonSwarming.remove_swarmer file
.file_swarmer
;
94 file
.file_swarmer
<- None
;
95 (* TODO: maybe this remove file may have a bad effect on sharing *)
96 GnutellaGlobals.remove_file file
;
(* Remember the finished file as a (name, size) pair. *)
97 old_files
=:= (file
.file_name
, file_size file
) :: !!old_files
;
(* Detach the finished file from the client download list. *)
99 c
.client_downloads
<- remove_download file c
.client_downloads
103 (*************************************************************************)
107 (*************************************************************************)
(* check_finished swarmer file: when CommonSwarming reports the swarmer as
   finished, run the completion procedure for file; otherwise do nothing. *)
let check_finished swarmer file =
  match CommonSwarming.check_finished swarmer with
  | true -> download_finished file
  | false -> ()
113 (*************************************************************************)
117 (*************************************************************************)
(* init_client sock installs download_control as the read controller and
   upload_control as the write controller of sock.
   NOTE(review): the expression following the last semicolon is not visible
   in this listing (presumably unit) - confirm. *)
119 let init_client sock
=
120 TcpBufferedSocket.set_read_controler sock download_control
;
121 TcpBufferedSocket.set_write_controler sock upload_control
;
124 (*************************************************************************)
128 (*************************************************************************)
(* find_request c sock headers matches an HTTP reply with the oldest pending
   request of client c and yields a pair of callbacks: a reader for the body
   bytes and a continuation to run once the body has been consumed (see the
   pairs built from read_some / read_more below).
   Raises Failure when c.client_requests is empty.
   Visible behaviour:
   - if the file has no tiger tree yet and none was requested, an x-thex-uri
     header with a relative url triggers an extra GET for the tree, with a
     Range of 0 .. (tiger_pos + nblocks) * 24 - 1;
   - the head of d.download_ranges decides how the reply body is handled
     (HEAD reply, RANGEReq data, or tiger tree data).
   NOTE(review): many match arms and binders (d, tail, url, nblocks, headers
   of the TTR request, swarmer) are not visible in this listing; the summary
   above is limited to what the remaining lines show. *)
130 let rec find_request c sock headers
=
132 if !verbose_msg_clients then begin
133 lprintf "HEADER FROM CLIENT:\n";
134 AnyEndian.dump_ascii header;
(* Pop the request at the head of the pending queue. *)
138 match c
.client_requests
with
139 [] -> failwith
"[GDO] No download request !!!"
141 c
.client_requests
<- tail
;
144 let file = d.download_file
in
(* Tiger tree bootstrap: only attempted once per download. *)
147 match file.file_ttr
with
150 if not
d.download_ttr_requested
then
152 List.assoc
"x-thex-uri" headers
153 (* TODO: this header is used by Gnutella2 when you want to specify to which
154 depth you want the tigertree:
155 List.assoc "x-tigertree-path" headers *)
157 if String.length url
> 0 && url
.[0] = '
/'
then begin
159 let s = Printf.sprintf
"GET %s HTTP/1.1" url
in
(* Number of tiger blocks covering the file, then the byte span to fetch. *)
162 let size = file_size
file in
163 let chunk_size = CommonUploads.tiger_block_size
in
164 Int64.to_int
(size // chunk_size) + 1 in
165 let tiger_pos = CommonUploads.tiger_pos nblocks in
166 let end_pos = tiger_pos + nblocks in
167 (* 24 is the size of SHA1 digests *)
168 let end_pos = end_pos * 24 in
169 let range = Printf.sprintf
"0-%d" (end_pos-1) in
171 if !verbose_msg_clients
then
172 lprintf
"[TTR] Requesting ttr with range %s\n" range;
174 ["Range", Printf.sprintf
"bytes=%s" range]
176 let s = make_download_request c
s headers in
177 if !verbose_msg_clients
then
178 lprintf
"[TTR] SENDING TTR REQUEST: %s\n" (String.escaped
s);
(* Record the outstanding tree request and queue d again for its reply. *)
180 d.download_ttr_requested
<- true;
181 d.download_ranges
<- d.download_ranges
@ [TTRReq
nblocks];
182 c
.client_requests
<- c
.client_requests
@ [d];
184 lprintf
"[TTR] Url is not relative: %s\n" url
;
(* Dispatch on the kind of request this reply answers. *)
188 match d.download_ranges
with
191 d.download_ranges
<- tail
;
195 if !!gnutella_experimental
&&
196 not c
.client_support_head_request
then
197 c
.client_support_head_request
<- true;
198 (* TODO: why do we use the HEAD request ?? For the headers ? *)
199 (* lprintf "Received Reply to HEAD request: %s\n"
200 (String.escaped header); *)
201 (* d.download_head <- Head (first_line, headers); *)
202 (fun _ _ _
-> ()), read_more
d c sock
204 | RANGEReq
(_
,_
,r
) ->
205 (* let size = file_size file in*)
207 set_client_state c
(Connected_downloading
(file_num
file));
209 (* Send the next request !!! *)
210 for i
= 1 to GnutellaNetwork.max_queued_ranges
do
211 if List.length
d.download_ranges
<=
212 GnutellaNetwork.max_queued_ranges
then
213 (try get_from_client sock c
with _
-> ());
216 read_some
d c
, read_more
d c sock
219 (* TODO loading the ttr in main memory is probably a bad idea, we should
220 save it on disk in the next version. *)
221 if !verbose_msg_clients
then begin
222 lprintf
"[TTR] header received: \n";
223 print_head
"" headers;
(* Tiger tree bytes are accumulated in an in-memory buffer. *)
225 let buf = Buffer.create
100 in
226 let read_ttr counter_pos b to_read_int
=
227 Buffer.add_substring
buf b
.buf b
.pos to_read_int
(* Slice the loaded data into 24-byte digests starting at tiger_pos, store
   them on the file and install them as the swarmer verifier. *)
230 if !verbose_msg_clients
then
231 lprintf
"[TTR] ttr loaded\n";
232 let ttr = Buffer.contents
buf in
233 let tiger_pos = CommonUploads.tiger_pos nblocks in
234 let array = Array.create
nblocks TigerTree.null
in
235 for i
= 0 to nblocks - 1 do
236 array.(i
) <- TigerTree.direct_of_string
237 (String.sub
ttr ((tiger_pos + i
) * 24) 24)
239 file.file_ttr
<- Some
array;
241 (match file.file_swarmer
with
244 if !verbose_msg_clients
then
245 lprintf
"[TTR] set_verifier\n";
246 CommonSwarming.set_verifier swarmer
247 (Verification
(Array.map
(fun ttr -> TigerTree
ttr) array))
252 read_more d c sock
()
256 (*************************************************************************)
260 (*************************************************************************)
(* read_some d c counter_pos b to_read_int passes to_read_int bytes of
   b.buf, starting at b.pos, to CommonSwarming.received for the uploader of
   download d, with counter_pos as the position argument.
   If the file is no longer in state FileDownloading the client is
   disconnected with Closed_by_user. The log message and the assignment at
   the end show that an exception from CommonSwarming.received is reported
   and c.client_reconnect is set to true.
   NOTE(review): the arms of the uploader match and the try/with framing are
   not visible in this listing. *)
262 and read_some
d c counter_pos b to_read_int
=
264 let up = match d.download_uploader
with
268 let file = d.download_file
in
(* Stop feeding data for a file that is not being downloaded any more. *)
270 if file_state
file <> FileDownloading
then begin
271 disconnect_client c Closed_by_user
;
277 CommonSwarming.received
up
278 counter_pos b
.buf b
.pos to_read_int
;
280 lprintf
"FT: Exception %s in CommonSwarming.received\n"
281 (Printexc2.to_string e
);
282 (* TODO: we should pause the download !!! *)
284 c
.client_reconnect
<- true
286 (*************************************************************************)
290 (*************************************************************************)
(* read_more d c sock (): run once a reply body has been consumed. When no
   range request is outstanding for download d, ask the client for the next
   one through get_from_client; otherwise there is nothing to do. *)
and read_more d c sock () =
  match d.download_ranges with
  | [] -> get_from_client sock c
  | _ :: _ -> ()
296 (*************************************************************************)
298 (* client_parse_header *)
300 (*************************************************************************)
(* client_parse_header c gconn sock (first_line, headers) handles the status
   line and headers of an HTTP reply received from client c.
   Visible steps:
   - the socket lifetime is set to 3600. and, later, its read timeout to 120.;
   - the three characters after the first space of first_line are parsed as
     the status code; codes outside 200..299 raise Failure;
   - the delivered byte range (start_pos, end_pos) is derived from the
     content-range header, with a fallback based on content-length and the
     first pending range; a content-length that disagrees with the computed
     range is reported;
   - find_request supplies the reader callbacks, and gconn.gconn_handler
     becomes a Reader that feeds buffered bytes to read_some until end_pos
     is reached, then switches back to an HttpReader for the next reply;
   - on exception the client is disconnected with Closed_for_exception.
   NOTE(review): the try/with framing, several else branches and the binder
   of d are not visible in this listing. *)
302 and client_parse_header c gconn sock
(first_line
, headers) =
303 if !verbose_msg_clients
then begin
304 lprintf
"[GDO] CLIENT PARSE HEADER\n";
307 set_lifetime sock
3600.;
310 (* The reply should be "HTTP/1.1 200 OK" *)
311 let space_pos = String.index first_line ' '
in
312 let code = String.sub first_line
(space_pos+1) 3 in
313 let code = int_of_string
code in
315 GnutellaProtocol.parse_headers c first_line
headers;
317 (* TODO in case of bad code, just jump to the next reply header... *)
318 if code < 200 || code > 299 then
319 failwith
"Bad HTTP code";
321 connection_ok c
.client_connection_control
;
322 set_client_state c Connected_initiating
;
323 set_rtimeout sock
120.;
(* Work out which byte range the peer is sending. *)
325 let start_pos, end_pos =
327 let (range,_
) = List.assoc
"content-range" headers in
(* Separator positions; the negative defaults stand for an absent character. *)
329 let npos = (String.index
range 'b'
)+6 in
330 let dash_pos = try String.index
range '
-'
with _
-> -10 in
331 let slash_pos = try String.index
range '
/'
with _
-> -20 in
332 let star_pos = try String.index
range '
*'
with _
-> -30 in
333 if star_pos = slash_pos-1 then
334 failwith
"Cannot parse range"
335 (* Int64.zero, size (* "bytes */X" *) *)
337 let x = Int64.of_string
(
338 String.sub
range npos (dash_pos - npos) )
340 let len = String.length
range in
341 let y = Int64.of_string
(
342 String.sub
range (dash_pos+1) (slash_pos - dash_pos - 1))
344 if slash_pos = star_pos - 1 then
345 x, Int64.succ
y (* "bytes x-y/*" *)
347 let z = Int64.of_string
(
348 String.sub
range (slash_pos+1) (len - slash_pos -1) )
351 failwith
"Cannot parse range"
352 (* Int64.pred x, size *)
356 lprintf
"[GDO] Exception %s for range [%s]\n"
357 (Printexc2.to_string e
) range;
360 (* TODO: we should be able to reply to a request containing no Content-Range
361 iff Content-Length = Requested Length and
362 Requested Range = 0-Content-Length-1 ??? *)
366 if code <> 206 && code <> 200 then raise Not_found;
367 let (len,_) = List.assoc "content-length" headers in
368 let len = Int64.of_string len in
369 lprintf "Specified length: %Ld\n" len;
370 match d.download_ranges with
371 [] -> raise Not_found
372 | (start_pos,end_pos,r) :: _ ->
373 lprintf "WARNING: Assuming client is replying to range\n";
374 if len <> end_pos -- start_pos then
376 lprintf "\n\nERROR: bad computed range: %Ld-%Ld/%Ld \n"
377 start_pos end_pos len;
378 print_head first_line headers;
383 (* A bit dangerous, no ??? *)
384 if !verbose_unknown_messages
then
386 lprintf
"[GDO] ERROR: Could not find/parse range header (exception %s), disconnect\n"
387 (Printexc2.to_string e
);
388 print_head first_line
headers
390 disconnect_client c
(Closed_for_error
"Bad HTTP Range");
(* Cross-check the announced content length against the computed range. *)
394 let (len,_
) = List.assoc
"content-length" headers in
395 let len = Int64.of_string
len in
396 (* lprintf "[GDO] Specified length: %Ld\n" len; *)
397 if len <> end_pos -- start_pos then
399 lprintf
"[GDO] ERROR: bad computed range: %Ld-%Ld/%Ld \n"
400 start_pos end_pos len;
401 print_head first_line
headers;
402 failwith
"Bad Computed Range"
406 lprintf
"[WARNING]: no Content-Length field\n";
407 print_head first_line
headers;
411 lprintf "[GDO] Receiving range: %Ld-%Ld (len = %Ld)\n"
412 start_pos end_pos (end_pos -- start_pos);
413 print_head first_line headers;
416 let read_some, read_more = find_request c sock
headers in
(* counter_pos is the position of the next body byte to hand over. *)
418 let counter_pos = ref start_pos in
419 gconn
.gconn_handler
<- Reader
(fun gconn sock
->
421 let b = TcpBufferedSocket.buf sock
in
422 let to_read = min
(end_pos -- !counter_pos)
423 (Int64.of_int
b.len) in
425 (* lprintf "[GDO] Reading: end_pos %Ld counter_pos %Ld len %d = to_read %Ld\n"
426 end_pos !counter_pos b.len to_read;
428 let to_read_int = Int64.to_int
to_read in
430 read_some !counter_pos b to_read_int;
431 buf_used
b to_read_int;
432 counter_pos := !counter_pos ++ to_read;
433 if !counter_pos = end_pos then begin
435 gconn
.gconn_handler
<- HttpReader
(4,
436 ["HTTP",client_parse_header c
],
437 GnutellaFunctions.default_handler
)
441 if !verbose_unknown_messages
then
443 lprintf
"[GDO] Exception %s in client_parse_header\n" (Printexc2.to_string e
);
444 print_head first_line
headers;
446 disconnect_client c
(Closed_for_exception e
);
449 (*************************************************************************)
451 (* get_from_client *)
453 (*************************************************************************)
455 (* TODO: implement TigerTree download *)
456 (* TODO: use Gnutella2 alternative uids *)
(* get_from_client sock c sends the next range request to client c.
   The inner function iter is applied to c.client_downloads. Visible steps
   for a download d:
   - downloads whose file is not in state FileDownloading are skipped;
   - the file swarmer must exist (assert false otherwise) and an uploader is
     registered for the whole file when d.download_uploader is not set;
   - when d.download_blocks offers nothing, one chunk verification is
     attempted (errors ignored) and CommonSwarming.find_blocks is asked for
     new blocks;
   - CommonSwarming.find_range is asked for a range of at most 256 KiB, which
     is appended to d.download_ranges and formatted as x-(y-1);
   - a GET request carrying a Range: bytes=... header is built through
     make_download_request, and d is appended to c.client_requests;
   - when no block can be obtained, check_finished is called.
   NOTE(review): the match arms, the try/with framing and the call that
   actually writes the request to sock are not visible in this listing. *)
458 and get_from_client sock
(c
: client
) =
460 let rec iter downloads
=
463 if !verbose_msg_clients
then
464 lprintf
"No other download to start\n";
468 if file_state
d.download_file
<> FileDownloading
then iter tail
471 let file = d.download_file
in
473 let swarmer = match file.file_swarmer
with
474 None
-> assert false | Some sw
-> sw
(* Make sure this download has an uploader registered with the swarmer. *)
477 let up = match d.download_uploader
with
479 let chunks = [ Int64.zero
, file_size
file ] in
480 let up = CommonSwarming.register_uploader
swarmer
482 (AvailableIntervals
chunks) in
483 d.download_uploader
<- Some
up;
488 if !verbose_msg_clients
then begin
489 lprintf
"FINDING ON CLIENT\n";
491 let file = d.download_file
in
492 if !verbose_msg_clients
then begin
493 lprintf
"FILE FOUND, ASKING\n";
496 if !verbose_swarming
then begin
497 lprintf
"Current download:\n Current chunks: ";
498 List.iter (fun (x,y) -> lprintf
"%Ld-%Ld " x y) d.download_chunks
;
499 lprintf
"\n Current ranges: ";
500 List.iter (fun req
->
503 (* let (x,y) = CommonSwarming.range_range r in *)
504 lprintf
"%Ld-%Ld " x y
511 lprintf
"\n Current blocks: ";
512 (* List.iter (fun b -> CommonSwarming.print_block b.up_block) d.download_blocks; *)
513 lprintf
"\n\nFinding Range: \n";
(* Pick the block to work on, asking the swarmer for new ones if needed. *)
518 match d.download_blocks
with
520 if !verbose_swarming
then lprintf
"No block\n";
521 (try CommonSwarming.verify_one_chunk
swarmer with _
-> ());
522 let _chunk, blocks
= CommonSwarming.find_blocks
up in
524 if !verbose_swarming
then begin
525 lprintf
"GOT BLOCKS:\n";
526 CommonSwarming.print_uploaders
swarmer;
527 lprintf
"Blocks Found: ";
529 CommonSwarming.print_block
b.up_block
) blocks
;
532 d.download_blocks
<- blocks
;
536 if !verbose_swarming then begin
537 lprintf "Current Block: "; CommonSwarming.print_block b;
(* Ask for a range of at most 256 KiB. *)
542 CommonSwarming.find_range
up (Int64.of_int
(256 * 1024)) in
544 if !verbose_swarming
then begin
545 lprintf
"GOT RANGE:\n";
546 CommonSwarming.print_uploaders
swarmer;
548 d.download_ranges
<- d.download_ranges
@
550 Printf.sprintf
"%Ld-%Ld" x (Int64.pred
y)
552 if !verbose_swarming
then
553 lprintf
"Could not find range in current block\n";
554 d.download_blocks
<- [];
559 if !verbose_unknown_messages
then
560 lprintf_nl
"Unable to get a block !!";
561 check_finished swarmer file;
(* Build the GET request for the selected range and remember it as pending. *)
564 let s = request_of_download
"GET" d in
566 ("Range", Printf.sprintf
"bytes=%s" range) :: []
568 let s = make_download_request c
s headers in
569 if !verbose_msg_clients
then
570 lprintf
"SENDING REQUEST: %s\n" (String.escaped
s);
572 c
.client_requests
<- c
.client_requests
@ [d];
573 if !verbose_msg_clients
then
574 lprintf
"Asking %s For Range %s\n" (Md4.to_string c
.client_user
.user_uid
)
578 iter c
.client_downloads
580 (*************************************************************************)
582 (* disconnect_client *)
584 (*************************************************************************)
(* disconnect_client c r tears down the connection state of client c for
   reason r.
   Visible steps: pending requests are dropped, connection_failed is recorded
   on the connection control, the client is marked disconnected with r and
   c.client_sock becomes NoConnection. For each download of the client, the
   client is put on the file client queue (with value 0) unless the file is
   already in c.client_in_queues, and any registered uploader is unregistered
   with blocks and ranges reset. The connected-clients counter of the file in
   c.client_connected_for is decremented and the field cleared, and a pending
   c.client_reconnect flag is reset. Exceptions are logged.
   NOTE(review): match arms, the try/with framing and whatever follows the
   reset of client_reconnect (presumably a reconnection attempt) are not
   visible in this listing. *)
586 and disconnect_client c r
=
588 lprintf_nl
"DISCONNECT CLIENT";
589 match c
.client_sock
with
592 if !verbose_msg_clients
then
593 lprintf_nl
"Disconnected from source for %s" (string_of_reason r
);
594 c
.client_requests
<- [];
595 connection_failed c
.client_connection_control
;
596 set_client_disconnected c r
;
598 c
.client_sock
<- NoConnection
;
600 let file = d.download_file
in
(* Queue the client on the file again, once only. *)
601 if not
(List.memq
file c
.client_in_queues
) then begin
602 Queue.put
file.file_clients_queue
(0,c
);
603 c
.client_in_queues
<- file :: c
.client_in_queues
(* Release the swarmer uploader and forget in-flight blocks and ranges. *)
605 match d.download_uploader
with
608 d.download_uploader
<- None
;
609 CommonSwarming.unregister_uploader
up;
610 d.download_blocks
<- [];
611 d.download_ranges
<- [];
612 ) c
.client_downloads
;
(* Undo the per-file connection accounting. *)
614 match c
.client_connected_for
with
617 file.file_nconnected_clients
<- file.file_nconnected_clients
- 1;
619 lprintf "For file %s, %d/%d clients connected (disconnected from %d)\n"
620 (file.file_name) file.file_nconnected_clients (nranges file)
621 (client_num (as_client c.client_client));
623 c
.client_connected_for
<- None
626 if c
.client_reconnect
then begin
627 c
.client_reconnect
<- false;
631 lprintf
"Exception %s in disconnect_client\n"
632 (Printexc2.to_string e
))
635 (*************************************************************************)
639 (*************************************************************************)
(* connect_client c opens an outgoing download connection to client c.
   Nothing is done when c.client_sock is already Connection or
   ConnectionWaiting. Otherwise, visible steps:
   - the connection is counted against a download whose file is in state
     FileDownloading (c.client_connected_for is set and the file counter
     incremented);
   - for an Indirect_location user a push is requested with the user uid;
   - for a Known_location (ip, port) a pending connection is scheduled; its
     callback connects to ip:port, routes RTIMEOUT, LTIMEOUT and CLOSED
     events to disconnect_client, calls get_from_client, records
     host and country code, sets the state to Connecting, stores the socket,
     installs a closer, a 30. read timeout and an HttpReader expecting HTTP
     replies handled by client_parse_header;
   - exceptions while connecting are logged and the client is disconnected;
   - c.client_sock is set to ConnectionWaiting token.
   NOTE(review): several match arms, the event that triggers
   get_from_client and the try/with framing are not visible in this
   listing. *)
641 and connect_client c
=
642 match c
.client_sock
with
643 | Connection _
| ConnectionWaiting _
-> ()
646 (* Count this connection in the first file counter. Here, we assume
647 that the connection will not be aborted (otherwise, disconnect_client
648 should clearly be called). *)
649 let download = ref None
in
650 (try List.iter (fun d ->
651 let file = d.download_file
in
652 if file_state
file = FileDownloading
then
655 c
.client_connected_for
<- Some
file;
656 file.file_nconnected_clients
<-
657 file.file_nconnected_clients
+ 1;
659 lprintf "For file %s, %d/%d clients connected (connecting %d)\n"
661 file.file_nconnected_clients (nranges file)
662 (client_num (as_client c.client_client)); *)
665 ) c
.client_downloads
with _
-> ());
(* How the peer is reached depends on the kind of location known for it. *)
668 match c
.client_user
.user_kind
with
669 Indirect_location
(_
, uid
, _
, _
) ->
670 GnutellaProto.ask_for_push uid
671 | Known_location
(ip
, port
) ->
673 add_pending_connection connection_manager
(fun token ->
675 if !verbose_msg_clients
then begin
676 lprintf
"connect_client\n";
678 if !verbose_msg_clients
then begin
679 lprintf
"connecting %s:%d\n" (Ip.to_string ip
) port
;
681 c
.client_reconnect
<- false;
682 let sock = connect
token "gnutella download"
683 (Ip.to_inet_addr ip
) port
686 | BASIC_EVENT RTIMEOUT
->
687 disconnect_client c Closed_for_timeout
688 | BASIC_EVENT LTIMEOUT
->
689 disconnect_client c Closed_for_lifetime
690 | BASIC_EVENT
(CLOSED
s) ->
691 disconnect_client c
s
693 (* You can only use the CONNECTED signal if the socket is not yet controlled
694 by the bandwidth manager... 2004/02/03: Normally, not true anymore, it should
695 now work even in this case... *)
699 get_from_client
sock c
704 c
.client_host
<- Some
(ip
, port
);
705 c
.client_country_code
<- None
;
706 check_client_country_code c
;
707 set_client_state c Connecting
;
708 c
.client_sock
<- Connection
sock;
709 TcpBufferedSocket.set_closer
sock (fun _
s ->
710 disconnect_client c
s
712 set_rtimeout
sock 30.;
713 set_gnutella_sock
sock !verbose_msg_clients
714 (HttpReader
(4, ["HTTP", client_parse_header c
],
715 GnutellaFunctions.default_handler
))
718 lprintf
"Exception %s while connecting to client\n"
719 (Printexc2.to_string e
);
720 disconnect_client c
(Closed_for_exception e
)
723 c
.client_sock
<- ConnectionWaiting
token
(*
728 1022569854.519 24.102.10.39:3600 -> 212.198.235.45:51736 of len 82
730 G I V 8 1 : 9 7 4 3 2 1 3 F B 4 8 6 2 3 D 0 F F D F A B B 3 8 0 E C 6 C 0 0 / P o l i c e V i d e o - E v e r y B r e a t h Y o u T a k e . m p g(10)(10)]
732 "GIV %d:%s/%s\n\n" file.file_number client.client_md4 file.file_name
*)
737 (*************************************************************************)
741 (*************************************************************************)
(* push_handler cc gconn sock (first_line, headers) handles an incoming GIV
   line on sock.
   The line is parsed as GIV index:uid/...: uid is the 32 characters after
   the first colon, index is the integer between offset 4 and that colon.
   The client is looked up in clients_by_uid (as Indirect_location, then as
   Known_location of the peer address) or created; an empty nick is replaced
   by the uid. Its host is set to the peer address. A client that already has
   a socket gets this one closed with an already-connected error; otherwise
   the socket is adopted, the download with the given index is moved to the
   front of c.client_downloads, get_from_client is called and the handler
   becomes an HttpReader for HTTP replies. Exceptions are logged and the
   client, when known, is disconnected.
   NOTE(review): the try/with framing, the match arms on c.client_sock and
   the place where cc is assigned are not visible in this listing. *)
743 let push_handler cc gconn
sock (first_line
, headers) =
744 if !verbose_msg_clients
then begin
746 print_head first_line
headers;
749 let (ip
, port
) = TcpBufferedSocket.peer_addr
sock in
751 if !verbose_msg_clients
then begin
752 lprintf
"PARSING GIV HEADER\n";
(* Split the GIV line around its first colon. *)
754 let colon_pos = String.index first_line '
:'
in
755 let uid = Md4.of_string
(String.sub first_line
(colon_pos+1) 32) in
756 let index = int_of_string
(String.sub first_line
4 (colon_pos-4)) in
757 if !verbose_msg_clients
then begin
(* Find the client under either location kind, or create it. *)
761 Hashtbl.find clients_by_uid
(Indirect_location
("", uid, ip
, port
))
764 Hashtbl.find clients_by_uid
(Known_location
(ip
,port
))
766 let c = new_client
(Indirect_location
("", uid, ip
, port
)) in
767 if String.length
c.client_user
.user_nick
== 0 then
768 c.client_user
.user_nick
<- (Md4.to_string
uid);
772 c.client_host
<- Some
(ip
, port
);
773 check_client_country_code
c;
774 match c.client_sock
with
776 if !verbose_msg_clients
then begin
777 lprintf
"ALREADY CONNECTED\n";
779 close
sock (Closed_for_error
"already connected");
782 if !verbose_msg_clients
then begin
783 lprintf
"NEW CONNECTION\n";
786 c.client_sock
<- Connection
sock;
787 connection_ok
c.client_connection_control
;
789 if !verbose_msg_clients
then begin
790 lprintf
"FINDING FILE %d\n" index;
792 let d = find_download_by_index
index c.client_downloads
in
793 if !verbose_msg_clients
then begin
794 lprintf
"FILE FOUND\n";
(* Put the announced download first so that it is requested first. *)
797 c.client_downloads
<- d :: (List2.removeq
d c.client_downloads
);
798 get_from_client
sock c;
799 gconn
.gconn_handler
<- HttpReader
(4,
800 ["HTTP", client_parse_header
c],
801 GnutellaFunctions.default_handler
)
803 lprintf
"Exception %s during client connection\n"
804 (Printexc2.to_string e
);
805 disconnect_client
c (Closed_for_exception e
);
808 lprintf
"Exception %s in push_handler\n" (Printexc2.to_string e
);
809 print_head first_line
headers;
810 (match !cc
with Some
c -> disconnect_client
c (Closed_for_exception e
)
814 (*************************************************************************)
818 (*************************************************************************)
820 (* TODO: add Gnutella2 alternative UIDs *)
821 (* TODO: implement TigerTree upload *)
(* read_request url headers gconn sock prepares the reply to an upload
   request for url.
   Visible steps: the url is parsed, find_file_to_upload yields a reader, the
   file size and extra headers; the range header, when present and parseable,
   selects the chunk, and any failure falls back to the whole file with
   partial = false. The status line is 206 Partial Reply or 200 OK, followed
   by Connection, Content-type and Content-length headers, plus Accept-Ranges
   and a Content-Range of chunk_pos-(chunk_end - 1)/size for partial replies.
   Remote-IP and Server headers are added only the first time, guarded by
   gconn.gconn_client_info_sent. The result carries the chunk position,
   length and end in uc_chunk_pos, uc_chunk_len and uc_chunk_end.
   NOTE(review): several arms of the range match, the let binders for s and
   headers and the remaining record fields are not visible in this listing. *)
823 let read_request url
headers gconn
sock =
824 let url = Url.of_string
url in
826 let reader, size, add_headers
= find_file_to_upload gconn
url in
(* Requested chunk; defaults to the whole file when no usable range is given. *)
827 let partial, (chunk_pos
, chunk_end
) =
830 let (range,_
) = List.assoc
"range" headers in
831 match Http_server.parse_range
range with
832 x, None
, _
-> x, size
833 | x, Some
y, Some
z ->
834 if y = z then (* some vendor bug *)
840 with _
-> false, (Int64.zero
, size)
843 let chunk_len = chunk_end
-- chunk_pos
in
844 (* TODO: assert that the the range is inside the file *)
849 Printf.sprintf
"HTTP/1.1 %s"
850 (if partial then "206 Partial Reply" else "200 OK") in
852 ("Connection", "Keep-Alive") ::
853 ("Content-type", "application/binary") ::
854 ("Content-length", Printf.sprintf
"%Ld" chunk_len) ::
858 if partial then begin
859 ("Accept-Ranges", "bytes") ::
861 Printf.sprintf
"bytes %Ld-%Ld/%Ld"
862 chunk_pos
(Int64.pred chunk_end
) size) ::
(* Identification headers are sent once per connection only. *)
866 if gconn
.gconn_client_info_sent
then headers else
868 gconn
.gconn_client_info_sent
<- true;
869 ("Remote-IP", Ip.to_string
(fst
(peer_addr
sock)) ) ::
870 ("Server", get_user_agent
()) :: headers
873 make_http_header
s headers
880 uc_chunk_pos
= chunk_pos
;
881 uc_chunk_len
= chunk_end
-- chunk_pos
;
882 uc_chunk_end
= chunk_end
;
888 (*************************************************************************)
892 (*************************************************************************)
(* Upload-slot accounting (fragment; the enclosing definition header is not
   visible in this listing).
   iter list rem walks the socket list with rem as accumulator. When rem
   already holds at least !!max_available_slots entries, the users of the
   slots are logged and Failure is raised with the All Slots Used message.
   When the socket s under inspection is physically equal to sock, list @ rem
   is returned. The result of iter !current_downloads [] replaces
   !current_downloads.
   NOTE(review): the match on list, the binder of s and the remaining
   branches are not visible - confirm against the full file. *)
895 let rec iter list rem
=
898 if List.length rem
>= !!max_available_slots
then begin
900 if !verbose_msg_clients
then begin
901 lprintf
"[GUP] All slots used:\n";
903 let (ip
, port
) = peer_addr
s in
904 lprintf
" by %s:%d %b\n" (Ip.to_string ip
) port
908 failwith
"All Slots Used";
912 if s == sock then list @ rem
919 current_downloads := iter !current_downloads []
922 (*************************************************************************)
926 (*************************************************************************)
(* get_handler get_request cc gconn sock (first_line, headers) serves an
   incoming upload request; the listener registers it with get_request = true
   for GET and false for HEAD.
   Visible steps:
   - the output buffer of sock is capped at max_upload_buffer_len;
   - first_line is split on spaces into request, url and proto;
   - read_request builds the upload descriptor uc; for a HEAD request
     uc_chunk_pos is moved to uc_chunk_end so that no body is sent;
   - a refill callback is appended to gconn.gconn_refill. It closes the socket
     once writing is done, keep_alive is off and nothing remains to be
     written; otherwise it writes the header (only when max_refill exceeds
     the header length), then copies up to max_refill bytes at a time through
     upload_buffer, advancing uc_chunk_pos, and calls itself again when the
     socket has nothing left to write;
   - when the chunk is complete the callback is removed from
     gconn.gconn_refill and the next one, if any, takes over;
   - on exception the socket is closed with Closed_for_exception.
   NOTE(review): the try/with framing, the assignments to header_sent and
   write_done and several match arms are not visible in this listing. *)
928 let get_handler get_request cc gconn
sock (first_line
, headers) =
929 if !verbose_msg_clients
then begin
931 print_head first_line
headers;
934 (* We don't want to buffer more than 100 kB per upload connection *)
935 set_max_output_buffer
sock max_upload_buffer_len;
936 if !verbose_msg_clients
then
937 lprintf
"[GUP] After set_max_output_buffer: max_refill %d\n"
939 (* lprintf "parse_head\n"; *)
(* Split the request line into its three space-separated fields. *)
941 let request_end = String.index first_line ' '
in
942 let request = String.sub first_line
0 request_end in
944 let url_end = String.index_from first_line
(request_end+1) ' '
in
945 let url = String.sub first_line
(request_end+1)
946 (url_end - request_end-1) in
948 let proto_len = String.length first_line
in
949 let proto = String.sub first_line
(url_end+1) (proto_len-url_end-1) in
950 if !verbose_msg_clients
then
951 lprintf
"[GUP] Header parsed: [%s] [%s] [%s]\n" request url proto;
952 (* "/get/num/filename" *)
954 (* First of all, can we accept this request ???? *)
956 let uc = read_request url headers gconn
sock in
957 let header_sent = ref false in
959 (* For HEAD request, do as if we had already sent the data *)
960 if not get_request
then uc.uc_chunk_pos
<- uc.uc_chunk_end
;
962 let write_done = ref false in
963 let rec refill sock =
964 if !verbose_msg_clients
then
965 lprintf
"[GUP] refill called\n";
966 if !write_done && not
!!keep_alive
&& remaining_to_write
sock = 0 then
968 if !verbose_msg_clients
then
969 lprintf
"CLOSING AFTER WRITE\n";
970 close
sock (Closed_by_user
)
973 let can_write = max_refill
sock in
974 if not
!header_sent then
975 if can_write > String.length
uc.uc_header
then begin
976 (* BUG: send the header *)
977 TcpBufferedSocket.write_string
sock uc.uc_header
;
978 if !verbose_msg_clients
then begin
979 lprintf
"[GUP] Sending Header:\n";
980 AnyEndian.dump
uc.uc_header
;
985 if !header_sent then begin
986 if uc.uc_chunk_pos
= uc.uc_chunk_end
then begin
987 if !verbose_msg_clients
then
988 lprintf
"[GUP] Finished replying to header\n";
(* This request is served: hand over to the next queued refill callback. *)
991 match gconn
.gconn_refill
with
994 gconn
.gconn_refill
<- []
995 | _
:: ((refill :: _
) as tail
) ->
996 gconn
.gconn_refill
<- tail
;
(* Send the next slice, bounded by max_refill and by what is left. *)
1001 let pos = uc.uc_chunk_pos
in
1002 let to_write = uc.uc_chunk_end
-- pos in
1003 let rlen = min
(max_refill
sock) (Int64.to_int
to_write) in
1004 if !verbose_msg_clients
then
1005 lprintf
"[GUP] to_write: %d/%Ld/%d\n" rlen to_write
1006 (remaining_to_write
sock);
1008 if rlen > 0 then begin
1009 uc.uc_reader
pos upload_buffer 0 rlen;
1010 if !verbose_msg_clients
then
1011 lprintf
"[GUP] Writting %d\n" rlen;
1012 TcpBufferedSocket.write
sock upload_buffer 0 rlen;
1014 uc.uc_chunk_pos
<- uc.uc_chunk_pos
++ (Int64.of_int
rlen);
1015 if remaining_to_write
sock = 0 then refill sock
1019 gconn
.gconn_refill
<- gconn
.gconn_refill
@ [refill];
1020 if !verbose_msg_clients
then
1021 lprintf
"[GUP] refill: %d refillers\n" (List.length gconn
.gconn_refill
);
1022 match gconn
.gconn_refill
with
1024 (* First refill handler, must be called immediatly *)
1026 | _
-> (* Already a refill handler, wait for it to finish its job *)
1030 (* TODO: send back a 404 NOT FOUND error *)
1031 if !verbose_msg_clients
then begin
1032 lprintf
"[GUP] Exception %s in get_handler\n" (Printexc2.to_string e
);
1033 print_head first_line
headers;
1035 close
sock (Closed_for_exception e
);
1038 (*************************************************************************)
1042 (*************************************************************************)
(* Listener set-up (fragment; the enclosing definition header is not visible
   in this listing).
   A server socket is created on !!client_bind_addr. For each CONNECTION
   event a buffered socket is created with a fresh token; RTIMEOUT and
   LTIMEOUT close it with the matching reason; download_control and
   upload_control are installed as its read and write controllers; its
   closer disconnects the client when one is known and otherwise logs the
   peer address; the read timeout is 30.; and GIV, GET and HEAD lines are
   routed to push_handler and get_handler. The server socket is stored in
   listen_sock, and an exception during set-up is logged.
   NOTE(review): the port argument, the binder of c and the try/with framing
   are not visible - confirm against the full file. *)
1046 let sock = TcpServerSocket.create
"gnutella client server"
1047 (Ip.to_inet_addr
!!client_bind_addr
)
1051 TcpServerSocket.CONNECTION
(s,
1052 Unix.ADDR_INET
(from_ip
, from_port
)) ->
1054 lprintf
"CONNECTION RECEIVED FROM %s FOR PUSH\n%s"
1055 (Ip.to_string
(Ip.of_inet_addr from_ip
))
1056 "*********** CONNECTION ***********\n";
1058 let token = create_token connection_manager
in
1059 let sock = TcpBufferedSocket.create
token
1060 "gnutella client connection" s
1063 BASIC_EVENT RTIMEOUT
-> close
sock Closed_for_timeout
1064 | BASIC_EVENT LTIMEOUT
-> close
sock Closed_for_lifetime
1068 TcpBufferedSocket.set_read_controler
sock download_control
;
1069 TcpBufferedSocket.set_write_controler
sock upload_control
;
1072 TcpBufferedSocket.set_closer
sock (fun _
s ->
1074 Some
c -> disconnect_client
c s
1077 lprintf
"DISCONNECTION BEFORE CLIENT %s:%d IS KNOWN\n"
1078 (Ip.to_string
(peer_ip
sock)) (peer_port
sock)
1080 TcpBufferedSocket.set_rtimeout
sock 30.;
(* Route the first line of the connection by its leading keyword. *)
1081 set_gnutella_sock
sock !verbose_msg_clients
1084 "GIV", push_handler c;
1085 "GET", get_handler true c;
1086 "HEAD", get_handler false c;
1087 ], GnutellaFunctions.default_handler
)
1091 listen_sock
:= Some
sock;
1094 lprintf
"Exception %s while init gnutella server\n"
1095 (Printexc2.to_string e
)
1097 (*************************************************************************)
1099 (* push_connection *)
1101 (*************************************************************************)
(* push_connection guid index ip port opens a connection to ip:port so that
   the remote peer can fetch the shared file registered under index.
   Visible steps, inside a pending-connection callback: the shared file is
   looked up in shareds_by_id (Hashtbl.find, so an unknown index raises
   Not_found), a socket is connected to ip:port, RTIMEOUT and LTIMEOUT close
   it with the matching reason, download_control and upload_control are
   installed as its read and write controllers, its closer disconnects the
   client when one is known, the read timeout is 30., GET and HEAD lines are
   routed to get_handler, and a GIV index:guid/codedname line is formatted.
   NOTE(review): the binder of c, the call that sends the GIV line and any
   error handling are not visible in this listing. *)
1103 let push_connection guid
index ip port
=
1105 add_pending_connection connection_manager
(fun token ->
1106 let sh =Hashtbl.find shareds_by_id
index in
1107 let sock = connect
token "gnutella download"
1108 (Ip.to_inet_addr ip
) port
1111 BASIC_EVENT RTIMEOUT
-> close
sock Closed_for_timeout
1112 | BASIC_EVENT LTIMEOUT
-> close
sock Closed_for_lifetime
1116 lprintf
"CONNECTION PUSHED TO %s\n" (Ip.to_string ip
);
1118 TcpBufferedSocket.set_read_controler
sock download_control
;
1119 TcpBufferedSocket.set_write_controler
sock upload_control
;
1122 TcpBufferedSocket.set_closer
sock (fun _ s ->
1124 Some
c -> disconnect_client
c s
1127 TcpBufferedSocket.set_rtimeout
sock 30.;
1128 (* TODO test this, looks strange... *)
1129 set_gnutella_sock
sock !verbose_msg_clients
1131 "GET", get_handler true c;
1132 "HEAD", get_handler false c;
1133 ], GnutellaFunctions.default_handler
));
(* The GIV line announces the file index, our guid and the coded file name. *)
1135 (Printf.sprintf
"GIV %d:%s/%s\n\n"
1136 index (Md4.to_string guid
) sh.shared_codedname
)