Skip to content

Commit

Permalink
[Rust interop 2/2] find_min_cut in Rust (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
ohjuny authored and jaywonchung committed Dec 12, 2024
1 parent bed69cf commit 3656a3d
Show file tree
Hide file tree
Showing 8 changed files with 538 additions and 193 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/maturin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ jobs:
strategy:
matrix:
platform:
- runner: macos-12
target: x86_64
# - runner: macos-12
# target: x86_64
- runner: macos-14
target: aarch64
steps:
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ name = "_lowtime_rs"
crate-type = ["cdylib"]

[dependencies]
pyo3 = "0.22.0"
pyo3-log = "0.11.0"
pyo3 = { version = "0.23.3", features = ["extension-module"] }
pyo3-log = "0.12.0"
log = "0.4"
ordered-float = { version = "4.0", default-features = false }
pathfinding = "4.11.0"
10 changes: 8 additions & 2 deletions lowtime/_lowtime_rs.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ import networkx as nx
class PhillipsDessouky:
def __init__(
self,
fp_error: float,
node_ids: list[int] | nx.classes.reportviews.NodeView,
source_node_id: int,
sink_node_id: int,
edges_raw: list[tuple[tuple[int, int], float]],
edges_raw: list[
tuple[
tuple[int, int],
tuple[float, float, float, float],
]
],
) -> None: ...
def max_flow(self) -> list[tuple[tuple[int, int], float]]: ...
def find_min_cut(self) -> tuple[set[int], set[int]]: ...
207 changes: 70 additions & 137 deletions lowtime/solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

from __future__ import annotations

import time
import sys
import logging
from collections import deque
from collections.abc import Generator

import networkx as nx
from networkx.algorithms.flow import edmonds_karp
from attrs import define, field

from lowtime.operation import Operation
Expand Down Expand Up @@ -161,6 +161,35 @@ def __init__(self, dag: nx.DiGraph, attr_name: str = "op") -> None:
self.aon_dag = dag
self.unit_time = unit_time_candidates.pop()

def format_rust_inputs(
self,
dag: nx.DiGraph,
) -> tuple[
nx.classes.reportviews.NodeView,
list[
tuple[
tuple[int, int],
tuple[float, float, float, float],
]
],
]:
"""Convert Python-side nx.DiGraph into format compatible with Rust-side LowtimeGraph."""
nodes = dag.nodes
edges = []
for from_, to_, edge_attrs in dag.edges(data=True):
edges.append(
(
(from_, to_),
(
edge_attrs.get("capacity", 0),
edge_attrs.get("flow", 0),
edge_attrs.get("ub", 0),
edge_attrs.get("lb", 0),
),
)
)
return nodes, edges

def run(self) -> Generator[IterationResult, None, None]:
"""Run the algorithm and yield a DAG after each iteration.
Expand All @@ -172,7 +201,6 @@ def run(self) -> Generator[IterationResult, None, None]:
of all operations are up to date w.r.t. the `duration` of each operation.
"""
logger.info("Starting Phillips-Dessouky solver.")
profiling_setup = time.time()

# Convert the original activity-on-node DAG to activity-on-arc DAG form.
# AOA DAGs are purely internal. All public input and output of this class
Expand Down Expand Up @@ -208,15 +236,9 @@ def run(self) -> Generator[IterationResult, None, None]:
num_iters = max_time - min_time + 1
logger.info("Expected number of PD iterations: %d", num_iters)

profiling_setup = time.time() - profiling_setup
logger.info(
"PROFILING PhillipsDessouky::run set up time: %.10fs", profiling_setup
)

# Iteratively reduce the execution time of the DAG.
for iteration in range(sys.maxsize):
logger.info(">>> Beginning iteration %d/%d", iteration + 1, num_iters)
profiling_iter = time.time()

# At this point, `critical_dag` always exists and is what we want.
# For the first iteration, the critical DAG is computed before the for
Expand All @@ -238,13 +260,7 @@ def run(self) -> Generator[IterationResult, None, None]:
sum(op.duration for op in non_dummy_ops),
)

profiling_annotate = time.time()
self.annotate_capacities(critical_dag)
profiling_annotate = time.time() - profiling_annotate
logger.info(
"PROFILING PhillipsDessouky::annotate_capacities time: %.10fs",
profiling_annotate,
)

if logger.isEnabledFor(logging.DEBUG):
logger.debug("Capacity DAG:")
Expand All @@ -258,25 +274,21 @@ def run(self) -> Generator[IterationResult, None, None]:
)

try:
profiling_min_cut = time.time()
s_set, t_set = self.find_min_cut(critical_dag)
profiling_min_cut = time.time() - profiling_min_cut
logger.info(
"PROFILING PhillipsDessouky::find_min_cut time: %.10fs",
profiling_min_cut,
nodes, edges = self.format_rust_inputs(critical_dag)
rust_runner = _lowtime_rs.PhillipsDessouky(
FP_ERROR,
nodes,
critical_dag.graph["source_node"],
critical_dag.graph["sink_node"],
edges,
)
s_set, t_set = rust_runner.find_min_cut()
except LowtimeFlowError as e:
logger.info("Could not find minimum cut: %s", e.message)
logger.info("Terminating PD iteration.")
break

profiling_reduce = time.time()
cost_change = self.reduce_durations(critical_dag, s_set, t_set)
profiling_reduce = time.time() - profiling_reduce
logger.info(
"PROFILING PhillipsDessouky::reduce_durations time: %.10fs",
profiling_reduce,
)

if cost_change == float("inf") or abs(cost_change) < FP_ERROR:
logger.info("No further time reduction possible.")
Expand All @@ -298,11 +310,6 @@ def run(self) -> Generator[IterationResult, None, None]:
unit_time=self.unit_time,
)
logger.info("%s", result)
profiling_iter = time.time() - profiling_iter
logger.info(
"PROFILING PhillipsDessouky::run single iteration time: %.10fs",
profiling_iter,
)
yield result

def reduce_durations(
Expand Down Expand Up @@ -361,6 +368,10 @@ def reduce_durations(
def find_min_cut(self, dag: nx.DiGraph) -> tuple[set[int], set[int]]:
"""Find the min cut of the DAG annotated with lower/upper bound flow capacities.
Note: this function is not used and instead accelerated by calling
rust_runner.find_min_cut. It is left here for reference in case someone wants
to modify the algorithm in Python for research.
Assumptions:
- The capacity DAG is in AOA form.
- The capacity DAG has been annotated with `lb` and `ub` attributes on edges,
Expand All @@ -374,7 +385,6 @@ def find_min_cut(self, dag: nx.DiGraph) -> tuple[set[int], set[int]]:
Raises:
LowtimeFlowError: When no feasible flow exists.
"""
profiling_min_cut_setup = time.time()
source_node = dag.graph["source_node"]
sink_node = dag.graph["sink_node"]

Expand Down Expand Up @@ -428,63 +438,18 @@ def find_min_cut(self, dag: nx.DiGraph) -> tuple[set[int], set[int]]:
capacity=float("inf"),
)

profiling_min_cut_setup = time.time() - profiling_min_cut_setup
logger.info(
"PROFILING PhillipsDessouky::find_min_cut setup time: %.10fs",
profiling_min_cut_setup,
)

# Helper function for Rust interop
def format_rust_inputs(
dag: nx.DiGraph,
) -> tuple[
nx.classes.reportviews.NodeView, list[tuple[tuple[int, int], float]]
]:
nodes = dag.nodes
edges = [
((u, v), cap)
for (u, v), cap in nx.get_edge_attributes(dag, "capacity").items()
]
return nodes, edges

# Helper function for Rust interop
# Rust's pathfinding::edmonds_karp does not return edges with 0 flow,
# but nx.max_flow does. So we fill in the 0s and empty nodes.
def reformat_rust_flow_to_dict(
flow_vec: list[tuple[tuple[int, int], float]], dag: nx.DiGraph
) -> dict[int, dict[int, float]]:
flow_dict = dict()
for u in dag.nodes:
flow_dict[u] = dict()
for v in dag.successors(u):
flow_dict[u][v] = 0.0

for (u, v), cap in flow_vec:
flow_dict[u][v] = cap

return flow_dict

# We're done with constructing the DAG with only flow upper bounds.
# Find the maximum flow on this DAG.
profiling_max_flow = time.time()
nodes, edges = format_rust_inputs(unbound_dag)

profiling_data_transfer = time.time()
rust_dag = _lowtime_rs.PhillipsDessouky(nodes, s_prime_id, t_prime_id, edges)
profiling_data_transfer = time.time() - profiling_data_transfer
logger.info(
"PROFILING PhillipsDessouky::find_min_cut data transfer time: %.10fs",
profiling_data_transfer,
)

rust_flow_vec = rust_dag.max_flow()
flow_dict = reformat_rust_flow_to_dict(rust_flow_vec, unbound_dag)

profiling_max_flow = time.time() - profiling_max_flow
logger.info(
"PROFILING PhillipsDessouky::find_min_cut maximum_flow_1 time: %.10fs",
profiling_max_flow,
)
try:
_, flow_dict = nx.maximum_flow(
unbound_dag,
s_prime_id,
t_prime_id,
capacity="capacity",
flow_func=edmonds_karp,
)
except nx.NetworkXUnbounded as e:
raise LowtimeFlowError("ERROR: Infinite flow for unbounded DAG.") from e

if logger.isEnabledFor(logging.DEBUG):
logger.debug("After first max flow")
Expand All @@ -494,8 +459,6 @@ def reformat_rust_flow_to_dict(
total_flow += flow
logger.debug("Sum of all flow values: %f", total_flow)

profiling_min_cut_between_max_flows = time.time()

# Check if residual graph is saturated. If so, we have a feasible flow.
for u in unbound_dag.successors(s_prime_id):
if (
Expand Down Expand Up @@ -537,50 +500,28 @@ def reformat_rust_flow_to_dict(
# u -> v capacity `ub - flow` and v -> u capacity `flow - lb`.
residual_graph = nx.DiGraph(dag)
for u, v in dag.edges:
# Rounding small negative values to 0.0 avoids Rust-side
# pathfinding::edmonds_karp from entering unreachable code.
# This edge case did not exist in Python-side nx.max_flow call.
uv_capacity = residual_graph[u][v]["ub"] - residual_graph[u][v]["flow"]
uv_capacity = 0.0 if abs(uv_capacity) < FP_ERROR else uv_capacity
residual_graph[u][v]["capacity"] = uv_capacity

vu_capacity = residual_graph[u][v]["flow"] - residual_graph[u][v]["lb"]
vu_capacity = 0.0 if abs(vu_capacity) < FP_ERROR else vu_capacity
residual_graph[u][v]["capacity"] = (
residual_graph[u][v]["ub"] - residual_graph[u][v]["flow"]
)
capacity = residual_graph[u][v]["flow"] - residual_graph[u][v]["lb"]
if dag.has_edge(v, u):
residual_graph[v][u]["capacity"] = vu_capacity
residual_graph[v][u]["capacity"] = capacity
else:
residual_graph.add_edge(v, u, capacity=vu_capacity)

profiling_min_cut_between_max_flows = (
time.time() - profiling_min_cut_between_max_flows
)
logger.info(
"PROFILING PhillipsDessouky::find_min_cut between max flows time: %.10fs",
profiling_min_cut_between_max_flows,
)
residual_graph.add_edge(v, u, capacity=capacity)

# Run max flow on the new residual graph.
profiling_max_flow = time.time()
nodes, edges = format_rust_inputs(residual_graph)

profiling_data_transfer = time.time()
rust_dag = _lowtime_rs.PhillipsDessouky(nodes, source_node, sink_node, edges)
profiling_data_transfer = time.time() - profiling_data_transfer
logger.info(
"PROFILING PhillipsDessouky::find_min_cut data transfer 2 time: %.10fs",
profiling_data_transfer,
)

rust_flow_vec = rust_dag.max_flow()
flow_dict = reformat_rust_flow_to_dict(rust_flow_vec, residual_graph)

profiling_max_flow = time.time() - profiling_max_flow
logger.info(
"PROFILING PhillipsDessouky::find_min_cut maximum_flow_2 time: %.10fs",
profiling_max_flow,
)

profiling_min_cut_after_max_flows = time.time()
try:
_, flow_dict = nx.maximum_flow(
residual_graph,
source_node,
sink_node,
capacity="capacity",
flow_func=edmonds_karp,
)
except nx.NetworkXUnbounded as e:
raise LowtimeFlowError(
"ERROR: Infinite flow on capacity residual graph."
) from e

# Add additional flow we get to the original graph
for u, v in dag.edges:
Expand Down Expand Up @@ -621,14 +562,6 @@ def reformat_rust_flow_to_dict(
q.append(child_id)
t_set = set(new_residual.nodes) - s_set

profiling_min_cut_after_max_flows = (
time.time() - profiling_min_cut_after_max_flows
)
logger.info(
"PROFILING PhillipsDessouky::find_min_cut after max flows time: %.10fs",
profiling_min_cut_after_max_flows,
)

return s_set, t_set

def annotate_capacities(self, critical_dag: nx.DiGraph) -> None:
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use pyo3::prelude::*;

mod lowtime_graph;
mod phillips_dessouky;
mod utils;

use phillips_dessouky::PhillipsDessouky;


#[pymodule]
fn _lowtime_rs(m: &Bound<'_, PyModule>) -> PyResult<()> {
pyo3_log::init(); // send Rust logs to Python logger
Expand Down
Loading

0 comments on commit 3656a3d

Please sign in to comment.