Add file read ahead through AsyncDataCache (#3389)
Summary:
Adds a code sample and supporting functions for using CachedBufferedInput for smart prefetching of sequential files. A SeekableInputStream is registered for each file of interest. Each time the read proceeds past a given fraction of the current load quantum, the load of the next quantum is scheduled. The load prefetches into AsyncDataCache and will be found there when the read moves past the end of the current quantum. Prefetch fails silently if there is no memory or if over half the cache is taken by not-yet-accessed prefetched data.
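For concreteness, here is a standalone sketch of the trigger arithmetic with illustrative numbers. It is simplified from the CacheInputStream::Next logic in this diff; the constants are made up and the real defaults may differ.

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  // Illustrative values only; the real ones live in CacheInputStream.
  const int64_t loadQuantum = 8 << 20;    // 8 MB per load quantum
  const int32_t prefetchPct = 50;         // schedule read-ahead at 50%
  const int64_t regionLength = 20 << 20;  // 20 MB enqueued region
  const int64_t position = 5 << 20;       // current read position

  const int64_t offsetInQuantum = position % loadQuantum;               // 5 MB
  const int64_t nextQuantum = position - offsetInQuantum + loadQuantum; // 8 MB
  const int64_t threshold = loadQuantum * prefetchPct / 100;            // 4 MB
  if (offsetInQuantum > threshold && nextQuantum < regionLength) {
    // The read is past 50% of the current quantum and this is not the last
    // quantum, so the next quantum is scheduled for prefetch, clipped to the
    // end of the region.
    const int64_t prefetchSize =
        std::min(regionLength, nextQuantum + loadQuantum) - nextQuantum;
    std::printf(
        "prefetch [%lld, %lld)\n",
        static_cast<long long>(nextQuantum),
        static_cast<long long>(nextQuantum + prefetchSize));
  }
}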

Adds a special StreamIdentifier to denote expected sequential access. When no StreamIdentifier is given to enqueue, the default is to preload the whole enqueued range.

Adds a mode where each visited cache entry is made evictable immediately after unpinning. This avoids polluting the cache with large one-time sequential accesses.

Adds a test that simulates a multifile merge. Each thread has 100 files and consumes a chunk of each in turn.
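Taken together, a minimal usage sketch of the pieces described above. This is illustrative only: construction of the CachedBufferedInput is omitted, readSequentially is a made-up helper, and the exact signatures of enqueue() and Region may differ from the real API.

#include <cstdint>

#include "velox/dwio/common/CacheInputStream.h"
#include "velox/dwio/common/CachedBufferedInput.h"

using namespace facebook::velox::dwio::common;

// Read a whole file sequentially with read-ahead through AsyncDataCache.
// 'input' is an already-constructed CachedBufferedInput for the file.
void readSequentially(CachedBufferedInput* input, uint64_t fileSize) {
  // Tag the stream as sequential so only the first load quantum is preloaded
  // up front; later quanta are fetched by read-ahead.
  auto id = StreamIdentifier::sequentialFile();
  auto stream = input->enqueue(Region{0, fileSize}, &id);
  input->load(LogType::FILE);

  auto* cacheStream = dynamic_cast<CacheInputStream*>(stream.get());
  // Schedule the next load quantum once 50% of the current one is consumed.
  cacheStream->setPrefetchPct(50);
  // One-time sequential data should not crowd out hot cache entries.
  cacheStream->setNoRetention();

  const void* data;
  int32_t size;
  while (cacheStream->Next(&data, &size)) {
    // Consume 'size' bytes starting at 'data'.
  }
}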

Pull Request resolved: #3389

Reviewed By: Yuhta

Differential Revision: D41642368

Pulled By: oerling

fbshipit-source-id: e02c0103fd5de67fd23b5822adffd4e8a7ae02ff
Orri Erling authored and facebook-github-bot committed Dec 2, 2022
1 parent 5a34db3 commit 1299c91
Showing 10 changed files with 254 additions and 17 deletions.
5 changes: 5 additions & 0 deletions velox/common/caching/AsyncDataCache.cpp
@@ -122,6 +122,11 @@ void AsyncDataCacheEntry::initialize(FileCacheKey key) {
}
}

void AsyncDataCacheEntry::makeEvictable() {
accessStats_.lastUse = 0;
accessStats_.numUses = 0;
}

std::string AsyncDataCacheEntry::toString() const {
return fmt::format(
"<entry key:{}:{} size {} pins {}>",
9 changes: 8 additions & 1 deletion velox/common/caching/AsyncDataCache.h
@@ -60,8 +60,12 @@ struct AccessStats {
// works well with a typical formula of time over use count going to
// zero as uses go up and time goes down. 'now' is the current
// accessTime(), passed from the caller since getting the time is
// expensive and many entries are checked one after the other.
// expensive and many entries are checked one after the other. lastUse == 0
// means explicitly evictable.
int32_t score(AccessTime now, uint64_t /*size*/) const {
if (!lastUse) {
return std::numeric_limits<int32_t>::max();
}
return (now - lastUse) / (1 + numUses);
}

@@ -232,6 +236,9 @@ class AsyncDataCacheEntry {
groupId_ = groupId;
}

/// Sets access stats so that this is immediately evictable.
void makeEvictable();

// Moves the promise out of 'this'. Used in order to handle the
// promise within the lock of the cache shard, so not within private
// methods of 'this'.
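To make the scoring rule above concrete, here is a standalone illustration (not part of the diff): a simplified copy of the AccessStats scoring logic with made-up values.

#include <cstdint>
#include <cstdio>
#include <limits>

// Simplified copy of the AccessStats scoring rule, for illustration only.
int32_t score(int32_t now, int32_t lastUse, int32_t numUses) {
  if (lastUse == 0) {
    // makeEvictable() zeroes the stats, so the entry becomes the best
    // eviction candidate regardless of the current time.
    return std::numeric_limits<int32_t>::max();
  }
  return (now - lastUse) / (1 + numUses);
}

int main() {
  const int32_t now = 1000;
  std::printf("hot entry:      %d\n", score(now, 900, 9)); // (1000-900)/10 = 10
  std::printf("cold entry:     %d\n", score(now, 500, 0)); // (1000-500)/1 = 500
  std::printf("made evictable: %d\n", score(now, 0, 0));   // INT32_MAX, evicted first
}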
5 changes: 3 additions & 2 deletions velox/dwio/common/BufferedInput.h
@@ -71,8 +71,9 @@ class BufferedInput {

// True if there is free memory for prefetching the stripe. This is
// called to check if a stripe that is not next for read should be
// prefetched.
virtual bool shouldPreload() {
// prefetched. 'numPages' is added to the already enqueued pages, so
// that this can be called also before enqueueing regions.
virtual bool shouldPreload(int32_t /*numPages*/ = 0) {
return false;
}

20 changes: 20 additions & 0 deletions velox/dwio/common/CacheInputStream.cpp
@@ -72,7 +72,23 @@ bool CacheInputStream::Next(const void** buffer, int32_t* size) {
*size = region_.length - position_;
}
offsetInRun_ += *size;
if (prefetchPct_ < 100) {
auto offsetInQuantum = position_ % loadQuantum_;
auto nextQuantum = position_ - offsetInQuantum + loadQuantum_;
auto prefetchThreshold = loadQuantum_ * prefetchPct_ / 100;
if (!prefetchStarted_ && offsetInQuantum + *size > prefetchThreshold &&
position_ - offsetInQuantum + loadQuantum_ < region_.length) {
// We read past 'prefetchPct_' % of the current load quantum and the
// current load quantum is not the last in the region. Prefetch the next
// load quantum.
auto prefetchSize =
std::min(region_.length, nextQuantum + loadQuantum_) - nextQuantum;
prefetchStarted_ = bufferedInput_->prefetch(
Region{region_.offset + nextQuantum, prefetchSize});
}
}
position_ += *size;

if (tracker_) {
tracker_->recordRead(trackingId_, *size, fileNum_, groupId_);
}
@@ -153,9 +169,13 @@ void CacheInputStream::loadSync(Region region) {
// so as not to double count when the individual parts are
// hit.
ioStats_->incRawBytesRead(region.length);
prefetchStarted_ = false;
do {
folly::SemiFuture<bool> wait(false);
cache::RawFileCacheKey key{fileNum_, region.offset};
if (noRetention_ && !pin_.empty()) {
pin_.checkedEntry()->makeEvictable();
}
pin_.clear();
pin_ = cache_->findOrCreate(key, region.length, &wait);
if (pin_.empty()) {
30 changes: 30 additions & 0 deletions velox/dwio/common/CacheInputStream.h
@@ -79,6 +79,23 @@ class CacheInputStream : public SeekableInputStream {
/// outside of the window. Use together with clone() and skip().
void setRemainingBytes(uint64_t remainingBytes);

/// Causes the next load quantum to be scheduled for read-ahead when
/// 'pct' percent of the current load quantum has been returned by
/// Next(). If Next() returns the whole load quantum, then the first
/// Next() triggers the read-ahead of the next quantum right away. A
/// value of over 100 causes no prefetches to be made. If there is
/// no memory to cover the load quantum to prefetch, the prefetch
/// fails silently.
void setPrefetchPct(int32_t pct) {
prefetchPct_ = pct;
}

/// Enables a mode where cache entries are made immediately evictable after
/// unpinning.
void setNoRetention() {
noRetention_ = true;
}

private:
// Ensures that the current position is covered by 'pin_'.
void loadPosition();
@@ -126,6 +143,19 @@ class CacheInputStream : public SeekableInputStream {
// A restricted view over 'region'. offset is relative to 'region_'. A cloned
// CacheInputStream can cover a subrange of the range of the original.
std::optional<Region> window_;

// Percentage of 'loadQuantum_' at which the next load quantum gets scheduled.
// Over 100 means no prefetch.
int32_t prefetchPct_{200};

// True if prefetch of the next 'loadQuantum_' has been started. Cleared when
// moving to the next load quantum.
bool prefetchStarted_{false};

// True if a pin should be set to lowest retention score after
// unpinning. This applies to sequential reads where a second access
// to the page is not expected.
bool noRetention_{false};
};

} // namespace facebook::velox::dwio::common
53 changes: 45 additions & 8 deletions velox/dwio/common/CachedBufferedInput.cpp
@@ -49,7 +49,9 @@ std::unique_ptr<SeekableInputStream> CachedBufferedInput::enqueue(
VELOX_CHECK_LE(region.offset + region.length, fileSize_);
requests_.emplace_back(
RawFileCacheKey{fileNum_, region.offset}, region.length, id);
tracker_->recordReference(id, region.length, fileNum_, groupId_);
if (tracker_) {
tracker_->recordReference(id, region.length, fileNum_, groupId_);
}
auto stream = std::make_unique<CacheInputStream>(
this,
ioStats_.get(),
@@ -69,13 +71,12 @@ bool CachedBufferedInput::isBuffered(uint64_t /*offset*/, uint64_t /*length*/)
return false;
}

bool CachedBufferedInput::shouldPreload() {
bool CachedBufferedInput::shouldPreload(int32_t numPages) {
// True if after scheduling this for preload, half the capacity
// would be in a loading but not yet accessed state.
if (requests_.empty()) {
if (requests_.empty() && !numPages) {
return false;
}
int32_t numPages = 0;
for (auto& request : requests_) {
numPages += bits::roundUp(
std::min<int32_t>(request.size, loadQuantum_),
@@ -115,6 +116,8 @@ std::vector<CacheRequest*> makeRequestParts(
// Large columns will be part of coalesced reads if the access frequency
// qualifies for read ahead and if over 80% of the column gets accessed. Large
// metadata columns (empty no trackingData) always coalesce.
bool prefetchOne =
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
auto readPct =
(100 * trackingData.numReads) / (1 + trackingData.numReferences);
auto readDensity =
Expand All @@ -130,6 +133,9 @@ std::vector<CacheRequest*> makeRequestParts(
request.trackingId));
parts.push_back(extraRequests.back().get());
parts.back()->coalesces = prefetch;
if (prefetchOne) {
break;
}
}
return parts;
}
@@ -166,11 +172,12 @@ void CachedBufferedInput::load(const LogType) {
continue;
}
cache::TrackingData trackingData;
if (!request.trackingId.empty()) {
bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && tracker_) {
trackingData = tracker_->trackingData(request.trackingId);
}
if (request.trackingId.empty() ||
adjustedReadPct(trackingData) >= readPct) {
if (prefetchAnyway || adjustedReadPct(trackingData) >= readPct) {
request.processed = true;
auto parts = makeRequestParts(
request, trackingData, loadQuantum_, extraRequests);
@@ -248,14 +255,24 @@ void CachedBufferedInput::makeLoads(
readRegion(ranges, prefetch);
});
if (prefetch && executor_) {
for (auto& load : allCoalescedLoads_) {
std::vector<int32_t> doneIndices;
for (auto i = 0; i < allCoalescedLoads_.size(); ++i) {
auto& load = allCoalescedLoads_[i];
if (load->state() == LoadState::kPlanned) {
executor_->add([pendingLoad = load]() {
process::TraceContext trace("Read Ahead");
pendingLoad->loadOrFuture(nullptr);
});
} else {
doneIndices.push_back(i);
}
}
// Remove the loads that were complete. There can be done loads if the same
// CachedBufferedInput has multiple cycles of enqueues and loads.
for (int32_t i = doneIndices.size() - 1; i >= 0; --i) {
assert(!doneIndices.empty()); // lint
allCoalescedLoads_.erase(allCoalescedLoads_.begin() + doneIndices[i]);
}
}
}

@@ -358,6 +375,9 @@ class DwioCoalescedLoad : public DwioCoalescedLoadBase {
keys_,
[&](int32_t index) { return sizes_[index]; },
[&](int32_t /*index*/, CachePin pin) {
if (isPrefetch) {
pin.checkedEntry()->setPrefetch(true);
}
pins.push_back(std::move(pin));
});
if (pins.empty()) {
@@ -400,6 +420,9 @@ class SsdLoad : public DwioCoalescedLoadBase {
keys_,
[&](int32_t index) { return sizes_[index]; },
[&](int32_t index, CachePin pin) {
if (isPrefetch) {
pin.checkedEntry()->setPrefetch(true);
}
pins.push_back(std::move(pin));
ssdPins.push_back(std::move(requests_[index].ssdPin));
});
@@ -475,4 +498,18 @@ std::unique_ptr<SeekableInputStream> CachedBufferedInput::read(
loadQuantum_);
}

bool CachedBufferedInput::prefetch(Region region) {
int32_t numPages = bits::roundUp(region.length, MemoryAllocator::kPageSize) /
MemoryAllocator::kPageSize;
if (!shouldPreload(numPages)) {
return false;
}
auto stream = enqueue(region, nullptr);
load(LogType::FILE);
// Remove the coalesced load made for the stream. The stream itself will not
// be accessed; the cache entry will be.
coalescedLoad(stream.get());
return true;
}

} // namespace facebook::velox::dwio::common
6 changes: 5 additions & 1 deletion velox/dwio/common/CachedBufferedInput.h
@@ -118,7 +118,11 @@ class CachedBufferedInput : public BufferedInput {
std::unique_ptr<SeekableInputStream>
read(uint64_t offset, uint64_t length, LogType logType) const override;

bool shouldPreload() override;
/// Schedules load of 'region' on 'executor_'. Fails silently if no memory or
/// if shouldPreload() is false.
bool prefetch(Region region);

bool shouldPreload(int32_t numPages = 0) override;

bool shouldPrefetchStripes() const override {
return true;
7 changes: 7 additions & 0 deletions velox/dwio/common/StreamIdentifier.h
@@ -49,6 +49,13 @@ class StreamIdentifier {
return fmt::format("[id={}]", id_);
}

/// Returns a special value indicating a stream to be read load quantum by
/// load quantum.
static StreamIdentifier sequentialFile() {
constexpr int32_t kSequentialFile = std::numeric_limits<int32_t>::max() - 1;
return StreamIdentifier(kSequentialFile);
}

int32_t id_;
};
