Skip to content

Commit

Permalink
Yql 18490 get rid of HasRunningAsyncIoOperation in mkql_grace_join_imp (
Browse files Browse the repository at this point in the history
  • Loading branch information
lll-phill-lll authored May 31, 2024
1 parent 76567af commit 785a089
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 61 deletions.
118 changes: 63 additions & 55 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ bool TTable::TryToReduceMemoryAndWait() {
TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex]));
TableBuckets[largestBucketIndex] = TTableBucket{};

return TableBucketsSpillers[largestBucketIndex].HasRunningAsyncIoOperation();
return TableBucketsSpillers[largestBucketIndex].IsProcessingSpilling();
}

void TTable::UpdateSpilling() {
Expand Down Expand Up @@ -1319,12 +1319,6 @@ TTableBucket&& TTableBucketSpiller::ExtractBucket() {
return std::move(CurrentBucket);
}

bool TTableBucketSpiller::HasRunningAsyncIoOperation() const {
return StateUi64Adapter.HasRunningAsyncIoOperation()
|| StateUi32Adapter.HasRunningAsyncIoOperation()
|| StateCharAdapter.HasRunningAsyncIoOperation();
}

bool TTableBucketSpiller::IsInMemory() const {
return State == EState::InMemory;
}
Expand Down Expand Up @@ -1358,37 +1352,37 @@ void TTableBucketSpiller::ProcessBucketSpilling() {
while (NextVectorToProcess != ENextVectorToProcess::None) {
switch (NextVectorToProcess) {
case ENextVectorToProcess::KeyAndVals:
if (StateUi64Adapter.HasRunningAsyncIoOperation() || !StateUi64Adapter.IsAcceptingData()) return;
if (!StateUi64Adapter.IsAcceptingData()) return;

StateUi64Adapter.AddData(std::move(CurrentBucket.KeyIntVals));
NextVectorToProcess = ENextVectorToProcess::DataIntVals;
break;
case ENextVectorToProcess::DataIntVals:
if (StateUi64Adapter.HasRunningAsyncIoOperation() || !StateUi64Adapter.IsAcceptingData()) return;
if (!StateUi64Adapter.IsAcceptingData()) return;

StateUi64Adapter.AddData(std::move(CurrentBucket.DataIntVals));
NextVectorToProcess = ENextVectorToProcess::StringsValues;
break;
case ENextVectorToProcess::StringsValues:
if (StateCharAdapter.HasRunningAsyncIoOperation() || !StateCharAdapter.IsAcceptingData()) return;
if (!StateCharAdapter.IsAcceptingData()) return;

StateCharAdapter.AddData(std::move(CurrentBucket.StringsValues));
NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
break;
case ENextVectorToProcess::StringsOffsets:
if (StateUi32Adapter.HasRunningAsyncIoOperation() || !StateUi32Adapter.IsAcceptingData()) return;
if (!StateUi32Adapter.IsAcceptingData()) return;

StateUi32Adapter.AddData(std::move(CurrentBucket.StringsOffsets));
NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
break;
case ENextVectorToProcess::InterfaceValues:
if (StateCharAdapter.HasRunningAsyncIoOperation() || !StateCharAdapter.IsAcceptingData()) return;
if (!StateCharAdapter.IsAcceptingData()) return;

StateCharAdapter.AddData(std::move(CurrentBucket.InterfaceValues));
NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
break;
case ENextVectorToProcess::InterfaceOffsets:
if (StateUi32Adapter.HasRunningAsyncIoOperation() || !StateUi32Adapter.IsAcceptingData()) return;
if (!StateUi32Adapter.IsAcceptingData()) return;

StateUi32Adapter.AddData(std::move(CurrentBucket.InterfaceOffsets));
NextVectorToProcess = ENextVectorToProcess::None;
Expand Down Expand Up @@ -1433,71 +1427,85 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
while (NextVectorToProcess != ENextVectorToProcess::None) {
switch (NextVectorToProcess) {
case ENextVectorToProcess::KeyAndVals:
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
if (StateUi64Adapter.IsDataReady()) {
AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::DataIntVals;
break;
}

if (!StateUi64Adapter.IsDataReady()) {
if (StateUi64Adapter.IsAcceptingDataRequests()) {
StateUi64Adapter.RequestNextVector();
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::DataIntVals;
break;
return;
case ENextVectorToProcess::DataIntVals:
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
if (StateUi64Adapter.IsDataReady()) {
AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::StringsValues;
break;
}

if (!StateUi64Adapter.IsDataReady()) {
if (StateUi64Adapter.IsAcceptingDataRequests()) {
StateUi64Adapter.RequestNextVector();
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::StringsValues;
break;
return;
case ENextVectorToProcess::StringsValues:
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
if (StateCharAdapter.IsDataReady()) {
AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
break;
}

if (!StateCharAdapter.IsDataReady()) {
if (StateCharAdapter.IsAcceptingDataRequests()) {
StateCharAdapter.RequestNextVector();
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
break;
return;
case ENextVectorToProcess::StringsOffsets:
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
if (StateUi32Adapter.IsDataReady()) {
AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
break;
}

if (!StateUi32Adapter.IsDataReady()) {
if (StateUi32Adapter.IsAcceptingDataRequests()) {
StateUi32Adapter.RequestNextVector();
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
break;
return;
case ENextVectorToProcess::InterfaceValues:
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
if (StateCharAdapter.IsDataReady()) {
AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
break;
}

if (!StateCharAdapter.IsDataReady()) {
if (StateCharAdapter.IsAcceptingDataRequests()) {
StateCharAdapter.RequestNextVector();
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
break;
return;
case ENextVectorToProcess::InterfaceOffsets:
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
if (StateUi32Adapter.IsDataReady()) {
AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());

SpilledBucketsCount--;
if (SpilledBucketsCount == 0) {
NextVectorToProcess = ENextVectorToProcess::None;
State = EState::WaitingForExtraction;
} else {
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
}

break;
}

if (!StateUi32Adapter.IsDataReady()) {
if (StateUi32Adapter.IsAcceptingDataRequests()) {
StateUi32Adapter.RequestNextVector();
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());
SpilledBucketsCount--;
if (SpilledBucketsCount == 0) {
NextVectorToProcess = ENextVectorToProcess::None;
State = EState::WaitingForExtraction;
} else {
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
}
break;
return;
default:
return;

Expand Down
2 changes: 0 additions & 2 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ class TTableBucketSpiller {
void Update();
// Flushes all the data from inner spillers. Should be called when no more data is expected for spilling.
void Finalize();
// Checks if spillers are waiting for any running async operation. No calls other than update are allowed when the method returns true.
bool HasRunningAsyncIoOperation() const;
// Is bucket in memory. False if spilled.
bool IsInMemory() const;
// Is bucket loaded to memory but still owned by spilled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ class TVectorSpillerAdapter {
{
}

bool HasRunningAsyncIoOperation() const {
return ReadOperation.has_value() && !ReadOperation->HasValue() || WriteOperation.has_value() && !WriteOperation->HasValue();
}

///Returns current stete of the adapter
EState GetState() const {
return State;
Expand Down

0 comments on commit 785a089

Please sign in to comment.