Skip to content

Commit

Permalink
explicitly clean up async mapred input sender
Browse files Browse the repository at this point in the history
To help the process of shutting down the asynchronus mapred input
sender, this patch explicitly kills it whenever it knows that it should
die. The previous patch made the sender better able to determine that
time on its own, but this should improve the situation even more.
  • Loading branch information
beerriot committed Oct 20, 2012
1 parent 3ee48a4 commit 37cd964
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
20 changes: 16 additions & 4 deletions src/riak_kv_pb_mapred.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ process_stream(#pipe_result{ref=ReqId, from=PhaseId, result=Res},
case encode_mapred_phase([Res], ContentType, HasMRQuery) of
{error, Reason} ->
erlang:cancel_timer(PipeCtx#pipe_ctx.timer),
%% destroying the pipe will automatically kill the sender
riak_pipe:destroy(PipeCtx#pipe_ctx.pipe),
cleanup_sender(PipeCtx),
{error, Reason, State#state{req = undefined, req_ctx = undefined}};
Response ->
{reply, #rpbmapredresp{phase=PhaseId, response=Response}, State}
Expand All @@ -111,8 +111,8 @@ process_stream(#pipe_log{ref=ReqId, from=From, msg=Msg},
case Msg of
{trace, [error], {error, Info}} ->
erlang:cancel_timer(PipeCtx#pipe_ctx.timer),
%% destroying the pipe will automatically kill the sender
riak_pipe:destroy(PipeCtx#pipe_ctx.pipe),
cleanup_sender(PipeCtx),
JsonInfo = {struct, riak_kv_mapred_json:jsonify_pipe_error(
From, Info)},
{error, mochijson2:encode(JsonInfo), State#state{req = undefined, req_ctx = undefined}};
Expand Down Expand Up @@ -141,9 +141,9 @@ process_stream({'DOWN', Ref, process, Pid, Reason}, Ref,

process_stream({pipe_timeout, Ref}, Ref,
State=#state{req=#rpbmapredreq{},
req_ctx=#pipe_ctx{ref=Ref,pipe=Pipe}}) ->
%% destroying the pipe will automatically kill the sender
req_ctx=#pipe_ctx{ref=Ref,pipe=Pipe}=PipeCtx}) ->
riak_pipe:destroy(Pipe),
cleanup_sender(PipeCtx),
{error, "timeout", State#state{req=undefined, req_ctx=undefined}};

%% LEGACY Handle response from mapred_stream/mapred_bucket_stream
Expand Down Expand Up @@ -178,6 +178,18 @@ process_stream(_,_,State) -> % Ignore any late replies from gen_servers/messages
%% Internal functions
%% ===================================================================

%% Destroying the pipe via riak_pipe_builder:destroy/1 does not kill
%% the sender immediately, because it causes the builder to exit with
%% reason `normal', so no exit signal is sent. The sender will
%% eventually receive `worker_startup_error's from vnodes that can no
%% longer find the fittings, but to help the process along, we kill
%% them immediately here.
cleanup_sender(#pipe_ctx{sender={SenderPid, SenderRef}}) ->
erlang:demonitor(SenderRef, [flush]),
exit(SenderPid, kill);
cleanup_sender(_) ->
ok.

pipe_mapreduce(Req, State, Inputs, Query, Timeout) ->
try riak_kv_mrc_pipe:mapred_stream(Query) of
{{ok, Pipe}, _NumKeeps} ->
Expand Down
15 changes: 14 additions & 1 deletion src/riak_kv_wm_mapred.erl
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ pipe_mapred(RD,
State}
end.

%% Destroying the pipe via riak_pipe_builder:destroy/1 does not kill
%% the sender immediately, because it causes the builder to exit with
%% reason `normal', so no exit signal is sent. The sender will
%% eventually receive `worker_startup_error's from vnodes that can no
%% longer find the fittings, but to help the process along, we kill
%% them immediately here.
cleanup_sender({SenderPid, SenderRef}) ->
erlang:demonitor(SenderRef, [flush]),
exit(SenderPid, kill).

cleanup_timer({Tref, PipeRef}) ->
case erlang:cancel_timer(Tref) of
false ->
Expand Down Expand Up @@ -239,13 +249,14 @@ pipe_mapred_nonchunked(RD, State, Pipe, NumKeeps, Sender, PipeTref) ->
prevent_keepalive(),
{{halt, 500}, send_error(Error, RD), State};
{error, timeout} ->
%% destroying the pipe will tear down the linked sender
riak_pipe:destroy(Pipe),
cleanup_sender(Sender),
prevent_keepalive(),
{{halt, 500}, send_error({error, timeout}, RD), State};
{error, Error} ->
cleanup_timer(PipeTref),
riak_pipe:destroy(Pipe),
cleanup_sender(Sender),
prevent_keepalive(),
{{halt, 500}, send_error({error, Error}, RD), State}
end.
Expand Down Expand Up @@ -328,6 +339,7 @@ pipe_stream_mapred_results(RD, Pipe,
{iolist_to_binary(["\r\n--", Boundary, "--\r\n"]), done};
{error, timeout} ->
riak_pipe:destroy(Pipe),
cleanup_sender(Sender),
prevent_keepalive(),
{format_error({error, timeout}), done};
{error, {sender_error, Error}} ->
Expand All @@ -337,6 +349,7 @@ pipe_stream_mapred_results(RD, Pipe,
{error, {Error, _Input}} ->
cleanup_timer(PipeTref),
riak_pipe:destroy(Pipe),
cleanup_sender(Sender),
prevent_keepalive(),
{format_error({error, Error}), done}
end.
Expand Down

0 comments on commit 37cd964

Please sign in to comment.