Skip to content

Commit

Permalink
Extend early dictionary assessment to all stripes (facebookincubator#…
Browse files Browse the repository at this point in the history
…10131)

Summary:
Pull Request resolved: facebookincubator#10131

Follow-up diff that allows dictionaries to be assessed early in all stripes.

Currently, this causes abandonDictionaries to be invoked repeatedly in later stripes, where those invocations are no-ops.

Reviewed By: xiaoxmeng

Differential Revision: D58396478
  • Loading branch information
Huameng (Michael) Jiang authored and facebook-github-bot committed Jun 12, 2024
1 parent 9434bba commit fcbbe64
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 20 deletions.
66 changes: 61 additions & 5 deletions velox/dwio/dwrf/test/FlushPolicyTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,15 @@ TEST_F(DefaultFlushPolicyTest, AdditionalCriteriaTest) {
dwio::common::StripeProgress{},
testCase.dictionarySize))
<< fmt::format(
"flushStripe = {}, overMemoryBudget = {}, dictionarySize = {}",
"decision = {}, flushStripe = {}, overMemoryBudget = {}, dictionarySize = {}",
testCase.decision,
testCase.flushStripe,
testCase.overMemoryBudget,
testCase.dictionarySize);
}
}

TEST_F(DefaultFlushPolicyTest, EarlyDictionaryEvaluation) {
TEST_F(DefaultFlushPolicyTest, EarlyDictionaryAssessment) {
// Test the precedence of decisions.
struct TestCase {
dwio::common::StripeProgress stripeProgress;
Expand All @@ -152,7 +153,7 @@ TEST_F(DefaultFlushPolicyTest, EarlyDictionaryEvaluation) {
.flushStripe = false,
.dictionarySizeThreshold = 20,
.dictionarySize = 15,
.decision = FlushDecision::EVALUATE_DICTIONARY},
.decision = FlushDecision::CHECK_DICTIONARY},
TestCase{
.stripeProgress =
dwio::common::StripeProgress{.stripeSizeEstimate = 100},
Expand Down Expand Up @@ -212,7 +213,7 @@ TEST_F(DefaultFlushPolicyTest, EarlyDictionaryEvaluation) {
/*overMemoryBudget=*/false,
dwio::common::StripeProgress{.stripeSizeEstimate = 400},
/*dictionarySize=*/20),
FlushDecision::EVALUATE_DICTIONARY);
FlushDecision::CHECK_DICTIONARY);
EXPECT_EQ(
policy.shouldFlushDictionary(
/*flushStripe=*/false,
Expand All @@ -226,7 +227,7 @@ TEST_F(DefaultFlushPolicyTest, EarlyDictionaryEvaluation) {
/*overMemoryBudget=*/false,
dwio::common::StripeProgress{.stripeSizeEstimate = 700},
/*dictionarySize=*/20),
FlushDecision::EVALUATE_DICTIONARY);
FlushDecision::CHECK_DICTIONARY);
EXPECT_EQ(
policy.shouldFlushDictionary(
/*flushStripe=*/false,
Expand All @@ -236,6 +237,61 @@ TEST_F(DefaultFlushPolicyTest, EarlyDictionaryEvaluation) {
FlushDecision::SKIP);
}

TEST_F(DefaultFlushPolicyTest, EarlyDictionaryAssessmentAllStripes) {
  // Verifies that dictionary check signals are emitted as a stripe grows, and
  // that they are emitted again for subsequent stripes rather than only for
  // the first one.
  //
  // The dictionary size threshold is set to the maximum value so that the
  // dictionary size passed below (20) can never exceed it.
  DefaultFlushPolicy policy{
      /*stripeSizeThreshold=*/1000,
      /*dictionarySizeThreshold=*/std::numeric_limits<uint64_t>::max()};

  // Every step queries the policy with the same flags and dictionary size;
  // only the stripe progress varies.
  const auto decide = [&policy](const dwio::common::StripeProgress& progress) {
    return policy.shouldFlushDictionary(
        /*flushStripe=*/false,
        /*overMemoryBudget=*/false,
        progress,
        /*dictionarySize=*/20);
  };

  // NOTE: the steps are order-dependent because each call is made against the
  // same policy instance.

  // First stripe.
  EXPECT_EQ(
      decide(dwio::common::StripeProgress{
          .stripeIndex = 0, .stripeSizeEstimate = 100}),
      FlushDecision::SKIP);
  EXPECT_EQ(
      decide(dwio::common::StripeProgress{
          .stripeIndex = 0, .stripeSizeEstimate = 700}),
      FlushDecision::CHECK_DICTIONARY);
  EXPECT_EQ(
      decide(dwio::common::StripeProgress{
          .stripeIndex = 0, .stripeSizeEstimate = 800}),
      FlushDecision::SKIP);

  // Second stripe: a small estimate is skipped again, and a larger one
  // triggers a check.
  EXPECT_EQ(
      decide(dwio::common::StripeProgress{
          .stripeIndex = 1, .stripeSizeEstimate = 100}),
      FlushDecision::SKIP);
  EXPECT_EQ(
      decide(dwio::common::StripeProgress{
          .stripeIndex = 1, .stripeSizeEstimate = 400}),
      FlushDecision::CHECK_DICTIONARY);

  // Third stripe: the first call already reports a large estimate.
  EXPECT_EQ(
      decide(dwio::common::StripeProgress{
          .stripeIndex = 2, .stripeSizeEstimate = 900}),
      FlushDecision::CHECK_DICTIONARY);
}

TEST_F(DefaultFlushPolicyTest, EmptyFile) {
// Empty vector creation succeeds.
RowsPerStripeFlushPolicy policy({});
Expand Down
30 changes: 19 additions & 11 deletions velox/dwio/dwrf/writer/FlushPolicy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "velox/dwio/dwrf/writer/FlushPolicy.h"

namespace {
static constexpr size_t kNumDictioanryTestsPerStripe = 3UL;
static constexpr size_t kNumDictionaryAssessmentsPerStripe = 3UL;
} // namespace

namespace facebook::velox::dwrf {
Expand All @@ -27,10 +27,17 @@ DefaultFlushPolicy::DefaultFlushPolicy(
uint64_t dictionarySizeThreshold)
: stripeSizeThreshold_{stripeSizeThreshold},
dictionarySizeThreshold_{dictionarySizeThreshold},
dictionaryAssessmentThreshold_{getDictionaryAssessmentIncrement()} {}
dictionaryCheckIncrement_{
stripeSizeThreshold_ / kNumDictionaryAssessmentsPerStripe} {
VELOX_CHECK_GT(dictionaryCheckIncrement_, 0);
setNextDictionaryCheckThreshold();
}

uint64_t DefaultFlushPolicy::getDictionaryAssessmentIncrement() const {
return stripeSizeThreshold_ / kNumDictioanryTestsPerStripe;
// Moves the dictionary check threshold to the next multiple of the check
// increment that lies above 'stripeSizeEstimate'.
void DefaultFlushPolicy::setNextDictionaryCheckThreshold(
    uint64_t stripeSizeEstimate) {
  // Round up from (estimate + 1) rather than from the estimate itself, so the
  // new threshold ends up above the current estimate even when the estimate
  // is already an exact multiple of the increment.
  const uint64_t firstSizePastEstimate = stripeSizeEstimate + 1;
  dictionaryCheckThreshold_ =
      bits::roundUp(firstSizePastEstimate, dictionaryCheckIncrement_);
}

FlushDecision DefaultFlushPolicy::shouldFlushDictionary(
Expand All @@ -45,13 +52,14 @@ FlushDecision DefaultFlushPolicy::shouldFlushDictionary(
if (dictionaryMemoryUsage > dictionarySizeThreshold_) {
return FlushDecision::FLUSH_DICTIONARY;
}
if (stripeProgress.stripeSizeEstimate >= dictionaryAssessmentThreshold_) {
// In the current implementation, since we don't ever change encoding
// decision after the first stripe, we don't need to ever reset this
// threshold.
dictionaryAssessmentThreshold_ +=
stripeSizeThreshold_ / kNumDictioanryTestsPerStripe;
return FlushDecision::EVALUATE_DICTIONARY;
if (stripeIndex_ < stripeProgress.stripeIndex) {
// Reset dictionary check threshold for the new stripe.
setNextDictionaryCheckThreshold();
stripeIndex_ = stripeProgress.stripeIndex;
}
if (stripeProgress.stripeSizeEstimate >= dictionaryCheckThreshold_) {
setNextDictionaryCheckThreshold(stripeProgress.stripeSizeEstimate);
return FlushDecision::CHECK_DICTIONARY;
}
return FlushDecision::SKIP;
}
Expand Down
8 changes: 5 additions & 3 deletions velox/dwio/dwrf/writer/FlushPolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
namespace facebook::velox::dwrf {
enum class FlushDecision {
SKIP,
EVALUATE_DICTIONARY,
CHECK_DICTIONARY,
FLUSH_DICTIONARY,
ABANDON_DICTIONARY,
};
Expand Down Expand Up @@ -76,11 +76,13 @@ class DefaultFlushPolicy : public DWRFFlushPolicy {
void onClose() override {}

private:
uint64_t getDictionaryAssessmentIncrement() const;
void setNextDictionaryCheckThreshold(uint64_t stripeSizeEstimate = 0);

const uint64_t stripeSizeThreshold_;
const uint64_t dictionarySizeThreshold_;
uint64_t dictionaryAssessmentThreshold_;
const uint64_t dictionaryCheckIncrement_;
uint64_t stripeIndex_{0};
uint64_t dictionaryCheckThreshold_{0};
};

class RowsPerStripeFlushPolicy : public DWRFFlushPolicy {
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ bool Writer::shouldFlush(const WriterContext& context, size_t nextWriteRows) {
overBudget = overMemoryBudget(context, nextWriteRows);
stripeProgressDecision =
flushPolicy_->shouldFlush(getStripeProgress(context));
} else if (dictionaryFlushDecision == FlushDecision::EVALUATE_DICTIONARY) {
} else if (dictionaryFlushDecision == FlushDecision::CHECK_DICTIONARY) {
writer_->tryAbandonDictionaries(/*force=*/false);
}

Expand Down

0 comments on commit fcbbe64

Please sign in to comment.