From 7425c6f86beae8f1a0324b8bd710299dec1adea2 Mon Sep 17 00:00:00 2001 From: Ke Date: Thu, 6 Jun 2024 01:58:59 -0700 Subject: [PATCH] Add bucket verification in TableWriter Fuzzer (#10039) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/10039 Reviewed By: xiaoxmeng Differential Revision: D58141952 Pulled By: kewang1024 fbshipit-source-id: e80a93029e6df947354af68818ce0c1f467149eb --- velox/docs/develop/testing/writer-fuzzer.rst | 7 +- velox/exec/fuzzer/PrestoQueryRunner.cpp | 20 ++- velox/exec/fuzzer/WriterFuzzer.cpp | 141 ++++++++++++++----- 3 files changed, 127 insertions(+), 41 deletions(-) diff --git a/velox/docs/develop/testing/writer-fuzzer.rst b/velox/docs/develop/testing/writer-fuzzer.rst index 3ce3c72dfcaee..20dbbba369b7c 100644 --- a/velox/docs/develop/testing/writer-fuzzer.rst +++ b/velox/docs/develop/testing/writer-fuzzer.rst @@ -2,17 +2,18 @@ Writer Fuzzer ============= -Writer fuzzer tests table write plan with up to 5 regular columns and -up to 3 partition keys. +Writer fuzzer tests table write plan with up to 5 regular columns, up to +3 partition keys and up to 3 bucket columns. At each iteration, fuzzer randomly generate a table write plan with different -table properties, as of now, only support partitioned and unpartitioned table. +table properties including un-partitioned and partitioned, non-bucketed and bucketed. The fuzzer then generates inputs and runs the query plan and compares the results with PrestoDB. As of now, we compare: 1. How many rows were written. 2. Output directories have the same directory layout and hierarchy. +3. Same data were written by velox and prestoDB. How to run ---------- diff --git a/velox/exec/fuzzer/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp index f77d8d4775a6b..26998890c1bfe 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -541,8 +541,8 @@ std::optional PrestoQueryRunner::toSql( // Returns a CTAS sql with specified table properties from TableWriteNode, // example sql: - // CREATE TABLE tmp_write WITH (PARTITIONED_BY = ARRAY['p0']) - // AS SELECT * FROM tmp + // CREATE TABLE tmp_write WITH (PARTITIONED_BY = ARRAY['p0'], BUCKETED_COUNT = + // 20, BUCKETED_BY = ARRAY['b0', 'b1']) AS SELECT * FROM tmp std::stringstream sql; sql << "CREATE TABLE tmp_write"; std::vector partitionKeys; @@ -558,7 +558,21 @@ std::optional PrestoQueryRunner::toSql( appendComma(i, sql); sql << "'" << partitionKeys[i] << "'"; } - sql << "])"; + sql << "]"; + + if (insertTableHandle->bucketProperty() != nullptr) { + const auto bucketCount = + insertTableHandle->bucketProperty()->bucketCount(); + const auto bucketColumns = + insertTableHandle->bucketProperty()->bucketedBy(); + sql << ", BUCKET_COUNT = " << bucketCount << ", BUCKETED_BY = ARRAY["; + for (int i = 0; i < bucketColumns.size(); ++i) { + appendComma(i, sql); + sql << "'" << bucketColumns[i] << "'"; + } + sql << "]"; + } + sql << ")"; } sql << " AS SELECT * FROM tmp"; diff --git a/velox/exec/fuzzer/WriterFuzzer.cpp b/velox/exec/fuzzer/WriterFuzzer.cpp index 5e65a07915d0f..82056f8b779b8 100644 --- a/velox/exec/fuzzer/WriterFuzzer.cpp +++ b/velox/exec/fuzzer/WriterFuzzer.cpp @@ -108,6 +108,8 @@ class WriterFuzzer { void verifyWriter( const std::vector& input, + int32_t bucketCount, + const std::vector& bucketColumns, const std::vector& partitionKeys, const std::string& outputDirectoryPath); @@ -122,13 +124,15 @@ class WriterFuzzer { // Query Presto to find out table's location on disk. std::string getReferenceOutputDirectoryPath(int32_t layers); - // Compares if two directories have same partitions - void comparePartitions( + // Compares if two directories have same partitions and each partition has + // same number of buckets. + void comparePartitionAndBucket( const std::string& outputDirectoryPath, - const std::string& referenceOutputDirectoryPath); + const std::string& referenceOutputDirectoryPath, + int32_t bucketCount); - // Returns all the partition names in tableDirectoryPath. - std::set getPartitionNames( + // Returns all the partition name and how many files in each partition. + std::map getPartitionNameAndFilecount( const std::string& tableDirectoryPath); const std::vector kRegularColumnTypes_{ @@ -141,6 +145,17 @@ class WriterFuzzer { VARBINARY(), TIMESTAMP(), }; + // Supported bucket column types: + // https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketing.java#L142 + const std::vector kSupportedBucketColumnTypes_{ + BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + VARCHAR(), + TIMESTAMP(), + }; // Supported partition key column types // According to VectorHasher::typeKindSupportsValueIds and // https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L575 @@ -215,11 +230,22 @@ void WriterFuzzer::go() { std::vector names; std::vector types; + std::vector bucketColumns; std::vector partitionKeys; generateColumns(5, "c", kRegularColumnTypes_, 2, names, types); - const auto partitionOffset = names.size(); + + // 50% of times test bucketed write. + int32_t bucketCount = 0; + if (vectorFuzzer_.coinToss(0.5)) { + bucketColumns = generateColumns( + 5, "b", kSupportedBucketColumnTypes_, 1, names, types); + bucketCount = + boost::random::uniform_int_distribution(1, 3)(rng_); + } + // 50% of times test partitioned write. + const auto partitionOffset = names.size(); if (vectorFuzzer_.coinToss(0.5)) { partitionKeys = generateColumns(3, "p", kPartitionKeyTypes_, 1, names, types); @@ -227,7 +253,12 @@ void WriterFuzzer::go() { auto input = generateInputData(names, types, partitionOffset); auto tempDirPath = exec::test::TempDirectoryPath::create(); - verifyWriter(input, partitionKeys, tempDirPath->getPath()); + verifyWriter( + input, + bucketCount, + bucketColumns, + partitionKeys, + tempDirPath->getPath()); LOG(INFO) << "==============================> Done with iteration " << iteration++; @@ -263,9 +294,8 @@ std::vector WriterFuzzer::generateInputData( auto inputType = ROW(std::move(names), std::move(types)); std::vector input; - // For partition keys, limit the distinct value to 4 to avoid exceeding - // partition number limit of 100. Since we could have up to 3 partition - // keys, it would generate up to 64 partitions. + // For partition keys, limit the distinct value to 4. Since we could have up + // to 3 partition keys, it would generate up to 64 partitions. std::vector partitionValues; for (auto i = partitionOffset; i < inputType->size(); ++i) { partitionValues.push_back(vectorFuzzer_.fuzz(inputType->childAt(i), 4)); @@ -292,12 +322,16 @@ std::vector WriterFuzzer::generateInputData( void WriterFuzzer::verifyWriter( const std::vector& input, + int32_t bucketCount, + const std::vector& bucketColumns, const std::vector& partitionKeys, const std::string& outputDirectoryPath) { - const auto plan = PlanBuilder() - .values(input) - .tableWrite(outputDirectoryPath, partitionKeys) - .planNode(); + const auto plan = + PlanBuilder() + .values(input) + .tableWrite( + outputDirectoryPath, partitionKeys, bucketCount, bucketColumns) + .planNode(); const auto maxDrivers = boost::random::uniform_int_distribution(1, 16)(rng_); @@ -325,11 +359,12 @@ void WriterFuzzer::verifyWriter( assertEqualResults(expectedResult, plan->outputType(), {result}), "Velox and reference DB results don't match"); - // 2. Verifies directory layout. + // 2. Verifies directory layout for partitioned (bucketed) table. if (!partitionKeys.empty()) { const auto referencedOutputDirectoryPath = getReferenceOutputDirectoryPath(partitionKeys.size()); - comparePartitions(outputDirectoryPath, referencedOutputDirectoryPath); + comparePartitionAndBucket( + outputDirectoryPath, referencedOutputDirectoryPath, bucketCount); } // 3. Verifies data itself. @@ -360,7 +395,12 @@ RowVectorPtr WriterFuzzer::execute( if (!splits.empty()) { builder.splits(splits); } - return builder.maxDrivers(maxDrivers).copyResults(pool_.get()); + return builder.maxDrivers(maxDrivers) + .connectorSessionProperty( + kHiveConnectorId, + connector::hive::HiveConfig::kMaxPartitionsPerWritersSession, + "400") + .copyResults(pool_.get()); } RowVectorPtr WriterFuzzer::veloxToPrestoResult(const RowVectorPtr& result) { @@ -390,32 +430,51 @@ std::string WriterFuzzer::getReferenceOutputDirectoryPath(int32_t layers) { return tableDirectoryPath.string(); } -void WriterFuzzer::comparePartitions( +void WriterFuzzer::comparePartitionAndBucket( const std::string& outputDirectoryPath, - const std::string& referenceOutputDirectoryPath) { - const auto partitions = getPartitionNames(outputDirectoryPath); - LOG(INFO) << "Velox written partitions:" << std::endl; - for (const std::string& partition : partitions) { - LOG(INFO) << partition << std::endl; + const std::string& referenceOutputDirectoryPath, + int32_t bucketCount) { + LOG(INFO) << "Velox output directory:" << outputDirectoryPath << std::endl; + const auto partitionNameAndFileCount = + getPartitionNameAndFilecount(outputDirectoryPath); + LOG(INFO) << "Partitions and file count:" << std::endl; + std::vector partitionNames; + partitionNames.reserve(partitionNameAndFileCount.size()); + for (const auto& i : partitionNameAndFileCount) { + LOG(INFO) << i.first << ":" << i.second << std::endl; + partitionNames.emplace_back(i.first); } - const auto referencedPartitions = - getPartitionNames(referenceOutputDirectoryPath); - LOG(INFO) << "Presto written partitions:" << std::endl; - for (const std::string& partition : referencedPartitions) { - LOG(INFO) << partition << std::endl; + LOG(INFO) << "Presto output directory:" << referenceOutputDirectoryPath + << std::endl; + const auto referencedPartitionNameAndFileCount = + getPartitionNameAndFilecount(referenceOutputDirectoryPath); + LOG(INFO) << "Partitions and file count:" << std::endl; + std::vector referencePartitionNames; + referencePartitionNames.reserve(referencedPartitionNameAndFileCount.size()); + for (const auto& i : referencedPartitionNameAndFileCount) { + LOG(INFO) << i.first << ":" << i.second << std::endl; + referencePartitionNames.emplace_back(i.first); } - VELOX_CHECK( - partitions == referencedPartitions, - "Velox and reference DB output directory hierarchies don't match"); + if (bucketCount == 0) { + // If not bucketed, only verify if their partition names match + VELOX_CHECK( + partitionNames == referencePartitionNames, + "Velox and reference DB output partitions don't match"); + } else { + VELOX_CHECK( + partitionNameAndFileCount == referencedPartitionNameAndFileCount, + "Velox and reference DB output partition and bucket don't match"); + } } -std::set WriterFuzzer::getPartitionNames( +// static +std::map WriterFuzzer::getPartitionNameAndFilecount( const std::string& tableDirectoryPath) { auto fileSystem = filesystems::getFileSystem("/", nullptr); auto directories = listFolders(tableDirectoryPath); - std::set partitionNames; + std::map partitionNameAndFileCount; for (std::string directory : directories) { // If it's a hidden directory, ignore @@ -423,14 +482,26 @@ std::set WriterFuzzer::getPartitionNames( continue; } + // Count non-empty non-hidden files + const auto files = fileSystem->list(directory); + int32_t fileCount = 0; + for (const auto& file : files) { + // Presto query runner sometime creates empty files, ignore those. + if (file.find("/.") == std::string::npos && + fileSystem->openFileForRead(file)->size() > 0) { + fileCount++; + } + } + // Remove the path prefix to get the partition name // For example: /test/tmp_write/p0=1/p1=2020 // partition name is /p0=1/p1=2020 directory.erase(0, fileSystem->extractPath(tableDirectoryPath).length()); - partitionNames.insert(directory); + + partitionNameAndFileCount.emplace(directory, fileCount); } - return partitionNames; + return partitionNameAndFileCount; } } // namespace