From 9b1e4d3a5fff8726cf2483b5ba2197e1796e5acc Mon Sep 17 00:00:00 2001 From: Xin Li Date: Fri, 17 May 2024 09:05:13 +0000 Subject: [PATCH 1/4] Add examples of how to convert logical plan to/from sql strings --- datafusion-examples/examples/plan_to_sql.rs | 135 +++++++++++++++++++- 1 file changed, 134 insertions(+), 1 deletion(-) diff --git a/datafusion-examples/examples/plan_to_sql.rs b/datafusion-examples/examples/plan_to_sql.rs index 3915d3991f76..1e56be30afda 100644 --- a/datafusion-examples/examples/plan_to_sql.rs +++ b/datafusion-examples/examples/plan_to_sql.rs @@ -20,7 +20,7 @@ use datafusion::error::Result; use datafusion::prelude::*; use datafusion::sql::unparser::expr_to_sql; use datafusion_sql::unparser::dialect::CustomDialect; -use datafusion_sql::unparser::Unparser; +use datafusion_sql::unparser::{plan_to_sql, Unparser}; /// This example demonstrates the programmatic construction of /// SQL using the DataFusion Expr [`Expr`] and LogicalPlan [`LogicalPlan`] API. @@ -41,6 +41,10 @@ async fn main() -> Result<()> { simple_expr_to_sql_demo()?; simple_expr_to_sql_demo_no_escape()?; simple_expr_to_sql_demo_escape_mysql_style()?; + simple_plan_to_sql_parquest_dataframe_demo().await?; + simple_plan_to_sql_csv_dataframe_demo().await?; + round_trip_plan_to_sql_parquest_dataframe_demo().await?; + round_trip_plan_to_sql_csv_dataframe_demo().await?; Ok(()) } @@ -77,3 +81,132 @@ fn simple_expr_to_sql_demo_escape_mysql_style() -> Result<()> { assert_eq!(sql, r#"((`a` < 5) OR (`a` = 8))"#); Ok(()) } + +/// DataFusion can convert a logical plan created using the DataFrames API to read from a parquet file +/// to SQL, using column name escaping PostgreSQL style. 
+async fn simple_plan_to_sql_parquest_dataframe_demo() -> Result<()> { + // create local execution context + let ctx = SessionContext::new(); // define the query using the DataFrame trait + + let testdata = datafusion::test_util::parquet_test_data(); + let df = ctx + .read_parquet( + &format!("{testdata}/alltypes_plain.parquet"), + ParquetReadOptions::default(), + ) + .await? + .select_columns(&["id", "int_col", "double_col", "date_string_col"])?; + + let ast = plan_to_sql(&df.logical_plan())?; + + let sql = format!("{}", ast); + + assert_eq!( + sql, + r#"SELECT "?table?"."id", "?table?"."int_col", "?table?"."double_col", "?table?"."date_string_col" FROM "?table?""# + ); + + Ok(()) +} + +/// DataFusion can convert a logic plan created using the DataFrames API to read from a csv file +/// to SQL, using column name escaping PostgreSQL style. +async fn simple_plan_to_sql_csv_dataframe_demo() -> Result<()> { + // create local execution context + let ctx = SessionContext::new(); // define the query using the DataFrame trait + + let testdata = datafusion::test_util::arrow_test_data(); + let df = ctx + .read_csv( + &format!("{testdata}/csv/aggregate_test_100.csv"), + CsvReadOptions::default(), + ) + .await? 
+ .select(vec![col("c1"), min(col("c12")), max(col("c12"))])?; + + let ast = plan_to_sql(&df.logical_plan())?; + + let sql = format!("{}", ast); + + assert_eq!( + sql, + r#"SELECT "?table?"."c1", MIN("?table?"."c12"), MAX("?table?"."c12") FROM "?table?""# + ); + + Ok(()) +} + +async fn round_trip_plan_to_sql_parquest_dataframe_demo() -> Result<()> { + // create local execution context + let ctx = SessionContext::new(); // define the query using the DataFrame trait + + let testdata = datafusion::test_util::parquet_test_data(); + + // register parquet file with the execution context + ctx.register_parquet( + "alltypes_plain", + &format!("{testdata}/alltypes_plain.parquet"), + ParquetReadOptions::default(), + ) + .await?; + + // execute the query + let df = ctx + .sql( + "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \ + FROM alltypes_plain", + ) + .await? + .filter( + col("id") + .gt(lit(1)) + .and(col("tinyint_col").lt(col("double_col"))), + )?; + + let ast = plan_to_sql(&df.logical_plan())?; + + let sql = format!("{}", ast); + + assert_eq!( + sql, + r#"SELECT "alltypes_plain"."int_col", "alltypes_plain"."double_col", CAST("alltypes_plain"."date_string_col" AS VARCHAR) FROM "alltypes_plain" WHERE (("alltypes_plain"."id" > 1) AND ("alltypes_plain"."tinyint_col" < "alltypes_plain"."double_col"))"# + ); + + Ok(()) +} + +async fn round_trip_plan_to_sql_csv_dataframe_demo() -> Result<()> { + // create local execution context + let ctx = SessionContext::new(); // define the query using the DataFrame trait + + let testdata = datafusion::test_util::arrow_test_data(); + + // register parquet file with the execution context + ctx.register_csv( + "aggregate_test_100", + &format!("{testdata}/csv/aggregate_test_100.csv"), + CsvReadOptions::default(), + ) + .await?; + + // execute the query + let df = ctx + .sql( + "SELECT c1, MIN(c12), MAX(c12) \ + FROM aggregate_test_100 + GROUP BY c1", + ) + .await? 
+ .filter(col("c1").gt(lit(0.1)).and(col("c1").lt(lit(0.9))))?; + + let ast = plan_to_sql(&df.logical_plan())?; + + let sql = format!("{}", ast); + + assert_eq!( + sql, + r#"SELECT "aggregate_test_100"."c1", MIN("aggregate_test_100"."c12"), MAX("aggregate_test_100"."c12") FROM "aggregate_test_100" GROUP BY "aggregate_test_100"."c1" HAVING (("aggregate_test_100"."c1" > 0.1) AND ("aggregate_test_100"."c1" < 0.9))"# + ); + + Ok(()) +} From 7d4e90c526143c8c1a83759101f53e7699feb940 Mon Sep 17 00:00:00 2001 From: Xin Li Date: Fri, 17 May 2024 11:46:30 +0000 Subject: [PATCH 2/4] Fix clippy --- datafusion-examples/examples/plan_to_sql.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion-examples/examples/plan_to_sql.rs b/datafusion-examples/examples/plan_to_sql.rs index 1e56be30afda..cdffea0972cd 100644 --- a/datafusion-examples/examples/plan_to_sql.rs +++ b/datafusion-examples/examples/plan_to_sql.rs @@ -97,7 +97,7 @@ async fn simple_plan_to_sql_parquest_dataframe_demo() -> Result<()> { .await? .select_columns(&["id", "int_col", "double_col", "date_string_col"])?; - let ast = plan_to_sql(&df.logical_plan())?; + let ast = plan_to_sql(df.logical_plan())?; let sql = format!("{}", ast); @@ -124,7 +124,7 @@ async fn simple_plan_to_sql_csv_dataframe_demo() -> Result<()> { .await? .select(vec![col("c1"), min(col("c12")), max(col("c12"))])?; - let ast = plan_to_sql(&df.logical_plan())?; + let ast = plan_to_sql(df.logical_plan())?; let sql = format!("{}", ast); @@ -163,7 +163,7 @@ async fn round_trip_plan_to_sql_parquest_dataframe_demo() -> Result<()> { .and(col("tinyint_col").lt(col("double_col"))), )?; - let ast = plan_to_sql(&df.logical_plan())?; + let ast = plan_to_sql(df.logical_plan())?; let sql = format!("{}", ast); @@ -199,7 +199,7 @@ async fn round_trip_plan_to_sql_csv_dataframe_demo() -> Result<()> { .await? 
.filter(col("c1").gt(lit(0.1)).and(col("c1").lt(lit(0.9))))?; - let ast = plan_to_sql(&df.logical_plan())?; + let ast = plan_to_sql(df.logical_plan())?; let sql = format!("{}", ast); From c09b798f27bf84ac87dbfb5467c8e48032cac767 Mon Sep 17 00:00:00 2001 From: Xin Li Date: Sat, 18 May 2024 08:42:22 +0000 Subject: [PATCH 3/4] fix comments --- datafusion-examples/examples/plan_to_sql.rs | 70 ++------------------- 1 file changed, 4 insertions(+), 66 deletions(-) diff --git a/datafusion-examples/examples/plan_to_sql.rs b/datafusion-examples/examples/plan_to_sql.rs index cdffea0972cd..61b822db913a 100644 --- a/datafusion-examples/examples/plan_to_sql.rs +++ b/datafusion-examples/examples/plan_to_sql.rs @@ -86,7 +86,7 @@ fn simple_expr_to_sql_demo_escape_mysql_style() -> Result<()> { /// to SQL, using column name escaping PostgreSQL style. async fn simple_plan_to_sql_parquest_dataframe_demo() -> Result<()> { // create local execution context - let ctx = SessionContext::new(); // define the query using the DataFrame trait + let ctx = SessionContext::new(); let testdata = datafusion::test_util::parquet_test_data(); let df = ctx @@ -109,36 +109,10 @@ async fn simple_plan_to_sql_parquest_dataframe_demo() -> Result<()> { Ok(()) } -/// DataFusion can convert a logic plan created using the DataFrames API to read from a csv file -/// to SQL, using column name escaping PostgreSQL style. -async fn simple_plan_to_sql_csv_dataframe_demo() -> Result<()> { - // create local execution context - let ctx = SessionContext::new(); // define the query using the DataFrame trait - - let testdata = datafusion::test_util::arrow_test_data(); - let df = ctx - .read_csv( - &format!("{testdata}/csv/aggregate_test_100.csv"), - CsvReadOptions::default(), - ) - .await? 
- .select(vec![col("c1"), min(col("c12")), max(col("c12"))])?; - - let ast = plan_to_sql(df.logical_plan())?; - - let sql = format!("{}", ast); - - assert_eq!( - sql, - r#"SELECT "?table?"."c1", MIN("?table?"."c12"), MAX("?table?"."c12") FROM "?table?""# - ); - - Ok(()) -} - +// DataFusion can parse a SQL string into a DataFrame, add a filter, and convert the resulting plan back to SQL. async fn round_trip_plan_to_sql_parquest_dataframe_demo() -> Result<()> { // create local execution context - let ctx = SessionContext::new(); // define the query using the DataFrame trait + let ctx = SessionContext::new(); let testdata = datafusion::test_util::parquet_test_data(); @@ -150,7 +124,7 @@ async fn round_trip_plan_to_sql_parquest_dataframe_demo() -> Result<()> { ) .await?; - // execute the query + // create a logical plan from a SQL string and then programmatically add new filters let df = ctx .sql( "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \ @@ -174,39 +148,3 @@ async fn round_trip_plan_to_sql_parquest_dataframe_demo() -> Result<()> { Ok(()) } - -async fn round_trip_plan_to_sql_csv_dataframe_demo() -> Result<()> { - // create local execution context - let ctx = SessionContext::new(); // define the query using the DataFrame trait - - let testdata = datafusion::test_util::arrow_test_data(); - - // register parquet file with the execution context - ctx.register_csv( - "aggregate_test_100", - &format!("{testdata}/csv/aggregate_test_100.csv"), - CsvReadOptions::default(), - ) - .await?; - - // execute the query - let df = ctx - .sql( - "SELECT c1, MIN(c12), MAX(c12) \ - FROM aggregate_test_100 - GROUP BY c1", - ) - .await? 
- .filter(col("c1").gt(lit(0.1)).and(col("c1").lt(lit(0.9))))?; - - let ast = plan_to_sql(df.logical_plan())?; - - let sql = format!("{}", ast); - - assert_eq!( - sql, - r#"SELECT "aggregate_test_100"."c1", MIN("aggregate_test_100"."c12"), MAX("aggregate_test_100"."c12") FROM "aggregate_test_100" GROUP BY "aggregate_test_100"."c1" HAVING (("aggregate_test_100"."c1" > 0.1) AND ("aggregate_test_100"."c1" < 0.9))"# - ); - - Ok(()) -} From de77e5db4cfd12eafaaae4967217a751c5c4bd0b Mon Sep 17 00:00:00 2001 From: Xin Li Date: Sat, 18 May 2024 08:43:16 +0000 Subject: [PATCH 4/4] fix comments --- datafusion-examples/examples/plan_to_sql.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion-examples/examples/plan_to_sql.rs b/datafusion-examples/examples/plan_to_sql.rs index 61b822db913a..0e9485ba7fbe 100644 --- a/datafusion-examples/examples/plan_to_sql.rs +++ b/datafusion-examples/examples/plan_to_sql.rs @@ -42,9 +42,7 @@ async fn main() -> Result<()> { simple_expr_to_sql_demo_no_escape()?; simple_expr_to_sql_demo_escape_mysql_style()?; simple_plan_to_sql_parquest_dataframe_demo().await?; - simple_plan_to_sql_csv_dataframe_demo().await?; round_trip_plan_to_sql_parquest_dataframe_demo().await?; - round_trip_plan_to_sql_csv_dataframe_demo().await?; Ok(()) }