From d26cb1df9cd4d3f86146224e228cc0798229da10 Mon Sep 17 00:00:00 2001 From: Zac Wen Date: Wed, 26 Jun 2024 22:45:13 -0700 Subject: [PATCH] Add fuzzer for async data cache (#10244) Summary: Introduce a basic fuzzer for the async data cache. Each iteration involves: 1. Creating a set of data files of varying sizes. 2. Setting up the async data cache with an SSD using a specified configuration. 3. Performing parallel random reads from these data files. In the initial setup, most of the parameters are defined as gflags and we'll decide later which parameters should be fuzzed during the tests. Pull Request resolved: https://github.com/facebookincubator/velox/pull/10244 Reviewed By: xiaoxmeng Differential Revision: D58715904 Pulled By: zacw7 fbshipit-source-id: d93f9ba49aa3c7f925074e79a5bfeb4f3e84c8a1 --- velox/common/caching/AsyncDataCache.cpp | 20 +- .../testing/async-data-cache-fuzzer.rst | 63 +++ velox/exec/fuzzer/CMakeLists.txt | 5 + velox/exec/fuzzer/CacheFuzzer.cpp | 364 ++++++++++++++++++ velox/exec/fuzzer/CacheFuzzer.h | 24 ++ velox/exec/fuzzer/CacheFuzzerRunner.h | 45 +++ velox/exec/tests/CMakeLists.txt | 6 + velox/exec/tests/CacheFuzzerTest.cpp | 44 +++ 8 files changed, 564 insertions(+), 7 deletions(-) create mode 100644 velox/docs/develop/testing/async-data-cache-fuzzer.rst create mode 100644 velox/exec/fuzzer/CacheFuzzer.cpp create mode 100644 velox/exec/fuzzer/CacheFuzzer.h create mode 100644 velox/exec/fuzzer/CacheFuzzerRunner.h create mode 100644 velox/exec/tests/CacheFuzzerTest.cpp diff --git a/velox/common/caching/AsyncDataCache.cpp b/velox/common/caching/AsyncDataCache.cpp index a171029b7202..ce6e97b0aa27 100644 --- a/velox/common/caching/AsyncDataCache.cpp +++ b/velox/common/caching/AsyncDataCache.cpp @@ -25,6 +25,15 @@ #include "velox/common/base/SuccinctPrinter.h" #include "velox/common/caching/FileIds.h" +#define VELOX_CACHE_ERROR(errorMessage) \ + _VELOX_THROW( \ + ::facebook::velox::VeloxRuntimeError, \ + ::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \ + ::facebook::velox::error_code::kNoCacheSpace.c_str(), \ + /* isRetriable */ true, \ + "{}", \ + errorMessage); + namespace facebook::velox::cache { using memory::MachinePageCount; @@ -120,13 +129,10 @@ void AsyncDataCacheEntry::initialize(FileCacheKey key) { } else { // No memory to cover 'this'. release(); - _VELOX_THROW( - VeloxRuntimeError, - error_source::kErrorSourceRuntime.c_str(), - error_code::kNoCacheSpace.c_str(), - /* isRetriable */ true, - "Failed to allocate {} bytes for cache", - size_); + VELOX_CACHE_ERROR(fmt::format( + "Failed to allocate {} bytes for cache: {}", + size_, + cache->allocator()->getAndClearFailureMessage())); } } } diff --git a/velox/docs/develop/testing/async-data-cache-fuzzer.rst b/velox/docs/develop/testing/async-data-cache-fuzzer.rst new file mode 100644 index 000000000000..1259064724d5 --- /dev/null +++ b/velox/docs/develop/testing/async-data-cache-fuzzer.rst @@ -0,0 +1,63 @@ +============ +Cache Fuzzer +============ + +Cache fuzzer is designed to test the correctness and the reliability of the +in-memory async data cache and the durable local SSD cache, and their +interactions such as staging from async data cache to SSD cache, and load the +cache miss data from SSD cache into async data cache. + +During each iteration, the fuzzer performs the following actions steps by steps: +1. Creating a set of data files on local file system with varying sizes as source data files. +2. Setting up the async data cache with and without SSD using a specific configuration. +3. Performing parallel random reads from the source data files created in step1. + +How to run +---------- + +Use velox_cache_fuzzer_test binary to run cache fuzzer: + +:: + + velox/exec/tests/velox_cache_fuzzer_test + +By default, the fuzzer will go through 10 interations. Use --steps +or --duration-sec flag to run fuzzer for longer. Use --seed to +reproduce fuzzer failures. + +Here is a full list of supported command line arguments. + +* ``–-steps``: How many iterations to run. Each iteration generates and + evaluates one tale writer plan. Default is 10. + +* ``–-duration_sec``: For how long to run in seconds. If both ``-–steps`` + and ``-–duration_sec`` are specified, –duration_sec takes precedence. + +* ``–-seed``: The seed to generate random expressions and input vectors with. + +* ``–-num_threads``: Number of read threads. + +* ``–-read_iteration_sec``: For how long each read thread should run (in seconds). + +* ``–-num_source_files``: Number of data files to be created. + +* ``–-min_source_file_bytes``: Minimum source file size in bytes. + +* ``–-max_source_file_bytes``: Maximum source file size in bytes. + +* ``–-memory_cache_bytes``: Memory cache size in bytes. + +* ``–-ssd_cache_bytes``: Ssd cache size in bytes. + +* ``–-num_ssd_cache_shards``: Number of SSD cache shards. + +* ``–-ssd_checkpoint_interval_bytes``: Checkpoint after every + ``--ssd_checkpoint_interval_bytes``/``--num_ssd_cache_shards`` written into + each file. 0 means no checkpointing. + +* ``–-enable_checksum``: Enable checksum write to SSD. + +* ``–-enable_checksum_read_verification``: Enable checksum read verification + from SSD. + +If running from CLion IDE, add ``--logtostderr=1`` to see the full output. diff --git a/velox/exec/fuzzer/CMakeLists.txt b/velox/exec/fuzzer/CMakeLists.txt index d481f1076e82..64fa24bb6b5e 100644 --- a/velox/exec/fuzzer/CMakeLists.txt +++ b/velox/exec/fuzzer/CMakeLists.txt @@ -104,3 +104,8 @@ target_link_libraries( velox_expression_test_utility velox_functions_prestosql velox_aggregates) + +add_library(velox_cache_fuzzer CacheFuzzer.cpp) + +target_link_libraries(velox_cache_fuzzer velox_dwio_common velox_temp_path + velox_vector_test_lib) diff --git a/velox/exec/fuzzer/CacheFuzzer.cpp b/velox/exec/fuzzer/CacheFuzzer.cpp new file mode 100644 index 000000000000..fa7ab89b47f7 --- /dev/null +++ b/velox/exec/fuzzer/CacheFuzzer.cpp @@ -0,0 +1,364 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/fuzzer/CacheFuzzer.h" + +#include + +#include +#include +#include "velox/common/caching/FileIds.h" +#include "velox/common/caching/SsdCache.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/memory/MmapAllocator.h" +#include "velox/dwio/common/CachedBufferedInput.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +DEFINE_int32(steps, 10, "Number of plans to generate and test."); + +DEFINE_int32( + duration_sec, + 0, + "For how long it should run (in seconds). If zero, " + "it executes exactly --steps iterations and exits."); + +DEFINE_int32(num_threads, 16, "Number of read threads."); + +DEFINE_int32( + read_iteration_sec, + 10, + "For how long each read thread should run (in seconds)."); + +DEFINE_int32(num_source_files, 8, "Number of data files to be created."); + +DEFINE_uint64( + min_source_file_bytes, + 32 << 20, + "Minimum source file size in bytes."); + +DEFINE_uint64( + max_source_file_bytes, + 64 << 20, + "Maximum source file size in bytes."); + +DEFINE_int64(memory_cache_bytes, 32 << 20, "Memory cache size in bytes."); + +DEFINE_uint64(ssd_cache_bytes, 128 << 20, "Ssd cache size in bytes."); + +DEFINE_int32(num_ssd_cache_shards, 4, "Number of SSD cache shards."); + +DEFINE_uint64( + ssd_checkpoint_interval_bytes, + 64 << 20, + "Checkpoint after every 'ssd_checkpoint_interval_bytes'/'num_ssd_cache_shards', " + "written into each file. 0 means no checkpointing."); + +DEFINE_bool(enable_checksum, true, "Enable checksum write to SSD."); + +DEFINE_bool( + enable_checksum_read_verification, + true, + "Enable checksum read verification from SSD."); + +using namespace facebook::velox::cache; +using namespace facebook::velox::dwio::common; + +namespace facebook::velox::exec::test { +namespace { + +class CacheFuzzer { + public: + explicit CacheFuzzer(size_t seed); + + void go(); + + private: + void seed(size_t seed) { + currentSeed_ = seed; + rng_.seed(currentSeed_); + } + + void reSeed() { + seed(rng_()); + } + + void initSourceDataFiles(); + + void initializeCache(); + + void initializeInputs(); + + void readCache(); + + void reset(); + + void read(uint32_t fileIdx, int32_t fragmentIdx); + + FuzzerGenerator rng_; + size_t currentSeed_{0}; + std::shared_ptr rootPool_{ + memory::memoryManager()->addRootPool()}; + std::shared_ptr pool_{rootPool_->addLeafChild("leaf")}; + std::shared_ptr fs_; + std::vector fileNames_; + std::vector fileIds_; + std::vector fileSizes_; + // The file fragments used to perform random reads by different threads. + // NOTE: the production file reader reads from the specific offset from a file + // instead of random location for cache reuse. + std::vector>> fileFragments_; + std::vector> inputs_; + std::shared_ptr sourceDataDir_; + std::unique_ptr memoryManager_; + std::unique_ptr executor_; + std::shared_ptr cache_; +}; + +template +bool isDone(size_t i, T startTime) { + if (FLAGS_duration_sec > 0) { + const std::chrono::duration elapsed = + std::chrono::system_clock::now() - startTime; + return elapsed.count() >= FLAGS_duration_sec; + } + return i >= FLAGS_steps; +} + +CacheFuzzer::CacheFuzzer(size_t initialSeed) { + seed(initialSeed); + filesystems::registerLocalFileSystem(); +} + +void CacheFuzzer::initSourceDataFiles() { + sourceDataDir_ = exec::test::TempDirectoryPath::create(); + fs_ = filesystems::getFileSystem(sourceDataDir_->getPath(), nullptr); + + // Create files with random sizes. + if (fileIds_.empty()) { + for (auto i = 0; i < FLAGS_num_source_files; ++i) { + const auto fileName = + fmt::format("{}/file_{}", sourceDataDir_->getPath(), i); + const size_t fileSize = boost::random::uniform_int_distribution( + FLAGS_min_source_file_bytes, FLAGS_max_source_file_bytes)(rng_); + auto writeFile = fs_->openFileForWrite(fileName); + size_t writtenSize = 0; + int32_t offset = 0; + while (writtenSize < fileSize) { + const size_t chunkSize = std::min( + fileSize - writtenSize, + size_t(4 << 20)); // Write in chunks of 4MB + auto buffer = folly::IOBuf::create(chunkSize); + buffer->append(chunkSize); + // Fill buffer with data. + std::generate_n( + buffer->writableData(), chunkSize, [&offset]() -> uint8_t { + return static_cast(offset++ % 256); + }); + writeFile->append(std::move(buffer)); + writtenSize += chunkSize; + } + writeFile->close(); + + fileNames_.emplace_back(fileName); + fileIds_.emplace_back(fileIds(), fileName); + fileSizes_.emplace_back(fileSize); + } + } +} + +void CacheFuzzer::initializeCache() { + // We have up to 20 threads and 16 threads are used for reading so + // there are some threads left over for SSD background write. + executor_ = std::make_unique(20); + + std::unique_ptr ssdCache; + if (FLAGS_ssd_cache_bytes > 0) { + SsdCache::Config config( + fmt::format("{}/cache", sourceDataDir_->getPath()), + FLAGS_ssd_cache_bytes, + FLAGS_num_ssd_cache_shards, + executor_.get(), + FLAGS_ssd_checkpoint_interval_bytes, + false, + FLAGS_enable_checksum, + FLAGS_enable_checksum_read_verification); + ssdCache = std::make_unique(config); + } + + memory::MemoryManagerOptions options; + options.useMmapAllocator = true; + options.allocatorCapacity = FLAGS_memory_cache_bytes; + options.arbitratorCapacity = FLAGS_memory_cache_bytes; + options.arbitratorReservedCapacity = 0; + options.trackDefaultUsage = true; + memoryManager_ = std::make_unique(options); + + // TODO: Test different ssd write behaviors with AsyncDataCache::Options. + cache_ = AsyncDataCache::create( + dynamic_cast(memoryManager_->allocator()), + std::move(ssdCache), + {}); +} + +void CacheFuzzer::initializeInputs() { + const auto readOptions = io::ReaderOptions(pool_.get()); + auto tracker = std::make_shared( + "testTracker", nullptr, 256 << 10 /*256KB*/); + auto ioStats = std::make_shared(); + inputs_.reserve(FLAGS_num_source_files); + for (auto i = 0; i < FLAGS_num_source_files; ++i) { + // Initialize buffered input. + auto readFile = fs_->openFileForRead(fileNames_[i]); + auto const withExecutor = !folly::Random::oneIn(3, rng_); + inputs_.emplace_back(std::make_unique( + std::move(readFile), + MetricsLog::voidLog(), + fileIds_[i].id(), // NOLINT + cache_.get(), + tracker, + fileIds_[i].id(), // NOLINT + ioStats, + withExecutor ? executor_.get() : nullptr, + readOptions)); + + // Divide file into fragments. + std::vector> fragments; + int32_t offset = 0; + while (offset < fileSizes_[i]) { + const auto length = boost::random::uniform_int_distribution( + 1, fileSizes_[i] - offset)(rng_); + fragments.emplace_back(offset, length); + offset += length; + } + fileFragments_.emplace_back(std::move(fragments)); + } +} + +void CacheFuzzer::readCache() { + std::atomic_bool readStopped{false}; + std::vector threads; + threads.reserve(FLAGS_num_threads); + for (int32_t i = 0; i < FLAGS_num_threads; ++i) { + threads.emplace_back([&]() { + while (!readStopped) { + const auto fileIdx = boost::random::uniform_int_distribution( + 0, FLAGS_num_source_files - 1)(rng_); + const auto fragmentIdx = + boost::random::uniform_int_distribution( + 0, fileFragments_[fileIdx].size() - 1)(rng_); + read(fileIdx, fragmentIdx); + } + }); + } + std::this_thread::sleep_for(std::chrono::seconds(FLAGS_read_iteration_sec)); + readStopped = true; + for (auto& thread : threads) { + thread.join(); + } +} + +void CacheFuzzer::reset() { + cache_->shutdown(); + cache_->ssdCache()->testingWaitForWriteToFinish(); + executor_->join(); + executor_.reset(); + fileNames_.clear(); + fileIds_.clear(); + fileSizes_.clear(); + fileFragments_.clear(); + inputs_.clear(); + fs_.reset(); + cache_.reset(); + memoryManager_.reset(); + sourceDataDir_.reset(); + fileIds().testingReset(); +} + +void CacheFuzzer::read(uint32_t fileIdx, int32_t fragmentIdx) { + // TODO: Faulty injection. + const auto [offset, length] = fileFragments_[fileIdx][fragmentIdx]; + auto stream = inputs_[fileIdx]->read(offset, length, LogType::TEST); + const void* buffer; + int32_t size; + int32_t numRead = 0; + while (numRead < length) { + try { + if (stream->Next(&buffer, &size)) { + if (folly::Random::oneIn(4)) { + // Verify read content. + const auto* data = reinterpret_cast(buffer); + for (int32_t sequence = 0; sequence < size; ++sequence) { + ASSERT_EQ(data[sequence], (offset + numRead + sequence) % 256); + } + } + numRead += size; + } else { + break; + } + } catch (const VeloxException& e) { + if (e.errorCode() == error_code::kNoCacheSpace.c_str()) { + LOG(WARNING) << e.what(); + } else { + std::rethrow_exception(std::current_exception()); + } + } + } + ASSERT_EQ(numRead, length); +} + +void CacheFuzzer::go() { + VELOX_CHECK( + FLAGS_steps > 0 || FLAGS_duration_sec > 0, + "Either --steps or --duration_sec needs to be greater than zero.") + + auto startTime = std::chrono::system_clock::now(); + size_t iteration = 0; + + while (!isDone(iteration, startTime)) { + LOG(INFO) << "==============================> Started iteration " + << iteration << " (seed: " << currentSeed_ << ")"; + + initSourceDataFiles(); + + initializeCache(); + + initializeInputs(); + + readCache(); + + // TODO: Test cache restart. + + LOG(INFO) << cache_->refreshStats().toString(); + + reset(); + + LOG(INFO) << "==============================> Done with iteration " + << iteration; + + reSeed(); + ++iteration; + } +} + +} // namespace + +void cacheFuzzer(size_t seed) { + auto cacheFuzzer = CacheFuzzer(seed); + cacheFuzzer.go(); +} +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/CacheFuzzer.h b/velox/exec/fuzzer/CacheFuzzer.h new file mode 100644 index 000000000000..985735ccdbf3 --- /dev/null +++ b/velox/exec/fuzzer/CacheFuzzer.h @@ -0,0 +1,24 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace facebook::velox::exec::test { +/// Runs the async data cache fuzzer. +/// @param seed Random seed - Pass the same seed for reproducibility. +void cacheFuzzer(size_t seed); +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/CacheFuzzerRunner.h b/velox/exec/fuzzer/CacheFuzzerRunner.h new file mode 100644 index 000000000000..cde1e4f368e9 --- /dev/null +++ b/velox/exec/fuzzer/CacheFuzzerRunner.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/exec/fuzzer/CacheFuzzer.h" +#include "velox/exec/fuzzer/FuzzerUtil.h" +#include "velox/expression/fuzzer/FuzzerToolkit.h" +#include "velox/parse/TypeResolver.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +namespace facebook::velox::exec::test { + +class CacheRunner { + public: + static int run(size_t seed) { + filesystems::registerLocalFileSystem(); + cacheFuzzer(seed); + return RUN_ALL_TESTS(); + } +}; + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 0331acec2281..fa7a28cbe90a 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -222,6 +222,12 @@ add_executable(velox_memory_arbitration_fuzzer_test target_link_libraries(velox_memory_arbitration_fuzzer_test velox_memory_arbitration_fuzzer gtest gtest_main) +# Cache Fuzzer +add_executable(velox_cache_fuzzer_test CacheFuzzerTest.cpp) + +target_link_libraries(velox_cache_fuzzer_test velox_cache_fuzzer + velox_fuzzer_util gtest gtest_main) + add_executable(velox_exchange_fuzzer_test ExchangeFuzzer.cpp) target_link_libraries( diff --git a/velox/exec/tests/CacheFuzzerTest.cpp b/velox/exec/tests/CacheFuzzerTest.cpp new file mode 100644 index 000000000000..c66b15eaf2a0 --- /dev/null +++ b/velox/exec/tests/CacheFuzzerTest.cpp @@ -0,0 +1,44 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "velox/exec/fuzzer/CacheFuzzerRunner.h" + +DEFINE_int64( + seed, + 0, + "Initial seed for random number generator used to reproduce previous " + "results (0 means start with random seed)."); + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + + // Calls common init functions in the necessary order, initializing + // singletons, installing proper signal handlers for better debugging + // experience, and initialize glog and gflags. + folly::Init init(&argc, &argv); + + facebook::velox::memory::MemoryManager::initialize({}); + + size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed; + + using Runner = facebook::velox::exec::test::CacheRunner; + + return Runner::run(initialSeed); +}