diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp
index d89216248747..18eb9377bbff 100644
--- a/velox/connectors/hive/HiveConfig.cpp
+++ b/velox/connectors/hive/HiveConfig.cpp
@@ -296,17 +296,6 @@ uint8_t HiveConfig::readTimestampUnit(const Config* session) const {
   return unit;
 }
 
-uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const {
-  const auto unit = session->get<uint8_t>(
-      kParquetWriteTimestampUnitSession,
-      config_->get<uint8_t>(kParquetWriteTimestampUnit, 9 /*nano*/));
-  VELOX_CHECK(
-      unit == 0 /*second*/ || unit == 3 /*milli*/ || unit == 6 /*micro*/ ||
-          unit == 9,
-      "Invalid timestamp unit.");
-  return unit;
-}
-
 bool HiveConfig::cacheNoRetention(const Config* session) const {
   return session->get<bool>(
       kCacheNoRetentionSession,
diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h
index 0486991cf264..57f6bac6bc84 100644
--- a/velox/connectors/hive/HiveConfig.h
+++ b/velox/connectors/hive/HiveConfig.h
@@ -237,12 +237,6 @@ class HiveConfig {
   static constexpr const char* kReadTimestampUnitSession =
       "hive.reader.timestamp_unit";
 
-  /// Timestamp unit for Parquet write through Arrow bridge.
-  static constexpr const char* kParquetWriteTimestampUnit =
-      "hive.parquet.writer.timestamp-unit";
-  static constexpr const char* kParquetWriteTimestampUnitSession =
-      "hive.parquet.writer.timestamp_unit";
-
   static constexpr const char* kCacheNoRetention = "cache.no_retention";
   static constexpr const char* kCacheNoRetentionSession = "cache.no_retention";
 
@@ -342,10 +336,6 @@ class HiveConfig {
   // Returns the timestamp unit used when reading timestamps from files.
   uint8_t readTimestampUnit(const Config* session) const;
 
-  /// Returns the timestamp unit used when writing timestamps into Parquet
-  /// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano.
-  uint8_t parquetWriteTimestampUnit(const Config* session) const;
-
   /// Returns true to evict out a query scanned data out of in-memory cache
   /// right after the access, and also skip staging to the ssd cache. This helps
   /// to prevent the cache space pollution from the one-time table scan by large
diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp
index 645833e5fb49..455b1346a5c5 100644
--- a/velox/connectors/hive/HiveDataSink.cpp
+++ b/velox/connectors/hive/HiveDataSink.cpp
@@ -677,16 +677,25 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
   ioStats_.emplace_back(std::make_shared<io::IoStatistics>());
   setMemoryReclaimers(writerInfo_.back().get(), ioStats_.back().get());
 
-  // Take the one provided by the user as a starting point, or allocate a new
-  // one.
-  const auto& writerOptions = insertTableHandle_->writerOptions();
-  std::shared_ptr<dwio::common::WriterOptions> options = writerOptions
-      ? writerOptions
-      : std::make_shared<dwio::common::WriterOptions>();
+  // Take the writer options provided by the user as a starting point, or
+  // allocate a new one.
+  auto options = insertTableHandle_->writerOptions();
+  if (!options) {
+    options = writerFactory_->createWriterOptions();
+  }
 
   const auto* connectorSessionProperties =
       connectorQueryCtx_->sessionProperties();
 
+  // Acquire file format specific configs. The precedence order is:
+  //
+  // 1. First, respect any options specified as part of the query plan
+  //    (accessed through insertTableHandle).
+  // 2. Otherwise, acquire user-defined session properties.
+  // 3. Lastly, acquire general hive connector configs.
+  options->processSessionConfigs(*connectorSessionProperties);
+  options->processHiveConnectorConfigs(*hiveConfig_->config());
+
   // Only overwrite options in case they were not already provided.
   if (options->schema == nullptr) {
     options->schema = getNonPartitionTypes(dataChannels_, inputType_);
@@ -738,11 +747,6 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
         connectorSessionProperties);
   }
 
-  if (!options->parquetWriteTimestampUnit) {
-    options->parquetWriteTimestampUnit =
-        hiveConfig_->parquetWriteTimestampUnit(connectorSessionProperties);
-  }
-
   if (!options->orcMinCompressionSize) {
     options->orcMinCompressionSize = std::optional<uint64_t>(
         hiveConfig_->orcWriterMinCompressionSize(connectorSessionProperties));
diff --git a/velox/dwio/common/CMakeLists.txt b/velox/dwio/common/CMakeLists.txt
index 1fc725a124e6..06a171b72684 100644
--- a/velox/dwio/common/CMakeLists.txt
+++ b/velox/dwio/common/CMakeLists.txt
@@ -69,6 +69,7 @@ target_link_libraries(
   velox_caching
   velox_common_io
   velox_common_compression
+  velox_config
   velox_dwio_common_encryption
   velox_dwio_common_exception
   velox_exception
diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h
index 1f8c0daee430..f1e7808f2ea4 100644
--- a/velox/dwio/common/Options.h
+++ b/velox/dwio/common/Options.h
@@ -25,6 +25,7 @@
 #include "velox/common/compression/Compression.h"
 #include "velox/common/io/Options.h"
 #include "velox/common/memory/Memory.h"
+#include "velox/core/Config.h"
 #include "velox/dwio/common/ColumnSelector.h"
 #include "velox/dwio/common/ErrorTolerance.h"
 #include "velox/dwio/common/FlatMapHelper.h"
@@ -597,6 +598,7 @@ struct WriterOptions {
   velox::memory::MemoryPool* memoryPool{nullptr};
   const velox::common::SpillConfig* spillConfig{nullptr};
   tsan_atomic<bool>* nonReclaimableSection{nullptr};
+
   /// A ready-to-use default memory reclaimer factory. It shall be provided by
   /// the system that creates writers to ensure a smooth memory system
   /// integration (e.g. graceful suspension upon arbitration request). Writer
@@ -604,6 +606,7 @@
   /// this default one.
   std::function<std::unique_ptr<velox::memory::MemoryReclaimer>()>
       defaultMemoryReclaimerFactory{[]() { return nullptr; }};
+
   std::optional<common::CompressionKind> compressionKind;
   std::optional<uint64_t> orcMinCompressionSize{std::nullopt};
   std::optional<uint64_t> maxStripeSize{std::nullopt};
@@ -612,10 +615,14 @@
   std::optional<bool> orcWriterIntegerDictionaryEncodingEnabled{std::nullopt};
   std::optional<bool> orcWriterStringDictionaryEncodingEnabled{std::nullopt};
   std::map<std::string, std::string> serdeParameters;
-  std::optional<uint8_t> parquetWriteTimestampUnit;
   std::optional<uint8_t> zlibCompressionLevel;
   std::optional<uint8_t> zstdCompressionLevel;
 
+  // WriterOptions implementations should override these functions to specify
+  // how format-specific session and connector configs are processed.
+  virtual void processSessionConfigs(const Config&) {}
+  virtual void processHiveConnectorConfigs(const Config&) {}
+
+  virtual ~WriterOptions() = default;
 };
diff --git a/velox/dwio/common/WriterFactory.h b/velox/dwio/common/WriterFactory.h
index 125ef69cf2e0..1ab485bfde78 100644
--- a/velox/dwio/common/WriterFactory.h
+++ b/velox/dwio/common/WriterFactory.h
@@ -64,6 +64,10 @@ class WriterFactory {
   }
 #endif
 
+  /// Creates a polymorphic writer options object.
+  virtual std::unique_ptr<dwio::common::WriterOptions>
+  createWriterOptions() = 0;
+
  private:
   const FileFormat format_;
 };
diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp
index 2ce6cebe9d90..a4a490e23984 100644
--- a/velox/dwio/dwrf/writer/Writer.cpp
+++ b/velox/dwio/dwrf/writer/Writer.cpp
@@ -32,8 +32,8 @@ using facebook::velox::common::testutil::TestValue;
 
 namespace facebook::velox::dwrf {
-
 namespace {
+
 dwio::common::StripeProgress getStripeProgress(const WriterContext& context) {
   return dwio::common::StripeProgress{
       .stripeIndex = context.stripeIndex(),
@@ -860,6 +860,11 @@ std::unique_ptr<dwio::common::Writer> DwrfWriterFactory::createWriter(
   return std::make_unique<Writer>(std::move(sink), dwrfOptions);
 }
 
+std::unique_ptr<dwio::common::WriterOptions>
+DwrfWriterFactory::createWriterOptions() {
+  return std::make_unique<dwrf::WriterOptions>();
+}
+
 void registerDwrfWriterFactory() {
   dwio::common::registerWriterFactory(std::make_shared<DwrfWriterFactory>());
 }
diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h
index fdd4f9790579..7e64ff62bb12 100644
--- a/velox/dwio/dwrf/writer/Writer.h
+++ b/velox/dwio/dwrf/writer/Writer.h
@@ -30,7 +30,7 @@
 
 namespace facebook::velox::dwrf {
 
-struct WriterOptions {
+struct WriterOptions : public dwio::common::WriterOptions {
   std::shared_ptr<const Config> config = std::make_shared<Config>();
   std::shared_ptr<const Type> schema;
   velox::memory::MemoryPool* memoryPool;
@@ -220,6 +220,8 @@ class DwrfWriterFactory : public dwio::common::WriterFactory {
   std::unique_ptr<dwio::common::Writer> createWriter(
       std::unique_ptr<dwio::common::FileSink> sink,
       const std::shared_ptr<dwio::common::WriterOptions>& options) override;
+
+  std::unique_ptr<dwio::common::WriterOptions> createWriterOptions() override;
 };
 
 } // namespace facebook::velox::dwrf
diff --git a/velox/dwio/parquet/tests/ParquetTestBase.h b/velox/dwio/parquet/tests/ParquetTestBase.h
index fbf0e63cadb3..0195447dfbda 100644
--- a/velox/dwio/parquet/tests/ParquetTestBase.h
+++ b/velox/dwio/parquet/tests/ParquetTestBase.h
@@ -167,7 +167,7 @@ class ParquetTestBase : public testing::Test, public test::VectorTestBase {
     facebook::velox::parquet::WriterOptions options;
     options.memoryPool = rootPool_.get();
     options.flushPolicyFactory = flushPolicy;
-    options.compression = compressionKind;
+    options.compressionKind = compressionKind;
     return std::make_unique<facebook::velox::parquet::Writer>(
         std::move(sink), options, rowType);
   }
diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
index 52f53fa02e92..0b27395f9a00 100644
--- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
+++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
@@ -159,7 +159,7 @@ TEST_F(E2EFilterTest, compression) {
   }
 
   options_.dataPageSize = 4 * 1024;
-  options_.compression = compression;
+  options_.compressionKind = compression;
 
   testWithTypes(
       "tinyint_val:tinyint,"
diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
index f1caaecaf9bd..619ed3840593 100644
--- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
+++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
@@ -106,7 +106,7 @@ TEST_F(ParquetWriterTest, compression) {
   auto sinkPtr = sink.get();
   facebook::velox::parquet::WriterOptions writerOptions;
   writerOptions.memoryPool = leafPool_.get();
-  writerOptions.compression = CompressionKind::CompressionKind_SNAPPY;
+  writerOptions.compressionKind = CompressionKind::CompressionKind_SNAPPY;
 
   const auto& fieldNames = schema->names();
 
@@ -151,8 +151,7 @@ DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
       10'000, [](auto row) { return Timestamp(row, row); })});
   parquet::WriterOptions writerOptions;
   writerOptions.memoryPool = leafPool_.get();
-  writerOptions.parquetWriteTimestampUnit =
-      static_cast<uint8_t>(TimestampUnit::kMicro);
+  writerOptions.parquetWriteTimestampUnit = TimestampUnit::kMicro;
 
   // Create an in-memory writer.
   auto sink = std::make_unique<dwio::common::MemorySink>(
diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp
index ec36a3749a1e..d508b53356f8 100644
--- a/velox/dwio/parquet/writer/Writer.cpp
+++ b/velox/dwio/parquet/writer/Writer.cpp
@@ -131,8 +131,8 @@ std::shared_ptr<::parquet::WriterProperties> getArrowParquetWriterOptions(
   if (!options.enableDictionary) {
     properties = properties->disable_dictionary();
   }
-  properties =
-      properties->compression(getArrowParquetCompression(options.compression));
+  properties = properties->compression(getArrowParquetCompression(
+      options.compressionKind.value_or(common::CompressionKind_NONE)));
   for (const auto& columnCompressionValues : options.columnCompressionsMap) {
     properties->compression(
         columnCompressionValues.first,
@@ -236,7 +236,7 @@ Writer::Writer(
     flushPolicy_ = std::make_unique<DefaultFlushPolicy>();
   }
   options_.timestampUnit =
-      static_cast<TimestampUnit>(options.parquetWriteTimestampUnit);
+      options.parquetWriteTimestampUnit.value_or(TimestampUnit::kNano);
   arrowContext_->properties =
       getArrowParquetWriterOptions(options, flushPolicy_);
   setMemoryReclaimers();
@@ -388,20 +388,6 @@ void Writer::abort() {
   arrowContext_.reset();
 }
 
-parquet::WriterOptions getParquetOptions(
-    const dwio::common::WriterOptions& options) {
-  parquet::WriterOptions parquetOptions;
-  parquetOptions.memoryPool = options.memoryPool;
-  if (options.compressionKind.has_value()) {
-    parquetOptions.compression = options.compressionKind.value();
-  }
-  if (options.parquetWriteTimestampUnit.has_value()) {
-    parquetOptions.parquetWriteTimestampUnit =
-        options.parquetWriteTimestampUnit.value();
-  }
-  return parquetOptions;
-}
-
 void Writer::setMemoryReclaimers() {
   VELOX_CHECK(
       !pool_->isLeaf(),
@@ -419,12 +405,53 @@ void Writer::setMemoryReclaimers() {
   generalPool_->setReclaimer(exec::MemoryReclaimer::create());
 }
 
+namespace {
+
+std::optional<TimestampUnit> getTimestampUnit(
+    const Config& config,
+    const char* configKey) {
+  if (const auto unit = config.get<uint8_t>(configKey)) {
+    VELOX_CHECK(
+        unit == 0 /*second*/ || unit == 3 /*milli*/ || unit == 6 /*micro*/ ||
+            unit == 9 /*nano*/,
+        "Invalid timestamp unit: {}",
+        unit.value());
+    return std::optional(static_cast<TimestampUnit>(unit.value()));
+  }
+  return std::nullopt;
+}
+
+} // namespace
+
+void WriterOptions::processSessionConfigs(const Config& config) {
+  if (!parquetWriteTimestampUnit) {
+    parquetWriteTimestampUnit =
+        getTimestampUnit(config, kParquetSessionWriteTimestampUnit);
+  }
+}
+
+void WriterOptions::processHiveConnectorConfigs(const Config& config) {
+  if (!parquetWriteTimestampUnit) {
+    parquetWriteTimestampUnit =
+        getTimestampUnit(config, kParquetHiveConnectorWriteTimestampUnit);
+  }
+}
+
 std::unique_ptr<dwio::common::Writer> ParquetWriterFactory::createWriter(
     std::unique_ptr<dwio::common::FileSink> sink,
     const std::shared_ptr<dwio::common::WriterOptions>& options) {
-  auto parquetOptions = getParquetOptions(*options);
+  auto parquetOptions =
+      std::dynamic_pointer_cast<parquet::WriterOptions>(options);
+  VELOX_CHECK_NOT_NULL(
+      parquetOptions,
+      "Parquet writer factory expected a Parquet WriterOptions object.");
   return std::make_unique<Writer>(
-      std::move(sink), parquetOptions, asRowType(options->schema));
+      std::move(sink), *parquetOptions, asRowType(options->schema));
+}
+
+std::unique_ptr<dwio::common::WriterOptions>
+ParquetWriterFactory::createWriterOptions() {
+  return std::make_unique<parquet::WriterOptions>();
 }
 
 } // namespace facebook::velox::parquet
diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h
index 97b2d1d52f9f..6901c2dc3be7 100644
--- a/velox/dwio/parquet/writer/Writer.h
+++ b/velox/dwio/parquet/writer/Writer.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include "velox/common/compression/Compression.h"
+#include "velox/core/Config.h"
 #include "velox/dwio/common/DataBuffer.h"
 #include "velox/dwio/common/FileSink.h"
 #include "velox/dwio/common/FlushPolicy.h"
@@ -86,26 +87,42 @@ class LambdaFlushPolicy : public DefaultFlushPolicy {
   std::function<bool()> lambda_;
 };
 
-struct WriterOptions {
+struct WriterOptions : public dwio::common::WriterOptions {
   bool enableDictionary = true;
   int64_t dataPageSize = 1'024 * 1'024;
   int64_t dictionaryPageSizeLimit = 1'024 * 1'024;
+
   // Growth ratio passed to ArrowDataBufferSink. The default value is a
   // heuristic borrowed from
   // folly/FBVector(https://github.com/facebook/folly/blob/main/folly/docs/FBVector.md#memory-handling).
   double bufferGrowRatio = 1.5;
-  common::CompressionKind compression = common::CompressionKind_NONE;
+
   arrow::Encoding::type encoding = arrow::Encoding::PLAIN;
-  velox::memory::MemoryPool* memoryPool;
+
   // The default factory allows the writer to construct the default flush
   // policy with the configs in its ctor.
   std::function<std::unique_ptr<DefaultFlushPolicy>()> flushPolicyFactory;
   std::shared_ptr<CodecOptions> codecOptions;
   std::unordered_map<std::string, common::CompressionKind> columnCompressionsMap;
-  uint8_t parquetWriteTimestampUnit =
-      static_cast<uint8_t>(TimestampUnit::kNano);
+
+  /// Timestamp unit for Parquet writes through the Arrow bridge.
+  /// Default if not specified: TimestampUnit::kNano (9).
+  std::optional<TimestampUnit> parquetWriteTimestampUnit;
   bool writeInt96AsTimestamp = false;
+
+  // Parsing of session and hive connector configs.
+
+  // This isn't a typo; the session and hive connector config names differ
+  // ('_' vs '-').
+  static constexpr const char* kParquetSessionWriteTimestampUnit =
+      "hive.parquet.writer.timestamp_unit";
+  static constexpr const char* kParquetHiveConnectorWriteTimestampUnit =
+      "hive.parquet.writer.timestamp-unit";
+
+  // Process hive connector and session configs.
+  void processSessionConfigs(const Config& config) override;
+  void processHiveConnectorConfigs(const Config& config) override;
 };
 
 // Writes Velox vectors into a DataSink using Arrow Parquet writer.
@@ -176,6 +193,8 @@ class ParquetWriterFactory : public dwio::common::WriterFactory {
   std::unique_ptr<dwio::common::Writer> createWriter(
       std::unique_ptr<dwio::common::FileSink> sink,
       const std::shared_ptr<dwio::common::WriterOptions>& options) override;
+
+  std::unique_ptr<dwio::common::WriterOptions> createWriterOptions() override;
 };
 
 } // namespace facebook::velox::parquet
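
Note for reviewers: below is a minimal sketch of how another file format could plug into the extension points this diff introduces (createWriterOptions, processSessionConfigs, processHiveConnectorConfigs). The csv namespace, CsvWriterOptions, CsvWriterFactory, the nullString field, and the csv.writer.null_string / csv.writer.null-string keys are hypothetical and exist only for illustration; only the overridden virtuals come from this change, and the sketch assumes the base WriterFactory constructor taking a FileFormat is accessible.

// Illustrative sketch only; hypothetical "CSV" format, not part of this diff.
#include <memory>
#include <optional>
#include <string>

#include "velox/core/Config.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/WriterFactory.h"

namespace facebook::velox::csv { // hypothetical namespace

struct CsvWriterOptions : public dwio::common::WriterOptions {
  // Hypothetical format-specific knob.
  std::optional<std::string> nullString;

  // Hypothetical config keys, mirroring the session ('_') vs. connector ('-')
  // spelling split used by the Parquet timestamp-unit keys above.
  static constexpr const char* kNullStringSession = "csv.writer.null_string";
  static constexpr const char* kNullStringConnector = "csv.writer.null-string";

  // Called by HiveDataSink after plan-provided options are applied, so only
  // fill fields that are still unset (plan > session > connector).
  void processSessionConfigs(const Config& config) override {
    if (!nullString) {
      if (const auto value = config.get<std::string>(kNullStringSession)) {
        nullString = value.value();
      }
    }
  }

  void processHiveConnectorConfigs(const Config& config) override {
    if (!nullString) {
      if (const auto value = config.get<std::string>(kNullStringConnector)) {
        nullString = value.value();
      }
    }
  }
};

class CsvWriterFactory : public dwio::common::WriterFactory {
 public:
  // FileFormat::TEXT is used here only to satisfy the base constructor.
  CsvWriterFactory() : WriterFactory(dwio::common::FileFormat::TEXT) {}

  // HiveDataSink calls this when the insert table handle carries no options,
  // so the config hooks above always run on the format-specific subclass.
  std::unique_ptr<dwio::common::WriterOptions> createWriterOptions() override {
    return std::make_unique<CsvWriterOptions>();
  }

  // createWriter(...) is omitted; a real factory must override it as well,
  // so this sketch remains an abstract class.
};

} // namespace facebook::velox::csv

Because both hooks only fill fields that are still unset, options attached to the insert table handle win over session properties, which in turn win over connector configs, matching the precedence documented in HiveDataSink::appendWriter above.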