From 64f7ed87df6dfca1954c5afdd9777e18b6451a2e Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 1 Apr 2024 10:32:12 +0000 Subject: [PATCH 1/2] Fix kafka counters --- ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp | 1 + ydb/core/persqueue/fetch_request_actor.cpp | 1 + 2 files changed, 2 insertions(+) diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 4829933de3e7..452beeadc61d 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -294,6 +294,7 @@ THolder Convert(const TProduceRequestData:: w->SetUncompressedSize(record.Value ? record.Value->size() : 0); w->SetClientDC(clientDC); w->SetIgnoreQuotaDeadline(true); + w->SetExternalOperation(true); totalSize += record.Value ? record.Value->size() : 0; } diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp index e2d4d3a39ff4..45c0195f12df 100644 --- a/ydb/core/persqueue/fetch_request_actor.cpp +++ b/ydb/core/persqueue/fetch_request_actor.cpp @@ -433,6 +433,7 @@ struct TEvPrivate { read->SetTimeoutMs(0); read->SetBytes(Min(maxBytes, FetchRequestBytesLeft)); read->SetReadTimestampMs(readTimestampMs); + read->SetExternalOperation(true); NTabletPipe::SendData(ctx, jt->second.PipeClient, preq.Release()); } From 0912446dd7f290b869debbdf44f2d92fe9ec6119 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 17 Apr 2024 14:22:06 +0000 Subject: [PATCH 2/2] Fix RU --- ydb/core/persqueue/fetch_request_actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp index 45c0195f12df..4ddd00ac3b6b 100644 --- a/ydb/core/persqueue/fetch_request_actor.cpp +++ b/ydb/core/persqueue/fetch_request_actor.cpp @@ -480,7 +480,7 @@ struct TEvPrivate { SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode()); if (IsQuotaRequired()) { - PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)); + PendingQuotaAmount = 1 + CalcRuConsumption(GetPayloadSize(record)); RequestDataQuota(PendingQuotaAmount, ctx); } else { ProceedFetchRequest(ctx);