Skip to content

Commit

Permalink
migrate evt script over to dvt
Browse files Browse the repository at this point in the history
  • Loading branch information
ahiuchingau committed Feb 19, 2025
1 parent 554961d commit e0dcfd2
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 95 deletions.
6 changes: 6 additions & 0 deletions api/src/opentrons/hardware_control/modules/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
LimitSwitchStatus,
PlatformStatus,
StackerAxis,
Direction,
)
from opentrons.drivers.rpi_drivers.types import USBPort

Expand Down Expand Up @@ -395,6 +396,11 @@ def from_status(
case StackerAxis.L:
return cls.EXTENDED if status.LR else cls.RETRACTED
return cls.UNKNOWN

@classmethod
def from_direction(cls, direction: Direction) -> "StackerAxisState":
"""Get the axis state from the direction."""
return cls.RETRACTED if direction == Direction.RETRACT else cls.EXTENDED


class LatchState(str, Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import asyncio
from pathlib import Path
from typing import cast
from dataclasses import asdict

from hardware_testing.data import ui

Expand Down Expand Up @@ -114,7 +113,7 @@ async def _main(cfg: TestConfig) -> None:
# RUN TESTS
for section, test_run in cfg.tests.items():
ui.print_title(section.value)
await test_run(stacker, report, section.value, **asdict(cfg))
await test_run(stacker, report, section.value, cfg.simulate, api, cfg.revision)

# SAVE REPORT
ui.print_title("DONE")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Config."""
from dataclasses import dataclass
import enum
from typing import Dict, Callable
from typing import Dict, Any, TypeVar, Callable, Coroutine

from opentrons.drivers.flex_stacker.types import HardwareRevision
from hardware_testing.data.csv_report import CSVReport, CSVSection
Expand All @@ -16,6 +16,9 @@
)


T = TypeVar("T")


class TestSection(enum.Enum):
"""Test Section."""

Expand All @@ -33,7 +36,7 @@ class TestConfig:

revision: HardwareRevision
simulate: bool
tests: Dict[TestSection, Callable]
tests: Dict[TestSection, Callable[..., Coroutine[Any, Any, None]]]


TESTS = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
CSVResult,
)

from hardware_testing.opentrons_api import helpers_ot3
from opentrons.drivers.flex_stacker.types import HardwareRevision
from opentrons.hardware_control.modules import FlexStacker

Expand All @@ -23,7 +24,7 @@ def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]:


async def test_gcode(
module: FlexStacker, report: CSVReport, revision: HardwareRevision, **kwargs
module: FlexStacker, report: CSVReport, revision: HardwareRevision
) -> None:
"""Send and receive response for GCODE M115."""
success = True
Expand All @@ -41,9 +42,7 @@ async def test_gcode(
)


async def test_eeprom(
module: FlexStacker, report: CSVReport, simulate: bool, **kwargs
) -> None:
async def test_eeprom(module: FlexStacker, report: CSVReport, simulate: bool) -> None:
"""Set serial number and make sure device info is updated accordingly."""
success = True
if not simulate:
Expand All @@ -62,9 +61,7 @@ async def test_eeprom(
)


async def test_leds(
module: FlexStacker, report: CSVReport, simulate: bool, **kwargs
) -> None:
async def test_leds(module: FlexStacker, report: CSVReport, simulate: bool) -> None:
"""Prompt tester to verify the status led is blinking."""
if not simulate:
is_blinking = ui.get_user_answer("Is the status LED blinking?")
Expand All @@ -75,13 +72,20 @@ async def test_leds(
)


async def run(driver: FlexStacker, report: CSVReport, section: str, **kwargs) -> None:
async def run(
module: FlexStacker,
report: CSVReport,
section: str,
simulate: bool,
api: helpers_ot3.OT3API,
hardware_revision: HardwareRevision,
) -> None:
"""Run."""
ui.print_header("USB Communication")
await test_gcode(driver, report, **kwargs)
await test_gcode(module, report, hardware_revision)

ui.print_header("EEPROM Communication")
await test_eeprom(driver, report, **kwargs)
await test_eeprom(module, report, simulate)

ui.print_header("LED Blinking")
await test_leds(driver, report, **kwargs)
await test_leds(module, report, simulate)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
CSVResult,
)

from .driver import FlexStacker
from hardware_testing.opentrons_api import helpers_ot3
from opentrons.hardware_control.modules import FlexStacker
from opentrons.drivers.flex_stacker.types import HardwareRevision


def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]:
Expand All @@ -21,16 +23,25 @@ def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]:
]


def run(driver: FlexStacker, report: CSVReport, section: str) -> None:
async def run(
module: FlexStacker,
report: CSVReport,
section: str,
simulate: bool,
api: helpers_ot3.OT3API,
hardware_revision: HardwareRevision,
) -> None:
"""Run."""
ui.print_header("Close Door")
if not driver._simulating:
if not simulate:
ui.get_user_ready("Close the hopper door")
closed = driver.get_hopper_door_closed()
await module._reader.get_door_closed()
closed = module._reader.hopper_door_closed
report(section, "close-door", [CSVResult.from_bool(closed)])

ui.print_header("Open Door")
if not driver._simulating:
if not simulate:
ui.get_user_ready("Open the hopper door")
closed = driver.get_hopper_door_closed()
await module._reader.get_door_closed()
closed = module._reader.hopper_door_closed
report(section, "open-door", [CSVResult.from_bool(not closed)])
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@
CSVResult,
)

from .driver import FlexStacker, Direction, StackerAxis
from hardware_testing.opentrons_api import helpers_ot3
from opentrons.drivers.flex_stacker.types import (
StackerAxis,
Direction,
HardwareRevision,
)
from opentrons.hardware_control.modules import FlexStacker
from opentrons.hardware_control.types import EstopState


def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]:
Expand All @@ -24,40 +31,54 @@ def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]:
]


def axis_at_limit(driver: FlexStacker, axis: StackerAxis) -> Direction:
async def axis_at_limit(module: FlexStacker, axis: StackerAxis) -> Direction:
"""Check which direction an axis is at the limit switch."""
if axis is StackerAxis.L:
# L axis only has one limit switch
if driver.get_limit_switch(axis, Direction.RETRACT):
print(axis, "is at ", Direction.RETRACT, "limit switch")
if await module._driver.get_limit_switch(axis, Direction.RETRACT):
print(axis, " is at ", Direction.RETRACT, " limit switch")
return Direction.RETRACT
else:
for dir in Direction:
if driver.get_limit_switch(axis, dir):
print(axis, "is at ", dir, "limit switch")
if await module._driver.get_limit_switch(axis, dir):
print(axis, " is at ", dir, " limit switch")
return dir
raise RuntimeError(f"{axis} is not at any limit switch")


def run(driver: FlexStacker, report: CSVReport, section: str) -> None:
async def run(
module: FlexStacker,
report: CSVReport,
section: str,
simulate: bool,
api: helpers_ot3.OT3API,
hardware_revision: HardwareRevision,
) -> None:
"""Run."""
x_limit = axis_at_limit(driver, StackerAxis.X)
z_limit = axis_at_limit(driver, StackerAxis.Z)
l_limit = axis_at_limit(driver, StackerAxis.L)
if not simulate:
x_limit = await axis_at_limit(module, StackerAxis.X)
z_limit = await axis_at_limit(module, StackerAxis.Z)
l_limit = await axis_at_limit(module, StackerAxis.L)
else:
x_limit = Direction.RETRACT
z_limit = Direction.RETRACT
l_limit = Direction.RETRACT

ui.print_header("Trigger E-Stop")
if not driver._simulating:
if not simulate:
ui.get_user_ready("Trigger the E-Stop")

if not driver.get_estop():
if not api.get_estop_state() == EstopState.PHYSICALLY_ENGAGED:
print("E-Stop is not triggered")
report(section, "trigger-estop", [CSVResult.FAIL])
return

report(section, "trigger-estop", [CSVResult.PASS])

print("Check X limit switch...")
limit_switch_triggered = driver.get_limit_switch(StackerAxis.X, x_limit)
limit_switch_triggered = await module._driver.get_limit_switch(
StackerAxis.X, x_limit
)
if limit_switch_triggered:
report(
section,
Expand All @@ -66,25 +87,35 @@ def run(driver: FlexStacker, report: CSVReport, section: str) -> None:
)
else:
print("try to move X axis back to the limit switch...")
driver.move_in_mm(StackerAxis.X, x_limit.distance(3))
await module.move_axis(StackerAxis.X, x_limit, 3.0)
print("X should not move")
report(
section,
"x-move-disabled",
[CSVResult.from_bool(not driver.get_limit_switch(StackerAxis.X, x_limit))],
[
CSVResult.from_bool(
not await module._driver.get_limit_switch(StackerAxis.X, x_limit)
)
],
)

print("try to move Z axis...")
driver.move_in_mm(StackerAxis.Z, z_limit.opposite().distance(10))
await module.move_axis(StackerAxis.Z, z_limit.opposite(), 10.0)
print("Z should not move")
report(
section,
"z-move-disabled",
[CSVResult.from_bool(driver.get_limit_switch(StackerAxis.Z, z_limit))],
[
CSVResult.from_bool(
await module._driver.get_limit_switch(StackerAxis.Z, z_limit)
)
],
)

print("Check L limit switch...")
limit_switch_triggered = driver.get_limit_switch(StackerAxis.L, l_limit)
limit_switch_triggered = await module._driver.get_limit_switch(
StackerAxis.L, l_limit
)
if limit_switch_triggered:
report(
section,
Expand All @@ -93,14 +124,22 @@ def run(driver: FlexStacker, report: CSVReport, section: str) -> None:
)
else:
print("try to move L axis back to the limit switch...")
driver.move_in_mm(StackerAxis.L, l_limit.distance(1))
await module.move_axis(StackerAxis.L, l_limit, 1.0)
print("L should not move")
report(
section,
"l-move-disabled",
[CSVResult.from_bool(not driver.get_limit_switch(StackerAxis.L, l_limit))],
[
CSVResult.from_bool(
not await module._driver.get_limit_switch(StackerAxis.L, l_limit)
)
],
)

if not driver._simulating:
if not simulate:
ui.get_user_ready("Untrigger the E-Stop")
report(section, "untrigger-estop", [CSVResult.from_bool(not driver.get_estop())])
report(
section,
"untrigger-estop",
[CSVResult.from_bool(api.get_estop_state() == EstopState.DISENGAGED)],
)
Loading

0 comments on commit e0dcfd2

Please sign in to comment.