Remove Reader struct, condense into Cursor (#48)
* Remove Reader struct, condense into Cursor

* Don't read all stripe footers upfront
Jefffrey authored Nov 27, 2023
1 parent 0392dd9 commit ed84772
Showing 10 changed files with 295 additions and 321 deletions.
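The change below drops the intermediate Reader struct: a Cursor is now built directly from anything implementing ChunkReader (or AsyncChunkReader) and reads the file metadata itself. A minimal sketch of the new synchronous entry point, assuming a placeholder file path (the calls mirror the updated benchmark below):

use std::fs::File;
use datafusion_orc::{ArrowReader, Cursor};

fn main() {
    // Previously: let reader = Reader::new(f)?; let cursor = Cursor::root(reader)?;
    let f = File::open("/path/to/file.orc").unwrap();
    let cursor = Cursor::root(f).unwrap();
    let batches = ArrowReader::new(cursor, None)
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    println!("read {} record batches", batches.len());
}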
8 changes: 3 additions & 5 deletions benches/arrow_reader.rs
@@ -18,7 +18,7 @@
use std::fs::File;

use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_orc::{ArrowReader, ArrowStreamReader, Cursor, Reader};
use datafusion_orc::{ArrowReader, ArrowStreamReader, Cursor};
use futures_util::TryStreamExt;

fn basic_path(path: &str) -> String {
@@ -44,8 +44,7 @@ async fn async_read_all() {
let file_path = basic_path(file);
let f = tokio::fs::File::open(file_path).await.unwrap();

let reader = Reader::new_async(f).await.unwrap();
let cursor = Cursor::root(reader).unwrap();
let cursor = Cursor::root_async(f).await.unwrap();

ArrowStreamReader::new(cursor, None)
.try_collect::<Vec<_>>()
@@ -58,8 +57,7 @@ fn sync_read_all() {
let file_path = basic_path(file);
let f = File::open(file_path).unwrap();

let reader = Reader::new(f).unwrap();
let cursor = Cursor::root(reader).unwrap();
let cursor = Cursor::root(f).unwrap();

ArrowReader::new(cursor, None)
.collect::<Result<Vec<_>, _>>()
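For the async path the shape is the same; a self-contained version of the async benchmark above, assuming a tokio runtime and a placeholder path:

use datafusion_orc::{ArrowStreamReader, Cursor};
use futures_util::TryStreamExt;

#[tokio::main]
async fn main() {
    let f = tokio::fs::File::open("/path/to/file.orc").await.unwrap();
    // Reader::new_async is gone; Cursor::root_async reads the file metadata itself.
    let cursor = Cursor::root_async(f).await.unwrap();
    let batches = ArrowStreamReader::new(cursor, None)
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    println!("read {} record batches", batches.len());
}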
167 changes: 109 additions & 58 deletions src/arrow_reader.rs
@@ -1,7 +1,7 @@
pub mod column;

use std::collections::HashMap;
use std::io::{Read, Seek};
use std::io::Read;
use std::sync::Arc;

use arrow::array::{
@@ -20,6 +20,7 @@ use arrow::datatypes::{Field, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use bytes::Bytes;
use prost::Message;
use snafu::{OptionExt, ResultExt};

use self::column::list::{new_list_iter, ListDecoder};
@@ -36,15 +37,16 @@ use crate::arrow_reader::column::struct_column::StructDecoder;
use crate::arrow_reader::column::timestamp::new_timestamp_iter;
use crate::arrow_reader::column::NullableIterator;
use crate::builder::BoxedArrayBuilder;
use crate::error::{self, InvalidColumnSnafu, Result};
use crate::error::{self, InvalidColumnSnafu, IoSnafu, Result};
use crate::proto::stream::Kind;
use crate::proto::StripeFooter;
use crate::reader::decompress::{Compression, Decompressor};
use crate::reader::Reader;
use crate::reader::metadata::{read_metadata, read_metadata_async, FileMetadata};
use crate::reader::{AsyncChunkReader, ChunkReader};
use crate::schema::{DataType, RootDataType};
use crate::stripe::StripeMetadata;

pub struct ArrowReader<R: Read> {
pub struct ArrowReader<R: ChunkReader> {
cursor: Cursor<R>,
schema_ref: SchemaRef,
current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>>>>,
@@ -53,7 +55,7 @@ pub struct ArrowReader<R: Read> {

pub const DEFAULT_BATCH_SIZE: usize = 8192;

impl<R: Read> ArrowReader<R> {
impl<R: ChunkReader> ArrowReader<R> {
pub fn new(cursor: Cursor<R>, batch_size: Option<usize>) -> Self {
let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
let schema = Arc::new(create_arrow_schema(&cursor));
@@ -66,11 +68,11 @@ impl<R: Read> ArrowReader<R> {
}

pub fn total_row_count(&self) -> u64 {
self.cursor.reader.metadata().number_of_rows()
self.cursor.file_metadata.number_of_rows()
}
}

impl<R: Read + Seek> ArrowReader<R> {
impl<R: ChunkReader> ArrowReader<R> {
fn try_advance_stripe(&mut self) -> Option<std::result::Result<RecordBatch, ArrowError>> {
match self
.cursor
@@ -96,22 +98,21 @@

pub fn create_arrow_schema<R>(cursor: &Cursor<R>) -> Schema {
let metadata = cursor
.reader
.metadata()
.file_metadata
.user_custom_metadata()
.iter()
.map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
.collect::<HashMap<_, _>>();
cursor.root_data_type.create_arrow_schema(&metadata)
cursor.projected_data_type.create_arrow_schema(&metadata)
}

impl<R: Read + Seek> RecordBatchReader for ArrowReader<R> {
impl<R: ChunkReader> RecordBatchReader for ArrowReader<R> {
fn schema(&self) -> SchemaRef {
self.schema_ref.clone()
}
}

impl<R: Read + Seek> Iterator for ArrowReader<R> {
impl<R: ChunkReader> Iterator for ArrowReader<R> {
type Item = std::result::Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
@@ -806,45 +807,78 @@ impl NaiveStripeDecoder {
}

pub struct Cursor<R> {
pub(crate) reader: Reader<R>,
pub(crate) root_data_type: RootDataType,
pub(crate) reader: R,
pub(crate) file_metadata: Arc<FileMetadata>,
pub(crate) projected_data_type: RootDataType,
pub(crate) stripe_offset: usize,
}

impl<R> Cursor<R> {
pub fn new<T: AsRef<str>>(r: Reader<R>, fields: &[T]) -> Result<Self> {
let projected_data_type = r.metadata().root_data_type().project(fields);
impl<R: ChunkReader> Cursor<R> {
pub fn new<T: AsRef<str>>(mut reader: R, fields: &[T]) -> Result<Self> {
let file_metadata = Arc::new(read_metadata(&mut reader)?);
let projected_data_type = file_metadata.root_data_type().project(fields);
Ok(Self {
reader: r,
root_data_type: projected_data_type,
reader,
file_metadata,
projected_data_type,
stripe_offset: 0,
})
}

pub fn root(r: Reader<R>) -> Result<Self> {
let data_type = r.metadata().root_data_type().clone();
pub fn root(mut reader: R) -> Result<Self> {
let file_metadata = Arc::new(read_metadata(&mut reader)?);
let data_type = file_metadata.root_data_type().clone();
Ok(Self {
reader: r,
root_data_type: data_type,
reader,
file_metadata,
projected_data_type: data_type,
stripe_offset: 0,
})
}
}

impl<R: Read + Seek> Iterator for Cursor<R> {
impl<R: AsyncChunkReader> Cursor<R> {
pub async fn new_async<T: AsRef<str>>(mut reader: R, fields: &[T]) -> Result<Self> {
let file_metadata = Arc::new(read_metadata_async(&mut reader).await?);
let projected_data_type = file_metadata.root_data_type().project(fields);
Ok(Self {
reader,
file_metadata,
projected_data_type,
stripe_offset: 0,
})
}

pub async fn root_async(mut reader: R) -> Result<Self> {
let file_metadata = Arc::new(read_metadata_async(&mut reader).await?);
let data_type = file_metadata.root_data_type().clone();
Ok(Self {
reader,
file_metadata,
projected_data_type: data_type,
stripe_offset: 0,
})
}
}

impl<R: ChunkReader> Iterator for Cursor<R> {
type Item = Result<Stripe>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(info) = self.reader.stripe(self.stripe_offset).cloned() {
if let Some(info) = self
.file_metadata
.stripe_metadatas()
.get(self.stripe_offset)
.cloned()
{
let stripe = Stripe::new(
&mut self.reader,
self.root_data_type.clone(),
&self.file_metadata,
&self.projected_data_type.clone(),
self.stripe_offset,
&info,
);

self.stripe_offset += 1;

Some(stripe)
} else {
None
@@ -861,41 +895,23 @@ pub struct Stripe {
pub(crate) stream_map: Arc<StreamMap>,
}

#[derive(Debug)]
pub struct StreamMap {
pub inner: HashMap<(u32, Kind), Bytes>,
pub compression: Option<Compression>,
}

impl StreamMap {
pub fn get(&self, column: &Column, kind: Kind) -> Result<Decompressor> {
self.get_opt(column, kind).context(InvalidColumnSnafu {
name: column.name(),
})
}

pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
let column_id = column.column_id();

self.inner
.get(&(column_id, kind))
.cloned()
.map(|data| Decompressor::new(data, self.compression, vec![]))
}
}

impl Stripe {
pub fn new<R: Read + Seek>(
r: &mut Reader<R>,
root_data_type: RootDataType,
pub fn new<R: ChunkReader>(
reader: &mut R,
file_metadata: &Arc<FileMetadata>,
projected_data_type: &RootDataType,
stripe: usize,
info: &StripeMetadata,
) -> Result<Self> {
let footer = Arc::new(r.stripe_footer(stripe).clone());
let compression = file_metadata.compression();

let footer = reader
.get_bytes(info.footer_offset(), info.footer_length())
.context(IoSnafu)?;
let footer = Arc::new(deserialize_stripe_footer(&footer, compression)?);

let compression = r.metadata().compression();
//TODO(weny): add tz
let columns = root_data_type
let columns = projected_data_type
.children()
.iter()
.map(|(name, data_type)| Column::new(name, data_type, &footer, info.number_of_rows()))
@@ -907,7 +923,7 @@ impl Stripe {
let length = stream.length();
let column_id = stream.column();
let kind = stream.kind();
let data = Column::read_stream(r, stream_offset, length as usize)?;
let data = Column::read_stream(reader, stream_offset, length)?;

// TODO(weny): filter out unused streams.
stream_map.insert((column_id, kind), data);
@@ -934,3 +950,38 @@
self.stripe_offset
}
}

#[derive(Debug)]
pub struct StreamMap {
pub inner: HashMap<(u32, Kind), Bytes>,
pub compression: Option<Compression>,
}

impl StreamMap {
pub fn get(&self, column: &Column, kind: Kind) -> Result<Decompressor> {
self.get_opt(column, kind).context(InvalidColumnSnafu {
name: column.name(),
})
}

pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
let column_id = column.column_id();

self.inner
.get(&(column_id, kind))
.cloned()
.map(|data| Decompressor::new(data, self.compression, vec![]))
}
}

pub(crate) fn deserialize_stripe_footer(
bytes: &[u8],
compression: Option<Compression>,
) -> Result<StripeFooter> {
let mut buffer = vec![];
// TODO: refactor to not need Bytes::copy_from_slice
Decompressor::new(Bytes::copy_from_slice(bytes), compression, vec![])
.read_to_end(&mut buffer)
.context(error::IoSnafu)?;
StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
}
40 changes: 8 additions & 32 deletions src/arrow_reader/column.rs
@@ -1,14 +1,12 @@
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow::datatypes::Field;
use bytes::Bytes;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use crate::error::{self, Result};
use crate::error::{IoSnafu, Result};
use crate::proto::{ColumnEncoding, StripeFooter};
use crate::reader::Reader;
use crate::reader::{AsyncChunkReader, ChunkReader};
use crate::schema::DataType;

pub mod binary;
@@ -45,24 +43,6 @@ impl From<&Column> for Field {
}
}

macro_rules! impl_read_stream {
($reader:ident,$start:ident,$length:ident $($_await:tt)*) => {{
$reader
.inner
.seek(SeekFrom::Start($start))$($_await)*
.context(error::IoSnafu)?;

let mut scratch = vec![0; $length];

$reader
.inner
.read_exact(&mut scratch)$($_await)*
.context(error::IoSnafu)?;

Ok(Bytes::from(scratch))
}};
}

impl Column {
pub fn new(
name: &str,
@@ -172,20 +152,16 @@ impl Column {
}
}

pub fn read_stream<R: Read + Seek>(
reader: &mut Reader<R>,
start: u64,
length: usize,
) -> Result<Bytes> {
impl_read_stream!(reader, start, length)
pub fn read_stream<R: ChunkReader>(reader: &mut R, start: u64, length: u64) -> Result<Bytes> {
reader.get_bytes(start, length).context(IoSnafu)
}

pub async fn read_stream_async<R: AsyncRead + AsyncSeek + Unpin + Send>(
reader: &mut Reader<R>,
pub async fn read_stream_async<R: AsyncChunkReader>(
reader: &mut R,
start: u64,
length: usize,
length: u64,
) -> Result<Bytes> {
impl_read_stream!(reader, start, length.await)
reader.get_bytes(start, length).await.context(IoSnafu)
}
}
