Skip to content

Commit

Permalink
reduce uninformative process death spam
Browse files Browse the repository at this point in the history
Using supervisor:terminate_child/2 instead of erlang:exit/2 means that
the supervisor doesn't complain about the untimely death of its worker
process.  Combine this with changing the fittings from linking to
monitoring of the builder process (so the builder isn't torn down
abnormally when a fitting dies), and you get the end of "fitting_died"
spam.
  • Loading branch information
beerriot committed Nov 16, 2011
1 parent 7bdb7c5 commit 276e90c
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 16 deletions.
3 changes: 1 addition & 2 deletions src/riak_pipe.erl
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,7 @@ collect_results(Pipe, ResultAcc, LogAcc, Timeout) ->
%% of waiting for an `eoi' to propagate through.
-spec destroy(pipe()) -> ok.
destroy(#pipe{builder=Builder}) ->
erlang:exit(Builder, kill),
ok.
riak_pipe_builder:destroy(Builder).

%% @doc Get all active pipelines hosted on `Node'. Pass the atom
%% `global' instead of a node name to get all pipelines hosted on
Expand Down
23 changes: 18 additions & 5 deletions src/riak_pipe_builder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
%% API
-export([start_link/2]).
-export([fitting_pids/1,
pipeline/1]).
pipeline/1,
destroy/1]).

%% gen_fsm callbacks
-export([init/1,
Expand Down Expand Up @@ -86,6 +87,11 @@ fitting_pids(Builder) ->
pipeline(BuilderPid) ->
gen_fsm:sync_send_event(BuilderPid, pipeline).

%% @doc Shutdown the pipeline built by this builder.
%%
%% Sends a synchronous `destroy' event to the builder and waits
%% (with no timeout) for its reply.
%%
%% `gen_fsm:sync_send_event/3' exits in the caller when the target
%% process is already dead, or dies before it replies. Because the
%% whole point of this call is to make the builder go away, such an
%% exit is treated as success and `ok' is returned instead of
%% crashing the caller.
-spec destroy(pid()) -> ok.
destroy(BuilderPid) ->
    try
        gen_fsm:sync_send_event(BuilderPid, destroy, infinity)
    catch
        exit:_Reason ->
            %% the builder was gone before the call completed; since
            %% we were shutting it down anyway, this is fine
            ok
    end.

%%%===================================================================
%%% gen_fsm callbacks
%%%===================================================================
Expand Down Expand Up @@ -123,14 +129,18 @@ wait_pipeline_shutdown(_Event, State) ->
{next_state, wait_pipeline_shutdown, State}.

%% @doc A client is asking for the fittings, or asking to destroy
%% the pipeline. Respond.
-spec wait_pipeline_shutdown(pipeline, term(), state()) ->
-spec wait_pipeline_shutdown(pipeline | destroy, term(), state()) ->
{reply,
{ok, #pipe{}},
wait_pipeline_shutdown,
state()}.
state()}
|{stop, normal, ok, state()}.
wait_pipeline_shutdown(pipeline, _From, #state{pipe=Pipe}=State) ->
%% everything is started - reply now
{reply, {ok, Pipe}, wait_pipeline_shutdown, State};
wait_pipeline_shutdown(destroy, _From, State) ->
%% client asked to shutdown this pipe immediately
{stop, normal, ok, State};
wait_pipeline_shutdown(_, _, State) ->
%% unknown message - reply {error, unknown} to get rid of it
{reply, {error, unknown}, wait_pipeline_shutdown, State}.
Expand Down Expand Up @@ -217,9 +227,12 @@ maybe_shutdown(Reason, _StateName, State) ->
%% explode!
{stop, {fitting_exited_abnormally, Reason}, State}.

%% @doc Unused.
%% @doc Terminate any fittings that are still alive.
-spec terminate(term(), atom(), state()) -> ok.
terminate(_Reason, _StateName, _State) ->
terminate(_Reason, _StateName, #state{alive=Alive}) ->
%% this is a brutal kill of each fitting, just in case that fitting
%% is otherwise swamped with stop/restart messages from its workers
[ riak_pipe_fitting_sup:terminate_fitting(F) || {F,_R} <- Alive ],
ok.

%% @doc Unused.
Expand Down
13 changes: 8 additions & 5 deletions src/riak_pipe_fitting.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,8 @@ crash(Pid, Fun) ->
%%% gen_fsm callbacks
%%%===================================================================

%% @doc Initialize the fitting process. This function links to the
%% builder process, so it will tear down if the builder exits
%% abnormally (which happens if another fitting exist
%% abnormally).
%% @doc Initialize the fitting process. This function monitors the
%% builder process, so it will tear down if the builder exits.
-spec init([pid() | riak_pipe:fitting_spec() | riak_pipe:fitting()
| riak_pipe:exec_opts()]) ->
{ok, wait_upstream_eoi, state()}.
Expand All @@ -159,7 +157,7 @@ init([Builder,

?T(Details, [], {fitting, init_started}),

erlang:link(Builder),
erlang:monitor(process, Builder),

?T(Details, [], {fitting, init_finished}),

Expand Down Expand Up @@ -333,6 +331,11 @@ handle_sync_event(_Event, _From, StateName, State) ->
atom(), state()) ->
{next_state, atom(), state()}
|{stop, normal, state()}.
handle_info({'DOWN', _Ref, process, Builder, _Reason},
_StateName,
#state{builder=Builder}=State) ->
%% if the builder exits, stop immediately
{stop, normal, State};
handle_info({'DOWN', Ref, _, _, _}, StateName, State) ->
case lists:keytake(Ref, #worker.monitor, State#state.workers) of
{value, Worker, Rest} ->
Expand Down
10 changes: 9 additions & 1 deletion src/riak_pipe_fitting_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

%% API
-export([start_link/0]).
-export([add_fitting/4]).
-export([add_fitting/4,
terminate_fitting/1]).

%% Supervisor callbacks
-export([init/1]).
Expand Down Expand Up @@ -55,6 +56,13 @@ add_fitting(Builder, Spec, Output, Options) ->
?DPF("Adding fitting for ~p", [Spec]),
supervisor:start_child(?SERVER, [Builder, Spec, Output, Options]).

%% @doc Ask the fitting supervisor to stop the process of the given
%% fitting right away. Meant for tearing down pipelines whose
%% fittings might otherwise be swamped with messages from
%% restarting workers. The result of
%% `supervisor:terminate_child/2' is returned unchanged.
-spec terminate_fitting(riak_pipe:fitting()) -> ok | {error, term()}.
terminate_fitting(#fitting{pid=FittingPid}) ->
    Supervisor = ?SERVER,
    supervisor:terminate_child(Supervisor, FittingPid).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
Expand Down
6 changes: 4 additions & 2 deletions src/riak_pipe_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -622,14 +622,16 @@ handle_exit(Pid, Reason, #state{partition=Partition}=State) ->
%% @doc Handle a 'DOWN' message from a fitting process. Kill the
%% worker associated with that fitting and dispose of its queue.
-spec handle_info(term(), state()) -> {ok, state()}.
handle_info({'DOWN',_,process,Pid,_}, #state{partition=Partition}=State) ->
handle_info({'DOWN',_,process,Pid,_},
#state{partition=Partition, worker_sup=WorkerSup}=State) ->
NewState = case worker_by_fitting_pid(Pid, State) of
{ok, Worker} ->
?T(Worker#worker.details, [error],
{vnode, {fitting_died, Partition}}),
%% if the fitting died, tear down its worker
erlang:unlink(Worker#worker.pid),
erlang:exit(Worker#worker.pid, fitting_died),
riak_pipe_vnode_worker_sup:terminate_worker(
WorkerSup, Worker#worker.pid),
remove_worker(Worker, State);
none ->
%% TODO: log this somewhere?
Expand Down
8 changes: 7 additions & 1 deletion src/riak_pipe_vnode_worker_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

%% API
-export([start_link/2]).
-export([start_worker/2]).
-export([start_worker/2,
terminate_worker/2]).

%% Supervisor callbacks
-export([init/1]).
Expand All @@ -48,6 +49,11 @@ start_link(Partition, VnodePid) ->
start_worker(Supervisor, Details) ->
supervisor:start_child(Supervisor, [Details]).

%% @doc Have `Supervisor' terminate the worker process `WorkerPid'
%% immediately. The result of `supervisor:terminate_child/2' is
%% handed back to the caller as-is.
-spec terminate_worker(pid(), pid()) -> ok | {error, term()}.
terminate_worker(Supervisor, WorkerPid) ->
    Result = supervisor:terminate_child(Supervisor, WorkerPid),
    Result.

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
Expand Down

0 comments on commit 276e90c

Please sign in to comment.