Skip to content

Commit

Permalink
grace join: add bloom filters (ydb-platform#6951)
Browse files (browse the repository at this point in the history)
(cherry picked from commit 774401f)
  • Loading branch information
yumkam committed Oct 10, 2024
1 parent 58678ae commit 9322b6b
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 44 deletions.
117 changes: 80 additions & 37 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
void TGraceJoinPacker::Pack() {

TuplesPacked++;
TuplesBatchPacked++;
std::fill(TupleIntVals.begin(), TupleIntVals.end(), 0);

for (ui64 i = 0; i < ColumnsPackInfo.size(); i++) {
Expand Down Expand Up @@ -590,6 +589,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
, SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns))
, IsSpillingAllowed(isSpillingAllowed)
{
YQL_LOG(GRACEJOIN_DEBUG) << (const void *)&*JoinedTablePtr << "# AnyJoinSettings=" << (int)anyJoinSettings << " JoinKind=" << (int)joinKind;
if (JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion || IsSelfJoin_) {
LeftPacker->BatchSize = std::numeric_limits<ui64>::max();
RightPacker->BatchSize = std::numeric_limits<ui64>::max();
Expand All @@ -607,7 +607,9 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
break;
}
case EOperatingMode::Spilling: {
DoCalculateWithSpilling(ctx);
auto r = DoCalculateWithSpilling(ctx, output);
if (r == EFetchResult::One)
return r;
if (GetMode() == EOperatingMode::Spilling) {
return EFetchResult::Yield;
}
Expand Down Expand Up @@ -657,10 +659,41 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
Mode = mode;
}

bool FetchAndPackData(TComputationContext& ctx) {
EFetchResult FetchAndPackData(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data());
NKikimr::NMiniKQL::EFetchResult resultRight;

if (resultLeft == EFetchResult::One) {
if (LeftPacker->TuplesPacked == 0) {
LeftPacker->StartTime = std::chrono::system_clock::now();
}
LeftPacker->Pack();
{
auto added = LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data(), LeftPacker->IColumnsHolder.data(), *RightPacker->TablePtr);
if (added == GraceJoin::TTable::EAddTupleResult::Added)
++LeftPacker->TuplesBatchPacked;
else if (added == GraceJoin::TTable::EAddTupleResult::AnyMatch)
; // row dropped
else if (JoinKind == EJoinKind::Inner || JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightSemi || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::LeftSemi)
; // row dropped
else { // Left, LeftOnly, Full, Exclusion: output row
for (size_t i = 0; i < LeftRenames.size() / 2; i++) {
auto & valPtr = output[LeftRenames[2 * i + 1]];
if ( valPtr ) {
*valPtr = *LeftPacker->TuplePtrs[LeftRenames[2 * i]];
}
}
for (size_t i = 0; i < RightRenames.size() / 2; i++) {
auto & valPtr = output[RightRenames[2 * i + 1]];
if ( valPtr ) {
*valPtr = NYql::NUdf::TUnboxedValue();
}
}
return EFetchResult::One;
}
}
}

if (IsSelfJoin_) {
resultRight = resultLeft;
if (!SelfJoinSameKeys_) {
Expand All @@ -670,27 +703,40 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data());
}

if (resultLeft == EFetchResult::One) {
if (LeftPacker->TuplesPacked == 0) {
LeftPacker->StartTime = std::chrono::system_clock::now();
}
LeftPacker->Pack();
LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data(), LeftPacker->IColumnsHolder.data());
}

if (resultRight == EFetchResult::One) {
if (RightPacker->TuplesPacked == 0) {
RightPacker->StartTime = std::chrono::system_clock::now();
}

if ( !SelfJoinSameKeys_ ) {
RightPacker->Pack();
RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data(), RightPacker->IColumnsHolder.data());
auto added = RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data(), RightPacker->IColumnsHolder.data(), *LeftPacker->TablePtr);
if (added == GraceJoin::TTable::EAddTupleResult::Added)
++RightPacker->TuplesBatchPacked;
else if (added == GraceJoin::TTable::EAddTupleResult::AnyMatch)
; // row dropped
else if (JoinKind == EJoinKind::Inner || JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftSemi || JoinKind == EJoinKind::LeftOnly || JoinKind == EJoinKind::RightSemi)
; // row dropped
else { // Right, RightOnly, Full, Exclusion: output row
for (size_t i = 0; i < LeftRenames.size() / 2; i++) {
auto & valPtr = output[LeftRenames[2 * i + 1]];
if ( valPtr ) {
*valPtr = NYql::NUdf::TUnboxedValue();
}
}
for (size_t i = 0; i < RightRenames.size() / 2; i++) {
auto & valPtr = output[RightRenames[2 * i + 1]];
if ( valPtr ) {
*valPtr = *RightPacker->TuplePtrs[RightRenames[2 * i]];
}
}
return EFetchResult::One;
}
}
}

if (resultLeft == EFetchResult::Yield || resultRight == EFetchResult::Yield) {
return true;
return EFetchResult::Yield;
}

if (resultLeft == EFetchResult::Finish ) {
Expand All @@ -702,7 +748,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
*HaveMoreRightRows = false;
}

return false;
return EFetchResult::Finish;
}

void UnpackJoinedData(NUdf::TUnboxedValue*const* output) {
Expand Down Expand Up @@ -762,33 +808,29 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
break;
}

bool isYield = FetchAndPackData(ctx);
auto isYield = FetchAndPackData(ctx, output);
if (IsSpillingAllowed && ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
const auto used = TlsAllocState->GetUsed();
const auto limit = TlsAllocState->GetLimit();

YQL_LOG(INFO) << "yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit;
YQL_LOG(INFO) << "switching Memory mode to Spilling";
YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling";

SwitchMode(EOperatingMode::Spilling, ctx);
return EFetchResult::Yield;
}
if (isYield) return EFetchResult::Yield;
if (isYield != EFetchResult::Finish) return isYield;

if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize ) {
*PartialJoinCompleted = true;
JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
JoinedTablePtr->ResetIterator();
}

if (!*HaveMoreLeftRows && !*PartialJoinCompleted && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize ) {
*PartialJoinCompleted = true;
JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
JoinedTablePtr->ResetIterator();

}
if (!*PartialJoinCompleted && (
(!*HaveMoreRightRows && (!*HaveMoreLeftRows || LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize )) ||
(!*HaveMoreLeftRows && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize))) {

if (!*HaveMoreRightRows && !*HaveMoreLeftRows && !*PartialJoinCompleted) {
YQL_LOG(GRACEJOIN_TRACE)
<< (const void *)&*JoinedTablePtr << '#'
<< " HaveLeft " << *HaveMoreLeftRows << " LeftPacked " << LeftPacker->TuplesBatchPacked << " LeftBatch " << LeftPacker->BatchSize
<< " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize
;
*PartialJoinCompleted = true;
LeftPacker->StartTime = std::chrono::system_clock::now();
RightPacker->StartTime = std::chrono::system_clock::now();
Expand Down Expand Up @@ -832,7 +874,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
return LeftPacker->TablePtr->IsRestoringSpilledBuckets() || RightPacker->TablePtr->IsRestoringSpilledBuckets();
}

void DoCalculateWithSpilling(TComputationContext& ctx) {
EFetchResult DoCalculateWithSpilling(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
UpdateSpilling();

ui32 cnt = 0;
Expand All @@ -841,28 +883,29 @@ void DoCalculateWithSpilling(TComputationContext& ctx) {
if (!HasMemoryForProcessing() && !IsSpillingFinalized) {
bool isWaitingForReduce = TryToReduceMemoryAndWait();

if (isWaitingForReduce) return;
if (isWaitingForReduce) return EFetchResult::Yield;
}
}
bool isYield = FetchAndPackData(ctx);
if (isYield) return;
auto isYield = FetchAndPackData(ctx, output);
if (isYield != EFetchResult::Finish) return isYield;
}

if (!*HaveMoreLeftRows && !*HaveMoreRightRows) {
if (!IsSpillingFinished()) return;
if (!IsSpillingFinished()) return EFetchResult::Yield;
if (!IsSpillingFinalized) {
LeftPacker->TablePtr->FinalizeSpilling();
RightPacker->TablePtr->FinalizeSpilling();
IsSpillingFinalized = true;

UpdateSpilling();
}
if (!IsReadyForSpilledDataProcessing()) return;
if (!IsReadyForSpilledDataProcessing()) return EFetchResult::Yield;

YQL_LOG(INFO) << "switching to ProcessSpilled";
YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching to ProcessSpilled";
SwitchMode(EOperatingMode::ProcessSpilled, ctx);
return;
return EFetchResult::Finish;
}
return EFetchResult::Yield;
}

EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) {
Expand Down
Loading

0 comments on commit 9322b6b

Please sign in to comment.