From b356a66c47e26afd846784f87a0953d61d6dc9c2 Mon Sep 17 00:00:00 2001 From: wypb Date: Fri, 30 Aug 2024 11:19:03 +0800 Subject: [PATCH] Add sessionTimezone and adjustTimestampToTimezone to DWRF reader and writer options --- velox/connectors/Connector.h | 10 ++++ velox/connectors/hive/HiveConnectorUtil.cpp | 2 + velox/connectors/hive/HiveDataSink.cpp | 7 +++ velox/connectors/hive/SplitReader.cpp | 9 ++-- .../hive/tests/HiveDataSinkTest.cpp | 1 + velox/dwio/common/Options.h | 15 +++++- .../common/tests/utils/E2EFilterTestBase.cpp | 5 ++ .../common/tests/utils/E2EFilterTestBase.h | 2 + velox/dwio/dwrf/reader/DwrfData.h | 8 +++ velox/dwio/dwrf/reader/DwrfReader.cpp | 19 ++----- velox/dwio/dwrf/reader/DwrfReader.h | 1 - velox/dwio/dwrf/reader/ReaderBase.cpp | 50 +++++------------ velox/dwio/dwrf/reader/ReaderBase.h | 53 +++++++------------ velox/dwio/dwrf/reader/StripeStream.h | 20 +++++++ .../dwio/dwrf/test/ColumnWriterStatsTests.cpp | 1 + velox/dwio/dwrf/test/ColumnWriterTest.cpp | 9 ++++ velox/dwio/dwrf/test/E2EFilterTest.cpp | 4 ++ velox/dwio/dwrf/test/E2EReaderTest.cpp | 1 + velox/dwio/dwrf/test/E2EWriterTest.cpp | 11 ++-- velox/dwio/dwrf/test/OrcTest.h | 8 +++ velox/dwio/dwrf/test/ReaderBaseTests.cpp | 8 ++- velox/dwio/dwrf/test/ReaderTest.cpp | 27 ++++++++-- .../dwio/dwrf/test/StripeReaderBaseTests.cpp | 4 +- velox/dwio/dwrf/test/TestStripeStream.cpp | 26 ++++++--- velox/dwio/dwrf/test/WriterTest.cpp | 4 +- .../dwrf/test/utils/E2EWriterTestUtil.cpp | 1 + velox/dwio/dwrf/writer/Writer.cpp | 10 +++- velox/dwio/dwrf/writer/Writer.h | 2 + velox/dwio/dwrf/writer/WriterBase.h | 9 +++- velox/dwio/dwrf/writer/WriterContext.cpp | 4 ++ velox/dwio/dwrf/writer/WriterContext.h | 12 +++++ velox/exec/Operator.cpp | 1 + 32 files changed, 231 insertions(+), 113 deletions(-) diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 1dc77823e4366..33130977a52b8 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -269,6 +269,7 @@ class ConnectorQueryCtx { const std::string& planNodeId, int driverId, const std::string& sessionTimezone, + bool adjustTimestampToTimezone = false, folly::CancellationToken cancellationToken = {}) : operatorPool_(operatorPool), connectorPool_(connectorPool), @@ -283,6 +284,7 @@ class ConnectorQueryCtx { driverId_(driverId), planNodeId_(planNodeId), sessionTimezone_(sessionTimezone), + adjustTimestampToTimezone_(adjustTimestampToTimezone), cancellationToken_(std::move(cancellationToken)) { VELOX_CHECK_NOT_NULL(sessionProperties); } @@ -351,6 +353,13 @@ class ConnectorQueryCtx { return sessionTimezone_; } + /// Whether to adjust Timestamp to the timeZone obtained through + /// sessionTimezone(). This is used to be compatible with the + /// old logic of Presto. + const bool adjustTimestampToTimezone() const { + return adjustTimestampToTimezone_; + } + /// Returns the cancellation token associated with this task. const folly::CancellationToken& cancellationToken() const { return cancellationToken_; @@ -370,6 +379,7 @@ class ConnectorQueryCtx { const int driverId_; const std::string planNodeId_; const std::string sessionTimezone_; + bool adjustTimestampToTimezone_; const folly::CancellationToken cancellationToken_; }; diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 236120e1763f2..25df7d3978a86 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -564,6 +564,8 @@ void configureReaderOptions( const auto timezone = tz::locateZone(sessionTzName); readerOptions.setSessionTimezone(timezone); } + readerOptions.setAdjustTimestampToTimezone( + connectorQueryCtx->adjustTimestampToTimezone()); if (readerOptions.fileFormat() != dwio::common::FileFormat::UNKNOWN) { VELOX_CHECK( diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index dfdccc75fc710..70fb75fa0cd0d 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -743,6 +743,13 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { connectorSessionProperties, options); + const auto& sessionTzName = connectorQueryCtx_->sessionTimezone(); + if (!sessionTzName.empty()) { + options->sessionTimezone = tz::locateZone(sessionTzName); + } + options->adjustTimestampToTimezone = + connectorQueryCtx_->adjustTimestampToTimezone(); + // Prevents the memory allocation during the writer creation. WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1); auto writer = writerFactory_->createWriter( diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index e9bf9387a0670..ca5e393926c05 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -34,7 +34,8 @@ VectorPtr newConstantFromString( const TypePtr& type, const std::optional& value, vector_size_t size, - velox::memory::MemoryPool* pool) { + velox::memory::MemoryPool* pool, + const std::string& sessionTimezone) { using T = typename TypeTraits::NativeType; if (!value.has_value()) { return std::make_shared>(pool, size, true, type, T()); @@ -362,7 +363,8 @@ std::vector SplitReader::adaptColumns( infoColumnType, iter->second, 1, - connectorQueryCtx_->memoryPool()); + connectorQueryCtx_->memoryPool(), + connectorQueryCtx_->sessionTimezone()); childSpec->setConstantValue(constant); } else { auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName); @@ -414,7 +416,8 @@ void SplitReader::setPartitionValue( type, value, 1, - connectorQueryCtx_->memoryPool()); + connectorQueryCtx_->memoryPool(), + connectorQueryCtx_->sessionTimezone()); spec->setConstantValue(constant); } diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index e464e14784ef9..e849d79d9aeb8 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -1067,6 +1067,7 @@ TEST_F(HiveDataSinkTest, flushPolicyWithDWRF) { dataSink->close(); dwio::common::ReaderOptions readerOpts{pool_.get()}; + readerOpts.setFileFormat(dwio::common::FileFormat::DWRF); const std::vector filePaths = listFiles(outputDirectory->getPath()); auto bufferedInput = std::make_unique( diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 90c4822302402..a67c4ad93aefc 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -502,6 +502,11 @@ class ReaderOptions : public io::ReaderOptions { return *this; } + ReaderOptions& setAdjustTimestampToTimezone(bool adjustTimestampToTimezone) { + adjustTimestampToTimezone_ = adjustTimestampToTimezone; + return *this; + } + /// Gets the desired tail location. uint64_t tailLocation() const { return tailLocation_; @@ -541,10 +546,14 @@ class ReaderOptions : public io::ReaderOptions { return ioExecutor_; } - const tz::TimeZone* getSessionTimezone() const { + const tz::TimeZone* sessionTimezone() const { return sessionTimezone_; } + bool adjustTimestampToTimezone() const { + return adjustTimestampToTimezone_; + } + bool fileColumnNamesReadAsLowerCase() const { return fileColumnNamesReadAsLowerCase_; } @@ -591,6 +600,7 @@ class ReaderOptions : public io::ReaderOptions { std::shared_ptr randomSkip_; std::shared_ptr scanSpec_; const tz::TimeZone* sessionTimezone_{nullptr}; + bool adjustTimestampToTimezone_{false}; }; struct WriterOptions { @@ -621,6 +631,9 @@ struct WriterOptions { std::function()> flushPolicyFactory; + const tz::TimeZone* sessionTimezone{nullptr}; + bool adjustTimestampToTimezone{false}; + virtual ~WriterOptions() = default; }; diff --git a/velox/dwio/common/tests/utils/E2EFilterTestBase.cpp b/velox/dwio/common/tests/utils/E2EFilterTestBase.cpp index 287d245c84085..50a8c14a8ec51 100644 --- a/velox/dwio/common/tests/utils/E2EFilterTestBase.cpp +++ b/velox/dwio/common/tests/utils/E2EFilterTestBase.cpp @@ -97,6 +97,7 @@ void E2EFilterTestBase::readWithoutFilter( dwio::common::RowReaderOptions rowReaderOpts; auto input = std::make_unique( std::make_shared(sinkData_), readerOpts.memoryPool()); + setUpReaderOptions(readerOpts); auto reader = makeReader(readerOpts, std::move(input)); // The spec must stay live over the lifetime of the reader. @@ -150,6 +151,7 @@ void E2EFilterTestBase::readWithFilter( dwio::common::RowReaderOptions rowReaderOpts; auto input = std::make_unique( std::make_shared(sinkData_), readerOpts.memoryPool()); + setUpReaderOptions(readerOpts); auto reader = makeReader(readerOpts, std::move(input)); // The spec must stay live over the lifetime of the reader. @@ -458,6 +460,7 @@ void E2EFilterTestBase::testMetadataFilterImpl( RowReaderOptions rowReaderOpts; auto input = std::make_unique( std::make_shared(sinkData_), readerOpts.memoryPool()); + setUpReaderOptions(readerOpts); auto reader = makeReader(readerOpts, std::move(input)); setUpRowReaderOptions(rowReaderOpts, spec); rowReaderOpts.setMetadataFilter(metadataFilter); @@ -654,6 +657,7 @@ void E2EFilterTestBase::testSubfieldsPruning() { RowReaderOptions rowReaderOpts; auto input = std::make_unique( std::make_shared(sinkData_), readerOpts.memoryPool()); + setUpReaderOptions(readerOpts); auto reader = makeReader(readerOpts, std::move(input)); setUpRowReaderOptions(rowReaderOpts, spec); auto rowReader = reader->createRowReader(rowReaderOpts); @@ -718,6 +722,7 @@ void E2EFilterTestBase::testMutationCornerCases() { ReaderOptions readerOpts{leafPool_.get()}; auto input = std::make_unique( std::make_shared(sinkData_), readerOpts.memoryPool()); + setUpReaderOptions(readerOpts); auto reader = makeReader(readerOpts, std::move(input)); // 1. Interleave batches with and without deletions. diff --git a/velox/dwio/common/tests/utils/E2EFilterTestBase.h b/velox/dwio/common/tests/utils/E2EFilterTestBase.h index 9aafbc4ac118e..a788cb068f7ad 100644 --- a/velox/dwio/common/tests/utils/E2EFilterTestBase.h +++ b/velox/dwio/common/tests/utils/E2EFilterTestBase.h @@ -218,6 +218,8 @@ class E2EFilterTestBase : public testing::Test { const std::vector& batches, bool forRowGroupSkip) = 0; + virtual void setUpReaderOptions(ReaderOptions&) {} + virtual std::unique_ptr makeReader( const dwio::common::ReaderOptions& opts, std::unique_ptr input) = 0; diff --git a/velox/dwio/dwrf/reader/DwrfData.h b/velox/dwio/dwrf/reader/DwrfData.h index 995cc80ac4100..e330edbf71184 100644 --- a/velox/dwio/dwrf/reader/DwrfData.h +++ b/velox/dwio/dwrf/reader/DwrfData.h @@ -149,6 +149,14 @@ class DwrfParams : public dwio::common::FormatParams { return streamLabels_; } + const tz::TimeZone* sessionTimezone() const { + return stripeStreams_.sessionTimezone(); + } + + bool adjustTimestampToTimezone() const { + return stripeStreams_.adjustTimestampToTimezone(); + } + private: const StreamLabels& streamLabels_; StripeStreams& stripeStreams_; diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index d20d67bbb0a94..185c7934f9058 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -783,26 +783,15 @@ std::optional DwrfRowReader::estimatedRowSize() const { DwrfReader::DwrfReader( const ReaderOptions& options, std::unique_ptr input) - : readerBase_(std::make_unique( - options.memoryPool(), - std::move(input), - options.decrypterFactory(), - options.footerEstimatedSize(), - options.filePreloadThreshold(), - options.fileFormat() == FileFormat::ORC ? FileFormat::ORC - : FileFormat::DWRF, - options.fileColumnNamesReadAsLowerCase(), - options.randomSkip(), - options.scanSpec())), - options_(options) { + : readerBase_(std::make_unique(options, std::move(input))) { // If we are not using column names to map table columns to file columns, // then we use indices. In that case we need to ensure the names completely // match, because we are still mapping columns by names further down the // code. So we rename column names in the file schema to match table schema. // We test the options to have 'fileSchema' (actually table schema) as most // of the unit tests fail to provide it. - if ((!options_.useColumnNamesForColumnMapping()) && - (options_.fileSchema() != nullptr)) { + if ((!readerBase_->readerOptions().useColumnNamesForColumnMapping()) && + (readerBase_->readerOptions().fileSchema() != nullptr)) { updateColumnNamesFromTableSchema(); } } @@ -899,7 +888,7 @@ TypePtr updateColumnNames( } // namespace void DwrfReader::updateColumnNamesFromTableSchema() { - const auto& tableSchema = options_.fileSchema(); + const auto& tableSchema = readerBase_->readerOptions().fileSchema(); const auto& fileSchema = readerBase_->schema(); readerBase_->setSchema(std::dynamic_pointer_cast( updateColumnNames(fileSchema, tableSchema, "", ""))); diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index 43a4dc5fae7b2..c1ea1182b0996 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -346,7 +346,6 @@ class DwrfReader : public dwio::common::Reader { private: std::shared_ptr readerBase_; - const dwio::common::ReaderOptions options_; }; class DwrfReaderFactory : public dwio::common::ReaderFactory { diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index ff2b13c9dd7c8..bc02de00d0adf 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -77,44 +77,21 @@ FooterStatisticsImpl::FooterStatisticsImpl( } ReaderBase::ReaderBase( - MemoryPool& pool, - std::unique_ptr input, - FileFormat fileFormat) - : ReaderBase( - pool, - std::move(input), - nullptr, - dwio::common::ReaderOptions::kDefaultFooterEstimatedSize, - dwio::common::ReaderOptions::kDefaultFilePreloadThreshold, - fileFormat) {} - -ReaderBase::ReaderBase( - MemoryPool& pool, - std::unique_ptr input, - std::shared_ptr decryptorFactory, - uint64_t footerEstimatedSize, - uint64_t filePreloadThreshold, - FileFormat fileFormat, - bool fileColumnNamesReadAsLowerCase, - std::shared_ptr randomSkip, - std::shared_ptr scanSpec) - : pool_{pool}, + const dwio::common::ReaderOptions& options, + std::unique_ptr input) + : options_{options}, arena_(std::make_unique()), - decryptorFactory_(decryptorFactory), - footerEstimatedSize_(footerEstimatedSize), - filePreloadThreshold_(filePreloadThreshold), input_(std::move(input)), - randomSkip_(std::move(randomSkip)), - scanSpec_(std::move(scanSpec)), fileLength_(input_->getReadFile()->size()) { process::TraceContext trace("ReaderBase::ReaderBase"); // TODO: make a config DWIO_ENSURE(fileLength_ > 0, "ORC file is empty"); VELOX_CHECK_GE(fileLength_, 4, "File size too small"); - const auto preloadFile = fileLength_ <= filePreloadThreshold_; - const uint64_t readSize = - preloadFile ? fileLength_ : std::min(fileLength_, footerEstimatedSize_); + const auto preloadFile = fileLength_ <= options_.filePreloadThreshold(); + const uint64_t readSize = preloadFile + ? fileLength_ + : std::min(fileLength_, options_.footerEstimatedSize()); if (input_->supportSyncLoad()) { input_->enqueue({fileLength_ - readSize, readSize, "footer"}); input_->load(preloadFile ? LogType::FILE : LogType::FOOTER); @@ -135,7 +112,7 @@ ReaderBase::ReaderBase( fileLength_, "Corrupted file, Post script size is invalid"); - if (fileFormat == FileFormat::DWRF) { + if (options.fileFormat() == FileFormat::DWRF) { auto postScript = ProtoUtils::readProto( input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); postScript_ = std::make_unique(std::move(postScript)); @@ -173,7 +150,7 @@ ReaderBase::ReaderBase( auto footerStream = input_->read( fileLength_ - psLength_ - footerSize - 1, footerSize, LogType::FOOTER); - if (fileFormat == FileFormat::DWRF) { + if (options.fileFormat() == FileFormat::DWRF) { auto footer = google::protobuf::Arena::CreateMessage(arena_.get()); ProtoUtils::readProtoInto( @@ -190,7 +167,7 @@ ReaderBase::ReaderBase( } schema_ = std::dynamic_pointer_cast( - convertType(*footer_, 0, fileColumnNamesReadAsLowerCase)); + convertType(*footer_, 0, options_.fileColumnNamesReadAsLowerCase())); VELOX_CHECK_NOT_NULL(schema_, "invalid schema"); // load stripe index/footer cache @@ -204,8 +181,8 @@ ReaderBase::ReaderBase( input_->read(cacheOffset, cacheSize, LogType::FOOTER)); input_->load(LogType::FOOTER); } else { - auto cacheBuffer = - std::make_shared>(pool, cacheSize); + auto cacheBuffer = std::make_shared>( + options_.memoryPool(), cacheSize); input_->read(cacheOffset, cacheSize, LogType::FOOTER) ->readFully(cacheBuffer->data(), cacheSize); cache_ = std::make_unique( @@ -226,7 +203,8 @@ ReaderBase::ReaderBase( } } // initialize file decrypter - handler_ = DecryptionHandler::create(*footer_, decryptorFactory_.get()); + handler_ = + DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); } std::vector ReaderBase::rowsPerStripe() const { diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index 34287d3fcd716..18752996feb46 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -62,33 +62,18 @@ class ReaderBase { public: /// Creates reader base from buffered input. ReaderBase( - memory::MemoryPool& pool, - std::unique_ptr input, - std::shared_ptr - decryptorFactory = nullptr, - uint64_t footerEstimatedSize = - dwio::common::ReaderOptions::kDefaultFooterEstimatedSize, - uint64_t filePreloadThreshold = - dwio::common::ReaderOptions::kDefaultFilePreloadThreshold, - dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF, - bool fileColumnNamesReadAsLowerCase = false, - std::shared_ptr randomSkip = nullptr, - std::shared_ptr scanSpec = nullptr); - - ReaderBase( - memory::MemoryPool& pool, - std::unique_ptr input, - dwio::common::FileFormat fileFormat); + const dwio::common::ReaderOptions& options, + std::unique_ptr input); /// Creates reader base from metadata. ReaderBase( - memory::MemoryPool& pool, + const dwio::common::ReaderOptions& options, std::unique_ptr input, std::unique_ptr ps, const proto::Footer* footer, std::unique_ptr cache, std::unique_ptr handler = nullptr) - : pool_{pool}, + : options_{options}, postScript_{std::move(ps)}, footer_{std::make_unique(footer)}, cache_{std::move(cache)}, @@ -105,12 +90,17 @@ class ReaderBase { } // for testing - explicit ReaderBase(memory::MemoryPool& pool) : pool_{pool}, fileLength_{0} {} + explicit ReaderBase(const dwio::common::ReaderOptions& options) + : options_{options}, fileLength_{0} {} virtual ~ReaderBase() = default; + const dwio::common::ReaderOptions& readerOptions() const { + return options_; + } + memory::MemoryPool& memoryPool() const { - return pool_; + return options_.memoryPool(); } const PostScript& postScript() const { @@ -131,8 +121,9 @@ class ReaderBase { const std::shared_ptr& schemaWithId() const { if (!schemaWithId_) { - if (scanSpec_) { - schemaWithId_ = dwio::common::TypeWithId::create(schema_, *scanSpec_); + if (options_.scanSpec()) { + schemaWithId_ = + dwio::common::TypeWithId::create(schema_, *options_.scanSpec()); } else { schemaWithId_ = dwio::common::TypeWithId::create(schema_); } @@ -153,7 +144,7 @@ class ReaderBase { } uint64_t footerEstimatedSize() const { - return footerEstimatedSize_; + return options_.footerEstimatedSize(); } uint64_t fileLength() const { @@ -213,7 +204,7 @@ class ReaderBase { compressionKind(), std::move(compressed), compressionBlockSize(), - pool_, + options_.memoryPool(), streamDebugInfo, decrypter); } @@ -237,7 +228,7 @@ class ReaderBase { } const std::shared_ptr& randomSkip() const { - return randomSkip_; + return options_.randomSkip(); } private: @@ -246,22 +237,14 @@ class ReaderBase { uint32_t index = 0, bool fileColumnNamesReadAsLowerCase = false); - memory::MemoryPool& pool_; + const dwio::common::ReaderOptions options_; std::unique_ptr arena_; std::unique_ptr postScript_; std::unique_ptr footer_ = nullptr; std::unique_ptr cache_; - // Keeps factory alive for possibly async prefetch. - std::shared_ptr decryptorFactory_; std::unique_ptr handler_; - const uint64_t footerEstimatedSize_{ - dwio::common::ReaderOptions::kDefaultFooterEstimatedSize}; - const uint64_t filePreloadThreshold_{ - dwio::common::ReaderOptions::kDefaultFilePreloadThreshold}; const std::unique_ptr input_; - const std::shared_ptr randomSkip_; - const std::shared_ptr scanSpec_; const uint64_t fileLength_; RowTypePtr schema_; diff --git a/velox/dwio/dwrf/reader/StripeStream.h b/velox/dwio/dwrf/reader/StripeStream.h index 362a0b43d097e..832acd40b2b3d 100644 --- a/velox/dwio/dwrf/reader/StripeStream.h +++ b/velox/dwio/dwrf/reader/StripeStream.h @@ -103,6 +103,18 @@ class StripeStreams { */ virtual const dwio::common::ColumnSelector& getColumnSelector() const = 0; + /** + * Session timezone used for reading Timestamp. + */ + virtual const tz::TimeZone* sessionTimezone() const = 0; + + /** + * Whether to adjust Timestamp to the timeZone obtained via + * sessionTimezone(). This is used to be compatible with the + * old logic of Presto. + */ + virtual bool adjustTimestampToTimezone() const = 0; + // Get row reader options virtual const dwio::common::RowReaderOptions& rowReaderOptions() const = 0; @@ -250,6 +262,14 @@ class StripeStreamsImpl : public StripeStreamsBase { return *selector_; } + const tz::TimeZone* sessionTimezone() const override { + return readState_->readerBase->readerOptions().sessionTimezone(); + } + + bool adjustTimestampToTimezone() const override { + return readState_->readerBase->readerOptions().adjustTimestampToTimezone(); + } + const dwio::common::RowReaderOptions& rowReaderOptions() const override { return opts_; } diff --git a/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp b/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp index f0d66c6f699bf..f07ea9bee6c05 100644 --- a/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp +++ b/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp @@ -195,6 +195,7 @@ class ColumnWriterStatsTest : public ::testing::Test { auto input = std::make_unique(readFile, *leafPool_); dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; auto reader = std::make_unique(readerOpts, std::move(input)); return reader->createRowReader(rowReaderOpts); diff --git a/velox/dwio/dwrf/test/ColumnWriterTest.cpp b/velox/dwio/dwrf/test/ColumnWriterTest.cpp index 798d60461042c..829418e3ce950 100644 --- a/velox/dwio/dwrf/test/ColumnWriterTest.cpp +++ b/velox/dwio/dwrf/test/ColumnWriterTest.cpp @@ -154,6 +154,14 @@ class TestStripeStreams : public StripeStreamsBase { return selector_; } + const tz::TimeZone* sessionTimezone() const override { + return context_.sessionTimezone(); + } + + bool adjustTimestampToTimezone() const override { + return context_.adjustTimestampToTimezone(); + } + const RowReaderOptions& rowReaderOptions() const override { return options_; } @@ -1631,6 +1639,7 @@ std::unique_ptr getDwrfReader( std::string data(sinkPtr->data(), sinkPtr->size()); dwio::common::ReaderOptions readerOpts{&leafPool}; + readerOpts.setFileFormat(FileFormat::DWRF); return std::make_unique( readerOpts, std::make_unique( diff --git a/velox/dwio/dwrf/test/E2EFilterTest.cpp b/velox/dwio/dwrf/test/E2EFilterTest.cpp index 43b67e91e5508..78ad00e3c84ac 100644 --- a/velox/dwio/dwrf/test/E2EFilterTest.cpp +++ b/velox/dwio/dwrf/test/E2EFilterTest.cpp @@ -98,6 +98,10 @@ class E2EFilterTest : public E2EFilterTestBase { } } + void setUpReaderOptions(dwio::common::ReaderOptions& opts) override { + opts.setFileFormat(FileFormat::DWRF); + } + std::unique_ptr makeReader( const dwio::common::ReaderOptions& opts, std::unique_ptr input) override { diff --git a/velox/dwio/dwrf/test/E2EReaderTest.cpp b/velox/dwio/dwrf/test/E2EReaderTest.cpp index 3e3ab15f1964d..1c195e7681f8a 100644 --- a/velox/dwio/dwrf/test/E2EReaderTest.cpp +++ b/velox/dwio/dwrf/test/E2EReaderTest.cpp @@ -175,6 +175,7 @@ TEST_P(E2EReaderTest, SharedDictionaryFlatmapReadAsStruct) { writer.reset(); dwio::common::ReaderOptions readerOpts{pool.get()}; + readerOpts.setFileFormat(FileFormat::DWRF); auto bufferedInput = std::make_unique( std::make_shared(path), *pool); auto reader = DwrfReader::create(std::move(bufferedInput), readerOpts); diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index dd2cc39f01d3f..0f21a9db8b6da 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -21,17 +21,15 @@ #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Statistics.h" -#include "velox/dwio/common/TypeWithId.h" #include "velox/dwio/common/encryption/TestProvider.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/dwio/common/tests/utils/MapBuilder.h" #include "velox/dwio/dwrf/common/Config.h" +#include "velox/dwio/dwrf/reader/ColumnReader.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/dwio/dwrf/test/OrcTest.h" #include "velox/dwio/dwrf/test/utils/E2EWriterTestUtil.h" -#include "velox/dwio/dwrf/writer/Writer.h" #include "velox/type/fbhive/HiveTypeParser.h" -#include "velox/vector/FlatVector.h" #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorMaker.h" @@ -62,7 +60,7 @@ class E2EWriterTest : public testing::Test { leafPool_ = rootPool_->addLeafChild("leaf"); } - std::unique_ptr createReader( + static std::unique_ptr createReader( const MemorySink& sink, const dwio::common::ReaderOptions& opts) { std::string_view data(sink.data(), sink.size()); @@ -105,6 +103,7 @@ class E2EWriterTest : public testing::Test { writer.close(); dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; auto reader = createReader(*sinkPtr, readerOpts); auto rowReader = reader->createRowReader(rowReaderOpts); @@ -167,6 +166,7 @@ class E2EWriterTest : public testing::Test { writer.close(); dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; auto reader = createReader(*sinkPtr, readerOpts); auto rowReader = reader->createRowReader(rowReaderOpts); @@ -567,6 +567,7 @@ TEST_F(E2EWriterTest, PresentStreamIsSuppressedOnFlatMap) { dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(true)); dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; auto reader = createReader(*sinkPtr, readerOpts); auto rowReader = reader->createRowReader(rowReaderOpts); @@ -940,6 +941,7 @@ TEST_F(E2EWriterTest, PartialStride) { writer.close(); dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; auto reader = createReader(*sinkPtr, readerOpts); ASSERT_EQ( @@ -1140,6 +1142,7 @@ class E2EEncryptionTest : public E2EWriterTest { // read it back for compare dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + readerOpts.setFileFormat(FileFormat::DWRF); readerOpts.setDecrypterFactory(decrypterFactory); return createReader(*sink_, readerOpts); } diff --git a/velox/dwio/dwrf/test/OrcTest.h b/velox/dwio/dwrf/test/OrcTest.h index 942ad269242af..8e21bb8135686 100644 --- a/velox/dwio/dwrf/test/OrcTest.h +++ b/velox/dwio/dwrf/test/OrcTest.h @@ -108,6 +108,14 @@ class MockStripeStreams : public StripeStreams { return *getColumnSelectorProxy(); } + const tz::TimeZone* sessionTimezone() const override { + return nullptr; + } + + bool adjustTimestampToTimezone() const override { + return false; + } + const dwio::common::RowReaderOptions& rowReaderOptions() const override { auto ptr = getRowReaderOptionsProxy(); return ptr ? *ptr : options_; diff --git a/velox/dwio/dwrf/test/ReaderBaseTests.cpp b/velox/dwio/dwrf/test/ReaderBaseTests.cpp index 17bb111780a6c..d4550e40530af 100644 --- a/velox/dwio/dwrf/test/ReaderBaseTests.cpp +++ b/velox/dwio/dwrf/test/ReaderBaseTests.cpp @@ -103,8 +103,10 @@ class EncryptedStatsTest : public Test { auto readFile = std::make_shared(std::string()); readerPool_ = pool_->addLeafChild("reader"); + facebook::velox::dwio::common::ReaderOptions readerOpts{readerPool_.get()}; + readerOpts.setFileFormat(FileFormat::DWRF); reader_ = std::make_unique( - *readerPool_, + readerOpts, std::make_unique(readFile, *readerPool_), std::make_unique(std::move(ps)), footer, @@ -212,8 +214,10 @@ std::unique_ptr createCorruptedFileReader( sink.write(std::move(buf)); auto readFile = std::make_shared( std::string(sink.data(), sink.size())); + facebook::velox::dwio::common::ReaderOptions readerOpts{pool.get()}; + readerOpts.setFileFormat(FileFormat::DWRF); return std::make_unique( - *pool, std::make_unique(readFile, *pool)); + readerOpts, std::make_unique(readFile, *pool)); } class ReaderBaseTest : public Test { diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index a463ea71aab75..474904f6430f5 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -241,7 +241,7 @@ void verifyFlatMapReading( const std::vector& expectedPrefetchRowSizes = {}, const std::vector& shouldTryPrefetch = {}) { dwio::common::ReaderOptions readerOpts{pool}; - + readerOpts.setFileFormat(FileFormat::DWRF); /* If an extra sanity check is desired you can uncomment the 2 below lines and * re-run */ // readerOpts.setFooterEstimatedSize(257); @@ -361,6 +361,7 @@ TEST_P(TestFlatMapReader, testReadFlatMapEmptyMap) { auto returnFlatVector = GetParam(); dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; rowReaderOpts.setReturnFlatVector(returnFlatVector); std::shared_ptr emptyFileType = @@ -391,6 +392,7 @@ TEST_P(TestFlatMapReader, testStringKeyLifeCycle) { VectorPtr batch; dwio::common::ReaderOptions readerOptions{pool()}; + readerOptions.setFileFormat(FileFormat::DWRF); { RowReaderOptions rowReaderOptions; @@ -506,6 +508,7 @@ class TestFlatMapReaderFlatLayout TEST_P(TestFlatMapReaderFlatLayout, testCompare) { dwio::common::ReaderOptions readerOptions{pool()}; + readerOptions.setFileFormat(FileFormat::DWRF); auto reader = DwrfReader::create( createFileBufferedInput(getFMSmallFile(), readerOptions.memoryPool()), readerOptions); @@ -539,6 +542,7 @@ TEST_F(TestReader, testReadFlatMapWithKeyFilters) { // batch size is set as 1000 in reading // file has schema: a int, b struct, c float dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; // set map key filter for map1 we only need key=1, and map2 only key-1 auto cs = std::make_shared( @@ -592,6 +596,7 @@ TEST_F(TestReader, testReadFlatMapWithKeyRejectList) { // batch size is set as 1000 in reading // file has schema: a int, b struct, c float dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; auto cs = std::make_shared( getFlatmapSchema(), std::vector{"map1#[\"!2\",\"!3\"]"}); @@ -647,7 +652,7 @@ TEST_F(TestReader, testStatsCallbackFiredWithFiltering) { }); dwio::common::ReaderOptions readerOpts{pool()}; - + readerOpts.setFileFormat(FileFormat::DWRF); auto reader = DwrfReader::create( createFileBufferedInput(getFMSmallFile(), readerOpts.memoryPool()), readerOpts); @@ -685,7 +690,7 @@ TEST_F(TestReader, testBlockedIoCallbackFiredBlocking) { rowReaderOpts.setEagerFirstStripeLoad(false); dwio::common::ReaderOptions readerOpts{pool()}; - + readerOpts.setFileFormat(FileFormat::DWRF); auto reader = DwrfReader::create( createFileBufferedInput(getFMLargeFile(), readerOpts.memoryPool()), readerOpts); @@ -813,6 +818,7 @@ TEST_F(TestReader, DISABLED_testBlockedIoCallbackFiredWithFirstStripeLoad) { TEST_F(TestReader, testEstimatedSize) { dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); { auto reader = DwrfReader::create( createFileBufferedInput(getFMSmallFile(), readerOpts.memoryPool()), @@ -858,7 +864,7 @@ TEST_F(TestReader, testStatsCallbackFiredWithoutFiltering) { }); dwio::common::ReaderOptions readerOpts{pool()}; - + readerOpts.setFileFormat(FileFormat::DWRF); auto reader = DwrfReader::create( createFileBufferedInput(getFMSmallFile(), readerOpts.memoryPool()), readerOpts); @@ -943,6 +949,7 @@ void verifyFlatmapStructEncoding( const std::vector& keysToSelect, size_t batchSize = 1000) { dwio::common::ReaderOptions readerOpts{pool}; + readerOpts.setFileFormat(FileFormat::DWRF); auto reader = DwrfReader::create( createFileBufferedInput(filename, readerOpts.memoryPool()), readerOpts); @@ -1049,6 +1056,7 @@ TEST_F(TestReader, testFlatmapAsStructRequiringKeyList) { TEST_F(TestReader, testMismatchSchemaMoreFields) { // file has schema: a int, b struct, c float dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast(HiveTypeParser().parse( @@ -1094,6 +1102,7 @@ TEST_F(TestReader, testMismatchSchemaMoreFields) { TEST_F(TestReader, testMismatchSchemaFewerFields) { // file has schema: a int, b struct, c float dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast(HiveTypeParser().parse( @@ -1135,6 +1144,7 @@ TEST_F(TestReader, testMismatchSchemaFewerFields) { TEST_F(TestReader, testMismatchSchemaNestedMoreFields) { // file has schema: a int, b struct, c float dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast(HiveTypeParser().parse( @@ -1201,6 +1211,7 @@ TEST_F(TestReader, testMismatchSchemaNestedMoreFields) { TEST_F(TestReader, testMismatchSchemaNestedFewerFields) { // file has schema: a int, b struct, c float dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast(HiveTypeParser().parse( @@ -1258,6 +1269,7 @@ TEST_F(TestReader, testMismatchSchemaNestedFewerFields) { TEST_F(TestReader, testMismatchSchemaIncompatibleNotSelected) { // file has schema: a int, b struct, c float dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast(HiveTypeParser().parse( @@ -1339,6 +1351,7 @@ TEST_F(TestReader, testMismatchSchemaIncompatible) { TEST_F(TestReader, fileColumnNamesReadAsLowerCase) { // upper.orc holds one columns (Bool_Val: BOOLEAN, b: BIGINT) dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); readerOpts.setFileColumnNamesReadAsLowerCase(true); auto reader = DwrfReader::create( createFileBufferedInput( @@ -1353,6 +1366,7 @@ TEST_F(TestReader, fileColumnNamesReadAsLowerCaseComplexStruct) { // upper_complex.orc holds type // Cc:struct>>>> dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); readerOpts.setFileColumnNamesReadAsLowerCase(true); auto reader = DwrfReader::create( createFileBufferedInput( @@ -1391,6 +1405,7 @@ TEST_F(TestReader, fileColumnNamesReadAsLowerCaseComplexStruct) { TEST_F(TestReader, TestStripeSizeCallback) { dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); readerOpts.setFilePreloadThreshold(0); readerOpts.setFooterEstimatedSize(4); RowReaderOptions rowReaderOpts; @@ -1419,6 +1434,7 @@ TEST_F(TestReader, TestStripeSizeCallback) { TEST_F(TestReader, TestStripeSizeCallbackLimitsOneStripe) { dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); readerOpts.setFilePreloadThreshold(0); readerOpts.setFooterEstimatedSize(4); RowReaderOptions rowReaderOpts; @@ -1448,6 +1464,7 @@ TEST_F(TestReader, TestStripeSizeCallbackLimitsOneStripe) { TEST_F(TestReader, TestStripeSizeCallbackLimitsTwoStripe) { dwio::common::ReaderOptions readerOpts{pool()}; + readerOpts.setFileFormat(FileFormat::DWRF); readerOpts.setFilePreloadThreshold(0); readerOpts.setFooterEstimatedSize(4); RowReaderOptions rowReaderOpts; @@ -1843,6 +1860,7 @@ void testBufferLifeCycle( std::make_shared(std::move(data)), *pool); dwio::common::ReaderOptions readerOpts{pool}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; rowReaderOpts.setReturnFlatVector(true); auto reader = std::make_unique(readerOpts, std::move(input)); @@ -1901,6 +1919,7 @@ void testFlatmapAsMapFieldLifeCycle( std::make_shared(std::move(data)), *pool); dwio::common::ReaderOptions readerOpts{pool}; + readerOpts.setFileFormat(FileFormat::DWRF); RowReaderOptions rowReaderOpts; rowReaderOpts.setReturnFlatVector(true); auto reader = std::make_unique(readerOpts, std::move(input)); diff --git a/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp b/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp index 1ed91ed929fa0..242edb2448eb5 100644 --- a/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp +++ b/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp @@ -65,9 +65,9 @@ class StripeLoadKeysTest : public Test { auto handler = DecryptionHandler::create(FooterWrapper(footer_.get()), &factory); pool_ = memoryManager()->addLeafPool(); - + dwio::common::ReaderOptions readerOpts{pool_.get()}; reader_ = std::make_unique( - *pool_, + readerOpts, std::make_unique( std::make_shared(std::string()), *pool_), nullptr, diff --git a/velox/dwio/dwrf/test/TestStripeStream.cpp b/velox/dwio/dwrf/test/TestStripeStream.cpp index 58dd3bab49ea9..fa46be53ab9d8 100644 --- a/velox/dwio/dwrf/test/TestStripeStream.cpp +++ b/velox/dwio/dwrf/test/TestStripeStream.cpp @@ -128,8 +128,9 @@ TEST_F(StripeStreamTest, planReads) { ProtoUtils::writeType(*type, *footer); auto is = std::make_unique(); auto isPtr = is.get(); + facebook::velox::dwio::common::ReaderOptions readerOpts{pool_.get()}; auto readerBase = std::make_shared( - *pool_, + readerOpts, std::make_unique( std::move(is), *pool_, @@ -187,8 +188,9 @@ TEST_F(StripeStreamTest, filterSequences) { ProtoUtils::writeType(*type, *footer); auto is = std::make_unique(); auto isPtr = is.get(); + facebook::velox::dwio::common::ReaderOptions readerOpts{pool_.get()}; auto readerBase = std::make_shared( - *pool_, + readerOpts, std::make_unique(std::move(is), *pool_), std::make_unique(proto::PostScript{}), footer, @@ -254,8 +256,9 @@ TEST_F(StripeStreamTest, zeroLength) { ps.set_compression(proto::CompressionKind::ZSTD); auto is = std::make_unique(); auto isPtr = is.get(); + facebook::velox::dwio::common::ReaderOptions readerOpts{pool_.get()}; auto readerBase = std::make_shared( - *pool_, + readerOpts, std::make_unique(std::move(is), *pool_), std::make_unique(std::move(ps)), footer, @@ -346,8 +349,9 @@ TEST_F(StripeStreamTest, planReadsIndex) { auto is = std::make_unique(); auto isPtr = is.get(); + facebook::velox::dwio::common::ReaderOptions readerOpts{pool_.get()}; auto readerBase = std::make_shared( - *pool_, + readerOpts, std::make_unique(std::move(is), *pool_), std::make_unique(std::move(ps)), footer, @@ -477,8 +481,9 @@ TEST_F(StripeStreamTest, readEncryptedStreams) { *stripeFooter->add_encryptiongroups() = ""; auto readerPool = pool->addLeafChild("reader"); + facebook::velox::dwio::common::ReaderOptions readerOpts{pool_.get()}; auto readerBase = std::make_shared( - *readerPool, + readerOpts, std::make_unique( std::make_shared(std::string()), *readerPool), @@ -559,9 +564,10 @@ TEST_F(StripeStreamTest, schemaMismatch) { encrypter.setKey("key"); pw.writeProto(*stripeFooter->add_encryptiongroups(), group, encrypter); + facebook::velox::dwio::common::ReaderOptions readerOpts{pool_.get()}; auto readerPool = pool->addLeafChild("reader"); auto readerBase = std::make_shared( - *readerPool, + readerOpts, std::make_unique( std::make_shared(std::string()), *pool_), @@ -635,6 +641,14 @@ class TestStripeStreams : public StripeStreamsBase { VELOX_UNSUPPORTED(); } + const facebook::velox::tz::TimeZone* sessionTimezone() const override { + VELOX_UNSUPPORTED(); + } + + bool adjustTimestampToTimezone() const override { + return false; + } + const facebook::velox::dwio::common::RowReaderOptions& rowReaderOptions() const override { VELOX_UNSUPPORTED(); diff --git a/velox/dwio/dwrf/test/WriterTest.cpp b/velox/dwio/dwrf/test/WriterTest.cpp index 3a8fee8e6d6b2..c2d16f45b0824 100644 --- a/velox/dwio/dwrf/test/WriterTest.cpp +++ b/velox/dwio/dwrf/test/WriterTest.cpp @@ -61,7 +61,9 @@ class WriterTest : public Test { std::string data(sinkPtr_->data(), sinkPtr_->size()); auto readFile = std::make_shared(std::move(data)); auto input = std::make_unique(std::move(readFile), *pool_); - return std::make_unique(*pool_, std::move(input)); + dwio::common::ReaderOptions readerOpts{pool_.get()}; + readerOpts.setFileFormat(FileFormat::DWRF); + return std::make_unique(readerOpts, std::move(input)); } auto& getContext() { diff --git a/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp b/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp index 1504cd1f6185d..9624112f2f353 100644 --- a/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp +++ b/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp @@ -115,6 +115,7 @@ namespace facebook::velox::dwrf { auto input = std::make_unique(readFile, pool); dwio::common::ReaderOptions readerOpts{&pool}; + readerOpts.setFileFormat(dwio::common::FileFormat::DWRF); RowReaderOptions rowReaderOpts; auto reader = std::make_unique(readerOpts, std::move(input)); EXPECT_GE(numStripesUpper, reader->getNumberOfStripes()); diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index 74c0aae8b5c28..5f768bf6207c1 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -74,8 +74,14 @@ Writer::Writer( *options.encryptionSpec, options.encrypterFactory.get()) : nullptr); - writerBase_->initContext(options.config, pool, std::move(handler)); - + writerBase_->initContext( + options.config, + pool, + options.sessionTimezone, + options.adjustTimestampToTimezone, + std::move(handler)); + common::testutil::TestValue::adjust( + "facebook::velox::dwrf::Writer::Writer", writerBase_.get()); auto& context = writerBase_->getContext(); VELOX_CHECK_EQ( context.getTotalMemoryUsage(), diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 66c38c99a5164..56ea96088fbc2 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -42,6 +42,8 @@ struct WriterOptions : public dwio::common::WriterOptions { WriterContext& context, const velox::dwio::common::TypeWithId& type)> columnWriterFactory; + const tz::TimeZone* sessionTimezone{nullptr}; + bool adjustTimestampToTimezone{false}; }; class Writer : public dwio::common::Writer { diff --git a/velox/dwio/dwrf/writer/WriterBase.h b/velox/dwio/dwrf/writer/WriterBase.h index 29ba724a45d27..2545f5ce4c654 100644 --- a/velox/dwio/dwrf/writer/WriterBase.h +++ b/velox/dwio/dwrf/writer/WriterBase.h @@ -74,9 +74,16 @@ class WriterBase { void initContext( const std::shared_ptr& config, std::shared_ptr pool, + const tz::TimeZone* sessionTimezone = nullptr, + const bool adjustTimestampToTimezone = false, std::unique_ptr handler = nullptr) { context_ = std::make_unique( - config, std::move(pool), sink_->metricsLog(), std::move(handler)); + config, + std::move(pool), + sink_->metricsLog(), + sessionTimezone, + adjustTimestampToTimezone, + std::move(handler)); writerSink_ = std::make_unique( *sink_, context_->getMemoryPool(MemoryUsageCategory::OUTPUT_STREAM), diff --git a/velox/dwio/dwrf/writer/WriterContext.cpp b/velox/dwio/dwrf/writer/WriterContext.cpp index 68cb916d55e2d..ce02aacf29213 100644 --- a/velox/dwio/dwrf/writer/WriterContext.cpp +++ b/velox/dwio/dwrf/writer/WriterContext.cpp @@ -27,6 +27,8 @@ WriterContext::WriterContext( const std::shared_ptr& config, std::shared_ptr pool, const dwio::common::MetricsLogPtr& metricLogger, + const tz::TimeZone* sessionTimezone, + const bool adjustTimestampToTimezone, std::unique_ptr handler) : config_{config}, pool_{std::move(pool)}, @@ -52,6 +54,8 @@ WriterContext::WriterContext( // metadata with dwio::common::request::AccessDescriptor upstream and // pass down the metric log. metricLogger_{metricLogger}, + sessionTimezone_{sessionTimezone}, + adjustTimestampToTimezone_{adjustTimestampToTimezone}, handler_{std::move(handler)} { const bool forceLowMemoryMode{getConfig(Config::FORCE_LOW_MEMORY_MODE)}; const bool disableLowMemoryMode{getConfig(Config::DISABLE_LOW_MEMORY_MODE)}; diff --git a/velox/dwio/dwrf/writer/WriterContext.h b/velox/dwio/dwrf/writer/WriterContext.h index 4c93409d8d034..9ba444d53175d 100644 --- a/velox/dwio/dwrf/writer/WriterContext.h +++ b/velox/dwio/dwrf/writer/WriterContext.h @@ -43,6 +43,8 @@ class WriterContext : public CompressionBufferPool { std::shared_ptr pool, const dwio::common::MetricsLogPtr& metricLogger = dwio::common::MetricsLog::voidLog(), + const tz::TimeZone* sessionTimezone = nullptr, + const bool adjustTimestampToTimezone = false, std::unique_ptr handler = nullptr); ~WriterContext() override; @@ -595,6 +597,14 @@ class WriterContext : public CompressionBufferPool { return compressionBuffer_.get(); } + const tz::TimeZone* sessionTimezone() const { + return sessionTimezone_; + } + + bool adjustTimestampToTimezone() const { + return adjustTimestampToTimezone_; + } + private: void validateConfigs() const; @@ -628,6 +638,8 @@ class WriterContext : public CompressionBufferPool { const bool streamSizeAboveThresholdCheckEnabled_; const uint64_t rawDataSizePerBatch_; const dwio::common::MetricsLogPtr metricLogger_; + const tz::TimeZone* sessionTimezone_; + const bool adjustTimestampToTimezone_; // Map needs referential stability because reference to map value is stored by // another class. diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 67afed0eafa3c..ab032821c0d68 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -68,6 +68,7 @@ OperatorCtx::createConnectorQueryCtx( planNodeId, driverCtx_->driverId, driverCtx_->queryConfig().sessionTimezone(), + driverCtx_->queryConfig().adjustTimestampToTimezone(), task->getCancellationToken()); }