Skip to content

Commit

Permalink
Merge d8d9fab into e9bce00
Browse files Browse the repository at this point in the history
  • Loading branch information
mregrock authored Jun 18, 2024
2 parents e9bce00 + d8d9fab commit a12eab3
Show file tree
Hide file tree
Showing 120 changed files with 643 additions and 414 deletions.
18 changes: 9 additions & 9 deletions ydb/core/base/blobstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ bool operator<(const TPDiskCategory x, const TPDiskCategory y) {
}

std::unique_ptr<TEvBlobStorage::TEvPutResult> TEvBlobStorage::TEvPut::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvPutResult>(status, Id, TStorageStatusFlags(), groupId, 0.0f);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvGetResult>(status, QuerySize, groupId);
for (ui32 i = 0; i < QuerySize; ++i) {
const auto& from = Queries[i];
Expand All @@ -68,14 +68,14 @@ std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorR
}

std::unique_ptr<TEvBlobStorage::TEvBlockResult> TEvBlobStorage::TEvBlock::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvBlockResult>(status);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvPatchResult> TEvBlobStorage::TEvPatch::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvPatchResult>(status, PatchedId, TStorageStatusFlags(), groupId, 0.0f);
res->ErrorReason = errorReason;
return res;
Expand All @@ -89,35 +89,35 @@ std::unique_ptr<TEvBlobStorage::TEvInplacePatchResult> TEvBlobStorage::TEvInplac
}

std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> TEvBlobStorage::TEvDiscover::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId/*groupId*/) {
auto res = std::make_unique<TEvDiscoverResult>(status, MinGeneration, 0);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvRangeResult> TEvBlobStorage::TEvRange::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 groupId) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId) {
auto res = std::make_unique<TEvRangeResult>(status, From, To, groupId);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvCollectGarbageResult> TEvBlobStorage::TEvCollectGarbage::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvCollectGarbageResult>(status, TabletId, RecordGeneration, PerGenerationCounter, Channel);
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvStatusResult> TEvBlobStorage::TEvStatus::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
auto res = std::make_unique<TEvStatusResult>(status, TStorageStatusFlags());
res->ErrorReason = errorReason;
return res;
}

std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> TEvBlobStorage::TEvAssimilate::MakeErrorResponse(
NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId/*groupId*/) {
return std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, errorReason);
}

Expand Down
56 changes: 36 additions & 20 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/base/services/blobstorage_service_id.h>
#include <ydb/core/base/blobstorage_grouptype.h>
#include <ydb/core/protos/base.pb.h>
#include <ydb/core/base/blobstorage_common.h>
#include <ydb/core/protos/blobstorage_base.pb.h>
#include <ydb/core/protos/blobstorage_base3.pb.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
Expand All @@ -33,6 +34,7 @@ static constexpr ui64 MaxCollectGarbageFlagsPerMessage = 10000;
static constexpr TDuration VDiskCooldownTimeout = TDuration::Seconds(15);
static constexpr TDuration VDiskCooldownTimeoutOnProxy = TDuration::Seconds(12);


struct TStorageStatusFlags {
ui32 Raw = 0;

Expand Down Expand Up @@ -102,6 +104,8 @@ enum class EGroupConfigurationType : ui32 {
struct TGroupID {
TGroupID() = default;
TGroupID(const TGroupID&) = default;
TGroupID(const TGroupId wrappedId)
: Raw(wrappedId.GetRawId()) {}

TGroupID(EGroupConfigurationType configurationType, ui32 dataCenterId, ui32 groupLocalId) {
Set(configurationType, dataCenterId, groupLocalId);
Expand Down Expand Up @@ -437,6 +441,10 @@ inline IEventHandle *CreateEventForBSProxy(TActorId sender, ui32 groupId, IEvent
return CreateEventForBSProxy(sender, MakeBlobStorageProxyID(groupId), ev, cookie, std::move(traceId));
}

inline IEventHandle *CreateEventForBSProxy(TActorId sender, TGroupId groupId, IEventBase *ev, ui64 cookie, NWilson::TTraceId traceId = {}) {
return CreateEventForBSProxy(sender, MakeBlobStorageProxyID(groupId), ev, cookie, std::move(traceId));
}

inline bool SendToBSProxy(TActorId sender, TActorId recipient, IEventBase *ev, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
return TActivationContext::Send(CreateEventForBSProxy(sender, recipient, ev, cookie, std::move(traceId)));
}
Expand All @@ -455,6 +463,15 @@ inline bool SendToBSProxy(const TActorContext &ctx, ui32 groupId, IEventBase *ev
return ctx.Send(CreateEventForBSProxy(ctx.SelfID, groupId, ev, cookie, std::move(traceId)));
}

inline bool SendToBSProxy(TActorId sender, TGroupId groupId, IEventBase *ev, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
return TActivationContext::Send(CreateEventForBSProxy(sender, groupId, ev, cookie, std::move(traceId)));
}

inline bool SendToBSProxy(const TActorContext &ctx, TGroupId groupId, IEventBase *ev, ui64 cookie = 0,
NWilson::TTraceId traceId = {}) {
return ctx.Send(CreateEventForBSProxy(ctx.SelfID, groupId, ev, cookie, std::move(traceId)));
}

struct TEvBlobStorage {
enum EEv {
// user <-> proxy interface
Expand Down Expand Up @@ -888,7 +905,7 @@ struct TEvBlobStorage {
struct TEvPatchResult;
struct TEvInplacePatchResult;
struct TEvAssimilateResult;

struct TEvPut : public TEventLocal<TEvPut, EvPut> {
enum ETactic {
TacticMaxThroughput = 0,
Expand All @@ -908,7 +925,6 @@ struct TEvBlobStorage {
return "unknown";
}
};

const TLogoBlobID Id;
const TRcBuf Buffer; //FIXME(innokentii) const members prevent usage of move-semantics elsewhere
const TInstant Deadline;
Expand Down Expand Up @@ -978,14 +994,14 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvPutResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvPutResult : public TEventLocal<TEvPutResult, EvPutResult> {
NKikimrProto::EReplyStatus Status;
const TLogoBlobID Id;
const TStorageStatusFlags StatusFlags;
const ui32 GroupId;
const TGroupId GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
Expand All @@ -994,7 +1010,7 @@ struct TEvBlobStorage {
const TString StorageId;

TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
TGroupId groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
Expand Down Expand Up @@ -1171,7 +1187,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvGetResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);

private:
void VerifySameTabletId() const {
Expand Down Expand Up @@ -1215,7 +1231,7 @@ struct TEvBlobStorage {
// todo: replace with queue-like thing
ui32 ResponseSz;
TArrayHolder<TResponse> Responses;
const ui32 GroupId;
const TGroupId GroupId;
ui32 BlockedGeneration = 0; // valid only for requests with non-zero TabletId and true AcquireBlockedGeneration.
TString DebugInfo;
TString ErrorReason;
Expand All @@ -1225,7 +1241,7 @@ struct TEvBlobStorage {
// to measure blobstorage->client hop
TInstant Sent;

TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, ui32 groupId)
TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, TGroupId groupId)
: Status(status)
, ResponseSz(sz)
, Responses(sz == 0 ? nullptr : new TResponse[sz])
Expand Down Expand Up @@ -1317,7 +1333,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvBlockResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvBlockResult : public TEventLocal<TEvBlockResult, EvBlockResult> {
Expand Down Expand Up @@ -1514,21 +1530,21 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvPatchResult> MakeErrorResponse(NKikimrProto::EReplyStatus status,
const TString& errorReason, ui32 groupId);
const TString& errorReason, TGroupId groupId);
};

struct TEvPatchResult : public TEventLocal<TEvPatchResult, EvPatchResult> {
NKikimrProto::EReplyStatus Status;
const TLogoBlobID Id;
const TStorageStatusFlags StatusFlags;
const ui32 GroupId;
const TGroupId GroupId;
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
mutable NLWTrace::TOrbit Orbit;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvPatchResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
TGroupId groupId, float approximateFreeSpaceShare)
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
Expand Down Expand Up @@ -1691,7 +1707,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvDiscoverResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvDiscoverResult : public TEventLocal<TEvDiscoverResult, EvDiscoverResult> {
Expand Down Expand Up @@ -1796,7 +1812,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvRangeResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvRangeResult : public TEventLocal<TEvRangeResult, EvRangeResult> {
Expand All @@ -1820,13 +1836,13 @@ struct TEvBlobStorage {
NKikimrProto::EReplyStatus Status;
TLogoBlobID From;
TLogoBlobID To;

TVector<TResponse> Responses;
const ui32 GroupId;
const TGroupId GroupId;
TString ErrorReason;
std::shared_ptr<TExecutionRelay> ExecutionRelay;

TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, ui32 groupId)
TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, TGroupId groupId)
: Status(status)
, From(from)
, To(to)
Expand Down Expand Up @@ -1984,7 +2000,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvCollectGarbageResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvCollectGarbageResult : public TEventLocal<TEvCollectGarbageResult, EvCollectGarbageResult> {
Expand Down Expand Up @@ -2052,7 +2068,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvStatusResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvStatusResult : public TEventLocal<TEvStatusResult, EvStatusResult> {
Expand Down Expand Up @@ -2127,7 +2143,7 @@ struct TEvBlobStorage {
}

std::unique_ptr<TEvAssimilateResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
ui32 groupId);
TGroupId groupId);
};

struct TEvAssimilateResult : TEventLocal<TEvAssimilateResult, EvAssimilateResult> {
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/base/blobstorage_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

#include <ydb/core/base/id_wrapper.h>

namespace NKikimr{

using TGroupId = TIdWrapper<ui32, TGroupIdTag>;

}


Loading

0 comments on commit a12eab3

Please sign in to comment.