Handle HAVE_ALL and HAVE_NONE. Cleanup the BITFIELD message.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_chunk_mgr.erl
blobcbe0a997c41c5eb3cff31eb262e519c48e37ced2
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_chunk_mgr.erl
3 %%% Author : Jesper Louis Andersen <jlouis@ogre.home>
4 %%% Description : Chunk manager of etorrent.
5 %%%
6 %%% Created : 20 Jul 2008 by Jesper Louis Andersen <jlouis@ogre.home>
7 %%%-------------------------------------------------------------------
-module(etorrent_chunk_mgr).

-include("etorrent_piece.hrl").
-include("etorrent_chunk.hrl").

-behaviour(gen_server).

%% Public API.
-export([start_link/0]).
-export([mark_fetched/2, store_chunk/4, pick_chunks/4]).
-export([putback_chunks/1, remove_chunks/2, endgame_remove_chunk/3]).

%% gen_server callback functions.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]).

%% The server keeps no fields in its loop state.
-record(state, {}).

%% Name the server process is registered under.
-define(SERVER, ?MODULE).

%% Timeouts for the blocking API calls, expressed in seconds; they are
%% converted with timer:seconds/1 where they are used.
-define(STORE_CHUNK_TIMEOUT, 20).
-define(PICK_CHUNKS_TIMEOUT, 20).

%% Number of bytes placed in a chunk before a new chunk is started.
-define(DEFAULT_CHUNK_SIZE, 16384).
29 %%====================================================================
30 %% API
31 %%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok, Pid} | ignore | {error, Error}
%% Description: Start the chunk manager as a gen_server linked to the
%%   caller and registered locally under ?SERVER.
%%--------------------------------------------------------------------
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% Function: mark_fetched(Id, {Index, Offset, Len}) -> found | assigned
%% Args:  Id    ::= integer() - torrent id
%%        Index ::= integer() - piece index
%%        Offset, Len ::= integer() - chunk position and length
%% Description: Ask the server to mark a chunk as fetched. The server
%%   answers 'found' when the chunk was still in the not_fetched list of
%%   the piece (it is removed from that list) and 'assigned' otherwise.
%%--------------------------------------------------------------------
mark_fetched(Id, {Index, Offset, Len}) ->
    Request = {mark_fetched, Id, Index, Offset, Len},
    gen_server:call(?SERVER, Request).
%%--------------------------------------------------------------------
%% Function: store_chunk(Id, Index, {Offset, Len}, Pid) -> Reply
%% Args:  Id    ::= integer() - torrent id
%%        Index ::= integer() - piece index
%%        {Offset, Len}       - position and length of the stored chunk
%%        Pid   ::= pid()     - process the chunk was assigned to
%% Description: Workhorse function. Tell the server that a chunk has
%%   been stored so it can record it as fetched, drop it from Pid's
%%   assignment and count down the missing chunks of the piece.
%%   The call waits at most ?STORE_CHUNK_TIMEOUT seconds.
%%--------------------------------------------------------------------
store_chunk(Id, Index, {Offset, Len}, Pid) ->
    Request = {store_chunk, Id, Index, {Offset, Len}, Pid},
    Timeout = timer:seconds(?STORE_CHUNK_TIMEOUT),
    gen_server:call(?SERVER, Request, Timeout).
%%--------------------------------------------------------------------
%% Function: putback_chunks(Pid) -> ok
%% Description: Asynchronously ask the server to find every chunk
%%   assigned to Pid and mark it as not_fetched again.
%%--------------------------------------------------------------------
putback_chunks(Pid) ->
    Msg = {putback_chunks, Pid},
    gen_server:cast(?SERVER, Msg).
%%--------------------------------------------------------------------
%% Function: remove_chunks(TorrentId, Index) -> ok
%% Args:  TorrentId ::= integer() - torrent id
%%        Index     ::= integer() - index of the piece
%% Description: Asynchronously obliterate all chunk entries stored for
%%   piece Index of torrent TorrentId.
%%--------------------------------------------------------------------
remove_chunks(TorrentId, Index) ->
    Msg = {remove_chunks, TorrentId, Index},
    gen_server:cast(?SERVER, Msg).
%%--------------------------------------------------------------------
%% Function: endgame_remove_chunk(Pid, Id, {Index, Offset, Len}) -> Reply
%% Args:  Pid ::= pid()     - process the chunk is assigned to
%%        Id  ::= integer() - torrent id
%%        {Index, Offset, Len} - piece index, chunk offset and length
%% Description: During the endgame, remove a chunk from the set of
%%   chunks assigned to Pid. The reply is 'ok' when Pid has no
%%   assignment for the piece, otherwise the result of the table update.
%%--------------------------------------------------------------------
endgame_remove_chunk(Pid, Id, {Index, Offset, Len}) ->
    Request = {endgame_remove_chunk, Pid, Id, {Index, Offset, Len}},
    gen_server:call(?SERVER, Request).
%%--------------------------------------------------------------------
%% Function: pick_chunks(Pid, Id, Set, N) ->
%%               {ok, Items} | {endgame, Chunks} |
%%               not_interested | none_eligible
%% Args:  Pid ::= pid()     - process the chunks get assigned to
%%        Id  ::= integer() - torrent id
%%        Set               - gb_set of candidate piece numbers
%%        N   ::= integer() - number of chunks wanted
%% Description: Ask the server for up to N chunks to download from the
%%   pieces in Set. The call waits at most ?PICK_CHUNKS_TIMEOUT seconds.
%%--------------------------------------------------------------------
pick_chunks(Pid, Id, Set, N) ->
    Request = {pick_chunks, Pid, Id, Set, N},
    Timeout = timer:seconds(?PICK_CHUNKS_TIMEOUT),
    gen_server:call(?SERVER, Request, Timeout).
97 %%====================================================================
98 %% gen_server callbacks
99 %%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State}
%% Description: Initialise the server: trap exits and create the named,
%%   protected ETS set 'etorrent_chunk_tbl' that holds the chunk rows.
%%--------------------------------------------------------------------
init([]) ->
    process_flag(trap_exit, true),
    %% Rows are keyed on tuple position 2, i.e. the first field of the
    %% stored record (presumably #chunk.idt -- confirm in etorrent_chunk.hrl).
    TableOptions = [set, protected, named_table, {keypos, 2}],
    _Tid = ets:new(etorrent_chunk_tbl, TableOptions),
    {ok, #state{}}.
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State}
%% Description: Handling call messages.
%%
%%   {mark_fetched, Id, Index, Offset, Len}
%%       Remove the chunk at Offset from the not_fetched list of the
%%       piece. Replies 'found' if it was there, 'assigned' otherwise.
%%   {endgame_remove_chunk, Pid, Id, {Index, Offset, Len}}
%%       Remove the chunk at Offset from the chunks assigned to Pid.
%%       Replies 'ok' when Pid has no assignment row for the piece,
%%       otherwise the result of the ETS update.
%%   {store_chunk, Id, Index, {Offset, Len}, Pid}
%%       Record the chunk as fetched, update Pid's assignment and, if
%%       the chunk was not recorded before, count down the number of
%%       missing chunks for the piece.
%%   {pick_chunks, Pid, Id, Set, Remaining}
%%       Pick up to Remaining chunks from the pieces in Set for Pid,
%%       falling back to the endgame picker when nothing was found.
%%   Any other request is answered with 'ok'.
%%--------------------------------------------------------------------
handle_call({mark_fetched, Id, Index, Offset, _Len}, _From, S) ->
    Res = case ets:lookup(etorrent_chunk_tbl, {Id, Index, not_fetched}) of
              [] ->
                  assigned;
              [R] ->
                  case lists:keymember(Offset, 1, R#chunk.chunks) of
                      true ->
                          remove_chunk_at_offset(R, Offset),
                          found;
                      false ->
                          assigned
                  end
          end,
    {reply, Res, S};
handle_call({endgame_remove_chunk, Pid, Id, {Index, Offset, _Len}}, _From, S) ->
    Res = case ets:lookup(etorrent_chunk_tbl, {Id, Index, {assigned, Pid}}) of
              [] -> ok;
              [R] -> remove_chunk_at_offset(R, Offset)
          end,
    {reply, Res, S};
handle_call({store_chunk, Id, Index, {Offset, Len}, Pid}, _From, S) ->
    %% Add the newly fetched data to the fetched list
    Present = update_fetched(Id, Index, {Offset, Len}),
    %% Update chunk assignment
    update_chunk_assignment(Id, Index, Pid, {Offset, Len}),
    %% Countdown number of missing chunks, but only the first time a
    %% given chunk is seen.
    R = case Present of
            fetched -> ok;
            true -> ok;
            false -> etorrent_piece_mgr:decrease_missing_chunks(Id, Index)
        end,
    {reply, R, S};
handle_call({pick_chunks, Pid, Id, Set, Remaining}, _From, S) ->
    R = case pick_chunks(pick_chunked, {Pid, Id, Set, [], Remaining, none}) of
            not_interested ->
                pick_chunks_endgame(Id, Set, Remaining, not_interested);
            {ok, []} ->
                pick_chunks_endgame(Id, Set, Remaining, none_eligible);
            {ok, Items} ->
                {ok, Items}
        end,
    {reply, R, S};
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.

%% Drop the chunk starting at Offset from row R and write the row back;
%% the row is deleted altogether once its chunk list becomes empty.
%% Returns the result of the ETS operation that was carried out.
remove_chunk_at_offset(R, Offset) ->
    case lists:keydelete(Offset, 1, R#chunk.chunks) of
        [] -> ets:delete_object(etorrent_chunk_tbl, R);
        NC -> ets:insert(etorrent_chunk_tbl, R#chunk{chunks = NC})
    end.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State}
%% Description: Handling cast messages.
%%
%%   {putback_chunks, Pid}
%%       Move every chunk assigned to Pid back to the not_fetched row
%%       of its piece and delete the assignment row.
%%   {remove_chunks, Id, Idx}
%%       Delete every row stored for piece Idx of torrent Id.
%%   Any other message is ignored.
%%--------------------------------------------------------------------
handle_cast({putback_chunks, Pid}, S) ->
    Pattern = #chunk{idt = {'_', '_', {assigned, Pid}}, _ = '_'},
    AssignedRows = ets:select(etorrent_chunk_tbl, [{Pattern, [], ['$_']}]),
    Putback =
        fun(Row) ->
                {Id, Idx, _} = Row#chunk.idt,
                Returned = Row#chunk.chunks,
                Key = {Id, Idx, not_fetched},
                %% Either start a fresh not_fetched row or append to the
                %% chunks that are already waiting there.
                Merged = case ets:lookup(etorrent_chunk_tbl, Key) of
                             [] ->
                                 #chunk{idt = Key, chunks = Returned};
                             [Waiting] ->
                                 Waiting#chunk{chunks = Waiting#chunk.chunks ++ Returned}
                         end,
                ets:insert(etorrent_chunk_tbl, Merged),
                ets:delete_object(etorrent_chunk_tbl, Row)
        end,
    lists:foreach(Putback, AssignedRows),
    {noreply, S};
handle_cast({remove_chunks, Id, Idx}, S) ->
    Pattern = #chunk{idt = {Id, Idx, '_'}, _ = '_'},
    MatchSpec = [{Pattern, [], [true]}],
    ets:select_delete(etorrent_chunk_tbl, MatchSpec),
    {noreply, S};
handle_cast(_Msg, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State}
%% Description: Handling all non call/cast messages. Every such message
%%   is dropped and the state is left untouched.
%%--------------------------------------------------------------------
handle_info(_Unexpected, Unchanged) ->
    {noreply, Unchanged}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: This function is called by a gen_server when it is about to
%% terminate. It is the opposite of init/1: the chunk table created there
%% is deleted here. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
    ets:delete(etorrent_chunk_tbl),
    ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. No
%%   conversion is needed, so the state is handed back unchanged.
%%--------------------------------------------------------------------
code_change(_FromVersion, Unchanged, _Extra) ->
    {ok, Unchanged}.
233 %%--------------------------------------------------------------------
234 %%% Internal functions
235 %%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Function: find_remaining_chunks(Id, PieceSet) ->
%%               [{PieceNum, Offset, Size, Ops}]
%% Description: Find all remaining chunks of torrent Id, both assigned
%%   and not yet fetched, whose piece number is a member of PieceSet.
%%--------------------------------------------------------------------
find_remaining_chunks(Id, PieceSet) ->
    %% Note that the chunk table is often very small.
    RowsTagged =
        fun(Tag) ->
                Head = #chunk{idt = {Id, '$1', Tag}, chunks = '$2'},
                ets:select(etorrent_chunk_tbl, [{Head, [], [{{'$1', '$2'}}]}])
        end,
    Assigned = RowsTagged({assigned, '_'}),
    NotFetched = RowsTagged(not_fetched),
    [{PN, Os, Sz, Ops} || {PN, Chunks} <- Assigned ++ NotFetched,
                          gb_sets:is_element(PN, PieceSet),
                          {Os, Sz, Ops} <- Chunks].
%%--------------------------------------------------------------------
%% Function: chunkify_new_piece(Id, PieceSet) -> ok | none_eligible
%% Description: Find a piece in the PieceSet which has not been chunked
%%  yet and chunk it. Returns either ok if a piece was chunked or none_eligible
%%  if we can't find anything to chunk up in the PieceSet.
%%--------------------------------------------------------------------
chunkify_new_piece(Id, PieceSet) when is_integer(Id) ->
    It = gb_sets:iterator(PieceSet),
    case find_new_piece(Id, gb_sets:next(It)) of
        none ->
            none_eligible;
        P when is_record(P, piece) ->
            chunkify_piece(Id, P),
            ok
    end.
%%--------------------------------------------------------------------
%% Function: select_chunks_by_piecenum(Id, Index, N, Pid) ->
%%               {ok, {Index, Chunks}, Remaining}
%% Description: Move up to N chunks of piece Index from its not_fetched
%%   row to the row of chunks assigned to Pid. Chunks are the entries
%%   that were moved and Remaining is N minus the number of them.
%%   Crashes if the piece has no not_fetched row or nothing was picked.
%%--------------------------------------------------------------------
select_chunks_by_piecenum(Id, Index, N, Pid) ->
    NotFetchedKey = {Id, Index, not_fetched},
    AssignedKey = {Id, Index, {assigned, Pid}},
    [Pending] = ets:lookup(etorrent_chunk_tbl, NotFetchedKey),
    %% Take up to N chunks off the front of the pending list.
    {Picked, Leftover} = etorrent_utils:gsplit(N, Pending#chunk.chunks),
    [_ | _] = Picked, %% Assert: at least one chunk was picked.
    %% Keep the not_fetched row only while it still holds chunks.
    case Leftover of
        [] ->
            ets:delete_object(etorrent_chunk_tbl, Pending);
        [_ | _] ->
            ets:insert(etorrent_chunk_tbl, Pending#chunk{chunks = Leftover})
    end,
    %% Prepend the picked chunks to whatever Pid already holds.
    Mine = case ets:lookup(etorrent_chunk_tbl, AssignedKey) of
               [] -> #chunk{idt = AssignedKey, chunks = []};
               [Existing] -> Existing
           end,
    ets:insert(etorrent_chunk_tbl, Mine#chunk{chunks = Picked ++ Mine#chunk.chunks}),
    %% Tell the caller how many chunks it may still ask for.
    {ok, {Index, Picked}, N - length(Picked)}.
%%--------------------------------------------------------------------
%% Function: chunkify(Operations) ->
%%  [{Offset, Size, FileOperations}]
%% Description: From the list of file operations needed to read/write a
%%  piece, build the list of its chunks. Each chunk is described by its
%%  Offset, its Size and the file operations that read/write it.
%%--------------------------------------------------------------------

%% Entry point: start at offset 0 with an empty first chunk that may
%% still take a full chunk's worth of bytes, then let chunkify/5 do the
%% grunt work.
chunkify(Operations) ->
    FirstOffset = 0,
    BytesEaten = 0,
    chunkify(FirstOffset, BytesEaten, [], Operations, ?DEFAULT_CHUNK_SIZE).
%% chunkify(AtOffset, EatenBytes, Operations, OpsLeft, Left)
%%   AtOffset   - offset of the chunk currently being built
%%   EatenBytes - number of bytes already put into that chunk
%%   Operations - file operations of that chunk, built in reverse
%%   OpsLeft    - file operations of the piece not yet consumed
%%   Left       - number of bytes the current chunk can still take

%% Suppose the next File operation on the piece has 0 bytes in size, then it
%% is exhausted and must be thrown away.
chunkify(AtOffset, EatenBytes, Operations,
         [{_Path, _Offset, 0} | Rest], Left) ->
    chunkify(AtOffset, EatenBytes, Operations, Rest, Left);

%% There are no more file operations to carry out. Hence we reached the end of
%% the piece and we just return the last chunk operation. Remember to reverse
%% the list of operations for that chunk as we build it in reverse.
chunkify(AtOffset, EatenBytes, Operations, [], _Sz) ->
    [{AtOffset, EatenBytes, lists:reverse(Operations)}];

%% There are no more bytes left to add to this chunk. Recurse by calling
%% on the rest of the problem and add our chunk to the front when coming
%% back. Remember to reverse the Operations list built in reverse.
chunkify(AtOffset, EatenBytes, Operations, OpsLeft, 0) ->
    R = chunkify(AtOffset + EatenBytes, 0, [], OpsLeft, ?DEFAULT_CHUNK_SIZE),
    [{AtOffset, EatenBytes, lists:reverse(Operations)} | R];

%% The next file we are processing has at least as many bytes as what is left
%% for this chunk. Hence we can just eat off that many bytes from the front
%% file; afterwards the chunk is full, so nothing (0) is left to add to it.
chunkify(AtOffset, EatenBytes, Operations,
         [{Path, Offset, Size} | Rest], Left) when Left =< Size ->
    chunkify(AtOffset, EatenBytes + Left,
             [{Path, Offset, Left} | Operations],
             [{Path, Offset + Left, Size - Left} | Rest],
             0);

%% The next file does *not* have enough bytes left, so we eat all the bytes
%% we can get from it, and move on to the next file.
chunkify(AtOffset, EatenBytes, Operations,
         [{Path, Offset, Size} | Rest], Left) when Left > Size ->
    chunkify(AtOffset, EatenBytes + Size,
             [{Path, Offset, Size} | Operations],
             Rest,
             Left - Size).
%%--------------------------------------------------------------------
%% Function: chunkify_piece(Id, Piece) -> ok
%% Args:  Id    ::= integer() - torrent id
%%        Piece ::= #piece{}  - the piece to cut up
%% Description: Given a piece record, cut the piece up into chunks and
%%   add those to the chunk table as the not_fetched row of the piece.
%%   The piece must be in state not_fetched and belong to torrent Id;
%%   the function crashes otherwise.
%%--------------------------------------------------------------------
chunkify_piece(Id, P) when is_record(P, piece) ->
    Chunks = chunkify(P#piece.files),
    NumChunks = length(Chunks),
    not_fetched = P#piece.state, %% Assert: only fresh pieces are chunked.
    {Id, Idx} = P#piece.idpn,    %% Assert: the piece belongs to torrent Id.
    %% Tell the piece manager how many chunks the piece consists of.
    ok = etorrent_piece_mgr:chunk(Id, Idx, NumChunks),
    ets:insert(etorrent_chunk_tbl,
               #chunk{idt = {Id, Idx, not_fetched},
                      chunks = Chunks}),
    etorrent_torrent:decrease_not_fetched(Id),
    ok.
%%--------------------------------------------------------------------
%% Function: find_new_piece(Id, IteratorResult) -> #piece{} | none
%% Description: Walk the result of gb_sets:next/1 looking for a piece of
%%   torrent Id whose state is not_fetched. Return its #piece record, or
%%   none when the iterator is exhausted. Piece numbers without an entry
%%   in the piece table and pieces in any other state are skipped.
%%--------------------------------------------------------------------
find_new_piece(_Id, none) -> none;
find_new_piece(Id, {PN, Next}) ->
    case ets:lookup(etorrent_piece_tbl, {Id, PN}) of
        [] ->
            find_new_piece(Id, gb_sets:next(Next));
        [P] when P#piece.state =:= not_fetched ->
            P;
        [_P] ->
            find_new_piece(Id, gb_sets:next(Next))
    end.
%%--------------------------------------------------------------------
%% Function: find_chunked_chunks(Id, IteratorResult, Res) ->
%%               PieceNum | Res | found_chunked
%% Description: Walk the result of gb_sets:next/1 looking for a chunked
%%   piece that still has a not_fetched row in the chunk table, and
%%   return its piece number. When the iterator runs out, return Res,
%%   which has been switched to found_chunked if a chunked piece without
%%   such a row was passed on the way.
%%--------------------------------------------------------------------
find_chunked_chunks(_Id, none, Res) ->
    Res;
find_chunked_chunks(Id, {Pn, Next}, Res) ->
    [Piece] = ets:lookup(etorrent_piece_tbl, {Id, Pn}),
    IsChunked = Piece#piece.state =:= chunked,
    %% The chunk table is only consulted for pieces in state 'chunked'.
    case IsChunked andalso ets:lookup(etorrent_chunk_tbl, {Id, Pn, not_fetched}) of
        false ->
            find_chunked_chunks(Id, gb_sets:next(Next), Res);
        [] ->
            find_chunked_chunks(Id, gb_sets:next(Next), found_chunked);
        _ ->
            Piece#piece.piece_number
    end.
%%--------------------------------------------------------------------
%% Function: update_fetched(Id, Index, {Offset, Len}) ->
%%               fetched | true | false
%% Description: Record that the chunk at Offset of piece Index has been
%%   fetched. Returns 'fetched' when the piece manager reports the whole
%%   piece as fetched already, 'true' when this offset was recorded
%%   before, and 'false' when it was recorded just now.
%%--------------------------------------------------------------------
update_fetched(Id, Index, {Offset, _Len}) ->
    case etorrent_piece_mgr:fetched(Id, Index) of
        true -> fetched;
        false -> record_fetched_offset({Id, Index, fetched}, Offset)
    end.

%% Add Offset to the list of fetched offsets stored under Key, creating
%% the row when it is missing. Returns true if Offset was already in the
%% list and false if it had to be added.
record_fetched_offset(Key, Offset) ->
    case ets:lookup(etorrent_chunk_tbl, Key) of
        [] ->
            ets:insert(etorrent_chunk_tbl,
                       #chunk{idt = Key, chunks = [Offset]}),
            false;
        [R] ->
            case lists:member(Offset, R#chunk.chunks) of
                true ->
                    true;
                false ->
                    ets:insert(etorrent_chunk_tbl,
                               R#chunk{chunks = [Offset | R#chunk.chunks]}),
                    false
            end
    end.
%%--------------------------------------------------------------------
%% Function: update_chunk_assignment(Id, Index, Pid, {Offset, Len}) ->
%%               ok | true
%% Description: Remove the chunk starting at Offset from the chunks of
%%   piece Index that are assigned to Pid. The assignment row is deleted
%%   once its last chunk is gone. Nothing happens when Pid holds no
%%   assignment for the piece.
%%--------------------------------------------------------------------
update_chunk_assignment(Id, Index, Pid,
                        {Offset, _Len}) ->
    case ets:lookup(etorrent_chunk_tbl,
                    {Id, Index, {assigned, Pid}}) of
        [] ->
            %% Stored a chunk not belonging to us, ignore
            ok;
        [S] ->
            %% Assigned chunks are {Offset, Size, Ops} triples, so the
            %% key in position 1 is the bare offset. Keying on the
            %% {Offset, Len} pair would never match and would leave the
            %% stored chunk in the assignment.
            case lists:keydelete(Offset, 1, S#chunk.chunks) of
                [] ->
                    ets:delete_object(etorrent_chunk_tbl, S);
                L when is_list(L) ->
                    ets:insert(etorrent_chunk_tbl,
                               S#chunk{chunks = L})
            end
    end.
%% pick_chunks(Operation, Args): the chunk picker proper. It calls
%% itself in tail position with a different Operation tag to move
%% between its phases. For pick_chunked and chunkify_piece, Args is
%% {Pid, Id, PieceSet, SoFar, Remaining, Res}; for endgame it is
%% {Id, PieceSet, N}.

%% Nothing more is wanted: hand back what has been picked so far.
pick_chunks(_Operation, {_Pid, _Id, _PieceSet, SoFar, 0, _Res}) ->
    {ok, SoFar};

%% Phase 1: take chunks from pieces that have been chunked already. When
%% no such piece offers chunks, go on to chunkify a new piece, passing
%% along whether a chunked piece was seen at all.
pick_chunks(pick_chunked, {Pid, Id, PieceSet, SoFar, Remaining, Res}) ->
    FirstPiece = gb_sets:next(gb_sets:iterator(PieceSet)),
    case find_chunked_chunks(Id, FirstPiece, Res) of
        PieceNum when is_integer(PieceNum) ->
            {ok, Chunks, Left} =
                select_chunks_by_piecenum(Id, PieceNum, Remaining, Pid),
            %% This piece has been served; do not consider it again.
            Pruned = gb_sets:del_element(PieceNum, PieceSet),
            pick_chunks(pick_chunked,
                        {Pid, Id, Pruned, [Chunks | SoFar], Left, Res});
        Outcome when Outcome =:= none; Outcome =:= found_chunked ->
            pick_chunks(chunkify_piece,
                        {Pid, Id, PieceSet, SoFar, Remaining, Outcome})
    end;

%% Phase 2: cut a fresh piece into chunks and go back to phase 1. Give
%% up when no piece can be chunkified any more.
pick_chunks(chunkify_piece, {Pid, Id, PieceSet, SoFar, Remaining, Res}) ->
    case {chunkify_new_piece(Id, PieceSet), SoFar, Res} of
        {ok, _, _} ->
            pick_chunks(pick_chunked, {Pid, Id, PieceSet, SoFar, Remaining, Res});
        {none_eligible, [], none} ->
            not_interested;
        {none_eligible, [], found_chunked} ->
            {ok, []};
        {none_eligible, _, _} ->
            {ok, SoFar}
    end;

%% Endgame: offer a random selection of up to N of the chunks that are
%% still outstanding for the pieces in PieceSet.
pick_chunks(endgame, {Id, PieceSet, N}) ->
    Outstanding = find_remaining_chunks(Id, PieceSet),
    Randomized = etorrent_utils:shuffle(Outstanding),
    {endgame, lists:sublist(Randomized, N)}.
%% Fall back to the endgame picker when torrent Id is in endgame mode;
%% otherwise return Ret, the answer the regular picker came up with.
pick_chunks_endgame(Id, Set, Remaining, Ret) ->
    InEndgame = etorrent_torrent:is_endgame(Id),
    case InEndgame of
        true -> pick_chunks(endgame, {Id, Set, Remaining});
        false -> Ret %% No endgame yet
    end.