diff --git a/velox/dwio/dwrf/test/FlushPolicyTest.cpp b/velox/dwio/dwrf/test/FlushPolicyTest.cpp index e571477cf9473..375a51fdf00d5 100644 --- a/velox/dwio/dwrf/test/FlushPolicyTest.cpp +++ b/velox/dwio/dwrf/test/FlushPolicyTest.cpp @@ -126,14 +126,15 @@ TEST_F(DefaultFlushPolicyTest, AdditionalCriteriaTest) { dwio::common::StripeProgress{}, testCase.dictionarySize)) << fmt::format( - "flushStripe = {}, overMemoryBudget = {}, dictionarySize = {}", + "decision = {}, flushStripe = {}, overMemoryBudget = {}, dictionarySize = {}", + testCase.decision, testCase.flushStripe, testCase.overMemoryBudget, testCase.dictionarySize); } } -TEST_F(DefaultFlushPolicyTest, EarlyDictionaryEvaluation) { +TEST_F(DefaultFlushPolicyTest, EarlyDictionaryAssessment) { // Test the precedence of decisions. struct TestCase { dwio::common::StripeProgress stripeProgress; @@ -152,7 +153,7 @@ TEST_F(DefaultFlushPolicyTest, EarlyDictionaryEvaluation) { .flushStripe = false, .dictionarySizeThreshold = 20, .dictionarySize = 15, - .decision = FlushDecision::EVALUATE_DICTIONARY}, + .decision = FlushDecision::CHECK_DICTIONARY}, TestCase{ .stripeProgress = dwio::common::StripeProgress{.stripeSizeEstimate = 100}, @@ -212,7 +213,7 @@ TEST_F(DefaultFlushPolicyTest, EarlyDictionaryEvaluation) { /*overMemoryBudget=*/false, dwio::common::StripeProgress{.stripeSizeEstimate = 400}, /*dictionarySize=*/20), - FlushDecision::EVALUATE_DICTIONARY); + FlushDecision::CHECK_DICTIONARY); EXPECT_EQ( policy.shouldFlushDictionary( /*flushStripe=*/false, @@ -226,7 +227,7 @@ TEST_F(DefaultFlushPolicyTest, EarlyDictionaryEvaluation) { /*overMemoryBudget=*/false, dwio::common::StripeProgress{.stripeSizeEstimate = 700}, /*dictionarySize=*/20), - FlushDecision::EVALUATE_DICTIONARY); + FlushDecision::CHECK_DICTIONARY); EXPECT_EQ( policy.shouldFlushDictionary( /*flushStripe=*/false, @@ -236,6 +237,61 @@ TEST_F(DefaultFlushPolicyTest, EarlyDictionaryEvaluation) { FlushDecision::SKIP); } +TEST_F(DefaultFlushPolicyTest, EarlyDictionaryAssessmentAllStripes) { + // Test dictionary evaluation signals throughout the stripe. + DefaultFlushPolicy policy{ + /*stripeSizeThreshold=*/1000, + /*dictionarySizeThreshold=*/std::numeric_limits::max()}; + EXPECT_EQ( + policy.shouldFlushDictionary( + /*flushStripe=*/false, + /*overMemoryBudget=*/false, + dwio::common::StripeProgress{ + .stripeIndex = 0, .stripeSizeEstimate = 100}, + /*dictionarySize=*/20), + FlushDecision::SKIP); + EXPECT_EQ( + policy.shouldFlushDictionary( + /*flushStripe=*/false, + /*overMemoryBudget=*/false, + dwio::common::StripeProgress{ + .stripeIndex = 0, .stripeSizeEstimate = 700}, + /*dictionarySize=*/20), + FlushDecision::CHECK_DICTIONARY); + EXPECT_EQ( + policy.shouldFlushDictionary( + /*flushStripe=*/false, + /*overMemoryBudget=*/false, + dwio::common::StripeProgress{ + .stripeIndex = 0, .stripeSizeEstimate = 800}, + /*dictionarySize=*/20), + FlushDecision::SKIP); + EXPECT_EQ( + policy.shouldFlushDictionary( + /*flushStripe=*/false, + /*overMemoryBudget=*/false, + dwio::common::StripeProgress{ + .stripeIndex = 1, .stripeSizeEstimate = 100}, + /*dictionarySize=*/20), + FlushDecision::SKIP); + EXPECT_EQ( + policy.shouldFlushDictionary( + /*flushStripe=*/false, + /*overMemoryBudget=*/false, + dwio::common::StripeProgress{ + .stripeIndex = 1, .stripeSizeEstimate = 400}, + /*dictionarySize=*/20), + FlushDecision::CHECK_DICTIONARY); + EXPECT_EQ( + policy.shouldFlushDictionary( + /*flushStripe=*/false, + /*overMemoryBudget=*/false, + dwio::common::StripeProgress{ + .stripeIndex = 2, .stripeSizeEstimate = 900}, + /*dictionarySize=*/20), + FlushDecision::CHECK_DICTIONARY); +} + TEST_F(DefaultFlushPolicyTest, EmptyFile) { // Empty vector creation succeeds. RowsPerStripeFlushPolicy policy({}); diff --git a/velox/dwio/dwrf/writer/FlushPolicy.cpp b/velox/dwio/dwrf/writer/FlushPolicy.cpp index 0236929654b6a..da81bc6dd6059 100644 --- a/velox/dwio/dwrf/writer/FlushPolicy.cpp +++ b/velox/dwio/dwrf/writer/FlushPolicy.cpp @@ -17,7 +17,7 @@ #include "velox/dwio/dwrf/writer/FlushPolicy.h" namespace { -static constexpr size_t kNumDictioanryTestsPerStripe = 3UL; +static constexpr size_t kNumDictionaryAssessmentsPerStripe = 3UL; } // namespace namespace facebook::velox::dwrf { @@ -27,10 +27,17 @@ DefaultFlushPolicy::DefaultFlushPolicy( uint64_t dictionarySizeThreshold) : stripeSizeThreshold_{stripeSizeThreshold}, dictionarySizeThreshold_{dictionarySizeThreshold}, - dictionaryAssessmentThreshold_{getDictionaryAssessmentIncrement()} {} + dictionaryCheckIncrement_{ + stripeSizeThreshold_ / kNumDictionaryAssessmentsPerStripe} { + VELOX_CHECK_GT(dictionaryCheckIncrement_, 0); + setNextDictionaryCheckThreshold(); +} -uint64_t DefaultFlushPolicy::getDictionaryAssessmentIncrement() const { - return stripeSizeThreshold_ / kNumDictioanryTestsPerStripe; +void DefaultFlushPolicy::setNextDictionaryCheckThreshold( + uint64_t stripeSizeEstimate) { + // Add 1 so we can round up to the next increment above the current estimate. + dictionaryCheckThreshold_ = + bits::roundUp(stripeSizeEstimate + 1, dictionaryCheckIncrement_); } FlushDecision DefaultFlushPolicy::shouldFlushDictionary( @@ -45,13 +52,14 @@ FlushDecision DefaultFlushPolicy::shouldFlushDictionary( if (dictionaryMemoryUsage > dictionarySizeThreshold_) { return FlushDecision::FLUSH_DICTIONARY; } - if (stripeProgress.stripeSizeEstimate >= dictionaryAssessmentThreshold_) { - // In the current implementation, since we don't ever change encoding - // decision after the first stripe, we don't need to ever reset this - // threshold. - dictionaryAssessmentThreshold_ += - stripeSizeThreshold_ / kNumDictioanryTestsPerStripe; - return FlushDecision::EVALUATE_DICTIONARY; + if (stripeIndex_ < stripeProgress.stripeIndex) { + // Reset dictionary check threshold for the new stripe. + setNextDictionaryCheckThreshold(); + stripeIndex_ = stripeProgress.stripeIndex; + } + if (stripeProgress.stripeSizeEstimate >= dictionaryCheckThreshold_) { + setNextDictionaryCheckThreshold(stripeProgress.stripeSizeEstimate); + return FlushDecision::CHECK_DICTIONARY; } return FlushDecision::SKIP; } diff --git a/velox/dwio/dwrf/writer/FlushPolicy.h b/velox/dwio/dwrf/writer/FlushPolicy.h index 1e60f5a0dc530..1e9e4c87db829 100644 --- a/velox/dwio/dwrf/writer/FlushPolicy.h +++ b/velox/dwio/dwrf/writer/FlushPolicy.h @@ -23,7 +23,7 @@ namespace facebook::velox::dwrf { enum class FlushDecision { SKIP, - EVALUATE_DICTIONARY, + CHECK_DICTIONARY, FLUSH_DICTIONARY, ABANDON_DICTIONARY, }; @@ -76,11 +76,13 @@ class DefaultFlushPolicy : public DWRFFlushPolicy { void onClose() override {} private: - uint64_t getDictionaryAssessmentIncrement() const; + void setNextDictionaryCheckThreshold(uint64_t stripeSizeEstimate = 0); const uint64_t stripeSizeThreshold_; const uint64_t dictionarySizeThreshold_; - uint64_t dictionaryAssessmentThreshold_; + const uint64_t dictionaryCheckIncrement_; + uint64_t stripeIndex_{0}; + uint64_t dictionaryCheckThreshold_{0}; }; class RowsPerStripeFlushPolicy : public DWRFFlushPolicy { diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index 1b1807faaaf27..753e6c7c42132 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -354,7 +354,7 @@ bool Writer::shouldFlush(const WriterContext& context, size_t nextWriteRows) { overBudget = overMemoryBudget(context, nextWriteRows); stripeProgressDecision = flushPolicy_->shouldFlush(getStripeProgress(context)); - } else if (dictionaryFlushDecision == FlushDecision::EVALUATE_DICTIONARY) { + } else if (dictionaryFlushDecision == FlushDecision::CHECK_DICTIONARY) { writer_->tryAbandonDictionaries(/*force=*/false); }