Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove pattern try...except from tests #322

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 19 additions & 27 deletions tests/solvers/cpp/test_cpp_solvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,33 +466,25 @@ def decode(val):


def test_solver_cpp(solver_cpp, parallel, shared_memory):
noexcept = True

if solver_cpp["entry"] == "UCT":
pytest.skip("There is a heap corruption in MCTS solver")
try:
dom = GridDomain()
solver_type = load_registered_solver(solver_cpp["entry"])
solver_args = deepcopy(solver_cpp["config"])
if "parallel" in inspect.signature(solver_type.__init__).parameters:
solver_args["parallel"] = parallel
if (
"shared_memory_proxy" in inspect.signature(solver_type.__init__).parameters
and shared_memory
):
solver_args["shared_memory_proxy"] = GridShmProxy()
solver_args["domain_factory"] = lambda: GridDomain()

with solver_type(**solver_args) as slv:
GridDomain.solve_with(slv)
plan, cost = get_plan(dom, slv)
except Exception as e:
print(e)
noexcept = False
assert (
solver_type.check_domain(dom)
and noexcept
and (
(not solver_cpp["optimal"]) or parallel or (cost == 18 and len(plan) == 18)
)

dom = GridDomain()
solver_type = load_registered_solver(solver_cpp["entry"])
solver_args = deepcopy(solver_cpp["config"])
if "parallel" in inspect.signature(solver_type.__init__).parameters:
solver_args["parallel"] = parallel
if (
"shared_memory_proxy" in inspect.signature(solver_type.__init__).parameters
and shared_memory
):
solver_args["shared_memory_proxy"] = GridShmProxy()
solver_args["domain_factory"] = lambda: GridDomain()

with solver_type(**solver_args) as slv:
GridDomain.solve_with(slv)
plan, cost = get_plan(dom, slv)

assert solver_type.check_domain(dom) and (
(not solver_cpp["optimal"]) or parallel or (cost == 18 and len(plan) == 18)
)
34 changes: 13 additions & 21 deletions tests/solvers/python/test_python_solvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,17 @@ def get_plan(domain, solver):


def test_solve_python(solver_python):
noexcept = True

try:
dom = GridDomain()
solver_type = load_registered_solver(solver_python["entry"])
solver_args = deepcopy(solver_python["config"])
if solver_python["entry"] == "StableBaseline":
solver_args["algo_class"] = PPO
elif solver_python["entry"] == "RayRLlib":
solver_args["algo_class"] = DQN

with solver_type(**solver_args) as slv:
GridDomain.solve_with(slv)
plan, cost = get_plan(dom, slv)
except Exception as e:
print(e)
noexcept = False
assert (
solver_type.check_domain(dom)
and noexcept
and ((not solver_python["optimal"]) or (cost == 18 and len(plan) == 18))
dom = GridDomain()
solver_type = load_registered_solver(solver_python["entry"])
solver_args = deepcopy(solver_python["config"])
if solver_python["entry"] == "StableBaseline":
solver_args["algo_class"] = PPO
elif solver_python["entry"] == "RayRLlib":
solver_args["algo_class"] = DQN

with solver_type(**solver_args) as slv:
GridDomain.solve_with(slv)
plan, cost = get_plan(dom, slv)
assert solver_type.check_domain(dom) and (
(not solver_python["optimal"]) or (cost == 18 and len(plan) == 18)
)
259 changes: 124 additions & 135 deletions tests/solvers/python/test_up_bridge_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,150 +10,139 @@

@pytest.mark.skipif(sys.version_info < (3, 10), reason="requires python3.10 or higher")
def test_up_bridge_solver_classic():
noexcept = True

try:
import unified_planning
from unified_planning.plans import ActionInstance
from unified_planning.shortcuts import BoolType, OneshotPlanner, UserType

from skdecide.hub.domain.up import SkUPAction, UPDomain
from skdecide.hub.solver.up import UPSolver

Location = UserType("Location")
robot_at = unified_planning.model.Fluent("robot_at", BoolType(), l=Location)
connected = unified_planning.model.Fluent(
"connected", BoolType(), l_from=Location, l_to=Location
)

move = unified_planning.model.InstantaneousAction(
"move", l_from=Location, l_to=Location
)
l_from = move.parameter("l_from")
l_to = move.parameter("l_to")
move.add_precondition(connected(l_from, l_to))
move.add_precondition(robot_at(l_from))
move.add_effect(robot_at(l_from), False)
move.add_effect(robot_at(l_to), True)

problem = unified_planning.model.Problem("robot")
problem.add_fluent(robot_at, default_initial_value=False)
problem.add_fluent(connected, default_initial_value=False)
problem.add_action(move)

NLOC = 10
locations = [
unified_planning.model.Object("l%s" % i, Location) for i in range(NLOC)
import unified_planning
from unified_planning.plans import ActionInstance
from unified_planning.shortcuts import BoolType, OneshotPlanner, UserType

from skdecide.hub.domain.up import SkUPAction, UPDomain
from skdecide.hub.solver.up import UPSolver

Location = UserType("Location")
robot_at = unified_planning.model.Fluent("robot_at", BoolType(), l=Location)
connected = unified_planning.model.Fluent(
"connected", BoolType(), l_from=Location, l_to=Location
)

move = unified_planning.model.InstantaneousAction(
"move", l_from=Location, l_to=Location
)
l_from = move.parameter("l_from")
l_to = move.parameter("l_to")
move.add_precondition(connected(l_from, l_to))
move.add_precondition(robot_at(l_from))
move.add_effect(robot_at(l_from), False)
move.add_effect(robot_at(l_to), True)

problem = unified_planning.model.Problem("robot")
problem.add_fluent(robot_at, default_initial_value=False)
problem.add_fluent(connected, default_initial_value=False)
problem.add_action(move)

NLOC = 10
locations = [
unified_planning.model.Object("l%s" % i, Location) for i in range(NLOC)
]
problem.add_objects(locations)

problem.set_initial_value(robot_at(locations[0]), True)
for i in range(NLOC - 1):
problem.set_initial_value(connected(locations[i], locations[i + 1]), True)

problem.add_goal(robot_at(locations[-1]))

domain_factory = lambda: UPDomain(problem)
domain = domain_factory()

with UPSolver(
operation_mode=OneshotPlanner,
name="pyperplan",
engine_params={"output_stream": sys.stdout},
) as solver:
UPDomain.solve_with(solver, domain_factory)
s = domain.get_initial_state()
step = 0
p = []
while not domain.is_goal(s) and step < 10:
p.append(solver.get_next_action(s))
s = domain.get_next_state(s, p[-1])
step += 1
ep = [
SkUPAction(ActionInstance(move, (locations[i], locations[i + 1])))
for i in range(9)
]
problem.add_objects(locations)

problem.set_initial_value(robot_at(locations[0]), True)
for i in range(NLOC - 1):
problem.set_initial_value(connected(locations[i], locations[i + 1]), True)

problem.add_goal(robot_at(locations[-1]))

domain_factory = lambda: UPDomain(problem)
domain = domain_factory()

with UPSolver(
operation_mode=OneshotPlanner,
name="pyperplan",
engine_params={"output_stream": sys.stdout},
) as solver:
UPDomain.solve_with(solver, domain_factory)
s = domain.get_initial_state()
step = 0
p = []
while not domain.is_goal(s) and step < 10:
p.append(solver.get_next_action(s))
s = domain.get_next_state(s, p[-1])
step += 1
ep = [
SkUPAction(ActionInstance(move, (locations[i], locations[i + 1])))
for i in range(9)
]
except Exception as e:
print(e)
noexcept = False
assert noexcept and UPSolver.check_domain(domain) and step == 9 and p == ep

assert UPSolver.check_domain(domain) and step == 9 and p == ep


@pytest.mark.skipif(
sys.version_info < (3, 10),
reason="requires python3.10 or higher",
)
def test_up_bridge_solver_numeric():
noexcept = True

try:
import unified_planning
from unified_planning.shortcuts import (
Fluent,
InstantaneousAction,
Not,
OneshotPlanner,
)

from skdecide.hub.domain.up import UPDomain
from skdecide.hub.solver.up import UPSolver

x = Fluent("x")
y = Fluent("y")

a = InstantaneousAction("a")
a.add_precondition(Not(x))
a.add_effect(x, True)

b = InstantaneousAction("b")
b.add_precondition(Not(y))
b.add_effect(y, True)

c = InstantaneousAction("c")
c.add_precondition(y)
c.add_effect(x, True)

problem = unified_planning.model.Problem("simple_with_costs")

problem.add_fluent(x)
problem.add_fluent(y)

problem.add_action(a)
problem.add_action(b)
problem.add_action(c)

problem.set_initial_value(x, False)
problem.set_initial_value(y, False)

problem.add_goal(x)

problem.add_quality_metric(
unified_planning.model.metrics.MinimizeActionCosts({a: 10, b: 1, c: 1})
)

domain_factory = lambda: UPDomain(problem)
domain = domain_factory()

# Cannot run on Windows: see https://github.com/aiplan4eu/up-fast-downward/issues/15
with UPSolver(
operation_mode=OneshotPlanner,
name="fast-downward-opt",
engine_params={"output_stream": sys.stdout},
) as solver:
UPDomain.solve_with(solver, domain_factory)
s = domain.get_initial_state()
step = 0
p = []
while not domain.is_goal(s) and step < 10:
p.append(solver.get_next_action(s))
s = domain.get_next_state(s, p[-1])
step += 1
except Exception as e:
print(e)
noexcept = False
import unified_planning
from unified_planning.shortcuts import (
Fluent,
InstantaneousAction,
Not,
OneshotPlanner,
)

from skdecide.hub.domain.up import UPDomain
from skdecide.hub.solver.up import UPSolver

x = Fluent("x")
y = Fluent("y")

a = InstantaneousAction("a")
a.add_precondition(Not(x))
a.add_effect(x, True)

b = InstantaneousAction("b")
b.add_precondition(Not(y))
b.add_effect(y, True)

c = InstantaneousAction("c")
c.add_precondition(y)
c.add_effect(x, True)

problem = unified_planning.model.Problem("simple_with_costs")

problem.add_fluent(x)
problem.add_fluent(y)

problem.add_action(a)
problem.add_action(b)
problem.add_action(c)

problem.set_initial_value(x, False)
problem.set_initial_value(y, False)

problem.add_goal(x)

problem.add_quality_metric(
unified_planning.model.metrics.MinimizeActionCosts({a: 10, b: 1, c: 1})
)

domain_factory = lambda: UPDomain(problem)
domain = domain_factory()

# Cannot run on Windows: see https://github.com/aiplan4eu/up-fast-downward/issues/15
with UPSolver(
operation_mode=OneshotPlanner,
name="fast-downward-opt",
engine_params={"output_stream": sys.stdout},
) as solver:
UPDomain.solve_with(solver, domain_factory)
s = domain.get_initial_state()
step = 0
p = []
while not domain.is_goal(s) and step < 10:
p.append(solver.get_next_action(s))
s = domain.get_next_state(s, p[-1])
step += 1

assert (
noexcept
and UPSolver.check_domain(domain)
UPSolver.check_domain(domain)
and step == 2
and p[0].up_action == b
and p[1].up_action == c
Expand Down
Loading