Skip to content

Commit

Permalink
Convert interarrival into throttle mechanism
Browse files Browse the repository at this point in the history
Have the controller simply requesting the user supervisors to start all
the users at once, and then have each user process await for throttle
permission.
  • Loading branch information
NelsonVides committed Jun 2, 2024
1 parent bc60d37 commit 989c5af
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 91 deletions.
9 changes: 4 additions & 5 deletions guides/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ Amoc supports the following generic configuration parameters:
* default value - empty list (`[]`)
* example: `AMOC_NODES="['amoc@amoc-1', 'amoc@amoc-2']"`


* `interarrival` - a delay (in ms, for each node in the cluster independently) between creating the processes
for two consecutive users:
* default value - 50 ms.
* example: `AMOC_INTERARRIVAL="50"`
* `interarrival` - a throttle rate (in units per millisecond) between creating the processes
for two consecutive users, or 0 for no throttling:
* default value - {1200, 60000}, i.e., a new user every 50ms.
* example: `AMOC_INTERARRIVAL="{1200, 60000}"`
* this parameter can be updated at runtime (in the same way as scenario configuration).

* `extra_code_paths` - a list of paths that should be included using `code:add_pathsz/1` interface
Expand Down
127 changes: 48 additions & 79 deletions src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,19 @@
-behaviour(gen_server).

-define(SERVER, ?MODULE).
-define(DEFAULT_INTERARRIVAL, 1200).

-required_variable(#{name => interarrival, default_value => 50,
verification => {?MODULE, non_neg_integer, 1},
description => "a delay between creating the processes for two "
"consecutive users (ms, def: 50ms)",
update => {?MODULE, maybe_update_interarrival_timer, 2}}).
-required_variable(#{name => interarrival, default_value => ?DEFAULT_INTERARRIVAL,
verification => {?MODULE, verify_interarrival, 1},
description => "Throttle rate for the Scenario:start/1,2 callback (def: 50ms)",
update => {?MODULE, update_interarrival_rate, 2}}).

-record(state, {scenario :: amoc:scenario() | undefined,
last_user_id = 0 :: last_user_id(),
status = idle :: idle | running | terminating | finished |
{error, any()} | disabled,
scenario_state :: any(), %% state returned from Scenario:init/0
create_users = [] :: [amoc_scenario:user_id()],
tref :: timer:tref() | undefined}).
create_users = [] :: [amoc_scenario:user_id()]}).

-type state() :: #state{}.
%% Internal state of the node's controller
Expand All @@ -41,7 +40,7 @@
%% Number of users currently running in the node
-type last_user_id() :: non_neg_integer().
%% Highest user id registered in the node
-type interarrival() :: non_neg_integer().
-type interarrival() :: amoc_throttle:rate() | amoc_throttle:throttle().
%% Time to wait in between spawning new users

%% ------------------------------------------------------------------
Expand All @@ -65,9 +64,9 @@
%% ------------------------------------------------------------------
%% Parameters verification functions
%% ------------------------------------------------------------------
-export([maybe_update_interarrival_timer/2, non_neg_integer/1]).
-export([update_interarrival_rate/2, verify_interarrival/1]).

-export([zero_users_running/0]).
-export([get_interarrival/0, zero_users_running/0]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
Expand Down Expand Up @@ -122,14 +121,22 @@ disable() ->
gen_server:call(?SERVER, disable).

%% @private
-spec non_neg_integer(any()) -> boolean().
non_neg_integer(Interarrival) ->
is_integer(Interarrival) andalso Interarrival >= 0.
-spec verify_interarrival(any()) -> boolean().
verify_interarrival(infinity) ->
true;
verify_interarrival(Rate)
when is_integer(Rate), Rate >= 0 ->
true;
verify_interarrival(#{rate := Rate, interval := Interval})
when is_integer(Rate), Rate >= 0, is_integer(Interval), Interval > 0 ->
true;
verify_interarrival(_) ->
false.

%% @private
-spec maybe_update_interarrival_timer(interarrival, term()) -> ok.
maybe_update_interarrival_timer(interarrival, _) ->
gen_server:cast(?SERVER, maybe_update_interarrival_timer).
-spec update_interarrival_rate(interarrival, amoc_throttle:throttle()) -> ok.
update_interarrival_rate(interarrival, #{rate := Rate, interval := Interval}) ->
ok = amoc_throttle:change_rate(interarrival, Rate, Interval).

%% @private
-spec zero_users_running() -> ok.
Expand Down Expand Up @@ -180,8 +187,6 @@ handle_call(_Request, _From, State) ->

%% @private
-spec handle_cast(any(), state()) -> {noreply, state()}.
handle_cast(maybe_update_interarrival_timer, State) ->
{noreply, maybe_update_interarrival_timer(State)};
handle_cast(zero_users_running, State) ->
NewSate = handle_zero_users_running(State),
{noreply, NewSate};
Expand All @@ -190,12 +195,6 @@ handle_cast(_Msg, State) ->

%% @private
-spec handle_info(any(), state()) -> {noreply, state()}.
handle_info(start_user, State) ->
NewSate = handle_start_user(State),
{noreply, NewSate};
handle_info(start_all_users, State) ->
NewSate = handle_start_all_users(State),
{noreply, NewSate};
handle_info(_Msg, State) ->
{noreply, State}.

Expand Down Expand Up @@ -244,17 +243,14 @@ handle_update_settings(_Settings, #state{status = Status}) ->
-spec handle_add(amoc_scenario:user_id(), amoc_scenario:user_id(), state()) ->
{handle_call_res(), state()}.
handle_add(StartId, EndId, #state{last_user_id = LastId,
create_users = ScheduledUsers,
status = running,
scenario = Scenario,
tref = TRef} = State) when StartId =< EndId,
scenario_state = ScenarioState} = State) when StartId =< EndId,
LastId < StartId ->
amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1},
#{scenario => Scenario, type => add}),
NewUsers = lists:seq(StartId, EndId),
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
NewTRef = maybe_start_timer(TRef),
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef, last_user_id = EndId}};
amoc_users_sup:start_children(Scenario, lists:seq(StartId, EndId), ScenarioState),
{ok, State#state{last_user_id = EndId}};
handle_add(_StartId, _EndId, #state{status = running} = State) ->
{{error, invalid_range}, State};
handle_add(_StartId, _EndId, #state{status = Status} = State) ->
Expand Down Expand Up @@ -287,23 +283,6 @@ handle_disable(#state{status = idle} = State) ->
handle_disable(#state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_start_user(state()) -> state().
handle_start_user(#state{create_users = [UserId | T],
scenario = Scenario,
scenario_state = ScenarioState} = State) ->
amoc_users_sup:start_child(Scenario, UserId, ScenarioState),
State#state{create_users = T};
handle_start_user(#state{create_users = [], tref = TRef} = State) ->
State#state{tref = maybe_stop_timer(TRef)}.

-spec handle_start_all_users(state()) -> state().
handle_start_all_users(#state{create_users = AllUsers,
scenario = Scenario,
scenario_state = ScenarioState,
tref = TRef} = State) ->
amoc_users_sup:start_children(Scenario, AllUsers, ScenarioState),
State#state{create_users = [], tref = maybe_stop_timer(TRef)}.

%% ------------------------------------------------------------------
%% helpers
%% ------------------------------------------------------------------
Expand All @@ -316,12 +295,15 @@ start_tables() -> %% ETS creation
{ok | error, any()}.
init_scenario(Scenario, Settings) ->
case amoc_config_scenario:parse_scenario_settings(Scenario, Settings) of
ok -> amoc_scenario:init(Scenario);
ok ->
start_interarrival(),
amoc_scenario:init(Scenario);
{error, Type, Reason} -> {error, {Type, Reason}}
end.

-spec terminate_scenario(state()) -> ok | {ok, any()} | {error, any()}.
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
stop_interarrival(),
amoc_scenario:terminate(Scenario, ScenarioState).

-spec handle_zero_users_running(state()) -> state().
Expand All @@ -331,35 +313,22 @@ handle_zero_users_running(#state{status = terminating} = State) ->
handle_zero_users_running(State) ->
State.

-spec maybe_stop_timer(timer:tref() | undefined) -> undefined.
maybe_stop_timer(undefined) ->
undefined;
maybe_stop_timer(TRef) ->
{ok, cancel} = timer:cancel(TRef),
undefined.

-spec get_interarrival() -> interarrival().
-spec get_interarrival() -> infinity | interarrival().
get_interarrival() ->
amoc_config:get(interarrival).

-spec maybe_update_interarrival_timer(state()) -> state().
maybe_update_interarrival_timer(#state{tref = undefined} = State) ->
State;
maybe_update_interarrival_timer(#state{tref = TRef} = State) ->
{ok, cancel} = timer:cancel(TRef),
Value = get_interarrival(),
NewTRef = do_interarrival(Value),
State#state{tref = NewTRef}.

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
Value = get_interarrival(),
do_interarrival(Value);
maybe_start_timer(TRef) -> TRef.

do_interarrival(0) ->
self() ! start_all_users,
undefined;
do_interarrival(Value) ->
{ok, NewTRef} = timer:send_interval(Value, start_user),
NewTRef.
amoc_config:get(interarrival, ?DEFAULT_INTERARRIVAL).

-spec start_interarrival() -> any().
start_interarrival() ->
case get_interarrival() of
infinity ->
amoc_throttle:start(interarrival, 1);
#{rate := Rate, interval := Interval} ->
Config = #{rate => Rate, interval => Interval},
amoc_throttle:start(interarrival, Config);
Rate ->
amoc_throttle:start(interarrival, Rate)
end.

-spec stop_interarrival() -> any().
stop_interarrival() ->
amoc_throttle:stop(interarrival).
1 change: 1 addition & 0 deletions src/amoc_scenario.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ start(Scenario, Id, State) ->
{false, false} ->
exit("the scenario module must export either start/2 or start/1 function")
end,
infinity =:= amoc_controller:get_interarrival() orelse amoc_throttle:wait(interarrival),
telemetry:span([amoc, scenario, start], Metadata, Span).

%% ------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions test/amoc_config_scenario_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ end_per_testcase(_, Config) ->
parse_scenario_settings(_) ->
mock_ets_tables(),
ets:insert(configurable_modules, {amoc_controller, configurable}),
ScenarioSettings = [{interarrival, 500},
ScenarioSettings = [{interarrival, #{rate => 12000, interval => 60000}},
{config_scenario_var1, def1}],
Ret = amoc_config_scenario:parse_scenario_settings(?MODULE, ScenarioSettings),
?assertEqual(ok, Ret),
Expand All @@ -114,7 +114,7 @@ parse_scenario_settings(_) ->
%% overwritten variable
?assertEqual(val2, amoc_config:get(config_scenario_var2)),
%% configurable module variable (defined in amoc_controller)
?assertEqual(500, amoc_config:get(interarrival)).
?assertEqual(#{rate => 12000, interval => 60000}, amoc_config:get(interarrival)).

update_settings(_) ->
set_initial_configuration(),
Expand Down
3 changes: 2 additions & 1 deletion test/controller_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ end_per_suite(Config) ->

init_per_testcase(_TestCase, Config) ->
application:ensure_all_started(amoc),
amoc_cluster:set_master_node(node()),
Config.

end_per_testcase(_TestCase, Config) ->
Expand Down Expand Up @@ -215,7 +216,7 @@ stop_running_scenario_with_users_eventually_terminates(_) ->
wait_helper:wait_until(WaitUntilFun, WaitUntilValue).

interarrival_equal_zero_starts_all_users_at_once(_) ->
Vars = [{interarrival, 0}, {testing_var1, def1} | test_helpers:other_vars_to_keep_quiet()],
Vars = [{interarrival, infinity}, {testing_var1, def1} | test_helpers:other_vars_to_keep_quiet()],
do_start_scenario(testing_scenario, Vars),
NumOfUsers = 1000,
amoc_controller:add_users(1, NumOfUsers),
Expand Down
8 changes: 4 additions & 4 deletions test/test_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ wait_until_scenario_has_users(Scenario, Current, HighestId, ExtraConfig) ->
wait_helper:wait_until(WaitUntilFun, WaitUntilValue, ExtraConfig).

all_vars() ->
[{interarrival, 1}, {testing_var1, def1},
[{interarrival, #{rate => 60000, interval => 60000}}, {testing_var1, def1},
{config_scenario_var1, unused_value}].

regular_vars() ->
[{interarrival, 1}, {testing_var1, def1}].
[{interarrival, #{rate => 60000, interval => 60000}}, {testing_var1, def1}].

all_vars_with_state() ->
[{interarrival, 1}, {testing_state_var1, def1},
[{interarrival, #{rate => 60000, interval => 60000}}, {testing_state_var1, def1},
{config_scenario_var1, unused_value}].

regular_vars_with_state() ->
[{interarrival, 1}, {testing_state_var1, def1}].
[{interarrival, #{rate => 60000, interval => 60000}}, {testing_state_var1, def1}].

other_vars_to_keep_quiet() ->
[{config_scenario_var1, unused_value}].

0 comments on commit 989c5af

Please sign in to comment.