diff --git a/guides/configuration.md b/guides/configuration.md index 2cbb0058..7116f7e7 100644 --- a/guides/configuration.md +++ b/guides/configuration.md @@ -9,11 +9,10 @@ Amoc supports the following generic configuration parameters: * default value - empty list (`[]`) * example: `AMOC_NODES="['amoc@amoc-1', 'amoc@amoc-2']"` - -* `interarrival` - a delay (in ms, for each node in the cluster independently) between creating the processes - for two consecutive users: - * default value - 50 ms. - * example: `AMOC_INTERARRIVAL="50"` +* `interarrival` - a throttle rate (in units per millisecond) between creating the processes + for two consecutive users, or 0 for no throttling: + * default value - {1200, 60000}, i.e., a new user every 50ms. + * example: `AMOC_INTERARRIVAL="{1200, 60000}"` * this parameter can be updated at runtime (in the same way as scenario configuration). * `extra_code_paths` - a list of paths that should be included using `code:add_pathsz/1` interface diff --git a/src/amoc_controller.erl b/src/amoc_controller.erl index 94a84f9b..a30b41c9 100644 --- a/src/amoc_controller.erl +++ b/src/amoc_controller.erl @@ -8,20 +8,19 @@ -behaviour(gen_server). -define(SERVER, ?MODULE). +-define(DEFAULT_INTERARRIVAL, 1200). --required_variable(#{name => interarrival, default_value => 50, - verification => {?MODULE, non_neg_integer, 1}, - description => "a delay between creating the processes for two " - "consecutive users (ms, def: 50ms)", - update => {?MODULE, maybe_update_interarrival_timer, 2}}). +-required_variable(#{name => interarrival, default_value => ?DEFAULT_INTERARRIVAL, + verification => {?MODULE, verify_interarrival, 1}, + description => "Throttle rate for the Scenario:start/1,2 callback (def: 50ms)", + update => {?MODULE, update_interarrival_rate, 2}}). -record(state, {scenario :: amoc:scenario() | undefined, last_user_id = 0 :: last_user_id(), status = idle :: idle | running | terminating | finished | {error, any()} | disabled, scenario_state :: any(), %% state returned from Scenario:init/0 - create_users = [] :: [amoc_scenario:user_id()], - tref :: timer:tref() | undefined}). + create_users = [] :: [amoc_scenario:user_id()]}). -type state() :: #state{}. %% Internal state of the node's controller @@ -41,7 +40,7 @@ %% Number of users currently running in the node -type last_user_id() :: non_neg_integer(). %% Highest user id registered in the node --type interarrival() :: non_neg_integer(). +-type interarrival() :: amoc_throttle:rate() | amoc_throttle:throttle(). %% Time to wait in between spawning new users %% ------------------------------------------------------------------ @@ -65,9 +64,9 @@ %% ------------------------------------------------------------------ %% Parameters verification functions %% ------------------------------------------------------------------ --export([maybe_update_interarrival_timer/2, non_neg_integer/1]). +-export([update_interarrival_rate/2, verify_interarrival/1]). --export([zero_users_running/0]). +-export([get_interarrival/0, zero_users_running/0]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -122,14 +121,22 @@ disable() -> gen_server:call(?SERVER, disable). %% @private --spec non_neg_integer(any()) -> boolean(). -non_neg_integer(Interarrival) -> - is_integer(Interarrival) andalso Interarrival >= 0. +-spec verify_interarrival(any()) -> boolean(). +verify_interarrival(infinity) -> + true; +verify_interarrival(Rate) + when is_integer(Rate), Rate >= 0 -> + true; +verify_interarrival(#{rate := Rate, interval := Interval}) + when is_integer(Rate), Rate >= 0, is_integer(Interval), Interval > 0 -> + true; +verify_interarrival(_) -> + false. %% @private --spec maybe_update_interarrival_timer(interarrival, term()) -> ok. -maybe_update_interarrival_timer(interarrival, _) -> - gen_server:cast(?SERVER, maybe_update_interarrival_timer). +-spec update_interarrival_rate(interarrival, amoc_throttle:throttle()) -> ok. +update_interarrival_rate(interarrival, #{rate := Rate, interval := Interval}) -> + ok = amoc_throttle:change_rate(interarrival, Rate, Interval). %% @private -spec zero_users_running() -> ok. @@ -180,8 +187,6 @@ handle_call(_Request, _From, State) -> %% @private -spec handle_cast(any(), state()) -> {noreply, state()}. -handle_cast(maybe_update_interarrival_timer, State) -> - {noreply, maybe_update_interarrival_timer(State)}; handle_cast(zero_users_running, State) -> NewSate = handle_zero_users_running(State), {noreply, NewSate}; @@ -190,12 +195,6 @@ handle_cast(_Msg, State) -> %% @private -spec handle_info(any(), state()) -> {noreply, state()}. -handle_info(start_user, State) -> - NewSate = handle_start_user(State), - {noreply, NewSate}; -handle_info(start_all_users, State) -> - NewSate = handle_start_all_users(State), - {noreply, NewSate}; handle_info(_Msg, State) -> {noreply, State}. @@ -244,17 +243,14 @@ handle_update_settings(_Settings, #state{status = Status}) -> -spec handle_add(amoc_scenario:user_id(), amoc_scenario:user_id(), state()) -> {handle_call_res(), state()}. handle_add(StartId, EndId, #state{last_user_id = LastId, - create_users = ScheduledUsers, status = running, scenario = Scenario, - tref = TRef} = State) when StartId =< EndId, + scenario_state = ScenarioState} = State) when StartId =< EndId, LastId < StartId -> amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1}, #{scenario => Scenario, type => add}), - NewUsers = lists:seq(StartId, EndId), - NewScheduledUsers = lists:append(ScheduledUsers, NewUsers), - NewTRef = maybe_start_timer(TRef), - {ok, State#state{create_users = NewScheduledUsers, tref = NewTRef, last_user_id = EndId}}; + amoc_users_sup:start_children(Scenario, lists:seq(StartId, EndId), ScenarioState), + {ok, State#state{last_user_id = EndId}}; handle_add(_StartId, _EndId, #state{status = running} = State) -> {{error, invalid_range}, State}; handle_add(_StartId, _EndId, #state{status = Status} = State) -> @@ -287,23 +283,6 @@ handle_disable(#state{status = idle} = State) -> handle_disable(#state{status = Status} = State) -> {{error, {invalid_status, Status}}, State}. --spec handle_start_user(state()) -> state(). -handle_start_user(#state{create_users = [UserId | T], - scenario = Scenario, - scenario_state = ScenarioState} = State) -> - amoc_users_sup:start_child(Scenario, UserId, ScenarioState), - State#state{create_users = T}; -handle_start_user(#state{create_users = [], tref = TRef} = State) -> - State#state{tref = maybe_stop_timer(TRef)}. - --spec handle_start_all_users(state()) -> state(). -handle_start_all_users(#state{create_users = AllUsers, - scenario = Scenario, - scenario_state = ScenarioState, - tref = TRef} = State) -> - amoc_users_sup:start_children(Scenario, AllUsers, ScenarioState), - State#state{create_users = [], tref = maybe_stop_timer(TRef)}. - %% ------------------------------------------------------------------ %% helpers %% ------------------------------------------------------------------ @@ -316,12 +295,15 @@ start_tables() -> %% ETS creation {ok | error, any()}. init_scenario(Scenario, Settings) -> case amoc_config_scenario:parse_scenario_settings(Scenario, Settings) of - ok -> amoc_scenario:init(Scenario); + ok -> + start_interarrival(), + amoc_scenario:init(Scenario); {error, Type, Reason} -> {error, {Type, Reason}} end. -spec terminate_scenario(state()) -> ok | {ok, any()} | {error, any()}. terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) -> + stop_interarrival(), amoc_scenario:terminate(Scenario, ScenarioState). -spec handle_zero_users_running(state()) -> state(). @@ -331,35 +313,22 @@ handle_zero_users_running(#state{status = terminating} = State) -> handle_zero_users_running(State) -> State. --spec maybe_stop_timer(timer:tref() | undefined) -> undefined. -maybe_stop_timer(undefined) -> - undefined; -maybe_stop_timer(TRef) -> - {ok, cancel} = timer:cancel(TRef), - undefined. - --spec get_interarrival() -> interarrival(). +-spec get_interarrival() -> infinity | interarrival(). get_interarrival() -> - amoc_config:get(interarrival). - --spec maybe_update_interarrival_timer(state()) -> state(). -maybe_update_interarrival_timer(#state{tref = undefined} = State) -> - State; -maybe_update_interarrival_timer(#state{tref = TRef} = State) -> - {ok, cancel} = timer:cancel(TRef), - Value = get_interarrival(), - NewTRef = do_interarrival(Value), - State#state{tref = NewTRef}. - --spec maybe_start_timer(timer:tref() | undefined) -> timer:tref(). -maybe_start_timer(undefined) -> - Value = get_interarrival(), - do_interarrival(Value); -maybe_start_timer(TRef) -> TRef. - -do_interarrival(0) -> - self() ! start_all_users, - undefined; -do_interarrival(Value) -> - {ok, NewTRef} = timer:send_interval(Value, start_user), - NewTRef. + amoc_config:get(interarrival, ?DEFAULT_INTERARRIVAL). + +-spec start_interarrival() -> any(). +start_interarrival() -> + case get_interarrival() of + infinity -> + amoc_throttle:start(interarrival, 1); + #{rate := Rate, interval := Interval} -> + Config = #{rate => Rate, interval => Interval}, + amoc_throttle:start(interarrival, Config); + Rate -> + amoc_throttle:start(interarrival, Rate) + end. + +-spec stop_interarrival() -> any(). +stop_interarrival() -> + amoc_throttle:stop(interarrival). diff --git a/src/amoc_scenario.erl b/src/amoc_scenario.erl index 1209d1c4..6a3b4098 100644 --- a/src/amoc_scenario.erl +++ b/src/amoc_scenario.erl @@ -91,6 +91,7 @@ start(Scenario, Id, State) -> {false, false} -> exit("the scenario module must export either start/2 or start/1 function") end, + infinity =:= amoc_controller:get_interarrival() orelse amoc_throttle:wait(interarrival), telemetry:span([amoc, scenario, start], Metadata, Span). %% ------------------------------------------------------------------ diff --git a/test/amoc_config_scenario_SUITE.erl b/test/amoc_config_scenario_SUITE.erl index 6ab04ba3..f91019af 100644 --- a/test/amoc_config_scenario_SUITE.erl +++ b/test/amoc_config_scenario_SUITE.erl @@ -94,7 +94,7 @@ end_per_testcase(_, Config) -> parse_scenario_settings(_) -> mock_ets_tables(), ets:insert(configurable_modules, {amoc_controller, configurable}), - ScenarioSettings = [{interarrival, 500}, + ScenarioSettings = [{interarrival, #{rate => 12000, interval => 60000}}, {config_scenario_var1, def1}], Ret = amoc_config_scenario:parse_scenario_settings(?MODULE, ScenarioSettings), ?assertEqual(ok, Ret), @@ -114,7 +114,7 @@ parse_scenario_settings(_) -> %% overwritten variable ?assertEqual(val2, amoc_config:get(config_scenario_var2)), %% configurable module variable (defined in amoc_controller) - ?assertEqual(500, amoc_config:get(interarrival)). + ?assertEqual(#{rate => 12000, interval => 60000}, amoc_config:get(interarrival)). update_settings(_) -> set_initial_configuration(), diff --git a/test/controller_SUITE.erl b/test/controller_SUITE.erl index 18a3b54c..c280ba55 100644 --- a/test/controller_SUITE.erl +++ b/test/controller_SUITE.erl @@ -62,6 +62,7 @@ end_per_suite(Config) -> init_per_testcase(_TestCase, Config) -> application:ensure_all_started(amoc), + amoc_cluster:set_master_node(node()), Config. end_per_testcase(_TestCase, Config) -> @@ -215,7 +216,7 @@ stop_running_scenario_with_users_eventually_terminates(_) -> wait_helper:wait_until(WaitUntilFun, WaitUntilValue). interarrival_equal_zero_starts_all_users_at_once(_) -> - Vars = [{interarrival, 0}, {testing_var1, def1} | test_helpers:other_vars_to_keep_quiet()], + Vars = [{interarrival, infinity}, {testing_var1, def1} | test_helpers:other_vars_to_keep_quiet()], do_start_scenario(testing_scenario, Vars), NumOfUsers = 1000, amoc_controller:add_users(1, NumOfUsers), diff --git a/test/test_helpers.erl b/test/test_helpers.erl index 47bc35f9..ffac87ba 100644 --- a/test/test_helpers.erl +++ b/test/test_helpers.erl @@ -13,18 +13,18 @@ wait_until_scenario_has_users(Scenario, Current, HighestId, ExtraConfig) -> wait_helper:wait_until(WaitUntilFun, WaitUntilValue, ExtraConfig). all_vars() -> - [{interarrival, 1}, {testing_var1, def1}, + [{interarrival, #{rate => 60000, interval => 60000}}, {testing_var1, def1}, {config_scenario_var1, unused_value}]. regular_vars() -> - [{interarrival, 1}, {testing_var1, def1}]. + [{interarrival, #{rate => 60000, interval => 60000}}, {testing_var1, def1}]. all_vars_with_state() -> - [{interarrival, 1}, {testing_state_var1, def1}, + [{interarrival, #{rate => 60000, interval => 60000}}, {testing_state_var1, def1}, {config_scenario_var1, unused_value}]. regular_vars_with_state() -> - [{interarrival, 1}, {testing_state_var1, def1}]. + [{interarrival, #{rate => 60000, interval => 60000}}, {testing_state_var1, def1}]. other_vars_to_keep_quiet() -> [{config_scenario_var1, unused_value}].