From 132eaf921ba37205d7c1640039fba9dba98cdd77 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Fri, 3 Jan 2025 14:21:36 -0800 Subject: [PATCH] fix: Avoid redownload coalesced region gap twice in buffered inputs Summary: In the current `DirectBufferedInput` and `CachedBufferedInput`, when we coalesce some regions during prefetch, the gap between regions are not marked as available, and if they are needed in adhoc streams, we would issue separate IOs for them, resulting in unnecessary IOs. This change fixes this by rearranging the regions and loads generation logic, first generate and group the prefetch regions, then we can compare the coalesced regions to the adhoc regions and move those already coalesced to the prefetch loads, reducing the number of adhoc IOs. Differential Revision: D67810195 --- velox/dwio/common/BufferedInput.h | 46 +++++ velox/dwio/common/CachedBufferedInput.cpp | 158 +++++++++++------- velox/dwio/common/CachedBufferedInput.h | 15 +- velox/dwio/common/DirectBufferedInput.cpp | 95 +++++++---- velox/dwio/common/DirectBufferedInput.h | 13 +- .../dwrf/test/DirectBufferedInputTest.cpp | 8 +- 6 files changed, 235 insertions(+), 100 deletions(-) diff --git a/velox/dwio/common/BufferedInput.h b/velox/dwio/common/BufferedInput.h index 2a1f1eba9826..6118b9300a5a 100644 --- a/velox/dwio/common/BufferedInput.h +++ b/velox/dwio/common/BufferedInput.h @@ -167,6 +167,52 @@ class BufferedInput { return pct; } + // Move the requests in `noPrefetch' to `prefetch' if it is already covered by + // coalescing in `prefetch'. + template + static void moveCoalesced( + std::vector& prefetch, + std::vector& ends, + std::vector& noPrefetch, + GetRegionOffset getRegionOffset, + GetRegionEnd getRegionEnd) { + auto numOldPrefetch = prefetch.size(); + prefetch.resize(prefetch.size() + noPrefetch.size()); + std::copy_backward( + prefetch.data(), prefetch.data() + numOldPrefetch, prefetch.end()); + auto* oldPrefetch = prefetch.data() + noPrefetch.size(); + int numMoved = 0; + int i = 0; // index into noPrefetch for read + int j = 0; // index into oldPrefetch + int k = 0; // index into prefetch + int l = 0; // index into noPrefetch for write + for (auto& end : ends) { + prefetch[k++] = oldPrefetch[j++]; + while (j < end) { + auto coalesceStart = getRegionEnd(oldPrefetch[j - 1]); + auto coalesceEnd = getRegionOffset(oldPrefetch[j]); + while (i < noPrefetch.size() && + getRegionOffset(noPrefetch[i]) < coalesceStart) { + noPrefetch[l++] = noPrefetch[i++]; + } + while (i < noPrefetch.size() && + getRegionEnd(noPrefetch[i]) <= coalesceEnd) { + prefetch[k++] = noPrefetch[i++]; + ++numMoved; + } + prefetch[k++] = oldPrefetch[j++]; + } + end += numMoved; + } + while (i < noPrefetch.size()) { + noPrefetch[l++] = noPrefetch[i++]; + } + VELOX_CHECK_EQ(k, numOldPrefetch + numMoved); + prefetch.resize(k); + VELOX_CHECK_EQ(l + numMoved, noPrefetch.size()); + noPrefetch.resize(l); + } + const std::shared_ptr input_; memory::MemoryPool* const pool_; diff --git a/velox/dwio/common/CachedBufferedInput.cpp b/velox/dwio/common/CachedBufferedInput.cpp index e8c5b2ab7ea9..6e357c6882fb 100644 --- a/velox/dwio/common/CachedBufferedInput.cpp +++ b/velox/dwio/common/CachedBufferedInput.cpp @@ -100,6 +100,7 @@ bool CachedBufferedInput::shouldPreload(int32_t numPages) { } namespace { + bool isPrefetchPct(int32_t pct) { return pct >= FLAGS_cache_prefetch_min_pct; } @@ -139,6 +140,26 @@ std::vector makeRequestParts( return parts; } +template +uint64_t getOffset(const CacheRequest& request) { + if constexpr (kSsd) { + VELOX_DCHECK(!request.ssdPin.empty()); + return request.ssdPin.run().offset(); + } else { + return request.key.offset; + } +} + +template +std::pair toRegion(const CacheRequest& request) { + return std::make_pair(getOffset(request), request.size); +} + +template +bool lessThan(const CacheRequest* left, const CacheRequest* right) { + return toRegion(*left) < toRegion(*right); +} + } // namespace void CachedBufferedInput::load(const LogType /*unused*/) { @@ -187,42 +208,48 @@ void CachedBufferedInput::load(const LogType /*unused*/) { } } - makeLoads(std::move(storageLoad[1]), true); - makeLoads(std::move(ssdLoad[1]), true); - makeLoads(std::move(storageLoad[0]), false); - makeLoads(std::move(ssdLoad[0]), false); + std::sort(storageLoad[0].begin(), storageLoad[0].end(), lessThan); + std::sort(storageLoad[1].begin(), storageLoad[1].end(), lessThan); + std::sort(ssdLoad[0].begin(), ssdLoad[0].end(), lessThan); + std::sort(ssdLoad[1].begin(), ssdLoad[1].end(), lessThan); + makeLoads(storageLoad); + makeLoads(ssdLoad); } -void CachedBufferedInput::makeLoads( - std::vector requests, - bool prefetch) { +template +void CachedBufferedInput::makeLoads(std::vector requests[2]) { + std::vector groupEnds[2]; + groupEnds[1] = groupRequests(requests[1], true); + moveCoalesced( + requests[1], + groupEnds[1], + requests[0], + [](auto* request) { return getOffset(*request); }, + [](auto* request) { return getOffset(*request) + request->size; }); + groupEnds[0] = groupRequests(requests[0], false); + readRegions(requests[1], true, groupEnds[1]); + readRegions(requests[0], false, groupEnds[0]); +} + +template +std::vector CachedBufferedInput::groupRequests( + const std::vector& requests, + bool prefetch) const { if (requests.empty() || (requests.size() < 2 && !prefetch)) { - return; + return {}; } - const bool isSsd = !requests[0]->ssdPin.empty(); - const int32_t maxDistance = isSsd ? 20000 : options_.maxCoalesceDistance(); - std::sort( - requests.begin(), - requests.end(), - [&](const CacheRequest* left, const CacheRequest* right) { - if (isSsd) { - return left->ssdPin.run().offset() < right->ssdPin.run().offset(); - } else { - return left->key.offset < right->key.offset; - } - }); + const int32_t maxDistance = kSsd ? 20000 : options_.maxCoalesceDistance(); // Combine adjacent short reads. - int32_t numNewLoads = 0; int64_t coalescedBytes = 0; - coalesceIo( + std::vector ends; + ends.reserve(requests.size()); + std::vector ranges; + coalesceIo( requests, maxDistance, std::numeric_limits::max(), - [&](int32_t index) { - return isSsd ? requests[index]->ssdPin.run().offset() - : requests[index]->key.offset; - }, + [&](int32_t index) { return getOffset(*requests[index]); }, [&](int32_t index) { const auto size = requests[index]->size; coalescedBytes += size; @@ -235,41 +262,16 @@ void CachedBufferedInput::makeLoads( } return requests[index]->coalesces ? 1 : kNoCoalesce; }, - [&](CacheRequest* request, std::vector& ranges) { - ranges.push_back(request); + [&](CacheRequest* /*request*/, std::vector& ranges) { + ranges.push_back(0); }, - [&](int32_t /*gap*/, std::vector /*ranges*/) { /*no op*/ }, + [&](int32_t /*gap*/, std::vector /*ranges*/) { /*no op*/ }, [&](const std::vector& /*requests*/, int32_t /*begin*/, - int32_t /*end*/, + int32_t end, uint64_t /*offset*/, - const std::vector& ranges) { - ++numNewLoads; - readRegion(ranges, prefetch); - }); - - if (prefetch && (executor_ != nullptr)) { - std::vector doneIndices; - for (auto i = 0; i < allCoalescedLoads_.size(); ++i) { - auto& load = allCoalescedLoads_[i]; - if (load->state() == CoalescedLoad::State::kPlanned) { - executor_->add( - [pendingLoad = load, ssdSavable = !options_.noCacheRetention()]() { - process::TraceContext trace("Read Ahead"); - pendingLoad->loadOrFuture(nullptr, ssdSavable); - }); - } else { - doneIndices.push_back(i); - } - } - - // Remove the loads that were complete. There can be done loads if the same - // CachedBufferedInput has multiple cycles of enqueues and loads. - for (int32_t i = doneIndices.size() - 1; i >= 0; --i) { - assert(!doneIndices.empty()); // lint - allCoalescedLoads_.erase(allCoalescedLoads_.begin() + doneIndices[i]); - } - } + const std::vector& /*ranges*/) { ends.push_back(end); }); + return ends; } namespace { @@ -469,6 +471,46 @@ void CachedBufferedInput::readRegion( }); } +void CachedBufferedInput::readRegions( + const std::vector& requests, + bool prefetch, + const std::vector& groupEnds) { + int i = 0; + std::vector group; + for (auto end : groupEnds) { + while (i < end) { + group.push_back(requests[i++]); + } + readRegion(group, prefetch); + group.clear(); + } + if (prefetch && executor_) { + std::vector doneIndices; + for (auto i = 0; i < allCoalescedLoads_.size(); ++i) { + auto& load = allCoalescedLoads_[i]; + if (load->state() == CoalescedLoad::State::kPlanned) { + executor_->add( + [pendingLoad = load, ssdSavable = !options_.noCacheRetention()]() { + process::TraceContext trace("Read Ahead"); + pendingLoad->loadOrFuture(nullptr, ssdSavable); + }); + } else { + doneIndices.push_back(i); + } + } + // Remove the loads that were complete. There can be done loads if the same + // CachedBufferedInput has multiple cycles of enqueues and loads. + for (int i = 0, j = 0, k = 0; i < allCoalescedLoads_.size(); ++i) { + if (j < doneIndices.size() && doneIndices[j] == i) { + ++j; + } else { + allCoalescedLoads_[k++] = std::move(allCoalescedLoads_[i]); + } + } + allCoalescedLoads_.resize(allCoalescedLoads_.size() - doneIndices.size()); + } +} + std::shared_ptr CachedBufferedInput::coalescedLoad( const SeekableInputStream* stream) { return coalescedLoads_.withWLock( @@ -478,7 +520,7 @@ std::shared_ptr CachedBufferedInput::coalescedLoad( return nullptr; } auto load = std::move(it->second); - auto* dwioLoad = dynamic_cast(load.get()); + auto* dwioLoad = static_cast(load.get()); for (auto& request : dwioLoad->requests()) { loads.erase(request.stream); } diff --git a/velox/dwio/common/CachedBufferedInput.h b/velox/dwio/common/CachedBufferedInput.h index 240ae8354bd8..c29251e930df 100644 --- a/velox/dwio/common/CachedBufferedInput.h +++ b/velox/dwio/common/CachedBufferedInput.h @@ -169,9 +169,10 @@ class CachedBufferedInput : public BufferedInput { } private: - // Sorts requests and makes CoalescedLoads for nearby requests. If 'prefetch' - // is true, starts background loading. - void makeLoads(std::vector requests, bool prefetch); + template + std::vector groupRequests( + const std::vector& requests, + bool prefetch) const; // Makes a CoalescedLoad for 'requests' to be read together, coalescing IO is // appropriate. If 'prefetch' is set, schedules the CoalescedLoad on @@ -179,6 +180,14 @@ class CachedBufferedInput : public BufferedInput { // concerns. void readRegion(const std::vector& requests, bool prefetch); + void readRegions( + const std::vector& requests, + bool prefetch, + const std::vector& groupEnds); + + template + void makeLoads(std::vector requests[2]); + // We only support up to 8MB load quantum size on SSD and there is no need for // larger SSD read size performance wise. void checkLoadQuantum() { diff --git a/velox/dwio/common/DirectBufferedInput.cpp b/velox/dwio/common/DirectBufferedInput.cpp index d580a14250ad..6d606fad059c 100644 --- a/velox/dwio/common/DirectBufferedInput.cpp +++ b/velox/dwio/common/DirectBufferedInput.cpp @@ -81,6 +81,10 @@ bool isPrefetchablePct(int32_t pct) { return pct >= FLAGS_cache_prefetch_min_pct; } +bool lessThan(const LoadRequest* left, const LoadRequest* right) { + return left->region < right->region; +} + } // namespace void DirectBufferedInput::load(const LogType /*unused*/) { @@ -100,17 +104,30 @@ void DirectBufferedInput::load(const LogType /*unused*/) { : 0; storageLoad[loadIndex].push_back(&request); } - makeLoads(std::move(storageLoad[1]), true); - makeLoads(std::move(storageLoad[0]), false); + std::sort(storageLoad[1].begin(), storageLoad[1].end(), lessThan); + std::sort(storageLoad[0].begin(), storageLoad[0].end(), lessThan); + std::vector groupEnds[2]; + groupEnds[1] = groupRequests(storageLoad[1], true); + moveCoalesced( + storageLoad[1], + groupEnds[1], + storageLoad[0], + [](auto* request) { return request->region.offset; }, + [](auto* request) { + return request->region.offset + request->region.length; + }); + groupEnds[0] = groupRequests(storageLoad[0], false); + readRegions(storageLoad[1], true, groupEnds[1]); + readRegions(storageLoad[0], false, groupEnds[0]); } -void DirectBufferedInput::makeLoads( - std::vector requests, - bool prefetch) { +std::vector DirectBufferedInput::groupRequests( + const std::vector& requests, + bool prefetch) const { if (requests.empty() || (requests.size() < 2 && !prefetch)) { // A single request has no other requests to coalesce with and is not // eligible to prefetch. This will be loaded by itself on first use. - return; + return {}; } const int32_t maxDistance = options_.maxCoalesceDistance(); const auto loadQuantum = options_.loadQuantum(); @@ -119,17 +136,13 @@ void DirectBufferedInput::makeLoads( // is correlated. const auto maxCoalesceBytes = prefetch ? options_.maxCoalesceBytes() : loadQuantum; - std::sort( - requests.begin(), - requests.end(), - [&](const LoadRequest* left, const LoadRequest* right) { - return left->region.offset < right->region.offset; - }); // Combine adjacent short reads. - int32_t numNewLoads = 0; int64_t coalescedBytes = 0; - coalesceIo( + std::vector ends; + ends.reserve(requests.size()); + std::vector ranges; + coalesceIo( requests, maxDistance, // Break batches up. Better load more short ones i parallel. @@ -151,33 +164,21 @@ void DirectBufferedInput::makeLoads( } return 1; }, - [&](LoadRequest* request, std::vector& ranges) { - ranges.push_back(request); + [&](LoadRequest* /*request*/, std::vector& ranges) { + // ranges.size() is used in coalesceIo so we cannot leave it empty. + ranges.push_back(0); }, - [&](int32_t /*gap*/, std::vector /*ranges*/) { /*no op*/ }, + [&](int32_t /*gap*/, std::vector /*ranges*/) { /*no op*/ }, [&](const std::vector& /*requests*/, int32_t /*begin*/, - int32_t /*end*/, + int32_t end, uint64_t /*offset*/, - const std::vector& ranges) { - ++numNewLoads; - readRegion(ranges, prefetch); - }); - if (prefetch && executor_) { - for (auto i = 0; i < coalescedLoads_.size(); ++i) { - auto& load = coalescedLoads_[i]; - if (load->state() == CoalescedLoad::State::kPlanned) { - executor_->add([pendingLoad = load]() { - process::TraceContext trace("Read Ahead"); - pendingLoad->loadOrFuture(nullptr); - }); - } - } - } + const std::vector& /*ranges*/) { ends.push_back(end); }); + return ends; } void DirectBufferedInput::readRegion( - std::vector requests, + const std::vector& requests, bool prefetch) { if (requests.empty() || (requests.size() == 1 && !prefetch)) { return; @@ -192,6 +193,32 @@ void DirectBufferedInput::readRegion( }); } +void DirectBufferedInput::readRegions( + const std::vector& requests, + bool prefetch, + const std::vector& groupEnds) { + int i = 0; + std::vector group; + for (auto end : groupEnds) { + while (i < end) { + group.push_back(requests[i++]); + } + readRegion(group, prefetch); + group.clear(); + } + if (prefetch && executor_) { + for (auto i = 0; i < coalescedLoads_.size(); ++i) { + auto& load = coalescedLoads_[i]; + if (load->state() == CoalescedLoad::State::kPlanned) { + executor_->add([pendingLoad = load]() { + process::TraceContext trace("Read Ahead"); + pendingLoad->loadOrFuture(nullptr); + }); + } + } + } +} + std::shared_ptr DirectBufferedInput::coalescedLoad( const SeekableInputStream* stream) { return streamToCoalescedLoad_.withWLock( diff --git a/velox/dwio/common/DirectBufferedInput.h b/velox/dwio/common/DirectBufferedInput.h index 734b3bfeb026..97559bc397f7 100644 --- a/velox/dwio/common/DirectBufferedInput.h +++ b/velox/dwio/common/DirectBufferedInput.h @@ -204,15 +204,20 @@ class DirectBufferedInput : public BufferedInput { fileSize_(input_->getLength()), options_(readerOptions) {} - // Sorts requests and makes CoalescedLoads for nearby requests. If 'prefetch' - // is true, starts background loading. - void makeLoads(std::vector requests, bool prefetch); + std::vector groupRequests( + const std::vector& requests, + bool prefetch) const; // Makes a CoalescedLoad for 'requests' to be read together, coalescing IO if // appropriate. If 'prefetch' is set, schedules the CoalescedLoad on // 'executor_'. Links the CoalescedLoad to all DirectInputStreams that it // covers. - void readRegion(std::vector requests, bool prefetch); + void readRegion(const std::vector& requests, bool prefetch); + + void readRegions( + const std::vector& requests, + bool prefetch, + const std::vector& groupEnds); const uint64_t fileNum_; const std::shared_ptr tracker_; diff --git a/velox/dwio/dwrf/test/DirectBufferedInputTest.cpp b/velox/dwio/dwrf/test/DirectBufferedInputTest.cpp index 842dd85cf2d6..20554176cf93 100644 --- a/velox/dwio/dwrf/test/DirectBufferedInputTest.cpp +++ b/velox/dwio/dwrf/test/DirectBufferedInputTest.cpp @@ -39,6 +39,7 @@ using IoStatisticsPtr = std::shared_ptr; struct TestRegion { int32_t offset; int32_t length; + bool read = true; }; class DirectBufferedInputTest : public testing::Test { @@ -92,7 +93,7 @@ class DirectBufferedInputTest : public testing::Test { } input->load(LogType::FILE); for (auto i = 0; i < regions.size(); ++i) { - if (regions[i].length > 0) { + if (regions[i].read && regions[i].length > 0) { checkRead(streams[i].get(), regions[i]); } } @@ -186,3 +187,8 @@ TEST_F(DirectBufferedInputTest, basic) { // in one part. testLoads({{1000, 9000000}, {9010000, 1000000}}, 3); } + +TEST_F(DirectBufferedInputTest, noRedownloadCoalescedPrefetch) { + testLoads({{100, 100}, {201, 1, false}, {202, 100}}, 1); + testLoads({{100, 100}, {201, 1, true}, {202, 100}}, 1); +}