From e929d3e857978fa4164d1df9129604899571fc98 Mon Sep 17 00:00:00 2001 From: jarnaldich Date: Mon, 10 Oct 2011 14:54:04 +0200 Subject: [PATCH] Seems to work --- src/mkrl_exec_host_node.erl | 69 +++++++++++++++++++++++++++++---------------- src/mkrl_utils.erl | 8 +++++- src/scratch.erl | 37 ++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 25 deletions(-) create mode 100644 src/scratch.erl diff --git a/src/mkrl_exec_host_node.erl b/src/mkrl_exec_host_node.erl index 66e9e6f..a780736 100644 --- a/src/mkrl_exec_host_node.erl +++ b/src/mkrl_exec_host_node.erl @@ -24,13 +24,13 @@ -behaviour(gen_server). %% API --export([start_link/0]). +-export([submit/4, start_link/2, start/2, stop/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([submit/4, start_link/1]). + -define(SERVER, ?MODULE). @@ -52,10 +52,20 @@ start_link(Name, MaxParallel) -> gen_server:start_link({local, Name}, ?MODULE, [MaxParallel], []). +-spec start(atom(), integer()) -> {ok, pid()} | {error, any()} | ignore. +start(Name, MaxParallel) -> + gen_server:start({local, Name}, ?MODULE, [MaxParallel], []). + + -spec submit(atom(), atom(), atom(), [any()]) -> ok. submit(ExecHost, M, F, A) -> gen_server:cast(ExecHost, {submit, self(), M, F, A} ). + +-spec stop(atom()) -> ok. +stop(ExecHost) -> + gen_server:call(ExecHost, stop). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -91,9 +101,8 @@ init([MaxParallel]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. +handle_call(stop, _From, State) -> + {stop, normal, State}. %%-------------------------------------------------------------------- %% @private @@ -105,23 +114,19 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- - handle_cast({submit, Caller, M, F, A}, State) -> + %io:format("Before Submit State: ~p~n", [State]), OldDict = State#state.currently_spawned, case dict:size(OldDict) < State#state.max_parallel of true -> - NewPid = spawn_link(fun() -> - Result = apply(M, F, A), - Caller ! {self(), Result} - end), - NewDict = dict:store(NewPid, {Caller, M, F, A}, OldDict), + NewDict = spawn_and_update_dict(Caller, M, F, A, OldDict), {noreply, State#state{ currently_spawned = NewDict }}; false -> % just queue them... {noreply, State#state{ - pending_mfas = [{M, F, A} | State#state.pending_mfas]}} + pending_mfas = [{Caller, M, F, A} | State#state.pending_mfas]}} end; handle_cast(_Msg, State) -> {noreply, State}. @@ -136,19 +141,26 @@ handle_cast(_Msg, State) -> {noreply, State}. %% @end %%-------------------------------------------------------------------- handle_info({'EXIT', Pid, Reason}, State) -> - case lists:member(Pid, State#state.currently_spawned) of - true -> - NewCurSpawned = case State#state.pending_mfas of - [] -> - lists:delete(Pid, List); - [{M,F,A}|_] -> - NewPid = spawn_link(M, F, A), - [NewPid | lists:delete(Pid, List)] - end, - false -> %% Unknown subprocess + %io:format("EXIT: Pid=~p~n Reason=~p~n", [Pid, Reason]), + OldSpawned = State#state.currently_spawned, + case dict:find(Pid, OldSpawned) of + {ok, _} -> + PidRemovedDict = dict:erase(Pid, OldSpawned), + NewState = case State#state.pending_mfas of + [] -> + State#state{ currently_spawned = PidRemovedDict }; + [{Caller, M,F,A}| Rest] -> + NewSpawns = spawn_and_update_dict(Caller, M, F, A, PidRemovedDict), + State#state{ currently_spawned = NewSpawns, + pending_mfas = Rest } + end, + {noreply, NewState }; + error -> + %% Unknown subprocess {noreply, State} - end. -handle_info(_Msg, State) -> {noreply, State}. + end; +handle_info(_Msg, State) -> + {noreply, State}. %%-------------------------------------------------------------------- %% @private @@ -178,3 +190,12 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== +-spec spawn_and_update_dict(pid(), atom(), atom(), atom(), dict()) -> dict(). +spawn_and_update_dict(Caller, M, F, A, OldDict) -> + NewPid = spawn_link(fun() -> + Result = apply(M, F, A), + %io:format("RESULT: ~p Caller: ~p Pid: ~p~n", [Result, Caller, self()]), + Caller ! {host_done, self(), Result} + end), + dict:store(NewPid, {Caller, M, F, A}, OldDict). + diff --git a/src/mkrl_utils.erl b/src/mkrl_utils.erl index d679691..e600adf 100644 --- a/src/mkrl_utils.erl +++ b/src/mkrl_utils.erl @@ -7,7 +7,8 @@ %%%------------------------------------------------------------------- -module(mkrl_utils). -export([mkdir/1, - file_out/2]). + file_out/2, + delayed_apply/3]). -spec mkdir(string()) -> boolean(). mkdir(Dir) -> @@ -28,3 +29,8 @@ file_out(Terms, OutputFile) -> Terms). +-spec delayed_apply(integer(), fun(), [term()]) -> any(). +delayed_apply(Milli, F, Args) -> + timer:sleep(Milli), + apply(F, Args). + diff --git a/src/scratch.erl b/src/scratch.erl new file mode 100644 index 0000000..0a3d8d5 --- /dev/null +++ b/src/scratch.erl @@ -0,0 +1,37 @@ +%%%------------------------------------------------------------------- +%%% File : scratch.erl +%%% Author : <> +%%% Description : +%%% +%%% Created : 10 Oct 2011 by <> +%%%------------------------------------------------------------------- +-module(scratch). +-compile(export_all). + +spawner(Bandwith, Func, List) -> + mkrl_exec_host_node:start(spawner, Bandwith), + [ mkrl_exec_host_node:submit(spawner, + erlang, + apply, [Func, [N]]) + || N <- List ], + Waiter = fun() -> + receive {host_done, _Pid, Result} -> + Result + after 20000 -> + timeout + end + end, + Res = lists:foldl(fun(_, Ac) -> + [ Waiter() | Ac ] + end, + [], + List), + try + mkrl_exec_host_node:stop(spawner) + catch + Error:Reason -> io:format("Caught ~p:~p~n", [Error, Reason]) + end, + Res. + + +%% timer:tc(scratch, spawner, [10, fun(X) -> timer:sleep(100), X end, lists:seq(1, 20)]). -- 2.11.4.GIT