From 3ef1d611bdbb672067671d788899c354d64ed409 Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Tue, 13 Aug 2024 20:30:03 -0700 Subject: [PATCH] Refactor to migrate to velox::config::ConfigBase class in the codebase (#10716) Summary: As part of the consolidating and polishing config effort, all configs should now utilize common/config/Config.h as it provides richer functionality. Deprecate the old one to make the codebase clean Pull Request resolved: https://github.com/facebookincubator/velox/pull/10716 Reviewed By: xiaoxmeng Differential Revision: D61147922 Pulled By: tanjialiang fbshipit-source-id: 751cc47b17d551e882f4172e6ea05c851daa5a81 --- velox/benchmarks/filesystem/ReadBenchmark.cpp | 8 +- velox/benchmarks/tpch/TpchBenchmark.cpp | 4 +- velox/common/config/Config.h | 2 + velox/common/file/FileSystems.cpp | 15 +-- velox/common/file/FileSystems.h | 12 +- velox/common/file/tests/FaultyFileSystem.cpp | 4 +- velox/common/file/tests/FaultyFileSystem.h | 2 +- velox/connectors/CMakeLists.txt | 2 +- velox/connectors/Connector.h | 23 ++-- velox/connectors/fuzzer/FuzzerConnector.h | 17 ++- .../fuzzer/tests/FuzzerConnectorTestBase.h | 3 +- velox/connectors/hive/FileHandle.h | 7 +- velox/connectors/hive/HiveConfig.cpp | 54 +++++---- velox/connectors/hive/HiveConfig.h | 52 +++++---- velox/connectors/hive/HiveConnector.cpp | 2 +- velox/connectors/hive/HiveConnector.h | 18 ++- velox/connectors/hive/HiveConnectorUtil.cpp | 2 +- velox/connectors/hive/HiveConnectorUtil.h | 2 +- .../tests/IcebergSplitReaderBenchmark.cpp | 8 +- .../storage_adapters/abfs/AbfsFileSystem.cpp | 15 +-- .../storage_adapters/abfs/AbfsFileSystem.h | 3 +- .../abfs/RegisterAbfsFileSystem.cpp | 4 +- .../abfs/tests/AbfsFileSystemTest.cpp | 4 +- .../abfs/tests/AzuriteServer.h | 1 - .../storage_adapters/gcs/GCSFileSystem.cpp | 8 +- .../hive/storage_adapters/gcs/GCSFileSystem.h | 2 +- .../gcs/RegisterGCSFileSystem.cpp | 46 ++++---- .../gcs/examples/GCSFileSystemExample.cpp | 6 +- .../gcs/tests/GCSFileSystemTest.cpp | 11 +- .../storage_adapters/hdfs/HdfsFileSystem.cpp | 14 ++- .../storage_adapters/hdfs/HdfsFileSystem.h | 4 +- .../hdfs/RegisterHdfsFileSystem.cpp | 7 +- .../hdfs/tests/HdfsFileSystemTest.cpp | 72 ++++++------ .../s3fs/RegisterS3FileSystem.cpp | 10 +- .../storage_adapters/s3fs/S3FileSystem.cpp | 16 +-- .../hive/storage_adapters/s3fs/S3FileSystem.h | 4 +- .../storage_adapters/s3fs/tests/MinioServer.h | 6 +- .../s3fs/tests/S3FileSystemFinalizeTest.cpp | 5 +- .../s3fs/tests/S3FileSystemTest.cpp | 19 ++-- .../connectors/hive/tests/HiveConfigTest.cpp | 26 +++-- .../hive/tests/HiveConnectorUtilTest.cpp | 8 +- .../hive/tests/HiveDataSinkTest.cpp | 11 +- velox/connectors/tests/ConnectorTest.cpp | 13 ++- velox/connectors/tpch/TpchConnector.h | 17 ++- velox/connectors/tpch/tests/SpeedTest.cpp | 4 +- .../tpch/tests/TpchConnectorTest.cpp | 4 +- velox/core/CMakeLists.txt | 1 + velox/core/QueryConfig.cpp | 106 ++++-------------- velox/core/QueryConfig.h | 22 +--- velox/core/QueryCtx.cpp | 38 ++++++- velox/core/QueryCtx.h | 33 ++++-- velox/core/tests/QueryConfigTest.cpp | 106 +----------------- velox/dwio/common/CMakeLists.txt | 2 +- velox/dwio/common/FileSink.h | 10 +- velox/dwio/common/Options.h | 6 +- velox/dwio/parquet/tests/ParquetTpchTest.cpp | 8 +- .../tests/reader/ParquetTableScanTest.cpp | 4 +- .../tests/writer/ParquetWriterTest.cpp | 4 +- velox/dwio/parquet/writer/Writer.cpp | 10 +- velox/dwio/parquet/writer/Writer.h | 6 +- velox/examples/ScanAndSort.cpp | 5 +- velox/exec/fuzzer/AggregationFuzzerBase.h | 4 +- velox/exec/fuzzer/JoinFuzzer.cpp | 5 +- velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp | 5 +- velox/exec/fuzzer/RowNumberFuzzer.cpp | 5 +- velox/exec/fuzzer/WriterFuzzerRunner.h | 4 +- velox/exec/tests/AsyncConnectorTest.cpp | 18 ++- velox/exec/tests/TableWriteTest.cpp | 4 +- velox/exec/tests/VeloxIn10MinDemo.cpp | 4 +- .../tests/utils/HiveConnectorTestBase.cpp | 5 +- .../exec/tests/utils/HiveConnectorTestBase.h | 3 +- .../wave/exec/WaveHiveDataSource.cpp | 3 +- velox/expression/tests/CastExprTest.cpp | 4 - .../tests/utils/AggregationTestBase.cpp | 5 +- 74 files changed, 509 insertions(+), 498 deletions(-) diff --git a/velox/benchmarks/filesystem/ReadBenchmark.cpp b/velox/benchmarks/filesystem/ReadBenchmark.cpp index e256dfb00028..f99639b8c966 100644 --- a/velox/benchmarks/filesystem/ReadBenchmark.cpp +++ b/velox/benchmarks/filesystem/ReadBenchmark.cpp @@ -16,11 +16,11 @@ #include "velox/benchmarks/filesystem/ReadBenchmark.h" +#include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" #include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h" #include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" #include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" -#include "velox/core/Config.h" DEFINE_string(path, "", "Path of the input file"); DEFINE_int64( @@ -60,7 +60,7 @@ DEFINE_validator(path, ¬Empty); namespace facebook::velox { -std::shared_ptr readConfig(const std::string& filePath) { +std::shared_ptr readConfig(const std::string& filePath) { std::ifstream configFile(filePath); if (!configFile.is_open()) { throw std::runtime_error( @@ -80,7 +80,7 @@ std::shared_ptr readConfig(const std::string& filePath) { properties.emplace(name, value); } - return std::make_shared(properties); + return std::make_shared(std::move(properties)); } // Initialize a LocalReadFile instance for the specified 'path'. @@ -108,7 +108,7 @@ void ReadBenchmark::initialize() { filesystems::registerGCSFileSystem(); filesystems::registerHdfsFileSystem(); filesystems::abfs::registerAbfsFileSystem(); - std::shared_ptr config; + std::shared_ptr config; if (!FLAGS_config.empty()) { config = readConfig(FLAGS_config); } diff --git a/velox/benchmarks/tpch/TpchBenchmark.cpp b/velox/benchmarks/tpch/TpchBenchmark.cpp index 413bd82e4a5d..babb86a0e9ef 100644 --- a/velox/benchmarks/tpch/TpchBenchmark.cpp +++ b/velox/benchmarks/tpch/TpchBenchmark.cpp @@ -265,8 +265,8 @@ class TpchBenchmark { std::to_string(FLAGS_max_coalesced_distance_bytes); configurationValues[connector::hive::HiveConfig::kPrefetchRowGroups] = std::to_string(FLAGS_parquet_prefetch_rowgroups); - auto properties = - std::make_shared(configurationValues); + auto properties = std::make_shared( + std::move(configurationValues)); // Create hive connector with config... auto hiveConnector = diff --git a/velox/common/config/Config.h b/velox/common/config/Config.h index 793c808a6e0b..533a53944345 100644 --- a/velox/common/config/Config.h +++ b/velox/common/config/Config.h @@ -80,6 +80,8 @@ class ConfigBase { bool _mutable = false) : configs_(std::move(configs)), mutable_(_mutable) {} + virtual ~ConfigBase() {} + template ConfigBase& set(const Entry& entry, const T& val) { VELOX_CHECK(mutable_, "Cannot set in immutable config"); diff --git a/velox/common/file/FileSystems.cpp b/velox/common/file/FileSystems.cpp index 52c9e17aa172..eb7de8df7ad7 100644 --- a/velox/common/file/FileSystems.cpp +++ b/velox/common/file/FileSystems.cpp @@ -30,8 +30,9 @@ constexpr std::string_view kFileScheme("file:"); using RegisteredFileSystems = std::vector, - std::function(std::shared_ptr, std::string_view)>>>; + std::function( + std::shared_ptr, + std::string_view)>>>; RegisteredFileSystems& registeredFileSystems() { // Meyers singleton. @@ -44,14 +45,14 @@ RegisteredFileSystems& registeredFileSystems() { void registerFileSystem( std::function schemeMatcher, std::function( - std::shared_ptr, + std::shared_ptr, std::string_view)> fileSystemGenerator) { registeredFileSystems().emplace_back(schemeMatcher, fileSystemGenerator); } std::shared_ptr getFileSystem( std::string_view filePath, - std::shared_ptr properties) { + std::shared_ptr properties) { const auto& filesystems = registeredFileSystems(); for (const auto& p : filesystems) { if (p.first(filePath)) { @@ -78,7 +79,7 @@ folly::once_flag localFSInstantiationFlag; // Implement Local FileSystem. class LocalFileSystem : public FileSystem { public: - explicit LocalFileSystem(std::shared_ptr config) + explicit LocalFileSystem(std::shared_ptr config) : FileSystem(config) {} ~LocalFileSystem() override {} @@ -193,9 +194,9 @@ class LocalFileSystem : public FileSystem { } static std::function(std::shared_ptr, std::string_view)> + FileSystem>(std::shared_ptr, std::string_view)> fileSystemGenerator() { - return [](std::shared_ptr properties, + return [](std::shared_ptr properties, std::string_view filePath) { // One instance of Local FileSystem is sufficient. // Initialize on first access and reuse after that. diff --git a/velox/common/file/FileSystems.h b/velox/common/file/FileSystems.h index a1a233cdfba1..16aa14bd2b32 100644 --- a/velox/common/file/FileSystems.h +++ b/velox/common/file/FileSystems.h @@ -23,7 +23,9 @@ #include namespace facebook::velox { -class Config; +namespace config { +class ConfigBase; +} class ReadFile; class WriteFile; } // namespace facebook::velox @@ -50,7 +52,7 @@ struct FileOptions { /// An abstract FileSystem class FileSystem { public: - FileSystem(std::shared_ptr config) + FileSystem(std::shared_ptr config) : config_(std::move(config)) {} virtual ~FileSystem() = default; @@ -101,12 +103,12 @@ class FileSystem { virtual void rmdir(std::string_view path) = 0; protected: - std::shared_ptr config_; + std::shared_ptr config_; }; std::shared_ptr getFileSystem( std::string_view filename, - std::shared_ptr config); + std::shared_ptr config); /// Returns true if filePath is supported by any registered file system, /// otherwise false. @@ -120,7 +122,7 @@ bool isPathSupportedByRegisteredFileSystems(const std::string_view& filePath); void registerFileSystem( std::function schemeMatcher, std::function( - std::shared_ptr, + std::shared_ptr, std::string_view)> fileSystemGenerator); /// Register the local filesystem. diff --git a/velox/common/file/tests/FaultyFileSystem.cpp b/velox/common/file/tests/FaultyFileSystem.cpp index 25609c8d1f9f..39b4f6b09e15 100644 --- a/velox/common/file/tests/FaultyFileSystem.cpp +++ b/velox/common/file/tests/FaultyFileSystem.cpp @@ -38,9 +38,9 @@ std::function schemeMatcher() { folly::once_flag faultFilesystemInitOnceFlag; std::function(std::shared_ptr, std::string_view)> + FileSystem>(std::shared_ptr, std::string_view)> fileSystemGenerator() { - return [](std::shared_ptr properties, + return [](std::shared_ptr properties, std::string_view /*unused*/) { // One instance of faulty FileSystem is sufficient. Initializes on first // access and reuse after that. diff --git a/velox/common/file/tests/FaultyFileSystem.h b/velox/common/file/tests/FaultyFileSystem.h index 0393f6ff855f..b55266d41b9a 100644 --- a/velox/common/file/tests/FaultyFileSystem.h +++ b/velox/common/file/tests/FaultyFileSystem.h @@ -32,7 +32,7 @@ using namespace filesystems; /// file operation to the real file system underneath. class FaultyFileSystem : public FileSystem { public: - explicit FaultyFileSystem(std::shared_ptr config) + explicit FaultyFileSystem(std::shared_ptr config) : FileSystem(std::move(config)) {} ~FaultyFileSystem() override {} diff --git a/velox/connectors/CMakeLists.txt b/velox/connectors/CMakeLists.txt index 1c673903aba3..3cc600201f6b 100644 --- a/velox/connectors/CMakeLists.txt +++ b/velox/connectors/CMakeLists.txt @@ -13,7 +13,7 @@ # limitations under the License. velox_add_library(velox_connector Connector.cpp) -velox_link_libraries(velox_connector velox_config velox_vector) +velox_link_libraries(velox_connector velox_common_config velox_vector) add_subdirectory(fuzzer) diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 17decfedf96b..125066176687 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -29,15 +29,17 @@ #include +namespace facebook::velox { +class Config; +} namespace facebook::velox::wave { class WaveDataSource; } namespace facebook::velox::common { class Filter; } - -namespace facebook::velox { -class Config; +namespace facebook::velox::config { +class ConfigBase; } namespace facebook::velox::connector { @@ -256,7 +258,7 @@ class ConnectorQueryCtx { ConnectorQueryCtx( memory::MemoryPool* operatorPool, memory::MemoryPool* connectorPool, - const Config* sessionProperties, + const config::ConfigBase* sessionProperties, const common::SpillConfig* spillConfig, common::PrefixSortConfig prefixSortConfig, std::unique_ptr expressionEvaluator, @@ -297,7 +299,7 @@ class ConnectorQueryCtx { return connectorPool_; } - const Config* sessionProperties() const { + const config::ConfigBase* sessionProperties() const { return sessionProperties_; } @@ -356,7 +358,7 @@ class ConnectorQueryCtx { private: memory::MemoryPool* const operatorPool_; memory::MemoryPool* const connectorPool_; - const Config* const sessionProperties_; + const config::ConfigBase* const sessionProperties_; const common::SpillConfig* const spillConfig_; const common::PrefixSortConfig prefixSortConfig_; std::unique_ptr expressionEvaluator_; @@ -380,7 +382,8 @@ class Connector { return id_; } - virtual const std::shared_ptr& connectorConfig() const { + virtual const std::shared_ptr& connectorConfig() + const { VELOX_NYI("connectorConfig is not supported yet"); } @@ -447,6 +450,12 @@ class ConnectorFactory { return name_; } + virtual std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* executor = nullptr) = 0; + + // TODO(jtan6): [Config Refactor] Remove this old API when refactor is done. virtual std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, diff --git a/velox/connectors/fuzzer/FuzzerConnector.h b/velox/connectors/fuzzer/FuzzerConnector.h index d04797f65187..f920b0064434 100644 --- a/velox/connectors/fuzzer/FuzzerConnector.h +++ b/velox/connectors/fuzzer/FuzzerConnector.h @@ -15,8 +15,10 @@ */ #pragma once +#include "velox/common/config/Config.h" #include "velox/connectors/Connector.h" #include "velox/connectors/fuzzer/FuzzerConnectorSplit.h" +#include "velox/core/Config.h" #include "velox/vector/fuzzer/VectorFuzzer.h" namespace facebook::velox::connector::fuzzer { @@ -103,7 +105,7 @@ class FuzzerConnector final : public Connector { public: FuzzerConnector( const std::string& id, - std::shared_ptr config, + std::shared_ptr config, folly::Executor* /*executor*/) : Connector(id) {} @@ -139,10 +141,21 @@ class FuzzerConnectorFactory : public ConnectorFactory { std::shared_ptr newConnector( const std::string& id, - std::shared_ptr config, + std::shared_ptr config, folly::Executor* executor = nullptr) override { return std::make_shared(id, config, executor); } + + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* executor = nullptr) override { + std::shared_ptr convertedConfig; + convertedConfig = config == nullptr + ? nullptr + : std::make_shared(config->valuesCopy()); + return newConnector(id, convertedConfig, executor); + } }; } // namespace facebook::velox::connector::fuzzer diff --git a/velox/connectors/fuzzer/tests/FuzzerConnectorTestBase.h b/velox/connectors/fuzzer/tests/FuzzerConnectorTestBase.h index 9e7f536157e8..e47cc810012f 100644 --- a/velox/connectors/fuzzer/tests/FuzzerConnectorTestBase.h +++ b/velox/connectors/fuzzer/tests/FuzzerConnectorTestBase.h @@ -26,10 +26,11 @@ class FuzzerConnectorTestBase : public exec::test::OperatorTestBase { void SetUp() override { OperatorTestBase::SetUp(); + std::shared_ptr config; auto fuzzerConnector = connector::getConnectorFactory( connector::fuzzer::FuzzerConnectorFactory::kFuzzerConnectorName) - ->newConnector(kFuzzerConnectorId, nullptr); + ->newConnector(kFuzzerConnectorId, config); connector::registerConnector(fuzzerConnector); } diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h index 18ff0a8b00ed..5db30b1d7f4c 100644 --- a/velox/connectors/hive/FileHandle.h +++ b/velox/connectors/hive/FileHandle.h @@ -27,13 +27,12 @@ #include "velox/common/caching/CachedFactory.h" #include "velox/common/caching/FileIds.h" +#include "velox/common/config/Config.h" #include "velox/common/file/File.h" #include "velox/connectors/hive/FileProperties.h" namespace facebook::velox { -class Config; - // See the file comment. struct FileHandle { std::shared_ptr file; @@ -66,14 +65,14 @@ using FileHandleCache = SimpleLRUCache; class FileHandleGenerator { public: FileHandleGenerator() {} - FileHandleGenerator(std::shared_ptr properties) + FileHandleGenerator(std::shared_ptr properties) : properties_(std::move(properties)) {} std::unique_ptr operator()( const std::string& filename, const FileProperties* properties); private: - const std::shared_ptr properties_; + const std::shared_ptr properties_; }; using FileHandleFactory = CachedFactory< diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index 1dc655e0fee6..984a27cade42 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -15,7 +15,7 @@ */ #include "velox/connectors/hive/HiveConfig.h" -#include "velox/core/Config.h" +#include "velox/common/config/Config.h" #include "velox/core/QueryConfig.h" #include @@ -53,13 +53,15 @@ std::string HiveConfig::insertExistingPartitionsBehaviorString( } HiveConfig::InsertExistingPartitionsBehavior -HiveConfig::insertExistingPartitionsBehavior(const Config* session) const { +HiveConfig::insertExistingPartitionsBehavior( + const config::ConfigBase* session) const { return stringToInsertExistingPartitionsBehavior(session->get( kInsertExistingPartitionsBehaviorSession, config_->get(kInsertExistingPartitionsBehavior, "ERROR"))); } -uint32_t HiveConfig::maxPartitionsPerWriters(const Config* session) const { +uint32_t HiveConfig::maxPartitionsPerWriters( + const config::ConfigBase* session) const { return session->get( kMaxPartitionsPerWritersSession, config_->get(kMaxPartitionsPerWriters, 100)); @@ -90,15 +92,18 @@ std::string HiveConfig::s3Endpoint() const { } std::optional HiveConfig::s3AccessKey() const { - return static_cast>(config_->get(kS3AwsAccessKey)); + return static_cast>( + config_->get(kS3AwsAccessKey)); } std::optional HiveConfig::s3SecretKey() const { - return static_cast>(config_->get(kS3AwsSecretKey)); + return static_cast>( + config_->get(kS3AwsSecretKey)); } std::optional HiveConfig::s3IAMRole() const { - return static_cast>(config_->get(kS3IamRole)); + return static_cast>( + config_->get(kS3IamRole)); } std::string HiveConfig::s3IAMRoleSessionName() const { @@ -151,22 +156,24 @@ std::optional HiveConfig::gcsMaxRetryTime() const { config_->get(kGCSMaxRetryTime)); } -bool HiveConfig::isOrcUseColumnNames(const Config* session) const { +bool HiveConfig::isOrcUseColumnNames(const config::ConfigBase* session) const { return session->get( kOrcUseColumnNamesSession, config_->get(kOrcUseColumnNames, false)); } -bool HiveConfig::isFileColumnNamesReadAsLowerCase(const Config* session) const { +bool HiveConfig::isFileColumnNamesReadAsLowerCase( + const config::ConfigBase* session) const { return session->get( kFileColumnNamesReadAsLowerCaseSession, config_->get(kFileColumnNamesReadAsLowerCase, false)); } -bool HiveConfig::isPartitionPathAsLowerCase(const Config* session) const { +bool HiveConfig::isPartitionPathAsLowerCase( + const config::ConfigBase* session) const { return session->get(kPartitionPathAsLowerCaseSession, true); } -bool HiveConfig::ignoreMissingFiles(const Config* session) const { +bool HiveConfig::ignoreMissingFiles(const config::ConfigBase* session) const { return session->get(kIgnoreMissingFilesSession, false); } @@ -194,7 +201,8 @@ bool HiveConfig::isFileHandleCacheEnabled() const { return config_->get(kEnableFileHandleCache, true); } -uint64_t HiveConfig::orcWriterMaxStripeSize(const Config* session) const { +uint64_t HiveConfig::orcWriterMaxStripeSize( + const config::ConfigBase* session) const { return config::toCapacity( session->get( kOrcWriterMaxStripeSizeSession, @@ -202,7 +210,8 @@ uint64_t HiveConfig::orcWriterMaxStripeSize(const Config* session) const { config::CapacityUnit::BYTE); } -uint64_t HiveConfig::orcWriterMaxDictionaryMemory(const Config* session) const { +uint64_t HiveConfig::orcWriterMaxDictionaryMemory( + const config::ConfigBase* session) const { return config::toCapacity( session->get( kOrcWriterMaxDictionaryMemorySession, @@ -211,34 +220,35 @@ uint64_t HiveConfig::orcWriterMaxDictionaryMemory(const Config* session) const { } bool HiveConfig::isOrcWriterIntegerDictionaryEncodingEnabled( - const Config* session) const { + const config::ConfigBase* session) const { return session->get( kOrcWriterIntegerDictionaryEncodingEnabledSession, config_->get(kOrcWriterIntegerDictionaryEncodingEnabled, true)); } bool HiveConfig::isOrcWriterStringDictionaryEncodingEnabled( - const Config* session) const { + const config::ConfigBase* session) const { return session->get( kOrcWriterStringDictionaryEncodingEnabledSession, config_->get(kOrcWriterStringDictionaryEncodingEnabled, true)); } bool HiveConfig::orcWriterLinearStripeSizeHeuristics( - const Config* session) const { + const config::ConfigBase* session) const { return session->get( kOrcWriterLinearStripeSizeHeuristicsSession, config_->get(kOrcWriterLinearStripeSizeHeuristics, true)); } -uint64_t HiveConfig::orcWriterMinCompressionSize(const Config* session) const { +uint64_t HiveConfig::orcWriterMinCompressionSize( + const config::ConfigBase* session) const { return session->get( kOrcWriterMinCompressionSizeSession, config_->get(kOrcWriterMinCompressionSize, 1024)); } std::optional HiveConfig::orcWriterCompressionLevel( - const Config* session) const { + const config::ConfigBase* session) const { auto sessionProp = session->get(kOrcWriterCompressionLevelSession); if (sessionProp.has_value()) { @@ -260,13 +270,15 @@ std::string HiveConfig::writeFileCreateConfig() const { return config_->get(kWriteFileCreateConfig, ""); } -uint32_t HiveConfig::sortWriterMaxOutputRows(const Config* session) const { +uint32_t HiveConfig::sortWriterMaxOutputRows( + const config::ConfigBase* session) const { return session->get( kSortWriterMaxOutputRowsSession, config_->get(kSortWriterMaxOutputRows, 1024)); } -uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* session) const { +uint64_t HiveConfig::sortWriterMaxOutputBytes( + const config::ConfigBase* session) const { return config::toCapacity( session->get( kSortWriterMaxOutputBytesSession, @@ -286,7 +298,7 @@ bool HiveConfig::s3UseProxyFromEnv() const { return config_->get(kS3UseProxyFromEnv, false); } -uint8_t HiveConfig::readTimestampUnit(const Config* session) const { +uint8_t HiveConfig::readTimestampUnit(const config::ConfigBase* session) const { const auto unit = session->get( kReadTimestampUnitSession, config_->get(kReadTimestampUnit, 3 /*milli*/)); @@ -296,7 +308,7 @@ uint8_t HiveConfig::readTimestampUnit(const Config* session) const { return unit; } -bool HiveConfig::cacheNoRetention(const Config* session) const { +bool HiveConfig::cacheNoRetention(const config::ConfigBase* session) const { return session->get( kCacheNoRetentionSession, config_->get(kCacheNoRetention, /*defaultValue=*/false)); diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 57f6bac6bc84..3fbbe710b865 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -17,10 +17,10 @@ #include #include -#include "velox/core/Config.h" +#include "velox/common/base/Exceptions.h" -namespace facebook::velox { -class Config; +namespace facebook::velox::config { +class ConfigBase; } namespace facebook::velox::connector::hive { @@ -241,9 +241,9 @@ class HiveConfig { static constexpr const char* kCacheNoRetentionSession = "cache.no_retention"; InsertExistingPartitionsBehavior insertExistingPartitionsBehavior( - const Config* session) const; + const config::ConfigBase* session) const; - uint32_t maxPartitionsPerWriters(const Config* session) const; + uint32_t maxPartitionsPerWriters(const config::ConfigBase* session) const; bool immutablePartitions() const; @@ -285,13 +285,14 @@ class HiveConfig { std::optional gcsMaxRetryTime() const; - bool isOrcUseColumnNames(const Config* session) const; + bool isOrcUseColumnNames(const config::ConfigBase* session) const; - bool isFileColumnNamesReadAsLowerCase(const Config* session) const; + bool isFileColumnNamesReadAsLowerCase( + const config::ConfigBase* session) const; - bool isPartitionPathAsLowerCase(const Config* session) const; + bool isPartitionPathAsLowerCase(const config::ConfigBase* session) const; - bool ignoreMissingFiles(const Config* session) const; + bool ignoreMissingFiles(const config::ConfigBase* session) const; int64_t maxCoalescedBytes() const; @@ -307,25 +308,30 @@ class HiveConfig { uint64_t fileWriterFlushThresholdBytes() const; - uint64_t orcWriterMaxStripeSize(const Config* session) const; + uint64_t orcWriterMaxStripeSize(const config::ConfigBase* session) const; - uint64_t orcWriterMaxDictionaryMemory(const Config* session) const; + uint64_t orcWriterMaxDictionaryMemory( + const config::ConfigBase* session) const; - bool isOrcWriterIntegerDictionaryEncodingEnabled(const Config* session) const; + bool isOrcWriterIntegerDictionaryEncodingEnabled( + const config::ConfigBase* session) const; - bool isOrcWriterStringDictionaryEncodingEnabled(const Config* session) const; + bool isOrcWriterStringDictionaryEncodingEnabled( + const config::ConfigBase* session) const; - bool orcWriterLinearStripeSizeHeuristics(const Config* session) const; + bool orcWriterLinearStripeSizeHeuristics( + const config::ConfigBase* session) const; - uint64_t orcWriterMinCompressionSize(const Config* session) const; + uint64_t orcWriterMinCompressionSize(const config::ConfigBase* session) const; - std::optional orcWriterCompressionLevel(const Config* session) const; + std::optional orcWriterCompressionLevel( + const config::ConfigBase* session) const; std::string writeFileCreateConfig() const; - uint32_t sortWriterMaxOutputRows(const Config* session) const; + uint32_t sortWriterMaxOutputRows(const config::ConfigBase* session) const; - uint64_t sortWriterMaxOutputBytes(const Config* session) const; + uint64_t sortWriterMaxOutputBytes(const config::ConfigBase* session) const; uint64_t footerEstimatedSize() const; @@ -334,28 +340,28 @@ class HiveConfig { bool s3UseProxyFromEnv() const; // Returns the timestamp unit used when reading timestamps from files. - uint8_t readTimestampUnit(const Config* session) const; + uint8_t readTimestampUnit(const config::ConfigBase* session) const; /// Returns true to evict out a query scanned data out of in-memory cache /// right after the access, and also skip staging to the ssd cache. This helps /// to prevent the cache space pollution from the one-time table scan by large /// batch query when mixed running with interactive query which has high data /// locality. - bool cacheNoRetention(const Config* session) const; + bool cacheNoRetention(const config::ConfigBase* session) const; - HiveConfig(std::shared_ptr config) { + HiveConfig(std::shared_ptr config) { VELOX_CHECK_NOT_NULL( config, "Config is null for HiveConfig initialization"); config_ = std::move(config); // TODO: add sanity check } - const std::shared_ptr& config() const { + const std::shared_ptr& config() const { return config_; } private: - std::shared_ptr config_; + std::shared_ptr config_; }; } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 5617b8613d1e..4b1a5c608b28 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -45,7 +45,7 @@ namespace facebook::velox::connector::hive { HiveConnector::HiveConnector( const std::string& id, - std::shared_ptr config, + std::shared_ptr config, folly::Executor* executor) : Connector(id), hiveConfig_(std::make_shared(config)), diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index d3c59e36c2fc..1f3005a56490 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -31,10 +31,11 @@ class HiveConnector : public Connector { public: HiveConnector( const std::string& id, - std::shared_ptr config, + std::shared_ptr config, folly::Executor* executor); - const std::shared_ptr& connectorConfig() const override { + const std::shared_ptr& connectorConfig() + const override { return hiveConfig_->config(); } @@ -96,10 +97,21 @@ class HiveConnectorFactory : public ConnectorFactory { std::shared_ptr newConnector( const std::string& id, - std::shared_ptr config, + std::shared_ptr config, folly::Executor* executor = nullptr) override { return std::make_shared(id, config, executor); } + + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* executor = nullptr) override { + std::shared_ptr convertedConfig; + convertedConfig = config == nullptr + ? nullptr + : std::make_shared(config->valuesCopy()); + return newConnector(id, convertedConfig, executor); + } }; class HiveHadoop2ConnectorFactory : public HiveConnectorFactory { diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index bf01ac9e9149..bbb1adc24932 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -574,7 +574,7 @@ void configureRowReaderOptions( const RowTypePtr& rowType, const std::shared_ptr& hiveSplit, const std::shared_ptr& hiveConfig, - const Config* sessionProperties) { + const config::ConfigBase* sessionProperties) { auto skipRowsIt = tableParameters.find(dwio::common::TableParameter::kSkipHeaderLineCount); if (skipRowsIt != tableParameters.end()) { diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index fd39c9cd1810..2a4b36d6d01f 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -83,7 +83,7 @@ void configureRowReaderOptions( const RowTypePtr& rowType, const std::shared_ptr& hiveSplit, const std::shared_ptr& hiveConfig = nullptr, - const Config* sessionProperties = nullptr); + const config::ConfigBase* sessionProperties = nullptr); bool testFilters( const common::ScanSpec* scanSpec, diff --git a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp index 170ae81efa30..e0b2a6c31f85 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp @@ -290,7 +290,8 @@ void IcebergSplitReaderBenchmark::readSingleColumn( rowType); std::shared_ptr hiveConfig = - std::make_shared(std::make_shared()); + std::make_shared(std::make_shared( + std::unordered_map(), true)); const RowTypePtr readerOutputType; const std::shared_ptr ioStats = std::make_shared(); @@ -301,8 +302,9 @@ void IcebergSplitReaderBenchmark::readSingleColumn( std::shared_ptr opPool = root->addLeafChild("operator"); std::shared_ptr connectorPool = root->addAggregateChild(kHiveConnectorId, MemoryReclaimer::create()); - std::shared_ptr connectorSessionProperties_ = - std::make_shared(); + std::shared_ptr connectorSessionProperties_ = + std::make_shared( + std::unordered_map()); std::unique_ptr connectorQueryCtx_ = std::make_unique( diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp index c4d570ed818a..681d26b35e76 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp @@ -21,30 +21,30 @@ #include #include +#include "velox/common/config/Config.h" #include "velox/common/file/File.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" -#include "velox/core/Config.h" namespace facebook::velox::filesystems::abfs { using namespace Azure::Storage::Blobs; class AbfsConfig { public: - AbfsConfig(const Config* config) : config_(config) {} + AbfsConfig(const config::ConfigBase* config) : config_(config) {} std::string connectionString(const std::string& path) const { auto abfsAccount = AbfsAccount(path); auto key = abfsAccount.credKey(); VELOX_USER_CHECK( - config_->isValueExists(key), "Failed to find storage credentials"); + config_->valueExists(key), "Failed to find storage credentials"); - return abfsAccount.connectionString(config_->get(key).value()); + return abfsAccount.connectionString(config_->get(key).value()); } private: - const Config* config_; + const config::ConfigBase* config_; }; class AbfsReadFile::Impl { @@ -224,7 +224,7 @@ uint64_t AbfsReadFile::getNaturalReadSize() const { class AbfsFileSystem::Impl { public: - explicit Impl(const Config* config) : abfsConfig_(config) { + explicit Impl(const config::ConfigBase* config) : abfsConfig_(config) { LOG(INFO) << "Init Azure Blob file system"; } @@ -242,7 +242,8 @@ class AbfsFileSystem::Impl { std::shared_ptr ioExecutor_; }; -AbfsFileSystem::AbfsFileSystem(const std::shared_ptr& config) +AbfsFileSystem::AbfsFileSystem( + const std::shared_ptr& config) : FileSystem(config) { impl_ = std::make_shared(config.get()); } diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h index 4b8ec74d5954..319a85b7a382 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h @@ -34,7 +34,8 @@ namespace facebook::velox::filesystems::abfs { /// https://learn.microsoft.com/en-us/azure/databricks/storage/azure-storage. class AbfsFileSystem : public FileSystem { public: - explicit AbfsFileSystem(const std::shared_ptr& config); + explicit AbfsFileSystem( + const std::shared_ptr& config); std::string name() const override; diff --git a/velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.cpp index 5a566770f4c2..a8b0df52a6db 100644 --- a/velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.cpp @@ -15,9 +15,9 @@ */ #ifdef VELOX_ENABLE_ABFS +#include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h" // @manual -#include "velox/core/Config.h" #endif namespace facebook::velox::filesystems::abfs { @@ -26,7 +26,7 @@ namespace facebook::velox::filesystems::abfs { folly::once_flag abfsInitiationFlag; std::shared_ptr abfsFileSystemGenerator( - std::shared_ptr properties, + std::shared_ptr properties, std::string_view filePath) { static std::shared_ptr filesystem; folly::call_once(abfsInitiationFlag, [&properties]() { diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp index 1e9559096a4a..926f064ce28e 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp @@ -44,7 +44,7 @@ static const std::string fullFilePath = class AbfsFileSystemTest : public testing::Test { public: - static std::shared_ptr hiveConfig( + static std::shared_ptr hiveConfig( const std::unordered_map configOverride = {}) { std::unordered_map config({}); @@ -55,7 +55,7 @@ class AbfsFileSystemTest : public testing::Test { << std::endl; } - return std::make_shared(std::move(config)); + return std::make_shared(std::move(config)); } public: diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h b/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h index 4836183f3819..67a4f434cb4d 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h @@ -14,7 +14,6 @@ * limitations under the License. */ -#include "velox/core/Config.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include diff --git a/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp b/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp index 0c08c0272f9f..bd4ccfa907ba 100644 --- a/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp @@ -16,10 +16,10 @@ #include "velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h" #include "velox/common/base/Exceptions.h" +#include "velox/common/config/Config.h" #include "velox/common/file/File.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h" -#include "velox/core/Config.h" #include "velox/core/QueryConfig.h" #include @@ -258,9 +258,9 @@ auto constexpr kGCSInvalidPath = "File {} is not a valid gcs file"; class GCSFileSystem::Impl { public: - Impl(const Config* config) + Impl(const config::ConfigBase* config) : hiveConfig_(std::make_shared( - std::make_shared(config->values()))) {} + std::make_shared(config->rawConfigsCopy()))) {} ~Impl() = default; @@ -315,7 +315,7 @@ class GCSFileSystem::Impl { std::shared_ptr client_; }; -GCSFileSystem::GCSFileSystem(std::shared_ptr config) +GCSFileSystem::GCSFileSystem(std::shared_ptr config) : FileSystem(config) { impl_ = std::make_shared(config.get()); } diff --git a/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h b/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h index 5cd98ed0fd72..0d80cacd9df1 100644 --- a/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h +++ b/velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h @@ -26,7 +26,7 @@ namespace facebook::velox::filesystems { /// (register|generate)ReadFile and (register|generate)WriteFile functions. class GCSFileSystem : public FileSystem { public: - explicit GCSFileSystem(std::shared_ptr config); + explicit GCSFileSystem(std::shared_ptr config); /// Initialize the google::cloud::storage::Client from the input Config /// parameters. diff --git a/velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.cpp b/velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.cpp index 7be24fa1b523..3474c8d4dfb9 100644 --- a/velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.cpp @@ -15,9 +15,9 @@ */ #ifdef VELOX_ENABLE_GCS +#include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h" // @manual -#include "velox/core/Config.h" #endif namespace facebook::velox::filesystems { @@ -26,28 +26,30 @@ namespace facebook::velox::filesystems { folly::once_flag GCSInstantiationFlag; std::function(std::shared_ptr, std::string_view)> + FileSystem>(std::shared_ptr, std::string_view)> gcsFileSystemGenerator() { - static auto filesystemGenerator = [](std::shared_ptr properties, - std::string_view filePath) { - // Only one instance of GCSFileSystem is supported for now (follow S3 for - // now). - // TODO: Support multiple GCSFileSystem instances using a cache - // Initialize on first access and reuse after that. - static std::shared_ptr gcsfs; - folly::call_once(GCSInstantiationFlag, [&properties]() { - std::shared_ptr fs; - if (properties != nullptr) { - fs = std::make_shared(properties); - } else { - fs = std::make_shared( - std::make_shared()); - } - fs->initializeClient(); - gcsfs = fs; - }); - return gcsfs; - }; + static auto filesystemGenerator = + [](std::shared_ptr properties, + std::string_view filePath) { + // Only one instance of GCSFileSystem is supported for now (follow S3 + // for now). + // TODO: Support multiple GCSFileSystem instances using a cache + // Initialize on first access and reuse after that. + static std::shared_ptr gcsfs; + folly::call_once(GCSInstantiationFlag, [&properties]() { + std::shared_ptr fs; + if (properties != nullptr) { + fs = std::make_shared(properties); + } else { + fs = std::make_shared( + std::make_shared( + std::unordered_map())); + } + fs->initializeClient(); + gcsfs = fs; + }); + return gcsfs; + }; return filesystemGenerator; } #endif diff --git a/velox/connectors/hive/storage_adapters/gcs/examples/GCSFileSystemExample.cpp b/velox/connectors/hive/storage_adapters/gcs/examples/GCSFileSystemExample.cpp index 9288c1d94575..ee026a86e0db 100644 --- a/velox/connectors/hive/storage_adapters/gcs/examples/GCSFileSystemExample.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/examples/GCSFileSystemExample.cpp @@ -13,14 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "velox/common/config/Config.h" #include "velox/common/file/File.h" #include "velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h" -#include "velox/core/Config.h" #include - #include - #include DEFINE_string(gcs_path, "", "Path of GCS bucket"); @@ -37,7 +35,7 @@ auto newConfiguration() { if (!FLAGS_gcs_max_retry_time.empty()) { configOverride.emplace("hive.gcs.max-retry-time", FLAGS_gcs_max_retry_time); } - return std::make_shared(std::move(configOverride)); + return std::make_shared(std::move(configOverride)); } int main(int argc, char** argv) { diff --git a/velox/connectors/hive/storage_adapters/gcs/tests/GCSFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/gcs/tests/GCSFileSystemTest.cpp index 545af97ba4a2..5293901f61f9 100644 --- a/velox/connectors/hive/storage_adapters/gcs/tests/GCSFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/tests/GCSFileSystemTest.cpp @@ -16,10 +16,10 @@ #include "velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h" #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/config/Config.h" #include "velox/common/file/File.h" #include "velox/connectors/hive/FileHandle.h" #include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h" -#include "velox/core/Config.h" #include "velox/exec/tests/utils/TempFilePath.h" #include @@ -139,12 +139,13 @@ class GCSFileSystemTest : public testing::Test { << ">, status=" << object.status(); } - std::shared_ptr testGcsOptions() const { + std::shared_ptr testGcsOptions() const { std::unordered_map configOverride = {}; configOverride["hive.gcs.scheme"] = "http"; configOverride["hive.gcs.endpoint"] = "localhost:" + testbench_->port(); - return std::make_shared(std::move(configOverride)); + return std::make_shared( + std::move(configOverride)); } std::string preexistingBucketName() { @@ -345,8 +346,8 @@ TEST_F(GCSFileSystemTest, credentialsConfig) { })"""; configOverride["hive.gcs.scheme"] = "http"; configOverride["hive.gcs.endpoint"] = "localhost:" + testbench_->port(); - std::shared_ptr conf = - std::make_shared(std::move(configOverride)); + std::shared_ptr conf = + std::make_shared(std::move(configOverride)); filesystems::GCSFileSystem gcfs(conf); diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp index af18aa32e5e9..3d6636fbb878 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp @@ -16,9 +16,9 @@ #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" #include #include +#include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h" -#include "velox/core/Config.h" namespace facebook::velox::filesystems { std::string_view HdfsFileSystem::kScheme("hdfs://"); @@ -26,7 +26,9 @@ std::string_view HdfsFileSystem::kScheme("hdfs://"); class HdfsFileSystem::Impl { public: // Keep config here for possible use in the future. - explicit Impl(const Config* config, const HdfsServiceEndpoint& endpoint) { + explicit Impl( + const config::ConfigBase* config, + const HdfsServiceEndpoint& endpoint) { auto builder = hdfsNewBuilder(); hdfsBuilderSetNameNode(builder, endpoint.host.c_str()); hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data())); @@ -57,7 +59,7 @@ class HdfsFileSystem::Impl { }; HdfsFileSystem::HdfsFileSystem( - const std::shared_ptr& config, + const std::shared_ptr& config, const HdfsServiceEndpoint& endpoint) : FileSystem(config) { impl_ = std::make_shared(config.get(), endpoint); @@ -94,17 +96,17 @@ bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) { /// fixed one from configuration. HdfsServiceEndpoint HdfsFileSystem::getServiceEndpoint( const std::string_view filePath, - const Config* config) { + const config::ConfigBase* config) { auto endOfIdentityInfo = filePath.find('/', kScheme.size()); std::string hdfsIdentity{ filePath.data(), kScheme.size(), endOfIdentityInfo - kScheme.size()}; if (hdfsIdentity.empty()) { // Fall back to get a fixed endpoint from config. - auto hdfsHost = config->get("hive.hdfs.host"); + auto hdfsHost = config->get("hive.hdfs.host"); VELOX_CHECK( hdfsHost.hasValue(), "hdfsHost is empty, configuration missing for hdfs host"); - auto hdfsPort = config->get("hive.hdfs.port"); + auto hdfsPort = config->get("hive.hdfs.port"); VELOX_CHECK( hdfsPort.hasValue(), "hdfsPort is empty, configuration missing for hdfs port"); diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h index 295df6f8f0f6..c9a4e879bdfa 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h @@ -43,7 +43,7 @@ struct HdfsServiceEndpoint { class HdfsFileSystem : public FileSystem { public: explicit HdfsFileSystem( - const std::shared_ptr& config, + const std::shared_ptr& config, const HdfsServiceEndpoint& endpoint); std::string name() const override; @@ -88,7 +88,7 @@ class HdfsFileSystem : public FileSystem { /// will be used. static HdfsServiceEndpoint getServiceEndpoint( const std::string_view filePath, - const Config* config); + const config::ConfigBase* config); static std::string_view kScheme; diff --git a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp index d1db8bc864e2..bdff4a7a4fdc 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp @@ -17,9 +17,9 @@ #ifdef VELOX_ENABLE_HDFS3 #include "folly/concurrency/ConcurrentHashMap.h" +#include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/hdfs/HdfsUtil.h" // @manual -#include "velox/core/Config.h" #include "velox/dwio/common/FileSink.h" #endif @@ -29,9 +29,10 @@ namespace facebook::velox::filesystems { std::mutex mtx; std::function(std::shared_ptr, std::string_view)> + FileSystem>(std::shared_ptr, std::string_view)> hdfsFileSystemGenerator() { - static auto filesystemGenerator = [](std::shared_ptr properties, + static auto filesystemGenerator = [](std::shared_ptr + properties, std::string_view filePath) { static folly::ConcurrentHashMap> filesystems; diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp index 80bc0eaca37c..da65d8e03478 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp @@ -100,10 +100,11 @@ void readData(ReadFile* readFile) { } std::unique_ptr openFileForWrite(std::string_view path) { - auto memConfig = std::make_shared(configurationValues); + auto config = std::make_shared( + std::unordered_map(configurationValues)); std::string hdfsFilePath = "hdfs://" + localhost + ":" + hdfsPort + std::string(path); - auto hdfsFileSystem = filesystems::getFileSystem(hdfsFilePath, memConfig); + auto hdfsFileSystem = filesystems::getFileSystem(hdfsFilePath, config); return hdfsFileSystem->openFileForWrite(path); } @@ -156,12 +157,12 @@ void verifyFailures(hdfsFS hdfs) { HdfsFileSystemTest::miniCluster->stop(); checkReadErrorMessages(&readFile2, readFailErrorMessage, 1); try { - auto memConfig = - std::make_shared(configurationValues); + auto config = std::make_shared( + std::unordered_map(configurationValues)); filesystems::HdfsFileSystem hdfsFileSystem( - memConfig, + config, filesystems::HdfsFileSystem::getServiceEndpoint( - simpleDestinationPath, memConfig.get())); + simpleDestinationPath, config.get())); FAIL() << "expected VeloxException"; } catch (VeloxException const& error) { EXPECT_THAT(error.message(), testing::HasSubstr(builderErrorMessage)); @@ -178,18 +179,18 @@ TEST_F(HdfsFileSystemTest, read) { } TEST_F(HdfsFileSystemTest, viaFileSystem) { - auto memConfig = std::make_shared(configurationValues); - auto hdfsFileSystem = - filesystems::getFileSystem(fullDestinationPath, memConfig); + auto config = std::make_shared( + std::unordered_map(configurationValues)); + auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config); auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath); readData(readFile.get()); } TEST_F(HdfsFileSystemTest, initializeFsWithEndpointInfoInFilePath) { // Without host/port configured. - auto memConfig = std::make_shared(); - auto hdfsFileSystem = - filesystems::getFileSystem(fullDestinationPath, memConfig); + auto config = std::make_shared( + std::unordered_map()); + auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config); auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath); readData(readFile.get()); @@ -197,14 +198,15 @@ TEST_F(HdfsFileSystemTest, initializeFsWithEndpointInfoInFilePath) { const std::string wrongFullDestinationPath = "hdfs://not_exist_host:" + hdfsPort + destinationPath; VELOX_ASSERT_THROW( - filesystems::getFileSystem(wrongFullDestinationPath, memConfig), + filesystems::getFileSystem(wrongFullDestinationPath, config), "Unable to connect to HDFS"); } TEST_F(HdfsFileSystemTest, fallbackToUseConfig) { - auto memConfig = std::make_shared(configurationValues); + auto config = std::make_shared( + std::unordered_map(configurationValues)); auto hdfsFileSystem = - filesystems::getFileSystem(simpleDestinationPath, memConfig); + filesystems::getFileSystem(simpleDestinationPath, config); auto readFile = hdfsFileSystem->openFileForRead(simpleDestinationPath); readData(readFile.get()); } @@ -218,9 +220,9 @@ TEST_F(HdfsFileSystemTest, oneFsInstanceForOneEndpoint) { } TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) { - auto memConfig = std::make_shared(configurationValues); - auto hdfsFileSystem = - filesystems::getFileSystem(fullDestinationPath, memConfig); + auto config = std::make_shared( + std::unordered_map(configurationValues)); + auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config); VELOX_ASSERT_RUNTIME_THROW_CODE( hdfsFileSystem->openFileForRead( "hdfs://localhost:7777/path/that/does/not/exist"), @@ -232,12 +234,12 @@ TEST_F(HdfsFileSystemTest, missingHost) { try { std::unordered_map missingHostConfiguration( {{"hive.hdfs.port", hdfsPort}}); - auto memConfig = - std::make_shared(missingHostConfiguration); + auto config = std::make_shared( + std::move(missingHostConfiguration)); filesystems::HdfsFileSystem hdfsFileSystem( - memConfig, + config, filesystems::HdfsFileSystem::getServiceEndpoint( - simpleDestinationPath, memConfig.get())); + simpleDestinationPath, config.get())); FAIL() << "expected VeloxException"; } catch (VeloxException const& error) { EXPECT_THAT( @@ -251,12 +253,12 @@ TEST_F(HdfsFileSystemTest, missingPort) { try { std::unordered_map missingPortConfiguration( {{"hive.hdfs.host", localhost}}); - auto memConfig = - std::make_shared(missingPortConfiguration); + auto config = std::make_shared( + std::move(missingPortConfiguration)); filesystems::HdfsFileSystem hdfsFileSystem( - memConfig, + config, filesystems::HdfsFileSystem::getServiceEndpoint( - simpleDestinationPath, memConfig.get())); + simpleDestinationPath, config.get())); FAIL() << "expected VeloxException"; } catch (VeloxException const& error) { EXPECT_THAT( @@ -300,10 +302,10 @@ TEST_F(HdfsFileSystemTest, schemeMatching) { TEST_F(HdfsFileSystemTest, writeNotSupported) { try { - auto memConfig = - std::make_shared(configurationValues); + auto config = std::make_shared( + std::unordered_map(configurationValues)); auto hdfsFileSystem = - filesystems::getFileSystem(fullDestinationPath, memConfig); + filesystems::getFileSystem(fullDestinationPath, config); hdfsFileSystem->openFileForWrite("/path"); } catch (VeloxException const& error) { EXPECT_EQ(error.message(), "Write to HDFS is unsupported"); @@ -312,10 +314,10 @@ TEST_F(HdfsFileSystemTest, writeNotSupported) { TEST_F(HdfsFileSystemTest, removeNotSupported) { try { - auto memConfig = - std::make_shared(configurationValues); + auto config = std::make_shared( + std::unordered_map(configurationValues)); auto hdfsFileSystem = - filesystems::getFileSystem(fullDestinationPath, memConfig); + filesystems::getFileSystem(fullDestinationPath, config); hdfsFileSystem->remove("/path"); } catch (VeloxException const& error) { EXPECT_EQ(error.message(), "Does not support removing files from hdfs"); @@ -355,9 +357,9 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) { TEST_F(HdfsFileSystemTest, multipleThreadsWithFileSystem) { startThreads = false; - auto memConfig = std::make_shared(configurationValues); - auto hdfsFileSystem = - filesystems::getFileSystem(fullDestinationPath, memConfig); + auto config = std::make_shared( + std::unordered_map(configurationValues)); + auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config); std::vector threads; std::mt19937 generator(std::random_device{}()); diff --git a/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp index 885fa2439f19..6f746b1c1bc4 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp @@ -37,7 +37,7 @@ FileSystemMap& fileSystems() { return instances; } -std::string getS3Identity(const std::shared_ptr& config) { +std::string getS3Identity(const std::shared_ptr& config) { HiveConfig hiveConfig = HiveConfig(config); auto endpoint = hiveConfig.s3Endpoint(); if (!endpoint.empty()) { @@ -49,11 +49,13 @@ std::string getS3Identity(const std::shared_ptr& config) { } std::shared_ptr fileSystemGenerator( - std::shared_ptr properties, + std::shared_ptr properties, std::string_view /*filePath*/) { - std::shared_ptr config = std::make_shared(); + std::shared_ptr config = + std::make_shared( + std::unordered_map()); if (properties) { - *config = core::MemConfig(properties->values()); + config = std::make_shared(properties->rawConfigsCopy()); } const auto s3Identity = getS3Identity(config); diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index 168f2474842b..08a4261703f9 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -15,11 +15,11 @@ */ #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" +#include "velox/common/config/Config.h" #include "velox/common/file/File.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" -#include "velox/core/Config.h" #include "velox/core/QueryConfig.h" #include "velox/dwio/common/DataBuffer.h" @@ -438,7 +438,7 @@ struct AwsInstance { } // Returns true iff the instance was newly initialized with config. - bool initialize(const Config* config) { + bool initialize(const config::ConfigBase* config) { if (isFinalized_.load()) { VELOX_FAIL("Attempt to initialize S3 after it has been finalized."); } @@ -476,9 +476,9 @@ struct AwsInstance { } private: - void doInitialize(const Config* config) { + void doInitialize(const config::ConfigBase* config) { std::shared_ptr hiveConfig = std::make_shared( - std::make_shared(config->values())); + std::make_shared(config->rawConfigsCopy())); awsOptions_.loggingOptions.logLevel = inferS3LogLevel(hiveConfig->s3GetLogLevel()); // In some situations, curl triggers a SIGPIPE signal causing the entire @@ -503,7 +503,7 @@ AwsInstance* getAwsInstance() { return instance.get(); } -bool initializeS3(const Config* config) { +bool initializeS3(const config::ConfigBase* config) { return getAwsInstance()->initialize(config); } @@ -516,9 +516,9 @@ void finalizeS3() { class S3FileSystem::Impl { public: - Impl(const Config* config) { + Impl(const config::ConfigBase* config) { hiveConfig_ = std::make_shared( - std::make_shared(config->values())); + std::make_shared(config->rawConfigsCopy())); VELOX_CHECK(getAwsInstance()->isInitialized(), "S3 is not initialized"); Aws::Client::ClientConfiguration clientConfig; clientConfig.endpointOverride = hiveConfig_->s3Endpoint(); @@ -714,7 +714,7 @@ class S3FileSystem::Impl { std::shared_ptr client_; }; -S3FileSystem::S3FileSystem(std::shared_ptr config) +S3FileSystem::S3FileSystem(std::shared_ptr config) : FileSystem(config) { impl_ = std::make_shared(config.get()); } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index caa66583d215..088575760f99 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -22,7 +22,7 @@ namespace facebook::velox::filesystems { using namespace facebook::velox::connector::hive; -bool initializeS3(const Config* config); +bool initializeS3(const config::ConfigBase* config); void finalizeS3(); @@ -31,7 +31,7 @@ void finalizeS3(); /// type of file can be constructed based on a filename. class S3FileSystem : public FileSystem { public: - explicit S3FileSystem(std::shared_ptr config); + explicit S3FileSystem(std::shared_ptr config); std::string name() const override; diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/MinioServer.h b/velox/connectors/hive/storage_adapters/s3fs/tests/MinioServer.h index a625b9995ac6..591ed403f350 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/MinioServer.h +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/MinioServer.h @@ -16,7 +16,7 @@ #pragma once -#include "velox/core/Config.h" +#include "velox/common/config/Config.h" #include "velox/exec/tests/utils/PortUtil.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -54,7 +54,7 @@ class MinioServer { return tempPath_->getPath(); } - std::shared_ptr hiveConfig( + std::shared_ptr hiveConfig( const std::unordered_map configOverride = {}) const { std::unordered_map config({ @@ -70,7 +70,7 @@ class MinioServer { config[configName] = configValue; } - return std::make_shared(std::move(config)); + return std::make_shared(std::move(config)); } private: diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemFinalizeTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemFinalizeTest.cpp index 4a383d9b229a..1ee0387faa0e 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemFinalizeTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemFinalizeTest.cpp @@ -15,8 +15,8 @@ */ #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" -#include "velox/core/Config.h" #include "gtest/gtest.h" @@ -24,7 +24,8 @@ namespace facebook::velox { namespace { TEST(S3FileSystemFinalizeTest, finalize) { - auto s3Config = std::make_shared(); + auto s3Config = std::make_shared( + std::unordered_map()); ASSERT_TRUE(filesystems::initializeS3(s3Config.get())); ASSERT_FALSE(filesystems::initializeS3(s3Config.get())); { diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index 93de7e79e515..1b92147d5b88 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -62,11 +62,11 @@ TEST_F(S3FileSystemTest, writeAndRead) { TEST_F(S3FileSystemTest, invalidCredentialsConfig) { { - const std::unordered_map config( + std::unordered_map config( {{"hive.s3.use-instance-credentials", "true"}, {"hive.s3.iam-role", "dummy-iam-role"}}); auto hiveConfig = - std::make_shared(std::move(config)); + std::make_shared(std::move(config)); // Both instance credentials and iam-role cannot be specified VELOX_ASSERT_THROW( @@ -74,34 +74,34 @@ TEST_F(S3FileSystemTest, invalidCredentialsConfig) { "Invalid configuration: specify only one among 'access/secret keys', 'use instance credentials', 'IAM role'"); } { - const std::unordered_map config( + std::unordered_map config( {{"hive.s3.aws-secret-key", "dummy-key"}, {"hive.s3.aws-access-key", "dummy-key"}, {"hive.s3.iam-role", "dummy-iam-role"}}); auto hiveConfig = - std::make_shared(std::move(config)); + std::make_shared(std::move(config)); // Both access/secret keys and iam-role cannot be specified VELOX_ASSERT_THROW( filesystems::S3FileSystem(hiveConfig), "Invalid configuration: specify only one among 'access/secret keys', 'use instance credentials', 'IAM role'"); } { - const std::unordered_map config( + std::unordered_map config( {{"hive.s3.aws-secret-key", "dummy"}, {"hive.s3.aws-access-key", "dummy"}, {"hive.s3.use-instance-credentials", "true"}}); auto hiveConfig = - std::make_shared(std::move(config)); + std::make_shared(std::move(config)); // Both access/secret keys and instance credentials cannot be specified VELOX_ASSERT_THROW( filesystems::S3FileSystem(hiveConfig), "Invalid configuration: specify only one among 'access/secret keys', 'use instance credentials', 'IAM role'"); } { - const std::unordered_map config( + std::unordered_map config( {{"hive.s3.aws-secret-key", "dummy"}}); auto hiveConfig = - std::make_shared(std::move(config)); + std::make_shared(std::move(config)); // Both access key and secret key must be specified VELOX_ASSERT_THROW( filesystems::S3FileSystem(hiveConfig), @@ -167,7 +167,8 @@ TEST_F(S3FileSystemTest, noBackendServer) { TEST_F(S3FileSystemTest, logLevel) { std::unordered_map config; auto checkLogLevelName = [&config](std::string_view expected) { - auto s3Config = std::make_shared(config); + auto s3Config = + std::make_shared(std::move(config)); filesystems::S3FileSystem s3fs(s3Config); EXPECT_EQ(s3fs.getLogLevelName(), expected); }; diff --git a/velox/connectors/hive/tests/HiveConfigTest.cpp b/velox/connectors/hive/tests/HiveConfigTest.cpp index 9595401ed2d9..b4585ecd4443 100644 --- a/velox/connectors/hive/tests/HiveConfigTest.cpp +++ b/velox/connectors/hive/tests/HiveConfigTest.cpp @@ -16,15 +16,17 @@ #include "velox/connectors/hive/HiveConfig.h" #include "gtest/gtest.h" -#include "velox/core/Config.h" +#include "velox/common/config/Config.h" +using namespace facebook::velox; using namespace facebook::velox::connector::hive; -using namespace facebook::velox::core; using facebook::velox::connector::hive::HiveConfig; TEST(HiveConfigTest, defaultConfig) { - HiveConfig hiveConfig(std::make_shared()); - const auto emptySession = std::make_unique(); + HiveConfig hiveConfig(std::make_shared( + std::unordered_map())); + const auto emptySession = std::make_unique( + std::unordered_map()); ASSERT_EQ( hiveConfig.insertExistingPartitionsBehavior(emptySession.get()), facebook::velox::connector::hive::HiveConfig:: @@ -77,7 +79,7 @@ TEST(HiveConfigTest, defaultConfig) { } TEST(HiveConfigTest, overrideConfig) { - const std::unordered_map configFromFile = { + std::unordered_map configFromFile = { {HiveConfig::kInsertExistingPartitionsBehavior, "OVERWRITE"}, {HiveConfig::kMaxPartitionsPerWriters, "120"}, {HiveConfig::kImmutablePartitions, "true"}, @@ -109,8 +111,10 @@ TEST(HiveConfigTest, overrideConfig) { {HiveConfig::kOrcWriterMinCompressionSize, "512"}, {HiveConfig::kOrcWriterCompressionLevel, "1"}, {HiveConfig::kCacheNoRetention, "true"}}; - HiveConfig hiveConfig(std::make_shared(configFromFile)); - auto emptySession = std::make_unique(); + HiveConfig hiveConfig( + std::make_shared(std::move(configFromFile))); + auto emptySession = std::make_shared( + std::unordered_map()); ASSERT_EQ( hiveConfig.insertExistingPartitionsBehavior(emptySession.get()), facebook::velox::connector::hive::HiveConfig:: @@ -161,8 +165,9 @@ TEST(HiveConfigTest, overrideConfig) { } TEST(HiveConfigTest, overrideSession) { - HiveConfig hiveConfig(std::make_shared()); - const std::unordered_map sessionOverride = { + HiveConfig hiveConfig(std::make_shared( + std::unordered_map())); + std::unordered_map sessionOverride = { {HiveConfig::kInsertExistingPartitionsBehaviorSession, "OVERWRITE"}, {HiveConfig::kOrcUseColumnNamesSession, "true"}, {HiveConfig::kFileColumnNamesReadAsLowerCaseSession, "true"}, @@ -178,7 +183,8 @@ TEST(HiveConfigTest, overrideSession) { {HiveConfig::kOrcWriterCompressionLevelSession, "1"}, {HiveConfig::kOrcWriterLinearStripeSizeHeuristicsSession, "false"}, {HiveConfig::kCacheNoRetentionSession, "true"}}; - const auto session = std::make_unique(sessionOverride); + const auto session = + std::make_unique(std::move(sessionOverride)); ASSERT_EQ( hiveConfig.insertExistingPartitionsBehavior(session.get()), facebook::velox::connector::hive::HiveConfig:: diff --git a/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp b/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp index c452c9e0d499..1ca8ae88be78 100644 --- a/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp @@ -19,7 +19,6 @@ #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/TableHandle.h" -#include "velox/core/Config.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PrefixSortUtils.h" @@ -42,7 +41,7 @@ class HiveConnectorUtilTest : public exec::test::HiveConnectorTestBase { }; TEST_F(HiveConnectorUtilTest, configureReaderOptions) { - core::MemConfig sessionProperties; + config::ConfigBase sessionProperties({}); auto connectorQueryCtx = std::make_unique( pool_.get(), pool_.get(), @@ -57,7 +56,8 @@ TEST_F(HiveConnectorUtilTest, configureReaderOptions) { 0, ""); auto hiveConfig = - std::make_shared(std::make_shared()); + std::make_shared(std::make_shared( + std::unordered_map())); const std::unordered_map> partitionKeys; const std::unordered_map customSplitInfo; @@ -188,7 +188,7 @@ TEST_F(HiveConnectorUtilTest, configureReaderOptions) { customHiveConfigProps[hive::HiveConfig::kFilePreloadThreshold] = "9999"; customHiveConfigProps[hive::HiveConfig::kPrefetchRowGroups] = "10"; hiveConfig = std::make_shared( - std::make_shared(customHiveConfigProps)); + std::make_shared(std::move(customHiveConfigProps))); performConfigure(); EXPECT_EQ(readerOptions.loadQuantum(), hiveConfig->loadQuantum()); EXPECT_EQ(readerOptions.maxCoalesceBytes(), hiveConfig->maxCoalescedBytes()); diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index f8f74461bef6..c8d959b2f5c9 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -22,7 +22,6 @@ #include "velox/common/base/Fs.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/testutil/TestValue.h" -#include "velox/core/Config.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -204,11 +203,13 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { std::shared_ptr opPool_; std::shared_ptr connectorPool_; RowTypePtr rowType_; - std::shared_ptr connectorSessionProperties_ = - std::make_shared(); + std::shared_ptr connectorSessionProperties_ = + std::make_shared( + std::unordered_map()); std::unique_ptr connectorQueryCtx_; std::shared_ptr connectorConfig_ = - std::make_shared(std::make_shared()); + std::make_shared(std::make_shared( + std::unordered_map())); std::unique_ptr spillExecutor_; }; @@ -820,7 +821,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { connectorConfig.emplace("hive.orc.writer.dictionary-max-memory", "1GB"); connectorConfig_ = std::make_shared( - std::make_shared(std::move(connectorConfig))); + std::make_shared(std::move(connectorConfig))); const auto outputDirectory = TempDirectoryPath::create(); std::shared_ptr bucketProperty; std::vector partitionBy; diff --git a/velox/connectors/tests/ConnectorTest.cpp b/velox/connectors/tests/ConnectorTest.cpp index 9ae293ecc066..171a16a6f4fc 100644 --- a/velox/connectors/tests/ConnectorTest.cpp +++ b/velox/connectors/tests/ConnectorTest.cpp @@ -16,6 +16,7 @@ #include "velox/connectors/Connector.h" #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/config/Config.h" #include @@ -61,6 +62,13 @@ class TestConnectorFactory : public connector::ConnectorFactory { folly::Executor* /*executor*/ = nullptr) override { return std::make_shared(id); } + + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr /*config*/, + folly::Executor* /*executor*/ = nullptr) override { + return std::make_shared(id); + } }; } // namespace @@ -75,7 +83,10 @@ TEST_F(ConnectorTest, getAllConnectors) { for (int32_t i = 0; i < numConnectors; i++) { registerConnector( getConnectorFactory(TestConnectorFactory::kConnectorFactoryName) - ->newConnector(fmt::format("connector-{}", i), {})); + ->newConnector( + fmt::format("connector-{}", i), + std::make_shared( + std::unordered_map()))); } const auto& connectors = getAllConnectors(); EXPECT_EQ(connectors.size(), numConnectors); diff --git a/velox/connectors/tpch/TpchConnector.h b/velox/connectors/tpch/TpchConnector.h index 341b20254b85..4b54e0696728 100644 --- a/velox/connectors/tpch/TpchConnector.h +++ b/velox/connectors/tpch/TpchConnector.h @@ -15,8 +15,10 @@ */ #pragma once +#include "velox/common/config/Config.h" #include "velox/connectors/Connector.h" #include "velox/connectors/tpch/TpchConnectorSplit.h" +#include "velox/core/Config.h" #include "velox/tpch/gen/TpchGen.h" namespace facebook::velox::connector::tpch { @@ -130,7 +132,7 @@ class TpchConnector final : public Connector { public: TpchConnector( const std::string& id, - std::shared_ptr config, + std::shared_ptr config, folly::Executor* /*executor*/) : Connector(id) {} @@ -169,10 +171,21 @@ class TpchConnectorFactory : public ConnectorFactory { std::shared_ptr newConnector( const std::string& id, - std::shared_ptr config, + std::shared_ptr config, folly::Executor* executor = nullptr) override { return std::make_shared(id, config, executor); } + + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* executor = nullptr) override { + std::shared_ptr convertedConfig; + convertedConfig = config == nullptr + ? nullptr + : std::make_shared(config->valuesCopy()); + return newConnector(id, convertedConfig, executor); + } }; } // namespace facebook::velox::connector::tpch diff --git a/velox/connectors/tpch/tests/SpeedTest.cpp b/velox/connectors/tpch/tests/SpeedTest.cpp index fbf0337769c0..8bb713c9c69b 100644 --- a/velox/connectors/tpch/tests/SpeedTest.cpp +++ b/velox/connectors/tpch/tests/SpeedTest.cpp @@ -60,7 +60,9 @@ class TpchSpeedTest { connector::getConnectorFactory( connector::tpch::TpchConnectorFactory::kTpchConnectorName) ->newConnector( - kTpchConnectorId_, std::make_shared()); + kTpchConnectorId_, + std::make_shared( + std::unordered_map())); connector::registerConnector(tpchConnector); } diff --git a/velox/connectors/tpch/tests/TpchConnectorTest.cpp b/velox/connectors/tpch/tests/TpchConnectorTest.cpp index 74459930497c..04f4199d348a 100644 --- a/velox/connectors/tpch/tests/TpchConnectorTest.cpp +++ b/velox/connectors/tpch/tests/TpchConnectorTest.cpp @@ -40,7 +40,9 @@ class TpchConnectorTest : public exec::test::OperatorTestBase { connector::getConnectorFactory( connector::tpch::TpchConnectorFactory::kTpchConnectorName) ->newConnector( - kTpchConnectorId, std::make_shared()); + kTpchConnectorId, + std::make_shared( + std::unordered_map())); connector::registerConnector(tpchConnector); } diff --git a/velox/core/CMakeLists.txt b/velox/core/CMakeLists.txt index f2a272937773..3b3a4f3f7990 100644 --- a/velox/core/CMakeLists.txt +++ b/velox/core/CMakeLists.txt @@ -35,6 +35,7 @@ velox_link_libraries( PUBLIC velox_arrow_bridge velox_caching velox_config + velox_common_config velox_connector velox_exception velox_expression_functions diff --git a/velox/core/QueryConfig.cpp b/velox/core/QueryConfig.cpp index de251815c4c3..cc9b57b23d6a 100644 --- a/velox/core/QueryConfig.cpp +++ b/velox/core/QueryConfig.cpp @@ -16,103 +16,41 @@ #include +#include "velox/common/config/Config.h" #include "velox/core/QueryConfig.h" +#include "velox/type/tz/TimeZoneMap.h" namespace facebook::velox::core { -double toBytesPerCapacityUnit(CapacityUnit unit) { - switch (unit) { - case CapacityUnit::BYTE: - return 1; - case CapacityUnit::KILOBYTE: - return exp2(10); - case CapacityUnit::MEGABYTE: - return exp2(20); - case CapacityUnit::GIGABYTE: - return exp2(30); - case CapacityUnit::TERABYTE: - return exp2(40); - case CapacityUnit::PETABYTE: - return exp2(50); - default: - VELOX_USER_FAIL("Invalid capacity unit '{}'", (int)unit); - } -} - -CapacityUnit valueOfCapacityUnit(const std::string& unitStr) { - if (unitStr == "B") { - return CapacityUnit::BYTE; - } - // To be backward compatible this is lowercase. - if (unitStr == "kB") { - return CapacityUnit::KILOBYTE; - } - if (unitStr == "MB") { - return CapacityUnit::MEGABYTE; - } - if (unitStr == "GB") { - return CapacityUnit::GIGABYTE; - } - if (unitStr == "TB") { - return CapacityUnit::TERABYTE; - } - if (unitStr == "PB") { - return CapacityUnit::PETABYTE; - } - VELOX_USER_FAIL("Invalid capacity unit '{}'", unitStr); +QueryConfig::QueryConfig( + const std::unordered_map& values) + : config_{std::make_unique( + std::unordered_map(values))} { + validateConfig(); } -// Convert capacity string with unit to the capacity number in the specified -// units -uint64_t toCapacity(const std::string& from, CapacityUnit to) { - static const RE2 kPattern(R"(^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$)"); - double value; - std::string unit; - if (!RE2::FullMatch(from, kPattern, &value, &unit)) { - VELOX_USER_FAIL("Invalid capacity string '{}'", from); - } - - return value * - (toBytesPerCapacityUnit(valueOfCapacityUnit(unit)) / - toBytesPerCapacityUnit(to)); +QueryConfig::QueryConfig(std::unordered_map&& values) + : config_{std::make_unique(std::move(values))} { + validateConfig(); } -std::chrono::duration toDuration(const std::string& str) { - static const RE2 kPattern(R"(^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*)"); - - double value; - std::string unit; - if (!RE2::FullMatch(str, kPattern, &value, &unit)) { - VELOX_USER_FAIL("Invalid duration {}", str); - } - if (unit == "ns") { - return std::chrono::duration(value); - } else if (unit == "us") { - return std::chrono::duration(value); - } else if (unit == "ms") { - return std::chrono::duration(value); - } else if (unit == "s") { - return std::chrono::duration(value); - } else if (unit == "m") { - return std::chrono::duration>(value); - } else if (unit == "h") { - return std::chrono::duration>(value); - } else if (unit == "d") { - return std::chrono::duration>(value); +void QueryConfig::validateConfig() { + // Validate if timezone name can be recognized. + if (config_->valueExists(QueryConfig::kSessionTimezone)) { + VELOX_USER_CHECK( + tz::getTimeZoneID( + config_->get(QueryConfig::kSessionTimezone).value(), + false) != -1, + fmt::format( + "session '{}' set with invalid value '{}'", + QueryConfig::kSessionTimezone, + config_->get(QueryConfig::kSessionTimezone).value())); } - VELOX_USER_FAIL("Invalid duration {}", str); } -QueryConfig::QueryConfig( - const std::unordered_map& values) - : config_{std::make_unique(values)} {} - -QueryConfig::QueryConfig(std::unordered_map&& values) - : config_{std::make_unique(std::move(values))} {} - void QueryConfig::testingOverrideConfigUnsafe( std::unordered_map&& values) { - config_ = std::make_unique(std::move(values)); + config_ = std::make_unique(std::move(values)); } } // namespace facebook::velox::core diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 451f67c8a53a..8c1bf28d6026 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -19,24 +19,6 @@ #include "velox/core/Config.h" namespace facebook::velox::core { -enum class CapacityUnit { - BYTE, - KILOBYTE, - MEGABYTE, - GIGABYTE, - TERABYTE, - PETABYTE -}; - -double toBytesPerCapacityUnit(CapacityUnit unit); - -CapacityUnit valueOfCapacityUnit(const std::string& unitStr); - -/// Convert capacity string with unit to the capacity number in the specified -/// units -uint64_t toCapacity(const std::string& from, CapacityUnit to); - -std::chrono::duration toDuration(const std::string& str); /// A simple wrapper around velox::Config. Defines constants for query /// config properties and accessor methods. @@ -745,6 +727,8 @@ class QueryConfig { std::unordered_map&& values); private: - std::unique_ptr config_; + void validateConfig(); + + std::unique_ptr config_; }; } // namespace facebook::velox::core diff --git a/velox/core/QueryCtx.cpp b/velox/core/QueryCtx.cpp index 6e93db442e45..af0085cc7470 100644 --- a/velox/core/QueryCtx.cpp +++ b/velox/core/QueryCtx.cpp @@ -16,13 +16,17 @@ #include "velox/core/QueryCtx.h" #include "velox/common/base/SpillConfig.h" +#include "velox/common/config/Config.h" +#include "velox/core/Config.h" namespace facebook::velox::core { -/*static*/ std::shared_ptr QueryCtx::create( +// static +std::shared_ptr QueryCtx::create( folly::Executor* executor, QueryConfig&& queryConfig, - std::unordered_map> connectorConfigs, + std::unordered_map> + connectorConfigs, cache::AsyncDataCache* cache, std::shared_ptr pool, folly::Executor* spillExecutor, @@ -39,10 +43,38 @@ namespace facebook::velox::core { return queryCtx; } +// static +std::shared_ptr QueryCtx::create( + folly::Executor* executor, + QueryConfig&& queryConfig, + std::unordered_map> connectorConfigs, + cache::AsyncDataCache* cache, + std::shared_ptr pool, + folly::Executor* spillExecutor, + const std::string& queryId) { + std::unordered_map> + convertedConfigs; + for (const auto& pair : connectorConfigs) { + convertedConfigs.insert( + {pair.first, + std::make_shared(pair.second->valuesCopy())}); + } + std::shared_ptr queryCtx(new QueryCtx( + executor, + std::move(queryConfig), + std::move(convertedConfigs), + cache, + std::move(pool), + spillExecutor, + queryId)); + queryCtx->maybeSetReclaimer(); + return queryCtx; +} + QueryCtx::QueryCtx( folly::Executor* executor, QueryConfig&& queryConfig, - std::unordered_map> + std::unordered_map> connectorSessionProperties, cache::AsyncDataCache* cache, std::shared_ptr pool, diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index 69a34de6434f..39f12385f830 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -24,6 +24,10 @@ #include "velox/vector/DecodedVector.h" #include "velox/vector/VectorPool.h" +namespace facebook::velox { +class Config; +}; + namespace facebook::velox::core { class QueryCtx : public std::enable_shared_from_this { @@ -42,13 +46,24 @@ class QueryCtx : public std::enable_shared_from_this { static std::shared_ptr create( folly::Executor* executor = nullptr, QueryConfig&& queryConfig = QueryConfig{{}}, - std::unordered_map> + std::unordered_map> connectorConfigs = {}, cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(), std::shared_ptr pool = nullptr, folly::Executor* spillExecutor = nullptr, const std::string& queryId = ""); + // TODO(jtan6): [Config Refactor] Remove this old API after refactoring is + // done. + static std::shared_ptr create( + folly::Executor* executor, + QueryConfig&& queryConfig, + std::unordered_map> connectorConfigs, + cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(), + std::shared_ptr pool = nullptr, + folly::Executor* spillExecutor = nullptr, + const std::string& queryId = ""); + static std::string generatePoolName(const std::string& queryId); memory::MemoryPool* pool() const { @@ -72,7 +87,8 @@ class QueryCtx : public std::enable_shared_from_this { return queryConfig_; } - Config* connectorSessionProperties(const std::string& connectorId) const { + config::ConfigBase* connectorSessionProperties( + const std::string& connectorId) const { auto it = connectorSessionProperties_.find(connectorId); if (it == connectorSessionProperties_.end()) { return getEmptyConfig(); @@ -93,7 +109,7 @@ class QueryCtx : public std::enable_shared_from_this { const std::string& connectorId, std::unordered_map&& configOverrides) { connectorSessionProperties_[connectorId] = - std::make_shared(std::move(configOverrides)); + std::make_shared(std::move(configOverrides)); } folly::Executor* spillExecutor() const { @@ -135,7 +151,7 @@ class QueryCtx : public std::enable_shared_from_this { QueryCtx( folly::Executor* executor = nullptr, QueryConfig&& queryConfig = QueryConfig{{}}, - std::unordered_map> + std::unordered_map> connectorConfigs = {}, cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(), std::shared_ptr pool = nullptr, @@ -174,9 +190,10 @@ class QueryCtx : public std::enable_shared_from_this { memory::MemoryPool* const pool_; }; - static Config* getEmptyConfig() { - static const std::unique_ptr kEmptyConfig = - std::make_unique(); + static config::ConfigBase* getEmptyConfig() { + static const std::unique_ptr kEmptyConfig = + std::make_unique( + std::unordered_map()); return kEmptyConfig.get(); } @@ -201,7 +218,7 @@ class QueryCtx : public std::enable_shared_from_this { folly::Executor* const spillExecutor_{nullptr}; cache::AsyncDataCache* const cache_; - std::unordered_map> + std::unordered_map> connectorSessionProperties_; std::shared_ptr pool_; QueryConfig queryConfig_; diff --git a/velox/core/tests/QueryConfigTest.cpp b/velox/core/tests/QueryConfigTest.cpp index 476f38f5eced..81a199685bcc 100644 --- a/velox/core/tests/QueryConfigTest.cpp +++ b/velox/core/tests/QueryConfigTest.cpp @@ -47,53 +47,10 @@ TEST_F(QueryConfigTest, setConfig) { TEST_F(QueryConfigTest, invalidConfig) { std::unordered_map configData( - {{QueryConfig::kSessionTimezone, "Invalid"}}); + {{QueryConfig::kSessionTimezone, "invalid"}}); VELOX_ASSERT_USER_THROW( QueryCtx::create(nullptr, QueryConfig{std::move(configData)}), - "Unknown time zone: 'Invalid'"); - - auto queryCtx = QueryCtx::create(nullptr); - VELOX_ASSERT_USER_THROW( - queryCtx->testingOverrideConfigUnsafe({ - {core::QueryConfig::kSessionTimezone, ""}, - }), - "Unknown time zone: ''"); -} - -TEST_F(QueryConfigTest, memConfig) { - const std::string tz = "UTC"; - const std::unordered_map configData( - {{QueryConfig::kSessionTimezone, tz}}); - - { - MemConfig cfg{configData}; - MemConfig cfg2{}; - auto configDataCopy = configData; - ASSERT_EQ( - tz, - cfg.Config::get(QueryConfig::kSessionTimezone).value()); - ASSERT_FALSE(cfg.Config::get("missing-entry").has_value()); - ASSERT_EQ(configData, cfg.values()); - ASSERT_EQ(configData, cfg.valuesCopy()); - } - - { - MemConfigMutable cfg{configData}; - MemConfigMutable cfg2{}; - auto configDataCopy = configData; - MemConfigMutable cfg3{std::move(configDataCopy)}; - ASSERT_EQ( - tz, - cfg.Config::get(QueryConfig::kSessionTimezone).value()); - ASSERT_FALSE(cfg.Config::get("missing-entry").has_value()); - const std::string tz2 = "PST"; - ASSERT_NO_THROW(cfg.setValue(QueryConfig::kSessionTimezone, tz2)); - ASSERT_EQ( - tz2, - cfg.Config::get(QueryConfig::kSessionTimezone).value()); - ASSERT_THROW(cfg.values(), VeloxException); - ASSERT_EQ(configData, cfg3.valuesCopy()); - } + "session 'session_timezone' set with invalid value 'invalid'"); } TEST_F(QueryConfigTest, taskWriterCountConfig) { @@ -187,63 +144,4 @@ TEST_F(QueryConfigTest, enableExpressionEvaluationCacheConfig) { testConfig(false); } -TEST_F(QueryConfigTest, capacityConversion) { - folly::Random::DefaultGenerator rng; - rng.seed(1); - - std::unordered_map unitStrLookup{ - {CapacityUnit::BYTE, "B"}, - {CapacityUnit::KILOBYTE, "kB"}, - {CapacityUnit::MEGABYTE, "MB"}, - {CapacityUnit::GIGABYTE, "GB"}, - {CapacityUnit::TERABYTE, "TB"}, - {CapacityUnit::PETABYTE, "PB"}}; - - std::vector> units{ - {CapacityUnit::BYTE, 1}, - {CapacityUnit::KILOBYTE, 1024}, - {CapacityUnit::MEGABYTE, 1024 * 1024}, - {CapacityUnit::GIGABYTE, 1024 * 1024 * 1024}, - {CapacityUnit::TERABYTE, 1024ll * 1024 * 1024 * 1024}, - {CapacityUnit::PETABYTE, 1024ll * 1024 * 1024 * 1024 * 1024}}; - for (int32_t i = 0; i < units.size(); i++) { - for (int32_t j = 0; j < units.size(); j++) { - // We use this diffRatio to prevent float conversion overflow when - // converting from one unit to another. - uint64_t diffRatio = i < j ? units[j].second / units[i].second - : units[i].second / units[j].second; - uint64_t randNumber = folly::Random::rand64(rng); - uint64_t testNumber = i > j ? randNumber / diffRatio : randNumber; - ASSERT_EQ( - toCapacity( - std::string( - std::to_string(testNumber) + unitStrLookup[units[i].first]), - units[j].first), - (uint64_t)(testNumber * (units[i].second / units[j].second))); - } - } -} - -TEST_F(QueryConfigTest, durationConversion) { - folly::Random::DefaultGenerator rng; - rng.seed(1); - - std::vector> units{ - {"ns", 1}, - {"us", 1000}, - {"ms", 1000 * 1000}, - {"s", 1000ll * 1000 * 1000}, - {"m", 1000ll * 1000 * 1000 * 60}, - {"h", 1000ll * 1000 * 1000 * 60 * 60}, - {"d", 1000ll * 1000 * 1000 * 60 * 60 * 24}}; - for (uint32_t i = 0; i < units.size(); i++) { - auto testNumber = folly::Random::rand32(rng) % 10000; - auto duration = - toDuration(std::string(std::to_string(testNumber) + units[i].first)); - ASSERT_EQ( - testNumber * units[i].second, - std::chrono::duration_cast(duration).count()); - } -} - } // namespace facebook::velox::core::test diff --git a/velox/dwio/common/CMakeLists.txt b/velox/dwio/common/CMakeLists.txt index e1e21eaa2172..55fadf6c2f21 100644 --- a/velox/dwio/common/CMakeLists.txt +++ b/velox/dwio/common/CMakeLists.txt @@ -71,7 +71,7 @@ velox_link_libraries( velox_caching velox_common_io velox_common_compression - velox_config + velox_common_config velox_dwio_common_encryption velox_dwio_common_exception velox_exception diff --git a/velox/dwio/common/FileSink.h b/velox/dwio/common/FileSink.h index 802d58af7e67..fd9fec9fa860 100644 --- a/velox/dwio/common/FileSink.h +++ b/velox/dwio/common/FileSink.h @@ -18,16 +18,13 @@ #include +#include "velox/common/config/Config.h" #include "velox/common/file/File.h" #include "velox/common/io/IoStatistics.h" #include "velox/dwio/common/Closeable.h" #include "velox/dwio/common/DataBuffer.h" #include "velox/dwio/common/MetricsLog.h" -namespace facebook::velox { -class Config; -} - namespace facebook::velox::dwio::common { using namespace facebook::velox::io; @@ -40,7 +37,8 @@ class FileSink : public Closeable { bool bufferWrite{true}; /// Connector properties are required to create a FileSink on FileSystems /// such as S3. - const std::shared_ptr& connectorProperties{nullptr}; + const std::shared_ptr& connectorProperties{ + nullptr}; /// Config used to create sink files. This config is provided to underlying /// file system and the config is free form. The form should be defined by /// the underlying file system. @@ -111,7 +109,7 @@ class FileSink : public Closeable { const std::function&)>& callback); const std::string name_; - const std::shared_ptr connectorProperties_; + const std::shared_ptr connectorProperties_; memory::MemoryPool* const pool_; const MetricsLogPtr metricLogger_; IoStatistics* const stats_; diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 11de58bb724b..a3aaa8ff3b28 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -23,9 +23,9 @@ #include "velox/common/base/RandomUtil.h" #include "velox/common/base/SpillConfig.h" #include "velox/common/compression/Compression.h" +#include "velox/common/config/Config.h" #include "velox/common/io/Options.h" #include "velox/common/memory/Memory.h" -#include "velox/core/Config.h" #include "velox/dwio/common/ColumnSelector.h" #include "velox/dwio/common/ErrorTolerance.h" #include "velox/dwio/common/FlatMapHelper.h" @@ -620,8 +620,8 @@ struct WriterOptions { // WriterOption implementations should provide this function to specify how to // process format-specific session and connector configs. - virtual void processSessionConfigs(const Config&) {} - virtual void processHiveConnectorConfigs(const Config&) {} + virtual void processSessionConfigs(const config::ConfigBase&) {} + virtual void processHiveConnectorConfigs(const config::ConfigBase&) {} virtual ~WriterOptions() = default; }; diff --git a/velox/dwio/parquet/tests/ParquetTpchTest.cpp b/velox/dwio/parquet/tests/ParquetTpchTest.cpp index 186f4a4ff3bf..cac577ecd654 100644 --- a/velox/dwio/parquet/tests/ParquetTpchTest.cpp +++ b/velox/dwio/parquet/tests/ParquetTpchTest.cpp @@ -57,14 +57,18 @@ class ParquetTpchTest : public testing::Test { connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( - kHiveConnectorId, std::make_shared()); + kHiveConnectorId, + std::make_shared( + std::unordered_map())); connector::registerConnector(hiveConnector); auto tpchConnector = connector::getConnectorFactory( connector::tpch::TpchConnectorFactory::kTpchConnectorName) ->newConnector( - kTpchConnectorId, std::make_shared()); + kTpchConnectorId, + std::make_shared( + std::unordered_map())); connector::registerConnector(tpchConnector); saveTpchTablesAsParquet(); diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index f9239a8249d6..4261ee702249 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -49,7 +49,9 @@ class ParquetTableScanTest : public HiveConnectorTestBase { connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( - kHiveConnectorId, std::make_shared()); + kHiveConnectorId, + std::make_shared( + std::unordered_map())); connector::registerConnector(hiveConnector); } diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp index 96193b757d7b..87a7c6f0421d 100644 --- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp +++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp @@ -46,7 +46,9 @@ class ParquetWriterTest : public ParquetTestBase { connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( - kHiveConnectorId, std::make_shared()); + kHiveConnectorId, + std::make_shared( + std::unordered_map())); connector::registerConnector(hiveConnector); parquet::registerParquetWriterFactory(); } diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index baffa40cec9c..0a92aa916e02 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -18,6 +18,7 @@ #include #include #include +#include "velox/common/config/Config.h" #include "velox/common/testutil/TestValue.h" #include "velox/core/QueryConfig.h" #include "velox/dwio/parquet/writer/arrow/Properties.h" @@ -410,7 +411,7 @@ void Writer::setMemoryReclaimers() { namespace { std::optional getTimestampUnit( - const Config& config, + const config::ConfigBase& config, const char* configKey) { if (const auto unit = config.get(configKey)) { VELOX_CHECK( @@ -424,7 +425,7 @@ std::optional getTimestampUnit( } std::optional getTimestampTimeZone( - const Config& config, + const config::ConfigBase& config, const char* configKey) { if (const auto timezone = config.get(configKey)) { return timezone.value(); @@ -434,7 +435,7 @@ std::optional getTimestampTimeZone( } // namespace -void WriterOptions::processSessionConfigs(const Config& config) { +void WriterOptions::processSessionConfigs(const config::ConfigBase& config) { if (!parquetWriteTimestampUnit) { parquetWriteTimestampUnit = getTimestampUnit(config, kParquetSessionWriteTimestampUnit); @@ -446,7 +447,8 @@ void WriterOptions::processSessionConfigs(const Config& config) { } } -void WriterOptions::processHiveConnectorConfigs(const Config& config) { +void WriterOptions::processHiveConnectorConfigs( + const config::ConfigBase& config) { if (!parquetWriteTimestampUnit) { parquetWriteTimestampUnit = getTimestampUnit(config, kParquetHiveConnectorWriteTimestampUnit); diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 261fd9016f74..e3da877be4f5 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -17,7 +17,7 @@ #pragma once #include "velox/common/compression/Compression.h" -#include "velox/core/Config.h" +#include "velox/common/config/Config.h" #include "velox/dwio/common/DataBuffer.h" #include "velox/dwio/common/FileSink.h" #include "velox/dwio/common/FlushPolicy.h" @@ -123,8 +123,8 @@ struct WriterOptions : public dwio::common::WriterOptions { "hive.parquet.writer.timestamp-unit"; // Process hive connector and session configs. - void processSessionConfigs(const Config& config) override; - void processHiveConnectorConfigs(const Config& config) override; + void processSessionConfigs(const config::ConfigBase& config) override; + void processHiveConnectorConfigs(const config::ConfigBase& config) override; }; // Writes Velox vectors into a DataSink using Arrow Parquet writer. diff --git a/velox/examples/ScanAndSort.cpp b/velox/examples/ScanAndSort.cpp index 6625e7a4d7c0..4f86083d1cf3 100644 --- a/velox/examples/ScanAndSort.cpp +++ b/velox/examples/ScanAndSort.cpp @@ -89,7 +89,10 @@ int main(int argc, char** argv) { auto hiveConnector = connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) - ->newConnector(kHiveConnectorId, std::make_shared()); + ->newConnector( + kHiveConnectorId, + std::make_shared( + std::unordered_map())); connector::registerConnector(hiveConnector); // To be able to read local files, we need to register the local file diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h index f8ab7f827e1b..53a29d96f254 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.h +++ b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -78,7 +78,9 @@ class AggregationFuzzerBase { connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( - kHiveConnectorId, std::make_shared()); + kHiveConnectorId, + std::make_shared( + std::unordered_map())); connector::registerConnector(hiveConnector); seed(initialSeed); diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index cb98ce6adf70..50261592c363 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -309,13 +309,14 @@ JoinFuzzer::JoinFuzzer( filesystems::registerLocalFileSystem(); // Make sure not to run out of open file descriptors. - const std::unordered_map hiveConfig = { + std::unordered_map hiveConfig = { {connector::hive::HiveConfig::kNumCacheFileHandles, "1000"}}; auto hiveConnector = connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( - kHiveConnectorId, std::make_shared(hiveConfig)); + kHiveConnectorId, + std::make_shared(std::move(hiveConfig))); connector::registerConnector(hiveConnector); seed(initialSeed); diff --git a/velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp b/velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp index f5794182f9ca..14e1e2c99bf8 100644 --- a/velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp +++ b/velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp @@ -220,13 +220,14 @@ class MemoryArbitrationFuzzer { MemoryArbitrationFuzzer::MemoryArbitrationFuzzer(size_t initialSeed) : vectorFuzzer_{getFuzzerOptions(), pool_.get()} { // Make sure not to run out of open file descriptors. - const std::unordered_map hiveConfig = { + std::unordered_map hiveConfig = { {connector::hive::HiveConfig::kNumCacheFileHandles, "1000"}}; const auto hiveConnector = connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( - kHiveConnectorId, std::make_shared(hiveConfig)); + kHiveConnectorId, + std::make_shared(std::move(hiveConfig))); connector::registerConnector(hiveConnector); seed(initialSeed); } diff --git a/velox/exec/fuzzer/RowNumberFuzzer.cpp b/velox/exec/fuzzer/RowNumberFuzzer.cpp index c117b2dcde4b..002e1e7da481 100644 --- a/velox/exec/fuzzer/RowNumberFuzzer.cpp +++ b/velox/exec/fuzzer/RowNumberFuzzer.cpp @@ -173,13 +173,14 @@ RowNumberFuzzer::RowNumberFuzzer( filesystems::registerLocalFileSystem(); // Make sure not to run out of open file descriptors. - const std::unordered_map hiveConfig = { + std::unordered_map hiveConfig = { {connector::hive::HiveConfig::kNumCacheFileHandles, "1000"}}; auto hiveConnector = connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( - kHiveConnectorId, std::make_shared(hiveConfig)); + kHiveConnectorId, + std::make_shared(std::move(hiveConfig))); connector::registerConnector(hiveConnector); seed(initialSeed); diff --git a/velox/exec/fuzzer/WriterFuzzerRunner.h b/velox/exec/fuzzer/WriterFuzzerRunner.h index ca6fbbfa7019..a5f45a32fc61 100644 --- a/velox/exec/fuzzer/WriterFuzzerRunner.h +++ b/velox/exec/fuzzer/WriterFuzzerRunner.h @@ -75,7 +75,9 @@ class WriterFuzzerRunner { connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( - kHiveConnectorId, std::make_shared()); + kHiveConnectorId, + std::make_shared( + std::unordered_map())); connector::registerConnector(hiveConnector); facebook::velox::exec::test::writerFuzzer( seed, std::move(referenceQueryRunner)); diff --git a/velox/exec/tests/AsyncConnectorTest.cpp b/velox/exec/tests/AsyncConnectorTest.cpp index 3d4844126b0a..e509559678b8 100644 --- a/velox/exec/tests/AsyncConnectorTest.cpp +++ b/velox/exec/tests/AsyncConnectorTest.cpp @@ -163,10 +163,21 @@ class TestConnectorFactory : public connector::ConnectorFactory { std::shared_ptr newConnector( const std::string& id, - std::shared_ptr config, + std::shared_ptr config, folly::Executor* /* executor */) override { return std::make_shared(id); } + + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* executor = nullptr) override { + std::shared_ptr convertedConfig; + convertedConfig = config == nullptr + ? nullptr + : std::make_shared(config->valuesCopy()); + return newConnector(id, convertedConfig, executor); + } }; } // namespace @@ -179,7 +190,10 @@ class AsyncConnectorTest : public OperatorTestBase { auto testConnector = connector::getConnectorFactory(TestConnectorFactory::kTestConnectorName) ->newConnector( - kTestConnectorId, std::make_shared(), nullptr); + kTestConnectorId, + std::make_shared( + std::unordered_map()), + nullptr); connector::registerConnector(testConnector); } diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 27612fc1ca2b..f249fc612c27 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -2387,8 +2387,8 @@ TEST_P(UnpartitionedTableWriterTest, immutableSettings) { std::unordered_map propFromFile{ {"hive.immutable-partitions", testData.immutablePartitionsEnabled ? "true" : "false"}}; - std::shared_ptr config{ - std::make_shared(propFromFile)}; + std::shared_ptr config{ + std::make_shared(std::move(propFromFile))}; resetHiveConnector(config); auto input = makeVectors(10, 10); diff --git a/velox/exec/tests/VeloxIn10MinDemo.cpp b/velox/exec/tests/VeloxIn10MinDemo.cpp index 790a11313bb6..f7ad1fd41abb 100644 --- a/velox/exec/tests/VeloxIn10MinDemo.cpp +++ b/velox/exec/tests/VeloxIn10MinDemo.cpp @@ -52,7 +52,9 @@ class VeloxIn10MinDemo : public VectorTestBase { connector::getConnectorFactory( connector::tpch::TpchConnectorFactory::kTpchConnectorName) ->newConnector( - kTpchConnectorId, std::make_shared()); + kTpchConnectorId, + std::make_shared( + std::unordered_map())); connector::registerConnector(tpchConnector); } diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index d83770714c4c..6e4f5e56db97 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -36,7 +36,8 @@ void HiveConnectorTestBase::SetUp() { connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( kHiveConnectorId, - std::make_shared(), + std::make_shared( + std::unordered_map()), ioExecutor_.get()); connector::registerConnector(hiveConnector); } @@ -50,7 +51,7 @@ void HiveConnectorTestBase::TearDown() { } void HiveConnectorTestBase::resetHiveConnector( - const std::shared_ptr& config) { + const std::shared_ptr& config) { connector::unregisterConnector(kHiveConnectorId); auto hiveConnector = connector::getConnectorFactory( diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 83666d17a748..973c7b4eac0b 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -40,7 +40,8 @@ class HiveConnectorTestBase : public OperatorTestBase { void SetUp() override; void TearDown() override; - void resetHiveConnector(const std::shared_ptr& config); + void resetHiveConnector( + const std::shared_ptr& config); void writeToFile(const std::string& filePath, RowVectorPtr vector); diff --git a/velox/experimental/wave/exec/WaveHiveDataSource.cpp b/velox/experimental/wave/exec/WaveHiveDataSource.cpp index 4a5d866b90d2..016aa5c011ce 100644 --- a/velox/experimental/wave/exec/WaveHiveDataSource.cpp +++ b/velox/experimental/wave/exec/WaveHiveDataSource.cpp @@ -160,7 +160,8 @@ void WaveHiveDataSource::registerConnector() { return; } registered = true; - auto config = std::make_shared(); + auto config = std::make_shared( + std::unordered_map()); // Create hive connector with config... auto hiveConnector = diff --git a/velox/expression/tests/CastExprTest.cpp b/velox/expression/tests/CastExprTest.cpp index a86bc1f515a9..efa036d32455 100644 --- a/velox/expression/tests/CastExprTest.cpp +++ b/velox/expression/tests/CastExprTest.cpp @@ -844,10 +844,6 @@ TEST_F(CastExprTest, timestampAdjustToTimezone) { }); } -TEST_F(CastExprTest, timestampAdjustToTimezoneInvalid) { - VELOX_ASSERT_USER_THROW(setTimezone("bla"), "Unknown time zone: 'bla'"); -} - TEST_F(CastExprTest, date) { testCast( "date", diff --git a/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp b/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp index 4fbe18ce1120..dd24e110cc67 100644 --- a/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp +++ b/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp @@ -70,7 +70,10 @@ void AggregationTestBase::SetUp() { auto hiveConnector = connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) - ->newConnector(kHiveConnectorId, std::make_shared()); + ->newConnector( + kHiveConnectorId, + std::make_shared( + std::unordered_map())); connector::registerConnector(hiveConnector); }