Fix and optimize read IO pattern for flat map column (facebookincubator#11236)

Summary:

1. `numReferences` and `numReads` in `TrackingData` can overflow for a large volume of reads on a flat map column, which causes the read percentage to become negative and coalescing to fail. Fix this by using `double` and counting only bytes: the load quantum is decoupled from `Next`, so there is no way to correctly correlate the number of reads (`Next`) with the number of loads (`enqueue`). (An illustrative sketch follows this list.)
2. Increase the limit on the maximum number of regions per coalesce, reducing the number of IO reads for a typical flat map column to 1/3.
3. Use binary search when looking up a request by region offset in `DirectBufferedInput`.
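For illustration, the sketch below is a minimal, self-contained C++ program (not code from this commit; `TrackingDataSketch` and `readPct` are hypothetical stand-ins for `TrackingData` and `ScanTracker::readPct`) showing why a per-quantum `int32_t` counter can wrap negative on a heavily scanned column and how a byte ratio held in `double` stays well behaved.

#include <cstdint>
#include <iostream>
#include <limits>

// Hypothetical stand-in for the fixed TrackingData: byte counters held in
// double, so the ratio stays well defined no matter how much data a
// long-running scan touches.
struct TrackingDataSketch {
  double referencedBytes{0};
  double lastReferencedBytes{0};
  double readBytes{0};
};

// Same idea as ScanTracker::readPct after this change: the percentage of
// referenced bytes that were actually read.
int readPct(const TrackingDataSketch& d) {
  if (d.referencedBytes == 0) {
    return 100;
  }
  return static_cast<int>(d.readBytes / d.referencedBytes * 100);
}

int main() {
  // The old int32_t counters were bumped once per load quantum per reference;
  // once the running total passes INT32_MAX it wraps negative, and
  // "100 * numReads / numReferences" turns negative too, which disabled
  // coalescing. Demonstrated here with int64_t arithmetic to avoid signed
  // overflow UB.
  const int64_t counter = int64_t{std::numeric_limits<int32_t>::max()} + 1;
  std::cout << "wrapped int32 value: " << static_cast<int32_t>(counter) << "\n";

  // The byte-based ratio in double never wraps.
  TrackingDataSketch d;
  d.referencedBytes = 1e15;  // 1 PB referenced over the tracker's lifetime
  d.readBytes = 4e14;        // 400 TB actually read
  std::cout << "readPct: " << readPct(d) << "\n";  // prints 40
}

Bytes are tracked instead of counts because, as noted above, the load quantum is decoupled from `Next`, so per-call read counts cannot be matched against per-enqueue load counts.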

Reviewed By: xiaoxmeng, oerling

Differential Revision: D64225777
Yuhta authored and facebook-github-bot committed Oct 14, 2024
1 parent f9d9f48 commit 6dd493a
Showing 9 changed files with 100 additions and 132 deletions.
19 changes: 10 additions & 9 deletions velox/common/caching/ScanTracker.cpp
@@ -32,8 +32,10 @@ void ScanTracker::recordReference(
fileGroupStats_->recordReference(fileId, groupId, id, bytes);
}
std::lock_guard<std::mutex> l(mutex_);
data_[id].incrementReference(bytes, loadQuantum_);
sum_.incrementReference(bytes, loadQuantum_);
auto& data = data_[id];
data.referencedBytes += bytes;
data.lastReferencedBytes = bytes;
sum_.referencedBytes += bytes;
}

void ScanTracker::recordRead(
@@ -45,18 +47,17 @@ void ScanTracker::recordRead(
fileGroupStats_->recordRead(fileId, groupId, id, bytes);
}
std::lock_guard<std::mutex> l(mutex_);
data_[id].incrementRead(bytes);
sum_.incrementRead(bytes);
auto& data = data_[id];
data.readBytes += bytes;
sum_.readBytes += bytes;
}

std::string ScanTracker::toString() const {
std::stringstream out;
out << "ScanTracker for " << id_ << std::endl;
for (const auto& pair : data_) {
const int pct =
100 * pair.second.readBytes / (1 + pair.second.referencedBytes);
out << pair.first.id() << ": " << pct << "% " << pair.second.readBytes
<< "/" << pair.second.numReads << std::endl;
for (const auto& [id, data] : data_) {
out << id.id() << ": " << data.readBytes << "/" << data.referencedBytes
<< std::endl;
}
return out.str();
}
36 changes: 5 additions & 31 deletions velox/common/caching/ScanTracker.h
@@ -73,30 +73,9 @@ class FileGroupStats;

/// Records references and actual uses of a stream.
struct TrackingData {
int64_t referencedBytes{};
int64_t readBytes{};
int32_t numReferences{};
int32_t numReads{};

/// Marks that 'bytes' worth of data in the tracked object has been referenced
/// and may later be accessed. If 'bytes' is larger than a single
/// 'loadQuantum', the reference counts for as many accesses as are needed to
/// cover 'bytes'. When reading a large object, we will get a read per
/// quantum. So then if the referenced and read counts match, we know that the
/// object is densely read.
void incrementReference(uint64_t bytes, int32_t loadQuantum) {
referencedBytes += bytes;
if (loadQuantum == 0) {
++numReferences;
} else {
numReferences += bits::roundUp(bytes, loadQuantum) / loadQuantum;
}
}

void incrementRead(uint64_t bytes) {
readBytes += bytes;
++numReads;
}
double referencedBytes{};
double lastReferencedBytes{};
double readBytes{};
};

/// Tracks column access frequency during execution of a query. A ScanTracker is
@@ -121,7 +100,6 @@ class ScanTracker {
FileGroupStats* fileGroupStats = nullptr)
: id_(id),
unregisterer_(std::move(unregisterer)),
loadQuantum_(loadQuantum),
fileGroupStats_(fileGroupStats) {}

~ScanTracker() {
@@ -156,10 +134,10 @@
int32_t readPct(TrackingId id) {
std::lock_guard<std::mutex> l(mutex_);
const auto& data = data_[id];
if (data.numReferences == 0) {
if (data.referencedBytes == 0) {
return 100;
}
return (100 * data.numReads) / data.numReferences;
return data.readBytes / data.referencedBytes * 100;
}

TrackingData trackingData(TrackingId id) {
@@ -181,10 +159,6 @@
// Id of query + scan operator to track.
const std::string id_;
const std::function<void(ScanTracker*)> unregisterer_{nullptr};
// Maximum size of a read. 10MB would count as two references if the quantum
// were 8MB. At the same time this would count as a single 10MB reference for
// 'fileGroupStats_'. 0 means the read size is unlimited.
const int32_t loadQuantum_;
FileGroupStats* const fileGroupStats_;

std::mutex mutex_;
18 changes: 18 additions & 0 deletions velox/dwio/common/BufferedInput.h
@@ -16,6 +16,7 @@

#pragma once

#include "velox/common/caching/ScanTracker.h"
#include "velox/common/memory/AllocationPool.h"
#include "velox/dwio/common/SeekableInputStream.h"
#include "velox/dwio/common/StreamIdentifier.h"
@@ -149,6 +150,23 @@ class BufferedInput {
virtual uint64_t nextFetchSize() const;

protected:
static int adjustedReadPct(const cache::TrackingData& trackingData) {
// When this method is called, there is one more reference that is already
// counted, but the corresponding read (if exists) has not happened yet. So
// we must count one fewer reference at this point.
const auto referencedBytes =
trackingData.referencedBytes - trackingData.lastReferencedBytes;
if (referencedBytes == 0) {
return 0;
}
const int pct = trackingData.readBytes / referencedBytes * 100;
VELOX_CHECK_LE(0, pct, "Bad read percentage: {}", pct);
// It is possible to seek back or clone the stream and read the same data
// multiple times, or because of unplanned read, so pct could be larger than
// 100. This should be rare in production though.
return pct;
}

const std::shared_ptr<ReadFileInputStream> input_;
memory::MemoryPool* const pool_;

86 changes: 35 additions & 51 deletions velox/dwio/common/CachedBufferedInput.cpp
@@ -121,12 +121,11 @@ std::vector<CacheRequest*> makeRequestParts(
// metadata columns (empty no trackingData) always coalesce.
const bool prefetchOne =
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
const auto readPct =
(100 * trackingData.numReads) / (1 + trackingData.numReferences);
const auto readDensity =
(100 * trackingData.readBytes) / (1 + trackingData.referencedBytes);
trackingData.readBytes / (1 + trackingData.referencedBytes);
const auto readPct = 100 * readDensity;
const bool prefetch = trackingData.referencedBytes > 0 &&
(isPrefetchPct(readPct) && readDensity >= 80);
isPrefetchPct(readPct) && readDensity >= 0.8;
std::vector<CacheRequest*> parts;
for (uint64_t offset = 0; offset < request.size; offset += loadQuantum) {
const int32_t size = std::min<int32_t>(loadQuantum, request.size - offset);
@@ -143,14 +142,6 @@
return parts;
}

int32_t adjustedReadPct(const cache::TrackingData& trackingData) {
// When called, there will be one more reference that read, since references
// are counted before reading.
if (trackingData.numReferences < 2) {
return 0;
}
return (100 * trackingData.numReads) / (trackingData.numReferences - 1);
}
} // namespace

void CachedBufferedInput::load(const LogType /*unused*/) {
@@ -165,49 +156,43 @@
// Extra requests made for pre-loadable regions that are larger than
// 'loadQuantum'.
std::vector<std::unique_ptr<CacheRequest>> extraRequests;
// We loop over access frequency buckets. For example readPct 80 will get all
// streams where 80% or more of the referenced data is actually loaded.
for (const auto readPct : std::vector<int32_t>{80, 50, 20, 0}) {
std::vector<CacheRequest*> storageLoad;
std::vector<CacheRequest*> ssdLoad;
for (auto& request : requests) {
if (request.processed) {
std::vector<CacheRequest*> storageLoad[2];
std::vector<CacheRequest*> ssdLoad[2];
for (auto& request : requests) {
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && (tracker_ != nullptr)) {
trackingData = tracker_->trackingData(request.trackingId);
}
const int loadIndex =
(prefetchAnyway || isPrefetchPct(adjustedReadPct(trackingData))) ? 1
: 0;
auto parts = makeRequestParts(
request, trackingData, options_.loadQuantum(), extraRequests);
for (auto part : parts) {
if (cache_->exists(part->key)) {
continue;
}
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && (tracker_ != nullptr)) {
trackingData = tracker_->trackingData(request.trackingId);
}
if (prefetchAnyway || adjustedReadPct(trackingData) >= readPct) {
request.processed = true;
auto parts = makeRequestParts(
request, trackingData, options_.loadQuantum(), extraRequests);
for (auto part : parts) {
if (cache_->exists(part->key)) {
continue;
}
if (ssdFile != nullptr) {
part->ssdPin = ssdFile->find(part->key);
if (!part->ssdPin.empty() &&
part->ssdPin.run().size() < part->size) {
LOG(INFO) << "IOERR: Ignoring SSD shorter than requested: "
<< part->ssdPin.run().size() << " vs " << part->size;
part->ssdPin.clear();
}
if (!part->ssdPin.empty()) {
ssdLoad.push_back(part);
continue;
}
}
storageLoad.push_back(part);
if (ssdFile != nullptr) {
part->ssdPin = ssdFile->find(part->key);
if (!part->ssdPin.empty() && part->ssdPin.run().size() < part->size) {
LOG(INFO) << "IOERR: Ignoring SSD shorter than requested: "
<< part->ssdPin.run().size() << " vs " << part->size;
part->ssdPin.clear();
}
if (!part->ssdPin.empty()) {
ssdLoad[loadIndex].push_back(part);
continue;
}
}
storageLoad[loadIndex].push_back(part);
}
makeLoads(std::move(storageLoad), isPrefetchPct(readPct));
makeLoads(std::move(ssdLoad), isPrefetchPct(readPct));
}
makeLoads(std::move(storageLoad[1]), true);
makeLoads(std::move(ssdLoad[1]), true);
makeLoads(std::move(storageLoad[0]), false);
makeLoads(std::move(ssdLoad[0]), false);
}

void CachedBufferedInput::makeLoads(
@@ -235,8 +220,7 @@ void CachedBufferedInput::makeLoads(
coalesceIo<CacheRequest*, CacheRequest*>(
requests,
maxDistance,
// Break batches up. Better load more short ones in parallel.
40,
std::numeric_limits<int32_t>::max(),
[&](int32_t index) {
return isSsd ? requests[index]->ssdPin.run().offset()
: requests[index]->key.offset;
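The restructured `CachedBufferedInput::load` above replaces the old outer loop over read-percentage thresholds (80/50/20/0) with a single pass that puts every request part into one of two buckets: prefetchable (index 1) or load-on-demand (index 0). Below is a minimal sketch of that classification pattern, with hypothetical types and a made-up constant standing in for `FLAGS_cache_prefetch_min_pct`.

#include <iostream>
#include <string>
#include <vector>

// Hypothetical request type standing in for CacheRequest.
struct RequestSketch {
  std::string name;
  int readPct;  // adjusted read percentage reported by the tracker
};

// Stand-in for FLAGS_cache_prefetch_min_pct.
constexpr int kPrefetchMinPct = 80;

int main() {
  std::vector<RequestSketch> requests{
      {"dense column", 95}, {"sparse flat map value", 10}, {"metadata", 100}};

  // One pass: bucket 1 holds prefetchable requests, bucket 0 the rest,
  // mirroring the storageLoad[2]/ssdLoad[2] arrays in the diff.
  std::vector<const RequestSketch*> buckets[2];
  for (const auto& request : requests) {
    const int loadIndex = request.readPct >= kPrefetchMinPct ? 1 : 0;
    buckets[loadIndex].push_back(&request);
  }

  // Prefetchable loads are issued eagerly, the rest lazily on first access.
  for (const auto* r : buckets[1]) {
    std::cout << "prefetch: " << r->name << "\n";
  }
  for (const auto* r : buckets[0]) {
    std::cout << "on demand: " << r->name << "\n";
  }
}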
2 changes: 0 additions & 2 deletions velox/dwio/common/CachedBufferedInput.h
@@ -44,8 +44,6 @@ struct CacheRequest {
cache::CachePin pin;
cache::SsdPin ssdPin;

bool processed{false};

/// True if this should be coalesced into a CoalescedLoad with other nearby
/// requests with a similar load probability. This is false for sparsely
/// accessed large columns where hitting one piece should not load the
61 changes: 24 additions & 37 deletions velox/dwio/common/DirectBufferedInput.cpp
@@ -81,42 +81,27 @@ bool isPrefetchablePct(int32_t pct) {
return pct >= FLAGS_cache_prefetch_min_pct;
}

int32_t adjustedReadPct(const cache::TrackingData& trackingData) {
// When called, there will be one more reference that read, since references
// are counted before reading.
if (trackingData.numReferences < 2) {
return 0;
}
return (100 * trackingData.numReads) / (trackingData.numReferences - 1);
}
} // namespace

void DirectBufferedInput::load(const LogType /*unused*/) {
// After load, new requests cannot be merged into pre-load ones.
auto requests = std::move(requests_);

// We loop over access frequency buckets. For example readPct 80
// will get all streams where 80% or more of the referenced data is
// actually loaded.
for (auto readPct : std::vector<int32_t>{80, 50, 20, 0}) {
std::vector<LoadRequest*> storageLoad;
for (auto& request : requests) {
if (request.processed) {
continue;
}
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && tracker_) {
trackingData = tracker_->trackingData(request.trackingId);
}
if (prefetchAnyway || adjustedReadPct(trackingData) >= readPct) {
request.processed = true;
storageLoad.push_back(&request);
}
std::vector<LoadRequest*> storageLoad[2];
for (auto& request : requests) {
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && tracker_) {
trackingData = tracker_->trackingData(request.trackingId);
}
makeLoads(std::move(storageLoad), isPrefetchablePct(readPct));
const int loadIndex =
(prefetchAnyway || isPrefetchablePct(adjustedReadPct(trackingData)))
? 1
: 0;
storageLoad[loadIndex].push_back(&request);
}
makeLoads(std::move(storageLoad[1]), true);
makeLoads(std::move(storageLoad[0]), false);
}

void DirectBufferedInput::makeLoads(
@@ -148,7 +133,7 @@ void DirectBufferedInput::makeLoads(
requests,
maxDistance,
// Break batches up. Better load more short ones i parallel.
1000, // limit coalesce by size, not count.
std::numeric_limits<int32_t>::max(), // limit coalesce by size, not count.
[&](int32_t index) { return requests[index]->region.offset; },
[&](int32_t index) -> int32_t {
auto size = requests[index]->region.length;
@@ -313,14 +298,16 @@ int32_t DirectCoalescedLoad::getData(
int64_t offset,
memory::Allocation& data,
std::string& tinyData) {
for (auto& request : requests_) {
if (request.region.offset == offset) {
data = std::move(request.data);
tinyData = std::move(request.tinyData);
return request.loadSize;
}
auto it = std::lower_bound(
requests_.begin(), requests_.end(), offset, [](auto& x, auto offset) {
return x.region.offset < offset;
});
if (it == requests_.end() || it->region.offset != offset) {
return 0;
}
return 0;
data = std::move(it->data);
tinyData = std::move(it->tinyData);
return it->loadSize;
}

} // namespace facebook::velox::dwio::common
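A side note on the `getData` change above: `std::lower_bound` only works because `requests_` is kept sorted by `region.offset` (the `VELOX_DCHECK` added in `DirectBufferedInput.h` below asserts this). Here is a small, self-contained sketch of the same lookup pattern, using hypothetical `RegionSketch`/`RequestSketch` types rather than the Velox ones.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-ins for the region/request types in the diff.
struct RegionSketch {
  uint64_t offset;
  uint64_t length;
};
struct RequestSketch {
  RegionSketch region;
  int32_t loadSize;
};

// Binary search for the request that starts exactly at 'offset'. Requires
// 'requests' to be sorted by region offset; returns nullptr when absent.
const RequestSketch* findByOffset(
    const std::vector<RequestSketch>& requests,
    uint64_t offset) {
  auto it = std::lower_bound(
      requests.begin(),
      requests.end(),
      offset,
      [](const RequestSketch& r, uint64_t o) { return r.region.offset < o; });
  if (it == requests.end() || it->region.offset != offset) {
    return nullptr;
  }
  return &*it;
}

int main() {
  const std::vector<RequestSketch> requests{
      {{0, 100}, 100}, {{100, 50}, 50}, {{400, 8}, 8}};
  const auto* hit = findByOffset(requests, 100);
  std::cout << (hit != nullptr ? hit->loadSize : 0) << "\n";      // 50
  std::cout << (findByOffset(requests, 250) != nullptr) << "\n";  // 0

Compared with the old linear scan, the lookup is O(log n) in the number of coalesced requests, which matters more now that the per-coalesce region cap has been raised.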
5 changes: 4 additions & 1 deletion velox/dwio/common/DirectBufferedInput.h
@@ -36,7 +36,6 @@ struct LoadRequest {

velox::common::Region region;
cache::TrackingId trackingId;
bool processed{false};

const SeekableInputStream* stream;

@@ -63,6 +62,10 @@ class DirectCoalescedLoad : public cache::CoalescedLoad {
input_(std::move(input)),
loadQuantum_(loadQuantum),
pool_(pool) {
VELOX_DCHECK(
std::is_sorted(requests.begin(), requests.end(), [](auto* x, auto* y) {
return x->region.offset < y->region.offset;
}));
requests_.reserve(requests.size());
for (auto i = 0; i < requests.size(); ++i) {
requests_.push_back(std::move(*requests[i]));
3 changes: 3 additions & 0 deletions velox/dwio/dwrf/common/Common.h
@@ -166,6 +166,9 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier {
stream.has_column() ? stream.column() : dwio::common::MAX_UINT32,
stream.kind()) {}

/// Pruned flat map keys are not enqueued thus all flatmap values on same
/// column should have similar read percentage, so it is ok for them to share
/// the same TrackingData.
DwrfStreamIdentifier(
uint32_t node,
uint32_t sequence,
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/test/CacheInputTest.cpp
@@ -539,7 +539,7 @@ TEST_F(CacheTest, ssd) {
auto sparseStripeBytes = (ioStats_->rawBytesRead() - bytes) / 10;
EXPECT_LT(sparseStripeBytes, fullStripeBytes / 4);
// Expect the dense fraction of columns to have read ahead.
EXPECT_LT(1000000, ioStats_->prefetch().sum());
EXPECT_LT(400'000, ioStats_->prefetch().sum());

constexpr int32_t kStripesPerFile = 10;
auto bytesPerFile = fullStripeBytes * kStripesPerFile;
