diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 34c07e6feaa3..afbe5784a99b 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -1468,6 +1468,10 @@ struct TEvBlobStorage { } } + static ui8 BlobPlacementKind(const TLogoBlobID &blob) { + return blob.Hash() % BaseDomainsCount; + } + static bool GetBlobIdWithSamePlacement(const TLogoBlobID &originalId, TLogoBlobID *patchedId, ui32 bitsForBruteForce, ui32 originalGroupId, ui32 currentGroupId) { @@ -2407,4 +2411,19 @@ inline bool SendPutToGroup(const TActorContext &ctx, ui32 groupId, TTabletStorag // TODO(alexvru): check if return status is actually needed? } +inline bool SendPatchToGroup(const TActorContext &ctx, ui32 groupId, TTabletStorageInfo *storage, + THolder event, ui64 cookie = 0, NWilson::TTraceId traceId = {}) { + auto checkGroupId = [&] { + const TLogoBlobID &id = event->PatchedId; + const ui32 expectedGroupId = storage->GroupFor(id.Channel(), id.Generation()); + const TLogoBlobID &originalId = event->OriginalId; + const ui32 expectedOriginalGroupId = storage->GroupFor(originalId.Channel(), originalId.Generation()); + return id.TabletID() == storage->TabletID && expectedGroupId != Max() && groupId == expectedGroupId && event->OriginalGroupId == expectedOriginalGroupId; + }; + Y_VERIFY_S(checkGroupId(), "groupIds# (" << event->OriginalGroupId << ',' << groupId << ") does not match actual ones LogoBlobIds# (" << + event->OriginalId.ToString() << ',' << event->PatchedId.ToString() << ')'); + return SendToBSProxy(ctx, groupId, event.Release(), cookie, std::move(traceId)); + // TODO(alexvru): check if return status is actually needed? 
+} + } // NKikimr diff --git a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp index 777ecf4b7ebf..1a39d931f91f 100644 --- a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp +++ b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp @@ -55,6 +55,11 @@ namespace NKikimr { "not implemented")), 0, ev->Cookie); } + void Handle(TEvBlobStorage::TEvPatch::TPtr& ev) { + STLOG(PRI_DEBUG, BS_PROXY, BSPM10, "TEvPatch", (Msg, ev->Get()->ToString())); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); + } + template TOut *CopyExecutionRelay(TIn *in, TOut *out) { out->ExecutionRelay = std::move(in->ExecutionRelay); @@ -80,6 +85,7 @@ namespace NKikimr { hFunc(TEvBlobStorage::TEvRange, Handle); hFunc(TEvBlobStorage::TEvCollectGarbage, Handle); hFunc(TEvBlobStorage::TEvStatus, Handle); + hFunc(TEvBlobStorage::TEvPatch, Handle); hFunc(TEvents::TEvPoisonPill, HandlePoison); hFunc(TEvBlobStorage::TEvConfigureProxy, Handle); diff --git a/ydb/core/blobstorage/dsproxy/mock/model.h b/ydb/core/blobstorage/dsproxy/mock/model.h index a3f545578754..59accba10d84 100644 --- a/ydb/core/blobstorage/dsproxy/mock/model.h +++ b/ydb/core/blobstorage/dsproxy/mock/model.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -149,6 +150,64 @@ namespace NFake { return result.release(); } + TEvBlobStorage::TEvPatchResult* Handle(TEvBlobStorage::TEvPatch *msg) { + // ensure we have full blob id, with PartId set to zero + const TLogoBlobID& id = msg->PatchedId; + Y_ABORT_UNLESS(id == id.FullID()); + + // validate put against set blocks + if (IsBlocked(id.TabletID(), id.Generation())) { + return new TEvBlobStorage::TEvPatchResult(NKikimrProto::BLOCKED, id, GetStorageStatusFlags(), GroupId, 0.f); + } + + // check if this blob is not being collected -- writing such blob is a violation of BS contract + Y_ABORT_UNLESS(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data()); + + + const TLogoBlobID& 
originalId = msg->OriginalId; + auto it = Blobs.find(originalId); + if (it == Blobs.end()) { + // ensure this blob is not under GC + Y_ABORT_UNLESS(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data()); + return new TEvBlobStorage::TEvPatchResult(NKikimrProto::ERROR, id, GetStorageStatusFlags(), GroupId, 0.f); + } + + auto& data = it->second; + // TODO(kruall): check bad diffs + TString buffer = TString::Uninitialized(data.Buffer.GetSize()); + auto originalBuffer = data.Buffer.GetContiguousSpan(); + memcpy(buffer.Detach(), originalBuffer.data(), buffer.size()); + for (ui32 diffIdx = 0; diffIdx < msg->DiffCount; ++diffIdx) { + auto &diff = msg->Diffs[diffIdx]; + auto diffBuffer = diff.Buffer.GetContiguousSpan(); + memcpy(buffer.Detach() + diff.Offset, diffBuffer.data(), diffBuffer.size()); + } + + + // validate that there are no blobs with the same gen/step, channel, cookie, but with different size + const TLogoBlobID base(id.TabletID(), id.Generation(), id.Step(), id.Channel(), 0, id.Cookie()); + auto iter = Blobs.lower_bound(base); + if (iter != Blobs.end()) { + const TLogoBlobID& existing = iter->first; + Y_ABORT_UNLESS( + id.TabletID() != existing.TabletID() || + id.Generation() != existing.Generation() || + id.Step() != existing.Step() || + id.Cookie() != existing.Cookie() || + id.Channel() != existing.Channel() || + id == existing, + "id# %s existing# %s", id.ToString().data(), existing.ToString().data()); + if (id == existing) { + Y_ABORT_UNLESS(iter->second.Buffer == buffer); + } + } + + // put an entry into logo blobs database and reply with success + Blobs.emplace(id, TRope(buffer)); + + return new TEvBlobStorage::TEvPatchResult(NKikimrProto::OK, id, GetStorageStatusFlags(), GroupId, 0.f); + } + TEvBlobStorage::TEvBlockResult* Handle(TEvBlobStorage::TEvBlock *msg) { NKikimrProto::EReplyStatus status = NKikimrProto::OK; diff --git a/ydb/core/keyvalue/keyvalue_intermediate.h b/ydb/core/keyvalue/keyvalue_intermediate.h index fb2895d22a1a..0eb903be0de5 
100644 --- a/ydb/core/keyvalue/keyvalue_intermediate.h +++ b/ydb/core/keyvalue/keyvalue_intermediate.h @@ -102,13 +102,30 @@ struct TIntermediate { struct TSetExecutorFastLogPolicy { bool IsAllowed; }; + struct TPatch { + struct TDiff { + ui32 Offset; + TRope Buffer; + }; + + TString OriginalKey; + TLogoBlobID OriginalBlobId; + TString PatchedKey; + TLogoBlobID PatchedBlobId; + + NKikimrProto::EReplyStatus Status; + TStorageStatusFlags StatusFlags; + + TVector Diffs; + }; - using TCmd = std::variant; + using TCmd = std::variant; using TReadCmd = std::variant; TDeque Reads; TDeque RangeReads; TDeque Writes; + TDeque Patches; TDeque Deletes; TDeque Renames; TDeque CopyRanges; @@ -120,6 +137,7 @@ struct TIntermediate { TStackVec Commands; TStackVec WriteIndices; + TStackVec PatchIndices; std::optional ReadCommand; ui64 WriteCount = 0; diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index 73e6cc76e4bf..140146b04b05 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -765,6 +765,25 @@ TLogoBlobID TKeyValueState::AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx return id; } +TLogoBlobID TKeyValueState::AllocatePatchedLogoBlobId(ui32 size, ui32 storageChannelIdx, TLogoBlobID originalBlobId, ui64 requestUid) { + ui32 generation = ExecutorGeneration; + TLogoBlobID id; + using TEvPatch = TEvBlobStorage::TEvPatch; + do { + id = TLogoBlobID(TabletId, generation, NextLogoBlobStep, storageChannelIdx, size, NextLogoBlobCookie); + if (NextLogoBlobCookie < TLogoBlobID::MaxCookie) { + NextLogoBlobCookie++; + } else { + Step(); + } + } while (TEvPatch::BlobPlacementKind(id) != TEvPatch::BlobPlacementKind(originalBlobId)); + Y_ABORT_UNLESS(!CollectOperation || THelpers::GenerationStep(id) > + THelpers::TGenerationStep(CollectOperation->Header.GetCollectGeneration(), CollectOperation->Header.GetCollectStep())); + ++InFlightForStep[id.Step()]; + ++RequestUidStepToCount[std::make_tuple(requestUid, 
id.Step())]; + return id; +} + void TKeyValueState::RequestExecute(THolder &intermediate, ISimpleDb &db, const TActorContext &ctx, const TTabletStorageInfo *info) { if (IsDamaged) { @@ -1089,6 +1108,38 @@ void TKeyValueState::ProcessCmd(TIntermediate::TWrite &request, } } +void TKeyValueState::ProcessCmd(TIntermediate::TPatch &request, + NKikimrClient::TKeyValueResponse::TPatchResult *legacyResponse, + NKikimrKeyValue::StorageChannel *response, + ISimpleDb &db, const TActorContext &ctx, TRequestStat &/*stat*/, ui64 unixTime, + TIntermediate* /*intermediate*/) +{ + TIndexRecord& record = Index[request.PatchedKey]; + Dereference(record, db, ctx); + + record.Chain = {}; + + record.Chain.push_back(TIndexRecord::TChainItem(request.PatchedBlobId, 0)); + CountWriteRecord(request.PatchedBlobId); // TODO(kruall) change to CountPatchRecord + + ui32 storage_channel = request.PatchedBlobId.Channel() + MainStorageChannelInPublicApi; + // ctx.Send(ChannelBalancerActorId, new TChannelBalancer::TEvReportWriteLatency(channel, request.Latency)); + + record.CreationUnixTime = unixTime; + UpdateKeyValue(request.PatchedKey, record, db, ctx); + + if (legacyResponse) { + legacyResponse->SetStatus(NKikimrProto::OK); + legacyResponse->SetStatusFlags(request.StatusFlags.Raw); + } + if (response) { + response->set_status(NKikimrKeyValue::Statuses::RSTATUS_OK); + response->set_status_flag(GetStatusFlag(request.StatusFlags)); + response->set_storage_channel(storage_channel); + } +} + + void TKeyValueState::ProcessCmd(const TIntermediate::TDelete &request, NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse, NKikimrKeyValue::StorageChannel */*response*/, @@ -1396,6 +1447,10 @@ void TKeyValueState::CmdCmds(THolder &intermediate, ISimpleDb &db wasWrite = true; return intermediate->Response.AddWriteResult(); } + if constexpr (std::is_same_v) { + wasWrite = true; + return intermediate->Response.AddPatchResult(); + } if constexpr (std::is_same_v) { return 
intermediate->Response.AddDeleteRangeResult(); } @@ -1517,6 +1572,29 @@ TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TWrit return {}; } +TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TPatch &cmd, TKeySet& keys, ui32 index) const +{ + auto it = keys.find(cmd.OriginalKey); + if (it == keys.end()) { + TStringStream str; + str << "KeyValue# " << TabletId + << " InputKey# " << EscapeC(cmd.OriginalKey) << " does not exist in CmdPatch(" << index << ")" + << " Marker# KV28"; + return {NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, str.Str()}; + } + + if (!IsKeyLengthValid(cmd.PatchedKey)) { + TStringStream str; + str << "KeyValue# " << TabletId + << " Key length in CmdPatch(" << index << ") is " << cmd.PatchedKey.length() << " but max is " << MaxKeySize + << " Marker# KV27"; + return {NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, str.Str()}; + } + keys.insert(cmd.PatchedKey); + return {}; +} + + TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TGetStatus &/*cmd*/, TKeySet& /*keys*/, ui32 /*index*/) const { return {}; } @@ -1589,6 +1667,7 @@ void TKeyValueState::ProcessCmds(THolder &intermediate, ISimpleDb success = success && CheckCmds(intermediate, intermediate->Concats, ctx, keys, info); success = success && CheckCmds(intermediate, intermediate->Deletes, ctx, keys, info); success = success && CheckCmds(intermediate, intermediate->Writes, ctx, keys, info); + success = success && CheckCmds(intermediate, intermediate->Patches, ctx, keys, info); success = success && CheckCmds(intermediate, ctx, keys, info); success = success && CheckCmds(intermediate, intermediate->GetStatuses, ctx, keys, info); if (!success) { @@ -2302,6 +2381,111 @@ bool TKeyValueState::PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TK return false; } +bool TKeyValueState::PrepareCmdPatch(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, + TEvKeyValue::TEvRequest& ev, THolder 
&intermediate, const TTabletStorageInfo *info) { + + intermediate->PatchIndices.reserve(kvRequest.CmdPatchSize()); + for (ui32 patchIdx = 0; patchIdx < kvRequest.CmdPatchSize(); ++patchIdx) { + auto& request = kvRequest.GetCmdPatch(patchIdx); + intermediate->PatchIndices.push_back(intermediate->Commands.size()); + auto& cmd = intermediate->Commands.emplace_back(TIntermediate::TPatch()); + auto& interm = std::get(cmd); + + if (!request.HasOriginalKey()) { + TStringStream str; + str << "KeyValue# " << TabletId; + str << " Missing OriginalKey in CmdPatch(" << patchIdx << ") Marker# KV35"; + ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate); + return true; + } + interm.OriginalKey = request.GetOriginalKey(); + if (!request.HasPatchedKey()) { + TStringStream str; + str << "KeyValue# " << TabletId; + str << " Missing PatchedKey in CmdPatch(" << patchIdx << ") Marker# KV37"; + ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate); + return true; + } + interm.PatchedKey = request.GetPatchedKey(); + + auto it = Index.find(request.GetOriginalKey()); + if (it == Index.end()) { + TStringStream str; + str << "KeyValue# " << TabletId; + str << " OriginalKey does not exist in CmdPatch(" << patchIdx << ") Marker# KV39"; + ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate); + return true; + } + + auto &[key, indexRecord] = *it; + if (indexRecord.Chain.size() > 1) { + TStringStream str; + str << "KeyValue# " << TabletId; + str << " Patching works only with one-blob values, not many-blob values in CmdPatch(" << patchIdx << ") Marker# KV62"; + ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate); + return true; + } + + if (indexRecord.Chain.size() == 0 || 
indexRecord.Chain[0].IsInline()) { + TStringStream str; + str << "KeyValue# " << TabletId; + str << " Patching works only with one-blob values, not inline or empty values in CmdPatch(" << patchIdx << ") Marker# KV63"; + ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate); + return true; + } + + interm.Diffs.resize(request.DiffsSize()); + for (ui32 diffIdx = 0; diffIdx < request.DiffsSize(); ++diffIdx) { + auto &diff = request.GetDiffs(diffIdx); + switch (diff.GetDataCase()) { + case NKikimrClient::TKeyValueRequest::TCmdPatch::TDiff::kValue: + interm.Diffs[diffIdx].Buffer = TRope(diff.GetValue()); + break; + + case NKikimrClient::TKeyValueRequest::TCmdPatch::TDiff::kPayloadId: + interm.Diffs[diffIdx].Buffer = ev.GetPayload(diff.GetPayloadId()); + break; + + case NKikimrClient::TKeyValueRequest::TCmdPatch::TDiff::DATA_NOT_SET: { + TStringStream str; + str << "KeyValue# " << TabletId; + str << " Missing Value/PayloadId in CmdPatch(" << patchIdx << ") Diff(" << diffIdx << ") Marker# KV38"; + ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate); + return true; + } + } + interm.Diffs[diffIdx].Offset = diff.GetOffset(); + } + + ui32 storageChannelIdx = BLOB_CHANNEL; + if (request.HasStorageChannel()) { + auto storageChannel = request.GetStorageChannel(); + ui32 storageChannelOffset = (ui32)storageChannel; + + if (storageChannelOffset == NKikimrClient::TKeyValueRequest::INLINE) { + TStringStream str; + str << "KeyValue# " << TabletId; + str << " Patched blob can't be stored in the inline channel in CmdPatch(" << patchIdx << ") Marker# KV91"; + ReplyError(ctx, str.Str(), NMsgBusProxy::MSTATUS_INTERNALERROR, NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, intermediate); + return true; + } + + storageChannelIdx = storageChannelOffset + BLOB_CHANNEL; + ui32 endChannel = info->Channels.size(); + if (storageChannelIdx >= endChannel) { + 
storageChannelIdx = BLOB_CHANNEL; + LOG_INFO_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId + << " CmdPatch StorageChannel# " << storageChannelOffset + << " does not exist, using MAIN"); + } + } + + interm.OriginalBlobId = indexRecord.Chain[0].LogoBlobId; + interm.PatchedBlobId = TLogoBlobID(0, 0, 0, storageChannelIdx, interm.OriginalBlobId.BlobSize(), 0); + } + return false; +} + TKeyValueState::TPrepareResult TKeyValueState::InitGetStatusCommand(TIntermediate::TGetStatus &cmd, NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel, const TTabletStorageInfo *info) @@ -2899,13 +3083,27 @@ void TKeyValueState::RegisterRequestActor(const TActorContext &ctx, THolderRequestUid); + ui32 newRefCount = ++RefCounts[patch.PatchedBlobId]; + Y_ABORT_UNLESS(newRefCount == 1); + intermediate->RefCountsIncr.emplace_back(patch.PatchedBlobId, true); + }; + for (auto& write : intermediate->Writes) { fixWrite(write); } + for (auto& patch : intermediate->Patches) { + fixPatch(patch); + } for (auto& cmd : intermediate->Commands) { if (auto *write = std::get_if(&cmd)) { fixWrite(*write); } + if (auto *patch = std::get_if(&cmd)) { + fixPatch(*patch); + } } ctx.RegisterWithSameMailbox(CreateKeyValueStorageRequest(std::move(intermediate), info, tabletGeneration)); @@ -3202,6 +3400,7 @@ bool TKeyValueState::PrepareIntermediate(TEvKeyValue::TEvRequest::TPtr &ev, THol error = error || PrepareCmdConcat(ctx, request, intermediate); error = error || PrepareCmdDelete(ctx, request, intermediate); error = error || PrepareCmdWrite(ctx, request, *ev->Get(), intermediate, info); + error = error || PrepareCmdPatch(ctx, request, *ev->Get(), intermediate, info); error = error || PrepareCmdGetStatus(ctx, request, intermediate, info); error = error || PrepareCmdTrimLeakedBlobs(ctx, request, intermediate, info); error = error || PrepareCmdSetExecutorFastLogPolicy(ctx, request, intermediate, info); diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h 
index 0b4241dda105..cca24582fbad 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -347,6 +347,10 @@ class TKeyValueState { NKikimrClient::TKeyValueResponse::TWriteResult *legacyResponse, NKikimrKeyValue::StorageChannel *response, ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); + void ProcessCmd(TIntermediate::TPatch &request, + NKikimrClient::TKeyValueResponse::TPatchResult *legacyResponse, + NKikimrKeyValue::StorageChannel *response, + ISimpleDb &db, const TActorContext &ctx, TRequestStat &stat, ui64 unixTime, TIntermediate *intermediate); void ProcessCmd(const TIntermediate::TDelete &request, NKikimrClient::TKeyValueResponse::TDeleteRangeResult *legacyResponse, NKikimrKeyValue::StorageChannel *response, @@ -386,6 +390,7 @@ class TKeyValueState { TCheckResult CheckCmd(const TIntermediate::TRename &cmd, TKeySet& keys, ui32 index) const; TCheckResult CheckCmd(const TIntermediate::TDelete &cmd, TKeySet& keys, ui32 index) const; TCheckResult CheckCmd(const TIntermediate::TWrite &cmd, TKeySet& keys, ui32 index) const; + TCheckResult CheckCmd(const TIntermediate::TPatch &cmd, TKeySet& keys, ui32 index) const; TCheckResult CheckCmd(const TIntermediate::TCopyRange &cmd, TKeySet& keys, ui32 index) const; TCheckResult CheckCmd(const TIntermediate::TConcat &cmd, TKeySet& keys, ui32 index) const; TCheckResult CheckCmd(const TIntermediate::TGetStatus &cmd, TKeySet& keys, ui32 index) const; @@ -398,6 +403,7 @@ class TKeyValueState { void Step(); TLogoBlobID AllocateLogoBlobId(ui32 size, ui32 storageChannelIdx, ui64 requestUid); + TLogoBlobID AllocatePatchedLogoBlobId(ui32 size, ui32 storageChannelIdx, TLogoBlobID originalBlobId, ui64 requestUid); TIntrusivePtr& GetCollectOperation() { return CollectOperation; } @@ -507,6 +513,8 @@ class TKeyValueState { THolder &intermediate); bool PrepareCmdWrite(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, 
TEvKeyValue::TEvRequest& ev, THolder &intermediate, const TTabletStorageInfo *info); + bool PrepareCmdPatch(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, TEvKeyValue::TEvRequest& ev, + THolder &intermediate, const TTabletStorageInfo *info); bool PrepareCmdGetStatus(const TActorContext &ctx, NKikimrClient::TKeyValueRequest &kvRequest, THolder &intermediate, const TTabletStorageInfo *info); bool PrepareCmdCopyRange(const TActorContext& ctx, NKikimrClient::TKeyValueRequest& kvRequest, diff --git a/ydb/core/keyvalue/keyvalue_storage_request.cpp b/ydb/core/keyvalue/keyvalue_storage_request.cpp index 9aa0a83cdfa1..b3e5bce941ca 100644 --- a/ydb/core/keyvalue/keyvalue_storage_request.cpp +++ b/ydb/core/keyvalue/keyvalue_storage_request.cpp @@ -17,19 +17,21 @@ namespace NKeyValue { class TKeyValueStorageRequest : public TActorBootstrapped { using TBase = TActorBootstrapped; - ui64 ReadRequestsSent; - ui64 ReadRequestsReplied; - ui64 WriteRequestsSent; - ui64 WriteRequestsReplied; - ui64 GetStatusRequestsSent; - ui64 GetStatusRequestsReplied; - ui64 RangeRequestsSent; - ui64 RangeRequestsReplied; - ui64 InFlightLimit; + ui64 ReadRequestsSent = 0; + ui64 ReadRequestsReplied = 0; + ui64 WriteRequestsSent = 0; + ui64 WriteRequestsReplied = 0; + ui64 PatchRequestsSent = 0; + ui64 PatchRequestsReplied = 0; + ui64 GetStatusRequestsSent = 0; + ui64 GetStatusRequestsReplied = 0; + ui64 RangeRequestsSent = 0; + ui64 RangeRequestsReplied = 0; + ui64 InFlightLimit = 50; ui64 InFlightLimitSeq; - ui64 InFlightQueries; - ui64 InFlightRequestsLimit; - ui64 NextInFlightBatchCookie; + ui64 InFlightQueries = 0; + ui64 InFlightRequestsLimit = 10; + ui64 NextInFlightBatchCookie = 1; ui32 TabletGeneration; THolder IntermediateResults; @@ -72,19 +74,7 @@ class TKeyValueStorageRequest : public TActorBootstrapped&& intermediate, const TTabletStorageInfo *tabletInfo, ui32 tabletGeneration) - : ReadRequestsSent(0) - , ReadRequestsReplied(0) - , WriteRequestsSent(0) - , 
WriteRequestsReplied(0) - , GetStatusRequestsSent(0) - , GetStatusRequestsReplied(0) - , RangeRequestsSent(0) - , RangeRequestsReplied(0) - , InFlightLimit(50) - , InFlightLimitSeq(intermediate->SequentialReadLimit) - , InFlightQueries(0) - , InFlightRequestsLimit(10) - , NextInFlightBatchCookie(1) + : InFlightLimitSeq(intermediate->SequentialReadLimit) , TabletGeneration(tabletGeneration) , IntermediateResults(std::move(intermediate)) , TabletInfo(const_cast(tabletInfo)) @@ -152,6 +142,7 @@ class TKeyValueStorageRequest : public TActorBootstrappedTabletID; str << " EvPut cookie# " << (ui64)cookie; str << " > writes#" << (ui32)IntermediateResults->Writes.size(); + str << " > commands#" << (ui32)IntermediateResults->Commands.size(); str << " Marker# KV25"; ReplyErrorAndDie(ctx, str.Str()); return; @@ -171,6 +162,63 @@ class TKeyValueStorageRequest : public TActorBootstrappedGet()->GroupId; + CheckYellow(ev->Get()->StatusFlags, groupId); + + NKikimrProto::EReplyStatus status = ev->Get()->Status; + if (status != NKikimrProto::OK) { + Cerr << "Patch Not OK response!\n" << NKikimrProto::EReplyStatus_Name(status) << Endl; + Cerr << ev->Get()->ErrorReason << Endl; + TInstant now = TAppData::TimeProvider->Now(); + + TStringStream str; + str << "KeyValue# " << TabletInfo->TabletID; + str << " Unexpected EvPatch result# " << NKikimrProto::EReplyStatus_Name(status).data(); + str << " Deadline# " << IntermediateResults->Deadline.MilliSeconds(); + str << " Now# " << now.MilliSeconds(); + str << " GotAt# " << IntermediateResults->Stat.IntermediateCreatedAt.MilliSeconds(); + str << " LastSuccess# " << LastSuccess; + str << " SinceLastSuccess# " << (now - LastSuccess).MilliSeconds(); + str << " EnqueuedAs# " << IntermediateResults->Stat.EnqueuedAs; + str << " ErrorReason# " << ev->Get()->ErrorReason; + str << " Marker# KV70"; + + NLog::EPriority logPriority = ErrorStateMuteChecker.Register(now, MuteDuration); + + ReplyErrorAndDie(ctx, str.Str(), + status == NKikimrProto::TIMEOUT ? 
NMsgBusProxy::MSTATUS_TIMEOUT : NMsgBusProxy::MSTATUS_INTERNALERROR, + logPriority); + return; + } + + Cerr << "Patch OK response!\n"; + ui64 cookie = ev->Cookie; + ui64 patchIdx = cookie; + if (patchIdx >= IntermediateResults->Patches.size() && patchIdx >= IntermediateResults->Commands.size()) { + TStringStream str; + str << "KeyValue# " << TabletInfo->TabletID; + str << " EvPatch cookie# " << (ui64)cookie; + str << " > patches#" << (ui32)IntermediateResults->Patches.size(); + str << " > commands#" << (ui32)IntermediateResults->Commands.size(); + str << " Marker# KV71"; + ReplyErrorAndDie(ctx, str.Str()); + return; + } + + TIntermediate::TPatch *patch = nullptr; + if (IntermediateResults->Patches.size()) { + patch = &IntermediateResults->Patches[patchIdx]; + } else { + patch = &std::get(IntermediateResults->Commands[patchIdx]); + } + patch->StatusFlags.Merge(ev->Get()->StatusFlags.Raw); + ++PatchRequestsReplied; + IntermediateResults->Stat.GroupWrittenBytes[std::make_pair(ev->Get()->Id.Channel(), groupId)] += ev->Get()->Id.BlobSize(); + IntermediateResults->Stat.GroupWrittenIops[std::make_pair(ev->Get()->Id.Channel(), groupId)] += 1; + UpdateRequest(ctx); + } + void Handle(TEvBlobStorage::TEvStatusResult::TPtr &ev, const TActorContext &ctx) { TInstant now = TAppData::TimeProvider->Now(); ui64 durationMs = (now - GetStatusSentAt).MilliSeconds(); @@ -350,28 +398,41 @@ class TKeyValueStorageRequest : public TActorBootstrappedWrites) { if (write.Status == NKikimrProto::UNKNOWN) { write.Status = NKikimrProto::OK; } } + for (auto& patch : IntermediateResults->Patches) { + if (patch.Status == NKikimrProto::UNKNOWN) { + patch.Status = NKikimrProto::OK; + } + } for (auto& getStatus : IntermediateResults->GetStatuses) { if (getStatus.Status == NKikimrProto::UNKNOWN) { getStatus.Status = NKikimrProto::OK; } } for (auto& cmd : IntermediateResults->Commands) { - if (!std::holds_alternative(cmd)) { - continue; + if (std::holds_alternative(cmd)) { + auto& write = std::get(cmd); + 
if (write.Status == NKikimrProto::UNKNOWN) { + write.Status = NKikimrProto::OK; + } } - auto& write = std::get(cmd); - if (write.Status == NKikimrProto::UNKNOWN) { - write.Status = NKikimrProto::OK; + if (std::holds_alternative(cmd)) { + auto& patch = std::get(cmd); + if (patch.Status == NKikimrProto::UNKNOWN) { + patch.Status = NKikimrProto::OK; + } } } IntermediateResults->Stat.YellowStopChannels.reserve(YellowStopChannels.size()); @@ -453,6 +514,7 @@ class TKeyValueStorageRequest : public TActorBootstrapped void { + using Type = std::decay_t; + if constexpr (std::is_same_v) { + + ui32 originalGroupId = TabletInfo->GroupFor(request.OriginalBlobId.Channel(), request.OriginalBlobId.Generation()); + + TArrayHolder diffs(new TEvBlobStorage::TEvPatch::TDiff[request.Diffs.size()]); + for (ui32 diffIdx = 0; diffIdx < request.Diffs.size(); ++diffIdx) { + auto &diff = request.Diffs[diffIdx]; + diffs[diffIdx].Buffer = TRcBuf(diff.Buffer); + diffs[diffIdx].Offset = diff.Offset; + } + + THolder patch( + new TEvBlobStorage::TEvPatch( + originalGroupId, request.OriginalBlobId, request.PatchedBlobId, TLogoBlobID::MaxCookie, + std::move(diffs), request.Diffs.size(), IntermediateResults->Deadline)); + + const ui32 groupId = TabletInfo->GroupFor(request.PatchedBlobId.Channel(), request.PatchedBlobId.Generation()); + Y_VERIFY_S(groupId != Max(), "Patch Blob# " << request.PatchedBlobId.ToString() << " is mapped to an invalid group (-1)!"); + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletInfo->TabletID + << " Send TEvPatch# " << patch->ToString() << " to groupId# " << groupId + << " now# " << TAppData::TimeProvider->Now().MilliSeconds() << " Marker# KV69"); + + + SendPatchToGroup(ctx, groupId, TabletInfo.Get(), std::move(patch), i, Span.GetTraceId()); + ++PatchRequestsSent; + } + }; + + for (ui64 i : IntermediateResults->PatchIndices) { + auto &cmd = IntermediateResults->Commands[i]; + Y_ABORT_UNLESS(std::holds_alternative(cmd)); + auto& patch = std::get(cmd); + 
sendPatch(i, patch); + } + + for (ui64 i = 0; i < IntermediateResults->Patches.size(); ++i) { + sendPatch(i, IntermediateResults->Patches[i]); + } + PutTimer.Reset(); + } + STFUNC(StateWait) { switch (ev->GetTypeRewrite()) { HFunc(TEvBlobStorage::TEvGetResult, Handle); HFunc(TEvBlobStorage::TEvPutResult, Handle); + HFunc(TEvBlobStorage::TEvPatchResult, Handle); HFunc(TEvBlobStorage::TEvStatusResult, Handle); HFunc(TEvBlobStorage::TEvRangeResult, Handle); HFunc(TEvents::TEvWakeup, Handle); diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index 27a5a79020a8..4a3663ed254a 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -1,6 +1,7 @@ #include "defs.h" #include "keyvalue.h" #include "keyvalue_flat_impl.h" +#include "keyvalue_intermediate.h" #include "keyvalue_state.h" #include #include @@ -177,6 +178,44 @@ void CmdWrite(const TDeque &keys, const TDeque &values, }); } +struct TDiff { + ui32 Offset; + TString Buffer; +}; + +void CmdPatch(const TString &originalKey, const TString &patchedKey, const TVector &diffs, + const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel, TTestContext &tc) { + TAutoPtr handle; + TEvKeyValue::TEvResponse *result; + THolder request; + DoWithRetry([&] { + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvKeyValue::TEvRequest); + auto *patch = request->Record.AddCmdPatch(); + patch->SetOriginalKey(originalKey); + patch->SetPatchedKey(patchedKey); + patch->SetStorageChannel(storageChannel); + for (ui64 idx = 0; idx < diffs.size(); ++idx) { + auto diff = patch->AddDiffs(); + diff->SetOffset(diffs[idx].Offset); + diff->SetValue(diffs[idx].Buffer); + } + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent(handle); + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); + 
UNIT_ASSERT_VALUES_EQUAL(result->Record.PatchResultSize(), 1); + + const auto &patchResult = result->Record.GetPatchResult(0); + UNIT_ASSERT(patchResult.HasStatus()); + UNIT_ASSERT_EQUAL(patchResult.GetStatus(), NKikimrProto::OK); + UNIT_ASSERT(patchResult.HasStatusFlags()); + UNIT_ASSERT(patchResult.GetStatusFlags() & ui32(NKikimrBlobStorage::StatusIsValid)); + return true; + }); +} + void CmdWrite(const TString &key, const TString &value, const NKikimrClient::TKeyValueRequest::EStorageChannel storageChannel, const NKikimrClient::TKeyValueRequest::EPriority priority, TTestContext &tc) { @@ -961,6 +1000,20 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsThenResponseOk) { }); } +Y_UNIT_TEST(TestWriteReadPatchRead) { + TTestContext tc; + TFinalizer finalizer(tc); + bool activeZone = false; + tc.Prepare(INITIAL_TEST_DISPATCH_NAME, [](TTestActorRuntime &){}, activeZone); + CmdWrite("key", "value", NKikimrClient::TKeyValueRequest::MAIN, + NKikimrClient::TKeyValueRequest::REALTIME, tc); + CmdRead({"key"}, NKikimrClient::TKeyValueRequest::REALTIME, + {"value"}, {}, tc); + TVector diffs = {TDiff{0, "m"}, TDiff{2, "t"}, TDiff{4, "r"}}; + CmdPatch("key", "key2", diffs, NKikimrClient::TKeyValueRequest::MAIN, tc); + CmdRead({"key2"}, NKikimrClient::TKeyValueRequest::REALTIME, + {"matur"}, {}, tc); +} Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) { TTestContext tc; diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto index 93042908fd1d..e55ea8973dde 100644 --- a/ydb/core/protos/msgbus.proto +++ b/ydb/core/protos/msgbus.proto @@ -207,6 +207,7 @@ message TResponse { repeated TKeyValueResponse.TGetStatusResult GetStatusResult = 909; optional TKeyValueResponse.TTrimLeakedBlobsResult TrimLeakedBlobsResult = 910; optional TKeyValueResponse.TSetExecutorFastLogPolicyResult SetExecutorFastLogPolicyResult = 911; + repeated TKeyValueResponse.TPatchResult PatchResult = 912; // THiveCreateTablet message TCreateTabletResult { diff --git 
a/ydb/core/protos/msgbus_kv.proto b/ydb/core/protos/msgbus_kv.proto index c8317d9d4c01..d85c2b2c5cb4 100644 --- a/ydb/core/protos/msgbus_kv.proto +++ b/ydb/core/protos/msgbus_kv.proto @@ -85,6 +85,20 @@ message TKeyValueRequest { message TCmdSetExecutorFastLogPolicy { optional bool IsAllowed = 1 [default = false]; // mandatory } + message TCmdPatch { + message TDiff { + optional uint32 Offset = 1 [default = 0]; + oneof Data { // mandatory + bytes Value = 2; + uint32 PayloadId = 3; + } + } + + optional bytes OriginalKey = 1; + optional bytes PatchedKey = 2; + repeated TDiff Diffs = 3; + optional EStorageChannel StorageChannel = 4; // (default = MAIN) + } optional uint64 TabletId = 1; // mandatory optional uint64 Generation = 2; // optional, no generation check is done if missing @@ -101,6 +115,7 @@ message TKeyValueRequest { repeated TCmdGetStatus CmdGetStatus = 13; optional TCmdTrimLeakedBlobs CmdTrimLeakedBlobs = 14; optional TCmdSetExecutorFastLogPolicy CmdSetExecutorFastLogPolicy = 15; + repeated TCmdPatch CmdPatch = 17; optional uint64 DeadlineInstantMs = 10; } @@ -146,6 +161,10 @@ message TKeyValueResponse { optional uint32 Status = 1; // EReplyStatus from ydb/core/protos/base.proto optional uint32 StatusFlags = 2; // A set of flags from EStatusFlags ydb/core/protos/blobstorage.proto } + message TPatchResult { + optional uint32 Status = 1; // EReplyStatus from ydb/core/protos/base.proto + optional uint32 StatusFlags = 2; // A set of flags from EStatusFlags ydb/core/protos/blobstorage.proto + } message TRenameResult { optional uint32 Status = 1; // EReplyStatus from ydb/core/protos/base.proto } @@ -181,6 +200,7 @@ message TKeyValueResponse { repeated TGetStatusResult GetStatusResult = 12; optional TTrimLeakedBlobsResult TrimLeakedBlobsResult = 13; optional TSetExecutorFastLogPolicyResult SetExecutorFastLogPolicyResult = 14; + repeated TPatchResult PatchResult = 15; optional string ErrorReason = 9; // When present contains human-readable error description }