When deleting chunks in endgame, delete the chunk row if there are no more chunks...
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_chunk.erl
blob69e128ee4df5567fe8e7c1834c8cdaa17c0761b8
1 %%%-------------------------------------------------------------------
%%% File        : etorrent_chunk.erl
3 %%% Author : Jesper Louis Andersen <>
4 %%% Description : Chunking code for mnesia.
5 %%%
6 %%% Created : 31 Mar 2008 by Jesper Louis Andersen <>
7 %%%-------------------------------------------------------------------
8 -module(etorrent_chunk).
10 -include_lib("stdlib/include/qlc.hrl").
12 -include("etorrent_mnesia_table.hrl").
14 -define(DEFAULT_CHUNK_SIZE, 16384). % Default size for a chunk. All clients use this.
16 %% API
17 -export([pick_chunks/4, store_chunk/4, putback_chunks/1,
18 endgame_remove_chunk/3, mark_fetched/2,
19 remove_chunks/2]).
21 %%====================================================================
22 %% API
23 %%====================================================================
%%--------------------------------------------------------------------
%% Function: pick_chunks(Pid, Id, PieceSet, Remaining) ->
%%               {ok, [{PieceNum, [{Offset, Size, Ops}]}]}
%%             | {endgame, [{PieceNum, Offset, Size, Ops}]}
%%             | not_interested | none_eligible
%% Description: Return up to Remaining chunks for Pid to download from
%%   the pieces of torrent Id listed in the gb_set PieceSet.
%%   When nothing could be picked the normal way, the torrent's endgame
%%   status decides: in endgame we pick among all remaining chunks,
%%   otherwise not_interested / none_eligible is handed back as-is.
%%
%% The picking itself is done by pick_chunks/2, which tail-calls itself
%% with different operation tags.
%%--------------------------------------------------------------------
%% TODO, not_interested here does not take chunked pieces into account!
pick_chunks(Pid, Id, PieceSet, Remaining) ->
    Initial = {Pid, Id, PieceSet, [], Remaining, none},
    case pick_chunks(pick_chunked, Initial) of
        {ok, [_ | _]} = Picked ->
            Picked;
        {ok, []} ->
            pick_chunks_endgame(Id, PieceSet, Remaining, none_eligible);
        not_interested ->
            pick_chunks_endgame(Id, PieceSet, Remaining, not_interested)
    end.
%% Fallback used when normal picking yields nothing. If torrent Id is in
%% endgame, pick among all remaining chunks; otherwise return Ret
%% (not_interested | none_eligible) unchanged.
pick_chunks_endgame(Id, PieceSet, Remaining, Ret) ->
    InEndgame = etorrent_torrent:is_endgame(Id),
    case InEndgame of
        true ->
            pick_chunks(endgame, {Id, PieceSet, Remaining});
        false ->
            Ret %% No endgame yet
    end.
%%--------------------------------------------------------------------
%% Function: pick_chunks(Operation, State) ->
%%               {ok, [{PieceNum, [{Offset, Size, Ops}]}]}
%%             | not_interested
%%             | {endgame, [{PieceNum, Offset, Size, Ops}]}
%% Description: Worker for pick_chunks/4, driven by the Operation tag:
%%   pick_chunked   - take chunks from pieces that are already chunked,
%%   chunkify_piece - chunk up a fresh piece, then go back to pick_chunked,
%%   endgame        - return up to N randomly ordered remaining chunks.
%% For the first two, State is {Pid, Id, PieceSet, SoFar, Remaining, Res}
%% where Res is none or found_chunked; found_chunked means a chunked
%% piece was seen whose chunks are all handed out already.
%%--------------------------------------------------------------------
%% There are 0 remaining chunks to be desired, return the chunks so far
pick_chunks(_Operation, {_Pid, _Id, _PieceSet, SoFar, 0, _Res}) ->
    {ok, SoFar};
%% Pick chunks from the already chunked pieces
pick_chunks(pick_chunked, {Pid, Id, PieceSet, SoFar, Remaining, Res}) ->
    Iterator = gb_sets:iterator(PieceSet),
    case find_chunked_chunks(Id, gb_sets:next(Iterator), Res) of
        none ->
            pick_chunks(chunkify_piece,
                        {Pid, Id, PieceSet, SoFar, Remaining, none});
        found_chunked ->
            pick_chunks(chunkify_piece,
                        {Pid, Id, PieceSet, SoFar, Remaining, found_chunked});
        PieceNum when is_integer(PieceNum) ->
            {atomic, {ok, Picked, Left}} =
                select_chunks_by_piecenum(Id, PieceNum, Remaining, Pid),
            %% The not_fetched row can be exhausted by someone else between
            %% find_chunked_chunks/3 and the transaction. An empty pick
            %% carries no work, so do not report it; remember instead that
            %% we saw a chunked piece with nothing left to hand out.
            {NewSoFar, NewRes} =
                case Picked of
                    {_, []} -> {SoFar, found_chunked};
                    _ -> {[Picked | SoFar], Res}
                end,
            pick_chunks(pick_chunked,
                        {Pid, Id, gb_sets:del_element(PieceNum, PieceSet),
                         NewSoFar, Left, NewRes})
    end;
%% Find a new piece to chunkify. Give up if no more pieces can be chunkified
pick_chunks(chunkify_piece, {Pid, Id, PieceSet, SoFar, Remaining, Res}) ->
    case chunkify_new_piece(Id, PieceSet) of
        ok ->
            pick_chunks(pick_chunked,
                        {Pid, Id, PieceSet, SoFar, Remaining, Res});
        none_eligible when SoFar =:= [], Res =:= none ->
            not_interested;
        none_eligible when SoFar =:= [], Res =:= found_chunked ->
            {ok, []};
        none_eligible ->
            {ok, SoFar}
    end;
%% Handle the endgame for a torrent gracefully
pick_chunks(endgame, {Id, PieceSet, N}) ->
    Remaining = find_remaining_chunks(Id, PieceSet),
    Shuffled = etorrent_utils:shuffle(Remaining),
    {endgame, lists:sublist(Shuffled, N)}.
%%--------------------------------------------------------------------
%% Function: putback_chunks(Pid) -> {atomic, ok} | {aborted, Reason}
%% Description: Find all chunks assigned to Pid and mark them as
%%   not_fetched again, appending them to the piece's existing not_fetched
%%   row when there is one. Assignment rows whose chunk list is empty are
%%   just deleted: turning them into an empty not_fetched row would make
%%   find_chunked_chunks/3 offer a piece with nothing to pick, tripping the
%%   non-empty assertion in select_chunks_by_piecenum/4.
%%--------------------------------------------------------------------
putback_chunks(Pid) ->
    MatchHead = #chunk { idt = {'_', '_', {assigned, Pid}}, _='_'},
    mnesia:transaction(
      fun () ->
              Rows = mnesia:select(chunk, [{MatchHead, [], ['$_']}]),
              lists:foreach(
                fun (#chunk { chunks = [] } = C) ->
                        %% Nothing to give back; just drop the empty row.
                        mnesia:delete_object(C);
                    (C) ->
                        {Id, PieceNum, _} = C#chunk.idt,
                        Chunks = C#chunk.chunks,
                        NotFetchIdt = {Id, PieceNum, not_fetched},
                        case mnesia:read(chunk, NotFetchIdt, write) of
                            [] ->
                                mnesia:write(#chunk{ idt = NotFetchIdt,
                                                     chunks = Chunks});
                            [R] ->
                                mnesia:write(
                                  R#chunk { chunks =
                                                R#chunk.chunks ++ Chunks})
                        end,
                        mnesia:delete_object(C)
                end,
                Rows)
      end).
%%--------------------------------------------------------------------
%% Function: store_chunk(Id, PieceNum, {Offset, Len}, Pid) -> ok | full
%% Description: Workhorse function. Within a single mnesia transaction:
%%   record Offset as fetched for the piece, release the chunk from Pid's
%%   assignment and, when the offset had not been recorded before, count
%%   down the piece's missing chunks. Returns whatever
%%   t_decrease_missing_chunks/2 reports in that last case (full when the
%%   piece is complete) and ok when the chunk was already known.
%%--------------------------------------------------------------------
store_chunk(Id, PieceNum, {_Offset, _Len} = OffLen, Pid) ->
    Txn =
        fun () ->
                Present = t_update_fetched(Id, PieceNum, OffLen),
                t_update_chunk_assignment(Id, PieceNum, Pid, OffLen),
                %% Only a newly recorded offset shrinks the missing count.
                %% (This could be moved to a separate counter for speed.)
                case Present of
                    false -> t_decrease_missing_chunks(Id, PieceNum);
                    true -> ok;
                    fetched -> ok
                end
        end,
    {atomic, Res} = mnesia:transaction(Txn),
    Res.
%%--------------------------------------------------------------------
%% Function: endgame_remove_chunk/3
%% Args: Pid ::= pid() - pid of caller
%%       Id  ::= integer() - torrent id
%%       IOL ::= {integer(), integer(), integer()} - {Index, Offs, Len}
%% Returns: ok
%% Description: Remove a chunk in the endgame from its assignment to a
%%   given pid. When the last chunk is removed, the {assigned, Pid} row is
%%   deleted instead of being written back with an empty chunk list.
%%   Uses dirty reads/writes, i.e. runs outside any transaction.
%%--------------------------------------------------------------------
endgame_remove_chunk(Pid, Id, {Index, Offset, _Len}) ->
    case mnesia:dirty_read(chunk, {Id, Index, {assigned, Pid}}) of
        [] ->
            ok;
        [R] ->
            case lists:keydelete(Offset, 1, R#chunk.chunks) of
                [] -> mnesia:dirty_delete_object(R);
                NC -> mnesia:dirty_write(R#chunk {chunks = NC})
            end
    end.
%%--------------------------------------------------------------------
%% Function: mark_fetched/2
%% Args: Id  ::= integer() - torrent id
%%       IOL ::= {integer(), integer(), integer()} - {Index, Offs, Len}
%% Returns: found    - the chunk was in the piece's not_fetched row and
%%                     has been taken out of it
%%          assigned - the chunk is not in the not_fetched row (presumably
%%                     it is assigned to some peer)
%% Description: Mark a given chunk as fetched. The not_fetched row is
%%   deleted once its last chunk has been taken out.
%%--------------------------------------------------------------------
mark_fetched(Id, {Index, Offset, _Len}) ->
    %% Store the shrunk chunk list, or drop the row when it became empty.
    StoreRest = fun (Row, []) -> mnesia:delete_object(Row);
                    (Row, Rest) -> mnesia:write(Row#chunk { chunks = Rest })
                end,
    Txn =
        fun () ->
                case mnesia:read(chunk, {Id, Index, not_fetched}, write) of
                    [] ->
                        assigned;
                    [Row] ->
                        Pending = Row#chunk.chunks,
                        case lists:keymember(Offset, 1, Pending) of
                            false ->
                                assigned;
                            true ->
                                StoreRest(Row,
                                          lists:keydelete(Offset, 1, Pending)),
                                found
                        end
                end
        end,
    {atomic, Res} = mnesia:transaction(Txn),
    Res.
%%--------------------------------------------------------------------
%% Function: remove_chunks/2
%% Args: Id  ::= integer() - torrent id
%%       Idx ::= integer() - Index of Piece
%% Returns: ok (crashes if the transaction aborts)
%% Description: Obliterate all chunk rows (any state) for piece Idx in
%%   the torrent Id.
%%--------------------------------------------------------------------
remove_chunks(Id, Idx) ->
    Pattern = #chunk { idt = {Id, Idx, '_'}, _ = '_' },
    {atomic, _} =
        mnesia:transaction(
          fun () ->
                  Doomed = mnesia:select(chunk, [{Pattern, [], ['$_']}]),
                  lists:foreach(fun mnesia:delete_object/1, Doomed)
          end),
    ok.
205 %%====================================================================
206 %% Internal functions
207 %%====================================================================
%%--------------------------------------------------------------------
%% Function: find_remaining_chunks(Id, PieceSet) ->
%%               [{PieceNum, Offset, Size, Ops}]
%% Description: Find all remaining chunks for a torrent matching PieceSet:
%%   every chunk that is either assigned to some pid or still not_fetched,
%%   restricted to pieces in the gb_set PieceSet. Chunks from assigned rows
%%   come first. Uses dirty selects (no transaction).
%%--------------------------------------------------------------------
find_remaining_chunks(Id, PieceSet) ->
    Heads = [#chunk { idt = {Id, '$1', {assigned, '_'}}, chunks = '$2'},
             #chunk { idt = {Id, '$1', not_fetched}, chunks = '$2'}],
    Rows = lists:append(
             [mnesia:dirty_select(chunk, [{Head, [], [{{'$1', '$2'}}]}])
              || Head <- Heads]),
    [{PN, Os, Sz, Ops} || {PN, Chunks} <- Rows,
                          gb_sets:is_element(PN, PieceSet),
                          {Os, Sz, Ops} <- Chunks].
%%--------------------------------------------------------------------
%% Function: chunkify_new_piece(Id, PieceSet) -> ok | none_eligible
%% Description: Find a piece in the PieceSet that is still not_fetched and
%%   chunk it. Returns ok once chunkify_piece/2 has run for such a piece
%%   (it tolerates another process having chunked it first), or
%%   none_eligible if nothing in the PieceSet can be chunked.
%%--------------------------------------------------------------------
chunkify_new_piece(Id, PieceSet) when is_integer(Id) ->
    case find_new_piece(Id, gb_sets:iterator(PieceSet)) of
        Piece when is_record(Piece, piece) ->
            chunkify_piece(Id, Piece),
            ok;
        none ->
            none_eligible
    end.
%%--------------------------------------------------------------------
%% Function: select_chunks_by_piecenum(Id, PieceNum, Num, Pid) ->
%%               {atomic, {ok, {PieceNum, [{Offset, Size, Ops}]}, Remain}}
%%             | {aborted, Reason}
%% Description: In one transaction, move up to Num chunks of piece
%%   PieceNum from its not_fetched row to Pid's {assigned, Pid} row (the
%%   not_fetched row is deleted when it runs empty). Remain is Num minus
%%   the number of chunks moved. If the not_fetched row is already gone
%%   (someone else exhausted it), the chunk list is empty and Remain = Num.
%%   NOTE(review): assumes etorrent_utils:gsplit(N, L) splits off at most N
%%   leading elements of L - confirm.
%%--------------------------------------------------------------------
select_chunks_by_piecenum(Id, PieceNum, Num, Pid) ->
    mnesia:transaction(
      fun () ->
              case mnesia:read(chunk, {Id, PieceNum, not_fetched}, write) of
                  [] ->
                      %% There are no such chunk anymore. Someone else exhausted it
                      {ok, {PieceNum, []}, Num};
                  [R] ->
                      %% Get up to the number of chunks we want
                      {Return, Rest} = etorrent_utils:gsplit(Num, R#chunk.chunks),
                      [_|_] = Return, % Assert the state of Return
                      %% Write back the missing ones
                      case Rest of
                          [] ->
                              mnesia:delete_object(R);
                          [_|_] ->
                              mnesia:write(R#chunk {chunks = Rest})
                      end,
                      %% Assign chunk to us. The row is keyed on the torrent
                      %% Id, so existing assignments of Pid are extended
                      %% rather than overwritten.
                      AssignedIdt = {Id, PieceNum, {assigned, Pid}},
                      Q = case mnesia:read(chunk, AssignedIdt, write) of
                              [] ->
                                  #chunk { idt = AssignedIdt, chunks = [] };
                              [C] when is_record(C, chunk) ->
                                  C
                          end,
                      mnesia:write(Q#chunk { chunks = Return ++ Q#chunk.chunks}),
                      %% Return remaining
                      Remaining = Num - length(Return),
                      {ok, {PieceNum, Return}, Remaining}
              end
      end).
%%--------------------------------------------------------------------
%% Function: chunkify(Operations) -> [{Offset, Size, FileOperations}]
%% Description: From a list of {Path, Offset, Size} operations to
%%   read/write a piece, construct a list of chunks given by the Offset of
%%   the chunk within the piece, the Size of the chunk and the file
%%   operations that read/write exactly that chunk. Every chunk is
%%   ?DEFAULT_CHUNK_SIZE bytes except possibly the last one; an empty
%%   operation list yields the single chunk {0, 0, []}.
%%--------------------------------------------------------------------
chunkify(Operations) ->
    chunkify(0, 0, [], Operations, ?DEFAULT_CHUNK_SIZE).

%% chunkify(Start, Taken, RevOps, Pending, Room):
%%   Start   - offset in the piece where the current chunk begins
%%   Taken   - bytes gathered so far for the current chunk
%%   RevOps  - file operations of the current chunk, newest first
%%   Pending - file operations not consumed yet
%%   Room    - bytes still needed to fill the current chunk
%%
%% An exhausted (0-byte) file operation is simply skipped.
chunkify(Start, Taken, RevOps, [{_Path, _Offset, 0} | Pending], Room) ->
    chunkify(Start, Taken, RevOps, Pending, Room);
%% Nothing left to read: the current chunk is the last one.
chunkify(Start, Taken, RevOps, [], _Room) ->
    [{Start, Taken, lists:reverse(RevOps)}];
%% The current chunk is full: emit it and start the next one right after.
chunkify(Start, Taken, RevOps, Pending, 0) ->
    Done = {Start, Taken, lists:reverse(RevOps)},
    [Done | chunkify(Start + Taken, 0, [], Pending, ?DEFAULT_CHUNK_SIZE)];
%% Bite as much of the front file as fits. Whatever is left of it stays at
%% the head of Pending; a fully eaten file leaves a 0-byte stub that the
%% first clause discards.
chunkify(Start, Taken, RevOps, [{Path, Offset, Size} | Pending], Room) ->
    Bite = case Room =< Size of
               true -> Room;
               false -> Size
           end,
    chunkify(Start, Taken + Bite,
             [{Path, Offset, Bite} | RevOps],
             [{Path, Offset + Bite, Size - Bite} | Pending],
             Room - Bite).
%%--------------------------------------------------------------------
%% Function: chunkify_piece(Id, Piece) -> ok
%% Description: Given a #piece record, cut it up into chunks and store
%%   them as the piece's not_fetched chunk row, marking the piece as
%%   chunked with `left` set to the number of chunks. If the piece is no
%%   longer not_fetched when the transaction runs (someone chunked it
%%   first), nothing is changed. Crashes if the transaction aborts.
%%--------------------------------------------------------------------
chunkify_piece(Id, P) when is_record(P, piece) ->
    Chunks = chunkify(P#piece.files),
    NumChunks = length(Chunks),
    {atomic, Res} =
        mnesia:transaction(
          fun () ->
                  %% The row is updated below, so take the write lock up
                  %% front (as the other transactions in this module do)
                  %% instead of upgrading a read lock later.
                  [S] = mnesia:read(piece, P#piece.idpn, write),
                  case S#piece.state of
                      not_fetched ->
                          ok = mnesia:write(S#piece{ state = chunked,
                                                     left = NumChunks}),
                          ok = mnesia:write(
                                 #chunk { idt = {S#piece.id,
                                                 S#piece.piece_number,
                                                 not_fetched},
                                          chunks = Chunks}),
                          ok;
                      _ ->
                          already_there
                  end
          end),
    case Res of
        ok ->
            etorrent_torrent:decrease_not_fetched(Id); % endgames as side-eff.
        already_there ->
            ok
    end,
    ok.
%%--------------------------------------------------------------------
%% Function: find_new_piece(Id, Iterator) -> #piece | none
%% Description: Walk a gb_sets iterator over piece numbers and return the
%%   first #piece record of torrent Id whose state is not_fetched, or none
%%   when the iterator runs out. Piece numbers without a row are skipped.
%%   Uses dirty reads.
%%--------------------------------------------------------------------
find_new_piece(Id, Iterator) ->
    case gb_sets:next(Iterator) of
        none ->
            none;
        {PieceNumber, Next} ->
            case mnesia:dirty_read(piece, {Id, PieceNumber}) of
                [#piece { state = not_fetched } = Fresh] -> Fresh;
                [_] -> find_new_piece(Id, Next);
                [] -> find_new_piece(Id, Next)
            end
    end.
%%--------------------------------------------------------------------
%% Function: find_chunked_chunks(Id, IterResult, Res) -> PieceNum | Res'
%% Description: Walk IterResult (a value returned by gb_sets:next/1)
%%   looking for a chunked piece of torrent Id that still has a
%%   not_fetched chunk row, and return its piece number. A chunked piece
%%   without such a row turns Res into found_chunked; when the iterator
%%   is exhausted the accumulated Res is returned (none or found_chunked
%%   at the call sites in this module). Uses dirty reads and crashes if a
%%   piece number has no piece row.
%%--------------------------------------------------------------------
find_chunked_chunks(_Id, none, Res) ->
    Res;
find_chunked_chunks(Id, {Pn, Next}, Res) ->
    [P] = mnesia:dirty_read(piece, {Id, Pn}),
    IsChunked = (P#piece.state =:= chunked),
    HasPending = IsChunked andalso
        mnesia:dirty_read(chunk, {Id, Pn, not_fetched}) =/= [],
    if
        HasPending ->
            %% Optimization: Pick the whole piece and pass it on
            P#piece.piece_number;
        IsChunked ->
            find_chunked_chunks(Id, gb_sets:next(Next), found_chunked);
        true ->
            find_chunked_chunks(Id, gb_sets:next(Next), Res)
    end.
%% Inside a transaction, record Offset in the piece's fetched chunk row.
%% Returns fetched when etorrent_piece:t_fetched/2 says the whole piece is
%% already fetched, true when Offset was already recorded, and false when
%% it was newly recorded.
t_update_fetched(Id, PieceNum, {Offset, _Len}) ->
    case etorrent_piece:t_fetched(Id, PieceNum) of
        true ->
            fetched;
        false ->
            FetchedIdt = {Id, PieceNum, fetched},
            case mnesia:read(chunk, FetchedIdt, write) of
                [] ->
                    mnesia:write(#chunk { idt = FetchedIdt,
                                          chunks = [Offset]}),
                    false;
                [Row] ->
                    Known = Row#chunk.chunks,
                    AlreadyKnown = lists:member(Offset, Known),
                    if
                        AlreadyKnown ->
                            true;
                        true ->
                            mnesia:write(Row#chunk { chunks = [Offset | Known]}),
                            false
                    end
            end
    end.
%% Inside a transaction, take the chunk at Offset out of the piece's
%% {assigned, Pid} row, deleting the row once it holds no more chunks.
%% Does nothing if Pid has no assignment row for the piece. Returns ok.
t_update_chunk_assignment(Id, PieceNum, Pid,
                          {Offset, _Len}) ->
    case mnesia:read(chunk, {Id, PieceNum, {assigned, Pid}}, write) of
        [] ->
            %% We stored a chunk that was not belonging to us, do nothing
            ok;
        [S] ->
            %% Assigned chunks are {Offset, Size, Ops} tuples (built by
            %% chunkify/1), so they are keyed on the bare Offset; keying on
            %% {Offset, Len} would never match and never release the chunk.
            case lists:keydelete(Offset, 1, S#chunk.chunks) of
                [] ->
                    mnesia:delete_object(S);
                L when is_list(L) ->
                    mnesia:write(S#chunk { chunks = L })
            end
    end.
%% Inside a transaction, decrement the piece's count of missing chunks
%% (`left`). Returns full when it reaches 0, ok otherwise.
t_decrease_missing_chunks(Id, PieceNum) ->
    [Piece] = mnesia:read(piece, {Id, PieceNum}, write),
    Left = Piece#piece.left - 1,
    mnesia:write(Piece#piece { left = Left }),
    if
        Left =:= 0 -> full;
        is_integer(Left) -> ok
    end.