Skip to content

Commit

Permalink
Fix sink empty batch (#10463)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Oct 16, 2024
1 parent 6b4d585 commit 0025e00
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions ydb/core/kqp/runtime/kqp_write_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -889,14 +889,17 @@ class TShardsInfo {

void MakeNextBatches(i64 maxDataSize, ui64 maxCount) {
YQL_ENSURE(BatchesInFlight == 0);
YQL_ENSURE(!IsEmpty());
i64 dataSize = 0;
// For columnshard batch can be slightly larger than the limit.
while (BatchesInFlight < maxCount
&& BatchesInFlight < Batches.size()
&& dataSize + GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize) {
&& (dataSize + GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize || BatchesInFlight == 0)) {
dataSize += GetBatch(BatchesInFlight)->GetMemory();
++BatchesInFlight;
}
YQL_ENSURE(BatchesInFlight == Batches.size() || GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize);
YQL_ENSURE(BatchesInFlight != 0);
YQL_ENSURE(BatchesInFlight == maxCount || BatchesInFlight == Batches.size() || dataSize + GetBatch(BatchesInFlight)->GetMemory() >= maxDataSize);
}

const IPayloadSerializer::IBatchPtr& GetBatch(size_t index) const {
Expand Down Expand Up @@ -1200,7 +1203,9 @@ class TShardedWriteController : public IShardedWriteController {
if (force) {
for (auto& [shardId, batches] : Serializer->FlushBatchesForce()) {
for (auto& batch : batches) {
ShardsInfo.GetShard(shardId).PushBatch(std::move(batch));
if (batch && !batch->IsEmpty()) {
ShardsInfo.GetShard(shardId).PushBatch(std::move(batch));
}
}
}
} else {
Expand Down

0 comments on commit 0025e00

Please sign in to comment.