Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add round trip test of physical plan in tpch unit tests #6918

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 106 additions & 117 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,24 @@ mod tests {
use std::path::Path;

use super::*;
use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes,
physical_plan_to_bytes,
};

/// Resolves the directory that holds the TPC-H benchmark data.
///
/// Uses the value of the `TPCH_DATA` environment variable when it can be
/// read, and falls back to `benchmarks/data` otherwise.
///
/// # Errors
///
/// Returns `DataFusionError::Execution` when the resolved path does not exist.
fn get_tpch_data_path() -> Result<String> {
    let data_dir = match std::env::var("TPCH_DATA") {
        Ok(dir) => dir,
        Err(_) => "benchmarks/data".to_string(),
    };
    if Path::new(&data_dir).exists() {
        Ok(data_dir)
    } else {
        Err(DataFusionError::Execution(format!(
            "Benchmark data not found (set TPCH_DATA env var to override): {}",
            data_dir
        )))
    }
}

async fn serde_round_trip(query: usize) -> Result<()> {
async fn round_trip_logical_plan(query: usize) -> Result<()> {
let ctx = SessionContext::default();
let path = get_tpch_data_path()?;
let opt = DataFusionBenchmarkOpt {
Expand Down Expand Up @@ -425,125 +440,99 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn serde_q1() -> Result<()> {
serde_round_trip(1).await
}

#[tokio::test]
async fn serde_q2() -> Result<()> {
serde_round_trip(2).await
}

#[tokio::test]
async fn serde_q3() -> Result<()> {
serde_round_trip(3).await
}

#[tokio::test]
async fn serde_q4() -> Result<()> {
serde_round_trip(4).await
}

#[tokio::test]
async fn serde_q5() -> Result<()> {
serde_round_trip(5).await
}

#[tokio::test]
async fn serde_q6() -> Result<()> {
serde_round_trip(6).await
}

#[tokio::test]
async fn serde_q7() -> Result<()> {
serde_round_trip(7).await
}

#[tokio::test]
async fn serde_q8() -> Result<()> {
serde_round_trip(8).await
}

#[tokio::test]
async fn serde_q9() -> Result<()> {
serde_round_trip(9).await
}

#[tokio::test]
async fn serde_q10() -> Result<()> {
serde_round_trip(10).await
}

#[tokio::test]
async fn serde_q11() -> Result<()> {
serde_round_trip(11).await
}

#[tokio::test]
async fn serde_q12() -> Result<()> {
serde_round_trip(12).await
}

#[tokio::test]
async fn serde_q13() -> Result<()> {
serde_round_trip(13).await
}

#[tokio::test]
async fn serde_q14() -> Result<()> {
serde_round_trip(14).await
}

#[tokio::test]
async fn serde_q15() -> Result<()> {
serde_round_trip(15).await
}

#[tokio::test]
async fn serde_q16() -> Result<()> {
serde_round_trip(16).await
}

#[tokio::test]
async fn serde_q17() -> Result<()> {
serde_round_trip(17).await
}

#[tokio::test]
async fn serde_q18() -> Result<()> {
serde_round_trip(18).await
}

#[tokio::test]
async fn serde_q19() -> Result<()> {
serde_round_trip(19).await
}

#[tokio::test]
async fn serde_q20() -> Result<()> {
serde_round_trip(20).await
/// Checks that the physical plan of benchmark query number `query` survives
/// a serialization round trip.
///
/// Registers the benchmark tables found under `get_tpch_data_path`, builds a
/// physical plan for every SQL statement returned by `get_query_sql`, encodes
/// it with `physical_plan_to_bytes`, decodes it again with
/// `physical_plan_from_bytes`, and asserts that the
/// `displayable(..).indent(false)` rendering of the decoded plan equals that
/// of the original plan.
///
/// # Errors
///
/// Propagates any error from locating the data directory, registering the
/// tables, loading the SQL, planning, or encoding/decoding the plan.
///
/// # Panics
///
/// Panics (via `assert_eq!`) when the two renderings differ.
async fn round_trip_physical_plan(query: usize) -> Result<()> {
    let ctx = SessionContext::default();
    let path = get_tpch_data_path()?;
    let opt = DataFusionBenchmarkOpt {
        query: Some(query),
        debug: false,
        iterations: 1,
        partitions: 2,
        batch_size: 8192,
        // `path` is already an owned `String`; move it instead of copying it.
        path: PathBuf::from(path),
        file_format: "tbl".to_string(),
        mem_table: false,
        output_path: None,
        disable_statistics: false,
    };
    register_tables(&opt, &ctx).await?;
    // Named `sql` so the loop variable does not shadow the `query` number.
    for sql in get_query_sql(query)? {
        let plan = ctx.sql(&sql).await?.create_physical_plan().await?;
        // `plan` is still needed below for the comparison, so pass a clone.
        let bytes = physical_plan_to_bytes(plan.clone())?;
        let decoded = physical_plan_from_bytes(&bytes, &ctx)?;
        let original_text = displayable(plan.as_ref()).indent(false).to_string();
        let decoded_text = displayable(decoded.as_ref()).indent(false).to_string();
        assert_eq!(original_text, decoded_text);
    }
    Ok(())
}

#[tokio::test]
async fn serde_q21() -> Result<()> {
serde_round_trip(21).await
/// Declares a `#[tokio::test]` named `$test_name` that awaits
/// `round_trip_logical_plan` for the query number `$number` and returns its
/// result.
macro_rules! test_round_trip_logical {
    ($test_name:ident, $number:expr) => {
        #[tokio::test]
        async fn $test_name() -> Result<()> {
            round_trip_logical_plan($number).await
        }
    };
}

#[tokio::test]
async fn serde_q22() -> Result<()> {
serde_round_trip(22).await
/// Declares a `#[tokio::test]` named `$test_name` that awaits
/// `round_trip_physical_plan` for the query number `$number` and returns its
/// result.
macro_rules! test_round_trip_physical {
    ($test_name:ident, $number:expr) => {
        #[tokio::test]
        async fn $test_name() -> Result<()> {
            round_trip_physical_plan($number).await
        }
    };
}

/// Returns the benchmark data directory as a string.
///
/// The directory comes from the `TPCH_DATA` environment variable, or is
/// `benchmarks/data` when that variable cannot be read.
///
/// # Errors
///
/// Fails with `DataFusionError::Execution` if the chosen path does not exist.
fn get_tpch_data_path() -> Result<String> {
    let location =
        std::env::var("TPCH_DATA").unwrap_or_else(|_| String::from("benchmarks/data"));
    let found = Path::new(&location).exists();
    if found {
        return Ok(location);
    }
    Err(DataFusionError::Execution(format!(
        "Benchmark data not found (set TPCH_DATA env var to override): {}",
        location
    )))
}
// Logical plan round-trip tests: each invocation below expands to one
// `#[tokio::test]` that calls `round_trip_logical_plan` with the given query
// number (1 through 22).
test_round_trip_logical!(round_trip_logical_plan_q1, 1);
test_round_trip_logical!(round_trip_logical_plan_q2, 2);
test_round_trip_logical!(round_trip_logical_plan_q3, 3);
test_round_trip_logical!(round_trip_logical_plan_q4, 4);
test_round_trip_logical!(round_trip_logical_plan_q5, 5);
test_round_trip_logical!(round_trip_logical_plan_q6, 6);
test_round_trip_logical!(round_trip_logical_plan_q7, 7);
test_round_trip_logical!(round_trip_logical_plan_q8, 8);
test_round_trip_logical!(round_trip_logical_plan_q9, 9);
test_round_trip_logical!(round_trip_logical_plan_q10, 10);
test_round_trip_logical!(round_trip_logical_plan_q11, 11);
test_round_trip_logical!(round_trip_logical_plan_q12, 12);
test_round_trip_logical!(round_trip_logical_plan_q13, 13);
test_round_trip_logical!(round_trip_logical_plan_q14, 14);
test_round_trip_logical!(round_trip_logical_plan_q15, 15);
test_round_trip_logical!(round_trip_logical_plan_q16, 16);
test_round_trip_logical!(round_trip_logical_plan_q17, 17);
test_round_trip_logical!(round_trip_logical_plan_q18, 18);
test_round_trip_logical!(round_trip_logical_plan_q19, 19);
test_round_trip_logical!(round_trip_logical_plan_q20, 20);
test_round_trip_logical!(round_trip_logical_plan_q21, 21);
test_round_trip_logical!(round_trip_logical_plan_q22, 22);

// Physical plan round-trip tests: each invocation below expands to one
// `#[tokio::test]` that calls `round_trip_physical_plan` with the given query
// number (1 through 22).
test_round_trip_physical!(round_trip_physical_plan_q1, 1);
test_round_trip_physical!(round_trip_physical_plan_q2, 2);
test_round_trip_physical!(round_trip_physical_plan_q3, 3);
test_round_trip_physical!(round_trip_physical_plan_q4, 4);
test_round_trip_physical!(round_trip_physical_plan_q5, 5);
test_round_trip_physical!(round_trip_physical_plan_q6, 6);
test_round_trip_physical!(round_trip_physical_plan_q7, 7);
test_round_trip_physical!(round_trip_physical_plan_q8, 8);
test_round_trip_physical!(round_trip_physical_plan_q9, 9);
test_round_trip_physical!(round_trip_physical_plan_q10, 10);
test_round_trip_physical!(round_trip_physical_plan_q11, 11);
test_round_trip_physical!(round_trip_physical_plan_q12, 12);
test_round_trip_physical!(round_trip_physical_plan_q13, 13);
test_round_trip_physical!(round_trip_physical_plan_q14, 14);
test_round_trip_physical!(round_trip_physical_plan_q15, 15);
test_round_trip_physical!(round_trip_physical_plan_q16, 16);
test_round_trip_physical!(round_trip_physical_plan_q17, 17);
test_round_trip_physical!(round_trip_physical_plan_q18, 18);
test_round_trip_physical!(round_trip_physical_plan_q19, 19);
test_round_trip_physical!(round_trip_physical_plan_q20, 20);
test_round_trip_physical!(round_trip_physical_plan_q21, 21);
test_round_trip_physical!(round_trip_physical_plan_q22, 22);
}