1 %%%-------------------------------------------------------------------
2 %%% File : file_system.erl
3 %%% Author : User Jlouis <jesper.louis.andersen@gmail.com>
4 %%% License : See COPYING
5 %%% Description : Implements access to the file system through
6 %%% file_process processes.
8 %%% Created : 19 Jun 2007 by User Jlouis <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
12 -include("etorrent_mnesia_table.hrl").
14 -behaviour(gen_server
).
17 -export([start_link
/2,
18 stop
/1, read_piece
/2, size_of_ops
/1,
19 write_chunk
/2, check_piece
/3]).
21 %% gen_server callbacks
22 -export([init
/1, handle_call
/3, handle_cast
/2, handle_info
/2,
23 terminate
/2, code_change
/3]).
%% Server state.
%%   torrent_id        - id of the torrent we are serving
%%   file_pool         - result of etorrent_t_sup:get_pid(Supervisor, fs_pool);
%%                       stays 'none' until the first call/cast resolves it
%%   supervisor        - handle given to etorrent_t_sup:get_pid/2 when the
%%                       file pool is looked up
%%   file_process_dict - dict mapping a file Id to the Pid of its file process
-record(state, {torrent_id = none,
                file_pool = none,
                supervisor = none,
                file_process_dict = none}).
30 %%====================================================================
32 %%====================================================================
%%--------------------------------------------------------------------
%% Function: size_of_ops(Ops) -> integer()
%% Description: Return the total size covered by Ops, a list of
%%   {Path, Offset, Size} operations. Only the Size element is used;
%%   an empty list yields 0. Elements that are not 3-tuples are
%%   skipped by the comprehension rather than rejected.
%%--------------------------------------------------------------------
size_of_ops(Ops) ->
    lists:sum([Size || {_Path, _Offset, Size} <- Ops]).
%%--------------------------------------------------------------------
%% Function: start_link(IDHandle, SPid) -> Result of gen_server:start_link/3
%% Description: Spawn and link a new file_system process. init/1 of
%%   this module receives the argument list [IDHandle, SPid].
%%--------------------------------------------------------------------
start_link(IDHandle, SPid) ->
    InitArgs = [IDHandle, SPid],
    gen_server:start_link(?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Function: stop(Pid) -> ok
%% Description: Ask the file_system process identified by Pid to stop.
%%   The request is a cast, so ok is returned without waiting for the
%%   process to act on it.
%%--------------------------------------------------------------------
stop(Pid) ->
    gen_server:cast(Pid, stop).
%%--------------------------------------------------------------------
%% Function: read_piece(Pid, Pn) -> {ok, Binary}
%% Description: Synchronously ask the file_system process Pid for the
%%   contents of piece number Pn. Pn must be an integer; anything else
%%   fails with function_clause before a request is sent.
%%--------------------------------------------------------------------
read_piece(Pid, Pn) when is_integer(Pn) ->
    Request = {read_piece, Pn},
    gen_server:call(Pid, Request).
%%--------------------------------------------------------------------
%% Function: check_piece(Pid, PeerGroupPid, Index) -> ok
%% Description: Asynchronously ask the file_system process Pid to check
%%   the piece with the given Index. The request is a cast, so ok is
%%   returned immediately and the outcome of the check is not reported
%%   to the caller. PeerGroupPid is forwarded to the server as part of
%%   the request.
%%--------------------------------------------------------------------
check_piece(Pid, PeerGroupPid, Index) ->
    Request = {check_piece, PeerGroupPid, Index},
    gen_server:cast(Pid, Request).
70 %%% TODO: The PeerGroupPid could be obtained by the process itself.
%%--------------------------------------------------------------------
%% Function: write_chunk(Pid, {Index, Data, Ops}) -> ok
%% Description: Asynchronously hand the chunk {Index, Data, Ops} to the
%%   file_system process Pid for writing. The second argument must be
%%   a 3-tuple; it is forwarded unchanged inside a write_chunk cast.
%%--------------------------------------------------------------------
write_chunk(Pid, {_Index, _Data, _Ops} = Chunk) ->
    gen_server:cast(Pid, {write_chunk, Chunk}).
74 %%====================================================================
75 %% gen_server callbacks
76 %%====================================================================
%%--------------------------------------------------------------------
%% Function: init([IDHandle, SPid]) -> {ok, State}
%% Description: Initiates the server. IDHandle must be an integer and
%%   becomes the torrent_id. SPid is stored as the supervisor handle
%%   because handle_call/3 and handle_cast/2 pass S#state.supervisor to
%%   etorrent_t_sup:get_pid/2 when they resolve the fs_pool on first
%%   use; file_pool therefore starts out as none. The table of file
%%   processes starts empty.
%%--------------------------------------------------------------------
init([IDHandle, SPid]) when is_integer(IDHandle) ->
    {ok, #state{torrent_id = IDHandle,
                supervisor = SPid,
                file_pool = none,
                file_process_dict = dict:new()}}.
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State}
%% Description: Handling call messages.
%%   While the state has no file pool yet, the pool is first looked up
%%   with etorrent_t_sup:get_pid(Supervisor, fs_pool), stored in the
%%   state, and the same request is then dispatched again.
%%   {read_piece, PieceNum}: select the piece for this torrent, read
%%   the data described by its file operations and reply {ok, Data}.
%%   The match on the select result requires exactly one piece.
%%--------------------------------------------------------------------
handle_call(Request, From, #state{file_pool = none, supervisor = Sup} = State) ->
    Pool = etorrent_t_sup:get_pid(Sup, fs_pool),
    handle_call(Request, From, State#state{file_pool = Pool});
handle_call({read_piece, PieceNum}, _From, State) ->
    Pieces = etorrent_piece:select(State#state.torrent_id, PieceNum),
    [#piece{files = Operations}] = Pieces,
    {ok, Data, NewState} = read_pieces_and_assemble(Operations, [], State),
    {reply, {ok, Data}, NewState}.
%% handle_cast/2: handles asynchronous requests sent with gen_server:cast/2.
%% Clauses visible here, in order:
%%   * any message while S#state.file_pool is 'none': look up fs_pool via
%%     etorrent_t_sup:get_pid/2, store it in the state and re-dispatch the
%%     same message;
%%   * {write_chunk, {Index, Data, Ops}}: select the piece and, depending on
%%     its state, call fs_write/3 with Data and Ops;
%%   * {check_piece, PeerGroupPid, Index}: read the piece back with
%%     read_pieces_and_assemble/3 and compare its stored hash with
%%     crypto:sha/1 of the data;
%%   * stop, and a catch-all clause.
%% NOTE(review): several lines of this function are absent from the text
%% under review (branch bodies, arguments of the statechange/broadcast_have
%% calls, clause terminators and the return tuples). The return values of
%% the write_chunk, check_piece, stop and catch-all clauses therefore cannot
%% be confirmed from here -- restore them from version control before
%% changing this function.
92 handle_cast(Msg
, S
) when S#state
.file_pool
=:= none
->
93 FSPool
= etorrent_t_sup:get_pid(S#state
.supervisor
, fs_pool
),
94 handle_cast(Msg
, S#state
{ file_pool
= FSPool
});
95 handle_cast({write_chunk
, {Index
, Data
, Ops
}}, S
) ->
96 case etorrent_piece:select(S#state
.torrent_id
, Index
) of
97 [P
] when P#piece
.state
=:= fetched
->
%% NOTE(review): the body of the 'fetched' branch and the pattern of the
%% following branch are not visible here.
100 {ok
, NS
} = fs_write(Data
, Ops
, S
),
%% NOTE(review): the return value and the end of this case are not visible.
103 handle_cast({check_piece
, PeerGroupPid
, Index
}, S
) ->
104 [#piece
{ hash = Hash
, files
= Operations
}] =
105 etorrent_piece:select(S#state
.torrent_id
, Index
),
106 {ok
, Data
, NS
} = read_pieces_and_assemble(Operations
, [], S
),
107 DataSize
= size(Data
),
%% Compare the stored piece hash with the SHA of the data just read back.
108 case Hash
== crypto:sha(Data
) of
%% NOTE(review): the branch pattern and the leading argument(s) of the
%% statechange call below are not visible here.
110 ok
= etorrent_torrent:statechange(
112 [{subtract_left
, DataSize
},
113 {add_downloaded
, DataSize
}]),
114 {atomic
, ok
} = etorrent_piece:statechange(
118 %% Make sure there are no chunks left for this piece.
119 ok
= etorrent_chunk:remove_chunks(S#state
.torrent_id
, Index
),
120 ok
= etorrent_t_peer_group_mgr:broadcast_have(PeerGroupPid
,
125 etorrent_piece:statechange(S#state
.torrent_id
,
128 %% TODO: Kill the 'fetched' part in the chunk table.
129 %% TODO: Update 'left' correctly for the piece.
%% NOTE(review): the bodies of the two clauses below are not visible here.
132 handle_cast(stop
, S
) ->
134 handle_cast(_Msg
, State
) ->
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%%                                       {noreply, State, Timeout} |
%%                                       {stop, Reason, State}
%% Description: Handling all non call/cast messages.
%%   A 'DOWN' message for a monitored process removes that Pid from
%%   file_process_dict (create_file_process/2 sets up the monitor).
%%   Every other message is dropped and the state is left unchanged.
%%--------------------------------------------------------------------
handle_info({'DOWN', _R, process, Pid, _Reason}, S) ->
    Nd = remove_file_process(Pid, S#state.file_process_dict),
    {noreply, S#state{file_process_dict = Nd}};
handle_info(_Info, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: This function is called by a gen_server when it is about
%%   to terminate. It should be the opposite of Module:init/1 and do any
%%   necessary cleaning up. When it returns, the gen_server terminates
%%   with Reason. The return value is ignored.
%%   On shutdown every file process recorded in file_process_dict is
%%   asked to stop. For any other reason only a warning report is
%%   written; the file processes are not stopped from here.
%%--------------------------------------------------------------------
terminate(shutdown, S) ->
    stop_all_fs_processes(S#state.file_process_dict),
    ok;
terminate(Reason, _State) ->
    error_logger:warning_report([fs_process_terminate, Reason]),
    ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. No
%%   conversion is needed, so the state is returned unchanged.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
170 %%--------------------------------------------------------------------
171 %%% Internal functions
172 %%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Func: create_file_process(Id, State) -> {ok, Pid, NewState}
%% Description: Start a file process for file Id through
%%   etorrent_fs_pool_sup:add_file_process/3, monitor it, and record
%%   the Id -> Pid mapping in file_process_dict. The monitor is what
%%   produces the 'DOWN' message handled in handle_info/2; its
%%   reference is not kept.
%%--------------------------------------------------------------------
create_file_process(Id, S) ->
    Pool = S#state.file_pool,
    TorrentId = S#state.torrent_id,
    {ok, Pid} = etorrent_fs_pool_sup:add_file_process(Pool, TorrentId, Id),
    _MonitorRef = erlang:monitor(process, Pid),
    Procs = dict:store(Id, Pid, S#state.file_process_dict),
    {ok, Pid, S#state{file_process_dict = Procs}}.
%%--------------------------------------------------------------------
%% Func: read_pieces_and_assemble(Operations, Acc, State) ->
%%           {ok, Binary, NewState}
%% Description: Read the data described by each {Id, Offset, Size}
%%   operation, in order, and return it concatenated into one binary.
%%   Acc holds the parts read so far in reverse order; callers pass [].
%%   The file process for Id is taken from file_process_dict, or
%%   created when there is none. If a known file process turns out to
%%   be gone (noproc), its entry is dropped and the same operation is
%%   retried, which then creates a fresh process.
%% NOTE(review): the structure of the second clause (the {ok, Pid} and
%%   error branches of dict:find/2, and the tag wrapped around the
%%   get_data call) was restored around the lines that survived in the
%%   reviewed text -- verify against version control.
%%--------------------------------------------------------------------
read_pieces_and_assemble([], FileData, S) ->
    {ok, list_to_binary(lists:reverse(FileData)), S};
read_pieces_and_assemble([{Id, Offset, Size} | Rest], Done, S) ->
    case dict:find(Id, S#state.file_process_dict) of
        {ok, Pid} ->
            %% The result is wrapped in a tagged tuple so that a normal
            %% return value can never be mistaken for an 'EXIT' tuple.
            case catch({read_operation,
                        etorrent_fs_process:get_data(Pid, Offset, Size)}) of
                {read_operation, Data} ->
                    read_pieces_and_assemble(Rest, [Data | Done], S);
                {'EXIT', {noproc, _}} ->
                    %% Stale entry: forget the dead process and retry the
                    %% same operation.
                    D = remove_file_process(Pid, S#state.file_process_dict),
                    read_pieces_and_assemble([{Id, Offset, Size} | Rest],
                                             Done,
                                             S#state{file_process_dict = D})
            end;
        error ->
            {ok, Pid, NS} = create_file_process(Id, S),
            Data = etorrent_fs_process:get_data(Pid, Offset, Size),
            read_pieces_and_assemble(Rest, [Data | Done], NS)
    end.
%%--------------------------------------------------------------------
%% Func: fs_write(Data, Operations, State) -> {ok, State}
%% Description: Write data defined by Operations. Returns new State
%%   maintaining the file_process_dict.
%%   Data is consumed front to back: each {Id, Offset, Size} operation
%%   takes the next Size bytes and hands them to the file process for
%%   Id via etorrent_fs_process:put_data/4. Data must be exactly used
%%   up by the operations: too little data fails the binary match, and
%%   leftover data with no operations left matches no clause.
%%   The file process is taken from file_process_dict, or created when
%%   there is none. If a known file process is gone (noproc), its entry
%%   is dropped and the same operation is retried.
%% NOTE(review): the base-clause body, the {ok, Pid} / error branches
%%   of dict:find/2 and the tag wrapped around the put_data call were
%%   restored around the lines that survived in the reviewed text --
%%   verify against version control.
%%--------------------------------------------------------------------
fs_write(<<>>, [], S) ->
    {ok, S};
fs_write(Data, [{Id, Offset, Size} | Rest], S) ->
    <<Chunk:Size/binary, Remaining/binary>> = Data,
    case dict:find(Id, S#state.file_process_dict) of
        {ok, Pid} ->
            %% The result is wrapped in a tagged tuple so that a normal
            %% return value can never be mistaken for an 'EXIT' tuple.
            case catch({write_operation,
                        etorrent_fs_process:put_data(Pid, Chunk,
                                                     Offset, Size)}) of
                {write_operation, ok} ->
                    fs_write(Remaining, Rest, S);
                {'EXIT', {noproc, _}} ->
                    %% Stale entry: forget the dead process and retry the
                    %% same operation with the full, unsplit Data.
                    D = remove_file_process(Pid, S#state.file_process_dict),
                    fs_write(Data, [{Id, Offset, Size} | Rest],
                             S#state{file_process_dict = D})
            end;
        error ->
            {ok, Pid, NS} = create_file_process(Id, S),
            ok = etorrent_fs_process:put_data(Pid, Chunk, Offset, Size),
            fs_write(Remaining, Rest, NS)
    end.
%%--------------------------------------------------------------------
%% Func: remove_file_process(Pid, Dict) -> NewDict
%% Description: Return Dict without the entries whose value is Pid.
%%   Keys are file ids and values are file process pids. The function
%%   is total: a Pid that is not present leaves the contents of Dict
%%   unchanged, and if several ids map to the same Pid all of them are
%%   removed. A single filter pass replaces the earlier
%%   filter + fetch_keys + erase chain.
%%--------------------------------------------------------------------
remove_file_process(Pid, Dict) ->
    dict:filter(fun(_Id, FilePid) -> FilePid =/= Pid end, Dict).
%%--------------------------------------------------------------------
%% Func: stop_all_fs_processes(Dict) -> [StopResult]
%% Description: Call etorrent_fs_process:stop/1 on every pid stored as
%%   a value in Dict and return the list of results, one per entry.
%%--------------------------------------------------------------------
stop_all_fs_processes(Dict) ->
    Entries = dict:to_list(Dict),
    lists:map(fun({_Id, Pid}) -> etorrent_fs_process:stop(Pid) end, Entries).