diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index 7ae846c2b3ff..37901cc4a2e0 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -269,12 +269,12 @@ class HiveConnectorFactory : public ConnectorFactory { "hive-hadoop2"; HiveConnectorFactory() : ConnectorFactory(kHiveConnectorName) { - dwio::common::FileSink::registerFactory(); + dwio::common::LocalFileSink::registerFactory(); } HiveConnectorFactory(const char* FOLLY_NONNULL connectorName) : ConnectorFactory(connectorName) { - dwio::common::FileSink::registerFactory(); + dwio::common::LocalFileSink::registerFactory(); } std::shared_ptr newConnector( diff --git a/velox/dwio/common/DataSink.cpp b/velox/dwio/common/DataSink.cpp index 5bc95db68b0f..671e74233f62 100644 --- a/velox/dwio/common/DataSink.cpp +++ b/velox/dwio/common/DataSink.cpp @@ -15,6 +15,8 @@ */ #include "velox/dwio/common/DataSink.h" + +#include "velox/common/base/Fs.h" #include "velox/dwio/common/exception/Exception.h" #include @@ -23,11 +25,15 @@ namespace facebook::velox::dwio::common { -FileSink::FileSink( +LocalFileSink::LocalFileSink( const std::string& name, const MetricsLogPtr& metricLogger, IoStatistics* stats) : DataSink{name, metricLogger, stats} { + auto dir = fs::path(name).parent_path(); + if (!fs::exists(dir)) { + DWIO_ENSURE(velox::common::generateFileDirectory(dir.c_str())); + } file_ = open(name_.c_str(), O_CREAT | O_WRONLY | O_TRUNC, S_IRUSR | S_IWUSR); if (file_ == -1) { markClosed(); @@ -35,7 +41,7 @@ FileSink::FileSink( } } -void FileSink::write(std::vector>& buffers) { +void LocalFileSink::write(std::vector>& buffers) { writeImpl(buffers, [&](auto& buffer) { size_t size = buffer.size(); size_t offset = 0; @@ -93,19 +99,20 @@ std::unique_ptr DataSink::create( return result; } } - return std::make_unique(path, metricsLog, stats); + return std::make_unique(path, metricsLog, stats); } -static std::unique_ptr fileSink( +static std::unique_ptr localFileSink( const std::string& filename, const MetricsLogPtr& metricsLog, IoStatistics* stats = nullptr) { if (strncmp(filename.c_str(), "file:", 5) == 0) { - return std::make_unique(filename.substr(5), metricsLog, stats); + return std::make_unique( + filename.substr(5), metricsLog, stats); } return nullptr; } -VELOX_REGISTER_DATA_SINK_METHOD_DEFINITION(FileSink, fileSink); +VELOX_REGISTER_DATA_SINK_METHOD_DEFINITION(LocalFileSink, localFileSink); } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/DataSink.h b/velox/dwio/common/DataSink.h index b58ce0ef736f..0d26e8aa1765 100644 --- a/velox/dwio/common/DataSink.h +++ b/velox/dwio/common/DataSink.h @@ -133,14 +133,14 @@ class DataSink : public Closeable { } }; -class FileSink : public DataSink { +class LocalFileSink : public DataSink { public: - explicit FileSink( + explicit LocalFileSink( const std::string& name, const MetricsLogPtr& metricLogger = MetricsLog::voidLog(), IoStatistics* stats = nullptr); - ~FileSink() override { + ~LocalFileSink() override { destroy(); } @@ -159,6 +159,10 @@ class FileSink : public DataSink { int file_; }; +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY +using FileSink = LocalFileSink; +#endif + class MemorySink : public DataSink { public: MemorySink( diff --git a/velox/dwio/common/tests/CMakeLists.txt b/velox/dwio/common/tests/CMakeLists.txt index d4edcd374912..dea3f1ff12c3 100644 --- a/velox/dwio/common/tests/CMakeLists.txt +++ b/velox/dwio/common/tests/CMakeLists.txt @@ -21,6 +21,7 @@ add_executable( ChainedBufferTests.cpp DataBufferTests.cpp DecoderUtilTest.cpp + LocalFileSinkTest.cpp LoggedExceptionTest.cpp RangeTests.cpp RetryTests.cpp diff --git a/velox/dwio/common/tests/LocalFileSinkTest.cpp b/velox/dwio/common/tests/LocalFileSinkTest.cpp new file mode 100644 index 000000000000..358cf741a57e --- /dev/null +++ b/velox/dwio/common/tests/LocalFileSinkTest.cpp @@ -0,0 +1,43 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/base/Fs.h" +#include "velox/dwio/common/DataSink.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" + +#include + +using namespace ::testing; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::dwio::common { + +TEST(LocalFileSinkTest, create) { + LocalFileSink::registerFactory(); + + auto root = TempDirectoryPath::create(); + auto filePath = fs::path(root->path) / "xxx/yyy/zzz/test_file.ext"; + + ASSERT_FALSE(fs::exists(filePath.string())); + + auto localFileSink = + DataSink::create(fmt::format("file:{}", filePath.string())); + localFileSink->close(); + + EXPECT_TRUE(fs::exists(filePath.string())); +} + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/dwrf/test/E2EWriterTests.cpp b/velox/dwio/dwrf/test/E2EWriterTests.cpp index 4bccc5c12ef7..99111865be22 100644 --- a/velox/dwio/dwrf/test/E2EWriterTests.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTests.cpp @@ -82,7 +82,7 @@ TEST(E2EWriterTests, DISABLED_TestFileCreation) { batches.push_back(BatchMaker::createBatch(type, size, *pool, nullptr, i)); } - auto sink = std::make_unique("/tmp/e2e_generated_file.orc"); + auto sink = std::make_unique("/tmp/e2e_generated_file.orc"); E2EWriterTestUtil::writeData( std::move(sink), type, diff --git a/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp b/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp index 95d04443c386..52e8e44a5a22 100644 --- a/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp @@ -44,7 +44,7 @@ class ParquetReaderBenchmark { pool_ = memory::getDefaultMemoryPool(); dataSetBuilder_ = std::make_unique(*pool_.get(), 0); - auto sink = std::make_unique("test.parquet"); + auto sink = std::make_unique("test.parquet"); std::shared_ptr<::parquet::WriterProperties> writerProperties; if (disableDictionary_) { // The parquet file is in plain encoding format. diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index f810b67f6e12..4b1f85dea471 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -64,7 +64,7 @@ void HiveConnectorTestBase::writeToFile( options.config = config; options.schema = vectors[0]->type(); auto sink = - std::make_unique(filePath); + std::make_unique(filePath); auto childPool = pool_->addChild(kWriter, std::numeric_limits::max()); facebook::velox::dwrf::Writer writer{options, std::move(sink), *childPool};