Skip to content

Commit

Permalink
chore(cubestore): Improved detection of suboptimal queries (cube-js#7812)
Browse files Browse the repository at this point in the history
  • Loading branch information
RusovDmitriy authored and JichaoS committed May 7, 2024
1 parent fff6722 commit 4969902
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 9 deletions.
50 changes: 50 additions & 0 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::rows::{rows, NULL};
use crate::SqlClient;
use async_compression::tokio::write::GzipEncoder;
use cubestore::metastore::{Column, ColumnType};
use cubestore::queryplanner::physical_plan_flags::PhysicalPlanFlags;
use cubestore::queryplanner::pretty_printers::{pp_phys_plan, pp_phys_plan_ext, PPOptions};
use cubestore::queryplanner::MIN_TOPK_STREAM_ROWS;
use cubestore::sql::{timestamp_from_string, InlineTable, SqlQueryContext};
Expand Down Expand Up @@ -132,6 +133,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
t("hyperloglog_inplace_group_by", hyperloglog_inplace_group_by),
t("hyperloglog_postgres", hyperloglog_postgres),
t("hyperloglog_snowflake", hyperloglog_snowflake),
t("physical_plan_flags", physical_plan_flags),
t("planning_inplace_aggregate", planning_inplace_aggregate),
t("planning_hints", planning_hints),
t("planning_inplace_aggregate2", planning_inplace_aggregate2),
Expand Down Expand Up @@ -2772,6 +2774,54 @@ async fn hyperloglog_snowflake(service: Box<dyn SqlClient>) {
.unwrap_err();
}

async fn physical_plan_flags(service: Box<dyn SqlClient>) {
    // Build a schema plus a partitioned index whose column order
    // (url, day, category) drives the verdicts asserted below.
    service.exec_query("CREATE SCHEMA s").await.unwrap();
    service
        .exec_query("CREATE PARTITIONED INDEX s.ind(url text, day text, category text)")
        .await
        .unwrap();
    service
        .exec_query(
            "CREATE TABLE s.Data(url text, day text, category text, hits int, clicks int) \
            ADD TO PARTITIONED INDEX s.ind(url, day, category)",
        )
        .await
        .unwrap();

    // Each scenario pairs a query with whether its physical plan is
    // expected to be optimal (i.e. NOT flagged as suboptimal).
    let scenarios = vec![
        ("SELECT SUM(hits) FROM s.Data", true),
        ("SELECT SUM(hits) FROM s.Data WHERE url = 'test'", true),
        ("SELECT SUM(hits) FROM s.Data WHERE url = 'test' AND day > 'test'", true),
        ("SELECT SUM(hits) FROM s.Data WHERE day = 'test'", false),
        ("SELECT SUM(hits) FROM s.Data WHERE url = 'test' AND day = 'test'", true),
        ("SELECT SUM(hits) FROM s.Data WHERE url = 'test' AND category = 'test'", false),
        ("SELECT SUM(hits) FROM s.Data WHERE url = 'test' OR url = 'test_2'", true),
        ("SELECT SUM(hits) FROM s.Data WHERE url = 'test' OR category = 'test'", false),
        ("SELECT SUM(hits) FROM s.Data WHERE (url = 'test' AND day = 'test') OR (url = 'test' AND category = 'test')", false),
        ("SELECT SUM(hits) FROM s.Data WHERE (url = 'test' AND day = 'test') OR (url = 'test_1' OR url = 'test_2')", true),
        ("SELECT SUM(hits) FROM s.Data WHERE (url = 'test' AND day = 'test') OR (url = 'test_1' OR day = 'test_2')", false),
        ("SELECT SUM(hits) FROM s.Data WHERE (url = 'test' AND day = 'test') OR (url = 'test_1' OR day > 'test_2')", true),
        ("SELECT SUM(hits) FROM s.Data WHERE url IN ('test_1', 'test_2')", false),
        ("SELECT SUM(hits) FROM s.Data WHERE url IS NOT NULL", false),
        ("SELECT SUM(hits), url FROM s.Data GROUP BY url", true),
        ("SELECT SUM(hits), url, day FROM s.Data GROUP BY url, day", true),
        ("SELECT SUM(hits), day FROM s.Data GROUP BY day", false),
        ("SELECT SUM(hits), day, category FROM s.Data GROUP BY day, category", false),
    ];

    for (query, optimal) in scenarios {
        let plan = service.plan_query(query).await.unwrap();
        let flags = PhysicalPlanFlags::with_execution_plan(plan.router.as_ref());
        let suboptimal = flags.is_suboptimal_query();
        // A query expected to be optimal must not be flagged, and vice versa.
        assert_eq!(suboptimal, !optimal, "Query failed: {}", query);
    }
}

async fn planning_inplace_aggregate(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
Expand Down
164 changes: 155 additions & 9 deletions rust/cubestore/cubestore/src/queryplanner/physical_plan_flags.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
use datafusion::logical_plan::Operator;
use datafusion::physical_plan::expressions::{BinaryExpr, CastExpr, Column, Literal, TryCastExpr};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::{
AggregateMode, AggregateStrategy, HashAggregateExec,
};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::merge_sort::MergeSortExec;
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};

use serde::Serialize;
use serde_json::{json, Value};

use crate::queryplanner::query_executor::CubeTableExec;

#[derive(Serialize, Debug)]
pub struct PhysicalPlanFlags {
    /// True when the plan aggregates via a merge-sort (inplace) strategy;
    /// `is_suboptimal_query` treats the absence of this as suboptimal.
    pub merge_sort_plan: bool,
    /// Whether the filter predicate's equality columns follow the scanned
    /// index's column order (see `check_predicate_order`); `None` when no
    /// filter was inspected.
    pub predicate_sorted: Option<bool>,
}

impl PhysicalPlanFlags {
pub fn enough_to_fill(&self) -> bool {
self.merge_sort_plan
}

pub fn is_suboptimal_query(&self) -> bool {
!self.merge_sort_plan
!self.merge_sort_plan || self.predicate_sorted == Some(false)
}

pub fn to_json(&self) -> Value {
Expand All @@ -27,6 +31,7 @@ impl PhysicalPlanFlags {
pub fn with_execution_plan(p: &dyn ExecutionPlan) -> Self {
let mut flags = PhysicalPlanFlags {
merge_sort_plan: false,
predicate_sorted: None,
};
PhysicalPlanFlags::physical_plan_flags_fill(p, &mut flags);
flags
Expand All @@ -48,12 +53,153 @@ impl PhysicalPlanFlags {
if is_final_hash_agg_without_groups || is_full_inplace_agg || is_final_inplace_agg {
flags.merge_sort_plan = true;
}

// Stop the recursion if we have an optimal plan with groups, otherwise continue to check the children, filters for example
if agg.group_expr().len() > 0 && flags.merge_sort_plan {
return;
}
} else if let Some(f) = a.downcast_ref::<FilterExec>() {
// Stop the recursion if we found a filter and if plan already suboptimal or predicate flag is already set
if flags.merge_sort_plan == false || flags.predicate_sorted.is_some() {
return;
}

let predicate = f.predicate();
let predicate_column_groups = extract_columns_with_operators(predicate.as_ref());
let input = f.input();

let maybe_input_exec = input
.as_any()
.downcast_ref::<MergeExec>()
.map(|exec| exec.input().as_any())
.or_else(|| {
input
.as_any()
.downcast_ref::<MergeSortExec>()
.map(|exec| exec.input().as_any())
});

if let Some(input_exec_any) = maybe_input_exec {
if let Some(cte) = input_exec_any.downcast_ref::<CubeTableExec>() {
let index_columns = cte.index_snapshot.index.row.columns();
flags.predicate_sorted = Some(check_predicate_order(
predicate_column_groups,
index_columns,
));
}
}
}
if flags.enough_to_fill() {
return;
}

for child in p.children() {
PhysicalPlanFlags::physical_plan_flags_fill(child.as_ref(), flags);
}
}
}

/// Returns true when, for every predicate group, the columns compared with
/// equality form a prefix of the index's column order — i.e. the predicate
/// can be satisfied by a sorted scan of the index.
///
/// `predicate_column_groups` is the output of `extract_columns_with_operators`:
/// one group per OR-disjunct, each a list of `(column, operator)` pairs.
fn check_predicate_order(
    predicate_column_groups: Vec<Vec<(&Column, &Operator)>>,
    index_columns: &Vec<crate::metastore::Column>,
) -> bool {
    let index_column_names: Vec<String> = index_columns
        .iter()
        .map(|c| c.get_name().clone())
        .collect();

    for group in predicate_column_groups.iter() {
        if group.is_empty() {
            // No columns in the group means a non-binary expression
            // (InListExpr, IsNullExpr etc.), which is suboptimal for now.
            return false;
        }

        // Only equality comparisons pin an index column; other operators
        // (>, < …) do not constrain the sort prefix.
        let eq_column_names: Vec<String> = group
            .iter()
            .filter_map(|(col, op)| {
                if matches!(op, Operator::Eq) {
                    Some(col.name().to_string())
                } else {
                    None
                }
            })
            .collect();

        // Walk the index columns in order, counting how many are covered by
        // an equality. The first uncovered index column must leave no
        // equality columns unmatched, otherwise the Eq set is not a prefix.
        let mut matched = 0;
        for index_name in &index_column_names {
            if eq_column_names.contains(index_name) {
                matched += 1;
            } else {
                if eq_column_names.len() > matched {
                    return false;
                }
                // Prefix confirmed for this group; move on to the next one.
                break;
            }
        }
    }

    true
}

/// Collects `(column, operator)` pairs from a filter predicate, grouped so
/// that AND-ed comparisons share a group while OR branches get separate
/// groups. Delegates to the recursive `_impl` with `is_root = true`.
fn extract_columns_with_operators(predicate: &dyn PhysicalExpr) -> Vec<Vec<(&Column, &Operator)>> {
    let mut groups = Vec::new();
    extract_columns_with_operators_impl(predicate, &mut groups, true);
    groups
}

/// Recursive worker for `extract_columns_with_operators`.
///
/// Builds `out` as a list of groups of `(column, operator)` pairs:
/// comparisons joined by AND accumulate into the current group, while each
/// OR branch is flattened into its own group(s). `is_root` marks the
/// top-level call so unsupported root predicates still yield one (empty)
/// group, which downstream code treats as suboptimal.
fn extract_columns_with_operators_impl<'a>(
    predicate: &'a dyn PhysicalExpr,
    out: &mut Vec<Vec<(&'a Column, &'a Operator)>>,
    is_root: bool,
) {
    // An expression counts as constant if it is a Literal, possibly wrapped
    // in any number of Cast/TryCast layers (peeled iteratively here).
    let is_constant = |mut e: &dyn PhysicalExpr| loop {
        if e.as_any().is::<Literal>() {
            return true;
        } else if let Some(c) = e.as_any().downcast_ref::<CastExpr>() {
            e = c.expr().as_ref();
        } else if let Some(c) = e.as_any().downcast_ref::<TryCastExpr>() {
            e = c.expr().as_ref();
        } else {
            return false;
        }
    };

    let predicate = predicate.as_any();
    if let Some(binary) = predicate.downcast_ref::<BinaryExpr>() {
        match binary.op() {
            Operator::Or => {
                // Each OR side is collected into a fresh vector, so its
                // comparisons land in groups separate from the caller's.
                let conditions = [binary.left().as_ref(), binary.right().as_ref()]
                    .iter()
                    .flat_map(|&expr| {
                        let mut sub_conditions = Vec::new();
                        extract_columns_with_operators_impl(expr, &mut sub_conditions, false);
                        sub_conditions
                    })
                    .collect::<Vec<_>>();
                // NOTE(review): if both OR sides are unsupported expressions,
                // nothing is pushed here at all — confirm that is intended.
                if !conditions.is_empty() {
                    out.extend(conditions);
                }
            }
            Operator::And => {
                // AND passes `out` through unchanged (non-root), so both
                // sides append their comparisons to the same current group.
                extract_columns_with_operators_impl(binary.left().as_ref(), out, false);
                extract_columns_with_operators_impl(binary.right().as_ref(), out, false);
            }
            _ => {
                // Leaf comparison: normalize to `column <op> constant` by
                // swapping operands when the column is on the right.
                // NOTE(review): the operator itself is not mirrored after the
                // swap; that looks safe because downstream only inspects Eq,
                // where direction is irrelevant — confirm.
                let mut left = binary.left();
                let mut right = binary.right();
                if !left.as_any().is::<Column>() {
                    std::mem::swap(&mut left, &mut right);
                }
                let left = left.as_any().downcast_ref::<Column>();
                if let Some(column) = left {
                    if is_constant(right.as_ref()) {
                        // At the root each comparison opens a new group;
                        // nested (ANDed) comparisons join the current group.
                        if out.is_empty() || is_root {
                            out.push(Vec::new());
                        }
                        out.last_mut().unwrap().push((column, binary.op()));
                    }
                }
            }
        }
    } else if is_root && out.is_empty() {
        // Unsupported root predicate (e.g. InListExpr, IsNullExpr): emit one
        // empty group so callers classify the predicate as unsorted.
        out.push(Vec::new());
    }
}

0 comments on commit 4969902

Please sign in to comment.