Add write_parquet to DataFrame #1940

Merged
8 changes: 8 additions & 0 deletions datafusion/src/dataframe.rs
@@ -22,6 +22,7 @@ use crate::error::Result;
use crate::logical_plan::{
DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning,
};
use parquet::file::properties::WriterProperties;
use std::sync::Arc;

use crate::physical_plan::SendableRecordBatchStream;
@@ -408,4 +409,11 @@ pub trait DataFrame: Send + Sync {

/// Write a `DataFrame` to a CSV file.
async fn write_csv(&self, path: &str) -> Result<()>;

/// Write a `DataFrame` to a Parquet file.
async fn write_parquet(
&self,
path: &str,
writer_properties: Option<WriterProperties>,
) -> Result<()>;
}
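
The added trait method mirrors the existing `write_csv` just above it. A minimal usage sketch, assuming a tokio runtime; the input path and output directory names are illustrative, not taken from this PR:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // Hypothetical input; any table registered with the context works.
    let df = ctx.read_csv("input.csv", CsvReadOptions::new()).await?;
    // Passing `None` keeps the default parquet WriterProperties; the call
    // writes one file per partition under the "output_dir" directory.
    df.write_parquet("output_dir", None).await?;
    Ok(())
}
```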
145 changes: 3 additions & 142 deletions datafusion/src/execution/context.rs
@@ -41,13 +41,9 @@ use crate::{
use log::{debug, trace};
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::path::PathBuf;
use std::string::String;
use std::sync::Arc;
use std::{fs, path::PathBuf};

use futures::{StreamExt, TryStreamExt};
use tokio::task::{self, JoinHandle};

use arrow::datatypes::SchemaRef;

@@ -80,7 +76,7 @@ use crate::physical_optimizer::repartition::Repartition;

use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::plan_to_csv;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
@@ -93,7 +89,6 @@ use crate::variable::{VarProvider, VarType};
use crate::{dataframe::DataFrame, physical_plan::udaf::AggregateUDF};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

use super::{
@@ -728,42 +723,7 @@ impl ExecutionContext {
path: impl AsRef<str>,
writer_properties: Option<WriterProperties>,
) -> Result<()> {
let path = path.as_ref();
// create directory to contain the Parquet files (one per partition)
let fs_path = Path::new(path);
let runtime = self.runtime_env();
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
for i in 0..plan.output_partitioning().partition_count() {
let plan = plan.clone();
let filename = format!("part-{}.parquet", i);
let path = fs_path.join(&filename);
let file = fs::File::create(path)?;
let mut writer = ArrowWriter::try_new(
file.try_clone().unwrap(),
plan.schema(),
writer_properties.clone(),
)?;
let stream = plan.execute(i, runtime.clone()).await?;
let handle: JoinHandle<Result<()>> = task::spawn(async move {
stream
.map(|batch| writer.write(&batch?))
.try_collect()
.await
.map_err(DataFusionError::from)?;
writer.close().map_err(DataFusionError::from).map(|_| ())
});
tasks.push(handle);
}
futures::future::join_all(tasks).await;
Ok(())
}
Err(e) => Err(DataFusionError::Execution(format!(
"Could not create directory {}: {:?}",
path, e
))),
}
plan_to_parquet(self, plan, path, writer_properties).await
}

/// Optimizes the logical plan by applying optimizer rules, and
@@ -2680,79 +2640,6 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn write_csv_results() -> Result<()> {
// create partitioned input file and context
let tmp_dir = TempDir::new()?;
let mut ctx = create_ctx(&tmp_dir, 4).await?;

// execute a simple query and write the results to CSV
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
write_csv(&mut ctx, "SELECT c1, c2 FROM test", &out_dir).await?;

// create a new context and verify that the results were saved to a partitioned csv file
let mut ctx = ExecutionContext::new();

let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
]));

// register each partition as well as the top level dir
let csv_read_option = CsvReadOptions::new().schema(&schema);
ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
.await?;
ctx.register_csv("allparts", &out_dir, csv_read_option)
.await?;

let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part0").await?;
let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?;

let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();

assert_eq!(part0[0].schema(), allparts[0].schema());

assert_eq!(allparts_count, 40);

Ok(())
}

#[tokio::test]
async fn write_parquet_results() -> Result<()> {
// create partitioned input file and context
let tmp_dir = TempDir::new()?;
let mut ctx = create_ctx(&tmp_dir, 4).await?;

// execute a simple query and write the results to parquet
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?;

// create a new context and verify that the results were saved to a partitioned parquet file
let mut ctx = ExecutionContext::new();

// register each partition as well as the top level dir
ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir))
.await?;
ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir))
.await?;
ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir))
.await?;
ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir))
.await?;
ctx.register_parquet("allparts", &out_dir).await?;

let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part0").await?;
let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?;

let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();

assert_eq!(part0[0].schema(), allparts[0].schema());

assert_eq!(allparts_count, 40);

Ok(())
}

#[tokio::test]
async fn query_csv_with_custom_partition_extension() -> Result<()> {
let tmp_dir = TempDir::new()?;
@@ -3224,32 +3111,6 @@ mod tests {
plan_and_collect(&mut ctx, sql).await
}

/// Execute SQL and write results to partitioned csv files
async fn write_csv(
ctx: &mut ExecutionContext,
sql: &str,
out_dir: &str,
) -> Result<()> {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
ctx.write_csv(physical_plan, out_dir).await
}

/// Execute SQL and write results to partitioned parquet files
async fn write_parquet(
ctx: &mut ExecutionContext,
sql: &str,
out_dir: &str,
writer_properties: Option<WriterProperties>,
) -> Result<()> {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
ctx.write_parquet(physical_plan, out_dir, writer_properties)
.await
}

/// Generate CSV partitions within the supplied directory
fn populate_csv_partitions(
tmp_dir: &TempDir,
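
The forty-odd lines deleted from `write_parquet` above were not dropped but moved into a shared `plan_to_parquet` helper in the `file_format` module, mirroring the earlier `plan_to_csv` refactor. The helper's body is outside this diff; reconstructed from the deleted inline code, it plausibly reads like the following sketch (crate-internal imports assumed, details may differ from the merged version):

```rust
// Sketch only: reconstructed from the inline code removed from
// ExecutionContext::write_parquet; the merged helper may differ in detail.
use std::fs;
use std::path::Path;
use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use tokio::task::{self, JoinHandle};

use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionContext;
use crate::physical_plan::ExecutionPlan;

/// Execute `plan` and write one parquet file per output partition
/// into a freshly created directory at `path`.
pub(crate) async fn plan_to_parquet(
    ctx: &ExecutionContext,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
    writer_properties: Option<WriterProperties>,
) -> Result<()> {
    let path = path.as_ref();
    let fs_path = Path::new(path);
    let runtime = ctx.runtime_env();
    match fs::create_dir(fs_path) {
        Ok(()) => {
            let mut tasks = vec![];
            for i in 0..plan.output_partitioning().partition_count() {
                let plan = plan.clone();
                let file =
                    fs::File::create(fs_path.join(format!("part-{}.parquet", i)))?;
                let mut writer = ArrowWriter::try_new(
                    file,
                    plan.schema(),
                    writer_properties.clone(),
                )?;
                let stream = plan.execute(i, runtime.clone()).await?;
                // Drain this partition's stream into its writer on a separate task.
                let handle: JoinHandle<Result<()>> = task::spawn(async move {
                    stream
                        .map(|batch| writer.write(&batch?))
                        .try_collect::<Vec<_>>()
                        .await
                        .map_err(DataFusionError::from)?;
                    writer.close().map_err(DataFusionError::from).map(|_| ())
                });
                tasks.push(handle);
            }
            futures::future::join_all(tasks).await;
            Ok(())
        }
        Err(e) => Err(DataFusionError::Execution(format!(
            "Could not create directory {}: {:?}",
            path, e
        ))),
    }
}
```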
14 changes: 13 additions & 1 deletion datafusion/src/execution/dataframe_impl.rs
@@ -35,11 +35,12 @@ use crate::{
dataframe::*,
physical_plan::{collect, collect_partitioned},
};
use parquet::file::properties::WriterProperties;

use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::datasource::TableType;
use crate::physical_plan::file_format::plan_to_csv;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet};
use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
};
@@ -321,6 +322,17 @@ impl DataFrame for DataFrameImpl {
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
plan_to_csv(&ctx, plan, path).await
}

async fn write_parquet(
&self,
path: &str,
writer_properties: Option<WriterProperties>,
) -> Result<()> {
let plan = self.create_physical_plan().await?;
let state = self.ctx_state.lock().clone();
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
plan_to_parquet(&ctx, plan, path, writer_properties).await
}
}

#[cfg(test)]
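
Because `writer_properties` is threaded straight through to parquet's writer, callers can tune the output files. An illustrative snippet, assuming a `df: Arc<dyn DataFrame>` is already in scope; the snappy compression choice is an example, not something this PR prescribes:

```rust
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

// Write snappy-compressed files instead of the parquet defaults.
let props = WriterProperties::builder()
    .set_compression(Compression::SNAPPY)
    .build();
df.write_parquet("output_dir", Some(props)).await?;
```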
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/file_format/mod.rs
@@ -23,6 +23,7 @@ mod file_stream;
mod json;
mod parquet;

pub(crate) use self::parquet::plan_to_parquet;
pub use self::parquet::ParquetExec;
use arrow::{
array::{ArrayData, ArrayRef, DictionaryArray},