Skip to content

Commit

Permalink
Merge 9e5dff8 into 7182df1
Browse files Browse the repository at this point in the history
  • Loading branch information
mregrock authored May 29, 2024
2 parents 7182df1 + 9e5dff8 commit a327d96
Show file tree
Hide file tree
Showing 89 changed files with 619 additions and 469 deletions.
18 changes: 9 additions & 9 deletions ydb/core/base/blobstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ bool operator<(const TPDiskCategory x, const TPDiskCategory y) {
}

std::unique_ptr<TEvBlobStorage::TEvPutResult> TEvBlobStorage::TEvPut::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvPutResult>(status, Id, TStorageStatusFlags(), groupId, 0.0f);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvGetResult>(status, QuerySize, groupId);
for (ui32 i = 0; i < QuerySize; ++i) {
const auto& from = Queries[i];
Expand All @@ -68,14 +68,14 @@ std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorR
}

std::unique_ptr<TEvBlobStorage::TEvBlockResult> TEvBlobStorage::TEvBlock::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvBlockResult>(status);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvPatchResult> TEvBlobStorage::TEvPatch::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvPatchResult>(status, PatchedId, TStorageStatusFlags(), groupId, 0.0f);
res->ErrorReason = errorReason;
return res;
Expand All @@ -89,35 +89,35 @@ std::unique_ptr<TEvBlobStorage::TEvInplacePatchResult> TEvBlobStorage::TEvInplac
}

std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> TEvBlobStorage::TEvDiscover::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId/*groupId*/) {
auto res = std::make_unique<TEvDiscoverResult>(status, MinGeneration, 0);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvRangeResult> TEvBlobStorage::TEvRange::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvRangeResult>(status, From, To, groupId);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvCollectGarbageResult> TEvBlobStorage::TEvCollectGarbage::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvCollectGarbageResult>(status, TabletId, RecordGeneration, PerGenerationCounter, Channel);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvStatusResult> TEvBlobStorage::TEvStatus::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvStatusResult>(status, TStorageStatusFlags());
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> TEvBlobStorage::TEvAssimilate::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId/*groupId*/) {
return std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, errorReason);
}

Expand Down
42 changes: 23 additions & 19 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/base/services/blobstorage_service_id.h>
#include <ydb/core/base/blobstorage_grouptype.h>
#include <ydb/core/protos/base.pb.h>
#include <ydb/core/base/id_wrapper.h>
#include <ydb/core/protos/blobstorage_base.pb.h>
#include <ydb/core/protos/blobstorage_base3.pb.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
Expand Down Expand Up @@ -102,6 +103,8 @@ enum class EGroupConfigurationType : ui32 {
struct TGroupID {
TGroupID() = default;
TGroupID(const TGroupID&) = default;
TGroupID(const TIdWrapper<ui32, TGroupIdTag> wrapped_id)
: Raw(wrapped_id.GetRawId()) {}

TGroupID(EGroupConfigurationType configurationType, ui32 dataCenterId, ui32 groupLocalId) {
Set(configurationType, dataCenterId, groupLocalId);
Expand Down Expand Up @@ -886,6 +889,8 @@ struct TEvBlobStorage {
struct TEvInplacePatchResult;
struct TEvAssimilateResult;

using TGroupId = TIdWrapper<ui32, TGroupIdTag>;

struct TEvPut : public TEventLocal<TEvPut, EvPut> {
enum ETactic {
TacticMaxThroughput = 0,
Expand All @@ -905,7 +910,6 @@ struct TEvBlobStorage {
return "unknown";
}
};

const TLogoBlobID Id;
const TRcBuf Buffer; //FIXME(innokentii) const members prevent usage of move-semantics elsewhere
const TInstant Deadline;
Expand Down Expand Up @@ -975,14 +979,14 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvPutResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvPutResult : public TEventLocal<TEvPutResult, EvPutResult> {
NKikimrProto::EReplyStatus Status;
const TLogoBlobID Id;
const TStorageStatusFlags StatusFlags;
const ui32 GroupId;
const TGroupId GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
Expand All @@ -991,7 +995,7 @@ struct TEvBlobStorage {
const TString StorageId;

TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
TGroupId groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
Expand Down Expand Up @@ -1168,7 +1172,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvGetResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);

private:
void VerifySameTabletId() const {
Expand Down Expand Up @@ -1212,7 +1216,7 @@ struct TEvBlobStorage {
// todo: replace with queue-like thing
ui32 ResponseSz;
TArrayHolder<TResponse> Responses;
const ui32 GroupId;
const TGroupId GroupId;
ui32 BlockedGeneration = 0; // valid only for requests with non-zero TabletId and true AcquireBlockedGeneration.
TString DebugInfo;
TString ErrorReason;
Expand All @@ -1222,7 +1226,7 @@ struct TEvBlobStorage {
// to measure blobstorage->client hop
TInstant Sent;

TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, ui32 groupId)
TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, TGroupId groupId)
: Status(status)
, ResponseSz(sz)
, Responses(sz == 0 ? nullptr : new TResponse[sz])
Expand Down Expand Up @@ -1314,7 +1318,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvBlockResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvBlockResult : public TEventLocal<TEvBlockResult, EvBlockResult> {
Expand Down Expand Up @@ -1511,21 +1515,21 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvPatchResult> MakeErrorResponse(NKikimrProto::EReplyStatus status,
const TString& errorReason, ui32 groupId);
const TString& errorReason, TGroupId groupId);
};

struct TEvPatchResult : public TEventLocal<TEvPatchResult, EvPatchResult> {
NKikimrProto::EReplyStatus Status;
const TLogoBlobID Id;
const TStorageStatusFlags StatusFlags;
const ui32 GroupId;
const TGroupId GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
mutable NLWTrace::TOrbit Orbit;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvPatchResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
TGroupId groupId, float approximateFreeSpaceShare)
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
Expand Down Expand Up @@ -1688,7 +1692,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvDiscoverResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvDiscoverResult : public TEventLocal<TEvDiscoverResult, EvDiscoverResult> {
Expand Down Expand Up @@ -1793,7 +1797,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvRangeResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvRangeResult : public TEventLocal<TEvRangeResult, EvRangeResult> {
Expand All @@ -1817,13 +1821,13 @@ struct TEvBlobStorage {
NKikimrProto::EReplyStatus Status;
TLogoBlobID From;
TLogoBlobID To;

TVector<TResponse> Responses;
const ui32 GroupId;
const TGroupId GroupId;
TString ErrorReason;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, ui32 groupId)
TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, TGroupId groupId)
: Status(status)
, From(from)
, To(to)
Expand Down Expand Up @@ -1981,7 +1985,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvCollectGarbageResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvCollectGarbageResult : public TEventLocal<TEvCollectGarbageResult, EvCollectGarbageResult> {
Expand Down Expand Up @@ -2049,7 +2053,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvStatusResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvStatusResult : public TEventLocal<TEvStatusResult, EvStatusResult> {
Expand Down Expand Up @@ -2124,7 +2128,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvAssimilateResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvAssimilateResult : TEventLocal<TEvAssimilateResult, EvAssimilateResult> {
Expand Down
99 changes: 99 additions & 0 deletions ydb/core/base/id_wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once
#include "defs.h"
#include <compare>
#include <concepts>
#include <util/generic/string.h>
#include <util/string/builder.h>
using TString = TBasicString<char>;

class TGroupIdTag;
template <class T>
concept IntegralType = std::is_integral<T>::value;

template <class T>
concept StringType = std::convertible_to<T, TString>;
template <typename T, typename Tag> class TIdWrapper {
private:
T Raw;

public:
using Type = T;
using TTag = Tag;

constexpr TIdWrapper() noexcept : Raw(0) {}

~TIdWrapper() = default;

explicit TIdWrapper(const T &value) : Raw(value) {}

TIdWrapper(TIdWrapper &&value) = default;

TIdWrapper(const TIdWrapper &other) = default;

TIdWrapper &operator=(const TIdWrapper &value) = default;

TIdWrapper &operator=(TIdWrapper &&value) = default;

TString ToString() const {
return TStringBuilder() << Raw;
}

void CopyToProto(NProtoBuf::Message *message,
void (NProtoBuf::Message::*pfn)(T value)) {
(message->*pfn)(*this);
}

static constexpr TIdWrapper FromValue(T value) noexcept {
return TIdWrapper(value);
}

static constexpr TIdWrapper Zero() noexcept { return TIdWrapper(); }

TIdWrapper &operator+=(const T &other) {
Raw += other.Raw;
return *this;
}

friend TIdWrapper operator+(const TIdWrapper &first,
const T &second) Y_WARN_UNUSED_RESULT {
return TIdWrapper(first->Raw + second);
}

TIdWrapper &operator++() {
Raw++;
return *this;
}

TIdWrapper operator++(int) {
TIdWrapper old = *this;
operator++();
return old;
}

friend std::ostream &operator<<(std::ostream &out, TIdWrapper &id) {
return out << id.Raw;
}

friend IOutputStream& operator<<(IOutputStream& out, const TIdWrapper& id) {
return out << id.Raw;
}

constexpr auto operator<=>(const TIdWrapper &) const = default;

T GetRawId() const { return Raw; }

friend std::hash<TIdWrapper<T, Tag>>;
friend THash<TIdWrapper<T, Tag>>;
};

template <typename T, typename Tag> struct std::hash<TIdWrapper<T, Tag>> {
std::size_t operator()(const TIdWrapper<T, Tag> &id) const {
return std::hash<T>{}(id.Raw);
}
};

template <typename T, typename Tag> struct THash<TIdWrapper<T, Tag>> {
std::size_t operator()(const TIdWrapper<T, Tag> &id) const {
return THash<T>()(id.Raw);
}
};
1 change: 1 addition & 0 deletions ydb/core/blob_depot/agent/agent_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "resolved_value.h"

#include <ydb/core/protos/blob_depot_config.pb.h>
#include <ydb/core/base/id_wrapper.h>

namespace NKikimr::NBlobDepot {

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blob_depot/agent/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace NKikimr::NBlobDepot {
switch (const ui32 type = event->Type()) {
case TEvBlobStorage::EvGet: {
auto& get = static_cast<TEvBlobStorage::TEvGet&>(*event);
response = get.MakeErrorResponse(NKikimrProto::OK, "proxy has vanished", groupId);
response = get.MakeErrorResponse(NKikimrProto::OK, "proxy has vanished", TIdWrapper<ui32, TGroupIdTag>(groupId));
auto& r = static_cast<TEvBlobStorage::TEvGetResult&>(*response);
for (size_t i = 0; i < r.ResponseSz; ++i) {
r.Responses[i].Status = NKikimrProto::NODATA;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blob_depot/agent/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ namespace NKikimr::NBlobDepot {
switch (Event->GetTypeRewrite()) {
#define XX(TYPE) \
case TEvBlobStorage::TYPE: \
response = Event->Get<TEvBlobStorage::T##TYPE>()->MakeErrorResponse(status, errorReason, Agent.VirtualGroupId); \
response = Event->Get<TEvBlobStorage::T##TYPE>()->MakeErrorResponse(status, errorReason, TIdWrapper<ui32, TGroupIdTag>(Agent.VirtualGroupId)); \
static_cast<TEvBlobStorage::T##TYPE##Result&>(*response).ExecutionRelay = std::move(ExecutionRelay); \
break; \
//
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blob_depot/agent/storage_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace NKikimr::NBlobDepot {
}

Response = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, Request.QuerySize,
Agent.VirtualGroupId);
TIdWrapper<ui32, TGroupIdTag>(Agent.VirtualGroupId));
AnswersRemain = Request.QuerySize;

if (Request.ReaderTabletData) {
Expand Down
Loading

0 comments on commit a327d96

Please sign in to comment.