release-3-0-2
[mldonkey.git] / src / networks / gnutella / gnutellaClients.ml
blobf409ae21067bad30239674a572f27700c9df6b7d
1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 open Options
23 open BasicSocket
24 open TcpBufferedSocket
25 open Int64ops
26 open Printf2
27 open Md4
28 open Queues
30 open CommonSwarming
31 open CommonShared
32 open CommonUploads
33 open CommonOptions
34 open CommonDownloads
35 open CommonInteractive
36 open CommonClient
37 open CommonComplexOptions
38 open CommonTypes
39 open CommonFile
40 open CommonGlobals
41 open CommonDownloads
43 open GnutellaNetwork
44 open GnutellaTypes
45 open GnutellaOptions
46 open GnutellaGlobals
47 open GnutellaComplexOptions
48 open GnutellaProto
49 open GnutellaProtocol
51 (*************************************************************************)
52 (* *)
53 (* Global Values *)
54 (* *)
55 (*************************************************************************)
(* Size in bytes of the shared upload staging buffer. get_handler also
   passes it to set_max_output_buffer for each upload connection. *)
let max_upload_buffer_len = 100 * 1024

(* Scratch buffer that get_handler fills through uc_reader before writing
   the bytes to the socket. *)
let upload_buffer = String.create max_upload_buffer_len

(* Sockets currently holding an upload slot; maintained by find_slot. *)
let current_downloads : TcpBufferedSocket.t list ref = ref []
61 (*************************************************************************)
62 (* *)
63 (* clean_sources *)
64 (* *)
65 (*************************************************************************)
(* clean_sources ()
   Rebuilds result_sources so that it only keeps sources whose timestamp is
   strictly newer than one hour (3600 units of last_time) ago. Keys whose
   sources are all obsolete are dropped. Each surviving key is bound to a
   fresh list reference; the surviving sources end up in reverse order of
   the list they came from. *)
let clean_sources () =
  let obsolete_time = last_time () - 3600 in
  let keep_recent kept ((_, time) as source) =
    if time > obsolete_time then source :: kept else kept
  in
  (* Collect the survivors first: the table is cleared before refilling. *)
  let survivors =
    Hashtbl.fold (fun key sources acc ->
      match List.fold_left keep_recent [] !sources with
      | [] -> acc
      | recent -> (key, ref recent) :: acc
    ) result_sources []
  in
  Hashtbl.clear result_sources;
  List.iter (fun (key, sources) ->
    Hashtbl.add result_sources key sources
  ) survivors
84 (*************************************************************************)
85 (* *)
86 (* download_finished *)
87 (* *)
88 (*************************************************************************)
(* download_finished file
   Finalises a download that is still listed in current_files: reports the
   completion, releases its swarmer, removes it from the Gnutella globals,
   records (name, size) in old_files and detaches it from every client that
   was downloading it. Does nothing for a file not in current_files. *)
let download_finished file =
  if not (List.memq file !current_files) then ()
  else begin
    file_completed (as_file file);
    CommonSwarming.remove_swarmer file.file_swarmer;
    file.file_swarmer <- None;
    (* TODO: maybe this remove file may have a bad effect on sharing *)
    GnutellaGlobals.remove_file file;
    let finished_entry = (file.file_name, file_size file) in
    old_files =:= finished_entry :: !!old_files;
    let forget_download c =
      c.client_downloads <- remove_download file c.client_downloads
    in
    List.iter forget_download file.file_clients
  end
103 (*************************************************************************)
104 (* *)
105 (* check_finished *)
106 (* *)
107 (*************************************************************************)
(* check_finished swarmer file
   Runs download_finished on file when CommonSwarming.check_finished
   reports that the swarmer is complete; otherwise does nothing. *)
let check_finished swarmer file =
  match CommonSwarming.check_finished swarmer with
  | true -> download_finished file
  | false -> ()
113 (*************************************************************************)
114 (* *)
115 (* init_client *)
116 (* *)
117 (*************************************************************************)
(* init_client sock
   Attaches the global download and upload bandwidth controllers to the
   read and write sides of sock. *)
let init_client sock =
  TcpBufferedSocket.set_read_controler sock download_control;
  TcpBufferedSocket.set_write_controler sock upload_control
124 (*************************************************************************)
125 (* *)
126 (* find_request *)
127 (* *)
128 (*************************************************************************)
(* find_request c sock headers
   Matches a reply to the entry at the head of c.client_requests and
   returns a pair (consume, finish): client_parse_header calls consume on
   each slice of body data and finish () once the whole body has been read.
   Side effects visible here:
   - removes the head of c.client_requests and the head of the selected
     download_ranges;
   - may send an extra GET for the tiger tree when the reply carries an
     x-thex-uri header whose URL starts with a slash, queueing a TTRReq and
     re-queueing the download in c.client_requests;
   - for a RANGEReq reply, switches the client to Connected_downloading and
     tries to queue further range requests through get_from_client.
   Raises Failure when c.client_requests is empty and hits assert false
   when the selected download has no pending range. *)
130 let rec find_request c sock headers =
132 if !verbose_msg_clients then begin
133 lprintf "HEADER FROM CLIENT:\n";
134 AnyEndian.dump_ascii header;
135 end; *)
(* Take the request at the head of the queue: this reply is matched to it. *)
137 let d =
138 match c.client_requests with
139 [] -> failwith "[GDO] No download request !!!"
140 | d :: tail ->
141 c.client_requests <- tail;
144 let file = d.download_file in
(* Tiger tree bootstrap: only attempted while the file has no tiger tree
   and none has been requested yet for this download. *)
146 (try
147 match file.file_ttr with
148 Some _ -> ()
149 | None ->
150 if not d.download_ttr_requested then
151 let (url, _) =
152 List.assoc "x-thex-uri" headers
153 (* TODO: this header is used by Gnutella2 when you want to specify to which
154 depth you want the tigertree:
155 List.assoc "x-tigertree-path" headers *)
157 if String.length url > 0 && url.[0] = '/' then begin
159 let s = Printf.sprintf "GET %s HTTP/1.1" url in
(* nblocks: file size divided by CommonUploads.tiger_block_size, plus one. *)
161 let nblocks =
162 let size = file_size file in
163 let chunk_size = CommonUploads.tiger_block_size in
164 Int64.to_int (size // chunk_size) + 1 in
165 let tiger_pos = CommonUploads.tiger_pos nblocks in
166 let end_pos = tiger_pos + nblocks in
(* 24 is the size in bytes of one tiger tree digest: read_more below slices
   the reply into 24 byte strings for TigerTree.direct_of_string.
   (SHA1 digests are 20 bytes.) *)
168 let end_pos = end_pos * 24 in
169 let range = Printf.sprintf "0-%d" (end_pos-1) in
171 if !verbose_msg_clients then
172 lprintf "[TTR] Requesting ttr with range %s\n" range;
173 let headers =
174 ["Range", Printf.sprintf "bytes=%s" range]
176 let s = make_download_request c s headers in
177 if !verbose_msg_clients then
178 lprintf "[TTR] SENDING TTR REQUEST: %s\n" (String.escaped s);
179 write_string sock s;
180 d.download_ttr_requested <- true;
181 d.download_ranges <- d.download_ranges @ [TTRReq nblocks];
182 c.client_requests <- c.client_requests @ [d];
183 end else
184 lprintf "[TTR] Url is not relative: %s\n" url;
(* Best effort: a missing header or any other failure above is ignored. *)
185 with _ -> ());
(* Dispatch on the kind of request this reply answers. *)
188 match d.download_ranges with
189 [] -> assert false
190 | req :: tail ->
191 d.download_ranges <- tail;
192 match req with
193 | HEADReq ->
195 if !!gnutella_experimental &&
196 not c.client_support_head_request then
197 c.client_support_head_request <- true;
198 (* TODO: why do we use the HEAD request ?? For the headers ? *)
199 (* lprintf "Received Reply to HEAD request: %s\n"
200 (String.escaped header); *)
201 (* d.download_head <- Head (first_line, headers); *)
202 (fun _ _ _ -> ()), read_more d c sock
204 | RANGEReq (_,_,r) ->
205 (* let size = file_size file in*)
207 set_client_state c (Connected_downloading (file_num file));
209 (* Send the next request !!! *)
(* Each iteration asks get_from_client for one more range while the queue
   holds at most max_queued_ranges entries; its exceptions are ignored. *)
210 for i = 1 to GnutellaNetwork.max_queued_ranges do
211 if List.length d.download_ranges <=
212 GnutellaNetwork.max_queued_ranges then
213 (try get_from_client sock c with _ -> ());
214 done;
216 read_some d c, read_more d c sock
218 | TTRReq nblocks ->
219 (* TODO loading the ttr in main memory is probably a bad idea, we should
220 save it on disk in the next version. *)
221 if !verbose_msg_clients then begin
222 lprintf "[TTR] header received: \n";
223 print_head "" headers;
224 end;
(* The reply body is accumulated in buf; the local read_more runs once the
   body is complete, decodes nblocks digests starting at digest index
   tiger_pos, stores them in file.file_ttr and installs them as the swarmer
   verifier before chaining to the outer read_more. *)
225 let buf = Buffer.create 100 in
226 let read_ttr counter_pos b to_read_int =
227 Buffer.add_substring buf b.buf b.pos to_read_int
229 let read_more () =
230 if !verbose_msg_clients then
231 lprintf "[TTR] ttr loaded\n";
232 let ttr = Buffer.contents buf in
233 let tiger_pos = CommonUploads.tiger_pos nblocks in
234 let array = Array.create nblocks TigerTree.null in
235 for i = 0 to nblocks - 1 do
236 array.(i) <- TigerTree.direct_of_string
237 (String.sub ttr ((tiger_pos + i) * 24) 24)
238 done;
239 file.file_ttr <- Some array;
241 (match file.file_swarmer with
242 None -> ()
243 | Some swarmer ->
244 if !verbose_msg_clients then
245 lprintf "[TTR] set_verifier\n";
246 CommonSwarming.set_verifier swarmer
247 (Verification (Array.map (fun ttr -> TigerTree ttr) array))
252 read_more d c sock ()
254 read_ttr, read_more
256 (*************************************************************************)
257 (* *)
258 (* read_some *)
259 (* *)
260 (*************************************************************************)
(* read_some d c counter_pos b to_read_int
   Hands to_read_int bytes of the socket buffer b (starting at b.pos) to
   the swarmer as file data located at offset counter_pos.
   - hits assert false when the download has no registered uploader;
   - disconnects the client with Closed_by_user and raises Exit when the
     file is no longer in state FileDownloading;
   - an exception raised by CommonSwarming.received is logged and ignored.
   Ends by setting c.client_reconnect to true. *)
and read_some d c counter_pos b to_read_int =
  let uploader =
    match d.download_uploader with
    | Some uploader -> uploader
    | None -> assert false
  in
  if file_state d.download_file <> FileDownloading then begin
    disconnect_client c Closed_by_user;
    raise Exit
  end;
  (* TODO: we should pause the download when received fails !!! *)
  (try
     CommonSwarming.received uploader counter_pos b.buf b.pos to_read_int
   with exn ->
     lprintf "FT: Exception %s in CommonSwarming.received\n"
       (Printexc2.to_string exn));
  c.client_reconnect <- true
286 (*************************************************************************)
287 (* *)
288 (* read_more *)
289 (* *)
290 (*************************************************************************)
(* read_more d c sock ()
   Called once a reply body has been fully read. When the download has no
   queued range left, asks get_from_client for the next one; otherwise
   does nothing, since replies to the queued ranges are still expected. *)
and read_more d c sock () =
  match d.download_ranges with
  | [] -> get_from_client sock c
  | _ :: _ -> ()
296 (*************************************************************************)
297 (* *)
298 (* client_parse_header *)
299 (* *)
300 (*************************************************************************)
(* client_parse_header c gconn sock (first_line, headers)
   Handler for the HTTP reply header sent by a source we download from.
   Steps visible in this function:
   - extends the socket lifetime, reads the 3 character status code that
     follows the first space of first_line and passes the headers to
     GnutellaProtocol.parse_headers;
   - fails with Bad HTTP code unless the code is in 200..299;
   - parses the content-range header into (start_pos, end_pos) where end_pos
     is one past the last byte; when that fails the client is disconnected
     with Closed_for_error and Exit is raised;
   - compares content-length with the parsed range;
   - asks find_request for the callbacks matching this reply and installs a
     Reader on gconn that feeds them until end_pos is reached, then restores
     the HttpReader handler for the next reply.
   The final handler logs (when verbose_unknown_messages is set),
   disconnects the client with Closed_for_exception and re-raises. *)
302 and client_parse_header c gconn sock (first_line, headers) =
303 if !verbose_msg_clients then begin
304 lprintf "[GDO] CLIENT PARSE HEADER\n";
305 end;
307 set_lifetime sock 3600.;
310 (* The reply should be "HTTP/1.1 200 OK" *)
311 let space_pos = String.index first_line ' ' in
312 let code = String.sub first_line (space_pos+1) 3 in
313 let code = int_of_string code in
315 GnutellaProtocol.parse_headers c first_line headers;
317 (* TODO in case of bad code, just jump to the next reply header... *)
318 if code < 200 || code > 299 then
319 failwith "Bad HTTP code";
321 connection_ok c.client_connection_control;
322 set_client_state c Connected_initiating;
323 set_rtimeout sock 120.;
(* Range parsing. npos skips the 6 characters that start at the first b of
   the header value (the word bytes and one separator). The negative
   defaults for dash_pos, slash_pos and star_pos are distinct sentinels so
   that the adjacency tests below cannot match by accident when a character
   is absent. A value of the form x-y/z yields (x, y+1); the case y = z is
   rejected. *)
325 let start_pos, end_pos =
327 let (range,_) = List.assoc "content-range" headers in
329 let npos = (String.index range 'b')+6 in
330 let dash_pos = try String.index range '-' with _ -> -10 in
331 let slash_pos = try String.index range '/' with _ -> -20 in
332 let star_pos = try String.index range '*' with _ -> -30 in
333 if star_pos = slash_pos-1 then
334 failwith "Cannot parse range"
335 (* Int64.zero, size (* "bytes */X" *) *)
336 else
337 let x = Int64.of_string (
338 String.sub range npos (dash_pos - npos) )
340 let len = String.length range in
341 let y = Int64.of_string (
342 String.sub range (dash_pos+1) (slash_pos - dash_pos - 1))
344 if slash_pos = star_pos - 1 then
345 x, Int64.succ y (* "bytes x-y/*" *)
346 else
347 let z = Int64.of_string (
348 String.sub range (slash_pos+1) (len - slash_pos -1) )
350 if y = z then
351 failwith "Cannot parse range"
352 (* Int64.pred x, size *)
353 else x, Int64.succ y
354 with
355 | e ->
356 lprintf "[GDO] Exception %s for range [%s]\n"
357 (Printexc2.to_string e) range;
358 raise e
359 with e ->
360 (* TODO: we should be able to reply to a request containing no Content-Range
361 iff Content-Length = Requested Length and
362 Requested Range = 0-Content-Length-1 ??? *)
366 if code <> 206 && code <> 200 then raise Not_found;
367 let (len,_) = List.assoc "content-length" headers in
368 let len = Int64.of_string len in
369 lprintf "Specified length: %Ld\n" len;
370 match d.download_ranges with
371 [] -> raise Not_found
372 | (start_pos,end_pos,r) :: _ ->
373 lprintf "WARNING: Assuming client is replying to range\n";
374 if len <> end_pos -- start_pos then
375 begin
376 lprintf "\n\nERROR: bad computed range: %Ld-%Ld/%Ld \n"
377 start_pos end_pos len;
378 print_head first_line headers;
379 raise Not_found
380 end;
381 (start_pos, end_pos)
382 with _ -> *)
383 (* A bit dangerous, no ??? *)
384 if !verbose_unknown_messages then
385 begin
386 lprintf "[GDO] ERROR: Could not find/parse range header (exception %s), disconnect\n"
387 (Printexc2.to_string e);
388 print_head first_line headers
389 end;
390 disconnect_client c (Closed_for_error "Bad HTTP Range");
391 raise Exit
(* Cross-check content-length against the parsed range.
   NOTE(review): the catch-all handler of this try also catches the Failure
   raised on a length mismatch, so a mismatch is only logged and then
   reported as a missing Content-Length field; the reply is still
   processed. Confirm whether that is intended. *)
393 (try
394 let (len,_) = List.assoc "content-length" headers in
395 let len = Int64.of_string len in
396 (* lprintf "[GDO] Specified length: %Ld\n" len; *)
397 if len <> end_pos -- start_pos then
398 begin
399 lprintf "[GDO] ERROR: bad computed range: %Ld-%Ld/%Ld \n"
400 start_pos end_pos len;
401 print_head first_line headers;
402 failwith "Bad Computed Range"
405 with _ ->
406 lprintf "[WARNING]: no Content-Length field\n";
407 print_head first_line headers;
411 lprintf "[GDO] Receiving range: %Ld-%Ld (len = %Ld)\n"
412 start_pos end_pos (end_pos -- start_pos);
413 print_head first_line headers;
(* read_some consumes body bytes, read_more runs once the body is done. *)
416 let read_some, read_more = find_request c sock headers in
(* counter_pos is the file offset of the next body byte to be consumed. *)
418 let counter_pos = ref start_pos in
419 gconn.gconn_handler <- Reader (fun gconn sock ->
421 let b = TcpBufferedSocket.buf sock in
422 let to_read = min (end_pos -- !counter_pos)
423 (Int64.of_int b.len) in
425 (* lprintf "[GDO] Reading: end_pos %Ld counter_pos %Ld len %d = to_read %Ld\n"
426 end_pos !counter_pos b.len to_read;
428 let to_read_int = Int64.to_int to_read in
430 read_some !counter_pos b to_read_int;
431 buf_used b to_read_int;
432 counter_pos := !counter_pos ++ to_read;
433 if !counter_pos = end_pos then begin
434 read_more ();
435 gconn.gconn_handler <- HttpReader (4,
436 ["HTTP",client_parse_header c],
437 GnutellaFunctions.default_handler)
438 end)
440 with e ->
441 if !verbose_unknown_messages then
442 begin
443 lprintf "[GDO] Exception %s in client_parse_header\n" (Printexc2.to_string e);
444 print_head first_line headers;
445 end;
446 disconnect_client c (Closed_for_exception e);
447 raise e
449 (*************************************************************************)
450 (* *)
451 (* get_from_client *)
452 (* *)
453 (*************************************************************************)
455 (* TODO: implement TigerTree download *)
456 (* TODO: use Gnutella2 alternative uids *)
(* get_from_client sock c
   Sends one more range request on sock for the first download of
   c.client_downloads whose file is in state FileDownloading.
   For that download the function:
   - registers an uploader with the file swarmer if none is registered yet,
     declaring the whole file (0 to file_size) as available;
   - asks CommonSwarming for blocks and then for a range of at most
     256 * 1024 bytes, appends a RANGEReq to d.download_ranges and formats
     the range as first-last with last inclusive;
   - writes a GET request carrying a Range header and appends the download
     to c.client_requests so that the reply can be matched later.
   Raises Not_found when no download is active or when no range can be
   obtained (check_finished is called before re-raising in that case), and
   hits assert false when the file has no swarmer. *)
458 and get_from_client sock (c: client) =
460 let rec iter downloads =
461 match downloads with
462 [] ->
463 if !verbose_msg_clients then
464 lprintf "No other download to start\n";
465 raise Not_found
466 | d :: tail ->
468 if file_state d.download_file <> FileDownloading then iter tail
469 else begin
471 let file = d.download_file in
473 let swarmer = match file.file_swarmer with
474 None -> assert false | Some sw -> sw
(* Lazily register this client as an uploader for the file. *)
477 let up = match d.download_uploader with
478 None ->
479 let chunks = [ Int64.zero, file_size file ] in
480 let up = CommonSwarming.register_uploader swarmer
481 (as_client c)
482 (AvailableIntervals chunks) in
483 d.download_uploader <- Some up;
486 | Some up -> up in
488 if !verbose_msg_clients then begin
489 lprintf "FINDING ON CLIENT\n";
490 end;
491 let file = d.download_file in
492 if !verbose_msg_clients then begin
493 lprintf "FILE FOUND, ASKING\n";
494 end;
496 if !verbose_swarming then begin
497 lprintf "Current download:\n Current chunks: ";
498 List.iter (fun (x,y) -> lprintf "%Ld-%Ld " x y) d.download_chunks;
499 lprintf "\n Current ranges: ";
500 List.iter (fun req ->
501 match req with
502 RANGEReq (x,y,r) ->
503 (* let (x,y) = CommonSwarming.range_range r in *)
504 lprintf "%Ld-%Ld " x y
505 | HEADReq ->
506 lprintf "HEAD"
507 | TTRReq _ ->
508 lprintf "TTR"
509 ) d.download_ranges;
511 lprintf "\n Current blocks: ";
512 (* List.iter (fun b -> CommonSwarming.print_block b.up_block) d.download_blocks; *)
513 lprintf "\n\nFinding Range: \n";
514 end;
(* Range selection: the inner iter alternates between fetching blocks when
   d.download_blocks is empty and asking for a range inside them; when
   find_range raises Not_found the blocks are dropped and fetched again. *)
515 let range =
517 let rec iter () =
518 match d.download_blocks with
519 | [] ->
520 if !verbose_swarming then lprintf "No block\n";
521 (try CommonSwarming.verify_one_chunk swarmer with _ -> ());
522 let _chunk, blocks = CommonSwarming.find_blocks up in
524 if !verbose_swarming then begin
525 lprintf "GOT BLOCKS:\n";
526 CommonSwarming.print_uploaders swarmer;
527 lprintf "Blocks Found: ";
528 List.iter (fun b ->
529 CommonSwarming.print_block b.up_block) blocks;
530 end;
532 d.download_blocks <- blocks;
533 iter ()
534 | blocks ->
536 if !verbose_swarming then begin
537 lprintf "Current Block: "; CommonSwarming.print_block b;
538 end;
541 let (x,y,r) =
542 CommonSwarming.find_range up (Int64.of_int (256 * 1024)) in
544 if !verbose_swarming then begin
545 lprintf "GOT RANGE:\n";
546 CommonSwarming.print_uploaders swarmer;
547 end;
548 d.download_ranges <- d.download_ranges @
549 [RANGEReq (x,y,r)];
550 Printf.sprintf "%Ld-%Ld" x (Int64.pred y)
551 with Not_found ->
552 if !verbose_swarming then
553 lprintf "Could not find range in current block\n";
554 d.download_blocks <- [];
555 iter ()
557 iter ()
558 with Not_found ->
559 if !verbose_unknown_messages then
560 lprintf_nl "Unable to get a block !!";
561 check_finished swarmer file;
562 raise Not_found
(* Build and send the request, then remember it for find_request. *)
564 let s = request_of_download "GET" d in
565 let headers =
566 ("Range", Printf.sprintf "bytes=%s" range) :: []
568 let s = make_download_request c s headers in
569 if !verbose_msg_clients then
570 lprintf "SENDING REQUEST: %s\n" (String.escaped s);
571 write_string sock s;
572 c.client_requests <- c.client_requests @ [d];
573 if !verbose_msg_clients then
574 lprintf "Asking %s For Range %s\n" (Md4.to_string c.client_user.user_uid)
575 range
578 iter c.client_downloads
580 (*************************************************************************)
581 (* *)
582 (* disconnect_client *)
583 (* *)
584 (*************************************************************************)
(* disconnect_client c r
   Tears down the connection of client c for reason r. Nothing happens
   unless c.client_sock is a Connection. Otherwise the function:
   - clears c.client_requests, reports the failure to the connection
     control, marks the client disconnected and closes the socket;
   - for every download of the client, puts the client back in the file
     client queue (unless the file is already in c.client_in_queues) and
     unregisters its uploader, clearing download_blocks and
     download_ranges;
   - decrements file_nconnected_clients of the file recorded in
     c.client_connected_for and clears that field;
   - calls connect_client again when c.client_reconnect is set, after
     resetting the flag.
   Any exception raised along the way is logged and not propagated. *)
586 and disconnect_client c r =
587 if !verbose then
588 lprintf_nl "DISCONNECT CLIENT";
589 match c.client_sock with
590 | Connection sock ->
591 (try
592 if !verbose_msg_clients then
593 lprintf_nl "Disconnected from source for %s" (string_of_reason r);
594 c.client_requests <- [];
595 connection_failed c.client_connection_control;
596 set_client_disconnected c r;
597 close sock r;
598 c.client_sock <- NoConnection;
(* Requeue the client for each of its files and release swarmer state. *)
599 List.iter (fun d ->
600 let file = d.download_file in
601 if not (List.memq file c.client_in_queues) then begin
602 Queue.put file.file_clients_queue (0,c);
603 c.client_in_queues <- file :: c.client_in_queues
604 end;
605 match d.download_uploader with
606 None -> ()
607 | Some up ->
608 d.download_uploader <- None;
609 CommonSwarming.unregister_uploader up;
610 d.download_blocks <- [];
611 d.download_ranges <- [];
612 ) c.client_downloads;
(* Undo the per-file connection accounting done by connect_client. *)
613 begin
614 match c.client_connected_for with
615 None -> ()
616 | Some file ->
617 file.file_nconnected_clients <- file.file_nconnected_clients - 1;
619 lprintf "For file %s, %d/%d clients connected (disconnected from %d)\n"
620 (file.file_name) file.file_nconnected_clients (nranges file)
621 (client_num (as_client c.client_client));
623 c.client_connected_for <- None
624 end;
(* client_reconnect is set by read_some after it has handled data. *)
626 if c.client_reconnect then begin
627 c.client_reconnect <- false;
628 connect_client c
630 with e ->
631 lprintf "Exception %s in disconnect_client\n"
632 (Printexc2.to_string e))
633 | _ -> ()
635 (*************************************************************************)
636 (* *)
637 (* connect_client *)
638 (* *)
639 (*************************************************************************)
(* connect_client c
   Starts an outgoing download connection to client c. Nothing happens when
   c.client_sock is already Connection or ConnectionWaiting.
   Otherwise:
   - the first download whose file is in state FileDownloading is recorded
     in c.client_connected_for and its file_nconnected_clients counter is
     incremented (the scan stops there by raising Exit, which the
     surrounding catch-all absorbs);
   - for an Indirect_location user a push is requested through
     GnutellaProto.ask_for_push;
   - for a Known_location user a pending connection is registered; when it
     is granted a token the socket is opened, timeout and close events are
     routed to disconnect_client, CONNECTED triggers init_client followed by
     get_from_client, and replies are parsed by client_parse_header.
     An exception raised while setting this up is logged and turned into a
     disconnect_client call. c.client_sock is set to ConnectionWaiting with
     the pending token.
   NOTE(review): the local download reference is assigned but never read
   in the visible code. *)
641 and connect_client c =
642 match c.client_sock with
643 | Connection _ | ConnectionWaiting _ -> ()
644 | NoConnection ->
646 (* Count this connection in the first file counter. Here, we assume
647 that the connection will not be aborted (otherwise, disconnect_client
648 should clearly be called). *)
649 let download = ref None in
650 (try List.iter (fun d ->
651 let file = d.download_file in
652 if file_state file = FileDownloading then
653 begin
654 download := Some d;
655 c.client_connected_for <- Some file;
656 file.file_nconnected_clients <-
657 file.file_nconnected_clients + 1;
659 lprintf "For file %s, %d/%d clients connected (connecting %d)\n"
660 (file.file_name)
661 file.file_nconnected_clients (nranges file)
662 (client_num (as_client c.client_client)); *)
663 raise Exit;
665 ) c.client_downloads with _ -> ());
(* Choose how to reach the peer from the kind of location we know. *)
666 begin
668 match c.client_user.user_kind with
669 Indirect_location (_, uid, _, _) ->
670 GnutellaProto.ask_for_push uid
671 | Known_location (ip, port) ->
672 let token =
673 add_pending_connection connection_manager (fun token ->
675 if !verbose_msg_clients then begin
676 lprintf "connect_client\n";
677 end;
678 if !verbose_msg_clients then begin
679 lprintf "connecting %s:%d\n" (Ip.to_string ip) port;
680 end;
681 c.client_reconnect <- false;
682 let sock = connect token "gnutella download"
683 (Ip.to_inet_addr ip) port
684 (fun sock event ->
685 match event with
686 | BASIC_EVENT RTIMEOUT ->
687 disconnect_client c Closed_for_timeout
688 | BASIC_EVENT LTIMEOUT ->
689 disconnect_client c Closed_for_lifetime
690 | BASIC_EVENT (CLOSED s) ->
691 disconnect_client c s
693 (* You can only use the CONNECTED signal if the socket is not yet controlled
694 by the bandwidth manager... 2004/02/03: Normally, not true anymore, it should
695 now work even in this case... *)
697 | CONNECTED ->
698 init_client sock;
699 get_from_client sock c
700 | _ -> ()
704 c.client_host <- Some (ip, port);
705 c.client_country_code <- None;
706 check_client_country_code c;
707 set_client_state c Connecting;
708 c.client_sock <- Connection sock;
709 TcpBufferedSocket.set_closer sock (fun _ s ->
710 disconnect_client c s
712 set_rtimeout sock 30.;
713 set_gnutella_sock sock !verbose_msg_clients
714 (HttpReader (4, ["HTTP", client_parse_header c],
715 GnutellaFunctions.default_handler))
717 with e ->
718 lprintf "Exception %s while connecting to client\n"
719 (Printexc2.to_string e);
720 disconnect_client c (Closed_for_exception e)
723 c.client_sock <- ConnectionWaiting token
728 1022569854.519 24.102.10.39:3600 -> 212.198.235.45:51736 of len 82
729 ascii [
730 G I V 8 1 : 9 7 4 3 2 1 3 F B 4 8 6 2 3 D 0 F F D F A B B 3 8 0 E C 6 C 0 0 / P o l i c e V i d e o - E v e r y B r e a t h Y o u T a k e . m p g(10)(10)]
732 "GIV %d:%s/%s\n\n" file.file_number client.client_md4 file.file_name
737 (*************************************************************************)
738 (* *)
739 (* push_handler *)
740 (* *)
741 (*************************************************************************)
(* push_handler cc gconn sock (first_line, headers)
   Handles a GIV line received on an incoming connection, i.e. a peer that
   connects back to us so that we can download from it.
   - first_line is parsed as GIV index:uid/name: index is the integer
     between offset 4 and the colon, uid the 32 characters after the colon;
   - the client is looked up in clients_by_uid, first as an
     Indirect_location and then as a Known_location for the peer address;
     when neither exists a new Indirect_location client is created and its
     empty nick is replaced by the textual uid;
   - when the client already has a Connection the new socket is closed and
     End_of_file is raised; otherwise the socket is attached to the client,
     cc is set, the download with the given index is moved to the front of
     c.client_downloads, a first request is sent with get_from_client and
     replies are routed to client_parse_header.
   Failures after the client is known are logged, disconnect the client and
   raise End_of_file; the outermost handler logs, disconnects the client
   stored in cc if any, and re-raises. *)
743 let push_handler cc gconn sock (first_line, headers) =
744 if !verbose_msg_clients then begin
745 lprintf "PUSH";
746 print_head first_line headers;
747 end;
749 let (ip, port) = TcpBufferedSocket.peer_addr sock in
751 if !verbose_msg_clients then begin
752 lprintf "PARSING GIV HEADER\n";
753 end;
(* Offsets below assume the line starts with the 4 characters GIV and one
   separator, as routed by the HttpReader prefix table. *)
754 let colon_pos = String.index first_line ':' in
755 let uid = Md4.of_string (String.sub first_line (colon_pos+1) 32) in
756 let index = int_of_string (String.sub first_line 4 (colon_pos-4)) in
757 if !verbose_msg_clients then begin
758 lprintf "PARSED\n";
759 end;
(* Client lookup with two fallbacks; see the header comment. *)
760 let c = try
761 Hashtbl.find clients_by_uid (Indirect_location ("", uid, ip, port))
762 with _ ->
764 Hashtbl.find clients_by_uid (Known_location (ip,port))
765 with _ ->
766 let c = new_client (Indirect_location ("", uid, ip, port)) in
767 if String.length c.client_user.user_nick == 0 then
768 c.client_user.user_nick <- (Md4.to_string uid);
772 c.client_host <- Some (ip, port);
773 check_client_country_code c;
774 match c.client_sock with
775 | Connection _ ->
776 if !verbose_msg_clients then begin
777 lprintf "ALREADY CONNECTED\n";
778 end;
779 close sock (Closed_for_error "already connected");
780 raise End_of_file
781 | _ ->
782 if !verbose_msg_clients then begin
783 lprintf "NEW CONNECTION\n";
784 end;
785 cc := Some c;
786 c.client_sock <- Connection sock;
787 connection_ok c.client_connection_control;
789 if !verbose_msg_clients then begin
790 lprintf "FINDING FILE %d\n" index;
791 end;
792 let d = find_download_by_index index c.client_downloads in
793 if !verbose_msg_clients then begin
794 lprintf "FILE FOUND\n";
795 end;
(* Put the pushed download first so that get_from_client picks it. *)
797 c.client_downloads <- d :: (List2.removeq d c.client_downloads);
798 get_from_client sock c;
799 gconn.gconn_handler <- HttpReader (4,
800 ["HTTP", client_parse_header c],
801 GnutellaFunctions.default_handler)
802 with e ->
803 lprintf "Exception %s during client connection\n"
804 (Printexc2.to_string e);
805 disconnect_client c (Closed_for_exception e);
806 raise End_of_file
807 with e ->
808 lprintf "Exception %s in push_handler\n" (Printexc2.to_string e);
809 print_head first_line headers;
810 (match !cc with Some c -> disconnect_client c (Closed_for_exception e)
811 | _ -> ());
812 raise e
814 (*************************************************************************)
815 (* *)
816 (* read_request *)
817 (* *)
818 (*************************************************************************)
820 (* TODO: add Gnutella2 alternative UIDs *)
821 (* TODO: implement TigerTree upload *)
(* read_request url headers gconn sock
   Builds the upload descriptor answering a GET or HEAD request for url.

   The file is located with find_file_to_upload. The byte interval to serve
   is [chunk_pos, chunk_end), end exclusive:
   - with a usable range header the reply is partial (206) and the interval
     comes from Http_server.parse_range;
   - without one, or when it cannot be parsed, the whole file is served
     (200).
   The interval comes from the remote peer, so it is checked against the
   file size before anything is built.

   Returns the upload record, with the complete HTTP reply header in
   uc_header. Sets gconn.gconn_client_info_sent the first time a reply is
   built on this connection.
   Raises Failure when the requested interval is not inside the file, and
   whatever find_file_to_upload or Url.of_string raise. *)
let read_request url headers gconn sock =
  let url = Url.of_string url in
  let reader, size, add_headers = find_file_to_upload gconn url in
  let partial, (chunk_pos, chunk_end) =
    try
      let (range, _) = List.assoc "range" headers in
      let interval =
        match Http_server.parse_range range with
        | x, None, _ -> x, size
        | x, Some y, Some z ->
            (* some vendor bug *)
            if y = z then Int64.pred x, y else x, Int64.succ y
        | x, Some y, None -> x, Int64.succ y
      in
      true, interval
    with _ ->
      (* No range header, or one we cannot parse: serve the whole file. *)
      false, (Int64.zero, size)
  in
  (* The interval is peer-controlled: refuse anything outside the file
     rather than handing bogus offsets to the reader. *)
  if chunk_pos < Int64.zero || chunk_pos > chunk_end || chunk_end > size then
    failwith "Requested range is outside the file";
  let chunk_len = chunk_end -- chunk_pos in
  let header =
    let status =
      Printf.sprintf "HTTP/1.1 %s"
        (if partial then "206 Partial Reply" else "200 OK")
    in
    let base_headers =
      ("Connection", "Keep-Alive") ::
      ("Content-type", "application/binary") ::
      ("Content-length", Printf.sprintf "%Ld" chunk_len) ::
      add_headers
    in
    let ranged_headers =
      if partial then
        ("Accept-Ranges", "bytes") ::
        ("Content-range",
         Printf.sprintf "bytes %Ld-%Ld/%Ld"
           chunk_pos (Int64.pred chunk_end) size) ::
        base_headers
      else base_headers
    in
    (* Identification headers are only sent once per connection. *)
    let all_headers =
      if gconn.gconn_client_info_sent then ranged_headers
      else begin
        gconn.gconn_client_info_sent <- true;
        ("Remote-IP", Ip.to_string (fst (peer_addr sock))) ::
        ("Server", get_user_agent ()) :: ranged_headers
      end
    in
    make_http_header status all_headers
  in
  {
    uc_sock = sock;
    (* Record what was actually decided above instead of a constant. *)
    uc_partial = partial;
    uc_reader = reader;
    uc_chunk_pos = chunk_pos;
    uc_chunk_len = chunk_len;
    uc_chunk_end = chunk_end;
    uc_size = size;
    uc_header = header;
  }
888 (*************************************************************************)
889 (* *)
890 (* find_slot *)
891 (* *)
892 (*************************************************************************)
(* find_slot sock
   Makes sure sock holds an upload slot, updating current_downloads.

   The current slot list is scanned from its head. Closed sockets met
   before sock are dropped. If sock is found, it and everything after it
   are kept as they are. If it is not found and the number of live sockets
   has reached max_available_slots, the function fails; otherwise sock is
   added in front of the live sockets.
   Raises Failure with the message All Slots Used when no slot is free. *)
let find_slot sock =
  let report_full live =
    lprintf "[GUP] All slots used:\n";
    List.iter (fun s ->
      let (ip, port) = peer_addr s in
      lprintf " by %s:%d %b\n" (Ip.to_string ip) port (closed s)
    ) live
  in
  let rec scan pending live =
    match pending with
    | [] ->
        if List.length live >= !!max_available_slots then begin
          if !verbose_msg_clients then report_full live;
          failwith "All Slots Used"
        end
        else sock :: live
    | s :: rest ->
        (* Physical equality: we look for this very socket. *)
        if s == sock then pending @ live
        else if closed s then scan rest live
        else scan rest (s :: live)
  in
  current_downloads := scan !current_downloads []
922 (*************************************************************************)
923 (* *)
924 (* get_handler *)
925 (* *)
926 (*************************************************************************)
(* get_handler get_request cc gconn sock (first_line, headers)
   Serves an upload request received on sock. get_request is true for GET
   and false for HEAD (see the handler table built in listen); cc is not
   used in the visible body.
   Steps:
   - caps the socket output buffer at max_upload_buffer_len;
   - splits first_line at its first two spaces into request, url and proto;
   - reserves an upload slot with find_slot and builds the upload record
     with read_request; for HEAD the position is moved to the end of the
     chunk so that no data is sent;
   - defines refill, the callback that first writes the reply header and
     then streams file data through upload_buffer, and appends it to
     gconn.gconn_refill; the handler at the head of that list is then
     called.
   Any exception is logged when verbose_msg_clients is set, closes the
   socket with Closed_for_exception and is re-raised. *)
928 let get_handler get_request cc gconn sock (first_line, headers) =
929 if !verbose_msg_clients then begin
930 lprintf "[GUP] GET";
931 print_head first_line headers;
932 end;
934 (* We don't want to buffer more than 100 kB per upload connection *)
935 set_max_output_buffer sock max_upload_buffer_len;
936 if !verbose_msg_clients then
937 lprintf "[GUP] After set_max_output_buffer: max_refill %d\n"
938 (max_refill sock);
939 (* lprintf "parse_head\n"; *)
941 let request_end = String.index first_line ' ' in
942 let request = String.sub first_line 0 request_end in
944 let url_end = String.index_from first_line (request_end+1) ' ' in
945 let url = String.sub first_line (request_end+1)
946 (url_end - request_end-1) in
948 let proto_len = String.length first_line in
949 let proto = String.sub first_line (url_end+1) (proto_len-url_end-1) in
950 if !verbose_msg_clients then
951 lprintf "[GUP] Header parsed: [%s] [%s] [%s]\n" request url proto;
952 (* "/get/num/filename" *)
954 (* First of all, can we accept this request ???? *)
955 find_slot sock;
956 let uc = read_request url headers gconn sock in
957 let header_sent = ref false in
959 (* For HEAD request, do as if we had already sent the data *)
960 if not get_request then uc.uc_chunk_pos <- uc.uc_chunk_end;
(* write_done becomes true once the whole chunk has been queued. *)
962 let write_done = ref false in
(* refill sock: when everything has been written, keep_alive is off and the
   output buffer is empty, the socket is closed. Otherwise the header is
   written as soon as max_refill strictly exceeds its length, then up to
   max_refill bytes of file data are read into upload_buffer and written on
   each call. When the chunk is complete and keep_alive is on, this handler
   is removed from gconn.gconn_refill and the next one, if any, is run. *)
963 let rec refill sock =
964 if !verbose_msg_clients then
965 lprintf "[GUP] refill called\n";
966 if !write_done && not !!keep_alive && remaining_to_write sock = 0 then
967 begin
968 if !verbose_msg_clients then
969 lprintf "CLOSING AFTER WRITE\n";
970 close sock (Closed_by_user)
972 end else
973 let can_write = max_refill sock in
974 if not !header_sent then
975 if can_write > String.length uc.uc_header then begin
976 (* BUG: send the header *)
977 TcpBufferedSocket.write_string sock uc.uc_header;
978 if !verbose_msg_clients then begin
979 lprintf "[GUP] Sending Header:\n";
980 AnyEndian.dump uc.uc_header;
981 lprintf "\n";
982 end;
983 header_sent:=true;
984 end;
985 if !header_sent then begin
986 if uc.uc_chunk_pos = uc.uc_chunk_end then begin
987 if !verbose_msg_clients then
988 lprintf "[GUP] Finished replying to header\n";
989 write_done := true;
990 if !!keep_alive then
991 match gconn.gconn_refill with
992 [] -> ()
993 | [_] ->
994 gconn.gconn_refill <- []
995 | _ :: ((refill :: _ ) as tail) ->
996 gconn.gconn_refill <- tail;
997 refill sock
998 end else
1000 if get_request then
1001 let pos = uc.uc_chunk_pos in
1002 let to_write = uc.uc_chunk_end -- pos in
1003 let rlen = min (max_refill sock) (Int64.to_int to_write) in
1004 if !verbose_msg_clients then
1005 lprintf "[GUP] to_write: %d/%Ld/%d\n" rlen to_write
1006 (remaining_to_write sock);
1008 if rlen > 0 then begin
1009 uc.uc_reader pos upload_buffer 0 rlen;
1010 if !verbose_msg_clients then
1011 lprintf "[GUP] Writting %d\n" rlen;
1012 TcpBufferedSocket.write sock upload_buffer 0 rlen;
1014 uc.uc_chunk_pos <- uc.uc_chunk_pos ++ (Int64.of_int rlen);
1015 if remaining_to_write sock = 0 then refill sock
1017 end;
(* Queue this reply behind the ones still being written on the connection. *)
1019 gconn.gconn_refill <- gconn.gconn_refill @ [refill];
1020 if !verbose_msg_clients then
1021 lprintf "[GUP] refill: %d refillers\n" (List.length gconn.gconn_refill);
1022 match gconn.gconn_refill with
1023 refill :: tail ->
1024 (* First refill handler, must be called immediately *)
1025 refill sock
1026 | _ -> (* Already a refill handler, wait for it to finish its job *)
1029 with e ->
1030 (* TODO: send back a 404 NOT FOUND error *)
1031 if !verbose_msg_clients then begin
1032 lprintf "[GUP] Exception %s in get_handler\n" (Printexc2.to_string e);
1033 print_head first_line headers;
1034 end;
1035 close sock (Closed_for_exception e);
1036 raise e
1038 (*************************************************************************)
1039 (* *)
1040 (* listen *)
1041 (* *)
1042 (*************************************************************************)
(* listen ()
   Opens the Gnutella client server socket on client_bind_addr and
   client_port and stores it in listen_sock.
   For every incoming connection a buffered socket is created with
   - read and idle-lifetime timeouts that close it,
   - the global download and upload bandwidth controllers,
   - a closer that disconnects the associated client once one has been
     identified (the shared reference c is filled in by the handlers),
   - a 30 second read timeout,
   - an HttpReader that dispatches GIV lines to push_handler and GET or
     HEAD requests to get_handler.
   Exceptions raised while creating the server are logged and not
   propagated. *)
1044 let listen () =
1046 let sock = TcpServerSocket.create "gnutella client server"
1047 (Ip.to_inet_addr !!client_bind_addr)
1048 !!client_port
1049 (fun sock event ->
1050 match event with
1051 TcpServerSocket.CONNECTION (s,
1052 Unix.ADDR_INET(from_ip, from_port)) ->
1053 if !verbose then
1054 lprintf "CONNECTION RECEIVED FROM %s FOR PUSH\n%s"
1055 (Ip.to_string (Ip.of_inet_addr from_ip))
1056 "*********** CONNECTION ***********\n";
1058 let token = create_token connection_manager in
1059 let sock = TcpBufferedSocket.create token
1060 "gnutella client connection" s
1061 (fun sock event ->
1062 match event with
1063 BASIC_EVENT RTIMEOUT -> close sock Closed_for_timeout
1064 | BASIC_EVENT LTIMEOUT -> close sock Closed_for_lifetime
1065 | _ -> ()
1068 TcpBufferedSocket.set_read_controler sock download_control;
1069 TcpBufferedSocket.set_write_controler sock upload_control;
(* c stays None until one of the handlers below identifies the client. *)
1071 let c = ref None in
1072 TcpBufferedSocket.set_closer sock (fun _ s ->
1073 match !c with
1074 Some c -> disconnect_client c s
1075 | None ->
1076 if !verbose then
1077 lprintf "DISCONNECTION BEFORE CLIENT %s:%d IS KNOWN\n"
1078 (Ip.to_string (peer_ip sock)) (peer_port sock)
1080 TcpBufferedSocket.set_rtimeout sock 30.;
1081 set_gnutella_sock sock !verbose_msg_clients
1082 (HttpReader (4,
1084 "GIV", push_handler c;
1085 "GET", get_handler true c;
1086 "HEAD", get_handler false c;
1087 ], GnutellaFunctions.default_handler)
1089 | _ -> ()
1090 ) in
1091 listen_sock := Some sock;
1093 with e ->
1094 lprintf "Exception %s while init gnutella server\n"
1095 (Printexc2.to_string e)
1097 (*************************************************************************)
1098 (* *)
1099 (* push_connection *)
1100 (* *)
1101 (*************************************************************************)
1103 let push_connection guid index ip port =
1104 let _ =
1105 add_pending_connection connection_manager (fun token ->
1106 let sh =Hashtbl.find shareds_by_id index in
1107 let sock = connect token "gnutella download"
1108 (Ip.to_inet_addr ip) port
1109 (fun sock event ->
1110 match event with
1111 BASIC_EVENT RTIMEOUT -> close sock Closed_for_timeout
1112 | BASIC_EVENT LTIMEOUT -> close sock Closed_for_lifetime
1113 | _ -> ()
1116 lprintf "CONNECTION PUSHED TO %s\n" (Ip.to_string ip);
1118 TcpBufferedSocket.set_read_controler sock download_control;
1119 TcpBufferedSocket.set_write_controler sock upload_control;
1121 let c = ref None in
1122 TcpBufferedSocket.set_closer sock (fun _ s ->
1123 match !c with
1124 Some c -> disconnect_client c s
1125 | None -> ()
1127 TcpBufferedSocket.set_rtimeout sock 30.;
1128 (* TODO test this, looks strange... *)
1129 set_gnutella_sock sock !verbose_msg_clients
1130 (HttpReader (4, [
1131 "GET", get_handler true c;
1132 "HEAD", get_handler false c;
1133 ], GnutellaFunctions.default_handler));
1134 write_string sock
1135 (Printf.sprintf "GIV %d:%s/%s\n\n"
1136 index (Md4.to_string guid) sh.shared_codedname)