diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt index 2e137dbd6c293..87234e4732597 100644 --- a/velox/connectors/hive/CMakeLists.txt +++ b/velox/connectors/hive/CMakeLists.txt @@ -44,6 +44,7 @@ velox_link_libraries( velox_dwio_orc_reader velox_dwio_parquet_reader velox_dwio_parquet_writer + velox_dwio_text_writer_register velox_file velox_hive_partition_function velox_type_tz diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index dcbe91146a697..60c200d1d7c52 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -1049,8 +1049,11 @@ void updateWriterOptionsFromHiveConfig( case dwio::common::FileFormat::NIMBLE: // No-op for now. break; + case dwio::common::FileFormat::TEXT: + // No-op for now. + break; default: - VELOX_UNSUPPORTED("{}", fileFormat); + VELOX_UNSUPPORTED("Unsupported file format: {}", fileFormat); } } diff --git a/velox/dwio/CMakeLists.txt b/velox/dwio/CMakeLists.txt index efcb3c06bebe3..d4a879e8f6803 100644 --- a/velox/dwio/CMakeLists.txt +++ b/velox/dwio/CMakeLists.txt @@ -35,3 +35,4 @@ add_subdirectory(catalog) add_subdirectory(dwrf) add_subdirectory(orc) add_subdirectory(parquet) +add_subdirectory(text) diff --git a/velox/dwio/text/CMakeLists.txt b/velox/dwio/text/CMakeLists.txt new file mode 100644 index 0000000000000..844a12ffd6012 --- /dev/null +++ b/velox/dwio/text/CMakeLists.txt @@ -0,0 +1,23 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if(${VELOX_BUILD_TESTING}) + add_subdirectory(tests) +endif() + +add_subdirectory(writer) + +velox_add_library(velox_dwio_text_writer_register RegisterTextWriter.cpp) + +velox_link_libraries(velox_dwio_text_writer_register velox_dwio_text_writer) diff --git a/velox/dwio/text/RegisterTextWriter.cpp b/velox/dwio/text/RegisterTextWriter.cpp new file mode 100644 index 0000000000000..59a81228e7212 --- /dev/null +++ b/velox/dwio/text/RegisterTextWriter.cpp @@ -0,0 +1,29 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/velox/dwio/text/writer/TextWriter.h" + +namespace facebook::velox::text { + +void registerTextWriterFactory() { + dwio::common::registerWriterFactory(std::make_shared()); +} + +void unregisterTextWriterFactory() { + dwio::common::unregisterWriterFactory(dwio::common::FileFormat::TEXT); +} + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/RegisterTextWriter.h b/velox/dwio/text/RegisterTextWriter.h new file mode 100644 index 0000000000000..08ac312d74704 --- /dev/null +++ b/velox/dwio/text/RegisterTextWriter.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::text { + +void registerTextWriterFactory(); + +void unregisterTextWriterFactory(); + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/tests/CMakeLists.txt b/velox/dwio/text/tests/CMakeLists.txt new file mode 100644 index 0000000000000..34a05424d3662 --- /dev/null +++ b/velox/dwio/text/tests/CMakeLists.txt @@ -0,0 +1,26 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set(TEST_LINK_LIBS + velox_dwio_common_test_utils + velox_vector_test_lib + velox_exec_test_lib + velox_temp_path + GTest::gtest + GTest::gtest_main + GTest::gmock + gflags::gflags + glog::glog) + +add_subdirectory(writer) diff --git a/velox/dwio/text/tests/writer/CMakeLists.txt b/velox/dwio/text/tests/writer/CMakeLists.txt new file mode 100644 index 0000000000000..7a38526dfd630 --- /dev/null +++ b/velox/dwio/text/tests/writer/CMakeLists.txt @@ -0,0 +1,31 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_text_writer_test TextWriterTest.cpp) + +add_test( + NAME velox_text_writer_test + COMMAND velox_text_writer_test + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) + +target_link_libraries( + velox_text_writer_test + velox_dwio_text_writer + velox_dwio_common_test_utils + velox_link_libs + Boost::regex + Folly::folly + ${TEST_LINK_LIBS} + GTest::gtest + fmt::fmt) diff --git a/velox/dwio/text/tests/writer/TextWriterTest.cpp b/velox/dwio/text/tests/writer/TextWriterTest.cpp new file mode 100644 index 0000000000000..0382912d08c14 --- /dev/null +++ b/velox/dwio/text/tests/writer/TextWriterTest.cpp @@ -0,0 +1,97 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/text/writer/TextWriter.h" +#include +#include "velox/common/base/Fs.h" +#include "velox/common/file/FileSystems.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +namespace facebook::velox::text { +std::vector> readFile(const std::string& name) { + std::ifstream file(name); + std::string line; + std::vector> table; + + while (std::getline(file, line)) { + std::stringstream ss(line); + std::string cell; + std::vector row; + + while (std::getline(ss, cell, TextWriter::SOH)) { + row.push_back(cell); + } + + table.push_back(row); + } + return table; +} + +class TextWriterTest : public testing::Test, + public velox::test::VectorTestBase { + public: + void SetUp() override { + velox::filesystems::registerLocalFileSystem(); + dwio::common::LocalFileSink::registerFactory(); + rootPool_ = memory::memoryManager()->addRootPool("TextWriterTests"); + leafPool_ = rootPool_->addLeafChild("TextWriterTests"); + tempPath_ = exec::test::TempDirectoryPath::create(); + } + + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + std::shared_ptr rootPool_; + std::shared_ptr leafPool_; + std::shared_ptr tempPath_; +}; + +TEST_F(TextWriterTest, write) { + auto schema = ROW({"c0", "c1"}, {BIGINT(), BOOLEAN()}); + auto data = makeRowVector( + {"c0", "c1"}, + { + makeFlatVector({1, 2, 3}), + makeConstant(true, 3), + }); + + WriterOptions writerOptions; + writerOptions.memoryPool = rootPool_.get(); + auto filePath = + fs::path(fmt::format("{}/test_abort.txt", tempPath_->getPath())); + auto sink = std::make_unique( + filePath, dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto writer = std::make_unique( + schema, + std::move(sink), + std::make_shared(writerOptions)); + writer->write(data); + writer->close(); + + std::vector> result = readFile(filePath); + EXPECT_EQ(result.size(), 3); + EXPECT_EQ(result[0].size(), 2); + EXPECT_EQ(result[0][0], "1"); + EXPECT_EQ(result[0][1], "1"); + EXPECT_EQ(result[1][0], "2"); + EXPECT_EQ(result[1][1], "1"); + EXPECT_EQ(result[2][0], "3"); + EXPECT_EQ(result[2][1], "1"); +} +} // namespace facebook::velox::text diff --git a/velox/dwio/text/writer/BufferedWriterSink.cpp b/velox/dwio/text/writer/BufferedWriterSink.cpp new file mode 100644 index 0000000000000..109443bea8d61 --- /dev/null +++ b/velox/dwio/text/writer/BufferedWriterSink.cpp @@ -0,0 +1,80 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/text/writer/BufferedWriterSink.h" + +namespace facebook::velox::text { + +BufferedWriterSink::BufferedWriterSink( + std::unique_ptr sink, + std::shared_ptr pool, + int64_t flushCount) + : sink_(std::move(sink)), pool_(std::move(pool)), flushCount_(flushCount) { + initializeBuffer(); +} + +BufferedWriterSink::~BufferedWriterSink() { + VELOX_CHECK_EQ( + buf_->size(), + 0, + "There are still data in data buffer when BufferedWriterSink is destroyed"); +} + +void BufferedWriterSink::write(char value) { + if (buf_->size() >= flushCount_) { + flush(); + } + buf_->append(value); +} + +void BufferedWriterSink::write(const char* data, uint64_t size) { + VELOX_CHECK_GE(flushCount_, size, "size is larger than flushCount"); + // TODO Add logic for when size is larger than flushCount_ + + if (buf_->size() + size > flushCount_) { + flush(); + } + buf_->append(buf_->size(), data, size); +} + +void BufferedWriterSink::flush() { + if (buf_->size() == 0) { + return; + } + + sink_->write(std::move(*buf_)); + initializeBuffer(); +} + +void BufferedWriterSink::close() { + flush(); + buf_->clear(); + sink_->close(); +} + +void BufferedWriterSink::abort() { + buf_->clear(); + sink_->close(); +} + +void BufferedWriterSink::initializeBuffer() { + if (buf_ == nullptr) { + buf_ = std::make_unique>(*pool_); + buf_->reserve(flushCount_); + } +} + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/writer/BufferedWriterSink.h b/velox/dwio/text/writer/BufferedWriterSink.h new file mode 100644 index 0000000000000..d25b8fe36830b --- /dev/null +++ b/velox/dwio/text/writer/BufferedWriterSink.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/common/FileSink.h" + +namespace facebook::velox::text { + +/// Takes character(s) and writes into a 'sink'. +/// It will buffer the characters(s) in memory before flushing to the sink. +/// The upper limit character count is specified by 'flushCount'. +class BufferedWriterSink { + public: + BufferedWriterSink( + std::unique_ptr sink, + std::shared_ptr pool, + int64_t flushCount); + + ~BufferedWriterSink(); + + void write(char value); + void write(const char* data, uint64_t size); + void flush(); + void close(); + void abort(); + + private: + const std::unique_ptr sink_; + const std::shared_ptr pool_; + // The upper limit character count. + const int64_t flushCount_; + std::unique_ptr> buf_; + + void initializeBuffer(); +}; + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/writer/CMakeLists.txt b/velox/dwio/text/writer/CMakeLists.txt new file mode 100644 index 0000000000000..ebfb78c629055 --- /dev/null +++ b/velox/dwio/text/writer/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +velox_add_library(velox_dwio_text_writer TextWriter.cpp BufferedWriterSink.cpp) + +velox_link_libraries( + velox_dwio_text_writer + velox_dwio_arrow_parquet_writer_lib + velox_dwio_arrow_parquet_writer_util_lib + velox_dwio_common + velox_arrow_bridge + arrow + fmt::fmt) diff --git a/velox/dwio/text/writer/TextWriter.cpp b/velox/dwio/text/writer/TextWriter.cpp new file mode 100644 index 0000000000000..249d2377a4387 --- /dev/null +++ b/velox/dwio/text/writer/TextWriter.cpp @@ -0,0 +1,142 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/text/writer/TextWriter.h" + +#include +#include "velox/common/base/Pointers.h" +#include "velox/exec/MemoryReclaimer.h" + +namespace facebook::velox::text { + +static std::string encodeColumnCellValue( + const std::shared_ptr& decodedColumnVector, + const TypePtr& type, + vector_size_t row) { + if (decodedColumnVector->isNullAt(row)) { + return TextWriter::nullData; + } + + switch (type->kind()) { + case TypeKind::BOOLEAN: + return std::to_string(decodedColumnVector->valueAt(row)); + case TypeKind::TINYINT: + return std::to_string(decodedColumnVector->valueAt(row)); + case TypeKind::SMALLINT: + return std::to_string(decodedColumnVector->valueAt(row)); + case TypeKind::INTEGER: + return std::to_string(decodedColumnVector->valueAt(row)); + case TypeKind::BIGINT: + return std::to_string(decodedColumnVector->valueAt(row)); + case TypeKind::REAL: + return std::to_string(decodedColumnVector->valueAt(row)); + case TypeKind::DOUBLE: + return std::to_string(decodedColumnVector->valueAt(row)); + case TypeKind::VARCHAR: + return decodedColumnVector->valueAt(row).getString(); + case TypeKind::TIMESTAMP: + return decodedColumnVector->valueAt(row).toString(); + case TypeKind::VARBINARY: + return decodedColumnVector->valueAt(row).getString(); + case TypeKind::ARRAY: + [[fallthrough]]; + case TypeKind::MAP: + [[fallthrough]]; + case TypeKind::ROW: + [[fallthrough]]; + case TypeKind::UNKNOWN: + [[fallthrough]]; + default: + VELOX_NYI("{} is not supported yet in TextWriter", type->kind()); + } +} + +TextWriter::TextWriter( + RowTypePtr schema, + std::unique_ptr sink, + const std::shared_ptr& options) + : schema_(std::move(schema)) { + bufferedWriterSink_ = std::make_unique( + std::move(sink), + options->memoryPool->addLeafChild(fmt::format( + "{}.text_writer_node.{}", + options->memoryPool->name(), + folly::to(folly::Random::rand64()))), + options->defaultFlushCount); +} + +void TextWriter::write(const VectorPtr& data) { + VELOX_CHECK_EQ( + data->encoding(), + VectorEncoding::Simple::ROW, + "Text writer expects row vector input"); + VELOX_CHECK( + data->type()->equivalent(*schema_), + "The file schema type should be equal with the input row vector type."); + const RowVector* dataRowVector = data->as(); + + std::vector> decodedColumnVectors; + const auto numColumns = dataRowVector->childrenSize(); + for (size_t column = 0; column < numColumns; ++column) { + auto decodedColumnVector = std::make_shared(DecodedVector( + *dataRowVector->childAt(column), + SelectivityVector(dataRowVector->size()))); + decodedColumnVectors.push_back(decodedColumnVector); + } + + for (vector_size_t row = 0; row < data->size(); ++row) { + for (size_t column = 0; column < numColumns; ++column) { + if (column != 0) { + bufferedWriterSink_->write(SOH); + } + auto columnData = encodeColumnCellValue( + decodedColumnVectors.at(column), schema_->childAt(column), row); + bufferedWriterSink_->write(columnData.c_str(), columnData.length()); + } + bufferedWriterSink_->write(newLine); + } +} + +void TextWriter::flush() { + bufferedWriterSink_->flush(); +} + +void TextWriter::close() { + bufferedWriterSink_->close(); +} + +void TextWriter::abort() { + bufferedWriterSink_->abort(); +} + +std::unique_ptr TextWriterFactory::createWriter( + std::unique_ptr sink, + const std::shared_ptr& options) { + auto textOptions = std::dynamic_pointer_cast(options); + VELOX_CHECK_NOT_NULL( + textOptions, "Text writer factory expected a Text WriterOptions object."); + return std::make_unique( + asRowType(options->schema), std::move(sink), textOptions); +} + +std::unique_ptr +TextWriterFactory::createWriterOptions() { + return std::make_unique(); +} + +const std::string TextWriter::nullData = "\\N"; + +} // namespace facebook::velox::text diff --git a/velox/dwio/text/writer/TextWriter.h b/velox/dwio/text/writer/TextWriter.h new file mode 100644 index 0000000000000..a07c291f983f9 --- /dev/null +++ b/velox/dwio/text/writer/TextWriter.h @@ -0,0 +1,83 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/compression/Compression.h" +#include "velox/common/config/Config.h" +#include "velox/dwio/common/DataBuffer.h" +#include "velox/dwio/common/FileSink.h" +#include "velox/dwio/common/FlushPolicy.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/common/Writer.h" +#include "velox/dwio/common/WriterFactory.h" +#include "velox/dwio/text/writer/BufferedWriterSink.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::text { + +struct WriterOptions : public dwio::common::WriterOptions { + int64_t defaultFlushCount = 10 << 10; +}; + +/// Encodes Velox vectors in TextFormat and writes into a FileSink. +class TextWriter : public dwio::common::Writer { + public: + const static char SOH = '\x01'; + const static char newLine = '\n'; + const static std::string nullData; + + /// Constructs a writer with output to a 'sink'. + /// @param schema specifies the file's overall schema, and it is always + /// non-null. + /// @param sink output sink + /// @param options writer options + TextWriter( + RowTypePtr schema, + std::unique_ptr sink, + const std::shared_ptr& options); + + ~TextWriter() override = default; + + void write(const VectorPtr& data) override; + + void flush() override; + + bool finish() override { + return true; + } + + void close() override; + + void abort() override; + + private: + const RowTypePtr schema_; + std::unique_ptr bufferedWriterSink_; +}; + +class TextWriterFactory : public dwio::common::WriterFactory { + public: + TextWriterFactory() : WriterFactory(dwio::common::FileFormat::TEXT) {} + + std::unique_ptr createWriter( + std::unique_ptr sink, + const std::shared_ptr& options) override; + + std::unique_ptr createWriterOptions() override; +}; + +} // namespace facebook::velox::text