
Use ParquetRecordBatchStream #2711

Closed
wants to merge 14 commits into from
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
@@ -41,6 +41,7 @@ pyarrow = ["pyo3"]
arrow = { version = "16.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.85.0", optional = true }
object_store = { version = "0.3", optional = true }
ordered-float = "3.0"
parquet = { version = "16.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
21 changes: 21 additions & 0 deletions datafusion/common/src/error.rs
@@ -49,6 +49,9 @@ pub enum DataFusionError {
    /// Wraps an error from the Avro crate
    #[cfg(feature = "avro")]
    AvroError(AvroError),
    /// Wraps an error from the object_store crate
    #[cfg(feature = "object_store")]
    ObjectStore(object_store::Error),
    /// Error associated to I/O operations and associated traits.
    IoError(io::Error),
    /// Error returned when SQL is syntactically incorrect.
@@ -203,6 +206,20 @@ impl From<AvroError> for DataFusionError {
    }
}

#[cfg(feature = "object_store")]
impl From<object_store::Error> for DataFusionError {
    fn from(e: object_store::Error) -> Self {
        DataFusionError::ObjectStore(e)
    }
}

#[cfg(feature = "object_store")]
impl From<object_store::path::Error> for DataFusionError {
    fn from(e: object_store::path::Error) -> Self {
        DataFusionError::ObjectStore(e.into())
    }
}

impl From<ParserError> for DataFusionError {
    fn from(e: ParserError) -> Self {
        DataFusionError::SQL(e)
@@ -264,6 +281,10 @@ impl Display for DataFusionError {
            DataFusionError::JITError(ref desc) => {
                write!(f, "JIT error: {}", desc)
            }
            #[cfg(feature = "object_store")]
            DataFusionError::ObjectStore(ref desc) => {
                write!(f, "Object Store error: {}", desc)
            }
        }
    }
}
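Note: the two `From` impls added above mean call sites can surface `object_store` failures with `?` instead of hand-written `map_err`. A minimal sketch under that assumption (the helper name and body are illustrative, not code from this PR, and the `object_store` feature of `datafusion-common` is assumed enabled, as the core crate does below):

```rust
use std::sync::Arc;

use bytes::Bytes;
use datafusion_common::{DataFusionError, Result};
use object_store::{path::Path, ObjectStore};

// Hypothetical helper: both `get` and `bytes` return `object_store::Result`,
// which `?` now converts into `DataFusionError::ObjectStore` via the new impl.
async fn fetch_object(store: Arc<dyn ObjectStore>, location: &Path) -> Result<Bytes> {
    let data = store.get(location).await?.bytes().await?;
    Ok(data)
}
```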
7 changes: 4 additions & 3 deletions datafusion/core/Cargo.toml
@@ -58,9 +58,9 @@ ahash = { version = "0.7", default-features = false }
arrow = { version = "16.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
bytes = "1.1"
chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../common", version = "9.0.0", features = ["parquet"] }
datafusion-data-access = { path = "../data-access", version = "9.0.0" }
datafusion-common = { path = "../common", version = "9.0.0", features = ["parquet", "object_store"] }
datafusion-expr = { path = "../expr", version = "9.0.0" }
datafusion-jit = { path = "../jit", version = "9.0.0", optional = true }
datafusion-optimizer = { path = "../optimizer", version = "9.0.0" }
@@ -75,9 +75,10 @@ lazy_static = { version = "^1.4.0" }
log = "^0.4"
num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
object_store = "0.3.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "16.0.0", features = ["arrow"] }
parquet = { version = "16.0.0", features = ["arrow", "async"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
4 changes: 2 additions & 2 deletions datafusion/core/src/catalog/schema.rs
@@ -132,7 +132,7 @@ mod tests {
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion_data_access::object_store::local::LocalFileSystem;
use object_store::local::LocalFileSystem;

use crate::assert_batches_eq;
use crate::catalog::catalog::{CatalogProvider, MemoryCatalogProvider};
@@ -170,7 +170,7 @@
let schema = MemorySchemaProvider::new();

let ctx = SessionContext::new();
let store = Arc::new(LocalFileSystem {});
let store = Arc::new(LocalFileSystem::new());
ctx.runtime_env().register_object_store("file", store);

let config = ListingTableConfig::new(table_path)
19 changes: 12 additions & 7 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion_data_access::FileMeta;
use object_store::{GetResult, ObjectMeta, ObjectStore};

use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
@@ -32,7 +32,6 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
use datafusion_data_access::object_store::ObjectStore;

/// The default file extension of avro files
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
@@ -49,12 +48,18 @@ impl FileFormat for AvroFormat {
    async fn infer_schema(
        &self,
        store: &Arc<dyn ObjectStore>,
        files: &[FileMeta],
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let mut schemas = vec![];
        for file in files {
            let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
            let schema = read_avro_schema_from_reader(&mut reader)?;
        for object in objects {
            let schema = match store.get(&object.location).await? {
                GetResult::File(mut file, _) => read_avro_schema_from_reader(&mut file)?,
                r @ GetResult::Stream(_) => {
                    // TODO: Fetching entire file to get schema is potentially wasteful
                    let data = r.bytes().await?;
                    read_avro_schema_from_reader(&mut data.as_ref())?
                }
            };
            schemas.push(schema);
        }
        let merged_schema = Schema::try_merge(schemas)?;
@@ -65,7 +70,7 @@
        &self,
        _store: &Arc<dyn ObjectStore>,
        _table_schema: SchemaRef,
        _file: &FileMeta,
        _object: &ObjectMeta,
    ) -> Result<Statistics> {
        Ok(Statistics::default())
    }
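The `GetResult` match above keeps a fast path for locally readable files and buffers remote streams into memory (the TODO notes this can be wasteful for large objects). A rough standalone sketch of the same pattern, with a hypothetical helper name and a boxed error type chosen only for illustration:

```rust
use std::io::Read;

use object_store::{path::Path, GetResult, ObjectStore};

// Hypothetical helper mirroring the pattern above: read a local file handle
// directly, otherwise buffer the streamed object fully into memory.
async fn read_object(
    store: &dyn ObjectStore,
    location: &Path,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    match store.get(location).await? {
        GetResult::File(mut file, _path) => {
            let mut buf = Vec::new();
            file.read_to_end(&mut buf)?;
            Ok(buf)
        }
        r @ GetResult::Stream(_) => Ok(r.bytes().await?.to_vec()),
    }
}
```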
20 changes: 13 additions & 7 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion_data_access::FileMeta;
use datafusion_common::DataFusionError;
use futures::TryFutureExt;
use object_store::{ObjectMeta, ObjectStore};

use super::FileFormat;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
@@ -32,7 +34,6 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
use datafusion_data_access::object_store::ObjectStore;

/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -96,16 +97,21 @@ impl FileFormat for CsvFormat {
    async fn infer_schema(
        &self,
        store: &Arc<dyn ObjectStore>,
        files: &[FileMeta],
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let mut schemas = vec![];

        let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);

        for file in files {
            let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
        for object in objects {
            let data = store
                .get(&object.location)
                .and_then(|r| r.bytes())
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;

            let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
                &mut reader,
                &mut data.as_ref(),
                self.delimiter,
                Some(records_to_read),
                self.has_header,
@@ -128,7 +134,7 @@
        &self,
        _store: &Arc<dyn ObjectStore>,
        _table_schema: SchemaRef,
        _file: &FileMeta,
        _object: &ObjectMeta,
    ) -> Result<Statistics> {
        Ok(Statistics::default())
    }
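The `TryFutureExt::and_then` chain used in `CsvFormat::infer_schema` above is simply a combinator spelling of two consecutive awaits; both forms pull the whole object into memory before inferring the schema. A small illustration (not from the PR; the function name is hypothetical):

```rust
use bytes::Bytes;
use futures::TryFutureExt;
use object_store::{ObjectMeta, ObjectStore};

// Illustration only: the two bindings below fetch the same bytes; the PR uses
// the combinator form inside CsvFormat::infer_schema.
async fn fetch(
    store: &dyn ObjectStore,
    object: &ObjectMeta,
) -> Result<Bytes, object_store::Error> {
    let via_combinators = store.get(&object.location).and_then(|r| r.bytes()).await?;
    let _via_awaits = store.get(&object.location).await?.bytes().await?;
    Ok(via_combinators)
}
```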
43 changes: 26 additions & 17 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
use arrow::json::reader::infer_json_schema_from_iterator;
use arrow::json::reader::ValueIter;
use async_trait::async_trait;
use datafusion_data_access::{object_store::ObjectStore, FileMeta};
use object_store::{GetResult, ObjectMeta, ObjectStore};

use super::FileFormat;
use super::FileScanConfig;
@@ -71,21 +71,33 @@ impl FileFormat for JsonFormat {
    async fn infer_schema(
        &self,
        store: &Arc<dyn ObjectStore>,
        files: &[FileMeta],
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let mut schemas = Vec::new();
        let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
        for file in files {
            let reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
            let mut reader = BufReader::new(reader);
            let iter = ValueIter::new(&mut reader, None);
            let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
        for object in objects {
            let mut take_while = || {
                let should_take = records_to_read > 0;
                if should_take {
                    records_to_read -= 1;
                }
                should_take
            }))?;
            };

            let schema = match store.get(&object.location).await? {
                GetResult::File(file, _) => {
                    let mut reader = BufReader::new(file);
                    let iter = ValueIter::new(&mut reader, None);
                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
                }
                r @ GetResult::Stream(_) => {
                    let data = r.bytes().await?;
                    let mut reader = BufReader::new(data.as_ref());
                    let iter = ValueIter::new(&mut reader, None);
                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
                }
            };

            schemas.push(schema);
            if records_to_read == 0 {
                break;
@@ -100,7 +112,7 @@
        &self,
        _store: &Arc<dyn ObjectStore>,
        _table_schema: SchemaRef,
        _file: &FileMeta,
        _object: &ObjectMeta,
    ) -> Result<Statistics> {
        Ok(Statistics::default())
    }
@@ -120,15 +132,12 @@ mod tests {
    use super::super::test_util::scan_format;
    use arrow::array::Int64Array;
    use futures::StreamExt;
    use object_store::local::LocalFileSystem;

    use super::*;
    use crate::physical_plan::collect;
    use crate::prelude::{SessionConfig, SessionContext};
    use crate::{
        datafusion_data_access::object_store::local::{
            local_unpartitioned_file, LocalFileSystem,
        },
        physical_plan::collect,
    };
    use crate::test::object_store::local_unpartitioned_file;

#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -229,12 +238,12 @@ mod tests {

    #[tokio::test]
    async fn infer_schema_with_limit() {
        let store = Arc::new(LocalFileSystem {}) as _;
        let store = Arc::new(LocalFileSystem::new()) as _;
        let filename = "tests/jsons/schema_infer_limit.json";
        let format = JsonFormat::default().with_schema_infer_max_rec(Some(3));

        let file_schema = format
            .infer_schema(&store, &[local_unpartitioned_file(filename.to_string())])
            .infer_schema(&store, &[local_unpartitioned_file(filename)])
            .await
            .expect("Schema inference");

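For reference, the closure-based cap in `JsonFormat::infer_schema` above draws down a single `records_to_read` budget shared across all files, so inference stops as soon as the configured maximum has been sampled, even mid-file. A tiny self-contained illustration of that closure's behaviour:

```rust
fn main() {
    // Mirrors the `take_while` closure above with a budget of 3 records.
    let mut records_to_read = 3usize;
    let mut take_while = || {
        let should_take = records_to_read > 0;
        if should_take {
            records_to_read -= 1;
        }
        should_take
    };

    assert!(take_while()); // budget 3 -> 2
    assert!(take_while()); // budget 2 -> 1
    assert!(take_while()); // budget 1 -> 0
    assert!(!take_while()); // budget exhausted: stop sampling
}
```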
16 changes: 7 additions & 9 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -36,8 +36,7 @@ use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{ExecutionPlan, Statistics};

use async_trait::async_trait;
use datafusion_data_access::object_store::ObjectStore;
use datafusion_data_access::FileMeta;
use object_store::{ObjectMeta, ObjectStore};

/// This trait abstracts all the file format specific implementations
/// from the `TableProvider`. This helps code re-utilization across
@@ -55,7 +54,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
    async fn infer_schema(
        &self,
        store: &Arc<dyn ObjectStore>,
        files: &[FileMeta],
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef>;

/// Infer the statistics for the provided object. The cost and accuracy of the
@@ -69,7 +68,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
        &self,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        file: &FileMeta,
        object: &ObjectMeta,
    ) -> Result<Statistics>;

/// Take a list of files and convert it to the appropriate executor
@@ -86,9 +85,8 @@ pub(crate) mod test_util {
    use super::*;
    use crate::datasource::listing::PartitionedFile;
    use crate::datasource::object_store::ObjectStoreUrl;
    use datafusion_data_access::object_store::local::{
        local_unpartitioned_file, LocalFileSystem,
    };
    use crate::test::object_store::local_unpartitioned_file;
    use object_store::local::LocalFileSystem;

    pub async fn scan_format(
        format: &dyn FileFormat,
@@ -97,7 +95,7 @@
        projection: Option<Vec<usize>>,
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let store = Arc::new(LocalFileSystem {}) as _;
        let store = Arc::new(LocalFileSystem::new()) as _;
        let meta = local_unpartitioned_file(format!("{}/{}", store_root, file_name));

        let file_schema = format.infer_schema(&store, &[meta.clone()]).await?;
@@ -107,7 +105,7 @@
            .await?;

        let file_groups = vec![vec![PartitionedFile {
            file_meta: meta,
            object_meta: meta,
            partition_values: vec![],
            range: None,
        }]];
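With `FileFormat` now taking `object_store::ObjectMeta`, callers can feed it metadata from any registered store rather than the old `datafusion-data-access` helpers. A rough sketch of driving `infer_schema` that way, assuming the object_store 0.3 API and the crate paths used elsewhere in this PR (the file location is a placeholder):

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::{json::JsonFormat, FileFormat};
use datafusion::error::Result;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

async fn infer_from_store() -> Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());

    // `head` returns the ObjectMeta (location, last_modified, size) that
    // FileFormat::infer_schema now expects. The path below is illustrative;
    // in practice it must resolve within the registered store.
    let location = Path::from("data/example.json");
    let meta = store.head(&location).await?;

    let format = JsonFormat::default();
    let schema = format.infer_schema(&store, &[meta]).await?;
    println!("inferred schema: {:?}", schema);
    Ok(())
}
```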