patch #7448
[mldonkey.git] / src / daemon / common / commonSwarming.ml
blob5336eb960da2454c6a4ece946710f99d5250ea83
1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
2 (*
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 The jobs of swarmers are :
22 * select what data to ask from each uploader
23 * merge data coming from uploaders, potentially from different
24 networks, into a single Unix32 backend.
26 OVERALL SCHEMA
28 Each network frontend can have a different (fixed) chunk size
29 t1 +--------+--------+--------+--------+--------+--------+-------- chunks
30 t2 +------+------+------+------+------+------+------+------+------ chunks
32 each block is contained in at most /\ chunk_of_block
33 one chunk, for any network || mappings
34 \/ blocks_of_chunk
35 swarmer
36 +------+-+----+---+--+------+------++-----+--+---+----+-+------ blocks
37 | | | | | | | ... variable size
38 v v v v v v v
39 r<>r r r r r r<>r r<>r ranges
40 ^ one dbl linked list/block
41 | encoding missing data ranges
42 uploaders physically uploader
43 reference ranges
46 open Int64ops
47 open Options
48 open Printf2
50 open CommonOptions
51 open CommonTypes
(** Compile-time switch: when [true], block lookups and interval
    iteration in this file emit extra trace lines. *)
let debug_all = false

(** Exception declared for users of this module; it is not raised in
    the part of the file visible here. *)
exception VerifierNotReady

(** Data an uploader declares as available: either a list of
    [(begin, end)] byte offsets, or a bit vector. *)
type intervals =
  | AvailableIntervals of (int64 * int64) list
  | AvailableBitv of Bitv.t

(** Verification information attached to a frontend (see
    [t_verifier]). *)
type verification =
  | NoVerification
  | VerificationNotAvailable
  | ForceVerification
  | Verification of uid_type array

let exit_on_error = ref false

(** Prefix prepended to every log line emitted through the two helpers
    below. *)
let log_prefix = "[cSw]"

(** Log a formatted line (newline terminated) with [log_prefix]. *)
let lprintf_nl = fun fmt ->
  lprintf_nl2 log_prefix fmt

(** Log formatted text (no trailing newline) with [log_prefix]. *)
let lprintf_n = fun fmt ->
  lprintf2 log_prefix fmt

open CommonTypes

open CommonFile
open CommonTypes
open CommonClient

module VB = VerificationBitmap
86 (* If we want to become 'multinet', we should:
87 * there shouldn't be any block_size, instead blocks should correspond
88 to the largest blocks which are completely included in one chunk for
89 every network.
90 * a completed block should not be verified until all the blocks are
91 completed for all networks.
92 * swarmers should be loaded before other files, so that they can be
93 used and shared by different files.
95 * Verification:
96 - must_verify_chunk
98 TODO: s_last_seen is useless, only t_last_seen is useful, at least in the
99 first version.
(* network "frontend"/"view"/... to a swarmer *)
(* glossary:
   network frontend use "chunks" of data,
   swarmer use "blocks" of data *)
type t = {
    t_num : int;
    mutable t_primary : bool;
    t_file : file;
    mutable t_s : swarmer;
    t_chunk_size : int64;

    t_nchunks : int;
    mutable t_converted_verified_bitmap : VerificationBitmap.t;
    mutable t_last_seen : int array;
    mutable t_ncomplete_chunks : int;
    mutable t_nverified_chunks : int;

    mutable t_verifier : verification; (* information available to
                                          check data correctness *)
    mutable t_verified : (int -> int -> unit); (* function to call
                                                  when a chunk is verified;
                                                  receives the number
                                                  of verified chunks,
                                                  and the index of the
                                                  chunk just verified *)

    (* mapping from network chunks to swarmer blocks *)
    mutable t_blocks_of_chunk : int list array;
    (* mapping from swarmer blocks to network chunks *)
    mutable t_chunk_of_block : int array;
  }

and swarmer = {
    s_num : int;
    s_filename : string;
    s_size : int64;
    s_disk_allocation_block_size : int64;

    mutable s_networks : t list; (** list of frontends, primary at head
                                     t.t_s = s <=> t in s.s_networks *)
    mutable s_strategy : swarming_strategy;

    mutable s_verified_bitmap : VerificationBitmap.t;
    mutable s_priorities_bitmap : string; (* one priority char per block *)
    mutable s_priorities_intervals : (int64 * int) list;
    (* beginning, priority *)
    mutable s_disk_allocated : Bitv.t;
    mutable s_availability : int array;
    mutable s_nuploading : int array;
    (* mutable s_last_seen : int array; *)

    mutable s_blocks : block_v array;
    mutable s_block_pos : int64 array; (** offset of the beginning of
                                           each block *)
  }

and block_v =
  | EmptyBlock
  | PartialBlock of block
  | CompleteBlock
  | VerifiedBlock

and block = {
    block_s : swarmer;
    mutable block_num : int;
    mutable block_begin : Int64.t;
    mutable block_end : Int64.t;
    mutable block_ranges : range; (** [range] of the double-linked
                                      list of ranges associated to the
                                      [block]
                                      what about using a standard list
                                      instead ?
                                      or a balanced tree ? *)
    mutable block_remaining : int64; (* amount of bytes missing. *)
    mutable block_unselected_remaining : int64; (* same, less ranges
                                                   selected for uploaders. *)
  }

and range = {
    mutable range_block : block;
    mutable range_begin : Int64.t;
    mutable range_end : Int64.t;
    mutable range_prev : range option;
    mutable range_next : range option;
    mutable range_nuploading : int; (* number of uploaders currently
                                       referencing that range *)
  }

and uploader = {
    up_t : t;
    up_client : client;

    mutable up_declared : bool;

    mutable up_intervals : intervals;
    mutable up_complete_blocks : int array; (** block numbers *)
    mutable up_ncomplete : int; (** number of blocks not yet handled,
                                    at the beginning of
                                    up_complete_blocks *)

    mutable up_partial_blocks : (int * int64 * int64) array; (** block
                                                                 number,
                                                                 begin_pos,
                                                                 end_pos *)
    mutable up_npartial : int; (** number of blocks not yet handled,
                                   at the beginning of
                                   up_partial_blocks *)

    mutable up_blocks : uploader_block list;

    mutable up_ranges : (int64 * int64 * range) list; (* ranges referenced by
                                                         that uploader, see
                                                         range_nuploading *)
  }

and uploader_block = {
    up_block : block;
    up_block_begin : int64;
    up_block_end : int64;
  }
(* range invariants:
   Ranges represent "holes" of missing data in a block; data is
   missing between offsets range_begin and range_end.

   [block]'s [block_ranges] references the first (smallest offsets) of
   the [range]s associated with it.

   [range]s are doubly linked together through [range_prev] and
   [range_next]:

   r.range_next.range_prev = r.range_prev.range_next = r
   ( when links are different from None )

   [range]s have a backlink to their "owner" [block]:

   b.block_ranges.{range_next}*.{range_prev}*.range_block = b

   range offsets are all within their block's offset limits, do not
   overlap, and are sorted in increasing offset order:

   b.block_begin <= b.block_ranges.range_begin ... <=
   r.range_prev.range_end <= r.range_begin <=
   r.range_end <= r.range_next.range_begin <= ...
   <= b.block_end *)
(* range owners are only used through uploaders' [up_ranges]. Blocks could
   be saved in [uploaders]' [up_ranges] along with ranges, but would
   need updating when the swarmer is split.

   Removing [range] from [up_ranges] and [range_nuploading] from
   [range] could be good too, because they're not correctly updated
   when the swarmer is split. Again, getting rid of them is a
   problem of performance.
*)
259 (* block invariants
260 Data missing for a block is the sum of the "sizes" of its ranges.
262 b.block_remaining = sum (r.range_end - r.range_begin) b.block_ranges
(* swarmer invariants ?
   s.s_verified_bitmap.[i] = State_missing <=> s_blocks.[i] = EmptyBlock
   s.s_verified_bitmap.[i] = State_partial <=> s_blocks.[i] = PartialBlock _
   s.s_verified_bitmap.[i] = State_complete <=> s_blocks.[i] = CompleteBlock
   s.s_verified_bitmap.[i] = State_verified <=> s_blocks.[i] = VerifiedBlock
   If so, why not drop s_verified_bitmap, and replace it by some
   verified_bitmap s i and verified_bitmap_all s functions ?
*)
274 (* frontend invariants ?
275 t_ncomplete_chunks =
276 List.length (List.filter (fun x -> x >= State_complete) t_converted_verified_bitmap)
277 t_nverified_chunks =
278 List.length (List.filter (fun x -> x = State_verified) t_converted_verified_bitmap)
280 hence t_ncomplete_chunks >= t_nverified_chunks
282 All chunks are [t_chunk_size] bytes in size, and first start at
283 offset 0. This is assumed in [create], [associate], [verify_chunk],
284 [duplicate_chunks], maybe more.
287 (* uploaders invariants ?
288 uploader block numbers are stored in reverse order in
289 up_complete_blocks and up_partial_blocks (first blocks at the end
290 of arrays), then array is processed from end to begin.
292 0 <= up_ncomplete < Array.length up_complete_blocks
293 0 <= up.up_npartial < Array.length up_partial_blocks
295 When a block has been selected, it's pushed out of the first
296 up_ncomplete first elements of up_complete_blocks by swapping it
297 with the #(up_ncomplete-1) element, then up_ncomplete is
298 decreased. (and similarly with s/complete/partial/ ?)
300 The question is now, aren't there better datastructures than
301 arrays for the job ? ;)
304 (*************************************************************************)
305 (* *)
306 (* Global values *)
307 (* *)
308 (*************************************************************************)
(** Weak set of swarmers; two swarmers are the same entry when their
    [s_filename] are equal. *)
module HS = Weak.Make(struct
    type t = swarmer
    let equal a b = a.s_filename = b.s_filename
    let hash sw = Hashtbl.hash sw.s_filename
  end)

let swarmers_by_name = HS.create 31

(** Weak set of frontends, keyed by [t_num]. *)
module HT = Weak.Make(struct
    (* since type declarations are recursive, one can't write type t = t;
       going through an intermediate alias is a workaround *)
    type frontend = t
    type t = frontend
    let equal a b = a.t_num = b.t_num
    let hash fe = fe.t_num
  end)

let frontends_by_num = HT.create 31

(** Weak set of uploaders, keyed by the [client_num] of their
    [up_client]. *)
module HU = Weak.Make(struct
    type t = uploader
    let equal a b = (client_num a.up_client) = (client_num b.up_client)
    let hash up = Hashtbl.hash (client_num up.up_client)
  end)

let uploaders_by_num = HU.create 113

(** Counters used to number frontends ([t_num]) and swarmers
    ([s_num]). *)
let frontend_counter = ref 0
let swarmer_counter = ref 0
(** Set [t.t_last_seen] of every verified chunk to the current time,
    then record on [t.t_file] (via [set_file_last_seen]) the oldest
    value among the current time and the last-seen times of the chunks
    that are not verified. Returns [t.t_last_seen]. *)
let compute_last_seen t =
  let oldest = ref (BasicSocket.last_time ()) in
  let visit i state =
    match state with
    | VB.State_verified ->
        t.t_last_seen.(i) <- BasicSocket.last_time ()
    | _ ->
        oldest := min !oldest t.t_last_seen.(i)
  in
  VB.iteri visit t.t_converted_verified_bitmap;
  set_file_last_seen t.t_file !oldest;
  t.t_last_seen
(** (internal) return a fresh, unlinked, 0 sized range at offset [pos],
    owned by block [b] and referenced by no uploader. *)
let void_range b pos =
  {
    range_prev = None;
    range_next = None;
    range_begin = pos;
    range_end = pos;
    range_block = b;
    range_nuploading = 0;
  }
(** Number of bytes covered by a range: [range_end - range_begin]. *)
let compute_range_size { range_begin; range_end; _ } =
  range_end -- range_begin
(** Apply [f] to [r], then to every range reachable from it through
    [range_next] links, in order. *)
let ranges_iter f r =
  let rec walk cur =
    f cur;
    match cur.range_next with
    | Some next -> walk next
    | None -> ()
  in
  walk r
(** Left fold of [f] over [r] and the ranges following it along
    [range_next] links, starting from accumulator [acc]. *)
let ranges_fold f acc r =
  let rec go acc cur =
    let acc' = f acc cur in
    match cur.range_next with
    | Some next -> go acc' next
    | None -> acc'
  in
  go acc r
(** [true] iff predicate [p] holds for [r] and for every range after it
    along [range_next] links; stops at the first failure. *)
let ranges_for_all p r =
  let rec check cur =
    if not (p cur) then false
    else
      match cur.range_next with
      | Some next -> check next
      | None -> true
  in
  check r
(** [ranges_for_all] applied to the range list of block [b]. *)
let block_ranges_for_all p b =
  let first = b.block_ranges in
  ranges_for_all p first
(** [ranges_fold] applied to the range list of block [b]. *)
let block_ranges_fold f acc b =
  let first = b.block_ranges in
  ranges_fold f acc first
(** (internal) assigns range [r], and all other ranges along
    [range_next] links, to block [b] *)
(* not recursive itself: the traversal is done by [ranges_iter] *)
let own_ranges b r =
  ranges_iter (fun r -> r.range_block <- b) r
(** (internal)
    Find ranges that are after [cut_pos] offset, unlink them from r
    double-linked list of ranges, set their owner to [b] and return
    the first of the removed ranges.

    If all ranges are before [cut_pos] return a 0-sized range.

    If [cut_pos] is within one of the ranges, that range is cut in
    two at [cut_pos] offset, and each half is linked to its side.

    Also, what to do if range_nuploading is not 0 ?
    => [cut_ranges_after] is being called from [split_blocks] that
    does not preserve [s_nuploading] for blocks either
*)
let cut_ranges_after b r cut_pos =
  let rec iter r =
    (* after the cut position already ? *)
    if r.range_begin >= cut_pos then begin
      (match r.range_prev with
       | None ->
           (* every range moves away: the previous owner keeps a
              0-sized placeholder range at the cut position *)
           let b1 = r.range_block in
           b1.block_ranges <- void_range b1 cut_pos
       | Some rp ->
           rp.range_next <- None;
           r.range_prev <- None);
      r
    end
    (* still before the cut position ? *)
    else if r.range_end <= cut_pos then
      (match r.range_next with
       | None -> void_range b cut_pos
       | Some r -> iter r)
    else begin
      (* across cut position, must split a range *)
      (* "right" half; inherits [range_next], [range_block] and
         [range_nuploading] from [r] *)
      let split_r = { r with
                      range_begin = cut_pos;
                      range_prev = None;
                    } in
      (match split_r.range_next with
       | None -> ()
       | Some rr -> rr.range_prev <- Some split_r);

      (* "left" half *)
      r.range_end <- cut_pos;
      r.range_next <- None;

      if r.range_nuploading <> 0 then
        lprintf_nl "WARNING: Splitting a range currently being uploaded, don't know what to do with range_nuploading :/";

      split_r
    end
  in
  let cut_ranges = iter r in
  own_ranges b cut_ranges;
  cut_ranges
(** (internal) offset of the end of the [i]th block of swarmer [s]:
    the start of block [i+1], or [s.s_size] for the last block. *)
let compute_block_end s i =
  let positions = s.s_block_pos in
  let next = i + 1 in
  if next = Array.length positions then s.s_size
  else positions.(next)
(** (internal) offset of the beginning of the [i]th block of swarmer
    [s], as stored in [s.s_block_pos]. *)
let compute_block_begin s i =
  s.s_block_pos.(i)
(** Finds the number of the block containing [chunk_pos] offset, using
    dichotomy. Blocks are half opened [block_begin, block_end[

    Requires 0 <= chunk_pos < s.s_size (checked with [assert]). *)
let compute_block_num s chunk_pos =
  assert (0L <= chunk_pos && chunk_pos < s.s_size);
  let b = s.s_block_pos in
  (* invariants:
     0 <= min <= max <= Array.length b - 1
     compute_block_begin s min <= chunk_pos < compute_block_end s max *)
  let rec iter min max =
    if min = max then min
    else begin
      (* from now on, min < max *)
      let medium = (min + max) / 2 in
      (* Euclide => 2*medium <= min + max <= 2*medium + 1 *)
      (* min < max => 2*min < min + max < 2*max
                   => min <= medium < max *)
      if min < medium then begin
        if chunk_pos < b.(medium) then
          iter min (medium - 1)
        else
          iter medium max
      end else begin
        (* min = medium < max => 2*min < min + max <= 2*min + 1
                             <=> min < max <= min + 1
                             <=> min + 1 = max *)
        if chunk_pos < b.(max) then min else max
      end
    end
  in
  let i = iter 0 (Array.length b - 1) in
  if debug_all then
    lprintf_nl "%Ld is block %d [%Ld-%Ld]" chunk_pos i
      (compute_block_begin s i) (compute_block_end s i);
  i
(** Return true if ranges fully "cover" their block
    ("the block is made of holes"): the first range starts at
    [b.block_begin], each range starts where the previous one ends,
    and the last one ends at [b.block_end]. *)
let block_is_empty b =
  let rec iter begin_pos r =
    r.range_begin = begin_pos &&
    (match r.range_next with
     | Some rr -> iter r.range_end rr
     | None -> r.range_end = b.block_end)
  in
  iter b.block_begin b.block_ranges
(** Return true if the block has a single range and that range is
    0-sized (no hole left in the block). *)
let block_is_full b =
  let first = b.block_ranges in
  match first.range_next with
  | Some _ -> false
  | None -> first.range_begin = first.range_end
(** iter function [f] over all the blocks overlapped by the list of
    [intervals]; interval bounds are first clamped to [s.s_size], and
    empty intervals are skipped.

    [f] will receive block number, block beginning and ending offsets,
    and overlapping interval beginning and ending offsets.

    If an interval starts halfway of a block, iteration starts on that
    block, with block_begin < interval_begin indicating where the
    interval really started. For the following blocks the interval
    beginning passed to [f] is the block beginning.

    If an interval ends halfway of a block, iteration ends on that
    block, with interval_end < block_end indicating where the interval
    really ended.
*)
let iter_intervals s f intervals =
  let nchunks = Array.length s.s_blocks in
  List.iter (fun (interval_begin, interval_end) ->
    let interval_begin = min interval_begin s.s_size in
    let interval_end = min interval_end s.s_size in
    (* lprintf "apply on %Ld-%Ld\n" interval_begin interval_end; *)
    if interval_begin < interval_end then begin
      let i0 = compute_block_num s interval_begin in
      let block_begin = compute_block_begin s i0 in
      let rec iter_blocks i block_begin interval_begin =
        (* lprintf "iter_blocks %d %Ld %Ld\n" i block_begin interval_begin; *)
        if i < nchunks && block_begin < interval_end then begin
          let block_end = compute_block_end s i in
          let current_end = min block_end interval_end in

          if debug_all then
            lprintf_nl "Apply: %d %Ld-%Ld %Ld-%Ld"
              i block_begin block_end interval_begin current_end;

          f i block_begin block_end interval_begin current_end;
          iter_blocks (i+1) block_end block_end
        end
      in
      iter_blocks i0 block_begin interval_begin
    end
  ) intervals
571 (*************************************************************************)
572 (* *)
573 (* Swarmers *)
574 (* *)
575 (*************************************************************************)
(** Placeholder swarmer with empty arrays and zero size. Used as the
    base of a [{ dummy_swarmer with s_filename = ... }] key when looking
    a swarmer up by file name in [create_swarmer]. *)
let dummy_swarmer = {
    s_num = 0;
    s_filename = "";
    s_size = zero;
    s_disk_allocation_block_size = zero;
    s_networks = [];
    s_strategy = AdvancedStrategy;
    s_verified_bitmap = VB.create 0 VB.State_missing;
    s_priorities_bitmap = "";
    s_priorities_intervals = [(zero, 1)];
    s_disk_allocated = Bitv.create 0 false;
    s_blocks = [||];
    s_block_pos = [||];
    s_availability = [||];
    s_nuploading = [||];
  }
(** [insert_prio_interval intervals last_end (new_start, new_end, new_prio)]
    returns [intervals] updated so that bytes in [new_start, new_end[
    have priority [new_prio].

    [intervals] is a list of [(start_offset, priority)] pairs sorted by
    increasing offset; each entry applies up to the next entry's offset
    (the last one up to [last_end]), and consecutive entries have
    different priorities. Entries swallowed by the new interval are
    dropped, and no entry is emitted where the new priority equals the
    neighbouring one.

    Fails an assertion unless [new_start < new_end <= last_end], or if
    two consecutive entries of [intervals] share a priority at a
    boundary of the new interval. *)
let insert_prio_interval intervals last_end (new_start,new_end,new_prio) =
  assert (new_start < new_end);
  assert (new_end <= last_end);
  (* Only the sign of [Int64.compare] is specified, so test the sign
     rather than matching the literal values -1 / 0 / 1. *)
  (* [insert_end prev_prio l]: skip entries starting before [new_end],
     then restore priority [prev_prio] (the one in force just before
     [new_end]) from [new_end] on, if it differs from [new_prio]. *)
  let rec insert_end prev_prio = function
    | [] ->
        if new_end = last_end then []
        else if new_prio <> prev_prio then [new_end, prev_prio]
        else []
    | (pos, prio as this) :: tail ->
        let c = Int64.compare pos new_end in
        if c < 0 then
          insert_end prio tail (* eat it *)
        else if c > 0 then begin
          if new_prio <> prev_prio then
            (new_end, prev_prio) :: this :: tail
          else
            this :: tail
        end else begin
          assert (prio <> prev_prio);
          if new_prio <> prio then this :: tail else tail
        end
  in
  let rec insert prev_prio = function (* not tail rec ! *)
    | [] ->
        if new_prio <> prev_prio then
          (new_start, new_prio) :: insert_end prev_prio []
        else
          insert_end prev_prio []
    | (pos, prio as this) :: tail ->
        let c = Int64.compare pos new_start in
        if c < 0 then
          (* leave current and continue searching *)
          this :: insert prio tail
        else if c > 0 then begin
          if new_prio <> prev_prio then
            (* mark new interval and search interval end *)
            (new_start, new_prio) :: insert_end prev_prio (this::tail)
          else
            (* start of new interval gets merged with previous interval *)
            insert_end prev_prio (this::tail)
        end else begin
          assert (prio <> prev_prio); (* invariant *)
          if new_prio <> prev_prio then
            (new_start, new_prio) :: insert_end prio tail
          else
            insert_end prio tail
        end
  in
  insert (-1) intervals
(** Check a priority interval list: offsets strictly increasing,
    consecutive priorities different, last offset strictly below
    [limit]. The empty list is rejected. *)
let rec validate_intervals limit intervals =
  match intervals with
  | [] -> false
  | [ (p, _) ] -> p < limit
  | (p1, prio1) :: ((p2, prio2) :: _ as rest) ->
      prio1 <> prio2 && p1 < p2 && validate_intervals limit rest
(** Character stored in a priorities bitmap for priority 0. *)
let priority_zero = '\000'
(** Rebuild [s.s_priorities_bitmap] from [s.s_priorities_intervals].

    The bitmap is first reset to [priority_zero]; then, for every
    interval with a non-zero priority, every block from the one
    containing the interval's first byte to the one containing its last
    byte is set to that priority, clamped to 0..255. Intervals are
    applied in list order, so a later non-zero interval overwrites a
    block it shares with an earlier one. An interval that is empty or
    not within [0, s.s_size], and an empty interval list, only produce
    a warning in the log. *)
let swarmer_recompute_priorities_bitmap s =
  String.fill s.s_priorities_bitmap 0
    (String.length s.s_priorities_bitmap) priority_zero;
  let mark interval_begin interval_end priority =
    if interval_end > interval_begin && s.s_size >= interval_end && interval_begin >= 0L then begin
      (* zero priority: do not mark - zero blocks will not overwrite
         boundaries of non-zero blocks *)
      if priority <> 0 then begin
        let i_begin = compute_block_num s interval_begin in
        let i_end = compute_block_num s (Int64.pred interval_end) in
        let priochar = Char.chr (max 0 (min priority 255)) in
        (* String.fill s.s_priorities_bitmap i_begin (i_end - i_begin + 1) priochar *)
        for i = i_begin to i_end do
          s.s_priorities_bitmap.[i] <- priochar
        done
      end
    end else
      lprintf_nl "WARNING: recompute_priorities %Ld %Ld %Ld" interval_begin interval_end s.s_size
  in
  (* each interval runs up to the start of the next one, the last one
     up to the end of the file *)
  let rec loop = function
    | (i_begin, priority) :: ((i_end,_) :: _ as tail) -> mark i_begin i_end priority; loop tail
    | [i_begin, priority] -> mark i_begin s.s_size priority
    | [] -> lprintf_nl "WARNING: recompute_priorities []"
  in
  loop s.s_priorities_intervals
(* Intervals with fixed byte positions are needed to recompute the
   priorities bitmap after a merge, because the bitmap depends on block
   size *)
(** Give priority [prio] to bytes [p1, p2[ of swarmer [s]: update
    [s.s_priorities_intervals], then rebuild the priorities bitmap. *)
let swarmer_set_interval s (p1,p2,prio as new_interval) =
  if !verbose then
    lprintf_nl "swarmer_set_interval %S %Ld (%Ld,%Ld,%u)" s.s_filename s.s_size p1 p2 prio;
  let updated =
    insert_prio_interval s.s_priorities_intervals s.s_size new_interval in
  s.s_priorities_intervals <- updated;
  swarmer_recompute_priorities_bitmap s
(** if a swarmer is already associated with that [file_name], return it;
    Otherwise create a new one with default values (a single empty
    block), that will be fixed by the first frontend association.
    The new swarmer is registered in [swarmers_by_name]. *)
let create_swarmer file_name file_size =
  try
    (* lookup only depends on [s_filename], see module [HS] *)
    HS.find swarmers_by_name
      { dummy_swarmer with
        s_filename = file_name
      }
  with Not_found ->
    incr swarmer_counter;

    let nblocks = 1 in
    (* to avoid extreme disk fragmentation, space is pre-allocated in
       blocks of this size *)
    let disk_allocation_block_size =
      min (megabytes 10)
        (round_up64 (max 1L (file_size // 200L)) (megabytes 1)) in
    let ndiskblocks =
      1 + Int64.to_int (Int64.pred file_size // disk_allocation_block_size) in
    let s = {

      s_num = !swarmer_counter;
      s_filename = file_name;
      s_size = file_size;
      s_disk_allocation_block_size = disk_allocation_block_size;

      s_networks = [];

      s_strategy = AdvancedStrategy;

      s_verified_bitmap = VB.create nblocks VB.State_missing;
      s_priorities_bitmap = String.make nblocks priority_zero;
      s_priorities_intervals = [(zero, 1)]; (* JAVE init all prios to 1, thus all chunks will be downloaded as usual *)
      s_disk_allocated = Bitv.create ndiskblocks false;
      s_blocks = Array.make nblocks EmptyBlock ;
      s_block_pos = Array.make nblocks zero;
      s_availability = Array.make nblocks 0;
      s_nuploading = Array.make nblocks 0;
      (* s_last_seen = Array.make nblocks 0; *)
    }
    in
    swarmer_recompute_priorities_bitmap s;
    HS.add swarmers_by_name s;
    s
(** Split swarmer existing blocks at [chunk_size] boundaries.

    After the call every block of [s] lies within a single chunk of
    size [chunk_size]. [s_blocks], [s_verified_bitmap], [s_block_pos]
    and [s_priorities_bitmap] are rebuilt; [s_availability] and
    [s_nuploading] are reset to 0 (a warning is logged if they held
    non-zero counters). *)
let split_blocks s chunk_size =

  let size = s.s_size in

  let nblocks = Array.length s.s_blocks in
  (* Split existing blocks at [chunk_size] boundaries
     invariants:
     [index_s] is the index of the existing block being analysed
     [chunk_begin] is the offset of the beginning of the current containing chunk
     [new_blocks] is the list of new blocks already split, in
     reverse order.
     List contains tuples: block, beginning offset, verified status *)
  let rec iter index_s chunk_begin new_blocks =
    (* lprintf "iter (1) %d/%d %Ld\n" index_s nblocks chunk_begin; *)
    if index_s = nblocks then List.rev new_blocks else

    (* existing block *)
    let block_begin = compute_block_begin s index_s in
    let block_end = compute_block_end s index_s in

    (* current chunk *)
    let chunk_end = chunk_begin ++ chunk_size in
    let chunk_end = min chunk_end size in

    if chunk_end > block_end then begin
      (* block ends inside the chunk: keep it, stay in the same chunk *)
      let new_blocks = (
        s.s_blocks.(index_s),
        block_begin,
        VB.get s.s_verified_bitmap index_s
      ) :: new_blocks in
      iter (index_s+1) chunk_begin new_blocks
    end

    else if chunk_end = block_end then begin
      (* block and chunk end together: keep it, move to next chunk *)
      let new_blocks = (
        s.s_blocks.(index_s),
        block_begin,
        VB.get s.s_verified_bitmap index_s
      ) :: new_blocks in
      iter (index_s+1) chunk_end new_blocks
    end

    else begin
      (* chunk_end < block_end
         We need to split this block in two parts *)
      (* the existing slot [index_s] now describes the second part *)
      s.s_block_pos.(index_s) <- chunk_end;
      match s.s_blocks.(index_s) with
      | EmptyBlock | CompleteBlock | VerifiedBlock ->

          (* s.s_blocks.(index_s) will appear twice in the result list *)
          let new_blocks = (
            s.s_blocks.(index_s),
            block_begin,
            VB.get s.s_verified_bitmap index_s
          ) :: new_blocks in
          iter index_s chunk_end new_blocks

      | PartialBlock b1 ->
          (* split b1 in two; b2 is the part after [chunk_end] offset *)
          let b2 = {
            block_s = s;

            block_begin = chunk_end;
            block_end = b1.block_end;
            block_ranges = b1.block_ranges; (* fixed below *)
            block_num = 0; (* fixed below *)
            block_remaining = zero; (* fixed below *)
            block_unselected_remaining = zero; (* fixed below *)
          } in
          b2.block_ranges <- cut_ranges_after b2 b1.block_ranges chunk_end;
          b1.block_end <- chunk_end;

          let new_blocks =
            (if block_is_full b1 then
               (* lprintf "Partial block b1 should become CompleteBlock\n"; *)
               (
                 CompleteBlock,
                 block_begin,
                 VB.State_complete
               ) else if block_is_empty b1 then
               (* lprintf "Partial block b1 should become EmptyBlock\n"; *)
               (
                 EmptyBlock,
                 block_begin,
                 VB.State_missing
               ) else (
                 PartialBlock b1,
                 block_begin,
                 VB.get s.s_verified_bitmap index_s
               ))
            :: new_blocks in

          if block_is_full b2 then begin
            (* lprintf "Partial block b2 should become CompleteBlock\n"; *)
            s.s_blocks.(index_s) <- CompleteBlock;
            VB.set s.s_verified_bitmap index_s VB.State_complete
          end
          else if block_is_empty b2 then begin
            (* lprintf "Partial block b2 should become EmptyBlock\n"; *)
            s.s_blocks.(index_s) <- EmptyBlock;
            VB.set s.s_verified_bitmap index_s VB.State_missing;
          end
          else
            s.s_blocks.(index_s) <- PartialBlock b2;
          iter index_s chunk_end new_blocks
    end
  in
  let blocks = iter 0 zero [] in

  (* blocks have been split, now rebuild swarmer *)
  let nblocks = List.length blocks in
  (* lprintf "%d blocks to generate\n" nblocks; *)

  if Array2.exists ((<>) 0) s.s_availability then
    lprintf_nl "WARNING: splitting swarmer discarded availability counters";
  if Array2.exists ((<>) 0) s.s_nuploading then
    lprintf_nl "WARNING: splitting a swarmer beging uploaded to";

  s.s_blocks <- Array.make nblocks EmptyBlock;
  s.s_verified_bitmap <- VB.create nblocks VB.State_missing;
  s.s_priorities_bitmap <- String.make nblocks priority_zero;
  s.s_block_pos <- Array.make nblocks zero;
  s.s_availability <- Array.make nblocks 0; (* not preserved ? *)
  s.s_nuploading <- Array.make nblocks 0; (* not preserved ? *)
  (* s.s_last_seen <- Array.make nblocks 0; *)

  let rec iter i list =
    match list with
    | [] -> ()
    | (b, pos, c) :: tail ->
        begin
          match b with
          | PartialBlock pb ->
              pb.block_num <- i;
              (* Block invariant: the data missing from a block is the
                 sum of the sizes of its ranges (ranges are holes).
                 The previous code subtracted the range sizes from a
                 block size read out of [s_block_pos], which has just
                 been reset to zeroes, so it did not yield the number
                 of missing bytes. *)
              let remaining, unselected_remaining =
                block_ranges_fold (fun (racc, uracc) r ->
                  let range_size = compute_range_size r in
                  (racc ++ range_size,
                   if r.range_nuploading = 0 then
                     uracc ++ range_size else uracc)
                ) (zero, zero) pb in
              pb.block_remaining <- remaining;
              pb.block_unselected_remaining <- unselected_remaining
          | EmptyBlock | CompleteBlock | VerifiedBlock -> ()
        end;
        s.s_blocks.(i) <- b;
        VB.set s.s_verified_bitmap i c;
        s.s_block_pos.(i) <- pos;

        iter (i+1) tail
  in
  iter 0 blocks;
  swarmer_recompute_priorities_bitmap s
(** Associate a(n additional) frontend to a swarmer.

    Does nothing if [t] is already in [s.s_networks]. Otherwise resets
    the frontend's bitmap, last-seen times and mappings, registers it
    in [s.s_networks] (primary first), splits the swarmer blocks at
    [t.t_chunk_size] boundaries, recomputes the chunk/block mappings of
    every frontend of [s], and makes the frontends share the primary's
    file descriptor.

    Calls [exit 2] if the size of [t.t_file] differs from [s.s_size];
    fails an assertion if [is_primary] and [s] already has frontends,
    or if not [is_primary] and [s] has none. *)
let associate is_primary t s =
  (* a swarmer cannot be associated more than once to a network *)
  if not (List.memq t s.s_networks) then begin
    let size = file_size t.t_file in

    (* what about raising an exception instead ? *)
    if s.s_size <> size then begin
      lprintf_nl "file_size for %s does not match: swarmer %Ld / real %Ld" s.s_filename s.s_size size;
      exit 2
    end;

    t.t_s <- s;
    t.t_converted_verified_bitmap <- VB.create t.t_nchunks VB.State_missing;
    t.t_last_seen <- Array.make t.t_nchunks 0;
    t.t_chunk_of_block <- [||];
    t.t_blocks_of_chunk <- Array.make t.t_nchunks [];

    (* invariant: primary frontend is at the head of swarmer's
       [s_networks], and is the first associated with the swarmer *)
    if is_primary then begin
      t.t_primary <- true;
      assert(s.s_networks = []);
      s.s_networks <- [t]
      (* was s.s_networks <- t :: s.s_networks; *)
    end else begin
      match s.s_networks with
      | tprim :: _ ->
          assert(tprim.t_primary);
          if file_disk_name t.t_file <> file_disk_name tprim.t_file then
            (* TODO: transfer data into swarmer instead of discarding it *)
            Unix32.remove (file_fd t.t_file);
          t.t_primary <- false;
          s.s_networks <- s.s_networks @ [t]
      | [] -> assert false
    end;

    (match s.s_networks with
     | prim :: others ->
         assert(prim.t_primary);
         List.iter (fun tt -> assert(not tt.t_primary)) others
     | [] -> assert false);

    (* at this point, we are supposed to split the blocks in the swarmer
       in smaller blocks depending on the block_size of this network, and compute
       the t_chunk_of_block and t_blocks_of_chunk fields. *)

    let chunk_size = t.t_chunk_size in
    split_blocks s chunk_size;

    let nblocks = Array.length s.s_blocks in
    (* For all networks, adjust the chunks and mappings *)
    List.iter (fun t ->
      let nchunks = VB.length t.t_converted_verified_bitmap in
      t.t_chunk_of_block <- Array.make nblocks 0;
      t.t_blocks_of_chunk <- Array.make nchunks [];

      let chunk_size = t.t_chunk_size in
      for i = 0 to nblocks - 1 do
        let block_begin = compute_block_begin s i in
        let chunk = Int64.to_int (block_begin // chunk_size) in
        t.t_chunk_of_block.(i) <- chunk;
        (* blocks are consed in increasing order, so each chunk's list
           holds its block numbers in decreasing order *)
        t.t_blocks_of_chunk.(chunk) <- i :: t.t_blocks_of_chunk.(chunk)
      done
    ) s.s_networks;

    (* TODO: If not primary, set_file_downloaded should be called *)
    if not is_primary then
      add_file_downloaded t.t_file (zero -- file_downloaded t.t_file);

    (* check that all frontends use the primary's file backend *)
    (match s.s_networks with
     | prim :: others when is_primary ->
         List.iter (fun tt ->
           set_file_fd tt.t_file (file_fd prim.t_file)
         ) others
     | tprim :: _ ->
         set_file_fd t.t_file (file_fd tprim.t_file)
     | [] -> assert false)
  end
(** Create a primary frontend for [file], using chunks of [chunk_size]
    bytes, register it in [frontends_by_num], associate it (as primary)
    with swarmer [ss], and return it. *)
let create ss file chunk_size =

  incr frontend_counter;
  let size = file_size file in
  (* wrong if size is a multiple of chunk_size, or on purpose ? *)
  let nchunks =
    1 + Int64.to_int (Int64.pred size // chunk_size) in

  let t = {
    t_num = !frontend_counter;
    t_s = ss;
    t_primary = true;
    t_file = file;

    t_nchunks = nchunks;
    t_chunk_size = chunk_size;

    t_ncomplete_chunks = 0;
    t_nverified_chunks = 0;

    t_converted_verified_bitmap = VB.create nchunks VB.State_missing;
    t_last_seen = Array.make nchunks 0;

    t_verifier = NoVerification;
    t_verified = (fun _ _ -> ());

    t_chunk_of_block = [||];
    t_blocks_of_chunk = Array.make nchunks [];
  }
  in
  HT.add frontends_by_num t;
  associate true t ss;
  t
(* Preallocation (see [preallocate_disk_space]) copies each disk block
   over itself; safer than overwriting it with zeroes *)

(** Call [f swarmer fd disk_block] for every disk allocation block
    overlapping bytes [interval_begin, interval_end[ of [t]'s file, in
    increasing order. Does nothing on an empty interval. Both
    [interval_begin] and the last byte of the interval must lie within
    [0, s_size[ (checked with [assert]). *)
let iter_disk_space f t interval_begin interval_end =
  if interval_begin < interval_end then begin
    let s = t.t_s in
    (* 0 <= pos < s.s_size *)
    let disk_block_of pos =
      assert (0L <= pos && pos < s.s_size);
      Int64.to_int (pos // s.s_disk_allocation_block_size) in
    let fd = file_fd t.t_file in
    let first = disk_block_of interval_begin in
    let last = disk_block_of (Int64.pred interval_end) in
    for n = first to last do
      f s fd n
    done
  end
(** Raised internally by [is_fully_preallocated] to leave the iteration
    as soon as a non-allocated disk block is met. *)
exception Not_preallocated_block_found

(** [is_fully_preallocated t interval_begin interval_end] is [true] iff
    every disk allocation block intersecting
    [interval_begin, interval_end[ is flagged in [s_disk_allocated]. *)
let is_fully_preallocated t interval_begin interval_end =
  try
    iter_disk_space (fun s _fd disk_block ->
      if not (Bitv.get s.s_disk_allocated disk_block) then
        raise Not_preallocated_block_found
    ) t interval_begin interval_end;
    true
  with Not_preallocated_block_found -> false
(** Make sure every disk allocation block intersecting
    [interval_begin, interval_end[ is allocated on disk, then flag it
    in [s_disk_allocated].

    A block that is not flagged yet is allocated by copying it over
    itself ([Unix32.copy_chunk] with identical source and destination)
    rather than by overwriting it with zeroes. *)
let preallocate_disk_space t interval_begin interval_end =
  let allocate s fd disk_block =
    if not (Bitv.get s.s_disk_allocated disk_block) then begin
      let block_size = s.s_disk_allocation_block_size in
      let pos = Int64.of_int disk_block ** block_size in
      (* the last block of the file may be shorter than the others *)
      let size = min (file_size t.t_file -- pos) block_size in
      if !verbose then
        lprintf_nl "preallocating %Ld bytes for %s" size (file_best_name t.t_file);
      Unix32.copy_chunk fd fd pos pos (Int64.to_int size)
    end;
    Bitv.set s.s_disk_allocated disk_block true
  in
  iter_disk_space allocate t interval_begin interval_end
(** Flag every disk allocation block intersecting
    [interval_begin, interval_end[ as allocated in [s_disk_allocated],
    without touching the file itself. *)
let mark_disk_space_preallocated t interval_begin interval_end =
  let mark s _fd disk_block =
    Bitv.set s.s_disk_allocated disk_block true
  in
  iter_disk_space mark t interval_begin interval_end
(** [check_finished t] is [true] iff [t]'s file is in state
    [FileDownloading] and every chunk of [t] is [VB.State_verified].
    Files in any other state are never reported as finished.

    A warning is logged when the file is fully verified but its
    downloaded byte count differs from its size. *)
let check_finished t =
  let file = t.t_file in
  match file_state file with
  | FileNew
  | FileCancelled
  | FileAborted _
  | FileShared
  | FileDownloaded
  | FileQueued
  | FilePaused ->
      false
  | FileDownloading ->
      if VB.existsi (fun _ c -> c <> VB.State_verified)
          t.t_converted_verified_bitmap then false
      else begin
        if file_size file <> file_downloaded t.t_file then
          lprintf_nl "Downloaded size differs after complete verification";
        true
      end
(** (debug) Dump the swarmer [s] to the current log, under a header
    mentioning [str].

    For every block: its bounds, size and verification state, the
    chunk containing it in each frontend, and its remaining ranges if
    it is partial. Then, for every frontend: file number,
    primary/secondary role, downloaded amount and chunk bitmap. *)

let print_s str s =
  lprintf_nl "Ranges after %s:" str;

  (* print the chain of ranges starting at [r], then a newline *)
  let rec print_ranges r =
    lprintf_n " %Ld-%Ld(%d)" r.range_begin r.range_end r.range_nuploading;
    match r.range_next with
    | Some next -> print_ranges next
    | None -> lprint_newline ()
  in

  (* print the chunk of frontend [t] that contains block [i] *)
  let print_chunk_of i t =
    let j = t.t_chunk_of_block.(i) in
    let chunk_state = VB.get t.t_converted_verified_bitmap j in
    lprintf "(b %d %c [" j (VB.state_to_char chunk_state);
    List.iter (fun ii -> lprintf "%d " ii) t.t_blocks_of_chunk.(j);
    lprintf "]"
  in

  let print_one_block i blk =
    lprintf_n " %d: " i;
    let block_begin = compute_block_begin s i in
    let block_end = compute_block_end s i in
    let block_state = VB.get s.s_verified_bitmap i in
    lprintf "%Ld - %Ld [%Ld] %c " block_begin block_end
      (block_end -- block_begin)
      (VB.state_to_char block_state);
    List.iter (print_chunk_of i) s.s_networks;
    match blk with
    | EmptyBlock -> lprintf_nl "_"
    | CompleteBlock -> lprintf_nl "C"
    | VerifiedBlock -> lprintf_nl "V"
    | PartialBlock b ->
        lprintf " [%Ld .. %Ld] --> " b.block_begin b.block_end;
        print_ranges b.block_ranges
  in
  Array.iteri print_one_block s.s_blocks;

  let print_frontend t =
    lprintf_nl " File num: %d" (file_num t.t_file);
    lprintf_nl " %s" (if t.t_primary then "primary" else "secondary");
    lprintf_nl " Downloaded: %Ld" (file_downloaded t.t_file);
    lprintf_nl " Bitmap: %s" (VB.to_string t.t_converted_verified_bitmap)
  in
  lprintf_nl "Files:";
  List.iter print_frontend s.s_networks
(** [iter_block_ranges f b] applies [f] to every range of block [b],
    following the [range_next] links from [b.block_ranges].

    The successor of a range is read before [f] is applied to it, so
    [f] may freely unlink or otherwise mutate the range it receives. *)

let iter_block_ranges f b =
  let rec walk = function
    | None -> ()
    | Some r ->
        (* remember the successor now: [f] may overwrite [range_next] *)
        let next = r.range_next in
        f r;
        walk next
  in
  walk (Some b.block_ranges)
(** (debug) Dump block [b] to the current log: its number and bounds,
    followed by the bounds of each of its ranges. *)

let print_block b =
  lprintf_n "Block %d: %Ld-%Ld" b.block_num b.block_begin b.block_end;
  lprint_newline ();
  lprintf_nl " ranges:";
  let print_range r =
    lprintf_nl " %Ld-%Ld" r.range_begin r.range_end
  in
  iter_block_ranges print_range b;
  lprint_newline ()
(** (shadows CommonFile.add_file_downloaded)

    [add_file_downloaded maybe_t s size] adds [size] (which may be
    negative) to the downloaded amount of the primary frontend of
    swarmer [s], and also to that of [tt] when [maybe_t] is [Some tt]
    and [tt] is not the primary itself.

    The head of [s.s_networks] must be the primary frontend
    (asserted). An error is logged if the primary's downloaded amount
    ends up negative. *)

let add_file_downloaded maybe_t s size =
  match s.s_networks with
  | [] -> assert false
  | tprim :: _ ->
      assert (tprim.t_primary);
      (* this definition is not recursive: the calls below reach the
         per-file function that this one shadows *)
      add_file_downloaded tprim.t_file size;
      begin match maybe_t with
      | Some tt when tt.t_num <> tprim.t_num ->
          add_file_downloaded tt.t_file size
      | Some _ | None -> ()
      end;
      if file_downloaded tprim.t_file < zero then
        lprintf_nl "ERROR: file_downloaded < zero!"
(** Close every range of block [b]: the remaining size of each range
    is credited as downloaded (see [add_file_downloaded maybe_t s]) and
    removed from the block's counters, then the range is emptied and
    unlinked from its neighbours.

    Warnings are logged if the block's counters are not back to zero
    afterwards. *)

let close_block_ranges maybe_t s b =
  let close_range r =
    let size = compute_range_size r in
    add_file_downloaded maybe_t s size;
    b.block_remaining <- b.block_remaining -- size;
    (* only ranges nobody is uploading count as "unselected" *)
    if r.range_nuploading = 0 then
      b.block_unselected_remaining <- b.block_unselected_remaining -- size;
    r.range_begin <- r.range_end;
    r.range_prev <- None;
    r.range_next <- None
  in
  iter_block_ranges close_range b;
  if b.block_remaining <> 0L then
    lprintf_nl "WARNING: block_remaining should be 0 after close_block_ranges";
  if b.block_unselected_remaining <> 0L then
    lprintf_nl "WARNING: block_unselected_remaining should be 0 after close_block_ranges"
1172 (*************************************************************************)
1173 (* *)
1174 (* swarmers verified bitmaps *)
1175 (* *)
1176 (*************************************************************************)
(* For every swarmer, there is a "primary" verifier, and secondary
   verifiers. When a block is downloaded, it is tagged State_complete
   in the verified_bitmap, and this State_complete is propagated to
   the primary bitmap if possible (if all sub-blocks are also
   State_complete).

   If the primary chunk becomes State_complete, then a verification is
   needed on the primary. If the verification succeeds, the
   verified_bitmap becomes State_verified, and the secondary verifiers
   are tagged with State_complete (if they use a different
   verification scheme) or State_verified (if no verification scheme
   or a verification scheme that has already been used).
*)
(* Corruption has been detected: reset block [i] of swarmer [s] to
   State_missing.

   Only complete or verified blocks are reset. In every frontend, the
   chunk containing the block falls back from State_partial to
   State_missing once all of its blocks are missing. Unexpected
   states are only logged. *)

let set_swarmer_state_missing s i =
  match VB.get s.s_verified_bitmap i with
  | VB.State_missing -> ()
  | VB.State_partial ->
      lprintf_nl "set_swarmer_state_missing: invalidating a partial block ?"
  | VB.State_complete | VB.State_verified ->
      VB.set s.s_verified_bitmap i VB.State_missing;
      let block_is_missing k =
        VB.get s.s_verified_bitmap k = VB.State_missing
      in
      let downgrade_chunk t =
        let j = t.t_chunk_of_block.(i) in
        match VB.get t.t_converted_verified_bitmap j with
        | VB.State_missing -> ()
        | VB.State_partial ->
            if List.for_all block_is_missing t.t_blocks_of_chunk.(j) then
              VB.set t.t_converted_verified_bitmap j VB.State_missing
        | VB.State_complete ->
            lprintf_nl "set_swarmer_state_missing: invalidating a block within a completed chunk?"
        | VB.State_verified ->
            lprintf_nl "set_swarmer_state_missing: invalidating a block within a verified chunk?"
      in
      List.iter downgrade_chunk s.s_networks
(* Download of block [i] of swarmer [s] has started: move it from
   State_missing to State_partial, and do the same for the chunk that
   contains it in every frontend.

   Nothing is ever demoted: complete or verified blocks/chunks are
   left alone and a message is logged instead. *)
let set_swarmer_state_partial s i =
  match VB.get s.s_verified_bitmap i with
  | VB.State_partial -> ()
  | VB.State_complete ->
      lprintf_nl "set_swarmer_state_partial: trying to demote a completed block?"
  | VB.State_verified ->
      lprintf_nl "set_swarmer_state_partial: trying to demote a verified block?"
  | VB.State_missing ->
      VB.set s.s_verified_bitmap i VB.State_partial;
      let start_chunk t =
        let j = t.t_chunk_of_block.(i) in
        match VB.get t.t_converted_verified_bitmap j with
        | VB.State_partial -> ()
        | VB.State_missing ->
            VB.set t.t_converted_verified_bitmap j VB.State_partial
        | VB.State_complete ->
            lprintf_nl "set_swarmer_state_partial: partial block within a completed chunk?"
        | VB.State_verified ->
            lprintf_nl "set_swarmer_state_partial: partial block within a verified chunk?"
      in
      List.iter start_chunk s.s_networks
(* Block [i] of swarmer [s] is finished: tag it State_complete and try
   to escalate to the primary frontend's chunk bitmap.

   - From State_missing/State_partial: the primary chunk containing
     the block becomes State_complete (and [t_ncomplete_chunks] is
     incremented) once all of its blocks are State_complete.
   - From State_verified: the block is demoted to State_complete; a
     verified primary chunk is demoted as well and
     [t_nverified_chunks] is decremented.
   - From State_complete: nothing to do.

   The head of [s.s_networks] must be the primary frontend
   (asserted). *)
let set_swarmer_state_complete s i =
  match VB.get s.s_verified_bitmap i with
  | VB.State_missing | VB.State_partial ->
      (VB.set s.s_verified_bitmap i VB.State_complete;
       match s.s_networks with
       | t :: _ ->
           assert (t.t_primary);
           let j = t.t_chunk_of_block.(i) in
           (match VB.get t.t_converted_verified_bitmap j with
            | VB.State_missing | VB.State_partial ->
                if List.for_all (fun i -> VB.get s.s_verified_bitmap i = VB.State_complete)
                    t.t_blocks_of_chunk.(j) then begin
                  t.t_ncomplete_chunks <- t.t_ncomplete_chunks + 1;
                  VB.set t.t_converted_verified_bitmap j VB.State_complete
                end
            | VB.State_complete -> ()
            | VB.State_verified ->
                (* lprintf_nl "set_swarmer_state_complete: trying to demote a verified block? (1)" *)
                ())
       | [] -> assert false)
  | VB.State_complete -> ()
  | VB.State_verified ->
      (* lprintf_nl "set_swarmer_state_complete: trying to demote a verified block? (2)" *)
      VB.set s.s_verified_bitmap i VB.State_complete;
      match s.s_networks with
      | t :: _ ->
          assert (t.t_primary);
          let j = t.t_chunk_of_block.(i) in
          (match VB.get t.t_converted_verified_bitmap j with
           | VB.State_verified ->
               VB.set t.t_converted_verified_bitmap j VB.State_complete;
               t.t_nverified_chunks <- t.t_nverified_chunks - 1
           | VB.State_complete -> ()
           | VB.State_missing | VB.State_partial ->
               lprintf_nl "BUG: set_swarmer_state_complete: demoting a verified block from an incomplete chunk")
      | [] -> assert false
(* The primary verifier has accepted block [i] of swarmer [s]: tag it
   State_verified, then let the secondary frontends catch up.

   Secondaries are only updated when the primary actually verifies
   data (Verification or ForceVerification). For each secondary whose
   chunk containing the block now has all its blocks verified:
   - without a verifier of its own, the chunk becomes State_verified;
   - with a verifier, the chunk becomes State_complete (presumably so
     that the secondary's own verification is triggered later).
   The chunk counters of the secondary are kept in sync. *)
let set_swarmer_state_verified s i =
  let block_is_verified k =
    VB.get s.s_verified_bitmap k = VB.State_verified
  in
  let update_secondary t =
    assert (not t.t_primary);
    let j = t.t_chunk_of_block.(i) in
    if List.for_all block_is_verified t.t_blocks_of_chunk.(j) then begin
      let chunk_state = VB.get t.t_converted_verified_bitmap j in
      match t.t_verifier with
      | NoVerification | VerificationNotAvailable ->
          (* no way to check data integrity for this network:
             trust the primary *)
          begin match chunk_state with
          | VB.State_verified -> ()
          | VB.State_complete ->
              VB.set t.t_converted_verified_bitmap j VB.State_verified;
              t.t_nverified_chunks <- t.t_nverified_chunks + 1
          | VB.State_missing | VB.State_partial ->
              VB.set t.t_converted_verified_bitmap j VB.State_verified;
              t.t_ncomplete_chunks <- t.t_ncomplete_chunks + 1;
              t.t_nverified_chunks <- t.t_nverified_chunks + 1
          end
      | ForceVerification | Verification _ ->
          (* only mark the chunk complete; is that code necessary
             at all ? *)
          begin match chunk_state with
          | VB.State_complete -> ()
          | VB.State_missing | VB.State_partial ->
              VB.set t.t_converted_verified_bitmap j VB.State_complete;
              t.t_ncomplete_chunks <- t.t_ncomplete_chunks + 1
          | VB.State_verified ->
              lprintf_nl "set_swarmer_state_verified: trying to demote a verified block in another frontend?"
          end
    end
  in
  match VB.get s.s_verified_bitmap i with
  | VB.State_verified -> ()
  | VB.State_missing | VB.State_partial | VB.State_complete ->
      VB.set s.s_verified_bitmap i VB.State_verified;
      match s.s_networks with
      | [] -> assert false
      | tprim :: secondaries ->
          assert (tprim.t_primary);
          (* that test is somewhat redundant, since only primary
             frontends with verification can have merged secondary
             frontends; See merge *)
          match tprim.t_verifier with
          | NoVerification | VerificationNotAvailable -> ()
          | Verification _ | ForceVerification ->
              let jprim = tprim.t_chunk_of_block.(i) in
              assert (VB.get tprim.t_converted_verified_bitmap jprim = VB.State_verified);
              List.iter update_secondary secondaries
(** Turn block [i] of swarmer [s] into a [CompleteBlock].

    Whatever was still missing is credited as downloaded (see
    [add_file_downloaded maybe_t s]): the remaining ranges of a partial
    block, or the whole size of an empty block. Blocks already
    complete or verified are left untouched. *)

let set_completed_block maybe_t s i =
  let finish () =
    set_swarmer_state_complete s i;
    s.s_blocks.(i) <- CompleteBlock
  in
  match s.s_blocks.(i) with
  | CompleteBlock | VerifiedBlock -> ()
  | PartialBlock b ->
      close_block_ranges maybe_t s b;
      finish ()
  | EmptyBlock ->
      let first_byte = compute_block_begin s i in
      let past_last_byte = compute_block_end s i in
      add_file_downloaded maybe_t s (past_last_byte -- first_byte);
      finish ()
(** Turn block [j] of swarmer [s] into a [VerifiedBlock]: the block is
    first completed with [set_completed_block None] (crediting any
    missing data as downloaded), then flagged verified in the swarmer
    bitmap. Already verified blocks are left untouched.

    (is it normal that no maybe_t can be provided ? my guess is that
    this function is always called on behalf of a primary frontend) *)

let set_verified_block s j =
  match s.s_blocks.(j) with
  | VerifiedBlock -> ()
  | EmptyBlock | PartialBlock _ | CompleteBlock ->
      set_completed_block None s j;
      s.s_blocks.(j) <- VerifiedBlock;
      set_swarmer_state_verified s j
1370 (*************************************************************************)
1371 (* *)
1372 (* frontends verified bitmaps *)
1373 (* *)
1374 (*************************************************************************)
1376 (* We've seen how swarmer verification propagates to the frontend(s)
1377 verifications, now let's see the reverse *)
(** Chunk [j] of frontend [t] failed verification.

    Preconditions (asserted): the chunk is [VB.State_complete] and
    none of its blocks is [VB.State_verified] in the swarmer.

    [t_ncomplete_chunks] is decremented. If all blocks of the chunk are
    [VB.State_complete] in the swarmer, the chunk becomes
    [VB.State_missing] and each block is invalidated; for a
    [CompleteBlock] its size is subtracted from the downloaded amount
    and the block is reset to [EmptyBlock]. Otherwise the chunk is
    only demoted to [VB.State_partial]. *)
let set_frontend_state_missing t j =
  assert(VB.get t.t_converted_verified_bitmap j = VB.State_complete);
  let s = t.t_s in
  assert(List.for_all (fun i -> VB.get s.s_verified_bitmap i <> VB.State_verified) t.t_blocks_of_chunk.(j));
  t.t_ncomplete_chunks <- t.t_ncomplete_chunks - 1;
  if List.for_all (fun i -> VB.get s.s_verified_bitmap i = VB.State_complete) t.t_blocks_of_chunk.(j) then begin
    if !verbose_swarming || !verbose then
      lprintf_nl "Complete block %d/%d of %s failed verification, reloading..."
        (j + 1) t.t_nchunks (file_best_name t.t_file);

    VB.set t.t_converted_verified_bitmap j VB.State_missing;
    List.iter (fun i ->
      match s.s_blocks.(i) with
      | EmptyBlock -> set_swarmer_state_missing s i
      | PartialBlock _ ->
          (* NOTE(review): the bitmap entry is State_complete here, so
             this call only logs a message -- confirm that is intended *)
          set_swarmer_state_partial s i
      | CompleteBlock ->
          let block_begin = compute_block_begin s i in
          let block_end = compute_block_end s i in
          (* negative: take the whole block back from the downloaded
             amount *)
          add_file_downloaded None s (block_begin -- block_end);

          s.s_blocks.(i) <- EmptyBlock;
          set_swarmer_state_missing s i

      | VerifiedBlock -> assert false
    ) t.t_blocks_of_chunk.(j)
  end
  else begin
    (* afaiu not supposed to happen, so this code is for debugging ? *)
    if !verbose_swarming then begin
      let nsub = ref 0 in
      lprintf_n " Swarmer was incomplete: ";
      List.iter (fun i ->
        lprintf "%c" (VB.state_to_char (VB.get s.s_verified_bitmap i));
        if VB.get s.s_verified_bitmap i = VB.State_complete then incr nsub
      ) t.t_blocks_of_chunk.(j);
      lprintf_nl " = %d/%d" !nsub (List.length t.t_blocks_of_chunk.(j))
    end;
    VB.set t.t_converted_verified_bitmap j VB.State_partial
  end
(* aka set_completed_chunk (internal)

   Mark chunk [j] of frontend [t] as complete by completing each of
   its blocks in the swarmer (the chunk bitmap itself is updated
   through [set_completed_block]). Chunks already complete or verified
   are left alone. *)
let set_frontend_state_complete t j =
  match VB.get t.t_converted_verified_bitmap j with
  | VB.State_complete | VB.State_verified -> ()
  | VB.State_missing | VB.State_partial ->
      let chatty =
        (not !CommonGlobals.is_startup_phase) && (!verbose_swarming || !verbose)
      in
      if chatty then
        lprintf_nl "Completed block %d/%d of %s"
          (j + 1) t.t_nchunks (file_best_name t.t_file);
      let s = t.t_s in
      List.iter (fun i -> set_completed_block None s i)
        t.t_blocks_of_chunk.(j)
(* aka set_verified_chunk (internal)

   Mark chunk [j] of frontend [t] as verified and update the chunk
   counters. A primary frontend also marks all the blocks of the chunk
   as verified in the swarmer. The [t_verified] callback is then
   invoked with the new count of verified chunks and [j].

   For chunk 0, [check_magic] is run on the file first, whenever
   [Autoconf.magic_works] is set. Already verified chunks are
   otherwise left alone. *)
let set_frontend_state_verified t j =
  let mark_verified () =
    VB.set t.t_converted_verified_bitmap j VB.State_verified;
    let chatty =
      (not !CommonGlobals.is_startup_phase) && (!verbose_swarming || !verbose)
    in
    if chatty then
      lprintf_nl "Verified block %d/%d of %s"
        (j + 1) t.t_nchunks (file_best_name t.t_file);
    if t.t_primary then begin
      (* The primary is supposed to propagate verified chunks to the file *)
      let s = t.t_s in
      List.iter (fun i -> set_verified_block s i) t.t_blocks_of_chunk.(j);
      if !verbose_swarming then
        print_s "VERIFIED" s
    end;
    t.t_verified t.t_nverified_chunks j
  in
  if j = 0 && !Autoconf.magic_works then check_magic t.t_file;
  match VB.get t.t_converted_verified_bitmap j with
  | VB.State_verified -> ()
  | VB.State_complete ->
      t.t_nverified_chunks <- t.t_nverified_chunks + 1;
      mark_verified ()
  | VB.State_missing | VB.State_partial ->
      (* the chunk skipped the "complete" stage: count it there too *)
      t.t_ncomplete_chunks <- t.t_ncomplete_chunks + 1;
      t.t_nverified_chunks <- t.t_nverified_chunks + 1;
      mark_verified ()
(** Apply a chunk bitmap to frontend [t]: every chunk that is
    [VB.State_complete] (resp. [VB.State_verified]) in [bitmap] is
    marked complete (resp. verified) in [t]. Missing and partial
    entries are ignored.

    A message is logged if a chunk is not verified in [t] right after
    having been marked as such. *)
let set_chunks_verified_bitmap t bitmap =
  VB.iteri (fun j c ->
    match c with
    | VB.State_missing | VB.State_partial ->
        ()
    | VB.State_complete ->
        set_frontend_state_complete t j
    | VB.State_verified ->
        set_frontend_state_verified t j;
        if VB.get t.t_converted_verified_bitmap j <> VB.State_verified then
          lprintf_nl "FIELD AS BEEN CLEARED"
  ) bitmap
(** The chunk bitmap of a frontend (the bitmap itself, not a copy). *)
let chunks_verified_bitmap frontend =
  frontend.t_converted_verified_bitmap
1474 (** Check the equality of the hash of [t]'s data between offsets
1475 [begin_pos] and [end_pos] against the value of [uid] *)
1477 (*************************************************************************)
1478 (* *)
1479 (* verify_chunk (internal) *)
1480 (* *)
1481 (*************************************************************************)
(** Verify chunk [j] of frontend [t], if it is [VB.State_complete].

    - No verifier available: nothing is done.
    - [ForceVerification]: the chunk is marked verified without any
      check.
    - [Verification uids] with one uid per chunk: the data of the chunk
      is checked against [uids.(j)]; the chunk is then marked verified
      or reset with [set_frontend_state_missing].
    - [Verification] with a single uid (asserted): once no chunk is
      missing or partial, the whole file is checked against it, and
      every chunk is marked verified, or every complete chunk is reset.

    [VerifierNotReady] raised during the check is silently ignored,
    leaving the chunk(s) as they were. *)
let verify_chunk t j =
  let verify t uid begin_pos end_pos =
    file_verify t.t_file uid begin_pos end_pos in

  if VB.get t.t_converted_verified_bitmap j = VB.State_complete then
    let nchunks = VB.length t.t_converted_verified_bitmap in
    match t.t_verifier with
    | NoVerification
    | VerificationNotAvailable -> ()

    | ForceVerification ->
        set_frontend_state_verified t j

    | Verification uids when Array.length uids = nchunks ->

        (try
           let s = t.t_s in
           let chunk_begin = t.t_chunk_size *.. j in
           let chunk_end = chunk_begin ++ t.t_chunk_size in
           (* the last chunk stops at the end of the file *)
           let chunk_end = min chunk_end s.s_size in
           if verify t uids.(j) chunk_begin chunk_end then
             set_frontend_state_verified t j
           else
             set_frontend_state_missing t j
         with VerifierNotReady -> ())

    | Verification chunks ->
        (* network only provides a hash for the whole file ? *)
        assert (Array.length chunks = 1);
        (* let nchunks = String.length t.t_converted_verified_bitmap in *)

        if VB.for_all (function
            | VB.State_missing | VB.State_partial -> false
            | VB.State_complete | VB.State_verified -> true) t.t_converted_verified_bitmap then
          try
            let s = t.t_s in
            if verify t chunks.(0) zero s.s_size then
              VB.iteri (fun j _ ->
                set_frontend_state_verified t j
              ) t.t_converted_verified_bitmap
            else
              VB.iteri (fun j c ->
                if c = VB.State_complete then set_frontend_state_missing t j
              ) t.t_converted_verified_bitmap
          with VerifierNotReady -> ()
(** Mark block [block_num] of swarmer [s] as completed, ready for
    verification (see [set_swarmer_state_complete]). *)

let must_verify_block s block_num =
  set_swarmer_state_complete s block_num
(** Mark the blocks of [t]'s swarmer as completed, ready for
    verification:
    - verified blocks are scheduled for verification again;
    - missing blocks are credited as downloaded in full, then
      scheduled for verification;
    - partial and complete blocks are left as they are. *)

let verify_all_chunks t =
  let s = t.t_s in
  let recheck i = function
    | VB.State_partial | VB.State_complete -> ()
    | VB.State_verified ->
        must_verify_block s i
    | VB.State_missing ->
        let block_begin = compute_block_begin s i in
        let block_end = compute_block_end s i in
        add_file_downloaded None s (block_end -- block_begin);
        must_verify_block s i
  in
  VB.iteri recheck s.s_verified_bitmap
(** Same as [verify_all_chunks], then synchronously runs
    [verify_chunk] on every chunk of [t]. *)

let verify_all_chunks_immediately t =
  verify_all_chunks t;
  let verify_one j _state = verify_chunk t j in
  VB.iteri verify_one t.t_converted_verified_bitmap
(** Synchronously verify every chunk of [t] that is completed but not
    verified yet. The bitmap is only scanned when the chunk counters
    say that such a chunk exists. *)

let compute_bitmap t =
  if t.t_ncomplete_chunks > t.t_nverified_chunks then begin
    let verify_if_complete j = function
      | VB.State_complete -> verify_chunk t j
      | VB.State_missing | VB.State_partial | VB.State_verified -> ()
    in
    VB.iteri verify_if_complete t.t_converted_verified_bitmap
  end
(** Replace the [i]th block of swarmer [s] with a fresh [PartialBlock]
    and return the underlying block record.

    The block starts with a single range spanning it entirely, on
    which nobody is uploading. If the block was [VB.State_missing] in
    the swarmer bitmap, it is switched to partial. *)

let new_block s i =

  let block_begin = compute_block_begin s i in
  let block_end = compute_block_end s i in
  let block_size = block_end -- block_begin in
  (* the block and its initial range point at each other, hence the
     recursive value definition *)
  let rec b = {
    block_s = s;

    block_begin = block_begin;
    block_end = block_end;
    block_ranges = range;
    block_num = i;
    block_remaining = block_size;
    block_unselected_remaining = block_size;
  }

  and range = {
    range_prev = None;
    range_next = None;
    range_begin = block_begin;
    range_end = block_end;
    range_block = b;
    range_nuploading = 0;
  }
  in
  (* lprintf "New block %Ld-%Ld\n" block_begin block_end; *)

  s.s_blocks.(i) <- PartialBlock b;
  if VB.get s.s_verified_bitmap i = VB.State_missing then
    set_swarmer_state_partial s i;
  if debug_all then lprintf_nl "NB[%s]" (VB.to_string s.s_verified_bitmap);
  b
(** [range_received maybe_t r interval_begin interval_end] records that
    the data in [interval_begin, interval_end[ has been received.

    Only an interval covering the beginning of [r] has an effect: the
    covered part is removed from the front of the range, credited as
    downloaded (see [add_file_downloaded maybe_t]) and subtracted from
    the block counters.

    A range that becomes empty is unlinked from its block. When it was
    the last range of the block, the block is marked verified at once
    if the primary frontend has [NoVerification], and otherwise
    completed and scheduled for verification. *)

let range_received maybe_t r interval_begin interval_end =
  (* lprintf " range_received: %Ld-%Ld for %Ld-%Ld\n"
     interval_begin interval_end r.range_begin r.range_end; *)
  (* interval overlap with the beginning of range ? *)
  (* was: r.range_begin < interval_end && r.range_end > interval_begin *)
  if r.range_begin >= interval_begin &&
     r.range_begin < interval_end then begin
    (* lprintf "... entered\n"; *)
    let new_begin =
      max (min interval_end r.range_end) r.range_begin in
    let downloaded = new_begin -- r.range_begin in
    let b = r.range_block in
    let s = b.block_s in
    add_file_downloaded maybe_t s downloaded;
    b.block_remaining <- b.block_remaining -- downloaded;
    if r.range_nuploading = 0 then
      b.block_unselected_remaining <- b.block_unselected_remaining -- downloaded;
    r.range_begin <- new_begin;
    if r.range_begin = r.range_end then begin
      (* range completed, unlink it *)
      (match r.range_next with
       | Some rr -> rr.range_prev <- r.range_prev
       | None -> ());
      (match r.range_prev with
       | Some rr -> rr.range_next <- r.range_next
       | None ->
           (* that was the first range of the block *)
           match r.range_next with
           | Some rr -> (* fix block's first range *)
               b.block_ranges <- rr
           | None -> (* that was the last remaining range of the block *)
               match s.s_blocks.(b.block_num) with
               | PartialBlock _ | EmptyBlock ->
                   (match s.s_networks with
                    | t :: _ ->
                        assert(t.t_primary);
                        (match t.t_verifier with
                         | NoVerification ->
                             set_verified_block s b.block_num
                         | _ ->
                             set_completed_block (Some t) s b.block_num;
                             must_verify_block s b.block_num)
                    | [] -> assert false)
               | _ -> () );
      r.range_next <- None;
      r.range_prev <- None
    end (* else begin
           lprintf " ... new range %Ld-%Ld\n" r.range_begin r.range_end;
           end *)
  end
(** [split_range r cut_pos] splits range [r] in two at offset [cut_pos]
    when [cut_pos] lies strictly inside it; otherwise nothing happens.

    [r] keeps the left half. The right half is a new range, with no
    uploader, inserted right after [r] in the block's doubly linked
    list, so ranges stay linked together. *)

let split_range r cut_pos =
  (* lprintf " split_range: cut_pos %Ld\n" cut_pos; *)
  if r.range_begin < cut_pos && r.range_end > cut_pos then begin
    (* "right" half *)
    let split_r = {
      range_block = r.range_block;
      range_nuploading = 0;
      range_next = r.range_next;
      range_prev = Some r;
      range_begin = cut_pos;
      range_end = r.range_end;
    } in
    (match r.range_next with
     | None -> ()
     | Some old_next_range ->
         old_next_range.range_prev <- Some split_r);
    (* "left" half *)
    r.range_next <- Some split_r;
    r.range_end <- cut_pos
    (* lprintf " NEW RANGE: %Ld- OLD RANGE: %Ld-%Ld\n"
       split_r.range_begin r.range_begin r.range_end; *)
  end
(** Remove the interval [interval_begin, interval_end[ from the ranges
    of block [b], by calling [range_received] on each of them.

    Assumption: we never download ranges from the middle, so present
    intervals can only overlap the beginning of ranges. Ranges are
    therefore split at [interval_begin] first.

    A message is logged when the amount actually removed from the
    block differs from the size of the interval.

    A (doubly linked) list is definitely not the most efficient
    data structure for this operation... *)

let set_present_block b interval_begin interval_end =
  let remaining_before = b.block_remaining in
  (* first pass: make sure a range boundary exists at [interval_begin] *)
  iter_block_ranges (fun r -> split_range r interval_begin) b;
  (* second pass: consume the interval from the front of the ranges *)
  iter_block_ranges
    (fun r -> range_received None r interval_begin interval_end) b;
  let expected = interval_end -- interval_begin in
  let actual = remaining_before -- b.block_remaining in
  if actual <> expected then
    lprintf_nl "set_present_block: %Ld added <> %Ld effectively added"
      expected actual
(** Declare the data of [intervals] (a list of [(begin, end)] pairs)
    as already present in swarmer [s].

    For every block touched by an interval:
    - an empty block entirely covered becomes a [CompleteBlock],
      scheduled for verification and credited as downloaded;
    - an empty block partly covered is turned into a partial block,
      from which the interval is removed;
    - a partial block gets the interval removed from its ranges;
    - complete and verified blocks are left alone.

    The disk space of each interval is finally flagged as preallocated
    on behalf of the primary frontend. *)

let set_present s intervals =
  iter_intervals s (fun i block_begin block_end interval_begin interval_end ->
    (* lprintf "interval: %Ld-%Ld in block %d [%Ld-%Ld]\n"
       interval_begin interval_end i block_begin block_end; *)
    match s.s_blocks.(i) with
    | EmptyBlock ->
        (* lprintf " EmptyBlock"; *)
        if block_begin >= interval_begin && block_end <= interval_end
        then begin
          (* lprintf " --> CompleteBlock\n"; *)
          s.s_blocks.(i) <- CompleteBlock;
          must_verify_block s i;
          add_file_downloaded None s (block_end -- block_begin)
        end
        else begin
          let b = new_block s i in
          (* lprintf " ... set_present_block\n"; *)
          set_present_block b interval_begin interval_end
        end
    | PartialBlock b ->
        (* lprintf " PartialBlock\n"; *)
        set_present_block b interval_begin interval_end
    | CompleteBlock | VerifiedBlock ->
        (* lprintf " Other\n"; *)
        ()
  ) intervals;
  match s.s_networks with
  | tprim :: _ ->
      List.iter (fun (interval_begin, interval_end) ->
        mark_disk_space_preallocated tprim interval_begin interval_end
      ) intervals
  | [] -> assert false
(** Reverse absent/present: compute the complement of [list_absent]
    within [0, s.s_size[ and pass it to [set_present].

    NOTE(review): the complement is only meaningful if [list_absent]
    is sorted by increasing offsets, without overlaps, and within the
    file -- nothing here checks it. *)

let set_absent s list_absent =
  (* Build the complementary list of intervals of [intervals] in
     [set_begin, set_end[. [acc] holds the result so far, in reverse
     order. *)
  let rec complementary acc set_begin set_end intervals =
    match intervals with
    | [] ->
        let acc =
          if set_begin = set_end then acc else
            (set_begin, set_end) :: acc
        in
        List.rev acc
    | (interval_begin, interval_end) :: other_intervals ->
        let acc =
          (* no gap before this interval: nothing to add *)
          if set_begin = interval_begin then acc
          else (set_begin, interval_begin) :: acc
        in
        complementary acc interval_end set_end other_intervals in
  let list_present = complementary [] Int64.zero s.s_size list_absent in
  set_present s list_present
(** Textual rendering of an uploader's availability.

    A list of intervals is rendered as a bitmap with one entry per
    block of swarmer [s]: [VB.State_partial] for blocks touched by
    some interval, [VB.State_missing] for the others. A bit vector is
    rendered with [Bitv.to_string]. *)
let intervals_to_string s intervals =
  match intervals with
  | AvailableBitv bitv -> Bitv.to_string bitv
  | AvailableIntervals ranges ->
      let bitmap = VB.create (Array.length s.s_blocks) VB.State_missing in
      let mark i _ _ _ _ = VB.set bitmap i VB.State_partial in
      iter_intervals s mark ranges;
      VB.to_string bitmap
1770 (*************************************************************************)
1771 (* *)
1772 (* Uploaders *)
1773 (* *)
1774 (*************************************************************************)
(** (debug) Dump uploader [up] to the current log: the complete blocks
    it can provide, then the partial ones with their bounds. *)

let print_uploader up =
  let print_complete i = lprintf " %d " i in
  let print_partial (i, begin_pos, end_pos) =
    lprintf " %d[%Ld...%Ld] " i begin_pos end_pos
  in
  lprintf_n " interesting complete_blocks: %d\n " up.up_ncomplete;
  Array.iter print_complete up.up_complete_blocks;
  lprint_newline ();
  lprintf_n " interesting partial_blocks: %d\n " up.up_npartial;
  Array.iter print_partial up.up_partial_blocks;
  lprint_newline ()
(** If [up] is not declared yet ([up_declared] is false),
    sets [up_intervals], [up_complete_blocks], [up_ncomplete],
    [up_partial_blocks], [up_npartial] according to [intervals],
    resets [up_blocks], marks the uploader as declared and calls
    [client_has_bitmap] on the associated client. The availability
    counter of every block the uploader can provide is incremented,
    and [t_last_seen] is refreshed for the chunks of its complete
    blocks.

    On an already declared uploader, only a message is logged.

    My feeling is that if all those fields only make sense when
    up_declared is true, they should be regrouped in a record option.
*)

let set_uploader_intervals up intervals =
  if up.up_declared then
    lprintf_nl "set_uploader_intervals: called on an already declared uploader\n"
  else
    let t = up.up_t in
    let s = t.t_s in
    (* INVARIANT: complete_blocks must be in reverse order *)

    let complete_blocks = ref [] in
    let partial_blocks = ref [] in

    let incr_availability s i =
      s.s_availability.(i) <- s.s_availability.(i) + 1 in

    (match intervals with
     | AvailableIntervals intervals ->
         iter_intervals s (fun i block_begin block_end interval_begin interval_end ->
           (* lprintf "iter_intervals %d %Ld-%Ld %Ld-%Ld\n"
              i block_begin block_end interval_begin interval_end; *)
           incr_availability s i;

           match s.s_blocks.(i) with
           | CompleteBlock | VerifiedBlock -> ()
           | EmptyBlock | PartialBlock _ ->
               (* the uploader has the whole block only if the interval
                  matches the block bounds exactly *)
               if block_begin = interval_begin && block_end = interval_end then
                 complete_blocks := i :: !complete_blocks
               else
                 partial_blocks :=
                   (i, interval_begin, interval_end) :: !partial_blocks
         ) intervals

     | AvailableBitv bitmap ->
         (* one bit per chunk of the frontend: every block of an
            available chunk is available *)
         Bitv.iteri_true (fun i ->
           List.iter (fun j ->
             incr_availability s j;
             complete_blocks := j :: !complete_blocks
           ) t.t_blocks_of_chunk.(i)
         ) bitmap
    );

    List.iter (fun i ->
      (* s.s_last_seen.(i) <- BasicSocket.last_time (); *)
      let i = t.t_chunk_of_block.(i) in
      t.t_last_seen.(i) <- BasicSocket.last_time ()
    ) !complete_blocks;

    let complete_blocks = Array.of_list !complete_blocks in
    let partial_blocks = Array.of_list !partial_blocks in
    up.up_intervals <- intervals;

    up.up_complete_blocks <- complete_blocks;
    up.up_ncomplete <- Array.length complete_blocks;

    if Array.length partial_blocks > 0 then
      lprintf_nl "WARNING: partial_blocks = %d" (Array.length partial_blocks);
    up.up_partial_blocks <- partial_blocks;
    up.up_npartial <- Array.length partial_blocks;

    up.up_blocks <- [];

    up.up_declared <- true;

    let bm = intervals_to_string s intervals in
    client_has_bitmap up.up_client up.up_t.t_file bm;

    if debug_all then print_uploader up
1865 (*************************************************************************)
1866 (* *)
1867 (* register_uploader *)
1868 (* *)
1869 (*************************************************************************)
(** [register_uploader t client intervals] creates an uploader for
    [client] on frontend [t], records it in [uploaders_by_num],
    declares what it can provide with [set_uploader_intervals], and
    returns it. *)
let register_uploader t client intervals =
  let up =
    {
      up_t = t;
      up_client = client;

      (* not declared yet: [set_uploader_intervals] below does it *)
      up_declared = false;
      up_intervals = intervals;

      up_complete_blocks = [||];
      up_ncomplete = 0;

      up_partial_blocks = [||];
      up_npartial = 0;

      up_blocks = [];

      up_ranges = [];
    }
  in
  HU.add uploaders_by_num up;
  set_uploader_intervals up intervals;
  up
1895 (*************************************************************************)
1896 (* *)
1897 (* unregister_uploader *)
1898 (* *)
1899 (*************************************************************************)
(** Release every range currently assigned to uploader [up].

    The uploader count of each range is decremented. When a range is
    left without any uploader, its size goes back into its block's
    [block_unselected_remaining].

    A range whose count is already 0 is only reported: the counters
    that track unselected data only ever adjust for ranges with no
    uploader, so its size is already accounted for, and adding it
    again would count it twice. *)
let clear_uploader_ranges up =
  List.iter (fun (_,_,r) ->
    if r.range_nuploading > 0 then begin
      r.range_nuploading <- r.range_nuploading - 1;
      if r.range_nuploading = 0 then begin
        let b = r.range_block in
        b.block_unselected_remaining <-
          b.block_unselected_remaining ++ (compute_range_size r);
        if b.block_unselected_remaining > b.block_remaining then
          lprintf_nl "clear_uploader_ranges: block_unselected_remaining larger than block_remaining!"
      end
    end
    else
      lprintf_nl "clear_uploader_ranges: some range_nuploading was about to become negative\n"
  ) up.up_ranges;
  up.up_ranges <- []
(** Forget the blocks currently selected by uploader [up].
    The per-block uploader counter of the swarmer is decremented for
    every selected block (a counter already at zero is only reported),
    then the selection list is emptied. *)
let clear_uploader_blocks up =
  (* the swarmer does not depend on the block being released *)
  let s = up.up_t.t_s in
  List.iter (fun b ->
    let num = b.up_block.block_num in
    if debug_all then
      lprintf_nl "Client %d unselect %d" (client_num up.up_client) num;
    if s.s_nuploading.(num) > 0 then
      s.s_nuploading.(num) <- s.s_nuploading.(num) - 1
    else
      (* the literal used to be split over two source lines, which put a
         raw line break in the middle of the logged message *)
      lprintf_nl "clear_uploader_blocks: some s_nuploading was about to become negative\n"
  ) up.up_blocks;
  up.up_blocks <- []
(** Withdraw what uploader [up] declared to own. Does nothing when the
    uploader is not declared. Otherwise the availability counter of
    every complete and partial block of the uploader is decremented
    (a counter already at zero is only reported), both block arrays are
    emptied, the selected blocks are released with
    [clear_uploader_blocks], and the uploader is marked undeclared. *)
let clear_uploader_intervals up =
  if up.up_declared then begin
    let availability = up.up_t.t_s.s_availability in
    let forget block_num =
      let seen = availability.(block_num) in
      if seen > 0 then
        availability.(block_num) <- seen - 1
      else
        lprintf_nl "clear_uploader_intervals: some s_availability was about to become negative\n"
    in
    Array.iter forget up.up_complete_blocks;
    up.up_complete_blocks <- [||];
    up.up_ncomplete <- 0;
    Array.iter (fun (block_num, _, _) -> forget block_num) up.up_partial_blocks;
    up.up_partial_blocks <- [||];
    up.up_npartial <- 0;
    clear_uploader_blocks up;
    up.up_declared <- false
  end
(** Replace what uploader [up] declares: the previous declaration is
    withdrawn, then [intervals] is declared in its place. *)
let update_uploader_intervals up intervals =
  let () = clear_uploader_intervals up in
  set_uploader_intervals up intervals
(** Detach uploader [up] from the swarmer bookkeeping: its declared
    intervals (and selected blocks) are withdrawn first, its ranges are
    released next. *)
let unregister_uploader up =
  let () = clear_uploader_intervals up in
  clear_uploader_ranges up
(** (debug) Write two lines describing the blocks of swarmer [s] to the
    current log. In both lines an empty block prints as [_], a complete
    one as [C] and a verified one as [V]. Partial blocks print their
    uploader count (or [X] above 9) on the first line, and their number,
    uploader count and per-range uploader counts on the second. *)
let print_uploaders s =
  (* one log line: fixed letters for settled blocks, [on_partial] for
     the others *)
  let dump on_partial =
    Array.iteri (fun i state ->
      match state with
      | EmptyBlock -> lprintf "_"
      | CompleteBlock -> lprintf "C"
      | VerifiedBlock -> lprintf "V"
      | PartialBlock block -> on_partial i block
    ) s.s_blocks;
    lprint_newline ()
  in
  dump (fun i _ ->
    if s.s_nuploading.(i) > 9 then
      lprintf "X"
    else
      lprintf "%d" s.s_nuploading.(i));
  dump (fun _ block ->
    lprintf "{ %d : %d=" block.block_num
      s.s_nuploading.(block.block_num);
    iter_block_ranges (fun r ->
      lprintf "(%d)" r.range_nuploading
    ) block;
    lprintf " }")
(** Drop entry [n] from the [up.up_ncomplete] first entries of
    [up.up_complete_blocks]: the entry is exchanged with the one at
    position [up.up_ncomplete - 1], then [up.up_ncomplete] is decreased.
    The block the entry designates is returned as an uploader block
    covering the whole block; an empty block is first turned into a
    partial one with [new_block].
    Fails on an assertion when the block is already complete or
    verified. *)
let permute_and_return up n =
  assert (n <= up.up_ncomplete-1);
  let block_num = up.up_complete_blocks.(n) in
  if debug_all then lprintf_nl "permute_and_return %d <> %d" n block_num;
  if n < up.up_ncomplete then begin
    let last = up.up_ncomplete - 1 in
    up.up_complete_blocks.(n) <- up.up_complete_blocks.(last);
    up.up_complete_blocks.(last) <- block_num
  end;
  up.up_ncomplete <- up.up_ncomplete - 1;
  let s = up.up_t.t_s in
  let whole_block block =
    { up_block = block;
      up_block_begin = block.block_begin;
      up_block_end = block.block_end }
  in
  match s.s_blocks.(block_num) with
  | PartialBlock block -> whole_block block
  | EmptyBlock -> whole_block (new_block s block_num)
  | VerifiedBlock ->
      lprintf_nl "ERROR: verified block in permute_and_return %d\n" block_num;
      assert false
  | CompleteBlock ->
      lprintf_nl "ERROR: complete block in permute_and_return %d\n" block_num;
      assert false
(** Pick one block for uploader [up], scanning from the end of its
    arrays: first [up.up_complete_blocks], then, once that one is
    exhausted, [up.up_partial_blocks]. Entries whose priority is
    [priority_zero], or whose block is already complete or verified,
    are skipped. [up.up_ncomplete] and [up.up_npartial] are decreased
    as entries are consumed; no entry is ever permuted.

    The result is the chunk of the block paired with a one-element
    list. A block taken from [up.up_complete_blocks] is offered
    entirely; one taken from [up.up_partial_blocks] is offered between
    the bounds stored with it.

    @raise Not_found when both arrays are exhausted. *)
let linear_select_blocks up =
  let t = up.up_t in
  let s = t.t_s in
  let answer chunk block first last =
    chunk,
    [{ up_block = block;
       up_block_begin = first;
       up_block_end = last }]
  in
  let rec from_partial () =
    match up.up_npartial with
    | 0 -> raise Not_found
    | n ->
        let num, first, last = up.up_partial_blocks.(n-1) in
        up.up_npartial <- n-1;
        if s.s_priorities_bitmap.[num] = priority_zero then from_partial ()
        else begin
          let chunk = t.t_chunk_of_block.(num) in
          match s.s_blocks.(num) with
          | CompleteBlock | VerifiedBlock -> from_partial ()
          | PartialBlock block -> answer chunk block first last
          | EmptyBlock -> answer chunk (new_block s num) first last
        end
  in
  let rec from_complete () =
    match up.up_ncomplete with
    | 0 -> from_partial ()
    | n ->
        let num = up.up_complete_blocks.(n-1) in
        up.up_ncomplete <- n-1;
        if s.s_priorities_bitmap.[num] = priority_zero then from_complete ()
        else begin
          let chunk = t.t_chunk_of_block.(num) in
          match s.s_blocks.(num) with
          | CompleteBlock | VerifiedBlock -> from_complete ()
          | PartialBlock block ->
              answer chunk block block.block_begin block.block_end
          | EmptyBlock ->
              let block = new_block s num in
              answer chunk block block.block_begin block.block_end
        end
  in
  from_complete ()
(** Tell whether block [n] of swarmer [s] still has to be downloaded.

    Missing and partial blocks do, verified ones do not. For a complete
    block, verification of its chunk on the first network of
    [s.s_networks] (asserted to be the primary one) is attempted when
    that chunk is complete, then the state of the block is read again:
    the answer is [true] only if it went back to missing or partial.
    A [VerifierNotReady] raised by the verification is ignored. *)
let should_download_block s n =
  let verify_enclosing_chunk () =
    match s.s_networks with
    | [] -> assert false
    | primary :: _ ->
        assert(primary.t_primary);
        (try
           let chunk = primary.t_chunk_of_block.(n) in
           if VB.get primary.t_converted_verified_bitmap chunk = VB.State_complete then
             verify_chunk primary chunk
         with VerifierNotReady -> ())
  in
  match VB.get s.s_verified_bitmap n with
  | VB.State_missing | VB.State_partial -> true
  | VB.State_verified -> false
  | VB.State_complete ->
      verify_enclosing_chunk ();
      (match VB.get s.s_verified_bitmap n with
       | VB.State_missing | VB.State_partial -> true
       | VB.State_complete | VB.State_verified -> false)
2125 (*************************************************************************)
2126 (* *)
2127 (* select_blocks (internal) *)
2128 (* *)
2129 (*************************************************************************)
(** Criteria recorded for one candidate block during block selection. *)
type choice = {
  choice_num : int;
  (** index of the candidate in the uploader's complete blocks array *)
  choice_block : int;
  (** number of the block in the swarmer *)
  choice_user_priority : int;
  (** character code of the block's entry in the priorities bitmap *)
  choice_remaining : int64;
  (** bytes of the block still to download *)
  choice_unselected_remaining : int64;
  (** bytes of the block not yet taken by any uploader *)
  choice_preallocated : bool;
  (** whether the block's byte span is fully preallocated *)
}
(** Neutral choice (all counters at zero, not preallocated) used as the
    initial specimen when ranking candidates. *)
let dummy_choice = {
  choice_num = 0;
  choice_block = 0;
  choice_user_priority = 0;
  choice_remaining = 0L;
  choice_unselected_remaining = 0L;
  choice_preallocated = false
}
2149 (* Return the best list of blocks to ask from an uploader
2150 All the blocks from the list must come from the same chunk *)
(* select_blocks up: choose the blocks to request next from uploader [up].
   The result is a pair (chunk number, list of uploader blocks).
   - With LinearStrategy the choice is delegated to linear_select_blocks.
   - Otherwise every usable entry among the first up_ncomplete entries of
     up.up_complete_blocks is scored (see compare_choices), entries are
     grouped by chunk, and one of the best ranked groups is returned after
     its entries went through permute_and_return. When no group is left,
     linear_select_blocks is used as a fallback.
   Raises Not_found when nothing can be selected.
   NOTE(review): this listing shows two [with Not_found] handlers and the
   handler of memoize without any matching [try], and the record built in
   create_choice has no opening brace. Very short lines seem to have been
   lost when the file was extracted; confirm against the upstream file. *)
2152 let select_blocks up =
2153 let t = up.up_t in
2154 let s = t.t_s in
2156 match s.s_strategy with
2157 | LinearStrategy ->
2158 linear_select_blocks up
2159 | _ ->
(* neither array has anything left to offer *)
2160 if up.up_ncomplete = 0 && up.up_npartial = 0 then raise Not_found;
2162 (* to evaluate the relative rarity of a block, we must compare it to
2163 the availability of *all* blocks, not only those available from
2164 that uploader *)
2165 let sum_availability = Array.fold_left (+) 0 s.s_availability in
2166 let mean_availability = sum_availability / Array.length s.s_blocks in
(* my_t: this frontend when it has a verifier, otherwise the first network
   of the swarmer, asserted to be the primary one *)
2168 let my_t = if t.t_verifier <> NoVerification then t else
2169 match s.s_networks with
2170 | tprim :: _ ->
2171 assert(tprim.t_primary);
2172 tprim
2173 | [] -> assert false in
2174 let verification_available = my_t.t_verifier <> NoVerification in
2176 let several_frontends = List.length s.s_networks > 1 in
2177 (* compute the number of missing or partial blocks in the same
2178 chunks (for all networks) as a given block *)
2179 (* memoize some results *)
2180 let memoization_calls = ref 0 in
2181 let memoization_hits = ref 0 in
2182 let debug_memoization = false in
(* memoize h f p: value stored in table h for key p; on a miss (Not_found)
   f p is computed, stored and returned *)
2183 let memoize h f p =
2184 incr memoization_calls;
2186 let result = Hashtbl.find h p in
2187 incr memoization_hits;
2188 if debug_memoization then
2189 (* defeats the purpose of memoization, only enable for
2190 debugging *)
2191 let recomputed_result = f p in
2192 if result <> recomputed_result then begin
2193 lprintf_nl "memoization failure";
2194 recomputed_result
2195 end else result
2196 else result
2197 with Not_found ->
2198 let result = f p in
2199 Hashtbl.add h p result;
2200 result in
2201 let memoize_remaining_blocks_in_chunk = Hashtbl.create 17 in
2202 let memoize_remaining_blocks_in_chunks = Hashtbl.create 17 in
(* for block i: number of other blocks still missing or partial in the
   chunk holding i, summed over all networks of the swarmer; the per
   network figure is cached under the key (network number, block) *)
2204 let remaining_blocks_in_chunks i =
2205 assert (i >= 0 && i < VB.length s.s_verified_bitmap);
2206 memoize memoize_remaining_blocks_in_chunks
2207 (fun i ->
2208 List.fold_left (fun acc t ->
2209 acc +
2210 (memoize memoize_remaining_blocks_in_chunk
2211 (fun (tnum, i) ->
2212 let chunk = t.t_chunk_of_block.(i) in
2213 List.fold_left (fun acc b ->
2214 if b <> i &&
2215 (match VB.get s.s_verified_bitmap b with
2216 | VB.State_missing | VB.State_partial -> true
2217 | VB.State_complete | VB.State_verified -> false) then acc + 1
2218 else acc) 0 t.t_blocks_of_chunk.(chunk)) (t.t_num, i))
2219 ) 0 s.s_networks) i in
(* NOTE(review): preview_beginning and preview_end are not referenced
   anywhere else in the visible part of this function *)
2222 let preview_beginning = 9000000L in
2223 let preview_end = (s.s_size ** 98L) // 100L in
2226 (* sources_per_chunk was initially for edonkey only *)
2227 let data_per_source = 9728000L // (Int64.of_int !!sources_per_chunk) in
2229 let need_to_complete_some_blocks_quickly =
2230 verification_available && t.t_nverified_chunks < 2
(* create_choice n b: criteria of block b, found at index n of
   up.up_complete_blocks, as they stand right now *)
2233 let create_choice n b =
2234 let block_begin = compute_block_begin s b in
2235 let block_end = compute_block_end s b in
2236 let size = block_end -- block_begin in
2237 let remaining, unselected_remaining = match s.s_blocks.(b) with
2238 | EmptyBlock -> size, size
2239 | PartialBlock b -> b.block_remaining, b.block_unselected_remaining
2240 | CompleteBlock | VerifiedBlock -> 0L, 0L in
2242 choice_num = n;
2243 choice_block = b;
2244 choice_user_priority = Char.code s.s_priorities_bitmap.[b];
2245 choice_remaining = remaining;
2246 choice_preallocated = is_fully_preallocated t block_begin block_end;
2247 choice_unselected_remaining = unselected_remaining;
2248 } in
2250 (* accessors *)
2251 let choice_num choice =
2252 choice.choice_num in
2254 let _choice_block choice =
2255 choice.choice_block in
2257 let choice_user_priority choice =
2258 choice.choice_user_priority in
2260 let choice_remaining choice =
2261 choice.choice_remaining in
2263 let choice_unselected_remaining choice =
2264 choice.choice_unselected_remaining in
(* uploaders currently working on the block, read from the swarmer at
   call time rather than stored in the choice *)
2266 let choice_nuploaders choice =
2267 s.s_nuploading.(choice.choice_block) in
2269 let choice_remaining_per_uploader choice =
2270 choice.choice_unselected_remaining //
2271 (Int64.of_int (choice_nuploaders choice + 1)) (* planned *) in
2273 (* has enough uploaders *)
2274 let choice_saturated choice =
2275 choice.choice_unselected_remaining <=
2276 Int64.of_int (choice_nuploaders choice) ** data_per_source in
2278 let choice_availability choice =
2279 s.s_availability.(choice.choice_block) in
2281 (* remaining blocks in the same chunk, for all frontends *)
2282 let choice_other_remaining choice =
2283 remaining_blocks_in_chunks (choice.choice_block) in
2285 let choice_preallocated choice =
2286 choice.choice_preallocated in
2288 let print_choice c =
2289 lprintf_nl "choice %d:%d priority:%d nup:%d rem:%Ld rmu:%Ld rpu:%Ld sat:%B sib:%d av:%d pre:%b"
2290 (choice_num c) up.up_complete_blocks.(choice_num c)
2291 (choice_user_priority c)
2292 (choice_nuploaders c)
2293 (choice_remaining c)
2294 (choice_unselected_remaining c)
2295 (choice_remaining_per_uploader c)
2296 (choice_saturated c)
2297 (choice_other_remaining c)
2298 (choice_availability c)
2299 (choice_preallocated c) in
(* the criteria below are tried in order; the first one that separates
   the two choices decides *)
2301 (** > 0 == c1 is best, < 0 = c2 is best, 0 == they're equivalent *)
2302 let compare_choices c1 c2 =
2303 (* "RULES" *)
2304 (* Avoid stepping on each other's feet *)
2305 let cmp =
2306 match choice_unselected_remaining c1,
2307 choice_unselected_remaining c2 with
2308 | 0L, 0L -> 0
2309 | _, 0L -> 1
2310 | 0L, _ -> -1
2311 | _, _ -> 0 in
2312 if cmp <> 0 then cmp else
2314 (* avoid overly unbalanced situations *)
2315 let cmp =
2316 match choice_saturated c1, choice_saturated c2 with
2317 | false, false -> 0
2318 | false, true -> 1
2319 | true, false -> -1
2320 | true, true -> 0 in
2321 if cmp <> 0 then cmp else
2323 (* "WISHES" *)
2324 (* Do what Master asked for *)
2325 let cmp = compare (choice_user_priority c1)
2326 (choice_user_priority c2) in
2327 if cmp <> 0 then cmp else
2329 (* "OPTIMIZATIONS" *)
2330 (* Pick really rare gems: if average availability of all
2331 blocks is higher than 5 connected sources, pick in
2332 priority blocks present in at most 3 connected sources;
2333 is that too restrictive ? *)
2334 let cmp =
2335 if not need_to_complete_some_blocks_quickly &&
2336 mean_availability > 5 &&
2337 (choice_availability c1 <= 3 ||
2338 choice_availability c2 <= 3) then
2339 compare (choice_availability c2)
2340 (choice_availability c1)
2341 else 0 in
2342 if cmp <> 0 then cmp else
2344 (* try to quickly complete (and validate) chunks;
2345 if there's only one frontend, each chunk has only one
2346 block, and looking at siblings make no sense *)
2347 let cmp =
2348 if verification_available && several_frontends then
2349 compare (choice_other_remaining c2)
2350 (choice_other_remaining c1)
2351 else 0 in
2352 if cmp <> 0 then cmp else
2354 (* try to quickly complete blocks *)
2355 let cmp =
2356 compare (choice_unselected_remaining c2)
2357 (choice_unselected_remaining c1) in
2358 if cmp <> 0 then cmp else
2360 (* pick blocks that won't require allocating more disk space *)
2361 let cmp =
2362 match choice_preallocated c1, choice_preallocated c2 with
2363 | true, false -> 1
2364 | false, true -> -1
2365 | _ -> 0 in
2366 if cmp <> 0 then cmp else
2368 (* "DEFAULT" *)
2369 (* Can't tell *)
2370 0 in
(* a chunk is represented by its best block; a chunk that beats the
   current specimen replaces the whole list, an equivalent one is added
   to the list, a worse one is dropped *)
2372 (* compare a new chunk against a list of best choices numbers (and a
2373 specimen of best choice) *)
2374 let keep_best_chunks chunk_blocks_indexes best_choices specimen =
2375 match chunk_blocks_indexes with
2376 | [] -> best_choices, specimen
2377 | h :: q ->
2378 let this_chunk_specimen = List.fold_left (fun acc n ->
2379 let choice = create_choice n up.up_complete_blocks.(n) in
2380 let cmp = compare_choices choice acc in
2381 if cmp <= 0 then acc
2382 else choice) (create_choice h up.up_complete_blocks.(h)) q in
2383 let cmp = compare_choices this_chunk_specimen specimen in
2384 if cmp < 0 then best_choices, specimen
2385 else if cmp > 0 then [chunk_blocks_indexes], this_chunk_specimen
2386 else chunk_blocks_indexes :: best_choices, specimen in
(* fold over entries 0 .. up_ncomplete - 1 of up.up_complete_blocks,
   skipping zero priority blocks and blocks that need no download;
   entries of the current chunk are accumulated by index and handed to
   keep_best_chunks as soon as an entry of another chunk shows up *)
2388 let current_chunk_num, current_chunk_blocks_indexes,
2389 best_choices, specimen =
2390 Array2.subarray_fold_lefti (fun
2391 ((current_chunk_num, current_chunk_blocks_indexes,
2392 best_choices, specimen) as acc) n b ->
2393 if s.s_priorities_bitmap.[b] = priority_zero ||
2394 not (should_download_block s b) then acc
2395 else
2396 let chunk_num = t.t_chunk_of_block.(b) in
2397 if chunk_num = current_chunk_num then
2398 (current_chunk_num, n :: current_chunk_blocks_indexes,
2399 best_choices, specimen)
2400 else
2401 (* different chunk *)
2402 match current_chunk_blocks_indexes with
2403 | [] ->
2404 (* no previous chunk *)
2405 (chunk_num, [n], best_choices, specimen)
2406 | h :: q ->
2407 let new_best_choices, new_specimen =
2408 keep_best_chunks current_chunk_blocks_indexes best_choices specimen in
2409 (chunk_num, [n], new_best_choices, new_specimen)
2410 ) (-1, [], [], dummy_choice) up.up_complete_blocks 0
2411 (up.up_ncomplete -1) in
2413 (* last chunk *)
2414 let best_choices, specimen =
2415 keep_best_chunks current_chunk_blocks_indexes best_choices specimen in
2416 (* what about up_partial_blocks ?
2417 currently they're taken care of by linear_select_block
2418 fallback below *)
2420 if debug_all then print_choice specimen;
(* blocks: index list of the retained chunk; when several chunks are
   ranked equal one of them is drawn at random *)
2423 let blocks =
2424 match best_choices with
2425 | [] -> raise Not_found
2426 | [choice] -> choice
2427 | _::_ ->
2428 let nchoices = List.length best_choices in
2429 List.nth best_choices (Random.int nchoices) in
2431 let chunk =
2432 match blocks with
2433 | [] -> assert false
2434 | b :: _ -> t.t_chunk_of_block.(up.up_complete_blocks.(b)) in
2436 if debug_all || !verbose_swarming then begin
2437 lprintf_nl "\nBlocksFound in %d: %d" chunk (List.length blocks);
2438 List.iter (fun n ->
2439 lprintf_n " %d" up.up_complete_blocks.(n)
2440 ) blocks;
2441 lprint_newline ()
2442 end;
(* probably_buggy: every retained block is already complete or verified,
   or is partial with all of its ranges already being uploaded; the
   candidates are then dumped to the log *)
2445 (* DEBUG *)
2446 let probably_buggy =
2447 List.for_all (fun n ->
2448 let block_num = up.up_complete_blocks.(n) in
2449 match s.s_blocks.(block_num) with
2450 | EmptyBlock -> false
2451 | PartialBlock b ->
2452 block_ranges_for_all (fun r ->
2453 r.range_nuploading > 0) b
2454 | CompleteBlock | VerifiedBlock ->
2455 true) blocks in
2456 if probably_buggy then begin
2457 lprintf_nl "Probably buggy choice (%d):" chunk;
2458 Array2.subarray_fold_lefti (fun () n b ->
2459 if s.s_priorities_bitmap.[b] <> priority_zero &&
2460 should_download_block s b then
2461 let this_choice = create_choice n b in
2462 if List.mem n blocks then lprintf "** "
2463 else if List.exists (List.mem n) best_choices then
2464 lprintf "-- "
2465 else lprintf " ";
2466 print_choice this_choice;
2467 match s.s_blocks.(b) with
2468 | EmptyBlock | CompleteBlock | VerifiedBlock -> ()
2469 | PartialBlock b ->
2470 let total_uploading =
2471 block_ranges_fold (fun acc r ->
2472 lprintf "%d " r.range_nuploading;
2473 acc + r.range_nuploading) 0 b in
2474 lprintf "total=%d" total_uploading;
2475 lprint_newline ()
2476 ) () up.up_complete_blocks 0 (up.up_ncomplete - 1);
2477 end;
2478 (* /DEBUG *)
(* NOTE(review): permute_and_return swaps entries of up.up_complete_blocks
   while this list of indexes is mapped; this presumably relies on the
   indexes being in decreasing order (they are consed while folding) -
   confirm the iteration order of Array2.subarray_fold_lefti *)
2481 chunk, List.map (fun n -> permute_and_return up n) blocks
2482 with Not_found ->
2483 if !verbose_swarming then
2484 lprintf_nl "select_block: fallback to linear strategy";
2485 linear_select_blocks up
(* this last handler only raises Not_found again *)
2486 with Not_found ->
2488 (* print_s "NO BLOCK FOUND" s; *)
2489 raise Not_found
(** Select a new set of blocks for uploader [up].

    When the file is being downloaded or is queued, the blocks the
    uploader had selected are released (their uploader counters are
    decremented), [select_blocks] picks new ones, their counters are
    incremented and they become the uploader's selection. The result is
    the pair returned by [select_blocks].

    @param up the uploader
    @raise Not_found when the file is in any other state; exceptions
    raised by [select_blocks] are passed on unchanged. *)
let find_blocks up =
  try
    if debug_all then begin
      lprintf "C: ";
      for i = 0 to up.up_ncomplete - 1 do
        lprintf "%d " up.up_complete_blocks.(i)
      done
    end;
    let t = up.up_t in
    let s = t.t_s in
    let release selected =
      let num = selected.up_block.block_num in
      if debug_all then
        lprintf_nl "Client %d unselected %d" (client_num up.up_client) num;
      if s.s_nuploading.(num) > 0 then
        s.s_nuploading.(num) <- s.s_nuploading.(num) - 1
      else
        lprintf_nl "find_blocks: s_nuploading was about to become negative"
    in
    let reserve selected =
      let num = selected.up_block.block_num in
      if debug_all then
        lprintf_nl "Client %d selected %d" (client_num up.up_client) num;
      s.s_nuploading.(num) <- s.s_nuploading.(num) + 1
    in
    match file_state t.t_file with
    | FileDownloading
    | FileQueued ->
        List.iter release up.up_blocks;
        up.up_blocks <- [];
        let (_, blocks) as selection = select_blocks up in
        List.iter reserve blocks;
        up.up_blocks <- blocks;
        selection
    | FilePaused
    | FileAborted _
    | FileCancelled
    | FileShared
    | FileNew
    | FileDownloaded ->
        raise Not_found
  with e ->
    if debug_all then lprintf_nl "Exception %s in find_blocks" (Printexc2.to_string e);
    raise e
(** Remove completed ranges (those whose begin is no longer below their
    end) from the range list of uploader [up], and decrease their
    reference counter. A range left without any uploader gives its size
    back to the unselected byte count of its block. Inconsistent
    counters are only logged. *)
let remove_completed_uploader_ranges up =
  let not_completed_ranges, completed_ranges =
    List.partition (fun (_,_,r) ->
      r.range_begin < r.range_end) up.up_ranges in
  up.up_ranges <- not_completed_ranges;
  List.iter (fun (_,_,r) ->
    if r.range_nuploading > 0 then
      r.range_nuploading <- r.range_nuploading - 1
    else
      (* the literal used to be split over two source lines, which put a
         raw line break in the middle of the logged message *)
      lprintf_nl "remove_completed_uploader_ranges: range_nuploading was about to become negative!";
    if r.range_nuploading = 0 then begin
      let b = r.range_block in
      b.block_unselected_remaining <-
        b.block_unselected_remaining ++ (compute_range_size r);
      if b.block_unselected_remaining > b.block_remaining then
        lprintf_nl "remove_completed_uploader_ranges: block_unselected_remaining is larger than block_remaining"
    end
  ) completed_ranges
(** uploader accessors *)

(** Ranges currently held by uploader [up]. *)
let current_ranges up =
  let held = up.up_ranges in
  held
(** Blocks currently selected by uploader [up].
    @raise Not_found when no block is selected. *)
let current_blocks up =
  let selected = up.up_blocks in
  match selected with
  | _ :: _ -> selected
  | [] -> raise Not_found
(** Check whether range [r] is in [list]. Ranges are compared by
    physical identity, not by content. *)
let in_uploader_ranges r list =
  let same_range (_, _, candidate) = candidate == r in
  List.exists same_range list
(** Set the block selection strategy of the swarmer behind frontend [t]. *)
let set_strategy t strategy =
  let swarmer = t.t_s in
  swarmer.s_strategy <- strategy
(** Block selection strategy of the swarmer behind frontend [t]. *)
let get_strategy t =
  let swarmer = t.t_s in
  swarmer.s_strategy
2581 (*************************************************************************)
2582 (* *)
2583 (* find_range *)
2584 (* *)
2585 (*************************************************************************)
(** [uploader_ranges_fold_left f acc l] folds [f] over the elements of
    [l] from left to right, starting from [acc]. This is exactly
    [List.fold_left], which replaces the hand-written recursion. *)
let uploader_ranges_fold_left f acc l =
  List.fold_left f acc l
2594 (** Find a range to upload from [up], that is at most [range_size]
2595 bytes long (split some range if necessary) *)
(* Is merging at all useful ? Once the next range starts downloading,
   the two can no longer be merged, so it should be rare... *)
(** Whether range selection may gather consecutive ranges into one
    cluster. *)
let allow_merge_ranges = true
(** A run of consecutive ranges considered together by range
    selection. *)
type ranges_cluster = {
  cluster_ranges: range list;
  (** ranges of the cluster, most recently added first *)
  cluster_nuploading: int;
  (** uploader count taken from the first range put in the cluster *)
  cluster_size: Int64.t
  (** sum of the sizes of the ranges *)
}
(** Empty cluster: no range, no uploader, no byte. Used as the starting
    value when looking for the best cluster. *)
let dummy_ranges_cluster = {
  cluster_ranges = [];
  cluster_nuploading = 0;
  cluster_size = 0L
}
(** A cluster is a dummy one when it holds no range at all. *)
let is_dummy_cluster cluster =
  match cluster.cluster_ranges with
  | [] -> true
  | _ :: _ -> false
(* find_range up range_size: reserve for uploader [up] a range of at most
   range_size bytes taken from the blocks it has selected (up.up_blocks).
   Steps visible below:
   - completed ranges of the uploader are dropped first;
   - Not_found is raised when no block is selected, or when the file is
     neither being downloaded nor queued;
   - the ranges of the selected blocks are scanned for the cluster of
     consecutive ranges with the fewest uploaders, the scan stopping at
     the first cluster nobody is uploading;
   - when only already-uploaded ranges are left and less than 98% of the
     file is downloaded, Not_found is raised if another block of
     up.up_complete_blocks still has unselected data;
   - the ranges of the cluster are merged, the result is cut with
     split_range, appended to up.up_ranges, and the range and block
     counters are updated.
   NOTE(review): the expression giving the result of the function is not
   visible after the last statement, and neither are the [in] following
   the first match nor the [end] closing [if partial_found then begin].
   Very short lines seem to have been lost when the file was extracted;
   confirm against the upstream file. *)
2616 let find_range up range_size =
2618 (** merge two consecutive ranges in the first, if possible;
2619 Return true if successful *)
2620 let merge_ranges r r2 =
2621 match r.range_next with
2622 | None -> false
2623 | Some rr ->
(* r2 must be the direct successor of r, must not start after the end of
   r, and must not be uploaded by anybody *)
2624 if rr != r2 ||
2625 r.range_end < r2.range_begin ||
2626 r2.range_nuploading > 0 then false
2627 else begin
2628 r.range_end <- r2.range_end;
2629 r.range_next <- r2.range_next;
2630 (match r.range_next with
2631 | None -> ()
2632 | Some r3 ->
2633 r3.range_prev <- Some r);
2634 true
2635 end in
2637 remove_completed_uploader_ranges up;
2639 let b, more_blocks =
2640 match up.up_blocks with
2641 | [] ->
2642 if debug_all then
2643 lprintf_nl "find_range: uploader had no block selected";
2644 raise Not_found
2645 | b :: more_blocks -> b, more_blocks
2647 let t = up.up_t in
2648 match file_state t.t_file with
2649 | FilePaused
2650 | FileAborted _
2651 | FileCancelled
2652 | FileShared
2653 | FileNew
2654 | FileDownloaded ->
2655 lprintf_nl "find_range: file %s in bad state %s"
2656 t.t_s.s_filename (string_of_state (file_state t.t_file));
2657 raise Not_found
2658 | FileDownloading
2659 | FileQueued ->
2660 if debug_all then
2661 lprintf_nl "find_range: is there a range of size %Ld in %s for %d ?"
2662 range_size
2663 (String.concat " " (List.map (fun b ->
2664 Printf.sprintf "[%Ld-%Ld]" b.up_block_begin b.up_block_end
2665 ) up.up_blocks))
2666 (client_num up.up_client);
2667 (* pick the first correct cluster with fewest uploaders
2668 We're not trying to get a range that's at least as big as
2669 [range_size] bytes - that would prevent partially downloaded
2670 ranges from being completed first *)
(* iter acc r b more_blocks: acc is the best cluster so far, r the range
   being examined, b the uploader block r belongs to, more_blocks the
   selected blocks still to be scanned *)
2671 let rec iter acc r b more_blocks =
(* a range qualifies when the uploader does not hold it already, it is
   not empty, and it starts inside the span the uploader may use in b *)
2672 let correct_range r =
2673 not (in_uploader_ranges r up.up_ranges) &&
2674 r.range_begin < r.range_end &&
2675 r.range_begin >= b.up_block_begin &&
2676 r.range_begin < b.up_block_end in
2677 let best_cluster =
2678 if not (correct_range r) then acc
2679 else
2680 (* find if there are ranges to merge ahead *)
(* ranges are consed, so cluster_ranges ends up in reverse order *)
2681 let rec iter_cluster r cluster =
2682 let cluster = { cluster with
2683 cluster_ranges = r :: cluster.cluster_ranges;
2684 cluster_size = cluster.cluster_size ++
2685 (compute_range_size r)
2686 } in
2687 if not allow_merge_ranges ||
2688 cluster.cluster_size >= range_size then cluster
2689 else
2690 match r.range_next with
2691 | None -> cluster
2692 | Some rr ->
2693 if rr.range_begin = r.range_end &&
2694 correct_range rr && rr.range_nuploading = 0 then
2695 iter_cluster rr cluster
2696 else cluster in
2698 let cluster =
2699 iter_cluster r { dummy_ranges_cluster with
2700 cluster_nuploading = r.range_nuploading } in
2701 if debug_all then
2702 lprint_newline ();
2703 if is_dummy_cluster acc then cluster
2704 else
2705 (* find a range with as few uploaders as possible *)
(* on a tie the more recently examined cluster replaces acc *)
2706 let cmp = compare acc.cluster_nuploading
2707 cluster.cluster_nuploading in
2708 if cmp < 0 then acc
2709 else cluster in
2711 (* fast exit, and why I didn't use an iterator :/
2712 Could have used an exception, but I don't like that ;) *)
2713 if not (is_dummy_cluster best_cluster) &&
2714 best_cluster.cluster_nuploading = 0 then b, best_cluster
2715 else
2716 match r.range_next with
2717 | Some rr -> iter best_cluster rr b more_blocks
2718 | None ->
2719 match more_blocks with
2720 | [] -> b, best_cluster
2721 | b :: more_blocks ->
2722 if debug_all || !verbose then
2723 lprintf_nl "find_range: Client %d, next block"
2724 (client_num up.up_client);
2725 iter best_cluster b.up_block.block_ranges b more_blocks in
2727 let best_block, best_cluster =
2728 iter dummy_ranges_cluster b.up_block.block_ranges b more_blocks in
2729 if not (is_dummy_cluster best_cluster) &&
2730 best_cluster.cluster_nuploading > 0 &&
2731 (file_downloaded t.t_file < file_size t.t_file ** 98L // 100L) then begin
2732 (* it seems they're only sucky choices left on that block, is
2733 there really nothing else better elsewhere ? *)
(* look, among the complete blocks of the uploader that are not selected
   yet, for one that is still wanted and has data nobody asked for *)
2734 let s = b.up_block.block_s in
2735 for i = 0 to up.up_ncomplete - 1 do
2736 let block = up.up_complete_blocks.(i) in
2737 if not (List.exists (fun b -> b.up_block.block_num = block
2738 ) up.up_blocks) then
2739 if s.s_priorities_bitmap.[block] <> priority_zero &&
2740 should_download_block s block then
2741 let partial_found = match s.s_blocks.(block) with
2742 | EmptyBlock -> true
2743 | CompleteBlock | VerifiedBlock -> false
2744 | PartialBlock b -> b.block_unselected_remaining > 0L in
2745 if partial_found then begin
2747 if debug_all || !verbose then
2748 lprintf_nl "find_range: Client %d better switch cluster now!"
2749 (client_num up.up_client);
(* gives up on this selection: no range is reserved *)
2751 raise Not_found
2753 done
2754 end;
(* the cluster was built in reverse order: put its first range back in
   front before merging the others into it *)
2755 match List.rev best_cluster.cluster_ranges with
2756 | [] ->
2757 if debug_all then
2758 lprintf_nl "find_range: no correct range found!";
2759 raise Not_found
2760 | r :: q ->
2761 if not (List.for_all (merge_ranges r) q) then
2762 lprintf_nl "find_range: ranges did not merge as well as planned";
(* cut r at range_size bytes, or at the end of the usable span of the
   block if that comes first *)
2763 split_range r (min (r.range_begin ++ range_size)
2764 best_block.up_block_end);
2765 if debug_all then begin
2766 lprintf "=> ";
2767 iter_block_ranges (fun rr ->
2768 let selected = if rr == r then "*" else "" in
2769 lprintf " %s[%Ld-%Ld]:%d%s" selected
2770 rr.range_begin rr.range_end rr.range_nuploading
2771 selected
2772 ) best_block.up_block;
2773 lprint_newline ();
2774 end;
2775 let key = r.range_begin, r.range_end, r in
2776 up.up_ranges <- up.up_ranges @ [key];
(* first uploader on this range: its bytes stop counting as unselected *)
2777 if r.range_nuploading = 0 then begin
2778 let b = r.range_block in
2779 b.block_unselected_remaining <-
2780 b.block_unselected_remaining -- (compute_range_size r);
2781 if b.block_unselected_remaining < 0L then
2782 lprintf_nl "find_range: block_unselected_remaining is negative!";
2783 end;
2784 r.range_nuploading <- r.range_nuploading + 1;
(** range accessor(s) *)

(** Bounds of range [r], as a (begin, end) pair. *)
let range_range r =
  let first = r.range_begin and last = r.range_end in
  (first, last)
(** Data has been received from uploader [up]. Transfer data to file
    and update uploader ranges.
    Data = String.sub [str] [string_begin] [string_len], to be stored
    from file position [file_begin].

    Nothing happens when [string_len] is zero. Parts of the data that
    fall outside the ranges held by the uploader are reported through
    [debug_bad_write]. Data is written only while the file is in the
    downloading state, and only for ranges that begin inside the
    received span; each write goes to the file of the first (primary)
    network and is reported to [range_received]. Completed ranges are
    then removed from the uploader.

    Exceptions raised while writing are logged and raised again, after
    the completed ranges have been removed. *)
let received up file_begin str string_begin string_len =
  assert (string_begin >= 0);
  assert (string_len >= 0);
  assert (string_begin + string_len <= String.length str);

  let t = up.up_t in
  let s = t.t_s in

  (* log one range held by the uploader, with its neighbours and block *)
  let describe_range (_, _, r) =
    lprintf_n " range: %Ld-%Ld"
      r.range_begin
      r.range_end;
    (match r.range_next with
     | None -> ()
     | Some rr ->
         lprintf " next: %Ld" rr.range_begin);
    (match r.range_prev with
     | None -> ()
     | Some rr ->
         lprintf " prev: %Ld" rr.range_begin);
    lprint_newline ();
    let b = r.range_block in
    let block_state =
      match s.s_blocks.(b.block_num) with
      | EmptyBlock -> "empty"
      | PartialBlock _ -> "partial"
      | CompleteBlock -> "complete"
      | VerifiedBlock -> "verified"
    in
    lprintf_n " block: %d[%c] %Ld-%Ld [%s]"
      b.block_num
      (VB.state_to_char (VB.get s.s_verified_bitmap b.block_num))
      b.block_begin b.block_end
      block_state;
    let br = b.block_ranges in
    lprintf " first range: %Ld(%Ld)"
      br.range_begin
      (compute_range_size br);
    lprint_newline ()
  in

  let debug_bad_write unexpected_intervals =
    if !verbose_swarming then begin
      lprintf "Dismiss unwanted data from %d for %s:"
        (client_num up.up_client) (file_best_name t.t_file);
      List.iter (fun (i_begin, i_end) ->
        lprintf " %Ld-%Ld" i_begin i_end) unexpected_intervals;
      lprint_newline ();
      lprintf_nl " received: file_pos:%Ld string:%d %d"
        file_begin string_begin string_len;
      lprintf_nl " ranges:";
      List.iter describe_range up.up_ranges
    end;
    if !exit_on_error then exit 2
  in

  if string_len > 0 then begin
    let file_end = file_begin ++ (Int64.of_int string_len) in
    if !verbose_swarming then
      lprintf_nl "received on %Ld-%Ld" file_begin file_end;

    (* DEBUG *)
    (* [subtract intervals cut_begin cut_end] removes the span
       cut_begin..cut_end from a sorted list of intervals *)
    let subtract intervals cut_begin cut_end =
      let rec scan kept pending =
        match pending with
        | [] -> List.rev kept
        | ((fi_begin, fi_end) as first_interval) :: others ->
            if fi_begin >= cut_end then
              List.rev_append kept pending
            else if fi_end <= cut_begin then
              scan (first_interval :: kept) others
            else begin
              let kept =
                if fi_begin < cut_begin then
                  (fi_begin, cut_begin) :: kept
                else kept in
              let kept =
                if fi_end > cut_end then
                  (cut_end, fi_end) :: kept
                else kept in
              scan kept others
            end
      in
      scan [] intervals
    in
    let intervals_out =
      List.fold_left (fun acc (_, _, r) ->
        subtract acc r.range_begin r.range_end
      ) [(file_begin, file_end)] up.up_ranges in
    if intervals_out <> [] then
      debug_bad_write intervals_out;

    let file_end = min file_end s.s_size in

    match file_state t.t_file with
    | FilePaused
    | FileAborted _
    | FileCancelled
    | FileShared
    | FileNew
    | FileQueued
    | FileDownloaded ->
        if !verbose then
          lprintf_nl "received: wrong file state %s for %s" (string_of_state (file_state t.t_file)) s.s_filename
    | FileDownloading ->
        try
          List.iter (fun (_, _, r) ->
            (* was: r.range_begin < file_end && r.range_end > file_begin *)
            if r.range_begin >= file_begin &&
               r.range_begin < file_end then begin
              let write_begin = r.range_begin in
              let write_end = min file_end r.range_end in
              let string_pos = string_begin +
                  Int64.to_int (write_begin -- file_begin) in
              let string_length = Int64.to_int (write_end -- write_begin) in
              if string_length > 0 then
                match s.s_networks with
                | [] -> assert false
                | tprim :: _ ->
                    assert (tprim.t_primary);
                    (try
                       preallocate_disk_space tprim
                         write_begin write_end
                     with
                       End_of_file -> ()
                     | e ->
                         lprintf_nl "Exception %s while preallocating disk space [%Ld-%Ld] for %s"
                           (Printexc2.to_string e)
                           write_begin write_end
                           (file_best_name t.t_file));
                    file_write tprim.t_file
                      write_begin
                      str string_pos string_length;
                    range_received (Some t) r write_begin write_end
            end
          ) up.up_ranges;
          remove_completed_uploader_ranges up
        with e ->
          lprintf_nl "Exception %s while receiving data"
            (Printexc2.to_string e);
          remove_completed_uploader_ranges up;
          raise e
  end
(** Compute the list of [(begin, end)] intervals of swarmer [s] whose data
    is present, in increasing order.

    Complete and verified blocks are counted entirely.  Inside a partial
    block, the ranges describe data that is still missing, so the present
    data is what lies between the block start, the successive ranges and
    the block end.  Empty intervals are dropped and touching intervals are
    merged. *)
let present_intervals s =
  (* [rev_acc] is the result built so far, most recent interval first. *)
  let push rev_acc ((lo, hi) as candidate) =
    if lo = hi then rev_acc (* void interval: nothing to record *)
    else
      match rev_acc with
      | (prev_lo, prev_hi) :: rest when prev_hi >= lo ->
          (* touches or overlaps the previous interval: extend it *)
          (prev_lo, hi) :: rest
      | _ -> candidate :: rev_acc in

  let collect rev_acc i _ =
    match s.s_blocks.(i) with
    | EmptyBlock -> rev_acc
    | CompleteBlock | VerifiedBlock ->
        push rev_acc (compute_block_begin s i, compute_block_end s i)
    | PartialBlock partial ->
        (* [cursor] is the end of the last range seen, i.e. where the
           next stretch of present data may start *)
        let rev_acc, cursor =
          block_ranges_fold (fun (rev_acc, cursor) r ->
            (push rev_acc (cursor, r.range_begin), r.range_end)
          ) (rev_acc, compute_block_begin s i) partial in
        push rev_acc (cursor, compute_block_end s i) in

  List.rev (Array2.fold_lefti collect [] s.s_blocks)
2958 (*************************************************************************)
2959 (* *)
2960 (* propagate_chunk *)
2961 (* *)
2962 (*************************************************************************)
(** Key identifying the content of a chunk, used by [duplicate_chunks] to
    index chunks in a hash table. *)
2964 type chunk = {
(* uid of the chunk, taken from the uids array of a frontend verifier *)
2965 chunk_uid : uid_type;
(* size of the chunk in bytes *)
2966 chunk_size : int64;
(** One place where a given chunk is expected to live. *)
2969 type chunk_occurrence = t * int * Int64.t (* frontend, chunk number, offset *)
(** All the known occurrences of one chunk, split by whether the data is
    already there.  [duplicate_chunks] fills [occurrence_present] with the
    occurrences whose state is verified, and [occurrence_missing] with those
    whose state is missing or partial. *)
2971 type chunk_occurrences = {
2972 mutable occurrence_present : chunk_occurrence list;
2973 mutable occurrence_missing : chunk_occurrence list;
(** Copy [size] bytes starting at offset [pos1] of the file of frontend
    [t1] to every destination [(t2, j2, pos2)], then call
    [set_frontend_state_complete t2 j2].  A destination designating the
    source itself (same frontend number and same offset) is skipped. *)
let propagate_chunk t1 pos1 size destinations =
  let copy_to (t2, j2, pos2) =
    let is_source = t1.t_num = t2.t_num && pos1 = pos2 in
    if not is_source then begin
      if !verbose then
        lprintf_nl "Should propagate chunk from %s %Ld to %s %Ld [%Ld]"
          (file_best_name t1.t_file) pos1
          (file_best_name t2.t_file) pos2 size;
      Unix32.copy_chunk (file_fd t1.t_file) (file_fd t2.t_file)
        pos1 pos2 (Int64.to_int size);
      set_frontend_state_complete t2 j2
    end in
  List.iter copy_to destinations
(** Fresh occurrences record holding neither present nor missing
    occurrences. *)
let dummy_chunk_occurrences () =
  let none : chunk_occurrence list = [] in
  { occurrence_present = none; occurrence_missing = none }
(** Look for chunks with identical uid and size across all swarmers and
    frontends, and copy verified data over occurrences that still miss it.

    Only frontends whose verifier is [Verification uids], with exactly one
    uid per chunk, take part.  Occurrences in state complete (not yet
    verified) are neither used as a source nor overwritten. *)
let duplicate_chunks () =
  let chunks = Hashtbl.create 100 in

  (* occurrences record registered for [c], created on first use *)
  let occurrences_of c =
    try
      Hashtbl.find chunks c
    with Not_found ->
      let fresh = dummy_chunk_occurrences () in
      Hashtbl.add chunks c fresh;
      fresh in

  (* register every chunk of frontend [t] (swarmer [s]) in [chunks] *)
  let register_frontend s t uids =
    let len = VB.length t.t_converted_verified_bitmap in
    let rec scan j pos =
      if j < len then begin
        let c = {
          chunk_uid = uids.(j);
          (* the last chunk may be shorter than the nominal chunk size *)
          chunk_size = min (s.s_size -- pos) t.t_chunk_size;
        } in
        let occ = occurrences_of c in
        (match VB.get t.t_converted_verified_bitmap j with
         | VB.State_missing | VB.State_partial ->
             occ.occurrence_missing <- (t, j, pos) :: occ.occurrence_missing
         | VB.State_complete -> ()
         | VB.State_verified ->
             occ.occurrence_present <- (t, j, pos) :: occ.occurrence_present);
        scan (j + 1) (pos ++ t.t_chunk_size)
      end in
    scan 0 zero in

  HS.iter (fun s ->
    List.iter (fun t ->
      let nchunks = VB.length t.t_converted_verified_bitmap in
      match t.t_verifier with
      | Verification uids when Array.length uids = nchunks ->
          register_frontend s t uids
      | _ -> ()
    ) s.s_networks
  ) swarmers_by_name;

  Hashtbl.iter (fun c occ ->
    (* we need a verified chunk to copy over the others *)
    match occ.occurrence_present, occ.occurrence_missing with
    | _, []
    | [], _ -> ()
    | (source, _, pos) :: _, missing ->
        propagate_chunk source pos c.chunk_size missing
  ) chunks
(** Install [f] as the verifier of frontend [t], then re-apply the current
    converted verified bitmap of [t] through [set_chunks_verified_bitmap]. *)
let set_verifier t f =
  t.t_verifier <- f;
  (* TODO: check that false as t_primary is a good value to start with *)
  let bitmap = t.t_converted_verified_bitmap in
  set_chunks_verified_bitmap t bitmap
(** Store [f] in the [t_verified] field of frontend [t]. *)
3039 let set_verified t f =
3040 t.t_verified <- f
(** Value of [file_downloaded] for the file of frontend [t]. *)
3042 let downloaded t = file_downloaded t.t_file
(** Entry of [t.t_chunk_of_block] for block [b], i.e. the chunk of
    frontend [t] that block [b] maps to. *)
3044 let block_chunk_num t b =
3045 t.t_chunk_of_block.(b.block_num)
(** Number of chunks of frontend [t] (length of its converted verified
    bitmap). *)
3047 let partition_size t = VB.length t.t_converted_verified_bitmap
(** Frontend referenced by uploader [up]. *)
3049 let uploader_swarmer up = up.up_t
(** Return the availability of the chunks of [t] as a string, one
    character per chunk.  The availability of a chunk is the minimum of
    the availabilities of its blocks, clamped to the range 0..200 and
    encoded as the character with that code. *)
let chunks_availability t =
  let s = t.t_s in
  let clamp v =
    if v < 0 then 0
    else if v > 200 then 200
    else v in
  let char_of_chunk chunk =
    let per_block =
      List.map (fun block -> s.s_availability.(block))
        t.t_blocks_of_chunk.(chunk) in
    char_of_int (clamp (List2.min per_block)) in
  String2.init (partition_size t) char_of_chunk
(** [true] when uploader [up] has at least one complete or one partial
    block recorded in its counters [up_ncomplete] / [up_npartial]. *)
3062 let is_interesting up =
3063 up.up_ncomplete > 0 || up.up_npartial > 0
3066 (*************************************************************************)
3067 (* *)
3068 (* value_to_int64_pair (internal) *)
3069 (* *)
3070 (*************************************************************************)
(** Decode an option value made of exactly two elements into a pair of
    int64.  Raises [Failure] for any other shape of value. *)
let value_to_int64_pair = function
  | List [first; second] | SmallList [first; second] ->
      (value_to_int64 first, value_to_int64 second)
  | _ ->
      failwith "Options: Not an int64 pair"
3079 (*************************************************************************)
3080 (* *)
3081 (* WRAPPERS *)
3082 (* *)
3083 (*************************************************************************)
(* From here on these names take a frontend [t] instead of a swarmer:
   each wrapper shadows the swarmer-level function of the same name
   (or, for print_t, wraps print_s) and forwards to it with [t.t_s]. *)
3085 let set_present t = set_present t.t_s
3086 let set_absent t = set_absent t.t_s
3087 let present_intervals t = present_intervals t.t_s
3088 let print_t str t = print_s str t.t_s
3089 let print_uploaders t = print_uploaders t.t_s
3091 (*************************************************************************)
3092 (* *)
3093 (* value_to_frontend *)
3094 (* *)
3095 (*************************************************************************)
(** Restore the state of frontend [t] from [assocs], an association list
    of option names to option values (the entries written by
    frontend_to_value).  In order: attach [t] to the swarmer named by
    "file_swarmer" if one is registered, load the chunks bitmap, and, for
    a primary frontend, load the present intervals and cross-check the
    stored "file_downloaded" amount. *)
3097 let value_to_frontend t assocs =
(* Report a mismatch between the stored downloaded amount [d] and the
   amount recomputed after loading [present]; exits with status 2 when
   exit_on_error is set. *)
3099 let debug_wrong_downloaded t present d =
3100 lprintf_nl "ERROR: stored downloaded value not restored !!! (%Ld/%Ld)" (downloaded t) d;
3101 lprintf_nl "ERROR: present:";
3102 List.iter (fun (x,y) ->
3103 lprintf_nl " (%Ld,%Ld);" x y
3104 ) present;
3106 let p = present_intervals t in
3107 lprintf_nl "ERROR: present now:";
3109 let total =
3110 List.fold_left (fun acc (x,y) ->
3111 lprintf_nl " (%Ld,%Ld);" x y;
3112 acc ++ (y -- x)
3113 ) zero p in
3115 lprintf_nl "ERROR: total %Ld" total;
3116 if p = present then begin
3117 lprintf_nl "ERROR: both appear to be the same!";
3118 end;
3119 if !exit_on_error then exit 2 in
(* Look up [name] in [assocs] and decode it with [conv]; List.assoc
   raises Not_found when the entry is absent. *)
3121 let get_value name conv = conv (List.assoc name assocs) in
(* A frontend is considered primary when the entry is absent or cannot
   be decoded. *)
3123 let primary =
3124 try get_value "file_primary" value_to_bool with _ -> true in
(* Attach [t] to its saved swarmer; nothing happens when the entry or the
   swarmer is not found. *)
3126 (try
3127 let file_name = get_value "file_swarmer" value_to_string in
3128 let s = HS.find swarmers_by_name
3129 { dummy_swarmer with s_filename = file_name } in
3130 associate primary t s
3131 (* TODO: make as many checks as possible to ensure the file and the swarmers
3132 are correctly associated. *)
3133 with Not_found -> ());
(* Compare the saved mtime with the current one (0. when it cannot be
   read); the result of the comparison is discarded. *)
3135 let _ =
3136 let mtime = try file_mtime t.t_file with _ -> 0. in
3137 let old_mtime =
3139 value_to_float (List.assoc "file_mtime" assocs)
3140 with Not_found -> mtime
3142 old_mtime = mtime
(* Load the chunks bitmap from "file_chunks", falling back to
   "file_all_chunks"; on any other failure every chunk is set to
   State_complete. *)
3145 (try
3147 set_chunks_verified_bitmap t
3148 (VB.of_string (get_value "file_chunks" value_to_string))
3149 with Not_found ->
3150 set_chunks_verified_bitmap t
3151 (VB.of_string (get_value "file_all_chunks" value_to_string))
3153 with e ->
3154 lprintf_nl "Exception %s while loading bitmap"
3155 (Printexc2.to_string e);
3156 (* force everything to be checked ASAP ? *)
3157 set_chunks_verified_bitmap t (VB.create (partition_size t) VB.State_complete)
3161 lprintf "set_verified_bitmap: t = %s\n" t.t_converted_verified_bitmap;
3162 lprintf "set_verified_bitmap: s = %s\n" t.t_s.s_verified_bitmap;
(* Only a primary frontend restores the present intervals; when that
   fails, verify_all_chunks is called instead. *)
3165 if primary then begin
3166 if !verbose_swarming then lprintf_nl "Loading present...";
3167 let present = try
3168 let present =
3169 (get_value "file_present_chunks"
3170 (value_to_list value_to_int64_pair))
3172 set_present t present;
3173 present
3174 with e ->
3175 lprintf_nl "Exception %s while set present"
3176 (Printexc2.to_string e);
3177 verify_all_chunks t;
3180 if !verbose_swarming then lprintf_nl "Downloaded after present %Ld" (downloaded t);
(* The stored amount is only cross-checked in verbose mode. *)
3182 (try
3183 let d = get_value "file_downloaded" value_to_int64 in
3184 if d <> downloaded t && !verbose then
3185 debug_wrong_downloaded t present d
3186 with Not_found -> ());
3187 end;
3189 (* TODO re-implement this
3190 (try
3191 let last_seen = get_value "file_chunks_age"
3192 (value_to_list value_to_int) in
3193 t.t_last_seen <- Array.of_list last_seen
3194 with _ -> ());
3199 (*************************************************************************)
3200 (* *)
3201 (* frontend_to_value *)
3202 (* *)
3203 (*************************************************************************)
(** Serialize frontend [t] as an association list of option values,
    followed by [other_vals].  The present intervals are only saved for a
    primary frontend.  The file mtime is saved as [0.] when it cannot be
    read. *)
let frontend_to_value t other_vals =
  let mtime = try file_mtime t.t_file with _ -> 0. in
  let identity =
    [("file_primary", bool_to_value t.t_primary);
     ("file_swarmer", string_to_value t.t_s.s_filename);
     ("file_mtime", float_to_value mtime);
     ("file_chunks", string_to_value (VB.to_string (chunks_verified_bitmap t)))] in
  let present =
    if t.t_primary then
      let interval_to_value (i1, i2) =
        SmallList [int64_to_value i1; int64_to_value i2] in
      [("file_present_chunks",
        List (List.map interval_to_value (present_intervals t)))]
    else [] in
  let progress =
    [("file_downloaded", int64_to_value (downloaded t));
     ("file_chunks_age", List (Array.to_list
                                (Array.map int_to_value t.t_last_seen)))] in
  identity @ present @ progress @ other_vals
(** Verify one chunk of swarmer [s], if any frontend of that swarmer has
    a chunk in state complete.  Stops at the first such chunk and returns
    [true]; returns [false] when no frontend has anything to verify. *)
let verify_one_chunk s =
  let verify_first_complete t =
    VB.existsi (fun i state ->
      match state with
      | VB.State_complete -> verify_chunk t i; true
      | VB.State_missing | VB.State_partial | VB.State_verified -> false
    ) t.t_converted_verified_bitmap in
  List.exists verify_first_complete s.s_networks
(** Verify one chunk of each swarmer that needs it.  An exception raised
    while handling one swarmer is ignored so that the remaining swarmers
    are still visited. *)
let verify_some_chunks () =
  let verify_quietly s =
    try
      ignore (verify_one_chunk s)
    with _ -> () in
  HS.iter verify_quietly swarmers_by_name
(** Verify one chunk of the swarmer associated with [t], if needed.
    Frontend-level variant: the boolean result of the swarmer-level
    function is dropped. *)
let verify_one_chunk t =
  let _verified_something = verify_one_chunk t.t_s in
  ()
(** Merge a second frontend [f2] into a first one [f1], so that they share
    the same swarmer.

    The first file [f1] must support some hashing scheme.  Data of the
    second file [f2] is currently lost during merging, so it is better to
    merge early.  Merging is denied if any of the two files is being
    used, so it may be necessary to pause them first to get rid of any
    downloads.

    Raises [Not_found] when one of the files has no registered swarmer and
    [Failure] when the merge is refused. *)
let merge f1 f2 =
  let s1 =
    HS.find swarmers_by_name
      { dummy_swarmer with s_filename = file_disk_name f1 } in
  let s2 =
    HS.find swarmers_by_name
      { dummy_swarmer with s_filename = file_disk_name f2 } in

  if s1.s_filename = s2.s_filename then
    failwith "Files are already sharing their swarmer";

  if s1.s_size <> s2.s_size then
    failwith "Files don't have the same size";

  (* the second swarmer must have exactly one frontend *)
  let t2 =
    match s2.s_networks with
    | [only] -> only
    | list ->
        lprintf_nl "s_networks: %d files" (List.length list);
        failwith "Second file is already merged with other files" in

  (* the head frontend of the first swarmer must be able to verify data *)
  let t1 =
    match s1.s_networks with
    | [] -> assert false
    | head :: _ ->
        (match head.t_verifier with
         | NoVerification | VerificationNotAvailable ->
             failwith "Cannot use first file as a primary for swarming (no verification scheme)"
         | Verification _ | ForceVerification -> head) in

  (* refuse to merge while any block is being downloaded *)
  let ensure_idle (s, filename) =
    Array.iter (fun nuploading ->
      if nuploading > 0 then
        failwith (Printf.sprintf "%s is currently being downloaded" filename)
    ) s.s_nuploading in
  ensure_idle (s1, "First file");
  ensure_idle (s2, "Second file");

  (* replace T2 swarmer *)
  associate false t2 t1.t_s
3301 (*************************************************************************)
3302 (* *)
3303 (* has_secondaries *)
3304 (* *)
3305 (*************************************************************************)
(** [true] when [t] is a primary frontend whose swarmer is shared with at
    least one other frontend. *)
let has_secondaries t =
  t.t_primary &&
  (match t.t_s.s_networks with
   | _ :: _ :: _ -> true
   | [] | [_] -> false)
3310 (*************************************************************************)
3311 (* *)
3312 (* Remove swarmer *)
3313 (* *)
3314 (*************************************************************************)
(** Unregister the swarmer of the given optional frontend from
    [swarmers_by_name].  Nothing is removed (only a message is logged)
    when the frontend still has secondaries; [None] is a no-op. *)
let remove_swarmer file_swarmer =
  match file_swarmer with
  | None -> ()
  | Some sw when has_secondaries sw ->
      lprintf_nl "Tried to remove swarmer with secondaries"
  | Some sw ->
      HS.remove swarmers_by_name sw.t_s
3323 (*************************************************************************)
3324 (* *)
3325 (* subfiles *)
3326 (* *)
3327 (*************************************************************************)
(** Files of all the frontends sharing the swarmer of [t], in the order
    of the swarmer's frontend list. *)
let subfiles t =
  let frontends = t.t_s.s_networks in
  List.map (fun frontend -> frontend.t_file) frontends
3332 (*************************************************************************)
3333 (* *)
3334 (* SwarmerOption *)
3335 (* *)
3336 (*************************************************************************)
(** Option class used to save and load swarmers: conversions between a
    swarmer and its option value, plus the resulting option class [t]. *)
3338 module SwarmerOption = struct
(* Decode a two-element value into a pair (int64 offset, int priority);
   raises Failure for any other shape. *)
3340 let value_to_priority_interval v =
3341 match v with
3342 | List [v1;p] | SmallList [v1;p] ->
3343 (value_to_int64 v1, value_to_int p)
3344 | _ ->
3345 failwith "Options: Not a priority interval"
(* Rebuild a swarmer from a Module value.  "file_size", "file_name" and
   "file_chunk_sizes" are required (List.assoc raises Not_found when one
   is absent); the other entries fall back to defaults. *)
3347 let value_to_swarmer v =
3348 match v with
3349 | Module assocs ->
3350 let get_value name conv = conv (List.assoc name assocs) in
3351 let file_size = get_value "file_size" value_to_int64 in
3352 let file_name = get_value "file_name" value_to_string in
3353 let s = create_swarmer file_name file_size in
(* download strategy: AdvancedStrategy unless "file_download_random" is
   present and false *)
3354 (let order =
3356 get_value "file_download_random" value_to_bool
3357 with _ -> true
3359 s.s_strategy <- if order then AdvancedStrategy else LinearStrategy);
(* the saved allocation bitmap is only used when its length matches *)
3360 (try
3361 let bitmap = Bitv.of_string (get_value "file_disk_allocation_bitmap"
3362 value_to_string) in
3363 if Bitv.length bitmap = Bitv.length s.s_disk_allocated then
3364 s.s_disk_allocated <- bitmap
3365 with _ -> ());
3366 (* s_disk_allocated missing or inconsistent ?
3367 set_present will fix it *)
(* split the swarmer blocks once per saved chunk size *)
3368 let block_sizes = get_value "file_chunk_sizes"
3369 (value_to_list value_to_int64) in
3370 List.iter (fun bsize ->
3371 split_blocks s bsize
3372 ) block_sizes;
(* priority intervals: default is a single interval starting at offset
   zero with priority 1 *)
3373 let intervals =
3375 get_value "file_priorities_intervals" (value_to_list value_to_priority_interval)
3376 with Not_found -> [(zero, 1)]
3378 if validate_intervals s.s_size intervals then
3379 s.s_priorities_intervals <- intervals
3380 else
3381 lprintf_nl "Failed to validate priority intervals, using default. File %s" file_name;
3382 swarmer_recompute_priorities_bitmap s;
3385 | _ -> assert false
(* Serialize swarmer [s] as a Module value; "file_chunk_sizes" lists the
   chunk size of every frontend attached to [s]. *)
3387 let swarmer_to_value s =
3388 Module [
3389 ("file_size", int64_to_value s.s_size);
3390 ("file_name", string_to_value s.s_filename);
3391 ("file_disk_allocation_bitmap", string_to_value
3392 (Bitv.to_string s.s_disk_allocated));
3393 ("file_chunk_sizes", list_to_value int64_to_value
3394 (List.map (fun t -> t.t_chunk_size) s.s_networks));
3395 ("file_priorities_intervals", List
3396 (List.map
3397 (fun (i_begin, priority) -> SmallList [int64_to_value i_begin; int_to_value priority])
3398 s.s_priorities_intervals));
3399 ("file_download_random", bool_to_value
3400 (match s.s_strategy with
3401 | AdvancedStrategy -> true
3402 | LinearStrategy -> false));
(* option class named "Swarmer" built from the two conversions above *)
3405 let t =
3406 define_option_class "Swarmer" value_to_swarmer swarmer_to_value
(** Check most invariants of swarmer [s], notably the consistency of the
    verification bitmaps between the swarmer, its primary frontend (head
    of [s.s_networks]) and its secondary frontends.

    When a problem is found, the swarmer is printed with [print_s] and the
    exception ([Failure] or [Assert_failure]) is re-raised.  A swarmer
    without any frontend is only reported in the log. *)
let check_swarmer s =
  let block_verified j =
    VB.get s.s_verified_bitmap j = VB.State_verified in
  let some_block_unverified blocks =
    List.exists (fun j -> not (block_verified j)) blocks in

  (* primary chunk verified <=> all its blocks verified in the swarmer,
     and no block may be verified unless its primary chunk is *)
  let check_primary tprim =
    VB.iteri (fun i c ->
      let blocks = tprim.t_blocks_of_chunk.(i) in
      if c = VB.State_verified then begin
        if some_block_unverified blocks then
          failwith "Bad propagation of State_verified from primary to swarmer"
      end
      else if List.exists block_verified blocks then
        failwith "Swarmer has State_verified not coming from primary"
    ) tprim.t_converted_verified_bitmap in

  (* a secondary may only hold verified or complete chunks whose blocks
     are all verified in the swarmer, and must share the primary file
     descriptor [fd] *)
  let check_secondary fd t =
    assert (not t.t_primary);
    assert (file_fd t.t_file == fd);
    VB.iteri (fun i c ->
      let require_verified_blocks msg =
        if some_block_unverified t.t_blocks_of_chunk.(i) then failwith msg in
      if c = VB.State_verified then
        require_verified_blocks
          "State_verified in secondary without State_verified in primary"
      else if c = VB.State_complete then
        require_verified_blocks
          "State_complete in secondary without State_verified in primary"
    ) t.t_converted_verified_bitmap in

  try
    match s.s_networks with
    | [] ->
        lprintf_nl "found unused swarmer %s, discarding" s.s_filename
    | tprim :: tail ->
        assert (tprim.t_primary);
        check_primary tprim;
        let fd = file_fd tprim.t_file in
        List.iter (check_secondary fd) tail
  with e ->
    print_s "ERROR" s;
    raise e
3454 (*************************************************************************)
3455 (* *)
3456 (* Option swarmers *)
3457 (* *)
3458 (*************************************************************************)
(** Option "swarmers" of the swarmers section: a list of swarmers encoded
    with [SwarmerOption.t], empty by default. *)
3460 let swarmers =
3461 define_option CommonComplexOptions.swarmers_section
3462 ["swarmers"] "All the swarmers used" (list_option SwarmerOption.t) []
3464 (*************************************************************************)
3465 (* *)
3466 (* Options hooks *)
3467 (* *)
3468 (*************************************************************************)
(* Hooks installed on files_ini so that the [swarmers] option is only
   populated around a save or a load. *)
3470 let _ =
(* once saved, the option list is emptied again *)
3471 set_after_save_hook files_ini (fun _ -> swarmers =:= []);
(* before saving, export every swarmer that still has at least one
   frontend *)
3472 set_before_save_hook files_ini (fun _ ->
3473 let list = ref [] in
3474 HS.iter (fun s ->
3475 if s.s_networks <> [] then
3476 list := s :: !list) swarmers_by_name;
3477 swarmers =:= !list;
3478 (* put primary frontends to the head, so that swarmers' invariants
3479 can be verified while downloads are being restored from ini files *)
3480 let primary_files, secondary_files =
3481 List.partition (fun file ->
3482 match file_files file with
3483 | primary_file :: _ when primary_file == file -> true
3484 | _ -> false) !!CommonComplexOptions.files in
3485 CommonComplexOptions.files =:= primary_files @ secondary_files
(* after loading, check every loaded swarmer (check_swarmer re-raises on
   a broken invariant), then empty the option list *)
3487 set_after_load_hook files_ini (fun _ ->
3488 List.iter (fun s ->
3489 check_swarmer s;
3490 ) !!swarmers;
3491 swarmers =:= []
3494 (*************************************************************************)
3495 (* *)
3496 (* MAIN *)
3497 (* *)
3498 (*************************************************************************)
3500 (* Compute an approximation of the storage used by this module *)
(* Module initialisation: schedules duplicate_chunks on an infinite timer
   with a period argument of 300., and registers a memory statistics
   printer under the name "CommonSwarming". *)
3502 let _ =
3503 BasicSocket.add_infinite_timer 300. duplicate_chunks;
3504 Heap.add_memstat "CommonSwarming" (fun level buf ->
(* first pass: count swarmers, bitmap entries, partial blocks and the
   ranges of those partial blocks *)
3505 let counter = ref 0 in
3506 let nchunks = ref 0 in
3507 let nblocks = ref 0 in
3508 let nranges = ref 0 in
3509 HS.iter (fun s ->
3510 let n = VB.length s.s_verified_bitmap in
3511 nchunks := !nchunks + n;
3513 Array.iter (fun b ->
3514 match b with
3515 | PartialBlock b ->
3516 incr nblocks;
3517 iter_block_ranges (fun _ -> incr nranges) b
3518 | _ -> ()
3519 ) s.s_blocks;
3521 incr counter
3522 ) swarmers_by_name;
3523 Printf.bprintf buf " Swarmers: %d\n" !counter;
3524 Printf.bprintf buf " nchunks: %d nblocks: %d nranges: %d\n"
3525 !nchunks !nblocks !nranges;
(* NOTE(review): 108, 17, 64 and 84 are presumably estimated sizes in
   bytes of one swarmer, chunk, block and range - confirm *)
3526 Printf.bprintf buf " Storage (without blocks): %d bytes\n"
3527 ( !counter * 108 +
3528 !nchunks * 17 +
3529 !nblocks * 64 +
3530 !nranges * 84);
(* second pass: count uploaders and estimate their storage from the sizes
   of their arrays, range list and availability description *)
3532 let counter = ref 0 in
3533 let storage = ref 0 in
3534 HU.iter (fun up ->
3535 storage := !storage + 76 +
3536 Array.length up.up_complete_blocks * 4 +
3537 List.length up.up_ranges * (12 + 16 + 12 + 12 + 4) +
3538 Array.length up.up_partial_blocks * (16 + 12 + 12) +
3539 (8 + match up.up_intervals with
3540 | AvailableIntervals list -> List.length list * (12 + 12 + 12 + 12)
3541 | AvailableBitv b -> let ws = Sys.word_size in (ws/8) + ((ws / 8) * (Bitv.length b / (ws - 2)))
3543 incr counter;
3544 ) uploaders_by_num;
3545 Printf.bprintf buf " Uploaders: %d\n" !counter;
3546 Printf.bprintf buf " Storage: %d bytes\n" !storage;
3549 (* functions for priority bitmask *)
(** Swarmer registered under the disk name of file [f].  Raises
    [Not_found] when no such swarmer is registered. *)
let file_swarmer f =
  let key = { dummy_swarmer with s_filename = file_disk_name f } in
  HS.find swarmers_by_name key
(** Set the priority bitmask of the swarmer of file [f].  [priobitmap] is
    only installed when it has one character per entry of the swarmer's
    verified bitmap; otherwise the call has no effect. *)
let set_swarmer_chunk_priorities f priobitmap =
  let s = file_swarmer f in
  let expected_length = VB.length s.s_verified_bitmap in
  if String.length priobitmap = expected_length then
    s.s_priorities_bitmap <- priobitmap
(** Read-only accessors on a swarmer [s].  The first one returns the
    priority bitmask itself, not a copy. *)
3562 (** get the priority bitmask for a file (do not mutate the string directly, use swarmer_set_interval) *)
3563 let get_swarmer_block_priorities s = s.s_priorities_bitmap
(* verified bitmap of the swarmer *)
3564 let get_swarmer_block_verified s = s.s_verified_bitmap
(* priority intervals of the swarmer, as stored in s_priorities_intervals *)
3565 let get_swarmer_priorities_intervals s = s.s_priorities_intervals
3567 (* using compute_block_num outside of swarming code is probably
3568 broken, networks supports are aware of chunks, not blocks
3569 maybe other block-related functions should be censored in the same
3570 way ?
3571 tag block numbers and chunk numbers so they're not inadvertently
3572 mistaken for each other ?
3574 (* let compute_block_num = () *)