Skip to content

Commit

Permalink
Add support for probes, which are called periodically
Browse files Browse the repository at this point in the history
- New bahaviour: mongoose_instrument_probe
- Use timer:apply_interval/4 to call Module:probe/3
  There is timer:apply_periodically in OTP 26, which protects against
  executing the function in parallel, but we need to drop OTP 25 first.
- Store probe timers in the state
- The provided probe function returns measurements, which are then
  passed to mongoose_instrument:execute/4
- Failing probes cause error logs, but are not removed to allow
  recovery from transient issues.
- Probe interval can be set per event, and is 15 seconds by default.

Also: add missing 'loglevel' key to the config() type.

Add a module allowing periodic execution of instrumentation events
  • Loading branch information
chrzaszcz committed Apr 23, 2024
1 parent 515b2df commit c66c8fc
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 59 deletions.
149 changes: 90 additions & 59 deletions src/instrument/mongoose_instrument.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
-type metric_type() :: gauge | spiral | histogram. % to be extended
-type measurements() :: #{atom() => term()}.
-type spec() :: {event_name(), labels(), config()}.
-type config() :: #{metrics => metrics()}. % to be extended
-type config() :: #{metrics => metrics(),
loglevel => logger:level(),
probe => probe_config()}.
-type probe_config() :: #{module := module(),
interval => pos_integer()}.
-type handler_key() :: atom(). % key in the `instrumentation' section of the config file
-type handler_fun() :: fun((event_name(), labels(), config(), measurements()) -> any()).
-type handlers() :: {[handler_fun()], config()}.
Expand All @@ -46,15 +50,17 @@
-optional_callbacks([config_spec/0, start/0, stop/0]).

-export_type([event_name/0, labels/0, label_key/0, label_value/0, config/0, measurements/0,
spec/0, handlers/0, metric_name/0, metric_type/0]).
spec/0, handlers/0, metric_name/0, metric_type/0, probe_config/0]).

%% API

%% @doc Specifies the `instrumentation' section of the config file
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
Items = [{atom_to_binary(Key), config_spec(Key)} || Key <- all_handler_keys()],
#section{items = maps:from_list(Items),
Options = #{<<"probe_interval">> => #option{type = integer, validate = positive}},
#section{items = maps:merge(maps:from_list(Items), Options),
defaults = #{<<"probe_interval">> => 15},
wrap = global_config,
include = always}.

Expand Down Expand Up @@ -141,60 +147,66 @@ remove_handler(Key) ->

%% gen_server callbacks

-type state() :: #{event_name() => #{labels() => handlers()}}.
-type state() :: #{events := event_map(), probe_timers := probe_timer_map()}.
-type event_map() :: #{event_name() => #{labels() => handlers()}}.
-type probe_timer_map() :: #{{event_name(), labels()} => timer:tref()}.

-spec init([]) -> {ok, state()}.
init([]) ->
lists:foreach(fun start_handler/1, handler_modules()),
erlang:process_flag(trap_exit, true), % Make sure that terminate is called
persistent_term:erase(?MODULE), % Prevent inconsistency when restarted after a kill
{ok, #{}}.
{ok, #{events => #{}, probe_timers => #{}}}.

-spec handle_call(any(), gen_server:from(), state()) ->
{reply, ok | {ok, handlers()} | {error, map()}, state()}.
handle_call({set_up, EventName, Labels, Config}, _From, State) ->
case set_up_and_register(EventName, Labels, Config, State) of
handle_call({set_up, EventName, Labels, Config}, _From,
#{events := Events, probe_timers := ProbeTimers} = State) ->
case set_up_and_register_event(EventName, Labels, Config, Events) of
{error, _} = Error ->
{reply, Error, State};
NewState = #{} ->
update_if_persisted(State, NewState),
{reply, ok, NewState}
NewEvents = #{} ->
update_if_persisted(Events, NewEvents),
NewProbeTimers = start_probe_if_needed(EventName, Labels, Config, ProbeTimers),
{reply, ok, #{events => NewEvents, probe_timers => NewProbeTimers}}
end;
handle_call({tear_down, EventName, Labels}, _From, State) ->
NewState = deregister(EventName, Labels, State),
update_if_persisted(State, NewState),
{reply, ok, NewState};
handle_call({add_handler, Key, ConfigOpts}, _From, State) ->
handle_call({tear_down, EventName, Labels}, _From,
#{events := Events, probe_timers := ProbeTimers}) ->
NewProbeTimers = deregister_probe_timer(EventName, Labels, ProbeTimers),
NewEvents = deregister_event(EventName, Labels, Events),
update_if_persisted(Events, NewEvents),
{reply, ok, #{events => NewEvents, probe_timers => NewProbeTimers}};
handle_call({add_handler, Key, ConfigOpts}, _From, State = #{events := Events}) ->
case mongoose_config:lookup_opt([instrumentation, Key]) of
{error, not_found} ->
mongoose_config:set_opt([instrumentation, Key], ConfigOpts),
Module = handler_module(Key),
start_handler(Module),
NewState = update_handlers(State, [], [Module]),
update_if_persisted(State, NewState),
{reply, ok, NewState};
NewEvents = update_handlers(Events, [], [Module]),
update_if_persisted(Events, NewEvents),
{reply, ok, State#{events := NewEvents}};
{ok, ExistingConfig} ->
{reply, {error, #{what => handler_already_configured, handler_key => Key,
existing_config => ExistingConfig}},
State}
end;
handle_call({remove_handler, Key}, _From, State) ->
handle_call({remove_handler, Key}, _From, State = #{events := Events}) ->
case mongoose_config:lookup_opt([instrumentation, Key]) of
{error, not_found} ->
{reply, {error, #{what => handler_not_configured, handler_key => Key}}, State};
{ok, _} ->
mongoose_config:unset_opt([instrumentation, Key]),
Module = handler_module(Key),
NewState = update_handlers(State, [Module], []),
update_if_persisted(State, NewState),
NewEvents = update_handlers(Events, [Module], []),
update_if_persisted(Events, NewEvents),
stop_handler(Module),
{reply, ok, NewState}
{reply, ok, State#{events := NewEvents}}
end;
handle_call(persist, _From, State) ->
persistent_term:put(?MODULE, State),
handle_call(persist, _From, State = #{events := Events}) ->
persistent_term:put(?MODULE, Events),
{reply, ok, State};
handle_call({lookup, EventName, Labels}, _From, State) ->
{reply, lookup(EventName, Labels, State), State};
handle_call({lookup, EventName, Labels}, _From, State = #{events := Events}) ->
{reply, lookup(EventName, Labels, Events), State};
handle_call(Request, From, State) ->
?UNEXPECTED_CALL(Request, From),
{reply, {error, #{what => unexpected_call, request => Request}}, State}.
Expand All @@ -220,18 +232,19 @@ code_change(_OldVsn, State, _Extra) ->

%% Internal functions

-spec update_if_persisted(state(), state()) -> ok.
update_if_persisted(State, NewState) ->
-spec update_if_persisted(event_map(), event_map()) -> ok.
update_if_persisted(Events, NewEvents) ->
try persistent_term:get(?MODULE) of
State -> persistent_term:put(?MODULE, NewState)
Events -> persistent_term:put(?MODULE, NewEvents)
catch
error:badarg -> ok
end.

-spec set_up_and_register(event_name(), labels(), config(), state()) -> state() | {error, map()}.
set_up_and_register(EventName, Labels, Config, State) ->
-spec set_up_and_register_event(event_name(), labels(), config(), event_map()) ->
event_map() | {error, map()}.
set_up_and_register_event(EventName, Labels, Config, Events) ->
LabelKeys = label_keys(Labels),
case State of
case Events of
#{EventName := #{Labels := _}} ->
{error, #{what => event_already_registered,
event_name => EventName, labels => Labels}};
Expand All @@ -240,30 +253,43 @@ set_up_and_register(EventName, Labels, Config, State) ->
case label_keys(ExistingLabels) of
LabelKeys ->
Handlers = do_set_up(EventName, Labels, Config),
State#{EventName := HandlerMap#{Labels => Handlers}};
Events#{EventName := HandlerMap#{Labels => Handlers}};
ExistingKeys ->
{error, #{what => inconsistent_labels,
event_name => EventName, labels => Labels,
existing_label_keys => ExistingKeys}}
end;
#{} ->
Handlers = do_set_up(EventName, Labels, Config),
State#{EventName => #{Labels => Handlers}}
Events#{EventName => #{Labels => Handlers}}
end.

-spec do_set_up(event_name(), labels(), config()) -> handlers().
do_set_up(EventName, Labels, Config) ->
HandlerFuns = set_up_handlers(EventName, Labels, Config, handler_modules()),
{HandlerFuns, Config}.

-spec update_handlers(state(), [module()], [module()]) -> state().
update_handlers(State, ToRemove, ToAdd) ->
-spec start_probe_if_needed(event_name(), labels(), config(), probe_timer_map()) ->
probe_timer_map().
start_probe_if_needed(EventName, Labels, #{probe := ProbeConfig}, ProbeTimers) ->
TRef = mongoose_instrument_probe:start_probe_timer(EventName, Labels, ProbeConfig),
add_probe_timer(EventName, Labels, TRef, ProbeTimers);
start_probe_if_needed(_EventName, _Labels, _Config, ProbeTimers) ->
ProbeTimers.

-spec add_probe_timer(event_name(), labels(), timer:tref(), probe_timer_map()) -> probe_timer_map().
add_probe_timer(EventName, Labels, TRef, ProbeTimers) ->
false = maps:is_key({EventName, Labels}, ProbeTimers), % sanity check to detect timer leak
ProbeTimers#{{EventName, Labels} => TRef}.

-spec update_handlers(event_map(), [module()], [module()]) -> event_map().
update_handlers(Events, ToRemove, ToAdd) ->
maps:map(fun(EventName, HandlerMap) ->
maps:map(fun(Labels, Handlers) ->
update_event_handlers(EventName, Labels, Handlers,
ToRemove, ToAdd)
end, HandlerMap)
end, State).
end, Events).

-spec update_event_handlers(event_name(), labels(), handlers(), [module()], [module()]) ->
handlers().
Expand All @@ -278,25 +304,35 @@ set_up_handlers(EventName, Labels, Config, Modules) ->
UsedModules = lists:filter(fun(Mod) -> Mod:set_up(EventName, Labels, Config) end, Modules),
modules_to_funs(UsedModules).

-spec deregister(event_name(), labels(), state()) -> state().
deregister(EventName, Labels, State) ->
case State of
-spec deregister_event(event_name(), labels(), event_map()) -> event_map().
deregister_event(EventName, Labels, Events) ->
case Events of
#{EventName := HandlerMap} ->
case maps:remove(Labels, HandlerMap) of
Empty when Empty =:= #{} ->
maps:remove(EventName, State);
maps:remove(EventName, Events);
NewHandlerMap ->
State#{EventName := NewHandlerMap}
Events#{EventName := NewHandlerMap}
end;
#{} ->
State
Events
end.

-spec deregister_probe_timer(event_name(), labels(), probe_timer_map()) -> probe_timer_map().
deregister_probe_timer(EventName, Labels, ProbeTimers) ->
case maps:take({EventName, Labels}, ProbeTimers) of
{TRef, NewProbeTimers} ->
timer:cancel(TRef),
NewProbeTimers;
error ->
ProbeTimers % no timer was registered
end.

-spec lookup(event_name(), labels()) -> {ok, handlers()} | {error, map()}.
lookup(EventName, Labels) ->
try persistent_term:get(?MODULE) of
State ->
lookup(EventName, Labels, State)
Events ->
lookup(EventName, Labels, Events)
catch
%% Although persist/0 should be called before handling traffic,
%% some instrumented events might happen before that, and they shouldn't fail.
Expand All @@ -306,9 +342,9 @@ lookup(EventName, Labels) ->
gen_server:call(?MODULE, {lookup, EventName, Labels})
end.

-spec lookup(event_name(), labels(), state()) -> {ok, handlers()} | {error, map()}.
lookup(EventName, Labels, State) ->
case State of
-spec lookup(event_name(), labels(), event_map()) -> {ok, handlers()} | {error, map()}.
lookup(EventName, Labels, Events) ->
case Events of
#{EventName := #{Labels := Handlers}} ->
{ok, Handlers};
#{} ->
Expand Down Expand Up @@ -338,7 +374,8 @@ modules_to_funs(Modules) ->

-spec handler_modules() -> [module()].
handler_modules() ->
[handler_module(Key) || Key <- maps:keys(mongoose_config:get_opt(instrumentation))].
Keys = [Key || {Key, #{}} <- maps:to_list(mongoose_config:get_opt(instrumentation))],
lists:map(fun handler_module/1, Keys).

-spec handler_module(handler_key()) -> module().
handler_module(Key) ->
Expand Down Expand Up @@ -372,13 +409,7 @@ stop_handler(Module) ->

-spec call_handler(handler_fun(), event_name(), labels(), config(), measurements()) -> any().
call_handler(HandlerFun, EventName, Labels, Config, Measurements) ->
try
HandlerFun(EventName, Labels, Config, Measurements)
catch
Class:Reason:StackTrace ->
?LOG_ERROR(#{what => event_handler_failed,
handler_fun => HandlerFun,
event_name => EventName, labels => Labels, config => Config,
measurements => Measurements,
class => Class, reason => Reason, stacktrace => StackTrace})
end.
safely:apply_and_log(HandlerFun, [EventName, Labels, Config, Measurements],
#{what => event_handler_failed, handler_fun => HandlerFun,
event_name => EventName, labels => Labels, config => Config,
measurements => Measurements}).
33 changes: 33 additions & 0 deletions src/instrument/mongoose_instrument_probe.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
-module(mongoose_instrument_probe).

-export([start_probe_timer/3, call/3]).

-callback probe(mongoose_instrument:event_name(), mongoose_instrument:labels()) ->
mongoose_instrument:measurements().

-ignore_xref([call/3]).

-spec start_probe_timer(mongoose_instrument:event_name(),
mongoose_instrument:labels(),
mongoose_instrument:probe_config()) -> timer:tref().
start_probe_timer(EventName, Labels, #{module := Module} = ProbeConfig) ->
Interval = timer:seconds(get_probe_interval(ProbeConfig)),
%% TODO: when dropping support for OTP25, consider changing this to apply_repeatedly
{ok, TRef} = timer:apply_interval(Interval, ?MODULE, call, [Module, EventName, Labels]),
TRef.

call(ProbeMod, EventName, Labels) ->
case safely:apply_and_log(ProbeMod, probe, [EventName, Labels],
#{what => probe_failed, probe_mod => ProbeMod,
event_name => EventName, labels => Labels}) of
{exception, _} ->
ok; % Already logged
Measurements = #{} ->
mongoose_instrument:execute(EventName, Labels, Measurements)
end.

-spec get_probe_interval(mongoose_instrument:probe_config()) -> pos_integer().
get_probe_interval(#{interval := Interval}) when is_integer(Interval), Interval > 0 ->
Interval;
get_probe_interval(#{}) ->
mongoose_config:get_opt([instrumentation, probe_interval]).

0 comments on commit c66c8fc

Please sign in to comment.