diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 4829933de3e7..452beeadc61d 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -294,6 +294,7 @@ THolder Convert(const TProduceRequestData:: w->SetUncompressedSize(record.Value ? record.Value->size() : 0); w->SetClientDC(clientDC); w->SetIgnoreQuotaDeadline(true); + w->SetExternalOperation(true); totalSize += record.Value ? record.Value->size() : 0; } diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp index e2d4d3a39ff4..4ddd00ac3b6b 100644 --- a/ydb/core/persqueue/fetch_request_actor.cpp +++ b/ydb/core/persqueue/fetch_request_actor.cpp @@ -433,6 +433,7 @@ struct TEvPrivate { read->SetTimeoutMs(0); read->SetBytes(Min(maxBytes, FetchRequestBytesLeft)); read->SetReadTimestampMs(readTimestampMs); + read->SetExternalOperation(true); NTabletPipe::SendData(ctx, jt->second.PipeClient, preq.Release()); } @@ -479,7 +480,7 @@ struct TEvPrivate { SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode()); if (IsQuotaRequired()) { - PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)); + PendingQuotaAmount = 1 + CalcRuConsumption(GetPayloadSize(record)); RequestDataQuota(PendingQuotaAmount, ctx); } else { ProceedFetchRequest(ctx);