Merge pull request #433 from branch 'bwf-no-luke'
beerriot committed Nov 14, 2012
2 parents 4eaa81c + cb88551 commit b0255c0
Showing 32 changed files with 119 additions and 2,442 deletions.
1 change: 0 additions & 1 deletion rebar.config
@@ -10,7 +10,6 @@

{deps, [
{riak_core, ".*", {git, "git://github.com/basho/riak_core", "master"}},
-{luke, ".*", {git, "git://github.com/basho/luke", "master"}},
{erlang_js, ".*", {git, "git://github.com/basho/erlang_js", "master"}},
{bitcask, ".*", {git, "git://github.com/basho/bitcask", "master"}},
{merge_index, ".*", {git, "git://github.com/basho/merge_index",
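Each rebar deps entry above is an {AppName, VersionRegex, Source} triple: rebar fetches Source into deps/ and checks the fetched application's version against VersionRegex. A minimal sketch of the shape with a hypothetical dependency (the name and URL are illustrative, not part of this repository):

    {deps, [
        %% rebar clones the git URL at the given branch/tag and verifies
        %% the app version against the ".*" regex (i.e. any version).
        {example_dep, ".*", {git, "git://github.com/example/example_dep", "master"}}
    ]}.

Dropping the luke line therefore removes both the fetch and the version check for that application.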
193 changes: 9 additions & 184 deletions src/riak_client.erl
@@ -25,12 +25,6 @@
-module(riak_client, [Node,ClientId]).
-author('Justin Sheehy <justin@basho.com>').

--export([mapred/2,mapred/3,mapred/4]).
--export([mapred_stream/2,mapred_stream/3,mapred_stream/4]).
--export([mapred_bucket/2,mapred_bucket/3,mapred_bucket/4]).
--export([mapred_bucket_stream/3,mapred_bucket_stream/4,mapred_bucket_stream/5,
-mapred_bucket_stream/6]).
--export([mapred_dynamic_inputs_stream/3]).
-export([get/2, get/3,get/4]).
-export([put/1, put/2,put/3,put/4,put/5]).
-export([delete/2,delete/3,delete/4]).
@@ -56,158 +50,6 @@

-type riak_client() :: term().

-%% @spec mapred(Inputs :: riak_kv_mapred_term:mapred_inputs(),
-%% Query :: [riak_kv_mapred_query:mapred_queryterm()]) ->
-%% {ok, riak_kv_mapred_query:mapred_result()} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, timeout} |
-%% {error, Err :: term()}
-%% @doc Perform a map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-%% @equiv mapred(Inputs, Query, default_timeout())
-mapred(Inputs,Query) -> mapred(Inputs,Query,?DEFAULT_TIMEOUT).
-
-%% @spec mapred(Inputs :: riak_kv_mapred_term:mapred_inputs(),
-%% Query :: [riak_kv_mapred_query:mapred_queryterm()],
-%% TimeoutMillisecs :: integer() | 'infinity') ->
-%% {ok, riak_kv_mapred_query:mapred_result()} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, timeout} |
-%% {error, Err :: term()}
-%% @doc Perform a map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-mapred(Inputs,Query,Timeout) ->
-mapred(Inputs,Query,undefined,Timeout).
-
-%% @spec mapred(Inputs :: riak_kv_mapred_term:mapred_inputs(),
-%% Query :: [riak_kv_mapred_query:mapred_queryterm()],
-%% TimeoutMillisecs :: integer() | 'infinity',
-%% ResultTransformer :: function()) ->
-%% {ok, riak_kv_mapred_query:mapred_result()} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, timeout} |
-%% {error, Err :: term()}
-%% @doc Perform a map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-mapred(Inputs,Query,ResultTransformer,Timeout) when is_binary(Inputs) orelse
-is_tuple(Inputs) ->
-case is_binary(Inputs) orelse is_key_filter(Inputs) of
-true ->
-mapred_bucket(Inputs, Query, ResultTransformer, Timeout);
-false ->
-Me = self(),
-case mapred_stream(Query,Me,ResultTransformer,Timeout) of
-{ok, {ReqId, FlowPid}} ->
-mapred_dynamic_inputs_stream(FlowPid, Inputs, Timeout),
-luke_flow:finish_inputs(FlowPid),
-luke_flow:collect_output(ReqId, Timeout);
-Error ->
-Error
-end
-end;
-mapred(Inputs,Query,ResultTransformer,Timeout)
-when is_list(Query),
-(is_integer(Timeout) orelse Timeout =:= infinity) ->
-Me = self(),
-case mapred_stream(Query,Me,ResultTransformer,Timeout) of
-{ok, {ReqId, FlowPid}} ->
-case is_list(Inputs) of
-true ->
-add_inputs(FlowPid, Inputs);
-false ->
-mapred_dynamic_inputs_stream(FlowPid, Inputs, Timeout)
-end,
-luke_flow:finish_inputs(FlowPid),
-luke_flow:collect_output(ReqId, Timeout);
-Error ->
-Error
-end.
-
-%% @spec mapred_stream(Query :: [riak_kv_mapred_query:mapred_queryterm()],
-%% ClientPid :: pid()) ->
-%% {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, Err :: term()}
-%% @doc Perform a streaming map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-mapred_stream(Query,ClientPid) ->
-mapred_stream(Query,ClientPid,?DEFAULT_TIMEOUT).
-
-%% @spec mapred_stream(Query :: [riak_kv_mapred_query:mapred_queryterm()],
-%% ClientPid :: pid(),
-%% TimeoutMillisecs :: integer() | 'infinity') ->
-%% {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, Err :: term()}
-%% @doc Perform a streaming map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-mapred_stream(Query, ClientPid, Timeout) ->
-mapred_stream(Query, ClientPid, undefined, Timeout).
-
-%% @spec mapred_stream(Query :: [riak_kv_mapred_query:mapred_queryterm()],
-%% ClientPid :: pid(),
-%% TimeoutMillisecs :: integer() | 'infinity',
-%% ResultTransformer :: function()) ->
-%% {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
-%% {error, {bad_qterm, riak_kv_mapred_query:mapred_queryterm()}} |
-%% {error, Err :: term()}
-%% @doc Perform a streaming map/reduce job across the cluster.
-%% See the map/reduce documentation for explanation of behavior.
-mapred_stream(Query,ClientPid,ResultTransformer,Timeout)
-when is_list(Query), is_pid(ClientPid),
-(is_integer(Timeout) orelse Timeout =:= infinity) ->
-ReqId = mk_reqid(),
-case riak_kv_mapred_query:start(Node, ClientPid, ReqId, Query, ResultTransformer, Timeout) of
-{ok, Pid} ->
-{ok, {ReqId, Pid}};
-Error ->
-Error
-end.
-
-mapred_bucket_stream(Bucket, Query, ClientPid) ->
-mapred_bucket_stream(Bucket, Query, ClientPid, ?DEFAULT_TIMEOUT).
-
-mapred_bucket_stream(Bucket, Query, ClientPid, Timeout) ->
-mapred_bucket_stream(Bucket, Query, ClientPid, undefined, Timeout).
-
-mapred_bucket_stream(Bucket, Query, ClientPid, ResultTransformer, Timeout) ->
-{ok,{MR_ReqId,MR_FSM}} = mapred_stream(Query,ClientPid,ResultTransformer,Timeout),
-{ok,_Stream_ReqID} = stream_list_keys(Bucket, Timeout,
-MR_FSM, mapred),
-{ok,MR_ReqId}.
-
-
-%% @deprecated Only in place for backwards compatibility.
-mapred_bucket_stream(Bucket, Query, ClientPid, ResultTransformer, Timeout, _) ->
-mapred_bucket_stream(Bucket, Query, ClientPid, ResultTransformer, Timeout).
-
-mapred_bucket(Bucket, Query) ->
-mapred_bucket(Bucket, Query, ?DEFAULT_TIMEOUT).
-
-mapred_bucket(Bucket, Query, Timeout) ->
-mapred_bucket(Bucket, Query, undefined, Timeout).
-
-mapred_bucket(Bucket, Query, ResultTransformer, Timeout) ->
-Me = self(),
-{ok,MR_ReqId} = mapred_bucket_stream(Bucket, Query, Me,
-ResultTransformer, Timeout),
-luke_flow:collect_output(MR_ReqId, Timeout).
-
--define(PRINT(Var), io:format("DEBUG: ~p:~p - ~p~n~n ~p~n~n", [?MODULE, ?LINE, ??Var, Var])).
-
-%% An InputDef defines a Module and Function to call to generate
-%% inputs for a map/reduce job. Should return {ok,
-%% LukeReqID}. Ideally, we'd combine both the other input types (BKeys
-%% and Bucket) into this approach, but postponing until after a code
-%% review of Map/Reduce.
-mapred_dynamic_inputs_stream(FSMPid, InputDef, Timeout) ->
-case InputDef of
-{modfun, Mod, Fun, Options} ->
-Mod:Fun(FSMPid, Options, Timeout);
-_ ->
-throw({invalid_inputdef, InputDef})
-end.
-
%% @spec get(riak_object:bucket(), riak_object:key()) ->
%% {ok, riak_object:riak_object()} |
%% {error, notfound} |
@@ -469,7 +311,7 @@ list_keys(Bucket, Timeout, ErrorTolerance) when is_integer(Timeout) ->
Me = self(),
ReqId = mk_reqid(),
FSM_Timeout = trunc(Timeout / 8),
-riak_kv_keys_fsm_legacy_sup:start_keys_fsm(Node, [ReqId, Bucket, FSM_Timeout, plain, ErrorTolerance, Me]),
+riak_kv_keys_fsm_legacy_sup:start_keys_fsm(Node, [ReqId, Bucket, FSM_Timeout, ErrorTolerance, Me]),
wait_for_listkeys(ReqId, Timeout);
%% @spec list_keys(riak_object:bucket(), TimeoutMillisecs :: integer()) ->
%% {ok, [Key :: riak_object:key()]} |
@@ -487,7 +329,7 @@ list_keys(Bucket, Filter, Timeout) ->
_ ->
Me = self(),
ReqId = mk_reqid(),
-riak_kv_keys_fsm_sup:start_keys_fsm(Node, [{raw, ReqId, Me}, [Bucket, Filter, Timeout, plain]]),
+riak_kv_keys_fsm_sup:start_keys_fsm(Node, [{raw, ReqId, Me}, [Bucket, Filter, Timeout]]),
wait_for_listkeys(ReqId, Timeout)
end.

@@ -505,11 +347,11 @@ stream_list_keys(Bucket, Timeout, _) ->
stream_list_keys(Bucket, Timeout).

%% @deprecated Only in place for backwards compatibility.
-stream_list_keys(Bucket0, Timeout, ErrorTolerance, Client, ClientType) ->
+stream_list_keys(Bucket0, Timeout, ErrorTolerance, Client, _ClientType) ->
ReqId = mk_reqid(),
case build_filter(Bucket0) of
{ok, Filter} ->
-riak_kv_keys_fsm_legacy_sup:start_keys_fsm(Node, [ReqId, Filter, Timeout, ClientType, ErrorTolerance, Client]),
+riak_kv_keys_fsm_legacy_sup:start_keys_fsm(Node, [ReqId, Filter, Timeout, ErrorTolerance, Client]),
{ok, ReqId};
Error ->
Error
@@ -550,17 +392,15 @@ stream_list_keys(Input, Timeout, Client, ClientType) when is_pid(Client) ->
Client},
[Bucket,
FilterExprs,
-Timeout,
-ClientType]]),
+Timeout]]),
{ok, ReqId}
end;
Bucket ->
riak_kv_keys_fsm_sup:start_keys_fsm(Node,
[{raw, ReqId, Client},
[Bucket,
none,
-Timeout,
-ClientType]]),
+Timeout]]),
{ok, ReqId}
end
end;
@@ -636,7 +476,7 @@ list_buckets(Filter, Timeout) ->
_ ->
Me = self(),
ReqId = mk_reqid(),
-riak_kv_buckets_fsm_sup:start_buckets_fsm(Node, [{raw, ReqId, Me}, [Filter, Timeout, plain]]),
+riak_kv_buckets_fsm_sup:start_buckets_fsm(Node, [{raw, ReqId, Me}, [Filter, Timeout]]),
wait_for_listbuckets(ReqId, Timeout)
end.

@@ -676,7 +516,7 @@ get_index(Bucket, Query) ->
get_index(Bucket, Query, Timeout) ->
Me = self(),
ReqId = mk_reqid(),
-riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout, plain]]),
+riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout]]),
wait_for_query_results(ReqId, Timeout).

%% @spec stream_get_index(Bucket :: binary(),
Expand All @@ -700,7 +540,7 @@ stream_get_index(Bucket, Query) ->
stream_get_index(Bucket, Query, Timeout) ->
Me = self(),
ReqId = mk_reqid(),
-riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout, plain]]),
+riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout]]),
{ok, ReqId}.

%% @spec set_bucket(riak_object:bucket(), [BucketProp :: {atom(),term()}]) -> ok
@@ -799,21 +639,6 @@ wait_for_query_results(ReqId, Timeout, Acc) ->
{error, timeout}
end.

-add_inputs(_FlowPid, []) ->
-ok;
-add_inputs(FlowPid, Inputs) when length(Inputs) < 100 ->
-luke_flow:add_inputs(FlowPid, Inputs);
-add_inputs(FlowPid, Inputs) ->
-{Current, Next} = lists:split(100, Inputs),
-luke_flow:add_inputs(FlowPid, Current),
-add_inputs(FlowPid, Next).
-
-is_key_filter({Bucket, Filters}) when is_binary(Bucket),
-is_list(Filters) ->
-true;
-is_key_filter(_) ->
-false.
-
%% @deprecated This function is only here to support
%% rolling upgrades and will be removed.
build_filter({Bucket, Exprs}) ->
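With the luke-based entry points removed above, the list-keys FSMs only ever report back to a plain calling process. A minimal sketch of the receive loop such a caller might run; the {ReqId, {keys, Keys}}, {ReqId, done}, and {ReqId, {error, Reason}} message shapes are assumed from riak_kv convention, not shown in this diff:

    %% Illustrative only: drain streamed list-keys batches for ReqId.
    collect_keys(ReqId, Timeout, Acc) ->
        receive
            {ReqId, {keys, Keys}} ->
                %% Each message carries one batch of keys; accumulate.
                collect_keys(ReqId, Timeout, [Keys | Acc]);
            {ReqId, done} ->
                {ok, lists:append(lists:reverse(Acc))};
            {ReqId, {error, Reason}} ->
                {error, Reason}
        after Timeout ->
            {error, timeout}
        end.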
3 changes: 0 additions & 3 deletions src/riak_index.erl
@@ -57,9 +57,6 @@

mapred_index(Dest, Args) ->
mapred_index(Dest, Args, ?TIMEOUT).
-mapred_index(FlowPid, [_Bucket, _Query], _Timeout)
-when is_pid(FlowPid) ->
-throw({not_supported, mapred_index, FlowPid});
mapred_index(_Pipe, [Bucket, Query], Timeout) ->
{ok, C} = riak:local_client(),
{ok, ReqId} = C:stream_get_index(Bucket, Query, Timeout),
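The surviving mapred_index/3 clause drives index MapReduce through stream_get_index/3 on a local client, as shown above. A hedged sketch of that call shape, assuming the 1.x-era {eq, Index, Key} query-term convention (bucket, field, and value here are illustrative, not from this diff):

    example_2i_stream() ->
        {ok, C} = riak:local_client(),
        %% Streams matching keys to self() as {ReqId, ...} messages.
        {ok, ReqId} = C:stream_get_index(<<"objects">>,
                                         {eq, <<"color_bin">>, <<"red">>},
                                         60000),
        ReqId.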
1 change: 0 additions & 1 deletion src/riak_kv.app.src
@@ -11,7 +11,6 @@
crypto,
riak_api,
riak_core,
-luke,
erlang_js,
mochiweb,
webmachine,
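Removing luke from this list matters beyond tidiness: OTP refuses to start riak_kv until every application named here is running, so a stale entry would block node startup once the dependency is gone. A trimmed sketch of the surrounding term (other fields omitted; kernel and stdlib are assumed entries):

    {application, riak_kv,
     [{applications, [kernel, stdlib, crypto, riak_api, riak_core,
                      erlang_js, mochiweb, webmachine]}]}.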
8 changes: 5 additions & 3 deletions src/riak_kv_app.erl
@@ -115,12 +115,14 @@ start(_Type, _StartArgs) ->
[true, false],
false),

+%% mapred_system should remain until no nodes still exist
+%% that would propose 'legacy' as the default choice
riak_core_capability:register({riak_kv, mapred_system},
-[pipe, legacy],
-legacy,
+[pipe],
+pipe,
{riak_kv,
mapred_system,
-[{pipe, pipe}, {legacy, legacy}]}),
+[{pipe, pipe}]}),

riak_core_capability:register({riak_kv, mapred_2i_pipe},
[true, false],
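At runtime, callers read back the negotiated value; after this change the only value a cluster can agree on is pipe. A minimal sketch using riak_core_capability:get/2 with a default, per riak_core's capability API:

    mapred_system() ->
        %% Returns the cluster-negotiated capability value, or the
        %% supplied default if negotiation has not completed yet.
        riak_core_capability:get({riak_kv, mapred_system}, pipe).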
45 changes: 11 additions & 34 deletions src/riak_kv_buckets_fsm.erl
@@ -36,7 +36,6 @@
-type req_id() :: non_neg_integer().

-record(state, {buckets=sets:new() :: [term()],
-client_type :: plain | mapred,
from :: from()}).

-include("riak_kv_dtrace.hrl").
@@ -45,26 +44,20 @@
%% the number of primary preflist vnodes the operation
%% should cover, the service to use to check for available nodes,
%% and the registered name to use to access the vnode master process.
-init(From={_, _, ClientPid}, [ItemFilter, Timeout, ClientType]) ->
+init(From={_, _, ClientPid}, [ItemFilter, Timeout]) ->
ClientNode = atom_to_list(node(ClientPid)),
PidStr = pid_to_list(ClientPid),
FilterX = if ItemFilter == none -> 0;
true -> 1
end,
-case ClientType of
-%% Link to the mapred job so we die if the job dies
-mapred ->
-?DTRACE(?C_BUCKETS_INIT, [1, FilterX],
-[<<"mapred">>, ClientNode, PidStr]),
-link(ClientPid);
-_ ->
-?DTRACE(?C_BUCKETS_INIT, [2, FilterX],
-[<<"other">>, ClientNode, PidStr])
-end,
+%% "other" is a legacy term from when MapReduce used this FSM (in
+%% which case, the string "mapred" would appear)
+?DTRACE(?C_BUCKETS_INIT, [2, FilterX],
+[<<"other">>, ClientNode, PidStr]),
%% Construct the bucket listing request
Req = ?KV_LISTBUCKETS_REQ{item_filter=ItemFilter},
{Req, allup, 1, 1, riak_kv, riak_kv_vnode_master, Timeout,
-#state{client_type=ClientType, from=From}}.
+#state{from=From}}.

process_results(done, StateData) ->
{done, StateData};
@@ -78,31 +71,15 @@ process_results({error, Reason}, _State) ->
{error, Reason}.

finish({error, Error},
-StateData=#state{client_type=ClientType,
-from={raw, ReqId, ClientPid}}) ->
+StateData=#state{from={raw, ReqId, ClientPid}}) ->
?DTRACE(?C_BUCKETS_FINISH, [-1], []),
-case ClientType of
-mapred ->
-%% An error occurred or the timeout interval elapsed
-%% so all we can do now is die so that the rest of the
-%% MapReduce processes will also die and be cleaned up.
-exit(Error);
-plain ->
-%% Notify the requesting client that an error
-%% occurred or the timeout has elapsed.
-ClientPid ! {ReqId, Error}
-end,
+%% Notify the requesting client that an error
+%% occurred or the timeout has elapsed.
+ClientPid ! {ReqId, Error},
{stop, normal, StateData};
finish(clean,
StateData=#state{buckets=Buckets,
-client_type=ClientType,
from={raw, ReqId, ClientPid}}) ->
-case ClientType of
-mapred ->
-luke_flow:add_inputs(Buckets),
-luke_flow:finish_inputs(ClientPid);
-plain ->
-ClientPid ! {ReqId, {buckets, sets:to_list(Buckets)}}
-end,
+ClientPid ! {ReqId, {buckets, sets:to_list(Buckets)}},
?DTRACE(?C_BUCKETS_FINISH, [0], []),
{stop, normal, StateData}.
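
On the caller's side, the two sends in finish/2 above pair with a plain receive. A minimal sketch (the timeout fallback is an assumption, mirroring the FSM's own Timeout):

    wait_for_buckets(ReqId, Timeout) ->
        receive
            %% Success: finish/2 sends the whole bucket list in one message.
            {ReqId, {buckets, Buckets}} -> {ok, Buckets};
            %% Failure: finish/2 forwards the error reason.
            {ReqId, Error} -> {error, Error}
        after Timeout ->
            {error, timeout}
        end.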
(Diff for the remaining 26 changed files not shown.)
