Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TIdWrapper for bs_controller and blobstorage #4966

Merged
merged 2 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions ydb/core/base/blobstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ bool operator<(const TPDiskCategory x, const TPDiskCategory y) {
}

std::unique_ptr<TEvBlobStorage::TEvPutResult> TEvBlobStorage::TEvPut::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvPutResult>(status, Id, TStorageStatusFlags(), groupId, 0.0f);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvGetResult>(status, QuerySize, groupId);
for (ui32 i = 0; i < QuerySize; ++i) {
const auto& from = Queries[i];
Expand All @@ -68,14 +68,14 @@ std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorR
}

std::unique_ptr<TEvBlobStorage::TEvBlockResult> TEvBlobStorage::TEvBlock::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvBlockResult>(status);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvPatchResult> TEvBlobStorage::TEvPatch::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvPatchResult>(status, PatchedId, TStorageStatusFlags(), groupId, 0.0f);
res->ErrorReason = errorReason;
return res;
Expand All @@ -89,35 +89,35 @@ std::unique_ptr<TEvBlobStorage::TEvInplacePatchResult> TEvBlobStorage::TEvInplac
}

std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> TEvBlobStorage::TEvDiscover::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId/*groupId*/) {
auto res = std::make_unique<TEvDiscoverResult>(status, MinGeneration, 0);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvRangeResult> TEvBlobStorage::TEvRange::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvRangeResult>(status, From, To, groupId);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvCollectGarbageResult> TEvBlobStorage::TEvCollectGarbage::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvCollectGarbageResult>(status, TabletId, RecordGeneration, PerGenerationCounter, Channel);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvStatusResult> TEvBlobStorage::TEvStatus::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvStatusResult>(status, TStorageStatusFlags());
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> TEvBlobStorage::TEvAssimilate::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId/*groupId*/) {
return std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, errorReason);
}

Expand Down
52 changes: 34 additions & 18 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/base/services/blobstorage_service_id.h>
#include <ydb/core/base/blobstorage_grouptype.h>
#include <ydb/core/protos/base.pb.h>
#include <ydb/core/base/blobstorage_common.h>
#include <ydb/core/protos/blobstorage_base.pb.h>
#include <ydb/core/protos/blobstorage_base3.pb.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
Expand All @@ -33,6 +34,7 @@ static constexpr ui64 MaxCollectGarbageFlagsPerMessage = 10000;
static constexpr TDuration VDiskCooldownTimeout = TDuration::Seconds(15);
static constexpr TDuration VDiskCooldownTimeoutOnProxy = TDuration::Seconds(12);


struct TStorageStatusFlags {
ui32 Raw = 0;

Expand Down Expand Up @@ -102,6 +104,8 @@ enum class EGroupConfigurationType : ui32 {
struct TGroupID {
TGroupID() = default;
TGroupID(const TGroupID&) = default;
TGroupID(const TGroupId wrappedId)
: Raw(wrappedId.GetRawId()) {}

TGroupID(EGroupConfigurationType configurationType, ui32 dataCenterId, ui32 groupLocalId) {
Set(configurationType, dataCenterId, groupLocalId);
Expand Down Expand Up @@ -437,6 +441,10 @@ inline IEventHandle *CreateEventForBSProxy(TActorId sender, ui32 groupId, IEvent
return CreateEventForBSProxy(sender, MakeBlobStorageProxyID(groupId), ev, cookie, std::move(traceId));
}

inline IEventHandle *CreateEventForBSProxy(TActorId sender, TGroupId groupId, IEventBase *ev, ui64 cookie, NWilson::TTraceId traceId = {}) {
return CreateEventForBSProxy(sender, MakeBlobStorageProxyID(groupId), ev, cookie, std::move(traceId));
}

inline bool SendToBSProxy(TActorId sender, TActorId recipient, IEventBase *ev, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
return TActivationContext::Send(CreateEventForBSProxy(sender, recipient, ev, cookie, std::move(traceId)));
}
Expand All @@ -455,6 +463,15 @@ inline bool SendToBSProxy(const TActorContext &ctx, ui32 groupId, IEventBase *ev
return ctx.Send(CreateEventForBSProxy(ctx.SelfID, groupId, ev, cookie, std::move(traceId)));
}

inline bool SendToBSProxy(TActorId sender, TGroupId groupId, IEventBase *ev, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
return TActivationContext::Send(CreateEventForBSProxy(sender, groupId, ev, cookie, std::move(traceId)));
}

inline bool SendToBSProxy(const TActorContext &ctx, TGroupId groupId, IEventBase *ev, ui64 cookie = 0,
NWilson::TTraceId traceId = {}) {
return ctx.Send(CreateEventForBSProxy(ctx.SelfID, groupId, ev, cookie, std::move(traceId)));
}

struct TEvBlobStorage {
enum EEv {
// user <-> proxy interface
Expand Down Expand Up @@ -908,7 +925,6 @@ struct TEvBlobStorage {
return "unknown";
}
};

const TLogoBlobID Id;
const TRcBuf Buffer; //FIXME(innokentii) const members prevent usage of move-semantics elsewhere
const TInstant Deadline;
Expand Down Expand Up @@ -978,14 +994,14 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvPutResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvPutResult : public TEventLocal<TEvPutResult, EvPutResult> {
NKikimrProto::EReplyStatus Status;
const TLogoBlobID Id;
const TStorageStatusFlags StatusFlags;
const ui32 GroupId;
const TGroupId GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
Expand All @@ -994,7 +1010,7 @@ struct TEvBlobStorage {
const TString StorageId;

TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
TGroupId groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
Expand Down Expand Up @@ -1171,7 +1187,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvGetResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);

private:
void VerifySameTabletId() const {
Expand Down Expand Up @@ -1215,7 +1231,7 @@ struct TEvBlobStorage {
// todo: replace with queue-like thing
ui32 ResponseSz;
TArrayHolder<TResponse> Responses;
const ui32 GroupId;
const TGroupId GroupId;
ui32 BlockedGeneration = 0; // valid only for requests with non-zero TabletId and true AcquireBlockedGeneration.
TString DebugInfo;
TString ErrorReason;
Expand All @@ -1225,7 +1241,7 @@ struct TEvBlobStorage {
// to measure blobstorage->client hop
TInstant Sent;

TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, ui32 groupId)
TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, TGroupId groupId)
: Status(status)
, ResponseSz(sz)
, Responses(sz == 0 ? nullptr : new TResponse[sz])
Expand Down Expand Up @@ -1317,7 +1333,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvBlockResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvBlockResult : public TEventLocal<TEvBlockResult, EvBlockResult> {
Expand Down Expand Up @@ -1514,21 +1530,21 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvPatchResult> MakeErrorResponse(NKikimrProto::EReplyStatus status,
const TString& errorReason, ui32 groupId);
const TString& errorReason, TGroupId groupId);
};

struct TEvPatchResult : public TEventLocal<TEvPatchResult, EvPatchResult> {
NKikimrProto::EReplyStatus Status;
const TLogoBlobID Id;
const TStorageStatusFlags StatusFlags;
const ui32 GroupId;
const TGroupId GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
mutable NLWTrace::TOrbit Orbit;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvPatchResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
TGroupId groupId, float approximateFreeSpaceShare)
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
Expand Down Expand Up @@ -1691,7 +1707,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvDiscoverResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvDiscoverResult : public TEventLocal<TEvDiscoverResult, EvDiscoverResult> {
Expand Down Expand Up @@ -1796,7 +1812,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvRangeResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvRangeResult : public TEventLocal<TEvRangeResult, EvRangeResult> {
Expand All @@ -1822,11 +1838,11 @@ struct TEvBlobStorage {
TLogoBlobID To;

TVector<TResponse> Responses;
const ui32 GroupId;
const TGroupId GroupId;
TString ErrorReason;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, ui32 groupId)
TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, TGroupId groupId)
: Status(status)
, From(from)
, To(to)
Expand Down Expand Up @@ -1984,7 +2000,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvCollectGarbageResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvCollectGarbageResult : public TEventLocal<TEvCollectGarbageResult, EvCollectGarbageResult> {
Expand Down Expand Up @@ -2052,7 +2068,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvStatusResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvStatusResult : public TEventLocal<TEvStatusResult, EvStatusResult> {
Expand Down Expand Up @@ -2127,7 +2143,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvAssimilateResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvAssimilateResult : TEventLocal<TEvAssimilateResult, EvAssimilateResult> {
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/base/blobstorage_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

#include <ydb/core/base/id_wrapper.h>

namespace NKikimr{

using TGroupId = TIdWrapper<ui32, TGroupIdTag>;

}


98 changes: 98 additions & 0 deletions ydb/core/base/id_wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#pragma once
#include "defs.h"
#include <compare>
#include <concepts>
#include <util/generic/string.h>
#include <util/string/builder.h>
using TString = TBasicString<char>;

class TGroupIdTag;

template <typename T, typename Tag> class TIdWrapper {
private:
T Raw = {};

public:
using Type = T;
using TTag = Tag;

constexpr TIdWrapper() noexcept {}

TIdWrapper(TIdWrapper &&value) = default;

TIdWrapper(const TIdWrapper &other) = default;

TIdWrapper &operator=(const TIdWrapper &value) = default;

TIdWrapper &operator=(TIdWrapper &&value) = default;

TString ToString() const { return TStringBuilder() << Raw; }

void CopyToProto(NProtoBuf::Message *message,
void (NProtoBuf::Message::*pfn)(T value)) {
(message->*pfn)(*this);
}

static constexpr TIdWrapper FromValue(T value) noexcept {
TIdWrapper id;
id.Raw = value;
return id;
}

template <typename TType, typename TProto>
static constexpr TIdWrapper FromProto(const TType *message,
TProto (TType::*pfn)() const) {
return FromValue((message->*pfn)());
}

static constexpr TIdWrapper Zero() noexcept { return TIdWrapper(); }

TIdWrapper &operator+=(const T &other) {
Raw += other.Raw;
return *this;
}

friend TIdWrapper operator+(const TIdWrapper &first,
const T &second) Y_WARN_UNUSED_RESULT {
return TIdWrapper(first->Raw + second);
}

TIdWrapper &operator++() {
Raw++;
return *this;
}

TIdWrapper operator++(int) {
TIdWrapper old = *this;
operator++();
return old;
}

friend std::ostream &operator<<(std::ostream &out, TIdWrapper &id) {
return out << id.Raw;
}

friend IOutputStream &operator<<(IOutputStream &out, const TIdWrapper &id) {
return out << id.Raw;
}

constexpr auto operator<=>(const TIdWrapper &) const = default;

T GetRawId() const { return Raw; }

friend std::hash<TIdWrapper<T, Tag>>;

friend THash<TIdWrapper<T, Tag>>;
};

template <typename T, typename Tag> struct std::hash<TIdWrapper<T, Tag>> {
std::size_t operator()(const TIdWrapper<T, Tag> &id) const {
return std::hash<T>{}(id.Raw);
}
};

template <typename T, typename Tag> struct THash<TIdWrapper<T, Tag>> {
std::size_t operator()(const TIdWrapper<T, Tag> &id) const {
return THash<T>()(id.Raw);
}
};
Loading
Loading