Let torrent statechanges take multiple things to change.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_fs.erl
blob9fe76c80662e69a07507d40b328792b51f99d087
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_fs.erl
3 %%% Author : User Jlouis <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Implements access to the file system through
6 %%% file_process processes.
7 %%%
8 %%% Created : 19 Jun 2007 by User Jlouis <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_fs).
12 -include("etorrent_mnesia_table.hrl").
14 -behaviour(gen_server).
16 %% API
17 -export([start_link/2,
18 stop/1, read_piece/2, size_of_ops/1,
19 write_chunk/2, check_piece/3]).
21 %% gen_server callbacks
22 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
23 terminate/2, code_change/3]).
%% Server state of one etorrent_fs process.
-record(state, {
          %% id of torrent we are serving
          torrent_id = none,
          %% fs_pool pid; looked up through the supervisor on the first
          %% call/cast that arrives while this is still 'none'
          file_pool = none,
          %% supervisor pid handed to start_link/2, used for the lookup above
          supervisor = none,
          %% dict mapping a file Id to the pid of its file process
          file_process_dict = none
         }).
30 %%====================================================================
31 %% API
32 %%====================================================================
%%--------------------------------------------------------------------
%% Function: size_of_ops(Ops) -> integer()
%% Description: Add up the Size component of every {Path, Offset, Size}
%%   operation in Ops. Elements that are not 3-tuples are ignored.
%%--------------------------------------------------------------------
size_of_ops(Ops) ->
    AddSize = fun({_Path, _Offset, Size}, Total) -> Total + Size;
                 (_NotAnOp, Total) -> Total
              end,
    lists:foldl(AddSize, 0, Ops).
%%--------------------------------------------------------------------
%% Function: start_link(IDHandle, SPid) -> result of gen_server:start_link/3
%% Description: Spawn and link a new file system process. IDHandle is
%%   the torrent id (init/1 only accepts an integer) and SPid is stored
%%   as the supervisor that is later asked for the fs_pool.
%%--------------------------------------------------------------------
start_link(IDHandle, SPid) ->
    InitArgs = [IDHandle, SPid],
    gen_server:start_link(?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Function: stop(Pid) -> ok
%% Description: Asynchronously ask the file system process Pid to stop.
%%   The request is a cast, so this returns before the process has
%%   actually terminated.
%%--------------------------------------------------------------------
stop(Pid) ->
    StopRequest = stop,
    gen_server:cast(Pid, StopRequest).
%%--------------------------------------------------------------------
%% Function: read_piece(Pid, Pn) -> {ok, Binary}
%% Description: Synchronously ask the file system process Pid for the
%%   contents of piece number Pn. Pn must be an integer.
%%--------------------------------------------------------------------
read_piece(Pid, Pn) when is_integer(Pn) ->
    Request = {read_piece, Pn},
    gen_server:call(Pid, Request).
%%--------------------------------------------------------------------
%% Function: check_piece(Pid, PeerGroupPid, Index) -> ok
%% Description: Asynchronously ask the file system process Pid to read
%%   piece Index back and compare it against the hash stored for that
%%   piece. On a match the piece is marked fetched and a 'have' is
%%   broadcast through PeerGroupPid; otherwise it is marked not_fetched.
%%   The request is a cast, so the outcome is not returned to the caller.
%%--------------------------------------------------------------------
check_piece(Pid, PeerGroupPid, Index) ->
    Request = {check_piece, PeerGroupPid, Index},
    gen_server:cast(Pid, Request).
%%% TODO: The PeerGroupPid could be obtained by the process itself.
%%% NOTE(review): write_chunk/2 takes no PeerGroupPid; the TODO above
%%% presumably refers to check_piece/3 -- confirm.
%%%
%%% write_chunk(Pid, {Index, Data, Ops}) -> ok
%%% Asynchronously ask the file system process Pid to write Data, split
%%% according to the {Id, Offset, Size} operations in Ops, for piece Index.
write_chunk(Pid, {Index, Data, Ops}) ->
    Chunk = {Index, Data, Ops},
    gen_server:cast(Pid, {write_chunk, Chunk}).
74 %%====================================================================
75 %% gen_server callbacks
76 %%====================================================================
%% init([IDHandle, SPid]) -> {ok, State}
%% Build the initial server state. The file pool is left as 'none' here
%% and resolved on the first call/cast; the file process table starts
%% out empty.
init([IDHandle, SPid]) when is_integer(IDHandle) ->
    InitialState = #state{torrent_id = IDHandle,
                          supervisor = SPid,
                          file_pool = none,
                          file_process_dict = dict:new()},
    {ok, InitialState}.
%% handle_call(Request, From, State)
%% First clause: the file pool has not been resolved yet, so look it up
%% through the supervisor and re-dispatch the same request.
%% {read_piece, PieceNum}: read every file operation of the piece and
%% reply {ok, Binary} with the assembled data.
handle_call(Request, From, #state{file_pool = none} = S) ->
    Pool = etorrent_t_sup:get_pid(S#state.supervisor, fs_pool),
    handle_call(Request, From, S#state{file_pool = Pool});
handle_call({read_piece, PieceNum}, _From, S) ->
    TorrentId = S#state.torrent_id,
    %% Exactly one piece record is expected; anything else crashes here.
    [#piece{files = Operations}] = etorrent_piece:select(TorrentId, PieceNum),
    {ok, PieceData, NewState} = read_pieces_and_assemble(Operations, [], S),
    {reply, {ok, PieceData}, NewState}.
%% handle_cast(Msg, State) -> {noreply, State} | {stop, normal, State}
%%
%% First clause: the file pool has not been resolved yet, so look it up
%% through the supervisor and re-dispatch the same message.
%%
%% {write_chunk, {Index, Data, Ops}}: write Data to disk according to
%%   Ops unless piece Index is already marked fetched, in which case the
%%   chunk is dropped.
%% {check_piece, PeerGroupPid, Index}: read piece Index back, compare its
%%   SHA-1 with the stored hash and update the piece/torrent state.
%% stop: terminate the server with reason normal.
%% Anything else is ignored.
handle_cast(Msg, #state{file_pool = none} = S) ->
    FSPool = etorrent_t_sup:get_pid(S#state.supervisor, fs_pool),
    handle_cast(Msg, S#state{file_pool = FSPool});
handle_cast({write_chunk, {Index, Data, Ops}}, S) ->
    case etorrent_piece:select(S#state.torrent_id, Index) of
        [#piece{state = fetched}] ->
            %% Piece is already complete; do not overwrite it.
            {noreply, S};
        [_] ->
            {ok, NS} = fs_write(Data, Ops, S),
            {noreply, NS}
    end;
handle_cast({check_piece, PeerGroupPid, Index}, S) ->
    TorrentId = S#state.torrent_id,
    [#piece{hash = Hash, files = Operations}] =
        etorrent_piece:select(TorrentId, Index),
    {ok, Data, NS} = read_pieces_and_assemble(Operations, [], S),
    %% crypto:sha/1 has been removed from OTP; crypto:hash(sha, _) is the
    %% supported way to compute the same SHA-1 digest.
    case crypto:hash(sha, Data) of
        Hash ->
            DataSize = byte_size(Data),
            ok = etorrent_torrent:statechange(
                   TorrentId,
                   [{subtract_left, DataSize},
                    {add_downloaded, DataSize}]),
            {atomic, ok} =
                etorrent_piece:statechange(TorrentId, Index, fetched),
            %% Make sure there are no chunks left for this piece.
            ok = etorrent_chunk:remove_chunks(TorrentId, Index),
            ok = etorrent_t_peer_group_mgr:broadcast_have(PeerGroupPid,
                                                          Index),
            {noreply, NS};
        _WrongHash ->
            {atomic, ok} =
                etorrent_piece:statechange(TorrentId, Index, not_fetched),
            %% TODO: Kill the 'fetched' part in the chunk table.
            %% TODO: Update 'left' correctly for the piece.
            {noreply, NS}
    end;
handle_cast(stop, S) ->
    {stop, normal, S};
handle_cast(_Msg, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State}
%% Description: Handle all non call/cast messages. A 'DOWN' message from
%%   a monitored process removes that pid from the file process table;
%%   every other message is ignored.
%%--------------------------------------------------------------------
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, S) ->
    Pruned = remove_file_process(Pid, S#state.file_process_dict),
    NewState = S#state{file_process_dict = Pruned},
    {noreply, NewState};
handle_info(_Info, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: Called by the gen_server when it is about to terminate.
%%   On 'shutdown' every known file process is asked to stop. Any other
%%   reason is only reported through error_logger.
%%   The return value is ignored by gen_server.
%% NOTE(review): a stop/1 request ends the server with reason 'normal',
%%   which takes the second clause (warning report, file processes not
%%   stopped) -- confirm this is intended.
%%--------------------------------------------------------------------
terminate(shutdown, S) ->
    stop_all_fs_processes(S#state.file_process_dict),
    ok;
terminate(Reason, _State) ->
    error_logger:warning_report([fs_process_terminate, Reason]),
    ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. No
%%   conversion is performed; the state is handed back unchanged.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
170 %%--------------------------------------------------------------------
171 %%% Internal functions
172 %%--------------------------------------------------------------------
%% create_file_process(Id, State) -> {ok, Pid, NewState}
%% Ask the file pool to start a file process for file Id of this
%% torrent, monitor it, and record it in the file process table.
create_file_process(Id, S) ->
    Pool = S#state.file_pool,
    TorrentId = S#state.torrent_id,
    {ok, Pid} = etorrent_fs_pool_sup:add_file_process(Pool, TorrentId, Id),
    %% The monitor produces the 'DOWN' message handled in handle_info/2.
    _MonitorRef = erlang:monitor(process, Pid),
    Registered = dict:store(Id, Pid, S#state.file_process_dict),
    {ok, Pid, S#state{file_process_dict = Registered}}.
%% read_pieces_and_assemble(Ops, Acc, State) -> {ok, Binary, NewState}
%% Read every {Id, Offset, Size} operation in order and concatenate the
%% results. Acc holds the data read so far in reverse order. A file
%% process is created on demand when none is registered for Id.
read_pieces_and_assemble([], FileData, S) ->
    {ok, list_to_binary(lists:reverse(FileData)), S};
read_pieces_and_assemble([{Id, Offset, Size} | Rest] = Ops, Done, S) ->
    case dict:find(Id, S#state.file_process_dict) of
        {ok, Pid} ->
            %% Only the get_data call is protected; any failure other
            %% than noproc propagates with its original reason.
            try etorrent_fs_process:get_data(Pid, Offset, Size) of
                Data ->
                    read_pieces_and_assemble(Rest, [Data | Done], S)
            catch
                exit:{noproc, _} ->
                    %% The registered file process is gone: forget it and
                    %% retry this operation, which creates a fresh one.
                    D = remove_file_process(Pid, S#state.file_process_dict),
                    read_pieces_and_assemble(Ops,
                                             Done,
                                             S#state{file_process_dict = D})
            end;
        error ->
            {ok, Pid, NS} = create_file_process(Id, S),
            Data = etorrent_fs_process:get_data(Pid, Offset, Size),
            read_pieces_and_assemble(Rest, [Data | Done], NS)
    end.
%%--------------------------------------------------------------------
%% Func: fs_write(Data, Operations, State) -> {ok, State}
%% Description: Write data defined by Operations. Each {Id, Offset, Size}
%%   operation consumes the next Size bytes of Data. Returns new State
%%   maintaining the file_process_dict. Data must be exactly as long as
%%   the sizes in Operations add up to.
%%--------------------------------------------------------------------
fs_write(<<>>, [], S) ->
    {ok, S};
fs_write(Data, [{Id, Offset, Size} | Rest] = Ops, S) ->
    <<Chunk:Size/binary, Remaining/binary>> = Data,
    case dict:find(Id, S#state.file_process_dict) of
        {ok, Pid} ->
            %% Only the put_data call is protected; any failure other
            %% than noproc propagates with its original reason.
            try etorrent_fs_process:put_data(Pid, Chunk, Offset, Size) of
                ok ->
                    fs_write(Remaining, Rest, S)
            catch
                exit:{noproc, _} ->
                    %% The registered file process is gone: forget it and
                    %% retry this operation, which creates a fresh one.
                    D = remove_file_process(Pid, S#state.file_process_dict),
                    fs_write(Data, Ops, S#state{file_process_dict = D})
            end;
        error ->
            {ok, Pid, NS} = create_file_process(Id, S),
            ok = etorrent_fs_process:put_data(Pid, Chunk, Offset, Size),
            fs_write(Remaining, Rest, NS)
    end.
%% remove_file_process(Pid, Dict) -> NewDict
%% Drop every entry of Dict whose value is Pid. Dict is returned with
%% the same contents when Pid is not registered.
remove_file_process(Pid, Dict) ->
    dict:filter(fun(_Id, Registered) -> Registered =/= Pid end, Dict).
%% stop_all_fs_processes(Dict) -> [StopResult]
%% Ask every file process registered in Dict to stop and collect the
%% results of the individual stop calls.
stop_all_fs_processes(Dict) ->
    StopEntry = fun({_Id, Pid}) -> etorrent_fs_process:stop(Pid) end,
    lists:map(StopEntry, dict:to_list(Dict)).