
Document ChunkedArray Abstractions #5295

Closed
tustvold opened this issue Jan 11, 2024 · 11 comments · Fixed by #6527
Labels
enhancement Any new improvement worthy of an entry in the changelog

Comments

@tustvold
Contributor

tustvold commented Jan 11, 2024

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

The question has come up a couple of times as to why we don't have a ChunkedArray abstraction; we should document why we don't and what the equivalent constructions are.

Describe the solution you'd like

A ChunkedArray is really just sugar over the top of Vec<ArrayRef>, and is used within arrow-cpp and pyarrow for representing large in-memory datasets.

Equivalent constructions in arrow-rs would be:

  • Vec<ArrayRef>: a fairly exact mirror of ChunkedArray, sans some ergonomic niceties like working natively in kernels
  • impl Iterator<Item=ArrayRef>: a lazy version of a ChunkedArray
  • impl Stream<Item=ArrayRef>: a lazy async version of a ChunkedArray - this is what DataFusion uses extensively

There are also equivalent constructions using RecordBatch instead of ArrayRef.

These abstractions are strictly more flexible than the pyarrow ChunkedArray concept, integrate better with the Rust ecosystem, and encourage users towards lazy evaluation, which has much better memory usage characteristics.
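As a rough sketch of why the lazy constructions subsume the eager one, the same consumer code can be written against either. The `ArrayRef` below is a simplified stand-in (not arrow's `Arc<dyn Array>`) so the example is self-contained:

```rust
use std::sync::Arc;

// Stand-in for arrow's `ArrayRef`; a real program would use
// `arrow::array::ArrayRef` instead.
type ArrayRef = Arc<Vec<f64>>;

// Eager "ChunkedArray" equivalent: all chunks materialized up front.
fn total_len_eager(chunks: &[ArrayRef]) -> usize {
    chunks.iter().map(|c| c.len()).sum()
}

// Lazy equivalent: chunks are produced on demand by any iterator,
// e.g. one that decodes row groups from a file as they are requested.
fn total_len_lazy(chunks: impl Iterator<Item = ArrayRef>) -> usize {
    chunks.map(|c| c.len()).sum()
}

fn main() {
    let chunks: Vec<ArrayRef> = vec![Arc::new(vec![1.0, 2.0]), Arc::new(vec![3.0])];
    assert_eq!(total_len_eager(&chunks), 3);
    // The same logic works over a lazy source of chunks.
    assert_eq!(total_len_lazy(chunks.into_iter()), 3);
}
```

The lazy form never needs all chunks in memory at once, which is where the memory-usage benefit mentioned above comes from.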


@tustvold tustvold added the enhancement Any new improvement worthy of an entry in the changelog label Jan 11, 2024
@alamb
Contributor

alamb commented Jan 11, 2024

impl Stream<Item=ArrayRef> a lazy async version of a ChunkedArray - this is what DataFusion uses extensively

In case anyone wants details, this is called RecordBatchStream:

https://docs.rs/datafusion/latest/datafusion/execution/trait.RecordBatchStream.html

@efredine
Contributor

efredine commented Oct 6, 2024

@alamb @tustvold - Last week I found myself wishing I had these abstractions. They are nice quality of life improvements.

So I'm happy to pick this one up. But I'll hold off if you think it's too low a priority and will just add to the backlog of pull requests problem (which seems to have abated somewhat since Andrew's call for help).

Cheers,
Eric

@tustvold
Contributor Author

tustvold commented Oct 7, 2024

The point this ticket is making is that these abstractions do exist; they just need documenting.

@efredine
Contributor

efredine commented Oct 7, 2024

Ha, ha - ok!

I think I also managed to persuade myself it wasn't necessary after thinking about it some more ;-).

My use case was a simple one: I'm retrieving some columns (all float values, as it happens) from a Parquet file (using DataFusion, in fact) and plotting the results. I wanted something like the equivalent of a "table of ChunkedArrays".

But that can be achieved with something like the following:

use std::collections::HashMap;

use arrow::array::ArrayRef;
use arrow::record_batch::RecordBatch;

fn get_column_chunks_by_name(batches: Vec<RecordBatch>) -> HashMap<String, Vec<ArrayRef>> {
    let mut map: HashMap<String, Vec<ArrayRef>> = HashMap::new();
    for batch in batches {
        // Pair each column with its field so we can key the map by name.
        let schema = batch.schema();
        for (field, array) in schema.fields().iter().zip(batch.columns()) {
            map.entry(field.name().to_string()).or_default().push(array.clone());
        }
    }
    map
}

I'll have a go at writing something.

@efredine
Contributor

efredine commented Oct 7, 2024

take

@tustvold
Contributor Author

tustvold commented Oct 7, 2024

FWIW, assuming the batches have the same schema, and therefore the same column order, you don't need the hashmap.
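A minimal sketch of that index-based alternative, assuming every batch shares the same schema and column order; the `ArrayRef` and `RecordBatch` below are simplified stand-ins, not arrow's actual types:

```rust
use std::sync::Arc;

// Stand-ins for arrow's types, to keep the sketch self-contained.
type ArrayRef = Arc<Vec<f64>>;
type RecordBatch = Vec<ArrayRef>; // one ArrayRef per column

/// Transpose batches into per-column chunk lists, relying on every batch
/// sharing the same schema (and therefore the same column order).
fn columns_by_index(batches: Vec<RecordBatch>) -> Vec<Vec<ArrayRef>> {
    let num_columns = batches.first().map(|b| b.len()).unwrap_or(0);
    let mut columns: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_columns];
    for batch in batches {
        for (i, array) in batch.into_iter().enumerate() {
            columns[i].push(array);
        }
    }
    columns
}

fn main() {
    let b1: RecordBatch = vec![Arc::new(vec![1.0]), Arc::new(vec![2.0])];
    let b2: RecordBatch = vec![Arc::new(vec![3.0]), Arc::new(vec![4.0])];
    let cols = columns_by_index(vec![b1, b2]);
    assert_eq!(cols.len(), 2);    // two columns
    assert_eq!(cols[0].len(), 2); // two chunks per column
}
```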

@efredine
Contributor

efredine commented Oct 8, 2024

Yeah - I know. Indexes always seem very fragile and error-prone, especially if you have a lot of columns with the same type. So I was just experimenting with different approaches.

@kylebarron
Contributor

I think it's fair that arrow-rs doesn't implement ChunkedArray — its surface area is big enough already — but I would like to point out that there are some use cases not served by its omission.

The most glaring is that it is not currently possible to use arrow-rs's FFI to exchange something like a ChunkedArray when those arrays do not represent RecordBatches. ffi_stream::ArrowArrayStreamReader exists but will error if the data type of the stream is not Struct.

This makes it impossible in the general case to interop with a pyarrow.ChunkedArray or polars.Series (via Python).

In pyo3-arrow I have an ArrayReader trait to parallel arrow::RecordBatchReader, and vendored a derived copy of ffi_stream.rs to make it possible to handle this interop (without necessarily materializing the entire stream as a ChunkedArray).

impl Stream<Item=ArrayRef> a lazy async version of a ChunkedArray - this is what DataFusion uses extensively

In case anyone wants details, this is called RecordBatchStream:

It's IMO an important distinction that RecordBatchStream really isn't an impl Stream<Item=ArrayRef>, it's an impl Stream<Item=RecordBatch>.

@tustvold
Contributor Author

tustvold commented Oct 9, 2024

The most glaring is that it is not currently possible to use arrow-rs's FFI to exchange something like a ChunkedArray

This feels like an unrelated feature request; if arrow FFI has a mechanism to transport bare arrays we should support that. Although reading https://arrow.apache.org/docs/format/CStreamInterface.html I'm not sure how this would work.

@kylebarron
Contributor

I was trying to express that there are some benefits to making a sequence of ArrayRef a first-class citizen. One of which would be FFI support.

Aside from FFI, I think a concept like an ArrayIterator (a direct corollary to RecordBatchIterator) is useful over impl Iterator<Item=ArrayRef> because it gives type-level validation that all the arrays in the iterator have the same DataType, while still avoiding the ChunkedArray pitfall of requiring all arrays to be in-memory.
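A hedged sketch of what such an ArrayIterator could look like: like RecordBatchIterator, it carries the declared DataType alongside the chunks and checks each array against it as it is yielded, without requiring everything in memory. The `DataType` and `ArrayStub` types below are simplified stand-ins, not arrow's actual types:

```rust
use std::sync::Arc;

// Stand-ins for arrow's `DataType` and array types.
#[derive(Debug, PartialEq)]
enum DataType { Int32, Float64 }

struct ArrayStub { data_type: DataType }
type ArrayRef = Arc<ArrayStub>;

/// A hypothetical `ArrayIterator`: wraps any iterator of arrays together
/// with the `DataType` they are all expected to share.
struct ArrayIterator<I: Iterator<Item = ArrayRef>> {
    inner: I,
    data_type: DataType,
}

impl<I: Iterator<Item = ArrayRef>> ArrayIterator<I> {
    fn new(inner: I, data_type: DataType) -> Self {
        Self { inner, data_type }
    }
}

impl<I: Iterator<Item = ArrayRef>> Iterator for ArrayIterator<I> {
    type Item = Result<ArrayRef, String>;

    fn next(&mut self) -> Option<Self::Item> {
        // Validate each chunk's type as it is produced, so a mismatched
        // array surfaces as an error instead of silently passing through.
        let array = self.inner.next()?;
        if array.data_type == self.data_type {
            Some(Ok(array))
        } else {
            Some(Err(format!("expected {:?}, got {:?}", self.data_type, array.data_type)))
        }
    }
}

fn main() {
    let arrays: Vec<ArrayRef> = vec![
        Arc::new(ArrayStub { data_type: DataType::Int32 }),
        Arc::new(ArrayStub { data_type: DataType::Float64 }),
    ];
    let mut iter = ArrayIterator::new(arrays.into_iter(), DataType::Int32);
    assert!(iter.next().unwrap().is_ok());
    assert!(iter.next().unwrap().is_err());
}
```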

if arrow FFI has a mechanism to transport bare arrays we should support that. Although reading arrow.apache.org/docs/format/CStreamInterface.html I'm not sure how this would work

We can discuss this on a separate issue if you'd prefer. Arrow FFI only transports bare arrays. get_next() of ArrowArrayStream returns an ArrowArray, and an ArrowArray can be any generic Arrow array. That Arrow array is often a StructArray, with the understanding that the StructArray represents a RecordBatch, but it doesn't have to be.

Here:

let result = unsafe {
    from_ffi_and_data_type(array, DataType::Struct(self.schema().fields().clone()))
};
Some(result.map(|data| RecordBatch::from(StructArray::from(data))))

you assume that the data type of the stream is struct (and also assume that you can interpret the C Schema as a Schema), but that isn't required by the spec. To be more generic, you can use the data type of the C Schema directly.

@tustvold
Contributor Author

tustvold commented Oct 9, 2024

To be more generic, you can use the data type of the C Schema directly.

I think a ticket to add support for this makes sense to me; likely the existing RecordBatch reader could just be a wrapper around a reader operating on arrays. This is the approach we use in most other readers, e.g. Parquet, CSV, JSON, etc.
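One way that layering could look, sketched with simplified stand-in types rather than arrow's actual FFI readers: the batch-level reader is just a thin wrapper that reinterprets each struct-typed array yielded by an array-level reader as a batch.

```rust
use std::sync::Arc;

// Stand-ins: the array-level reader yields bare arrays; the batch-level
// reader wraps it. Real code would use arrow's FFI stream types instead.
type ArrayRef = Arc<Vec<i64>>;          // stand-in for a struct-typed chunk
struct RecordBatch { column: ArrayRef } // stand-in for a one-column batch

/// What a generic C Stream importer could expose: a plain stream of arrays.
struct ArrayReader { chunks: std::vec::IntoIter<ArrayRef> }

impl Iterator for ArrayReader {
    type Item = ArrayRef;
    fn next(&mut self) -> Option<ArrayRef> { self.chunks.next() }
}

/// The batch-level reader delegates to the array-level one and only
/// reinterprets each array as a batch, mirroring the suggestion above.
struct BatchReader { inner: ArrayReader }

impl Iterator for BatchReader {
    type Item = RecordBatch;
    fn next(&mut self) -> Option<RecordBatch> {
        self.inner.next().map(|array| RecordBatch { column: array })
    }
}

fn main() {
    let arrays = ArrayReader {
        chunks: vec![Arc::new(vec![1i64]), Arc::new(vec![2, 3])].into_iter(),
    };
    let batches: Vec<RecordBatch> = BatchReader { inner: arrays }.collect();
    assert_eq!(batches.len(), 2);
    assert_eq!(batches[1].column.len(), 2);
}
```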
