GH-44808: [C++][Parquet] Add arrow::Result version of parquet::arrow::FileReader::GetRecordBatchReader() #44809

Merged
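For callers, this PR replaces the Status-plus-out-parameter pattern with an arrow::Result return. A minimal before/after sketch of the migration — the ReadAll wrapper and its batch loop are illustrative, not taken from this PR; only the GetRecordBatchReader() signatures come from the diff below:

#include <memory>
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "parquet/arrow/reader.h"

arrow::Status ReadAll(parquet::arrow::FileReader* reader) {
  // Before (deprecated in 19.0.0): out-parameter style.
  //   std::unique_ptr<arrow::RecordBatchReader> rb_reader;
  //   RETURN_NOT_OK(reader->GetRecordBatchReader(&rb_reader));

  // After: Result style, unwrapped with ARROW_ASSIGN_OR_RAISE, which
  // returns early with the error Status if the Result carries a failure.
  ARROW_ASSIGN_OR_RAISE(auto rb_reader, reader->GetRecordBatchReader());

  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(rb_reader->ReadNext(&batch));
    if (batch == nullptr) break;  // end of stream
    // ... consume batch ...
  }
  return arrow::Status::OK();
}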
41 changes: 19 additions & 22 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -479,14 +479,16 @@ void DoRoundTripWithBatches(
               ->Build(&reader));
   std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
   if (column_subset.size() > 0) {
-    ASSERT_OK_NO_THROW(reader->GetRecordBatchReader(
-        Iota(reader->parquet_reader()->metadata()->num_row_groups()), column_subset,
-        &batch_reader));
+    ASSERT_OK_AND_ASSIGN(
+        batch_reader,
+        reader->GetRecordBatchReader(
+            Iota(reader->parquet_reader()->metadata()->num_row_groups()), column_subset));
   } else {
     // Read everything

-    ASSERT_OK_NO_THROW(reader->GetRecordBatchReader(
-        Iota(reader->parquet_reader()->metadata()->num_row_groups()), &batch_reader));
+    ASSERT_OK_AND_ASSIGN(
+        batch_reader, reader->GetRecordBatchReader(
+                          Iota(reader->parquet_reader()->metadata()->num_row_groups())));
   }
   ASSERT_OK_AND_ASSIGN(*out, Table::FromRecordBatchReader(batch_reader.get()));
 }
@@ -2385,8 +2387,7 @@ void TestGetRecordBatchReader(
   ASSERT_OK(builder.properties(properties)->Build(&reader));

   // Read the whole file, one batch at a time.
-  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
-  ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader));
+  ASSERT_OK_AND_ASSIGN(auto rb_reader, reader->GetRecordBatchReader({0, 1}));
   std::shared_ptr<::arrow::RecordBatch> actual_batch, expected_batch;
   ::arrow::TableBatchReader table_reader(*table);
   table_reader.set_chunksize(batch_size);
@@ -2401,7 +2402,7 @@
   ASSERT_EQ(nullptr, actual_batch);

   // ARROW-6005: Read just the second row group
-  ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({1}, &rb_reader));
+  ASSERT_OK_AND_ASSIGN(rb_reader, reader->GetRecordBatchReader({1}));
   std::shared_ptr<Table> second_rowgroup = table->Slice(num_rows / 2);
   ::arrow::TableBatchReader second_table_reader(*second_rowgroup);
   second_table_reader.set_chunksize(batch_size);
@@ -2448,8 +2449,8 @@ TEST(TestArrowReadWrite, WaitCoalescedReads) {
                                  ::arrow::io::CacheOptions::Defaults());
   ASSERT_OK(reader->parquet_reader()->WhenBuffered({0}, {0, 1, 2, 3, 4}).status());

-  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
-  ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0}, {0, 1, 2, 3, 4}, &rb_reader));
+  ASSERT_OK_AND_ASSIGN(auto rb_reader,
+                       reader->GetRecordBatchReader({0}, {0, 1, 2, 3, 4}));

   std::shared_ptr<::arrow::RecordBatch> actual_batch;
   ASSERT_OK(rb_reader->ReadNext(&actual_batch));
@@ -2507,8 +2508,8 @@ TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
   ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
   ASSERT_OK(builder.properties(properties)->Build(&reader));

-  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
-  ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0}, {}, &rb_reader));
+  ASSERT_OK_AND_ASSIGN(auto rb_reader, reader->GetRecordBatchReader(std::vector<int>{0},
+                                                                    std::vector<int>{}));

   std::shared_ptr<::arrow::RecordBatch> actual_batch;
   ASSERT_OK(rb_reader->ReadNext(&actual_batch));
@@ -4016,8 +4017,7 @@ TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringColumn)) {
   // ARROW-9297: ensure RecordBatchReader also works
   reader = ParquetFileReader::Open(std::make_shared<BufferReader>(tables_buffer));
   ASSERT_OK(FileReader::Make(default_memory_pool(), std::move(reader), &arrow_reader));
-  std::shared_ptr<::arrow::RecordBatchReader> batch_reader;
-  ASSERT_OK_NO_THROW(arrow_reader->GetRecordBatchReader(&batch_reader));
+  ASSERT_OK_AND_ASSIGN(auto batch_reader, arrow_reader->GetRecordBatchReader());
   ASSERT_OK_AND_ASSIGN(auto batched_table,
                        ::arrow::Table::FromRecordBatchReader(batch_reader.get()));

@@ -4491,9 +4491,8 @@ class TestArrowReadDictionary : public ::testing::TestWithParam<double> {
   void CheckStreamReadWholeFile(const Table& expected) {
     ASSERT_OK_AND_ASSIGN(auto reader, GetReader());

-    std::unique_ptr<::arrow::RecordBatchReader> rb;
-    ASSERT_OK(reader->GetRecordBatchReader(
-        ::arrow::internal::Iota(options.num_row_groups), &rb));
+    ASSERT_OK_AND_ASSIGN(auto rb, reader->GetRecordBatchReader(
+                                      ::arrow::internal::Iota(options.num_row_groups)));

     ASSERT_OK_AND_ASSIGN(auto actual, rb->ToTable());
     ::arrow::AssertTablesEqual(expected, *actual, /*same_chunk_layout=*/false);
@@ -4773,10 +4772,9 @@ TEST_F(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) {
   ArrowReaderProperties properties = default_arrow_reader_properties();
   properties.set_batch_size(batch_size);
   std::unique_ptr<FileReader> parquet_reader;
-  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
   ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), properties,
                              &parquet_reader));
-  ASSERT_OK(parquet_reader->GetRecordBatchReader(&rb_reader));
+  ASSERT_OK_AND_ASSIGN(auto rb_reader, parquet_reader->GetRecordBatchReader());

   auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
   std::vector<std::string> column_names = {
@@ -5262,9 +5260,8 @@ TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
   }

   // Verify batch data read via RecordBatch
-  std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
-  ASSERT_OK_NO_THROW(
-      arrow_reader->GetRecordBatchReader(Iota(num_row_groups), &batch_reader));
+  ASSERT_OK_AND_ASSIGN(auto batch_reader,
+                       arrow_reader->GetRecordBatchReader(Iota(num_row_groups)));
   std::shared_ptr<::arrow::RecordBatch> read_record_batch;
   ASSERT_OK(batch_reader->ReadNext(&read_record_batch));
   EXPECT_TRUE(record_batch->Equals(*read_record_batch));
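The test updates swap ASSERT_OK_NO_THROW plus an out-parameter for ASSERT_OK_AND_ASSIGN, Arrow's GTest helper for unwrapping an arrow::Result. A simplified sketch of what it does — the real macro in arrow/testing/gtest_util.h uses a uniquely named temporary and more careful move semantics; the MY_ prefix marks this as illustrative only:

// Illustrative expansion, not the actual Arrow macro definition.
#define MY_ASSERT_OK_AND_ASSIGN(lhs, rexpr)            \
  auto _res = (rexpr);                                 \
  ASSERT_TRUE(_res.ok()) << _res.status().ToString();  \
  lhs = std::move(_res).ValueOrDie();

// Usage mirrors the tests above:
//   MY_ASSERT_OK_AND_ASSIGN(auto rb_reader, reader->GetRecordBatchReader({0, 1}));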
57 changes: 34 additions & 23 deletions cpp/src/parquet/arrow/reader.cc
@@ -325,19 +325,19 @@ class FileReaderImpl : public FileReader {
     return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
   }

-  Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                              const std::vector<int>& column_indices,
-                              std::unique_ptr<RecordBatchReader>* out) override;
+  Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader(
+      const std::vector<int>& row_group_indices,
+      const std::vector<int>& column_indices) override;

-  Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                              std::unique_ptr<RecordBatchReader>* out) override {
+  Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader(
+      const std::vector<int>& row_group_indices) override {
     return GetRecordBatchReader(row_group_indices,
-                                Iota(reader_->metadata()->num_columns()), out);
+                                Iota(reader_->metadata()->num_columns()));
   }

-  Status GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* out) override {
+  Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader() override {
     return GetRecordBatchReader(Iota(num_row_groups()),
-                                Iota(reader_->metadata()->num_columns()), out);
+                                Iota(reader_->metadata()->num_columns()));
   }

   ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
@@ -972,9 +972,8 @@ Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>&

 }  // namespace

-Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
-                                            const std::vector<int>& column_indices,
-                                            std::unique_ptr<RecordBatchReader>* out) {
+Result<std::unique_ptr<RecordBatchReader>> FileReaderImpl::GetRecordBatchReader(
+    const std::vector<int>& row_groups, const std::vector<int>& column_indices) {
   RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

   if (reader_properties_.pre_buffer()) {
@@ -1008,10 +1007,8 @@
       }
     }

-    *out = std::make_unique<RowGroupRecordBatchReader>(
+    return std::make_unique<RowGroupRecordBatchReader>(
         ::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema));
-
-    return Status::OK();
   }

   int64_t num_rows = 0;
@@ -1062,10 +1059,8 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
         [table, table_reader] { return table_reader->Next(); });
   });

-  *out = std::make_unique<RowGroupRecordBatchReader>(
+  return std::make_unique<RowGroupRecordBatchReader>(
       ::arrow::MakeFlattenIterator(std::move(batches)), std::move(batch_schema));
-
-  return Status::OK();
 }

 /// Given a file reader and a list of row groups, this is a generator of record
@@ -1291,26 +1286,42 @@ std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
 // ----------------------------------------------------------------------
 // Public factory functions

+Status FileReader::GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* out) {
+  ARROW_ASSIGN_OR_RAISE(*out, GetRecordBatchReader());
+  return Status::OK();
+}
+
+Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
+                                        std::unique_ptr<RecordBatchReader>* out) {
+  ARROW_ASSIGN_OR_RAISE(*out, GetRecordBatchReader(row_group_indices));
+  return Status::OK();
+}
+
+Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
+                                        const std::vector<int>& column_indices,
+                                        std::unique_ptr<RecordBatchReader>* out) {
+  ARROW_ASSIGN_OR_RAISE(*out, GetRecordBatchReader(row_group_indices, column_indices));
+  return Status::OK();
+}
+
 Status FileReader::GetRecordBatchReader(std::shared_ptr<RecordBatchReader>* out) {
-  std::unique_ptr<RecordBatchReader> tmp;
-  RETURN_NOT_OK(GetRecordBatchReader(&tmp));
+  ARROW_ASSIGN_OR_RAISE(auto tmp, GetRecordBatchReader());
   out->reset(tmp.release());
   return Status::OK();
 }

 Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                         std::shared_ptr<RecordBatchReader>* out) {
-  std::unique_ptr<RecordBatchReader> tmp;
-  RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, &tmp));
+  ARROW_ASSIGN_OR_RAISE(auto tmp, GetRecordBatchReader(row_group_indices));
   out->reset(tmp.release());
   return Status::OK();
 }

 Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                         const std::vector<int>& column_indices,
                                         std::shared_ptr<RecordBatchReader>* out) {
-  std::unique_ptr<RecordBatchReader> tmp;
-  RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, column_indices, &tmp));
+  ARROW_ASSIGN_OR_RAISE(auto tmp,
+                        GetRecordBatchReader(row_group_indices, column_indices));
   out->reset(tmp.release());
   return Status::OK();
 }
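The reader.cc changes follow a standard Arrow deprecation pattern: the old Status signatures become thin wrappers that delegate to the new Result-returning virtuals. A distilled sketch of the pattern — Reader and Widget are generic placeholders, not names from the PR:

#include <memory>
#include "arrow/result.h"
#include "arrow/status.h"

class Widget {};

class Reader {
 public:
  virtual ~Reader() = default;

  // New entry point: virtual and Result-returning.
  virtual arrow::Result<std::unique_ptr<Widget>> MakeWidget() {
    return std::make_unique<Widget>();
  }

  // Compatibility shim: same name, old Status-plus-out-parameter shape,
  // delegating to the Result version. ARROW_ASSIGN_OR_RAISE returns the
  // error Status early if the Result carries a failure.
  arrow::Status MakeWidget(std::unique_ptr<Widget>* out) {
    ARROW_ASSIGN_OR_RAISE(*out, MakeWidget());
    return arrow::Status::OK();
  }
};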
41 changes: 37 additions & 4 deletions cpp/src/parquet/arrow/reader.h
@@ -155,18 +155,36 @@ class PARQUET_EXPORT FileReader {
       std::shared_ptr<::arrow::ChunkedArray>* out) = 0;

   /// \brief Return a RecordBatchReader of all row groups and columns.
-  virtual ::arrow::Status GetRecordBatchReader(
-      std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+  ///
+  /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
+  ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
+  ::arrow::Status GetRecordBatchReader(std::unique_ptr<::arrow::RecordBatchReader>* out);
+
+  /// \brief Return a RecordBatchReader of all row groups and columns.
+  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
+  GetRecordBatchReader() = 0;

   /// \brief Return a RecordBatchReader of row groups selected from row_group_indices.
   ///
   /// Note that the ordering in row_group_indices matters. FileReaders must outlive
   /// their RecordBatchReaders.
   ///
   /// \returns error Status if row_group_indices contains an invalid index
+  ///
+  /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
+  ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
   virtual ::arrow::Status GetRecordBatchReader(
       const std::vector<int>& row_group_indices,
-      std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+      std::unique_ptr<::arrow::RecordBatchReader>* out);
+
+  /// \brief Return a RecordBatchReader of row groups selected from row_group_indices.
+  ///
+  /// Note that the ordering in row_group_indices matters. FileReaders must outlive
+  /// their RecordBatchReaders.
+  ///
+  /// \returns error Result if row_group_indices contains an invalid index
+  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
+  GetRecordBatchReader(const std::vector<int>& row_group_indices) = 0;

   /// \brief Return a RecordBatchReader of row groups selected from
   /// row_group_indices, whose columns are selected by column_indices.
@@ -176,9 +194,24 @@
   ///
   /// \returns error Status if either row_group_indices or column_indices
   /// contains an invalid index
+  ///
+  /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
+  ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
   virtual ::arrow::Status GetRecordBatchReader(
       const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
-      std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+      std::unique_ptr<::arrow::RecordBatchReader>* out);
+
+  /// \brief Return a RecordBatchReader of row groups selected from
+  /// row_group_indices, whose columns are selected by column_indices.
+  ///
+  /// Note that the ordering in row_group_indices and column_indices
+  /// matter. FileReaders must outlive their RecordBatchReaders.
+  ///
+  /// \returns error Result if either row_group_indices or column_indices
+  /// contains an invalid index
+  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
+  GetRecordBatchReader(const std::vector<int>& row_group_indices,
+                       const std::vector<int>& column_indices) = 0;

   /// \brief Return a RecordBatchReader of row groups selected from
   /// row_group_indices, whose columns are selected by column_indices.
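Putting the new surface together, a hedged end-to-end sketch of the row-group/column selecting overload — ReadSubset, the path handling, and the chosen indices are illustrative; only the GetRecordBatchReader overloads come from this PR:

#include <memory>
#include <string>
#include <vector>

#include "arrow/io/file.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "parquet/arrow/reader.h"

arrow::Result<std::shared_ptr<arrow::Table>> ReadSubset(const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto infile, arrow::io::ReadableFile::Open(path));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
  // Row groups {0, 1}, columns {0, 2}; an invalid index surfaces as an error
  // Result rather than a failed Status through an out-parameter.
  ARROW_ASSIGN_OR_RAISE(auto rb_reader,
                        reader->GetRecordBatchReader(std::vector<int>{0, 1},
                                                     std::vector<int>{0, 2}));
  // Per the header docs, the FileReader must outlive its RecordBatchReader,
  // so materialize the table before `reader` goes out of scope.
  return arrow::Table::FromRecordBatchReader(rb_reader.get());
}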