Globalize the etorrent_t_peer_group_mgr and rename it to etorrent_choker while here.
[etorrent.git] / lib / etorrent-1.0 / src / etorrent_fs.erl
blob755dd47dbf4d7b47dcc9f9c24b17cd840cdaa39a
1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_fs.erl
3 %%% Author : User Jlouis <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Implements access to the file system through
6 %%% file_process processes.
7 %%%
8 %%% Created : 19 Jun 2007 by User Jlouis <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_fs).
12 -include("etorrent_piece.hrl").
13 -include("etorrent_mnesia_table.hrl").
15 -behaviour(gen_server).
17 %% API
18 -export([start_link/2,
19 stop/1, read_piece/2, size_of_ops/1,
20 write_chunk/2, check_piece/2]).
22 %% gen_server callbacks
23 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
24 terminate/2, code_change/3]).
%% Server state of one file-system process.
-record(state, {
          %% Id of the torrent we are serving.
          torrent_id = none,
          %% File pool handle; stays 'none' until the first call/cast
          %% looks it up through the torrent supervisor.
          file_pool = none,
          %% Supervisor pid handed to start_link/2.
          supervisor = none,
          %% dict mapping a file Id to the pid of its file process.
          file_process_dict = none
         }).
31 %%====================================================================
32 %% API
33 %%====================================================================
%%--------------------------------------------------------------------
%% Function: size_of_ops(Ops) -> integer()
%% Description: Add up the Size field of every {Path, Offset, Size}
%%   operation in Ops. Elements of any other shape are skipped.
%%--------------------------------------------------------------------
size_of_ops(Ops) ->
    AddSize = fun({_Path, _Offset, Size}, Total) -> Total + Size;
                 (_Other, Total) -> Total
              end,
    lists:foldl(AddSize, 0, Ops).
%%--------------------------------------------------------------------
%% Function: start_link(IDHandle, SPid) -> Result of gen_server:start_link/3
%% Description: Spawn and link a new file-system server. IDHandle is
%%   the torrent id (init/1 requires an integer) and SPid is the
%%   supervisor later asked for the file pool.
%%--------------------------------------------------------------------
start_link(IDHandle, SPid) ->
    InitArgs = [IDHandle, SPid],
    gen_server:start_link(?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Function: stop(Pid) -> ok
%% Description: Asynchronously tell the file-system server Pid to stop.
%%--------------------------------------------------------------------
stop(Pid) ->
    Request = stop,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: read_piece(Pid, Pn) -> {ok, Binary}
%% Description: Synchronously ask the file-system server Pid for the
%%   contents of piece number Pn. Pn must be an integer.
%%--------------------------------------------------------------------
read_piece(Pid, Pn) when is_integer(Pn) ->
    Request = {read_piece, Pn},
    gen_server:call(Pid, Request).
%%--------------------------------------------------------------------
%% Function: check_piece(Pid, Index) -> ok
%% Description: Asynchronously ask the file-system server Pid to verify
%%   piece Index. The call returns at once; the server reads the piece
%%   back from disk, compares its SHA-1 with the stored hash and marks
%%   the piece fetched or not_fetched accordingly.
%%--------------------------------------------------------------------
check_piece(Pid, Index) ->
    Request = {check_piece, Index},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: write_chunk(Pid, {Index, Data, Ops}) -> ok
%% Description: Asynchronously ask the file-system server Pid to write
%%   Data, belonging to piece Index, using the file operations Ops.
%%--------------------------------------------------------------------
write_chunk(Pid, {_Index, _Data, _Ops} = Chunk) ->
    gen_server:cast(Pid, {write_chunk, Chunk}).
75 %%====================================================================
76 %% gen_server callbacks
77 %%====================================================================
%% Build the initial server state. The file pool is left as 'none'
%% here; it is resolved on the first call or cast.
init([IDHandle, SPid]) when is_integer(IDHandle) ->
    InitialState = #state{torrent_id = IDHandle,
                          file_pool = none,
                          supervisor = SPid,
                          file_process_dict = dict:new()},
    {ok, InitialState}.
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State}
%% Description: Handle synchronous requests.
%%   While no file pool is known, it is first fetched from the torrent
%%   supervisor and the request is then handled again.
%%   {read_piece, PieceNum} replies {ok, Data} with the piece contents.
%%--------------------------------------------------------------------
handle_call(Request, From, #state{file_pool = none, supervisor = Sup} = S) ->
    Pool = etorrent_t_sup:get_pid(Sup, fs_pool),
    handle_call(Request, From, S#state{file_pool = Pool});
handle_call({read_piece, PieceNum}, _From, S) ->
    TorrentId = S#state.torrent_id,
    [#piece{files = Operations}] = etorrent_piece_mgr:select(TorrentId, PieceNum),
    {ok, Data, NS} = read_pieces_and_assemble(Operations, [], S),
    Reply = {ok, Data},
    {reply, Reply, NS}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%%                                      {stop, normal, State}
%% Description: Handle asynchronous requests.
%%   While no file pool is known, it is first fetched from the torrent
%%   supervisor and the message is then handled again.
%%   {write_chunk, {Index, Data, Ops}} writes Data through Ops unless
%%     piece Index is already marked fetched.
%%   {check_piece, Index} reads the piece back, hashes it with SHA-1
%%     and compares against the stored hash.
%%   stop terminates the server normally.
%%   Any other message is ignored.
%%--------------------------------------------------------------------
handle_cast(Msg, S) when S#state.file_pool =:= none ->
    FSPool = etorrent_t_sup:get_pid(S#state.supervisor, fs_pool),
    handle_cast(Msg, S#state { file_pool = FSPool });
handle_cast({write_chunk, {Index, Data, Ops}}, S) ->
    case etorrent_piece_mgr:select(S#state.torrent_id, Index) of
        [P] when P#piece.state =:= fetched ->
            %% The piece is already complete; drop the late chunk.
            {noreply, S};
        [_] ->
            {ok, NS} = fs_write(Data, Ops, S),
            {noreply, NS}
    end;
handle_cast({check_piece, Index}, S) ->
    [#piece { hash = Hash, files = Operations}] =
        etorrent_piece_mgr:select(S#state.torrent_id, Index),
    {ok, Data, NS} = read_pieces_and_assemble(Operations, [], S),
    DataSize = byte_size(Data),
    %% crypto:sha/1 was removed in OTP 20; crypto:hash(sha, _) yields
    %% the same SHA-1 digest.
    case crypto:hash(sha, Data) of
        Hash ->
            ok = etorrent_torrent:statechange(
                   S#state.torrent_id,
                   [{subtract_left, DataSize},
                    {add_downloaded, DataSize}]),
            ok = etorrent_piece_mgr:statechange(
                   S#state.torrent_id,
                   Index,
                   fetched),
            %% Make sure there are no chunks left for this piece.
            ok = etorrent_chunk_mgr:remove_chunks(S#state.torrent_id, Index),
            broadcast_have_message(Index, S#state.torrent_id),
            {noreply, NS};
        _Mismatch ->
            ok =
                etorrent_piece_mgr:statechange(S#state.torrent_id,
                                               Index,
                                               not_fetched),
            etorrent_chunk_mgr:remove_chunks(S#state.torrent_id, Index),
            %% 'left' will be updated when the piece is chunked again.
            {noreply, NS}
    end;
handle_cast(stop, S) ->
    {stop, normal, S};
handle_cast(_Msg, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State}
%% Description: Handle all non call/cast messages. A 'DOWN' message
%%   from a monitored file process removes that process from the
%%   file process dict; everything else is ignored.
%%--------------------------------------------------------------------
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, S) ->
    Remaining = remove_file_process(Pid, S#state.file_process_dict),
    NewState = S#state{file_process_dict = Remaining},
    {noreply, NewState};
handle_info(_Info, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: Called by the gen_server when it is about to terminate;
%%   the return value is ignored.
%%   On 'shutdown' every known file process is asked to stop.
%%   Any other reason is only logged as a warning report.
%%--------------------------------------------------------------------
terminate(shutdown, S) ->
    ok = stop_all_fs_processes(S#state.file_process_dict),
    ok;
terminate(Reason, _State) ->
    error_logger:warning_report([fs_process_terminate, Reason]),
    ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Code upgrade hook. The state needs no conversion, so
%%   it is handed back unchanged.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
170 %%--------------------------------------------------------------------
171 %%% Internal functions
172 %%--------------------------------------------------------------------
%% Start a file process for file Id in the file pool, monitor it and
%% remember it in the file process dict.
%% Returns {ok, Pid, NewState}.
create_file_process(Id, S) ->
    Pool = S#state.file_pool,
    TorrentId = S#state.torrent_id,
    {ok, Pid} = etorrent_fs_pool_sup:add_file_process(Pool, TorrentId, Id),
    %% The monitor lets handle_info/2 drop the entry on 'DOWN'.
    _MonitorRef = erlang:monitor(process, Pid),
    Processes = dict:store(Id, Pid, S#state.file_process_dict),
    {ok, Pid, S#state{file_process_dict = Processes}}.
%%--------------------------------------------------------------------
%% Func: read_pieces_and_assemble(Operations, Acc, State) ->
%%           {ok, Binary, State}
%% Description: Run each {Id, Offset, Size} read operation against the
%%   file process for Id and join the results, in operation order,
%%   into one binary. Acc holds the data read so far in reverse order.
%%   A file process is created when none is registered for Id. If the
%%   registered one turns out to be dead (noproc) it is dropped from
%%   the dict and the same operation is retried.
%%--------------------------------------------------------------------
read_pieces_and_assemble([], FileData, S) ->
    {ok, list_to_binary(lists:reverse(FileData)), S};
read_pieces_and_assemble([{Id, Offset, Size} | Rest] = Ops, Done, S) ->
    case dict:find(Id, S#state.file_process_dict) of
        {ok, Pid} ->
            %% Only the read itself is protected; any failure other
            %% than noproc propagates with its original reason.
            try etorrent_fs_process:get_data(Pid, Offset, Size) of
                Data ->
                    read_pieces_and_assemble(Rest, [Data | Done], S)
            catch
                exit:{noproc, _} ->
                    D = remove_file_process(Pid, S#state.file_process_dict),
                    read_pieces_and_assemble(Ops,
                                             Done,
                                             S#state{file_process_dict = D})
            end;
        error ->
            {ok, Pid, NS} = create_file_process(Id, S),
            Data = etorrent_fs_process:get_data(Pid, Offset, Size),
            read_pieces_and_assemble(Rest, [Data | Done], NS)
    end.
%%--------------------------------------------------------------------
%% Func: fs_write(Data, Operations, State) -> {ok, State}
%% Description: Write Data as described by Operations. Each
%%   {Id, Offset, Size} operation consumes the next Size bytes of Data
%%   and hands them to the file process for Id. Returns the new State,
%%   which maintains the file_process_dict.
%%   A file process is created when none is registered for Id. If the
%%   registered one turns out to be dead (noproc) it is dropped from
%%   the dict and the same operation is retried.
%%--------------------------------------------------------------------
fs_write(<<>>, [], S) ->
    {ok, S};
fs_write(Data, [{Id, Offset, Size} | Rest] = Ops, S) ->
    <<Chunk:Size/binary, Remaining/binary>> = Data,
    case dict:find(Id, S#state.file_process_dict) of
        {ok, Pid} ->
            %% Only the write itself is protected; any failure other
            %% than noproc propagates with its original reason.
            try etorrent_fs_process:put_data(Pid, Chunk, Offset, Size) of
                ok ->
                    fs_write(Remaining, Rest, S)
            catch
                exit:{noproc, _} ->
                    D = remove_file_process(Pid, S#state.file_process_dict),
                    fs_write(Data, Ops, S#state{file_process_dict = D})
            end;
        error ->
            {ok, Pid, NS} = create_file_process(Id, S),
            ok = etorrent_fs_process:put_data(Pid, Chunk, Offset, Size),
            fs_write(Remaining, Rest, NS)
    end.
%% Remove the entry whose value is Pid from Dict and return the new
%% dict. Dict is returned unchanged when Pid is not registered.
remove_file_process(Pid, Dict) ->
    case dict:fetch_keys(dict:filter(fun (_K, V) -> V =:= Pid end, Dict)) of
        [Key] ->
            dict:erase(Key, Dict);
        [] ->
            Dict
    end.
%% Ask every file process registered in Dict to stop. Always returns
%% ok, which terminate/2 matches on.
stop_all_fs_processes(Dict) ->
    lists:foreach(fun({_, Pid}) -> etorrent_fs_process:stop(Pid) end,
                  dict:to_list(Dict)),
    ok.
%% Send a have-piece notification for piece Index to the receiver
%% process of every peer returned by etorrent_peer:all/1 for TorrentId.
broadcast_have_message(Index, TorrentId) ->
    Notify = fun (Peer) ->
                     PeerPid = Peer#peer.pid,
                     etorrent_t_peer_recv:send_have_piece(PeerPid, Index)
             end,
    lists:foreach(Notify, etorrent_peer:all(TorrentId)).