From 060d666281b3e1ddaa3b71dd3f9c0eafd35a40a6 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Fri, 6 Dec 2024 14:38:31 +0200 Subject: [PATCH] refactor(cubesql): Remove Extension from LogicalPlanLanguage::CubeScan (#8775) Extensions is used meaningfully only for CubeScan, but not for CubeScanWrapper. Both are first-class logical plan nodes for us, so Extension is not helpful in egraph --- .../cubesql/src/compile/rewrite/analysis.rs | 1 - .../cubesql/src/compile/rewrite/converter.rs | 995 ++++++++---------- .../cubesql/src/compile/rewrite/mod.rs | 2 +- 3 files changed, 468 insertions(+), 530 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs index da252cef9e141..f0af8e3396fa3 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs @@ -1242,7 +1242,6 @@ impl LogicalPlanAnalysis { Some(c.to_string()) } LogicalPlanLanguage::CubeScan(params) => cube_reference(params[0]), - LogicalPlanLanguage::Extension(params) => cube_reference(params[0]), _ => None, } } diff --git a/rust/cubesql/cubesql/src/compile/rewrite/converter.rs b/rust/cubesql/cubesql/src/compile/rewrite/converter.rs index d1944d123c908..99d5057e706f3 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/converter.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/converter.rs @@ -786,12 +786,7 @@ impl LogicalPlanToLanguageConverter { } // TODO LogicalPlan::Extension(ext) => { - if let Some(_cube_scan) = ext.node.as_any().downcast_ref::() { - todo!("LogicalPlanLanguage::Extension"); - // self.graph.add(LogicalPlanLanguage::Extension([])) - } else { - panic!("Unsupported extension node: {}", ext.node.schema()); - } + panic!("Unsupported extension node: {}", ext.node.schema()); } LogicalPlan::Distinct(distinct) => { let input = self.add_logical_plan_replace_params( @@ -1531,559 +1526,505 @@ impl LanguageToLogicalPlanConverter { // panic!("Analyze is not supported"); // } LogicalPlanLanguage::Extension(params) => { - let node = match self.best_expr.index(params[0]) { - LogicalPlanLanguage::CubeScan(cube_scan_params) => { - let alias_to_cube = - match_data_node!(node_by_id, cube_scan_params[0], CubeScanAliasToCube); - let members = - match_list_node!(node_by_id, cube_scan_params[1], CubeScanMembers); - let order = - match_list_node!(node_by_id, cube_scan_params[3], CubeScanOrder); - let wrapped = - match_data_node!(node_by_id, cube_scan_params[8], CubeScanWrapped); - // TODO filters - // TODO - let mut query = V1LoadRequestQuery::new(); - let mut fields = Vec::new(); - let mut query_measures = Vec::new(); - let mut query_time_dimensions = Vec::new(); - let mut query_order = Vec::new(); - let mut query_dimensions = Vec::new(); - - for m in members { - match m { - LogicalPlanLanguage::Measure(measure_params) => { - let measure = match_data_node!( - node_by_id, - measure_params[0], - MeasureName - ); - let expr = self.to_expr(measure_params[1])?; - query_measures.push(measure.to_string()); - let data_type = self - .cube_context - .meta - .find_df_data_type(measure.to_string()) - .ok_or(CubeError::internal(format!( - "Can't find measure '{}'", - measure - )))?; - fields.push(( - DFField::new( - expr_relation(&expr), - &expr_name(&expr)?, - data_type, - true, - ), - MemberField::Member(measure.to_string()), - )); - } - LogicalPlanLanguage::TimeDimension(params) => { - let dimension = - match_data_node!(node_by_id, params[0], TimeDimensionName); - let granularity = match_data_node!( - node_by_id, - params[1], - TimeDimensionGranularity - ); - let date_range = match_data_node!( - node_by_id, - params[2], - TimeDimensionDateRange - ); - let expr = self.to_expr(params[3])?; - let query_time_dimension = V1LoadRequestQueryTimeDimension { - dimension: dimension.to_string(), - granularity: granularity.clone(), - date_range: date_range.map(|date_range| { - serde_json::Value::Array( - date_range - .into_iter() - .map(|d| serde_json::Value::String(d)) - .collect(), - ) - }), - }; - if !query_time_dimensions.contains(&query_time_dimension) { - query_time_dimensions.push(query_time_dimension); - } - if let Some(granularity) = &granularity { - fields.push(( - DFField::new( - expr_relation(&expr), - // TODO empty schema - &expr_name(&expr)?, - DataType::Timestamp(TimeUnit::Nanosecond, None), - true, - ), - MemberField::Member(format!( - "{}.{}", - dimension, granularity - )), - )); - } - } - LogicalPlanLanguage::Dimension(params) => { - let dimension = - match_data_node!(node_by_id, params[0], DimensionName); - let expr = self.to_expr(params[1])?; - let data_type = self + panic!("Unexpected extension node: {:?}", params[0]) + } + LogicalPlanLanguage::CubeScan(cube_scan_params) => { + let alias_to_cube = + match_data_node!(node_by_id, cube_scan_params[0], CubeScanAliasToCube); + let members = match_list_node!(node_by_id, cube_scan_params[1], CubeScanMembers); + let order = match_list_node!(node_by_id, cube_scan_params[3], CubeScanOrder); + let wrapped = match_data_node!(node_by_id, cube_scan_params[8], CubeScanWrapped); + // TODO filters + // TODO + let mut query = V1LoadRequestQuery::new(); + let mut fields = Vec::new(); + let mut query_measures = Vec::new(); + let mut query_time_dimensions = Vec::new(); + let mut query_order = Vec::new(); + let mut query_dimensions = Vec::new(); + + for m in members { + match m { + LogicalPlanLanguage::Measure(measure_params) => { + let measure = + match_data_node!(node_by_id, measure_params[0], MeasureName); + let expr = self.to_expr(measure_params[1])?; + query_measures.push(measure.to_string()); + let data_type = self + .cube_context + .meta + .find_df_data_type(measure.to_string()) + .ok_or(CubeError::internal(format!( + "Can't find measure '{}'", + measure + )))?; + fields.push(( + DFField::new( + expr_relation(&expr), + &expr_name(&expr)?, + data_type, + true, + ), + MemberField::Member(measure.to_string()), + )); + } + LogicalPlanLanguage::TimeDimension(params) => { + let dimension = + match_data_node!(node_by_id, params[0], TimeDimensionName); + let granularity = + match_data_node!(node_by_id, params[1], TimeDimensionGranularity); + let date_range = + match_data_node!(node_by_id, params[2], TimeDimensionDateRange); + let expr = self.to_expr(params[3])?; + let query_time_dimension = V1LoadRequestQueryTimeDimension { + dimension: dimension.to_string(), + granularity: granularity.clone(), + date_range: date_range.map(|date_range| { + serde_json::Value::Array( + date_range + .into_iter() + .map(|d| serde_json::Value::String(d)) + .collect(), + ) + }), + }; + if !query_time_dimensions.contains(&query_time_dimension) { + query_time_dimensions.push(query_time_dimension); + } + if let Some(granularity) = &granularity { + fields.push(( + DFField::new( + expr_relation(&expr), + // TODO empty schema + &expr_name(&expr)?, + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ), + MemberField::Member(format!("{}.{}", dimension, granularity)), + )); + } + } + LogicalPlanLanguage::Dimension(params) => { + let dimension = match_data_node!(node_by_id, params[0], DimensionName); + let expr = self.to_expr(params[1])?; + let data_type = self + .cube_context + .meta + .find_df_data_type(dimension.to_string()) + .ok_or(CubeError::internal(format!( + "Can't find dimension '{}'", + dimension + )))?; + query_dimensions.push(dimension.to_string()); + fields.push(( + DFField::new( + expr_relation(&expr), + // TODO empty schema + &expr_name(&expr)?, + data_type, + true, + ), + MemberField::Member(dimension), + )); + } + LogicalPlanLanguage::Segment(params) => { + let expr = self.to_expr(params[1])?; + fields.push(( + DFField::new( + expr_relation(&expr), + // TODO empty schema + &expr_name(&expr)?, + DataType::Boolean, + true, + ), + MemberField::Literal(ScalarValue::Boolean(None)), + )); + } + LogicalPlanLanguage::ChangeUser(params) => { + let expr = self.to_expr(params[1])?; + fields.push(( + DFField::new( + expr_relation(&expr), + // TODO empty schema + &expr_name(&expr)?, + DataType::Utf8, + true, + ), + MemberField::Literal(ScalarValue::Utf8(None)), + )); + } + LogicalPlanLanguage::LiteralMember(params) => { + let value = match_data_node!(node_by_id, params[0], LiteralMemberValue); + let expr = self.to_expr(params[1])?; + let relation = + match_data_node!(node_by_id, params[2], LiteralMemberRelation); + fields.push(( + DFField::new( + relation.as_deref(), + &expr_name(&expr)?, + value.get_datatype(), + true, + ), + MemberField::Literal(value), + )); + } + LogicalPlanLanguage::VirtualField(params) => { + let expr = self.to_expr(params[2])?; + fields.push(( + DFField::new( + expr_relation(&expr), + // TODO empty schema + &expr_name(&expr)?, + DataType::Utf8, + true, + ), + MemberField::Literal(ScalarValue::Utf8(None)), + )); + } + LogicalPlanLanguage::MemberError(params) => { + let error = match_data_node!(node_by_id, params[0], MemberErrorError); + return Err(CubeError::user(error.to_string())); + } + LogicalPlanLanguage::AllMembers(_) => { + if !wrapped { + return Err(CubeError::internal( + "Can't detect Cube query and it may be not supported yet" + .to_string(), + )); + } else { + for (alias, cube) in alias_to_cube.iter() { + let cube = self .cube_context .meta - .find_df_data_type(dimension.to_string()) - .ok_or(CubeError::internal(format!( - "Can't find dimension '{}'", - dimension - )))?; - query_dimensions.push(dimension.to_string()); - fields.push(( - DFField::new( - expr_relation(&expr), - // TODO empty schema - &expr_name(&expr)?, - data_type, - true, - ), - MemberField::Member(dimension), - )); - } - LogicalPlanLanguage::Segment(params) => { - let expr = self.to_expr(params[1])?; - fields.push(( - DFField::new( - expr_relation(&expr), - // TODO empty schema - &expr_name(&expr)?, - DataType::Boolean, - true, - ), - MemberField::Literal(ScalarValue::Boolean(None)), - )); - } - LogicalPlanLanguage::ChangeUser(params) => { - let expr = self.to_expr(params[1])?; - fields.push(( - DFField::new( - expr_relation(&expr), - // TODO empty schema - &expr_name(&expr)?, - DataType::Utf8, - true, - ), - MemberField::Literal(ScalarValue::Utf8(None)), - )); - } - LogicalPlanLanguage::LiteralMember(params) => { - let value = - match_data_node!(node_by_id, params[0], LiteralMemberValue); - let expr = self.to_expr(params[1])?; - let relation = match_data_node!( - node_by_id, - params[2], - LiteralMemberRelation - ); - fields.push(( - DFField::new( - relation.as_deref(), - &expr_name(&expr)?, - value.get_datatype(), - true, - ), - MemberField::Literal(value), - )); - } - LogicalPlanLanguage::VirtualField(params) => { - let expr = self.to_expr(params[2])?; - fields.push(( - DFField::new( - expr_relation(&expr), - // TODO empty schema - &expr_name(&expr)?, - DataType::Utf8, - true, - ), - MemberField::Literal(ScalarValue::Utf8(None)), - )); - } - LogicalPlanLanguage::MemberError(params) => { - let error = - match_data_node!(node_by_id, params[0], MemberErrorError); - return Err(CubeError::user(error.to_string())); - } - LogicalPlanLanguage::AllMembers(_) => { - if !wrapped { - return Err(CubeError::internal( - "Can't detect Cube query and it may be not supported yet" - .to_string(), - )); - } else { - for (alias, cube) in alias_to_cube.iter() { - let cube = self - .cube_context - .meta - .find_cube_with_name(cube) - .ok_or_else(|| { - CubeError::user(format!( - "Can't find cube '{}'", - cube - )) - })?; - for column in cube.get_columns() { - if self.cube_context.meta.is_synthetic_field( + .find_cube_with_name(cube) + .ok_or_else(|| { + CubeError::user(format!("Can't find cube '{}'", cube)) + })?; + for column in cube.get_columns() { + if self + .cube_context + .meta + .is_synthetic_field(column.member_name().to_string()) + { + fields.push(( + DFField::new( + Some(&alias), + column.get_name(), + column.get_column_type().to_arrow(), + true, + ), + MemberField::Literal(ScalarValue::Utf8(None)), + )); + } else { + fields.push(( + DFField::new( + Some(&alias), + column.get_name(), + column.get_column_type().to_arrow(), + true, + ), + MemberField::Member( column.member_name().to_string(), - ) { - fields.push(( - DFField::new( - Some(&alias), - column.get_name(), - column.get_column_type().to_arrow(), - true, - ), - MemberField::Literal(ScalarValue::Utf8( - None, - )), - )); - } else { - fields.push(( - DFField::new( - Some(&alias), - column.get_name(), - column.get_column_type().to_arrow(), - true, - ), - MemberField::Member( - column.member_name().to_string(), - ), - )); - } - } + ), + )); } } } - x => panic!("Expected dimension but found {:?}", x), } } + x => panic!("Expected dimension but found {:?}", x), + } + } - let filters = - match_list_node!(node_by_id, cube_scan_params[2], CubeScanFilters); + let filters = match_list_node!(node_by_id, cube_scan_params[2], CubeScanFilters); + + fn to_filter( + query_time_dimensions: &mut Vec, + filters: Vec, + node_by_id: &impl Index, + is_in_or: bool, + ) -> Result< + ( + Vec, + Vec, + Option, + ), + CubeError, + > { + let mut result = Vec::new(); + let mut segments_result = Vec::new(); + let mut change_user_result = Vec::new(); + + for f in filters { + match f { + LogicalPlanLanguage::FilterOp(params) => { + let filters = + match_list_node!(node_by_id, params[0], FilterOpFilters); + let op = match_data_node!(node_by_id, params[1], FilterOpOp); + let is_and_op = op == "and"; + let (filters, segments, change_user) = to_filter( + query_time_dimensions, + filters, + node_by_id, + !is_in_or || !is_and_op, + )?; + match op.as_str() { + "and" => { + result.push(V1LoadRequestQueryFilterItem { + member: None, + operator: None, + values: None, + or: None, + and: Some( + filters + .into_iter() + .map(|f| serde_json::json!(f)) + .collect(), + ), + }); + segments_result.extend(segments); - fn to_filter( - query_time_dimensions: &mut Vec, - filters: Vec, - node_by_id: &impl Index, - is_in_or: bool, - ) -> Result< - ( - Vec, - Vec, - Option, - ), - CubeError, - > { - let mut result = Vec::new(); - let mut segments_result = Vec::new(); - let mut change_user_result = Vec::new(); - - for f in filters { - match f { - LogicalPlanLanguage::FilterOp(params) => { - let filters = match_list_node!( - node_by_id, - params[0], - FilterOpFilters - ); - let op = - match_data_node!(node_by_id, params[1], FilterOpOp); - let is_and_op = op == "and"; - let (filters, segments, change_user) = to_filter( - query_time_dimensions, - filters, - node_by_id, - !is_in_or || !is_and_op, - )?; - match op.as_str() { - "and" => { - result.push(V1LoadRequestQueryFilterItem { - member: None, - operator: None, - values: None, - or: None, - and: Some( - filters - .into_iter() - .map(|f| serde_json::json!(f)) - .collect(), - ), - }); - segments_result.extend(segments); - - if change_user.is_some() { - change_user_result.extend(change_user); - } - } - "or" => { - result.push(V1LoadRequestQueryFilterItem { - member: None, - operator: None, - values: None, - or: Some( - filters - .into_iter() - .map(|f| serde_json::json!(f)) - .collect(), - ), - and: None, - }); - if !segments.is_empty() { - return Err(CubeError::internal( - "Can't use OR operator with segments" - .to_string(), - )); - } - - if change_user.is_some() { - return Err(CubeError::internal( - "Can't use OR operator with __user column" - .to_string(), - )); - } - } - x => panic!("Unsupported filter operator: {}", x), + if change_user.is_some() { + change_user_result.extend(change_user); } } - LogicalPlanLanguage::FilterMember(params) => { - let member = match_data_node!( - node_by_id, - params[0], - FilterMemberMember - ); - let op = - match_data_node!(node_by_id, params[1], FilterMemberOp); - let values = match_data_node!( - node_by_id, - params[2], - FilterMemberValues - ); - if !is_in_or && op == "inDateRange" { - let existing_time_dimension = - query_time_dimensions.iter_mut().find_map(|td| { - if td.dimension == member - && td.date_range.is_none() - { - td.date_range = Some(json!(values)); - Some(td) - } else { - None - } - }); - if existing_time_dimension.is_none() { - let dimension = V1LoadRequestQueryTimeDimension { - dimension: member.to_string(), - granularity: None, - date_range: Some(json!(values)), - }; - query_time_dimensions.push(dimension); - } - } else { - result.push(V1LoadRequestQueryFilterItem { - member: Some(member), - operator: Some(op), - values: if !values.is_empty() { - Some(values) - } else { - None - }, - or: None, - and: None, - }); + "or" => { + result.push(V1LoadRequestQueryFilterItem { + member: None, + operator: None, + values: None, + or: Some( + filters + .into_iter() + .map(|f| serde_json::json!(f)) + .collect(), + ), + and: None, + }); + if !segments.is_empty() { + return Err(CubeError::internal( + "Can't use OR operator with segments".to_string(), + )); + } + + if change_user.is_some() { + return Err(CubeError::internal( + "Can't use OR operator with __user column" + .to_string(), + )); } } - LogicalPlanLanguage::SegmentMember(params) => { - let member = match_data_node!( - node_by_id, - params[0], - SegmentMemberMember - ); - segments_result.push(member); - } - LogicalPlanLanguage::ChangeUserMember(params) => { - let member = match_data_node!( - node_by_id, - params[0], - ChangeUserMemberValue - ); - change_user_result.push(member); + x => panic!("Unsupported filter operator: {}", x), + } + } + LogicalPlanLanguage::FilterMember(params) => { + let member = + match_data_node!(node_by_id, params[0], FilterMemberMember); + let op = match_data_node!(node_by_id, params[1], FilterMemberOp); + let values = + match_data_node!(node_by_id, params[2], FilterMemberValues); + if !is_in_or && op == "inDateRange" { + let existing_time_dimension = + query_time_dimensions.iter_mut().find_map(|td| { + if td.dimension == member && td.date_range.is_none() { + td.date_range = Some(json!(values)); + Some(td) + } else { + None + } + }); + if existing_time_dimension.is_none() { + let dimension = V1LoadRequestQueryTimeDimension { + dimension: member.to_string(), + granularity: None, + date_range: Some(json!(values)), + }; + query_time_dimensions.push(dimension); } - x => panic!("Expected filter but found {:?}", x), + } else { + result.push(V1LoadRequestQueryFilterItem { + member: Some(member), + operator: Some(op), + values: if !values.is_empty() { + Some(values) + } else { + None + }, + or: None, + and: None, + }); } } - - if change_user_result.len() > 1 { - return Err(CubeError::internal( - "Unable to use multiple __user in one Cube query".to_string(), - )); + LogicalPlanLanguage::SegmentMember(params) => { + let member = + match_data_node!(node_by_id, params[0], SegmentMemberMember); + segments_result.push(member); } - - Ok((result, segments_result, change_user_result.pop())) + LogicalPlanLanguage::ChangeUserMember(params) => { + let member = + match_data_node!(node_by_id, params[0], ChangeUserMemberValue); + change_user_result.push(member); + } + x => panic!("Expected filter but found {:?}", x), } + } - let (filters, segments, change_user) = - to_filter(&mut query_time_dimensions, filters, node_by_id, false)?; + if change_user_result.len() > 1 { + return Err(CubeError::internal( + "Unable to use multiple __user in one Cube query".to_string(), + )); + } - query.filters = if filters.len() > 0 { - Some(filters) - } else { - None - }; - - query.segments = Some(segments); - - for o in order { - let order_params = match_params!(o, Order); - let order_member = - match_data_node!(node_by_id, order_params[0], OrderMember); - let order_asc = match_data_node!(node_by_id, order_params[1], OrderAsc); - query_order.push(vec![ - order_member, - if order_asc { - "asc".to_string() - } else { - "desc".to_string() - }, - ]) - } + Ok((result, segments_result, change_user_result.pop())) + } - if !wrapped && fields.len() == 0 { - return Err(CubeError::internal( - "Can't detect Cube query and it may be not supported yet" - .to_string(), - )); - } + let (filters, segments, change_user) = + to_filter(&mut query_time_dimensions, filters, node_by_id, false)?; + + query.filters = if filters.len() > 0 { + Some(filters) + } else { + None + }; - query.measures = Some(query_measures.into_iter().unique().collect()); - query.dimensions = Some(query_dimensions.into_iter().unique().collect()); - query.time_dimensions = if query_time_dimensions.len() > 0 { - Some( - query_time_dimensions - .into_iter() - .unique_by(|td| { - ( - td.dimension.to_string(), - td.granularity.clone(), - td.date_range - .as_ref() - .map(|range| serde_json::to_string(range).unwrap()), - ) - }) - .collect(), - ) + query.segments = Some(segments); + + for o in order { + let order_params = match_params!(o, Order); + let order_member = match_data_node!(node_by_id, order_params[0], OrderMember); + let order_asc = match_data_node!(node_by_id, order_params[1], OrderAsc); + query_order.push(vec![ + order_member, + if order_asc { + "asc".to_string() } else { - None - }; - - let cube_scan_query_limit = self - .cube_context - .sessions - .server - .config_obj - .non_streaming_query_max_row_limit() - as usize; - let fail_on_max_limit_hit = env::var("CUBESQL_FAIL_ON_MAX_LIMIT_HIT") - .map(|v| v.to_lowercase() == "true") - .unwrap_or(false); - let mut limit_was_changed = false; - query.limit = match match_data_node!( - node_by_id, - cube_scan_params[4], - CubeScanLimit - ) { - Some(n) => { - if n > cube_scan_query_limit { - limit_was_changed = true; - } - Some(n) + "desc".to_string() + }, + ]) + } + + if !wrapped && fields.len() == 0 { + return Err(CubeError::internal( + "Can't detect Cube query and it may be not supported yet".to_string(), + )); + } + + query.measures = Some(query_measures.into_iter().unique().collect()); + query.dimensions = Some(query_dimensions.into_iter().unique().collect()); + query.time_dimensions = if query_time_dimensions.len() > 0 { + Some( + query_time_dimensions + .into_iter() + .unique_by(|td| { + ( + td.dimension.to_string(), + td.granularity.clone(), + td.date_range + .as_ref() + .map(|range| serde_json::to_string(range).unwrap()), + ) + }) + .collect(), + ) + } else { + None + }; + + let cube_scan_query_limit = + self.cube_context + .sessions + .server + .config_obj + .non_streaming_query_max_row_limit() as usize; + let fail_on_max_limit_hit = env::var("CUBESQL_FAIL_ON_MAX_LIMIT_HIT") + .map(|v| v.to_lowercase() == "true") + .unwrap_or(false); + let mut limit_was_changed = false; + query.limit = + match match_data_node!(node_by_id, cube_scan_params[4], CubeScanLimit) { + Some(n) => { + if n > cube_scan_query_limit { + limit_was_changed = true; } - None => { - if fail_on_max_limit_hit { - limit_was_changed = true; - Some(cube_scan_query_limit) - } else { - None - } + Some(n) + } + None => { + if fail_on_max_limit_hit { + limit_was_changed = true; + Some(cube_scan_query_limit) + } else { + None } } - .map(|n| n as i32); + } + .map(|n| n as i32); - let max_records = if fail_on_max_limit_hit && limit_was_changed { - Some(cube_scan_query_limit) - } else { - None - }; - - let offset = - match_data_node!(node_by_id, cube_scan_params[5], CubeScanOffset) - .map(|offset| offset as i32); - if offset.is_some() { - query.offset = offset; - } + let max_records = if fail_on_max_limit_hit && limit_was_changed { + Some(cube_scan_query_limit) + } else { + None + }; - fields = fields - .into_iter() - .unique_by(|(f, _)| f.qualified_name()) - .collect(); + let offset = match_data_node!(node_by_id, cube_scan_params[5], CubeScanOffset) + .map(|offset| offset as i32); + if offset.is_some() { + query.offset = offset; + } - let ungrouped = - match_data_node!(node_by_id, cube_scan_params[9], CubeScanUngrouped); + fields = fields + .into_iter() + .unique_by(|(f, _)| f.qualified_name()) + .collect(); - if ungrouped { - query.ungrouped = Some(true); - } + let ungrouped = + match_data_node!(node_by_id, cube_scan_params[9], CubeScanUngrouped); - query.order = if !query_order.is_empty() { - Some(query_order) - } else { - // If no order was specified in client SQL, - // there should be no order implicitly added. - // in case when CUBESQL_SQL_NO_IMPLICIT_ORDER it is set to true - no implicit order is - // added for all queries. - // We need to return empty array so the processing in - // BaseQuery.js won't automatically add default order - - let cube_no_implicit_order = self - .cube_context - .sessions - .server - .config_obj - .no_implicit_order(); + if ungrouped { + query.ungrouped = Some(true); + } - if cube_no_implicit_order || query.ungrouped == Some(true) { - Some(vec![]) - } else { - None - } - }; - - let member_fields = fields.iter().map(|(_, m)| m.clone()).collect(); - - Arc::new(CubeScanNode::new( - Arc::new(DFSchema::new_with_metadata( - fields.into_iter().map(|(f, _)| f).collect(), - HashMap::new(), - )?), - member_fields, - query, - self.auth_context.clone(), - CubeScanOptions { - change_user, - max_records, - }, - alias_to_cube.into_iter().map(|(_, c)| c).unique().collect(), - self.span_id.clone(), - )) + query.order = if !query_order.is_empty() { + Some(query_order) + } else { + // If no order was specified in client SQL, + // there should be no order implicitly added. + // in case when CUBESQL_SQL_NO_IMPLICIT_ORDER it is set to true - no implicit order is + // added for all queries. + // We need to return empty array so the processing in + // BaseQuery.js won't automatically add default order + + let cube_no_implicit_order = self + .cube_context + .sessions + .server + .config_obj + .no_implicit_order(); + + if cube_no_implicit_order || query.ungrouped == Some(true) { + Some(vec![]) + } else { + None } - x => panic!("Unexpected extension node: {:?}", x), }; + let member_fields = fields.iter().map(|(_, m)| m.clone()).collect(); + + let node = Arc::new(CubeScanNode::new( + Arc::new(DFSchema::new_with_metadata( + fields.into_iter().map(|(f, _)| f).collect(), + HashMap::new(), + )?), + member_fields, + query, + self.auth_context.clone(), + CubeScanOptions { + change_user, + max_records, + }, + alias_to_cube.into_iter().map(|(_, c)| c).unique().collect(), + self.span_id.clone(), + )); + LogicalPlan::Extension(Extension { node }) } LogicalPlanLanguage::CubeScanWrapper(params) => { @@ -2349,11 +2290,9 @@ impl LanguageToLogicalPlanConverter { fn is_cube_scan_node(&self, node_id: Id) -> bool { let node_by_id = &self.best_expr; match node_by_id.index(node_id) { - LogicalPlanLanguage::Extension(params) => match node_by_id.index(params[0]) { - LogicalPlanLanguage::CubeScan(_) => return true, - _ => (), - }, - LogicalPlanLanguage::CubeScanWrapper(_) => return true, + LogicalPlanLanguage::CubeScan(_) | LogicalPlanLanguage::CubeScanWrapper(_) => { + return true + } _ => (), } diff --git a/rust/cubesql/cubesql/src/compile/rewrite/mod.rs b/rust/cubesql/cubesql/src/compile/rewrite/mod.rs index 7b67cc2042bd6..5473a4548f29a 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/mod.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/mod.rs @@ -2101,7 +2101,7 @@ fn cube_scan( ungrouped: impl Display, ) -> String { format!( - "(Extension (CubeScan {} {} {} {} {} {} {} {} {} {}))", + "(CubeScan {} {} {} {} {} {} {} {} {} {})", alias_to_cube, members, filters,