Skip to content

Commit

Permalink
Merge ab4b0a4 into bc5b8c1
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Feb 6, 2024
2 parents bc5b8c1 + ab4b0a4 commit ff12862
Show file tree
Hide file tree
Showing 10 changed files with 517 additions and 32 deletions.
15 changes: 15 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -2407,4 +2407,19 @@ inline bool SendPutToGroup(const TActorContext &ctx, ui32 groupId, TTabletStorag
// TODO(alexvru): check if return status is actually needed?
}

inline bool SendPatchToGroup(const TActorContext &ctx, ui32 groupId, TTabletStorageInfo *storage,
THolder<TEvBlobStorage::TEvPatch> event, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
auto checkGroupId = [&] {
const TLogoBlobID &id = event->PatchedId;
const ui32 expectedGroupId = storage->GroupFor(id.Channel(), id.Generation());
const TLogoBlobID &originalId = event->OriginalId;
const ui32 expectedOriginalGroupId = storage->GroupFor(originalId.Channel(), originalId.Generation());
return id.TabletID() == storage->TabletID && expectedGroupId != Max<ui32>() && groupId == expectedGroupId && event->OriginalGroupId == expectedOriginalGroupId;
};
Y_VERIFY_S(checkGroupId(), "groupIds# (" << event->OriginalGroupId << ',' << groupId << ") does not match actual ones LogoBlobIds# (" <<
event->OriginalId.ToString() << ',' << event->PatchedId.ToString() << ')');
return SendToBSProxy(ctx, groupId, event.Release(), cookie, std::move(traceId));
// TODO(alexvru): check if return status is actually needed?
}

} // NKikimr
6 changes: 6 additions & 0 deletions ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ namespace NKikimr {
"not implemented")), 0, ev->Cookie);
}

void Handle(TEvBlobStorage::TEvPatch::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM10, "TEvPatch", (Msg, ev->Get()->ToString()));
Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
}

template<typename TOut, typename TIn>
TOut *CopyExecutionRelay(TIn *in, TOut *out) {
out->ExecutionRelay = std::move(in->ExecutionRelay);
Expand All @@ -80,6 +85,7 @@ namespace NKikimr {
hFunc(TEvBlobStorage::TEvRange, Handle);
hFunc(TEvBlobStorage::TEvCollectGarbage, Handle);
hFunc(TEvBlobStorage::TEvStatus, Handle);
hFunc(TEvBlobStorage::TEvPatch, Handle);

hFunc(TEvents::TEvPoisonPill, HandlePoison);
hFunc(TEvBlobStorage::TEvConfigureProxy, Handle);
Expand Down
59 changes: 59 additions & 0 deletions ydb/core/blobstorage/dsproxy/mock/model.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <cstring>
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_space_color.h>

Expand Down Expand Up @@ -149,6 +150,64 @@ namespace NFake {
return result.release();
}

TEvBlobStorage::TEvPatchResult* Handle(TEvBlobStorage::TEvPatch *msg) {
// ensure we have full blob id, with PartId set to zero
const TLogoBlobID& id = msg->PatchedId;
Y_ABORT_UNLESS(id == id.FullID());

// validate put against set blocks
if (IsBlocked(id.TabletID(), id.Generation())) {
return new TEvBlobStorage::TEvPatchResult(NKikimrProto::BLOCKED, id, GetStorageStatusFlags(), GroupId, 0.f);
}

// check if this blob is not being collected -- writing such blob is a violation of BS contract
Y_ABORT_UNLESS(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data());


const TLogoBlobID& originalId = msg->OriginalId;
auto it = Blobs.find(originalId);
if (it == Blobs.end()) {
// ensure this blob is not under GC
Y_ABORT_UNLESS(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data());
return new TEvBlobStorage::TEvPatchResult(NKikimrProto::ERROR, id, GetStorageStatusFlags(), GroupId, 0.f);
}

auto& data = it->second;
// TODO(kruall): check bad diffs
TString buffer = TString::Uninitialized(data.Buffer.GetSize());
auto originalBuffer = data.Buffer.GetContiguousSpan();
memcpy(buffer.begin(), originalBuffer.data(), buffer.size());
for (ui32 diffIdx = 0; diffIdx < msg->DiffCount; ++diffIdx) {
auto &diff = msg->Diffs[diffIdx];
auto diffBuffer = diff.Buffer.GetContiguousSpan();
memcpy(buffer.begin() + diff.Offset, diffBuffer.data(), diffBuffer.size());
}


// validate that there are no blobs with the same gen/step, channel, cookie, but with different size
const TLogoBlobID base(id.TabletID(), id.Generation(), id.Step(), id.Channel(), 0, id.Cookie());
auto iter = Blobs.lower_bound(base);
if (iter != Blobs.end()) {
const TLogoBlobID& existing = iter->first;
Y_ABORT_UNLESS(
id.TabletID() != existing.TabletID() ||
id.Generation() != existing.Generation() ||
id.Step() != existing.Step() ||
id.Cookie() != existing.Cookie() ||
id.Channel() != existing.Channel() ||
id == existing,
"id# %s existing# %s", id.ToString().data(), existing.ToString().data());
if (id == existing) {
Y_ABORT_UNLESS(iter->second.Buffer == buffer);
}
}

// put an entry into logo blobs database and reply with success
Blobs.emplace(id, TRope(buffer));

return new TEvBlobStorage::TEvPatchResult(NKikimrProto::OK, id, GetStorageStatusFlags(), GroupId, 0.f);
}

TEvBlobStorage::TEvBlockResult* Handle(TEvBlobStorage::TEvBlock *msg) {
NKikimrProto::EReplyStatus status = NKikimrProto::OK;

Expand Down
20 changes: 19 additions & 1 deletion ydb/core/keyvalue/keyvalue_intermediate.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,30 @@ struct TIntermediate {
struct TSetExecutorFastLogPolicy {
bool IsAllowed;
};
struct TPatch {
struct TDiff {
ui32 Offset;
TRope Buffer;
};

TString OriginalKey;
TLogoBlobID OriginalBlobId;
TString PatchedKey;
TLogoBlobID PatchedBlobId;

NKikimrProto::EReplyStatus Status;
TStorageStatusFlags StatusFlags;

TVector<TDiff> Diffs;
};

using TCmd = std::variant<TWrite, TDelete, TRename, TCopyRange, TConcat>;
using TCmd = std::variant<TWrite, TDelete, TRename, TCopyRange, TConcat, TPatch>;
using TReadCmd = std::variant<TRead, TRangeRead>;

TDeque<TRead> Reads;
TDeque<TRangeRead> RangeReads;
TDeque<TWrite> Writes;
TDeque<TPatch> Patches;
TDeque<TDelete> Deletes;
TDeque<TRename> Renames;
TDeque<TCopyRange> CopyRanges;
Expand All @@ -120,6 +137,7 @@ struct TIntermediate {

TStackVec<TCmd, 1> Commands;
TStackVec<ui32, 1> WriteIndices;
TStackVec<ui32, 1> PatchIndices;
std::optional<TReadCmd> ReadCommand;

ui64 WriteCount = 0;
Expand Down
Loading

0 comments on commit ff12862

Please sign in to comment.