Handle HAVE_ALL and HAVE_NONE. Cleanup the BITFIELD message.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_fs.erl
blob12a47bbda82e5b72727f2234a03e690619438d82
1 %%%-------------------------------------------------------------------
2 %%% File : file_system.erl
3 %%% Author : User Jlouis <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Implements access to the file system through
6 %%% file_process processes.
7 %%%
8 %%% Created : 19 Jun 2007 by User Jlouis <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_fs).
12 -include("etorrent_piece.hrl").
13 -include("etorrent_mnesia_table.hrl").
15 -behaviour(gen_server).
17 %% API
18 -export([start_link/2,
19 stop/1, read_piece/2, size_of_ops/1,
20 write_chunk/2, check_piece/2]).
22 %% gen_server callbacks
23 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
24 terminate/2, code_change/3]).
26 -record(state, { torrent_id = none, %% id of torrent we are serving
27 file_pool = none,
28 supervisor = none,
29 file_process_dict = none}).
31 %%====================================================================
32 %% API
33 %%====================================================================
35 %%--------------------------------------------------------------------
36 %% Function: size_of_ops(operation_list) -> integer()
37 %% Description: Return the file size of the given operations
38 %%--------------------------------------------------------------------
39 size_of_ops(Ops) ->
40 lists:sum([Size || {_Path, _Offset, Size} <- Ops]).
42 %%--------------------------------------------------------------------
43 %% Function: start_link/0
44 %% Description: Spawn and link a new file_system process
45 %%--------------------------------------------------------------------
46 start_link(IDHandle, SPid) ->
47 gen_server:start_link(?MODULE, [IDHandle, SPid], []).
49 %%--------------------------------------------------------------------
50 %% Function: stop(Pid) -> ok
51 %% Description: Stop the file_system process identified by Pid
52 %%--------------------------------------------------------------------
53 stop(Pid) ->
54 gen_server:cast(Pid, stop).
56 %%--------------------------------------------------------------------
57 %% Function: read_piece(Pid, N) -> {ok, Binary}
58 %% Description: Ask file_system process Pid to retrieve Piece N
59 %%--------------------------------------------------------------------
60 read_piece(Pid, Pn) when is_integer(Pn) ->
61 gen_server:call(Pid, {read_piece, Pn}).
63 %%--------------------------------------------------------------------
64 %% Function: check_piece(Pid, PeerGroupPid, Index) -> ok | wrong_hash
65 %% Description: Search the mnesia tables for the Piece with Index and
66 %% write it back to disk.
67 %%--------------------------------------------------------------------
68 check_piece(Pid, Index) ->
69 gen_server:cast(Pid, {check_piece, Index}).
72 write_chunk(Pid, {Index, Data, Ops}) ->
73 gen_server:cast(Pid, {write_chunk, {Index, Data, Ops}}).
75 %%====================================================================
76 %% gen_server callbacks
77 %%====================================================================
78 init([IDHandle, SPid]) when is_integer(IDHandle) ->
79 process_flag(trap_exit, true),
80 {ok, #state{file_process_dict = dict:new(),
81 file_pool = none,
82 supervisor = SPid,
83 torrent_id = IDHandle}}.
85 handle_call(Msg, _From, S) when S#state.file_pool =:= none ->
86 FSPool = etorrent_t_sup:get_pid(S#state.supervisor, fs_pool),
87 handle_call(Msg, _From, S#state { file_pool = FSPool });
88 handle_call({read_piece, PieceNum}, _From, S) ->
89 [#piece { files = Operations}] =
90 etorrent_piece_mgr:select(S#state.torrent_id, PieceNum),
91 {ok, Data, NS} = read_pieces_and_assemble(Operations, [], S),
92 {reply, {ok, Data}, NS}.
94 handle_cast(Msg, S) when S#state.file_pool =:= none ->
95 FSPool = etorrent_t_sup:get_pid(S#state.supervisor, fs_pool),
96 handle_cast(Msg, S#state { file_pool = FSPool });
97 handle_cast({write_chunk, {Index, Data, Ops}}, S) ->
98 case etorrent_piece_mgr:select(S#state.torrent_id, Index) of
99 [P] when P#piece.state =:= fetched ->
100 {noreply, S};
101 [_] ->
102 {ok, NS} = fs_write(Data, Ops, S),
103 {noreply, NS}
104 end;
105 handle_cast({check_piece, Index}, S) ->
106 [#piece { hash = Hash, files = Operations}] =
107 etorrent_piece_mgr:select(S#state.torrent_id, Index),
108 {ok, Data, NS} = read_pieces_and_assemble(Operations, [], S),
109 DataSize = size(Data),
110 case Hash == crypto:sha(Data) of
111 true ->
112 ok = etorrent_torrent:statechange(
113 S#state.torrent_id,
114 [{subtract_left, DataSize},
115 {add_downloaded, DataSize}]),
116 ok = etorrent_piece_mgr:statechange(
117 S#state.torrent_id,
118 Index,
119 fetched),
120 %% Make sure there is no chunks left for this piece.
121 ok = etorrent_chunk_mgr:remove_chunks(S#state.torrent_id, Index),
122 broadcast_have_message(Index, S#state.torrent_id),
123 {noreply, NS};
124 false ->
125 ok =
126 etorrent_piece_mgr:statechange(S#state.torrent_id,
127 Index,
128 not_fetched),
129 etorrent_chunk_mgr:remove_chunks(S#state.torrent_id, Index),
130 %% 'left' will be updated when the piece is chunked again.
131 {noreply, NS}
132 end;
133 handle_cast(stop, S) ->
134 {stop, normal, S};
135 handle_cast(_Msg, State) ->
136 {noreply, State}.
138 %%--------------------------------------------------------------------
139 %% Function: handle_info(Info, State) -> {noreply, State} |
140 %% {noreply, State, Timeout} |
141 %% {stop, Reason, State}
142 %% Description: Handling all non call/cast messages
143 %%--------------------------------------------------------------------
144 handle_info({'DOWN', _R, process, Pid, _Reason}, S) ->
145 Nd = remove_file_process(Pid, S#state.file_process_dict),
146 {noreply, S#state { file_process_dict = Nd }};
147 handle_info(_Info, State) ->
148 {noreply, State}.
150 %%--------------------------------------------------------------------
151 %% Function: terminate(Reason, State) -> void()
152 %% Description: This function is called by a gen_server when it is about to
153 %% terminate. It should be the opposite of Module:init/1 and do any necessary
154 %% cleaning up. When it returns, the gen_server terminates with Reason.
155 %% The return value is ignored.
156 %%--------------------------------------------------------------------
157 terminate(_Reason, S) ->
158 ok = stop_all_fs_processes(S#state.file_process_dict),
159 ok = etorrent_path_map:delete(S#state.torrent_id),
162 %%--------------------------------------------------------------------
163 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
164 %% Description: Convert process state when code is changed
165 %%--------------------------------------------------------------------
166 code_change(_OldVsn, State, _Extra) ->
167 {ok, State}.
169 %%--------------------------------------------------------------------
170 %%% Internal functions
171 %%--------------------------------------------------------------------
172 create_file_process(Id, S) ->
173 {ok, Pid} = etorrent_fs_pool_sup:add_file_process(S#state.file_pool, S#state.torrent_id, Id),
174 erlang:monitor(process, Pid),
175 NewDict = dict:store(Id, Pid, S#state.file_process_dict),
176 {ok, Pid, S#state{ file_process_dict = NewDict }}.
178 read_pieces_and_assemble([], FileData, S) ->
179 {ok, list_to_binary(lists:reverse(FileData)), S};
180 read_pieces_and_assemble([{Id, Offset, Size} | Rest], Done, S) ->
181 %% 2 Notes: This can't be tail recursive due to catch-handler on stack.
182 %% I've seen exit:{timeout, ...}. We should probably just warn
183 %% And try again ;)
184 case dict:find(Id, S#state.file_process_dict) of
185 {ok, Pid} ->
187 Data = etorrent_fs_process:get_data(Pid, Offset, Size),
188 read_pieces_and_assemble(Rest, [Data | Done], S)
189 catch
190 exit:{noproc, _} ->
191 D = remove_file_process(Pid, S#state.file_process_dict),
192 read_pieces_and_assemble([{Id, Offset, Size} | Rest],
193 Done,
194 S#state{file_process_dict = D})
195 end;
196 error ->
197 {ok, Pid, NS} = create_file_process(Id, S),
198 Data = etorrent_fs_process:get_data(Pid, Offset, Size),
199 read_pieces_and_assemble(Rest, [Data | Done], NS)
200 end.
202 %%--------------------------------------------------------------------
203 %% Func: fs_write(Data, Operations, State) -> {ok, State}
204 %% Description: Write data defined by Operations. Returns new State
205 %% maintaining the file_process_dict.
206 %%--------------------------------------------------------------------
207 fs_write(<<>>, [], S) ->
208 {ok, S};
209 fs_write(Data, [{Id, Offset, Size} | Rest], S) ->
210 <<Chunk:Size/binary, Remaining/binary>> = Data,
211 case dict:find(Id, S#state.file_process_dict) of
212 {ok, Pid} ->
213 Ref = make_ref(),
214 case catch({Ref,
215 etorrent_fs_process:put_data(Pid, Chunk,
216 Offset, Size)}) of
217 {Ref, ok} ->
218 fs_write(Remaining, Rest, S);
219 {'EXIT', {noproc, _}} ->
220 D = remove_file_process(Pid, S#state.file_process_dict),
221 fs_write(Data, [{Id, Offset, Size} | Rest],
222 S#state{file_process_dict = D})
223 end;
224 error ->
225 {ok, Pid, NS} = create_file_process(Id, S),
226 ok = etorrent_fs_process:put_data(Pid, Chunk, Offset, Size),
227 fs_write(Remaining, Rest, NS)
228 end.
230 remove_file_process(Pid, Dict) ->
231 case dict:fetch_keys(dict:filter(fun (_K, V) -> V =:= Pid end, Dict)) of
232 [Key] ->
233 dict:erase(Key, Dict);
234 [] ->
235 Dict
236 end.
238 stop_all_fs_processes(Dict) ->
239 lists:foreach(fun({_, Pid}) -> etorrent_fs_process:stop(Pid) end,
240 dict:to_list(Dict)),
243 broadcast_have_message(Index, TorrentId) ->
244 Peers = etorrent_peer:all(TorrentId),
245 lists:foreach(fun (Peer) ->
246 etorrent_t_peer_recv:have(Peer#peer.pid, Index)
247 end,
248 Peers).