Seems to work
[makerl.git] / src / mkrl_exec_host_node.erl
bloba78073660cf3985636aa7a98571a1ebd7942cde3
1 %%%-------------------------------------------------------------------
2 %%% @author Joan Arnaldich Bernal <joan@Flatland>
3 %%% @copyright (C) 2011, Joan Arnaldich Bernal
4 %%% @doc
6 %%% Module for an execution Host. An execution host abstracts a place
7 %%% where tasks can be executed, by answering to the submit(M,F,A)
8 %%% async call. Once the call has been processed, they return the
9 %%% message {host_done, Result} to the caller.
10 %%%
11 %%% An execution host may implement policies like an allowed number of
12 %%% parallel executions, round-robin against other execution hosts, etc...
13 %%%
14 %%% The implementation here will assume the execution host is the
15 %%% local machine and will limit the parallel executions givien an
16 %%% initial parameter. Other execution hosts will be implemented as
17 %%% tasks require them.
19 %%% @end
20 %%% Created : 9 Oct 2011 by Joan Arnaldich Bernal <joan@Flatland>
21 %%%-------------------------------------------------------------------
22 -module(mkrl_exec_host_node).
24 -behaviour(gen_server).
26 %% API
27 -export([submit/4, start_link/2, start/2, stop/1]).
29 %% gen_server callbacks
30 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
31 terminate/2, code_change/3]).
35 -define(SERVER, ?MODULE).
37 -record(state, { max_parallel :: integer(),
38 pending_mfas :: [{atom(), atom(), [any()]}],
39 currently_spawned :: [dict()] }).
41 %%%===================================================================
42 %%% API
43 %%%===================================================================
45 %%--------------------------------------------------------------------
46 %% @doc
47 %% Starts the server
49 %% @end
50 %%--------------------------------------------------------------------
51 -spec start_link(atom(), integer()) -> {ok, pid()} | {error, any()} | ignore.
52 start_link(Name, MaxParallel) ->
53 gen_server:start_link({local, Name}, ?MODULE, [MaxParallel], []).
55 -spec start(atom(), integer()) -> {ok, pid()} | {error, any()} | ignore.
56 start(Name, MaxParallel) ->
57 gen_server:start({local, Name}, ?MODULE, [MaxParallel], []).
60 -spec submit(atom(), atom(), atom(), [any()]) -> ok.
61 submit(ExecHost, M, F, A) ->
62 gen_server:cast(ExecHost, {submit, self(), M, F, A} ).
65 -spec stop(atom()) -> ok.
66 stop(ExecHost) ->
67 gen_server:call(ExecHost, stop).
69 %%%===================================================================
70 %%% gen_server callbacks
71 %%%===================================================================
73 %%--------------------------------------------------------------------
74 %% @private
75 %% @doc
76 %% Initializes the server
78 %% @spec init(Args) -> {ok, State} |
79 %% {ok, State, Timeout} |
80 %% ignore |
81 %% {stop, Reason}
82 %% @end
83 %%--------------------------------------------------------------------
84 init([MaxParallel]) ->
85 process_flag(trap_exit, true),
86 {ok, #state{ max_parallel = MaxParallel,
87 pending_mfas = [],
88 currently_spawned = dict:new() }}.
90 %%--------------------------------------------------------------------
91 %% @private
92 %% @doc
93 %% Handling call messages
95 %% @spec handle_call(Request, From, State) ->
96 %% {reply, Reply, State} |
97 %% {reply, Reply, State, Timeout} |
98 %% {noreply, State} |
99 %% {noreply, State, Timeout} |
100 %% {stop, Reason, Reply, State} |
101 %% {stop, Reason, State}
102 %% @end
103 %%--------------------------------------------------------------------
104 handle_call(stop, _From, State) ->
105 {stop, normal, State}.
107 %%--------------------------------------------------------------------
108 %% @private
109 %% @doc
110 %% Handling cast messages
112 %% @spec handle_cast(Msg, State) -> {noreply, State} |
113 %% {noreply, State, Timeout} |
114 %% {stop, Reason, State}
115 %% @end
116 %%--------------------------------------------------------------------
117 handle_cast({submit, Caller, M, F, A}, State) ->
118 %io:format("Before Submit State: ~p~n", [State]),
119 OldDict = State#state.currently_spawned,
120 case dict:size(OldDict) < State#state.max_parallel of
121 true ->
122 NewDict = spawn_and_update_dict(Caller, M, F, A, OldDict),
123 {noreply,
124 State#state{
125 currently_spawned = NewDict }};
126 false -> % just queue them...
127 {noreply,
128 State#state{
129 pending_mfas = [{Caller, M, F, A} | State#state.pending_mfas]}}
130 end;
131 handle_cast(_Msg, State) -> {noreply, State}.
133 %%--------------------------------------------------------------------
134 %% @private
135 %% @doc
136 %% Handling all non call/cast messages
138 %% @spec handle_info(Info, State) -> {noreply, State} |
139 %% {noreply, State, Timeout} |
140 %% {stop, Reason, State}
141 %% @end
142 %%--------------------------------------------------------------------
143 handle_info({'EXIT', Pid, Reason}, State) ->
144 %io:format("EXIT: Pid=~p~n Reason=~p~n", [Pid, Reason]),
145 OldSpawned = State#state.currently_spawned,
146 case dict:find(Pid, OldSpawned) of
147 {ok, _} ->
148 PidRemovedDict = dict:erase(Pid, OldSpawned),
149 NewState = case State#state.pending_mfas of
150 [] ->
151 State#state{ currently_spawned = PidRemovedDict };
152 [{Caller, M,F,A}| Rest] ->
153 NewSpawns = spawn_and_update_dict(Caller, M, F, A, PidRemovedDict),
154 State#state{ currently_spawned = NewSpawns,
155 pending_mfas = Rest }
156 end,
157 {noreply, NewState };
158 error ->
159 %% Unknown subprocess
160 {noreply, State}
161 end;
162 handle_info(_Msg, State) ->
163 {noreply, State}.
165 %%--------------------------------------------------------------------
166 %% @private
167 %% @doc
168 %% This function is called by a gen_server when it is about to
169 %% terminate. It should be the opposite of Module:init/1 and do any
170 %% necessary cleaning up. When it returns, the gen_server terminates
171 %% with Reason. The return value is ignored.
173 %% @spec terminate(Reason, State) -> void()
174 %% @end
175 %%--------------------------------------------------------------------
176 terminate(_Reason, _State) ->
179 %%--------------------------------------------------------------------
180 %% @private
181 %% @doc
182 %% Convert process state when code is changed
184 %% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
185 %% @end
186 %%--------------------------------------------------------------------
187 code_change(_OldVsn, State, _Extra) ->
188 {ok, State}.
190 %%%===================================================================
191 %%% Internal functions
192 %%%===================================================================
193 -spec spawn_and_update_dict(pid(), atom(), atom(), atom(), dict()) -> dict().
194 spawn_and_update_dict(Caller, M, F, A, OldDict) ->
195 NewPid = spawn_link(fun() ->
196 Result = apply(M, F, A),
197 %io:format("RESULT: ~p Caller: ~p Pid: ~p~n", [Result, Caller, self()]),
198 Caller ! {host_done, self(), Result}
199 end),
200 dict:store(NewPid, {Caller, M, F, A}, OldDict).