Skip to content

Commit

Permalink
SSD cache code cleanup (facebookincubator#9724)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: facebookincubator#9724

Reviewed By: zacw7

Differential Revision: D57012132

Pulled By: xiaoxmeng

fbshipit-source-id: 390ec69d19206a7dac4aabc915db4b3b140076f4
Loading branch information
xiaoxmeng authored and facebook-github-bot committed May 7, 2024
1 parent 5911129 commit 668d578
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 144 deletions.
6 changes: 3 additions & 3 deletions velox/common/caching/AsyncDataCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,9 @@ void CacheShard::updateStats(CacheStats& stats) {

void CacheShard::appendSsdSaveable(std::vector<CachePin>& pins) {
std::lock_guard<std::mutex> l(mutex_);
// Do not add more than 70% of entries to a write batch.If SSD save
// is slower than storage read, we must not have a situation where
// SSD save pins everything and stops reading.
// Do not add more than 70% of entries to a write batch. If SSD save is slower
// than storage read, we must not have a situation where SSD save pins
// everything and stops reading.
const int32_t limit = (entries_.size() * 100) / 70;
VELOX_CHECK(cache_->ssdCache()->writeInProgress());
for (auto& entry : entries_) {
Expand Down
29 changes: 16 additions & 13 deletions velox/common/caching/SsdCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ SsdCache::SsdCache(
files_.reserve(numShards_);
// Cache size must be a multiple of this so that each shard has the same max
// size.
uint64_t sizeQuantum = numShards_ * SsdFile::kRegionSize;
int32_t fileMaxRegions = bits::roundUp(maxBytes, sizeQuantum) / sizeQuantum;
const uint64_t sizeQuantum = numShards_ * SsdFile::kRegionSize;
const int32_t fileMaxRegions =
bits::roundUp(maxBytes, sizeQuantum) / sizeQuantum;
for (auto i = 0; i < numShards_; ++i) {
files_.push_back(std::make_unique<SsdFile>(
fmt::format("{}{}", filePrefix_, i),
Expand All @@ -80,11 +81,12 @@ bool SsdCache::startWrite() {
}
// There were writes in progress, so compensate for the increment.
writesInProgress_.fetch_sub(numShards_);
VELOX_CHECK_GE(writesInProgress_, 0);
return false;
}

void SsdCache::write(std::vector<CachePin> pins) {
VELOX_CHECK_LE(numShards_, writesInProgress_);
VELOX_CHECK_GE(numShards_, writesInProgress_);

TestValue::adjust("facebook::velox::cache::SsdCache::write", this);

Expand All @@ -104,15 +106,16 @@ void SsdCache::write(std::vector<CachePin> pins) {
++numNoStore;
continue;
}

struct PinHolder {
std::vector<CachePin> pins;

explicit PinHolder(std::vector<CachePin>&& _pins)
: pins(std::move(_pins)) {}
};

// We move the mutable vector of pins to the executor. These must
// be wrapped in a shared struct to be passed via lambda capture.
// We move the mutable vector of pins to the executor. These must be wrapped
// in a shared struct to be passed via lambda capture.
auto pinHolder = std::make_shared<PinHolder>(std::move(shards[i]));
executor_->add([this, i, pinHolder, bytes, startTimeUs]() {
try {
Expand All @@ -128,8 +131,8 @@ void SsdCache::write(std::vector<CachePin> pins) {
// Typically occurs every few GB. Allows detecting unusually slow rates
// from failing devices.
VELOX_SSD_CACHE_LOG(INFO) << fmt::format(
"Wrote {}MB, {} MB/s",
bytes >> 20,
"Wrote {}, {} bytes/s",
succinctBytes(bytes),
static_cast<float>(bytes) / (getCurrentTimeMicro() - startTimeUs));
}
});
Expand Down Expand Up @@ -175,13 +178,13 @@ void SsdCache::clear() {
}

std::string SsdCache::toString() const {
auto data = stats();
uint64_t capacity = maxBytes();
const auto data = stats();
const uint64_t capacity = maxBytes();
std::stringstream out;
out << "Ssd cache IO: Write " << (data.bytesWritten >> 20) << "MB read "
<< (data.bytesRead >> 20) << "MB Size " << (capacity >> 30)
<< "GB Occupied " << (data.bytesCached >> 30) << "GB";
out << (data.entriesCached >> 10) << "K entries.";
out << "Ssd cache IO: Write " << succinctBytes(data.bytesWritten) << " read "
<< succinctBytes(data.bytesRead) << " Size " << succinctBytes(capacity)
<< " Occupied " << succinctBytes(data.bytesCached);
out << " " << (data.entriesCached >> 10) << "K entries.";
out << "\nGroupStats: " << groupStats_->toString(capacity);
return out.str();
}
Expand Down
20 changes: 8 additions & 12 deletions velox/common/caching/SsdCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,9 @@ class SsdCache {
/// next multiple of kRegionSize * 'numShards'. This means that all the shards
/// have an equal number of regions. For 2 shards and 200MB size, the size
/// rounds up to 256M with 2 shards each of 128M (2 regions).
/// If 'checkpointIntervalBytes' is non-0, the cache makes a durable
/// If 'checkpointIntervalBytes' is non-zero, the cache makes a durable
/// checkpointed state that survives restart after each
/// 'checkpointIntervalBytes' written.
/// If 'setNoCowFlagForSsdFiles' is true, the cache sets 'no copy on write'
/// flag to each file. This prevents the cache to go over the 'maxBytes',
/// eventually use up all disk space and stop working. Should be set to true
/// for file systems supporting COW (like brtfs).
/// If 'disableFileCow' is true, the cache disables the file COW (copy on
/// write) feature if the underlying filesystem (such as brtfs) supports it.
/// This prevents the actual cache space usage on disk from exceeding the
Expand Down Expand Up @@ -76,7 +72,7 @@ class SsdCache {
/// have returned true.
void write(std::vector<CachePin> pins);

/// Remove cached entries from all SsdFiles for files in the fileNum set
/// Removes cached entries from all SsdFiles for files in the fileNum set
/// 'filesToRemove'. If successful, return true, and 'filesRetained' contains
/// entries that should not be removed, ex., from pinned regions. Otherwise,
/// return false and 'filesRetained' could be ignored.
Expand Down Expand Up @@ -112,15 +108,15 @@ class SsdCache {
private:
const std::string filePrefix_;
const int32_t numShards_;
// Stats for selecting entries to save from AsyncDataCache.
const std::unique_ptr<FileGroupStats> groupStats_;
folly::Executor* const executor_;

std::vector<std::unique_ptr<SsdFile>> files_;

// Count of shards with unfinished writes.
std::atomic<int32_t> writesInProgress_{0};

// Stats for selecting entries to save from AsyncDataCache.
std::unique_ptr<FileGroupStats> groupStats_;
folly::Executor* executor_;
std::atomic<bool> isShutdown_{false};
std::atomic_int32_t writesInProgress_{0};
std::atomic_bool isShutdown_{false};
};

} // namespace facebook::velox::cache
130 changes: 66 additions & 64 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ SsdFile::SsdFile(
oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0;
#endif // linux
fd_ = open(fileName_.c_str(), O_CREAT | O_RDWR | oDirect, S_IRUSR | S_IWUSR);
if (FOLLY_UNLIKELY(fd_ < 0)) {
if (fd_ < 0) {
++stats_.openFileErrors;
}
// TODO: add fault tolerant handling for open file errors.
Expand All @@ -153,23 +153,20 @@ SsdFile::SsdFile(
}

readFile_ = std::make_unique<LocalReadFile>(fd_);
uint64_t size = lseek(fd_, 0, SEEK_END);
numRegions_ = size / kRegionSize;
if (numRegions_ > maxRegions_) {
numRegions_ = maxRegions_;
}
const uint64_t size = lseek(fd_, 0, SEEK_END);
numRegions_ = std::min<int32_t>(size / kRegionSize, maxRegions_);
fileSize_ = numRegions_ * kRegionSize;
if (size % kRegionSize > 0 || size > numRegions_ * kRegionSize) {
ftruncate(fd_, fileSize_);
if ((size % kRegionSize > 0) || (size > numRegions_ * kRegionSize)) {
::ftruncate(fd_, fileSize_);
}
// The existing regions in the file are writable.
writableRegions_.resize(numRegions_);
std::iota(writableRegions_.begin(), writableRegions_.end(), 0);
tracker_.resize(maxRegions_);
regionSizes_.resize(maxRegions_);
erasedRegionSizes_.resize(maxRegions_);
regionPins_.resize(maxRegions_);
if (checkpointIntervalBytes_) {
regionSizes_.resize(maxRegions_, 0);
erasedRegionSizes_.resize(maxRegions_, 0);
regionPins_.resize(maxRegions_, 0);
if (checkpointIntervalBytes_ > 0) {
initializeCheckpoint();
}
}
Expand Down Expand Up @@ -330,7 +327,7 @@ bool SsdFile::growOrEvictLocked() {
<< newSize;
}

auto candidates =
const auto candidates =
tracker_.findEvictionCandidates(3, numRegions_, regionPins_);
if (candidates.empty()) {
suspended_ = true;
Expand Down Expand Up @@ -417,6 +414,7 @@ void SsdFile::write(std::vector<CachePin>& pins) {
std::lock_guard<std::shared_mutex> l(mutex_);
for (auto i = storeIndex; i < storeIndex + numWritten; ++i) {
auto* entry = pins[i].checkedEntry();
VELOX_CHECK_NULL(entry->ssdFile());
entry->setSsdFile(this, offset);
const auto size = entry->size();
FileCacheKey key = {
Expand Down Expand Up @@ -467,11 +465,9 @@ void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) {
for (auto i = 0; i < data.numRuns(); ++i) {
const auto run = data.runAt(i);
const auto compareSize = std::min<int64_t>(bytesLeft, run.numBytes());
auto badIndex = indexOfFirstMismatch(
const auto badIndex = indexOfFirstMismatch(
run.data<char>(), testData.get() + offset, compareSize);
if (badIndex != -1) {
VELOX_FAIL("Bad read back");
}
VELOX_CHECK_EQ(badIndex, -1, "Bad read back");
bytesLeft -= run.numBytes();
offset += run.numBytes();
if (bytesLeft <= 0) {
Expand Down Expand Up @@ -570,15 +566,15 @@ bool SsdFile::removeFileEntries(
continue;
}

entriesAgedOut++;
++entriesAgedOut;
erasedRegionSizes_[region] += ssdRun.size();

it = entries_.erase(it);
}

std::vector<int32_t> toFree;
toFree.reserve(numRegions_);
for (auto region = 0; region < numRegions_; region++) {
for (auto region = 0; region < numRegions_; ++region) {
if (erasedRegionSizes_[region] >
regionSizes_[region] * kMaxErasedSizePct / 100) {
toFree.push_back(region);
Expand Down Expand Up @@ -638,7 +634,7 @@ void SsdFile::deleteCheckpoint(bool keepLog) {
if ((logRc != 0) || (checkpointRc != 0)) {
++stats_.deleteCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< "Error in deleting log and checkpoint. log: " << logRc
<< "Error in deleting log and checkpoint. log: " << logRc
<< " checkpoint: " << checkpointRc;
}
}
Expand Down Expand Up @@ -686,48 +682,6 @@ void SsdFile::checkpoint(bool force) {
return rc;
};

std::ofstream state;
auto checkpointPath = fileName_ + kCheckpointExtension;
state.exceptions(std::ofstream::failbit);
state.open(checkpointPath, std::ios_base::out | std::ios_base::trunc);
// The checkpoint state file contains:
// int32_t The 4 bytes of kCheckpointMagic,
// int32_t maxRegions,
// int32_t numRegions,
// regionScores from the 'tracker_',
// {fileId, fileName} pairs,
// kMapMarker,
// {fileId, offset, SSdRun} triples,
// kEndMarker.
state.write(kCheckpointMagic, sizeof(int32_t));
state.write(asChar(&maxRegions_), sizeof(maxRegions_));
state.write(asChar(&numRegions_), sizeof(numRegions_));

// Copy the region scores before writing out for tsan.
const auto scoresCopy = tracker_.copyScores();
state.write(asChar(scoresCopy.data()), maxRegions_ * sizeof(uint64_t));
std::unordered_set<uint64_t> fileNums;
for (const auto& entry : entries_) {
const auto fileNum = entry.first.fileNum.id();
if (fileNums.insert(fileNum).second) {
state.write(asChar(&fileNum), sizeof(fileNum));
const auto name = fileIds().string(fileNum);
const int32_t length = name.size();
state.write(asChar(&length), sizeof(length));
state.write(name.data(), length);
}
}

const auto mapMarker = kCheckpointMapMarker;
state.write(asChar(&mapMarker), sizeof(mapMarker));
for (auto& pair : entries_) {
auto id = pair.first.fileNum.id();
state.write(asChar(&id), sizeof(id));
state.write(asChar(&pair.first.offset), sizeof(pair.first.offset));
auto offsetAndSize = pair.second.bits();
state.write(asChar(&offsetAndSize), sizeof(offsetAndSize));
}

// We schedule the potentially long fsync of the cache file on another
// thread of the cache write executor, if available. If there is none, we do
// the sync on this thread at the end.
Expand All @@ -737,6 +691,53 @@ void SsdFile::checkpoint(bool force) {
executor_->add([fileSync]() { fileSync->prepare(); });
}

std::ofstream state;
const auto checkpointPath = fileName_ + kCheckpointExtension;
try {
state.exceptions(std::ofstream::failbit);
state.open(checkpointPath, std::ios_base::out | std::ios_base::trunc);
// The checkpoint state file contains:
// int32_t The 4 bytes of kCheckpointMagic,
// int32_t maxRegions,
// int32_t numRegions,
// regionScores from the 'tracker_',
// {fileId, fileName} pairs,
// kMapMarker,
// {fileId, offset, SSdRun} triples,
// kEndMarker.
state.write(kCheckpointMagic, sizeof(int32_t));
state.write(asChar(&maxRegions_), sizeof(maxRegions_));
state.write(asChar(&numRegions_), sizeof(numRegions_));

// Copy the region scores before writing out for tsan.
const auto scoresCopy = tracker_.copyScores();
state.write(asChar(scoresCopy.data()), maxRegions_ * sizeof(uint64_t));
std::unordered_set<uint64_t> fileNums;
for (const auto& entry : entries_) {
const auto fileNum = entry.first.fileNum.id();
if (fileNums.insert(fileNum).second) {
state.write(asChar(&fileNum), sizeof(fileNum));
const auto name = fileIds().string(fileNum);
const int32_t length = name.size();
state.write(asChar(&length), sizeof(length));
state.write(name.data(), length);
}
}

const auto mapMarker = kCheckpointMapMarker;
state.write(asChar(&mapMarker), sizeof(mapMarker));
for (auto& pair : entries_) {
auto id = pair.first.fileNum.id();
state.write(asChar(&id), sizeof(id));
state.write(asChar(&pair.first.offset), sizeof(pair.first.offset));
auto offsetAndSize = pair.second.bits();
state.write(asChar(&offsetAndSize), sizeof(offsetAndSize));
}
} catch (const std::exception& e) {
fileSync->close();
std::rethrow_exception(std::current_exception());
}

// NOTE: we need to ensure cache file data sync update completes before
// updating checkpoint file.
const auto fileSyncRc = fileSync->move();
Expand Down Expand Up @@ -785,6 +786,7 @@ void SsdFile::initializeCheckpoint() {
if (checkpointIntervalBytes_ == 0) {
return;
}

bool hasCheckpoint = true;
std::ifstream state(fileName_ + kCheckpointExtension);
if (!state.is_open()) {
Expand Down Expand Up @@ -854,7 +856,7 @@ T readNumber(std::ifstream& stream) {
void SsdFile::readCheckpoint(std::ifstream& state) {
char magic[4];
state.read(magic, sizeof(magic));
VELOX_CHECK_EQ(strncmp(magic, kCheckpointMagic, 4), 0);
VELOX_CHECK_EQ(::strncmp(magic, kCheckpointMagic, 4), 0);
const auto maxRegions = readNumber<int32_t>(state);
VELOX_CHECK_EQ(
maxRegions,
Expand All @@ -865,7 +867,7 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
state.read(asChar(scores.data()), maxRegions_ * sizeof(uint64_t));
std::unordered_map<uint64_t, StringIdLease> idMap;
for (;;) {
auto id = readNumber<uint64_t>(state);
const auto id = readNumber<uint64_t>(state);
if (id == kCheckpointMapMarker) {
break;
}
Expand Down
Loading

0 comments on commit 668d578

Please sign in to comment.