diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index b331704011..c5197f56c7 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -51,7 +51,7 @@ parameters:
       architecture: 'x64'
     displayName: 'Use Python $(python.version)'

-  - bash: python -m pip install --upgrade pip wheel setuptools "jupyterlab>=3.0.14" numpy "pyarrow>=2" "black==20.8b1" flake8-black
+  - bash: python -m pip install --upgrade pip wheel setuptools "jupyterlab>=3.0.14" numpy "pyarrow>=5" "black==20.8b1" flake8-black
     displayName: 'Install Python base dependencies'
     condition: and(succeeded(), ne(variables['python.version'], '2.7'))
@@ -462,7 +462,7 @@ jobs:
       architecture: 'x64'
     displayName: 'Use Python $(python.version)'

-  - bash: python -m pip install --upgrade pip wheel setuptools jupyterlab numpy "pyarrow>=2" "black==20.8b1" flake8-black
+  - bash: python -m pip install --upgrade pip wheel setuptools jupyterlab numpy "pyarrow>=5" "black==20.8b1" flake8-black
     displayName: 'Install Python base dependencies'
     condition: and(succeeded(), ne(variables['python.version'], '2.7'))
diff --git a/cmake/arrow.txt.in b/cmake/arrow.txt.in
index d8da63f81b..68e4f87872 100644
--- a/cmake/arrow.txt.in
+++ b/cmake/arrow.txt.in
@@ -5,7 +5,7 @@ project(arrow-download NONE)
 include(ExternalProject)
 ExternalProject_Add(apachearrow
     GIT_REPOSITORY https://github.com/apache/arrow.git
-    GIT_TAG apache-arrow-1.0.1
+    GIT_TAG apache-arrow-5.0.0
     SOURCE_DIR "${CMAKE_BINARY_DIR}/arrow-src"
     BINARY_DIR "${CMAKE_BINARY_DIR}/arrow-build"
     CONFIGURE_COMMAND ""
diff --git a/cmake/arrow/CMakeLists.txt b/cmake/arrow/CMakeLists.txt
index baf5f4f30e..9bbe684658 100644
--- a/cmake/arrow/CMakeLists.txt
+++ b/cmake/arrow/CMakeLists.txt
@@ -25,6 +25,7 @@ set(ARROW_SRCS
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/buffer.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/chunked_array.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compare.cc
+    ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/datum.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/device.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/extension_type.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/memory_pool.cc
@@ -45,6 +46,7 @@ set(ARROW_SRCS
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/csv/column_decoder.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/csv/options.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/csv/parser.cc
+    ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/csv/reader.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/filesystem/filesystem.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/filesystem/localfs.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/filesystem/mockfs.cc
@@ -57,6 +59,7 @@ set(ARROW_SRCS
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/json/parser.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/json/reader.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/io/buffered.cc
+    ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/io/caching.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/io/compressed.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/io/interfaces.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/io/memory.cc
@@ -66,17 +69,20 @@ set(ARROW_SRCS
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/bit_util.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/bitmap_builders.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/bitmap_ops.cc
+    ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/cancel.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/compression.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/cpu_info.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/decimal.cc
    ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/future.cc
+    ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/formatting.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/delimiting.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/int_util.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/io_util.cc
-    ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/iterator.cc
+    # ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/iterator.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/logging.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/key_value_metadata.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/memory.cc
+    ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/mutex.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/string.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/string_builder.cc
     ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/task_group.cc
@@ -104,13 +110,12 @@ set(ARROW_SRCS
 if (PSP_PYTHON_BUILD)
     set(ARROW_SRCS
         ${ARROW_SRCS}
-        ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/csv/reader.cc
-        ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/datum.cc
+        # ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/datum.cc
         ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/io/file.cc
         ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/tensor/coo_converter.cc
         ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/tensor/csf_converter.cc
         ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/tensor/csx_converter.cc
-        ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/formatting.cc
+        # ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/formatting.cc
         ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/util/time.cc
         ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/vendored/double-conversion/bignum-dtoa.cc
         ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/vendored/double-conversion/fast-dtoa.cc
diff --git a/cmake/arrow/config.h b/cmake/arrow/config.h
index 33ccf6f1cd..cbf82a2648 100644
--- a/cmake/arrow/config.h
+++ b/cmake/arrow/config.h
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.

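// With the 5.0.0 bump below, ARROW_VERSION evaluates to
// ((5 * 1000) + 0) * 1000 + 0 == 5000000, so 5.x-only code paths can be
// gated with a guard such as `#if ARROW_VERSION >= 5000000`.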
-#define ARROW_VERSION_MAJOR 1
+#define ARROW_VERSION_MAJOR 5
 #define ARROW_VERSION_MINOR 0
-#define ARROW_VERSION_PATCH 1
+#define ARROW_VERSION_PATCH 0

 #define ARROW_VERSION ((ARROW_VERSION_MAJOR * 1000) + ARROW_VERSION_MINOR) * 1000 + ARROW_VERSION_PATCH

 /* #undef DOUBLE_CONVERSION_HAS_CASE_INSENSIBILITY */
diff --git a/cpp/perspective/src/cpp/arrow_csv.cpp b/cpp/perspective/src/cpp/arrow_csv.cpp
index 7c19a1b9a6..84c50fa0bf 100644
--- a/cpp/perspective/src/cpp/arrow_csv.cpp
+++ b/cpp/perspective/src/cpp/arrow_csv.cpp
@@ -20,6 +20,49 @@
 #include
 #endif

+template <typename TimePoint>
+static inline arrow::TimestampType::c_type ConvertTimePoint(TimePoint tp, arrow::TimeUnit::type unit) {
+    auto duration = tp.time_since_epoch();
+    switch (unit) {
+        case arrow::TimeUnit::SECOND:
+            return std::chrono::duration_cast<std::chrono::seconds>(duration).count();
+        case arrow::TimeUnit::MILLI:
+            return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
+        case arrow::TimeUnit::MICRO:
+            return std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
+        case arrow::TimeUnit::NANO:
+            return std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();
+        default:
+            // Compiler errors without default case even though all enum cases are handled
+            assert(0);
+            return 0;
+    }
+}
+
+static inline bool ParseYYYY_MM_DD(const char* s,
+    arrow_vendored::date::year_month_day* out) {
+    uint16_t year = 0;
+    uint8_t month = 0;
+    uint8_t day = 0;
+    if (ARROW_PREDICT_FALSE(s[4] != '-') || ARROW_PREDICT_FALSE(s[7] != '-')) {
+        return false;
+    }
+    if (ARROW_PREDICT_FALSE(!arrow::internal::ParseUnsigned(s + 0, 4, &year))) {
+        return false;
+    }
+    if (ARROW_PREDICT_FALSE(!arrow::internal::ParseUnsigned(s + 5, 2, &month))) {
+        return false;
+    }
+    if (ARROW_PREDICT_FALSE(!arrow::internal::ParseUnsigned(s + 8, 2, &day))) {
+        return false;
+    }
+    *out = {arrow_vendored::date::year{year}, arrow_vendored::date::month{month},
+        arrow_vendored::date::day{day}};
+    return out->ok();
+}
+
 namespace perspective {
 namespace apachearrow {
@@ -101,7 +144,7 @@
             // "YYYY-MM-DD[ T]hh:mm:ss.sss"
             arrow_vendored::date::year_month_day ymd;
             if (ARROW_PREDICT_FALSE(
-                    !arrow::internal::detail::ParseYYYY_MM_DD(
+                    !ParseYYYY_MM_DD(
                         s, &ymd))) {
                 return false;
             }
@@ -116,7 +159,7 @@
                 return false;
             }

-            *out = arrow::internal::detail::ConvertTimePoint(
+            *out = ConvertTimePoint(
                 arrow_vendored::date::sys_days(ymd) + seconds + millis,
                 unit);
             return true;
@@ -124,7 +167,7 @@
             // "2008-09-15[ T]15:53:00+05:00"
             arrow_vendored::date::year_month_day ymd;
             if (ARROW_PREDICT_FALSE(
-                    !arrow::internal::detail::ParseYYYY_MM_DD(
+                    !ParseYYYY_MM_DD(
                         s, &ymd))) {
                 return false;
             }
@@ -139,7 +182,7 @@
                 return false;
             }

-            *out = arrow::internal::detail::ConvertTimePoint(
+            *out = ConvertTimePoint(
                 arrow_vendored::date::sys_days(ymd) + tz + seconds,
                 unit);
             return true;
@@ -193,7 +236,7 @@ csvToTable(std::string& csv, bool is_update,
         std::unordered_map<std::string, std::shared_ptr<arrow::DataType>>& schema) {
-        arrow::MemoryPool* pool = arrow::default_memory_pool();
+        arrow::io::IOContext io_context = arrow::io::default_io_context();
         auto input = std::make_shared<arrow::io::BufferReader>(csv);
         auto read_options = arrow::csv::ReadOptions::Defaults();
         auto parse_options = arrow::csv::ParseOptions::Defaults();
@@ -209,7 +252,7 @@
         }

         auto maybe_reader = arrow::csv::TableReader::Make(
-            pool, input, read_options, parse_options, convert_options);
+            io_context, input, read_options, parse_options, convert_options);

         std::shared_ptr<arrow::csv::TableReader> reader = *maybe_reader;
diff --git
a/cpp/perspective/src/cpp/vendor/arrow_single_threaded_reader.cpp b/cpp/perspective/src/cpp/vendor/arrow_single_threaded_reader.cpp index 898a17280c..7b02504b76 100644 --- a/cpp/perspective/src/cpp/vendor/arrow_single_threaded_reader.cpp +++ b/cpp/perspective/src/cpp/vendor/arrow_single_threaded_reader.cpp @@ -62,562 +62,729 @@ #include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" +#include "arrow/type_fwd.h" +// #include "arrow/util/async_generator.h" +// #include "arrow/util/future.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" #include "arrow/util/task_group.h" +// #include "arrow/util/thread_pool.h" #include "arrow/util/utf8.h" +#include "arrow/util/vector.h" namespace arrow { +namespace csv { -class MemoryPool; +using internal::Executor; + +namespace { + +struct ConversionSchema { + struct Column { + std::string name; + // Physical column index in CSV file + int32_t index; + // If true, make a column of nulls + bool is_missing; + // If set, convert the CSV column to this type + // If unset (and is_missing is false), infer the type from the CSV column + std::shared_ptr type; + }; + + static Column NullColumn(std::string col_name, std::shared_ptr type) { + return Column{std::move(col_name), -1, true, std::move(type)}; + } + + static Column TypedColumn(std::string col_name, int32_t col_index, + std::shared_ptr type) { + return Column{std::move(col_name), col_index, false, std::move(type)}; + } + + static Column InferredColumn(std::string col_name, int32_t col_index) { + return Column{std::move(col_name), col_index, false, nullptr}; + } + + std::vector columns; +}; + +// An iterator of Buffers that makes sure there is no straddling CRLF sequence. +class CSVBufferIterator { + public: + static Iterator> Make( + Iterator> buffer_iterator) { + Transformer, std::shared_ptr> fn = + CSVBufferIterator(); + return MakeTransformedIterator(std::move(buffer_iterator), fn); + } + +// static AsyncGenerator> MakeAsync( +// AsyncGenerator> buffer_iterator) { +// Transformer, std::shared_ptr> fn = +// CSVBufferIterator(); +// return MakeTransformedGenerator(std::move(buffer_iterator), fn); +// } + + Result>> operator()(std::shared_ptr buf) { + if (buf == nullptr) { + // EOF + return TransformFinish(); + } -namespace io { + int64_t offset = 0; + if (first_buffer_) { + ARROW_ASSIGN_OR_RAISE(auto data, util::SkipUTF8BOM(buf->data(), buf->size())); + offset += data - buf->data(); + DCHECK_GE(offset, 0); + first_buffer_ = false; + } - class InputStream; + if (trailing_cr_ && buf->data()[offset] == '\n') { + // Skip '\r\n' line separator that started at the end of previous buffer + ++offset; + } -} // namespace io + trailing_cr_ = (buf->data()[buf->size() - 1] == '\r'); + buf = SliceBuffer(buf, offset); + if (buf->size() == 0) { + // EOF + return TransformFinish(); + } else { + return TransformYield(buf); + } + } + + protected: + bool first_buffer_ = true; + // Whether there was a trailing CR at the end of last received buffer + bool trailing_cr_ = false; +}; + +struct CSVBlock { + // (partial + completion + buffer) is an entire delimited CSV buffer. 
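// `partial` carries the unfinished last row from the previous block,
// `completion` is the slice of the current data that finishes that row, and
// `buffer` is the remainder; `bytes_skipped` counts bytes consumed while
// skipping rows rather than parsing. After a block is parsed,
// `consume_bytes(nbytes)` reports how many bytes were actually used so the
// leftover tail of `buffer` can be re-sliced into the next block's `partial`
// (see SerialBlockReader::operator() below).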
+ std::shared_ptr partial; + std::shared_ptr completion; + std::shared_ptr buffer; + int64_t block_index; + bool is_final; + int64_t bytes_skipped; + std::function consume_bytes; +}; + +} // namespace +} // namespace csv + +template <> +struct IterationTraits { + static csv::CSVBlock End() { return csv::CSVBlock{{}, {}, {}, -1, true, 0, {}}; } + static bool IsEnd(const csv::CSVBlock& val) { return val.block_index < 0; } +}; namespace csv { - - // using internal::GetCpuThreadPool; - // using internal::ThreadPool; - - struct ConversionSchema { - struct Column { - std::string name; - // Physical column index in CSV file - int32_t index; - // If true, make a column of nulls - bool is_missing; - // If set, convert the CSV column to this type - // If unset (and is_missing is false), infer the type from the CSV - // column - std::shared_ptr type; +namespace { + +// This is a callable that can be used to transform an iterator. The source iterator +// will contain buffers of data and the output iterator will contain delimited CSV +// blocks. util::optional is used so that there is an end token (required by the +// iterator APIs (e.g. Visit)) even though an empty optional is never used in this code. +class BlockReader { + public: + BlockReader(std::unique_ptr chunker, std::shared_ptr first_buffer, + int64_t skip_rows) + : chunker_(std::move(chunker)), + partial_(std::make_shared("")), + buffer_(std::move(first_buffer)), + skip_rows_(skip_rows) {} + + protected: + std::unique_ptr chunker_; + std::shared_ptr partial_, buffer_; + int64_t skip_rows_; + int64_t block_index_ = 0; + // Whether there was a trailing CR at the end of last received buffer + bool trailing_cr_ = false; +}; + +// An object that reads delimited CSV blocks for serial use. +// The number of bytes consumed should be notified after each read, +// using CSVBlock::consume_bytes. 
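// SerialBlockReader is driven through Arrow's transforming-iterator
// machinery (arrow/util/iterator.h): a Transformer<T, V> is any callable
// of shape Result<TransformFlow<V>>(T); TransformYield(v) emits a value,
// TransformFinish() ends the stream, and an error Status propagates as-is.
// A minimal sketch of that contract (the function here is illustrative
// only, not part of this patch):
//
//   arrow::Result<arrow::TransformFlow<int>> StopAtZero(int x) {
//       if (x != 0) return arrow::TransformYield(x); // pass the value on
//       return arrow::TransformFinish();             // signal end-of-stream
//   }
//
// MakeIterator below wraps a shared_ptr to a SerialBlockReader in such a
// callable, so one stateful object is re-invoked for every incoming buffer.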
+class SerialBlockReader : public BlockReader { + public: + using BlockReader::BlockReader; + + static Iterator MakeIterator( + Iterator> buffer_iterator, std::unique_ptr chunker, + std::shared_ptr first_buffer, int64_t skip_rows) { + auto block_reader = + std::make_shared(std::move(chunker), first_buffer, skip_rows); + // Wrap shared pointer in callable + Transformer, CSVBlock> block_reader_fn = + [block_reader](std::shared_ptr buf) { + return (*block_reader)(std::move(buf)); }; + return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn); + } + +// static AsyncGenerator MakeAsyncIterator( +// AsyncGenerator> buffer_generator, +// std::unique_ptr chunker, std::shared_ptr first_buffer, +// int64_t skip_rows) { +// auto block_reader = +// std::make_shared(std::move(chunker), first_buffer, skip_rows); +// // Wrap shared pointer in callable +// Transformer, CSVBlock> block_reader_fn = +// [block_reader](std::shared_ptr next) { +// return (*block_reader)(std::move(next)); +// }; +// return MakeTransformedGenerator(std::move(buffer_generator), block_reader_fn); +// } + + Result> operator()(std::shared_ptr next_buffer) { + if (buffer_ == nullptr) { + return TransformFinish(); + } - static Column - NullColumn(std::string col_name, std::shared_ptr type) { - return Column{std::move(col_name), -1, true, std::move(type)}; - } - - static Column - TypedColumn(std::string col_name, int32_t col_index, - std::shared_ptr type) { - return Column{ - std::move(col_name), col_index, false, std::move(type)}; - } - - static Column - InferredColumn(std::string col_name, int32_t col_index) { - return Column{std::move(col_name), col_index, false, nullptr}; - } - - std::vector columns; - }; - - // An iterator of Buffers that makes sure there is no straddling CRLF - // sequence. 
- class CSVBufferIterator { - public: - explicit CSVBufferIterator( - Iterator> buffer_iterator) - : buffer_iterator_(std::move(buffer_iterator)) {} - - static Iterator> - Make(Iterator> buffer_iterator) { - CSVBufferIterator it(std::move(buffer_iterator)); - return Iterator>(std::move(it)); - } - - Result> - Next() { - ARROW_ASSIGN_OR_RAISE(auto buf, buffer_iterator_.Next()); - if (buf == nullptr) { - // EOF - return nullptr; - } - - int64_t offset = 0; - if (first_buffer_) { - ARROW_ASSIGN_OR_RAISE( - auto data, util::SkipUTF8BOM(buf->data(), buf->size())); - offset += data - buf->data(); - DCHECK_GE(offset, 0); - first_buffer_ = false; - } - - if (trailing_cr_ && buf->data()[offset] == '\n') { - // Skip '\r\n' line separator that started at the end of - // previous buffer - ++offset; - } - - trailing_cr_ = (buf->data()[buf->size() - 1] == '\r'); - buf = SliceBuffer(buf, offset); - if (buf->size() == 0) { - // EOF - return nullptr; - } else { - return buf; - } - } - - protected: - Iterator> buffer_iterator_; - bool first_buffer_ = true; - // Whether there was a trailing CR at the end of last received buffer - bool trailing_cr_ = false; - }; + bool is_final = (next_buffer == nullptr); + int64_t bytes_skipped = 0; + + if (skip_rows_) { + bytes_skipped += partial_->size(); + auto orig_size = buffer_->size(); + RETURN_NOT_OK( + chunker_->ProcessSkip(partial_, buffer_, is_final, &skip_rows_, &buffer_)); + bytes_skipped += orig_size - buffer_->size(); + auto empty = std::make_shared(nullptr, 0); + if (skip_rows_) { + // Still have rows beyond this buffer to skip return empty block + partial_ = std::move(buffer_); + buffer_ = next_buffer; + return TransformYield(CSVBlock{empty, empty, empty, block_index_++, + is_final, bytes_skipped, + [](int64_t) { return Status::OK(); }}); + } + partial_ = std::move(empty); + } - struct CSVBlock { - // (partial + completion + buffer) is an entire delimited CSV buffer. - std::shared_ptr partial; - std::shared_ptr completion; - std::shared_ptr buffer; - int64_t block_index; - bool is_final; - std::function consume_bytes; - }; + std::shared_ptr completion; - class BlockReader { - public: - BlockReader(std::unique_ptr chunker, - Iterator> buffer_iterator, - std::shared_ptr first_buffer) - : chunker_(std::move(chunker)) - , buffer_iterator_(std::move(buffer_iterator)) - , partial_(std::make_shared("")) - , buffer_(std::move(first_buffer)) {} - - protected: - std::unique_ptr chunker_; - Iterator> buffer_iterator_; - - std::shared_ptr partial_, buffer_; - int64_t block_index_ = 0; - // Whether there was a trailing CR at the end of last received buffer - bool trailing_cr_ = false; + if (is_final) { + // End of file reached => compute completion from penultimate block + RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &buffer_)); + } else { + // Get completion of partial from previous block. + RETURN_NOT_OK( + chunker_->ProcessWithPartial(partial_, buffer_, &completion, &buffer_)); + } + int64_t bytes_before_buffer = partial_->size() + completion->size(); + + auto consume_bytes = [this, bytes_before_buffer, + next_buffer](int64_t nbytes) -> Status { + DCHECK_GE(nbytes, 0); + auto offset = nbytes - bytes_before_buffer; + if (offset < 0) { + // Should not happen + return Status::Invalid("CSV parser got out of sync with chunker"); + } + partial_ = SliceBuffer(buffer_, offset); + buffer_ = next_buffer; + return Status::OK(); }; - // An object that reads delimited CSV blocks for serial use. 
- // The number of bytes consumed should be notified after each read, - // using CSVBlock::consume_bytes. - class SerialBlockReader : public BlockReader { - public: - using BlockReader::BlockReader; - - Result> - Next() { - if (buffer_ == nullptr) { - // EOF - return util::optional(); - } - - std::shared_ptr next_buffer, completion; - ARROW_ASSIGN_OR_RAISE(next_buffer, buffer_iterator_.Next()); - bool is_final = (next_buffer == nullptr); - - if (is_final) { - // End of file reached => compute completion from penultimate - // block - RETURN_NOT_OK(chunker_->ProcessFinal( - partial_, buffer_, &completion, &buffer_)); - } else { - // Get completion of partial from previous block. - RETURN_NOT_OK(chunker_->ProcessWithPartial( - partial_, buffer_, &completion, &buffer_)); - } - int64_t bytes_before_buffer = partial_->size() + completion->size(); - - auto consume_bytes = [this, bytes_before_buffer, next_buffer]( - int64_t nbytes) -> Status { - DCHECK_GE(nbytes, 0); - auto offset = nbytes - bytes_before_buffer; - if (offset < 0) { - // Should not happen - return Status::Invalid( - "CSV parser got out of sync with chunker"); - } - partial_ = SliceBuffer(buffer_, offset); - buffer_ = next_buffer; - return Status::OK(); - }; - - return CSVBlock{partial_, completion, buffer_, block_index_++, - is_final, std::move(consume_bytes)}; - } - }; + return TransformYield(CSVBlock{partial_, completion, buffer_, + block_index_++, is_final, bytes_skipped, + std::move(consume_bytes)}); + } +}; + +struct ParsedBlock { + std::shared_ptr parser; + int64_t block_index; + int64_t bytes_parsed_or_skipped; +}; + +struct DecodedBlock { + std::shared_ptr record_batch; + // Represents the number of input bytes represented by this batch + // This will include bytes skipped when skipping rows after the header + int64_t bytes_processed; +}; + +} // namespace + +} // namespace csv + +template <> +struct IterationTraits { + static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; } + static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; } +}; + +template <> +struct IterationTraits { + static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; } + static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; } +}; - - ///////////////////////////////////////////////////////////////////////// - // Base class for common functionality - - class ReaderMixin { - public: - ReaderMixin(MemoryPool* pool, std::shared_ptr input, - const ReadOptions& read_options, const ParseOptions& parse_options, - const ConvertOptions& convert_options) - : pool_(pool) - , read_options_(read_options) - , parse_options_(parse_options) - , convert_options_(convert_options) - , input_(std::move(input)) {} - - protected: - // Read header and column names from buffer, create column builders - Status - ProcessHeader( - const std::shared_ptr& buf, std::shared_ptr* rest) { - const uint8_t* data = buf->data(); - const auto data_end = data + buf->size(); - DCHECK_GT(data_end - data, 0); - - if (read_options_.skip_rows) { - // Skip initial rows (potentially invalid CSV data) - auto num_skipped_rows - = SkipRows(data, static_cast(data_end - data), - read_options_.skip_rows, &data); - if (num_skipped_rows < read_options_.skip_rows) { - return Status::Invalid("Could not skip initial ", - read_options_.skip_rows, - " rows from CSV file, " - "either file is too short or header is larger than " - "block size"); - } - } - - if (read_options_.column_names.empty()) { - // Parse one row (either to read 
column names or to know the - // number of columns) - BlockParser parser(pool_, parse_options_, num_csv_cols_, 1); - uint32_t parsed_size = 0; - RETURN_NOT_OK(parser.Parse( - util::string_view( - reinterpret_cast(data), data_end - data), - &parsed_size)); - if (parser.num_rows() != 1) { - return Status::Invalid( - "Could not read first row from CSV file, either " - "file is too short or header is larger than block " - "size"); - } - if (parser.num_cols() == 0) { - return Status::Invalid("No columns in CSV file"); - } - - if (read_options_.autogenerate_column_names) { - column_names_ = GenerateColumnNames(parser.num_cols()); - } else { - // Read column names from header row - auto visit = [&](const uint8_t* data, uint32_t size, - bool quoted) -> Status { - column_names_.emplace_back( - reinterpret_cast(data), size); - return Status::OK(); - }; - RETURN_NOT_OK(parser.VisitLastRow(visit)); - DCHECK_EQ(static_cast(parser.num_cols()), - column_names_.size()); - // Skip parsed header row - data += parsed_size; - } - } else { - column_names_ = read_options_.column_names; - } - *rest = SliceBuffer(buf, data - buf->data()); - - num_csv_cols_ = static_cast(column_names_.size()); - DCHECK_GT(num_csv_cols_, 0); - - return MakeConversionSchema(); - } +namespace csv { +namespace { + +// A function object that takes in a buffer of CSV data and returns a parsed batch of CSV +// data (CSVBlock -> ParsedBlock) for use with MakeMappedGenerator. +// The parsed batch contains a list of offsets for each of the columns so that columns +// can be individually scanned +// +// This operator is not re-entrant +class BlockParsingOperator { + public: + BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options, + int num_csv_cols, int64_t first_row) + : io_context_(io_context), + parse_options_(parse_options), + num_csv_cols_(num_csv_cols), + count_rows_(first_row >= 0), + num_rows_seen_(first_row) {} + + Result operator()(const CSVBlock& block) { + constexpr int32_t max_num_rows = std::numeric_limits::max(); + auto parser = std::make_shared( + io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); + + std::shared_ptr straddling; + std::vector views; + if (block.partial->size() != 0 || block.completion->size() != 0) { + if (block.partial->size() == 0) { + straddling = block.completion; + } else if (block.completion->size() == 0) { + straddling = block.partial; + } else { + ARROW_ASSIGN_OR_RAISE( + straddling, + ConcatenateBuffers({block.partial, block.completion}, io_context_.pool())); + } + views = {util::string_view(*straddling), util::string_view(*block.buffer)}; + } else { + views = {util::string_view(*block.buffer)}; + } + uint32_t parsed_size; + if (block.is_final) { + RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size)); + } else { + RETURN_NOT_OK(parser->Parse(views, &parsed_size)); + } + if (count_rows_) { + num_rows_seen_ += parser->num_rows(); + } + RETURN_NOT_OK(block.consume_bytes(parsed_size)); + return ParsedBlock{std::move(parser), block.block_index, + static_cast(parsed_size) + block.bytes_skipped}; + } + + private: + io::IOContext io_context_; + ParseOptions parse_options_; + int num_csv_cols_; + bool count_rows_; + int64_t num_rows_seen_; +}; + +///////////////////////////////////////////////////////////////////////// +// Base class for common functionality + +class ReaderMixin { + public: + ReaderMixin(io::IOContext io_context, std::shared_ptr input, + const ReadOptions& read_options, const ParseOptions& parse_options, + const ConvertOptions& 
convert_options, bool count_rows) + : io_context_(std::move(io_context)), + read_options_(read_options), + parse_options_(parse_options), + convert_options_(convert_options), + count_rows_(count_rows), + num_rows_seen_(count_rows_ ? 1 : -1), + input_(std::move(input)) {} + + protected: + // Read header and column names from buffer, create column builders + // Returns the # of bytes consumed + Result ProcessHeader(const std::shared_ptr& buf, + std::shared_ptr* rest) { + const uint8_t* data = buf->data(); + const auto data_end = data + buf->size(); + DCHECK_GT(data_end - data, 0); + + if (read_options_.skip_rows) { + // Skip initial rows (potentially invalid CSV data) + auto num_skipped_rows = SkipRows(data, static_cast(data_end - data), + read_options_.skip_rows, &data); + if (num_skipped_rows < read_options_.skip_rows) { + return Status::Invalid( + "Could not skip initial ", read_options_.skip_rows, + " rows from CSV file, " + "either file is too short or header is larger than block size"); + } + if (count_rows_) { + num_rows_seen_ += num_skipped_rows; + } + } - std::vector - GenerateColumnNames(int32_t num_cols) { - std::vector res; - res.reserve(num_cols); - for (int32_t i = 0; i < num_cols; ++i) { - std::stringstream ss; - ss << "f" << i; - res.push_back(ss.str()); - } - return res; + if (read_options_.column_names.empty()) { + // Parse one row (either to read column names or to know the number of columns) + BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_, + num_rows_seen_, 1); + uint32_t parsed_size = 0; + RETURN_NOT_OK(parser.Parse( + util::string_view(reinterpret_cast(data), data_end - data), + &parsed_size)); + if (parser.num_rows() != 1) { + return Status::Invalid( + "Could not read first row from CSV file, either " + "file is too short or header is larger than block size"); + } + if (parser.num_cols() == 0) { + return Status::Invalid("No columns in CSV file"); + } + + if (read_options_.autogenerate_column_names) { + column_names_ = GenerateColumnNames(parser.num_cols()); + } else { + // Read column names from header row + auto visit = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status { + column_names_.emplace_back(reinterpret_cast(data), size); + return Status::OK(); + }; + RETURN_NOT_OK(parser.VisitLastRow(visit)); + DCHECK_EQ(static_cast(parser.num_cols()), column_names_.size()); + // Skip parsed header row + data += parsed_size; + if (count_rows_) { + ++num_rows_seen_; } + } + } else { + column_names_ = read_options_.column_names; + } - // Make conversion schema from options and parsed CSV header - Status - MakeConversionSchema() { - // Append a column converted from CSV data - auto append_csv_column - = [&](std::string col_name, int32_t col_index) { - // Does the named column have a fixed type? 
- auto it = convert_options_.column_types.find(col_name); - if (it == convert_options_.column_types.end()) { - conversion_schema_.columns.push_back( - ConversionSchema::InferredColumn( - std::move(col_name), col_index)); - } else { - conversion_schema_.columns.push_back( - ConversionSchema::TypedColumn( - std::move(col_name), col_index, it->second)); - } - }; - - // Append a column of nulls - auto append_null_column = [&](std::string col_name) { - // If the named column has a fixed type, use it, otherwise use - // null() - std::shared_ptr type; - auto it = convert_options_.column_types.find(col_name); - if (it == convert_options_.column_types.end()) { - type = null(); - } else { - type = it->second; - } - conversion_schema_.columns.push_back( - ConversionSchema::NullColumn( - std::move(col_name), std::move(type))); - }; - - if (convert_options_.include_columns.empty()) { - // Include all columns in CSV file order - for (int32_t col_index = 0; col_index < num_csv_cols_; - ++col_index) { - append_csv_column(column_names_[col_index], col_index); - } - } else { - // Include columns from `include_columns` (in that order) - // Compute indices of columns in the CSV file - std::unordered_map col_indices; - col_indices.reserve(column_names_.size()); - for (int32_t i = 0; - i < static_cast(column_names_.size()); ++i) { - col_indices.emplace(column_names_[i], i); - } - - for (const auto& col_name : convert_options_.include_columns) { - auto it = col_indices.find(col_name); - if (it != col_indices.end()) { - append_csv_column(col_name, it->second); - } else if (convert_options_.include_missing_columns) { - append_null_column(col_name); - } else { - return Status::KeyError("Column '", col_name, - "' in include_columns " - "does not exist in CSV file"); - } - } - } - return Status::OK(); - } + if (count_rows_) { + // increase rows seen to skip past rows which will be skipped + num_rows_seen_ += read_options_.skip_rows_after_names; + } - struct ParseResult { - std::shared_ptr parser; - int64_t parsed_bytes; - }; + auto bytes_consumed = data - buf->data(); + *rest = SliceBuffer(buf, bytes_consumed); - Result - Parse(const std::shared_ptr& partial, - const std::shared_ptr& completion, - const std::shared_ptr& block, int64_t block_index, - bool is_final) { - static constexpr int32_t max_num_rows - = std::numeric_limits::max(); - auto parser = std::make_shared( - pool_, parse_options_, num_csv_cols_, max_num_rows); - - std::shared_ptr straddling; - std::vector views; - if (partial->size() != 0 || completion->size() != 0) { - if (partial->size() == 0) { - straddling = completion; - } else if (completion->size() == 0) { - straddling = partial; - } else { - ARROW_ASSIGN_OR_RAISE(straddling, - ConcatenateBuffers({partial, completion}, pool_)); - } - views = { - util::string_view(*straddling), util::string_view(*block)}; - } else { - views = {util::string_view(*block)}; - } - uint32_t parsed_size; - if (is_final) { - RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size)); - } else { - RETURN_NOT_OK(parser->Parse(views, &parsed_size)); - } - return ParseResult{ - std::move(parser), static_cast(parsed_size)}; - } + num_csv_cols_ = static_cast(column_names_.size()); + DCHECK_GT(num_csv_cols_, 0); - MemoryPool* pool_; - ReadOptions read_options_; - ParseOptions parse_options_; - ConvertOptions convert_options_; + RETURN_NOT_OK(MakeConversionSchema()); + return bytes_consumed; + } - // Number of columns in the CSV file - int32_t num_csv_cols_ = -1; - // Column names in the CSV file - std::vector column_names_; - 
ConversionSchema conversion_schema_; + std::vector GenerateColumnNames(int32_t num_cols) { + std::vector res; + res.reserve(num_cols); + for (int32_t i = 0; i < num_cols; ++i) { + std::stringstream ss; + ss << "f" << i; + res.push_back(ss.str()); + } + return res; + } + + // Make conversion schema from options and parsed CSV header + Status MakeConversionSchema() { + // Append a column converted from CSV data + auto append_csv_column = [&](std::string col_name, int32_t col_index) { + // Does the named column have a fixed type? + auto it = convert_options_.column_types.find(col_name); + if (it == convert_options_.column_types.end()) { + conversion_schema_.columns.push_back( + ConversionSchema::InferredColumn(std::move(col_name), col_index)); + } else { + conversion_schema_.columns.push_back( + ConversionSchema::TypedColumn(std::move(col_name), col_index, it->second)); + } + }; - std::shared_ptr input_; - Iterator> buffer_iterator_; - std::shared_ptr task_group_; + // Append a column of nulls + auto append_null_column = [&](std::string col_name) { + // If the named column has a fixed type, use it, otherwise use null() + std::shared_ptr type; + auto it = convert_options_.column_types.find(col_name); + if (it == convert_options_.column_types.end()) { + type = null(); + } else { + type = it->second; + } + conversion_schema_.columns.push_back( + ConversionSchema::NullColumn(std::move(col_name), std::move(type))); }; - ///////////////////////////////////////////////////////////////////////// - // Base class for one-shot table readers - - class BaseTableReader : public ReaderMixin, public csv::TableReader { - public: - using ReaderMixin::ReaderMixin; - - virtual Status Init() = 0; - - protected: - // Make column builders from conversion schema - Status - MakeColumnBuilders() { - for (const auto& column : conversion_schema_.columns) { - std::shared_ptr builder; - if (column.is_missing) { - ARROW_ASSIGN_OR_RAISE(builder, - ColumnBuilder::MakeNull( - pool_, column.type, task_group_)); - } else if (column.type != nullptr) { - ARROW_ASSIGN_OR_RAISE(builder, - ColumnBuilder::Make(pool_, column.type, column.index, - convert_options_, task_group_)); - } else { - ARROW_ASSIGN_OR_RAISE(builder, - ColumnBuilder::Make(pool_, column.index, - convert_options_, task_group_)); - } - column_builders_.push_back(std::move(builder)); - } - return Status::OK(); + if (convert_options_.include_columns.empty()) { + // Include all columns in CSV file order + for (int32_t col_index = 0; col_index < num_csv_cols_; ++col_index) { + append_csv_column(column_names_[col_index], col_index); + } + } else { + // Include columns from `include_columns` (in that order) + // Compute indices of columns in the CSV file + std::unordered_map col_indices; + col_indices.reserve(column_names_.size()); + for (int32_t i = 0; i < static_cast(column_names_.size()); ++i) { + col_indices.emplace(column_names_[i], i); + } + + for (const auto& col_name : convert_options_.include_columns) { + auto it = col_indices.find(col_name); + if (it != col_indices.end()) { + append_csv_column(col_name, it->second); + } else if (convert_options_.include_missing_columns) { + append_null_column(col_name); + } else { + return Status::KeyError("Column '", col_name, + "' in include_columns " + "does not exist in CSV file"); } + } + } + return Status::OK(); + } + + struct ParseResult { + std::shared_ptr parser; + int64_t parsed_bytes; + }; + + Result Parse(const std::shared_ptr& partial, + const std::shared_ptr& completion, + const std::shared_ptr& block, int64_t 
block_index, + bool is_final) { + static constexpr int32_t max_num_rows = std::numeric_limits::max(); + auto parser = std::make_shared( + io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); + + std::shared_ptr straddling; + std::vector views; + if (partial->size() != 0 || completion->size() != 0) { + if (partial->size() == 0) { + straddling = completion; + } else if (completion->size() == 0) { + straddling = partial; + } else { + ARROW_ASSIGN_OR_RAISE( + straddling, ConcatenateBuffers({partial, completion}, io_context_.pool())); + } + views = {util::string_view(*straddling), util::string_view(*block)}; + } else { + views = {util::string_view(*block)}; + } + uint32_t parsed_size; + if (is_final) { + RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size)); + } else { + RETURN_NOT_OK(parser->Parse(views, &parsed_size)); + } + if (count_rows_) { + num_rows_seen_ += parser->num_rows(); + } + return ParseResult{std::move(parser), static_cast(parsed_size)}; + } + + io::IOContext io_context_; + ReadOptions read_options_; + ParseOptions parse_options_; + ConvertOptions convert_options_; + + // Number of columns in the CSV file + int32_t num_csv_cols_ = -1; + // Whether num_rows_seen_ tracks the number of rows seen in the CSV being parsed + bool count_rows_; + // Number of rows seen in the csv. Not used if count_rows is false + int64_t num_rows_seen_; + // Column names in the CSV file + std::vector column_names_; + ConversionSchema conversion_schema_; + + std::shared_ptr input_; + std::shared_ptr task_group_; +}; + +///////////////////////////////////////////////////////////////////////// +// Base class for one-shot table readers + +class BaseTableReader : public ReaderMixin, public csv::TableReader { + public: + using ReaderMixin::ReaderMixin; + + virtual Status Init() = 0; + +// Future> ReadAsync() override { +// return Future>::MakeFinished(Read()); +// } + + protected: + // Make column builders from conversion schema + Status MakeColumnBuilders() { + for (const auto& column : conversion_schema_.columns) { + std::shared_ptr builder; + if (column.is_missing) { + ARROW_ASSIGN_OR_RAISE(builder, ColumnBuilder::MakeNull(io_context_.pool(), + column.type, task_group_)); + } else if (column.type != nullptr) { + ARROW_ASSIGN_OR_RAISE( + builder, ColumnBuilder::Make(io_context_.pool(), column.type, column.index, + convert_options_, task_group_)); + } else { + ARROW_ASSIGN_OR_RAISE(builder, + ColumnBuilder::Make(io_context_.pool(), column.index, + convert_options_, task_group_)); + } + column_builders_.push_back(std::move(builder)); + } + return Status::OK(); + } + + Result ParseAndInsert(const std::shared_ptr& partial, + const std::shared_ptr& completion, + const std::shared_ptr& block, + int64_t block_index, bool is_final) { + ARROW_ASSIGN_OR_RAISE(auto result, + Parse(partial, completion, block, block_index, is_final)); + RETURN_NOT_OK(ProcessData(result.parser, block_index)); + return result.parsed_bytes; + } + + // Trigger conversion of parsed block data + Status ProcessData(const std::shared_ptr& parser, int64_t block_index) { + for (auto& builder : column_builders_) { + builder->Insert(block_index, parser); + } + return Status::OK(); + } - Result - ParseAndInsert(const std::shared_ptr& partial, - const std::shared_ptr& completion, - const std::shared_ptr& block, int64_t block_index, - bool is_final) { - ARROW_ASSIGN_OR_RAISE(auto result, - Parse(partial, completion, block, block_index, is_final)); - RETURN_NOT_OK(ProcessData(result.parser, block_index)); - return 
result.parsed_bytes; - } + Result> MakeTable() { + DCHECK_EQ(column_builders_.size(), conversion_schema_.columns.size()); - // Trigger conversion of parsed block data - Status - ProcessData( - const std::shared_ptr& parser, int64_t block_index) { - for (auto& builder : column_builders_) { - builder->Insert(block_index, parser); - } - return Status::OK(); - } + std::vector> fields; + std::vector> columns; - Result> - MakeTable() { - DCHECK_EQ( - column_builders_.size(), conversion_schema_.columns.size()); - - std::vector> fields; - std::vector> columns; - - for (int32_t i = 0; - i < static_cast(column_builders_.size()); ++i) { - const auto& column = conversion_schema_.columns[i]; - ARROW_ASSIGN_OR_RAISE( - auto array, column_builders_[i]->Finish()); - fields.push_back(::arrow::field(column.name, array->type())); - columns.emplace_back(std::move(array)); - } - return Table::Make(schema(fields), columns); - } + for (int32_t i = 0; i < static_cast(column_builders_.size()); ++i) { + const auto& column = conversion_schema_.columns[i]; + ARROW_ASSIGN_OR_RAISE(auto array, column_builders_[i]->Finish()); + fields.push_back(::arrow::field(column.name, array->type())); + columns.emplace_back(std::move(array)); + } + return Table::Make(schema(std::move(fields)), std::move(columns)); + } - // Column builders for target Table (in ConversionSchema order) - std::vector> column_builders_; - }; + // Column builders for target Table (in ConversionSchema order) + std::vector> column_builders_; +}; - ///////////////////////////////////////////////////////////////////////// - // Base class for streaming readers - ///////////////////////////////////////////////////////////////////////// - // Serial TableReader implementation +} // namespace - class SerialTableReader : public BaseTableReader { - public: - using BaseTableReader::BaseTableReader; - Status - Init() override { - ARROW_ASSIGN_OR_RAISE(auto istream_it, - io::MakeInputStreamIterator(input_, read_options_.block_size)); +///////////////////////////////////////////////////////////////////////// +// Serial TableReader implementation - buffer_iterator_ = CSVBufferIterator::Make(std::move(istream_it)); - return Status::OK(); - } +class SerialTableReader : public BaseTableReader { + public: + using BaseTableReader::BaseTableReader; - Result> - Read() override { - task_group_ = internal::TaskGroup::MakeSerial(); - - // First block - ARROW_ASSIGN_OR_RAISE(auto first_buffer, buffer_iterator_.Next()); - if (first_buffer == nullptr) { - return Status::Invalid("Empty CSV file"); - } - RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer)); - RETURN_NOT_OK(MakeColumnBuilders()); - - SerialBlockReader block_reader(MakeChunker(parse_options_), - std::move(buffer_iterator_), std::move(first_buffer)); - - while (true) { - ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_reader.Next()); - if (!maybe_block.has_value()) { - // EOF - break; - } - ARROW_ASSIGN_OR_RAISE(int64_t parsed_bytes, - ParseAndInsert(maybe_block->partial, - maybe_block->completion, maybe_block->buffer, - maybe_block->block_index, maybe_block->is_final)); - RETURN_NOT_OK(maybe_block->consume_bytes(parsed_bytes)); - } - // Finish conversion, create schema and table - RETURN_NOT_OK(task_group_->Finish()); - return MakeTable(); - } - }; + Status Init() override { + ARROW_ASSIGN_OR_RAISE(auto istream_it, + io::MakeInputStreamIterator(input_, read_options_.block_size)); - ///////////////////////////////////////////////////////////////////////// - // Factory functions - - Result> - 
TableReader::Make(MemoryPool* pool, std::shared_ptr input, - const ReadOptions& read_options, const ParseOptions& parse_options, - const ConvertOptions& convert_options) { - std::shared_ptr reader; - // if (read_options.use_threads) { - // reader = std::make_shared( - // pool, input, read_options, parse_options, convert_options, - // GetCpuThreadPool()); - // } else { - reader = std::make_shared( - pool, input, read_options, parse_options, convert_options); - - RETURN_NOT_OK(reader->Init()); - return reader; - } + // Since we're converting serially, no need to readahead more than one block + // int32_t block_queue_size = 1; + // ARROW_ASSIGN_OR_RAISE(auto rh_it, + // MakeReadaheadIterator(std::move(istream_it), block_queue_size)); + buffer_iterator_ = CSVBufferIterator::Make(std::move(istream_it)); + return Status::OK(); + } -} // namespace csv -} // namespace arrow + Result> Read() override { + task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token()); + + // First block + ARROW_ASSIGN_OR_RAISE(auto first_buffer, buffer_iterator_.Next()); + if (first_buffer == nullptr) { + return Status::Invalid("Empty CSV file"); + } + RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer)); + RETURN_NOT_OK(MakeColumnBuilders()); + + auto block_iterator = SerialBlockReader::MakeIterator( + std::move(buffer_iterator_), MakeChunker(parse_options_), std::move(first_buffer), + read_options_.skip_rows_after_names); + while (true) { + RETURN_NOT_OK(io_context_.stop_token().Poll()); + + ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator.Next()); + if (IsIterationEnd(maybe_block)) { + // EOF + break; + } + ARROW_ASSIGN_OR_RAISE( + int64_t parsed_bytes, + ParseAndInsert(maybe_block.partial, maybe_block.completion, maybe_block.buffer, + maybe_block.block_index, maybe_block.is_final)); + RETURN_NOT_OK(maybe_block.consume_bytes(parsed_bytes)); + } + // Finish conversion, create schema and table + RETURN_NOT_OK(task_group_->Finish()); + return MakeTable(); + } + + protected: + Iterator> buffer_iterator_; +}; + + +Result> MakeTableReader( + MemoryPool* pool, io::IOContext io_context, std::shared_ptr input, + const ReadOptions& read_options, const ParseOptions& parse_options, + const ConvertOptions& convert_options) { + RETURN_NOT_OK(parse_options.Validate()); + RETURN_NOT_OK(read_options.Validate()); + RETURN_NOT_OK(convert_options.Validate()); + std::shared_ptr reader; +// if (read_options.use_threads) { +// auto cpu_executor = internal::GetCpuThreadPool(); +// reader = std::make_shared( +// io_context, input, read_options, parse_options, convert_options, cpu_executor); +// } else { + reader = std::make_shared(io_context, input, read_options, + parse_options, convert_options, + /*count_rows=*/true); +// } + RETURN_NOT_OK(reader->Init()); + return reader; +} + + + + +///////////////////////////////////////////////////////////////////////// +// Factory functions + +Result> TableReader::Make( + io::IOContext io_context, std::shared_ptr input, + const ReadOptions& read_options, const ParseOptions& parse_options, + const ConvertOptions& convert_options) { + return MakeTableReader(io_context.pool(), io_context, std::move(input), read_options, + parse_options, convert_options); +} + +Result> TableReader::Make( + MemoryPool* pool, io::IOContext io_context, std::shared_ptr input, + const ReadOptions& read_options, const ParseOptions& parse_options, + const ConvertOptions& convert_options) { + return MakeTableReader(pool, io_context, std::move(input), read_options, parse_options, + convert_options); +} + +} 
// namespace csv + +} // namespace arrow diff --git a/cpp/perspective/src/include/perspective/vendor/arrow_single_threaded_reader.h b/cpp/perspective/src/include/perspective/vendor/arrow_single_threaded_reader.h index ceaa2ae4a6..bc725c1042 100644 --- a/cpp/perspective/src/include/perspective/vendor/arrow_single_threaded_reader.h +++ b/cpp/perspective/src/include/perspective/vendor/arrow_single_threaded_reader.h @@ -27,35 +27,48 @@ */ #pragma once + #include -#include "arrow/csv/options.h" // IWYU pragma: keep + +#include "arrow/csv/options.h" // IWYU pragma: keep +#include "arrow/io/interfaces.h" #include "arrow/record_batch.h" #include "arrow/result.h" #include "arrow/type.h" #include "arrow/type_fwd.h" +// #include "arrow/util/future.h" +// #include "arrow/util/thread_pool.h" #include "arrow/util/visibility.h" namespace arrow { - namespace io { - class InputStream; -} // namespace io +class InputStream; +} // namespace io namespace csv { - /// A class that reads an entire CSV file into a Arrow Table - class ARROW_EXPORT TableReader { - public: - virtual ~TableReader() = default; +/// A class that reads an entire CSV file into a Arrow Table +class ARROW_EXPORT TableReader { + public: + virtual ~TableReader() = default; + + /// Read the entire CSV file and convert it to a Arrow Table + virtual Result> Read() = 0; + /// Read the entire CSV file and convert it to a Arrow Table +// virtual Future> ReadAsync() = 0; - /// Read the entire CSV file and convert it to a Arrow Table - virtual Result> Read() = 0; + /// Create a TableReader instance + static Result> Make(io::IOContext io_context, + std::shared_ptr input, + const ReadOptions&, + const ParseOptions&, + const ConvertOptions&); - /// Create a TableReader instance - static Result> Make(MemoryPool* pool, - std::shared_ptr input, const ReadOptions&, - const ParseOptions&, const ConvertOptions&); - }; + ARROW_DEPRECATED("Use MemoryPool-less variant (the IOContext holds a pool already)") + static Result> Make( + MemoryPool* pool, io::IOContext io_context, std::shared_ptr input, + const ReadOptions&, const ParseOptions&, const ConvertOptions&); +}; -} // namespace csv -} // namespace arrow +} // namespace csv +} // namespace arrow diff --git a/docker/python/manylinux2010/Dockerfile b/docker/python/manylinux2010/Dockerfile index fbdb0210fc..bb9901ff87 100644 --- a/docker/python/manylinux2010/Dockerfile +++ b/docker/python/manylinux2010/Dockerfile @@ -61,10 +61,10 @@ RUN cd boost_1_71_0 && ./bootstrap.sh RUN cd boost_1_71_0 && ./b2 -j8 --with-program_options --with-filesystem --with-system install RUN python2.7 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=0.16.0' git+https://chromium.googlesource.com/external/gyp -RUN python3.6 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=2.0.0' -RUN python3.7 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=2.0.0' -RUN python3.8 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=2.0.0' -RUN python3.9 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=2.0.0' +RUN python3.6 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=5.0.0' +RUN python3.7 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=5.0.0' +RUN python3.8 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=5.0.0' +RUN python3.9 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=5.0.0' # install node RUN curl -sL https://rpm.nodesource.com/setup_10.x | sudo bash - diff --git a/docker/python/manylinux2014/Dockerfile b/docker/python/manylinux2014/Dockerfile index 
fe89dc051f..fd609e99ab 100644
--- a/docker/python/manylinux2014/Dockerfile
+++ b/docker/python/manylinux2014/Dockerfile
@@ -56,10 +56,10 @@ RUN tar xfz boost_1_71_0.tar.gz
 RUN cd boost_1_71_0 && ./bootstrap.sh
 RUN cd boost_1_71_0 && ./b2 -j8 --with-program_options --with-filesystem --with-system install

-RUN python3.6 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=2.0.0'
-RUN python3.7 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=2.0.0'
-RUN python3.8 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=2.0.0'
-RUN python3.9 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=2.0.0'
+RUN python3.6 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=5.0.0'
+RUN python3.7 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=5.0.0'
+RUN python3.8 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=5.0.0'
+RUN python3.9 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow>=5.0.0'

 # install node
 RUN curl -sL https://rpm.nodesource.com/setup_14.x | sudo bash -
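The reader-facing API change behind all of the above: arrow::csv::TableReader::Make now takes an arrow::io::IOContext (the MemoryPool* overload survives only behind ARROW_DEPRECATED, since the IOContext already holds a pool), and the arrow::internal::detail date helpers are no longer reachable in Arrow 5, which is why arrow_csv.cpp vendors ParseYYYY_MM_DD and ConvertTimePoint locally. A minimal sketch of the new entry point, mirroring the updated csvToTable call (the ReadCsv wrapper and the Buffer::FromString plumbing are illustrative, not part of this patch):

    #include "arrow/csv/reader.h"
    #include "arrow/io/memory.h"
    #include "arrow/result.h"
    #include "arrow/table.h"

    arrow::Result<std::shared_ptr<arrow::Table>> ReadCsv(const std::string& csv) {
        // Arrow >= 5: the IOContext supplies the memory pool and stop token.
        arrow::io::IOContext io_context = arrow::io::default_io_context();
        auto input = std::make_shared<arrow::io::BufferReader>(
            arrow::Buffer::FromString(csv));
        ARROW_ASSIGN_OR_RAISE(
            auto reader,
            arrow::csv::TableReader::Make(io_context, input,
                arrow::csv::ReadOptions::Defaults(),
                arrow::csv::ParseOptions::Defaults(),
                arrow::csv::ConvertOptions::Defaults()));
        return reader->Read();
    }

Note that this vendored build always constructs the serial reader: the use_threads branch in MakeTableReader is commented out, so ReadOptions::use_threads has no effect here.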