Add fuzzer for async data cache (facebookincubator#10244)
Summary:
Introduce a basic fuzzer for the async data cache. Each iteration involves:
1. Creating a set of data files of varying sizes.
2. Setting up the async data cache with an SSD using a specified configuration.
3. Performing parallel random reads from these data files.

In this initial setup, most of the parameters are defined as gflags; we'll decide later which of them should be fuzzed during the tests.

Pull Request resolved: facebookincubator#10244

Differential Revision: D58715904

Pulled By: zacw7
zacw7 authored and facebook-github-bot committed Jun 19, 2024
1 parent c6d7390 commit 7aeb378
Showing 7 changed files with 515 additions and 0 deletions.
36 changes: 36 additions & 0 deletions velox/docs/develop/testing/async-data-cache-fuzzer.rst
@@ -0,0 +1,36 @@
=======================
Async Data Cache Fuzzer
=======================

The async data cache fuzzer is designed to test the correctness and performance
of the async data cache in Velox.

During each iteration, the fuzzer performs the following steps:

1. Creating a set of data files of varying sizes.
2. Setting up the async data cache with an SSD using a specified configuration.
3. Performing parallel random reads from these data files.

How to run
----------

Use the velox_async_data_cache_fuzzer_test binary to run the async data cache fuzzer:

::

    velox/exec/tests/velox_async_data_cache_fuzzer_test

By default, the fuzzer runs 10 iterations. Use the ``--steps``
or ``--duration_sec`` flag to run the fuzzer for longer. Use ``--seed`` to
reproduce fuzzer failures.
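
For example, the following command runs 100 iterations with a fixed seed
(the flag values here are only illustrative)::

    velox/exec/tests/velox_async_data_cache_fuzzer_test --steps=100 --seed=123456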

Here is a full list of supported command line arguments.

* ``--steps``: How many iterations to run. Each iteration creates a set of
  data files, sets up the cache, and performs parallel random reads. Default
  is 10.

* ``--duration_sec``: For how long to run, in seconds. If both ``--steps``
  and ``--duration_sec`` are specified, ``--duration_sec`` takes precedence.

* ``--seed``: The seed used to generate the random file sizes and read
  patterns.

If running from the CLion IDE, add ``--logtostderr=1`` to see the full output.
354 changes: 354 additions & 0 deletions velox/exec/fuzzer/AsyncDataCacheFuzzer.cpp
@@ -0,0 +1,354 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/fuzzer/AsyncDataCacheFuzzer.h"

#include <boost/random/uniform_int_distribution.hpp>

#include <folly/executors/IOThreadPoolExecutor.h>
#include "velox/common/caching/FileIds.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/dwio/common/CachedBufferedInput.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"

DEFINE_int32(steps, 10, "Number of iterations to run.");

DEFINE_int32(
    duration_sec,
    0,
    "For how long it should run (in seconds). If zero, "
    "it executes exactly --steps iterations and exits.");

DEFINE_int32(
    max_num_reads,
    100,
    "Max number of reads to be performed per thread.");

DEFINE_int32(num_threads, 16, "Number of threads to read.");

DEFINE_int32(num_files, 8, "Number of data files to be created.");

DEFINE_uint64(
    offset_interval_bytes,
    8 << 20,
    "The offset bytes to be aligned at for cache reads.");

DEFINE_uint64(
    min_file_bytes,
    32 << 20,
    "Minimum file size in bytes of the data files to be created.");

DEFINE_uint64(
    max_file_bytes,
    64 << 20,
    "Maximum file size in bytes of the data files to be created.");

DEFINE_int32(num_files_in_group, 3, "Number of files to be grouped together.");

DEFINE_int64(memory_cache_bytes, 16 << 20, "Memory cache size in bytes.");

DEFINE_uint64(ssd_cache_bytes, 128 << 20, "SSD cache size in bytes.");

DEFINE_int32(num_shards, 4, "Number of shards of SSD cache.");

DEFINE_uint64(
    ssd_checkpoint_interval_bytes,
    64 << 20,
    "Checkpoint after every 'ssd_checkpoint_interval_bytes'/'num_shards', "
    "written into each file. 0 means no checkpointing.");

DEFINE_bool(enable_checksum, true, "Enable checksum write to SSD.");

DEFINE_bool(
    enable_read_verification,
    true,
    "Enable checksum read verification from SSD.");

using namespace facebook::velox::cache;
using namespace facebook::velox::dwio::common;

namespace facebook::velox::exec::test {
namespace {

class AsyncDataCacheFuzzer {
 public:
  explicit AsyncDataCacheFuzzer(size_t seed);

  void go();

 private:
  void seed(size_t seed) {
    currentSeed_ = seed;
    rng_.seed(currentSeed_);
  }

  void reSeed() {
    seed(rng_());
  }

  folly::IOThreadPoolExecutor* executor() {
    static std::mutex mutex;
    std::lock_guard<std::mutex> l(mutex);
    if (!executor_) {
      // The pool has up to 20 threads and 16 are used for reading, so
      // there are threads left over for the SSD background writes.
      executor_ = std::make_unique<folly::IOThreadPoolExecutor>(20);
    }
    return executor_.get();
  }

  void initializeDataFiles();

  void initializeCache();

  void initializeInputs();

  void readInParallel();

  void reset();

  uint64_t read(uint32_t fileIdx, uint64_t offset, uint64_t length);

  FuzzerGenerator rng_;
  size_t currentSeed_{0};
  std::shared_ptr<memory::MemoryPool> rootPool_{
      memory::memoryManager()->addRootPool()};
  std::shared_ptr<memory::MemoryPool> pool_{rootPool_->addLeafChild("leaf")};
  std::vector<StringIdLease> fileIds_;
  std::vector<size_t> fileSizes_;
  std::vector<StringIdLease> groupIds_;
  std::vector<std::shared_ptr<LocalReadFile>> readFiles_;
  std::vector<std::unique_ptr<CachedBufferedInput>> inputs_;
  std::shared_ptr<exec::test::TempDirectoryPath> tempDirectory_;
  std::unique_ptr<memory::MemoryManager> manager_;
  memory::MemoryAllocator* allocator_{};
  std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
  std::shared_ptr<AsyncDataCache> cache_;
};

template <typename T>
bool isDone(size_t i, T startTime) {
  if (FLAGS_duration_sec > 0) {
    const std::chrono::duration<double> elapsed =
        std::chrono::system_clock::now() - startTime;
    return elapsed.count() >= FLAGS_duration_sec;
  }
  return i >= FLAGS_steps;
}

// Calls 'func' on 'numThreads' threads in parallel.
template <typename Func>
void runThreads(int32_t numThreads, Func func) {
  std::vector<std::thread> threads;
  threads.reserve(numThreads);
  for (int32_t i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([i, func]() { func(i); }));
  }
  for (auto& thread : threads) {
    thread.join();
  }
}

AsyncDataCacheFuzzer::AsyncDataCacheFuzzer(size_t initialSeed) {
  seed(initialSeed);
}

void AsyncDataCacheFuzzer::initializeDataFiles() {
  tempDirectory_ = exec::test::TempDirectoryPath::create();

  // Create files with random sizes.
  if (fileIds_.empty()) {
    for (auto i = 0; i < FLAGS_num_files; ++i) {
      auto fileName = fmt::format("{}/file_{}", tempDirectory_->getPath(), i);

      const size_t fileSize = boost::random::uniform_int_distribution<uint64_t>(
          FLAGS_min_file_bytes, FLAGS_max_file_bytes)(rng_);
      auto writeFile = LocalWriteFile(fileName);
      size_t writtenSize = 0;
      int32_t offset = 0;
      while (writtenSize < fileSize) {
        size_t chunkSize = std::min(
            fileSize - writtenSize,
            size_t(4 << 20)); // Write in chunks of 4MB.
        auto buffer = folly::IOBuf::create(chunkSize);
        buffer->append(chunkSize);
        // Fill buffer with data.
        std::generate_n(
            buffer->writableData(), chunkSize, [&offset]() -> uint8_t {
              return static_cast<uint8_t>(offset++ % 256);
            });
        writeFile.append(std::move(buffer));
        writtenSize += chunkSize;
      }
      writeFile.close();

      fileIds_.emplace_back(fileIds(), fileName);
      readFiles_.emplace_back(std::make_shared<LocalReadFile>(fileName));
      fileSizes_.emplace_back(fileSize);
    }
  }
}

void AsyncDataCacheFuzzer::initializeCache() {
  std::unique_ptr<SsdCache> ssdCache;
  if (FLAGS_ssd_cache_bytes > 0) {
    SsdCache::Config config(
        fmt::format("{}/cache", tempDirectory_->getPath()),
        FLAGS_ssd_cache_bytes,
        FLAGS_num_shards,
        executor(),
        FLAGS_ssd_checkpoint_interval_bytes,
        false,
        FLAGS_enable_checksum,
        FLAGS_enable_read_verification);
    ssdCache = std::make_unique<SsdCache>(config);
  }

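  // The in-memory cache is backed by an MmapAllocator sized to
  // --memory_cache_bytes; the arbitrator is given the same capacity.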
  memory::MemoryManagerOptions options;
  options.useMmapAllocator = true;
  options.allocatorCapacity = FLAGS_memory_cache_bytes;
  options.arbitratorCapacity = FLAGS_memory_cache_bytes;
  options.arbitratorReservedCapacity = 0;
  options.trackDefaultUsage = true;
  manager_ = std::make_unique<memory::MemoryManager>(options);
  allocator_ = dynamic_cast<memory::MmapAllocator*>(manager_->allocator());

  // TODO: Test different ssd write behaviors with AsyncDataCache::Options.
  cache_ = AsyncDataCache::create(allocator_, std::move(ssdCache), {});
}

void AsyncDataCacheFuzzer::initializeInputs() {
  auto readOptions = io::ReaderOptions(pool_.get());
  auto tracker = std::make_shared<ScanTracker>(
      "testTracker", nullptr, io::ReaderOptions::kDefaultLoadQuantum);
  auto ioStats = std::make_shared<IoStatistics>();
  inputs_.reserve(FLAGS_num_files);
  for (auto i = 0; i < FLAGS_num_files; ++i) {
    groupIds_.emplace_back(
        fileIds(), fmt::format("group{}", i / FLAGS_num_files_in_group));
    inputs_.emplace_back(std::make_unique<CachedBufferedInput>(
        readFiles_[i], // NOLINT
        MetricsLog::voidLog(),
        fileIds_[i].id(), // NOLINT
        cache_.get(),
        tracker,
        groupIds_[i].id(),
        ioStats,
        executor_.get(),
        readOptions));
  }
}

void AsyncDataCacheFuzzer::readInParallel() {
  runThreads(FLAGS_num_threads, [&](int32_t /*i*/) {
    auto numReads = boost::random::uniform_int_distribution<uint32_t>(
        10, FLAGS_max_num_reads)(rng_);
    while (numReads-- > 0) {
      const auto fileIdx = boost::random::uniform_int_distribution<uint32_t>(
          0, FLAGS_num_files - 1)(rng_);
      // Align the read offset at N * FLAGS_offset_interval_bytes.
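      // For example, with the default 8MB interval and a 40MB file, the
      // offset is one of 0, 8MB, 16MB, 24MB, 32MB or 40MB; a 40MB offset
      // produces a zero-length read, which is skipped below.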
      const auto offset =
          boost::random::uniform_int_distribution<uint64_t>(
              0, fileSizes_[fileIdx] / FLAGS_offset_interval_bytes)(rng_) *
          FLAGS_offset_interval_bytes;
      const auto length = boost::random::uniform_int_distribution<uint64_t>(
          0, fileSizes_[fileIdx] - offset)(rng_);
      if (length > 0) {
        read(fileIdx, offset, length);
      }
    }
  });
}

void AsyncDataCacheFuzzer::reset() {
  cache_->shutdown();
  cache_->ssdCache()->testingWaitForWriteToFinish();
  executor_->join();
  executor_.reset();
  fileIds_.clear();
  fileSizes_.clear();
  groupIds_.clear();
  readFiles_.clear();
  inputs_.clear();
  cache_.reset();
  manager_.reset();
  tempDirectory_.reset();
  fileIds().testingReset();
}

uint64_t
AsyncDataCacheFuzzer::read(uint32_t fileIdx, uint64_t offset, uint64_t length) {
  // TODO: Fault injection.
  auto stream = inputs_[fileIdx]->read(offset, length, LogType::TEST); // NOLINT
  const void* buffer;
  int32_t size;
  uint64_t numRead = 0;
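  // Drain the stream: each Next() call yields the next chunk of the read
  // range until 'length' bytes have been consumed or the stream ends.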
  while (numRead < length) {
    if (stream->Next(&buffer, &size)) {
      numRead += size;
    } else {
      break;
    }
  }
  return numRead;
}

void AsyncDataCacheFuzzer::go() {
  VELOX_CHECK(
      FLAGS_steps > 0 || FLAGS_duration_sec > 0,
      "Either --steps or --duration_sec needs to be greater than zero.");

  auto startTime = std::chrono::system_clock::now();
  size_t iteration = 0;

  while (!isDone(iteration, startTime)) {
    LOG(INFO) << "==============================> Started iteration "
              << iteration << " (seed: " << currentSeed_ << ")";

    initializeDataFiles();

    initializeCache();

    initializeInputs();

    readInParallel();

    // TODO: Test cache restart.

    LOG(INFO) << cache_->refreshStats().toString();

    reset();

    LOG(INFO) << "==============================> Done with iteration "
              << iteration;

    reSeed();
    ++iteration;
  }
}

} // namespace

void asyncDataCacheFuzzer(size_t seed) {
  auto asyncDataCacheFuzzer = AsyncDataCacheFuzzer(seed);
  asyncDataCacheFuzzer.go();
}
} // namespace facebook::velox::exec::test