diff --git a/components/gripper.py b/components/gripper.py index 611c8cab..f2e20428 100644 --- a/components/gripper.py +++ b/components/gripper.py @@ -92,6 +92,17 @@ def get_current_piece(self) -> GamePiece: return GamePiece.NONE return self.holding + @feedback + def get_current_piece_as_int(self) -> int: + piece = self.get_current_piece() + if piece == GamePiece.NONE: + return 0 + if piece == GamePiece.CONE: + return 1 + if piece == GamePiece.CUBE: + return 2 + return -1 + @feedback def cube_present(self) -> bool: return self.get_full_open() and self.cube_break_beam_broken() diff --git a/components/score_tracker.py b/components/score_tracker.py new file mode 100644 index 00000000..c04772ba --- /dev/null +++ b/components/score_tracker.py @@ -0,0 +1,176 @@ +import numpy as np +import numpy.typing as npt +import wpilib +import magicbot +from ntcore import NetworkTableInstance +from utilities.game import GamePiece + + +class ScoreTracker: + CUBE_MASK = np.array( + [ + [0, 1, 0, 0, 1, 0, 0, 1, 0], + [0, 1, 0, 0, 1, 0, 0, 1, 0], + [1, 1, 1, 1, 1, 1, 1, 1, 1], + ], + dtype=bool, + ) + CONE_MASK = np.array( + [ + [1, 0, 1, 1, 0, 1, 1, 0, 1], + [1, 0, 1, 1, 0, 1, 1, 0, 1], + [1, 1, 1, 1, 1, 1, 1, 1, 1], + ], + dtype=bool, + ) + + CONF_EXP_FILTER_ALPHA = 0.8 + + CONF_THRESHOLD = 0.2 + + def __init__(self) -> None: + # -1.0 - no piece for certain, 1.0 - a piece for certain, 0.0 - unsure + self.confidences_red: np.ndarray = np.zeros((3, 9), dtype=float) + self.confidences_blue: np.ndarray = np.zeros((3, 9), dtype=float) + self.state_red = np.zeros((3, 9), dtype=bool) + self.state_blue = np.zeros((3, 9), dtype=bool) + self.did_states_change = magicbot.will_reset_to(False) + self.inst = NetworkTableInstance.getDefault() + nt = self.inst.getTable("left_cam") + self.nodes = nt.getEntry("nodes") + + def execute(self) -> None: + if not self.nodes.exists(): + print("skipping") + return + _data = self.nodes.getStringArray([]) + data = self.nt_data_to_node_data(_data) + for node in data: + side = ( + wpilib.DriverStation.Alliance.kBlue + if node[0] >= 27 + else wpilib.DriverStation.Alliance.kRed + ) + col = node[0] % 9 + row = (node[0] % 27) // 9 + self.add_vision_data( + side=side, + pos=np.array([row, col]), + confidence=(1.0 if node[1] else -0.5), + ) + + def nt_data_to_node_data(self, data: list[str]) -> list[tuple[int, bool]]: + nodes: list[tuple[int, bool]] = [] + for node in data: + as_array = str(node) + a = (int(f"{as_array[0]}{as_array[1]}"), as_array[2] == "1") + nodes.append(a) + return nodes + + def add_vision_data( + self, side: wpilib.DriverStation.Alliance, pos: npt.ArrayLike, confidence: float + ) -> None: + confidences = ( + self.confidences_red if side == side.kRed else self.confidences_blue + ) + confidences[pos] = confidences[ + pos + ] * ScoreTracker.CONF_EXP_FILTER_ALPHA + confidence * ( + 1.0 - ScoreTracker.CONF_EXP_FILTER_ALPHA + ) + if abs(confidences[pos]) > ScoreTracker.CONF_THRESHOLD: + self.did_states_change = True + state = self.state_red if side == side.kRed else self.state_blue + state[pos] = confidence > 0.0 + + @staticmethod + def count_links(r: npt.NDArray[np.bool_]) -> int: + i = 0 + n = 0 + length = len(r) + while i < length - 2: + if r[i] and r[i + 1] and r[i + 2]: + n += 1 + i += 3 + continue + i += 1 + return n + + @staticmethod + def evaluate_state(a: npt.NDArray[np.bool_]) -> int: + return ( + sum(ScoreTracker.count_links(r) for r in a) * 5 + + a[0].sum() * 5 + + a[1].sum() * 3 + + a[2].sum() * 2 + ) + + @staticmethod + def run_lengths_mod3(state: npt.NDArray[np.bool_]) -> npt.NDArray[np.int_]: + """ + Returns an array where corresponding in shape to the input, where + every value is replaced by the length of the longest uninterrupted + run of true values containing it, modulo 3 + """ + run_lengths = np.zeros_like(state, dtype=int) + for y in range(3): + x = 0 + while x < 9: + if not state[y, x]: + x += 1 + continue + acc = 0 + for xn in range(x, 9): + if not state[y, xn]: + break + acc += 1 + run_lengths[y, x : x + acc] = acc % 3 + x += acc + return run_lengths + + @staticmethod + def get_in_row(arr: npt.NDArray, x: int, y: int, def_val): + if x < 0 or x > 8: + return def_val + else: + return arr[y, x] + + @staticmethod + def get_best_moves( + state: npt.NDArray[np.bool_], + type_to_test: GamePiece, + link_preparation_score: float = 2.5, + ) -> npt.NDArray: + vals = np.zeros_like(state, dtype=float) + run_lengths = ScoreTracker.run_lengths_mod3(state) + for y in range(3): + for x in range(9): + if ( + state[y, x] + or ( + type_to_test == GamePiece.CUBE + and not ScoreTracker.CUBE_MASK[y, x] + ) + or ( + type_to_test == GamePiece.CONE + and not ScoreTracker.CONE_MASK[y, x] + ) + ): + continue + val = [5.0, 3.0, 2.0][y] + # Check link completion + if ( + ScoreTracker.get_in_row(run_lengths, x - 1, y, 0) + + ScoreTracker.get_in_row(run_lengths, x + 1, y, 0) + >= 2 + ): + val += 5.0 + # Otherwise, check link preparation (state where a link can be completed after 1 move) + else: + for o in [-2, -1, 1, 2]: + if ScoreTracker.get_in_row(run_lengths, x + o, y, 0) == 1: + val += link_preparation_score + break + vals[y, x] = val + m = vals.max() + return np.argwhere(vals == m) diff --git a/controllers/score_game_piece.py b/controllers/score_game_piece.py index 3c98ef19..215c97cd 100644 --- a/controllers/score_game_piece.py +++ b/controllers/score_game_piece.py @@ -5,11 +5,17 @@ from controllers.movement import Movement from controllers.recover import RecoverController -from magicbot import state, timed_state, StateMachine +from magicbot import state, StateMachine, feedback, timed_state from enum import Enum, auto -from utilities.game import Node, get_closest_node, get_score_location, Rows - -from wpimath.geometry import Translation2d +from utilities.game import ( + Node, + get_closest_node, + get_score_location, + Rows, + is_red, + get_closest_node_in_allowed, +) +from components.score_tracker import ScoreTracker class NodePickStratergy(Enum): @@ -25,11 +31,14 @@ class ScoreGamePieceController(StateMachine): movement: Movement recover: RecoverController + + score_tracker: ScoreTracker + HARD_UP_SPEED = 0.3 ARM_PRE_TIME = 1.5 def __init__(self) -> None: - self.node_stratergy = NodePickStratergy.CLOSEST + self.node_stratergy = NodePickStratergy.BEST self.override_node = Node(Rows.HIGH, 0) self.prefered_row = Rows.HIGH self.target_node = Node(Rows.HIGH, 0) @@ -69,32 +78,76 @@ def open_flapper(self) -> None: @timed_state(duration=0.5, must_finish=True) def dropping(self) -> None: self.gripper.open() + if self.gripper.get_full_open(): + self.done() def done(self) -> None: super().done() - self.movement.inputs_lock = False self.recover.engage() + def score_best(self) -> None: + self.node_stratergy = NodePickStratergy.BEST + def score_closest_high(self) -> None: - self.target_node = self._get_closest(Rows.HIGH) - self.engage() + self.node_stratergy = NodePickStratergy.CLOSEST + self.prefer_high() def score_closest_mid(self) -> None: - self.target_node = self._get_closest(Rows.MID) - self.engage() + self.node_stratergy = NodePickStratergy.CLOSEST + self.prefer_high() - def _get_closest(self, row: Rows) -> Node: + def pick_node(self) -> Node: cur_pos = self.movement.chassis.get_pose().translation() - cur_vel = self.movement.chassis.get_velocity() - lookahead_time = 1.0 - effective_pos = cur_pos + Translation2d( - cur_vel.vx * lookahead_time, cur_vel.vy * lookahead_time - ) - return get_closest_node(effective_pos, self.gripper.get_current_piece(), row) + if self.node_stratergy is NodePickStratergy.CLOSEST: + return get_closest_node( + cur_pos, self.gripper.get_current_piece(), self.prefered_row, set() + ) + elif self.node_stratergy is NodePickStratergy.OVERRIDE: + return self.override_node + elif self.node_stratergy is NodePickStratergy.BEST: + state = ( + self.score_tracker.state_blue + if is_red() + else self.score_tracker.state_red + ) + best = self.score_tracker.get_best_moves(state, self.gripper.holding) + nodes: list[Node] = [] + for i in range(len(best)): + as_tuple = tuple(best[i]) + node = Node(Rows(int(as_tuple[0])), as_tuple[1]) + nodes.append(node) + + return get_closest_node_in_allowed( + cur_pos, self.gripper.get_current_piece(), nodes + ) + + @feedback + def state_red(self) -> list[bool]: + state: list[bool] = [] + for i in self.score_tracker.state_blue.tolist(): + for j in i: + state.append(j) + return state + + @feedback + def state_blue(self) -> list[bool]: + state: list[bool] = [] + for i in self.score_tracker.state_blue.tolist(): + for j in i: + state.append(j) + return state + + @feedback + def pick_node_as_int(self) -> int: + # node = self.pick_node() + node = Node(Rows.HIGH, 0) + return (node.row.value - 1) * 3 + node.col + + def prefer_high(self) -> None: + self.prefered_row = Rows.HIGH - def score_best(self) -> None: - # placeholder - self.score_closest_high() + def prefer_mid(self) -> None: + self.prefered_row = Rows.MID def score_without_moving(self, node: Node) -> None: self.target_node = node diff --git a/robot.py b/robot.py index 56429ac4..8050dc83 100644 --- a/robot.py +++ b/robot.py @@ -19,6 +19,7 @@ from components.arm import Arm from components.gripper import Gripper from components.leds import StatusLights, DisplayType, LedColors +from components.score_tracker import ScoreTracker from utilities.scalers import rescale_js from utilities.game import is_red @@ -40,6 +41,7 @@ class MyRobot(magicbot.MagicRobot): intake: Intake status_lights: StatusLights gripper: Gripper + score_tracker: ScoreTracker port_localizer: VisualLocalizer starboard_localizer: VisualLocalizer @@ -258,6 +260,7 @@ def testPeriodic(self) -> None: self.port_localizer.execute() self.starboard_localizer.execute() self.status_lights.execute() + self.score_tracker.execute() def cancel_controllers(self): self.acquire_cone.done() diff --git a/utilities/game.py b/utilities/game.py index 78273499..b860f110 100644 --- a/utilities/game.py +++ b/utilities/game.py @@ -97,6 +97,9 @@ def get_valid_piece(self) -> GamePiece: else: return GamePiece.CONE + def get_id(self) -> int: + return (self.row.value - 1) * 3 + self.col + def get_node_location(node: Node) -> Translation3d: if is_red(): @@ -123,8 +126,12 @@ def get_score_location(node: Node) -> tuple[Pose2d, Rotation2d]: return goal, approach -def get_closest_node(pos: Translation2d, piece: GamePiece, row: Rows) -> Node: +def get_closest_node( + pos: Translation2d, piece: GamePiece, row: Rows, impossible: set[int] +) -> Node: def get_node_dist(node: Node) -> float: + if node.get_id() in impossible: + return 999999 return get_score_location(node)[0].translation().distance(pos) if piece == GamePiece.CONE: @@ -137,6 +144,15 @@ def get_node_dist(node: Node) -> float: return min(nodes, key=get_node_dist) +def get_closest_node_in_allowed( + pos: Translation2d, piece: GamePiece, allowed: list[Node] +) -> Node: + def get_node_dist(node: Node) -> float: + return get_score_location(node)[0].translation().distance(pos) + + return min(allowed, key=get_node_dist) + + # tag in blue loading bay, on red side of field 16=x tag_4 = apriltag_layout.getTagPose(4) assert tag_4 is not None