Skip to content

Commit

Permalink
Table writer 6: create directory if not exists (#3414)
Browse files Browse the repository at this point in the history
Summary:
If the parent directory of the file that a LocalFileSink writes to
does not exist, create that directory when the LocalFileSink is
constructed, before the file is opened.

Rename FileSink to LocalFileSink for clarity.
LocalFileSink is a subclass of the interface
dwio::common::DataSink that implements a data sink backed by
the local file system for a dwio writer.

This is needed by the partitioned table writer, which uses
LocalFileSink to write files under partition subdirectories
that are created during query execution.

Pull Request resolved: #3414

Test Plan:
Imported from GitHub, without a `Test Plan:` line.

buck build //fb_presto_cpp:main
buck test velox/dwio/common/tests:

Reviewed By: mbasmanova

Differential Revision: D41723673

Pulled By: gggrace14

fbshipit-source-id: fed85fe462eeb5e53f695b9760426a9b4620c7bf
  • Loading branch information
gggrace14 authored and facebook-github-bot committed Dec 15, 2022
1 parent e968ded commit 8127e86
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 14 deletions.
4 changes: 2 additions & 2 deletions velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,12 @@ class HiveConnectorFactory : public ConnectorFactory {
"hive-hadoop2";

// Constructs the factory under the default connector name
// (kHiveConnectorName) and, as a side effect of construction, calls
// registerFactory() on the local file sink.
// NOTE(review): presumably this registration is what lets DataSink::create()
// resolve "file:" paths for writers created by this connector - confirm.
HiveConnectorFactory() : ConnectorFactory(kHiveConnectorName) {
dwio::common::FileSink::registerFactory();
dwio::common::LocalFileSink::registerFactory();
}

// Constructs the factory under a caller-supplied connector name.
// `connectorName` is annotated FOLLY_NONNULL, i.e. callers must not pass
// null. Like the default constructor, this also calls registerFactory() on
// the local file sink.
HiveConnectorFactory(const char* FOLLY_NONNULL connectorName)
: ConnectorFactory(connectorName) {
dwio::common::FileSink::registerFactory();
dwio::common::LocalFileSink::registerFactory();
}

std::shared_ptr<Connector> newConnector(
Expand Down
19 changes: 13 additions & 6 deletions velox/dwio/common/DataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#include "velox/dwio/common/DataSink.h"

#include "velox/common/base/Fs.h"
#include "velox/dwio/common/exception/Exception.h"

#include <fcntl.h>
Expand All @@ -23,19 +25,23 @@

namespace facebook::velox::dwio::common {

FileSink::FileSink(
/// Opens the local file `name` for writing, creating it if necessary and
/// truncating any existing content. The file is created with owner
/// read/write permissions only.
///
/// If the file's parent directory does not exist, it is created first so that
/// callers can target a file under a directory that has not been created yet.
///
/// @param name Path of the file to write to.
/// @param metricLogger Forwarded to the DataSink base class.
/// @param stats Forwarded to the DataSink base class.
///
/// Fails through DWIO_ENSURE when the parent directory cannot be created, and
/// through DWIO_RAISE (after marking the sink closed) when open() fails.
LocalFileSink::LocalFileSink(
    const std::string& name,
    const MetricsLogPtr& metricLogger,
    IoStatistics* stats)
    : DataSink{name, metricLogger, stats} {
  const auto dir = fs::path(name).parent_path();
  // A bare relative file name (e.g. "test.parquet") has an empty parent path.
  // fs::exists("") is false, so without the emptiness guard we would ask for a
  // directory with an empty path to be created and fail, even though the file
  // simply belongs in the current working directory.
  if (!dir.empty() && !fs::exists(dir)) {
    DWIO_ENSURE(velox::common::generateFileDirectory(dir.c_str()));
  }
  file_ = open(name_.c_str(), O_CREAT | O_WRONLY | O_TRUNC, S_IRUSR | S_IWUSR);
  if (file_ == -1) {
    // Capture errno right away: markClosed() and the message formatting may
    // make calls that overwrite it before it is reported.
    const int openErrno = errno;
    markClosed();
    DWIO_RAISE(
        "Can't open ",
        name_,
        " ErrorNo ",
        openErrno,
        ": ",
        strerror(openErrno));
  }
}

void FileSink::write(std::vector<DataBuffer<char>>& buffers) {
void LocalFileSink::write(std::vector<DataBuffer<char>>& buffers) {
writeImpl(buffers, [&](auto& buffer) {
size_t size = buffer.size();
size_t offset = 0;
Expand Down Expand Up @@ -93,19 +99,20 @@ std::unique_ptr<DataSink> DataSink::create(
return result;
}
}
return std::make_unique<FileSink>(path, metricsLog, stats);
return std::make_unique<LocalFileSink>(path, metricsLog, stats);
}

static std::unique_ptr<DataSink> fileSink(
// Sink creator for local files addressed with the "file:" scheme prefix.
//
// When `filename` starts with "file:" (the first 5 characters are compared),
// the prefix is stripped and the remainder is used as the path of a newly
// constructed local file sink; `metricsLog` and `stats` are passed through.
// For any other name this returns nullptr.
// NOTE(review): nullptr presumably tells DataSink::create() to try the next
// registered creator - confirm against the registration loop.
static std::unique_ptr<DataSink> localFileSink(
const std::string& filename,
const MetricsLogPtr& metricsLog,
IoStatistics* stats = nullptr) {
if (strncmp(filename.c_str(), "file:", 5) == 0) {
return std::make_unique<FileSink>(filename.substr(5), metricsLog, stats);
return std::make_unique<LocalFileSink>(
filename.substr(5), metricsLog, stats);
}
return nullptr;
}

VELOX_REGISTER_DATA_SINK_METHOD_DEFINITION(FileSink, fileSink);
// Ties the localFileSink() creator above to the LocalFileSink class.
// NOTE(review): presumably this macro expands to the definition of
// LocalFileSink::registerFactory() - confirm against the macro definition.
VELOX_REGISTER_DATA_SINK_METHOD_DEFINITION(LocalFileSink, localFileSink);

} // namespace facebook::velox::dwio::common
10 changes: 7 additions & 3 deletions velox/dwio/common/DataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,14 @@ class DataSink : public Closeable {
}
};

class FileSink : public DataSink {
class LocalFileSink : public DataSink {
public:
explicit FileSink(
explicit LocalFileSink(
const std::string& name,
const MetricsLogPtr& metricLogger = MetricsLog::voidLog(),
IoStatistics* stats = nullptr);

~FileSink() override {
~LocalFileSink() override {
destroy();
}

Expand All @@ -159,6 +159,10 @@ class FileSink : public DataSink {
int file_;
};

// Keeps the former class name FileSink usable as an alias of LocalFileSink,
// but only in builds that define VELOX_ENABLE_BACKWARD_COMPATIBILITY.
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
using FileSink = LocalFileSink;
#endif

class MemorySink : public DataSink {
public:
MemorySink(
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ add_executable(
ChainedBufferTests.cpp
DataBufferTests.cpp
DecoderUtilTest.cpp
LocalFileSinkTest.cpp
LoggedExceptionTest.cpp
RangeTests.cpp
RetryTests.cpp
Expand Down
43 changes: 43 additions & 0 deletions velox/dwio/common/tests/LocalFileSinkTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/base/Fs.h"
#include "velox/dwio/common/DataSink.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

#include <gtest/gtest.h>

using namespace ::testing;
using namespace facebook::velox::exec::test;

namespace facebook::velox::dwio::common {

// Verifies that creating a local file sink through DataSink::create() with a
// "file:" path also creates the missing (nested) parent directories of the
// target file.
TEST(LocalFileSinkTest, create) {
  LocalFileSink::registerFactory();

  auto root = TempDirectoryPath::create();
  const auto filePath = fs::path(root->path) / "xxx/yyy/zzz/test_file.ext";
  const auto parentDir = filePath.parent_path();

  // Precondition: neither the file nor its parent directory exists yet, so a
  // passing test really demonstrates that the sink created the directory.
  ASSERT_FALSE(fs::exists(parentDir));
  ASSERT_FALSE(fs::exists(filePath.string()));

  auto localFileSink =
      DataSink::create(fmt::format("file:{}", filePath.string()));
  // Guard the dereference below so a failed creation is reported as a test
  // failure rather than a crash.
  ASSERT_NE(localFileSink, nullptr);
  localFileSink->close();

  EXPECT_TRUE(fs::is_directory(parentDir));
  EXPECT_TRUE(fs::exists(filePath.string()));
}

} // namespace facebook::velox::dwio::common
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/test/E2EWriterTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ TEST(E2EWriterTests, DISABLED_TestFileCreation) {
batches.push_back(BatchMaker::createBatch(type, size, *pool, nullptr, i));
}

auto sink = std::make_unique<FileSink>("/tmp/e2e_generated_file.orc");
auto sink = std::make_unique<LocalFileSink>("/tmp/e2e_generated_file.orc");
E2EWriterTestUtil::writeData(
std::move(sink),
type,
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ParquetReaderBenchmark {
pool_ = memory::getDefaultMemoryPool();
dataSetBuilder_ = std::make_unique<DataSetBuilder>(*pool_.get(), 0);

auto sink = std::make_unique<FileSink>("test.parquet");
auto sink = std::make_unique<LocalFileSink>("test.parquet");
std::shared_ptr<::parquet::WriterProperties> writerProperties;
if (disableDictionary_) {
// The parquet file is in plain encoding format.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/utils/HiveConnectorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void HiveConnectorTestBase::writeToFile(
options.config = config;
options.schema = vectors[0]->type();
auto sink =
std::make_unique<facebook::velox::dwio::common::FileSink>(filePath);
std::make_unique<facebook::velox::dwio::common::LocalFileSink>(filePath);
auto childPool =
pool_->addChild(kWriter, std::numeric_limits<int64_t>::max());
facebook::velox::dwrf::Writer writer{options, std::move(sink), *childPool};
Expand Down

0 comments on commit 8127e86

Please sign in to comment.