1 %%%-------------------------------------------------------------------
2 %%% File : file_system.erl
3 %%% Author : User Jlouis <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Implements access to the file system through
6 %%% file_process processes.
8 %%% Created : 19 Jun 2007 by User Jlouis <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
12 -include("etorrent_piece.hrl").
13 -include("etorrent_mnesia_table.hrl").
-behaviour(gen_server).

%% API
-export([start_link/2,
         stop/1,
         read_piece/2,
         size_of_ops/1,
         write_chunk/2,
         check_piece/2]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).
%% Server state.
%% The callbacks in this module read `file_pool' and `supervisor', so both
%% fields must be declared here alongside the ones that were visible.
-record(state, {torrent_id = none,        %% id of torrent we are serving
                file_pool = none,         %% fs pool pid; `none' until resolved
                                          %% on the first call/cast
                supervisor = none,        %% handle given to etorrent_t_sup:get_pid/2
                file_process_dict = none  %% dict mapping file Id -> file process pid
               }).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: size_of_ops(operation_list) -> integer()
%% Description: Return the total size of the given operations. Each
%%   operation is a {Path, Offset, Size} triple; only Size is summed.
%%   An empty operation list yields 0.
%%--------------------------------------------------------------------
size_of_ops(Ops) ->
    lists:sum([Size || {_Path, _Offset, Size} <- Ops]).
%%--------------------------------------------------------------------
%% Function: start_link(IDHandle, SPid)
%% Description: Spawn and link a new file_system process. Both
%%   arguments are handed to init/1 as the list [IDHandle, SPid].
%%--------------------------------------------------------------------
start_link(IDHandle, SPid) ->
    InitArgs = [IDHandle, SPid],
    gen_server:start_link(?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Function: stop(Pid) -> ok
%% Description: Stop the file_system process identified by Pid. The
%%   request is an asynchronous cast, so this returns immediately.
%%--------------------------------------------------------------------
stop(Pid) ->
    gen_server:cast(Pid, stop).
%%--------------------------------------------------------------------
%% Function: read_piece(Pid, Pn) -> {ok, Binary}
%% Description: Synchronously ask the file_system process Pid for the
%%   contents of piece number Pn. Pn must be an integer.
%%--------------------------------------------------------------------
read_piece(Pid, Pn) when is_integer(Pn) ->
    Request = {read_piece, Pn},
    gen_server:call(Pid, Request).
%%--------------------------------------------------------------------
%% Function: check_piece(Pid, Index) -> ok
%% Description: Asynchronously ask the file_system process Pid to
%%   check the piece with the given Index. The outcome of the check is
%%   not returned to the caller; the cast itself always returns ok.
%%--------------------------------------------------------------------
check_piece(Pid, Index) ->
    Request = {check_piece, Index},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Function: write_chunk(Pid, {Index, Data, Ops}) -> ok
%% Description: Asynchronously hand a chunk to the file_system process
%%   Pid. The {Index, Data, Ops} triple is forwarded unchanged.
%%--------------------------------------------------------------------
write_chunk(Pid, {_Index, _Data, _Ops} = Chunk) ->
    gen_server:cast(Pid, {write_chunk, Chunk}).
75 %%====================================================================
76 %% gen_server callbacks
77 %%====================================================================
%%--------------------------------------------------------------------
%% Function: init([IDHandle, SPid]) -> {ok, State}
%% Description: Initialise the server for torrent IDHandle (an integer).
%%   Exits are trapped. SPid is kept in the state because the
%%   handle_call/handle_cast clauses of this module pass
%%   S#state.supervisor to etorrent_t_sup:get_pid/2 when they resolve
%%   the file pool on first use; file_pool therefore starts as none.
%%--------------------------------------------------------------------
init([IDHandle, SPid]) when is_integer(IDHandle) ->
    process_flag(trap_exit, true),
    {ok, #state{file_process_dict = dict:new(),
                file_pool = none,
                supervisor = SPid,
                torrent_id = IDHandle}}.
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State}
%% Description: Handle synchronous requests.
%%   * While file_pool is still none, the pool pid is first looked up
%%     through etorrent_t_sup:get_pid/2 and the request is re-dispatched
%%     with the updated state.
%%   * {read_piece, PieceNum}: look up the piece's file operations,
%%     read and assemble them, and reply {ok, Data}.
%%--------------------------------------------------------------------
handle_call(Request, From, #state{file_pool = none} = S) ->
    Pool = etorrent_t_sup:get_pid(S#state.supervisor, fs_pool),
    handle_call(Request, From, S#state{file_pool = Pool});
handle_call({read_piece, PieceNum}, _From, S) ->
    TorrentId = S#state.torrent_id,
    %% Exactly one piece record is expected; anything else crashes here.
    [#piece{files = Operations}] =
        etorrent_piece_mgr:select(TorrentId, PieceNum),
    {ok, Data, NewState} = read_pieces_and_assemble(Operations, [], S),
    {reply, {ok, Data}, NewState}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%%                                      {stop, Reason, State}
%% Description: Handle asynchronous requests.
%%   * While file_pool is still none, the pool pid is first looked up
%%     through etorrent_t_sup:get_pid/2 and the message re-dispatched.
%%   * {write_chunk, {Index, Data, Ops}}: write Data via Ops unless the
%%     piece is already in state fetched.
%%   * {check_piece, Index}: read the piece back, compare its SHA-1 with
%%     the stored hash and update the bookkeeping accordingly.
%%   * stop: terminate the server normally.
%%   * anything else is ignored.
%%
%% NOTE(review): several lines of this function were missing from the
%% text under review (case branch heads, `end's and clause results).
%% They have been reconstructed from the surrounding fragments and the
%% gen_server contract -- confirm against version control, in particular
%% the piece states passed to etorrent_piece_mgr:statechange/3.
%%--------------------------------------------------------------------
handle_cast(Msg, #state{file_pool = none} = S) ->
    FSPool = etorrent_t_sup:get_pid(S#state.supervisor, fs_pool),
    handle_cast(Msg, S#state{file_pool = FSPool});
handle_cast({write_chunk, {Index, Data, Ops}}, S) ->
    case etorrent_piece_mgr:select(S#state.torrent_id, Index) of
        [P] when P#piece.state =:= fetched ->
            %% Piece is already complete: drop the chunk.
            {noreply, S};
        [_] ->
            {ok, NS} = fs_write(Data, Ops, S),
            {noreply, NS}
    end;
handle_cast({check_piece, Index}, S) ->
    TorrentId = S#state.torrent_id,
    [#piece{hash = Hash, files = Operations}] =
        etorrent_piece_mgr:select(TorrentId, Index),
    {ok, Data, NS} = read_pieces_and_assemble(Operations, [], S),
    DataSize = byte_size(Data),
    case Hash =:= crypto:sha(Data) of
        true ->
            ok = etorrent_torrent:statechange(
                   TorrentId,
                   [{subtract_left, DataSize},
                    {add_downloaded, DataSize}]),
            %% NOTE(review): `fetched' inferred from the write_chunk guard.
            ok = etorrent_piece_mgr:statechange(TorrentId, Index, fetched),
            %% Make sure there is no chunks left for this piece.
            ok = etorrent_chunk_mgr:remove_chunks(TorrentId, Index),
            broadcast_have_message(Index, TorrentId),
            {noreply, NS};
        false ->
            %% NOTE(review): `not_fetched' is a reconstruction -- confirm.
            etorrent_piece_mgr:statechange(TorrentId, Index, not_fetched),
            etorrent_chunk_mgr:remove_chunks(TorrentId, Index),
            %% 'left' will be updated when the piece is chunked again.
            {noreply, NS}
    end;
handle_cast(stop, S) ->
    {stop, normal, S};
handle_cast(_Msg, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%%                                       {noreply, State, Timeout} |
%%                                       {stop, Reason, State}
%% Description: Handling all non call/cast messages.
%%   A 'DOWN' message for a monitored process removes that pid from
%%   file_process_dict. Every other message is ignored.
%%--------------------------------------------------------------------
handle_info({'DOWN', _R, process, Pid, _Reason}, S) ->
    Nd = remove_file_process(Pid, S#state.file_process_dict),
    {noreply, S#state{file_process_dict = Nd}};
handle_info(_Info, State) ->
    %% NOTE(review): this clause had no body in the text under review;
    %% restored to the conventional "ignore" result.
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%
%% Here it stops every file process held in file_process_dict and then
%% deletes the torrent's entry via etorrent_path_map:delete/1. Both
%% steps are asserted to return ok.
%%--------------------------------------------------------------------
terminate(_Reason, S) ->
    ok = stop_all_fs_processes(S#state.file_process_dict),
    ok = etorrent_path_map:delete(S#state.torrent_id),
    %% NOTE(review): the final expression was missing from the text
    %% under review; the return value is ignored by gen_server.
    ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. No
%%   conversion is needed, so the state is returned unchanged.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
169 %%--------------------------------------------------------------------
170 %%% Internal functions
171 %%--------------------------------------------------------------------
%% create_file_process(Id, State) -> {ok, Pid, NewState}
%% Start a file process for file Id through the pool supervisor,
%% monitor it, and remember it in file_process_dict under Id.
create_file_process(Id, S) ->
    #state{file_pool = Pool,
           torrent_id = TorrentId,
           file_process_dict = Procs} = S,
    {ok, Pid} = etorrent_fs_pool_sup:add_file_process(Pool, TorrentId, Id),
    %% The monitor makes handle_info/2 receive a 'DOWN' message for Pid.
    erlang:monitor(process, Pid),
    {ok, Pid, S#state{file_process_dict = dict:store(Id, Pid, Procs)}}.
%%--------------------------------------------------------------------
%% Func: read_pieces_and_assemble(Operations, Acc, State) ->
%%           {ok, Binary, State}
%% Description: Read the data described by each {Id, Offset, Size}
%%   operation from the file process for Id and return everything
%%   concatenated in operation order. File processes are created on
%%   demand; one that has died (noproc) is dropped from
%%   file_process_dict and the same operation is retried.
%%
%% NOTE(review): parts of the second clause were missing from the text
%% under review. The read of an existing process is written with
%% try ... catch; the original used an old-style `catch' whose exact
%% success pattern was not visible. The result of get_data/3 is used
%% directly as data, as in the visible create-on-demand branch.
%%--------------------------------------------------------------------
read_pieces_and_assemble([], FileData, S) ->
    {ok, list_to_binary(lists:reverse(FileData)), S};
read_pieces_and_assemble([{Id, Offset, Size} | Rest] = Ops, Done, S) ->
    case dict:find(Id, S#state.file_process_dict) of
        {ok, Pid} ->
            try etorrent_fs_process:get_data(Pid, Offset, Size) of
                Data ->
                    read_pieces_and_assemble(Rest, [Data | Done], S)
            catch
                exit:{noproc, _} ->
                    %% Stale pid: forget it and retry this operation,
                    %% which will then take the `error' branch below.
                    D = remove_file_process(Pid, S#state.file_process_dict),
                    read_pieces_and_assemble(
                      Ops, Done, S#state{file_process_dict = D})
            end;
        error ->
            {ok, Pid, NS} = create_file_process(Id, S),
            Data = etorrent_fs_process:get_data(Pid, Offset, Size),
            read_pieces_and_assemble(Rest, [Data | Done], NS)
    end.
%%--------------------------------------------------------------------
%% Func: fs_write(Data, Operations, State) -> {ok, State}
%% Description: Write data defined by Operations. Returns new State
%%   maintaining the file_process_dict.
%%   For each {Id, Offset, Size} operation the first Size bytes of Data
%%   are handed to the file process for Id. File processes are created
%%   on demand; one that has died (noproc) is dropped from the dict and
%%   the same operation is retried. Data must be used up exactly when
%%   the operations run out.
%%
%% NOTE(review): the base clause body and parts of the second clause
%% were missing from the text under review; they are reconstructed from
%% the documented return value and the visible fragments. The write to
%% an existing process uses try ... catch in place of old-style `catch'.
%%--------------------------------------------------------------------
fs_write(<<>>, [], S) ->
    {ok, S};
fs_write(Data, [{Id, Offset, Size} | Rest] = Ops, S) ->
    <<Chunk:Size/binary, Remaining/binary>> = Data,
    case dict:find(Id, S#state.file_process_dict) of
        {ok, Pid} ->
            try etorrent_fs_process:put_data(Pid, Chunk, Offset, Size) of
                ok ->
                    fs_write(Remaining, Rest, S)
            catch
                exit:{noproc, _} ->
                    %% Stale pid: forget it and retry this operation
                    %% with the full, unconsumed Data.
                    D = remove_file_process(Pid, S#state.file_process_dict),
                    fs_write(Data, Ops, S#state{file_process_dict = D})
            end;
        error ->
            {ok, Pid, NS} = create_file_process(Id, S),
            ok = etorrent_fs_process:put_data(Pid, Chunk, Offset, Size),
            fs_write(Remaining, Rest, NS)
    end.
%% remove_file_process(Pid, Dict) -> Dict1
%% Return Dict without any entry whose value is Pid. A Pid that is not
%% present leaves the dict contents unchanged, and a Pid stored under
%% several keys is removed from all of them.
remove_file_process(Pid, Dict) ->
    dict:filter(fun (_K, V) -> V =/= Pid end, Dict).
%% stop_all_fs_processes(Dict) -> ok
%% Call etorrent_fs_process:stop/1 on every pid stored as a value in
%% Dict. Always returns ok, which terminate/2 asserts on.
%% NOTE(review): the list argument and the result were missing from the
%% text under review; dict:to_list/1 matches the {Key, Pid} pattern of
%% the visible fun.
stop_all_fs_processes(Dict) ->
    lists:foreach(fun ({_, Pid}) -> etorrent_fs_process:stop(Pid) end,
                  dict:to_list(Dict)),
    ok.
241 broadcast_have_message(Index
, TorrentId
) ->
242 Peers
= etorrent_peer:all(TorrentId
),
243 lists:foreach(fun (Peer
) ->
244 etorrent_t_peer_recv:have(Peer#peer
.pid, Index
)