Skip to content

Commit

Permalink
Merge b010a58 into c5d16f6
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Nov 19, 2024
2 parents c5d16f6 + b010a58 commit 9b4626f
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
6 changes: 5 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,9 +672,13 @@ void TTopicSession::DoFiltering(ui64 rowsOffset, ui64 numberRows, const TVector<
Y_ENSURE(rowsOffset < offsets.size(), "Invalid first row ofset");
Y_ENSURE(numberRows, "Expected non empty parsed batch");
Y_ENSURE(parsedValues, "Expected non empty schema");
LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets[rowsOffset] << ", last offset: " << offsets[rowsOffset + numberRows - 1]);
auto lastOffset = offsets[rowsOffset + numberRows - 1];
LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets[rowsOffset] << ", last offset: " << lastOffset);

for (auto& [actorId, info] : Clients) {
if (info.NextMessageOffset && lastOffset < info.NextMessageOffset) { // the batch has already been processed
continue;
}
try {
if (info.Filter) {
info.Filter->Push(offsets, RebuildJson(info, parsedValues), rowsOffset, numberRows);
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
auto source = BuildSource(topicName);
StartSession(ReadActorId1, source);

const std::vector<TString> data = { Json1, Json2 }; // offset 0, 1
const std::vector<TString> data = { Json1, Json2, Json3 }; // offset 0, 1, 2
PQWrite(data, topicName);
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, data);
Expand All @@ -341,11 +341,11 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
StartSession(ReadActorId2, source, 1);
ExpectNewDataArrived({ReadActorId2});

PQWrite({ Json3 }, topicName);
PQWrite({ Json4 }, topicName);
ExpectNewDataArrived({ReadActorId1});

ExpectMessageBatch(ReadActorId1, { Json3 });
ExpectMessageBatch(ReadActorId2, { Json2, Json3 });
ExpectMessageBatch(ReadActorId1, { Json4 });
ExpectMessageBatch(ReadActorId2, { Json2, Json3, Json4 });

StopSession(ReadActorId1, source);
StopSession(ReadActorId2, source);
Expand Down

0 comments on commit 9b4626f

Please sign in to comment.