Add fuzzer for async data cache
Introduce a basic fuzzer for the async data cache. Each iteration
involves:
1. Creating a set of data files of varying sizes.
2. Setting up the async data cache with an SSD using a specified
configuration.
3. Performing parallel random reads from these data files.
In this initial setup, most of the parameters are defined as gflags; we'll
decide later which of them should be fuzzed during the tests.
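For example, a run might override a few of these gflags, say --steps=20 or
--num_threads=32 (the values here are illustrative; the flags themselves are
defined in AsyncDataCacheFuzzer.cpp below).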
zacw7 committed Jun 18, 2024
1 parent 718c8e9 commit 6d762ea
Showing 7 changed files with 480 additions and 0 deletions.
353 changes: 353 additions & 0 deletions velox/exec/fuzzer/AsyncDataCacheFuzzer.cpp
@@ -0,0 +1,353 @@
/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "velox/exec/fuzzer/AsyncDataCacheFuzzer.h"

#include <boost/random/uniform_int_distribution.hpp>

#include <folly/executors/IOThreadPoolExecutor.h>
#include "velox/common/caching/FileIds.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/dwio/common/CachedBufferedInput.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"

DEFINE_int32(steps, 10, "Number of iterations to run.");

DEFINE_int32(
    duration_sec,
    0,
    "For how long it should run (in seconds). If zero, "
    "it executes exactly --steps iterations and exits.");

DEFINE_int32(
    max_num_reads,
    100,
    "Max number of reads to be performed per thread.");

DEFINE_int32(num_threads, 16, "Number of threads to read.");

DEFINE_int32(num_files, 8, "Number of data files to be created.");

DEFINE_uint64(
    offset_interval_bytes,
    8 << 20,
    "The offset bytes to be aligned at for cache reads.");

DEFINE_uint64(
    min_file_bytes,
    32 << 20,
    "Minimum file size in bytes of the data files to be created.");

DEFINE_uint64(
    max_file_bytes,
    64 << 20,
    "Maximum file size in bytes of the data files to be created.");

DEFINE_int32(num_files_in_group, 3, "Number of files to be grouped together.");

DEFINE_int64(memory_cache_bytes, 16 << 20, "Memory cache size in bytes.");

DEFINE_uint64(ssd_cache_bytes, 128 << 20, "SSD cache size in bytes.");

DEFINE_int32(num_shards, 4, "Number of shards of the SSD cache.");

DEFINE_uint64(
    ssd_checkpoint_interval_bytes,
    64 << 20,
    "Checkpoint after every 'ssd_checkpoint_interval_bytes'/'num_shards' "
    "written into each file. 0 means no checkpointing.");

DEFINE_bool(enable_checksum, true, "Enable checksum write to SSD.");

DEFINE_bool(
    enable_read_verification,
    true,
    "Enable checksum read verification from SSD.");

using namespace facebook::velox::cache;
using namespace facebook::velox::dwio::common;

namespace facebook::velox::exec::test {
namespace {

class AsyncDataCacheFuzzer {
 public:
  explicit AsyncDataCacheFuzzer(size_t seed);

  void go();

 private:
  void seed(size_t seed) {
    currentSeed_ = seed;
    rng_.seed(currentSeed_);
  }

  void reSeed() {
    seed(rng_());
  }

  folly::IOThreadPoolExecutor* executor() {
    static std::mutex mutex;
    std::lock_guard<std::mutex> l(mutex);
    if (!executor_) {
      // We have up to 20 threads; 16 are used for reading, so some are left
      // over for SSD background writes.
      executor_ = std::make_unique<folly::IOThreadPoolExecutor>(20);
    }
    return executor_.get();
  }

  void initializeDataFiles();

  void initializeCache();

  void initializeInputs();

  void readInParallel();

  void reset();

  uint64_t read(uint32_t fileIdx, uint64_t offset, uint64_t length);

  FuzzerGenerator rng_;
  size_t currentSeed_{0};
  std::shared_ptr<memory::MemoryPool> rootPool_{
      memory::memoryManager()->addRootPool()};
  std::shared_ptr<memory::MemoryPool> pool_{rootPool_->addLeafChild("leaf")};
  std::vector<StringIdLease> fileIds_;
  std::vector<size_t> fileSizes_;
  std::vector<StringIdLease> groupIds_;
  std::vector<std::shared_ptr<LocalReadFile>> readFiles_;
  std::vector<std::unique_ptr<CachedBufferedInput>> inputs_;
  std::shared_ptr<exec::test::TempDirectoryPath> tempDirectory_;
  std::unique_ptr<memory::MemoryManager> manager_;
  memory::MemoryAllocator* allocator_{};
  std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
  std::shared_ptr<AsyncDataCache> cache_;
};

template <typename T>
bool isDone(size_t i, T startTime) {
  if (FLAGS_duration_sec > 0) {
    const std::chrono::duration<double> elapsed =
        std::chrono::system_clock::now() - startTime;
    return elapsed.count() >= FLAGS_duration_sec;
  }
  return i >= FLAGS_steps;
}

// Calls 'func' on 'numThreads' threads in parallel.
template <typename Func>
void runThreads(int32_t numThreads, Func func) {
  std::vector<std::thread> threads;
  threads.reserve(numThreads);
  for (int32_t i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([i, func]() { func(i); }));
  }
  for (auto& thread : threads) {
    thread.join();
  }
}

AsyncDataCacheFuzzer::AsyncDataCacheFuzzer(size_t initialSeed) {
  seed(initialSeed);
}

void AsyncDataCacheFuzzer::initializeDataFiles() {
  tempDirectory_ = exec::test::TempDirectoryPath::create();

  // Create files with random sizes, populated with random data.
  if (fileIds_.empty()) {
    for (auto i = 0; i < FLAGS_num_files; ++i) {
      auto fileName = fmt::format("{}/file_{}", tempDirectory_->getPath(), i);

      const size_t fileSize = boost::random::uniform_int_distribution<uint64_t>(
          FLAGS_min_file_bytes, FLAGS_max_file_bytes)(rng_);
      auto writeFile = LocalWriteFile(fileName);
      size_t writtenSize = 0;
      while (writtenSize < fileSize) {
        // Write in chunks of 4MB.
        const size_t chunkSize =
            std::min(fileSize - writtenSize, size_t(4 * 1024 * 1024));
        auto buffer = folly::IOBuf::create(chunkSize);
        buffer->append(chunkSize);
        // Fill the buffer with random data.
        std::generate_n(buffer->writableData(), chunkSize, []() -> uint8_t {
          // TODO: fill files with verifiable data.
          return static_cast<uint8_t>(rand() % 256);
        });
        writeFile.append(std::move(buffer));
        writtenSize += chunkSize;
      }
      writeFile.close();

      fileIds_.emplace_back(fileIds(), fileName);
      readFiles_.emplace_back(std::make_shared<LocalReadFile>(fileName));
      fileSizes_.emplace_back(fileSize);
    }
  }
}

void AsyncDataCacheFuzzer::initializeCache() {
  std::unique_ptr<SsdCache> ssdCache;
  if (FLAGS_ssd_cache_bytes > 0) {
    SsdCache::Config config(
        fmt::format("{}/cache", tempDirectory_->getPath()),
        FLAGS_ssd_cache_bytes,
        FLAGS_num_shards,
        executor(),
        FLAGS_ssd_checkpoint_interval_bytes,
        false,
        FLAGS_enable_checksum,
        FLAGS_enable_read_verification);
    ssdCache = std::make_unique<SsdCache>(config);
  }

  // The in-memory cache allocates from the MmapAllocator, so the allocator
  // capacity doubles as the memory cache size.
  memory::MemoryManagerOptions options;
  options.useMmapAllocator = true;
  options.allocatorCapacity = FLAGS_memory_cache_bytes;
  options.arbitratorCapacity = FLAGS_memory_cache_bytes;
  options.arbitratorReservedCapacity = 0;
  options.trackDefaultUsage = true;
  manager_ = std::make_unique<memory::MemoryManager>(options);
  allocator_ = dynamic_cast<memory::MmapAllocator*>(manager_->allocator());

  // TODO: Test different ssd write behaviors with AsyncDataCache::Options.
  cache_ = AsyncDataCache::create(allocator_, std::move(ssdCache), {});
}

void AsyncDataCacheFuzzer::initializeInputs() {
  auto readOptions = io::ReaderOptions(pool_.get());
  auto tracker = std::make_shared<ScanTracker>(
      "testTracker", nullptr, io::ReaderOptions::kDefaultLoadQuantum);
  auto ioStats = std::make_shared<IoStatistics>();
  inputs_.reserve(FLAGS_num_files);
  for (auto i = 0; i < FLAGS_num_files; ++i) {
    groupIds_.emplace_back(
        fileIds(), fmt::format("group{}", i / FLAGS_num_files_in_group));
    inputs_.emplace_back(std::make_unique<CachedBufferedInput>(
        readFiles_[i],
        MetricsLog::voidLog(),
        fileIds_[i].id(),
        cache_.get(),
        tracker,
        groupIds_[i].id(),
        ioStats,
        executor_.get(),
        readOptions));
  }
}

void AsyncDataCacheFuzzer::readInParallel() {
  runThreads(FLAGS_num_threads, [&](int32_t /*i*/) {
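    // Note: rng_ is shared by all reader threads without synchronization,
    // so the exact read sequences are not reproducible from the seed alone.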
    auto numReads = boost::random::uniform_int_distribution<uint32_t>(
        10, FLAGS_max_num_reads)(rng_);
    while (numReads-- > 0) {
      const auto fileIdx = boost::random::uniform_int_distribution<uint32_t>(
          0, FLAGS_num_files - 1)(rng_);
      // Align the read offset at N * FLAGS_offset_interval_bytes. For
      // example, with an 8MB interval and a 40MB file, offsets are drawn
      // from {0, 8MB, 16MB, 24MB, 32MB, 40MB}.
      const auto offset =
          boost::random::uniform_int_distribution<uint64_t>(
              0, fileSizes_[fileIdx] / FLAGS_offset_interval_bytes)(rng_) *
          FLAGS_offset_interval_bytes;
      const auto length = boost::random::uniform_int_distribution<uint64_t>(
          0, fileSizes_[fileIdx] - offset)(rng_);
      if (length > 0) {
        read(fileIdx, offset, length);
      }
    }
  });
}

void AsyncDataCacheFuzzer::reset() {
  cache_->shutdown();
  // ssdCache() is null when --ssd_cache_bytes is 0.
  if (cache_->ssdCache() != nullptr) {
    cache_->ssdCache()->testingWaitForWriteToFinish();
  }
  executor_->join();
  executor_.reset();
  fileIds_.clear();
  fileSizes_.clear();
  groupIds_.clear();
  readFiles_.clear();
  inputs_.clear();
  cache_.reset();
  manager_.reset();
  tempDirectory_.reset();
  fileIds().testingReset();
}

uint64_t
AsyncDataCacheFuzzer::read(uint32_t fileIdx, uint64_t offset, uint64_t length) {
  // TODO: Test data corruption.
  auto stream = inputs_[fileIdx]->read(offset, length, LogType::TEST);
  const void* buffer;
  int32_t size;
  uint64_t numRead = 0;
  while (numRead < length) {
    if (stream->Next(&buffer, &size)) {
      numRead += size;
    } else {
      break;
    }
  }
  return numRead;
}

void AsyncDataCacheFuzzer::go() {
  VELOX_CHECK(
      FLAGS_steps > 0 || FLAGS_duration_sec > 0,
      "Either --steps or --duration_sec needs to be greater than zero.");

  auto startTime = std::chrono::system_clock::now();
  size_t iteration = 0;

  while (!isDone(iteration, startTime)) {
    LOG(INFO) << "==============================> Started iteration "
              << iteration << " (seed: " << currentSeed_ << ")";

    initializeDataFiles();

    initializeCache();

    initializeInputs();

    readInParallel();

    // TODO: Test cache restart.

    LOG(INFO) << cache_->refreshStats().toString();

    reset();

    LOG(INFO) << "==============================> Done with iteration "
              << iteration;

    reSeed();
    ++iteration;
  }
}

} // namespace

void asyncDataCacheFuzzer(size_t seed) {
  auto asyncDataCacheFuzzer = AsyncDataCacheFuzzer(seed);
  asyncDataCacheFuzzer.go();
}
} // namespace facebook::velox::exec::test
25 changes: 25 additions & 0 deletions velox/exec/fuzzer/AsyncDataCacheFuzzer.h
@@ -0,0 +1,25 @@
/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <cstddef>

namespace facebook::velox::exec::test {
/// Runs the async data cache fuzzer.
/// @param seed Random seed; pass the same seed for reproducibility.
void asyncDataCacheFuzzer(size_t seed);
} // namespace facebook::velox::exec::test
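
The five remaining changed files are not rendered above; presumably they
include a runner target and build rules. A minimal sketch of what a runner
for this entry point might look like, assuming a hypothetical FLAGS_seed flag
and the usual folly/Velox initialization (none of this is confirmed by the
visible diff):

// Hypothetical runner sketch; FLAGS_seed, the initialization calls, and the
// file itself are assumptions, not part of the rendered diff.
#include <gflags/gflags.h>
#include <folly/init/Init.h>

#include <random>

#include "velox/common/memory/Memory.h"
#include "velox/exec/fuzzer/AsyncDataCacheFuzzer.h"

DEFINE_int64(seed, 0, "Random seed; 0 picks one from std::random_device.");

int main(int argc, char** argv) {
  folly::Init init{&argc, &argv};
  // The fuzzer's rootPool_ member uses the process-wide memory manager.
  facebook::velox::memory::MemoryManager::initialize({});
  const size_t seed = FLAGS_seed == 0
      ? static_cast<size_t>(std::random_device{}())
      : static_cast<size_t>(FLAGS_seed);
  facebook::velox::exec::test::asyncDataCacheFuzzer(seed);
  return 0;
}

Such a binary would then be driven by the gflags defined in
AsyncDataCacheFuzzer.cpp, e.g. --steps, --duration_sec, and --num_threads.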
