Skip to content

Commit

Permalink
Introduce ProjectionMask (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey authored Dec 29, 2023
1 parent f5a5b21 commit 3863577
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 49 deletions.
9 changes: 6 additions & 3 deletions src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::arrow_reader::column::timestamp::new_timestamp_iter;
use crate::arrow_reader::column::NullableIterator;
use crate::builder::BoxedArrayBuilder;
use crate::error::{self, InvalidColumnSnafu, IoSnafu, Result};
use crate::projection::ProjectionMask;
use crate::proto::stream::Kind;
use crate::proto::StripeFooter;
use crate::reader::decompress::{Compression, Decompressor};
Expand Down Expand Up @@ -816,7 +817,8 @@ pub struct Cursor<R> {
impl<R: ChunkReader> Cursor<R> {
pub fn new<T: AsRef<str>>(mut reader: R, fields: &[T]) -> Result<Self> {
let file_metadata = Arc::new(read_metadata(&mut reader)?);
let projected_data_type = file_metadata.root_data_type().project(fields);
let mask = ProjectionMask::named_roots(file_metadata.root_data_type(), fields);
let projected_data_type = file_metadata.root_data_type().project(&mask);
Ok(Self {
reader,
file_metadata,
Expand All @@ -840,7 +842,8 @@ impl<R: ChunkReader> Cursor<R> {
impl<R: AsyncChunkReader> Cursor<R> {
pub async fn new_async<T: AsRef<str>>(mut reader: R, fields: &[T]) -> Result<Self> {
let file_metadata = Arc::new(read_metadata_async(&mut reader).await?);
let projected_data_type = file_metadata.root_data_type().project(fields);
let mask = ProjectionMask::named_roots(file_metadata.root_data_type(), fields);
let projected_data_type = file_metadata.root_data_type().project(&mask);
Ok(Self {
reader,
file_metadata,
Expand Down Expand Up @@ -914,7 +917,7 @@ impl Stripe {
let columns = projected_data_type
.children()
.iter()
.map(|(name, data_type)| Column::new(name, data_type, &footer, info.number_of_rows()))
.map(|col| Column::new(col.name(), col.data_type(), &footer, info.number_of_rows()))
.collect();

let mut stream_map = HashMap::new();
Expand Down
6 changes: 3 additions & 3 deletions src/arrow_reader/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ impl Column {
| DataType::Date { .. } => vec![],
DataType::Struct { children, .. } => children
.iter()
.map(|(name, data_type)| Column {
.map(|col| Column {
number_of_rows: self.number_of_rows,
footer: self.footer.clone(),
name: name.clone(),
data_type: data_type.clone(),
name: col.name().to_string(),
data_type: col.data_type().clone(),
})
.collect(),
DataType::List { child, .. } => {
Expand Down
2 changes: 1 addition & 1 deletion src/async_arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl Stripe {
let columns = projected_data_type
.children()
.iter()
.map(|(name, data_type)| Column::new(name, data_type, &footer, info.number_of_rows()))
.map(|col| Column::new(col.name(), col.data_type(), &footer, info.number_of_rows()))
.collect();

let mut stream_map = HashMap::new();
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod arrow_reader;
pub mod async_arrow_reader;
pub(crate) mod builder;
pub mod error;
pub mod projection;
pub mod proto;
pub mod reader;
pub mod schema;
Expand Down
61 changes: 61 additions & 0 deletions src/projection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::schema::RootDataType;

// TODO: support nested projection (projecting individual columns within a struct type)

/// Describes the set of ORC column indices that should be read (projected)
/// from an ORC type.
///
/// A mask either selects everything (see [`ProjectionMask::all`]) or holds an
/// explicit list of column indices built by [`ProjectionMask::roots`] or
/// [`ProjectionMask::named_roots`].
#[derive(Debug, Clone)]
pub struct ProjectionMask {
    /// Selected ORC column indices; these may refer to nested types, not only
    /// root level columns. `None` means every column is projected.
    indices: Option<Vec<usize>>,
}

impl ProjectionMask {
    /// Creates a mask that projects every column.
    pub fn all() -> Self {
        Self { indices: None }
    }

    /// Creates a mask selecting root level columns by their column index.
    ///
    /// For each child of `root_data_type` whose column index appears in
    /// `indices`, every index reported by that child's `all_indices()` is
    /// added to the mask. Requested indices that match no root level column
    /// are silently ignored.
    pub fn roots(root_data_type: &RootDataType, indices: impl IntoIterator<Item = usize>) -> Self {
        // TODO: return error if column index not found?
        let requested: Vec<usize> = indices.into_iter().collect();
        // The root itself (index 0) is always part of the projection.
        let mut projected = vec![0];
        for child in root_data_type.children().iter() {
            let data_type = child.data_type();
            if requested.contains(&data_type.column_index()) {
                projected.extend(data_type.all_indices());
            }
        }
        Self {
            indices: Some(projected),
        }
    }

    /// Creates a mask selecting root level columns by their column name.
    ///
    /// For each child of `root_data_type` whose name appears in `names`,
    /// every index reported by that child's `all_indices()` is added to the
    /// mask. Requested names that match no root level column are silently
    /// ignored.
    pub fn named_roots<T>(root_data_type: &RootDataType, names: &[T]) -> Self
    where
        T: AsRef<str>,
    {
        // TODO: return error if column name not found?
        let requested: Vec<&str> = names.iter().map(AsRef::as_ref).collect();
        // The root itself (index 0) is always part of the projection.
        let mut projected = vec![0];
        for child in root_data_type.children().iter() {
            if requested.contains(&child.name()) {
                projected.extend(child.data_type().all_indices());
            }
        }
        Self {
            indices: Some(projected),
        }
    }

    /// Checks whether the ORC column with the given index is projected.
    ///
    /// Always returns `true` for a mask created with [`ProjectionMask::all`].
    pub fn is_index_projected(&self, index: usize) -> bool {
        self.indices
            .as_ref()
            .map_or(true, |selected| selected.contains(&index))
    }
}
Loading

0 comments on commit 3863577

Please sign in to comment.