1 (* Copyright 2001, 2002 b8_bavard, b8_fee_carabine, INRIA *)
3 This file is part of mldonkey.
5 mldonkey is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mldonkey is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with mldonkey; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 The jobs of swarmers are :
22 * select what data to ask from each uploader
23 * merge data coming from uploaders, potentially from different
24 networks, into a single Unix32 backend.
28 Each network frontend can have a different (fixed) chunk size
29 t1 +--------+--------+--------+--------+--------+--------+-------- chunks
30 t2 +------+------+------+------+------+------+------+------+------ chunks
32 each block is contained in at most /\ chunk_of_block
33 one chunk, for any network || mappings
36 +------+-+----+---+--+------+------++-----+--+---+----+-+------ blocks
37 | | | | | | | ... variable size
39 r<>r r r r r r<>r r<>r ranges
40 ^ one dbl linked list/block
41 | encoding missing data ranges
42 uploaders physically uploader
56 exception VerifierNotReady
59 AvailableIntervals
of (int64
* int64
) list
60 | AvailableBitv
of Bitv.t
64 | VerificationNotAvailable
66 | Verification
of uid_type array
(** Global switch, initially off.
    NOTE(review): presumably makes detected inconsistencies fatal instead of
    merely logged; no reader of this flag is visible in this part of the
    file, so confirm against its users. *)
let exit_on_error : bool ref = ref false

(** Tag handed to the underlying logging functions by this module's
    logging helpers, so that its log lines can be told apart. *)
let log_prefix : string = "[cSw]"
73 lprintf_nl2
log_prefix fmt
76 lprintf2
log_prefix fmt
84 module VB
= VerificationBitmap
86 (* If we want to become 'multinet', we should:
87 * there shouldn't be any block_size, instead blocks should correspond
88 to the largest blocks which are completely included in one chunk for
90 * a completed block should not be verified until all the blocks are
91 completed for all networks.
92 * swarmers should be loaded before other files, so that they can be
93 used and shared by different files.
98 TODO: s_last_seen is useless, only t_last_seen is useful, at least in the
103 (* network "frontend"/"view"/... to a swarmer *)
105 network frontend use "chunks" of data,
106 swarmer use "blocks" of data *)
109 mutable t_primary
: bool;
111 mutable t_s
: swarmer
;
112 t_chunk_size
: int64
;
115 mutable t_converted_verified_bitmap
: VerificationBitmap.t
;
116 mutable t_last_seen
: int array
;
117 mutable t_ncomplete_chunks
: int;
118 mutable t_nverified_chunks
: int;
120 mutable t_verifier
: verification
; (* information available to
121 check data correctness *)
122 mutable t_verified
: (int -> int -> unit); (* function to call
123 when a chunk is verified;
127 chunk just verified *)
129 (* mapping from network chunks to swarmer blocks *)
130 mutable t_blocks_of_chunk
: int list array
;
131 (* mapping from swarmer blocks to network chunks *)
132 mutable t_chunk_of_block
: int array
;
139 s_disk_allocation_block_size
: int64
;
141 mutable s_networks
: t list
; (** list of frontends, primary at head
142 t.t_s = s <=> t in s.s_networks *)
143 mutable s_strategy
: swarming_strategy
;
145 mutable s_verified_bitmap
: VerificationBitmap.t
;
146 mutable s_priorities_bitmap
: string;
147 mutable s_priorities_intervals
: (int64
* int) list
;
148 (* beginning, priority *)
149 mutable s_disk_allocated
: Bitv.t
;
150 mutable s_availability
: int array
;
151 mutable s_nuploading
: int array
;
152 (* mutable s_last_seen : int array; *)
154 mutable s_blocks
: block_v array
;
155 mutable s_block_pos
: int64 array
; (** offset of the beginning of
161 | PartialBlock
of block
167 mutable block_num
: int;
168 mutable block_begin
: Int64.t
;
169 mutable block_end
: Int64.t
;
170 mutable block_ranges
: range
; (** [range] of the double-linked
171 list of ranges associated to the
173 what about using a standard list
175 or a balanced tree ? *)
176 mutable block_remaining
: int64
; (* amount of bytes missing. *)
177 mutable block_unselected_remaining
: int64
; (* same, less ranges
178 selected for uploaders. *)
182 mutable range_block
: block
;
183 mutable range_begin
: Int64.t
;
184 mutable range_end
: Int64.t
;
185 mutable range_prev
: range
option;
186 mutable range_next
: range
option;
187 mutable range_nuploading
: int; (* number of uploaders currently
188 referencing that range *)
195 mutable up_declared
: bool;
197 mutable up_intervals
: intervals
;
198 mutable up_complete_blocks
: int array
; (** block numbers *)
199 mutable up_ncomplete
: int; (** number of blocks not yet handled,
201 up_complete_blocks *)
203 mutable up_partial_blocks
: (int * int64
* int64
) array
; (** block
208 mutable up_npartial
: int; (** number of blocks not yet handled,
212 mutable up_blocks
: uploader_block list
;
214 mutable up_ranges
: (int64
* int64
* range
) list
; (* ranges referenced by
218 and uploader_block
= {
220 up_block_begin
: int64
;
221 up_block_end
: int64
;
225 Ranges represent "holes" of missing data in a block; Data is
226 missing between offsets range_begin and range_end.
228 [block]'s [block_ranges] reference the first (smallest offsets) of
229 the [range]s associated with it.
231 [range]s are double-linked together thru [range_prev] and
234 r.range_next.range_prev = r.range_prev.range_next = r
235 ( when links are different from None )
237 [range]s have a backlink to their "owner" [block]:
239 b.block_ranges.{range_next}*.{range_prev}*.range_block = b
241 ranges offsets are all within their block's offsets limits, do not
242 overlap, and are sorted in increasing offsets order:
244 b.block_begin <= b.block_ranges.block_begin ... <=
245 r.range_prev.range_end <= r.range_begin <=
246 r.range_end <= r.range_next.range_begin <= ...
249 (* range owners are only used thru uploaders.up_ranges. blocks could be
250 saved in [uploaders]' [up_ranges] along range, but would
need updating when the swarmer is split.
253 Removing [range] from [up_ranges] and [range_nuploading] from
254 [range] could be good too, because they're not correctly updated
when the swarmer is split. Again, getting rid of them is a
256 problem of performance.
260 Data missing for a block is the sum of the "sizes" of its ranges.
262 b.block_remaining = sum (r.range_end - r.range_begin) b.block_ranges
265 (* swarmer invariants ?
266 s.s_verified_bitmap.[i] = State_missing <=> s_blocks.[i] = EmptyBlock
267 s.s_verified_bitmap.[i] = State_partial <=> s_blocks.[i] = PartialBlock _
268 s.s_verified_bitmap.[i] = State_complete <=> s_blocks.[i] = CompletedBlock
269 s.s_verified_bitmap.[i] = State_verified <=> s_blocks.[i] = VerifiedBlock
270 If so, why not drop s_verified_bitmap, and replace it by some
271 verified_bitmap s i and verified_bitmap_all s functions ?
274 (* frontend invariants ?
276 List.length (List.filter (fun x -> x >= State_complete) t_converted_verified_bitmap)
278 List.length (List.filter (fun x -> x = State_verified) t_converted_verified_bitmap)
280 hence t_ncomplete_chunks >= t_nverified_chunks
282 All chunks are [t_chunk_size] bytes in size, and first start at
283 offset 0. This is assumed in [create], [associate], [verify_chunk],
284 [duplicate_chunks], maybe more.
287 (* uploaders invariants ?
288 uploader block numbers are stored in reverse order in
289 up_complete_blocks and up_partial_blocks (first blocks at the end
290 of arrays), then array is processed from end to begin.
292 0 <= up_ncomplete < Array.length up_complete_blocks
293 0 <= up.up_npartial < Array.length up_partial_blocks
295 When a block has been selected, it's pushed out of the first
296 up_ncomplete first elements of up_complete_blocks by swapping it
297 with the #(up_ncomplete-1) element, then up_ncomplete is
298 decreased. (and similarly with s/complete/partial/ ?)
300 The question is now, aren't there better datastructures than
301 arrays for the job ? ;)
304 (*************************************************************************)
308 (*************************************************************************)
310 module HS
= Weak.Make
(struct
312 let hash file
= Hashtbl.hash file
.s_filename
314 let equal x y
= x
.s_filename
= y
.s_filename
317 let swarmers_by_name = HS.create
31
319 module HT
= Weak.Make
(struct
320 (* since type declarations are recursive, One can't write type t = t *)
321 (* this is a workaround; a bit ugly, but works *)
326 let equal x y
= x
.t_num
= y
.t_num
329 let frontends_by_num = HT.create
31
331 module HU
= Weak.Make
(struct
333 let hash u
= Hashtbl.hash (client_num u
.up_client
)
335 let equal x y
= (client_num x
.up_client
) = (client_num y
.up_client
)
338 let uploaders_by_num = HU.create
113
(** Counter of frontends created so far. It is incremented right before a
    new frontend record is built, and its value becomes that frontend's
    [t_num]. *)
let frontend_counter : int ref = ref 0

(** Counter of swarmers created so far. It is incremented right before a
    new swarmer record is built, and its value becomes that swarmer's
    [s_num]. *)
let swarmer_counter : int ref = ref 0
343 (** sets [t.t_last_seen] of the verified blocks to current time, and
344 associated file's [t.t_s.s_file] last seen value to the oldest of the
345 remaining last seen values *)
347 let compute_last_seen t
=
348 let last_seen_total = ref (BasicSocket.last_time
()) in
350 if c
= VB.State_verified
then
351 t
.t_last_seen
.(i
) <- BasicSocket.last_time
()
353 last_seen_total := min
!last_seen_total t
.t_last_seen
.(i
)
354 ) t
.t_converted_verified_bitmap
;
355 set_file_last_seen t
.t_file
!last_seen_total;
358 (** (internal) return a 0 sized range at offset [pos], and assigned to
361 let void_range b pos
=
368 range_nuploading
= 0;
(** [compute_range_size r] is the distance between the two offsets of range
    [r], i.e. [r.range_end] minus [r.range_begin]. As ranges describe holes
    of missing data, this is the amount of data still missing in [r]. *)
let compute_range_size r =
  let first = r.range_begin
  and last = r.range_end in
  last -- first
376 let rec ranges_iter f
r =
378 match r.range_next
with
383 let rec ranges_fold f acc
r =
385 match r.range_next
with
387 | Some rr
-> ranges_fold f
acc rr
389 let rec ranges_for_all p
r =
391 (match r.range_next
with
393 | Some
r -> ranges_for_all p
r)
(** [block_ranges_for_all p b] runs [ranges_for_all] with predicate [p] over
    the ranges of block [b], starting from the range referenced by
    [b.block_ranges]. *)
let block_ranges_for_all p b =
  let first_range = b.block_ranges in
  ranges_for_all p first_range
(** [block_ranges_fold f acc b] runs [ranges_fold] with function [f] and
    initial accumulator [acc] over the ranges of block [b], starting from
    the range referenced by [b.block_ranges]. *)
let block_ranges_fold f acc b =
  let first_range = b.block_ranges in
  ranges_fold f acc first_range
401 (** (internal) assigns range [r], and all other ranges along
402 [range_next] links, to block [b] *)
(* [own_ranges b r] walks the ranges reachable from [r] with [ranges_iter]
   and overwrites the [range_block] backlink of each visited range so that
   it points to [b].

   The function never calls itself, so it is a plain [let] (a [let rec]
   here only triggers the "unused rec flag" warning). The lambda argument
   is named [owned] to avoid shadowing the starting range [r]. *)
let own_ranges b r =
  ranges_iter (fun owned -> owned.range_block <- b) r
408 Find ranges that are after [cut_pos] offset, unlink them from r
409 double-linked list of ranges, set their owner to [b] and return
410 the first of the removed ranges.
412 If all ranges are before [cut_pos] return a 0-sized range.
414 If [cut_pos] is within one of the ranges, that range is cut in
415 two at [cut_pos] offset, and link each half to its side.
Also, what to do if range_nuploading is not 0 ?
418 => [cut_ranges_after] is being called from [split_blocks] that
419 does not preserve [s_nuploading] for blocks either
422 let cut_ranges_after b
r cut_pos
=
424 (* after the cut position already ? *)
425 if r.range_begin
>= cut_pos
then begin
426 (match r.range_prev
with
428 let b1 = r.range_block
in
429 b1.block_ranges
<- void_range b1 cut_pos
431 rp
.range_next
<- None
;
432 r.range_prev
<- None
);
435 (* still before the cut position ? *)
436 else if r.range_end
<= cut_pos
then
437 match r.range_next
with
438 | None
-> void_range b cut_pos
441 (* across cut position, must split a range *)
443 let split_r = { r with
444 range_begin
= cut_pos
;
447 (match split_r.range_next
with
449 | Some rr
-> rr
.range_prev
<- Some
split_r);
452 r.range_end
<- cut_pos
;
453 r.range_next
<- None
;
455 if r.range_nuploading
<> 0 then
456 lprintf_nl "WARNING: Splitting a range currently being uploaded, don't know what to do with range_nuploading :/";
459 let cut_ranges = iter r in
460 own_ranges b
cut_ranges;
463 (** (internal) return the offset of the end of the [i]th block of
466 let compute_block_end s i
=
467 let b = s
.s_block_pos
in
468 if Array.length
b = i
+ 1 then
473 (** (internal) return the offset of the beginning of the [i]th block
476 let compute_block_begin s i
=
477 let b = s
.s_block_pos
in
480 (** Finds the number of the block containing [chunk_pos] offset, using
481 dichotomy. Blocks are half opened [block_begin, block_end[ *)
483 (* 0 <= chunk_pos < s.s_size *)
484 let compute_block_num s chunk_pos
=
485 assert (0L <= chunk_pos
&& chunk_pos
< s
.s_size
);
486 let b = s
.s_block_pos
in
488 0 <= min <= max <= Array.length b - 1
489 compute_block_begin s min <= chunk_pos < compute_block_end s max *)
491 let rec iter min max
=
492 if min
= max
then min
493 else (* from now on, min < max *)
494 let medium = (min
+ max
) / 2 in
495 (* Euclide => 2*medium <= min + max <= 2*medium + 1 *)
496 (* min < max => 2*min < min + max < 2*max
497 => min <= medium < max *)
499 if chunk_pos
< b.(medium) then
500 iter min
(medium - 1)
504 (* min = medium < max => 2*min < min + max <= 2*min + 1
505 <=> min < max <= min + 1
507 if chunk_pos
< b.(max
) then
510 let i = iter 0 (Array.length
b - 1) in
512 lprintf_nl "%Ld is block %d [%Ld-%Ld]" chunk_pos
i
513 (compute_block_begin s
i) (compute_block_end s
i);
516 (** Return true if ranges fully "cover" their block
517 ("the block is made of holes") *)
519 let block_is_empty b =
520 let rec iter begin_pos
r =
521 r.range_begin
= begin_pos
&&
522 (match r.range_next
with
523 | Some rr
-> iter r.range_end rr
524 | None
-> r.range_end
= b.block_end
)
526 iter b.block_begin
b.block_ranges
(** [block_is_full b] is [true] exactly when the range list of [b] is
    reduced to a single range (no [range_next]) whose beginning and end
    offsets are equal. Since ranges describe holes of missing data, such a
    block has no hole left. *)
let block_is_full b =
  let first = b.block_ranges in
  match first.range_next with
  | Some _ -> false (* more than one range: cannot be the lone void range *)
  | None -> first.range_begin = first.range_end
532 (** iter function [f] over all the blocks contained in the list of [intervals]
534 [f] will receive block number, block beginning and ending offsets,
535 and overlapping interval beginning and ending offsets.
537 If an interval starts halfway of a block, iteration starts on the
538 next block, with interval_begin < block_begin indicating where the
539 interval really started.
541 If an interval ends halfway of a block, iteration ends on that
542 block, with interval_end < block_end indicating where the interval
546 let iter_intervals s f intervals
=
547 let nchunks = Array.length s
.s_blocks
in
548 List.iter (fun (interval_begin
, interval_end
) ->
549 let interval_begin = min
interval_begin s
.s_size
in
550 let interval_end = min
interval_end s
.s_size
in
551 (* lprintf "apply on %Ld-%Ld\n" interval_begin interval_end; *)
552 if interval_begin < interval_end then
553 let i0 = compute_block_num s
interval_begin in
554 let block_begin = compute_block_begin s
i0 in
555 let rec iter_blocks i block_begin interval_begin =
556 (* lprintf "iter_blocks %d %Ld %Ld\n" i block_begin interval_begin; *)
557 if i < nchunks && block_begin < interval_end then
558 let block_end = compute_block_end s
i in
559 let current_end = min
block_end interval_end in
562 lprintf_nl "Apply: %d %Ld-%Ld %Ld-%Ld"
563 i block_begin block_end interval_begin current_end;
565 f
i block_begin block_end interval_begin current_end;
566 iter_blocks (i+1) block_end block_end
568 iter_blocks i0 block_begin interval_begin
571 (*************************************************************************)
575 (*************************************************************************)
577 let dummy_swarmer = {
581 s_disk_allocation_block_size
= zero
;
583 s_strategy
= AdvancedStrategy
;
584 s_verified_bitmap
= VB.create
0 VB.State_missing
;
585 s_priorities_bitmap
= "";
586 s_priorities_intervals
= [(zero
, 1)];
587 s_disk_allocated
= Bitv.create
0 false;
590 s_availability
= [||];
594 let insert_prio_interval intervals last_end
(new_start
,new_end
,new_prio
) =
595 assert (new_start
< new_end
);
596 assert (new_end
<= last_end
);
597 let rec insert_end prev_prio
= function
598 | [] -> if new_end
= last_end
then [] else if new_prio
<> prev_prio
then [new_end
, prev_prio
] else []
599 | (pos
,prio
as this
) :: tail
->
600 match Int64.compare pos new_end
with
601 | -1 -> insert_end prio tail
(* eat it *)
603 if new_prio
<> prev_prio
then
604 (new_end
, prev_prio
) :: this
:: tail
608 assert (prio
<> prev_prio
);
609 if new_prio
<> prio
then this
:: tail
else tail
612 let rec insert prev_prio
= function (* not tail rec ! *)
614 if new_prio
<> prev_prio
then
615 (new_start
, new_prio
) :: insert_end prev_prio
[]
617 insert_end prev_prio
[]
618 | (pos
, prio
as this
) :: tail
->
619 match Int64.compare pos new_start
with
620 | -1 -> this
:: insert prio tail
(* leave current and continue searching *)
622 if new_prio
<> prev_prio
then
623 (new_start
, new_prio
) :: insert_end prev_prio
(this
::tail
) (* mark new interval and search interval end *)
625 insert_end prev_prio
(this
::tail
) (* start of new interval gets merged with previous interval *)
627 assert (prio
<> prev_prio
); (* invariant *)
628 if new_prio
<> prev_prio
then
629 (new_start
, new_prio
) :: insert_end prio tail
634 insert (-1) intervals
636 let rec validate_intervals limit
= function
637 | (p1
,prio1
)::((p2
,prio2
)::_
as tail
) when prio1
<> prio2
&& p1
< p2
-> validate_intervals limit tail
638 | [p
,_
] when p
< limit
-> true
(** Character with code 0: the value a swarmer's [s_priorities_bitmap] is
    filled with when it is created or reset, before any priority interval
    is marked on it. *)
let priority_zero = '\000'
643 let swarmer_recompute_priorities_bitmap s
=
644 String.fill s
.s_priorities_bitmap
0
645 (String.length s
.s_priorities_bitmap
) priority_zero;
646 let mark interval_begin interval_end priority
=
647 if interval_end > interval_begin && s
.s_size
>= interval_end && interval_begin >= 0L then
649 () (* do not mark - zero blocks will not overwrite boundaries of non-zero blocks *)
652 let i_begin = compute_block_num s
interval_begin in
653 let i_end = compute_block_num s
(Int64.pred
interval_end) in
654 let priochar = Char.chr
(max
0 (min priority
255)) in
655 (* String.fill s.s_priorities_bitmap i_begin (i_end - i_begin + 1) priochar *)
656 for i = i_begin to i_end do
657 s
.s_priorities_bitmap
.[i] <- priochar
661 lprintf_nl "WARNING: recompute_priorities %Ld %Ld %Ld" interval_begin interval_end s
.s_size
663 let rec loop = function
664 | (i_begin, priority
) :: ((i_end,_
) :: _
as tail
) -> mark i_begin i_end priority
; loop tail
665 | [i_begin, priority
] -> mark i_begin s
.s_size priority
666 | [] -> lprintf_nl "WARNING: recompute_priorities []"
668 loop s
.s_priorities_intervals
670 (* Intervals with fixed byte positions are needed to recompute priorities bitmap
671 after merge, cause priobitmap depends on block size *)
672 let swarmer_set_interval s
(p1
,p2
,prio
as new_interval
) =
674 lprintf_nl "swarmer_set_interval %S %Ld (%Ld,%Ld,%u)" s
.s_filename s
.s_size p1 p2 prio
;
675 s
.s_priorities_intervals
<- insert_prio_interval s
.s_priorities_intervals s
.s_size new_interval
;
676 swarmer_recompute_priorities_bitmap s
678 (** if a swarmer is already associated with that [file_name], return it;
679 Otherwise create a new one with default values, that will be fixed
680 by the first frontend association *)
681 let create_swarmer file_name file_size
=
683 HS.find
swarmers_by_name
685 s_filename
= file_name
688 incr
swarmer_counter;
691 (* to avoid extreme disk fragmentation, space is pre-allocated in
692 blocks of this size *)
693 let disk_allocation_block_size =
695 (round_up64
(max
1L (file_size
// 200L)) (megabytes
1)) in
697 1 + Int64.to_int
(Int64.pred file_size
// disk_allocation_block_size) in
700 s_num
= !swarmer_counter;
701 s_filename
= file_name
;
703 s_disk_allocation_block_size
= disk_allocation_block_size;
707 s_strategy
= AdvancedStrategy
;
709 s_verified_bitmap
= VB.create
nblocks VB.State_missing
;
710 s_priorities_bitmap
= String.make
nblocks priority_zero;
711 s_priorities_intervals
= [(zero
, 1)]; (* JAVE init all prios to 1, thus all chunks will be downloaded as usual *)
712 s_disk_allocated
= Bitv.create
ndiskblocks false;
713 s_blocks
= Array.create
nblocks EmptyBlock
;
714 s_block_pos
= Array.create
nblocks zero
;
715 s_availability
= Array.create
nblocks 0;
716 s_nuploading
= Array.create
nblocks 0;
717 (* s_last_seen = Array.create nblocks 0; *)
720 swarmer_recompute_priorities_bitmap s;
721 HS.add
swarmers_by_name s;
(** Split the swarmer's existing blocks at [chunk_size] boundaries *)
725 let split_blocks s chunk_size
=
727 let size = s.s_size
in
729 let nblocks = Array.length
s.s_blocks
in
730 (* Split existing blocks at [chunk_size] boundaries
732 [index_s] is the index of the existing block being analysed
733 [chunk_begin] is the offset of the beginning of the current containing chunk
734 [new_blocks] is the list of new blocks already splitted, in
736 List contains tuples: block, beginning offset, verified status char *)
737 let rec iter index_s chunk_begin new_blocks
=
738 (* lprintf "iter (1) %d/%d %Ld\n" index_s nblocks chunk_begin; *)
739 if index_s
= nblocks then List.rev new_blocks
else
742 let block_begin = compute_block_begin s index_s
in
743 let block_end = compute_block_end s index_s
in
746 let chunk_end = chunk_begin
++ chunk_size
in
747 let chunk_end = min
chunk_end size in
749 if chunk_end > block_end then
751 s.s_blocks
.(index_s
),
753 VB.get
s.s_verified_bitmap index_s
755 iter (index_s
+1) chunk_begin
new_blocks
757 else if chunk_end = block_end then
759 s.s_blocks
.(index_s
),
761 VB.get
s.s_verified_bitmap index_s
763 iter (index_s
+1) chunk_end new_blocks
766 (* chunk_end < block_end
767 We need to split this block in two parts *)
768 s.s_block_pos
.(index_s
) <- chunk_end;
769 match s.s_blocks
.(index_s
) with
770 | EmptyBlock
| CompleteBlock
| VerifiedBlock
->
772 (* s.s_blocks.(index_s) will appear twice in the result list *)
774 s.s_blocks
.(index_s
),
776 VB.get
s.s_verified_bitmap index_s
778 iter index_s
chunk_end new_blocks
781 (* split b1 in two; b2 is the part after [chunk_end] offset *)
785 block_begin = chunk_end;
786 block_end = b1.block_end;
787 block_ranges
= b1.block_ranges
; (* fixed below *)
788 block_num
= 0; (* fixed below *)
789 block_remaining
= zero
; (* fixed below *)
790 block_unselected_remaining
= zero
; (* fixed below *)
792 b2.block_ranges
<- cut_ranges_after b2 b1.block_ranges
chunk_end;
793 b1.block_end <- chunk_end;
796 (if block_is_full b1 then
797 (* lprintf "Partial block b1 should become CompleteBlock\n"; *)
802 ) else if block_is_empty b1 then
803 (* lprintf "Partial block b1 should become EmptyBlock\n"; *)
811 VB.get
s.s_verified_bitmap index_s
815 if block_is_full b2 then begin
816 (* lprintf "Partial block b2 should become CompleteBlock\n"; *)
817 s.s_blocks
.(index_s
) <- CompleteBlock
;
818 VB.set
s.s_verified_bitmap index_s
VB.State_complete
820 else if block_is_empty b2 then begin
821 (* lprintf "Partial block b2 should become EmptyBlock\n"; *)
822 s.s_blocks
.(index_s
) <- EmptyBlock
;
823 VB.set
s.s_verified_bitmap index_s
VB.State_missing
;
826 s.s_blocks
.(index_s
) <- PartialBlock
b2;
827 iter index_s
chunk_end new_blocks
830 let blocks = iter 0 zero
[] in
832 (* blocks have been splitted, now rebuild swarmer *)
833 let nblocks = List.length
blocks in
834 (* lprintf "%d blocks to generate\n" nblocks; *)
836 if Array2.exists
((<>) 0) s.s_availability
then
837 lprintf_nl "WARNING: splitting swarmer discarded availability counters";
838 if Array2.exists
((<>) 0) s.s_nuploading
then
839 lprintf_nl "WARNING: splitting a swarmer beging uploaded to";
841 s.s_blocks
<- Array.create
nblocks EmptyBlock
;
842 s.s_verified_bitmap
<- VB.create
nblocks VB.State_missing
;
843 s.s_priorities_bitmap
<- String.make
nblocks priority_zero;
844 s.s_block_pos
<- Array.create
nblocks zero
;
845 s.s_availability
<- Array.create
nblocks 0; (* not preserved ? *)
846 s.s_nuploading
<- Array.create
nblocks 0; (* not preserved ? *)
847 (* s.s_last_seen <- Array.create nblocks 0; *)
849 let rec iter i list
=
852 | (b, pos
, c
) :: tail
->
858 let block_size = compute_block_end s i --
859 compute_block_begin s i in
860 let remaining, unselected_remaining
=
861 block_ranges_fold (fun (racc
, uracc
) r ->
862 let range_size = compute_range_size r in
864 if r.range_nuploading
= 0 then
865 uracc
-- range_size else uracc
)
866 ) (block_size, block_size) b in
867 b.block_remaining
<- remaining;
868 b.block_unselected_remaining
<- unselected_remaining
;
870 | EmptyBlock
| CompleteBlock
| VerifiedBlock
-> ()
873 VB.set
s.s_verified_bitmap
i c
;
874 s.s_block_pos
.(i) <- pos
;
879 swarmer_recompute_priorities_bitmap s
882 (** Associate a(n additional) frontend to a swarmer *)
884 let associate is_primary t
s =
885 (* a swarmer cannot be associated more than once to a network *)
886 if not
(List.memq t
s.s_networks
) then
887 let size = file_size t
.t_file
in
889 (* what about raising an exception instead ? *)
890 if s.s_size
<> size then begin
891 lprintf_nl "file_size for %s does not match: swarmer %Ld / real %Ld" s.s_filename
s.s_size
size;
896 t
.t_converted_verified_bitmap
<- VB.create t
.t_nchunks
VB.State_missing
;
897 t
.t_last_seen
<- Array.create t
.t_nchunks
0;
898 t
.t_chunk_of_block
<- [||];
899 t
.t_blocks_of_chunk
<- Array.create t
.t_nchunks
[];
901 (* invariant: primary frontend is at the head of swarmer's
902 [s_networks], and is the first associated with the swarmer *)
903 if is_primary
then begin
905 assert(s.s_networks
= []);
907 (* was s.s_networks <- t :: s.s_networks; *)
909 match s.s_networks
with
911 assert(tprim
.t_primary
);
912 if file_disk_name t
.t_file
<> file_disk_name tprim
.t_file
then
913 (* TODO: transfer data into swarmer instead of discarding it *)
914 Unix32.remove
(file_fd t
.t_file
);
915 t
.t_primary
<- false;
916 s.s_networks
<- s.s_networks
@ [t
];
920 (match s.s_networks
with
923 List.iter (fun tt
-> assert(not tt
.t_primary
)) tail
924 | [] -> assert false);
926 (* at this point, we are supposed to split the blocks in the swarmer
927 in smaller blocks depending on the block_size of this network, and compute
928 the t_chunk_of_block and t_blocks_of_chunk fields. *)
930 let chunk_size = t
.t_chunk_size
in
931 split_blocks s chunk_size;
933 let nblocks = Array.length
s.s_blocks
in
934 (* For all networks, adjust the chunks and mappings *)
936 let nchunks = VB.length t
.t_converted_verified_bitmap
in
937 t
.t_chunk_of_block
<- Array.create
nblocks 0;
938 t
.t_blocks_of_chunk
<- Array.create
nchunks [];
940 let chunk_size = t
.t_chunk_size
in
941 for i = 0 to nblocks - 1 do
942 let block_begin = compute_block_begin s i in
943 let chunk = Int64.to_int
(block_begin // chunk_size) in
944 t
.t_chunk_of_block
.(i) <- chunk;
945 t
.t_blocks_of_chunk
.(chunk) <- i :: t
.t_blocks_of_chunk
.(chunk)
949 (* TODO: If not primary, set_file_downloaded should be called *)
950 if not is_primary
then
951 add_file_downloaded t
.t_file
(zero
-- file_downloaded t
.t_file
);
953 (* check that all frontends use the primary's file backend *)
954 (match s.s_networks
with
955 | t
:: tail
when is_primary
->
957 set_file_fd tt
.t_file
(file_fd t
.t_file
)
960 set_file_fd t
.t_file
(file_fd tprim
.t_file
)
961 | [] -> assert false)
963 (** Create a primary frontend and its swarmer *)
965 let create ss file
chunk_size =
967 incr
frontend_counter;
968 let size = file_size file
in
969 (* wrong if size is a multiple of chunk_size, or on purpose ? *)
971 1 + Int64.to_int
(Int64.pred
size // chunk_size) in
974 t_num
= !frontend_counter;
980 t_chunk_size
= chunk_size;
982 t_ncomplete_chunks
= 0;
983 t_nverified_chunks
= 0;
985 t_converted_verified_bitmap
= VB.create nchunks VB.State_missing
;
986 t_last_seen
= Array.create nchunks 0;
988 t_verifier
= NoVerification
;
989 t_verified
= (fun _ _
-> ());
991 t_chunk_of_block
= [||];
992 t_blocks_of_chunk
= Array.create nchunks [];
995 HT.add
frontends_by_num t;
999 (* copy the disk block over itself; Safer than overwriting it with zeroes *)
1001 let iter_disk_space f
t interval_begin interval_end =
1003 (* 0 <= chunk_pos < s.s_size *)
1004 let compute_disk_block_num s chunk_pos
=
1005 assert (0L <= chunk_pos
&& chunk_pos
< s.s_size
);
1006 Int64.to_int
(chunk_pos
// s.s_disk_allocation_block_size
) in
1008 if interval_begin < interval_end then
1010 let fd = file_fd
t.t_file
in
1011 let first_disk_block = compute_disk_block_num s interval_begin in
1012 let last_disk_block = compute_disk_block_num s (Int64.pred
interval_end) in
1013 for disk_block
= first_disk_block to last_disk_block do
1017 exception Not_preallocated_block_found
(** [is_fully_preallocated t interval_begin interval_end] is [true] when
    every disk allocation block that [iter_disk_space] visits for the given
    interval is already flagged in [s_disk_allocated], and [false] as soon as
    one unflagged block is met. *)
let is_fully_preallocated t interval_begin interval_end =
  (* Abort the traversal at the first block whose allocation bit is unset. *)
  let require_allocated s _fd disk_block =
    let allocated = Bitv.get s.s_disk_allocated disk_block in
    if not allocated then raise Not_preallocated_block_found
  in
  try
    iter_disk_space require_allocated t interval_begin interval_end;
    true
  with Not_preallocated_block_found -> false
(** [preallocate_disk_space t interval_begin interval_end] walks the disk
    allocation blocks that [iter_disk_space] visits for the interval.
    A block whose bit in [s_disk_allocated] is unset is copied over itself
    with [Unix32.copy_chunk]; in every case the bit is set afterwards. *)
let preallocate_disk_space t interval_begin interval_end =
  let allocate s fd disk_block =
    let already_allocated = Bitv.get s.s_disk_allocated disk_block in
    if not already_allocated then begin
      let block_size = s.s_disk_allocation_block_size in
      let pos = Int64.of_int disk_block ** block_size in
      (* never copy past the end of the file: the last block may be short *)
      let size = min (file_size t.t_file -- pos) block_size in
      if !verbose then
        lprintf_nl "preallocating %Ld bytes for %s" size
          (file_best_name t.t_file);
      Unix32.copy_chunk fd fd pos pos (Int64.to_int size)
    end;
    Bitv.set s.s_disk_allocated disk_block true
  in
  iter_disk_space allocate t interval_begin interval_end
(** [mark_disk_space_preallocated t interval_begin interval_end] sets the
    [s_disk_allocated] bit of every disk allocation block that
    [iter_disk_space] visits for the interval.  Only the bitmap is updated;
    no data is copied. *)
let mark_disk_space_preallocated t interval_begin interval_end =
  let flag_allocated s _fd disk_block =
    Bitv.set s.s_disk_allocated disk_block true
  in
  iter_disk_space flag_allocated t interval_begin interval_end
1044 let check_finished t =
1045 let file = t.t_file
in
1046 match file_state
file with
1055 | FileDownloading
->
1056 if VB.existsi
(fun i c
-> c
<> VB.State_verified
)
1057 t.t_converted_verified_bitmap
then false
1059 if file_size
file <> file_downloaded
t.t_file
then
1060 lprintf_nl "Downloaded size differs after complete verification";
1064 (** (debug) output a [swarmer] to current log *)
1067 lprintf_nl "Ranges after %s:" str
;
1070 lprintf_n " %Ld-%Ld(%d)"
1071 r.range_begin
r.range_end
r.range_nuploading
;
1072 match r.range_next
with
1073 | None
-> lprint_newline
()
1074 | Some
r -> iter r in
1076 Array.iteri
(fun i b ->
1077 lprintf_n " %d: " i;
1078 let block_begin = compute_block_begin s i in
1079 let block_end = compute_block_end s i in
1080 lprintf
"%Ld - %Ld [%Ld] %c " block_begin block_end
1081 (block_end -- block_begin)
1083 (VB.get
s.s_verified_bitmap
i));
1085 let j = t.t_chunk_of_block
.(i) in
1086 lprintf
"(b %d %c [" j
1088 (VB.get
t.t_converted_verified_bitmap
j));
1089 List.iter (fun ii
-> lprintf
"%d " ii
) t.t_blocks_of_chunk
.(j);
1095 lprintf
" [%Ld .. %Ld] --> "
1096 b.block_begin b.block_end;
1098 | EmptyBlock
-> lprintf_nl "_"
1099 | CompleteBlock
-> lprintf_nl "C"
1100 | VerifiedBlock
-> lprintf_nl "V"
1103 lprintf_nl "Files:";
1105 lprintf_nl " File num: %d" (file_num
t.t_file
);
1106 lprintf_nl " %s" (if t.t_primary
then "primary" else "secondary");
1107 lprintf_nl " Downloaded: %Ld" (file_downloaded
t.t_file
);
1108 lprintf_nl " Bitmap: %s" (VB.to_string
t.t_converted_verified_bitmap
)
(** [iter_block_ranges f b] applies [f] to every range of block [b], starting
    at [b.block_ranges] and following the [range_next] links.
    The successor of a range is read before [f] is applied to that range, so
    [f] is allowed to overwrite [range_next]. *)
let iter_block_ranges f b =
  let rec walk r =
    (* remember the successor now, in case f mutates it *)
    let following = r.range_next in
    f r;
    match following with
    | Some rr -> walk rr
    | None -> ()
  in
  walk b.block_ranges
1123 (** (debug) output a [block] to current log *)
1126 lprintf_n "Block %d: %Ld-%Ld"
1127 b.block_num
b.block_begin b.block_end;
1129 lprintf_nl " ranges:";
1130 iter_block_ranges (fun r ->
1131 lprintf_nl " %Ld-%Ld" r.range_begin
r.range_end
) b;
1134 (** (shadows CommonFile.add_file_downloaded)
1135 increments amount downloaded of the primary frontend of the swarmer,
1136 and of maybe_t, if provided, and if it's different from the primary. *)
1138 let add_file_downloaded maybe_t
s size =
1139 (* lprintf "add_file_downloaded %Ld\n" size; *)
1140 match s.s_networks
with
1142 assert(t.t_primary
);
1143 add_file_downloaded t.t_file
size;
1147 if t.t_num
<> tt
.t_num
then
1148 add_file_downloaded tt
.t_file
size);
1149 if file_downloaded
t.t_file
< zero
then
1150 lprintf_nl "ERROR: file_downloaded < zero!";
(** [close_block_ranges maybe_t s b] closes every range of block [b].
    For each range, its size is added to the downloaded amount (through
    [add_file_downloaded maybe_t s]) and subtracted from [block_remaining];
    it is also subtracted from [block_unselected_remaining] when no uploader
    is working on the range.  The range is then emptied and unlinked.
    A warning is logged if either counter is not zero at the end. *)
let close_block_ranges maybe_t s b =
  let close_range r =
    let added = compute_range_size r in
    add_file_downloaded maybe_t s added;
    b.block_remaining <- b.block_remaining -- added;
    if r.range_nuploading = 0 then
      b.block_unselected_remaining <- b.block_unselected_remaining -- added;
    (* an empty range: begin = end, no neighbours *)
    r.range_begin <- r.range_end;
    r.range_prev <- None;
    r.range_next <- None
  in
  iter_block_ranges close_range b;
  if b.block_remaining <> 0L then
    lprintf_nl "WARNING: block_remaining should be 0 after close_block_ranges";
  if b.block_unselected_remaining <> 0L then
    lprintf_nl "WARNING: block_unselected_remaining should be 0 after close_block_ranges"
1172 (*************************************************************************)
1174 (* swarmers verified bitmaps *)
1176 (*************************************************************************)
(* For every swarmer, there is a "primary" verifier, and secondary
   verifiers.  When a block is downloaded, it is tagged State_complete
   in the verified_bitmap, and this State_complete is propagated to
   the primary bitmap if possible (if all sub-blocks are also
   State_complete).

   If the primary chunk becomes State_complete, then a verification is
   needed on the primary.  If the verification works, the
   verified_bitmap becomes State_verified, and the secondary verifiers
   are tagged with State_complete (if they use a different
   verification scheme) or State_verified (if no verification scheme
   or a verification scheme that has already been used).
*)
(* corruption has been detected, and the block has been reset to
   State_missing *)
1195 let set_swarmer_state_missing s i =
1196 let current_state = VB.get
s.s_verified_bitmap
i in
1197 match current_state with
1198 | VB.State_complete
| VB.State_verified
->
1199 (VB.set
s.s_verified_bitmap
i VB.State_missing
;
1201 let j = t.t_chunk_of_block
.(i) in
1202 match VB.get
t.t_converted_verified_bitmap
j with
1203 | VB.State_missing
-> ()
1204 | VB.State_partial
->
1205 if List.for_all
(fun i -> VB.get
s.s_verified_bitmap
i = VB.State_missing
)
1206 t.t_blocks_of_chunk
.(j) then
1207 VB.set
t.t_converted_verified_bitmap
j VB.State_missing
1208 | VB.State_complete
->
1209 lprintf_nl "set_swarmer_state_missing: invalidating a block within a completed chunk?"
1210 | VB.State_verified
->
1211 lprintf_nl "set_swarmer_state_missing: invalidating a block within a verified chunk?"
1213 | VB.State_missing
-> ()
1214 | VB.State_partial
->
1215 lprintf_nl "set_swarmer_state_missing: invalidating a partial block ?"
(* we have started downloading this block, so mark all containing chunks
   as partial too *)
1219 let set_swarmer_state_partial s i =
1220 match VB.get
s.s_verified_bitmap
i with
1221 | VB.State_missing
->
1222 VB.set
s.s_verified_bitmap
i VB.State_partial
;
1224 let j = t.t_chunk_of_block
.(i) in
1225 match VB.get
t.t_converted_verified_bitmap
j with
1226 | VB.State_missing
->
1227 VB.set
t.t_converted_verified_bitmap
j VB.State_partial
1228 | VB.State_partial
-> ()
1229 | VB.State_complete
->
1230 lprintf_nl "set_swarmer_state_partial: partial block within a completed chunk?"
1231 | VB.State_verified
->
1232 lprintf_nl "set_swarmer_state_partial: partial block within a verified chunk?"
1234 | VB.State_partial
-> ()
1235 | VB.State_complete
->
1236 lprintf_nl "set_swarmer_state_partial: trying to demote a completed block?"
1237 | VB.State_verified
->
1238 lprintf_nl "set_swarmer_state_partial: trying to demote a verified block?"
1241 (* we finished this block, trying to escalate to primary frontend
1242 verification bitmap *)
1243 let set_swarmer_state_complete s i =
1244 match VB.get
s.s_verified_bitmap
i with
1245 | VB.State_missing
| VB.State_partial
->
1246 (VB.set
s.s_verified_bitmap
i VB.State_complete
;
1247 match s.s_networks
with
1249 assert (t.t_primary
);
1250 let j = t.t_chunk_of_block
.(i) in
1251 (match VB.get
t.t_converted_verified_bitmap
j with
1252 | VB.State_missing
| VB.State_partial
->
1253 if List.for_all
(fun i -> VB.get
s.s_verified_bitmap
i = VB.State_complete
)
1254 t.t_blocks_of_chunk
.(j) then begin
1255 t.t_ncomplete_chunks
<- t.t_ncomplete_chunks
+ 1;
1256 VB.set
t.t_converted_verified_bitmap
j VB.State_complete
1258 | VB.State_complete
-> ()
1259 | VB.State_verified
->
1260 (* lprintf_nl "set_swarmer_state_complete: trying to demote a verified block? (1)" *)
1262 | [] -> assert false)
1263 | VB.State_complete
-> ()
1264 | VB.State_verified
->
1265 (* lprintf_nl "set_swarmer_state_complete: trying to demote a verified block? (2)" *)
1266 VB.set
s.s_verified_bitmap
i VB.State_complete
;
1267 match s.s_networks
with
1269 assert (t.t_primary
);
1270 let j = t.t_chunk_of_block
.(i) in
1271 (match VB.get
t.t_converted_verified_bitmap
j with
1272 | VB.State_verified
->
1273 VB.set
t.t_converted_verified_bitmap
j VB.State_complete
;
1274 t.t_nverified_chunks
<- t.t_nverified_chunks
- 1;
1275 | VB.State_complete
-> ()
1276 | VB.State_missing
| VB.State_partial
->
1277 lprintf_nl "BUG: set_swarmer_state_complete: demoting a verified block from an incomplete chunk")
1278 | [] -> assert false
(* the primary verifier has succeeded, so ask the secondary ones for
   verification too *)
1283 let set_swarmer_state_verified s i =
1284 match VB.get
s.s_verified_bitmap
i with
1285 | VB.State_missing
| VB.State_partial
| VB.State_complete
->
1286 (VB.set
s.s_verified_bitmap
i VB.State_verified
;
1287 (* lprintf "set_swarmer_state_verified %d done\n" i; *)
1288 match s.s_networks
with
1289 | [] -> assert false
1290 | tprim
:: secondaries
->
1291 assert (tprim
.t_primary
);
1292 (* that test is somewhat redundant, since only primary
1293 frontends with verification can have merged secondary
1294 frontends; See merge *)
1295 match tprim
.t_verifier
with
1296 | NoVerification
| VerificationNotAvailable
-> ()
1297 | Verification _
| ForceVerification
->
1298 let jprim = tprim
.t_chunk_of_block
.(i) in
1299 assert (VB.get tprim
.t_converted_verified_bitmap
jprim = VB.State_verified
);
1301 assert (not
t.t_primary
);
1302 let j = t.t_chunk_of_block
.(i) in
1303 if List.for_all
(fun i -> VB.get
s.s_verified_bitmap
i = VB.State_verified
)
1304 t.t_blocks_of_chunk
.(j) then
1305 match t.t_verifier
with
1306 | NoVerification
| VerificationNotAvailable
->
1307 (* we have no way to check data integrity
1308 for this network, assume other(s) know
1310 (match VB.get
t.t_converted_verified_bitmap
j with
1311 | VB.State_missing
| VB.State_partial
->
1312 VB.set
t.t_converted_verified_bitmap
j VB.State_verified
;
1313 t.t_ncomplete_chunks
<- t.t_ncomplete_chunks
+ 1;
1314 t.t_nverified_chunks
<- t.t_nverified_chunks
+ 1
1315 | VB.State_complete
->
1316 VB.set
t.t_converted_verified_bitmap
j VB.State_verified
;
1317 t.t_nverified_chunks
<- t.t_nverified_chunks
+ 1
1318 | VB.State_verified
-> ())
1321 (* all chunks are verified, so set
1322 converted_verified_bitmap to State_complete,
1323 probably to trigger data verification later.
1325 Is that code necessary at all ? *)
1326 (match VB.get
t.t_converted_verified_bitmap
j with
1327 | VB.State_missing
| VB.State_partial
->
1328 VB.set
t.t_converted_verified_bitmap
j VB.State_complete
;
1329 t.t_ncomplete_chunks
<- t.t_ncomplete_chunks
+ 1
1330 | VB.State_complete
-> ()
1331 | VB.State_verified
->
1332 lprintf_nl "set_swarmer_state_verified: trying to demote a verified block in another frontend?")
1334 | VB.State_verified
-> ()
1336 (** set block as completed, closing all remaining ranges, and
1337 incrementing amount downloaded by their total size.
1338 If the block was empty its whole size is added *)
1340 let set_completed_block maybe_t
s i =
1341 let mark_completed () =
1342 set_swarmer_state_complete s i;
1343 s.s_blocks
.(i) <- CompleteBlock
in
1344 match s.s_blocks
.(i) with
1345 | CompleteBlock
| VerifiedBlock
-> ()
1347 let block_begin = compute_block_begin s i in
1348 let block_end = compute_block_end s i in
1349 add_file_downloaded maybe_t
s (block_end -- block_begin);
1352 close_block_ranges maybe_t
s b;
(** [set_verified_block s j] marks block [j] of swarmer [s] as verified.
    Nothing happens when the block is already a [VerifiedBlock].  Otherwise
    the block first goes through [set_completed_block] (closing all remaining
    ranges and incrementing the downloaded amount by their total size, or by
    the whole block size if the block was empty), then becomes a
    [VerifiedBlock], and the new state is propagated with
    [set_swarmer_state_verified].

    No [maybe_t] can be provided: [set_completed_block] is called with
    [None].  Presumably this function is always called on behalf of a primary
    frontend -- to be confirmed. *)
let set_verified_block s j =
  match s.s_blocks.(j) with
  | EmptyBlock | PartialBlock _ | CompleteBlock ->
      let () = set_completed_block None s j in
      s.s_blocks.(j) <- VerifiedBlock;
      set_swarmer_state_verified s j
  | VerifiedBlock -> ()
1370 (*************************************************************************)
1372 (* frontends verified bitmaps *)
1374 (*************************************************************************)
1376 (* We've seen how swarmer verification propagates to the frontend(s)
1377 verifications, now let's see the reverse *)
1379 let set_frontend_state_missing t j =
1380 assert(VB.get
t.t_converted_verified_bitmap
j = VB.State_complete
);
1382 assert(List.for_all
(fun i -> VB.get
s.s_verified_bitmap
i <> VB.State_verified
) t.t_blocks_of_chunk
.(j));
1383 t.t_ncomplete_chunks
<- t.t_ncomplete_chunks
- 1;
1384 if List.for_all
(fun i -> VB.get
s.s_verified_bitmap
i = VB.State_complete
) t.t_blocks_of_chunk
.(j) then begin
1385 if !verbose_swarming
|| !verbose
then
1386 lprintf_nl "Complete block %d/%d of %s failed verification, reloading..."
1387 (j + 1) t.t_nchunks
(file_best_name
t.t_file
);
1389 VB.set
t.t_converted_verified_bitmap
j VB.State_missing
;
1391 match s.s_blocks
.(i) with
1392 | EmptyBlock
-> set_swarmer_state_missing s i
1393 | PartialBlock _
-> set_swarmer_state_partial s i
1395 let block_begin = compute_block_begin s i in
1396 let block_end = compute_block_end s i in
1398 add_file_downloaded None
s (block_begin -- block_end);
1400 s.s_blocks
.(i) <- EmptyBlock
;
1401 set_swarmer_state_missing s i
1403 | VerifiedBlock
-> assert false
1404 ) t.t_blocks_of_chunk
.(j)
1407 (* afaiu not supposed to happen, so this code is for debugging ? *)
1408 if !verbose_swarming
then begin
1410 lprintf_n " Swarmer was incomplete: ";
1412 lprintf
"%c" (VB.state_to_char
(VB.get
s.s_verified_bitmap
i));
1413 if VB.get
s.s_verified_bitmap
i = VB.State_complete
then incr
nsub;
1414 ) t.t_blocks_of_chunk
.(j);
1415 lprintf_nl " = %d/%d" !nsub (List.length
t.t_blocks_of_chunk
.(j))
1417 VB.set
t.t_converted_verified_bitmap
j VB.State_partial
1420 (* aka set_completed_chunk (internal) *)
1421 let set_frontend_state_complete t j =
1422 match VB.get
t.t_converted_verified_bitmap
j with
1423 | VB.State_missing
| VB.State_partial
->
1424 if (not
!CommonGlobals.is_startup_phase
) && (!verbose_swarming
|| !verbose
) then
1425 lprintf_nl "Completed block %d/%d of %s"
1426 (j + 1) t.t_nchunks
(file_best_name
t.t_file
);
1428 List.iter (fun i -> set_completed_block None
s i)
1429 t.t_blocks_of_chunk
.(j)
1430 | VB.State_complete
| VB.State_verified
-> ()
1432 (* aka set_verified_chunk (internal) *)
1433 let set_frontend_state_verified t j =
1434 let mark_verified () =
1435 VB.set
t.t_converted_verified_bitmap
j VB.State_verified
;
1436 if (not
!CommonGlobals.is_startup_phase
) && (!verbose_swarming
|| !verbose
) then
1437 lprintf_nl "Verified block %d/%d of %s"
1438 (j + 1) t.t_nchunks
(file_best_name
t.t_file
);
1439 if t.t_primary
then begin
1441 (* The primary is supposed to propagate verified chunks to the file *)
1442 List.iter (fun i -> set_verified_block s i) t.t_blocks_of_chunk
.(j);
1443 if !verbose_swarming
then
1444 print_s "VERIFIED" s
1446 t.t_verified
t.t_nverified_chunks
j in
1447 if j = 0 && !Autoconf.magic_works
then check_magic
t.t_file
;
1448 match VB.get
t.t_converted_verified_bitmap
j with
1449 | VB.State_missing
| VB.State_partial
->
1450 t.t_ncomplete_chunks
<- t.t_ncomplete_chunks
+ 1;
1451 t.t_nverified_chunks
<- t.t_nverified_chunks
+ 1;
1453 | VB.State_complete
->
1454 t.t_nverified_chunks
<- t.t_nverified_chunks
+ 1;
1456 | VB.State_verified
-> ()
(** [set_chunks_verified_bitmap t bitmap] replays the chunk states of
    [bitmap] onto frontend [t]:
    - missing or partial chunks are ignored;
    - complete chunks go through [set_frontend_state_complete];
    - verified chunks go through [set_frontend_state_verified], and a
      message is logged if the chunk is not [State_verified] in
      [t.t_converted_verified_bitmap] afterwards. *)
let set_chunks_verified_bitmap t bitmap =
  let apply_state j = function
    | VB.State_missing | VB.State_partial -> ()
    | VB.State_complete -> set_frontend_state_complete t j
    | VB.State_verified ->
        set_frontend_state_verified t j;
        let state_now = VB.get t.t_converted_verified_bitmap j in
        if state_now <> VB.State_verified then
          lprintf_nl "FIELD AS BEEN CLEARED"
  in
  VB.iteri apply_state bitmap
(** Accessor: the converted verified bitmap of frontend [t]. *)
let chunks_verified_bitmap t = t.t_converted_verified_bitmap
1474 (** Check the equality of the hash of [t]'s data between offsets
1475 [begin_pos] and [end_pos] against the value of [uid] *)
1477 (*************************************************************************)
1479 (* verify_chunk (internal) *)
1481 (*************************************************************************)
1483 let verify_chunk t j =
1484 let verify t uid begin_pos end_pos
=
1485 file_verify
t.t_file uid begin_pos end_pos
in
1487 if VB.get
t.t_converted_verified_bitmap
j = VB.State_complete
then
1488 let nchunks = VB.length
t.t_converted_verified_bitmap
in
1489 match t.t_verifier
with
1491 | VerificationNotAvailable
-> ()
1493 | ForceVerification
->
1494 set_frontend_state_verified t j
1496 | Verification uids
when Array.length uids
= nchunks ->
1500 let chunk_begin = t.t_chunk_size
*.. j in
1501 let chunk_end = chunk_begin ++ t.t_chunk_size
in
1502 let chunk_end = min
chunk_end s.s_size
in
1503 if verify t uids
.(j) chunk_begin chunk_end then
1504 set_frontend_state_verified t j
1506 set_frontend_state_missing t j
1507 with VerifierNotReady
-> ())
1509 | Verification chunks
->
1510 (* network only provides a hash for the whole file ? *)
1511 assert (Array.length chunks
= 1);
1512 (* let nchunks = String.length t.t_converted_verified_bitmap in *)
1514 if VB.for_all
(function
1515 | VB.State_missing
| VB.State_partial
-> false
1516 | VB.State_complete
| VB.State_verified
-> true) t.t_converted_verified_bitmap
then
1519 if verify t chunks
.(0) zero
s.s_size
then
1520 VB.iteri
(fun j _
->
1521 set_frontend_state_verified t j
1522 ) t.t_converted_verified_bitmap
1524 VB.iteri
(fun j c
->
1525 if c
= VB.State_complete
then set_frontend_state_missing t j
1526 ) t.t_converted_verified_bitmap
1527 with VerifierNotReady
-> ()
(** [must_verify_block s i] tags block [i] of swarmer [s] as completed (via
    [set_swarmer_state_complete]), making it ready for verification. *)
let must_verify_block s i = set_swarmer_state_complete s i
1535 (** mark all blocks as completed, ready for verification *)
1537 let verify_all_chunks t =
1539 VB.iteri
(fun i state
->
1541 | VB.State_verified
->
1542 must_verify_block s i
1543 | VB.State_missing
->
1544 let block_begin = compute_block_begin s i in
1545 let block_end = compute_block_end s i in
1546 add_file_downloaded None
s (block_end -- block_begin);
1547 must_verify_block s i
1548 | VB.State_partial
| VB.State_complete
-> ()
1549 ) s.s_verified_bitmap
(** Same as [verify_all_chunks], then synchronously calls [verify_chunk] on
    every chunk index of [t.t_converted_verified_bitmap]. *)
let verify_all_chunks_immediately t =
  let verify_nth j _state = verify_chunk t j in
  verify_all_chunks t;
  VB.iteri verify_nth t.t_converted_verified_bitmap
(** [compute_bitmap t] synchronously verifies all completed chunks that are
    not verified yet.  The bitmap is only scanned when the frontend counts
    more complete chunks than verified ones; [verify_chunk] is then called
    on each chunk in [State_complete]. *)
let compute_bitmap t =
  let verify_if_complete j state =
    if state = VB.State_complete then verify_chunk t j
  in
  if t.t_ncomplete_chunks > t.t_nverified_chunks then
    VB.iteri verify_if_complete t.t_converted_verified_bitmap
1566 (** Replaces the ith block of the swarmer with a PartialBlock
1567 ranges are created with s_range_size size *)
1571 let block_begin = compute_block_begin s i in
1572 let block_end = compute_block_end s i in
1573 let block_size = block_end -- block_begin in
1577 block_begin = block_begin;
1578 block_end = block_end;
1579 block_ranges
= range
;
1581 block_remaining
= block_size;
1582 block_unselected_remaining
= block_size;
1588 range_begin
= block_begin;
1589 range_end
= block_end;
1591 range_nuploading
= 0;
1594 (* lprintf "New block %Ld-%Ld\n" block_begin block_end; *)
1596 s.s_blocks
.(i) <- PartialBlock
b;
1597 if VB.get
s.s_verified_bitmap
i = VB.State_missing
then
1598 set_swarmer_state_partial s i;
1599 if debug_all then lprintf_nl "NB[%s]" (VB.to_string
s.s_verified_bitmap
);
1602 (** Remove an interval from the beginning of a range, adding the size
1603 of the removed part to the downloaded amount
1604 Closed ranges are removed
1605 When last range is removed, mark the block for verification *)
1607 let range_received maybe_t
r interval_begin interval_end =
1608 (* lprintf " range_received: %Ld-%Ld for %Ld-%Ld\n"
1609 interval_begin interval_end r.range_begin r.range_end; *)
1610 (* interval overlap with the beginning of range ? *)
1611 (* was: r.range_begin < interval_end && r.range_end > interval_begin *)
1612 if r.range_begin
>= interval_begin &&
1613 r.range_begin
< interval_end then begin
1614 (* lprintf "... entered\n"; *)
1616 max
(min
interval_end r.range_end
) r.range_begin
in
1617 let downloaded = new_begin -- r.range_begin
in
1618 let b = r.range_block
in
1619 let s = b.block_s
in
1620 add_file_downloaded maybe_t
s downloaded;
1621 b.block_remaining
<- b.block_remaining
-- downloaded;
1622 if r.range_nuploading
= 0 then
1623 b.block_unselected_remaining
<- b.block_unselected_remaining
-- downloaded;
1624 r.range_begin
<- new_begin;
1625 if r.range_begin
= r.range_end
then begin
1626 (* range completed, unlink it *)
1627 (match r.range_next
with
1628 | Some rr
-> rr
.range_prev
<- r.range_prev
1630 (match r.range_prev
with
1631 | Some rr
-> rr
.range_next
<- r.range_next
1633 (* that was the first range of the block *)
1634 match r.range_next
with
1635 | Some rr
-> (* fix block's first range *)
1636 b.block_ranges
<- rr
1637 | None
-> (* that was the last remaining range of the block *)
1638 match s.s_blocks
.(b.block_num
) with
1639 | PartialBlock _
| EmptyBlock
->
1640 (match s.s_networks
with
1642 assert(t.t_primary
);
1643 (match t.t_verifier
with
1645 set_verified_block s b.block_num
1647 set_completed_block (Some
t) s b.block_num
;
1648 must_verify_block s b.block_num
)
1649 | [] -> assert false)
1651 r.range_next
<- None
;
1652 r.range_prev
<- None
;
1654 lprintf " ... new range %Ld-%Ld\n" r.range_begin r.range_end;
(** [split_range r cut_pos] splits range [r] in two at offset [cut_pos] when
    [cut_pos] lies strictly inside it; otherwise nothing is done.
    [r] keeps the first part; a new range, with no uploader, holds the
    second part and is inserted right after [r], so the ranges stay linked
    together. *)
let rec split_range r cut_pos =
  let strictly_inside = r.range_begin < cut_pos && r.range_end > cut_pos in
  if strictly_inside then begin
    (* second half: from cut_pos up to the old end of r *)
    let tail = {
      range_block = r.range_block;
      range_nuploading = 0;
      range_next = r.range_next;
      range_prev = Some r;
      range_begin = cut_pos;
      range_end = r.range_end;
    } in
    (* relink the old successor, if any, behind the new range *)
    (match r.range_next with
     | Some successor -> successor.range_prev <- Some tail
     | None -> ());
    r.range_next <- Some tail;
    r.range_end <- cut_pos
  end
(** [set_present_block b interval_begin interval_end] removes an interval
    from the ranges of block [b].

    Every range is first split at [interval_begin], then [range_received] is
    called (with no frontend) on every range.  This relies on the
    assumption that data is never downloaded from the middle of a range, so
    present intervals can only overlap the beginning of ranges.

    A message is logged when the decrease of [b.block_remaining] differs
    from the size of the interval.

    A (double linked) list is definitely not the most efficient data
    structure for this operation... *)
let set_present_block b interval_begin interval_end =
  let expected = interval_end -- interval_begin in
  let remaining_before = b.block_remaining in
  let cut r = split_range r interval_begin in
  let receive r = range_received None r interval_begin interval_end in
  (* downloads only happen at the beginning of ranges: split first *)
  iter_block_ranges cut b;
  iter_block_ranges receive b;
  let actual = remaining_before -- b.block_remaining in
  if actual <> expected then
    lprintf_nl "set_present_block: %Ld added <> %Ld effectively added"
      expected actual
1706 (** Remove a list of intervals from the ranges of a swarmer *)
1708 let set_present s intervals
=
1709 iter_intervals s (fun i block_begin block_end interval_begin interval_end ->
1710 (* lprintf "interval: %Ld-%Ld in block %d [%Ld-%Ld]\n"
1711 interval_begin interval_end i block_begin block_end; *)
1712 match s.s_blocks
.(i) with
1714 (* lprintf " EmptyBlock"; *)
1715 if block_begin >= interval_begin && block_end <= interval_end
1717 (* lprintf " --> CompleteBlock\n"; *)
1718 s.s_blocks
.(i) <- CompleteBlock
;
1719 must_verify_block s i;
1720 add_file_downloaded None
s (block_end -- block_begin)
1723 let b = new_block s i in
1724 (* lprintf " ... set_present_block\n"; *)
1725 set_present_block b interval_begin interval_end
1727 (* lprintf " PartialBlock\n"; *)
1728 set_present_block b interval_begin interval_end
1729 | CompleteBlock
| VerifiedBlock
->
1730 (* lprintf " Other\n"; *)
1733 match s.s_networks
with
1735 List.iter (fun (interval_begin, interval_end) ->
1736 mark_disk_space_preallocated tprim
interval_begin interval_end;
1738 | [] -> assert false
(** [set_absent s list_absent] reverses absent/present: it builds the
    complementary of [list_absent] within the half-open interval from
    [Int64.zero] to [s.s_size], then hands that list to [set_present].
    Each absent interval is taken in list order and the gap between the end
    of the previous one and its beginning is recorded; empty gaps are
    dropped. *)
let set_absent s list_absent =
  (* prepend the gap, unless it is empty *)
  let add_gap gap_begin gap_end gaps =
    if gap_begin = gap_end then gaps else (gap_begin, gap_end) :: gaps
  in
  (* cursor is the end of the last absent interval seen so far *)
  let rev_gaps, cursor =
    List.fold_left
      (fun (gaps, cursor) (interval_begin, interval_end) ->
         (add_gap cursor interval_begin gaps, interval_end))
      ([], Int64.zero) list_absent
  in
  let list_present = List.rev (add_gap cursor s.s_size rev_gaps) in
  set_present s list_present
1762 let intervals_to_string s intervals
=
1763 match intervals
with
1764 | AvailableIntervals intervals
->
1765 let st = VB.create (Array.length
s.s_blocks
) VB.State_missing
in
1766 iter_intervals s (fun i _ _ _ _
-> VB.set
st i VB.State_partial
) intervals
;
1768 | AvailableBitv
b -> Bitv.to_string
b
1770 (*************************************************************************)
1774 (*************************************************************************)
1776 (** (debug) output an [uploader] to current log *)
1778 let print_uploader up
=
1779 lprintf_n " interesting complete_blocks: %d\n " up
.up_ncomplete
;
1780 Array.iter (fun i -> lprintf
" %d " i) up
.up_complete_blocks
;
1782 lprintf_n " interesting partial_blocks: %d\n " up
.up_npartial
;
1783 Array.iter (fun (i, begin_pos
, end_pos
) ->
1784 lprintf
" %d[%Ld...%Ld] " i begin_pos end_pos
1785 ) up
.up_partial_blocks
;
1788 (** if not [up_declared],
1789 sets [up_intervals], [up_complete_blocks], [up_ncomplete],
1790 [up_partial_blocks], [up_npartial] according to [intervals],
1791 resets [up_blocks], and calls
1792 [client_has_bitmap] on associated client.
1794 My feeling is that if all those fields only make sense when
1795 up_declared is true, they should be regrouped in a record option.
1798 let set_uploader_intervals up intervals
=
1799 if up
.up_declared
then
1800 lprintf_nl "set_uploader_intervals: called on an already declared uploader\n"
1804 (* INVARIANT: complete_blocks must be in reverse order *)
1806 let complete_blocks = ref [] in
1807 let partial_blocks = ref [] in
1809 let incr_availability s i =
1810 s.s_availability
.(i) <- s.s_availability
.(i) + 1 in
1812 (match intervals
with
1813 | AvailableIntervals intervals
->
1814 iter_intervals s (fun i block_begin block_end interval_begin interval_end ->
1815 (* lprintf "iter_intervals %d %Ld-%Ld %Ld-%Ld\n"
1816 i block_begin block_end interval_begin interval_end; *)
1817 incr_availability s i;
1819 match s.s_blocks
.(i) with
1820 | CompleteBlock
| VerifiedBlock
-> ()
1821 | EmptyBlock
| PartialBlock _
->
1822 if block_begin = interval_begin && block_end = interval_end then
1823 complete_blocks := i :: !complete_blocks
1826 (i, interval_begin, interval_end) :: !partial_blocks
1829 | AvailableBitv bitmap
->
1830 Bitv.iteri_true
(fun i ->
1832 incr_availability s j;
1833 complete_blocks := j :: !complete_blocks
1834 ) t.t_blocks_of_chunk
.(i)
1839 (* s.s_last_seen.(i) <- BasicSocket.last_time (); *)
1840 let i = t.t_chunk_of_block
.(i) in
1841 t.t_last_seen
.(i) <- BasicSocket.last_time
()
1844 let complete_blocks = Array.of_list
!complete_blocks in
1845 let partial_blocks = Array.of_list
!partial_blocks in
1846 up
.up_intervals
<- intervals
;
1848 up
.up_complete_blocks
<- complete_blocks;
1849 up
.up_ncomplete
<- Array.length
complete_blocks;
1851 if Array.length
partial_blocks > 0 then
1852 lprintf_nl "WARNING: partial_blocks = %d" (Array.length
partial_blocks);
1853 up
.up_partial_blocks
<- partial_blocks;
1854 up
.up_npartial
<- Array.length
partial_blocks;
1858 up
.up_declared
<- true;
1860 let bm = intervals_to_string s intervals
in
1861 client_has_bitmap up
.up_client up
.up_t
.t_file
bm;
1863 if debug_all then print_uploader up
1865 (*************************************************************************)
1867 (* register_uploader *)
1869 (*************************************************************************)
1871 let register_uploader t client intervals
=
1877 up_declared
= false;
1878 up_intervals
= intervals
;
1880 up_complete_blocks
= [||];
1883 up_partial_blocks
= [||];
1891 HU.add
uploaders_by_num up;
1892 set_uploader_intervals up intervals
;
1895 (*************************************************************************)
1897 (* unregister_uploader *)
1899 (*************************************************************************)
1901 let clear_uploader_ranges up =
1902 List.iter (fun (_
,_
,r) ->
1903 if r.range_nuploading
> 0 then
1904 r.range_nuploading
<- r.range_nuploading
- 1
1906 lprintf_nl "clear_uploader_ranges: some range_nuploading was about to become negative\n";
1907 if r.range_nuploading
= 0 then
1908 let b = r.range_block
in
1909 b.block_unselected_remaining
<-
1910 b.block_unselected_remaining
++ (compute_range_size r);
1911 if b.block_unselected_remaining
> b.block_remaining
then
1912 lprintf_nl "clear_uploader_ranges: block_unselected_remaining larger than block_remaining!";
1916 let clear_uploader_blocks up =
1918 let num = b.up_block
.block_num
in
1922 lprintf_nl "Client %d unselect %d" (client_num
up.up_client
) num;
1923 if s.s_nuploading
.(num) > 0 then
1924 s.s_nuploading
.(num) <- s.s_nuploading
.(num) - 1
1926 lprintf_nl "clear_uploader_blocks: some s_nuploading was about to
1931 let clear_uploader_intervals up =
1932 if up.up_declared
then
1933 let decr_availability s i =
1934 if s.s_availability
.(i) > 0 then
1935 s.s_availability
.(i) <- s.s_availability
.(i) - 1
1937 lprintf_nl "clear_uploader_intervals: some s_availability was about to become negative\n" in
1938 (* lprintf "clean_uploader_chunks:\n"; *)
1941 Array.iter (decr_availability s) up.up_complete_blocks
;
1942 up.up_complete_blocks
<- [||];
1943 up.up_ncomplete
<- 0;
1944 Array.iter (fun (b,_
,_
) -> decr_availability s b) up.up_partial_blocks
;
1945 up.up_partial_blocks
<- [||];
1946 up.up_npartial
<- 0;
1947 clear_uploader_blocks up;
1948 up.up_declared
<- false
(** [update_uploader_intervals up intervals] replaces the intervals declared
    by uploader [up]: the current ones are dropped with
    [clear_uploader_intervals], then [intervals] are installed with
    [set_uploader_intervals]. *)
let update_uploader_intervals up intervals =
  let () = clear_uploader_intervals up in
  set_uploader_intervals up intervals
(** [unregister_uploader up] releases what uploader [up] holds: first its
    declared intervals ([clear_uploader_intervals]), then its ranges
    ([clear_uploader_ranges]). *)
let unregister_uploader up =
  let () = clear_uploader_intervals up in
  clear_uploader_ranges up
1958 (** (debug) output the uploaders of a swarmer to current log *)
1960 let print_uploaders s =
1961 Array.iteri
(fun i b ->
1963 | EmptyBlock
-> lprintf
"_"
1964 | CompleteBlock
-> lprintf
"C"
1965 | VerifiedBlock
-> lprintf
"V"
1967 if s.s_nuploading
.(i) > 9 then
1970 lprintf
"%d" s.s_nuploading
.(i)
1973 Array.iteri
(fun i b ->
1975 | EmptyBlock
-> lprintf
"_"
1976 | CompleteBlock
-> lprintf
"C"
1977 | VerifiedBlock
-> lprintf
"V"
1979 lprintf
"{ %d : %d=" b.block_num
1980 s.s_nuploading
.(b.block_num
);
1981 iter_block_ranges (fun r ->
1982 lprintf
"(%d)" r.range_nuploading
1988 (** (see uploaders invariants above)
1989 Drop the [n]th element from the [up.up_ncomplete] first elements
1990 of [up.complete_blocks] by swapping it with the
1991 ([up.up_ncomplete]-1)th element, then decrease [up.up_ncomplete];
1992 Then return that element, after converting associated block to
1993 PartialBlock if necessary.
1996 let permute_and_return up n
=
1997 assert (n
<= up.up_ncomplete
-1);
1998 let b = up.up_complete_blocks
.(n
) in
1999 if debug_all then lprintf_nl "permute_and_return %d <> %d" n
b;
2000 if n
< up.up_ncomplete
then begin
2001 up.up_complete_blocks
.(n
) <- up.up_complete_blocks
.(up.up_ncomplete
-1);
2002 up.up_complete_blocks
.(up.up_ncomplete
-1) <- b
2004 up.up_ncomplete
<- up.up_ncomplete
- 1;
2007 match s.s_blocks
.(b) with
2009 let b = new_block s b in
2012 up_block_begin
= b.block_begin;
2013 up_block_end
= b.block_end }
2017 up_block_begin
= b.block_begin;
2018 up_block_end
= b.block_end }
2020 lprintf_nl "ERROR: verified block in permute_and_return %d\n" b;
2023 lprintf_nl "ERROR: complete block in permute_and_return %d\n" b;
2026 (** find a block in up_complete_blocks that's not already
2027 CompleteBlock or VerifiedBlock.
2028 If none can be found, do the same with up_partial_blocks.
2029 If none can be found still, raise Not_found exception
    up_ncomplete and up_npartial are used in the same way as in
2032 permute_and_return, but no element is ever permuted.
2034 Since set_uploader_intervals puts the blocks with the lowest
2035 offsets at the end of up_complete_blocks and up_partial_blocks,
2036 this also selects the blocks in increasing offsets order.
2039 let linear_select_blocks up =
2040 let rec iter_partial up =
2041 let n = up.up_npartial
in
2042 if n = 0 then raise Not_found
;
2043 let b, block_begin, block_end = up.up_partial_blocks
.(n-1) in
2044 up.up_npartial
<- n-1;
2047 if s.s_priorities_bitmap
.[b] = priority_zero then iter_partial up else
2048 let chunk = t.t_chunk_of_block
.(b) in
2049 match s.s_blocks
.(b) with
2050 | CompleteBlock
| VerifiedBlock
->
2056 up_block_begin
= block_begin;
2057 up_block_end
= block_end }]
2059 let b = new_block s b in
2063 up_block_begin
= block_begin;
2064 up_block_end
= block_end }] in
2065 let rec iter_complete up =
2066 let n = up.up_ncomplete
in
2067 if n = 0 then iter_partial up
2069 let b = up.up_complete_blocks
.(n-1) in
2070 up.up_ncomplete
<- n-1;
2073 if s.s_priorities_bitmap
.[b] = priority_zero then iter_complete up else
2074 let chunk = t.t_chunk_of_block
.(b) in
2075 match s.s_blocks
.(b) with
2076 | CompleteBlock
| VerifiedBlock
->
2082 up_block_begin
= b.block_begin;
2083 up_block_end
= b.block_end }]
2085 let b = new_block s b in
2089 up_block_begin
= b.block_begin;
2090 up_block_end
= b.block_end }]
2094 (** Check whether block [n] of swarmer [s] is not already downloaded
2097 Chunk verification may be called if the block is completed but not
2101 let should_download_block s n =
2102 (* lprintf "should_download_block %d\n" n; *)
2104 match VB.get
s.s_verified_bitmap
n with
2105 | VB.State_missing
| VB.State_partial
-> true
2106 | VB.State_complete
->
2107 (match s.s_networks
with
2109 assert(t.t_primary
);
2111 let n = t.t_chunk_of_block
.(n) in
2112 if VB.get
t.t_converted_verified_bitmap
n = VB.State_complete
then
2114 with VerifierNotReady
-> ());
2115 | [] -> assert false);
2116 (match VB.get
s.s_verified_bitmap
n with
2117 | VB.State_missing
| VB.State_partial
-> true
2118 | VB.State_complete
| VB.State_verified
-> false)
2119 | VB.State_verified
-> false
2122 lprintf "should_download_block %d\n" n; *)
2125 (*************************************************************************)
2127 (* select_blocks (internal) *)
2129 (*************************************************************************)
2134 choice_user_priority
: int;
2135 choice_remaining
: int64
;
2136 choice_unselected_remaining
: int64
;
2137 choice_preallocated
: bool;
2140 let dummy_choice = {
2143 choice_user_priority
= 0;
2144 choice_remaining
= 0L;
2145 choice_unselected_remaining
= 0L;
2146 choice_preallocated
= false;
2149 (* Return the best list of blocks to ask from an uploader
2150 All the blocks from the list must come from the same chunk *)
2152 let select_blocks up =
2156 match s.s_strategy
with
2158 linear_select_blocks up
2160 if up.up_ncomplete
= 0 && up.up_npartial
= 0 then raise Not_found
;
2162 (* to evaluate the relative rarity of a block, we must compare it to
2163 the availability of *all* blocks, not only those available from
2165 let sum_availability = Array.fold_left
(+) 0 s.s_availability
in
2166 let mean_availability = sum_availability / Array.length
s.s_blocks
in
2168 let my_t = if t.t_verifier
<> NoVerification
then t else
2169 match s.s_networks
with
2171 assert(tprim
.t_primary
);
2173 | [] -> assert false in
2174 let verification_available = my_t.t_verifier
<> NoVerification
in
2176 let several_frontends = List.length
s.s_networks
> 1 in
2177 (* compute the number of missing or partial blocks in the same
2178 chunks (for all networks) as a given block *)
2179 (* memoize some results *)
2180 let memoization_calls = ref 0 in
2181 let memoization_hits = ref 0 in
2182 let debug_memoization = false in
2184 incr
memoization_calls;
2186 let result = Hashtbl.find h p
in
2187 incr
memoization_hits;
2188 if debug_memoization then
2189 (* defeats the purpose of memoization, only enable for
2191 let recomputed_result = f p
in
2192 if result <> recomputed_result then begin
2193 lprintf_nl "memoization failure";
2199 Hashtbl.add h p
result;
2201 let memoize_remaining_blocks_in_chunk = Hashtbl.create 17 in
2202 let memoize_remaining_blocks_in_chunks = Hashtbl.create 17 in
2204 let remaining_blocks_in_chunks i =
2205 assert (i >= 0 && i < VB.length
s.s_verified_bitmap
);
2206 memoize memoize_remaining_blocks_in_chunks
2208 List.fold_left
(fun acc t ->
2210 (memoize memoize_remaining_blocks_in_chunk
2212 let chunk = t.t_chunk_of_block
.(i) in
2213 List.fold_left
(fun acc b ->
2215 (match VB.get
s.s_verified_bitmap
b with
2216 | VB.State_missing
| VB.State_partial
-> true
2217 | VB.State_complete
| VB.State_verified
-> false) then acc + 1
2218 else acc) 0 t.t_blocks_of_chunk
.(chunk)) (t.t_num
, i))
2219 ) 0 s.s_networks
) i in
2222 let preview_beginning = 9000000L in
2223 let preview_end = (s.s_size ** 98L) // 100L in
2226 (* sources_per_chunk was initially for edonkey only *)
2227 let data_per_source = 9728000L // (Int64.of_int
!!sources_per_chunk
) in
2229 let need_to_complete_some_blocks_quickly =
2230 verification_available && t.t_nverified_chunks
< 2
2233 let create_choice n b =
2234 let block_begin = compute_block_begin s b in
2235 let block_end = compute_block_end s b in
2236 let size = block_end -- block_begin in
2237 let remaining, unselected_remaining
= match s.s_blocks
.(b) with
2238 | EmptyBlock
-> size, size
2239 | PartialBlock
b -> b.block_remaining
, b.block_unselected_remaining
2240 | CompleteBlock
| VerifiedBlock
-> 0L, 0L in
2244 choice_user_priority
= Char.code
s.s_priorities_bitmap
.[b];
2245 choice_remaining
= remaining;
2246 choice_preallocated
= is_fully_preallocated t block_begin block_end;
2247 choice_unselected_remaining
= unselected_remaining
;
2251 let choice_num choice
=
2252 choice
.choice_num in
2254 let _choice_block choice
=
2255 choice
.choice_block
in
2257 let choice_user_priority choice
=
2258 choice
.choice_user_priority in
2260 let choice_remaining choice
=
2261 choice
.choice_remaining in
2263 let choice_unselected_remaining choice
=
2264 choice
.choice_unselected_remaining in
2266 let choice_nuploaders choice
=
2267 s.s_nuploading
.(choice
.choice_block
) in
2269 let choice_remaining_per_uploader choice
=
2270 choice
.choice_unselected_remaining //
2271 (Int64.of_int
(choice_nuploaders choice
+ 1)) (* planned *) in
2273 (* has enough uploaders *)
2274 let choice_saturated choice
=
2275 choice
.choice_unselected_remaining <=
2276 Int64.of_int
(choice_nuploaders choice
) ** data_per_source in
2278 let choice_availability choice
=
2279 s.s_availability
.(choice
.choice_block
) in
2281 (* remaining blocks in the same chunk, for all frontends *)
2282 let choice_other_remaining choice
=
2283 remaining_blocks_in_chunks (choice
.choice_block
) in
2285 let choice_preallocated choice
=
2286 choice
.choice_preallocated in
2288 let print_choice c
=
2289 lprintf_nl "choice %d:%d priority:%d nup:%d rem:%Ld rmu:%Ld rpu:%Ld sat:%B sib:%d av:%d pre:%b"
2290 (choice_num c
) up.up_complete_blocks
.(choice_num c
)
2291 (choice_user_priority c
)
2292 (choice_nuploaders c
)
2293 (choice_remaining c
)
2294 (choice_unselected_remaining c
)
2295 (choice_remaining_per_uploader c
)
2296 (choice_saturated c
)
2297 (choice_other_remaining c
)
2298 (choice_availability c
)
2299 (choice_preallocated c
) in
    (** result > 0: c1 is best; result < 0: c2 is best; result = 0: they're equivalent *)
2302 let compare_choices c1 c2
=
2304 (* Avoid stepping on each other's feet *)
2306 match choice_unselected_remaining c1
,
2307 choice_unselected_remaining c2
with
2312 if cmp <> 0 then cmp else
2314 (* avoid overly unbalanced situations *)
2316 match choice_saturated c1
, choice_saturated c2
with
2320 | true, true -> 0 in
2321 if cmp <> 0 then cmp else
2324 (* Do what Master asked for *)
2325 let cmp = compare
(choice_user_priority c1
)
2326 (choice_user_priority c2
) in
2327 if cmp <> 0 then cmp else
2329 (* "OPTIMIZATIONS" *)
2330 (* Pick really rare gems: if average availability of all
2331 blocks is higher than 5 connected sources, pick in
2332 priority blocks present in at most 3 connected sources;
2333 is that too restrictive ? *)
2335 if not
need_to_complete_some_blocks_quickly &&
2336 mean_availability > 5 &&
2337 (choice_availability c1
<= 3 ||
2338 choice_availability c2
<= 3) then
2339 compare
(choice_availability c2
)
2340 (choice_availability c1
)
2342 if cmp <> 0 then cmp else
2344 (* try to quickly complete (and validate) chunks;
2345 if there's only one frontend, each chunk has only one
2346 block, and looking at siblings make no sense *)
2348 if verification_available && several_frontends then
2349 compare
(choice_other_remaining c2
)
2350 (choice_other_remaining c1
)
2352 if cmp <> 0 then cmp else
2354 (* try to quickly complete blocks *)
2356 compare
(choice_unselected_remaining c2
)
2357 (choice_unselected_remaining c1
) in
2358 if cmp <> 0 then cmp else
2360 (* pick blocks that won't require allocating more disk space *)
2362 match choice_preallocated c1
, choice_preallocated c2
with
2366 if cmp <> 0 then cmp else
2372 (* compare a new chunk against a list of best choices numbers (and a
2373 specimen of best choice) *)
2374 let keep_best_chunks chunk_blocks_indexes best_choices specimen
=
2375 match chunk_blocks_indexes
with
2376 | [] -> best_choices
, specimen
2378 let this_chunk_specimen = List.fold_left
(fun acc n ->
2379 let choice = create_choice n up.up_complete_blocks
.(n) in
2380 let cmp = compare_choices choice acc in
2381 if cmp <= 0 then acc
2382 else choice) (create_choice h
up.up_complete_blocks
.(h
)) q
in
2383 let cmp = compare_choices this_chunk_specimen specimen
in
2384 if cmp < 0 then best_choices
, specimen
2385 else if cmp > 0 then [chunk_blocks_indexes
], this_chunk_specimen
2386 else chunk_blocks_indexes
:: best_choices
, specimen
in
2388 let current_chunk_num, current_chunk_blocks_indexes
,
2389 best_choices
, specimen
=
2390 Array2.subarray_fold_lefti
(fun
2391 ((current_chunk_num, current_chunk_blocks_indexes
,
2392 best_choices
, specimen
) as acc) n b ->
2393 if s.s_priorities_bitmap
.[b] = priority_zero ||
2394 not
(should_download_block s b) then acc
2396 let chunk_num = t.t_chunk_of_block
.(b) in
2397 if chunk_num = current_chunk_num then
2398 (current_chunk_num, n :: current_chunk_blocks_indexes
,
2399 best_choices
, specimen
)
2401 (* different chunk *)
2402 match current_chunk_blocks_indexes
with
2404 (* no previous chunk *)
2405 (chunk_num, [n], best_choices
, specimen
)
2407 let new_best_choices, new_specimen
=
2408 keep_best_chunks current_chunk_blocks_indexes best_choices specimen
in
2409 (chunk_num, [n], new_best_choices, new_specimen
)
2410 ) (-1, [], [], dummy_choice) up.up_complete_blocks
0
2411 (up.up_ncomplete
-1) in
2414 let best_choices, specimen
=
2415 keep_best_chunks current_chunk_blocks_indexes
best_choices specimen
in
2416 (* what about up_partial_blocks ?
2417 currently they're taken care of by linear_select_block
2420 if debug_all then print_choice specimen
;
2424 match best_choices with
2425 | [] -> raise Not_found
2426 | [choice] -> choice
2428 let nchoices = List.length
best_choices in
2429 List.nth
best_choices (Random.int nchoices) in
2433 | [] -> assert false
2434 | b :: _
-> t.t_chunk_of_block
.(up.up_complete_blocks
.(b)) in
2436 if debug_all || !verbose_swarming
then begin
2437 lprintf_nl "\nBlocksFound in %d: %d" chunk (List.length
blocks);
2439 lprintf_n " %d" up.up_complete_blocks
.(n)
2446 let probably_buggy =
2447 List.for_all
(fun n ->
2448 let block_num = up.up_complete_blocks
.(n) in
2449 match s.s_blocks
.(block_num) with
2450 | EmptyBlock
-> false
2452 block_ranges_for_all (fun r ->
2453 r.range_nuploading
> 0) b
2454 | CompleteBlock
| VerifiedBlock
->
2456 if probably_buggy then begin
2457 lprintf_nl "Probably buggy choice (%d):" chunk;
2458 Array2.subarray_fold_lefti
(fun () n b ->
2459 if s.s_priorities_bitmap
.[b] <> priority_zero &&
2460 should_download_block s b then
2461 let this_choice = create_choice n b in
2462 if List.mem
n blocks then lprintf
"** "
2463 else if List.exists
(List.mem
n) best_choices then
2466 print_choice this_choice;
2467 match s.s_blocks
.(b) with
2468 | EmptyBlock
| CompleteBlock
| VerifiedBlock
-> ()
2470 let total_uploading =
2471 block_ranges_fold (fun acc r ->
2472 lprintf
"%d " r.range_nuploading
;
2473 acc + r.range_nuploading
) 0 b in
2474 lprintf
"total=%d" total_uploading;
2476 ) () up.up_complete_blocks
0 (up.up_ncomplete
- 1);
2481 chunk, List.map
(fun n -> permute_and_return up n) blocks
2483 if !verbose_swarming
then
2484 lprintf_nl "select_block: fallback to linear strategy";
2485 linear_select_blocks up
2488 (* print_s "NO BLOCK FOUND" s; *)
2491 (** If uploader is associated to a file being downloaded,
    clear previously selected block (if any) and select best available
2493 block, according to block selection strategy
2494 @param up the uploader *)
2496 let find_blocks up =
2498 if debug_all then begin
2500 for i = 0 to up.up_ncomplete
- 1 do
2501 lprintf
"%d " up.up_complete_blocks
.(i)
2507 match file_state
t.t_file
with
2518 let num = b.up_block
.block_num in
2520 lprintf_nl "Client %d unselected %d" (client_num
up.up_client
) num;
2521 if s.s_nuploading
.(num) > 0 then
2522 s.s_nuploading
.(num) <- s.s_nuploading
.(num) - 1
2524 lprintf_nl "find_blocks: s_nuploading was about to become negative"
2528 let chunk, blocks = select_blocks up in
2530 let num = b.up_block
.block_num in
2532 lprintf_nl "Client %d selected %d" (client_num
up.up_client
) num;
2533 s.s_nuploading
.(num) <- s.s_nuploading
.(num) + 1;
2535 up.up_blocks
<- blocks;
2538 if debug_all then lprintf_nl "Exception %s in find_blocks" (Printexc2.to_string e
);
2541 (** Remove completed ranges from an uploader's range list, and
2542 decrease their reference counter *)
2544 let remove_completed_uploader_ranges up =
2545 let not_completed_ranges, completed_ranges
=
2546 List.partition
(fun (_
,_
,r) ->
2547 r.range_begin
< r.range_end
) up.up_ranges
in
2548 up.up_ranges
<- not_completed_ranges;
2549 List.iter (fun (_
,_
,r) ->
2550 if r.range_nuploading
> 0 then
2551 r.range_nuploading
<- r.range_nuploading
- 1
2553 lprintf_nl "remove_completed_uploader_ranges: range_nuploading
2554 was about to become negative!";
2555 if r.range_nuploading
= 0 then
2556 let b = r.range_block
in
2557 b.block_unselected_remaining
<-
2558 b.block_unselected_remaining
++ (compute_range_size r);
2559 if b.block_unselected_remaining
> b.block_remaining
then
2560 lprintf_nl "remove_completed_uploader_ranges: block_unselected_remaining is larger than block_remaining";
2563 (** uploader accessors *)
(* Return the [up_ranges] list of uploader [up]. *)
let current_ranges = fun up -> up.up_ranges
2567 let current_blocks up =
2568 match up.up_blocks
with
2569 | [] -> raise Not_found
(** [in_uploader_ranges r list] checks whether range [r] occurs in
    [list], a list of triples whose third component is a range.
    Ranges are compared by physical identity ([==]), not structurally. *)
let in_uploader_ranges r list =
  let same_range (_, _, candidate) = candidate == r in
  List.exists same_range list
(** Store [strategy] in the [s_strategy] field of the swarmer [t.t_s]
    attached to frontend [t]. *)
let set_strategy t strategy =
  let swarmer = t.t_s in
  swarmer.s_strategy <- strategy
(** Read the [s_strategy] field of the swarmer [t.t_s] attached to
    frontend [t]. *)
let get_strategy t =
  let swarmer = t.t_s in
  swarmer.s_strategy
2581 (*************************************************************************)
2585 (*************************************************************************)
2587 let uploader_ranges_fold_left f
acc l
=
2591 | h
:: q
-> aux (f
acc h
) q
2594 (** Find a range to upload from [up], that is at most [range_size]
2595 bytes long (split some range if necessary) *)
2597 (* Is merging at all useful ? Once next range starts downloading, they
2598 can no longer be merged, so it should be rare... *)
2599 let allow_merge_ranges = true
2601 type ranges_cluster
= {
2602 cluster_ranges
: range list
;
2603 cluster_nuploading
: int;
2604 cluster_size
: Int64.t
2607 let dummy_ranges_cluster = {
2608 cluster_ranges
= [];
2609 cluster_nuploading
= 0;
(** A cluster counts as "dummy" when its [cluster_ranges] list is
    empty. *)
let is_dummy_cluster cluster =
  match cluster.cluster_ranges with
  | [] -> true
  | _ :: _ -> false
2616 let find_range up range_size =
2618 (** merge two consecutive ranges in the first, if possible;
2619 Return true if successful *)
2620 let merge_ranges r r2
=
2621 match r.range_next
with
2625 r.range_end
< r2
.range_begin
||
2626 r2
.range_nuploading
> 0 then false
2628 r.range_end
<- r2
.range_end
;
2629 r.range_next
<- r2
.range_next
;
2630 (match r.range_next
with
2633 r3
.range_prev
<- Some
r);
2637 remove_completed_uploader_ranges up;
2639 let b, more_blocks
=
2640 match up.up_blocks
with
2643 lprintf_nl "find_range: uploader had no block selected";
2645 | b :: more_blocks
-> b, more_blocks
2648 match file_state
t.t_file
with
2655 lprintf_nl "find_range: file %s in bad state %s"
2656 t.t_s
.s_filename
(string_of_state
(file_state
t.t_file
));
2661 lprintf_nl "find_range: is there a range of size %Ld in %s for %d ?"
2663 (String.concat
" " (List.map
(fun b ->
2664 Printf.sprintf
"[%Ld-%Ld]" b.up_block_begin
b.up_block_end
2666 (client_num
up.up_client
);
2667 (* pick the first correct cluster with fewest uploaders
2668 We're not trying to get a range that's at least as big as
2669 [range_size] bytes - that would prevent partially downloaded
2670 ranges from being completed first *)
2671 let rec iter acc r b more_blocks
=
2672 let correct_range r =
2673 not
(in_uploader_ranges r up.up_ranges
) &&
2674 r.range_begin
< r.range_end
&&
2675 r.range_begin
>= b.up_block_begin
&&
2676 r.range_begin
< b.up_block_end
in
2678 if not
(correct_range r) then acc
2680 (* find if there are ranges to merge ahead *)
2681 let rec iter_cluster r cluster
=
2682 let cluster = { cluster with
2683 cluster_ranges
= r :: cluster.cluster_ranges
;
2684 cluster_size
= cluster.cluster_size
++
2685 (compute_range_size r)
2687 if not
allow_merge_ranges ||
2688 cluster.cluster_size
>= range_size then cluster
2690 match r.range_next
with
2693 if rr
.range_begin
= r.range_end
&&
2694 correct_range rr
&& rr
.range_nuploading
= 0 then
2695 iter_cluster rr
cluster
2699 iter_cluster r { dummy_ranges_cluster with
2700 cluster_nuploading
= r.range_nuploading
} in
2703 if is_dummy_cluster acc then cluster
2705 (* find a range with as few uploaders as possible *)
2706 let cmp = compare
acc.cluster_nuploading
2707 cluster.cluster_nuploading
in
2711 (* fast exit, and why I didn't use an iterator :/
2712 Could have used an exception, but I don't like that ;) *)
2713 if not
(is_dummy_cluster best_cluster) &&
2714 best_cluster.cluster_nuploading
= 0 then b, best_cluster
2716 match r.range_next
with
2717 | Some rr
-> iter best_cluster rr
b more_blocks
2719 match more_blocks
with
2720 | [] -> b, best_cluster
2721 | b :: more_blocks
->
2722 if debug_all || !verbose
then
2723 lprintf_nl "find_range: Client %d, next block"
2724 (client_num
up.up_client
);
2725 iter best_cluster b.up_block
.block_ranges
b more_blocks
in
2727 let best_block, best_cluster =
2728 iter dummy_ranges_cluster b.up_block
.block_ranges
b more_blocks
in
2729 if not
(is_dummy_cluster best_cluster) &&
2730 best_cluster.cluster_nuploading
> 0 &&
2731 (file_downloaded
t.t_file
< file_size
t.t_file
** 98L // 100L) then begin
      (* it seems there are only poor choices left on that block; is
         there really nothing better elsewhere? *)
2734 let s = b.up_block
.block_s
in
2735 for i = 0 to up.up_ncomplete
- 1 do
2736 let block = up.up_complete_blocks
.(i) in
2737 if not
(List.exists
(fun b -> b.up_block
.block_num = block
2738 ) up.up_blocks
) then
2739 if s.s_priorities_bitmap
.[block] <> priority_zero &&
2740 should_download_block s block then
2741 let partial_found = match s.s_blocks
.(block) with
2742 | EmptyBlock
-> true
2743 | CompleteBlock
| VerifiedBlock
-> false
2744 | PartialBlock
b -> b.block_unselected_remaining
> 0L in
2745 if partial_found then begin
2747 if debug_all || !verbose then
2748 lprintf_nl "find_range: Client %d better switch cluster now!"
2749 (client_num up.up_client);
2755 match List.rev
best_cluster.cluster_ranges
with
2758 lprintf_nl "find_range: no correct range found!";
2761 if not
(List.for_all
(merge_ranges r) q
) then
2762 lprintf_nl "find_range: ranges did not merge as well as planned";
2763 split_range r (min
(r.range_begin
++ range_size)
2764 best_block.up_block_end
);
2765 if debug_all then begin
2767 iter_block_ranges (fun rr
->
2768 let selected = if rr
== r then "*" else "" in
2769 lprintf
" %s[%Ld-%Ld]:%d%s" selected
2770 rr
.range_begin rr
.range_end rr
.range_nuploading
2772 ) best_block.up_block
;
2775 let key = r.range_begin
, r.range_end
, r in
2776 up.up_ranges
<- up.up_ranges
@ [key];
2777 if r.range_nuploading
= 0 then begin
2778 let b = r.range_block
in
2779 b.block_unselected_remaining
<-
2780 b.block_unselected_remaining
-- (compute_range_size r);
2781 if b.block_unselected_remaining
< 0L then
2782 lprintf_nl "find_range: block_unselected_remaining is negative!";
2784 r.range_nuploading
<- r.range_nuploading
+ 1;
(** Range accessor: return the pair [(r.range_begin, r.range_end)]. *)
let range_range r =
  let lower = r.range_begin in
  let upper = r.range_end in
  (lower, upper)
2791 (** Data has been received from uploader [up]. Transfer data to file
2792 and update uploader ranges.
2793 Data = String.sub [str] [string_begin] [string_len] *)
2795 let received up file_begin str string_begin string_len
=
2796 assert (string_begin
>= 0);
2797 assert (string_len
>= 0);
2798 assert (string_begin
+ string_len
<= String.length str
);
2803 let debug_bad_write unexpected_intervals
=
2804 if !verbose_swarming
then begin
2805 lprintf
"Dismiss unwanted data from %d for %s:"
2806 (client_num
up.up_client
) (file_best_name
t.t_file
);
2807 List.iter (fun (i_begin, i_end) ->
2808 lprintf
" %Ld-%Ld" i_begin i_end) unexpected_intervals
;
2810 lprintf_nl " received: file_pos:%Ld string:%d %d"
2811 file_begin string_begin string_len
;
2812 lprintf_nl " ranges:";
2813 List.iter (fun (_
,_
,r) ->
2814 lprintf_n " range: %Ld-%Ld"
2817 (match r.range_next
with
2820 lprintf
" next: %Ld" rr
.range_begin
);
2821 (match r.range_prev
with
2824 lprintf
" prev: %Ld" rr
.range_begin
);
2826 let b = r.range_block
in
2827 lprintf_n " block: %d[%c] %Ld-%Ld [%s]"
2829 (VB.state_to_char
(VB.get
s.s_verified_bitmap
b.block_num))
2830 b.block_begin b.block_end
2831 (match s.s_blocks
.(b.block_num) with
2832 | EmptyBlock
-> "empty"
2833 | PartialBlock _
-> "partial"
2834 | CompleteBlock
-> "complete"
2835 | VerifiedBlock
-> "verified"
2837 let br = b.block_ranges
in
2838 lprintf
" first range: %Ld(%Ld)"
2840 (compute_range_size br);
2844 if !exit_on_error then exit
2 in
2846 if string_len
> 0 then
2847 let file_end = file_begin
++ (Int64.of_int string_len
) in
2848 if !verbose_swarming
then
2849 lprintf_nl "received on %Ld-%Ld" file_begin
file_end;
2853 let remove_interval intervals
interval_begin interval_end =
2854 let rec remove acc intervals
=
2855 match intervals
with
2856 | [] -> List.rev
acc
2857 | ((fi_begin
, fi_end
) as first_interval
) :: others
->
2858 if fi_begin
>= interval_end then
2859 List.rev_append
acc intervals
2860 else if fi_end
<= interval_begin then
2861 remove (first_interval
:: acc) others
2864 let acc = if fi_begin
< interval_begin then
2865 (fi_begin
, interval_begin) :: acc else acc in
2866 let acc = if fi_end
> interval_end then
2867 (interval_end, fi_end
) :: acc else acc in
2869 remove [] intervals
in
2870 List.fold_left
(fun acc (_
, _
, r) ->
2871 remove_interval acc r.range_begin
r.range_end
2872 ) [(file_begin
, file_end)] up.up_ranges
in
2873 if intervals_out <> [] then
2874 debug_bad_write intervals_out;
2876 let file_end = min
file_end s.s_size
in
2878 match file_state
t.t_file
with
2887 lprintf_nl "received: wrong file state %s for %s" (string_of_state
(file_state
t.t_file
)) s.s_filename
;
2889 | FileDownloading
->
2891 List.iter (fun (_
,_
,r) ->
2892 (* was: r.range_begin < file_end && r.range_end > file_begin *)
2893 if r.range_begin
>= file_begin
&&
2894 r.range_begin
< file_end then
2895 let file_end = min
file_end r.range_end
in
2896 let written_len = file_end -- r.range_begin
in
2897 let string_pos = string_begin
+
2898 Int64.to_int
(r.range_begin
-- file_begin
) in
2899 let string_length = Int64.to_int
written_len in
2900 if string_length > 0 then
2901 match s.s_networks
with
2902 | [] -> assert false
2904 assert (tprim
.t_primary
);
2906 preallocate_disk_space tprim
2907 r.range_begin
file_end
2911 lprintf_nl "Exception %s while preallocating disk space [%Ld-%Ld] for %s"
2912 (Printexc2.to_string e
)
2913 r.range_begin
file_end
2914 (file_best_name
t.t_file
));
2915 file_write tprim
.t_file
2917 str
string_pos string_length;
2918 range_received (Some
t) r r.range_begin
file_end;
2920 remove_completed_uploader_ranges up
2922 lprintf_nl "Exception %s while receiving data"
2923 (Printexc2.to_string e
);
2924 remove_completed_uploader_ranges up;
2927 (** compute the list of present intervals of a swarmer *)
2929 let present_intervals s =
2930 (* intervals is a reversed list of intervals *)
2931 let append_interval ((interval_begin, interval_end) as interval
) intervals
=
2932 (* remove void intervals *)
2933 if interval_begin = interval_end then intervals
2935 match intervals
with
2937 | (last_interval_begin
, last_interval_end
) :: other_intervals
->
2938 if last_interval_end
< interval_begin then
2939 interval
:: intervals
2941 (* coalescing intervals *)
2942 (last_interval_begin
, interval_end) :: other_intervals
in
2945 Array2.fold_lefti
(fun acc i b ->
2946 match s.s_blocks
.(i) with
2948 | CompleteBlock
| VerifiedBlock
->
2949 append_interval (compute_block_begin s i, compute_block_end s i) acc
2951 let acc, last_interval_end
=
2952 block_ranges_fold (fun (acc, lie
) r ->
2953 (append_interval (lie
, r.range_begin
) acc, r.range_end
)
2954 ) (acc, compute_block_begin s i) b in
2955 append_interval (last_interval_end
, compute_block_end s i) acc
2958 (*************************************************************************)
2960 (* propagate_chunk *)
2962 (*************************************************************************)
2965 chunk_uid
: uid_type
;
(** One place where a chunk occurs. *)
type chunk_occurrence =
  t           (* frontend *)
  * int       (* chunk number *)
  * Int64.t   (* offset *)
2971 type chunk_occurrences
= {
2972 mutable occurrence_present
: chunk_occurrence list
;
2973 mutable occurrence_missing
: chunk_occurrence list
;
2976 let propagate_chunk t1 pos1
size destinations
=
2977 List.iter (fun (t2
, j2
, pos2
) ->
2978 if t1
.t_num
<> t2
.t_num
|| pos1
<> pos2
then begin
2979 if !verbose
then lprintf_nl "Should propagate chunk from %s %Ld to %s %Ld [%Ld]"
2980 (file_best_name t1
.t_file
) pos1
2981 (file_best_name t2
.t_file
) pos2
size;
2982 Unix32.copy_chunk
(file_fd t1
.t_file
) (file_fd t2
.t_file
)
2983 pos1 pos2
(Int64.to_int
size);
2984 set_frontend_state_complete t2 j2
(** Build a new [chunk_occurrences] record with both occurrence lists
    empty.  NOTE(review): presumably a function rather than a constant
    so that every use gets its own mutable record -- confirm. *)
let dummy_chunk_occurrences () =
  let nothing = [] in
  { occurrence_missing = nothing; occurrence_present = nothing }
2991 let duplicate_chunks () =
2992 let chunks = Hashtbl.create 100 in
2995 let nchunks = VB.length
t.t_converted_verified_bitmap
in
2996 match t.t_verifier
with
2997 | Verification uids
when Array.length uids
= nchunks ->
2998 let rec iter j len
pos =
3001 chunk_uid
= uids
.(j);
3002 chunk_size = min
(s.s_size
-- pos) t.t_chunk_size
;
3006 Hashtbl.find
chunks c
3008 let occurrences = dummy_chunk_occurrences () in
3009 Hashtbl.add
chunks c occurrences;
3011 (match VB.get
t.t_converted_verified_bitmap
j with
3012 | VB.State_missing
| VB.State_partial
->
3013 occurrences.occurrence_missing
<-
3014 (t, j, pos) :: occurrences.occurrence_missing
3015 | VB.State_complete
-> ()
3016 | VB.State_verified
->
3017 occurrences.occurrence_present
<-
3018 (t, j, pos) :: occurrences.occurrence_present
);
3019 iter (j+1) len
(pos ++ t.t_chunk_size
)
3021 iter 0 (VB.length
t.t_converted_verified_bitmap
) zero
3025 Hashtbl.iter (fun c occurrences ->
3026 (* we need a verified chunk to copy over the others *)
3027 match occurrences.occurrence_present
, occurrences.occurrence_missing
with
3030 | (t, _
, pos) :: _
, missing
->
3031 propagate_chunk t pos c.chunk_size missing
3034 let set_verifier t f
=
3036 (* TODO: check that false as t_primary is a good value to start with *)
3037 set_chunks_verified_bitmap t t.t_converted_verified_bitmap
3039 let set_verified t f
=
(** Result of [file_downloaded] for the file [t.t_file] of frontend [t]. *)
let downloaded t =
  let file = t.t_file in
  file_downloaded file
(** Look up, in [t.t_chunk_of_block], the chunk number recorded for
    block [b] (indexed by [b.block_num]). *)
let block_chunk_num t b =
  let chunk_of_block = t.t_chunk_of_block in
  chunk_of_block.(b.block_num)
(** Length of the converted verified bitmap of frontend [t]. *)
let partition_size t =
  let bitmap = t.t_converted_verified_bitmap in
  VB.length bitmap
(** Return the [up_t] field of uploader [up]. *)
let uploader_swarmer = fun up -> up.up_t
3051 (** Return the availability of the chunks of [t] as a string *)
3053 let chunks_availability t =
3055 String2.init
(partition_size t) (fun i ->
3058 (List.map
(fun i -> s.s_availability
.(i)) t.t_blocks_of_chunk
.(i)) in
3059 if v < 0 then 0 else
3060 if v > 200 then 200 else v))
(** An uploader is interesting as soon as one of its [up_ncomplete] or
    [up_npartial] counters is strictly positive. *)
let is_interesting up =
  not (up.up_ncomplete <= 0 && up.up_npartial <= 0)
3066 (*************************************************************************)
3068 (* value_to_int64_pair (internal) *)
3070 (*************************************************************************)
3072 let value_to_int64_pair v =
3074 | List
[v1
;v2
] | SmallList
[v1
;v2
] ->
3075 (value_to_int64 v1
, value_to_int64 v2
)
3077 failwith
"Options: Not an int64 pair"
3079 (*************************************************************************)
3083 (*************************************************************************)
(* Frontend-level wrapper: shadows the earlier [set_present] and applies
   it to the swarmer [t.t_s]. *)
let set_present t =
  let swarmer = t.t_s in
  set_present swarmer
(* Frontend-level wrapper: shadows the earlier [set_absent] and applies
   it to the swarmer [t.t_s]. *)
let set_absent t =
  let swarmer = t.t_s in
  set_absent swarmer
(* Frontend-level wrapper: shadows the earlier [present_intervals] and
   applies it to the swarmer [t.t_s]. *)
let present_intervals t =
  let swarmer = t.t_s in
  present_intervals swarmer
(* Frontend-level wrapper: calls [print_s] with [str] and the swarmer
   [t.t_s]. *)
let print_t str t =
  let swarmer = t.t_s in
  print_s str swarmer
(* Frontend-level wrapper: shadows the earlier [print_uploaders] and
   applies it to the swarmer [t.t_s]. *)
let print_uploaders t =
  let swarmer = t.t_s in
  print_uploaders swarmer
3091 (*************************************************************************)
3093 (* value_to_frontend *)
3095 (*************************************************************************)
3097 let value_to_frontend t assocs
=
3099 let debug_wrong_downloaded t present d
=
3100 lprintf_nl "ERROR: stored downloaded value not restored !!! (%Ld/%Ld)" (downloaded t) d
;
3101 lprintf_nl "ERROR: present:";
3102 List.iter (fun (x
,y
) ->
3103 lprintf_nl " (%Ld,%Ld);" x y
3106 let p = present_intervals t in
3107 lprintf_nl "ERROR: present now:";
3110 List.fold_left
(fun acc (x
,y
) ->
3111 lprintf_nl " (%Ld,%Ld);" x y
;
3115 lprintf_nl "ERROR: total %Ld" total;
3116 if p = present
then begin
3117 lprintf_nl "ERROR: both appear to be the same!";
3119 if !exit_on_error then exit
2 in
3121 let get_value name conv
= conv
(List.assoc name assocs
) in
3124 try get_value "file_primary" value_to_bool
with _
-> true in
3127 let file_name = get_value "file_swarmer" value_to_string
in
3128 let s = HS.find
swarmers_by_name
3129 { dummy_swarmer with s_filename
= file_name } in
3130 associate primary t s
3131 (* TODO: make as many checks as possible to ensure the file and the swarmers
     are correctly associated. *)
3133 with Not_found
-> ());
3136 let mtime = try file_mtime
t.t_file
with _ -> 0. in
3139 value_to_float
(List.assoc
"file_mtime" assocs
)
3140 with Not_found
-> mtime
3147 set_chunks_verified_bitmap t
3148 (VB.of_string
(get_value "file_chunks" value_to_string
))
3150 set_chunks_verified_bitmap t
3151 (VB.of_string
(get_value "file_all_chunks" value_to_string
))
3154 lprintf_nl "Exception %s while loading bitmap"
3155 (Printexc2.to_string e
);
3156 (* force everything to be checked ASAP ? *)
3157 set_chunks_verified_bitmap t (VB.create (partition_size t) VB.State_complete
)
3161 lprintf "set_verified_bitmap: t = %s\n" t.t_converted_verified_bitmap;
3162 lprintf "set_verified_bitmap: s = %s\n" t.t_s.s_verified_bitmap;
3165 if primary then begin
3166 if !verbose_swarming
then lprintf_nl "Loading present...";
3169 (get_value "file_present_chunks"
3170 (value_to_list
value_to_int64_pair))
3172 set_present t present;
3175 lprintf_nl "Exception %s while set present"
3176 (Printexc2.to_string e
);
3177 verify_all_chunks t;
3180 if !verbose_swarming
then lprintf_nl "Downloaded after present %Ld" (downloaded t);
3183 let d = get_value "file_downloaded" value_to_int64
in
3184 if d <> downloaded t && !verbose
then
3185 debug_wrong_downloaded t present d
3186 with Not_found
-> ());
3189 (* TODO re-implement this
3191 let last_seen = get_value "file_chunks_age"
3192 (value_to_list value_to_int) in
3193 t.t_last_seen <- Array.of_list last_seen
3199 (*************************************************************************)
3201 (* frontend_to_value *)
3203 (*************************************************************************)
(** [frontend_to_value t other_vals] serializes frontend [t] as an
    association list of option values. Keys written below: "file_primary",
    "file_swarmer", "file_mtime" (0. when [file_mtime] raises),
    "file_chunks", "file_present_chunks" (primary frontends only),
    "file_downloaded" and "file_chunks_age".
    NOTE(review): [other_vals] is presumably appended at the end; the line
    using it is not visible here, confirm. *)
3205 let frontend_to_value t other_vals
=
3206 [("file_primary", bool_to_value
t.t_primary
);
3207 ("file_swarmer", string_to_value
t.t_s
.s_filename
);
3208 ("file_mtime", float_to_value
(try file_mtime
t.t_file
with _ -> 0.));
3209 ("file_chunks", string_to_value
(VB.to_string
(chunks_verified_bitmap t)))] @
(* Present intervals are saved as a list of two-element [SmallList]s
   (begin, end), and only for a primary frontend. *)
3210 (if t.t_primary
then
3211 [("file_present_chunks", List
3212 (List.map
(fun (i1
,i2
) ->
3213 SmallList
[int64_to_value i1
; int64_to_value i2
])
3214 (present_intervals t)))]
3216 [("file_downloaded", int64_to_value
(downloaded t));
(* [t.t_last_seen] is saved element by element as integers. *)
3217 ("file_chunks_age", List
(Array.to_list
3218 (Array.map int_to_value
t.t_last_seen
)))] @
3221 (** Verify one chunk of swarmer [s], if any frontend of that swarmer
3222 has a chunk to verify.
The result is the boolean of the outer [List.exists]: [true] when some
frontend had a chunk in [VB.State_complete], for which [verify_chunk]
was called. *)
3224 let verify_one_chunk s =
3225 (* lprintf "verify_one_chunk: %d networks\n" (List.length s.s_networks); *)
3226 List.exists
(fun t ->
3227 (* lprintf "verify_one_chunk of file %d\n" (file_num t.t_file); *)
3228 VB.existsi
(fun i c ->
(* The predicate triggers verification as a side effect and then reports
   the match. Presumably [VB.existsi] stops at the first match, so at most
   one chunk per frontend is verified; TODO confirm. *)
3229 if c = VB.State_complete
then verify_chunk t i;
3230 c = VB.State_complete
) t.t_converted_verified_bitmap
3232 (* lprintf "verify_one_chunk: nothing done\n"; *)
3234 (** Verify one chunk of each swarmer that needs it *)
3236 let verify_some_chunks () =
(* The boolean returned by [verify_one_chunk] is discarded. *)
3239 ignore
(verify_one_chunk s)
(** Frontend-level entry point: runs the swarmer-level [verify_one_chunk]
    on [t.t_s], the swarmer associated with frontend [t], and drops its
    boolean result. *)
let verify_one_chunk t =
  let (_ : bool) = verify_one_chunk t.t_s in
  ()
3248 (** Merge a second frontend [f2] to a first one [f1], so they share
3251 First swarmer [f1] must support some hashing scheme.
3252 Data of the second swarmer [f2] is currently lost during merging, so
3253 you'd better merge in swarmers quickly.
3254 Merging is denied if any of the two frontends is being used, so it
3255 may be necessary to pause them first, to get rid of any downloads.
(* Look up the swarmers currently registered in [swarmers_by_name] for
   both files, keyed by their disk names. *)
3260 let s1 = HS.find
swarmers_by_name { dummy_swarmer with
3261 s_filename
= file_disk_name f1
} in
3262 let s2 = HS.find
swarmers_by_name { dummy_swarmer with
3263 s_filename
= file_disk_name f2
} in
(* Refuse the merge when both swarmers have the same filename, or when
   their sizes differ. *)
3265 if s1.s_filename
= s2.s_filename
then
3266 failwith
"Files are already sharing their swarmer";
3268 if s1.s_size
<> s2.s_size
then
3269 failwith
"Files don't have the same size";
(* In the branch shown, the number of frontends of the second swarmer is
   logged and the merge fails because that swarmer is already shared. *)
3272 match s2.s_networks
with
3275 lprintf_nl "s_networks: %d files" (List.length list
);
3276 failwith
"Second file is already merged with other files"
(* The first swarmer is expected to have at least one frontend. *)
3280 match s1.s_networks
with
3281 | [] -> assert false
(* The first frontend is to be used as primary, so it needs a verification
   scheme: [NoVerification] and [VerificationNotAvailable] are rejected. *)
3283 match t1.t_verifier
with
3284 | NoVerification
| VerificationNotAvailable
->
3285 failwith
"Cannot use first file as a primary for swarming (no verification scheme)"
3286 | Verification
_ | ForceVerification
-> t1
(* Merging is refused while any iterated slot has a positive count.
   NOTE(review): [nuploading] presumably counts active uploaders per
   slot; confirm against the array iterated here. *)
3289 List.iter (fun (s, filename
) ->
3290 Array.iteri
(fun i nuploading
->
3291 if nuploading
> 0 then
3292 failwith
(Printf.sprintf
"%s is currently being downloaded" filename
)
3298 (* replace T2 swarmer: [t2] is attached, as a non-primary frontend,
   to the swarmer of [t1] *)
3299 associate false t2 t1.t_s
3301 (*************************************************************************)
3303 (* has_secondaries *)
3305 (*************************************************************************)
(** [has_secondaries t] is [true] exactly when [t] is flagged primary and
    its swarmer [t.t_s] has more than one frontend in [s_networks]. *)
let has_secondaries t =
  t.t_primary
  && (match t.t_s.s_networks with
      | _ :: _ :: _ -> true
      | [] | [ _ ] -> false)
3310 (*************************************************************************)
3312 (* Remove swarmer *)
3314 (*************************************************************************)
(** [remove_swarmer file_swarmer] takes an optional frontend. For
    [Some sw], the swarmer [sw.t_s] is removed from [swarmers_by_name]
    unless [has_secondaries sw] holds; in that case only a message is
    logged and nothing is removed. *)
3316 let remove_swarmer file_swarmer
=
3317 match file_swarmer
with
3319 | Some sw
-> if not
(has_secondaries sw
)
3320 then HS.remove swarmers_by_name sw
.t_s
3321 else lprintf_nl "Tried to remove swarmer with secondaries"
3323 (*************************************************************************)
3327 (*************************************************************************)
3330 List.map
(fun t -> t.t_file
) t.t_s
.s_networks
3332 (*************************************************************************)
3336 (*************************************************************************)
(** Conversion of swarmers to and from option values, registered as the
    option class "Swarmer" by the [define_option_class] call at the end
    of the module. *)
3338 module SwarmerOption
= struct
(* Decodes a two-element [List] or [SmallList] [v1; p] into the pair
   [(value_to_int64 v1, value_to_int p)]; the [failwith] below carries the
   error used for values that are not a priority interval. *)
3340 let value_to_priority_interval v =
3342 | List
[v1
;p] | SmallList
[v1
;p] ->
3343 (value_to_int64 v1
, value_to_int
p)
3345 failwith
"Options: Not a priority interval"
(* Rebuilds a swarmer from its saved association list. *)
3347 let value_to_swarmer v =
(* [get_value name conv] converts the value stored under [name];
   [List.assoc] raises [Not_found] when the key is absent. *)
3350 let get_value name conv
= conv
(List.assoc name assocs
) in
3351 let file_size = get_value "file_size" value_to_int64
in
3352 let file_name = get_value "file_name" value_to_string
in
3353 let s = create_swarmer file_name file_size in
(* Saved "file_download_random" selects the strategy: [true] gives
   [AdvancedStrategy], [false] gives [LinearStrategy]. *)
3356 get_value "file_download_random" value_to_bool
3359 s.s_strategy
<- if order then AdvancedStrategy
else LinearStrategy
);
(* The saved disk allocation bitmap is only adopted when its length matches
   the one of the freshly created swarmer. *)
3361 let bitmap = Bitv.of_string
(get_value "file_disk_allocation_bitmap"
3363 if Bitv.length
bitmap = Bitv.length
s.s_disk_allocated
then
3364 s.s_disk_allocated
<- bitmap
3366 (* s_disk_allocated missing or inconsistent ?
3367 set_present will fix it *)
(* [split_blocks] is applied once for each saved chunk size. *)
3368 let block_sizes = get_value "file_chunk_sizes"
3369 (value_to_list value_to_int64
) in
3370 List.iter (fun bsize
->
3371 split_blocks s bsize
(* Priority intervals default to [(zero, 1)] when the key is absent. They
   are stored only if [validate_intervals] accepts them; otherwise a
   message is logged. The priorities bitmap is then recomputed. *)
3375 get_value "file_priorities_intervals" (value_to_list
value_to_priority_interval)
3376 with Not_found
-> [(zero
, 1)]
3378 if validate_intervals s.s_size
intervals then
3379 s.s_priorities_intervals
<- intervals
3381 lprintf_nl "Failed to validate priority intervals, using default. File %s" file_name;
3382 swarmer_recompute_priorities_bitmap s;
(* Serializes swarmer [s] under the same keys that [value_to_swarmer]
   reads. *)
3387 let swarmer_to_value s =
3389 ("file_size", int64_to_value
s.s_size
);
3390 ("file_name", string_to_value
s.s_filename
);
3391 ("file_disk_allocation_bitmap", string_to_value
3392 (Bitv.to_string
s.s_disk_allocated
));
(* One chunk size per frontend attached to the swarmer. *)
3393 ("file_chunk_sizes", list_to_value int64_to_value
3394 (List.map
(fun t -> t.t_chunk_size
) s.s_networks
));
3395 ("file_priorities_intervals", List
3397 (fun (i_begin, priority
) -> SmallList
[int64_to_value
i_begin; int_to_value priority
])
3398 s.s_priorities_intervals
));
3399 ("file_download_random", bool_to_value
3400 (match s.s_strategy
with
3401 | AdvancedStrategy
-> true
3402 | LinearStrategy
-> false));
3406 define_option_class
"Swarmer" value_to_swarmer swarmer_to_value
3410 (** Checks most invariants of a swarmer, notably verification bitmaps
3411 consistency; raises an exception if a problem is found.
Violations are reported through [failwith] and [assert]. A swarmer with
no frontend is only logged as unused. *)
3413 let check_swarmer s =
3415 match s.s_networks
with
3416 | [] -> lprintf_nl "found unused swarmer %s, discarding" s.s_filename
;
(* The frontend bound to [tprim] must be flagged primary. *)
3418 assert(tprim
.t_primary
);
(* For each chunk of the primary frontend: a verified chunk requires all of
   its blocks to be verified in the swarmer bitmap; any other chunk must
   not contain a block that the swarmer considers verified. *)
3420 VB.iteri
(fun i c ->
3421 if c = VB.State_verified
then begin
3422 if List.exists
(fun j -> VB.get
s.s_verified_bitmap
j <> VB.State_verified
)
3423 tprim
.t_blocks_of_chunk
.(i) then
3424 failwith
"Bad propagation of State_verified from primary to swarmer";
3426 else if List.exists
(fun j -> VB.get
s.s_verified_bitmap
j = VB.State_verified
)
3427 tprim
.t_blocks_of_chunk
.(i) then
3428 failwith
"Swarmer has State_verified not coming from primary";
3429 ) tprim
.t_converted_verified_bitmap
;
3431 let fd = file_fd tprim
.t_file
in
(* Each frontend [t] checked below must not be primary, and its [file_fd]
   must be physically equal ([==]) to the one of the primary. *)
3434 assert (not
t.t_primary
);
3435 assert (file_fd
t.t_file
== fd);
(* A chunk of [t] in [State_verified] or [State_complete] requires every
   one of its blocks to be [State_verified] in the swarmer bitmap. *)
3437 VB.iteri
(fun i c ->
3438 if c = VB.State_verified
then begin
3439 if List.exists
(fun j -> VB.get
s.s_verified_bitmap
j <> VB.State_verified
)
3440 t.t_blocks_of_chunk
.(i) then
3441 failwith
"State_verified in secondary without State_verified in primary"
3443 else if c = VB.State_complete
then begin
3444 if List.exists
(fun j -> VB.get
s.s_verified_bitmap
j <> VB.State_verified
)
3445 t.t_blocks_of_chunk
.(i) then
3446 failwith
"State_complete in secondary without State_verified in primary"
3448 ) t.t_converted_verified_bitmap
3454 (*************************************************************************)
3456 (* Option swarmers *)
3458 (*************************************************************************)
(* Option holding a list of [SwarmerOption.t] values, defined in
   [CommonComplexOptions.swarmers_section].
   NOTE(review): the arguments are presumably the option path, the help
   text, the option class and the default value (empty list); confirm
   against the definition of [define_option]. *)
3461 define_option
CommonComplexOptions.swarmers_section
3462 ["swarmers"] "All the swarmers used" (list_option
SwarmerOption.t) []
3464 (*************************************************************************)
3468 (*************************************************************************)
(* Save hooks for [files_ini]. After a save, the [swarmers] option is reset
   to the empty list. Before a save, swarmers whose [s_networks] is not
   empty are collected into [list], and [CommonComplexOptions.files] is
   reordered with primary files first. *)
3471 set_after_save_hook files_ini
(fun _ -> swarmers =:= []);
3472 set_before_save_hook files_ini
(fun _ ->
3473 let list = ref [] in
3475 if s.s_networks
<> [] then
3476 list := s :: !list) swarmers_by_name;
3478 (* put primary frontends to the head, so that swarmers' invariants
3479 can be verified while downloads are being restored from ini files *)
(* A file counts as primary when it is physically equal ([==]) to the head
   of its own [file_files] list. *)
3480 let primary_files, secondary_files
=
3481 List.partition
(fun file ->
3482 match file_files
file with
3483 | primary_file
:: _ when primary_file
== file -> true
3484 | _ -> false) !!CommonComplexOptions.files
in
3485 CommonComplexOptions.files
=:= primary_files @ secondary_files
3487 set_after_load_hook files_ini
(fun _ ->
3494 (*************************************************************************)
3498 (*************************************************************************)
3500 (* Compute an approximation of the storage used by this module *)
(* [duplicate_chunks] is registered on an infinite timer with a period of
   300. (presumably seconds; confirm against [BasicSocket]). *)
3503 BasicSocket.add_infinite_timer
300. duplicate_chunks;
(* Memory statistics callback registered under the name "CommonSwarming";
   it writes its report into [buf]. *)
3504 Heap.add_memstat
"CommonSwarming" (fun level buf
->
3505 let counter = ref 0 in
3506 let nchunks = ref 0 in
3507 let nblocks = ref 0 in
3508 let nranges = ref 0 in
(* The number of chunks of a swarmer is the length of its verified
   bitmap. *)
3510 let n = VB.length
s.s_verified_bitmap
in
3511 nchunks := !nchunks + n;
3513 Array.iter (fun b ->
(* One increment of [nranges] for each range visited in block [b]. *)
3517 iter_block_ranges (fun _ -> incr
nranges) b
3523 Printf.bprintf buf
" Swarmers: %d\n" !counter;
3524 Printf.bprintf buf
" nchunks: %d nblocks: %d nranges: %d\n"
3525 !nchunks !nblocks !nranges;
3526 Printf.bprintf buf
" Storage (without blocks): %d bytes\n"
3532 let counter = ref 0 in
3533 let storage = ref 0 in
(* Per-uploader storage estimate.
   NOTE(review): the numeric constants (76, 4, 12, 16, 8) look like
   hand-estimated byte sizes of the runtime representation; confirm before
   relying on the reported figure. *)
3535 storage := !storage + 76 +
3536 Array.length
up.up_complete_blocks
* 4 +
3537 List.length
up.up_ranges
* (12 + 16 + 12 + 12 + 4) +
3538 Array.length
up.up_partial_blocks
* (16 + 12 + 12) +
3539 (8 + match up.up_intervals
with
3540 | AvailableIntervals
list -> List.length
list * (12 + 12 + 12 + 12)
(* For a bit vector the estimate scales with [Sys.word_size]. *)
3541 | AvailableBitv
b -> let ws = Sys.word_size
in (ws/8) + ((ws / 8) * (Bitv.length
b / (ws - 2)))
3545 Printf.bprintf buf
" Uploaders: %d\n" !counter;
3546 Printf.bprintf buf
" Storage: %d bytes\n" !storage;
3549 (* functions for priority bitmask *)
(** [file_swarmer f] looks up, in [swarmers_by_name], the swarmer whose
    [s_filename] is [file_disk_name f]. The lookup key is a probe record
    derived from [dummy_swarmer]; whatever [HS.find] raises for a missing
    entry propagates to the caller. *)
let file_swarmer f =
  let disk_name = file_disk_name f in
  let probe = { dummy_swarmer with s_filename = disk_name } in
  HS.find swarmers_by_name probe
3555 (** set the priority bitmask for each chunk in a file.
[priobitmap] replaces [s_priorities_bitmap] of the swarmer of [f] only
when its length equals the length of the verified bitmap of that
swarmer. The swarmer is found with [file_swarmer f]. *)
3556 let set_swarmer_chunk_priorities f priobitmap
=
3557 let s = file_swarmer f
in
3558 if String.length priobitmap
= VB.length
s.s_verified_bitmap
then
3559 s.s_priorities_bitmap
<- priobitmap
(** Returns the priority bitmask of swarmer [s], i.e. its
    [s_priorities_bitmap] field (do not mutate the string directly, use
    swarmer_set_interval). *)
let get_swarmer_block_priorities { s_priorities_bitmap = bitmap; _ } = bitmap
(** Returns the [s_verified_bitmap] field of swarmer [s]. *)
let get_swarmer_block_verified { s_verified_bitmap = bitmap; _ } = bitmap
(** Returns the [s_priorities_intervals] field of swarmer [s]. *)
let get_swarmer_priorities_intervals { s_priorities_intervals = intervals; _ } =
  intervals
3567 (* using compute_block_num outside of swarming code is probably
3568 broken, networks supports are aware of chunks, not blocks
3569 maybe other block-related functions should be censored in the same
3571 tag block numbers are chunk numbers so they're not inadvertently
3572 mistaken for each other ?
3574 (* let compute_block_num = () *)