diff --git a/crates/arroyo-worker/src/lib.rs b/crates/arroyo-worker/src/lib.rs index a6f2f914a..a767652fa 100644 --- a/crates/arroyo-worker/src/lib.rs +++ b/crates/arroyo-worker/src/lib.rs @@ -412,7 +412,9 @@ impl WorkerGrpc for WorkerServer { let logical = LogicalProgram::try_from(req.program.expect("Program is None")) .expect("Failed to create LogicalProgram"); - debug!("Starting execution for graph\n{}", to_d2(&logical)); + if let Ok(v) = to_d2(&logical) { + debug!("Starting execution for graph\n{}", v); + } for (udf_name, dylib_config) in &logical.program_config.udf_dylibs { info!("Loading UDF {}", udf_name); diff --git a/crates/arroyo-worker/src/utils.rs b/crates/arroyo-worker/src/utils.rs index cf153cd7f..b615e12d4 100644 --- a/crates/arroyo-worker/src/utils.rs +++ b/crates/arroyo-worker/src/utils.rs @@ -1,4 +1,5 @@ use crate::engine::construct_operator; +use anyhow::bail; use arrow_schema::Schema; use arroyo_datastream::logical::LogicalProgram; use arroyo_df::physical::new_registry; @@ -13,20 +14,17 @@ fn format_arrow_schema_fields(schema: &Schema) -> Vec<(String, String)> { .collect() } -pub fn to_d2(logical: &LogicalProgram) -> String { +pub fn to_d2(logical: &LogicalProgram) -> anyhow::Result { let registry = Arc::new(new_registry()); - assert!( - logical.program_config.udf_dylibs.is_empty(), - "UDFs not supported" - ); - assert!( - logical.program_config.python_udfs.is_empty(), - "UDFs not supported" - ); + + if !logical.program_config.udf_dylibs.is_empty() + || !logical.program_config.python_udfs.is_empty() + { + bail!("UDFs are not yet supported in the pipeline visualizer"); + } let mut d2 = String::new(); - // Nodes for idx in logical.graph.node_indices() { let node = logical.graph.node_weight(idx).unwrap(); let operator = construct_operator( @@ -36,7 +34,6 @@ pub fn to_d2(logical: &LogicalProgram) -> String { ); let display = operator.display(); - // Create a Markdown-formatted label with operator details let mut label = format!("### {} ({})", operator.name(), &display.name); for (field, value) in display.fields { label.push_str(&format!("\n- **{}**: {}", field, value)); @@ -56,37 +53,30 @@ pub fn to_d2(logical: &LogicalProgram) -> String { .unwrap(); } - // Edges and Schema Nodes for idx in logical.graph.edge_indices() { let edge = logical.graph.edge_weight(idx).unwrap(); let (from, to) = logical.graph.edge_endpoints(idx).unwrap(); - // Edge label (could be empty or minimal) let edge_label = format!("{}", edge.edge_type); - // Create a schema node using sql_table shape let schema_node_name = format!("schema_{}", idx.index()); let schema_fields = format_arrow_schema_fields(&edge.schema.schema); - // Begin schema node definition writeln!(&mut d2, "{}: {{", schema_node_name).unwrap(); writeln!(&mut d2, " shape: sql_table").unwrap(); - // Add fields to the schema node for (field_name, field_type) in schema_fields { writeln!( &mut d2, - " \"{}\": \"{}\"", // Field definition + " \"{}\": \"{}\"", field_name.replace("\"", "\\\""), field_type.replace("\"", "\\\"") ) .unwrap(); } - // End schema node definition writeln!(&mut d2, "}}").unwrap(); - // Connect source operator to schema node writeln!( &mut d2, "{} -> {}: \"{}\"", @@ -96,9 +86,8 @@ pub fn to_d2(logical: &LogicalProgram) -> String { ) .unwrap(); - // Connect schema node to destination operator writeln!(&mut d2, "{} -> {}", schema_node_name, to.index()).unwrap(); } - d2 + Ok(d2) } diff --git a/crates/arroyo/src/main.rs b/crates/arroyo/src/main.rs index 01c025f5c..bbab8b013 100644 --- a/crates/arroyo/src/main.rs +++ b/crates/arroyo/src/main.rs @@ -484,7 +484,7 @@ async fn visualize(query: Input, open: bool) { if open { let tmp = temp_dir().join("plan.d2"); - tokio::fs::write(&tmp, utils::to_d2(&compiled.program)) + tokio::fs::write(&tmp, utils::to_d2(&compiled.program).unwrap()) .await .expect("Failed to write plan"); let output = tmp.with_extension("svg"); @@ -506,6 +506,6 @@ async fn visualize(query: Input, open: bool) { let _ = open::that(format!("file://{}", output.to_string_lossy())); } else { - println!("{}", utils::to_d2(&compiled.program)); + println!("{}", utils::to_d2(&compiled.program).unwrap()); } }