diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index f8c071dd837b..4f4b83168e47 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -197,7 +197,6 @@ TColumnDataPackInfo GetPackInfo(TType* type) { void TGraceJoinPacker::Pack() { TuplesPacked++; - TuplesBatchPacked++; std::fill(TupleIntVals.begin(), TupleIntVals.end(), 0); for (ui64 i = 0; i < ColumnsPackInfo.size(); i++) { @@ -590,6 +589,7 @@ class TGraceJoinSpillingSupportState : public TComputationValueTuplesBatchPacked >= LeftPacker->BatchSize ) { - *PartialJoinCompleted = true; - JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); - JoinedTablePtr->ResetIterator(); - } - - if (!*HaveMoreLeftRows && !*PartialJoinCompleted && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize ) { - *PartialJoinCompleted = true; - JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); - JoinedTablePtr->ResetIterator(); - } + if (!*PartialJoinCompleted && ( + (!*HaveMoreRightRows && (!*HaveMoreLeftRows || LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize )) || + (!*HaveMoreLeftRows && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize))) { - if (!*HaveMoreRightRows && !*HaveMoreLeftRows && !*PartialJoinCompleted) { + YQL_LOG(GRACEJOIN_TRACE) + << (const void *)&*JoinedTablePtr << '#' + << " HaveLeft " << *HaveMoreLeftRows << " LeftPacked " << LeftPacker->TuplesBatchPacked << " LeftBatch " << LeftPacker->BatchSize + << " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize + ; *PartialJoinCompleted = true; LeftPacker->StartTime = std::chrono::system_clock::now(); RightPacker->StartTime = std::chrono::system_clock::now(); @@ -832,7 +874,7 @@ class TGraceJoinSpillingSupportState : public TComputationValueTablePtr->IsRestoringSpilledBuckets() || RightPacker->TablePtr->IsRestoringSpilledBuckets(); } -void DoCalculateWithSpilling(TComputationContext& ctx) { +EFetchResult DoCalculateWithSpilling(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { UpdateSpilling(); ui32 cnt = 0; @@ -841,15 +883,15 @@ void DoCalculateWithSpilling(TComputationContext& ctx) { if (!HasMemoryForProcessing() && !IsSpillingFinalized) { bool isWaitingForReduce = TryToReduceMemoryAndWait(); - if (isWaitingForReduce) return; + if (isWaitingForReduce) return EFetchResult::Yield; } } - bool isYield = FetchAndPackData(ctx); - if (isYield) return; + auto isYield = FetchAndPackData(ctx, output); + if (isYield != EFetchResult::Finish) return isYield; } if (!*HaveMoreLeftRows && !*HaveMoreRightRows) { - if (!IsSpillingFinished()) return; + if (!IsSpillingFinished()) return EFetchResult::Yield; if (!IsSpillingFinalized) { LeftPacker->TablePtr->FinalizeSpilling(); RightPacker->TablePtr->FinalizeSpilling(); @@ -857,12 +899,13 @@ void DoCalculateWithSpilling(TComputationContext& ctx) { UpdateSpilling(); } - if (!IsReadyForSpilledDataProcessing()) return; + if (!IsReadyForSpilledDataProcessing()) return EFetchResult::Yield; - YQL_LOG(INFO) << "switching to ProcessSpilled"; + YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching to ProcessSpilled"; SwitchMode(EOperatingMode::ProcessSpilled, ctx); - return; + return EFetchResult::Finish; } + return EFetchResult::Yield; } EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index b772aad54dbb..dc79431f1550 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -13,7 +13,10 @@ namespace NMiniKQL { namespace GraceJoin { -void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes, NYql::NUdf::TUnboxedValue * iColumns ) { +TTable::EAddTupleResult TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes, NYql::NUdf::TUnboxedValue * iColumns, const TTable &other) { + + if ((intColumns[0] & 1)) + return EAddTupleResult::Unmatched; TotalPacked++; @@ -83,6 +86,16 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings ui64 bucket = hash & BucketsMask; + if (other.TableBucketsStats[bucket].BloomFilter.IsFinalized()) { + auto bucket2 = &other.TableBucketsStats[bucket]; + auto &bloomFilter = bucket2->BloomFilter; + ++BloomLookups_; + if (bloomFilter.IsMissing(hash)) { + ++BloomHits_; + return EAddTupleResult::Unmatched; + } + } + std::vector> & keyIntVals = TableBuckets[bucket].KeyIntVals; std::vector> & stringsOffsets = TableBuckets[bucket].StringsOffsets; std::vector> & dataIntVals = TableBuckets[bucket].DataIntVals; @@ -98,7 +111,8 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings if (IsAny_) { if ( !AddKeysToHashTable(kh, keyIntVals.begin() + offset, iColumns) ) { keyIntVals.resize(offset); - return; + ++AnyFiltered_; + return EAddTupleResult::AnyMatch; } } @@ -146,6 +160,7 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings TableBucketsStats[bucket].KeyIntValsTotalSize += keyIntVals.size() - offset; TableBucketsStats[bucket].StringValuesTotalSize += stringVals.size() - initialStringsSize; + return EAddTupleResult::Added; } void TTable::ResetIterator() { @@ -348,6 +363,8 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef joinResults.clear(); TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket]; TTableBucket * bucket2 = &JoinTable2->TableBuckets[bucket]; + TTableBucketStats * bucketStats1 = &JoinTable1->TableBucketsStats[bucket]; + TTableBucketStats * bucketStats2 = &JoinTable2->TableBucketsStats[bucket]; ui64 tuplesNum1 = JoinTable1->TableBucketsStats[bucket].TuplesNum; ui64 tuplesNum2 = JoinTable2->TableBucketsStats[bucket].TuplesNum; @@ -367,6 +384,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef if (swapTables) { std::swap(bucket1, bucket2); + std::swap(bucketStats1, bucketStats2); std::swap(headerSize1, headerSize2); std::swap(nullsSize1, nullsSize2); std::swap(keyIntOffset1, keyIntOffset2); @@ -388,12 +406,15 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef ui64 &nSlots = bucket2->NSlots; auto &joinSlots = bucket2->JoinSlots; + auto &bloomFilter = bucketStats2->BloomFilter; bool initHashTable = false; if (!nSlots) { nSlots = (3 * tuplesNum2 + 1) | 1; joinSlots.resize(nSlots*slotSize, 0); + bloomFilter.Resize(tuplesNum2); initHashTable = true; + ++InitHashTableCount_; } auto firstSlot = [begin = joinSlots.begin(), slotSize, nSlots](auto hash) { @@ -421,11 +442,16 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef if (HasBitSet(nullsPtr, 1)) continue; + bloomFilter.Add(hash); + auto slotIt = firstSlot(hash); + ++HashLookups_; for (; *slotIt != 0; slotIt = nextSlot(slotIt)) { + ++HashO1Iterations_; } + ++HashSlotIterations_; if (keysValSize <= slotSize - 1) { @@ -439,8 +465,11 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef } slotIt[slotSize - 1] = tuple2Idx; } + bloomFilter.Finalize(); + if (swapTables) JoinTable1Total_ += tuplesNum2; else JoinTable2Total_ += tuplesNum2; } + if (swapTables) JoinTable2Total_ += tuplesNum1; else JoinTable1Total_ += tuplesNum1; ui32 tuple1Idx = 0; auto it1 = bucket1->KeyIntVals.begin(); @@ -451,6 +480,8 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef // strSize only present if HasKeyStrCol || HasKeyICol // strPos is only present if (HasKeyStrCol || HasKeyICol) && strSize + headerSize >= slotSize // slotSize, slotIdx and strPos is only for hashtable (table2) + ui64 bloomHits = 0; + ui64 bloomLookups = 0; for (ui64 keysValSize = headerSize1; it1 != bucket1->KeyIntVals.end(); it1 += keysValSize, ++tuple1Idx ) { @@ -465,14 +496,27 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef continue; } + if (initHashTable) { + bloomLookups++; + if (bloomFilter.IsMissing(hash)) { + bloomHits++; + continue; + } + } + + ++HashLookups_; + + auto saveTuplesFound = tuplesFound; auto slotIt = firstSlot(hash); for (; *slotIt != 0; slotIt = nextSlot(slotIt) ) { + ++HashO1Iterations_; if (*slotIt != hash) continue; auto tuple2Idx = slotIt[slotSize - 1]; + ++HashSlotIterations_; if (table1HasKeyIColumns || !(keysValSize - nullsSize1 <= slotSize - 1 - nullsSize2)) { // 2nd condition cannot be true unless HasKeyStringColumns or HasKeyIColumns, hence size at the end of header is present @@ -518,14 +562,23 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef joinIds.id2 = swapTables ? tuple1Idx : tuple2Idx; joinResults.emplace_back(joinIds); } + BloomFalsePositives_ += saveTuplesFound == tuplesFound; } if (!hasMoreLeftTuples && !hasMoreRightTuples) { joinSlots.clear(); joinSlots.shrink_to_fit(); nSlots = 0; + bloomFilter.Shrink(); } + if (bloomHits < bloomLookups/8) { + // Bloomfilter was inefficient, drop it + bloomFilter.Shrink(); + } + BloomHits_ += bloomHits; + BloomLookups_ += bloomLookups; + std::sort(joinResults.begin(), joinResults.end(), [](JoinTuplesIds a, JoinTuplesIds b) { if (a.id1 < b.id1) return true; @@ -571,6 +624,21 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef } } + YQL_LOG(GRACEJOIN_TRACE) + << (const void *)this << '#' + << bucket + << " Table1 " << JoinTable1->TableBucketsStats[bucket].TuplesNum + << " Table2 " << JoinTable2->TableBucketsStats[bucket].TuplesNum + << " LeftTableBatch " << LeftTableBatch_ + << " leftMatchedIds " << leftMatchedIds.size() + << " RightTableBatch " << RightTableBatch_ + << " rightMatchedIds " << rightMatchedIds.size() + << " rightIds " << rightIds.size() + << " joinIds " << joinIds.size() + << " joinKind " << (int)JoinKind + << " swapTables " << swapTables + << " initHashTable " << initHashTable + ; } HasMoreLeftTuples_ = hasMoreLeftTuples; @@ -700,7 +768,7 @@ inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf: } if ( HasBitSet(keys + HashSize, 1)) // Keys with null value - return false; + return true; ui64 hash = *keys; ui64 slot = hash % t.NSlots; @@ -1291,6 +1359,16 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, } TTable::~TTable() { + YQL_LOG_IF(GRACEJOIN_DEBUG, InitHashTableCount_) + << (const void *)this << '#' << "InitHashTableCount " << InitHashTableCount_ + << " BloomLookups " << BloomLookups_ << " BloomHits " << BloomHits_ << " BloomFalsePositives " << BloomFalsePositives_ + << " HashLookups " << HashLookups_ << " HashChainTraversal " << HashO1Iterations_/(double)HashLookups_ << " HashSlotOperations " << HashSlotIterations_/(double)HashLookups_ + << " Table1 " << JoinTable1Total_ << " Table2 " << JoinTable2Total_ << " TuplesFound " << TuplesFound_ + ; + YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->AnyFiltered_) << (const void *)this << '#' << "L AnyFiltered " << JoinTable1->AnyFiltered_; + YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->BloomLookups_) << (const void *)this << '#' << "L BloomLookups " << JoinTable1->BloomLookups_ << " BloomHits " << JoinTable1->BloomHits_; + YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->AnyFiltered_) << (const void *)this << '#' << "R AnyFiltered " << JoinTable2->AnyFiltered_; + YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->BloomLookups_) << (const void *)this << '#' << "R BloomLookups " << JoinTable2->BloomLookups_ << " BloomHits " << JoinTable2->BloomHits_; }; TTableBucketSpiller::TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h index 2a897df2018b..1324e293cd8c 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h @@ -11,6 +11,8 @@ namespace NMiniKQL { namespace GraceJoin { class TTableBucketSpiller; +#define GRACEJOIN_DEBUG DEBUG +#define GRACEJOIN_TRACE TRACE const ui64 BitsForNumberOfBuckets = 5; // 2^5 = 32 const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1; @@ -21,6 +23,89 @@ const ui64 HashSize = 1; // Using ui64 hash size const ui64 SpillingSizeLimit = 1_MB; // Don't try to spill if net effect is lower than this size const ui32 SpillingRowLimit = 1024; // Don't try to invoke spilling more often than 1 in this number of rows +constexpr ui64 CachelineBits = 9; +constexpr ui64 CachelineSize = ui64(1)< +class TBloomfilter { + std::vector Storage_; + ui64 *Ptr_; + ui64 Bits_; + bool Finalized_ = false; + + public: + + static constexpr ui64 BlockSize = CachelineSize; + static constexpr ui64 BlockBits = CachelineBits; + + TBloomfilter() {} + TBloomfilter(ui64 size) { + Resize(size); + } + + void Resize(ui64 size) { + size = std::max(size, CachelineSize); + Bits_ = 6; + + for (; (ui64(1)< multiply by 8 + size = 1u<<(Bits_ - 6); + + Storage_.clear(); + Storage_.resize(size + CachelineSize/sizeof(ui64) - 1); + + // align Ptr_ up to BlockSize + Ptr_ = (ui64 *)((uintptr_t(Storage_.data()) + BlockSize - 1) & ~(BlockSize - 1)); + Finalized_ = false; + } + + void Add(ui64 hash) { + Y_DEBUG_ABORT_UNLESS(!Finalized_); + + auto bit = (hash >> (64 - Bits_)); + Ptr_[bit/64] |= (ui64(1)<<(bit % 64)); + // replace low BlockBits with next part of hash + auto low = hash >> (64 - Bits_ - BlockBits); + bit &= ~(BlockSize - 1); + bit ^= low & (BlockSize - 1); + Ptr_[bit/64] |= (ui64(1) << (bit % 64)); + } + + bool IsMissing(ui64 hash) const { + Y_DEBUG_ABORT_UNLESS(Finalized_); + + auto bit = (hash >> (64 - Bits_)); + if (!(Ptr_[bit/64] & (ui64(1)<<(bit % 64)))) + return true; + // replace low BlockBits with next part of hash + auto low = hash >> (64 - Bits_ - BlockBits); + bit &= ~(BlockSize - 1); + bit ^= low & (BlockSize - 1); + if (!(Ptr_[bit/64] & (ui64(1)<<(bit % 64)))) + return true; + return false; + } + + constexpr bool IsFinalized() const { + return Finalized_; + } + + void Finalize() { + Finalized_ = true; + } + + void Shrink() { + Finalized_ = false; + Bits_ = 1; + Storage_.clear(); + Storage_.resize(1, ~ui64(0)); + Storage_.shrink_to_fit(); + Ptr_ = Storage_.data(); + } +}; + /* Table data stored in buckets. Table columns are interpreted either as integers, strings or some interface-based type, providing IHash, IEquate, IPack and IUnpack functions. @@ -73,9 +158,11 @@ struct TTableBucket { std::vector> JoinSlots; // Hashtable ui64 NSlots = 0; // Hashtable + }; struct TTableBucketStats { + TBloomfilter> BloomFilter; KeysHashTable AnyHashTable; // Hash table to process join only for unique keys (any join attribute) ui64 TuplesNum = 0; // Total number of tuples in bucket ui64 StringValuesTotalSize = 0; // Total size of StringsValues. Used to correctly calculate StringsOffsets. @@ -262,10 +349,6 @@ class TTable { public: - // Adds new tuple to the table. intColumns, stringColumns - data of columns, - // stringsSizes - sizes of strings columns. Indexes of null-value columns - // in the form of bit array should be first values of intColumns. - void AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes, NYql::NUdf::TUnboxedValue * iColumns = nullptr); // Resets iterators. In case of join results table it also resets iterators for joined tables void ResetIterator(); @@ -335,9 +418,28 @@ class TTable { ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0, ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false); + + enum class EAddTupleResult { Added, Unmatched, AnyMatch }; + // Adds new tuple to the table. intColumns, stringColumns - data of columns, + // stringsSizes - sizes of strings columns. Indexes of null-value columns + // in the form of bit array should be first values of intColumns. + EAddTupleResult AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes, NYql::NUdf::TUnboxedValue * iColumns = nullptr, const TTable &other = {}); ~TTable(); + ui64 InitHashTableCount_ = 0; + + ui64 HashLookups_ = 0; // hash lookups + ui64 HashO1Iterations_ = 0; // hash chain + ui64 HashSlotIterations_ = 0; // O(SlotSize) operations + + ui64 JoinTable1Total_ = 0; + ui64 JoinTable2Total_ = 0; + ui64 AnyFiltered_ = 0; + + ui64 BloomLookups_ = 0; + ui64 BloomHits_ = 0; + ui64 BloomFalsePositives_ = 0; };