Skip to content

Commit

Permalink
RequestUnits for read by Kafka protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev committed Jan 5, 2024
1 parent 4a88496 commit 9452781
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 14 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
PrepareFetchRequestData(topicIndex, partPQRequests);

NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, *Context->UserToken);
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken);

auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
auto actorId = ctx.Register(fetchActor);
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,6 @@ void TKafkaReadSessionActor::HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthRes
TopicsInfo[internalName] = NGRpcProxy::TTopicHolder::FromTopicInfo(t);
FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter;
FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter;
// savnik: metering mode
}

Send(Context->ConnectionId, new TEvKafka::TEvReadSessionInfo(GroupId));
Expand Down
79 changes: 72 additions & 7 deletions ydb/core/persqueue/fetch_request_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/client/server/msgbus_server_pq_metacache.h>
#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <ydb/core/persqueue/user_info.h>
#include <ydb/core/persqueue/write_meta.h>

#include <ydb/public/lib/base/msgbus_status.h>

Expand All @@ -21,7 +23,7 @@ using namespace NSchemeCache;


namespace {
const ui32 DefaultTimeout = 30000;
static constexpr TDuration DefaultTimeout = TDuration::MilliSeconds(30000);
}

struct TTabletInfo { // ToDo !! remove
Expand Down Expand Up @@ -52,7 +54,22 @@ struct TTopicInfo {

using namespace NActors;

class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor>
, private TRlHelpers {

struct TEvPrivate {
enum EEv {
EvTimeout = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvEnd
};

static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");

struct TEvTimeout : NActors::TEventLocal<TEvTimeout, EvTimeout> {
};

};

private:
TFetchRequestSettings Settings;

Expand All @@ -74,14 +91,16 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
ui32 PartTabletsRequested;
TString ErrorReason;
TActorId RequesterId;
ui64 PendingQuotaAmount;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::PQ_FETCH_REQUEST;
}

TPQFetchRequestActor(const TFetchRequestSettings& settings, const TActorId& schemeCacheId, const TActorId& requesterId)
: Settings(settings)
: TRlHelpers({}, settings.RlCtx, 8_KB, false, TDuration::Seconds(1)) //savnik: check duration
, Settings(settings)
, CanProcessFetchRequest(false)
, FetchRequestReadsDone(0)
, FetchRequestCurrentReadTablet(0)
Expand Down Expand Up @@ -115,7 +134,25 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
TopicInfo[path].FetchInfo[p.Partition] = fetchInfo;
}
}
// FIXME(savnik) handle request timeout

void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag);
OnWakeup(tag);
switch (tag) {
case EWakeupTag::RlAllowed:
ProceedFetchRequest(ctx);
PendingQuotaAmount = 0;
break;

case EWakeupTag::RlNoResource:
// Re-requesting the quota. We do this until we get a quota.
RequestDataQuota(PendingQuotaAmount, ctx);
break;

default:
Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
}
}

void Bootstrap(const TActorContext& ctx) {
LOG_INFO_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "Fetch request actor boostrapped. Request is valid: " << (!Response));
Expand All @@ -128,7 +165,8 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
ctx.Schedule(TDuration::MilliSeconds(Min<ui32>(Settings.MaxWaitTimeMs, 30000)), new TEvPersQueue::TEvHasDataInfoResponse);

SendSchemeCacheRequest(ctx);
Become(&TPQFetchRequestActor::StateFunc, ctx, TDuration::MilliSeconds(DefaultTimeout), new TEvents::TEvWakeup());
Schedule(DefaultTimeout, new TEvPrivate::TEvTimeout());
Become(&TPQFetchRequestActor::StateFunc);
}

void SendSchemeCacheRequest(const TActorContext& ctx) {
Expand Down Expand Up @@ -332,6 +370,7 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
for (auto& actor: PQClient) {
NTabletPipe::CloseClient(ctx, actor);
}
TRlHelpers::PassAway(SelfId());
TActorBootstrapped<TPQFetchRequestActor>::Die(ctx);
}

Expand Down Expand Up @@ -433,7 +472,32 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
read->SetErrorReason(record.GetErrorReason());

++FetchRequestReadsDone;
ProceedFetchRequest(ctx);

auto it = TopicInfo.find(CanonizePath(topic));
Y_ABORT_UNLESS(it != TopicInfo.end());

SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode());

if (IsQuotaRequired()) {
PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record));
RequestDataQuota(PendingQuotaAmount, ctx);
} else {
ProceedFetchRequest(ctx);
}

}

ui64 GetPayloadSize(const NKikimrClient::TResponse& record) const {
ui64 readBytesSize = 0;
const auto& response = record.GetPartitionResponse();
if (response.HasCmdReadResult()) {
const auto& results = response.GetCmdReadResult().GetResult();
for (auto& r : results) {
auto proto(NKikimr::GetDeserializedData(r.GetData()));
readBytesSize += proto.GetData().Size();
}
}
return readBytesSize;
}

bool CheckAccess(const TSecurityObject& access) {
Expand Down Expand Up @@ -466,9 +530,10 @@ class TPQFetchRequestActor : public TActorBootstrapped<TPQFetchRequestActor> {
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(TEvPersQueue::TEvResponse, Handle);
HFunc(TEvents::TEvWakeup, Handle);
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse);
HFunc(TEvPersQueue::TEvHasDataInfoResponse, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
CFunc(TEvPrivate::EvTimeout, HandleTimeout);
CFunc(NActors::TEvents::TSystem::PoisonPill, Die);
)
};
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/persqueue/fetch_request_actor.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/aclib/aclib.h>

Expand Down Expand Up @@ -31,17 +32,19 @@ struct TFetchRequestSettings {
TMaybe<NACLib::TUserToken> User;
ui64 MaxWaitTimeMs;
ui64 TotalMaxBytes;
TRlContext RlCtx;

ui64 RequestId = 0;
TFetchRequestSettings(
const TString& database, const TVector<TPartitionFetchRequest>& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes,
const TString& database, const TVector<TPartitionFetchRequest>& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx,
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0
)
: Database(database)
, Partitions(partitions)
, User(user)
, MaxWaitTimeMs(maxWaitTimeMs)
, TotalMaxBytes(totalMaxBytes)
, RlCtx(rlCtx)
, RequestId(requestId)
{}
};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/pq_rl_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class TRlHelpers: public NMetering::TStreamRequestUnitsCalculator {
};

void Bootstrap(const TActorId selfId, const NActors::TActorContext& ctx);
void PassAway(const TActorId selfId);
void PassAway(const TActorId selfId); //savnik а где он вызывается в текущих акторах, которые наследуют этот класс?

bool IsQuotaRequired() const;
bool IsQuotaInflight() const;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
NTabletPipe::CloseAndForgetClient(SelfId(), PipeClient);
}
SendError("Unexpected termination");
TRlHelpers::PassAway(SelfId());
TActorBootstrapped::PassAway();
}

Expand Down
1 change: 1 addition & 0 deletions ydb/services/datastreams/datastreams_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,7 @@ namespace NKikimr::NDataStreams::V1 {

void TGetRecordsActor::Die(const TActorContext& ctx) {
NTabletPipe::CloseClient(ctx, PipeClient);
TRlHelpers::PassAway(SelfId());
TBase::Die(ctx);
}

Expand Down
7 changes: 7 additions & 0 deletions ydb/services/datastreams/put_records_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ namespace NKikimr::NDataStreams::V1 {
void Bootstrap(const NActors::TActorContext &ctx);
void PreparePartitionActors(const NActors::TActorContext& ctx);
void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void Die(const TActorContext& ctx) override;

protected:
void Write(const TActorContext& ctx);
Expand Down Expand Up @@ -258,6 +259,12 @@ namespace NKikimr::NDataStreams::V1 {
}
};

template<class TDerived, class TProto>
void TPutRecordsActorBase<TDerived, TProto>::Die(const TActorContext& ctx) {
TRlHelpers::PassAway(TDerived::SelfId());
TBase::Die(ctx);
}

template<class TDerived, class TProto>
TPutRecordsActorBase<TDerived, TProto>::TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request)
: TBase(request, dynamic_cast<const typename TProto::TRequest*>(request->GetRequest())->stream_name())
Expand Down
2 changes: 1 addition & 1 deletion ydb/services/persqueue_v1/actors/direct_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ void TDirectReadSessionActor::Die(const TActorContext& ctx) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, LOG_PREFIX << " proxy is DEAD");
ctx.Send(GetPQReadServiceActorID(), new TEvPQProxy::TEvSessionDead(Cookie));
ctx.Send(NPQ::MakePQDReadCacheServiceActorId(), new TEvPQProxy::TEvDirectReadDataSessionDead(Session));

TRlHelpers::PassAway(SelfId());
TActorBootstrapped<TDirectReadSessionActor>::Die(ctx);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/services/persqueue_v1/actors/read_session_actor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ void TReadSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {

LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD");
ctx.Send(GetPQReadServiceActorID(), new TEvPQProxy::TEvSessionDead(Cookie));

TRlHelpers::PassAway(TActorBootstrapped<TReadSessionActor>::SelfId());
TActorBootstrapped<TReadSessionActor>::Die(ctx);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/services/persqueue_v1/actors/write_session_actor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {
}

State = ES_DYING;

TRlHelpers::PassAway(TActorBootstrapped<TWriteSessionActor>::SelfId());
TActorBootstrapped<TWriteSessionActor>::Die(ctx);
}

Expand Down

0 comments on commit 9452781

Please sign in to comment.