1 %%%-------------------------------------------------------------------
2 %%% @author Joan Arnaldich Bernal <joan@Flatland>
3 %%% @copyright (C) 2011, Joan Arnaldich Bernal
%%% Module for an execution host. An execution host abstracts a place
%%% where tasks can be executed, by answering the submit(ExecHost, M, F, A)
%%% async call. Once the task has run, the worker process sends the
%%% message {host_done, WorkerPid, Result} to the caller.
11 %%% An execution host may implement policies like an allowed number of
12 %%% parallel executions, round-robin against other execution hosts, etc...
%%% The implementation here assumes the execution host is the
%%% local machine and limits the number of parallel executions according
%%% to an initial parameter. Other execution hosts will be implemented as
%%% tasks require them.
20 %%% Created : 9 Oct 2011 by Joan Arnaldich Bernal <joan@Flatland>
21 %%%-------------------------------------------------------------------
-module(mkrl_exec_host_node).

-behaviour(gen_server).

%% Public API
-export([
    submit/4,
    start_link/2,
    start/2,
    stop/1
]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

-define(SERVER, ?MODULE).
%% Server state.
%%
%% max_parallel      - upper bound on the number of workers running at once.
%% pending_mfas      - tasks waiting for a free slot, each stored as
%%                     {Caller, Module, Function, Args}. Defaults to [] so
%%                     the list is always proper, even if init/1 omits it.
%% currently_spawned - dict mapping each running worker pid to the
%%                     {Caller, Module, Function, Args} it is executing.
-record(state, {
    max_parallel :: integer(),
    pending_mfas = [] :: [{pid(), atom(), atom(), [any()]}],
    currently_spawned :: dict:dict()
}).
41 %%%===================================================================
43 %%%===================================================================
45 %%--------------------------------------------------------------------
50 %%--------------------------------------------------------------------
%% @doc Starts an execution host registered locally as `Name' and linked
%% to the calling process. `MaxParallel' is handed to init/1, where it
%% becomes the limit on concurrently running tasks.
-spec start_link(atom(), integer()) -> {ok, pid()} | {error, any()} | ignore.
start_link(Name, MaxParallel) ->
    RegisteredName = {local, Name},
    InitArgs = [MaxParallel],
    gen_server:start_link(RegisteredName, ?MODULE, InitArgs, []).
%% @doc Starts an execution host registered locally as `Name' without
%% linking it to the calling process. `MaxParallel' is handed to init/1,
%% where it becomes the limit on concurrently running tasks.
-spec start(atom(), integer()) -> {ok, pid()} | {error, any()} | ignore.
start(Name, MaxParallel) ->
    RegisteredName = {local, Name},
    InitArgs = [MaxParallel],
    gen_server:start(RegisteredName, ?MODULE, InitArgs, []).
%% @doc Asynchronously asks `ExecHost' to run `apply(M, F, A)'.
%% The calling process is recorded in the request as the caller; the
%% worker that eventually runs the task sends it
%% `{host_done, WorkerPid, Result}'. Always returns `ok' immediately.
-spec submit(atom(), atom(), atom(), [any()]) -> ok.
submit(ExecHost, M, F, A) ->
    Request = {submit, self(), M, F, A},
    gen_server:cast(ExecHost, Request).
%% @doc Synchronously sends the `stop' request to `ExecHost' and returns
%% the server's reply.
-spec stop(atom()) -> ok.
stop(ExecHost) ->
    gen_server:call(ExecHost, stop).
69 %%%===================================================================
70 %%% gen_server callbacks
71 %%%===================================================================
73 %%--------------------------------------------------------------------
76 %% Initializes the server
78 %% @spec init(Args) -> {ok, State} |
79 %% {ok, State, Timeout} |
83 %%--------------------------------------------------------------------
init([MaxParallel]) ->
    %% Trap exits so that the termination of a linked worker is delivered
    %% as an {'EXIT', Pid, Reason} message to handle_info/2 instead of
    %% taking the server down.
    process_flag(trap_exit, true),
    {ok, #state{
        max_parallel = MaxParallel,
        %% Must start as an empty list: handle_cast/2 conses queued tasks
        %% onto it and handle_info/2 pattern-matches it as a list.
        pending_mfas = [],
        currently_spawned = dict:new()
    }}.
90 %%--------------------------------------------------------------------
93 %% Handling call messages
95 %% @spec handle_call(Request, From, State) ->
96 %% {reply, Reply, State} |
97 %% {reply, Reply, State, Timeout} |
99 %% {noreply, State, Timeout} |
100 %% {stop, Reason, Reply, State} |
101 %% {stop, Reason, State}
103 %%--------------------------------------------------------------------
handle_call(stop, _From, State) ->
    %% Reply `ok' before stopping. Returning {stop, normal, State} sends no
    %% reply, which makes the gen_server:call/2 in stop/1 exit the caller
    %% instead of returning the `ok' promised by its spec.
    {stop, normal, ok, State}.
107 %%--------------------------------------------------------------------
110 %% Handling cast messages
112 %% @spec handle_cast(Msg, State) -> {noreply, State} |
113 %% {noreply, State, Timeout} |
114 %% {stop, Reason, State}
116 %%--------------------------------------------------------------------
%% Handles {submit, Caller, M, F, A}: runs the task immediately when fewer
%% than max_parallel workers are active, otherwise stores it in
%% pending_mfas until a worker exits. Any other cast is ignored.
handle_cast({submit, Caller, M, F, A}, State) ->
    Running = State#state.currently_spawned,
    case dict:size(Running) < State#state.max_parallel of
        true ->
            %% A slot is free: start the worker right away.
            NewDict = spawn_and_update_dict(Caller, M, F, A, Running),
            {noreply, State#state{currently_spawned = NewDict}};
        false ->
            %% No free slot: keep the task for later.
            %% NOTE(review): tasks are prepended and handle_info/2 takes
            %% from the head, so pending tasks run most-recent-first.
            Pending = [{Caller, M, F, A} | State#state.pending_mfas],
            {noreply, State#state{pending_mfas = Pending}}
    end;
handle_cast(_Msg, State) ->
    {noreply, State}.
133 %%--------------------------------------------------------------------
136 %% Handling all non call/cast messages
138 %% @spec handle_info(Info, State) -> {noreply, State} |
139 %% {noreply, State, Timeout} |
140 %% {stop, Reason, State}
142 %%--------------------------------------------------------------------
%% Handles the exit of a linked process (exits are trapped in init/1).
%% When the pid belongs to a tracked worker, its slot is released and, if a
%% task is pending, the next one is started in its place. Exits from
%% untracked pids and all other messages leave the state unchanged.
handle_info({'EXIT', Pid, _Reason}, State) ->
    OldSpawned = State#state.currently_spawned,
    case dict:find(Pid, OldSpawned) of
        {ok, _Task} ->
            PidRemovedDict = dict:erase(Pid, OldSpawned),
            NewState =
                case State#state.pending_mfas of
                    [] ->
                        %% Nothing is waiting: just free the slot.
                        State#state{currently_spawned = PidRemovedDict};
                    [{Caller, M, F, A} | Rest] ->
                        %% Reuse the freed slot for the next pending task.
                        NewSpawns = spawn_and_update_dict(
                            Caller, M, F, A, PidRemovedDict
                        ),
                        State#state{
                            currently_spawned = NewSpawns,
                            pending_mfas = Rest
                        }
                end,
            {noreply, NewState};
        error ->
            %% Unknown subprocess
            {noreply, State}
    end;
handle_info(_Msg, State) ->
    {noreply, State}.
165 %%--------------------------------------------------------------------
168 %% This function is called by a gen_server when it is about to
169 %% terminate. It should be the opposite of Module:init/1 and do any
170 %% necessary cleaning up. When it returns, the gen_server terminates
171 %% with Reason. The return value is ignored.
173 %% @spec terminate(Reason, State) -> void()
175 %%--------------------------------------------------------------------
%% No cleanup is performed; the return value is ignored by gen_server.
terminate(_Reason, _State) ->
    ok.
179 %%--------------------------------------------------------------------
182 %% Convert process state when code is changed
184 %% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
186 %%--------------------------------------------------------------------
%% The state needs no conversion across code upgrades; it is returned
%% unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
190 %%%===================================================================
191 %%% Internal functions
192 %%%===================================================================
%% Spawns a worker linked to the calling process that evaluates
%% apply(M, F, A) and then sends {host_done, WorkerPid, Result} to Caller.
%% Returns OldDict extended with WorkerPid -> {Caller, M, F, A}.
%% If apply/3 raises, the worker exits before any message is sent to Caller.
-spec spawn_and_update_dict(pid(), atom(), atom(), [any()], dict:dict()) ->
    dict:dict().
spawn_and_update_dict(Caller, M, F, A, OldDict) ->
    NewPid = spawn_link(fun() ->
        Result = apply(M, F, A),
        Caller ! {host_done, self(), Result}
    end),
    dict:store(NewPid, {Caller, M, F, A}, OldDict).