1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_chunk.erl
3 %%% Author : Jesper Louis Andersen <>
4 %%% Description : Chunking code for mnesia.
6 %%% Created : 31 Mar 2008 by Jesper Louis Andersen <>
7 %%%-------------------------------------------------------------------
8 -module(etorrent_chunk
).
10 -include_lib("stdlib/include/qlc.hrl").
12 -include("etorrent_mnesia_table.hrl").
14 -define(DEFAULT_CHUNK_SIZE
, 16384). % Default size for a chunk. All clients use this.
%% Public API. remove_chunks/2 is restored from the corrupted listing;
%% it carries a public-API banner below and the list was left unterminated.
-export([pick_chunks/4, store_chunk/4, putback_chunks/1,
         endgame_remove_chunk/3, mark_fetched/2,
         remove_chunks/2]).
21 %%====================================================================
23 %%====================================================================
26 %%--------------------------------------------------------------------
27 %% Function: pick_chunks(Handle, PieceSet, StatePid, Num) -> ...
28 %% Description: Return some chunks for downloading.
30 %% This function is relying on tail-calls to itself with different
31 %% tags to return the needed data.
33 %%--------------------------------------------------------------------
34 %% TODO, not_interested here does not take chunked pieces into account!
%% Pick up to Remaining chunks for peer Pid from the pieces in PieceSet.
%% Returns {ok, Items}, {endgame, Items} or not_interested.
%% NOTE(review): restored from a corrupted listing; the default-return
%% arguments to pick_chunks_endgame/4 are inferred from its visible body
%% (it returns Ret verbatim when the torrent is not in endgame).
pick_chunks(Pid, Id, PieceSet, Remaining) ->
    case pick_chunks(pick_chunked, {Pid, Id, PieceSet, [], Remaining, none}) of
        not_interested ->
            pick_chunks_endgame(Id, PieceSet, Remaining,
                                not_interested);
        {ok, []} ->
            pick_chunks_endgame(Id, PieceSet, Remaining,
                                {ok, []});
        {ok, Items} ->
            {ok, Items}
    end.
%% Return Ret unless torrent Id has entered endgame mode, in which case
%% pick endgame chunks from PieceSet instead. (Restored missing `end.`.)
pick_chunks_endgame(Id, PieceSet, Remaining, Ret) ->
    case etorrent_torrent:is_endgame(Id) of
        false -> Ret; %% No endgame yet
        true  -> pick_chunks(endgame, {Id, PieceSet, Remaining})
    end.
51 %% There are 0 remaining chunks to be desired, return the chunks so far
%% Nothing more is desired: hand back what has been accumulated so far.
%% (Restored the missing `{ok, SoFar};` body — callers of pick_chunks/2
%% visibly match on {ok, Items}.)
pick_chunks(_Operation, {_Pid, _Id, _PieceSet, SoFar, 0, _Res}) ->
    {ok, SoFar};
55 %% Pick chunks from the already chunked pieces
%% Pick chunks from pieces that are already chunked. When none applies,
%% fall through to chunking a fresh piece, carrying along whether we saw
%% chunked-but-exhausted pieces (found_chunked) for the final verdict.
%% NOTE(review): the branch heads (`none ->`, `found_chunked ->`) and the
%% accumulator update were restored from a corrupted listing.
pick_chunks(pick_chunked, {Pid, Id, PieceSet, SoFar, Remaining, Res}) ->
    Iterator = gb_sets:iterator(PieceSet),
    case find_chunked_chunks(Id, gb_sets:next(Iterator), Res) of
        none ->
            %% No chunked piece applies; try to chunk up a fresh piece.
            pick_chunks(chunkify_piece, {Pid, Id, PieceSet, SoFar,
                                         Remaining, none});
        found_chunked ->
            %% Chunked pieces exist, but none with free chunks right now.
            pick_chunks(chunkify_piece, {Pid, Id, PieceSet, SoFar,
                                         Remaining, found_chunked});
        PieceNum when is_integer(PieceNum) ->
            %% Grab up to Remaining chunks from this piece, assigned to
            %% Pid, then keep picking for whatever is still missing.
            {atomic, {ok, Chunks, Left}} =
                select_chunks_by_piecenum(Id, PieceNum,
                                          Remaining, Pid),
            pick_chunks(pick_chunked, {Pid, Id,
                                       gb_sets:del_element(PieceNum, PieceSet),
                                       [Chunks | SoFar],
                                       Left, Res})
    end;
74 %% Find a new piece to chunkify. Give up if no more pieces can be chunkified
%% Find a new piece to chunkify. Give up if no more pieces can be
%% chunkified. NOTE(review): the none_eligible branch bodies were
%% restored from a corrupted listing; they are pinned by pick_chunks/4,
%% which matches not_interested / {ok, []} / {ok, Items}.
pick_chunks(chunkify_piece, {Pid, Id, PieceSet, SoFar, Remaining, Res}) ->
    case chunkify_new_piece(Id, PieceSet) of
        ok ->
            %% A fresh piece was chunked; go pick from it.
            pick_chunks(pick_chunked, {Pid, Id, PieceSet, SoFar,
                                       Remaining, Res});
        none_eligible when SoFar =:= [], Res =:= none ->
            %% Nothing chunked and nothing picked at all.
            not_interested;
        none_eligible when SoFar =:= [], Res =:= found_chunked ->
            %% Chunked pieces exist but all their chunks are taken.
            {ok, []};
        none_eligible ->
            {ok, SoFar}
    end;
87 %% Handle the endgame for a torrent gracefully
pick_chunks(endgame, {Id, PieceSet, N}) ->
    %% In endgame mode outstanding chunks may be requested from several
    %% peers at once: shuffle every remaining chunk and hand out at most N.
    Candidates = find_remaining_chunks(Id, PieceSet),
    Randomized = etorrent_utils:shuffle(Candidates),
    {endgame, lists:sublist(Randomized, N)}.
93 %%--------------------------------------------------------------------
94 %% Function: putback_chunks(Pid) -> transaction
95 %% Description: Find all chunks assigned to Pid and mark them as not_fetched
96 %%--------------------------------------------------------------------
%% Move every chunk assigned to Pid (any torrent, any piece) back onto
%% the corresponding not_fetched row so other peers can pick it up, then
%% drop the assignment row. Returns the mnesia transaction result.
%% NOTE(review): the transaction/foreach scaffolding was restored from a
%% corrupted listing — verify against version history.
putback_chunks(Pid) ->
    MatchHead = #chunk { idt = {'_', '_', {assigned, Pid}}, _='_'},
    mnesia:transaction(
      fun () ->
              Rows = mnesia:select(chunk, [{MatchHead, [], ['$_']}]),
              lists:foreach(
                fun (C) ->
                        {Id, PieceNum, _} = C#chunk.idt,
                        Chunks = C#chunk.chunks,
                        NotFetchIdt = {Id, PieceNum, not_fetched},
                        %% Merge into the piece's not_fetched row,
                        %% creating it if it is absent.
                        case mnesia:read(chunk, NotFetchIdt, write) of
                            [] ->
                                mnesia:write(#chunk { idt = NotFetchIdt,
                                                      chunks = Chunks });
                            [R] ->
                                mnesia:write(
                                  R#chunk { chunks =
                                            R#chunk.chunks ++ Chunks })
                        end,
                        %% The assignment row itself is now obsolete.
                        mnesia:delete_object(C)
                end,
                Rows)
      end).
121 %%--------------------------------------------------------------------
122 %% Function: store_chunk(Id, PieceNum, {Offset, Len},
123 %% Data, FSPid, PeerGroupPid, Pid) -> ok
124 %% Description: Workhorse function. Store a chunk in the chunk mnesia table.
125 %% If we have all chunks we need, then report the piece is full.
126 %%--------------------------------------------------------------------
%% Record that peer Pid delivered the chunk {Offset, Len} of PieceNum:
%% mark it fetched, clear its assignment, and decrement the piece's
%% missing-chunk counter when the offset was genuinely new.
%% NOTE(review): the transaction wrapper and the fetched/true branches of
%% the case were restored from a corrupted listing — confirm the exact
%% return values of t_update_fetched/3 against history.
store_chunk(Id, PieceNum, {Offset, Len}, Pid) ->
    F = fun () ->
                %% Add the newly fetched data to the fetched list
                Present = t_update_fetched(Id, PieceNum, {Offset, Len}),
                %% Update that the chunk is not anymore assigned to the Pid
                t_update_chunk_assignment(Id, PieceNum, Pid,
                                          {Offset, Len}),
                %% Count down the number of missing chunks for the piece
                %% Next lines can be thrown into a separate counter for speed.
                case Present of
                    fetched -> ok;
                    true    -> ok;
                    false   -> t_decrease_missing_chunks(Id, PieceNum)
                end
        end,
    {atomic, Res} = mnesia:transaction(F),
    Res.
146 %%--------------------------------------------------------------------
147 %% Function: endgame_remove_chunk/3
148 %% Args: Pid ::= pid() - pid of caller
149 %% Id ::= integer() - torrent id
150 %% IOL ::= {integer(), integer(), integer()} - {Index, Offs, Len}
151 %% Description: Remove a chunk in the endgame from its assignment to a
153 %%--------------------------------------------------------------------
%% Remove the endgame chunk at Offset from Pid's assignment for piece
%% Index, if such an assignment row exists. (Restored the missing `[] ->`
%% branch and `end.` from a corrupted listing.)
endgame_remove_chunk(Pid, Id, {Index, Offset, _Len}) ->
    case mnesia:dirty_read(chunk, {Id, Index, {assigned, Pid}}) of
        [] ->
            ok;
        [R] ->
            NC = lists:keydelete(Offset, 1, R#chunk.chunks),
            mnesia:dirty_write(R#chunk {chunks = NC})
    end.
163 %%--------------------------------------------------------------------
164 %% Function: mark_fetched/2
165 %% Args: Id ::= integer() - torrent id
166 %% IOL ::= {integer(), integer(), integer()} - {Index, Offs, Len}
167 %% Description: Mark a given chunk as fetched.
168 %%--------------------------------------------------------------------
%% Mark the chunk at Offset of piece Index as fetched by removing it
%% from the piece's not_fetched row (deleting the row when it empties).
%% NOTE(review): restored from a corrupted listing; the found/assigned
%% return atoms are an inference from the visible structure — verify
%% against the callers.
mark_fetched(Id, {Index, Offset, _Len}) ->
    F = fun () ->
                case mnesia:read(chunk, {Id, Index, not_fetched}, write) of
                    [] ->
                        %% Row gone: somebody else already took the chunk.
                        assigned;
                    [R] ->
                        case lists:keymember(Offset, 1, R#chunk.chunks) of
                            true ->
                                case lists:keydelete(Offset, 1, R#chunk.chunks) of
                                    [] -> mnesia:delete_object(R);
                                    NC -> mnesia:write(R#chunk { chunks = NC })
                                end,
                                found;
                            false ->
                                assigned
                        end
                end
        end,
    {atomic, Res} = mnesia:transaction(F),
    Res.
190 %%--------------------------------------------------------------------
191 %% Function: remove_chunks/2
192 %% Args: Id ::= integer() - torrent id
193 %% Idx ::= integer() - Index of Piece
194 %% Description: Oblitterate all chunks for Index in the torrent Id.
195 %%--------------------------------------------------------------------
%% Obliterate every chunk row for piece Idx of torrent Id — the
%% wildcard third key element matches assigned, fetched and not_fetched
%% rows alike. (Restored the missing fun wrapper and final `ok.`.)
remove_chunks(Id, Idx) ->
    MatchHead = #chunk { idt = {Id, Idx, '_'}, _ = '_' },
    F = fun () ->
                Rows = mnesia:select(chunk, [{MatchHead, [], ['$_']}]),
                [mnesia:delete_object(Row) || Row <- Rows]
        end,
    {atomic, _} = mnesia:transaction(F),
    ok.
205 %%====================================================================
206 %% Internal functions
207 %%====================================================================
209 %%--------------------------------------------------------------------
210 %% Function: find_remaining_chunks(Id, PieceSet) -> [Chunk]
211 %% Description: Find all remaining chunks for a torrent matching PieceSet
212 %%--------------------------------------------------------------------
find_remaining_chunks(Id, PieceSet) ->
    %% For the endgame we want every outstanding chunk: both the ones
    %% currently assigned to some peer and the ones nobody fetched yet.
    AssignedHead   = #chunk { idt = {Id, '$1', {assigned, '_'}}, chunks = '$2'},
    NotFetchedHead = #chunk { idt = {Id, '$1', not_fetched}, chunks = '$2'},
    Assigned   = mnesia:dirty_select(chunk, [{AssignedHead, [], [{{'$1', '$2'}}]}]),
    NotFetched = mnesia:dirty_select(chunk, [{NotFetchedHead, [], [{{'$1', '$2'}}]}]),
    %% Keep only pieces the remote peer actually has, then flatten each
    %% row's chunk list into {PieceNum, Offset, Size, FileOps} tuples.
    [{PN, Os, Sz, Ops} || {PN, Chunks} <- Assigned ++ NotFetched,
                          gb_sets:is_element(PN, PieceSet),
                          {Os, Sz, Ops} <- Chunks].
223 %%--------------------------------------------------------------------
224 %% Function: chunkify_new_piece(Id, PieceSet) -> ok | none_eligible
225 %% Description: Find a piece in the PieceSet which has not been chunked
226 %% yet and chunk it. Returns either ok if a piece was chunked or none_eligible
227 %% if we can't find anything to chunk up in the PieceSet.
229 %%--------------------------------------------------------------------
%% Find a not-yet-chunked piece in PieceSet and chunk it. Returns ok on
%% success, none_eligible when nothing qualifies. (Restored the missing
%% `none ->` branch and the trailing `ok`/`end.` from a corrupted listing.)
chunkify_new_piece(Id, PieceSet) when is_integer(Id) ->
    It = gb_sets:iterator(PieceSet),
    case find_new_piece(Id, It) of
        none ->
            none_eligible;
        P when is_record(P, piece) ->
            chunkify_piece(Id, P),
            ok
    end.
241 %%--------------------------------------------------------------------
242 %% Function: select_chunks_by_piecenum(Id, PieceNum, Num, Pid) ->
243 %% {ok, [{Offset, Len}], Remain}
244 %% Description: Select up to Num chunks from PieceNum. Will return either
245 %% {ok, Chunks} if it got all chunks it wanted, or {partial, Chunks, Remain}
246 %% if it got some chunks and there is still Remain chunks left to pick.
247 %%--------------------------------------------------------------------
%% Take up to Num chunks from PieceNum's not_fetched row and move them
%% onto an {assigned, Pid} row. Runs in a transaction; the caller matches
%% {atomic, {ok, {PieceNum, Chunks}, Remaining}}.
%% NOTE(review): restored from a corrupted listing — verify the exact
%% transaction shape against version history.
select_chunks_by_piecenum(Id, PieceNum, Num, Pid) ->
    mnesia:transaction(
      fun () ->
              case mnesia:read(chunk, {Id, PieceNum, not_fetched}, write) of
                  [] ->
                      %% There are no such chunk anymore. Someone else exhausted it
                      {ok, {PieceNum, []}, Num};
                  [R] ->
                      %% Get up to the number of chunks we want
                      {Return, Rest} = etorrent_utils:gsplit(Num, R#chunk.chunks),
                      [_|_] = Return, % Assert the state of Return
                      %% Write back the missing ones
                      case Rest of
                          [] ->
                              mnesia:delete_object(R);
                          [_|_] ->
                              mnesia:write(R#chunk {chunks = Rest})
                      end,
                      %% Assign chunk to us.
                      %% BUG FIX: the read key previously started with Pid
                      %% ({Pid, PieceNum, ...}), which can never match rows
                      %% keyed by the torrent Id — the record written below
                      %% uses {Id, PieceNum, {assigned, Pid}}.
                      Q = case mnesia:read(chunk,
                                           {Id, PieceNum, {assigned, Pid}},
                                           write) of
                              [] ->
                                  #chunk { idt = {Id, PieceNum, {assigned, Pid}},
                                           chunks = [] };
                              [C] when is_record(C, chunk) ->
                                  C
                          end,
                      mnesia:write(Q#chunk { chunks = Return ++ Q#chunk.chunks }),
                      Remaining = Num - length(Return),
                      {ok, {PieceNum, Return}, Remaining}
              end
      end).
282 %%--------------------------------------------------------------------
283 %% Function: chunkify(Operations) ->
284 %% [{Offset, Size, FileOperations}]
285 %% Description: From a list of operations to read/write a piece, construct
286 %% a list of chunks given by Offset of the chunk, Size of the chunk and
287 %% how to read/write that chunk.
288 %%--------------------------------------------------------------------
290 %% First, we call the version of the function doing the grunt work.
chunkify(Ops) ->
    %% Start at piece offset 0 with an empty accumulator and a whole
    %% ?DEFAULT_CHUNK_SIZE chunk to fill.
    chunkify(0, 0, [], Ops, ?DEFAULT_CHUNK_SIZE).
294 %% Suppose the next File operation on the piece has 0 bytes in size, then it
295 %% is exhausted and must be thrown away.
chunkify(Pos, Eaten, Ops, [{_Path, _Off, 0} | Rest], Left) ->
    %% The file operation at the head has 0 bytes left: it is exhausted,
    %% so drop it and continue with the remaining operations.
    chunkify(Pos, Eaten, Ops, Rest, Left);
300 %% There are no more file operations to carry out. Hence we reached the end of
301 %% the piece and we just return the last chunk operation. Remember to reverse
302 %% the list of operations for that chunk as we build it in reverse.
chunkify(Pos, Eaten, Ops, [], _Sz) ->
    %% No file operations remain: the piece ends here, so emit the final
    %% (possibly short) chunk. Ops was built in reverse — flip it back.
    [{Pos, Eaten, lists:reverse(Ops)}];
306 %% There are no more bytes left to add to this chunk. Recurse by calling
307 %% on the rest of the problem and add our chunk to the front when coming
308 %% back. Remember to reverse the Operations list built in reverse.
chunkify(Pos, Eaten, Ops, OpsLeft, 0) ->
    %% This chunk is full. Close it (reversing its operation list) and
    %% prepend it to the chunks built from the remaining operations.
    Tail = chunkify(Pos + Eaten, 0, [], OpsLeft, ?DEFAULT_CHUNK_SIZE),
    [{Pos, Eaten, lists:reverse(Ops)} | Tail];
313 %% The next file we are processing have a larger size than what is left for this
314 %% chunk. Hence we can just eat off that many bytes from the front file.
%% The head file covers at least the rest of this chunk: take Left bytes
%% from it and leave the file's remainder at the queue head. (Restored
%% the missing final argument — the chunk is now full, so recurse with 0
%% bytes left, which the zero-Left clause above turns into a new chunk.)
chunkify(AtOffset, EatenBytes, Operations,
         [{Path, Offset, Size} | Rest], Left) when Left =< Size ->
    chunkify(AtOffset, EatenBytes + Left,
             [{Path, Offset, Left} | Operations],
             [{Path, Offset + Left, Size - Left} | Rest],
             0);
322 %% The next file does *not* have enough bytes left, so we eat all the bytes
323 %% we can get from it, and move on to the next file.
%% The head file has fewer bytes than this chunk still needs: consume it
%% whole and move on to the next file. (Restored the missing tail
%% `Rest, Left - Size).` — this is the final clause of chunkify/5.)
chunkify(AtOffset, EatenBytes, Operations,
         [{Path, Offset, Size} | Rest], Left) when Left > Size ->
    chunkify(AtOffset, EatenBytes + Size,
             [{Path, Offset, Size} | Operations],
             Rest, Left - Size).
331 %%--------------------------------------------------------------------
332 %% Function: chunkify_piece(Id, PieceNum) -> {atomic, ok} | {aborted, Reason}
333 %% Description: Given a PieceNumber, cut it up into chunks and add those
334 %% to the chunk table.
335 %%--------------------------------------------------------------------
%% Cut piece P into chunk descriptors and record them in the chunk
%% table, flipping the piece's state to chunked and seeding its missing
%% counter. On success, decrease the torrent's not_fetched count.
%% NOTE(review): large parts of the transaction were restored from a
%% corrupted listing (branch atoms, `left = NumChunks`, the trailing
%% Res dispatch) — verify against version history.
chunkify_piece(Id, P) when is_record(P, piece) ->
    Chunks = chunkify(P#piece.files),
    NumChunks = length(Chunks),
    {atomic, Res} =
        mnesia:transaction(
          fun () ->
                  [S] = mnesia:read({piece, P#piece.idpn}),
                  case S#piece.state of
                      not_fetched ->
                          ok = mnesia:write(S#piece { state = chunked,
                                                      left = NumChunks }),
                          ok = mnesia:write(#chunk { idt = {S#piece.id,
                                                            S#piece.piece_number, not_fetched},
                                                     chunks = Chunks }),
                          ok;
                      _ ->
                          already_chunked
                  end
          end),
    case Res of
        ok ->
            etorrent_torrent:decrease_not_fetched(Id); % endgames as side-eff.
        already_chunked ->
            ok
    end.
363 %%--------------------------------------------------------------------
364 %% Function: find_new_piece(Id, Iterator) -> #piece | none
365 %% Description: Search an iterator for a not_fetched piece. Return the #piece
367 %%--------------------------------------------------------------------
%% Walk the piece-set iterator until a piece in state not_fetched turns
%% up; return that #piece record, or none when the iterator is exhausted.
%% (Restored the `[] ->`, `P;`, `[_P] ->` and `none` branches from a
%% corrupted listing.)
find_new_piece(Id, Iterator) ->
    case gb_sets:next(Iterator) of
        {PieceNumber, Next} ->
            case mnesia:dirty_read(piece, {Id, PieceNumber}) of
                [] ->
                    find_new_piece(Id, Next);
                [P] when P#piece.state =:= not_fetched ->
                    P;
                [_P] ->
                    find_new_piece(Id, Next)
            end;
        none ->
            none
    end.
383 %%--------------------------------------------------------------------
384 %% Function: find_chunked_chunks(Id, iterator_result()) -> none | PieceNum
385 %% Description: Search an iterator for a chunked piece.
386 %%--------------------------------------------------------------------
%% Scan an iterator for a chunked piece that still has not_fetched
%% chunks; return its piece number. When only exhausted chunked pieces
%% were seen, the accumulator Res carries found_chunked back to the
%% caller; otherwise none. (Branch bodies restored from a corrupted
%% listing.)
find_chunked_chunks(_Id, none, Res) ->
    Res;
find_chunked_chunks(Id, {Pn, Next}, Res) ->
    [P] = mnesia:dirty_read(piece, {Id, Pn}),
    case P#piece.state of
        chunked ->
            case mnesia:dirty_read(chunk, {Id, Pn, not_fetched}) of
                [] ->
                    %% Chunked, but nothing left to hand out from it.
                    find_chunked_chunks(Id, gb_sets:next(Next), found_chunked);
                _ ->
                    P#piece.piece_number %% Optimization: Pick the whole piece and pass it on
            end;
        _ ->
            find_chunked_chunks(Id, gb_sets:next(Next), Res)
    end.
%% Record Offset in the piece's fetched row (inside a caller-supplied
%% transaction). NOTE(review): restored from a corrupted listing; the
%% intended returns appear to be fetched (whole piece already done),
%% true (offset already recorded) and false (newly recorded) — this is
%% inferred from store_chunk's case on the result; verify against history.
t_update_fetched(Id, PieceNum, {Offset, _Len}) ->
    case etorrent_piece:t_fetched(Id, PieceNum) of
        true -> fetched;
        false ->
            case mnesia:read(chunk, {Id, PieceNum, fetched}, write) of
                [] ->
                    mnesia:write(#chunk { idt = {Id, PieceNum, fetched},
                                          chunks = [Offset] }),
                    false;
                [R] ->
                    case lists:member(Offset, R#chunk.chunks) of
                        true ->
                            true;
                        false ->
                            mnesia:write(R#chunk { chunks =
                                                   [Offset | R#chunk.chunks] }),
                            false
                    end
            end
    end.
%% Remove the delivered chunk from Pid's assignment row for PieceNum,
%% dropping the row when it empties (inside a caller-supplied transaction).
%% BUG FIX: the original used lists:keydelete({Offset, Len}, 1, ...), which
%% can never match — assigned chunk lists hold {Offset, Size, Ops} triples
%% (built by chunkify/1 and appended in select_chunks_by_piecenum), so the
%% element at position 1 is the bare integer Offset. endgame_remove_chunk
%% already keys on Offset alone.
t_update_chunk_assignment(Id, PieceNum, Pid,
                          {Offset, _Len}) ->
    case mnesia:read(chunk, {Id, PieceNum, {assigned, Pid}}, write) of
        [] ->
            %% We stored a chunk that was not belonging to us, do nothing
            ok;
        [S] ->
            case lists:keydelete(Offset, 1, S#chunk.chunks) of
                [] ->
                    %% Last assigned chunk gone: drop the row entirely.
                    mnesia:delete_object(S);
                L ->
                    mnesia:write(S#chunk { chunks = L })
            end
    end.
440 t_decrease_missing_chunks(Id
, PieceNum
) ->
441 [P
] = mnesia:read(piece
, {Id
, PieceNum
}, write
),
442 NewP
= P#piece
{ left
= P#piece
.left
- 1 },
444 case NewP#piece
.left
of
447 N
when is_integer(N
) ->