diff --git a/rebar.config b/rebar.config index d2924d233c..75de49b972 100644 --- a/rebar.config +++ b/rebar.config @@ -9,7 +9,7 @@ ]}. {deps, [ - {riak_core, ".*", {git, "git://github.com/basho/riak_core", "master"}}, + {riak_core, ".*", {git, "git://github.com/ricardobcl/riak_core", "dvvset"}}, {erlang_js, ".*", {git, "git://github.com/basho/erlang_js", "master"}}, {bitcask, ".*", {git, "git://github.com/basho/bitcask", "master"}}, {merge_index, ".*", {git, "git://github.com/basho/merge_index", diff --git a/src/riak_client.erl b/src/riak_client.erl index d92e3a207a..342285c5cc 100644 --- a/src/riak_client.erl +++ b/src/riak_client.erl @@ -254,7 +254,7 @@ delete(Bucket,Key,Options,Timeout) when is_list(Options) -> delete(Bucket,Key,RW,Timeout) -> delete(Bucket,Key,[{rw, RW}], Timeout). -%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock()) -> +%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), dvvset:clock()) -> %% ok | %% {error, too_many_fails} | %% {error, notfound} | @@ -266,7 +266,7 @@ delete(Bucket,Key,RW,Timeout) -> delete_vclock(Bucket,Key,VClock) -> delete_vclock(Bucket,Key,VClock,[{rw,default}],?DEFAULT_TIMEOUT). -%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock::vclock(), RW :: integer()) -> +%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), dvvset:clock(), RW :: integer()) -> %% ok | %% {error, too_many_fails} | %% {error, notfound} | @@ -280,7 +280,7 @@ delete_vclock(Bucket,Key,VClock,Options) when is_list(Options) -> delete_vclock(Bucket,Key,VClock,RW) -> delete_vclock(Bucket,Key,VClock,[{rw, RW}],?DEFAULT_TIMEOUT). -%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock(), RW :: integer(), +%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), dvvset:clock(), RW :: integer(), %% TimeoutMillisecs :: integer()) -> %% ok | %% {error, too_many_fails} | diff --git a/src/riak_index.erl b/src/riak_index.erl index 4a8cfbf988..ac72c75da5 100644 --- a/src/riak_index.erl +++ b/src/riak_index.erl @@ -400,7 +400,7 @@ parse_object_hook_test() -> end, ?assertMatch( - {r_object, _, _, _, _, _, _}, + {r_object, _, _, _, _, _}, F([ {<<"field_bin">>, <<"A">>}, {<<"field_int">>, <<"1">>} diff --git a/src/riak_kv_backup.erl b/src/riak_kv_backup.erl index 0aeab87971..95649da172 100644 --- a/src/riak_kv_backup.erl +++ b/src/riak_kv_backup.erl @@ -154,12 +154,10 @@ read_and_restore_function(Client, BinTerm) -> make_binary_bucket(Bucket, Key, OriginalObj) when is_atom(Bucket) -> Bucket1 = list_to_binary(atom_to_list(Bucket)), OriginalContents = riak_object:get_contents(OriginalObj), - OriginalVClock = riak_object:vclock(OriginalObj), % We can't change the bucket name without creating a new object... NewObj = riak_object:new(Bucket1, Key, placeholder), - NewObj1 = riak_object:set_contents(NewObj, OriginalContents), - _NewObj2 = riak_object:set_vclock(NewObj1, OriginalVClock); + _NewObj1 = riak_object:set_contents(NewObj, OriginalContents); %% If the bucket name is a binary, just pass it on through... make_binary_bucket(Bucket, _Key, Obj) when is_binary(Bucket) -> Obj. diff --git a/src/riak_kv_delete.erl b/src/riak_kv_delete.erl index 8d9a4cc96d..96ed3f2da3 100644 --- a/src/riak_kv_delete.erl +++ b/src/riak_kv_delete.erl @@ -66,7 +66,7 @@ delete(ReqId,Bucket,Key,Options,Timeout,Client,ClientId,undefined) -> case C:get(Bucket,Key,[{r,R},{pr,PR},{timeout,Timeout}]) of {ok, OrigObj} -> RemainingTime = Timeout - (riak_core_util:moment() - RealStartTime), - delete(ReqId,Bucket,Key,Options,RemainingTime,Client,ClientId,riak_object:vclock(OrigObj)); + delete(ReqId,Bucket,Key,Options,RemainingTime,Client,ClientId,riak_object:get_vclock(OrigObj)); {error, notfound} -> ?DTRACE(?C_DELETE_INIT1, [-2], []), Client ! {ReqId, {error, notfound}}; @@ -85,7 +85,7 @@ delete(ReqId,Bucket,Key,Options,Timeout,Client,ClientId,VClock) -> {W, PW, DW} -> Obj0 = riak_object:new(Bucket, Key, <<>>, dict:store(?MD_DELETED, "true", dict:new())), - Tombstone = riak_object:set_vclock(Obj0, VClock), + Tombstone = riak_object:set_vclock(Obj0, VClock), %% same value as current Obj0 {ok,C} = riak:local_client(ClientId), Reply = C:put(Tombstone, [{w,W},{pw,PW},{dw, DW},{timeout,Timeout}]), Client ! {ReqId, Reply}, @@ -240,7 +240,7 @@ invalid_w_delete() -> Key = <<"testkey">>, Timeout = 60000, riak_kv_delete_sup:start_delete(node(), [RequestId, Bucket, Key, [{w,W}], - Timeout, self(), undefined, vclock:fresh()]), + Timeout, self(), undefined, riak_object:new_vclock()]), %% Wait for error response receive {_RequestId, Result} -> @@ -275,7 +275,7 @@ invalid_pw_delete() -> Key = <<"testkey">>, Timeout = 60000, riak_kv_delete_sup:start_delete(node(), [RequestId, Bucket, Key, - [{pw,PW}], Timeout, self(), undefined, vclock:fresh()]), + [{pw,PW}], Timeout, self(), undefined, riak_object:new_vclock()]), %% Wait for error response receive {_RequestId, Result} -> diff --git a/src/riak_kv_encoding_migrate.erl b/src/riak_kv_encoding_migrate.erl index 2fa179781c..4e984e506c 100644 --- a/src/riak_kv_encoding_migrate.erl +++ b/src/riak_kv_encoding_migrate.erl @@ -248,9 +248,8 @@ decode_object(RO) -> copy_object(RO, B, K) -> {ok, RC} = riak:local_client(), NO1 = riak_object:new(B, K, <<>>), - NO2 = riak_object:set_vclock(NO1, riak_object:vclock(RO)), - NO3 = riak_object:set_contents(NO2, riak_object:get_contents(RO)), - RC:put(NO3). + NO2 = riak_object:set_contents(NO1, riak_object:get_contents(RO)), + RC:put(NO2). %% Force writes to fail to test failure behavior precommit_fail(_) -> @@ -308,19 +307,17 @@ test_migration() -> {ok, []} = riak_kv_encoding_migrate:delete_migrated_objects(EObjs), {not_needed, [], []} = riak_kv_encoding_migrate:check_cluster(), - C1 = riak_object:get_contents(O2), - V1 = riak_object:vclock(O2), + C1 = riak_object:get_md_values(O2), - C2 = riak_object:get_contents(O4), - V2 = riak_object:vclock(O4), + C2 = riak_object:get_md_values(O4), {ok, MO1} = RC:get(<<"me@mine">>, <<"key">>), - nearly_equal_contents(C1, riak_object:get_contents(MO1)), - true = vclock:descends(riak_object:vclock(MO1), V1), + nearly_equal_contents(C1, riak_object:get_md_values(MO1)), + true = riak_object:descendant(MO1, O2), {ok, MO2} = RC:get(<<"bucket">>, <<"key@">>), - nearly_equal_contents(C2, riak_object:get_contents(MO2)), - true = vclock:descends(riak_object:vclock(MO2), V2), + nearly_equal_contents(C2, riak_object:get_md_values(MO2)), + true = riak_object:descendant(MO2, O4), %% Use precommit hook to test failure scenarios O7 = riak_object:new(<<"fail">>, <<"key%40">>, <<"value">>), diff --git a/src/riak_kv_get_core.erl b/src/riak_kv_get_core.erl index 1ae3975926..1ea7f351ea 100644 --- a/src/riak_kv_get_core.erl +++ b/src/riak_kv_get_core.erl @@ -118,7 +118,7 @@ response(GetCore = #getcore{r = R, num_ok = NumOk, num_notfound = NumNotFound, ok -> Merged; % {ok, MObj} tombstone when DeletedVClock -> - {error, {deleted, riak_object:vclock(MObj)}}; + {error, {deleted, riak_object:get_vclock(MObj)}}; _ -> % tombstone or notfound {error, notfound} end; @@ -160,7 +160,7 @@ final_action(GetCore = #getcore{n = N, merged = Merged0, results = Results, []; _ -> % ok or tombstone [{Idx, outofdate} || {Idx, {ok, RObj}} <- Results, - strict_descendant(MObj, RObj)] ++ + riak_object:strict_descendant(MObj, RObj)] ++ [{Idx, notfound} || {Idx, {error, notfound}} <- Results] end, Action = case ReadRepairs of @@ -203,10 +203,6 @@ info(#getcore{num_ok = NumOks, num_fail = NumFail, results = Results}) -> %% Internal functions %% ==================================================================== -strict_descendant(O1, O2) -> - vclock:descends(riak_object:vclock(O1),riak_object:vclock(O2)) andalso - not vclock:descends(riak_object:vclock(O2),riak_object:vclock(O1)). - merge(Replies, AllowMult) -> RObjs = [RObj || {_I, {ok, RObj}} <- Replies], case RObjs of diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index b50a13c63a..40d33701d3 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -466,10 +466,10 @@ update_stats(_, #state{ bkey = {Bucket, _}, tracked_bucket = StatTracked, calcul %% calling term_to_binary/1, but it should be easier on memory, %% especially for objects with large values. calculate_objsize(Bucket, Obj) -> - Contents = riak_object:get_contents(Obj), + Contents = riak_object:get_md_values(Obj), size(Bucket) + size(riak_object:key(Obj)) + - size(term_to_binary(riak_object:vclock(Obj))) + + size(term_to_binary(riak_object:get_vclock(Obj))) + lists:sum([size(term_to_binary(MD)) + value_size(Value) || {MD, Value} <- Contents]). value_size(Value) when is_binary(Value) -> size(Value); diff --git a/src/riak_kv_index_hashtree.erl b/src/riak_kv_index_hashtree.erl index fb177d899f..f3a8f128e4 100644 --- a/src/riak_kv_index_hashtree.erl +++ b/src/riak_kv_index_hashtree.erl @@ -358,11 +358,7 @@ load_built(#state{trees=Trees}) -> %% Generate a hash value for a binary-encoded `riak_object' -spec hash_object(riak_object_t2b()) -> binary(). hash_object(RObjBin) -> - %% Normalize the `riak_object' vector clock before hashing - RObj = binary_to_term(RObjBin), - Vclock = riak_object:vclock(RObj), - UpdObj = riak_object:set_vclock(RObj, lists:sort(Vclock)), - Hash = erlang:phash2(term_to_binary(UpdObj)), + Hash = erlang:phash2(RObjBin), term_to_binary(Hash). %% Fold over a given vnode's data, inserting each object into the appropriate diff --git a/src/riak_kv_pb_object.erl b/src/riak_kv_pb_object.erl index e92b7762a4..400d4d5a88 100644 --- a/src/riak_kv_pb_object.erl +++ b/src/riak_kv_pb_object.erl @@ -111,11 +111,11 @@ process(#rpbgetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk, make_option(notfound_ok, NFOk) ++ make_option(basic_quorum, BQ)) of {ok, O} -> - case erlify_rpbvc(VClock) == riak_object:vclock(O) of + case riak_object:equal_vclock(erlify_rpbvc(VClock),riak_object:get_vclock(O)) of true -> {reply, #rpbgetresp{unchanged = true}, State}; _ -> - Contents = riak_object:get_contents(O), + Contents = riak_object:get_md_values(O), PbContent = case Head of true -> %% Remove all the 'value' fields from the contents @@ -127,7 +127,7 @@ process(#rpbgetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk, riak_pb_kv_codec:encode_contents(Contents) end, {reply, #rpbgetresp{content = PbContent, - vclock = pbify_rpbvc(riak_object:vclock(O))}, State} + vclock = pbify_rpbvc(riak_object:get_vclock(O))}, State} end; {error, {deleted, TombstoneVClock}} -> %% Found a tombstone - return its vector clock so it can @@ -146,7 +146,7 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, {ok, _} when NoneMatch -> {error, "match_found", State}; {ok, O} when NotMod -> - case erlify_rpbvc(PbVC) == riak_object:vclock(O) of + case erlify_rpbvc(PbVC) == riak_object:get_vclock(O) of true -> process(Req#rpbputreq{if_not_modified=undefined, if_none_match=undefined}, @@ -203,7 +203,7 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent, ok -> {reply, #rpbputresp{}, State}; {ok, Obj} -> - Contents = riak_object:get_contents(Obj), + Contents = riak_object:get_md_values(Obj), PbContents = case ReturnHead of true -> %% Remove all the 'value' fields from the contents @@ -215,7 +215,7 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent, riak_pb_kv_codec:encode_contents(Contents) end, PutResp = #rpbputresp{content = PbContents, - vclock = pbify_rpbvc(riak_object:vclock(Obj)), + vclock = pbify_rpbvc(riak_object:get_vclock(Obj)), key = ReturnKey }, {reply, PutResp, State}; @@ -289,9 +289,9 @@ make_option(K, V) -> %% Convert a vector clock to erlang erlify_rpbvc(undefined) -> - vclock:fresh(); + riak_object:new_vclock(); erlify_rpbvc(<<>>) -> - vclock:fresh(); + riak_object:new_vclock(); erlify_rpbvc(PbVc) -> binary_to_term(zlib:unzip(PbVc)). diff --git a/src/riak_kv_util.erl b/src/riak_kv_util.erl index 107017b246..e83ad032b5 100644 --- a/src/riak_kv_util.erl +++ b/src/riak_kv_util.erl @@ -68,7 +68,7 @@ is_x_deleted(Obj) -> %% deleted. Return is the atom 'undefined' if all contents %% are marked deleted, or the input Obj if any of them are not. obj_not_deleted(Obj) -> - case [{M, V} || {M, V} <- riak_object:get_contents(Obj), + case [{M, V} || {M, V} <- riak_object:get_md_values(Obj), dict:is_key(<<"X-Riak-Deleted">>, M) =:= false] of [] -> undefined; _ -> Obj diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index a97a0502e9..e2b096ea7c 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -728,7 +728,7 @@ do_put(Sender, {Bucket,_Key}=BKey, RObj, ReqID, StartTime, Options, State) -> PruneTime = StartTime end, Coord = proplists:get_value(coord, Options, false), - PutArgs = #putargs{returnbody=proplists:get_value(returnbody,Options,false) orelse Coord, + PutArgs = #putargs{returnbody=Coord orelse proplists:get_value(returnbody,Options,false), coord=Coord, lww=proplists:get_value(last_write_wins, BProps, false), bkey=BKey, @@ -795,7 +795,7 @@ prepare_put(#state{vnodeid=VId, coord=Coord, lww=LWW, starttime=StartTime, - prunetime=PruneTime}, + prunetime=_PruneTime}, IndexBackend) -> case Mod:get(Bucket, Key, ModState) of {error, not_found, _UpdModState} -> @@ -818,7 +818,6 @@ prepare_put(#state{vnodeid=VId, {oldobj, OldObj1} -> {{false, OldObj1}, PutArgs}; {newobj, NewObj} -> - VC = riak_object:vclock(NewObj), AMObj = enforce_allow_mult(NewObj, BProps), case IndexBackend of true -> @@ -828,17 +827,7 @@ prepare_put(#state{vnodeid=VId, false -> IndexSpecs = [] end, - case PruneTime of - undefined -> - ObjToStore = AMObj; - _ -> - ObjToStore = - riak_object:set_vclock(AMObj, - vclock:prune(VC, - PruneTime, - BProps)) - end, - {{true, ObjToStore}, + {{true, AMObj}, PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}} end end. @@ -883,54 +872,25 @@ enforce_allow_mult(Obj, BProps) -> case proplists:get_value(allow_mult, BProps) of true -> Obj; _ -> - case riak_object:get_contents(Obj) of - [_] -> Obj; - Mult -> - {MD, V} = select_newest_content(Mult), - riak_object:set_contents(Obj, [{MD, V}]) + case riak_object:value_count(Obj) of + 1 -> Obj; + _ -> + riak_object:set_lww(Obj) end end. -%% @private -%% choose the latest content to store for the allow_mult=false case -select_newest_content(Mult) -> - hd(lists:sort( - fun({MD0, _}, {MD1, _}) -> - riak_core_util:compare_dates( - dict:fetch(<<"X-Riak-Last-Modified">>, MD0), - dict:fetch(<<"X-Riak-Last-Modified">>, MD1)) - end, - Mult)). - %% @private put_merge(false, true, _CurObj, UpdObj, _VId, _StartTime) -> % coord=false, LWW=true {newobj, UpdObj}; put_merge(false, false, CurObj, UpdObj, _VId, _StartTime) -> % coord=false, LWW=false - ResObj = riak_object:syntactic_merge(CurObj, UpdObj), - case ResObj =:= CurObj of - true -> - {oldobj, CurObj}; - false -> - {newobj, ResObj} - end; -put_merge(true, true, _CurObj, UpdObj, VId, StartTime) -> % coord=false, LWW=true + {newobj, riak_object:syntactic_merge(CurObj, UpdObj)}; +put_merge(true, true, _CurObj, UpdObj, VId, StartTime) -> % coord=true, LWW=true {newobj, riak_object:increment_vclock(UpdObj, VId, StartTime)}; put_merge(true, false, CurObj, UpdObj, VId, StartTime) -> - UpdObj1 = riak_object:increment_vclock(UpdObj, VId, StartTime), - UpdVC = riak_object:vclock(UpdObj1), - CurVC = riak_object:vclock(CurObj), - - %% Check the coord put will replace the existing object - case vclock:get_counter(VId, UpdVC) > vclock:get_counter(VId, CurVC) andalso - vclock:descends(CurVC, UpdVC) == false andalso - vclock:descends(UpdVC, CurVC) == true of - true -> - {newobj, UpdObj1}; - false -> - %% If not, make sure it does - {newobj, riak_object:increment_vclock( - riak_object:merge(CurObj, UpdObj1), VId, StartTime)} - end. + CurObj1 = riak_object:apply_updates(CurObj), + UpdObj1 = riak_object:apply_updates(UpdObj), + ResObj = riak_object:update_vclock(UpdObj1, CurObj1, VId, StartTime), + {newobj, ResObj}. %% @private do_get(_Sender, BKey, ReqID, @@ -1113,8 +1073,8 @@ do_get_vclocks(KeyList,_State=#state{mod=Mod,modstate=ModState}) -> %% @private do_get_vclock({Bucket, Key}, Mod, ModState) -> case Mod:get(Bucket, Key, ModState) of - {error, not_found, _UpdModState} -> vclock:fresh(); - {ok, Val, _UpdModState} -> riak_object:vclock(binary_to_term(Val)) + {error, not_found, _UpdModState} -> riak_object:new_vclock(); + {ok, Val, _UpdModState} -> riak_object:get_vclock(binary_to_term(Val)) end. %% @private diff --git a/src/riak_kv_wm_link_walker.erl b/src/riak_kv_wm_link_walker.erl index a8b6649c5b..5ddd388ed1 100644 --- a/src/riak_kv_wm_link_walker.erl +++ b/src/riak_kv_wm_link_walker.erl @@ -403,7 +403,7 @@ multipart_encode_body(NestedResults, Ctx) when is_list(NestedResults) -> multipart_encode_body(RiakObject, Ctx) -> APIVersion = Ctx#ctx.api_version, Prefix = Ctx#ctx.prefix, - [{MD, V}|Rest] = riak_object:get_contents(RiakObject), + [{MD, V}|Rest] = riak_object:get_md_values(RiakObject), {VHead, Vclock} = riak_kv_wm_utils:vclock_header(RiakObject), [VHead,": ",Vclock,"\r\n", diff --git a/src/riak_kv_wm_object.erl b/src/riak_kv_wm_object.erl index 5f0110f24b..fb991fb836 100644 --- a/src/riak_kv_wm_object.erl +++ b/src/riak_kv_wm_object.erl @@ -757,7 +757,7 @@ produce_multipart_body(RD, Ctx=#ctx{doc={ok, Doc}, bucket=B, prefix=P}) -> Boundary = riak_core_util:unique_id_62(), {[[["\r\n--",Boundary,"\r\n", riak_kv_wm_utils:multipart_encode_body(P, B, Content, APIVersion)] - || Content <- riak_object:get_contents(Doc)], + || Content <- riak_object:get_md_values(Doc)], "\r\n--",Boundary,"--\r\n"], wrq:set_resp_header(?HEAD_CTYPE, "multipart/mixed; boundary="++Boundary, @@ -774,7 +774,7 @@ produce_multipart_body(RD, Ctx=#ctx{doc={ok, Doc}, bucket=B, prefix=P}) -> select_doc(#ctx{doc={ok, Doc}, vtag=Vtag}) -> case riak_object:get_update_value(Doc) of undefined -> - case riak_object:get_contents(Doc) of + case riak_object:get_md_values(Doc) of [Single] -> Single; Mult -> case lists:dropwhile( @@ -804,7 +804,7 @@ encode_vclock_header(RD, #ctx{doc={error, {deleted, VClock}}}) -> %% into something suitable for an HTTP header vclock_header(Doc) -> {?HEAD_VCLOCK, - encode_vclock(riak_object:vclock(Doc))}. + encode_vclock(riak_object:get_vclock(Doc))}. encode_vclock(VClock) -> binary_to_list(base64:encode(zlib:zip(term_to_binary(VClock)))). @@ -815,7 +815,7 @@ encode_vclock(VClock) -> %% vclock is returned. decode_vclock_header(RD) -> case wrq:get_req_header(?HEAD_VCLOCK, RD) of - undefined -> vclock:fresh(); + undefined -> riak_object:new_vclock(); Head -> binary_to_term(zlib:unzip(base64:decode(Head))) end. @@ -864,7 +864,7 @@ generate_etag(RD, Ctx) -> {dict:fetch(?MD_VTAG, MD), RD, Ctx}; multiple_choices -> {ok, Doc} = Ctx#ctx.doc, - <> = crypto:md5(term_to_binary(riak_object:vclock(Doc))), + <> = crypto:md5(term_to_binary(riak_object:get_vclock(Doc))), {riak_core_util:integer_to_list(ETag, 62), RD, Ctx} end. diff --git a/src/riak_kv_wm_utils.erl b/src/riak_kv_wm_utils.erl index abce4365cf..3d3000e2e7 100644 --- a/src/riak_kv_wm_utils.erl +++ b/src/riak_kv_wm_utils.erl @@ -149,7 +149,7 @@ multipart_encode_body(Prefix, Bucket, {MD, V}, APIVersion) -> %% into something suitable for an HTTP header vclock_header(Doc) -> {?HEAD_VCLOCK, - encode_vclock(riak_object:vclock(Doc))}. + encode_vclock(riak_object:get_vclock(Doc))}. encode_vclock(VClock) -> binary_to_list(base64:encode(zlib:zip(term_to_binary(VClock)))). diff --git a/src/riak_object.erl b/src/riak_object.erl index f3d5d90318..ac5f8cf427 100644 --- a/src/riak_object.erl +++ b/src/riak_object.erl @@ -32,7 +32,6 @@ -type key() :: binary(). -type bucket() :: binary(). -%% -type bkey() :: {bucket(), key()}. -type value() :: term(). -record(r_content, { @@ -44,25 +43,27 @@ -record(r_object, { bucket :: bucket(), key :: key(), - contents :: [#r_content{}], - vclock = vclock:fresh() :: vclock:vclock(), + contents :: dvvset:clock(), %% a container for riak_content()'s, with built-in causal information updatemetadata=dict:store(clean, true, dict:new()) :: dict(), updatevalue :: term() }). + -opaque riak_object() :: #r_object{}. +-opaque riak_content() :: #r_content{}. -type index_op() :: add | remove. -type index_value() :: integer() | binary(). -define(MAX_KEY_SIZE, 65536). --export([new/3, new/4, ensure_robject/1, ancestors/1, reconcile/2, equal/2]). --export([increment_vclock/2, increment_vclock/3]). --export([key/1, get_metadata/1, get_metadatas/1, get_values/1, get_value/1]). --export([vclock/1, update_value/2, update_metadata/2, bucket/1, value_count/1]). +-export([new/3, new/4, ensure_robject/1, equal/2, new_vclock/0, equal_vclock/2]). +-export([increment_vclock/2, increment_vclock/3, update_vclock/4, update_vclock/3]). +-export([reconcile/2, descendant/2, strict_descendant/2, key/1]). +-export([get_metadata/1, get_metadatas/1, get_values/1, get_value/1]). +-export([get_vclock/1, update_value/2, update_metadata/2, bucket/1, value_count/1]). -export([get_update_metadata/1, get_update_value/1, get_contents/1]). --export([merge/2, apply_updates/1, syntactic_merge/2]). --export([to_json/1, from_json/1]). +-export([apply_updates/1, syntactic_merge/2, compare_content_dates/2, set_lww/1]). +-export([get_md_values/1,to_json/1, from_json/1]). -export([index_specs/1, diff_index_specs/2]). -export([set_contents/2, set_vclock/2]). %% INTERNAL, only for riak_* @@ -82,32 +83,47 @@ new(B, K, V, C) when is_binary(B), is_binary(K), is_list(C) -> %% NOTE: Removed "is_tuple(MD)" guard to make Dialyzer happy. The previous clause %% has a guard for string(), so this clause is OK without the guard. new(B, K, V, MD) when is_binary(B), is_binary(K) -> - case size(K) > ?MAX_KEY_SIZE of + case byte_size(K) > ?MAX_KEY_SIZE of true -> throw({error,key_too_large}); false -> case MD of no_initial_metadata -> - Contents = [#r_content{metadata=dict:new(), value=V}], + Contents = dvvset:new([#r_content{metadata=dict:new(), value=V}]), #r_object{bucket=B,key=K, - contents=Contents,vclock=vclock:fresh()}; + contents=Contents}; _ -> - Contents = [#r_content{metadata=MD, value=V}], + Contents = dvvset:new([#r_content{metadata=MD, value=V}]), #r_object{bucket=B,key=K,updatemetadata=MD, - contents=Contents,vclock=vclock:fresh()} + contents=Contents} end end. +-spec new_vclock() -> dvvset:clock(). +new_vclock() -> []. + %% Ensure the incoming term is a riak_object. -spec ensure_robject(any()) -> riak_object(). ensure_robject(Obj = #r_object{}) -> Obj. --spec equal(riak_object(), riak_object()) -> true | false. +-spec strict_descendant(riak_object(), riak_object()) -> boolean(). +strict_descendant(#r_object{contents=C1},#r_object{contents=C2}) -> + dvvset:less(C2,C1). + +-spec descendant(riak_object(), riak_object()) -> boolean(). +descendant(#r_object{contents=C1},#r_object{contents=C2}) -> + dvvset:equal(C1,C2) orelse dvvset:less(C2,C1). + +-spec equal_vclock(dvvset:vector(), dvvset:vector()) -> boolean(). +equal_vclock(C1, C2) -> + dvvset:equal(C1,C2). + +-spec equal(riak_object(), riak_object()) -> boolean(). %% @doc Deep (expensive) comparison of Riak objects. equal(Obj1,Obj2) -> (Obj1#r_object.bucket =:= Obj2#r_object.bucket) andalso (Obj1#r_object.key =:= Obj2#r_object.key) - andalso vclock:equal(vclock(Obj1),vclock(Obj2)) + andalso equal_vclock(get_vclock(Obj1), get_vclock(Obj2)) andalso equal2(Obj1,Obj2). equal2(Obj1,Obj2) -> UM1 = lists:keysort(1, dict:to_list(Obj1#r_object.updatemetadata)), @@ -115,75 +131,58 @@ equal2(Obj1,Obj2) -> (UM1 =:= UM2) andalso (Obj1#r_object.updatevalue =:= Obj2#r_object.updatevalue) andalso begin - Cont1 = lists:sort(Obj1#r_object.contents), - Cont2 = lists:sort(Obj2#r_object.contents), + Cont1 = lists:sort(get_md_values(Obj1)), + Cont2 = lists:sort(get_md_values(Obj2)), equal_contents(Cont1,Cont2) end. equal_contents([],[]) -> true; equal_contents(_,[]) -> false; equal_contents([],_) -> false; -equal_contents([C1|R1],[C2|R2]) -> - MD1 = lists:keysort(1, dict:to_list(C1#r_content.metadata)), - MD2 = lists:keysort(1, dict:to_list(C2#r_content.metadata)), +equal_contents([{M1,V1}|R1],[{M2,V2}|R2]) -> + MD1 = lists:keysort(1, dict:to_list(M1)), + MD2 = lists:keysort(1, dict:to_list(M2)), (MD1 =:= MD2) - andalso (C1#r_content.value =:= C2#r_content.value) + andalso (V1 =:= V2) andalso equal_contents(R1,R2). -%% @spec reconcile([riak_object()], boolean()) -> riak_object() -%% @doc Reconcile a list of riak objects. If AllowMultiple is true, -%% the riak_object returned may contain multiple values if Objects -%% contains sibling versions (objects that could not be syntactically -%% merged). If AllowMultiple is false, the riak_object returned will -%% contain the value of the most-recently-updated object, as per the -%% X-Riak-Last-Modified header. -reconcile(Objects, AllowMultiple) -> - RObjs = reconcile(Objects), - AllContents = lists:flatten([O#r_object.contents || O <- RObjs]), + + +% @doc Reconcile the object from the client and the object from the server. +% If AllowMultiple is true, +% the riak_object returned may contain multiple values if Objects +% contains sibling versions (objects that could not be syntactically +% merged). If AllowMultiple is false, the riak_object returned will +% contain the value of the most-recently-updated object, as per the +% X-Riak-Last-Modified header. +-spec reconcile([riak_object()], boolean()) -> riak_object(). +reconcile(RObjs, AllowMultiple) -> + AllContents = [O#r_object.contents || O <- RObjs], + SyncedContents = dvvset:sync(AllContents), Contents = case AllowMultiple of - false -> - [most_recent_content(AllContents)]; - true -> - lists:usort(AllContents) + false -> most_recent_content(SyncedContents); + true -> SyncedContents end, - VClock = vclock:merge([O#r_object.vclock || O <- RObjs]), HdObj = hd(RObjs), - HdObj#r_object{contents=Contents,vclock=VClock, + HdObj#r_object{contents=Contents, updatemetadata=dict:store(clean, true, dict:new()), updatevalue=undefined}. -%% @spec ancestors([riak_object()]) -> [riak_object()] -%% @doc Given a list of riak_object()s, return the objects that are pure -%% ancestors of other objects in the list, if any. The changes in the -%% objects returned by this function are guaranteed to be reflected in -%% the other objects in Objects, and can safely be discarded from the list -%% without losing data. -ancestors(pure_baloney_to_fool_dialyzer) -> - [#r_object{vclock = vclock:fresh()}]; -ancestors(Objects) -> - ToRemove = [[O2 || O2 <- Objects, - vclock:descends(O1#r_object.vclock,O2#r_object.vclock), - (vclock:descends(O2#r_object.vclock,O1#r_object.vclock) == false)] - || O1 <- Objects], - lists:flatten(ToRemove). - -%% @spec reconcile([riak_object()]) -> [riak_object()] -reconcile(Objects) -> - All = sets:from_list(Objects), - Del = sets:from_list(ancestors(Objects)), - remove_duplicate_objects(sets:to_list(sets:subtract(All, Del))). - -remove_duplicate_objects(Os) -> rem_dup_objs(Os,[]). -rem_dup_objs([],Acc) -> Acc; -rem_dup_objs([O|Rest],Acc) -> - EqO = [AO || AO <- Acc, riak_object:equal(AO,O) =:= true], - case EqO of - [] -> rem_dup_objs(Rest,[O|Acc]); - _ -> rem_dup_objs(Rest,Acc) - end. +-spec syntactic_merge(riak_object(), riak_object()) -> riak_object(). +syntactic_merge(CurrentObj, NewObj) -> + UCurr = apply_updates(CurrentObj), + UNew = apply_updates(NewObj), + reconcile([UNew, UCurr], true). + + +-spec set_lww(riak_object()) -> riak_object(). +set_lww(Object=#r_object{contents=C}) -> + set_contents(Object,most_recent_content(C)). -most_recent_content(AllContents) -> - hd(lists:sort(fun compare_content_dates/2, AllContents)). +-spec most_recent_content(riak_content()) -> riak_content(). +most_recent_content(Contents) -> + dvvset:lww(fun riak_object:compare_content_dates/2, Contents). +-spec compare_content_dates(riak_content(), riak_content()) -> boolean(). compare_content_dates(C1,C2) -> D1 = dict:fetch(<<"X-Riak-Last-Modified">>, C1#r_content.metadata), D2 = dict:fetch(<<"X-Riak-Last-Modified">>, C2#r_content.metadata), @@ -209,123 +208,151 @@ compare_content_dates(C1,C2) -> C1 < C2 end. -%% @spec merge(riak_object(), riak_object()) -> riak_object() -%% @doc Merge the contents and vclocks of OldObject and NewObject. -%% Note: This function calls apply_updates on NewObject. -merge(OldObject, NewObject) -> - NewObj1 = apply_updates(NewObject), - OldObject#r_object{contents=lists:umerge(lists:usort(NewObject#r_object.contents), - lists:usort(OldObject#r_object.contents)), - vclock=vclock:merge([OldObject#r_object.vclock, - NewObj1#r_object.vclock]), - updatemetadata=dict:store(clean, true, dict:new()), - updatevalue=undefined}. - -%% @spec apply_updates(riak_object()) -> riak_object() -%% @doc Promote pending updates (made with the update_value() and -%% update_metadata() calls) to this riak_object. -apply_updates(Object=#r_object{}) -> - VL = case Object#r_object.updatevalue of - undefined -> - [C#r_content.value || C <- Object#r_object.contents]; - _ -> - [Object#r_object.updatevalue] - end, - MD = case dict:find(clean, Object#r_object.updatemetadata) of - {ok,_} -> - MDs = [C#r_content.metadata || C <- Object#r_object.contents], - case Object#r_object.updatevalue of - undefined -> MDs; - _ -> [hd(MDs)] - end; - error -> - [dict:erase(clean,Object#r_object.updatemetadata) || _X <- VL] - end, - Contents = [#r_content{metadata=M,value=V} || {M,V} <- lists:zip(MD, VL)], - Object#r_object{contents=Contents, + +% @doc Promote pending updates (made with the update_value() and +% update_metadata() calls) to this riak_object. +-spec apply_updates(riak_object()) -> riak_object(). +apply_updates(Object = #r_object{updatemetadata = Updatemetadata, + updatevalue = VL}) -> + CurrentContents = get_contents(Object), + UpdatedContents = + case VL of + undefined -> + case dict:find(clean, Updatemetadata) of + {ok,_} -> CurrentContents; %% no changes in values or metadata + error -> + NewMD = dict:erase(clean, Updatemetadata), + dvvset:map( + fun (R) -> #r_content{metadata=NewMD, value=R#r_content.value} end, + CurrentContents) + end; + _ -> + MD = case dict:find(clean, Updatemetadata) of + {ok,_} -> + hd(get_metadatas(Object)); + error -> + dict:erase(clean, Updatemetadata) + end, + NewR = #r_content{metadata=MD,value=VL}, + %% extract the causal information + VersionVector = dvvset:join(CurrentContents), + %% construct a new clock with the same causal information as the previous, + %% but with the new value only. + dvvset:new(VersionVector,[NewR]) + end, + Object#r_object{contents=UpdatedContents, updatemetadata=dict:store(clean, true, dict:new()), updatevalue=undefined}. -%% @spec bucket(riak_object()) -> bucket() %% @doc Return the containing bucket for this riak_object. +-spec bucket(riak_object()) -> bucket(). bucket(#r_object{bucket=Bucket}) -> Bucket. -%% @spec key(riak_object()) -> key() %% @doc Return the key for this riak_object. +-spec key(riak_object()) -> key(). key(#r_object{key=Key}) -> Key. -%% @spec vclock(riak_object()) -> vclock:vclock() -%% @doc Return the vector clock for this riak_object. -vclock(#r_object{vclock=VClock}) -> VClock. +%% @doc Return the logical clock for this riak_object. +-spec get_vclock(riak_object()) -> dvvset:vector(). +get_vclock(#r_object{contents=Clock}) -> dvvset:join(Clock). -%% @spec value_count(riak_object()) -> non_neg_integer() %% @doc Return the number of values (siblings) of this riak_object. -value_count(#r_object{contents=Contents}) -> length(Contents). +-spec value_count(riak_object()) -> non_neg_integer(). +value_count(#r_object{contents=Contents}) -> dvvset:size(Contents). -%% @spec get_contents(riak_object()) -> [{dict(), value()}] %% @doc Return the contents (a list of {metadata, value} tuples) for %% this riak_object. -get_contents(#r_object{contents=Contents}) -> - [{Content#r_content.metadata, Content#r_content.value} || - Content <- Contents]. +-spec get_md_values(riak_object()) -> [{dict(), term()}]. +get_md_values(#r_object{contents=Contents}) -> + [{C#r_content.metadata, C#r_content.value} || C <- dvvset:values(Contents)]. + +%% @doc Return the contents (dvvset:clock()) as is. +-spec get_contents(riak_object()) -> dvvset:clock(). +get_contents(#r_object{contents=Contents}) -> Contents. -%% @spec get_metadata(riak_object()) -> dict() %% @doc Assert that this riak_object has no siblings and return its associated %% metadata. This function will fail with a badmatch error if the %% object has siblings (value_count() > 1). -get_metadata(O=#r_object{}) -> - % this blows up intentionally (badmatch) if more than one content value! - [{Metadata,_V}] = get_contents(O), - Metadata. +-spec get_metadata(riak_object()) -> dict(). +get_metadata(#r_object{contents=Contents}) -> + % this blows up intentionally (badmatch) if more than one content value! + 1 = dvvset:size(Contents), + V = dvvset:last(fun riak_object:compare_content_dates/2, Contents), + V#r_content.metadata. -%% @spec get_metadatas(riak_object()) -> [dict()] %% @doc Return a list of the metadata values for this riak_object. -get_metadatas(#r_object{contents=Contents}) -> - [Content#r_content.metadata || Content <- Contents]. +-spec get_metadatas(riak_object()) -> [dict()]. +get_metadatas(#r_object{contents=C}) -> + [V#r_content.metadata || V <- dvvset:values(C)]. -%% @spec get_values(riak_object()) -> [value()] %% @doc Return a list of object values for this riak_object. -get_values(#r_object{contents=C}) -> [Content#r_content.value || Content <- C]. +-spec get_values(riak_object()) -> [value()]. +get_values(#r_object{contents=C}) -> + [V#r_content.value || V <- dvvset:values(C)]. -%% @spec get_value(riak_object()) -> value() %% @doc Assert that this riak_object has no siblings and return its associated %% value. This function will fail with a badmatch error if the object %% has siblings (value_count() > 1). -get_value(Object=#r_object{}) -> - % this blows up intentionally (badmatch) if more than one content value! - [{_M,Value}] = get_contents(Object), - Value. +-spec get_value(riak_object()) -> value(). +get_value(#r_object{contents=C}) -> + % this blows up intentionally (badmatch) if more than one content value! + 1 = dvvset:size(C), + V = dvvset:last(fun riak_object:compare_content_dates/2, C), + V#r_content.value. -%% @spec update_metadata(riak_object(), dict()) -> riak_object() %% @doc Set the updated metadata of an object to M. +-spec update_metadata(riak_object(), dict()) -> riak_object(). update_metadata(Object=#r_object{}, M) -> Object#r_object{updatemetadata=dict:erase(clean, M)}. -%% @spec update_value(riak_object(), value()) -> riak_object() %% @doc Set the updated value of an object to V +-spec update_value(riak_object(), value()) -> riak_object(). update_value(Object=#r_object{}, V) -> Object#r_object{updatevalue=V}. -%% @spec get_update_metadata(riak_object()) -> dict() %% @doc Return the updated metadata of this riak_object. +-spec get_update_metadata(riak_object()) -> dict(). get_update_metadata(#r_object{updatemetadata=UM}) -> UM. -%% @spec get_update_value(riak_object()) -> value() %% @doc Return the updated value of this riak_object. +-spec get_update_value(riak_object()) -> value(). get_update_value(#r_object{updatevalue=UV}) -> UV. -%% @spec set_vclock(riak_object(), vclock:vclock()) -> riak_object() -%% @doc INTERNAL USE ONLY. Set the vclock of riak_object O to V. -set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. +%% @doc Set a clock of the riak_object using a version vector, +%% obtained in a get_vclock. +-spec set_vclock(riak_object(), dvvset:vector()) -> riak_object(). +set_vclock(Object, Clock) -> + Vs = dvvset:values(get_contents(apply_updates(Object))), + %% set the contents to a new clock with the same causal information + %% as the version vector, but with the new list of values. + Object#r_object{contents=dvvset:new(Clock,Vs)}. + +%% @doc INTERNAL USE ONLY. Set the contents of riak_object +%% to the Contents. Normal clients should use the +%% set_update_[value|metadata]() + apply_updates() method for changing +%% object contents. +-spec set_contents(riak_object(), riak_content()) -> riak_object(). +set_contents(Object=#r_object{}, Contents) -> + Object#r_object{contents=Contents}. + +%% @doc Increment the entry for Id in O's vclock. +-spec increment_vclock(riak_object(), any(), non_neg_integer()) -> riak_object(). +increment_vclock(Object=#r_object{}, Id, _TS) -> + increment_vclock(Object, Id). + +%% @doc Increment the entry for Id in O's vclock (ignore timestamp since we are not pruning). +-spec increment_vclock(riak_object(), any()) -> riak_object(). +increment_vclock(Object=#r_object{contents=Conts}, Id) -> + Object#r_object{contents=dvvset:update(Conts, Id)}. + +-spec update_vclock(riak_object(), riak_object(), any()) -> riak_object(). +update_vclock(ObjectC=#r_object{contents=ContC}, #r_object{contents=ContR}, Id) -> + ObjectC#r_object{contents=dvvset:update(ContC, ContR, Id)}. + +-spec update_vclock(riak_object(), riak_object(), any(), non_neg_integer()) -> riak_object(). +update_vclock(ObjectC=#r_object{}, ObjectR=#r_object{}, Id, _TS) -> + update_vclock(ObjectC, ObjectR, Id). -%% @doc Increment the entry for ClientId in O's vclock. --spec increment_vclock(riak_object(), vclock:vclock_node()) -> riak_object(). -increment_vclock(Object=#r_object{}, ClientId) -> - Object#r_object{vclock=vclock:increment(ClientId, Object#r_object.vclock)}. -%% @doc Increment the entry for ClientId in O's vclock. --spec increment_vclock(riak_object(), vclock:vclock_node(), vclock:timestamp()) -> riak_object(). -increment_vclock(Object=#r_object{}, ClientId, Timestamp) -> - Object#r_object{vclock=vclock:increment(ClientId, Timestamp, Object#r_object.vclock)}. %% @doc Prepare a list of index specifications %% to pass to the backend. This function is for @@ -381,27 +408,45 @@ index_data(Obj) -> assemble_index_specs(Indexes, IndexOp) -> [{IndexOp, Index, Value} || {Index, Value} <- Indexes]. -%% @spec set_contents(riak_object(), [{dict(), value()}]) -> riak_object() -%% @doc INTERNAL USE ONLY. Set the contents of riak_object to the -%% {Metadata, Value} pairs in MVs. Normal clients should use the -%% set_update_[value|metadata]() + apply_updates() method for changing -%% object contents. -set_contents(Object=#r_object{}, MVs) when is_list(MVs) -> - Object#r_object{contents=[#r_content{metadata=M,value=V} || {M, V} <- MVs]}. -%% @spec to_json(riak_object()) -> {struct, list(any())} %% @doc Converts a riak_object into its JSON equivalent +-spec to_json(riak_object()) -> {struct, list(any())}. to_json(Obj=#r_object{}) -> - {_,Vclock} = riak_kv_wm_utils:vclock_header(Obj), - {struct, [{<<"bucket">>, riak_object:bucket(Obj)}, - {<<"key">>, riak_object:key(Obj)}, - {<<"vclock">>, list_to_binary(Vclock)}, - {<<"values">>, - [{struct, - [{<<"metadata">>, jsonify_metadata(MD)}, - {<<"data">>, V}]} - || {MD, V} <- riak_object:get_contents(Obj) - ]}]}. + {CV,AV} = get_contents(Obj), + {struct, + [ {<<"bucket">>, bucket(Obj)}, + {<<"key">>, key(Obj)}, + {<<"contents">>, + { struct, + [ {<<"causal_values">>, + [ {struct, + [ {<<"id">>, riak_kv_wm_utils:encode_vclock(Id)}, + {<<"counter">>, riak_kv_wm_utils:encode_vclock(Counter)}, + {<<"values">>, + [ {struct, + [ {<<"metadata">>, jsonify_metadata(Rcont#r_content.metadata)}, + {<<"value">>, Rcont#r_content.value} + ] + } || Rcont <- Values + ] + } + ] + } || {Id, Counter, Values} <- CV + ] + }, + {<<"anonym_values">>, + [ {struct, + [ {<<"metadata">>, jsonify_metadata(Rcont#r_content.metadata)}, + {<<"value">>, Rcont#r_content.value} + ] + } || Rcont <- AV + ] + } + ] + } + } + ] + }. -spec from_json(any()) -> riak_object(). from_json({struct, Obj}) -> @@ -409,12 +454,9 @@ from_json({struct, Obj}) -> from_json(Obj) -> Bucket = proplists:get_value(<<"bucket">>, Obj), Key = proplists:get_value(<<"key">>, Obj), - VClock0 = proplists:get_value(<<"vclock">>, Obj), - VClock = binary_to_term(zlib:unzip(base64:decode(VClock0))), - [{struct, Values}] = proplists:get_value(<<"values">>, Obj), - RObj0 = riak_object:new(Bucket, Key, <<"">>), - RObj1 = riak_object:set_vclock(RObj0, VClock), - riak_object:set_contents(RObj1, dejsonify_values(Values, [])). + C = proplists:get_value(<<"contents">>, Obj), + RObj0 = new(Bucket, Key, <<"">>), + set_contents(RObj0, dejsonify_contents(C)). jsonify_metadata(MD) -> MDJS = fun({LastMod, Now={_,_,_}}) -> @@ -473,10 +515,32 @@ jsonify_proplist(List) -> end end, dict:new(), List)). + + +dejsonify_contents({struct, [{<<"causal_values">>, V},{<<"anonym_values">>, A}]}) -> + V1 = dejsonify_contents(V,[]), + A1 = dejsonify_values(A), + {V1,A1}. + + +dejsonify_contents([], Accum) -> + lists:reverse(Accum); +dejsonify_contents([{struct, [ + {<<"id">>, Id}, + {<<"counter">>, Counter}, + {<<"values">>, Values}]} | T], Accum) -> + V = dejsonify_values(Values), + Id2 = binary_to_term(zlib:unzip(base64:decode(Id))), + Counter2 = binary_to_term(zlib:unzip(base64:decode(Counter))), + dejsonify_contents(T, [{Id2,Counter2,V}|Accum]). + + +dejsonify_values(V) -> dejsonify_values(V,[]). dejsonify_values([], Accum) -> lists:reverse(Accum); -dejsonify_values([{<<"metadata">>, {struct, MD0}}, - {<<"data">>, D}|T], Accum) -> +dejsonify_values([{struct, [ + {<<"metadata">>, {struct, MD0}}, + {<<"value">>, V}]} | T], Accum) -> Converter = fun({Key, Val}) -> case Key of <<"Links">> -> @@ -493,7 +557,7 @@ dejsonify_values([{<<"metadata">>, {struct, MD0}}, end end, MD = dict:from_list([Converter(KV) || KV <- MD0]), - dejsonify_values(T, [{MD, D}|Accum]). + dejsonify_values(T, [#r_content{metadata=MD, value=V}|Accum]). %% @doc convert structs back into proplists dejsonify_meta_value({struct, PList}) -> @@ -505,41 +569,8 @@ dejsonify_meta_value({struct, PList}) -> [{Key, dejsonify_meta_value(V)}|Acc] end, [], PList); dejsonify_meta_value(Value) -> Value. - - -is_updated(_Object=#r_object{updatemetadata=M,updatevalue=V}) -> - case dict:find(clean, M) of - error -> true; - {ok,_} -> - case V of - undefined -> false; - _ -> true - end - end. -syntactic_merge(CurrentObject, NewObject) -> - %% Paranoia in case objects were incorrectly stored - %% with update information. Vclock is not updated - %% but since no data is lost the objects will be - %% fixed if rewritten. - UpdatedNew = case is_updated(NewObject) of - true -> apply_updates(NewObject); - false -> NewObject - end, - UpdatedCurr = case is_updated(CurrentObject) of - true -> apply_updates(CurrentObject); - false -> CurrentObject - end, - - case ancestors([UpdatedCurr, UpdatedNew]) of - [] -> merge(UpdatedCurr, UpdatedNew); - [Ancestor] -> - case equal(Ancestor, UpdatedCurr) of - true -> UpdatedNew; - false -> UpdatedCurr - end - end. -ifdef(TEST). @@ -564,14 +595,9 @@ update_test() -> V2 = riak_object:get_value(O2), {O,O2}. -ancestor_test() -> +reconcile_test() -> {O,O2} = update_test(), O3 = riak_object:increment_vclock(O2,self()), - [O] = riak_object:ancestors([O,O3]), - {O,O3}. - -reconcile_test() -> - {O,O3} = ancestor_test(), O3 = riak_object:reconcile([O,O3],true), O3 = riak_object:reconcile([O,O3],false), {O,O3}. @@ -588,7 +614,7 @@ merge2_test() -> O1 = riak_object:increment_vclock(object_test(), node1), O2 = riak_object:increment_vclock(riak_object:new(B,K,V), node2), O3 = riak_object:syntactic_merge(O1, O2), - [node1, node2] = [N || {N,_} <- riak_object:vclock(O3)], + [node1, node2] = dvvset:ids(riak_object:get_contents(O3)), 2 = riak_object:value_count(O3). merge3_test() -> @@ -635,8 +661,9 @@ inequality_value_test() -> inequality_multivalue_test() -> O1 = riak_object:new(<<"test">>, <<"a">>, "value"), - [C] = riak_object:get_contents(O1), - O1p = riak_object:set_contents(O1, [C,C]), + C = riak_object:get_contents(O1), + 1 = riak_object:value_count(O1), + O1p = riak_object:set_contents(O1, {[C,C],[]}), false = riak_object:equal(O1, O1p), false = riak_object:equal(O1p, O1). @@ -723,24 +750,6 @@ get_update_metadata_test() -> riak_object:get_update_metadata( riak_object:update_metadata(O, NewMD))). -is_updated_test() -> - O = riak_object:new(<<"test">>, <<"test">>, test), - ?assertNot(is_updated(O)), - OMu = riak_object:update_metadata( - O, dict:store(<<"X-Test-Update">>, "testupdate", - riak_object:get_metadata(O))), - ?assert(is_updated(OMu)), - OVu = riak_object:update_value(O, testupdate), - ?assert(is_updated(OVu)). - -remove_duplicates_test() -> - O0 = riak_object:new(<<"test">>, <<"test">>, zero), - O1 = riak_object:new(<<"test">>, <<"test">>, one), - ND = remove_duplicate_objects([O0, O0, O1, O1, O0, O1]), - ?assertEqual(2, length(ND)), - ?assert(lists:member(O0, ND)), - ?assert(lists:member(O1, ND)). - new_with_ctype_test() -> O = riak_object:new(<<"b">>, <<"k">>, <<"{\"a\":1}">>, "application/json"), ?assertEqual("application/json", dict:fetch(?MD_CTYPE, riak_object:get_metadata(O))). @@ -768,14 +777,34 @@ jsonify_round_trip_test() -> {?MD_INDEX, Indexes}, {?MD_LINKS, Links}]), O = riak_object:new(<<"b">>, <<"k">>, <<"{\"a\":1}">>, MD), + [V] = dvvset:values(get_contents(O)), O2 = from_json(to_json(O)), ?assertEqual(bucket(O), bucket(O2)), ?assertEqual(key(O), key(O2)), - ?assert(vclock:equal(vclock(O), vclock(O2))), ?assertEqual(lists:sort(Meta), lists:sort(dict:fetch(?MD_USERMETA, get_metadata(O2)))), ?assertEqual(Links, dict:fetch(?MD_LINKS, get_metadata(O2))), ?assertEqual(lists:sort(Indexes), lists:sort(index_data(O2))), - ?assertEqual(get_contents(O), get_contents(O2)). + O3 = increment_vclock(O,"a"), + O3a = set_contents(O3, dvvset:new(dvvset:join(get_contents(O3)),[V])), + O4 = increment_vclock(O3a,"a"), + O4a = set_contents(O4, dvvset:new(dvvset:join(get_contents(O4)),[V])), + O5 = update_vclock(O4a,O3a,"b"), + O3b = from_json(to_json(O3a)), + O4b = from_json(to_json(O4a)), + O5b = from_json(to_json(O5)), + ?assert(dvvset:equal(get_contents(O), get_contents(O2))), + ?assert(dvvset:equal(get_contents(O3a), get_contents(O3b))), + ?assert(dvvset:equal(get_contents(O4a), get_contents(O4b))), + ?assert(dvvset:equal(get_contents(O5), get_contents(O5b))), + ?assertEqual(get_contents(O), get_contents(O2)), + ?assertEqual(get_contents(O3a), get_contents(O3b)), + ?assertEqual(get_contents(O4a), get_contents(O4b)), + ?assertEqual(get_contents(O5), get_contents(O5b)), + ?assertEqual(get_md_values(O), get_md_values(O2)), + ?assertEqual(get_md_values(O3a), get_md_values(O3b)), + ?assertEqual(get_md_values(O4a), get_md_values(O4b)), + ?assertEqual(get_md_values(O5), get_md_values(O5b)). + check_most_recent({V1, T1, D1}, {V2, T2, D2}) -> MD1 = dict:store(<<"X-Riak-Last-Modified">>, T1, D1), @@ -784,16 +813,16 @@ check_most_recent({V1, T1, D1}, {V2, T2, D2}) -> O1 = riak_object:new(<<"test">>, <<"a">>, V1, MD1), O2 = riak_object:new(<<"test">>, <<"a">>, V2, MD2), - C1 = hd(O1#r_object.contents), - C2 = hd(O2#r_object.contents), + C1 = (dvvset:last(fun riak_object:compare_content_dates/2, get_contents(O1))), + C2 = (dvvset:last(fun riak_object:compare_content_dates/2, get_contents(O2))), - C3 = most_recent_content([C1, C2]), - C4 = most_recent_content([C2, C1]), + C3 = most_recent_content(dvvset:new([C1, C2])), + C4 = most_recent_content(dvvset:new([C2, C1])), ?assertEqual(C3, C4), - C3#r_content.value. - + (dvvset:last(fun riak_object:compare_content_dates/2, C3))#r_content.value. + determinstic_most_recent_test() -> D = calendar:datetime_to_gregorian_seconds( diff --git a/test/fsm_eqc_util.erl b/test/fsm_eqc_util.erl index 6b14874268..300e564bdb 100644 --- a/test/fsm_eqc_util.erl +++ b/test/fsm_eqc_util.erl @@ -141,7 +141,7 @@ build_riak_obj(B,K,Vc,Val,tombstone) -> add_tombstone(Obj). add_tombstone(Obj) -> - [{M,V}] = riak_object:get_contents(Obj), + [{M,V}] = riak_object:get_md_values(Obj), NewM = dict:store(<<"X-Riak-Deleted">>, true, M), riak_object:set_contents(Obj, [{NewM, V}]).