Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid removing empty message #10167

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
static ui32 NormalizeMaxReadSize(ui32 sourceValue);
static ui32 NormalizeMaxReadPartitionsCount(ui32 sourceValue);

static bool RemoveEmptyMessages(NPersQueue::TReadResponse::TBatchedData& data); // returns true if there are nonempty messages
static bool HasMessages(const NPersQueue::TReadResponse::TBatchedData& data); // returns true if there are any messages

private:
IReadSessionHandlerRef Handler;
Expand Down
29 changes: 9 additions & 20 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,7 @@ bool TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo

Y_ABORT_UNLESS(formedResponse->RequestsInfly == 0);
i64 diff = formedResponse->Response.ByteSize();
const bool hasMessages = RemoveEmptyMessages(*formedResponse->Response.MutableBatchedData());
const bool hasMessages = HasMessages(formedResponse->Response.GetBatchedData());
if (hasMessages) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign read id " << ReadIdToResponse << " to read request " << formedResponse->Guid);
formedResponse->Response.MutableBatchedData()->SetCookie(ReadIdToResponse);
Expand Down Expand Up @@ -1758,26 +1758,15 @@ void TReadSessionActor::HandleWakeup(const TActorContext& ctx) {
}
}

bool TReadSessionActor::RemoveEmptyMessages(TReadResponse::TBatchedData& data) {
bool hasNonEmptyMessages = false;
auto isMessageEmpty = [&](TReadResponse::TBatchedData::TMessageData& message) -> bool {
if (message.GetData().empty()) {
return true;
} else {
hasNonEmptyMessages = true;
return false;
bool TReadSessionActor::HasMessages(const TReadResponse::TBatchedData& data) {
for (const auto& partData : data.GetPartitionData()) {
for (const auto& batch : partData.GetBatch()) {
if (batch.MessageDataSize() > 0) {
return true;
}
}
};
auto batchRemover = [&](TReadResponse::TBatchedData::TBatch& batch) -> bool {
NProtoBuf::RemoveRepeatedFieldItemIf(batch.MutableMessageData(), isMessageEmpty);
return batch.MessageDataSize() == 0;
};
auto partitionDataRemover = [&](TReadResponse::TBatchedData::TPartitionData& partition) -> bool {
NProtoBuf::RemoveRepeatedFieldItemIf(partition.MutableBatch(), batchRemover);
return partition.BatchSize() == 0;
};
NProtoBuf::RemoveRepeatedFieldItemIf(data.MutablePartitionData(), partitionDataRemover);
return hasNonEmptyMessages;
}
return false;
}


Expand Down
38 changes: 18 additions & 20 deletions ydb/services/persqueue_v1/actors/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,27 @@

namespace NKikimr::NGRpcProxy::V1 {

bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data) {
auto batchRemover = [&](PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch) -> bool {
return batch.message_data_size() == 0;
};
auto partitionDataRemover = [&](PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData& partition) -> bool {
NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover);
return partition.batches_size() == 0;
};
NProtoBuf::RemoveRepeatedFieldItemIf(data.mutable_partition_data(), partitionDataRemover);
return !data.partition_data().empty();
bool HasMessages(const PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data) {
for (const auto& partData : data.partition_data()) {
for (const auto& batch : partData.batches()) {
if (batch.message_data_size() > 0) {
return true;
}
}
}
return false;
}

// TODO: remove after refactor
bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data) {
auto batchRemover = [&](Topic::StreamReadMessage::ReadResponse::Batch& batch) -> bool {
return batch.message_data_size() == 0;
};
auto partitionDataRemover = [&](Topic::StreamReadMessage::ReadResponse::PartitionData& partition) -> bool {
NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover);
return partition.batches_size() == 0;
};
NProtoBuf::RemoveRepeatedFieldItemIf(data.mutable_partition_data(), partitionDataRemover);
return !data.partition_data().empty();
bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data) {
for (const auto& partData : data.partition_data()) {
for (const auto& batch : partData.batches()) {
if (batch.message_data_size() > 0) {
return true;
}
}
}
return false;
}

}
4 changes: 2 additions & 2 deletions ydb/services/persqueue_v1/actors/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ static constexpr ui64 READ_BLOCK_SIZE = 8_KB; // metering

using namespace Ydb;

bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data);
bool HasMessages(const PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data);

bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data);
bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data);

}
4 changes: 2 additions & 2 deletions ydb/services/persqueue_v1/actors/read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1913,9 +1913,9 @@ ui64 TReadSessionActor<UseMigrationProtocol>::PrepareResponse(typename TFormedRe
formedResponse->ByteSizeBeforeFiltering = formedResponse->Response.ByteSize();

if constexpr (UseMigrationProtocol) {
formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch());
formedResponse->HasMessages = HasMessages(formedResponse->Response.data_batch());
} else {
formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response());
formedResponse->HasMessages = HasMessages(formedResponse->Response.read_response());
}

return formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0;
Expand Down
Loading