Commit
fix: Avoid downloading coalesced region gaps twice in buffered inputs
Summary:
In the current `DirectBufferedInput` and `CachedBufferedInput`, when we
coalesce regions during prefetch, the gaps between the coalesced regions are
not marked as available, so if ad-hoc streams later need those gaps, we issue
separate IOs for them, resulting in unnecessary IOs.  This change fixes that by
rearranging the region and load generation logic: first generate and group the
prefetch regions, then compare the coalesced regions against the ad-hoc regions
and move any ad-hoc request already covered by coalescing into the prefetch
loads, reducing the number of ad-hoc IOs.

Differential Revision: D67810195
Yuhta authored and facebook-github-bot committed Jan 3, 2025
1 parent 8b3b55a commit 132eaf9
Showing 6 changed files with 235 additions and 100 deletions.
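To illustrate the fix described in the summary, here is a minimal standalone sketch (the `Region` type and `coveredByCoalescedLoad` helper are illustrative names, not Velox APIs): coalescing two prefetch regions into one IO also fetches the gap between them, so an ad-hoc request that falls entirely inside that gap should not trigger a second IO.

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// {offset, end} byte range of a read request.
using Region = std::pair<uint64_t, uint64_t>;

// True if `adhoc` lies entirely inside the byte span of a coalesced group of
// prefetch regions (sorted by offset), i.e. the coalesced IO already covers it.
bool coveredByCoalescedLoad(const std::vector<Region>& group, const Region& adhoc) {
  return !group.empty() && adhoc.first >= group.front().first &&
      adhoc.second <= group.back().second;
}

int main() {
  // [0, 100) and [150, 250) are coalesced into a single prefetch IO that also
  // reads the gap [100, 150) as a side effect.
  const std::vector<Region> coalescedGroup = {{0, 100}, {150, 250}};
  const Region adhoc = {110, 140}; // falls inside the gap
  // Before this change the ad-hoc stream issued a second IO for [110, 140);
  // with this change the request is moved into the prefetch load instead.
  return coveredByCoalescedLoad(coalescedGroup, adhoc) ? 0 : 1;
}
```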
46 changes: 46 additions & 0 deletions velox/dwio/common/BufferedInput.h
@@ -167,6 +167,52 @@ class BufferedInput {
return pct;
}

// Move the requests in `noPrefetch' to `prefetch' if they are already covered
// by coalescing in `prefetch'.
template <typename Request, typename GetRegionOffset, typename GetRegionEnd>
static void moveCoalesced(
std::vector<Request>& prefetch,
std::vector<int32_t>& ends,
std::vector<Request>& noPrefetch,
GetRegionOffset getRegionOffset,
GetRegionEnd getRegionEnd) {
auto numOldPrefetch = prefetch.size();
prefetch.resize(prefetch.size() + noPrefetch.size());
std::copy_backward(
prefetch.data(), prefetch.data() + numOldPrefetch, prefetch.end());
auto* oldPrefetch = prefetch.data() + noPrefetch.size();
int numMoved = 0;
int i = 0; // index into noPrefetch for read
int j = 0; // index into oldPrefetch
int k = 0; // index into prefetch
int l = 0; // index into noPrefetch for write
for (auto& end : ends) {
prefetch[k++] = oldPrefetch[j++];
while (j < end) {
auto coalesceStart = getRegionEnd(oldPrefetch[j - 1]);
auto coalesceEnd = getRegionOffset(oldPrefetch[j]);
while (i < noPrefetch.size() &&
getRegionOffset(noPrefetch[i]) < coalesceStart) {
noPrefetch[l++] = noPrefetch[i++];
}
while (i < noPrefetch.size() &&
getRegionEnd(noPrefetch[i]) <= coalesceEnd) {
prefetch[k++] = noPrefetch[i++];
++numMoved;
}
prefetch[k++] = oldPrefetch[j++];
}
end += numMoved;
}
while (i < noPrefetch.size()) {
noPrefetch[l++] = noPrefetch[i++];
}
VELOX_CHECK_EQ(k, numOldPrefetch + numMoved);
prefetch.resize(k);
VELOX_CHECK_EQ(l + numMoved, noPrefetch.size());
noPrefetch.resize(l);
}

const std::shared_ptr<ReadFileInputStream> input_;
memory::MemoryPool* const pool_;

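For reference, a worked example of the new helper's contract, as a hedged sketch only: it assumes `moveCoalesced` is protected (it sits next to the protected `input_`/`pool_` members above) and therefore reachable through a test-only subclass; `BufferedInputTestPeer` and `Region` are hypothetical names, not part of this diff.

```cpp
#include <cstdint>
#include <utility>
#include <vector>

#include "velox/dwio/common/BufferedInput.h"

// Hypothetical test-only peer: exposes the protected static helper.
struct BufferedInputTestPeer : facebook::velox::dwio::common::BufferedInput {
  using BufferedInput::moveCoalesced;
};

int main() {
  using Region = std::pair<uint64_t, uint64_t>; // {offset, end}

  // Two prefetch groups: {[0,100), [150,250)} coalesced into one IO, and
  // [400,500) on its own. `ends` holds the exclusive end index of each group.
  std::vector<Region> prefetch = {{0, 100}, {150, 250}, {400, 500}};
  std::vector<int32_t> ends = {2, 3};

  // Ad-hoc requests, sorted by offset. [110,140) lies inside the coalesced
  // gap [100,150); [300,320) is not covered by any group.
  std::vector<Region> noPrefetch = {{110, 140}, {300, 320}};

  BufferedInputTestPeer::moveCoalesced(
      prefetch,
      ends,
      noPrefetch,
      [](const Region& r) { return r.first; },
      [](const Region& r) { return r.second; });

  // Afterwards:
  //   prefetch   == {{0,100}, {110,140}, {150,250}, {400,500}}
  //   ends       == {3, 4}        (the first group grew by the moved request)
  //   noPrefetch == {{300,320}}   (still needs its own ad-hoc IO)
  return 0;
}
```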
158 changes: 100 additions & 58 deletions velox/dwio/common/CachedBufferedInput.cpp
@@ -100,6 +100,7 @@ bool CachedBufferedInput::shouldPreload(int32_t numPages) {
}

namespace {

bool isPrefetchPct(int32_t pct) {
return pct >= FLAGS_cache_prefetch_min_pct;
}
@@ -139,6 +140,26 @@ std::vector<CacheRequest*> makeRequestParts(
return parts;
}

template <bool kSsd>
uint64_t getOffset(const CacheRequest& request) {
if constexpr (kSsd) {
VELOX_DCHECK(!request.ssdPin.empty());
return request.ssdPin.run().offset();
} else {
return request.key.offset;
}
}

template <bool kSsd>
std::pair<uint64_t, uint64_t> toRegion(const CacheRequest& request) {
return std::make_pair(getOffset<kSsd>(request), request.size);
}

template <bool kSsd>
bool lessThan(const CacheRequest* left, const CacheRequest* right) {
return toRegion<kSsd>(*left) < toRegion<kSsd>(*right);
}

} // namespace

void CachedBufferedInput::load(const LogType /*unused*/) {
@@ -187,42 +208,48 @@ void CachedBufferedInput::load(const LogType /*unused*/) {
}
}

makeLoads(std::move(storageLoad[1]), true);
makeLoads(std::move(ssdLoad[1]), true);
makeLoads(std::move(storageLoad[0]), false);
makeLoads(std::move(ssdLoad[0]), false);
std::sort(storageLoad[0].begin(), storageLoad[0].end(), lessThan<false>);
std::sort(storageLoad[1].begin(), storageLoad[1].end(), lessThan<false>);
std::sort(ssdLoad[0].begin(), ssdLoad[0].end(), lessThan<true>);
std::sort(ssdLoad[1].begin(), ssdLoad[1].end(), lessThan<true>);
makeLoads<false>(storageLoad);
makeLoads<true>(ssdLoad);
}

void CachedBufferedInput::makeLoads(
std::vector<CacheRequest*> requests,
bool prefetch) {
template <bool kSsd>
void CachedBufferedInput::makeLoads(std::vector<CacheRequest*> requests[2]) {
std::vector<int32_t> groupEnds[2];
groupEnds[1] = groupRequests<kSsd>(requests[1], true);
moveCoalesced(
requests[1],
groupEnds[1],
requests[0],
[](auto* request) { return getOffset<kSsd>(*request); },
[](auto* request) { return getOffset<kSsd>(*request) + request->size; });
groupEnds[0] = groupRequests<kSsd>(requests[0], false);
readRegions(requests[1], true, groupEnds[1]);
readRegions(requests[0], false, groupEnds[0]);
}

template <bool kSsd>
std::vector<int32_t> CachedBufferedInput::groupRequests(
const std::vector<CacheRequest*>& requests,
bool prefetch) const {
if (requests.empty() || (requests.size() < 2 && !prefetch)) {
return;
return {};
}
const bool isSsd = !requests[0]->ssdPin.empty();
const int32_t maxDistance = isSsd ? 20000 : options_.maxCoalesceDistance();
std::sort(
requests.begin(),
requests.end(),
[&](const CacheRequest* left, const CacheRequest* right) {
if (isSsd) {
return left->ssdPin.run().offset() < right->ssdPin.run().offset();
} else {
return left->key.offset < right->key.offset;
}
});
const int32_t maxDistance = kSsd ? 20000 : options_.maxCoalesceDistance();

// Combine adjacent short reads.
int32_t numNewLoads = 0;
int64_t coalescedBytes = 0;
coalesceIo<CacheRequest*, CacheRequest*>(
std::vector<int32_t> ends;
ends.reserve(requests.size());
std::vector<char> ranges;
coalesceIo<CacheRequest*, char>(
requests,
maxDistance,
std::numeric_limits<int32_t>::max(),
[&](int32_t index) {
return isSsd ? requests[index]->ssdPin.run().offset()
: requests[index]->key.offset;
},
[&](int32_t index) { return getOffset<kSsd>(*requests[index]); },
[&](int32_t index) {
const auto size = requests[index]->size;
coalescedBytes += size;
@@ -235,41 +262,16 @@ void CachedBufferedInput::makeLoads(
}
return requests[index]->coalesces ? 1 : kNoCoalesce;
},
[&](CacheRequest* request, std::vector<CacheRequest*>& ranges) {
ranges.push_back(request);
[&](CacheRequest* /*request*/, std::vector<char>& ranges) {
ranges.push_back(0);
},
[&](int32_t /*gap*/, std::vector<CacheRequest*> /*ranges*/) { /*no op*/ },
[&](int32_t /*gap*/, std::vector<char> /*ranges*/) { /*no op*/ },
[&](const std::vector<CacheRequest*>& /*requests*/,
int32_t /*begin*/,
int32_t /*end*/,
int32_t end,
uint64_t /*offset*/,
const std::vector<CacheRequest*>& ranges) {
++numNewLoads;
readRegion(ranges, prefetch);
});

if (prefetch && (executor_ != nullptr)) {
std::vector<int32_t> doneIndices;
for (auto i = 0; i < allCoalescedLoads_.size(); ++i) {
auto& load = allCoalescedLoads_[i];
if (load->state() == CoalescedLoad::State::kPlanned) {
executor_->add(
[pendingLoad = load, ssdSavable = !options_.noCacheRetention()]() {
process::TraceContext trace("Read Ahead");
pendingLoad->loadOrFuture(nullptr, ssdSavable);
});
} else {
doneIndices.push_back(i);
}
}

// Remove the loads that were complete. There can be done loads if the same
// CachedBufferedInput has multiple cycles of enqueues and loads.
for (int32_t i = doneIndices.size() - 1; i >= 0; --i) {
assert(!doneIndices.empty()); // lint
allCoalescedLoads_.erase(allCoalescedLoads_.begin() + doneIndices[i]);
}
}
const std::vector<char>& /*ranges*/) { ends.push_back(end); });
return ends;
}

namespace {
@@ -469,6 +471,46 @@ void CachedBufferedInput::readRegion(
});
}

void CachedBufferedInput::readRegions(
const std::vector<CacheRequest*>& requests,
bool prefetch,
const std::vector<int32_t>& groupEnds) {
int i = 0;
std::vector<CacheRequest*> group;
for (auto end : groupEnds) {
while (i < end) {
group.push_back(requests[i++]);
}
readRegion(group, prefetch);
group.clear();
}
if (prefetch && executor_) {
std::vector<int32_t> doneIndices;
for (auto i = 0; i < allCoalescedLoads_.size(); ++i) {
auto& load = allCoalescedLoads_[i];
if (load->state() == CoalescedLoad::State::kPlanned) {
executor_->add(
[pendingLoad = load, ssdSavable = !options_.noCacheRetention()]() {
process::TraceContext trace("Read Ahead");
pendingLoad->loadOrFuture(nullptr, ssdSavable);
});
} else {
doneIndices.push_back(i);
}
}
// Remove the loads that were complete. There can be done loads if the same
// CachedBufferedInput has multiple cycles of enqueues and loads.
for (int i = 0, j = 0, k = 0; i < allCoalescedLoads_.size(); ++i) {
if (j < doneIndices.size() && doneIndices[j] == i) {
++j;
} else {
allCoalescedLoads_[k++] = std::move(allCoalescedLoads_[i]);
}
}
allCoalescedLoads_.resize(allCoalescedLoads_.size() - doneIndices.size());
}
}

std::shared_ptr<cache::CoalescedLoad> CachedBufferedInput::coalescedLoad(
const SeekableInputStream* stream) {
return coalescedLoads_.withWLock(
@@ -478,7 +520,7 @@ std::shared_ptr<cache::CoalescedLoad> CachedBufferedInput::coalescedLoad(
return nullptr;
}
auto load = std::move(it->second);
auto* dwioLoad = dynamic_cast<DwioCoalescedLoadBase*>(load.get());
auto* dwioLoad = static_cast<DwioCoalescedLoadBase*>(load.get());
for (auto& request : dwioLoad->requests()) {
loads.erase(request.stream);
}
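One more note on the restructuring: the background-prefetch scheduling that used to live in `makeLoads` now lives in `readRegions`, and the cleanup of completed loads switched from repeated `vector::erase` calls to a single-pass compaction over sorted indices. A generic sketch of that pattern (the `eraseAtSortedIndices` helper is illustrative, not part of the diff):

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Remove the elements at the sorted, unique positions in `doneIndices` from
// `items` in one pass, mirroring the compaction loop in readRegions.
template <typename T>
void eraseAtSortedIndices(std::vector<T>& items, const std::vector<int>& doneIndices) {
  std::size_t j = 0; // next index to drop
  std::size_t k = 0; // write cursor
  for (std::size_t i = 0; i < items.size(); ++i) {
    if (j < doneIndices.size() && doneIndices[j] == static_cast<int>(i)) {
      ++j; // skip (drop) items[i]
    } else {
      items[k++] = std::move(items[i]);
    }
  }
  items.resize(k);
}
```

This keeps the cleanup linear in the number of loads rather than quadratic when many loads complete between enqueue/load cycles.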
15 changes: 12 additions & 3 deletions velox/dwio/common/CachedBufferedInput.h
@@ -169,16 +169,25 @@ class CachedBufferedInput : public BufferedInput {
}

private:
// Sorts requests and makes CoalescedLoads for nearby requests. If 'prefetch'
// is true, starts background loading.
void makeLoads(std::vector<CacheRequest*> requests, bool prefetch);
template <bool kSsd>
std::vector<int32_t> groupRequests(
const std::vector<CacheRequest*>& requests,
bool prefetch) const;

// Makes a CoalescedLoad for 'requests' to be read together, coalescing IO as
// appropriate. If 'prefetch' is set, schedules the CoalescedLoad on
// 'executor_'. Links the CoalescedLoad to all CacheInputStreams that it
// concerns.
void readRegion(const std::vector<CacheRequest*>& requests, bool prefetch);

void readRegions(
const std::vector<CacheRequest*>& requests,
bool prefetch,
const std::vector<int32_t>& groupEnds);

template <bool kSsd>
void makeLoads(std::vector<CacheRequest*> requests[2]);

// We only support up to 8MB load quantum size on SSD and there is no need for
// larger SSD read size performance wise.
void checkLoadQuantum() {
(Diffs for the remaining changed files did not load.)

0 comments on commit 132eaf9
