Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Update throttler to support network signal #11611

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,7 @@ constexpr folly::StringPiece kMetricStorageLocalThrottled{

constexpr folly::StringPiece kMetricStorageGlobalThrottled{
"velox.storage_global_throttled_count"};

constexpr folly::StringPiece kMetricStorageNetworkThrottled{
"velox.storage_network_throttled_count"};
} // namespace facebook::velox
4 changes: 3 additions & 1 deletion velox/docs/monitoring/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,9 @@ Storage
* - storage_global_throttled_count
- Count
- The number of times that storage IOs get throttled in a storage cluster.

* - storage_network_throttled_count
- Count
- The number of times that storage IOs get throttled in a storage cluster because of network.
Spilling
--------

Expand Down
161 changes: 110 additions & 51 deletions velox/dwio/common/Throttler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Throttler::Config::Config(
double _backoffScaleFactor,
uint32_t _minLocalThrottledSignals,
uint32_t _minGlobalThrottledSignals,
uint32_t _minNetworkThrottledSignals,
uint32_t _maxCacheEntries,
uint32_t _cacheTTLMs)
: throttleEnabled(_throttleEnabled),
Expand All @@ -48,18 +49,20 @@ Throttler::Config::Config(
backoffScaleFactor(_backoffScaleFactor),
minLocalThrottledSignals(_minLocalThrottledSignals),
minGlobalThrottledSignals(_minGlobalThrottledSignals),
minNetworkThrottledSignals(_minNetworkThrottledSignals),
maxCacheEntries(_maxCacheEntries),
cacheTTLMs(_cacheTTLMs) {}

std::string Throttler::Config::toString() const {
return fmt::format(
"throttleEnabled:{} minThrottleBackoffMs:{} maxThrottleBackoffMs:{} backoffScaleFactor:{} minLocalThrottledSignals:{} minGlobalThrottledSignals:{} maxCacheEntries:{} cacheTTLMs:{}",
"throttleEnabled:{} minThrottleBackoffMs:{} maxThrottleBackoffMs:{} backoffScaleFactor:{} minLocalThrottledSignals:{} minGlobalThrottledSignals:{} minNetworkThrottledSignals:{} maxCacheEntries:{} cacheTTLMs:{}",
throttleEnabled,
succinctMillis(minThrottleBackoffMs),
succinctMillis(maxThrottleBackoffMs),
backoffScaleFactor,
minLocalThrottledSignals,
minGlobalThrottledSignals,
minNetworkThrottledSignals,
maxCacheEntries,
succinctMillis(cacheTTLMs));
};
Expand All @@ -72,6 +75,8 @@ std::string Throttler::signalTypeName(SignalType type) {
return "Local";
case SignalType::kGlobal:
return "Global";
case SignalType::kNetwork:
return "Network";
default:
return fmt::format("Unknown Signal Type: {}", static_cast<int>(type));
}
Expand Down Expand Up @@ -103,24 +108,21 @@ Throttler::Throttler(const Config& config)
minThrottleBackoffDurationMs_(config.minThrottleBackoffMs),
maxThrottleBackoffDurationMs_(config.maxThrottleBackoffMs),
backoffScaleFactor_(config.backoffScaleFactor),
minLocalThrottledSignalsToBackoff_(config.minLocalThrottledSignals),
minGlobalThrottledSignalsToBackoff_(config.minGlobalThrottledSignals),
localThrottleCache_(
!throttleEnabled_
? nullptr
: new ThrottleSignalFactory{std::make_unique<SimpleLRUCache<std::string, ThrottleSignal>>(
config.maxCacheEntries,
config.cacheTTLMs),
std::unique_ptr<ThrottleSignalGenerator>{
new ThrottleSignalGenerator{}}}),
globalThrottleCache_(
!throttleEnabled_
? nullptr
: new ThrottleSignalFactory{std::make_unique<SimpleLRUCache<std::string, ThrottleSignal>>(
config.maxCacheEntries,
config.cacheTTLMs),
std::unique_ptr<ThrottleSignalGenerator>{
new ThrottleSignalGenerator{}}}) {
localThrottleCache_(maybeMakeThrottleSignalCache(
config.throttleEnabled,
config.minLocalThrottledSignals,
config.maxCacheEntries,
config.cacheTTLMs)),
globalThrottleCache_(maybeMakeThrottleSignalCache(
config.throttleEnabled,
config.minGlobalThrottledSignals,
config.maxCacheEntries,
config.cacheTTLMs)),
networkThrottleCache_(maybeMakeThrottleSignalCache(
config.throttleEnabled,
config.minNetworkThrottledSignals,
config.maxCacheEntries,
config.cacheTTLMs)) {
LOG(INFO) << "IO throttler config: " << config.toString();
}

Expand Down Expand Up @@ -149,12 +151,22 @@ void Throttler::updateThrottleStats(SignalType type, uint64_t backoffDelayMs) {
stats_.backOffDelay.increment(backoffDelayMs);
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricStorageThrottledDurationMs, backoffDelayMs);
if (type == SignalType::kLocal) {
++stats_.localThrottled;
RECORD_METRIC_VALUE(kMetricStorageLocalThrottled);
} else {
++stats_.globalThrottled;
RECORD_METRIC_VALUE(kMetricStorageGlobalThrottled);

switch (type) {
case SignalType::kLocal:
++stats_.localThrottled;
RECORD_METRIC_VALUE(kMetricStorageLocalThrottled);
break;
case SignalType::kGlobal:
++stats_.globalThrottled;
RECORD_METRIC_VALUE(kMetricStorageGlobalThrottled);
break;
case SignalType::kNetwork:
++stats_.networkThrottled;
RECORD_METRIC_VALUE(kMetricStorageNetworkThrottled);
break;
default:
break;
}
}

Expand All @@ -163,48 +175,78 @@ void Throttler::updateThrottleCacheLocked(
const std::string& cluster,
const std::string& directory,
CachedThrottleSignalPtr& localSignal,
CachedThrottleSignalPtr& globalSignal) {
CachedThrottleSignalPtr& globalSignal,
CachedThrottleSignalPtr& networkSignal) {
VELOX_CHECK(throttleEnabled());

if (type == SignalType::kLocal) {
if (localSignal.get() == nullptr) {
localThrottleCache_->generate(localThrottleCacheKey(cluster, directory));
} else {
++localSignal->count;
}
} else {
if (globalSignal.get() == nullptr) {
globalThrottleCache_->generate(cluster);
} else {
++globalSignal->count;
}
}
switch (type) {
case SignalType::kLocal:
if (localSignal.get() == nullptr) {
localThrottleCache_.throttleCache->generate(
localThrottleCacheKey(cluster, directory));
} else {
++localSignal->count;
}
return;
case SignalType::kGlobal:
if (globalSignal.get() == nullptr) {
globalThrottleCache_.throttleCache->generate(cluster);
} else {
++globalSignal->count;
}
return;
case SignalType::kNetwork:
if (networkSignal.get() == nullptr) {
networkThrottleCache_.throttleCache->generate(cluster);
} else {
++networkSignal->count;
}
return;
default:
VELOX_UNREACHABLE("Invalid type provided: {}", signalTypeName(type));
};
}

uint64_t Throttler::calculateBackoffDurationAndUpdateThrottleCache(
SignalType type,
const std::string& cluster,
const std::string& directoy) {
const std::string& directory) {
std::lock_guard<std::mutex> l(mu_);
// Gets maximum count of local and global throttle signals in Cache.
auto localThrottleCachePtr =
localThrottleCache_->get(localThrottleCacheKey(cluster, directoy));
int64_t localThrottleCount =
// Gets maximum count of local, global, and network throttle signals in Cache.
auto localThrottleCachePtr = localThrottleCache_.throttleCache->get(
localThrottleCacheKey(cluster, directory));
const int64_t localThrottleCount =
(localThrottleCachePtr.get() != nullptr ? localThrottleCachePtr->count
: 0) +
(type == SignalType::kLocal ? 1 : 0) - minLocalThrottledSignalsToBackoff_;
auto globalThrottleCachePtr = globalThrottleCache_->get(cluster);
(type == SignalType::kLocal ? 1 : 0) -
localThrottleCache_.minThrottledSignalsToBackOff;

auto globalThrottleCachePtr =
globalThrottleCache_.throttleCache->get(cluster);
const int64_t globalThrottleCount =
(globalThrottleCachePtr.get() != nullptr ? globalThrottleCachePtr->count
: 0) +
(type == SignalType::kGlobal ? 1 : 0) -
minGlobalThrottledSignalsToBackoff_;
globalThrottleCache_.minThrottledSignalsToBackOff;

auto networkThrottleCachePtr =
networkThrottleCache_.throttleCache->get(cluster);
const int64_t networkThrottleCount =
(networkThrottleCachePtr.get() != nullptr ? networkThrottleCachePtr->count
: 0) +
(type == SignalType::kNetwork ? 1 : 0) -
networkThrottleCache_.minThrottledSignalsToBackOff;

// Update throttling signal cache.
updateThrottleCacheLocked(
type, cluster, directoy, localThrottleCachePtr, globalThrottleCachePtr);
type,
cluster,
directory,
localThrottleCachePtr,
globalThrottleCachePtr,
networkThrottleCachePtr);

const int64_t throttleAttempts =
std::max(localThrottleCount, globalThrottleCount);
const int64_t throttleAttempts = std::max(
networkThrottleCount, std::max(localThrottleCount, globalThrottleCount));

// Calculates the delay with exponential backoff
if (throttleAttempts <= 0) {
Expand All @@ -229,4 +271,21 @@ Throttler::ThrottleSignalGenerator::operator()(
const void* /*unused*/) {
return std::unique_ptr<ThrottleSignal>(new ThrottleSignal{1});
}

/* static */
Throttler::ThrottleSignalCache Throttler::maybeMakeThrottleSignalCache(
bool enabled,
uint32_t minThrottledSignals,
uint32_t maxCacheEntries,
uint32_t cacheTTLMs) {
return {
.throttleCache = !enabled
? nullptr
: std::make_unique<ThrottleSignalFactory>(
std::make_unique<SimpleLRUCache<std::string, ThrottleSignal>>(
maxCacheEntries, cacheTTLMs),
std::make_unique<ThrottleSignalGenerator>()),
.minThrottledSignalsToBackOff = minThrottledSignals,
};
}
} // namespace facebook::velox::dwio::common
31 changes: 26 additions & 5 deletions velox/dwio/common/Throttler.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class Throttler {
/// backoff.
uint32_t minGlobalThrottledSignals;

/// The minimum number of received network throttled signals before starting
/// backoff.
uint32_t minNetworkThrottledSignals;

/// The maximum number of entries in the throttled signal cache. There is
/// one cache for each throttle signal type. For local throttle signal
/// cache, each cache entry corresponds to a unqiue file direcotry in a
Expand All @@ -68,6 +72,7 @@ class Throttler {
static constexpr double kBackoffScaleFactorDefault{2.0};
static constexpr uint32_t kMinLocalThrottledSignalsDefault{1'000};
static constexpr uint32_t kMinGlobalThrottledSignalsDefault{100'000};
static constexpr uint32_t kMinNetworkThrottledSignal{1'000};
static constexpr uint32_t kMaxCacheEntriesDefault{10'000};
static constexpr uint32_t kCacheTTLMsDefault{3 * 60 * 1'000};

Expand All @@ -78,6 +83,7 @@ class Throttler {
double backoffScaleFactor = kBackoffScaleFactorDefault,
uint32_t minLocalThrottledSignals = kMinLocalThrottledSignalsDefault,
uint32_t minGlobalThrottledSignals = kMinGlobalThrottledSignalsDefault,
uint32_t minNetworkThrottledSignals = kMinNetworkThrottledSignal,
uint32_t maxCacheEntries = kMaxCacheEntriesDefault,
uint32_t cacheTTLMs = kCacheTTLMsDefault);

Expand All @@ -88,6 +94,7 @@ class Throttler {
struct Stats {
std::atomic_uint64_t localThrottled{0};
std::atomic_uint64_t globalThrottled{0};
std::atomic_uint64_t networkThrottled{0};
/// Counts the backoff delay in milliseconds.
io::IoCounter backOffDelay;
};
Expand All @@ -104,6 +111,8 @@ class Throttler {
kLocal,
/// A cluster-wise throttled signal.
kGlobal,
/// Network throttled signal.
kNetwork,
};
static std::string signalTypeName(SignalType type);

Expand Down Expand Up @@ -174,25 +183,37 @@ class Throttler {
using ThrottleSignalFactory = facebook::velox::
CachedFactory<std::string, ThrottleSignal, ThrottleSignalGenerator>;

struct ThrottleSignalCache {
std::unique_ptr<ThrottleSignalFactory> throttleCache;
uint32_t minThrottledSignalsToBackOff;
};

void updateThrottleCacheLocked(
SignalType type,
const std::string& cluster,
const std::string& directory,
CachedThrottleSignalPtr& localSignal,
CachedThrottleSignalPtr& globalSignal);
CachedThrottleSignalPtr& globalSignal,
CachedThrottleSignalPtr& networkSignal);

void updateThrottleStats(SignalType type, uint64_t backoffDelayMs);

static ThrottleSignalCache maybeMakeThrottleSignalCache(
bool enabled,
uint32_t minThrottledSignals,
uint32_t maxCacheEntries,
uint32_t cacheTTLMs);

static const uint64_t kNoBackOffMs_{0};

const bool throttleEnabled_;
const uint64_t minThrottleBackoffDurationMs_;
const uint64_t maxThrottleBackoffDurationMs_;
const double backoffScaleFactor_;
const uint32_t minLocalThrottledSignalsToBackoff_;
const uint32_t minGlobalThrottledSignalsToBackoff_;
const std::unique_ptr<ThrottleSignalFactory> localThrottleCache_;
const std::unique_ptr<ThrottleSignalFactory> globalThrottleCache_;

const ThrottleSignalCache localThrottleCache_;
const ThrottleSignalCache globalThrottleCache_;
const ThrottleSignalCache networkThrottleCache_;

mutable std::mutex mu_;

Expand Down
Loading
Loading