Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
xxhZs committed Nov 29, 2024
1 parent e05752d commit 7eb87de
Showing 1 changed file with 112 additions and 101 deletions.
213 changes: 112 additions & 101 deletions src/frontend/src/optimizer/rule/source_to_iceberg_scan_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,118 +33,129 @@ impl FallibleRule for SourceToIcebergScanRule {
Some(s) => s,
None => return ApplyResult::NotApplicable,
};
if source.core.is_iceberg_connector() {
let s = if let ConnectorProperties::Iceberg(prop) = ConnectorProperties::extract(
source
.core
.catalog
.as_ref()
.unwrap()
.with_properties
.clone(),
false,
)? {
IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
} else {
return ApplyResult::NotApplicable;
};

let delete_column_names = tokio::task::block_in_place(|| {
FRONTEND_RUNTIME.block_on(s.get_all_delete_column_names())
})?;
// data file scan
let data_iceberg_scan = LogicalIcebergScan::new(source, IcebergScanType::DataScan);
if delete_column_names.is_empty() {
return ApplyResult::Ok(data_iceberg_scan.into());
}
// equality delete scan
let column_catalog = source
.core
.column_catalog
.iter()
.filter(|c| {
delete_column_names.contains(&c.column_desc.name)
|| c.column_desc.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
})
.cloned()
.collect();
let equality_delete_source = source.clone_with_column_catalog(column_catalog)?;
let equality_delete_iceberg_scan = LogicalIcebergScan::new(
&equality_delete_source,
IcebergScanType::EqualityDeleteScan,
);
#[cfg(madsim)]
return ApplyResult::Err(crate::error::ErrorCode::BindError(
"iceberg_scan can't be used in the madsim mode".to_string(),
)
.into());
#[cfg(not(madsim))]
{
if source.core.is_iceberg_connector() {
let s = if let ConnectorProperties::Iceberg(prop) = ConnectorProperties::extract(
source
.core
.catalog
.as_ref()
.unwrap()
.with_properties
.clone(),
false,
)? {
IcebergSplitEnumerator::new_inner(
*prop,
SourceEnumeratorContext::dummy().into(),
)
} else {
return ApplyResult::NotApplicable;
};

let data_columns_len = data_iceberg_scan.core.schema().len();
// The join condition is delete_column_names is equal and sequence number is less than, join type is left anti
let build_inputs = |scan: &LogicalIcebergScan, add_index: usize| {
let delete_column_inputs = scan
let delete_column_names = tokio::task::block_in_place(|| {
FRONTEND_RUNTIME.block_on(s.get_all_delete_column_names())
})?;
// data file scan
let data_iceberg_scan = LogicalIcebergScan::new(source, IcebergScanType::DataScan);
if delete_column_names.is_empty() {
return ApplyResult::Ok(data_iceberg_scan.into());
}
// equality delete scan
let column_catalog = source
.core
.schema()
.fields()
.column_catalog
.iter()
.enumerate()
.filter_map(|(index, data_column)| {
if delete_column_names.contains(&data_column.name) {
Some(InputRef {
index: add_index + index,
data_type: data_column.data_type(),
})
} else {
None
}
.filter(|c| {
delete_column_names.contains(&c.column_desc.name)
|| c.column_desc.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME)
})
.collect::<Vec<InputRef>>();
let seq_num_inputs = InputRef {
index: scan
.cloned()
.collect();
let equality_delete_source = source.clone_with_column_catalog(column_catalog)?;
let equality_delete_iceberg_scan = LogicalIcebergScan::new(
&equality_delete_source,
IcebergScanType::EqualityDeleteScan,
);

let data_columns_len = data_iceberg_scan.core.schema().len();
// The join condition is delete_column_names is equal and sequence number is less than, join type is left anti
let build_inputs = |scan: &LogicalIcebergScan, add_index: usize| {
let delete_column_inputs = scan
.core
.schema()
.fields()
.iter()
.position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
.unwrap()
+ add_index,
data_type: risingwave_common::types::DataType::Int64,
.enumerate()
.filter_map(|(index, data_column)| {
if delete_column_names.contains(&data_column.name) {
Some(InputRef {
index: add_index + index,
data_type: data_column.data_type(),
})
} else {
None
}
})
.collect::<Vec<InputRef>>();
let seq_num_inputs = InputRef {
index: scan
.core
.schema()
.fields()
.iter()
.position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
.unwrap()
+ add_index,
data_type: risingwave_common::types::DataType::Int64,
};
(delete_column_inputs, seq_num_inputs)
};
(delete_column_inputs, seq_num_inputs)
};
let (join_left_delete_column_inputs, join_left_seq_num_input) =
build_inputs(&data_iceberg_scan, 0);
let (join_right_delete_column_inputs, join_right_seq_num_input) =
build_inputs(&equality_delete_iceberg_scan, data_columns_len);
let (join_left_delete_column_inputs, join_left_seq_num_input) =
build_inputs(&data_iceberg_scan, 0);
let (join_right_delete_column_inputs, join_right_seq_num_input) =
build_inputs(&equality_delete_iceberg_scan, data_columns_len);

let mut eq_join_expr = join_left_delete_column_inputs
.iter()
.zip_eq_fast(join_right_delete_column_inputs.iter())
.map(|(left, right)| {
Ok(FunctionCall::new(
ExprType::Equal,
vec![left.clone().into(), right.clone().into()],
let mut eq_join_expr = join_left_delete_column_inputs
.iter()
.zip_eq_fast(join_right_delete_column_inputs.iter())
.map(|(left, right)| {
Ok(FunctionCall::new(
ExprType::Equal,
vec![left.clone().into(), right.clone().into()],
)?
.into())
})
.collect::<Result<Vec<ExprImpl>>>()?;
eq_join_expr.push(
FunctionCall::new(
ExprType::LessThan,
vec![
join_left_seq_num_input.into(),
join_right_seq_num_input.into(),
],
)?
.into())
})
.collect::<Result<Vec<ExprImpl>>>()?;
eq_join_expr.push(
FunctionCall::new(
ExprType::LessThan,
vec![
join_left_seq_num_input.into(),
join_right_seq_num_input.into(),
],
)?
.into(),
);
let on = Condition {
conjunctions: eq_join_expr,
};
let join = LogicalJoin::new(
data_iceberg_scan.into(),
equality_delete_iceberg_scan.into(),
risingwave_pb::plan_common::JoinType::LeftAnti,
on,
);
ApplyResult::Ok(join.into())
} else {
ApplyResult::NotApplicable
.into(),
);
let on = Condition {
conjunctions: eq_join_expr,
};
let join = LogicalJoin::new(
data_iceberg_scan.into(),
equality_delete_iceberg_scan.into(),
risingwave_pb::plan_common::JoinType::LeftAnti,
on,
);
ApplyResult::Ok(join.into())
} else {
ApplyResult::NotApplicable
}
}
}
}
Expand Down

0 comments on commit 7eb87de

Please sign in to comment.