Skip to content

Commit

Permalink
Merge d0a470d into 2529557
Browse files Browse the repository at this point in the history
  • Loading branch information
mregrock authored Jun 14, 2024
2 parents 2529557 + d0a470d commit 558ffac
Show file tree
Hide file tree
Showing 119 changed files with 630 additions and 412 deletions.
18 changes: 9 additions & 9 deletions ydb/core/base/blobstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ bool operator<(const TPDiskCategory x, const TPDiskCategory y) {
}

std::unique_ptr<TEvBlobStorage::TEvPutResult> TEvBlobStorage::TEvPut::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvPutResult>(status, Id, TStorageStatusFlags(), groupId, 0.0f);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvGetResult>(status, QuerySize, groupId);
for (ui32 i = 0; i < QuerySize; ++i) {
const auto& from = Queries[i];
Expand All @@ -68,14 +68,14 @@ std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorR
}

std::unique_ptr<TEvBlobStorage::TEvBlockResult> TEvBlobStorage::TEvBlock::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvBlockResult>(status);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvPatchResult> TEvBlobStorage::TEvPatch::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvPatchResult>(status, PatchedId, TStorageStatusFlags(), groupId, 0.0f);
res->ErrorReason = errorReason;
return res;
Expand All @@ -89,35 +89,35 @@ std::unique_ptr<TEvBlobStorage::TEvInplacePatchResult> TEvBlobStorage::TEvInplac
}

std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> TEvBlobStorage::TEvDiscover::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId/*groupId*/) {
auto res = std::make_unique<TEvDiscoverResult>(status, MinGeneration, 0);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvRangeResult> TEvBlobStorage::TEvRange::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvRangeResult>(status, From, To, groupId);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvCollectGarbageResult> TEvBlobStorage::TEvCollectGarbage::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvCollectGarbageResult>(status, TabletId, RecordGeneration, PerGenerationCounter, Channel);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvStatusResult> TEvBlobStorage::TEvStatus::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvStatusResult>(status, TStorageStatusFlags());
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> TEvBlobStorage::TEvAssimilate::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId/*groupId*/) {
return std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, errorReason);
}

Expand Down
56 changes: 36 additions & 20 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/base/services/blobstorage_service_id.h>
#include <ydb/core/base/blobstorage_grouptype.h>
#include <ydb/core/protos/base.pb.h>
#include <ydb/core/base/blobstorage_common.h>
#include <ydb/core/protos/blobstorage_base.pb.h>
#include <ydb/core/protos/blobstorage_base3.pb.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
Expand All @@ -33,6 +34,7 @@ static constexpr ui64 MaxCollectGarbageFlagsPerMessage = 10000;
static constexpr TDuration VDiskCooldownTimeout = TDuration::Seconds(15);
static constexpr TDuration VDiskCooldownTimeoutOnProxy = TDuration::Seconds(12);


struct TStorageStatusFlags {
ui32 Raw = 0;

Expand Down Expand Up @@ -102,6 +104,8 @@ enum class EGroupConfigurationType : ui32 {
struct TGroupID {
TGroupID() = default;
TGroupID(const TGroupID&) = default;
TGroupID(const TGroupId wrappedId)
: Raw(wrappedId.GetRawId()) {}

TGroupID(EGroupConfigurationType configurationType, ui32 dataCenterId, ui32 groupLocalId) {
Set(configurationType, dataCenterId, groupLocalId);
Expand Down Expand Up @@ -437,6 +441,10 @@ inline IEventHandle *CreateEventForBSProxy(TActorId sender, ui32 groupId, IEvent
return CreateEventForBSProxy(sender, MakeBlobStorageProxyID(groupId), ev, cookie, std::move(traceId));
}

inline IEventHandle *CreateEventForBSProxy(TActorId sender, TGroupId groupId, IEventBase *ev, ui64 cookie, NWilson::TTraceId traceId = {}) {
return CreateEventForBSProxy(sender, MakeBlobStorageProxyID(groupId), ev, cookie, std::move(traceId));
}

inline bool SendToBSProxy(TActorId sender, TActorId recipient, IEventBase *ev, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
return TActivationContext::Send(CreateEventForBSProxy(sender, recipient, ev, cookie, std::move(traceId)));
}
Expand All @@ -455,6 +463,15 @@ inline bool SendToBSProxy(const TActorContext &ctx, ui32 groupId, IEventBase *ev
return ctx.Send(CreateEventForBSProxy(ctx.SelfID, groupId, ev, cookie, std::move(traceId)));
}

inline bool SendToBSProxy(TActorId sender, TGroupId groupId, IEventBase *ev, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
return TActivationContext::Send(CreateEventForBSProxy(sender, groupId, ev, cookie, std::move(traceId)));
}

inline bool SendToBSProxy(const TActorContext &ctx, TGroupId groupId, IEventBase *ev, ui64 cookie = 0,
NWilson::TTraceId traceId = {}) {
return ctx.Send(CreateEventForBSProxy(ctx.SelfID, groupId, ev, cookie, std::move(traceId)));
}

struct TEvBlobStorage {
enum EEv {
// user <-> proxy interface
Expand Down Expand Up @@ -888,7 +905,7 @@ struct TEvBlobStorage {
struct TEvPatchResult;
struct TEvInplacePatchResult;
struct TEvAssimilateResult;

struct TEvPut : public TEventLocal<TEvPut, EvPut> {
enum ETactic {
TacticMaxThroughput = 0,
Expand All @@ -908,7 +925,6 @@ struct TEvBlobStorage {
return "unknown";
}
};

const TLogoBlobID Id;
const TRcBuf Buffer; //FIXME(innokentii) const members prevent usage of move-semantics elsewhere
const TInstant Deadline;
Expand Down Expand Up @@ -978,14 +994,14 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvPutResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvPutResult : public TEventLocal<TEvPutResult, EvPutResult> {
NKikimrProto::EReplyStatus Status;
const TLogoBlobID Id;
const TStorageStatusFlags StatusFlags;
const ui32 GroupId;
const TGroupId GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
Expand All @@ -994,7 +1010,7 @@ struct TEvBlobStorage {
const TString StorageId;

TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
TGroupId groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
Expand Down Expand Up @@ -1171,7 +1187,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvGetResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);

private:
void VerifySameTabletId() const {
Expand Down Expand Up @@ -1215,7 +1231,7 @@ struct TEvBlobStorage {
// todo: replace with queue-like thing
ui32 ResponseSz;
TArrayHolder<TResponse> Responses;
const ui32 GroupId;
const TGroupId GroupId;
ui32 BlockedGeneration = 0; // valid only for requests with non-zero TabletId and true AcquireBlockedGeneration.
TString DebugInfo;
TString ErrorReason;
Expand All @@ -1225,7 +1241,7 @@ struct TEvBlobStorage {
// to measure blobstorage->client hop
TInstant Sent;

TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, ui32 groupId)
TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, TGroupId groupId)
: Status(status)
, ResponseSz(sz)
, Responses(sz == 0 ? nullptr : new TResponse[sz])
Expand Down Expand Up @@ -1317,7 +1333,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvBlockResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvBlockResult : public TEventLocal<TEvBlockResult, EvBlockResult> {
Expand Down Expand Up @@ -1514,21 +1530,21 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvPatchResult> MakeErrorResponse(NKikimrProto::EReplyStatus status,
const TString& errorReason, ui32 groupId);
const TString& errorReason, TGroupId groupId);
};

struct TEvPatchResult : public TEventLocal<TEvPatchResult, EvPatchResult> {
NKikimrProto::EReplyStatus Status;
const TLogoBlobID Id;
const TStorageStatusFlags StatusFlags;
const ui32 GroupId;
const TGroupId GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
mutable NLWTrace::TOrbit Orbit;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvPatchResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
TGroupId groupId, float approximateFreeSpaceShare)
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
Expand Down Expand Up @@ -1691,7 +1707,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvDiscoverResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvDiscoverResult : public TEventLocal<TEvDiscoverResult, EvDiscoverResult> {
Expand Down Expand Up @@ -1796,7 +1812,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvRangeResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvRangeResult : public TEventLocal<TEvRangeResult, EvRangeResult> {
Expand All @@ -1820,13 +1836,13 @@ struct TEvBlobStorage {
NKikimrProto::EReplyStatus Status;
TLogoBlobID From;
TLogoBlobID To;

TVector<TResponse> Responses;
const ui32 GroupId;
const TGroupId GroupId;
TString ErrorReason;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, ui32 groupId)
TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, TGroupId groupId)
: Status(status)
, From(from)
, To(to)
Expand Down Expand Up @@ -1984,7 +2000,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvCollectGarbageResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvCollectGarbageResult : public TEventLocal<TEvCollectGarbageResult, EvCollectGarbageResult> {
Expand Down Expand Up @@ -2052,7 +2068,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvStatusResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvStatusResult : public TEventLocal<TEvStatusResult, EvStatusResult> {
Expand Down Expand Up @@ -2127,7 +2143,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvAssimilateResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvAssimilateResult : TEventLocal<TEvAssimilateResult, EvAssimilateResult> {
Expand Down
96 changes: 96 additions & 0 deletions ydb/core/base/id_wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once
#include "defs.h"
#include <compare>
#include <concepts>
#include <util/generic/string.h>
#include <util/string/builder.h>
using TString = TBasicString<char>;

class TGroupIdTag;

template <typename T, typename Tag> class TIdWrapper {
private:
T Raw = {};

public:
using Type = T;
using TTag = Tag;

constexpr TIdWrapper() noexcept {}

TIdWrapper(TIdWrapper &&value) = default;

TIdWrapper(const TIdWrapper &other) = default;

TIdWrapper &operator=(const TIdWrapper &value) = default;

TIdWrapper &operator=(TIdWrapper &&value) = default;

TString ToString() const { return TStringBuilder() << Raw; }

void CopyToProto(NProtoBuf::Message *message,
void (NProtoBuf::Message::*pfn)(T value)) {
(message->*pfn)(*this);
}

static constexpr TIdWrapper FromValue(T value) noexcept {
TIdWrapper id;
id.Raw = value;
return id;
}

template <typename TType, typename TProto>
static constexpr TIdWrapper FromProto(const TType *message,
TProto (TType::*pfn)() const) {
return FromValue((message->*pfn)());
}

static constexpr TIdWrapper Zero() noexcept { return TIdWrapper(); }

TIdWrapper &operator+=(const T &other) {
Raw += other.Raw;
return *this;
}

friend TIdWrapper operator+(const TIdWrapper &first,
const T &second) Y_WARN_UNUSED_RESULT {
return TIdWrapper(first->Raw + second);
}
TIdWrapper &operator++() {
Raw++;
return *this;
}

TIdWrapper operator++(int) {
TIdWrapper old = *this;
operator++();
return old;
}

friend std::ostream &operator<<(std::ostream &out, TIdWrapper &id) {
return out << id.Raw;
}

friend IOutputStream &operator<<(IOutputStream &out, const TIdWrapper &id) {
return out << id.Raw;
}

constexpr auto operator<=>(const TIdWrapper &) const = default;

T GetRawId() const { return Raw; }

friend std::hash<TIdWrapper<T, Tag>>;
friend THash<TIdWrapper<T, Tag>>;
};

template <typename T, typename Tag> struct std::hash<TIdWrapper<T, Tag>> {
std::size_t operator()(const TIdWrapper<T, Tag> &id) const {
return std::hash<T>{}(id.Raw);
}
};

template <typename T, typename Tag> struct THash<TIdWrapper<T, Tag>> {
std::size_t operator()(const TIdWrapper<T, Tag> &id) const {
return THash<T>()(id.Raw);
}
};
Loading

0 comments on commit 558ffac

Please sign in to comment.