Move to Polars to enable struct values unnest (#60)
This enables unnesting of arrays of structs (see unnest_structs test)
vincev authored Mar 4, 2024
1 parent dfd9a1e commit 0626169
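
The unnest_structs test the message points to is not rendered in this view. As a rough illustration of what the switch buys, here is the explode-then-unnest pattern on a list-of-structs column, sketched against the polars 0.38 API this commit pulls in; the frame and column names are invented for the example, and this is not the repository's actual test:

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Start from flat rows, then pack x/y into a struct and collect a
    // list of structs per id, mimicking a nested column read from
    // parquet or JSON (illustrative data).
    let df = df!(
        "id" => [1i64, 1, 2],
        "x" => [1.0f64, 3.0, 5.0],
        "y" => [2.0f64, 4.0, 6.0]
    )?;

    let nested = df
        .lazy()
        .group_by([col("id")])
        .agg([as_struct(vec![col("x"), col("y")]).alias("points")])
        .collect()?; // `points` is now a List[Struct] column.

    // explode yields one row per array element; unnest then lifts the
    // struct fields back into top-level columns.
    let flat = nested
        .lazy()
        .explode([col("points")])
        .unnest(["points"])
        .collect()?;
    println!("{flat}");

    Ok(())
}
```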
Showing 41 changed files with 1,842 additions and 3,153 deletions.
Cargo.lock: 2,230 changes (1,043 additions, 1,187 deletions)

Large diffs are not rendered by default.

Cargo.toml: 34 changes (24 additions, 10 deletions)
@@ -5,26 +5,40 @@ repository = "https://github.com/vincev/dply-rs"
 edition = "2021"
 license = "Apache-2.0"
 name = "dply"
-version = "0.2.1"
+version = "0.3.0"
 rust-version = "1.70.0"
 
 [dependencies]
 anyhow = "1.0"
 chrono = { version = "0.4.26", default-features = false }
 clap = { version = "4.2", features = ["derive"] }
 comfy-table = "7"
-datafusion = { version = "32", default-features = false }
-futures = "0.3.28"
-hashbrown = "0.14.0"
 home = "0.5"
-lru = "0.12"
+lru = "0.12.0"
 nom = "7"
 num-traits = "0.2.15"
 parking_lot = "0.12.1"
-reedline = "0.25"
+reedline = "0.29"
 regex = "1.9.4"
 thiserror = "1.0"
 tokio = { version = "1.29.1", features = ["rt-multi-thread", "macros", "sync"] }
 
+[dependencies.polars]
+version = "0.38.0"
+default-features = false
+features = [
+    "cross_join",
+    "csv",
+    "dtype-full",
+    "fmt",
+    "is_in",
+    "json",
+    "lazy",
+    "parquet",
+    "semi_anti_join",
+    "strings",
+    "timezones"
+]
 
 [dev-dependencies]
 indoc = "2"
 
 [profile.dev]
 opt-level = 0
 debug = 2
README.md: 3 changes (1 addition, 2 deletions)
@@ -1,6 +1,5 @@
 dply is a command line tool for viewing, querying, and writing csv and parquet
-files, inspired by [dplyr](https://dplyr.tidyverse.org/index.html) and powered by
-[DataFusion](https://github.com/apache/arrow-datafusion).
+files, inspired by [dplyr](https://dplyr.tidyverse.org/index.html).
 
 ## Usage overview
 
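For context, the usage overview that follows in the README (truncated in this view) is built around shell-style pipelines. A hypothetical one in dply's own syntax, using only functions registered in src/engine.rs below; the file and column names are made up:

```
parquet("flights.parquet") |
    unnest(segments) |
    head()
```

Unnesting a column of struct arrays like the assumed `segments` here is exactly what this commit enables.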
src/engine.rs: 195 changes (86 additions, 109 deletions)
@@ -2,13 +2,9 @@
 // SPDX-License-Identifier: Apache-2.0
 
 //! Evaluate pipeline functions.
-use anyhow::{bail, Result};
-use datafusion::{
-    execution::context::SessionContext, logical_expr::LogicalPlan, physical_plan::ExecutionPlan,
-    prelude::Expr as DFExpr,
-};
-use std::{collections::HashMap, future::Future, sync::Arc};
-use tokio::runtime;
+use anyhow::{anyhow, bail, Result};
+use polars::prelude::*;
+use std::collections::HashMap;
 
 use crate::{completions::Completions, config::FormatConfig, parser::Expr};
 
@@ -23,7 +19,6 @@ mod fmt;
 mod glimpse;
 mod group_by;
 mod head;
-mod io;
 mod joins;
 mod json;
 mod mutate;
@@ -35,13 +30,14 @@ mod show;
 mod summarize;
 mod unnest;
 
+#[derive(Default)]
 pub struct Context {
-    /// Named logical plans.
-    vars: HashMap<String, LogicalPlan>,
-    /// Logical plan passed from one pipeline step to the next.
-    plan: Option<LogicalPlan>,
-    /// Columns passed to aggregate functions.
-    group: Option<Vec<DFExpr>>,
+    /// Named data frames.
+    vars: HashMap<String, LazyFrame>,
+    /// Input dataframe passed from one pipeline step to the next.
+    df: Option<LazyFrame>,
+    /// Group passed to aggregate functions.
+    group: Option<LazyGroupBy>,
     /// Dataframe columns.
     columns: Vec<String>,
     /// Optional output used for testing.
@@ -50,30 +46,6 @@ pub struct Context {
     format_config: FormatConfig,
     /// Completions lru
     completions: Completions,
-    /// Tokio runtime to run async tasks.
-    runtime: runtime::Runtime,
-    /// Datafusion context
-    session: SessionContext,
 }
-
-impl Default for Context {
-    fn default() -> Self {
-        let runtime = runtime::Builder::new_multi_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-        Self {
-            vars: Default::default(),
-            plan: Default::default(),
-            group: Default::default(),
-            columns: Default::default(),
-            output: Default::default(),
-            format_config: Default::default(),
-            completions: Default::default(),
-            runtime,
-            session: Default::default(),
-        }
-    }
-}
 
 impl Context {
@@ -87,78 +59,46 @@ impl Context {
         self.vars.keys().cloned().collect()
     }
 
-    /// Returns the plan associated with the given variable.
-    fn get_plan(&self, name: &str) -> Option<LogicalPlan> {
-        self.vars.get(name).cloned()
-    }
-
     /// Returns the active dataframe or group columns.
     fn columns(&self) -> &Vec<String> {
         &self.columns
     }
 
-    /// Returns datafusion context
-    fn session(&self) -> &SessionContext {
-        &self.session
-    }
-
     /// Returns the current format configuration
     fn format_config(&self) -> &FormatConfig {
         &self.format_config
     }
 
-    async fn create_physical_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let plan = self
-            .session
-            .state()
-            .create_physical_plan(logical_plan)
-            .await?;
-        Ok(plan)
-    }
-
-    /// Returns datafusion context
-    fn block_on<F: Future>(&self, future: F) -> F::Output {
-        self.runtime.block_on(future)
-    }
-
     /// Clear the context removing the active group and dataframe.
     fn clear(&mut self) {
-        self.plan = None;
+        self.df = None;
         self.group = None;
     }
 
     /// Returns and consumes the input dataframe.
-    fn take_plan(&mut self) -> Option<LogicalPlan> {
-        self.plan.take()
+    fn take_df(&mut self) -> Option<LazyFrame> {
+        self.df.take()
     }
 
     /// Sets the dataframe to be used in pipeline steps.
-    fn set_plan(&mut self, plan: LogicalPlan) {
+    fn set_df(&mut self, df: LazyFrame) -> Result<()> {
+        assert!(self.group.is_none());
+
         // Get unqualified column names.
-        self.columns = plan
+        self.columns = df
             .schema()
-            .fields()
-            .iter()
-            .map(|f| f.name().to_owned())
-            .collect();
+            .map_err(|e| anyhow!("Schema error: {e}"))?
+            .iter_names()
+            .map(|s| s.to_string())
+            .collect::<Vec<_>>();
 
         self.update_completions();
 
-        self.plan = Some(plan);
+        self.df = Some(df);
+        Ok(())
     }
 
-    /// Sets the grouping columns used for aggregation.
-    fn set_group(&mut self, plan: LogicalPlan, group: Vec<DFExpr>) {
-        self.set_plan(plan);
-        self.group = Some(group);
+    /// Returns the dataframe associated with the given variable.
+    fn get_df(&self, name: &str) -> Option<&LazyFrame> {
+        self.vars.get(name)
     }
 
-    fn take_group(&mut self) -> Option<Vec<DFExpr>> {
+    /// Returns and consumes the active group.
+    fn take_group(&mut self) -> Option<LazyGroupBy> {
         self.group.take()
     }

@@ -167,30 +107,67 @@ impl Context {
         self.group.is_some()
     }
 
+    /// Sets the active group.
+    fn set_group(&mut self, group: LazyGroupBy) -> Result<()> {
+        assert!(self.df.is_none());
+
+        self.columns = group
+            .logical_plan
+            .schema()
+            .map_err(|e| anyhow!("Schema error: {e}"))?
+            .iter_names()
+            .map(|s| s.to_string())
+            .collect::<Vec<_>>();
+
+        self.update_completions();
+
+        self.group = Some(group);
+        Ok(())
+    }
+
     fn update_completions(&mut self) {
         self.completions.add(&self.columns);
     }
 
-    fn show(&mut self, plan: LogicalPlan) -> Result<()> {
-        if let Some(mut output) = self.output.take() {
-            self.runtime.block_on(fmt::test(self, plan, &mut output))?;
-            self.output = Some(output);
-            Ok(())
+    /// Print results to the context output.
+    fn print(&mut self, df: DataFrame) -> Result<()> {
+        self.set_fmt();
+
+        if let Some(write) = self.output.as_mut() {
+            fmt::df_test(write, df)?;
         } else {
-            self.runtime.block_on(fmt::show(self, plan))
+            println!("{df}");
         }
+        Ok(())
     }
 
-    fn glimpse(&mut self, plan: LogicalPlan) -> Result<()> {
-        if let Some(mut output) = self.output.take() {
-            self.runtime
-                .block_on(fmt::glimpse(self, plan, &mut output))?;
-            self.output = Some(output);
-            Ok(())
+    /// Show a glimpse view of the dataframe.
+    fn glimpse(&mut self, df: LazyFrame) -> Result<()> {
+        self.set_fmt();
+
+        if let Some(write) = self.output.as_mut() {
+            fmt::glimpse(write, df)?;
         } else {
-            let output = &mut std::io::stdout();
-            self.runtime.block_on(fmt::glimpse(self, plan, output))
+            fmt::glimpse(&mut std::io::stdout(), df)?;
         }
+
+        Ok(())
     }
+
+    fn set_fmt(&self) {
+        if let Some(w) = self.format_config.max_table_width {
+            std::env::set_var("POLARS_TABLE_WIDTH", w.to_string());
+        }
+
+        std::env::set_var(
+            "POLARS_FMT_MAX_COLS",
+            self.format_config.max_columns.to_string(),
+        );
+
+        std::env::set_var(
+            "POLARS_FMT_STR_LEN",
+            self.format_config.max_column_width.to_string(),
+        );
+    }
 }

@@ -205,7 +182,7 @@ pub fn eval_to_string(exprs: &[Expr]) -> Result<String> {
     let mut ctx = Context {
         output: Some(Default::default()),
        format_config: FormatConfig {
-            max_table_width: Some(82),
+            max_column_width: 82,
            ..Default::default()
        },
        ..Default::default()
@@ -233,22 +210,22 @@ fn eval_pipelines(exprs: &[Expr], ctx: &mut Context) -> Result<()> {
 fn eval_pipeline_step(expr: &Expr, ctx: &mut Context) -> Result<()> {
     match expr {
         Expr::Function(name, args) => match name.as_str() {
-            "anti_join" => joins::eval(args, ctx, joins::JoinType::Anti)?,
+            "anti_join" => joins::eval(args, ctx, JoinType::Anti)?,
             "arrange" => arrange::eval(args, ctx)?,
             "config" => config::eval(args, ctx)?,
             "count" => count::eval(args, ctx)?,
-            "cross_join" => joins::eval(args, ctx, joins::JoinType::Cross)?,
+            "cross_join" => joins::eval(args, ctx, JoinType::Cross)?,
             "csv" => csv::eval(args, ctx)?,
             "distinct" => distinct::eval(args, ctx)?,
             "filter" => filter::eval(args, ctx)?,
             "glimpse" => glimpse::eval(args, ctx)?,
             "group_by" => group_by::eval(args, ctx)?,
             "head" => head::eval(args, ctx)?,
-            "inner_join" => joins::eval(args, ctx, joins::JoinType::Inner)?,
+            "inner_join" => joins::eval(args, ctx, JoinType::Inner)?,
             "json" => json::eval(args, ctx)?,
-            "left_join" => joins::eval(args, ctx, joins::JoinType::Left)?,
+            "left_join" => joins::eval(args, ctx, JoinType::Left)?,
             "mutate" => mutate::eval(args, ctx)?,
-            "outer_join" => joins::eval(args, ctx, joins::JoinType::Outer)?,
+            "outer_join" => joins::eval(args, ctx, JoinType::Outer { coalesce: true })?,
             "parquet" => parquet::eval(args, ctx)?,
             "relocate" => relocate::eval(args, ctx)?,
             "rename" => rename::eval(args, ctx)?,
@@ -260,11 +237,11 @@
         },
         Expr::Identifier(name) => {
             // If there is an input, assign it to the variable.
-            if let Some(plan) = ctx.take_plan() {
+            if let Some(plan) = ctx.take_df() {
                 ctx.vars.insert(name.to_owned(), plan.clone());
-                ctx.set_plan(plan);
+                ctx.set_df(plan)?;
             } else if let Some(plan) = ctx.vars.get(name) {
-                ctx.set_plan(plan.clone());
+                ctx.set_df(plan.clone())?;
             } else if ctx.is_grouping() {
                 bail!("Cannot assign a group to variable '{name}'");
             } else {
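
The common thread in the engine.rs changes above: pipeline steps now hand a polars LazyFrame (or a LazyGroupBy while a grouping is active) from one step to the next, instead of a DataFusion LogicalPlan, so the per-context Tokio runtime and DataFusion session go away and nothing runs until a step collects. A minimal sketch of that lazy pattern, assuming the polars 0.38 API, with invented column names:

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let df = df!(
        "g" => ["a", "a", "b"],
        "x" => [1i64, 2, 3]
    )?;

    // Like Context::set_df: hold a LazyFrame; each pipeline step only
    // extends the logical plan, nothing executes yet.
    let lf: LazyFrame = df.lazy().filter(col("x").gt(lit(1)));

    // Like Context::set_group: group_by hands back a LazyGroupBy...
    let groups: LazyGroupBy = lf.group_by([col("g")]);

    // ...and an aggregation turns it back into a LazyFrame; collect
    // finally runs the whole plan.
    let out = groups.agg([col("x").sum().alias("sum_x")]).collect()?;
    println!("{out}");

    Ok(())
}
```

The remaining changed files of the 41 in this commit are not rendered in this view.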