Skip to content

Commit

Permalink
Merge pull request #175 from esl/users/pool_supervision_tree
Browse files Browse the repository at this point in the history
Users supervision tree
  • Loading branch information
DenysGonchar authored Feb 21, 2024
2 parents e63ae4b + 623a89f commit 7b1ea3d
Show file tree
Hide file tree
Showing 11 changed files with 550 additions and 151 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ jobs:
name: ${{ matrix.test-type }} test on OTP ${{matrix.otp_vsn}}
strategy:
matrix:
otp_vsn: ['26.1', '25.3', '24.3']
otp_vsn: ['26.2', '25.3', '24.3']
rebar_vsn: ['3.22.0']
test-type: ['regular', 'integration']
runs-on: 'ubuntu-22.04'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
with:
otp-version: ${{ matrix.otp_vsn }}
Expand Down
2 changes: 1 addition & 1 deletion elvis.config
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
{elvis_style, invalid_dynamic_call, #{ignore => [amoc_code_server_SUITE]}},
{elvis_style, dont_repeat_yourself, #{min_complexity => 50}},
{elvis_style, no_debug_call, disable},
{elvis_style, no_block_expressions, #{ignore => [amoc_code_server_SUITE]}},
{elvis_style, no_block_expressions, #{ignore => [amoc_code_server_SUITE, controller_SUITE]}},
{elvis_style, no_throw, disable},
{elvis_style, no_import, disable}
]},
Expand Down
6 changes: 4 additions & 2 deletions guides/distributed-run.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ amoc_dist:remove(50, Force).
```elixir
:amoc_dist.remove(50, force).
```
Remove 50 sessions.
Remove 50 sessions.

Where ``Force`` is a boolean of value:

* ``true`` - to kill the user processes using ``supervisor:terminate_child/2`` function
* ``false`` - to send ``exit(User,shutdown)`` signal to the user process (can be ignored by the user)

All the users are `temporary` children of the `simple_one_for_one` supervisor with the `shutdown` key set to `2000`.
All the users are `temporary` children of a `simple_one_for_one` supervisor with the `shutdown` key set to `2000`.

Note that removal operation is asynchronous, and if we call `amoc_controller:remove_users/2` two times in a row, it may select the same users for removal.

Also all the user processes trap exits.

Expand Down
4 changes: 4 additions & 0 deletions guides/local-run.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ amoc:remove(10, true).
:amoc.remove(10, true).
```

Note that removal operation is asynchronous, and if we call `amoc_controller:remove_users/2` two times in a row, it may select the same users for removal.

Also note that all the user processes trap exits.

#### Many independent Amoc nodes

Sometimes a need arises to run several Amoc nodes independently from each other.
Expand Down
2 changes: 1 addition & 1 deletion integration_test/extra_code_paths/path1/dummy_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ test_amoc_dist() ->
get_users_info(SlaveNodes) ->
Users = [{Node, Id} ||
Node <- SlaveNodes,
{Id, _Pid} <- rpc:call(Node, ets, tab2list, [amoc_users])],
{_Pid, Id} <- rpc:call(Node, amoc_users_sup, get_all_children, [])],
Ids = lists:usort([Id || {_, Id} <- Users]),
Nodes = lists:usort([Node || {Node, _} <- Users]),
N = length(Ids),
Expand Down
145 changes: 67 additions & 78 deletions src/amoc_controller.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%% @copyright 2023 Erlang Solutions Ltd.
%% @copyright 2024 Erlang Solutions Ltd.
%% @doc Main controller of a node, responsible for the scenario and the users
%%
%% Note that this module should be rarely used, APIs are fully exposed by `amoc' and `amoc_dist'
Expand All @@ -8,16 +8,14 @@
-behaviour(gen_server).

-define(SERVER, ?MODULE).
-define(USERS_TABLE, amoc_users).

-required_variable(#{name => interarrival, default_value => 50,
verification => {?MODULE, positive_integer, 1},
verification => {?MODULE, non_neg_integer, 1},
description => "a delay between creating the processes for two "
"consecutive users (ms, def: 50ms)",
update => {?MODULE, maybe_update_interarrival_timer, 2}}).

-record(state, {scenario :: amoc:scenario() | undefined,
no_of_users = 0 :: user_count(),
last_user_id = 0 :: last_user_id(),
status = idle :: idle | running | terminating | finished |
{error, any()} | disabled,
Expand Down Expand Up @@ -67,7 +65,9 @@
%% ------------------------------------------------------------------
%% Parameters verification functions
%% ------------------------------------------------------------------
-export([maybe_update_interarrival_timer/2, positive_integer/1]).
-export([maybe_update_interarrival_timer/2, non_neg_integer/1]).

-export([zero_users_running/0]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
Expand All @@ -77,6 +77,7 @@
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

%% @private
-spec start_link() -> {ok, pid()}.
start_link() ->
Expand Down Expand Up @@ -121,17 +122,24 @@ disable() ->
gen_server:call(?SERVER, disable).

%% @private
-spec positive_integer(any()) -> boolean().
positive_integer(Interarrival) ->
is_integer(Interarrival) andalso Interarrival > 0.
-spec non_neg_integer(any()) -> boolean().
non_neg_integer(Interarrival) ->
is_integer(Interarrival) andalso Interarrival >= 0.

%% @private
-spec maybe_update_interarrival_timer(interarrival, term()) -> ok.
maybe_update_interarrival_timer(interarrival, _) ->
gen_server:cast(?SERVER, maybe_update_interarrival_timer).

%% @private
-spec zero_users_running() -> ok.
zero_users_running() ->
gen_server:cast(?SERVER, zero_users_running).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

%% @private
-spec init([]) -> {ok, state()}.
init([]) ->
Expand Down Expand Up @@ -174,6 +182,9 @@ handle_call(_Request, _From, State) ->
-spec handle_cast(any(), state()) -> {noreply, state()}.
handle_cast(maybe_update_interarrival_timer, State) ->
{noreply, maybe_update_interarrival_timer(State)};
handle_cast(zero_users_running, State) ->
NewSate = handle_zero_users_running(State),
{noreply, NewSate};
handle_cast(_Msg, State) ->
{noreply, State}.

Expand All @@ -182,8 +193,8 @@ handle_cast(_Msg, State) ->
handle_info(start_user, State) ->
NewSate = handle_start_user(State),
{noreply, NewSate};
handle_info({'DOWN', _, process, Pid, _}, State) ->
NewSate = handle_stop_user(Pid, State),
handle_info(start_all_users, State) ->
NewSate = handle_start_all_users(State),
{noreply, NewSate};
handle_info(_Msg, State) ->
{noreply, State}.
Expand All @@ -209,12 +220,15 @@ handle_start_scenario(_Scenario, _Settings, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_stop_scenario(state()) -> {handle_call_res(), state()}.
handle_stop_scenario(#state{no_of_users = 0, status = running} = State) ->
terminate_scenario(State),
{ok, State#state{status = finished}};
handle_stop_scenario(#state{status = running} = State) ->
terminate_all_users(),
{ok, State#state{status = terminating}};
case amoc_users_sup:count_no_of_users() of
0 ->
terminate_scenario(State),
{ok, State#state{status = finished}};
_ ->
amoc_users_sup:terminate_all_children(),
{ok, State#state{status = terminating}}
end;
handle_stop_scenario(#state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

Expand All @@ -240,29 +254,25 @@ handle_add(StartId, EndId, #state{last_user_id = LastId,
NewUsers = lists:seq(StartId, EndId),
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
NewTRef = maybe_start_timer(TRef),
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef,
last_user_id = EndId}};
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef, last_user_id = EndId}};
handle_add(_StartId, _EndId, #state{status = running} = State) ->
{{error, invalid_range}, State};
handle_add(_StartId, _EndId, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_remove(user_count(), boolean(), state()) -> handle_call_res().
handle_remove(Count, ForceRemove, #state{status = running, scenario = Scenario}) ->
amoc_telemetry:execute([controller, users], #{count => Count},
CountRemove = amoc_users_sup:stop_children(Count, ForceRemove),
amoc_telemetry:execute([controller, users], #{count => CountRemove},
#{scenario => Scenario, type => remove}),
Pids = case ets:match_object(?USERS_TABLE, '$1', Count) of
{Objects, _} -> [Pid || {_Id, Pid} <- Objects];
'$end_of_table' -> []
end,
amoc_users_sup:stop_children(Pids, ForceRemove),
{ok, length(Pids)};
{ok, CountRemove};
handle_remove(_Count, _ForceRemove, #state{status = Status}) ->
{error, {invalid_status, Status}}.

-spec handle_status(state()) -> amoc_status().
handle_status(#state{status = running, scenario = Scenario,
no_of_users = N, last_user_id = LastId}) ->
last_user_id = LastId}) ->
N = amoc_users_sup:count_no_of_users(),
{running, #{scenario => Scenario, currently_running_users => N, highest_user_id => LastId}};
handle_status(#state{status = terminating, scenario = Scenario}) ->
{terminating, Scenario};
Expand All @@ -279,33 +289,26 @@ handle_disable(#state{status = Status} = State) ->

-spec handle_start_user(state()) -> state().
handle_start_user(#state{create_users = [UserId | T],
no_of_users = N,
scenario = Scenario,
scenario_state = ScenarioState} = State) ->
start_user(Scenario, UserId, ScenarioState),
State#state{create_users = T, no_of_users = N + 1};
amoc_users_sup:start_child(Scenario, UserId, ScenarioState),
State#state{create_users = T};
handle_start_user(#state{create_users = [], tref = TRef} = State) ->
State#state{tref = maybe_stop_timer(TRef)}.

-spec handle_stop_user(pid(), state()) -> state().
handle_stop_user(Pid, State) ->
case ets:match(?USERS_TABLE, {'$1', Pid}, 1) of
{[[UserId]], _} ->
ets:delete(?USERS_TABLE, UserId),
dec_no_of_users(State);
_ ->
State
end.
-spec handle_start_all_users(state()) -> state().
handle_start_all_users(#state{create_users = AllUsers,
scenario = Scenario,
scenario_state = ScenarioState,
tref = TRef} = State) ->
amoc_users_sup:start_children(Scenario, AllUsers, ScenarioState),
State#state{create_users = [], tref = maybe_stop_timer(TRef)}.

%% ------------------------------------------------------------------
%% helpers
%% ------------------------------------------------------------------
-spec start_tables() -> ok.
start_tables() -> %% ETS creation
?USERS_TABLE = ets:new(?USERS_TABLE, [named_table,
ordered_set,
protected,
{read_concurrency, true}]),
amoc_config_utils:create_amoc_config_ets(),
ok.

Expand All @@ -321,11 +324,12 @@ init_scenario(Scenario, Settings) ->
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
amoc_scenario:terminate(Scenario, ScenarioState).

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
{ok, TRef} = timer:send_interval(interarrival(), start_user),
TRef;
maybe_start_timer(TRef) -> TRef.
-spec handle_zero_users_running(state()) -> state().
handle_zero_users_running(#state{status = terminating} = State) ->
terminate_scenario(State),
State#state{status = finished};
handle_zero_users_running(State) ->
State.

-spec maybe_stop_timer(timer:tref() | undefined) -> undefined.
maybe_stop_timer(undefined) ->
Expand All @@ -334,43 +338,28 @@ maybe_stop_timer(TRef) ->
{ok, cancel} = timer:cancel(TRef),
undefined.

-spec start_user(amoc:scenario(), amoc_scenario:user_id(), any()) -> ok.
start_user(Scenario, Id, ScenarioState) ->
{ok, Pid} = supervisor:start_child(amoc_users_sup, [Scenario, Id, ScenarioState]),
ets:insert(?USERS_TABLE, {Id, Pid}),
erlang:monitor(process, Pid),
ok.

-spec terminate_all_users() -> any().
terminate_all_users() ->
%stop all the users
Match = ets:match_object(?USERS_TABLE, '$1', 200),
terminate_all_users(Match).

%% ets:continuation/0 type is unfortunately not exported from the ets module.
-spec terminate_all_users({tuple(), term()} | '$end_of_table') -> ok.
terminate_all_users({Objects, Continuation}) ->
Pids = [Pid || {_Id, Pid} <- Objects],
amoc_users_sup:stop_children(Pids, true),
Match = ets:match_object(Continuation),
terminate_all_users(Match);
terminate_all_users('$end_of_table') -> ok.

-spec dec_no_of_users(state()) -> state().
dec_no_of_users(#state{no_of_users = 1, status = terminating} = State) ->
terminate_scenario(State),
State#state{no_of_users = 0, status = finished};
dec_no_of_users(#state{no_of_users = N} = State) ->
State#state{no_of_users = N - 1}.

-spec interarrival() -> interarrival().
interarrival() ->
-spec get_interarrival() -> interarrival().
get_interarrival() ->
amoc_config:get(interarrival).

-spec maybe_update_interarrival_timer(state()) -> state().
maybe_update_interarrival_timer(#state{tref = undefined} = State) ->
State;
maybe_update_interarrival_timer(#state{tref = TRef} = State) ->
{ok, cancel} = timer:cancel(TRef),
{ok, NewTRef} = timer:send_interval(interarrival(), start_user),
Value = get_interarrival(),
NewTRef = do_interarrival(Value),
State#state{tref = NewTRef}.

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
Value = get_interarrival(),
do_interarrival(Value);
maybe_start_timer(TRef) -> TRef.

do_interarrival(0) ->
self() ! start_all_users,
undefined;
do_interarrival(Value) ->
{ok, NewTRef} = timer:send_interval(Value, start_user),
NewTRef.
9 changes: 3 additions & 6 deletions src/amoc_sup.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%% @private
%% @copyright 2023 Erlang Solutions Ltd.
%% @copyright 2024 Erlang Solutions Ltd.
-module(amoc_sup).

-behaviour(supervisor).
Expand All @@ -26,12 +26,9 @@ start_link() ->
%% Supervisor callbacks
%% ===================================================================
-spec init(term()) ->
{ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(),
MaxR :: non_neg_integer(), MaxT :: pos_integer()},
[ChildSpec :: supervisor:child_spec()]
}}.
{ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([]) ->
{ok, {{one_for_one, 5, 10},
{ok, {#{strategy => one_for_all, intensity => 0},
[
?SUP(amoc_users_sup, supervisor),
?SUP(amoc_throttle_sup, supervisor),
Expand Down
6 changes: 3 additions & 3 deletions src/users/amoc_user.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%% @private
%% @copyright 2023 Erlang Solutions Ltd.
%% @copyright 2024 Erlang Solutions Ltd.
-module(amoc_user).

%% API
Expand All @@ -10,15 +10,15 @@
-type state() :: term().

-spec start_link(amoc:scenario(), amoc_scenario:user_id(), state()) ->
{ok, pid()}.
{ok, pid()} | {error, term()}.
start_link(Scenario, Id, State) ->
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).

-spec stop() -> no_return().
stop() ->
stop(self(), false).

-spec stop(pid(), boolean()) -> no_return() | ok | {error, any()}.
-spec stop(pid(), boolean()) -> ok.
stop(Pid, Force) when is_pid(Pid) ->
amoc_users_sup:stop_child(Pid, Force).

Expand Down
Loading

0 comments on commit 7b1ea3d

Please sign in to comment.