diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index d6cbdc65ab1a..a30b3e6bebb3 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -1160,7 +1160,7 @@ bool TTable::TryToReduceMemoryAndWait() { TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex])); TableBuckets[largestBucketIndex] = TTableBucket{}; - return TableBucketsSpillers[largestBucketIndex].HasRunningAsyncIoOperation(); + return TableBucketsSpillers[largestBucketIndex].IsProcessingSpilling(); } void TTable::UpdateSpilling() { @@ -1319,12 +1319,6 @@ TTableBucket&& TTableBucketSpiller::ExtractBucket() { return std::move(CurrentBucket); } -bool TTableBucketSpiller::HasRunningAsyncIoOperation() const { - return StateUi64Adapter.HasRunningAsyncIoOperation() - || StateUi32Adapter.HasRunningAsyncIoOperation() - || StateCharAdapter.HasRunningAsyncIoOperation(); -} - bool TTableBucketSpiller::IsInMemory() const { return State == EState::InMemory; } @@ -1358,37 +1352,37 @@ void TTableBucketSpiller::ProcessBucketSpilling() { while (NextVectorToProcess != ENextVectorToProcess::None) { switch (NextVectorToProcess) { case ENextVectorToProcess::KeyAndVals: - if (StateUi64Adapter.HasRunningAsyncIoOperation() || !StateUi64Adapter.IsAcceptingData()) return; + if (!StateUi64Adapter.IsAcceptingData()) return; StateUi64Adapter.AddData(std::move(CurrentBucket.KeyIntVals)); NextVectorToProcess = ENextVectorToProcess::DataIntVals; break; case ENextVectorToProcess::DataIntVals: - if (StateUi64Adapter.HasRunningAsyncIoOperation() || !StateUi64Adapter.IsAcceptingData()) return; + if (!StateUi64Adapter.IsAcceptingData()) return; StateUi64Adapter.AddData(std::move(CurrentBucket.DataIntVals)); NextVectorToProcess = ENextVectorToProcess::StringsValues; break; case ENextVectorToProcess::StringsValues: - if (StateCharAdapter.HasRunningAsyncIoOperation() || !StateCharAdapter.IsAcceptingData()) return; + if (!StateCharAdapter.IsAcceptingData()) return; StateCharAdapter.AddData(std::move(CurrentBucket.StringsValues)); NextVectorToProcess = ENextVectorToProcess::StringsOffsets; break; case ENextVectorToProcess::StringsOffsets: - if (StateUi32Adapter.HasRunningAsyncIoOperation() || !StateUi32Adapter.IsAcceptingData()) return; + if (!StateUi32Adapter.IsAcceptingData()) return; StateUi32Adapter.AddData(std::move(CurrentBucket.StringsOffsets)); NextVectorToProcess = ENextVectorToProcess::InterfaceValues; break; case ENextVectorToProcess::InterfaceValues: - if (StateCharAdapter.HasRunningAsyncIoOperation() || !StateCharAdapter.IsAcceptingData()) return; + if (!StateCharAdapter.IsAcceptingData()) return; StateCharAdapter.AddData(std::move(CurrentBucket.InterfaceValues)); NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets; break; case ENextVectorToProcess::InterfaceOffsets: - if (StateUi32Adapter.HasRunningAsyncIoOperation() || !StateUi32Adapter.IsAcceptingData()) return; + if (!StateUi32Adapter.IsAcceptingData()) return; StateUi32Adapter.AddData(std::move(CurrentBucket.InterfaceOffsets)); NextVectorToProcess = ENextVectorToProcess::None; @@ -1433,71 +1427,85 @@ void TTableBucketSpiller::ProcessBucketRestoration() { while (NextVectorToProcess != ENextVectorToProcess::None) { switch (NextVectorToProcess) { case ENextVectorToProcess::KeyAndVals: - if (StateUi64Adapter.HasRunningAsyncIoOperation()) return; + if (StateUi64Adapter.IsDataReady()) { + AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector()); + NextVectorToProcess = ENextVectorToProcess::DataIntVals; + break; + } - if (!StateUi64Adapter.IsDataReady()) { + if (StateUi64Adapter.IsAcceptingDataRequests()) { StateUi64Adapter.RequestNextVector(); - if (StateUi64Adapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector()); - NextVectorToProcess = ENextVectorToProcess::DataIntVals; - break; + return; case ENextVectorToProcess::DataIntVals: - if (StateUi64Adapter.HasRunningAsyncIoOperation()) return; + if (StateUi64Adapter.IsDataReady()) { + AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector()); + NextVectorToProcess = ENextVectorToProcess::StringsValues; + break; + } - if (!StateUi64Adapter.IsDataReady()) { + if (StateUi64Adapter.IsAcceptingDataRequests()) { StateUi64Adapter.RequestNextVector(); - if (StateUi64Adapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector()); - NextVectorToProcess = ENextVectorToProcess::StringsValues; - break; + return; case ENextVectorToProcess::StringsValues: - if (StateCharAdapter.HasRunningAsyncIoOperation()) return; + if (StateCharAdapter.IsDataReady()) { + AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector()); + NextVectorToProcess = ENextVectorToProcess::StringsOffsets; + break; + } - if (!StateCharAdapter.IsDataReady()) { + if (StateCharAdapter.IsAcceptingDataRequests()) { StateCharAdapter.RequestNextVector(); - if (StateCharAdapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector()); - NextVectorToProcess = ENextVectorToProcess::StringsOffsets; - break; + return; case ENextVectorToProcess::StringsOffsets: - if (StateUi32Adapter.HasRunningAsyncIoOperation()) return; + if (StateUi32Adapter.IsDataReady()) { + AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector()); + NextVectorToProcess = ENextVectorToProcess::InterfaceValues; + break; + } - if (!StateUi32Adapter.IsDataReady()) { + if (StateUi32Adapter.IsAcceptingDataRequests()) { StateUi32Adapter.RequestNextVector(); - if (StateUi32Adapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector()); - NextVectorToProcess = ENextVectorToProcess::InterfaceValues; - break; + return; case ENextVectorToProcess::InterfaceValues: - if (StateCharAdapter.HasRunningAsyncIoOperation()) return; + if (StateCharAdapter.IsDataReady()) { + AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector()); + NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets; + break; + } - if (!StateCharAdapter.IsDataReady()) { + if (StateCharAdapter.IsAcceptingDataRequests()) { StateCharAdapter.RequestNextVector(); - if (StateCharAdapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector()); - NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets; - break; + return; case ENextVectorToProcess::InterfaceOffsets: - if (StateUi32Adapter.HasRunningAsyncIoOperation()) return; + if (StateUi32Adapter.IsDataReady()) { + AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector()); + + SpilledBucketsCount--; + if (SpilledBucketsCount == 0) { + NextVectorToProcess = ENextVectorToProcess::None; + State = EState::WaitingForExtraction; + } else { + NextVectorToProcess = ENextVectorToProcess::KeyAndVals; + } + + break; + } - if (!StateUi32Adapter.IsDataReady()) { + if (StateUi32Adapter.IsAcceptingDataRequests()) { StateUi32Adapter.RequestNextVector(); - if (StateUi32Adapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector()); - SpilledBucketsCount--; - if (SpilledBucketsCount == 0) { - NextVectorToProcess = ENextVectorToProcess::None; - State = EState::WaitingForExtraction; - } else { - NextVectorToProcess = ENextVectorToProcess::KeyAndVals; - } - break; + return; default: return; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h index 9053ac630316..455f3959db78 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h @@ -113,8 +113,6 @@ class TTableBucketSpiller { void Update(); // Flushes all the data from inner spillers. Should be called when no more data is expected for spilling. void Finalize(); - // Checks if spillers are waiting for any running async operation. No calls other than update are allowed when the method returns true. - bool HasRunningAsyncIoOperation() const; // Is bucket in memory. False if spilled. bool IsInMemory() const; // Is bucket loaded to memory but still owned by spilled. diff --git a/ydb/library/yql/minikql/computation/mkql_vector_spiller_adapter.h b/ydb/library/yql/minikql/computation/mkql_vector_spiller_adapter.h index 3e5b9a3d2df3..cfd96c8328f3 100644 --- a/ydb/library/yql/minikql/computation/mkql_vector_spiller_adapter.h +++ b/ydb/library/yql/minikql/computation/mkql_vector_spiller_adapter.h @@ -29,10 +29,6 @@ class TVectorSpillerAdapter { { } - bool HasRunningAsyncIoOperation() const { - return ReadOperation.has_value() && !ReadOperation->HasValue() || WriteOperation.has_value() && !WriteOperation->HasValue(); - } - ///Returns current stete of the adapter EState GetState() const { return State;