Skip to content

Commit

Permalink
Merge branch 'master' into kafka-metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhawvipul committed Oct 30, 2024
2 parents fa2d089 + 790b47d commit c38bc8b
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 24 deletions.
4 changes: 3 additions & 1 deletion crates/arroyo-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ impl WorkerGrpc for WorkerServer {
let logical = LogicalProgram::try_from(req.program.expect("Program is None"))
.expect("Failed to create LogicalProgram");

debug!("Starting execution for graph\n{}", to_d2(&logical));
if let Ok(v) = to_d2(&logical) {
debug!("Starting execution for graph\n{}", v);
}

for (udf_name, dylib_config) in &logical.program_config.udf_dylibs {
info!("Loading UDF {}", udf_name);
Expand Down
31 changes: 10 additions & 21 deletions crates/arroyo-worker/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::engine::construct_operator;
use anyhow::bail;
use arrow_schema::Schema;
use arroyo_datastream::logical::LogicalProgram;
use arroyo_df::physical::new_registry;
Expand All @@ -13,20 +14,17 @@ fn format_arrow_schema_fields(schema: &Schema) -> Vec<(String, String)> {
.collect()
}

pub fn to_d2(logical: &LogicalProgram) -> String {
pub fn to_d2(logical: &LogicalProgram) -> anyhow::Result<String> {
let registry = Arc::new(new_registry());
assert!(
logical.program_config.udf_dylibs.is_empty(),
"UDFs not supported"
);
assert!(
logical.program_config.python_udfs.is_empty(),
"UDFs not supported"
);

if !logical.program_config.udf_dylibs.is_empty()
|| !logical.program_config.python_udfs.is_empty()
{
bail!("UDFs are not yet supported in the pipeline visualizer");
}

let mut d2 = String::new();

// Nodes
for idx in logical.graph.node_indices() {
let node = logical.graph.node_weight(idx).unwrap();
let operator = construct_operator(
Expand All @@ -36,7 +34,6 @@ pub fn to_d2(logical: &LogicalProgram) -> String {
);
let display = operator.display();

// Create a Markdown-formatted label with operator details
let mut label = format!("### {} ({})", operator.name(), &display.name);
for (field, value) in display.fields {
label.push_str(&format!("\n- **{}**: {}", field, value));
Expand All @@ -56,37 +53,30 @@ pub fn to_d2(logical: &LogicalProgram) -> String {
.unwrap();
}

// Edges and Schema Nodes
for idx in logical.graph.edge_indices() {
let edge = logical.graph.edge_weight(idx).unwrap();
let (from, to) = logical.graph.edge_endpoints(idx).unwrap();

// Edge label (could be empty or minimal)
let edge_label = format!("{}", edge.edge_type);

// Create a schema node using sql_table shape
let schema_node_name = format!("schema_{}", idx.index());
let schema_fields = format_arrow_schema_fields(&edge.schema.schema);

// Begin schema node definition
writeln!(&mut d2, "{}: {{", schema_node_name).unwrap();
writeln!(&mut d2, " shape: sql_table").unwrap();

// Add fields to the schema node
for (field_name, field_type) in schema_fields {
writeln!(
&mut d2,
" \"{}\": \"{}\"", // Field definition
" \"{}\": \"{}\"",
field_name.replace("\"", "\\\""),
field_type.replace("\"", "\\\"")
)
.unwrap();
}

// End schema node definition
writeln!(&mut d2, "}}").unwrap();

// Connect source operator to schema node
writeln!(
&mut d2,
"{} -> {}: \"{}\"",
Expand All @@ -96,9 +86,8 @@ pub fn to_d2(logical: &LogicalProgram) -> String {
)
.unwrap();

// Connect schema node to destination operator
writeln!(&mut d2, "{} -> {}", schema_node_name, to.index()).unwrap();
}

d2
Ok(d2)
}
4 changes: 2 additions & 2 deletions crates/arroyo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ async fn visualize(query: Input, open: bool) {

if open {
let tmp = temp_dir().join("plan.d2");
tokio::fs::write(&tmp, utils::to_d2(&compiled.program))
tokio::fs::write(&tmp, utils::to_d2(&compiled.program).unwrap())
.await
.expect("Failed to write plan");
let output = tmp.with_extension("svg");
Expand All @@ -506,6 +506,6 @@ async fn visualize(query: Input, open: bool) {

let _ = open::that(format!("file://{}", output.to_string_lossy()));
} else {
println!("{}", utils::to_d2(&compiled.program));
println!("{}", utils::to_d2(&compiled.program).unwrap());
}
}

0 comments on commit c38bc8b

Please sign in to comment.