Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust Integration: Full find_min_cut Rust Integration #3

Merged
merged 55 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
74929fc
set up pr
ohjuny Nov 9, 2024
ca6f716
refactor modules + set up graph lib
ohjuny Nov 9, 2024
293579d
graph updates
ohjuny Nov 11, 2024
b115396
operation and costmodel
ohjuny Nov 11, 2024
8e7b787
note on caching
ohjuny Nov 11, 2024
3bd85de
rename to ids
ohjuny Nov 11, 2024
86844c8
edit operation fields
ohjuny Nov 11, 2024
b5911fc
floats instead of int
ohjuny Nov 11, 2024
7a575c9
not working; cost model issues, making generic might be hard with int…
ohjuny Nov 14, 2024
cd5c43b
wip cost model, issues with creating generic constructor
ohjuny Nov 14, 2024
7a0901b
working: minimal change with new PhillipsDessouky setup
ohjuny Nov 19, 2024
6342abb
working: fix type hint
ohjuny Nov 19, 2024
4d23187
working: actually fix type hint
ohjuny Nov 19, 2024
83efa13
remove backup file since working
ohjuny Nov 19, 2024
ed4fbf2
restructure max_flow temporarily, use with_capacity+extend for perfor…
ohjuny Nov 19, 2024
ddacc3f
working: rustify up to first max flow in find_min_cut
ohjuny Nov 20, 2024
3e93337
working: rename solvers
ohjuny Nov 20, 2024
86fe7e3
set up next goal state: up to 2nd max flow
ohjuny Nov 20, 2024
065e948
minor solver touches
ohjuny Nov 20, 2024
3d55a54
note very hard to find bug caused by difference in py/rs impl
ohjuny Nov 20, 2024
0f823b6
working: replace everything up to 2nd max flow
ohjuny Nov 21, 2024
b89e1c0
set up next goal state solver
ohjuny Nov 22, 2024
d2ee220
working: full find_min_cut rewrite
ohjuny Nov 22, 2024
e9053a6
working: full f_m_c replace + remove testing functions
ohjuny Nov 22, 2024
abef61a
working: full find_min_cut, rename solver checkpoints
ohjuny Dec 3, 2024
da8a3e7
set up next reduce_duration goal state
ohjuny Dec 3, 2024
7ba89ae
redefine goal state with aoa_to_crit_dag
ohjuny Dec 3, 2024
b6ea2ac
set up rust side for aoa_to_crit
ohjuny Dec 3, 2024
de77916
final set up for aoa_to_crit
ohjuny Dec 3, 2024
a5adb59
issues with multiple immutable/mutable borrows
ohjuny Dec 4, 2024
edcc456
undo aon_to_crit, polish full find_min_cut rewrite
ohjuny Dec 9, 2024
3874064
remove unused files
ohjuny Dec 9, 2024
c143e33
update pyo3 version
ohjuny Dec 9, 2024
bbf932c
use prior pyo3 v
ohjuny Dec 10, 2024
c4e64e2
black solver formatting
ohjuny Dec 10, 2024
0188772
fix pyi interface
ohjuny Dec 10, 2024
b52c6d8
fix solver style
ohjuny Dec 10, 2024
83d9873
fix pyi pyright
ohjuny Dec 10, 2024
d97295c
actaully fix pyi pyright
ohjuny Dec 10, 2024
1c64642
upgrade pyo3
ohjuny Dec 10, 2024
03fc9fb
try removing macos-12 from maturin.yml
ohjuny Dec 10, 2024
e489444
keep macos-12 x86_64 commented in yml
ohjuny Dec 10, 2024
55d98da
remove temporary profiling logs
ohjuny Dec 10, 2024
c9a0723
remove unused time library
ohjuny Dec 10, 2024
ce21569
Update src/lowtime_graph.rs
ohjuny Dec 11, 2024
b97e6e0
add newline to eof
ohjuny Dec 11, 2024
8219998
Update src/phillips_dessouky.rs
ohjuny Dec 11, 2024
48d1725
make source/sink raw u32 instead of Option
ohjuny Dec 11, 2024
eeef91d
add original python find_min_cut, edit docstring to specify why it is…
ohjuny Dec 11, 2024
46c78e0
make LowtimeEdge vars public, remove getters/setters
ohjuny Dec 11, 2024
86752f9
make LowtimeGraph.node_ids pub
ohjuny Dec 11, 2024
cb06b53
make LowtimeGraph.source/sink_node_id pub
ohjuny Dec 11, 2024
e41933e
make non-capacity vars in LowtimeEdge raw f64 instead of OrderedFloat
ohjuny Dec 12, 2024
4a094f8
add deque to solver, even though find_min_cut never called
ohjuny Dec 12, 2024
723d80a
add nx edmonds_karp even though find_min_cut never called
ohjuny Dec 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/maturin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ jobs:
strategy:
matrix:
platform:
- runner: macos-12
target: x86_64
# - runner: macos-12
# target: x86_64
- runner: macos-14
target: aarch64
steps:
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ name = "_lowtime_rs"
crate-type = ["cdylib"]

[dependencies]
pyo3 = "0.22.0"
pyo3-log = "0.11.0"
pyo3 = { version = "0.23.3", features = ["extension-module"] }
pyo3-log = "0.12.0"
log = "0.4"
ordered-float = { version = "4.0", default-features = false }
pathfinding = "4.11.0"
10 changes: 8 additions & 2 deletions lowtime/_lowtime_rs.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ import networkx as nx
class PhillipsDessouky:
def __init__(
self,
fp_error: float,
node_ids: list[int] | nx.classes.reportviews.NodeView,
source_node_id: int,
sink_node_id: int,
edges_raw: list[tuple[tuple[int, int], float]],
edges_raw: list[
tuple[
tuple[int, int],
tuple[float, float, float, float],
]
],
) -> None: ...
def max_flow(self) -> list[tuple[tuple[int, int], float]]: ...
def find_min_cut(self) -> tuple[set[int], set[int]]: ...
207 changes: 70 additions & 137 deletions lowtime/solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

from __future__ import annotations

import time
import sys
import logging
from collections import deque
from collections.abc import Generator

import networkx as nx
from networkx.algorithms.flow import edmonds_karp
from attrs import define, field

from lowtime.operation import Operation
Expand Down Expand Up @@ -161,6 +161,35 @@ def __init__(self, dag: nx.DiGraph, attr_name: str = "op") -> None:
self.aon_dag = dag
self.unit_time = unit_time_candidates.pop()

def format_rust_inputs(
self,
dag: nx.DiGraph,
) -> tuple[
nx.classes.reportviews.NodeView,
list[
tuple[
tuple[int, int],
tuple[float, float, float, float],
]
],
]:
"""Convert Python-side nx.DiGraph into format compatible with Rust-side LowtimeGraph."""
nodes = dag.nodes
edges = []
for from_, to_, edge_attrs in dag.edges(data=True):
edges.append(
(
(from_, to_),
(
edge_attrs.get("capacity", 0),
edge_attrs.get("flow", 0),
edge_attrs.get("ub", 0),
edge_attrs.get("lb", 0),
),
)
)
return nodes, edges

def run(self) -> Generator[IterationResult, None, None]:
"""Run the algorithm and yield a DAG after each iteration.

Expand All @@ -172,7 +201,6 @@ def run(self) -> Generator[IterationResult, None, None]:
of all operations are up to date w.r.t. the `duration` of each operation.
"""
logger.info("Starting Phillips-Dessouky solver.")
profiling_setup = time.time()

# Convert the original activity-on-node DAG to activity-on-arc DAG form.
# AOA DAGs are purely internal. All public input and output of this class
Expand Down Expand Up @@ -208,15 +236,9 @@ def run(self) -> Generator[IterationResult, None, None]:
num_iters = max_time - min_time + 1
logger.info("Expected number of PD iterations: %d", num_iters)

profiling_setup = time.time() - profiling_setup
logger.info(
"PROFILING PhillipsDessouky::run set up time: %.10fs", profiling_setup
)

# Iteratively reduce the execution time of the DAG.
for iteration in range(sys.maxsize):
logger.info(">>> Beginning iteration %d/%d", iteration + 1, num_iters)
profiling_iter = time.time()

# At this point, `critical_dag` always exists and is what we want.
# For the first iteration, the critical DAG is computed before the for
Expand All @@ -238,13 +260,7 @@ def run(self) -> Generator[IterationResult, None, None]:
sum(op.duration for op in non_dummy_ops),
)

profiling_annotate = time.time()
self.annotate_capacities(critical_dag)
profiling_annotate = time.time() - profiling_annotate
logger.info(
"PROFILING PhillipsDessouky::annotate_capacities time: %.10fs",
profiling_annotate,
)

if logger.isEnabledFor(logging.DEBUG):
logger.debug("Capacity DAG:")
Expand All @@ -258,25 +274,21 @@ def run(self) -> Generator[IterationResult, None, None]:
)

try:
profiling_min_cut = time.time()
s_set, t_set = self.find_min_cut(critical_dag)
profiling_min_cut = time.time() - profiling_min_cut
logger.info(
"PROFILING PhillipsDessouky::find_min_cut time: %.10fs",
profiling_min_cut,
nodes, edges = self.format_rust_inputs(critical_dag)
rust_runner = _lowtime_rs.PhillipsDessouky(
FP_ERROR,
nodes,
critical_dag.graph["source_node"],
critical_dag.graph["sink_node"],
edges,
)
s_set, t_set = rust_runner.find_min_cut()
except LowtimeFlowError as e:
logger.info("Could not find minimum cut: %s", e.message)
logger.info("Terminating PD iteration.")
break

profiling_reduce = time.time()
cost_change = self.reduce_durations(critical_dag, s_set, t_set)
profiling_reduce = time.time() - profiling_reduce
logger.info(
"PROFILING PhillipsDessouky::reduce_durations time: %.10fs",
profiling_reduce,
)

if cost_change == float("inf") or abs(cost_change) < FP_ERROR:
logger.info("No further time reduction possible.")
Expand All @@ -298,11 +310,6 @@ def run(self) -> Generator[IterationResult, None, None]:
unit_time=self.unit_time,
)
logger.info("%s", result)
profiling_iter = time.time() - profiling_iter
logger.info(
"PROFILING PhillipsDessouky::run single iteration time: %.10fs",
profiling_iter,
)
yield result

def reduce_durations(
Expand Down Expand Up @@ -361,6 +368,10 @@ def reduce_durations(
def find_min_cut(self, dag: nx.DiGraph) -> tuple[set[int], set[int]]:
ohjuny marked this conversation as resolved.
Show resolved Hide resolved
"""Find the min cut of the DAG annotated with lower/upper bound flow capacities.

Note: this function is not used and instead accelerated by calling
rust_runner.find_min_cut. It is left here for reference in case someone wants
to modify the algorithm in Python for research.

Assumptions:
- The capacity DAG is in AOA form.
- The capacity DAG has been annotated with `lb` and `ub` attributes on edges,
Expand All @@ -374,7 +385,6 @@ def find_min_cut(self, dag: nx.DiGraph) -> tuple[set[int], set[int]]:
Raises:
LowtimeFlowError: When no feasible flow exists.
"""
profiling_min_cut_setup = time.time()
source_node = dag.graph["source_node"]
sink_node = dag.graph["sink_node"]

Expand Down Expand Up @@ -428,63 +438,18 @@ def find_min_cut(self, dag: nx.DiGraph) -> tuple[set[int], set[int]]:
capacity=float("inf"),
)

profiling_min_cut_setup = time.time() - profiling_min_cut_setup
logger.info(
"PROFILING PhillipsDessouky::find_min_cut setup time: %.10fs",
profiling_min_cut_setup,
)

# Helper function for Rust interop
def format_rust_inputs(
dag: nx.DiGraph,
) -> tuple[
nx.classes.reportviews.NodeView, list[tuple[tuple[int, int], float]]
]:
nodes = dag.nodes
edges = [
((u, v), cap)
for (u, v), cap in nx.get_edge_attributes(dag, "capacity").items()
]
return nodes, edges

# Helper function for Rust interop
# Rust's pathfinding::edmonds_karp does not return edges with 0 flow,
# but nx.max_flow does. So we fill in the 0s and empty nodes.
def reformat_rust_flow_to_dict(
flow_vec: list[tuple[tuple[int, int], float]], dag: nx.DiGraph
) -> dict[int, dict[int, float]]:
flow_dict = dict()
for u in dag.nodes:
flow_dict[u] = dict()
for v in dag.successors(u):
flow_dict[u][v] = 0.0

for (u, v), cap in flow_vec:
flow_dict[u][v] = cap

return flow_dict

# We're done with constructing the DAG with only flow upper bounds.
# Find the maximum flow on this DAG.
profiling_max_flow = time.time()
nodes, edges = format_rust_inputs(unbound_dag)

profiling_data_transfer = time.time()
rust_dag = _lowtime_rs.PhillipsDessouky(nodes, s_prime_id, t_prime_id, edges)
profiling_data_transfer = time.time() - profiling_data_transfer
logger.info(
"PROFILING PhillipsDessouky::find_min_cut data transfer time: %.10fs",
profiling_data_transfer,
)

rust_flow_vec = rust_dag.max_flow()
flow_dict = reformat_rust_flow_to_dict(rust_flow_vec, unbound_dag)

profiling_max_flow = time.time() - profiling_max_flow
logger.info(
"PROFILING PhillipsDessouky::find_min_cut maximum_flow_1 time: %.10fs",
profiling_max_flow,
)
try:
_, flow_dict = nx.maximum_flow(
unbound_dag,
s_prime_id,
t_prime_id,
capacity="capacity",
flow_func=edmonds_karp,
)
except nx.NetworkXUnbounded as e:
raise LowtimeFlowError("ERROR: Infinite flow for unbounded DAG.") from e

if logger.isEnabledFor(logging.DEBUG):
logger.debug("After first max flow")
Expand All @@ -494,8 +459,6 @@ def reformat_rust_flow_to_dict(
total_flow += flow
logger.debug("Sum of all flow values: %f", total_flow)

profiling_min_cut_between_max_flows = time.time()

# Check if residual graph is saturated. If so, we have a feasible flow.
for u in unbound_dag.successors(s_prime_id):
if (
Expand Down Expand Up @@ -537,50 +500,28 @@ def reformat_rust_flow_to_dict(
# u -> v capacity `ub - flow` and v -> u capacity `flow - lb`.
residual_graph = nx.DiGraph(dag)
for u, v in dag.edges:
# Rounding small negative values to 0.0 avoids Rust-side
# pathfinding::edmonds_karp from entering unreachable code.
# This edge case did not exist in Python-side nx.max_flow call.
uv_capacity = residual_graph[u][v]["ub"] - residual_graph[u][v]["flow"]
uv_capacity = 0.0 if abs(uv_capacity) < FP_ERROR else uv_capacity
residual_graph[u][v]["capacity"] = uv_capacity

vu_capacity = residual_graph[u][v]["flow"] - residual_graph[u][v]["lb"]
vu_capacity = 0.0 if abs(vu_capacity) < FP_ERROR else vu_capacity
residual_graph[u][v]["capacity"] = (
residual_graph[u][v]["ub"] - residual_graph[u][v]["flow"]
)
capacity = residual_graph[u][v]["flow"] - residual_graph[u][v]["lb"]
if dag.has_edge(v, u):
residual_graph[v][u]["capacity"] = vu_capacity
residual_graph[v][u]["capacity"] = capacity
else:
residual_graph.add_edge(v, u, capacity=vu_capacity)

profiling_min_cut_between_max_flows = (
time.time() - profiling_min_cut_between_max_flows
)
logger.info(
"PROFILING PhillipsDessouky::find_min_cut between max flows time: %.10fs",
profiling_min_cut_between_max_flows,
)
residual_graph.add_edge(v, u, capacity=capacity)

# Run max flow on the new residual graph.
profiling_max_flow = time.time()
nodes, edges = format_rust_inputs(residual_graph)

profiling_data_transfer = time.time()
rust_dag = _lowtime_rs.PhillipsDessouky(nodes, source_node, sink_node, edges)
profiling_data_transfer = time.time() - profiling_data_transfer
logger.info(
"PROFILING PhillipsDessouky::find_min_cut data transfer 2 time: %.10fs",
profiling_data_transfer,
)

rust_flow_vec = rust_dag.max_flow()
flow_dict = reformat_rust_flow_to_dict(rust_flow_vec, residual_graph)

profiling_max_flow = time.time() - profiling_max_flow
logger.info(
"PROFILING PhillipsDessouky::find_min_cut maximum_flow_2 time: %.10fs",
profiling_max_flow,
)

profiling_min_cut_after_max_flows = time.time()
try:
_, flow_dict = nx.maximum_flow(
residual_graph,
source_node,
sink_node,
capacity="capacity",
flow_func=edmonds_karp,
)
except nx.NetworkXUnbounded as e:
raise LowtimeFlowError(
"ERROR: Infinite flow on capacity residual graph."
) from e

# Add additional flow we get to the original graph
for u, v in dag.edges:
Expand Down Expand Up @@ -621,14 +562,6 @@ def reformat_rust_flow_to_dict(
q.append(child_id)
t_set = set(new_residual.nodes) - s_set

profiling_min_cut_after_max_flows = (
time.time() - profiling_min_cut_after_max_flows
)
logger.info(
"PROFILING PhillipsDessouky::find_min_cut after max flows time: %.10fs",
profiling_min_cut_after_max_flows,
)

return s_set, t_set

def annotate_capacities(self, critical_dag: nx.DiGraph) -> None:
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use pyo3::prelude::*;

mod lowtime_graph;
mod phillips_dessouky;
mod utils;

use phillips_dessouky::PhillipsDessouky;


#[pymodule]
fn _lowtime_rs(m: &Bound<'_, PyModule>) -> PyResult<()> {
pyo3_log::init(); // send Rust logs to Python logger
Expand Down
Loading
Loading