1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_chunk_mgr.erl
3 %%% Author : Jesper Louis Andersen <jlouis@ogre.home>
4 %%% Description : Chunk manager of etorrent.
6 %%% Created : 20 Jul 2008 by Jesper Louis Andersen <jlouis@ogre.home>
7 %%%-------------------------------------------------------------------
8 -module(etorrent_chunk_mgr
).
10 -include("etorrent_piece.hrl").
11 -include("etorrent_chunk.hrl").
13 -behaviour(gen_server
).
16 -export([start_link
/0, remove_chunks
/2, store_chunk
/4, putback_chunks
/1,
17 mark_fetched
/2, pick_chunks
/4, endgame_remove_chunk
/3]).
19 %% gen_server callbacks
20 -export([init
/1, handle_call
/3, handle_cast
/2, handle_info
/2,
21 terminate
/2, code_change
/3]).
24 -define(SERVER
, ?MODULE
).
25 -define(STORE_CHUNK_TIMEOUT
, 20).
26 -define(PICK_CHUNKS_TIMEOUT
, 20).
27 -define(DEFAULT_CHUNK_SIZE
, 16384).
29 %%====================================================================
31 %%====================================================================
32 %%--------------------------------------------------------------------
33 %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
34 %% Description: Starts the server
35 %%--------------------------------------------------------------------
37 gen_server:start_link({local
, ?SERVER
}, ?MODULE
, [], []).
39 %%--------------------------------------------------------------------
40 %% Function: mark_fetched/2
41 %% Args: Id ::= integer() - torrent id
42 %% IOL ::= {integer(), integer(), integer()} - {Index, Offs, Len}
43 %% Description: Mark a given chunk as fetched.
44 %%--------------------------------------------------------------------
45 mark_fetched(Id
, {Index
, Offset
, Len
}) ->
46 gen_server:call(?SERVER
, {mark_fetched
, Id
, Index
, Offset
, Len
}).
48 %%--------------------------------------------------------------------
49 %% Function: store_chunk(Id, PieceNum, {Offset, Len},
50 %% Data, FSPid, PeerGroupPid, Pid) -> ok
51 %% Description: Workhorse function. Store a chunk in the chunk table.
52 %% If we have all chunks we need, then report the piece is full.
53 %%--------------------------------------------------------------------
54 store_chunk(Id
, Index
, {Offset
, Len
}, Pid
) ->
55 gen_server:call(?SERVER
, {store_chunk
, Id
, Index
, {Offset
, Len
}, Pid
},
56 timer:seconds(?STORE_CHUNK_TIMEOUT
)).
58 %%--------------------------------------------------------------------
59 %% Function: putback_chunks(Pid) -> transaction
60 %% Description: Find all chunks assigned to Pid and mark them as not_fetched
61 %%--------------------------------------------------------------------
62 putback_chunks(Pid
) ->
63 gen_server:cast(?SERVER
, {putback_chunks
, Pid
}).
65 %%--------------------------------------------------------------------
66 %% Function: remove_chunks/2
67 %% Args: Id ::= integer() - torrent id
68 %% Idx ::= integer() - Index of Piece
69 %% Description: Oblitterate all chunks for Index in the torrent Id.
70 %%--------------------------------------------------------------------
71 remove_chunks(TorrentId
, Index
) ->
72 gen_server:cast(?SERVER
, {remove_chunks
, TorrentId
, Index
}).
74 %%--------------------------------------------------------------------
75 %% Function: endgame_remove_chunk/3
76 %% Args: Pid ::= pid() - pid of caller
77 %% Id ::= integer() - torrent id
78 %% IOL ::= {integer(), integer(), integer()} - {Index, Offs, Len}
79 %% Description: Remove a chunk in the endgame from its assignment to a
81 %%--------------------------------------------------------------------
82 endgame_remove_chunk(Pid
, Id
, {Index
, Offset
, Len
}) ->
83 gen_server:call(?SERVER
, {endgame_remove_chunk
, Pid
, Id
, {Index
, Offset
, Len
}}).
85 %%--------------------------------------------------------------------
86 %% Function: pick_chunks(Handle, PieceSet, StatePid, Num) -> ...
87 %% Description: Return some chunks for downloading.
89 %% This function is relying on tail-calls to itself with different
90 %% tags to return the needed data.
92 %%--------------------------------------------------------------------
93 pick_chunks(Pid
, Id
, Set
, N
) ->
94 gen_server:call(?SERVER
, {pick_chunks
, Pid
, Id
, Set
, N
},
95 timer:seconds(?PICK_CHUNKS_TIMEOUT
)).
97 %%====================================================================
98 %% gen_server callbacks
99 %%====================================================================
101 %%--------------------------------------------------------------------
102 %% Function: init(Args) -> {ok, State} |
103 %% {ok, State, Timeout} |
106 %% Description: Initiates the server
107 %%--------------------------------------------------------------------
109 process_flag(trap_exit
, true
),
110 _Tid
= ets:new(etorrent_chunk_tbl
, [set
, protected
, named_table
,
114 %%--------------------------------------------------------------------
115 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
116 %% {reply, Reply, State, Timeout} |
117 %% {noreply, State} |
118 %% {noreply, State, Timeout} |
119 %% {stop, Reason, Reply, State} |
120 %% {stop, Reason, State}
121 %% Description: Handling call messages
122 %%--------------------------------------------------------------------
123 handle_call({mark_fetched
, Id
, Index
, Offset
, _Len
}, _From
, S
) ->
124 Res
= case ets:lookup(etorrent_chunk_tbl
, {Id
, Index
, not_fetched
}) of
126 [R
] -> case lists:keymember(Offset
, 1, R#chunk
.chunks
) of
127 true
-> case lists:keydelete(Offset
, 1, R#chunk
.chunks
) of
128 [] -> ets:delete_object(etorrent_chunk_tbl
, R
);
129 NC
-> ets:insert(etorrent_chunk_tbl
,
130 R#chunk
{ chunks
= NC
})
137 handle_call({endgame_remove_chunk
, Pid
, Id
, {Index
, Offset
, _Len
}}, _From
, S
) ->
138 Res
= case ets:lookup(etorrent_chunk_tbl
, {Id
, Index
, {assigned
, Pid
}}) of
140 [R
] -> case lists:keydelete(Offset
, 1, R#chunk
.chunks
) of
141 [] -> ets:delete_object(etorrent_chunk_tbl
, R
);
142 NC
-> ets:insert(etorrent_chunk_tbl
,
143 R#chunk
{ chunks
= NC
})
147 handle_call({store_chunk
, Id
, Index
, {Offset
, Len
}, Pid
}, _From
, S
) ->
148 %% Add the newly fetched data to the fetched list
149 Present
= update_fetched(Id
, Index
, {Offset
, Len
}),
150 %% Update chunk assignment
151 update_chunk_assignment(Id
, Index
, Pid
, {Offset
, Len
}),
152 %% Countdown number of missing chunks
156 false
-> etorrent_piece_mgr:decrease_missing_chunks(Id
, Index
)
159 handle_call({pick_chunks
, Pid
, Id
, Set
, Remaining
}, _From
, S
) ->
160 R
= case pick_chunks(pick_chunked
, {Pid
, Id
, Set
, [], Remaining
, none
}) of
161 not_interested
-> pick_chunks_endgame(Id
, Set
, Remaining
, not_interested
);
162 {ok
, []} -> pick_chunks_endgame(Id
, Set
, Remaining
, none_eligible
);
163 {ok
, Items
} -> {ok
, Items
}
166 handle_call(_Request
, _From
, State
) ->
168 {reply
, Reply
, State
}.
170 %%--------------------------------------------------------------------
171 %% Function: handle_cast(Msg, State) -> {noreply, State} |
172 %% {noreply, State, Timeout} |
173 %% {stop, Reason, State}
174 %% Description: Handling cast messages
175 %%--------------------------------------------------------------------
176 handle_cast({putback_chunks
, Pid
}, S
) ->
177 MatchHead
= #chunk
{ idt
= {'_', '_', {assigned
, Pid
}}, _
= '_'},
178 Rows
= ets:select(etorrent_chunk_tbl
, [{MatchHead
, [], ['$_']}]),
181 {Id
, Idx
, _
} = C#chunk
.idt
,
182 Chunks
= C#chunk
.chunks
,
183 NotFetchIdt
= {Id
, Idx
, not_fetched
},
184 case ets:lookup(etorrent_chunk_tbl
, NotFetchIdt
) of
186 ets:insert(etorrent_chunk_tbl
,
187 #chunk
{ idt
= NotFetchIdt
,
190 ets:insert(etorrent_chunk_tbl
,
191 R#chunk
{ chunks
= R#chunk
.chunks
++ Chunks
})
193 ets:delete_object(etorrent_chunk_tbl
, C
)
197 handle_cast({remove_chunks
, Id
, Idx
}, S
) ->
198 MatchHead
= #chunk
{ idt
= {Id
, Idx
, '_'}, _
= '_'},
199 ets:select_delete(etorrent_chunk_tbl
,
200 [{MatchHead
, [], [true
]}]),
202 handle_cast(_Msg
, State
) ->
205 %%--------------------------------------------------------------------
206 %% Function: handle_info(Info, State) -> {noreply, State} |
207 %% {noreply, State, Timeout} |
208 %% {stop, Reason, State}
209 %% Description: Handling all non call/cast messages
210 %%--------------------------------------------------------------------
211 handle_info(_Info
, State
) ->
214 %%--------------------------------------------------------------------
215 %% Function: terminate(Reason, State) -> void()
216 %% Description: This function is called by a gen_server when it is about to
217 %% terminate. It should be the opposite of Module:init/1 and do any necessary
218 %% cleaning up. When it returns, the gen_server terminates with Reason.
219 %% The return value is ignored.
220 %%--------------------------------------------------------------------
221 terminate(_Reason
, _State
) ->
222 ets:delete(etorrent_chunk_tbl
),
225 %%--------------------------------------------------------------------
226 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
227 %% Description: Convert process state when code is changed
228 %%--------------------------------------------------------------------
229 code_change(_OldVsn
, State
, _Extra
) ->
233 %%--------------------------------------------------------------------
234 %%% Internal functions
235 %%--------------------------------------------------------------------
237 %%--------------------------------------------------------------------
238 %% Function: find_remaining_chunks(Id, PieceSet) -> [Chunk]
239 %% Description: Find all remaining chunks for a torrent matching PieceSet
240 %%--------------------------------------------------------------------
241 find_remaining_chunks(Id
, PieceSet
) ->
242 %% Note that the chunk table is often very small.
243 MatchHeadAssign
= #chunk
{ idt
= {Id
, '$1', {assigned
, '_'}}, chunks
= '$2'},
244 MatchHeadNotFetch
= #chunk
{ idt
= {Id
, '$1', not_fetched
}, chunks
= '$2'},
245 RowsA
= ets:select(etorrent_chunk_tbl
, [{MatchHeadAssign
, [], [{{'$1', '$2'}}]}]),
246 RowsN
= ets:select(etorrent_chunk_tbl
, [{MatchHeadNotFetch
, [], [{{'$1', '$2'}}]}]),
247 Eligible
= [{PN
, Chunks
} || {PN
, Chunks
} <- RowsA
++ RowsN
,
248 gb_sets:is_element(PN
, PieceSet
)],
249 [{PN
, Os
, Sz
, Ops
} || {PN
, Chunks
} <- Eligible
,
250 {Os
, Sz
, Ops
} <- Chunks
].
252 %%--------------------------------------------------------------------
253 %% Function: chunkify_new_piece(Id, PieceSet) -> ok | none_eligible
254 %% Description: Find a piece in the PieceSet which has not been chunked
255 %% yet and chunk it. Returns either ok if a piece was chunked or none_eligible
256 %% if we can't find anything to chunk up in the PieceSet.
258 %%--------------------------------------------------------------------
259 chunkify_new_piece(Id
, PieceSet
) when is_integer(Id
) ->
260 It
= gb_sets:iterator(PieceSet
),
261 case find_new_piece(Id
, gb_sets:next(It
)) of
262 none
-> none_eligible
;
263 P
when is_record(P
, piece
) ->
264 chunkify_piece(Id
, P
),
268 %%--------------------------------------------------------------------
269 %% Function: select_chunks_by_piecenum(Id, PieceNum, Num, Pid) ->
270 %% {ok, [{Offset, Len}], Remain}
271 %% Description: Select up to Num chunks from PieceNum. Will return either
272 %% {ok, Chunks} if it got all chunks it wanted, or {partial, Chunks, Remain}
273 %% if it got some chunks and there is still Remain chunks left to pick.
274 %%--------------------------------------------------------------------
275 select_chunks_by_piecenum(Id
, Index
, N
, Pid
) ->
276 [R
] = ets:lookup(etorrent_chunk_tbl
,
277 {Id
, Index
, not_fetched
}),
278 %% Get up to N chunks
279 {Return
, Rest
} = etorrent_utils:gsplit(N
, R#chunk
.chunks
),
280 [_
|_
] = Return
, %% Assert.
281 %% Write back missing chunks.
283 [] -> ets:delete_object(etorrent_chunk_tbl
, R
);
284 [_
|_
] -> ets:insert(etorrent_chunk_tbl
, R#chunk
{ chunks
= Rest
})
286 %% Assign chunk to us
287 Q
= case ets:lookup(etorrent_chunk_tbl
, {Id
, Index
, {assigned
, Pid
}}) of
289 #chunk
{ idt
= {Id
, Index
, {assigned
, Pid
}},
293 ets:insert(etorrent_chunk_tbl
, Q#chunk
{ chunks
= Return
++ Q#chunk
.chunks
}),
294 %% Tell caller how much is remaning
295 Remaining
= N
- length(Return
),
296 {ok
, {Index
, Return
}, Remaining
}.
299 %%--------------------------------------------------------------------
300 %% Function: chunkify(Operations) ->
301 %% [{Offset, Size, FileOperations}]
302 %% Description: From a list of operations to read/write a piece, construct
303 %% a list of chunks given by Offset of the chunk, Size of the chunk and
304 %% how to read/write that chunk.
305 %%--------------------------------------------------------------------
307 %% First, we call the version of the function doing the grunt work.
308 chunkify(Operations
) ->
309 chunkify(0, 0, [], Operations
, ?DEFAULT_CHUNK_SIZE
).
311 %% Suppose the next File operation on the piece has 0 bytes in size, then it
312 %% is exhausted and must be thrown away.
313 chunkify(AtOffset
, EatenBytes
, Operations
,
314 [{_Path
, _Offset
, 0} | Rest
], Left
) ->
315 chunkify(AtOffset
, EatenBytes
, Operations
, Rest
, Left
);
317 %% There are no more file operations to carry out. Hence we reached the end of
318 %% the piece and we just return the last chunk operation. Remember to reverse
319 %% the list of operations for that chunk as we build it in reverse.
320 chunkify(AtOffset
, EatenBytes
, Operations
, [], _Sz
) ->
321 [{AtOffset
, EatenBytes
, lists:reverse(Operations
)}];
323 %% There are no more bytes left to add to this chunk. Recurse by calling
324 %% on the rest of the problem and add our chunk to the front when coming
325 %% back. Remember to reverse the Operations list built in reverse.
326 chunkify(AtOffset
, EatenBytes
, Operations
, OpsLeft
, 0) ->
327 R
= chunkify(AtOffset
+ EatenBytes
, 0, [], OpsLeft
, ?DEFAULT_CHUNK_SIZE
),
328 [{AtOffset
, EatenBytes
, lists:reverse(Operations
)} | R
];
330 %% The next file we are processing have a larger size than what is left for this
331 %% chunk. Hence we can just eat off that many bytes from the front file.
332 chunkify(AtOffset
, EatenBytes
, Operations
,
333 [{Path
, Offset
, Size
} | Rest
], Left
) when Left
=< Size
->
334 chunkify(AtOffset
, EatenBytes
+ Left
,
335 [{Path
, Offset
, Left
} | Operations
],
336 [{Path
, Offset
+Left
, Size
- Left
} | Rest
],
339 %% The next file does *not* have enough bytes left, so we eat all the bytes
340 %% we can get from it, and move on to the next file.
341 chunkify(AtOffset
, EatenBytes
, Operations
,
342 [{Path
, Offset
, Size
} | Rest
], Left
) when Left
> Size
->
343 chunkify(AtOffset
, EatenBytes
+ Size
,
344 [{Path
, Offset
, Size
} | Operations
],
348 %%--------------------------------------------------------------------
349 %% Function: chunkify_piece(Id, PieceNum) -> ok
350 %% Description: Given a PieceNumber, cut it up into chunks and add those
351 %% to the chunk table.
352 %%--------------------------------------------------------------------
353 chunkify_piece(Id
, P
) when is_record(P
, piece
) ->
354 Chunks
= chunkify(P#piece
.files
),
355 NumChunks
= length(Chunks
),
356 not_fetched
= P#piece
.state
,
357 {Id
, Idx
} = P#piece
.idpn
,
358 ok
= etorrent_piece_mgr:chunk(Id
, Idx
, NumChunks
),
359 ets:insert(etorrent_chunk_tbl
,
360 #chunk
{ idt
= {Id
, Idx
, not_fetched
},
362 etorrent_torrent:decrease_not_fetched(Id
),
365 %%--------------------------------------------------------------------
366 %% Function: find_new_piece(Id, Iterator) -> #piece | none
367 %% Description: Search an iterator for a not_fetched piece. Return the #piece
369 %%--------------------------------------------------------------------
370 find_new_piece(_Id
, none
) -> none
;
371 find_new_piece(Id
, {PN
, Next
}) ->
372 case ets:lookup(etorrent_piece_tbl
, {Id
, PN
}) of
374 find_new_piece(Id
, gb_sets:next(Next
));
375 [P
] when P#piece
.state
=:= not_fetched
->
377 [_P
] -> find_new_piece(Id
, gb_sets:next(Next
))
380 %%--------------------------------------------------------------------
381 %% Function: find_chunked_chunks(Id, iterator_result()) -> none | PieceNum
382 %% Description: Search an iterator for a chunked piece.
383 %%--------------------------------------------------------------------
384 find_chunked_chunks(_Id
, none
, Res
) ->
386 find_chunked_chunks(Id
, {Pn
, Next
}, Res
) ->
387 [P
] = ets:lookup(etorrent_piece_tbl
, {Id
, Pn
}),
388 case P#piece
.state
of
390 case ets:lookup(etorrent_chunk_tbl
, {Id
, Pn
, not_fetched
}) of
392 find_chunked_chunks(Id
, gb_sets:next(Next
), found_chunked
);
397 find_chunked_chunks(Id
, gb_sets:next(Next
), Res
)
400 update_fetched(Id
, Index
, {Offset
, _Len
}) ->
401 case etorrent_piece_mgr:fetched(Id
, Index
) of
404 case ets:lookup(etorrent_chunk_tbl
,
405 {Id
, Index
, fetched
}) of
407 ets:insert(etorrent_chunk_tbl
,
408 #chunk
{ idt
= {Id
, Index
, fetched
},
412 case lists:member(Offset
, R#chunk
.chunks
) of
415 ets:insert(etorrent_chunk_tbl
,
416 R#chunk
{ chunks
= [ Offset
| R#chunk
.chunks
]}),
422 update_chunk_assignment(Id
, Index
, Pid
,
424 case ets:lookup(etorrent_chunk_tbl
,
425 {Id
, Index
, {assigned
, Pid
}}) of
427 %% Stored a chunk not belonging to us, ignore
430 case lists:keydelete({Offset
, Len
}, 1, S#chunk
.chunks
) of
432 ets:delete_object(etorrent_chunk_tbl
, S
);
434 ets:insert(etorrent_chunk_tbl
,
435 S#chunk
{ chunks
= L
})
440 %% There are 0 remaining chunks to be desired, return the chunks so far
441 pick_chunks(_Operation
, {_Pid
, _Id
, _PieceSet
, SoFar
, 0, _Res
}) ->
444 %% Pick chunks from the already chunked pieces
445 pick_chunks(pick_chunked
, {Pid
, Id
, PieceSet
, SoFar
, Remaining
, Res
}) ->
446 Iterator
= gb_sets:iterator(PieceSet
),
447 case find_chunked_chunks(Id
, gb_sets:next(Iterator
), Res
) of
449 pick_chunks(chunkify_piece
, {Pid
, Id
, PieceSet
, SoFar
, Remaining
, none
});
451 pick_chunks(chunkify_piece
, {Pid
, Id
, PieceSet
, SoFar
, Remaining
, found_chunked
});
452 PieceNum
when is_integer(PieceNum
) ->
454 select_chunks_by_piecenum(Id
, PieceNum
,
456 pick_chunks(pick_chunked
, {Pid
, Id
,
457 gb_sets:del_element(PieceNum
, PieceSet
),
463 %% Find a new piece to chunkify. Give up if no more pieces can be chunkified
464 pick_chunks(chunkify_piece
, {Pid
, Id
, PieceSet
, SoFar
, Remaining
, Res
}) ->
465 case chunkify_new_piece(Id
, PieceSet
) of
467 pick_chunks(pick_chunked
, {Pid
, Id
, PieceSet
, SoFar
, Remaining
, Res
});
468 none_eligible
when SoFar
=:= [], Res
=:= none
->
470 none_eligible
when SoFar
=:= [], Res
=:= found_chunked
->
476 %% Handle the endgame for a torrent gracefully
477 pick_chunks(endgame
, {Id
, PieceSet
, N
}) ->
478 Remaining
= find_remaining_chunks(Id
, PieceSet
),
479 Shuffled
= etorrent_utils:shuffle(Remaining
),
480 {endgame
, lists:sublist(Shuffled
, N
)}.
483 pick_chunks_endgame(Id
, Set
, Remaining
, Ret
) ->
484 case etorrent_torrent:is_endgame(Id
) of
485 false
-> Ret
; %% No endgame yet
486 true
-> pick_chunks(endgame
, {Id
, Set
, Remaining
})