diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index d7de16925140..2e1c27ee8574 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -27,12 +27,13 @@ struct TContext { const NKikimrConfig::TKafkaProxyConfig& Config; TActorId ConnectionId; - TString ClientId; + TString KafkaClient; EAuthSteps AuthenticationStep = EAuthSteps::WAIT_HANDSHAKE; TString SaslMechanism; + TString GroupId; TString DatabasePath; TString FolderId; TString CloudId; @@ -128,6 +129,9 @@ inline EKafkaErrors ConvertErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode code) return EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; case Ydb::PersQueue::ErrorCode::ErrorCode::ACCESS_DENIED: return EKafkaErrors::TOPIC_AUTHORIZATION_FAILED; + case Ydb::PersQueue::ErrorCode::ErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE: + case Ydb::PersQueue::ErrorCode::ErrorCode::SET_OFFSET_ERROR_COMMIT_TO_PAST: + return EKafkaErrors::OFFSET_OUT_OF_RANGE; default: return EKafkaErrors::UNKNOWN_SERVER_ERROR; } diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp index 6817f92c9dc2..96c628477a5c 100644 --- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp @@ -43,7 +43,7 @@ TApiVersionsResponseData::TPtr GetApiVersions() { AddApiKey(response->ApiKeys, LEAVE_GROUP); AddApiKey(response->ApiKeys, HEARTBEAT); AddApiKey(response->ApiKeys, FIND_COORDINATOR); - AddApiKey(response->ApiKeys, OFFSET_COMMIT); + AddApiKey(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1}); AddApiKey(response->ApiKeys, OFFSET_FETCH); return response; diff --git a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp index 5c652404c48e..5cd0be58bd6b 100644 --- a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp @@ -1,21 +1,22 @@ #include #include -#include "ydb/core/kafka_proxy/kafka_metrics.h" #include +#include "ydb/core/kafka_proxy/kafka_metrics.h" #include -#include #include +#include #include +#include #include -#include "kafka_fetch_actor.h" #include "actors.h" +#include "kafka_fetch_actor.h" namespace NKafka { static constexpr size_t SizeOfZeroVarint = 1; -static constexpr size_t BatchFirstTwoFildsSize = 12; +static constexpr size_t BatchFirstTwoFieldsSize = 12; static constexpr size_t KafkaMagic = 2; NActors::IActor* CreateKafkaFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message) { @@ -34,7 +35,7 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) { TVector partPQRequests; PrepareFetchRequestData(topicIndex, partPQRequests); - NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, *Context->UserToken); + NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken); auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID); auto actorId = ctx.Register(fetchActor); @@ -60,6 +61,7 @@ void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector< partPQRequest.Partition = partKafkaRequest.Partition; partPQRequest.Offset = partKafkaRequest.FetchOffset; partPQRequest.MaxBytes = partKafkaRequest.PartitionMaxBytes; + 
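The hunks above map the PQ strict-commit failures (commit to future / commit to past) onto Kafka's OFFSET_OUT_OF_RANGE, and the fetch path now reads on behalf of the session's consumer group, falling back to the reserved "without consumer" reader when no group is known. A minimal standalone sketch of both decisions; the enum values and the fallback constant are stand-ins for the real YDB/Kafka definitions, not the actual identifiers:

#include <string>

// Simplified stand-ins for the real PQ/Kafka error enums in the tree.
enum class EPQError { SetOffsetCommitToFuture, SetOffsetCommitToPast, AccessDenied, Other };
enum class EKafkaError { OffsetOutOfRange, TopicAuthorizationFailed, UnknownServerError };

static const std::string kClientIdWithoutConsumer = "$without_consumer"; // placeholder value

EKafkaError ConvertErrorCode(EPQError code) {
    switch (code) {
        case EPQError::SetOffsetCommitToFuture:
        case EPQError::SetOffsetCommitToPast:
            // Both strict-commit failures surface to Kafka clients as OFFSET_OUT_OF_RANGE.
            return EKafkaError::OffsetOutOfRange;
        case EPQError::AccessDenied:
            return EKafkaError::TopicAuthorizationFailed;
        default:
            return EKafkaError::UnknownServerError;
    }
}

// Fetch requests read under the session's consumer group when one has been set.
std::string PickReaderClientId(const std::string& groupId) {
    return groupId.empty() ? kClientIdWithoutConsumer : groupId;
}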
partPQRequest.ClientId = Context->GroupId.Empty() ? NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER : Context->GroupId; } } @@ -86,7 +88,7 @@ size_t TKafkaFetchActor::CheckTopicIndex(const NKikimr::TEvPQ::TEvFetchResponse: Y_DEBUG_ABORT_UNLESS(topicIt != TopicIndexes.end()); if (topicIt == TopicIndexes.end()) { - KAFKA_LOG_CRIT("Fetch actor: Received unexpected TEvFetchResponse. Ignoring. Expect malformed/incompled fetch reply."); + KAFKA_LOG_ERROR("Fetch actor: Received unexpected TEvFetchResponse. Ignoring. Expect malformed/incompled fetch reply."); return std::numeric_limits::max(); } @@ -172,7 +174,7 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp record.TimestampDelta = lastTimestamp - baseTimestamp; record.Length = record.Size(TKafkaRecord::MessageMeta::PresentVersions.Max) - SizeOfZeroVarint; - KAFKA_LOG_D("Fetch actor: Record info. Value: " << record.DataChunk.GetData() << ", OffsetDelta: " << record.OffsetDelta << + KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta << ", TimestampDelta: " << record.TimestampDelta << ", Length: " << record.Length); } @@ -184,7 +186,7 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp recordsBatch.BaseSequence = baseSequense; //recordsBatch.Attributes https://kafka.apache.org/documentation/#recordbatch - recordsBatch.BatchLength = recordsBatch.Size(TKafkaRecordBatch::MessageMeta::PresentVersions.Max) - BatchFirstTwoFildsSize; + recordsBatch.BatchLength = recordsBatch.Size(TKafkaRecordBatch::MessageMeta::PresentVersions.Max) - BatchFirstTwoFieldsSize; KAFKA_LOG_D("Fetch actor: RecordBatch info. BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta << ", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp << ", BaseSequence: " << recordsBatch.BaseSequence << ", BatchLength: " << recordsBatch.BatchLength); diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp index 0f14b79c8eb0..4a4babf95aa5 100644 --- a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp @@ -1,7 +1,5 @@ #include "kafka_offset_commit_actor.h" -#include - namespace NKafka { @@ -9,30 +7,198 @@ NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, cons return new TKafkaOffsetCommitActor(context, correlationId, message); } -TOffsetCommitResponseData::TPtr TKafkaOffsetCommitActor::GetOffsetCommitResponse() { - TOffsetCommitResponseData::TPtr response = std::make_shared(); +TString TKafkaOffsetCommitActor::LogPrefix() { + return "TKafkaOffsetCommitActor"; +} + +void TKafkaOffsetCommitActor::Die(const TActorContext& ctx) { + KAFKA_LOG_D("PassAway"); + ctx.Send(AuthInitActor, new TEvents::TEvPoisonPill()); + for (const auto& tabletToPipePair: TabletIdToPipe) { + NTabletPipe::CloseClient(ctx, tabletToPipePair.second); + } + TBase::Die(ctx); +} + +void TKafkaOffsetCommitActor::Handle(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) { + KAFKA_LOG_CRIT("Auth failed. 
reason# " << ev->Get()->Reason); + Error = ConvertErrorCode(ev->Get()->ErrorCode); + SendFailedForAllPartitions(Error, ctx); +} +void TKafkaOffsetCommitActor::SendFailedForAllPartitions(EKafkaErrors error, const TActorContext& ctx) { for (auto topicReq: Message->Topics) { TOffsetCommitResponseData::TOffsetCommitResponseTopic topic; topic.Name = topicReq.Name; for (auto partitionRequest: topicReq.Partitions) { TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition partition; partition.PartitionIndex = partitionRequest.PartitionIndex; - partition.ErrorCode = NONE_ERROR; + partition.ErrorCode = error; topic.Partitions.push_back(partition); } - response->Topics.push_back(topic); + Response->Topics.push_back(topic); + } + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, Error)); + Die(ctx); +} + +void TKafkaOffsetCommitActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + TEvTabletPipe::TEvClientConnected *msg = ev->Get(); + + if (msg->Status != NKikimrProto::OK) { + KAFKA_LOG_CRIT("Pipe to tablet is dead. status# " << ev->Get()->Status); + ProcessPipeProblem(msg->TabletId, ctx); + } +} + +void TKafkaOffsetCommitActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + KAFKA_LOG_CRIT("Pipe to tablet is destroyed"); + ProcessPipeProblem(ev->Get()->TabletId, ctx); +} + +void TKafkaOffsetCommitActor::ProcessPipeProblem(ui64 tabletId, const TActorContext& ctx) { + auto cookiesIt = TabletIdToCookies.find(tabletId); + Y_ABORT_UNLESS(cookiesIt != TabletIdToCookies.end()); + + for (auto cookie: cookiesIt->second) { + auto requestInfoIt = CookieToRequestInfo.find(cookie); + Y_ABORT_UNLESS(requestInfoIt != CookieToRequestInfo.end()); + + if (!requestInfoIt->second.Done) { + requestInfoIt->second.Done = true; + AddPartitionResponse(EKafkaErrors::UNKNOWN_SERVER_ERROR, requestInfoIt->second.TopicName, requestInfoIt->second.PartitionId, ctx); + } + } +} + +void TKafkaOffsetCommitActor::Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx) { + KAFKA_LOG_D("Auth success. 
Topics count: " << ev->Get()->TopicAndTablets.size()); + TopicAndTablets = std::move(ev->Get()->TopicAndTablets); + + for (auto topicReq: Message->Topics) { + auto topicIt = TopicAndTablets.find(NormalizePath(Context->DatabasePath, topicReq.Name.value())); + for (auto partitionRequest: topicReq.Partitions) { + if (topicIt == TopicAndTablets.end()) { + AddPartitionResponse(UNKNOWN_TOPIC_OR_PARTITION, topicReq.Name.value(), partitionRequest.PartitionIndex, ctx); + continue; + } + + auto tabletIdIt = topicIt->second.PartitionIdToTabletId.find(partitionRequest.PartitionIndex); + if (tabletIdIt == topicIt->second.PartitionIdToTabletId.end()) { + AddPartitionResponse(UNKNOWN_TOPIC_OR_PARTITION, topicReq.Name.value(), partitionRequest.PartitionIndex, ctx); + continue; + } + + ui64 tabletId = tabletIdIt->second; + + if (!TabletIdToPipe.contains(tabletId)) { + NTabletPipe::TClientConfig clientConfig; + clientConfig.RetryPolicy = RetryPolicyForPipes; + TabletIdToPipe[tabletId] = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig)); + } + + NKikimrClient::TPersQueueRequest request; + request.MutablePartitionRequest()->SetTopic(topicIt->second.TopicNameConverter->GetPrimaryPath()); + request.MutablePartitionRequest()->SetPartition(partitionRequest.PartitionIndex); + request.MutablePartitionRequest()->SetCookie(NextCookie); + + TRequestInfo info(topicReq.Name.value(), partitionRequest.PartitionIndex); + + CookieToRequestInfo.emplace(std::make_pair(NextCookie, info)); + TabletIdToCookies[tabletId].push_back(NextCookie); + NextCookie++; + + auto commit = request.MutablePartitionRequest()->MutableCmdSetClientOffset(); + commit->SetClientId(Message->GroupId.value()); + commit->SetOffset(partitionRequest.CommittedOffset); + commit->SetStrict(true); + + PendingResponses++; + KAFKA_LOG_D("Send commit request for group# " << Message->GroupId.value() << + ", topic# " << topicIt->second.TopicNameConverter->GetPrimaryPath() << + ", partition# " << partitionRequest.PartitionIndex << + ", offset# " << partitionRequest.CommittedOffset); + + TAutoPtr req(new TEvPersQueue::TEvRequest); + req->Record.Swap(&request); + + NTabletPipe::SendData(ctx, TabletIdToPipe[tabletId], req.Release()); + } + } +} + +void TKafkaOffsetCommitActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) { + const auto& partitionResult = ev->Get()->Record.GetPartitionResponse(); + auto requestInfo = CookieToRequestInfo.find(partitionResult.GetCookie()); + requestInfo->second.Done = true; + + Y_ABORT_UNLESS(requestInfo != CookieToRequestInfo.end()); + if (ev->Get()->Record.GetErrorCode() != NPersQueue::NErrorCode::OK) { + KAFKA_LOG_CRIT("Commit offset error. 
status# " << EErrorCode_Name(ev->Get()->Record.GetErrorCode()) << ", reason# " << ev->Get()->Record.GetErrorReason()); } - return response; + AddPartitionResponse(ConvertErrorCode(NGRpcProxy::V1::ConvertOldCode(ev->Get()->Record.GetErrorCode())), requestInfo->second.TopicName, requestInfo->second.PartitionId, ctx); +} + +void TKafkaOffsetCommitActor::AddPartitionResponse(EKafkaErrors error, const TString& topicName, ui64 partitionId, const TActorContext& ctx) { + if (error != NONE_ERROR) { + Error = error; + } + + PendingResponses--; + TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition partitionResponse; + partitionResponse.PartitionIndex = partitionId; + partitionResponse.ErrorCode = error; + + auto topicIdIt = ResponseTopicIds.find(topicName); + + if (topicIdIt != ResponseTopicIds.end()) { + Response->Topics[topicIdIt->second].Partitions.push_back(partitionResponse); + } else { + ResponseTopicIds[topicName] = Response->Topics.size(); + + TOffsetCommitResponseData::TOffsetCommitResponseTopic topicResponse; + topicResponse.Name = topicName; + topicResponse.Partitions.push_back(partitionResponse); + + Response->Topics.push_back(topicResponse); + } + + if (PendingResponses == 0) { + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, Error)); + Die(ctx); + } } void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) { - Y_UNUSED(Message); - auto response = GetOffsetCommitResponse(); + THashSet topicsToResolve; + for (auto topicReq: Message->Topics) { + topicsToResolve.insert(NormalizePath(Context->DatabasePath, topicReq.Name.value())); + } - Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, EKafkaErrors::NONE_ERROR)); - Die(ctx); + auto topicConverterFactory = std::make_shared( + NKikimr::AppData(ctx)->PQConfig, "" + ); + + auto topicHandler = std::make_unique( + topicConverterFactory + ); + + auto topicsToConverter = topicHandler->GetReadTopicsList(topicsToResolve, false, Context->DatabasePath); + if (!topicsToConverter.IsValid) { + KAFKA_LOG_CRIT("Commit offsets failed. 
reason# topicsToConverter is not valid"); + Error = INVALID_REQUEST; + SendFailedForAllPartitions(Error, ctx); + return; + } + + AuthInitActor = ctx.Register(new NKikimr::NGRpcProxy::V1::TReadInitAndAuthActor( + ctx, ctx.SelfID, Message->GroupId.value(), 0, "", + NKikimr::NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), NKikimr::MakeSchemeCacheID(), nullptr, Context->UserToken, topicsToConverter, + topicHandler->GetLocalCluster(), false) + ); + + Become(&TKafkaOffsetCommitActor::StateWork); } } // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h index c1664975cb25..81eca60bc00e 100644 --- a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h @@ -1,24 +1,92 @@ #include "actors.h" +#include "ydb/core/base/tablet_pipe.h" +#include "ydb/core/grpc_services/local_rpc/local_rpc.h" +#include +#include +#include +#include #include +#include +#include "ydb/public/lib/base/msgbus_status.h" +#include +#include "ydb/services/persqueue_v1/actors/persqueue_utils.h" +#include + namespace NKafka { +using namespace NKikimr; class TKafkaOffsetCommitActor: public NActors::TActorBootstrapped { + +struct TRequestInfo { + TString TopicName = ""; + ui64 PartitionId = 0; + bool Done = false; + + TRequestInfo(const TString& topicName, ui64 partitionId) + : TopicName(topicName), PartitionId(partitionId) {} +}; + public: + using TBase = NActors::TActorBootstrapped; TKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message) : Context(context) , CorrelationId(correlationId) - , Message(message) { + , Message(message) + , Response(new TOffsetCommitResponseData()) { } void Bootstrap(const NActors::TActorContext& ctx); - TOffsetCommitResponseData::TPtr GetOffsetCommitResponse(); + +private: + TString LogPrefix(); + void Die(const TActorContext& ctx) override; + + STATEFN(StateWork) { + KAFKA_LOG_T("Received event: " << (*ev.Get()).GetTypeName()); + switch (ev->GetTypeRewrite()) { + HFunc(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk, Handle); + HFunc(TEvPersQueue::TEvResponse, Handle); + HFunc(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession, Handle); + HFunc(TEvTabletPipe::TEvClientConnected, Handle); + HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + } + } + + void Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx); + void Handle(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx); + + void AddPartitionResponse(EKafkaErrors error, const TString& topicName, ui64 partitionId, const TActorContext& ctx); + void ProcessPipeProblem(ui64 tabletId, const TActorContext& ctx); + void SendFailedForAllPartitions(EKafkaErrors error, const TActorContext& ctx); private: const TContext::TPtr Context; const ui64 CorrelationId; const TMessagePtr Message; + const TOffsetCommitResponseData::TPtr Response; + + ui64 PendingResponses = 0; + ui64 NextCookie = 0; + std::unordered_map> TabletIdToCookies; + std::unordered_map CookieToRequestInfo; + std::unordered_map ResponseTopicIds; + NKikimr::NGRpcProxy::TTopicInitInfoMap TopicAndTablets; + std::unordered_map TabletIdToPipe; + TActorId AuthInitActor; + EKafkaErrors Error = 
NONE_ERROR; + + static constexpr NTabletPipe::TClientRetryPolicy RetryPolicyForPipes = { + .RetryLimitCount = 6, + .MinRetryTime = TDuration::MilliSeconds(10), + .MaxRetryTime = TDuration::MilliSeconds(100), + .BackoffMultiplier = 2, + .DoFirstRetryInstantly = true + }; }; } // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp index 412016d1e908..c30cab158276 100644 --- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp @@ -1,8 +1,8 @@ #include "kafka_read_session_actor.h" namespace NKafka { - static constexpr TDuration WAKEUP_INTERVAL = TDuration::Seconds(1); +static constexpr TDuration LOCK_PARTITION_DELAY = TDuration::Seconds(3); static const TString SUPPORTED_ASSIGN_STRATEGY = "roundrobin"; static const TString SUPPORTED_JOIN_GROUP_PROTOCOL = "consumer"; @@ -35,6 +35,20 @@ void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActo CloseReadSession(ctx); return; } + + for (auto& topicToPartitions: NewPartitionsToLockOnTime) { + auto& partitions = topicToPartitions.second; + for (auto partitionsIt = partitions.begin(); partitionsIt != partitions.end(); ) { + if (partitionsIt->LockOn >= ctx.Now()) { + TopicPartitions[topicToPartitions.first].ToLock.emplace(partitionsIt->PartitionId); + NeedRebalance = true; + partitionsIt = partitions.erase(partitionsIt); + } else { + ++partitionsIt; + } + } + } + Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup()); } @@ -423,6 +437,10 @@ void TKafkaReadSessionActor::AuthAndFindBalancers(const TActorContext& ctx) { ); TopicsToConverter = topicHandler->GetReadTopicsList(TopicsToReadNames, false, Context->DatabasePath); + if (!TopicsToConverter.IsValid) { + SendJoinGroupResponseFail(ctx, JoinGroupCorellationId, INVALID_REQUEST, TStringBuilder() << "topicsToConverter is not valid"); + return; + } ctx.Register(new NGRpcProxy::V1::TReadInitAndAuthActor( ctx, ctx.SelfID, GroupId, Cookie, Session, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), MakeSchemeCacheID(), nullptr, Context->UserToken, TopicsToConverter, @@ -448,8 +466,9 @@ void TKafkaReadSessionActor::HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthRes TopicsInfo[internalName] = NGRpcProxy::TTopicHolder::FromTopicInfo(t); FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter; FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter; - // savnik: metering mode } + + Send(Context->ConnectionId, new TEvKafka::TEvReadSessionInfo(GroupId)); for (auto& [topicName, topicInfo] : TopicsInfo) { topicInfo.PipeClient = CreatePipeClient(topicInfo.TabletID, ctx); @@ -485,7 +504,7 @@ void TKafkaReadSessionActor::RegisterBalancerSession(const TString& topic, const auto& req = request->Record; req.SetSession(Session); - req.SetClientNode(Context->ClientId); + req.SetClientNode(Context->KafkaClient); ActorIdToProto(pipe, req.MutablePipeClient()); req.SetClientId(GroupId); @@ -496,7 +515,7 @@ void TKafkaReadSessionActor::RegisterBalancerSession(const TString& topic, const NTabletPipe::SendData(ctx, pipe, request.Release()); } -void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext&) { +void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx) { const auto& record = ev->Get()->Record; KAFKA_LOG_D("partition lock is coming from PQRB topic# " << record.GetTopic() << ", partition# " 
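The read-session change around here defers locking: a partition assigned by the balancer is parked in NewPartitionsToLockOnTime with a deadline of now + LOCK_PARTITION_DELAY (3 seconds), and the periodic 1-second wakeup promotes parked partitions into ToLock and flags a rebalance. A standalone sketch of that deferral, promoting a partition once its deadline has been reached (which is the intent of the delay); all names are illustrative:

#include <chrono>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

using TClock = std::chrono::steady_clock;

// Partitions assigned by the balancer are not locked immediately; they are parked
// for a short delay so that a quick release does not force a client rebalance.
struct TPendingLock {
    uint64_t Partition;
    TClock::time_point LockOn;
};

struct TDelayedLocker {
    std::map<std::string, std::vector<TPendingLock>> Pending; // topic -> parked partitions
    std::map<std::string, std::set<uint64_t>> ToLock;         // topic -> partitions to hand out
    bool NeedRebalance = false;

    void OnLockFromBalancer(const std::string& topic, uint64_t partition,
                            TClock::time_point now,
                            TClock::duration delay = std::chrono::seconds(3)) {
        Pending[topic].push_back({partition, now + delay});
    }

    // Called from the periodic wakeup (roughly every second in the actor).
    void OnWakeup(TClock::time_point now) {
        for (auto& [topic, parked] : Pending) {
            for (auto it = parked.begin(); it != parked.end(); ) {
                if (it->LockOn <= now) {            // delay elapsed -> really lock it
                    ToLock[topic].insert(it->Partition);
                    NeedRebalance = true;
                    it = parked.erase(it);
                } else {
                    ++it;
                }
            }
        }
    }
};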
<< record.GetPartition()); @@ -527,8 +546,10 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition: return; } - TopicPartitions[name].ToLock.emplace(record.GetPartition()); - NeedRebalance = true; + TNewPartitionToLockInfo partitionToLock; + partitionToLock.LockOn = ctx.Now() + LOCK_PARTITION_DELAY; + partitionToLock.PartitionId = record.GetPartition(); + NewPartitionsToLockOnTime[name].push_back(partitionToLock); } void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext& ctx) { @@ -551,12 +572,22 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart return; } + auto newPartitionsToLockIt = NewPartitionsToLockOnTime.find(pathIt->second->GetInternalName()); + auto newPartitionsToLockCount = newPartitionsToLockIt == NewPartitionsToLockOnTime.end() ? 0 : newPartitionsToLockIt->second.size(); + auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName()); Y_ABORT_UNLESS(topicPartitionsIt != TopicPartitions.end()); - Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()); + Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size() + newPartitionsToLockCount); for (ui32 c = 0; c < record.GetCount(); ++c) { // if some partition not locked yet, then release it without rebalance + if (newPartitionsToLockCount > 0) { + newPartitionsToLockCount--; + InformBalancerAboutPartitionRelease(topicInfoIt->first, newPartitionsToLockIt->second.back().PartitionId, ctx); + newPartitionsToLockIt->second.pop_back(); + continue; + } + if (!topicPartitionsIt->second.ToLock.empty()) { auto partitionToReleaseIt = topicPartitionsIt->second.ToLock.begin(); topicPartitionsIt->second.ToLock.erase(partitionToReleaseIt); diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h index 24e38805ae94..45f704a43c71 100644 --- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h @@ -19,37 +19,37 @@ namespace NKafka { * Pipeline: * * client server - * JOIN_GROUP request(topics) - * ----------------> - * JOIN_GROUP response() - * <---------------- + * JOIN_GROUP request(topics) + * ----------------> + * JOIN_GROUP response() + * <---------------- * - * SYNC_GROUP request() - * ----------------> - * SYNC_GROUP response(partitions to read) - * <---------------- + * SYNC_GROUP request() + * ----------------> + * SYNC_GROUP response(partitions to read) + * <---------------- * - * HEARTBEAT request() - * ----------------> - * HEARTBEAT response(status = OK) - * <---------------- + * HEARTBEAT request() + * ----------------> + * HEARTBEAT response(status = OK) + * <---------------- * - * HEARTBEAT request() - * ----------------> - * HEARTBEAT response(status = REBALANCE_IN_PROGRESS) //if partitions to read list changes - * <---------------- + * HEARTBEAT request() + * ----------------> + * HEARTBEAT response(status = REBALANCE_IN_PROGRESS) //if partitions to read list changes + * <---------------- * - * JOIN_GROUP request(topics) //client send again, because REBALANCE_IN_PROGRESS in heartbeat response - * ----------------> + * JOIN_GROUP request(topics) //client send again, because REBALANCE_IN_PROGRESS in heartbeat response + * ----------------> * - * ... - * ... - * ... + * ... + * ... + * ... 
* - * LEAVE_GROUP request() - * ----------------> - * LEAVE_GROUP response() - * <---------------- + * LEAVE_GROUP request() + * ----------------> + * LEAVE_GROUP response() + * <---------------- */ class TKafkaReadSessionActor: public NActors::TActorBootstrapped { @@ -66,6 +66,11 @@ struct TPartitionsInfo { THashSet ToLock; }; +struct TNewPartitionToLockInfo { + ui64 PartitionId; + TInstant LockOn; +}; + struct TNextRequestError { EKafkaErrors Code = EKafkaErrors::NONE_ERROR; TString Message = ""; @@ -91,25 +96,27 @@ struct TNextRequestError { STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { - //from client + + // from client HFunc(TEvKafka::TEvJoinGroupRequest, HandleJoinGroup); HFunc(TEvKafka::TEvSyncGroupRequest, HandleSyncGroup); HFunc(TEvKafka::TEvLeaveGroupRequest, HandleLeaveGroup); HFunc(TEvKafka::TEvHeartbeatRequest, HandleHeartbeat); - //from TReadInitAndAuthActor + // from TReadInitAndAuthActor HFunc(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk, HandleAuthOk); HFunc(NGRpcProxy::V1::TEvPQProxy::TEvCloseSession, HandleAuthCloseSession); - //from PQRB + // from PQRB HFunc(TEvPersQueue::TEvLockPartition, HandleLockPartition); HFunc(TEvPersQueue::TEvReleasePartition, HandleReleasePartition); HFunc(TEvPersQueue::TEvError, HandleBalancerError); - //from Pipe + // from Pipe HFunc(TEvTabletPipe::TEvClientConnected, HandlePipeConnected); HFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeDestroyed); + // others HFunc(TEvKafka::TEvWakeup, HandleWakeup); SFunc(TEvents::TEvPoison, Die); } @@ -169,6 +176,7 @@ struct TNextRequestError { THashSet TopicsToReadNames; THashMap TopicPartitions; THashMap FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching + THashMap> NewPartitionsToLockOnTime; // Topic -> PartitionsToLock static constexpr NTabletPipe::TClientRetryPolicy RetryPolicyForPipes = { .RetryLimitCount = 21, diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 39276e817592..57e728f548d5 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -322,7 +322,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet SendRequestMetrics(ctx); if (Request->Header.ClientId.has_value() && Request->Header.ClientId != "") { - Context->ClientId = Request->Header.ClientId.value(); + Context->KafkaClient = Request->Header.ClientId.value(); } switch (Request->Header.RequestApiKey) { @@ -410,6 +410,11 @@ class TKafkaConnection: public TActorBootstrapped, public TNet Reply(r->CorrelationId, r->Response, r->ErrorCode, ctx); } + void Handle(TEvKafka::TEvReadSessionInfo::TPtr readInfo, const TActorContext& /*ctx*/) { + auto r = readInfo->Get(); + Context->GroupId = r->GroupId; + } + void Handle(TEvKafka::TEvAuthResult::TPtr ev, const TActorContext& ctx) { auto event = ev->Get(); @@ -727,6 +732,7 @@ class TKafkaConnection: public TActorBootstrapped, public TNet hFunc(TEvPollerRegisterResult, HandleConnected); HFunc(TEvKafka::TEvResponse, Handle); HFunc(TEvKafka::TEvAuthResult, Handle); + HFunc(TEvKafka::TEvReadSessionInfo, Handle); HFunc(TEvKafka::TEvHandshakeResult, Handle); sFunc(TEvKafka::TEvKillReadSession, HandleKillReadSession); sFunc(NActors::TEvents::TEvPoison, PassAway); diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index ff7126ecee8b..e2132363a33f 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -29,6 +29,7 @@ struct TEvKafka { EvKillReadSession, EvCommitedOffsetsResponse, 
EvCreateTopicsResponse, + EvReadSessionInfo, EvResponse = EvRequest + 256, EvInternalEvents = EvResponse + 256, EvEnd @@ -165,6 +166,14 @@ struct TEvKafka { {} }; + struct TEvReadSessionInfo : public TEventLocal { + TEvReadSessionInfo(const TString& groupId) + : GroupId(groupId) + {} + + TString GroupId; + }; + struct TEvKillReadSession : public TEventLocal {}; struct TEvUpdateHistCounter : public TEventLocal { diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 94119c907795..d5906865762e 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -366,6 +366,30 @@ class TTestClient { return WriteAndRead(header, request); } + TMessagePtr OffsetCommit(TString groupId, std::unordered_map>> topicsToPartions) { + Cerr << ">>>>> TOffsetCommitRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_COMMIT, 1); + + TOffsetCommitRequestData request; + request.GroupId = groupId; + + for (const auto& topicToPartitions : topicsToPartions) { + NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic topic; + topic.Name = topicToPartitions.first; + + for (auto partitionAndOffset : topicToPartitions.second) { + NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition partition; + partition.PartitionIndex = partitionAndOffset.first; + partition.CommittedOffset = partitionAndOffset.second; + topic.Partitions.push_back(partition); + } + request.Topics.push_back(topic); + } + + return WriteAndRead(header, request); + } + TMessagePtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) { std::vector> msgs; msgs.emplace_back(partition, batch); @@ -1248,16 +1272,18 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } // Y_UNIT_TEST(BalanceScenario) - Y_UNIT_TEST(OffsetFetchScenario) { + Y_UNIT_TEST(OffsetCommitAndFetchScenario) { TInsecureTestServer testServer("2"); - TString topicName = "/Root/topic-0-test"; - TString shortTopicName = "topic-0-test"; + TString firstTopicName = "/Root/topic-0-test"; + TString secondTopicName = "/Root/topic-1-test"; + TString shortTopicName = "topic-1-test"; TString notExistsTopicName = "/Root/not-exists"; ui64 minActivePartitions = 10; - TString consumerName = "consumer-0"; - TString consumer1Name = "consumer-1"; + TString firstConsumerName = "consumer-0"; + TString secondConsumerName = "consumer-1"; + TString notExistsConsumerName = "notExists"; TString key = "record-key"; TString value = "record-value"; @@ -1268,10 +1294,23 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { auto result = pqClient - .CreateTopic(topicName, + .CreateTopic(firstTopicName, + NYdb::NTopic::TCreateTopicSettings() + .BeginAddConsumer(firstConsumerName).EndAddConsumer() + .BeginAddConsumer(secondConsumerName).EndAddConsumer() + .PartitioningSettings(minActivePartitions, 100)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + { + auto result = + pqClient + .CreateTopic(secondTopicName, NYdb::NTopic::TCreateTopicSettings() - .BeginAddConsumer(consumerName).EndAddConsumer() - .BeginAddConsumer(consumer1Name).EndAddConsumer() + .BeginAddConsumer(firstConsumerName).EndAddConsumer() + .BeginAddConsumer(secondConsumerName).EndAddConsumer() .PartitioningSettings(minActivePartitions, 100)) .ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); @@ -1300,90 +1339,89 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { 
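The new OffsetCommit test helper below assembles an OFFSET_COMMIT v1 request from a group id and a per-topic list of (partition, committed offset) pairs. A hypothetical call inside this fixture would look as follows; the map's element types are inferred from the later calls in the scenario, and the snippet is only meaningful within the test harness:

// Commit offset 5 for partition 0 of the first topic under consumer-0.
std::unordered_map<TString, std::vector<std::pair<ui64, ui64>>> offsets;
offsets["/Root/topic-0-test"] = {{0, 5}};
auto msg = client.OffsetCommit("consumer-0", offsets);
UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions.size(), 1);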
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); } + auto recordsCount = 5; { // Produce + TKafkaRecordBatch batch; batch.BaseOffset = 3; batch.BaseSequence = 5; batch.Magic = 2; // Current supported - batch.Records.resize(1); - batch.Records[0].Key = TKafkaRawBytes(key.Data(), key.Size()); - batch.Records[0].Value = TKafkaRawBytes(value.Data(), value.Size()); - batch.Records[0].Headers.resize(1); - batch.Records[0].Headers[0].Key = TKafkaRawBytes(headerKey.Data(), headerKey.Size()); - batch.Records[0].Headers[0].Value = TKafkaRawBytes(headerValue.Data(), headerValue.Size()); + batch.Records.resize(recordsCount); - auto msg = client.Produce(topicName, 0, batch); + for (auto i = 0; i < recordsCount; i++) { + batch.Records[i].Key = TKafkaRawBytes(key.Data(), key.Size()); + batch.Records[i].Value = TKafkaRawBytes(value.Data(), value.Size()); + } - UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName); + auto msg = client.Produce(firstTopicName, 0, batch); + + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, firstTopicName); UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0); UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); } { - // Commit offset for consumer-0 - auto settings = NTopic::TReadSessionSettings() - .AppendTopics(NTopic::TTopicReadSettings(topicName)) - .ConsumerName("consumer-0"); - auto topicReader = pqClient.CreateReadSession(settings); - - auto m = Read(topicReader); - UNIT_ASSERT_EQUAL(m.size(), 1); - - UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); - auto& m0 = m[0].GetMessages()[0]; - m0.Commit(); - } - - { - // Commit offset for consumer-1 - auto settings = NTopic::TReadSessionSettings() - .AppendTopics(NTopic::TTopicReadSettings(topicName)) - .ConsumerName("consumer-1"); - auto topicReader = pqClient.CreateReadSession(settings); - - auto m = Read(topicReader); - UNIT_ASSERT_EQUAL(m.size(), 1); - - UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); - auto& m0 = m[0].GetMessages()[0]; - m0.Commit(); - } - - { - // Check commited offset after produce + // Fetch offsets std::map> topicsToPartions; - topicsToPartions[topicName] = std::vector{0, 1, 2, 3}; - auto msg = client.OffsetFetch(consumerName, topicsToPartions); + topicsToPartions[firstTopicName] = std::vector{0, 1, 2, 3 }; + auto msg = client.OffsetFetch(firstConsumerName, topicsToPartions); UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1); const auto& partitions = msg->Groups[0].Topics[0].Partitions; UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4); auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; }); UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end()); - UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1); + UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 0); + } + + { + // Check commit + std::unordered_map>> offsets; + std::vector> partitionsAndOffsets; + for (ui64 i = 0; i < minActivePartitions; ++i) { + partitionsAndOffsets.emplace_back(std::make_pair(i, recordsCount)); + } + offsets[firstTopicName] = partitionsAndOffsets; + offsets[shortTopicName] = partitionsAndOffsets; + auto msg = client.OffsetCommit(firstConsumerName, offsets); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 2); + for (const auto& topic : msg->Topics) { + UNIT_ASSERT_VALUES_EQUAL(topic.Partitions.size(), minActivePartitions); + for (const auto& partition : 
topic.Partitions) { + if (topic.Name.value() == firstTopicName) { + if (partition.PartitionIndex == 0) { + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + } else { + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast(EKafkaErrors::OFFSET_OUT_OF_RANGE)); + } + } else { + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast(EKafkaErrors::OFFSET_OUT_OF_RANGE)); + } + } + } } { - // Check with short topic name + // Fetch offsets after commit std::map> topicsToPartions; - topicsToPartions[shortTopicName] = std::vector{0, 1, 2, 3}; - auto msg = client.OffsetFetch(consumerName, topicsToPartions); + topicsToPartions[firstTopicName] = std::vector{0, 1, 2 , 3 }; + auto msg = client.OffsetFetch(firstConsumerName, topicsToPartions); UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1); const auto& partitions = msg->Groups[0].Topics[0].Partitions; UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4); auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; }); UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end()); - UNIT_ASSERT_VALUES_EQUAL(partition0->ErrorCode, NONE_ERROR); - UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1); + UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 5); } + { - // Check with nonexistent topic + // Check fetch offsets with nonexistent topic std::map> topicsToPartions; - topicsToPartions["nonexTopic"] = std::vector{0, 1}; - auto msg = client.OffsetFetch(consumerName, topicsToPartions); + topicsToPartions[notExistsTopicName] = std::vector{0, 1}; + auto msg = client.OffsetFetch(firstConsumerName, topicsToPartions); UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics[0].Partitions.size(), 2); @@ -1393,10 +1431,30 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } { - // Check with nonexistent consumer + // Check commit with nonexistent topic + std::unordered_map>> offsets; + std::vector> partitionsAndOffsets; + for (ui64 i = 0; i < minActivePartitions; ++i) { + partitionsAndOffsets.emplace_back(std::make_pair(i, recordsCount)); + } + offsets[firstTopicName] = partitionsAndOffsets; + offsets[notExistsTopicName] = partitionsAndOffsets; + + auto msg = client.OffsetCommit(notExistsConsumerName, offsets); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.back().Partitions.size(), minActivePartitions); + for (const auto& topic : msg->Topics) { + for (const auto& partition : topic.Partitions) { + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast(EKafkaErrors::INVALID_REQUEST)); + } + } + } + + { + // Check fetch offsets nonexistent consumer std::map> topicsToPartions; - topicsToPartions[topicName] = std::vector{0, 1}; - auto msg = client.OffsetFetch("nonexConsumer", topicsToPartions); + topicsToPartions[firstTopicName] = std::vector{0, 1}; + auto msg = client.OffsetFetch(notExistsConsumerName, topicsToPartions); UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics[0].Partitions.size(), 2); @@ -1406,22 +1464,45 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } { - // Check with 2 consumers + // Check commit with nonexistent consumer + std::unordered_map>> offsets; + std::vector> partitionsAndOffsets; + for (ui64 i = 0; i < minActivePartitions; ++i) { + 
partitionsAndOffsets.emplace_back(std::make_pair(i, recordsCount)); + } + offsets[firstTopicName] = partitionsAndOffsets; + + auto msg = client.OffsetCommit(notExistsConsumerName, offsets); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.back().Partitions.size(), minActivePartitions); + for (const auto& topic : msg->Topics) { + for (const auto& partition : topic.Partitions) { + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast(EKafkaErrors::INVALID_REQUEST)); + } + } + } + + { + // Check fetch offsets with 2 consumers and topics TOffsetFetchRequestData request; TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics topic; - topic.Name = topicName; + topic.Name = firstTopicName; auto partitionIndexes = std::vector{0}; topic.PartitionIndexes = partitionIndexes; + TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics shortTopic; + shortTopic.Name = shortTopicName; + shortTopic.PartitionIndexes = partitionIndexes; + TOffsetFetchRequestData::TOffsetFetchRequestGroup group0; - group0.GroupId = consumerName; + group0.GroupId = firstConsumerName; group0.Topics.push_back(topic); request.Groups.push_back(group0); TOffsetFetchRequestData::TOffsetFetchRequestGroup group1; - group1.GroupId = consumer1Name; - group1.Topics.push_back(topic); + group1.GroupId = secondConsumerName; + group1.Topics.push_back(shortTopic); request.Groups.push_back(group1); auto msg = client.OffsetFetch(request); @@ -1430,7 +1511,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { for (const auto& group: msg->Groups) { UNIT_ASSERT_VALUES_EQUAL(group.Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].CommittedOffset, 1); + if (group.GroupId == firstConsumerName) { + UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].CommittedOffset, 5); + } else if (group.GroupId == secondConsumerName) { + UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].CommittedOffset, 0); + } UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].ErrorCode, NONE_ERROR); } } diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp index 52b217590cdf..e2d4d3a39ff4 100644 --- a/ydb/core/persqueue/fetch_request_actor.cpp +++ b/ydb/core/persqueue/fetch_request_actor.cpp @@ -6,7 +6,9 @@ #include #include +#include #include +#include #include @@ -21,7 +23,7 @@ using namespace NSchemeCache; namespace { - const ui32 DefaultTimeout = 30000; + static constexpr TDuration DefaultTimeout = TDuration::MilliSeconds(30000); } struct TTabletInfo { // ToDo !! 
remove @@ -52,7 +54,22 @@ struct TTopicInfo { using namespace NActors; -class TPQFetchRequestActor : public TActorBootstrapped { +class TPQFetchRequestActor : public TActorBootstrapped + , private TRlHelpers { + +struct TEvPrivate { + enum EEv { + EvTimeout = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + struct TEvTimeout : NActors::TEventLocal { + }; + +}; + private: TFetchRequestSettings Settings; @@ -74,6 +91,7 @@ class TPQFetchRequestActor : public TActorBootstrapped { ui32 PartTabletsRequested; TString ErrorReason; TActorId RequesterId; + ui64 PendingQuotaAmount; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -81,7 +99,8 @@ class TPQFetchRequestActor : public TActorBootstrapped { } TPQFetchRequestActor(const TFetchRequestSettings& settings, const TActorId& schemeCacheId, const TActorId& requesterId) - : Settings(settings) + : TRlHelpers({}, settings.RlCtx, 8_KB, false, TDuration::Seconds(1)) + , Settings(settings) , CanProcessFetchRequest(false) , FetchRequestReadsDone(0) , FetchRequestCurrentReadTablet(0) @@ -115,7 +134,25 @@ class TPQFetchRequestActor : public TActorBootstrapped { TopicInfo[path].FetchInfo[p.Partition] = fetchInfo; } } - // FIXME(savnik) handle request timeout + + void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { + const auto tag = static_cast(ev->Get()->Tag); + OnWakeup(tag); + switch (tag) { + case EWakeupTag::RlAllowed: + ProceedFetchRequest(ctx); + PendingQuotaAmount = 0; + break; + + case EWakeupTag::RlNoResource: + // Re-requesting the quota. We do this until we get a quota. + RequestDataQuota(PendingQuotaAmount, ctx); + break; + + default: + Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast(tag)); + } + } void Bootstrap(const TActorContext& ctx) { LOG_INFO_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "Fetch request actor boostrapped. 
Request is valid: " << (!Response)); @@ -128,7 +165,8 @@ class TPQFetchRequestActor : public TActorBootstrapped { ctx.Schedule(TDuration::MilliSeconds(Min(Settings.MaxWaitTimeMs, 30000)), new TEvPersQueue::TEvHasDataInfoResponse); SendSchemeCacheRequest(ctx); - Become(&TPQFetchRequestActor::StateFunc, ctx, TDuration::MilliSeconds(DefaultTimeout), new TEvents::TEvWakeup()); + Schedule(DefaultTimeout, new TEvPrivate::TEvTimeout()); + Become(&TPQFetchRequestActor::StateFunc); } void SendSchemeCacheRequest(const TActorContext& ctx) { @@ -159,7 +197,6 @@ class TPQFetchRequestActor : public TActorBootstrapped { void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "Handle SchemeCache response"); auto& result = ev->Get()->Request; - for (const auto& entry : result->ResultSet) { auto path = CanonizePath(NKikimr::JoinPath(entry.Path)); switch (entry.Status) { @@ -333,6 +370,7 @@ class TPQFetchRequestActor : public TActorBootstrapped { for (auto& actor: PQClient) { NTabletPipe::CloseClient(ctx, actor); } + TRlHelpers::PassAway(SelfId()); TActorBootstrapped::Die(ctx); } @@ -361,6 +399,7 @@ class TPQFetchRequestActor : public TActorBootstrapped { const auto& part = req.Partition; const auto& maxBytes = req.MaxBytes; const auto& readTimestampMs = req.ReadTimestampMs; + const auto& clientId = req.ClientId; auto it = TopicInfo.find(CanonizePath(topic)); Y_ABORT_UNLESS(it != TopicInfo.end()); if (it->second.PartitionToTablet.find(part) == it->second.PartitionToTablet.end()) { @@ -388,7 +427,7 @@ class TPQFetchRequestActor : public TActorBootstrapped { partReq->SetTopic(topic); partReq->SetPartition(part); auto read = partReq->MutableCmdRead(); - read->SetClientId(NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER); + read->SetClientId(clientId); read->SetOffset(offset); read->SetCount(1000000); read->SetTimeoutMs(0); @@ -406,6 +445,7 @@ class TPQFetchRequestActor : public TActorBootstrapped { return; } + if (FetchRequestBytesLeft >= (ui32)record.ByteSize()) FetchRequestBytesLeft -= (ui32)record.ByteSize(); else @@ -432,7 +472,32 @@ class TPQFetchRequestActor : public TActorBootstrapped { read->SetErrorReason(record.GetErrorReason()); ++FetchRequestReadsDone; - ProceedFetchRequest(ctx); + + auto it = TopicInfo.find(CanonizePath(topic)); + Y_ABORT_UNLESS(it != TopicInfo.end()); + + SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode()); + + if (IsQuotaRequired()) { + PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)); + RequestDataQuota(PendingQuotaAmount, ctx); + } else { + ProceedFetchRequest(ctx); + } + + } + + ui64 GetPayloadSize(const NKikimrClient::TResponse& record) const { + ui64 readBytesSize = 0; + const auto& response = record.GetPartitionResponse(); + if (response.HasCmdReadResult()) { + const auto& results = response.GetCmdReadResult().GetResult(); + for (auto& r : results) { + auto proto(NKikimr::GetDeserializedData(r.GetData())); + readBytesSize += proto.GetData().Size(); + } + } + return readBytesSize; } bool CheckAccess(const TSecurityObject& access) { @@ -465,9 +530,10 @@ class TPQFetchRequestActor : public TActorBootstrapped { HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(TEvPersQueue::TEvResponse, Handle); + HFunc(TEvents::TEvWakeup, Handle); HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse); HFunc(TEvPersQueue::TEvHasDataInfoResponse, Handle); - 
CFunc(TEvents::TSystem::Wakeup, HandleTimeout); + CFunc(TEvPrivate::EvTimeout, HandleTimeout); CFunc(NActors::TEvents::TSystem::PoisonPill, Die); ) }; diff --git a/ydb/core/persqueue/fetch_request_actor.h b/ydb/core/persqueue/fetch_request_actor.h index e9b55e62576d..a5f972267f57 100644 --- a/ydb/core/persqueue/fetch_request_actor.h +++ b/ydb/core/persqueue/fetch_request_actor.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -7,13 +8,15 @@ namespace NKikimr::NPQ { struct TPartitionFetchRequest { TString Topic; + TString ClientId; ui32 Partition; ui64 Offset; ui64 MaxBytes; ui64 ReadTimestampMs; - TPartitionFetchRequest(const TString& topic, ui32 partition, ui64 offset, ui64 maxBytes, ui64 readTimestampMs = 0) + TPartitionFetchRequest(const TString& topic, const TString& clientId, ui32 partition, ui64 offset, ui64 maxBytes, ui64 readTimestampMs = 0) : Topic(topic) + , ClientId(clientId) , Partition(partition) , Offset(offset) , MaxBytes(maxBytes) @@ -29,10 +32,11 @@ struct TFetchRequestSettings { TMaybe User; ui64 MaxWaitTimeMs; ui64 TotalMaxBytes; + TRlContext RlCtx; ui64 RequestId = 0; TFetchRequestSettings( - const TString& database, const TVector& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, + const TString& database, const TVector& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx, const TMaybe& user = {}, ui64 requestId = 0 ) : Database(database) @@ -40,6 +44,7 @@ struct TFetchRequestSettings { , User(user) , MaxWaitTimeMs(maxWaitTimeMs) , TotalMaxBytes(totalMaxBytes) + , RlCtx(rlCtx) , RequestId(requestId) {} }; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 64aebab88e6b..b996667d5386 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -644,7 +644,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext auto* clientInfo = result.AddConsumerResult(); clientInfo->SetConsumer(userInfo.User); clientInfo->set_errorcode(NPersQueue::NErrorCode::EErrorCode::OK); - clientInfo->SetCommitedOffset(userInfo.GetReadOffset()); + clientInfo->SetCommitedOffset(userInfo.Offset); requiredConsumers.extract(userInfo.User); } continue; diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 645f1ec47f7a..1cc4ab2e9bc8 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -735,6 +735,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl NTabletPipe::CloseAndForgetClient(SelfId(), PipeClient); } SendError("Unexpected termination"); + TRlHelpers::PassAway(SelfId()); TActorBootstrapped::PassAway(); } diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 143f4ec2a593..8b2e373acf28 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -1560,6 +1560,7 @@ namespace NKikimr::NDataStreams::V1 { void TGetRecordsActor::Die(const TActorContext& ctx) { NTabletPipe::CloseClient(ctx, PipeClient); + TRlHelpers::PassAway(SelfId()); TBase::Die(ctx); } diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h index d3e466c463d4..772dab0aec89 100644 --- a/ydb/services/datastreams/put_records_actor.h +++ b/ydb/services/datastreams/put_records_actor.h @@ -224,6 +224,7 @@ namespace NKikimr::NDataStreams::V1 { void Bootstrap(const NActors::TActorContext &ctx); void PreparePartitionActors(const NActors::TActorContext& ctx); void 
HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); + void Die(const TActorContext& ctx) override; protected: void Write(const TActorContext& ctx); @@ -258,6 +259,12 @@ namespace NKikimr::NDataStreams::V1 { } }; + template + void TPutRecordsActorBase::Die(const TActorContext& ctx) { + TRlHelpers::PassAway(TDerived::SelfId()); + TBase::Die(ctx); + } + template TPutRecordsActorBase::TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request) : TBase(request, dynamic_cast(request->GetRequest())->stream_name()) diff --git a/ydb/services/persqueue_v1/actors/direct_read_actor.cpp b/ydb/services/persqueue_v1/actors/direct_read_actor.cpp index 0f53ebc81923..0e1fcf04142e 100644 --- a/ydb/services/persqueue_v1/actors/direct_read_actor.cpp +++ b/ydb/services/persqueue_v1/actors/direct_read_actor.cpp @@ -154,7 +154,7 @@ void TDirectReadSessionActor::Die(const TActorContext& ctx) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, LOG_PREFIX << " proxy is DEAD"); ctx.Send(GetPQReadServiceActorID(), new TEvPQProxy::TEvSessionDead(Cookie)); ctx.Send(NPQ::MakePQDReadCacheServiceActorId(), new TEvPQProxy::TEvDirectReadDataSessionDead(Session)); - + TRlHelpers::PassAway(SelfId()); TActorBootstrapped::Die(ctx); } diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index d4532610d3c4..e624997c832c 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -369,7 +369,7 @@ void TReadSessionActor::Die(const TActorContext& ctx) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD"); ctx.Send(GetPQReadServiceActorID(), new TEvPQProxy::TEvSessionDead(Cookie)); - + TRlHelpers::PassAway(TActorBootstrapped::SelfId()); TActorBootstrapped::Die(ctx); } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index c757b6e6424d..facb42ce47c3 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -299,7 +299,7 @@ void TWriteSessionActor::Die(const TActorContext& ctx) { } State = ES_DYING; - + TRlHelpers::PassAway(TActorBootstrapped::SelfId()); TActorBootstrapped::Die(ctx); }
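Several actors in this change (partition writer, datastreams GetRecords/PutRecords, and the PQ read, write and direct-read sessions) now call TRlHelpers::PassAway(SelfId()) immediately before the base Die()/PassAway(), so any outstanding rate-limiter subscription is released together with the actor. A minimal sketch of that teardown ordering with placeholder types standing in for the actor-system classes:

#include <iostream>

// Placeholder for the rate-limiting mixin: owns a subscription that must be
// released when the owning actor terminates.
struct TRlHelpers {
    void PassAway(int selfId) {
        std::cout << "actor " << selfId << ": release RL subscription\n";
    }
};

struct TActorBase {
    virtual ~TActorBase() = default;
    virtual void Die() { std::cout << "base Die(): unregister actor\n"; }
};

// The pattern applied across the actors above: tear down helper state first,
// then let the base class finish the shutdown.
struct TSomeSessionActor : TActorBase, private TRlHelpers {
    int SelfId() const { return 42; }
    void Die() override {
        TRlHelpers::PassAway(SelfId());
        TActorBase::Die();
    }
};

int main() {
    TSomeSessionActor actor;
    actor.Die();
}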