Skip to content

Commit

Permalink
Merge 3725fad into 5440a6b
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar authored Oct 7, 2024
2 parents 5440a6b + 3725fad commit 0676eae
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 39 deletions.
2 changes: 1 addition & 1 deletion ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
static ui32 NormalizeMaxReadSize(ui32 sourceValue);
static ui32 NormalizeMaxReadPartitionsCount(ui32 sourceValue);

static bool RemoveEmptyMessages(NPersQueue::TReadResponse::TBatchedData& data); // returns true if there are nonempty messages
static bool HasMessages(const NPersQueue::TReadResponse::TBatchedData& data); // returns true if there are any messages

private:
IReadSessionHandlerRef Handler;
Expand Down
29 changes: 9 additions & 20 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,7 @@ bool TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo

Y_ABORT_UNLESS(formedResponse->RequestsInfly == 0);
i64 diff = formedResponse->Response.ByteSize();
const bool hasMessages = RemoveEmptyMessages(*formedResponse->Response.MutableBatchedData());
const bool hasMessages = HasMessages(formedResponse->Response.GetBatchedData());
if (hasMessages) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign read id " << ReadIdToResponse << " to read request " << formedResponse->Guid);
formedResponse->Response.MutableBatchedData()->SetCookie(ReadIdToResponse);
Expand Down Expand Up @@ -1758,26 +1758,15 @@ void TReadSessionActor::HandleWakeup(const TActorContext& ctx) {
}
}

bool TReadSessionActor::RemoveEmptyMessages(TReadResponse::TBatchedData& data) {
bool hasNonEmptyMessages = false;
auto isMessageEmpty = [&](TReadResponse::TBatchedData::TMessageData& message) -> bool {
if (message.GetData().empty()) {
return true;
} else {
hasNonEmptyMessages = true;
return false;
bool TReadSessionActor::HasMessages(const TReadResponse::TBatchedData& data) {
for (const auto& partData : data.GetPartitionData()) {
for (const auto& batch : partData.GetBatch()) {
if (batch.MessageDataSize() > 0) {
return true;
}
}
};
auto batchRemover = [&](TReadResponse::TBatchedData::TBatch& batch) -> bool {
NProtoBuf::RemoveRepeatedFieldItemIf(batch.MutableMessageData(), isMessageEmpty);
return batch.MessageDataSize() == 0;
};
auto partitionDataRemover = [&](TReadResponse::TBatchedData::TPartitionData& partition) -> bool {
NProtoBuf::RemoveRepeatedFieldItemIf(partition.MutableBatch(), batchRemover);
return partition.BatchSize() == 0;
};
NProtoBuf::RemoveRepeatedFieldItemIf(data.MutablePartitionData(), partitionDataRemover);
return hasNonEmptyMessages;
}
return false;
}


Expand Down
Binary file not shown.
46 changes: 28 additions & 18 deletions ydb/services/persqueue_v1/actors/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,39 @@

namespace NKikimr::NGRpcProxy::V1 {

/*
for (const auto& partData : data.GetPartitionData()) {
for (const auto& batch : partData.GetBatch()) {
if (batch.MessageDataSize() > 0) {
return true;
}
}
}
return false;
*/

bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data) {
auto batchRemover = [&](PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch) -> bool {
return batch.message_data_size() == 0;
};
auto partitionDataRemover = [&](PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData& partition) -> bool {
NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover);
return partition.batches_size() == 0;
};
NProtoBuf::RemoveRepeatedFieldItemIf(data.mutable_partition_data(), partitionDataRemover);
return !data.partition_data().empty();
for (const auto& partData : data.partition_data()) {
for (const auto& batch : partData.batches()) {
if (batch.message_data_size() > 0) {
return true;
}
}
}
return false;
}

// TODO: remove after refactor
bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data) {
auto batchRemover = [&](Topic::StreamReadMessage::ReadResponse::Batch& batch) -> bool {
return batch.message_data_size() == 0;
};
auto partitionDataRemover = [&](Topic::StreamReadMessage::ReadResponse::PartitionData& partition) -> bool {
NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover);
return partition.batches_size() == 0;
};
NProtoBuf::RemoveRepeatedFieldItemIf(data.mutable_partition_data(), partitionDataRemover);
return !data.partition_data().empty();
for (const auto& partData : data.partition_data()) {
for (const auto& batch : partData.batches()) {
if (batch.message_data_size() > 0) {
return true;
}
}
}
return false;
}

}

0 comments on commit 0676eae

Please sign in to comment.