Skip to content

Commit

Permalink
Added TIdWrapper class for BS_Controller
Browse files Browse the repository at this point in the history
Added TIdWrapper class for BS_Controller

Added hash for IdWrapper

Small fix

Small changes

Start testing

Wrapped TGroupId

Implemented TGroupId partly in bscontroller, blobstorage and dsproxy

Minor changes in TGroupId

Minor changes

Added IdWrapper for bscontroller, partly for blobstorage, nodewarden, dsproxy
  • Loading branch information
mregrock authored and Egor Kulin committed May 29, 2024
1 parent 05603fe commit 145e89c
Show file tree
Hide file tree
Showing 105 changed files with 683 additions and 516 deletions.
18 changes: 9 additions & 9 deletions ydb/core/base/blobstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ bool operator<(const TPDiskCategory x, const TPDiskCategory y) {
}

std::unique_ptr<TEvBlobStorage::TEvPutResult> TEvBlobStorage::TEvPut::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvPutResult>(status, Id, TStorageStatusFlags(), groupId, 0.0f);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvGetResult>(status, QuerySize, groupId);
for (ui32 i = 0; i < QuerySize; ++i) {
const auto& from = Queries[i];
Expand All @@ -68,14 +68,14 @@ std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorR
}

std::unique_ptr<TEvBlobStorage::TEvBlockResult> TEvBlobStorage::TEvBlock::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvBlockResult>(status);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvPatchResult> TEvBlobStorage::TEvPatch::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvPatchResult>(status, PatchedId, TStorageStatusFlags(), groupId, 0.0f);
res->ErrorReason = errorReason;
return res;
Expand All @@ -89,35 +89,35 @@ std::unique_ptr<TEvBlobStorage::TEvInplacePatchResult> TEvBlobStorage::TEvInplac
}

std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> TEvBlobStorage::TEvDiscover::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId/*groupId*/) {
auto res = std::make_unique<TEvDiscoverResult>(status, MinGeneration, 0);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvRangeResult> TEvBlobStorage::TEvRange::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvRangeResult>(status, From, To, groupId);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvCollectGarbageResult> TEvBlobStorage::TEvCollectGarbage::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvCollectGarbageResult>(status, TabletId, RecordGeneration, PerGenerationCounter, Channel);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvStatusResult> TEvBlobStorage::TEvStatus::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvStatusResult>(status, TStorageStatusFlags());
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> TEvBlobStorage::TEvAssimilate::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId/*groupId*/) {
return std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, errorReason);
}

Expand Down
42 changes: 23 additions & 19 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/base/services/blobstorage_service_id.h>
#include <ydb/core/base/blobstorage_grouptype.h>
#include <ydb/core/protos/base.pb.h>
#include <ydb/core/base/id_wrapper.h>
#include <ydb/core/protos/blobstorage_base.pb.h>
#include <ydb/core/protos/blobstorage_base3.pb.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
Expand Down Expand Up @@ -102,6 +103,8 @@ enum class EGroupConfigurationType : ui32 {
struct TGroupID {
TGroupID() = default;
TGroupID(const TGroupID&) = default;
TGroupID(const TIdWrapper<ui32, TGroupIdTag> wrapped_id)
: Raw(wrapped_id.GetRawId()) {}

TGroupID(EGroupConfigurationType configurationType, ui32 dataCenterId, ui32 groupLocalId) {
Set(configurationType, dataCenterId, groupLocalId);
Expand Down Expand Up @@ -886,6 +889,8 @@ struct TEvBlobStorage {
struct TEvInplacePatchResult;
struct TEvAssimilateResult;

using TGroupId = TIdWrapper<ui32, TGroupIdTag>;

struct TEvPut : public TEventLocal<TEvPut, EvPut> {
enum ETactic {
TacticMaxThroughput = 0,
Expand All @@ -905,7 +910,6 @@ struct TEvBlobStorage {
return "unknown";
}
};

const TLogoBlobID Id;
const TRcBuf Buffer; //FIXME(innokentii) const members prevent usage of move-semantics elsewhere
const TInstant Deadline;
Expand Down Expand Up @@ -975,14 +979,14 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvPutResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvPutResult : public TEventLocal<TEvPutResult, EvPutResult> {
NKikimrProto::EReplyStatus Status;
const TLogoBlobID Id;
const TStorageStatusFlags StatusFlags;
const ui32 GroupId;
const TGroupId GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
Expand All @@ -991,7 +995,7 @@ struct TEvBlobStorage {
const TString StorageId;

TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
TGroupId groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
Expand Down Expand Up @@ -1168,7 +1172,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvGetResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);

private:
void VerifySameTabletId() const {
Expand Down Expand Up @@ -1212,7 +1216,7 @@ struct TEvBlobStorage {
// todo: replace with queue-like thing
ui32 ResponseSz;
TArrayHolder<TResponse> Responses;
const ui32 GroupId;
const TGroupId GroupId;
ui32 BlockedGeneration = 0; // valid only for requests with non-zero TabletId and true AcquireBlockedGeneration.
TString DebugInfo;
TString ErrorReason;
Expand All @@ -1222,7 +1226,7 @@ struct TEvBlobStorage {
// to measure blobstorage->client hop
TInstant Sent;

TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, ui32 groupId)
TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, TGroupId groupId)
: Status(status)
, ResponseSz(sz)
, Responses(sz == 0 ? nullptr : new TResponse[sz])
Expand Down Expand Up @@ -1314,7 +1318,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvBlockResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvBlockResult : public TEventLocal<TEvBlockResult, EvBlockResult> {
Expand Down Expand Up @@ -1511,21 +1515,21 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvPatchResult> MakeErrorResponse(NKikimrProto::EReplyStatus status,
const TString& errorReason, ui32 groupId);
const TString& errorReason, TGroupId groupId);
};

struct TEvPatchResult : public TEventLocal<TEvPatchResult, EvPatchResult> {
NKikimrProto::EReplyStatus Status;
const TLogoBlobID Id;
const TStorageStatusFlags StatusFlags;
const ui32 GroupId;
const TGroupId GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
mutable NLWTrace::TOrbit Orbit;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvPatchResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
TGroupId groupId, float approximateFreeSpaceShare)
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
Expand Down Expand Up @@ -1688,7 +1692,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvDiscoverResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvDiscoverResult : public TEventLocal<TEvDiscoverResult, EvDiscoverResult> {
Expand Down Expand Up @@ -1793,7 +1797,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvRangeResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvRangeResult : public TEventLocal<TEvRangeResult, EvRangeResult> {
Expand All @@ -1817,13 +1821,13 @@ struct TEvBlobStorage {
NKikimrProto::EReplyStatus Status;
TLogoBlobID From;
TLogoBlobID To;

TVector<TResponse> Responses;
const ui32 GroupId;
const TGroupId GroupId;
TString ErrorReason;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, ui32 groupId)
TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, TGroupId groupId)
: Status(status)
, From(from)
, To(to)
Expand Down Expand Up @@ -1981,7 +1985,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvCollectGarbageResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvCollectGarbageResult : public TEventLocal<TEvCollectGarbageResult, EvCollectGarbageResult> {
Expand Down Expand Up @@ -2049,7 +2053,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvStatusResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvStatusResult : public TEventLocal<TEvStatusResult, EvStatusResult> {
Expand Down Expand Up @@ -2124,7 +2128,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvAssimilateResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvAssimilateResult : TEventLocal<TEvAssimilateResult, EvAssimilateResult> {
Expand Down
99 changes: 99 additions & 0 deletions ydb/core/base/id_wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once
#include "defs.h"
#include <compare>
#include <concepts>
#include <util/generic/string.h>
#include <util/string/builder.h>
using TString = TBasicString<char>;

class TGroupIdTag;
template <class T>
concept IntegralType = std::is_integral<T>::value;

template <class T>
concept StringType = std::convertible_to<T, TString>;
template <typename T, typename Tag> class TIdWrapper {
private:
T Raw;

public:
using Type = T;
using TTag = Tag;

constexpr TIdWrapper() noexcept : Raw(0) {}

~TIdWrapper() = default;

explicit TIdWrapper(const T &value) : Raw(value) {}

TIdWrapper(TIdWrapper &&value) = default;

TIdWrapper(const TIdWrapper &other) = default;

TIdWrapper &operator=(const TIdWrapper &value) = default;

TIdWrapper &operator=(TIdWrapper &&value) = default;

TString ToString() const {
return TStringBuilder() << Raw;
}

void CopyToProto(NProtoBuf::Message *message,
void (NProtoBuf::Message::*pfn)(T value)) {
(message->*pfn)(*this);
}

static constexpr TIdWrapper FromValue(T value) noexcept {
return TIdWrapper(value);
}

static constexpr TIdWrapper Zero() noexcept { return TIdWrapper(); }

TIdWrapper &operator+=(const T &other) {
Raw += other.Raw;
return *this;
}

friend TIdWrapper operator+(const TIdWrapper &first,
const T &second) Y_WARN_UNUSED_RESULT {
return TIdWrapper(first->Raw + second);
}

TIdWrapper &operator++() {
Raw++;
return *this;
}

TIdWrapper operator++(int) {
TIdWrapper old = *this;
operator++();
return old;
}

friend std::ostream &operator<<(std::ostream &out, TIdWrapper &id) {
return out << id.Raw;
}

friend IOutputStream& operator<<(IOutputStream& out, const TIdWrapper& id) {
return out << id.Raw;
}

constexpr auto operator<=>(const TIdWrapper &) const = default;

T GetRawId() const { return Raw; }

friend std::hash<TIdWrapper<T, Tag>>;
friend THash<TIdWrapper<T, Tag>>;
};

template <typename T, typename Tag> struct std::hash<TIdWrapper<T, Tag>> {
std::size_t operator()(const TIdWrapper<T, Tag> &id) const {
return std::hash<T>{}(id.Raw);
}
};

template <typename T, typename Tag> struct THash<TIdWrapper<T, Tag>> {
std::size_t operator()(const TIdWrapper<T, Tag> &id) const {
return THash<T>()(id.Raw);
}
};
1 change: 1 addition & 0 deletions ydb/core/blob_depot/agent/agent_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "resolved_value.h"

#include <ydb/core/protos/blob_depot_config.pb.h>
#include <ydb/core/base/id_wrapper.h>

namespace NKikimr::NBlobDepot {

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blob_depot/agent/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace NKikimr::NBlobDepot {
switch (const ui32 type = event->Type()) {
case TEvBlobStorage::EvGet: {
auto& get = static_cast<TEvBlobStorage::TEvGet&>(*event);
response = get.MakeErrorResponse(NKikimrProto::OK, "proxy has vanished", groupId);
response = get.MakeErrorResponse(NKikimrProto::OK, "proxy has vanished", TIdWrapper<ui32, TGroupIdTag>(groupId));
auto& r = static_cast<TEvBlobStorage::TEvGetResult&>(*response);
for (size_t i = 0; i < r.ResponseSz; ++i) {
r.Responses[i].Status = NKikimrProto::NODATA;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blob_depot/agent/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ namespace NKikimr::NBlobDepot {
switch (Event->GetTypeRewrite()) {
#define XX(TYPE) \
case TEvBlobStorage::TYPE: \
response = Event->Get<TEvBlobStorage::T##TYPE>()->MakeErrorResponse(status, errorReason, Agent.VirtualGroupId); \
response = Event->Get<TEvBlobStorage::T##TYPE>()->MakeErrorResponse(status, errorReason, TIdWrapper<ui32, TGroupIdTag>(Agent.VirtualGroupId)); \
static_cast<TEvBlobStorage::T##TYPE##Result&>(*response).ExecutionRelay = std::move(ExecutionRelay); \
break; \
//
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blob_depot/agent/storage_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace NKikimr::NBlobDepot {
}

Response = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, Request.QuerySize,
Agent.VirtualGroupId);
TIdWrapper<ui32, TGroupIdTag>(Agent.VirtualGroupId));
AnswersRemain = Request.QuerySize;

if (Request.ReaderTabletData) {
Expand Down
Loading

0 comments on commit 145e89c

Please sign in to comment.